13 #ifndef ROOT_TTreeProcessorMP
14 #define ROOT_TTreeProcessorMP
32 #include <type_traits>
/// Multi-process TTree processing front-end.
/// Privately derives from TMPClient (presumably the source of Fork,
/// Broadcast, MPSend, MPRecv and GetMonitor used by the definitions below).
/// Forks worker processes, dispatches a user functor or a TSelector over
/// files, file collections, chains or trees, and merges the per-worker results.
38 class TTreeProcessorMP :
private TMPClient {
/// Construct the processor with nWorkers worker processes.
/// NOTE(review): the meaning of nWorkers == 0 is decided by code not shown
/// here (presumably "let TMPClient choose a default") -- confirm.
40 explicit TTreeProcessorMP(
unsigned nWorkers = 0);
41 ~TTreeProcessorMP() =
default;
// Non-copyable: copy construction and copy assignment are deleted.
43 TTreeProcessorMP(
const TTreeProcessorMP &) =
delete;
44 TTreeProcessorMP &operator=(
const TTreeProcessorMP &) =
delete;
/// Functor-based processing restricted by an entry list.
/// procFunc is called with a std::reference_wrapper<TTreeReader> and must
/// return a pointer to a TObject-derived object; the per-worker results are
/// merged and returned with procFunc's return type.
/// Input may be a list of file names, a single file name, a TFileCollection,
/// a TChain or a TTree (the TTree overload takes no treeName).
/// `entries` is passed to the workers only when entries.IsValid(); otherwise
/// the workers receive nullptr.
/// nToProcess and jFirst are forwarded to the workers; jFirst > 0 is reported
/// as not implemented yet.
62 template<
class F>
auto Process(
const std::vector<std::string>& fileNames, F procFunc, TEntryList &entries,
63 const std::string& treeName =
"", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
64 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
65 template<
class F>
auto Process(
const std::string& fileName, F procFunc, TEntryList &entries,
66 const std::string& treeName =
"", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
67 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
68 template<
class F>
auto Process(TFileCollection& collection, F procFunc, TEntryList &entries,
69 const std::string& treeName =
"", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
70 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
71 template<
class F>
auto Process(TChain& chain, F procFunc, TEntryList &entries,
72 const std::string& treeName =
"", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
73 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
74 template<
class F>
auto Process(TTree& tree, F procFunc, TEntryList &entries,
75 ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
76 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
/// Functor-based processing without an entry list: these overloads forward
/// to the TEntryList overloads above with a local placeholder entry list.
93 template<
class F>
auto Process(
const std::vector<std::string>& fileNames, F procFunc,
94 const std::string& treeName =
"", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
95 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
96 template<
class F>
auto Process(
const std::string& fileName, F procFunc,
97 const std::string& treeName =
"", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
98 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
99 template<
class F>
auto Process(TFileCollection& files, F procFunc,
100 const std::string& treeName =
"", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
101 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
102 template<
class F>
auto Process(TChain& files, F procFunc,
103 const std::string& treeName =
"", ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
104 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
105 template<
class F>
auto Process(TTree& tree, F procFunc, ULong64_t nToProcess = 0, ULong64_t jFirst = 0)
106 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
/// TSelector-based processing restricted by an entry list, over the same
/// input kinds as the functor overloads. Returns a TList*.
/// NOTE(review): the definitions are not in this header (presumably in the
/// implementation file); the ownership of the returned TList is not visible
/// here -- confirm and document.
125 TList* Process(
const std::vector<std::string>& fileNames, TSelector& selector, TEntryList &entries,
126 const std::string& treeName =
"", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
127 TList* Process(
const std::string &fileName, TSelector& selector, TEntryList &entries,
128 const std::string& treeName =
"", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
129 TList* Process(TFileCollection& files, TSelector& selector, TEntryList &entries,
130 const std::string& treeName =
"", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
131 TList* Process(TChain& files, TSelector& selector, TEntryList &entries,
132 const std::string& treeName =
"", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
133 TList* Process(TTree& tree, TSelector& selector, TEntryList &entries,
134 ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
/// TSelector-based processing without an entry list.
152 TList* Process(
const std::vector<std::string>& fileNames, TSelector& selector,
153 const std::string& treeName =
"", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
154 TList* Process(
const std::string &fileName, TSelector& selector,
155 const std::string& treeName =
"", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
156 TList* Process(TFileCollection& files, TSelector& selector,
157 const std::string& treeName =
"", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
158 TList* Process(TChain& files, TSelector& selector,
159 const std::string& treeName =
"", ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
160 TList* Process(TTree& tree, TSelector& selector, ULong64_t nToProcess = 0, ULong64_t jFirst = 0);
/// Number of worker processes; both accessors delegate to TMPClient.
162 void SetNWorkers(
unsigned n) { TMPClient::SetNWorkers(n); }
163 unsigned GetNWorkers()
const {
return TMPClient::GetNWorkers(); }
// Receive worker messages while the monitor has active sockets, passing
// pool-level messages to HandlePoolCode, which appends results to reslist.
166 template<
class T>
void Collect(std::vector<T> &reslist);
// Handle one pool-level message received from `sender`.
167 template<
class T>
void HandlePoolCode(MPCodeBufPair &msg, TSocket *sender, std::vector<T> &reslist);
// Defined outside this header (presumably in the implementation file).
169 void FixLists(std::vector<TObject*> &lists);
171 void ReplyToIdle(TSocket *s);
// Per-run bookkeeping written by Process:
// - fNProcessed holds Broadcast's return value, compared against the worker
//   count to detect tasks that could not be sent;
// - fNToProcess is set to nWorkers * nFiles (range mode), nFiles (file mode)
//   or nWorkers (single TTree).
// NOTE(review): neither member has an in-class initializer -- confirm the
// constructor initializes them.
173 unsigned fNProcessed;
174 unsigned fNToProcess;
// Kind of task currently running (kNoTask, kProcByRange, kProcByFile are
// used below); Process sets it before broadcasting and resets it to kNoTask.
179 enum class ETask : unsigned char {
185 ETask fTaskType = ETask::kNoTask;
/// Process the files in fileNames with procFunc in parallel worker processes.
/// The single-file, TFileCollection and TChain functor overloads forward here.
/// \param fileNames  names of the files to process
/// \param procFunc   functor called with a std::reference_wrapper<TTreeReader>;
///                   its result must be convertible to TObject*
/// \param entries    handed to the workers only if entries.IsValid()
/// \param treeName   forwarded to the workers
/// \param nToProcess forwarded to the workers
/// \param jFirst     values > 0 are reported as not implemented yet
/// \return the workers' objects merged with PoolUtils::ReduceObjects and cast
///         to procFunc's return type
189 auto TTreeProcessorMP::Process(
const std::vector<std::string>& fileNames, F procFunc, TEntryList &entries,
190 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
191 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
// The functor's result must be storable as a TObject* so the per-worker
// results can be merged generically and cast back at the end.
193 using retType =
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
194 static_assert(std::is_constructible<TObject*, retType>::value,
195 "procFunc must return a pointer to a class inheriting from TObject,"
196 " and must take a reference to TTreeReader as the only argument");
// NOTE(review): the warning says jFirst is ignored, yet jFirst is still
// forwarded to the worker below -- confirm it is reset to 0 after warning.
200 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
206 unsigned nWorkers = GetNWorkers();
// Workers get the entry list only when it is valid; nullptr otherwise.
209 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
// Configure the worker and fork the worker processes; a fork failure is
// reported as an error.
211 TMPWorkerTreeFunc<F> worker(procFunc, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
212 bool ok = Fork(worker);
214 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
// Fewer files than workers: process by entry range (kProcRange) with
// fNToProcess = nWorkers * fileNames.size(). Otherwise process by file
// (kProcFile) with fNToProcess = fileNames.size(). In both modes each
// worker is sent its index in [0, nWorkers); a Broadcast result below
// nWorkers means some tasks could not be sent.
219 if(fileNames.size() < nWorkers) {
221 fTaskType = ETask::kProcByRange;
223 fNToProcess = nWorkers*fileNames.size();
224 std::vector<unsigned> args(nWorkers);
225 std::iota(args.begin(), args.end(), 0);
226 fNProcessed = Broadcast(MPCode::kProcRange, args);
227 if(fNProcessed < nWorkers)
228 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
231 fTaskType = ETask::kProcByFile;
232 fNToProcess = fileNames.size();
233 std::vector<unsigned> args(nWorkers);
234 std::iota(args.begin(), args.end(), 0);
235 fNProcessed = Broadcast(MPCode::kProcFile, args);
236 if(fNProcessed < nWorkers)
237 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
// Partial results are gathered into reslist (presumably via
// Collect(reslist)) and merged into a single object.
241 std::vector<TObject*> reslist;
245 PoolUtils::ReduceObjects<TObject *> redfunc;
246 auto res = redfunc(reslist);
// Reset the task state, then hand back the merged object as retType.
250 fTaskType = ETask::kNoTask;
251 return static_cast<retType
>(res);
/// Process a single file: wrap fileName in a one-element list and forward
/// to the file-list overload with the same functor, entry list and options.
256 auto TTreeProcessorMP::Process(
const std::string& fileName, F procFunc, TEntryList &entries,
257 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
258 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
260 std::vector<std::string> singleFileName(1, fileName);
261 return Process(singleFileName, procFunc, entries, treeName, nToProcess, jFirst);
/// Process every file of a TFileCollection: collect the current URL of each
/// TFileInfo in the collection, in list order, and forward the resulting
/// name list to the file-list overload.
266 auto TTreeProcessorMP::Process(TFileCollection& files, F procFunc, TEntryList &entries,
267 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
268 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
// NOTE(review): fileNames is sized from GetNFiles() but filled by iterating
// GetList(); this assumes both agree -- push_back would not depend on it.
270 std::vector<std::string> fileNames(files.GetNFiles());
// GetList() is assumed to be a THashList of TFileInfo objects (both casts
// are unchecked static_casts).
272 for(
auto f : *static_cast<THashList*>(files.GetList()))
273 fileNames[count++] =
static_cast<TFileInfo*
>(f)->GetCurrentUrl()->GetUrl();
275 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
/// Process the files of a TChain: collect the title of each element of
/// GetListOfFiles() (presumably the file name/URL) and forward the list to
/// the file-list overload.
/// NOTE(review): the chain's own tree name is not used; treeName is forwarded
/// as given (its declared default is ""), so callers must pass it explicitly.
280 auto TTreeProcessorMP::Process(TChain& files, F procFunc, TEntryList &entries,
281 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
282 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
284 TObjArray* filelist = files.GetListOfFiles();
285 std::vector<std::string> fileNames(filelist->GetEntries());
287 for(
auto f : *filelist)
288 fileNames[count++] = f->GetTitle();
290 return Process(fileNames, procFunc, entries, treeName, nToProcess, jFirst);
/// Process a single TTree with procFunc in parallel worker processes.
/// The worker is configured with a pointer to `tree`. Every worker is sent its
/// index in [0, nWorkers) with kProcTree, and the work is tracked as
/// range-based. Results are merged with PoolUtils::ReduceObjects and returned
/// with procFunc's return type.
/// \param entries    handed to the workers only if entries.IsValid()
/// \param nToProcess forwarded to the workers
/// \param jFirst     values > 0 are reported as not implemented yet
295 auto TTreeProcessorMP::Process(TTree& tree, F procFunc, TEntryList &entries,
296 ULong64_t nToProcess, ULong64_t jFirst)
297 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
// The functor's result must be storable as a TObject* for merging.
299 using retType =
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type;
300 static_assert(std::is_constructible<TObject*, retType>::value,
"procFunc must return a pointer to a class inheriting from TObject, and must take a reference to TTreeReader as the only argument");
// NOTE(review): the warning says jFirst is ignored, yet jFirst is still
// forwarded to the worker below -- confirm it is reset to 0 after warning.
304 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
310 unsigned nWorkers = GetNWorkers();
// Workers get the entry list only when it is valid; nullptr otherwise.
313 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
// Configure the worker and fork the worker processes; a fork failure is
// reported as an error.
315 TMPWorkerTreeFunc<F> worker(procFunc, &tree, elist, nWorkers, nToProcess, jFirst);
316 bool ok = Fork(worker);
318 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation.");
// One kProcTree task per worker, so fNToProcess equals nWorkers; a Broadcast
// result below nWorkers means some tasks could not be sent.
323 fTaskType = ETask::kProcByRange;
326 fNToProcess = nWorkers;
327 std::vector<unsigned> args(nWorkers);
328 std::iota(args.begin(), args.end(), 0);
329 fNProcessed = Broadcast(MPCode::kProcTree, args);
330 if(fNProcessed < nWorkers)
331 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers. Some entries might not be processed.");
// Partial results are gathered into reslist (presumably via
// Collect(reslist)) and merged into a single object.
334 std::vector<TObject*> reslist;
338 PoolUtils::ReduceObjects<TObject *> redfunc;
339 auto res = redfunc(reslist);
// Reset the task state, then hand back the merged object as retType.
343 fTaskType = ETask::kNoTask;
344 return static_cast<retType
>(res);
/// Process a list of files without an entry list: forward to the TEntryList
/// overload with `noelist` (presumably a default-constructed TEntryList whose
/// IsValid() is false, so no list reaches the workers -- confirm).
353 auto TTreeProcessorMP::Process(
const std::vector<std::string>& fileNames, F procFunc,
354 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
355 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
358 return Process(fileNames, procFunc, noelist, treeName, nToProcess, jFirst);
/// Process a single file without an entry list: forward to the TEntryList
/// overload with `noelist` (presumably a default-constructed TEntryList whose
/// IsValid() is false, so no list reaches the workers -- confirm).
363 auto TTreeProcessorMP::Process(
const std::string& fileName, F procFunc,
364 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
365 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
368 return Process(fileName, procFunc, noelist, treeName, nToProcess, jFirst);
/// Process a TFileCollection without an entry list: forward to the TEntryList
/// overload with `noelist` (presumably a default-constructed TEntryList whose
/// IsValid() is false, so no list reaches the workers -- confirm).
373 auto TTreeProcessorMP::Process(TFileCollection& files, F procFunc,
374 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
375 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
378 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
/// Process a TChain without an entry list: forward to the TEntryList overload
/// with `noelist` (presumably a default-constructed TEntryList whose
/// IsValid() is false, so no list reaches the workers -- confirm).
383 auto TTreeProcessorMP::Process(TChain& files, F procFunc,
384 const std::string& treeName, ULong64_t nToProcess, ULong64_t jFirst)
385 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
388 return Process(files, procFunc, noelist, treeName, nToProcess, jFirst);
/// Process a TTree without an entry list: forward to the TEntryList overload
/// with `noelist` (presumably a default-constructed TEntryList whose
/// IsValid() is false, so no list reaches the workers -- confirm).
393 auto TTreeProcessorMP::Process(TTree& tree, F procFunc,
394 ULong64_t nToProcess, ULong64_t jFirst)
395 ->
typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type
398 return Process(tree, procFunc, noelist, nToProcess, jFirst);
/// Handle one pool-level message (Collect routes codes below 1000 here)
/// received from worker socket `s`:
/// - kIdling: handled by its own branch (presumably replying to the idle worker);
/// - kProcResult: if a payload is present, deserialize it as T and append it
///   to reslist, then send kShutdownOrder to that worker;
/// - kProcError: log the worker's error string and continue;
/// - any other code is logged as unknown.
404 void TTreeProcessorMP::HandlePoolCode(MPCodeBufPair &msg, TSocket *s, std::vector<T> &reslist)
406 unsigned code = msg.first;
407 if (code == MPCode::kIdling) {
409 }
else if(code == MPCode::kProcResult) {
// An empty payload adds nothing to reslist, but the worker is still told
// to shut down.
// NOTE(review): std::move around the ReadBuffer<T> temporary is redundant.
410 if(msg.second !=
nullptr)
411 reslist.push_back(std::move(ReadBuffer<T>(msg.second.get())));
412 MPSend(s, MPCode::kShutdownOrder);
413 }
else if(code == MPCode::kProcError) {
// NOTE(review): `str` is not released in the lines shown here -- confirm
// the buffer returned by ReadBuffer<const char*> is freed to avoid a leak.
414 const char *str = ReadBuffer<const char*>(msg.second.get());
415 Error(
"TTreeProcessorMP::HandlePoolCode",
"[E][C] a worker encountered an error: %s\n"
416 "Continuing execution ignoring these entries.", str);
// NOTE(review): `code` is unsigned but formatted with %d (%u would match),
// and this [W]-tagged message is reported via Error() rather than Warning().
421 Error(
"TTreeProcessorMP::HandlePoolCode",
"[W][C] unknown code received from server. code=%d", code);
430 void TTreeProcessorMP::Collect(std::vector<T> &reslist)
432 TMonitor &mon = GetMonitor();
434 while (mon.GetActive() > 0) {
435 TSocket *s = mon.Select();
436 MPCodeBufPair msg = MPRecv(s);
437 if (msg.first == MPCode::kRecvError) {
438 Error(
"TTreeProcessorMP::Collect",
"[E][C] Lost connection to a worker");
440 }
else if (msg.first < 1000)
441 HandlePoolCode(msg, s, reslist);
443 HandleMPCode(msg, s);