Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TTreeProcessorMP.cxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 // Modified: G Ganis Jan 2017
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 #include "TEnv.h"
15 #include "TMPWorkerTree.h"
16 
17 //////////////////////////////////////////////////////////////////////////
18 ///
19 /// \class ROOT::TTreeProcessorMP
20 /// \ingroup Parallelism
21 /// \brief This class provides an interface to process a TTree dataset
22 /// in parallel with multi-process technology
23 ///
24 /// ###ROOT::TTreeProcessorMP::Process
25 /// The possible usages of the Process method are the following:\n
26 /// * Process(<dataset>, F func, const std::string& treeName, ULong64_t nToProcess):
27 /// func is executed nToProcess times with argument a TTreeReader&, initialized for
28 /// the TTree with name treeName, from the dataset <dataset>. The dataset can be
29 /// expressed as:
30 /// const std::string& fileName -> single file name
31 /// const std::vector<std::string>& fileNames -> vector of file names
32 /// TFileCollection& files -> collection of TFileInfo objects
33 /// TChain& files -> TChain with the file paths
34 /// TTree& tree -> Reference to an existing TTree object
35 ///
36 /// For legacy, the following signature is also supported:
37 /// * Process(<dataset>, TSelector& selector, const std::string& treeName, ULong64_t nToProcess):
38 /// where selector is a TSelector derived class describing the analysis and the other arguments
39 /// have the same meaning as above.
40 ///
41 /// For either set of signatures, the processing function is executed as many times as
42 /// needed by a pool of fNWorkers workers; the number of workers can be passed to the constructor
43 /// or set via SetNWorkers. It defaults to the number of cores.\n
44 /// A collection containing the result of each execution is returned.\n
45 /// **Note:** the user is responsible for the deletion of any object that might
46 /// be created upon execution of func, returned objects included: ROOT::TTreeProcessorMP never
47 /// deletes what it returns, it simply forgets it.\n
48 /// **Note:** the usage of ROOT::TTreeProcessorMP::Process is advisable only when the task to be
49 /// executed takes more than a few seconds, otherwise the overhead introduced
50 /// by Process will outweigh the benefits of parallel execution on most machines.
51 ///
52 /// \param func
53 /// \parblock
54 /// a lambda expression, an std::function, a loaded macro, a
55 /// functor class or a function that takes zero arguments (for the first signature)
56 /// or one (for the second signature).
57 /// \endparblock
58 /// \param args
59 /// \parblock
60 /// a standard container (vector, list, deque), an initializer list
61 /// or a pointer to a TCollection (TList*, TObjArray*, ...).
62 /// \endparblock
63 /// **Note:** the version of ROOT::TTreeProcessorMP::Process that takes a TFileCollection& as argument incurs
64 /// the overhead of copying data from the TCollection to an STL container. Only
65 /// use it when absolutely necessary.\n
66 /// **Note:** in cases where the function to be executed takes more than
67 /// zero/one argument but all are fixed except zero/one, the function can be wrapped
68 /// in a lambda or via std::bind to give it the right signature.\n
69 /// **Note:** the user should take care of initializing random seeds differently in each
70 /// process (e.g. using the process id in the seed). Otherwise several parallel executions
71 /// might generate the same sequence of pseudo-random numbers.
72 ///
73 /// #### Return value:
74 /// Methods taking 'F func' return the return type of F.
75 /// Methods taking a TSelector return a 'TList *' with the selector output list; the output list
76 /// content is owned by the caller.
77 ///
78 /// #### Examples:
79 ///
80 /// See tutorials/multicore/mp102_readNtuplesFillHistosAndFit.C and tutorials/multicore/mp103_processSelector.C .
81 ///
82 //////////////////////////////////////////////////////////////////////////
83 
84 namespace ROOT {
85 //////////////////////////////////////////////////////////////////////////
86 /// Class constructor.
87 /// nWorkers is the number of times this ROOT session will be forked, i.e.
88 /// the number of workers that will be spawned.
89 TTreeProcessorMP::TTreeProcessorMP(UInt_t nWorkers) : TMPClient(nWorkers)
90 {
91  Reset();
92 }
93 
94 //////////////////////////////////////////////////////////////////////////
95 /// TSelector-based tree processing: memory resident tree
96 TList *TTreeProcessorMP::Process(TTree &tree, TSelector &selector, TEntryList &entries, ULong64_t nToProcess,
97  ULong64_t jFirst)
98 {
99 
100  // Warn for yet unimplemented functionality
101  if (jFirst > 0) {
102  Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
103  jFirst = 0;
104  }
105 
106  //prepare environment
107  Reset();
108  UInt_t nWorkers = GetNWorkers();
109  selector.Begin(nullptr);
110 
111  // Check the entry list
112  TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
113  //fork
114  TMPWorkerTreeSel worker(selector, &tree, elist, nWorkers, nToProcess / nWorkers, jFirst);
115  bool ok = Fork(worker);
116  if(!ok) {
117  Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation");
118  return nullptr;
119  }
120 
121  //divide entries equally between workers
122  fTaskType = ETask::kProcByRange;
123 
124  //tell workers to start processing entries
125  fNToProcess = nWorkers; //this is the total number of ranges that will be processed by all workers cumulatively
126  std::vector<UInt_t> args(nWorkers);
127  std::iota(args.begin(), args.end(), 0);
128  fNProcessed = Broadcast(MPCode::kProcTree, args);
129  if (fNProcessed < nWorkers)
130  Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
131  " Some entries might not be processed.");
132 
133  //collect results, distribute new tasks
134  std::vector<TObject*> outLists;
135  Collect(outLists);
136 
137  // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
138  FixLists(outLists);
139 
140  PoolUtils::ReduceObjects<TObject *> redfunc;
141  auto outList = static_cast<TList*>(redfunc(outLists));
142 
143  // Import the resulting list in the selector
144  selector.ImportOutput(outList);
145  // outList is empty after this: just delete it
146  delete outList;
147 
148  // Finalize the selector tasks
149  selector.Terminate();
150 
151  //clean-up and return
152  ReapWorkers();
153  fTaskType = ETask::kNoTask;
154  return selector.GetOutputList();
155 }
156 
157 //////////////////////////////////////////////////////////////////////////
158 /// TSelector-based tree processing: dataset as a vector of files
159 TList *TTreeProcessorMP::Process(const std::vector<std::string> &fileNames, TSelector &selector, TEntryList &entries,
160  const std::string &treeName, ULong64_t nToProcess, ULong64_t jFirst)
161 {
162 
163  // Warn for yet unimplemented functionality
164  if (jFirst > 0) {
165  Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
166  jFirst = 0;
167  }
168 
169  //prepare environment
170  Reset();
171  UInt_t nWorkers = GetNWorkers();
172  selector.Begin(nullptr);
173 
174  // Check the entry list
175  TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
176  //fork
177  TMPWorkerTreeSel worker(selector, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
178  bool ok = Fork(worker);
179  if (!ok) {
180  Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation");
181  return nullptr;
182  }
183 
184  Int_t procByFile = gEnv->GetValue("MultiProc.TestProcByFile", 0);
185 
186  if (procByFile) {
187  if (fileNames.size() < nWorkers) {
188  // TTree entry granularity: for each file, we divide entries equally between workers
189  fTaskType = ETask::kProcByRange;
190  // Tell workers to start processing entries
191  fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
192  std::vector<UInt_t> args(nWorkers);
193  std::iota(args.begin(), args.end(), 0);
194  fNProcessed = Broadcast(MPCode::kProcRange, args);
195  if (fNProcessed < nWorkers)
196  Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
197  " Some entries might not be processed");
198  } else {
199  // File granularity: each worker processes one whole file as a single task
200  fTaskType = ETask::kProcByFile;
201  fNToProcess = fileNames.size();
202  std::vector<UInt_t> args(nWorkers);
203  std::iota(args.begin(), args.end(), 0);
204  fNProcessed = Broadcast(MPCode::kProcFile, args);
205  if (fNProcessed < nWorkers)
206  Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
207  " Some entries might not be processed.");
208  }
209  } else {
210  // TTree entry granularity: for each file, we divide entries equally between workers
211  fTaskType = ETask::kProcByRange;
212  // Tell workers to start processing entries
213  fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
214  std::vector<UInt_t> args(nWorkers);
215  std::iota(args.begin(), args.end(), 0);
216  fNProcessed = Broadcast(MPCode::kProcRange, args);
217  if (fNProcessed < nWorkers)
218  Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers."
219  " Some entries might not be processed.");
220  }
221 
222  // collect results, distribute new tasks
223  std::vector<TObject*> outLists;
224  Collect(outLists);
225 
226  // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
227  FixLists(outLists);
228 
229  PoolUtils::ReduceObjects<TObject *> redfunc;
230  auto outList = static_cast<TList*>(redfunc(outLists));
231 
232  // Import the resulting list in the selector
233  selector.ImportOutput(outList);
234  // outList is empty after this: just delete it
235  delete outList;
236 
237  // Finalize the selector tasks
238  selector.Terminate();
239 
240  //clean-up and return
241  ReapWorkers();
242  fTaskType = ETask::kNoTask;
243 
244  return selector.GetOutputList();
245 }
246 
247 //////////////////////////////////////////////////////////////////////////
248 /// TSelector-based tree processing: dataset as a TFileCollection
249 TList *TTreeProcessorMP::Process(TFileCollection &files, TSelector &selector, TEntryList &entries,
250  const std::string &treeName, ULong64_t nToProcess, ULong64_t firstEntry)
251 {
252  std::vector<std::string> fileNames(files.GetNFiles());
253  UInt_t count = 0;
254  for(auto f : *static_cast<THashList*>(files.GetList()))
255  fileNames[count++] = static_cast<TFileInfo*>(f)->GetCurrentUrl()->GetUrl();
256 
257  TList *rl = Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
258  return rl;
259 }
260 
261 //////////////////////////////////////////////////////////////////////////
262 /// TSelector-based tree processing: dataset as a TChain
263 TList *TTreeProcessorMP::Process(TChain &files, TSelector &selector, TEntryList &entries, const std::string &treeName,
264  ULong64_t nToProcess, ULong64_t firstEntry)
265 {
266  TObjArray* filelist = files.GetListOfFiles();
267  std::vector<std::string> fileNames(filelist->GetEntries());
268  UInt_t count = 0;
269  for(auto f : *filelist)
270  fileNames[count++] = f->GetTitle();
271 
272  return Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
273 }
274 
275 //////////////////////////////////////////////////////////////////////////
276 /// TSelector-based tree processing: dataset as a single file
277 TList *TTreeProcessorMP::Process(const std::string &fileName, TSelector &selector, TEntryList &entries,
278  const std::string &treeName, ULong64_t nToProcess, ULong64_t firstEntry)
279 {
280  std::vector<std::string> singleFileName(1, fileName);
281  return Process(singleFileName, selector, entries, treeName, nToProcess, firstEntry);
282 }
283 
284 ///
285 /// No TEntryList versions of selector processor
286 ///
287 
288 TList *TTreeProcessorMP::Process(const std::vector<std::string> &fileNames, TSelector &selector,
289  const std::string &treeName, ULong64_t nToProcess, ULong64_t jFirst)
290 {
291  TEntryList noelist;
292  return Process(fileNames, selector, noelist, treeName, nToProcess, jFirst);
293 }
294 
295 TList *TTreeProcessorMP::Process(const std::string &fileName, TSelector &selector, const std::string &treeName,
296  ULong64_t nToProcess, ULong64_t jFirst)
297 {
298  TEntryList noelist;
299  return Process(fileName, selector, noelist, treeName, nToProcess, jFirst);
300 }
301 
302 TList *TTreeProcessorMP::Process(TFileCollection &files, TSelector &selector, const std::string &treeName,
303  ULong64_t nToProcess, ULong64_t jFirst)
304 {
305  TEntryList noelist;
306  return Process(files, selector, noelist, treeName, nToProcess, jFirst);
307 }
308 
309 TList *TTreeProcessorMP::Process(TChain &files, TSelector &selector, const std::string &treeName, ULong64_t nToProcess,
310  ULong64_t jFirst)
311 {
312  TEntryList noelist;
313  return Process(files, selector, noelist, treeName, nToProcess, jFirst);
314 }
315 
316 TList *TTreeProcessorMP::Process(TTree &tree, TSelector &selector, ULong64_t nToProcess, ULong64_t jFirst)
317 {
318  TEntryList noelist;
319  return Process(tree, selector, noelist, nToProcess, jFirst);
320 }
321 
322 /// Fix list of lists before merging (to avoid errors about duplicated objects)
323 void TTreeProcessorMP::FixLists(std::vector<TObject*> &lists) {
324 
325  // The first element must be a TList instead of a TSelector List, to avoid duplicate problems with merging
326  TList *firstlist = new TList;
327  TList *oldlist = (TList *) lists[0];
328  TIter nxo(oldlist);
329  TObject *o = 0;
330  while ((o = nxo())) { firstlist->Add(o); }
331  oldlist->SetOwner(kFALSE);
332  lists.erase(lists.begin());
333  lists.insert(lists.begin(), firstlist);
334  delete oldlist;
335 }
336 
337 //////////////////////////////////////////////////////////////////////////
338 /// Reset TTreeProcessorMP's state.
339 void TTreeProcessorMP::Reset()
340 {
341  fNProcessed = 0;
342  fNToProcess = 0;
343  fTaskType = ETask::kNoTask;
344 }
345 
346 //////////////////////////////////////////////////////////////////////////
347 /// Reply to a worker who is idle.
348 /// If still events to process, tell the worker. Otherwise
349 /// ask for a result
350 void TTreeProcessorMP::ReplyToIdle(TSocket *s)
351 {
352  if (fNProcessed < fNToProcess) {
353  //we are executing a "greedy worker" task
354  if (fTaskType == ETask::kProcByRange)
355  MPSend(s, MPCode::kProcRange, fNProcessed);
356  else if (fTaskType == ETask::kProcByFile)
357  MPSend(s, MPCode::kProcFile, fNProcessed);
358  ++fNProcessed;
359  } else
360  MPSend(s, MPCode::kSendResult);
361 }
362 
363 } // namespace ROOT