#include "ROOT/TThreadExecutor.hxx"
#include "ROOT/TTaskGroup.hxx"

#include <numeric> // std::accumulate

#if !defined(_MSC_VER)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wshadow"
#endif

#include "tbb/tbb.h"

#if !defined(_MSC_VER)
#pragma GCC diagnostic pop
#endif

//////////////////////////////////////////////////////////////////////////
///
/// \class ROOT::TThreadExecutor
/// \ingroup Parallelism
/// \brief This class provides a simple interface to execute the same task
/// multiple times in parallel, possibly with different arguments every
/// time. This mimics the behaviour of Python's pool.Map method.
///
/// ### ROOT::TThreadExecutor::Map
/// This class inherits its interfaces from ROOT::TExecutor.\n
/// The two possible usages of the Map method are:\n
/// * Map(F func, unsigned nTimes): func is executed nTimes with no arguments
/// * Map(F func, T& args): func is executed on each element of the collection of arguments args
///
/// For either signature, func is executed as many times as needed by a pool of
/// nThreads threads; nThreads defaults to the number of cores.\n
/// A collection containing the result of each execution is returned.\n
/// **Note:** the user is responsible for the deletion of any object that might
/// be created upon execution of func, returned objects included: ROOT::TThreadExecutor never
/// deletes what it returns, it simply forgets it.\n
///
/// \param func
/// \parblock
/// a lambda expression, an std::function, a loaded macro, a
/// functor class or a function that takes zero arguments (for the first signature)
/// or one (for the second signature).
/// \endparblock
/// \param args
/// \parblock
/// a standard vector, a ROOT::TSeq of integer type or an initializer list for the second signature.
/// An integer only for the first.
/// \endparblock
/// **Note:** in cases where the function to be executed takes more than
/// zero/one argument, but all arguments except zero/one are fixed, the function can be wrapped
/// in a lambda or via std::bind to give it the right signature, as in the example below.\n
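/// For instance, a hypothetical two-argument function `double Pow(double base, double exp)`
/// can be adapted to the one-argument form expected by the second signature by fixing one argument:
/// ~~~{.cpp}
/// ROOT::TThreadExecutor pool;
/// auto squares = pool.Map([](double b) { return Pow(b, 2.); }, {1., 2., 3.});
/// // std::bind(Pow, std::placeholders::_1, 2.) would work as well
/// ~~~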
///
/// #### Return value:
/// An std::vector. The elements in the container
/// will be the objects returned by func.
///
///
/// #### Examples:
///
/// ~~~{.cpp}
/// root[] ROOT::TThreadExecutor pool; auto hists = pool.Map(CreateHisto, 10);
/// root[] ROOT::TThreadExecutor pool(2); auto squares = pool.Map([](int a) { return a*a; }, {1,2,3});
/// ~~~
///
/// ### ROOT::TThreadExecutor::MapReduce
/// This set of methods behaves exactly like Map, but takes an additional
/// function as a third argument. This function is applied to the set of
/// objects returned by the corresponding Map execution to "squash" them
/// into a single object. Because the number of chunks is optimized internally, this
/// function should not depend on the size of the vector returned by Map.
///
/// If this function is a binary operator, the "squashing" is performed in parallel.
/// This feature is exclusive to ROOT::TThreadExecutor; it is not available in the other ROOT::TExecutor-derived classes.\n
/// An integer can be passed as the fourth argument to indicate the number of chunks into which the work is divided.
/// This may be useful to avoid the overhead introduced when running very short tasks.
///
/// #### Examples:
/// ~~~{.cpp}
/// root[] ROOT::TThreadExecutor pool; auto ten = pool.MapReduce([]() { return 1; }, 10, [](std::vector<int> v) { return std::accumulate(v.begin(), v.end(), 0); });
/// root[] ROOT::TThreadExecutor pool; auto hist = pool.MapReduce(CreateAndFillHists, 10, PoolUtils::ReduceObjects);
/// ~~~
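///
/// A sketch of the chunked variant, passing the number of chunks as the fourth argument:
/// ~~~{.cpp}
/// root[] ROOT::TThreadExecutor pool; auto ten = pool.MapReduce([]() { return 1; }, 10, [](std::vector<int> v) { return std::accumulate(v.begin(), v.end(), 0); }, 5);
/// ~~~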
///
//////////////////////////////////////////////////////////////////////////

/*
VERY IMPORTANT NOTE ABOUT WORK ISOLATION

We enclose the parallel_for and parallel_reduce invocations in a
task_arena::isolate because we want to prevent a thread from starting to
execute an outer task while the task it is currently running has spawned
subtasks, e.g. with a parallel_for, and is waiting for those inner tasks to complete.

While this change has a negligible performance impact, it has benefits for
several applications, for example big parallelised HEP frameworks and
RDataFrame analyses.
- For HEP frameworks, without work isolation it can happen that a huge
framework task is pulled in by a yielding ROOT task.
This delays the processing of the event, which is interrupted by the
long task.
For example, work isolation avoids that, during the wait caused by the parallel
flushing of baskets, a very long simulation task is pulled in by the idle thread.
- For RDataFrame analyses we want to guarantee that each entry is processed from
beginning to end without TBB interrupting it to pull in other work items.
As a corollary, the usage of ROOT (or TBB in work isolation mode) in actions
and transformations guarantees that each entry is processed from beginning to
end without being interrupted by the processing of outer tasks.
*/

namespace ROOT {
namespace Internal {

/// A helper function to implement the TThreadExecutor::ParallelReduce methods
template<typename T>
static T ParallelReduceHelper(const std::vector<T> &objs, const std::function<T(T a, T b)> &redfunc)
{
   using BRange_t = tbb::blocked_range<decltype(objs.begin())>;

   // Sequentially accumulate the elements of one sub-range, starting from `init`
   auto pred = [redfunc](BRange_t const &range, T init) {
      return std::accumulate(range.begin(), range.end(), init, redfunc);
   };

   BRange_t objRange(objs.begin(), objs.end());

   // Combine the partial results in parallel, inside an isolated arena
   // (see the note on work isolation above)
   return tbb::this_task_arena::isolate([&] {
      return tbb::parallel_reduce(objRange, T{}, pred, redfunc);
   });
}

} // End NS Internal
} // End NS ROOT

namespace ROOT {

   //////////////////////////////////////////////////////////////////////////
   /// Class constructor.
   /// If the scheduler is active, gets a pointer to it.
   /// If not, initializes the pool of threads with the number of logical threads supported by the hardware.
   TThreadExecutor::TThreadExecutor(): TThreadExecutor::TThreadExecutor(tbb::task_scheduler_init::default_num_threads()) {}

   //////////////////////////////////////////////////////////////////////////
   /// Class constructor.
   /// nThreads is the number of threads that will be spawned. If the scheduler is already active
   /// (e.g. ImplicitMT is enabled or another TThreadExecutor instance exists), the number of threads is not changed.
   TThreadExecutor::TThreadExecutor(UInt_t nThreads)
   {
      fSched = ROOT::Internal::GetPoolManager(nThreads);
   }

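   //////////////////////////////////////////////////////////////////////////
   /// Execute `f` for every index in [start, end) with the given step, in parallel,
   /// inside an isolated TBB arena (see the note on work isolation above).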
   void TThreadExecutor::ParallelFor(unsigned int start, unsigned int end, unsigned step, const std::function<void(unsigned int i)> &f)
   {
      tbb::this_task_arena::isolate([&]{
         tbb::parallel_for(start, end, step, f);
      });
   }

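   //////////////////////////////////////////////////////////////////////////
   /// "Reduce" a std::vector of doubles into a single value in parallel,
   /// using `redfunc` as the binary operation that combines two elements.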
   double TThreadExecutor::ParallelReduce(const std::vector<double> &objs, const std::function<double(double a, double b)> &redfunc)
   {
      return ROOT::Internal::ParallelReduceHelper<double>(objs, redfunc);
   }

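   //////////////////////////////////////////////////////////////////////////
   /// "Reduce" a std::vector of floats into a single value in parallel,
   /// using `redfunc` as the binary operation that combines two elements.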
   float TThreadExecutor::ParallelReduce(const std::vector<float> &objs, const std::function<float(float a, float b)> &redfunc)
   {
      return ROOT::Internal::ParallelReduceHelper<float>(objs, redfunc);
   }

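   //////////////////////////////////////////////////////////////////////////
   /// Return the size of the pool of threads, as reported by
   /// ROOT::Internal::TPoolManager::GetPoolSize().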
   unsigned TThreadExecutor::GetPoolSize()
   {
      return ROOT::Internal::TPoolManager::GetPoolSize();
   }

} // namespace ROOT