Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TPacketizerUnit.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Long Tran-Thanh 22/07/07
3 // Revised: G. Ganis, May 2011
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 /** \class TPacketizerUnit
14 \ingroup proofkernel
15 
16 This packetizer generates packets of generic units, representing the
17 number of times an operation cycle has to be repeated by the worker
18 node, e.g. the number of Monte Carlo events to be generated.
19 Packet sizes are generated taking into account the performance of
20 worker nodes, based on the time needed to process previous packets,
21 with the goal of having all workers ending at the same time.
22 
23 */
24 
25 
26 #include "TPacketizerUnit.h"
27 
28 #include "Riostream.h"
29 #include "TDSet.h"
30 #include "TError.h"
31 #include "TEventList.h"
32 #include "TMap.h"
33 #include "TMessage.h"
34 #include "TMonitor.h"
35 #include "TNtupleD.h"
36 #include "TObject.h"
37 #include "TParameter.h"
38 #include "TPerfStats.h"
39 #include "TProofDebug.h"
40 #include "TProof.h"
41 #include "TProofPlayer.h"
42 #include "TProofServ.h"
43 #include "TSlave.h"
44 #include "TSocket.h"
45 #include "TStopwatch.h"
46 #include "TTimer.h"
47 #include "TUrl.h"
48 #include "TClass.h"
49 #include "TMath.h"
50 #include "TObjString.h"
51 
52 
53 using namespace TMath;
54 //
55 // The following utility class manages the state of the
56 // work to be performed and the slaves involved in the process.
57 //
58 // The list of TSlaveStat(s) keeps track of the work (being) done
59 // by each slave
60 //
61 
62 //------------------------------------------------------------------------------
63 
64 class TPacketizerUnit::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
65 
66 friend class TPacketizerUnit;
67 
68 private:
69  Long64_t fLastProcessed; // Number of processed entries of the last packet
70  Double_t fRate; // Estimated processing rate averaged over circularity
71  Double_t fTimeInstant; // Starting time of the current packet
72  TNtupleD *fCircNtp; // Keeps circular info for speed calculations
73  Long_t fCircLvl; // Circularity level
74 
75 public:
76  TSlaveStat(TSlave *sl, TList *input);
77  ~TSlaveStat();
78 
79 // void GetCurrentTime();
80 
81  void UpdatePerformance(Double_t time);
82  TProofProgressStatus *AddProcessed(TProofProgressStatus *st);
83 
84 // ClassDef(TPacketizerUnit::TSlaveStat, 0);
85 };
86 
87 ////////////////////////////////////////////////////////////////////////////////
88 /// Main constructor
89 
90 TPacketizerUnit::TSlaveStat::TSlaveStat(TSlave *slave, TList *input)
91  : fLastProcessed(0),
92  fRate(0), fTimeInstant(0), fCircLvl(5)
93 {
94  // Initialize the circularity ntple for speed calculations
95  fCircNtp = new TNtupleD("Speed Circ Ntp", "Circular process info","tm:ev");
96  fCircNtp->SetDirectory(0);
97  TProof::GetParameter(input, "PROOF_TPacketizerUnitCircularity", fCircLvl);
98  fCircLvl = (fCircLvl > 0) ? fCircLvl : 5;
99  fCircNtp->SetCircular(fCircLvl);
100  fSlave = slave;
101  fStatus = new TProofProgressStatus();
102 }
103 
104 ////////////////////////////////////////////////////////////////////////////////
105 /// Destructor
106 
107 TPacketizerUnit::TSlaveStat::~TSlaveStat()
108 {
109  SafeDelete(fCircNtp);
110 }
111 
112 ////////////////////////////////////////////////////////////////////////////////
113 /// Update the circular ntple
114 
115 void TPacketizerUnit::TSlaveStat::UpdatePerformance(Double_t time)
116 {
117  Double_t ttot = time;
118  Double_t *ar = fCircNtp->GetArgs();
119  Int_t ne = fCircNtp->GetEntries();
120  if (ne <= 0) {
121  // First call: just fill one ref entry and return
122  fCircNtp->Fill(0., 0);
123  fRate = 0.;
124  return;
125  }
126  // Fill the entry
127  fCircNtp->GetEntry(ne-1);
128  ttot = ar[0] + time;
129  fCircNtp->Fill(ttot, GetEntriesProcessed());
130 
131  // Calculate the speed
132  fCircNtp->GetEntry(0);
133  Double_t dtime = (ttot > ar[0]) ? ttot - ar[0] : ne+1 ;
134  Long64_t nevts = GetEntriesProcessed() - (Long64_t)ar[1];
135  fRate = nevts / dtime;
136  PDB(kPacketizer,2)
137  Info("UpdatePerformance", "time:%f, dtime:%f, nevts:%lld, speed: %f",
138  time, dtime, nevts, fRate);
139 
140 }
141 
142 ////////////////////////////////////////////////////////////////////////////////
143 /// Update the status info to the 'st'.
144 /// return the difference (*st - *fStatus)
145 
146 TProofProgressStatus *TPacketizerUnit::TSlaveStat::AddProcessed(TProofProgressStatus *st)
147 {
148  if (st) {
149  // The entriesis not correct in 'st'
150  Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
151  // The last proc time should not be added
152  fStatus->SetLastProcTime(0.);
153  // Get the diff
154  TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
155  *fStatus += *diff;
156  // Set the correct value
157  fStatus->SetLastEntries(lastEntries);
158  return diff;
159  } else {
160  Error("AddProcessed", "status arg undefined");
161  return 0;
162  }
163 }
164 
165 //------------------------------------------------------------------------------
166 
167 ClassImp(TPacketizerUnit);
168 
169 ////////////////////////////////////////////////////////////////////////////////
170 /// Constructor
171 
172 TPacketizerUnit::TPacketizerUnit(TList *slaves, Long64_t num, TList *input,
173  TProofProgressStatus *st)
174  : TVirtualPacketizer(input, st)
175 {
176  PDB(kPacketizer,1) Info("TPacketizerUnit", "enter (num %lld)", num);
177 
178  // Init pointer members
179  fWrkStats = 0;
180  fPackets = 0;
181  fInput = input;
182 
183  fFixedNum = kFALSE;
184  Int_t fixednum = -1;
185  if (TProof::GetParameter(input, "PROOF_PacketizerFixedNum", fixednum) != 0 || fixednum <= 0) {
186  fFixedNum = kFALSE;
187  }
188  else {
189  Info("TPacketizerUnit", "forcing the same cycles on each worker");
190  fFixedNum = kTRUE;
191  }
192 
193  fCalibFrac = 0.01;
194  if (TProof::GetParameter(input, "PROOF_PacketizerCalibFrac", fCalibFrac) != 0 || fCalibFrac <= 0)
195  fCalibFrac = 0.01;
196  PDB(kPacketizer,1)
197  Info("TPacketizerUnit", "size of the calibration packets: %.2f %% of average number per worker", fCalibFrac);
198 
199  fMaxPacketTime = 3.;
200  Double_t timeLimit = -1;
201  if (TProof::GetParameter(input, "PROOF_PacketizerTimeLimit", timeLimit) == 0) {
202  fMaxPacketTime = timeLimit;
203  Warning("TPacketizerUnit", "PROOF_PacketizerTimeLimit is deprecated: use PROOF_MaxPacketTime instead");
204  }
205  PDB(kPacketizer,1)
206  Info("TPacketizerUnit", "time limit is %lf", fMaxPacketTime);
207 
208  // Different default for min packet time
209  fMinPacketTime = 1;
210  Double_t minPacketTime = 0;
211  if (TProof::GetParameter(input, "PROOF_MinPacketTime", minPacketTime) == 0) fMinPacketTime = minPacketTime;
212  TParameter<Double_t> *mpt = (TParameter<Double_t> *) fConfigParams->FindObject("PROOF_MinPacketTime");
213  if (mpt) {
214  mpt->SetVal(fMinPacketTime);
215  } else {
216  fConfigParams->Add(new TParameter<Double_t>("PROOF_MinPacketTime", fMinPacketTime));
217  }
218 
219  fProcessing = 0;
220  fAssigned = 0;
221  fPacketSeq = 0;
222 
223  fStopwatch = new TStopwatch();
224 
225  fPackets = new TList;
226  fPackets->SetOwner();
227 
228  fWrkStats = new TMap;
229  fWrkStats->SetOwner(kFALSE);
230  fWrkExcluded = 0;
231 
232  TSlave *slave;
233  TIter si(slaves);
234  while ((slave = (TSlave*) si.Next())) {
235  if (slave->GetParallel() > 0) {
236  fWrkStats->Add(slave, new TSlaveStat(slave, input));
237  } else {
238  if (!fWrkExcluded) {
239  fWrkExcluded = new TList;
240  fWrkExcluded->SetOwner(kFALSE);
241  }
242  PDB(kPacketizer,2)
243  Info("TPacketizerUnit", "node '%s' has NO active worker: excluded from work distribution", slave->GetOrdinal());
244  fWrkExcluded->Add(slave);
245  }
246  }
247 
248  fTotalEntries = 0;
249  fNumPerWorker = -1;
250  if (num > 0 && AssignWork(0,0,num) != 0)
251  Warning("TPacketizerUnit", "some problems assigning work");
252 
253  // Save the config parameters in the dedicated list so that they will be saved
254  // in the outputlist and therefore in the relevant TQueryResult
255  fConfigParams->Add(new TParameter<Float_t>("PROOF_PacketizerCalibFrac", fCalibFrac));
256 
257  fStopwatch->Start();
258  PDB(kPacketizer,1) Info("TPacketizerUnit", "return");
259 }
260 
261 ////////////////////////////////////////////////////////////////////////////////
262 /// Assign work to be done to this packetizer
263 
264 Int_t TPacketizerUnit::AssignWork(TDSet *, Long64_t, Long64_t num)
265 {
266  if (num < 0) {
267  Error("AssignWork", "assigned a negative number (%lld) of cycles - protocol error?", num);
268  return -1;
269  }
270 
271  fTotalEntries += num;
272  PDB(kPacketizer,1)
273  Info("AssignWork", "assigned %lld additional cycles (new total: %lld)", num, fTotalEntries);
274 
275  // Update fixed number counter
276  if (fFixedNum && fWrkStats->GetSize() > 0) {
277  // Approximate number: the exact number is determined in GetNextPacket
278  fNumPerWorker = fTotalEntries / fWrkStats->GetSize();
279  if (fNumPerWorker == 0) fNumPerWorker = 1;
280  }
281 
282  // Update/Save the config parameters in the dedicated list so that they will be saved
283  // in the outputlist and therefore in the relevant TQueryResult
284  TParameter<Long64_t> *fn =
285  (TParameter<Long64_t> *) fConfigParams->FindObject("PROOF_PacketizerFixedNum");
286  if (fn) {
287  fn->SetVal(fNumPerWorker);
288  } else {
289  fConfigParams->Add(new TParameter<Long64_t>("PROOF_PacketizerFixedNum", fNumPerWorker));
290  }
291 
292  // Done
293  return 0;
294 }
295 
296 ////////////////////////////////////////////////////////////////////////////////
297 /// Destructor.
298 
299 TPacketizerUnit::~TPacketizerUnit()
300 {
301  if (fWrkStats)
302  fWrkStats->DeleteValues();
303  SafeDelete(fWrkStats);
304  SafeDelete(fWrkExcluded);
305  SafeDelete(fPackets);
306  SafeDelete(fStopwatch);
307 }
308 
309 ////////////////////////////////////////////////////////////////////////////////
310 /// Get current time
311 
312 Double_t TPacketizerUnit::GetCurrentTime()
313 {
314  Double_t retValue = fStopwatch->RealTime();
315  fStopwatch->Continue();
316  return retValue;
317 }
318 
319 ////////////////////////////////////////////////////////////////////////////////
320 /// Get Estimation of the current rate; just summing the current rates of
321 /// the active workers
322 
323 Float_t TPacketizerUnit::GetCurrentRate(Bool_t &all)
324 {
325  all = kTRUE;
326  // Loop over the workers
327  Float_t currate = 0.;
328  if (fWrkStats && fWrkStats->GetSize() > 0) {
329  TIter nxw(fWrkStats);
330  TObject *key;
331  while ((key = nxw()) != 0) {
332  TSlaveStat *slstat = (TSlaveStat *) fWrkStats->GetValue(key);
333  if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
334  // Sum-up the current rates
335  currate += slstat->GetProgressStatus()->GetCurrentRate();
336  } else {
337  all = kFALSE;
338  }
339  }
340  }
341  // Done
342  return currate;
343 }
344 
345 ////////////////////////////////////////////////////////////////////////////////
346 /// Get next packet
347 
348 TDSetElement *TPacketizerUnit::GetNextPacket(TSlave *sl, TMessage *r)
349 {
350  if (!fValid)
351  return 0;
352 
353  // Find slave
354  TSlaveStat *slstat = (TSlaveStat*) fWrkStats->GetValue(sl);
355  if (!slstat) {
356  Warning("GetNextPacket", "Received a packet request from an unknown slave: %s:%s",
357  sl->GetName(), sl->GetOrdinal());
358  return 0;
359  }
360 
361  PDB(kPacketizer,2)
362  Info("GetNextPacket","worker-%s: fAssigned %lld\t", sl->GetOrdinal(), fAssigned);
363 
364  // Update stats & free old element
365  Double_t latency = 0., proctime = 0., proccpu = 0.;
366  Long64_t bytesRead = -1;
367  Long64_t totalEntries = -1; // used only to read an old message type
368  Long64_t totev = 0;
369  Long64_t numev = -1;
370 
371  TProofProgressStatus *status = 0;
372  if (sl->GetProtocol() > 18) {
373  (*r) >> latency;
374  (*r) >> status;
375 
376  // Calculate the progress made in the last packet
377  TProofProgressStatus *progress = 0;
378  if (status) {
379  // update the worker status
380  numev = status->GetEntries() - slstat->GetEntriesProcessed();
381  progress = slstat->AddProcessed(status);
382  if (progress) {
383  // (*fProgressStatus) += *progress;
384  proctime = progress->GetProcTime();
385  proccpu = progress->GetCPUTime();
386  totev = status->GetEntries(); // for backward compatibility
387  bytesRead = progress->GetBytesRead();
388  delete progress;
389  }
390  delete status;
391  } else
392  Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
393  } else {
394 
395  (*r) >> latency >> proctime >> proccpu;
396 
397  // only read new info if available
398  if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
399  if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
400  if (r->BufferSize() > r->Length()) (*r) >> totev;
401 
402  numev = totev - slstat->GetEntriesProcessed();
403  slstat->GetProgressStatus()->IncEntries(numev);
404  slstat->GetProgressStatus()->SetLastUpdate();
405  }
406 
407  fProgressStatus->IncEntries(numev);
408  fProgressStatus->SetLastUpdate();
409 
410  fProcessing = 0;
411 
412  PDB(kPacketizer,2)
413  Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
414  sl->GetOrdinal(), sl->GetName(),
415  numev, latency, proctime, proccpu, bytesRead);
416 
417  if (gPerfStats != 0) {
418  gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), "", numev,
419  latency, proctime, proccpu, bytesRead);
420  }
421 
422  if (fNumPerWorker > 0 && slstat->GetEntriesProcessed() >= fNumPerWorker) {
423  PDB(kPacketizer,2)
424  Info("GetNextPacket","worker-%s (%s) is done (%lld cycles)",
425  sl->GetOrdinal(), sl->GetName(), slstat->GetEntriesProcessed());
426  return 0;
427  }
428 
429  if (fAssigned == fTotalEntries) {
430  Bool_t done = kTRUE;
431  // If we are on a submaster, check if there is something else to do
432  if (gProofServ && gProofServ->IsMaster() && !gProofServ->IsTopMaster()) {
433  TDSetElement *nxe = gProofServ->GetNextPacket();
434  if (nxe) {
435  if (AssignWork(0,0,nxe->GetNum()) == 0) {
436  if (fAssigned < fTotalEntries) done = kFALSE;
437  } else {
438  Error("GetNextPacket", "problems assigning additional work: stop");
439  }
440  SafeDelete(nxe);
441  }
442  }
443  if (done) {
444  // Send last timer message
445  HandleTimer(0);
446  return 0;
447  }
448  }
449 
450  if (fStop) {
451  // Send last timer message
452  HandleTimer(0);
453  return 0;
454  }
455 
456 
457  Long64_t num;
458 
459  // Get the current time
460  Double_t cTime = GetCurrentTime();
461 
462  if (slstat->fCircNtp->GetEntries() <= 0) {
463  // The calibration phase
464  Long64_t avg = fTotalEntries / fWrkStats->GetSize();
465  num = (Long64_t) (fCalibFrac * avg);
466  if (num < 1) num = (avg >= 1) ? avg : 1;
467  PDB(kPacketizer,2)
468  Info("GetNextPacket", "calibration: total entries %lld, workers %d, frac: %.1f %%, raw num: %lld",
469  fTotalEntries, fWrkStats->GetSize(), fCalibFrac * 100., num);
470 
471  // Create a reference entry
472  slstat->UpdatePerformance(0.);
473 
474  } else {
475 
476  if (fNumPerWorker < 0) {
477 
478  // Schedule tasks for workers based on the currently estimated processing speeds
479 
480  // Update performances
481  // slstat->fStatus was updated before;
482  slstat->UpdatePerformance(proctime);
483 
484  // We need to estimate the total instantaneous rate: for the workers not having yet
485  // one we assume the average of those having a measurement
486  // The optimal number for worker j is
487  //
488  // n_j = r_j / Sum r_i * N_left
489  //
490 
491  Int_t nrm = 0;
492  Double_t sumRate = 0.;
493  TIter nxwrk(fWrkStats);
494  TSlaveStat *wrkStat = 0;
495  TSlave *tmpWrk = 0;
496  while ((tmpWrk = (TSlave *)nxwrk())) {
497  if ((wrkStat = dynamic_cast<TSlaveStat *>(fWrkStats->GetValue(tmpWrk)))) {
498  if (wrkStat->fRate > 0) {
499  nrm++;
500  sumRate += wrkStat->fRate;
501  }
502  PDB(kPacketizer,3)
503  Info("GetNextPacket", "%d: worker-%s: rate %lf /s (sum: %lf /s)",
504  nrm, tmpWrk->GetOrdinal(), wrkStat->fRate, sumRate);
505  } else {
506  Warning("GetNextPacket", "dynamic_cast<TSlaveStat *> failing on value for '%s (%s)'! Skipping",
507  tmpWrk->GetName(), tmpWrk->GetOrdinal());
508  }
509  }
510 
511  // Check consistency
512  if (nrm <= 0) {
513  Error("GetNextPacket", "no worker has consistent information: stop processing!");
514  return (TDSetElement *)0;
515  }
516 
517  Double_t avgRate = sumRate / nrm;
518  // Check if all workers had meaningful rate information
519  if (nrm < fWrkStats->GetSize()) {
520  // For some workers the measurement is missing: use the average
521  sumRate += (fWrkStats->GetSize() - nrm) * avgRate;
522  }
523  PDB(kPacketizer,2)
524  Info("GetNextPacket", "rate: avg: %lf /s/wrk - sum: %lf /s (measurements %d out of %d)",
525  avgRate, sumRate, nrm, fWrkStats->GetSize());
526 
527  // Packet size for this worker
528  Double_t wrkRate = (slstat->fRate > 0.) ? slstat->fRate : avgRate ;
529  num = (Long64_t) ((fTotalEntries - fAssigned) * wrkRate / sumRate);
530  PDB(kPacketizer,2)
531  Info("GetNextPacket", "worker-%s (%s): raw packet size: %lld", sl->GetOrdinal(), sl->GetName(), num);
532 
533  // Apply time-per-packet limits
534  Double_t packTime = num / wrkRate;
535  if (fMaxPacketTime > 0. && packTime > fMaxPacketTime) {
536  num = (Long64_t) (fMaxPacketTime * wrkRate) ;
537  packTime = fMaxPacketTime;
538  PDB(kPacketizer,2)
539  Info("GetNextPacket", "worker-%s (%s): time-limited packet size: %lld (upper limit: %.2f secs)",
540  sl->GetOrdinal(), sl->GetName(), num, fMaxPacketTime);
541  }
542  if (fMinPacketTime > 0. && packTime < fMinPacketTime) {
543  num = (Long64_t) (fMinPacketTime * wrkRate);
544  PDB(kPacketizer,2)
545  Info("GetNextPacket", "worker-%s (%s): time-limited packet size: %lld (lower limit: %.2f secs)",
546  sl->GetOrdinal(), sl->GetName(), num, fMinPacketTime);
547  }
548 
549  } else {
550  // Fixed number of cycles per worker
551  num = fNumPerWorker - slstat->fLastProcessed;
552  if (num > 1 && slstat->fRate > 0 && num / slstat->fRate > fMaxPacketTime) {
553  num = (Long64_t) (slstat->fRate * fMaxPacketTime);
554  }
555  }
556  }
557  // Minimum packet size
558  num = (num > 1) ? num : 1;
559  fProcessing = (num < (fTotalEntries - fAssigned)) ? num
560  : (fTotalEntries - fAssigned);
561 
562  // Set the information of the current slave
563  slstat->fLastProcessed = fProcessing;
564  // Set the start time of the current packet
565  slstat->fTimeInstant = cTime;
566 
567  // Update the sequential number
568  fPacketSeq++;
569  TString sseq = TString::Format("p%lld", fPacketSeq);
570 
571  PDB(kPacketizer,2)
572  Info("GetNextPacket", "worker-%s: num %lld, processing %lld, remaining %lld",sl->GetOrdinal(),
573  num, fProcessing, (fTotalEntries - fAssigned - fProcessing));
574  TDSetElement *elem = new TDSetElement(sseq, sseq, "", fAssigned, fProcessing);
575  elem->SetBit(TDSetElement::kEmpty);
576 
577  // Update the total counter
578  fAssigned += slstat->fLastProcessed;
579 
580  return elem;
581 }
582 
583 ////////////////////////////////////////////////////////////////////////////////
584 /// Adds new workers. Returns the total number of workers registered after the addition, or -1 on failure.
585 
586 Int_t TPacketizerUnit::AddWorkers(TList *workers)
587 {
588  if (!workers) {
589  Error("AddWorkers", "Null list of new workers!");
590  return -1;
591  }
592 
593  Int_t curNumOfWrks = fWrkStats->GetEntries();
594 
595  TSlave *sl;
596  TIter next(workers);
597  while (( sl = dynamic_cast<TSlave*>(next()) ))
598  fWrkStats->Add(sl, new TSlaveStat(sl, fInput));
599 
600  fNumPerWorker = -1;
601  if (fFixedNum && fWrkStats->GetSize() > 0) {
602  // Approximate number: the exact number is determined in GetNextPacket
603  fNumPerWorker = (fNumPerWorker * curNumOfWrks) / fWrkStats->GetSize();
604  if (fNumPerWorker == 0) fNumPerWorker = 1;
605  }
606 
607  fConfigParams->Add(new TParameter<Long64_t>("PROOF_PacketizerFixedNum", fNumPerWorker));
608 
609  return fWrkStats->GetEntries();
610 }