13 #ifndef ROOT_TProcessExecutor
14 #define ROOT_TProcessExecutor
29 #include <type_traits>
35 class TProcessExecutor :
public TExecutor<TProcessExecutor>,
private TMPClient {
37 explicit TProcessExecutor(
unsigned nWorkers = 0);
38 ~TProcessExecutor() =
default;
40 TProcessExecutor(
const TProcessExecutor &) =
delete;
41 TProcessExecutor &operator=(
const TProcessExecutor &) =
delete;
44 using TExecutor<TProcessExecutor>::Map;
45 template<
class F,
class Cond = noReferenceCond<F>>
46 auto Map(F func,
unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>;
47 template<
class F,
class INTEGER,
class Cond = noReferenceCond<F, INTEGER>>
48 auto Map(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type>;
49 template<
class F,
class T,
class Cond = noReferenceCond<F, T>>
50 auto Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>;
52 void SetNWorkers(
unsigned n) { TMPClient::SetNWorkers(n); }
53 unsigned GetNWorkers()
const {
return TMPClient::GetNWorkers(); }
55 using TExecutor<TProcessExecutor>::MapReduce;
56 template<
class F,
class R,
class Cond = noReferenceCond<F>>
57 auto MapReduce(F func,
unsigned nTimes, R redfunc) ->
typename std::result_of<F()>::type;
58 template<
class F,
class T,
class R,
class Cond = noReferenceCond<F, T>>
59 auto MapReduce(F func, std::vector<T> &args, R redfunc) ->
typename std::result_of<F(T)>::type;
61 using TExecutor<TProcessExecutor>::Reduce;
62 template<
class T,
class R> T Reduce(
const std::vector<T> &objs, R redfunc);
65 template<
class T>
void Collect(std::vector<T> &reslist);
66 template<
class T>
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
69 void ReplyToFuncResult(TSocket *s);
70 void ReplyToIdle(TSocket *s);
78 enum class ETask : unsigned char {
86 ETask fTaskType = ETask::kNoTask;
97 template<
class F,
class Cond>
98 auto TProcessExecutor::Map(F func,
unsigned nTimes) -> std::vector<typename std::result_of<F()>::type>
100 using retType = decltype(func());
103 fTaskType = ETask::kMap;
106 unsigned oldNWorkers = GetNWorkers();
107 if (nTimes < oldNWorkers)
109 TMPWorkerExecutor<F> worker(func);
110 bool ok = Fork(worker);
111 SetNWorkers(oldNWorkers);
114 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
115 return std::vector<retType>();
119 fNToProcess = nTimes;
120 std::vector<retType> reslist;
121 reslist.reserve(fNToProcess);
122 fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
129 fTaskType = ETask::kNoTask;
139 template<
class F,
class T,
class Cond>
140 auto TProcessExecutor::Map(F func, std::vector<T> &args) -> std::vector<typename std::result_of<F(T)>::type>
143 using retType = decltype(func(args.front()));
146 fTaskType = ETask::kMapWithArg;
150 unsigned oldNWorkers = GetNWorkers();
151 if (args.size() < oldNWorkers)
152 SetNWorkers(args.size());
153 TMPWorkerExecutor<F, T> worker(func, args);
154 bool ok = Fork(worker);
155 SetNWorkers(oldNWorkers);
158 Error(
"TProcessExecutor::Map",
"[E][C] Could not fork. Aborting operation.");
159 return std::vector<retType>();
163 fNToProcess = args.size();
164 std::vector<retType> reslist;
165 reslist.reserve(fNToProcess);
166 std::vector<unsigned> range(fNToProcess);
167 std::iota(range.begin(), range.end(), 0);
168 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
175 fTaskType = ETask::kNoTask;
183 template<
class F,
class INTEGER,
class Cond>
184 auto TProcessExecutor::Map(F func, ROOT::TSeq<INTEGER> args) -> std::vector<typename std::result_of<F(INTEGER)>::type>
186 std::vector<INTEGER> vargs(args.size());
187 std::copy(args.begin(), args.end(), vargs.begin());
188 const auto &reslist = Map(func, vargs);
198 template<
class F,
class R,
class Cond>
199 auto TProcessExecutor::MapReduce(F func,
unsigned nTimes, R redfunc) ->
typename std::result_of<F()>::type
201 using retType = decltype(func());
204 fTaskType= ETask::kMapRed;
207 unsigned oldNWorkers = GetNWorkers();
208 if (nTimes < oldNWorkers)
210 TMPWorkerExecutor<F, void, R> worker(func, redfunc);
211 bool ok = Fork(worker);
212 SetNWorkers(oldNWorkers);
214 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
219 fNToProcess = nTimes;
220 std::vector<retType> reslist;
221 reslist.reserve(fNToProcess);
222 fNProcessed = Broadcast(MPCode::kExecFunc, fNToProcess);
229 fTaskType= ETask::kNoTask;
230 return redfunc(reslist);
239 template<
class F,
class T,
class R,
class Cond>
240 auto TProcessExecutor::MapReduce(F func, std::vector<T> &args, R redfunc) ->
typename std::result_of<F(T)>::type
243 using retType = decltype(func(args.front()));
246 fTaskType= ETask::kMapRedWithArg;
249 unsigned oldNWorkers = GetNWorkers();
250 if (args.size() < oldNWorkers)
251 SetNWorkers(args.size());
252 TMPWorkerExecutor<F, T, R> worker(func, args, redfunc);
253 bool ok = Fork(worker);
254 SetNWorkers(oldNWorkers);
256 std::cerr <<
"[E][C] Could not fork. Aborting operation\n";
257 return decltype(func(args.front()))();
261 fNToProcess = args.size();
262 std::vector<retType> reslist;
263 reslist.reserve(fNToProcess);
264 std::vector<unsigned> range(fNToProcess);
265 std::iota(range.begin(), range.end(), 0);
266 fNProcessed = Broadcast(MPCode::kExecFuncWithArg, range);
272 fTaskType= ETask::kNoTask;
273 return Reduce(reslist, redfunc);
279 template<
class T,
class R>
280 T TProcessExecutor::Reduce(
const std::vector<T> &objs, R redfunc)
283 static_assert(std::is_same<decltype(redfunc(objs)), T>::value,
"redfunc does not have the correct signature");
284 return redfunc(objs);
290 void TProcessExecutor::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
292 unsigned code = msg.first;
293 if (code == MPCode::kFuncResult) {
294 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
295 ReplyToFuncResult(s);
296 }
else if (code == MPCode::kIdling) {
298 }
else if(code == MPCode::kProcResult) {
299 if(msg.second !=
nullptr)
300 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
301 MPSend(s, MPCode::kShutdownOrder);
302 }
else if(code == MPCode::kProcError) {
303 const char *str = ReadBuffer<const char*>(msg.second.get());
304 Error(
"TProcessExecutor::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n"
305 "Continuing execution ignoring these entries.", str);
310 Error(
"TProcessExecutor::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
319 void TProcessExecutor::Collect(std::vector<T> &reslist)
321 TMonitor &mon = GetMonitor();
323 while (mon.GetActive() > 0) {
324 TSocket *s = mon.Select();
325 MPCodeBufPair msg = MPRecv(s);
326 if (msg.first == MPCode::kRecvError) {
327 Error(
"TProcessExecutor::Collect",
"[E][C] Lost connection to a worker");
329 }
else if (msg.first < 1000)
330 HandlePoolCode(msg, s, reslist);
332 HandleMPCode(msg, s);