Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TVirtualPacketizer.h
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Maarten Ballintijn 9/7/2002
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TVirtualPacketizer
13 #define ROOT_TVirtualPacketizer
14 
15 //////////////////////////////////////////////////////////////////////////
16 // //
17 // TVirtualPacketizer //
18 // //
19 // Packetizer is a load balancing object created for each query. //
20 // It generates packets to be processed on PROOF worker servers. //
21 // A packet is an event range (begin entry and number of entries) or //
22 // object range (first object and number of objects) in a TTree //
23 // (entries) or a directory (objects) in a file. //
24 // Packets are generated taking into account the performance of the //
25 // remote machine, the time it took to process a previous packet on //
26 // the remote machine, the locality of the database files, etc. //
27 // //
28 // TVirtualPacketizer includes common parts of PROOF packetizers. //
29 // Look in subclasses for details. //
30 // The default packetizer is TPacketizerAdaptive. //
31 // To use an alternative one, for instance TPacketizer, call: //
32 // proof->SetParameter("PROOF_Packetizer", "TPacketizer"); //
33 // //
34 //////////////////////////////////////////////////////////////////////////
35 
36 #include "TObject.h"
37 #include "TSlave.h"
38 #include "TProofProgressStatus.h"
39 #include "TTime.h"
40 
41 
42 class TDSet;
43 class TDSetElement;
44 class TList;
45 class TMap;
46 class TMessage;
47 class TNtuple;
48 class TNtupleD;
49 class TProofProgressInfo;
50 class TSlave;
51 
52 
53 class TVirtualPacketizer : public TObject {
54 
55 public: // public because of Sun CC bug
56  class TVirtualSlaveStat;
57 
58 protected:
59  enum EUseEstOpt { // Option for usage of estimated values
60  kEstOff = 0,
61  kEstCurrent = 1,
62  kEstAverage = 2
63  };
64 
65  // General configuration parameters
66  Double_t fMinPacketTime; // minimum packet time
67  Double_t fMaxPacketTime; // maximum packet time
68  TList *fConfigParams; // List of configuration parameters
69 
70  TMap *fSlaveStats; // slave status, keyed by correspondig TSlave
71 
72  TProofProgressStatus *fProgressStatus; // pointer to status in the player.
73  TTimer *fProgress; // progress updates timer
74 
75  Long64_t fTotalEntries; // total number of entries to be distributed;
76  // not used in the progressive packetizer
77  TList *fFailedPackets;// a list of packets that failed while processing
78 
79  // Members for progress info
80  TTime fStartTime; // time offset
81  Float_t fInitTime; // time before processing
82  Float_t fProcTime; // time since start of processing
83  Float_t fTimeUpdt; // time between updates
84  TNtupleD *fCircProg; // Keeps circular info for "instantenous"
85  // rate calculations
86  Long_t fCircN; // Circularity
87 
88  TNtuple *fProgressPerf; // {Active workers, evt rate, MBs read} as a function of processing time
89  Float_t fProcTimeLast; // Time of the last measurement
90  Int_t fActWrksLast; // Active workers at fProcTimeLast
91  Float_t fEvtRateLast; // Evt rate at fProcTimeLast
92  Float_t fMBsReadLast; // MBs read at fProcTimeLast
93  Float_t fEffSessLast; // Number of effective sessions at fProcTimeLast
94  Bool_t fAWLastFill; // Whether to fill the last measurement
95  Float_t fReportPeriod; // Time between reports if nothing changes (estimated proc time / 100)
96 
97  EUseEstOpt fUseEstOpt; // Control usage of estimated values for the progress info
98 
99  Bool_t fValid; // Constructed properly?
100  Bool_t fStop; // Termination of Process() requested?
101 
102  TString fDataSet; // Name of the dataset being processed (for dataset-driven runs)
103 
104  TList *fInput; // Input list
105 
106  TVirtualPacketizer(TList *input, TProofProgressStatus *st = 0);
107  TVirtualPacketizer(const TVirtualPacketizer &); // no implementation, will generate
108  void operator=(const TVirtualPacketizer &); // error on accidental usage
109 
110  TDSetElement *CreateNewPacket(TDSetElement* base, Long64_t first, Long64_t num);
111  Long64_t GetEntries(Bool_t tree, TDSetElement *e); // Num of entries or objects
112  virtual Bool_t HandleTimer(TTimer *timer);
113 
114 public:
115  enum EStatusBits { kIsInitializing = BIT(16), kIsDone = BIT(17), kIsTree = BIT(18) };
116  virtual ~TVirtualPacketizer();
117 
118  virtual Int_t AssignWork(TDSet* /*dset*/, Long64_t /*first*/, Long64_t /*num*/) { return -1; }
119  Bool_t IsValid() const { return fValid; }
120  Long64_t GetEntriesProcessed() const { return (fProgressStatus? fProgressStatus->GetEntries() : 0); }
121  virtual Int_t GetEstEntriesProcessed(Float_t, Long64_t &ent, Long64_t &bytes, Long64_t &calls)
122  { ent = GetEntriesProcessed(); bytes = GetBytesRead(); calls = GetReadCalls(); return 0; }
123  virtual Float_t GetCurrentRate(Bool_t &all) { all = kTRUE; return (fProgressStatus? fProgressStatus->GetCurrentRate() : 0.); }
124  Long64_t GetTotalEntries() const { return fTotalEntries; }
125  virtual TDSetElement *GetNextPacket(TSlave *sl, TMessage *r);
126  virtual void SetInitTime();
127  virtual void StopProcess(Bool_t abort, Bool_t stoptimer = kFALSE);
128  TList *GetFailedPackets() { return fFailedPackets; }
129  void SetFailedPackets(TList *list) { fFailedPackets = list; }
130  virtual Int_t AddWorkers(TList *workers);
131 
132  Long64_t GetBytesRead() const { return (fProgressStatus? fProgressStatus->GetBytesRead() : 0); }
133  Long64_t GetReadCalls() const { return (fProgressStatus? fProgressStatus->GetReadCalls() : 0); }
134  Double_t GetCumProcTime() const { return fProgressStatus->GetProcTime(); }
135  Float_t GetInitTime() const { return fInitTime; }
136  Float_t GetProcTime() const { return fProcTime; }
137  TNtuple *GetProgressPerf(Bool_t steal = kFALSE) { if (steal) { TNtuple *n = fProgressPerf; fProgressPerf = 0; return n;
138  } else { return fProgressPerf;} }
139  TList *GetConfigParams(Bool_t steal = kFALSE) { if (steal) { TList *l = fConfigParams; fConfigParams = 0; return l;
140  } else { return fConfigParams;} }
141  virtual void MarkBad(TSlave * /*s*/, TProofProgressStatus * /*status*/, TList ** /*missingFiles*/) { return; }
142  virtual Int_t AddProcessed(TSlave * /*sl*/, TProofProgressStatus * /*st*/,
143  Double_t /*lat*/, TList ** /*missingFiles*/) { return 0; }
144  TProofProgressStatus *GetStatus() { return fProgressStatus; }
145  void SetProgressStatus(TProofProgressStatus *st) { fProgressStatus = st; }
146  void SetTotalEntries(Long64_t ent) { fTotalEntries = ent; }
147 
148  TMap *GetSlaveStats() const { return fSlaveStats; }
149 
150  virtual Int_t GetActiveWorkers() { return -1; }
151 
152  ClassDef(TVirtualPacketizer,0) //Generate work packets for parallel processing
153 };
154 
155 //------------------------------------------------------------------------------
156 
157 class TVirtualPacketizer::TVirtualSlaveStat : public TObject { // abstract per-worker record: AddProcessed() is pure virtual
158 
159 friend class TPacketizerAdaptive;
160 friend class TPacketizer;
161 
162 protected:
163  TString fWrkFQDN; // Worker FQDN
164  TSlave *fSlave; // corresponding TSlave record
165  TProofProgressStatus *fStatus; // status as of the last finished packet
166 
167 public:
168  const char *GetName() const { return fWrkFQDN.Data(); } // the worker FQDN
169  const char *GetOrdinal() const { return fSlave->GetOrdinal(); } // NOTE(review): assumes fSlave is non-null - confirm that every subclass sets it
170  Long64_t GetEntriesProcessed() const { return fStatus?fStatus->GetEntries():-1; } // -1 if there is no status
171  Double_t GetProcTime() const { return fStatus?fStatus->GetProcTime():-1; } // -1 if there is no status
172  Float_t GetAvgRate() { return fStatus?fStatus->GetRate():0; } // 0 if there is no status, so a missing status never yields a negative rate
173  TProofProgressStatus *GetProgressStatus() { return fStatus; } // may be null; the accessors above guard against that
174  virtual TProofProgressStatus *AddProcessed(TProofProgressStatus *st) = 0;
175 };
176 
177 #endif