Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TMPWorkerTree.cxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 // Modified: G Ganis Jan 2017
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 #include "MPCode.h"
14 #include "MPSendRecv.h"
15 #include "TError.h"
16 #include "TMPWorkerTree.h"
17 #include "TSystem.h"
18 #include "TEnv.h"
19 #include <string>
20 
21 //////////////////////////////////////////////////////////////////////////
22 ///
23 /// \class TMPWorkerTree
24 ///
/// This class works in conjunction with TTreeProcessorMP, reacting to messages
26 /// received from it as specified by the Notify and HandleInput methods.
27 ///
28 /// \class TMPWorkerTreeFunc
29 ///
/// Templated derivation of TMPWorkerTree handling generic function tree processing.
31 ///
32 /// \class TMPWorkerTreeSel
33 ///
/// Derivation of TMPWorkerTree handling selector-based tree processing.
35 ///
36 //////////////////////////////////////////////////////////////////////////
37 
38 //////////////////////////////////////////////////////////////////////////
39 /// Class constructors.
40 /// Note that this does not set variables like fPid or fS (worker's socket).\n
41 /// These operations are handled by the Init method, which is called after
42 /// forking.\n
43 /// This separation is in place because the instantiation of a worker
44 /// must be done once _before_ forking, while the initialization of the
45 /// members must be done _after_ forking by each of the children processes.
46 TMPWorkerTree::TMPWorkerTree()
47  : TMPWorker(), fFileNames(), fTreeName(), fTree(nullptr), fFile(nullptr), fEntryList(nullptr), fFirstEntry(0),
48  fTreeCache(0), fTreeCacheIsLearning(kFALSE), fUseTreeCache(kTRUE), fCacheSize(-1)
49 {
50  Setup();
51 }
52 
53 TMPWorkerTree::TMPWorkerTree(const std::vector<std::string> &fileNames, TEntryList *entries,
54  const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
55  : TMPWorker(nWorkers, maxEntries), fFileNames(fileNames), fTreeName(treeName), fTree(nullptr), fFile(nullptr),
56  fEntryList(entries), fFirstEntry(firstEntry), fTreeCache(0), fTreeCacheIsLearning(kFALSE), fUseTreeCache(kTRUE),
57  fCacheSize(-1)
58 {
59  Setup();
60 }
61 
62 TMPWorkerTree::TMPWorkerTree(TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
63  ULong64_t firstEntry)
64  : TMPWorker(nWorkers, maxEntries), fTree(tree), fFile(nullptr), fEntryList(entries), fFirstEntry(firstEntry),
65  fTreeCache(0), fTreeCacheIsLearning(kFALSE), fUseTreeCache(kTRUE), fCacheSize(-1)
66 {
67  Setup();
68 }
69 
70 TMPWorkerTree::~TMPWorkerTree()
71 {
72  // Properly close the open file, if any
73  CloseFile();
74 }
75 
76 //////////////////////////////////////////////////////////////////////////
77 /// Auxilliary method for common initializations
78 void TMPWorkerTree::Setup()
79 {
80  Int_t uc = gEnv->GetValue("MultiProc.UseTreeCache", 1);
81  if (uc != 1) fUseTreeCache = kFALSE;
82  fCacheSize = gEnv->GetValue("MultiProc.CacheSize", -1);
83 }
84 
85 //////////////////////////////////////////////////////////////////////////
86 /// Handle file closing.
87 
88 void TMPWorkerTree::CloseFile()
89 {
90  // Avoid destroying the cache; must be placed before deleting the trees
91  if (fFile) {
92  if (fTree) fFile->SetCacheRead(0, fTree);
93  delete fFile ;
94  fFile = 0;
95  }
96 }
97 
98 //////////////////////////////////////////////////////////////////////////
99 /// Handle file opening.
100 
101 TFile *TMPWorkerTree::OpenFile(const std::string& fileName)
102 {
103 
104  TFile *fp = TFile::Open(fileName.c_str());
105  if (fp == nullptr || fp->IsZombie()) {
106  std::stringstream ss;
107  ss << "could not open file " << fileName;
108  std::string errmsg = ss.str();
109  SendError(errmsg, MPCode::kProcError);
110  return nullptr;
111  }
112 
113  return fp;
114 }
115 
116 //////////////////////////////////////////////////////////////////////////
117 /// Retrieve a tree from an open file.
118 
119 TTree *TMPWorkerTree::RetrieveTree(TFile *fp)
120 {
121  //retrieve the TTree with the specified name from file
122  //we are not the owner of the TTree object, the file is!
123  TTree *tree = nullptr;
124  if(fTreeName == "") {
125  // retrieve the first TTree
126  // (re-adapted from TEventIter.cxx)
127  if (fp->GetListOfKeys()) {
128  for(auto k : *fp->GetListOfKeys()) {
129  TKey *key = static_cast<TKey*>(k);
130  if (!strcmp(key->GetClassName(), "TTree") || !strcmp(key->GetClassName(), "TNtuple"))
131  tree = static_cast<TTree*>(fp->Get(key->GetName()));
132  }
133  }
134  } else {
135  tree = static_cast<TTree*>(fp->Get(fTreeName.c_str()));
136  }
137  if (tree == nullptr) {
138  std::stringstream ss;
139  ss << "cannot find tree with name " << fTreeName << " in file " << fp->GetName();
140  std::string errmsg = ss.str();
141  SendError(errmsg, MPCode::kProcError);
142  return nullptr;
143  }
144 
145  return tree;
146 }
147 
148 //////////////////////////////////////////////////////////////////////////
149 /// Tree cache handling
150 
151 void TMPWorkerTree::SetupTreeCache(TTree *tree)
152 {
153  if (fUseTreeCache) {
154  TFile *curfile = tree->GetCurrentFile();
155  if (curfile) {
156  if (!fTreeCache) {
157  tree->SetCacheSize(fCacheSize);
158  fTreeCache = (TTreeCache *)curfile->GetCacheRead(tree);
159  if (fCacheSize < 0) fCacheSize = tree->GetCacheSize();
160  } else {
161  fTreeCache->UpdateBranches(tree);
162  fTreeCache->ResetCache();
163  curfile->SetCacheRead(fTreeCache, tree);
164  }
165  if (fTreeCache) {
166  fTreeCacheIsLearning = fTreeCache->IsLearning();
167  }
168  } else {
169  Warning("SetupTreeCache", "default tree does not have a file attached: corruption? Tree cache untouched");
170  }
171  } else {
172  // Disable the cache
173  tree->SetCacheSize(0);
174  }
175 }
176 
177 //////////////////////////////////////////////////////////////////////////
178 /// Init overload definign max entries
179 
180 void TMPWorkerTree::Init(Int_t fd, UInt_t workerN)
181 {
182 
183  TMPWorker::Init(fd, workerN);
184  fMaxNEntries = EvalMaxEntries(fMaxNEntries);
185 }
186 
187 //////////////////////////////////////////////////////////////////////////
188 /// Max entries evaluation
189 
190 ULong64_t TMPWorkerTree::EvalMaxEntries(ULong64_t maxEntries)
191 {
192  // E.g.: when dividing 10 entries between 3 workers, the first
193  // two will process 10/3 == 3 entries, the last one will process
194  // 10 - 2*(10/3) == 4 entries.
195  if(GetNWorker() < fNWorkers-1)
196  return maxEntries/fNWorkers;
197  else
198  return maxEntries - (fNWorkers-1)*(maxEntries/fNWorkers);
199 }
200 
201 //////////////////////////////////////////////////////////////////////////
202 /// Generic input handling
203 
204 void TMPWorkerTree::HandleInput(MPCodeBufPair& msg)
205 {
206  UInt_t code = msg.first;
207 
208  if (code == MPCode::kProcRange
209  || code == MPCode::kProcFile
210  || code == MPCode::kProcTree) {
211  //execute fProcFunc on a file or a range of entries in a file
212  Process(code, msg);
213  } else if (code == MPCode::kSendResult) {
214  //send back result
215  SendResult();
216  } else {
217  //unknown code received
218  std::string reply = "S" + std::to_string(GetNWorker());
219  reply += ": unknown code received: " + std::to_string(code);
220  MPSend(GetSocket(), MPCode::kError, reply.c_str());
221  }
222 }
223 
224 
225 
226 //////////////////////////////////////////////////////////////////////////
227 /// Selector processing SendResult and Process overload
228 
229 void TMPWorkerTreeSel::SendResult()
230 {
231  //send back result
232  fSelector.SlaveTerminate();
233  MPSend(GetSocket(), MPCode::kProcResult, fSelector.GetOutputList());
234 }
235 
236 /// Selector specialization
237 void TMPWorkerTreeSel::Process(UInt_t code, MPCodeBufPair &msg)
238 {
239  //evaluate the index of the file to process in fFileNames
240  //(we actually don't need the parameter if code == kProcTree)
241 
242  Long64_t start = 0;
243  Long64_t finish = 0;
244  TEntryList *enl = 0;
245  std::string errmsg;
246  if (LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
247  SendError(errmsg);
248  return;
249  }
250 
251  if (fCallBegin) {
252  fSelector.SlaveBegin(nullptr);
253  fCallBegin = false;
254  }
255 
256  fSelector.Init(fTree);
257  fSelector.Notify();
258  for (Long64_t entry = start; entry < finish; ++entry) {
259  Long64_t e = (enl) ? enl->GetEntry(entry) : entry;
260  fSelector.Process(e);
261  }
262 
263  // update the number of processed entries
264  fProcessedEntries += finish - start;
265 
266  MPSend(GetSocket(), MPCode::kIdling);
267 
268  return;
269 }
270 
271 /// Load the requierd tree and evaluate the processing range
272 
273 Int_t TMPWorkerTree::LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl,
274  std::string &errmsg)
275 {
276  // evaluate the index of the file to process in fFileNames
277  //(we actually don't need the parameter if code == kProcTree)
278 
279  start = 0;
280  finish = 0;
281  errmsg = "";
282 
283  UInt_t fileN = 0;
284  UInt_t nProcessed = 0;
285  Bool_t setupcache = true;
286 
287  std::string mgroot = "[S" + std::to_string(GetNWorker()) + "]: ";
288 
289  TTree *tree = 0;
290  if (code == MPCode::kProcTree) {
291 
292  mgroot += "MPCode::kProcTree: ";
293 
294  // The tree must be defined at this level
295  if(fTree == nullptr) {
296  errmsg = mgroot + std::string("tree undefined!");
297  return -1;
298  }
299 
300  //retrieve the total number of entries ranges processed so far by TPool
301  nProcessed = ReadBuffer<UInt_t>(msg.second.get());
302 
303  //create entries range
304  //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
305  //and this worker must take the rangeN-th range
306  Long64_t nEntries = fTree->GetEntries();
307  UInt_t nBunch = nEntries / fNWorkers;
308  UInt_t rangeN = nProcessed % fNWorkers;
309  start = rangeN * nBunch;
310  if (rangeN < (fNWorkers - 1)) {
311  finish = (rangeN+1)*nBunch;
312  } else {
313  finish = nEntries;
314  }
315 
316  //process tree
317  tree = fTree;
318  CloseFile(); // May not be needed
319  if (fTree->GetCurrentFile()) {
320  // We need to reopen the file locally (TODO: to understand and fix this)
321  if ((fFile = TFile::Open(fTree->GetCurrentFile()->GetName())) && !fFile->IsZombie()) {
322  if (!(tree = (TTree *) fFile->Get(fTree->GetName()))) {
323  errmsg = mgroot + std::string("unable to retrieve tree from open file ") +
324  std::string(fTree->GetCurrentFile()->GetName());
325  delete fFile;
326  return -1;
327  }
328  fTree = tree;
329  } else {
330  //errors are handled inside OpenFile
331  errmsg = mgroot + std::string("unable to open file ") + std::string(fTree->GetCurrentFile()->GetName());
332  if (fFile && fFile->IsZombie()) delete fFile;
333  return -1;
334  }
335  }
336 
337  } else {
338 
339  if (code == MPCode::kProcRange) {
340  mgroot += "MPCode::kProcRange: ";
341  //retrieve the total number of entries ranges processed so far by TPool
342  nProcessed = ReadBuffer<UInt_t>(msg.second.get());
343  //evaluate the file and the entries range to process
344  fileN = nProcessed / fNWorkers;
345  } else if (code == MPCode::kProcFile) {
346  mgroot += "MPCode::kProcFile: ";
347  //evaluate the file and the entries range to process
348  fileN = ReadBuffer<UInt_t>(msg.second.get());
349  } else {
350  errmsg += "MPCode undefined!";
351  return -1;
352  }
353 
354  // Open the file if required
355  if (fFile && strcmp(fFileNames[fileN].c_str(), fFile->GetName())) CloseFile();
356  if (!fFile) {
357  fFile = OpenFile(fFileNames[fileN]);
358  if (fFile == nullptr) {
359  // errors are handled inside OpenFile
360  errmsg = mgroot + std::string("unable to open file ") + fFileNames[fileN];
361  return -1;
362  }
363  }
364 
365  //retrieve the TTree with the specified name from file
366  //we are not the owner of the TTree object, the file is!
367  tree = RetrieveTree(fFile);
368  if (tree == nullptr) {
369  //errors are handled inside RetrieveTree
370  errmsg = mgroot + std::string("unable to retrieve tree from open file ") + fFileNames[fileN];
371  return -1;
372  }
373 
374  // Prepare to setup the cache, if required
375  setupcache = (tree != fTree) ? true : false;
376 
377  // Store as reference
378  fTree = tree;
379 
380  //create entries range
381  if (code == MPCode::kProcRange) {
382  //example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
383  //and this worker must take the rangeN-th range
384  Long64_t nEntries = tree->GetEntries();
385  UInt_t nBunch = nEntries / fNWorkers;
386  if(nEntries % fNWorkers) nBunch++;
387  UInt_t rangeN = nProcessed % fNWorkers;
388  start = rangeN * nBunch;
389  if(rangeN < (fNWorkers-1))
390  finish = (rangeN+1)*nBunch;
391  else
392  finish = nEntries;
393  } else {
394  start = 0;
395  finish = tree->GetEntries();
396  }
397  }
398 
399  // Setup the cache, if required
400  if (setupcache) SetupTreeCache(fTree);
401 
402  // Get the entrylist, if required
403  if (fEntryList && enl) {
404  if ((*enl = fEntryList->GetEntryList(fTree->GetName(), TUrl(fFile->GetName()).GetFile()))) {
405  // create entries range
406  if (code == MPCode::kProcRange) {
407  // example: for 21 entries, 4 workers we want ranges 0-5, 5-10, 10-15, 15-21
408  // and this worker must take the rangeN-th range
409  ULong64_t nEntries = (*enl)->GetN();
410  UInt_t nBunch = nEntries / fNWorkers;
411  if (nEntries % fNWorkers) nBunch++;
412  UInt_t rangeN = nProcessed % fNWorkers;
413  start = rangeN * nBunch;
414  if (rangeN < (fNWorkers - 1))
415  finish = (rangeN + 1) * nBunch;
416  else
417  finish = nEntries;
418  } else {
419  start = 0;
420  finish = (*enl)->GetN();
421  }
422  } else {
423  Warning("LoadTree", "failed to get entry list for: %s %s", fTree->GetName(), TUrl(fFile->GetName()).GetFile());
424  }
425  }
426 
427  //check if we are going to reach the max of entries
428  //change finish accordingly
429  if (fMaxNEntries)
430  if (fProcessedEntries + finish - start > fMaxNEntries)
431  finish = start + fMaxNEntries - fProcessedEntries;
432 
433  if (gDebug > 0 && fFile)
434  Info("LoadTree", "%s %d %d file: %s %lld %lld", mgroot.c_str(), nProcessed, fileN, fFile->GetName(), start,
435  finish);
436 
437  return 0;
438 }