1 #include "RConfigure.h"
29 using namespace ROOT::Detail::RDF;
30 using namespace ROOT::Internal::RDF;
32 bool ContainsLeaf(
const std::set<TLeaf *> &leaves, TLeaf *leaf)
34 return (leaves.find(leaf) != leaves.end());
40 void UpdateList(std::set<std::string> &bNamesReg, ColumnNames_t &bNames,
const std::string &branchName,
41 const std::string &friendName)
44 if (!friendName.empty()) {
46 const auto friendBName = friendName +
"." + branchName;
47 if (bNamesReg.insert(friendBName).second)
48 bNames.push_back(friendBName);
51 if (bNamesReg.insert(branchName).second)
52 bNames.push_back(branchName);
57 void UpdateList(std::set<std::string> &bNamesReg, ColumnNames_t &bNames,
const std::string &branchName,
58 const std::string &friendName, std::set<TLeaf *> &foundLeaves, TLeaf *leaf,
bool allowDuplicates)
60 const bool canAdd = allowDuplicates ?
true : !ContainsLeaf(foundLeaves, leaf);
65 UpdateList(bNamesReg, bNames, branchName, friendName);
67 foundLeaves.insert(leaf);
// Recursively walk the sub-branches of `b` and register (via UpdateList) each sub-branch name.
// A name is registered only if tree `t` resolves it, either as `prefix + subBranchName` or as
// the bare sub-branch name. `friendName` is forwarded unchanged to UpdateList.
// NOTE(review): several source lines of this function are not visible in this view.
70 void ExploreBranch(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames, TBranch *b, std::string prefix,
71 std::string &friendName)
73 for (
auto sb : *b->GetListOfBranches()) {
74 TBranch *subBranch =
static_cast<TBranch *
>(sb);
75 auto subBranchName = std::string(subBranch->GetName());
76 auto fullName = prefix + subBranchName;
78 std::string newPrefix;
// NOTE(review): source line 79 is not visible. It presumably holds the condition guarding the
// assignment below; otherwise newPrefix stays empty. Confirm against the full file.
80 newPrefix = fullName +
".";
// Depth-first: the sub-branch's own children are handled before the sub-branch itself.
82 ExploreBranch(t, bNamesReg, bNames, subBranch, newPrefix, friendName);
// Prefer the prefixed name if the tree resolves it; otherwise fall back to the bare name.
84 if (t.GetBranch(fullName.c_str())) {
85 UpdateList(bNamesReg, bNames, fullName, friendName);
87 }
else if (t.GetBranch(subBranchName.c_str())) {
88 UpdateList(bNamesReg, bNames, subBranchName, friendName);
// Collect into `bNames` the column names exposed by tree `t` and, recursively, by its friend trees.
// - `bNamesReg` deduplicates names.
// - `analysedTrees` records visited trees, so each tree is processed at most once.
// - `friendName` qualifies names when `t` is a friend; it is empty for the top-level tree.
// - `allowDuplicates` is forwarded to the leaf-aware UpdateList overload.
// Throws std::runtime_error("GetBranchNames: error in opening the tree ...") and
// std::runtime_error("GetBranchNames: unsupported branch type") in the cases marked below.
// NOTE(review): many source lines of this function are not visible here (braces, early
// returns, some conditions). The comments only describe what is visible.
93 void GetBranchNamesImpl(TTree &t, std::set<std::string> &bNamesReg, ColumnNames_t &bNames,
94 std::set<TTree *> &analysedTrees, std::string &friendName,
bool allowDuplicates)
96 std::set<TLeaf *> foundLeaves;
// insert().second is false when `t` was already analysed. The statement taken in that case
// (presumably an early return) is on a source line not visible here.
97 if (!analysedTrees.insert(&t).second) {
101 const auto branches = t.GetListOfBranches();
// NOTE(review): the condition guarding this throw (source lines 102-105) is not visible,
// nor is whatever source line 107 appends to the message.
106 std::string err(
"GetBranchNames: error in opening the tree ");
108 throw std::runtime_error(err);
111 for (
auto b : *branches) {
112 TBranch *branch =
static_cast<TBranch *
>(b);
113 const auto branchName = std::string(branch->GetName());
// Plain TBranch (exact class match): columns come from its leaves.
114 if (branch->IsA() == TBranch::Class()) {
116 auto listOfLeaves = branch->GetListOfLeaves();
// A single leaf named like its branch is also exposed under the bare branch name.
117 if (listOfLeaves->GetEntries() == 1) {
118 auto leaf =
static_cast<TLeaf *
>(listOfLeaves->At(0));
119 const auto leafName = std::string(leaf->GetName());
120 if (leafName == branchName) {
121 UpdateList(bNamesReg, bNames, branchName, friendName, foundLeaves, leaf, allowDuplicates);
// Every leaf is exposed as "<branch>.<leaf>".
125 for (
auto leaf : *listOfLeaves) {
126 auto castLeaf =
static_cast<TLeaf *
>(leaf);
127 const auto leafName = std::string(leaf->GetName());
128 const auto fullName = branchName +
"." + leafName;
129 UpdateList(bNamesReg, bNames, fullName, friendName, foundLeaves, castLeaf, allowDuplicates);
// TBranchObject: register its sub-branches (prefixed by "<branch>."), then the branch itself.
131 }
else if (branch->IsA() == TBranchObject::Class()) {
133 ExploreBranch(t, bNamesReg, bNames, branch, branchName +
".", friendName);
134 UpdateList(bNamesReg, bNames, branchName, friendName);
// Any other branch must be a TBranchElement; otherwise the "unsupported branch type" error
// is thrown. The null check on `be` guarding that throw (source line 141) is not visible.
139 bool dotIsImplied =
false;
140 auto be =
dynamic_cast<TBranchElement *
>(b);
142 throw std::runtime_error(
"GetBranchNames: unsupported branch type");
// GetType() values 3 and 4 presumably set dotIsImplied (per ROOT, TClonesArray and STL
// collection branches; confirm). The assignment is on a source line not visible here.
144 if (be->GetType() == 3 || be->GetType() == 4)
// If the dot is implied or the branch name already ends in '.', sub-branches are explored
// with an empty prefix; otherwise "<branch>." is prepended.
147 if (dotIsImplied || branchName.back() ==
'.')
148 ExploreBranch(t, bNamesReg, bNames, branch,
"", friendName);
150 ExploreBranch(t, bNamesReg, bNames, branch, branchName +
".", friendName);
152 UpdateList(bNamesReg, bNames, branchName, friendName);
// Recurse into friend trees. Their names are qualified by the friend alias when one is set,
// otherwise by the friend tree's own name.
157 auto friendTrees = t.GetListOfFriends();
// NOTE(review): C-style cast below; static_cast<TFriendElement *> would be clearer.
162 for (
auto friendTreeObj : *friendTrees) {
163 auto friendTree = ((TFriendElement *)friendTreeObj)->GetTree();
// `frName` is declared on a source line not visible here.
166 auto alias = t.GetFriendAlias(friendTree);
167 if (alias !=
nullptr)
168 frName = std::string(alias);
170 frName = std::string(friendTree->GetName());
172 GetBranchNamesImpl(*friendTree, bNamesReg, bNames, analysedTrees, frName, allowDuplicates);
// Collect the column names exposed by tree `t` and its friend trees, in the order
// GetBranchNamesImpl discovers them. `allowDuplicates` is forwarded to GetBranchNamesImpl.
// NOTE(review): the return statement (presumably returning bNames) is on a source line not
// visible here.
178 ColumnNames_t ROOT::Internal::RDF::GetBranchNames(TTree &t,
bool allowDuplicates)
180 std::set<std::string> bNamesSet;
181 ColumnNames_t bNames;
182 std::set<TTree *> analysedTrees;
// The top-level tree is not a friend, so its names are not qualified.
183 std::string emptyFrName =
"";
184 GetBranchNamesImpl(t, bNamesSet, bNames, analysedTrees, emptyFrName, allowDuplicates);
// Construct a loop manager that reads entries from an existing TTree.
// The shared_ptr gets a no-op deleter, so the loop manager does NOT own `tree`. The caller must
// keep the tree alive for as long as this object uses it.
// fLoopType selects the multi-threaded tree loop when implicit MT is enabled.
189 RLoopManager::RLoopManager(TTree *tree,
const ColumnNames_t &defaultBranches)
190 : fTree(std::shared_ptr<TTree>(tree, [](TTree *) {})), fDefaultColumns(defaultBranches),
191 fNSlots(RDFInternal::GetNSlots()),
192 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kROOTFilesMT : ELoopType::kROOTFiles)
// Construct a loop manager with no input data. The event loop visits `nEmptyEntries` entries.
// fLoopType selects the multi-threaded empty-source loop when implicit MT is enabled.
196 RLoopManager::RLoopManager(ULong64_t nEmptyEntries)
197 : fNEmptyEntries(nEmptyEntries), fNSlots(RDFInternal::GetNSlots()),
198 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kNoFilesMT : ELoopType::kNoFiles)
// Construct a loop manager that reads from a data source, taking ownership of it.
// The data source is told right away how many processing slots (fNSlots) will be used.
// NOTE(review): `ds` is dereferenced without a null check.
202 RLoopManager::RLoopManager(std::unique_ptr<RDataSource> ds,
const ColumnNames_t &defaultBranches)
203 : fDefaultColumns(defaultBranches), fNSlots(RDFInternal::GetNSlots()),
204 fLoopType(ROOT::IsImplicitMTEnabled() ? ELoopType::kDataSourceMT : ELoopType::kDataSource),
205 fDataSource(std::move(ds))
207 fDataSource->SetNSlots(fNSlots);
// Throw std::runtime_error if any friend of fTree has a tree index (GetTreeIndex() non-null).
// The error message states that such friends are not supported.
211 void RLoopManager::CheckIndexedFriends()
213 auto friends = fTree->GetListOfFriends();
// NOTE(review): source lines 214-215 are not visible. They are presumably an early return when
// `friends` is null, since it is dereferenced below. Confirm.
216 for (
auto friendElObj : *friends) {
217 auto friendEl =
static_cast<TFriendElement *
>(friendElObj);
218 auto friendTree = friendEl->GetTree();
219 if (friendTree && friendTree->GetTreeIndex()) {
220 std::string err = fTree->GetName();
221 err +=
" has a friend, \"";
222 err += friendTree->GetName();
223 err +=
"\", which has an index. This is not supported.";
224 throw std::runtime_error(err);
// Multi-threaded event loop over fNEmptyEntries empty entries.
// The entries are cut into contiguous [first, second) ranges, aiming at about fNSlots * 2 tasks.
// The ranges are processed in parallel with ROOT::TThreadExecutor::Foreach. Each task borrows a
// slot from the RSlotStack for its whole duration.
// NOTE(review): several source lines are not visible here: the declaration of `start`, the use
// of `remainder`, the advance of `start`, and the #if/#ifdef matched by the #endif below.
230 void RLoopManager::RunEmptySourceMT()
233 RSlotStack slotStack(fNSlots);
// NOTE(review): when fNEmptyEntries < fNSlots * 2, nEntriesPerSlot is 0. Whether the while loop
// below still advances then depends on the remainder handling, which is not visible. Verify it
// cannot loop forever or emit empty ranges.
236 const auto nEntriesPerSlot = fNEmptyEntries / (fNSlots * 2);
237 auto remainder = fNEmptyEntries % (fNSlots * 2);
238 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
240 while (start < fNEmptyEntries) {
241 ULong64_t end = start + nEntriesPerSlot;
246 entryRanges.emplace_back(start, end);
// Task body: take a slot, initialize the nodes for it (there is no TTreeReader), process every
// entry of the range, then give the slot back.
251 auto genFunction = [
this, &slotStack](
const std::pair<ULong64_t, ULong64_t> &range) {
252 auto slot = slotStack.GetSlot();
253 InitNodeSlots(
nullptr, slot);
254 for (
auto currEntry = range.first; currEntry < range.second; ++currEntry) {
255 RunAndCheckFilters(slot, currEntry);
258 slotStack.ReturnSlot(slot);
261 ROOT::TThreadExecutor pool;
262 pool.Foreach(genFunction, entryRanges);
264 #endif // not implemented otherwise
// Single-threaded event loop over fNEmptyEntries empty entries, on slot 0 with no TTreeReader.
// The loop ends early once fNStopsReceived reaches fNChildren.
// NOTE(review): the lines after the loop (source lines 273-275) are not visible here.
268 void RLoopManager::RunEmptySource()
270 InitNodeSlots(
nullptr, 0);
271 for (ULong64_t currEntry = 0; currEntry < fNEmptyEntries && fNStopsReceived < fNChildren; ++currEntry) {
272 RunAndCheckFilters(0, currEntry);
// Multi-threaded event loop over fTree, driven by ROOT::TTreeProcessorMT. Each task receives a
// TTreeReader and borrows a slot from the RSlotStack for its duration.
// Indexed friends are rejected up front by CheckIndexedFriends().
// NOTE(review): several source lines are not visible (the #if guard, and the per-entry loop
// around RunAndCheckFilters).
278 void RLoopManager::RunTreeProcessorMT()
281 CheckIndexedFriends();
282 RSlotStack slotStack(fNSlots);
// NOTE(review): the conditional has an lvalue and a prvalue TEntryList operand, so its result is
// a prvalue. The tree's entry list is therefore COPIED (or an empty list is used), and the
// reference binds to a lifetime-extended temporary.
283 const auto &entryList = fTree->GetEntryList() ? *fTree->GetEntryList() : TEntryList();
284 auto tp = std::make_unique<ROOT::TTreeProcessorMT>(*fTree, entryList);
286 std::atomic<ULong64_t> entryCount(0ull);
288 tp->Process([
this, &slotStack, &entryCount](TTreeReader &r) ->
void {
289 auto slot = slotStack.GetSlot();
290 InitNodeSlots(&r, slot);
291 const auto entryRange = r.GetEntriesRange();
292 const auto nEntries = entryRange.second - entryRange.first;
// Each task atomically reserves a block of nEntries consecutive numbers from the shared counter.
// NOTE(review): those numbers, not r's own entry numbers, are passed to RunAndCheckFilters, so
// they reflect task scheduling order rather than tree entries. Confirm this is intended.
293 auto count = entryCount.fetch_add(nEntries);
296 RunAndCheckFilters(slot, count++);
299 slotStack.ReturnSlot(slot);
301 #endif // no-op otherwise (will not be called)
// Single-threaded event loop over fTree, on slot 0, using a TTreeReader built with the tree's
// entry list. Indexed friends are rejected up front by CheckIndexedFriends().
// The loop ends early once fNStopsReceived reaches fNChildren.
305 void RLoopManager::RunTreeReader()
307 CheckIndexedFriends();
308 TTreeReader r(fTree.get(), fTree->GetEntryList());
// NOTE(review): the statement guarded by this check (source line 310, presumably an early
// return for an empty tree) is not visible here.
309 if (0 == fTree->GetEntriesFast())
311 InitNodeSlots(&r, 0);
315 while (r.Next() && fNStopsReceived < fNChildren) {
316 RunAndCheckFilters(0, r.GetCurrentEntry());
// Single-threaded event loop over fDataSource, on slot 0.
// Batches of entry ranges are requested from the data source until an empty batch is returned.
// An entry is processed only if SetEntry(0u, entry) returns true.
// NOTE(review): source lines 335-338 (closing braces and possibly other statements) are not visible.
322 void RLoopManager::RunDataSource()
324 R__ASSERT(fDataSource !=
nullptr);
325 fDataSource->Initialise();
326 auto ranges = fDataSource->GetEntryRanges();
327 while (!ranges.empty()) {
328 InitNodeSlots(
nullptr, 0u);
// NOTE(review): InitSlot always receives first entry 0 here, whereas the MT version passes
// range.first. Confirm this is intended for multi-batch data sources.
329 fDataSource->InitSlot(0u, 0ull);
330 for (
const auto &range : ranges) {
331 auto end = range.second;
332 for (
auto entry = range.first; entry < end; ++entry) {
333 if (fDataSource->SetEntry(0u, entry)) {
334 RunAndCheckFilters(0u, entry);
339 fDataSource->FinaliseSlot(0u);
340 ranges = fDataSource->GetEntryRanges();
342 fDataSource->Finalise();
// Multi-threaded event loop over fDataSource.
// Each batch of entry ranges returned by GetEntryRanges() is processed in parallel with
// TThreadExecutor::Foreach. Each range runs on a slot borrowed from the RSlotStack.
// NOTE(review): the #if guard matched by the #endif below is not visible here.
346 void RLoopManager::RunDataSourceMT()
349 R__ASSERT(fDataSource !=
nullptr);
350 RSlotStack slotStack(fNSlots);
351 ROOT::TThreadExecutor pool;
// Task body for one [first, second) range: take a slot, initialize the nodes and the data source
// for it, process the entries that SetEntry accepts, then finalise and return the slot.
354 auto runOnRange = [
this, &slotStack](
const std::pair<ULong64_t, ULong64_t> &range) {
355 const auto slot = slotStack.GetSlot();
356 InitNodeSlots(
nullptr, slot);
357 fDataSource->InitSlot(slot, range.first);
358 const auto end = range.second;
359 for (
auto entry = range.first; entry < end; ++entry) {
360 if (fDataSource->SetEntry(slot, entry)) {
361 RunAndCheckFilters(slot, entry);
365 fDataSource->FinaliseSlot(slot);
366 slotStack.ReturnSlot(slot);
369 fDataSource->Initialise();
370 auto ranges = fDataSource->GetEntryRanges();
// Keep requesting batches until the data source returns an empty one.
371 while (!ranges.empty()) {
372 pool.Foreach(runOnRange, ranges);
373 ranges = fDataSource->GetEntryRanges();
375 fDataSource->Finalise();
376 #endif // not implemented otherwise (never called)
// Process one entry on `slot`, in this order:
// 1. run every booked action;
// 2. evaluate every named filter (presumably so their report statistics are filled even when no
//    action depends on them; confirm);
// 3. go through the registered callbacks.
// NOTE(review): the body of the callback loop (source line 388) is not visible here.
381 void RLoopManager::RunAndCheckFilters(
unsigned int slot, Long64_t entry)
383 for (
auto &actionPtr : fBookedActions)
384 actionPtr->Run(slot, entry);
385 for (
auto &namedFilterPtr : fBookedNamedFilters)
386 namedFilterPtr->CheckFilters(slot, entry);
387 for (
auto &callback : fCallbacks)
// Prepare every booked action and every booked filter to process entries on `slot`.
// `r` is the task's TTreeReader; the empty-source and data-source loops pass nullptr.
// NOTE(review): the body of the fCallbacksOnce loop (source line 403+) is not visible here.
396 void RLoopManager::InitNodeSlots(TTreeReader *r,
unsigned int slot)
398 for (
auto &ptr : fBookedActions)
399 ptr->InitSlot(r, slot);
400 for (
auto &ptr : fBookedFilters)
401 ptr->InitSlot(r, slot);
402 for (
auto &callback : fCallbacksOnce)
// Initialize the computation graph before an event loop.
// It first evaluates children counts, then visits custom columns, booked filters, booked ranges
// and booked actions.
// NOTE(review): the body of each loop (source lines 414, 416, 418, 420) is not visible here.
// NOTE(review): `auto column` iterates by value. If fCustomColumns holds shared_ptrs, each
// iteration copies one; consider `auto &column`.
410 void RLoopManager::InitNodes()
412 EvalChildrenCounts();
413 for (
auto column : fCustomColumns)
415 for (
auto &filter : fBookedFilters)
417 for (
auto &range : fBookedRanges)
419 for (
auto &ptr : fBookedActions)
// Reset state after an event loop:
// - clear the named-filter flag;
// - move the booked actions to the front of fRunActions and clear the booking list;
// - reset the children counts of filters and ranges;
// - drop the once-only callbacks.
// NOTE(review): the body of the first loop (source line 430) and lines 434-437 are not visible.
424 void RLoopManager::CleanUpNodes()
426 fMustRunNamedFilters =
false;
429 for (
auto &ptr : fBookedActions)
432 fRunActions.insert(fRunActions.begin(), fBookedActions.begin(), fBookedActions.end());
433 fBookedActions.clear();
438 for (
auto &ptr : fBookedFilters)
439 ptr->ResetChildrenCount();
440 for (
auto &ptr : fBookedRanges)
441 ptr->ResetChildrenCount();
444 fCallbacksOnce.clear();
448 void RLoopManager::CleanUpTask(
unsigned int slot)
450 for (
auto &ptr : fBookedActions)
451 ptr->FinalizeSlot(slot);
452 for (
auto &ptr : fBookedFilters)
453 ptr->ClearTask(slot);
// Pass the accumulated declarations in fToJitDeclare to the interpreter
// (RDFInternal::InterpreterDeclare), then clear them.
// NOTE(review): the statement guarded by the emptiness check (source line 461, presumably an
// early return) is not visible here.
458 void RLoopManager::JitDeclarations()
460 if (fToJitDeclare.empty())
463 RDFInternal::InterpreterDeclare(fToJitDeclare);
464 fToJitDeclare.clear();
// Evaluate the accumulated code in fToJitExec through RDFInternal::InterpreterCalc, with context
// string "RLoopManager::Run".
// NOTE(review): source lines 472-474 and 476+ are not visible. The statement guarded by the
// emptiness check, and whether fToJitExec is cleared afterwards, cannot be confirmed here.
469 void RLoopManager::Jit()
471 if (fToJitExec.empty())
475 RDFInternal::InterpreterCalc(fToJitExec,
"RLoopManager::Run");
485 void RLoopManager::EvalChildrenCounts()
487 for (
auto &actionPtr : fBookedActions)
488 actionPtr->TriggerChildrenCount();
489 for (
auto &namedFilterPtr : fBookedNamedFilters)
490 namedFilterPtr->TriggerChildrenCount();
// Produce IDs from a function-local static counter shared by all RLoopManager instances.
// NOTE(review): the return statement (source line 496) is not visible here. If this can be
// called from several threads, the plain `unsigned int` counter would race; consider
// std::atomic<unsigned int>.
493 unsigned int RLoopManager::GetNextID()
495 static unsigned int id = 0;
// Run the event loop by dispatching on fLoopType to the matching Run* implementation
// (MT or single-threaded, over empty entries, a TTree, or a data source).
// NOTE(review): source lines 503-508 (including the switch header) and 515+ are not visible here;
// any set-up or clean-up around the dispatch cannot be documented from this view.
502 void RLoopManager::Run()
509 case ELoopType::kNoFilesMT: RunEmptySourceMT();
break;
510 case ELoopType::kROOTFilesMT: RunTreeProcessorMT();
break;
511 case ELoopType::kDataSourceMT: RunDataSourceMT();
break;
512 case ELoopType::kNoFiles: RunEmptySource();
break;
513 case ELoopType::kROOTFiles: RunTreeReader();
break;
514 case ELoopType::kDataSource: RunDataSource();
break;
521 const ColumnNames_t &RLoopManager::GetDefaultColumnNames()
const
523 return fDefaultColumns;
// Return the TTree this loop manager reads from.
// NOTE(review): the body (source lines 527+) is not visible here. Presumably it returns
// fTree.get(), which would be null for the empty-source and data-source constructors. Confirm.
526 TTree *RLoopManager::GetTree()
const
531 void RLoopManager::Book(RDFInternal::RActionBase *actionPtr)
533 fBookedActions.emplace_back(actionPtr);
536 void RLoopManager::Deregister(RDFInternal::RActionBase *actionPtr)
538 RDFInternal::Erase(actionPtr, fRunActions);
539 RDFInternal::Erase(actionPtr, fBookedActions);
542 void RLoopManager::Book(RFilterBase *filterPtr)
544 fBookedFilters.emplace_back(filterPtr);
545 if (filterPtr->HasName()) {
546 fBookedNamedFilters.emplace_back(filterPtr);
547 fMustRunNamedFilters =
true;
551 void RLoopManager::Deregister(RFilterBase *filterPtr)
553 RDFInternal::Erase(filterPtr, fBookedFilters);
554 RDFInternal::Erase(filterPtr, fBookedNamedFilters);
557 void RLoopManager::Book(RRangeBase *rangePtr)
559 fBookedRanges.emplace_back(rangePtr);
562 void RLoopManager::Deregister(RRangeBase *rangePtr)
564 RDFInternal::Erase(rangePtr, fBookedRanges);
// Filter check at the root of the graph. Both parameters (slot, entry) are unnamed, so this
// implementation ignores them.
// NOTE(review): the body (source lines 569+) is not visible here; the returned value cannot be
// documented from this view.
568 bool RLoopManager::CheckFilters(
unsigned int, Long64_t)
574 void RLoopManager::Report(ROOT::RDF::RCutFlowReport &rep)
const
576 for (
const auto &fPtr : fBookedNamedFilters)
577 fPtr->FillReport(rep);
580 void RLoopManager::RegisterCallback(ULong64_t everyNEvents, std::function<
void(
unsigned int)> &&f)
582 if (everyNEvents == 0ull)
583 fCallbacksOnce.emplace_back(std::move(f), fNSlots);
585 fCallbacks.emplace_back(everyNEvents, std::move(f), fNSlots);
// List the names of all booked filters in booking order. Unnamed filters appear as
// "Unnamed Filter".
// NOTE(review): the return statement (presumably returning `filters`) is on a source line not
// visible here. `push_back(name)` copies; std::move(name) would avoid that.
588 std::vector<std::string> RLoopManager::GetFiltersNames()
590 std::vector<std::string> filters;
591 for (
auto &filter : fBookedFilters) {
592 auto name = (filter->HasName() ? filter->GetName() :
"Unnamed Filter");
593 filters.push_back(name);
// Collect all actions known to the loop manager. Because the second insert targets begin(), the
// result lists the already-run actions (fRunActions) first, followed by the booked ones.
// NOTE(review): the return statement is on a source line not visible here.
598 std::vector<RDFInternal::RActionBase *> RLoopManager::GetAllActions()
600 std::vector<RDFInternal::RActionBase *> actions;
601 actions.insert(actions.begin(), fBookedActions.begin(), fBookedActions.end());
602 actions.insert(actions.begin(), fRunActions.begin(), fRunActions.end());
// Build the graph-drawing node representing the data source of this loop manager.
// Its label is the data source's label, the tree's name, or the number of empty entries,
// depending on the input kind; the selecting conditions are on source lines not visible here.
// NOTE(review): the declaration of `name`, the branch conditions and the return statement are
// not visible in this view.
606 std::shared_ptr<ROOT::Internal::RDF::GraphDrawing::GraphNode> RLoopManager::GetGraph()
610 name = fDataSource->GetLabel();
612 name = fTree->GetName();
614 name = std::to_string(fNEmptyEntries);
617 auto thisNode = std::make_shared<ROOT::Internal::RDF::GraphDrawing::GraphNode>(name);
619 thisNode->SetCounter(0);
626 const ColumnNames_t &RLoopManager::GetBranchNames()
628 if (fValidBranchNames.empty() && fTree) {
629 fValidBranchNames = RDFInternal::GetBranchNames(*fTree,
true);
631 return fValidBranchNames;