Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TThreadExecutor.hxx
Go to the documentation of this file.
1 // @(#)root/thread:$Id$
2 // Author: Xavier Valls March 2016
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2006, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TThreadExecutor
13 #define ROOT_TThreadExecutor
14 
15 #include "RConfigure.h"
16 
17 // exclude in case ROOT does not have IMT support
18 #ifndef R__USE_IMT
19 // No need to error out for dictionaries.
20 # if !defined(__ROOTCLING__) && !defined(G__DICTIONARY)
21 # error "Cannot use ROOT::TThreadExecutor without defining R__USE_IMT."
22 # endif
23 #else
24 
#include "ROOT/TExecutor.hxx"
#include "ROOT/TPoolManager.hxx"
#include "TROOT.h"
#include "TError.h"
#include <algorithm>
#include <functional>
#include <initializer_list>
#include <memory>
#include <numeric>
#include <type_traits>
#include <vector>
32 
33 namespace ROOT {
34 
35  class TThreadExecutor: public TExecutor<TThreadExecutor> {
36  public:
37  explicit TThreadExecutor();
38 
39  explicit TThreadExecutor(UInt_t nThreads);
40 
41  TThreadExecutor(TThreadExecutor &) = delete;
42  TThreadExecutor &operator=(TThreadExecutor &) = delete;
43 
44  template<class F>
45  void Foreach(F func, unsigned nTimes, unsigned nChunks = 0);
46  template<class F, class INTEGER>
47  void Foreach(F func, ROOT::TSeq<INTEGER> args, unsigned nChunks = 0);
48  /// \cond
49  template<class F, class T>
50  void Foreach(F func, std::initializer_list<T> args, unsigned nChunks = 0);
51  /// \endcond
52  template<class F, class T>
53  void Foreach(F func, std::vector<T> &args, unsigned nChunks = 0);
54  template<class F, class T>
55  void Foreach(F func, const std::vector<T> &args, unsigned nChunks = 0);
56 
57  using TExecutor<TThreadExecutor>::Map;
58  template<class F, class Cond = noReferenceCond<F>>
59  auto Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
60  template<class F, class INTEGER, class Cond = noReferenceCond<F, INTEGER>>
61  auto Map(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type>;
62  template<class F, class T, class Cond = noReferenceCond<F, T>>
63  auto Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
64 
65  // // MapReduce
66  // // the late return types also check at compile-time whether redfunc is compatible with func,
67  // // other than checking that func is compatible with the type of arguments.
68  // // a static_assert check in TThreadExecutor::Reduce is used to check that redfunc is compatible with the type returned by func
69  using TExecutor<TThreadExecutor>::MapReduce;
70  template<class F, class R, class Cond = noReferenceCond<F>>
71  auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type;
72  template<class F, class R, class Cond = noReferenceCond<F>>
73  auto MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> typename std::result_of<F()>::type;
74  template<class F, class INTEGER, class R, class Cond = noReferenceCond<F, INTEGER>>
75  auto MapReduce(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(INTEGER)>::type;
76  /// \cond
77  template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
78  auto MapReduce(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type;
79  /// \endcond
80  template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
81  auto MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type;
82  template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
83  auto MapReduce(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type;
84 
85  using TExecutor<TThreadExecutor>::Reduce;
86  template<class T, class BINARYOP> auto Reduce(const std::vector<T> &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()));
87  template<class T, class R> auto Reduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs));
88 
89  unsigned GetPoolSize();
90 
91  protected:
92  template<class F, class R, class Cond = noReferenceCond<F>>
93  auto Map(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F()>::type>;
94  template<class F, class INTEGER, class R, class Cond = noReferenceCond<F, INTEGER>>
95  auto Map(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(INTEGER)>::type>;
96  template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
97  auto Map(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type>;
98  template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
99  auto Map(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type>;
100 
101  private:
102  void ParallelFor(unsigned start, unsigned end, unsigned step, const std::function<void(unsigned int i)> &f);
103  double ParallelReduce(const std::vector<double> &objs, const std::function<double(double a, double b)> &redfunc);
104  float ParallelReduce(const std::vector<float> &objs, const std::function<float(float a, float b)> &redfunc);
105  template<class T, class R>
106  auto SeqReduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs));
107 
108  std::shared_ptr<ROOT::Internal::TPoolManager> fSched = nullptr;
109  };
110 
111  /************ TEMPLATE METHODS IMPLEMENTATION ******************/
112 
113  //////////////////////////////////////////////////////////////////////////
114  /// Execute func (with no arguments) nTimes in parallel.
115  /// Functions that take more than zero arguments can be executed (with
116  /// fixed arguments) by wrapping them in a lambda or with std::bind.
117  template<class F>
118  void TThreadExecutor::Foreach(F func, unsigned nTimes, unsigned nChunks) {
119  if (nChunks == 0) {
120  ParallelFor(0U, nTimes, 1, [&](unsigned int){func();});
121  return;
122  }
123 
124  unsigned step = (nTimes + nChunks - 1) / nChunks;
125  auto lambda = [&](unsigned int i)
126  {
127  for (unsigned j = 0; j < step && (i + j) < nTimes; j++) {
128  func();
129  }
130  };
131  ParallelFor(0U, nTimes, step, lambda);
132  }
133 
134  //////////////////////////////////////////////////////////////////////////
135  /// Execute func in parallel, taking an element of a
136  /// sequence as argument.
137  template<class F, class INTEGER>
138  void TThreadExecutor::Foreach(F func, ROOT::TSeq<INTEGER> args, unsigned nChunks) {
139  if (nChunks == 0) {
140  ParallelFor(*args.begin(), *args.end(), args.step(), [&](unsigned int i){func(i);});
141  return;
142  }
143  unsigned start = *args.begin();
144  unsigned end = *args.end();
145  unsigned seqStep = args.step();
146  unsigned step = (end - start + nChunks - 1) / nChunks; //ceiling the division
147 
148  auto lambda = [&](unsigned int i)
149  {
150  for (unsigned j = 0; j < step && (i + j) < end; j+=seqStep) {
151  func(i + j);
152  }
153  };
154  ParallelFor(start, end, step, lambda);
155  }
156 
157  /// \cond
158  //////////////////////////////////////////////////////////////////////////
159  /// Execute func in parallel, taking an element of a
160  /// initializer_list as argument.
161  template<class F, class T>
162  void TThreadExecutor::Foreach(F func, std::initializer_list<T> args, unsigned nChunks) {
163  std::vector<T> vargs(std::move(args));
164  Foreach(func, vargs, nChunks);
165  }
166  /// \endcond
167 
168  //////////////////////////////////////////////////////////////////////////
169  /// Execute func in parallel, taking an element of an
170  /// std::vector as argument.
171  template<class F, class T>
172  void TThreadExecutor::Foreach(F func, std::vector<T> &args, unsigned nChunks) {
173  unsigned int nToProcess = args.size();
174  if (nChunks == 0) {
175  ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
176  return;
177  }
178 
179  unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
180  auto lambda = [&](unsigned int i)
181  {
182  for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
183  func(args[i + j]);
184  }
185  };
186  ParallelFor(0U, nToProcess, step, lambda);
187  }
188 
189  //////////////////////////////////////////////////////////////////////////
190  /// Execute func in parallel, taking an element of a std::vector as argument.
191  template<class F, class T>
192  void TThreadExecutor::Foreach(F func, const std::vector<T> &args, unsigned nChunks) {
193  unsigned int nToProcess = args.size();
194  if (nChunks == 0) {
195  ParallelFor(0U, nToProcess, 1, [&](unsigned int i){func(args[i]);});
196  return;
197  }
198 
199  unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
200  auto lambda = [&](unsigned int i)
201  {
202  for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
203  func(args[i + j]);
204  }
205  };
206  ParallelFor(0U, nToProcess, step, lambda);
207  }
208 
209  //////////////////////////////////////////////////////////////////////////
210  /// Execute func (with no arguments) nTimes in parallel.
211  /// A vector containg executions' results is returned.
212  /// Functions that take more than zero arguments can be executed (with
213  /// fixed arguments) by wrapping them in a lambda or with std::bind.
214  template<class F, class Cond>
215  auto TThreadExecutor::Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type> {
216  using retType = decltype(func());
217  std::vector<retType> reslist(nTimes);
218  auto lambda = [&](unsigned int i)
219  {
220  reslist[i] = func();
221  };
222  ParallelFor(0U, nTimes, 1, lambda);
223 
224  return reslist;
225  }
226 
227  //////////////////////////////////////////////////////////////////////////
228  /// Execute func in parallel, taking an element of a
229  /// sequence as argument.
230  /// A vector containg executions' results is returned.
231  template<class F, class INTEGER, class Cond>
232  auto TThreadExecutor::Map(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type> {
233  unsigned start = *args.begin();
234  unsigned end = *args.end();
235  unsigned seqStep = args.step();
236 
237  using retType = decltype(func(start));
238  std::vector<retType> reslist(args.size());
239  auto lambda = [&](unsigned int i)
240  {
241  reslist[i] = func(i);
242  };
243  ParallelFor(start, end, seqStep, lambda);
244 
245  return reslist;
246  }
247 
248  //////////////////////////////////////////////////////////////////////////
249  /// Execute func (with no arguments) nTimes in parallel.
250  /// Divides and groups the executions in nChunks (if it doesn't make sense will reduce the number of chunks) with partial reduction;
251  /// A vector containg partial reductions' results is returned.
252  template<class F, class R, class Cond>
253  auto TThreadExecutor::Map(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F()>::type> {
254  if (nChunks == 0)
255  {
256  return Map(func, nTimes);
257  }
258 
259  unsigned step = (nTimes + nChunks - 1) / nChunks;
260  // Avoid empty chunks
261  unsigned actualChunks = (nTimes + step - 1) / step;
262  using retType = decltype(func());
263  std::vector<retType> reslist(actualChunks);
264  auto lambda = [&](unsigned int i)
265  {
266  std::vector<retType> partialResults(std::min(nTimes-i, step));
267  for (unsigned j = 0; j < step && (i + j) < nTimes; j++) {
268  partialResults[j] = func();
269  }
270  reslist[i / step] = Reduce(partialResults, redfunc);
271  };
272  ParallelFor(0U, nTimes, step, lambda);
273 
274  return reslist;
275  }
276 
277  //////////////////////////////////////////////////////////////////////////
278  /// Execute func in parallel, taking an element of an
279  /// std::vector as argument.
280  /// A vector containg executions' results is returned.
281  // actual implementation of the Map method. all other calls with arguments eventually
282  // call this one
283  template<class F, class T, class Cond>
284  auto TThreadExecutor::Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type> {
285  // //check whether func is callable
286  using retType = decltype(func(args.front()));
287 
288  unsigned int nToProcess = args.size();
289  std::vector<retType> reslist(nToProcess);
290 
291  auto lambda = [&](unsigned int i)
292  {
293  reslist[i] = func(args[i]);
294  };
295 
296  ParallelFor(0U, nToProcess, 1, lambda);
297 
298  return reslist;
299  }
300 
301  //////////////////////////////////////////////////////////////////////////
302  /// Execute func in parallel, taking an element of a
303  /// sequence as argument.
304  /// Divides and groups the executions in nChunks (if it doesn't make sense will reduce the number of chunks) with partial reduction\n
305  /// A vector containg partial reductions' results is returned.
306  template<class F, class INTEGER, class R, class Cond>
307  auto TThreadExecutor::Map(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(INTEGER)>::type> {
308  if (nChunks == 0)
309  {
310  return Map(func, args);
311  }
312 
313  unsigned start = *args.begin();
314  unsigned end = *args.end();
315  unsigned seqStep = args.step();
316  unsigned step = (end - start + nChunks - 1) / nChunks; //ceiling the division
317  // Avoid empty chunks
318  unsigned actualChunks = (end - start + step - 1) / step;
319 
320  using retType = decltype(func(start));
321  std::vector<retType> reslist(actualChunks);
322  auto lambda = [&](unsigned int i)
323  {
324  std::vector<retType> partialResults(std::min(end-i, step));
325  for (unsigned j = 0; j < step && (i + j) < end; j+=seqStep) {
326  partialResults[j] = func(i + j);
327  }
328  reslist[i / step] = Reduce(partialResults, redfunc);
329  };
330  ParallelFor(start, end, step, lambda);
331 
332  return reslist;
333  }
334 
335 /// \cond
336  //////////////////////////////////////////////////////////////////////////
337  /// Execute func in parallel, taking an element of an
338  /// std::vector as argument. Divides and groups the executions in nChunks with partial reduction.
339  /// If it doesn't make sense will reduce the number of chunks.\n
340  /// A vector containg partial reductions' results is returned.
341  template<class F, class T, class R, class Cond>
342  auto TThreadExecutor::Map(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type> {
343  if (nChunks == 0)
344  {
345  return Map(func, args);
346  }
347 
348  unsigned int nToProcess = args.size();
349  unsigned step = (nToProcess + nChunks - 1) / nChunks; //ceiling the division
350  // Avoid empty chunks
351  unsigned actualChunks = (nToProcess + step - 1) / step;
352 
353  using retType = decltype(func(args.front()));
354  std::vector<retType> reslist(actualChunks);
355  auto lambda = [&](unsigned int i)
356  {
357  std::vector<T> partialResults(step);
358  for (unsigned j = 0; j < step && (i + j) < nToProcess; j++) {
359  partialResults[j] = func(args[i + j]);
360  }
361  reslist[i / step] = Reduce(partialResults, redfunc);
362  };
363 
364  ParallelFor(0U, nToProcess, step, lambda);
365 
366  return reslist;
367  }
368 
369  //////////////////////////////////////////////////////////////////////////
370  /// Execute func in parallel, taking an element of an
371  /// std::initializer_list as an argument. Divides and groups the executions in nChunks with partial reduction.
372  /// If it doesn't make sense will reduce the number of chunks.\n
373  /// A vector containg partial reductions' results is returned.
374  template<class F, class T, class R, class Cond>
375  auto TThreadExecutor::Map(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> std::vector<typename std::result_of<F(T)>::type> {
376  std::vector<T> vargs(std::move(args));
377  const auto &reslist = Map(func, vargs, redfunc, nChunks);
378  return reslist;
379  }
380 /// \endcond
381 
382 
383  //////////////////////////////////////////////////////////////////////////
384  /// This method behaves just like Map, but an additional redfunc function
385  /// must be provided. redfunc is applied to the vector Map would return and
386  /// must return the same type as func. In practice, redfunc can be used to
387  /// "squash" the vector returned by Map into a single object by merging,
388  /// adding, mixing the elements of the vector.\n
389  /// The fourth argument indicates the number of chunks we want to divide our work in.
390  template<class F, class R, class Cond>
391  auto TThreadExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type {
392  return Reduce(Map(func, nTimes), redfunc);
393  }
394 
395  template<class F, class R, class Cond>
396  auto TThreadExecutor::MapReduce(F func, unsigned nTimes, R redfunc, unsigned nChunks) -> typename std::result_of<F()>::type {
397  return Reduce(Map(func, nTimes, redfunc, nChunks), redfunc);
398  }
399 
400  template<class F, class INTEGER, class R, class Cond>
401  auto TThreadExecutor::MapReduce(F func, ROOT::TSeq<INTEGER> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(INTEGER)>::type {
402  return Reduce(Map(func, args, redfunc, nChunks), redfunc);
403  }
404  /// \cond
405  template<class F, class T, class R, class Cond>
406  auto TThreadExecutor::MapReduce(F func, std::initializer_list<T> args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type {
407  return Reduce(Map(func, args, redfunc, nChunks), redfunc);
408  }
409  /// \endcond
410 
411  template<class F, class T, class R, class Cond>
412  auto TThreadExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type {
413  return Reduce(Map(func, args), redfunc);
414  }
415 
416  template<class F, class T, class R, class Cond>
417  auto TThreadExecutor::MapReduce(F func, std::vector<T> &args, R redfunc, unsigned nChunks) -> typename std::result_of<F(T)>::type {
418  return Reduce(Map(func, args, redfunc, nChunks), redfunc);
419  }
420 
421  //////////////////////////////////////////////////////////////////////////
422  /// "Reduce" an std::vector into a single object in parallel by passing a
423  /// binary operator as the second argument to act on pairs of elements of the std::vector.
424  template<class T, class BINARYOP>
425  auto TThreadExecutor::Reduce(const std::vector<T> &objs, BINARYOP redfunc) -> decltype(redfunc(objs.front(), objs.front()))
426  {
427  // check we can apply reduce to objs
428  static_assert(std::is_same<decltype(redfunc(objs.front(), objs.front())), T>::value, "redfunc does not have the correct signature");
429  return ParallelReduce(objs, redfunc);
430  }
431 
432  //////////////////////////////////////////////////////////////////////////
433  /// "Reduce" an std::vector into a single object by passing a
434  /// function as the second argument defining the reduction operation.
435  template<class T, class R>
436  auto TThreadExecutor::Reduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs))
437  {
438  // check we can apply reduce to objs
439  static_assert(std::is_same<decltype(redfunc(objs)), T>::value, "redfunc does not have the correct signature");
440  return SeqReduce(objs, redfunc);
441  }
442 
443  template<class T, class R>
444  auto TThreadExecutor::SeqReduce(const std::vector<T> &objs, R redfunc) -> decltype(redfunc(objs))
445  {
446  return redfunc(objs);
447  }
448 
449 } // namespace ROOT
450 
451 #endif // R__USE_IMT
452 #endif