80 #define kPEX_STOPPED 1001
81 #define kPEX_ABORTED 1002
85 static Bool_t gAbort = kFALSE;
87 class TAutoBinVal :
public TNamed {
89 Double_t fXmin, fXmax, fYmin, fYmax, fZmin, fZmax;
92 TAutoBinVal(
const char *name, Double_t xmin, Double_t xmax, Double_t ymin,
93 Double_t ymax, Double_t zmin, Double_t zmax) : TNamed(name,
"")
95 fXmin = xmin; fXmax = xmax;
96 fYmin = ymin; fYmax = ymax;
97 fZmin = zmin; fZmax = zmax;
99 void GetAll(Double_t& xmin, Double_t& xmax, Double_t& ymin,
100 Double_t& ymax, Double_t& zmin, Double_t& zmax)
102 xmin = fXmin; xmax = fXmax;
103 ymin = fYmin; ymax = fYmax;
104 zmin = fZmin; zmax = fZmax;
113 class TDispatchTimer :
public TTimer {
115 TProofPlayer *fPlayer;
118 TDispatchTimer(TProofPlayer *p) : TTimer(1000, kFALSE), fPlayer(p) { }
127 Bool_t TDispatchTimer::Notify()
129 if (gDebug > 0) printf(
"TDispatchTimer::Notify: called!\n");
131 fPlayer->SetBit(TProofPlayer::kDispatchOneEvent);
142 class TProctimeTimer :
public TTimer {
144 TProofPlayer *fPlayer;
147 TProctimeTimer(TProofPlayer *p, Long_t to) : TTimer(to, kFALSE), fPlayer(p) { }
156 Bool_t TProctimeTimer::Notify()
158 if (gDebug > 0) printf(
"TProctimeTimer::Notify: called!\n");
160 fPlayer->SetBit(TProofPlayer::kMaxProcTimeReached);
170 class TStopTimer :
public TTimer {
173 TProofPlayer *fPlayer;
176 TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to);
187 TStopTimer::TStopTimer(TProofPlayer *p, Bool_t abort, Int_t to)
188 : TTimer(((to <= 0 || to > 864000) ? 10 : to * 1000), kFALSE)
191 Info (
"TStopTimer",
"enter: %d, timeout: %d", abort, to);
197 Info (
"TStopTimer",
"timeout set to %s ms", fTime.AsString());
206 Bool_t TStopTimer::Notify()
208 if (gDebug > 0) printf(
"TStopTimer::Notify: called!\n");
220 ClassImp(TProofPlayer);
222 THashList *TProofPlayer::fgDrawInputPars = 0;
227 TProofPlayer::TProofPlayer(TProof *)
228 : fAutoBins(0), fOutput(0), fSelector(0), fCreateSelObj(kTRUE), fSelectorClass(0),
229 fFeedbackTimer(0), fFeedbackPeriod(2000),
230 fEvIter(0), fSelStatus(0),
231 fTotalEvents(0), fReadBytesRun(0), fReadCallsRun(0), fProcessedRun(0),
232 fQueryResults(0), fQuery(0), fPreviousQuery(0), fDrawQueries(0),
233 fMaxDrawQueries(1), fStopTimer(0), fDispatchTimer(0),
234 fProcTimeTimer(0), fProcTime(0),
236 fSaveMemThreshold(-1), fSavePartialResults(kFALSE), fSaveResultsPerPacket(kFALSE)
239 fExitStatus = kFinished;
240 fProgressStatus =
new TProofProgressStatus();
241 ResetBit(TProofPlayer::kDispatchOneEvent);
242 ResetBit(TProofPlayer::kIsProcessing);
243 ResetBit(TProofPlayer::kMaxProcTimeReached);
244 ResetBit(TProofPlayer::kMaxProcTimeExtended);
246 static Bool_t initLimitsFinder = kFALSE;
247 if (!initLimitsFinder && gProofServ && !gProofServ->IsMaster()) {
248 THLimitsFinder::SetLimitsFinder(
new TProofLimitsFinder);
249 initLimitsFinder = kTRUE;
257 TProofPlayer::~TProofPlayer()
259 fInput->Clear(
"nodelete");
262 SafeDelete(fSelector);
263 SafeDelete(fFeedbackTimer);
265 SafeDelete(fQueryResults);
266 SafeDelete(fDispatchTimer);
267 SafeDelete(fProcTimeTimer);
268 SafeDelete(fProcTime);
269 SafeDelete(fStopTimer);
275 void TProofPlayer::SetProcessing(Bool_t on)
278 SetBit(TProofPlayer::kIsProcessing);
280 ResetBit(TProofPlayer::kIsProcessing);
288 void TProofPlayer::StopProcess(Bool_t abort, Int_t timeout)
291 Info (
"StopProcess",
"abort: %d, timeout: %d", abort, timeout);
294 fEvIter->StopProcess(abort);
296 if (abort == kTRUE) {
297 fExitStatus = kAborted;
299 fExitStatus = kStopped;
304 SetStopTimer(kTRUE, abort, to);
310 void TProofPlayer::SetDispatchTimer(Bool_t on)
312 SafeDelete(fDispatchTimer);
313 ResetBit(TProofPlayer::kDispatchOneEvent);
315 fDispatchTimer =
new TDispatchTimer(
this);
316 fDispatchTimer->Start();
324 void TProofPlayer::SetStopTimer(Bool_t on, Bool_t abort, Int_t timeout)
326 std::lock_guard<std::mutex> lock(fStopTimerMtx);
329 SafeDelete(fStopTimer);
332 fStopTimer =
new TStopTimer(
this, abort, timeout);
336 Info (
"SetStopTimer",
"%s timer STARTED (timeout: %d)",
337 (abort ?
"ABORT" :
"STOP"), timeout);
340 Info (
"SetStopTimer",
"timer STOPPED");
348 void TProofPlayer::AddQueryResult(TQueryResult *q)
351 Warning(
"AddQueryResult",
"query undefined - do nothing");
356 if (!(q->IsDraw())) {
357 if (!fQueryResults) {
358 fQueryResults =
new TList;
359 fQueryResults->Add(q);
361 TIter nxr(fQueryResults);
362 TQueryResult *qr = 0;
363 TQueryResult *qp = 0;
364 while ((qr = (TQueryResult *) nxr())) {
367 fQueryResults->Remove(qr);
372 if (qr->GetStartTime().Convert() <= q->GetStartTime().Convert())
377 fQueryResults->AddFirst(q);
379 fQueryResults->AddAfter(qp, q);
382 }
else if (IsClient()) {
384 if (fDrawQueries == fMaxDrawQueries && fMaxDrawQueries > 0) {
385 TIter nxr(fQueryResults);
386 TQueryResult *qr = 0;
387 while ((qr = (TQueryResult *) nxr())) {
391 fQueryResults->Remove(qr);
398 if (fDrawQueries >= 0 && fDrawQueries < fMaxDrawQueries) {
401 fQueryResults =
new TList;
402 fQueryResults->Add(q);
411 void TProofPlayer::RemoveQueryResult(
const char *ref)
414 TIter nxq(fQueryResults);
415 TQueryResult *qr = 0;
416 while ((qr = (TQueryResult *) nxq())) {
417 if (qr->Matches(ref)) {
418 fQueryResults->Remove(qr);
429 TQueryResult *TProofPlayer::GetQueryResult(
const char *ref)
432 if (ref && strlen(ref) > 0) {
433 TIter nxq(fQueryResults);
434 TQueryResult *qr = 0;
435 while ((qr = (TQueryResult *) nxq())) {
436 if (qr->Matches(ref))
441 return (TQueryResult *) fQueryResults->Last();
446 return (TQueryResult *)0;
452 void TProofPlayer::SetCurrentQuery(TQueryResult *q)
454 fPreviousQuery = fQuery;
461 void TProofPlayer::AddInput(TObject *inp)
469 void TProofPlayer::ClearInput()
477 TObject *TProofPlayer::GetOutput(
const char *name)
const
480 return fOutput->FindObject(name);
487 TList *TProofPlayer::GetOutputList()
const
491 ol = fQuery->GetOutputList();
500 Int_t TProofPlayer::ReinitSelector(TQueryResult *qr)
506 Info(
"ReinitSelector",
"query undefined - do nothing");
511 TString selec = qr->GetSelecImp()->GetName();
512 if (selec.Length() <= 0) {
513 Info(
"ReinitSelector",
"selector name undefined - do nothing");
518 Bool_t stdselec = TSelector::IsStandardDraw(selec);
522 Bool_t compselec = (selec.Contains(
".") || stdselec) ? kFALSE : kTRUE;
526 if (!stdselec && !compselec) {
528 Bool_t expandselec = kTRUE;
530 char *selc = gSystem->Which(TROOT::GetMacroPath(), selec, kReadPermission);
533 TMD5 *md5icur = 0, *md5iold = 0, *md5hcur = 0, *md5hold = 0;
535 md5icur = TMD5::FileChecksum(selc);
536 md5iold = qr->GetSelecImp()->Checksum();
539 Int_t dot = selh.Last(
'.');
540 if (dot != kNPOS) selh.Remove(dot);
542 if (!gSystem->AccessPathName(selh, kReadPermission))
543 md5hcur = TMD5::FileChecksum(selh);
544 md5hold = qr->GetSelecHdr()->Checksum();
547 if (md5hcur && md5hold && md5icur && md5iold)
548 if (*md5hcur == *md5hold && *md5icur == *md5iold)
549 expandselec = kFALSE;
555 if (selc)
delete [] selc;
565 dir = Form(
"%s/%s",gSystem->TempDirectory(),u.AsString());
566 if (!(gSystem->MakeDirectory(dir))) {
569 selec = Form(
"%s/%s",dir.Data(),selec.Data());
570 qr->GetSelecImp()->SaveSource(selec);
573 TString seleh = Form(
"%s/%s",dir.Data(),qr->GetSelecHdr()->GetName());
574 qr->GetSelecHdr()->SaveSource(seleh);
577 ipathold = gSystem->GetIncludePath();
578 ipath = Form(
"-I%s %s", dir.Data(), gSystem->GetIncludePath());
579 gSystem->SetIncludePath(ipath.Data());
584 TString opt(qr->GetOptions());
585 Ssiz_t
id = opt.Last(
'#');
586 if (
id != kNPOS &&
id < opt.Length() - 1)
587 selec += opt(
id + 1, opt.Length());
590 Info(
"ReinitSelector",
"problems locating or exporting selector files");
596 SafeDelete(fSelector);
600 Int_t iglevelsave = gErrorIgnoreLevel;
603 gErrorIgnoreLevel = kBreak;
605 if ((fSelector = TSelector::GetSelector(selec))) {
607 gErrorIgnoreLevel = iglevelsave;
608 fSelectorClass = fSelector->IsA();
609 fSelector->SetOption(qr->GetOptions());
613 gErrorIgnoreLevel = iglevelsave;
615 if (strlen(qr->GetLibList()) > 0) {
616 TString sl(qr->GetLibList());
617 TObjArray *oa = sl.Tokenize(
" ");
619 Bool_t retry = kFALSE;
622 while ((os = (TObjString *) nxl())) {
623 TString lib = gSystem->BaseName(os->GetName());
625 lib.ReplaceAll(
"-l",
"lib");
626 if (gSystem->Load(lib) == 0)
632 fSelector = TSelector::GetSelector(selec);
638 Info(
"ReinitSelector",
"compiled selector re-init failed:"
639 " automatic reload unsuccessful:"
640 " please load manually the correct library");
646 fSelector->SetInputList(qr->GetInputList());
648 ((TProofDraw *)fSelector)->DefVar();
656 if (ipathold.Length() > 0)
657 gSystem->SetIncludePath(ipathold.Data());
665 Int_t TProofPlayer::AddOutputObject(TObject *)
667 MayNotUse(
"AddOutputObject");
674 void TProofPlayer::AddOutput(TList *)
676 MayNotUse(
"AddOutput");
682 void TProofPlayer::StoreOutput(TList *)
684 MayNotUse(
"StoreOutput");
690 void TProofPlayer::StoreFeedback(TObject *, TList *)
692 MayNotUse(
"StoreFeedback");
698 void TProofPlayer::Progress(Long64_t , Long64_t )
700 MayNotUse(
"Progress");
706 void TProofPlayer::Progress(Long64_t , Long64_t ,
711 MayNotUse(
"Progress");
717 void TProofPlayer::Progress(TProofProgressInfo * )
719 MayNotUse(
"Progress");
725 void TProofPlayer::Feedback(TList *)
727 MayNotUse(
"Feedback");
734 TDrawFeedback *TProofPlayer::CreateDrawFeedback(TProof *p)
736 return new TDrawFeedback(p);
742 void TProofPlayer::SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt)
751 void TProofPlayer::DeleteDrawFeedback(TDrawFeedback *f)
781 Int_t TProofPlayer::SavePartialResults(Bool_t queryend, Bool_t force)
783 Bool_t save = (force || (fSavePartialResults &&
784 (queryend || fSaveResultsPerPacket))) ? kTRUE : kFALSE;
787 Info("SavePartialResults", "partial result saving disabled");
793 Error(
"SavePartialResults",
"gProofServ undefined: something really wrong going on!!!");
797 Error(
"SavePartialResults",
"fOutput undefined: something really wrong going on!!!");
802 Info("SavePartialResults", "start saving partial results {%d,%d,%d,%d}
",
803 queryend, force, fSavePartialResults, fSaveResultsPerPacket);
805 // Get list of processed packets from the iterator
806 PDB(kOutput, 2) Info("SavePartialResults
", "fEvIter: %p
", fEvIter);
808 TList *packets = (fEvIter) ? fEvIter->GetPackets() : 0;
809 PDB(kOutput, 2) Info("SavePartialResults
", "list of packets: %p, sz: %d
",
810 packets, (packets ? packets->GetSize(): -1));
813 const char *oopt = "UPDATE
";
814 // Check if the file has already been defined
815 TString baseName(fOutputFilePath);
816 if (fOutputFilePath.IsNull()) {
817 baseName.Form("output-%s.q%d.root
", gProofServ->GetTopSessionTag(), gProofServ->GetQuerySeqNum());
818 if (gProofServ->GetDataDirOpts() && strlen(gProofServ->GetDataDirOpts()) > 0) {
819 fOutputFilePath.Form("%s/%s?%s
", gProofServ->GetDataDir(), baseName.Data(),
820 gProofServ->GetDataDirOpts());
822 fOutputFilePath.Form("%s/%s
", gProofServ->GetDataDir(), baseName.Data());
824 Info("SavePartialResults
", "file with (partial) output: '%s'", fOutputFilePath.Data());
828 if (!(fOutputFile = TFile::Open(fOutputFilePath, oopt)) ||
829 (fOutputFile && fOutputFile->IsZombie())) {
830 Error(
"SavePartialResults",
"cannot open '%s' for writing", fOutputFilePath.Data());
831 SafeDelete(fOutputFile);
836 TDirectory *curdir = gDirectory;
841 TDirectory *packetsDir = fOutputFile->mkdir(
"packets");
842 if (packetsDir) packetsDir->cd();
843 packets->Write(0, TObject::kSingleKey | TObject::kOverwrite);
847 Bool_t notempty = kFALSE;
852 while ((o = nxo())) {
854 if (o->InheritsFrom(TProofOutputFile::Class()))
continue;
856 if (!strncmp(o->GetName(),
"PROOF_", 6))
continue;
858 if (o->InheritsFrom(TOutputListSelectorDataMap::Class()))
continue;
860 if (!strcmp(o->GetName(),
"MissingFiles"))
continue;
862 if (o->InheritsFrom(
"TTree")) {
863 TTree *t = (TTree *) o;
864 TDirectory *d = t->GetDirectory();
866 if (!d || (d && !d->InheritsFrom(
"TFile"))) {
868 t->SetDirectory(fOutputFile);
870 if (t->GetDirectory() == fOutputFile) {
873 o->Write(0, TObject::kOverwrite);
885 }
else if (queryend || fSaveResultsPerPacket) {
887 o->Write(0, TObject::kOverwrite);
891 if (queryend) torm.Add(o);
900 if (!fOutput->FindObject(baseName)) {
901 TProofOutputFile *po = 0;
903 TNamed *nm = (TNamed *) fInput->FindObject(
"PROOF_DefaultOutputOption");
904 TString oname = (nm) ? nm->GetTitle() : fOutputFilePath.Data();
905 if (nm && oname.BeginsWith(
"ds:")) {
906 oname.Replace(0, 3,
"");
908 TString::Format(
"%s_q%d", gProofServ->GetTopSessionTag(), gProofServ->GetQuerySeqNum());
909 oname.ReplaceAll(
"<qtag>", qtag);
911 po =
new TProofOutputFile(baseName,
"DRO", oname.Data());
913 Bool_t hasddir = kFALSE;
915 po =
new TProofOutputFile(baseName,
"M");
916 if (oname.BeginsWith(
"of:")) oname.Replace(0, 3,
"");
917 if (gProofServ->IsTopMaster()) {
918 if (!strcmp(TUrl(oname, kTRUE).GetProtocol(),
"file")) {
920 TProofServ::GetLocalServer(dsrv);
921 TProofServ::FilterLocalroot(oname, dsrv);
922 oname.Insert(0, dsrv);
927 oname.ReplaceAll(
"<file>", baseName);
931 oname.Form(
"<datadir>/%s", baseName.Data());
935 po->SetOutputFileName(oname.Data());
938 po->ResetBit(TProofOutputFile::kOutputFileNameSet);
939 po->SetName(gSystem->BaseName(oname.Data()));
941 po->AdoptFile(fOutputFile);
944 po->SetBit(TProofOutputFile::kSwapFile);
947 fOutputFile->Close();
948 SafeDelete(fOutputFile);
951 if (queryend && torm.GetSize() > 0) {
953 while ((o = nxrm())) { fOutput->Remove(o); }
955 torm.SetOwner(kFALSE);
958 Info("SavePartialResults", "partial results saved to file");
967 Int_t TProofPlayer::AssertSelector(const
char *selector_file)
969 if (selector_file && strlen(selector_file)) {
970 SafeDelete(fSelector);
973 TString ocwd = gSystem->WorkingDirectory();
975 gProofServ->GetCacheLock()->Lock();
976 gSystem->ChangeDirectory(gProofServ->GetCacheDir());
979 fSelector = TSelector::GetSelector(selector_file);
982 gSystem->ChangeDirectory(ocwd);
983 gProofServ->GetCacheLock()->Unlock();
987 Error(
"AssertSelector",
"cannot load: %s", selector_file );
991 fCreateSelObj = kTRUE;
992 Info(
"AssertSelector",
"Processing via filename (%s)", selector_file);
993 }
else if (!fSelector) {
994 Error(
"AssertSelector",
"no TSelector object define : cannot continue!");
997 Info(
"AssertSelector",
"Processing via TSelector object");
1005 void TProofPlayer::UpdateProgressInfo()
1007 if (fProgressStatus) {
1008 fProgressStatus->IncEntries(fProcessedRun);
1009 fProgressStatus->SetBytesRead(TFile::GetFileBytesRead()-fReadBytesRun);
1010 fProgressStatus->SetReadCalls(TFile::GetFileReadCalls()-fReadCallsRun);
1011 fProgressStatus->SetLastUpdate();
1012 if (gMonitoringWriter)
1013 gMonitoringWriter->SendProcessingProgress(fProgressStatus->GetEntries(),
1014 fReadBytesRun, kFALSE);
1024 Long64_t TProofPlayer::Process(TDSet *dset,
const char *selector_file,
1025 Option_t *option, Long64_t nentries,
1028 PDB(kGlobal,1) Info("Process","Enter");
1030 fExitStatus = kFinished;
1033 TCleanup clean(this);
1038 if (AssertSelector(selector_file) != 0 || !fSelector) {
1039 Error(
"Process",
"cannot assert the selector object");
1043 fSelectorClass = fSelector->IsA();
1044 Int_t version = fSelector->Version();
1045 if (version == 0 && IsClient()) fSelector->GetOutputList()->Clear();
1047 fOutput = (THashList *) fSelector->GetOutputList();
1050 TPerfStats::Start(fInput, fOutput);
1052 fSelStatus =
new TStatus;
1053 fOutput->Add(fSelStatus);
1055 fSelector->SetOption(option);
1056 fSelector->SetInputList(fInput);
1060 fTotalEvents = nentries;
1061 if (fTotalEvents < 0 && gProofServ &&
1062 gProofServ->IsMaster() && !gProofServ->IsParallel()) {
1065 TDSetElement *e = 0;
1066 while ((e = dset->Next())) {
1067 fTotalEvents += e->GetNum();
1074 Int_t useTreeCache = 1;
1075 if (TProof::GetParameter(fInput,
"PROOF_UseTreeCache", useTreeCache) == 0) {
1076 if (useTreeCache > -1 && useTreeCache < 2)
1077 gEnv->SetValue(
"ProofPlayer.UseTreeCache", useTreeCache);
1079 Long64_t cacheSize = -1;
1080 if (TProof::GetParameter(fInput,
"PROOF_CacheSize", cacheSize) == 0) {
1081 TString sz = TString::Format(
"%lld", cacheSize);
1082 gEnv->SetValue(
"ProofPlayer.CacheSize", sz.Data());
1085 Int_t useParallelUnzip = 0;
1086 if (TProof::GetParameter(fInput,
"PROOF_UseParallelUnzip", useParallelUnzip) == 0) {
1087 if (useParallelUnzip > -1 && useParallelUnzip < 2)
1088 gEnv->SetValue(
"ProofPlayer.UseParallelUnzip", useParallelUnzip);
1091 Int_t dontCacheFiles = 0;
1092 if (TProof::GetParameter(fInput,
"PROOF_DontCacheFiles", dontCacheFiles) == 0) {
1093 if (dontCacheFiles == 1)
1094 gEnv->SetValue(
"ProofPlayer.DontCacheFiles", 1);
1096 fEvIter = TEventIter::Create(dset, fSelector, first, nentries);
1105 if (TProof::GetParameter(fInput,
"PROOF_SavePartialResults", opt) != 0) {
1106 opt = gEnv->GetValue(
"ProofPlayer.SavePartialResults", 0);
1108 fSaveResultsPerPacket = (opt >= 10) ? kTRUE : kFALSE;
1109 fSavePartialResults = (opt%10 > 0) ? kTRUE : kFALSE;
1110 Info(
"Process",
"save partial results? %d per-packet? %d", fSavePartialResults, fSaveResultsPerPacket);
1113 Float_t memfrac = gEnv->GetValue(
"ProofPlayer.SaveMemThreshold", -1.);
1117 if (gSystem->GetSysInfo(&si) == 0) {
1118 fSaveMemThreshold = (Long_t) ((memfrac * si.fPhysRam * 1024.) / si.fCpus);
1119 Info(
"Process",
"memory threshold for saving objects to file set to %ld kB",
1122 Error(
"Process",
"cannot get SysInfo_t (!)");
1127 PDB(kLoop,1) Info("Process","Call Begin(0)");
1128 fSelector->Begin(0);
1132 PDB(kLoop,1) Info("Process","Call Begin(0)");
1133 fSelector->Begin(0);
1135 if (!fSelStatus->TestBit(TStatus::kNotOk)) {
1136 PDB(kLoop,1) Info("Process","Call SlaveBegin(0)");
1137 fSelector->SlaveBegin(0);
1143 ResetBit(TProofPlayer::kIsProcessing);
1144 Error(
"Process",
"exception %d caught", excode);
1145 gProofServ->GetCacheLock()->Unlock();
1150 if (SavePartialResults(kFALSE) < 0)
1151 Warning(
"Process",
"problems seetting up file-object swapping");
1156 if (gMonitoringWriter)
1157 gMonitoringWriter->SendProcessingStatus(
"STARTED",kTRUE);
1160 Info("Process","Looping over Process()");
1163 fReadBytesRun = TFile::GetFileBytesRead();
1164 fReadCallsRun = TFile::GetFileReadCalls();
1167 if (gMonitoringWriter)
1168 gMonitoringWriter->SendProcessingProgress(0,0,kTRUE);
1171 SetDispatchTimer(kTRUE);
1176 fProgressStatus->Reset();
1177 if (gProofServ) gProofServ->ResetBit(TProofServ::kHighMemory);
1183 Long64_t memlogfreq = -1;
1184 if (((mrc = TProof::GetParameter(fInput,
"PROOF_MemLogFreq", memlogfreq))) != 0) memlogfreq = -1;
1185 Long64_t singleshot = 1;
1186 Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1187 TString lastMsg(
"(unfortunately no detailed info is available about current packet)");
1190 if (!CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg)) {
1191 Error(
"Process",
"%s", wmsg.Data());
1192 wmsg.Insert(0, TString::Format(
"ERROR:%s, after SlaveBegin(), ", gProofServ ? gProofServ->GetOrdinal() :
"gProofServ is nullptr"));
1193 fSelStatus->Add(wmsg.Data());
1195 gProofServ->SendAsynMessage(wmsg.Data());
1196 gProofServ->SetBit(TProofServ::kHighMemory);
1198 fExitStatus = kStopped;
1199 ResetBit(TProofPlayer::kIsProcessing);
1200 }
else if (!wmsg.IsNull()) {
1201 Warning(
"Process",
"%s", wmsg.Data());
1204 TPair *currentElem = 0;
1206 Long64_t fst = -1, num;
1207 Long_t maxproctime = -1;
1208 Bool_t newrun = kFALSE;
1209 while ((fEvIter->GetNextPacket(fst, num) != -1) &&
1210 !fSelStatus->TestBit(TStatus::kNotOk) &&
1211 fSelector->GetAbort() == TSelector::kContinue) {
1214 SetBit(TProofPlayer::kIsProcessing);
1218 if (dset->Current()) {
1220 currentElem =
new TPair(
new TObjString(
"PROOF_CurrentElement"), dset->Current());
1221 fInput->Add(currentElem);
1223 if (currentElem->Value() != dset->Current()) {
1224 currentElem->SetValue(dset->Current());
1225 }
else if (dset->Current()->TestBit(TDSetElement::kNewRun)) {
1226 dset->Current()->ResetBit(TDSetElement::kNewRun);
1229 if (dset->Current()->TestBit(TDSetElement::kNewPacket)) {
1230 if (dset->TestBit(TDSet::kEmpty)) {
1231 lastMsg =
"check logs for possible stacktrace - last cycle:";
1233 TDSetElement *elem =
dynamic_cast<TDSetElement *
>(currentElem->Value());
1234 TString fn = (elem) ? elem->GetFileName() :
"<undef>";
1235 lastMsg.Form(
"while processing dset:'%s', file:'%s'"
1236 " - check logs for possible stacktrace - last event:", dset->GetName(), fn.Data());
1238 TProofServ::SetLastMsg(lastMsg);
1241 if (dset->Current()->GetMaxProcTime() >= 0.)
1242 maxproctime = (Long_t) (1000 * dset->Current()->GetMaxProcTime());
1243 newrun = (dset->Current()->TestBit(TDSetElement::kNewPacket)) ? kTRUE : kFALSE;
1246 ResetBit(TProofPlayer::kMaxProcTimeReached);
1247 ResetBit(TProofPlayer::kMaxProcTimeExtended);
1249 if (maxproctime > 0) {
1250 if (!fProcTimeTimer) fProcTimeTimer =
new TProctimeTimer(
this, maxproctime);
1251 fProcTimeTimer->Start(maxproctime, kTRUE);
1252 if (!fProcTime) fProcTime =
new TStopwatch();
1255 Long64_t refnum = num;
1256 if (refnum < 0 && maxproctime <= 0) {
1257 wmsg.Form(
"neither entries nor max proc time specified:"
1258 " risk of infinite loop: processing aborted");
1259 Error(
"Process",
"%s", wmsg.Data());
1261 wmsg.Insert(0, TString::Format(
"ERROR:%s, entry:%lld, ",
1262 gProofServ->GetOrdinal(), fProcessedRun));
1263 gProofServ->SendAsynMessage(wmsg.Data());
1265 fExitStatus = kAborted;
1266 ResetBit(TProofPlayer::kIsProcessing);
1269 while (refnum < 0 || num--) {
1272 if (TestBit(TProofPlayer::kMaxProcTimeReached)) {
1274 if (!newrun && !TestBit(TProofPlayer::kMaxProcTimeExtended) && refnum > 0) {
1276 Float_t xleft = (refnum > num) ? (Float_t) num / (Float_t) (refnum) : 1.;
1279 Long_t mpt = (Long_t) (1500 * num / ((Double_t)(refnum - num) / fProcTime->RealTime()));
1280 SetBit(TProofPlayer::kMaxProcTimeExtended);
1281 fProcTimeTimer->Start(mpt, kTRUE);
1282 ResetBit(TProofPlayer::kMaxProcTimeReached);
1285 if (TestBit(TProofPlayer::kMaxProcTimeReached)) {
1286 Info(
"Process",
"max proc time reached (%ld msecs): packet processing stopped:\n%s",
1287 maxproctime, lastMsg.Data());
1293 if (!(!fSelStatus->TestBit(TStatus::kNotOk) &&
1294 fSelector->GetAbort() == TSelector::kContinue))
break;
1297 entry = fEvIter->GetEntryNumber(fst);
1301 TProofServ::SetLastEntry(entry);
1303 if (fSelector->Version() == 0) {
1305 Info("Process","Call ProcessCut(%lld)", entry);
1306 if (fSelector->ProcessCut(entry)) {
1308 Info("Process","Call ProcessFill(%lld)", entry);
1309 fSelector->ProcessFill(entry);
1313 Info("Process","Call Process(%lld)", entry);
1314 fSelector->Process(entry);
1315 if (fSelector->GetAbort() == TSelector::kAbortProcess) {
1316 ResetBit(TProofPlayer::kIsProcessing);
1318 }
else if (fSelector->GetAbort() == TSelector::kAbortFile) {
1319 Info(
"Process",
"packet processing aborted following the"
1320 " selector settings:\n%s", lastMsg.Data());
1321 fEvIter->InvalidatePacket();
1322 fProgressStatus->SetBit(TProofProgressStatus::kFileCorrupted);
1325 if (!fSelStatus->TestBit(TStatus::kNotOk)) fProcessedRun++;
1328 if (memlogfreq > 0 && (GetEventsProcessed() + fProcessedRun)%memlogfreq == 0) {
1329 if (!CheckMemUsage(memlogfreq, warnHWMres, warnHWMvir, wmsg)) {
1330 Error(
"Process",
"%s", wmsg.Data());
1332 wmsg.Insert(0, TString::Format(
"ERROR:%s, entry:%lld, ",
1333 gProofServ->GetOrdinal(), entry));
1334 gProofServ->SendAsynMessage(wmsg.Data());
1336 fExitStatus = kStopped;
1337 ResetBit(TProofPlayer::kIsProcessing);
1338 if (gProofServ) gProofServ->SetBit(TProofServ::kHighMemory);
1341 if (!wmsg.IsNull()) {
1342 Warning(
"Process",
"%s", wmsg.Data());
1344 wmsg.Insert(0, TString::Format(
"WARNING:%s, entry:%lld, ",
1345 gProofServ->GetOrdinal(), entry));
1346 gProofServ->SendAsynMessage(wmsg.Data());
1351 if (TestBit(TProofPlayer::kDispatchOneEvent)) {
1352 gSystem->DispatchOneEvent(kTRUE);
1353 ResetBit(TProofPlayer::kDispatchOneEvent);
1355 ResetBit(TProofPlayer::kIsProcessing);
1356 if (fSelStatus->TestBit(TStatus::kNotOk) || gROOT->IsInterrupted())
break;
1359 if (fSelector->GetAbort() == TSelector::kAbortFile)
1360 fSelector->Abort(
"status reset", TSelector::kContinue);
1365 if (excode == kPEX_STOPPED) {
1366 Info(
"Process",
"received stop-process signal");
1367 fExitStatus = kStopped;
1368 }
else if (excode == kPEX_ABORTED) {
1370 Info(
"Process",
"received abort-process signal");
1371 fExitStatus = kAborted;
1373 Error(
"Process",
"exception %d caught", excode);
1376 fExitStatus = kAborted;
1378 ResetBit(TProofPlayer::kIsProcessing);
1382 TPair *currentElem = 0;
1383 if ((currentElem = (TPair *) fInput->FindObject(
"PROOF_CurrentElement"))) {
1384 if ((currentElem = (TPair *) fInput->Remove(currentElem))) {
1385 delete currentElem->Key();
1391 Long64_t singleshot = 1;
1392 Bool_t warnHWMres = kTRUE, warnHWMvir = kTRUE;
1393 Bool_t shrc = CheckMemUsage(singleshot, warnHWMres, warnHWMvir, wmsg);
1394 if (!wmsg.IsNull()) Warning(
"Process",
"%s (%s)", wmsg.Data(), shrc ?
"warn" :
"hwm");
1397 Info("Process","%lld events processed", fProgressStatus->GetEntries());
1399 if (gMonitoringWriter) {
1400 gMonitoringWriter->SendProcessingProgress(fProgressStatus->GetEntries(),
1401 TFile::GetFileBytesRead()-fReadBytesRun, kFALSE);
1402 gMonitoringWriter->SendProcessingStatus(
"DONE");
1406 SetDispatchTimer(kFALSE);
1407 if (fStopTimer != 0)
1408 SetStopTimer(kFALSE, gAbort);
1409 if (fFeedbackTimer != 0)
1415 if (SavePartialResults(kTRUE) < 0)
1416 Warning(
"Process",
"problems saving the results to file");
1418 SafeDelete(fEvIter);
1422 if (fExitStatus != kAborted) {
1424 TIter nxo(GetOutputList());
1426 while ((o = nxo())) {
1428 if (o->IsA() == TProofOutputFile::Class()) {
1429 TProofOutputFile *of = (TProofOutputFile *)o;
1431 of->SetWorkerOrdinal(gProofServ->GetOrdinal());
1432 const char *dir = of->GetDir();
1433 if (!dir || (dir && strlen(dir) <= 0)) {
1434 of->SetDir(gProofServ->GetSessionDir());
1435 }
else if (dir && strlen(dir) > 0) {
1437 if (!strcmp(u.GetHost(),
"localhost") || !strcmp(u.GetHost(),
"127.0.0.1") ||
1438 !strcmp(u.GetHost(),
"localhost.localdomain")) {
1439 u.SetHost(TUrl(gSystem->HostName()).GetHostFQDN());
1440 of->SetDir(u.GetUrl(kTRUE));
1447 MapOutputListToDataMembers();
1449 if (!fSelStatus->TestBit(TStatus::kNotOk)) {
1450 if (fSelector->Version() == 0) {
1451 PDB(kLoop,1) Info("Process","Call Terminate()");
1452 fSelector->Terminate();
1454 PDB(kLoop,1) Info("Process","Call SlaveTerminate()");
1455 fSelector->SlaveTerminate();
1456 if (IsClient() && !fSelStatus->TestBit(TStatus::kNotOk)) {
1457 PDB(kLoop,1) Info("Process","Call Terminate()");
1458 fSelector->Terminate();
1465 fOutput->Add(new TParameter<Long64_t>("PROOF_SelectorStatus", (Long64_t) fSelector->GetStatus()));
1467 if (gProofServ && !gProofServ->IsParallel()) {
1468 TIter nxc(gROOT->GetListOfCanvases());
1469 while (TObject *c = nxc())
1485 Long64_t TProofPlayer::Process(TDSet *dset, TSelector *selector,
1486 Option_t *option, Long64_t nentries,
1490 Error(
"Process",
"selector object undefiend!");
1494 SafeDelete(fSelector);
1495 fSelector = selector;
1496 fCreateSelObj = kFALSE;
1497 return Process(dset, (
const char *)0, option, nentries, first);
1503 Bool_t TProofPlayer::JoinProcess(TList *)
1513 Bool_t TProofPlayer::CheckMemUsage(Long64_t &mfreq, Bool_t &w80r,
1514 Bool_t &w80v, TString &wmsg)
1516 Long64_t processed = GetEventsProcessed() + fProcessedRun;
1517 if (mfreq > 0 && processed%mfreq == 0) {
1520 if (!gSystem->GetProcInfo(&pi)){
1523 Info(
"CheckMemUsage|Svc",
"Memory %ld virtual %ld resident event %lld",
1524 pi.fMemVirtual, pi.fMemResident, processed);
1526 fSelStatus->SetMemValues(pi.fMemVirtual, pi.fMemResident);
1528 if (TProofServ::GetVirtMemMax() > 0) {
1529 if (pi.fMemVirtual > TProofServ::GetMemStop() * TProofServ::GetVirtMemMax()) {
1530 wmsg.Form(
"using more than %d%% of allowed virtual memory (%ld kB)"
1531 " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemVirtual);
1533 }
else if (pi.fMemVirtual > TProofServ::GetMemHWM() * TProofServ::GetVirtMemMax() && w80v) {
1536 wmsg.Form(
"using more than %d%% of allowed virtual memory (%ld kB)",
1537 (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual);
1542 if (TProofServ::GetResMemMax() > 0) {
1543 if (pi.fMemResident > TProofServ::GetMemStop() * TProofServ::GetResMemMax()) {
1544 wmsg.Form(
"using more than %d%% of allowed resident memory (%ld kB)"
1545 " - STOP processing", (Int_t) (TProofServ::GetMemStop() * 100), pi.fMemResident);
1547 }
else if (pi.fMemResident > TProofServ::GetMemHWM() * TProofServ::GetResMemMax() && w80r) {
1550 if (wmsg.Length() > 0) {
1551 wmsg.Form(
"using more than %d%% of allowed both virtual and resident memory ({%ld,%ld} kB)",
1552 (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemVirtual, pi.fMemResident);
1554 wmsg.Form(
"using more than %d%% of allowed resident memory (%ld kB)",
1555 (Int_t) (TProofServ::GetMemHWM() * 100), pi.fMemResident);
1562 if (fSaveMemThreshold > 0 && pi.fMemResident >= fSaveMemThreshold) fSavePartialResults = kTRUE;
1572 Long64_t TProofPlayer::Finalize(Bool_t, Bool_t)
1574 MayNotUse(
"Finalize");
1581 Long64_t TProofPlayer::Finalize(TQueryResult *)
1583 MayNotUse(
"Finalize");
1589 void TProofPlayer::MergeOutput(Bool_t)
1591 MayNotUse(
"MergeOutput");
1597 void TProofPlayer::MapOutputListToDataMembers()
const
1599 TOutputListSelectorDataMap* olsdm =
new TOutputListSelectorDataMap(fSelector);
1600 fOutput->Add(olsdm);
1606 void TProofPlayer::UpdateAutoBin(
const char *name,
1607 Double_t& xmin, Double_t& xmax,
1608 Double_t& ymin, Double_t& ymax,
1609 Double_t& zmin, Double_t& zmax)
1611 if ( fAutoBins == 0 ) {
1612 fAutoBins =
new THashList;
1615 TAutoBinVal *val = (TAutoBinVal*) fAutoBins->FindObject(name);
1619 if (gProofServ && !gProofServ->IsTopMaster()) {
1621 TProofLimitsFinder::AutoBinFunc(key,xmin,xmax,ymin,ymax,zmin,zmax);
1624 val =
new TAutoBinVal(name,xmin,xmax,ymin,ymax,zmin,zmax);
1625 fAutoBins->Add(val);
1627 val->GetAll(xmin,xmax,ymin,ymax,zmin,zmax);
1634 TDSetElement *TProofPlayer::GetNextPacket(TSlave *, TMessage *)
1636 MayNotUse(
"GetNextPacket");
1643 void TProofPlayer::SetupFeedback()
1645 MayNotUse(
"SetupFeedback");
1651 void TProofPlayer::StopFeedback()
1653 MayNotUse(
"StopFeedback");
1659 Long64_t TProofPlayer::DrawSelect(TDSet * ,
const char * ,
1660 const char * , Option_t * ,
1661 Long64_t , Long64_t )
1663 MayNotUse(
"DrawSelect");
1670 void TProofPlayer::HandleGetTreeHeader(TMessage *)
1672 MayNotUse(
"HandleGetTreeHeader|");
1678 void TProofPlayer::HandleRecvHisto(TMessage *mess)
1680 TObject *obj = mess->ReadObject(mess->GetClass());
1681 if (obj->InheritsFrom(TH1::Class())) {
1684 TH1 *horg = (TH1*)gDirectory->GetList()->FindObject(h->GetName());
1688 h->SetDirectory(gDirectory);
1697 Int_t TProofPlayer::DrawCanvas(TObject *obj)
1699 static Int_t (*gDrawCanvasHook)(TObject *) = 0;
1702 if (!gDrawCanvasHook) {
1704 TString drawlib =
"libProofDraw";
1706 if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1708 if (gSystem->Load(drawlib) != -1) {
1711 if ((f = gSystem->DynFindSymbol(drawlib,
"DrawCanvas")))
1712 gDrawCanvasHook = (Int_t (*)(TObject *))(f);
1714 Warning(
"DrawCanvas",
"can't find DrawCanvas");
1716 Warning(
"DrawCanvas",
"can't load %s", drawlib.Data());
1718 Warning(
"DrawCanvas",
"can't locate %s", drawlib.Data());
1720 if (gDrawCanvasHook && obj)
1721 return (*gDrawCanvasHook)(obj);
1731 Int_t TProofPlayer::GetDrawArgs(
const char *var,
const char *sel, Option_t *opt,
1732 TString &selector, TString &objname)
1734 static Int_t (*gGetDrawArgsHook)(
const char *,
const char *, Option_t *,
1735 TString &, TString &) = 0;
1738 if (!gGetDrawArgsHook) {
1740 TString drawlib =
"libProofDraw";
1742 if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1744 if (gSystem->Load(drawlib) != -1) {
1747 if ((f = gSystem->DynFindSymbol(drawlib,
"GetDrawArgs")))
1748 gGetDrawArgsHook = (Int_t (*)(
const char *,
const char *, Option_t *,
1749 TString &, TString &))(f);
1751 Warning(
"GetDrawArgs",
"can't find GetDrawArgs");
1753 Warning(
"GetDrawArgs",
"can't load %s", drawlib.Data());
1755 Warning(
"GetDrawArgs",
"can't locate %s", drawlib.Data());
1757 if (gGetDrawArgsHook)
1758 return (*gGetDrawArgsHook)(var, sel, opt, selector, objname);
1766 void TProofPlayer::FeedBackCanvas(
const char *name, Bool_t create)
1768 static void (*gFeedBackCanvasHook)(
const char *, Bool_t) = 0;
1771 if (!gFeedBackCanvasHook) {
1773 TString drawlib =
"libProofDraw";
1775 if ((p = gSystem->DynamicPathName(drawlib, kTRUE))) {
1777 if (gSystem->Load(drawlib) != -1) {
1780 if ((f = gSystem->DynFindSymbol(drawlib,
"FeedBackCanvas")))
1781 gFeedBackCanvasHook = (void (*)(
const char *, Bool_t))(f);
1783 Warning(
"FeedBackCanvas",
"can't find FeedBackCanvas");
1785 Warning(
"FeedBackCanvas",
"can't load %s", drawlib.Data());
1787 Warning(
"FeedBackCanvas",
"can't locate %s", drawlib.Data());
1789 if (gFeedBackCanvasHook) (*gFeedBackCanvasHook)(name, create);
1797 Long64_t TProofPlayer::GetCacheSize()
1799 if (fEvIter)
return fEvIter->GetCacheSize();
1806 Int_t TProofPlayer::GetLearnEntries()
1808 if (fEvIter)
return fEvIter->GetLearnEntries();
1815 void TProofPlayerRemote::SetMerging(Bool_t on)
1818 if (!fMergeSTW) fMergeSTW =
new TStopwatch();
1820 Info("SetMerging", "ON: mergers: %d", fProof->fMergersCount);
1821 if (fNumMergers <= 0 && fProof->fMergersCount > 0)
1822 fNumMergers = fProof->fMergersCount;
1823 } else if (fMergeSTW) {
1825 Float_t rt = fMergeSTW->RealTime();
1827 Info("SetMerging", "OFF: rt: %f, mergers: %d", rt, fNumMergers);
1829 if (!fProof->TestBit(TProof::kIsClient) || fProof->IsLite()) {
1831 fQuery->SetMergeTime(rt);
1832 fQuery->SetNumMergers(fNumMergers);
1835 fQuery->SetRecvTime(rt);
1837 PDB(kGlobal,2) fQuery->Print("F");
1844 ClassImp(TProofPlayerLocal);
1853 Long64_t TProofPlayerLocal::Process(TSelector *selector,
1854 Long64_t nentries, Option_t *option)
1857 Error(
"Process",
"selector object undefiend!");
1861 TDSetProxy *set =
new TDSetProxy(
"",
"",
"");
1862 set->SetBit(TDSet::kEmpty);
1863 set->SetBit(TDSet::kIsLocal);
1864 Long64_t rc = Process(set, selector, option, nentries);
1879 Long64_t TProofPlayerLocal::Process(
const char *selector,
1880 Long64_t nentries, Option_t *option)
1882 TDSetProxy *set =
new TDSetProxy(
"",
"",
"");
1883 set->SetBit(TDSet::kEmpty);
1884 set->SetBit(TDSet::kIsLocal);
1885 Long64_t rc = Process(set, selector, option, nentries);
1895 ClassImp(TProofPlayerRemote);
1900 TProofPlayerRemote::~TProofPlayerRemote()
1902 SafeDelete(fOutput);
1903 SafeDelete(fOutputLists);
1906 SafeDelete(fFeedbackLists);
1907 SafeDelete(fPacketizer);
1909 SafeDelete(fProcessMessage);
1916 Int_t TProofPlayerRemote::InitPacketizer(TDSet *dset, Long64_t nentries,
1917 Long64_t first,
const char *defpackunit,
1918 const char *defpackdata)
1920 SafeDelete(fPacketizer);
1921 PDB(kGlobal,1) Info("Process","Enter");
1923 fExitStatus = kFinished;
1926 Int_t honebyone = 1;
1927 if (TProof::GetParameter(fInput, "PROOF_MergeTH1OneByOne", honebyone) != 0)
1928 honebyone = gEnv->GetValue("ProofPlayer.MergeTH1OneByOne", 1);
1929 fMergeTH1OneByOne = (honebyone == 1) ? kTRUE : kFALSE;
1931 Bool_t noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1934 TList *listOfMissingFiles = 0;
1936 TMethodCall callEnv;
1938 noData = dset->TestBit(TDSet::kEmpty) ? kTRUE : kFALSE;
1942 if (TProof::GetParameter(fInput,
"PROOF_Packetizer", packetizer) != 0)
1943 packetizer = defpackunit;
1945 Info(
"InitPacketizer",
"using alternate packetizer: %s", packetizer.Data());
1948 cl = TClass::GetClass(packetizer);
1950 Error(
"InitPacketizer",
"class '%s' not found", packetizer.Data());
1951 fExitStatus = kAborted;
1956 callEnv.InitWithPrototype(cl, cl->GetName(),
"TList*,Long64_t,TList*,TProofProgressStatus*");
1957 if (!callEnv.IsValid()) {
1958 Error(
"InitPacketizer",
1959 "cannot find correct constructor for '%s'", cl->GetName());
1960 fExitStatus = kAborted;
1963 callEnv.ResetParam();
1964 callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
1965 callEnv.SetParam((Long64_t) nentries);
1966 callEnv.SetParam((Long_t) fInput);
1967 callEnv.SetParam((Long_t) fProgressStatus);
1969 }
else if (dset->TestBit(TDSet::kMultiDSet)) {
1972 if (fProof->GetRunStatus() != TProof::kRunning) {
1974 Error(
"InitPacketizer",
"received stop/abort request");
1975 fExitStatus = kAborted;
1980 packetizer =
"TPacketizerMulti";
1983 cl = TClass::GetClass(packetizer);
1985 Error(
"InitPacketizer",
"class '%s' not found", packetizer.Data());
1986 fExitStatus = kAborted;
1991 callEnv.InitWithPrototype(cl, cl->GetName(),
"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
1992 if (!callEnv.IsValid()) {
1993 Error(
"InitPacketizer",
"cannot find correct constructor for '%s'", cl->GetName());
1994 fExitStatus = kAborted;
1997 callEnv.ResetParam();
1998 callEnv.SetParam((Long_t) dset);
1999 callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
2000 callEnv.SetParam((Long64_t) first);
2001 callEnv.SetParam((Long64_t) nentries);
2002 callEnv.SetParam((Long_t) fInput);
2003 callEnv.SetParam((Long_t) fProgressStatus);
2006 dset->SetBit(TDSet::kValidityChecked);
2007 dset->ResetBit(TDSet::kSomeInvalid);
2015 if ((listOfMissingFiles = (TList *)fInput->FindObject(
"MissingFiles"))) {
2017 fInput->Remove(listOfMissingFiles);
2019 listOfMissingFiles =
new TList;
2023 if (TProof::GetParameter(fInput,
"PROOF_LookupOpt", lkopt) != 0 || lkopt !=
"none")
2024 dset->Lookup(kTRUE, &listOfMissingFiles);
2026 if (fProof->GetRunStatus() != TProof::kRunning) {
2028 Error(
"InitPacketizer",
"received stop/abort request");
2029 fExitStatus = kAborted;
2033 if (!(dset->GetListOfElements()) ||
2034 !(dset->GetListOfElements()->GetSize())) {
2036 gProofServ->SendAsynMessage(
"InitPacketizer: No files from the data set were found - Aborting");
2037 Error(
"InitPacketizer",
"No files from the data set were found - Aborting");
2038 fExitStatus = kAborted;
2039 if (listOfMissingFiles) {
2040 listOfMissingFiles->SetOwner();
2041 fOutput->Remove(listOfMissingFiles);
2042 SafeDelete(listOfMissingFiles);
2047 if (TProof::GetParameter(fInput,
"PROOF_Packetizer", packetizer) != 0)
2049 packetizer = defpackdata;
2051 Info(
"InitPacketizer",
"using alternate packetizer: %s", packetizer.Data());
2054 cl = TClass::GetClass(packetizer);
2056 Error(
"InitPacketizer",
"class '%s' not found", packetizer.Data());
2057 fExitStatus = kAborted;
2062 callEnv.InitWithPrototype(cl, cl->GetName(),
"TDSet*,TList*,Long64_t,Long64_t,TList*,TProofProgressStatus*");
2063 if (!callEnv.IsValid()) {
2064 Error(
"InitPacketizer",
"cannot find correct constructor for '%s'", cl->GetName());
2065 fExitStatus = kAborted;
2068 callEnv.ResetParam();
2069 callEnv.SetParam((Long_t) dset);
2070 callEnv.SetParam((Long_t) fProof->GetListOfActiveSlaves());
2071 callEnv.SetParam((Long64_t) first);
2072 callEnv.SetParam((Long64_t) nentries);
2073 callEnv.SetParam((Long_t) fInput);
2074 callEnv.SetParam((Long_t) fProgressStatus);
2077 dset->SetBit(TDSet::kValidityChecked);
2078 dset->ResetBit(TDSet::kSomeInvalid);
2083 callEnv.Execute(ret);
2084 if ((fPacketizer = (TVirtualPacketizer *)ret) == 0) {
2085 Error(
"InitPacketizer",
"cannot construct '%s'", cl->GetName());
2086 fExitStatus = kAborted;
2090 if (!fPacketizer->IsValid()) {
2091 Error(
"InitPacketizer",
2092 "instantiated packetizer object '%s' is invalid", cl->GetName());
2093 fExitStatus = kAborted;
2094 SafeDelete(fPacketizer);
2099 if (!noData && dset->TestBit(TDSet::kMultiDSet)) {
2100 if ((listOfMissingFiles = (TList *) fInput->FindObject(
"MissingFiles"))) {
2102 fInput->Remove(listOfMissingFiles);
2108 TDSetElement *elem = 0;
2109 if (dset->TestBit(TDSet::kSomeInvalid)) {
2110 TIter nxe(dset->GetListOfElements());
2111 while ((elem = (TDSetElement *)nxe())) {
2112 if (!elem->GetValid()) {
2113 if (!listOfMissingFiles)
2114 listOfMissingFiles =
new TList;
2115 listOfMissingFiles->Add(elem->GetFileInfo(dset->GetType()));
2116 dset->Remove(elem, kFALSE);
2120 dset->ResetBit(TDSet::kSomeInvalid);
2124 if (listOfMissingFiles && listOfMissingFiles->GetSize() > 0) {
2125 TIter missingFiles(listOfMissingFiles);
2129 while ((fi = (TFileInfo *) missingFiles.Next())) {
2130 if (fi->GetCurrentUrl()) {
2131 msg = Form(
"File not found: %s - skipping!",
2132 fi->GetCurrentUrl()->GetUrl());
2134 msg = Form(
"File not found: %s - skipping!", fi->GetName());
2136 if (gProofServ) gProofServ->SendAsynMessage(msg.Data());
2140 if (!GetOutput(
"MissingFiles")) {
2141 listOfMissingFiles->SetName(
"MissingFiles");
2142 AddOutputObject(listOfMissingFiles);
2144 TStatus *tmpStatus = (TStatus *)GetOutput(
"PROOF_Status");
2145 if (!tmpStatus) AddOutputObject((tmpStatus =
new TStatus()));
2148 Int_t ngood = dset->GetListOfElements()->GetSize();
2149 Int_t nbad = listOfMissingFiles->GetSize();
2150 Double_t xb = Double_t(nbad) / Double_t(ngood + nbad);
2151 msg = Form(
" About %.2f %c of the requested files (%d out of %d) were missing or unusable; details in"
2152 " the 'missingFiles' list", xb * 100.,
'%', nbad, nbad + ngood);
2153 tmpStatus->Add(msg.Data());
2155 " +++ About %.2f %c of the requested files (%d out of %d) are missing or unusable; details in"
2156 " the 'MissingFiles' list\n"
2157 " +++", xb * 100.,
'%', nbad, nbad + ngood);
2158 if (gProofServ) gProofServ->SendAsynMessage(msg.Data());
2161 SafeDelete(listOfMissingFiles);
2175 Long64_t TProofPlayerRemote::Process(TDSet *dset,
const char *selector_file,
2176 Option_t *option, Long64_t nentries,
2179 PDB(kGlobal,1) Info("Process", "Enter");
2182 fExitStatus = kFinished;
2184 if (!fProgressStatus) {
2185 Error(
"Process",
"No progress status");
2188 fProgressStatus->Reset();
2192 fOutput =
new THashList;
2196 SafeDelete(fFeedbackLists);
2198 if (fProof->IsMaster()){
2199 TPerfStats::Start(fInput, fOutput);
2201 TPerfStats::Setup(fInput);
2208 fSelectorFileName = selector_file;
2210 if (fCreateSelObj) {
2211 if(!SendSelector(selector_file))
return -1;
2212 fn = gSystem->BaseName(selector_file);
2217 TMessage mesg(kPROOF_PROCESS);
2220 Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);
2222 TList *inputtmp = 0;
2224 if (fProof->IsMaster()) {
2226 PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
2227 set = new TDSetProxy( dset->GetType(), dset->GetObjName(),
2228 dset->GetDirectory() );
2229 if (dset->TestBit(TDSet::kEmpty))
2230 set->SetBit(TDSet::kEmpty);
2232 if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", "TPacketizer") != 0) {
2233 Error(
"Process",
"cannot init the packetizer");
2234 fExitStatus = kAborted;
2245 Long64_t memlogfreq = -1, mlf;
2246 if (gSystem->Getenv(
"PROOF_MEMLOGFREQ")) {
2247 TString clf(gSystem->Getenv(
"PROOF_MEMLOGFREQ"));
2248 if (clf.IsDigit()) { memlogfreq = clf.Atoi(); mrc = 0; }
2250 if ((mrc = TProof::GetParameter(fProof->GetInputList(),
"PROOF_MemLogFreq", mlf)) == 0) memlogfreq = mlf;
2251 if (memlogfreq == 0) {
2252 memlogfreq = fPacketizer->GetTotalEntries()/(fProof->GetParallel()*100);
2253 if (memlogfreq <= 0) memlogfreq = 1;
2255 if (mrc == 0) fProof->SetParameter(
"PROOF_MemLogFreq", memlogfreq);
2260 if (TProof::SendInputData(fQuery, fProof, emsg) != 0)
2261 Warning(
"Process",
"could not forward input data: %s", emsg.Data());
2264 if (fInput->FindObject(
"PROOF_StatsHist") != 0) {
2265 if (!(fProcPackets = (TH1I *) fOutput->FindObject(
"PROOF_ProcPcktHist"))) {
2266 Warning(
"Process",
"could not attach to histogram 'PROOF_ProcPcktHist'");
2269 Info("Process", "attached to histogram 'PROOF_ProcPcktHist' to record"
2270 " packets being processed");
2277 if (gEnv->Lookup(
"Proof.UseMergers") && !fInput->FindObject(
"PROOF_UseMergers")) {
2278 Int_t smg = gEnv->GetValue(
"Proof.UseMergers",-1);
2280 fInput->Add(
new TParameter<Int_t>(
"PROOF_UseMergers", smg));
2281 if (gEnv->Lookup(
"Proof.MergersByHost")) {
2282 Int_t mbh = gEnv->GetValue(
"Proof.MergersByHost",0);
2286 if ((o = fInput->FindObject(
"PROOF_MergersByHost"))) { fInput->Remove(o);
delete o; }
2287 fInput->Add(
new TParameter<Int_t>(
"PROOF_MergersByHost", mbh));
2296 fOutputLists->Delete();
2297 delete fOutputLists;
2302 gSystem->RedirectOutput(fProof->fLogFileName);
2304 Info(
"Process",
"starting new query");
2308 if (fCreateSelObj) {
2309 SafeDelete(fSelector);
2310 if (!(fSelector = TSelector::GetSelector(selector_file))) {
2312 gSystem->RedirectOutput(0);
2318 fSelectorClass = fSelector->IsA();
2321 if (!fCreateSelObj) {
2324 if (fSelector->GetInputList() && fSelector->GetInputList()->GetSize() > 0) {
2325 TIter nxi(fSelector->GetInputList());
2327 while ((o = nxi())) {
2328 if (!fInput->FindObject(o)) {
2331 inputtmp =
new TList;
2332 inputtmp->SetOwner(kFALSE);
2338 fInput->Add(fSelector);
2341 fSelector->SetInputList(fInput);
2342 fSelector->SetOption(option);
2343 if (fSelector->GetOutputList()) fSelector->GetOutputList()->Clear();
2345 PDB(kLoop,1) Info("Process","Call Begin(0)");
2346 fSelector->Begin(0);
2350 if (!fCreateSelObj) fSelector->SetInputList(0);
2353 fProof->SendInputDataFile();
2356 gSystem->RedirectOutput(0);
2359 TCleanup clean(this);
2362 TString opt = option;
2365 if (fProof->fProtocol < 13)
2366 dset->SetWriteV3(kTRUE);
2369 Long64_t num = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : nentries;
2370 Long64_t fst = (gProofServ && gProofServ->IsMaster() && gProofServ->IsParallel()) ? -1 : first;
2373 TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
2375 TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
2377 if (fProof->fProtocol > 14) {
2378 if (fProcessMessage)
delete fProcessMessage;
2379 fProcessMessage =
new TMessage(kPROOF_PROCESS);
2380 mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
2381 (*fProcessMessage) << set << fn << fInput << opt << num << fst << evl << sync << enl;
2383 mesg << set << fn << fInput << opt << num << fst << evl << sync;
2386 Warning(
"Process",
"entry lists not supported by the server");
2390 fProof->ResetMergePrg();
2392 Int_t nb = fProof->Broadcast(mesg);
2393 PDB(kGlobal,1) Info("Process", "Broadcast called: %d workers notified", nb);
2394 if (fProof->IsLite()) fProof->fNotIdle += nb;
2397 if (fProof->fProtocol < 13)
2398 dset->SetWriteV3(kFALSE);
2402 fProof->fRedirLog = kTRUE;
2406 Info(
"Process|Svc",
"Start merging Memory information");
2413 PDB(kGlobal,1) Info("Process","Asynchronous processing:"
2414 " activating CollectInputFrom");
2420 return fProof->fSeqNum;
2423 PDB(kGlobal,1) Info("Process","Calling Collect");
2432 fPacketizer->StopProcess(kFALSE, kTRUE);
2434 fPacketizer->SetBit(TVirtualPacketizer::kIsDone);
2438 fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
2439 fPacketizer->GetInitTime(),
2440 elapsed.RealTime());
2444 return Finalize(kFALSE,sync);
2448 PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
2450 if (!(fProof->IsSync())) {
2452 Info(
"Process",
"switching to the asynchronous mode ...");
2453 return fProof->fSeqNum;
2459 fProof->fRedirLog = kFALSE;
2469 fPacketizer->StopProcess(kFALSE, kTRUE);
2471 fPacketizer->SetBit(TVirtualPacketizer::kIsDone);
2474 fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
2475 fPacketizer->GetInitTime(),
2476 fPacketizer->GetProcTime());
2480 if (!fCreateSelObj) fSelector->SetInputList(fInput);
2485 if (!IsClient() || GetExitStatus() != TProofPlayer::kAborted)
2486 rc = Finalize(kFALSE,sync);
2490 TIter nxi(inputtmp);
2492 while ((o = nxi())) fInput->Remove(o);
2493 SafeDelete(inputtmp);
2507 Long64_t TProofPlayerRemote::Process(TDSet *dset, TSelector *selector,
2508 Option_t *option, Long64_t nentries,
2512 Error(
"Process",
"selector object undefined");
2517 if (IsClient() && (selector != fSelector)) {
2518 SafeDelete(fSelector);
2519 fSelector = selector;
2522 fCreateSelObj = kFALSE;
2523 Long64_t rc = Process(dset, selector->ClassName(), option, nentries, first);
2524 fCreateSelObj = kTRUE;
2534 Bool_t TProofPlayerRemote::JoinProcess(TList *workers)
2536 if (!fProcessMessage || !fProof || !fPacketizer) {
2537 Error(
"Process",
"Should not happen: fProcessMessage=%p fProof=%p fPacketizer=%p",
2538 fProcessMessage, fProof, fPacketizer);
2542 if (!workers || !fProof->IsMaster()) {
2543 Error(
"Process",
"Invalid call");
2548 Info("Process", "Preparing %d new worker(s) to process", workers->GetEntries());
2551 if (fCreateSelObj) {
2553 Info("Process", "Sending selector file %s", fSelectorFileName.Data());
2554 if(!SendSelector(fSelectorFileName.Data())) {
2555 Error(
"Process",
"Problems in sending selector file %s", fSelectorFileName.Data());
2560 if (fProof->IsLite()) fProof->fNotIdle += workers->GetSize();
2563 Info("Process", "Adding new workers to the packetizer");
2564 if (fPacketizer->AddWorkers(workers) == -1) {
2565 Error(
"Process",
"Cannot add new workers to the packetizer!");
2570 Info("Process", "Broadcasting process message to new workers");
2571 fProof->Broadcast(*fProcessMessage, workers);
2583 Bool_t TProofPlayerRemote::MergeOutputFiles()
2585 PDB(kOutput,1) Info("MergeOutputFiles", "enter: fOutput size: %d", fOutput->GetSize());
2586 PDB(kOutput,2) fOutput->ls();
2592 TProofOutputFile *pf = 0;
2593 while ((o = nxo())) {
2594 if ((pf = dynamic_cast<TProofOutputFile*>(o))) {
2596 PDB(kOutput,2) pf->Print();
2598 if (pf->IsMerge()) {
2601 Bool_t localMerge = (pf->GetTypeOpt() == TProofOutputFile::kLocal) ? kTRUE : kFALSE;
2602 TFileMerger *filemerger = pf->GetFileMerger(localMerge);
2604 Error(
"MergeOutputFiles",
"file merger is null in TProofOutputFile! Protocol error?");
2609 if (!pf->IsMerged()) {
2610 PDB(kOutput,2) pf->Print();
2611 TString fileLoc = TString::Format("%s/%s", pf->GetDir(), pf->GetFileName());
2612 filemerger->AddFile(fileLoc);
2615 TString ddir, ddopts;
2617 ddir.Form(
"%s/", gProofServ->GetDataDir());
2618 if (gProofServ->GetDataDirOpts()) ddopts= gProofServ->GetDataDirOpts();
2621 TString outfile(pf->GetOutputFileName());
2622 if (outfile.Contains(
"<datadir>/")) {
2623 outfile.ReplaceAll(
"<datadir>/", ddir.Data());
2624 if (!ddopts.IsNull())
2625 outfile += TString::Format(
"?%s", ddopts.Data());
2626 pf->SetOutputFileName(outfile);
2628 if ((gProofServ && gProofServ->IsTopMaster()) || (fProof && fProof->IsLite())) {
2629 TFile::EFileType ftyp = TFile::kLocal;
2631 TProofServ::GetLocalServer(srv);
2633 Bool_t localFile = kFALSE;
2634 if (pf->IsRetrieve()) {
2637 if (outfile.BeginsWith(
"client:")) outfile.Replace(0, 7,
"");
2638 TString bn = gSystem->BaseName(TUrl(outfile.Data(), kTRUE).GetFile());
2640 outfile.Form(
"%s%s", ddir.Data(), bn.Data());
2642 if (strlen(pf->GetTitle()) <= 0) pf->SetTitle(bn);
2647 if (outfile.BeginsWith(
"master:")) outfile.Replace(0, 7,
"");
2649 TUrl uof(outfile.Data(), kTRUE);
2651 ftyp = TFile::GetType(uof.GetUrl(),
"RECREATE", &lfn);
2652 if (ftyp == TFile::kLocal && !srv.IsNull()) {
2654 if (uof.GetPort() > 0 && usrv.GetPort() > 0 &&
2655 usrv.GetPort() != uof.GetPort()) ftyp = TFile::kNet;
2658 if (ftyp == TFile::kLocal) outfile = lfn;
2660 if (ftyp == TFile::kLocal || ftyp == TFile::kFile) localFile = kTRUE;
2663 TString outfilerem(outfile);
2667 TProofServ::FilterLocalroot(outfilerem, srv);
2668 outfilerem.Insert(0, srv);
2671 pf->SetOutputFileName(outfilerem);
2673 pf->SetFileName(gSystem->BaseName(outfilerem));
2675 if (!filemerger->OutputFile(outfile)) {
2676 Error(
"MergeOutputFiles",
"cannot open the output file");
2680 PDB(kSubmerger,2) filemerger->PrintFiles("");
2681 if (!filemerger->Merge()) {
2682 Error(
"MergeOutputFiles",
"cannot merge the output files");
2686 TList *fileList = filemerger->GetMergeList();
2688 TIter next(fileList);
2689 TObjString *url = 0;
2690 while((url = (TObjString*)next())) {
2691 TUrl u(url->GetName());
2692 if (!strcmp(u.GetProtocol(),
"file")) {
2693 gSystem->Unlink(u.GetFile());
2695 gSystem->Unlink(url->GetName());
2700 filemerger->Reset();
2706 if (!pf->IsMerged()) {
2708 dumlist.Add(
new TNamed(
"dum",
"dum"));
2709 dumlist.SetOwner(kTRUE);
2710 pf->Merge(&dumlist);
2713 TFileCollection *fc = pf->GetFileCollection();
2715 Error(
"MergeOutputFiles",
"file collection is null in TProofOutputFile! Protocol error?");
2723 pf->ResetFileCollection();
2725 if (pf->IsRegister()) {
2727 if ((pf->GetTypeOpt() & TProofOutputFile::kOverwrite)) opt +=
"O";
2728 if ((pf->GetTypeOpt() & TProofOutputFile::kVerify)) opt +=
"V";
2729 if (!fOutput->FindObject(
"PROOFSERV_RegisterDataSet"))
2730 fOutput->Add(
new TNamed(
"PROOFSERV_RegisterDataSet",
""));
2731 TString tag = TString::Format(
"DATASET_%s", pf->GetTitle());
2732 fOutput->Add(
new TNamed(tag, opt));
2735 fOutput->Remove(pf);
2736 if (!rmList) rmList =
new TList;
2738 PDB(kOutput,2) fOutput->Print();
2745 if (rmList && rmList->GetSize() > 0) {
2748 while((o = nxo())) {
2751 rmList->SetOwner(kTRUE);
2755 PDB(kOutput,1) Info("MergeOutputFiles", "done!");
2767 void TProofPlayerRemote::SetSelectorDataMembersFromOutputList()
2769 TOutputListSelectorDataMap* olsdm
2770 = TOutputListSelectorDataMap::FindInList(fOutput);
2772 PDB(kOutput,1) Warning("SetSelectorDataMembersFromOutputList",
2773 "failed to find map
object in output list!");
2777 olsdm->SetDataMembers(fSelector);
2782 Long64_t TProofPlayerRemote::Finalize(Bool_t force, Bool_t sync)
2788 if (fOutputLists == 0) {
2791 return fProof->Finalize(Form(
"%s:%s", fQuery->GetTitle(),
2792 fQuery->GetName()), force);
2795 PDB(kGlobal,1) Info("Finalize","Calling Merge Output to finalize the output list");
2801 if (fProof->IsMaster()) {
2804 TStatus *status = (TStatus *) fOutput->FindObject(
"PROOF_Status");
2807 status =
new TStatus();
2808 fOutput->Add(status);
2809 TString emsg = TString::Format(
"Query aborted after %lld entries", GetEventsProcessed());
2812 status->SetExitStatus((Int_t) GetExitStatus());
2814 PDB(kOutput,1) Info("Finalize","Calling Merge Output");
2819 fOutput->SetOwner();
2823 TObject *pperf = (TObject *) fPacketizer->GetProgressPerf(kTRUE);
2824 if (pperf) fOutput->Add(pperf);
2825 TList *parms = fPacketizer->GetConfigParams(kTRUE);
2829 while ((o = nxo())) fOutput->Add(o);
2834 TDSetElement *elem = 0;
2835 if (fPacketizer->GetFailedPackets()) {
2836 TString type = (fPacketizer->TestBit(TVirtualPacketizer::kIsTree)) ?
"TTree" :
"";
2837 TList *listOfMissingFiles = (TList *) fOutput->FindObject(
"MissingFiles");
2838 if (!listOfMissingFiles) {
2839 listOfMissingFiles =
new TList;
2840 listOfMissingFiles->SetName(
"MissingFiles");
2842 TIter nxe(fPacketizer->GetFailedPackets());
2843 while ((elem = (TDSetElement *)nxe()))
2844 listOfMissingFiles->Add(elem->GetFileInfo(type));
2845 if (!fOutput->FindObject(listOfMissingFiles)) fOutput->Add(listOfMissingFiles);
2851 Long_t vmaxmst, rmaxmst;
2852 TPerfStats::GetMemValues(vmaxmst, rmaxmst);
2853 status->SetMemValues(vmaxmst, rmaxmst, kTRUE);
2855 SafeDelete(fSelector);
2858 if (fExitStatus != kAborted) {
2865 if (ReinitSelector(fQuery) == -1) {
2866 Info(
"Finalize",
"problems reinitializing selector \"%s\"",
2867 fQuery->GetSelecImp()->GetName());
2873 if (TList *failedPackets = fPacketizer->GetFailedPackets()) {
2874 fPacketizer->SetFailedPackets(0);
2875 failedPackets->SetName(
"FailedPackets");
2876 AddOutputObject(failedPackets);
2878 TStatus *status = (TStatus *)GetOutput(
"PROOF_Status");
2879 if (!status) AddOutputObject((status =
new TStatus()));
2880 status->Add(
"Some packets were not processed! Check the the"
2881 " 'FailedPackets' list in the output list");
2885 fSelector->SetInputList(fInput);
2887 TList *output = fSelector->GetOutputList();
2889 TIter next(fOutput);
2890 while(TObject* obj = next()) {
2891 if (fProof->IsParallel() || DrawCanvas(obj) == 1)
2897 Warning(
"Finalize",
"undefined output list in the selector! Protocol error?");
2903 fOutput->SetOwner(kFALSE);
2904 fOutput->Clear(
"nodelete");
2907 SetSelectorDataMembersFromOutputList();
2909 PDB(kLoop,1) Info("Finalize","Call Terminate()");
2913 fProof->fQuerySTW.Reset();
2915 fSelector->Terminate();
2917 rv = fSelector->GetStatus();
2921 while(TObject* o = it()) {
2927 fQuery->SetOutputList(fOutput);
2929 fQuery->SetFinalized();
2931 Warning(
"Finalize",
"current TQueryResult object is undefined!");
2934 if (!fCreateSelObj) {
2935 fInput->Remove(fSelector);
2936 fOutput->Remove(fSelector);
2937 if (output) output->Remove(fSelector);
2944 if (output) { output->SetOwner(kFALSE); output->Clear(
"nodelete"); }
2945 SafeDelete(fSelector);
2949 fOutput->SetOwner(kFALSE);
2950 fOutput->Clear(
"nodelete");
2951 SafeDelete(fOutput);
2956 fOutput->SetOwner();
2957 SafeDelete(fSelector);
2958 if (!fCreateSelObj) fSelector = 0;
2961 PDB(kGlobal,1) Info("Process","exit");
2964 Info(
"Finalize",
"finalization on %s finished", gProofServ->GetPrefix());
2966 fProof->FinalizationDone();
2974 Long64_t TProofPlayerRemote::Finalize(TQueryResult *qr)
2976 PDB(kGlobal,1) Info("Finalize(TQueryResult *)","Enter");
2979 Info(
"Finalize(TQueryResult *)",
2980 "method to be executed only on the clients");
2985 Info(
"Finalize(TQueryResult *)",
"query undefined");
2989 if (qr->IsFinalized()) {
2990 Info(
"Finalize(TQueryResult *)",
"query already finalized");
2996 fOutput =
new THashList;
3002 fOutputLists->Delete();
3003 delete fOutputLists;
3008 gSystem->RedirectOutput(fProof->fLogFileName);
3011 TList *tmp = (TList *) qr->GetOutputList();
3013 gSystem->RedirectOutput(0);
3014 Info(
"Finalize(TQueryResult *)",
"outputlist is empty");
3017 TList *out = fOutput;
3018 if (fProof->fProtocol < 11)
3023 out->Add(o->Clone());
3026 if (fProof->fProtocol < 11) {
3030 gSystem->RedirectOutput(0);
3032 SetSelectorDataMembersFromOutputList();
3035 SetCurrentQuery(qr);
3036 Long64_t rc = Finalize();
3037 RestorePreviousQuery();
3045 Bool_t TProofPlayerRemote::SendSelector(
const char* selector_file)
3048 if (!selector_file) {
3049 Info(
"SendSelector",
"Invalid input: selector (file) name undefined");
3053 if (!strchr(gSystem->BaseName(selector_file),
'.')) {
3055 Info(
"SendSelector",
"selector name '%s' does not contain a '.':"
3056 " nothing to send, it will be loaded from a library", selector_file);
3061 TString selec = selector_file;
3065 selec = gSystem->SplitAclicMode(selec, aclicMode, arguments, io);
3068 gSystem->ExpandPathName(selec);
3071 TString mp(TROOT::GetMacroPath());
3072 TString np(gSystem->DirName(selec));
3075 if (!mp.BeginsWith(np) && !mp.Contains(
":"+np)) {
3076 Int_t ip = (mp.BeginsWith(
".:")) ? 2 : 0;
3078 TROOT::SetMacroPath(mp);
3080 Info(
"SendSelector",
"macro path set to '%s'", TROOT::GetMacroPath());
3085 TString header = selec;
3086 header.Remove(header.Last(
'.'));
3088 if (gSystem->AccessPathName(header, kReadPermission)) {
3090 header.Remove(header.Last(
'.'));
3092 if (gSystem->AccessPathName(header, kReadPermission)) {
3093 Info(
"SendSelector",
3094 "header file not found: tried: %s %s", h.Data(), header.Data());
3100 if (fProof->SendFile(selec, (TProof::kBinary | TProof::kForward | TProof::kCp | TProof::kCpBin)) == -1) {
3101 Info(
"SendSelector",
"problems sending implementation file %s", selec.Data());
3104 if (fProof->SendFile(header, (TProof::kBinary | TProof::kForward | TProof::kCp)) == -1) {
3105 Info(
"SendSelector",
"problems sending header file %s", header.Data());
3115 void TProofPlayerRemote::MergeOutput(Bool_t saveMemValues)
3117 PDB(kOutput,1) Info("MergeOutput","Enter");
3122 TIter next(fOutputLists);
3125 while ( (list = (TList *) next()) ) {
3127 if (!(obj = fOutput->FindObject(list->GetName()))) {
3128 obj = list->First();
3133 if ( list->IsEmpty() )
continue;
3135 TMethodCall callEnv;
3137 callEnv.InitWithPrototype(obj->IsA(),
"Merge",
"TCollection*");
3138 if (callEnv.IsValid()) {
3139 callEnv.SetParam((Long_t) list);
3140 callEnv.Execute(obj);
3143 while ( (obj = list->First()) ) {
3149 SafeDelete(fOutputLists);
3153 PDB(kOutput,1) Info("MergeOutput","fOutputLists empty");
3156 if (!IsClient() || fProof->IsLite()) {
3170 while ((obj = nxo())) {
3171 TProofOutputFile *pf =
dynamic_cast<TProofOutputFile *
>(obj);
3174 PDB(kOutput,2) Info("MergeOutput","found TProofOutputFile '%s'", obj->GetName());
3175 TString dir(pf->GetOutputFileName());
3176 PDB(kOutput,2) Info("MergeOutput","outputfilename: '%s'", dir.Data());
3178 if (dir.Last('/') != kNPOS) dir.Remove(dir.Last('/')+1);
3179 PDB(kOutput,2) Info("MergeOutput","dir: '%s'", dir.Data());
3184 TString pfx = gEnv->GetValue("Path.Localroot","");
3185 if (!pfx.IsNull() &&
3186 (!strcmp(u.GetProtocol(), "root") || !strcmp(u.GetProtocol(), "xrd")))
3188 PDB(kOutput,2) Info("MergeOutput","rawdir: '%s'", dir.Data());
3189 pf->SetDir(dir, kTRUE);
3191 pf->SetWorkerOrdinal(gProofServ ? gProofServ->GetOrdinal() : "0");
3193 key.Form("PROOF_OutputFileName_%s", pf->GetFileName());
3194 if ((nm = (TNamed *) fOutput->FindObject(key.Data()))) {
3195 pf->SetOutputFileName(nm->GetTitle());
3197 }
else if (TestBit(TVirtualProofPlayer::kIsSubmerger)) {
3198 pf->SetOutputFileName(0);
3199 pf->ResetBit(TProofOutputFile::kOutputFileNameSet);
3202 dir = pf->GetFileName();
3203 if (TestBit(TVirtualProofPlayer::kIsSubmerger)) {
3205 pf->SetMerged(kFALSE);
3207 if (dir.EndsWith(
".merger")) dir.Remove(dir.Last(
'.'));
3209 pf->SetFileName(dir);
3210 }
else if (fProof->IsLite()) {
3212 pf->SetWorkerOrdinal(
"0");
3214 pf->SetDir(gSystem->DirName(pf->GetOutputFileName()));
3216 TUrl u(pf->GetOutputFileName(), kTRUE);
3217 pf->SetFileName(gSystem->BaseName(u.GetFile()));
3218 pf->SetDir(gSystem->DirName(u.GetFile()), kTRUE);
3220 Printf(
"\nOutput file: %s", pf->GetOutputFileName());
3223 PDB(kOutput,2) Info("MergeOutput","output
object '%s' is not a TProofOutputFile", obj->GetName());
3228 if (rmlist.GetSize() > 0) {
3229 TIter nxrm(&rmlist);
3230 while ((obj = nxrm()))
3231 fOutput->Remove(obj);
3232 rmlist.SetOwner(kTRUE);
3237 if (saveMemValues) {
3240 Long_t vmaxmst, rmaxmst;
3241 TPerfStats::GetMemValues(vmaxmst, rmaxmst);
3242 TStatus *status = (TStatus *) fOutput->FindObject(
"PROOF_Status");
3243 if (status) status->SetMemValues(vmaxmst, rmaxmst, kFALSE);
3246 PDB(kOutput,1) fOutput->Print();
3247 PDB(kOutput,1) Info("MergeOutput","leave (%d
object(s))", fOutput->GetSize());
3253 void TProofPlayerRemote::Progress(Long64_t total, Long64_t processed)
3256 fProof->Progress(total, processed);
3259 TMessage m(kPROOF_PROGRESS);
3260 m << total << processed;
3261 gProofServ->GetSocket()->Send(m);
3268 void TProofPlayerRemote::Progress(Long64_t total, Long64_t processed,
3270 Float_t initTime, Float_t procTime,
3271 Float_t evtrti, Float_t mbrti)
3274 Info("Progress","%lld %lld %lld %f %f %f %f", total, processed, bytesread,
3275 initTime, procTime, evtrti, mbrti);
3278 fProof->Progress(total, processed, bytesread, initTime, procTime, evtrti, mbrti);
3281 TMessage m(kPROOF_PROGRESS);
3282 m << total << processed << bytesread << initTime << procTime << evtrti << mbrti;
3283 gProofServ->GetSocket()->Send(m);
3290 void TProofPlayerRemote::Progress(TProofProgressInfo *pi)
3294 Info("Progress","%lld %lld %lld %f %f %f %f %d %f", pi->fTotal, pi->fProcessed, pi->fBytesRead,
3295 pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
3296 pi->fActWorkers, pi->fEffSessions);
3299 fProof->Progress(pi->fTotal, pi->fProcessed, pi->fBytesRead,
3300 pi->fInitTime, pi->fProcTime,
3301 pi->fEvtRateI, pi->fMBRateI,
3302 pi->fActWorkers, pi->fTotSessions, pi->fEffSessions);
3305 TMessage m(kPROOF_PROGRESS);
3307 gProofServ->GetSocket()->Send(m);
3310 Warning(
"Progress",
"TProofProgressInfo object undefined!");
3318 void TProofPlayerRemote::Feedback(TList *objs)
3320 fProof->Feedback(objs);
3326 void TProofPlayerRemote::StopProcess(Bool_t abort, Int_t)
3328 if (fPacketizer != 0)
3329 fPacketizer->StopProcess(abort, kFALSE);
3331 fExitStatus = kAborted;
3333 fExitStatus = kStopped;
3345 Int_t TProofPlayerRemote::AddOutputObject(TObject *obj)
3348 Info("AddOutputObject","Enter: %p (%s)", obj, obj ? obj->ClassName() : "undef");
3352 PDB(kOutput,1) Info("AddOutputObject","Invalid input (obj == 0x0)");
3358 fOutput = new THashList;
3361 Bool_t merged = kTRUE;
3364 TList *elists = dynamic_cast<TList *> (obj);
3365 if (elists && !strcmp(elists->GetName(), "PROOF_EventListsList")) {
3369 TEventList *evlist =
new TEventList(
"PROOF_EventList");
3372 TIter nxevl(elists);
3373 TEventList *evl = 0;
3374 while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3378 TIter nxelem(fDSet->GetListOfElements());
3379 TDSetElement *elem = 0;
3380 while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3381 if (!strcmp(elem->GetFileName(), evl->GetName()))
3385 Error(
"AddOutputObject",
"Found an event list for %s, but no object with"
3386 " the same name in the TDSet", evl->GetName());
3389 Long64_t offset = elem->GetTDSetOffset();
3392 Long64_t *arr = evl->GetList();
3393 Int_t num = evl->GetN();
3394 if (arr && offset > 0)
3395 for (Int_t i = 0; i < num; i++)
3403 SetLastMergingMsg(evlist);
3404 Incorporate(evlist, fOutput, merged);
3405 NotifyMemory(evlist);
3417 TProofOutputFile *pf =
dynamic_cast<TProofOutputFile*
>(obj);
3419 fMergeFiles = kTRUE;
3420 if (!IsClient() || fProof->IsLite()) {
3421 if (pf->IsMerge()) {
3422 Bool_t hasfout = (pf->GetOutputFileName() &&
3423 strlen(pf->GetOutputFileName()) > 0 &&
3424 pf->TestBit(TProofOutputFile::kOutputFileNameSet)) ? kTRUE : kFALSE;
3425 Bool_t setfout = (!hasfout || TestBit(TVirtualProofPlayer::kIsSubmerger)) ? kTRUE : kFALSE;
3428 TString ddir, ddopts;
3430 ddir.Form(
"%s/", gProofServ->GetDataDir());
3431 if (gProofServ->GetDataDirOpts()) ddopts = gProofServ->GetDataDirOpts();
3434 TString outfile(pf->GetOutputFileName());
3435 outfile.ReplaceAll(
"<datadir>/", ddir.Data());
3436 if (!ddopts.IsNull()) outfile += TString::Format(
"?%s", ddopts.Data());
3437 pf->SetOutputFileName(outfile);
3441 if (TestBit(TVirtualProofPlayer::kIsSubmerger) && hasfout) {
3442 TString key = TString::Format(
"PROOF_OutputFileName_%s", pf->GetFileName());
3443 if (!fOutput->FindObject(key.Data()))
3444 fOutput->Add(
new TNamed(key.Data(), pf->GetOutputFileName()));
3447 TProofServ::GetLocalServer(of);
3450 of.Form(
"root://%s/", gSystem->HostName());
3451 if (gSystem->Getenv(
"XRDPORT")) {
3452 TString sp(gSystem->Getenv(
"XRDPORT"));
3454 of.Form(
"root://%s:%s/", gSystem->HostName(), sp.Data());
3457 TString sessionPath(gProofServ->GetSessionDir());
3458 TProofServ::FilterLocalroot(sessionPath, of);
3459 of += TString::Format(
"%s/%s", sessionPath.Data(), pf->GetFileName());
3460 if (TestBit(TVirtualProofPlayer::kIsSubmerger)) {
3461 if (!of.EndsWith(
".merger")) of +=
".merger";
3463 if (of.EndsWith(
".merger")) of.Remove(of.Last(
'.'));
3465 pf->SetOutputFileName(of);
3469 PDB(kOutput, 1) pf->Print();
3473 Printf(
"Output file: %s", pf->GetOutputFileName());
3478 SetLastMergingMsg(obj);
3479 Incorporate(obj, fOutput, merged);
3483 return (merged ? 1 : 0);
3489 void TProofPlayerRemote::RedirectOutput(Bool_t on)
3491 if (on && fProof && fProof->fLogFileW) {
3492 TProofServ::SetErrorHandlerFile(fProof->fLogFileW);
3493 fErrorHandler = SetErrorHandler(TProofServ::ErrorHandler);
3495 if (fErrorHandler) {
3496 TProofServ::SetErrorHandlerFile(0);
3497 SetErrorHandler(fErrorHandler);
3508 void TProofPlayerRemote::AddOutput(TList *out)
3510 PDB(kOutput,1) Info("AddOutput","Enter");
3514 PDB(kOutput,1) Info("AddOutput","Invalid input (out == 0x0)");
3520 fOutput = new THashList;
3523 Bool_t merged = kTRUE;
3524 TList *elists = dynamic_cast<TList *> (out->FindObject("PROOF_EventListsList"));
3529 TEventList *evlist =
new TEventList(
"PROOF_EventList");
3532 TIter nxevl(elists);
3533 TEventList *evl = 0;
3534 while ((evl = dynamic_cast<TEventList *> (nxevl()))) {
3538 TIter nxelem(fDSet->GetListOfElements());
3539 TDSetElement *elem = 0;
3540 while ((elem = dynamic_cast<TDSetElement *> (nxelem()))) {
3541 if (!strcmp(elem->GetFileName(), evl->GetName()))
3545 Error(
"AddOutput",
"Found an event list for %s, but no object with"
3546 " the same name in the TDSet", evl->GetName());
3549 Long64_t offset = elem->GetTDSetOffset();
3552 Long64_t *arr = evl->GetList();
3553 Int_t num = evl->GetN();
3554 if (arr && offset > 0)
3555 for (Int_t i = 0; i < num; i++)
3564 out->Remove(elists);
3568 SetLastMergingMsg(evlist);
3569 Incorporate(evlist, fOutput, merged);
3570 NotifyMemory(evlist);
3576 while ((obj = nxo())) {
3577 SetLastMergingMsg(obj);
3578 Incorporate(obj, fOutput, merged);
3594 void TProofPlayerRemote::NotifyMemory(TObject *obj)
3596 if (fProof && (!IsClient() || fProof->IsLite())){
3598 if (!gSystem->GetProcInfo(&pi)){
3601 RedirectOutput(fProof->IsLite());
3602 Info(
"NotifyMemory|Svc",
"Memory %ld virtual %ld resident after merging object %s",
3603 pi.fMemVirtual, pi.fMemResident, obj->GetName());
3607 TPerfStats::SetMemValues();
3614 void TProofPlayerRemote::SetLastMergingMsg(TObject *obj)
3616 TString lastMsg = TString::Format(
"while merging object '%s'", obj->GetName());
3617 TProofServ::SetLastMsg(lastMsg);
3630 Int_t TProofPlayerRemote::Incorporate(TObject *newobj, TList *outlist, Bool_t &merged)
3635 Info("Incorporate", "enter: obj: %p (%s), list: %p",
3636 newobj, newobj ? newobj->ClassName() : "undef", outlist);
3639 if (!newobj || !outlist) {
3640 Error(
"Incorporate",
"Invalid inputs: obj: %p, list: %p", newobj, outlist);
3646 (!fProof || !fProof->TestBit(TProof::kIsClient) || fProof->IsLite()) ? kTRUE : kFALSE;
3647 if (specialH && newobj->InheritsFrom(TH1::Class())) {
3648 if (!HandleHistogram(newobj, merged)) {
3650 PDB(kOutput,1) Info("Incorporate", "histogram
object '%s' merged", newobj->GetName());
3652 PDB(kOutput,1) Info("Incorporate", "histogram
object '%s' added to the"
3653 " appropriate list for delayed merging", newobj->GetName());
3660 TObject *obj = outlist->FindObject(newobj->GetName());
3664 outlist->Add(newobj);
3671 TMethodCall callEnv;
3673 callEnv.InitWithPrototype(obj->IsA(),
"Merge",
"TCollection*");
3674 if (callEnv.IsValid()) {
3676 static TList *xlist =
new TList;
3679 callEnv.SetParam((Long_t) xlist);
3680 callEnv.Execute(obj);
3685 outlist->Add(newobj);
3696 TObject *TProofPlayerRemote::HandleHistogram(TObject *obj, Bool_t &merged)
3698 TH1 *h =
dynamic_cast<TH1 *
>(obj);
3709 Bool_t tobebinned = (h->GetBuffer()) ? kTRUE : kFALSE;
3712 Int_t nent = h->GetBufferLength();
3713 PDB(kOutput,2) Info("HandleHistogram", "h:%s ent:%d, buffer size: %d",
3714 h->GetName(), nent, h->GetBufferSize());
3718 if (!fOutputLists) {
3719 PDB(kOutput,2) Info("HandleHistogram", "create fOutputLists");
3720 fOutputLists = new TList;
3721 fOutputLists->SetOwner();
3723 list = (TList *) fOutputLists->FindObject(h->GetName());
3734 list->SetName(h->GetName());
3736 fOutputLists->Add(list);
3738 if (fOutput && (href = (TH1 *) fOutput->FindObject(h->GetName()))) {
3739 fOutput->Remove(href);
3744 while ((href = (TH1 *) nxh())) {
3745 if (href->GetBuffer() && href->GetBufferLength() < nent)
break;
3748 list->AddBefore(href, h);
3753 return (TObject *)0;
3759 while ((href = (TH1 *) nxh())) {
3760 if (href->GetBuffer() || href->GetEntries() < nent)
break;
3763 list->AddBefore(href, h);
3768 return (TObject *)0;
3773 TH1 *hout = (TH1*) fOutput->FindObject(h->GetName());
3776 fOutput->Remove(hout);
3780 Int_t hsz = h->GetNbinsX() * h->GetNbinsY() * h->GetNbinsZ();
3781 if (fMergeTH1OneByOne || (gProofServ && hsz > gProofServ->GetMsgSizeHWM())) {
3790 list->SetName(h->GetName());
3792 fOutputLists->Add(list);
3797 return (TObject *)0;
3802 return (TObject *)0;
3812 Bool_t TProofPlayerRemote::HistoSameAxis(TH1 *h0, TH1 *h1)
3815 if (!h0 || !h1)
return rc;
3817 TAxis *a0 = 0, *a1 = 0;
3820 a0 = h0->GetXaxis();
3821 a1 = h1->GetXaxis();
3822 if (a0->GetNbins() == a1->GetNbins())
3823 if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3824 if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3827 if (h0->GetDimension() > 1) {
3829 a0 = h0->GetYaxis();
3830 a1 = h1->GetYaxis();
3831 if (a0->GetNbins() == a1->GetNbins())
3832 if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3833 if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3837 if (h0->GetDimension() > 2) {
3839 a0 = h0->GetZaxis();
3840 a1 = h1->GetZaxis();
3841 if (a0->GetNbins() == a1->GetNbins())
3842 if (TMath::Abs(a0->GetXmax() - a1->GetXmax()) < 1.e-9)
3843 if (TMath::Abs(a0->GetXmin() - a1->GetXmin()) < 1.e-9) rc = kTRUE;
3853 void TProofPlayerRemote::StoreOutput(TList *out)
3855 PDB(kOutput,1) Info("StoreOutput","Enter");
3858 PDB(kOutput,1) Info("StoreOutput","Leave (empty)");
3863 out->SetOwner(kFALSE);
3865 if (fOutputLists == 0) {
3866 PDB(kOutput,2) Info("StoreOutput","Create fOutputLists");
3867 fOutputLists = new TList;
3868 fOutputLists->SetOwner();
3871 TList* lists = dynamic_cast<TList*> (out->FindObject("PROOF_EventListsList"));
3874 TEventList *mainList =
new TEventList(
"PROOF_EventList");
3878 while ( (aList = dynamic_cast<TEventList*> (it())) ) {
3880 TIter nxe(fDSet->GetListOfElements());
3882 while ( (elem = dynamic_cast<TDSetElement*> (nxe())) ) {
3883 if (strcmp(elem->GetFileName(), aList->GetName()) == 0)
3887 Error(
"StoreOutput",
"found the EventList for %s, but no object with that name "
3888 "in the TDSet", aList->GetName());
3891 Long64_t offset = elem->GetTDSetOffset();
3894 Long64_t *arr = aList->GetList();
3895 Int_t num = aList->GetN();
3897 for (
int i = 0; i < num; i++)
3900 mainList->Add(aList);
3906 while( (obj = next()) ) {
3907 PDB(kOutput,2) Info("StoreOutput","find list for '%s'", obj->GetName() );
3909 TList *list = (TList *) fOutputLists->FindObject( obj->GetName() );
3911 PDB(kOutput,2) Info("StoreOutput", "list for '%s' not found (creating)", obj->GetName());
3913 list->SetName( obj->GetName() );
3915 fOutputLists->Add( list );
3921 PDB(kOutput,1) Info("StoreOutput", "leave");
3927 TList *TProofPlayerRemote::MergeFeedback()
3930 Info("MergeFeedback","Enter");
3932 if ( fFeedbackLists == 0 ) {
3934 Info("MergeFeedback","Leave (no output)");
3938 TList *fb = new TList;
3941 TIter next(fFeedbackLists);
3944 while ( (map = (TMap*) next()) ) {
3947 Info("MergeFeedback", "map %s size: %d", map->GetName(), map->GetSize());
3951 TList *list = new TList;
3954 #ifndef R__TH1MERGEFIXED
3958 while ( TObject *key = keys() ) {
3959 TObject *o = map->GetValue(key);
3960 TH1 *h =
dynamic_cast<TH1 *
>(o);
3961 #ifndef R__TH1MERGEFIXED
3966 if (h && !strncmp(o->GetName(),
"PROOF_",6)) {
3967 if (h->GetNbinsX() > nbmx) {
3968 nbmx= h->GetNbinsX();
3976 while ((href = (TH1 *)nxh())) {
3977 if (h->GetBuffer()) {
3978 if (href->GetBuffer() && href->GetBufferLength() < h->GetBufferLength())
break;
3980 if (href->GetBuffer() || href->GetEntries() < h->GetEntries())
break;
3984 list->AddBefore(href, h);
3994 #ifdef R__TH1MERGEFIXED
3995 TObject *obj = list->First();
3997 TObject *obj = (oref) ? oref : list->First();
4003 if ( list->IsEmpty() ) {
4009 TMethodCall callEnv;
4011 callEnv.InitWithPrototype(obj->IsA(),
"Merge",
"TCollection*");
4012 if (callEnv.IsValid()) {
4013 callEnv.SetParam((Long_t) list);
4014 callEnv.Execute(obj);
4017 while ( (obj = list->First()) ) {
4018 fb->Add(obj->Clone());
4027 Info("MergeFeedback","Leave (%d
object(s))", fb->GetSize());
4035 void TProofPlayerRemote::StoreFeedback(TObject *slave, TList *out)
4038 Info("StoreFeedback","Enter");
4042 Info("StoreFeedback","Leave (empty)");
4053 if (fFeedbackLists == 0) {
4054 PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
4055 fFeedbackLists = new TList;
4056 fFeedbackLists->SetOwner();
4060 out->SetOwner(kFALSE);
4062 const
char *ord = ((TSlave*) slave)->GetOrdinal();
4065 while( (obj = next()) ) {
4067 Info("StoreFeedback","%s: Find '%s'", ord, obj->GetName() );
4068 TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
4071 Info("StoreFeedback", "%s: map for '%s' not found (creating)", ord, obj->GetName());
4074 map->SetName(obj->GetName());
4075 fFeedbackLists->Add(map);
4078 Info("StoreFeedback","%s: removing previous value", ord);
4079 if (map->GetValue(slave))
4080 delete map->GetValue(slave);
4083 map->Add(slave, obj);
4085 Info("StoreFeedback","%s: %s, size: %d", ord, obj->GetName(), map->GetSize());
4090 Info("StoreFeedback","Leave");
4096 void TProofPlayerRemote::SetupFeedback()
4098 if (IsClient())
return;
4100 fFeedback = (TList*) fInput->FindObject(
"FeedbackList");
4102 PDB(kFeedback,1) Info("SetupFeedback","\"FeedbackList\" %sfound",
4103 fFeedback == 0 ? "NOT ":"");
4105 if (fFeedback == 0 || fFeedback->GetSize() == 0) return;
4108 SafeDelete(fFeedbackTimer);
4109 fFeedbackPeriod = 2000;
4110 TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4111 fFeedbackTimer = new TTimer;
4112 fFeedbackTimer->SetObject(this);
4113 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
4119 void TProofPlayerRemote::StopFeedback()
4121 if (fFeedbackTimer == 0)
return;
4123 PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4125 SafeDelete(fFeedbackTimer);
4131 Bool_t TProofPlayerRemote::HandleTimer(TTimer *)
4133 PDB(kFeedback,2) Info("HandleTimer","Entry");
4135 if (fFeedbackTimer == 0) return kFALSE;
4139 TList *fb = new TList;
4142 TIter next(fFeedback);
4143 while( TObjString *name = (TObjString*) next() ) {
4144 TObject *o = fOutput->FindObject(name->GetName());
4146 fb->Add(o->Clone());
4149 if (fFeedbackLists &&
4150 (m = (TMap *) fFeedbackLists->FindObject(name->GetName()))) {
4151 fFeedbackLists->Remove(m);
4158 if (fb->GetSize() > 0) {
4159 StoreFeedback(
this, fb);
4164 if (fFeedbackLists == 0) {
4165 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
4169 fb = MergeFeedback();
4171 PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4173 TMessage m(kPROOF_FEEDBACK);
4177 gProofServ->GetSocket()->Send(m);
4181 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
4189 TDSetElement *TProofPlayerRemote::GetNextPacket(TSlave *slave, TMessage *r)
4195 Int_t bin = fProcPackets->GetXaxis()->FindBin(slave->GetOrdinal());
4197 if (fProcPackets->GetBinContent(bin) > 0)
4198 fProcPackets->Fill(slave->GetOrdinal(), -1);
4202 TDSetElement *e = fPacketizer->GetNextPacket( slave, r );
4206 Info("GetNextPacket","%s: done!", slave->GetOrdinal());
4207 } else if (e == (TDSetElement*) -1) {
4209 Info("GetNextPacket","%s: waiting ...", slave->GetOrdinal());
4212 Info("GetNextPacket","%s (%s): '%s' '%s' '%s' %lld %lld",
4213 slave->GetOrdinal(), slave->GetName(), e->GetFileName(),
4214 e->GetDirectory(), e->GetObjName(), e->GetFirst(), e->GetNum());
4215 if (fProcPackets) fProcPackets->Fill(slave->GetOrdinal(), 1);
4224 Bool_t TProofPlayerRemote::IsClient()
const
4226 return fProof ? fProof->TestBit(TProof::kIsClient) : kFALSE;
4233 Long64_t TProofPlayerRemote::DrawSelect(TDSet *set,
const char *varexp,
4234 const char *selection, Option_t *option,
4235 Long64_t nentries, Long64_t firstentry)
4237 if (!fgDrawInputPars) {
4238 fgDrawInputPars =
new THashList;
4239 fgDrawInputPars->Add(
new TObjString(
"FeedbackList"));
4240 fgDrawInputPars->Add(
new TObjString(
"PROOF_ChainWeight"));
4241 fgDrawInputPars->Add(
new TObjString(
"PROOF_LineColor"));
4242 fgDrawInputPars->Add(
new TObjString(
"PROOF_LineStyle"));
4243 fgDrawInputPars->Add(
new TObjString(
"PROOF_LineWidth"));
4244 fgDrawInputPars->Add(
new TObjString(
"PROOF_MarkerColor"));
4245 fgDrawInputPars->Add(
new TObjString(
"PROOF_MarkerStyle"));
4246 fgDrawInputPars->Add(
new TObjString(
"PROOF_MarkerSize"));
4247 fgDrawInputPars->Add(
new TObjString(
"PROOF_FillColor"));
4248 fgDrawInputPars->Add(
new TObjString(
"PROOF_FillStyle"));
4249 fgDrawInputPars->Add(
new TObjString(
"PROOF_ListOfAliases"));
4252 TString selector, objname;
4253 if (GetDrawArgs(varexp, selection, option, selector, objname) != 0) {
4254 Error(
"DrawSelect",
"parsing arguments");
4258 TNamed *varexpobj =
new TNamed(
"varexp", varexp);
4259 TNamed *selectionobj =
new TNamed(
"selection", selection);
4263 TList *savedInput =
new TList;
4265 while ((o = nxi())) {
4267 TString n(o->GetName());
4268 if (fgDrawInputPars &&
4269 !fgDrawInputPars->FindObject(o->GetName()) &&
4270 !n.BeginsWith(
"alias:")) fInput->Remove(o);
4273 fInput->Add(varexpobj);
4274 fInput->Add(selectionobj);
4277 if (objname ==
"") objname =
"htemp";
4279 fProof->AddFeedback(objname);
4280 Long64_t r = Process(set, selector, option, nentries, firstentry);
4281 fProof->RemoveFeedback(objname);
4283 fInput->Remove(varexpobj);
4284 fInput->Remove(selectionobj);
4285 if (TNamed *opt = dynamic_cast<TNamed*> (fInput->FindObject(
"PROOF_OPTIONS"))) {
4286 fInput->Remove(opt);
4291 delete selectionobj;
4295 TIter nxsi(savedInput);
4296 while ((o = nxsi()))
4298 savedInput->SetOwner(kFALSE);
4307 void TProofPlayerRemote::SetInitTime()
4310 fPacketizer->SetInitTime();
4316 ClassImp(TProofPlayerSlave);
4321 void TProofPlayerSlave::SetupFeedback()
4323 TList *fb = (TList*) fInput->FindObject(
"FeedbackList");
4326 Info("SetupFeedback","\"FeedbackList\" found: %d objects", fb->GetSize());
4329 Info("SetupFeedback","\"FeedbackList\" NOT found");
4332 if (fb == 0 || fb->GetSize() == 0) return;
4336 SafeDelete(fFeedbackTimer);
4337 fFeedbackPeriod = 2000;
4338 TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
4339 fFeedbackTimer = new TTimer;
4340 fFeedbackTimer->SetObject(this);
4341 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
4349 void TProofPlayerSlave::StopFeedback()
4351 if (fFeedbackTimer == 0)
return;
4353 PDB(kFeedback,1) Info("StopFeedback","Stop Timer");
4355 SafeDelete(fFeedbackTimer);
4361 Bool_t TProofPlayerSlave::HandleTimer(TTimer *)
4363 PDB(kFeedback,2) Info("HandleTimer","Entry");
4368 Bool_t sendm = kFALSE;
4369 TMessage m(kPROOF_PROGRESS);
4370 if (gProofServ->IsMaster() && !gProofServ->IsParallel()) {
4372 if (gProofServ->GetProtocol() > 25) {
4373 m << GetProgressStatus();
4374 }
else if (gProofServ->GetProtocol() > 11) {
4375 TProofProgressStatus *ps = GetProgressStatus();
4376 m << fTotalEvents << ps->GetEntries() << ps->GetBytesRead()
4377 << (Float_t) -1. << (Float_t) ps->GetProcTime()
4378 << (Float_t) ps->GetRate() << (Float_t) -1.;
4380 m << fTotalEvents << GetEventsProcessed();
4383 if (sendm) gProofServ->GetSocket()->Send(m);
4386 if (fFeedback == 0)
return kFALSE;
4388 TList *fb =
new TList;
4389 fb->SetOwner(kFALSE);
4392 fOutput = (THashList *) fSelector->GetOutputList();
4396 TIter next(fFeedback);
4397 while( TObjString *name = (TObjString*) next() ) {
4399 TObject *o = fOutput->FindObject(name->GetName());
4400 if (o != 0) fb->Add(o);
4404 PDB(kFeedback,2) Info("HandleTimer","Sending %d objects", fb->GetSize());
4406 TMessage m(kPROOF_FEEDBACK);
4410 if (gProofServ) gProofServ->GetSocket()->Send(m);
4414 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
4422 void TProofPlayerSlave::HandleGetTreeHeader(TMessage *mess)
4424 TMessage answ(kPROOF_GETTREEHEADER);
4429 TDSetElement *e = dset->Next();
4430 Long64_t entries = 0;
4434 PDB(kGlobal, 1) Info("HandleGetTreeHeader", "empty TDSet");
4436 f = TFile::Open(e->GetFileName());
4439 t = (TTree*) f->Get(e->GetObjName());
4441 t->SetMaxVirtualSize(0);
4443 entries = t->GetEntries();
4446 while ((e = dset->Next()) != 0) {
4447 TFile *f1 = TFile::Open(e->GetFileName());
4449 TTree *t1 = (TTree*) f1->Get(e->GetObjName());
4451 entries += t1->GetEntries();
4457 t->SetMaxEntryLoop(entries);
4462 answ << TString(
"Success") << t;
4464 answ << TString(
"Failed") << t;
4466 fSocket->Send(answ);
4475 ClassImp(TProofPlayerSuperMaster);
4482 Long64_t TProofPlayerSuperMaster::Process(TDSet *dset,
const char *selector_file,
4483 Option_t *option, Long64_t nentries,
4486 fProgressStatus->Reset();
4487 PDB(kGlobal,1) Info("Process","Enter");
4489 TProofSuperMaster *proof = dynamic_cast<TProofSuperMaster*>(GetProof());
4490 if (!proof) return -1;
4493 fOutput = new THashList;
4495 TPerfStats::Start(fInput, fOutput);
4497 if (!SendSelector(selector_file)) {
4498 Error(
"Process",
"sending selector %s", selector_file);
4502 TCleanup clean(
this);
4505 if (proof->IsMaster()) {
4508 if (!dset->ElementsValid()) {
4509 proof->ValidateDSet(dset);
4510 if (!dset->ElementsValid()) {
4511 Error(
"Process",
"could not validate TDSet");
4520 keyholder.SetOwner();
4522 valueholder.SetOwner();
4525 TIter nextslave(proof->GetListOfActiveSlaves());
4526 while (TSlave *sl = dynamic_cast<TSlave*>(nextslave())) {
4527 TList *submasters = 0;
4528 TPair *msd =
dynamic_cast<TPair*
>(msds.FindObject(sl->GetMsd()));
4530 submasters =
new TList;
4531 submasters->SetName(sl->GetMsd());
4532 keyholder.Add(submasters);
4533 TList *setelements =
new TSortedList(kSortDescending);
4534 setelements->SetName(TString(sl->GetMsd())+
"_Elements");
4535 valueholder.Add(setelements);
4536 msds.Add(
new TPair(submasters, setelements));
4538 submasters =
dynamic_cast<TList*
>(msd->Key());
4540 if (submasters) submasters->Add(sl);
4545 TIter nextelement(dset->GetListOfElements());
4546 while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextelement())) {
4548 if (elem->GetNum()<1)
continue;
4550 if (nentries !=-1 && cur>=first+nentries) {
4555 if (cur+elem->GetNum()-1<first) {
4557 cur+=elem->GetNum();
4563 elem->SetNum(elem->GetNum()-(first-cur));
4564 elem->SetFirst(elem->GetFirst()+first-cur);
4568 if (nentries==-1 || cur+elem->GetNum()<=first+nentries) {
4569 cur+=elem->GetNum();
4572 elem->SetNum(first+nentries-cur);
4576 TPair *msd =
dynamic_cast<TPair*
>(msds.FindObject(elem->GetMsd()));
4578 Error(
"Process",
"data requires mass storage domain '%s'"
4579 " which is not accessible in this proof session",
4583 TList *elements =
dynamic_cast<TList*
>(msd->Value());
4584 if (elements) elements->Add(elem);
4589 TIter nextmsd(msds.MakeIterator());
4590 while (TPair *msd = dynamic_cast<TPair*>(nextmsd())) {
4591 TList *submasters =
dynamic_cast<TList*
>(msd->Key());
4592 TList *setelements =
dynamic_cast<TList*
>(msd->Value());
4595 Int_t nmasters = submasters ? submasters->GetSize() : -1;
4596 Int_t nelements = setelements ? setelements->GetSize() : -1;
4597 for (Int_t i=0; i<nmasters; i++) {
4600 TDSet set(dset->GetType(), dset->GetObjName(),
4601 dset->GetDirectory());
4602 for (Int_t j = (i*nelements)/nmasters;
4603 j < ((i+1)*nelements)/nmasters;
4605 TDSetElement *elem = setelements ?
4606 dynamic_cast<TDSetElement*
>(setelements->At(j)) : (TDSetElement *)0;
4608 set.Add(elem->GetFileName(), elem->GetObjName(),
4609 elem->GetDirectory(), elem->GetFirst(),
4610 elem->GetNum(), elem->GetMsd());
4611 nent += elem->GetNum();
4613 Warning(
"Process",
"not a TDSetElement object");
4617 if (set.GetListOfElements()->GetSize()>0) {
4618 TMessage mesg(kPROOF_PROCESS);
4619 TString fn(gSystem->BaseName(selector_file));
4620 TString opt = option;
4621 mesg << &set << fn << fInput << opt << Long64_t(-1) << Long64_t(0);
4623 TSlave *sl =
dynamic_cast<TSlave*
>(submasters->At(i));
4625 PDB(kGlobal,1) Info("Process",
4626 "Sending TDSet with %d elements to submaster %s",
4627 set.GetListOfElements()->GetSize(),
4629 sl->GetSocket()->Send(mesg);
4630 usedmasters.Add(sl);
4633 fSlaves.AddLast(sl);
4634 fSlaveProgress.Set(fSlaveProgress.GetSize()+1);
4635 fSlaveProgress[fSlaveProgress.GetSize()-1] = 0;
4636 fSlaveTotals.Set(fSlaveTotals.GetSize()+1);
4637 fSlaveTotals[fSlaveTotals.GetSize()-1] = nent;
4638 fSlaveBytesRead.Set(fSlaveBytesRead.GetSize()+1);
4639 fSlaveBytesRead[fSlaveBytesRead.GetSize()-1] = 0;
4640 fSlaveInitTime.Set(fSlaveInitTime.GetSize()+1);
4641 fSlaveInitTime[fSlaveInitTime.GetSize()-1] = -1.;
4642 fSlaveProcTime.Set(fSlaveProcTime.GetSize()+1);
4643 fSlaveProcTime[fSlaveProcTime.GetSize()-1] = -1.;
4644 fSlaveEvtRti.Set(fSlaveEvtRti.GetSize()+1);
4645 fSlaveEvtRti[fSlaveEvtRti.GetSize()-1] = -1.;
4646 fSlaveMBRti.Set(fSlaveMBRti.GetSize()+1);
4647 fSlaveMBRti[fSlaveMBRti.GetSize()-1] = -1.;
4648 fSlaveActW.Set(fSlaveActW.GetSize()+1);
4649 fSlaveActW[fSlaveActW.GetSize()-1] = 0;
4650 fSlaveTotS.Set(fSlaveTotS.GetSize()+1);
4651 fSlaveTotS[fSlaveTotS.GetSize()-1] = 0;
4652 fSlaveEffS.Set(fSlaveEffS.GetSize()+1);
4653 fSlaveEffS[fSlaveEffS.GetSize()-1] = 0.;
4655 Warning(
"Process",
"not a TSlave object");
4661 if ( !IsClient() ) HandleTimer(0);
4662 PDB(kGlobal,1) Info("Process","Calling Collect");
4663 proof->Collect(&usedmasters);
4670 PDB(kGlobal,1) Info("Process","Calling Merge Output");
4681 void TProofPlayerSuperMaster::Progress(TSlave *sl, Long64_t total, Long64_t processed)
4683 Int_t idx = fSlaves.IndexOf(sl);
4684 fSlaveProgress[idx] = processed;
4685 if (fSlaveTotals[idx] != total)
4686 Warning(
"Progress",
"total events has changed for slave %s", sl->GetName());
4687 fSlaveTotals[idx] = total;
4691 for (i = 0; i < fSlaveTotals.GetSize(); i++) tot += fSlaveTotals[i];
4693 for (i = 0; i < fSlaveProgress.GetSize(); i++) proc += fSlaveProgress[i];
4695 Progress(tot, proc);
4701 void TProofPlayerSuperMaster::Progress(TSlave *sl, Long64_t total,
4702 Long64_t processed, Long64_t bytesread,
4703 Float_t initTime, Float_t procTime,
4704 Float_t evtrti, Float_t mbrti)
4707 Info("Progress","%s: %lld %lld %f %f %f %f", sl->GetName(),
4708 processed, bytesread, initTime, procTime, evtrti, mbrti);
4710 Int_t idx = fSlaves.IndexOf(sl);
4711 if (fSlaveTotals[idx] != total)
4712 Warning("Progress", "total events has changed for slave %s", sl->GetName());
4713 fSlaveTotals[idx] = total;
4714 fSlaveProgress[idx] = processed;
4715 fSlaveBytesRead[idx] = bytesread;
4716 fSlaveInitTime[idx] = (initTime > -1.) ? initTime : fSlaveInitTime[idx];
4717 fSlaveProcTime[idx] = (procTime > -1.) ? procTime : fSlaveProcTime[idx];
4718 fSlaveEvtRti[idx] = (evtrti > -1.) ? evtrti : fSlaveEvtRti[idx];
4719 fSlaveMBRti[idx] = (mbrti > -1.) ? mbrti : fSlaveMBRti[idx];
4726 Float_t ptime = -1.;
4731 for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4732 tot += fSlaveTotals[i];
4733 if (i < fSlaveProgress.GetSize())
4734 proc += fSlaveProgress[i];
4735 if (i < fSlaveBytesRead.GetSize())
4736 bytes += fSlaveBytesRead[i];
4737 if (i < fSlaveInitTime.GetSize())
4738 if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
4739 init = fSlaveInitTime[i];
4740 if (i < fSlaveProcTime.GetSize())
4741 if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
4742 ptime = fSlaveProcTime[i];
4743 if (i < fSlaveEvtRti.GetSize())
4744 if (fSlaveEvtRti[i] > -1.) {
4745 erti += fSlaveEvtRti[i];
4748 if (i < fSlaveMBRti.GetSize())
4749 if (fSlaveMBRti[i] > -1.) {
4750 srti += fSlaveMBRti[i];
4754 srti = (nsrti > 0) ? srti / nerti : 0.;
4756 Progress(tot, proc, bytes, init, ptime, erti, srti);
4762 void TProofPlayerSuperMaster::Progress(TSlave *wrk, TProofProgressInfo *pi)
4766 Info("Progress","%s: %lld %lld %lld %f %f %f %f %d %f", wrk->GetOrdinal(),
4767 pi->fTotal, pi->fProcessed, pi->fBytesRead,
4768 pi->fInitTime, pi->fProcTime, pi->fEvtRateI, pi->fMBRateI,
4769 pi->fActWorkers, pi->fEffSessions);
4771 Int_t idx = fSlaves.IndexOf(wrk);
4772 if (fSlaveTotals[idx] != pi->fTotal)
4773 Warning("Progress", "total events has changed for worker %s", wrk->GetName());
4774 fSlaveTotals[idx] = pi->fTotal;
4775 fSlaveProgress[idx] = pi->fProcessed;
4776 fSlaveBytesRead[idx] = pi->fBytesRead;
4777 fSlaveInitTime[idx] = (pi->fInitTime > -1.) ? pi->fInitTime : fSlaveInitTime[idx];
4778 fSlaveProcTime[idx] = (pi->fProcTime > -1.) ? pi->fProcTime : fSlaveProcTime[idx];
4779 fSlaveEvtRti[idx] = (pi->fEvtRateI > -1.) ? pi->fEvtRateI : fSlaveEvtRti[idx];
4780 fSlaveMBRti[idx] = (pi->fMBRateI > -1.) ? pi->fMBRateI : fSlaveMBRti[idx];
4781 fSlaveActW[idx] = (pi->fActWorkers > -1) ? pi->fActWorkers : fSlaveActW[idx];
4782 fSlaveTotS[idx] = (pi->fTotSessions > -1) ? pi->fTotSessions : fSlaveTotS[idx];
4783 fSlaveEffS[idx] = (pi->fEffSessions > -1.) ? pi->fEffSessions : fSlaveEffS[idx];
4788 TProofProgressInfo pisum(0, 0, 0, -1., -1., 0., 0., 0, 0, 0.);
4789 for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4790 pisum.fTotal += fSlaveTotals[i];
4791 if (i < fSlaveProgress.GetSize())
4792 pisum.fProcessed += fSlaveProgress[i];
4793 if (i < fSlaveBytesRead.GetSize())
4794 pisum.fBytesRead += fSlaveBytesRead[i];
4795 if (i < fSlaveInitTime.GetSize())
4796 if (fSlaveInitTime[i] > -1. && (pisum.fInitTime < 0. || fSlaveInitTime[i] < pisum.fInitTime))
4797 pisum.fInitTime = fSlaveInitTime[i];
4798 if (i < fSlaveProcTime.GetSize())
4799 if (fSlaveProcTime[i] > -1. && (pisum.fProcTime < 0. || fSlaveProcTime[i] > pisum.fProcTime))
4800 pisum.fProcTime = fSlaveProcTime[i];
4801 if (i < fSlaveEvtRti.GetSize())
4802 if (fSlaveEvtRti[i] > -1.) {
4803 pisum.fEvtRateI += fSlaveEvtRti[i];
4806 if (i < fSlaveMBRti.GetSize())
4807 if (fSlaveMBRti[i] > -1.) {
4808 pisum.fMBRateI += fSlaveMBRti[i];
4811 if (i < fSlaveActW.GetSize())
4812 pisum.fActWorkers += fSlaveActW[i];
4813 if (i < fSlaveTotS.GetSize())
4814 if (fSlaveTotS[i] > -1 && (pisum.fTotSessions < 0. || fSlaveTotS[i] > pisum.fTotSessions))
4815 pisum.fTotSessions = fSlaveTotS[i];
4816 if (i < fSlaveEffS.GetSize())
4817 if (fSlaveEffS[i] > -1. && (pisum.fEffSessions < 0. || fSlaveEffS[i] > pisum.fEffSessions))
4818 pisum.fEffSessions = fSlaveEffS[i];
4820 pisum.fMBRateI = (nsrti > 0) ? pisum.fMBRateI / nerti : 0.;
4829 Bool_t TProofPlayerSuperMaster::HandleTimer(TTimer *)
4831 if (fFeedbackTimer == 0)
return kFALSE;
4838 Float_t ptime = -1.;
4843 for (i = 0; i < fSlaveTotals.GetSize(); i++) {
4844 tot += fSlaveTotals[i];
4845 if (i < fSlaveProgress.GetSize())
4846 proc += fSlaveProgress[i];
4847 if (i < fSlaveBytesRead.GetSize())
4848 bytes += fSlaveBytesRead[i];
4849 if (i < fSlaveInitTime.GetSize())
4850 if (fSlaveInitTime[i] > -1. && (init < 0. || fSlaveInitTime[i] < init))
4851 init = fSlaveInitTime[i];
4852 if (i < fSlaveProcTime.GetSize())
4853 if (fSlaveProcTime[i] > -1. && (ptime < 0. || fSlaveProcTime[i] > ptime))
4854 ptime = fSlaveProcTime[i];
4855 if (i < fSlaveEvtRti.GetSize())
4856 if (fSlaveEvtRti[i] > -1.) {
4857 erti += fSlaveEvtRti[i];
4860 if (i < fSlaveMBRti.GetSize())
4861 if (fSlaveMBRti[i] > -1.) {
4862 srti += fSlaveMBRti[i];
4866 erti = (nerti > 0) ? erti / nerti : 0.;
4867 srti = (nsrti > 0) ? srti / nerti : 0.;
4869 TMessage m(kPROOF_PROGRESS);
4870 if (gProofServ->GetProtocol() > 25) {
4872 TProofProgressInfo pi(tot, proc, bytes, init, ptime,
4874 gProofServ->GetTotSessions(), gProofServ->GetEffSessions());
4878 m << tot << proc << bytes << init << ptime << erti << srti;
4882 gProofServ->GetSocket()->Send(m);
4884 if (fReturnFeedback)
4885 return TProofPlayerRemote::HandleTimer(0);
4893 void TProofPlayerSuperMaster::SetupFeedback()
4895 if (IsClient())
return;
4897 TProofPlayerRemote::SetupFeedback();
4899 if (fFeedbackTimer) {
4900 fReturnFeedback = kTRUE;
4903 fReturnFeedback = kFALSE;
4907 SafeDelete(fFeedbackTimer);
4908 fFeedbackPeriod = 2000;
4909 TProof::GetParameter(fInput,
"PROOF_FeedbackPeriod", fFeedbackPeriod);
4910 fFeedbackTimer =
new TTimer;
4911 fFeedbackTimer->SetObject(
this);
4912 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);