/// Per-worker bookkeeping record used by TPacketizerFile; it extends
/// TVirtualPacketizer::TVirtualSlaveStat.
/// NOTE(review): only part of the class is visible in this listing (access
/// specifiers, further data members such as the ones initialised by the
/// constructor, and the closing brace are missing); the comments below cover
/// the visible declarations only.
52 class TPacketizerFile::TSlaveStat :
public TVirtualPacketizer::TVirtualSlaveStat {
// The enclosing packetizer is declared a friend of this class.
54 friend class TPacketizerFile;
// presumably the number of entries processed for the most recent packet -- TODO confirm
57 Long64_t fLastProcessed;
// Initialised to 0 by the constructor; presumably a stopwatch reading -- TODO confirm
59 Double_t fTimeInstant;
// Builds the record for worker `sl`; the visible implementation reads the
// "PROOF_TPacketizerFileCircularity" parameter from `input`.
64 TSlaveStat(TSlave *sl, TList *input);
// NOTE(review): no definition of this member is visible in this listing.
67 void GetCurrentTime();
// Recomputes the stored speed from the circular ntuple, given the time `time`.
69 void UpdatePerformance(Double_t time);
// Folds the status `st` reported by the worker into this record; the visible
// implementation builds a new TProofProgressStatus holding (*st - *fStatus).
70 TProofProgressStatus *AddProcessed(TProofProgressStatus *st);
/// Named wrapper around a TIter, derived from TObject so that it can be stored
/// in a ROOT collection.
/// NOTE(review): the declarations of fName / fIter, the access specifiers and
/// the closing brace are not visible in this listing.
74 class TPacketizerFile::TIterObj :
public TObject {
// Stores the name `n` and the iterator pointer `iter` as given.
81 TIterObj(
const char *n, TIter *iter) : fName(n), fIter(iter) { }
// The wrapped iterator is deleted together with this object, i.e. the wrapper
// owns the TIter passed to the constructor.
82 virtual ~TIterObj() {
if (fIter)
delete fIter; }
// Returns the stored name (presumably what name-based FindObject() lookups on
// the containing list match against -- TODO confirm).
84 const char *GetName()
const {
return fName;}
// Returns the wrapped iterator; ownership stays with this object.
85 TIter *GetIter()
const {
return fIter;}
// Print override; the option argument defaults to the empty string.
86 void Print(Option_t* option =
"")
const;
// ROOT ClassImp macro invoked for TPacketizerFile (presumably registers the
// class implementation with ROOT's class system -- confirm against Rtypes.h).
89 ClassImp(TPacketizerFile);
/// Constructor: builds the per-worker statistics and the per-host file
/// iterators from the objects found in the `input` list.
///
/// \param workers list of workers (presumably the collection iterated through
///                `si` below -- the iterator declaration is not visible)
/// \param         second argument (Long64_t) is unnamed and therefore unused
/// \param input   input list; must be non-empty and contain a TMap named
///                "PROOF_FilesToProcess"
/// \param st      progress status forwarded to the TVirtualPacketizer base
///
/// On any configuration error the TObject::kInvalidObject bit is set on this
/// object after an Error() message.
/// NOTE(review): this listing omits several original lines (braces, early
/// returns, declarations of `nodes`, `si`, `wrk`, `nxl`, allocation of
/// `fIters`); comments describe only the visible statements.
94 TPacketizerFile::TPacketizerFile(TList *workers, Long64_t, TList *input,
95 TProofProgressStatus *st)
96 : TVirtualPacketizer(input, st)
98 PDB(kPacketizer,1) Info("TPacketizerFile", "enter");
// Start from a "valid" state: clear the invalid-object marker.
99 ResetBit(TObject::kInvalidObject);
// Defaults: files not assigned to a worker are processed; TFileInfo objects
// are not attached to packets.
102 fProcNotAssigned = kTRUE;
103 fAddFileInfo = kFALSE;
// A missing or empty input list is a fatal configuration error.
105 if (!input || (input && input->GetSize() <= 0)) {
106 Error(
"TPacketizerFile",
"input file is undefined or empty!");
107 SetBit(TObject::kInvalidObject);
// Optional parameter: a value of 0 disables processing of non-assigned files.
112 Int_t procnotass = 1;
113 if (TProof::GetParameter(input,
"PROOF_ProcessNotAssigned", procnotass) == 0) {
114 if (procnotass == 0) {
115 Info(
"TPacketizerFile",
"files not assigned to workers will not be processed");
116 fProcNotAssigned = kFALSE;
// Optional parameter: a value of 1 asks for the TFileInfo to be attached to
// each packet as an associated object.
121 Int_t addfileinfo = 0;
122 if (TProof::GetParameter(input,
"PROOF_IncludeFileInfoInPacket", addfileinfo) == 0) {
123 if (addfileinfo == 1) {
124 Info(
"TPacketizerFile",
125 "TFileInfo object will be included in the packet as associated object");
126 fAddFileInfo = kTRUE;
// The map of files to process is mandatory; it is looked up by name and must
// be a TMap.
131 if (!(fFiles = dynamic_cast<TMap *>(input->FindObject(
"PROOF_FilesToProcess")))) {
132 Error(
"TPacketizerFile",
"map of files to be processed/created not found");
133 SetBit(TObject::kInvalidObject);
// Worker -> TSlaveStat map; created non-owning (SetOwner(kFALSE)).
138 fSlaveStats =
new TMap;
139 fSlaveStats->SetOwner(kFALSE);
// `nodes` collects the distinct worker host names and owns its TObjStrings.
142 nodes.SetOwner(kTRUE);
// One TSlaveStat per worker; each worker's fully-qualified host name is
// recorded once in `nodes`.
145 while ((wrk = (TSlave *) si.Next())) {
146 fSlaveStats->Add(wrk,
new TSlaveStat(wrk, input));
147 TString wrkname = TUrl(wrk->GetName()).GetHostFQDN();
148 Info(
"TPacketizerFile",
"worker: %s", wrkname.Data());
149 if (!nodes.FindObject(wrkname)) nodes.Add(
new TObjString(wrkname));
// The list of iterators owns its TIterObj entries.
154 fIters->SetOwner(kTRUE);
// List collecting the files whose host matches no worker; it is named "*",
// the same name used for its iterator entry below.
158 fNotAssigned =
new TList;
159 fNotAssigned->SetName(
"*");
// Walk the keys of the file map; each value is expected to be either a
// THashList or a TFileCollection (whose list is then used).
161 TObject *key, *o = 0;
162 while ((key = nxl()) != 0) {
163 THashList *wrklist =
dynamic_cast<THashList *
>(fFiles->GetValue(key));
165 TFileCollection *fc =
dynamic_cast<TFileCollection *
>(fFiles->GetValue(key));
166 if (fc) wrklist = fc->GetList();
// Keys are resolved to a fully-qualified host name and matched against the
// worker hosts collected above.
169 TString hname = TUrl(key->GetName()).GetHostFQDN();
170 if ((o = nodes.FindObject(hname))) {
// Host has a worker: count its files and register an iterator under the host name.
171 fTotalEntries += wrklist->GetSize();
172 fIters->Add(
new TIterObj(hname,
new TIter(wrklist)));
175 Info("TPacketizerFile", "%d files of '%s' (fqdn: '%s') assigned to '%s'",
176 wrklist->GetSize(), key->GetName(), hname.Data(), o->GetName());
// Otherwise objects are appended to the non-assigned list (the surrounding
// loop that sets `o` here is not visible -- TODO confirm).
182 fNotAssigned->Add(o);
185 Info("TPacketizerFile", "%d files of '%s' (fqdn: '%s') not assigned",
186 wrklist->GetSize(), key->GetName(), hname.Data());
// Non-assigned files are counted too and get a shared iterator named "*".
190 if (fNotAssigned && fNotAssigned->GetSize() > 0) {
191 fTotalEntries += fNotAssigned->GetSize();
192 fIters->Add(
new TIterObj(
"*",
new TIter(fNotAssigned)));
193 Info(
"TPacketizerFile",
"non-assigned files: %d", fNotAssigned->GetSize());
194 fNotAssigned->Print();
// Nothing to process at all is treated as an error.
196 if (fTotalEntries <= 0) {
197 Error(
"TPacketizerFile",
"no file path in the map!");
198 SetBit(TObject::kInvalidObject);
202 Info(
"TPacketizerFile",
"processing %lld files", fTotalEntries);
// Stopwatch later read by GetCurrentTime().
206 fStopwatch =
new TStopwatch();
209 PDB(kPacketizer,1) Info("TPacketizerFile", "return");
/// Destructor: releases the helper objects allocated by the constructor.
/// NOTE(review): original line 223 is not visible here (presumably the
/// deletion of fIters -- TODO confirm).
218 TPacketizerFile::~TPacketizerFile()
// The non-assigned list is made non-owning before deletion, so the objects it
// references are not deleted along with the list.
220 if (fNotAssigned) fNotAssigned->SetOwner(kFALSE);
221 SafeDelete(fNotAssigned);
// The iterator list is made owning, so deleting it also deletes its entries.
222 if (fIters) fIters->SetOwner(kTRUE);
224 SafeDelete(fStopwatch);
/// Reads the real time accumulated by the packetizer stopwatch.
/// NOTE(review): the return statement is not visible in this listing;
/// presumably `retValue` is returned.
230 Double_t TPacketizerFile::GetCurrentTime()
232 Double_t retValue = fStopwatch->RealTime();
// Continue() is called right after the read -- presumably because RealTime()
// stops the stopwatch; confirm against the TStopwatch documentation.
233 fStopwatch->Continue();
/// Sums the current processing rate over the workers.
///
/// \param all output flag; how it is set is not visible in this listing
///            (presumably whether every worker contributed -- TODO confirm)
/// \return presumably the accumulated `currate` (return statement not visible)
241 Float_t TPacketizerFile::GetCurrentRate(Bool_t &all)
245 Float_t currate = 0.;
246 if (fSlaveStats && fSlaveStats->GetSize() > 0) {
247 TIter nxw(fSlaveStats);
// Iterate over the map keys and fetch the TSlaveStat stored for each.
249 while ((key = nxw()) != 0) {
250 TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
// Only workers that have a progress status and have processed at least one
// entry contribute to the sum.
251 if (wrkstat && wrkstat->GetProgressStatus() && wrkstat->GetEntriesProcessed() > 0) {
253 currate += wrkstat->GetProgressStatus()->GetCurrentRate();
/// Serves the next file to worker `wrk`, after folding the progress
/// information carried by message `r` into the per-worker and global status.
///
/// \param wrk worker asking for work
/// \param r   message received from the worker with its processing figures
/// \return a null pointer when the packetizer is not valid or no further file
///         is available; otherwise presumably the newly created TDSetElement
///         (the final return statement is not visible in this listing)
///
/// NOTE(review): many original lines (conditions, declarations of `numev`,
/// `totev`, `filename`, `os`, `fi`, closing braces) are missing from this
/// listing; comments describe only the visible statements.
266 TDSetElement *TPacketizerFile::GetNextPacket(TSlave *wrk, TMessage *r)
268 TDSetElement *elem = 0;
// Nothing is served when the packetizer is flagged as not valid.
269 if (!fValid)
return elem;
// Look up the statistics record registered for this worker.
272 TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(wrk);
274 Error(
"GetNextPacket",
"could not find stat object for worker '%s'!", wrk->GetName());
279 Info("GetNextPacket","worker-%s: fAssigned %lld / %lld", wrk->GetOrdinal(), fAssigned, fTotalEntries);
// Figures reported by the worker; bytesRead / totalEntries start at -1.
282 Double_t latency = 0., proctime = 0., proccpu = 0.;
283 Long64_t bytesRead = -1;
284 Long64_t totalEntries = -1;
// Workers with protocol > 18 take the TProofProgressStatus-based path
// (the statement filling `status` is not visible).
288 TProofProgressStatus *status = 0;
289 if (wrk->GetProtocol() > 18) {
294 TProofProgressStatus *progress = 0;
// Entries processed since the previous report, then merge the status into the
// worker record; `progress` carries the resulting difference.
297 numev = status->GetEntries() - wrkstat->GetEntriesProcessed();
298 progress = wrkstat->AddProcessed(status);
301 proctime = progress->GetProcTime();
302 proccpu = progress->GetCPUTime();
303 totev = status->GetEntries();
304 bytesRead = progress->GetBytesRead();
309 Error(
"GetNextPacket",
"no status came in the kPROOF_GETPACKET message");
// Other branch (presumably older protocols): the figures are streamed
// individually from the message.
312 (*r) >> latency >> proctime >> proccpu;
// Further fields are read only while the buffer size exceeds the current
// read position.
315 if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
316 if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
317 if (r->BufferSize() > r->Length()) (*r) >> totev;
319 numev = totev - wrkstat->GetEntriesProcessed();
320 wrkstat->GetProgressStatus()->IncEntries(numev);
321 wrkstat->GetProgressStatus()->SetLastUpdate();
// Update the global progress with the newly processed entries.
324 fProgressStatus->IncEntries(numev);
325 fProgressStatus->SetLastUpdate();
328 Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
329 wrk->GetOrdinal(), wrk->GetName(),
330 numev, latency, proctime, proccpu, bytesRead);
// Record the packet in the performance statistics when they are enabled.
332 if (gPerfStats != 0) {
333 gPerfStats->PacketEvent(wrk->GetOrdinal(), wrk->GetName(),
"", numev,
334 latency, proctime, proccpu, bytesRead);
// Everything has been assigned already (the body of this branch is not visible).
337 if (fAssigned == fTotalEntries) {
350 Info("GetNextPacket", "worker-%s (%s): getting next files ... ", wrk->GetOrdinal(),
354 TObject *nextfile = 0;
// First choice: the iterator registered under the worker's fully-qualified
// host name.
357 TString wrkname = TUrl(wrk->GetName()).GetHostFQDN();
358 TIterObj *io = dynamic_cast<TIterObj *>(fIters->FindObject(wrkname));
362 nextfile = io->GetIter()->Next();
// Fallback: the shared "*" iterator over non-assigned files, only if
// processing of non-assigned files is enabled.
367 if (!nextfile && fProcNotAssigned) {
368 if ((io = dynamic_cast<TIterObj *>(fIters->FindObject(
"*")))) {
371 nextfile = io->GetIter()->Next();
// No more files for this worker.
376 if (!nextfile)
return elem;
// The list entry may be a TObjString (its name is the file name) or a
// TFileInfo (its current URL is used).
382 if ((os = dynamic_cast<TObjString *>(nextfile))) {
383 filename = os->GetName();
385 if ((fi = dynamic_cast<TFileInfo *>(nextfile)))
386 filename = fi->GetCurrentUrl()->GetUrl();
// Any other object type leaves the file name empty and is reported.
389 if (filename.IsNull()) {
390 Warning(
"GetNextPacket",
"found unsupported object of type '%s' in list: it must"
391 " be 'TObjString' or 'TFileInfo'", nextfile->ClassName());
396 Info("GetNextPacket", "worker-%s: assigning: '%s' (remaining %lld files)",
397 wrk->GetOrdinal(), filename.Data(), (fTotalEntries - fAssigned));
// Build the packet for this file and flag it with TDSetElement::kEmpty.
398 elem = new TDSetElement(filename, "", "", 0, 1);
399 elem->SetBit(TDSetElement::kEmpty);
// Optionally attach the TFileInfo as an associated object of the packet.
402 if (fAddFileInfo && fi) {
403 elem->AddAssocObj(fi);
404 PDB(kPacketizer,2) fi->Print("L");
/// Constructor: sets up the circular ntuple used for the speed estimate and
/// the progress-status object of this worker record.
///
/// \param slave worker this record refers to (its use is on a line that is
///              not visible in this listing)
/// \param input input list, searched for the optional
///              "PROOF_TPacketizerFileCircularity" parameter
418 TPacketizerFile::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
// Speed and time start at 0; the circularity level defaults to 5.
420 fSpeed(0), fTimeInstant(0), fCircLvl(5)
// Ntuple with columns "tm" and "ev".
423 fCircNtp =
new TNtupleD(
"Speed Circ Ntp",
"Circular process info",
"tm:ev");
// The circularity level may be overridden from the input list; non-positive
// values fall back to 5.
424 TProof::GetParameter(input,
"PROOF_TPacketizerFileCircularity", fCircLvl);
425 fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
426 fCircNtp->SetCircular(fCircLvl);
// Progress status accumulated for this worker.
428 fStatus =
new TProofProgressStatus();
/// Destructor: deletes the circular ntuple allocated by the constructor.
434 TPacketizerFile::TSlaveStat::~TSlaveStat()
436 SafeDelete(fCircNtp);
/// Updates the stored speed (fSpeed) from the circular ntuple.
///
/// \param time time value to record together with the current number of
///             processed entries
///
/// NOTE(review): the conditions guarding the Fill()/GetEntry() calls are not
/// visible in this listing; comments describe the visible statements only.
442 void TPacketizerFile::TSlaveStat::UpdatePerformance(Double_t time)
444 Double_t ttot = time;
// `ar` is the ntuple's argument buffer: ar[0] is used as a time and ar[1] as
// an entry count below (the ntuple is created with columns "tm:ev").
445 Double_t *ar = fCircNtp->GetArgs();
446 Int_t ne = fCircNtp->GetEntries();
// Seed entry with zero time and zero entries (presumably when the ntuple is
// still empty -- TODO confirm).
449 fCircNtp->Fill(0., 0);
// Load the most recent entry into `ar`.
454 fCircNtp->GetEntry(ne-1);
// Record the current point.
456 fCircNtp->Fill(ttot, GetEntriesProcessed());
// Load the first entry into `ar` and compute the speed relative to it.
459 fCircNtp->GetEntry(0);
// If the time did not advance past the reference, `ne+1` is used as the
// interval instead of the difference.
460 Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
461 Long64_t nevts = GetEntriesProcessed() - (Long64_t)ar[1];
462 fSpeed = nevts / dtime;
464 Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
465 time, dtime, nevts, fSpeed);
/// Folds the status `st` reported by the worker into this record.
///
/// \param st status received from the worker
/// \return presumably `diff`, a newly allocated TProofProgressStatus holding
///         (*st - *fStatus); the return statement is not visible, and the
///         caller presumably takes ownership -- TODO confirm
///
/// NOTE(review): the null check on `st` that selects between the update path
/// and the Error() call is not visible in this listing.
473 TProofProgressStatus *TPacketizerFile::TSlaveStat::AddProcessed(TProofProgressStatus *st)
// Entries reported since the status stored in this record.
477 Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
// Reset the last-processing-time of the stored status before the subtraction.
479 fStatus->SetLastProcTime(0.);
// Difference between the incoming status and the stored one.
481 TProofProgressStatus *diff =
new TProofProgressStatus(*st - *fStatus);
484 fStatus->SetLastEntries(lastEntries);
487 Error(
"AddProcessed",
"status arg undefined");
495 void TPacketizerFile::TIterObj::Print(Option_t *)
const
497 Printf(
"Iterator '%s' controls %d units", GetName(),
498 ((GetIter() && GetIter()->GetCollection()) ? GetIter()->GetCollection()->GetSize()