Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TProofPlayer.h
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Maarten Ballintijn 07/01/02
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TProofPlayer
13 #define ROOT_TProofPlayer
14 
15 
16 //////////////////////////////////////////////////////////////////////////
17 // //
18 // TProofPlayer //
19 // //
20 // This internal class and its subclasses steer the processing in PROOF.//
21 // Instances of the TProofPlayer class are created on the worker nodes //
22 // per session and do the processing. //
23 // Instances of its subclass TProofPlayerRemote are created for each //
24 // query on the master(s) and on the client. On the master(s), //
25 // TProofPlayerRemote coordinates processing, checks the dataset, creates //
26 // the packetizer and takes care of merging the results of the workers. //
27 // The instance on the client collects information on the input //
28 // (dataset and selector), it invokes the Begin() method and finalizes //
29 // the query by calling Terminate(). //
30 // //
31 //////////////////////////////////////////////////////////////////////////
32 
33 #include "TVirtualProofPlayer.h"
34 #include "TArrayL64.h"
35 #include "TArrayF.h"
36 #include "TArrayI.h"
37 #include "TList.h"
38 #include "TSystem.h"
39 #include "TQueryResult.h"
40 #include "TProofProgressStatus.h"
41 #include "TError.h"
42 
43 #include <mutex>
44 
45 class TSelector;
46 class TSocket;
47 class TVirtualPacketizer;
48 class TSlave;
49 class TEventIter;
50 class TProofStats;
51 class TStatus;
52 class TTimer;
53 class THashList;
54 class TH1;
55 class TFile;
56 class TStopwatch;
57 
58 //------------------------------------------------------------------------
59 
60 class TProofPlayer : public TVirtualProofPlayer {
61 
62 private:
63  TList *fAutoBins; // Map of min/max values by name for slaves
64 
65 protected:
66  TList *fInput; //-> list with input objects
67  THashList *fOutput; // list with output objects
68  TSelector *fSelector; //! the latest selector
69  Bool_t fCreateSelObj; //! kTRUE when fSelector has been created locally
70  TClass *fSelectorClass; //! class of the latest selector
71  TTimer *fFeedbackTimer; //! timer for sending intermediate results
72  Long_t fFeedbackPeriod; //! period (ms) for sending intermediate results
73  TEventIter *fEvIter; //! iterator on events or objects
74  TStatus *fSelStatus; //! status of query in progress
75  EExitStatus fExitStatus; // exit status
76  Long64_t fTotalEvents; // number of events requested
77  TProofProgressStatus *fProgressStatus; // the progress status object;
78 
79  Long64_t fReadBytesRun; //! Bytes read in this run
80  Long64_t fReadCallsRun; //! Read calls in this run
81  Long64_t fProcessedRun; //! Events processed in this run
82 
83  TList *fQueryResults; //List of TQueryResult
84  TQueryResult *fQuery; //Instance of TQueryResult currently processed
85  TQueryResult *fPreviousQuery; //Previous instance of TQueryResult processed
86  Int_t fDrawQueries; //Number of Draw queries in the list
87  Int_t fMaxDrawQueries; //Max number of Draw queries kept
88 
89  TTimer *fStopTimer; //Timer associated with a stop request
90  std::mutex fStopTimerMtx; //To protect the stop timer
91 
92  TTimer *fDispatchTimer; //Dispatch pending events while processing
93 
94  TTimer *fProcTimeTimer; //Notifies reaching of allowed max proc time
95  TStopwatch *fProcTime; //Packet proc time
96 
97  TString fOutputFilePath; //Path to file with (partial) results of the query
98  TFile *fOutputFile; //TFile object attached to fOutputFilePath
99  Long_t fSaveMemThreshold; //Threshold for saving output to file
100  Bool_t fSavePartialResults; //Whether to save the partial results
101  Bool_t fSaveResultsPerPacket; //Whether to save partial results after each packet
102 
103  static THashList *fgDrawInputPars; // List of input parameters to be kept on drawing actions
104 
105  void *GetSender() { return this; } //used to set gTQSender
106 
107  virtual Int_t DrawCanvas(TObject *obj); // Canvas drawing via libProofDraw
108 
109  virtual void SetupFeedback(); // specialized setup
110 
111  virtual void MergeOutput(Bool_t savememvalues = kFALSE);
112 
113 public: // fix for broken compilers so TCleanup can call StopFeedback()
114  virtual void StopFeedback(); // specialized teardown
115 
116 protected:
117  class TCleanup {
118  private:
119  TProofPlayer *fPlayer;
120  public:
121  TCleanup(TProofPlayer *p) : fPlayer(p) { }
122  ~TCleanup() { fPlayer->StopFeedback(); }
123  };
124 
125  Int_t AssertSelector(const char *selector_file);
126  Bool_t CheckMemUsage(Long64_t &mfreq, Bool_t &w80r, Bool_t &w80v, TString &wmsg);
127 
128  void MapOutputListToDataMembers() const;
129 
130 public:
131  enum EStatusBits { kDispatchOneEvent = BIT(15), kIsProcessing = BIT(16),
132  kMaxProcTimeReached = BIT(17), kMaxProcTimeExtended = BIT(18) };
133 
134  TProofPlayer(TProof *proof = 0);
135  virtual ~TProofPlayer();
136 
137  Long64_t Process(TDSet *set,
138  const char *selector, Option_t *option = "",
139  Long64_t nentries = -1, Long64_t firstentry = 0);
140  Long64_t Process(TDSet *set,
141  TSelector *selector, Option_t *option = "",
142  Long64_t nentries = -1, Long64_t firstentry = 0);
143  virtual Bool_t JoinProcess(TList *workers);
144  TVirtualPacketizer *GetPacketizer() const { return 0; }
145  Long64_t Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE);
146  Long64_t Finalize(TQueryResult *qr);
147  Long64_t DrawSelect(TDSet *set, const char *varexp,
148  const char *selection, Option_t *option = "",
149  Long64_t nentries = -1, Long64_t firstentry = 0);
150  Int_t GetDrawArgs(const char *var, const char *sel, Option_t *opt,
151  TString &selector, TString &objname);
152  void HandleGetTreeHeader(TMessage *mess);
153  void HandleRecvHisto(TMessage *mess);
154  void FeedBackCanvas(const char *name, Bool_t create);
155 
156  void StopProcess(Bool_t abort, Int_t timeout = -1);
157  void AddInput(TObject *inp);
158  void ClearInput();
159  TObject *GetOutput(const char *name) const;
160  TList *GetOutputList() const;
161  TList *GetInputList() const { return fInput; }
162  TList *GetListOfResults() const { return fQueryResults; }
163  void AddQueryResult(TQueryResult *q);
164  TQueryResult *GetCurrentQuery() const { return fQuery; }
165  TQueryResult *GetQueryResult(const char *ref);
166  void RemoveQueryResult(const char *ref);
167  void SetCurrentQuery(TQueryResult *q);
168  void SetMaxDrawQueries(Int_t max) { fMaxDrawQueries = max; }
169  void RestorePreviousQuery() { fQuery = fPreviousQuery; }
170  Int_t AddOutputObject(TObject *obj);
171  void AddOutput(TList *out); // Incorporate a list
172  void StoreOutput(TList *out); // Adopts the list
173  void StoreFeedback(TObject *slave, TList *out); // Adopts the list
174  void Progress(Long64_t total, Long64_t processed); // *SIGNAL*
175  void Progress(TSlave *, Long64_t total, Long64_t processed)
176  { Progress(total, processed); }
177  void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
178  Float_t initTime, Float_t procTime,
179  Float_t evtrti, Float_t mbrti); // *SIGNAL*
180  void Progress(TSlave *, Long64_t total, Long64_t processed, Long64_t bytesread,
181  Float_t initTime, Float_t procTime,
182  Float_t evtrti, Float_t mbrti)
183  { Progress(total, processed, bytesread, initTime, procTime,
184  evtrti, mbrti); } // *SIGNAL*
185  void Progress(TProofProgressInfo *pi); // *SIGNAL*
186  void Progress(TSlave *, TProofProgressInfo *pi) { Progress(pi); } // *SIGNAL*
187  void Feedback(TList *objs); // *SIGNAL*
188 
189  TDrawFeedback *CreateDrawFeedback(TProof *p);
190  void SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt);
191  void DeleteDrawFeedback(TDrawFeedback *f);
192 
193  TDSetElement *GetNextPacket(TSlave *slave, TMessage *r);
194 
195  Int_t ReinitSelector(TQueryResult *qr);
196 
197  void UpdateAutoBin(const char *name,
198  Double_t& xmin, Double_t& xmax,
199  Double_t& ymin, Double_t& ymax,
200  Double_t& zmin, Double_t& zmax);
201 
202  Bool_t IsClient() const { return kFALSE; }
203 
204  void SetExitStatus(EExitStatus st) { fExitStatus = st; }
205  EExitStatus GetExitStatus() const { return fExitStatus; }
206  Long64_t GetEventsProcessed() const { return fProgressStatus->GetEntries(); }
207  void AddEventsProcessed(Long64_t ev) { fProgressStatus->IncEntries(ev); }
208 
209  void SetDispatchTimer(Bool_t on = kTRUE);
210  void SetStopTimer(Bool_t on = kTRUE,
211  Bool_t abort = kFALSE, Int_t timeout = 0);
212 
213  virtual void SetInitTime() { }
214 
215  virtual void SetMerging(Bool_t = kTRUE) { }
216 
217  Long64_t GetCacheSize();
218  Int_t GetLearnEntries();
219 
220  void SetOutputFilePath(const char *fp) { fOutputFilePath = fp; }
221  Int_t SavePartialResults(Bool_t queryend = kFALSE, Bool_t force = kFALSE);
222 
223  void SetProcessing(Bool_t on = kTRUE);
224  TProofProgressStatus *GetProgressStatus() const { return fProgressStatus; }
225 
226  void UpdateProgressInfo();
227 
228  ClassDef(TProofPlayer,0) // Basic PROOF player
229 };
230 
231 
232 //------------------------------------------------------------------------
233 
234 class TProofPlayerLocal : public TProofPlayer {
235 
236 private:
237  Bool_t fIsClient;
238 
239 protected:
240  void SetupFeedback() { }
241  void StopFeedback() { }
242 
243 public:
244  TProofPlayerLocal(Bool_t client = kTRUE) : fIsClient(client) { }
245  virtual ~TProofPlayerLocal() { }
246 
247  Bool_t IsClient() const { return fIsClient; }
248  Long64_t Process(const char *selector, Long64_t nentries = -1, Option_t *option = "");
249  Long64_t Process(TSelector *selector, Long64_t nentries = -1, Option_t *option = "");
250  Long64_t Process(TDSet *set,
251  const char *selector, Option_t *option = "",
252  Long64_t nentries = -1, Long64_t firstentry = 0) {
253  return TProofPlayer::Process(set, selector, option, nentries, firstentry); }
254  Long64_t Process(TDSet *set,
255  TSelector *selector, Option_t *option = "",
256  Long64_t nentries = -1, Long64_t firstentry = 0) {
257  return TProofPlayer::Process(set, selector, option, nentries, firstentry); }
258  ClassDef(TProofPlayerLocal,0) // PROOF player running on client
259 };
260 
261 
262 //------------------------------------------------------------------------
263 
264 //////////////////////////////////////////////////////////////////////////
265 // //
266 // TProofPlayerRemote //
267 // //
268 // Instances of TProofPlayerRemote are created for each query on the //
269 // master(s) and on the client. On the master(s), TProofPlayerRemote //
270 // coordinates processing, checks the dataset, creates the packetizer //
271 // and takes care of merging the results of the workers. //
272 // The instance on the client collects information on the input //
273 // (dataset and selector), it invokes the Begin() method and finalizes //
274 // the query by calling Terminate(). //
275 // //
276 //////////////////////////////////////////////////////////////////////////
277 
278 
279 class TProofPlayerRemote : public TProofPlayer {
280 
281 protected:
282  TProof *fProof; // link to associated PROOF session
283  TList *fOutputLists; // results returned by slaves
284  TList *fFeedback; // reference for use on master
285  TList *fFeedbackLists; // intermediate results
286  TVirtualPacketizer *fPacketizer; // transform TDSet into packets for slaves
287  Bool_t fMergeFiles; // is True when merging output files centrally is needed
288  TDSet *fDSet; //!tdset for current processing
289  ErrorHandlerFunc_t fErrorHandler; // Store previous handler when redirecting output
290  Bool_t fMergeTH1OneByOne; // If kTRUE forces TH1 merge one-by-one [kTRUE]
291  TH1 *fProcPackets; //!Histogram with packets being processed (owned by TPerfStats)
292  TMessage *fProcessMessage; // Process message to replay when adding new workers dynamically
293  TString fSelectorFileName; // Current Selector's name, set by Process()
294 
295  TStopwatch *fMergeSTW; // Merging stop watch
296  Int_t fNumMergers; // Number of submergers
297 
298  virtual Bool_t HandleTimer(TTimer *timer);
299  Int_t InitPacketizer(TDSet *dset, Long64_t nentries,
300  Long64_t first, const char *defpackunit,
301  const char *defpackdata);
302  TList *MergeFeedback();
303  Bool_t MergeOutputFiles();
304  void NotifyMemory(TObject *obj);
305  void SetLastMergingMsg(TObject *obj);
306  virtual Bool_t SendSelector(const char *selector_file); //send selector to slaves
307  TProof *GetProof() const { return fProof; }
308  void SetupFeedback(); // specialized setup
309  void StopFeedback(); // specialized teardown
310  void SetSelectorDataMembersFromOutputList();
311 
312 public:
313  TProofPlayerRemote(TProof *proof = 0) : fProof(proof), fOutputLists(0), fFeedback(0),
314  fFeedbackLists(0), fPacketizer(0),
315  fMergeFiles(kFALSE), fDSet(0), fErrorHandler(0),
316  fMergeTH1OneByOne(kTRUE), fProcPackets(0),
317  fProcessMessage(0), fMergeSTW(0), fNumMergers(0)
318  { fProgressStatus = new TProofProgressStatus(); }
319  virtual ~TProofPlayerRemote(); // Owns the fOutput list
320  virtual Long64_t Process(TDSet *set, const char *selector,
321  Option_t *option = "", Long64_t nentries = -1,
322  Long64_t firstentry = 0);
323  virtual Long64_t Process(TDSet *set, TSelector *selector,
324  Option_t *option = "", Long64_t nentries = -1,
325  Long64_t firstentry = 0);
326  virtual Bool_t JoinProcess(TList *workers);
327  virtual Long64_t Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE);
328  virtual Long64_t Finalize(TQueryResult *qr);
329  Long64_t DrawSelect(TDSet *set, const char *varexp,
330  const char *selection, Option_t *option = "",
331  Long64_t nentries = -1, Long64_t firstentry = 0);
332 
333  void RedirectOutput(Bool_t on = kTRUE);
334  void StopProcess(Bool_t abort, Int_t timeout = -1);
335  void StoreOutput(TList *out); // Adopts the list
336  virtual void StoreFeedback(TObject *slave, TList *out); // Adopts the list
337  Int_t Incorporate(TObject *obj, TList *out, Bool_t &merged);
338  TObject *HandleHistogram(TObject *obj, Bool_t &merged);
339  Bool_t HistoSameAxis(TH1 *h0, TH1 *h1);
340  Int_t AddOutputObject(TObject *obj);
341  void AddOutput(TList *out); // Incorporate a list
342  virtual void MergeOutput(Bool_t savememvalues = kFALSE);
343  void Progress(Long64_t total, Long64_t processed); // *SIGNAL*
344  void Progress(TSlave*, Long64_t total, Long64_t processed)
345  { Progress(total, processed); }
346  void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
347  Float_t initTime, Float_t procTime,
348  Float_t evtrti, Float_t mbrti); // *SIGNAL*
349  void Progress(TSlave *, Long64_t total, Long64_t processed, Long64_t bytesread,
350  Float_t initTime, Float_t procTime,
351  Float_t evtrti, Float_t mbrti)
352  { Progress(total, processed, bytesread, initTime, procTime,
353  evtrti, mbrti); } // *SIGNAL*
354  void Progress(TProofProgressInfo *pi); // *SIGNAL*
355  void Progress(TSlave *, TProofProgressInfo *pi) { Progress(pi); } // *SIGNAL*
356  void Feedback(TList *objs); // *SIGNAL*
357  TDSetElement *GetNextPacket(TSlave *slave, TMessage *r);
358  TVirtualPacketizer *GetPacketizer() const { return fPacketizer; }
359 
360  Bool_t IsClient() const;
361 
362  void SetInitTime();
363 
364  void SetMerging(Bool_t on = kTRUE);
365 
366  ClassDef(TProofPlayerRemote,0) // PROOF player running on master server
367 };
368 
369 
370 //------------------------------------------------------------------------
371 
372 class TProofPlayerSlave : public TProofPlayer {
373 
374 private:
375  TSocket *fSocket;
376  TList *fFeedback; // List of objects to send updates of
377 
378  Bool_t HandleTimer(TTimer *timer);
379 
380 protected:
381  void SetupFeedback();
382  void StopFeedback();
383 
384 public:
385  TProofPlayerSlave(TSocket *socket = 0) : fSocket(socket), fFeedback(0) { }
386 
387  void HandleGetTreeHeader(TMessage *mess);
388 
389  ClassDef(TProofPlayerSlave,0) // PROOF player running on slave server
390 };
391 
392 
393 //------------------------------------------------------------------------
394 
395 class TProofPlayerSuperMaster : public TProofPlayerRemote {
396 
397 private:
398  TArrayL64 fSlaveProgress;
399  TArrayL64 fSlaveTotals;
400  TArrayL64 fSlaveBytesRead;
401  TArrayF fSlaveInitTime;
402  TArrayF fSlaveProcTime;
403  TArrayF fSlaveEvtRti;
404  TArrayF fSlaveMBRti;
405  TArrayI fSlaveActW;
406  TArrayI fSlaveTotS;
407  TArrayF fSlaveEffS;
408  TList fSlaves;
409  Bool_t fReturnFeedback;
410 
411 protected:
412  Bool_t HandleTimer(TTimer *timer);
413  void SetupFeedback();
414 
415 public:
416  TProofPlayerSuperMaster(TProof *proof = 0) :
417  TProofPlayerRemote(proof), fReturnFeedback(kFALSE) { }
418  virtual ~TProofPlayerSuperMaster() { }
419 
420  Long64_t Process(TDSet *set, const char *selector,
421  Option_t *option = "", Long64_t nentries = -1,
422  Long64_t firstentry = 0);
423  Long64_t Process(TDSet *set, TSelector *selector,
424  Option_t *option = "", Long64_t nentries = -1,
425  Long64_t firstentry = 0)
426  { return TProofPlayerRemote::Process(set, selector, option,
427  nentries, firstentry); }
428  void Progress(Long64_t total, Long64_t processed)
429  { TProofPlayerRemote::Progress(total, processed); }
430  void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
431  Float_t initTime, Float_t procTime,
432  Float_t evtrti, Float_t mbrti)
433  { TProofPlayerRemote::Progress(total, processed, bytesread,
434  initTime, procTime, evtrti, mbrti); }
435  void Progress(TProofProgressInfo *pi) { TProofPlayerRemote::Progress(pi); }
436  void Progress(TSlave *sl, Long64_t total, Long64_t processed);
437  void Progress(TSlave *sl, Long64_t total, Long64_t processed, Long64_t bytesread,
438  Float_t initTime, Float_t procTime,
439  Float_t evtrti, Float_t mbrti);
440  void Progress(TSlave *sl, TProofProgressInfo *pi);
441 
442  ClassDef(TProofPlayerSuperMaster,0) // PROOF player running on super master
443 };
444 
445 #endif