22 #include "RConfigure.h" 
   35 #include <sys/types.h> 
   48 #if (defined(__FreeBSD__) && (__FreeBSD__ < 4)) || \ 
   49     (defined(__APPLE__) && (!defined(MAC_OS_X_VERSION_10_3) || \ 
   50      (MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_3))) 
   52 #define lockf(fd, op, sz)   flock((fd), (op)) 
   54 #define F_LOCK             (LOCK_EX | LOCK_NB) 
   57 #define F_ULOCK             LOCK_UN 
   88 #include "compiledata.h" 
  112 TProofServ *gProofServ = 0;
 
  115 static volatile Int_t gProofServDebug = 1;
 
  118 Int_t TProofServ::fgLogToSysLog = 0;
 
  119 TString TProofServ::fgSysLogService(
"proof");
 
  120 TString TProofServ::fgSysLogEntity(
"undef:default");
 
  123 FILE *TProofServ::fgErrorHandlerFile = 0;
 
  128 static const char *__crashreporter_info__ = 0;
 
  129 asm(
".desc ___crashreporter_info__, 0x10");
 
  134 Int_t TProofServ::fgRecursive = 0;
 
  137 TString TProofServ::fgLastMsg(
"<undef>");
 
  138 Long64_t TProofServ::fgLastEntry = -1;
 
  141 Long_t TProofServ::fgVirtMemMax = -1;
 
  142 Long_t TProofServ::fgResMemMax = -1;
 
  143 Float_t TProofServ::fgMemHWM = 0.80;
 
  144 Float_t TProofServ::fgMemStop = 0.95;
 
  147 static void SendAsynMsg(
const char *msg) {
 
  148   if (gProofServ) gProofServ->SendAsynMessage(msg, kTRUE);
 
  154 class TProofServTerminationHandler : 
public TSignalHandler {
 
  157    TProofServTerminationHandler(TProofServ *s)
 
  158       : TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
 
  165 Bool_t TProofServTerminationHandler::Notify()
 
  167    Printf(
"Received SIGTERM: terminating");
 
  168    fServ->HandleTermination();
 
  175 class TProofServInterruptHandler : 
public TSignalHandler {
 
  178    TProofServInterruptHandler(TProofServ *s)
 
  179       : TSignalHandler(kSigUrgent, kFALSE) { fServ = s; }
 
  186 Bool_t TProofServInterruptHandler::Notify()
 
  188    fServ->HandleUrgentData();
 
  189    if (TROOT::Initialized()) {
 
  198 class TProofServSigPipeHandler : 
public TSignalHandler {
 
  201    TProofServSigPipeHandler(TProofServ *s) : TSignalHandler(kSigPipe, kFALSE)
 
  209 Bool_t TProofServSigPipeHandler::Notify()
 
  211    fServ->HandleSigPipe();
 
  218 class TProofServInputHandler : 
public TFileHandler {
 
  221    TProofServInputHandler(TProofServ *s, Int_t fd) : TFileHandler(fd, 1)
 
  224    Bool_t ReadNotify() { 
return Notify(); }
 
  230 Bool_t TProofServInputHandler::Notify()
 
  232    fServ->HandleSocketInput();
 
  236 TString TProofServLogHandler::fgPfx = 
""; 
 
  237 Int_t TProofServLogHandler::fgCmdRtn = 0; 
 
  242 TProofServLogHandler::TProofServLogHandler(
const char *cmd,
 
  243                                              TSocket *s, 
const char *pfx)
 
  244                      : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
 
  246    ResetBit(kFileIsPipe);
 
  250       fFile = gSystem->OpenPipe(cmd, 
"r");
 
  252          SetFd(fileno(fFile));
 
  259          Error(
"TProofServLogHandler", 
"executing command in pipe");
 
  263       Error(
"TProofServLogHandler",
 
  264             "undefined command (%p) or socket (%p)", (
int *)cmd, s);
 
  270 TProofServLogHandler::TProofServLogHandler(FILE *f, TSocket *s, 
const char *pfx)
 
  271                      : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
 
  273    ResetBit(kFileIsPipe);
 
  278       SetFd(fileno(fFile));
 
  282       Error(
"TProofServLogHandler", 
"undefined file (%p) or socket (%p)", f, s);
 
  288 TProofServLogHandler::~TProofServLogHandler()
 
  290    if (TestBit(kFileIsPipe) && fFile) {
 
  291       Int_t rc = gSystem->ClosePipe(fFile);
 
  295       fgCmdRtn = WIFEXITED(rc) ? WEXITSTATUS(rc) : -1;
 
  300    ResetBit(kFileIsPipe);
 
  305 Bool_t TProofServLogHandler::Notify()
 
  308       TMessage m(kPROOF_MESSAGE);
 
  312       while (fgets(line, 
sizeof(line), fFile)) {
 
  313          if ((plf = strchr(line, 
'\n')))
 
  317          if (fPfx.Length() > 0) {
 
  319             log.Form(
"%s: %s", fPfx.Data(), line);
 
  320          } 
else if (fgPfx.Length() > 0) {
 
  322             log.Form(
"%s: %s", fgPfx.Data(), line);
 
  328          m.Reset(kPROOF_MESSAGE);
 
  338 void TProofServLogHandler::SetDefaultPrefix(
const char *pfx)
 
  346 Int_t TProofServLogHandler::GetCmdRtn()
 
  354 TProofServLogHandlerGuard::TProofServLogHandlerGuard(
const char *cmd, TSocket *s,
 
  355                                                      const char *pfx, Bool_t on)
 
  359       fExecHandler = 
new TProofServLogHandler(cmd, s, pfx);
 
  360       if (fExecHandler->IsValid()) {
 
  361          gSystem->AddFileHandler(fExecHandler);
 
  363          Error(
"TProofServLogHandlerGuard",
"invalid handler");
 
  367          Error(
"TProofServLogHandlerGuard",
"undefined command");
 
  374 TProofServLogHandlerGuard::TProofServLogHandlerGuard(FILE *f, TSocket *s,
 
  375                                                      const char *pfx, Bool_t on)
 
  379       fExecHandler = 
new TProofServLogHandler(f, s, pfx);
 
  380       if (fExecHandler->IsValid()) {
 
  381          gSystem->AddFileHandler(fExecHandler);
 
  383          Error(
"TProofServLogHandlerGuard",
"invalid handler");
 
  387          Error(
"TProofServLogHandlerGuard",
"undefined file");
 
  394 TProofServLogHandlerGuard::~TProofServLogHandlerGuard()
 
  396    if (fExecHandler && fExecHandler->IsValid()) {
 
  397       gSystem->RemoveFileHandler(fExecHandler);
 
  398       SafeDelete(fExecHandler);
 
  406 TShutdownTimer::TShutdownTimer(TProofServ *p, Int_t delay)
 
  407                : TTimer(delay, kFALSE), fProofServ(p)
 
  409    fTimeout = gEnv->GetValue(
"ProofServ.ShutdownTimeout", 20);
 
  411    fTimeout = gEnv->GetValue(
"ProofServ.ShutdonwTimeout", fTimeout);
 
  418 Bool_t TShutdownTimer::Notify()
 
  421       printf(
"TShutdownTimer::Notify: checking activity on the input socket\n");
 
  425    if (fProofServ && (xs = fProofServ->GetSocket())) {
 
  427       TTimeStamp ts = xs->GetLastUsage();
 
  428       Long_t dt = (Long_t)(now.GetSec() - ts.GetSec()) * 1000 +
 
  429                   (Long_t)(now.GetNanoSec() - ts.GetNanoSec()) / 1000000 ;
 
  430       if (dt > fTimeout * 60000) {
 
  431          printf(
"TShutdownTimer::Notify: input socket: %p: did not show any activity" 
  432                          " during the last %d mins: aborting\n", xs, fTimeout);
 
  438             printf(
"TShutdownTimer::Notify: input socket: %p: show activity" 
  439                    " %ld secs ago\n", xs, dt / 60000);
 
  451 TReaperTimer::~TReaperTimer()
 
  454       fChildren->SetOwner(kTRUE);
 
  463 void TReaperTimer::AddPid(Int_t pid)
 
  467          fChildren = 
new TList;
 
  469       spid.Form(
"%d", pid);
 
  470       fChildren->Add(
new TParameter<Int_t>(spid.Data(), pid));
 
  479 Bool_t TReaperTimer::Notify()
 
  482       TIter nxp(fChildren);
 
  483       TParameter<Int_t> *p = 0;
 
  484       while ((p = (TParameter<Int_t> *)nxp())) {
 
  489             pid = waitpid(p->GetVal(), &status, WNOHANG);
 
  490          } 
while (pid < 0 && errno == EINTR);
 
  493          pid = _cwait(&status, (intptr_t)p->GetVal(), 0);
 
  495          if (pid > 0 && pid == p->GetVal()) {
 
  497             fChildren->Remove(p);
 
  504    if (!fChildren || fChildren->GetSize() <= 0) {
 
  517 Bool_t TIdleTOTimer::Notify()
 
  519    Info (
"Notify", 
"session idle for more then %lld secs: terminating", Long64_t(fTime)/1000);
 
  524       if ((uss_rc = fProofServ->UpdateSessionStatus(4)) != 0)
 
  525          Warning(
"Notify", 
"problems updating session status (errno: %d)", -uss_rc);
 
  528       if (fProofServ->GetProtocol() < 29) {
 
  529          msg.Form(
"\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n" 
  530                   "// Please IGNORE any error message possibly displayed below\n//",
 
  531                   gSystem->HostName(), fProofServ->GetSessionTag(), Long64_t(fTime)/1000);
 
  533          msg.Form(
"\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n//",
 
  534                   gSystem->HostName(), fProofServ->GetSessionTag(), Long64_t(fTime)/1000);
 
  536       fProofServ->SendAsynMessage(msg.Data());
 
  537       fProofServ->Terminate(0);
 
  541       Warning(
"Notify", 
"fProofServ undefined!");
 
  547 ClassImp(TProofServ);
 
  552    TApplication *GetTProofServ(Int_t *argc, 
char **argv, FILE *flog)
 
  553    { 
return new TProofServ(argc, argv, flog); }
 
  562 TProofServ::TProofServ(Int_t *argc, 
char **argv, FILE *flog)
 
  563        : TApplication(
"proofserv", argc, argv, 0, -1)
 
  566    Bool_t xtest = (argc && *argc == 1) ? kTRUE : kFALSE;
 
  568       Printf(
"proofserv: command line testing: OK");
 
  573    TString rcfile = gSystem->Getenv(
"ROOTRCFILE") ? gSystem->Getenv(
"ROOTRCFILE")
 
  575    if (!gSystem->AccessPathName(rcfile, kReadPermission))
 
  576       gEnv->ReadFile(rcfile, kEnvChange);
 
  579    fgVirtMemMax = gEnv->GetValue(
"Proof.VirtMemMax",-1);
 
  580    if (fgVirtMemMax < 0 && gSystem->Getenv(
"PROOF_VIRTMEMMAX")) {
 
  581       Long_t mmx = strtol(gSystem->Getenv(
"PROOF_VIRTMEMMAX"), 0, 10);
 
  582       if (mmx < kMaxLong && mmx > 0)
 
  583          fgVirtMemMax = mmx * 1024;
 
  586    if (fgVirtMemMax < 0 && gSystem->Getenv(
"ROOTPROOFASHARD")) {
 
  587       Long_t mmx = strtol(gSystem->Getenv(
"ROOTPROOFASHARD"), 0, 10);
 
  588       if (mmx < kMaxLong && mmx > 0)
 
  589          fgVirtMemMax = mmx * 1024;
 
  592    fgResMemMax = gEnv->GetValue(
"Proof.ResMemMax",-1);
 
  593    if (fgResMemMax < 0 && gSystem->Getenv(
"PROOF_RESMEMMAX")) {
 
  594       Long_t mmx = strtol(gSystem->Getenv(
"PROOF_RESMEMMAX"), 0, 10);
 
  595       if (mmx < kMaxLong && mmx > 0)
 
  596          fgResMemMax = mmx * 1024;
 
  599    fgMemStop = gEnv->GetValue(
"Proof.MemStop", 0.95);
 
  600    fgMemHWM = gEnv->GetValue(
"Proof.MemHWM", 0.80);
 
  601    if (fgVirtMemMax > 0 || fgResMemMax > 0) {
 
  602       if ((fgMemStop < 0.) || (fgMemStop > 1.)) {
 
  603          Warning(
"TProofServ", 
"requested memory fraction threshold to stop processing" 
  604                                " (MemStop) out of range [0,1] - ignoring");
 
  607       if ((fgMemHWM < 0.) || (fgMemHWM > fgMemStop)) {
 
  608          Warning(
"TProofServ", 
"requested memory fraction threshold for warning and finer monitoring" 
  609                                " (MemHWM) out of range [0,MemStop] - ignoring");
 
  615    Bool_t test = (argc && *argc >= 4 && !strcmp(argv[3], 
"test")) ? kTRUE : kFALSE;
 
  616    if ((gEnv->GetValue(
"Proof.GdbHook",0) == 3 && !test) ||
 
  617        (gEnv->GetValue(
"Proof.GdbHook",0) == 4 && test)) {
 
  618       while (gProofServDebug)
 
  623    if (argc && *argc >= 4)
 
  624       if (!strcmp(argv[3], 
"test"))
 
  625          fService = 
"prooftest";
 
  628    if (argc && *argc < 2) {
 
  629       Error(
"TProofServ", 
"Must have at least 1 arguments (see  proofd).");
 
  637    fSendLogToMaster = kFALSE;
 
  640    gErrorAbortLevel = kSysError + 1;
 
  641    SetErrorHandlerFile(stderr);
 
  642    SetErrorHandler(ErrorHandler);
 
  645    fGroupPriority   = 100;
 
  648    fOrdinal         = gEnv->GetValue(
"ProofServ.Ordinal", 
"-1");
 
  671    fWaitingQueries  = 
new TList;
 
  675    fQueuedMsg       = 
new TList;
 
  677    fRealTimeLog     = kFALSE;
 
  699    ResetBit(TProofServ::kHighMemory);
 
  702    fMsgSizeHWM = gEnv->GetValue(
"ProofServ.MsgSizeHWM", 1000000);
 
  705    fCompressMsg     = gEnv->GetValue(
"ProofServ.CompressMessage", 0);
 
  707    gProofDebugLevel = gEnv->GetValue(
"Proof.DebugLevel",0);
 
  708    fLogLevel = gProofDebugLevel;
 
  710    gProofDebugMask = (TProofDebug::EProofDebugMask) gEnv->GetValue(
"Proof.DebugMask",~0);
 
  711    if (gProofDebugLevel > 0)
 
  712       Info(
"TProofServ", 
"DebugLevel %d Mask 0x%x", gProofDebugLevel, gProofDebugMask);
 
  715    fLogFileMaxSize = -1;
 
  716    TString logmx = gEnv->GetValue(
"ProofServ.LogFileMaxSize", 
"");
 
  717    if (!logmx.IsNull()) {
 
  719       if (!logmx.IsDigit()) {
 
  720          if (logmx.EndsWith(
"K")) {
 
  722             logmx.Remove(TString::kTrailing, 
'K');
 
  723          } 
else if (logmx.EndsWith(
"M")) {
 
  725             logmx.Remove(TString::kTrailing, 
'M');
 
  726          } 
else if (logmx.EndsWith(
"G")) {
 
  728             logmx.Remove(TString::kTrailing, 
'G');
 
  731       if (logmx.IsDigit()) {
 
  732          fLogFileMaxSize = logmx.Atoi() * xf;
 
  733          if (fLogFileMaxSize > 0)
 
  734             Info(
"TProofServ", 
"keeping the log file size within %lld bytes", fLogFileMaxSize);
 
  736          logmx = gEnv->GetValue(
"ProofServ.LogFileMaxSize", 
"");
 
  737          Warning(
"TProofServ", 
"bad formatted log file size limit ignored: '%s'", logmx.Data());
 
  742    GetOptions(argc, argv);
 
  745    fPrefix = (IsMaster() ? 
"Mst-" : 
"Wrk-");
 
  746    if (test) fPrefix = 
"Test";
 
  747    if (fOrdinal != 
"-1")
 
  749    TProofServLogHandler::SetDefaultPrefix(fPrefix);
 
  752    TString slog = gEnv->GetValue(
"ProofServ.LogToSysLog", 
"");
 
  753    if (!(slog.IsNull())) {
 
  754       if (slog.IsDigit()) {
 
  755          fgLogToSysLog = slog.Atoi();
 
  757          char c = (slog[0] == 
'M' || slog[0] == 
'm') ? 
'm' : 
'a';
 
  758          c = (slog[0] == 
'W' || slog[0] == 
'w') ? 
'w' : c;
 
  759          Bool_t dosyslog = ((c == 
'm' && IsMaster()) ||
 
  760                             (c == 
'w' && !IsMaster()) || c == 
'a') ? kTRUE : kFALSE;
 
  763             if (slog.IsDigit()) fgLogToSysLog = slog.Atoi();
 
  764             if (fgLogToSysLog <= 0)
 
  765                Warning(
"TProofServ", 
"request for syslog logging ineffective!");
 
  770    if (fgLogToSysLog > 0) {
 
  771       fgSysLogService = (IsMaster()) ? 
"proofm" : 
"proofw";
 
  772       if (fOrdinal != 
"-1") fgSysLogService += TString::Format(
"-%s", fOrdinal.Data());
 
  773       gSystem->Openlog(fgSysLogService, kLogPid | kLogCons, kLogLocal5);
 
  779    Bool_t enableSchemaEvolution = gEnv->GetValue(
"Proof.SchemaEvolution",1);
 
  780    if (enableSchemaEvolution) {
 
  781       TMessage::EnableSchemaEvolutionForAll();
 
  783       Info(
"TProofServ", 
"automatic schema evolution in TMessage explicitly disabled");
 
  792 Int_t TProofServ::CreateServer()
 
  795    TString opensock = gSystem->Getenv(
"ROOTOPENSOCK");
 
  796    if (opensock.Length() <= 0)
 
  797       opensock = gEnv->GetValue(
"ProofServ.OpenSock", 
"-1");
 
  798    Int_t sock = opensock.Atoi();
 
  800       Fatal(
"CreateServer", 
"Invalid socket descriptor number (%d)", sock);
 
  803    fSocket = 
new TSocket(sock);
 
  806    fSocket->SetCompressionSettings(fCompressMsg);
 
  811       if (gEnv->GetValue(
"Proof.GdbHook",0) == 1) {
 
  812          while (gProofServDebug)
 
  817       if (gEnv->GetValue(
"Proof.GdbHook",0) == 2) {
 
  818          while (gProofServDebug)
 
  823    if (gProofDebugLevel > 0)
 
  824       Info(
"CreateServer", 
"Service %s ConfDir %s IsMaster %d\n",
 
  825            GetService(), GetConfDir(), (Int_t)fMasterServ);
 
  838    TString pfx = (IsMaster() ? 
"Mst-" : 
"Wrk-");
 
  840    TProofServLogHandler::SetDefaultPrefix(pfx);
 
  846       if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
 
  854       if ((fLogFileDes = fileno(fLogFile)) < 0) {
 
  864       if (CatMotd() == -1) {
 
  873    ProcessLine(
"#include <iostream>", kTRUE);
 
  874    ProcessLine(
"#include <string>",kTRUE); 
 
  882    logon = gEnv->GetValue(
"Proof.Load", (
char *)0);
 
  884       char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
 
  886          ProcessLine(TString::Format(
".L %s", logon), kTRUE);
 
  891    logon = gEnv->GetValue(
"Proof.Logon", (
char *)0);
 
  892    if (logon && !NoLogOpt()) {
 
  893       char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
 
  900    gInterpreter->SaveContext();
 
  901    gInterpreter->SaveGlobalsContext();
 
  904    gSystem->AddSignalHandler(
new TProofServTerminationHandler(
this));
 
  905    gSystem->AddSignalHandler(
new TProofServInterruptHandler(
this));
 
  906    fInputHandler = 
new TProofServInputHandler(
this, sock);
 
  907    gSystem->AddFileHandler(fInputHandler);
 
  911       TString master = 
"proof://__master__";
 
  912       TInetAddress a = gSystem->GetSockName(sock);
 
  915          master += a.GetPort();
 
  919       TPluginManager *pm = gROOT->GetPluginManager();
 
  921          Error(
"CreateServer", 
"no plugin manager found");
 
  928       TPluginHandler *h = pm->FindHandler(
"TProof", fConfFile);
 
  930          Error(
"CreateServer", 
"no plugin found for TProof with a" 
  931                              " config file of '%s'", fConfFile.Data());
 
  938       if (h->LoadPlugin() == -1) {
 
  939          Error(
"CreateServer", 
"plugin for TProof could not be loaded");
 
  946       fProof = 
reinterpret_cast<TProof*
>(h->ExecPlugin(5, master.Data(),
 
  950       if (!fProof || !fProof->IsValid()) {
 
  951          Error(
"CreateServer", 
"plugin for TProof could not be executed");
 
  958       fEndMaster = fProof->IsEndMaster();
 
  964    if (!fShutdownTimer) {
 
  966       fShutdownTimer = 
new TShutdownTimer(
this, 300000);
 
  967       fShutdownTimer->Start(-1, kFALSE);
 
  972    if (fProtocol <= 17) {
 
  974       msg.Form(
"Warning: client version is too old: automatic schema evolution is ineffective.\n" 
  975                "         This may generate compatibility problems between streamed objects.\n" 
  976                "         The advise is to move to ROOT >= 5.21/02 .");
 
  977       SendAsynMessage(msg.Data());
 
  981    if (IsMaster() && !fIdleTOTimer) {
 
  983       Int_t idle_to = gEnv->GetValue(
"ProofServ.IdleTimeout", -1);
 
  985          fIdleTOTimer = 
new TIdleTOTimer(
this, idle_to * 1000);
 
  986          fIdleTOTimer->Start(-1, kTRUE);
 
  987          if (gProofDebugLevel > 0)
 
  988             Info(
"CreateServer", 
" idle timer started (%d secs)", idle_to);
 
  989       } 
else if (gProofDebugLevel > 0) {
 
  990          Info(
"CreateServer", 
" idle timer not started (no idle timeout requested)");
 
 1002 TProofServ::~TProofServ()
 
 1004    SafeDelete(fWaitingQueries);
 
 1005    SafeDelete(fSocket);
 
 1006    SafeDelete(fPackMgr);
 
 1007    SafeDelete(fCacheLock);
 
 1008    SafeDelete(fQueryLock);
 
 1009    SafeDelete(fDataSetManager);
 
 1010    SafeDelete(fDataSetStgRepo);
 
 1021 Int_t TProofServ::CatMotd()
 
 1025    Bool_t  show = kFALSE;
 
 1028    TString motdname(GetConfDir());
 
 1031    if (gSystem->Getenv(
"PROOFNOPROOF")) {
 
 1032       motdname = gSystem->Getenv(
"PROOFNOPROOF");
 
 1034       motdname += 
"/etc/proof/noproof";
 
 1036    if ((motd = fopen(motdname, 
"r"))) {
 
 1039       while ((c = getc(motd)) != EOF)
 
 1048    lastname = TString(GetWorkDir()) + 
"/.prooflast";
 
 1049    char *last = gSystem->ExpandPathName(lastname.Data());
 
 1051    Long_t id, flags, modtime, lasttime = 0;
 
 1052    if (gSystem->GetPathInfo(last, &
id, &size, &flags, &lasttime) == 1)
 
 1056    if (time(0) - lasttime > (time_t)86400)
 
 1061    if (gSystem->Getenv(
"PROOFMOTD")) {
 
 1062       motdname = gSystem->Getenv(
"PROOFMOTD");
 
 1064       motdname = GetConfDir();
 
 1065       motdname += 
"/etc/proof/motd";
 
 1067    if (gSystem->GetPathInfo(motdname, &
id, &size, &flags, &modtime) == 0) {
 
 1068       if (modtime > lasttime || show) {
 
 1069          if ((motd = fopen(motdname, 
"r"))) {
 
 1072             while ((c = getc(motd)) != EOF)
 
 1081       gSystem->Unlink(last);
 
 1082    Int_t fd = creat(last, 0600);
 
 1083    if (fd >= 0) close(fd);
 
 1094 TObject *TProofServ::Get(
const char *namecycle)
 
 1096    if (fSocket->Send(namecycle, kPROOF_GETOBJECT) < 0) {
 
 1097       Error(
"Get", 
"problems sending request");
 
 1098       return (TObject *)0;
 
 1103    Bool_t notdone = kTRUE;
 
 1106       if (fSocket->Recv(mess) < 0)
 
 1108       Int_t what = mess->What();
 
 1109       if (what == kMESS_OBJECT) {
 
 1110          idcur = mess->ReadObject(mess->GetClass());
 
 1113          Int_t xrc = HandleSocketInput(mess, kFALSE);
 
 1115             Error(
"Get", 
"command %d cannot be executed while processing", what);
 
 1116          } 
else if (xrc == -2) {
 
 1117             Error(
"Get", 
"unknown command %d ! Protocol error?", what);
 
 1129 void TProofServ::RestartComputeTime()
 
 1133       TProofProgressStatus *status = fPlayer->GetProgressStatus();
 
 1134       if (status) status->SetLearnTime(fCompute.RealTime());
 
 1135       Info(
"RestartComputeTime", 
"compute time restarted after %f secs (%d entries)",
 
 1136                                  fCompute.RealTime(), fPlayer->GetLearnEntries());
 
 1138    fCompute.Start(kFALSE);
 
 1144 TDSetElement *TProofServ::GetNextPacket(Long64_t totalEntries)
 
 1146    Long64_t bytesRead = 0;
 
 1148    if (gPerfStats) bytesRead = gPerfStats->GetBytesRead();
 
 1150    if (fCompute.Counter() > 0)
 
 1153    TMessage req(kPROOF_GETPACKET);
 
 1154    Double_t cputime = fCompute.CpuTime();
 
 1155    Double_t realtime = fCompute.RealTime();
 
 1157    if (fProtocol > 18) {
 
 1158       req << fLatency.RealTime();
 
 1159       TProofProgressStatus *status = 0;
 
 1161          fPlayer->UpdateProgressInfo();
 
 1162          status = fPlayer->GetProgressStatus();
 
 1164          Error(
"GetNextPacket", 
"no progress status object");
 
 1169       if (status->GetEntries() > 0) {
 
 1170          PDB(kLoop, 2) status->Print(GetOrdinal());
 
 1171          status->IncProcTime(realtime);
 
 1172          status->IncCPUTime(cputime);
 
 1175       if (totalEntries < 0) status->SetBit(TProofProgressStatus::kFileNotOpen);
 
 1179       Long64_t cacheSize = (fPlayer) ? fPlayer->GetCacheSize() : -1;
 
 1180       Int_t learnent = (fPlayer) ? fPlayer->GetLearnEntries() : -1;
 
 1181       req << cacheSize << learnent;
 
 1186       req << totalEntries;
 
 1189       if (fProtocol > 34) req << fSaveOutput.RealTime();
 
 1192          PDB(kLoop, 2) status->Print();
 
 1193          Info("GetNextPacket","cacheSize: %lld, learnent: %d", cacheSize, learnent);
 
 1196       status->ResetBit(TProofProgressStatus::kFileNotOpen);
 
 1197       status->ResetBit(TProofProgressStatus::kFileCorrupted);
 
 1200       req << fLatency.RealTime() << realtime << cputime
 
 1201           << bytesRead << totalEntries;
 
 1203          req << fPlayer->GetEventsProcessed();
 
 1207    Int_t rc = fSocket->Send(req);
 
 1209       Error(
"GetNextPacket",
"Send() failed, returned %d", rc);
 
 1215       fSaveOutput.Start();
 
 1216       if (fPlayer->SavePartialResults(kFALSE) < 0)
 
 1217          Warning(
"GetNextPacket", 
"problems saving partial results");
 
 1221    TDSetElement  *e = 0;
 
 1222    Bool_t notdone = kTRUE;
 
 1226       if ((rc = fSocket->Recv(mess)) <= 0) {
 
 1228          Error(
"GetNextPacket",
"Recv() failed, returned %d", rc);
 
 1233       TString file, dir, obj;
 
 1235       Int_t what = mess->What();
 
 1238          case kPROOF_GETPACKET:
 
 1244                PDB(kLoop, 2) Info("GetNextPacket", "'%s' '%s' '%s' %lld %lld",
 
 1245                                  e->GetFileName(), e->GetDirectory(),
 
 1246                                  e->GetObjName(), e->GetFirst(),e->GetNum());
 
 1248                PDB(kLoop, 2) Info("GetNextPacket", "Done");
 
 1253          case kPROOF_STOPPROCESS:
 
 1258             PDB(kLoop, 2) Info("GetNextPacket:kPROOF_STOPPROCESS","received");
 
 1262             xrc = HandleSocketInput(mess, kFALSE);
 
 1264                Error(
"GetNextPacket", 
"command %d cannot be executed while processing", what);
 
 1265             } 
else if (xrc == -2) {
 
 1266                Error(
"GetNextPacket", 
"unknown command %d ! Protocol error?", what);
 
 1283 void TProofServ::GetOptions(Int_t *argc, 
char **argv)
 
 1285    Bool_t xtest = (argc && *argc > 3 && !strcmp(argv[3], 
"test")) ? kTRUE : kFALSE;
 
 1288    if (xtest && !(isatty(0) == 0 || isatty(1) == 0)) {
 
 1289       Printf(
"proofserv: command line testing: OK");
 
 1293    if (!argc || (argc && *argc <= 1)) {
 
 1294       Fatal(
"GetOptions", 
"Must be started from proofd with arguments");
 
 1298    if (!strcmp(argv[1], 
"proofserv")) {
 
 1299       fMasterServ = kTRUE;
 
 1301    } 
else if (!strcmp(argv[1], 
"proofslave")) {
 
 1302       fMasterServ = kFALSE;
 
 1303       fEndMaster = kFALSE;
 
 1305       Fatal(
"GetOptions", 
"Must be started as 'proofserv' or 'proofslave'");
 
 1312    if (!(gSystem->Getenv(
"ROOTCONFDIR"))) {
 
 1313       Fatal(
"GetOptions", 
"ROOTCONFDIR shell variable not set");
 
 1316    fConfDir = gSystem->Getenv(
"ROOTCONFDIR");
 
 1322 void TProofServ::HandleSocketInput()
 
 1325    TIdleTOTimerGuard itg(fIdleTOTimer);
 
 1327    Bool_t all = (fgRecursive > 0) ? kFALSE : kTRUE;
 
 1341       if (fSocket->Recv(mess) <= 0 || !mess) {
 
 1344          Error(
"HandleSocketInput", 
"retrieving message from input socket");
 
 1348       Int_t what = mess->What();
 
 1350          Info("HandleSocketInput", "got type %d from '%s'", what, fSocket->GetTitle());
 
 1354       if (fProof) fProof->SetActive();
 
 1356       Bool_t doit = kTRUE;
 
 1361          rc = HandleSocketInput(mess, all);
 
 1365                emsg.Form(
"HandleSocketInput: command %d cannot be executed while processing", what);
 
 1366             } 
else if (rc == -3) {
 
 1367                emsg.Form(
"HandleSocketInput: message %d undefined! Protocol error?", what);
 
 1369                emsg.Form(
"HandleSocketInput: unknown command %d! Protocol error?", what);
 
 1371             SendAsynMessage(emsg.Data());
 
 1372          } 
else if (rc == 2) {
 
 1374             fQueuedMsg->Add(mess);
 
 1376                Info("HandleSocketInput", "message of type %d enqueued; sz: %d",
 
 1377                                           what, fQueuedMsg->GetSize());
 
 1383          if (fgRecursive == 1 && fQueuedMsg->GetSize() > 0) {
 
 1386                Info("HandleSocketInput", "processing enqueued message of type %d; left: %d",
 
 1387                                           what, fQueuedMsg->GetSize());
 
 1390             mess = (TMessage *) fQueuedMsg->First();
 
 1391             if (mess) fQueuedMsg->Remove(mess);
 
 1396    } catch (std::bad_alloc &) {
 
 1398       exmsg.Form(
"caught exception 'bad_alloc' (memory leak?) %s %lld",
 
 1399                  fgLastMsg.Data(), fgLastEntry);
 
 1400    } 
catch (std::exception &exc) {
 
 1402       exmsg.Form(
"caught standard exception '%s' %s %lld",
 
 1403                  exc.what(), fgLastMsg.Data(), fgLastEntry);
 
 1406       exmsg.Form(
"caught exception throwing %d %s %lld",
 
 1407                  i, fgLastMsg.Data(), fgLastEntry);
 
 1408    } 
catch (
const char *str) {
 
 1410       exmsg.Form(
"caught exception throwing '%s' %s %lld",
 
 1411                  str, fgLastMsg.Data(), fgLastEntry);
 
 1414       exmsg.Form(
"caught exception <unknown> %s %lld",
 
 1415                  fgLastMsg.Data(), fgLastEntry);
 
 1419    if (!exmsg.IsNull()) {
 
 1421       Error(
"HandleSocketInput", 
"%s", exmsg.Data());
 
 1423       SendAsynMessage(TString::Format(
"%s: %s", GetOrdinal(), exmsg.Data()));
 
 1430    if (TestBit(TProofServ::kHighMemory)) {
 
 1432       exmsg.Form(
"high-memory footprint detected during Process(...) - terminating");
 
 1433       Error(
"HandleSocketInput", 
"%s", exmsg.Data());
 
 1435       SendAsynMessage(TString::Format(
"%s: %s", GetOrdinal(), exmsg.Data()));
 
 1445       Bool_t masterOnly = gEnv->GetValue(
"Proof.MasterOnly", kFALSE);
 
 1446       Bool_t dynamicStartup = gEnv->GetValue(
"Proof.DynamicStartup", kFALSE);
 
 1447       Int_t ngwrks = fProof->GetListOfActiveSlaves()->GetSize() + fProof->GetListOfInactiveSlaves()->GetSize();
 
 1448       if (rc == 0 && ngwrks == 0 && !masterOnly && !dynamicStartup) {
 
 1449          SendAsynMessage(
" *** No workers left: cannot continue! Terminating ... *** ");
 
 1452       fProof->SetActive(kFALSE);
 
 1454       fProof->SetRunStatus(TProof::kRunning);
 
 // Process one message and dispatch on its type (mess->What()).
 //
 // mess: the message to handle; a null pointer makes the function return -3
 //       immediately.
 // all:  consulted together with 'fProtocol <= 19' in the kPROOF_CHECKFILE and
 //       kPROOF_SENDFILE branches (and in one more branch at listing line 1858);
 //       what that guarded path does is not visible in this excerpt.
 //
 // Only the 'return -3' exit is visible here; 'rc' and 'lirc' are declared for
 // other results that are presumably set on lines not shown - confirm against
 // the full file.
 // NOTE(review): this listing elides many lines (case labels, braces, breaks,
 // declarations such as 'str' and 'slb'); comments below describe only the
 // statements that are actually visible.
 1470 Int_t TProofServ::HandleSocketInput(TMessage *mess, Bool_t all)
 
 // Function-static stopwatch: its RealTime()/CpuTime() readings are added to
 // fRealTime/fCpuTime and used for the syslog record at the end.
 1472    static TStopwatch timer;
 
 1474    Bool_t aborted = kFALSE;
 
 1476    if (!mess) 
return -3;
 
 1478    Int_t what = mess->What();
 
 1480       Info("HandleSocketInput", "processing message type %d from '%s'",
 
 1481                                 what, fSocket->GetTitle());
 
 1485    Int_t rc = 0, lirc = 0;
 
 // pslb points at the syslog buffer 'slb' only when syslog logging is enabled
 // (fgLogToSysLog > 0); handlers receive a null pointer otherwise.
 1487    TString *pslb = (fgLogToSysLog > 0) ? &slb : (TString *)0;
 
 // Command-string branch (tagged "kMESS_CINT" in the Info call below): the
 // string is read from the message and, when running in parallel without
 // dynamic startup, forwarded through fProof->SendCommand().
 1493             mess->ReadString(str, 
sizeof(str));
 
 1497             Bool_t hasfn = TProof::GetFileInCmd(str, fn);
 
 1499             if (IsParallel() && fProof && !fProof->UseDynamicStartup()) {
 
 1500                fProof->SendCommand(str);
 
 1503                   Info("HandleSocketInput:kMESS_CINT", "processing: %s...", str);
 
 // The working directory is switched to the cache directory and restored
 // afterwards; the cache lock is released once the original directory is back.
 1507                   ocwd = gSystem->WorkingDirectory();
 
 1508                   gSystem->ChangeDirectory(fCacheDir.Data());
 
 1512                   gSystem->ChangeDirectory(ocwd);
 
 1513                   fCacheLock->Unlock();                 
 
 // Keep the command string for the syslog record when syslog is active.
 1522          if (pslb) slb = str;
 
 1527             mess->ReadString(str, 
sizeof(str));
 
 1535             mess->ReadObject(mess->GetClass());
 
 // kPROOF_GROUPVIEW: the payload string carries two integers stored in
 // fGroupId and fGroupSize. The sscanf result is not checked on the visible line.
 1541       case kPROOF_GROUPVIEW:
 
 1543             mess->ReadString(str, 
sizeof(str));
 
 1545             sscanf(str, 
"%d %d", &fGroupId, &fGroupSize);
 
 // kPROOF_LOGLEVEL: the payload string carries the log level and a debug mask;
 // both are applied to the global debug settings and passed on via
 // fProof->SetLogLevel().
 1551       case kPROOF_LOGLEVEL:
 
 1553             mess->ReadString(str, 
sizeof(str));
 
 1554             sscanf(str, 
"%d %u", &fLogLevel, &mask);
 
 1555             Bool_t levelchanged = (fLogLevel != gProofDebugLevel) ? kTRUE : kFALSE;
 
 1556             gProofDebugLevel = fLogLevel;
 
 1557             gProofDebugMask  = (TProofDebug::EProofDebugMask) mask;
 
 1559                Info(
"HandleSocketInput:kPROOF_LOGLEVEL", 
"debug level set to %d (mask: 0x%x)",
 
 1560                     gProofDebugLevel, gProofDebugMask);
 
 1562                fProof->SetLogLevel(fLogLevel, mask);
 
 1574          mess->ReadString(str, 
sizeof(str));
 
 1582             mess->ReadString(str, 
sizeof(str));
 
 // Obsolete kPROOF_STATUS request: a warning is logged and the value of
 // fProof->GetParallel() is still sent back.
 1590          Warning(
"HandleSocketInput:kPROOF_STATUS",
 
 1591                "kPROOF_STATUS message is obsolete");
 
 1592          if (fSocket->Send(fProof->GetParallel(), kPROOF_STATUS) < 0)
 
 1593             Warning(
"HandleSocketInput:kPROOF_STATUS", 
"problem sending of request");
 
 1596       case kPROOF_GETSTATS:
 
 1600       case kPROOF_GETPARALLEL:
 
 // kPROOF_STOP branch (per the Info tags): either a specific worker, named by
 // 'ord', is terminated through fProof, or this server got the request itself.
 1610                   Info("HandleSocketInput:kPROOF_STOP", "request for worker %s", ord.Data());
 
 1611                if (fProof) fProof->TerminateWorker(ord);
 
 1614                   Info("HandleSocketInput:kPROOF_STOP", "got request to terminate");
 
 // kPROOF_STOPPROCESS: the stop request ('aborted' flag, 'timeout') is passed
 // to fProof->StopProcess() or fPlayer->StopProcess(); the conditions choosing
 // between them are not visible in this excerpt.
 1622       case kPROOF_STOPPROCESS:
 
 1627             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_STOPPROCESS","enter");
 
 1629             Long_t timeout = -1;
 
 1634                Info("HandleSocketInput:kPROOF_STOPPROCESS",
 
 1635                     "recursive mode: enter %d, %ld", aborted, timeout);
 
 1638                fProof->StopProcess(aborted, timeout);
 
 1642                   fPlayer->StopProcess(aborted, timeout);
 
 // kPROOF_PROCESS: a TProofServLogHandlerGuard is created for this scope and
 // the request is delegated to HandleProcess().
 1646       case kPROOF_PROCESS:
 
 1648             TProofServLogHandlerGuard hg(fLogFile, fSocket, 
"", fRealTimeLog);
 
 1649             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_PROCESS","enter");
 
 1650             HandleProcess(mess, pslb);
 
 // kPROOF_SENDOUTPUT: the player's output list is sent over fSocket and a
 // kPROOF_SETIDLE message follows.
 1657       case kPROOF_SENDOUTPUT:
 
 1659             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_SENDOUTPUT",
 
 1660                                  "worker was asked to send output to master");
 
 1662             if (SendResults(fSocket, fPlayer->GetOutputList()) != 0) {
 
 1663                Error(
"HandleSocketInput:kPROOF_SENDOUTPUT", 
"problems sending output list");
 
 1667             fSocket->Send(kPROOF_SETIDLE);
 
 // Query-management requests are delegated to their dedicated handlers.
 1674       case kPROOF_QUERYLIST:
 
 1676             HandleQueryList(mess);
 
 1684             HandleRemove(mess, pslb);
 
 1690       case kPROOF_RETRIEVE:
 
 1692             HandleRetrieve(mess, pslb);
 
 1698       case kPROOF_ARCHIVE:
 
 1700             HandleArchive(mess, pslb);
 
 1706       case kPROOF_MAXQUERIES:
 
 1708                Info("HandleSocketInput:kPROOF_MAXQUERIES", "Enter");
 
 1709             TMessage m(kPROOF_MAXQUERIES);
 
 // kPROOF_CLEANUPSESSION: fQMgr->CleanupSession(stag) is attempted and the
 // outcome is printed.
 1717       case kPROOF_CLEANUPSESSION:
 
 1720                Info("HandleSocketInput:kPROOF_CLEANUPSESSION", "Enter");
 
 1723             if (fQMgr && fQMgr->CleanupSession(stag) == 0) {
 
 1724                Printf(
"Session %s cleaned up", stag.Data());
 
 1726                Printf(
"Could not cleanup session %s", stag.Data());
 
 // kPROOF_GETENTRIES: reads (isTree, filename, dir, objname) from the message,
 // asks TDSet::GetEntries() for the count and answers with 'entries' and
 // 'objname'. 'entries' starts at -1 and 'objname' at "undef".
 1735       case kPROOF_GETENTRIES:
 
 1736          {  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Enter");
 
 1740             TString        objname("undef");
 
 1741             Long64_t       entries = -1;
 
 1744                (*mess) >> isTree >> filename >> dir >> objname;
 
 1745                PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
 
 1746                                     "Report size of 
object %s (%s) in dir %s in file %s",
 
 1747                                     objname.Data(), isTree ? "T" : "O",
 
 1748                                     dir.Data(), filename.Data());
 
 1749                entries = TDSet::GetEntries(isTree, filename, dir, objname);
 
 1750                PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
 
 1751                                     "Found %lld %s", entries, isTree ? "entries" : "objects");
 
 1755             TMessage answ(kPROOF_GETENTRIES);
 
 1756             answ << entries << objname;
 
 1758             fSocket->Send(answ);
 
 1759             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Done");
 
 1763       case kPROOF_CHECKFILE:
 
 1764          if (!all && fProtocol <= 19) {
 
 1769             HandleCheckFile(mess, pslb);
 
 // kPROOF_SENDFILE: the payload string is parsed into name / bin / size and,
 // for fProtocol > 5, a forward flag 'fw'. A "cache:" prefix on 'fnam' is
 // replaced by the cache directory and disables the later copy to the cache.
 1774       case kPROOF_SENDFILE:
 
 1775          if (!all && fProtocol <= 19) {
 
 1779             mess->ReadString(str, 
sizeof(str));
 
 1783             if (fProtocol > 5) {
 
 1784                sscanf(str, 
"%1023s %d %ld %d", name, &bin, &size, &fw);
 
 1786                sscanf(str, 
"%1023s %d %ld", name, &bin, &size);
 
 1789             Bool_t copytocache = kTRUE;
 
 1790             if (fnam.BeginsWith(
"cache:")) {
 
 1791                fnam.ReplaceAll(
"cache:", TString::Format(
"%s/", fCacheDir.Data()));
 
 1792                copytocache = kFALSE;
 
 1797                rfrc = ReceiveFile(fnam, bin ? kTRUE : kFALSE, size);
 
 1800                if (!fnam.BeginsWith(fCacheDir.Data())) {
 
 1801                   fnam.Insert(0, TString::Format(
"%s/", fCacheDir.Data()));
 
 // NOTE(review): the copy is done by building a shell command from fnam and
 // fCacheDir; quoting of these paths is not visible here - confirm they cannot
 // contain shell metacharacters.
 1806                if (copytocache && size > 0 && !fPackMgr->IsInDir(name))
 
 1807                   gSystem->Exec(TString::Format(
"%s %s %s", kCP, fnam.Data(), fCacheDir.Data()));
 
 // On a master with fw == 1 the file is forwarded through fProof->SendFile().
 1808                if (IsMaster() && fw == 1) {
 
 1809                   Int_t opt = TProof::kForward | TProof::kCp;
 
 1811                      opt |= TProof::kBinary;
 
 1813                      Info("HandleSocketInput","forwarding file: %s", fnam.Data());
 
 1814                   if (fProof->SendFile(fnam, opt, (copytocache ? "cache" : "")) < 0) {
 
 1815                      Error(
"HandleSocketInput", 
"forwarding file: %s", fnam.Data());
 
 // Peers with protocol > 19 get a kPROOF_SENDFILE message back.
 1818                if (fProtocol > 19) fSocket->Send(kPROOF_SENDFILE);
 
 // kPROOF_LOGFILE: reads a byte range (start, end) and calls SendLogFile()
 // with status 0.
 1826       case kPROOF_LOGFILE:
 
 1829             (*mess) >> start >> end;
 
 1831                Info("HandleSocketInput:kPROOF_LOGFILE",
 
 1832                     "Logfile request - byte range: %d - %d", start, end);
 
 1835             SendLogFile(0, start, end);
 
 // kPROOF_PARALLEL: 'mess->BufferSize() > mess->Length()' is tested before an
 // elided statement (presumably reading an optional 'random' flag - confirm);
 // the values are then passed to fProof->SetParallel().
 1839       case kPROOF_PARALLEL:
 
 1843                Bool_t random = kFALSE;
 
 1845                if ((mess->BufferSize() > mess->Length()))
 
 1847                if (fProof) fProof->SetParallel(nodes, random);
 
 // Cache request (tagged "kPROOF_CACHE" in the Info call): delegated to
 // HandleCache() under a log handler guard.
 1858          if (!all && fProtocol <= 19) {
 
 1862             TProofServLogHandlerGuard hg(fLogFile, fSocket, 
"", fRealTimeLog);
 
 1863             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_CACHE","enter");
 
 1864             Int_t hcrc = HandleCache(mess, pslb);
 
 1870       case kPROOF_WORKERLISTS:
 
 1874                   wlrc = HandleWorkerLists(mess);
 
 1876                   Warning(
"HandleSocketInput:kPROOF_WORKERLISTS",
 
 1877                         "Action meaning-less on worker nodes: protocol error?");
 
 // kPROOF_GETSLAVEINFO: with dynamic startup a worker list is obtained via
 // GetWorkers() and added through fProof->AddWorkers() before the slave-info
 // list is sent; the workers are removed again afterwards. A second visible
 // path builds a one-entry list describing this node.
 1886       case kPROOF_GETSLAVEINFO:
 
 1888             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Enter");
 
 1893                if (fProof->UseDynamicStartup()) {
 
 // NOTE(review): no release of 'workerList' is visible in this excerpt -
 // confirm who owns it.
 1897                   TList* workerList = 
new TList();
 
 1898                   EQueryAction retVal = GetWorkers(workerList, pc);
 
 1899                   if (retVal != TProofServ::kQueryStop && retVal != TProofServ::kQueryEnqueued) {
 
 1900                      Int_t ret = fProof->AddWorkers(workerList);
 
 1902                         Error(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
 
 1903                               "adding a list of worker nodes returned: %d", ret);
 
 1906                      Error(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
 
 1907                            "getting list of worker nodes returned: %d", retVal);
 
 1912                   TList *info = fProof->GetListOfSlaveInfos();
 
 1913                   TMessage answ(kPROOF_GETSLAVEINFO);
 
 1915                   fSocket->Send(answ);
 
 1917                   if (IsMaster() && fProof->UseDynamicStartup()) fProof->RemoveWorkers(0);
 
 1920                TMessage answ(kPROOF_GETSLAVEINFO);
 
 1921                TList *info = 
new TList;
 
 1922                TSlaveInfo *wi = 
new TSlaveInfo(GetOrdinal(), TUrl(gSystem->HostName()).GetHostFQDN(), 0, 
"", GetDataDir());
 
 1924                gSystem->GetSysInfo(&si);
 
 1927                answ << (TList *)info;
 
 1928                fSocket->Send(answ);
 
 // The list is made owner of its entries only after it has been sent.
 1929                info->SetOwner(kTRUE);
 
 1933             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Done");
 
 1935             TMessage answ(kPROOF_GETSLAVEINFO);
 
 1937             fSocket->Send(answ);
 
 // kPROOF_GETTREEHEADER: a temporary "slave" player is created to serve the
 // request; a second visible path answers with "Failed" and a null object.
 1942       case kPROOF_GETTREEHEADER:
 
 1944             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Enter");
 
 1946             TVirtualProofPlayer *p = TVirtualProofPlayer::Create("slave", 0, fSocket);
 
 1948                p->HandleGetTreeHeader(mess);
 
 1951                Error(
"HandleSocketInput:kPROOF_GETTREEHEADER", 
"could not create TProofPlayer instance!");
 
 1954             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Done");
 
 1956             TMessage answ(kPROOF_GETTREEHEADER);
 
 1957             answ << TString(
"Failed") << (TObject *)0;
 
 1958             fSocket->Send(answ);
 
 // kPROOF_GETOUTPUTLIST: the output list comes from fProof, or a fresh list is
 // filled with TNamed placeholders carrying the names of the player's output
 // objects; the list is then sent back.
 1963       case kPROOF_GETOUTPUTLIST:
 
 1964          {  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Enter");
 
 1965             TList* outputList = 0;
 
 1967                outputList = fProof->GetOutputList();
 
 1969                   outputList = 
new TList();
 
 1971                outputList = 
new TList();
 
 1972                if (fProof->GetPlayer()) {
 
 1973                   TList *olist = fProof->GetPlayer()->GetOutputList();
 
 1976                   while ( (o = next()) ) {
 
 1977                      outputList->Add(
new TNamed(o->GetName(), 
""));
 
 1981             outputList->SetOwner();
 
 1982             TMessage answ(kPROOF_GETOUTPUTLIST);
 
 1984             fSocket->Send(answ);
 
 1986             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Done");
 
 // kPROOF_VALIDATE_DSET: a master validates through fProof, any other node
 // validates the data set directly.
 1990       case kPROOF_VALIDATE_DSET:
 
 1993                Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Enter");
 
 1998             if (IsMaster()) fProof->ValidateDSet(dset);
 
 1999             else dset->Validate();
 
 2001             TMessage answ(kPROOF_VALIDATE_DSET);
 
 2003             fSocket->Send(answ);
 
 2006                Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Done");
 
 // kPROOF_DATA_READY: answers with (dataready, totalbytes, bytesready) taken
 // from fProof->IsDataReady(); the error paths answer (kFALSE, 0, 0).
 2014       case kPROOF_DATA_READY:
 
 2016             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Enter");
 
 2017             TMessage answ(kPROOF_DATA_READY);
 
 2019                Long64_t totalbytes = 0, bytesready = 0;
 
 2020                Bool_t dataready = fProof->IsDataReady(totalbytes, bytesready);
 
 2021                answ << dataready << totalbytes << bytesready;
 
 2023                Error(
"HandleSocketInput:kPROOF_DATA_READY",
 
 2024                      "This message should not be sent to slaves");
 
 2025                answ << kFALSE << Long64_t(0) << Long64_t(0);
 
 2027             fSocket->Send(answ);
 
 2028             PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Done");
 
 2030             TMessage answ(kPROOF_DATA_READY);
 
 2031             answ << kFALSE << Long64_t(0) << Long64_t(0);
 
 2032             fSocket->Send(answ);
 
 // kPROOF_DATASETS: handled only for fProtocol > 16; older peers get an error
 // logged.
 2039       case kPROOF_DATASETS:
 
 2041             if (fProtocol > 16) {
 
 2042                dsrc = HandleDataSets(mess, pslb);
 
 2044                Error(
"HandleSocketInput", 
"old client: no or incompatible dataset support");
 
 2050       case kPROOF_SUBMERGER:
 
 2051          {  HandleSubmerger(mess);
 
 // kPROOF_LIB_INC_PATH: the log file is sent only when the handler returned a
 // positive value.
 2055       case kPROOF_LIB_INC_PATH:
 
 2057             lirc = HandleLibIncPath(mess);
 
 2062          if (lirc > 0) SendLogFile();
 
 2065       case kPROOF_REALTIMELOG:
 
 2069                Info("HandleSocketInput:kPROOF_REALTIMELOG",
 
 2070                     "setting real-time logging %s", (on ? "ON" : "OFF"));
 
 2074                fProof->SetRealTimeLog(on);
 
 // kPROOF_STARTPROCESS: requires at least one waiting query; workers are
 // obtained with GetWorkers() (into a new list when dynamic startup is used)
 // and added to fProof when the result is kQueryOK.
 2088       case kPROOF_STARTPROCESS:
 
 2092             if (WaitingQueries() == 0) {
 
 2093                Error(
"HandleSocketInput", 
"no queries enqueued");
 
 2099             TList *workerList = (fProof->UseDynamicStartup()) ? 
new TList : (TList *)0;
 
 2101             EQueryAction retVal = GetWorkers(workerList, pc, kTRUE);
 
 2103             if (retVal == TProofServ::kQueryOK) {
 
 2105                if (workerList && (ret = fProof->AddWorkers(workerList)) < 0) {
 
 2106                   Error(
"HandleSocketInput", 
"adding a list of worker nodes returned: %d", ret);
 
 2112                   TMessage m(kPROOF_SETIDLE);
 
 2113                   Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
 
 2118                if (retVal == TProofServ::kQueryStop) {
 
 2119                   Error(
"HandleSocketInput", 
"error getting list of worker nodes");
 
 // NOTE(review): as shown, the "re-queued" warning is issued when retVal is
 // NOT kQueryEnqueued; the neighbouring lines are elided, so confirm the
 // comparison against the full file.
 2120                } 
else if (retVal != TProofServ::kQueryEnqueued) {
 
 2121                   Warning(
"HandleSocketInput", 
"query was re-queued!");
 
 2123                   Error(
"HandleSocketInput", 
"unexpected answer: %d", retVal);
 
 // kPROOF_GOASYNC: when not idle and a player exists, a kPROOF_QUERYSUBMITTED
 // message is built from the current query; otherwise the request is ignored
 // with a message.
 // NOTE(review): no null check of 'pq' is visible before pq->GetSeqNum().
 2131       case kPROOF_GOASYNC:
 
 2135             if (!IsIdle() && fPlayer) {
 
 2137                TProofQueryResult *pq = (TProofQueryResult *) fPlayer->GetCurrentQuery();
 
 2138                TMessage m(kPROOF_QUERYSUBMITTED);
 
 2139                m << pq->GetSeqNum() << kFALSE;
 
 2143                SendAsynMessage(
"Processing request to go asynchronous:" 
 2144                                " idle or undefined player - ignoring");
 
 // Echo-style branch (per the "Echo response" strings): an object is read from
 // the message. A TObjString is echoed back as text; for anything else, output
 // is redirected to a temporary file, which is then read back line by line
 // into the reply and unlinked.
 2155             TObject *obj = mess->ReadObject(0x0);  
 
 2160                if (IsParallel() && fProof && !fProof->UseDynamicStartup()) {
 
 2165             TMessage rmsg(kPROOF_MESSAGE);
 
 2168             if (obj->InheritsFrom(TObjString::Class())) {
 
 2170                smsg.Form(
"Echo response from %s:%s: %s",
 
 2171                   gSystem->HostName(), GetOrdinal(),
 
 2172                   ((TObjString *)obj)->String().Data());
 
 2178                TString tmpfn = 
"echo-out-";
 
 2179                FILE *tf = gSystem->TempFileName(tmpfn, fDataDir);
 
 2180                if (!tf || (gSystem->RedirectOutput(tmpfn.Data()) == -1)) {
 
 2181                   Error(
"HandleSocketInput", 
"Can't redirect output");
 
 2184                      gSystem->Unlink(tmpfn);
 
 // Passing a null pointer ends the redirection started above.
 2192                gSystem->RedirectOutput(0x0);  
 
 2196                smsg.Form(
"*** Echo response from %s:%s ***\n",
 
 2197                   gSystem->HostName(), GetOrdinal());
 
 2198                TMacro *fr = 
new TMacro();
 
 2199                fr->ReadFile(tmpfn);
 
 2200                TIter nextLine(fr->GetListOfLines());
 
 2202                while (( line = (TObjString *)nextLine() )) {
 
 2203                   smsg.Append( line->String() );
 
 2208                gSystem->Unlink(tmpfn);
 
 2213             GetSocket()->Send(rmsg);
 
 2219          Error(
"HandleSocketInput", 
"unknown command %d", what);
 
 // Accumulate the time measured by the stopwatch for this message.
 2224    fRealTime += (Float_t)timer.RealTime();
 
 2225    fCpuTime  += (Float_t)timer.CpuTime();
 
 // Write a syslog record when a handler filled 'slb' or when fgLogToSysLog > 1.
 2227    if (!(slb.IsNull()) || fgLogToSysLog > 1) {
 
 2229       s.Form(
"%s %d %.3f %.3f %s", fgSysLogEntity.Data(),
 
 2230                                    what, timer.RealTime(), timer.CpuTime(), slb.Data());
 
 2231       gSystem->Syslog(kLogNotice, s.Data());
 
 // Accept connections on fMergingSocket and receive output objects from them,
 // handing each object to mergerPlayer->AddOutputObject().
 //
 // connections:  number of connections to accept; the listening socket is
 //               removed from the monitor once that many have been accepted,
 //               and the main loop stops once 'mergedWorkers' reaches it.
 // mergerPlayer: player that receives the objects read from the messages.
 //
 // 'result' starts as kTRUE and is logged on exit; the return statement itself
 // is not visible in this excerpt.
 2241 Bool_t TProofServ::AcceptResults(Int_t connections, TVirtualProofPlayer *mergerPlayer)
 
 2243    TMessage *mess = 
new TMessage();
 
 2244    Int_t mergedWorkers = 0;
 
 2246    PDB(kSubmerger, 1)  Info("AcceptResults", "enter");
 
 2249    Bool_t result = kTRUE;
 
 // A fresh monitor watches the listening socket; accepted sockets are added to
 // it as they arrive.
 2251    fMergingMonitor = new TMonitor();
 
 2252    fMergingMonitor->Add(fMergingSocket);
 
 2254    Int_t numworkers = 0;
 
 2255    while (fMergingMonitor->GetActive() > 0 && mergedWorkers <  connections) {
 
 2257       TSocket *s = fMergingMonitor->Select();
 
 2259          Info(
"AcceptResults", 
"interrupt!");
 
 // Activity on the listening socket means a new incoming connection.
 2264       if (s == fMergingSocket) {
 
 2266          TSocket *sw = fMergingSocket->Accept();
 
 // Accept() results of null and (TSocket *)(-1) are both treated as "no socket".
 2267          if (sw && sw != (TSocket *)(-1)) {
 
 2268             fMergingMonitor->Add(sw);
 
 2271                Info("AcceptResults", "connection from a worker accepted on merger %s ",
 
 2274             if (++numworkers >= connections)
 
 2275                fMergingMonitor->Remove(fMergingSocket);
 
 2278                Info("AcceptResults", "spurious signal found of merging socket");
 
 // Otherwise the activity is on an already accepted socket: receive a message.
 2281          if (s->Recv(mess) < 0) {
 
 2282             Error(
"AcceptResults", 
"problems receiving message");
 
 2286             Info("AcceptResults", "message received: %d ", (mess ? mess->What() : 0));
 
 2288             Error(
"AcceptResults", 
"message received: %p ", mess);
 
 // Consume the message until its buffer is exhausted.
 2294          while ((mess->BufferSize() > mess->Length())) {
 
 2297             PDB(kSubmerger, 2) Info("AcceptResults", " type %d ", type);
 
 2301                   Info("AcceptResults",
 
 2302                        "a new worker has been mergerd. Total merged workers: %d",
 
 2305             TObject *o = mess->ReadObject(TObject::Class());
 
 // A return value of 1 is reported below as "has been merged".
 2306             if (mergerPlayer->AddOutputObject(o) == 1) {
 
 2308                PDB(kSubmerger, 2)  Info("AcceptResults", "removing %p (has been merged)", o);
 
 2311                PDB(kSubmerger, 2) Info("AcceptResults", "%p not merged yet", o);
 
 // Shutdown: deactivate everything, then close and delete each socket listed
 // as de-active before dropping the monitor itself.
 2315    fMergingMonitor->DeActivateAll();
 
 2317    TList* sockets = fMergingMonitor->GetListOfDeActives();
 
 2318    Int_t size = sockets->GetSize();
 
 2319    for (Int_t i =0; i< size; ++i){
 
 2320       ((TSocket*)(sockets->At(i)))->Close();
 
 2321       PDB(kSubmerger, 2) Info("AcceptResults", "closing socket");
 
 2322       delete ((TSocket*)(sockets->At(i)));
 
 2325    fMergingMonitor->RemoveAll();
 
 2326    SafeDelete(fMergingMonitor);
 
 2328    PDB(kSubmerger, 2) Info("AcceptResults", "exit: %d", result);
 
 // Handle out-of-band (OOB) data on fSocket: read one OOB byte and act on the
 // interrupt type it encodes (hard, soft or shutdown interrupt).
 // NOTE(review): several conditions and the switch header are elided in this
 // listing; comments describe only the visible statements.
 2335 void TProofServ::HandleUrgentData()
 
 2338    Int_t n, nch, wasted = 0;
 
 // Scratch buffer for in-band bytes that are read and thrown away.
 2340    const Int_t kBufSize = 1024;
 
 2341    char waste[kBufSize];
 
 2344    TProofServLogHandlerGuard hg(fLogFile, fSocket, 
"", fRealTimeLog);
 
 2347       Info("HandleUrgentData", "handling oob...");
 
 // Keep trying while the OOB read fails. Inside the loop the number of pending
 // bytes is queried and up to kBufSize of them are read into 'waste';
 // presumably this clears in-band data blocking the OOB byte - confirm against
 // the elided conditions.
 2350    while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
 
 2362          fSocket->GetOption(kBytesToRead, nch);
 
 2364             gSystem->Sleep(1000);
 
 2368          if (nch > kBufSize) nch = kBufSize;
 
 2369          n = fSocket->RecvRaw(waste, nch);
 
 2371             Error(
"HandleUrgentData", 
"error receiving waste");
 
 2376          Error(
"HandleUrgentData", 
"error receiving OOB");
 
 2382       Info("HandleUrgentData", "got OOB byte: %d\n", oob_byte);
 
 2384    if (fProof) fProof->SetActive();
 
 // Hard interrupt: forwarded to fProof; the stream is then inspected with the
 // kAtMark option, the OOB byte is sent back, and further pending bytes are
 // read into 'waste'.
 2388       case TProof::kHardInterrupt:
 
 2389          Info(
"HandleUrgentData", 
"*** Hard Interrupt");
 
 2393             fProof->Interrupt(TProof::kHardInterrupt);
 
 2399             fSocket->GetOption(kAtMark, atmark);
 
 2404                n = fSocket->SendRaw(&oob_byte, 1, kOob);
 
 2406                   Error(
"HandleUrgentData", 
"error sending OOB");
 
 2411             fSocket->GetOption(kBytesToRead, nch);
 
 2413                gSystem->Sleep(1000);
 
 2417             if (nch > kBufSize) nch = kBufSize;
 
 2418             n = fSocket->RecvRaw(waste, nch);
 
 2420                Error(
"HandleUrgentData", 
"error receiving waste (2)");
 
 // Soft interrupt: forwarded to fProof; an error is logged on the
 // "flushed stream" path.
 2429       case TProof::kSoftInterrupt:
 
 2430          Info(
"HandleUrgentData", 
"Soft Interrupt");
 
 2434             fProof->Interrupt(TProof::kSoftInterrupt);
 
 2437             Error(
"HandleUrgentData", 
"soft interrupt flushed stream");
 
 // Shutdown interrupt: forwarded to fProof.
 2447       case TProof::kShutdownInterrupt:
 
 2448          Info(
"HandleUrgentData", 
"Shutdown Interrupt");
 
 2452             fProof->Interrupt(TProof::kShutdownInterrupt);
 
 2459          Error(
"HandleUrgentData", 
"unexpected OOB byte");
 
 // Undo the SetActive() call made before dispatching.
 2463    if (fProof) fProof->SetActive(kFALSE);
 
 // React to a SIGPIPE: probe the connection by sending 'kPROOF_PING | kMESS_ACK'
 // on fSocket. When the send fails the failure is logged and a shutdown
 // interrupt is issued through fProof (bracketed by SetActive() /
 // SetActive(kFALSE)).
 // NOTE(review): the conditions selecting between the two visible
 // "keepAlive probe failed" paths are elided in this listing.
 2470 void TProofServ::HandleSigPipe()
 
 2473    TProofServLogHandlerGuard hg(fLogFile, fSocket, 
"", fRealTimeLog);
 
 2478       if (fSocket->Send(kPROOF_PING | kMESS_ACK) < 0) {
 
 2479          Info(
"HandleSigPipe", 
"keepAlive probe failed");
 
 2482          fProof->SetActive();
 
 2483          fProof->Interrupt(TProof::kShutdownInterrupt);
 
 2484          fProof->SetActive(kFALSE);
 
 2488       Info(
"HandleSigPipe", 
"keepAlive probe failed");
 
 // Report whether this server runs in parallel mode. On a master with a valid
 // fProof the answer is true when fProof is parallel or uses dynamic startup.
 // The value returned in all other cases is on a line not shown here.
 2496 Bool_t TProofServ::IsParallel()
 const 
 2498    if (IsMaster() && fProof)
 
 2499       return fProof->IsParallel() || fProof->UseDynamicStartup() ;
 
 // Print information about this server. A master with a valid fProof delegates
 // to fProof->Print(option); the other visible path prints a one-line worker
 // banner with the host name.
 2508 void TProofServ::Print(Option_t *option)
 const 
 2510    if (IsMaster() && fProof)
 
 2511       fProof->Print(option);
 
 2513       Printf(
"This is worker %s", gSystem->HostName());
 
 // Redirect stdout and stderr to a per-session log file and open that file for
 // reading through fLogFile.
 //
 // dir:  directory for the log file; fSessionDir is used when 'dir' is null or
 //       empty.
 // mode: mode string passed to freopen() for stdout.
 //
 // The file is named "master-<ordinal>.log" or "worker-<ordinal>.log"; the
 // condition choosing between the two is not visible in this excerpt.
 // Failures are reported with SysError(); no early return is visible.
 2520 void TProofServ::RedirectOutput(
const char *dir, 
const char *mode)
 
 2524    TString sdir = (dir && strlen(dir) > 0) ? dir : fSessionDir.Data();
 
 2526       snprintf(logfile, 512, 
"%s/master-%s.log", sdir.Data(), fOrdinal.Data());
 
 2528       snprintf(logfile, 512, 
"%s/worker-%s.log", sdir.Data(), fOrdinal.Data());
 
 2531    if ((freopen(logfile, mode, stdout)) == 0)
 
 2532       SysError(
"RedirectOutput", 
"could not freopen stdout (%s)", logfile);
 
 // Make stderr share the redirected stdout descriptor.
 2534    if ((dup2(fileno(stdout), fileno(stderr))) < 0)
 
 2535       SysError(
"RedirectOutput", 
"could not redirect stderr");
 
 // Separate read-only handle on the same file.
 2537    if ((fLogFile = fopen(logfile, 
"r")) == 0)
 
 2538       SysError(
"RedirectOutput", 
"could not open logfile '%s'", logfile);
 
 // Peers with protocol < 4 and a non-default working directory only get a warning.
 2541    if (fProtocol < 4 && fWorkDir != TString::Format(
"~/%s", kPROOF_WorkDir)) {
 
 2542       Warning(
"RedirectOutput", 
"no way to tell master (or client) where" 
 2543               " to upload packages");
 
 // Reset the server state for directory 'dir': change into the corresponding
 // directory, delete the content of the current directory unless it is gROOT,
 // and, on a master, send the current state through fProof.
 // The directory string 'dd' is declared on a line not shown; when it does not
 // begin with "proofserv", the part before the first ':' is replaced by
 // "proofserv" (the guard on the index 'ic' is elided).
 2550 void TProofServ::Reset(
const char *dir)
 
 2555    if (!dd.BeginsWith(
"proofserv")) {
 
 2556       Int_t ic = dd.Index(
":");
 
 2558          dd.Replace(0, ic, 
"proofserv");
 
 2560    gDirectory->cd(dd.Data());
 
 // Never wipe the top-level gROOT directory.
 2567    if (gDirectory != gROOT) {
 
 2568       gDirectory->Delete();
 
 2571    if (IsMaster()) fProof->SendCurrentState();
 
 // Receive 'size' bytes from fSocket and write them to 'file'.
 //
 // file: path of the file to create (opened with O_CREAT | O_TRUNC | O_WRONLY,
 //       mode 0600, then chmod'ed to 0644 at the end).
 // bin:  binary-transfer flag; its use is on lines not shown here. A second
 //       buffer 'cpy' is filled byte by byte from 'buf' on one path, presumably
 //       a text-mode conversion - confirm.
 // size: number of bytes expected; a value <= 0 returns 0 without touching
 //       the file.
 //
 // Other return values are on lines not shown in this excerpt.
 2580 Int_t TProofServ::ReceiveFile(
const char *file, Bool_t bin, Long64_t size)
 
 2582    if (size <= 0) 
return 0;
 
 2585    Int_t fd = open(file, O_CREAT | O_TRUNC | O_WRONLY, 0600);
 
 2587       SysError(
"ReceiveFile", 
"error opening file %s", file);
 
 2591    const Int_t kMAXBUF = 16384;  
 
 2592    char buf[kMAXBUF], cpy[kMAXBUF];
 
 2595    Long64_t filesize = 0;
 
 2597    while (filesize < size) {
 
 // NOTE(review): the Long64_t difference is narrowed to Int_t here; whether it
 // is clamped to kMAXBUF afterwards is on a line not shown - confirm this is
 // safe for sizes that do not fit in an Int_t.
 2598       left = Int_t(size - filesize);
 
 2601       r = fSocket->RecvRaw(&buf, left);
 
 2610                Int_t k = 0, i = 0, j = 0;
 
 2617                   cpy[j++] = buf[i++];
 
 2621                w = write(fd, q, r);
 
 2623                w = write(fd, p, r);
 
 2627                SysError(
"ReceiveFile", 
"error writing to file %s", file);
 
 2635          Error(
"ReceiveFile", 
"error during receiving file %s", file);
 
 // Relax the permissions of the received file; a failure is only a warning.
 2643    if (chmod(file, 0644) != 0)
 
 2644       Warning(
"ReceiveFile", 
"error setting mode 0644 on file %s", file);
 
 // Create the server and, when CreateServer() returns 0, enter the application
 // loop via TApplication::Run(retrn). What happens when CreateServer() fails
 // is on lines not shown in this excerpt.
 2652 void TProofServ::Run(Bool_t retrn)
 
 2655    if (CreateServer() == 0) {
 
 2658       TApplication::Run(retrn);
 
 // Send log file content over fSocket, then a kPROOF_LOGDONE message.
 //
 // status:     value placed first in the kPROOF_LOGDONE message.
 // start, end: byte range to send; when the range path is taken the read
 //             position is moved to 'start', and the range is adjusted when
 //             'end <= start' or 'end' is beyond the current file size (the
 //             adjustment itself is elided). Otherwise everything between the
 //             current read position and the end of the file is sent.
 //
 // Protocol as visible here: the byte count 'left' is announced with
 // kPROOF_LOGFILE, the data follows raw in chunks of at most 32768 bytes, and
 // kPROOF_LOGDONE closes the exchange.
 2667 void TProofServ::SendLogFile(Int_t status, Int_t start, Int_t end)
 
 2675       if (!fSendLogToMaster) {
 
 2679          LogToMaster(kFALSE);
 
 2683    off_t ltot=0, lnow=0;
 
 2685    Bool_t adhoc = kFALSE;
 
 2687    if (fLogFileDes > -1) {
 
 // ltot: current size of the log, obtained by seeking to the end of stdout;
 // lnow: current position of the read descriptor.
 2688       ltot = lseek(fileno(stdout),   (off_t) 0, SEEK_END);
 
 2689       lnow = lseek(fLogFileDes, (off_t) 0, SEEK_CUR);
 
 2691       if (ltot >= 0 && lnow >= 0) {
 
 2693             lseek(fLogFileDes, (off_t) start, SEEK_SET);
 
 2694             if (end <= start || end > ltot)
 
 2696             left = (Int_t)(end - start);
 
 2701             left = (Int_t)(ltot - lnow);
 
 2707       if (fSocket->Send(left, kPROOF_LOGFILE) < 0) {
 
 2708          SysError(
"SendLogFile", 
"error sending kPROOF_LOGFILE");
 
 2712       const Int_t kMAXBUF = 32768;  
 
 2714       Int_t wanted = (left > kMAXBUF) ? kMAXBUF : left;
 
 // Retry the read when it was interrupted by a signal (EINTR).
 2717          while ((len = read(fLogFileDes, buf, wanted)) < 0 &&
 
 2718                 TSystem::GetErrno() == EINTR)
 
 2719             TSystem::ResetErrno();
 
 2722             SysError(
"SendLogFile", 
"error reading log file");
 
 2726          if (end == ltot && len == wanted)
 
 2729          if (fSocket->SendRaw(buf, len) < 0) {
 
 2730             SysError(
"SendLogFile", 
"error sending log file");
 
 2736          wanted = (left > kMAXBUF) ? kMAXBUF : left;
 
 2738       } 
while (len > 0 && left > 0);
 
 // After an ad-hoc range request, restore the previous read position.
 2742    if (adhoc && lnow >=0 )
 
 2743       lseek(fLogFileDes, lnow, SEEK_SET);
 
 2745    TMessage mess(kPROOF_LOGDONE);
 
 // The second field is fProof's parallel count (0 without fProof) on one path
 // and the constant 1 on the other; the selecting condition is elided.
 2747       mess << status << (fProof ? fProof->GetParallel() : 0);
 
 2749       mess << status << (Int_t) 1;
 
 2751    if (fSocket->Send(mess) < 0) {
 
 2752       SysError(
"SendLogFile", 
"error sending kPROOF_LOGDONE");
 
 2756    PDB(kGlobal, 1) Info("SendLogFile", "kPROOF_LOGDONE sent");
 
 // Send a kPROOF_GETSTATS message over fSocket carrying, in order: bytes read,
 // real time, CPU time, the current working directory, the PROOF work
 // directory (only for fProtocol >= 4) and the image name.
 // The defaults come from TFile::GetFileBytesRead(), fCpuTime and fRealTime;
 // on a path whose condition is elided, bytes read and CPU time are taken from
 // fProof instead.
 2762 void TProofServ::SendStatistics()
 
 2764    Long64_t bytesread = TFile::GetFileBytesRead();
 
 2765    Float_t cputime = fCpuTime, realtime = fRealTime;
 
 2767       bytesread = fProof->GetBytesRead();
 
 2768       cputime = fProof->GetCpuTime();
 
 2771    TMessage mess(kPROOF_GETSTATS);
 
 2772    TString workdir = gSystem->WorkingDirectory();  
 
 2773    mess << bytesread << realtime << cputime << workdir;
 
 // The work directory is included only for protocol >= 4.
 2774    if (fProtocol >= 4) mess << TString(gProofServ->GetWorkDir());
 
 2775    mess << TString(gProofServ->GetImage());
 
 2776    fSocket->Send(mess);
 
 // Send a kPROOF_GETPARALLEL message over fSocket carrying the number of
 // parallel workers followed by the 'async' flag.
 // 'nparallel' starts at 0; on a path whose condition is elided,
 // fProof->AskParallel() is called and the count is read from
 // fProof->GetParallel().
 2782 void TProofServ::SendParallel(Bool_t async)
 
 2784    Int_t nparallel = 0;
 
 2787          Info("SendParallel", "Will invoke AskParallel()");
 
 2788       fProof->AskParallel();
 
 2790          Info("SendParallel", "Will invoke GetParallel()");
 
 2791       nparallel = fProof->GetParallel();
 
 2796    TMessage mess(kPROOF_GETPARALLEL);
 
 2797    mess << nparallel << async;
 
 2798    fSocket->Send(mess);
 
 // Initial handshake and session setup over fSocket:
 //   1. send a greeting string;
 //   2. exchange protocol numbers with the peer;
 //   3. obtain user, ordinal and config-file / working-directory information
 //      (via OldAuthSetup() for fProtocol < 5, via a message otherwise);
 //   4. derive fWorkDir, fSessionTag and fSessionDir;
 //   5. run SetupCommon() and set socket options.
 // Each failure is logged with Error(); the return statements are on lines not
 // shown in this excerpt.
 2805 Int_t TProofServ::Setup()
 
 // Greeting text; the condition choosing between the two banners is elided.
 2810       snprintf(str, 512, 
"**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
 
 2812       snprintf(str, 512, 
"**** PROOF slave server @ %s started ****", gSystem->HostName());
 
 // A successful send is expected to report strlen(str) + 1 bytes.
 2815    if (fSocket->Send(str) != 1+
static_cast<Int_t
>(strlen(str))) {
 
 2816       Error(
"Setup", 
"failed to send proof server startup message");
 
 // Protocol exchange: receive the peer's protocol, then send ours; both
 // transfers are expected to move 2*sizeof(Int_t) bytes.
 2823    if (fSocket->Recv(fProtocol, what) != 2*
sizeof(Int_t)) {
 
 2824       Error(
"Setup", 
"failed to receive remote proof protocol");
 
 2827    if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != 2*
sizeof(Int_t)) {
 
 2828       Error(
"Setup", 
"failed to send local proof protocol");
 
 // Peers with protocol < 5 go through the old authentication setup.
 2833    if (fProtocol < 5) {
 
 2835       if (OldAuthSetup(wconf) != 0) {
 
 2836          Error(
"Setup", 
"OldAuthSetup: failed to setup authentication");
 
 2841          fWorkDir.Form(
"~/%s", kPROOF_WorkDir);
 
 2843          if (fProtocol < 4) {
 
 2844             fWorkDir.Form(
"~/%s", kPROOF_WorkDir);
 
 2847             if (fWorkDir.IsNull()) fWorkDir.Form(
"~/%s", kPROOF_WorkDir);
 
 // Newer peers send user, ordinal and either the config file or the working
 // directory in a message; the sandbox default comes from the
 // "ProofServ.Sandbox" setting.
 2854       if ((fSocket->Recv(mess) <= 0) || !mess) {
 
 2855          Error(
"Setup", 
"failed to receive ordinal and config info");
 
 2859          (*mess) >> fUser >> fOrdinal >> fConfFile;
 
 2860          fWorkDir = gEnv->GetValue(
"ProofServ.Sandbox", TString::Format(
"~/%s", kPROOF_WorkDir));
 
 2862          (*mess) >> fUser >> fOrdinal >> fWorkDir;
 
 2863          if (fWorkDir.IsNull())
 
 2864             fWorkDir = gEnv->GetValue(
"ProofServ.Sandbox", TString::Format(
"~/%s", kPROOF_WorkDir));
 
 // The ordinal is appended to the log prefix unless it is "-1".
 2867       if (fOrdinal != 
"-1")
 
 2868          fPrefix += fOrdinal;
 
 2869       TProofServLogHandler::SetDefaultPrefix(fPrefix);
 
 // Everything up to and including the first ':' is removed from the config
 // file name; a master entry in a valid resources file may override fWorkDir.
 2876       TString conffile = fConfFile;
 
 2877       conffile.Remove(0, 1 + conffile.Index(
":"));
 
 2880       TProofResourcesStatic resources(fConfDir, conffile);
 
 2881       if (resources.IsValid()) {
 
 2882          if (resources.GetMaster()) {
 
 2883             TString tmpWorkDir = resources.GetMaster()->GetWorkDir();
 
 2884             if (tmpWorkDir != 
"")
 
 2885                fWorkDir = tmpWorkDir;
 
 2888          Info(
"Setup", 
"invalid config file %s (missing or unreadable",
 
 2889                         resources.GetFileName().Data());
 
 2895    gSystem->Setenv(
"HOME", gSystem->HomeDirectory());
 
 // An absolute working directory outside the home directory gets the user name
 // appended (the null check on 'u' is elided).
 2898    if (fWorkDir.BeginsWith(
"/") &&
 
 2899       !fWorkDir.BeginsWith(gSystem->HomeDirectory())) {
 
 2900       if (!fWorkDir.EndsWith(
"/"))
 
 2902       UserGroup_t *u = gSystem->GetUserInfo();
 
 2904          fWorkDir += u->fUser;
 
 2910    char *workdir = gSystem->ExpandPathName(fWorkDir.Data());
 
 2913    if (gProofDebugLevel > 0)
 
 2914       Info(
"Setup", 
"working directory set to %s", fWorkDir.Data());
 
 // Short host name: everything from the first '.' on is dropped.
 2917    TString host = gSystem->HostName();
 
 2918    if (host.Index(
".") != kNPOS)
 
 2919       host.Remove(host.Index(
"."));
 
 // Session tag format: <ordinal>-<host>-<seconds timestamp>-<pid>.
 2922    fSessionTag.Form(
"%s-%s-%ld-%d", fOrdinal.Data(), host.Data(),
 
 2923                     (Long_t)TTimeStamp().GetSec(),gSystem->GetPid());
 
 2924    fTopSessionTag = fSessionTag;
 
 // Session directory: <workdir>/master-<tag> or <workdir>/slave-<tag>; the
 // selecting condition is elided.
 2927    fSessionDir = fWorkDir;
 
 2929       fSessionDir += 
"/master-";
 
 2931       fSessionDir += 
"/slave-";
 
 2932    fSessionDir += fSessionTag;
 
 2935    if (SetupCommon() != 0) {
 
 2936       Error(
"Setup", 
"common setup failed");
 
 // Socket options: process group set to this process id, no-delay and
 // keep-alive enabled.
 2941    fSocket->SetOption(kProcessGroup, gSystem->GetPid());
 
 2944    fSocket->SetOption(kNoDelay, 1);
 
 2947    fSocket->SetOption(kKeepAlive, 1);
 
 ////////////////////////////////////////////////////////////////////////////////
 /// Common part of the server setup. From the statements visible here it:
 ///  - sets the umask and extends PATH (ROOT bin dir plus the optional
 ///    'ProofServ.BinPaths' keywords <compiler> and <sysbin>);
 ///  - creates and enters the work directory, then prepares the cache,
 ///    package, data, session and query directories;
 ///  - instantiates the package manager, the query-result manager and the
 ///    dataset managers, and parses the per-user / per-group quota settings;
 ///  - builds the session-tag and version/architecture messages and records
 ///    the environment variables listed in PROOF_ALLVARS;
 ///  - optionally writes a start record to syslog.
 /// The caller in Setup() treats a non-zero return value as failure.
 2957 Int_t TProofServ::SetupCommon()
 
 2960    gSystem->Umask(022);
 
 // Start from the current PATH and from ROOT's binary directory
 2964    TString path(gSystem->Getenv(
"PATH"));
 
 2965    TString bindir(TROOT::GetBinDir());
 
 // Optional extra locations requested via 'ProofServ.BinPaths'; the keywords
 // <compiler> and <sysbin> are looked for, each also in a '^'-prefixed form
 2968    TString paths = gEnv->GetValue(
"ProofServ.BinPaths", 
"");
 
 2969    if (paths.Length() > 0) {
 
 2971       if (paths.Contains(
"^<compiler>"))
 
 2973       else if (paths.Contains(
"<compiler>"))
 
 2977          TString compiler = COMPILER;
 
 2978          if (compiler.Index(
"is ") != kNPOS)
 
 2979             compiler.Remove(0, compiler.Index(
"is ") + 3);
 
 2980          compiler = gSystem->DirName(compiler);
 
 2982             if (!bindir.IsNull()) bindir += 
":";
 
 2984          } 
else if (icomp == -1) {
 
 2985             if (!path.IsNull()) path += 
":";
 
 2991       if (paths.Contains(
"^<sysbin>"))
 
 2993       else if (paths.Contains(
"<sysbin>"))
 
 2997             if (!bindir.IsNull()) bindir += 
":";
 
 2998             bindir += 
"/bin:/usr/bin:/usr/local/bin";
 
 2999          } 
else if (isysb == -1) {
 
 3000             if (!path.IsNull()) path += 
":";
 
 3001             path += 
"/bin:/usr/bin:/usr/local/bin";
 
 // 'bindir' goes in front of the existing PATH
 3006    if (!bindir.IsNull()) bindir += 
":";
 
 3007    path.Insert(0, bindir);
 
 3008    gSystem->Setenv(
"PATH", path);
 
 // Work directory: create it when AccessPathName() reports it missing, then
 // change into it
 3011    if (gSystem->AccessPathName(fWorkDir)) {
 
 3012       gSystem->mkdir(fWorkDir, kTRUE);
 
 3013       if (!gSystem->ChangeDirectory(fWorkDir)) {
 
 3014          Error(
"SetupCommon", 
"can not change to PROOF directory %s",
 
 // The path cannot be entered: unlink it, recreate it as a directory and retry
 3019       if (!gSystem->ChangeDirectory(fWorkDir)) {
 
 3020          gSystem->Unlink(fWorkDir);
 
 3021          gSystem->mkdir(fWorkDir, kTRUE);
 
 3022          if (!gSystem->ChangeDirectory(fWorkDir)) {
 
 3023             Error(
"SetupCommon", 
"can not change to PROOF directory %s",
 
 3031    fGroup = gEnv->GetValue(
"ProofServ.ProofGroup", 
"default");
 
 // Cache directory: 'ProofServ.CacheDir', defaulting to a sub-directory of the
 // work directory; keywords are resolved before the directory is created
 3034    fCacheDir = gEnv->GetValue(
"ProofServ.CacheDir",
 
 3035                                TString::Format(
"%s/%s", fWorkDir.Data(), kPROOF_CacheDir));
 
 3036    ResolveKeywords(fCacheDir);
 
 3037    if (gSystem->AccessPathName(fCacheDir))
 
 3038       gSystem->mkdir(fCacheDir, kTRUE);
 
 3039    if (gProofDebugLevel > 0)
 
 3040       Info(
"SetupCommon", 
"cache directory set to %s", fCacheDir.Data());
 
 3042       new TProofLockPath(TString::Format(
"%s/%s%s",
 
 3043                          gSystem->TempDirectory(), kPROOF_CacheLockFile,
 
 3044                          TString(fCacheDir).ReplaceAll(
"/",
"%").Data()));
 
 3046    TProof::AssertMacroPath(TString::Format(
"%s/.", fCacheDir.Data()));
 
 // Package directory ('ProofServ.PackageDir') and the package manager using it
 3049    TString packdir = gEnv->GetValue(
"ProofServ.PackageDir",
 
 3050                                     TString::Format(
"%s/%s", fWorkDir.Data(), kPROOF_PackDir));
 
 3051    ResolveKeywords(packdir);
 
 3052    if (gSystem->AccessPathName(packdir))
 
 3053       gSystem->mkdir(packdir, kTRUE);
 
 3054    fPackMgr = 
new TPackMgr(packdir);
 
 3055    fPackMgr->SetLogger(SendAsynMsg);
 
 3058    const char *k = (IsMaster()) ? 
"Mst" : 
"Wrk";
 
 3059    noth.Form(
"%s-%s", k, fOrdinal.Data());
 
 3060    fPackMgr->SetPrefix(noth.Data());
 
 3061    if (gProofDebugLevel > 0)
 
 3062       Info(
"SetupCommon", 
"package directory set to %s", packdir.Data());
 
 // Data directory ('ProofServ.DataDir'): when the setting contains a blank,
 // the text after the last blank is stored in fDataDirOpts and stripped from
 // the path
 3065    fDataDir = gEnv->GetValue(
"ProofServ.DataDir",
"");
 
 3066    Ssiz_t isep = kNPOS;
 
 3067    if (fDataDir.IsNull()) {
 
 3069       fDataDir.Form(
"%s/%s/<ord>/<stag>", fWorkDir.Data(), kPROOF_DataDir);
 
 3070    } 
else if ((isep = fDataDir.Last(
' ')) !=  kNPOS) {
 
 3071       fDataDirOpts = fDataDir(isep + 1, fDataDir.Length());
 
 3072       fDataDir.Remove(isep);
 
 3074    ResolveKeywords(fDataDir);
 
 3075    if (gSystem->AccessPathName(fDataDir))
 
 3076       if (gSystem->mkdir(fDataDir, kTRUE) != 0) {
 
 3077          Warning(
"SetupCommon", 
"problems creating path '%s' (errno: %d)",
 
 3078                                 fDataDir.Data(), TSystem::GetErrno());
 
 3080    if (gProofDebugLevel > 0)
 
 3081       Info(
"SetupCommon", 
"data directory set to %s", fDataDir.Data());
 
 // 'ProofServ.DataDirOpts': 'M' / 'W' enable the mode change on masters /
 // workers respectively; 'g' selects mode 0775, 'a' or 'o' select 0777
 3085    TString dataDirOpts = gEnv->GetValue(
"ProofServ.DataDirOpts",
"");
 
 3086    if (!dataDirOpts.IsNull()) {
 
 3088       Bool_t doit = kTRUE;
 
 3089       if ((IsMaster() && !dataDirOpts.Contains(
"M")) ||
 
 3090          (!IsMaster() && !dataDirOpts.Contains(
"W"))) doit = kFALSE;
 
 3094          if (dataDirOpts.Contains(
"g")) m = 0775;
 
 3095          if (dataDirOpts.Contains(
"a") || dataDirOpts.Contains(
"o")) m = 0777;
 
 3096          if (gProofDebugLevel > 0)
 
 3097             Info(
"SetupCommon", 
"requested mode for data directories is '%o'", m);
 
 // Walk the '/'-separated components of fDataDir; Chmod() is applied only to
 // paths whose uid and gid match those of the current process
 3102          if (fDataDir.BeginsWith(
"/")) p = 
"/";
 
 3103          while (fDataDir.Tokenize(subp, from, 
"/")) {
 
 3104             if (subp.IsNull()) 
continue;
 
 3106             if (gSystem->GetPathInfo(p, st) == 0) {
 
 3107                if (st.fUid == (Int_t) gSystem->GetUid() && st.fGid == (Int_t) gSystem->GetGid()) {
 
 3108                   if (gSystem->Chmod(p.Data(), m) != 0) {
 
 3109                      Warning(
"SetupCommon", 
"problems setting mode '%o' on path '%s' (errno: %d)",
 
 3110                                             m, p.Data(), TSystem::GetErrno());
 
 3116                Warning(
"SetupCommon", 
"problems stat-ing path '%s' (errno: %d; datadir: %s)",
 
 3117                                        p.Data(), TSystem::GetErrno(), fDataDir.Data());
 
 // Global package directories ('Proof.GlobalPackageDirs')
 3125    TString globpack = gEnv->GetValue(
"Proof.GlobalPackageDirs",
"");
 
 3127    ResolveKeywords(globpack);
 
 3128    Int_t nglb = TPackMgr::RegisterGlobalPath(globpack);
 
 3129    Info(
"SetupCommon", 
" %d global package directories registered", nglb);
 
 // Session directory: create it if needed and make it the current directory;
 // it is also exported as PROOF_SANDBOX
 3133    if (fSessionDir != gSystem->WorkingDirectory()) {
 
 3134       ResolveKeywords(fSessionDir);
 
 3135       if (gSystem->AccessPathName(fSessionDir))
 
 3136          gSystem->mkdir(fSessionDir, kTRUE);
 
 3137       if (!gSystem->ChangeDirectory(fSessionDir)) {
 
 3138          Error(
"SetupCommon", 
"can not change to working directory '%s'",
 
 3139                               fSessionDir.Data());
 
 3143    gSystem->Setenv(
"PROOF_SANDBOX", fSessionDir);
 
 3144    if (gProofDebugLevel > 0)
 
 3145       Info(
"SetupCommon", 
"session dir is '%s'", fSessionDir.Data());
 
 // Query directory: <workdir>/<kPROOF_QueryDir>/session-<top session tag>,
 // together with its lock file and the query-result manager
 3152       fQueryDir = fWorkDir;
 
 3153       fQueryDir += TString(
"/") + kPROOF_QueryDir;
 
 3154       ResolveKeywords(fQueryDir);
 
 3155       if (gSystem->AccessPathName(fQueryDir))
 
 3156          gSystem->mkdir(fQueryDir, kTRUE);
 
 3157       fQueryDir += TString(
"/session-") + fTopSessionTag;
 
 3158       if (gSystem->AccessPathName(fQueryDir))
 
 3159          gSystem->mkdir(fQueryDir, kTRUE);
 
 3160       if (gProofDebugLevel > 0)
 
 3161          Info(
"SetupCommon", 
"queries dir is %s", fQueryDir.Data());
 
 3164       fQueryLock = 
new TProofLockPath(TString::Format(
"%s/%s%s-%s",
 
 3165                        gSystem->TempDirectory(),
 
 3166                        kPROOF_QueryLockFile, fSessionTag.Data(),
 
 3167                        TString(fQueryDir).ReplaceAll(
"/",
"%").Data()));
 
 3170       fQMgr = 
new TQueryResultManager(fQueryDir, fSessionTag, fSessionDir,
 
 3175    fImage = gEnv->GetValue(
"ProofServ.Image", 
"");
 
 3180       TMessage m(kPROOF_SESSIONTAG);
 
 3181       m << fTopSessionTag << fGroup << fUser;
 
 3184       fGroupPriority = GetPriority();
 
 // Dataset manager(s) from the comma-separated 'Proof.DataSetManager' list,
 // instantiated through the 'TDataSetManager' plug-in handler
 3186       TPluginHandler *h = 0;
 
 3187       TString dsms = gEnv->GetValue(
"Proof.DataSetManager", 
"");
 
 3188       if (!dsms.IsNull()) {
 
 3191          while (dsms.Tokenize(dsm, from, 
",")) {
 
 3192             if (fDataSetManager && !fDataSetManager->TestBit(TObject::kInvalidObject)) {
 
 3193                Warning(
"SetupCommon", 
"a valid dataset manager already initialized");
 
 3194                Warning(
"SetupCommon", 
"support for multiple managers not yet available");
 
 3198             if (gROOT->GetPluginManager()) {
 
 3200                h = gROOT->GetPluginManager()->FindHandler(
"TDataSetManager", dsm);
 
 3201                if (h && h->LoadPlugin() != -1) {
 
 3204                      reinterpret_cast<TDataSetManager*
>(h->ExecPlugin(3, fGroup.Data(),
 
 3205                                                             fUser.Data(), dsm.Data()));
 
 3210          if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
 
 3211             Warning(
"SetupCommon", 
"dataset manager plug-in initialization failed");
 
 // NOTE(review): the message below is prefixed "TXProofServ::" although this
 // is TProofServ::SetupCommon - confirm whether that is intended
 3212             SendAsynMessage(
"TXProofServ::SetupCommon: dataset manager plug-in initialization failed");
 
 3213             SafeDelete(fDataSetManager);
 
 // File-based dataset manager obtained through the "file" plug-in handler
 // (the condition guarding this section is not visible in this listing)
 3217          TString opts(
"Av:");
 
 3218          TString dsetdir = gEnv->GetValue(
"ProofServ.DataSetDir", 
"");
 
 3219          if (dsetdir.IsNull()) {
 
 3221             dsetdir.Form(
"%s/%s", fWorkDir.Data(), kPROOF_DataSetDir);
 
 // NOTE(review): the directory tested and created here is fDataSetDir, while
 // the default path just formed is dsetdir - confirm this is intended
 3222             if (gSystem->AccessPathName(fDataSetDir))
 
 3223                gSystem->MakeDirectory(fDataSetDir);
 
 3228             h = gROOT->GetPluginManager()->FindHandler(
"TDataSetManager", 
"file");
 
 3229             if (h && h->LoadPlugin() == -1) h = 0;
 
 3233             TString oo = TString::Format(
"dir:%s opt:%s", dsetdir.Data(), opts.Data());
 
 3234             fDataSetManager = 
reinterpret_cast<TDataSetManager*
>(h->ExecPlugin(3,
 
 3235                               fGroup.Data(), fUser.Data(), oo.Data()));
 
 3237          if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
 
 3238             Warning(
"SetupCommon", 
"default dataset manager plug-in initialization failed");
 
 3239             SafeDelete(fDataSetManager);
 
 // Repository for dataset staging requests ('Proof.DataSetStagingRequests'):
 // the setting must match "[dir:]<path>"
 3243       TString dsReqCfg = gEnv->GetValue(
"Proof.DataSetStagingRequests", 
"");
 
 3244       if (!dsReqCfg.IsNull()) {
 
 3245          TPMERegexp reReqDir(
"(^| )(dir:)?([^ ]+)( |$)");
 
 3247          if (reReqDir.Match(dsReqCfg) == 5) {
 
 3249             dsDirFmt.Form(
"dir:%s perms:open", reReqDir[3].Data());
 
 3250             fDataSetStgRepo = 
new TDataSetManagerFile(
"_stage_", 
"_stage_",
 
 3252             if (fDataSetStgRepo &&
 
 3253                fDataSetStgRepo->TestBit(TObject::kInvalidObject)) {
 
 3254                Warning(
"SetupCommon",
 
 3255                   "failed init of dataset staging requests repository");
 
 3256                SafeDelete(fDataSetStgRepo);
 
 3259             Warning(
"SetupCommon",
 
 3260               "specify, with [dir:]<path>, a valid path for staging requests");
 
 3262       } 
else if (gProofDebugLevel > 0) {
 
 3263          Warning(
"SetupCommon", 
"no repository for staging requests available");
 
 // Quotas: looked up per user first, then per group, then globally. The
 // blank-separated tokens handled below are 'maxquerykept=', 'hwmsz=' and
 // 'maxsz='; the size values may carry a 'k', 'm' or 'g' suffix
 3268    TString quotas = gEnv->GetValue(TString::Format(
"ProofServ.UserQuotas.%s", fUser.Data()),
"");
 
 3269    if (quotas.IsNull())
 
 3270       quotas = gEnv->GetValue(TString::Format(
"ProofServ.UserQuotasByGroup.%s", fGroup.Data()),
"");
 
 3271    if (quotas.IsNull())
 
 3272       quotas = gEnv->GetValue(
"ProofServ.UserQuotas", 
"");
 
 3273    if (!quotas.IsNull()) {
 
 3277       while (quotas.Tokenize(tok, from, 
" ")) {
 
 3279          if (tok.BeginsWith(
"maxquerykept=")) {
 
 3280             tok.ReplaceAll(
"maxquerykept=",
"");
 
 3282                fMaxQueries = tok.Atoi();
 
 3285                     "parsing 'maxquerykept' :ignoring token %s : not a digit", tok.Data());
 
 3288          const char *ksz[2] = {
"hwmsz=", 
"maxsz="};
 
 3289          for (Int_t j = 0; j < 2; j++) {
 
 3290             if (tok.BeginsWith(ksz[j])) {
 
 3291                tok.ReplaceAll(ksz[j],
"");
 
 3293                if (!tok.IsDigit()) {
 
 3296                   const char *s[3] = {
"k", 
"m", 
"g"};
 
 3297                   Int_t i = 0, ki = 1024;
 
 3299                      if (tok.EndsWith(s[i++]))
 
 3304                   tok.Remove(tok.Length()-1);
 
 3306                if (tok.IsDigit()) {
 
 3308                      fHWMBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
 
 3310                      fMaxBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
 
 3312                   TString ssz(ksz[j], strlen(ksz[j])-1);
 
 3313                   Info(
"SetupCommon", 
"parsing '%s' : ignoring token %s", ssz.Data(), tok.Data());
 
 3321    if (IsMaster() && fQMgr)
 
 3322       if (fQMgr->ApplyMaxQueries(fMaxQueries) != 0)
 
 3323          Warning(
"SetupCommon", 
"problems applying fMaxQueries");
 
 // For protocol versions above 12, build the string
 // "<version>:<git commit>[:<tag>]|<arch>-<compiler version>" for the
 // kPROOF_VERSARCHCOMP message
 3326    if (fProtocol > 12) {
 
 3327       TString vac = gROOT->GetVersion();
 
 3328       vac += TString::Format(
":%s", gROOT->GetGitCommit());
 
 3329       TString rtag = gEnv->GetValue(
"ProofServ.RootVersionTag", 
"");
 
 3330       if (rtag.Length() > 0)
 
 3331          vac += TString::Format(
":%s", rtag.Data());
 
 3332       vac += TString::Format(
"|%s-%s",gSystem->GetBuildArch(), gSystem->GetBuildCompilerVersion());
 
 3333       TMessage m(kPROOF_VERSARCHCOMP);
 
 // Register with TProof every environment variable named in the
 // comma-separated PROOF_ALLVARS list, with its current value
 3339    TString all_vars(gSystem->Getenv(
"PROOF_ALLVARS"));
 
 3342    while (all_vars.Tokenize(name, from, 
",")) {
 
 3343       if (!name.IsNull()) {
 
 3344          TString value = gSystem->Getenv(name);
 
 3345          TProof::AddEnvVar(name, value);
 
 // Syslog: the entity is "<user>:<group>", with the missing part replaced by
 // "undef" (user) or "default" (group)
 3349    if (fgLogToSysLog > 0) {
 
 3351       if (!(fUser.IsNull()) && !(fGroup.IsNull())) {
 
 3352          fgSysLogEntity.Form(
"%s:%s", fUser.Data(), fGroup.Data());
 
 3353       } 
else if (!(fUser.IsNull()) && fGroup.IsNull()) {
 
 3354          fgSysLogEntity.Form(
"%s:default", fUser.Data());
 
 3355       } 
else if (fUser.IsNull() && !(fGroup.IsNull())) {
 
 3356          fgSysLogEntity.Form(
"undef:%s", fGroup.Data());
 
 3360       s.Form(
"%s 0 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
 
 3361       gSystem->Syslog(kLogNotice, s.Data());
 
 3364    if (gProofDebugLevel > 0)
 
 3365       Info(
"SetupCommon", 
"successfully completed");
 
 ////////////////////////////////////////////////////////////////////////////////
 /// Terminate the server. The visible statements write a closing syslog
 /// record (including 'status'), report the process memory footprint, remove
 /// the session directory, remove the query directory and its lock file when
 /// no queries are held, remove the data directory when it is writable and
 /// can be emptied, detach the TProofServInputHandler file handlers and leave
 /// the system event loop.
 ///
 /// \param status  value written as the last field of the syslog record
 3374 void TProofServ::Terminate(Int_t status)
 
 3376    if (fgLogToSysLog > 0) {
 
 3378       s.Form(
"%s -1 %.3f %.3f %d", fgSysLogEntity.Data(), fRealTime, fCpuTime, status);
 
 3379       gSystem->Syslog(kLogNotice, s.Data());
 
 // GetProcInfo() returning 0 is treated as success here
 3384    if (!gSystem->GetProcInfo(&pi)){
 
 3385       Info(
"Terminate", 
"process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
 
 3386                         pi.fMemVirtual, fgVirtMemMax, pi.fMemResident, fgResMemMax);
 
 // Move out of the session directory before removing it.
 // NOTE(review): the path is passed unquoted to a shell command built from
 // kRM - confirm it cannot contain blanks or shell metacharacters
 3392       gSystem->ChangeDirectory(
"/");
 
 3394       gSystem->MakeDirectory(fSessionDir+
"/.delete");
 
 3395       gSystem->Exec(TString::Format(
"%s %s", kRM, fSessionDir.Data()));
 
 // The query directory is removed only when the query manager holds no queries
 3400       if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
 
 3402          gSystem->ChangeDirectory(
"/");
 
 3404          gSystem->MakeDirectory(fQueryDir+
"/.delete");
 
 3405          gSystem->Exec(TString::Format(
"%s %s", kRM, fQueryDir.Data()));
 
 3408             gSystem->Unlink(fQueryLock->GetName());
 
 3413          fQueryLock->Unlock();
 
 // AccessPathName() returns false when the path is accessible with the
 // requested permission, hence the negation
 3417    if (!fDataDir.IsNull() && !gSystem->AccessPathName(fDataDir, kWritePermission)) {
 
 3418      if (UnlinkDataDir(fDataDir))
 
 3419         Info(
"Terminate", 
"data directory '%s' has been removed", fDataDir.Data());
 
 // Remove the input handlers of type TProofServInputHandler from the system
 3424    TIter next(gSystem->GetListOfFileHandlers());
 
 3426    while ((fh = next())) {
 
 3427       TProofServInputHandler *ih = 
dynamic_cast<TProofServInputHandler *
>(fh);
 
 3429          gSystem->RemoveFileHandler(ih);
 
 3433    gSystem->ExitLoop();
 
 ////////////////////////////////////////////////////////////////////////////////
 /// Recursively scan the directory 'path' and try to unlink it.
 /// Sub-directories are processed by recursive calls; the scan stops as soon
 /// as 'dorm' becomes false. A warning is issued when removal was requested
 /// but Unlink() fails.
 ///
 /// \param path  directory to process; a null or empty path yields kFALSE
 /// \return presumably the final value of 'dorm' (the return statement is not
 ///         visible in this listing - TODO confirm)
 3442 Bool_t TProofServ::UnlinkDataDir(
const char *path)
 
 3444    if (!path || strlen(path) <= 0) 
return kFALSE;
 
 // 'dorm' tells whether 'path' may be removed once its content has been scanned
 3446    Bool_t dorm = kTRUE;
 
 3447    void *dirp = gSystem->OpenDirectory(path);
 
 3450       const char *ent = 0;
 
 3451       while (dorm && (ent = gSystem->GetDirEntry(dirp))) {
 
 // Skip the self and parent entries
 3452          if (!strcmp(ent, 
".") || !strcmp(ent, 
"..")) 
continue;
 
 3453          fpath.Form(
"%s/%s", path, ent);
 
 // Descend into sub-directories
 3455          if (gSystem->GetPathInfo(fpath, st) == 0 && R_ISDIR(st.fMode)) {
 
 3456             dorm = UnlinkDataDir(fpath);
 
 3462       gSystem->FreeDirectory(dirp);
 
 3469    if (dorm && gSystem->Unlink(path) != 0)
 
 3470       Warning(
"UnlinkDataDir", 
"data directory '%s' is empty but could not be removed", path);
 
 ////////////////////////////////////////////////////////////////////////////////
 /// Tell whether a PROOF server instance exists.
 ///
 /// \return kTRUE when the global pointer gProofServ is non-null, else kFALSE
 3478 Bool_t TProofServ::IsActive()
 
 3480    return gProofServ ? kTRUE : kFALSE;
 
 ////////////////////////////////////////////////////////////////////////////////
 /// Accessor returning a TProofServ pointer.
 /// NOTE(review): presumably the global gProofServ instance - the body is not
 /// part of this listing, confirm against the full source.
 3488 TProofServ *TProofServ::This()
 
 ////////////////////////////////////////////////////////////////////////////////
 /// Run the old-style authentication setup through the function
 /// 'OldProofServAuthSetup', looked up at run time in the library libRootAuth.
 /// Errors are reported when the library cannot be located or loaded, or
 /// when the symbol cannot be found.
 ///
 /// \param conf  passed through to the hook
 /// \return the value returned by the hook on the path visible here
 3497 Int_t TProofServ::OldAuthSetup(TString &conf)
 
 // NOTE(review): as shown here the hook is a plain local initialised to 0, so
 // the lookup below runs on every call; if it is meant to be cached between
 // calls it needs static storage - confirm against the full source
 3499    OldProofServAuthSetup_t oldAuthSetupHook = 0;
 
 3501    if (!oldAuthSetupHook) {
 
 3503       TString authlib = 
"libRootAuth";
 
 // Locate the library on the dynamic path first, then load it
 3506       if ((p = gSystem->DynamicPathName(authlib, kTRUE))) {
 
 3508          if (gSystem->Load(authlib) == -1) {
 
 3509             Error(
"OldAuthSetup", 
"can't load %s",authlib.Data());
 
 3513          Error(
"OldAuthSetup", 
"can't locate %s",authlib.Data());
 
 // Resolve the entry point inside the loaded library
 3518       Func_t f = gSystem->DynFindSymbol(authlib,
"OldProofServAuthSetup");
 
 3520          oldAuthSetupHook = (OldProofServAuthSetup_t)(f);
 
 3522          Error(
"OldAuthSetup", 
"can't find OldProofServAuthSetup");
 
 // Delegate to the hook with the socket, master flag, protocol and identity
 3528    return (*oldAuthSetupHook)(fSocket, IsMaster(), fProtocol,
 
 3529                               fUser, fOrdinal, conf);
 
 ////////////////////////////////////////////////////////////////////////////////
 /// Create a TProofQueryResult for a new query. The sequence number is
 /// advanced through the query manager, and the result's title is set to the
 /// base name of the query directory.
 ///
 /// The kWriteV3 setting of 'dset' is switched off before the object is
 /// constructed and switched back on afterwards; 'olds' records whether it
 /// was set on entry (the guarding conditions are not visible in this listing).
 3535 TProofQueryResult *TProofServ::MakeQueryResult(Long64_t nent,
 
 3537                                                TList *inlist, Long64_t fst,
 
 3538                                                TDSet *dset, 
const char *selec,
 
 3544       fQMgr->IncrementSeqNum();
 
 3545       seqnum = fQMgr->SeqNum();
 
 3549    Bool_t olds = (dset && dset->TestBit(TDSet::kWriteV3)) ? kTRUE : kFALSE;
 
 3551       dset->SetWriteV3(kFALSE);
 
 3554    TProofQueryResult *pqr = 
new TProofQueryResult(seqnum, opt, inlist, nent,
 
 3555                                                   fst, dset, selec, elist);
 
 3557    pqr->SetTitle(gSystem->BaseName(fQueryDir));
 
 3561       dset->SetWriteV3(kTRUE);
 
 ////////////////////////////////////////////////////////////////////////////////
 /// Mark the query 'pq' as running. The current end offset of stdout, the
 /// list of enabled packages and a parallelism value are stored in the
 /// query; the process information is initialised as well.
 ///
 /// Two variants are visible: one taking parallelism, CPU time and bytes read
 /// from fProof, the other using -1 and zeros (the condition selecting
 /// between them is not visible in this listing).
 3569 void TProofServ::SetQueryRunning(TProofQueryResult *pq)
 
 // Offset of the end of stdout at the time the query starts
 3573    Int_t startlog = lseek(fileno(stdout), (off_t) 0, SEEK_END);
 
 3577    Info(
"SetQueryRunning", 
"starting query: %d", pq->GetSeqNum());
 
 3580    TString parlist = 
"";
 
 3581    fPackMgr->GetEnabledPackages(parlist);
 
 3585       pq->SetRunning(startlog, parlist, fProof->GetParallel());
 
 3588       pq->SetProcessInfo(pq->GetEntries(),
 
 3589                         fProof->GetCpuTime(), fProof->GetBytesRead());
 
 3592       pq->SetRunning(startlog, parlist, -1);
 
 3595       pq->SetProcessInfo(pq->GetEntries(), float(0.), 0);
 
 ////////////////////////////////////////////////////////////////////////////////
 /// Handle an archive request. The query reference and the destination path
 /// are read from 'mess'. The reference "Default" only sets the default
 /// archive path. Otherwise the query is located through the query manager
 /// or read back from its 'query-result.root' file, the archive file is
 /// opened, the query is flagged as archived and, for qry > -1, saved.
 ///
 /// \param mess  incoming message carrying the query reference and the path
 /// \param slb   if non-null, filled with "<queryref> <path>"
 3602 void TProofServ::HandleArchive(TMessage *mess, TString *slb)
 
 3605       Info("HandleArchive", "Enter");
 
 3609    (*mess) >> queryref >> path;
 
 3611    if (slb) slb->Form("%s %s", queryref.Data(), path.Data());
 
 // Special reference: just record the default archive location
 3614    if (queryref == "Default") {
 
 3615       fArchivePath = path;
 
 3616       Info(
"HandleArchive",
 
 3617            "default path set to %s", fArchivePath.Data());
 
 // 'pqm' keeps the instance found by the query manager (null without a manager)
 3623    TProofQueryResult *pqr = fQMgr ? fQMgr->LocateQuery(queryref, qry, qdir) : 0;
 
 3624    TProofQueryResult *pqm = pqr;
 
 // No explicit path in the request: derive one from the default archive path
 3626    if (path.Length() <= 0) {
 
 3627       if (fArchivePath.Length() <= 0) {
 
 3628          Info(
"HandleArchive",
 
 3629               "archive paths are not defined - do nothing");
 
 3633          path.Form(
"%s/session-%s-%d.root",
 
 3634                    fArchivePath.Data(), fTopSessionTag.Data(), qry);
 
 3637          path.ReplaceAll(
":q",
"-");
 
 3638          path.Insert(0, TString::Format(
"%s/",fArchivePath.Data()));
 
 // Query not available in memory: read it from the file in its query directory
 3644    if (!pqr || qry < 0) {
 
 3645       TString fout = qdir;
 
 3646       fout += 
"/query-result.root";
 
 3648       TFile *f = TFile::Open(fout,
"READ");
 
 3652          TIter nxk(f->GetListOfKeys());
 
 3654          while ((k = (TKey *)nxk())) {
 
 3655             if (!strcmp(k->GetClassName(), 
"TProofQueryResult")) {
 
 3656                pqr = (TProofQueryResult *) f->Get(k->GetName());
 
 3664          Info(
"HandleArchive",
 
 3665               "file cannot be open (%s)",fout.Data());
 
 3672       PDB(kGlobal, 1) Info("HandleArchive",
 
 3673                            "archive path for query 
#%d: %s", 
 3676       if (gSystem->AccessPathName(path))
 
 3677          farc = TFile::Open(path,
"NEW");
 
 3679          farc = TFile::Open(path,
"UPDATE");
 
 3680       if (!farc || !(farc->IsOpen())) {
 
 3681          Info(
"HandleArchive",
 
 3682               "archive file cannot be open (%s)",path.Data());
 
 3688       pqr->SetArchived(path);
 
 3690          pqm->SetArchived(path);
 
 3696       if (qry > -1 && fQMgr)
 
 3697          fQMgr->SaveQuery(pqr);
 
 3700       Info(
"HandleArchive",
 
 3701            "results of query %s archived to file %s",
 
 3702            queryref.Data(), path.Data());
 
 ////////////////////////////////////////////////////////////////////////////////
 /// Group the files of the collection 'fc' by the server they are served from.
 /// For each TFileInfo a key "<protocol>://<host FQDN>[:<port>]" is built from
 /// its current URL; keys already present in the map reuse the associated
 /// THashList, new keys are added with a list of their own.
 ///
 /// \param fc    file collection to scan
 /// \param emsg  receives "file collection undefined!" on the error path
 ///              (presumably when 'fc' is null - the test is not visible here)
 3714 TMap *TProofServ::GetDataSetNodeMap(TFileCollection *fc, TString &emsg)
 
 3721       emsg.Form(
"file collection undefined!");
 
 3728    TIter nxf(fc->GetList());
 
 3729    TFileInfo *fiind = 0;
 
 3731    while ((fiind = (TFileInfo *)nxf())) {
 
 3732       TUrl *xurl = fiind->GetCurrentUrl();
 
 // The port is appended to the key only when it is positive
 3734       key.Form(
"%s://%s", xurl->GetProtocol(), xurl->GetHostFQDN());
 
 3735       if (xurl->GetPort() > 0)
 
 3736          key += TString::Format(
":%d", xurl->GetPort());
 
 3740       if ((ent = (TPair *) fcmap->FindObject(key.Data()))) {
 
 3742          l = (THashList *) ent->Value();
 
 3748          fcmap->Add(
new TObjString(key.Data()), l);
 
 ////////////////////////////////////////////////////////////////////////////////
 /// Handle a processing request.
 ///
 /// The data set, selector file name, input list, options, entry range, event
 /// list and sync flag are read from 'mess'. Inside the IsTopMaster() branch
 /// the data set is validated, a query result is created and registered,
 /// workers are obtained, and a kPROOF_QUERYSUBMITTED message is prepared
 /// before the waiting queries are processed.
 ///
 /// The later statements (presumably the branch for all other servers; the
 /// 'else' is not visible in this listing) load the input data, run
 /// fPlayer->Process(), prepare kPROOF_STOPPROCESS and send the output
 /// according to the merging / controlled-output settings.
 ///
 /// \param mess  incoming request
 /// \param slb   if non-null, filled with exit status, entries and bytes read
 3761 void TProofServ::HandleProcess(TMessage *mess, TString *slb)
 
 3764       Info("HandleProcess", "Enter");
 
 3767    if (!IsTopMaster() && !IsIdle())
 
 3771    TString filename, opt;
 
 3773    Long64_t nentries, first;
 
 3774    TEventList *evl = 0;
 
 3775    TEntryList *enl = 0;
 
 // Unpack the request
 3778    (*mess) >> dset >> filename >> input >> opt >> nentries >> first >> evl >> sync;
 
 // More data left in the buffer and protocol above 14
 3780    if ((mess->BufferSize() > mess->Length()) && fProtocol > 14)
 
 3782    Bool_t hasNoData = (!dset || dset->TestBit(TDSet::kEmpty)) ? kTRUE : kFALSE;
 
 // The entry list takes precedence over the event list when both are set
 3785    TObject *elist = (enl) ? (TObject *)enl : (TObject *)evl;
 
 3789    if ((!hasNoData) && elist)
 
 3790       dset->SetEntryList(elist);
 
 3792    if (IsTopMaster()) {
 
 // A data set without elements has to be resolved via the dataset manager
 3796       if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
 
 3797          if (TProof::AssertDataSet(dset, input, fDataSetManager, emsg) != 0) {
 
 3798             SendAsynMessage(TString::Format(
"AssertDataSet on %s: %s",
 
 3799                                  fPrefix.Data(), emsg.Data()));
 
 3800             Error(
"HandleProcess", 
"AssertDataSet: %s", emsg.Data());
 
 3802             if (sync) SendLogFile();
 
 3805       } 
else if (hasNoData) {
 
 // No data set: 'PROOF_FilesToProcess' in the input list may name a dataset,
 // either without ':' or with the "dataset:" prefix
 3807          TNamed *ftp = 
dynamic_cast<TNamed *
>(input->FindObject(
"PROOF_FilesToProcess"));
 
 3809             TString dsn(ftp->GetTitle());
 
 3810             if (!dsn.Contains(
":") || dsn.BeginsWith(
"dataset:")) {
 
 3811                dsn.ReplaceAll(
"dataset:", 
"");
 
 3814                if (!fDataSetManager) {
 
 3815                   emsg.Form(
"dataset manager not initialized!");
 
 3817                   TFileCollection *fc = 0;
 
 3819                   if (!(fc = fDataSetManager->GetDataSet(dsn))) {
 
 3820                      emsg.Form(
"requested dataset '%s' does not exists", dsn.Data());
 
 3822                      TMap *fcmap = GetDataSetNodeMap(fc, emsg);
 
 3826                         fcmap->SetOwner(kTRUE);
 
 3827                         fcmap->SetName(
"PROOF_FilesToProcess");
 
 3832                if (!emsg.IsNull()) {
 
 3833                   SendAsynMessage(TString::Format(
"HandleProcess on %s: %s",
 
 3834                                                   fPrefix.Data(), emsg.Data()));
 
 3835                   Error(
"HandleProcess", 
"%s", emsg.Data());
 
 3837                   if (sync) SendLogFile();
 
 // Create the query result; data set and entry list are attached to the input
 // list, which is handed over to the query and then cleared without deleting
 3844       TProofQueryResult *pq = 0;
 
 3848       pq = MakeQueryResult(nentries, opt, 0, first, 0, filename, 0);
 
 3851       if (dset) input->Add(dset);
 
 3852       if (elist) input->Add(elist);
 
 3853       pq->SetInputList(input, kTRUE);
 
 3856       input->Clear(
"nodelete");
 
 3860       if (TProof::SaveInputData(pq, fCacheDir.Data(), emsg) != 0)
 
 3861          Warning(
"HandleProcess", 
"could not save input data: %s", emsg.Data());
 
 // Draw queries are not registered with the query manager
 3864       if (!(pq->IsDraw())) {
 
 3866             if (fQMgr->Queries()) fQMgr->Queries()->Add(pq);
 
 3868             fQMgr->SaveQuery(pq);
 
 // Worker acquisition: with dynamic startup a list of workers is requested
 // and added to the session, otherwise GetWorkers() is called without a list
 3879       Bool_t enqueued = kFALSE;
 
 3882       if (fProof->UseDynamicStartup()) {
 
 3884          TList* workerList = 
new TList();
 
 3885          EQueryAction retVal = GetWorkers(workerList, pc);
 
 3886          if (retVal == TProofServ::kQueryStop) {
 
 3887             Error(
"HandleProcess", 
"error getting list of worker nodes");
 
 3889             if (sync) SendLogFile();
 
 3891          } 
else if (retVal == TProofServ::kQueryEnqueued) {
 
 3894             Info(
"HandleProcess", 
"query %d enqueued", pq->GetSeqNum());
 
 3896             Int_t ret = fProof->AddWorkers(workerList);
 
 3898                Error(
"HandleProcess", 
"Adding a list of worker nodes returned: %d",
 
 3901                if (sync) SendLogFile();
 
 3906          EQueryAction retVal = GetWorkers(0, pc);
 
 3907          if (retVal == TProofServ::kQueryStop) {
 
 3908             Error(
"HandleProcess", 
"error getting list of worker nodes");
 
 3910             if (sync) SendLogFile();
 
 3912          } 
else if (retVal == TProofServ::kQueryEnqueued) {
 
 3915             Info(
"HandleProcess", 
"query %d enqueued", pq->GetSeqNum());
 
 3916          } 
else if (retVal != TProofServ::kQueryOK) {
 
 3917             Error(
"HandleProcess", 
"unknown return value: %d", retVal);
 
 3919             if (sync) SendLogFile();
 
 // Submission notice: for asynchronous or enqueued queries the message carries
 // the sequence number followed by kFALSE
 3927       TMessage m(kPROOF_QUERYSUBMITTED);
 
 3928       if (!sync || enqueued) {
 
 3929          m << pq->GetSeqNum() << kFALSE;
 
 3936          Info(
"HandleProcess",
 
 3937               "query \"%s:%s\" submitted", pq->GetTitle(), pq->GetName());
 
 // Loop over the waiting queries unless this one was enqueued
 3945       Bool_t doprocess = kFALSE;
 
 3946       while (WaitingQueries() > 0 && !enqueued) {
 
 3951          if (fProof->UseDynamicStartup())
 
 3960       fProof->ResetMergers();
 
 3965       if (!sync) SendLogFile();
 
 3969          m.Reset(kPROOF_SETIDLE);
 
 3970          Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
 
 3978       if (sync) SendLogFile();
 
 // The player is deleted at the end unless the output is still to be collected
 3993       Bool_t deleteplayer = kTRUE;
 
 3997       if (dset && (dset->IsA() == TDSetProxy::Class()))
 
 3998          ((TDSetProxy*)dset)->SetProofServ(
this);
 
 4002       if (TProof::GetInputData(input, fCacheDir.Data(), emsg) != 0)
 
 4003          Warning(
"HandleProcess", 
"could not get input data: %s", emsg.Data());
 
 4006       if (TProof::GetParameter(input, 
"PROOF_QuerySeqNum", fQuerySeqNum) != 0)
 
 4007          Warning(
"HandleProcess", 
"could not get query sequential number!");
 
 // Replace any existing 'PROOF_Ordinal' entries with this server's ordinal
 4011       while ((nord = input->FindObject(
"PROOF_Ordinal")))
 
 4012          input->Remove(nord);
 
 4013       input->Add(
new TNamed(
"PROOF_Ordinal", GetOrdinal()));
 
 4018       while ((o = next())) {
 
 4019          PDB(kGlobal, 2) Info("HandleProcess", "adding: %s", o->GetName());
 
 4020          fPlayer->AddInput(o);
 
 // A TSelector instance found in the input list is used directly, and its
 // class name replaces the selector file name
 4025       TSelector *selector_obj = 0;
 
 4027       while ((obj = nxt())){
 
 4028          if (obj->InheritsFrom(
"TSelector")) {
 
 4029             selector_obj = (TSelector *) obj;
 
 4030             filename = selector_obj->ClassName();
 
 4031             Info(
"HandleProcess", 
"selector obj for '%s' found", selector_obj->ClassName());
 
 4037       fSocket->Send(kPROOF_STARTPROCESS);
 
 4041       fSaveOutput.Reset();
 
 4044       PDB(kGlobal, 1) Info("HandleProcess", "calling %s::Process()", fPlayer->IsA()->GetName());
 
 4047          Info(
"HandleProcess", 
"calling fPlayer->Process() with selector object: %s", selector_obj->ClassName());
 
 4048          fPlayer->Process(dset, selector_obj, opt, nentries, first);
 
 4051          Info(
"HandleProcess", 
"calling fPlayer->Process() with selector name: %s", filename.Data());
 
 4052          fPlayer->Process(dset, filename, opt, nentries, first);
 
 // End-of-processing notice: protocol versions above 18 carry a
 // TProofProgressStatus, the other form carries the number of processed events
 4056       TMessage m(kPROOF_STOPPROCESS);
 
 4057       Bool_t abort = (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted) ? kFALSE : kTRUE;
 
 4058       if (fProtocol > 18) {
 
 4059          TProofProgressStatus* status =
 
 4060             new TProofProgressStatus(fPlayer->GetEventsProcessed(),
 
 4061                                     gPerfStats?gPerfStats->GetBytesRead():0);
 
 4063             m << status << abort;
 
 4065             slb->Form(
"%d %lld %lld", fPlayer->GetExitStatus(),
 
 4066                                       status->GetEntries(), status->GetBytesRead());
 
 4069          m << fPlayer->GetEventsProcessed() << abort;
 
 4071             slb->Form(
"%d %lld -1", fPlayer->GetExitStatus(), fPlayer->GetEventsProcessed());
 
 // NOTE(review): the location string below ("TProofServ::Handleprocess")
 // differs from the "HandleProcess" used everywhere else in this function
 4076          Info("TProofServ::Handleprocess",
 
 4077               "worker %s has finished processing with %d objects in output list",
 
 4078               GetOrdinal(), fPlayer->GetOutputList()->GetEntries());
 
 // The output is usable when processing was not aborted and a list exists
 4085       Bool_t outok = (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted &&
 
 4086                         fPlayer->GetOutputList()) ? kTRUE : kFALSE;
 
 4090          Bool_t isSubMerging = kFALSE;
 
 4094          if (TProof::GetParameter(input, 
"PROOF_UseMergers", nm) == 0) {
 
 4095             isSubMerging = (nm >= 0) ? kTRUE : kFALSE;
 
 // NOTE(review): 'Proof.ControlSendOutput' is read from gEnv twice with the
 // same default; the second read only runs when the input parameter is
 // missing and looks redundant - confirm
 4097          if (!isSubMerging) {
 
 4098             cso = gEnv->GetValue(
"Proof.ControlSendOutput", 1);
 
 4099             if (TProof::GetParameter(input, 
"PROOF_ControlSendOutput", cso) != 0)
 
 4100                cso = gEnv->GetValue(
"Proof.ControlSendOutput", 1);
 
 4107             TMessage msg(kPROOF_SENDOUTPUT);
 
 4114             deleteplayer = kFALSE;
 
 4117                Info("HandleProcess", "controlled mode: worker %s has finished,"
 
 4118                                      " sizes sent to master", fOrdinal.Data());
 
 // Sub-merging is switched off when the kHighMemory bit is set
 4122             if (TestBit(TProofServ::kHighMemory)) {
 
 4124                   Info(
"HandleProcess", 
"submerging disabled because of high-memory case");
 
 4125                isSubMerging = kFALSE;
 
 4127                PDB(kGlobal, 2) Info("HandleProcess", "merging mode check: %d", isSubMerging);
 
 // Worker in sub-merging mode: report the output size together with the local
 // port of a newly opened server socket for merging connections
 4130             if (!IsMaster() && isSubMerging) {
 
 4138                TMessage msg_osize(kPROOF_SUBMERGER);
 
 4139                msg_osize << Int_t(TProof::kOutputSize);
 
 4140                msg_osize << fPlayer->GetOutputList()->GetEntries();
 
 4142                fMergingSocket = 
new TServerSocket(0);
 
 4143                Int_t merge_port = 0;
 
 4144                if (fMergingSocket) {
 
 4146                      Info("HandleProcess", "possible port for merging connections: %d",
 
 4147                                            fMergingSocket->GetLocalPort());
 
 4148                   merge_port = fMergingSocket->GetLocalPort();
 
 4150                msg_osize << merge_port;
 
 4151                fSocket->Send(msg_osize);
 
 4157                deleteplayer = kFALSE;
 
 4159                PDB(kSubmerger, 2) Info("HandleProcess", "worker %s has finished", fOrdinal.Data());
 
 4164                PDB(kGlobal, 2)  Info("HandleProcess", "sending result directly to master");
 
 4165                if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
 
 4166                   Warning("HandleProcess","problems sending output list");
 
 4169                if (IsMaster()) fProof->ResetMergers();
 
 4172                fSocket->Send(kPROOF_SETIDLE);
 
 // No usable output: SendResults() is called without an output list
 4187          if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted)
 
 4188             Warning(
"HandleProcess",
"the output list is empty!");
 
 4189          if (SendResults(fSocket) != 0)
 
 4190             Warning(
"HandleProcess", 
"problems sending output list");
 
 4193          if (IsMaster()) fProof->ResetMergers();
 
 4196          fSocket->Send(kPROOF_SETIDLE);
 
 // Cleanup: selector objects are taken out of the input list, and the
 // player's input list gives up ownership of its content
 4207       while ((obj = nex())) {
 
 4208          if (obj->InheritsFrom(
"TSelector")) input->Remove(obj);
 
 4212       fPlayer->GetInputList()->SetOwner(0);
 
 // Objects registered under 'PROOF_InputObjsFromFile' are removed from the
 // input list; the last entry of that list is looked up as a TFile
 4215       TList *added = 
dynamic_cast<TList *
>(input->FindObject(
"PROOF_InputObjsFromFile"));
 
 4217          if (added->GetSize() > 0) {
 
 4219             TFile *f = 
dynamic_cast<TFile *
>(added->Last());
 
 4223                while ((o = nxo())) { input->Remove(o); }
 
 4224                input->Remove(added);
 
 4225                added->SetOwner(kFALSE);
 
 4237       if (deleteplayer) DeletePlayer();
 
 4240    PDB(kGlobal, 1) Info("HandleProcess", "done");
 
 /// Send the results of a query over `sock`, choosing the wire format from fProtocol.
 ///
 /// Branches visible in this listing:
 ///  - fProtocol > 23 and an output list: objects travel in kPROOF_OUTPUTOBJECT
 ///    messages; the buffer is sent whenever mbuf.Length() exceeds fMsgSizeHWM.
 ///  - fProtocol > 10 and an output list: kPROOF_OUTPUTOBJECT messages, with
 ///    WriteObject() and Send() both issued inside the per-object loop.
 ///  - top master, fProtocol > 6 and an output list: a kPROOF_OUTPUTLIST message
 ///    into which `pq` is written.
 ///  - otherwise: `outlist` is shipped with SendObject(..., kPROOF_OUTPUTLIST).
 /// On a top master, progress text is forwarded with SendAsynMessage().
 ///
 /// \param sock    socket the results are sent on
 /// \param outlist list of output objects; a null pointer selects the last branch
 /// \param pq      query result; in the first two branches it is written and sent
 ///                first when running on a top master
 /// \return -1 as soon as a send on `sock` fails. The success value is returned on
 ///         a line not shown here (callers in this file compare the result to 0).
 4249 Int_t TProofServ::SendResults(TSocket *sock, TList *outlist, TQueryResult *pq)
 
 4251    PDB(kOutput, 2) Info("SendResults", "enter");
 
 4254    if (fProtocol > 23 && outlist) {
 
 4258       TMessage mbuf(kPROOF_OUTPUTOBJECT);
 
 4260       Int_t olsz = outlist->GetSize();
 
 4261       if (IsTopMaster() && pq) {
 
 4262          msg.Form(
"%s: merging output objects ... done                                     ",
 
 4264          SendAsynMessage(msg.Data());
 
 4266          msg.Form(
"%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
 
 4267          SendAsynMessage(msg.Data(), kFALSE);
 
 4270          mbuf.WriteObject(pq);
 
 4271          if (sock->Send(mbuf) < 0) 
return -1;
 
 4274       Int_t ns = 0, np = 0;
 
 4277       Int_t totsz = 0, objsz = 0;
 
 4279       while ((o = nxo())) {
 
          // Flush the buffer once it has grown past the configured size limit
 4280          if (mbuf.Length() > fMsgSizeHWM) {
 
 4283                     "message has %d bytes: limit of %lld bytes reached - sending ...",
 
 4284                     mbuf.Length(), fMsgSizeHWM);
 
             // The reported size is the compressed length when compression is enabled
 4287             if (GetCompressionLevel() > 0) {
 
 4288                mbuf.SetCompressionSettings(fCompressMsg);
 
 4290                objsz = mbuf.CompLength();
 
 4292                objsz = mbuf.Length();
 
 4295             if (IsTopMaster()) {
 
 4296                msg.Form(
"%s: objects merged; sending obj %d/%d (%d bytes)   ",
 
 4297                               fPrefix.Data(), ns, olsz, objsz);
 
 4298                SendAsynMessage(msg.Data(), kFALSE);
 
 4300             if (sock->Send(mbuf) < 0) 
return -1;
 
          // Tag is 2 once ns has reached the list size, 1 otherwise
          // (presumably 2 marks the last object - confirm against the receiver)
 4307          mbuf << (Int_t) ((ns >= olsz) ? 2 : 1);
 
 4313          if (GetCompressionLevel() > 0) {
 
 4314             mbuf.SetCompressionSettings(fCompressMsg);
 
 4316             objsz = mbuf.CompLength();
 
 4318             objsz = mbuf.Length();
 
 4321          if (IsTopMaster()) {
 
 4322             msg.Form(
"%s: objects merged; sending obj %d/%d (%d bytes)     ",
 
 4323                            fPrefix.Data(), ns, olsz, objsz);
 
 4324             SendAsynMessage(msg.Data(), kFALSE);
 
 4326          if (sock->Send(mbuf) < 0) 
return -1;
 
 4328       if (IsTopMaster()) {
 
 4330          msg.Form(
"%s: grand total: sent %d objects, size: %d bytes                            ",
 
 4331                                         fPrefix.Data(), olsz, totsz);
 
 4332          SendAsynMessage(msg.Data());
 
 4334    } 
else if (fProtocol > 10 && outlist) {
 
 4338       TMessage mbuf(kPROOF_OUTPUTOBJECT);
 
 4340       Int_t olsz = outlist->GetSize();
 
 4341       if (IsTopMaster() && pq) {
 
 4342          msg.Form(
"%s: merging output objects ... done                                     ",
 
 4344          SendAsynMessage(msg.Data());
 
 4346          msg.Form(
"%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
 
 4347          SendAsynMessage(msg.Data(), kFALSE);
 
 4350          mbuf.WriteObject(pq);
 
 4351          if (sock->Send(mbuf) < 0) 
return -1;
 
 4355       Int_t totsz = 0, objsz = 0;
 
       // NOTE(review): this branch iterates fPlayer->GetOutputList() while olsz is
       // taken from `outlist` - confirm callers always pass the player's list
 4356       TIter nxo(fPlayer->GetOutputList());
 
 4358       while ((o = nxo())) {
 
 4361          Int_t type = (Int_t) ((ns >= olsz) ? 2 : 1);
 
 4363          mbuf.WriteObject(o);
 
 4366          if (GetCompressionLevel() > 0) {
 
 4367             mbuf.SetCompressionSettings(fCompressMsg);
 
 4369             objsz = mbuf.CompLength();
 
 4371             objsz = mbuf.Length();
 
 4374          if (IsTopMaster()) {
 
 4375             msg.Form(
"%s: objects merged; sending obj %d/%d (%d bytes)   ",
 
 4376                            fPrefix.Data(), ns, olsz, objsz);
 
 4377             SendAsynMessage(msg.Data(), kFALSE);
 
 4379          if (sock->Send(mbuf) < 0) 
return -1;
 
 4382       if (IsTopMaster()) {
 
 4384          msg.Form(
"%s: grand total: sent %d objects, size: %d bytes       ",
 
 4385                                         fPrefix.Data(), olsz, totsz);
 
 4386          SendAsynMessage(msg.Data());
 
 4389    } 
else if (IsTopMaster() && fProtocol > 6 && outlist) {
 
 4392       TMessage mbuf(kPROOF_OUTPUTLIST);
 
 4393       mbuf.WriteObject(pq);
 
 4395       Int_t blen = mbuf.CompLength();
 
 4396       Int_t olsz = outlist->GetSize();
 
 4398       msg.Form(
"%s: sending output: %d objs, %d bytes", fPrefix.Data(), olsz, blen);
 
 4399       SendAsynMessage(msg.Data(), kFALSE);
 
 4400       if (sock->Send(mbuf) < 0) 
return -1;
 
 4404          PDB(kGlobal, 2) Info("SendResults", "sending output list");
 
 4406          PDB(kGlobal, 2) Info("SendResults", "notifying failure or abort");
 
       // Fallback: ship the list (possibly null) as a single kPROOF_OUTPUTLIST object
 4408       if (sock->SendObject(outlist, kPROOF_OUTPUTLIST) < 0) return -1;
 
 4411    PDB(kOutput,2) Info("SendResults", "done");
 
 /// Run the next query and report its outcome.
 ///
 /// How the query `pq` is obtained is not shown in this listing; the error text
 /// "empty waiting queries list!" indicates it comes from the waiting list.
 ///
 /// Steps visible here, in order:
 ///  - read options, input list, entry count, first entry and selector name from
 ///    the query; text after the last '#' in the options is appended to the
 ///    selector file name;
 ///  - require a "TDSet" input object, reporting an error otherwise;
 ///  - remove and re-save the selector implementation and header files;
 ///  - look in the input list for a TSelector object whose class name equals the
 ///    selector name;
 ///  - mark the query as running and prepare a kPROOF_STARTPROCESS message;
 ///  - call fPlayer->Process() with either the selector object or the file name;
 ///  - prepare a kPROOF_STOPPROCESS message when the exit status is not kFinished;
 ///  - register datasets flagged by "PROOFSERV_RegisterDataSet" output objects;
 ///  - finalize and save the query, then send the results with SendResults();
 ///  - drop the query from the manager on abort, or swap it for its light clone;
 ///  - on a master using dynamic startup, remove the workers.
 ///
 /// \param slb if non-null, receives "<exit status> <entries> <bytes> <cpu>" when
 ///            results were sent, or "<exit status> -1 -1 <cpu>" otherwise
 4421 void TProofServ::ProcessNext(TString *slb)
 
 4424    TString filename, opt;
 
 4426    Long64_t nentries = -1, first = 0;
 
 4429    TProofQueryResult *pq = 0;
 
 4432    TSelector* selector_obj = 0;
 
 4446       opt      = pq->GetOptions();
 
 4447       input    = pq->GetInputList();
 
 4448       nentries = pq->GetEntries();
 
 4449       first    = pq->GetFirst();
 
 4450       filename = pq->GetSelecImp()->GetName();
 
       // Anything following the last '#' in the options is appended to the file name
 4451       Ssiz_t 
id = opt.Last(
'#');
 
 4452       if (
id != kNPOS && 
id < opt.Length() - 1) {
 
 4453          filename += opt(
id + 1, opt.Length());
 
 4459       if ((o = pq->GetInputObject(
"TDSet"))) {
 
 4463          Error(
"ProcessNext", 
"no TDset object: cannot continue");
 
       // NOTE(review): the file names below are spliced unquoted into a shell
       // command - confirm they cannot contain shell metacharacters
 4473       if (pq->GetSelecImp()) {
 
 4474          gSystem->Exec(TString::Format(
"%s %s", kRM, pq->GetSelecImp()->GetName()));
 
 4475          pq->GetSelecImp()->SaveSource(pq->GetSelecImp()->GetName());
 
 4477       if (pq->GetSelecHdr() &&
 
 4478           !strstr(pq->GetSelecHdr()->GetName(), 
"TProofDrawHist")) {
 
 4479          gSystem->Exec(TString::Format(
"%s %s", kRM, pq->GetSelecHdr()->GetName()));
 
 4480          pq->GetSelecHdr()->SaveSource(pq->GetSelecHdr()->GetName());
 
       // Look for a selector passed by object: its class name must match the selector name
 4485       while ((obj = nxt())){
 
 4486          if (obj->InheritsFrom(
"TSelector") &&
 
 4487             !strcmp(pq->GetSelecImp()->GetName(), obj->ClassName())) {
 
 4488             selector_obj = (TSelector *) obj;
 
 4489             Info(
"ProcessNext", 
"found object for selector '%s'", obj->ClassName());
 
 4496       Error(
"ProcessNext", 
"empty waiting queries list!");
 
 4501    SetQueryRunning(pq);
 
 4505       if (!(pq->IsDraw()))
 
 4506          fQMgr->SaveQuery(pq);
 
 4508          fQMgr->IncrementDrawQueries();
 
 4513    TMessage m(kPROOF_STARTPROCESS);
 
 4514    m << TString(pq->GetSelecImp()->GetName())
 
 4515      << dset->GetNumOfFiles()
 
 4516      << pq->GetFirst() << pq->GetEntries();
 
 4523    fPlayer->AddQueryResult(pq);
 
 4526    fPlayer->SetCurrentQuery(pq);
 
 4529    if (dset->IsA() == TDSetProxy::Class())
 
 4530       ((TDSetProxy*)dset)->SetProofServ(
this);
 
    // Tag the input list with "<title>:<name>" and the sequence number of the query
 4534    TString qid = TString::Format(
"%s:%s",pq->GetTitle(),pq->GetName());
 
 4535    input->Add(
new TNamed(
"PROOF_QueryTag", qid.Data()));
 
 4537    fQuerySeqNum = pq->GetSeqNum();
 
 4538    input->Add(
new TParameter<Int_t>(
"PROOF_QuerySeqNum", fQuerySeqNum));
 
    // Settings from gEnv apply only when the input list does not already carry PROOF_UseMergers
 4542    if (gEnv->Lookup(
"Proof.UseMergers") && !input->FindObject(
"PROOF_UseMergers")) {
 
 4543       Int_t smg = gEnv->GetValue(
"Proof.UseMergers",-1);
 
 4545          input->Add(
new TParameter<Int_t>(
"PROOF_UseMergers", smg));
 
 4546          PDB(kSubmerger, 2) Info("ProcessNext", "PROOF_UseMergers set to %d", smg);
 
 4547          if (gEnv->Lookup("Proof.MergersByHost")) {
 
 4548             Int_t mbh = gEnv->GetValue(
"Proof.MergersByHost", 0);
 
 4552                if ((o = input->FindObject(
"PROOF_MergersByHost"))) { input->Remove(o); 
delete o; }
 
 4553                input->Add(
new TParameter<Int_t>(
"PROOF_MergersByHost", mbh));
 
 4554                PDB(kSubmerger, 2) Info("ProcessNext", "submergers setup by host/node");
 
 4563    while ((o = next())) {
 
 4564       PDB(kGlobal, 2) Info("ProcessNext", "adding: %s", o->GetName());
 
 4565       fPlayer->AddInput(o);
 
 4569    if ((o = input->FindObject("MissingFiles"))) input->Remove(o);
 
 4572    PDB(kGlobal, 1) Info("ProcessNext", "calling %s::Process()", fPlayer->IsA()->GetName());
 
 4574       Info(
"ProcessNext", 
"calling fPlayer->Process() with selector object: %s", selector_obj->ClassName());
 
 4575       fPlayer->Process(dset, selector_obj, opt, nentries, first);
 
 4578       Info(
"ProcessNext", 
"calling fPlayer->Process() with selector name: %s", filename.Data());
 
 4579       fPlayer->Process(dset, filename, opt, nentries, first);
 
 4583    fPlayer->SetMerging(kFALSE);
 
 4587       (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) ? kTRUE : kFALSE;
 
 4588    if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kFinished) {
 
 4589       m.Reset(kPROOF_STOPPROCESS);
 
       // Payload depends on the peer protocol: status object plus abort flag (> 18),
       // event count plus abort flag (> 8), event count only otherwise
 4591       if (fProtocol > 18) {
 
 4592          TProofProgressStatus* status = fPlayer->GetProgressStatus();
 
 4593          m << status << abort;
 
 4595       } 
else if (fProtocol > 8) {
 
 4596          m << fPlayer->GetEventsProcessed() << abort;
 
 4598          m << fPlayer->GetEventsProcessed();
 
 4604    if (fDataSetManager && fPlayer->GetOutputList()) {
 
 4605       TNamed *psr = (TNamed *) fPlayer->GetOutputList()->FindObject(
"PROOFSERV_RegisterDataSet");
 
 4608          if (RegisterDataSets(input, fPlayer->GetOutputList(), fDataSetManager, emsg) != 0)
 
 4609             Warning(
"ProcessNext", 
"problems registering produced datasets: %s", emsg.Data());
 
 4611             fPlayer->GetOutputList()->Remove(psr);
 
 4613          } 
while ((psr = (TNamed *) fPlayer->GetOutputList()->FindObject(
"PROOFSERV_RegisterDataSet")));
 
 4618    if (fQMgr && !pq->IsDraw()) {
 
 4619       if (!abort) fProof->AskStatistics();
 
 4620       if (fQMgr->FinalizeQuery(pq, fProof, fPlayer))
 
 4621          fQMgr->SaveQuery(pq, fMaxQueries);
 
 4626    if (IsTopMaster() && fPlayer->GetOutputList()) {
 
 4627       Bool_t save = kTRUE;
 
 4628       TIter nxo(fPlayer->GetOutputList());
 
 4630       while ((xo = nxo())) {
 
 4631          if (xo->InheritsFrom(
"TProofOutputFile") && xo->TestBit(TProofOutputFile::kSwapFile)) {
 
 4637          TNamed *nof = (TNamed *) input->FindObject(
"PROOF_DefaultOutputOption");
 
 4639             TString oopt(nof->GetTitle());
 
             // An "of:<path>" option names the output file; the prefix is stripped before use
 4640             if (oopt.BeginsWith(
"of:")) {
 
 4641                oopt.Replace(0, 3, 
"");
 
 4642                if (!oopt.IsNull()) fPlayer->SetOutputFilePath(oopt);
 
 4643                fPlayer->SavePartialResults(kTRUE, kTRUE);
 
 4650    TQueryResult *pqr = pq->CloneInfo();
 
 4652    Info(
"ProcessNext", 
"adding info about dataset '%s' in the light query result", dset->GetName());
 
 4654    TDSet *ds = 
new TDSet(dset->GetName(), dset->GetObjName());
 
 4656    if (pqr) pqr->SetInputList(&rin, kTRUE);
 
 4657    if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted && fPlayer->GetOutputList()) {
 
 4659          Info("ProcessNext", "sending results");
 
       // Peers with protocol > 10 get the light clone when one exists, else the full query result
 4660       TQueryResult *xpq = (pqr && fProtocol > 10) ? pqr : pq;
 
 4661       if (SendResults(fSocket, fPlayer->GetOutputList(), xpq) != 0)
 
 4662          Warning("ProcessNext", "problems sending output list");
 
 4663       if (slb) slb->Form("%d %lld %lld %.3f", fPlayer->GetExitStatus(), pq->GetEntries(),
 
 4664                                               pq->GetBytes(), pq->GetUsedCPU());
 
 4666       if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted)
 
 4667          Warning(
"ProcessNext",
"the output list is empty!");
 
 4668       if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
 
 4669          Warning(
"ProcessNext", 
"problems sending output list");
 
 4670       if (slb) slb->Form(
"%d -1 -1 %.3f", fPlayer->GetExitStatus(), pq->GetUsedCPU());
 
 4674    if (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) {
 
 4676       if (fQMgr) fQMgr->RemoveQuery(pq);
 
       // For non-draw queries the manager keeps the light clone in place of the full result
 4679       if (!(pq->IsDraw()) && pqr) {
 
 4680          if (fQMgr && fQMgr->Queries()) {
 
 4681             fQMgr->Queries()->Add(pqr);
 
 4683             fQMgr->Queries()->Remove(pq);
 
 4688          fPlayer->RemoveQueryResult(TString::Format(
"%s:%s",
 
 4689                                     pq->GetTitle(), pq->GetName()));
 
 4694    if (IsMaster() && fProof->UseDynamicStartup())
 
 4696       fProof->RemoveWorkers(0);
 
 /// Register the datasets found in an output list with a dataset manager.
 ///
 /// Each TFileCollection in `out` is paired with an object named
 /// "DATASET_<collection name>" looked up in the same list; collections without
 /// that companion are skipped. The companion's title supplies the registration
 /// options: ":sortidx:" is removed before the options are passed on, and "V"
 /// asks for verification. Registration is attempted only when `dsm` has
 /// kAllowRegister set and the collection's file list is not empty.
 ///
 /// \param in  input list; checked for null, not otherwise used in the lines shown
 /// \param out output list that is scanned and from which handled objects are removed
 /// \param dsm dataset manager performing the registration
 /// \param msg receives a status text for the registration attempt
 /// \return the return statements are not part of this listing; the caller in
 ///         this file treats a non-zero value as a problem
 4702 Int_t TProofServ::RegisterDataSets(TList *in, TList *out,
 
 4703                                    TDataSetManager *dsm, TString &msg)
 
 4706       ::Info(
"TProofServ::RegisterDataSets",
 
 4707              "enter: %d objs in the output list", (out ? out->GetSize() : -1));
 
 4709    if (!in || !out || !dsm) {
 
 4710       ::Error(
"TProofServ::RegisterDataSets", 
"invalid inputs: %p, %p, %p", in, out, dsm);
 
 4718    while ((o = nxo())) {
 
 4720       TFileCollection *ds = 
dynamic_cast<TFileCollection*
> (o);
 
          // The collection title is overwritten with the local host name
 4723          ds->SetTitle(gSystem->HostName());
 
 4726          TString tag = TString::Format(
"DATASET_%s", ds->GetName());
 
 4727          if (!(fcn = (TNamed *) out->FindObject(tag))) 
continue;
 
 4729          if (tags.FindObject(tag)) {
 
 4734          TString regopt(fcn->GetTitle());
 
 4736          if (regopt.Contains(
":sortidx:")) {
 
 4738             regopt.ReplaceAll(
":sortidx:", 
"");
 
 4741          if (dsm->TestBit(TDataSetManager::kAllowRegister)) {
 
 4743             if (ds->GetList()->GetSize() > 0) {
 
 4745                const char *vfmsg = regopt.Contains(
"V") ? 
" and verifying" : 
"";
 
 4746                msg.Form(
"Registering%s dataset '%s' ... ", vfmsg, ds->GetName());
 
                // Verification is enabled on the manager only for the duration of this
                // call when it was requested but not already allowed
 4748                Bool_t allowVerify = dsm->TestBit(TDataSetManager::kAllowVerify) ? kTRUE : kFALSE;
 
 4749                if (regopt.Contains(
"V") && !allowVerify) dsm->SetBit(TDataSetManager::kAllowVerify);
 
 4751                Int_t rc = dsm->RegisterDataSet(ds->GetName(), ds, regopt);
 
 4753                if (regopt.Contains(
"V") && !allowVerify) dsm->ResetBit(TDataSetManager::kAllowVerify);
 
 4755                   ::Warning(
"TProofServ::RegisterDataSets",
 
 4756                             "failure registering or verifying dataset '%s'", ds->GetName());
 
 4757                   msg.Form(
"Registering%s dataset '%s' ... failed! See log for more details", vfmsg, ds->GetName());
 
 4759                   ::Info(
"TProofServ::RegisterDataSets", 
"dataset '%s' successfully registered%s",
 
 4760                                                          ds->GetName(), (strlen(vfmsg) > 0) ? 
" and verified" : 
"");
 
 4761                   msg.Form(
"Registering%s dataset '%s' ... OK", vfmsg, ds->GetName());
 
 4764                   tags.Add(
new TObjString(tag));
 
 4768                   ::Info(
"TProofServ::RegisterDataSets", 
"printing collection");
 
 4772                ::Warning(
"TProofServ::RegisterDataSets", 
"collection '%s' is empty", o->GetName());
 
 4775             ::Info(
"TProofServ::RegisterDataSets", 
"dataset registration not allowed");
 
    // Cleanup: objects collected in `torm` leave the output list; `torm` then owns them
 4782    while ((o = nxrm())) out->Remove(o);
 
 4783    torm.SetOwner(kTRUE);
 
    // Every object in the output list named like a recorded tag is removed as well
 4786    while((o = nxtg())) {
 
 4788       while ((oo = out->FindObject(o->GetName()))) { out->Remove(oo); }
 
 4790    tags.SetOwner(kTRUE);
 
 4792    PDB(kDataset, 1) ::Info(
"TProofServ::RegisterDataSets", 
"exit");
 
 /// Answer a query-list request.
 ///
 /// Builds a kPROOF_QUERYLIST message carrying the counters `npre` and `ndraw`
 /// followed by the list `ql`. In the lines shown, the query manager is asked to
 /// scan a directory derived from fQueryDir for previous queries; both previous
 /// queries and light clones (CloneInfo) of the current ones get their sequence
 /// number set to the running counter `ntot`. The statements that add entries to
 /// `ql` and that send the message are not part of this listing.
 ///
 /// \param mess incoming request; how it is decoded is not visible here
 4800 void TProofServ::HandleQueryList(TMessage *mess)
 
 4803       Info("HandleQueryList", "Enter");
 
 4808    TList *ql = new TList;
 
 4809    Int_t ntot = 0, npre = 0, ndraw= 0;
 
 4813          TString qdir = fQueryDir;
 
 4814          Int_t idx = qdir.Index(
"session-");
 
 4817          fQMgr->ScanPreviousQueries(qdir);
 
 4819          if (fQMgr->PreviousQueries()) {
 
 4820             TIter nxq(fQMgr->PreviousQueries());
 
 4821             TProofQueryResult *pqr = 0;
 
 4822             while ((pqr = (TProofQueryResult *)nxq())) {
 
 4824                pqr->fSeqNum = ntot;
 
 4831       if (fQMgr->Queries()) {
 
 4833          TIter nxq(fQMgr->Queries());
 
 4834          TProofQueryResult *pqr = 0;
 
 4835          TQueryResult *pqm = 0;
 
 4836          while ((pqr = (TProofQueryResult *)nxq())) {
 
             // Current queries are reported through a light clone; a failed clone only warns
 4838             if ((pqm = pqr->CloneInfo())) {
 
 4839                pqm->fSeqNum = ntot;
 
 4842                Warning(
"HandleQueryList", 
"unable to clone TProofQueryResult '%s:%s'",
 
 4843                        pqr->GetName(), pqr->GetTitle());
 
 4848       ndraw = fQMgr->DrawQueries();
 
 4851    TMessage m(kPROOF_QUERYLIST);
 
 4852    m << npre << ndraw << ql;
 
 /// Handle a request to remove queries.
 ///
 /// The query reference is read from `mess`. Two values are special:
 ///  - "cleanupqueue": the waiting queries are cleaned up and the count is logged;
 ///  - "cleanupdir": the query manager cleans up the queries directories (the
 ///    count is -1 when there is no query manager).
 /// Any other value names a query: the session is locked through the query
 /// manager, the query is removed from the manager and from the waiting list,
 /// and the lock file is unlinked. A failure to lock is logged.
 ///
 /// \param mess incoming request holding the query reference
 /// \param slb  if non-null, receives the query reference
 4863 void TProofServ::HandleRemove(TMessage *mess, TString *slb)
 
 4866       Info("HandleRemove", "Enter");
 
 4869    (*mess) >> queryref;
 
 4871    if (slb) *slb = queryref;
 
 4873    if (queryref == "cleanupqueue") {
 
 4875       Int_t pend = CleanupWaitingQueries();
 
 4877       Info(
"HandleRemove", 
"%d queries removed from the waiting list", pend);
 
 4882    if (queryref == 
"cleanupdir") {
 
 4885       Int_t nd = (fQMgr) ? fQMgr->CleanupQueriesDir() : -1;
 
 4888       Info(
"HandleRemove", 
"%d directories removed", nd);
 
 4895       TProofLockPath *lck = 0;
 
       // LockSession() returning 0 is treated as success
 4896       if (fQMgr->LockSession(queryref, &lck) == 0) {
 
 4900          fQMgr->RemoveQuery(queryref, &qtorm);
 
 4901          CleanupWaitingQueries(kFALSE, &qtorm);
 
 4905             gSystem->Unlink(lck->GetName());
 
 4913       Warning(
"HandleRemove", 
"query result manager undefined!");
 
 4917    Info(
"HandleRemove",
 
 4918         "query %s could not be removed (unable to lock session)", queryref.Data());
 
 /// Handle a request to retrieve a stored query result.
 ///
 /// The query reference is read from `mess` and located through the query
 /// manager. The file "<query dir>/query-result.root" is opened read-only and
 /// its keys are scanned for an object of class TProofQueryResult, which is sent
 /// back as a kPROOF_RETRIEVE object. A null object is sent with the same message
 /// type when the query is not found in the file or the file cannot be opened.
 ///
 /// \param mess incoming request holding the query reference
 /// \param slb  if non-null, receives the query reference
 4927 void TProofServ::HandleRetrieve(TMessage *mess, TString *slb)
 
 4930       Info("HandleRetrieve", "Enter");
 
 4933    (*mess) >> queryref;
 
 4935    if (slb) *slb = queryref;
 
 4940    if (fQMgr) fQMgr->LocateQuery(queryref, qry, qdir);
 
 4942    TString fout = qdir;
 
 4943    fout += "/query-result.root";
 
 4945    TFile *f = TFile::Open(fout,"READ");
 
 4946    TProofQueryResult *pqr = 0;
 
 4949       TIter nxk(f->GetListOfKeys());
 
 4951       while ((k = (TKey *)nxk())) {
 
 4952          if (!strcmp(k->GetClassName(), 
"TProofQueryResult")) {
 
 4953             pqr = (TProofQueryResult *) f->Get(k->GetName());
 
             // For peers with protocol < 13 the TDSet found in the input list is
             // switched to its version-3 write format
 4955             if (pqr && fProtocol < 13) {
 
 4958                TIter nxi(pqr->GetInputList());
 
 4960                   if ((d = dynamic_cast<TDSet *>(o)))
 
 4962                d->SetWriteV3(kTRUE);
 
                // The file size is scaled in steps of 1000 for the message; the unit
                // label is taken from clb
 4967                Float_t qsz = (Float_t) f->GetSize();
 
 4969                static const char *clb[4] = { 
"bytes", 
"KB", 
"MB", 
"GB" };
 
 4970                while (qsz > 1000. && ilb < 3) {
 
 4974                SendAsynMessage(TString::Format(
"%s: sending result of %s:%s (%.1f %s)",
 
 4975                                                fPrefix.Data(), pqr->GetTitle(), pqr->GetName(),
 
 4977                fSocket->SendObject(pqr, kPROOF_RETRIEVE);
 
 4979                Info(
"HandleRetrieve",
 
 4980                     "query not found in file %s",fout.Data());
 
 4982                fSocket->SendObject(0, kPROOF_RETRIEVE);
 
 4990       Info(
"HandleRetrieve",
 
 4991            "file cannot be open (%s)",fout.Data());
 
 4993       fSocket->SendObject(0, kPROOF_RETRIEVE);
 
 /// Add or remove entries of the dynamic library path or of the include path.
 ///
 /// The message supplies an action type ("lib" or "inc"; anything else is
 /// reported as an error), an add/remove flag and a path string; a further value
 /// `rc` is read when the message holds more data. Commas in the path are turned
 /// into spaces and the result is split on spaces.
 ///
 /// Adding: each entry is expanded and, if readable, put into the dynamic path
 /// or appended to the include path as "-I<dir>" unless already contained.
 /// Removing: the entry text ("<dir>:" or "-I<dir>") is deleted from the current
 /// path string, which is then set again.
 /// The request is also passed on through fProof; the condition guarding that
 /// forwarding is not part of this listing.
 ///
 /// \param mess incoming request
 /// \return the return statements are not part of this listing
 5004 Int_t TProofServ::HandleLibIncPath(TMessage *mess)
 
 5010    (*mess) >> type >> add >> path;
 
 5011    if (mess->BufferSize() > mess->Length()) (*mess) >> rc;
 
 5014    if ((type != 
"lib") && (type != 
"inc")) {
 
 5015       Error(
"HandleLibIncPath",
"unknown action type: %s", type.Data());
 
 5020    path.ReplaceAll(
",",
" ");
 
    // A path of "-" (or an empty one) is not tokenized
 5024    if (path.Length() > 0 && path != 
"-") {
 
 5025       if (!(op = path.Tokenize(
" "))) {
 
 5026          Error(
"HandleLibIncPath",
"decomposing path %s", path.Data());
 
 5033       if (type == 
"lib") {
 
          // Entries are visited last to first
 5036          TIter nxl(op, kIterBackward);
 
 5037          TObjString *lib = 0;
 
 5038          while ((lib = (TObjString *) nxl())) {
 
 5040             TString xlib = lib->GetName();
 
 5041             gSystem->ExpandPathName(xlib);
 
 5043             if (!gSystem->AccessPathName(xlib, kReadPermission)) {
 
 5044                TString newlibpath = gSystem->GetDynamicPath();
 
 5047                if (newlibpath.BeginsWith(
".:"))
 
                // NOTE(review): this is a substring test, so a directory whose name is
                // contained in an existing entry is treated as already present - confirm intended
 5049                if (newlibpath.Index(xlib) == kNPOS) {
 
 5050                   newlibpath.Insert(pos,TString::Format(
"%s:", xlib.Data()));
 
 5051                   gSystem->SetDynamicPath(newlibpath);
 
 5054                Info(
"HandleLibIncPath",
 
 5055                     "libpath %s does not exist or cannot be read - not added", xlib.Data());
 
 5061             fProof->AddDynamicPath(path);
 
 5067          TObjString *inc = 0;
 
 5068          while ((inc = (TObjString *) nxi())) {
 
 5070             TString xinc = inc->GetName();
 
 5071             gSystem->ExpandPathName(xinc);
 
 5073             if (!gSystem->AccessPathName(xinc, kReadPermission)) {
 
 5074                TString curincpath = gSystem->GetIncludePath();
 
 5075                if (curincpath.Index(xinc) == kNPOS)
 
 5076                   gSystem->AddIncludePath(TString::Format(
"-I%s", xinc.Data()));
 
 5078                Info(
"HandleLibIncPath",
 
 5079                     "incpath %s does not exist or cannot be read - not added", xinc.Data());
 
 5084             fProof->AddIncludePath(path);
 
 5090       if (type == 
"lib") {
 
 5094          TObjString *lib = 0;
 
 5095          while ((lib = (TObjString *) nxl())) {
 
 5097             TString xlib = lib->GetName();
 
 5098             gSystem->ExpandPathName(xlib);
 
 5100             TString newlibpath = gSystem->GetDynamicPath();
 
 5101             newlibpath.ReplaceAll(TString::Format(
"%s:", xlib.Data()),
"");
 
 5102             gSystem->SetDynamicPath(newlibpath);
 
 5107             fProof->RemoveDynamicPath(path);
 
 5113          TObjString *inc = 0;
 
 5114          while ((inc = (TObjString *) nxi())) {
 
 5115             TString newincpath = gSystem->GetIncludePath();
 
 5116             newincpath.ReplaceAll(TString::Format(
"-I%s", inc->GetName()),
"");
 
             // The interpreter's own include path is stripped from the string before it is set again
 5118             newincpath.ReplaceAll(gInterpreter->GetIncludePath(),
"");
 
 5119             gSystem->SetIncludePath(newincpath);
 
 5124             fProof->RemoveIncludePath(path);
 
 /// Check whether a file or package is already present on this node.
 ///
 /// A file name and an MD5 checksum are read from `mess`; the first character of
 /// the name selects the case:
 ///  - "-": package archive. The leading '-' is stripped and the package name is
 ///    the file name without its last 4 characters. If the checksum known to the
 ///    package manager matches, the package is optionally cleaned (kRemoveOld)
 ///    and unpacked; on a master the package is then uploaded through fProof.
 ///  - "+" or "=": package check. The first character is dropped; files of web or
 ///    network type are installed through the package manager first. The
 ///    checksum read by the package manager is compared with the one received.
 ///  - otherwise: plain file in the cache directory, compared by file checksum.
 /// The answer is a kPROOF_CHECKFILE message; for peers with protocol <= 19 it is
 /// reset to kPROOF_FATAL at the places shown below. The status value written
 /// into the reply is not part of this listing.
 ///
 /// \param mess incoming request
 /// \param slb  if non-null, receives the file name as received (prefix included)
 5134 void TProofServ::HandleCheckFile(TMessage *mess, TString *slb)
 
 5138    UInt_t  opt = TProof::kUntar;
 
 5140    TMessage reply(kPROOF_CHECKFILE);
 
 5143    (*mess) >> filenam >> md5;
 
    // The statement guarded by this condition is missing from the listing
    // (presumably it reads `opt` from the message - confirm)
 5144    if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8))
 
 5147    if (slb) *slb = filenam;
 
 5149    if (filenam.BeginsWith(
"-")) {
 
 5153       Bool_t err = kFALSE;
 
 5154       filenam = filenam.Strip(TString::kLeading, 
'-');
 
 5155       TString packnam = filenam;
 
       // Drops the last 4 characters (presumably the ".par" extension - confirm)
 5156       packnam.Remove(packnam.Length() - 4);  
 
 5158       TMD5 *md5local = fPackMgr->GetMD5(packnam);
 
 5159       if (md5local && md5 == (*md5local)) {
 
 5160          if ((opt & TProof::kRemoveOld)) {
 
 5161             if ((st = fPackMgr->Clean(packnam)))
 
 5162                Error(
"HandleCheckFile", 
"failure cleaning %s", packnam.Data());
 
 5165          st = fPackMgr->Unpack(packnam, md5local);
 
 5170                Info("HandleCheckFile",
 
 5171                     "package %s installed on node", filenam.Data());
 
 5174                Error(
"HandleCheckFile", 
"gunzip not found");
 
 5176                Error(
"HandleCheckFile", 
"package %s did not unpack into %s",
 
 5177                                         filenam.Data(), packnam.Data());
 
 5180             if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
 
 5185          if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
 
 5188             Error("HandleCheckFile",
 
 5189                   "package %s not yet on node", filenam.Data());
 
 5200          fPackMgr->Remove(filenam);
 
 5201       } 
else if (IsMaster()) {
 
 5204          fPackMgr->GetParPath(filenam, parpath);
 
 5205          if (fProof->UploadPackage(parpath,
 
 5206                                   (TProof::EUploadPackageOpt)opt) != 0)
 
 5207             Info(
"HandleCheckFile",
 
 5208                   "problems uploading package %s", parpath.Data());
 
 5211       fSocket->Send(reply);
 
 5213    } 
else if (filenam.BeginsWith(
"+") || filenam.BeginsWith(
"=")) {
 
 5214       filenam.Remove(0,1);
 
 5216       TString parname = filenam;
 
       // Web and network files are installed through the package manager and are
       // known afterwards by their base name
 5218       TFile::EFileType ft = TFile::GetType(filenam);
 
 5219       if (ft == TFile::kWeb || ft == TFile::kNet) {
 
 5220          parname = gSystem->BaseName(filenam);
 
 5221          if (fPackMgr->Install(filenam) < 0) {
 
 5222             Warning(
"HandleCheckFile",
 
 5223                      "problems installing package %s", filenam.Data());
 
 5229       TMD5 *md5local = fPackMgr->ReadMD5(parname);
 
 5231       if (md5local && md5 == (*md5local)) {
 
 5235             Info("HandleCheckFile",
 
 5236                  "package %s already on node", parname.Data());
 
 5239             TString par = filenam;
 
 5240             if (ft != TFile::kWeb && ft != TFile::kNet) {
 
 5241                xrc = fPackMgr->GetParPath(filenam, par);
 
 5244                if (fProof->UploadPackage(par) != 0)
 
 5245                   Warning(
"HandleCheckFile",
 
 5246                           "problems uploading package %s", par.Data());
 
 5252          if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
 
 5254             Info("HandleCheckFile",
 
 5255                  "package %s not yet on node", filenam.Data());
 
 5258       fSocket->Send(reply);
 
       // Plain file: compare against the checksum of the copy in the cache directory
 5262       TString cachef = fCacheDir + 
"/" + filenam;
 
 5264       TMD5 *md5local = TMD5::FileChecksum(cachef);
 
 5266       if (md5local && md5 == (*md5local)) {
 
 5269             Info("HandleCheckFile", "file %s already on node", filenam.Data());
 
 5272          if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
 
 5274             Info("HandleCheckFile", "file %s not yet on node", filenam.Data());
 
 5277       fSocket->Send(reply);
 
       // The matching lock acquisition is not part of this listing
 5278       fCacheLock->Unlock();
 
 // Handle a cache / package management request carried by 'mess'.
 // The visible code dispatches on TProof operation codes (kShowCache,
 // kClearCache, the package show/clear/build/load/unload/disable operations,
 // the package listings and kLoadMacro). On a master most operations are also
 // forwarded to the workers through fProof. When 'slb' is non-null it is
 // filled, per operation, with a short text summary (operation type plus its
 // main argument and/or status).
 // NOTE(review): this listing omits intermediate source lines (the embedded
 // line numbers are not contiguous); the comments below describe only the
 // statements that are visible.
 5285 Int_t TProofServ::HandleCache(TMessage *mess, TString *slb)
 
 5288       Info("HandleCache", "Enter");
 
 // Initial values; chkveropt, for instance, is overwritten only when the
 // message carries it (see kBuildPackage / kBuildSubPackage).
 5292    Bool_t all = kFALSE;
 
 5294    Bool_t fromglobal = kFALSE;
 
 5295    Int_t chkveropt = TPackMgr::kCheckROOT;  
 
 5296    TPackMgr *packmgr = 0;
 
 // Node tag used as prefix of the asynchronous messages sent below:
 // "Mst-<ordinal>" on a master, "Wrk-<ordinal>" otherwise.
 5300    const 
char *k = (IsMaster()) ? "Mst" : "Wrk";
 
 5301    noth.Form("%s-%s", k, fOrdinal.Data());
 
 5304    TString packagedir, package, pdir, ocwd, file;
 
 // kShowCache: print a header and list the cache directory through the kLS
 // command. Both an "-a" and a plain variant are visible; the condition that
 // selects between them is not shown in this listing. A master asked for
 // 'all' also shows the caches of its workers.
 5307       case TProof::kShowCache:
 
 5309          printf(
"*** File cache %s:%s ***\n", gSystem->HostName(),
 
 5313             gSystem->Exec(TString::Format(
"%s -a %s", kLS, fCacheDir.Data()));
 
 5315             gSystem->Exec(TString::Format(
"%s %s", kLS, fCacheDir.Data()));
 
 5317          if (IsMaster() && all)
 
 5318             fProof->ShowCache(all);
 
 5320          if (slb) slb->Form(
"%d %d", type, all);
 
 // kClearCache: remove either the whole cache content (empty name or the
 // wildcard name) or one single file, through the kRM command.
 // The file name is read only when BufferSize() exceeds Length(), i.e.
 // presumably when the message still has unread payload.
 // NOTE(review): 'file' comes from the message and is pasted unquoted into a
 // command line handed to gSystem->Exec - confirm it is sanitized upstream.
 5322       case TProof::kClearCache:
 
 5324          if ((mess->BufferSize() > mess->Length())) (*mess) >> file;
 
 5326          if (file.IsNull() || file == 
"*") {
 
 5327             gSystem->Exec(TString::Format(
"%s %s/* %s/.*.binversion", kRM, fCacheDir.Data(), fCacheDir.Data()));
 
 5329             gSystem->Exec(TString::Format(
"%s %s/%s", kRM, fCacheDir.Data(), file.Data()));
 
 // The matching lock acquisition is not visible in this listing.
 5331          fCacheLock->Unlock();
 
 5333             fProof->ClearCache(file);
 
 5334          if (slb) slb->Form(
"%d %s", type, file.Data());
 
 // kShowPackages: a master asked for 'all' also shows the workers' packages.
 5336       case TProof::kShowPackages:
 
 5339          if (IsMaster() && all)
 
 5340             fProof->ShowPackages(all);
 
 5342          if (slb) slb->Form(
"%d %d", type, all);
 
 // kClearPackages: unload every package (Unload(0)); the forwarding to the
 // workers sits inside the success branch.
 5344       case TProof::kClearPackages:
 
 5345          if ((status = fPackMgr->Unload(0)) == 0) {
 
 5348                status = fProof->ClearPackages();
 
 5350          if (slb) slb->Form(
"%d %d", type, status);
 
 // kClearPackage: unload one package and, if that succeeded, remove it
 // locally before clearing it through fProof.
 5352       case TProof::kClearPackage:
 
 5354          if ((status = fPackMgr->Unload(package)) == 0) {
 
 5355             fPackMgr->Remove(package);
 
 5357                status = fProof->ClearPackage(package);
 
 5359          if (slb) slb->Form(
"%d %s %d", type, package.Data(), status);
 
 // kBuildPackage: locate the manager holding the package, forward the PAR
 // file to the workers when needed, build locally and collect the workers'
 // build results.
 5361       case TProof::kBuildPackage:
 
 // Optional version-check option; keeps TPackMgr::kCheckROOT when absent.
 5363          if ((mess->BufferSize() > mess->Length())) (*mess) >> chkveropt;
 
 5365          if (!(packmgr = TPackMgr::GetPackMgr(package.Data(), fPackMgr))) {
 
 5367             SendAsynMessage(TString::Format(
"%s: kBuildPackage: failure locating %s ...",
 
 5368                             noth.Data(), package.Data()));
 
 // The package is "global" when it was found in a manager other than fPackMgr.
 5372          fromglobal = (packmgr == fPackMgr) ? kFALSE : kTRUE;
 
 5373          packagedir = packmgr->GetTitle();
 
 // Only a master holding the package in its own manager uploads it to the workers.
 5375          if (IsMaster() && !fromglobal) {
 
 5378             Int_t xrc = packmgr->GetParPath(package, par);
 
 5379             if (xrc != 0 || fProof->UploadPackage(par) != 0) {
 
 5380                Warning(
"HandleCache",
 
 5381                        "kBuildPackage: problems forwarding package %s to workers", package.Data());
 
 5382                SendAsynMessage(TString::Format(
"%s: kBuildPackage: problems forwarding package %s to workers ...",
 
 5383                                        noth.Data(), package.Data()));
 
 5391                     "kBuildPackage: package %s exists and has PROOF-INF directory", package.Data());
 
 // Start the build on the workers without waiting for its completion.
 5395                fProof->BuildPackage(package, TProof::kBuildOnSlavesNoWait);
 
 // Local build.
 5398             status = packmgr->Build(package.Data(), chkveropt);
 
 5403             SendAsynMessage(TString::Format(
"%s: failure building %s ... (status: %d)", noth.Data(), package.Data(), status));
 
 // Collect the results of the build started on the workers above.
 5407                status = fProof->BuildPackage(package, TProof::kCollectBuildResults);
 
 5409                Info("HandleCache", "package %s successfully built", package.Data());
 
 5411          if (slb) slb->Form("%d %s %d %d", type, package.Data(), status, chkveropt);
 
 // kLoadPackage: load the package locally, optionally with a list of
 // arguments (optls), then forward the load request through fProof with or
 // without that list.
 5414       case TProof::kLoadPackage:
 
 5418          if ((mess->BufferSize() > mess->Length())) (*mess) >> optls;
 
 5420          if ((status = fPackMgr->Load(package.Data(), optls)) < 0) {
 
 5423             SendAsynMessage(TString::Format(
"%s: failure loading %s, args: %p (%d) ...",
 
 5424                                                  noth.Data(), package.Data(), optls,
 
 5425                                                  (optls && optls->GetSize() > 0) ? optls->GetSize() : 0));
 
 5430                if (optls && optls->GetSize() > 0) {
 
 5432                   status = fProof->LoadPackage(package, kFALSE, optls);
 
 5435                   status = fProof->LoadPackage(package);
 
 5440                Info("HandleCache", "package %s successfully loaded", package.Data());
 
 5443          if (slb) slb->Form("%d %s %d", type, package.Data(), status);
 
 // kShowEnabledPackages: show the locally enabled packages; the title names
 // role, ordinal and host unless this is a master not asked for 'all'.
 5446       case TProof::kShowEnabledPackages:
 
 5448          {  TString title(
"*** Enabled packages ***");
 
 5449             if (!IsMaster() || all) {
 
 5450                title.Form(
"*** Enabled packages on %s %s on %s",
 
 5451                           (IsMaster()) ? 
"master" : 
"worker",
 
 5452                           fOrdinal.Data(), gSystem->HostName());
 
 5454             fPackMgr->ShowEnabled(title);
 
 5456          if (IsMaster() && all)
 
 5457             fProof->ShowEnabledPackages(all);
 
 5459          if (slb) slb->Form(
"%d %d", type, all);
 
 // The k...Sub... operations below only show calls forwarded to fProof; no
 // local action is visible for them in this listing.
 5461       case TProof::kShowSubCache:
 
 5463          if (IsMaster() && all)
 
 5464             fProof->ShowCache(all);
 
 5466          if (slb) slb->Form(
"%d %d", type, all);
 
 5468       case TProof::kClearSubCache:
 
 5470          if ((mess->BufferSize() > mess->Length())) (*mess) >> file;
 
 5472             fProof->ClearCache(file);
 
 5473          if (slb) slb->Form(
"%d %s", type, file.Data());
 
 5475       case TProof::kShowSubPackages:
 
 5477          if (IsMaster() && all)
 
 5478             fProof->ShowPackages(all);
 
 5480          if (slb) slb->Form(
"%d %d", type, all);
 
 5482       case TProof::kDisableSubPackages:
 
 5484             fProof->DisablePackages();
 
 5485          if (slb) slb->Form(
"%d", type);
 
 5487       case TProof::kDisableSubPackage:
 
 5490             fProof->DisablePackage(package);
 
 5491          if (slb) slb->Form(
"%d %s", type, package.Data());
 
 5493       case TProof::kBuildSubPackage:
 
 5495          if ((mess->BufferSize() > mess->Length())) (*mess) >> chkveropt;
 
 5497             fProof->BuildPackage(package, TProof::kBuildAll, chkveropt);
 
 5498          if (slb) slb->Form(
"%d %s %d", type, package.Data(), chkveropt);
 
 // kUnloadPackage: unload locally first; a master forwards the request only
 // if the local unload succeeded.
 5500       case TProof::kUnloadPackage:
 
 5502          status = fPackMgr->Unload(package);
 
 5503          if (IsMaster() && status == 0)
 
 5504             status = fProof->UnloadPackage(package);
 
 5505          if (slb) slb->Form(
"%d %s %d", type, package.Data(), status);
 
 // kDisablePackage: remove the package locally, then disable it through fProof.
 5507       case TProof::kDisablePackage:
 
 5509          fPackMgr->Remove(package);
 
 5511             fProof->DisablePackage(package);
 
 5512          if (slb) slb->Form(
"%d %s", type, package.Data());
 
 // kUnloadPackages: same as kUnloadPackage but for every package (Unload(0)).
 // NOTE(review): no read of 'package' is visible for this case, so the
 // summary below presumably carries an empty name - confirm.
 5514       case TProof::kUnloadPackages:
 
 5515          status = fPackMgr->Unload(0);
 
 5516          if (IsMaster() && status == 0)
 
 5517             status = fProof->UnloadPackages();
 
 5518          if (slb) slb->Form(
"%d %s %d", type, package.Data(), status);
 
 5520       case TProof::kDisablePackages:
 
 5523             fProof->DisablePackages();
 
 5524          if (slb) slb->Form(
"%d %s", type, package.Data());
 
 // kListEnabledPackages: prepare a kPROOF_PACKAGE_LIST reply from the list
 // of enabled packages (the sending statements are not visible here).
 5526       case TProof::kListEnabledPackages:
 
 5527          msg.Reset(kPROOF_PACKAGE_LIST);
 
 5528          { TList *epl = fPackMgr->GetListOfEnabled();
 
 5534          if (slb) slb->Form(
"%d", type);
 
 // kListPackages: stream the operation type and the package list into a
 // kPROOF_PACKAGE_LIST message; the list is then made owner of its entries,
 // presumably so that deleting it also deletes them.
 5536       case TProof::kListPackages:
 
 5538             TList *pack = fPackMgr->GetList();
 
 5539             msg.Reset(kPROOF_PACKAGE_LIST);
 
 5540             msg << type << pack;
 
 5542             pack->SetOwner(kTRUE);
 
 5545          if (slb) slb->Form(
"%d", type);
 
 // kLoadMacro: load a macro from the cache directory. Two fProof->Load calls
 // with different flags are visible around the local load; the conditions
 // guarding them are not shown.
 5547       case TProof::kLoadMacro:
 
 5554                fProof->Load(package, kFALSE, kTRUE);
 
 // Work from inside the cache directory, remembering where we were.
 5559             TString originalCwd = gSystem->WorkingDirectory();
 
 5560             gSystem->ChangeDirectory(fCacheDir.Data());
 
 // Only the part of the name before the first comma is passed to ".L".
 5563             TString pack(package);
 
 5565             if ((from = pack.Index(
",")) != kNPOS) pack.Remove(from);
 
 5566             Info(
"HandleCache", 
"loading macro %s ...", pack.Data());
 
 5567             gROOT->ProcessLine(TString::Format(
".L %s", pack.Data()));
 
 // Restore the working directory and release the cache lock.
 5570             gSystem->ChangeDirectory(originalCwd.Data());
 
 5571             fCacheLock->Unlock();
 
 5576                fProof->Load(package, kFALSE, kFALSE);
 
 5581             if (slb) slb->Form(
"%d %s", type, package.Data());
 
 // Any other operation code is reported as an error.
 5585          Error(
"HandleCache", 
"unknown type %d", type);
 
 // Handle a request to activate or deactivate workers.
 // The worker selector 'ord' is acted upon only if it is the wildcard, starts
 // with this node's ordinal, or is the keyword "restore"; otherwise the case
 // is left immediately. 'rc' starts at 0 and is set to a negative value when
 // the resulting number of active workers is not the expected one.
 // NOTE(review): this listing omits intermediate source lines; comments
 // describe only the visible statements. The log calls below use the
 // location string "HandleWorkerList" (without the final 's').
 5596 Int_t TProofServ::HandleWorkerLists(TMessage *mess)
 
 5599       Info("HandleWorkerLists", "Enter");
 
 5601    Int_t type = 0, rc = 0;
 
 5607       case TProof::kActivateWorker:
 
 // Request not addressed to this node: nothing to do.
 5609          if (ord != 
"*" && !ord.BeginsWith(GetOrdinal()) && ord != 
"restore") 
break;
 
 // nact: workers currently active; nactmax: all known workers minus the bad ones.
 5611             Int_t nact = fProof->GetListOfActiveSlaves()->GetSize();
 
 5612             Int_t nactmax = fProof->GetListOfSlaves()->GetSize() -
 
 5613                             fProof->GetListOfBadSlaves()->GetSize();
 
 // Act if some worker is still inactive, or always when this is not an end master.
 5614             if (nact < nactmax || !IsEndMaster()) {
 
 5615                Int_t nwc = fProof->ActivateWorker(ord);
 
 5616                Int_t nactnew = fProof->GetListOfActiveSlaves()->GetSize();
 
 // Check the outcome against the new number of active workers.
 5618                   if (nactnew == nactmax) {
 
 5619                      PDB(kGlobal, 1) Info("HandleWorkerList", "all workers (re-)activated");
 
 5622                         PDB(kGlobal, 1) Info("HandleWorkerList", "%d workers could not be (re-)activated", nactmax - nactnew);
 
 5624                } else if (ord == "restore") {
 
 5626                      PDB(kGlobal, 1) Info("HandleWorkerList","active worker(s) restored");
 
 5628                      Error(
"HandleWorkerList", 
"some active worker(s) could not be restored; check logs");
 
 // Specific worker(s): expect exactly nwc more active workers than before.
 5631                   if (nactnew == (nact + nwc)) {
 
 5633                         PDB(kGlobal, 1) Info("HandleWorkerList","worker(s) %s (re-)activated", ord.Data());
 
 // An nwc of -2 is deliberately not reported as an error here.
 5635                      if (nwc != -2 && IsEndMaster()) {
 
 5636                         Error(
"HandleWorkerList", 
"some worker(s) could not be (re-)activated;" 
 5637                                                   " # of actives: %d --> %d (nwc: %d)",
 
 5638                                                   nact, nactnew, nwc);
 
 // Propagate a negative code from ActivateWorker, otherwise flag failure with -1.
 5640                      rc = (nwc < 0) ? nwc : -1;
 
 5644                PDB(kGlobal, 1) Info("HandleWorkerList","all workers are already active");
 
 5647             Warning(
"HandleWorkerList",
"undefined PROOF session: protocol error?");
 
 // Deactivation mirrors the activation logic above.
 5650       case TProof::kDeactivateWorker:
 
 5652          if (ord != 
"*" && !ord.BeginsWith(GetOrdinal()) && ord != 
"restore") 
break;
 
 5654             Int_t nact = fProof->GetListOfActiveSlaves()->GetSize();
 
 5656                Int_t nwc = fProof->DeactivateWorker(ord);
 
 5657                Int_t nactnew = fProof->GetListOfActiveSlaves()->GetSize();
 
 5660                      PDB(kGlobal, 1) Info("HandleWorkerList","all workers deactivated");
 
 5663                         PDB(kGlobal, 1) Info("HandleWorkerList","%d workers could not be deactivated", nactnew);
 
 // Specific worker(s): expect exactly nwc fewer active workers than before.
 5666                   if (nactnew == (nact - nwc)) {
 
 5668                         PDB(kGlobal, 1) Info("HandleWorkerList","worker(s) %s deactivated", ord.Data());
 
 5670                      if (nwc != -2 && IsEndMaster()) {
 
 5671                         Error(
"HandleWorkerList", 
"some worker(s) could not be deactivated:" 
 5672                                                   " # of actives: %d --> %d (nwc: %d)",
 
 5673                                                   nact, nactnew, nwc);
 
 5675                      rc = (nwc < 0) ? nwc : -1;
 
 5679                PDB(kGlobal, 1) Info("HandleWorkerList","all workers are already inactive");
 
 5682             Warning(
"HandleWorkerList",
"undefined PROOF session: protocol error?");
 
 // Any other action code only produces a warning.
 5686          Warning(
"HandleWorkerList",
"unknown action type (%d)", type);
 
 // Fill 'workers' with the nodes described by the static PROOF configuration
 // file: the submasters if any are defined, otherwise the workers. Each entry
 // added is a fresh copy (new TProofNodeInfo) of the node description.
 // NOTE(review): this listing omits intermediate source lines, including the
 // rest of the signature and the return statements.
 5697 TProofServ::EQueryAction TProofServ::GetWorkers(TList *workers,
 
 // Parse the configuration found via fConfDir / fConfFile.
 // NOTE(review): no release of 'resources' is visible in this listing - confirm.
 5702    TProofResourcesStatic *resources =
 
 5703       new TProofResourcesStatic(fConfDir, fConfFile);
 
 // Remember the name of the file reported by the parser.
 5704    fConfFile = resources->GetFileName(); 
 
 5706          Info("GetWorkers", "using PROOF config file: %s", fConfFile.Data());
 
 // A master description is looked up; the message below belongs to the
 // branch handling its absence.
 5709    TProofNodeInfo *master = resources->GetMaster();
 
 5713               "no appropriate master line found in %s", fConfFile.Data());
 
 // Adopt the image of the master entry only if none has been set yet.
 5717       if (fImage.IsNull() && strlen(master->GetImage()) > 0)
 
 5718          fImage = master->GetImage();
 
 // Submasters take precedence over workers.
 5723       if (resources->GetSubmasters() && resources->GetSubmasters()->GetSize() > 0) {
 
 5725             resources->GetSubmasters()->Print();
 
 5726          TProofNodeInfo *ni = 0;
 
 5727          TIter nw(resources->GetSubmasters());
 
 5728          while ((ni = (TProofNodeInfo *) nw()))
 
 5729             workers->Add(new TProofNodeInfo(*ni));
 
 5730       } else if (resources->GetWorkers() && resources->GetWorkers()->GetSize() > 0) {
 
 5732             resources->GetWorkers()->Print();
 
 5733          TProofNodeInfo *ni = 0;
 
 5734          TIter nw(resources->GetWorkers());
 
 5735          while ((ni = (TProofNodeInfo *) nw()))
 
 5736             workers->Add(new TProofNodeInfo(*ni));
 
 // Set the stream the PROOF error handler writes to. A null 'ferr' selects
 // stderr. The previous stream is saved in 'oldferr', presumably to be
 // returned to the caller (the return statement is not visible in this listing).
 5749 FILE *TProofServ::SetErrorHandlerFile(FILE *ferr)
 
 5751    FILE *oldferr = fgErrorHandlerFile;
 
 5752    fgErrorHandlerFile = (ferr) ? ferr : stderr;
 
 // Error handler used by the PROOF server. From the visible statements it:
 //  - initializes gErrorIgnoreLevel from the "Root.ErrorIgnoreLevel" setting
 //    the first time it finds it unset;
 //  - ignores messages whose level is below gErrorIgnoreLevel;
 //  - asks gProofServ to log to the master for levels >= kError;
 //  - writes a formatted line to fgErrorHandlerFile and, optionally, to syslog;
 //  - keeps a copy of the last message in __crashreporter_info__;
 //  - on the abort path notifies the peer with kPROOF_FATAL and prints a stack trace.
 // NOTE(review): this listing omits intermediate source lines (rest of the
 // signature, several assignments and conditions); comments describe only
 // the visible statements.
 5760 void TProofServ::ErrorHandler(Int_t level, Bool_t abort, 
const char *location,
 
 // One-time initialization of the ignore level; the name comparison is case-insensitive.
 5763    if (gErrorIgnoreLevel == kUnset) {
 
 5764       gErrorIgnoreLevel = 0;
 
 5766          TString lvl = gEnv->GetValue(
"Root.ErrorIgnoreLevel", 
"Print");
 
 5767          if (!lvl.CompareTo(
"Print", TString::kIgnoreCase))
 
 5768             gErrorIgnoreLevel = kPrint;
 
 5769          else if (!lvl.CompareTo(
"Info", TString::kIgnoreCase))
 
 5770             gErrorIgnoreLevel = kInfo;
 
 5771          else if (!lvl.CompareTo(
"Warning", TString::kIgnoreCase))
 
 5772             gErrorIgnoreLevel = kWarning;
 
 5773          else if (!lvl.CompareTo(
"Error", TString::kIgnoreCase))
 
 5774             gErrorIgnoreLevel = kError;
 
 5775          else if (!lvl.CompareTo(
"Break", TString::kIgnoreCase))
 
 5776             gErrorIgnoreLevel = kBreak;
 
 5777          else if (!lvl.CompareTo(
"SysError", TString::kIgnoreCase))
 
 5778             gErrorIgnoreLevel = kSysError;
 
 5779          else if (!lvl.CompareTo(
"Fatal", TString::kIgnoreCase))
 
 5780             gErrorIgnoreLevel = kFatal;
 
 // Messages below the configured level are dropped.
 5784    if (level < gErrorIgnoreLevel)
 
 // Errors and worse are also logged to the master.
 5788    if (level >= kError && gProofServ)
 
 5789       gProofServ->LogToMaster();
 
 // Syslog output is enabled only when fgLogToSysLog is greater than 2.
 5791    Bool_t tosyslog = (fgLogToSysLog > 2) ? kTRUE : kFALSE;
 
 5793    const char *type   = 0;
 
 5794    ELogLevel loglevel = kLogInfo;
 
 // Number of characters of 'location' to print; the whole string by default.
 5796    Int_t ipos = (location) ? strlen(location) : 0;
 
 // The tests below are independent 'if's in ascending order of severity, so
 // the settings of the highest matching level are the ones that remain.
 5798    if (level >= kPrint) {
 
 5799       loglevel = kLogInfo;
 
 5802    if (level >= kInfo) {
 
 5803       loglevel = kLogInfo;
 
 // If 'location' contains a '|', ipos becomes the offset of the last one, so
 // that only the part before it is printed.
 5804       char *ps = location ? (
char *) strrchr(location, 
'|') : (char *)0;
 
 5806          ipos = (int)(ps - (
char *)location);
 
 5812    if (level >= kWarning) {
 
 5813       loglevel = kLogWarning;
 
 5816    if (level >= kError) {
 
 5820    if (level >= kBreak) {
 
 5822       type = 
"*** Break ***";
 
 5824    if (level >= kSysError) {
 
 5828    if (level >= kFatal) {
 
 // Keep the first 19 characters of the time stamp string; the 8 characters
 // from offset 11 (presumably the time of day) are printed below.
 5838    TString st(ts.AsString(
"lc"),19);
 
 // Short format, without the location, when there is no usable location or
 // the level is in the kPrint or kBreak range.
 5840    if (!location || ipos == 0 ||
 
 5841        (level >= kPrint && level < kInfo) ||
 
 5842        (level >= kBreak && level < kSysError)) {
 
 5843       fprintf(fgErrorHandlerFile, 
"%s %5d %s | %s: %s\n", st(11,8).Data(),
 
 5845                                  (gProofServ ? gProofServ->GetPrefix() : 
"proof"),
 
 5848          buf.Form(
"%s: %s:%s", fgSysLogEntity.Data(), type, msg);
 
 // Long format: the first ipos characters of 'location' are included.
 5850       fprintf(fgErrorHandlerFile, 
"%s %5d %s | %s in <%.*s>: %s\n", st(11,8).Data(),
 
 5852                                  (gProofServ ? gProofServ->GetPrefix() : 
"proof"),
 
 5853                                   type, ipos, location, msg);
 
 5855          buf.Form(
"%s: %s:<%.*s>: %s", fgSysLogEntity.Data(), type, ipos, location, msg);
 
 5857    fflush(fgErrorHandlerFile);
 
 5860       gSystem->Syslog(loglevel, buf);
 
 // Replace the previously stored message copy with a copy of the current one.
 5863    if (__crashreporter_info__)
 
 5864       delete [] __crashreporter_info__;
 
 5865    __crashreporter_info__ = StrDup(buf);
 
 // Abort path (its guarding condition is not visible in this listing). The
 // static flag is tested before notifying the peer, presumably to avoid
 // re-entering this path recursively.
 5870       static Bool_t recursive = kFALSE;
 
 5872       if (gProofServ != 0 && !recursive) {
 
 5874          if (gProofServ->GetSocket()) gProofServ->GetSocket()->Send(kPROOF_FATAL);
 
 5878       fprintf(fgErrorHandlerFile, 
"aborting\n");
 
 5879       fflush(fgErrorHandlerFile);
 
 5880       gSystem->StackTrace();
 
 // Create the player object. Two creation paths are visible: asking fProof
 // for one, or creating a "slave" player bound to fSocket; fProof->SetPlayer
 // is then called with the new player.
 // NOTE(review): the conditions selecting the path and guarding SetPlayer
 // are not visible in this listing.
 5888 void TProofServ::MakePlayer()
 
 5890    TVirtualProofPlayer *p = 0;
 
 5897       p = fProof->MakePlayer();
 
 5900       p = TVirtualProofPlayer::Create(
"slave", 0, fSocket);
 
 5902          fProof->SetPlayer(p);
 
 // Dispose of the current player: print the latest real and CPU processing
 // times taken from fCompute, detach the player from fProof (if any) and
 // delete fPlayer.
 // NOTE(review): the conditions guarding these statements are not visible
 // in this listing.
 5912 void TProofServ::DeletePlayer()
 
 5917          Printf(
" +++ Latest processing times: %f s (CPU: %f s)",
 
 5918                 fCompute.RealTime(), fCompute.CpuTime());
 
 // Make sure fProof no longer references the player about to be deleted.
 5920       if (fProof) fProof->SetPlayer(0);
 
 5922       SafeDelete(fPlayer);
 
 // Return the priority assigned to this session's group. The value is looked
 // up in the 'proofpriority' table of the SQL server configured through the
 // ProofServ.QueryLogDB / QueryLogUser / QueryLogPasswd settings; it starts
 // at 100 and is replaced only by the first field of a row returned by the query.
 // NOTE(review): this listing omits intermediate source lines (early return,
 // result clean-up, final return); comments describe only visible statements.
 5944 Int_t TProofServ::GetPriority()
 
 5946    TString sqlserv = gEnv->GetValue(
"ProofServ.QueryLogDB",
"");
 
 5947    TString sqluser = gEnv->GetValue(
"ProofServ.QueryLogUser",
"");
 
 5948    TString sqlpass = gEnv->GetValue(
"ProofServ.QueryLogPasswd",
"");
 
 // Default priority, kept whenever the lookup cannot be completed.
 5950    Int_t priority = 100;
 
 // SQL requires the FROM clause before WHERE.
 // NOTE(review): fGroup is inserted unescaped into the statement - confirm it
 // cannot contain quote characters. 'group' is also a reserved word in SQL;
 // verify that the backend accepts it unquoted as a column name.
 5956    sql.Form(
"SELECT priority FROM proofpriority WHERE group='%s'", fGroup.Data());
 
 5959    TSQLServer *db =  TSQLServer::Connect(sqlserv, sqluser, sqlpass);
 
 5961    if (!db || db->IsZombie()) {
 
 // Report server and user only: the password must not end up in the logs.
 5962       Error(
"GetPriority", 
"failed to connect to SQL server %s as %s",
 
 5963             sqlserv.Data(), sqluser.Data());
 
 5964       printf(
"%s\n", sql.Data());
 
 5966       TSQLResult *res = db->Query(sql);
 
 5969          Error(
"GetPriority", 
"query into proofpriority failed");
 
 5970          Printf(
"%s", sql.Data());
 
 5972          TSQLRow *row = res->Next();   
 
 // The priority is the first field of the returned row.
 5974             priority = atoi(row->GetField(0));
 
 5977             Error(
"GetPriority", 
"first row is header is NULL");
 
 // Send the text 'msg', followed by the flag 'lf', to the peer as a
 // kPROOF_MESSAGE over fSocket. Nothing is sent when there is no socket or
 // 'msg' is null; a failed send only produces a warning.
 // NOTE(review): the condition guarding the Info call is not visible in
 // this listing.
 5994 void TProofServ::SendAsynMessage(
const char *msg, Bool_t lf)
 
 // A single function-static message object is reused (reset) on every call.
 5996    static TMessage m(kPROOF_MESSAGE);
 
 6001       Info("SendAsynMessage","%s", (msg ? msg : "(null)"));
 
 6003    if (fSocket && msg) {
 
 6004       m.Reset(kPROOF_MESSAGE);
 
 6005       m << TString(msg) << lf;
 
 // Send() returning zero or a negative value is treated as a failure.
 6006       if (fSocket->Send(m) <= 0)
 
 6007          Warning(
"SendAsynMessage", 
"could not send message '%s'", msg);
 
 // Move the file offset of fLogFileDes to the current end of the file
 // behind stdout: the end offset is obtained by seeking stdout's descriptor
 // to SEEK_END and, if that succeeded, applied to fLogFileDes with SEEK_SET.
 6018 void TProofServ::FlushLogFile()
 
 6020    off_t lend = lseek(fileno(stdout), (off_t)0, SEEK_END);
 
 6021    if (lend >= 0) lseek(fLogFileDes, lend, SEEK_SET);
 
 // Truncate the log file when it has reached the configured maximum size.
 // Active only if fLogFileMaxSize > 0 and fLogFileDes > 0. The target size
 // is 80% of fLogFileMaxSize; every outcome is reported to the peer through
 // SendAsynMessage.
 // NOTE(review): the size is read from fLogFileDes while the truncation is
 // applied to stdout's descriptor - presumably both refer to the same file.
 // This listing omits intermediate source lines.
 6028 void TProofServ::TruncateLogFile()
 
 6032    if (fLogFileMaxSize > 0 && fLogFileDes > 0) {
 
 6035       if (fstat(fLogFileDes, &st) == 0) {
 
 6036          if (st.st_size >= fLogFileMaxSize) {
 
 6037             off_t truncsz = (off_t) (( fLogFileMaxSize * 80 ) / 100 );
 
 // Target sizes below 100 bytes are reported and ignored.
 6038             if (truncsz < 100) {
 
 6039                emsg.Form(
"+++ WARNING +++: %s: requested truncate size too small" 
 6040                          " (%lld,%lld) - ignore ", fPrefix.Data(), (Long64_t) truncsz, fLogFileMaxSize);
 
 6041                SendAsynMessage(emsg.Data());
 
 // Retry the truncation for as long as it fails with EINTR.
 6044             TSystem::ResetErrno();
 
 6045             while (ftruncate(fileno(stdout), truncsz) != 0 &&
 
 6046                    (TSystem::GetErrno() == EINTR)) {
 
 6047                TSystem::ResetErrno();
 
 // A positive errno left after the loop means the truncation failed.
 6049             if (TSystem::GetErrno() > 0) {
 
 6050                Error(
"TruncateLogFile", 
"truncating to %lld bytes; file size is %lld bytes (errno: %d)",
 
 6051                                         (Long64_t)truncsz, (Long64_t)st.st_size, TSystem::GetErrno());
 
 6052                emsg.Form(
"+++ WARNING +++: %s: problems truncating log file to %lld bytes; file size is %lld bytes" 
 6053                          " (errno: %d)", fPrefix.Data(), (Long64_t)truncsz, (Long64_t)st.st_size, TSystem::GetErrno());
 
 6054                SendAsynMessage(emsg.Data());
 
 // Success: log it locally and tell the peer.
 6056                Info(
"TruncateLogFile", 
"file truncated to %lld bytes (80%% of %lld); file size was %lld bytes ",
 
 6057                                        (Long64_t)truncsz, fLogFileMaxSize, (Long64_t)st.st_size);
 
 6058                emsg.Form(
"+++ WARNING +++: %s: log file truncated to %lld bytes (80%% of %lld)",
 
 6059                                        fPrefix.Data(), (Long64_t)truncsz, fLogFileMaxSize);
 
 6060                SendAsynMessage(emsg.Data());
 
 // fstat failure.
 6064          emsg.Form(
"+++ WARNING +++: %s: could not stat log file descriptor" 
 6065                    " for truncation (errno: %d)", fPrefix.Data(), TSystem::GetErrno());
 
 6066          SendAsynMessage(emsg.Data());
 
 // Report an exception triggered by signal 'sig': log it locally together
 // with the last recorded message and entry number (fgLastMsg, fgLastEntry),
 // then send the same information, prefixed by this node's ordinal, to the
 // peer as an asynchronous message.
 // NOTE(review): this listing omits intermediate source lines; any action
 // taken after the notification is not visible here.
 6075 void TProofServ::HandleException(Int_t sig)
 
 // The log text uses the same wording as the message sent below.
 6077    Error(
"HandleException", 
"caught exception triggered by signal '%d' %s %lld",
 
 6078                             sig, fgLastMsg.Data(), fgLastEntry);
 
 6081    emsg.Form(
"%s: caught exception triggered by signal '%d' %s %lld",
 
 6082              GetOrdinal(), sig, fgLastMsg.Data(), fgLastEntry);
 
 6084    SendAsynMessage(emsg.Data());
 
 // Handle a dataset management request carried by 'mess'. The visible code
 // requires fDataSetManager and dispatches on TProof dataset operation codes
 // (name check, registration, staging requests, listings, removal,
 // verification, quota, default tree name, cache). When 'slb' is non-null it
 // is filled, per operation, with a short text summary of the request.
 // NOTE(review): this listing omits intermediate source lines; comments
 // describe only the visible statements.
 6092 Int_t TProofServ::HandleDataSets(TMessage *mess, TString *slb)
 
 6095       Info(
"HandleDataSets", 
"enter");
 
 // Nothing can be done without a dataset manager.
 6098    if (!fDataSetManager) {
 
 6099       Warning(
"HandleDataSets", 
"no data manager is available to fullfil the request");
 
 6104    TString dsUser, dsGroup, dsName, dsTree, uri, opt;
 
 // Matches any character other than letters, digits, '.', '_' and '-';
 // used below to sanitize dataset names for the staging repository.
 6108    TPMERegexp reInvalid(
"[^A-Za-z0-9._-]");  
 
 // kCheckDataSetName: test whether a dataset with this URI already exists.
 6115       case TProof::kCheckDataSetName:
 
 6120             if (slb) slb->Form(
"%d %s", type, uri.Data());
 
 6121             if (fDataSetManager->ExistsDataSet(uri))
 
 // kRegisterDataSet: allowed only if the manager has kAllowRegister set.
 6126       case TProof::kRegisterDataSet:
 
 6129             if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
 
 6132                if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
 
 // The file collection to register is read from the message; a missing or
 // empty collection is rejected.
 6134                TFileCollection *dataSet =
 
 6135                   dynamic_cast<TFileCollection*
> ((mess->ReadObject(TFileCollection::Class())));
 
 6136                if (!dataSet || dataSet->GetList()->GetSize() == 0) {
 
 6137                   Error(
"HandleDataSets", 
"can not save an empty list.");
 
 6141                rc = fDataSetManager->RegisterDataSet(uri, dataSet, opt);
 
 6145                Info(
"HandleDataSets", 
"dataset registration not allowed");
 
 6146                if (slb) slb->Form(
"%d notallowed", type);
 
 // kRequestStaging: record a staging request for a dataset in the staging
 // repository (fDataSetStgRepo).
 6152       case TProof::kRequestStaging:
 
 6156             if (!fDataSetStgRepo) {
 
 6157                Error(
"HandleDataSets",
 
 6158                   "no dataset staging request repository available");
 
 // Build the repository name: every invalid character is replaced by '_'.
 6163             TString validUri = uri;
 
 6164             while (reInvalid.Substitute(validUri, 
"_")) {}
 
 // A request already present in the repository is not duplicated.
 6168             if (fDataSetStgRepo->ExistsDataSet(validUri.Data())) {
 
 6169                Warning(
"HandleDataSets", 
"staging of %s already requested",
 
 // The dataset itself is fetched from the manager under its original URI.
 6175             TFileCollection *fc = fDataSetManager->GetDataSet(uri.Data());
 
 6176             if (!fc || (fc->GetNFiles() == 0)) {
 
 6177                Error(
"HandleDataSets", 
"empty dataset or no dataset returned");
 
 // For each file: clear the staged flag; the loop over nToErase (all URLs
 // but one) presumably removes the extra URLs - its body is not visible.
 6183             TIter it(fc->GetList());
 
 6185             while ((fi = dynamic_cast<TFileInfo *>(it.Next()))) {
 
 6186                fi->ResetBit(TFileInfo::kStaged);
 
 6187                Int_t nToErase = fi->GetNUrls() - 1;
 
 6188                for (Int_t i=0; i<nToErase; i++)
 
 // Store the request in the repository under the sanitized name.
 6195             fDataSetStgRepo->ParseUri(validUri, &dsGroup, &dsUser, &dsName);
 
 6196             if (fDataSetStgRepo->WriteDataSet(dsGroup, dsUser,
 
 6199                Error(
"HandleDataSets",
 
 6200                   "can't register staging request for %s", uri.Data());
 
 6205             Info(
"HandleDataSets",
 
 6206                "Staging request registered for %s", uri.Data());
 
 // kStagingStatus: send back the pending staging request, if any.
 6213       case TProof::kStagingStatus:
 
 6215             if (!fDataSetStgRepo) {
 
 6216                Error(
"HandleDataSets",
 
 6217                   "no dataset staging request repository available");
 
 // Here the URI itself is sanitized in place before the lookup.
 6224             while (reInvalid.Substitute(uri, 
"_")) {}
 
 6227             TFileCollection *fc = fDataSetStgRepo->GetDataSet(uri.Data());
 
 6229                fSocket->SendObject(fc, kMESS_OK);
 
 6235                Info(
"HandleDataSets", 
"no pending staging request for %s",
 
 // kCancelStaging: remove the staging request from the repository.
 6242       case TProof::kCancelStaging:
 
 6244             if (!fDataSetStgRepo) {
 
 6245                Error(
"HandleDataSets",
 
 6246                   "no dataset staging request repository available");
 
 6253             while (reInvalid.Substitute(uri, 
"_")) {}
 
 6255             if (!fDataSetStgRepo->RemoveDataSet(uri.Data()))
 
 // kShowDataSets: print the datasets matching uri / opt.
 6262       case TProof::kShowDataSets:
 
 6264             (*mess) >> uri >> opt;
 
 6265             if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
 
 6267             fDataSetManager->ShowDataSets(uri, opt);
 
 // kGetDataSets: send back a map of datasets.
 6271       case TProof::kGetDataSets:
 
 6273             (*mess) >> uri >> opt;
 
 6274             if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
 
 // The ":lite:" tag (case-insensitive) adds kReadShort to the option mask
 // and is then stripped from 'opt'.
 6276             UInt_t omsk = (UInt_t)TDataSetManager::kExport;
 
 6277             Ssiz_t kLite = opt.Index(
":lite:", 0, TString::kIgnoreCase);
 
 6278             if (kLite != kNPOS) {
 
 6279                omsk |= (UInt_t)TDataSetManager::kReadShort;
 
 6280                opt.Remove(kLite, strlen(
":lite:"));
 
 6282             TMap *returnMap = fDataSetManager->GetDataSets(uri, omsk);
 
 // What remains of 'opt' is used as a server name: build a second map
 // holding, per dataset, only the files located on that server.
 6284             if (returnMap && !opt.IsNull()) {
 
 6286                TMap *rmap = 
new TMap;
 
 6288                TFileCollection *fc = 0, *xfc = 0;
 
 6289                TIter nxd(returnMap);
 
 6290                while ((k = nxd()) && (fc = (TFileCollection *) returnMap->GetValue(k))) {
 
 6292                   if ((xfc = fc->GetFilesOnServer(opt.Data()))) {
 
 6293                      rmap->Add(
new TObjString(k->GetName()), xfc);
 
 // The content of the unfiltered map is no longer needed.
 6296                returnMap->DeleteAll();
 
 6297                if (rmap->GetSize() > 0) {
 
 6300                   Info(
"HandleDataSets", 
"no dataset found on server '%s'", opt.Data());
 
 // Send the map and delete its content afterwards.
 6307                fSocket->SendObject(returnMap, kMESS_OK);
 
 6308                returnMap->DeleteAll();
 
 // kGetDataSet: send back a single dataset.
 6315       case TProof::kGetDataSet:
 
 6317             (*mess) >> uri >> opt;
 
 6318             if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
 
 6320             TFileCollection *fileList = fDataSetManager->GetDataSet(uri,opt);
 
 6322                fSocket->SendObject(fileList, kMESS_OK);
 
 // kRemoveDataSet: gated by the same kAllowRegister bit as registration.
 6330       case TProof::kRemoveDataSet:
 
 6332             if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
 
 6334                if (slb) slb->Form(
"%d %s", type, uri.Data());
 
 6335                if (!fDataSetManager->RemoveDataSet(uri)) {
 
 6340                Info(
"HandleDataSets", 
"dataset creation / removal not allowed");
 
 6341                if (slb) slb->Form(
"%d notallowed", type);
 
 // kVerifyDataSet: allowed only if the manager has kAllowVerify set.
 6346       case TProof::kVerifyDataSet:
 
 6348             if (fDataSetManager->TestBit(TDataSetManager::kAllowVerify)) {
 
 6349                (*mess) >> uri >> opt;
 
 6350                if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
 
 // The guard object is tied to fLogFile and fSocket for the duration of the
 // scan; presumably it forwards the log output to the client - confirm.
 6351                TProofServLogHandlerGuard hg(fLogFile,  fSocket);
 
 6352                rc = fDataSetManager->ScanDataSet(uri, opt);
 
 6360                Info(
"HandleDataSets", 
"dataset verification not allowed");
 
 // kGetQuota: send the group quota map, only when quota checking is enabled.
 6365       case TProof::kGetQuota:
 
 6367             if (fDataSetManager->TestBit(TDataSetManager::kCheckQuota)) {
 
 6368                if (slb) slb->Form(
"%d", type);
 
 6369                TMap *groupQuotaMap = fDataSetManager->GetGroupQuotaMap();
 
 6370                if (groupQuotaMap) {
 
 6372                   fSocket->SendObject(groupQuotaMap, kMESS_OK);
 
 6377                Info(
"HandleDataSets", 
"quota control disabled");
 
 6378                if (slb) slb->Form(
"%d disabled", type);
 
 // kShowQuota: print the quota information, only when quota checking is enabled.
 6383       case TProof::kShowQuota:
 
 6385             if (fDataSetManager->TestBit(TDataSetManager::kCheckQuota)) {
 
 6386                if (slb) slb->Form(
"%d", type);
 
 6389                fDataSetManager->ShowQuota(opt);
 
 6391                Info(
"HandleDataSets", 
"quota control disabled");
 
 6392                if (slb) slb->Form(
"%d disabled", type);
 
 // kSetDefaultTreeName: implemented as a dataset scan with the
 // kSetDefaultTree option; gated by kAllowRegister.
 6396       case TProof::kSetDefaultTreeName:
 
 6398             if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
 
 6400                if (slb) slb->Form(
"%d %s", type, uri.Data());
 
 6401                rc = fDataSetManager->ScanDataSet(uri, (UInt_t)TDataSetManager::kSetDefaultTree);
 
 6403                Info(
"HandleDataSets", 
"kSetDefaultTreeName: modification of dataset info not allowed");
 
 6404                if (slb) slb->Form(
"%d notallowed", type);
 
 // kCache: 'opt' selects the action on the dataset cache, "show" or "clear";
 // anything else is an error.
 6409       case TProof::kCache:
 
 6411             (*mess) >> uri >> opt;
 
 6412             if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
 
 6413             if (opt == 
"show") {
 
 6415                fDataSetManager->ShowCache(uri);
 
 6416             } 
else if (opt == 
"clear") {
 
 6418                fDataSetManager->ClearCache(uri);
 
 6420                Error(
"HandleDataSets", 
"kCache: unknown action: %s", opt.Data());
 
 // Any other operation code is reported as an error.
 6426          Error(
"HandleDataSets", 
"unknown type %d", type);
 
 6437 void TProofServ::HandleSubmerger(TMessage *mess)
 
 6445       case TProof::kOutputSize:
 
 6448       case TProof::kSendOutput:
 
 6450             Bool_t deleteplayer = kTRUE;
 
 6452                if (fMergingMonitor) {
 
 6453                   Info(
"HandleSubmerger", 
"kSendOutput: interrupting ...");
 
 6454                   fMergingMonitor->Interrupt();
 
 6456                if (fMergingSocket) {
 
 6457                   if (fMergingMonitor) fMergingMonitor->Remove(fMergingSocket);
 
 6458                   fMergingSocket->Close();
 
 6459                   SafeDelete(fMergingSocket);
 
 6464                Int_t merger_id = -1;
 
 6465                (*mess) >> merger_id >> name >> port;
 
 6467                   Info("HandleSubmerger","worker %s redirected to merger 
#%d %s:%d", fOrdinal.Data(), merger_id, name.Data(), port); 
 6470                if (name.Length() > 0 && port > 0 && (t = 
new TSocket(name, port)) && t->IsValid()) {
 
 6472                   PDB(kSubmerger, 2) Info("HandleSubmerger",
 
 6473                                           "kSendOutput: worker asked for sending output to merger 
#%d %s:%d", 
 6474                                           merger_id, name.Data(), port);
 
 6476                   if (SendResults(t, fPlayer->GetOutputList()) != 0) {
 
 6477                      msg.Form(
"worker %s cannot send results to merger #%d at %s:%d", GetPrefix(), merger_id, name.Data(), port);
 
 6478                      PDB(kSubmerger, 2) Info("HandleSubmerger",
 
 6479                                              "kSendOutput: %s - inform the master", msg.Data());
 
 6480                      SendAsynMessage(msg);
 
 6482                      TMessage answ(kPROOF_SUBMERGER);
 
 6483                      answ << Int_t(TProof::kMergerDown);
 
 6485                      fSocket->Send(answ);
 
 6488                      TMessage answ(kPROOF_SUBMERGER);
 
 6489                      answ << Int_t(TProof::kOutputSent);
 
 6491                      fSocket->Send(answ);
 
 6493                      PDB(kSubmerger, 2) Info("HandleSubmerger", "kSendOutput: worker sent its output");
 
 6494                      fSocket->Send(kPROOF_SETIDLE);
 
 6500                   if (name == 
"master") {
 
 6501                      PDB(kSubmerger, 2) Info("HandleSubmerger",
 
 6502                                              "kSendOutput: worker was asked for sending output to master");
 
 6503                      if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
 
 6504                         Warning("HandleSubmerger", "problems sending output list");
 
 6506                      fSocket->Send(kPROOF_SETIDLE);
 
 6510                   } else if (!t || !(t->IsValid())) {
 
 6511                      msg.Form(
"worker %s could not open a valid socket to merger #%d at %s:%d",
 
 6512                               GetPrefix(), merger_id, name.Data(), port);
 
 6513                      PDB(kSubmerger, 2) Info("HandleSubmerger",
 
 6514                                              "kSendOutput: %s - inform the master", msg.Data());
 
 6515                      SendAsynMessage(msg);
 
 6517                      TMessage answ(kPROOF_SUBMERGER);
 
 6518                      answ << Int_t(TProof::kMergerDown);
 
 6520                      fSocket->Send(answ);
 
 6521                      deleteplayer = kFALSE;
 
 6529                Error(
"HandleSubmerger", 
"kSendOutput: received not on worker");
 
 6533             if (deleteplayer) DeletePlayer();
 
 6536       case TProof::kBeMerger:
 
 6538             Bool_t deleteplayer = kTRUE;
 
 6540                Int_t merger_id = -1;
 
 6542                Int_t connections = 0;
 
 6543                (*mess) >> merger_id  >> connections;
 
 6545                   Info("HandleSubmerger", "worker %s established as merger", fOrdinal.Data());
 
 6548                   Info("HandleSubmerger",
 
 6549                        "kBeMerger: worker asked for being merger 
#%d for %d connections", 
 6550                        merger_id, connections);
 
 6552                TVirtualProofPlayer *mergerPlayer =  TVirtualProofPlayer::Create(
"remote",fProof,0);
 
 6555                   PDB(kSubmerger, 2) Info("HandleSubmerger",
 
 6556                                           "kBeMerger: mergerPlayer created (%p) ", mergerPlayer);
 
 6559                   mergerPlayer->SetBit(TVirtualProofPlayer::kIsSubmerger);
 
 6562                   if (AcceptResults(connections, mergerPlayer)) {
 
 6564                         Info("HandleSubmerger", "kBeMerger: all outputs from workers accepted");
 
 6567                         Info("","adding own output to the list on %s", fOrdinal.Data());
 
 6574                      TIter nxo(fPlayer->GetOutputList());
 
 6576                      while ((o = nxo())) {
 
 6577                         if ((mergerPlayer->AddOutputObject(o) != 1)) {
 
 6580                            if (fPlayer->GetOutputList()) {
 
 6582                                  Info("HandleSocketInput", "removing merged 
object (%p)", o);
 
 6583                               fPlayer->GetOutputList()->Remove(o);
 
 6587                      PDB(kSubmerger, 2) Info("HandleSubmerger","kBeMerger: own outputs added");
 
 6588                      PDB(kSubmerger, 2) Info("HandleSubmerger","starting delayed merging on %s", fOrdinal.Data());
 
 6591                      mergerPlayer->MergeOutput(kTRUE);
 
 6593                      PDB(kSubmerger, 2) mergerPlayer->GetOutputList()->Print("all");
 
 6595                      PDB(kSubmerger, 2) Info("HandleSubmerger", "delayed merging on %s finished ", fOrdinal.Data());
 
 6596                      PDB(kSubmerger, 2) Info("HandleSubmerger", "%s sending results to master ", fOrdinal.Data());
 
 6598                      if (SendResults(fSocket, mergerPlayer->GetOutputList()) != 0)
 
 6599                         Warning("HandleSubmerger","kBeMerger: problems sending output list");
 
 6600                      if (mergerPlayer->GetOutputList())
 
 6601                         mergerPlayer->GetOutputList()->SetOwner(kTRUE);
 
 6603                      PDB(kSubmerger, 2) Info("HandleSubmerger","kBeMerger: results sent to master");
 
 6605                      fSocket->Send(kPROOF_SETIDLE);
 
 6610                      TMessage answ(kPROOF_SUBMERGER);
 
 6611                      answ << Int_t(TProof::kMergerDown);
 
 6613                      fSocket->Send(answ);
 
 6614                      deleteplayer = kFALSE;
 
 6617                   SafeDelete(mergerPlayer);
 
 6620                   Warning(
"HandleSubmerger",
"kBeMerger: problems craeting the merger player!");
 
 6622                   TMessage answ(kPROOF_SUBMERGER);
 
 6623                   answ << Int_t(TProof::kMergerDown);
 
 6625                   fSocket->Send(answ);
 
 6626                   deleteplayer = kFALSE;
 
 6629                Error(
"HandleSubmerger",
"kSendOutput: received not on worker");
 
 6633             if (deleteplayer) DeletePlayer();
 
 6637       case TProof::kMergerDown:
 
 6640       case TProof::kStopMerging:
 
 6643             PDB(kSubmerger, 2)  Info("HandleSubmerger", "kStopMerging");
 
 6644             if (fMergingMonitor) {
 
 6645                Info(
"HandleSubmerger", 
"kStopMerging: interrupting ...");
 
 6646                fMergingMonitor->Interrupt();
 
 6651       case TProof::kOutputSent:
 
 /// Handler for a fork/clone request.
 /// The incoming TMessage parameter is unnamed and never read: the only
 /// visible action is an Info() log stating that fork cloning is not
 /// implemented.
 6659 void TProofServ::HandleFork(TMessage *)
 
 6661    Info(
"HandleFork", 
"fork cloning not implemented");
 
 /// Fork a child process with fork() and register it for reaping.
 ///
 /// Visible behaviour:
 ///  - a negative fork() result is reported through Error();
 ///  - in the child (pid == 0) the function returns pid, i.e. 0, immediately;
 ///  - in the parent, fReaperTimer is created and started on first use and
 ///    the child's pid is handed to it via AddPid().
 /// NOTE(review): the return after the Error() call, the final return and the
 /// guard selecting the "not provided under windows" branch are not visible
 /// here -- confirm against the full function.
 6670 Int_t TProofServ::Fork()
 
 // fork() < 0 signals failure; only the error log is visible for this path.
 6675    if ((pid = fork()) < 0) {
 
 6676       Error(
"Fork", 
"failed to fork");
 
 // Child process: nothing else to do, hand back 0.
 6681    if (!pid) 
return pid;
 
 // Parent only: lazily create the reaper timer (constructed with 1000) and start it.
 6684    if (!fReaperTimer) {
 
 6685       fReaperTimer = 
new TReaperTimer(1000);
 
 6686       fReaperTimer->Start(-1);
 
 // Register the new child's pid with the reaper timer.
 6690    fReaperTimer->AddPid(pid);
 
 // Presumably the Windows-only branch, where forking is unavailable -- confirm the guard.
 6695    Warning(
"Fork", 
"Functionality not provided under windows");
 
 /// Expand placeholder keywords inside `fname`, in place.
 ///
 /// Placeholders handled, in this order: <user>, <u>, <group>, <stag>, <ord>,
 /// <qnum>, <file>, <rver>, <build>. Each is only processed when
 /// fname.Contains() finds it.
 ///
 /// \param fname  string to modify; every occurrence of a resolved placeholder
 ///               is substituted with ReplaceAll().
 /// \param path   replacement for <file>; used only when non-null and non-empty.
 ///
 /// For <user>, <u>, <group> and <stag> the value comes from gProofServ when
 /// that global is set and its getter returns a non-empty string, otherwise
 /// from gProof under the same conditions.
 /// NOTE(review): the literal replacements ("nouser", "n", "default") and the
 /// ::Warning() calls are presumably the final `else` fallbacks; the `else`
 /// lines themselves are not visible here -- confirm.
 6707 void TProofServ::ResolveKeywords(TString &fname, 
const char *path)
 
 // <user>: full user name from gProofServ, else gProof; "nouser" literal otherwise.
 6710    if (fname.Contains(
"<user>")) {
 
 6711       if (gProofServ && gProofServ->GetUser() && strlen(gProofServ->GetUser())) {
 
 6712          fname.ReplaceAll(
"<user>", gProofServ->GetUser());
 
 6713       } 
else if (gProof && gProof->GetUser() && strlen(gProof->GetUser())) {
 
 6714          fname.ReplaceAll(
"<user>", gProof->GetUser());
 
 6716          fname.ReplaceAll(
"<user>", 
"nouser");
 
 // <u>: first character of the user name (index [0]); "n" literal otherwise.
 6720    if (fname.Contains(
"<u>")) {
 
 6721       if (gProofServ && gProofServ->GetUser() && strlen(gProofServ->GetUser())) {
 
 6722          TString u(gProofServ->GetUser()[0]);
 
 6723          fname.ReplaceAll(
"<u>", u);
 
 6724       } 
else if (gProof && gProof->GetUser() && strlen(gProof->GetUser())) {
 
 6725          TString u(gProof->GetUser()[0]);
 
 6726          fname.ReplaceAll(
"<u>", u);
 
 6728          fname.ReplaceAll(
"<u>", 
"n");
 
 // <group>: group name from gProofServ, else gProof; "default" literal otherwise.
 6732    if (fname.Contains(
"<group>")) {
 
 6733       if (gProofServ && gProofServ->GetGroup() && strlen(gProofServ->GetGroup())) {
 
 6734          fname.ReplaceAll(
"<group>", gProofServ->GetGroup());
 
 6735       } 
else if (gProof && gProof->GetGroup() && strlen(gProof->GetGroup())) {
 
 6736          fname.ReplaceAll(
"<group>", gProof->GetGroup());
 
 // <stag>: session tag from gProofServ, else gProof; no literal substitute exists,
 // only a warning is emitted, so the placeholder text stays in fname.
 6738          fname.ReplaceAll(
"<group>", 
"default");
 
 6742    if (fname.Contains(
"<stag>")) {
 
 6743       if (gProofServ && gProofServ->GetSessionTag() && strlen(gProofServ->GetSessionTag())) {
 
 6744          fname.ReplaceAll(
"<stag>", gProofServ->GetSessionTag());
 
 6745       } 
else if (gProof && gProof->GetSessionTag() && strlen(gProof->GetSessionTag())) {
 
 6746          fname.ReplaceAll(
"<stag>", gProof->GetSessionTag());
 
 6748          ::Warning(
"TProofServ::ResolveKeywords", 
"session tag undefined: ignoring");
 
 // <ord>: ordinal, taken from gProofServ only (gProof is not consulted here).
 6752    if (fname.Contains(
"<ord>")) {
 
 6753       if (gProofServ && gProofServ->GetOrdinal() && strlen(gProofServ->GetOrdinal()))
 
 6754          fname.ReplaceAll(
"<ord>", gProofServ->GetOrdinal());
 
 6756          ::Warning(
"TProofServ::ResolveKeywords", 
"ordinal number undefined: ignoring");
 
 // <qnum>: query sequence number from gProofServ, formatted with "%d", used only when > 0.
 // NOTE(review): the bare `GetQuerySeqNum() &&` test is redundant next to `> 0`.
 // NOTE(review): the warning text below misspells "sequential" ("seqeuntial").
 6759    if (fname.Contains(
"<qnum>")) {
 
 6760       if (gProofServ && gProofServ->GetQuerySeqNum() && gProofServ->GetQuerySeqNum() > 0)
 
 6761          fname.ReplaceAll(
"<qnum>", TString::Format(
"%d", gProofServ->GetQuerySeqNum()).Data());
 
 6763          ::Warning(
"TProofServ::ResolveKeywords", 
"query seqeuntial number undefined: ignoring");
 
 // <file>: replaced by `path`; skipped silently when path is null or empty.
 6766    if (fname.Contains(
"<file>") && path && strlen(path) > 0) {
 
 6767       fname.ReplaceAll(
"<file>", path);
 
 // <rver>: integer ROOT version from gROOT->GetVersionInt(), formatted with "%d".
 6770    if (fname.Contains(
"<rver>")) {
 
 6771       TString v = TString::Format(
"%d", gROOT->GetVersionInt());
 
 6772       fname.ReplaceAll(
"<rver>", v);
 
 // <build>: "<version int>_<build arch>_<compiler version>" assembled from gROOT and gSystem.
 6775    if (fname.Contains(
"<build>")) {
 
 6776       TString b = TString::Format(
"%d_%s_%s", gROOT->GetVersionInt(), gSystem->GetBuildArch(),
 
 6777                                   gSystem->GetBuildCompilerVersion());
 
 6778       fname.ReplaceAll(
"<build>", b);
 
 /// Compute the session status code while holding fQMtx.
 ///
 /// Values assigned to the local `st`:
 ///  - 0 when fIdle is set,
 ///  - 1 when fIdle is not set,
 ///  - 3 when fIdle is set and fWaitingQueries holds at least one entry
 ///    (this overrides the 0).
 /// NOTE(review): the return statement is not visible here; presumably `st`
 /// is returned -- confirm.
 6791 Int_t TProofServ::GetSessionStatus()
 
 // Scoped lock on the recursive queue mutex; released when the function exits.
 6793    std::lock_guard<std::recursive_mutex> lock(fQMtx);
 
 6794    Int_t st = (fIdle) ? 0 : 1;
 
 6795    if (fIdle && fWaitingQueries->GetSize() > 0) st = 3;
 
 /// Write the session status as a decimal integer to the file at fAdminPath.
 ///
 /// \param xst  status to write; when negative, the value of
 ///             GetSessionStatus() is written instead.
 ///
 /// The file is opened with mode "w", so existing content is replaced.
 /// NOTE(review): the null check on `fs`, the matching fclose() and the
 /// return values are not visible here -- confirm they exist in the full
 /// function.
 6804 Int_t TProofServ::UpdateSessionStatus(Int_t xst)
 
 6806    FILE *fs = fopen(fAdminPath.Data(), 
"w");
 
 // Negative xst means "use the current status".
 6808       Int_t st = (xst < 0) ? GetSessionStatus() : xst;
 
 6809       fprintf(fs, 
"%d", st);
 
 6812          Info("UpdateSessionStatus", "status (=%d) update in path: %s", st, fAdminPath.Data());
 
 /// Idle-state query; takes a scoped lock on fQMtx first.
 /// NOTE(review): the return statement is not visible here; presumably the
 /// fIdle member is returned under the lock -- confirm.
 6823 Bool_t TProofServ::IsIdle()
 
 6825    std::lock_guard<std::recursive_mutex> lock(fQMtx);
 
 /// Idle-state setter; takes a scoped lock on fQMtx first.
 /// \param st  new idle state.
 /// NOTE(review): the assignment itself is not visible here; presumably `st`
 /// is stored into fIdle under the lock -- confirm.
 6832 void TProofServ::SetIdle(Bool_t st)
 
 6834    std::lock_guard<std::recursive_mutex> lock(fQMtx);
 
 /// Tell whether the session is idle while queries are still queued.
 /// Returns kTRUE when fIdle is set and fWaitingQueries is non-empty; the
 /// check is made while holding fQMtx.
 /// NOTE(review): the fall-through return is not visible here; presumably
 /// kFALSE -- confirm.
 6841 Bool_t TProofServ::IsWaiting()
 
 6843    std::lock_guard<std::recursive_mutex> lock(fQMtx);
 
 6844    if (fIdle && fWaitingQueries->GetSize() > 0) 
return kTRUE;
 
 /// Return the number of entries currently held in fWaitingQueries.
 /// The size is read while holding a scoped lock on fQMtx.
 6851 Int_t TProofServ::WaitingQueries()
 
 6853    std::lock_guard<std::recursive_mutex> lock(fQMtx);
 
 6854    return fWaitingQueries->GetSize();
 
 /// Add a query to fWaitingQueries.
 /// \param pq  query result object to add; it is passed to Add() as is, with
 ///            no null check.
 /// \return the size of fWaitingQueries after the addition.
 /// Both steps run under a scoped lock on fQMtx.
 6861 Int_t TProofServ::QueueQuery(TProofQueryResult *pq)
 
 6863    std::lock_guard<std::recursive_mutex> lock(fQMtx);
 
 6864    fWaitingQueries->Add(pq);
 
 6865    return fWaitingQueries->GetSize();
 
 /// Take the first entry of fWaitingQueries and remove it from the list.
 /// Both steps run under a scoped lock on fQMtx. The result of First() is
 /// cast to TProofQueryResult* without a type check.
 /// NOTE(review): the return statement is not visible here; presumably `pq`
 /// is returned (what First() yields for an empty list is not shown) --
 /// confirm.
 6872 TProofQueryResult *TProofServ::NextQuery()
 
 6874    std::lock_guard<std::recursive_mutex> lock(fQMtx);
 
 6875    TProofQueryResult *pq = (TProofQueryResult *) fWaitingQueries->First();
 
 6876    fWaitingQueries->Remove(pq);
 
 /// Remove queries from fWaitingQueries and count how many were removed.
 /// The whole operation runs under a scoped lock on fQMtx.
 ///
 /// \param del  passed to fWaitingQueries->SetOwner() before the list is
 ///             emptied with Delete() in the "clear everything" path.
 /// \param qls  list of queries. NOTE(review): presumably the source of the
 ///             `nxq` iterator and the selector between the two paths below;
 ///             the iterator declaration and the branch condition are not
 ///             visible here -- confirm.
 ///
 /// NOTE(review): the declaration of `ncq`, any handling of `del` inside the
 /// loop and the return statement are not visible here; presumably `ncq` is
 /// returned -- confirm.
 6885 Int_t TProofServ::CleanupWaitingQueries(Bool_t del, TList *qls)
 
 6887    std::lock_guard<std::recursive_mutex> lock(fQMtx);
 
 // Selective path: count each iterated object found in fWaitingQueries, then remove it.
 6892       while ((o = nxq())) {
 
 6893          if (fWaitingQueries->FindObject(o)) ncq++;
 
 6894          fWaitingQueries->Remove(o);
 
 // Clear-all path: record the current size, set list ownership from `del`, then empty the list.
 6898       ncq = fWaitingQueries->GetSize();
 
 6899       fWaitingQueries->SetOwner(del);
 
 6900       fWaitingQueries->Delete();
 
 /// Store `lastmsg` in the class-wide string fgLastMsg (initialised to
 /// "<undef>").
 /// \param lastmsg  C string assigned to fgLastMsg; no null check is made here.
 6909 void TProofServ::SetLastMsg(
const char *lastmsg)
 
 6911    fgLastMsg = lastmsg;
 
 /// Store `entry` in the class-wide value fgLastEntry (initialised to -1).
 /// \param entry  value assigned to fgLastEntry.
 6917 void TProofServ::SetLastEntry(Long64_t entry)
 
 6919    fgLastEntry = entry;
 
 /// Return the class-wide value fgVirtMemMax (initialised to -1).
 /// NOTE(review): presumably a virtual-memory limit, with -1 meaning
 /// "not set"; units are not shown here -- confirm.
 6925 Long_t TProofServ::GetVirtMemMax()
 
 6927    return fgVirtMemMax;
 
 /// NOTE(review): body not visible here; presumably returns the class-wide
 /// value fgResMemMax (initialised to -1) -- confirm.
 6932 Long_t TProofServ::GetResMemMax()
 
 /// NOTE(review): body not visible here; presumably returns the class-wide
 /// value fgMemHWM (initialised to 0.80) -- confirm.
 6939 Float_t TProofServ::GetMemHWM()
 
 /// NOTE(review): body not visible here; presumably returns the class-wide
 /// value fgMemStop (initialised to 0.95) -- confirm.
 6946 Float_t TProofServ::GetMemStop()
 
 /// Fill `dsrv` from the LOCALDATASERVER environment variable.
 /// \param dsrv  output string; when the variable is set it receives the
 ///              variable's value, with a trailing "/" appended if missing.
 /// NOTE(review): what happens to `dsrv` when the variable is not set is not
 /// visible here -- confirm.
 6954 void TProofServ::GetLocalServer(TString &dsrv)
 
 6957    if (gSystem->Getenv(
"LOCALDATASERVER")) {
 
 6958       dsrv = gSystem->Getenv(
"LOCALDATASERVER");
 
 // Normalise so the value always ends with a slash.
 6959       if (!dsrv.EndsWith(
"/")) dsrv += 
"/";
 
 /// Strip the configured local-root prefix from `path`, in place, when the
 /// data server speaks "root" or "xrd".
 ///
 /// \param path  path or URL to filter; modified only when all conditions
 ///              below hold.
 /// \param dsrv  data server URL; only its protocol is inspected.
 ///
 /// Conditions, all required:
 ///  1. `path`, parsed as a TUrl, has protocol "file";
 ///  2. the "Path.Localroot" setting read from gEnv (default "") is non-empty
 ///     and the URL's file part starts with it;
 ///  3. the protocol of `dsrv` is "root" or "xrd".
 /// NOTE(review): the prefix is matched against u.GetFile() but removed from
 /// the start of `path` itself; this assumes `path` begins with the file part
 /// (no leading "file://") -- confirm with callers.
 6971 void TProofServ::FilterLocalroot(TString &path, 
const char *dsrv)
 
 6973    TUrl u(path, kTRUE);
 
 6974    if (!strcmp(u.GetProtocol(), 
"file")) {
 
 // Prefix to strip; an empty value disables the filtering.
 6976       TString pfx  = gEnv->GetValue(
"Path.Localroot",
"");
 
 6977       if (!pfx.IsNull() && !strncmp(u.GetFile(), pfx.Data(), pfx.Length())) {
 
 6978          TString srvp = TUrl(dsrv).GetProtocol();
 
 // Drop the first pfx.Length() characters of path.
 6979          if (srvp == 
"root" || srvp == 
"xrd") path.Remove(0, pfx.Length());
 
 /// Open the lock file named by GetName() and lock it.
 ///
 /// Steps visible:
 ///  1. open the file read/write, adding O_CREAT with mode 0644 when
 ///     gSystem->AccessPathName() returns true (in ROOT that call returns
 ///     true when the path is NOT accessible); the descriptor is kept in
 ///     fLockId;
 ///  2. report a system error when open() yields -1;
 ///  3. outside R__WIN32 / R__WINGCC builds, lock with
 ///     lockf(fLockId, F_LOCK, 1) and report a system error on -1.
 ///     On the old FreeBSD / macOS configurations selected at the top of the
 ///     file, lockf and F_LOCK are macros mapping to flock() with
 ///     LOCK_EX | LOCK_NB.
 /// NOTE(review): the return values, any cleanup of fLockId after a failed
 /// lockf() and the debug guards around the Info() calls are not visible
 /// here -- confirm.
 /// NOTE(review): the existence check followed by open() is not atomic.
 6991 Int_t TProofLockPath::Lock()
 
 6993    const char *pname = GetName();
 
 // True means "not accessible": create the file; otherwise open the existing one.
 6995    if (gSystem->AccessPathName(pname))
 
 6996       fLockId = open(pname, O_CREAT|O_RDWR, 0644);
 
 6998       fLockId = open(pname, O_RDWR);
 
 7000    if (fLockId == -1) {
 
 7001       SysError(
"Lock", 
"cannot open lock file %s", pname);
 
 7006       Info("Lock", "%d: locking file %s ...", gSystem->GetPid(), pname);
 
 // File locking is compiled in only for non-Windows builds.
 7008 #if !defined(R__WIN32) && !defined(R__WINGCC) 
 7009    if (lockf(fLockId, F_LOCK, (off_t) 1) == -1) {
 
 7010       SysError(
"Lock", 
"error locking %s", pname);
 
 7018       Info("Lock", "%d: file %s locked", gSystem->GetPid(), pname);
 
 7027 Int_t TProofLockPath::Unlock()
 
 7033       Info("Lock", "%d: unlocking file %s ...", gSystem->GetPid(), GetName());
 
 7035    lseek(fLockId, 0, SEEK_SET);
 
 7036 #if !defined(R__WIN32) && !defined(R__WINGCC) 
 7037    if (lockf(fLockId, F_ULOCK, (off_t)1) == -1) {
 
 7038       SysError(
"Unlock", 
"error unlocking %s", GetName());
 
 7046       Info("Unlock", "%d: file %s unlocked", gSystem->GetPid(), GetName());