Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TPacketizerAdaptive.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Jan Iwaszkiewicz 11/12/06
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10 *************************************************************************/
11 
12 /** \class TPacketizerAdaptive
13 \ingroup proofkernel
14 
15 This packetizer is based on TPacketizer but uses different
16 load-balancing algorithms and data structures.
17 Two main improvements in the load-balancing strategy:
18  - First one was to change the order in which the files are assigned
19  to the computing nodes in such a way that network transfers are
20  evenly distributed in the query time. Transfer of the remote files
21  was often becoming a bottleneck at the end of a query.
22  - The other improvement is the use of time-based packet size. We
23  measure the processing rate of all the nodes and calculate the
24  packet size, so that it takes certain amount of time. In this way
25  packetizer prevents the situation where the query can't finish
26  because of one slow node.
27 
28 The data structures: TFileStat, TFileNode and TSlaveStat are
29 enriched + changed and TFileNode::Compare method is changed.
30 
31 */
32 
33 #include "TPacketizerAdaptive.h"
34 
35 #include "Riostream.h"
36 #include "TDSet.h"
37 #include "TError.h"
38 #include "TEnv.h"
39 #include "TEntryList.h"
40 #include "TEventList.h"
41 #include "TMap.h"
42 #include "TMessage.h"
43 #include "TMonitor.h"
44 #include "TNtupleD.h"
45 #include "TObject.h"
46 #include "TParameter.h"
47 #include "TPerfStats.h"
48 #include "TProofDebug.h"
49 #include "TProof.h"
50 #include "TProofServ.h"
51 #include "TSlave.h"
52 #include "TSocket.h"
53 #include "TSortedList.h"
54 #include "TUrl.h"
55 #include "TClass.h"
56 #include "TRandom.h"
57 #include "TMath.h"
58 #include "TObjString.h"
59 #include "TList.h"
60 
61 //
62 // The following three utility classes manage the state of the
63 // work to be performed and the slaves involved in the process.
64 // A list of TFileNode(s) describes the hosts with files, each
65 // has a list of TFileStat(s) keeping the state for each TDSet
66 // element (file).
67 //
68 // The list of TSlaveStat(s) keep track of the work (being) done
69 // by each slave
70 //
71 
72 
73 //------------------------------------------------------------------------------
74 
75 class TPacketizerAdaptive::TFileStat : public TObject {
76 
77 private:
78  Bool_t fIsDone; // is this element processed
79  TFileNode *fNode; // my FileNode
80  TDSetElement *fElement; // location of the file and its range
81  Long64_t fNextEntry; // cursor in the range, -1 when done // needs changing
82 
83 public:
84  TFileStat(TFileNode *node, TDSetElement *elem, TList *file);
85 
86  Bool_t IsDone() const {return fIsDone;}
87  Bool_t IsSortable() const { return kTRUE; }
88  void SetDone() {fIsDone = kTRUE;}
89  TFileNode *GetNode() const {return fNode;}
90  TDSetElement *GetElement() const {return fElement;}
91  Long64_t GetNextEntry() const {return fNextEntry;}
92  void MoveNextEntry(Long64_t step) {fNextEntry += step;}
93 
94  // This method is used to keep a sorted list of remaining files to be processed
95  Int_t Compare(const TObject* obj) const
96  {
97  // Return -1 if elem.entries < obj.elem.entries, 0 if elem.entries equal
98  // and 1 if elem.entries < obj.elem.entries.
99  const TFileStat *fst = dynamic_cast<const TFileStat*>(obj);
100  if (fst && GetElement() && fst->GetElement()) {
101  Long64_t ent = GetElement()->GetNum();
102  Long64_t entfst = fst->GetElement()->GetNum();
103  if (ent > 0 && entfst > 0) {
104  if (ent > entfst) {
105  return 1;
106  } else if (ent < entfst) {
107  return -1;
108  } else {
109  return 0;
110  }
111  }
112  }
113  // No info: assume equal (no change in order)
114  return 0;
115  }
116  void Print(Option_t * = 0) const
117  { // Notify file name and entries
118  Printf("TFileStat: %s %lld", fElement ? fElement->GetName() : "---",
119  fElement ? fElement->GetNum() : -1);
120  }
121 };
122 
123 TPacketizerAdaptive::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem, TList *files)
124  : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
125 {
126  // Constructor: add to the global list
127  if (files) files->Add(this);
128 }
129 
130 //------------------------------------------------------------------------------
131 
132 // a class describing a file node as a part of a session
133 class TPacketizerAdaptive::TFileNode : public TObject {
134 
135 private:
136  TString fNodeName; // FQDN of the node
137  TList *fFiles; // TDSetElements (files) stored on this node
138  TObject *fUnAllocFileNext; // cursor in fFiles
139  TList *fActFiles; // files with work remaining
140  TObject *fActFileNext; // cursor in fActFiles
141  Int_t fMySlaveCnt; // number of slaves running on this node
142  // (which can process remote files)
143  Int_t fExtSlaveCnt; // number of external slaves processing
144  // files on this node
145  Int_t fRunSlaveCnt; // total number of slaves processing files
146  // on this node
147  Long64_t fProcessed; // number of events processed on this node
148  Long64_t fEvents; // number of entries in files on this node
149 
150  Int_t fStrategy; // 0 means the classic and 1 (default) - the adaptive strategy
151 
152  TSortedList *fFilesToProcess; // Global list of files (TFileStat) to be processed (owned by TPacketizer)
153 
154 public:
155  TFileNode(const char *name, Int_t strategy, TSortedList *files);
156  ~TFileNode() { delete fFiles; delete fActFiles; }
157 
158  void IncMySlaveCnt() { fMySlaveCnt++; }
159  Int_t GetMySlaveCnt() const { return fMySlaveCnt; }
160  void IncExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt++; }
161  void DecExtSlaveCnt(const char *slave) { if (fNodeName != slave) fExtSlaveCnt--; R__ASSERT(fExtSlaveCnt >= 0); }
162  Int_t GetSlaveCnt() const { return fMySlaveCnt + fExtSlaveCnt; }
163  void IncRunSlaveCnt() { fRunSlaveCnt++; }
164  void DecRunSlaveCnt() { fRunSlaveCnt--; R__ASSERT(fRunSlaveCnt >= 0); }
165  Int_t GetRunSlaveCnt() const { return fRunSlaveCnt; }
166  Int_t GetExtSlaveCnt() const { return fExtSlaveCnt; }
167  Int_t GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
168  Bool_t IsSortable() const { return kTRUE; }
169  Int_t GetNumberOfFiles() { return fFiles->GetSize(); }
170  void IncProcessed(Long64_t nEvents)
171  { fProcessed += nEvents; }
172  Long64_t GetProcessed() const { return fProcessed; }
173  void DecreaseProcessed(Long64_t nEvents) { fProcessed -= nEvents; }
174  // this method is used by Compare() it adds 1, so it returns a number that
175  // would be true if one more slave is added.
176  Long64_t GetEventsLeftPerSlave() const
177  { return ((fEvents - fProcessed)/(fRunSlaveCnt + 1)); }
178  void IncEvents(Long64_t nEvents) { fEvents += nEvents; }
179  const char *GetName() const { return fNodeName.Data(); }
180  Long64_t GetNEvents() const { return fEvents; }
181 
182  void Print(Option_t * = 0) const
183  {
184  TFileStat *fs = 0;
185  TDSetElement *e = 0;
186  Int_t nn = 0;
187  Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
188  Printf("+++ TFileNode: %s +++", fNodeName.Data());
189  Printf("+++ Evts: %lld (total: %lld) ", fProcessed, fEvents);
190  Printf("+++ Worker count: int:%d, ext: %d, tot:%d ", fMySlaveCnt, fExtSlaveCnt, fRunSlaveCnt);
191  Printf("+++ Files: %d ", fFiles ? fFiles->GetSize() : 0);
192  if (fFiles && fFiles->GetSize() > 0) {
193  TIter nxf(fFiles);
194  while ((fs = (TFileStat *) nxf())) {
195  if ((e = fs->GetElement())) {
196  Printf("+++ #%d: %s %lld - %lld (%lld) - next: %lld ", ++nn, e->GetName(),
197  e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
198  } else {
199  Printf("+++ #%d: no element! ", ++nn);
200  }
201  }
202  }
203  Printf("+++ Active files: %d ", fActFiles ? fActFiles->GetSize() : 0);
204  if (fActFiles && fActFiles->GetSize() > 0) {
205  TIter nxaf(fActFiles);
206  while ((fs = (TFileStat *) nxaf())) {
207  if ((e = fs->GetElement())) {
208  Printf("+++ #%d: %s %lld - %lld (%lld) - next: %lld", ++nn, e->GetName(),
209  e->GetFirst(), e->GetFirst() + e->GetNum() - 1, e->GetNum(), fs->GetNextEntry());
210  } else {
211  Printf("+++ #%d: no element! ", ++nn);
212  }
213  }
214  }
215  Printf("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
216  }
217 
218  void Add(TDSetElement *elem, Bool_t tolist)
219  {
220  TList *files = tolist ? (TList *)fFilesToProcess : (TList *)0;
221  TFileStat *f = new TFileStat(this, elem, files);
222  fFiles->Add(f);
223  if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
224  }
225 
226  TFileStat *GetNextUnAlloc()
227  {
228  TObject *next = fUnAllocFileNext;
229 
230  if (next != 0) {
231  // make file active
232  fActFiles->Add(next);
233  if (fActFileNext == 0) fActFileNext = fActFiles->First();
234 
235  // move cursor
236  fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
237  }
238  return (TFileStat *) next;
239  }
240 
241  TFileStat *GetNextActive()
242  {
243  TObject *next = fActFileNext;
244 
245  if (fActFileNext != 0) {
246  fActFileNext = fActFiles->After(fActFileNext);
247  if (fActFileNext == 0) fActFileNext = fActFiles->First();
248  }
249 
250  return (TFileStat *) next;
251  }
252 
253  void RemoveActive(TFileStat *file)
254  {
255  if (fActFileNext == file) fActFileNext = fActFiles->After(file);
256  fActFiles->Remove(file);
257  if (fFilesToProcess) fFilesToProcess->Remove(file);
258  if (fActFileNext == 0) fActFileNext = fActFiles->First();
259  }
260 
261  Int_t Compare(const TObject *other) const
262  {
263  // Must return -1 if this is smaller than obj, 0 if objects are equal
264  // and 1 if this is larger than obj.
265  // smaller means more needing a new worker.
266  // Two cases are considered depending on
267  // relation between harddrive speed and network bandwidth.
268 
269  const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
270  if (!obj) {
271  Error("Compare", "input is not a TPacketizer::TFileNode object");
272  return 0;
273  }
274 
275  // how many more events it has than obj
276 
277  if (fStrategy == 1) {
278  // The default adaptive strategy.
279  Int_t myVal = GetRunSlaveCnt();
280  Int_t otherVal = obj->GetRunSlaveCnt();
281  if (myVal < otherVal) {
282  return -1;
283  } else if (myVal > otherVal) {
284  return 1;
285  } else {
286  // if this has more events to process than obj
287  if ((fEvents - fProcessed) >
288  (obj->GetNEvents() - obj->GetProcessed())) {
289  return -1;
290  } else {
291  return 1;
292  }
293  }
294  } else {
295  Int_t myVal = GetSlaveCnt();
296  Int_t otherVal = obj->GetSlaveCnt();
297  if (myVal < otherVal) {
298  return -1;
299  } else if (myVal > otherVal) {
300  return 1;
301  } else {
302  return 0;
303  }
304  }
305  }
306 
307  void Reset()
308  {
309  fUnAllocFileNext = fFiles->First();
310  fActFiles->Clear();
311  fActFileNext = 0;
312  fExtSlaveCnt = 0;
313  fMySlaveCnt = 0;
314  fRunSlaveCnt = 0;
315  }
316 };
317 
318 
319 TPacketizerAdaptive::TFileNode::TFileNode(const char *name, Int_t strategy, TSortedList *files)
320  : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),
321  fActFiles(new TList), fActFileNext(0), fMySlaveCnt(0),
322  fExtSlaveCnt(0), fRunSlaveCnt(0), fProcessed(0), fEvents(0),
323  fStrategy(strategy), fFilesToProcess(files)
324 {
325  // Constructor
326 
327  fFiles->SetOwner();
328  fActFiles->SetOwner(kFALSE);
329 }
330 
331 //------------------------------------------------------------------------------
332 
333 class TPacketizerAdaptive::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
334 
335 friend class TPacketizerAdaptive;
336 
337 private:
338  TFileNode *fFileNode; // corresponding node or 0
339  TFileStat *fCurFile; // file currently being processed
340  TDSetElement *fCurElem; // TDSetElement currently being processed
341  Long64_t fCurProcessed; // events processed in the current file
342  Float_t fCurProcTime; // proc time spent on the current file
343  TList *fDSubSet; // packets processed by this worker
344 
345 public:
346  TSlaveStat(TSlave *slave);
347  ~TSlaveStat();
348  TFileNode *GetFileNode() const { return fFileNode; }
349  Long64_t GetEntriesProcessed() const { return fStatus?fStatus->GetEntries():-1; }
350  Double_t GetProcTime() const { return fStatus?fStatus->GetProcTime():-1; }
351  TFileStat *GetCurFile() { return fCurFile; }
352  void SetFileNode(TFileNode *node) { fFileNode = node; }
353  void UpdateRates(TProofProgressStatus *st);
354  Float_t GetAvgRate() { return fStatus->GetRate(); }
355  Float_t GetCurRate() {
356  return (fCurProcTime?fCurProcessed/fCurProcTime:0); }
357  Int_t GetLocalEventsLeft() {
358  return fFileNode?(fFileNode->GetEventsLeftPerSlave()):0; }
359  TList *GetProcessedSubSet() { return fDSubSet; }
360  TProofProgressStatus *GetProgressStatus() { return fStatus; }
361  TProofProgressStatus *AddProcessed(TProofProgressStatus *st = 0);
362 };
363 
364 ////////////////////////////////////////////////////////////////////////////////
365 /// Constructor
366 
367 TPacketizerAdaptive::TSlaveStat::TSlaveStat(TSlave *slave)
368  : fFileNode(0), fCurFile(0), fCurElem(0),
369  fCurProcessed(0), fCurProcTime(0)
370 {
371  fDSubSet = new TList();
372  fDSubSet->SetOwner();
373  fSlave = slave;
374  fStatus = new TProofProgressStatus();
375  // The slave name is a special one in PROOF-Lite: avoid blocking on the DNS
376  // for non existing names
377  fWrkFQDN = slave->GetName();
378  if (strcmp(slave->ClassName(), "TSlaveLite")) {
379  fWrkFQDN = TUrl(fWrkFQDN).GetHostFQDN();
380  // Get full name for local hosts
381  if (fWrkFQDN.Contains("localhost") || fWrkFQDN == "127.0.0.1")
382  fWrkFQDN = TUrl(gSystem->HostName()).GetHostFQDN();
383  }
384  PDB(kPacketizer, 2)
385  Info("TSlaveStat", "wrk FQDN: %s", fWrkFQDN.Data());
386 }
387 
388 ////////////////////////////////////////////////////////////////////////////////
389 /// Cleanup
390 
391 TPacketizerAdaptive::TSlaveStat::~TSlaveStat()
392 {
393  SafeDelete(fDSubSet);
394  SafeDelete(fStatus);
395 }
396 
397 ////////////////////////////////////////////////////////////////////////////////
398 /// Update packetizer rates
399 
400 void TPacketizerAdaptive::TSlaveStat::UpdateRates(TProofProgressStatus *st)
401 {
402  if (!st) {
403  Error("UpdateRates", "no status object!");
404  return;
405  }
406  if (fCurFile->IsDone()) {
407  fCurProcTime = 0;
408  fCurProcessed = 0;
409  } else {
410  fCurProcTime += st->GetProcTime() - GetProcTime();
411  fCurProcessed += st->GetEntries() - GetEntriesProcessed();
412  }
413  fCurFile->GetNode()->IncProcessed(st->GetEntries() - GetEntriesProcessed());
414  st->SetLastEntries(st->GetEntries() - fStatus->GetEntries());
415  SafeDelete(fStatus);
416  fStatus = st;
417 }
418 
419 ////////////////////////////////////////////////////////////////////////////////
420 /// Add the current element to the fDSubSet (subset processed by this worker)
421 /// and if the status arg is given, then change the size of the packet.
422 /// return the difference (*st - *fStatus)
423 
424 TProofProgressStatus *TPacketizerAdaptive::TSlaveStat::AddProcessed(TProofProgressStatus *st)
425 {
426  if (st && fDSubSet && fCurElem) {
427  if (fCurElem->GetNum() != st->GetEntries() - GetEntriesProcessed())
428  fCurElem->SetNum(st->GetEntries() - GetEntriesProcessed());
429  fDSubSet->Add(fCurElem);
430  TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
431  return diff;
432  } else {
433  Error("AddProcessed", "processed subset of current elem undefined");
434  return 0;
435  }
436 }
437 
438 //------------------------------------------------------------------------------
439 
440 ClassImp(TPacketizerAdaptive);
441 
442 ////////////////////////////////////////////////////////////////////////////////
443 /// Constructor
444 
445 TPacketizerAdaptive::TPacketizerAdaptive(TDSet *dset, TList *slaves,
446  Long64_t first, Long64_t num,
447  TList *input, TProofProgressStatus *st)
448  : TVirtualPacketizer(input, st)
449 {
450  PDB(kPacketizer,1) Info("TPacketizerAdaptive",
451  "enter (first %lld, num %lld)", first, num);
452 
453  // Init pointer members
454  fSlaveStats = 0;
455  fUnAllocated = 0;
456  fActive = 0;
457  fFileNodes = 0;
458  fMaxPerfIdx = 1;
459  fCachePacketSync = kTRUE;
460  fMaxEntriesRatio = 2.;
461 
462  fMaxSlaveCnt = -1;
463  fPacketAsAFraction = 4;
464  fStrategy = 1;
465  fFilesToProcess = new TSortedList;
466  fFilesToProcess->SetOwner(kFALSE);
467 
468  if (!fProgressStatus) {
469  Error("TPacketizerAdaptive", "No progress status");
470  return;
471  }
472 
473  // Attempt to synchronize the packet size with the tree cache size
474  Int_t cpsync = -1;
475  if (TProof::GetParameter(input, "PROOF_PacketizerCachePacketSync", cpsync) != 0) {
476  // Check if there is a global cache-packet sync setting
477  cpsync = gEnv->GetValue("Packetizer.CachePacketSync", 1);
478  }
479  if (cpsync >= 0) fCachePacketSync = (cpsync > 0) ? kTRUE : kFALSE;
480 
481  // Max file entries to avg allowed ratio for cache-to-packet synchronization
482  // (applies only if fCachePacketSync is true; -1. disables the bound)
483  if (TProof::GetParameter(input, "PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio) != 0) {
484  // Check if there is a global ratio setting
485  fMaxEntriesRatio = gEnv->GetValue("Packetizer.MaxEntriesRatio", 2.);
486  }
487 
488  // The possibility to change packetizer strategy to the basic TPacketizer's
489  // one (in which workers always process their local data first).
490  Int_t strategy = -1;
491  if (TProof::GetParameter(input, "PROOF_PacketizerStrategy", strategy) != 0) {
492  // Check if there is a global strategy setting
493  strategy = gEnv->GetValue("Packetizer.Strategy", 1);
494  }
495  if (strategy == 0) {
496  fStrategy = 0;
497  Info("TPacketizerAdaptive", "using the basic strategy of TPacketizer");
498  } else if (strategy != 1) {
499  Warning("TPacketizerAdaptive", "unsupported strategy index (%d): ignore", strategy);
500  }
501 
502  Long_t maxSlaveCnt = 0;
503  if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
504  if (maxSlaveCnt < 0) {
505  Info("TPacketizerAdaptive",
506  "The value of PROOF_MaxSlavesPerNode must be positive");
507  maxSlaveCnt = 0;
508  }
509  } else {
510  // Try also with Int_t (recently supported in TProof::SetParameter)
511  Int_t mxslcnt = -1;
512  if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
513  if (mxslcnt < 0) {
514  Info("TPacketizerAdaptive",
515  "The value of PROOF_MaxSlavesPerNode must be positive");
516  mxslcnt = 0;
517  }
518  maxSlaveCnt = (Long_t) mxslcnt;
519  }
520  }
521 
522  if (!maxSlaveCnt)
523  maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", 0);
524  if (maxSlaveCnt > 0) {
525  fMaxSlaveCnt = maxSlaveCnt;
526  Info("TPacketizerAdaptive", "Setting max number of workers per node to %ld",
527  fMaxSlaveCnt);
528  }
529 
530  // if forceLocal parameter is set to 1 then eliminate the cross-worker
531  // processing;
532  // This minimizes the network usage on the PROOF cluser at the expense of
533  // longer jobs processing times.
534  // To process successfully the session must have workers with all the data!
535  fForceLocal = kFALSE;
536  Int_t forceLocal = 0;
537  if (TProof::GetParameter(input, "PROOF_ForceLocal", forceLocal) == 0) {
538  if (forceLocal == 1)
539  fForceLocal = kTRUE;
540  else
541  Info("TPacketizerAdaptive",
542  "The only accepted value of PROOF_ForceLocal parameter is 1 !");
543  }
544 
545  // Below we provide a possibility to change the way packet size is
546  // calculated or define the packet time directly.
547  // fPacketAsAFraction can be interpreted as follows:
548  // packet time is (expected job proc. time) / fPacketSizeAsAFraction.
549  // It substitutes 20 in the old formula to calculate the fPacketSize:
550  // fPacketSize = fTotalEntries / (20 * nslaves)
551  Int_t packetAsAFraction = 0;
552  if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0) {
553  if (packetAsAFraction > 0) {
554  fPacketAsAFraction = packetAsAFraction;
555  Info("TPacketizerAdaptive",
556  "using alternate fraction of query time as a packet size: %d",
557  packetAsAFraction);
558  } else
559  Info("TPacketizerAdaptive", "packetAsAFraction parameter must be higher than 0");
560  }
561 
562  // Packet re-assignement
563  fTryReassign = 0;
564  Int_t tryReassign = 0;
565  if (TProof::GetParameter(input, "PROOF_TryReassign", tryReassign) != 0)
566  tryReassign = gEnv->GetValue("Packetizer.TryReassign", 0);
567  fTryReassign = tryReassign;
568  if (fTryReassign != 0)
569  Info("TPacketizerAdaptive", "failed packets will be re-assigned");
570 
571  // Save the config parameters in the dedicated list so that they will be saved
572  // in the outputlist and therefore in the relevant TQueryResult
573  fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketizerCachePacketSync", (Int_t)fCachePacketSync));
574  fConfigParams->Add(new TParameter<Double_t>("PROOF_PacketizerMaxEntriesRatio", fMaxEntriesRatio));
575  fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketizerStrategy", fStrategy));
576  fConfigParams->Add(new TParameter<Int_t>("PROOF_MaxWorkersPerNode", (Int_t)fMaxSlaveCnt));
577  fConfigParams->Add(new TParameter<Int_t>("PROOF_ForceLocal", (Int_t)fForceLocal));
578  fConfigParams->Add(new TParameter<Int_t>("PROOF_PacketAsAFraction", fPacketAsAFraction));
579 
580  Double_t baseLocalPreference = 1.2;
581  fBaseLocalPreference = (Float_t)baseLocalPreference;
582  if (TProof::GetParameter(input, "PROOF_BaseLocalPreference", baseLocalPreference) == 0)
583  fBaseLocalPreference = (Float_t)baseLocalPreference;
584 
585  fFileNodes = new TList;
586  fFileNodes->SetOwner();
587  fUnAllocated = new TList;
588  fUnAllocated->SetOwner(kFALSE);
589  fActive = new TList;
590  fActive->SetOwner(kFALSE);
591 
592  fValid = kTRUE;
593 
594  // Resolve end-point urls to optmize distribution
595  // dset->Lookup(); // moved to TProofPlayerRemote::Process
596 
597  // Read list of mounted disks
598  TObjArray *partitions = 0;
599  TString partitionsStr;
600  if (TProof::GetParameter(input, "PROOF_PacketizerPartitions", partitionsStr) != 0)
601  partitionsStr = gEnv->GetValue("Packetizer.Partitions", "");
602  if (!partitionsStr.IsNull()) {
603  Info("TPacketizerAdaptive", "Partitions: %s", partitionsStr.Data());
604  partitions = partitionsStr.Tokenize(",");
605  }
606 
607  // Split into per host and disk entries
608  dset->Reset();
609  TDSetElement *e;
610  while ((e = (TDSetElement*)dset->Next())) {
611 
612  if (e->GetValid()) continue;
613 
614  // The dataset name, if any
615  if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
616  fDataSet = e->GetDataSet();
617 
618  TUrl url = e->GetFileName();
619  PDB(kPacketizer,2)
620  Info("TPacketizerAdaptive", "element name: %s (url: %s)", e->GetFileName(), url.GetUrl());
621 
622  // Map non URL filenames to dummy host
623  TString host;
624  if ( !url.IsValid() ||
625  (strncmp(url.GetProtocol(),"root", 4) &&
626  strncmp(url.GetProtocol(),"file", 4)) ) {
627  host = "no-host";
628  } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
629  host = "localhost";
630  url.SetProtocol("root");
631  } else {
632  host = url.GetHostFQDN();
633  }
634  // Get full name for local hosts
635  if (host.Contains("localhost") || host == "127.0.0.1") {
636  url.SetHost(gSystem->HostName());
637  host = url.GetHostFQDN();
638  }
639 
640  // Find on which disk is the file, if any
641  TString disk;
642  if (partitions) {
643  TIter iString(partitions);
644  TObjString* os = 0;
645  while ((os = (TObjString *)iString())) {
646  // Compare begining of the url with disk mountpoint
647  if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
648  disk = os->GetName();
649  break;
650  }
651  }
652  }
653  // Node's url
654  TString nodeStr;
655  if (disk.IsNull())
656  nodeStr.Form("%s://%s", url.GetProtocol(), host.Data());
657  else
658  nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
659  TFileNode *node = (TFileNode *) fFileNodes->FindObject(nodeStr);
660 
661  if (node == 0) {
662  node = new TFileNode(nodeStr, fStrategy, fFilesToProcess);
663  fFileNodes->Add(node);
664  PDB(kPacketizer,2)
665  Info("TPacketizerAdaptive", "creating new node '%s' or the element", nodeStr.Data());
666  } else {
667  PDB(kPacketizer,2)
668  Info("TPacketizerAdaptive", "adding element to existing node '%s'", nodeStr.Data());
669  }
670 
671  node->Add(e, kFALSE);
672  }
673 
674  fSlaveStats = new TMap;
675  fSlaveStats->SetOwner(kFALSE);
676 
677  TSlave *slave;
678  TIter si(slaves);
679  while ((slave = (TSlave*) si.Next())) {
680  fSlaveStats->Add( slave, new TSlaveStat(slave) );
681  fMaxPerfIdx = slave->GetPerfIdx() > fMaxPerfIdx ?
682  slave->GetPerfIdx() : fMaxPerfIdx;
683  }
684 
685  // Setup file & filenode structure
686  Reset();
687  // Optimize the number of files to be open when running on subsample
688  Int_t validateMode = 0;
689  Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode);
690  Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
691  if (num > -1)
692  PDB(kPacketizer,2)
693  Info("TPacketizerAdaptive",
694  "processing subset of entries: validating by file? %s", byfile ? "yes": "no");
695  ValidateFiles(dset, slaves, num, byfile);
696 
697 
698  if (!fValid) return;
699 
700  // apply global range (first,num) to dset and rebuild structure
701  // ommitting TDSet elements that are not needed
702 
703  Int_t files = 0;
704  fTotalEntries = 0;
705  fUnAllocated->Clear(); // avoid dangling pointers
706  fActive->Clear();
707  fFileNodes->Clear(); // then delete all objects
708  PDB(kPacketizer,2)
709  Info("TPacketizerAdaptive",
710  "processing range: first %lld, num %lld", first, num);
711 
712  dset->Reset();
713  Long64_t cur = 0;
714  while (( e = (TDSetElement*)dset->Next())) {
715 
716  // Skip invalid or missing file; It will be moved
717  // from the dset to the 'MissingFiles' list in the player.
718  if (!e->GetValid()) continue;
719 
720  TUrl url = e->GetFileName();
721  Long64_t eFirst = e->GetFirst();
722  Long64_t eNum = e->GetNum();
723  PDB(kPacketizer,2)
724  Info("TPacketizerAdaptive", "processing element '%s'", e->GetFileName());
725  PDB(kPacketizer,2)
726  Info("TPacketizerAdaptive",
727  " --> first %lld, elenum %lld (cur %lld) (entrylist: %p)", eFirst, eNum, cur, e->GetEntryList());
728 
729  if (!e->GetEntryList()) {
730  // This element is before the start of the global range, skip it
731  if (cur + eNum < first) {
732  cur += eNum;
733  PDB(kPacketizer,2)
734  Info("TPacketizerAdaptive", " --> skip element cur %lld", cur);
735  continue;
736  }
737 
738  // This element is after the end of the global range, skip it
739  if (num != -1 && (first+num <= cur)) {
740  cur += eNum;
741  PDB(kPacketizer,2)
742  Info("TPacketizerAdaptive", " --> drop element cur %lld", cur);
743  continue; // break ??
744  }
745 
746  Bool_t inRange = kFALSE;
747  if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
748 
749  if (cur <= first) {
750  // If this element contains the start of the global range
751  // adjust its start and number of entries
752  e->SetFirst( eFirst + (first - cur) );
753  e->SetNum( e->GetNum() - (first - cur) );
754  PDB(kPacketizer,2)
755  Info("TPacketizerAdaptive", " --> adjust start %lld and end %lld",
756  eFirst + (first - cur), first + num - cur);
757  inRange = kTRUE;
758  }
759  if (num != -1 && (first+num <= cur+eNum)) {
760  // If this element contains the end of the global range
761  // adjust its number of entries
762  e->SetNum( first + num - e->GetFirst() - cur );
763  PDB(kPacketizer,2)
764  Info("TPacketizerAdaptive", " --> adjust end %lld", first + num - cur);
765  inRange = kTRUE;
766  }
767 
768  } else {
769  // Increment the counter ...
770  PDB(kPacketizer,2)
771  Info("TPacketizerAdaptive", " --> increment 'cur' by %lld", eNum);
772  cur += eNum;
773  }
774  // Re-adjust eNum and cur, if needed
775  if (inRange) {
776  cur += eNum;
777  eNum = e->GetNum();
778  }
779 
780  } else {
781  TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
782  if (enl) {
783  eNum = enl->GetN();
784  PDB(kPacketizer,2)
785  Info("TPacketizerAdaptive", " --> entry-list element: %lld entries", eNum);
786  } else {
787  TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
788  eNum = evl ? evl->GetN() : eNum;
789  PDB(kPacketizer,2)
790  Info("TPacketizerAdaptive", " --> event-list element: %lld entries (evl:%p)", eNum, evl);
791  }
792  if (!eNum) {
793  PDB(kPacketizer,2)
794  Info("TPacketizerAdaptive", " --> empty entry- or event-list element!");
795  continue;
796  }
797  }
798  PDB(kPacketizer,2)
799  Info("TPacketizerAdaptive", " --> next cur %lld", cur);
800 
801  // Map non URL filenames to dummy host
802  TString host;
803  if ( !url.IsValid() ||
804  (strncmp(url.GetProtocol(),"root", 4) &&
805  strncmp(url.GetProtocol(),"file", 4)) ) {
806  host = "no-host";
807  } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
808  host = "localhost";
809  url.SetProtocol("root");
810  } else {
811  host = url.GetHostFQDN();
812  }
813  // Get full name for local hosts
814  if (host.Contains("localhost") || host == "127.0.0.1") {
815  url.SetHost(gSystem->HostName());
816  host = url.GetHostFQDN();
817  }
818 
819  // Find, on which disk is the file
820  TString disk;
821  if (partitions) {
822  TIter iString(partitions);
823  TObjString* os = 0;
824  while ((os = (TObjString *)iString())) {
825  // Compare begining of the url with disk mountpoint
826  if (strncmp(url.GetFile(), os->GetName(), os->GetString().Length()) == 0) {
827  disk = os->GetName();
828  break;
829  }
830  }
831  }
832  // Node's url
833  TString nodeStr;
834  if (disk.IsNull())
835  nodeStr.Form("%s://%s", url.GetProtocol(), host.Data());
836  else
837  nodeStr.Form("%s://%s/%s", url.GetProtocol(), host.Data(), disk.Data());
838  TFileNode *node = (TFileNode*) fFileNodes->FindObject(nodeStr);
839 
840 
841  if (node == 0) {
842  node = new TFileNode(nodeStr, fStrategy, fFilesToProcess);
843  fFileNodes->Add( node );
844  PDB(kPacketizer, 2)
845  Info("TPacketizerAdaptive", " --> creating new node '%s' for element", nodeStr.Data());
846  } else {
847  PDB(kPacketizer, 2)
848  Info("TPacketizerAdaptive", " --> adding element to exiting node '%s'", nodeStr.Data());
849  }
850 
851  ++files;
852  fTotalEntries += eNum;
853  node->Add(e, kTRUE);
854  node->IncEvents(eNum);
855  PDB(kPacketizer,2) e->Print("a");
856  }
857  PDB(kPacketizer,1)
858  Info("TPacketizerAdaptive", "processing %lld entries in %d files on %d hosts",
859  fTotalEntries, files, fFileNodes->GetSize());
860 
861  // Set the total number for monitoring
862  if (gPerfStats)
863  gPerfStats->SetNumEvents(fTotalEntries);
864 
865  Reset();
866 
867  InitStats();
868 
869  if (!fValid)
870  SafeDelete(fProgress);
871 
872  PDB(kPacketizer,1) Info("TPacketizerAdaptive", "return");
873 }
874 
875 ////////////////////////////////////////////////////////////////////////////////
876 /// Destructor.
877 
878 TPacketizerAdaptive::~TPacketizerAdaptive()
879 {
880  if (fSlaveStats) {
881  fSlaveStats->DeleteValues();
882  }
883 
884  SafeDelete(fSlaveStats);
885  SafeDelete(fUnAllocated);
886  SafeDelete(fActive);
887  SafeDelete(fFileNodes);
888  SafeDelete(fFilesToProcess);
889 }
890 
891 ////////////////////////////////////////////////////////////////////////////////
892 /// (re)initialise the statistics
893 /// called at the begining or after a worker dies.
894 
895 void TPacketizerAdaptive::InitStats()
896 {
897  // calculating how many files from TDSet are not cached on
898  // any slave
899  Int_t noRemoteFiles = 0;
900  fNEventsOnRemLoc = 0;
901  Int_t totalNumberOfFiles = 0;
902  TIter next(fFileNodes);
903  while (TFileNode *fn = (TFileNode*)next()) {
904  totalNumberOfFiles += fn->GetNumberOfFiles();
905  if (fn->GetMySlaveCnt() == 0) {
906  noRemoteFiles += fn->GetNumberOfFiles();
907  fNEventsOnRemLoc += (fn->GetNEvents() - fn->GetProcessed());
908  }
909  }
910 
911  if (totalNumberOfFiles == 0) {
912  Info("InitStats", "no valid or non-empty file found: setting invalid");
913  // No valid files: set invalid and return
914  fValid = kFALSE;
915  return;
916  }
917 
918  fFractionOfRemoteFiles = (1.0 * noRemoteFiles) / totalNumberOfFiles;
919  Info("InitStats",
920  "fraction of remote files %f", fFractionOfRemoteFiles);
921 
922  if (!fValid)
923  SafeDelete(fProgress);
924 
925  PDB(kPacketizer,1) Info("InitStats", "return");
926 }
927 
928 ////////////////////////////////////////////////////////////////////////////////
929 /// Get next unallocated file from 'node' or other nodes:
930 /// First try 'node'. If there is no more files, keep trying to
931 /// find an unallocated file on other nodes.
932 
933 TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextUnAlloc(TFileNode *node, const char *nodeHostName)
934 {
935  TFileStat *file = 0;
936 
937  if (node != 0) {
938  PDB(kPacketizer, 2)
939  Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
940  file = node->GetNextUnAlloc();
941  if (file == 0) RemoveUnAllocNode(node);
942  } else {
943  if (nodeHostName && strlen(nodeHostName) > 0) {
944 
945  TFileNode *fn;
946  // Make sure that they are in the corrected order
947  fUnAllocated->Sort();
948  PDB(kPacketizer,2) fUnAllocated->Print();
949 
950  // Loop over unallocated fileNode list
951  for (int i = 0; i < fUnAllocated->GetSize(); i++) {
952 
953  if ((fn = (TFileNode *) fUnAllocated->At(i))) {
954  TUrl uu(fn->GetName());
955  PDB(kPacketizer, 2)
956  Info("GetNextUnAlloc", "comparing %s with %s...", nodeHostName, uu.GetHost());
957 
958  // Check, whether node's hostname is matching with current fileNode (fn)
959  if (!strcmp(nodeHostName, uu.GetHost())) {
960  node = fn;
961 
962  // Fetch next unallocated file from this node
963  if ((file = node->GetNextUnAlloc()) == 0) {
964  RemoveUnAllocNode(node);
965  node = 0;
966  } else {
967  PDB(kPacketizer, 2)
968  Info("GetNextUnAlloc", "found! (host: %s)", uu.GetHost());
969  break;
970  }
971  }
972  } else {
973  Warning("GetNextUnAlloc", "unallocate entry %d is empty!", i);
974  }
975  }
976 
977  if (node != 0 && fMaxSlaveCnt > 0 && node->GetExtSlaveCnt() >= fMaxSlaveCnt) {
978  // Unlike in TPacketizer we look at the number of ext slaves only.
979  PDB(kPacketizer,1)
980  Info("GetNextUnAlloc", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
981  node = 0;
982  }
983  }
984 
985  if (node == 0) {
986  while (file == 0 && ((node = NextNode()) != 0)) {
987  PDB(kPacketizer, 2)
988  Info("GetNextUnAlloc", "looking for file on node %s", node->GetName());
989  if ((file = node->GetNextUnAlloc()) == 0) RemoveUnAllocNode(node);
990  }
991  }
992  }
993 
994  if (file != 0) {
995  // if needed make node active
996  if (fActive->FindObject(node) == 0) {
997  fActive->Add(node);
998  }
999  }
1000 
1001  PDB(kPacketizer, 2) {
1002  if (!file) {
1003  Info("GetNextUnAlloc", "no file found!");
1004  } else {
1005  file->Print();
1006  }
1007  }
1008 
1009  return file;
1010 }
1011 
1012 ////////////////////////////////////////////////////////////////////////////////
1013 /// Get next node which has unallocated files.
1014 /// the order is determined by TFileNode::Compare
1015 
1016 TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextNode()
1017 {
1018  fUnAllocated->Sort();
1019  PDB(kPacketizer,2) {
1020  fUnAllocated->Print();
1021  }
1022 
1023  TFileNode *fn = (TFileNode*) fUnAllocated->First();
1024  if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
1025  // unlike in TPacketizer we look at the number of ext slaves only.
1026  PDB(kPacketizer,1)
1027  Info("NextNode", "reached Workers-per-Node Limit (%ld)", fMaxSlaveCnt);
1028  fn = 0;
1029  }
1030 
1031  return fn;
1032 }
1033 
1034 ////////////////////////////////////////////////////////////////////////////////
1035 /// Remove unallocated node.
1036 
1037 void TPacketizerAdaptive::RemoveUnAllocNode(TFileNode * node)
1038 {
1039  fUnAllocated->Remove(node);
1040 }
1041 
1042 ////////////////////////////////////////////////////////////////////////////////
1043 /// Get next active file.
1044 
1045 TPacketizerAdaptive::TFileStat *TPacketizerAdaptive::GetNextActive()
1046 {
1047  TFileNode *node;
1048  TFileStat *file = 0;
1049 
1050  while (file == 0 && ((node = NextActiveNode()) != 0)) {
1051  file = node->GetNextActive();
1052  if (file == 0) RemoveActiveNode(node);
1053  }
1054 
1055  return file;
1056 }
1057 
1058 
1059 ////////////////////////////////////////////////////////////////////////////////
1060 /// Get next active node.
1061 
1062 TPacketizerAdaptive::TFileNode *TPacketizerAdaptive::NextActiveNode()
1063 {
1064  fActive->Sort();
1065  PDB(kPacketizer,2) {
1066  Info("NextActiveNode", "enter");
1067  fActive->Print();
1068  }
1069 
1070  TFileNode *fn = (TFileNode*) fActive->First();
1071  // look at only ext slaves
1072  if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetExtSlaveCnt() >= fMaxSlaveCnt) {
1073  PDB(kPacketizer,1)
1074  Info("NextActiveNode","reached Workers-per-Node limit (%ld)", fMaxSlaveCnt);
1075  fn = 0;
1076  }
1077 
1078  return fn;
1079 }
1080 
1081 ////////////////////////////////////////////////////////////////////////////////
1082 /// Remove file from the list of actives.
1083 
1084 void TPacketizerAdaptive::RemoveActive(TFileStat *file)
1085 {
1086  TFileNode *node = file->GetNode();
1087 
1088  node->RemoveActive(file);
1089  if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
1090 }
1091 
1092 ////////////////////////////////////////////////////////////////////////////////
1093 /// Remove node from the list of actives.
1094 
1095 void TPacketizerAdaptive::RemoveActiveNode(TFileNode *node)
1096 {
1097  fActive->Remove(node);
1098 }
1099 
1100 ////////////////////////////////////////////////////////////////////////////////
1101 /// Reset the internal data structure for packet distribution.
1102 
1103 void TPacketizerAdaptive::Reset()
1104 {
1105  fUnAllocated->Clear();
1106  fUnAllocated->AddAll(fFileNodes);
1107 
1108  fActive->Clear();
1109 
1110  TIter files(fFileNodes);
1111  TFileNode *fn;
1112  while ((fn = (TFileNode*) files.Next()) != 0) {
1113  fn->Reset();
1114  }
1115 
1116  TIter slaves(fSlaveStats);
1117  TObject *key;
1118  while ((key = slaves.Next()) != 0) {
1119  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
1120  if (!slstat) {
1121  Warning("Reset", "TSlaveStat associated to key '%s' is NULL", key->GetName());
1122  continue;
1123  }
1124  // Find out which file nodes are on the worker machine and assign the
1125  // one with less workers assigned
1126  TFileNode *fnmin = 0;
1127  Int_t fncnt = fSlaveStats->GetSize();
1128  files.Reset();
1129  while ((fn = (TFileNode*) files.Next()) != 0) {
1130  if (!strcmp(slstat->GetName(), TUrl(fn->GetName()).GetHost())) {
1131  if (fn->GetMySlaveCnt() < fncnt) {
1132  fnmin = fn;
1133  fncnt = fn->GetMySlaveCnt();
1134  }
1135  }
1136  }
1137  if (fnmin != 0 ) {
1138  slstat->SetFileNode(fnmin);
1139  fnmin->IncMySlaveCnt();
1140  PDB(kPacketizer, 2)
1141  Info("Reset","assigning node '%s' to '%s' (cnt: %d)",
1142  fnmin->GetName(), slstat->GetName(), fnmin->GetMySlaveCnt());
1143  }
1144  slstat->fCurFile = 0;
1145  }
1146 }
1147 
1148 ////////////////////////////////////////////////////////////////////////////////
1149 /// Check existence of file/dir/tree an get number of entries.
1150 /// Assumes the files have been setup.
1151 
1152 void TPacketizerAdaptive::ValidateFiles(TDSet *dset, TList *slaves,
1153  Long64_t maxent, Bool_t byfile)
1154 {
1155  TMap slaves_by_sock;
1156  TMonitor mon;
1157  TList workers;
1158 
1159 
1160  // Setup the communication infrastructure
1161 
1162  workers.AddAll(slaves);
1163  TIter si(slaves);
1164  TSlave *slm;
1165  while ((slm = (TSlave*)si.Next()) != 0) {
1166  PDB(kPacketizer,3)
1167  Info("ValidateFiles","socket added to monitor: %p (%s)",
1168  slm->GetSocket(), slm->GetName());
1169  mon.Add(slm->GetSocket());
1170  slaves_by_sock.Add(slm->GetSocket(), slm);
1171  }
1172 
1173  mon.DeActivateAll();
1174 
1175  ((TProof*)gProof)->DeActivateAsyncInput();
1176 
1177  // Some monitoring systems (TXSocketHandler) need to know this
1178  ((TProof*)gProof)->fCurrentMonitor = &mon;
1179 
1180  // Identify the type
1181  if (!strcmp(dset->GetType(), "TTree")) SetBit(TVirtualPacketizer::kIsTree);
1182 
1183  // Preparing for client notification
1184  TString msg("Validating files");
1185  UInt_t n = 0;
1186  UInt_t tot = dset->GetListOfElements()->GetSize();
1187  Bool_t st = kTRUE;
1188 
1189  Long64_t totent = 0, nopenf = 0;
1190  while (kTRUE) {
1191 
1192  // send work
1193  while (TSlave *s = (TSlave *)workers.First()) {
1194 
1195  workers.Remove(s);
1196 
1197  // find a file
1198 
1199  TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
1200  if (!slstat) {
1201  Error("ValidateFiles", "TSlaveStat associated to slave '%s' is NULL", s->GetName());
1202  continue;
1203  }
1204 
1205  TFileNode *node = 0;
1206  TFileStat *file = 0;
1207 
1208  // try its own node first
1209  if ((node = slstat->GetFileNode()) != 0) {
1210  PDB(kPacketizer,3) node->Print();
1211  file = GetNextUnAlloc(node);
1212  if (file == 0)
1213  slstat->SetFileNode(0);
1214  }
1215 
1216  // look for a file on any other node if necessary
1217  if (file == 0)
1218  file = GetNextUnAlloc();
1219 
1220  if (file != 0) {
1221  // files are done right away
1222  RemoveActive(file);
1223 
1224  slstat->fCurFile = file;
1225  TDSetElement *elem = file->GetElement();
1226  Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
1227  if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
1228  // This is decremented when we get the reply
1229  file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1230  TMessage m(kPROOF_GETENTRIES);
1231  m << dset->IsTree()
1232  << TString(elem->GetFileName())
1233  << TString(elem->GetDirectory())
1234  << TString(elem->GetObjName());
1235 
1236  s->GetSocket()->Send( m );
1237  mon.Activate(s->GetSocket());
1238  PDB(kPacketizer,2)
1239  Info("ValidateFiles",
1240  "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
1241  s->GetOrdinal(), s->GetName(), s->GetSocket(),
1242  dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
1243  elem->GetDirectory(), elem->GetObjName());
1244  } else {
1245  // Fill the info
1246  elem->SetTDSetOffset(entries);
1247  if (entries > 0) {
1248  // Most likely valid
1249  elem->SetValid();
1250  if (!elem->GetEntryList()) {
1251  if (elem->GetFirst() > entries) {
1252  Error("ValidateFiles",
1253  "first (%lld) higher then number of entries (%lld) in %s",
1254  elem->GetFirst(), entries, elem->GetFileName());
1255  // disable element
1256  slstat->fCurFile->SetDone();
1257  elem->Invalidate();
1258  dset->SetBit(TDSet::kSomeInvalid);
1259  }
1260  if (elem->GetNum() == -1) {
1261  elem->SetNum(entries - elem->GetFirst());
1262  } else if (elem->GetFirst() + elem->GetNum() > entries) {
1263  Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of"
1264  " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
1265  entries, elem->GetFileName());
1266  elem->SetNum(entries - elem->GetFirst());
1267  }
1268  PDB(kPacketizer,2)
1269  Info("ValidateFiles",
1270  "found elem '%s' with %lld entries", elem->GetFileName(), entries);
1271  }
1272  }
1273  // Count
1274  totent += entries;
1275  nopenf++;
1276  // Notify the client
1277  n++;
1278  gProof->SendDataSetStatus(msg, n, tot, st);
1279 
1280  // This worker is ready for the next validation
1281  workers.Add(s);
1282  }
1283  }
1284  }
1285 
1286  // Check if there is anything to wait for
1287  if (mon.GetActive() == 0) {
1288  if (byfile && maxent > 0) {
1289  // How many files do we still need ?
1290  Long64_t nrestf = (maxent - totent) * nopenf / totent ;
1291  if (nrestf <= 0 && maxent > totent) nrestf = 1;
1292  if (nrestf > 0) {
1293  PDB(kPacketizer,3)
1294  Info("ValidateFiles", "{%lld, %lld, %lld}: needs to validate %lld more files",
1295  maxent, totent, nopenf, nrestf);
1296  si.Reset();
1297  while ((slm = (TSlave *) si.Next()) && nrestf--) {
1298  workers.Add(slm);
1299  }
1300  continue;
1301  } else {
1302  PDB(kPacketizer,3)
1303  Info("ValidateFiles", "no need to validate more files");
1304  break;
1305  }
1306  } else {
1307  break;
1308  }
1309  }
1310 
1311  PDB(kPacketizer,3) {
1312  Info("ValidateFiles", "waiting for %d slaves:", mon.GetActive());
1313  TList *act = mon.GetListOfActives();
1314  TIter next(act);
1315  while (TSocket *s = (TSocket*) next()) {
1316  TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
1317  if (sl)
1318  Info("ValidateFiles", " worker-%s (%s)",
1319  sl->GetOrdinal(), sl->GetName());
1320  }
1321  delete act;
1322  }
1323 
1324  TSocket *sock = mon.Select();
1325  // If we have been interrupted break
1326  if (!sock) {
1327  Error("ValidateFiles", "selection has been interrupted - STOP");
1328  mon.DeActivateAll();
1329  fValid = kFALSE;
1330  break;
1331  }
1332  mon.DeActivate(sock);
1333 
1334  PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
1335 
1336  TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
1337  if (!sock->IsValid()) {
1338  // A socket got invalid during validation
1339  Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
1340  slave->GetOrdinal(), slave->GetName());
1341  ((TProof*)gProof)->MarkBad(slave, "socket got invalid during validation");
1342  fValid = kFALSE;
1343  break;
1344  }
1345 
1346  TMessage *reply;
1347 
1348  if (sock->Recv(reply) <= 0) {
1349  // Notify
1350  Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
1351  slave->GetOrdinal(), slave->GetName());
1352  // Help! lost a slave? ('slave' is deleted inside here ...)
1353  ((TProof*)gProof)->MarkBad(slave, "receive failed during validation");
1354  fValid = kFALSE;
1355  continue;
1356  }
1357 
1358  if (reply->What() != kPROOF_GETENTRIES) {
1359  // Not what we want: handover processing to the central machinery
1360  Int_t what = reply->What();
1361  ((TProof*)gProof)->HandleInputMessage(slave, reply);
1362  if (what == kPROOF_FATAL) {
1363  Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
1364  slave->GetOrdinal(), slave->GetName());
1365  fValid = kFALSE;
1366  } else {
1367  // Reactivate the socket
1368  mon.Activate(sock);
1369  }
1370  // Get next message
1371  continue;
1372  }
1373 
1374  TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1375  TDSetElement *e = slavestat->fCurFile->GetElement();
1376  slavestat->fCurFile->GetNode()->DecExtSlaveCnt(slavestat->GetName());
1377  Long64_t entries;
1378 
1379  (*reply) >> entries;
1380 
1381  // Extract object name, if there
1382  if ((reply->BufferSize() > reply->Length())) {
1383  TString objname;
1384  (*reply) >> objname;
1385  e->SetTitle(objname);
1386  }
1387 
1388  e->SetTDSetOffset(entries);
1389  if (entries > 0) {
1390 
1391  // This dataset element is most likely valid
1392  e->SetValid();
1393 
1394  if (!e->GetEntryList()) {
1395  if (e->GetFirst() > entries) {
1396  Error("ValidateFiles",
1397  "first (%lld) higher then number of entries (%lld) in %s",
1398  e->GetFirst(), entries, e->GetFileName());
1399 
1400  // Invalidate the element
1401  slavestat->fCurFile->SetDone();
1402  e->Invalidate();
1403  dset->SetBit(TDSet::kSomeInvalid);
1404  }
1405 
1406  if (e->GetNum() == -1) {
1407  e->SetNum(entries - e->GetFirst());
1408  } else if (e->GetFirst() + e->GetNum() > entries) {
1409  Error("ValidateFiles",
1410  "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1411  e->GetNum(), e->GetFirst(), entries, e->GetFileName());
1412  e->SetNum(entries - e->GetFirst());
1413  }
1414  }
1415 
1416  // Count
1417  totent += entries;
1418  nopenf++;
1419 
1420  // Notify the client
1421  n++;
1422  gProof->SendDataSetStatus(msg, n, tot, st);
1423 
1424  } else {
1425 
1426  Error("ValidateFiles", "cannot get entries for file: %s - skipping", e->GetFileName() );
1427  //
1428  // Need to fix this with a user option to allow incomplete file sets (rdm)
1429  //
1430  //fValid = kFALSE; // all element must be readable!
1431  if (gProofServ) {
1432  TMessage m(kPROOF_MESSAGE);
1433  m << TString(Form("Cannot get entries for file: %s - skipping",
1434  e->GetFileName()));
1435  gProofServ->GetSocket()->Send(m);
1436  }
1437 
1438  // invalidate element
1439  e->Invalidate();
1440  dset->SetBit(TDSet::kSomeInvalid);
1441  }
1442  PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
1443 
1444  // Ready for the next job, unless we have enough files
1445  if (maxent < 0 || ((totent < maxent) && !byfile))
1446  workers.Add(slave);
1447  }
1448 
1449  // report std. output from slaves??
1450 
1451  ((TProof*)gProof)->ActivateAsyncInput();
1452 
1453  // This needs to be reset
1454  ((TProof*)gProof)->fCurrentMonitor = 0;
1455 
1456  // No reason to continue if invalid
1457  if (!fValid)
1458  return;
1459 
1460  // compute the offset for each file element
1461  Long64_t offset = 0;
1462  Long64_t newOffset = 0;
1463  TIter next(dset->GetListOfElements());
1464  TDSetElement *el;
1465  while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
1466  if (el->GetValid()) {
1467  newOffset = offset + el->GetTDSetOffset();
1468  el->SetTDSetOffset(offset);
1469  offset = newOffset;
1470  }
1471  }
1472 }
1473 
1474 ////////////////////////////////////////////////////////////////////////////////
1475 /// The result depends on the fStrategy
1476 
1477 Int_t TPacketizerAdaptive::CalculatePacketSize(TObject *slStatPtr, Long64_t cachesz, Int_t learnent)
1478 {
1479  Long64_t num;
1480  if (fStrategy == 0) {
1481  // TPacketizer's heuristic for starting packet size
1482  // Constant packet size;
1483  Int_t nslaves = fSlaveStats->GetSize();
1484  if (nslaves > 0) {
1485  num = fTotalEntries / (fPacketAsAFraction * nslaves);
1486  } else {
1487  num = 1;
1488  }
1489  } else {
1490  // The dynamic heuristic for setting the packet size (default)
1491  // Calculates the packet size based on performance of this slave
1492  // and estimated time left until the end of the query.
1493  TSlaveStat* slstat = (TSlaveStat*)slStatPtr;
1494  Float_t rate = slstat->GetCurRate();
1495  if (!rate)
1496  rate = slstat->GetAvgRate();
1497  if (rate) {
1498 
1499  // Global average rate
1500  Float_t avgProcRate = (GetEntriesProcessed()/(GetCumProcTime() / fSlaveStats->GetSize()));
1501  Float_t packetTime = ((fTotalEntries - GetEntriesProcessed())/avgProcRate)/fPacketAsAFraction;
1502 
1503  // Bytes-to-Event conversion
1504  Float_t bevt = (GetEntriesProcessed() > 0) ? GetBytesRead() / GetEntriesProcessed() : -1.;
1505 
1506  // Make sure it is not smaller then the cache, if the info is available and the size
1507  // synchronization is required. But apply the cache-packet size synchronization only if there
1508  // are enough left files to process and the files are all of similar sizes. Otherwise we risk
1509  // to not exploit optimally all potentially active workers.
1510  Bool_t cpsync = fCachePacketSync;
1511  if (fMaxEntriesRatio > 0. && cpsync) {
1512  if (fFilesToProcess && fFilesToProcess->GetSize() <= fSlaveStats->GetSize()) {
1513  Long64_t remEntries = fTotalEntries - GetEntriesProcessed();
1514  Long64_t maxEntries = -1;
1515  if (fFilesToProcess->Last()) {
1516  TDSetElement *elem = (TDSetElement *) ((TPacketizerAdaptive::TFileStat *) fFilesToProcess->Last())->GetElement();
1517  if (elem) maxEntries = elem->GetNum();
1518  }
1519  if (maxEntries > remEntries / fSlaveStats->GetSize() * fMaxEntriesRatio) {
1520  PDB(kPacketizer,3) {
1521  Info("CalculatePacketSize", "%s: switching off synchronization of packet and cache sizes:", slstat->GetOrdinal());
1522  Info("CalculatePacketSize", "%s: few files (%d) remaining of very different sizes (max/avg = %.2f > %.2f)",
1523  slstat->GetOrdinal(), fFilesToProcess->GetSize(),
1524  (Double_t)maxEntries / remEntries * fSlaveStats->GetSize(), fMaxEntriesRatio);
1525  }
1526  cpsync = kFALSE;
1527  }
1528  }
1529  }
1530  if (bevt > 0. && cachesz > 0 && cpsync) {
1531  if ((Long64_t)(rate * packetTime * bevt) < cachesz)
1532  packetTime = cachesz / bevt / rate;
1533  }
1534 
1535  // Apply min-max again, if required
1536  if (fMaxPacketTime > 0. && packetTime > fMaxPacketTime) packetTime = fMaxPacketTime;
1537  if (fMinPacketTime > 0. && packetTime < fMinPacketTime) packetTime = fMinPacketTime;
1538 
1539  // Translate the packet length in number of entries
1540  num = (Long64_t)(rate * packetTime);
1541 
1542  // Notify
1543  PDB(kPacketizer,2)
1544  Info("CalculatePacketSize","%s: avgr: %f, rate: %f, left: %lld, pacT: %f, sz: %f (csz: %f), num: %lld",
1545  slstat->GetOrdinal(), avgProcRate, rate, fTotalEntries - GetEntriesProcessed(),
1546  packetTime, ((bevt > 0) ? num*bevt/1048576. : -1.), cachesz/1048576., num);
1547 
1548  } else {
1549  // First packet for this worker in this query
1550  // Twice the learning phase
1551  num = (learnent > 0) ? 5 * learnent : 1000;
1552 
1553  // Notify
1554  PDB(kPacketizer,2)
1555  Info("CalculatePacketSize","%s: num: %lld", slstat->GetOrdinal(), num);
1556  }
1557  }
1558  if (num < 1) num = 1;
1559  return num;
1560 }
1561 
1562 ////////////////////////////////////////////////////////////////////////////////
1563 /// To be used by GetNextPacket but also in reaction to kPROOF_STOPPROCESS
1564 /// message (when the worker was asked to stop processing during a packet).
1565 /// returns the #entries intended in the last packet - #processed entries
1566 
1567 Int_t TPacketizerAdaptive::AddProcessed(TSlave *sl,
1568  TProofProgressStatus *status,
1569  Double_t latency,
1570  TList **listOfMissingFiles)
1571 {
1572  // find slave
1573  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1574  if (!slstat) {
1575  Error("AddProcessed", "%s: TSlaveStat instance for worker %s not found!",
1576  (sl ? sl->GetOrdinal() : "x.x"),
1577  (sl ? sl->GetName() : "**undef**"));
1578  return -1;
1579  }
1580 
1581  // update stats & free old element
1582 
1583  if ( slstat->fCurElem != 0 ) {
1584  Long64_t expectedNumEv = slstat->fCurElem->GetNum();
1585  // Calculate the number of events processed in the last packet
1586  Long64_t numev;
1587  if (status && status->GetEntries() > 0)
1588  numev = status->GetEntries() - slstat->GetEntriesProcessed();
1589  else
1590  numev = 0;
1591 
1592  // Calculate the progress made in the last packet
1593  TProofProgressStatus *progress = 0;
1594  if (numev > 0) {
1595  // This also moves the pointer in the corrsponding TFileInfo
1596  progress = slstat->AddProcessed(status);
1597  if (progress) {
1598  (*fProgressStatus) += *progress;
1599  // update processing rate
1600  slstat->UpdateRates(status);
1601  }
1602  } else {
1603  progress = new TProofProgressStatus();
1604  }
1605  if (progress) {
1606  PDB(kPacketizer,2)
1607  Info("AddProcessed", "%s: %s: %lld %7.3lf %7.3lf %7.3lf %lld",
1608  sl->GetOrdinal(), sl->GetName(), progress->GetEntries(), latency,
1609  progress->GetProcTime(), progress->GetCPUTime(), progress->GetBytesRead());
1610 
1611  if (gPerfStats)
1612  gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(),
1613  slstat->fCurElem->GetFileName(),
1614  progress->GetEntries(),
1615  latency,
1616  progress->GetProcTime(),
1617  progress->GetCPUTime(),
1618  progress->GetBytesRead());
1619  delete progress;
1620  }
1621  if (numev != expectedNumEv) {
1622  // The last packet was not fully processed
1623  // and will be split in two:
1624  // - The completed part was marked as done.
1625  // - Create a new packet with the part to be resubmitted.
1626  TDSetElement *newPacket = new TDSetElement(*(slstat->fCurElem));
1627  if (newPacket && numev < expectedNumEv) {
1628  Long64_t first = newPacket->GetFirst();
1629  newPacket->SetFirst(first + numev);
1630  if (ReassignPacket(newPacket, listOfMissingFiles) == -1)
1631  SafeDelete(newPacket);
1632  } else
1633  Error("AddProcessed", "%s: processed too much? (%lld, %lld)",
1634  sl->GetOrdinal(), numev, expectedNumEv);
1635 
1636  // TODO: a signal handler which will send info from the worker
1637  // after a packet fails.
1638  /* Add it to the failed packets list.
1639  if (!fFailedPackets) {
1640  fFailedPackets = new TList();
1641  }
1642  fFailedPackets->Add(slstat->fCurElem);
1643  */
1644  }
1645 
1646  slstat->fCurElem = 0;
1647  return (expectedNumEv - numev);
1648  } else {
1649  // the kPROOF_STOPPRPOCESS message is send after the worker receives zero
1650  // as the reply to kPROOF_GETNEXTPACKET
1651  return -1;
1652  }
1653 }
1654 
1655 ////////////////////////////////////////////////////////////////////////////////
1656 /// Get next packet;
1657 /// A meaningfull difference to TPacketizer is the fact that this
1658 /// packetizer, for each worker, tries to predict whether the worker
1659 /// will finish processing it's local files before the end of the query.
1660 /// If yes, it allocates, to those workers, files from non-slave filenodes
1661 /// or from slaves that are overloaded. The check is done every time a new
1662 /// file needs to be assigned.
1663 
1664 TDSetElement *TPacketizerAdaptive::GetNextPacket(TSlave *sl, TMessage *r)
1665 {
1666  if ( !fValid ) {
1667  return 0;
1668  }
1669 
1670  // find slave
1671 
1672  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1673  if (!slstat) {
1674  Error("GetNextPacket", "TSlaveStat instance for worker %s not found!",
1675  (sl ? sl->GetName() : "**undef**"));
1676  return 0;
1677  }
1678 
1679  // Attach to current file
1680  TFileStat *file = slstat->fCurFile;
1681 
1682  // Update stats & free old element
1683 
1684  Bool_t firstPacket = kFALSE;
1685  Long64_t cachesz = -1;
1686  Int_t learnent = -1;
1687  if ( slstat->fCurElem != 0 ) {
1688 
1689  Long64_t restEntries = 0;
1690  Double_t latency, proctime, proccpu;
1691  TProofProgressStatus *status = 0;
1692  Bool_t fileNotOpen = kFALSE, fileCorrupted = kFALSE;
1693 
1694  if (sl->GetProtocol() > 18) {
1695 
1696  (*r) >> latency;
1697  (*r) >> status;
1698 
1699  if (sl->GetProtocol() > 25) {
1700  (*r) >> cachesz >> learnent;
1701  if (r->BufferSize() > r->Length()) (*r) >> restEntries;
1702  }
1703  fileNotOpen = status->TestBit(TProofProgressStatus::kFileNotOpen) ? kTRUE : kFALSE;
1704  fileCorrupted = status->TestBit(TProofProgressStatus::kFileCorrupted) ? kTRUE : kFALSE;
1705 
1706  } else {
1707 
1708  Long64_t bytesRead = -1;
1709 
1710  (*r) >> latency >> proctime >> proccpu;
1711  // only read new info if available
1712  if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
1713  if (r->BufferSize() > r->Length()) (*r) >> restEntries;
1714  Long64_t totev = 0;
1715  if (r->BufferSize() > r->Length()) (*r) >> totev;
1716 
1717  status = new TProofProgressStatus(totev, bytesRead, -1, proctime, proccpu);
1718  fileNotOpen = (restEntries < 0) ? kTRUE : kFALSE;
1719  }
1720 
1721  if (!fileNotOpen && !fileCorrupted) {
1722  if (AddProcessed(sl, status, latency) != 0)
1723  Error("GetNextPacket", "%s: the worker processed a different # of entries", sl->GetOrdinal());
1724  if (fProgressStatus->GetEntries() >= fTotalEntries) {
1725  if (fProgressStatus->GetEntries() > fTotalEntries)
1726  Error("GetNextPacket", "%s: processed too many entries! (%lld, %lld)",
1727  sl->GetOrdinal(), fProgressStatus->GetEntries(), fTotalEntries);
1728  // Send last timer message and stop the timer
1729  HandleTimer(0);
1730  SafeDelete(fProgress);
1731  }
1732  } else {
1733  if (file) {
1734  if (file->GetElement()) {
1735  if (fileCorrupted) {
1736  Info("GetNextPacket", "%s: file '%s' turned corrupted: invalidating file (%lld)",
1737  sl->GetOrdinal(), file->GetElement()->GetName(), restEntries);
1738  Int_t nunproc = AddProcessed(sl, status, latency);
1739  PDB(kPacketizer,1)
1740  Info("GetNextPacket", "%s: %d entries un-processed", sl->GetOrdinal(), nunproc);
1741  // Remaining to be processed
1742  Long64_t num = 0;
1743  if (file->GetElement()->TestBit(TDSetElement::kCorrupted)) {
1744  // Add the remainign entries in the packet to the ones already registered
1745  num = file->GetElement()->GetEntries() + restEntries;
1746  } else {
1747  // First call: add the remaining entries in the packet and those of the file
1748  // not yet assigned
1749  Long64_t rest = file->GetElement()->GetEntries() - file->GetNextEntry();
1750  num = restEntries + rest;
1751  }
1752  file->GetElement()->SetEntries(num);
1753  PDB(kPacketizer,1)
1754  Info("GetNextPacket", "%s: removed file: %s, entries left: %lld", sl->GetOrdinal(),
1755  file->GetElement()->GetName(), file->GetElement()->GetEntries());
1756  // Flag as corrupted
1757  file->GetElement()->SetBit(TDSetElement::kCorrupted);
1758  } else {
1759  Info("GetNextPacket", "%s: file '%s' could not be open: invalidating related element",
1760  sl->GetOrdinal(), file->GetElement()->GetName());
1761  }
1762  // Invalidate the element
1763  file->GetElement()->Invalidate();
1764  // Add it to the failed packets list
1765  if (!fFailedPackets) fFailedPackets = new TList();
1766  if (!fFailedPackets->FindObject(file->GetElement()))
1767  fFailedPackets->Add(file->GetElement());
1768  }
1769  // Deactivate this TFileStat
1770  file->SetDone();
1771  RemoveActive(file);
1772  } else {
1773  Info("GetNextPacket", "%s: error raised by worker, but TFileStat object invalid:"
1774  " protocol error?", sl->GetOrdinal());
1775  }
1776  }
1777  } else {
1778  firstPacket = kTRUE;
1779  }
1780 
1781  if ( fStop ) {
1782  HandleTimer(0);
1783  return 0;
1784  }
1785 
1786  TString nodeName;
1787  if (file != 0) nodeName = file->GetNode()->GetName();
1788  TString nodeHostName(slstat->GetName());
1789 
1790  PDB(kPacketizer,3)
1791  Info("GetNextPacket", "%s: entries processed: %lld - looking for a packet from node '%s'",
1792  sl->GetOrdinal(), fProgressStatus->GetEntries(), nodeName.Data());
1793 
1794  // If current file is just finished
1795  if ( file != 0 && file->IsDone() ) {
1796  file->GetNode()->DecExtSlaveCnt(slstat->GetName());
1797  file->GetNode()->DecRunSlaveCnt();
1798  if (gPerfStats)
1799  gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
1800  file->GetElement()->GetFileName(), kFALSE);
1801  file = 0;
1802  }
1803  // Reset the current file field
1804  slstat->fCurFile = file;
1805 
1806  Long64_t avgEventsLeftPerSlave =
1807  (fTotalEntries - fProgressStatus->GetEntries()) / fSlaveStats->GetSize();
1808  if (fTotalEntries == fProgressStatus->GetEntries())
1809  return 0;
1810  // Get a file if needed
1811  if ( file == 0) {
1812  // Needs a new file
1813  Bool_t openLocal;
1814  // Aiming for localPreference == 1 when #local == #remote events left
1815  Float_t localPreference = fBaseLocalPreference - (fNEventsOnRemLoc /
1816  (0.4 *(fTotalEntries - fProgressStatus->GetEntries())));
1817  if ( slstat->GetFileNode() != 0 ) {
1818  // Local file node exists and has more events to process.
1819  fUnAllocated->Sort();
1820  TFileNode* firstNonLocalNode = (TFileNode*)fUnAllocated->First();
1821  Bool_t nonLocalNodePossible;
1822  if (fForceLocal)
1823  nonLocalNodePossible = 0;
1824  else
1825  nonLocalNodePossible = firstNonLocalNode ?
1826  (fMaxSlaveCnt < 0 || (fMaxSlaveCnt > 0 && firstNonLocalNode->GetExtSlaveCnt() < fMaxSlaveCnt))
1827  : 0;
1828  openLocal = !nonLocalNodePossible;
1829  Float_t slaveRate = slstat->GetAvgRate();
1830  if ( nonLocalNodePossible && fStrategy == 1) {
1831  // OpenLocal is set to kFALSE
1832  if ( slstat->GetFileNode()->GetRunSlaveCnt() >
1833  slstat->GetFileNode()->GetMySlaveCnt() - 1 )
1834  // External slaves help slstat -> don't open nonlocal files
1835  // -1 because, at this point slstat is not running
1836  openLocal = kTRUE;
1837  else if ( slaveRate == 0 ) { // first file for this slave
1838  // GetLocalEventsLeft() counts the potential slave
1839  // as running on its fileNode.
1840  if ( slstat->GetLocalEventsLeft() * localPreference
1841  > (avgEventsLeftPerSlave))
1842  openLocal = kTRUE;
1843  else if ( (firstNonLocalNode->GetEventsLeftPerSlave())
1844  < slstat->GetLocalEventsLeft() * localPreference )
1845  openLocal = kTRUE;
1846  else if ( firstNonLocalNode->GetExtSlaveCnt() > 1 )
1847  openLocal = kTRUE;
1848  else if ( firstNonLocalNode->GetRunSlaveCnt() == 0 )
1849  openLocal = kTRUE;
1850  } else {
1851  // At this point slstat has a non zero avg rate > 0
1852  Float_t slaveTime = slstat->GetLocalEventsLeft()/slaveRate;
1853  // And thus fCumProcTime, fProcessed > 0
1854  Float_t avgTime = avgEventsLeftPerSlave
1855  /(fProgressStatus->GetEntries()/GetCumProcTime());
1856  if (slaveTime * localPreference > avgTime)
1857  openLocal = kTRUE;
1858  else if ((firstNonLocalNode->GetEventsLeftPerSlave())
1859  < slstat->GetLocalEventsLeft() * localPreference)
1860  openLocal = kTRUE;
1861  }
1862  }
1863  if (openLocal || fStrategy == 0) {
1864  // Try its own node
1865  file = slstat->GetFileNode()->GetNextUnAlloc();
1866  if (!file)
1867  file = slstat->GetFileNode()->GetNextActive();
1868  if ( file == 0 ) {
1869  // No more files on this worker
1870  slstat->SetFileNode(0);
1871  }
1872  }
1873  }
1874 
1875  // Try to find an unused filenode first
1876  if(file == 0 && !fForceLocal)
1877  file = GetNextUnAlloc(0, nodeHostName);
1878 
1879  // Then look at the active filenodes
1880  if(file == 0 && !fForceLocal)
1881  file = GetNextActive();
1882 
1883  if (file == 0) return 0;
1884 
1885  PDB(kPacketizer,3) if (fFilesToProcess) fFilesToProcess->Print();
1886 
1887  slstat->fCurFile = file;
1888  // if remote and unallocated file
1889  if (file->GetNode()->GetMySlaveCnt() == 0 &&
1890  file->GetElement()->GetFirst() == file->GetNextEntry()) {
1891  fNEventsOnRemLoc -= file->GetElement()->GetNum();
1892  if (fNEventsOnRemLoc < 0) {
1893  Error("GetNextPacket",
1894  "inconsistent value for fNEventsOnRemLoc (%lld): stop delivering packets!",
1895  fNEventsOnRemLoc);
1896  return 0;
1897  }
1898  }
1899  file->GetNode()->IncExtSlaveCnt(slstat->GetName());
1900  file->GetNode()->IncRunSlaveCnt();
1901  if (gPerfStats)
1902  gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
1903  file->GetNode()->GetName(),
1904  file->GetElement()->GetFileName(), kTRUE);
1905  }
1906 
1907  Long64_t num = CalculatePacketSize(slstat, cachesz, learnent);
1908 
1909  // Get a packet
1910 
1911  TDSetElement *base = file->GetElement();
1912  Long64_t first = file->GetNextEntry();
1913  Long64_t last = base->GetFirst() + base->GetNum();
1914 
1915  // If the remaining part is smaller than the (packetsize * 1.5)
1916  // then increase the packetsize
1917 
1918  if ( first + num * 1.5 >= last ) {
1919  num = last - first;
1920  file->SetDone(); // done
1921  // Delete file from active list (unalloc list is single pass, no delete needed)
1922  RemoveActive(file);
1923  }
1924 
1925  // Update NextEntry in the file object
1926  file->MoveNextEntry(num);
1927 
1928  slstat->fCurElem = CreateNewPacket(base, first, num);
1929  if (base->GetEntryList())
1930  slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
1931 
1932  // Flag the first packet of a new run (dataset)
1933  if (firstPacket)
1934  slstat->fCurElem->SetBit(TDSetElement::kNewRun);
1935  else
1936  slstat->fCurElem->ResetBit(TDSetElement::kNewRun);
1937 
1938  PDB(kPacketizer,2)
1939  Info("GetNextPacket","%s: %s %lld %lld (%lld)", sl->GetOrdinal(), base->GetFileName(), first, first + num - 1, num);
1940 
1941  return slstat->fCurElem;
1942 }
1943 
1944 ////////////////////////////////////////////////////////////////////////////////
1945 /// Return the number of workers still processing
1946 
1947 Int_t TPacketizerAdaptive::GetActiveWorkers()
1948 {
1949  Int_t actw = 0;
1950  TIter nxw(fSlaveStats);
1951  TObject *key;
1952  while ((key = nxw())) {
1953  TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1954  if (wrkstat && wrkstat->fCurFile) actw++;
1955  }
1956  // Done
1957  return actw;
1958 }
1959 
1960 ////////////////////////////////////////////////////////////////////////////////
1961 /// Get Estimation of the current rate; just summing the current rates of
1962 /// the active workers
1963 
1964 Float_t TPacketizerAdaptive::GetCurrentRate(Bool_t &all)
1965 {
1966  all = kTRUE;
1967  // Loop over the workers
1968  Float_t currate = 0.;
1969  if (fSlaveStats && fSlaveStats->GetSize() > 0) {
1970  TIter nxw(fSlaveStats);
1971  TObject *key;
1972  while ((key = nxw()) != 0) {
1973  TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1974  if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1975  // Sum-up the current rates
1976  currate += slstat->GetProgressStatus()->GetCurrentRate();
1977  } else {
1978  all = kFALSE;
1979  }
1980  }
1981  }
1982  // Done
1983  return currate;
1984 }
1985 
1986 ////////////////////////////////////////////////////////////////////////////////
1987 /// Get estimation for the number of processed entries and bytes read at time t,
1988 /// based on the numbers already processed and the latests worker measured speeds.
1989 /// If t <= 0 the current time is used.
1990 /// Only the estimation for the entries is currently implemented.
1991 /// This is needed to smooth the instantaneous rate plot.
1992 
1993 Int_t TPacketizerAdaptive::GetEstEntriesProcessed(Float_t t, Long64_t &ent,
1994  Long64_t &bytes, Long64_t &calls)
1995 {
1996  // Default value
1997  ent = GetEntriesProcessed();
1998  bytes = GetBytesRead();
1999  calls = GetReadCalls();
2000 
2001  // Parse option
2002  if (fUseEstOpt == kEstOff)
2003  // Do not use estimation
2004  return 0;
2005  Bool_t current = (fUseEstOpt == kEstCurrent) ? kTRUE : kFALSE;
2006 
2007  TTime tnow = gSystem->Now();
2008  Double_t now = (t > 0) ? (Double_t)t : Long64_t(tnow) / (Double_t)1000.;
2009  Double_t dt = -1;
2010 
2011  // Loop over the workers
2012  Bool_t all = kTRUE;
2013  Float_t trate = 0.;
2014  if (fSlaveStats && fSlaveStats->GetSize() > 0) {
2015  ent = 0;
2016  TIter nxw(fSlaveStats);
2017  TObject *key;
2018  while ((key = nxw()) != 0) {
2019  TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
2020  if (slstat) {
2021  // Those surely processed
2022  Long64_t e = slstat->GetEntriesProcessed();
2023  if (e <= 0) all = kFALSE;
2024  // Time elapsed since last update
2025  dt = now - slstat->GetProgressStatus()->GetLastUpdate();
2026  // Add estimated entries processed since last update
2027  Float_t rate = (current && slstat->GetCurRate() > 0) ? slstat->GetCurRate()
2028  : slstat->GetAvgRate();
2029  trate += rate;
2030  // Add estimated entries processed since last update
2031  e += (Long64_t) (dt * rate);
2032  // Add to the total
2033  ent += e;
2034  // Notify
2035  PDB(kPacketizer,3)
2036  Info("GetEstEntriesProcessed","%s: e:%lld rate:%f dt:%f e:%lld",
2037  slstat->fSlave->GetOrdinal(),
2038  slstat->GetEntriesProcessed(), rate, dt, e);
2039  }
2040  }
2041  }
2042  // Notify
2043  dt = now - fProgressStatus->GetLastUpdate();
2044  PDB(kPacketizer,2)
2045  Info("GetEstEntriesProcessed",
2046  "dt: %f, estimated entries: %lld (%lld), bytes read: %lld rate: %f (all: %d)",
2047  dt, ent, GetEntriesProcessed(), bytes, trate, all);
2048 
2049  // Check values
2050  ent = (ent > 0) ? ent : fProgressStatus->GetEntries();
2051  ent = (ent <= fTotalEntries) ? ent : fTotalEntries;
2052  bytes = (bytes > 0) ? bytes : fProgressStatus->GetBytesRead();
2053 
2054  // Done
2055  return ((all) ? 0 : 1);
2056 }
2057 
2058 ////////////////////////////////////////////////////////////////////////////////
2059 /// This method can be called at any time during processing
2060 /// as an effect of handling kPROOF_STOPPROCESS
2061 /// If the output list from this worker is going to be sent back to the master,
2062 /// the 'status' includes the number of entries processed by the slave.
2063 /// From this we calculate the remaining part of the packet.
2064 /// 0 indicates that the results from that worker were lost completely.
2065 /// Assume that the filenodes for which we have a TFileNode object
2066 /// are still up and running.
2067 
2068 void TPacketizerAdaptive::MarkBad(TSlave *s, TProofProgressStatus *status,
2069  TList **listOfMissingFiles)
2070 {
2071  TSlaveStat *slaveStat = (TSlaveStat *)(fSlaveStats->GetValue(s));
2072  if (!slaveStat) {
2073  Error("MarkBad", "Worker does not exist");
2074  return;
2075  }
2076  // Update worker counters
2077  if (slaveStat->fCurFile && slaveStat->fCurFile->GetNode()) {
2078  slaveStat->fCurFile->GetNode()->DecExtSlaveCnt(slaveStat->GetName());
2079  slaveStat->fCurFile->GetNode()->DecRunSlaveCnt();
2080  }
2081 
2082  // If status is defined, the remaining part of the last packet is
2083  // reassigned in AddProcessed called from handling kPROOF_STOPPROCESS
2084  if (!status) {
2085  // Get the subset processed by the bad worker.
2086  TList *subSet = slaveStat->GetProcessedSubSet();
2087  if (subSet) {
2088  // Take care of the current packet
2089  if (slaveStat->fCurElem) {
2090  subSet->Add(slaveStat->fCurElem);
2091  }
2092  // Merge overlapping or subsequent elements
2093  Int_t nmg = 0, ntries = 100;
2094  TDSetElement *e = 0, *enxt = 0;
2095  do {
2096  nmg = 0;
2097  e = (TDSetElement *) subSet->First();
2098  while ((enxt = (TDSetElement *) subSet->After(e))) {
2099  if (e->MergeElement(enxt) >= 0) {
2100  nmg++;
2101  subSet->Remove(enxt);
2102  delete enxt;
2103  } else {
2104  e = enxt;
2105  }
2106  }
2107  } while (nmg > 0 && --ntries > 0);
2108  // reassign the packets assigned to the bad slave and save the size;
2109  SplitPerHost(subSet, listOfMissingFiles);
2110  // the elements were reassigned so should not be deleted
2111  subSet->SetOwner(0);
2112  } else {
2113  Warning("MarkBad", "subset processed by bad worker not found!");
2114  }
2115  (*fProgressStatus) -= *(slaveStat->GetProgressStatus());
2116  }
2117  // remove slavestat from the map
2118  fSlaveStats->Remove(s);
2119  delete slaveStat;
2120  // recalculate fNEventsOnRemLoc and others
2121  InitStats();
2122 }
2123 
2124 ////////////////////////////////////////////////////////////////////////////////
2125 /// The file in the listOfMissingFiles can appear several times;
2126 /// in order to fix that, a TDSetElement::Merge method is needed.
2127 
2128 Int_t TPacketizerAdaptive::ReassignPacket(TDSetElement *e,
2129  TList **listOfMissingFiles)
2130 {
2131  if (!e) {
2132  Error("ReassignPacket", "empty packet!");
2133  return -1;
2134  }
2135  // Check the old filenode
2136  TUrl url = e->GetFileName();
2137  // Check the host from which 'e' was previously read.
2138  // Map non URL filenames to dummy host
2139  TString host;
2140  if (!url.IsValid() || strncmp(url.GetProtocol(),"root", 4)) {
2141  host = "no-host";
2142  } else {
2143  host = url.GetHost();
2144  }
2145 
2146  // If accessible add it back to the old node
2147  // and do DecProcessed
2148  TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
2149  if (node && fTryReassign) {
2150  // The packet 'e' was processing data from this node.
2151  node->DecreaseProcessed(e->GetNum());
2152  // The file should be already in fFilesToProcess ...
2153  node->Add(e, kFALSE);
2154  if (!fUnAllocated->FindObject(node))
2155  fUnAllocated->Add(node);
2156  return 0;
2157  } else {
2158  // Add to the list of missing files
2159  TFileInfo *fi = e->GetFileInfo();
2160  if (listOfMissingFiles && *listOfMissingFiles)
2161  (*listOfMissingFiles)->Add((TObject *)fi);
2162  return -1;
2163  }
2164 }
2165 
2166 ////////////////////////////////////////////////////////////////////////////////
2167 /// Split into per host entries
2168 /// The files in the listOfMissingFiles can appear several times;
2169 /// in order to fix that, a TDSetElement::Merge method is needed.
2170 
2171 void TPacketizerAdaptive::SplitPerHost(TList *elements,
2172  TList **listOfMissingFiles)
2173 {
2174  if (!elements) {
2175  Error("SplitPerHost", "Empty list of packets!");
2176  return;
2177  }
2178  if (elements->GetSize() <= 0) {
2179  Error("SplitPerHost", "The input list contains no elements");
2180  return;
2181  }
2182  TIter subSetIter(elements);
2183  TDSetElement *e;
2184  while ((e = (TDSetElement*) subSetIter.Next())) {
2185  if (ReassignPacket(e, listOfMissingFiles) == -1) {
2186  // Remove from the list in order to delete it.
2187  if (elements->Remove(e))
2188  Error("SplitPerHost", "Error removing a missing file");
2189  delete e;
2190  }
2191 
2192  }
2193 }