12 #ifndef ROOT_TMPWorkerExecutor
13 #define ROOT_TMPWorkerExecutor
76 template<
class F,
class T =
void,
class R =
void>
77 class TMPWorkerExecutor :
public TMPWorker {
82 TMPWorkerExecutor(F func,
const std::vector<T> &args, R redfunc) :
83 TMPWorker(), fFunc(func), fArgs(args), fRedFunc(redfunc),
84 fReducedResult(), fCanReduce(false)
86 ~TMPWorkerExecutor() {}
88 void HandleInput(MPCodeBufPair &msg)
90 unsigned code = msg.first;
91 TSocket *s = GetSocket();
92 std::string reply =
"S" + std::to_string(GetNWorker());
93 if (code == MPCode::kExecFuncWithArg) {
95 msg.second->ReadUInt(n);
97 const auto &res = fFunc(fArgs[n]);
99 MPSend(s, MPCode::kIdling);
102 using FINAL = decltype(fReducedResult);
103 using ORIGINAL = decltype(fRedFunc({res, fReducedResult}));
104 fReducedResult = ROOT::Internal::PoolUtils::ResultCaster<ORIGINAL, FINAL>::CastIfNeeded(fRedFunc({res, fReducedResult}));
107 fReducedResult = res;
109 }
else if (code == MPCode::kSendResult) {
110 MPSend(s, MPCode::kFuncResult, fReducedResult);
112 reply +=
": unknown code received: " + std::to_string(code);
113 MPSend(s, MPCode::kError, reply.c_str());
119 std::vector<T> fArgs;
121 decltype(fFunc(fArgs.front())) fReducedResult;
126 template<class F, class R>
127 class TMPWorkerExecutor<F,
void, R> : public TMPWorker {
129 TMPWorkerExecutor(F func, R redfunc) :
130 TMPWorker(), fFunc(func), fRedFunc(redfunc),
131 fReducedResult(), fCanReduce(false)
133 ~TMPWorkerExecutor() {}
135 void HandleInput(MPCodeBufPair &msg)
137 unsigned code = msg.first;
138 TSocket *s = GetSocket();
139 std::string reply =
"S" + std::to_string(GetNWorker());
140 if (code == MPCode::kExecFunc) {
142 const auto &res = fFunc();
144 MPSend(s, MPCode::kIdling);
147 fReducedResult = fRedFunc({res, fReducedResult});
150 fReducedResult = res;
152 }
else if (code == MPCode::kSendResult) {
153 MPSend(s, MPCode::kFuncResult, fReducedResult);
155 reply +=
": unknown code received: " + std::to_string(code);
156 MPSend(s, MPCode::kError, reply.c_str());
163 decltype(fFunc()) fReducedResult;
167 template<class F, class T>
168 class TMPWorkerExecutor<F, T,
void> : public TMPWorker {
170 TMPWorkerExecutor(F func,
const std::vector<T> &args) : TMPWorker(), fFunc(func), fArgs(std::move(args)) {}
171 ~TMPWorkerExecutor() {}
172 void HandleInput(MPCodeBufPair &msg)
174 unsigned code = msg.first;
175 TSocket *s = GetSocket();
176 std::string reply =
"S" + std::to_string(GetNWorker());
177 if (code == MPCode::kExecFuncWithArg) {
179 msg.second->ReadUInt(n);
180 MPSend(s, MPCode::kFuncResult, fFunc(fArgs[n]));
182 reply +=
": unknown code received: " + std::to_string(code);
183 MPSend(s, MPCode::kError, reply.c_str());
189 std::vector<T> fArgs;
200 class TMPWorkerExecutor<F, void, void> :
public TMPWorker {
202 explicit TMPWorkerExecutor(F func) : TMPWorker(), fFunc(func) {}
203 ~TMPWorkerExecutor() {}
204 void HandleInput(MPCodeBufPair &msg)
206 unsigned code = msg.first;
207 TSocket *s = GetSocket();
208 std::string myId =
"S" + std::to_string(GetPid());
209 if (code == MPCode::kExecFunc) {
210 MPSend(s, MPCode::kFuncResult, fFunc());
212 MPSend(s, MPCode::kError, (myId +
": unknown code received: " + std::to_string(code)).c_str());