53 using namespace TMath;
/// Per-worker bookkeeping used by TPacketizerUnit to size packets.
/// NOTE(review): this listing is a sparse extract: original line numbers are fused at
/// the start of lines and many lines (including the closing brace and members such as
/// fRate, fCircNtp and fCircLvl used further below) are not visible here.
64 class TPacketizerUnit::TSlaveStat :
public TVirtualPacketizer::TVirtualSlaveStat {
66 friend class TPacketizerUnit;
// Size of the last packet handed to this worker (set from fProcessing in GetNextPacket).
69 Long64_t fLastProcessed;
// Packetizer time (GetCurrentTime()) at which the last packet was handed out.
71 Double_t fTimeInstant;
76 TSlaveStat(TSlave *sl, TList *input);
// Records the worker's progress in the circular ntuple and refreshes fRate.
81 void UpdatePerformance(Double_t time);
// Returns a newly allocated status holding the increment since the last report.
82 TProofProgressStatus *AddProcessed(TProofProgressStatus *st);
/// Construct the statistics holder for one worker.
/// Creates a circular TNtupleD with columns "tm:ev" (time, entries processed).
/// Its depth defaults to 5 and can be overridden with the input parameter
/// "PROOF_TPacketizerUnitCircularity"; non-positive values fall back to 5.
/// Also allocates a fresh TProofProgressStatus in fStatus.
90 TPacketizerUnit::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
92 fRate(0), fTimeInstant(0), fCircLvl(5)
95 fCircNtp =
new TNtupleD(
"Speed Circ Ntp",
"Circular process info",
"tm:ev");
// Detach from any ROOT directory: the ntuple is deleted explicitly in ~TSlaveStat.
96 fCircNtp->SetDirectory(0);
// Optional user override of the circular buffer depth.
97 TProof::GetParameter(input,
"PROOF_TPacketizerUnitCircularity", fCircLvl);
98 fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
99 fCircNtp->SetCircular(fCircLvl);
// NOTE(review): the visible destructor lines only delete fCircNtp; ownership of
// fStatus is presumably handled by the base class - confirm to rule out a leak.
101 fStatus =
new TProofProgressStatus();
/// Destructor: releases the circular speed ntuple created in the constructor.
107 TPacketizerUnit::TSlaveStat::~TSlaveStat()
109 SafeDelete(fCircNtp);
/// Append a (time, entries processed) sample to the circular ntuple and update fRate.
/// fRate is the number of entries processed since the oldest retained sample,
/// divided by the time elapsed since that sample.
/// GetNextPacket calls this with 0. for the calibration packet and with the
/// reported processing time otherwise.
115 void TPacketizerUnit::TSlaveStat::UpdatePerformance(Double_t time)
117 Double_t ttot = time;
// ar points to the ntuple's argument buffer: after GetEntry(i), ar[0]=tm and ar[1]=ev.
118 Double_t *ar = fCircNtp->GetArgs();
119 Int_t ne = fCircNtp->GetEntries();
// Reference sample; presumably only written when the ntuple is empty (the enclosing
// condition, lines 120-121 of the original, is not visible here).
122 fCircNtp->Fill(0., 0);
// Load the most recent sample; line 128 (not visible) may derive ttot from it.
127 fCircNtp->GetEntry(ne-1);
129 fCircNtp->Fill(ttot, GetEntriesProcessed());
// Load the oldest retained sample to compute the rate over the circular window.
132 fCircNtp->GetEntry(0);
// NOTE(review): when time did not advance, the fallback denominator is ne+1, an entry
// count rather than a time interval - confirm this is intended.
133 Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
134 Long64_t nevts = GetEntriesProcessed() - (Long64_t)ar[1];
135 fRate = nevts / dtime;
137 Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
138 time, dtime, nevts, fRate);
/// Compute the progress increment between the worker-reported cumulative status `st`
/// and the stored fStatus.
/// Returns a newly allocated TProofProgressStatus (`*st - *fStatus`); the caller
/// owns it and must delete it.
/// NOTE(review): the line that copies `st` into fStatus is not visible in this
/// extract; presumably it sits between the lines shown - confirm.
146 TProofProgressStatus *TPacketizerUnit::TSlaveStat::AddProcessed(TProofProgressStatus *st)
// Entries processed since the previous report.
150 Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
152 fStatus->SetLastProcTime(0.);
154 TProofProgressStatus *diff =
new TProofProgressStatus(*st - *fStatus);
157 fStatus->SetLastEntries(lastEntries);
// Presumably reached when `st` is null (the enclosing branch is not visible).
160 Error(
"AddProcessed",
"status arg undefined");
// ROOT ClassImp registration for TPacketizerUnit.
167 ClassImp(TPacketizerUnit);
/// Construct a cycle-based packetizer.
/// - Reads its configuration from `input`: PROOF_PacketizerFixedNum,
///   PROOF_PacketizerCalibFrac, the deprecated PROOF_PacketizerTimeLimit, and
///   PROOF_MinPacketTime.
/// - Creates one TSlaveStat for every worker in `slaves` with GetParallel() > 0.
///   The others are recorded in fWrkExcluded.
/// - Assigns `num` cycles if num > 0.
172 TPacketizerUnit::TPacketizerUnit(TList *slaves, Long64_t num, TList *input,
173 TProofProgressStatus *st)
174 : TVirtualPacketizer(input, st)
176 PDB(kPacketizer,1) Info("TPacketizerUnit", "enter (num %lld)", num);
// Fixed-number mode: a missing or non-positive PROOF_PacketizerFixedNum presumably
// disables it (the branch bodies, lines 186-188, are not visible).
185 if (TProof::GetParameter(input, "PROOF_PacketizerFixedNum", fixednum) != 0 || fixednum <= 0) {
189 Info(
"TPacketizerUnit",
"forcing the same cycles on each worker");
// Calibration fraction of the per-worker average; the default applied on lines
// 195-196 is not visible here.
194 if (TProof::GetParameter(input,
"PROOF_PacketizerCalibFrac", fCalibFrac) != 0 || fCalibFrac <= 0)
// NOTE(review): fCalibFrac is a fraction: GetNextPacket multiplies it by the average
// and logs fCalibFrac * 100. Here it is printed unscaled next to "%%", so this message
// under-reports the percentage by 100x.
197 Info("TPacketizerUnit", "size of the calibration packets: %.2f %% of average number per worker", fCalibFrac);
// Deprecated alias that overrides fMaxPacketTime when present.
200 Double_t timeLimit = -1;
201 if (TProof::GetParameter(input, "PROOF_PacketizerTimeLimit", timeLimit) == 0) {
202 fMaxPacketTime = timeLimit;
203 Warning(
"TPacketizerUnit",
"PROOF_PacketizerTimeLimit is deprecated: use PROOF_MaxPacketTime instead");
206 Info("TPacketizerUnit", "time limit is %lf", fMaxPacketTime);
// Minimum packet time, mirrored into fConfigParams: update the existing parameter if
// found, otherwise add one. The null check on mpt (line 213) is not visible here.
210 Double_t minPacketTime = 0;
211 if (TProof::GetParameter(input, "PROOF_MinPacketTime", minPacketTime) == 0) fMinPacketTime = minPacketTime;
212 TParameter<Double_t> *mpt = (TParameter<Double_t> *) fConfigParams->FindObject("PROOF_MinPacketTime");
214 mpt->SetVal(fMinPacketTime);
216 fConfigParams->Add(
new TParameter<Double_t>(
"PROOF_MinPacketTime", fMinPacketTime));
223 fStopwatch =
new TStopwatch();
225 fPackets =
new TList;
226 fPackets->SetOwner();
// The map does not own its entries; the TSlaveStat values are deleted explicitly in
// the destructor via DeleteValues().
228 fWrkStats =
new TMap;
229 fWrkStats->SetOwner(kFALSE);
// Only workers with at least one active parallel slot take part in the distribution.
234 while ((slave = (TSlave*) si.Next())) {
235 if (slave->GetParallel() > 0) {
236 fWrkStats->Add(slave,
new TSlaveStat(slave, input));
// Presumably created lazily on the first excluded worker (guard on line 238 not visible).
239 fWrkExcluded =
new TList;
240 fWrkExcluded->SetOwner(kFALSE);
243 Info("TPacketizerUnit", "node '%s' has NO active worker: excluded from work distribution", slave->GetOrdinal());
244 fWrkExcluded->Add(slave);
// AssignWork returns 0 on success.
250 if (num > 0 && AssignWork(0,0,num) != 0)
251 Warning("TPacketizerUnit", "some problems assigning work");
255 fConfigParams->Add(new TParameter<Float_t>("PROOF_PacketizerCalibFrac", fCalibFrac));
258 PDB(kPacketizer,1) Info("TPacketizerUnit", "return");
/// Add `num` cycles to the total to be processed.
/// The TDSet and first-entry arguments are unused.
/// A negative `num` is reported as a protocol error.
/// In fixed-number mode, recomputes fNumPerWorker as total / workers (at least 1)
/// and publishes it as PROOF_PacketizerFixedNum in fConfigParams.
/// Callers treat a return value of 0 as success.
264 Int_t TPacketizerUnit::AssignWork(TDSet *, Long64_t, Long64_t num)
267 Error(
"AssignWork",
"assigned a negative number (%lld) of cycles - protocol error?", num);
271 fTotalEntries += num;
273 Info("AssignWork", "assigned %lld additional cycles (new total: %lld)", num, fTotalEntries);
276 if (fFixedNum && fWrkStats->GetSize() > 0) {
// Integer division: any remainder (fTotalEntries % workers) is not part of fNumPerWorker.
278 fNumPerWorker = fTotalEntries / fWrkStats->GetSize();
279 if (fNumPerWorker == 0) fNumPerWorker = 1;
// Update the published value in place if present, otherwise add it (the null check
// on fn is not visible here).
284 TParameter<Long64_t> *fn =
285 (TParameter<Long64_t> *) fConfigParams->FindObject(
"PROOF_PacketizerFixedNum");
287 fn->SetVal(fNumPerWorker);
289 fConfigParams->Add(
new TParameter<Long64_t>(
"PROOF_PacketizerFixedNum", fNumPerWorker));
/// Destructor.
/// fWrkStats does not own its entries (SetOwner(kFALSE)), so the TSlaveStat values
/// are deleted explicitly before the map itself. The worker keys are not deleted.
299 TPacketizerUnit::~TPacketizerUnit()
302 fWrkStats->DeleteValues();
303 SafeDelete(fWrkStats);
304 SafeDelete(fWrkExcluded);
305 SafeDelete(fPackets);
306 SafeDelete(fStopwatch);
/// Return the real time elapsed on the packetizer stopwatch, in seconds.
/// TStopwatch::RealTime() stops the watch, so Continue() resumes it without resetting.
312 Double_t TPacketizerUnit::GetCurrentTime()
314 Double_t retValue = fStopwatch->RealTime();
315 fStopwatch->Continue();
/// Sum the current processing rates of all workers that have processed at least one
/// entry.
/// NOTE(review): how `all` is set is not visible in this extract; presumably it
/// reports whether every worker contributed - confirm.
323 Float_t TPacketizerUnit::GetCurrentRate(Bool_t &all)
327 Float_t currate = 0.;
328 if (fWrkStats && fWrkStats->GetSize() > 0) {
329 TIter nxw(fWrkStats);
331 while ((key = nxw()) != 0) {
332 TSlaveStat *slstat = (TSlaveStat *) fWrkStats->GetValue(key);
// Workers that have not processed anything yet are skipped.
333 if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
335 currate += slstat->GetProgressStatus()->GetCurrentRate();
/// Handle a packet request from worker `sl`, whose progress report is in message `r`.
/// Steps:
/// 1. Update the worker and global progress from the report.
/// 2. Check whether this worker or the whole query is done.
/// 3. Size the next packet:
///    - first request from this worker: a calibration packet (fCalibFrac of the
///      per-worker average);
///    - fNumPerWorker < 0: the worker's share of the remaining cycles, proportional
///      to its measured rate and clamped by fMaxPacketTime / fMinPacketTime;
///    - otherwise (fixed-number mode): the per-worker remainder.
/// Returns a new TDSetElement flagged kEmpty covering fProcessing cycles starting at
/// fAssigned, or 0 when no worker has a usable rate.
348 TDSetElement *TPacketizerUnit::GetNextPacket(TSlave *sl, TMessage *r)
354 TSlaveStat *slstat = (TSlaveStat*) fWrkStats->GetValue(sl);
356 Warning(
"GetNextPacket",
"Received a packet request from an unknown slave: %s:%s",
357 sl->GetName(), sl->GetOrdinal());
362 Info("GetNextPacket","worker-%s: fAssigned %lld\t", sl->GetOrdinal(), fAssigned);
365 Double_t latency = 0., proctime = 0., proccpu = 0.;
366 Long64_t bytesRead = -1;
367 Long64_t totalEntries = -1;
// Protocol > 18: the worker sends a cumulative TProofProgressStatus. Older protocols
// send raw values read below.
371 TProofProgressStatus *status = 0;
372 if (sl->GetProtocol() > 18) {
377 TProofProgressStatus *progress = 0;
380 numev = status->GetEntries() - slstat->GetEntriesProcessed();
// `progress` is the increment since the last report and is heap-allocated by
// AddProcessed. NOTE(review): its deletion is not visible in this extract - confirm.
381 progress = slstat->AddProcessed(status);
384 proctime = progress->GetProcTime();
385 proccpu = progress->GetCPUTime();
386 totev = status->GetEntries();
387 bytesRead = progress->GetBytesRead();
392 Error(
"GetNextPacket",
"no status came in the kPROOF_GETPACKET message");
// Legacy path: optional trailing fields are read only if bytes remain in the buffer.
395 (*r) >> latency >> proctime >> proccpu;
398 if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
399 if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
400 if (r->BufferSize() > r->Length()) (*r) >> totev;
402 numev = totev - slstat->GetEntriesProcessed();
403 slstat->GetProgressStatus()->IncEntries(numev);
404 slstat->GetProgressStatus()->SetLastUpdate();
407 fProgressStatus->IncEntries(numev);
408 fProgressStatus->SetLastUpdate();
413 Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
414 sl->GetOrdinal(), sl->GetName(),
415 numev, latency, proctime, proccpu, bytesRead);
417 if (gPerfStats != 0) {
418 gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(),
"", numev,
419 latency, proctime, proccpu, bytesRead);
// Fixed-number mode: this worker has completed its quota.
422 if (fNumPerWorker > 0 && slstat->GetEntriesProcessed() >= fNumPerWorker) {
424 Info("GetNextPacket","worker-%s (%s) is done (%lld cycles)",
425 sl->GetOrdinal(), sl->GetName(), slstat->GetEntriesProcessed());
// Everything assigned. A sub-master (master but not top master) asks its own
// master for more work before declaring completion.
429 if (fAssigned == fTotalEntries) {
432 if (gProofServ && gProofServ->IsMaster() && !gProofServ->IsTopMaster()) {
// NOTE(review): the null check on nxe (line 434) is not visible here - confirm it exists.
433 TDSetElement *nxe = gProofServ->GetNextPacket();
435 if (AssignWork(0,0,nxe->GetNum()) == 0) {
436 if (fAssigned < fTotalEntries) done = kFALSE;
438 Error(
"GetNextPacket",
"problems assigning additional work: stop");
460 Double_t cTime = GetCurrentTime();
// First request from this worker (empty circular ntuple): send a calibration packet.
462 if (slstat->fCircNtp->GetEntries() <= 0) {
464 Long64_t avg = fTotalEntries / fWrkStats->GetSize();
465 num = (Long64_t) (fCalibFrac * avg);
466 if (num < 1) num = (avg >= 1) ? avg : 1;
468 Info("GetNextPacket", "calibration: total entries %lld, workers %d, frac: %.1f %%, raw num: %lld",
469 fTotalEntries, fWrkStats->GetSize(), fCalibFrac * 100., num);
472 slstat->UpdatePerformance(0.);
// Rate-based scheduling (no fixed number per worker).
476 if (fNumPerWorker < 0) {
482 slstat->UpdatePerformance(proctime);
// Sum the rates of workers with a positive measurement; nrm counts them.
492 Double_t sumRate = 0.;
493 TIter nxwrk(fWrkStats);
494 TSlaveStat *wrkStat = 0;
496 while ((tmpWrk = (TSlave *)nxwrk())) {
497 if ((wrkStat = dynamic_cast<TSlaveStat *>(fWrkStats->GetValue(tmpWrk)))) {
498 if (wrkStat->fRate > 0) {
500 sumRate += wrkStat->fRate;
503 Info("GetNextPacket", "%d: worker-%s: rate %lf /s (sum: %lf /s)",
504 nrm, tmpWrk->GetOrdinal(), wrkStat->fRate, sumRate);
506 Warning(
"GetNextPacket",
"dynamic_cast<TSlaveStat *> failing on value for '%s (%s)'! Skipping",
507 tmpWrk->GetName(), tmpWrk->GetOrdinal());
// Presumably reached when nrm <= 0 (guard on line 512 not visible); this keeps the
// division by nrm below safe.
513 Error(
"GetNextPacket",
"no worker has consistent information: stop processing!");
514 return (TDSetElement *)0;
517 Double_t avgRate = sumRate / nrm;
// Workers without a measurement are assumed to run at the average rate.
519 if (nrm < fWrkStats->GetSize()) {
521 sumRate += (fWrkStats->GetSize() - nrm) * avgRate;
524 Info("GetNextPacket", "rate: avg: %lf /s/wrk - sum: %lf /s (measurements %d out of %d)",
525 avgRate, sumRate, nrm, fWrkStats->GetSize());
// Packet size is proportional to this worker's share of the total rate.
528 Double_t wrkRate = (slstat->fRate > 0.) ? slstat->fRate : avgRate ;
529 num = (Long64_t) ((fTotalEntries - fAssigned) * wrkRate / sumRate);
531 Info("GetNextPacket", "worker-%s (%s): raw packet size: %lld", sl->GetOrdinal(), sl->GetName(), num);
// Clamp the expected packet duration to [fMinPacketTime, fMaxPacketTime] when set.
534 Double_t packTime = num / wrkRate;
535 if (fMaxPacketTime > 0. && packTime > fMaxPacketTime) {
536 num = (Long64_t) (fMaxPacketTime * wrkRate) ;
537 packTime = fMaxPacketTime;
539 Info("GetNextPacket", "worker-%s (%s): time-limited packet size: %lld (upper limit: %.2f secs)",
540 sl->GetOrdinal(), sl->GetName(), num, fMaxPacketTime);
542 if (fMinPacketTime > 0. && packTime < fMinPacketTime) {
543 num = (Long64_t) (fMinPacketTime * wrkRate);
545 Info("GetNextPacket", "worker-%s (%s): time-limited packet size: %lld (lower limit: %.2f secs)",
546 sl->GetOrdinal(), sl->GetName(), num, fMinPacketTime);
// Fixed-number mode.
// NOTE(review): fLastProcessed is the size of the previous packet, not the cumulative
// count, so this remainder is right only if exactly one earlier packet was assigned.
// GetEntriesProcessed() may be intended - confirm.
551 num = fNumPerWorker - slstat->fLastProcessed;
// NOTE(review): unlike the rate-based path, this does not check fMaxPacketTime > 0.
// If the time limit is disabled (<= 0), the condition is true and num collapses to the
// minimum of 1 below.
552 if (num > 1 && slstat->fRate > 0 && num / slstat->fRate > fMaxPacketTime) {
553 num = (Long64_t) (slstat->fRate * fMaxPacketTime);
// At least one cycle, and never more than what remains unassigned.
558 num = (num > 1) ? num : 1;
559 fProcessing = (num < (fTotalEntries - fAssigned)) ? num
560 : (fTotalEntries - fAssigned);
563 slstat->fLastProcessed = fProcessing;
565 slstat->fTimeInstant = cTime;
569 TString sseq = TString::Format(
"p%lld", fPacketSeq);
572 Info("GetNextPacket", "worker-%s: num %lld, processing %lld, remaining %lld",sl->GetOrdinal(),
573 num, fProcessing, (fTotalEntries - fAssigned - fProcessing));
// Cycle-only packet: no file/tree, flagged empty, covering fProcessing cycles from fAssigned.
574 TDSetElement *elem = new TDSetElement(sseq, sseq, "", fAssigned, fProcessing);
575 elem->SetBit(TDSetElement::kEmpty);
578 fAssigned += slstat->fLastProcessed;
/// Register additional workers during processing.
/// In fixed-number mode, fNumPerWorker is rescaled by old/new worker count (at
/// least 1).
/// Returns the resulting number of entries in fWrkStats.
/// NOTE(review): unlike the constructor, new workers are not filtered by
/// GetParallel() > 0.
586 Int_t TPacketizerUnit::AddWorkers(TList *workers)
// Presumably reached when `workers` is null (guard not visible).
589 Error(
"AddWorkers",
"Null list of new workers!");
593 Int_t curNumOfWrks = fWrkStats->GetEntries();
597 while (( sl = dynamic_cast<TSlave*>(next()) ))
598 fWrkStats->Add(sl,
new TSlaveStat(sl, fInput));
// Approximately conserve the total number of cycles across the larger worker pool.
601 if (fFixedNum && fWrkStats->GetSize() > 0) {
603 fNumPerWorker = (fNumPerWorker * curNumOfWrks) / fWrkStats->GetSize();
604 if (fNumPerWorker == 0) fNumPerWorker = 1;
// NOTE(review): AssignWork updates an existing "PROOF_PacketizerFixedNum" via
// FindObject; this adds a new one. Unless lines 605-606 (not visible) guard it,
// fConfigParams may end up holding duplicates - confirm.
607 fConfigParams->Add(
new TParameter<Long64_t>(
"PROOF_PacketizerFixedNum", fNumPerWorker));
609 return fWrkStats->GetEntries();