43 # include <sys/stat.h>
44 # include <sys/types.h>
45 # include "snprintf.h"
51 #include "RConfigure.h"
105 char TProofMergePrg::fgCr[4] = {
'-',
'\\',
'|',
'/'};
107 TList *TProof::fgProofEnvList = 0;
108 TPluginHandler *TProof::fgLogViewer = 0;
116 Bool_t TProofInterruptHandler::Notify()
118 if (!fProof->IsTty() || fProof->GetRemoteProtocol() < 22) {
121 fProof->StopProcess(kTRUE);
126 if (fProof->GetRemoteProtocol() < 22) {
127 a = Getline(
"\nSwitch to asynchronous mode not supported remotely:"
128 "\nEnter S/s to stop, Q/q to quit, any other key to continue: ");
130 a = Getline(
"\nEnter A/a to switch asynchronous, S/s to stop, Q/q to quit,"
131 " any other key to continue: ");
133 if (a[0] ==
'Q' || a[0] ==
'S' || a[0] ==
'q' || a[0] ==
's') {
135 Info(
"Notify",
"Processing interrupt signal ... %c", a[0]);
138 Bool_t abort = (a[0] ==
'Q' || a[0] ==
'q') ? kTRUE : kFALSE;
139 fProof->StopProcess(abort);
141 }
else if ((a[0] ==
'A' || a[0] ==
'a') && fProof->GetRemoteProtocol() >= 22) {
143 fProof->GoAsynchronous();
154 TProofInputHandler::TProofInputHandler(TProof *p, TSocket *s)
155 : TFileHandler(s->GetDescriptor(),1),
156 fSocket(s), fProof(p)
163 Bool_t TProofInputHandler::Notify()
165 fProof->CollectInputFrom(fSocket);
172 ClassImp(TSlaveInfo);
177 Int_t TSlaveInfo::Compare(
const TObject *obj)
const
181 const TSlaveInfo *si =
dynamic_cast<const TSlaveInfo*
>(obj);
183 if (!si)
return fOrdinal.CompareTo(obj->GetName());
185 const char *myord = GetOrdinal();
186 const char *otherord = si->GetOrdinal();
187 while (myord && otherord) {
188 Int_t myval = atoi(myord);
189 Int_t otherval = atoi(otherord);
190 if (myval < otherval)
return 1;
191 if (myval > otherval)
return -1;
192 myord = strchr(myord,
'.');
194 otherord = strchr(otherord,
'.');
195 if (otherord) otherord++;
197 if (myord)
return -1;
198 if (otherord)
return 1;
205 Bool_t TSlaveInfo::IsEqual(
const TObject* obj)
const
207 if (!obj)
return kFALSE;
208 const TSlaveInfo *si =
dynamic_cast<const TSlaveInfo*
>(obj);
209 if (!si)
return kFALSE;
210 return (strcmp(GetOrdinal(), si->GetOrdinal()) == 0);
219 void TSlaveInfo::Print(Option_t *opt)
const
221 TString stat = fStatus == kActive ?
"active" :
222 fStatus == kBad ?
"bad" :
225 Bool_t newfmt = kFALSE;
227 if (oo.Contains(
"N")) {
229 oo.ReplaceAll(
"N",
"");
231 if (oo ==
"active" && fStatus != kActive)
return;
232 if (oo ==
"notactive" && fStatus != kNotActive)
return;
233 if (oo ==
"bad" && fStatus != kBad)
return;
236 TString msd, si, datadir;
237 if (!(fMsd.IsNull())) msd.Form(
"| msd: %s ", fMsd.Data());
238 if (!(fDataDir.IsNull())) datadir.Form(
"| datadir: %s ", fDataDir.Data());
239 if (fSysInfo.fCpus > 0) {
240 si.Form(
"| %s, %d cores, %d MB ram", fHostName.Data(),
241 fSysInfo.fCpus, fSysInfo.fPhysRam);
243 si.Form(
"| %s", fHostName.Data());
245 Printf(
"Worker: %9s %s %s%s| %s", fOrdinal.Data(), si.Data(), msd.Data(), datadir.Data(), stat.Data());
248 TString msd = fMsd.IsNull() ?
"<null>" : fMsd.Data();
250 std::cout <<
"Slave: " << fOrdinal
251 <<
" hostname: " << fHostName
253 <<
" perf index: " << fPerfIndex
262 void TSlaveInfo::SetSysInfo(SysInfo_t si)
264 fSysInfo.fOS = si.fOS;
265 fSysInfo.fModel = si.fModel;
266 fSysInfo.fCpuType = si.fCpuType;
267 fSysInfo.fCpus = si.fCpus;
268 fSysInfo.fCpuSpeed = si.fCpuSpeed;
269 fSysInfo.fBusSpeed = si.fBusSpeed;
270 fSysInfo.fL2Cache = si.fL2Cache;
271 fSysInfo.fPhysRam = si.fPhysRam;
281 TMergerInfo::~TMergerInfo()
285 fWorkers->SetOwner(kFALSE);
286 SafeDelete(fWorkers);
292 void TMergerInfo::SetMergedWorker()
294 if (AreAllWorkersMerged())
295 Error(
"SetMergedWorker",
"all workers have been already merged before!");
303 void TMergerInfo::AddWorker(TSlave *sl)
306 fWorkers =
new TList();
307 if (fWorkersToMerge == fWorkers->GetSize()) {
308 Error(
"AddWorker",
"all workers have been already assigned to this merger");
317 Bool_t TMergerInfo::AreAllWorkersMerged()
319 return (fWorkersToMerge == fMergedWorkers);
325 Bool_t TMergerInfo::AreAllWorkersAssigned()
330 return (fWorkers->GetSize() == fWorkersToMerge);
341 static Int_t PoDCheckUrl(TString *_cluster)
347 *_cluster = _cluster->Strip( TString::kBoth );
349 const TString pod_prot(
"pod");
353 TUrl url( _cluster->Data() );
354 if( pod_prot.CompareTo(url.GetProtocol(), TString::kIgnoreCase) )
361 *_cluster = gSystem->GetFromPipe(
"pod-info -c -b");
362 if( 0 == _cluster->Length() ) {
363 Error(
"PoDCheckUrl",
"PoD server is not running");
381 TProof::TProof(
const char *masterurl,
const char *conffile,
const char *confdir,
382 Int_t loglevel,
const char *alias, TProofMgr *mgr)
392 fServType = TProofMgr::kXProofd;
399 ResetBit(TProof::kIsClient);
400 ResetBit(TProof::kIsMaster);
403 if (!masterurl || strlen(masterurl) <= 0) {
404 fUrl.SetProtocol(
"proof");
405 fUrl.SetHost(
"__master__");
406 }
else if (!(strstr(masterurl,
"://"))) {
407 fUrl.SetProtocol(
"proof");
410 if (fUrl.GetPort() == TUrl(
" ").GetPort())
411 fUrl.SetPort(TUrl(
"proof:// ").GetPort());
414 if (!strcmp(fUrl.GetHost(),
"__master__"))
415 fMaster = fUrl.GetHost();
416 else if (!strlen(fUrl.GetHost()))
417 fMaster = gSystem->GetHostByName(gSystem->HostName()).GetHostName();
419 fMaster = gSystem->GetHostByName(fUrl.GetHost()).GetHostName();
422 if (strlen(fUrl.GetOptions()) > 0) {
423 TString opts(fUrl.GetOptions());
424 if (!(strncmp(fUrl.GetOptions(),
"std",3))) {
425 fServType = TProofMgr::kProofd;
427 fUrl.SetOptions(opts.Data());
428 }
else if (!(strncmp(fUrl.GetOptions(),
"lite",4))) {
429 fServType = TProofMgr::kProofLite;
431 fUrl.SetOptions(opts.Data());
436 fMasterServ = kFALSE;
437 SetBit(TProof::kIsClient);
438 ResetBit(TProof::kIsMaster);
439 if (fMaster ==
"__master__") {
441 ResetBit(TProof::kIsClient);
442 SetBit(TProof::kIsMaster);
443 }
else if (fMaster ==
"prooflite") {
446 SetBit(TProof::kIsMaster);
449 if (TestBit(TProof::kIsClient))
450 if (!gSystem->Getenv(
"ROOTPROOFCLIENT")) gSystem->Setenv(
"ROOTPROOFCLIENT",
"");
452 Init(masterurl, conffile, confdir, loglevel, alias);
455 if (strlen(fUrl.GetUser()) <= 0) {
457 if (Exec(
"gProofServ->GetUser()",
"0", kTRUE) == 0) {
458 TObjString *os = fMacroLog.GetLineWith(
"const char");
460 Ssiz_t fst = os->GetString().First(
'\"');
461 Ssiz_t lst = os->GetString().Last(
'\"');
462 usr = os->GetString()(fst+1, lst-fst-1);
464 emsg =
"could not find 'const char *' string in macro log";
467 emsg =
"could not retrieve user info";
469 if (!emsg.IsNull()) {
471 UserGroup_t *pw = gSystem->GetUserInfo();
476 Warning(
"TProof",
"%s: using local default %s", emsg.Data(), usr.Data());
479 fUrl.SetUser(usr.Data());
485 R__LOCKGUARD(gROOTMutex);
486 gROOT->GetListOfSockets()->Remove(mgr);
487 gROOT->GetListOfSockets()->Add(mgr);
491 if (IsProofd() || TestBit(TProof::kIsMaster))
492 if (!gROOT->GetListOfProofs()->FindObject(
this))
493 gROOT->GetListOfProofs()->Add(
this);
507 TProof::TProof() : fUrl(
""), fServType(TProofMgr::kXProofd)
512 if (!gROOT->GetListOfProofs()->FindObject(
this))
513 gROOT->GetListOfProofs()->Add(
this);
521 void TProof::InitMembers()
527 fMasterServ = kFALSE;
528 fSendGroupView = kFALSE;
529 fIsPollingWorkers = kFALSE;
530 fLastPollWorkers_s = -1;
534 fAllUniqueSlaves = 0;
535 fNonUniqueMasters = 0;
538 fAllUniqueMonitor = 0;
545 fProgressDialogStarted = kFALSE;
546 SetBit(kUseProgressDialog);
553 fRunStatus = kRunning;
558 fLogToWindowOnly = kFALSE;
559 fSaveLogToMacro = kFALSE;
560 fMacroLog.SetName(
"ProofLogMacro");
573 fEnabledPackagesOnCluster = 0;
583 fTerminatedSlaveInfos = 0;
589 fAvailablePackages = 0;
590 fEnabledPackages = 0;
593 fCollectTimeout = -1;
597 fDynamicStartup = kFALSE;
599 fMergersSet = kFALSE;
600 fMergersByHost = kFALSE;
603 fLastAssignedMerger = 0;
605 fFinalizationRunning = kFALSE;
609 fWrksOutputReady = 0;
617 if (gSystem->Getenv(
"PROOF_ENVVARS")) {
618 TString envs(gSystem->Getenv(
"PROOF_ENVVARS")), env, envsfound;
620 while (envs.Tokenize(env, from,
",")) {
622 if (!gSystem->Getenv(env)) {
623 Warning(
"Init",
"request for sending over undefined environemnt variable '%s' - ignoring", env.Data());
625 if (!envsfound.IsNull()) envsfound +=
",";
627 TProof::DelEnvVar(env);
628 TProof::AddEnvVar(env, gSystem->Getenv(env));
632 if (envsfound.IsNull()) {
633 Warning(
"Init",
"none of the requested env variables were found: '%s'", envs.Data());
635 Info(
"Init",
"the following environment variables have been added to the list to be sent to the nodes: '%s'", envsfound.Data());
649 while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
657 if (TestBit(TProof::kIsClient)) {
659 TList *epl = fPackMgr->GetListOfEnabled();
661 while (TObjString *pck = (TObjString *)(nxp())) {
663 if (gSystem->GetPathInfo(pck->String(), stat) == 0) {
668 gSystem->Unlink(pck->String());
674 SafeDelete(fIntHandler);
676 SafeDelete(fActiveSlaves);
677 SafeDelete(fInactiveSlaves);
678 SafeDelete(fUniqueSlaves);
679 SafeDelete(fAllUniqueSlaves);
680 SafeDelete(fNonUniqueMasters);
681 SafeDelete(fTerminatedSlaveInfos);
682 SafeDelete(fBadSlaves);
683 SafeDelete(fAllMonitor);
684 SafeDelete(fActiveMonitor);
685 SafeDelete(fUniqueMonitor);
686 SafeDelete(fAllUniqueMonitor);
687 SafeDelete(fSlaveInfo);
690 SafeDelete(fFeedback);
691 SafeDelete(fWaitingSlaves);
692 SafeDelete(fAvailablePackages);
693 SafeDelete(fEnabledPackages);
694 SafeDelete(fLoadedMacros);
695 SafeDelete(fPackMgr);
696 SafeDelete(fRecvMessages);
697 SafeDelete(fInputData);
698 SafeDelete(fRunningDSets);
699 if (fWrksOutputReady) {
700 fWrksOutputReady->SetOwner(kFALSE);
701 delete fWrksOutputReady;
705 if (TestBit(TProof::kIsClient)) {
710 if (fLogFileName.Length() > 0)
711 gSystem->Unlink(fLogFileName);
715 gROOT->GetListOfProofs()->Remove(
this);
717 if (fManager && fManager->IsValid())
718 fManager->DiscardSession(
this);
720 if (gProof && gProof ==
this) {
722 TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
723 while ((gProof = (TProof *)pvp())) {
724 if (gProof->InheritsFrom(TProof::Class()))
731 Emit(
"CloseWindow()");
742 Int_t TProof::Init(
const char *,
const char *conffile,
743 const char *confdir, Int_t loglevel,
const char *alias)
750 fTty = (isatty(0) == 0 || isatty(1) == 0) ? kFALSE : kTRUE;
753 Bool_t attach = kFALSE;
754 if (strlen(fUrl.GetOptions()) > 0) {
757 TString opts = fUrl.GetOptions();
758 if (opts.Contains(
"GUI")) {
759 SetBit(TProof::kUsingSessionGui);
760 opts.Remove(opts.Index(
"GUI"));
761 fUrl.SetOptions(opts);
765 if (TestBit(TProof::kIsMaster)) {
767 if (!conffile || !conffile[0])
768 fConfFile = kPROOF_ConfFile;
769 if (!confdir || !confdir[0])
770 fConfDir = kPROOF_ConfDir;
772 if (gProofServ) fGroup = gProofServ->GetGroup();
775 fConfFile = conffile;
779 if (fConfFile.Contains(
"workers=0")) fConfFile.ReplaceAll(
"workers=0",
"masteronly");
780 ParseConfigField(fConfFile);
782 fWorkDir = gSystem->WorkingDirectory();
783 fLogLevel = loglevel;
784 fProtocol = kPROOF_Protocol;
785 fSendGroupView = kTRUE;
786 fImage = fMasterServ ?
"" :
"<local>";
789 fRecvMessages =
new TList;
790 fRecvMessages->SetOwner(kTRUE);
793 fAvailablePackages = 0;
794 fEnabledPackages = 0;
796 fEndMaster = TestBit(TProof::kIsMaster) ? kTRUE : kFALSE;
798 ResetBit(TProof::kNewInputData);
802 fCollectTimeout = gEnv->GetValue(
"Proof.CollectTimeout", -1);
805 fDynamicStartup = gEnv->GetValue(
"Proof.DynamicStartup", kFALSE);
808 if (TestBit(TProof::kIsClient))
809 fDataPoolUrl.Form(
"root://%s", fMaster.Data());
814 fProgressDialogStarted = kFALSE;
817 TString al = (alias) ? alias : fMaster.Data();
822 if (TestBit(TProof::kIsClient)) {
823 fLogFileName.Form(
"%s/ProofLog_%d", gSystem->TempDirectory(), gSystem->GetPid());
824 if ((fLogFileW = fopen(fLogFileName,
"w")) == 0)
825 Error(
"Init",
"could not create temporary logfile");
826 if ((fLogFileR = fopen(fLogFileName,
"r")) == 0)
827 Error(
"Init",
"could not open temp logfile for reading");
829 fLogToWindowOnly = kFALSE;
834 fSync = (attach) ? kFALSE : kTRUE;
860 fFeedback =
new TList;
861 fFeedback->SetOwner();
862 fFeedback->SetName(
"FeedbackList");
866 fSlaves =
new TSortedList(kSortDescending);
867 fActiveSlaves =
new TList;
868 fInactiveSlaves =
new TList;
869 fUniqueSlaves =
new TList;
870 fAllUniqueSlaves =
new TList;
871 fNonUniqueMasters =
new TList;
872 fBadSlaves =
new TList;
873 fAllMonitor =
new TMonitor;
874 fActiveMonitor =
new TMonitor;
875 fUniqueMonitor =
new TMonitor;
876 fAllUniqueMonitor =
new TMonitor;
879 fTerminatedSlaveInfos =
new TList;
880 fTerminatedSlaveInfos->SetOwner(kTRUE);
888 Bool_t enableSchemaEvolution = gEnv->GetValue(
"Proof.SchemaEvolution",1);
889 if (enableSchemaEvolution) {
890 TMessage::EnableSchemaEvolutionForAll();
892 Info(
"TProof",
"automatic schema evolution in TMessage explicitly disabled");
897 if (gProofServ) fPackMgr = gProofServ->GetPackMgr();
901 if (GetSandbox(sandbox, kTRUE) != 0) {
902 Error(
"Init",
"failure asserting sandbox directory %s", sandbox.Data());
907 TString packdir = gEnv->GetValue(
"Proof.PackageDir",
"");
908 if (packdir.IsNull())
909 packdir.Form(
"%s/%s", sandbox.Data(), kPROOF_PackDir);
910 if (AssertPath(packdir, kTRUE) != 0) {
911 Error(
"Init",
"failure asserting directory %s", packdir.Data());
914 fPackMgr =
new TPackMgr(packdir);
916 Info(
"Init",
"package directory set to %s", packdir.Data());
921 TString globpack = gEnv->GetValue(
"Proof.GlobalPackageDirs",
"");
922 TProofServ::ResolveKeywords(globpack);
923 Int_t nglb = TPackMgr::RegisterGlobalPath(globpack);
925 Info(
"Init",
" %d global package directories registered", nglb);
929 if (fDynamicStartup) {
932 if (!StartSlaves(attach))
939 Bool_t masterOnly = gEnv->GetValue(
"Proof.MasterOnly", kFALSE);
940 if (!IsMaster() || !masterOnly) {
942 if (!StartSlaves(attach))
947 GetRC(
"Proof.DynamicStartup", dyn);
948 if (dyn != 0) fDynamicStartup = kTRUE;
956 fAllMonitor->DeActivateAll();
959 Int_t nwrk = GetRemoteProtocol() > 35 ? -1 : 9999;
961 if (TProof::GetEnvVars() &&
962 (n = (TNamed *) TProof::GetEnvVars()->FindObject(
"PROOF_NWORKERS"))) {
963 TString s(n->GetTitle());
964 if (s.IsDigit()) nwrk = s.Atoi();
966 GoParallel(nwrk, attach);
976 if (TestBit(TProof::kIsClient))
984 ActivateAsyncInput();
986 R__LOCKGUARD(gROOTMutex);
987 gROOT->GetListOfSockets()->Add(
this);
992 return fActiveSlaves->GetSize();
1001 Int_t TProof::GetSandbox(TString &sb, Bool_t assert,
const char *rc)
1004 if (rc && strlen(rc)) sb = gEnv->GetValue(rc, sb);
1006 if (sb.IsNull()) sb = gEnv->GetValue(
"Proof.Sandbox",
"");
1008 if (sb.IsNull()) sb.Form(
"~/%s", kPROOF_WorkDir);
1011 sb = gSystem->pwd();
1012 }
else if (sb ==
"..") {
1013 sb = gSystem->DirName(gSystem->pwd());
1015 gSystem->ExpandPathName(sb);
1018 if (assert && AssertPath(sb, kTRUE) != 0)
return -1;
1028 void TProof::ParseConfigField(
const char *config)
1030 TString sconf(config), opt;
1032 Bool_t cpuPin = kFALSE;
1035 const char *cq = (IsLite()) ?
"\"" :
"";
1036 while (sconf.Tokenize(opt, from,
",")) {
1037 if (opt.IsNull())
continue;
1039 if (opt.BeginsWith(
"valgrind")) {
1044 TString mst, top, sub, wrk, all;
1045 TList *envs = fgProofEnvList;
1048 if ((n = (TNamed *) envs->FindObject(
"PROOF_WRAPPERCMD")))
1049 all = n->GetTitle();
1050 if ((n = (TNamed *) envs->FindObject(
"PROOF_MASTER_WRAPPERCMD")))
1051 mst = n->GetTitle();
1052 if ((n = (TNamed *) envs->FindObject(
"PROOF_TOPMASTER_WRAPPERCMD")))
1053 top = n->GetTitle();
1054 if ((n = (TNamed *) envs->FindObject(
"PROOF_SUBMASTER_WRAPPERCMD")))
1055 sub = n->GetTitle();
1056 if ((n = (TNamed *) envs->FindObject(
"PROOF_SLAVE_WRAPPERCMD")))
1057 wrk = n->GetTitle();
1059 if (all !=
"" && mst ==
"") mst = all;
1060 if (all !=
"" && top ==
"") top = all;
1061 if (all !=
"" && sub ==
"") sub = all;
1062 if (all !=
"" && wrk ==
"") wrk = all;
1063 if (all !=
"" && all.BeginsWith(
"valgrind_opts:")) {
1065 Info(
"ParseConfigField",
"valgrind run: resetting 'PROOF_WRAPPERCMD':"
1066 " must be set again for next run , if any");
1067 TProof::DelEnvVar(
"PROOF_WRAPPERCMD");
1070 cmd.Form(
"%svalgrind -v --suppressions=<rootsys>/etc/valgrind-root.supp", cq);
1071 TString mstlab(
"NO"), wrklab(
"NO");
1072 Bool_t doMaster = (opt ==
"valgrind" || (opt.Contains(
"master") &&
1073 !opt.Contains(
"topmaster") && !opt.Contains(
"submaster")))
1078 if (mst ==
"" || mst.BeginsWith(
"valgrind_opts:")) {
1079 mst.ReplaceAll(
"valgrind_opts:",
"");
1080 var.Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), mst.Data());
1081 TProof::AddEnvVar(
"PROOF_MASTER_WRAPPERCMD", var);
1083 }
else if (mst !=
"") {
1087 if (opt.Contains(
"master")) {
1088 Warning(
"ParseConfigField",
1089 "master valgrinding does not make sense for PROOF-Lite: ignoring");
1090 opt.ReplaceAll(
"master",
"");
1091 if (!opt.Contains(
"workers"))
return;
1093 if (opt ==
"valgrind" || opt ==
"valgrind=") opt =
"valgrind=workers";
1096 if (opt.Contains(
"topmaster")) {
1098 if (top ==
"" || top.BeginsWith(
"valgrind_opts:")) {
1099 top.ReplaceAll(
"valgrind_opts:",
"");
1100 var.Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), top.Data());
1101 TProof::AddEnvVar(
"PROOF_TOPMASTER_WRAPPERCMD", var);
1103 }
else if (top !=
"") {
1107 if (opt.Contains(
"submaster")) {
1109 if (sub ==
"" || sub.BeginsWith(
"valgrind_opts:")) {
1110 sub.ReplaceAll(
"valgrind_opts:",
"");
1111 var.Form(
"%s --log-file=<logfilemst>.valgrind.log %s", cmd.Data(), sub.Data());
1112 TProof::AddEnvVar(
"PROOF_SUBMASTER_WRAPPERCMD", var);
1114 }
else if (sub !=
"") {
1118 if (opt.Contains(
"=workers") || opt.Contains(
"+workers")) {
1120 if (wrk ==
"" || wrk.BeginsWith(
"valgrind_opts:")) {
1121 wrk.ReplaceAll(
"valgrind_opts:",
"");
1122 var.Form(
"%s --log-file=<logfilewrk>.__valgrind__.log %s%s", cmd.Data(), wrk.Data(), cq);
1123 TProof::AddEnvVar(
"PROOF_SLAVE_WRAPPERCMD", var);
1125 Int_t inw = opt.Index(
'#');
1127 nwrks = opt(inw+1, opt.Length());
1128 if (!nwrks.IsDigit()) nwrks =
"2";
1132 TProof::AddEnvVar(
"PROOF_NWORKERS", nwrks);
1134 gEnv->SetValue(
"ProofLite.Workers", nwrks.Atoi());
1139 TProof::AddEnvVar(
"PROOF_ADDITIONALLOG",
"__valgrind__.log*");
1140 }
else if (wrk !=
"") {
1146 TProof::AddEnvVar(
"PROOF_INTWAIT",
"5000");
1147 gEnv->SetValue(
"Proof.SocketActivityTimeout", 6000);
1149 gEnv->SetValue(
"ProofLite.StartupTimeOut", 5000);
1154 Printf(
" ---> Starting a debug run with valgrind (master:%s, workers:%s)", mstlab.Data(), wrklab.Data());
1156 Printf(
" ---> Starting a debug run with valgrind (workers:%s)", wrklab.Data());
1158 Printf(
" ---> Please be patient: startup may be VERY slow ...");
1159 Printf(
" ---> Logs will be available as special tags in the log window (from the progress dialog or TProof::LogViewer()) ");
1160 Printf(
" ---> (Reminder: this debug run makes sense only if you are running a debug version of ROOT)");
1163 }
else if (opt.BeginsWith(
"igprof-pp")) {
1169 Printf(
"*** Requested IgProf performance profiling ***");
1170 TString addLogExt =
"__igprof.pp__.log";
1171 TString addLogFmt =
"igprof -pk -pp -t proofserv.exe -o %s.%s";
1175 addLogFmt.Append(
"\"");
1176 addLogFmt.Prepend(
"\"");
1179 tmp.Form(addLogFmt.Data(),
"<logfilemst>", addLogExt.Data());
1180 TProof::AddEnvVar(
"PROOF_MASTER_WRAPPERCMD", tmp.Data());
1182 tmp.Form(addLogFmt.Data(),
"<logfilewrk>", addLogExt.Data());
1183 TProof::AddEnvVar(
"PROOF_SLAVE_WRAPPERCMD", tmp.Data() );
1185 TProof::AddEnvVar(
"PROOF_ADDITIONALLOG", addLogExt.Data());
1187 }
else if (opt.BeginsWith(
"cpupin=")) {
1206 for (Ssiz_t i=0; i<opt.Length(); i++) {
1208 if ((c !=
'+') && ((c < '0') || (c >
'9')))
1211 opt.ReplaceAll(
"_",
"");
1212 TProof::AddEnvVar(
"PROOF_SLAVE_CPUPIN_ORDER", opt);
1214 }
else if (opt.BeginsWith(
"workers=")) {
1220 opt.ReplaceAll(
"workers=",
"");
1221 TProof::AddEnvVar(
"PROOF_NWORKERS", opt);
1227 if (IsLite() && cpuPin) {
1228 Printf(
"*** Requested CPU pinning ***");
1229 const TList *ev = GetEnvVars();
1230 const char *pinCmd =
"taskset -c <cpupin>";
1233 if (ev && (p = dynamic_cast<TNamed *>(ev->FindObject(
"PROOF_SLAVE_WRAPPERCMD")))) {
1234 val = p->GetTitle();
1235 val.Insert(val.Length()-1,
" ");
1236 val.Insert(val.Length()-1, pinCmd);
1239 val.Form(
"\"%s\"", pinCmd);
1241 TProof::AddEnvVar(
"PROOF_SLAVE_WRAPPERCMD", val.Data());
1250 Int_t TProof::AssertPath(
const char *inpath, Bool_t writable)
1252 if (!inpath || strlen(inpath) <= 0) {
1253 Error(
"AssertPath",
"undefined input path");
1257 TString path(inpath);
1258 gSystem->ExpandPathName(path);
1260 if (gSystem->AccessPathName(path, kFileExists)) {
1261 if (gSystem->mkdir(path, kTRUE) != 0) {
1262 Error(
"AssertPath",
"could not create path %s", path.Data());
1267 if (gSystem->AccessPathName(path, kWritePermission) && writable) {
1268 if (gSystem->Chmod(path, 0666) != 0) {
1269 Error(
"AssertPath",
"could not make path %s writable", path.Data());
1282 void TProof::SetManager(TProofMgr *mgr)
1287 R__LOCKGUARD(gROOTMutex);
1288 gROOT->GetListOfSockets()->Remove(mgr);
1289 gROOT->GetListOfSockets()->Add(mgr);
1301 Int_t TProof::AddWorkers(TList *workerList)
1304 Error(
"AddWorkers",
"AddWorkers can only be called on the master!");
1308 if (!workerList || !(workerList->GetSize())) {
1309 Error(
"AddWorkers",
"empty list of workers!");
1315 fImage = gProofServ->GetImage();
1316 if (fImage.IsNull())
1317 fImage.Form(
"%s:%s", TUrl(gSystem->HostName()).GetHostFQDN(), gProofServ->GetWorkDir());
1320 UInt_t nSlaves = workerList->GetSize();
1321 UInt_t nSlavesDone = 0;
1326 Bool_t goMoreParallel = (fSlaves->GetEntries() > 0) ? kTRUE : kFALSE;
1329 TList *addedWorkers =
new TList();
1330 if (!addedWorkers) {
1332 Error(
"AddWorkers",
"cannot create new list for the workers to be added");
1335 addedWorkers->SetOwner(kFALSE);
1336 TListIter next(workerList);
1338 TProofNodeInfo *worker;
1339 TSlaveInfo *dummysi =
new TSlaveInfo();
1340 while ((to = next())) {
1342 worker = (TProofNodeInfo *)to;
1345 const Char_t *image = worker->GetImage().Data();
1346 const Char_t *workdir = worker->GetWorkDir().Data();
1347 Int_t perfidx = worker->GetPerfIndex();
1348 Int_t sport = worker->GetPort();
1350 sport = fUrl.GetPort();
1354 if (worker->GetOrdinal().Length() > 0) {
1355 fullord.Form(
"%s.%s", gProofServ->GetOrdinal(), worker->GetOrdinal().Data());
1357 fullord.Form(
"%s.%d", gProofServ->GetOrdinal(), ord);
1361 dummysi->SetOrdinal(fullord);
1362 TSlaveInfo *rmsi = (TSlaveInfo *)fTerminatedSlaveInfos->Remove(dummysi);
1366 TString wn(worker->GetNodeName());
1367 if (wn ==
"localhost" || wn.BeginsWith(
"localhost.")) wn = gSystem->HostName();
1368 TUrl u(TString::Format(
"%s:%d", wn.Data(), sport));
1370 if (strlen(gProofServ->GetGroup()) > 0) {
1372 if (strlen(u.GetUser()) <= 0)
1373 u.SetUser(gProofServ->GetUser());
1374 u.SetPasswd(gProofServ->GetGroup());
1377 if (worker->IsWorker()) {
1378 slave = CreateSlave(u.GetUrl(), fullord, perfidx, image, workdir);
1380 slave = CreateSubmaster(u.GetUrl(), fullord,
1381 image, worker->GetMsd(), worker->GetNWrks());
1386 Bool_t slaveOk = kTRUE;
1387 fSlaves->Add(slave);
1388 if (slave->IsValid()) {
1389 addedWorkers->Add(slave);
1392 fBadSlaves->Add(slave);
1393 Warning(
"AddWorkers",
"worker '%s' is invalid", slave->GetOrdinal());
1397 Info("AddWorkers", "worker on host %s created"
1398 " and added to list (ord: %s)", worker->GetName(), slave->GetOrdinal());
1402 TMessage m(kPROOF_SERVERSTARTED);
1403 m << TString("Opening connections to workers") << nSlaves
1404 << nSlavesDone << slaveOk;
1405 gProofServ->GetSocket()->Send(m);
1409 SafeDelete(dummysi);
1412 SafeDelete(workerList);
1418 TIter nxsl(addedWorkers);
1420 while ((sl = (TSlave *) nxsl())) {
1424 sl->SetupServ(TSlave::kSlave, 0);
1427 Bool_t slaveOk = kTRUE;
1428 if (sl->IsValid()) {
1429 fAllMonitor->Add(sl->GetSocket());
1431 Info("AddWorkers", "worker on host %s finalized"
1432 " and added to list", sl->GetOrdinal());
1435 fBadSlaves->Add(sl);
1440 TMessage m(kPROOF_SERVERSTARTED);
1441 m << TString(
"Setting up worker servers") << nSlaves
1442 << nSlavesDone << slaveOk;
1443 gProofServ->GetSocket()->Send(m);
1450 Int_t nwrk = GetRemoteProtocol() > 35 ? -1 : 9999;
1452 if (TProof::GetEnvVars() &&
1453 (n = (TNamed *) TProof::GetEnvVars()->FindObject(
"PROOF_NWORKERS"))) {
1454 TString s(n->GetTitle());
1455 if (s.IsDigit()) nwrk = s.Atoi();
1458 if (fDynamicStartup && goMoreParallel) {
1461 Info("AddWorkers", "will invoke GoMoreParallel()");
1462 Int_t nw = GoMoreParallel(nwrk);
1464 Info("AddWorkers", "GoMoreParallel()=%d", nw);
1470 Info("AddWorkers", "will invoke GoParallel()");
1471 GoParallel(nwrk, kFALSE, 0);
1475 SetupWorkersEnv(addedWorkers, goMoreParallel);
1479 Info("AddWorkers", "will invoke SaveWorkerInfo()");
1483 if (fDynamicStartup && gProofServ) {
1485 Info("AddWorkers", "will invoke SendParallel()");
1486 gProofServ->SendParallel(kTRUE);
1488 if (goMoreParallel && fPlayer) {
1493 Info("AddWorkers", "will send the PROCESS message to selected workers");
1494 fPlayer->JoinProcess(addedWorkers);
1496 fMergePrg.SetNWrks(fActiveSlaves->GetSize() + addedWorkers->GetSize());
1501 delete addedWorkers;
1509 void TProof::SetupWorkersEnv(TList *addedWorkers, Bool_t increasingWorkers)
1512 TList *packs = gProofServ ? gProofServ->GetEnabledPackages() : GetEnabledPackages();
1513 if (packs && packs->GetSize() > 0) {
1516 while ((pck = (TPair *) nxp())) {
1519 if (fDynamicStartup && increasingWorkers) {
1522 Info("SetupWorkersEnv", "will invoke UploadPackage() and EnablePackage() on added workers");
1523 if (UploadPackage(pck->GetName(), kUntar, addedWorkers) >= 0)
1524 EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE, addedWorkers);
1527 Info("SetupWorkersEnv", "will invoke UploadPackage() and EnablePackage() on all workers");
1528 if (UploadPackage(pck->GetName()) >= 0)
1529 EnablePackage(pck->GetName(), (TList *) pck->Value(), kTRUE);
1535 if (fLoadedMacros) {
1536 TIter nxp(fLoadedMacros);
1538 while ((os = (TObjString *) nxp())) {
1540 Info(
"SetupWorkersEnv",
"will invoke Load() on selected workers");
1541 Printf(
"Loading a macro : %s", os->GetName());
1543 Load(os->GetName(), kTRUE, kTRUE, addedWorkers);
1548 TString dyn = gSystem->GetDynamicPath();
1549 dyn.ReplaceAll(
":",
" ");
1550 dyn.ReplaceAll(
"\"",
" ");
1552 Info("SetupWorkersEnv", "will invoke AddDynamicPath() on selected workers");
1553 AddDynamicPath(dyn, kFALSE, addedWorkers, kFALSE);
1556 TString inc = gSystem->GetIncludePath();
1557 inc.ReplaceAll("-I", " ");
1558 inc.ReplaceAll("\"", " ");
1560 Info("SetupWorkersEnv", "will invoke AddIncludePath() on selected workers");
1561 AddIncludePath(inc, kFALSE, addedWorkers, kFALSE);
1572 Int_t TProof::RemoveWorkers(TList *workerList)
1575 Error(
"RemoveWorkers",
"RemoveWorkers can only be called on the master!");
1583 TIter nxsl(fSlaves);
1585 while ((sl = (TSlave *) nxsl())) {
1587 TerminateWorker(sl);
1591 if (!(workerList->GetSize())) {
1592 Error(
"RemoveWorkers",
"The list of workers should not be empty!");
1597 TListIter next(workerList);
1599 TProofNodeInfo *worker;
1600 while ((to = next())) {
1602 if (!strcmp(to->ClassName(),
"TProofNodeInfo")) {
1604 worker = (TProofNodeInfo *)to;
1605 TIter nxsl(fSlaves);
1606 while ((sl = (TSlave *) nxsl())) {
1608 if (sl->GetName() == worker->GetNodeName())
1611 }
else if (to->InheritsFrom(TSlave::Class())) {
1614 Warning(
"RemoveWorkers",
"unknown object type: %s - it should be"
1615 " TProofNodeInfo or inheriting from TSlave", to->ClassName());
1620 Info(
"RemoveWorkers",
"terminating worker %s", sl->GetOrdinal());
1621 TerminateWorker(sl);
1627 if (gProofServ && fSlaves->GetSize() <= 0) gProofServ->ReleaseWorker(
"master");
1635 Bool_t TProof::StartSlaves(Bool_t attach)
1639 if (TestBit(TProof::kIsMaster)) {
1642 TList *workerList =
new TList;
1644 if (gProofServ->GetWorkers(workerList, pc) == TProofServ::kQueryStop) {
1645 TString emsg(
"no resource currently available for this session: please retry later");
1646 if (gDebug > 0) Info(
"StartSlaves",
"%s", emsg.Data());
1647 gProofServ->SendAsynMessage(emsg.Data());
1651 if (AddWorkers(workerList) < 0)
1657 Printf(
"Starting master: opening connection ...");
1658 TSlave *slave = CreateSubmaster(fUrl.GetUrl(),
"0",
"master", 0);
1660 if (slave->IsValid()) {
1663 fprintf(stderr,
"Starting master:"
1664 " connection open: setting up server ... \r");
1665 StartupMessage(
"Connection to master opened", kTRUE, 1, 1);
1670 slave->SetInterruptHandler(kTRUE);
1673 slave->SetupServ(TSlave::kMaster, fConfFile);
1675 if (slave->IsValid()) {
1678 Printf(
"Starting master: OK ");
1679 StartupMessage(
"Master started", kTRUE, 1, 1);
1683 if (fProtocol == 1) {
1684 Error(
"StartSlaves",
1685 "client and remote protocols not compatible (%d and %d)",
1686 kPROOF_Protocol, fProtocol);
1692 fSlaves->Add(slave);
1693 fAllMonitor->Add(slave->GetSocket());
1696 slave->SetInterruptHandler(kFALSE);
1699 fIntHandler =
new TProofInterruptHandler(
this);
1702 Int_t rc = Collect(slave, 300);
1703 Int_t slStatus = slave->GetStatus();
1704 if (slStatus == -99 || slStatus == -98 || rc == 0) {
1705 fSlaves->Remove(slave);
1706 fAllMonitor->Remove(slave->GetSocket());
1707 if (slStatus == -99)
1708 Error(
"StartSlaves",
"no resources available or problems setting up workers (check logs)");
1709 else if (slStatus == -98)
1710 Error(
"StartSlaves",
"could not setup output redirection on master");
1712 Error(
"StartSlaves",
"setting up master");
1718 if (!slave->IsValid()) {
1719 fSlaves->Remove(slave);
1720 fAllMonitor->Remove(slave->GetSocket());
1723 Error(
"StartSlaves",
1724 "failed to setup connection with PROOF master server");
1728 if (!gROOT->IsBatch() && TestBit(kUseProgressDialog)) {
1729 if ((fProgressDialog =
1730 gROOT->GetPluginManager()->FindHandler(
"TProofProgressDialog")))
1731 if (fProgressDialog->LoadPlugin() == -1)
1732 fProgressDialog = 0;
1736 Printf(
"Starting master: failure");
1741 Printf(
"Starting master: OK ");
1742 StartupMessage(
"Master attached", kTRUE, 1, 1);
1744 if (!gROOT->IsBatch() && TestBit(kUseProgressDialog)) {
1745 if ((fProgressDialog =
1746 gROOT->GetPluginManager()->FindHandler(
"TProofProgressDialog")))
1747 if (fProgressDialog->LoadPlugin() == -1)
1748 fProgressDialog = 0;
1751 fSlaves->Add(slave);
1752 fIntHandler =
new TProofInterruptHandler(
this);
1759 Error(
"StartSlaves",
"failed to create (or connect to) the PROOF master server");
1773 void TProof::Close(Option_t *opt)
1775 { std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
1780 fIntHandler->Remove();
1784 while ((sl = (TSlave *)nxs()))
1787 fActiveSlaves->Clear(
"nodelete");
1788 fUniqueSlaves->Clear(
"nodelete");
1789 fAllUniqueSlaves->Clear(
"nodelete");
1790 fNonUniqueMasters->Clear(
"nodelete");
1791 fBadSlaves->Clear(
"nodelete");
1792 fInactiveSlaves->Clear(
"nodelete");
1797 { R__LOCKGUARD(gROOTMutex);
1798 gROOT->GetListOfSockets()->Remove(
this);
1801 while (TChain *chain = dynamic_cast<TChain*> (fChains->First()) ) {
1810 gROOT->GetListOfProofs()->Remove(
this);
1811 if (gProof && gProof ==
this) {
1813 TIter pvp(gROOT->GetListOfProofs(), kIterBackward);
1814 while ((gProof = (TProof *)pvp())) {
1815 if (gProof->IsProofd())
1828 TSlave *TProof::CreateSlave(
const char *url,
const char *ord,
1829 Int_t perf,
const char *image,
const char *workdir)
1831 TSlave* sl = TSlave::Create(url, ord, perf, image,
1832 this, TSlave::kSlave, workdir, 0);
1834 if (sl->IsValid()) {
1835 sl->SetInputHandler(
new TProofInputHandler(
this, sl->GetSocket()));
1850 TSlave *TProof::CreateSubmaster(
const char *url,
const char *ord,
1851 const char *image,
const char *msd, Int_t nwk)
1853 TSlave *sl = TSlave::Create(url, ord, 100, image,
this,
1854 TSlave::kMaster, 0, msd, nwk);
1856 if (sl->IsValid()) {
1857 sl->SetInputHandler(
new TProofInputHandler(
this, sl->GetSocket()));
1866 TSlave *TProof::FindSlave(TSocket *s)
const
1869 TIter next(fSlaves);
1871 while ((sl = (TSlave *)next())) {
1872 if (sl->IsValid() && sl->GetSocket() == s)
1887 void TProof::FindUniqueSlaves()
1889 fUniqueSlaves->Clear();
1890 fUniqueMonitor->RemoveAll();
1891 fAllUniqueSlaves->Clear();
1892 fAllUniqueMonitor->RemoveAll();
1893 fNonUniqueMasters->Clear();
1895 TIter next(fActiveSlaves);
1897 while (TSlave *sl = dynamic_cast<TSlave*>(next())) {
1898 if (fImage == sl->fImage) {
1899 if (sl->GetSlaveType() == TSlave::kMaster) {
1900 fNonUniqueMasters->Add(sl);
1901 fAllUniqueSlaves->Add(sl);
1902 fAllUniqueMonitor->Add(sl->GetSocket());
1907 TIter next2(fUniqueSlaves);
1908 TSlave *replace_slave = 0;
1910 while (TSlave *sl2 = dynamic_cast<TSlave*>(next2())) {
1911 if (sl->fImage == sl2->fImage) {
1913 if (sl->GetSlaveType() == TSlave::kMaster) {
1914 if (sl2->GetSlaveType() == TSlave::kSlave) {
1916 replace_slave = sl2;
1918 }
else if (sl2->GetSlaveType() == TSlave::kMaster) {
1919 fNonUniqueMasters->Add(sl);
1920 fAllUniqueSlaves->Add(sl);
1921 fAllUniqueMonitor->Add(sl->GetSocket());
1923 Error(
"FindUniqueSlaves",
"TSlave is neither Master nor Slave");
1932 fUniqueSlaves->Add(sl);
1933 fAllUniqueSlaves->Add(sl);
1934 fUniqueMonitor->Add(sl->GetSocket());
1935 fAllUniqueMonitor->Add(sl->GetSocket());
1936 if (replace_slave) {
1937 fUniqueSlaves->Remove(replace_slave);
1938 fAllUniqueSlaves->Remove(replace_slave);
1939 fUniqueMonitor->Remove(replace_slave->GetSocket());
1940 fAllUniqueMonitor->Remove(replace_slave->GetSocket());
1946 fUniqueMonitor->DeActivateAll();
1947 fAllUniqueMonitor->DeActivateAll();
1953 Int_t TProof::GetNumberOfSlaves()
const
1955 return fSlaves->GetSize();
1962 Int_t TProof::GetNumberOfActiveSlaves()
const
1964 return fActiveSlaves->GetSize();
1971 Int_t TProof::GetNumberOfInactiveSlaves()
const
1973 return fInactiveSlaves->GetSize();
1980 Int_t TProof::GetNumberOfUniqueSlaves()
const
1982 return fUniqueSlaves->GetSize();
1989 Int_t TProof::GetNumberOfBadSlaves()
const
1991 return fBadSlaves->GetSize();
1997 void TProof::AskStatistics()
1999 if (!IsValid())
return;
2001 Broadcast(kPROOF_GETSTATS, kActive);
2002 Collect(kActive, fCollectTimeout);
2010 void TProof::GetStatistics(Bool_t verbose)
2012 if (fProtocol > 27) {
2017 RedirectHandle_t rh;
2018 gSystem->RedirectOutput(fLogFileName,
"a", &rh);
2020 gSystem->RedirectOutput(0, 0, &rh);
2021 TMacro *mp = GetLastLog();
2024 TIter nxl(mp->GetListOfLines());
2026 while ((os = (TObjString *) nxl())) {
2027 TString s(os->GetName());
2028 if (s.Contains(
"Total MB's processed:")) {
2029 s.ReplaceAll(
"Total MB's processed:",
"");
2030 if (s.IsFloat()) fBytesRead = (Long64_t) s.Atof() * (1024*1024);
2031 }
else if (s.Contains(
"Total real time used (s):")) {
2032 s.ReplaceAll(
"Total real time used (s):",
"");
2033 if (s.IsFloat()) fRealTime = s.Atof();
2034 }
else if (s.Contains(
"Total CPU time used (s):")) {
2035 s.ReplaceAll(
"Total CPU time used (s):",
"");
2036 if (s.IsFloat()) fCpuTime = s.Atof();
2044 Printf(
" Real/CPU time (s): %.3f / %.3f; workers: %d; processed: %.2f MBs",
2045 GetRealTime(), GetCpuTime(), GetParallel(),
float(GetBytesRead())/(1024*1024));
2052 void TProof::AskParallel()
2054 if (!IsValid())
return;
2056 Broadcast(kPROOF_GETPARALLEL, kActive);
2057 Collect(kActive, fCollectTimeout);
2063 TList *TProof::GetListOfQueries(Option_t *opt)
2065 if (!IsValid() || TestBit(TProof::kIsMaster))
return (TList *)0;
2067 Bool_t all = ((strchr(opt,
'A') || strchr(opt,
'a'))) ? kTRUE : kFALSE;
2068 TMessage m(kPROOF_QUERYLIST);
2070 Broadcast(m, kActive);
2071 Collect(kActive, fCollectTimeout);
2080 Int_t TProof::GetNumberOfQueries()
2083 return fQueries->GetSize() - fOtherQueries;
2090 void TProof::SetMaxDrawQueries(Int_t max)
2094 fPlayer->SetMaxDrawQueries(max);
2095 fMaxDrawQueries = max;
2103 void TProof::GetMaxQueries()
2105 TMessage m(kPROOF_MAXQUERIES);
2107 Broadcast(m, kActive);
2108 Collect(kActive, fCollectTimeout);
2114 TList *TProof::GetQueryResults()
2116 return (fPlayer ? fPlayer->GetListOfResults() : (TList *)0);
2123 TQueryResult *TProof::GetQueryResult(
const char *ref)
2125 return (fPlayer ? fPlayer->GetQueryResult(ref) : (TQueryResult *)0);
2140 void TProof::ShowQueries(Option_t *opt)
2142 Bool_t help = ((strchr(opt,
'H') || strchr(opt,
'h'))) ? kTRUE : kFALSE;
2148 Printf(
"+++ Options: \"A\" show all queries known to server");
2149 Printf(
"+++ \"L\" show retrieved queries");
2150 Printf(
"+++ \"F\" full listing of query info");
2151 Printf(
"+++ \"H\" print this menu");
2153 Printf(
"+++ (case insensitive)");
2155 Printf(
"+++ Use Retrieve(<#>) to retrieve the full"
2156 " query results from the master");
2157 Printf(
"+++ e.g. Retrieve(8)");
2164 if (!IsValid())
return;
2166 Bool_t local = ((strchr(opt,
'L') || strchr(opt,
'l'))) ? kTRUE : kFALSE;
2170 GetListOfQueries(opt);
2172 if (!fQueries)
return;
2174 TIter nxq(fQueries);
2177 if (fOtherQueries > 0) {
2179 Printf(
"+++ Queries processed during other sessions: %d", fOtherQueries);
2181 while (nq++ < fOtherQueries && (pq = nxq()))
2187 Printf(
"+++ Queries processed during this session: selector: %d, draw: %d",
2188 GetNumberOfQueries(), fDrawQueries);
2189 while ((pq = nxq()))
2196 Printf(
"+++ Queries processed during this session: selector: %d, draw: %d",
2197 GetNumberOfQueries(), fDrawQueries);
2200 TList *listlocal = fPlayer ? fPlayer->GetListOfResults() : (TList *)0;
2203 Printf(
"+++ Queries available locally: %d", listlocal->GetSize());
2204 TIter nxlq(listlocal);
2205 while ((pq = nxlq()))
2215 Bool_t TProof::IsDataReady(Long64_t &totalbytes, Long64_t &bytesready)
2217 if (!IsValid())
return kFALSE;
2220 TIter nextSlave(GetListOfActiveSlaves());
2221 while (TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
2222 if (sl->GetSlaveType() == TSlave::kMaster) {
2231 if (submasters.GetSize() > 0) {
2232 Broadcast(kPROOF_DATA_READY, &submasters);
2233 Collect(&submasters);
2236 bytesready = fBytesReady;
2237 totalbytes = fTotalBytes;
2239 EmitVA(
"IsDataReady(Long64_t,Long64_t)", 2, totalbytes, bytesready);
2242 Info("IsDataReady", "%lld / %lld (%s)",
2243 bytesready, totalbytes, fDataReady?"READY":"NOT READY");
2251 void TProof::Interrupt(EUrgent type, ESlaves list)
2253 if (!IsValid())
return;
2256 if (list == kAll) slaves = fSlaves;
2257 if (list == kActive) slaves = fActiveSlaves;
2258 if (list == kUnique) slaves = fUniqueSlaves;
2259 if (list == kAllUnique) slaves = fAllUniqueSlaves;
2261 if (slaves->GetSize() == 0)
return;
2266 while ((sl = (TSlave *)next())) {
2267 if (sl->IsValid()) {
2270 sl->Interrupt((Int_t)type);
2279 Int_t TProof::GetParallel()
const
2281 if (!IsValid())
return -1;
2284 TIter nextSlave(GetListOfActiveSlaves());
2285 Int_t nparallel = 0;
2286 while (TSlave* sl = dynamic_cast<TSlave*>(nextSlave()))
2287 if (sl->GetParallel() >= 0)
2288 nparallel += sl->GetParallel();
2296 TList *TProof::GetListOfSlaveInfos()
2298 if (!IsValid())
return 0;
2300 if (fSlaveInfo == 0) {
2301 fSlaveInfo =
new TSortedList(kSortDescending);
2302 fSlaveInfo->SetOwner();
2304 fSlaveInfo->Delete();
2308 TIter next(GetListOfSlaves());
2311 while ((slave = (TSlave *) next()) != 0) {
2312 if (slave->GetSlaveType() == TSlave::kSlave) {
2313 const char *name = IsLite() ? gSystem->HostName() : slave->GetName();
2314 TSlaveInfo *slaveinfo =
new TSlaveInfo(slave->GetOrdinal(),
2316 slave->GetPerfIdx());
2317 fSlaveInfo->Add(slaveinfo);
2319 TIter nextactive(GetListOfActiveSlaves());
2320 TSlave *activeslave;
2321 while ((activeslave = (TSlave *) nextactive())) {
2322 if (TString(slaveinfo->GetOrdinal()) == activeslave->GetOrdinal()) {
2323 slaveinfo->SetStatus(TSlaveInfo::kActive);
2328 TIter nextbad(GetListOfBadSlaves());
2330 while ((badslave = (TSlave *) nextbad())) {
2331 if (TString(slaveinfo->GetOrdinal()) == badslave->GetOrdinal()) {
2332 slaveinfo->SetStatus(TSlaveInfo::kBad);
2337 if (slave->IsValid()) {
2338 if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
2339 MarkBad(slave,
"could not send kPROOF_GETSLAVEINFO message");
2344 }
else if (slave->GetSlaveType() == TSlave::kMaster) {
2345 if (slave->IsValid()) {
2346 if (slave->GetSocket()->Send(kPROOF_GETSLAVEINFO) == -1)
2347 MarkBad(slave,
"could not send kPROOF_GETSLAVEINFO message");
2352 Error(
"GetSlaveInfo",
"TSlave is neither Master nor Slave");
2356 if (masters.GetSize() > 0) Collect(&masters);
2364 void TProof::Activate(TList *slaves)
2366 TMonitor *mon = fAllMonitor;
2367 mon->DeActivateAll();
2369 slaves = !slaves ? fActiveSlaves : slaves;
2373 while ((sl = (TSlave*) next())) {
2375 mon->Activate(sl->GetSocket());
2383 void TProof::SetMonitor(TMonitor *mon, Bool_t on)
2385 TMonitor *m = (mon) ? mon : fCurrentMonitor;
2399 Int_t TProof::BroadcastGroupPriority(
const char *grp, Int_t priority, TList *workers)
2401 if (!IsValid())
return -1;
2403 if (workers->GetSize() == 0)
return 0;
2406 TIter next(workers);
2409 while ((wrk = (TSlave *)next())) {
2410 if (wrk->IsValid()) {
2411 if (wrk->SendGroupPriority(grp, priority) == -1)
2412 MarkBad(wrk,
"could not send group priority");
2426 Int_t TProof::BroadcastGroupPriority(
const char *grp, Int_t priority, ESlaves list)
2429 if (list == kAll) workers = fSlaves;
2430 if (list == kActive) workers = fActiveSlaves;
2431 if (list == kUnique) workers = fUniqueSlaves;
2432 if (list == kAllUnique) workers = fAllUniqueSlaves;
2434 return BroadcastGroupPriority(grp, priority, workers);
2440 void TProof::ResetMergePrg()
2442 fMergePrg.Reset(fActiveSlaves->GetSize());
2450 Int_t TProof::Broadcast(
const TMessage &mess, TList *slaves)
2452 if (!IsValid())
return -1;
2454 if (!slaves || slaves->GetSize() == 0)
return 0;
2460 while ((sl = (TSlave *)next())) {
2461 if (sl->IsValid()) {
2462 if (sl->GetSocket()->Send(mess) == -1)
2463 MarkBad(sl,
"could not broadcast request");
2477 Int_t TProof::Broadcast(
const TMessage &mess, ESlaves list)
2480 if (list == kAll) slaves = fSlaves;
2481 if (list == kActive) slaves = fActiveSlaves;
2482 if (list == kUnique) slaves = fUniqueSlaves;
2483 if (list == kAllUnique) slaves = fAllUniqueSlaves;
2485 return Broadcast(mess, slaves);
2493 Int_t TProof::Broadcast(
const char *str, Int_t kind, TList *slaves)
2495 TMessage mess(kind);
2496 if (str) mess.WriteString(str);
2497 return Broadcast(mess, slaves);
2506 Int_t TProof::Broadcast(
const char *str, Int_t kind, ESlaves list)
2508 TMessage mess(kind);
2509 if (str) mess.WriteString(str);
2510 return Broadcast(mess, list);
2518 Int_t TProof::BroadcastObject(
const TObject *obj, Int_t kind, TList *slaves)
2520 TMessage mess(kind);
2521 mess.WriteObject(obj);
2522 return Broadcast(mess, slaves);
2530 Int_t TProof::BroadcastObject(
const TObject *obj, Int_t kind, ESlaves list)
2532 TMessage mess(kind);
2533 mess.WriteObject(obj);
2534 return Broadcast(mess, list);
2542 Int_t TProof::BroadcastRaw(
const void *buffer, Int_t length, TList *slaves)
2544 if (!IsValid())
return -1;
2546 if (slaves->GetSize() == 0)
return 0;
2552 while ((sl = (TSlave *)next())) {
2553 if (sl->IsValid()) {
2554 if (sl->GetSocket()->SendRaw(buffer, length) == -1)
2555 MarkBad(sl,
"could not send broadcast-raw request");
2569 Int_t TProof::BroadcastRaw(
const void *buffer, Int_t length, ESlaves list)
2572 if (list == kAll) slaves = fSlaves;
2573 if (list == kActive) slaves = fActiveSlaves;
2574 if (list == kUnique) slaves = fUniqueSlaves;
2575 if (list == kAllUnique) slaves = fAllUniqueSlaves;
2577 return BroadcastRaw(buffer, length, slaves);
2585 Int_t TProof::BroadcastFile(
const char *file, Int_t opt,
const char *rfile, TList *wrks)
2587 if (!IsValid())
return -1;
2589 if (wrks->GetSize() == 0)
return 0;
2595 while ((wrk = (TSlave *)next())) {
2596 if (wrk->IsValid()) {
2597 if (SendFile(file, opt, rfile, wrk) < 0)
2598 Error(
"BroadcastFile",
2599 "problems sending file to worker %s (%s)",
2600 wrk->GetOrdinal(), wrk->GetName());
2614 Int_t TProof::BroadcastFile(
const char *file, Int_t opt,
const char *rfile, ESlaves list)
2617 if (list == kAll) wrks = fSlaves;
2618 if (list == kActive) wrks = fActiveSlaves;
2619 if (list == kUnique) wrks = fUniqueSlaves;
2620 if (list == kAllUnique) wrks = fAllUniqueSlaves;
2622 return BroadcastFile(file, opt, rfile, wrks);
2629 void TProof::ReleaseMonitor(TMonitor *mon)
2631 if (mon && (mon != fAllMonitor) && (mon != fActiveMonitor)
2632 && (mon != fUniqueMonitor) && (mon != fAllUniqueMonitor)) {
2644 Int_t TProof::Collect(
const TSlave *sl, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2649 if (!sl->IsValid())
return 0;
2651 if (fCurrentMonitor == fAllMonitor) {
2655 mon->DeActivateAll();
2657 mon->Activate(sl->GetSocket());
2659 rc = Collect(mon, timeout, endtype, deactonfail);
2660 ReleaseMonitor(mon);
2671 Int_t TProof::Collect(TList *slaves, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2677 if (fCurrentMonitor == fAllMonitor) {
2681 mon->DeActivateAll();
2685 while ((sl = (TSlave*) next())) {
2687 mon->Activate(sl->GetSocket());
2690 rc = Collect(mon, timeout, endtype, deactonfail);
2691 ReleaseMonitor(mon);
2702 Int_t TProof::Collect(ESlaves list, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2707 if (list == kAll) mon = fAllMonitor;
2708 if (list == kActive) mon = fActiveMonitor;
2709 if (list == kUnique) mon = fUniqueMonitor;
2710 if (list == kAllUnique) mon = fAllUniqueMonitor;
2711 if (fCurrentMonitor == mon) {
2713 mon =
new TMonitor(*mon);
2717 rc = Collect(mon, timeout, endtype, deactonfail);
2718 ReleaseMonitor(mon);
2731 Int_t TProof::Collect(TMonitor *mon, Long_t timeout, Int_t endtype, Bool_t deactonfail)
2733 Int_t collectId = gRandom->Integer(9999);
2736 Info("Collect", ">>>>>> Entering collect responses
#%04d", collectId);
2740 fRecvMessages->Clear();
2742 Long_t actto = (Long_t)(gEnv->GetValue(
"Proof.SocketActivityTimeout", -1) * 1000);
2744 if (!mon->GetActive(actto))
return 0;
2746 DeActivateAsyncInput();
2749 TMonitor *savedMonitor = 0;
2750 if (fCurrentMonitor) {
2751 savedMonitor = fCurrentMonitor;
2752 fCurrentMonitor = mon;
2754 fCurrentMonitor = mon;
2762 Bool_t saveRedirLog = fRedirLog;
2763 if (!IsIdle() && !IsSync())
2766 int cnt = 0, rc = 0;
2769 Long_t nto = timeout;
2771 Info("Collect","
#%04d: active: %d", collectId, mon->GetActive());
2781 Int_t pollint = gEnv->GetValue(
"Proof.DynamicStartupPollInt", (Int_t) kPROOF_DynWrkPollInt_s);
2782 mon->ResetInterrupt();
2783 while ((nact = mon->GetActive(sto)) && (nto < 0 || nto > 0)) {
2788 TList *al = mon->GetListOfActives();
2789 if (al && al->GetSize() > 0) {
2790 Info(
"Collect",
" %d node(s) still active:", al->GetSize());
2793 while ((xs = (TSocket *)nxs())) {
2794 TSlave *wrk = FindSlave(xs);
2796 Info(
"Collect",
" %s (%s)", wrk->GetName(), wrk->GetOrdinal());
2798 Info(
"Collect",
" %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
2799 xs->GetInetAddress().GetPort());
2807 if (TestBit(TProof::kIsMaster) && !IsIdle() && fDynamicStartup && !fIsPollingWorkers &&
2808 ((fLastPollWorkers_s == -1) || (time(0)-fLastPollWorkers_s >= pollint))) {
2809 fIsPollingWorkers = kTRUE;
2810 if (PollForNewWorkers() > 0) DeActivateAsyncInput();
2811 fLastPollWorkers_s = time(0);
2812 fIsPollingWorkers = kFALSE;
2814 Info("Collect","
#%04d: now active: %d", collectId, mon->GetActive());
2819 Info("Collect", "Will invoke Select()
#%04d", collectId);
2820 TSocket *s = mon->Select(1000);
2822 if (s && s != (TSocket *)(-1)) {
2824 rc = CollectInputFrom(s, endtype, deactonfail);
2825 if (rc == 1 || (rc == 2 && !savedMonitor)) {
2829 Info("Collect","
#%04d: deactivating %p (active: %d, %p)", collectId,
2830 s, mon->GetActive(),
2831 mon->GetListOfActives()->First());
2832 }
else if (rc == 2) {
2836 savedMonitor->DeActivate(s);
2838 Info("Collect","save monitor: deactivating %p (active: %d, %p)",
2839 s, savedMonitor->GetActive(),
2840 savedMonitor->GetListOfActives()->First());
2852 if (fPlayer && (fPlayer->GetExitStatus() == TVirtualProofPlayer::kFinished))
2853 mon->DeActivateAll();
2855 if (s == (TSocket *)(-1) && nto > 0)
2860 if (IsMaster() && fWrksOutputReady && fWrksOutputReady->GetSize() > 0) {
2862 Int_t mxws = gEnv->GetValue(
"Proof.ControlSendOutput", 1);
2863 if (TProof::GetParameter(fPlayer->GetInputList(),
"PROOF_ControlSendOutput", mxws) != 0)
2864 mxws = gEnv->GetValue(
"Proof.ControlSendOutput", 1);
2865 TIter nxwr(fWrksOutputReady);
2867 while (mxws && (wrk = (TSlave *) nxwr())) {
2868 if (!wrk->TestBit(TSlave::kOutputRequested)) {
2870 TMessage sendoutput(kPROOF_SENDOUTPUT);
2872 Info("Collect", "worker %s was asked to send its output to master",
2874 if (wrk->GetSocket()->Send(sendoutput) != 1) {
2875 wrk->SetBit(TSlave::kOutputRequested);
2888 sto = (Long_t) actto;
2896 TList *al = mon->GetListOfActives();
2897 if (al && al->GetSize() > 0) {
2899 Info(
"Collect",
" %d node(s) went in timeout:", al->GetSize());
2902 while ((xs = (TSocket *)nxs())) {
2903 TSlave *wrk = FindSlave(xs);
2905 Info(
"Collect",
" %s", wrk->GetName());
2907 Info(
"Collect",
" %p: %s:%d", xs, xs->GetInetAddress().GetHostName(),
2908 xs->GetInetAddress().GetPort());
2911 mon->DeActivateAll();
2916 fIntHandler->Remove();
2922 fRedirLog = saveRedirLog;
2925 fCurrentMonitor = savedMonitor;
2927 ActivateAsyncInput();
2930 Info("Collect", "<<<<<< Exiting collect responses
#%04d", collectId);
2939 Int_t TProof::PollForNewWorkers()
2943 TList *reqWorkers =
new TList();
2944 reqWorkers->SetOwner(kFALSE);
2946 if (!TestBit(TProof::kIsMaster)) {
2947 Error(
"PollForNewWorkers",
"Can't invoke: not on a master -- should not happen!");
2951 Error(
"PollForNewWorkers",
"No ProofServ available -- should not happen!");
2955 gProofServ->GetWorkers(reqWorkers, dummy, kTRUE);
2958 TList *newWorkers =
new TList();
2959 newWorkers->SetOwner(kTRUE);
2961 TIter next(reqWorkers);
2964 while (( ni = dynamic_cast<TProofNodeInfo *>(next()) )) {
2967 fullOrd.Form(
"%s.%s", gProofServ->GetOrdinal(), ni->GetOrdinal().Data());
2969 TIter nextInner(fSlaves);
2971 Bool_t found = kFALSE;
2972 while (( sl = dynamic_cast<TSlave *>(nextInner()) )) {
2973 if ( strcmp(sl->GetOrdinal(), fullOrd.Data()) == 0 ) {
2979 if (found)
delete ni;
2981 newWorkers->Add(ni);
2983 Info("PollForNewWorkers", "New worker found: %s:%s",
2984 ni->GetNodeName().Data(), fullOrd.Data());
2990 Int_t nNewWorkers = newWorkers->GetEntries();
2993 if (nNewWorkers > 0) {
2995 Info("PollForNewWorkers", "Requesting to add %d new worker(s)", newWorkers->GetEntries());
2996 Int_t rv = AddWorkers(newWorkers);
2998 Error(
"PollForNewWorkers",
"Call to AddWorkers() failed (got %d < 0)", rv);
3005 Info("PollForNewWorkers", "No new worker found");
3015 void TProof::CleanGDirectory(TList *ol)
3021 gDirectory->RecursiveRemove(o);
3029 Int_t TProof::CollectInputFrom(TSocket *s, Int_t endtype, Bool_t deactonfail)
3034 if ((recvrc = s->Recv(mess)) < 0) {
3036 Info("CollectInputFrom","%p: got %d from Recv()", s, recvrc);
3040 if (fCurrentMonitor) fCurrentMonitor->Remove(s);
3041 if (s->Reconnect() == 0) {
3042 if (fCurrentMonitor) fCurrentMonitor->Add(s);
3047 MarkBad(s,
"problems receiving a message in TProof::CollectInputFrom(...)");
3053 MarkBad(s,
"undefined message in TProof::CollectInputFrom(...)");
3058 Int_t what = mess->What();
3059 TSlave *sl = FindSlave(s);
3060 rc = HandleInputMessage(sl, mess, deactonfail);
3061 if (rc == 1 && (endtype >= 0) && (what != endtype))
3074 Int_t TProof::HandleInputMessage(TSlave *sl, TMessage *mess, Bool_t deactonfail)
3081 Warning(
"HandleInputMessage",
"given an empty message or undefined worker");
3084 Bool_t delete_mess = kTRUE;
3085 TSocket *s = sl->GetSocket();
3087 Warning(
"HandleInputMessage",
"worker socket is undefined");
3092 Int_t what = mess->What();
3095 Info("HandleInputMessage", "got type %d from '%s'", what, sl->GetOrdinal());
3101 fRecvMessages->Add(mess);
3102 delete_mess = kFALSE;
3106 if (fPlayer) fPlayer->HandleRecvHisto(mess);
3111 if ((mess->BufferSize() > mess->Length()))
3114 MarkBad(s,
"received kPROOF_FATAL");
3119 if (fProgressDialogStarted) {
3121 Emit(
"StopProcess(Bool_t)", kTRUE);
3127 Info(
"HandleInputMessage",
"received kPROOF_STOP from %s: disabling any further collection this worker",
3132 case kPROOF_GETTREEHEADER:
3134 fRecvMessages->Add(mess);
3135 delete_mess = kFALSE;
3146 case kPROOF_GETOBJECT:
3148 mess->ReadString(str,
sizeof(str));
3149 obj = gDirectory->Get(str);
3153 s->Send(kMESS_NOTOK);
3156 case kPROOF_GETPACKET:
3159 Info("HandleInputMessage","%s: kPROOF_GETPACKET", sl->GetOrdinal());
3160 TDSetElement *elem = 0;
3161 elem = fPlayer ? fPlayer->GetNextPacket(sl, mess) : 0;
3163 if (elem != (TDSetElement*) -1) {
3164 TMessage answ(kPROOF_GETPACKET);
3168 while (fWaitingSlaves != 0 && fWaitingSlaves->GetSize()) {
3169 TPair *p = (TPair*) fWaitingSlaves->First();
3170 s = (TSocket*) p->Key();
3171 TMessage *m = (TMessage*) p->Value();
3173 elem = fPlayer ? fPlayer->GetNextPacket(sl, m) : 0;
3174 if (elem != (TDSetElement*) -1) {
3175 TMessage a(kPROOF_GETPACKET);
3181 fWaitingSlaves->Remove(fWaitingSlaves->FirstLink());
3189 if (fWaitingSlaves == 0) fWaitingSlaves =
new TList;
3190 fWaitingSlaves->Add(
new TPair(s, mess));
3191 delete_mess = kFALSE;
3196 case kPROOF_LOGFILE:
3201 Info("HandleInputMessage","%s: kPROOF_LOGFILE: size: %d", sl->GetOrdinal(), size);
3202 RecvLogFile(s, size);
3206 case kPROOF_LOGDONE:
3207 (*mess) >> sl->fStatus >> sl->fParallel;
3209 Info("HandleInputMessage","%s: kPROOF_LOGDONE: status %d parallel %d",
3210 sl->GetOrdinal(), sl->fStatus, sl->fParallel);
3211 if (sl->fStatus != 0) {
3213 fStatus = sl->fStatus;
3215 if (deactonfail) DeactivateWorker(sl->fOrdinal);
3218 if (fWrksOutputReady && fWrksOutputReady->FindObject(sl)) {
3219 sl->ResetBit(TSlave::kOutputRequested);
3220 fWrksOutputReady->Remove(sl);
3225 case kPROOF_GETSTATS:
3227 (*mess) >> sl->fBytesRead >> sl->fRealTime >> sl->fCpuTime
3228 >> sl->fWorkDir >> sl->fProofWorkDir;
3230 Info("HandleInputMessage", "kPROOF_GETSTATS: %s", sl->fWorkDir.Data());
3232 if ((mess->BufferSize() > mess->Length()))
3236 if (sl->fImage.IsNull())
3237 sl->fImage.Form(
"%s:%s", TUrl(sl->fName).GetHostFQDN(),
3238 sl->fProofWorkDir.Data());
3243 Info("HandleInputMessage",
3244 "kPROOF_GETSTATS:%s image: %s", sl->GetOrdinal(), sl->GetImage());
3246 fBytesRead += sl->fBytesRead;
3247 fRealTime += sl->fRealTime;
3248 fCpuTime += sl->fCpuTime;
3253 case kPROOF_GETPARALLEL:
3255 Bool_t async = kFALSE;
3256 (*mess) >> sl->fParallel;
3257 if ((mess->BufferSize() > mess->Length()))
3259 rc = (async) ? 0 : 1;
3263 case kPROOF_CHECKFILE:
3265 if ((mess->BufferSize() > mess->Length())) {
3266 (*mess) >> fCheckFileStatus;
3270 fCheckFileStatus = 1;
3276 case kPROOF_SENDFILE:
3282 case kPROOF_PACKAGE_LIST:
3284 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PACKAGE_LIST: enter");
3288 case TProof::kListEnabledPackages:
3289 SafeDelete(fEnabledPackages);
3290 fEnabledPackages = (TList *) mess->ReadObject(TList::Class());
3291 if (fEnabledPackages) {
3292 fEnabledPackages->SetOwner();
3294 Error(
"HandleInputMessage",
3295 "kPROOF_PACKAGE_LIST: kListEnabledPackages: TList not found in message!");
3298 case TProof::kListPackages:
3299 SafeDelete(fAvailablePackages);
3300 fAvailablePackages = (TList *) mess->ReadObject(TList::Class());
3301 if (fAvailablePackages) {
3302 fAvailablePackages->SetOwner();
3304 Error(
"HandleInputMessage",
3305 "kPROOF_PACKAGE_LIST: kListPackages: TList not found in message!");
3309 Error(
"HandleInputMessage",
"kPROOF_PACKAGE_LIST: unknown type: %d", type);
3314 case kPROOF_SENDOUTPUT:
3317 fPlayer->SetMerging();
3320 sl->ResetBit(TSlave::kOutputRequested);
3322 Info("HandleInputMessage","kPROOF_SENDOUTPUT: enter (%s)", sl->GetOrdinal());
3324 if (!fWrksOutputReady) {
3325 fWrksOutputReady =
new TList;
3326 fWrksOutputReady->SetOwner(kFALSE);
3328 fWrksOutputReady->Add(sl);
3332 case kPROOF_OUTPUTOBJECT:
3335 fPlayer->SetMerging();
3338 Info("HandleInputMessage","kPROOF_OUTPUTOBJECT: enter");
3340 const
char *prefix = gProofServ ? gProofServ->GetPrefix() : "Lite-0";
3341 if (!TestBit(TProof::kIsClient) && !fMergersSet && !fFinalizationRunning) {
3342 Info(
"HandleInputMessage",
"finalization on %s started ...", prefix);
3343 fFinalizationRunning = kTRUE;
3346 while ((mess->BufferSize() > mess->Length())) {
3353 (TQueryResult *) mess->ReadObject(TQueryResult::Class());
3356 fPlayer->AddQueryResult(pq);
3357 fPlayer->SetCurrentQuery(pq);
3359 if (fPlayer->GetOutputList())
3360 fPlayer->GetOutputList()->Clear();
3363 TString qid = TString::Format(
"%s:%s",pq->GetTitle(),pq->GetName());
3364 if (fPlayer->GetInputList()->FindObject(
"PROOF_QueryTag"))
3365 fPlayer->GetInputList()->Remove(fPlayer->GetInputList()->FindObject(
"PROOF_QueryTag"));
3366 fPlayer->AddInput(
new TNamed(
"PROOF_QueryTag", qid.Data()));
3368 Warning(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: query result missing");
3370 }
else if (type > 0) {
3372 TObject *o = mess->ReadObject(TObject::Class());
3374 fMergePrg.IncreaseIdx();
3376 Bool_t changed = kFALSE;
3377 msg.Form(
"%s: merging output objects ... %s", prefix, fMergePrg.Export(changed));
3379 gProofServ->SendAsynMessage(msg.Data(), kFALSE);
3380 }
else if (IsTty() || changed) {
3381 fprintf(stderr,
"%s\r", msg.Data());
3384 if ((fPlayer->AddOutputObject(o) == 1)) {
3390 fMergePrg.DecreaseNWrks();
3391 if (TestBit(TProof::kIsClient) && !IsLite()) {
3393 TQueryResult *pq = fPlayer->GetCurrentQuery();
3395 pq->SetOutputList(fPlayer->GetOutputList(), kFALSE);
3398 TIter nxin(fPlayer->GetInputList());
3400 if (!pq->GetInputList()) pq->SetInputList(
new TList());
3401 while ((xo = nxin()))
3402 if (!pq->GetInputList()->FindObject(xo->GetName()))
3403 pq->AddInput(xo->Clone());
3405 QueryResultReady(TString::Format(
"%s:%s", pq->GetTitle(), pq->GetName()));
3413 Warning(
"HandleInputMessage",
"kPROOF_OUTPUTOBJECT: player undefined!");
3419 case kPROOF_OUTPUTLIST:
3424 Info("HandleInputMessage","%s: kPROOF_OUTPUTLIST: enter", sl->GetOrdinal());
3427 fPlayer->SetMerging();
3428 if (TestBit(TProof::kIsMaster) || fProtocol < 7) {
3429 out = (TList *) mess->ReadObject(TList::Class());
3432 (TQueryResult *) mess->ReadObject(TQueryResult::Class());
3435 fPlayer->AddQueryResult(pq);
3436 fPlayer->SetCurrentQuery(pq);
3439 out = pq->GetOutputList();
3440 CleanGDirectory(out);
3441 out = (TList *) out->Clone();
3443 QueryResultReady(TString::Format(
"%s:%s", pq->GetTitle(), pq->GetName()));
3446 Info("HandleInputMessage",
3447 "%s: kPROOF_OUTPUTLIST: query result missing", sl->GetOrdinal());
3452 fPlayer->AddOutput(out);
3456 Info("HandleInputMessage",
3457 "%s: kPROOF_OUTPUTLIST: outputlist is empty", sl->GetOrdinal());
3460 Warning(
"HandleInputMessage",
3461 "%s: kPROOF_OUTPUTLIST: player undefined!", sl->GetOrdinal());
3464 if (TestBit(TProof::kIsClient) && !IsLite())
3469 case kPROOF_QUERYLIST:
3471 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYLIST: enter");
3472 (*mess) >> fOtherQueries >> fDrawQueries;
3478 fQueries = (TList *) mess->ReadObject(TList::Class());
3482 case kPROOF_RETRIEVE:
3484 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_RETRIEVE: enter");
3486 (TQueryResult *) mess->ReadObject(TQueryResult::Class());
3487 if (pq && fPlayer) {
3488 fPlayer->AddQueryResult(pq);
3490 QueryResultReady(TString::Format(
"%s:%s", pq->GetTitle(), pq->GetName()));
3493 Info("HandleInputMessage",
3494 "kPROOF_RETRIEVE: query result missing or player undefined");
3499 case kPROOF_MAXQUERIES:
3501 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MAXQUERIES: enter");
3505 Printf("Number of queries fully kept remotely: %d", max);
3509 case kPROOF_SERVERSTARTED:
3511 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SERVERSTARTED: enter");
3513 UInt_t tot = 0, done = 0;
3517 (*mess) >> action >> tot >> done >> st;
3519 if (TestBit(TProof::kIsClient)) {
3521 TString type = (action.Contains(
"submas")) ?
"submasters"
3523 Int_t frac = (Int_t) (done*100.)/tot;
3524 char msg[512] = {0};
3526 snprintf(msg, 512,
"%s: OK (%d %s) \n",
3527 action.Data(),tot, type.Data());
3529 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
3530 action.Data(), done, tot, frac);
3533 fprintf(stderr,
"%s", msg);
3535 NotifyLogMsg(msg, 0);
3538 StartupMessage(action.Data(), st, (Int_t)done, (Int_t)tot);
3542 TMessage m(kPROOF_SERVERSTARTED);
3543 m << action << tot << done << st;
3544 gProofServ->GetSocket()->Send(m);
3549 case kPROOF_DATASET_STATUS:
3551 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_DATASET_STATUS: enter");
3553 UInt_t tot = 0, done = 0;
3557 (*mess) >> action >> tot >> done >> st;
3559 if (TestBit(TProof::kIsClient)) {
3561 TString type =
"files";
3562 Int_t frac = (Int_t) (done*100.)/tot;
3563 char msg[512] = {0};
3565 snprintf(msg, 512,
"%s: OK (%d %s) \n",
3566 action.Data(),tot, type.Data());
3568 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
3569 action.Data(), done, tot, frac);
3572 fprintf(stderr,
"%s", msg);
3574 NotifyLogMsg(msg, 0);
3577 DataSetStatus(action.Data(), st, (Int_t)done, (Int_t)tot);
3581 TMessage m(kPROOF_DATASET_STATUS);
3582 m << action << tot << done << st;
3583 gProofServ->GetSocket()->Send(m);
3588 case kPROOF_STARTPROCESS:
3590 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STARTPROCESS: enter");
3596 fIsWaiting = kFALSE;
3600 fRedirLog = (fSync) ? fRedirLog : kTRUE;
3605 if (!TestBit(TProof::kIsMaster)) {
3609 fPrepTime = fQuerySTW.RealTime();
3610 PDB(kGlobal,2) Info("HandleInputMessage","Preparation time: %f s", fPrepTime);
3614 Long64_t first = -1, nent = -1;
3615 (*mess) >> selec >> dsz >> first >> nent;
3617 if (!gROOT->IsBatch()) {
3618 if (fProgressDialog &&
3619 !TestBit(kUsingSessionGui) && TestBit(kUseProgressDialog)) {
3620 if (!fProgressDialogStarted) {
3621 fProgressDialog->ExecPlugin(5,
this,
3622 selec.Data(), dsz, first, nent);
3623 fProgressDialogStarted = kTRUE;
3625 ResetProgressDialog(selec, dsz, first, nent);
3628 ResetBit(kUsingSessionGui);
3634 case kPROOF_ENDINIT:
3636 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_ENDINIT: enter");
3638 if (TestBit(TProof::kIsMaster)) {
3640 fPlayer->SetInitTime();
3645 case kPROOF_SETIDLE:
3648 Info("HandleInputMessage","kPROOF_SETIDLE from '%s': enter (%d)", sl->GetOrdinal(), fNotIdle);
3655 Info("HandleInputMessage", "%s: got kPROOF_SETIDLE", sl->GetOrdinal());
3657 Warning(
"HandleInputMessage",
3658 "%s: got kPROOF_SETIDLE but no running workers ! protocol error?",
3664 if ((mess->BufferSize() > mess->Length()))
3665 (*mess) >> fIsWaiting;
3670 case kPROOF_QUERYSUBMITTED:
3672 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_QUERYSUBMITTED: enter");
3676 Bool_t sync = fSync;
3677 if ((mess->BufferSize() > mess->Length()))
3679 if (sync != fSync && fSync) {
3695 case kPROOF_SESSIONTAG:
3697 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_SESSIONTAG: enter");
3704 sl->SetSessionTag(stag);
3706 if ((mess->BufferSize() > mess->Length()))
3709 if ((mess->BufferSize() > mess->Length())) {
3712 if (!usr.IsNull()) fUrl.SetUser(usr.Data());
3717 case kPROOF_FEEDBACK:
3720 Info("HandleInputMessage","kPROOF_FEEDBACK: enter");
3721 TList *out = (TList *) mess->ReadObject(TList::Class());
3724 fPlayer->StoreFeedback(sl, out);
3731 case kPROOF_AUTOBIN:
3733 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_AUTOBIN: enter");
3736 Double_t xmin, xmax, ymin, ymax, zmin, zmax;
3738 (*mess) >> name >> xmin >> xmax >> ymin >> ymax >> zmin >> zmax;
3740 if (fPlayer) fPlayer->UpdateAutoBin(name,xmin,xmax,ymin,ymax,zmin,zmax);
3742 TMessage answ(kPROOF_AUTOBIN);
3744 answ << name << xmin << xmax << ymin << ymax << zmin << zmax;
3750 case kPROOF_PROGRESS:
3752 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_PROGRESS: enter");
3754 if (GetRemoteProtocol() > 25) {
3756 TProofProgressInfo *pi = 0;
3758 fPlayer->Progress(sl,pi);
3759 }
else if (GetRemoteProtocol() > 11) {
3760 Long64_t total, processed, bytesread;
3761 Float_t initTime, procTime, evtrti, mbrti;
3762 (*mess) >> total >> processed >> bytesread
3763 >> initTime >> procTime
3766 fPlayer->Progress(sl, total, processed, bytesread,
3767 initTime, procTime, evtrti, mbrti);
3771 Long64_t total, processed;
3772 (*mess) >> total >> processed;
3774 fPlayer->Progress(sl, total, processed);
3779 case kPROOF_STOPPROCESS:
3786 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_STOPPROCESS: enter");
3788 Long64_t events = 0;
3789 Bool_t abort = kFALSE;
3790 TProofProgressStatus *status = 0;
3792 if ((mess->BufferSize() > mess->Length()) && (fProtocol > 18)) {
3793 (*mess) >> status >> abort;
3794 }
else if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8)) {
3795 (*mess) >> events >> abort;
3800 if (fProtocol > 18) {
3801 TList *listOfMissingFiles = 0;
3802 if (!(listOfMissingFiles = (TList *)GetOutput(
"MissingFiles"))) {
3803 listOfMissingFiles =
new TList();
3804 listOfMissingFiles->SetName(
"MissingFiles");
3806 fPlayer->AddOutputObject(listOfMissingFiles);
3808 if (fPlayer->GetPacketizer()) {
3810 fPlayer->GetPacketizer()->AddProcessed(sl, status, 0, &listOfMissingFiles);
3812 fPlayer->GetPacketizer()->MarkBad(sl, status, &listOfMissingFiles);
3816 if (status) fPlayer->AddEventsProcessed(status->GetEntries());
3818 fPlayer->AddEventsProcessed(events);
3822 if (!TestBit(TProof::kIsMaster))
3823 Emit(
"StopProcess(Bool_t)", abort);
3827 case kPROOF_SUBMERGER:
3829 PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_SUBMERGER: enter");
3830 HandleSubmerger(mess, sl);
3834 case kPROOF_GETSLAVEINFO:
3836 PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_GETSLAVEINFO: enter");
3838 Bool_t active = (GetListOfActiveSlaves()->FindObject(sl) != 0);
3839 Bool_t bad = (GetListOfBadSlaves()->FindObject(sl) != 0);
3843 Error(
"HandleInputMessage",
"kPROOF_GETSLAVEINFO: no list received!");
3845 tmpinfo->SetOwner(kFALSE);
3846 Int_t nentries = tmpinfo->GetSize();
3847 for (Int_t i=0; i<nentries; i++) {
3848 TSlaveInfo* slinfo =
3849 dynamic_cast<TSlaveInfo*
>(tmpinfo->At(i));
3852 if (IsLite()) slinfo->fHostName = gSystem->HostName();
3854 TIter nxw(fSlaveInfo);
3855 TSlaveInfo *ourwi = 0;
3856 while ((ourwi = (TSlaveInfo *)nxw())) {
3857 if (!strcmp(ourwi->GetOrdinal(), slinfo->GetOrdinal())) {
3858 ourwi->SetSysInfo(slinfo->GetSysInfo());
3859 ourwi->fHostName = slinfo->GetName();
3860 if (slinfo->GetDataDir() && (strlen(slinfo->GetDataDir()) > 0))
3861 ourwi->fDataDir = slinfo->GetDataDir();
3866 fSlaveInfo->Add(slinfo);
3870 if (slinfo->fStatus != TSlaveInfo::kBad) {
3871 if (!active) slinfo->SetStatus(TSlaveInfo::kNotActive);
3872 if (bad) slinfo->SetStatus(TSlaveInfo::kBad);
3874 if (sl->GetMsd() && (strlen(sl->GetMsd()) > 0))
3875 slinfo->fMsd = sl->GetMsd();
3884 case kPROOF_VALIDATE_DSET:
3887 Info("HandleInputMessage", "kPROOF_VALIDATE_DSET: enter");
3891 Error("HandleInputMessage", "kPROOF_VALIDATE_DSET: fDSet not set");
3893 fDSet->Validate(dset);
3898 case kPROOF_DATA_READY:
3900 PDB(kGlobal,2) Info("HandleInputMessage", "kPROOF_DATA_READY: enter");
3901 Bool_t dataready = kFALSE;
3902 Long64_t totalbytes, bytesready;
3903 (*mess) >> dataready >> totalbytes >> bytesready;
3904 fTotalBytes += totalbytes;
3905 fBytesReady += bytesready;
3906 if (dataready == kFALSE) fDataReady = dataready;
3914 case kPROOF_MESSAGE:
3916 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_MESSAGE: enter");
3921 Bool_t lfeed = kTRUE;
3922 if ((mess->BufferSize() > mess->Length()))
3925 if (TestBit(TProof::kIsClient)) {
3929 fprintf(stderr,
"%s%c", msg.Data(), (lfeed ?
'\n' :
'\r'));
3932 NotifyLogMsg(msg, (lfeed ?
"\n" :
"\r"));
3937 fprintf(stderr,
"%s%c", msg.Data(), (lfeed ?
'\n' :
'\r'));
3940 gProofServ->FlushLogFile();
3943 gProofServ->SendAsynMessage(msg, lfeed);
3949 case kPROOF_VERSARCHCOMP:
3953 PDB(kGlobal,2) Info("HandleInputMessage","kPROOF_VERSARCHCOMP: %s", vac.Data());
3955 TString vers, archcomp;
3956 if (vac.Tokenize(vers, from, "|"))
3957 vac.Tokenize(archcomp, from, "|");
3958 sl->SetArchCompiler(archcomp);
3959 vers.ReplaceAll(":","|");
3960 sl->SetROOTVersion(vers);
3966 Error(
"HandleInputMessage",
"unknown command received from '%s' (what = %d)",
3967 sl->GetOrdinal(), what);
3983 void TProof::HandleSubmerger(TMessage *mess, TSlave *sl)
3988 TSocket *s = sl->GetSocket();
3993 if (IsEndMaster()) {
3994 Int_t merger_id = -1;
3995 (*mess) >> merger_id;
3998 Info("HandleSubmerger", "kOutputSent: Worker %s:%d:%s had sent its output to merger
#%d",
3999 sl->GetName(), sl->GetPort(), sl->GetOrdinal(), merger_id);
4001 if (!fMergers || fMergers->GetSize() <= merger_id) {
4002 Error(
"HandleSubmerger",
"kOutputSize: #%d not in list ", merger_id);
4005 TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4006 mi->SetMergedWorker();
4007 if (mi->AreAllWorkersMerged()) {
4009 if (GetActiveMergersCount() == 0) {
4012 fMergersSet = kFALSE;
4014 fLastAssignedMerger = 0;
4015 PDB(kSubmerger, 2) Info("HandleSubmerger", "all mergers removed ... ");
4019 PDB(kSubmerger, 2) Error("HandleSubmerger","kOutputSent: received not on endmaster!");
4026 Int_t merger_id = -1;
4027 (*mess) >> merger_id;
4029 PDB(kSubmerger, 2) Info("HandleSubmerger", "kMergerDown:
#%d ", merger_id);
4031 if (!fMergers || fMergers->GetSize() <= merger_id) {
4032 Error(
"HandleSubmerger",
"kMergerDown: #%d not in list ", merger_id);
4036 TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4037 if (!mi->IsActive()) {
4044 TMessage stop(kPROOF_SUBMERGER);
4045 stop << Int_t(kStopMerging);
4050 AskForOutput(mi->GetMerger());
4053 TIter nxo(mi->GetWorkers());
4055 while ((o = nxo())) {
4056 AskForOutput((TSlave *)o);
4058 PDB(kSubmerger, 2) Info("HandleSubmerger", "kMergerDown:%d: exit", merger_id);
4064 if (IsEndMaster()) {
4066 Info("HandleSubmerger", "worker %s reported as finished ", sl->GetOrdinal());
4068 const
char *prefix = gProofServ ? gProofServ->GetPrefix() : "Lite-0";
4069 if (!fFinalizationRunning) {
4070 Info(
"HandleSubmerger",
"finalization on %s started ...", prefix);
4071 fFinalizationRunning = kTRUE;
4074 Int_t output_size = 0;
4075 Int_t merging_port = 0;
4076 (*mess) >> output_size >> merging_port;
4078 PDB(kSubmerger, 2) Info("HandleSubmerger",
4079 "kOutputSize: Worker %s:%d:%s reports %d output objects (+ available port %d)",
4080 sl->GetName(), sl->GetPort(), sl->GetOrdinal(), output_size, merging_port);
4084 Int_t activeWorkers = fCurrentMonitor ? fCurrentMonitor->GetActive() : GetNumberOfActiveSlaves();
4088 TParameter<Int_t> *mc =
dynamic_cast<TParameter<Int_t> *
>(GetParameter(
"PROOF_UseMergers"));
4089 if (mc) fMergersCount = mc->GetVal();
4090 TParameter<Int_t> *mh =
dynamic_cast<TParameter<Int_t> *
>(GetParameter(
"PROOF_MergersByHost"));
4091 if (mh) fMergersByHost = (mh->GetVal() != 0) ? kTRUE : kFALSE;
4094 if (fMergersCount < 0 || (fMergersCount > (activeWorkers/2) )) {
4095 msg.Form(
"%s: Invalid request: cannot start %d mergers for %d workers",
4096 prefix, fMergersCount, activeWorkers);
4098 gProofServ->SendAsynMessage(msg);
4100 Printf(
"%s",msg.Data());
4104 if ((fMergersCount == 0) && (!fMergersByHost)) {
4105 if (activeWorkers > 1) {
4106 fMergersCount = TMath::Nint(TMath::Sqrt(activeWorkers));
4107 if (activeWorkers / fMergersCount < 2)
4108 fMergersCount = (Int_t) TMath::Sqrt(activeWorkers);
4110 if (fMergersCount > 1)
4111 msg.Form(
"%s: Number of mergers set dynamically to %d (for %d workers)",
4112 prefix, fMergersCount, activeWorkers);
4114 msg.Form(
"%s: No mergers will be used for %d workers",
4115 prefix, activeWorkers);
4119 gProofServ->SendAsynMessage(msg);
4121 Printf(
"%s",msg.Data());
4122 }
else if (fMergersByHost) {
4124 if (activeWorkers > 1) {
4127 TIter nxwk(fSlaves);
4129 while ((wrk = nxwk())) {
4130 if (!hosts.FindObject(wrk->GetName())) {
4131 hosts.Add(
new TObjString(wrk->GetName()));
4136 if (fMergersCount > 1)
4137 msg.Form(
"%s: Number of mergers set to %d (for %d workers), one for each slave host",
4138 prefix, fMergersCount, activeWorkers);
4140 msg.Form(
"%s: No mergers will be used for %d workers",
4141 prefix, activeWorkers);
4145 gProofServ->SendAsynMessage(msg);
4147 Printf(
"%s",msg.Data());
4149 msg.Form(
"%s: Number of mergers set by user to %d (for %d workers)",
4150 prefix, fMergersCount, activeWorkers);
4152 gProofServ->SendAsynMessage(msg);
4154 Printf(
"%s",msg.Data());
4159 fPlayer->SetMerging(kTRUE);
4162 fMergePrg.SetNWrks(fMergersCount);
4164 if (fMergersCount > 0) {
4166 fMergers =
new TList();
4167 fLastAssignedMerger = 0;
4169 fWorkersToMerge = (activeWorkers - fMergersCount);
4171 if (!CreateMerger(sl, merging_port)) {
4177 if (IsLite()) fMergePrg.SetNWrks(fMergersCount);
4181 fMergersSet = kTRUE;
4184 if (fMergersCount == -1) {
4188 if ((fRedirectNext > 0 ) && (!fMergersByHost)) {
4189 RedirectWorker(s, sl, output_size);
4192 Bool_t newMerger = kTRUE;
4193 if (fMergersByHost) {
4194 TIter nxmg(fMergers);
4195 TMergerInfo *mgi = 0;
4196 while ((mgi = (TMergerInfo *) nxmg())) {
4197 if (!strcmp(sl->GetName(), mgi->GetMerger()->GetName())) {
4203 if ((fMergersCount > fMergers->GetSize()) && newMerger) {
4205 if (!CreateMerger(sl, merging_port)) {
4212 RedirectWorker(s, sl, output_size);
4217 Error(
"HandleSubMerger",
"kOutputSize received not on endmaster!");
4227 void TProof::RedirectWorker(TSocket *s, TSlave * sl, Int_t output_size)
4229 Int_t merger_id = -1;
4231 if (fMergersByHost) {
4232 for (Int_t i = 0; i < fMergers->GetSize(); i++) {
4233 TMergerInfo *mgi = (TMergerInfo *)fMergers->At(i);
4234 if (!strcmp(sl->GetName(), mgi->GetMerger()->GetName())) {
4240 merger_id = FindNextFreeMerger();
4243 if (merger_id == -1) {
4247 TMessage sendoutput(kPROOF_SUBMERGER);
4248 sendoutput << Int_t(kSendOutput);
4250 Info("RedirectWorker", "redirecting worker %s to merger %d", sl->GetOrdinal(), merger_id);
4252 PDB(kSubmerger, 2) Info("RedirectWorker", "redirecting output to merger
#%d", merger_id);
4253 if (!fMergers || fMergers->GetSize() <= merger_id) {
4254 Error(
"RedirectWorker",
"#%d not in list ", merger_id);
4257 TMergerInfo * mi = (TMergerInfo *) fMergers->At(merger_id);
4259 TString hname = (IsLite()) ?
"localhost" : mi->GetMerger()->GetName();
4260 sendoutput << merger_id;
4261 sendoutput << hname;
4262 sendoutput << mi->GetPort();
4263 s->Send(sendoutput);
4264 mi->AddMergedObjects(output_size);
4273 Int_t TProof::FindNextFreeMerger()
4275 while (fLastAssignedMerger < fMergers->GetSize() &&
4276 (!((TMergerInfo*)fMergers->At(fLastAssignedMerger))->IsActive() ||
4277 ((TMergerInfo*)fMergers->At(fLastAssignedMerger))->AreAllWorkersAssigned())) {
4278 fLastAssignedMerger++;
4281 if (fLastAssignedMerger == fMergers->GetSize()) {
4282 fLastAssignedMerger = 0;
4284 return fLastAssignedMerger++;
4287 while (fLastAssignedMerger < fMergers->GetSize() &&
4288 (!((TMergerInfo*)fMergers->At(fLastAssignedMerger))->IsActive() ||
4289 ((TMergerInfo*)fMergers->At(fLastAssignedMerger))->AreAllWorkersAssigned())) {
4290 fLastAssignedMerger++;
4293 if (fLastAssignedMerger == fMergers->GetSize()) {
4296 return fLastAssignedMerger++;
4303 void TProof::AskForOutput(TSlave *sl)
4305 TMessage sendoutput(kPROOF_SUBMERGER);
4306 sendoutput << Int_t(kSendOutput);
4308 PDB(kSubmerger, 2) Info("AskForOutput",
4309 "worker %s was asked to send its output to master",
4313 sendoutput << TString("master");
4315 sl->GetSocket()->Send(sendoutput);
4316 if (IsLite()) fMergePrg.IncreaseNWrks();
4322 void TProof::UpdateDialog()
4324 if (!fPlayer)
return;
4327 if (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) {
4329 Info(
"UpdateDialog",
4330 "processing was aborted - %lld events processed",
4331 fPlayer->GetEventsProcessed());
4333 if (GetRemoteProtocol() > 11) {
4335 Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
4337 Progress(-1, fPlayer->GetEventsProcessed());
4339 Emit(
"StopProcess(Bool_t)", kTRUE);
4343 if (fPlayer->GetExitStatus() == TVirtualProofPlayer::kStopped) {
4345 Info(
"UpdateDialog",
4346 "processing was stopped - %lld events processed",
4347 fPlayer->GetEventsProcessed());
4349 if (GetRemoteProtocol() > 25) {
4351 Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1., -1, -1, -1.);
4352 }
else if (GetRemoteProtocol() > 11) {
4353 Progress(-1, fPlayer->GetEventsProcessed(), -1, -1., -1., -1., -1.);
4355 Progress(-1, fPlayer->GetEventsProcessed());
4357 Emit(
"StopProcess(Bool_t)", kFALSE);
4361 if (GetRemoteProtocol() > 25) {
4363 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,Int_t,Int_t,Float_t)",
4364 10, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),(Float_t)(-1.),(Float_t)(-1.),
4365 (Float_t)(-1.),(Float_t)(-1.),(Int_t)(-1),(Int_t)(-1),(Float_t)(-1.));
4366 }
else if (GetRemoteProtocol() > 11) {
4368 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
4369 7, (Long64_t)(-1), (Long64_t)(-1), (Long64_t)(-1),
4370 (Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.),(Float_t)(-1.));
4372 EmitVA(
"Progress(Long64_t,Long64_t)", 2, (Long64_t)(-1), (Long64_t)(-1));
4379 void TProof::ActivateAsyncInput()
4381 TIter next(fSlaves);
4384 while ((sl = (TSlave*) next()))
4385 if (sl->GetInputHandler())
4386 sl->GetInputHandler()->Add();
4392 void TProof::DeActivateAsyncInput()
4394 TIter next(fSlaves);
4397 while ((sl = (TSlave*) next()))
4398 if (sl->GetInputHandler())
4399 sl->GetInputHandler()->Remove();
4405 Int_t TProof::GetActiveMergersCount()
4407 if (!fMergers)
return 0;
4409 Int_t active_mergers = 0;
4411 TIter mergers(fMergers);
4412 TMergerInfo *mi = 0;
4413 while ((mi = (TMergerInfo *)mergers())) {
4414 if (mi->IsActive()) active_mergers++;
4417 return active_mergers;
4423 Bool_t TProof::CreateMerger(TSlave *sl, Int_t port)
4426 Info("CreateMerger", "worker %s will be merger ", sl->GetOrdinal());
4428 PDB(kSubmerger, 2) Info("CreateMerger","Begin");
4432 Info("CreateMerger", "cannot create merger on port %d - exit", port);
4437 if (!fMergersByHost) {
4438 Int_t mergersToCreate = fMergersCount - fMergers->GetSize();
4440 Int_t rest = fWorkersToMerge % mergersToCreate;
4442 if (rest > 0 && fMergers->GetSize() < rest) {
4447 workers = (fWorkersToMerge / mergersToCreate) + rest;
4449 Int_t workersOnHost = 0;
4450 for (Int_t i = 0; i < fActiveSlaves->GetSize(); i++) {
4451 if(!strcmp(sl->GetName(), fActiveSlaves->At(i)->GetName())) workersOnHost++;
4453 workers = workersOnHost - 1;
4457 msg.Form(
"worker %s on host %s will be merger for %d additional workers", sl->GetOrdinal(), sl->GetName(), workers);
4460 gProofServ->SendAsynMessage(msg);
4462 Printf(
"%s",msg.Data());
4464 TMergerInfo * merger =
new TMergerInfo(sl, port, workers);
4466 TMessage bemerger(kPROOF_SUBMERGER);
4467 bemerger << Int_t(kBeMerger);
4468 bemerger << fMergers->GetSize();
4469 bemerger << workers;
4470 sl->GetSocket()->Send(bemerger);
4472 PDB(kSubmerger,2) Info("CreateMerger",
4473 "merger
#%d (port: %d) for %d workers started",
4474 fMergers->GetSize(), port, workers);
4476 fMergers->Add(merger);
4477 fWorkersToMerge = fWorkersToMerge - workers;
4479 fRedirectNext = workers / 2;
4481 PDB(kSubmerger, 2) Info("CreateMerger", "exit");
4490 void TProof::MarkBad(TSlave *wrk, const
char *reason)
4492 std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
4495 if (!IsValid())
return;
4498 Error(
"MarkBad",
"worker instance undefined: protocol error? ");
4503 static TString thisurl;
4504 if (thisurl.IsNull()) {
4506 Int_t port = gEnv->GetValue(
"ProofServ.XpdPort",-1);
4507 thisurl = TUrl(gSystem->HostName()).GetHostFQDN();
4508 if (port > 0) thisurl += TString::Format(
":%d", port);
4510 thisurl.Form(
"%s@%s:%d", fUrl.GetUser(), fUrl.GetHost(), fUrl.GetPort());
4514 if (!reason || (strcmp(reason, kPROOF_TerminateWorker) && strcmp(reason, kPROOF_WorkerIdleTO))) {
4516 const char *mastertype = (gProofServ && gProofServ->IsTopMaster()) ?
"top master" :
"master";
4517 TString src = IsMaster() ? Form(
"%s at %s", mastertype, thisurl.Data()) :
"local session";
4519 msg.Form(
"\n +++ Message from %s : marking %s:%d (%s) as bad\n +++ Reason: %s",
4520 src.Data(), wrk->GetName(), wrk->GetPort(), wrk->GetOrdinal(),
4521 (reason && strlen(reason)) ? reason :
"unknown");
4522 Info(
"MarkBad",
"%s", msg.Data());
4526 msg += TString::Format(
"\n\n +++ Most likely your code crashed on worker %s at %s:%d.\n",
4527 wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
4529 msg += TString::Format(
"\n\n +++ Most likely your code crashed\n");
4531 msg += TString::Format(
" +++ Please check the session logs for error messages either using\n");
4532 msg += TString::Format(
" +++ the 'Show logs' button or executing\n");
4533 msg += TString::Format(
" +++\n");
4535 msg += TString::Format(
" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4536 "Display(\"%s\",0)\n\n", thisurl.Data(), wrk->GetOrdinal());
4537 gProofServ->SendAsynMessage(msg, kTRUE);
4539 msg += TString::Format(
" +++ root [] TProof::Mgr(\"%s\")->GetSessionLogs()->"
4540 "Display(\"*\")\n\n", thisurl.Data());
4541 Printf(
"%s", msg.Data());
4543 }
else if (reason) {
4544 if (gDebug > 0 && strcmp(reason, kPROOF_WorkerIdleTO)) {
4545 Info(
"MarkBad",
"worker %s at %s:%d asked to terminate",
4546 wrk->GetOrdinal(), wrk->GetName(), wrk->GetPort());
4550 if (IsMaster() && reason) {
4551 if (strcmp(reason, kPROOF_TerminateWorker)) {
4553 TList *listOfMissingFiles = 0;
4554 if (!(listOfMissingFiles = (TList *)GetOutput(
"MissingFiles"))) {
4555 listOfMissingFiles =
new TList();
4556 listOfMissingFiles->SetName(
"MissingFiles");
4558 fPlayer->AddOutputObject(listOfMissingFiles);
4562 TVirtualPacketizer *packetizer = fPlayer ? fPlayer->GetPacketizer() : 0;
4565 packetizer->MarkBad(wrk, 0, &listOfMissingFiles);
4570 TString ord(wrk->GetOrdinal());
4571 Int_t
id = ord.Last(
'.');
4572 if (
id != kNPOS) ord.Remove(0,
id+1);
4573 gProofServ->ReleaseWorker(ord.Data());
4576 }
else if (TestBit(TProof::kIsClient) && reason && !strcmp(reason, kPROOF_WorkerIdleTO)) {
4581 fActiveSlaves->Remove(wrk);
4584 fAllMonitor->Remove(wrk->GetSocket());
4585 fActiveMonitor->Remove(wrk->GetSocket());
4587 fSendGroupView = kTRUE;
4590 if (reason && !strcmp(reason, kPROOF_TerminateWorker)) {
4593 fSlaves->Remove(wrk);
4594 fBadSlaves->Remove(wrk);
4595 fActiveSlaves->Remove(wrk);
4596 fInactiveSlaves->Remove(wrk);
4597 fUniqueSlaves->Remove(wrk);
4598 fAllUniqueSlaves->Remove(wrk);
4599 fNonUniqueMasters->Remove(wrk);
4603 TSlaveInfo *si =
new TSlaveInfo(
4605 Form(
"%s@%s:%d", wrk->GetUser(), wrk->GetName(), wrk->GetPort()),
4606 0,
"", wrk->GetWorkDir());
4607 if (!fTerminatedSlaveInfos->Contains(si)) fTerminatedSlaveInfos->Add(si);
4612 fBadSlaves->Add(wrk);
4613 fActiveSlaves->Remove(wrk);
4614 fUniqueSlaves->Remove(wrk);
4615 fAllUniqueSlaves->Remove(wrk);
4616 fNonUniqueMasters->Remove(wrk);
4617 if (fCurrentMonitor) fCurrentMonitor->DeActivate(wrk->GetSocket());
4621 Int_t mergersCount = -1;
4622 TParameter<Int_t> *mc =
dynamic_cast<TParameter<Int_t> *
>(GetParameter(
"PROOF_UseMergers"));
4623 if (mc) mergersCount = mc->GetVal();
4625 if (mergersCount == 0) {
4626 Int_t activeWorkers = fCurrentMonitor ? fCurrentMonitor->GetActive() : GetNumberOfActiveSlaves();
4627 if (activeWorkers > 1) {
4628 fMergersCount = TMath::Nint(TMath::Sqrt(activeWorkers));
4629 if (activeWorkers / fMergersCount < 2)
4630 fMergersCount = (Int_t) TMath::Sqrt(activeWorkers);
4641 fSlaves->Remove(wrk);
4643 fManager->DiscardSession(
this);
4651 void TProof::MarkBad(TSocket *s,
const char *reason)
4653 std::lock_guard<std::recursive_mutex> lock(fCloseMutex);
4656 if (!IsValid())
return;
4658 TSlave *wrk = FindSlave(s);
4659 MarkBad(wrk, reason);
4665 void TProof::TerminateWorker(TSlave *wrk)
4668 Warning(
"TerminateWorker",
"worker instance undefined: protocol error? ");
4673 if (wrk->GetSocket() && wrk->GetSocket()->IsValid()) {
4674 TMessage mess(kPROOF_STOP);
4675 wrk->GetSocket()->Send(mess);
4678 Info(
"TerminateWorker",
"connection to worker is already down: cannot"
4679 " send termination message");
4683 MarkBad(wrk, kPROOF_TerminateWorker);
4689 void TProof::TerminateWorker(
const char *ord)
4691 if (ord && strlen(ord) > 0) {
4692 Bool_t all = (ord[0] ==
'*') ? kTRUE : kFALSE;
4696 while ((wrk = (TSlave *)nxw())) {
4697 if (all || !strcmp(wrk->GetOrdinal(), ord)) {
4698 TerminateWorker(wrk);
4703 TMessage mess(kPROOF_STOP);
4704 mess << TString(ord);
4713 Int_t TProof::Ping()
4715 return Ping(kActive);
4721 Int_t TProof::Ping(ESlaves list)
4724 if (list == kAll) slaves = fSlaves;
4725 if (list == kActive) slaves = fActiveSlaves;
4726 if (list == kUnique) slaves = fUniqueSlaves;
4727 if (list == kAllUnique) slaves = fAllUniqueSlaves;
4729 if (slaves->GetSize() == 0)
return 0;
4735 while ((sl = (TSlave *)next())) {
4736 if (sl->IsValid()) {
4737 if (sl->Ping() == -1) {
4738 MarkBad(sl,
"ping unsuccessful");
4751 void TProof::Touch()
4753 TList *slaves = fSlaves;
4755 if (slaves->GetSize() == 0)
return;
4760 while ((sl = (TSlave *)next())) {
4761 if (sl->IsValid()) {
4772 void TProof::Print(Option_t *option)
const
4776 if (TestBit(TProof::kIsClient)) {
4777 Printf(
"Connected to: %s (%s)", GetMaster(),
4778 IsValid() ?
"valid" :
"invalid");
4779 Printf(
"Port number: %d", GetPort());
4780 Printf(
"User: %s", GetUser());
4781 Printf(
"ROOT version|rev: %s|%s", gROOT->GetVersion(), gROOT->GetGitCommit());
4782 Printf(
"Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
4783 gSystem->GetBuildCompilerVersion());
4784 TSlave *sl = (TSlave *)fActiveSlaves->First();
4787 if (sl->GetSocket()->GetSecContext())
4788 Printf(
"Security context: %s",
4789 sl->GetSocket()->GetSecContext()->AsString(sc));
4790 Printf(
"Proofd protocol version: %d", sl->GetSocket()->GetRemoteProtocol());
4792 Printf(
"Security context: Error - No connection");
4793 Printf(
"Proofd protocol version: Error - No connection");
4795 Printf(
"Client protocol version: %d", GetClientProtocol());
4796 Printf(
"Remote protocol version: %d", GetRemoteProtocol());
4797 Printf(
"Log level: %d", GetLogLevel());
4798 Printf(
"Session unique tag: %s", IsValid() ? GetSessionTag() :
"");
4799 Printf(
"Default data pool: %s", IsValid() ? GetDataPoolUrl() :
"");
4801 const_cast<TProof*
>(
this)->SendPrint(option);
4803 const_cast<TProof*
>(
this)->AskStatistics();
4805 Printf(
"*** Master server %s (parallel mode, %d workers):",
4806 gProofServ->GetOrdinal(), GetParallel());
4808 Printf(
"*** Master server %s (sequential mode):",
4809 gProofServ->GetOrdinal());
4811 Printf(
"Master host name: %s", gSystem->HostName());
4812 Printf(
"Port number: %d", GetPort());
4813 if (strlen(gProofServ->GetGroup()) > 0) {
4814 Printf(
"User/Group: %s/%s", GetUser(), gProofServ->GetGroup());
4816 Printf(
"User: %s", GetUser());
4819 ver.Form(
"%s|%s", gROOT->GetVersion(), gROOT->GetGitCommit());
4820 if (gSystem->Getenv(
"ROOTVERSIONTAG"))
4821 ver.Form(
"%s|%s", gROOT->GetVersion(), gSystem->Getenv(
"ROOTVERSIONTAG"));
4822 Printf(
"ROOT version|rev|tag: %s", ver.Data());
4823 Printf(
"Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
4824 gSystem->GetBuildCompilerVersion());
4825 Printf(
"Protocol version: %d", GetClientProtocol());
4826 Printf(
"Image name: %s", GetImage());
4827 Printf(
"Working directory: %s", gSystem->WorkingDirectory());
4828 Printf(
"Config directory: %s", GetConfDir());
4829 Printf(
"Config file: %s", GetConfFile());
4830 Printf(
"Log level: %d", GetLogLevel());
4831 Printf(
"Number of workers: %d", GetNumberOfSlaves());
4832 Printf(
"Number of active workers: %d", GetNumberOfActiveSlaves());
4833 Printf(
"Number of unique workers: %d", GetNumberOfUniqueSlaves());
4834 Printf(
"Number of inactive workers: %d", GetNumberOfInactiveSlaves());
4835 Printf(
"Number of bad workers: %d", GetNumberOfBadSlaves());
4836 Printf(
"Total MB's processed: %.2f",
float(GetBytesRead())/(1024*1024));
4837 Printf(
"Total real time used (s): %.3f", GetRealTime());
4838 Printf(
"Total CPU time used (s): %.3f", GetCpuTime());
4839 if (TString(option).Contains(
"a", TString::kIgnoreCase) && GetNumberOfSlaves()) {
4840 Printf(
"List of workers:");
4842 TIter nextslave(fSlaves);
4843 while (TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
4844 if (!sl->IsValid())
continue;
4846 if (sl->GetSlaveType() == TSlave::kSlave) {
4848 }
else if (sl->GetSlaveType() == TSlave::kMaster) {
4849 TMessage mess(kPROOF_PRINT);
4850 mess.WriteString(option);
4851 if (sl->GetSocket()->Send(mess) == -1)
4852 const_cast<TProof*>(
this)->MarkBad(sl,
"could not send kPROOF_PRINT request");
4856 Error(
"Print",
"TSlave is neither Master nor Worker");
4860 const_cast<TProof*
>(
this)->Collect(&masters, fCollectTimeout);
4906 Int_t TProof::HandleOutputOptions(TString &opt, TString &target, Int_t action)
4908 TString outfile, dsname, stfopt;
4910 TString tagf, tagd, tags, oo;
4911 Ssiz_t from = 0, iof = kNPOS, iod = kNPOS, ios = kNPOS;
4912 while (opt.Tokenize(oo, from,
"[; ]")) {
4913 if (oo.BeginsWith(
"of=")) {
4915 iof = opt.Index(tagf);
4916 }
else if (oo.BeginsWith(
"outfile=")) {
4918 iof = opt.Index(tagf);
4919 }
else if (oo.BeginsWith(
"ds")) {
4921 iod = opt.Index(tagd);
4922 }
else if (oo.BeginsWith(
"dataset")) {
4924 iod = opt.Index(tagd);
4925 }
else if (oo.BeginsWith(
"stf")) {
4927 ios = opt.Index(tags);
4928 }
else if (oo.BeginsWith(
"savetofile")) {
4929 tags =
"savetofile";
4930 ios = opt.Index(tags);
4934 if (iof != kNPOS && iod != kNPOS) {
4935 Error(
"HandleOutputOptions",
"options 'of'/'outfile' and 'ds'/'dataset' are incompatible!");
4941 from = iof + tagf.Length();
4942 if (!opt.Tokenize(outfile, from,
"[; ]") || outfile.IsNull()) {
4943 Error(
"HandleOutputOptions",
"could not extract output file settings string! (%s)", opt.Data());
4951 from = iod + tagd.Length();
4952 if (!opt.Tokenize(dsname, from,
"[; ]"))
4953 if (gDebug > 0) Info(
"HandleOutputOptions",
"no dataset name found: use default");
4957 if (dsname.BeginsWith(
"=")) dsname.Replace(0, 1,
"");
4958 if (dsname.Contains(
"|V")) {
4960 dsname.ReplaceAll(
"|V",
"");
4962 if (dsname.IsNull()) dsname =
"dataset_<qtag>";
4966 from = ios + tags.Length();
4967 if (!opt.Tokenize(stfopt, from,
"[; ]"))
4968 if (gDebug > 0) Info(
"HandleOutputOptions",
"save-to-file not found: use default");
4972 if (!stfopt.IsNull()) {
4973 if (stfopt.BeginsWith(
"=")) stfopt.Replace(0,1,
"");
4974 if (!stfopt.IsNull()) {
4975 if (!stfopt.IsDigit()) {
4976 Error(
"HandleOutputOptions",
"save-to-file option must be a digit! (%s)", stfopt.Data());
4989 opt.ReplaceAll(tagf,
"");
4990 opt.ReplaceAll(tagd,
"");
4991 opt.ReplaceAll(tags,
"");
4997 if (!outfile.IsNull()) {
4998 if (!outfile.BeginsWith(
"master:")) {
4999 if (gSystem->AccessPathName(gSystem->DirName(outfile.Data()), kWritePermission)) {
5000 Warning(
"HandleOutputOptions",
5001 "directory '%s' for the output file does not exists or is not writable:"
5002 " saving to master", gSystem->DirName(outfile.Data()));
5003 outfile.Form(
"master:%s", gSystem->BaseName(outfile.Data()));
5008 if (!stfopt.IsNull()) {
5009 outfile.Form(
"master:%s", gSystem->BaseName(target.Data()));
5016 if (outfile.BeginsWith(
"master:")) {
5017 outfile.ReplaceAll(
"master:",
"");
5018 if (outfile.IsNull() || !gSystem->IsAbsoluteFileName(outfile)) {
5022 if (Exec(
"gProofServ->GetDataDir()",
"0", kTRUE) == 0) {
5023 TObjString *os = fMacroLog.GetLineWith(
"const char");
5025 Ssiz_t fst = os->GetString().First(
'\"');
5026 Ssiz_t lst = os->GetString().Last(
'\"');
5027 ddir = os->GetString()(fst+1, lst-fst-1);
5029 emsg =
"could not find 'const char *' string in macro log! cannot continue";
5032 emsg =
"could not retrieve master data directory info! cannot continue";
5034 if (!emsg.IsNull()) {
5035 Error(
"HandleOutputOptions",
"%s", emsg.Data());
5039 if (!ddir.IsNull()) ddir +=
"/";
5040 if (outfile.IsNull()) {
5041 outfile.Form(
"%s<file>", ddir.Data());
5043 outfile.Insert(0, TString::Format(
"%s", ddir.Data()));
5048 if (!outfile.IsNull()) {
5049 if (!outfile.BeginsWith(
"of:")) outfile.Insert(0,
"of:");
5050 SetParameter(
"PROOF_DefaultOutputOption", outfile.Data());
5054 if (!dsname.IsNull()) {
5055 dsname.Insert(0,
"ds:");
5057 SetParameter(
"PROOF_DefaultOutputOption", dsname.Data());
5059 if (!stfopt.IsNull()) {
5060 Int_t ostf = (Int_t) stfopt.Atoi();
5062 Warning(
"HandleOutputOptions",
"Dataset required bu Save-To-File disabled: enabling!");
5063 stfopt.Form(
"%d", ostf+1);
5071 if (!stfopt.IsNull()) {
5073 SetParameter(
"PROOF_SavePartialResults", (Int_t) stfopt.Atoi());
5077 if (GetOutputList()) {
5078 if (target ==
"ds|V") {
5081 TIter nxo(GetOutputList());
5083 while ((o = nxo())) {
5084 if (o->InheritsFrom(TFileCollection::Class())) {
5085 VerifyDataSet(o->GetName());
5086 dsname = o->GetName();
5090 if (!dsname.IsNull()) {
5091 TFileCollection *fc = GetDataSet(dsname);
5095 Warning(
"HandleOutputOptions",
"could not retrieve TFileCollection for dataset '%s'", dsname.Data());
5098 Warning(
"HandleOutputOptions",
"dataset not found!");
5101 Bool_t targetcopied = kFALSE;
5102 TProofOutputFile *pf = 0;
5103 if (!target.IsNull())
5104 pf = (TProofOutputFile *) GetOutputList()->FindObject(gSystem->BaseName(target.Data()));
5107 if (strcmp(TUrl(pf->GetOutputFileName(), kTRUE).GetUrl(),
5108 TUrl(target, kTRUE).GetUrl())) {
5109 if (TFile::Cp(pf->GetOutputFileName(), target)) {
5110 Printf(
" Output successfully copied to %s", target.Data());
5111 targetcopied = kTRUE;
5113 Warning(
"HandleOutputOptions",
"problems copying output to %s", target.Data());
5119 TIter nxo(GetOutputList());
5120 Bool_t swapcopied = kFALSE;
5121 while ((o = nxo())) {
5122 TProofOutputFile *pof =
dynamic_cast<TProofOutputFile *
>(o);
5124 if (pof->TestBit(TProofOutputFile::kSwapFile) && !target.IsNull()) {
5125 if (pof == pf && targetcopied)
continue;
5127 if (strcmp(TUrl(pf->GetOutputFileName(), kTRUE).GetUrl(),
5128 TUrl(target, kTRUE).GetUrl())) {
5129 if (TFile::Cp(pof->GetOutputFileName(), target)) {
5130 Printf(
" Output successfully copied to %s", target.Data());
5133 Warning(
"HandleOutputOptions",
"problems copying output to %s", target.Data());
5136 }
else if (pof->IsRetrieve()) {
5138 if (strcmp(TUrl(pf->GetOutputFileName(), kTRUE).GetUrl(),
5139 TUrl(pof->GetTitle(), kTRUE).GetUrl())) {
5140 if (TFile::Cp(pof->GetOutputFileName(), pof->GetTitle())) {
5141 Printf(
" Output successfully copied to %s", pof->GetTitle());
5143 Warning(
"HandleOutputOptions",
5144 "problems copying %s to %s", pof->GetOutputFileName(), pof->GetTitle());
5150 if (!target.IsNull() && !swapcopied) {
5152 fout = TFile::Open(target,
"RECREATE");
5153 if (!fout || (fout && fout->IsZombie())) {
5155 Warning(
"HandleOutputOptions",
"problems opening output file %s", target.Data());
5160 while ((o = nxo())) {
5161 TProofOutputFile *pof =
dynamic_cast<TProofOutputFile *
>(o);
5173 Printf(
" Output saved to %s", target.Data());
5178 DeleteParameters(
"PROOF_DefaultOutputOption");
5180 DeleteParameters(
"PROOF_SavePartialResults");
5201 void TProof::SetFeedback(TString &opt, TString &optfb, Int_t action)
5204 if (action == 0 || (action == 1 && optfb.IsNull())) {
5206 Ssiz_t ifb = opt.Index(tag);
5209 ifb = opt.Index(tag);
5211 if (ifb == kNPOS)
return;
5212 from = ifb + tag.Length();
5214 if (!opt.Tokenize(optfb, from,
"[; ]") || optfb.IsNull()) {
5215 Warning(
"SetFeedback",
"could not extract feedback string! Ignoring ...");
5220 opt.ReplaceAll(tag,
"");
5224 TString nm, startdraw, stopdraw;
5226 while (optfb.Tokenize(nm, from,
",")) {
5228 if (nm ==
"stats") {
5230 startdraw.Form(
"gDirectory->Add(new TStatsFeedback((TProof *)%p))",
this);
5231 gROOT->ProcessLine(startdraw.Data());
5232 SetParameter(
"PROOF_StatsHist",
"");
5233 AddFeedback(
"PROOF_EventsHist");
5234 AddFeedback(
"PROOF_PacketsHist");
5235 AddFeedback(
"PROOF_ProcPcktHist");
5237 stopdraw.Form(
"TObject *o = gDirectory->FindObject(\"%s\"); "
5238 " if (o && strcmp(o->ClassName(), \"TStatsFeedback\")) "
5239 " { gDirectory->Remove(o); delete o; }", GetSessionTag());
5240 gROOT->ProcessLine(stopdraw.Data());
5241 DeleteParameters(
"PROOF_StatsHist");
5242 RemoveFeedback(
"PROOF_EventsHist");
5243 RemoveFeedback(
"PROOF_PacketsHist");
5244 RemoveFeedback(
"PROOF_ProcPcktHist");
5250 startdraw.Form(
"gDirectory->Add(new TDrawFeedback((TProof *)%p))",
this);
5251 gROOT->ProcessLine(startdraw.Data());
5255 stopdraw.Form(
"TObject *o = gDirectory->FindObject(\"%s\"); "
5256 " if (o && strcmp(o->ClassName(), \"TDrawFeedback\")) "
5257 " { gDirectory->Remove(o); delete o; }", GetSessionTag());
5258 gROOT->ProcessLine(stopdraw.Data());
5272 Long64_t TProof::Process(TDSet *dset,
const char *selector, Option_t *option,
5273 Long64_t nentries, Long64_t first)
5275 if (!IsValid() || !fPlayer)
return -1;
5278 SetRunStatus(TProof::kRunning);
5280 TString opt(option), optfb, outfile;
5282 if (opt.Contains(
"fb=") || opt.Contains(
"feedback=")) SetFeedback(opt, optfb, 0);
5284 if (HandleOutputOptions(opt, outfile, 0) != 0)
return -1;
5287 fSync = (GetQueryMode(opt) == kSync);
5289 if (fSync && (!IsIdle() || IsWaiting())) {
5291 Info(
"Process",
"session is in waiting or processing status: switch to asynchronous mode");
5293 opt.ReplaceAll(
"SYNC",
"");
5298 if ((IsIdle() && !IsWaiting()) && fRunningDSets && fRunningDSets->GetSize() > 0) {
5299 fRunningDSets->SetOwner(kTRUE);
5300 fRunningDSets->Delete();
5305 TSignalHandler *sh = 0;
5308 sh = gSystem->RemoveSignalHandler(gApplication->GetSignalHandler());
5312 fOutputList.Clear();
5315 if (fWrksOutputReady) {
5316 fWrksOutputReady->SetOwner(kFALSE);
5317 fWrksOutputReady->Clear();
5321 TProof::AssertMacroPath(selector);
5327 if (selector && strlen(selector)) {
5328 rv = fPlayer->Process(dset, selector, opt.Data(), nentries, first);
5329 }
else if (fSelector) {
5330 rv = fPlayer->Process(dset, fSelector, opt.Data(), nentries, first);
5332 Error(
"Process",
"neither a selecrot file nor a selector object have"
5333 " been specified: cannot process!");
5338 Float_t rt = fQuerySTW.RealTime();
5340 TQueryResult *qr = GetQueryResult();
5342 qr->SetTermTime(rt);
5343 qr->SetPrepTime(fPrepTime);
5347 if (!optfb.IsNull()) SetFeedback(opt, optfb, 1);
5349 if (HandleOutputOptions(opt, outfile, 1) != 0)
return -1;
5353 TParameter<Long64_t> *sst =
5354 (TParameter<Long64_t> *) fOutputList.FindObject(
"PROOF_SelectorStatus");
5355 if (sst) rv = sst->GetVal();
5361 gSystem->AddSignalHandler(sh);
5363 if (!fPerfTree.IsNull()) {
5364 if (SavePerfTree() != 0) Error(
"Process",
"saving performance info ...");
5381 Long64_t TProof::Process(TFileCollection *fc,
const char *selector,
5382 Option_t *option, Long64_t nentries, Long64_t first)
5384 if (!IsValid() || !fPlayer)
return -1;
5386 if (fProtocol < 17) {
5387 Info(
"Process",
"server version < 5.18/00:"
5388 " processing of TFileCollection not supported");
5394 TDSet *dset =
new TDSet(TString::Format(
"TFileCollection:%s", fc->GetName()), 0, 0,
"");
5395 fPlayer->AddInput(fc);
5398 Long64_t retval = -1;
5399 if (selector && strlen(selector)) {
5400 retval = Process(dset, selector, option, nentries, first);
5401 }
else if (fSelector) {
5402 retval = Process(dset, fSelector, option, nentries, first);
5404 Error(
"Process",
"neither a selecrot file nor a selector object have"
5405 " been specified: cannot process!");
5407 fPlayer->GetInputList()->Remove(fc);
5410 if (IsLite() && !fSync) {
5411 if (!fRunningDSets) fRunningDSets =
new TList;
5412 fRunningDSets->Add(dset);
5472 Long64_t TProof::Process(
const char *dsetname,
const char *selector,
5473 Option_t *option, Long64_t nentries,
5474 Long64_t first, TObject *elist)
5476 if (fProtocol < 13) {
5477 Info(
"Process",
"processing 'by name' not supported by the server");
5481 TString dsname, fname(dsetname);
5487 const char *separator = (fname.EndsWith(
",")) ?
"," :
"|";
5488 if (!strcmp(separator,
",") || fname.EndsWith(
"|")) fname.Remove(fname.Length()-1, 1);
5489 if (!(gSystem->AccessPathName(fname, kReadPermission))) {
5490 TUrl uf(fname, kTRUE);
5491 uf.SetOptions(TString::Format(
"%sfiletype=raw", uf.GetOptions()));
5492 TFile *f = TFile::Open(uf.GetUrl());
5493 if (f && !(f->IsZombie())) {
5494 const Int_t blen = 8192;
5496 Long64_t rest = f->GetSize();
5498 Long64_t len = (rest > blen - 1) ? blen - 1 : rest;
5499 if (f->ReadBuffer(buf, len)) {
5500 Error(
"Process",
"problems reading from file '%s'", fname.Data());
5511 if (rest > 0)
return -1;
5513 Error(
"Process",
"could not open file '%s'", fname.Data());
5517 if (dsname.IsNull()) {
5521 if (dsname.EndsWith(
"\n")) dsname.Remove(dsname.Length()-1, 1);
5523 dsname.ReplaceAll(
"\n", separator);
5525 Info(
"Process",
"processing multi-dataset read from file '%s':", fname.Data());
5526 Info(
"Process",
" '%s'", dsname.Data());
5530 TString names(dsname), name, enl, newname;
5532 if (fProtocol < 28 && names.Index(TRegexp(
"[, |]")) != kNPOS) {
5533 Info(
"Process",
"multi-dataset processing not supported by the server");
5538 TString dsobj, dsdir;
5540 while (names.Tokenize(name, from,
"[, |]")) {
5545 Int_t ienl = name.Index(
"?enl=");
5546 if (ienl == kNPOS) {
5547 ienl = name.Index(
"<<");
5548 if (ienl != kNPOS) {
5549 newname.Remove(ienl);
5550 ienl += strlen(
"<<");
5553 newname.Remove(ienl);
5554 ienl += strlen(
"?enl=");
5558 TString obj, dir(
"/");
5559 Int_t idxc = newname.Index(
"#");
5560 if (idxc != kNPOS) {
5561 Int_t idxs = newname.Index(
"/", 1, idxc, TString::kExact);
5562 if (idxs != kNPOS) {
5563 obj = newname(idxs+1, newname.Length());
5564 dir = newname(idxc+1, newname.Length());
5565 dir.Remove(dir.Index(
"/") + 1);
5566 newname.Remove(idxc);
5568 obj = newname(idxc+1, newname.Length());
5569 newname.Remove(idxc);
5571 }
else if (newname.Index(
":") != kNPOS && newname.Index(
"://") == kNPOS) {
5573 Error(
"Process",
"bad name syntax (%s): please use"
5574 " a '#' after the dataset name", name.Data());
5575 dsname.ReplaceAll(name,
"");
5578 if (dsobj.IsNull() && dsdir.IsNull()) {
5582 }
else if (obj != dsobj || dir != dsdir) {
5584 Warning(
"Process",
"'obj' or 'dir' specification not consistent w/ the first given: ignore");
5587 if (ienl != kNPOS) {
5589 enl = name(ienl, name.Length());
5593 TList *inpl = GetInputList();
5594 if (inpl && (oel = inpl->FindObject(enl))) el =
dynamic_cast<TEntryList *
>(oel);
5596 if (!el && gDirectory && (oel = gDirectory->FindObject(enl))) {
5597 if ((el = dynamic_cast<TEntryList *>(oel))) {
5600 if (fProtocol >= 28)
5601 if (!(inpl->FindObject(el->GetName()))) AddInput(el);
5606 if (!gSystem->AccessPathName(enl)) {
5607 TFile *f = TFile::Open(enl);
5608 if (f && !(f->IsZombie()) && f->GetListOfKeys()) {
5609 TIter nxk(f->GetListOfKeys());
5611 while ((k = (TKey *) nxk())) {
5612 if (!strcmp(k->GetClassName(),
"TEntryList")) {
5614 if ((el = dynamic_cast<TEntryList *>(f->Get(k->GetName())))) {
5617 if (fProtocol >= 28) {
5618 if (!(inpl->FindObject(el->GetName()))) {
5619 el = (TEntryList *) el->Clone();
5623 el = (TEntryList *) el->Clone();
5626 }
else if (strcmp(el->GetName(), k->GetName())) {
5627 Warning(
"Process",
"multiple entry lists found in file '%s': the first one is taken;\n"
5628 "if this is not what you want, load first the content in memory"
5629 "and select it by name ", enl.Data());
5634 Warning(
"Process",
"file '%s' cannot be open or is empty - ignoring", enl.Data());
5639 if (fProtocol >= 28) {
5643 newname += el->GetName();
5652 dsname.ReplaceAll(name, newname);
5656 TDSet *dset =
new TDSet(dsname, dsobj, dsdir);
5658 if (el && fProtocol < 28) {
5659 dset->SetEntryList(el);
5661 dset->SetEntryList(elist);
5664 Long64_t retval = -1;
5665 if (selector && strlen(selector)) {
5666 retval = Process(dset, selector, option, nentries, first);
5667 }
else if (fSelector) {
5668 retval = Process(dset, fSelector, option, nentries, first);
5670 Error(
"Process",
"neither a selector file nor a selector object have"
5671 " been specified: cannot process!");
5674 if (IsLite() && !fSync) {
5675 if (!fRunningDSets) fRunningDSets =
new TList;
5676 fRunningDSets->Add(dset);
5690 Long64_t TProof::Process(
const char *selector, Long64_t n, Option_t *option)
5692 if (!IsValid())
return -1;
5694 if (fProtocol < 16) {
5695 Info(
"Process",
"server version < 5.17/04: generic processing not supported");
5700 TDSet *dset =
new TDSet;
5701 dset->SetBit(TDSet::kEmpty);
5703 Long64_t retval = -1;
5704 if (selector && strlen(selector)) {
5705 retval = Process(dset, selector, option, n);
5706 }
else if (fSelector) {
5707 retval = Process(dset, fSelector, option, n);
5709 Error(
"Process",
"neither a selector file nor a selector object have"
5710 " been specified: cannot process!");
5714 if (IsLite() && !fSync) {
5715 if (!fRunningDSets) fRunningDSets =
new TList;
5716 fRunningDSets->Add(dset);
5730 Long64_t TProof::Process(TDSet *dset, TSelector *selector, Option_t *option,
5731 Long64_t nentries, Long64_t first)
5733 if (fProtocol < 34) {
5734 Error(
"Process",
"server version < 5.33/02:"
5735 "processing by object not supported");
5739 Error(
"Process",
"selector object undefined!");
5742 fSelector = selector;
5743 Long64_t rc = Process(dset, (
const char*)0, option, nentries, first);
5756 Long64_t TProof::Process(TFileCollection *fc, TSelector *selector,
5757 Option_t *option, Long64_t nentries, Long64_t first)
5759 if (fProtocol < 34) {
5760 Error(
"Process",
"server version < 5.33/02:"
5761 "processing by object not supported");
5765 Error(
"Process",
"selector object undefined!");
5768 fSelector = selector;
5769 Long64_t rc = Process(fc, (
const char*)0, option, nentries, first);
5778 Long64_t TProof::Process(
const char *dsetname, TSelector *selector,
5779 Option_t *option, Long64_t nentries,
5780 Long64_t first, TObject *elist)
5782 if (fProtocol < 34) {
5783 Error(
"Process",
"server version < 5.33/02:"
5784 "processing by object not supported");
5788 Error(
"Process",
"selector object undefined!");
5791 fSelector = selector;
5792 Long64_t rc = Process(dsetname, (
const char*)0, option, nentries, first, elist);
5804 Long64_t TProof::Process(TSelector *selector, Long64_t n, Option_t *option)
5806 if (fProtocol < 34) {
5807 Error(
"Process",
"server version < 5.33/02:"
5808 "processing by object not supported");
5812 Error(
"Process",
"selector object undefined!");
5815 fSelector = selector;
5816 Long64_t rc = Process((
const char*)0, n, option);
5826 Int_t TProof::GetQueryReference(Int_t qry, TString &ref)
5833 TIter nxq(fQueries);
5834 TQueryResult *qr = 0;
5835 while ((qr = (TQueryResult *) nxq()))
5836 if (qr->GetSeqNum() == qry) {
5837 ref.Form(
"%s:%s", qr->GetTitle(), qr->GetName());
5852 Long64_t TProof::Finalize(Int_t qry, Bool_t force)
5857 if (GetQueryReference(qry, ref) == 0) {
5858 return Finalize(ref, force);
5860 Info(
"Finalize",
"query #%d not found", qry);
5864 return Finalize(
"", force);
5877 Long64_t TProof::Finalize(
const char *ref, Bool_t force)
5881 TQueryResult *qr = (ref && strlen(ref) > 0) ? fPlayer->GetQueryResult(ref)
5883 Bool_t retrieve = kFALSE;
5886 if (!xref.IsNull()) {
5890 if (qr->IsFinalized()) {
5894 Info(
"Finalize",
"query already finalized:"
5895 " use Finalize(<qry>,kTRUE) to force new retrieval");
5900 xref.Form(
"%s:%s", qr->GetTitle(), qr->GetName());
5904 Retrieve(xref.Data());
5905 qr = fPlayer->GetQueryResult(xref.Data());
5908 return fPlayer->Finalize(qr);
5917 Int_t TProof::Retrieve(Int_t qry,
const char *path)
5921 if (GetQueryReference(qry, ref) == 0)
5922 return Retrieve(ref, path);
5924 Info(
"Retrieve",
"query #%d not found", qry);
5926 Info(
"Retrieve",
"positive argument required - do nothing");
5936 Int_t TProof::Retrieve(
const char *ref,
const char *path)
5939 TMessage m(kPROOF_RETRIEVE);
5941 Broadcast(m, kActive);
5942 Collect(kActive, fCollectTimeout);
5948 TQueryResult *qr = fPlayer ? fPlayer->GetQueryResult(ref) : 0;
5952 TFile *farc = TFile::Open(path,
"UPDATE");
5953 if (!farc || (farc && !(farc->IsOpen()))) {
5954 Info(
"Retrieve",
"archive file cannot be open (%s)", path);
5960 qr->SetArchived(path);
5969 Info(
"Retrieve",
"query not found after retrieve");
5982 Int_t TProof::Remove(Int_t qry, Bool_t all)
5986 if (GetQueryReference(qry, ref) == 0)
5987 return Remove(ref, all);
5989 Info(
"Remove",
"query #%d not found", qry);
5991 Info(
"Remove",
"positive argument required - do nothing");
6003 Int_t TProof::Remove(
const char *ref, Bool_t all)
6008 fPlayer->RemoveQueryResult(ref);
6011 if (IsLite())
return 0;
6014 TMessage m(kPROOF_REMOVE);
6016 Broadcast(m, kActive);
6017 Collect(kActive, fCollectTimeout);
6026 Int_t TProof::Archive(Int_t qry,
const char *path)
6030 if (GetQueryReference(qry, ref) == 0)
6031 return Archive(ref, path);
6033 Info(
"Archive",
"query #%d not found", qry);
6035 Info(
"Archive",
"positive argument required - do nothing");
6046 Int_t TProof::Archive(
const char *ref,
const char *path)
6049 TMessage m(kPROOF_ARCHIVE);
6050 m << TString(ref) << TString(path);
6051 Broadcast(m, kActive);
6052 Collect(kActive, fCollectTimeout);
6061 Int_t TProof::CleanupSession(
const char *sessiontag)
6064 TMessage m(kPROOF_CLEANUPSESSION);
6065 m << TString(sessiontag);
6066 Broadcast(m, kActive);
6067 Collect(kActive, fCollectTimeout);
6076 void TProof::SetQueryMode(EQueryMode mode)
6081 Info(
"SetQueryMode",
"query mode is set to: %s", fQueryMode == kSync ?
6088 TProof::EQueryMode TProof::GetQueryMode(Option_t *mode)
const
6090 EQueryMode qmode = fQueryMode;
6092 if (mode && (strlen(mode) > 0)) {
6095 if (m.Contains(
"ASYN")) {
6097 }
else if (m.Contains(
"SYNC")) {
6103 Info(
"GetQueryMode",
"query mode is set to: %s", qmode == kSync ?
6115 Long64_t TProof::DrawSelect(TDSet *dset,
const char *varexp,
6116 const char *selection, Option_t *option,
6117 Long64_t nentries, Long64_t first)
6119 if (!IsValid() || !fPlayer)
return -1;
6123 Info(
"DrawSelect",
"not idle, asynchronous Draw not supported");
6126 TString opt(option);
6127 Int_t idx = opt.Index(
"ASYN", 0, TString::kIgnoreCase);
6129 opt.Replace(idx,4,
"");
6131 return fPlayer->DrawSelect(dset, varexp, selection, opt, nentries, first);
6151 Long64_t TProof::DrawSelect(
const char *dsetname,
const char *varexp,
6152 const char *selection, Option_t *option,
6153 Long64_t nentries, Long64_t first, TObject *enl)
6155 if (fProtocol < 13) {
6156 Info(
"Process",
"processing 'by name' not supported by the server");
6160 TString name(dsetname);
6163 Int_t idxc = name.Index(
"#");
6164 if (idxc != kNPOS) {
6165 Int_t idxs = name.Index(
"/", 1, idxc, TString::kExact);
6166 if (idxs != kNPOS) {
6167 obj = name(idxs+1, name.Length());
6168 dir = name(idxc+1, name.Length());
6169 dir.Remove(dir.Index(
"/") + 1);
6172 obj = name(idxc+1, name.Length());
6175 }
else if (name.Index(
":") != kNPOS && name.Index(
"://") == kNPOS) {
6177 Error(
"DrawSelect",
"bad name syntax (%s): please use"
6178 " a '#' after the dataset name", dsetname);
6182 TDSet *dset =
new TDSet(name, obj, dir);
6184 dset->SetEntryList(enl);
6185 Long64_t retval = DrawSelect(dset, varexp, selection, option, nentries, first);
6193 void TProof::StopProcess(Bool_t abort, Int_t timeout)
6196 Info("StopProcess","enter %d", abort);
6202 ERunStatus rst = abort ? TProof::kAborted : TProof::kStopped;
6206 fPlayer->StopProcess(abort, timeout);
6210 if (TestBit(TProof::kIsClient) || abort)
6211 InterruptCurrentMonitor();
6213 if (fSlaves->GetSize() == 0)
6218 TIter next(fSlaves);
6219 while ((sl = (TSlave *)next()))
6222 sl->StopProcess(abort, timeout);
6228 void TProof::DisableGoAsyn()
6230 Emit(
"DisableGoAsyn()");
6236 void TProof::GoAsynchronous()
6238 if (!IsValid())
return;
6240 if (GetRemoteProtocol() < 22) {
6241 Info(
"GoAsynchronous",
"functionality not supported by the server - ignoring");
6245 if (fSync && !IsIdle()) {
6246 TMessage m(kPROOF_GOASYNC);
6249 Info(
"GoAsynchronous",
"either idle or already in asynchronous mode - ignoring");
6256 void TProof::RecvLogFile(TSocket *s, Int_t size)
6258 const Int_t kMAXBUF = 16384;
6262 if (fSaveLogToMacro && fMacroLog.GetListOfLines()) {
6263 fMacroLog.GetListOfLines()->SetOwner(kTRUE);
6264 fMacroLog.GetListOfLines()->Clear();
6269 if (!fLogToWindowOnly) {
6270 fdout = (fRedirLog) ? fileno(fLogFileW) : fileno(stdout);
6272 Warning(
"RecvLogFile",
"file descriptor for outputs undefined (%d):"
6273 " will not log msgs", fdout);
6276 lseek(fdout, (off_t) 0, SEEK_END);
6280 Long_t filesize = 0;
6282 while (filesize < size) {
6283 left = Int_t(size - filesize);
6284 if (left >= kMAXBUF)
6286 rec = s->RecvRaw(&buf, left);
6287 filesize = (rec > 0) ? (filesize + rec) : filesize;
6288 if (!fLogToWindowOnly && !fSaveLogToMacro) {
6296 w = write(fdout, p, r);
6299 SysError(
"RecvLogFile",
"error writing to unit: %d", fdout);
6305 }
else if (rec < 0) {
6306 Error(
"RecvLogFile",
"error during receiving log file");
6312 EmitVA(
"LogMessage(const char*,Bool_t)", 2, buf, kFALSE);
6314 if (fSaveLogToMacro) fMacroLog.AddLine(buf);
6319 if (fRedirLog && IsIdle() && !TestBit(TProof::kIsMaster))
6327 void TProof::NotifyLogMsg(
const char *msg,
const char *sfx)
6331 if (!msg || (len = strlen(msg)) <= 0)
6335 Int_t lsfx = (sfx) ? strlen(sfx) : 0;
6339 if (!fLogToWindowOnly) {
6340 fdout = (fRedirLog) ? fileno(fLogFileW) : fileno(stdout);
6342 Warning(
"NotifyLogMsg",
"file descriptor for outputs undefined (%d):"
6343 " will not notify msgs", fdout);
6346 lseek(fdout, (off_t) 0, SEEK_END);
6349 if (!fLogToWindowOnly) {
6352 char *p = (
char *)msg;
6355 Int_t w = write(fdout, p, r);
6357 SysError(
"NotifyLogMsg",
"error writing to unit: %d", fdout);
6365 if (write(fdout, sfx, lsfx) != lsfx)
6366 SysError(
"NotifyLogMsg",
"error writing to unit: %d", fdout);
6372 EmitVA(
"LogMessage(const char*,Bool_t)", 2, msg, kFALSE);
6376 if (fRedirLog && IsIdle())
6383 void TProof::LogMessage(
const char *msg, Bool_t all)
6386 Info("LogMessage","Enter ... %s, 'all: %s", msg ? msg : "",
6387 all ? "true" : "false");
6389 if (gROOT->IsBatch()) {
6390 PDB(kGlobal,1) Info("LogMessage","GUI not started - use TProof::ShowLog()");
6395 EmitVA("LogMessage(const
char*,Bool_t)", 2, msg, all);
6401 lseek(fileno(fLogFileR), (off_t) 0, SEEK_SET);
6403 const Int_t kMAXBUF = 32768;
6407 while ((len = read(fileno(fLogFileR), buf, kMAXBUF-1)) < 0 &&
6408 TSystem::GetErrno() == EINTR)
6409 TSystem::ResetErrno();
6412 Error(
"LogMessage",
"error reading log file");
6418 EmitVA(
"LogMessage(const char*,Bool_t)", 2, buf, kFALSE);
6429 Int_t TProof::SendGroupView()
6431 if (!IsValid())
return -1;
6432 if (TestBit(TProof::kIsClient))
return 0;
6433 if (!fSendGroupView)
return 0;
6434 fSendGroupView = kFALSE;
6436 TIter next(fActiveSlaves);
6439 int bad = 0, cnt = 0, size = GetNumberOfActiveSlaves();
6442 while ((sl = (TSlave *)next())) {
6443 snprintf(str, 32,
"%d %d", cnt, size);
6444 if (sl->GetSocket()->Send(str, kPROOF_GROUPVIEW) == -1) {
6445 MarkBad(sl,
"could not send kPROOF_GROUPVIEW message");
6454 if (bad) SendGroupView();
6456 return GetNumberOfActiveSlaves();
6464 Bool_t TProof::GetFileInCmd(
const char *cmd, TString &fn)
6467 s = s.Strip(TString::kBoth);
6469 if (s.Length() > 0 &&
6470 (s.BeginsWith(
".L") || s.BeginsWith(
".x") || s.BeginsWith(
".X"))) {
6471 TString file = s(2, s.Length());
6472 TString acm, arg, io;
6473 fn = gSystem->SplitAclicMode(file, acm, arg, io);
6490 Int_t TProof::Exec(
const char *cmd, Bool_t plusMaster)
6492 return Exec(cmd, kActive, plusMaster);
6502 Int_t TProof::Exec(
const char *cmd, ESlaves list, Bool_t plusMaster)
6504 if (!IsValid())
return -1;
6507 s = s.Strip(TString::kBoth);
6509 if (!s.Length())
return 0;
6513 if (TProof::GetFileInCmd(s.Data(), filename)) {
6514 char *fn = gSystem->Which(TROOT::GetMacroPath(), filename, kReadPermission);
6516 if (GetNumberOfUniqueSlaves() > 0) {
6517 if (SendFile(fn, kAscii | kForward | kCpBin) < 0) {
6518 Error(
"Exec",
"file %s could not be transfered", fn);
6523 TString scmd = s(0,3) + fn;
6524 Int_t n = SendCommand(scmd, list);
6529 Error(
"Exec",
"macro %s not found", filename.Data());
6537 gROOT->ProcessLine(cmd);
6539 DeactivateWorker(
"*");
6540 Int_t res = SendCommand(cmd, list);
6541 ActivateWorker(
"restore");
6546 return SendCommand(cmd, list);
6558 Int_t TProof::Exec(
const char *cmd,
const char *ord, Bool_t logtomacro)
6560 if (!IsValid())
return -1;
6563 s = s.Strip(TString::kBoth);
6565 if (!s.Length())
return 0;
6569 gROOT->ProcessLine(cmd);
6571 Bool_t oldRedirLog = fRedirLog;
6574 DeactivateWorker(
"*");
6577 if (strcmp(ord,
"master") && strcmp(ord,
"0")) ActivateWorker(ord);
6579 Bool_t oldSaveLog = fSaveLogToMacro;
6580 fSaveLogToMacro = logtomacro;
6581 res = SendCommand(cmd, kActive);
6582 fSaveLogToMacro = oldSaveLog;
6584 ActivateWorker(
"restore");
6585 fRedirLog = oldRedirLog;
6600 Int_t TProof::SendCommand(
const char *cmd, ESlaves list)
6602 if (!IsValid())
return -1;
6604 Broadcast(cmd, kMESS_CINT, list);
6613 TString TProof::Getenv(
const char *env,
const char *ord)
6616 TString cmd = TString::Format(
"gSystem->Getenv(\"%s\")", env);
6617 if (Exec(cmd.Data(), ord, kTRUE) != 0)
return TString(
"");
6619 TObjString *os = fMacroLog.GetLineWith(
"const char");
6623 os->GetString().Tokenize(info, from,
"\"");
6624 os->GetString().Tokenize(info, from,
"\"");
6625 if (gDebug > 0) Printf(
"%s: '%s'", env, info.Data());
6634 Int_t TProof::GetRC(
const char *rcenv, Int_t &env,
const char *ord)
6637 TString cmd = TString::Format(
"if (gEnv->Lookup(\"%s\")) { gEnv->GetValue(\"%s\",\"\"); }", rcenv, rcenv);
6639 if (Exec(cmd.Data(), ord, kTRUE) != 0)
return -1;
6641 TObjString *os = fMacroLog.GetLineWith(
"const char");
6644 Ssiz_t fst = os->GetString().First(
'\"');
6645 Ssiz_t lst = os->GetString().Last(
'\"');
6646 TString info = os->GetString()(fst+1, lst-fst-1);
6647 if (info.IsDigit()) {
6651 Printf(
"%s: %d", rcenv, env);
6660 Int_t TProof::GetRC(
const char *rcenv, Double_t &env,
const char *ord)
6663 TString cmd = TString::Format(
"if (gEnv->Lookup(\"%s\")) { gEnv->GetValue(\"%s\",\"\"); }", rcenv, rcenv);
6665 if (Exec(cmd.Data(), ord, kTRUE) != 0)
return -1;
6667 TObjString *os = fMacroLog.GetLineWith(
"const char");
6670 Ssiz_t fst = os->GetString().First(
'\"');
6671 Ssiz_t lst = os->GetString().Last(
'\"');
6672 TString info = os->GetString()(fst+1, lst-fst-1);
6673 if (info.IsFloat()) {
6677 Printf(
"%s: %f", rcenv, env);
6686 Int_t TProof::GetRC(
const char *rcenv, TString &env,
const char *ord)
6689 TString cmd = TString::Format(
"if (gEnv->Lookup(\"%s\")) { gEnv->GetValue(\"%s\",\"\"); }", rcenv, rcenv);
6691 if (Exec(cmd.Data(), ord, kTRUE) != 0)
return -1;
6693 TObjString *os = fMacroLog.GetLineWith(
"const char");
6696 Ssiz_t fst = os->GetString().First(
'\"');
6697 Ssiz_t lst = os->GetString().Last(
'\"');
6698 env = os->GetString()(fst+1, lst-fst-1);
6701 Printf(
"%s: %s", rcenv, env.Data());
6711 Int_t TProof::SendCurrentState(TList *list)
6713 if (!IsValid())
return -1;
6717 Broadcast(gDirectory->GetPath(), kPROOF_RESET, list);
6719 return GetParallel();
6727 Int_t TProof::SendCurrentState(ESlaves list)
6729 if (!IsValid())
return -1;
6733 Broadcast(gDirectory->GetPath(), kPROOF_RESET, list);
6735 return GetParallel();
6743 Int_t TProof::SendInitialState()
6745 if (!IsValid())
return -1;
6747 SetLogLevel(fLogLevel, gProofDebugMask);
6749 return GetNumberOfActiveSlaves();
6768 Bool_t TProof::CheckFile(
const char *file, TSlave *slave, Long_t modtime, Int_t cpopt)
6770 Bool_t sendto = kFALSE;
6773 TString sn = slave->GetName();
6775 sn += slave->GetOrdinal();
6777 sn += gSystem->BaseName(file);
6780 FileMap_t::const_iterator it;
6781 if ((it = fFileMap.find(sn)) != fFileMap.end()) {
6783 MD5Mod_t md = (*it).second;
6784 if (md.fModtime != modtime) {
6785 TMD5 *md5 = TMD5::FileChecksum(file);
6787 if ((*md5) != md.fMD5) {
6790 md.fModtime = modtime;
6797 if (TestBit(TProof::kIsMaster)) {
6799 TMessage mess(kPROOF_CHECKFILE);
6800 mess << TString(gSystem->BaseName(file)) << md.fMD5 << cpopt;
6801 slave->GetSocket()->Send(mess);
6803 fCheckFileStatus = 0;
6804 Collect(slave, fCollectTimeout, kPROOF_CHECKFILE);
6805 sendto = (fCheckFileStatus == 0) ? kTRUE : kFALSE;
6810 Error(
"CheckFile",
"could not calculate local MD5 check sum - dont send");
6816 TMD5 *md5 = TMD5::FileChecksum(file);
6820 md.fModtime = modtime;
6824 Error(
"CheckFile",
"could not calculate local MD5 check sum - dont send");
6827 TMessage mess(kPROOF_CHECKFILE);
6828 mess << TString(gSystem->BaseName(file)) << md.fMD5 << cpopt;
6829 slave->GetSocket()->Send(mess);
6831 fCheckFileStatus = 0;
6832 Collect(slave, fCollectTimeout, kPROOF_CHECKFILE);
6833 sendto = (fCheckFileStatus == 0) ? kTRUE : kFALSE;
6863 Int_t TProof::SendFile(
const char *file, Int_t opt,
const char *rfile, TSlave *wrk)
6865 if (!IsValid())
return -1;
6868 TList *slaves = (rfile && !strcmp(rfile,
"cache")) ? fUniqueSlaves : fActiveSlaves;
6871 slaves =
new TList();
6875 if (slaves->GetSize() == 0)
return 0;
6878 Int_t fd = open(file, O_RDONLY);
6880 Int_t fd = open(file, O_RDONLY | O_BINARY);
6883 SysError(
"SendFile",
"cannot open file %s", file);
6889 Long_t id, flags, modtime = 0;
6890 if (gSystem->GetPathInfo(file, &
id, &size, &flags, &modtime) == 1) {
6891 Error(
"SendFile",
"cannot stat file %s", file);
6896 Error(
"SendFile",
"empty file %s", file);
6902 Bool_t bin = (opt & kBinary) ? kTRUE : kFALSE;
6903 Bool_t force = (opt & kForce) ? kTRUE : kFALSE;
6904 Bool_t fw = (opt & kForward) ? kTRUE : kFALSE;
6908 if ((opt & kCp)) cpopt |= kCp;
6909 if ((opt & kCpBin)) cpopt |= (kCp | kCpBin);
6911 const Int_t kMAXBUF = 32768;
6917 TString fnam(rfile);
6918 if (fnam ==
"cache") {
6919 fnam += TString::Format(
":%s", gSystem->BaseName(file));
6920 }
else if (fnam.IsNull()) {
6921 fnam = gSystem->BaseName(file);
6925 while ((sl = (TSlave *)next())) {
6929 Bool_t sendto = force ? kTRUE : CheckFile(file, sl, modtime, cpopt);
6934 const char *snd = (sl->fSlaveType == TSlave::kSlave && sendto) ?
"" :
"not";
6935 Info(
"SendFile",
"%s sending file %s to: %s:%s (%d)", snd,
6936 file, sl->GetName(), sl->GetOrdinal(), sendto);
6938 if (sl->fSlaveType == TSlave::kSlave && !sendto)
6942 Long64_t siz = sendto ? size : 0;
6943 snprintf(buf, kMAXBUF,
"%s %d %lld %d", fnam.Data(), bin, siz, fw);
6944 if (sl->GetSocket()->Send(buf, kPROOF_SENDFILE) == -1) {
6945 MarkBad(sl,
"could not send kPROOF_SENDFILE request");
6951 lseek(fd, 0, SEEK_SET);
6955 while ((len = read(fd, buf, kMAXBUF)) < 0 && TSystem::GetErrno() == EINTR)
6956 TSystem::ResetErrno();
6959 SysError(
"SendFile",
"error reading from file %s", file);
6960 Interrupt(kSoftInterrupt, kActive);
6965 if (len > 0 && sl->GetSocket()->SendRaw(buf, len) == -1) {
6966 SysError(
"SendFile",
"error writing to slave %s:%s (now offline)",
6967 sl->GetName(), sl->GetOrdinal());
6968 MarkBad(sl,
"sendraw failure");
6979 Collect(sl, fCollectTimeout, kPROOF_SENDFILE);
6985 if (slaves != fActiveSlaves && slaves != fUniqueSlaves)
6989 return (fStatus != 0) ? -1 : nsl;
6997 Int_t TProof::Echo(
const TObject *obj)
6999 if (!IsValid() || !obj)
return -1;
7000 TMessage mess(kPROOF_ECHO);
7001 mess.WriteObject(obj);
7002 return Broadcast(mess);
7011 Int_t TProof::Echo(
const char *str)
7013 TObjString *os =
new TObjString(str);
7014 Int_t rv = Echo(os);
7023 Int_t TProof::SendObject(
const TObject *obj, ESlaves list)
7025 if (!IsValid() || !obj)
return -1;
7027 TMessage mess(kMESS_OBJECT);
7029 mess.WriteObject(obj);
7030 return Broadcast(mess, list);
7037 Int_t TProof::SendPrint(Option_t *option)
7039 if (!IsValid())
return -1;
7041 Broadcast(option, kPROOF_PRINT, kActive);
7042 return Collect(kActive, fCollectTimeout);
7048 void TProof::SetLogLevel(Int_t level, UInt_t mask)
7052 gProofDebugLevel = level;
7053 gProofDebugMask = (TProofDebug::EProofDebugMask) mask;
7054 snprintf(str, 32,
"%d %u", level, mask);
7055 Broadcast(str, kPROOF_LOGLEVEL, kAll);
7065 void TProof::SetRealTimeLog(Bool_t on)
7068 TMessage mess(kPROOF_REALTIMELOG);
7072 Warning(
"SetRealTimeLog",
"session is invalid - do nothing");
7081 Int_t TProof::SetParallelSilent(Int_t nodes, Bool_t random)
7083 if (!IsValid())
return -1;
7085 if (TestBit(TProof::kIsMaster)) {
7086 if (!fDynamicStartup) GoParallel(nodes, kFALSE, random);
7087 return SendCurrentState();
7090 PDB(kGlobal,1) Info("SetParallelSilent", "request all nodes");
7092 PDB(kGlobal,1) Info("SetParallelSilent", "request %d node%s", nodes,
7093 nodes == 1 ? "" : "s");
7095 TMessage mess(kPROOF_PARALLEL);
7096 mess << nodes << random;
7098 Collect(kActive, fCollectTimeout);
7099 Int_t n = GetParallel();
7100 PDB(kGlobal,1) Info("SetParallelSilent", "got %d node%s", n, n == 1 ? "" : "s");
7109 Int_t TProof::SetParallel(Int_t nodes, Bool_t random)
7112 if (fDynamicStartup && nodes < 0) {
7113 if (gSystem->Getenv(
"PROOF_NWORKERS")) gSystem->Unsetenv(
"PROOF_NWORKERS");
7116 Int_t n = SetParallelSilent(nodes, random);
7117 if (TestBit(TProof::kIsClient)) {
7119 Printf(
"PROOF set to sequential mode");
7121 TString subfix = (n == 1) ?
"" :
"s";
7123 subfix +=
", randomly selected";
7124 Printf(
"PROOF set to parallel mode (%d worker%s)", n, subfix.Data());
7126 }
else if (fDynamicStartup && nodes >= 0) {
7127 if (gSystem->Getenv(
"PROOF_NWORKERS")) gSystem->Unsetenv(
"PROOF_NWORKERS");
7128 gSystem->Setenv(
"PROOF_NWORKERS", TString::Format(
"%d", nodes));
7139 Int_t TProof::GoMoreParallel(Int_t nWorkersToAdd)
7141 if (!IsValid() || !IsMaster() || IsIdle()) {
7142 Error(
"GoMoreParallel",
"can't invoke here -- should not happen!");
7145 if (!gProofServ && !IsLite()) {
7146 Error(
"GoMoreParallel",
"no ProofServ available nor Lite -- should not happen!");
7151 TIter next( fSlaves );
7152 Int_t nAddedWorkers = 0;
7154 while (((nAddedWorkers < nWorkersToAdd) || (nWorkersToAdd == -1)) &&
7155 (( sl = dynamic_cast<TSlave *>( next() ) ))) {
7158 if ((sl->GetSlaveType() != TSlave::kSlave) &&
7159 (sl->GetSlaveType() != TSlave::kMaster)) {
7160 Error(
"GoMoreParallel",
"TSlave is neither a Master nor a Slave: %s:%s",
7161 sl->GetName(), sl->GetOrdinal());
7166 if ((!sl->IsValid()) || (fBadSlaves->FindObject(sl)) ||
7167 (strcmp(
"IGNORE", sl->GetImage()) == 0)) {
7169 Info("GoMoreParallel", "Worker %s:%s won't be considered",
7170 sl->GetName(), sl->GetOrdinal());
7175 if (fActiveSlaves->FindObject(sl)) {
7176 Info(
"GoMoreParallel",
"Worker %s:%s is already active: skipping",
7177 sl->GetName(), sl->GetOrdinal());
7185 if (sl->GetSlaveType() == TSlave::kSlave) {
7186 sl->SetStatus(TSlave::kActive);
7187 fActiveSlaves->Add(sl);
7188 fInactiveSlaves->Remove(sl);
7189 fActiveMonitor->Add(sl->GetSocket());
7192 Info("GoMoreParallel", "Worker %s:%s marked as active!",
7193 sl->GetName(), sl->GetOrdinal());
7197 Error(
"GoMoreParallel",
"Dynamic addition of master is not supported");
7205 Info("GoMoreParallel", "Will invoke AskStatistics() -- implies a Collect()");
7210 Info("GoMoreParallel", "Will invoke FindUniqueSlaves()");
7215 Info("GoMoreParallel", "Will invoke SendGroupView()");
7219 Info("GoMoreParallel", "Will invoke GetParallel()");
7220 Int_t nTotalWorkers = GetParallel();
7225 s.Form("PROOF just went more parallel (%d additional worker%s, %d worker%s total)",
7226 nAddedWorkers, (nAddedWorkers == 1) ? "" : "s",
7227 nTotalWorkers, (nTotalWorkers == 1) ? "" : "s");
7228 if (gProofServ) gProofServ->SendAsynMessage(s);
7229 Info("GoMoreParallel", "%s", s.Data());
7231 return nTotalWorkers;
7242 Int_t TProof::GoParallel(Int_t nodes, Bool_t attach, Bool_t random)
7244 if (!IsValid())
return -1;
7246 fActiveSlaves->Clear();
7247 fActiveMonitor->RemoveAll();
7252 TList *wlst =
new TList;
7254 fInactiveSlaves->Clear();
7255 while ((sl = (TSlave *)nxt())) {
7256 if (sl->IsValid() && !fBadSlaves->FindObject(sl)) {
7257 if (strcmp(
"IGNORE", sl->GetImage()) == 0)
continue;
7258 if ((sl->GetSlaveType() != TSlave::kSlave) &&
7259 (sl->GetSlaveType() != TSlave::kMaster)) {
7260 Error(
"GoParallel",
"TSlave is neither Master nor Slave");
7266 fInactiveSlaves->Add(sl);
7267 sl->SetStatus(TSlave::kInactive);
7270 Int_t nwrks = (nodes < 0 || nodes > wlst->GetSize()) ? wlst->GetSize() : nodes;
7272 fEndMaster = TestBit(TProof::kIsMaster) ? kTRUE : kFALSE;
7273 while (cnt < nwrks) {
7276 Int_t iwrk = (Int_t) (gRandom->Rndm() * wlst->GetSize());
7277 sl = (TSlave *) wlst->At(iwrk);
7280 sl = (TSlave *) wlst->First();
7283 Error(
"GoParallel",
"attaching to candidate!");
7289 Int_t slavenodes = 0;
7290 if (sl->GetSlaveType() == TSlave::kSlave) {
7291 sl->SetStatus(TSlave::kActive);
7292 fActiveSlaves->Add(sl);
7293 fInactiveSlaves->Remove(sl);
7294 fActiveMonitor->Add(sl->GetSocket());
7296 }
else if (sl->GetSlaveType() == TSlave::kMaster) {
7297 fEndMaster = kFALSE;
7298 TMessage mess(kPROOF_PARALLEL);
7300 Int_t nn = (nodes < 0) ? -1 : nodes-cnt;
7304 mess.SetWhat(kPROOF_LOGFILE);
7307 if (sl->GetSocket()->Send(mess) == -1) {
7308 MarkBad(sl,
"could not send kPROOF_PARALLEL or kPROOF_LOGFILE request");
7311 Collect(sl, fCollectTimeout);
7312 if (sl->IsValid()) {
7313 sl->SetStatus(TSlave::kActive);
7314 fActiveSlaves->Add(sl);
7315 fInactiveSlaves->Remove(sl);
7316 fActiveMonitor->Add(sl->GetSocket());
7317 if (sl->GetParallel() > 0) {
7318 slavenodes = sl->GetParallel();
7324 MarkBad(sl,
"collect failed after kPROOF_PARALLEL or kPROOF_LOGFILE request");
7347 Int_t n = GetParallel();
7349 if (TestBit(TProof::kIsClient)) {
7351 printf(
"PROOF set to sequential mode\n");
7353 printf(
"PROOF set to parallel mode (%d worker%s)\n",
7354 n, n == 1 ?
"" :
"s");
7357 PDB(kGlobal,1) Info("GoParallel", "got %d node%s", n, n == 1 ? "" : "s");
7365 void TProof::ShowData()
7367 if (!IsValid() || !fManager)
return;
7370 fManager->Find(
"~/data",
"-type f",
"all");
7381 void TProof::ClearData(UInt_t what,
const char *dsname)
7383 if (!IsValid() || !fManager)
return;
7386 TString prompt, a(
"Y");
7387 Bool_t force = (what & kForceClear) ? kTRUE : kFALSE;
7388 Bool_t doask = (!force && IsTty()) ? kTRUE : kFALSE;
7391 if ((what & TProof::kPurge)) {
7393 if (doask && !Prompt(
"Do you really want to remove all data files"))
return;
7394 if (fManager->Rm(
"~/data/*",
"-rf",
"all") < 0)
7395 Warning(
"ClearData",
"problems purging data directory");
7397 }
else if ((what & TProof::kDataset)) {
7399 if (!dsname || strlen(dsname) <= 0) {
7400 Error(
"ClearData",
"dataset name mandatory when removing a full dataset");
7404 if (!ExistsDataSet(dsname)) {
7405 Error(
"ClearData",
"dataset '%s' does not exists", dsname);
7409 TFileCollection *fc = GetDataSet(dsname);
7411 Error(
"ClearData",
"could not retrieve info about dataset '%s'", dsname);
7415 TString pmpt = TString::Format(
"Do you really want to remove all data files"
7416 " of dataset '%s'", dsname);
7417 if (doask && !Prompt(pmpt.Data()))
return;
7420 Bool_t rmds = kTRUE;
7421 TIter nxf(fc->GetList());
7423 Int_t rfiles = 0, nfiles = fc->GetList()->GetSize();
7424 while ((fi = (TFileInfo *) nxf())) {
7428 if (!(fi->GetFirstUrl())) {
7429 Error(
"ClearData",
"GetFirstUrl() returns NULL for '%s' - skipping",
7433 TUrl uf(*(fi->GetFirstUrl()));
7434 file = uf.GetFile();
7435 host = uf.GetHost();
7437 Int_t nurl = fi->GetNUrls();
7440 while (nurl-- && fi->NextUrl()) {
7441 up = fi->GetCurrentUrl();
7442 if (!strcmp(up->GetProtocol(),
"file")) {
7443 TString opt(up->GetOptions());
7444 if (opt.BeginsWith(
"node=")) {
7446 host.ReplaceAll(
"node=",
"");
7447 file = up->GetFile();
7453 if (fManager->Rm(file.Data(),
"-f", host.Data()) != 0) {
7454 Error(
"ClearData",
"problems removing '%s'", file.Data());
7459 ClearDataProgress(rfiles, nfiles);
7461 fprintf(stderr,
"\n");
7464 RemoveDataSet(dsname);
7466 }
else if (what & TProof::kUnregistered) {
7469 TString outtmp(
"ProofClearData_");
7470 FILE *ftmp = gSystem->TempFileName(outtmp);
7472 Error(
"ClearData",
"cannot create temp file for logs");
7477 gSystem->RedirectOutput(outtmp.Data(),
"w", &h);
7479 gSystem->RedirectOutput(0, 0, &h);
7482 in.open(outtmp.Data());
7483 if (!in.is_open()) {
7484 Error(
"ClearData",
"could not open temp file for logs: %s", outtmp.Data());
7485 gSystem->Unlink(outtmp);
7490 TMap *afmap =
new TMap;
7491 TString line, host, file;
7495 if (line.IsNull())
continue;
7496 while (line.EndsWith(
"\n")) { line.Strip(TString::kTrailing,
'\n'); }
7499 if (!line.Tokenize(host, from,
"| "))
continue;
7501 if (!line.Tokenize(file, from,
"| "))
continue;
7502 if (!host.IsNull() && !file.IsNull()) {
7503 TList *fl = (TList *) afmap->GetValue(host.Data());
7507 afmap->Add(
new TObjString(host), fl);
7509 fl->Add(
new TObjString(file));
7512 Info("ClearData", "added info for: h:%s, f:%s", host.Data(), file.Data());
7514 Warning(
"ClearData",
"found incomplete line: '%s'", line.Data());
7519 gSystem->Unlink(outtmp);
7522 TString sel = TString::Format(
"/%s/%s/", GetGroup(), GetUser());
7523 TMap *fcmap = GetDataSets(sel);
7524 if (!fcmap || (fcmap && fcmap->GetSize() <= 0)) {
7526 Warning("ClearData", "no dataset beloning to '%s'", sel.Data());
7535 while ((os = (TObjString *) nxfc())) {
7536 TFileCollection *fc = 0;
7537 if ((fc = (TFileCollection *) fcmap->GetValue(os))) {
7539 TIter nxfi(fc->GetList());
7540 while ((fi = (TFileInfo *) nxfi())) {
7543 Int_t nurl = fi->GetNUrls();
7545 while (nurl-- && fi->NextUrl()) {
7546 up = fi->GetCurrentUrl();
7547 if (!strcmp(up->GetProtocol(),
"file")) {
7548 opt = up->GetOptions();
7549 if (opt.BeginsWith(
"node=")) {
7551 host.ReplaceAll(
"node=",
"");
7552 file = up->GetFile();
7554 Info("ClearData", "found: host: %s, file: %s", host.Data(), file.Data());
7556 TList *fl = (TList *) afmap->GetValue(host.Data());
7558 TObjString *fn = (TObjString *) fl->FindObject(file.Data());
7564 Warning(
"ClearData",
7565 "registered file '%s' not found in the full list!",
7577 if (fcmap) fcmap->SetOwner(kTRUE);
7581 Info(
"ClearData",
"%d unregistered files to be removed:", nfiles);
7584 TString pmpt = TString::Format(
"Do you really want to remove all %d"
7585 " unregistered data files", nfiles);
7586 if (doask && !Prompt(pmpt.Data()))
return;
7591 while ((os = (TObjString *) nxls())) {
7593 if ((fl = (TList *) afmap->GetValue(os))) {
7596 while ((fn = (TObjString *) nxf())) {
7598 if (fManager->Rm(fn->GetName(),
"-f", os->GetName()) != 0) {
7599 Error(
"ClearData",
"problems removing '%s' on host '%s'",
7600 fn->GetName(), os->GetName());
7603 ClearDataProgress(rfiles, nfiles);
7607 fprintf(stderr,
"\n");
7609 afmap->SetOwner(kTRUE);
7618 Bool_t TProof::Prompt(
const char *p)
7621 if (!pp.Contains(
"?")) pp +=
"?";
7622 if (!pp.Contains(
"[y/N]")) pp +=
" [y/N]";
7623 TString a = Getline(pp.Data());
7624 if (a !=
"\n" && a[0] !=
'y' && a[0] !=
'Y' && a[0] !=
'n' && a[0] !=
'N') {
7625 Printf(
"Please answer y, Y, n or N");
7628 }
else if (a ==
"\n" || a[0] ==
'n' || a[0] ==
'N') {
7639 void TProof::ClearDataProgress(Int_t r, Int_t t)
7641 fprintf(stderr,
"[TProof::ClearData] Total %5d files\t|", t);
7642 for (Int_t l = 0; l < 20; l++) {
7643 if (r > 0 && t > 0) {
7645 fprintf(stderr,
"=");
7646 else if (l == 20*r/t)
7647 fprintf(stderr,
">");
7648 else if (l > 20*r/t)
7649 fprintf(stderr,
".");
7651 fprintf(stderr,
"=");
7653 fprintf(stderr,
"| %.02f %% \r", 100.0*(t ? (r/t) : 1));
7660 void TProof::ShowCache(Bool_t all)
7662 if (!IsValid())
return;
7664 TMessage mess(kPROOF_CACHE);
7665 mess << Int_t(kShowCache) << all;
7666 Broadcast(mess, kUnique);
7669 TMessage mess2(kPROOF_CACHE);
7670 mess2 << Int_t(kShowSubCache) << all;
7671 Broadcast(mess2, fNonUniqueMasters);
7673 Collect(kAllUnique, fCollectTimeout);
7675 Collect(kUnique, fCollectTimeout);
7683 void TProof::ClearCache(
const char *file)
7685 if (!IsValid())
return;
7687 TMessage mess(kPROOF_CACHE);
7688 mess << Int_t(kClearCache) << TString(file);
7689 Broadcast(mess, kUnique);
7691 TMessage mess2(kPROOF_CACHE);
7692 mess2 << Int_t(kClearSubCache) << TString(file);
7693 Broadcast(mess2, fNonUniqueMasters);
7695 Collect(kAllUnique);
7704 void TProof::SystemCmd(
const char *cmd, Int_t fdout)
7711 FILE *fin = gSystem->OpenPipe(cmd,
"r");
7715 while (fgets(line, 2048, fin)) {
7716 Int_t r = strlen(line);
7718 if (write(fdout, line, r) < 0) {
7719 ::Warning(
"TProof::SystemCmd",
7720 "errno %d writing to file descriptor %d",
7721 TSystem::GetErrno(), fdout);
7728 gSystem->ClosePipe(fin);
7739 void TProof::ShowPackages(Bool_t all, Bool_t redirlog)
7741 if (!IsValid())
return;
7743 Bool_t oldredir = fRedirLog;
7744 if (redirlog) fRedirLog = kTRUE;
7747 FILE *fout = (fRedirLog) ? fLogFileW : stdout;
7749 Warning(
"ShowPackages",
"file descriptor for outputs undefined (%p):"
7750 " will not log msgs", fout);
7753 lseek(fileno(fout), (off_t) 0, SEEK_END);
7755 if (TestBit(TProof::kIsClient)) {
7761 fRedirLog = oldredir;
7765 TMessage mess(kPROOF_CACHE);
7766 mess << Int_t(kShowPackages) << all;
7767 Broadcast(mess, kUnique);
7770 TMessage mess2(kPROOF_CACHE);
7771 mess2 << Int_t(kShowSubPackages) << all;
7772 Broadcast(mess2, fNonUniqueMasters);
7774 Collect(kAllUnique, fCollectTimeout);
7776 Collect(kUnique, fCollectTimeout);
7779 fRedirLog = oldredir;
7787 void TProof::ShowEnabledPackages(Bool_t all)
7789 if (!IsValid())
return;
7791 if (TestBit(TProof::kIsClient)) {
7792 fPackMgr->ShowEnabled(TString::Format(
"*** Enabled packages on client on %s\n",
7793 gSystem->HostName()));
7797 if (IsLite())
return;
7799 TMessage mess(kPROOF_CACHE);
7800 mess << Int_t(kShowEnabledPackages) << all;
7802 Collect(kActive, fCollectTimeout);
7809 Int_t TProof::ClearPackages()
7811 if (!IsValid())
return -1;
7813 if (UnloadPackages() == -1)
7816 if (DisablePackages() == -1)
7826 Int_t TProof::ClearPackage(
const char *package)
7828 if (!IsValid())
return -1;
7830 if (!package || !package[0]) {
7831 Error(
"ClearPackage",
"need to specify a package name");
7836 TString pac = package;
7837 if (pac.EndsWith(
".par"))
7838 pac.Remove(pac.Length()-4);
7839 pac = gSystem->BaseName(pac);
7841 if (UnloadPackage(pac) == -1)
7844 if (DisablePackage(pac) == -1)
7854 Int_t TProof::DisablePackage(
const char *pack)
7856 if (!IsValid())
return -1;
7858 if (!pack || strlen(pack) <= 0) {
7859 Error(
"DisablePackage",
"need to specify a package name");
7865 if (pac.EndsWith(
".par"))
7866 pac.Remove(pac.Length()-4);
7867 pac = gSystem->BaseName(pac);
7869 if (fPackMgr->Remove(pack) < 0)
7870 Warning(
"DisablePackage",
"problem removing locally package '%s'", pack);
7873 if (IsLite())
return 0;
7876 Bool_t done = kFALSE;
7880 path.Form(
"~/packages/%s", pack);
7881 if (fManager->Rm(path,
"-rf",
"all") != -1) {
7882 path.Append(
".par");
7883 if (fManager->Rm(path,
"-f",
"all") != -1) {
7891 TMessage mess(kPROOF_CACHE);
7892 mess << Int_t(kDisablePackage) << pac;
7893 Broadcast(mess, kUnique);
7895 TMessage mess2(kPROOF_CACHE);
7896 mess2 << Int_t(kDisableSubPackage) << pac;
7897 Broadcast(mess2, fNonUniqueMasters);
7899 Collect(kAllUnique);
7911 Int_t TProof::DisablePackages()
7913 if (!IsValid())
return -1;
7916 if (fPackMgr->Remove(
nullptr) < 0)
7917 Warning(
"DisablePackages",
"problem removing packages locally");
7920 if (IsLite())
return 0;
7923 Bool_t done = kFALSE;
7926 if (fManager->Rm(
"~/packages/*",
"-rf",
"all") != -1) {
7933 TMessage mess(kPROOF_CACHE);
7934 mess << Int_t(kDisablePackages);
7935 Broadcast(mess, kUnique);
7937 TMessage mess2(kPROOF_CACHE);
7938 mess2 << Int_t(kDisableSubPackages);
7939 Broadcast(mess2, fNonUniqueMasters);
7941 Collect(kAllUnique);
7959 Int_t TProof::BuildPackage(
const char *package,
7960 EBuildPackageOpt opt, Int_t chkveropt, TList *workers)
7962 if (!IsValid())
return -1;
7964 if (!package || !package[0]) {
7965 Error(
"BuildPackage",
"need to specify a package name");
7970 TString pac = package;
7971 if (pac.EndsWith(
".par"))
7972 pac.Remove(pac.Length()-4);
7973 pac = gSystem->BaseName(pac);
7975 Bool_t buildOnClient = kTRUE;
7976 if (opt == kDontBuildOnClient) {
7977 buildOnClient = kFALSE;
7984 if (opt <= kBuildAll && (!IsLite() || !buildOnClient)) {
7986 TMessage mess(kPROOF_CACHE);
7987 mess << Int_t(kBuildPackage) << pac << chkveropt;
7988 Broadcast(mess, workers);
7991 TMessage mess(kPROOF_CACHE);
7992 mess << Int_t(kBuildPackage) << pac << chkveropt;
7993 Broadcast(mess, kUnique);
7995 TMessage mess2(kPROOF_CACHE);
7996 mess2 << Int_t(kBuildSubPackage) << pac << chkveropt;
7997 Broadcast(mess2, fNonUniqueMasters);
8001 if (opt >= kBuildAll) {
8004 if (buildOnClient) {
8005 st = fPackMgr->Build(pac, chkveropt);
8010 if (!IsLite() || !buildOnClient) {
8018 Collect(kAllUnique);
8022 if (fStatus < 0 || st < 0)
8038 Int_t TProof::LoadPackage(
const char *package, Bool_t notOnClient,
8039 TList *loadopts, TList *workers)
8041 if (!IsValid())
return -1;
8043 if (!package || !package[0]) {
8044 Error(
"LoadPackage",
"need to specify a package name");
8049 TString pac = package;
8050 if (pac.EndsWith(
".par"))
8051 pac.Remove(pac.Length()-4);
8052 pac = gSystem->BaseName(pac);
8054 if (!notOnClient && TestBit(TProof::kIsClient))
8055 if (fPackMgr->Load(package, loadopts) == -1)
return -1;
8057 TMessage mess(kPROOF_CACHE);
8058 mess << Int_t(kLoadPackage) << pac;
8059 if (loadopts) mess << loadopts;
8062 Bool_t deactivateOnFailure = (IsMaster()) ? kTRUE : kFALSE;
8064 Bool_t doCollect = (fDynamicStartup && !IsIdle()) ? kFALSE : kTRUE;
8068 Info("LoadPackage", "Sending load message to selected workers only");
8069 Broadcast(mess, workers);
8070 if (doCollect) Collect(workers, -1, -1, deactivateOnFailure);
8073 Collect(kActive, -1, -1, deactivateOnFailure);
8083 Int_t TProof::UnloadPackage(
const char *package)
8085 if (!IsValid())
return -1;
8087 if (!package || !package[0]) {
8088 Error(
"UnloadPackage",
"need to specify a package name");
8093 TString pac = package;
8094 if (pac.EndsWith(
".par"))
8095 pac.Remove(pac.Length()-4);
8096 pac = gSystem->BaseName(pac);
8098 if (fPackMgr->Unload(package) < 0)
8099 Warning(
"UnloadPackage",
"unable to remove symlink to %s", package);
8102 if (IsLite())
return 0;
8104 TMessage mess(kPROOF_CACHE);
8105 mess << Int_t(kUnloadPackage) << pac;
8116 Int_t TProof::UnloadPackages()
8118 if (!IsValid())
return -1;
8120 if (TestBit(TProof::kIsClient)) {
8121 if (fPackMgr->Unload(0) < 0)
return -1;
8125 if (IsLite())
return 0;
8127 TMessage mess(kPROOF_CACHE);
8128 mess << Int_t(kUnloadPackages);
8144 Int_t TProof::EnablePackage(
const char *package, Bool_t notOnClient,
8147 return EnablePackage(package, (TList *)0, notOnClient, workers);
8166 Int_t TProof::EnablePackage(
const char *package,
const char *loadopts,
8167 Bool_t notOnClient, TList *workers)
8170 if (loadopts && strlen(loadopts)) {
8171 if (fProtocol > 28) {
8172 TObjString *os =
new TObjString(loadopts);
8174 os->String().ReplaceAll(
"checkversion=",
"chkv=");
8175 Ssiz_t fcv = kNPOS, lcv = kNPOS;
8176 if ((fcv = os->String().Index(
"chkv=")) != kNPOS) {
8177 TRegexp re(
"[; |]");
8178 if ((lcv = os->String().Index(re, fcv)) == kNPOS) {
8179 lcv = os->String().Length();
8181 TString ocv = os->String()(fcv, lcv - fcv);
8183 if (ocv.EndsWith(
"=off") || ocv.EndsWith(
"=0"))
8184 cvopt = (Int_t) TPackMgr::kDontCheck;
8185 else if (ocv.EndsWith(
"=on") || ocv.EndsWith(
"=1"))
8186 cvopt = (Int_t) TPackMgr::kCheckROOT;
8188 Warning(
"EnablePackage",
"'checkversion' option unknown from argument: '%s' - ignored", ocv.Data());
8191 Info(
"EnablePackage",
"setting check version option from argument: %d", cvopt);
8193 optls->Add(
new TParameter<Int_t>(
"PROOF_Package_CheckVersion", (Int_t) cvopt));
8195 if (lcv != kNPOS && fcv == 0) ocv += os->String()[lcv];
8196 if (fcv > 0 && os->String().Index(re, fcv - 1) == fcv - 1) os->String().Remove(fcv - 1, 1);
8197 os->String().ReplaceAll(ocv.Data(),
"");
8200 if (!os->String().IsNull()) {
8201 if (!optls) optls =
new TList;
8202 optls->Add(
new TObjString(os->String().Data()));
8204 if (optls) optls->SetOwner(kTRUE);
8207 Warning(
"EnablePackage",
"remote server does not support options: ignoring the option string");
8211 Int_t rc = EnablePackage(package, optls, notOnClient, workers);
8227 Int_t TProof::EnablePackage(
const char *package, TList *loadopts,
8228 Bool_t notOnClient, TList *workers)
8230 if (!IsValid())
return -1;
8232 if (!package || !package[0]) {
8233 Error(
"EnablePackage",
"need to specify a package name");
8238 TString pac = package;
8239 if (pac.EndsWith(
".par"))
8240 pac.Remove(pac.Length()-4);
8241 pac = gSystem->BaseName(pac);
8243 EBuildPackageOpt opt = kBuildAll;
8245 opt = kDontBuildOnClient;
8248 Int_t chkveropt = TPackMgr::kCheckROOT;
8249 TString ocv = gEnv->GetValue(
"Proof.Package.CheckVersion",
"");
8250 if (!ocv.IsNull()) {
8251 if (ocv ==
"off" || ocv ==
"0")
8252 chkveropt = (Int_t) TPackMgr::kDontCheck;
8253 else if (ocv ==
"on" || ocv ==
"1")
8254 chkveropt = (Int_t) TPackMgr::kCheckROOT;
8256 Warning(
"EnablePackage",
"'checkversion' option unknown from rootrc: '%s' - ignored", ocv.Data());
8259 TParameter<Int_t> *pcv = (TParameter<Int_t> *) loadopts->FindObject(
"PROOF_Package_CheckVersion");
8261 chkveropt = pcv->GetVal();
8262 loadopts->Remove(pcv);
8267 Info(
"EnablePackage",
"using check version option: %d", chkveropt);
8269 if (BuildPackage(pac, opt, chkveropt, workers) == -1)
8272 TList *optls = (loadopts && loadopts->GetSize() > 0) ? loadopts : 0;
8273 if (optls && fProtocol <= 28) {
8274 Warning(
"EnablePackage",
"remote server does not support options: ignoring the option list");
8278 if (LoadPackage(pac, notOnClient, optls, workers) == -1)
8282 if (!fEnabledPackagesOnCluster) {
8283 fEnabledPackagesOnCluster =
new TList;
8284 fEnabledPackagesOnCluster->SetOwner();
8286 if (!fEnabledPackagesOnCluster->FindObject(pac)) {
8287 TPair *pck = (optls && optls->GetSize() > 0) ?
new TPair(
new TObjString(pac), optls->Clone())
8288 :
new TPair(
new TObjString(pac), 0);
8289 fEnabledPackagesOnCluster->Add(pck);
8303 Int_t TProof::DownloadPackage(
const char *pack,
const char *dstdir)
8305 if (!fManager || !(fManager->IsValid())) {
8306 Error(
"DownloadPackage",
"the manager is undefined!");
8311 TString parname(gSystem->BaseName(pack)), src, dst;
8312 if (!parname.EndsWith(
".par")) parname +=
".par";
8313 src.Form(
"packages/%s", parname.Data());
8314 if (!dstdir || strlen(dstdir) <= 0) {
8315 dst.Form(
"./%s", parname.Data());
8319 if (gSystem->GetPathInfo(dstdir, st) != 0) {
8321 if (gSystem->mkdir(dstdir, kTRUE) != 0) {
8322 Error(
"DownloadPackage",
8323 "could not create the destination directory '%s' (errno: %d)",
8324 dstdir, TSystem::GetErrno());
8327 }
else if (!R_ISDIR(st.fMode) && !R_ISLNK(st.fMode)) {
8328 Error(
"DownloadPackage",
8329 "destination path '%s' exist but is not a directory!", dstdir);
8332 dst.Form(
"%s/%s", dstdir, parname.Data());
8337 RedirectHandle_t rh;
8338 if (gSystem->RedirectOutput(fLogFileName,
"a", &rh) != 0)
8339 Warning(
"DownloadPackage",
"problems redirecting output to '%s'", fLogFileName.Data());
8340 Int_t rc = fManager->Stat(src, stsrc);
8341 if (gSystem->RedirectOutput(0, 0, &rh) != 0)
8342 Warning(
"DownloadPackage",
"problems restoring output");
8345 ShowPackages(kFALSE, kTRUE);
8346 TMacro *mp = GetLastLog();
8349 Bool_t isGlobal = kFALSE;
8350 TIter nxl(mp->GetListOfLines());
8353 while ((os = (TObjString *) nxl())) {
8354 TString s(os->GetName());
8355 if (s.Contains(
"*** Global Package cache")) {
8357 s.Remove(0, s.Last(
':') + 1);
8358 s.Remove(s.Last(
' '));
8361 }
else if (s.Contains(
"*** Package cache")) {
8366 if (isGlobal && s.Contains(parname)) {
8367 src.Form(
"%s/%s", globaldir.Data(), parname.Data());
8377 if (fManager->GetFile(src, dst,
"silent") != 0) {
8378 Error(
"DownloadPackage",
"problems downloading '%s' (src:%s, dst:%s)",
8379 pack, src.Data(), dst.Data());
8382 Info(
"DownloadPackage",
"'%s' cross-checked against master repository (local path: %s)",
8410 Int_t TProof::UploadPackage(
const char *pack, EUploadPackageOpt opt,
8413 if (!IsValid())
return -1;
8416 TFile::EFileType ft = TFile::GetType(pack);
8417 Bool_t remotepar = (ft == TFile::kWeb || ft == TFile::kNet) ? kTRUE : kFALSE;
8419 TString par(pack), base, name;
8420 if (par.EndsWith(
".par")) {
8421 base = gSystem->BaseName(par);
8422 name = base(0, base.Length() - strlen(
".par"));
8424 name = gSystem->BaseName(par);
8425 base.Form(
"%s.par", name.Data());
8430 gSystem->ExpandPathName(par);
8431 if (gSystem->AccessPathName(par, kReadPermission)) {
8433 if (!remotepar) xrc = TPackMgr::FindParPath(fPackMgr, name, par);
8437 Info(
"UploadPackage",
"global package found (%s): no upload needed",
8440 }
else if (xrc < 0) {
8441 Error(
"UploadPackage",
"PAR file '%s' not found", par.Data());
8458 if (TestBit(TProof::kIsClient)) {
8459 Bool_t rmold = (opt == TProof::kRemoveOld) ? kTRUE : kFALSE;
8460 if (fPackMgr->Install(par, rmold) < 0) {
8461 Error(
"UploadPackage",
"installing '%s' failed", gSystem->BaseName(par));
8467 if (IsLite())
return 0;
8469 TMD5 *md5 = fPackMgr->ReadMD5(name);
8472 if (remotepar && GetRemoteProtocol() > 36) {
8473 smsg.Form(
"+%s", par.Data());
8475 smsg.Form(
"+%s", base.Data());
8478 TMessage mess(kPROOF_CHECKFILE);
8479 mess << smsg << (*md5);
8480 TMessage mess2(kPROOF_CHECKFILE);
8481 smsg.Replace(0, 1,
"-");
8482 mess2 << smsg << (*md5);
8483 TMessage mess3(kPROOF_CHECKFILE);
8484 smsg.Replace(0, 1,
"=");
8485 mess3 << smsg << (*md5);
8489 if (fProtocol > 8) {
8491 mess << (UInt_t) opt;
8492 mess2 << (UInt_t) opt;
8493 mess3 << (UInt_t) opt;
8499 workers = fUniqueSlaves;
8500 TIter next(workers);
8502 while ((sl = (TSlave *) next())) {
8506 sl->GetSocket()->Send(mess);
8508 fCheckFileStatus = 0;
8509 Collect(sl, fCollectTimeout, kPROOF_CHECKFILE);
8510 if (fCheckFileStatus == 0) {
8512 if (fProtocol > 5) {
8514 smsg.Form(
"%s/%s/%s", sl->GetProofWorkDir(), kPROOF_PackDir, base.Data());
8515 if (SendFile(par, (kBinary | kForce | kCpBin | kForward), smsg.Data(), sl) < 0) {
8516 Error(
"UploadPackage",
"%s: problems uploading file %s",
8517 sl->GetOrdinal(), par.Data());
8522 TFTP ftp(TString(
"root://")+sl->GetName(), 1);
8523 if (!ftp.IsZombie()) {
8524 smsg.Form(
"%s/%s", sl->GetProofWorkDir(), kPROOF_PackDir);
8525 ftp.cd(smsg.Data());
8526 ftp.put(par, base.Data());
8531 sl->GetSocket()->Send(mess2);
8532 fCheckFileStatus = 0;
8533 Collect(sl, fCollectTimeout, kPROOF_CHECKFILE);
8534 if (fCheckFileStatus == 0) {
8535 Error(
"UploadPackage",
"%s: unpacking of package %s failed",
8536 sl->GetOrdinal(), base.Data());
8543 TIter nextmaster(fNonUniqueMasters);
8545 while ((ma = (TSlave *) nextmaster())) {
8549 ma->GetSocket()->Send(mess3);
8551 fCheckFileStatus = 0;
8552 Collect(ma, fCollectTimeout, kPROOF_CHECKFILE);
8553 if (fCheckFileStatus == 0) {
8555 Error(
"UploadPackage",
"package %s did not exist on submaster %s",
8556 base.Data(), ma->GetOrdinal());
8568 void TProof::AssertMacroPath(
const char *macro)
8570 static TString macrop(gROOT->GetMacroPath());
8571 if (macro && strlen(macro) > 0) {
8572 TString dirn(gSystem->DirName(macro));
8573 if (!macrop.Contains(dirn)) {
8574 macrop += TString::Format(
"%s:", dirn.Data());
8575 gROOT->SetMacroPath(macrop);
8597 Int_t TProof::Load(
const char *macro, Bool_t notOnClient, Bool_t uniqueWorkers,
8600 if (!IsValid())
return -1;
8602 if (!macro || !macro[0]) {
8603 Error(
"Load",
"need to specify a macro name");
8608 TProof::AssertMacroPath(macro);
8610 if (TestBit(TProof::kIsClient) && !wrks) {
8613 TString addsname, implname = macro;
8614 Ssiz_t icom = implname.Index(
",");
8615 if (icom != kNPOS) {
8616 addsname = implname(icom + 1, implname.Length());
8617 implname.Remove(icom);
8619 TString basemacro = gSystem->BaseName(implname), mainmacro(implname);
8620 TString bmsg(basemacro), acmode, args, io;
8621 implname = gSystem->SplitAclicMode(implname, acmode, args, io);
8624 Int_t dot = implname.Last(
'.');
8626 Info(
"Load",
"macro '%s' does not contain a '.': do nothing", macro);
8631 Bool_t hasHeader = kTRUE;
8632 TString headname = implname;
8633 headname.Remove(dot);
8635 if (gSystem->AccessPathName(headname, kReadPermission)) {
8636 TString h = headname;
8637 headname.Remove(dot);
8639 if (gSystem->AccessPathName(headname, kReadPermission)) {
8642 Info(
"Load",
"no associated header file found: tried: %s %s",
8643 h.Data(), headname.Data());
8650 if (!addsname.IsNull()) {
8653 while (addsname.Tokenize(fn, from,
",")) {
8654 if (gSystem->AccessPathName(fn, kReadPermission)) {
8655 Error(
"Load",
"additional file '%s' not found", fn.Data());
8660 TString dirn(gSystem->DirName(fn));
8661 if (addincs.IsNull()) {
8662 addincs.Form(
"-I%s", dirn.Data());
8663 }
else if (!addincs.Contains(dirn)) {
8664 addincs += TString::Format(
" -I%s", dirn.Data());
8668 addfiles.Add(
new TObjString(fn));
8674 if (SendFile(implname, kAscii | kForward ,
"cache") == -1) {
8675 Error(
"Load",
"problems sending implementation file %s", implname.Data());
8679 if (SendFile(headname, kAscii | kForward ,
"cache") == -1) {
8680 Error(
"Load",
"problems sending header file %s", headname.Data());
8684 if (addfiles.GetSize() > 0) {
8685 TIter nxfn(&addfiles);
8687 while ((os = (TObjString *) nxfn())) {
8689 if (SendFile(os->GetName(), kAscii | kForward,
"cache") == -1) {
8690 Error(
"Load",
"problems sending additional file %s", os->GetName());
8694 bmsg += TString::Format(
",%s", gSystem->BaseName(os->GetName()));
8696 addfiles.SetOwner(kTRUE);
8700 TMessage mess(kPROOF_CACHE);
8701 if (GetRemoteProtocol() < 34) {
8702 mess << Int_t(kLoadMacro) << basemacro;
8704 AddIncludePath(
"../../cache");
8706 mess << Int_t(kLoadMacro) << bmsg;
8708 Broadcast(mess, kActive);
8713 TString oldincs = gSystem->GetIncludePath();
8714 if (!addincs.IsNull()) gSystem->AddIncludePath(addincs);
8718 gROOT->ProcessLine(TString::Format(
".L %s", mainmacro.Data()));
8721 if (!addincs.IsNull()) gSystem->SetIncludePath(oldincs);
8724 TString mp(TROOT::GetMacroPath());
8725 TString np(gSystem->DirName(macro));
8728 if (!mp.BeginsWith(np) && !mp.Contains(
":"+np)) {
8729 Int_t ip = (mp.BeginsWith(
".:")) ? 2 : 0;
8731 TROOT::SetMacroPath(mp);
8733 Info(
"Load",
"macro path set to '%s'", TROOT::GetMacroPath());
8742 PDB(kGlobal, 1) Info("Load", "adding loaded macro: %s", macro);
8743 if (!fLoadedMacros) {
8744 fLoadedMacros =
new TList();
8745 fLoadedMacros->SetOwner();
8748 fLoadedMacros->Add(
new TObjString(macro));
8756 TString basemacro = gSystem->BaseName(macro);
8757 TMessage mess(kPROOF_CACHE);
8759 if (uniqueWorkers) {
8760 mess << Int_t(kLoadMacro) << basemacro;
8762 Broadcast(mess, wrks);
8765 Broadcast(mess, kUnique);
8774 TIter nxw(fActiveSlaves);
8775 while ((wrk = (TSlave *)nxw())) {
8776 if (!fUniqueSlaves->FindObject(wrk)) {
8782 Int_t ld = basemacro.Last(
'.');
8784 Int_t lpp = basemacro.Index(
"++", ld);
8785 if (lpp != kNPOS) basemacro.Replace(lpp, 2,
"+");
8787 mess << Int_t(kLoadMacro) << basemacro;
8788 Broadcast(mess, &others);
8792 PDB(kGlobal, 1) Info("Load", "adding loaded macro: %s", macro);
8793 if (!fLoadedMacros) {
8794 fLoadedMacros =
new TList();
8795 fLoadedMacros->SetOwner();
8799 fLoadedMacros->Add(
new TObjString(macro));
8812 Int_t TProof::AddDynamicPath(
const char *libpath, Bool_t onClient, TList *wrks,
8815 if ((!libpath || !libpath[0])) {
8817 Info(
"AddDynamicPath",
"list is empty - nothing to do");
8823 HandleLibIncPath(
"lib", kTRUE, libpath);
8825 TMessage m(kPROOF_LIB_INC_PATH);
8826 m << TString(
"lib") << (Bool_t)kTRUE;
8829 if (libpath && strlen(libpath)) {
8830 m << TString(libpath);
8836 m << (Int_t)doCollect;
8842 Collect(wrks, fCollectTimeout);
8845 Collect(kActive, fCollectTimeout);
8857 Int_t TProof::AddIncludePath(
const char *incpath, Bool_t onClient, TList *wrks,
8860 if ((!incpath || !incpath[0])) {
8862 Info(
"AddIncludePath",
"list is empty - nothing to do");
8868 HandleLibIncPath(
"inc", kTRUE, incpath);
8870 TMessage m(kPROOF_LIB_INC_PATH);
8871 m << TString(
"inc") << (Bool_t)kTRUE;
8874 if (incpath && strlen(incpath)) {
8875 m << TString(incpath);
8881 m << (Int_t)doCollect;
8887 Collect(wrks, fCollectTimeout);
8890 Collect(kActive, fCollectTimeout);
8902 Int_t TProof::RemoveDynamicPath(
const char *libpath, Bool_t onClient)
8904 if ((!libpath || !libpath[0])) {
8906 Info(
"RemoveDynamicPath",
"list is empty - nothing to do");
8912 HandleLibIncPath(
"lib", kFALSE, libpath);
8914 TMessage m(kPROOF_LIB_INC_PATH);
8915 m << TString(
"lib") <<(Bool_t)kFALSE;
8918 if (libpath && strlen(libpath))
8919 m << TString(libpath);
8925 Collect(kActive, fCollectTimeout);
8936 Int_t TProof::RemoveIncludePath(
const char *incpath, Bool_t onClient)
8938 if ((!incpath || !incpath[0])) {
8940 Info(
"RemoveIncludePath",
"list is empty - nothing to do");
8946 HandleLibIncPath(
"in", kFALSE, incpath);
8948 TMessage m(kPROOF_LIB_INC_PATH);
8949 m << TString(
"inc") << (Bool_t)kFALSE;
8952 if (incpath && strlen(incpath))
8953 m << TString(incpath);
8959 Collect(kActive, fCollectTimeout);
8967 void TProof::HandleLibIncPath(
const char *what, Bool_t add,
const char *dirs)
8973 if ((type !=
"lib") && (type !=
"inc")) {
8974 Error(
"HandleLibIncPath",
"unknown action type: %s - protocol error?", type.Data());
8979 path.ReplaceAll(
",",
" ");
8983 if (path.Length() > 0 && path !=
"-") {
8984 if (!(op = path.Tokenize(
" "))) {
8985 Warning(
"HandleLibIncPath",
"decomposing path %s", path.Data());
8992 if (type ==
"lib") {
8995 TIter nxl(op, kIterBackward);
8996 TObjString *lib = 0;
8997 while ((lib = (TObjString *) nxl())) {
8999 TString xlib = lib->GetName();
9000 gSystem->ExpandPathName(xlib);
9002 if (!gSystem->AccessPathName(xlib, kReadPermission)) {
9003 TString newlibpath = gSystem->GetDynamicPath();
9006 if (newlibpath.BeginsWith(
".:"))
9008 if (newlibpath.Index(xlib) == kNPOS) {
9009 newlibpath.Insert(pos,TString::Format(
"%s:", xlib.Data()));
9010 gSystem->SetDynamicPath(newlibpath);
9014 Info(
"HandleLibIncPath",
9015 "libpath %s does not exist or cannot be read - not added", xlib.Data());
9023 TObjString *inc = 0;
9024 while ((inc = (TObjString *) nxi())) {
9026 TString xinc = inc->GetName();
9027 gSystem->ExpandPathName(xinc);
9029 if (!gSystem->AccessPathName(xinc, kReadPermission)) {
9030 TString curincpath = gSystem->GetIncludePath();
9031 if (curincpath.Index(xinc) == kNPOS)
9032 gSystem->AddIncludePath(TString::Format(
"-I%s", xinc.Data()));
9035 Info(
"HandleLibIncPath",
9036 "incpath %s does not exist or cannot be read - not added", xinc.Data());
9043 if (type ==
"lib") {
9047 TObjString *lib = 0;
9048 while ((lib = (TObjString *) nxl())) {
9050 TString xlib = lib->GetName();
9051 gSystem->ExpandPathName(xlib);
9053 TString newlibpath = gSystem->GetDynamicPath();
9054 newlibpath.ReplaceAll(TString::Format(
"%s:", xlib.Data()),
"");
9055 gSystem->SetDynamicPath(newlibpath);
9062 TObjString *inc = 0;
9063 while ((inc = (TObjString *) nxi())) {
9064 TString newincpath = gSystem->GetIncludePath();
9065 newincpath.ReplaceAll(TString::Format(
"-I%s", inc->GetName()),
"");
9067 newincpath.ReplaceAll(gInterpreter->GetIncludePath(),
"");
9068 gSystem->SetIncludePath(newincpath);
9077 TList *TProof::GetListOfPackages()
9082 TMessage mess(kPROOF_CACHE);
9083 mess << Int_t(kListPackages);
9085 Collect(kActive, fCollectTimeout);
9087 return fAvailablePackages;
9093 TList *TProof::GetListOfEnabledPackages()
9098 TMessage mess(kPROOF_CACHE);
9099 mess << Int_t(kListEnabledPackages);
9101 Collect(kActive, fCollectTimeout);
9103 return fEnabledPackages;
9109 void TProof::PrintProgress(Long64_t total, Long64_t processed,
9110 Float_t procTime, Long64_t bytesread)
9112 if (fPrintProgress) {
9113 Bool_t redirlog = fRedirLog;
9116 (*fPrintProgress)(total, processed, procTime, bytesread);
9117 fRedirLog = redirlog;
9121 fprintf(stderr,
"[TProof::Progress] Total %lld events\t|", total);
9123 for (
int l = 0; l < 20; l++) {
9125 if (l < 20*processed/total)
9126 fprintf(stderr,
"=");
9127 else if (l == 20*processed/total)
9128 fprintf(stderr,
">");
9129 else if (l > 20*processed/total)
9130 fprintf(stderr,
".");
9132 fprintf(stderr,
"=");
9134 Float_t evtrti = (procTime > 0. && processed > 0) ? processed / procTime : -1.;
9135 Float_t mbsrti = (procTime > 0. && bytesread > 0) ? bytesread / procTime : -1.;
9136 TString sunit(
"B/s");
9138 Float_t remainingTime = (total >= processed) ? (total - processed) / evtrti : -1;
9140 const Float_t toK = 1024., toM = 1048576., toG = 1073741824.;
9141 if (mbsrti >= toG) {
9144 }
else if (mbsrti >= toM) {
9147 }
else if (mbsrti >= toK) {
9151 fprintf(stderr,
"| %.02f %% [%.1f evts/s, %.1f %s, time left: %.1f s]\r",
9152 (total ? ((100.0*processed)/total) : 100.0), evtrti, mbsrti, sunit.Data(), remainingTime);
9154 fprintf(stderr,
"| %.02f %% [%.1f evts/s, time left: %.1f s]\r",
9155 (total ? ((100.0*processed)/total) : 100.0), evtrti, remainingTime);
9158 fprintf(stderr,
"| %.02f %%\r",
9159 (total ? ((100.0*processed)/total) : 100.0));
9161 if (processed >= total) {
9162 fprintf(stderr,
"\n Query processing time: %.1f s\n", procTime);
9170 void TProof::Progress(Long64_t total, Long64_t processed)
9172 if (fPrintProgress) {
9174 return (*fPrintProgress)(total, processed, -1., -1);
9178 Info("Progress","%2f (%lld/%lld)", 100.*processed/total, processed, total);
9180 if (gROOT->IsBatch()) {
9183 PrintProgress(total, processed);
9185 EmitVA(
"Progress(Long64_t,Long64_t)", 2, total, processed);
9193 void TProof::Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
9194 Float_t initTime, Float_t procTime,
9195 Float_t evtrti, Float_t mbrti)
9198 Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
9199 initTime, procTime, evtrti, mbrti);
9201 if (gROOT->IsBatch()) {
9204 PrintProgress(total, processed, procTime, bytesread);
9206 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t)",
9207 7, total, processed, bytesread, initTime, procTime, evtrti, mbrti);
9215 void TProof::Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
9216 Float_t initTime, Float_t procTime,
9217 Float_t evtrti, Float_t mbrti, Int_t actw, Int_t tses, Float_t eses)
9220 Info("Progress","%lld %lld %lld %f %f %f %f %d %f", total, processed, bytesread,
9221 initTime, procTime, evtrti, mbrti, actw, eses);
9223 if (gROOT->IsBatch()) {
9226 PrintProgress(total, processed, procTime, bytesread);
9228 EmitVA(
"Progress(Long64_t,Long64_t,Long64_t,Float_t,Float_t,Float_t,Float_t,Int_t,Int_t,Float_t)",
9229 10, total, processed, bytesread, initTime, procTime, evtrti, mbrti, actw, tses, eses);
9237 void TProof::Feedback(TList *objs)
9240 Info("Feedback","%d objects", objs->GetSize());
9242 Info(
"Feedback",
"%d objects", objs->GetSize());
9246 Emit(
"Feedback(TList *objs)", (Long_t) objs);
9252 void TProof::CloseProgressDialog()
9255 Info("CloseProgressDialog",
9256 "called: have progress dialog: %d", fProgressDialogStarted);
9259 if (!fProgressDialogStarted)
9262 Emit("CloseProgressDialog()");
9268 void TProof::ResetProgressDialog(const
char *sel, Int_t sz, Long64_t fst,
9272 Info("ResetProgressDialog","(%s,%d,%lld,%lld)", sel, sz, fst, ent);
9274 EmitVA("ResetProgressDialog(const
char*,Int_t,Long64_t,Long64_t)",
9275 4, sel, sz, fst, ent);
9281 void TProof::StartupMessage(const
char *msg, Bool_t st, Int_t done, Int_t total)
9284 Info("StartupMessage","(%s,%d,%d,%d)", msg, st, done, total);
9286 EmitVA("StartupMessage(const
char*,Bool_t,Int_t,Int_t)",
9287 4, msg, st, done, total);
9293 void TProof::DataSetStatus(const
char *msg, Bool_t st, Int_t done, Int_t total)
9296 Info("DataSetStatus","(%s,%d,%d,%d)", msg, st, done, total);
9298 EmitVA("DataSetStatus(const
char*,Bool_t,Int_t,Int_t)",
9299 4, msg, st, done, total);
9305 void TProof::SendDataSetStatus(const
char *action, UInt_t done,
9306 UInt_t tot, Bool_t st)
9310 TString type =
"files";
9311 Int_t frac = (Int_t) (done*100.)/tot;
9312 char msg[512] = {0};
9314 snprintf(msg, 512,
"%s: OK (%d %s) \n",
9315 action,tot, type.Data());
9317 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
9318 action, done, tot, frac);
9321 fprintf(stderr,
"%s", msg);
9323 NotifyLogMsg(msg, 0);
9328 if (TestBit(TProof::kIsMaster)) {
9329 TMessage mess(kPROOF_DATASET_STATUS);
9330 mess << TString(action) << tot << done << st;
9331 gProofServ->GetSocket()->Send(mess);
9338 void TProof::QueryResultReady(
const char *ref)
9341 Info("QueryResultReady","ref: %s", ref);
9343 Emit("QueryResultReady(const
char*)",ref);
9349 void TProof::ValidateDSet(TDSet *dset)
9351 if (dset->ElementsValid())
return;
9357 slholder.SetOwner();
9359 elemholder.SetOwner();
9362 TIter nextSlave(GetListOfActiveSlaves());
9363 while (TSlave *sl = dynamic_cast<TSlave*>(nextSlave())) {
9365 TPair *p =
dynamic_cast<TPair*
>(nodes.FindObject(sl->GetName()));
9368 sllist->SetName(sl->GetName());
9369 slholder.Add(sllist);
9370 TList *elemlist =
new TList;
9371 elemlist->SetName(TString(sl->GetName())+
"_elem");
9372 elemholder.Add(elemlist);
9373 nodes.Add(
new TPair(sllist, elemlist));
9375 sllist =
dynamic_cast<TList*
>(p->Key());
9377 if (sllist) sllist->Add(sl);
9383 for (Int_t i = 0; i < 2; i++) {
9384 Bool_t local = i>0?kFALSE:kTRUE;
9385 TIter nextElem(local ? dset->GetListOfElements() : &nonLocal);
9386 while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextElem())) {
9387 if (elem->GetValid())
continue;
9388 TPair *p =
dynamic_cast<TPair*
>(local?nodes.FindObject(TUrl(elem->GetFileName()).GetHost()):nodes.At(0));
9390 TList *eli =
dynamic_cast<TList*
>(p->Value());
9391 TList *sli =
dynamic_cast<TList*
>(p->Key());
9397 Bool_t stop = kFALSE;
9399 TPair *p3 =
dynamic_cast<TPair*
>(nodes.After(p2->Key()));
9401 TList *p3v =
dynamic_cast<TList*
>(p3->Value());
9402 TList *p3k =
dynamic_cast<TList*
>(p3->Key());
9404 Int_t nelem = p3v->GetSize();
9405 Int_t nsl = p3k->GetSize();
9406 if (nelem*sli->GetSize() < eli->GetSize()*nsl) p2 = p3;
9415 nodes.Remove(p->Key());
9416 nodes.AddAfter(p2->Key(), p);
9419 Warning(
"ValidateDSet",
"invalid values from TPair! Protocol error?");
9427 Warning(
"ValidateDSet",
"no node to allocate TDSetElement to - ignoring");
9435 TIter nextNode(&nodes);
9437 while (TPair *node = dynamic_cast<TPair*>(nextNode())) {
9438 TList *slaves =
dynamic_cast<TList*
>(node->Key());
9439 TList *setelements =
dynamic_cast<TList*
>(node->Value());
9440 if (!slaves || !setelements)
continue;
9442 Int_t nslaves = slaves->GetSize();
9443 Int_t nelements = setelements->GetSize();
9444 for (Int_t i=0; i<nslaves; i++) {
9446 TDSet copyset(dset->GetType(), dset->GetObjName(),
9447 dset->GetDirectory());
9448 for (Int_t j = (i*nelements)/nslaves;
9449 j < ((i+1)*nelements)/nslaves;
9451 TDSetElement *elem =
9452 dynamic_cast<TDSetElement*
>(setelements->At(j));
9454 copyset.Add(elem->GetFileName(), elem->GetObjName(),
9455 elem->GetDirectory(), elem->GetFirst(),
9456 elem->GetNum(), elem->GetMsd());
9460 if (copyset.GetListOfElements()->GetSize()>0) {
9461 TMessage mesg(kPROOF_VALIDATE_DSET);
9464 TSlave *sl =
dynamic_cast<TSlave*
>(slaves->At(i));
9466 PDB(kGlobal,1) Info("ValidateDSet",
9467 "Sending TDSet with %d elements to slave %s"
9469 copyset.GetListOfElements()->GetSize(),
9471 sl->GetSocket()->Send(mesg);
9479 Info("ValidateDSet","Calling Collect");
9480 Collect(&usedslaves);
9491 void TProof::AddInputData(TObject *obj, Bool_t push)
9494 if (!fInputData) fInputData =
new TList;
9495 if (!fInputData->FindObject(obj)) {
9496 fInputData->Add(obj);
9497 SetBit(TProof::kNewInputData);
9500 if (push) SetBit(TProof::kNewInputData);
9507 void TProof::ClearInputData(TObject *obj)
9511 fInputData->SetOwner(kTRUE);
9512 SafeDelete(fInputData);
9514 ResetBit(TProof::kNewInputData);
9518 TList *in = GetInputList();
9519 while ((o = GetInputList()->FindObject(
"PROOF_InputDataFile")))
9521 while ((o = GetInputList()->FindObject(
"PROOF_InputData")))
9525 fInputDataFile =
"";
9526 gSystem->Unlink(kPROOF_InputDataFile);
9528 }
else if (fInputData) {
9529 Int_t sz = fInputData->GetSize();
9530 while (fInputData->FindObject(obj))
9531 fInputData->Remove(obj);
9533 if (sz != fInputData->GetSize())
9534 SetBit(TProof::kNewInputData);
9541 void TProof::ClearInputData(
const char *name)
9543 TObject *obj = (fInputData && name) ? fInputData->FindObject(name) : 0;
9544 if (obj) ClearInputData(obj);
9554 void TProof::SetInputDataFile(
const char *datafile)
9556 if (datafile && strlen(datafile) > 0) {
9557 if (fInputDataFile != datafile && strcmp(datafile, kPROOF_InputDataFile))
9558 SetBit(TProof::kNewInputData);
9559 fInputDataFile = datafile;
9561 if (!fInputDataFile.IsNull())
9562 SetBit(TProof::kNewInputData);
9563 fInputDataFile =
"";
9566 if (fInputDataFile != kPROOF_InputDataFile && !fInputDataFile.IsNull() &&
9567 gSystem->AccessPathName(fInputDataFile, kReadPermission)) {
9568 fInputDataFile =
"";
9581 void TProof::SendInputDataFile()
9585 PrepareInputDataFile(dataFile);
9588 if (dataFile.Length() > 0) {
9590 Info(
"SendInputDataFile",
"broadcasting %s", dataFile.Data());
9591 BroadcastFile(dataFile.Data(), kBinary,
"cache", kActive);
9594 TString t = TString::Format(
"cache:%s", gSystem->BaseName(dataFile));
9595 AddInput(
new TNamed(
"PROOF_InputDataFile", t.Data()));
9608 void TProof::PrepareInputDataFile(TString &dataFile)
9611 Bool_t newdata = TestBit(TProof::kNewInputData) ? kTRUE : kFALSE;
9613 ResetBit(TProof::kNewInputData);
9616 Bool_t list_ok = (fInputData && fInputData->GetSize() > 0) ? kTRUE : kFALSE;
9618 Bool_t file_ok = kFALSE;
9619 if (fInputDataFile != kPROOF_InputDataFile && !fInputDataFile.IsNull() &&
9620 !gSystem->AccessPathName(fInputDataFile, kReadPermission)) {
9622 TFile *f = TFile::Open(fInputDataFile);
9623 if (f && f->GetListOfKeys() && f->GetListOfKeys()->GetSize() > 0)
9629 TList *in = GetInputList();
9630 while ((o = GetInputList()->FindObject(
"PROOF_InputDataFile")))
9632 while ((o = GetInputList()->FindObject(
"PROOF_InputData")))
9637 if (!list_ok && !file_ok)
return;
9640 if (file_ok && !list_ok) {
9642 dataFile = fInputDataFile;
9643 }
else if (!file_ok && list_ok) {
9644 fInputDataFile = kPROOF_InputDataFile;
9646 if (!newdata && !gSystem->AccessPathName(fInputDataFile))
return;
9648 TFile *f = TFile::Open(fInputDataFile,
"RECREATE");
9651 TIter next(fInputData);
9653 while ((obj = next())) {
9654 obj->Write(0, TObject::kSingleKey, 0);
9659 Error(
"PrepareInputDataFile",
"could not (re-)create %s", fInputDataFile.Data());
9662 dataFile = fInputDataFile;
9663 }
else if (file_ok && list_ok) {
9664 dataFile = kPROOF_InputDataFile;
9666 if (newdata || gSystem->AccessPathName(dataFile)) {
9668 if (!gSystem->AccessPathName(dataFile))
9669 gSystem->Unlink(dataFile);
9670 if (dataFile != fInputDataFile) {
9672 if (gSystem->CopyFile(fInputDataFile, dataFile, kTRUE) != 0) {
9673 Error(
"PrepareInputDataFile",
"could not make local copy of %s", fInputDataFile.Data());
9678 TFile *f = TFile::Open(dataFile,
"UPDATE");
9681 TIter next(fInputData);
9683 while ((obj = next())) {
9684 obj->Write(0, TObject::kSingleKey, 0);
9689 Error(
"PrepareInputDataFile",
"could not open %s for updating", dataFile.Data());
9703 void TProof::AddInput(TObject *obj)
9705 if (fPlayer) fPlayer->AddInput(obj);
9711 void TProof::ClearInput()
9713 if (fPlayer) fPlayer->ClearInput();
9716 AddInput(fFeedback);
9722 TList *TProof::GetInputList()
9724 return (fPlayer ? fPlayer->GetInputList() : (TList *)0);
9731 TObject *TProof::GetOutput(
const char *name)
9734 if (TestBit(TProof::kIsMaster))
9736 return (fPlayer) ? fPlayer->GetOutput(name) : (TObject *)0;
9739 return (GetOutputList()) ? GetOutputList()->FindObject(name) : (TObject *)0;
9745 TObject *TProof::GetOutput(
const char *name, TList *out)
9748 if (!name || (name && strlen(name) <= 0) ||
9749 !out || (out && out->GetSize() <= 0))
return o;
9750 if ((o = out->FindObject(name)))
return o;
9754 TProofOutputFile *pf = 0;
9756 while ((o = nxo())) {
9757 if ((pf = dynamic_cast<TProofOutputFile *> (o))) {
9759 if (!(f = (TFile *) gROOT->GetListOfFiles()->FindObject(pf->GetOutputFileName()))) {
9760 TString fn = TString::Format(
"%s/%s", pf->GetDir(), pf->GetFileName());
9761 f = TFile::Open(fn.Data());
9762 if (!f || (f && f->IsZombie())) {
9763 ::Warning(
"TProof::GetOutput",
"problems opening file %s", fn.Data());
9766 if (f && (o = f->Get(name)))
return o;
9777 TList *TProof::GetOutputList()
9779 if (fOutputList.GetSize() > 0)
return &fOutputList;
9781 fOutputList.AttachList(fPlayer->GetOutputList());
9782 return &fOutputList;
9791 void TProof::SetParameter(
const char *par,
const char *value)
9794 Warning(
"SetParameter",
"player undefined! Ignoring");
9798 TList *il = fPlayer->GetInputList();
9799 TObject *item = il->FindObject(par);
9804 il->Add(
new TNamed(par, value));
9810 void TProof::SetParameter(
const char *par, Int_t value)
9813 Warning(
"SetParameter",
"player undefined! Ignoring");
9817 TList *il = fPlayer->GetInputList();
9818 TObject *item = il->FindObject(par);
9823 il->Add(
new TParameter<Int_t>(par, value));
9829 void TProof::SetParameter(
const char *par, Long_t value)
9832 Warning(
"SetParameter",
"player undefined! Ignoring");
9836 TList *il = fPlayer->GetInputList();
9837 TObject *item = il->FindObject(par);
9842 il->Add(
new TParameter<Long_t>(par, value));
9848 void TProof::SetParameter(
const char *par, Long64_t value)
9851 Warning(
"SetParameter",
"player undefined! Ignoring");
9855 TList *il = fPlayer->GetInputList();
9856 TObject *item = il->FindObject(par);
9861 il->Add(
new TParameter<Long64_t>(par, value));
9867 void TProof::SetParameter(
const char *par, Double_t value)
9870 Warning(
"SetParameter",
"player undefined! Ignoring");
9874 TList *il = fPlayer->GetInputList();
9875 TObject *item = il->FindObject(par);
9880 il->Add(
new TParameter<Double_t>(par, value));
9887 TObject *TProof::GetParameter(
const char *par)
const
9890 Warning(
"GetParameter",
"player undefined! Ignoring");
9891 return (TObject *)0;
9894 TList *il = fPlayer->GetInputList();
9895 return il->FindObject(par);
9902 void TProof::DeleteParameters(
const char *wildcard)
9904 if (!fPlayer)
return;
9906 if (!wildcard) wildcard =
"";
9907 TRegexp re(wildcard, kTRUE);
9908 Int_t nch = strlen(wildcard);
9910 TList *il = fPlayer->GetInputList();
9914 while ((p = next())) {
9915 TString s = p->GetName();
9916 if (nch && s != wildcard && s.Index(re) == kNPOS)
continue;
9927 void TProof::ShowParameters(
const char *wildcard)
const
9929 if (!fPlayer)
return;
9931 if (!wildcard) wildcard =
"";
9932 TRegexp re(wildcard, kTRUE);
9933 Int_t nch = strlen(wildcard);
9935 TList *il = fPlayer->GetInputList();
9938 while ((p = next())) {
9939 TString s = p->GetName();
9940 if (nch && s != wildcard && s.Index(re) == kNPOS)
continue;
9941 if (p->IsA() == TNamed::Class()) {
9942 Printf(
"%s\t\t\t%s", s.Data(), p->GetTitle());
9943 }
else if (p->IsA() == TParameter<Long_t>::Class()) {
9944 Printf(
"%s\t\t\t%ld", s.Data(),
dynamic_cast<TParameter<Long_t>*
>(p)->GetVal());
9945 }
else if (p->IsA() == TParameter<Long64_t>::Class()) {
9946 Printf(
"%s\t\t\t%lld", s.Data(),
dynamic_cast<TParameter<Long64_t>*
>(p)->GetVal());
9947 }
else if (p->IsA() == TParameter<Double_t>::Class()) {
9948 Printf(
"%s\t\t\t%f", s.Data(),
dynamic_cast<TParameter<Double_t>*
>(p)->GetVal());
9950 Printf(
"%s\t\t\t%s", s.Data(), p->GetTitle());
9958 void TProof::AddFeedback(
const char *name)
9961 Info("AddFeedback", "Adding
object \"%s\" to feedback", name);
9962 if (fFeedback->FindObject(name) == 0)
9963 fFeedback->Add(new TObjString(name));
9969 void TProof::RemoveFeedback(const
char *name)
9971 TObject *obj = fFeedback->FindObject(name);
9973 fFeedback->Remove(obj);
9981 void TProof::ClearFeedback()
9983 fFeedback->Delete();
9989 void TProof::ShowFeedback()
const
9991 if (fFeedback->GetSize() == 0) {
9992 Info(
"",
"no feedback requested");
10002 TList *TProof::GetFeedbackList()
const
10011 TTree *TProof::GetTreeHeader(TDSet *dset)
10013 TList *l = GetListOfActiveSlaves();
10014 TSlave *sl = (TSlave*) l->First();
10016 Error(
"GetTreeHeader",
"No connection");
10020 TSocket *soc = sl->GetSocket();
10021 TMessage msg(kPROOF_GETTREEHEADER);
10029 if (fProtocol >= 20) {
10030 Collect(sl, fCollectTimeout, kPROOF_GETTREEHEADER);
10031 reply = (TMessage *) fRecvMessages->First();
10033 d = soc->Recv(reply);
10036 Error(
"GetTreeHeader",
"Error getting a replay from the master.Result %d", (
int) d);
10043 if (s1 ==
"Success")
10048 Info(
"GetTreeHeader",
"%s, message size: %d, entries: %d",
10049 s1.Data(), reply->BufferSize(), (int) t->GetMaxEntryLoop());
10051 Info(
"GetTreeHeader",
"tree header retrieval failed");
10063 TDrawFeedback *TProof::CreateDrawFeedback()
10065 return (fPlayer ? fPlayer->CreateDrawFeedback(
this) : (TDrawFeedback *)0);
10071 void TProof::SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt)
10073 if (fPlayer) fPlayer->SetDrawFeedbackOption(f, opt);
10079 void TProof::DeleteDrawFeedback(TDrawFeedback *f)
10081 if (fPlayer) fPlayer->DeleteDrawFeedback(f);
10087 TList *TProof::GetOutputNames()
10148 void TProof::Browse(TBrowser *b)
10150 b->Add(fActiveSlaves, fActiveSlaves->Class(),
"fActiveSlaves");
10151 b->Add(&fMaster, fMaster.Class(),
"fMaster");
10152 b->Add(fFeedback, fFeedback->Class(),
"fFeedback");
10153 b->Add(fChains, fChains->Class(),
"fChains");
10156 b->Add(fPlayer->GetInputList(), fPlayer->GetInputList()->Class(),
"InputList");
10157 if (fPlayer->GetOutputList())
10158 b->Add(fPlayer->GetOutputList(), fPlayer->GetOutputList()->Class(),
"OutputList");
10159 if (fPlayer->GetListOfResults())
10160 b->Add(fPlayer->GetListOfResults(),
10161 fPlayer->GetListOfResults()->Class(),
"ListOfResults");
10168 void TProof::SetPlayer(TVirtualProofPlayer *player)
10180 TVirtualProofPlayer *TProof::MakePlayer(
const char *player, TSocket *s)
10185 SetPlayer(TVirtualProofPlayer::Create(player,
this, s));
10186 return GetPlayer();
10192 void TProof::AddChain(TChain *chain)
10194 fChains->Add(chain);
10200 void TProof::RemoveChain(TChain *chain)
10202 fChains->Remove(chain);
10209 void TProof::GetLog(Int_t start, Int_t end)
10211 if (!IsValid() || TestBit(TProof::kIsMaster))
return;
10213 TMessage msg(kPROOF_LOGFILE);
10215 msg << start << end;
10217 Broadcast(msg, kActive);
10218 Collect(kActive, fCollectTimeout);
10226 TMacro *TProof::GetLastLog()
10228 TMacro *maclog = 0;
10231 off_t nowlog = lseek(fileno(fLogFileR), (off_t) 0, SEEK_CUR);
10233 SysError(
"GetLastLog",
10234 "problem lseeking log file to current position (errno: %d)", TSystem::GetErrno());
10239 off_t startlog = nowlog;
10240 off_t endlog = lseek(fileno(fLogFileR), (off_t) 0, SEEK_END);
10242 SysError(
"GetLastLog",
10243 "problem lseeking log file to end position (errno: %d)", TSystem::GetErrno());
10248 UInt_t tolog = (UInt_t)(endlog - startlog);
10249 if (tolog <= 0)
return maclog;
10252 if (lseek(fileno(fLogFileR), startlog, SEEK_SET) < 0) {
10253 SysError(
"GetLastLog",
10254 "problem lseeking log file to start position (errno: %d)", TSystem::GetErrno());
10259 maclog =
new TMacro;
10263 Int_t wanted = (tolog >
sizeof(line)) ?
sizeof(line) : tolog;
10264 while (fgets(line, wanted, fLogFileR)) {
10265 Int_t r = strlen(line);
10267 if (line[r-1] ==
'\n') line[r-1] =
'\0';
10268 maclog->AddLine(line);
10274 wanted = (tolog >
sizeof(line)) ?
sizeof(line) : tolog;
10278 if (lseek(fileno(fLogFileR), nowlog, SEEK_SET) < 0) {
10279 Warning(
"GetLastLog",
10280 "problem lseeking log file to original position (errno: %d)", TSystem::GetErrno());
10290 void TProof::PutLog(TQueryResult *pq)
10294 TList *lines = pq->GetLogFile()->GetListOfLines();
10298 while ((l = (TObjString *)nxl()))
10299 EmitVA(
"LogMessage(const char*,Bool_t)", 2, l->GetName(), kFALSE);
10307 void TProof::ShowLog(
const char *queryref)
10311 Retrieve(queryref);
10315 if (fPlayer->GetListOfResults()) {
10316 TIter nxq(fPlayer->GetListOfResults());
10317 TQueryResult *qr = 0;
10318 while ((qr = (TQueryResult *) nxq()))
10319 if (strstr(queryref, qr->GetTitle()) &&
10320 strstr(queryref, qr->GetName()))
10340 void TProof::ShowLog(Int_t qry)
10343 off_t nowlog = lseek(fileno(fLogFileR), (off_t) 0, SEEK_CUR);
10345 SysError(
"ShowLog",
"problem lseeking log file (errno: %d)", TSystem::GetErrno());
10350 off_t startlog = nowlog;
10351 off_t endlog = lseek(fileno(fLogFileR), (off_t) 0, SEEK_END);
10353 SysError(
"ShowLog",
"problem lseeking log file (errno: %d)", TSystem::GetErrno());
10357 lseek(fileno(fLogFileR), nowlog, SEEK_SET);
10360 lseek(fileno(fLogFileR), (off_t) 0, SEEK_SET);
10361 }
else if (qry != -1) {
10363 TQueryResult *pq = 0;
10366 pq = (GetQueryResults()) ? ((TQueryResult *)(GetQueryResults()->Last())) : 0;
10368 GetListOfQueries();
10370 pq = (TQueryResult *)(fQueries->Last());
10372 }
else if (qry > 0) {
10373 TList *queries = GetQueryResults();
10375 TIter nxq(queries);
10376 while ((pq = (TQueryResult *)nxq()))
10377 if (qry == pq->GetSeqNum())
10381 queries = GetListOfQueries();
10382 TIter nxq(queries);
10383 while ((pq = (TQueryResult *)nxq()))
10384 if (qry == pq->GetSeqNum())
10393 Info(
"ShowLog",
"query %d not found in list", qry);
10399 UInt_t tolog = (UInt_t)(endlog - startlog);
10404 lseek(fileno(fLogFileR), startlog, SEEK_SET);
10410 Int_t wanted = (tolog >
sizeof(line)) ?
sizeof(line) : tolog;
10411 while (fgets(line, wanted, fLogFileR)) {
10413 Int_t r = strlen(line);
10414 if (!SendingLogToWindow()) {
10415 if (line[r-1] !=
'\n') line[r-1] =
'\n';
10419 Int_t w = write(fileno(stdout), p, r);
10421 SysError(
"ShowLog",
"error writing to stdout");
10428 tolog -= strlen(line);
10433 const char *opt = Getline(
"More (y/n)? [y]");
10443 wanted = (tolog >
sizeof(line)) ?
sizeof(line) : tolog;
10446 if (line[r-1] ==
'\n') line[r-1] = 0;
10447 LogMessage(line, kFALSE);
10450 if (!SendingLogToWindow()) {
10452 if (write(fileno(stdout),
"\n", 1) != 1)
10453 SysError(
"ShowLog",
"error writing to stdout");
10458 lseek(fileno(fLogFileR), nowlog, SEEK_SET);
10465 void TProof::cd(Int_t
id)
10467 if (GetManager()) {
10468 TProofDesc *d = GetManager()->GetProofDesc(
id);
10470 if (d->GetProof()) {
10471 gProof = d->GetProof();
10487 void TProof::Detach(Option_t *opt)
10490 if (!IsValid())
return;
10493 TSlave *sl = (TSlave *) fActiveSlaves->First();
10495 if (!sl || !(sl->IsValid()) || !(s = sl->GetSocket())) {
10496 Error(
"Detach",
"corrupted worker instance: wrk:%p, sock:%p", sl, s);
10500 Bool_t shutdown = (strchr(opt,
's') || strchr(opt,
'S')) ? kTRUE : kFALSE;
10503 if (shutdown && !IsIdle()) {
10505 Remove(
"cleanupqueue");
10507 Long_t timeout = gEnv->GetValue(
"Proof.ShutdownTimeout", 60);
10508 timeout = (timeout > 20) ? timeout : 20;
10510 StopProcess(kFALSE, (Long_t) (timeout / 2));
10512 Collect(kActive, timeout);
10516 DeActivateAsyncInput();
10525 if (fProgressDialogStarted)
10526 CloseProgressDialog();
10529 if (GetManager() && GetManager()->QuerySessions(
"L")) {
10530 TIter nxd(GetManager()->QuerySessions(
"L"));
10532 while ((d = (TProofDesc *)nxd())) {
10533 if (d->GetProof() ==
this) {
10535 GetManager()->QuerySessions(
"L")->Remove(d);
10552 void TProof::SetAlias(
const char *alias)
10555 TNamed::SetTitle(alias);
10556 if (TestBit(TProof::kIsMaster))
10558 TNamed::SetName(alias);
10561 if (!IsValid())
return;
10563 if (!IsProofd() && TestBit(TProof::kIsClient)) {
10564 TSlave *sl = (TSlave *) fActiveSlaves->First();
10566 sl->SetAlias(alias);
10604 Int_t TProof::UploadDataSet(
const char *, TList *,
const char *, Int_t, TList *)
10606 Printf(
" *** WARNING: this function is obsolete: it has been replaced by TProofMgr::UploadFiles ***");
10635 Int_t TProof::UploadDataSet(
const char *,
const char *,
const char *, Int_t, TList *)
10637 Printf(
" *** WARNING: this function is obsolete: it has been replaced by TProofMgr::UploadFiles ***");
10653 Int_t TProof::UploadDataSetFromFile(
const char *,
const char *,
const char *, Int_t, TList *)
10655 Printf(
" *** WARNING: this function is obsolete: it has been replaced by TProofMgr::UploadFiles ***");
10676 Bool_t TProof::RegisterDataSet(
const char *dataSetName,
10677 TFileCollection *dataSet,
const char *optStr)
10680 if (fProtocol < 17) {
10681 Info(
"RegisterDataSet",
10682 "functionality not available: the server does not have dataset support");
10686 if (!dataSetName || strlen(dataSetName) <= 0) {
10687 Info(
"RegisterDataSet",
"specifying a dataset name is mandatory");
10691 Bool_t parallelverify = kFALSE;
10692 TString sopt(optStr);
10693 if (sopt.Contains(
"V") && fProtocol >= 34 && !sopt.Contains(
"S")) {
10695 parallelverify = kTRUE;
10696 sopt.ReplaceAll(
"V",
"");
10699 sopt.ReplaceAll(
"S",
"");
10701 TMessage mess(kPROOF_DATASETS);
10702 mess << Int_t(kRegisterDataSet);
10703 mess << TString(dataSetName);
10705 mess.WriteObject(dataSet);
10708 Bool_t result = kTRUE;
10710 if (fStatus != 0) {
10711 Error(
"RegisterDataSet",
"dataset was not saved");
10717 if (!parallelverify)
return result;
10721 if (VerifyDataSet(dataSetName, sopt) < 0){
10722 Error(
"RegisterDataSet",
"problems verifying dataset '%s'", dataSetName);
10735 Int_t TProof::SetDataSetTreeName(
const char *dataset,
const char *treename)
10738 if (fProtocol < 23) {
10739 Info(
"SetDataSetTreeName",
"functionality not supported by the server");
10743 if (!dataset || strlen(dataset) <= 0) {
10744 Info(
"SetDataSetTreeName",
"specifying a dataset name is mandatory");
10748 if (!treename || strlen(treename) <= 0) {
10749 Info(
"SetDataSetTreeName",
"specifying a tree name is mandatory");
10754 TString fragment(treename);
10755 if (!fragment.BeginsWith(
"/")) fragment.Insert(0,
"/");
10756 uri.SetFragment(fragment);
10758 TMessage mess(kPROOF_DATASETS);
10759 mess << Int_t(kSetDefaultTreeName);
10760 mess << uri.GetUri();
10764 if (fStatus != 0) {
10765 Error(
"SetDataSetTreeName",
"some error occured: default tree name not changed");
10778 TMap *TProof::GetDataSets(
const char *uri,
const char *optStr)
10780 if (fProtocol < 15) {
10781 Info(
"GetDataSets",
10782 "functionality not available: the server does not have dataset support");
10785 if (fProtocol < 31 && strstr(optStr,
":lite:"))
10786 Warning(
"GetDataSets",
"'lite' option not supported by the server");
10788 TMessage mess(kPROOF_DATASETS);
10789 mess << Int_t(kGetDataSets);
10790 mess << TString(uri ? uri :
"");
10791 mess << TString(optStr ? optStr :
"");
10793 Collect(kActive, fCollectTimeout);
10795 TMap *dataSetMap = 0;
10796 if (fStatus != 0) {
10797 Error(
"GetDataSets",
"error receiving datasets information");
10800 TMessage *retMess = (TMessage *) fRecvMessages->First();
10801 if (retMess && retMess->What() == kMESS_OK) {
10802 if (!(dataSetMap = (TMap *)(retMess->ReadObject(TMap::Class()))))
10803 Error(
"GetDataSets",
"error receiving datasets");
10805 Error(
"GetDataSets",
"message not found or wrong type (%p)", retMess);
10815 void TProof::ShowDataSets(
const char *uri,
const char* optStr)
10817 if (fProtocol < 15) {
10818 Info(
"ShowDataSets",
10819 "functionality not available: the server does not have dataset support");
10823 TMessage mess(kPROOF_DATASETS);
10824 mess << Int_t(kShowDataSets);
10825 mess << TString(uri ? uri :
"");
10826 mess << TString(optStr ? optStr :
"");
10829 Collect(kActive, fCollectTimeout);
10831 Error(
"ShowDataSets",
"error receiving datasets information");
10837 Bool_t TProof::ExistsDataSet(
const char *dataset)
10839 if (fProtocol < 15) {
10840 Info(
"ExistsDataSet",
"functionality not available: the server has an"
10841 " incompatible version of TFileInfo");
10845 if (!dataset || strlen(dataset) <= 0) {
10846 Error(
"ExistsDataSet",
"dataset name missing");
10850 TMessage msg(kPROOF_DATASETS);
10851 msg << Int_t(kCheckDataSetName) << TString(dataset);
10853 Collect(kActive, fCollectTimeout);
10854 if (fStatus == -1) {
10865 void TProof::ClearDataSetCache(
const char *dataset)
10867 if (fProtocol < 28) {
10868 Info(
"ClearDataSetCache",
"functionality not available on server");
10872 TMessage msg(kPROOF_DATASETS);
10873 msg << Int_t(kCache) << TString(dataset) << TString(
"clear");
10875 Collect(kActive, fCollectTimeout);
10883 void TProof::ShowDataSetCache(
const char *dataset)
10885 if (fProtocol < 28) {
10886 Info(
"ShowDataSetCache",
"functionality not available on server");
10890 TMessage msg(kPROOF_DATASETS);
10891 msg << Int_t(kCache) << TString(dataset) << TString(
"show");
10893 Collect(kActive, fCollectTimeout);
10906 TFileCollection *TProof::GetDataSet(
const char *uri,
const char *optStr)
10908 if (fProtocol < 15) {
10909 Info(
"GetDataSet",
"functionality not available: the server has an"
10910 " incompatible version of TFileInfo");
10914 if (!uri || strlen(uri) <= 0) {
10915 Info(
"GetDataSet",
"specifying a dataset name is mandatory");
10919 TMessage nameMess(kPROOF_DATASETS);
10920 nameMess << Int_t(kGetDataSet);
10921 nameMess << TString(uri);
10922 nameMess << TString(optStr ? optStr:
"");
10923 if (Broadcast(nameMess) < 0)
10924 Error(
"GetDataSet",
"sending request failed");
10926 Collect(kActive, fCollectTimeout);
10927 TFileCollection *fileList = 0;
10928 if (fStatus != 0) {
10929 Error(
"GetDataSet",
"error receiving datasets information");
10932 TMessage *retMess = (TMessage *) fRecvMessages->First();
10933 if (retMess && retMess->What() == kMESS_OK) {
10934 if (!(fileList = (TFileCollection*)(retMess->ReadObject(TFileCollection::Class()))))
10935 Error(
"GetDataSet",
"error reading list of files");
10937 Error(
"GetDataSet",
"message not found or wrong type (%p)", retMess);
10946 void TProof::ShowDataSet(
const char *uri,
const char* opt)
10948 TFileCollection *fileList = 0;
10949 if ((fileList = GetDataSet(uri))) {
10950 fileList->Print(opt);
10953 Warning(
"ShowDataSet",
"no such dataset: %s", uri);
10960 Int_t TProof::RemoveDataSet(
const char *uri,
const char* optStr)
10962 TMessage nameMess(kPROOF_DATASETS);
10963 nameMess << Int_t(kRemoveDataSet);
10964 nameMess << TString(uri?uri:
"");
10965 nameMess << TString(optStr?optStr:
"");
10966 if (Broadcast(nameMess) < 0)
10967 Error(
"RemoveDataSet",
"sending request failed");
10968 Collect(kActive, fCollectTimeout);
10979 TList* TProof::FindDataSets(
const char* ,
const char* )
10981 Error (
"FindDataSets",
"not yet implemented");
10982 return (TList *) 0;
10989 Bool_t TProof::RequestStagingDataSet(
const char *dataset)
10991 if (fProtocol < 35) {
10992 Error(
"RequestStagingDataSet",
10993 "functionality not supported by the server");
10997 TMessage mess(kPROOF_DATASETS);
10998 mess << Int_t(kRequestStaging);
10999 mess << TString(dataset);
11003 if (fStatus != 0) {
11004 Error(
"RequestStagingDataSet",
"staging request was unsuccessful");
11015 Bool_t TProof::CancelStagingDataSet(
const char *dataset)
11017 if (fProtocol < 36) {
11018 Error(
"CancelStagingDataSet",
11019 "functionality not supported by the server");
11023 TMessage mess(kPROOF_DATASETS);
11024 mess << Int_t(kCancelStaging);
11025 mess << TString(dataset);
11029 if (fStatus != 0) {
11030 Error(
"CancelStagingDataSet",
"cancel staging request was unsuccessful");
11042 TFileCollection *TProof::GetStagingStatusDataSet(
const char *dataset)
11044 if (fProtocol < 35) {
11045 Error(
"GetStagingStatusDataSet",
11046 "functionality not supported by the server");
11050 TMessage nameMess(kPROOF_DATASETS);
11051 nameMess << Int_t(kStagingStatus);
11052 nameMess << TString(dataset);
11053 if (Broadcast(nameMess) < 0) {
11054 Error(
"GetStagingStatusDataSet",
"sending request failed");
11058 Collect(kActive, fCollectTimeout);
11059 TFileCollection *fc = NULL;
11062 Error(
"GetStagingStatusDataSet",
"problem processing the request");
11064 else if (fStatus == 0) {
11065 TMessage *retMess = (TMessage *)fRecvMessages->First();
11066 if (retMess && (retMess->What() == kMESS_OK)) {
11067 fc = (TFileCollection *)(
11068 retMess->ReadObject(TFileCollection::Class()) );
11070 Error(
"GetStagingStatusDataSet",
"error reading list of files");
11073 Error(
"GetStagingStatusDataSet",
11074 "response message not found or wrong type (%p)", retMess);
11085 void TProof::ShowStagingStatusDataSet(
const char *dataset,
const char *opt)
11087 TFileCollection *fc = GetStagingStatusDataSet(dataset);
11099 Int_t TProof::VerifyDataSet(
const char *uri,
const char *optStr)
11101 if (fProtocol < 15) {
11102 Info(
"VerifyDataSet",
"functionality not available: the server has an"
11103 " incompatible version of TFileInfo");
11108 if (!uri || (uri && strlen(uri) <= 0)) {
11109 Error(
"VerifyDataSet",
"dataset name is is mandatory");
11113 Int_t nmissingfiles = 0;
11115 TString sopt(optStr);
11116 if (fProtocol < 34 || sopt.Contains(
"S")) {
11117 sopt.ReplaceAll(
"S",
"");
11118 Info(
"VerifyDataSet",
"Master-only verification");
11119 TMessage nameMess(kPROOF_DATASETS);
11120 nameMess << Int_t(kVerifyDataSet);
11121 nameMess << TString(uri);
11123 Broadcast(nameMess);
11125 Collect(kActive, fCollectTimeout);
11128 Info(
"VerifyDataSet",
"no such dataset %s", uri);
11131 nmissingfiles = fStatus;
11132 return nmissingfiles;
11136 if (!IsParallel() && !fDynamicStartup) {
11137 Error(
"VerifyDataSet",
"PROOF is in sequential mode (no workers): cannot do parallel verification.");
11138 Error(
"VerifyDataSet",
"Either start PROOF with some workers or force sequential adding 'S' as option.");
11143 return VerifyDataSetParallel(uri, optStr);
11150 Int_t TProof::VerifyDataSetParallel(
const char *uri,
const char *optStr)
11152 Int_t nmissingfiles = 0;
11155 SetParameter(
"PROOF_FilesToProcess", Form(
"dataset:%s", uri));
11159 if (TProof::GetParameter(GetInputList(),
"PROOF_Packetizer", oldpack) != 0) oldpack =
"";
11160 SetParameter(
"PROOF_Packetizer",
"TPacketizerFile");
11163 SetParameter(
"PROOF_VerifyDataSet", uri);
11165 SetParameter(
"PROOF_VerifyDataSetOption", optStr);
11166 SetParameter(
"PROOF_SavePartialResults", (Int_t)0);
11167 Int_t oldifiip = -1;
11168 if (TProof::GetParameter(GetInputList(),
"PROOF_IncludeFileInfoInPacket", oldifiip) != 0) oldifiip = -1;
11169 SetParameter(
"PROOF_IncludeFileInfoInPacket", (Int_t)1);
11172 const char* mss=
"";
11173 SetParameter(
"PROOF_MSS", mss);
11174 const char* stageoption=
"";
11175 SetParameter(
"PROOF_StageOption", stageoption);
11178 Process(
"TSelVerifyDataSet", (Long64_t) 1);
11181 if (!oldpack.IsNull())
11182 SetParameter(
"PROOF_Packetizer", oldpack);
11184 DeleteParameters(
"PROOF_Packetizer");
11187 DeleteParameters(
"PROOF_FilesToProcess");
11188 DeleteParameters(
"PROOF_VerifyDataSet");
11189 DeleteParameters(
"PROOF_VerifyDataSetOption");
11190 DeleteParameters(
"PROOF_MSS");
11191 DeleteParameters(
"PROOF_StageOption");
11192 if (oldifiip > -1) {
11193 SetParameter(
"PROOF_IncludeFileInfoInPacket", oldifiip);
11195 DeleteParameters(
"PROOF_IncludeFileInfoInPacket");
11197 DeleteParameters(
"PROOF_SavePartialResults");
11201 Int_t ntouched = 0;
11202 Bool_t changed_ds = kFALSE;
11204 TIter nxtout(GetOutputList());
11206 TList *lfiindout =
new TList;
11207 while ((obj = nxtout())) {
11208 TList *l =
dynamic_cast<TList *
>(obj);
11209 if (l && TString(l->GetName()).BeginsWith(
"PROOF_ListFileInfos_")) {
11211 TFileInfo *fiindout = 0;
11212 while ((fiindout = (TFileInfo*) nxt())) {
11213 lfiindout->Add(fiindout);
11217 TParameter<Int_t>* pdisappeared =
dynamic_cast<TParameter<Int_t>*
>(obj);
11218 if ( pdisappeared && TString(pdisappeared->GetName()).BeginsWith(
"PROOF_NoFilesDisppeared_")) {
11219 nmissingfiles += pdisappeared->GetVal();
11221 TParameter<Int_t>* pnopened =
dynamic_cast<TParameter<Int_t>*
>(obj);
11222 if (pnopened && TString(pnopened->GetName()).BeginsWith(
"PROOF_NoFilesOpened_")) {
11223 nopened += pnopened->GetVal();
11225 TParameter<Int_t>* pntouched =
dynamic_cast<TParameter<Int_t>*
>(obj);
11226 if (pntouched && TString(pntouched->GetName()).BeginsWith(
"PROOF_NoFilesTouched_")) {
11227 ntouched += pntouched->GetVal();
11229 TParameter<Bool_t>* pchanged_ds =
dynamic_cast<TParameter<Bool_t>*
>(obj);
11230 if (pchanged_ds && TString(pchanged_ds->GetName()).BeginsWith(
"PROOF_DataSetChanged_")) {
11231 if (pchanged_ds->GetVal() == kTRUE) changed_ds = kTRUE;
11235 Info(
"VerifyDataSetParallel",
"%s: changed? %d (# files opened = %d, # files touched = %d,"
11236 " # missing files = %d)",
11237 uri, changed_ds, nopened, ntouched, nmissingfiles);
11239 return nmissingfiles;
11245 TMap *TProof::GetDataSetQuota(
const char* optStr)
11248 Info(
"UploadDataSet",
"Lite-session: functionality not implemented");
11252 TMessage mess(kPROOF_DATASETS);
11253 mess << Int_t(kGetQuota);
11254 mess << TString(optStr?optStr:
"");
11257 Collect(kActive, fCollectTimeout);
11258 TMap *groupQuotaMap = 0;
11260 Info(
"GetDataSetQuota",
"could not receive quota");
11263 TMessage *retMess = (TMessage *) fRecvMessages->First();
11264 if (retMess && retMess->What() == kMESS_OK) {
11265 if (!(groupQuotaMap = (TMap*)(retMess->ReadObject(TMap::Class()))))
11266 Error(
"GetDataSetQuota",
"error getting quotas");
11268 Error(
"GetDataSetQuota",
"message not found or wrong type (%p)", retMess);
11271 return groupQuotaMap;
11278 void TProof::ShowDataSetQuota(Option_t* opt)
11280 if (fProtocol < 15) {
11281 Info(
"ShowDataSetQuota",
11282 "functionality not available: the server does not have dataset support");
11287 Info(
"UploadDataSet",
"Lite-session: functionality not implemented");
11291 TMessage mess(kPROOF_DATASETS);
11292 mess << Int_t(kShowQuota);
11293 mess << TString(opt?opt:
"");
11298 Error(
"ShowDataSetQuota",
"error receiving quota information");
11304 void TProof::InterruptCurrentMonitor()
11306 if (fCurrentMonitor)
11307 fCurrentMonitor->Interrupt();
11322 Int_t TProof::ActivateWorker(
const char *ord, Bool_t save)
11324 return ModifyWorkerLists(ord, kTRUE, save);
11339 Int_t TProof::DeactivateWorker(
const char *ord, Bool_t save)
11341 return ModifyWorkerLists(ord, kFALSE, save);
11358 Int_t TProof::ModifyWorkerLists(
const char *ord, Bool_t add, Bool_t save)
11361 if (!ord || strlen(ord) <= 0) {
11362 Info(
"ModifyWorkerLists",
11363 "an ordinal number - e.g. \"0.4\" or \"*\" for all - is required as input");
11367 Info(
"ModifyWorkerLists",
"ord: '%s' (add: %d, save: %d)", ord, add, save);
11370 Bool_t restoring = !strcmp(ord,
"restore") ? kTRUE : kFALSE;
11371 if (IsEndMaster()) {
11374 nwc = RestoreActiveList();
11376 if (save) SaveActiveList();
11380 Bool_t allord = strcmp(ord,
"*") ? kFALSE : kTRUE;
11383 if (TestBit(TProof::kIsMaster) && gProofServ) {
11385 strncmp(ord, gProofServ->GetOrdinal(), strlen(gProofServ->GetOrdinal())))
11390 Bool_t rs = kFALSE;
11393 TList *in = (add) ? fInactiveSlaves : fActiveSlaves;
11394 TList *out = (add) ? fActiveSlaves : fInactiveSlaves;
11396 if (IsEndMaster() && !restoring) {
11398 THashList *ords = 0;
11400 ords =
new THashList();
11401 const char *masterord = (gProofServ) ? gProofServ->GetOrdinal() :
"0";
11402 TString oo(ord), o;
11404 while(oo.Tokenize(o, from,
","))
11405 if (o.BeginsWith(masterord)) ords->Add(
new TObjString(o));
11412 if (in->GetSize() > 0) {
11414 while ((wrk = (TSlave *) nxw())) {
11416 if (allord || (ords && (os = ords->FindObject(wrk->GetOrdinal())))) {
11418 if (!out->FindObject(wrk)) {
11421 fActiveMonitor->Add(wrk->GetSocket());
11426 fActiveMonitor->Remove(wrk->GetSocket());
11427 wrk->SetStatus(TSlave::kInactive);
11429 wrk->SetStatus(TSlave::kActive);
11437 if (!allord && ords) {
11438 if (os) ords->Remove(os);
11439 if (ords->GetSize() == 0)
break;
11446 if (!fw && ords && ords->GetSize() > 0) {
11449 while ((os = nxo())) {
11451 while ((wrk = (TSlave *) nxw()))
11452 if (!strcmp(os->GetName(), wrk->GetOrdinal()))
break;
11454 if (!oo.IsNull()) oo +=
",";
11455 oo += os->GetName();
11458 if (!oo.IsNull()) {
11459 Warning(
"ModifyWorkerLists",
"worker(s) '%s' not found!", oo.Data());
11472 FindUniqueSlaves();
11475 Int_t action = (add) ? (Int_t) kActivateWorker : (Int_t) kDeactivateWorker;
11477 if (fProtocol > 32) {
11478 TMessage mess(kPROOF_WORKERLISTS);
11479 mess << action << TString(ord);
11481 Collect(kActive, fCollectTimeout);
11482 if (fStatus != 0) {
11483 nwc = (fStatus < nwc) ? fStatus : nwc;
11484 if (fStatus == -2) {
11486 Warning(
"ModifyWorkerLists",
"request not completely full filled");
11488 Error(
"ModifyWorkerLists",
"request failed");
11492 TString oo(ord), o;
11493 if (oo.Contains(
","))
11494 Warning(
"ModifyWorkerLists",
"block request not supported by server: splitting into pieces ...");
11496 while(oo.Tokenize(o, from,
",")) {
11497 TMessage mess(kPROOF_WORKERLISTS);
11498 mess << action << o;
11500 Collect(kActive, fCollectTimeout);
11511 void TProof::SaveActiveList()
11513 if (!fActiveSlavesSaved.IsNull()) fActiveSlavesSaved =
"";
11514 if (fInactiveSlaves->GetSize() == 0) {
11515 fActiveSlavesSaved =
"*";
11517 TIter nxw(fActiveSlaves);
11519 while ((wk = (TSlave *)nxw())) { fActiveSlavesSaved += TString::Format(
"%s,", wk->GetOrdinal()); }
11526 Int_t TProof::RestoreActiveList()
11529 DeactivateWorker(
"*", kFALSE);
11531 if (!fActiveSlavesSaved.IsNull())
11532 return ActivateWorker(fActiveSlavesSaved, kFALSE);
11552 TProof *TProof::Open(
const char *cluster,
const char *conffile,
11553 const char *confdir, Int_t loglevel)
11555 const char *pn =
"TProof::Open";
11561 TPluginManager *pm = gROOT->GetPluginManager();
11563 ::Error(pn,
"plugin manager not found");
11567 if (gROOT->IsBatch()) {
11568 ::Error(pn,
"we are in batch mode, cannot show PROOF Session Viewer");
11572 TPluginHandler *sv = pm->FindHandler(
"TSessionViewer",
"");
11574 ::Error(pn,
"no plugin found for TSessionViewer");
11577 if (sv->LoadPlugin() == -1) {
11578 ::Error(pn,
"plugin for TSessionViewer could not be loaded");
11586 TString clst(cluster);
11589 if (PoDCheckUrl( &clst ) < 0)
return 0;
11591 if (clst.BeginsWith(
"workers=")) clst.Insert(0,
"lite:///?");
11592 if (clst.BeginsWith(
"tunnel=")) clst.Insert(0,
"/?");
11601 TString opts(u.GetOptions());
11602 if (!opts.IsNull()) {
11603 Int_t it = opts.Index(
"tunnel=");
11605 TString sport = opts(it + strlen(
"tunnel="), opts.Length());
11606 TString host(
"127.0.0.1");
11608 Int_t ic = sport.Index(
":");
11611 host = sport(0, ic);
11612 sport.Remove(0, ic + 1);
11614 if (!sport.IsDigit()) {
11616 TRegexp re(
"[^0-9]");
11617 Int_t ind = sport.Index(re);
11622 if (sport.IsDigit())
11623 port = sport.Atoi();
11626 ::Info(
"TProof::Open",
"using tunnel at %s:%d", host.Data(), port);
11627 gEnv->SetValue(
"XNet.SOCKS4Host", host);
11628 gEnv->SetValue(
"XNet.SOCKS4Port", port);
11631 ::Warning(
"TProof::Open",
11632 "problems parsing tunnelling info from options: %s", opts.Data());
11639 Bool_t create = kFALSE;
11640 if (opts.Length() > 0) {
11641 if (opts.BeginsWith(
"N",TString::kIgnoreCase)) {
11644 u.SetOptions(opts);
11645 }
else if (opts.IsDigit()) {
11646 locid = opts.Atoi();
11651 TProofMgr *mgr = TProofMgr::Create(u.GetUrl());
11654 if (mgr && mgr->IsValid()) {
11658 Bool_t attach = (create || mgr->IsProofd() || mgr->IsLite()) ? kFALSE : kTRUE;
11663 d = (TProofDesc *) mgr->QuerySessions(
"")->First();
11665 d = (TProofDesc *) mgr->GetProofDesc(locid);
11667 proof = (TProof*) mgr->AttachSession(d);
11668 if (!proof || !proof->IsValid()) {
11670 ::Error(pn,
"new session could not be attached");
11678 proof = (TProof*) mgr->CreateSession(conffile, confdir, loglevel);
11679 if (!proof || !proof->IsValid()) {
11680 ::Error(pn,
"new session could not be created");
11693 TProofMgr *TProof::Mgr(
const char *url)
11696 return (TProofMgr *)0;
11699 return TProofMgr::Create(url);
11705 void TProof::Reset(
const char *url, Bool_t hard)
11708 TProofMgr *mgr = TProof::Mgr(url);
11709 if (mgr && mgr->IsValid())
11712 ::Error(
"TProof::Reset",
11713 "unable to initialize a valid manager instance");
11720 const TList *TProof::GetEnvVars()
11722 return fgProofEnvList;
11729 void TProof::AddEnvVar(
const char *name,
const char *value)
11731 if (gDebug > 0) ::Info(
"TProof::AddEnvVar",
"%s=%s", name, value);
11733 if (fgProofEnvList == 0) {
11735 fgProofEnvList =
new TList;
11736 fgProofEnvList->SetOwner();
11739 TObject *o = fgProofEnvList->FindObject(name);
11741 fgProofEnvList->Remove(o);
11744 fgProofEnvList->Add(
new TNamed(name, value));
11751 void TProof::DelEnvVar(
const char *name)
11753 if (fgProofEnvList == 0)
return;
11755 TObject *o = fgProofEnvList->FindObject(name);
11757 fgProofEnvList->Remove(o);
11765 void TProof::ResetEnvVars()
11767 if (fgProofEnvList == 0)
return;
11769 SafeDelete(fgProofEnvList);
11777 void TProof::SaveWorkerInfo()
11780 if (TestBit(TProof::kIsClient))
11785 Error(
"SaveWorkerInfo",
"gProofServ undefined");
11790 if (!fSlaves && !fBadSlaves) {
11791 Warning(
"SaveWorkerInfo",
"all relevant worker lists is undefined");
11796 TString fnwrk = TString::Format(
"%s/.workers",
11797 gSystem->DirName(gProofServ->GetSessionDir()));
11798 FILE *fwrk = fopen(fnwrk.Data(),
"w");
11800 Error(
"SaveWorkerInfo",
11801 "cannot open %s for writing (errno: %d)", fnwrk.Data(), errno);
11808 if (gSystem->Getenv(
"PROOF_ADDITIONALLOG")) {
11809 addlogext = gSystem->Getenv(
"PROOF_ADDITIONALLOG");
11810 TPMERegexp reLogTag(
"^__(.*)__\\.log");
11811 if (reLogTag.Match(addlogext) == 2) {
11812 addLogTag = reLogTag[1];
11818 Info(
"SaveWorkerInfo",
"request for additional line with ext: '%s'", addlogext.Data());
11822 TPMERegexp re(
"(.*?)-[0-9]+-[0-9]+$");
11825 TIter nxa(fSlaves);
11828 while ((wrk = (TSlave *) nxa())) {
11829 Int_t status = (fBadSlaves && fBadSlaves->FindObject(wrk)) ? 0 : 1;
11830 logfile = wrk->GetWorkDir();
11831 if (re.Match(logfile) == 2) logfile = re[1];
11834 fprintf(fwrk,
"%s@%s:%d %d %s %s.log\n",
11835 wrk->GetUser(), wrk->GetName(), wrk->GetPort(), status,
11836 wrk->GetOrdinal(), logfile.Data());
11838 if (addlogext.Length() > 0) {
11839 fprintf(fwrk,
"%s@%s:%d %d %s(%s) %s.%s\n",
11840 wrk->GetUser(), wrk->GetName(), wrk->GetPort(), status,
11841 wrk->GetOrdinal(), addLogTag.Data(), logfile.Data(), addlogext.Data());
11848 TIter nxb(fBadSlaves);
11849 while ((wrk = (TSlave *) nxb())) {
11850 logfile = wrk->GetWorkDir();
11851 if (re.Match(logfile) == 2) logfile = re[1];
11853 if (!fSlaves->FindObject(wrk)) {
11855 fprintf(fwrk,
"%s@%s:%d 0 %s %s.log\n",
11856 wrk->GetUser(), wrk->GetName(), wrk->GetPort(),
11857 wrk->GetOrdinal(), logfile.Data());
11864 TIter nxt(fTerminatedSlaveInfos);
11866 while (( sli = (TSlaveInfo *)nxt() )) {
11867 logfile = sli->GetDataDir();
11868 if (re.Match(logfile) == 2) logfile = re[1];
11870 fprintf(fwrk,
"%s 2 %s %s.log\n",
11871 sli->GetName(), sli->GetOrdinal(), logfile.Data());
11873 if (addlogext.Length() > 0) {
11874 fprintf(fwrk,
"%s 2 %s(%s) %s.%s\n",
11875 sli->GetName(), sli->GetOrdinal(), addLogTag.Data(),
11876 logfile.Data(), addlogext.Data());
11892 Int_t TProof::GetParameter(TCollection *c,
const char *par, TString &value)
11894 TObject *obj = c ? c->FindObject(par) : (TObject *)0;
11896 TNamed *p =
dynamic_cast<TNamed*
>(obj);
11898 value = p->GetTitle();
11911 Int_t TProof::GetParameter(TCollection *c,
const char *par, Int_t &value)
11913 TObject *obj = c ? c->FindObject(par) : (TObject *)0;
11915 TParameter<Int_t> *p =
dynamic_cast<TParameter<Int_t>*
>(obj);
11917 value = p->GetVal();
11929 Int_t TProof::GetParameter(TCollection *c,
const char *par, Long_t &value)
11931 TObject *obj = c ? c->FindObject(par) : (TObject *)0;
11933 TParameter<Long_t> *p =
dynamic_cast<TParameter<Long_t>*
>(obj);
11935 value = p->GetVal();
11947 Int_t TProof::GetParameter(TCollection *c,
const char *par, Long64_t &value)
11949 TObject *obj = c ? c->FindObject(par) : (TObject *)0;
11951 TParameter<Long64_t> *p =
dynamic_cast<TParameter<Long64_t>*
>(obj);
11953 value = p->GetVal();
11965 Int_t TProof::GetParameter(TCollection *c,
const char *par, Double_t &value)
11967 TObject *obj = c ? c->FindObject(par) : (TObject *)0;
11969 TParameter<Double_t> *p =
dynamic_cast<TParameter<Double_t>*
>(obj);
11971 value = p->GetVal();
11984 Int_t TProof::AssertDataSet(TDSet *dset, TList *input,
11985 TDataSetManager *mgr, TString &emsg)
11990 if (!dset || !input || !mgr) {
11991 emsg.Form(
"invalid inputs (%p, %p, %p)", dset, input, mgr);
11995 TList *datasets =
new TList;
11996 TFileCollection *dataset = 0;
11998 TString dsname(dset->GetName());
12001 TString dsns(dsname), enlname;
12002 Ssiz_t eli = dsns.Index(
"?enl=");
12003 if (eli != kNPOS) {
12004 enlname = dsns(eli + strlen(
"?enl="), dsns.Length());
12005 dsns.Remove(eli, dsns.Length()-eli);
12009 if (dsname.BeginsWith(
"TFileCollection:")) {
12011 dsname.ReplaceAll(
"TFileCollection:",
"");
12013 dataset = (TFileCollection *) input->FindObject(dsname);
12015 emsg.Form(
"TFileCollection %s not found in input list", dset->GetName());
12019 input->RecursiveRemove(dataset);
12021 datasets->Add(
new TPair(dataset,
new TObjString(enlname.Data())));
12024 if (TProof::GetParameter(input,
"PROOF_LookupOpt", lookupopt) != 0) {
12025 lookupopt = gEnv->GetValue(
"Proof.LookupOpt",
"all");
12026 input->Add(
new TNamed(
"PROOF_LookupOpt", lookupopt.Data()));
12039 TFileCollection *fc =
nullptr;
12044 TRegexp rg(
"[, |]");
12045 Bool_t validEnl = (enlname.Index(rg) == kNPOS) ? kTRUE : kFALSE;
12046 Bool_t validSdsn = (dsns.Index(rg) == kNPOS) ? kTRUE : kFALSE;
12048 if (validEnl && validSdsn && (( fc = mgr->GetDataSet(dsns) ))) {
12054 TIter nxfi(fc->GetList());
12056 while (( fi = (TFileInfo *)nxfi() ))
12057 fi->SetTitle(dsns.Data());
12062 datasets->Add(
new TPair(dataset,
new TObjString( enlname.Data() )) );
12071 dsns = dsname.Data();
12074 while (dsns.Tokenize(dsn1, from1,
"[, ]")) {
12077 while (dsn1.Tokenize(dsn2, from2,
"|")) {
12079 Int_t ienl = dsn2.Index(
"?enl=");
12080 if (ienl != kNPOS) {
12081 enlname = dsn2(ienl + 5, dsn2.Length());
12084 if ((fc = mgr->GetDataSet(dsn2.Data()))) {
12086 TIter nxfi(fc->GetList());
12088 while ((fi = (TFileInfo *) nxfi())) { fi->SetTitle(dsn2.Data()); }
12102 if (dataset->GetList()->First())
12103 ((TFileInfo *)(dataset->GetList()->First()))->SetTitle(dsn1.Data());
12105 datasets->Add(
new TPair(dataset,
new TObjString(enlname.Data())));
12118 if (!datasets || datasets->GetSize() <= 0) {
12119 emsg.Form(
"no dataset(s) found on the master corresponding to: %s", dsname.Data());
12123 if (!(dataset = (TFileCollection *) ((TPair *)(datasets->First()))->Key())) {
12124 emsg.Form(
"dataset pointer is null: corruption? - aborting");
12130 if (TProof::GetParameter(input,
"PROOF_LookupOpt", lookupopt) != 0) {
12131 lookupopt = gEnv->GetValue(
"Proof.LookupOpt",
"stagedOnly");
12132 input->Add(
new TNamed(
"PROOF_LookupOpt", lookupopt.Data()));
12145 mgr->ParseUri(dsnparse.Data(), 0, 0, 0, &dsTree);
12146 if (dsTree.IsNull()) {
12149 dsTree += dset->GetDirectory();
12150 dsTree += dset->GetObjName();
12152 if (!dsTree.IsNull() && dsTree !=
"/") {
12153 TString tree(dsTree);
12154 Int_t idx = tree.Index(
"/");
12155 if (idx != kNPOS) {
12156 TString dir = tree(0, idx+1);
12157 tree.Remove(0, idx);
12158 dset->SetDirectory(dir);
12160 dset->SetObjName(tree);
12163 dsTree = dataset->GetDefaultTreeName();
12167 TList *srvmapsref = TDataSetManager::GetDataSetSrvMaps();
12168 TList *srvmapslist = srvmapsref;
12170 if (TProof::GetParameter(input,
"PROOF_DataSetSrvMaps", srvmaps) == 0) {
12171 srvmapslist = TDataSetManager::ParseDataSetSrvMaps(srvmaps);
12174 if (srvmapsref && !srvmapslist) {
12175 msg.Form(
"+++ Info: dataset server mapping(s) DISABLED by user");
12176 }
else if (srvmapsref && srvmapslist && srvmapslist != srvmapsref) {
12177 msg.Form(
"+++ Info: dataset server mapping(s) modified by user");
12178 }
else if (!srvmapsref && srvmapslist) {
12179 msg.Form(
"+++ Info: dataset server mapping(s) added by user");
12181 gProofServ->SendAsynMessage(msg.Data());
12186 if (datasets->GetSize() > 1) dset->SetBit(TDSet::kMultiDSet);
12188 TList *listOfMissingFiles =
new TList;
12189 TEntryList *entrylist = 0;
12191 TIter nxds(datasets);
12192 while ((pair = (TPair *) nxds())) {
12194 dataset = (TFileCollection *) pair->Key();
12196 TEntryList *enl = 0;
12197 TObjString *os = (TObjString *) pair->Value();
12198 if (strlen(os->GetName())) {
12199 if (!(enl = dynamic_cast<TEntryList *>(input->FindObject(os->GetName())))) {
12201 gProofServ->SendAsynMessage(TString::Format(
"+++ Warning:"
12202 " entry list %s not found", os->GetName()));
12204 if (enl && (!(enl->GetLists()) || enl->GetLists()->GetSize() <= 0)) {
12206 gProofServ->SendAsynMessage(TString::Format(
"+++ Warning:"
12207 " no sub-lists in entry-list!"));
12210 TList *missingFiles =
new TList;
12211 TSeqCollection* files = dataset->GetList();
12212 if (gDebug > 0) files->Print();
12213 Bool_t availableOnly = (lookupopt !=
"all") ? kTRUE : kFALSE;
12214 if (dset->TestBit(TDSet::kMultiDSet)) {
12215 TDSet *ds =
new TDSet(dataset->GetName(), dset->GetObjName(), dset->GetDirectory());
12216 ds->SetSrvMaps(srvmapslist);
12217 if (!ds->Add(files, dsTree, availableOnly, missingFiles)) {
12218 emsg.Form(
"error integrating dataset %s", dataset->GetName());
12224 if (enl) ds->SetEntryList(enl);
12226 dset->SetSrvMaps(srvmapslist);
12227 if (!dset->Add(files, dsTree, availableOnly, missingFiles)) {
12228 emsg.Form(
"error integrating dataset %s", dataset->GetName());
12231 if (enl) entrylist = enl;
12233 if (missingFiles) {
12236 TIter next(missingFiles);
12238 while ((file = next())) {
12239 dataset->GetList()->Remove(file);
12240 listOfMissingFiles->Add(file);
12242 missingFiles->SetOwner(kFALSE);
12243 missingFiles->Clear();
12245 SafeDelete(missingFiles);
12249 while ((pair = (TPair *) nxds())) {
12250 if (pair->Key())
delete pair->Key();
12251 if (pair->Value())
delete pair->Value();
12253 datasets->SetOwner(kTRUE);
12254 SafeDelete(datasets);
12257 if (srvmapslist && srvmapslist != srvmapsref) {
12258 srvmapslist->SetOwner(kTRUE);
12259 SafeDelete(srvmapslist);
12263 if (entrylist) dset->SetEntryList(entrylist);
12268 if (listOfMissingFiles && listOfMissingFiles->GetSize() > 0) {
12269 listOfMissingFiles->SetName(
"MissingFiles");
12270 input->Add(listOfMissingFiles);
12281 Int_t TProof::SaveInputData(TQueryResult *qr,
const char *cachedir, TString &emsg)
12286 if (!qr || !(input = qr->GetInputList()) ||
12287 !cachedir || strlen(cachedir) <= 0)
return 0;
12290 TNamed *data = (TNamed *) input->FindObject(
"PROOF_InputDataFile");
12291 TList *inputdata = (TList *) input->FindObject(
"PROOF_InputData");
12292 if (!data && !inputdata)
return 0;
12295 input->Add((data =
new TNamed(
"PROOF_InputDataFile", kPROOF_InputDataFile)));
12297 TString dstname(data->GetTitle()), srcname;
12298 Bool_t fromcache = kFALSE;
12299 if (dstname.BeginsWith(
"cache:")) {
12301 dstname.ReplaceAll(
"cache:",
"");
12302 srcname.Form(
"%s/%s", cachedir, dstname.Data());
12303 if (gSystem->AccessPathName(srcname)) {
12304 emsg.Form(
"input data file not found in cache (%s)", srcname.Data());
12311 if (gSystem->CopyFile(srcname, dstname, kTRUE) != 0) {
12312 emsg.Form(
"problems copying %s to %s", srcname.Data(), dstname.Data());
12317 if (inputdata && inputdata->GetSize() > 0) {
12318 TFile *f = TFile::Open(dstname.Data(),
"RECREATE");
12321 inputdata->Write();
12325 emsg.Form(
"could not create %s", dstname.Data());
12329 emsg.Form(
"no input data!");
12333 ::Info(
"TProof::SaveInputData",
"input data saved to %s", dstname.Data());
12336 data->SetTitle(dstname);
12338 input->Remove(inputdata);
12339 inputdata->SetOwner();
12350 Int_t TProof::SendInputData(TQueryResult *qr, TProof *p, TString &emsg)
12355 if (!qr || !(input = qr->GetInputList()))
return 0;
12358 TNamed *inputdata = (TNamed *) input->FindObject(
"PROOF_InputDataFile");
12359 if (!inputdata)
return 0;
12361 TString fname(inputdata->GetTitle());
12362 if (gSystem->AccessPathName(fname)) {
12363 emsg.Form(
"input data file not found in sandbox (%s)", fname.Data());
12368 if (!p || !p->IsValid()) {
12369 emsg.Form(
"TProof object undefined or invalid: protocol error!");
12374 p->BroadcastFile(fname, TProof::kBinary,
"cache");
12383 Int_t TProof::GetInputData(TList *input,
const char *cachedir, TString &emsg)
12386 if (!input || !cachedir || strlen(cachedir) <= 0)
return 0;
12389 TNamed *inputdata = (TNamed *) input->FindObject(
"PROOF_InputDataFile");
12390 if (!inputdata)
return 0;
12393 fname.Form(
"%s/%s", cachedir, inputdata->GetTitle());
12394 if (gSystem->AccessPathName(fname)) {
12395 emsg.Form(
"input data file not found in cache (%s)", fname.Data());
12400 TList *added =
new TList;
12401 added->SetName(
"PROOF_InputObjsFromFile");
12403 TFile *f = TFile::Open(fname.Data());
12405 TList *keys = (TList *) f->GetListOfKeys();
12407 emsg.Form(
"could not get list of object keys from file");
12412 while ((k = (TKey *)nxk())) {
12413 TObject *o = f->Get(k->GetName());
12420 if (added->GetSize() > 0) {
12429 emsg.Form(
"could not open %s", fname.Data());
12440 void TProof::LogViewer(
const char *url, Int_t idx)
12442 if (!gROOT->IsBatch()) {
12444 if (!fgLogViewer) {
12446 gROOT->GetPluginManager()->FindHandler(
"TProofProgressLog"))) {
12447 if (fgLogViewer->LoadPlugin() == -1) {
12449 ::Error(
"TProof::LogViewer",
"cannot load the relevant plug-in");
12456 TString u = (url && strlen(url) <= 0) ?
"lite" : url;
12457 fgLogViewer->ExecPlugin(2, u.Data(), idx);
12460 if (url && strlen(url) > 0) {
12461 ::Info(
"TProof::LogViewer",
12462 "batch mode: use TProofLog *pl = TProof::Mgr(\"%s\")->GetSessionLogs(%d)", url, idx);
12463 }
else if (url && strlen(url) <= 0) {
12464 ::Info(
"TProof::LogViewer",
12465 "batch mode: use TProofLog *pl = TProof::Mgr(\"lite\")->GetSessionLogs(%d)", idx);
12467 ::Info(
"TProof::LogViewer",
12468 "batch mode: use TProofLog *pl = TProof::Mgr(\"<master>\")->GetSessionLogs(%d)", idx);
12479 void TProof::SetProgressDialog(Bool_t on)
12482 SetBit(kUseProgressDialog);
12484 ResetBit(kUseProgressDialog);
12492 void TProof::ShowMissingFiles(TQueryResult *qr)
12494 TQueryResult *xqr = (qr) ? qr : GetQueryResult();
12496 Warning(
"ShowMissingFiles",
"no (last) query found: do nothing");
12501 TList *missing = (xqr->GetOutputList()) ? (TList *) xqr->GetOutputList()->FindObject(
"MissingFiles") : 0;
12503 Info(
"ShowMissingFiles",
"no files missing in query %s:%s", xqr->GetTitle(), xqr->GetName());
12507 Int_t nmf = 0, ncf = 0;
12508 Long64_t msz = 0, mszzip = 0, mev = 0;
12511 TIter nxf(missing);
12512 while ((fi = (TFileInfo *) nxf())) {
12514 if (fi->TestBit(TFileInfo::kCorrupted)) {
12520 TFileInfoMeta *im = fi->GetMetaData();
12522 if (im->GetTotBytes() > 0) msz += im->GetTotBytes();
12523 if (im->GetZipBytes() > 0) mszzip += im->GetZipBytes();
12524 mev += im->GetEntries();
12525 Printf(
" %d. (%c) %s %s %lld", ncf+nmf, status, fi->GetCurrentUrl()->GetUrl(), im->GetName(), im->GetEntries());
12527 Printf(
" %d. (%c) %s '' -1", ncf+nmf, status, fi->GetCurrentUrl()->GetUrl());
12532 if (msz <= 0) msz = -1;
12533 if (mszzip <= 0) mszzip = -1;
12534 Double_t xf = (Double_t)mev / (mev + xqr->GetEntries()) ;
12535 if (msz > 0. || mszzip > 0.) {
12536 Printf(
" +++ %d file(s) missing, %d corrupted, i.e. %lld unprocessed events -->"
12537 " about %.2f%% of the total (%lld bytes, %lld zipped)",
12538 nmf, ncf, mev, xf * 100., msz, mszzip);
12540 Printf(
" +++ %d file(s) missing, %d corrupted, i.e. %lld unprocessed events -->"
12541 " about %.2f%% of the total", nmf, ncf, mev, xf * 100.);
12551 TFileCollection *TProof::GetMissingFiles(TQueryResult *qr)
12553 TFileCollection *fc = 0;
12555 TQueryResult *xqr = (qr) ? qr : GetQueryResult();
12557 Warning(
"GetMissingFiles",
"no (last) query found: do nothing");
12562 TList *missing = (xqr->GetOutputList()) ? (TList *) xqr->GetOutputList()->FindObject(
"MissingFiles") : 0;
12565 Info(
"ShowMissingFiles",
"no files missing in query %s:%s", xqr->GetTitle(), xqr->GetName());
12570 TString fcname(
"unknown");
12571 TDSet *ds = (TDSet *) xqr->GetInputObject(
"TDSet");
12573 fcname.Form(
"%s.m0", ds->GetName());
12575 while (gDirectory->FindObject(fcname) && j < 1000)
12576 fcname.Form(
"%s.m%d", ds->GetName(), j++);
12578 fc =
new TFileCollection(fcname,
"Missing Files");
12579 if (ds) fc->SetDefaultTreeName(ds->GetObjName());
12582 TIter nxf(missing);
12583 while ((fi = (TFileInfo *) nxf())) {
12584 fc->Add((TFileInfo *) fi->Clone());
12594 void TProof::SetPerfTree(
const char *pf, Bool_t withWrks)
12596 if (pf && strlen(pf) > 0) {
12598 SetParameter(
"PROOF_StatsHist",
"");
12599 SetParameter(
"PROOF_StatsTrace",
"");
12600 if (withWrks) SetParameter(
"PROOF_SlaveStatsTrace",
"");
12601 Info(
"SetPerfTree",
"saving of the performance tree enabled (%s)", fPerfTree.Data());
12604 DeleteParameters(
"PROOF_StatsHist");
12605 DeleteParameters(
"PROOF_StatsTrace");
12606 DeleteParameters(
"PROOF_SlaveStatsTrace");
12607 Info(
"SetPerfTree",
"saving of the performance tree disabled");
12616 Int_t TProof::SavePerfTree(
const char *pf,
const char *ref)
12619 Error(
"SafePerfTree",
"this TProof instance is invalid!");
12623 TList *outls = GetOutputList();
12625 if (ref && strlen(ref) > 0) {
12627 Error(
"SafePerfTree",
"requested to use query '%s' but player instance undefined!", ref);
12630 TQueryResult *qr = fPlayer->GetQueryResult(ref);
12632 Error(
"SafePerfTree",
"TQueryResult instance for query '%s' could not be retrieved", ref);
12635 outls = qr->GetOutputList();
12636 sref.Form(
" for requested query '%s'", ref);
12638 if (!outls || (outls && outls->GetSize() <= 0)) {
12639 Error(
"SafePerfTree",
"outputlist%s undefined or empty", sref.Data());
12643 TString fn = fPerfTree;
12644 if (pf && strlen(pf)) fn = pf;
12645 if (fn.IsNull()) fn =
"perftree.root";
12647 TFile f(fn,
"RECREATE");
12648 if (f.IsZombie()) {
12649 Error(
"SavePerfTree",
"could not open file '%s' for writing", fn.Data());
12654 while ((obj = nxo())) {
12655 TString objname(obj->GetName());
12656 if (objname.BeginsWith(
"PROOF_")) {
12659 if (objname ==
"PROOF_PerfStats" ||
12660 objname ==
"PROOF_PacketsHist" ||
12661 objname ==
"PROOF_EventsHist" ||
12662 objname ==
"PROOF_NodeHist" ||
12663 objname ==
"PROOF_LatencyHist" ||
12664 objname ==
"PROOF_ProcTimeHist" ||
12665 objname ==
"PROOF_CpuTimeHist")
12671 Info(
"SavePerfTree",
"performance information%s saved in %s ...", sref.Data(), fn.Data());