Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TVirtualPacketizer.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: Maarten Ballintijn 9/7/2002
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TVirtualPacketizer
13 \ingroup proofkernel
14 
15 The packetizer is a load balancing object created for each query.
16 It generates packets to be processed on PROOF worker servers.
17 A packet is an event range (begin entry and number of entries) or
18 object range (first object and number of objects) in a TTree
19 (entries) or a directory (objects) in a file.
20 Packets are generated taking into account the performance of the
21 remote machine, the time it took to process a previous packet on
22 the remote machine, the locality of the database files, etc.
23 
24 TVirtualPacketizer includes common parts of PROOF packetizers.
25 Look in subclasses for details.
26 The default packetizer is TPacketizerAdaptive (TPacketizer for Proof-Lite).
27 To use an alternative packetizer, for instance TPacketizer, call:
28 proof->SetParameter("PROOF_Packetizer", "TPacketizer");
29 
30 */
31 
32 #include "TVirtualPacketizer.h"
33 #include "TEnv.h"
34 #include "TFile.h"
35 #include "TTree.h"
36 #include "TKey.h"
37 #include "TDSet.h"
38 #include "TError.h"
39 #include "TEventList.h"
40 #include "TEntryList.h"
41 #include "TMap.h"
42 #include "TMessage.h"
43 #include "TObjString.h"
44 #include "TParameter.h"
45 
46 #include "TProof.h"
47 #include "TProofDebug.h"
48 #include "TProofPlayer.h"
49 #include "TProofServ.h"
50 #include "TSlave.h"
51 #include "TSocket.h"
52 #include "TTimer.h"
53 #include "TUrl.h"
54 #include "TMath.h"
55 #include "TMonitor.h"
56 #include "TNtuple.h"
57 #include "TNtupleD.h"
58 #include "TPerfStats.h"
59 
60 ClassImp(TVirtualPacketizer);
61 
62 ////////////////////////////////////////////////////////////////////////////////
63 /// Constructor.
64 
65 TVirtualPacketizer::TVirtualPacketizer(TList *input, TProofProgressStatus *st)
66 {
67  fInput = input;
68  // General configuration parameters
69  fMinPacketTime = 3;
70  Double_t minPacketTime = 0;
71  if (TProof::GetParameter(input, "PROOF_MinPacketTime", minPacketTime) == 0) {
72  Info("TVirtualPacketizer", "setting minimum time for a packet to %f",
73  minPacketTime);
74  fMinPacketTime = (Int_t) minPacketTime;
75  }
76  fMaxPacketTime = 20;
77  Double_t maxPacketTime = 0;
78  if (TProof::GetParameter(input, "PROOF_MaxPacketTime", maxPacketTime) == 0) {
79  Info("TVirtualPacketizer", "setting maximum packet time for a packet to %f",
80  maxPacketTime);
81  fMaxPacketTime = (Int_t) maxPacketTime;
82  }
83  ResetBit(TVirtualPacketizer::kIsTree);
84 
85  // Create the list to save them in the query result (each derived packetizer is
86  // responsible to update this coherently)
87  fConfigParams = new TList;
88  fConfigParams->SetName("PROOF_PacketizerConfigParams");
89  fConfigParams->Add(new TParameter<Double_t>("PROOF_MinPacketTime", fMinPacketTime));
90  fConfigParams->Add(new TParameter<Double_t>("PROOF_MaxPacketTime", fMaxPacketTime));
91 
92  fProgressStatus = st;
93  if (!fProgressStatus) {
94  Error("TVirtualPacketizer", "No progress status");
95  return;
96  }
97  fTotalEntries = 0;
98  fValid = kTRUE;
99  fStop = kFALSE;
100  fFailedPackets = 0;
101  fDataSet = "";
102  fSlaveStats = 0;
103 
104  // Performance monitoring
105  fStartTime = gSystem->Now();
106  SetBit(TVirtualPacketizer::kIsInitializing);
107  ResetBit(TVirtualPacketizer::kIsDone);
108  fInitTime = 0;
109  fProcTime = 0;
110  fTimeUpdt = -1.;
111 
112  // Init circularity ntple for performance calculations
113  fCircProg = new TNtupleD("CircNtuple","Circular progress info","tm:ev:mb:rc:al");
114  fCircN = 5;
115  TProof::GetParameter(input, "PROOF_ProgressCircularity", fCircN);
116  fCircProg->SetCircular(fCircN);
117  fCircProg->SetDirectory(0);
118 
119  // Check if we need to start the progress timer (multi-packetizers do not want
120  // timers from the packetizers they control ...). Also submasters do not need
121  // that (the progress timer is the one at the top master).
122  TString startProgress("yes");
123  TProof::GetParameter(input, "PROOF_StartProgressTimer", startProgress);
124  // If we are on a submaster, check if there is something else to do
125  if (gProofServ && gProofServ->IsMaster() && !gProofServ->IsTopMaster()) startProgress = "no";
126 
127  // Init progress timer, if requested
128  // The timer is destroyed (and therefore stopped) by the relevant TPacketizer implementation
129  // in GetNextPacket when end of work is detected.
130  fProgress = 0;
131  if (startProgress == "yes") {
132  Long_t period = 500;
133  TProof::GetParameter(input, "PROOF_ProgressPeriod", period);
134  fProgress = new TTimer;
135  fProgress->SetObject(this);
136  fProgress->Start(period, kFALSE);
137  }
138 
139  // Init ntple to store active workers vs processing time
140  fProgressPerf = 0;
141  TString saveProgressPerf("no");
142  if (TProof::GetParameter(input, "PROOF_SaveProgressPerf", saveProgressPerf) == 0) {
143  if (fProgress && saveProgressPerf == "yes")
144  fProgressPerf = new TNtuple("PROOF_ProgressPerfNtuple",
145  "{Active workers, evt rate, MB read} vs processing time", "tm:aw:er:mb:ns");
146  }
147  fProcTimeLast = -1.;
148  fActWrksLast = -1;
149  fEvtRateLast = -1.;
150  fMBsReadLast = -1.;
151  fEffSessLast = -1.;
152  fAWLastFill = kFALSE;
153  fReportPeriod = -1.;
154 
155  // Whether to send estimated values for the progress info
156  TString estopt;
157  if (TProof::GetParameter(input, "PROOF_RateEstimation", estopt) != 0 ||
158  estopt.IsNull()) {
159  // Parse option from the env
160  estopt = gEnv->GetValue("Proof.RateEstimation", "");
161  }
162  fUseEstOpt = kEstOff;
163  if (estopt == "current")
164  fUseEstOpt = kEstCurrent;
165  else if (estopt == "average")
166  fUseEstOpt = kEstAverage;
167 }
168 
169 ////////////////////////////////////////////////////////////////////////////////
170 /// Destructor.
171 
172 TVirtualPacketizer::~TVirtualPacketizer()
173 {
174  SafeDelete(fCircProg);
175  SafeDelete(fProgress);
176  SafeDelete(fFailedPackets);
177  SafeDelete(fConfigParams);
178  SafeDelete(fProgressPerf);
179  fProgressStatus = 0; // belongs to the player
180 }
181 
182 ////////////////////////////////////////////////////////////////////////////////
183 /// Get entries.
184 
185 Long64_t TVirtualPacketizer::GetEntries(Bool_t tree, TDSetElement *e)
186 {
187  Long64_t entries;
188  TFile *file = TFile::Open(e->GetFileName());
189 
190  if (!file || (file && file->IsZombie())) {
191  const char *emsg = (file) ? strerror(file->GetErrno()) : "<undef>";
192  Error("GetEntries","Cannot open file: %s (%s)", e->GetFileName(), emsg);
193  return -1;
194  }
195 
196  TDirectory *dirsave = gDirectory;
197  if ( ! file->cd(e->GetDirectory()) ) {
198  Error("GetEntries","Cannot cd to: %s", e->GetDirectory() );
199  delete file;
200  return -1;
201  }
202  TDirectory *dir = gDirectory;
203  dirsave->cd();
204 
205  if ( tree ) {
206  TKey *key = dir->GetKey(e->GetObjName());
207  if ( key == 0 ) {
208  Error("GetEntries","Cannot find tree \"%s\" in %s",
209  e->GetObjName(), e->GetFileName() );
210  delete file;
211  return -1;
212  }
213  TTree *t = (TTree *) key->ReadObj();
214  if ( t == 0 ) {
215  // Error always reported?
216  delete file;
217  return -1;
218  }
219  entries = (Long64_t) t->GetEntries();
220  delete t;
221 
222  } else {
223  TList *keys = dir->GetListOfKeys();
224  entries = keys->GetSize();
225  }
226 
227  delete file;
228 
229  return entries;
230 }
231 
232 ////////////////////////////////////////////////////////////////////////////////
233 /// Get next packet.
234 
235 TDSetElement *TVirtualPacketizer::GetNextPacket(TSlave *, TMessage *)
236 {
237  AbstractMethod("GetNextPacket");
238  return 0;
239 }
240 
241 ////////////////////////////////////////////////////////////////////////////////
242 /// Stop process.
243 
244 void TVirtualPacketizer::StopProcess(Bool_t /*abort*/, Bool_t stoptimer)
245 {
246  fStop = kTRUE;
247  if (stoptimer) HandleTimer(0);
248 }
249 
250 ////////////////////////////////////////////////////////////////////////////////
251 /// Creates a new TDSetElement from from base packet starting from
252 /// the first entry with num entries.
253 /// The function returns a new created objects which have to be deleted.
254 
255 TDSetElement* TVirtualPacketizer::CreateNewPacket(TDSetElement* base,
256  Long64_t first, Long64_t num)
257 {
258  TDSetElement* elem = new TDSetElement(base->GetFileName(), base->GetObjName(),
259  base->GetDirectory(), first, num,
260  0, fDataSet.Data());
261 
262  // create TDSetElements for all the friends of elem.
263  TList *friends = base->GetListOfFriends();
264  if (friends) {
265  TIter nxf(friends);
266  TDSetElement *fe = 0;
267  while ((fe = (TDSetElement *) nxf())) {
268  PDB(kLoop,2)
269  Info("CreateNewPacket", "friend: file '%s', obj:'%s'",
270  fe->GetFileName(), fe->GetObjName());
271  TDSetElement *xfe = new TDSetElement(fe->GetFileName(), fe->GetObjName(),
272  fe->GetDirectory(), first, num);
273  // The alias, if any, is in the element name options ('friend_alias=<alias>|')
274  elem->AddFriend(xfe, 0);
275  }
276  }
277 
278  return elem;
279 }
280 
281 ////////////////////////////////////////////////////////////////////////////////
282 /// Send progress message to client.
283 
284 Bool_t TVirtualPacketizer::HandleTimer(TTimer *)
285 {
286  PDB(kPacketizer,2)
287  Info("HandleTimer", "fProgress: %p, isDone: %d",
288  fProgress, TestBit(TVirtualPacketizer::kIsDone));
289 
290  if (fProgress == 0 || TestBit(TVirtualPacketizer::kIsDone)) {
291  // Make sure that the timer is stopped
292  if (fProgress) fProgress->Stop();
293  return kFALSE;
294  }
295 
296  // Prepare progress info
297  TTime tnow = gSystem->Now();
298  Float_t now = Long64_t(tnow - fStartTime) / (Float_t)1000.;
299  Long64_t estent = GetEntriesProcessed();
300  Long64_t estmb = GetBytesRead();
301  Long64_t estrc = GetReadCalls();
302 
303  // Times and counters
304  Float_t evtrti = -1., mbrti = -1.;
305  if (TestBit(TVirtualPacketizer::kIsInitializing)) {
306  // Initialization
307  fInitTime = now;
308  } else {
309  // Fill the reference as first
310  if (fCircProg->GetEntries() <= 0) {
311  fCircProg->Fill((Double_t)0., 0., 0., 0., 0.);
312  }
313  // Time between updates
314  fTimeUpdt = now - fProcTime;
315  // Update proc time
316  fProcTime = now - fInitTime;
317  // Get the last entry
318  Double_t *ar = fCircProg->GetArgs();
319  fCircProg->GetEntry(fCircProg->GetEntries()-1);
320  // The current rate
321  Bool_t all = kTRUE;
322  evtrti = GetCurrentRate(all);
323  Double_t xall = (all) ? 1. : 0.;
324  GetEstEntriesProcessed(0, estent, estmb, estrc);
325  if (estent >= fTotalEntries) {
326  estent = GetEntriesProcessed();
327  estmb = GetBytesRead();
328  estrc = GetReadCalls();
329  }
330  // Fill entry
331  Double_t evts = (Double_t) estent;
332  Double_t mbs = (estmb > 0) ? estmb / TMath::Power(2.,20.) : 0.; //--> MB
333  Double_t rcs = (Double_t) estrc;
334  fCircProg->Fill((Double_t)fProcTime, evts, mbs, rcs, xall);
335  fCircProg->GetEntry(fCircProg->GetEntries()-2);
336  if (all) {
337  Double_t dt = (Double_t)fProcTime - ar[0];
338  Long64_t de = (evts > ar[1]) ? (Long64_t) (evts - ar[1]) : 0;
339  Long64_t db = (mbs > ar[2]) ? (Long64_t) ((mbs - ar[2])*TMath::Power(2.,20.)) : 0;
340  if (gPerfStats)
341  gPerfStats->RateEvent((Double_t)fProcTime, dt, de, db);
342  // Get the last to spot the cache readings
343  Double_t rc = (Double_t)estrc - ar[3];
344  mbrti = (rc > 0 && mbs > ar[2]) ? (Float_t) (mbs - ar[2]) / rc : 0. ;
345  }
346  // Final report only once (to correctly determine the proc time)
347  if (fTotalEntries > 0 && GetEntriesProcessed() >= fTotalEntries)
348  SetBit(TVirtualPacketizer::kIsDone);
349  PDB(kPacketizer,2)
350  Info("HandleTimer", "ent:%lld, bytes:%lld, proct:%f, evtrti:%f, mbrti:%f (%f,%f)",
351  estent, estmb, fProcTime, evtrti, mbrti, mbs, ar[2]);
352  }
353 
354  if (gProofServ) {
355  // Message to be sent over
356  TMessage m(kPROOF_PROGRESS);
357  if (gProofServ->GetProtocol() > 25) {
358  Int_t actw = GetActiveWorkers();
359  Int_t acts = gProofServ->GetActSessions();
360  Float_t effs = gProofServ->GetEffSessions();
361  if (fProgressPerf && estent > 0) {
362  // Estimated query time
363  if (fProcTime > 0.) {
364  fReportPeriod = (Float_t) fTotalEntries / (Double_t) estent * fProcTime / 100.;
365  if (fReportPeriod > 0. && fReportPeriod < 5.) fReportPeriod = 5.;
366  }
367 
368  if (fProgressPerf->GetEntries() <= 0) {
369  // Fill the first entry
370  fProgressPerf->Fill(fProcTime, (Float_t)actw, -1., -1., -1.);
371  } else {
372  // Fill only if changed since last entry filled
373  Float_t *far = fProgressPerf->GetArgs();
374  fProgressPerf->GetEntry(fProgressPerf->GetEntries()-1);
375  Bool_t doReport = (fReportPeriod > 0. &&
376  (fProcTime - far[0]) >= fReportPeriod) ? kTRUE : kFALSE;
377  Float_t mbsread = estmb / 1024. / 1024.;
378  if (TMath::Abs((Float_t)actw - far[1]) > 0.1) {
379  if (fAWLastFill)
380  fProgressPerf->Fill(fProcTimeLast, (Float_t)fActWrksLast,
381  fEvtRateLast, fMBsReadLast, fEffSessLast);
382  fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
383  fAWLastFill = kFALSE;
384  } else if (doReport) {
385  fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
386  fAWLastFill = kFALSE;
387  } else {
388  fAWLastFill = kTRUE;
389  }
390  fProcTimeLast = fProcTime;
391  fActWrksLast = actw;
392  fEvtRateLast = evtrti;
393  fMBsReadLast = mbsread;
394  fEffSessLast = effs;
395  }
396  }
397  // Fill the message now
398  TProofProgressInfo pi(fTotalEntries, estent, estmb, fInitTime,
399  fProcTime, evtrti, mbrti, actw, acts, effs);
400  m << &pi;
401  } else if (gProofServ->GetProtocol() > 11) {
402  // Fill the message now
403  m << fTotalEntries << estent << estmb << fInitTime << fProcTime
404  << evtrti << mbrti;
405  } else {
406  // Old format
407  m << fTotalEntries << GetEntriesProcessed();
408  }
409  // send message to client;
410  gProofServ->GetSocket()->Send(m);
411 
412  } else {
413  if (gProof && gProof->GetPlayer()) {
414  // Log locally
415  gProof->GetPlayer()->Progress(fTotalEntries, estent, estmb,
416  fInitTime, fProcTime, evtrti, mbrti);
417  }
418  }
419 
420  // Final report only once (to correctly determine the proc time)
421  if (fTotalEntries > 0 && GetEntriesProcessed() >= fTotalEntries)
422  SetBit(TVirtualPacketizer::kIsDone);
423 
424  return kFALSE; // ignored?
425 }
426 
427 ////////////////////////////////////////////////////////////////////////////////
428 /// Set the initialization time
429 
430 void TVirtualPacketizer::SetInitTime()
431 {
432  if (TestBit(TVirtualPacketizer::kIsInitializing)) {
433  fInitTime = Long64_t(gSystem->Now() - fStartTime) / (Float_t)1000.;
434  ResetBit(TVirtualPacketizer::kIsInitializing);
435  PDB(kPacketizer,2)
436  Info("SetInitTime","fInitTime set to %f s", fInitTime);
437  }
438 }
439 
440 ////////////////////////////////////////////////////////////////////////////////
441 /// Adds new workers. Must be implemented by each real packetizer properly.
442 /// Returns the number of workers added, or -1 on failure.
443 
444 Int_t TVirtualPacketizer::AddWorkers(TList *)
445 {
446  Warning("AddWorkers", "Not implemented for this packetizer");
447 
448  return -1;
449 }