Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TProcessExecutor.cxx
Go to the documentation of this file.
1 /* @(#)root/multiproc:$Id$ */
2 // Author: Enrico Guiraud July 2015
3 // Modified: G Ganis Jan 2017
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 #include "TEnv.h"
15 
16 //////////////////////////////////////////////////////////////////////////
17 ///
18 /// \class ROOT::TProcessExecutor
19 /// \ingroup Parallelism
20 /// \brief This class provides a simple interface to execute the same task
21 /// multiple times in parallel, possibly with different arguments every
22 /// time. This mimics the behaviour of Python's multiprocessing.Pool.map method.
23 ///
24 /// ### ROOT::TProcessExecutor::Map
25 /// This class inherits its interfaces from ROOT::TExecutor.\n
26 /// The two possible usages of the Map method are:\n
27 /// * Map(F func, unsigned nTimes): func is executed nTimes with no arguments
28 /// * Map(F func, T& args): func is executed on each element of the collection of arguments args
29 ///
30 /// For either signature, func is executed as many times as needed by a pool of
31 /// fNWorkers workers; the number of workers can be passed to the constructor
32 /// or set via SetNWorkers. It defaults to the number of cores.\n
33 /// A collection containing the result of each execution is returned.\n
34 /// **Note:** the user is responsible for the deletion of any object that might
35 /// be created upon execution of func, returned objects included: ROOT::TProcessExecutor never
36 /// deletes what it returns, it simply forgets it.\n
37 /// **Note:** using ROOT::TProcessExecutor::Map is advisable only when the task to be
38 /// executed takes more than a few seconds; otherwise the overhead introduced
39 /// by Map will outweigh the benefits of parallel execution on most machines.
40 ///
41 /// \param func
42 /// \parblock
43 /// a lambda expression, an std::function, a loaded macro, a
44 /// functor class or a function that takes zero arguments (for the first signature)
45 /// or one (for the second signature).
46 /// \endparblock
47 /// \param args
48 /// \parblock
49 /// a standard vector, a ROOT::TSeq of integer type or an initializer list for the second signature.
50 /// An integer only for the first.
51 /// \endparblock
52 /// **Note:** in cases where the function to be executed takes more than
53 /// zero/one argument but all are fixed except zero/one, the function can be wrapped
54 /// in a lambda or via std::bind to give it the right signature.\n
55 /// **Note:** the user should take care of initializing random seeds differently in each
56 /// process (e.g. using the process id in the seed). Otherwise several parallel executions
57 /// might generate the same sequence of pseudo-random numbers.
58 ///
59 /// #### Return value:
60 /// An std::vector. The elements in the container
61 /// will be the objects returned by func.
62 ///
63 ///
64 /// #### Examples:
65 ///
66 /// ~~~{.cpp}
67 /// root[] ROOT::TProcessExecutor pool; auto hists = pool.Map(CreateHisto, 10);
68 /// root[] ROOT::TProcessExecutor pool(2); auto squares = pool.Map([](int a) { return a*a; }, {1,2,3});
69 /// ~~~
70 ///
71 /// ### ROOT::TProcessExecutor::MapReduce
72 /// This set of methods behaves exactly like Map, but takes an additional
73 /// function as a third argument. This function is applied to the set of
74 /// objects returned by the corresponding Map execution to "squash" them
75 /// to a single object.
76 ///
77 /// #### Examples:
78 /// ~~~{.cpp}
79 /// root[] ROOT::TProcessExecutor pool; auto ten = pool.MapReduce([]() { return 1; }, 10, [](std::vector<int> v) { return std::accumulate(v.begin(), v.end(), 0); })
80 /// root[] ROOT::TProcessExecutor pool; auto hist = pool.MapReduce(CreateAndFillHists, 10, PoolUtils::ReduceObjects);
81 /// ~~~
82 ///
83 //////////////////////////////////////////////////////////////////////////
84 
85 namespace ROOT {
86 //////////////////////////////////////////////////////////////////////////
87 /// Class constructor.
88 /// nWorkers is the number of times this ROOT session will be forked, i.e.
89 /// the number of workers that will be spawned.
90 TProcessExecutor::TProcessExecutor(unsigned nWorkers) : TMPClient(nWorkers)
91 {
92  Reset();
93 }
94 
95 //////////////////////////////////////////////////////////////////////////
96 /// Reset TProcessExecutor's state.
97 void TProcessExecutor::Reset()
98 {
99  fNProcessed = 0;
100  fNToProcess = 0;
101  fTaskType = ETask::kNoTask;
102 }
103 
104 //////////////////////////////////////////////////////////////////////////
105 /// Reply to a worker who just sent a result.
106 /// If another argument to process exists, tell the worker. Otherwise
107 /// send a shutdown order.
108 void TProcessExecutor::ReplyToFuncResult(TSocket *s)
109 {
110  if (fNProcessed < fNToProcess) {
111  //this cannot be a "greedy worker" task
112  if (fTaskType == ETask::kMap)
113  MPSend(s, MPCode::kExecFunc);
114  else if (fTaskType == ETask::kMapWithArg)
115  MPSend(s, MPCode::kExecFuncWithArg, fNProcessed);
116  ++fNProcessed;
117  } else //whatever the task is, we are done
118  MPSend(s, MPCode::kShutdownOrder);
119 }
120 
121 
122 //////////////////////////////////////////////////////////////////////////
123 /// Reply to a worker who is idle.
124 /// If another argument to process exists, tell the worker. Otherwise
125 /// ask for a result
126 void TProcessExecutor::ReplyToIdle(TSocket *s)
127 {
128  if (fNProcessed < fNToProcess) {
129  //we are executing a "greedy worker" task
130  if (fTaskType == ETask::kMapRedWithArg)
131  MPSend(s, MPCode::kExecFuncWithArg, fNProcessed);
132  else if (fTaskType == ETask::kMapRed)
133  MPSend(s, MPCode::kExecFunc);
134  ++fNProcessed;
135  } else
136  MPSend(s, MPCode::kSendResult);
137 }
138 
139 } // namespace ROOT