Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TTreeProcessorMP.hxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 // Modified: G Ganis Jan 2017
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 #ifndef ROOT_TTreeProcessorMP
14 #define ROOT_TTreeProcessorMP
15 
16 #include "MPCode.h"
17 #include "MPSendRecv.h"
18 #include "PoolUtils.h"
19 #include "TChain.h"
20 #include "TChainElement.h"
21 #include "TError.h"
22 #include "TFileCollection.h"
23 #include "TFileInfo.h"
24 #include "THashList.h"
25 #include "TMPClient.h"
26 #include "TMPWorkerTree.h"
27 #include "TSelector.h"
28 #include "TTreeReader.h"
29 #include <algorithm> //std::generate
30 #include <numeric> //std::iota
31 #include <string>
32 #include <type_traits> //std::result_of, std::enable_if
33 #include <functional> //std::reference_wrapper
34 #include <vector>
35 
36 namespace ROOT {
37 
38 class TTreeProcessorMP : private TMPClient {
39 public:
40  explicit TTreeProcessorMP(unsigned nWorkers = 0); //default number of workers is the number of processors
41  ~TTreeProcessorMP() = default;
42  //it doesn't make sense for a TTreeProcessorMP to be copied
43  TTreeProcessorMP(const TTreeProcessorMP &) = delete;
44  TTreeProcessorMP &operator=(const TTreeProcessorMP &) = delete;
45 
46  /// \brief Process a TTree dataset with a functor
47  /// \tparam F functor returning a pointer to TObject or inheriting classes and
48  /// taking a TTreeReader& (both enforced at compile-time)
49  ///
50  /// Dataset definition:
51  /// \param[in] fileNames vector of strings with the paths of the files with the TTree to process
52  /// \param[in] fileName string with the path of the files with the TTree to process
53  /// \param[in] collection TFileCollection with the files with the TTree to process
54  /// \param[in] chain TChain with the files with the TTree to process
55  /// \param[in] tree TTree to process
56  ///
57  /// \param[in] entries TEntryList to filter the dataset
58  /// \param[in] treeName Name of the TTree to process
59  /// \param[in] nToProcess Number of entries to process (0 means all)
60  /// \param[in] jFirst First entry to process (0 means the first of the first file)
61  ///
62  template<class F> auto Process(const std::vector<std::string>& fileNames, F procFunc, TEntryList &entries,
63  const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
64  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
65  template<class F> auto Process(const std::string& fileName, F procFunc, TEntryList &entries,
66  const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
67  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
68  template<class F> auto Process(TFileCollection& collection, F procFunc, TEntryList &entries,
69  const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
70  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
71  template<class F> auto Process(TChain& chain, F procFunc, TEntryList &entries,
72  const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
73  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
74  template<class F> auto Process(TTree& tree, F procFunc, TEntryList &entries,
75  ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
76  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
77 
78  /// \brief Process a TTree dataset with a functor: version without entry list
79  /// \tparam F functor returning a pointer to TObject or inheriting classes and
80  /// taking a TTreeReader& (both enforced at compile-time)
81  ///
82  /// Dataset definition:
83  /// \param[in] fileNames vector of strings with the paths of the files with the TTree to process
84  /// \param[in] fileName string with the path of the files with the TTree to process
85  /// \param[in] collection TFileCollection with the files with the TTree to process
86  /// \param[in] chain TChain with the files with the TTree to process
87  /// \param[in] tree TTree to process
88  ///
89  /// \param[in] treeName Name of the TTree to process
90  /// \param[in] nToProcess Number of entries to process (0 means all)
91  /// \param[in] jFirst First entry to process (0 means the first of the first file)
92  ///
93  template<class F> auto Process(const std::vector<std::string>& fileNames, F procFunc,
94  const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
95  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
96  template<class F> auto Process(const std::string& fileName, F procFunc,
97  const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
98  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
99  template<class F> auto Process(TFileCollection& files, F procFunc,
100  const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
101  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
102  template<class F> auto Process(TChain& files, F procFunc,
103  const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
104  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
105  template<class F> auto Process(TTree& tree, F procFunc, ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
106  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
107 
108 
109  /// \brief Process a TTree dataset with a selector
110  ///
111  /// Dataset definition:
112  /// \param[in] fileNames vector of strings with the paths of the files with the TTree to process
113  /// \param[in] fileName string with the path of the files with the TTree to process
114  /// \param[in] collection TFileCollection with the files with the TTree to process
115  /// \param[in] chain TChain with the files with the TTree to process
116  /// \param[in] tree TTree to process
117  ///
118  /// \param[in] selector Instance of TSelector to be applied to the dataset
119  /// \param[in] entries TEntryList to filter the dataset
120  /// \param[in] treeName Name of the TTree to process
121  /// \param[in] nToProcess Number of entries to process (0 means all)
122  /// \param[in] jFirst First entry to process (0 means the first of the first file)
123  ///
124  // these versions require a TSelector
125  TList* Process(const std::vector<std::string>& fileNames, TSelector& selector, TEntryList &entries,
126  const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
127  TList* Process(const std::string &fileName, TSelector& selector, TEntryList &entries,
128  const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
129  TList* Process(TFileCollection& files, TSelector& selector, TEntryList &entries,
130  const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
131  TList* Process(TChain& files, TSelector& selector, TEntryList &entries,
132  const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
133  TList* Process(TTree& tree, TSelector& selector, TEntryList &entries,
134  ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
135 
136 
137  /// \brief Process a TTree dataset with a selector: version without entry list
138  ///
139  /// Dataset definition:
140  /// \param[in] fileNames vector of strings with the paths of the files with the TTree to process
141  /// \param[in] fileName string with the path of the files with the TTree to process
142  /// \param[in] collection TFileCollection with the files with the TTree to process
143  /// \param[in] chain TChain with the files with the TTree to process
144  /// \param[in] tree TTree to process
145  ///
146  /// \param[in] selector Instance of TSelector to be applied to the dataset
147  /// \param[in] treeName Name of the TTree to process
148  /// \param[in] nToProcess Number of entries to process (0 means all)
149  /// \param[in] jFirst First entry to process (0 means the first of the first file)
150  ///
151  // these versions require a TSelector
152  TList* Process(const std::vector<std::string>& fileNames, TSelector& selector,
153  const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
154  TList* Process(const std::string &fileName, TSelector& selector,
155  const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
156  TList* Process(TFileCollection& files, TSelector& selector,
157  const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
158  TList* Process(TChain& files, TSelector& selector,
159  const std::string& treeName = "", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
160  TList* Process(TTree& tree, TSelector& selector, ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
161 
162  void SetNWorkers(unsigned n) { TMPClient::SetNWorkers(n); }
163  unsigned GetNWorkers() const { return TMPClient::GetNWorkers(); }
164 
165 private:
166  template<class T> void Collect(std::vector<T> &reslist);
167  template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
168 
169  void FixLists(std::vector<TObject*> &lists);
170  void Reset();
171  void ReplyToIdle(TSocket *s);
172 
173  unsigned fNProcessed; ///< number of arguments already passed to the workers
174  unsigned fNToProcess; ///< total number of arguments to pass to the workers
175 
176  /// A collection of the types of tasks that TTreeProcessorMP can execute.
177  /// It is used to interpret in the right way and properly reply to the
178  /// messages received (see, for example, TTreeProcessorMP::HandleInput)
179  enum class ETask : unsigned char {
180  kNoTask, ///< no task is being executed
181  kProcByRange, ///< a Process method is being executed and each worker will process a certain range of each file
182  kProcByFile ///< a Process method is being executed and each worker will process a different file
183  };
184 
185  ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
186 };
187 
188 template<class F>
189 auto TTreeProcessorMP::Process(const std::vector<std::string>& fileNames, F procFunc, TEntryList &entries,
190  const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
191  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
192 {
193  using retType = typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
194  static_assert(std::is_constructible<TObject*, retType>::value,
195  "procFunc must return a pointer to a class inheriting from TObject,"
196  " and must take a reference to TTreeReader as the only argument");
197 
198  // Warn for yet unimplemented functionality
199  if (jFirst > 0) {
200  Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
201  jFirst = 0;
202  }
203 
204  //prepare environment
205  Reset();
206  unsigned nWorkers = GetNWorkers();
207 
208  // Check th entry list
209  TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
210  //fork
211  TMPWorkerTreeFunc<F> worker(procFunc, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
212  bool ok = Fork(worker);
213  if(!ok) {
214  Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation.");
215  return nullptr;
216  }
217 
218 
219  if(fileNames.size() < nWorkers) {
220  //TTree entry granularity. For each file, we divide entries equally between workers
221  fTaskType = ETask::kProcByRange;
222  //Tell workers to start processing entries
223  fNToProcess = nWorkers*fileNames.size(); //this is the total number of ranges that will be processed by all workers cumulatively
224  std::vector<unsigned> args(nWorkers);
225  std::iota(args.begin(), args.end(), 0);
226  fNProcessed = Broadcast(MPCode::kProcRange, args);
227  if(fNProcessed < nWorkers)
228  Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
229  } else {
230  //file granularity. each worker processes one whole file as a single task
231  fTaskType = ETask::kProcByFile;
232  fNToProcess = fileNames.size();
233  std::vector<unsigned> args(nWorkers);
234  std::iota(args.begin(), args.end(), 0);
235  fNProcessed = Broadcast(MPCode::kProcFile, args);
236  if(fNProcessed < nWorkers)
237  Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
238  }
239 
240  //collect results, distribute new tasks
241  std::vector<TObject*> reslist;
242  Collect(reslist);
243 
244  //merge
245  PoolUtils::ReduceObjects<TObject *> redfunc;
246  auto res = redfunc(reslist);
247 
248  //clean-up and return
249  ReapWorkers();
250  fTaskType = ETask::kNoTask;
251  return static_cast<retType>(res);
252 }
253 
254 
255 template<class F>
256 auto TTreeProcessorMP::Process(const std::string& fileName, F procFunc, TEntryList &entries,
257  const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
258  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
259 {
260  std::vector<std::string> singleFileName(1, fileName);
261  return Process(singleFileName, procFunc, entries, treeName, nToProcess, jFirst);
262 }
263 
264 
265 template<class F>
266 auto TTreeProcessorMP::Process(TFileCollection& files, F procFunc, TEntryList &entries,
267  const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
268  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
269 {
270  std::vector<std::string> fileNames(files.GetNFiles());
271  unsigned count = 0;
272  for(auto f : *static_cast<THashList*>(files.GetList()))
273  fileNames[count++] = static_cast<TFileInfo*>(f)->GetCurrentUrl()->GetUrl();
274 
275  return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
276 }
277 
278 
279 template<class F>
280 auto TTreeProcessorMP::Process(TChain& files, F procFunc, TEntryList &entries,
281  const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
282  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
283 {
284  TObjArray* filelist = files.GetListOfFiles();
285  std::vector<std::string> fileNames(filelist->GetEntries());
286  unsigned count = 0;
287  for(auto f : *filelist)
288  fileNames[count++] = f->GetTitle();
289 
290  return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
291 }
292 
293 
294 template<class F>
295 auto TTreeProcessorMP::Process(TTree& tree, F procFunc, TEntryList &entries,
296  ULong64_t nToProcess, ULong64_t jFirst)
297  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
298 {
299  using retType = typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
300  static_assert(std::is_constructible<TObject*, retType>::value, "procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
301 
302  // Warn for yet unimplemented functionality
303  if (jFirst > 0) {
304  Warning("Process", "support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
305  jFirst = 0;
306  }
307 
308  //prepare environment
309  Reset();
310  unsigned nWorkers = GetNWorkers();
311 
312  // Check th entry list
313  TEntryList *elist = (entries.IsValid()) ? &entries : nullptr;
314  //fork
315  TMPWorkerTreeFunc<F> worker(procFunc, &tree, elist, nWorkers, nToProcess, jFirst);
316  bool ok = Fork(worker);
317  if(!ok) {
318  Error("TTreeProcessorMP::Process", "[E][C] Could not fork. Aborting operation.");
319  return nullptr;
320  }
321 
322  //divide entries equally between workers
323  fTaskType = ETask::kProcByRange;
324 
325  //tell workers to start processing entries
326  fNToProcess = nWorkers; //this is the total number of ranges that will be processed by all workers cumulatively
327  std::vector<unsigned> args(nWorkers);
328  std::iota(args.begin(), args.end(), 0);
329  fNProcessed = Broadcast(MPCode::kProcTree, args);
330  if(fNProcessed < nWorkers)
331  Error("TTreeProcessorMP::Process", "[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
332 
333  //collect results, distribute new tasks
334  std::vector<TObject*> reslist;
335  Collect(reslist);
336 
337  //merge
338  PoolUtils::ReduceObjects<TObject *> redfunc;
339  auto res = redfunc(reslist);
340 
341  //clean-up and return
342  ReapWorkers();
343  fTaskType = ETask::kNoTask;
344  return static_cast<retType>(res);
345 }
346 
347 
348 ///
349 /// No TEntryList versions of generic processor
350 ///
351 
352 template<class F>
353 auto TTreeProcessorMP::Process(const std::vector<std::string>& fileNames, F procFunc,
354  const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
355  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
356 {
357  TEntryList noelist;
358  return Process(fileNames, procFunc, noelist, treeName, nToProcess, jFirst);
359 }
360 
361 
362 template<class F>
363 auto TTreeProcessorMP::Process(const std::string& fileName, F procFunc,
364  const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
365  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
366 {
367  TEntryList noelist;
368  return Process(fileName, procFunc, noelist, treeName, nToProcess, jFirst);
369 }
370 
371 
372 template<class F>
373 auto TTreeProcessorMP::Process(TFileCollection& files, F procFunc,
374  const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
375  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
376 {
377  TEntryList noelist;
378  return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
379 }
380 
381 
382 template<class F>
383 auto TTreeProcessorMP::Process(TChain& files, F procFunc,
384  const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
385  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
386 {
387  TEntryList noelist;
388  return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
389 }
390 
391 
392 template<class F>
393 auto TTreeProcessorMP::Process(TTree& tree, F procFunc,
394  ULong64_t nToProcess, ULong64_t jFirst)
395  -> typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
396 {
397  TEntryList noelist;
398  return Process(tree, procFunc, noelist, nToProcess, jFirst);
399 }
400 
401 //////////////////////////////////////////////////////////////////////////
402 /// Handle message and reply to the worker
403 template<class T>
404 void TTreeProcessorMP::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
405 {
406  unsigned code = msg.first;
407  if (code == MPCode::kIdling) {
408  ReplyToIdle(s);
409  } else if(code == MPCode::kProcResult) {
410  if(msg.second != nullptr)
411  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
412  MPSend(s, MPCode::kShutdownOrder);
413  } else if(code == MPCode::kProcError) {
414  const char *str = ReadBuffer<const char*>(msg.second.get());
415  Error("TTreeProcessorMP::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
416  "Continuing execution ignoring these entries.", str);
417  ReplyToIdle(s);
418  delete [] str;
419  } else {
420  // UNKNOWN CODE
421  Error("TTreeProcessorMP::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
422  }
423 }
424 
425 //////////////////////////////////////////////////////////////////////////
426 /// Listen for messages sent by the workers and call the appropriate handler function.
427 /// TTreeProcessorMP::HandlePoolCode is called on messages with a code < 1000 and
428 /// TMPClient::HandleMPCode is called on messages with a code >= 1000.
429 template<class T>
430 void TTreeProcessorMP::Collect(std::vector<T> &reslist)
431 {
432  TMonitor &mon = GetMonitor();
433  mon.ActivateAll();
434  while (mon.GetActive() > 0) {
435  TSocket *s = mon.Select();
436  MPCodeBufPair msg = MPRecv(s);
437  if (msg.first == MPCode::kRecvError) {
438  Error("TTreeProcessorMP::Collect", "[E][C] Lost connection to a worker");
439  Remove(s);
440  } else if (msg.first < 1000)
441  HandlePoolCode(msg, s, reslist);
442  else
443  HandleMPCode(msg, s);
444  }
445 }
446 
447 } // ROOT namespace
448 
449 #endif