// Bookkeeping record for one file of the data set. The accessors below
// expose a done flag, the TFileNode the record is attached to, the
// TDSetElement it describes, and the next entry still to be handed out.
// NOTE(review): only part of the class is visible in this excerpt; the
// remaining member declarations and the closing brace are not shown.
66 class TPacketizer::TFileStat :
public TObject {
71 TDSetElement *fElement;
// Constructor, defined out of line.
75 TFileStat(TFileNode *node, TDSetElement *elem);
// Returns the done flag.
77 Bool_t IsDone()
const {
return fIsDone;}
// Sets the done flag to kTRUE.
78 void SetDone() {fIsDone = kTRUE;}
// Returns the node passed at construction.
79 TFileNode *GetNode()
const {
return fNode;}
// Returns the data set element passed at construction.
80 TDSetElement *GetElement()
const {
return fElement;}
// Returns the entry cursor (the first entry not yet handed out).
81 Long64_t GetNextEntry()
const {
return fNextEntry;}
// Advances the entry cursor by step entries; no bounds check is done here.
82 void MoveNextEntry(Long64_t step) {fNextEntry += step;}
// Builds the record in the not-done state and stores node and elem as given.
// The entry cursor starts at elem->GetFirst(), so elem is dereferenced in the
// initializer list and must not be null.
86 TPacketizer::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem)
87 : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
// Named node record. It keeps a list of files (fFiles), a list of the files
// currently active (fActFiles), one cursor into each list, and two worker
// counters (fMySlaveCnt and fSlaveCnt).
// NOTE(review): this excerpt omits several lines of the class (member
// declarations, braces, parts of method bodies); the comments below describe
// only the statements that are visible.
94 class TPacketizer::TFileNode :
public TObject {
// Cursor into fFiles: next file not yet allocated.
99 TObject *fUnAllocFileNext;
// Cursor into fActFiles: next active file to hand out.
101 TObject *fActFileNext;
106 TFileNode(
const char *name);
// Deletes both lists. Whether the list elements are deleted too depends on
// list ownership, which is configured in the constructor.
107 ~TFileNode() {
delete fFiles;
delete fActFiles; }
// Unconditionally increments fMySlaveCnt.
109 void IncMySlaveCnt() { fMySlaveCnt++; }
// Increments fSlaveCnt only when the given name differs from fNodeName.
110 void IncSlaveCnt(
const char *slave) {
if (fNodeName != slave) fSlaveCnt++; }
// Mirror of IncSlaveCnt; asserts that the counter never goes negative.
111 void DecSlaveCnt(
const char *slave) {
if (fNodeName != slave) fSlaveCnt--; R__ASSERT(fSlaveCnt >= 0); }
// Total worker count: sum of both counters.
112 Int_t GetSlaveCnt()
const {
return fMySlaveCnt + fSlaveCnt;}
// Size of the active file list.
113 Int_t GetNumberOfActiveFiles()
const {
return fActFiles->GetSize(); }
// Always kTRUE; ordering is provided by Compare below.
114 Bool_t IsSortable()
const {
return kTRUE; }
// Returns the node name as a C string.
116 const char *GetName()
const {
return fNodeName.Data(); }
// Creates a TFileStat for elem bound to this node. If the unallocated cursor
// is unset it is pointed at the first entry of fFiles.
// NOTE(review): the statement that stores f in fFiles is not visible here.
118 void Add(TDSetElement *elem)
120 TFileStat *f =
new TFileStat(
this,elem);
122 if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
// Hands out the file under the unallocated cursor: it is appended to the
// active list (initialising the active cursor if needed) and the unallocated
// cursor moves to the following entry of fFiles.
// NOTE(review): the guard for an empty cursor is not visible in this excerpt.
125 TFileStat *GetNextUnAlloc()
127 TObject *next = fUnAllocFileNext;
131 fActFiles->Add(next);
132 if (fActFileNext == 0) fActFileNext = fActFiles->First();
135 fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
138 return (TFileStat *) next;
// Returns the file under the active cursor and advances the cursor, wrapping
// back to the first active file when After() returns null.
141 TFileStat *GetNextActive()
143 TObject *next = fActFileNext;
145 if (fActFileNext != 0) {
146 fActFileNext = fActFiles->After(fActFileNext);
147 if (fActFileNext == 0) fActFileNext = fActFiles->First();
150 return (TFileStat *) next;
// Removes file from the active list. If the cursor points at it, the cursor
// is first moved to the successor; afterwards a null cursor wraps to First().
153 void RemoveActive(TFileStat *file)
155 if (fActFileNext == file) fActFileNext = fActFiles->After(file);
156 fActFiles->Remove(file);
157 if (fActFileNext == 0) fActFileNext = fActFiles->First();
// Orders nodes by GetSlaveCnt(). The Error call reports an argument that is
// not a TFileNode; its guarding condition and the returned values are not
// visible in this excerpt.
160 Int_t Compare(
const TObject *other)
const
164 const TFileNode *obj =
dynamic_cast<const TFileNode*
>(other);
166 Error(
"Compare",
"input is not a TPacketizer::TFileNode object");
170 Int_t myVal = GetSlaveCnt();
171 Int_t otherVal = obj->GetSlaveCnt();
172 if (myVal < otherVal) {
174 }
else if (myVal > otherVal) {
// Writes the class name, the node name and both counters to std::cout.
181 void Print(Option_t *)
const
183 std::cout <<
"OBJ: " << IsA()->GetName() <<
"\t" << fNodeName
184 <<
"\tMySlaveCount " << fMySlaveCnt
185 <<
"\tSlaveCount " << fSlaveCnt << std::endl;
// NOTE(review): the statement below rewinds the unallocated cursor to the
// first file; the header of the method it belongs to is not visible here.
190 fUnAllocFileNext = fFiles->First();
// Creates a node called name with two freshly allocated, empty lists, null
// cursors and zeroed worker counters. The active list is explicitly set to
// not own its elements.
// NOTE(review): the ownership setting of fFiles is not visible in this excerpt.
199 TPacketizer::TFileNode::TFileNode(
const char *name)
200 : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),fActFiles(new TList),
201 fActFileNext(0), fMySlaveCnt(0), fSlaveCnt(0)
206 fActFiles->SetOwner(kFALSE);
// Per-worker state derived from TVirtualPacketizer::TVirtualSlaveStat. The
// visible members are the node the worker is bound to (fFileNode) and the
// packet element currently assigned to it (fCurElem); TPacketizer is a friend
// and accesses the members directly.
// NOTE(review): further members (for example fCurFile, initialised in the
// constructor) are declared on lines not visible in this excerpt.
212 class TPacketizer::TSlaveStat :
public TVirtualPacketizer::TVirtualSlaveStat {
214 friend class TPacketizer;
217 TFileNode *fFileNode;
219 TDSetElement *fCurElem;
// Folds a new status report into the stored one; defined out of line.
220 TProofProgressStatus *AddProcessed(TProofProgressStatus *st);
222 TSlaveStat(TSlave *slave);
// Returns the node this worker is bound to; may be null.
225 TFileNode *GetFileNode()
const {
return fFileNode; }
// Rebinds the worker to node; passing null clears the binding.
227 void SetFileNode(TFileNode *node) { fFileNode = node; }
// Starts with no node, no current file and no current element, and allocates
// a fresh TProofProgressStatus into fStatus.
// NOTE(review): the use of the slave argument is not visible in this excerpt.
231 TPacketizer::TSlaveStat::TSlaveStat(TSlave *slave)
232 : fFileNode(0), fCurFile(0), fCurElem(0)
235 fStatus =
new TProofProgressStatus();
// Destructor. NOTE(review): the body is not visible in this excerpt, so it is
// unconfirmed whether the fStatus object allocated by the constructor is
// released here.
241 TPacketizer::TSlaveStat::~TSlaveStat()
// Folds the status report st into the stored fStatus.
// Visible steps: the entry delta between st and fStatus is computed, the last
// processing time of fStatus is reset to zero, a heap-allocated difference
// object (*st - *fStatus) is created, and the entry delta is recorded with
// SetLastEntries. An Error is logged for an undefined st.
// NOTE(review): the null check on st, the update of fStatus itself and the
// return statements are on lines not visible here; presumably the caller owns
// the returned diff object - confirm.
246 TProofProgressStatus *TPacketizer::TSlaveStat::AddProcessed(TProofProgressStatus *st)
253 Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
255 fStatus->SetLastProcTime(0.);
257 TProofProgressStatus *diff =
new TProofProgressStatus(*st - *fStatus);
260 fStatus->SetLastEntries(lastEntries);
263 Error(
"AddProcessed",
"status arg undefined");
270 ClassImp(TPacketizer);
// Constructor. Visible phases, in order:
//   1. read the per-node worker limit from the input list or from gEnv;
//   2. create the bookkeeping lists and register a TFileNode per host for the
//      elements of dset that are not yet valid;
//   3. register the workers and run ValidateFiles;
//   4. walk dset again, trimming the valid elements to the requested
//      [first, first+num) range and accumulating fTotalEntries;
//   5. derive the base packet size.
// NOTE(review): many lines of this function (braces, declarations, guards,
// early returns) are missing from this excerpt; comments describe only the
// statements that are visible.
275 TPacketizer::TPacketizer(TDSet *dset, TList *slaves, Long64_t first,
276 Long64_t num, TList *input, TProofProgressStatus *st)
277 : TVirtualPacketizer(input, st)
279 PDB(kPacketizer,1) Info("TPacketizer", "Enter (first %lld, num %lld)", first, num);
// Defaults: fixed packet size, default per-node worker limit.
288 fHeuristicPSiz = kFALSE;
289 fDefMaxWrkNode = kTRUE;
291 if (!fProgressStatus) {
292 Error(
"TPacketizer",
"No progress status");
// Per-node worker limit. PROOF_MaxSlavesPerNode is looked up twice, once into
// a Long_t and once into mxslcnt (declared on a line not shown). Any positive
// value clears fDefMaxWrkNode.
296 Long_t maxSlaveCnt = 0;
297 if (TProof::GetParameter(input,
"PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
298 if (maxSlaveCnt < 0) {
299 Warning(
"TPacketizer",
"PROOF_MaxSlavesPerNode must be positive");
302 if (maxSlaveCnt > 0) fDefMaxWrkNode = kFALSE;
306 if (TProof::GetParameter(input,
"PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
308 Warning(
"TPacketizer",
"PROOF_MaxSlavesPerNode must be positive");
311 maxSlaveCnt = (Long_t) mxslcnt;
312 if (maxSlaveCnt > 0) fDefMaxWrkNode = kFALSE;
// Fallback: environment setting, defaulting to the number of workers given.
316 maxSlaveCnt = gEnv->GetValue(
"Packetizer.MaxWorkersPerNode", slaves->GetSize());
317 if (maxSlaveCnt != slaves->GetSize()) fDefMaxWrkNode = kFALSE;
319 if (maxSlaveCnt > 0) {
320 fMaxSlaveCnt = maxSlaveCnt;
322 Info("TPacketizer", "setting max number of workers per node to %ld", fMaxSlaveCnt);
// Bookkeeping lists: fPackets and fFileNodes own their elements,
// fUnAllocated and fActive do not.
325 fPackets = new TList;
326 fPackets->SetOwner();
328 fFileNodes = new TList;
329 fFileNodes->SetOwner();
330 fUnAllocated = new TList;
331 fUnAllocated->SetOwner(kFALSE);
333 fActive->SetOwner(kFALSE);
// First pass over the data set: elements already valid are skipped; for the
// others a host name is derived from the file URL.
344 while ((e = (TDSetElement*)dset->Next())) {
345 if (e->GetValid())
continue;
347 TUrl url = e->GetFileName();
// The first branch covers invalid URLs and protocols other than root or file;
// the second covers the file protocol, which is rewritten to root.
351 if ( !url.IsValid() ||
352 (strncmp(url.GetProtocol(),
"root", 4) &&
353 strncmp(url.GetProtocol(),
"file", 4)) ) {
355 }
else if ( url.IsValid() && !strncmp(url.GetProtocol(),
"file", 4)) {
357 url.SetProtocol(
"root");
359 host = url.GetHost();
// Local aliases are replaced by the real host name of this machine.
362 if (host.Contains(
"localhost") || host ==
"127.0.0.1") {
363 url.SetHost(gSystem->HostName());
364 host = url.GetHostFQDN();
// Find the node for this host, creating and registering it when missing.
367 TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
370 node =
new TFileNode(host);
371 fFileNodes->Add(node);
// Worker map: the map does not own its contents, so values are released
// explicitly elsewhere.
377 fSlaveStats =
new TMap;
378 fSlaveStats->SetOwner(kFALSE);
381 Int_t nwrks = AddWorkers(slaves);
382 Info(
"TPacketizer",
"Initial number of workers: %d", nwrks);
// Validation by file is enabled only when the parameter is found, its value
// is positive and a finite number of entries was requested.
387 Int_t validateMode = 0;
388 Int_t gprc = TProof::GetParameter(input,
"PROOF_ValidateByFile", validateMode);
389 Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
393 "processing subset of entries: validating by file? %s", byfile ? "yes": "no");
394 ValidateFiles(dset, slaves, num, byfile);
403 fUnAllocated->Clear();
406 PDB(kPacketizer,2) Info("TPacketizer", "processing range: first %lld, num %lld", first, num);
// Second pass: only valid elements are considered.
410 while (( e = (TDSetElement*)dset->Next())) {
414 if (!e->GetValid())
continue;
// Remember the data set name of the first element that carries one.
417 if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
418 fDataSet = e->GetDataSet();
420 TUrl url = e->GetFileName();
421 Long64_t eFirst = e->GetFirst();
422 Long64_t eNum = e->GetNum();
424 Info("TPacketizer", " --> '%s'", e->GetFileName());
426 Info("TPacketizer", " --> first %lld, num %lld (cur %lld)", eFirst, eNum, cur);
// Range trimming applies only to elements without an entry list. cur is the
// running entry offset across elements (declared on a line not shown).
428 if (!e->GetEntryList()){
// Element lies entirely before the requested range.
430 if (cur + eNum < first) {
433 Info("TPacketizer", " --> skip element cur %lld", cur);
// Element lies entirely after the requested range.
438 if (num != -1 && (first+num <= cur)) {
441 Info("TPacketizer", " --> drop element cur %lld", cur);
445 Bool_t inRange = kFALSE;
446 if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
// Range starts inside this element: move its first entry forward.
451 e->SetFirst( eFirst + (first - cur) );
452 e->SetNum( e->GetNum() - (first - cur) );
454 Info("TPacketizer", " --> adjust start %lld and end %lld",
455 eFirst + (first - cur), first + num - cur);
// Range ends inside this element: shorten it.
458 if (num != -1 && (first+num <= cur+eNum)) {
461 e->SetNum( first + num - e->GetFirst() - cur );
463 Info("TPacketizer", " --> adjust end %lld", first + num - cur);
470 Info("TPacketizer", " --> increment 'cur' by %lld", eNum);
// Elements with an entry list: the list is probed as TEntryList and as
// TEventList; for an event list the entry count is taken from GetN().
480 TEntryList *enl =
dynamic_cast<TEntryList *
>(e->GetEntryList());
484 TEventList *evl =
dynamic_cast<TEventList *
>(e->GetEntryList());
485 eNum = evl ? evl->GetN() : eNum;
491 Info("TPacketizer", " --> next cur %lld", cur);
// Same host derivation as in the first pass, except that the fully qualified
// host name is used from the start.
495 if ( !url.IsValid() ||
496 (strncmp(url.GetProtocol(),"root", 4) &&
497 strncmp(url.GetProtocol(),"file", 4)) ) {
499 }
else if ( url.IsValid() && !strncmp(url.GetProtocol(),
"file", 4)) {
501 url.SetProtocol(
"root");
503 host = url.GetHostFQDN();
506 if (host.Contains(
"localhost") || host ==
"127.0.0.1") {
507 url.SetHost(gSystem->HostName());
508 host = url.GetHostFQDN();
511 TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
514 node =
new TFileNode( host );
515 fFileNodes->Add( node );
519 fTotalEntries += eNum;
521 PDB(kPacketizer,2) e->Print("a");
525 Info("TPacketizer", "processing %lld entries in %d files on %d hosts",
526 fTotalEntries, files, fFileNodes->GetSize());
530 gPerfStats->SetNumEvents(fTotalEntries);
// No usable file at all: reported via Info; the follow-up action is on lines
// not visible here.
534 if (fFileNodes->GetSize() == 0) {
535 Info(
"TPacketizer",
"no valid or non-empty file found: setting invalid");
// Packet sizing. The default fraction is 20 unless PROOF_PacketAsAFraction is
// supplied. An explicit PROOF_PacketSize is read straight into fPacketSize;
// the heuristic assignments below it are presumably the else branch - confirm.
548 Long_t packetAsAFraction = 20;
549 if (TProof::GetParameter(input,
"PROOF_PacketAsAFraction", packetAsAFraction) == 0)
550 Info(
"Process",
"using alternate fraction of query time as a packet Size: %ld",
552 fPacketAsAFraction = (Int_t)packetAsAFraction;
555 if (TProof::GetParameter(input,
"PROOF_PacketSize", fPacketSize) == 0) {
556 Info(
"Process",
"using alternate packet size: %lld", fPacketSize);
559 fHeuristicPSiz = kTRUE;
560 Int_t nslaves = fSlaveStats->GetSize();
// NOTE(review): the divisor is zero when nslaves or fPacketAsAFraction is
// zero; any guard would be on a line not visible here - confirm.
562 fPacketSize = fTotalEntries / (fPacketAsAFraction * nslaves);
563 if (fPacketSize < 1) fPacketSize = 1;
569 PDB(kPacketizer,1) Info("TPacketizer", "Base Packetsize = %lld", fPacketSize);
572 SafeDelete(fProgress);
574 PDB(kPacketizer,1) Info("TPacketizer", "Return");
// Destructor. The TSlaveStat values are deleted explicitly through
// DeleteValues() because the constructor configures the map with
// SetOwner(kFALSE); the containers themselves are then released with
// SafeDelete.
// NOTE(review): the null guard around DeleteValues and the release of fActive
// are not visible in this excerpt - confirm against the full file.
580 TPacketizer::~TPacketizer()
583 fSlaveStats->DeleteValues();
586 SafeDelete(fPackets);
587 SafeDelete(fSlaveStats);
588 SafeDelete(fUnAllocated);
590 SafeDelete(fFileNodes);
// Registers the workers in the given list that are not yet known.
// For each new TSlave a TSlaveStat is created and stored in fSlaveStats, and
// fMaxPerfIdx is raised to the largest performance index seen. If heuristic
// packet sizing is on and the worker count grew, fPacketSize is recomputed;
// if the default per-node limit is in use it is raised to the worker count.
// NOTE(review): the return statement and the null check that triggers the
// Error call are on lines not visible in this excerpt.
596 Int_t TPacketizer::AddWorkers(TList *workers)
599 Error(
"AddWorkers",
"Null list of new workers!");
// Worker count before the additions, used to detect growth below.
603 Int_t curNumOfWrks = fSlaveStats->GetEntries();
// Entries that are not TSlave objects end the loop, since dynamic_cast
// yields null for them.
607 while (( sl = dynamic_cast<TSlave*>(next()) ))
608 if (!fSlaveStats->FindObject(sl)) {
609 fSlaveStats->Add(sl,
new TSlaveStat(sl));
610 fMaxPerfIdx = sl->GetPerfIdx() > fMaxPerfIdx ? sl->GetPerfIdx() : fMaxPerfIdx;
614 Int_t nwrks = fSlaveStats->GetSize();
615 if (fHeuristicPSiz && nwrks > curNumOfWrks) {
// NOTE(review): divides by fPacketAsAFraction * nwrks; a guard against a zero
// divisor, if any, is not visible here.
617 fPacketSize = fTotalEntries / (fPacketAsAFraction * nwrks);
618 if (fPacketSize < 1) fPacketSize = 1;
625 if (fDefMaxWrkNode && nwrks > fMaxSlaveCnt) fMaxSlaveCnt = nwrks;
// Returns the next unallocated file, preferring the given node.
// A node that has no unallocated file left is dropped from fUnAllocated. While
// no file has been found, further nodes are taken from NextUnAllocNode(). The
// final visible test checks whether the node is already in fActive.
// NOTE(review): the condition selecting the preferred-node path, the action
// taken when the node is not yet active, and the return are not visible here.
634 TPacketizer::TFileStat *TPacketizer::GetNextUnAlloc(TFileNode *node)
639 file = node->GetNextUnAlloc();
640 if (file == 0) RemoveUnAllocNode(node);
642 while (file == 0 && ((node = NextUnAllocNode()) != 0)) {
643 file = node->GetNextUnAlloc();
644 if (file == 0) RemoveUnAllocNode(node);
650 if (fActive->FindObject(node) == 0) {
// Picks the next node that still has unallocated files.
// fUnAllocated is sorted (TFileNode::Compare orders by worker count) and its
// first entry is taken. When a positive fMaxSlaveCnt is set and that node has
// already reached it, the limit is reported.
// NOTE(review): the debug guard around the printout, the value returned when
// the limit is hit and the final return are on lines not visible here.
661 TPacketizer::TFileNode *TPacketizer::NextUnAllocNode()
663 fUnAllocated->Sort();
665 std::cout <<
"TPacketizer::NextUnAllocNode()" << std::endl;
666 fUnAllocated->Print();
669 TFileNode *fn = (TFileNode*) fUnAllocated->First();
670 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
671 PDB(kPacketizer,1) Info("NextUnAllocNode", "reached workers per node limit (%ld)",
// Removes node from the list of nodes that still have unallocated files.
// The node object itself is not deleted here.
682 void TPacketizer::RemoveUnAllocNode(TFileNode * node)
684 fUnAllocated->Remove(node);
// Returns the next active file across all active nodes.
// Nodes are taken from NextActiveNode() until one yields a file; a node that
// yields none is removed from the active list.
// NOTE(review): the local declarations and the return are not visible here.
690 TPacketizer::TFileStat *TPacketizer::GetNextActive()
695 while (file == 0 && ((node = NextActiveNode()) != 0)) {
696 file = node->GetNextActive();
697 if (file == 0) RemoveActiveNode(node);
// Picks the first node of fActive. When a positive fMaxSlaveCnt is set and
// that node has already reached it, the limit is reported.
// NOTE(review): any sorting of fActive, the debug guard around the printout,
// the value returned when the limit is hit and the final return are on lines
// not visible in this excerpt.
706 TPacketizer::TFileNode *TPacketizer::NextActiveNode()
710 Printf(
"TPacketizer::NextActiveNode : ----------------------");
714 TFileNode *fn = (TFileNode*) fActive->First();
715 if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
717 Info("NextActiveNode", "reached workers per node limit (%ld)", fMaxSlaveCnt);
// Removes file from the active list of its node, and removes the node from
// fActive once it has no active file left.
// file is dereferenced without a check and must not be null.
727 void TPacketizer::RemoveActive(TFileStat *file)
729 TFileNode *node = file->GetNode();
731 node->RemoveActive(file);
732 if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
// Removes node from the list of active nodes. The node object itself is not
// deleted here.
738 void TPacketizer::RemoveActiveNode(TFileNode *node)
740 fActive->Remove(node);
// Resets the allocation state. fUnAllocated is refilled with every known
// node, then each worker is re-bound to the node whose name matches the
// worker name and its current file is cleared.
// NOTE(review): the bodies of both loops are only partly visible (the per-node
// reset call, the clearing of fActive and the null checks are on missing lines).
746 void TPacketizer::Reset()
748 fUnAllocated->Clear();
749 fUnAllocated->AddAll(fFileNodes);
753 TIter files(fFileNodes);
755 while ((fn = (TFileNode*) files.Next()) != 0) {
// Iterating a TMap yields its keys; the TSlaveStat is fetched per key.
759 TIter slaves(fSlaveStats);
761 while ((key = slaves.Next()) != 0) {
762 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
// Node lookup by worker name; the result may be null when no node matches.
764 fn = (TFileNode*) fFileNodes->FindObject(slstat->GetName());
766 slstat->SetFileNode(fn);
769 slstat->fCurFile = 0;
771 Warning(
"Reset",
"TSlaveStat associated to key '%s' is NULL", key->GetName());
// Validates the files of dset by obtaining the number of entries of each one.
// Visible flow:
//   - every worker socket is added to a TMonitor and mapped back to its TSlave;
//   - asynchronous input is switched off on gProof while the loop runs;
//   - each idle worker gets the next unallocated file, preferably from its own
//     node; if the entry count is unknown a kPROOF_GETENTRIES request is sent,
//     otherwise the element is checked and trimmed locally;
//   - replies are collected through mon.Select() and handled the same way;
//   - finally the per-element offsets are turned into cumulative offsets.
// maxent and byfile limit how many files get validated.
// NOTE(review): many lines of this function are missing from this excerpt and
// several string literals are broken across physical lines, which is not valid
// C++; verify against the original file before relying on details.
780 void TPacketizer::ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent, Bool_t byfile)
789 workers.AddAll(slaves);
// Register every worker socket with the monitor and remember its owner.
792 while ((slm = (TSlave*)si.Next()) != 0) {
794 Info("ValidateFiles","socket added to monitor: %p (%s)",
795 slm->GetSocket(), slm->GetName());
796 mon.Add(slm->GetSocket());
797 slaves_by_sock.Add(slm->GetSocket(), slm);
799 Info("ValidateFiles",
800 "mon: %p, wrk: %p, sck: %p", &mon, slm, slm->GetSocket());
// Replies are read synchronously below, so asynchronous input is disabled and
// the local monitor is published through gProof.
805 ((TProof*)gProof)->DeActivateAsyncInput();
808 ((TProof*)gProof)->fCurrentMonitor = &mon;
811 TString msg("Validating files");
813 UInt_t tot = dset->GetListOfElements()->GetSize();
816 Long64_t totent = 0, nopenf = 0;
// Dispatch: as long as an idle worker is available, give it a file.
820 while( TSlave *s = (TSlave*)workers.First() ) {
826 TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
828 Error(
"ValidateFiles",
"TSlaveStat associated to slave '%s' is NULL", s->GetName());
// Prefer a file from the node the worker is bound to; the binding is cleared
// and any node is tried otherwise.
835 if ( (node = slstat->GetFileNode()) != 0 ) {
836 file = GetNextUnAlloc(node);
838 slstat->SetFileNode(0);
844 file = GetNextUnAlloc();
851 slstat->fCurFile = file;
852 TDSetElement *elem = file->GetElement();
853 Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
// Entry count unknown or title empty: ask the worker.
854 if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
856 file->GetNode()->IncSlaveCnt(slstat->GetName());
857 TMessage m(kPROOF_GETENTRIES);
859 << TString(elem->GetFileName())
860 << TString(elem->GetDirectory())
861 << TString(elem->GetObjName());
863 s->GetSocket()->Send( m );
864 mon.Activate(s->GetSocket());
866 Info("ValidateFiles",
867 "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
868 s->GetOrdinal(), s->GetName(), s->GetSocket(),
869 dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
870 elem->GetDirectory(), elem->GetObjName());
// Entry count already known: store it as a provisional offset and check the
// requested range locally.
873 elem->SetTDSetOffset(entries);
877 if (!elem->GetEntryList()) {
// NOTE(review): the two messages in this branch spell than as then.
878 if (elem->GetFirst() > entries) {
879 Error(
"ValidateFiles",
880 "first (%lld) higher then number of entries (%lld) in %s",
881 elem->GetFirst(), entries, elem->GetFileName());
883 slstat->fCurFile->SetDone();
885 dset->SetBit(TDSet::kSomeInvalid);
// A num of -1 means up to the end of the file; an overlong range is clipped.
887 if (elem->GetNum() == -1) {
888 elem->SetNum(entries - elem->GetFirst());
889 }
else if (elem->GetFirst() + elem->GetNum() > entries) {
890 Warning(
"ValidateFiles",
"num (%lld) + first (%lld) larger then number of"
891 " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
892 entries, elem->GetFileName());
893 elem->SetNum(entries - elem->GetFirst());
896 Info("ValidateFiles",
897 "found elem '%s' with %lld entries", elem->GetFileName(), entries);
902 gProof->SendDataSetStatus(msg, n, tot, st);
// No request pending: either estimate how many more files are needed to reach
// maxent (validation by file) or stop.
911 if (mon.GetActive() == 0) {
912 if (byfile && maxent > 0 && totent > 0) {
// Estimate from the average entries per opened file; at least one more file
// is taken while the target is not reached.
914 Long64_t nrestf = (maxent - totent) * nopenf / totent ;
915 if (nrestf <= 0 && maxent > totent) nrestf = 1;
// NOTE(review): the message below opens with a brace and closes with a
// parenthesis.
918 Info("ValidateFiles", "{%lld, %lld, %lld): needs to validate %lld more files
",
919 maxent, totent, nopenf, nrestf);
921 while ((slm = (TSlave *) si.Next()) && nrestf--) {
927 Info("ValidateFiles
", "no need to validate more files
");
936 Info("ValidateFiles
", "waiting
for %d workers:
", mon.GetActive());
937 TList *act = mon.GetListOfActives();
940 while ((s = (TSocket*) next())) {
941 Info("ValidateFiles
", "found sck: %p
", s);
942 TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
944 Info("ValidateFiles
", " worker-%s (%s)
", sl->GetOrdinal(), sl->GetName());
// Wait for the next reply.
949 TSocket *sock = mon.Select();
950 // If we have been interrupted break
952 Error("ValidateFiles
", "selection has been interrupted - STOP
");
957 mon.DeActivate(sock);
959 PDB(kPacketizer,3) Info("ValidateFiles
", "select returned: %p
", sock);
961 TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
962 if (!sock->IsValid()) {
963 // A socket got invalid during validation
964 Error("ValidateFiles
", "worker-%s (%s) got invalid - STOP",
965 slave->GetOrdinal(), slave->GetName());
966 ((TProof*)gProof)->MarkBad(slave);
// A failed receive marks the worker as bad.
973 if ( sock->Recv(reply) <= 0 ) {
975 ((TProof*)gProof)->MarkBad(slave);
977 Error(
"ValidateFiles",
"Recv failed! for worker-%s (%s)",
978 slave->GetOrdinal(), slave->GetName());
// Any other message type is forwarded to the generic handler; the type is
// saved first because it is tested after the handler has run.
982 if (reply->What() != kPROOF_GETENTRIES) {
984 Int_t what = reply->What();
985 ((TProof*)gProof)->HandleInputMessage(slave, reply);
986 if (what == kPROOF_FATAL) {
987 Error(
"ValidateFiles",
"kPROOF_FATAL from worker-%s (%s)",
988 slave->GetOrdinal(), slave->GetName());
// NOTE(review): slavestat and its fCurFile are dereferenced below without a
// visible null check - confirm they cannot be null at this point.
998 TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
999 TDSetElement *e = slavestat->fCurFile->GetElement();
1000 slavestat->fCurFile->GetNode()->DecSlaveCnt(slavestat->GetName());
1003 (*reply) >> entries;
// Optional trailing field: if present it is stored as the element title.
1006 if ((reply->BufferSize() > reply->Length())) {
1008 (*reply) >> objname;
1009 e->SetTitle(objname);
1012 e->SetTDSetOffset(entries);
1013 if ( entries > 0 ) {
// Same range checks as in the locally resolved case above, except that an
// overlong range is reported with Error instead of Warning.
1019 if (!e->GetEntryList()){
1020 if ( e->GetFirst() > entries ) {
1021 Error(
"ValidateFiles",
"first (%lld) higher then number of entries (%lld) in %s",
1022 e->GetFirst(), entries, e->GetFileName());
1025 slavestat->fCurFile->SetDone();
1027 dset->SetBit(TDSet::kSomeInvalid);
1030 if ( e->GetNum() == -1 ) {
1031 e->SetNum( entries - e->GetFirst() );
1032 }
else if ( e->GetFirst() + e->GetNum() > entries ) {
1033 Error(
"ValidateFiles",
1034 "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1035 e->GetNum(), e->GetFirst(), entries, e->GetFileName());
1036 e->SetNum(entries - e->GetFirst());
1046 gProof->SendDataSetStatus(msg, n, tot, st);
// No entries could be obtained: report locally and through the server socket,
// and flag the data set as partly invalid.
1050 Error(
"ValidateFiles",
"cannot get entries for %s (", e->GetFileName() );
1056 TMessage m(kPROOF_MESSAGE);
1057 m << TString(Form(
"Cannot get entries for file: %s - skipping", e->GetFileName()));
1058 gProofServ->GetSocket()->Send(m);
1063 dset->SetBit(TDSet::kSomeInvalid);
1065 PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
// The statement controlled by this condition is not visible; presumably it
// makes the worker available again - confirm.
1068 if (maxent < 0 || ((totent < maxent) && !byfile))
// Restore asynchronous input and unpublish the local monitor.
1074 ((TProof*)gProof)->ActivateAsyncInput();
1077 ((TProof*)gProof)->fCurrentMonitor = 0;
// Convert the per-element entry counts stored as offsets into cumulative
// offsets; the update of offset from newOffset is on a line not visible here.
1085 Long64_t offset = 0;
1086 Long64_t newOffset = 0;
1087 TIter next(dset->GetListOfElements());
1089 while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
1090 newOffset = offset + el->GetTDSetOffset();
1091 el->SetTDSetOffset(offset);
// Returns the number of entries processed by the given worker.
// Returns 0 when the worker map does not exist or holds no record for slave.
1099 Long64_t TPacketizer::GetEntriesProcessed(TSlave *slave)
const
1101 if ( fSlaveStats == 0 )
return 0;
1103 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1105 if ( slstat == 0 )
return 0;
1107 return slstat->GetEntriesProcessed();
// Sums the current processing rate over the registered workers.
// Only workers that have a progress status and have processed at least one
// entry contribute to the sum.
// NOTE(review): how the all output argument is set, and the return statement,
// are on lines not visible in this excerpt.
1114 Float_t TPacketizer::GetCurrentRate(Bool_t &all)
1118 Float_t currate = 0.;
1119 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
1120 TIter nxw(fSlaveStats);
1122 while ((key = nxw()) != 0) {
1123 TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1124 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1126 currate += slstat->GetProgressStatus()->GetCurrentRate();
// Produces the next packet for worker sl; r is the message carrying the
// worker report on the previous packet.
// Visible flow:
//   1. if the worker had a packet, account for it: read the processing
//      figures from r and update the per-worker and the global progress status;
//   2. release the current file if it is done, then pick a file: first an
//      unallocated one from the worker node, then any unallocated one, then
//      an active one; null is returned when none is left;
//   3. cut the next range of entries from that file and return it as a new
//      TDSetElement, which is also stored in the worker fCurElem.
// NOTE(review): many lines (guards, else branches, declarations such as totev)
// are missing from this excerpt.
1139 TDSetElement *TPacketizer::GetNextPacket(TSlave *sl, TMessage *r)
1147 TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
// An unknown worker is treated as a programming error.
1149 R__ASSERT( slstat != 0 );
1152 Info("GetNextPacket","worker-%s (%s)", sl->GetOrdinal(), sl->GetName());
1155 Bool_t firstPacket = kFALSE;
// Step 1: account for the previous packet, if there was one.
1156 if ( slstat->fCurElem != 0 ) {
1157 Double_t latency = 0., proctime = 0., proccpu = 0.;
1158 Long64_t bytesRead = -1;
1159 Long64_t totalEntries = -1;
1161 Long64_t numev = slstat->fCurElem->GetNum();
// The finished packet is kept in fPackets, which owns its elements.
1163 fPackets->Add(slstat->fCurElem);
// Protocol above 18: the report arrives as a TProofProgressStatus object.
1165 if (sl->GetProtocol() > 18) {
1166 TProofProgressStatus *status = 0;
1171 TProofProgressStatus *progress = 0;
1174 numev = status->GetEntries() - slstat->GetEntriesProcessed();
1175 progress = slstat->AddProcessed(status);
1178 proctime = progress->GetProcTime();
1179 proccpu = progress->GetCPUTime();
1180 totev = status->GetEntries();
1181 bytesRead = progress->GetBytesRead();
1186 Error(
"GetNextPacket",
"no status came in the kPROOF_GETPACKET message");
// Older protocol: plain fields, the later ones optional and read only when
// data remains in the buffer.
1189 (*r) >> latency >> proctime >> proccpu;
1192 if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
1193 if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
1194 if (r->BufferSize() > r->Length()) (*r) >> totev;
1196 numev = totev - slstat->GetEntriesProcessed();
1197 if (numev > 0) slstat->GetProgressStatus()->IncEntries(numev);
1198 if (bytesRead > 0) slstat->GetProgressStatus()->IncBytesRead(bytesRead);
1199 if (numev > 0 || bytesRead > 0) slstat->GetProgressStatus()->SetLastUpdate();
// Mirror the increments into the global progress status.
1202 if (fProgressStatus) {
1203 if (numev > 0) fProgressStatus->IncEntries(numev);
1204 if (bytesRead > 0) fProgressStatus->IncBytesRead(bytesRead);
1205 if (numev > 0 || bytesRead > 0) fProgressStatus->SetLastUpdate();
1208 Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
1209 sl->GetOrdinal(), sl->GetName(),
1210 numev, latency, proctime, proccpu, bytesRead);
1213 gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), slstat->fCurElem->GetFileName(),
1214 numev, latency, proctime, proccpu, bytesRead);
1216 slstat->fCurElem = 0;
// All entries accounted for: the progress object is released.
1217 if (fProgressStatus && fProgressStatus->GetEntries() == fTotalEntries) {
1219 delete fProgress; fProgress = 0;
// No previous packet: this is the first request of the worker.
1222 firstPacket = kTRUE;
// Step 2: release a finished file and choose the next one.
1232 TFileStat *file = slstat->fCurFile;
1234 if ( file != 0 && file->IsDone() ) {
1235 file->GetNode()->DecSlaveCnt(slstat->GetName());
1237 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
1238 file->GetElement()->GetFileName(), kFALSE);
1242 slstat->fCurFile = file;
1247 if (slstat->GetFileNode() != 0) {
1248 file = GetNextUnAlloc(slstat->GetFileNode());
1250 slstat->SetFileNode(0);
1256 file = GetNextUnAlloc();
1261 file = GetNextActive();
// Nothing left to process for this worker.
1264 if (!file)
return 0;
1266 slstat->fCurFile = file;
1267 file->GetNode()->IncSlaveCnt(slstat->GetName());
1269 gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
1270 file->GetNode()->GetName(),
1271 file->GetElement()->GetFileName(), kTRUE);
// Step 3: cut the packet. The size is the base packet size scaled by the
// worker performance index relative to fMaxPerfIdx, with a minimum of one.
// NOTE(review): fMaxPerfIdx equal to zero would divide by zero - confirm it is
// always positive here.
1276 TDSetElement *base = file->GetElement();
1277 Long64_t num = Long64_t(fPacketSize*(Float_t)slstat->fSlave->GetPerfIdx()/fMaxPerfIdx);
1278 if (num < 1) num = 1;
1280 Long64_t first = file->GetNextEntry();
1281 Long64_t last = base->GetFirst() + base->GetNum();
// The packet would reach the end of the file; the handling of this case is on
// lines not visible here.
1283 if ( first + num >= last ) {
1291 file->MoveNextEntry(num);
1295 slstat->fCurElem = CreateNewPacket(base, first, num);
1296 if (base->GetEntryList())
1297 slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
// The kNewRun bit is set or reset on the packet; the selecting condition is
// not visible (presumably firstPacket - confirm).
1301 slstat->fCurElem->SetBit(TDSetElement::kNewRun);
1303 slstat->fCurElem->ResetBit(TDSetElement::kNewRun);
1306 Info("GetNextPacket","%s: %s %lld %lld", sl->GetOrdinal(), base->GetFileName(), first, num);
1308 return slstat->fCurElem;
1314 Int_t TPacketizer::GetActiveWorkers()
1317 TIter nxw(fSlaveStats);
1319 while ((key = nxw())) {
1320 TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1321 if (wrkstat && wrkstat->fCurFile) actw++;