Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TProcessExecutor.hxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 // Modified: G Ganis Jan 2017
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 #ifndef ROOT_TProcessExecutor
14 #define ROOT_TProcessExecutor
15 
16 #include "MPCode.h"
17 #include "MPSendRecv.h"
18 #include "PoolUtils.h"
19 #include "TError.h"
20 #include "TFileCollection.h"
21 #include "TFileInfo.h"
22 #include "THashList.h"
23 #include "TMPClient.h"
24 #include "ROOT/TExecutor.hxx"
25 #include "TMPWorkerExecutor.h"
26 #include <algorithm> //std::generate
27 #include <numeric> //std::iota
28 #include <string>
29 #include <type_traits> //std::result_of, std::enable_if
30 #include <functional> //std::reference_wrapper
31 #include <vector>
32 
33 namespace ROOT {
34 
35 class TProcessExecutor : public TExecutor<TProcessExecutor>, private TMPClient {
36 public:
37  explicit TProcessExecutor(unsigned nWorkers = 0); //default number of workers is the number of processors
38  ~TProcessExecutor() = default;
39  //it doesn't make sense for a TProcessExecutor to be copied
40  TProcessExecutor(const TProcessExecutor &) = delete;
41  TProcessExecutor &operator=(const TProcessExecutor &) = delete;
42 
43  // Map
44  using TExecutor<TProcessExecutor>::Map;
45  template<class F, class Cond = noReferenceCond<F>>
46  auto Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
47  template<class F, class INTEGER, class Cond = noReferenceCond<F, INTEGER>>
48  auto Map(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type>;
49  template<class F, class T, class Cond = noReferenceCond<F, T>>
50  auto Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
51 
52  void SetNWorkers(unsigned n) { TMPClient::SetNWorkers(n); }
53  unsigned GetNWorkers() const { return TMPClient::GetNWorkers(); }
54 
55  using TExecutor<TProcessExecutor>::MapReduce;
56  template<class F, class R, class Cond = noReferenceCond<F>>
57  auto MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type;
58  template<class F, class T, class R, class Cond = noReferenceCond<F, T>>
59  auto MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type;
60 
61  using TExecutor<TProcessExecutor>::Reduce;
62  template<class T, class R> T Reduce(const std::vector<T> &objs, R redfunc);
63 
64 private:
65  template<class T> void Collect(std::vector<T> &reslist);
66  template<class T> void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
67 
68  void Reset();
69  void ReplyToFuncResult(TSocket *s);
70  void ReplyToIdle(TSocket *s);
71 
72  unsigned fNProcessed; ///< number of arguments already passed to the workers
73  unsigned fNToProcess; ///< total number of arguments to pass to the workers
74 
75  /// A collection of the types of tasks that TProcessExecutor can execute.
76  /// It is used to interpret in the right way and properly reply to the
77  /// messages received (see, for example, TProcessExecutor::HandleInput)
78  enum class ETask : unsigned char {
79  kNoTask, ///< no task is being executed
80  kMap, ///< a Map method with no arguments is being executed
81  kMapWithArg, ///< a Map method with arguments is being executed
82  kMapRed, ///< a MapReduce method with no arguments is being executed
83  kMapRedWithArg ///< a MapReduce method with arguments is being executed
84  };
85 
86  ETask fTaskType = ETask::kNoTask; ///< the kind of task that is being executed, if any
87 };
88 
89 
90 /************ TEMPLATE METHODS IMPLEMENTATION ******************/
91 
92 //////////////////////////////////////////////////////////////////////////
93 /// Execute func (with no arguments) nTimes in parallel.
94 /// A vector containg executions' results is returned.
95 /// Functions that take more than zero arguments can be executed (with
96 /// fixed arguments) by wrapping them in a lambda or with std::bind.
97 template<class F, class Cond>
98 auto TProcessExecutor::Map(F func, unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>
99 {
100  using retType = decltype(func());
101  //prepare environment
102  Reset();
103  fTaskType = ETask::kMap;
104 
105  //fork max(nTimes, fNWorkers) times
106  unsigned oldNWorkers = GetNWorkers();
107  if (nTimes < oldNWorkers)
108  SetNWorkers(nTimes);
109  TMPWorkerExecutor<F> worker(func);
110  bool ok = Fork(worker);
111  SetNWorkers(oldNWorkers);
112  if (!ok)
113  {
114  Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
115  return std::vector<retType>();
116  }
117 
118  //give out tasks
119  fNToProcess = nTimes;
120  std::vector<retType> reslist;
121  reslist.reserve(fNToProcess);
122  fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
123 
124  //collect results, give out other tasks if needed
125  Collect(reslist);
126 
127  //clean-up and return
128  ReapWorkers();
129  fTaskType = ETask::kNoTask;
130  return reslist;
131 }
132 
133 //////////////////////////////////////////////////////////////////////////
134 /// Execute func in parallel, taking an element of an
135 /// std::vector as argument.
136 /// A vector containg executions' results is returned.
137 // actual implementation of the Map method. all other calls with arguments eventually
138 // call this one
139 template<class F, class T, class Cond>
140 auto TProcessExecutor::Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>
141 {
142  //check whether func is callable
143  using retType = decltype(func(args.front()));
144  //prepare environment
145  Reset();
146  fTaskType = ETask::kMapWithArg;
147 
148  //fork max(args.size(), fNWorkers) times
149  //N.B. from this point onwards, args is filled with undefined (but valid) values, since TMPWorkerExecutor moved its content away
150  unsigned oldNWorkers = GetNWorkers();
151  if (args.size() < oldNWorkers)
152  SetNWorkers(args.size());
153  TMPWorkerExecutor<F, T> worker(func, args);
154  bool ok = Fork(worker);
155  SetNWorkers(oldNWorkers);
156  if (!ok)
157  {
158  Error("TProcessExecutor::Map", "[E][C] Could not fork. Aborting operation.");
159  return std::vector<retType>();
160  }
161 
162  //give out tasks
163  fNToProcess = args.size();
164  std::vector<retType> reslist;
165  reslist.reserve(fNToProcess);
166  std::vector<unsigned> range(fNToProcess);
167  std::iota(range.begin(), range.end(), 0);
168  fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
169 
170  //collect results, give out other tasks if needed
171  Collect(reslist);
172 
173  //clean-up and return
174  ReapWorkers();
175  fTaskType = ETask::kNoTask;
176  return reslist;
177 }
178 
179 //////////////////////////////////////////////////////////////////////////
180 /// Execute func in parallel, taking an element of a
181 /// sequence as argument.
182 /// A vector containg executions' results is returned.
183 template<class F, class INTEGER, class Cond>
184 auto TProcessExecutor::Map(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type>
185 {
186  std::vector<INTEGER> vargs(args.size());
187  std::copy(args.begin(), args.end(), vargs.begin());
188  const auto &reslist = Map(func, vargs);
189  return reslist;
190 }
191 
192 //////////////////////////////////////////////////////////////////////////
193 /// This method behaves just like Map, but an additional redfunc function
194 /// must be provided. redfunc is applied to the vector Map would return and
195 /// must return the same type as func. In practice, redfunc can be used to
196 /// "squash" the vector returned by Map into a single object by merging,
197 /// adding, mixing the elements of the vector.
198 template<class F, class R, class Cond>
199 auto TProcessExecutor::MapReduce(F func, unsigned nTimes, R redfunc) -> typename std::result_of<F()>::type
200 {
201  using retType = decltype(func());
202  //prepare environment
203  Reset();
204  fTaskType= ETask::kMapRed;
205 
206  //fork max(nTimes, fNWorkers) times
207  unsigned oldNWorkers = GetNWorkers();
208  if (nTimes < oldNWorkers)
209  SetNWorkers(nTimes);
210  TMPWorkerExecutor<F, void, R> worker(func, redfunc);
211  bool ok = Fork(worker);
212  SetNWorkers(oldNWorkers);
213  if (!ok) {
214  std::cerr << "[E][C] Could not fork. Aborting operation\n";
215  return retType();
216  }
217 
218  //give workers their first task
219  fNToProcess = nTimes;
220  std::vector<retType> reslist;
221  reslist.reserve(fNToProcess);
222  fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
223 
224  //collect results/give workers their next task
225  Collect(reslist);
226 
227  //clean-up and return
228  ReapWorkers();
229  fTaskType= ETask::kNoTask;
230  return redfunc(reslist);
231 }
232 
233 //////////////////////////////////////////////////////////////////////////
234 /// This method behaves just like Map, but an additional redfunc function
235 /// must be provided. redfunc is applied to the vector Map would return and
236 /// must return the same type as func. In practice, redfunc can be used to
237 /// "squash" the vector returned by Map into a single object by merging,
238 /// adding, mixing the elements of the vector.
239 template<class F, class T, class R, class Cond>
240 auto TProcessExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) -> typename std::result_of<F(T)>::type
241 {
242 
243  using retType = decltype(func(args.front()));
244  //prepare environment
245  Reset();
246  fTaskType= ETask::kMapRedWithArg;
247 
248  //fork max(args.size(), fNWorkers) times
249  unsigned oldNWorkers = GetNWorkers();
250  if (args.size() < oldNWorkers)
251  SetNWorkers(args.size());
252  TMPWorkerExecutor<F, T, R> worker(func, args, redfunc);
253  bool ok = Fork(worker);
254  SetNWorkers(oldNWorkers);
255  if (!ok) {
256  std::cerr << "[E][C] Could not fork. Aborting operation\n";
257  return decltype(func(args.front()))();
258  }
259 
260  //give workers their first task
261  fNToProcess = args.size();
262  std::vector<retType> reslist;
263  reslist.reserve(fNToProcess);
264  std::vector<unsigned> range(fNToProcess);
265  std::iota(range.begin(), range.end(), 0);
266  fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
267 
268  //collect results/give workers their next task
269  Collect(reslist);
270 
271  ReapWorkers();
272  fTaskType= ETask::kNoTask;
273  return Reduce(reslist, redfunc);
274 }
275 
276 //////////////////////////////////////////////////////////////////////////
277 /// "Reduce" an std::vector into a single object by passing a
278 /// function as the second argument defining the reduction operation.
279 template<class T, class R>
280 T TProcessExecutor::Reduce(const std::vector<T> &objs, R redfunc)
281 {
282  // check we can apply reduce to objs
283  static_assert(std::is_same<decltype(redfunc(objs)), T>::value, "redfunc does not have the correct signature");
284  return redfunc(objs);
285 }
286 
287 //////////////////////////////////////////////////////////////////////////
288 /// Handle message and reply to the worker
289 template<class T>
290 void TProcessExecutor::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
291 {
292  unsigned code = msg.first;
293  if (code == MPCode::kFuncResult) {
294  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
295  ReplyToFuncResult(s);
296  } else if (code == MPCode::kIdling) {
297  ReplyToIdle(s);
298  } else if(code == MPCode::kProcResult) {
299  if(msg.second != nullptr)
300  reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
301  MPSend(s, MPCode::kShutdownOrder);
302  } else if(code == MPCode::kProcError) {
303  const char *str = ReadBuffer<const char*>(msg.second.get());
304  Error("TProcessExecutor::HandlePoolCode", "[E][C] a worker encountered an error: %s\n"
305  "Continuing execution ignoring these entries.", str);
306  ReplyToIdle(s);
307  delete [] str;
308  } else {
309  // UNKNOWN CODE
310  Error("TProcessExecutor::HandlePoolCode", "[W][C] unknown code received from server. code=%d", code);
311  }
312 }
313 
314 //////////////////////////////////////////////////////////////////////////
315 /// Listen for messages sent by the workers and call the appropriate handler function.
316 /// TProcessExecutor::HandlePoolCode is called on messages with a code < 1000 and
317 /// TMPClient::HandleMPCode is called on messages with a code >= 1000.
318 template<class T>
319 void TProcessExecutor::Collect(std::vector<T> &reslist)
320 {
321  TMonitor &mon = GetMonitor();
322  mon.ActivateAll();
323  while (mon.GetActive() > 0) {
324  TSocket *s = mon.Select();
325  MPCodeBufPair msg = MPRecv(s);
326  if (msg.first == MPCode::kRecvError) {
327  Error("TProcessExecutor::Collect", "[E][C] Lost connection to a worker");
328  Remove(s);
329  } else if (msg.first < 1000)
330  HandlePoolCode(msg, s, reslist);
331  else
332  HandleMPCode(msg, s);
333  }
334 }
335 
336 } // ROOT namespace
337 
338 #endif