89 TTreeProcessorMP::TTreeProcessorMP(UInt_t nWorkers) : TMPClient(nWorkers)
96 TList *TTreeProcessorMP::Process(TTree &tree, TSelector &selector, TEntryList &entries, ULong64_t nToProcess,
102 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
108 UInt_t nWorkers = GetNWorkers();
109 selector.Begin(
nullptr);
112 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
114 TMPWorkerTreeSel worker(selector, &tree, elist, nWorkers, nToProcess / nWorkers, jFirst);
115 bool ok = Fork(worker);
117 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation");
122 fTaskType = ETask::kProcByRange;
125 fNToProcess = nWorkers;
126 std::vector<UInt_t> args(nWorkers);
127 std::iota(args.begin(), args.end(), 0);
128 fNProcessed = Broadcast(MPCode::kProcTree, args);
129 if (fNProcessed < nWorkers)
130 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers."
131 " Some entries might not be processed.");
134 std::vector<TObject*> outLists;
140 PoolUtils::ReduceObjects<TObject *> redfunc;
141 auto outList =
static_cast<TList*
>(redfunc(outLists));
144 selector.ImportOutput(outList);
149 selector.Terminate();
153 fTaskType = ETask::kNoTask;
154 return selector.GetOutputList();
159 TList *TTreeProcessorMP::Process(
const std::vector<std::string> &fileNames, TSelector &selector, TEntryList &entries,
160 const std::string &treeName, ULong64_t nToProcess, ULong64_t jFirst)
165 Warning(
"Process",
"support for generic 'first entry' (jFirst > 0) not implemented yet - ignoring");
171 UInt_t nWorkers = GetNWorkers();
172 selector.Begin(
nullptr);
175 TEntryList *elist = (entries.IsValid()) ? &entries :
nullptr;
177 TMPWorkerTreeSel worker(selector, fileNames, elist, treeName, nWorkers, nToProcess, jFirst);
178 bool ok = Fork(worker);
180 Error(
"TTreeProcessorMP::Process",
"[E][C] Could not fork. Aborting operation");
184 Int_t procByFile = gEnv->GetValue(
"MultiProc.TestProcByFile", 0);
187 if (fileNames.size() < nWorkers) {
189 fTaskType = ETask::kProcByRange;
191 fNToProcess = nWorkers*fileNames.size();
192 std::vector<UInt_t> args(nWorkers);
193 std::iota(args.begin(), args.end(), 0);
194 fNProcessed = Broadcast(MPCode::kProcRange, args);
195 if (fNProcessed < nWorkers)
196 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers."
197 " Some entries might not be processed");
200 fTaskType = ETask::kProcByFile;
201 fNToProcess = fileNames.size();
202 std::vector<UInt_t> args(nWorkers);
203 std::iota(args.begin(), args.end(), 0);
204 fNProcessed = Broadcast(MPCode::kProcFile, args);
205 if (fNProcessed < nWorkers)
206 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers."
207 " Some entries might not be processed.");
211 fTaskType = ETask::kProcByRange;
213 fNToProcess = nWorkers*fileNames.size();
214 std::vector<UInt_t> args(nWorkers);
215 std::iota(args.begin(), args.end(), 0);
216 fNProcessed = Broadcast(MPCode::kProcRange, args);
217 if (fNProcessed < nWorkers)
218 Error(
"TTreeProcessorMP::Process",
"[E][C] There was an error while sending tasks to workers."
219 " Some entries might not be processed.");
223 std::vector<TObject*> outLists;
229 PoolUtils::ReduceObjects<TObject *> redfunc;
230 auto outList =
static_cast<TList*
>(redfunc(outLists));
233 selector.ImportOutput(outList);
238 selector.Terminate();
242 fTaskType = ETask::kNoTask;
244 return selector.GetOutputList();
249 TList *TTreeProcessorMP::Process(TFileCollection &files, TSelector &selector, TEntryList &entries,
250 const std::string &treeName, ULong64_t nToProcess, ULong64_t firstEntry)
252 std::vector<std::string> fileNames(files.GetNFiles());
254 for(
auto f : *static_cast<THashList*>(files.GetList()))
255 fileNames[count++] =
static_cast<TFileInfo*
>(f)->GetCurrentUrl()->GetUrl();
257 TList *rl = Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
263 TList *TTreeProcessorMP::Process(TChain &files, TSelector &selector, TEntryList &entries,
const std::string &treeName,
264 ULong64_t nToProcess, ULong64_t firstEntry)
266 TObjArray* filelist = files.GetListOfFiles();
267 std::vector<std::string> fileNames(filelist->GetEntries());
269 for(
auto f : *filelist)
270 fileNames[count++] = f->GetTitle();
272 return Process(fileNames, selector, entries, treeName, nToProcess, firstEntry);
277 TList *TTreeProcessorMP::Process(
const std::string &fileName, TSelector &selector, TEntryList &entries,
278 const std::string &treeName, ULong64_t nToProcess, ULong64_t firstEntry)
280 std::vector<std::string> singleFileName(1, fileName);
281 return Process(singleFileName, selector, entries, treeName, nToProcess, firstEntry);
288 TList *TTreeProcessorMP::Process(
const std::vector<std::string> &fileNames, TSelector &selector,
289 const std::string &treeName, ULong64_t nToProcess, ULong64_t jFirst)
292 return Process(fileNames, selector, noelist, treeName, nToProcess, jFirst);
295 TList *TTreeProcessorMP::Process(
const std::string &fileName, TSelector &selector,
const std::string &treeName,
296 ULong64_t nToProcess, ULong64_t jFirst)
299 return Process(fileName, selector, noelist, treeName, nToProcess, jFirst);
302 TList *TTreeProcessorMP::Process(TFileCollection &files, TSelector &selector,
const std::string &treeName,
303 ULong64_t nToProcess, ULong64_t jFirst)
306 return Process(files, selector, noelist, treeName, nToProcess, jFirst);
309 TList *TTreeProcessorMP::Process(TChain &files, TSelector &selector,
const std::string &treeName, ULong64_t nToProcess,
313 return Process(files, selector, noelist, treeName, nToProcess, jFirst);
316 TList *TTreeProcessorMP::Process(TTree &tree, TSelector &selector, ULong64_t nToProcess, ULong64_t jFirst)
319 return Process(tree, selector, noelist, nToProcess, jFirst);
323 void TTreeProcessorMP::FixLists(std::vector<TObject*> &lists) {
326 TList *firstlist =
new TList;
327 TList *oldlist = (TList *) lists[0];
330 while ((o = nxo())) { firstlist->Add(o); }
331 oldlist->SetOwner(kFALSE);
332 lists.erase(lists.begin());
333 lists.insert(lists.begin(), firstlist);
339 void TTreeProcessorMP::Reset()
343 fTaskType = ETask::kNoTask;
350 void TTreeProcessorMP::ReplyToIdle(TSocket *s)
352 if (fNProcessed < fNToProcess) {
354 if (fTaskType == ETask::kProcByRange)
355 MPSend(s, MPCode::kProcRange, fNProcessed);
356 else if (fTaskType == ETask::kProcByFile)
357 MPSend(s, MPCode::kProcFile, fNProcessed);
360 MPSend(s, MPCode::kSendResult);