22 #include "RConfigure.h"
35 #include <sys/types.h>
48 #if (defined(__FreeBSD__) && (__FreeBSD__ < 4)) || \
49 (defined(__APPLE__) && (!defined(MAC_OS_X_VERSION_10_3) || \
50 (MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_3)))
52 #define lockf(fd, op, sz) flock((fd), (op))
54 #define F_LOCK (LOCK_EX | LOCK_NB)
57 #define F_ULOCK LOCK_UN
88 #include "compiledata.h"
// Global pointer to the PROOF server object; starts out null.
112 TProofServ *gProofServ = 0;
// Flag polled by the "Proof.GdbHook" busy-wait loops (while (gProofServDebug));
// volatile so that a debugger can clear it from outside the program flow.
115 static volatile Int_t gProofServDebug = 1;
// Syslog configuration: enable switch/level, service name and entity.
118 Int_t TProofServ::fgLogToSysLog = 0;
119 TString TProofServ::fgSysLogService(
"proof");
120 TString TProofServ::fgSysLogEntity(
"undef:default");
// File used by the error handler (see SetErrorHandlerFile); null until set.
123 FILE *TProofServ::fgErrorHandlerFile = 0;
// Crash-reporter info symbol with its assembler .desc directive.
// NOTE(review): presumably macOS-only and guarded by a platform #if that is
// not visible here - confirm.
128 static const char *__crashreporter_info__ = 0;
129 asm(
".desc ___crashreporter_info__, 0x10");
// Recursion counter consulted by HandleSocketInput().
134 Int_t TProofServ::fgRecursive = 0;
// Last message and last entry; appended to the text reported when an
// exception is caught in HandleSocketInput().
137 TString TProofServ::fgLastMsg(
"<undef>");
138 Long64_t TProofServ::fgLastEntry = -1;
// Memory limits (-1 means not set) and the warning / stop fractions; all of
// them are (re)initialised in the TProofServ constructor.
141 Long_t TProofServ::fgVirtMemMax = -1;
142 Long_t TProofServ::fgResMemMax = -1;
143 Float_t TProofServ::fgMemHWM = 0.80;
144 Float_t TProofServ::fgMemStop = 0.95;
// File-local helper: forward msg to the global server's SendAsynMessage()
// (second argument kTRUE); does nothing when no server object exists.
147 static void SendAsynMsg(
const char *msg) {
148 if (gProofServ) gProofServ->SendAsynMessage(msg, kTRUE);
// Signal handler registered for kSigTermination; it remembers the server
// it has to notify.
154 class TProofServTerminationHandler :
public TSignalHandler {
// s: the server whose HandleTermination() is called from Notify().
157 TProofServTerminationHandler(TProofServ *s)
158 : TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
// Called when the termination signal arrives: print a notice and ask the
// server to handle the termination.
165 Bool_t TProofServTerminationHandler::Notify()
167 Printf(
"Received SIGTERM: terminating");
168 fServ->HandleTermination();
// Signal handler registered for kSigUrgent; it remembers the server it has
// to notify.
175 class TProofServInterruptHandler :
public TSignalHandler {
// s: the server whose HandleUrgentData() is called from Notify().
178 TProofServInterruptHandler(TProofServ *s)
179 : TSignalHandler(kSigUrgent, kFALSE) { fServ = s; }
// Called on the urgent-data signal: let the server process the urgent data;
// the follow-up action is taken only if ROOT is initialized.
186 Bool_t TProofServInterruptHandler::Notify()
188 fServ->HandleUrgentData();
189 if (TROOT::Initialized()) {
// Signal handler registered for kSigPipe.
198 class TProofServSigPipeHandler :
public TSignalHandler {
// s: the server to notify (Notify() calls its HandleSigPipe()).
201 TProofServSigPipeHandler(TProofServ *s) : TSignalHandler(kSigPipe, kFALSE)
// Called on SIGPIPE: delegate to the server's HandleSigPipe().
209 Bool_t TProofServSigPipeHandler::Notify()
211 fServ->HandleSigPipe();
// File handler watching the descriptor fd on behalf of the server.
218 class TProofServInputHandler :
public TFileHandler {
// s: server to notify; fd: descriptor handed to TFileHandler (mask 1).
221 TProofServInputHandler(TProofServ *s, Int_t fd) : TFileHandler(fd, 1)
// A read notification is handled exactly like a generic notification.
224 Bool_t ReadNotify() {
return Notify(); }
// Called when the watched descriptor has activity: let the server read and
// process the pending socket input.
230 Bool_t TProofServInputHandler::Notify()
232 fServ->HandleSocketInput();
// Default prefix, used by Notify() when the instance prefix fPfx is empty.
236 TString TProofServLogHandler::fgPfx =
"";
// Exit status of the last piped command, recorded by the destructor.
237 Int_t TProofServLogHandler::fgCmdRtn = 0;
// Build a log handler that reads the output of a shell command.
//   cmd - command executed through gSystem->OpenPipe(cmd, "r")
//   s   - socket stored in fSocket
//   pfx - prefix stored in fPfx (prepended to each line by Notify())
// The handler starts with descriptor -1; the pipe's descriptor is installed
// with SetFd() once the pipe is open. Failures are reported via Error().
242 TProofServLogHandler::TProofServLogHandler(
const char *cmd,
243 TSocket *s,
const char *pfx)
244 : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
// Start from a clean state: not (yet) attached to a pipe.
246 ResetBit(kFileIsPipe);
250 fFile = gSystem->OpenPipe(cmd,
"r");
252 SetFd(fileno(fFile));
259 Error(
"TProofServLogHandler",
"executing command in pipe");
263 Error(
"TProofServLogHandler",
264 "undefined command (%p) or socket (%p)", (
int *)cmd, s);
// Build a log handler that reads from an already open stream.
//   f   - stream to read from
//   s   - socket stored in fSocket
//   pfx - prefix stored in fPfx
// An undefined file or socket is reported via Error().
270 TProofServLogHandler::TProofServLogHandler(FILE *f, TSocket *s,
const char *pfx)
271 : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
// This variant does not own a pipe.
273 ResetBit(kFileIsPipe);
278 SetFd(fileno(fFile));
282 Error(
"TProofServLogHandler",
"undefined file (%p) or socket (%p)", f, s);
// Destructor: if the handler is attached to a pipe, close it and record the
// command's exit status in fgCmdRtn.
288 TProofServLogHandler::~TProofServLogHandler()
290 if (TestBit(kFileIsPipe) && fFile) {
291 Int_t rc = gSystem->ClosePipe(fFile);
// Normal exit: keep the exit code; otherwise flag the failure with -1.
295 fgCmdRtn = WIFEXITED(rc) ? WEXITSTATUS(rc) : -1;
300 ResetBit(kFileIsPipe);
// Read the available lines from fFile and package them as kPROOF_MESSAGE
// messages, prefixing each line with fPfx or, if that is empty, with the
// default prefix fgPfx.
305 Bool_t TProofServLogHandler::Notify()
308 TMessage m(kPROOF_MESSAGE);
312 while (fgets(line,
sizeof(line), fFile)) {
// Locate the trailing newline, if any.
313 if ((plf = strchr(line,
'\n')))
// The instance prefix takes precedence over the default one.
317 if (fPfx.Length() > 0) {
319 log.Form(
"%s: %s", fPfx.Data(), line);
320 }
else if (fgPfx.Length() > 0) {
322 log.Form(
"%s: %s", fgPfx.Data(), line);
// Re-arm the message buffer for the next line.
328 m.Reset(kPROOF_MESSAGE);
// Set the default prefix used for log lines.
// NOTE(review): presumably assigns pfx to fgPfx - confirm against the body.
338 void TProofServLogHandler::SetDefaultPrefix(
const char *pfx)
// Accessor for the status of the last piped command.
// NOTE(review): presumably returns fgCmdRtn - confirm against the body.
346 Int_t TProofServLogHandler::GetCmdRtn()
// Guard that creates a TProofServLogHandler for the command cmd and
// registers it with gSystem when the handler is valid.
//   cmd - command whose output is to be captured
//   s   - socket handed to the handler
//   pfx - prefix handed to the handler
//   on  - enable switch (how it gates the setup is on lines not shown here)
// An invalid handler or an undefined command is reported via Error().
354 TProofServLogHandlerGuard::TProofServLogHandlerGuard(
const char *cmd, TSocket *s,
355 const char *pfx, Bool_t on)
359 fExecHandler =
new TProofServLogHandler(cmd, s, pfx);
360 if (fExecHandler->IsValid()) {
361 gSystem->AddFileHandler(fExecHandler);
363 Error(
"TProofServLogHandlerGuard",
"invalid handler");
367 Error(
"TProofServLogHandlerGuard",
"undefined command");
// Guard that creates a TProofServLogHandler reading from the open stream f
// and registers it with gSystem when the handler is valid.
//   f   - stream to be read
//   s   - socket handed to the handler
//   pfx - prefix handed to the handler
//   on  - enable switch (how it gates the setup is on lines not shown here)
// An invalid handler or an undefined file is reported via Error().
374 TProofServLogHandlerGuard::TProofServLogHandlerGuard(FILE *f, TSocket *s,
375 const char *pfx, Bool_t on)
379 fExecHandler =
new TProofServLogHandler(f, s, pfx);
380 if (fExecHandler->IsValid()) {
381 gSystem->AddFileHandler(fExecHandler);
383 Error(
"TProofServLogHandlerGuard",
"invalid handler");
387 Error(
"TProofServLogHandlerGuard",
"undefined file");
// Destructor: unregister the handler from gSystem and delete it.
// NOTE(review): both steps are conditioned on IsValid(); a handler that was
// allocated but turned out invalid looks like it is never deleted - confirm
// whether the constructors release it on their error path.
394 TProofServLogHandlerGuard::~TProofServLogHandlerGuard()
396 if (fExecHandler && fExecHandler->IsValid()) {
397 gSystem->RemoveFileHandler(fExecHandler);
398 SafeDelete(fExecHandler);
// Create the shutdown timer.
//   p     - server whose input socket is monitored by Notify()
//   delay - timer period handed to TTimer
// fTimeout is compared in Notify() against the idle time as fTimeout * 60000
// ms, i.e. it is expressed in minutes; the default is 20.
406 TShutdownTimer::TShutdownTimer(TProofServ *p, Int_t delay)
407 : TTimer(delay, kFALSE), fProofServ(p)
409 fTimeout = gEnv->GetValue(
"ProofServ.ShutdownTimeout", 20);
// The misspelled key is looked up too and, when set, overrides the value.
// NOTE(review): presumably kept for backward compatibility - confirm before
// "fixing" the spelling.
411 fTimeout = gEnv->GetValue(
"ProofServ.ShutdonwTimeout", fTimeout);
// Timer callback: check when the server's input socket was last used and
// report (and, per the message, abort) when it has been idle for more than
// fTimeout minutes.
418 Bool_t TShutdownTimer::Notify()
421 printf(
"TShutdownTimer::Notify: checking activity on the input socket\n");
425 if (fProofServ && (xs = fProofServ->GetSocket())) {
427 TTimeStamp ts = xs->GetLastUsage();
// Idle time in milliseconds: whole seconds * 1000 plus nanoseconds / 1e6.
428 Long_t dt = (Long_t)(now.GetSec() - ts.GetSec()) * 1000 +
429 (Long_t)(now.GetNanoSec() - ts.GetNanoSec()) / 1000000 ;
// fTimeout is in minutes, hence the factor 60000 to compare against ms.
430 if (dt > fTimeout * 60000) {
431 printf(
"TShutdownTimer::Notify: input socket: %p: did not show any activity"
432 " during the last %d mins: aborting\n", xs, fTimeout);
// dt is in milliseconds and the message reports seconds, so divide by 1000
// (the previous divisor 60000 printed minutes under a "secs" label).
438 printf(
"TShutdownTimer::Notify: input socket: %p: show activity"
439 " %ld secs ago\n", xs, dt / 1000);
// Destructor: make the children list own its entries so that they are
// released together with the list.
451 TReaperTimer::~TReaperTimer()
454 fChildren->SetOwner(kTRUE);
// Register the process id pid in the list of children to be reaped.
// Each entry is a TParameter<Int_t> named after the decimal pid and holding
// the pid as value; the list is created when needed.
463 void TReaperTimer::AddPid(Int_t pid)
467 fChildren =
new TList;
469 spid.Form(
"%d", pid);
470 fChildren->Add(
new TParameter<Int_t>(spid.Data(), pid));
// Timer callback: poll every registered child and drop from the list those
// that have terminated.
479 Bool_t TReaperTimer::Notify()
482 TIter nxp(fChildren);
483 TParameter<Int_t> *p = 0;
484 while ((p = (TParameter<Int_t> *)nxp())) {
// Non-blocking wait (WNOHANG), retried when interrupted by a signal (EINTR).
489 pid = waitpid(p->GetVal(), &status, WNOHANG);
490 }
while (pid < 0 && errno == EINTR);
// Alternative wait based on _cwait.
// NOTE(review): presumably the Windows branch of a platform #if - confirm.
493 pid = _cwait(&status, (intptr_t)p->GetVal(), 0);
// The child is gone only if the wait call returned its own pid.
495 if (pid > 0 && pid == p->GetVal()) {
497 fChildren->Remove(p);
// Special handling once no children are left.
504 if (!fChildren || fChildren->GetSize() <= 0) {
// Idle-timeout callback: log the event, update the session status, send the
// client an explanatory message and terminate the server.
// fTime is divided by 1000 before being printed as seconds, i.e. it is in
// milliseconds.
517 Bool_t TIdleTOTimer::Notify()
519 Info (
"Notify",
"session idle for more then %lld secs: terminating", Long64_t(fTime)/1000);
// A non-zero return is reported as a warning; its negation is printed as
// the errno value.
524 if ((uss_rc = fProofServ->UpdateSessionStatus(4)) != 0)
525 Warning(
"Notify",
"problems updating session status (errno: %d)", -uss_rc);
// Clients with protocol < 29 additionally get a hint to ignore follow-up
// error messages.
528 if (fProofServ->GetProtocol() < 29) {
529 msg.Form(
"\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n"
530 "// Please IGNORE any error message possibly displayed below\n//",
531 gSystem->HostName(), fProofServ->GetSessionTag(), Long64_t(fTime)/1000);
533 msg.Form(
"\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n//",
534 gSystem->HostName(), fProofServ->GetSessionTag(), Long64_t(fTime)/1000);
536 fProofServ->SendAsynMessage(msg.Data());
537 fProofServ->Terminate(0);
// Reached when no server object is attached to the timer.
541 Warning(
"Notify",
"fProofServ undefined!");
547 ClassImp(TProofServ);
552 TApplication *GetTProofServ(Int_t *argc,
char **argv, FILE *flog)
553 {
return new TProofServ(argc, argv, flog); }
// Main constructor: create the "proofserv" application object and initialise
// the server from the command line, the environment and the ROOT rc settings.
//   argc, argv - command-line arguments, forwarded to TApplication and to
//                GetOptions()
//   flog       - log file supplied by the caller
562 TProofServ::TProofServ(Int_t *argc,
char **argv, FILE *flog)
563 : TApplication(
"proofserv", argc, argv, 0, -1)
// Invocation with a single argument is treated as a command-line test.
566 Bool_t xtest = (argc && *argc == 1) ? kTRUE : kFALSE;
568 Printf(
"proofserv: command line testing: OK");
// Read the session rc file if it is readable; ROOTRCFILE overrides the
// default name.
573 TString rcfile = gSystem->Getenv(
"ROOTRCFILE") ? gSystem->Getenv(
"ROOTRCFILE")
575 if (!gSystem->AccessPathName(rcfile, kReadPermission))
576 gEnv->ReadFile(rcfile, kEnvChange);
// Upper limit on virtual memory: rc value first, then the environment
// variables PROOF_VIRTMEMMAX and ROOTPROOFASHARD (values scaled by 1024).
579 fgVirtMemMax = gEnv->GetValue(
"Proof.VirtMemMax",-1);
580 if (fgVirtMemMax < 0 && gSystem->Getenv(
"PROOF_VIRTMEMMAX")) {
581 Long_t mmx = strtol(gSystem->Getenv(
"PROOF_VIRTMEMMAX"), 0, 10);
582 if (mmx < kMaxLong && mmx > 0)
583 fgVirtMemMax = mmx * 1024;
586 if (fgVirtMemMax < 0 && gSystem->Getenv(
"ROOTPROOFASHARD")) {
587 Long_t mmx = strtol(gSystem->Getenv(
"ROOTPROOFASHARD"), 0, 10);
588 if (mmx < kMaxLong && mmx > 0)
589 fgVirtMemMax = mmx * 1024;
// Upper limit on resident memory: rc value first, then PROOF_RESMEMMAX
// (value scaled by 1024).
592 fgResMemMax = gEnv->GetValue(
"Proof.ResMemMax",-1);
593 if (fgResMemMax < 0 && gSystem->Getenv(
"PROOF_RESMEMMAX")) {
594 Long_t mmx = strtol(gSystem->Getenv(
"PROOF_RESMEMMAX"), 0, 10);
595 if (mmx < kMaxLong && mmx > 0)
596 fgResMemMax = mmx * 1024;
// Memory fractions: MemStop must lie in [0,1] and MemHWM in [0,MemStop];
// out-of-range requests are warned about and ignored. The checks only apply
// when at least one memory limit is set.
599 fgMemStop = gEnv->GetValue(
"Proof.MemStop", 0.95);
600 fgMemHWM = gEnv->GetValue(
"Proof.MemHWM", 0.80);
601 if (fgVirtMemMax > 0 || fgResMemMax > 0) {
602 if ((fgMemStop < 0.) || (fgMemStop > 1.)) {
603 Warning(
"TProofServ",
"requested memory fraction threshold to stop processing"
604 " (MemStop) out of range [0,1] - ignoring");
607 if ((fgMemHWM < 0.) || (fgMemHWM > fgMemStop)) {
608 Warning(
"TProofServ",
"requested memory fraction threshold for warning and finer monitoring"
609 " (MemHWM) out of range [0,MemStop] - ignoring");
// Debugger hook: Proof.GdbHook == 3 (normal mode) or == 4 (test mode) parks
// the process in a loop until gProofServDebug is cleared.
615 Bool_t test = (argc && *argc >= 4 && !strcmp(argv[3],
"test")) ? kTRUE : kFALSE;
616 if ((gEnv->GetValue(
"Proof.GdbHook",0) == 3 && !test) ||
617 (gEnv->GetValue(
"Proof.GdbHook",0) == 4 && test)) {
618 while (gProofServDebug)
// A fourth argument equal to "test" selects the test service name.
623 if (argc && *argc >= 4)
624 if (!strcmp(argv[3],
"test"))
625 fService =
"prooftest";
// At least one argument besides the program name is required.
628 if (argc && *argc < 2) {
629 Error(
"TProofServ",
"Must have at least 1 arguments (see proofd).");
637 fSendLogToMaster = kFALSE;
// Install the server's own error handler, writing to stderr.
640 gErrorAbortLevel = kSysError + 1;
641 SetErrorHandlerFile(stderr);
642 SetErrorHandler(ErrorHandler);
645 fGroupPriority = 100;
// Ordinal of this server; "-1" means not set.
648 fOrdinal = gEnv->GetValue(
"ProofServ.Ordinal",
"-1");
671 fWaitingQueries =
new TList;
675 fQueuedMsg =
new TList;
677 fRealTimeLog = kFALSE;
699 ResetBit(TProofServ::kHighMemory);
// Message-size threshold and compression settings for messages.
702 fMsgSizeHWM = gEnv->GetValue(
"ProofServ.MsgSizeHWM", 1000000);
705 fCompressMsg = gEnv->GetValue(
"ProofServ.CompressMessage", 0);
// Debug level and mask.
707 gProofDebugLevel = gEnv->GetValue(
"Proof.DebugLevel",0);
708 fLogLevel = gProofDebugLevel;
710 gProofDebugMask = (TProofDebug::EProofDebugMask) gEnv->GetValue(
"Proof.DebugMask",~0);
711 if (gProofDebugLevel > 0)
712 Info(
"TProofServ",
"DebugLevel %d Mask 0x%x", gProofDebugLevel, gProofDebugMask);
// Optional cap on the log file size: a number optionally followed by one of
// the suffixes K, M or G, which is stripped before the digits are converted
// and multiplied by the factor xf. Anything else is warned about and ignored.
715 fLogFileMaxSize = -1;
716 TString logmx = gEnv->GetValue(
"ProofServ.LogFileMaxSize",
"");
717 if (!logmx.IsNull()) {
719 if (!logmx.IsDigit()) {
720 if (logmx.EndsWith(
"K")) {
722 logmx.Remove(TString::kTrailing,
'K');
723 }
else if (logmx.EndsWith(
"M")) {
725 logmx.Remove(TString::kTrailing,
'M');
726 }
else if (logmx.EndsWith(
"G")) {
728 logmx.Remove(TString::kTrailing,
'G');
731 if (logmx.IsDigit()) {
732 fLogFileMaxSize = logmx.Atoi() * xf;
733 if (fLogFileMaxSize > 0)
734 Info(
"TProofServ",
"keeping the log file size within %lld bytes", fLogFileMaxSize);
// Re-read the raw setting so that the warning shows the original text.
736 logmx = gEnv->GetValue(
"ProofServ.LogFileMaxSize",
"");
737 Warning(
"TProofServ",
"bad formatted log file size limit ignored: '%s'", logmx.Data());
// Parse the command line (sets the master/worker role, among other things).
742 GetOptions(argc, argv);
// Log prefix: "Mst-" on a master, "Wrk-" on a worker, "Test" in test mode.
745 fPrefix = (IsMaster() ?
"Mst-" :
"Wrk-");
746 if (test) fPrefix =
"Test";
747 if (fOrdinal !=
"-1")
749 TProofServLogHandler::SetDefaultPrefix(fPrefix);
// Syslog control: either a plain number, or a string whose first letter
// selects who logs: 'm'/'M' master only, 'w'/'W' workers only, anything else
// all nodes.
752 TString slog = gEnv->GetValue(
"ProofServ.LogToSysLog",
"");
753 if (!(slog.IsNull())) {
754 if (slog.IsDigit()) {
755 fgLogToSysLog = slog.Atoi();
757 char c = (slog[0] ==
'M' || slog[0] ==
'm') ?
'm' :
'a';
758 c = (slog[0] ==
'W' || slog[0] ==
'w') ?
'w' : c;
759 Bool_t dosyslog = ((c ==
'm' && IsMaster()) ||
760 (c ==
'w' && !IsMaster()) || c ==
'a') ? kTRUE : kFALSE;
763 if (slog.IsDigit()) fgLogToSysLog = slog.Atoi();
764 if (fgLogToSysLog <= 0)
765 Warning(
"TProofServ",
"request for syslog logging ineffective!");
// Open the syslog connection under a role-specific service name, extended
// with the ordinal when that is set.
770 if (fgLogToSysLog > 0) {
771 fgSysLogService = (IsMaster()) ?
"proofm" :
"proofw";
772 if (fOrdinal !=
"-1") fgSysLogService += TString::Format(
"-%s", fOrdinal.Data());
773 gSystem->Openlog(fgSysLogService, kLogPid | kLogCons, kLogLocal5);
// Automatic schema evolution in TMessage is on unless explicitly disabled.
779 Bool_t enableSchemaEvolution = gEnv->GetValue(
"Proof.SchemaEvolution",1);
780 if (enableSchemaEvolution) {
781 TMessage::EnableSchemaEvolutionForAll();
783 Info(
"TProofServ",
"automatic schema evolution in TMessage explicitly disabled");
// Finalise the server creation: attach to the communication socket, set up
// logging, run the logon macros, install signal and input handlers, start
// the TProof instance through the plugin manager and arm the timers.
// Failures along the way are reported through Fatal()/Error().
792 Int_t TProofServ::CreateServer()
// Socket descriptor: ROOTOPENSOCK from the environment, else the rc value
// ProofServ.OpenSock (default "-1").
795 TString opensock = gSystem->Getenv(
"ROOTOPENSOCK");
796 if (opensock.Length() <= 0)
797 opensock = gEnv->GetValue(
"ProofServ.OpenSock",
"-1");
798 Int_t sock = opensock.Atoi();
800 Fatal(
"CreateServer",
"Invalid socket descriptor number (%d)", sock);
803 fSocket =
new TSocket(sock);
806 fSocket->SetCompressionSettings(fCompressMsg);
// Debugger hooks: Proof.GdbHook == 1 or == 2 parks the process in a loop
// until gProofServDebug is cleared.
811 if (gEnv->GetValue(
"Proof.GdbHook",0) == 1) {
812 while (gProofServDebug)
817 if (gEnv->GetValue(
"Proof.GdbHook",0) == 2) {
818 while (gProofServDebug)
823 if (gProofDebugLevel > 0)
824 Info(
"CreateServer",
"Service %s ConfDir %s IsMaster %d\n",
825 GetService(), GetConfDir(), (Int_t)fMasterServ);
// Default log prefix according to the role.
838 TString pfx = (IsMaster() ?
"Mst-" :
"Wrk-");
840 TProofServLogHandler::SetDefaultPrefix(pfx);
// A usable descriptor for the log file is required.
846 if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
854 if ((fLogFileDes = fileno(fLogFile)) < 0) {
// Show the message of the day; -1 signals a failure.
864 if (CatMotd() == -1) {
// Make the standard headers known to the interpreter.
873 ProcessLine(
"#include <iostream>", kTRUE);
874 ProcessLine(
"#include <string>",kTRUE);
// Optional user macros: Proof.Load is loaded with ".L"; Proof.Logon is
// honoured unless NoLogOpt() is set. Both are searched in the macro path.
882 logon = gEnv->GetValue(
"Proof.Load", (
char *)0);
884 char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
886 ProcessLine(TString::Format(
".L %s", logon), kTRUE);
891 logon = gEnv->GetValue(
"Proof.Logon", (
char *)0);
892 if (logon && !NoLogOpt()) {
893 char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
// Save the interpreter state reached so far.
900 gInterpreter->SaveContext();
901 gInterpreter->SaveGlobalsContext();
// Install the termination and urgent-data signal handlers and the handler
// watching the input socket.
904 gSystem->AddSignalHandler(
new TProofServTerminationHandler(
this));
905 gSystem->AddSignalHandler(
new TProofServInterruptHandler(
this));
906 fInputHandler =
new TProofServInputHandler(
this, sock);
907 gSystem->AddFileHandler(fInputHandler);
// Build the master URL from the local socket address.
911 TString master =
"proof://__master__";
912 TInetAddress a = gSystem->GetSockName(sock);
915 master += a.GetPort();
// Locate, load and execute the TProof plugin; each failure is reported.
919 TPluginManager *pm = gROOT->GetPluginManager();
921 Error(
"CreateServer",
"no plugin manager found");
928 TPluginHandler *h = pm->FindHandler(
"TProof", fConfFile);
930 Error(
"CreateServer",
"no plugin found for TProof with a"
931 " config file of '%s'", fConfFile.Data());
938 if (h->LoadPlugin() == -1) {
939 Error(
"CreateServer",
"plugin for TProof could not be loaded");
946 fProof =
reinterpret_cast<TProof*
>(h->ExecPlugin(5, master.Data(),
950 if (!fProof || !fProof->IsValid()) {
951 Error(
"CreateServer",
"plugin for TProof could not be executed");
958 fEndMaster = fProof->IsEndMaster();
// Start the shutdown timer (period 300000) if not yet running.
964 if (!fShutdownTimer) {
966 fShutdownTimer =
new TShutdownTimer(
this, 300000);
967 fShutdownTimer->Start(-1, kFALSE);
// Clients with protocol <= 17 are warned that schema evolution is
// ineffective for them.
972 if (fProtocol <= 17) {
974 msg.Form(
"Warning: client version is too old: automatic schema evolution is ineffective.\n"
975 " This may generate compatibility problems between streamed objects.\n"
976 " The advise is to move to ROOT >= 5.21/02 .");
977 SendAsynMessage(msg.Data());
// On a master, optionally start the idle-timeout timer; the setting is in
// seconds and converted to milliseconds for the timer.
981 if (IsMaster() && !fIdleTOTimer) {
983 Int_t idle_to = gEnv->GetValue(
"ProofServ.IdleTimeout", -1);
985 fIdleTOTimer =
new TIdleTOTimer(
this, idle_to * 1000);
986 fIdleTOTimer->Start(-1, kTRUE);
987 if (gProofDebugLevel > 0)
988 Info(
"CreateServer",
" idle timer started (%d secs)", idle_to);
989 }
else if (gProofDebugLevel > 0) {
990 Info(
"CreateServer",
" idle timer not started (no idle timeout requested)");
// Destructor: release the objects owned by the server (waiting-query list,
// socket, package manager, locks, dataset manager and staging repository).
// SafeDelete deletes the object and resets the pointer.
1002 TProofServ::~TProofServ()
1004 SafeDelete(fWaitingQueries);
1005 SafeDelete(fSocket);
1006 SafeDelete(fPackMgr);
1007 SafeDelete(fCacheLock);
1008 SafeDelete(fQueryLock);
1009 SafeDelete(fDataSetManager);
1010 SafeDelete(fDataSetStgRepo);
// Show the "noproof" notice or the message of the day (motd).
// The time of the last display is tracked through the modification time of
// the file <workdir>/.prooflast, which is re-created at the end.
1021 Int_t TProofServ::CatMotd()
1025 Bool_t show = kFALSE;
1028 TString motdname(GetConfDir());
// A "noproof" file (location overridable with PROOFNOPROOF) is dumped
// character by character when it can be opened.
1031 if (gSystem->Getenv(
"PROOFNOPROOF")) {
1032 motdname = gSystem->Getenv(
"PROOFNOPROOF");
1034 motdname +=
"/etc/proof/noproof";
1036 if ((motd = fopen(motdname,
"r"))) {
1039 while ((c = getc(motd)) != EOF)
// Look up when the motd was last shown.
1048 lastname = TString(GetWorkDir()) +
"/.prooflast";
1049 char *last = gSystem->ExpandPathName(lastname.Data());
1051 Long_t id, flags, modtime, lasttime = 0;
// A return value of 1 from GetPathInfo is the case where the marker file
// could not be examined.
1052 if (gSystem->GetPathInfo(last, &
id, &size, &flags, &lasttime) == 1)
// More than 86400 s (one day) since the last display.
1056 if (time(0) - lasttime > (time_t)86400)
// motd file: PROOFMOTD from the environment, else <confdir>/etc/proof/motd.
1061 if (gSystem->Getenv(
"PROOFMOTD")) {
1062 motdname = gSystem->Getenv(
"PROOFMOTD");
1064 motdname = GetConfDir();
1065 motdname +=
"/etc/proof/motd";
// Dump the motd when it changed after the last display or when a display is
// forced through 'show'.
1067 if (gSystem->GetPathInfo(motdname, &
id, &size, &flags, &modtime) == 0) {
1068 if (modtime > lasttime || show) {
1069 if ((motd = fopen(motdname,
"r"))) {
1072 while ((c = getc(motd)) != EOF)
// Refresh the marker file: remove it and create it anew (mode 0600).
1081 gSystem->Unlink(last);
1082 Int_t fd = creat(last, 0600);
1083 if (fd >= 0) close(fd);
// Request the object identified by namecycle over the socket
// (kPROOF_GETOBJECT) and wait for the answer.
// Returns a null pointer when the request cannot be sent. While waiting,
// messages other than kMESS_OBJECT are dispatched to
// HandleSocketInput(mess, kFALSE).
1094 TObject *TProofServ::Get(
const char *namecycle)
1096 if (fSocket->Send(namecycle, kPROOF_GETOBJECT) < 0) {
1097 Error(
"Get",
"problems sending request");
1098 return (TObject *)0;
1103 Bool_t notdone = kTRUE;
1106 if (fSocket->Recv(mess) < 0)
1108 Int_t what = mess->What();
// The answer we are waiting for: extract the object from the message.
1109 if (what == kMESS_OBJECT) {
1110 idcur = mess->ReadObject(mess->GetClass());
// Anything else is handled as regular input; failures are reported.
1113 Int_t xrc = HandleSocketInput(mess, kFALSE);
1115 Error(
"Get",
"command %d cannot be executed while processing", what);
1116 }
else if (xrc == -2) {
1117 Error(
"Get",
"unknown command %d ! Protocol error?", what);
// Record the real time measured so far by fCompute as the "learn time" in
// the player's progress status, log it, and restart the stopwatch without
// resetting it (Start(kFALSE)).
1129 void TProofServ::RestartComputeTime()
1133 TProofProgressStatus *status = fPlayer->GetProgressStatus();
1134 if (status) status->SetLearnTime(fCompute.RealTime());
1135 Info(
"RestartComputeTime",
"compute time restarted after %f secs (%d entries)",
1136 fCompute.RealTime(), fPlayer->GetLearnEntries());
1138 fCompute.Start(kFALSE);
// Ask for the next packet of work and wait for the answer.
//   totalEntries - forwarded in the request; a negative value marks the
//                  progress status with kFileNotOpen
// A kPROOF_GETPACKET request carrying timing and progress information is
// sent over the socket; the replies are then processed until a packet (or a
// stop request) arrives. Returns the received element (initially null).
1144 TDSetElement *TProofServ::GetNextPacket(Long64_t totalEntries)
1146 Long64_t bytesRead = 0;
1148 if (gPerfStats) bytesRead = gPerfStats->GetBytesRead();
1150 if (fCompute.Counter() > 0)
1153 TMessage req(kPROOF_GETPACKET);
1154 Double_t cputime = fCompute.CpuTime();
1155 Double_t realtime = fCompute.RealTime();
// Protocol > 18: send latency plus the progress-status based information.
1157 if (fProtocol > 18) {
1158 req << fLatency.RealTime();
1159 TProofProgressStatus *status = 0;
1161 fPlayer->UpdateProgressInfo();
1162 status = fPlayer->GetProgressStatus();
1164 Error(
"GetNextPacket",
"no progress status object");
// Account the processing and CPU time only when entries were processed.
1169 if (status->GetEntries() > 0) {
1170 PDB(kLoop, 2) status->Print(GetOrdinal());
1171 status->IncProcTime(realtime);
1172 status->IncCPUTime(cputime);
1175 if (totalEntries < 0) status->SetBit(TProofProgressStatus::kFileNotOpen);
// Cache size and learn entries; -1 when there is no player.
1179 Long64_t cacheSize = (fPlayer) ? fPlayer->GetCacheSize() : -1;
1180 Int_t learnent = (fPlayer) ? fPlayer->GetLearnEntries() : -1;
1181 req << cacheSize << learnent;
1186 req << totalEntries;
// Protocol > 34: also report the time spent saving the output.
1189 if (fProtocol > 34) req << fSaveOutput.RealTime();
1192 PDB(kLoop, 2) status->Print();
1193 Info("GetNextPacket","cacheSize: %lld, learnent: %d", cacheSize, learnent);
// The file-status bits are valid for one request only.
1196 status->ResetBit(TProofProgressStatus::kFileNotOpen);
1197 status->ResetBit(TProofProgressStatus::kFileCorrupted);
// Older protocols: send the individual values directly.
1200 req << fLatency.RealTime() << realtime << cputime
1201 << bytesRead << totalEntries;
1203 req << fPlayer->GetEventsProcessed();
1207 Int_t rc = fSocket->Send(req);
1209 Error(
"GetNextPacket",
"Send() failed, returned %d", rc);
// Save the partial results while timing the operation.
1215 fSaveOutput.Start();
1216 if (fPlayer->SavePartialResults(kFALSE) < 0)
1217 Warning(
"GetNextPacket",
"problems saving partial results");
// Wait for the answer.
1221 TDSetElement *e = 0;
1222 Bool_t notdone = kTRUE;
1226 if ((rc = fSocket->Recv(mess)) <= 0) {
1228 Error(
"GetNextPacket",
"Recv() failed, returned %d", rc);
1233 TString file, dir, obj;
1235 Int_t what = mess->What();
1238 case kPROOF_GETPACKET:
1244 PDB(kLoop, 2) Info("GetNextPacket", "'%s' '%s' '%s' %lld %lld",
1245 e->GetFileName(), e->GetDirectory(),
1246 e->GetObjName(), e->GetFirst(),e->GetNum());
1248 PDB(kLoop, 2) Info("GetNextPacket", "Done");
1253 case kPROOF_STOPPROCESS:
1258 PDB(kLoop, 2) Info("GetNextPacket:kPROOF_STOPPROCESS","received");
// Any other message is handled as regular input; failures are reported.
1262 xrc = HandleSocketInput(mess, kFALSE);
1264 Error(
"GetNextPacket",
"command %d cannot be executed while processing", what);
1265 }
else if (xrc == -2) {
1266 Error(
"GetNextPacket",
"unknown command %d ! Protocol error?", what);
// Parse the command line and the environment.
//   argc, argv - command-line arguments; argv[1] selects the role
//                ("proofserv" = master, "proofslave" = worker) and
//                argv[3] == "test" selects the test mode
// Missing arguments, an unknown role or an unset ROOTCONFDIR are fatal.
1283 void TProofServ::GetOptions(Int_t *argc,
char **argv)
1285 Bool_t xtest = (argc && *argc > 3 && !strcmp(argv[3],
"test")) ? kTRUE : kFALSE;
// Test mode with both stdin (0) and stdout (1) attached to a terminal.
1288 if (xtest && !(isatty(0) == 0 || isatty(1) == 0)) {
1289 Printf(
"proofserv: command line testing: OK");
1293 if (!argc || (argc && *argc <= 1)) {
1294 Fatal(
"GetOptions",
"Must be started from proofd with arguments");
// Role selection from the first argument.
1298 if (!strcmp(argv[1],
"proofserv")) {
1299 fMasterServ = kTRUE;
1301 }
else if (!strcmp(argv[1],
"proofslave")) {
1302 fMasterServ = kFALSE;
1303 fEndMaster = kFALSE;
1305 Fatal(
"GetOptions",
"Must be started as 'proofserv' or 'proofslave'");
// The configuration directory must be given through ROOTCONFDIR.
1312 if (!(gSystem->Getenv(
"ROOTCONFDIR"))) {
1313 Fatal(
"GetOptions",
"ROOTCONFDIR shell variable not set");
1316 fConfDir = gSystem->Getenv(
"ROOTCONFDIR");
// Entry point for input on the socket: receive one message, dispatch it to
// HandleSocketInput(TMessage *, Bool_t) and deal with the outcome: error
// codes are turned into messages for the client, messages that cannot be
// processed now are enqueued (rc == 2), and exceptions are caught, logged
// and forwarded as asynchronous messages.
1322 void TProofServ::HandleSocketInput()
// Guard object built on the idle-timeout timer for the duration of the call.
1325 TIdleTOTimerGuard itg(fIdleTOTimer);
// All commands are allowed only when not called recursively.
1327 Bool_t all = (fgRecursive > 0) ? kFALSE : kTRUE;
1341 if (fSocket->Recv(mess) <= 0 || !mess) {
1344 Error(
"HandleSocketInput",
"retrieving message from input socket");
1348 Int_t what = mess->What();
1350 Info("HandleSocketInput", "got type %d from '%s'", what, fSocket->GetTitle());
1354 if (fProof) fProof->SetActive();
1356 Bool_t doit = kTRUE;
1361 rc = HandleSocketInput(mess, all);
// Translate the error codes into a message for the client.
1365 emsg.Form(
"HandleSocketInput: command %d cannot be executed while processing", what);
1366 }
else if (rc == -3) {
1367 emsg.Form(
"HandleSocketInput: message %d undefined! Protocol error?", what);
1369 emsg.Form(
"HandleSocketInput: unknown command %d! Protocol error?", what);
1371 SendAsynMessage(emsg.Data());
1372 }
else if (rc == 2) {
// The message has to wait: keep it in the queue.
1374 fQueuedMsg->Add(mess);
1376 Info("HandleSocketInput", "message of type %d enqueued; sz: %d",
1377 what, fQueuedMsg->GetSize());
// At the outermost recursion level, continue with the enqueued messages.
1383 if (fgRecursive == 1 && fQueuedMsg->GetSize() > 0) {
1386 Info("HandleSocketInput", "processing enqueued message of type %d; left: %d",
1387 what, fQueuedMsg->GetSize());
1390 mess = (TMessage *) fQueuedMsg->First();
1391 if (mess) fQueuedMsg->Remove(mess);
// Exception handlers: each builds a description that includes the last
// message and the last entry recorded in fgLastMsg / fgLastEntry.
1396 } catch (std::bad_alloc &) {
1398 exmsg.Form(
"caught exception 'bad_alloc' (memory leak?) %s %lld",
1399 fgLastMsg.Data(), fgLastEntry);
1400 }
catch (std::exception &exc) {
1402 exmsg.Form(
"caught standard exception '%s' %s %lld",
1403 exc.what(), fgLastMsg.Data(), fgLastEntry);
1406 exmsg.Form(
"caught exception throwing %d %s %lld",
1407 i, fgLastMsg.Data(), fgLastEntry);
1408 }
catch (
const char *str) {
1410 exmsg.Form(
"caught exception throwing '%s' %s %lld",
1411 str, fgLastMsg.Data(), fgLastEntry);
1414 exmsg.Form(
"caught exception <unknown> %s %lld",
1415 fgLastMsg.Data(), fgLastEntry);
// Something was caught: log it and notify the client.
1419 if (!exmsg.IsNull()) {
1421 Error(
"HandleSocketInput",
"%s", exmsg.Data());
1423 SendAsynMessage(TString::Format(
"%s: %s", GetOrdinal(), exmsg.Data()));
// High memory usage flagged during processing: log and notify the client.
1430 if (TestBit(TProofServ::kHighMemory)) {
1432 exmsg.Form(
"high-memory footprint detected during Process(...) - terminating");
1433 Error(
"HandleSocketInput",
"%s", exmsg.Data());
1435 SendAsynMessage(TString::Format(
"%s: %s", GetOrdinal(), exmsg.Data()));
// With neither active nor inactive workers left the session cannot go on,
// unless it runs in master-only mode or with dynamic startup.
// NOTE(review): fProof is dereferenced here without a visible null check,
// whereas it is tested earlier in this function - confirm the enclosing
// condition guarantees a valid pointer.
1445 Bool_t masterOnly = gEnv->GetValue(
"Proof.MasterOnly", kFALSE);
1446 Bool_t dynamicStartup = gEnv->GetValue(
"Proof.DynamicStartup", kFALSE);
1447 Int_t ngwrks = fProof->GetListOfActiveSlaves()->GetSize() + fProof->GetListOfInactiveSlaves()->GetSize();
1448 if (rc == 0 && ngwrks == 0 && !masterOnly && !dynamicStartup) {
1449 SendAsynMessage(
" *** No workers left: cannot continue! Terminating ... *** ");
1452 fProof->SetActive(kFALSE);
1454 fProof->SetRunStatus(TProof::kRunning);
1470 Int_t TProofServ::HandleSocketInput(TMessage *mess, Bool_t all)
1472 static TStopwatch timer;
1474 Bool_t aborted = kFALSE;
1476 if (!mess)
return -3;
1478 Int_t what = mess->What();
1480 Info("HandleSocketInput", "processing message type %d from '%s'",
1481 what, fSocket->GetTitle());
1485 Int_t rc = 0, lirc = 0;
1487 TString *pslb = (fgLogToSysLog > 0) ? &slb : (TString *)0;
1493 mess->ReadString(str,
sizeof(str));
1497 Bool_t hasfn = TProof::GetFileInCmd(str, fn);
1499 if (IsParallel() && fProof && !fProof->UseDynamicStartup()) {
1500 fProof->SendCommand(str);
1503 Info("HandleSocketInput:kMESS_CINT", "processing: %s...", str);
1507 ocwd = gSystem->WorkingDirectory();
1508 gSystem->ChangeDirectory(fCacheDir.Data());
1512 gSystem->ChangeDirectory(ocwd);
1513 fCacheLock->Unlock();
1522 if (pslb) slb = str;
1527 mess->ReadString(str,
sizeof(str));
1535 mess->ReadObject(mess->GetClass());
1541 case kPROOF_GROUPVIEW:
1543 mess->ReadString(str,
sizeof(str));
1545 sscanf(str,
"%d %d", &fGroupId, &fGroupSize);
1551 case kPROOF_LOGLEVEL:
1553 mess->ReadString(str,
sizeof(str));
1554 sscanf(str,
"%d %u", &fLogLevel, &mask);
1555 Bool_t levelchanged = (fLogLevel != gProofDebugLevel) ? kTRUE : kFALSE;
1556 gProofDebugLevel = fLogLevel;
1557 gProofDebugMask = (TProofDebug::EProofDebugMask) mask;
1559 Info(
"HandleSocketInput:kPROOF_LOGLEVEL",
"debug level set to %d (mask: 0x%x)",
1560 gProofDebugLevel, gProofDebugMask);
1562 fProof->SetLogLevel(fLogLevel, mask);
1574 mess->ReadString(str,
sizeof(str));
1582 mess->ReadString(str,
sizeof(str));
1590 Warning(
"HandleSocketInput:kPROOF_STATUS",
1591 "kPROOF_STATUS message is obsolete");
1592 if (fSocket->Send(fProof->GetParallel(), kPROOF_STATUS) < 0)
1593 Warning(
"HandleSocketInput:kPROOF_STATUS",
"problem sending of request");
1596 case kPROOF_GETSTATS:
1600 case kPROOF_GETPARALLEL:
1610 Info("HandleSocketInput:kPROOF_STOP", "request for worker %s", ord.Data());
1611 if (fProof) fProof->TerminateWorker(ord);
1614 Info("HandleSocketInput:kPROOF_STOP", "got request to terminate");
1622 case kPROOF_STOPPROCESS:
1627 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_STOPPROCESS","enter");
1629 Long_t timeout = -1;
1634 Info("HandleSocketInput:kPROOF_STOPPROCESS",
1635 "recursive mode: enter %d, %ld", aborted, timeout);
1638 fProof->StopProcess(aborted, timeout);
1642 fPlayer->StopProcess(aborted, timeout);
1646 case kPROOF_PROCESS:
1648 TProofServLogHandlerGuard hg(fLogFile, fSocket,
"", fRealTimeLog);
1649 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_PROCESS","enter");
1650 HandleProcess(mess, pslb);
1657 case kPROOF_SENDOUTPUT:
1659 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_SENDOUTPUT",
1660 "worker was asked to send output to master");
1662 if (SendResults(fSocket, fPlayer->GetOutputList()) != 0) {
1663 Error(
"HandleSocketInput:kPROOF_SENDOUTPUT",
"problems sending output list");
1667 fSocket->Send(kPROOF_SETIDLE);
1674 case kPROOF_QUERYLIST:
1676 HandleQueryList(mess);
1684 HandleRemove(mess, pslb);
1690 case kPROOF_RETRIEVE:
1692 HandleRetrieve(mess, pslb);
1698 case kPROOF_ARCHIVE:
1700 HandleArchive(mess, pslb);
1706 case kPROOF_MAXQUERIES:
1708 Info("HandleSocketInput:kPROOF_MAXQUERIES", "Enter");
1709 TMessage m(kPROOF_MAXQUERIES);
1717 case kPROOF_CLEANUPSESSION:
1720 Info("HandleSocketInput:kPROOF_CLEANUPSESSION", "Enter");
1723 if (fQMgr && fQMgr->CleanupSession(stag) == 0) {
1724 Printf(
"Session %s cleaned up", stag.Data());
1726 Printf(
"Could not cleanup session %s", stag.Data());
1735 case kPROOF_GETENTRIES:
1736 { PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Enter");
1740 TString objname("undef");
1741 Long64_t entries = -1;
1744 (*mess) >> isTree >> filename >> dir >> objname;
1745 PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
1746 "Report size of
object %s (%s) in dir %s in file %s",
1747 objname.Data(), isTree ? "T" : "O",
1748 dir.Data(), filename.Data());
1749 entries = TDSet::GetEntries(isTree, filename, dir, objname);
1750 PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
1751 "Found %lld %s", entries, isTree ? "entries" : "objects");
1755 TMessage answ(kPROOF_GETENTRIES);
1756 answ << entries << objname;
1758 fSocket->Send(answ);
1759 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Done");
1763 case kPROOF_CHECKFILE:
1764 if (!all && fProtocol <= 19) {
1769 HandleCheckFile(mess, pslb);
1774 case kPROOF_SENDFILE:
1775 if (!all && fProtocol <= 19) {
1779 mess->ReadString(str,
sizeof(str));
1783 if (fProtocol > 5) {
1784 sscanf(str,
"%1023s %d %ld %d", name, &bin, &size, &fw);
1786 sscanf(str,
"%1023s %d %ld", name, &bin, &size);
1789 Bool_t copytocache = kTRUE;
1790 if (fnam.BeginsWith(
"cache:")) {
1791 fnam.ReplaceAll(
"cache:", TString::Format(
"%s/", fCacheDir.Data()));
1792 copytocache = kFALSE;
1797 rfrc = ReceiveFile(fnam, bin ? kTRUE : kFALSE, size);
1800 if (!fnam.BeginsWith(fCacheDir.Data())) {
1801 fnam.Insert(0, TString::Format(
"%s/", fCacheDir.Data()));
1806 if (copytocache && size > 0 && !fPackMgr->IsInDir(name))
1807 gSystem->Exec(TString::Format(
"%s %s %s", kCP, fnam.Data(), fCacheDir.Data()));
1808 if (IsMaster() && fw == 1) {
1809 Int_t opt = TProof::kForward | TProof::kCp;
1811 opt |= TProof::kBinary;
1813 Info("HandleSocketInput","forwarding file: %s", fnam.Data());
1814 if (fProof->SendFile(fnam, opt, (copytocache ? "cache" : "")) < 0) {
1815 Error(
"HandleSocketInput",
"forwarding file: %s", fnam.Data());
1818 if (fProtocol > 19) fSocket->Send(kPROOF_SENDFILE);
1826 case kPROOF_LOGFILE:
1829 (*mess) >> start >> end;
1831 Info("HandleSocketInput:kPROOF_LOGFILE",
1832 "Logfile request - byte range: %d - %d", start, end);
1835 SendLogFile(0, start, end);
1839 case kPROOF_PARALLEL:
1843 Bool_t random = kFALSE;
1845 if ((mess->BufferSize() > mess->Length()))
1847 if (fProof) fProof->SetParallel(nodes, random);
1858 if (!all && fProtocol <= 19) {
1862 TProofServLogHandlerGuard hg(fLogFile, fSocket,
"", fRealTimeLog);
1863 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_CACHE","enter");
1864 Int_t hcrc = HandleCache(mess, pslb);
1870 case kPROOF_WORKERLISTS:
1874 wlrc = HandleWorkerLists(mess);
1876 Warning(
"HandleSocketInput:kPROOF_WORKERLISTS",
1877 "Action meaning-less on worker nodes: protocol error?");
1886 case kPROOF_GETSLAVEINFO:
1888 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Enter");
1893 if (fProof->UseDynamicStartup()) {
1897 TList* workerList =
new TList();
1898 EQueryAction retVal = GetWorkers(workerList, pc);
1899 if (retVal != TProofServ::kQueryStop && retVal != TProofServ::kQueryEnqueued) {
1900 Int_t ret = fProof->AddWorkers(workerList);
1902 Error(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
1903 "adding a list of worker nodes returned: %d", ret);
1906 Error(
"HandleSocketInput:kPROOF_GETSLAVEINFO",
1907 "getting list of worker nodes returned: %d", retVal);
1912 TList *info = fProof->GetListOfSlaveInfos();
1913 TMessage answ(kPROOF_GETSLAVEINFO);
1915 fSocket->Send(answ);
1917 if (IsMaster() && fProof->UseDynamicStartup()) fProof->RemoveWorkers(0);
1920 TMessage answ(kPROOF_GETSLAVEINFO);
1921 TList *info =
new TList;
1922 TSlaveInfo *wi =
new TSlaveInfo(GetOrdinal(), TUrl(gSystem->HostName()).GetHostFQDN(), 0,
"", GetDataDir());
1924 gSystem->GetSysInfo(&si);
1927 answ << (TList *)info;
1928 fSocket->Send(answ);
1929 info->SetOwner(kTRUE);
1933 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Done");
1935 TMessage answ(kPROOF_GETSLAVEINFO);
1937 fSocket->Send(answ);
1942 case kPROOF_GETTREEHEADER:
1944 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Enter");
1946 TVirtualProofPlayer *p = TVirtualProofPlayer::Create("slave", 0, fSocket);
1948 p->HandleGetTreeHeader(mess);
1951 Error(
"HandleSocketInput:kPROOF_GETTREEHEADER",
"could not create TProofPlayer instance!");
1954 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Done");
1956 TMessage answ(kPROOF_GETTREEHEADER);
1957 answ << TString(
"Failed") << (TObject *)0;
1958 fSocket->Send(answ);
1963 case kPROOF_GETOUTPUTLIST:
1964 { PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Enter");
1965 TList* outputList = 0;
1967 outputList = fProof->GetOutputList();
1969 outputList =
new TList();
1971 outputList =
new TList();
1972 if (fProof->GetPlayer()) {
1973 TList *olist = fProof->GetPlayer()->GetOutputList();
1976 while ( (o = next()) ) {
1977 outputList->Add(
new TNamed(o->GetName(),
""));
1981 outputList->SetOwner();
1982 TMessage answ(kPROOF_GETOUTPUTLIST);
1984 fSocket->Send(answ);
1986 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Done");
1990 case kPROOF_VALIDATE_DSET:
1993 Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Enter");
1998 if (IsMaster()) fProof->ValidateDSet(dset);
1999 else dset->Validate();
2001 TMessage answ(kPROOF_VALIDATE_DSET);
2003 fSocket->Send(answ);
2006 Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Done");
2014 case kPROOF_DATA_READY:
2016 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Enter");
2017 TMessage answ(kPROOF_DATA_READY);
2019 Long64_t totalbytes = 0, bytesready = 0;
2020 Bool_t dataready = fProof->IsDataReady(totalbytes, bytesready);
2021 answ << dataready << totalbytes << bytesready;
2023 Error(
"HandleSocketInput:kPROOF_DATA_READY",
2024 "This message should not be sent to slaves");
2025 answ << kFALSE << Long64_t(0) << Long64_t(0);
2027 fSocket->Send(answ);
2028 PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Done");
2030 TMessage answ(kPROOF_DATA_READY);
2031 answ << kFALSE << Long64_t(0) << Long64_t(0);
2032 fSocket->Send(answ);
2039 case kPROOF_DATASETS:
2041 if (fProtocol > 16) {
2042 dsrc = HandleDataSets(mess, pslb);
2044 Error(
"HandleSocketInput",
"old client: no or incompatible dataset support");
2050 case kPROOF_SUBMERGER:
2051 { HandleSubmerger(mess);
2055 case kPROOF_LIB_INC_PATH:
2057 lirc = HandleLibIncPath(mess);
2062 if (lirc > 0) SendLogFile();
2065 case kPROOF_REALTIMELOG:
2069 Info("HandleSocketInput:kPROOF_REALTIMELOG",
2070 "setting real-time logging %s", (on ? "ON" : "OFF"));
2074 fProof->SetRealTimeLog(on);
2088 case kPROOF_STARTPROCESS:
2092 if (WaitingQueries() == 0) {
2093 Error(
"HandleSocketInput",
"no queries enqueued");
2099 TList *workerList = (fProof->UseDynamicStartup()) ?
new TList : (TList *)0;
2101 EQueryAction retVal = GetWorkers(workerList, pc, kTRUE);
2103 if (retVal == TProofServ::kQueryOK) {
2105 if (workerList && (ret = fProof->AddWorkers(workerList)) < 0) {
2106 Error(
"HandleSocketInput",
"adding a list of worker nodes returned: %d", ret);
2112 TMessage m(kPROOF_SETIDLE);
2113 Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
2118 if (retVal == TProofServ::kQueryStop) {
2119 Error(
"HandleSocketInput",
"error getting list of worker nodes");
2120 }
else if (retVal != TProofServ::kQueryEnqueued) {
2121 Warning(
"HandleSocketInput",
"query was re-queued!");
2123 Error(
"HandleSocketInput",
"unexpected answer: %d", retVal);
2131 case kPROOF_GOASYNC:
2135 if (!IsIdle() && fPlayer) {
2137 TProofQueryResult *pq = (TProofQueryResult *) fPlayer->GetCurrentQuery();
2138 TMessage m(kPROOF_QUERYSUBMITTED);
2139 m << pq->GetSeqNum() << kFALSE;
2143 SendAsynMessage(
"Processing request to go asynchronous:"
2144 " idle or undefined player - ignoring");
2155 TObject *obj = mess->ReadObject(0x0);
2160 if (IsParallel() && fProof && !fProof->UseDynamicStartup()) {
2165 TMessage rmsg(kPROOF_MESSAGE);
2168 if (obj->InheritsFrom(TObjString::Class())) {
2170 smsg.Form(
"Echo response from %s:%s: %s",
2171 gSystem->HostName(), GetOrdinal(),
2172 ((TObjString *)obj)->String().Data());
2178 TString tmpfn =
"echo-out-";
2179 FILE *tf = gSystem->TempFileName(tmpfn, fDataDir);
2180 if (!tf || (gSystem->RedirectOutput(tmpfn.Data()) == -1)) {
2181 Error(
"HandleSocketInput",
"Can't redirect output");
2184 gSystem->Unlink(tmpfn);
2192 gSystem->RedirectOutput(0x0);
2196 smsg.Form(
"*** Echo response from %s:%s ***\n",
2197 gSystem->HostName(), GetOrdinal());
2198 TMacro *fr =
new TMacro();
2199 fr->ReadFile(tmpfn);
2200 TIter nextLine(fr->GetListOfLines());
2202 while (( line = (TObjString *)nextLine() )) {
2203 smsg.Append( line->String() );
2208 gSystem->Unlink(tmpfn);
2213 GetSocket()->Send(rmsg);
2219 Error(
"HandleSocketInput",
"unknown command %d", what);
2224 fRealTime += (Float_t)timer.RealTime();
2225 fCpuTime += (Float_t)timer.CpuTime();
2227 if (!(slb.IsNull()) || fgLogToSysLog > 1) {
2229 s.Form(
"%s %d %.3f %.3f %s", fgSysLogEntity.Data(),
2230 what, timer.RealTime(), timer.CpuTime(), slb.Data());
2231 gSystem->Syslog(kLogNotice, s.Data());
////////////////////////////////////////////////////////////////////////////////
/// Accept connections from workers on the merging socket and feed the objects
/// they send to `mergerPlayer`.
///
/// \param connections   number of worker connections / merged workers to wait for
/// \param mergerPlayer  player whose AddOutputObject() receives each object read
/// \return the value of the local `result` flag (initialised to kTRUE)
///
/// NOTE(review): `mess` is allocated with `new` and later passed to
/// `s->Recv(mess)`; if Recv replaces the pointer the initial allocation would
/// leak - confirm against the TSocket::Recv signature.
2241 Bool_t TProofServ::AcceptResults(Int_t connections, TVirtualProofPlayer *mergerPlayer)
2243 TMessage *mess =
new TMessage();
2244 Int_t mergedWorkers = 0;
2246 PDB(kSubmerger, 1) Info("AcceptResults", "enter");
2249 Bool_t result = kTRUE;
// Monitor the listening (merging) socket; accepted worker sockets are added later.
2251 fMergingMonitor = new TMonitor();
2252 fMergingMonitor->Add(fMergingSocket);
2254 Int_t numworkers = 0;
// Loop until no socket is active any more or enough workers have been merged.
2255 while (fMergingMonitor->GetActive() > 0 && mergedWorkers < connections) {
2257 TSocket *s = fMergingMonitor->Select();
2259 Info(
"AcceptResults",
"interrupt!");
// Activity on the listening socket: a worker is connecting.
2264 if (s == fMergingSocket) {
2266 TSocket *sw = fMergingSocket->Accept();
2267 if (sw && sw != (TSocket *)(-1)) {
2268 fMergingMonitor->Add(sw);
2271 Info("AcceptResults", "connection from a worker accepted on merger %s ",
// Once the expected number of workers has connected, stop listening for more.
2274 if (++numworkers >= connections)
2275 fMergingMonitor->Remove(fMergingSocket);
2278 Info("AcceptResults", "spurious signal found of merging socket");
// Activity on a worker socket: receive the next message.
2281 if (s->Recv(mess) < 0) {
2282 Error(
"AcceptResults",
"problems receiving message");
2286 Info("AcceptResults", "message received: %d ", (mess ? mess->What() : 0));
2288 Error(
"AcceptResults",
"message received: %p ", mess);
// Consume the message buffer until everything has been read.
2294 while ((mess->BufferSize() > mess->Length())) {
2297 PDB(kSubmerger, 2) Info("AcceptResults", " type %d ", type);
2301 Info("AcceptResults",
2302 "a new worker has been mergerd. Total merged workers: %d",
// Hand the object to the merger; a return value of 1 is reported as "has been merged".
2305 TObject *o = mess->ReadObject(TObject::Class());
2306 if (mergerPlayer->AddOutputObject(o) == 1) {
2308 PDB(kSubmerger, 2) Info("AcceptResults", "removing %p (has been merged)", o);
2311 PDB(kSubmerger, 2) Info("AcceptResults", "%p not merged yet", o);
// Cleanup: deactivate everything, then close and delete each de-activated socket.
2315 fMergingMonitor->DeActivateAll();
2317 TList* sockets = fMergingMonitor->GetListOfDeActives();
2318 Int_t size = sockets->GetSize();
2319 for (Int_t i =0; i< size; ++i){
2320 ((TSocket*)(sockets->At(i)))->Close();
2321 PDB(kSubmerger, 2) Info("AcceptResults", "closing socket");
2322 delete ((TSocket*)(sockets->At(i)));
2325 fMergingMonitor->RemoveAll();
2326 SafeDelete(fMergingMonitor);
2328 PDB(kSubmerger, 2) Info("AcceptResults", "exit: %d", result);
////////////////////////////////////////////////////////////////////////////////
/// Handle out-of-band (OOB) data arriving on fSocket.
///
/// Reads a single OOB byte and dispatches on its value: hard, soft and
/// shutdown interrupts are forwarded through fProof->Interrupt(); any other
/// value is reported as unexpected.
2335 void TProofServ::HandleUrgentData()
2338 Int_t n, nch, wasted = 0;
// Scratch buffer used to drain pending in-band bytes.
2340 const Int_t kBufSize = 1024;
2341 char waste[kBufSize];
// Log handler guard constructed with fRealTimeLog for the duration of this call.
2344 TProofServLogHandlerGuard hg(fLogFile, fSocket,
"", fRealTimeLog);
2347 Info("HandleUrgentData", "handling oob...");
// Retry reading the OOB byte for as long as RecvRaw reports an error.
2350 while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
// Ask how many in-band bytes are pending and read at most kBufSize of them into `waste`.
2362 fSocket->GetOption(kBytesToRead, nch);
2364 gSystem->Sleep(1000);
2368 if (nch > kBufSize) nch = kBufSize;
2369 n = fSocket->RecvRaw(waste, nch);
2371 Error(
"HandleUrgentData",
"error receiving waste");
2376 Error(
"HandleUrgentData",
"error receiving OOB");
2382 Info("HandleUrgentData", "got OOB byte: %d\n", oob_byte);
// Mark the TProof instance (if any) active while the interrupt is processed.
2384 if (fProof) fProof->SetActive();
2388 case TProof::kHardInterrupt:
2389 Info(
"HandleUrgentData",
"*** Hard Interrupt");
2393 fProof->Interrupt(TProof::kHardInterrupt);
// Query the at-mark state of the socket; the OOB byte is also sent back with SendRaw.
2399 fSocket->GetOption(kAtMark, atmark);
2404 n = fSocket->SendRaw(&oob_byte, 1, kOob);
2406 Error(
"HandleUrgentData",
"error sending OOB");
// Same draining sequence as above: pending bytes are read into `waste`.
2411 fSocket->GetOption(kBytesToRead, nch);
2413 gSystem->Sleep(1000);
2417 if (nch > kBufSize) nch = kBufSize;
2418 n = fSocket->RecvRaw(waste, nch);
2420 Error(
"HandleUrgentData",
"error receiving waste (2)");
2429 case TProof::kSoftInterrupt:
2430 Info(
"HandleUrgentData",
"Soft Interrupt");
2434 fProof->Interrupt(TProof::kSoftInterrupt);
2437 Error(
"HandleUrgentData",
"soft interrupt flushed stream");
2447 case TProof::kShutdownInterrupt:
2448 Info(
"HandleUrgentData",
"Shutdown Interrupt");
2452 fProof->Interrupt(TProof::kShutdownInterrupt);
2459 Error(
"HandleUrgentData",
"unexpected OOB byte");
// Processing finished: clear the active flag again.
2463 if (fProof) fProof->SetActive(kFALSE);
////////////////////////////////////////////////////////////////////////////////
/// Handler invoked on SIGPIPE (see TProofServSigPipeHandler::Notify).
///
/// Probes the peer by sending `kPROOF_PING | kMESS_ACK`; when the send fails
/// the failure is logged and a shutdown interrupt is propagated via fProof.
2470 void TProofServ::HandleSigPipe()
2473 TProofServLogHandlerGuard hg(fLogFile, fSocket,
"", fRealTimeLog);
// A negative return from Send() is treated as a failed keep-alive probe.
2478 if (fSocket->Send(kPROOF_PING | kMESS_ACK) < 0) {
2479 Info(
"HandleSigPipe",
"keepAlive probe failed");
// Forward a shutdown interrupt with fProof marked active around the call.
2482 fProof->SetActive();
2483 fProof->Interrupt(TProof::kShutdownInterrupt);
2484 fProof->SetActive(kFALSE);
2488 Info(
"HandleSigPipe",
"keepAlive probe failed");
////////////////////////////////////////////////////////////////////////////////
/// Tell whether this server is in parallel mode.
///
/// On a master with a valid fProof the answer is true when either
/// fProof->IsParallel() or fProof->UseDynamicStartup() is true.
2496 Bool_t TProofServ::IsParallel()
const
2498 if (IsMaster() && fProof)
2499 return fProof->IsParallel() || fProof->UseDynamicStartup() ;
////////////////////////////////////////////////////////////////////////////////
/// Print status information.
///
/// \param option  forwarded unchanged to fProof->Print() on a master with a
///                valid fProof; a one-line worker banner with the host name is
///                printed via Printf() for the other case.
2508 void TProofServ::Print(Option_t *option)
const
2510 if (IsMaster() && fProof)
2511 fProof->Print(option);
2513 Printf(
"This is worker %s", gSystem->HostName());
////////////////////////////////////////////////////////////////////////////////
/// Redirect stdout and stderr to the session log file and open that file for
/// reading through fLogFile.
///
/// \param dir   directory for the log file; fSessionDir is used when null or empty
/// \param mode  mode string passed to freopen() for stdout
2520 void TProofServ::RedirectOutput(
const char *dir,
const char *mode)
2524 TString sdir = (dir && strlen(dir) > 0) ? dir : fSessionDir.Data();
// Log file name is "<sdir>/master-<ordinal>.log" or "<sdir>/worker-<ordinal>.log".
2526 snprintf(logfile, 512,
"%s/master-%s.log", sdir.Data(), fOrdinal.Data());
2528 snprintf(logfile, 512,
"%s/worker-%s.log", sdir.Data(), fOrdinal.Data());
// Re-open stdout on the log file, then make stderr a duplicate of stdout.
2531 if ((freopen(logfile, mode, stdout)) == 0)
2532 SysError(
"RedirectOutput",
"could not freopen stdout (%s)", logfile);
2534 if ((dup2(fileno(stdout), fileno(stderr))) < 0)
2535 SysError(
"RedirectOutput",
"could not redirect stderr");
// Separate read-only handle on the same file, kept in fLogFile.
2537 if ((fLogFile = fopen(logfile,
"r")) == 0)
2538 SysError(
"RedirectOutput",
"could not open logfile '%s'", logfile);
// With protocol < 4 a work dir different from "~/<kPROOF_WorkDir>" triggers a warning.
2541 if (fProtocol < 4 && fWorkDir != TString::Format(
"~/%s", kPROOF_WorkDir)) {
2542 Warning(
"RedirectOutput",
"no way to tell master (or client) where"
2543 " to upload packages");
////////////////////////////////////////////////////////////////////////////////
/// Reset the directory state of the server.
///
/// \param dir  directory specification; NOTE(review): presumably copied into
///             the local `dd` before the checks below - confirm.
///
/// A path that does not start with "proofserv" has its prefix up to the ':'
/// index replaced by "proofserv" before being used for gDirectory->cd().
/// The contents of gDirectory are deleted unless it is gROOT itself, and a
/// master then forwards the current state via fProof->SendCurrentState().
2550 void TProofServ::Reset(
const char *dir)
2555 if (!dd.BeginsWith(
"proofserv")) {
2556 Int_t ic = dd.Index(
":");
2558 dd.Replace(0, ic,
"proofserv");
2560 gDirectory->cd(dd.Data());
// Never wipe gROOT itself.
2567 if (gDirectory != gROOT) {
2568 gDirectory->Delete();
2571 if (IsMaster()) fProof->SendCurrentState();
////////////////////////////////////////////////////////////////////////////////
/// Receive `size` bytes from fSocket and write them to `file`.
///
/// \param file  path of the destination file (created/truncated, mode 0600,
///              then changed to 0644 at the end)
/// \param bin   binary flag; NOTE(review): presumably selects between the raw
///              write and the write of the filtered `cpy` buffer - confirm
/// \param size  number of bytes expected; nothing is done when <= 0
/// \return 0 when `size` <= 0; other return values are not visible here.
2580 Int_t TProofServ::ReceiveFile(
const char *file, Bool_t bin, Long64_t size)
2582 if (size <= 0)
return 0;
2585 Int_t fd = open(file, O_CREAT | O_TRUNC | O_WRONLY, 0600);
2587 SysError(
"ReceiveFile",
"error opening file %s", file);
// Receive buffer plus a second buffer of the same size used for the copied bytes.
2591 const Int_t kMAXBUF = 16384;
2592 char buf[kMAXBUF], cpy[kMAXBUF];
2595 Long64_t filesize = 0;
// Keep receiving until the announced number of bytes has been handled.
2597 while (filesize < size) {
2598 left = Int_t(size - filesize);
2601 r = fSocket->RecvRaw(&buf, left);
2610 Int_t k = 0, i = 0, j = 0;
2617 cpy[j++] = buf[i++];
2621 w = write(fd, q, r);
2623 w = write(fd, p, r);
2627 SysError(
"ReceiveFile",
"error writing to file %s", file);
2635 Error(
"ReceiveFile",
"error during receiving file %s", file);
// Relax the permissions of the received file to 0644; failure is only a warning.
2643 if (chmod(file, 0644) != 0)
2644 Warning(
"ReceiveFile",
"error setting mode 0644 on file %s", file);
////////////////////////////////////////////////////////////////////////////////
/// Create the server and, when CreateServer() returns 0, enter the
/// application event loop via TApplication::Run().
///
/// \param retrn  forwarded unchanged to TApplication::Run()
2652 void TProofServ::Run(Bool_t retrn)
2655 if (CreateServer() == 0) {
2658 TApplication::Run(retrn);
////////////////////////////////////////////////////////////////////////////////
/// Send (part of) the log file over fSocket, followed by a kPROOF_LOGDONE
/// message carrying `status`.
///
/// \param status  status value streamed into the kPROOF_LOGDONE message
/// \param start   start offset used to position fLogFileDes for an explicit range
/// \param end     end offset of the explicit range; see the range check below
2667 void TProofServ::SendLogFile(Int_t status, Int_t start, Int_t end)
2675 if (!fSendLogToMaster) {
2679 LogToMaster(kFALSE);
2683 off_t ltot=0, lnow=0;
2685 Bool_t adhoc = kFALSE;
// ltot: current end offset of stdout; lnow: current read position of the log descriptor.
2687 if (fLogFileDes > -1) {
2688 ltot = lseek(fileno(stdout), (off_t) 0, SEEK_END);
2689 lnow = lseek(fLogFileDes, (off_t) 0, SEEK_CUR);
2691 if (ltot >= 0 && lnow >= 0) {
// Explicit range: position at `start`; an `end` that is <= start or beyond ltot gets special handling.
2693 lseek(fLogFileDes, (off_t) start, SEEK_SET);
2694 if (end <= start || end > ltot)
2696 left = (Int_t)(end - start);
// Default range: everything between the current read position and the end.
2701 left = (Int_t)(ltot - lnow);
// Announce the number of bytes that will follow.
2707 if (fSocket->Send(left, kPROOF_LOGFILE) < 0) {
2708 SysError(
"SendLogFile",
"error sending kPROOF_LOGFILE");
// Transfer in chunks of at most kMAXBUF bytes.
2712 const Int_t kMAXBUF = 32768;
2714 Int_t wanted = (left > kMAXBUF) ? kMAXBUF : left;
// Retry the read when it was interrupted by a signal (EINTR).
2717 while ((len = read(fLogFileDes, buf, wanted)) < 0 &&
2718 TSystem::GetErrno() == EINTR)
2719 TSystem::ResetErrno();
2722 SysError(
"SendLogFile",
"error reading log file");
2726 if (end == ltot && len == wanted)
2729 if (fSocket->SendRaw(buf, len) < 0) {
2730 SysError(
"SendLogFile",
"error sending log file");
2736 wanted = (left > kMAXBUF) ? kMAXBUF : left;
2738 }
while (len > 0 && left > 0);
// For an ad-hoc request restore the read position saved in lnow.
2742 if (adhoc && lnow >=0 )
2743 lseek(fLogFileDes, lnow, SEEK_SET);
// Final message: status plus either fProof->GetParallel() (0 without fProof) or the constant 1.
2745 TMessage mess(kPROOF_LOGDONE);
2747 mess << status << (fProof ? fProof->GetParallel() : 0);
2749 mess << status << (Int_t) 1;
2751 if (fSocket->Send(mess) < 0) {
2752 SysError(
"SendLogFile",
"error sending kPROOF_LOGDONE");
2756 PDB(kGlobal, 1) Info("SendLogFile", "kPROOF_LOGDONE sent");
////////////////////////////////////////////////////////////////////////////////
/// Send a kPROOF_GETSTATS message with bytes read, real time, CPU time, the
/// current working directory, the work dir (protocol >= 4 only) and the image.
2762 void TProofServ::SendStatistics()
// Defaults come from TFile and the local counters.
2764 Long64_t bytesread = TFile::GetFileBytesRead();
2765 Float_t cputime = fCpuTime, realtime = fRealTime;
// Bytes read and CPU time can be replaced by the values reported by fProof.
2767 bytesread = fProof->GetBytesRead();
2768 cputime = fProof->GetCpuTime();
2771 TMessage mess(kPROOF_GETSTATS);
2772 TString workdir = gSystem->WorkingDirectory();
2773 mess << bytesread << realtime << cputime << workdir;
2774 if (fProtocol >= 4) mess << TString(gProofServ->GetWorkDir());
2775 mess << TString(gProofServ->GetImage());
2776 fSocket->Send(mess);
////////////////////////////////////////////////////////////////////////////////
/// Send the number of parallel nodes in a kPROOF_GETPARALLEL message.
///
/// \param async  flag streamed into the message after the node count
2782 void TProofServ::SendParallel(Bool_t async)
2784 Int_t nparallel = 0;
// Refresh the information with AskParallel(), then read it with GetParallel().
2787 Info("SendParallel", "Will invoke AskParallel()");
2788 fProof->AskParallel();
2790 Info("SendParallel", "Will invoke GetParallel()");
2791 nparallel = fProof->GetParallel();
2796 TMessage mess(kPROOF_GETPARALLEL);
2797 mess << nparallel << async;
2798 fSocket->Send(mess);
////////////////////////////////////////////////////////////////////////////////
/// Perform the initial handshake with the peer and set up the session.
///
/// Sends a startup banner, exchanges protocol numbers, receives user, ordinal
/// and configuration/work-dir information, derives the working directory,
/// session tag and session directory, runs SetupCommon() and finally sets
/// socket options.
///
/// \return status code; the concrete return values are not visible here.
2805 Int_t TProofServ::Setup()
// Startup banner: one text for the master-style welcome, one for the slave server.
2810 snprintf(str, 512,
"**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
2812 snprintf(str, 512,
"**** PROOF slave server @ %s started ****", gSystem->HostName());
// Send() is expected to report the string length plus one.
2815 if (fSocket->Send(str) != 1+
static_cast<Int_t
>(strlen(str))) {
2816 Error(
"Setup",
"failed to send proof server startup message");
// Protocol exchange: receive the remote protocol, then send the local one.
2823 if (fSocket->Recv(fProtocol, what) != 2*
sizeof(Int_t)) {
2824 Error(
"Setup",
"failed to receive remote proof protocol");
2827 if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != 2*
sizeof(Int_t)) {
2828 Error(
"Setup",
"failed to send local proof protocol");
// Peers with protocol < 5 go through the old authentication setup.
2833 if (fProtocol < 5) {
2835 if (OldAuthSetup(wconf) != 0) {
2836 Error(
"Setup",
"OldAuthSetup: failed to setup authentication");
2841 fWorkDir.Form(
"~/%s", kPROOF_WorkDir);
2843 if (fProtocol < 4) {
2844 fWorkDir.Form(
"~/%s", kPROOF_WorkDir);
2847 if (fWorkDir.IsNull()) fWorkDir.Form(
"~/%s", kPROOF_WorkDir);
// Newer peers send user, ordinal and either the config file or the work dir in a message.
2854 if ((fSocket->Recv(mess) <= 0) || !mess) {
2855 Error(
"Setup",
"failed to receive ordinal and config info");
2859 (*mess) >> fUser >> fOrdinal >> fConfFile;
2860 fWorkDir = gEnv->GetValue(
"ProofServ.Sandbox", TString::Format(
"~/%s", kPROOF_WorkDir));
2862 (*mess) >> fUser >> fOrdinal >> fWorkDir;
2863 if (fWorkDir.IsNull())
2864 fWorkDir = gEnv->GetValue(
"ProofServ.Sandbox", TString::Format(
"~/%s", kPROOF_WorkDir));
// The ordinal is appended to the log prefix unless it is "-1".
2867 if (fOrdinal !=
"-1")
2868 fPrefix += fOrdinal;
2869 TProofServLogHandler::SetDefaultPrefix(fPrefix);
// Drop everything up to and including the first ':' from the config file spec.
2876 TString conffile = fConfFile;
2877 conffile.Remove(0, 1 + conffile.Index(
":"));
// A non-empty master work dir from a valid resources file overrides fWorkDir.
2880 TProofResourcesStatic resources(fConfDir, conffile);
2881 if (resources.IsValid()) {
2882 if (resources.GetMaster()) {
2883 TString tmpWorkDir = resources.GetMaster()->GetWorkDir();
2884 if (tmpWorkDir !=
"")
2885 fWorkDir = tmpWorkDir;
2888 Info(
"Setup",
"invalid config file %s (missing or unreadable",
2889 resources.GetFileName().Data());
2895 gSystem->Setenv(
"HOME", gSystem->HomeDirectory());
// Absolute work dirs outside the home directory get the user name appended.
2898 if (fWorkDir.BeginsWith(
"/") &&
2899 !fWorkDir.BeginsWith(gSystem->HomeDirectory())) {
2900 if (!fWorkDir.EndsWith(
"/"))
2902 UserGroup_t *u = gSystem->GetUserInfo();
2904 fWorkDir += u->fUser;
2910 char *workdir = gSystem->ExpandPathName(fWorkDir.Data());
2913 if (gProofDebugLevel > 0)
2914 Info(
"Setup",
"working directory set to %s", fWorkDir.Data());
// Short host name: everything from the first '.' on is removed.
2917 TString host = gSystem->HostName();
2918 if (host.Index(
".") != kNPOS)
2919 host.Remove(host.Index(
"."));
// Session tag format: <ordinal>-<host>-<seconds>-<pid>.
2922 fSessionTag.Form(
"%s-%s-%ld-%d", fOrdinal.Data(), host.Data(),
2923 (Long_t)TTimeStamp().GetSec(),gSystem->GetPid());
2924 fTopSessionTag = fSessionTag;
// Session dir: <workdir>/master-<tag> or <workdir>/slave-<tag>.
2927 fSessionDir = fWorkDir;
2929 fSessionDir +=
"/master-";
2931 fSessionDir +=
"/slave-";
2932 fSessionDir += fSessionTag;
2935 if (SetupCommon() != 0) {
2936 Error(
"Setup",
"common setup failed");
// Socket options: process group set to this pid, no-delay and keep-alive enabled.
2941 fSocket->SetOption(kProcessGroup, gSystem->GetPid());
2944 fSocket->SetOption(kNoDelay, 1);
2947 fSocket->SetOption(kKeepAlive, 1);
////////////////////////////////////////////////////////////////////////////////
/// Common part of the session setup.
///
/// In order: sets the umask and PATH, enters the work directory, resolves and
/// creates the cache, package, data, session and query directories, creates
/// the package manager and query manager, initialises the dataset manager(s),
/// parses user quotas, notifies version/architecture information, exports the
/// variables listed in PROOF_ALLVARS and optionally logs to syslog.
///
/// \return status code; the concrete return values are not visible here.
2957 Int_t TProofServ::SetupCommon()
2960 gSystem->Umask(022);
// PATH handling: start from the current PATH and the ROOT binary directory.
2964 TString path(gSystem->Getenv(
"PATH"));
2965 TString bindir(TROOT::GetBinDir());
// "ProofServ.BinPaths" may carry the keywords <compiler> and <sysbin>,
// each also checked in a '^'-prefixed form.
2968 TString paths = gEnv->GetValue(
"ProofServ.BinPaths",
"");
2969 if (paths.Length() > 0) {
2971 if (paths.Contains(
"^<compiler>"))
2973 else if (paths.Contains(
"<compiler>"))
// Compiler directory: strip everything up to "is " from COMPILER, then take the dirname.
2977 TString compiler = COMPILER;
2978 if (compiler.Index(
"is ") != kNPOS)
2979 compiler.Remove(0, compiler.Index(
"is ") + 3);
2980 compiler = gSystem->DirName(compiler);
2982 if (!bindir.IsNull()) bindir +=
":";
2984 }
else if (icomp == -1) {
2985 if (!path.IsNull()) path +=
":";
2991 if (paths.Contains(
"^<sysbin>"))
2993 else if (paths.Contains(
"<sysbin>"))
2997 if (!bindir.IsNull()) bindir +=
":";
2998 bindir +=
"/bin:/usr/bin:/usr/local/bin";
2999 }
else if (isysb == -1) {
3000 if (!path.IsNull()) path +=
":";
3001 path +=
"/bin:/usr/bin:/usr/local/bin";
// bindir is prepended to path and the result exported as PATH.
3006 if (!bindir.IsNull()) bindir +=
":";
3007 path.Insert(0, bindir);
3008 gSystem->Setenv(
"PATH", path);
// Work directory: create it when not accessible, then change into it.
3011 if (gSystem->AccessPathName(fWorkDir)) {
3012 gSystem->mkdir(fWorkDir, kTRUE);
3013 if (!gSystem->ChangeDirectory(fWorkDir)) {
3014 Error(
"SetupCommon",
"can not change to PROOF directory %s",
// Path exists but chdir failed: unlink it, recreate it and retry.
3019 if (!gSystem->ChangeDirectory(fWorkDir)) {
3020 gSystem->Unlink(fWorkDir);
3021 gSystem->mkdir(fWorkDir, kTRUE);
3022 if (!gSystem->ChangeDirectory(fWorkDir)) {
3023 Error(
"SetupCommon",
"can not change to PROOF directory %s",
3031 fGroup = gEnv->GetValue(
"ProofServ.ProofGroup",
"default");
// Cache directory: "ProofServ.CacheDir", default <workdir>/<kPROOF_CacheDir>.
3034 fCacheDir = gEnv->GetValue(
"ProofServ.CacheDir",
3035 TString::Format(
"%s/%s", fWorkDir.Data(), kPROOF_CacheDir));
3036 ResolveKeywords(fCacheDir);
3037 if (gSystem->AccessPathName(fCacheDir))
3038 gSystem->mkdir(fCacheDir, kTRUE);
3039 if (gProofDebugLevel > 0)
3040 Info(
"SetupCommon",
"cache directory set to %s", fCacheDir.Data());
3042 new TProofLockPath(TString::Format(
"%s/%s%s",
3043 gSystem->TempDirectory(), kPROOF_CacheLockFile,
3044 TString(fCacheDir).ReplaceAll(
"/",
"%").Data()));
3046 TProof::AssertMacroPath(TString::Format(
"%s/.", fCacheDir.Data()));
// Package directory: "ProofServ.PackageDir", default <workdir>/<kPROOF_PackDir>.
3049 TString packdir = gEnv->GetValue(
"ProofServ.PackageDir",
3050 TString::Format(
"%s/%s", fWorkDir.Data(), kPROOF_PackDir));
3051 ResolveKeywords(packdir);
3052 if (gSystem->AccessPathName(packdir))
3053 gSystem->mkdir(packdir, kTRUE);
3054 fPackMgr =
new TPackMgr(packdir);
3055 fPackMgr->SetLogger(SendAsynMsg);
// Package manager prefix is "Mst-<ordinal>" on a master and "Wrk-<ordinal>" otherwise.
3058 const char *k = (IsMaster()) ?
"Mst" :
"Wrk";
3059 noth.Form(
"%s-%s", k, fOrdinal.Data());
3060 fPackMgr->SetPrefix(noth.Data());
3061 if (gProofDebugLevel > 0)
3062 Info(
"SetupCommon",
"package directory set to %s", packdir.Data());
// Data directory: "ProofServ.DataDir"; the text after the last blank is kept
// separately in fDataDirOpts.
3065 fDataDir = gEnv->GetValue(
"ProofServ.DataDir",
"");
3066 Ssiz_t isep = kNPOS;
3067 if (fDataDir.IsNull()) {
3069 fDataDir.Form(
"%s/%s/<ord>/<stag>", fWorkDir.Data(), kPROOF_DataDir);
3070 }
else if ((isep = fDataDir.Last(
' ')) != kNPOS) {
3071 fDataDirOpts = fDataDir(isep + 1, fDataDir.Length());
3072 fDataDir.Remove(isep);
3074 ResolveKeywords(fDataDir);
3075 if (gSystem->AccessPathName(fDataDir))
3076 if (gSystem->mkdir(fDataDir, kTRUE) != 0) {
3077 Warning(
"SetupCommon",
"problems creating path '%s' (errno: %d)",
3078 fDataDir.Data(), TSystem::GetErrno());
3080 if (gProofDebugLevel > 0)
3081 Info(
"SetupCommon",
"data directory set to %s", fDataDir.Data());
// "ProofServ.DataDirOpts": 'M'/'W' enable the mode change on master/worker,
// 'g' requests mode 0775, 'a' or 'o' request mode 0777.
3085 TString dataDirOpts = gEnv->GetValue(
"ProofServ.DataDirOpts",
"");
3086 if (!dataDirOpts.IsNull()) {
3088 Bool_t doit = kTRUE;
3089 if ((IsMaster() && !dataDirOpts.Contains(
"M")) ||
3090 (!IsMaster() && !dataDirOpts.Contains(
"W"))) doit = kFALSE;
3094 if (dataDirOpts.Contains(
"g")) m = 0775;
3095 if (dataDirOpts.Contains(
"a") || dataDirOpts.Contains(
"o")) m = 0777;
3096 if (gProofDebugLevel > 0)
3097 Info(
"SetupCommon",
"requested mode for data directories is '%o'", m);
// Walk the data dir component by component; chmod only paths owned by the
// current uid and gid.
3102 if (fDataDir.BeginsWith(
"/")) p =
"/";
3103 while (fDataDir.Tokenize(subp, from,
"/")) {
3104 if (subp.IsNull())
continue;
3106 if (gSystem->GetPathInfo(p, st) == 0) {
3107 if (st.fUid == (Int_t) gSystem->GetUid() && st.fGid == (Int_t) gSystem->GetGid()) {
3108 if (gSystem->Chmod(p.Data(), m) != 0) {
3109 Warning(
"SetupCommon",
"problems setting mode '%o' on path '%s' (errno: %d)",
3110 m, p.Data(), TSystem::GetErrno());
3116 Warning(
"SetupCommon",
"problems stat-ing path '%s' (errno: %d; datadir: %s)",
3117 p.Data(), TSystem::GetErrno(), fDataDir.Data());
// Global package directories.
3125 TString globpack = gEnv->GetValue(
"Proof.GlobalPackageDirs",
"");
3127 ResolveKeywords(globpack);
3128 Int_t nglb = TPackMgr::RegisterGlobalPath(globpack);
3129 Info(
"SetupCommon",
" %d global package directories registered", nglb);
// Session directory: create it and change into it when it is not already the cwd.
3133 if (fSessionDir != gSystem->WorkingDirectory()) {
3134 ResolveKeywords(fSessionDir);
3135 if (gSystem->AccessPathName(fSessionDir))
3136 gSystem->mkdir(fSessionDir, kTRUE);
3137 if (!gSystem->ChangeDirectory(fSessionDir)) {
3138 Error(
"SetupCommon",
"can not change to working directory '%s'",
3139 fSessionDir.Data());
3143 gSystem->Setenv(
"PROOF_SANDBOX", fSessionDir);
3144 if (gProofDebugLevel > 0)
3145 Info(
"SetupCommon",
"session dir is '%s'", fSessionDir.Data());
// Query directory: <workdir>/<kPROOF_QueryDir>/session-<top session tag>.
3152 fQueryDir = fWorkDir;
3153 fQueryDir += TString(
"/") + kPROOF_QueryDir;
3154 ResolveKeywords(fQueryDir);
3155 if (gSystem->AccessPathName(fQueryDir))
3156 gSystem->mkdir(fQueryDir, kTRUE);
3157 fQueryDir += TString(
"/session-") + fTopSessionTag;
3158 if (gSystem->AccessPathName(fQueryDir))
3159 gSystem->mkdir(fQueryDir, kTRUE);
3160 if (gProofDebugLevel > 0)
3161 Info(
"SetupCommon",
"queries dir is %s", fQueryDir.Data());
// Lock file for the query dir lives in the temp directory; '/' is mapped to '%'.
3164 fQueryLock =
new TProofLockPath(TString::Format(
"%s/%s%s-%s",
3165 gSystem->TempDirectory(),
3166 kPROOF_QueryLockFile, fSessionTag.Data(),
3167 TString(fQueryDir).ReplaceAll(
"/",
"%").Data()));
3170 fQMgr =
new TQueryResultManager(fQueryDir, fSessionTag, fSessionDir,
3175 fImage = gEnv->GetValue(
"ProofServ.Image",
"");
// Session tag, group and user are streamed into a kPROOF_SESSIONTAG message.
3180 TMessage m(kPROOF_SESSIONTAG);
3181 m << fTopSessionTag << fGroup << fUser;
3184 fGroupPriority = GetPriority();
// Dataset manager(s): "Proof.DataSetManager" is a comma-separated list of
// plug-in specifications; only one valid manager is supported.
3186 TPluginHandler *h = 0;
3187 TString dsms = gEnv->GetValue(
"Proof.DataSetManager",
"");
3188 if (!dsms.IsNull()) {
3191 while (dsms.Tokenize(dsm, from,
",")) {
3192 if (fDataSetManager && !fDataSetManager->TestBit(TObject::kInvalidObject)) {
3193 Warning(
"SetupCommon",
"a valid dataset manager already initialized");
3194 Warning(
"SetupCommon",
"support for multiple managers not yet available");
3198 if (gROOT->GetPluginManager()) {
3200 h = gROOT->GetPluginManager()->FindHandler(
"TDataSetManager", dsm);
3201 if (h && h->LoadPlugin() != -1) {
3204 reinterpret_cast<TDataSetManager*
>(h->ExecPlugin(3, fGroup.Data(),
3205 fUser.Data(), dsm.Data()));
// A manager flagged kInvalidObject is reported and deleted.
3210 if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
3211 Warning(
"SetupCommon",
"dataset manager plug-in initialization failed");
3212 SendAsynMessage(
"TXProofServ::SetupCommon: dataset manager plug-in initialization failed");
3213 SafeDelete(fDataSetManager);
// Default dataset manager: the "file" plug-in, using "ProofServ.DataSetDir"
// or <workdir>/<kPROOF_DataSetDir>.
3217 TString opts(
"Av:");
3218 TString dsetdir = gEnv->GetValue(
"ProofServ.DataSetDir",
"");
3219 if (dsetdir.IsNull()) {
3221 dsetdir.Form(
"%s/%s", fWorkDir.Data(), kPROOF_DataSetDir);
// NOTE(review): the member fDataSetDir is tested and created here although the
// local dsetdir was just formed above - confirm which one is intended.
3222 if (gSystem->AccessPathName(fDataSetDir))
3223 gSystem->MakeDirectory(fDataSetDir);
3228 h = gROOT->GetPluginManager()->FindHandler(
"TDataSetManager",
"file");
3229 if (h && h->LoadPlugin() == -1) h = 0;
3233 TString oo = TString::Format(
"dir:%s opt:%s", dsetdir.Data(), opts.Data());
3234 fDataSetManager =
reinterpret_cast<TDataSetManager*
>(h->ExecPlugin(3,
3235 fGroup.Data(), fUser.Data(), oo.Data()));
3237 if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
3238 Warning(
"SetupCommon",
"default dataset manager plug-in initialization failed");
3239 SafeDelete(fDataSetManager);
// Repository for dataset staging requests: "Proof.DataSetStagingRequests"
// must match "[dir:]<path>".
3243 TString dsReqCfg = gEnv->GetValue(
"Proof.DataSetStagingRequests",
"");
3244 if (!dsReqCfg.IsNull()) {
3245 TPMERegexp reReqDir(
"(^| )(dir:)?([^ ]+)( |$)");
3247 if (reReqDir.Match(dsReqCfg) == 5) {
3249 dsDirFmt.Form(
"dir:%s perms:open", reReqDir[3].Data());
3250 fDataSetStgRepo =
new TDataSetManagerFile(
"_stage_",
"_stage_",
3252 if (fDataSetStgRepo &&
3253 fDataSetStgRepo->TestBit(TObject::kInvalidObject)) {
3254 Warning(
"SetupCommon",
3255 "failed init of dataset staging requests repository");
3256 SafeDelete(fDataSetStgRepo);
3259 Warning(
"SetupCommon",
3260 "specify, with [dir:]<path>, a valid path for staging requests");
3262 }
else if (gProofDebugLevel > 0) {
3263 Warning(
"SetupCommon",
"no repository for staging requests available");
// Quotas lookup order: per user, then per group, then the global setting.
3268 TString quotas = gEnv->GetValue(TString::Format(
"ProofServ.UserQuotas.%s", fUser.Data()),
"");
3269 if (quotas.IsNull())
3270 quotas = gEnv->GetValue(TString::Format(
"ProofServ.UserQuotasByGroup.%s", fGroup.Data()),
"");
3271 if (quotas.IsNull())
3272 quotas = gEnv->GetValue(
"ProofServ.UserQuotas",
"");
3273 if (!quotas.IsNull()) {
// Blank-separated tokens: maxquerykept=<n>, hwmsz=<size>, maxsz=<size>.
3277 while (quotas.Tokenize(tok, from,
" ")) {
3279 if (tok.BeginsWith(
"maxquerykept=")) {
3280 tok.ReplaceAll(
"maxquerykept=",
"");
3282 fMaxQueries = tok.Atoi();
3285 "parsing 'maxquerykept' :ignoring token %s : not a digit", tok.Data());
3288 const char *ksz[2] = {
"hwmsz=",
"maxsz="};
3289 for (Int_t j = 0; j < 2; j++) {
3290 if (tok.BeginsWith(ksz[j])) {
3291 tok.ReplaceAll(ksz[j],
"");
// A non-numeric value may end in one of the unit suffixes k, m or g.
3293 if (!tok.IsDigit()) {
3296 const char *s[3] = {
"k",
"m",
"g"};
3297 Int_t i = 0, ki = 1024;
3299 if (tok.EndsWith(s[i++]))
3304 tok.Remove(tok.Length()-1);
3306 if (tok.IsDigit()) {
3308 fHWMBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
3310 fMaxBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
3312 TString ssz(ksz[j], strlen(ksz[j])-1);
3313 Info(
"SetupCommon",
"parsing '%s' : ignoring token %s", ssz.Data(), tok.Data());
// On a master with a query manager, enforce the limit on kept queries.
3321 if (IsMaster() && fQMgr)
3322 if (fQMgr->ApplyMaxQueries(fMaxQueries) != 0)
3323 Warning(
"SetupCommon",
"problems applying fMaxQueries");
// Protocol > 12: build "<version>:<git commit>[:<tag>]|<arch>-<compiler version>"
// for a kPROOF_VERSARCHCOMP message.
3326 if (fProtocol > 12) {
3327 TString vac = gROOT->GetVersion();
3328 vac += TString::Format(
":%s", gROOT->GetGitCommit());
3329 TString rtag = gEnv->GetValue(
"ProofServ.RootVersionTag",
"");
3330 if (rtag.Length() > 0)
3331 vac += TString::Format(
":%s", rtag.Data());
3332 vac += TString::Format(
"|%s-%s",gSystem->GetBuildArch(), gSystem->GetBuildCompilerVersion());
3333 TMessage m(kPROOF_VERSARCHCOMP);
// Register every variable named in the comma-separated PROOF_ALLVARS list.
3339 TString all_vars(gSystem->Getenv(
"PROOF_ALLVARS"));
3342 while (all_vars.Tokenize(name, from,
",")) {
3343 if (!name.IsNull()) {
3344 TString value = gSystem->Getenv(name);
3345 TProof::AddEnvVar(name, value);
// Syslog: the entity is "<user>:<group>", with "undef"/"default" standing in
// for a missing user/group.
3349 if (fgLogToSysLog > 0) {
3351 if (!(fUser.IsNull()) && !(fGroup.IsNull())) {
3352 fgSysLogEntity.Form(
"%s:%s", fUser.Data(), fGroup.Data());
3353 }
else if (!(fUser.IsNull()) && fGroup.IsNull()) {
3354 fgSysLogEntity.Form(
"%s:default", fUser.Data());
3355 }
else if (fUser.IsNull() && !(fGroup.IsNull())) {
3356 fgSysLogEntity.Form(
"undef:%s", fGroup.Data());
3360 s.Form(
"%s 0 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
3361 gSystem->Syslog(kLogNotice, s.Data());
3364 if (gProofDebugLevel > 0)
3365 Info(
"SetupCommon",
"successfully completed");
////////////////////////////////////////////////////////////////////////////////
/// Terminate the PROOF server.
///
/// Logs to syslog, reports the memory footprint, removes session/query/data
/// directories, removes the input handlers and leaves the event loop.
///
/// \param status  exit status; written to the syslog record
3374 void TProofServ::Terminate(Int_t status)
3376 if (fgLogToSysLog > 0) {
3378 s.Form(
"%s -1 %.3f %.3f %d", fgSysLogEntity.Data(), fRealTime, fCpuTime, status);
3379 gSystem->Syslog(kLogNotice, s.Data());
// Report current and maximum virtual/resident memory when process info is available.
3384 if (!gSystem->GetProcInfo(&pi)){
3385 Info(
"Terminate",
"process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
3386 pi.fMemVirtual, fgVirtMemMax, pi.fMemResident, fgResMemMax);
// Session dir removal: move to "/", create a ".delete" entry, then run kRM on the dir.
3392 gSystem->ChangeDirectory(
"/");
3394 gSystem->MakeDirectory(fSessionDir+
"/.delete");
3395 gSystem->Exec(TString::Format(
"%s %s", kRM, fSessionDir.Data()));
// Query dir removal, only when the query manager holds no queries.
3400 if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
3402 gSystem->ChangeDirectory(
"/");
3404 gSystem->MakeDirectory(fQueryDir+
"/.delete");
3405 gSystem->Exec(TString::Format(
"%s %s", kRM, fQueryDir.Data()));
3408 gSystem->Unlink(fQueryLock->GetName());
3413 fQueryLock->Unlock();
// Data dir removal, only when it is set and writable.
3417 if (!fDataDir.IsNull() && !gSystem->AccessPathName(fDataDir, kWritePermission)) {
3418 if (UnlinkDataDir(fDataDir))
3419 Info(
"Terminate",
"data directory '%s' has been removed", fDataDir.Data());
// Remove the TProofServInputHandler instances among the registered file handlers.
3424 TIter next(gSystem->GetListOfFileHandlers());
3426 while ((fh = next())) {
3427 TProofServInputHandler *ih =
dynamic_cast<TProofServInputHandler *
>(fh);
3429 gSystem->RemoveFileHandler(ih);
3433 gSystem->ExitLoop();
////////////////////////////////////////////////////////////////////////////////
/// Recursively scan `path` and try to remove it.
///
/// \param path  directory to process
/// \return kFALSE for a null or empty path; NOTE(review): otherwise presumably
///         the final value of `dorm` - confirm.
3442 Bool_t TProofServ::UnlinkDataDir(
const char *path)
3444 if (!path || strlen(path) <= 0)
return kFALSE;
// `dorm` stays kTRUE for as long as removal is still considered possible.
3446 Bool_t dorm = kTRUE;
3447 void *dirp = gSystem->OpenDirectory(path);
3450 const char *ent = 0;
// Scan the entries, skipping "." and ".."; stop as soon as `dorm` turns false.
3451 while (dorm && (ent = gSystem->GetDirEntry(dirp))) {
3452 if (!strcmp(ent,
".") || !strcmp(ent,
".."))
continue;
3453 fpath.Form(
"%s/%s", path, ent);
// Sub-directories are processed recursively.
3455 if (gSystem->GetPathInfo(fpath, st) == 0 && R_ISDIR(st.fMode)) {
3456 dorm = UnlinkDataDir(fpath);
3462 gSystem->FreeDirectory(dirp);
// The directory itself is unlinked only when `dorm` is still true.
3469 if (dorm && gSystem->Unlink(path) != 0)
3470 Warning(
"UnlinkDataDir",
"data directory '%s' is empty but could not be removed", path);
////////////////////////////////////////////////////////////////////////////////
/// Static check for a running server.
///
/// \return kTRUE when the global gProofServ pointer is non-null, kFALSE otherwise.
3478 Bool_t TProofServ::IsActive()
3480 return gProofServ ? kTRUE : kFALSE;
////////////////////////////////////////////////////////////////////////////////
/// Accessor returning a TProofServ pointer.
/// NOTE(review): presumably the global gProofServ instance - confirm.
3488 TProofServ *TProofServ::This()
////////////////////////////////////////////////////////////////////////////////
/// Run the old-style authentication setup through a hook loaded at run time
/// from libRootAuth.
///
/// \param conf  configuration string passed on to the hook
/// \return the value returned by the hook; the values returned on the error
///         paths are not visible here.
3497 Int_t TProofServ::OldAuthSetup(TString &conf)
3499 OldProofServAuthSetup_t oldAuthSetupHook = 0;
// Resolve the hook when it is not available yet.
3501 if (!oldAuthSetupHook) {
3503 TString authlib =
"libRootAuth";
// Locate the library on the dynamic path, then load it.
3506 if ((p = gSystem->DynamicPathName(authlib, kTRUE))) {
3508 if (gSystem->Load(authlib) == -1) {
3509 Error(
"OldAuthSetup",
"can't load %s",authlib.Data());
3513 Error(
"OldAuthSetup",
"can't locate %s",authlib.Data());
// Look up the entry point by symbol name.
3518 Func_t f = gSystem->DynFindSymbol(authlib,
"OldProofServAuthSetup");
3520 oldAuthSetupHook = (OldProofServAuthSetup_t)(f);
3522 Error(
"OldAuthSetup",
"can't find OldProofServAuthSetup");
// Delegate to the hook with the socket, role, protocol, user and ordinal.
3528 return (*oldAuthSetupHook)(fSocket, IsMaster(), fProtocol,
3529 fUser, fOrdinal, conf);
////////////////////////////////////////////////////////////////////////////////
/// Create a TProofQueryResult describing a new query.
///
/// The arguments visible here (nent, inlist, fst, dset, selec) are forwarded
/// to the TProofQueryResult constructor together with the sequence number.
3535 TProofQueryResult *TProofServ::MakeQueryResult(Long64_t nent,
3537 TList *inlist, Long64_t fst,
3538 TDSet *dset,
const char *selec,
// Take the next sequence number from the query manager.
3544 fQMgr->IncrementSeqNum();
3545 seqnum = fQMgr->SeqNum();
// Remember whether the dataset had kWriteV3 set; the bit is cleared while the
// query result is built.
3549 Bool_t olds = (dset && dset->TestBit(TDSet::kWriteV3)) ? kTRUE : kFALSE;
3551 dset->SetWriteV3(kFALSE);
3554 TProofQueryResult *pqr =
new TProofQueryResult(seqnum, opt, inlist, nent,
3555 fst, dset, selec, elist);
// The title is the base name of the query directory.
3557 pqr->SetTitle(gSystem->BaseName(fQueryDir));
// Set the kWriteV3 bit on the dataset again.
3561 dset->SetWriteV3(kTRUE);
/// Mark the query pq as running.
/// Records the current end offset of stdout as the log start, the list of
/// enabled packages, and the initial process information.
/// \param pq query result object to be updated.
3569 void TProofServ::SetQueryRunning(TProofQueryResult *pq)
// Current end of the stdout file: log position where this query starts.
3573 Int_t startlog = lseek(fileno(stdout), (off_t) 0, SEEK_END);
3577 Info(
"SetQueryRunning",
"starting query: %d", pq->GetSeqNum());
// Collect the enabled packages from the package manager.
3580 TString parlist =
"";
3581 fPackMgr->GetEnabledPackages(parlist);
// Variant using the parallelism, CPU time and bytes read known to fProof.
3585 pq->SetRunning(startlog, parlist, fProof->GetParallel());
3588 pq->SetProcessInfo(pq->GetEntries(),
3589 fProof->GetCpuTime(), fProof->GetBytesRead());
// Variant with -1 workers and zeroed counters (the condition selecting
// between the two variants is not visible in this view).
3592 pq->SetRunning(startlog, parlist, -1);
3595 pq->SetProcessInfo(pq->GetEntries(), float(0.), 0);
/// Handle an archive request.
/// A query reference and a path are read from the message; the referenced
/// query result is located (in memory or in its query-result.root file) and
/// marked as archived to the resolved path.
/// \param mess incoming message holding the query reference and the path.
/// \param slb  optional string; when non-null it receives the reference and
///             the path separated by a blank.
3602 void TProofServ::HandleArchive(TMessage *mess, TString *slb)
3605 Info("HandleArchive", "Enter");
3609 (*mess) >> queryref >> path;
3611 if (slb) slb->Form("%s %s", queryref.Data(), path.Data());
// The reference Default only records path as the default archive path.
3614 if (queryref == "Default") {
3615 fArchivePath = path;
3616 Info(
"HandleArchive",
3617 "default path set to %s", fArchivePath.Data());
// Locate the query through the query manager, when one is available.
3623 TProofQueryResult *pqr = fQMgr ? fQMgr->LocateQuery(queryref, qry, qdir) : 0;
3624 TProofQueryResult *pqm = pqr;
// Empty path in the request: build one from the default archive path,
// which must be defined.
3626 if (path.Length() <= 0) {
3627 if (fArchivePath.Length() <= 0) {
3628 Info(
"HandleArchive",
3629 "archive paths are not defined - do nothing");
3633 path.Form(
"%s/session-%s-%d.root",
3634 fArchivePath.Data(), fTopSessionTag.Data(), qry);
3637 path.ReplaceAll(
":q",
"-");
3638 path.Insert(0, TString::Format(
"%s/",fArchivePath.Data()));
// No in-memory result (or negative qry): look for a TProofQueryResult key
// in the query-result.root file under qdir.
3644 if (!pqr || qry < 0) {
3645 TString fout = qdir;
3646 fout +=
"/query-result.root";
3648 TFile *f = TFile::Open(fout,
"READ");
3652 TIter nxk(f->GetListOfKeys());
3654 while ((k = (TKey *)nxk())) {
3655 if (!strcmp(k->GetClassName(),
"TProofQueryResult")) {
3656 pqr = (TProofQueryResult *) f->Get(k->GetName());
3664 Info(
"HandleArchive",
3665 "file cannot be open (%s)",fout.Data());
3672 PDB(kGlobal, 1) Info("HandleArchive",
3673 "archive path for query
#%d: %s",
// Open the archive file: mode NEW under the AccessPathName test, mode UPDATE
// on the other path (the else keyword is not visible in this view).
3676 if (gSystem->AccessPathName(path))
3677 farc = TFile::Open(path,
"NEW");
3679 farc = TFile::Open(path,
"UPDATE");
3680 if (!farc || !(farc->IsOpen())) {
3681 Info(
"HandleArchive",
3682 "archive file cannot be open (%s)",path.Data());
// Record the archive location on the result object(s).
3688 pqr->SetArchived(path);
3690 pqm->SetArchived(path);
// Persist the updated query through the manager when qry is non-negative.
3696 if (qry > -1 && fQMgr)
3697 fQMgr->SaveQuery(pqr);
3700 Info(
"HandleArchive",
3701 "results of query %s archived to file %s",
3702 queryref.Data(), path.Data());
/// Group the files of a collection by data server.
/// For each TFileInfo in fc a key of the form protocol://host[:port] is built
/// from its current URL; the map associates each key with a THashList.
/// \param fc   file collection to be scanned.
/// \param emsg receives an error text (set here when the collection is
///             undefined).
/// NOTE(review): the return statement is not visible in this view; presumably
/// the map fcmap is returned - confirm.
3714 TMap *TProofServ::GetDataSetNodeMap(TFileCollection *fc, TString &emsg)
3721 emsg.Form(
"file collection undefined!");
3728 TIter nxf(fc->GetList());
3729 TFileInfo *fiind = 0;
3731 while ((fiind = (TFileInfo *)nxf())) {
3732 TUrl *xurl = fiind->GetCurrentUrl();
// Key identifying the server: protocol and fully qualified host name, plus
// the port when it is positive.
3734 key.Form(
"%s://%s", xurl->GetProtocol(), xurl->GetHostFQDN());
3735 if (xurl->GetPort() > 0)
3736 key += TString::Format(
":%d", xurl->GetPort());
// Reuse the list already registered under this key, if any.
3740 if ((ent = (TPair *) fcmap->FindObject(key.Data()))) {
3742 l = (THashList *) ent->Value();
// Register a list under a new key.
3748 fcmap->Add(
new TObjString(key.Data()), l);
/// Handle a processing request.
/// The dataset, selector file name, input list, options, number of entries,
/// first entry, event list and sync flag are read from the message. The
/// visible code then shows a top-master part (dataset validation, query
/// creation and queueing, worker acquisition, submission notice) followed by
/// a part that runs the player directly and ships the output.
/// \param mess incoming message with the request.
/// \param slb  optional string filled with exit status and counters.
/// NOTE(review): several branch keywords and early returns are not visible in
/// this view; comments below describe only the visible statements.
3761 void TProofServ::HandleProcess(TMessage *mess, TString *slb)
3764 Info("HandleProcess", "Enter");
// Guard for a non-top-master that is not idle (action not visible here).
3767 if (!IsTopMaster() && !IsIdle())
3771 TString filename, opt;
3773 Long64_t nentries, first;
3774 TEventList *evl = 0;
3775 TEntryList *enl = 0;
// Unpack the request.
3778 (*mess) >> dset >> filename >> input >> opt >> nentries >> first >> evl >> sync;
// Unread bytes left and protocol above 14: more data follows (the read
// itself is not visible in this view).
3780 if ((mess->BufferSize() > mess->Length()) && fProtocol > 14)
// No dataset, or a dataset flagged kEmpty, counts as no data.
3782 Bool_t hasNoData = (!dset || dset->TestBit(TDSet::kEmpty)) ? kTRUE : kFALSE;
// The entry list takes precedence over the event list.
3785 TObject *elist = (enl) ? (TObject *)enl : (TObject *)evl;
3789 if ((!hasNoData) && elist)
3790 dset->SetEntryList(elist);
3792 if (IsTopMaster()) {
// A dataset without elements must be resolved via AssertDataSet; a failure
// is reported asynchronously and in the log.
3796 if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
3797 if (TProof::AssertDataSet(dset, input, fDataSetManager, emsg) != 0) {
3798 SendAsynMessage(TString::Format(
"AssertDataSet on %s: %s",
3799 fPrefix.Data(), emsg.Data()));
3800 Error(
"HandleProcess",
"AssertDataSet: %s", emsg.Data());
3802 if (sync) SendLogFile();
3805 }
else if (hasNoData) {
// Without data, a PROOF_FilesToProcess entry of the input list may name a
// dataset whose files are mapped by server.
3807 TNamed *ftp =
dynamic_cast<TNamed *
>(input->FindObject(
"PROOF_FilesToProcess"));
3809 TString dsn(ftp->GetTitle());
3810 if (!dsn.Contains(
":") || dsn.BeginsWith(
"dataset:")) {
3811 dsn.ReplaceAll(
"dataset:",
"");
3814 if (!fDataSetManager) {
3815 emsg.Form(
"dataset manager not initialized!");
3817 TFileCollection *fc = 0;
3819 if (!(fc = fDataSetManager->GetDataSet(dsn))) {
3820 emsg.Form(
"requested dataset '%s' does not exists", dsn.Data());
3822 TMap *fcmap = GetDataSetNodeMap(fc, emsg);
3826 fcmap->SetOwner(kTRUE);
3827 fcmap->SetName(
"PROOF_FilesToProcess");
// Any error collected above is reported asynchronously and in the log.
3832 if (!emsg.IsNull()) {
3833 SendAsynMessage(TString::Format(
"HandleProcess on %s: %s",
3834 fPrefix.Data(), emsg.Data()));
3835 Error(
"HandleProcess",
"%s", emsg.Data());
3837 if (sync) SendLogFile();
3844 TProofQueryResult *pq = 0;
// Create the query result object for this request.
3848 pq = MakeQueryResult(nentries, opt, 0, first, 0, filename, 0);
// Dataset and entry list travel inside the input list handed to the query.
3851 if (dset) input->Add(dset);
3852 if (elist) input->Add(elist);
3853 pq->SetInputList(input, kTRUE);
// Empty the list without deleting its objects.
3856 input->Clear(
"nodelete");
3860 if (TProof::SaveInputData(pq, fCacheDir.Data(), emsg) != 0)
3861 Warning(
"HandleProcess",
"could not save input data: %s", emsg.Data());
// Queries that are not draw queries are listed and saved by the manager.
3864 if (!(pq->IsDraw())) {
3866 if (fQMgr->Queries()) fQMgr->Queries()->Add(pq);
3868 fQMgr->SaveQuery(pq);
3879 Bool_t enqueued = kFALSE;
// With dynamic startup a worker list is requested and added to the session.
3882 if (fProof->UseDynamicStartup()) {
3884 TList* workerList =
new TList();
3885 EQueryAction retVal = GetWorkers(workerList, pc);
3886 if (retVal == TProofServ::kQueryStop) {
3887 Error(
"HandleProcess",
"error getting list of worker nodes");
3889 if (sync) SendLogFile();
3891 }
else if (retVal == TProofServ::kQueryEnqueued) {
3894 Info(
"HandleProcess",
"query %d enqueued", pq->GetSeqNum());
3896 Int_t ret = fProof->AddWorkers(workerList);
3898 Error(
"HandleProcess",
"Adding a list of worker nodes returned: %d",
3901 if (sync) SendLogFile();
// Other visible path: GetWorkers is called without a list.
3906 EQueryAction retVal = GetWorkers(0, pc);
3907 if (retVal == TProofServ::kQueryStop) {
3908 Error(
"HandleProcess",
"error getting list of worker nodes");
3910 if (sync) SendLogFile();
3912 }
else if (retVal == TProofServ::kQueryEnqueued) {
3915 Info(
"HandleProcess",
"query %d enqueued", pq->GetSeqNum());
3916 }
else if (retVal != TProofServ::kQueryOK) {
3917 Error(
"HandleProcess",
"unknown return value: %d", retVal);
3919 if (sync) SendLogFile();
// Submission notice, prepared for asynchronous or enqueued queries.
3927 TMessage m(kPROOF_QUERYSUBMITTED);
3928 if (!sync || enqueued) {
3929 m << pq->GetSeqNum() << kFALSE;
3936 Info(
"HandleProcess",
3937 "query \"%s:%s\" submitted", pq->GetTitle(), pq->GetName());
// Loop over the waiting queries unless this one was enqueued.
3945 Bool_t doprocess = kFALSE;
3946 while (WaitingQueries() > 0 && !enqueued) {
3951 if (fProof->UseDynamicStartup())
3960 fProof->ResetMergers();
3965 if (!sync) SendLogFile();
// Idle notification message.
3969 m.Reset(kPROOF_SETIDLE);
3970 Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
3978 if (sync) SendLogFile();
// NOTE(review): from here on the statements drive fPlayer directly; the
// branch keyword separating this part is not visible in this view.
3993 Bool_t deleteplayer = kTRUE;
// A dataset proxy needs a pointer to this server.
3997 if (dset && (dset->IsA() == TDSetProxy::Class()))
3998 ((TDSetProxy*)dset)->SetProofServ(
this);
4002 if (TProof::GetInputData(input, fCacheDir.Data(), emsg) != 0)
4003 Warning(
"HandleProcess",
"could not get input data: %s", emsg.Data());
4006 if (TProof::GetParameter(input,
"PROOF_QuerySeqNum", fQuerySeqNum) != 0)
4007 Warning(
"HandleProcess",
"could not get query sequential number!");
// Replace every PROOF_Ordinal entry by one carrying the local ordinal.
4011 while ((nord = input->FindObject(
"PROOF_Ordinal")))
4012 input->Remove(nord);
4013 input->Add(
new TNamed(
"PROOF_Ordinal", GetOrdinal()));
// Pass the input objects to the player.
4018 while ((o = next())) {
4019 PDB(kGlobal, 2) Info("HandleProcess", "adding: %s", o->GetName());
4020 fPlayer->AddInput(o);
// A selector object found among the iterated objects takes over; its class
// name replaces the file name.
4025 TSelector *selector_obj = 0;
4027 while ((obj = nxt())){
4028 if (obj->InheritsFrom(
"TSelector")) {
4029 selector_obj = (TSelector *) obj;
4030 filename = selector_obj->ClassName();
4031 Info(
"HandleProcess",
"selector obj for '%s' found", selector_obj->ClassName());
4037 fSocket->Send(kPROOF_STARTPROCESS);
4041 fSaveOutput.Reset();
4044 PDB(kGlobal, 1) Info("HandleProcess", "calling %s::Process()", fPlayer->IsA()->GetName());
// Process either with the selector object or by selector name.
4047 Info(
"HandleProcess",
"calling fPlayer->Process() with selector object: %s", selector_obj->ClassName());
4048 fPlayer->Process(dset, selector_obj, opt, nentries, first);
4051 Info(
"HandleProcess",
"calling fPlayer->Process() with selector name: %s", filename.Data());
4052 fPlayer->Process(dset, filename, opt, nentries, first);
// Stop message; its payload depends on the protocol version.
4056 TMessage m(kPROOF_STOPPROCESS);
4057 Bool_t abort = (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted) ? kFALSE : kTRUE;
4058 if (fProtocol > 18) {
4059 TProofProgressStatus* status =
4060 new TProofProgressStatus(fPlayer->GetEventsProcessed(),
4061 gPerfStats?gPerfStats->GetBytesRead():0);
4063 m << status << abort;
4065 slb->Form(
"%d %lld %lld", fPlayer->GetExitStatus(),
4066 status->GetEntries(), status->GetBytesRead());
4069 m << fPlayer->GetEventsProcessed() << abort;
4071 slb->Form(
"%d %lld -1", fPlayer->GetExitStatus(), fPlayer->GetEventsProcessed());
4076 Info("TProofServ::Handleprocess",
4077 "worker %s has finished processing with %d objects in output list",
4078 GetOrdinal(), fPlayer->GetOutputList()->GetEntries());
// Output is usable when the run was not aborted and an output list exists.
4085 Bool_t outok = (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted &&
4086 fPlayer->GetOutputList()) ? kTRUE : kFALSE;
// Sub-merging is requested through a non-negative PROOF_UseMergers value.
4090 Bool_t isSubMerging = kFALSE;
4094 if (TProof::GetParameter(input,
"PROOF_UseMergers", nm) == 0) {
4095 isSubMerging = (nm >= 0) ? kTRUE : kFALSE;
// Without sub-merging, the controlled-send setting comes from the input list
// or, failing that, from the Proof.ControlSendOutput configuration value.
4097 if (!isSubMerging) {
4098 cso = gEnv->GetValue(
"Proof.ControlSendOutput", 1);
4099 if (TProof::GetParameter(input,
"PROOF_ControlSendOutput", cso) != 0)
4100 cso = gEnv->GetValue(
"Proof.ControlSendOutput", 1);
4107 TMessage msg(kPROOF_SENDOUTPUT);
// The player is kept alive in this mode.
4114 deleteplayer = kFALSE;
4117 Info("HandleProcess", "controlled mode: worker %s has finished,"
4118 " sizes sent to master", fOrdinal.Data());
// The high-memory flag switches sub-merging off.
4122 if (TestBit(TProofServ::kHighMemory)) {
4124 Info(
"HandleProcess",
"submerging disabled because of high-memory case");
4125 isSubMerging = kFALSE;
4127 PDB(kGlobal, 2) Info("HandleProcess", "merging mode check: %d", isSubMerging);
// Sub-merging on a non-master: announce the output size and the local port
// of a new server socket to be used for merging connections.
4130 if (!IsMaster() && isSubMerging) {
4138 TMessage msg_osize(kPROOF_SUBMERGER);
4139 msg_osize << Int_t(TProof::kOutputSize);
4140 msg_osize << fPlayer->GetOutputList()->GetEntries();
4142 fMergingSocket =
new TServerSocket(0);
4143 Int_t merge_port = 0;
4144 if (fMergingSocket) {
4146 Info("HandleProcess", "possible port for merging connections: %d",
4147 fMergingSocket->GetLocalPort());
4148 merge_port = fMergingSocket->GetLocalPort();
4150 msg_osize << merge_port;
4151 fSocket->Send(msg_osize);
4157 deleteplayer = kFALSE;
4159 PDB(kSubmerger, 2) Info("HandleProcess", "worker %s has finished", fOrdinal.Data());
// Direct mode: the output list goes straight to the master.
4164 PDB(kGlobal, 2) Info("HandleProcess", "sending result directly to master");
4165 if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
4166 Warning("HandleProcess","problems sending output list");
4169 if (IsMaster()) fProof->ResetMergers();
4172 fSocket->Send(kPROOF_SETIDLE);
// Path without usable output: SendResults is called without a list.
4187 if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted)
4188 Warning(
"HandleProcess",
"the output list is empty!");
4189 if (SendResults(fSocket) != 0)
4190 Warning(
"HandleProcess",
"problems sending output list");
4193 if (IsMaster()) fProof->ResetMergers();
4196 fSocket->Send(kPROOF_SETIDLE);
// Take selector objects out of the input list.
4207 while ((obj = nex())) {
4208 if (obj->InheritsFrom(
"TSelector")) input->Remove(obj);
// The player input list must not own its objects.
4212 fPlayer->GetInputList()->SetOwner(0);
// Objects listed under PROOF_InputObjsFromFile are removed from the input
// list, together with the holder list itself.
4215 TList *added =
dynamic_cast<TList *
>(input->FindObject(
"PROOF_InputObjsFromFile"));
4217 if (added->GetSize() > 0) {
4219 TFile *f =
dynamic_cast<TFile *
>(added->Last());
4223 while ((o = nxo())) { input->Remove(o); }
4224 input->Remove(added);
4225 added->SetOwner(kFALSE);
// Delete the player unless one of the modes above asked to keep it.
4237 if (deleteplayer) DeletePlayer();
4240 PDB(kGlobal, 1) Info("HandleProcess", "done");
/// Send the output objects over a socket, in a form chosen by fProtocol.
/// Visible variants: protocol above 23 (objects packed into messages up to
/// the fMsgSizeHWM size limit), protocol above 10 (objects written one by
/// one), top master with protocol above 6 (single kPROOF_OUTPUTLIST message),
/// and a final SendObject of the list.
/// \param sock    socket to send on.
/// \param outlist list of output objects.
/// \param pq      query result written into the messages where used.
/// \return -1 as soon as a send fails; the success value is not visible in
/// this view.
4249 Int_t TProofServ::SendResults(TSocket *sock, TList *outlist, TQueryResult *pq)
4251 PDB(kOutput, 2) Info("SendResults", "enter");
4254 if (fProtocol > 23 && outlist) {
4258 TMessage mbuf(kPROOF_OUTPUTOBJECT);
4260 Int_t olsz = outlist->GetSize();
// Progress notices and the query result header on the top master.
4261 if (IsTopMaster() && pq) {
4262 msg.Form(
"%s: merging output objects ... done ",
4264 SendAsynMessage(msg.Data());
4266 msg.Form(
"%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
4267 SendAsynMessage(msg.Data(), kFALSE);
4270 mbuf.WriteObject(pq);
4271 if (sock->Send(mbuf) < 0)
return -1;
4274 Int_t ns = 0, np = 0;
4277 Int_t totsz = 0, objsz = 0;
4279 while ((o = nxo())) {
// Flush the buffer once it has grown beyond the size limit.
4280 if (mbuf.Length() > fMsgSizeHWM) {
4283 "message has %d bytes: limit of %lld bytes reached - sending ...",
4284 mbuf.Length(), fMsgSizeHWM);
// The reported size is the compressed length when compression is enabled.
4287 if (GetCompressionLevel() > 0) {
4288 mbuf.SetCompressionSettings(fCompressMsg);
4290 objsz = mbuf.CompLength();
4292 objsz = mbuf.Length();
4295 if (IsTopMaster()) {
4296 msg.Form(
"%s: objects merged; sending obj %d/%d (%d bytes) ",
4297 fPrefix.Data(), ns, olsz, objsz);
4298 SendAsynMessage(msg.Data(), kFALSE);
4300 if (sock->Send(mbuf) < 0)
return -1;
// Type marker: 2 once ns has reached the list size, 1 otherwise.
4307 mbuf << (Int_t) ((ns >= olsz) ? 2 : 1);
// Send what is left in the buffer.
4313 if (GetCompressionLevel() > 0) {
4314 mbuf.SetCompressionSettings(fCompressMsg);
4316 objsz = mbuf.CompLength();
4318 objsz = mbuf.Length();
4321 if (IsTopMaster()) {
4322 msg.Form(
"%s: objects merged; sending obj %d/%d (%d bytes) ",
4323 fPrefix.Data(), ns, olsz, objsz);
4324 SendAsynMessage(msg.Data(), kFALSE);
4326 if (sock->Send(mbuf) < 0)
return -1;
4328 if (IsTopMaster()) {
4330 msg.Form(
"%s: grand total: sent %d objects, size: %d bytes ",
4331 fPrefix.Data(), olsz, totsz);
4332 SendAsynMessage(msg.Data());
4334 }
else if (fProtocol > 10 && outlist) {
// Variant for protocol above 10: objects are written individually.
4338 TMessage mbuf(kPROOF_OUTPUTOBJECT);
4340 Int_t olsz = outlist->GetSize();
4341 if (IsTopMaster() && pq) {
4342 msg.Form(
"%s: merging output objects ... done ",
4344 SendAsynMessage(msg.Data());
4346 msg.Form(
"%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
4347 SendAsynMessage(msg.Data(), kFALSE);
4350 mbuf.WriteObject(pq);
4351 if (sock->Send(mbuf) < 0)
return -1;
4355 Int_t totsz = 0, objsz = 0;
// NOTE(review): this loop iterates the player output list, not outlist.
4356 TIter nxo(fPlayer->GetOutputList());
4358 while ((o = nxo())) {
4361 Int_t type = (Int_t) ((ns >= olsz) ? 2 : 1);
4363 mbuf.WriteObject(o);
4366 if (GetCompressionLevel() > 0) {
4367 mbuf.SetCompressionSettings(fCompressMsg);
4369 objsz = mbuf.CompLength();
4371 objsz = mbuf.Length();
4374 if (IsTopMaster()) {
4375 msg.Form(
"%s: objects merged; sending obj %d/%d (%d bytes) ",
4376 fPrefix.Data(), ns, olsz, objsz);
4377 SendAsynMessage(msg.Data(), kFALSE);
4379 if (sock->Send(mbuf) < 0)
return -1;
4382 if (IsTopMaster()) {
4384 msg.Form(
"%s: grand total: sent %d objects, size: %d bytes ",
4385 fPrefix.Data(), olsz, totsz);
4386 SendAsynMessage(msg.Data());
4389 }
else if (IsTopMaster() && fProtocol > 6 && outlist) {
// Top master with protocol above 6: a single output-list message.
4392 TMessage mbuf(kPROOF_OUTPUTLIST);
4393 mbuf.WriteObject(pq);
4395 Int_t blen = mbuf.CompLength();
4396 Int_t olsz = outlist->GetSize();
4398 msg.Form(
"%s: sending output: %d objs, %d bytes", fPrefix.Data(), olsz, blen);
4399 SendAsynMessage(msg.Data(), kFALSE);
4400 if (sock->Send(mbuf) < 0)
return -1;
// Remaining case: the list is sent as a plain object.
4404 PDB(kGlobal, 2) Info("SendResults", "sending output list");
4406 PDB(kGlobal, 2) Info("SendResults", "notifying failure or abort");
4408 if (sock->SendObject(outlist, kPROOF_OUTPUTLIST) < 0) return -1;
4411 PDB(kOutput,2) Info("SendResults", "done");
/// Process a query result object pq taken from the waiting queries.
/// Options, input list, entry range and selector name are taken from pq; the
/// query is marked running, processed by fPlayer, finalized through the query
/// manager, and the results are sent on fSocket.
/// \param slb optional string filled with exit status, entries, bytes and
///            CPU time.
/// NOTE(review): the statement that fetches pq and several branch keywords
/// are not visible in this view.
4421 void TProofServ::ProcessNext(TString *slb)
4424 TString filename, opt;
4426 Long64_t nentries = -1, first = 0;
4429 TProofQueryResult *pq = 0;
4432 TSelector* selector_obj = 0;
// Take the processing parameters from the query result.
4446 opt = pq->GetOptions();
4447 input = pq->GetInputList();
4448 nentries = pq->GetEntries();
4449 first = pq->GetFirst();
4450 filename = pq->GetSelecImp()->GetName();
// Whatever follows the last hash character in the options is appended to
// the selector file name.
4451 Ssiz_t
id = opt.Last(
'#');
4452 if (
id != kNPOS &&
id < opt.Length() - 1) {
4453 filename += opt(
id + 1, opt.Length());
// The dataset is looked up among the input objects; it is mandatory.
4459 if ((o = pq->GetInputObject(
"TDSet"))) {
4463 Error(
"ProcessNext",
"no TDset object: cannot continue");
// Rewrite the selector implementation file from the stored source.
4473 if (pq->GetSelecImp()) {
4474 gSystem->Exec(TString::Format(
"%s %s", kRM, pq->GetSelecImp()->GetName()));
4475 pq->GetSelecImp()->SaveSource(pq->GetSelecImp()->GetName());
// Same for the header, unless its name contains TProofDrawHist.
4477 if (pq->GetSelecHdr() &&
4478 !strstr(pq->GetSelecHdr()->GetName(),
"TProofDrawHist")) {
4479 gSystem->Exec(TString::Format(
"%s %s", kRM, pq->GetSelecHdr()->GetName()));
4480 pq->GetSelecHdr()->SaveSource(pq->GetSelecHdr()->GetName());
// A selector object whose class name equals the selector name is used
// directly.
4485 while ((obj = nxt())){
4486 if (obj->InheritsFrom(
"TSelector") &&
4487 !strcmp(pq->GetSelecImp()->GetName(), obj->ClassName())) {
4488 selector_obj = (TSelector *) obj;
4489 Info(
"ProcessNext",
"found object for selector '%s'", obj->ClassName());
4496 Error(
"ProcessNext",
"empty waiting queries list!");
4501 SetQueryRunning(pq);
// Save the query through the manager unless it is a draw query; the second
// call increments the draw-query counter.
4505 if (!(pq->IsDraw()))
4506 fQMgr->SaveQuery(pq);
4508 fQMgr->IncrementDrawQueries();
// Start message: selector name, number of files, first entry, entries.
4513 TMessage m(kPROOF_STARTPROCESS);
4514 m << TString(pq->GetSelecImp()->GetName())
4515 << dset->GetNumOfFiles()
4516 << pq->GetFirst() << pq->GetEntries();
// Register the query with the player and make it the current one.
4523 fPlayer->AddQueryResult(pq);
4526 fPlayer->SetCurrentQuery(pq);
4529 if (dset->IsA() == TDSetProxy::Class())
4530 ((TDSetProxy*)dset)->SetProofServ(
this);
// Tag the input list with the query identifier and sequence number.
4534 TString qid = TString::Format(
"%s:%s",pq->GetTitle(),pq->GetName());
4535 input->Add(
new TNamed(
"PROOF_QueryTag", qid.Data()));
4537 fQuerySeqNum = pq->GetSeqNum();
4538 input->Add(
new TParameter<Int_t>(
"PROOF_QuerySeqNum", fQuerySeqNum));
// Merger settings from the configuration apply only when the input list
// does not already carry PROOF_UseMergers.
4542 if (gEnv->Lookup(
"Proof.UseMergers") && !input->FindObject(
"PROOF_UseMergers")) {
4543 Int_t smg = gEnv->GetValue(
"Proof.UseMergers",-1);
4545 input->Add(
new TParameter<Int_t>(
"PROOF_UseMergers", smg));
4546 PDB(kSubmerger, 2) Info("ProcessNext", "PROOF_UseMergers set to %d", smg);
4547 if (gEnv->Lookup("Proof.MergersByHost")) {
4548 Int_t mbh = gEnv->GetValue(
"Proof.MergersByHost", 0);
// An existing PROOF_MergersByHost entry is removed and deleted first.
4552 if ((o = input->FindObject(
"PROOF_MergersByHost"))) { input->Remove(o);
delete o; }
4553 input->Add(
new TParameter<Int_t>(
"PROOF_MergersByHost", mbh));
4554 PDB(kSubmerger, 2) Info("ProcessNext", "submergers setup by host/node");
// Pass the input objects to the player.
4563 while ((o = next())) {
4564 PDB(kGlobal, 2) Info("ProcessNext", "adding: %s", o->GetName());
4565 fPlayer->AddInput(o);
4569 if ((o = input->FindObject("MissingFiles"))) input->Remove(o);
4572 PDB(kGlobal, 1) Info("ProcessNext", "calling %s::Process()", fPlayer->IsA()->GetName());
// Process either with the selector object or by selector name.
4574 Info(
"ProcessNext",
"calling fPlayer->Process() with selector object: %s", selector_obj->ClassName());
4575 fPlayer->Process(dset, selector_obj, opt, nentries, first);
4578 Info(
"ProcessNext",
"calling fPlayer->Process() with selector name: %s", filename.Data());
4579 fPlayer->Process(dset, filename, opt, nentries, first);
4583 fPlayer->SetMerging(kFALSE);
4587 (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) ? kTRUE : kFALSE;
// Any exit status other than finished triggers a stop message whose payload
// depends on the protocol version.
4588 if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kFinished) {
4589 m.Reset(kPROOF_STOPPROCESS);
4591 if (fProtocol > 18) {
4592 TProofProgressStatus* status = fPlayer->GetProgressStatus();
4593 m << status << abort;
4595 }
else if (fProtocol > 8) {
4596 m << fPlayer->GetEventsProcessed() << abort;
4598 m << fPlayer->GetEventsProcessed();
// Dataset registration requests found in the output list are served and
// removed one after the other.
4604 if (fDataSetManager && fPlayer->GetOutputList()) {
4605 TNamed *psr = (TNamed *) fPlayer->GetOutputList()->FindObject(
"PROOFSERV_RegisterDataSet");
4608 if (RegisterDataSets(input, fPlayer->GetOutputList(), fDataSetManager, emsg) != 0)
4609 Warning(
"ProcessNext",
"problems registering produced datasets: %s", emsg.Data());
4611 fPlayer->GetOutputList()->Remove(psr);
4613 }
while ((psr = (TNamed *) fPlayer->GetOutputList()->FindObject(
"PROOFSERV_RegisterDataSet")));
// Finalize and save queries that are not draw queries; statistics are
// requested only when the run was not aborted.
4618 if (fQMgr && !pq->IsDraw()) {
4619 if (!abort) fProof->AskStatistics();
4620 if (fQMgr->FinalizeQuery(pq, fProof, fPlayer))
4621 fQMgr->SaveQuery(pq, fMaxQueries);
// Top master: scan the output for swap-file outputs and honour a default
// output option starting with of: by saving partial results to file.
4626 if (IsTopMaster() && fPlayer->GetOutputList()) {
4627 Bool_t save = kTRUE;
4628 TIter nxo(fPlayer->GetOutputList());
4630 while ((xo = nxo())) {
4631 if (xo->InheritsFrom(
"TProofOutputFile") && xo->TestBit(TProofOutputFile::kSwapFile)) {
4637 TNamed *nof = (TNamed *) input->FindObject(
"PROOF_DefaultOutputOption");
4639 TString oopt(nof->GetTitle());
4640 if (oopt.BeginsWith(
"of:")) {
4641 oopt.Replace(0, 3,
"");
4642 if (!oopt.IsNull()) fPlayer->SetOutputFilePath(oopt);
4643 fPlayer->SavePartialResults(kTRUE, kTRUE);
// Light clone of the query info, given a dataset carrying only names.
4650 TQueryResult *pqr = pq->CloneInfo();
4652 Info(
"ProcessNext",
"adding info about dataset '%s' in the light query result", dset->GetName());
4654 TDSet *ds =
new TDSet(dset->GetName(), dset->GetObjName());
4656 if (pqr) pqr->SetInputList(&rin, kTRUE);
// Send the results when the run was not aborted and output exists; the clone
// is used as header for protocol above 10.
4657 if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted && fPlayer->GetOutputList()) {
4659 Info("ProcessNext", "sending results");
4660 TQueryResult *xpq = (pqr && fProtocol > 10) ? pqr : pq;
4661 if (SendResults(fSocket, fPlayer->GetOutputList(), xpq) != 0)
4662 Warning("ProcessNext", "problems sending output list");
4663 if (slb) slb->Form("%d %lld %lld %.3f", fPlayer->GetExitStatus(), pq->GetEntries(),
4664 pq->GetBytes(), pq->GetUsedCPU());
// Other path: warn about the empty output unless aborted, and still call
// SendResults.
4666 if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted)
4667 Warning(
"ProcessNext",
"the output list is empty!");
4668 if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
4669 Warning(
"ProcessNext",
"problems sending output list");
4670 if (slb) slb->Form(
"%d -1 -1 %.3f", fPlayer->GetExitStatus(), pq->GetUsedCPU());
// Aborted queries are removed from the manager.
4674 if (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) {
4676 if (fQMgr) fQMgr->RemoveQuery(pq);
// For queries that are not draw queries the light clone replaces the full
// result in the list held by the manager.
4679 if (!(pq->IsDraw()) && pqr) {
4680 if (fQMgr && fQMgr->Queries()) {
4681 fQMgr->Queries()->Add(pqr);
4683 fQMgr->Queries()->Remove(pq);
4688 fPlayer->RemoveQueryResult(TString::Format(
"%s:%s",
4689 pq->GetTitle(), pq->GetName()));
// With dynamic startup on a master the workers are removed.
4694 if (IsMaster() && fProof->UseDynamicStartup())
4696 fProof->RemoveWorkers(0);
/// Register the file collections found in the output list as datasets.
/// A collection named NAME is handled only when the output list also holds an
/// object named DATASET_NAME, whose title provides the registration options.
/// \param in  input list (checked for null).
/// \param out output list scanned for TFileCollection objects.
/// \param dsm dataset manager performing the registration.
/// \param msg receives a text describing the last registration outcome.
/// NOTE(review): the return statements are not visible in this view.
4702 Int_t TProofServ::RegisterDataSets(TList *in, TList *out,
4703 TDataSetManager *dsm, TString &msg)
4706 ::Info(
"TProofServ::RegisterDataSets",
4707 "enter: %d objs in the output list", (out ? out->GetSize() : -1));
// All three arguments are required.
4709 if (!in || !out || !dsm) {
4710 ::Error(
"TProofServ::RegisterDataSets",
"invalid inputs: %p, %p, %p", in, out, dsm);
4718 while ((o = nxo())) {
4720 TFileCollection *ds =
dynamic_cast<TFileCollection*
> (o);
// The collection title is set to the local host name.
4723 ds->SetTitle(gSystem->HostName());
// Look for the companion tag object; collections without one are skipped.
4726 TString tag = TString::Format(
"DATASET_%s", ds->GetName());
4727 if (!(fcn = (TNamed *) out->FindObject(tag)))
continue;
// Check for a tag that has been recorded already.
4729 if (tags.FindObject(tag)) {
// Registration options come from the tag title; the sortidx marker is
// stripped from them.
4734 TString regopt(fcn->GetTitle());
4736 if (regopt.Contains(
":sortidx:")) {
4738 regopt.ReplaceAll(
":sortidx:",
"");
// Registration must be allowed by the manager and the collection must hold
// at least one file.
4741 if (dsm->TestBit(TDataSetManager::kAllowRegister)) {
4743 if (ds->GetList()->GetSize() > 0) {
4745 const char *vfmsg = regopt.Contains(
"V") ?
" and verifying" :
"";
4746 msg.Form(
"Registering%s dataset '%s' ... ", vfmsg, ds->GetName());
// When verification is requested (option V) but not allowed, the allow bit
// is set for the duration of the call and reset afterwards.
4748 Bool_t allowVerify = dsm->TestBit(TDataSetManager::kAllowVerify) ? kTRUE : kFALSE;
4749 if (regopt.Contains(
"V") && !allowVerify) dsm->SetBit(TDataSetManager::kAllowVerify);
4751 Int_t rc = dsm->RegisterDataSet(ds->GetName(), ds, regopt);
4753 if (regopt.Contains(
"V") && !allowVerify) dsm->ResetBit(TDataSetManager::kAllowVerify);
4755 ::Warning(
"TProofServ::RegisterDataSets",
4756 "failure registering or verifying dataset '%s'", ds->GetName());
4757 msg.Form(
"Registering%s dataset '%s' ... failed! See log for more details", vfmsg, ds->GetName());
4759 ::Info(
"TProofServ::RegisterDataSets",
"dataset '%s' successfully registered%s",
4760 ds->GetName(), (strlen(vfmsg) > 0) ?
" and verified" :
"");
4761 msg.Form(
"Registering%s dataset '%s' ... OK", vfmsg, ds->GetName());
// Remember the tag.
4764 tags.Add(
new TObjString(tag));
4768 ::Info(
"TProofServ::RegisterDataSets",
"printing collection");
4772 ::Warning(
"TProofServ::RegisterDataSets",
"collection '%s' is empty", o->GetName());
4775 ::Info(
"TProofServ::RegisterDataSets",
"dataset registration not allowed");
// Remove the objects collected in torm from the output list; torm then owns
// them.
4782 while ((o = nxrm())) out->Remove(o);
4783 torm.SetOwner(kTRUE);
// Remove every output object whose name matches a recorded tag.
4786 while((o = nxtg())) {
4788 while ((oo = out->FindObject(o->GetName()))) { out->Remove(oo); }
4790 tags.SetOwner(kTRUE);
4792 PDB(kDataset, 1) ::Info(
"TProofServ::RegisterDataSets",
"exit");
/// Handle a request for the list of queries.
/// Queries known to the query manager (previous ones and current ones) are
/// renumbered sequentially and a kPROOF_QUERYLIST message is prepared with
/// the counters npre and ndraw and the list ql.
/// \param mess incoming request message.
/// NOTE(review): the statements filling ql and sending the message are not
/// visible in this view.
4800 void TProofServ::HandleQueryList(TMessage *mess)
4803 Info("HandleQueryList", "Enter");
4808 TList *ql = new TList;
4809 Int_t ntot = 0, npre = 0, ndraw= 0;
// Scan for previous queries starting from the query directory (the use of
// idx is not visible in this view).
4813 TString qdir = fQueryDir;
4814 Int_t idx = qdir.Index(
"session-");
4817 fQMgr->ScanPreviousQueries(qdir);
// Previous queries get the running total as sequence number.
4819 if (fQMgr->PreviousQueries()) {
4820 TIter nxq(fQMgr->PreviousQueries());
4821 TProofQueryResult *pqr = 0;
4822 while ((pqr = (TProofQueryResult *)nxq())) {
4824 pqr->fSeqNum = ntot;
// Current queries are represented by light clones.
4831 if (fQMgr->Queries()) {
4833 TIter nxq(fQMgr->Queries());
4834 TProofQueryResult *pqr = 0;
4835 TQueryResult *pqm = 0;
4836 while ((pqr = (TProofQueryResult *)nxq())) {
4838 if ((pqm = pqr->CloneInfo())) {
4839 pqm->fSeqNum = ntot;
4842 Warning(
"HandleQueryList",
"unable to clone TProofQueryResult '%s:%s'",
4843 pqr->GetName(), pqr->GetTitle());
4848 ndraw = fQMgr->DrawQueries();
// Reply carrying the counters and the list.
4851 TMessage m(kPROOF_QUERYLIST);
4852 m << npre << ndraw << ql;
/// Handle a remove request.
/// The query reference read from the message selects the action: the values
/// cleanupqueue and cleanupdir trigger clean-up operations, any other value
/// is treated as a query to be removed under a session lock.
/// \param mess incoming message holding the query reference.
/// \param slb  optional string; when non-null it receives the reference.
4863 void TProofServ::HandleRemove(TMessage *mess, TString *slb)
4866 Info("HandleRemove", "Enter");
4869 (*mess) >> queryref;
4871 if (slb) *slb = queryref;
// Clean-up of the waiting queries.
4873 if (queryref == "cleanupqueue") {
4875 Int_t pend = CleanupWaitingQueries();
4877 Info(
"HandleRemove",
"%d queries removed from the waiting list", pend);
// Clean-up of the query directories; -1 stands for a missing query manager.
4882 if (queryref ==
"cleanupdir") {
4885 Int_t nd = (fQMgr) ? fQMgr->CleanupQueriesDir() : -1;
4888 Info(
"HandleRemove",
"%d directories removed", nd);
// Removal of a single query requires the session lock.
4895 TProofLockPath *lck = 0;
4896 if (fQMgr->LockSession(queryref, &lck) == 0) {
4900 fQMgr->RemoveQuery(queryref, &qtorm);
4901 CleanupWaitingQueries(kFALSE, &qtorm);
// The lock file is unlinked afterwards.
4905 gSystem->Unlink(lck->GetName());
4913 Warning(
"HandleRemove",
"query result manager undefined!");
4917 Info(
"HandleRemove",
4918 "query %s could not be removed (unable to lock session)", queryref.Data());
/// Handle a retrieve request.
/// The query is located through the query manager, its result is read from
/// the query-result.root file in the query directory and sent back with a
/// kPROOF_RETRIEVE message; a null object is sent when nothing is found.
/// \param mess incoming message holding the query reference.
/// \param slb  optional string; when non-null it receives the reference.
4927 void TProofServ::HandleRetrieve(TMessage *mess, TString *slb)
4930 Info("HandleRetrieve", "Enter");
4933 (*mess) >> queryref;
4935 if (slb) *slb = queryref;
4940 if (fQMgr) fQMgr->LocateQuery(queryref, qry, qdir);
4942 TString fout = qdir;
4943 fout += "/query-result.root";
4945 TFile *f = TFile::Open(fout,"READ");
4946 TProofQueryResult *pqr = 0;
// Look for a key of class TProofQueryResult in the file.
4949 TIter nxk(f->GetListOfKeys());
4951 while ((k = (TKey *)nxk())) {
4952 if (!strcmp(k->GetClassName(),
"TProofQueryResult")) {
4953 pqr = (TProofQueryResult *) f->Get(k->GetName());
// For protocol below 13 the datasets in the input list are switched to the
// V3 write mode.
4955 if (pqr && fProtocol < 13) {
4958 TIter nxi(pqr->GetInputList());
4960 if ((d = dynamic_cast<TDSet *>(o)))
4962 d->SetWriteV3(kTRUE);
// File size for the notification, scaled while above 1000 (the scaling
// statements are not visible in this view).
4967 Float_t qsz = (Float_t) f->GetSize();
4969 static const char *clb[4] = {
"bytes",
"KB",
"MB",
"GB" };
4970 while (qsz > 1000. && ilb < 3) {
4974 SendAsynMessage(TString::Format(
"%s: sending result of %s:%s (%.1f %s)",
4975 fPrefix.Data(), pqr->GetTitle(), pqr->GetName(),
4977 fSocket->SendObject(pqr, kPROOF_RETRIEVE);
// Not found in the file: a null object is sent.
4979 Info(
"HandleRetrieve",
4980 "query not found in file %s",fout.Data());
4982 fSocket->SendObject(0, kPROOF_RETRIEVE);
// File could not be opened: a null object is sent.
4990 Info(
"HandleRetrieve",
4991 "file cannot be open (%s)",fout.Data());
4993 fSocket->SendObject(0, kPROOF_RETRIEVE);
/// Handle a request to add or remove library or include paths.
/// The message carries the type (lib or inc), the add flag and the paths,
/// separated by commas or blanks. Local settings in gSystem are updated and
/// the request is also applied through fProof.
/// \param mess incoming message with type, add flag, path and optionally rc.
/// NOTE(review): the return statements and the conditions around the fProof
/// calls are not visible in this view.
5004 Int_t TProofServ::HandleLibIncPath(TMessage *mess)
5010 (*mess) >> type >> add >> path;
// An extra value rc is read when unread bytes are left in the buffer.
5011 if (mess->BufferSize() > mess->Length()) (*mess) >> rc;
// Only the types lib and inc are known.
5014 if ((type !=
"lib") && (type !=
"inc")) {
5015 Error(
"HandleLibIncPath",
"unknown action type: %s", type.Data());
// Commas are treated like blanks, then the path string is tokenized.
5020 path.ReplaceAll(
",",
" ");
5024 if (path.Length() > 0 && path !=
"-") {
5025 if (!(op = path.Tokenize(
" "))) {
5026 Error(
"HandleLibIncPath",
"decomposing path %s", path.Data());
// Library paths to add: iterate backwards over the tokens.
5033 if (type ==
"lib") {
5036 TIter nxl(op, kIterBackward);
5037 TObjString *lib = 0;
5038 while ((lib = (TObjString *) nxl())) {
5040 TString xlib = lib->GetName();
5041 gSystem->ExpandPathName(xlib);
// Under this readability test, a path not yet in the dynamic path is inserted
// at position pos.
5043 if (!gSystem->AccessPathName(xlib, kReadPermission)) {
5044 TString newlibpath = gSystem->GetDynamicPath();
5047 if (newlibpath.BeginsWith(
".:"))
5049 if (newlibpath.Index(xlib) == kNPOS) {
5050 newlibpath.Insert(pos,TString::Format(
"%s:", xlib.Data()));
5051 gSystem->SetDynamicPath(newlibpath);
5054 Info(
"HandleLibIncPath",
5055 "libpath %s does not exist or cannot be read - not added", xlib.Data());
5061 fProof->AddDynamicPath(path);
// Include paths to add.
5067 TObjString *inc = 0;
5068 while ((inc = (TObjString *) nxi())) {
5070 TString xinc = inc->GetName();
5071 gSystem->ExpandPathName(xinc);
// Under this readability test, a path not yet in the include path is added as
// a -I option.
5073 if (!gSystem->AccessPathName(xinc, kReadPermission)) {
5074 TString curincpath = gSystem->GetIncludePath();
5075 if (curincpath.Index(xinc) == kNPOS)
5076 gSystem->AddIncludePath(TString::Format(
"-I%s", xinc.Data()));
5078 Info(
"HandleLibIncPath",
5079 "incpath %s does not exist or cannot be read - not added", xinc.Data());
5084 fProof->AddIncludePath(path);
// Library paths to remove: each one is cut out of the dynamic path.
5090 if (type ==
"lib") {
5094 TObjString *lib = 0;
5095 while ((lib = (TObjString *) nxl())) {
5097 TString xlib = lib->GetName();
5098 gSystem->ExpandPathName(xlib);
5100 TString newlibpath = gSystem->GetDynamicPath();
5101 newlibpath.ReplaceAll(TString::Format(
"%s:", xlib.Data()),
"");
5102 gSystem->SetDynamicPath(newlibpath);
5107 fProof->RemoveDynamicPath(path);
// Include paths to remove: the -I option is cut out, as is the interpreter
// include path, before the result is set.
5113 TObjString *inc = 0;
5114 while ((inc = (TObjString *) nxi())) {
5115 TString newincpath = gSystem->GetIncludePath();
5116 newincpath.ReplaceAll(TString::Format(
"-I%s", inc->GetName()),
"");
5118 newincpath.ReplaceAll(gInterpreter->GetIncludePath(),
"");
5119 gSystem->SetIncludePath(newincpath);
5124 fProof->RemoveIncludePath(path);
/// Handle a file or package check request.
/// The message carries a file name and an MD5 checksum. The first character
/// of the name selects the action: a leading minus handles a package to be
/// unpacked, a leading plus or equal sign checks a package, anything else is
/// checked as a file in the cache directory. A kPROOF_CHECKFILE reply is sent
/// on fSocket; for protocol up to 19 it is reset to kPROOF_FATAL on the
/// visible failure paths.
/// \param mess incoming message with file name and checksum.
/// \param slb  optional string; when non-null it receives the file name.
5134 void TProofServ::HandleCheckFile(TMessage *mess, TString *slb)
5138 UInt_t opt = TProof::kUntar;
5140 TMessage reply(kPROOF_CHECKFILE);
5143 (*mess) >> filenam >> md5;
// Unread bytes left and protocol above 8: more data follows (the read itself
// is not visible in this view).
5144 if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8))
5147 if (slb) *slb = filenam;
5149 if (filenam.BeginsWith(
"-")) {
// Package case: strip the leading minus; the package name is the file name
// without its last four characters.
5153 Bool_t err = kFALSE;
5154 filenam = filenam.Strip(TString::kLeading,
'-');
5155 TString packnam = filenam;
5156 packnam.Remove(packnam.Length() - 4);
// Unpack only when the local checksum matches the one received; an old
// installation is cleaned first when kRemoveOld is set in opt.
5158 TMD5 *md5local = fPackMgr->GetMD5(packnam);
5159 if (md5local && md5 == (*md5local)) {
5160 if ((opt & TProof::kRemoveOld)) {
5161 if ((st = fPackMgr->Clean(packnam)))
5162 Error(
"HandleCheckFile",
"failure cleaning %s", packnam.Data());
5165 st = fPackMgr->Unpack(packnam, md5local);
5170 Info("HandleCheckFile",
5171 "package %s installed on node", filenam.Data());
5174 Error(
"HandleCheckFile",
"gunzip not found");
5176 Error(
"HandleCheckFile",
"package %s did not unpack into %s",
5177 filenam.Data(), packnam.Data());
5180 if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
5185 if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
5188 Error("HandleCheckFile",
5189 "package %s not yet on node", filenam.Data());
// Visible follow-up: the package file is removed, or on a master the package
// is uploaded through fProof.
5200 fPackMgr->Remove(filenam);
5201 }
else if (IsMaster()) {
5204 fPackMgr->GetParPath(filenam, parpath);
5205 if (fProof->UploadPackage(parpath,
5206 (TProof::EUploadPackageOpt)opt) != 0)
5207 Info(
"HandleCheckFile",
5208 "problems uploading package %s", parpath.Data());
5211 fSocket->Send(reply);
5213 }
else if (filenam.BeginsWith(
"+") || filenam.BeginsWith(
"=")) {
// Package check case: drop the leading character.
5214 filenam.Remove(0,1);
5216 TString parname = filenam;
// Web or network files are installed through the package manager.
5218 TFile::EFileType ft = TFile::GetType(filenam);
5219 if (ft == TFile::kWeb || ft == TFile::kNet) {
5220 parname = gSystem->BaseName(filenam);
5221 if (fPackMgr->Install(filenam) < 0) {
5222 Warning(
"HandleCheckFile",
5223 "problems installing package %s", filenam.Data());
// Compare the stored checksum with the one received.
5229 TMD5 *md5local = fPackMgr->ReadMD5(parname);
5231 if (md5local && md5 == (*md5local)) {
5235 Info("HandleCheckFile",
5236 "package %s already on node", parname.Data());
5239 TString par = filenam;
5240 if (ft != TFile::kWeb && ft != TFile::kNet) {
5241 xrc = fPackMgr->GetParPath(filenam, par);
5244 if (fProof->UploadPackage(par) != 0)
5245 Warning(
"HandleCheckFile",
5246 "problems uploading package %s", par.Data());
5252 if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
5254 Info("HandleCheckFile",
5255 "package %s not yet on node", filenam.Data());
5258 fSocket->Send(reply);
// Plain file case: checksum of the file in the cache directory.
5262 TString cachef = fCacheDir +
"/" + filenam;
5264 TMD5 *md5local = TMD5::FileChecksum(cachef);
5266 if (md5local && md5 == (*md5local)) {
5269 Info("HandleCheckFile", "file %s already on node", filenam.Data());
5272 if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
5274 Info("HandleCheckFile", "file %s not yet on node", filenam.Data());
5277 fSocket->Send(reply);
// The cache lock is released here (the matching lock call is not visible in
// this view).
5278 fCacheLock->Unlock();
// Dispatch a cache / package request carried by 'mess'. The visible branches
// act on the local cache directory (fCacheDir) or on the package manager
// (fPackMgr) and, in several cases, forward the same operation through fProof.
// When 'slb' is non-null each branch writes a short textual summary of the
// request (the type followed by branch-specific arguments) into it.
5285 Int_t TProofServ::HandleCache(TMessage *mess, TString *slb)
5288 Info("HandleCache", "Enter");
5292 Bool_t all = kFALSE;
5294 Bool_t fromglobal = kFALSE;
5295 Int_t chkveropt = TPackMgr::kCheckROOT;
5296 TPackMgr *packmgr = 0;
// 'noth' becomes "<Mst|Wrk>-<ordinal>"; it prefixes the asynchronous
// messages sent from the build/load branches below.
5300 const
char *k = (IsMaster()) ? "Mst" : "Wrk";
5301 noth.Form("%s-%s", k, fOrdinal.Data());
5304 TString packagedir, package, pdir, ocwd, file;
// kShowCache: list fCacheDir through a shell command (two variants, with and
// without "-a"); on a master with 'all' set the request is forwarded.
5307 case TProof::kShowCache:
5309 printf(
"*** File cache %s:%s ***\n", gSystem->HostName(),
5313 gSystem->Exec(TString::Format(
"%s -a %s", kLS, fCacheDir.Data()));
5315 gSystem->Exec(TString::Format(
"%s %s", kLS, fCacheDir.Data()));
5317 if (IsMaster() && all)
5318 fProof->ShowCache(all);
5320 if (slb) slb->Form(
"%d %d", type, all);
// kClearCache: an optional file name is read only if the message still has
// unread bytes; empty or "*" clears the whole cache directory.
// NOTE(review): 'file' comes from the message and is pasted into a shell
// command line without quoting -- confirm the sender is trusted or escape it.
5322 case TProof::kClearCache:
5324 if ((mess->BufferSize() > mess->Length())) (*mess) >> file;
5326 if (file.IsNull() || file ==
"*") {
5327 gSystem->Exec(TString::Format(
"%s %s/* %s/.*.binversion", kRM, fCacheDir.Data(), fCacheDir.Data()));
5329 gSystem->Exec(TString::Format(
"%s %s/%s", kRM, fCacheDir.Data(), file.Data()));
5331 fCacheLock->Unlock();
5333 fProof->ClearCache(file);
5334 if (slb) slb->Form(
"%d %s", type, file.Data());
5336 case TProof::kShowPackages:
5339 if (IsMaster() && all)
5340 fProof->ShowPackages(all);
5342 if (slb) slb->Form(
"%d %d", type, all);
// kClearPackages: Unload(0) is attempted first and its status gates what
// follows; the status of fProof->ClearPackages() overwrites 'status'.
5344 case TProof::kClearPackages:
5345 if ((status = fPackMgr->Unload(0)) == 0) {
5348 status = fProof->ClearPackages();
5350 if (slb) slb->Form(
"%d %d", type, status);
// kClearPackage: same pattern for a single package -- unload, then remove.
5352 case TProof::kClearPackage:
5354 if ((status = fPackMgr->Unload(package)) == 0) {
5355 fPackMgr->Remove(package);
5357 status = fProof->ClearPackage(package);
5359 if (slb) slb->Form(
"%d %s %d", type, package.Data(), status);
// kBuildPackage: an optional version-check option is read if present. The
// package manager owning the package is looked up; 'fromglobal' is true when
// that manager is not fPackMgr itself.
5361 case TProof::kBuildPackage:
5363 if ((mess->BufferSize() > mess->Length())) (*mess) >> chkveropt;
5365 if (!(packmgr = TPackMgr::GetPackMgr(package.Data(), fPackMgr))) {
5367 SendAsynMessage(TString::Format(
"%s: kBuildPackage: failure locating %s ...",
5368 noth.Data(), package.Data()));
5372 fromglobal = (packmgr == fPackMgr) ? kFALSE : kTRUE;
5373 packagedir = packmgr->GetTitle();
// On a master, a package that is not from a global manager is uploaded
// through fProof; a failure is logged and reported asynchronously.
5375 if (IsMaster() && !fromglobal) {
5378 Int_t xrc = packmgr->GetParPath(package, par);
5379 if (xrc != 0 || fProof->UploadPackage(par) != 0) {
5380 Warning(
"HandleCache",
5381 "kBuildPackage: problems forwarding package %s to workers", package.Data());
5382 SendAsynMessage(TString::Format(
"%s: kBuildPackage: problems forwarding package %s to workers ...",
5383 noth.Data(), package.Data()));
5391 "kBuildPackage: package %s exists and has PROOF-INF directory", package.Data());
// The build is started through fProof without waiting, the local build runs,
// and the remote results are collected afterwards.
5395 fProof->BuildPackage(package, TProof::kBuildOnSlavesNoWait);
5398 status = packmgr->Build(package.Data(), chkveropt);
5403 SendAsynMessage(TString::Format(
"%s: failure building %s ... (status: %d)", noth.Data(), package.Data(), status));
5407 status = fProof->BuildPackage(package, TProof::kCollectBuildResults);
5409 Info("HandleCache", "package %s successfully built", package.Data());
5411 if (slb) slb->Form("%d %s %d %d", type, package.Data(), status, chkveropt);
// kLoadPackage: an optional argument list ('optls') is read if present and
// passed to the local load; a negative status is reported asynchronously.
5414 case TProof::kLoadPackage:
5418 if ((mess->BufferSize() > mess->Length())) (*mess) >> optls;
5420 if ((status = fPackMgr->Load(package.Data(), optls)) < 0) {
5423 SendAsynMessage(TString::Format(
"%s: failure loading %s, args: %p (%d) ...",
5424 noth.Data(), package.Data(), optls,
5425 (optls && optls->GetSize() > 0) ? optls->GetSize() : 0));
// The load is forwarded through fProof with the arguments only when the list
// is non-empty.
5430 if (optls && optls->GetSize() > 0) {
5432 status = fProof->LoadPackage(package, kFALSE, optls);
5435 status = fProof->LoadPackage(package);
5440 Info("HandleCache", "package %s successfully loaded", package.Data());
5443 if (slb) slb->Form("%d %s %d", type, package.Data(), status);
// kShowEnabledPackages: the title names this node (role, ordinal, host)
// unless this is a master and 'all' is not set.
5446 case TProof::kShowEnabledPackages:
5448 { TString title(
"*** Enabled packages ***");
5449 if (!IsMaster() || all) {
5450 title.Form(
"*** Enabled packages on %s %s on %s",
5451 (IsMaster()) ?
"master" :
"worker",
5452 fOrdinal.Data(), gSystem->HostName());
5454 fPackMgr->ShowEnabled(title);
5456 if (IsMaster() && all)
5457 fProof->ShowEnabledPackages(all);
5459 if (slb) slb->Form(
"%d %d", type, all);
// The k*Sub* requests below only show calls forwarded through fProof.
5461 case TProof::kShowSubCache:
5463 if (IsMaster() && all)
5464 fProof->ShowCache(all);
5466 if (slb) slb->Form(
"%d %d", type, all);
5468 case TProof::kClearSubCache:
5470 if ((mess->BufferSize() > mess->Length())) (*mess) >> file;
5472 fProof->ClearCache(file);
5473 if (slb) slb->Form(
"%d %s", type, file.Data());
5475 case TProof::kShowSubPackages:
5477 if (IsMaster() && all)
5478 fProof->ShowPackages(all);
5480 if (slb) slb->Form(
"%d %d", type, all);
5482 case TProof::kDisableSubPackages:
5484 fProof->DisablePackages();
5485 if (slb) slb->Form(
"%d", type);
5487 case TProof::kDisableSubPackage:
5490 fProof->DisablePackage(package);
5491 if (slb) slb->Form(
"%d %s", type, package.Data());
5493 case TProof::kBuildSubPackage:
5495 if ((mess->BufferSize() > mess->Length())) (*mess) >> chkveropt;
5497 fProof->BuildPackage(package, TProof::kBuildAll, chkveropt);
5498 if (slb) slb->Form(
"%d %s %d", type, package.Data(), chkveropt);
// kUnloadPackage: local unload first; a master forwards only on success.
5500 case TProof::kUnloadPackage:
5502 status = fPackMgr->Unload(package);
5503 if (IsMaster() && status == 0)
5504 status = fProof->UnloadPackage(package);
5505 if (slb) slb->Form(
"%d %s %d", type, package.Data(), status);
5507 case TProof::kDisablePackage:
5509 fPackMgr->Remove(package);
5511 fProof->DisablePackage(package);
5512 if (slb) slb->Form(
"%d %s", type, package.Data());
// kUnloadPackages: Unload(0) is the "all packages" form used above as well.
5514 case TProof::kUnloadPackages:
5515 status = fPackMgr->Unload(0);
5516 if (IsMaster() && status == 0)
5517 status = fProof->UnloadPackages();
5518 if (slb) slb->Form(
"%d %s %d", type, package.Data(), status);
5520 case TProof::kDisablePackages:
5523 fProof->DisablePackages();
5524 if (slb) slb->Form(
"%d %s", type, package.Data());
// The two list requests answer with a kPROOF_PACKAGE_LIST message.
5526 case TProof::kListEnabledPackages:
5527 msg.Reset(kPROOF_PACKAGE_LIST);
5528 { TList *epl = fPackMgr->GetListOfEnabled();
5534 if (slb) slb->Form(
"%d", type);
5536 case TProof::kListPackages:
5538 TList *pack = fPackMgr->GetList();
5539 msg.Reset(kPROOF_PACKAGE_LIST);
5540 msg << type << pack;
// The list is made owner of its elements (presumably so that deleting it
// frees them -- the delete itself is not shown).
5542 pack->SetOwner(kTRUE);
5545 if (slb) slb->Form(
"%d", type);
// kLoadMacro: the macro is loaded with ".L" from inside the cache directory;
// the working directory is saved before and restored afterwards. Anything
// after the first ',' in 'package' is dropped to obtain the macro name.
5547 case TProof::kLoadMacro:
5554 fProof->Load(package, kFALSE, kTRUE);
5559 TString originalCwd = gSystem->WorkingDirectory();
5560 gSystem->ChangeDirectory(fCacheDir.Data());
5563 TString pack(package);
5565 if ((from = pack.Index(
",")) != kNPOS) pack.Remove(from);
5566 Info(
"HandleCache",
"loading macro %s ...", pack.Data());
5567 gROOT->ProcessLine(TString::Format(
".L %s", pack.Data()));
5570 gSystem->ChangeDirectory(originalCwd.Data());
5571 fCacheLock->Unlock();
5576 fProof->Load(package, kFALSE, kFALSE);
5581 if (slb) slb->Form(
"%d %s", type, package.Data());
// Any other request type is reported as an error.
5585 Error(
"HandleCache",
"unknown type %d", type);
// Handle a request to activate or deactivate workers. The request names a
// worker ordinal ('ord'); it is ignored unless it is "*", "restore", or begins
// with this node's own ordinal. 'rc' carries the outcome: on a mismatch between
// the expected and the observed number of active workers it is set to the
// (negative) count returned by fProof, or to -1.
// All log calls use the location tag "HandleWorkerLists" so that they match
// the method name.
5596 Int_t TProofServ::HandleWorkerLists(TMessage *mess)
5599 Info("HandleWorkerLists", "Enter");
5601 Int_t type = 0, rc = 0;
5607 case TProof::kActivateWorker:
5609 if (ord !=
"*" && !ord.BeginsWith(GetOrdinal()) && ord !=
"restore")
break;
// nactmax is the number of known workers minus the ones flagged as bad.
5611 Int_t nact = fProof->GetListOfActiveSlaves()->GetSize();
5612 Int_t nactmax = fProof->GetListOfSlaves()->GetSize() -
5613 fProof->GetListOfBadSlaves()->GetSize();
// Activation is attempted when some workers are inactive, or unconditionally
// when this is not an end master.
5614 if (nact < nactmax || !IsEndMaster()) {
5615 Int_t nwc = fProof->ActivateWorker(ord);
5616 Int_t nactnew = fProof->GetListOfActiveSlaves()->GetSize();
5618 if (nactnew == nactmax) {
5619 PDB(kGlobal, 1) Info("HandleWorkerLists", "all workers (re-)activated");
5622 PDB(kGlobal, 1) Info("HandleWorkerLists", "%d workers could not be (re-)activated", nactmax - nactnew);
5624 } else if (ord == "restore") {
5626 PDB(kGlobal, 1) Info("HandleWorkerLists","active worker(s) restored");
5628 Error(
"HandleWorkerLists",
"some active worker(s) could not be restored; check logs");
// Success means the active count grew by exactly the number reported.
5631 if (nactnew == (nact + nwc)) {
5633 PDB(kGlobal, 1) Info("HandleWorkerLists","worker(s) %s (re-)activated", ord.Data());
5635 if (nwc != -2 && IsEndMaster()) {
5636 Error(
"HandleWorkerLists",
"some worker(s) could not be (re-)activated;"
5637 " # of actives: %d --> %d (nwc: %d)",
5638 nact, nactnew, nwc);
5640 rc = (nwc < 0) ? nwc : -1;
5644 PDB(kGlobal, 1) Info("HandleWorkerLists","all workers are already active");
5647 Warning(
"HandleWorkerLists",
"undefined PROOF session: protocol error?");
5650 case TProof::kDeactivateWorker:
5652 if (ord !=
"*" && !ord.BeginsWith(GetOrdinal()) && ord !=
"restore")
break;
5654 Int_t nact = fProof->GetListOfActiveSlaves()->GetSize();
5656 Int_t nwc = fProof->DeactivateWorker(ord);
5657 Int_t nactnew = fProof->GetListOfActiveSlaves()->GetSize();
5660 PDB(kGlobal, 1) Info("HandleWorkerLists","all workers deactivated");
5663 PDB(kGlobal, 1) Info("HandleWorkerLists","%d workers could not be deactivated", nactnew);
// Success means the active count shrank by exactly the number reported.
5666 if (nactnew == (nact - nwc)) {
5668 PDB(kGlobal, 1) Info("HandleWorkerLists","worker(s) %s deactivated", ord.Data());
5670 if (nwc != -2 && IsEndMaster()) {
5671 Error(
"HandleWorkerLists",
"some worker(s) could not be deactivated:"
5672 " # of actives: %d --> %d (nwc: %d)",
5673 nact, nactnew, nwc);
5675 rc = (nwc < 0) ? nwc : -1;
5679 PDB(kGlobal, 1) Info("HandleWorkerLists","all workers are already inactive");
5682 Warning(
"HandleWorkerLists",
"undefined PROOF session: protocol error?");
5686 Warning(
"HandleWorkerLists",
"unknown action type (%d)", type);
// Fill 'workers' from the static PROOF configuration file. The file is
// resolved from fConfDir/fConfFile and fConfFile is updated to the name that
// was actually used. Sub-masters take precedence: worker entries are used only
// when the sub-master list is missing or empty. Each entry added to 'workers'
// is a fresh copy of the parsed node info.
5697 TProofServ::EQueryAction TProofServ::GetWorkers(TList *workers,
5702 TProofResourcesStatic *resources =
5703 new TProofResourcesStatic(fConfDir, fConfFile);
5704 fConfFile = resources->GetFileName();
5706 Info("GetWorkers", "using PROOF config file: %s", fConfFile.Data());
5709 TProofNodeInfo *master = resources->GetMaster();
5713 "no appropriate master line found in %s", fConfFile.Data());
// The image name from the master line is adopted only if none is set yet.
5717 if (fImage.IsNull() && strlen(master->GetImage()) > 0)
5718 fImage = master->GetImage();
5723 if (resources->GetSubmasters() && resources->GetSubmasters()->GetSize() > 0) {
5725 resources->GetSubmasters()->Print();
5726 TProofNodeInfo *ni = 0;
5727 TIter nw(resources->GetSubmasters());
5728 while ((ni = (TProofNodeInfo *) nw()))
5729 workers->Add(new TProofNodeInfo(*ni));
5730 } else if (resources->GetWorkers() && resources->GetWorkers()->GetSize() > 0) {
5732 resources->GetWorkers()->Print();
5733 TProofNodeInfo *ni = 0;
5734 TIter nw(resources->GetWorkers());
5735 while ((ni = (TProofNodeInfo *) nw()))
5736 workers->Add(new TProofNodeInfo(*ni));
// Select the stream the error handler writes to: 'ferr', or stderr when
// 'ferr' is null. The previously installed stream is saved in 'oldferr'
// (presumably the return value -- the return statement is not shown here).
5749 FILE *TProofServ::SetErrorHandlerFile(FILE *ferr)
5751 FILE *oldferr = fgErrorHandlerFile;
5752 fgErrorHandlerFile = (ferr) ? ferr : stderr;
// Error handler used by the PROOF server. It lazily initializes the global
// ignore level, drops messages below that level, writes a time-stamped line to
// fgErrorHandlerFile, optionally mirrors it to syslog and, in the abort path,
// notifies the peer and prints a stack trace.
5760 void TProofServ::ErrorHandler(Int_t level, Bool_t abort,
const char *location,
// First call: derive gErrorIgnoreLevel from the "Root.ErrorIgnoreLevel"
// setting (default "Print"), compared case-insensitively. It stays 0 if the
// value matches none of the known names.
5763 if (gErrorIgnoreLevel == kUnset) {
5764 gErrorIgnoreLevel = 0;
5766 TString lvl = gEnv->GetValue(
"Root.ErrorIgnoreLevel",
"Print");
5767 if (!lvl.CompareTo(
"Print", TString::kIgnoreCase))
5768 gErrorIgnoreLevel = kPrint;
5769 else if (!lvl.CompareTo(
"Info", TString::kIgnoreCase))
5770 gErrorIgnoreLevel = kInfo;
5771 else if (!lvl.CompareTo(
"Warning", TString::kIgnoreCase))
5772 gErrorIgnoreLevel = kWarning;
5773 else if (!lvl.CompareTo(
"Error", TString::kIgnoreCase))
5774 gErrorIgnoreLevel = kError;
5775 else if (!lvl.CompareTo(
"Break", TString::kIgnoreCase))
5776 gErrorIgnoreLevel = kBreak;
5777 else if (!lvl.CompareTo(
"SysError", TString::kIgnoreCase))
5778 gErrorIgnoreLevel = kSysError;
5779 else if (!lvl.CompareTo(
"Fatal", TString::kIgnoreCase))
5780 gErrorIgnoreLevel = kFatal;
5784 if (level < gErrorIgnoreLevel)
// Errors and above also trigger LogToMaster() on the server instance.
5788 if (level >= kError && gProofServ)
5789 gProofServ->LogToMaster();
// Syslog mirroring is enabled only for fgLogToSysLog values above 2.
5791 Bool_t tosyslog = (fgLogToSysLog > 2) ? kTRUE : kFALSE;
5793 const char *type = 0;
5794 ELogLevel loglevel = kLogInfo;
// 'ipos' is how many characters of 'location' get printed; it starts as the
// full length and is shortened below to the position of the last '|'.
5796 Int_t ipos = (location) ? strlen(location) : 0;
// The checks below are cumulative: each higher level overrides what the
// previous one set.
5798 if (level >= kPrint) {
5799 loglevel = kLogInfo;
5802 if (level >= kInfo) {
5803 loglevel = kLogInfo;
5804 char *ps = location ? (
char *) strrchr(location,
'|') : (char *)0;
5806 ipos = (int)(ps - (
char *)location);
5812 if (level >= kWarning) {
5813 loglevel = kLogWarning;
5816 if (level >= kError) {
5820 if (level >= kBreak) {
5822 type =
"*** Break ***";
5824 if (level >= kSysError) {
5828 if (level >= kFatal) {
// Only the hh:mm:ss part (8 characters from offset 11) of the time stamp is
// printed.
5838 TString st(ts.AsString(
"lc"),19);
// Short form (no "in <location>") when there is no usable location, or for
// the plain-print and break level ranges.
5840 if (!location || ipos == 0 ||
5841 (level >= kPrint && level < kInfo) ||
5842 (level >= kBreak && level < kSysError)) {
5843 fprintf(fgErrorHandlerFile,
"%s %5d %s | %s: %s\n", st(11,8).Data(),
5845 (gProofServ ? gProofServ->GetPrefix() :
"proof"),
5848 buf.Form(
"%s: %s:%s", fgSysLogEntity.Data(), type, msg);
5850 fprintf(fgErrorHandlerFile,
"%s %5d %s | %s in <%.*s>: %s\n", st(11,8).Data(),
5852 (gProofServ ? gProofServ->GetPrefix() :
"proof"),
5853 type, ipos, location, msg);
5855 buf.Form(
"%s: %s:<%.*s>: %s", fgSysLogEntity.Data(), type, ipos, location, msg);
5857 fflush(fgErrorHandlerFile);
5860 gSystem->Syslog(loglevel, buf);
// The crash-reporter string is replaced by a copy of the latest message; the
// previous copy is released first.
5863 if (__crashreporter_info__)
5864 delete [] __crashreporter_info__;
5865 __crashreporter_info__ = StrDup(buf);
// 'recursive' is a function-local static consulted before notifying the peer
// (presumably to avoid re-entering this path -- where it is set is not shown).
5870 static Bool_t recursive = kFALSE;
5872 if (gProofServ != 0 && !recursive) {
5874 if (gProofServ->GetSocket()) gProofServ->GetSocket()->Send(kPROOF_FATAL);
5878 fprintf(fgErrorHandlerFile,
"aborting\n");
5879 fflush(fgErrorHandlerFile);
5880 gSystem->StackTrace();
// Create the player object. Two creation paths are visible: through
// fProof->MakePlayer(), or as a "slave" player bound to fSocket; the condition
// selecting between them is not shown here. The result is also registered with
// fProof via SetPlayer().
5888 void TProofServ::MakePlayer()
5890 TVirtualProofPlayer *p = 0;
5897 p = fProof->MakePlayer();
5900 p = TVirtualProofPlayer::Create(
"slave", 0, fSocket);
5902 fProof->SetPlayer(p);
// Destroy the current player. The latest real and CPU processing times are
// printed, fProof (if any) is told to drop its player reference, and fPlayer
// is then deleted with SafeDelete.
5912 void TProofServ::DeletePlayer()
5917 Printf(
" +++ Latest processing times: %f s (CPU: %f s)",
5918 fCompute.RealTime(), fCompute.CpuTime());
// Detach from fProof before deleting, so it is not left pointing at the
// deleted player.
5920 if (fProof) fProof->SetPlayer(0);
5922 SafeDelete(fPlayer);
// Look up the priority of this session's group in the "proofpriority" table of
// the query-log database. Connection parameters come from the
// ProofServ.QueryLogDB / QueryLogUser / QueryLogPasswd settings; the default
// priority is 100.
//
// Changes with respect to the previous version:
//  - the statement is now valid SQL (FROM precedes WHERE);
//  - the password is no longer written to the log on connection failure.
5944 Int_t TProofServ::GetPriority()
5946 TString sqlserv = gEnv->GetValue(
"ProofServ.QueryLogDB",
"");
5947 TString sqluser = gEnv->GetValue(
"ProofServ.QueryLogUser",
"");
5948 TString sqlpass = gEnv->GetValue(
"ProofServ.QueryLogPasswd",
"");
5950 Int_t priority = 100;
// NOTE(review): fGroup is interpolated into the statement unescaped; confirm
// that group names can never contain a quote, or escape them here.
5956 sql.Form(
"SELECT priority FROM proofpriority WHERE group='%s'", fGroup.Data());
5959 TSQLServer *db = TSQLServer::Connect(sqlserv, sqluser, sqlpass);
5961 if (!db || db->IsZombie()) {
// Credentials must not end up in log files: report server and user only.
5962 Error(
"GetPriority",
"failed to connect to SQL server %s as %s",
5963 sqlserv.Data(), sqluser.Data());
5964 printf(
"%s\n", sql.Data());
5966 TSQLResult *res = db->Query(sql);
5969 Error(
"GetPriority",
"query into proofpriority failed");
5970 Printf(
"%s", sql.Data());
// The priority is the first field of the first row returned.
5972 TSQLRow *row = res->Next();
5974 priority = atoi(row->GetField(0));
5977 Error(
"GetPriority",
"first row is header is NULL");
// Send 'msg' to the peer as a kPROOF_MESSAGE, followed by the 'lf' flag. The
// text is also logged locally ("(null)" for a null pointer). Nothing is sent
// when there is no socket or 'msg' is null; a failed send is only warned.
5994 void TProofServ::SendAsynMessage(
const char *msg, Bool_t lf)
// NOTE(review): the message object is a function-local static that is reset
// and reused on every call -- confirm this is never called concurrently.
5996 static TMessage m(kPROOF_MESSAGE);
6001 Info("SendAsynMessage","%s", (msg ? msg : "(null)"));
6003 if (fSocket && msg) {
6004 m.Reset(kPROOF_MESSAGE);
6005 m << TString(msg) << lf;
6006 if (fSocket->Send(m) <= 0)
6007 Warning(
"SendAsynMessage",
"could not send message '%s'", msg);
// Move the offset of fLogFileDes to the current end of stdout: the end
// position is obtained by seeking stdout to SEEK_END and, if that succeeds,
// fLogFileDes is positioned at the same absolute offset.
6018 void TProofServ::FlushLogFile()
6020 off_t lend = lseek(fileno(stdout), (off_t)0, SEEK_END);
6021 if (lend >= 0) lseek(fLogFileDes, lend, SEEK_SET);
// Truncate the log file once it reaches fLogFileMaxSize. The new size is 80%
// of the maximum; targets below 100 bytes are refused. Every outcome is
// reported to the peer with an asynchronous "+++ WARNING +++" message.
6028 void TProofServ::TruncateLogFile()
// Active only when a positive size limit and a log file descriptor are set.
6032 if (fLogFileMaxSize > 0 && fLogFileDes > 0) {
6035 if (fstat(fLogFileDes, &st) == 0) {
6036 if (st.st_size >= fLogFileMaxSize) {
6037 off_t truncsz = (off_t) (( fLogFileMaxSize * 80 ) / 100 );
6038 if (truncsz < 100) {
6039 emsg.Form(
"+++ WARNING +++: %s: requested truncate size too small"
6040 " (%lld,%lld) - ignore ", fPrefix.Data(), (Long64_t) truncsz, fLogFileMaxSize);
6041 SendAsynMessage(emsg.Data());
// ftruncate is retried for as long as it fails with EINTR.
// NOTE(review): the size is read from fLogFileDes but the truncation is
// applied to stdout's descriptor -- presumably both refer to the same file.
6044 TSystem::ResetErrno();
6045 while (ftruncate(fileno(stdout), truncsz) != 0 &&
6046 (TSystem::GetErrno() == EINTR)) {
6047 TSystem::ResetErrno();
// A non-zero errno after the loop means the truncation failed.
6049 if (TSystem::GetErrno() > 0) {
6050 Error(
"TruncateLogFile",
"truncating to %lld bytes; file size is %lld bytes (errno: %d)",
6051 (Long64_t)truncsz, (Long64_t)st.st_size, TSystem::GetErrno());
6052 emsg.Form(
"+++ WARNING +++: %s: problems truncating log file to %lld bytes; file size is %lld bytes"
6053 " (errno: %d)", fPrefix.Data(), (Long64_t)truncsz, (Long64_t)st.st_size, TSystem::GetErrno());
6054 SendAsynMessage(emsg.Data());
6056 Info(
"TruncateLogFile",
"file truncated to %lld bytes (80%% of %lld); file size was %lld bytes ",
6057 (Long64_t)truncsz, fLogFileMaxSize, (Long64_t)st.st_size);
6058 emsg.Form(
"+++ WARNING +++: %s: log file truncated to %lld bytes (80%% of %lld)",
6059 fPrefix.Data(), (Long64_t)truncsz, fLogFileMaxSize);
6060 SendAsynMessage(emsg.Data());
// fstat failed: report it, nothing is truncated.
6064 emsg.Form(
"+++ WARNING +++: %s: could not stat log file descriptor"
6065 " for truncation (errno: %d)", fPrefix.Data(), TSystem::GetErrno());
6066 SendAsynMessage(emsg.Data());
// Report an exception triggered by signal 'sig'. The last recorded message
// (fgLastMsg) and entry number (fgLastEntry) are included both in the local
// error log and in the asynchronous message sent to the peer, which is also
// prefixed with this node's ordinal.
// The local log text now reads "caught", matching the message sent to the peer.
6075 void TProofServ::HandleException(Int_t sig)
6077 Error(
"HandleException",
"caught exception triggered by signal '%d' %s %lld",
6078 sig, fgLastMsg.Data(), fgLastEntry);
6081 emsg.Form(
"%s: caught exception triggered by signal '%d' %s %lld",
6082 GetOrdinal(), sig, fgLastMsg.Data(), fgLastEntry);
6084 SendAsynMessage(emsg.Data());
// Handle a dataset request carried by 'mess'. Requests are served by
// fDataSetManager and, for the staging requests, by the staging repository
// fDataSetStgRepo. When 'slb' is non-null each branch writes a short summary
// of the request (type plus arguments, or "notallowed"/"disabled") into it.
// Operations that modify datasets are gated by the manager's permission bits.
6092 Int_t TProofServ::HandleDataSets(TMessage *mess, TString *slb)
6095 Info(
"HandleDataSets",
"enter");
// Nothing can be done without a dataset manager.
6098 if (!fDataSetManager) {
6099 Warning(
"HandleDataSets",
"no data manager is available to fulfill the request");
6104 TString dsUser, dsGroup, dsName, dsTree, uri, opt;
// Matches every character that is not a letter, digit, '.', '_' or '-'; used
// below to turn a dataset URI into a name for the staging repository.
6108 TPMERegexp reInvalid(
"[^A-Za-z0-9._-]");
6115 case TProof::kCheckDataSetName:
6120 if (slb) slb->Form(
"%d %s", type, uri.Data());
6121 if (fDataSetManager->ExistsDataSet(uri))
// kRegisterDataSet: requires kAllowRegister; the file collection is read
// from the message and must be non-empty.
6126 case TProof::kRegisterDataSet:
6129 if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
6132 if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
6134 TFileCollection *dataSet =
6135 dynamic_cast<TFileCollection*
> ((mess->ReadObject(TFileCollection::Class())));
6136 if (!dataSet || dataSet->GetList()->GetSize() == 0) {
6137 Error(
"HandleDataSets",
"can not save an empty list.");
6141 rc = fDataSetManager->RegisterDataSet(uri, dataSet, opt);
6145 Info(
"HandleDataSets",
"dataset registration not allowed");
6146 if (slb) slb->Form(
"%d notallowed", type);
// kRequestStaging: needs the staging repository. The request is stored under
// a sanitized copy of the URI; a request that already exists is only warned.
6152 case TProof::kRequestStaging:
6156 if (!fDataSetStgRepo) {
6157 Error(
"HandleDataSets",
6158 "no dataset staging request repository available");
6163 TString validUri = uri;
// Replace invalid characters one match at a time until none is left.
6164 while (reInvalid.Substitute(validUri,
"_")) {}
6168 if (fDataSetStgRepo->ExistsDataSet(validUri.Data())) {
6169 Warning(
"HandleDataSets",
"staging of %s already requested",
6175 TFileCollection *fc = fDataSetManager->GetDataSet(uri.Data());
6176 if (!fc || (fc->GetNFiles() == 0)) {
6177 Error(
"HandleDataSets",
"empty dataset or no dataset returned");
// Every file is marked as not staged; 'nToErase' counts all of its URLs but
// one (the loop body that uses it is not shown here).
6183 TIter it(fc->GetList());
6185 while ((fi = dynamic_cast<TFileInfo *>(it.Next()))) {
6186 fi->ResetBit(TFileInfo::kStaged);
6187 Int_t nToErase = fi->GetNUrls() - 1;
6188 for (Int_t i=0; i<nToErase; i++)
6195 fDataSetStgRepo->ParseUri(validUri, &dsGroup, &dsUser, &dsName);
6196 if (fDataSetStgRepo->WriteDataSet(dsGroup, dsUser,
6199 Error(
"HandleDataSets",
6200 "can't register staging request for %s", uri.Data());
6205 Info(
"HandleDataSets",
6206 "Staging request registered for %s", uri.Data());
// kStagingStatus: the URI is sanitized in place and the matching staging
// request, if any, is sent back.
6213 case TProof::kStagingStatus:
6215 if (!fDataSetStgRepo) {
6216 Error(
"HandleDataSets",
6217 "no dataset staging request repository available");
6224 while (reInvalid.Substitute(uri,
"_")) {}
6227 TFileCollection *fc = fDataSetStgRepo->GetDataSet(uri.Data());
6229 fSocket->SendObject(fc, kMESS_OK);
6235 Info(
"HandleDataSets",
"no pending staging request for %s",
// kCancelStaging: remove the staging request stored under the sanitized URI.
6242 case TProof::kCancelStaging:
6244 if (!fDataSetStgRepo) {
6245 Error(
"HandleDataSets",
6246 "no dataset staging request repository available");
6253 while (reInvalid.Substitute(uri,
"_")) {}
6255 if (!fDataSetStgRepo->RemoveDataSet(uri.Data()))
6262 case TProof::kShowDataSets:
6264 (*mess) >> uri >> opt;
6265 if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
6267 fDataSetManager->ShowDataSets(uri, opt);
// kGetDataSets: a ":lite:" token in 'opt' (case-insensitive) selects the
// short read mode and is stripped; what remains of 'opt' is treated as a
// server name used to filter the returned collections.
6271 case TProof::kGetDataSets:
6273 (*mess) >> uri >> opt;
6274 if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
6276 UInt_t omsk = (UInt_t)TDataSetManager::kExport;
6277 Ssiz_t kLite = opt.Index(
":lite:", 0, TString::kIgnoreCase);
6278 if (kLite != kNPOS) {
6279 omsk |= (UInt_t)TDataSetManager::kReadShort;
6280 opt.Remove(kLite, strlen(
":lite:"));
6282 TMap *returnMap = fDataSetManager->GetDataSets(uri, omsk);
6284 if (returnMap && !opt.IsNull()) {
// Build a second map holding, per dataset, only the files on server 'opt'.
6286 TMap *rmap =
new TMap;
6288 TFileCollection *fc = 0, *xfc = 0;
6289 TIter nxd(returnMap);
6290 while ((k = nxd()) && (fc = (TFileCollection *) returnMap->GetValue(k))) {
6292 if ((xfc = fc->GetFilesOnServer(opt.Data()))) {
6293 rmap->Add(
new TObjString(k->GetName()), xfc);
6296 returnMap->DeleteAll();
6297 if (rmap->GetSize() > 0) {
6300 Info(
"HandleDataSets",
"no dataset found on server '%s'", opt.Data());
6307 fSocket->SendObject(returnMap, kMESS_OK);
6308 returnMap->DeleteAll();
6315 case TProof::kGetDataSet:
6317 (*mess) >> uri >> opt;
6318 if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
6320 TFileCollection *fileList = fDataSetManager->GetDataSet(uri,opt);
6322 fSocket->SendObject(fileList, kMESS_OK);
// kRemoveDataSet: gated by the same permission bit as registration.
6330 case TProof::kRemoveDataSet:
6332 if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
6334 if (slb) slb->Form(
"%d %s", type, uri.Data());
6335 if (!fDataSetManager->RemoveDataSet(uri)) {
6340 Info(
"HandleDataSets",
"dataset creation / removal not allowed");
6341 if (slb) slb->Form(
"%d notallowed", type);
// kVerifyDataSet: requires kAllowVerify; the guard object is constructed
// with fLogFile and fSocket and lives for the duration of the scan.
6346 case TProof::kVerifyDataSet:
6348 if (fDataSetManager->TestBit(TDataSetManager::kAllowVerify)) {
6349 (*mess) >> uri >> opt;
6350 if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
6351 TProofServLogHandlerGuard hg(fLogFile, fSocket);
6352 rc = fDataSetManager->ScanDataSet(uri, opt);
6360 Info(
"HandleDataSets",
"dataset verification not allowed");
// kGetQuota / kShowQuota: available only when quota checking is enabled.
6365 case TProof::kGetQuota:
6367 if (fDataSetManager->TestBit(TDataSetManager::kCheckQuota)) {
6368 if (slb) slb->Form(
"%d", type);
6369 TMap *groupQuotaMap = fDataSetManager->GetGroupQuotaMap();
6370 if (groupQuotaMap) {
6372 fSocket->SendObject(groupQuotaMap, kMESS_OK);
6377 Info(
"HandleDataSets",
"quota control disabled");
6378 if (slb) slb->Form(
"%d disabled", type);
6383 case TProof::kShowQuota:
6385 if (fDataSetManager->TestBit(TDataSetManager::kCheckQuota)) {
6386 if (slb) slb->Form(
"%d", type);
6389 fDataSetManager->ShowQuota(opt);
6391 Info(
"HandleDataSets",
"quota control disabled");
6392 if (slb) slb->Form(
"%d disabled", type);
// kSetDefaultTreeName: implemented as a scan with the kSetDefaultTree option;
// requires kAllowRegister.
6396 case TProof::kSetDefaultTreeName:
6398 if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
6400 if (slb) slb->Form(
"%d %s", type, uri.Data());
6401 rc = fDataSetManager->ScanDataSet(uri, (UInt_t)TDataSetManager::kSetDefaultTree);
6403 Info(
"HandleDataSets",
"kSetDefaultTreeName: modification of dataset info not allowed");
6404 if (slb) slb->Form(
"%d notallowed", type);
// kCache: 'opt' selects the action, "show" or "clear"; anything else is an
// error.
6409 case TProof::kCache:
6411 (*mess) >> uri >> opt;
6412 if (slb) slb->Form(
"%d %s %s", type, uri.Data(), opt.Data());
6413 if (opt ==
"show") {
6415 fDataSetManager->ShowCache(uri);
6416 }
else if (opt ==
"clear") {
6418 fDataSetManager->ClearCache(uri);
6420 Error(
"HandleDataSets",
"kCache: unknown action: %s", opt.Data());
6426 Error(
"HandleDataSets",
"unknown type %d", type);
// Handle a sub-merger request. Two requests carry real work here:
//  - kSendOutput: this worker sends its output list to the merger at
//    name:port, or straight to the master when the name is "master";
//  - kBeMerger: this worker becomes a merger, accepts the outputs of
//    'connections' workers, adds its own, merges and sends the result to the
//    master.
// Whenever a merger cannot be reached or set up, a kMergerDown answer is sent
// on fSocket and the player is kept (deleteplayer = kFALSE).
//
// Changes with respect to the previous version: the kBeMerger branch no longer
// reports itself as "kSendOutput", the "removing merged object" trace uses
// this method's name as location, and "craeting" is spelled "creating".
6437 void TProofServ::HandleSubmerger(TMessage *mess)
6445 case TProof::kOutputSize:
6448 case TProof::kSendOutput:
6450 Bool_t deleteplayer = kTRUE;
// Any merging activity still in progress is interrupted and its socket
// closed before the output is sent.
6452 if (fMergingMonitor) {
6453 Info(
"HandleSubmerger",
"kSendOutput: interrupting ...");
6454 fMergingMonitor->Interrupt();
6456 if (fMergingSocket) {
6457 if (fMergingMonitor) fMergingMonitor->Remove(fMergingSocket);
6458 fMergingSocket->Close();
6459 SafeDelete(fMergingSocket);
6464 Int_t merger_id = -1;
6465 (*mess) >> merger_id >> name >> port;
6467 Info("HandleSubmerger","worker %s redirected to merger
#%d %s:%d", fOrdinal.Data(), merger_id, name.Data(), port);
// A socket to the merger is opened only for a non-empty name and a positive
// port.
6470 if (name.Length() > 0 && port > 0 && (t =
new TSocket(name, port)) && t->IsValid()) {
6472 PDB(kSubmerger, 2) Info("HandleSubmerger",
6473 "kSendOutput: worker asked for sending output to merger
#%d %s:%d",
6474 merger_id, name.Data(), port);
6476 if (SendResults(t, fPlayer->GetOutputList()) != 0) {
6477 msg.Form(
"worker %s cannot send results to merger #%d at %s:%d", GetPrefix(), merger_id, name.Data(), port);
6478 PDB(kSubmerger, 2) Info("HandleSubmerger",
6479 "kSendOutput: %s - inform the master", msg.Data());
6480 SendAsynMessage(msg);
6482 TMessage answ(kPROOF_SUBMERGER);
6483 answ << Int_t(TProof::kMergerDown);
6485 fSocket->Send(answ);
6488 TMessage answ(kPROOF_SUBMERGER);
6489 answ << Int_t(TProof::kOutputSent);
6491 fSocket->Send(answ);
6493 PDB(kSubmerger, 2) Info("HandleSubmerger", "kSendOutput: worker sent its output");
6494 fSocket->Send(kPROOF_SETIDLE);
// The special name "master" means: send the output over fSocket.
6500 if (name ==
"master") {
6501 PDB(kSubmerger, 2) Info("HandleSubmerger",
6502 "kSendOutput: worker was asked for sending output to master");
6503 if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
6504 Warning("HandleSubmerger", "problems sending output list");
6506 fSocket->Send(kPROOF_SETIDLE);
6510 } else if (!t || !(t->IsValid())) {
6511 msg.Form(
"worker %s could not open a valid socket to merger #%d at %s:%d",
6512 GetPrefix(), merger_id, name.Data(), port);
6513 PDB(kSubmerger, 2) Info("HandleSubmerger",
6514 "kSendOutput: %s - inform the master", msg.Data());
6515 SendAsynMessage(msg);
6517 TMessage answ(kPROOF_SUBMERGER);
6518 answ << Int_t(TProof::kMergerDown);
6520 fSocket->Send(answ);
6521 deleteplayer = kFALSE;
6529 Error(
"HandleSubmerger",
"kSendOutput: received not on worker");
6533 if (deleteplayer) DeletePlayer();
6536 case TProof::kBeMerger:
6538 Bool_t deleteplayer = kTRUE;
6540 Int_t merger_id = -1;
6542 Int_t connections = 0;
6543 (*mess) >> merger_id >> connections;
6545 Info("HandleSubmerger", "worker %s established as merger", fOrdinal.Data());
6548 Info("HandleSubmerger",
6549 "kBeMerger: worker asked for being merger
#%d for %d connections",
6550 merger_id, connections);
// A separate "remote" player, flagged as sub-merger, collects the outputs.
6552 TVirtualProofPlayer *mergerPlayer = TVirtualProofPlayer::Create(
"remote",fProof,0);
6555 PDB(kSubmerger, 2) Info("HandleSubmerger",
6556 "kBeMerger: mergerPlayer created (%p) ", mergerPlayer);
6559 mergerPlayer->SetBit(TVirtualProofPlayer::kIsSubmerger);
6562 if (AcceptResults(connections, mergerPlayer)) {
6564 Info("HandleSubmerger", "kBeMerger: all outputs from workers accepted");
6567 Info("","adding own output to the list on %s", fOrdinal.Data());
// This worker's own output objects are added to the merger player; objects
// for which AddOutputObject() does not return 1 are removed from the own
// output list.
6574 TIter nxo(fPlayer->GetOutputList());
6576 while ((o = nxo())) {
6577 if ((mergerPlayer->AddOutputObject(o) != 1)) {
6580 if (fPlayer->GetOutputList()) {
6582 Info("HandleSubmerger", "removing merged
object (%p)", o);
6583 fPlayer->GetOutputList()->Remove(o);
6587 PDB(kSubmerger, 2) Info("HandleSubmerger","kBeMerger: own outputs added");
6588 PDB(kSubmerger, 2) Info("HandleSubmerger","starting delayed merging on %s", fOrdinal.Data());
6591 mergerPlayer->MergeOutput(kTRUE);
6593 PDB(kSubmerger, 2) mergerPlayer->GetOutputList()->Print("all");
6595 PDB(kSubmerger, 2) Info("HandleSubmerger", "delayed merging on %s finished ", fOrdinal.Data());
6596 PDB(kSubmerger, 2) Info("HandleSubmerger", "%s sending results to master ", fOrdinal.Data());
6598 if (SendResults(fSocket, mergerPlayer->GetOutputList()) != 0)
6599 Warning("HandleSubmerger","kBeMerger: problems sending output list");
// The merged list is made owner of its objects before the player is deleted.
6600 if (mergerPlayer->GetOutputList())
6601 mergerPlayer->GetOutputList()->SetOwner(kTRUE);
6603 PDB(kSubmerger, 2) Info("HandleSubmerger","kBeMerger: results sent to master");
6605 fSocket->Send(kPROOF_SETIDLE);
6610 TMessage answ(kPROOF_SUBMERGER);
6611 answ << Int_t(TProof::kMergerDown);
6613 fSocket->Send(answ);
6614 deleteplayer = kFALSE;
6617 SafeDelete(mergerPlayer);
6620 Warning(
"HandleSubmerger",
"kBeMerger: problems creating the merger player!");
6622 TMessage answ(kPROOF_SUBMERGER);
6623 answ << Int_t(TProof::kMergerDown);
6625 fSocket->Send(answ);
6626 deleteplayer = kFALSE;
6629 Error(
"HandleSubmerger",
"kBeMerger: received not on worker");
6633 if (deleteplayer) DeletePlayer();
6637 case TProof::kMergerDown:
// kStopMerging: interrupt the merging monitor, if one is active.
6640 case TProof::kStopMerging:
6643 PDB(kSubmerger, 2) Info("HandleSubmerger", "kStopMerging");
6644 if (fMergingMonitor) {
6645 Info(
"HandleSubmerger",
"kStopMerging: interrupting ...");
6646 fMergingMonitor->Interrupt();
6651 case TProof::kOutputSent:
// Placeholder for fork-based cloning: the message argument is unnamed and
// unused, and the only visible action is an informational log line.
6659 void TProofServ::HandleFork(TMessage *)
6661 Info(
"HandleFork",
"fork cloning not implemented");
// Fork the process. A failed fork() is logged. The child (pid == 0) returns
// immediately; the parent creates the reaper timer on first use and registers
// the child's pid with it. A separate code path only warns that the
// functionality is not provided under windows.
6670 Int_t TProofServ::Fork()
6675 if ((pid = fork()) < 0) {
6676 Error(
"Fork",
"failed to fork");
// Child process: nothing more to do here.
6681 if (!pid)
return pid;
// Parent process: the reaper timer is created lazily with the argument 1000
// and started.
6684 if (!fReaperTimer) {
6685 fReaperTimer =
new TReaperTimer(1000);
6686 fReaperTimer->Start(-1);
6690 fReaperTimer->AddPid(pid);
6695 Warning(
"Fork",
"Functionality not provided under windows");
// Replace the placeholders found in 'fname', in place:
//   <user>  user name            (fallback "nouser")
//   <u>     first letter of it   (fallback "n")
//   <group> group name           (fallback "default")
//   <stag>  session tag          (warning if undefined)
//   <ord>   ordinal              (warning if undefined)
//   <qnum>  query sequence number (warning if undefined)
//   <file>  'path', only when it is non-empty
//   <rver>  ROOT version as an integer
//   <build> "<version>_<build arch>_<compiler version>"
// For user, group and session tag the value is taken from gProofServ when it
// provides a non-empty one, otherwise from gProof.
//
// Changes with respect to the previous version: the <qnum> test no longer
// checks the sequence number twice (a single "> 0" is equivalent), and the
// warning text spells "sequential" correctly.
6707 void TProofServ::ResolveKeywords(TString &fname,
const char *path)
6710 if (fname.Contains(
"<user>")) {
6711 if (gProofServ && gProofServ->GetUser() && strlen(gProofServ->GetUser())) {
6712 fname.ReplaceAll(
"<user>", gProofServ->GetUser());
6713 }
else if (gProof && gProof->GetUser() && strlen(gProof->GetUser())) {
6714 fname.ReplaceAll(
"<user>", gProof->GetUser());
6716 fname.ReplaceAll(
"<user>",
"nouser");
6720 if (fname.Contains(
"<u>")) {
6721 if (gProofServ && gProofServ->GetUser() && strlen(gProofServ->GetUser())) {
6722 TString u(gProofServ->GetUser()[0]);
6723 fname.ReplaceAll(
"<u>", u);
6724 }
else if (gProof && gProof->GetUser() && strlen(gProof->GetUser())) {
6725 TString u(gProof->GetUser()[0]);
6726 fname.ReplaceAll(
"<u>", u);
6728 fname.ReplaceAll(
"<u>",
"n");
6732 if (fname.Contains(
"<group>")) {
6733 if (gProofServ && gProofServ->GetGroup() && strlen(gProofServ->GetGroup())) {
6734 fname.ReplaceAll(
"<group>", gProofServ->GetGroup());
6735 }
else if (gProof && gProof->GetGroup() && strlen(gProof->GetGroup())) {
6736 fname.ReplaceAll(
"<group>", gProof->GetGroup());
6738 fname.ReplaceAll(
"<group>",
"default");
6742 if (fname.Contains(
"<stag>")) {
6743 if (gProofServ && gProofServ->GetSessionTag() && strlen(gProofServ->GetSessionTag())) {
6744 fname.ReplaceAll(
"<stag>", gProofServ->GetSessionTag());
6745 }
else if (gProof && gProof->GetSessionTag() && strlen(gProof->GetSessionTag())) {
6746 fname.ReplaceAll(
"<stag>", gProof->GetSessionTag());
6748 ::Warning(
"TProofServ::ResolveKeywords",
"session tag undefined: ignoring");
// The ordinal and the query number are only available from gProofServ.
6752 if (fname.Contains(
"<ord>")) {
6753 if (gProofServ && gProofServ->GetOrdinal() && strlen(gProofServ->GetOrdinal()))
6754 fname.ReplaceAll(
"<ord>", gProofServ->GetOrdinal());
6756 ::Warning(
"TProofServ::ResolveKeywords",
"ordinal number undefined: ignoring");
6759 if (fname.Contains(
"<qnum>")) {
6760 if (gProofServ && gProofServ->GetQuerySeqNum() > 0)
6761 fname.ReplaceAll(
"<qnum>", TString::Format(
"%d", gProofServ->GetQuerySeqNum()).Data());
6763 ::Warning(
"TProofServ::ResolveKeywords",
"query sequential number undefined: ignoring");
// <file> is left untouched when no path was supplied.
6766 if (fname.Contains(
"<file>") && path && strlen(path) > 0) {
6767 fname.ReplaceAll(
"<file>", path);
6770 if (fname.Contains(
"<rver>")) {
6771 TString v = TString::Format(
"%d", gROOT->GetVersionInt());
6772 fname.ReplaceAll(
"<rver>", v);
6775 if (fname.Contains(
"<build>")) {
6776 TString b = TString::Format(
"%d_%s_%s", gROOT->GetVersionInt(), gSystem->GetBuildArch(),
6777 gSystem->GetBuildCompilerVersion());
6778 fname.ReplaceAll(
"<build>", b);
// Compute the session status code while holding fQMtx: 0 when idle, 1 when
// not idle, and 3 when idle with queries still waiting in the queue.
6791 Int_t TProofServ::GetSessionStatus()
6793 std::lock_guard<std::recursive_mutex> lock(fQMtx);
6794 Int_t st = (fIdle) ? 0 : 1;
6795 if (fIdle && fWaitingQueries->GetSize() > 0) st = 3;
// Write the session status as a decimal integer to the file at fAdminPath,
// which is opened for writing (truncating any previous content). A negative
// 'xst' means "use the current status from GetSessionStatus()".
6804 Int_t TProofServ::UpdateSessionStatus(Int_t xst)
6806 FILE *fs = fopen(fAdminPath.Data(),
"w");
6808 Int_t st = (xst < 0) ? GetSessionStatus() : xst;
6809 fprintf(fs,
"%d", st);
6812 Info("UpdateSessionStatus", "status (=%d) update in path: %s", st, fAdminPath.Data());
// Idle-state query; fQMtx is held for the duration of the call (the value
// returned is not visible here).
6823 Bool_t TProofServ::IsIdle()
6825 std::lock_guard<std::recursive_mutex> lock(fQMtx);
// Idle-state setter taking the new state 'st'; fQMtx is held for the duration
// of the call (the assignment itself is not visible here).
6832 void TProofServ::SetIdle(Bool_t st)
6834 std::lock_guard<std::recursive_mutex> lock(fQMtx);
// Return kTRUE when the session is idle while queries are still queued.
// The check is made with fQMtx held.
6841 Bool_t TProofServ::IsWaiting()
6843 std::lock_guard<std::recursive_mutex> lock(fQMtx);
6844 if (fIdle && fWaitingQueries->GetSize() > 0)
return kTRUE;
// Return the number of queries currently in the waiting queue, read with
// fQMtx held.
6851 Int_t TProofServ::WaitingQueries()
6853 std::lock_guard<std::recursive_mutex> lock(fQMtx);
6854 return fWaitingQueries->GetSize();
// Append 'pq' to the waiting queue and return the queue size after the
// insertion. Both steps happen with fQMtx held.
6861 Int_t TProofServ::QueueQuery(TProofQueryResult *pq)
6863 std::lock_guard<std::recursive_mutex> lock(fQMtx);
6864 fWaitingQueries->Add(pq);
6865 return fWaitingQueries->GetSize();
// Take the first query off the waiting queue, with fQMtx held: the first
// element is fetched and then removed from the list (presumably it is the
// value returned -- the return statement is not shown here).
6872 TProofQueryResult *TProofServ::NextQuery()
6874 std::lock_guard<std::recursive_mutex> lock(fQMtx);
6875 TProofQueryResult *pq = (TProofQueryResult *) fWaitingQueries->First();
6876 fWaitingQueries->Remove(pq);
// Remove queries from the waiting queue, with fQMtx held. Two paths are
// visible: one iterates over a list and removes each of its elements from the
// queue, counting in 'ncq' those that were actually queued; the other empties
// the whole queue, with 'del' deciding whether the queue owns (and therefore
// deletes) its elements. How 'qls' selects the path is not visible here.
6885 Int_t TProofServ::CleanupWaitingQueries(Bool_t del, TList *qls)
6887 std::lock_guard<std::recursive_mutex> lock(fQMtx);
6892 while ((o = nxq())) {
6893 if (fWaitingQueries->FindObject(o)) ncq++;
6894 fWaitingQueries->Remove(o);
// Full cleanup: the count is taken before the queue is emptied.
6898 ncq = fWaitingQueries->GetSize();
6899 fWaitingQueries->SetOwner(del);
6900 fWaitingQueries->Delete();
// Record 'lastmsg' in the class-wide fgLastMsg string, which
// HandleException() includes in its report.
6909 void TProofServ::SetLastMsg(
const char *lastmsg)
6911 fgLastMsg = lastmsg;
// Record 'entry' in the class-wide fgLastEntry value, which
// HandleException() includes in its report.
6917 void TProofServ::SetLastEntry(Long64_t entry)
6919 fgLastEntry = entry;
// Return the class-wide virtual memory limit fgVirtMemMax.
6925 Long_t TProofServ::GetVirtMemMax()
6927 return fgVirtMemMax;
6932 Long_t TProofServ::GetResMemMax()
6939 Float_t TProofServ::GetMemHWM()
6946 Float_t TProofServ::GetMemStop()
// Fill 'dsrv' from the LOCALDATASERVER environment variable when it is set,
// making sure the value ends with a '/'.
6954 void TProofServ::GetLocalServer(TString &dsrv)
6957 if (gSystem->Getenv(
"LOCALDATASERVER")) {
6958 dsrv = gSystem->Getenv(
"LOCALDATASERVER");
6959 if (!dsrv.EndsWith(
"/")) dsrv +=
"/";
// Strip the local-root prefix from 'path', in place. This applies only when
// 'path' parses as a "file" URL, the "Path.Localroot" setting is non-empty and
// is a prefix of the file part, and the data server 'dsrv' uses the "root" or
// "xrd" protocol.
6971 void TProofServ::FilterLocalroot(TString &path,
const char *dsrv)
6973 TUrl u(path, kTRUE);
6974 if (!strcmp(u.GetProtocol(),
"file")) {
6976 TString pfx = gEnv->GetValue(
"Path.Localroot",
"");
6977 if (!pfx.IsNull() && !strncmp(u.GetFile(), pfx.Data(), pfx.Length())) {
6978 TString srvp = TUrl(dsrv).GetProtocol();
// NOTE(review): the prefix is matched against u.GetFile() but removed from
// the start of 'path' -- confirm 'path' never carries a "file:" prefix here.
6979 if (srvp ==
"root" || srvp ==
"xrd") path.Remove(0, pfx.Length());
// Open (creating it with mode 0644 if it does not exist) the lock file named
// by GetName() and lock it with lockf(F_LOCK) on non-Windows builds. Failures
// to open or to lock are reported through SysError. On the platforms covered
// by the lockf -> flock mapping at the top of this file, F_LOCK is
// (LOCK_EX | LOCK_NB).
6991 Int_t TProofLockPath::Lock()
6993 const char *pname = GetName();
// AccessPathName() is used as the "file does not exist" test.
// NOTE(review): the existence check and the open are separate steps.
6995 if (gSystem->AccessPathName(pname))
6996 fLockId = open(pname, O_CREAT|O_RDWR, 0644);
6998 fLockId = open(pname, O_RDWR);
7000 if (fLockId == -1) {
7001 SysError(
"Lock",
"cannot open lock file %s", pname);
7006 Info("Lock", "%d: locking file %s ...", gSystem->GetPid(), pname);
7008 #if !defined(R__WIN32) && !defined(R__WINGCC)
7009 if (lockf(fLockId, F_LOCK, (off_t) 1) == -1) {
7010 SysError(
"Lock",
"error locking %s", pname);
7018 Info("Lock", "%d: file %s locked", gSystem->GetPid(), pname);
// Release the lock held on the file named by GetName(). The descriptor is
// rewound to offset 0 first, so that the one-byte region passed to lockf()
// starts at the beginning of the file, as it did when the lock was taken.
// A failure is reported through SysError.
// The first trace line now uses "Unlock" as its location, like the other
// messages of this method (it previously said "Lock").
7027 Int_t TProofLockPath::Unlock()
7033 Info("Unlock", "%d: unlocking file %s ...", gSystem->GetPid(), GetName());
7035 lseek(fLockId, 0, SEEK_SET);
7036 #if !defined(R__WIN32) && !defined(R__WINGCC)
7037 if (lockf(fLockId, F_ULOCK, (off_t)1) == -1) {
7038 SysError(
"Unlock",
"error unlocking %s", GetName());
7046 Info("Unlock", "%d: file %s unlocked", gSystem->GetPid(), GetName());