12 #ifndef ROOT_TMPWorkerTree
13 #define ROOT_TMPWorkerTree
29 #include <type_traits>
// Common base of the tree workers declared in this header (TMPWorkerTreeFunc and
// TMPWorkerTreeSel both derive from it); itself derives publicly from TMPWorker.
32 class TMPWorkerTree :
public TMPWorker {
// Constructor taking a list of file names and the name of the tree, together
// with an entry-list pointer, the number of workers and the entry limits.
38 TMPWorkerTree(
const std::vector<std::string> &fileNames, TEntryList *entries,
const std::string &treeName,
39 UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
// Constructor taking the tree as a TTree pointer instead of file names.
40 TMPWorkerTree(TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry);
// Virtual destructor: the class is used as a polymorphic base.
41 virtual ~TMPWorkerTree();
// Copy construction is disabled.
44 TMPWorkerTree(
const TMPWorkerTree &) =
delete;
// Copy assignment is disabled.
45 TMPWorkerTree &operator=(
const TMPWorkerTree &) =
delete;
// NOTE(review): presumably derives the effective entry limit from the requested one;
// the definition is not in this header section - confirm.
50 ULong64_t EvalMaxEntries(ULong64_t maxEntries);
// Handles one incoming (code, buffer) message.
51 void HandleInput(MPCodeBufPair& msg);
// Initialises the worker from a file descriptor and its worker number.
52 void Init(
int fd, UInt_t workerN);
// Loads the tree and entry range for a request. TMPWorkerTreeFunc::Process treats
// any non-zero return value as a failure.
53 Int_t LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl,
// Opens the file with the given name and returns it as a TFile pointer.
55 TFile *OpenFile(
const std::string& fileName);
// Processing hook; the default does nothing. Derived classes declare their own Process.
56 virtual void Process(UInt_t, MPCodeBufPair &) {}
// Returns a TTree obtained from the given file.
57 TTree *RetrieveTree(TFile *fp);
// Result-sending hook; the default does nothing.
58 virtual void SendResult() { }
// NOTE(review): presumably configures fTreeCache for the given tree - confirm in the implementation.
60 void SetupTreeCache(TTree *tree);
// File names (cf. the fileNames constructor argument).
62 std::vector<std::string> fFileNames;
// Tree name (cf. the treeName constructor argument).
63 std::string fTreeName;
// Entry list pointer (cf. the entries constructor argument).
66 TEntryList *fEntryList;
// First entry (cf. the firstEntry constructor argument).
67 ULong64_t fFirstEntry;
// Tree cache pointer; NOTE(review): ownership is not visible here - confirm.
72 TTreeCache *fTreeCache;
// Flag associated with fTreeCache; NOTE(review): presumably true while the cache is in its learning phase.
73 Bool_t fTreeCacheIsLearning;
// Tree worker that applies a user-supplied callable of type F to a TTreeReader
// (see Process) and keeps the accumulated result in fReducedResult.
79 class TMPWorkerTreeFunc :
public TMPWorkerTree {
// File-name based constructor: forwards the tree arguments to TMPWorkerTree,
// stores the callable in fProcFunc, value-initialises fReducedResult and
// starts with fCanReduce set to false.
81 TMPWorkerTreeFunc(F procFunc,
const std::vector<std::string> &fileNames, TEntryList *entries,
82 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
83 : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc),
84 fReducedResult(), fCanReduce(false)
// TTree based constructor: same member initialisation, forwarding the tree
// pointer to the corresponding TMPWorkerTree constructor.
87 TMPWorkerTreeFunc(F procFunc, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
89 : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fProcFunc(procFunc), fReducedResult(),
// Empty virtual destructor.
93 virtual ~TMPWorkerTreeFunc() {}
// Processes one request; defined out of line further down in this header.
96 void Process(UInt_t code, MPCodeBufPair &msg);
// Accumulated result; its type is whatever F returns when invoked with a
// std::reference_wrapper<TTreeReader>.
100 typename std::result_of<F(std::reference_wrapper<TTreeReader>)>::type fReducedResult;
// Tree worker bound to a TSelector, which it holds by reference in fSelector.
104 class TMPWorkerTreeSel :
public TMPWorkerTree {
// File-name based constructor: forwards the tree arguments to TMPWorkerTree
// and binds fSelector to the given selector.
106 TMPWorkerTreeSel(TSelector &selector,
const std::vector<std::string> &fileNames, TEntryList *entries,
107 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
108 : TMPWorkerTree(fileNames, entries, treeName, nWorkers, maxEntries, firstEntry), fSelector(selector),
// TTree based constructor: binds fSelector and sets fCallBegin to true.
112 TMPWorkerTreeSel(TSelector &selector, TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
113 ULong64_t firstEntry)
114 : TMPWorkerTree(tree, entries, nWorkers, maxEntries, firstEntry), fSelector(selector), fCallBegin(true)
// Empty virtual destructor.
117 virtual ~TMPWorkerTreeSel() {}
// Processes one request; the definition is not part of this header section.
120 void Process(UInt_t code, MPCodeBufPair &msg);
// Selector used by this worker. Held by reference, so the referenced object
// must outlive the worker.
123 TSelector &fSelector;
// Defaults to true. NOTE(review): presumably marks that the selector's Begin
// step still has to be called - confirm in Process.
124 bool fCallBegin =
true;
// DetachRes overload for single objects. It is enabled only when T is a pointer
// type from which a TObject* can be constructed but a TCollection* cannot.
// The result is probed with dynamic_cast for each of TH1, TTree, TEntryList and
// TEventList; on a match SetDirectory(nullptr) is called on it.
135 template <class T, typename std::enable_if<std::is_pointer<T>::value && std::is_constructible<TObject *, T>::value &&
136 !std::is_constructible<TCollection *, T>::value>::type * =
nullptr>
137 void DetachRes(T res)
// Histogram case.
139 auto th1p =
dynamic_cast<TH1*
>(res);
140 if(th1p !=
nullptr) {
141 th1p->SetDirectory(
nullptr);
// Tree case.
144 auto ttreep =
dynamic_cast<TTree*
>(res);
145 if(ttreep !=
nullptr) {
146 ttreep->SetDirectory(
nullptr);
// Entry-list case.
149 auto tentrylist =
dynamic_cast<TEntryList*
>(res);
150 if(tentrylist !=
nullptr) {
151 tentrylist->SetDirectory(
nullptr);
// Event-list case.
154 auto teventlist =
dynamic_cast<TEventList*
>(res);
155 if(teventlist !=
nullptr) {
156 teventlist->SetDirectory(
nullptr);
// DetachRes overload for collections. It is enabled only when T is a pointer
// type from which a TCollection* can be constructed.
163 template <class T, typename std::enable_if<std::is_pointer<T>::value &&
164 std::is_constructible<TCollection *, T>::value>::type * =
nullptr>
165 void DetachRes(T res)
// Loop until nxo() yields a null value, handling one object per iteration.
// NOTE(review): nxo is presumably an iterator over the elements of res - confirm.
170 while ((obj = nxo())) {
// Sends the accumulated result (fReducedResult) over the worker's socket,
// tagged with MPCode::kProcResult.
180 void TMPWorkerTreeFunc<F>::SendResult()
183 MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
// Handles one processing request: loads the tree and entry range, runs the user
// callable on a TTreeReader restricted to that range, folds the outcome into
// fReducedResult and reports back over the worker's socket.
187 void TMPWorkerTreeFunc<F>::Process(UInt_t code, MPCodeBufPair &msg)
// sn is the "[S<worker number>]: " prefix used when composing reply messages.
193 std::string reply, errmsg, sn =
"[S" + std::to_string(GetNWorker()) +
"]: ";
// A non-zero return from LoadTree is an error, reported with MPCode::kProcError.
194 if (LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
196 MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
// Reader over the loaded tree, constructed with the entry list enl.
201 TTreeReader reader(fTree, enl);
// Restrict the reader to the range given by start and finish; any status other
// than kEntryValid is reported as an error. The message prints finish - 1 as
// the upper bound of the range.
203 TTreeReader::EEntryStatus status = reader.SetEntriesRange(start, finish);
204 if(status != TTreeReader::kEntryValid) {
205 reply = sn +
"could not set TTreeReader to range " + std::to_string(start) +
" " + std::to_string(finish - 1);
206 MPSend(GetSocket(), MPCode::kProcError, reply.c_str());
// Run the user-supplied callable on the reader.
211 auto res = fProcFunc(reader);
// Account for the entries covered by this range.
217 fProcessedEntries += finish - start;
// Reduce path: combine the new result with the accumulated one through
// PoolUtils::ReduceObjects and cast back to the stored result type.
220 PoolUtils::ReduceObjects<TObject *> redfunc;
221 fReducedResult =
static_cast<decltype(fReducedResult)
>(redfunc({res, fReducedResult}));
// Alternative path: the new result is stored as is.
224 fReducedResult = res;
// Once the processed-entry count equals fMaxNEntries, send the result.
227 if(fMaxNEntries == fProcessedEntries)
229 MPSend(GetSocket(), MPCode::kProcResult, fReducedResult);
// Send MPCode::kIdling.
232 MPSend(GetSocket(), MPCode::kIdling);