Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TProofPlayerLite.cxx
Go to the documentation of this file.
1 // @(#)root/proofplayer:$Id$
2 // Author: G. Ganis Mar 2008
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
/** \class TProofPlayerLite
\ingroup proofkernel

Version of TProofPlayerRemote that merges the functionality needed by clients
and masters. It is used in optimized local sessions.

*/
19 
20 #include "TProofPlayerLite.h"
21 
22 #include "MessageTypes.h"
23 #include "TDSet.h"
24 #include "TDSetProxy.h"
25 #include "TEntryList.h"
26 #include "TEventList.h"
27 #include "THashList.h"
28 #include "TMap.h"
29 #include "TMessage.h"
30 #include "TObjString.h"
31 #include "TPerfStats.h"
32 #include "TProofLite.h"
33 #include "TProofDebug.h"
34 #include "TProofServ.h"
35 #include "TROOT.h"
36 #include "TSelector.h"
37 #include "TVirtualPacketizer.h"
38 
39 ////////////////////////////////////////////////////////////////////////////////
40 /// Create the selector object and save the relevant files and binary information
41 /// in the cache so that the worker can pick it up.
42 /// Returns 0 and fill fSelector in case of success. Returns -1 and sets
43 /// fSelector to 0 in case of failure.
44 
45 Int_t TProofPlayerLite::MakeSelector(const char *selfile)
46 {
47  fSelectorClass = 0;
48  SafeDelete(fSelector);
49  if (!selfile || strlen(selfile) <= 0) {
50  Error("MakeSelector", "input file path or name undefined");
51  return -1;
52  }
53 
54  // If we are just given a name, init the selector and return
55  if (!strchr(gSystem->BaseName(selfile), '.')) {
56  if (gDebug > 1)
57  Info("MakeSelector", "selector name '%s' does not contain a '.':"
58  " no file to check, it will be loaded from a library", selfile);
59  if (!(fSelector = TSelector::GetSelector(selfile))) {
60  Error("MakeSelector", "could not create a %s selector", selfile);
61  return -1;
62  }
63  // Done
64  return 0;
65  }
66 
67  if (((TProofLite*)fProof)->CopyMacroToCache(selfile, 1, &fSelector, TProof::kCp | TProof::kCpBin) < 0)
68  return -1;
69 
70  // Done
71  return 0;
72 }
73 
74 ////////////////////////////////////////////////////////////////////////////////
75 /// Process specified TDSet on PROOF.
76 /// This method is called on client and on the PROOF master.
77 /// The return value is -1 in case of an error and TSelector::GetStatus() in
78 /// in case of success.
79 
80 Long64_t TProofPlayerLite::Process(TDSet *dset, TSelector *selector,
81  Option_t *option, Long64_t nentries,
82  Long64_t first)
83 {
84  if (!selector) {
85  Error("Process", "selector object undefined");
86  return -1;
87  }
88 
89  // Define fSelector in Client
90  if (selector != fSelector) {
91  SafeDelete(fSelector);
92  fSelector = selector;
93  }
94 
95  fCreateSelObj = kFALSE;
96  Long64_t rc = Process(dset, selector->ClassName(), option, nentries, first);
97  fCreateSelObj = kTRUE;
98 
99  // Done
100  return rc;
101 }
102 
103 ////////////////////////////////////////////////////////////////////////////////
104 /// Process specified TDSet on PROOF.
105 /// This method is called on client and on the PROOF master.
106 /// The return value is -1 in case of error and TSelector::GetStatus() in
107 /// in case of success.
108 
109 Long64_t TProofPlayerLite::Process(TDSet *dset, const char *selector_file,
110  Option_t *option, Long64_t nentries,
111  Long64_t first)
112 {
113  PDB(kGlobal,1) Info("Process","Enter");
114  fDSet = dset;
115  fExitStatus = kFinished;
116 
117  if (!fProgressStatus) {
118  Error("Process", "No progress status");
119  return -1;
120  }
121  fProgressStatus->Reset();
122 
123  // delete fOutput;
124  if (!fOutput)
125  fOutput = new THashList;
126  else
127  fOutput->Clear();
128 
129  TPerfStats::Setup(fInput);
130  TPerfStats::Start(fInput, fOutput);
131 
132  TStopwatch elapsed;
133 
134  TMessage mesg(kPROOF_PROCESS);
135  TString fn(gSystem->BaseName(selector_file));
136 
137  // Parse option
138  Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);
139 
140  // Make sure that the temporary output list is empty
141  if (fOutputLists) {
142  fOutputLists->Delete();
143  delete fOutputLists;
144  fOutputLists = 0;
145  }
146 
147  if (!sync) {
148  gSystem->RedirectOutput(fProof->fLogFileName);
149  Printf(" ");
150  Info("Process","starting new query");
151  }
152 
153  if (fCreateSelObj) {
154  if (MakeSelector(selector_file) != 0) {
155  if (!sync)
156  gSystem->RedirectOutput(0);
157  return -1;
158  }
159  }
160 
161  fSelectorClass = fSelector->IsA();
162  // Add fSelector to inputlist if processing with object
163  TList *inputtmp = 0; // List of temporary input objects
164  if (!fCreateSelObj) {
165  // In any input list was set into the selector move it to the PROOF
166  // input list, because we do not want to stream the selector one
167  if (fSelector->GetInputList() && fSelector->GetInputList()->GetSize() > 0) {
168  TIter nxi(fSelector->GetInputList());
169  TObject *o = 0;
170  while ((o = nxi())) {
171  if (!fInput->FindObject(o)) {
172  fInput->Add(o);
173  if (!inputtmp) {
174  inputtmp = new TList;
175  inputtmp->SetOwner(kFALSE);
176  }
177  inputtmp->Add(o);
178  }
179  }
180  }
181  fInput->Add(fSelector);
182  }
183  // Set the input list for initialization
184  fSelector->SetInputList(fInput);
185  fSelector->SetOption(option);
186  if (fSelector->GetOutputList()) fSelector->GetOutputList()->Clear();
187 
188  PDB(kLoop,1) Info("Process","Call Begin(0)");
189  fSelector->Begin(0);
190 
191  // Send large input data objects, if any
192  gProof->SendInputDataFile();
193 
194  // Attach to the transient histogram with the assigned packets, if required
195  if (fInput->FindObject("PROOF_StatsHist") != 0) {
196  if (!(fProcPackets = (TH1 *) fOutput->FindObject("PROOF_ProcPcktHist"))) {
197  Warning("Process", "could not attach to histogram 'PROOF_ProcPcktHist'");
198  } else {
199  PDB(kLoop,1)
200  Info("Process", "attached to histogram 'PROOF_ProcPcktHist' to record"
201  " packets being processed");
202  }
203  }
204 
205  PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
206  TDSet *set = new TDSetProxy(dset->GetType(), dset->GetObjName(),
207  dset->GetDirectory());
208  if (dset->TestBit(TDSet::kEmpty))
209  set->SetBit(TDSet::kEmpty);
210  fProof->SetParameter("PROOF_MaxSlavesPerNode", (Long_t) 0);
211  if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", "TPacketizer") != 0) {
212  Error("Process", "cannot init the packetizer");
213  fExitStatus = kAborted;
214  return -1;
215  }
216  // reset start, this is now managed by the packetizer
217  first = 0;
218 
219  // Negative memlogfreq disable checks.
220  // If 0 is passed we try to have 100 messages about memory
221  // Otherwise we use the frequency passed.
222  Int_t mrc = -1;
223  Long64_t memlogfreq = -1, mlf;
224  if ((mrc = TProof::GetParameter(fProof->GetInputList(), "PROOF_MemLogFreq", mlf)) == 0) memlogfreq = mlf;
225  if (mrc != 0 && gSystem->Getenv("PROOF_MEMLOGFREQ")) {
226  TString clf(gSystem->Getenv("PROOF_MEMLOGFREQ"));
227  if (clf.IsDigit()) { memlogfreq = clf.Atoi(); mrc = 0; }
228  }
229  if (memlogfreq == 0) {
230  memlogfreq = fPacketizer->GetTotalEntries()/(fProof->GetParallel()*100);
231  if (memlogfreq <= 0) memlogfreq = 1;
232  }
233  if (mrc == 0) fProof->SetParameter("PROOF_MemLogFreq", memlogfreq);
234 
235  // Add the unique query tag as TNamed object to the input list
236  // so that it is available in TSelectors for monitoring
237  fProof->SetParameter("PROOF_QueryTag", fProof->GetName());
238  // ... and the sequential number
239  fProof->SetParameter("PROOF_QuerySeqNum", fProof->fSeqNum);
240 
241  if (!sync)
242  gSystem->RedirectOutput(0);
243 
244  TCleanup clean(this);
245  SetupFeedback();
246 
247  TString opt = option;
248 
249  // Workers will get the entry ranges from the packetizer
250  Long64_t num = (fProof->IsParallel()) ? -1 : nentries;
251  Long64_t fst = (fProof->IsParallel()) ? -1 : first;
252 
253  // Entry- or Event- list ?
254  TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
255  : (TEntryList *)0;
256  TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
257  : (TEventList *)0;
258  // Reset the merging progress information
259  fProof->ResetMergePrg();
260 
261  // Broadcast main message
262  PDB(kGlobal,1) Info("Process","Calling Broadcast");
263  if (fProcessMessage) delete fProcessMessage;
264  fProcessMessage = new TMessage(kPROOF_PROCESS);
265  mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
266  (*fProcessMessage) << set << fn << fInput << opt << num << fst << evl << sync << enl;
267  Int_t nb = fProof->Broadcast(mesg);
268  PDB(kGlobal,1) Info("Process", "Broadcast called: %d workers notified", nb);
269  fProof->fNotIdle += nb;
270 
271  // Redirect logs from master to special log frame
272  fProof->fRedirLog = kTRUE;
273 
274  if (!sync) {
275 
276  // Asynchronous query: just make sure that asynchronous input
277  // is enabled and return the prompt
278  PDB(kGlobal,1) Info("Process","Asynchronous processing:"
279  " activating CollectInputFrom");
280  fProof->Activate();
281 
282  // Return the query sequential number
283  return fProof->fSeqNum;
284 
285  } else {
286 
287  // Wait for processing
288  PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
289  fProof->Collect();
290 
291  // Restore prompt logging (Collect leaves things as they were
292  // at the time it was called)
293  fProof->fRedirLog = kFALSE;
294 
295  if (!TSelector::IsStandardDraw(fn))
296  HandleTimer(0); // force an update of final result
297  if (fPacketizer) {
298  fPacketizer->StopProcess(kFALSE, kTRUE);
299  // The progress timer will now stop itself at the next call
300  fPacketizer->SetBit(TVirtualPacketizer::kIsDone);
301  // Store process info
302  elapsed.Stop();
303  if (fQuery)
304  fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
305  fPacketizer->GetInitTime(),
306  elapsed.RealTime());
307  }
308  StopFeedback();
309 
310  Long64_t rc = -1;
311  if (GetExitStatus() != TProofPlayer::kAborted)
312  rc = Finalize(kFALSE, sync);
313 
314  // Remove temporary input objects, if any
315  if (inputtmp) {
316  TIter nxi(inputtmp);
317  TObject *o = 0;
318  while ((o = nxi())) fInput->Remove(o);
319  SafeDelete(inputtmp);
320  }
321 
322  // Done
323  return rc;
324  }
325 }
326 
327 ////////////////////////////////////////////////////////////////////////////////
328 /// Finalize a query.
329 /// Returns -1 in case error, 0 otherwise.
330 
331 Long64_t TProofPlayerLite::Finalize(Bool_t force, Bool_t sync)
332 {
333  if (fOutputLists == 0) {
334  if (force && fQuery)
335  return fProof->Finalize(Form("%s:%s", fQuery->GetTitle(),
336  fQuery->GetName()), force);
337  }
338 
339  Long64_t rv = 0;
340 
341  TPerfStats::Stop();
342 
343  if (!fQuery) {
344  Info("Finalize", "query is undefined!");
345  return -1;
346  }
347 
348  // Some objects (e.g. histos in autobin) may not have been merged yet
349  // do it now
350  MergeOutput();
351 
352  if (fExitStatus != kAborted) {
353 
354  if (!sync) {
355  // Reinit selector (with multi-sessioning we must do this until
356  // TSelector::GetSelector() is optimized to i) avoid reloading of an
357  // unchanged selector and ii) invalidate existing instances of
358  // reloaded selector)
359  if (ReinitSelector(fQuery) == -1) {
360  Info("Finalize", "problems reinitializing selector \"%s\"",
361  fQuery->GetSelecImp()->GetName());
362  return -1;
363  }
364  }
365 
366  // Some input parameters may be needed in Terminate
367  fSelector->SetInputList(fInput);
368 
369  TList *output = fSelector->GetOutputList();
370  if (output) {
371  TIter next(fOutput);
372  while(TObject* obj = next()) {
373  if (fProof->IsParallel() || DrawCanvas(obj) == 1)
374  // Either parallel or not a canvas or not able to display it:
375  // just add to the list
376  output->Add(obj);
377  }
378  } else {
379  Warning("Finalize", "undefined output list in the selector! Protocol error?");
380  }
381 
382  SetSelectorDataMembersFromOutputList();
383 
384  PDB(kLoop,1) Info("Finalize","Call Terminate()");
385  fOutput->Clear("nodelete");
386  // This is the end of merging
387  SetMerging(kFALSE);
388  // We measure the merge time
389  fProof->fQuerySTW.Reset();
390  // Call Terminate now
391  fSelector->Terminate();
392 
393  rv = fSelector->GetStatus();
394 
395  // copy the output list back and clean the selector's list
396  TIter it(output);
397  while(TObject* o = it()) {
398  fOutput->Add(o);
399  }
400 
401  // Save the output list in the current query, if any
402  if (fQuery) {
403  fQuery->SetOutputList(fOutput);
404  // Set in finalized state (cannot be done twice)
405  fQuery->SetFinalized();
406  } else {
407  Warning("Finalize","current TQueryResult object is undefined!");
408  }
409 
410  if (!fCreateSelObj) {
411  fInput->Remove(fSelector);
412  fOutput->Remove(fSelector);
413  if (output) output->Remove(fSelector);
414  fSelector = 0;
415  }
416 
417  // We have transferred copy of the output objects in TQueryResult,
418  // so now we can cleanup the selector, making sure that we do not
419  // touch the output objects
420  if (output) output->SetOwner(kFALSE);
421  SafeDelete(fSelector);
422 
423  // Delete fOutput (not needed anymore, cannot be finalized twice),
424  // making sure that the objects saved in TQueryResult are not deleted
425  fOutput->SetOwner(kFALSE);
426  SafeDelete(fOutput);
427  } else {
428 
429  // Cleanup
430  fOutput->SetOwner();
431  SafeDelete(fSelector);
432  if (!fCreateSelObj) fSelector = 0;
433  }
434 
435  PDB(kGlobal,1) Info("Finalize","exit");
436  return rv;
437 }
438 
439 ////////////////////////////////////////////////////////////////////////////////
440 /// Send feedback objects to client.
441 
442 Bool_t TProofPlayerLite::HandleTimer(TTimer *)
443 {
444  PDB(kFeedback,2)
445  Info("HandleTimer","Entry: %p", fFeedbackTimer);
446 
447  if (fFeedbackTimer == 0) return kFALSE; // timer already switched off
448 
449 
450  // process local feedback objects
451 
452  TList *fb = new TList;
453  fb->SetOwner();
454 
455  TIter next(fFeedback);
456  while( TObjString *name = (TObjString*) next() ) {
457  TObject *o = fOutput->FindObject(name->GetName());
458  if (o != 0) fb->Add(o->Clone());
459  }
460 
461  if (fb->GetSize() > 0)
462  StoreFeedback(this, fb); // adopts fb
463  else
464  delete fb;
465 
466  if (fFeedbackLists == 0) {
467  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE); // maybe next time
468  return kFALSE;
469  }
470 
471  fb = MergeFeedback();
472 
473  Feedback(fb);
474  fb->SetOwner();
475  delete fb;
476 
477  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
478 
479  return kFALSE; // ignored?
480 }
481 
482 ////////////////////////////////////////////////////////////////////////////////
483 /// Setup reporting of feedback objects.
484 
485 void TProofPlayerLite::SetupFeedback()
486 {
487  fFeedback = (TList*) fInput->FindObject("FeedbackList");
488 
489  if (fFeedback) {
490  PDB(kFeedback,1)
491  Info("SetupFeedback","\"FeedbackList\" found: %d objects", fFeedback->GetSize());
492  } else {
493  PDB(kFeedback,1)
494  Info("SetupFeedback","\"FeedbackList\" NOT found");
495  }
496 
497  if (fFeedback == 0 || fFeedback->GetSize() == 0) return;
498 
499  // OK, feedback was requested, setup the timer
500  SafeDelete(fFeedbackTimer);
501  fFeedbackPeriod = 2000;
502  TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
503  fFeedbackTimer = new TTimer;
504  fFeedbackTimer->SetObject(this);
505  fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
506 }
507 
508 ////////////////////////////////////////////////////////////////////////////////
509 /// Store feedback results from the specified slave.
510 
511 void TProofPlayerLite::StoreFeedback(TObject *slave, TList *out)
512 {
513  PDB(kFeedback,1)
514  Info("StoreFeedback","Enter (%p,%p,%d)", fFeedbackLists, out, (out ? out->GetSize() : -1));
515 
516  if ( out == 0 ) {
517  PDB(kFeedback,1)
518  Info("StoreFeedback","Leave (empty)");
519  return;
520  }
521 
522  if (fFeedbackLists == 0) {
523  PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
524  fFeedbackLists = new TList;
525  fFeedbackLists->SetOwner();
526  }
527 
528  TIter next(out);
529  out->SetOwner(kFALSE); // take ownership of the contents
530 
531  TObject *obj;
532  while( (obj = next()) ) {
533  PDB(kFeedback,2)
534  Info("StoreFeedback","Find '%s'", obj->GetName() );
535 
536  TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
537  if ( map == 0 ) {
538  PDB(kFeedback,2)
539  Info("StoreFeedback", "map for '%s' not found (creating)", obj->GetName());
540  // map must not be owner (ownership is with regards to the keys (only))
541  map = new TMap;
542  map->SetName(obj->GetName());
543  fFeedbackLists->Add(map);
544  } else {
545  PDB(kFeedback,2)
546  Info("StoreFeedback","removing previous value");
547  if (map->GetValue(slave))
548  delete map->GetValue(slave);
549  map->Remove(slave);
550  }
551  map->Add(slave, obj);
552  }
553 
554  delete out;
555  PDB(kFeedback,1)
556  Info("StoreFeedback","Leave");
557 }