75 class TPacketizerAdaptive::TFileStat :
public TObject {
80 TDSetElement *fElement;
84 TFileStat(TFileNode *node, TDSetElement *elem, TList *file);
86 Bool_t IsDone()
const {
return fIsDone;}
87 Bool_t IsSortable()
const {
return kTRUE; }
88 void SetDone() {fIsDone = kTRUE;}
89 TFileNode *GetNode()
const {
return fNode;}
90 TDSetElement *GetElement()
const {
return fElement;}
91 Long64_t GetNextEntry()
const {
return fNextEntry;}
92 void MoveNextEntry(Long64_t step) {fNextEntry += step;}
95 Int_t Compare(
const TObject* obj)
const
99 const TFileStat *fst =
dynamic_cast<const TFileStat*
>(obj);
100 if (fst && GetElement() && fst->GetElement()) {
101 Long64_t ent = GetElement()->GetNum();
102 Long64_t entfst = fst->GetElement()->GetNum();
103 if (ent > 0 && entfst > 0) {
106 }
else if (ent < entfst) {
116 void Print(Option_t * = 0)
const
118 Printf(
"TFileStat: %s %lld", fElement ? fElement->GetName() :
"---",
119 fElement ? fElement->GetNum() : -1);
123 TPacketizerAdaptive::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem, TList *files)
124 : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
127 if (files) files->Add(
this);
133 class TPacketizerAdaptive::TFileNode :
public TObject {
138 TObject *fUnAllocFileNext;
140 TObject *fActFileNext;
152 TSortedList *fFilesToProcess;
155 TFileNode(
const char *name, Int_t strategy, TSortedList *files);
156 ~TFileNode() {
delete fFiles;
delete fActFiles; }
158 void IncMySlaveCnt() { fMySlaveCnt++; }
159 Int_t GetMySlaveCnt()
const {
return fMySlaveCnt; }
160 void IncExtSlaveCnt(
const char *slave) {
if (fNodeName != slave) fExtSlaveCnt++; }
161 void DecExtSlaveCnt(
const char *slave) {
if (fNodeName != slave) fExtSlaveCnt--; R__ASSERT(fExtSlaveCnt >= 0); }
162 Int_t GetSlaveCnt()
const {
return fMySlaveCnt + fExtSlaveCnt; }
163 void IncRunSlaveCnt() { fRunSlaveCnt++; }
164 void DecRunSlaveCnt() { fRunSlaveCnt--; R__ASSERT(fRunSlaveCnt >= 0); }
165 Int_t GetRunSlaveCnt()
const {
return fRunSlaveCnt; }
166 Int_t GetExtSlaveCnt()
const {
return fExtSlaveCnt; }
167 Int_t GetNumberOfActiveFiles()
const {
return fActFiles->GetSize(); }
168 Bool_t IsSortable()
const {
return kTRUE; }
169 Int_t GetNumberOfFiles() {
return fFiles->GetSize(); }
170 void IncProcessed(Long64_t nEvents)
171 { fProcessed += nEvents; }
172 Long64_t GetProcessed()
const {
return fProcessed; }
173 void DecreaseProcessed(Long64_t nEvents) { fProcessed -= nEvents; }
176 Long64_t GetEventsLeftPerSlave()
const
177 {
return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); }
178 void IncEvents(Long64_t nEvents) { fEvents += nEvents; }
179 const char *GetName()
const {
return fNodeName.Data(); }
180 Long64_t GetNEvents()
const {
return fEvents; }
182 void Print(Option_t * = 0)
const
187 Printf(
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
188 Printf(
"+++ TFileNode: %s +++", fNodeName.Data());
189 Printf(
"+++ Evts: %lld (total: %lld) ", fProcessed, fEvents);
190 Printf(
"+++ Worker count: int:%d, ext: %d, tot:%d ", fMySlaveCnt, fExtSlaveCnt, fRunSlaveCnt);
191 Printf(
"+++ Files: %d ", fFiles ? fFiles->GetSize() : 0);
192 if (fFiles && fFiles->GetSize() > 0) {
194 while ((fs = (TFileStat *) nxf())) {
195 if ((e = fs->GetElement())) {
196 Printf(
"+++ #%d: %s %lld - %lld (%lld) - next: %lld ", ++nn, e->GetName(),
197 e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
199 Printf(
"+++ #%d: no element! ", ++nn);
203 Printf(
"+++ Active files: %d ", fActFiles ? fActFiles->GetSize() : 0);
204 if (fActFiles && fActFiles->GetSize() > 0) {
205 TIter nxaf(fActFiles);
206 while ((fs = (TFileStat *) nxaf())) {
207 if ((e = fs->GetElement())) {
208 Printf(
"+++ #%d: %s %lld - %lld (%lld) - next: %lld", ++nn, e->GetName(),
209 e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
211 Printf(
"+++ #%d: no element! ", ++nn);
215 Printf(
"++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
218 void Add(TDSetElement *elem, Bool_t tolist)
220 TList *files = tolist ? (TList *)fFilesToProcess : (TList *)0;
221 TFileStat *f =
new TFileStat(
this, elem, files);
223 if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
226 TFileStat *GetNextUnAlloc()
228 TObject *next = fUnAllocFileNext;
232 fActFiles->Add(next);
233 if (fActFileNext == 0) fActFileNext = fActFiles->First();
236 fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
238 return (TFileStat *) next;
241 TFileStat *GetNextActive()
243 TObject *next = fActFileNext;
245 if (fActFileNext != 0) {
246 fActFileNext = fActFiles->After(fActFileNext);
247 if (fActFileNext == 0) fActFileNext = fActFiles->First();
250 return (TFileStat *) next;
253 void RemoveActive(TFileStat *file)
255 if (fActFileNext == file) fActFileNext = fActFiles->After(file);
256 fActFiles->Remove(file);
257 if (fFilesToProcess) fFilesToProcess->Remove(file);
258 if (fActFileNext == 0) fActFileNext = fActFiles->First();
261 Int_t Compare(
const TObject *other)
const
269 const TFileNode *obj =
dynamic_cast<const TFileNode*
>(other);
271 Error(
"Compare",
"input is not a TPacketizer::TFileNode object");
277 if (fStrategy == 1) {
279 Int_t myVal = GetRunSlaveCnt();
280 Int_t otherVal = obj->GetRunSlaveCnt();
281 if (myVal < otherVal) {
283 }
else if (myVal > otherVal) {
287 if ((fEvents - fProcessed) >
288 (obj->GetNEvents() - obj->GetProcessed())) {
295 Int_t myVal = GetSlaveCnt();
296 Int_t otherVal = obj->GetSlaveCnt();
297 if (myVal < otherVal) {
299 }
else if (myVal > otherVal) {
309 fUnAllocFileNext = fFiles->First();
319 TPacketizerAdaptive::TFileNode::TFileNode(
const char *name, Int_t strategy, TSortedList *files)
320 : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),
321 fActFiles(new TList), fActFileNext(0), fMySlaveCnt(0),
322 fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0),
323 fStrategy(strategy), fFilesToProcess(files)
328 fActFiles->SetOwner(kFALSE);
333 class TPacketizerAdaptive::TSlaveStat :
public TVirtualPacketizer::TVirtualSlaveStat {
335 friend class TPacketizerAdaptive;
338 TFileNode *fFileNode;
340 TDSetElement *fCurElem;
341 Long64_t fCurProcessed;
342 Float_t fCurProcTime;
346 TSlaveStat(TSlave *slave);
348 TFileNode *GetFileNode()
const {
return fFileNode; }
349 Long64_t GetEntriesProcessed()
const {
return fStatus?fStatus->GetEntries():-1; }
350 Double_t GetProcTime()
const {
return fStatus?fStatus->GetProcTime():-1; }
351 TFileStat *GetCurFile() {
return fCurFile; }
352 void SetFileNode(TFileNode *node) { fFileNode = node; }
353 void UpdateRates(TProofProgressStatus *st);
354 Float_t GetAvgRate() {
return fStatus->GetRate(); }
355 Float_t GetCurRate() {
356 return (fCurProcTime?fCurProcessed/fCurProcTime:0); }
357 Int_t GetLocalEventsLeft() {
358 return fFileNode?(fFileNode->GetEventsLeftPerSlave()):0; }
359 TList *GetProcessedSubSet() {
return fDSubSet; }
360 TProofProgressStatus *GetProgressStatus() {
return fStatus; }
361 TProofProgressStatus *AddProcessed(TProofProgressStatus *st = 0);
367 TPacketizerAdaptive::TSlaveStat::TSlaveStat(TSlave *slave)
368 : fFileNode(0), fCurFile(0), fCurElem(0),
369 fCurProcessed(0), fCurProcTime(0)
371 fDSubSet =
new TList();
372 fDSubSet->SetOwner();
374 fStatus =
new TProofProgressStatus();
377 fWrkFQDN = slave->GetName();
378 if (strcmp(slave->ClassName(),
"TSlaveLite")) {
379 fWrkFQDN = TUrl(fWrkFQDN).GetHostFQDN();
381 if (fWrkFQDN.Contains(
"localhost") || fWrkFQDN ==
"127.0.0.1")
382 fWrkFQDN = TUrl(gSystem->HostName()).GetHostFQDN();
385 Info("TSlaveStat", "wrk FQDN: %s", fWrkFQDN.Data());
391 TPacketizerAdaptive::TSlaveStat::~TSlaveStat()
393 SafeDelete(fDSubSet);
400 void TPacketizerAdaptive::TSlaveStat::UpdateRates(TProofProgressStatus *st)
403 Error(
"UpdateRates",
"no status object!");
406 if (fCurFile->IsDone()) {
410 fCurProcTime += st->GetProcTime() - GetProcTime();
411 fCurProcessed += st->GetEntries() - GetEntriesProcessed();
413 fCurFile->GetNode()->IncProcessed(st->GetEntries() - GetEntriesProcessed());
414 st->SetLastEntries(st->GetEntries() - fStatus->GetEntries());
424 TProofProgressStatus *TPacketizerAdaptive::TSlaveStat::AddProcessed(TProofProgressStatus *st)
426 if (st && fDSubSet && fCurElem) {
427 if (fCurElem->GetNum() != st->GetEntries() - GetEntriesProcessed())
428 fCurElem->SetNum(st->GetEntries() - GetEntriesProcessed());
429 fDSubSet->Add(fCurElem);
430 TProofProgressStatus *diff =
new TProofProgressStatus(*st - *fStatus);
433 Error(
"AddProcessed",
"processed subset of current elem undefined");
440 ClassImp(TPacketizerAdaptive);
445 TPacketizerAdaptive::TPacketizerAdaptive(TDSet *dset, TList *slaves,
446 Long64_t first, Long64_t num,
447 TList *input, TProofProgressStatus *st)
448 : TVirtualPacketizer(input, st)
450 PDB(kPacketizer,1) Info("TPacketizerAdaptive",
451 "enter (first %lld, num %lld)", first, num);
459 fCachePacketSync = kTRUE;
460 fMaxEntriesRatio = 2.;
463 fPacketAsAFraction = 4;
465 fFilesToProcess = new TSortedList;
466 fFilesToProcess->SetOwner(kFALSE);
468 if (!fProgressStatus) {
469 Error(
"TPacketizerAdaptive",
"No progress status");
475 if (TProof::GetParameter(input,
"PROOF_PacketizerCachePacketSync", cpsync) != 0) {
477 cpsync = gEnv->GetValue(
"Packetizer.CachePacketSync", 1);
479 if (cpsync >= 0) fCachePacketSync = (cpsync > 0) ? kTRUE : kFALSE;
483 if (TProof::GetParameter(input,
"PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio) != 0) {
485 fMaxEntriesRatio = gEnv->GetValue(
"Packetizer.MaxEntriesRatio", 2.);
491 if (TProof::GetParameter(input,
"PROOF_PacketizerStrategy", strategy) != 0) {
493 strategy = gEnv->GetValue(
"Packetizer.Strategy", 1);
497 Info(
"TPacketizerAdaptive",
"using the basic strategy of TPacketizer");
498 }
else if (strategy != 1) {
499 Warning(
"TPacketizerAdaptive",
"unsupported strategy index (%d): ignore", strategy);
502 Long_t maxSlaveCnt = 0;
503 if (TProof::GetParameter(input,
"PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
504 if (maxSlaveCnt < 0) {
505 Info(
"TPacketizerAdaptive",
506 "The value of PROOF_MaxSlavesPerNode must be positive");
512 if (TProof::GetParameter(input,
"PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
514 Info(
"TPacketizerAdaptive",
515 "The value of PROOF_MaxSlavesPerNode must be positive");
518 maxSlaveCnt = (Long_t) mxslcnt;
523 maxSlaveCnt = gEnv->GetValue(
"Packetizer.MaxWorkersPerNode", 0);
524 if (maxSlaveCnt > 0) {
525 fMaxSlaveCnt = maxSlaveCnt;
526 Info(
"TPacketizerAdaptive",
"Setting max number of workers per node to %ld",
535 fForceLocal = kFALSE;
536 Int_t forceLocal = 0;
537 if (TProof::GetParameter(input,
"PROOF_ForceLocal", forceLocal) == 0) {
541 Info(
"TPacketizerAdaptive",
542 "The only accepted value of PROOF_ForceLocal parameter is 1 !");
551 Int_t packetAsAFraction = 0;
552 if (TProof::GetParameter(input,
"PROOF_PacketAsAFraction", packetAsAFraction) == 0) {
553 if (packetAsAFraction > 0) {
554 fPacketAsAFraction = packetAsAFraction;
555 Info(
"TPacketizerAdaptive",
556 "using alternate fraction of query time as a packet size: %d",
559 Info(
"TPacketizerAdaptive",
"packetAsAFraction parameter must be higher than 0");
564 Int_t tryReassign = 0;
565 if (TProof::GetParameter(input,
"PROOF_TryReassign", tryReassign) != 0)
566 tryReassign = gEnv->GetValue(
"Packetizer.TryReassign", 0);
567 fTryReassign = tryReassign;
568 if (fTryReassign != 0)
569 Info(
"TPacketizerAdaptive",
"failed packets will be re-assigned");
573 fConfigParams->Add(
new TParameter<Int_t>(
"PROOF_PacketizerCachePacketSync", (Int_t)fCachePacketSync));
574 fConfigParams->Add(
new TParameter<Double_t>(
"PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio));
575 fConfigParams->Add(
new TParameter<Int_t>(
"PROOF_PacketizerStrategy", fStrategy));
576 fConfigParams->Add(
new TParameter<Int_t>(
"PROOF_MaxWorkersPerNode", (Int_t)fMaxSlaveCnt));
577 fConfigParams->Add(
new TParameter<Int_t>(
"PROOF_ForceLocal", (Int_t)fForceLocal));
578 fConfigParams->Add(
new TParameter<Int_t>(
"PROOF_PacketAsAFraction", fPacketAsAFraction));
580 Double_t baseLocalPreference = 1.2;
581 fBaseLocalPreference = (Float_t)baseLocalPreference;
582 if (TProof::GetParameter(input,
"PROOF_BaseLocalPreference", baseLocalPreference) == 0)
583 fBaseLocalPreference = (Float_t)baseLocalPreference;
585 fFileNodes =
new TList;
586 fFileNodes->SetOwner();
587 fUnAllocated =
new TList;
588 fUnAllocated->SetOwner(kFALSE);
590 fActive->SetOwner(kFALSE);
598 TObjArray *partitions = 0;
599 TString partitionsStr;
600 if (TProof::GetParameter(input,
"PROOF_PacketizerPartitions", partitionsStr) != 0)
601 partitionsStr = gEnv->GetValue(
"Packetizer.Partitions",
"");
602 if (!partitionsStr.IsNull()) {
603 Info(
"TPacketizerAdaptive",
"Partitions: %s", partitionsStr.Data());
604 partitions = partitionsStr.Tokenize(
",");
610 while ((e = (TDSetElement*)dset->Next())) {
612 if (e->GetValid())
continue;
615 if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
616 fDataSet = e->GetDataSet();
618 TUrl url = e->GetFileName();
620 Info("TPacketizerAdaptive", "element name: %s (url: %s)", e->GetFileName(), url.GetUrl());
624 if ( !url.IsValid() ||
625 (strncmp(url.GetProtocol(),"root", 4) &&
626 strncmp(url.GetProtocol(),"file", 4)) ) {
628 }
else if ( url.IsValid() && !strncmp(url.GetProtocol(),
"file", 4)) {
630 url.SetProtocol(
"root");
632 host = url.GetHostFQDN();
635 if (host.Contains(
"localhost") || host ==
"127.0.0.1") {
636 url.SetHost(gSystem->HostName());
637 host = url.GetHostFQDN();
643 TIter iString(partitions);
645 while ((os = (TObjString *)iString())) {
647 if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
648 disk = os->GetName();
656 nodeStr.Form(
"%s://%s", url.GetProtocol(), host.Data());
658 nodeStr.Form(
"%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
659 TFileNode *node = (TFileNode *) fFileNodes->FindObject(nodeStr);
662 node =
new TFileNode(nodeStr, fStrategy, fFilesToProcess);
663 fFileNodes->Add(node);
665 Info("TPacketizerAdaptive", "creating new node '%s' or the element", nodeStr.Data());
668 Info("TPacketizerAdaptive", "adding element to existing node '%s'", nodeStr.Data());
671 node->Add(e, kFALSE);
674 fSlaveStats = new TMap;
675 fSlaveStats->SetOwner(kFALSE);
679 while ((slave = (TSlave*) si.Next())) {
680 fSlaveStats->Add( slave,
new TSlaveStat(slave) );
681 fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ?
682 slave->GetPerfIdx() : fMaxPerfIdx;
688 Int_t validateMode = 0;
689 Int_t gprc = TProof::GetParameter(input,
"PROOF_ValidateByFile", validateMode);
690 Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
693 Info("TPacketizerAdaptive",
694 "processing subset of entries: validating by file? %s", byfile ? "yes": "no");
695 ValidateFiles(dset, slaves, num, byfile);
705 fUnAllocated->Clear();
709 Info("TPacketizerAdaptive",
710 "processing range: first %lld, num %lld", first, num);
714 while (( e = (TDSetElement*)dset->Next())) {
718 if (!e->GetValid())
continue;
720 TUrl url = e->GetFileName();
721 Long64_t eFirst = e->GetFirst();
722 Long64_t eNum = e->GetNum();
724 Info("TPacketizerAdaptive", "processing element '%s'", e->GetFileName());
726 Info("TPacketizerAdaptive",
727 " --> first %lld, elenum %lld (cur %lld) (entrylist: %p)", eFirst, eNum, cur, e->GetEntryList());
729 if (!e->GetEntryList()) {
731 if (cur + eNum < first) {
734 Info("TPacketizerAdaptive", " --> skip element cur %lld", cur);
739 if (num != -1 && (first+num <= cur)) {
742 Info("TPacketizerAdaptive", " --> drop element cur %lld", cur);
746 Bool_t inRange = kFALSE;
747 if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
752 e->SetFirst( eFirst + (first - cur) );
753 e->SetNum( e->GetNum() - (first - cur) );
755 Info("TPacketizerAdaptive", " --> adjust start %lld and end %lld",
756 eFirst + (first - cur), first + num - cur);
759 if (num != -1 && (first+num <= cur+eNum)) {
762 e->SetNum( first + num - e->GetFirst() - cur );
764 Info("TPacketizerAdaptive", " --> adjust end %lld", first + num - cur);
771 Info("TPacketizerAdaptive", " --> increment 'cur' by %lld", eNum);
781 TEntryList *enl =
dynamic_cast<TEntryList *
>(e->GetEntryList());
785 Info("TPacketizerAdaptive", " --> entry-list element: %lld entries", eNum);
787 TEventList *evl =
dynamic_cast<TEventList *
>(e->GetEntryList());
788 eNum = evl ? evl->GetN() : eNum;
790 Info("TPacketizerAdaptive", " --> event-list element: %lld entries (evl:%p)", eNum, evl);
794 Info("TPacketizerAdaptive", " --> empty entry- or event-list element!");
799 Info("TPacketizerAdaptive", " --> next cur %lld", cur);
803 if ( !url.IsValid() ||
804 (strncmp(url.GetProtocol(),"root", 4) &&
805 strncmp(url.GetProtocol(),"file", 4)) ) {
807 }
else if ( url.IsValid() && !strncmp(url.GetProtocol(),
"file", 4)) {
809 url.SetProtocol(
"root");
811 host = url.GetHostFQDN();
814 if (host.Contains(
"localhost") || host ==
"127.0.0.1") {
815 url.SetHost(gSystem->HostName());
816 host = url.GetHostFQDN();
822 TIter iString(partitions);
824 while ((os = (TObjString *)iString())) {
826 if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
827 disk = os->GetName();
835 nodeStr.Form(
"%s://%s", url.GetProtocol(), host.Data());
837 nodeStr.Form(
"%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
838 TFileNode *node = (TFileNode*) fFileNodes->FindObject(nodeStr);
842 node =
new TFileNode(nodeStr, fStrategy, fFilesToProcess);
843 fFileNodes->Add( node );
845 Info("TPacketizerAdaptive", " --> creating new node '%s' for element", nodeStr.Data());
848 Info("TPacketizerAdaptive", " --> adding element to exiting node '%s'", nodeStr.Data());
852 fTotalEntries += eNum;
854 node->IncEvents(eNum);
855 PDB(kPacketizer,2) e->Print("a");
858 Info("TPacketizerAdaptive", "processing %lld entries in %d files on %d hosts",
859 fTotalEntries, files, fFileNodes->GetSize());
863 gPerfStats->SetNumEvents(fTotalEntries);
870 SafeDelete(fProgress);
872 PDB(kPacketizer,1) Info("TPacketizerAdaptive", "return");
878 TPacketizerAdaptive::~TPacketizerAdaptive()
881 fSlaveStats->DeleteValues();
884 SafeDelete(fSlaveStats);
885 SafeDelete(fUnAllocated);
887 SafeDelete(fFileNodes);
888 SafeDelete(fFilesToProcess);
895 void TPacketizerAdaptive::InitStats()
899 Int_t noRemoteFiles = 0;
900 fNEventsOnRemLoc = 0;
901 Int_t totalNumberOfFiles = 0;
902 TIter next(fFileNodes);
903 while (TFileNode *fn = (TFileNode*)next()) {
904 totalNumberOfFiles += fn->GetNumberOfFiles();
905 if (fn->GetMySlaveCnt() == 0) {
906 noRemoteFiles += fn->GetNumberOfFiles();
907 fNEventsOnRemLoc += (fn->GetNEvents() - fn->GetProcessed());
911 if (totalNumberOfFiles == 0) {
912 Info(
"InitStats",
"no valid or non-empty file found: setting invalid");
918 fFractionOfRemoteFiles = (1.0 * noRemoteFiles) / totalNumberOfFiles;
920 "fraction of remote files %f", fFractionOfRemoteFiles);
923 SafeDelete(fProgress);
925 PDB(kPacketizer,1) Info("InitStats", "return");
933 TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextUnAlloc(TFileNode *node, const
char *nodeHostName)
939 Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
940 file = node->GetNextUnAlloc();
941 if (file == 0) RemoveUnAllocNode(node);
943 if (nodeHostName && strlen(nodeHostName) > 0) {
947 fUnAllocated->Sort();
948 PDB(kPacketizer,2) fUnAllocated->Print();
951 for (
int i = 0; i < fUnAllocated->GetSize(); i++) {
953 if ((fn = (TFileNode *) fUnAllocated->At(i))) {
954 TUrl uu(fn->GetName());
956 Info("GetNextUnAlloc", "comparing %s with %s...", nodeHostName, uu.GetHost());
959 if (!strcmp(nodeHostName, uu.GetHost())) {
963 if ((file = node->GetNextUnAlloc()) == 0) {
964 RemoveUnAllocNode(node);
968 Info("GetNextUnAlloc", "found! (host: %s)", uu.GetHost());
973 Warning(
"GetNextUnAlloc",
"unallocate entry %d is empty!", i);
977 if (node != 0 && fMaxSlaveCnt > 0 && node->GetExtSlaveCnt() >= fMaxSlaveCnt) {
980 Info("GetNextUnAlloc", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
986 while (file == 0 && ((node = NextNode()) != 0)) {
988 Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
989 if ((file = node->GetNextUnAlloc()) == 0) RemoveUnAllocNode(node);
996 if (fActive->FindObject(node) == 0) {
1001 PDB(kPacketizer, 2) {
1003 Info(
"GetNextUnAlloc",
"no file found!");
1016 TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextNode()
1018 fUnAllocated->Sort();
1019 PDB(kPacketizer,2) {
1020 fUnAllocated->Print();
1023 TFileNode *fn = (TFileNode*) fUnAllocated->First();
1024 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
1027 Info("NextNode", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
1037 void TPacketizerAdaptive::RemoveUnAllocNode(TFileNode * node)
1039 fUnAllocated->Remove(node);
1045 TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextActive()
1048 TFileStat *file = 0;
1050 while (file == 0 && ((node = NextActiveNode()) != 0)) {
1051 file = node->GetNextActive();
1052 if (file == 0) RemoveActiveNode(node);
1062 TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextActiveNode()
1065 PDB(kPacketizer,2) {
1066 Info(
"NextActiveNode",
"enter");
1070 TFileNode *fn = (TFileNode*) fActive->First();
1072 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
1074 Info("NextActiveNode","reached Workers-per-Node limit (%ld)", fMaxSlaveCnt);
1084 void TPacketizerAdaptive::RemoveActive(TFileStat *file)
1086 TFileNode *node = file->GetNode();
1088 node->RemoveActive(file);
1089 if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
1095 void TPacketizerAdaptive::RemoveActiveNode(TFileNode *node)
1097 fActive->Remove(node);
1103 void TPacketizerAdaptive::Reset()
1105 fUnAllocated->Clear();
1106 fUnAllocated->AddAll(fFileNodes);
1110 TIter files(fFileNodes);
1112 while ((fn = (TFileNode*) files.Next()) != 0) {
1116 TIter slaves(fSlaveStats);
1118 while ((key = slaves.Next()) != 0) {
1119 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
1121 Warning(
"Reset",
"TSlaveStat associated to key '%s' is NULL", key->GetName());
1126 TFileNode *fnmin = 0;
1127 Int_t fncnt = fSlaveStats->GetSize();
1129 while ((fn = (TFileNode*) files.Next()) != 0) {
1130 if (!strcmp(slstat->GetName(), TUrl(fn->GetName()).GetHost())) {
1131 if (fn->GetMySlaveCnt() < fncnt) {
1133 fncnt = fn->GetMySlaveCnt();
1138 slstat->SetFileNode(fnmin);
1139 fnmin->IncMySlaveCnt();
1141 Info("Reset","assigning node '%s' to '%s' (cnt: %d)",
1142 fnmin->GetName(), slstat->GetName(), fnmin->GetMySlaveCnt());
1144 slstat->fCurFile = 0;
1152 void TPacketizerAdaptive::ValidateFiles(TDSet *dset, TList *slaves,
1153 Long64_t maxent, Bool_t byfile)
1155 TMap slaves_by_sock;
1162 workers.AddAll(slaves);
1165 while ((slm = (TSlave*)si.Next()) != 0) {
1167 Info("ValidateFiles","socket added to monitor: %p (%s)",
1168 slm->GetSocket(), slm->GetName());
1169 mon.Add(slm->GetSocket());
1170 slaves_by_sock.Add(slm->GetSocket(), slm);
1173 mon.DeActivateAll();
1175 ((TProof*)gProof)->DeActivateAsyncInput();
1178 ((TProof*)gProof)->fCurrentMonitor = &mon;
1181 if (!strcmp(dset->GetType(), "TTree")) SetBit(TVirtualPacketizer::kIsTree);
1184 TString msg("Validating files");
1186 UInt_t tot = dset->GetListOfElements()->GetSize();
1189 Long64_t totent = 0, nopenf = 0;
1193 while (TSlave *s = (TSlave *)workers.First()) {
1199 TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
1201 Error(
"ValidateFiles",
"TSlaveStat associated to slave '%s' is NULL", s->GetName());
1205 TFileNode *node = 0;
1206 TFileStat *file = 0;
1209 if ((node = slstat->GetFileNode()) != 0) {
1210 PDB(kPacketizer,3) node->Print();
1211 file = GetNextUnAlloc(node);
1213 slstat->SetFileNode(0);
1218 file = GetNextUnAlloc();
1224 slstat->fCurFile = file;
1225 TDSetElement *elem = file->GetElement();
1226 Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
1227 if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
1229 file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1230 TMessage m(kPROOF_GETENTRIES);
1232 << TString(elem->GetFileName())
1233 << TString(elem->GetDirectory())
1234 << TString(elem->GetObjName());
1236 s->GetSocket()->Send( m );
1237 mon.Activate(s->GetSocket());
1239 Info("ValidateFiles",
1240 "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
1241 s->GetOrdinal(), s->GetName(), s->GetSocket(),
1242 dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
1243 elem->GetDirectory(), elem->GetObjName());
1246 elem->SetTDSetOffset(entries);
1250 if (!elem->GetEntryList()) {
1251 if (elem->GetFirst() > entries) {
1252 Error(
"ValidateFiles",
1253 "first (%lld) higher then number of entries (%lld) in %s",
1254 elem->GetFirst(), entries, elem->GetFileName());
1256 slstat->fCurFile->SetDone();
1258 dset->SetBit(TDSet::kSomeInvalid);
1260 if (elem->GetNum() == -1) {
1261 elem->SetNum(entries - elem->GetFirst());
1262 }
else if (elem->GetFirst() + elem->GetNum() > entries) {
1263 Warning(
"ValidateFiles",
"num (%lld) + first (%lld) larger then number of"
1264 " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
1265 entries, elem->GetFileName());
1266 elem->SetNum(entries - elem->GetFirst());
1269 Info("ValidateFiles",
1270 "found elem '%s' with %lld entries", elem->GetFileName(), entries);
1278 gProof->SendDataSetStatus(msg, n, tot, st);
1287 if (mon.GetActive() == 0) {
1288 if (byfile && maxent > 0) {
1290 Long64_t nrestf = (maxent - totent) * nopenf / totent ;
1291 if (nrestf <= 0 && maxent > totent) nrestf = 1;
1294 Info("ValidateFiles", "{%lld, %lld, %lld}: needs to validate %lld more files
",
1295 maxent, totent, nopenf, nrestf);
1297 while ((slm = (TSlave *) si.Next()) && nrestf--) {
1303 Info("ValidateFiles
", "no need to validate more files
");
1311 PDB(kPacketizer,3) {
1312 Info("ValidateFiles
", "waiting
for %d slaves:
", mon.GetActive());
1313 TList *act = mon.GetListOfActives();
1315 while (TSocket *s = (TSocket*) next()) {
1316 TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
1318 Info("ValidateFiles
", " worker-%s (%s)
",
1319 sl->GetOrdinal(), sl->GetName());
1324 TSocket *sock = mon.Select();
1325 // If we have been interrupted break
1327 Error("ValidateFiles
", "selection has been interrupted - STOP
");
1328 mon.DeActivateAll();
1332 mon.DeActivate(sock);
1334 PDB(kPacketizer,3) Info("ValidateFiles
", "select returned: %p
", sock);
1336 TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
1337 if (!sock->IsValid()) {
1338 // A socket got invalid during validation
1339 Error("ValidateFiles
", "worker-%s (%s) got invalid - STOP",
1340 slave->GetOrdinal(), slave->GetName());
1341 ((TProof*)gProof)->MarkBad(slave, "socket got invalid during validation");
1348 if (sock->Recv(reply) <= 0) {
1350 Error(
"ValidateFiles",
"Recv failed! for worker-%s (%s)",
1351 slave->GetOrdinal(), slave->GetName());
1353 ((TProof*)gProof)->MarkBad(slave,
"receive failed during validation");
1358 if (reply->What() != kPROOF_GETENTRIES) {
1360 Int_t what = reply->What();
1361 ((TProof*)gProof)->HandleInputMessage(slave, reply);
1362 if (what == kPROOF_FATAL) {
1363 Error(
"ValidateFiles",
"kPROOF_FATAL from worker-%s (%s)",
1364 slave->GetOrdinal(), slave->GetName());
1374 TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1375 TDSetElement *e = slavestat->fCurFile->GetElement();
1376 slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName());
1379 (*reply) >> entries;
1382 if ((reply->BufferSize() > reply->Length())) {
1384 (*reply) >> objname;
1385 e->SetTitle(objname);
1388 e->SetTDSetOffset(entries);
1394 if (!e->GetEntryList()) {
1395 if (e->GetFirst() > entries) {
1396 Error(
"ValidateFiles",
1397 "first (%lld) higher then number of entries (%lld) in %s",
1398 e->GetFirst(), entries, e->GetFileName());
1401 slavestat->fCurFile->SetDone();
1403 dset->SetBit(TDSet::kSomeInvalid);
1406 if (e->GetNum() == -1) {
1407 e->SetNum(entries - e->GetFirst());
1408 }
else if (e->GetFirst() + e->GetNum() > entries) {
1409 Error(
"ValidateFiles",
1410 "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1411 e->GetNum(), e->GetFirst(), entries, e->GetFileName());
1412 e->SetNum(entries - e->GetFirst());
1422 gProof->SendDataSetStatus(msg, n, tot, st);
1426 Error(
"ValidateFiles",
"cannot get entries for file: %s - skipping", e->GetFileName() );
1432 TMessage m(kPROOF_MESSAGE);
1433 m << TString(Form(
"Cannot get entries for file: %s - skipping",
1435 gProofServ->GetSocket()->Send(m);
1440 dset->SetBit(TDSet::kSomeInvalid);
1442 PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
1445 if (maxent < 0 || ((totent < maxent) && !byfile))
1451 ((TProof*)gProof)->ActivateAsyncInput();
1454 ((TProof*)gProof)->fCurrentMonitor = 0;
1461 Long64_t offset = 0;
1462 Long64_t newOffset = 0;
1463 TIter next(dset->GetListOfElements());
1465 while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
1466 if (el->GetValid()) {
1467 newOffset = offset + el->GetTDSetOffset();
1468 el->SetTDSetOffset(offset);
1477 Int_t TPacketizerAdaptive::CalculatePacketSize(TObject *slStatPtr, Long64_t cachesz, Int_t learnent)
1480 if (fStrategy == 0) {
1483 Int_t nslaves = fSlaveStats->GetSize();
1485 num = fTotalEntries / (fPacketAsAFraction * nslaves);
1493 TSlaveStat* slstat = (TSlaveStat*)slStatPtr;
1494 Float_t rate = slstat->GetCurRate();
1496 rate = slstat->GetAvgRate();
1500 Float_t avgProcRate = (GetEntriesProcessed()/(GetCumProcTime() / fSlaveStats->GetSize()));
1501 Float_t packetTime = ((fTotalEntries - GetEntriesProcessed())/avgProcRate)/fPacketAsAFraction;
1504 Float_t bevt = (GetEntriesProcessed() > 0) ? GetBytesRead() / GetEntriesProcessed() : -1.;
1510 Bool_t cpsync = fCachePacketSync;
1511 if (fMaxEntriesRatio > 0. && cpsync) {
1512 if (fFilesToProcess && fFilesToProcess->GetSize() <= fSlaveStats->GetSize()) {
1513 Long64_t remEntries = fTotalEntries - GetEntriesProcessed();
1514 Long64_t maxEntries = -1;
1515 if (fFilesToProcess->Last()) {
1516 TDSetElement *elem = (TDSetElement *) ((TPacketizerAdaptive::TFileStat *) fFilesToProcess->Last())->GetElement();
1517 if (elem) maxEntries = elem->GetNum();
1519 if (maxEntries > remEntries / fSlaveStats->GetSize() * fMaxEntriesRatio) {
1520 PDB(kPacketizer,3) {
1521 Info(
"CalculatePacketSize",
"%s: switching off synchronization of packet and cache sizes:", slstat->GetOrdinal());
1522 Info(
"CalculatePacketSize",
"%s: few files (%d) remaining of very different sizes (max/avg = %.2f > %.2f)",
1523 slstat->GetOrdinal(), fFilesToProcess->GetSize(),
1524 (Double_t)maxEntries / remEntries * fSlaveStats->GetSize(), fMaxEntriesRatio);
1530 if (bevt > 0. && cachesz > 0 && cpsync) {
1531 if ((Long64_t)(rate * packetTime * bevt) < cachesz)
1532 packetTime = cachesz / bevt / rate;
1536 if (fMaxPacketTime > 0. && packetTime > fMaxPacketTime) packetTime = fMaxPacketTime;
1537 if (fMinPacketTime > 0. && packetTime < fMinPacketTime) packetTime = fMinPacketTime;
1540 num = (Long64_t)(rate * packetTime);
1544 Info("CalculatePacketSize","%s: avgr: %f, rate: %f, left: %lld, pacT: %f, sz: %f (csz: %f), num: %lld",
1545 slstat->GetOrdinal(), avgProcRate, rate, fTotalEntries - GetEntriesProcessed(),
1546 packetTime, ((bevt > 0) ? num*bevt/1048576. : -1.), cachesz/1048576., num);
1551 num = (learnent > 0) ? 5 * learnent : 1000;
1555 Info("CalculatePacketSize","%s: num: %lld", slstat->GetOrdinal(), num);
1558 if (num < 1) num = 1;
// AddProcessed: account the work reported by worker `sl` for its current
// packet (slstat->fCurElem) and return the number of entries of that packet
// that were NOT processed (expectedNumEv - numev).
// If the worker processed fewer entries than the packet held, the unprocessed
// tail is cloned into a new TDSetElement and handed to ReassignPacket(); the
// clone is deleted when reassignment fails (-1).
// NOTE(review): the difference of two Long64_t values is returned through an
// Int_t, so very large deficits would be truncated — confirm this is acceptable.
1567 Int_t TPacketizerAdaptive::AddProcessed(TSlave *sl,
1568 TProofProgressStatus *status,
1570 TList **listOfMissingFiles)
1573 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
// Error path for a worker without a TSlaveStat entry (presumably guarded by a
// null check on slstat).
1575 Error(
"AddProcessed",
"%s: TSlaveStat instance for worker %s not found!",
1576 (sl ? sl->GetOrdinal() :
"x.x"),
1577 (sl ? sl->GetName() :
"**undef**"));
// Only a worker holding a packet has something to account for.
1583 if ( slstat->fCurElem != 0 ) {
1584 Long64_t expectedNumEv = slstat->fCurElem->GetNum();
// Entries done in this packet = worker's cumulative total reported in
// `status` minus what was already credited to this worker.
1587 if (status && status->GetEntries() > 0)
1588 numev = status->GetEntries() - slstat->GetEntriesProcessed();
1593 TProofProgressStatus *progress = 0;
// Credit the worker and fold its increment into the global progress status.
1596 progress = slstat->AddProcessed(status);
1598 (*fProgressStatus) += *progress;
1600 slstat->UpdateRates(status);
// NOTE(review): a fresh status object is heap-allocated here; confirm that
// `progress` is released on every path after the logging below.
1603 progress =
new TProofProgressStatus();
1607 Info("AddProcessed", "%s: %s: %lld %7.3lf %7.3lf %7.3lf %lld",
1608 sl->GetOrdinal(), sl->GetName(), progress->GetEntries(), latency,
1609 progress->GetProcTime(), progress->GetCPUTime(), progress->GetBytesRead());
1612 gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(),
1613 slstat->fCurElem->GetFileName(),
1614 progress->GetEntries(),
1616 progress->GetProcTime(),
1617 progress->GetCPUTime(),
1618 progress->GetBytesRead());
// The worker did not process exactly the packet it was given.
1621 if (numev != expectedNumEv) {
// Re-queue the unprocessed tail: a copy of the packet starting numev
// entries after the original first entry.
1626 TDSetElement *newPacket =
new TDSetElement(*(slstat->fCurElem));
1627 if (newPacket && numev < expectedNumEv) {
1628 Long64_t first = newPacket->GetFirst();
1629 newPacket->SetFirst(first + numev);
1630 if (ReassignPacket(newPacket, listOfMissingFiles) == -1)
1631 SafeDelete(newPacket);
// Presumably the branch for numev > expectedNumEv: more entries reported
// than the packet contained.
1633 Error(
"AddProcessed",
"%s: processed too much? (%lld, %lld)",
1634 sl->GetOrdinal(), numev, expectedNumEv);
// The packet is finished from the packetizer's point of view.
1646 slstat->fCurElem = 0;
1647 return (expectedNumEv - numev);
// GetNextPacket: hand out the next packet (TDSetElement) to worker `sl`.
// If the worker still holds a packet (slstat->fCurElem), its end-of-packet
// report is first decoded from message `r` and accounted via AddProcessed().
// Files the worker could not open are invalidated and recorded in
// fFailedPackets; corrupted files are flagged TDSetElement::kCorrupted.
// A file is then selected, preferring the worker's local file node according
// to fStrategy and the local/remote balance. A packet of
// CalculatePacketSize() entries is cut from the file's next unread entry.
// Returns the new packet (also stored in slstat->fCurElem), or 0 when no file
// is available.
1664 TDSetElement *TPacketizerAdaptive::GetNextPacket(TSlave *sl, TMessage *r)
1672 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
// Error path for a worker without a TSlaveStat entry (presumably guarded by a
// null check on slstat).
1674 Error(
"GetNextPacket",
"TSlaveStat instance for worker %s not found!",
1675 (sl ? sl->GetName() :
"**undef**"));
1680 TFileStat *file = slstat->fCurFile;
1684 Bool_t firstPacket = kFALSE;
// Cache size and learning-phase entry count reported by the worker
// (-1 = not reported); both are forwarded to CalculatePacketSize().
1685 Long64_t cachesz = -1;
1686 Int_t learnent = -1;
// The worker is returning a packet: decode and account its report.
1687 if ( slstat->fCurElem != 0 ) {
1689 Long64_t restEntries = 0;
1690 Double_t latency, proctime, proccpu;
1691 TProofProgressStatus *status = 0;
1692 Bool_t fileNotOpen = kFALSE, fileCorrupted = kFALSE;
// The report layout depends on the worker's protocol version.
1694 if (sl->GetProtocol() > 18) {
1699 if (sl->GetProtocol() > 25) {
1700 (*r) >> cachesz >> learnent;
1701 if (r->BufferSize() > r->Length()) (*r) >> restEntries;
1703 fileNotOpen = status->TestBit(TProofProgressStatus::kFileNotOpen) ? kTRUE : kFALSE;
1704 fileCorrupted = status->TestBit(TProofProgressStatus::kFileCorrupted) ? kTRUE : kFALSE;
// Older layout: fields read one by one; trailing fields only if still present
// in the buffer.
1708 Long64_t bytesRead = -1;
1710 (*r) >> latency >> proctime >> proccpu;
1712 if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
1713 if (r->BufferSize() > r->Length()) (*r) >> restEntries;
1715 if (r->BufferSize() > r->Length()) (*r) >> totev;
1717 status =
new TProofProgressStatus(totev, bytesRead, -1, proctime, proccpu);
// In this layout a negative restEntries signals a file that could not be opened.
1718 fileNotOpen = (restEntries < 0) ? kTRUE : kFALSE;
// Normal case: credit the processed entries.
1721 if (!fileNotOpen && !fileCorrupted) {
1722 if (AddProcessed(sl, status, latency) != 0)
1723 Error(
"GetNextPacket",
"%s: the worker processed a different # of entries", sl->GetOrdinal());
// All entries accounted for; the progress object is released.
1724 if (fProgressStatus->GetEntries() >= fTotalEntries) {
1725 if (fProgressStatus->GetEntries() > fTotalEntries)
1726 Error(
"GetNextPacket",
"%s: processed too many entries! (%lld, %lld)",
1727 sl->GetOrdinal(), fProgressStatus->GetEntries(), fTotalEntries);
1730 SafeDelete(fProgress);
// Failure reported by the worker: the current file is corrupted or could not
// be opened.
1734 if (file->GetElement()) {
1735 if (fileCorrupted) {
1736 Info(
"GetNextPacket",
"%s: file '%s' turned corrupted: invalidating file (%lld)",
1737 sl->GetOrdinal(), file->GetElement()->GetName(), restEntries);
1738 Int_t nunproc = AddProcessed(sl, status, latency);
1740 Info("GetNextPacket", "%s: %d entries un-processed", sl->GetOrdinal(), nunproc);
// Recompute the entry count of the corrupted element from restEntries.
1743 if (file->GetElement()->TestBit(TDSetElement::kCorrupted)) {
1745 num = file->GetElement()->GetEntries() + restEntries;
1749 Long64_t rest = file->GetElement()->GetEntries() - file->GetNextEntry();
1750 num = restEntries + rest;
1752 file->GetElement()->SetEntries(num);
1754 Info("GetNextPacket", "%s: removed file: %s, entries left: %lld", sl->GetOrdinal(),
1755 file->GetElement()->GetName(), file->GetElement()->GetEntries());
1757 file->GetElement()->SetBit(TDSetElement::kCorrupted);
1759 Info(
"GetNextPacket",
"%s: file '%s' could not be open: invalidating related element",
1760 sl->GetOrdinal(), file->GetElement()->GetName());
1763 file->GetElement()->Invalidate();
// Record the invalidated element among the failed packets (only once).
1765 if (!fFailedPackets) fFailedPackets =
new TList();
1766 if (!fFailedPackets->FindObject(file->GetElement()))
1767 fFailedPackets->Add(file->GetElement());
1773 Info(
"GetNextPacket",
"%s: error raised by worker, but TFileStat object invalid:"
1774 " protocol error?", sl->GetOrdinal());
// Presumably the branch for a worker with no previous packet (first request).
1778 firstPacket = kTRUE;
1787 if (file != 0) nodeName = file->GetNode()->GetName();
1788 TString nodeHostName(slstat->GetName());
1791 Info("GetNextPacket", "%s: entries processed: %lld - looking for a packet from node '%s'",
1792 sl->GetOrdinal(), fProgressStatus->GetEntries(), nodeName.Data());
// The current file is exhausted: stop counting this worker on its node.
1795 if ( file != 0 && file->IsDone() ) {
1796 file->GetNode()->DecExtSlaveCnt(slstat->GetName());
1797 file->GetNode()->DecRunSlaveCnt();
1799 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
1800 file->GetElement()->GetFileName(), kFALSE);
1804 slstat->fCurFile = file;
// Average number of entries still to process per worker.
1806 Long64_t avgEventsLeftPerSlave =
1807 (fTotalEntries - fProgressStatus->GetEntries()) / fSlaveStats->GetSize();
1808 if (fTotalEntries == fProgressStatus->GetEntries())
// Preference for local files: it decreases as fNEventsOnRemLoc grows relative
// to the entries still to process.
// NOTE(review): the denominator is zero once all entries are processed; the
// equality check above is presumably meant to exit early in that case — confirm.
1815 Float_t localPreference = fBaseLocalPreference - (fNEventsOnRemLoc /
1816 (0.4 *(fTotalEntries - fProgressStatus->GetEntries())));
// The worker has a local file node: decide between local and remote reading.
1817 if ( slstat->GetFileNode() != 0 ) {
1819 fUnAllocated->Sort();
1820 TFileNode* firstNonLocalNode = (TFileNode*)fUnAllocated->First();
1821 Bool_t nonLocalNodePossible;
1823 nonLocalNodePossible = 0;
// A remote node can take this worker if fMaxSlaveCnt < 0, or if its external
// worker count is below a positive fMaxSlaveCnt.
1825 nonLocalNodePossible = firstNonLocalNode ?
1826 (fMaxSlaveCnt < 0 || (fMaxSlaveCnt > 0 && firstNonLocalNode->GetExtSlaveCnt() < fMaxSlaveCnt))
1828 openLocal = !nonLocalNodePossible;
1829 Float_t slaveRate = slstat->GetAvgRate();
// Strategy 1: weigh local against remote work using event counts (no rate
// yet) or estimated processing times (rate known).
1830 if ( nonLocalNodePossible && fStrategy == 1) {
1832 if ( slstat->GetFileNode()->GetRunSlaveCnt() >
1833 slstat->GetFileNode()->GetMySlaveCnt() - 1 )
1837 else if ( slaveRate == 0 ) {
1840 if ( slstat->GetLocalEventsLeft() * localPreference
1841 > (avgEventsLeftPerSlave))
1843 else if ( (firstNonLocalNode->GetEventsLeftPerSlave())
1844 < slstat->GetLocalEventsLeft() * localPreference )
1846 else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 )
1848 else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 )
1852 Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
1854 Float_t avgTime = avgEventsLeftPerSlave
1855 /(fProgressStatus->GetEntries()/GetCumProcTime());
1856 if (slaveTime * localPreference > avgTime)
1858 else if ((firstNonLocalNode->GetEventsLeftPerSlave())
1859 < slstat->GetLocalEventsLeft() * localPreference)
// Try the worker's local node: unallocated files first, then active ones.
1863 if (openLocal || fStrategy == 0) {
1865 file = slstat->GetFileNode()->GetNextUnAlloc();
1867 file = slstat->GetFileNode()->GetNextActive();
1870 slstat->SetFileNode(0);
// Fallback (unless fForceLocal): any unallocated file, then any active one.
1876 if(file == 0 && !fForceLocal)
1877 file = GetNextUnAlloc(0, nodeHostName);
1880 if(file == 0 && !fForceLocal)
1881 file = GetNextActive();
1883 if (file == 0)
return 0;
1885 PDB(kPacketizer,3) if (fFilesToProcess) fFilesToProcess->Print();
1887 slstat->fCurFile = file;
// First packet from an element on a node with no local workers: its entries
// are removed from fNEventsOnRemLoc.
1889 if (file->GetNode()->GetMySlaveCnt() == 0 &&
1890 file->GetElement()->GetFirst() == file->GetNextEntry()) {
1891 fNEventsOnRemLoc -= file->GetElement()->GetNum();
1892 if (fNEventsOnRemLoc < 0) {
1893 Error(
"GetNextPacket",
1894 "inconsistent value for fNEventsOnRemLoc (%lld): stop delivering packets!",
// Register this worker on the file's node.
1899 file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1900 file->GetNode()->IncRunSlaveCnt();
1902 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
1903 file->GetNode()->GetName(),
1904 file->GetElement()->GetFileName(), kTRUE);
// Size the packet and cut it starting at the file's next unread entry.
1907 Long64_t num = CalculatePacketSize(slstat, cachesz, learnent);
1911 TDSetElement *base = file->GetElement();
1912 Long64_t first = file->GetNextEntry();
1913 Long64_t last = base->GetFirst() + base->GetNum();
// At most 1.5 packets' worth of entries left in this element: presumably the
// remainder is handed out as the last packet — TODO confirm.
1918 if ( first + num * 1.5 >= last ) {
1926 file->MoveNextEntry(num);
1928 slstat->fCurElem = CreateNewPacket(base, first, num);
1929 if (base->GetEntryList())
1930 slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
// kNewRun marks the packet; presumably set when firstPacket is true.
1934 slstat->fCurElem->SetBit(TDSetElement::kNewRun);
1936 slstat->fCurElem->ResetBit(TDSetElement::kNewRun);
1939 Info("GetNextPacket","%s: %s %lld %lld (%lld)", sl->GetOrdinal(), base->GetFileName(), first, first + num - 1, num);
1941 return slstat->fCurElem;
// GetActiveWorkers: count the workers in fSlaveStats that currently have a
// file assigned (non-null fCurFile); the count is presumably returned.
1947 Int_t TPacketizerAdaptive::GetActiveWorkers()
1950 TIter nxw(fSlaveStats);
1952 while ((key = nxw())) {
1953 TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1954 if (wrkstat && wrkstat->fCurFile) actw++;
// GetCurrentRate: sum the current processing rates of the workers that have
// already processed entries.
// NOTE(review): how the `all` out-flag is set and what is returned are not
// shown in these lines — confirm against the full function.
1964 Float_t TPacketizerAdaptive::GetCurrentRate(Bool_t &all)
1968 Float_t currate = 0.;
1969 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
1970 TIter nxw(fSlaveStats);
1972 while ((key = nxw()) != 0) {
1973 TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
// Workers without a progress status or without processed entries do not
// contribute.
1974 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1976 currate += slstat->GetProgressStatus()->GetCurrentRate();
// GetEstEntriesProcessed: estimate the entries, bytes and read calls processed
// at time `t`. If t <= 0 the current time is used.
// Each worker's processed count is extrapolated by rate * dt, where dt is the
// time since that worker's last update. The rate is the worker's current rate
// when fUseEstOpt == kEstCurrent and that rate is positive, otherwise its
// average rate.
// The resulting `ent` is capped at fTotalEntries.
// Returns 0 when `all` is true at the end, 1 otherwise. `all` is cleared when
// some worker has not processed any entry yet.
1993 Int_t TPacketizerAdaptive::GetEstEntriesProcessed(Float_t t, Long64_t &ent,
1994 Long64_t &bytes, Long64_t &calls)
// Start from the measured values (presumably returned as-is when estimation
// is switched off).
1997 ent = GetEntriesProcessed();
1998 bytes = GetBytesRead();
1999 calls = GetReadCalls();
2002 if (fUseEstOpt == kEstOff)
// kEstCurrent: prefer each worker's current rate over its average one.
2005 Bool_t current = (fUseEstOpt == kEstCurrent) ? kTRUE : kFALSE;
// Reference time: `t` if given, otherwise now converted from TTime
// (presumably milliseconds) by dividing by 1000.
2007 TTime tnow = gSystem->Now();
2008 Double_t now = (t > 0) ? (Double_t)t : Long64_t(tnow) / (Double_t)1000.;
2014 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
2016 TIter nxw(fSlaveStats);
2018 while ((key = nxw()) != 0) {
2019 TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
2022 Long64_t e = slstat->GetEntriesProcessed();
2023 if (e <= 0) all = kFALSE;
// Extrapolate from the worker's last update using its rate.
2025 dt = now - slstat->GetProgressStatus()->GetLastUpdate();
2027 Float_t rate = (current && slstat->GetCurRate() > 0) ? slstat->GetCurRate()
2028 : slstat->GetAvgRate();
2031 e += (Long64_t) (dt * rate);
2036 Info("GetEstEntriesProcessed","%s: e:%lld rate:%f dt:%f e:%lld",
2037 slstat->fSlave->GetOrdinal(),
2038 slstat->GetEntriesProcessed(), rate, dt, e);
2043 dt = now - fProgressStatus->GetLastUpdate();
2045 Info("GetEstEntriesProcessed",
2046 "dt: %f, estimated entries: %lld (%lld), bytes read: %lld rate: %f (all: %d)",
2047 dt, ent, GetEntriesProcessed(), bytes, trate, all);
// Fall back to the global progress values for non-positive estimates, and
// never report more entries than the total.
2050 ent = (ent > 0) ? ent : fProgressStatus->GetEntries();
2051 ent = (ent <= fTotalEntries) ? ent : fTotalEntries;
2052 bytes = (bytes > 0) ? bytes : fProgressStatus->GetBytesRead();
2055 return ((all) ? 0 : 1);
// MarkBad: remove worker `s` from the packetizer and redistribute its work.
// Steps:
//  - the node of its current file stops counting it;
//  - the elements it processed, plus its current packet, are merged pairwise
//    with TDSetElement::MergeElement and handed to SplitPerHost() for
//    reassignment;
//  - its progress is subtracted from fProgressStatus;
//  - its entry is removed from fSlaveStats.
// NOTE(review): the `status` parameter is not used in the visible lines — confirm.
2068 void TPacketizerAdaptive::MarkBad(TSlave *s, TProofProgressStatus *status,
2069 TList **listOfMissingFiles)
2071 TSlaveStat *slaveStat = (TSlaveStat *)(fSlaveStats->GetValue(s));
2073 Error(
"MarkBad",
"Worker does not exist");
// The worker no longer reads its current file.
2077 if (slaveStat->fCurFile && slaveStat->fCurFile->GetNode()) {
2078 slaveStat->fCurFile->GetNode()->DecExtSlaveCnt(slaveStat->GetName());
2079 slaveStat->fCurFile->GetNode()->DecRunSlaveCnt();
// Collect everything this worker processed, including its in-flight packet.
2086 TList *subSet = slaveStat->GetProcessedSubSet();
2089 if (slaveStat->fCurElem) {
2090 subSet->Add(slaveStat->fCurElem);
// Merge neighbouring elements, repeating the pass (at most 100 times) while a
// pass still merges something; nmg presumably counts merges per pass.
2093 Int_t nmg = 0, ntries = 100;
2094 TDSetElement *e = 0, *enxt = 0;
2097 e = (TDSetElement *) subSet->First();
2098 while ((enxt = (TDSetElement *) subSet->After(e))) {
2099 if (e->MergeElement(enxt) >= 0) {
2101 subSet->Remove(enxt);
2107 }
while (nmg > 0 && --ntries > 0);
2109 SplitPerHost(subSet, listOfMissingFiles);
2111 subSet->SetOwner(0);
2113 Warning(
"MarkBad",
"subset processed by bad worker not found!");
// Remove the bad worker's contribution from the global progress.
2115 (*fProgressStatus) -= *(slaveStat->GetProgressStatus());
2118 fSlaveStats->Remove(s);
// ReassignPacket: put element `e` back into the queue of the file node that
// hosts its file, so that another worker can process it.
// The host is read from the file URL; invalid or non-"root" URLs take a
// different branch (its host choice is not shown here — confirm).
// When the node is found and fTryReassign is set:
//  - the node's processed count is decreased by e->GetNum();
//  - `e` is added back to the node;
//  - the node is re-listed in fUnAllocated if needed.
// Otherwise the element's TFileInfo is presumably appended to
// *listOfMissingFiles.
// Callers in this file treat a return of -1 as "not reassigned" and delete
// or remove the element.
2128 Int_t TPacketizerAdaptive::ReassignPacket(TDSetElement *e,
2129 TList **listOfMissingFiles)
2132 Error(
"ReassignPacket",
"empty packet!");
2136 TUrl url = e->GetFileName();
2140 if (!url.IsValid() || strncmp(url.GetProtocol(),
"root", 4)) {
2143 host = url.GetHost();
// Known node and reassignment enabled: return the entries to the node's pool
// and make the node eligible for allocation again.
2148 TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
2149 if (node && fTryReassign) {
2151 node->DecreaseProcessed(e->GetNum());
2153 node->Add(e, kFALSE);
2154 if (!fUnAllocated->FindObject(node))
2155 fUnAllocated->Add(node);
// Report the file as missing.
// NOTE(review): `fi` is added without a null check — confirm GetFileInfo()
// cannot return 0 here.
2159 TFileInfo *fi = e->GetFileInfo();
2160 if (listOfMissingFiles && *listOfMissingFiles)
2161 (*listOfMissingFiles)->Add((TObject *)fi);
2171 void TPacketizerAdaptive::SplitPerHost(TList *elements,
2172 TList **listOfMissingFiles)
2175 Error(
"SplitPerHost",
"Empty list of packets!");
2178 if (elements->GetSize() <= 0) {
2179 Error(
"SplitPerHost",
"The input list contains no elements");
2182 TIter subSetIter(elements);
2184 while ((e = (TDSetElement*) subSetIter.Next())) {
2185 if (ReassignPacket(e, listOfMissingFiles) == -1) {
2187 if (elements->Remove(e))
2188 Error(
"SplitPerHost",
"Error removing a missing file");