Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TPacketizer.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Maarten Ballintijn 18/03/02
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TPacketizer
13 \ingroup proofkernel
14 
15 This class generates packets to be processed on PROOF worker servers.
16 A packet is an event range (begin entry and number of entries) or
17 object range (first object and number of objects) in a TTree
18 (entries) or a directory (objects) in a file.
19 Packets are generated taking into account the performance of the
20 remote machine, the time it took to process a previous packet on
21 the remote machine, the locality of the database files, etc.
22 
23 */
24 
25 #include "TPacketizer.h"
26 
27 #include "Riostream.h"
28 #include "TDSet.h"
29 #include "TEnv.h"
30 #include "TError.h"
31 #include "TEventList.h"
32 #include "TEntryList.h"
33 #include "TMap.h"
34 #include "TMessage.h"
35 #include "TMonitor.h"
36 #include "TNtupleD.h"
37 #include "TObject.h"
38 #include "TParameter.h"
39 #include "TPerfStats.h"
40 #include "TProofDebug.h"
41 #include "TProof.h"
42 #include "TProofPlayer.h"
43 #include "TProofServ.h"
44 #include "TSlave.h"
45 #include "TSocket.h"
46 #include "TTimer.h"
47 #include "TUrl.h"
48 #include "TClass.h"
49 #include "TMath.h"
50 #include "TObjString.h"
51 
52 //
53 // The following three utility classes manage the state of the
54 // work to be performed and the slaves involved in the process.
55 // A list of TFileNode(s) describes the hosts with files, each
56 // has a list of TFileStat(s) keeping the state for each TDSet
57 // element (file).
58 //
59 // The list of TSlaveStat(s) keeps track of the work (being) done
60 // by each slave
61 //
62 
63 
64 //------------------------------------------------------------------------------
65 
66 class TPacketizer::TFileStat : public TObject {
67 
68 private:
69  Bool_t fIsDone; // is this element processed
70  TFileNode *fNode; // my FileNode
71  TDSetElement *fElement; // location of the file and its range
72  Long64_t fNextEntry; // cursor in the range, -1 when done
73 
74 public:
75  TFileStat(TFileNode *node, TDSetElement *elem);
76 
77  Bool_t IsDone() const {return fIsDone;}
78  void SetDone() {fIsDone = kTRUE;}
79  TFileNode *GetNode() const {return fNode;}
80  TDSetElement *GetElement() const {return fElement;}
81  Long64_t GetNextEntry() const {return fNextEntry;}
82  void MoveNextEntry(Long64_t step) {fNextEntry += step;}
83 };
84 
85 
86 TPacketizer::TFileStat::TFileStat(TFileNode *node, TDSetElement *elem)
87  : fIsDone(kFALSE), fNode(node), fElement(elem), fNextEntry(elem->GetFirst())
88 {
89 }
90 
91 
92 //------------------------------------------------------------------------------
93 
94 class TPacketizer::TFileNode : public TObject {
95 
96 private:
97  TString fNodeName; // FQDN of the node
98  TList *fFiles; // TDSetElements (files) stored on this node
99  TObject *fUnAllocFileNext; // cursor in fFiles
100  TList *fActFiles; // files with work remaining
101  TObject *fActFileNext; // cursor in fActFiles
102  Int_t fMySlaveCnt; // number of slaves running on this node
103  Int_t fSlaveCnt; // number of external slaves processing files on this node
104 
105 public:
106  TFileNode(const char *name);
107  ~TFileNode() { delete fFiles; delete fActFiles; }
108 
109  void IncMySlaveCnt() { fMySlaveCnt++; }
110  void IncSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt++; }
111  void DecSlaveCnt(const char *slave) { if (fNodeName != slave) fSlaveCnt--; R__ASSERT(fSlaveCnt >= 0); }
112  Int_t GetSlaveCnt() const {return fMySlaveCnt + fSlaveCnt;}
113  Int_t GetNumberOfActiveFiles() const { return fActFiles->GetSize(); }
114  Bool_t IsSortable() const { return kTRUE; }
115 
116  const char *GetName() const { return fNodeName.Data(); }
117 
118  void Add(TDSetElement *elem)
119  {
120  TFileStat *f = new TFileStat(this,elem);
121  fFiles->Add(f);
122  if (fUnAllocFileNext == 0) fUnAllocFileNext = fFiles->First();
123  }
124 
125  TFileStat *GetNextUnAlloc()
126  {
127  TObject *next = fUnAllocFileNext;
128 
129  if (next != 0) {
130  // make file active
131  fActFiles->Add(next);
132  if (fActFileNext == 0) fActFileNext = fActFiles->First();
133 
134  // move cursor
135  fUnAllocFileNext = fFiles->After(fUnAllocFileNext);
136  }
137 
138  return (TFileStat *) next;
139  }
140 
141  TFileStat *GetNextActive()
142  {
143  TObject *next = fActFileNext;
144 
145  if (fActFileNext != 0) {
146  fActFileNext = fActFiles->After(fActFileNext);
147  if (fActFileNext == 0) fActFileNext = fActFiles->First();
148  }
149 
150  return (TFileStat *) next;
151  }
152 
153  void RemoveActive(TFileStat *file)
154  {
155  if (fActFileNext == file) fActFileNext = fActFiles->After(file);
156  fActFiles->Remove(file);
157  if (fActFileNext == 0) fActFileNext = fActFiles->First();
158  }
159 
160  Int_t Compare(const TObject *other) const
161  {
162  // Must return -1 if this is smaller than obj, 0 if objects are equal
163  // and 1 if this is larger than obj.
164  const TFileNode *obj = dynamic_cast<const TFileNode*>(other);
165  if (!obj) {
166  Error("Compare", "input is not a TPacketizer::TFileNode object");
167  return 0;
168  }
169 
170  Int_t myVal = GetSlaveCnt();
171  Int_t otherVal = obj->GetSlaveCnt();
172  if (myVal < otherVal) {
173  return -1;
174  } else if (myVal > otherVal) {
175  return 1;
176  } else {
177  return 0;
178  }
179  }
180 
181  void Print(Option_t *) const
182  {
183  std::cout << "OBJ: " << IsA()->GetName() << "\t" << fNodeName
184  << "\tMySlaveCount " << fMySlaveCnt
185  << "\tSlaveCount " << fSlaveCnt << std::endl;
186  }
187 
188  void Reset()
189  {
190  fUnAllocFileNext = fFiles->First();
191  fActFiles->Clear();
192  fActFileNext = 0;
193  fSlaveCnt = 0;
194  fMySlaveCnt = 0;
195  }
196 };
197 
198 
199 TPacketizer::TFileNode::TFileNode(const char *name)
200  : fNodeName(name), fFiles(new TList), fUnAllocFileNext(0),fActFiles(new TList),
201  fActFileNext(0), fMySlaveCnt(0), fSlaveCnt(0)
202 {
203  // Constructor
204 
205  fFiles->SetOwner();
206  fActFiles->SetOwner(kFALSE);
207 }
208 
209 
210 //------------------------------------------------------------------------------
211 
212 class TPacketizer::TSlaveStat : public TVirtualPacketizer::TVirtualSlaveStat {
213 
214 friend class TPacketizer;
215 
216 private:
217  TFileNode *fFileNode; // corresponding node or 0
218  TFileStat *fCurFile; // file currently being processed
219  TDSetElement *fCurElem; // TDSetElement currently being processed
220  TProofProgressStatus *AddProcessed(TProofProgressStatus *st);
221 public:
222  TSlaveStat(TSlave *slave);
223  ~TSlaveStat();
224 
225  TFileNode *GetFileNode() const { return fFileNode; }
226 
227  void SetFileNode(TFileNode *node) { fFileNode = node; }
228 };
229 
230 
231 TPacketizer::TSlaveStat::TSlaveStat(TSlave *slave)
232  : fFileNode(0), fCurFile(0), fCurElem(0)
233 {
234  fSlave = slave;
235  fStatus = new TProofProgressStatus();
236 }
237 
238 ////////////////////////////////////////////////////////////////////////////////
239 /// Cleanup
240 
241 TPacketizer::TSlaveStat::~TSlaveStat()
242 {
243  SafeDelete(fStatus);
244 }
245 
246 TProofProgressStatus *TPacketizer::TSlaveStat::AddProcessed(TProofProgressStatus *st)
247 {
248  // Update the status info to the 'st'.
249  // return the difference (*st - *fStatus)
250 
251  if (st) {
252  // The entriesis not correct in 'st'
253  Long64_t lastEntries = st->GetEntries() - fStatus->GetEntries();
254  // The last proc time should not be added
255  fStatus->SetLastProcTime(0.);
256  // Get the diff
257  TProofProgressStatus *diff = new TProofProgressStatus(*st - *fStatus);
258  *fStatus += *diff;
259  // Set the correct value
260  fStatus->SetLastEntries(lastEntries);
261  return diff;
262  } else {
263  Error("AddProcessed", "status arg undefined");
264  return 0;
265  }
266 }
267 
268 //------------------------------------------------------------------------------
269 
270 ClassImp(TPacketizer);
271 
272 ////////////////////////////////////////////////////////////////////////////////
273 /// Constructor
274 
275 TPacketizer::TPacketizer(TDSet *dset, TList *slaves, Long64_t first,
276  Long64_t num, TList *input, TProofProgressStatus *st)
277  : TVirtualPacketizer(input, st)
278 {
279  PDB(kPacketizer,1) Info("TPacketizer", "Enter (first %lld, num %lld)", first, num);
280 
281  // Init pointer members
282  fPackets = 0;
283  fUnAllocated = 0;
284  fActive = 0;
285  fFileNodes = 0;
286  fMaxPerfIdx = 1;
287  fMaxSlaveCnt = 0;
288  fHeuristicPSiz = kFALSE;
289  fDefMaxWrkNode = kTRUE;
290 
291  if (!fProgressStatus) {
292  Error("TPacketizer", "No progress status");
293  return;
294  }
295 
296  Long_t maxSlaveCnt = 0;
297  if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", maxSlaveCnt) == 0) {
298  if (maxSlaveCnt < 0) {
299  Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be positive");
300  maxSlaveCnt = 0;
301  }
302  if (maxSlaveCnt > 0) fDefMaxWrkNode = kFALSE;
303  } else {
304  // Try also with Int_t (recently supported in TProof::SetParameter)
305  Int_t mxslcnt = -1;
306  if (TProof::GetParameter(input, "PROOF_MaxSlavesPerNode", mxslcnt) == 0) {
307  if (mxslcnt < 0) {
308  Warning("TPacketizer", "PROOF_MaxSlavesPerNode must be positive");
309  mxslcnt = 0;
310  }
311  maxSlaveCnt = (Long_t) mxslcnt;
312  if (maxSlaveCnt > 0) fDefMaxWrkNode = kFALSE;
313  }
314  }
315  if (!maxSlaveCnt) {
316  maxSlaveCnt = gEnv->GetValue("Packetizer.MaxWorkersPerNode", slaves->GetSize());
317  if (maxSlaveCnt != slaves->GetSize()) fDefMaxWrkNode = kFALSE;
318  }
319  if (maxSlaveCnt > 0) {
320  fMaxSlaveCnt = maxSlaveCnt;
321  PDB(kPacketizer,1)
322  Info("TPacketizer", "setting max number of workers per node to %ld", fMaxSlaveCnt);
323  }
324 
325  fPackets = new TList;
326  fPackets->SetOwner();
327 
328  fFileNodes = new TList;
329  fFileNodes->SetOwner();
330  fUnAllocated = new TList;
331  fUnAllocated->SetOwner(kFALSE);
332  fActive = new TList;
333  fActive->SetOwner(kFALSE);
334 
335 
336  fValid = kTRUE;
337 
338  // Resolve end-point urls to optmize distribution
339  // dset->Lookup(); // moved to TProofPlayerRemote::Process
340 
341  // Split into per host entries
342  dset->Reset();
343  TDSetElement *e;
344  while ((e = (TDSetElement*)dset->Next())) {
345  if (e->GetValid()) continue;
346 
347  TUrl url = e->GetFileName();
348 
349  // Map non URL filenames to dummy host
350  TString host;
351  if ( !url.IsValid() ||
352  (strncmp(url.GetProtocol(),"root", 4) &&
353  strncmp(url.GetProtocol(),"file", 4)) ) {
354  host = "no-host";
355  } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
356  host = "localhost";
357  url.SetProtocol("root");
358  } else {
359  host = url.GetHost();
360  }
361  // Get full name for local hosts
362  if (host.Contains("localhost") || host == "127.0.0.1") {
363  url.SetHost(gSystem->HostName());
364  host = url.GetHostFQDN();
365  }
366 
367  TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
368 
369  if (node == 0) {
370  node = new TFileNode(host);
371  fFileNodes->Add(node);
372  }
373 
374  node->Add( e );
375  }
376 
377  fSlaveStats = new TMap;
378  fSlaveStats->SetOwner(kFALSE);
379 
380  // Record initial available workers
381  Int_t nwrks = AddWorkers(slaves);
382  Info("TPacketizer", "Initial number of workers: %d", nwrks);
383 
384  // Setup file & filenode structure
385  Reset();
386  // Optimize the number of files to be open when running on subsample
387  Int_t validateMode = 0;
388  Int_t gprc = TProof::GetParameter(input, "PROOF_ValidateByFile", validateMode);
389  Bool_t byfile = (gprc == 0 && validateMode > 0 && num > -1) ? kTRUE : kFALSE;
390  if (num > -1)
391  PDB(kPacketizer,2)
392  Info("TPacketizer",
393  "processing subset of entries: validating by file? %s", byfile ? "yes": "no");
394  ValidateFiles(dset, slaves, num, byfile);
395 
396  if (!fValid) return;
397 
398  // apply global range (first,num) to dset and rebuild structure
399  // ommitting TDSet elements that are not needed
400 
401  Int_t files = 0;
402  fTotalEntries = 0;
403  fUnAllocated->Clear(); // avoid dangling pointers
404  fActive->Clear();
405  fFileNodes->Clear(); // then delete all objects
406  PDB(kPacketizer,2) Info("TPacketizer", "processing range: first %lld, num %lld", first, num);
407 
408  dset->Reset();
409  Long64_t cur = 0;
410  while (( e = (TDSetElement*)dset->Next())) {
411 
412  // Skip invalid or missing file; It will be moved
413  // from the dset to the 'MissingFiles' list in the player.
414  if (!e->GetValid()) continue;
415 
416  // The dataset name, if any
417  if (fDataSet.IsNull() && e->GetDataSet() && strlen(e->GetDataSet()))
418  fDataSet = e->GetDataSet();
419 
420  TUrl url = e->GetFileName();
421  Long64_t eFirst = e->GetFirst();
422  Long64_t eNum = e->GetNum();
423  PDB(kPacketizer,2)
424  Info("TPacketizer", " --> '%s'", e->GetFileName());
425  PDB(kPacketizer,2)
426  Info("TPacketizer", " --> first %lld, num %lld (cur %lld)", eFirst, eNum, cur);
427 
428  if (!e->GetEntryList()){
429  // this element is before the start of the global range, skip it
430  if (cur + eNum < first) {
431  cur += eNum;
432  PDB(kPacketizer,2)
433  Info("TPacketizer", " --> skip element cur %lld", cur);
434  continue;
435  }
436 
437  // this element is after the end of the global range, skip it
438  if (num != -1 && (first+num <= cur)) {
439  cur += eNum;
440  PDB(kPacketizer,2)
441  Info("TPacketizer", " --> drop element cur %lld", cur);
442  continue; // break ??
443  }
444 
445  Bool_t inRange = kFALSE;
446  if (cur <= first || (num != -1 && (first+num <= cur+eNum))) {
447 
448  if (cur <= first) {
449  // If this element contains the start of the global range
450  // adjust its start and number of entries
451  e->SetFirst( eFirst + (first - cur) );
452  e->SetNum( e->GetNum() - (first - cur) );
453  PDB(kPacketizer,2)
454  Info("TPacketizer", " --> adjust start %lld and end %lld",
455  eFirst + (first - cur), first + num - cur);
456  inRange = kTRUE;
457  }
458  if (num != -1 && (first+num <= cur+eNum)) {
459  // If this element contains the end of the global range
460  // adjust its number of entries
461  e->SetNum( first + num - e->GetFirst() - cur );
462  PDB(kPacketizer,2)
463  Info("TPacketizer", " --> adjust end %lld", first + num - cur);
464  inRange = kTRUE;
465  }
466 
467  } else {
468  // Increment the counter ...
469  PDB(kPacketizer,2)
470  Info("TPacketizer", " --> increment 'cur' by %lld", eNum);
471  cur += eNum;
472  }
473  // Re-adjust eNum and cur, if needed
474  if (inRange) {
475  cur += eNum;
476  eNum = e->GetNum();
477  }
478 
479  } else {
480  TEntryList *enl = dynamic_cast<TEntryList *>(e->GetEntryList());
481  if (enl) {
482  eNum = enl->GetN();
483  } else {
484  TEventList *evl = dynamic_cast<TEventList *>(e->GetEntryList());
485  eNum = evl ? evl->GetN() : eNum;
486  }
487  if (!eNum)
488  continue;
489  }
490  PDB(kPacketizer,2)
491  Info("TPacketizer", " --> next cur %lld", cur);
492 
493  // Map non URL filenames to dummy host
494  TString host;
495  if ( !url.IsValid() ||
496  (strncmp(url.GetProtocol(),"root", 4) &&
497  strncmp(url.GetProtocol(),"file", 4)) ) {
498  host = "no-host";
499  } else if ( url.IsValid() && !strncmp(url.GetProtocol(),"file", 4)) {
500  host = "localhost";
501  url.SetProtocol("root");
502  } else {
503  host = url.GetHostFQDN();
504  }
505  // Get full name for local hosts
506  if (host.Contains("localhost") || host == "127.0.0.1") {
507  url.SetHost(gSystem->HostName());
508  host = url.GetHostFQDN();
509  }
510 
511  TFileNode *node = (TFileNode*) fFileNodes->FindObject( host );
512 
513  if ( node == 0 ) {
514  node = new TFileNode( host );
515  fFileNodes->Add( node );
516  }
517 
518  ++files;
519  fTotalEntries += eNum;
520  node->Add(e);
521  PDB(kPacketizer,2) e->Print("a");
522  }
523 
524  PDB(kPacketizer,1)
525  Info("TPacketizer", "processing %lld entries in %d files on %d hosts",
526  fTotalEntries, files, fFileNodes->GetSize());
527 
528  // Set the total number for monitoring
529  if (gPerfStats)
530  gPerfStats->SetNumEvents(fTotalEntries);
531 
532  Reset();
533 
534  if (fFileNodes->GetSize() == 0) {
535  Info("TPacketizer", "no valid or non-empty file found: setting invalid");
536  // No valid files: set invalid and return
537  fValid = kFALSE;
538  return;
539  }
540 
541  // Below we provide a possibility to change the way packet size is
542  // calculated or define the packet size directly.
543  // fPacketAsAFraction can be interpreted as follows:
544  // assuming all slaves have equal processing rate,
545  // packet size is (#events processed by 1 slave) / fPacketSizeAsAFraction.
546  // It substitutes 20 in the old formula to calculate the fPacketSize:
547  // fPacketSize = fTotalEntries / (20 * nslaves)
548  Long_t packetAsAFraction = 20;
549  if (TProof::GetParameter(input, "PROOF_PacketAsAFraction", packetAsAFraction) == 0)
550  Info("Process", "using alternate fraction of query time as a packet Size: %ld",
551  packetAsAFraction);
552  fPacketAsAFraction = (Int_t)packetAsAFraction;
553 
554  fPacketSize = 1;
555  if (TProof::GetParameter(input, "PROOF_PacketSize", fPacketSize) == 0) {
556  Info("Process","using alternate packet size: %lld", fPacketSize);
557  } else {
558  // Heuristic for starting packet size
559  fHeuristicPSiz = kTRUE;
560  Int_t nslaves = fSlaveStats->GetSize();
561  if (nslaves > 0) {
562  fPacketSize = fTotalEntries / (fPacketAsAFraction * nslaves);
563  if (fPacketSize < 1) fPacketSize = 1;
564  } else {
565  fPacketSize = 1;
566  }
567  }
568 
569  PDB(kPacketizer,1) Info("TPacketizer", "Base Packetsize = %lld", fPacketSize);
570 
571  if (!fValid)
572  SafeDelete(fProgress);
573 
574  PDB(kPacketizer,1) Info("TPacketizer", "Return");
575 }
576 
577 ////////////////////////////////////////////////////////////////////////////////
578 /// Destructor.
579 
580 TPacketizer::~TPacketizer()
581 {
582  if (fSlaveStats) {
583  fSlaveStats->DeleteValues();
584  }
585 
586  SafeDelete(fPackets);
587  SafeDelete(fSlaveStats);
588  SafeDelete(fUnAllocated);
589  SafeDelete(fActive);
590  SafeDelete(fFileNodes);
591 }
592 
593 ////////////////////////////////////////////////////////////////////////////////
594 /// Adds new workers. Returns the number of workers added, or -1 on failure.
595 
596 Int_t TPacketizer::AddWorkers(TList *workers)
597 {
598  if (!workers) {
599  Error("AddWorkers", "Null list of new workers!");
600  return -1;
601  }
602 
603  Int_t curNumOfWrks = fSlaveStats->GetEntries();
604 
605  TSlave *sl;
606  TIter next(workers);
607  while (( sl = dynamic_cast<TSlave*>(next()) ))
608  if (!fSlaveStats->FindObject(sl)) {
609  fSlaveStats->Add(sl, new TSlaveStat(sl));
610  fMaxPerfIdx = sl->GetPerfIdx() > fMaxPerfIdx ? sl->GetPerfIdx() : fMaxPerfIdx;
611  }
612 
613  // If heuristic (and new workers) set the packet size
614  Int_t nwrks = fSlaveStats->GetSize();
615  if (fHeuristicPSiz && nwrks > curNumOfWrks) {
616  if (nwrks > 0) {
617  fPacketSize = fTotalEntries / (fPacketAsAFraction * nwrks);
618  if (fPacketSize < 1) fPacketSize = 1;
619  } else {
620  fPacketSize = 1;
621  }
622  }
623 
624  // Update the max number that can access one file node if the default is used
625  if (fDefMaxWrkNode && nwrks > fMaxSlaveCnt) fMaxSlaveCnt = nwrks;
626 
627  // Done
628  return nwrks;
629 }
630 
631 ////////////////////////////////////////////////////////////////////////////////
632 /// Get next unallocated file.
633 
634 TPacketizer::TFileStat *TPacketizer::GetNextUnAlloc(TFileNode *node)
635 {
636  TFileStat *file = 0;
637 
638  if (node != 0) {
639  file = node->GetNextUnAlloc();
640  if (file == 0) RemoveUnAllocNode(node);
641  } else {
642  while (file == 0 && ((node = NextUnAllocNode()) != 0)) {
643  file = node->GetNextUnAlloc();
644  if (file == 0) RemoveUnAllocNode(node);
645  }
646  }
647 
648  if (file != 0) {
649  // if needed make node active
650  if (fActive->FindObject(node) == 0) {
651  fActive->Add(node);
652  }
653  }
654 
655  return file;
656 }
657 
658 ////////////////////////////////////////////////////////////////////////////////
659 /// Get next unallocated node.
660 
661 TPacketizer::TFileNode *TPacketizer::NextUnAllocNode()
662 {
663  fUnAllocated->Sort();
664  PDB(kPacketizer,2) {
665  std::cout << "TPacketizer::NextUnAllocNode()" << std::endl;
666  fUnAllocated->Print();
667  }
668 
669  TFileNode *fn = (TFileNode*) fUnAllocated->First();
670  if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
671  PDB(kPacketizer,1) Info("NextUnAllocNode", "reached workers per node limit (%ld)",
672  fMaxSlaveCnt);
673  fn = 0;
674  }
675 
676  return fn;
677 }
678 
679 ////////////////////////////////////////////////////////////////////////////////
680 /// Remove unallocated node.
681 
682 void TPacketizer::RemoveUnAllocNode(TFileNode * node)
683 {
684  fUnAllocated->Remove(node);
685 }
686 
687 ////////////////////////////////////////////////////////////////////////////////
688 /// Get next active file.
689 
690 TPacketizer::TFileStat *TPacketizer::GetNextActive()
691 {
692  TFileNode *node;
693  TFileStat *file = 0;
694 
695  while (file == 0 && ((node = NextActiveNode()) != 0)) {
696  file = node->GetNextActive();
697  if (file == 0) RemoveActiveNode(node);
698  }
699 
700  return file;
701 }
702 
703 ////////////////////////////////////////////////////////////////////////////////
704 /// Get next active node.
705 
706 TPacketizer::TFileNode *TPacketizer::NextActiveNode()
707 {
708  fActive->Sort();
709  PDB(kPacketizer,2) {
710  Printf("TPacketizer::NextActiveNode : ----------------------");
711  fActive->Print();
712  }
713 
714  TFileNode *fn = (TFileNode*) fActive->First();
715  if (fn != 0 && fMaxSlaveCnt > 0 && fn->GetSlaveCnt() >= fMaxSlaveCnt) {
716  PDB(kPacketizer,1)
717  Info("NextActiveNode", "reached workers per node limit (%ld)", fMaxSlaveCnt);
718  fn = 0;
719  }
720 
721  return fn;
722 }
723 
724 ////////////////////////////////////////////////////////////////////////////////
725 /// Remove file from the list of actives.
726 
727 void TPacketizer::RemoveActive(TFileStat *file)
728 {
729  TFileNode *node = file->GetNode();
730 
731  node->RemoveActive(file);
732  if (node->GetNumberOfActiveFiles() == 0) RemoveActiveNode(node);
733 }
734 
735 ////////////////////////////////////////////////////////////////////////////////
736 /// Remove node from the list of actives.
737 
738 void TPacketizer::RemoveActiveNode(TFileNode *node)
739 {
740  fActive->Remove(node);
741 }
742 
743 ////////////////////////////////////////////////////////////////////////////////
744 /// Reset the internal datastructure for packet distribution.
745 
746 void TPacketizer::Reset()
747 {
748  fUnAllocated->Clear();
749  fUnAllocated->AddAll(fFileNodes);
750 
751  fActive->Clear();
752 
753  TIter files(fFileNodes);
754  TFileNode *fn;
755  while ((fn = (TFileNode*) files.Next()) != 0) {
756  fn->Reset();
757  }
758 
759  TIter slaves(fSlaveStats);
760  TObject *key;
761  while ((key = slaves.Next()) != 0) {
762  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue(key);
763  if (slstat) {
764  fn = (TFileNode*) fFileNodes->FindObject(slstat->GetName());
765  if (fn != 0 ) {
766  slstat->SetFileNode(fn);
767  fn->IncMySlaveCnt();
768  }
769  slstat->fCurFile = 0;
770  } else {
771  Warning("Reset", "TSlaveStat associated to key '%s' is NULL", key->GetName());
772  }
773  }
774 }
775 
776 ////////////////////////////////////////////////////////////////////////////////
777 /// Check existence of file/dir/tree an get number of entries.
778 /// Assumes the files have been setup.
779 
780 void TPacketizer::ValidateFiles(TDSet *dset, TList *slaves, Long64_t maxent, Bool_t byfile)
781 {
782  TMap slaves_by_sock;
783  TMonitor mon;
784  TList workers;
785 
786 
787  // Setup the communication infrastructure
788 
789  workers.AddAll(slaves);
790  TIter si(slaves);
791  TSlave *slm = 0;
792  while ((slm = (TSlave*)si.Next()) != 0) {
793  PDB(kPacketizer,3)
794  Info("ValidateFiles","socket added to monitor: %p (%s)",
795  slm->GetSocket(), slm->GetName());
796  mon.Add(slm->GetSocket());
797  slaves_by_sock.Add(slm->GetSocket(), slm);
798  PDB(kPacketizer,1)
799  Info("ValidateFiles",
800  "mon: %p, wrk: %p, sck: %p", &mon, slm, slm->GetSocket());
801  }
802 
803  mon.DeActivateAll();
804 
805  ((TProof*)gProof)->DeActivateAsyncInput();
806 
807  // Some monitoring systems (TXSocketHandler) need to know this
808  ((TProof*)gProof)->fCurrentMonitor = &mon;
809 
810  // Preparing for client notification
811  TString msg("Validating files");
812  UInt_t n = 0;
813  UInt_t tot = dset->GetListOfElements()->GetSize();
814  Bool_t st = kTRUE;
815 
816  Long64_t totent = 0, nopenf = 0;
817  while (kTRUE) {
818 
819  // send work
820  while( TSlave *s = (TSlave*)workers.First() ) {
821 
822  workers.Remove(s);
823 
824  // find a file
825 
826  TSlaveStat *slstat = (TSlaveStat*)fSlaveStats->GetValue(s);
827  if (!slstat) {
828  Error("ValidateFiles", "TSlaveStat associated to slave '%s' is NULL", s->GetName());
829  continue;
830  }
831  TFileNode *node = 0;
832  TFileStat *file = 0;
833 
834  // try its own node first
835  if ( (node = slstat->GetFileNode()) != 0 ) {
836  file = GetNextUnAlloc(node);
837  if ( file == 0 ) {
838  slstat->SetFileNode(0);
839  }
840  }
841 
842  // look for a file on any other node if necessary
843  if (file == 0) {
844  file = GetNextUnAlloc();
845  }
846 
847  if ( file != 0 ) {
848  // files are done right away
849  RemoveActive(file);
850 
851  slstat->fCurFile = file;
852  TDSetElement *elem = file->GetElement();
853  Long64_t entries = elem->GetEntries(kTRUE, kFALSE);
854  if (entries < 0 || strlen(elem->GetTitle()) <= 0) {
855  // This is decremented when we get the reply
856  file->GetNode()->IncSlaveCnt(slstat->GetName());
857  TMessage m(kPROOF_GETENTRIES);
858  m << dset->IsTree()
859  << TString(elem->GetFileName())
860  << TString(elem->GetDirectory())
861  << TString(elem->GetObjName());
862 
863  s->GetSocket()->Send( m );
864  mon.Activate(s->GetSocket());
865  PDB(kPacketizer,2)
866  Info("ValidateFiles",
867  "sent to worker-%s (%s) via %p GETENTRIES on %s %s %s %s",
868  s->GetOrdinal(), s->GetName(), s->GetSocket(),
869  dset->IsTree() ? "tree" : "objects", elem->GetFileName(),
870  elem->GetDirectory(), elem->GetObjName());
871  } else {
872  // Fill the info
873  elem->SetTDSetOffset(entries);
874  if (entries > 0) {
875  // Most likely valid
876  elem->SetValid();
877  if (!elem->GetEntryList()) {
878  if (elem->GetFirst() > entries) {
879  Error("ValidateFiles",
880  "first (%lld) higher then number of entries (%lld) in %s",
881  elem->GetFirst(), entries, elem->GetFileName());
882  // disable element
883  slstat->fCurFile->SetDone();
884  elem->Invalidate();
885  dset->SetBit(TDSet::kSomeInvalid);
886  }
887  if (elem->GetNum() == -1) {
888  elem->SetNum(entries - elem->GetFirst());
889  } else if (elem->GetFirst() + elem->GetNum() > entries) {
890  Warning("ValidateFiles", "num (%lld) + first (%lld) larger then number of"
891  " keys/entries (%lld) in %s", elem->GetNum(), elem->GetFirst(),
892  entries, elem->GetFileName());
893  elem->SetNum(entries - elem->GetFirst());
894  }
895  PDB(kPacketizer,2)
896  Info("ValidateFiles",
897  "found elem '%s' with %lld entries", elem->GetFileName(), entries);
898  }
899  }
900  // Notify the client
901  n++;
902  gProof->SendDataSetStatus(msg, n, tot, st);
903 
904  // This worker is ready for the next validation
905  workers.Add(s);
906  }
907  }
908  }
909 
910  // Check if there is anything to wait for
911  if (mon.GetActive() == 0) {
912  if (byfile && maxent > 0 && totent > 0) {
913  // How many files do we still need ?
914  Long64_t nrestf = (maxent - totent) * nopenf / totent ;
915  if (nrestf <= 0 && maxent > totent) nrestf = 1;
916  if (nrestf > 0) {
917  PDB(kPacketizer,3)
918  Info("ValidateFiles", "{%lld, %lld, %lld): needs to validate %lld more files",
919  maxent, totent, nopenf, nrestf);
920  si.Reset();
921  while ((slm = (TSlave *) si.Next()) && nrestf--) {
922  workers.Add(slm);
923  }
924  continue;
925  } else {
926  PDB(kPacketizer,3)
927  Info("ValidateFiles", "no need to validate more files");
928  break;
929  }
930  } else {
931  break;
932  }
933  }
934 
935  PDB(kPacketizer,3) {
936  Info("ValidateFiles", "waiting for %d workers:", mon.GetActive());
937  TList *act = mon.GetListOfActives();
938  TIter next(act);
939  TSocket *s = 0;
940  while ((s = (TSocket*) next())) {
941  Info("ValidateFiles", "found sck: %p", s);
942  TSlave *sl = (TSlave *) slaves_by_sock.GetValue(s);
943  if (sl)
944  Info("ValidateFiles", " worker-%s (%s)", sl->GetOrdinal(), sl->GetName());
945  }
946  delete act;
947  }
948 
949  TSocket *sock = mon.Select();
950  // If we have been interrupted break
951  if (!sock) {
952  Error("ValidateFiles", "selection has been interrupted - STOP");
953  mon.DeActivateAll();
954  fValid = kFALSE;
955  break;
956  }
957  mon.DeActivate(sock);
958 
959  PDB(kPacketizer,3) Info("ValidateFiles", "select returned: %p", sock);
960 
961  TSlave *slave = (TSlave *) slaves_by_sock.GetValue( sock );
962  if (!sock->IsValid()) {
963  // A socket got invalid during validation
964  Error("ValidateFiles", "worker-%s (%s) got invalid - STOP",
965  slave->GetOrdinal(), slave->GetName());
966  ((TProof*)gProof)->MarkBad(slave);
967  fValid = kFALSE;
968  break;
969  }
970 
971  TMessage *reply;
972 
973  if ( sock->Recv(reply) <= 0 ) {
974  // Help! lost a slave?
975  ((TProof*)gProof)->MarkBad(slave);
976  fValid = kFALSE;
977  Error("ValidateFiles", "Recv failed! for worker-%s (%s)",
978  slave->GetOrdinal(), slave->GetName());
979  continue;
980  }
981 
982  if (reply->What() != kPROOF_GETENTRIES) {
983  // Not what we want: handover processing to the central machinery
984  Int_t what = reply->What();
985  ((TProof*)gProof)->HandleInputMessage(slave, reply);
986  if (what == kPROOF_FATAL) {
987  Error("ValidateFiles", "kPROOF_FATAL from worker-%s (%s)",
988  slave->GetOrdinal(), slave->GetName());
989  fValid = kFALSE;
990  } else {
991  // Reactivate the socket
992  mon.Activate(sock);
993  }
994  // Get next message
995  continue;
996  }
997 
998  TSlaveStat *slavestat = (TSlaveStat*) fSlaveStats->GetValue( slave );
999  TDSetElement *e = slavestat->fCurFile->GetElement();
1000  slavestat->fCurFile->GetNode()->DecSlaveCnt(slavestat->GetName());
1001  Long64_t entries;
1002 
1003  (*reply) >> entries;
1004 
1005  // Extract object name, if there
1006  if ((reply->BufferSize() > reply->Length())) {
1007  TString objname;
1008  (*reply) >> objname;
1009  e->SetTitle(objname);
1010  }
1011 
1012  e->SetTDSetOffset(entries);
1013  if ( entries > 0 ) {
1014 
1015  // This dataset element is most likely valid
1016  e->SetValid();
1017 
1018  //if (!e->GetEventList()) {
1019  if (!e->GetEntryList()){
1020  if ( e->GetFirst() > entries ) {
1021  Error("ValidateFiles", "first (%lld) higher then number of entries (%lld) in %s",
1022  e->GetFirst(), entries, e->GetFileName());
1023 
1024  // Invalidate the element
1025  slavestat->fCurFile->SetDone();
1026  e->Invalidate();
1027  dset->SetBit(TDSet::kSomeInvalid);
1028  }
1029 
1030  if ( e->GetNum() == -1 ) {
1031  e->SetNum( entries - e->GetFirst() );
1032  } else if ( e->GetFirst() + e->GetNum() > entries ) {
1033  Error("ValidateFiles",
1034  "num (%lld) + first (%lld) larger then number of keys/entries (%lld) in %s",
1035  e->GetNum(), e->GetFirst(), entries, e->GetFileName());
1036  e->SetNum(entries - e->GetFirst());
1037  }
1038  }
1039 
1040  // Count
1041  totent += entries;
1042  nopenf++;
1043 
1044  // Notify the client
1045  n++;
1046  gProof->SendDataSetStatus(msg, n, tot, st);
1047 
1048  } else {
1049 
1050  Error("ValidateFiles", "cannot get entries for %s (", e->GetFileName() );
1051  //
1052  // Need to fix this with a user option to allow incomplete file sets (rdm)
1053  //
1054  //fValid = kFALSE; // all element must be readable!
1055  if (gProofServ) {
1056  TMessage m(kPROOF_MESSAGE);
1057  m << TString(Form("Cannot get entries for file: %s - skipping", e->GetFileName()));
1058  gProofServ->GetSocket()->Send(m);
1059  }
1060 
1061  // Invalidate the element
1062  e->Invalidate();
1063  dset->SetBit(TDSet::kSomeInvalid);
1064  }
1065  PDB(kPacketizer,3) Info("ValidateFiles", " %lld events validated", totent);
1066 
1067  // Ready for the next job, unless we have enough files
1068  if (maxent < 0 || ((totent < maxent) && !byfile))
1069  workers.Add(slave);
1070  }
1071 
1072  // report std. output from slaves??
1073 
1074  ((TProof*)gProof)->ActivateAsyncInput();
1075 
1076  // This needs to be reset
1077  ((TProof*)gProof)->fCurrentMonitor = 0;
1078 
1079  // No reason to continue if invalid
1080  if (!fValid)
1081  return;
1082 
1083 
1084  // compute the offset for each file element
1085  Long64_t offset = 0;
1086  Long64_t newOffset = 0;
1087  TIter next(dset->GetListOfElements());
1088  TDSetElement *el;
1089  while ( (el = dynamic_cast<TDSetElement*> (next())) ) {
1090  newOffset = offset + el->GetTDSetOffset();
1091  el->SetTDSetOffset(offset);
1092  offset = newOffset;
1093  }
1094 }
1095 
1096 ////////////////////////////////////////////////////////////////////////////////
1097 /// Get entries processed by the specified slave.
1098 
1099 Long64_t TPacketizer::GetEntriesProcessed(TSlave *slave) const
1100 {
1101  if ( fSlaveStats == 0 ) return 0;
1102 
1103  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( slave );
1104 
1105  if ( slstat == 0 ) return 0;
1106 
1107  return slstat->GetEntriesProcessed();
1108 }
1109 
1110 ////////////////////////////////////////////////////////////////////////////////
1111 /// Get Estimation of the current rate; just summing the current rates of
1112 /// the active workers
1113 
1114 Float_t TPacketizer::GetCurrentRate(Bool_t &all)
1115 {
1116  all = kTRUE;
1117  // Loop over the workers
1118  Float_t currate = 0.;
1119  if (fSlaveStats && fSlaveStats->GetSize() > 0) {
1120  TIter nxw(fSlaveStats);
1121  TObject *key;
1122  while ((key = nxw()) != 0) {
1123  TSlaveStat *slstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1124  if (slstat && slstat->GetProgressStatus() && slstat->GetEntriesProcessed() > 0) {
1125  // Sum-up the current rates
1126  currate += slstat->GetProgressStatus()->GetCurrentRate();
1127  } else {
1128  all = kFALSE;
1129  }
1130  }
1131  }
1132  // Done
1133  return currate;
1134 }
1135 
1136 ////////////////////////////////////////////////////////////////////////////////
1137 /// Get next packet
1138 
1139 TDSetElement *TPacketizer::GetNextPacket(TSlave *sl, TMessage *r)
1140 {
1141  if ( !fValid ) {
1142  return 0;
1143  }
1144 
1145  // Find worker
1146 
1147  TSlaveStat *slstat = (TSlaveStat*) fSlaveStats->GetValue( sl );
1148 
1149  R__ASSERT( slstat != 0 );
1150 
1151  PDB(kPacketizer,1)
1152  Info("GetNextPacket","worker-%s (%s)", sl->GetOrdinal(), sl->GetName());
1153  // update stats & free old element
1154 
1155  Bool_t firstPacket = kFALSE;
1156  if ( slstat->fCurElem != 0 ) {
1157  Double_t latency = 0., proctime = 0., proccpu = 0.;
1158  Long64_t bytesRead = -1;
1159  Long64_t totalEntries = -1;
1160  Long64_t totev = 0;
1161  Long64_t numev = slstat->fCurElem->GetNum();
1162 
1163  fPackets->Add(slstat->fCurElem);
1164 
1165  if (sl->GetProtocol() > 18) {
1166  TProofProgressStatus *status = 0;
1167  (*r) >> latency;
1168  (*r) >> status;
1169 
1170  // Calculate the progress made in the last packet
1171  TProofProgressStatus *progress = 0;
1172  if (status) {
1173  // upadte the worker status
1174  numev = status->GetEntries() - slstat->GetEntriesProcessed();
1175  progress = slstat->AddProcessed(status);
1176  if (progress) {
1177  // (*fProgressStatus) += *progress;
1178  proctime = progress->GetProcTime();
1179  proccpu = progress->GetCPUTime();
1180  totev = status->GetEntries(); // for backward compatibility
1181  bytesRead = progress->GetBytesRead();
1182  delete progress;
1183  }
1184  delete status;
1185  } else
1186  Error("GetNextPacket", "no status came in the kPROOF_GETPACKET message");
1187  } else {
1188 
1189  (*r) >> latency >> proctime >> proccpu;
1190 
1191  // only read new info if available
1192  if (r->BufferSize() > r->Length()) (*r) >> bytesRead;
1193  if (r->BufferSize() > r->Length()) (*r) >> totalEntries;
1194  if (r->BufferSize() > r->Length()) (*r) >> totev;
1195 
1196  numev = totev - slstat->GetEntriesProcessed();
1197  if (numev > 0) slstat->GetProgressStatus()->IncEntries(numev);
1198  if (bytesRead > 0) slstat->GetProgressStatus()->IncBytesRead(bytesRead);
1199  if (numev > 0 || bytesRead > 0) slstat->GetProgressStatus()->SetLastUpdate();
1200  }
1201 
1202  if (fProgressStatus) {
1203  if (numev > 0) fProgressStatus->IncEntries(numev);
1204  if (bytesRead > 0) fProgressStatus->IncBytesRead(bytesRead);
1205  if (numev > 0 || bytesRead > 0) fProgressStatus->SetLastUpdate();
1206  }
1207  PDB(kPacketizer,2)
1208  Info("GetNextPacket","worker-%s (%s): %lld %7.3lf %7.3lf %7.3lf %lld",
1209  sl->GetOrdinal(), sl->GetName(),
1210  numev, latency, proctime, proccpu, bytesRead);
1211 
1212  if (gPerfStats)
1213  gPerfStats->PacketEvent(sl->GetOrdinal(), sl->GetName(), slstat->fCurElem->GetFileName(),
1214  numev, latency, proctime, proccpu, bytesRead);
1215 
1216  slstat->fCurElem = 0;
1217  if (fProgressStatus && fProgressStatus->GetEntries() == fTotalEntries) {
1218  HandleTimer(0); // Send last timer message
1219  delete fProgress; fProgress = 0;
1220  }
1221  } else {
1222  firstPacket = kTRUE;
1223  }
1224 
1225  if ( fStop ) {
1226  HandleTimer(0);
1227  return 0;
1228  }
1229 
1230  // get a file if needed
1231 
1232  TFileStat *file = slstat->fCurFile;
1233 
1234  if ( file != 0 && file->IsDone() ) {
1235  file->GetNode()->DecSlaveCnt(slstat->GetName());
1236  if (gPerfStats)
1237  gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(), file->GetNode()->GetName(),
1238  file->GetElement()->GetFileName(), kFALSE);
1239  file = 0;
1240  }
1241  // Reset the current file field
1242  slstat->fCurFile = file;
1243 
1244  if (!file) {
1245 
1246  // Try its own node first
1247  if (slstat->GetFileNode() != 0) {
1248  file = GetNextUnAlloc(slstat->GetFileNode());
1249  if (!file) {
1250  slstat->SetFileNode(0);
1251  }
1252  }
1253 
1254  // try to find an unused filenode first
1255  if (!file) {
1256  file = GetNextUnAlloc();
1257  }
1258 
1259  // then look at the active filenodes
1260  if (!file) {
1261  file = GetNextActive();
1262  }
1263 
1264  if (!file) return 0;
1265 
1266  slstat->fCurFile = file;
1267  file->GetNode()->IncSlaveCnt(slstat->GetName());
1268  if (gPerfStats)
1269  gPerfStats->FileEvent(sl->GetOrdinal(), sl->GetName(),
1270  file->GetNode()->GetName(),
1271  file->GetElement()->GetFileName(), kTRUE);
1272  }
1273 
1274  // get a packet
1275 
1276  TDSetElement *base = file->GetElement();
1277  Long64_t num = Long64_t(fPacketSize*(Float_t)slstat->fSlave->GetPerfIdx()/fMaxPerfIdx);
1278  if (num < 1) num = 1;
1279 
1280  Long64_t first = file->GetNextEntry();
1281  Long64_t last = base->GetFirst() + base->GetNum();
1282 
1283  if ( first + num >= last ) {
1284  num = last - first;
1285  file->SetDone(); // done
1286 
1287  // delete file from active list (unalloc list is single pass, no delete needed)
1288  RemoveActive(file);
1289 
1290  } else {
1291  file->MoveNextEntry(num);
1292  }
1293 
1294 
1295  slstat->fCurElem = CreateNewPacket(base, first, num);
1296  if (base->GetEntryList())
1297  slstat->fCurElem->SetEntryList(base->GetEntryList(), first, num);
1298 
1299  // Flag the first packet of a new run (dataset)
1300  if (firstPacket)
1301  slstat->fCurElem->SetBit(TDSetElement::kNewRun);
1302  else
1303  slstat->fCurElem->ResetBit(TDSetElement::kNewRun);
1304 
1305  PDB(kPacketizer,2)
1306  Info("GetNextPacket","%s: %s %lld %lld", sl->GetOrdinal(), base->GetFileName(), first, num);
1307 
1308  return slstat->fCurElem;
1309 }
1310 
1311 ////////////////////////////////////////////////////////////////////////////////
1312 /// Return the number of workers still processing
1313 
1314 Int_t TPacketizer::GetActiveWorkers()
1315 {
1316  Int_t actw = 0;
1317  TIter nxw(fSlaveStats);
1318  TObject *key;
1319  while ((key = nxw())) {
1320  TSlaveStat *wrkstat = (TSlaveStat *) fSlaveStats->GetValue(key);
1321  if (wrkstat && wrkstat->fCurFile) actw++;
1322  }
1323  // Done
1324  return actw;
1325 }