Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TTreeProcessorMT.cxx
Go to the documentation of this file.
1 // @(#)root/thread:$Id$
2 // Authors: Enric Tejedor, Enrico Guiraud CERN 05/06/2018
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2016, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class ROOT::TTreeProcessorMT
13  \ingroup Parallelism
14  \brief A class to process the entries of a TTree in parallel.
15 
16 By means of its Process method, ROOT::TTreeProcessorMT provides a way to process the
17 entries of a TTree in parallel. When invoking TTreeProcessorMT::Process, the user
18 passes a function whose only parameter is a TTreeReader. The function iterates
19 on a subrange of entries by using that TTreeReader.
20 
21 The implementation of ROOT::TTreeProcessorMT parallelizes the processing of the subranges,
22 each corresponding to a cluster (or a group of consecutive clusters) in the TTree. This is possible thanks to the use
23 of a ROOT::TThreadedObject, so that each thread works with its own TFile and TTree
24 objects.
25 */
26 
27 #include "TROOT.h"
29 #include "ROOT/TThreadExecutor.hxx"
30 
31 using namespace ROOT;
32 
33 namespace ROOT {
34 
35 unsigned int TTreeProcessorMT::fgMaxTasksPerFilePerWorker = 24U;
36 
37 namespace Internal {
38 
/// A cluster of entries: the half-open entry range [start, end) handled by a single task.
/// Depending on how it was produced, the entry numbers are either local to one file or
/// global to the whole chain (offset by the entries of the preceding files).
struct EntryCluster {
   Long64_t start; ///< First entry of the range (inclusive)
   Long64_t end;   ///< End of the range (exclusive)
};
44 
45 ////////////////////////////////////////////////////////////////////////////////
46 /// Construct fChain, also adding friends if needed and injecting knowledge of offsets if available.
47 void TTreeView::MakeChain(const std::string &treeName, const std::vector<std::string> &fileNames,
48  const FriendInfo &friendInfo, const std::vector<Long64_t> &nEntries,
49  const std::vector<std::vector<Long64_t>> &friendEntries)
50 {
51  const std::vector<NameAlias> &friendNames = friendInfo.fFriendNames;
52  const std::vector<std::vector<std::string>> &friendFileNames = friendInfo.fFriendFileNames;
53 
54  fChain.reset(new TChain(treeName.c_str()));
55  const auto nFiles = fileNames.size();
56  for (auto i = 0u; i < nFiles; ++i) {
57  fChain->Add(fileNames[i].c_str(), nEntries[i]);
58  }
59  fChain->ResetBit(TObject::kMustCleanup);
60 
61  fFriends.clear();
62  const auto nFriends = friendNames.size();
63  for (auto i = 0u; i < nFriends; ++i) {
64  const auto &friendName = friendNames[i];
65  const auto &name = friendName.first;
66  const auto &alias = friendName.second;
67 
68  // Build a friend chain
69  auto frChain = std::make_unique<TChain>(name.c_str());
70  const auto nFileNames = friendFileNames[i].size();
71  for (auto j = 0u; j < nFileNames; ++j)
72  frChain->Add(friendFileNames[i][j].c_str(), friendEntries[i][j]);
73 
74  // Make it friends with the main chain
75  fChain->AddFriend(frChain.get(), alias.c_str());
76  fFriends.emplace_back(std::move(frChain));
77  }
78 }
79 
80 TTreeView::TreeReaderEntryListPair
81 TTreeView::MakeReaderWithEntryList(TEntryList &globalList, Long64_t start, Long64_t end)
82 {
83  // TEntryList and SetEntriesRange do not work together (the former has precedence).
84  // We need to construct a TEntryList that contains only those entry numbers in our desired range.
85 
86  std::vector<TEntryList*> globalEntryLists;
87  auto innerLists = globalList.GetLists();
88  if (!innerLists) {
89  if (globalList.GetN()) {
90  globalEntryLists.emplace_back(&globalList);
91  }
92  } else {
93  for (auto lp : *innerLists) {
94  auto lpAsTEntryList = static_cast<TEntryList *>(lp);
95  if (lpAsTEntryList->GetN()) {
96  globalEntryLists.emplace_back(lpAsTEntryList);
97  }
98  }
99  }
100 
101  auto localList = std::make_unique<TEntryList>();
102 
103  for (auto gl : globalEntryLists) {
104  Long64_t entry = gl->GetEntry(0);
105 
106  // this may be owned by the local list
107  auto tmp_list = new TEntryList(gl->GetName(), gl->GetTitle(), gl->GetFileName(), gl->GetTreeName());
108 
109  do {
110  if (entry >= end) {
111  break;
112  } else if (entry >= start) {
113  tmp_list->Enter(entry);
114  }
115  } while ((entry = gl->Next()) >= 0);
116 
117  if (tmp_list->GetN() > 0) {
118  localList->Add(tmp_list);
119  } else {
120  delete tmp_list;
121  }
122  }
123 
124  auto reader = std::make_unique<TTreeReader>(fChain.get(), localList.get());
125  return std::make_pair(std::move(reader), std::move(localList));
126 }
127 
128 std::unique_ptr<TTreeReader> TTreeView::MakeReader(Long64_t start, Long64_t end)
129 {
130  auto reader = std::make_unique<TTreeReader>(fChain.get());
131  reader->SetEntriesRange(start, end);
132  return reader;
133 }
134 
135 //////////////////////////////////////////////////////////////////////////
136 /// Get a TTreeReader for the current tree of this view.
137 TTreeView::TreeReaderEntryListPair
138 TTreeView::GetTreeReader(Long64_t start, Long64_t end, const std::string &treeName,
139  const std::vector<std::string> &fileNames, const FriendInfo &friendInfo, TEntryList entryList,
140  const std::vector<Long64_t> &nEntries, const std::vector<std::vector<Long64_t>> &friendEntries)
141 {
142  const bool usingLocalEntries = friendInfo.fFriendNames.empty() && entryList.GetN() == 0;
143  if (fChain == nullptr || (usingLocalEntries && fileNames[0] != fChain->GetListOfFiles()->At(0)->GetTitle()))
144  MakeChain(treeName, fileNames, friendInfo, nEntries, friendEntries);
145 
146  std::unique_ptr<TTreeReader> reader;
147  std::unique_ptr<TEntryList> localList;
148  if (entryList.GetN() > 0) {
149  std::tie(reader, localList) = MakeReaderWithEntryList(entryList, start, end);
150  } else {
151  reader = MakeReader(start, end);
152  }
153 
154  // we need to return the entry list too, as it needs to be in scope as long as the reader is
155  return std::make_pair(std::move(reader), std::move(localList));
156 }
157 
158 ////////////////////////////////////////////////////////////////////////
159 /// Return a vector of cluster boundaries for the given tree and files.
160 // EntryClusters and number of entries per file
161 using ClustersAndEntries = std::pair<std::vector<std::vector<EntryCluster>>, std::vector<Long64_t>>;
162 static ClustersAndEntries MakeClusters(const std::string &treeName, const std::vector<std::string> &fileNames)
163 {
164  // Note that as a side-effect of opening all files that are going to be used in the
165  // analysis once, all necessary streamers will be loaded into memory.
166  TDirectory::TContext c;
167  const auto nFileNames = fileNames.size();
168  std::vector<std::vector<EntryCluster>> clustersPerFile;
169  std::vector<Long64_t> entriesPerFile;
170  entriesPerFile.reserve(nFileNames);
171  Long64_t offset = 0ll;
172  for (const auto &fileName : fileNames) {
173  auto fileNameC = fileName.c_str();
174  std::unique_ptr<TFile> f(TFile::Open(fileNameC)); // need TFile::Open to load plugins if need be
175  if (!f || f->IsZombie()) {
176  Error("TTreeProcessorMT::Process", "An error occurred while opening file %s: skipping it.", fileNameC);
177  clustersPerFile.emplace_back(std::vector<EntryCluster>());
178  entriesPerFile.emplace_back(0ULL);
179  continue;
180  }
181  TTree *t = nullptr; // not a leak, t will be deleted by f
182  f->GetObject(treeName.c_str(), t);
183 
184  if (!t) {
185  Error("TTreeProcessorMT::Process", "An error occurred while getting tree %s from file %s: skipping this file.",
186  treeName.c_str(), fileNameC);
187  clustersPerFile.emplace_back(std::vector<EntryCluster>());
188  entriesPerFile.emplace_back(0ULL);
189  continue;
190  }
191 
192  auto clusterIter = t->GetClusterIterator(0);
193  Long64_t start = 0ll, end = 0ll;
194  const Long64_t entries = t->GetEntries();
195  // Iterate over the clusters in the current file
196  std::vector<EntryCluster> clusters;
197  while ((start = clusterIter()) < entries) {
198  end = clusterIter.GetNextEntry();
199  // Add the current file's offset to start and end to make them (chain) global
200  clusters.emplace_back(EntryCluster{start + offset, end + offset});
201  }
202  offset += entries;
203  clustersPerFile.emplace_back(std::move(clusters));
204  entriesPerFile.emplace_back(entries);
205  }
206 
207  // Here we "fuse" together clusters if the number of clusters is to big with respect to
208  // the number of slots, otherwise we can incurr in an overhead which is so big to make
209  // the parallelisation detrimental for performance.
210  // For example, this is the case when following a merging of many small files a file
211  // contains a tree with many entries and with clusters of just a few entries.
212  // The criterion according to which we fuse clusters together is to have at most
213  // TTreeProcessorMT::GetMaxTasksPerFilePerWorker() clusters per file per slot.
214  // For example: given 2 files and 16 workers, at most
215  // 16 * 2 * TTreeProcessorMT::GetMaxTasksPerFilePerWorker() clusters will be created, at most
216  // 16 * TTreeProcessorMT::GetMaxTasksPerFilePerWorker() per file.
217 
218  const auto maxTasksPerFile = TTreeProcessorMT::GetMaxTasksPerFilePerWorker() * ROOT::GetImplicitMTPoolSize();
219  std::vector<std::vector<EntryCluster>> eventRangesPerFile(clustersPerFile.size());
220  auto clustersPerFileIt = clustersPerFile.begin();
221  auto eventRangesPerFileIt = eventRangesPerFile.begin();
222  for (; clustersPerFileIt != clustersPerFile.end(); clustersPerFileIt++, eventRangesPerFileIt++) {
223  const auto clustersInThisFileSize = clustersPerFileIt->size();
224  const auto nFolds = clustersInThisFileSize / maxTasksPerFile;
225  // If the number of clusters is less than maxTasksPerFile
226  // we take the clusters as they are
227  if (nFolds == 0) {
228  std::for_each(
229  clustersPerFileIt->begin(), clustersPerFileIt->end(),
230  [&eventRangesPerFileIt](const EntryCluster &clust) { eventRangesPerFileIt->emplace_back(clust); });
231  continue;
232  }
233  // Otherwise, we have to merge clusters, distributing the reminder evenly
234  // onto the first clusters
235  auto nReminderClusters = clustersInThisFileSize % maxTasksPerFile;
236  const auto clustersInThisFile = *clustersPerFileIt;
237  for (auto i = 0ULL; i < clustersInThisFileSize; ++i) {
238  const auto start = clustersInThisFile[i].start;
239  // We lump together at least nFolds clusters, therefore
240  // we need to jump ahead of nFolds-1.
241  i += (nFolds - 1);
242  // We now add a cluster if we have some reminder left
243  if (nReminderClusters > 0) {
244  i += 1U;
245  nReminderClusters--;
246  }
247  const auto end = clustersInThisFile[i].end;
248  eventRangesPerFileIt->emplace_back(EntryCluster({start, end}));
249  }
250  }
251 
252  return std::make_pair(std::move(eventRangesPerFile), std::move(entriesPerFile));
253 }
254 
255 ////////////////////////////////////////////////////////////////////////
256 /// Return a vector containing the number of entries of each file of each friend TChain
257 static std::vector<std::vector<Long64_t>>
258 GetFriendEntries(const std::vector<std::pair<std::string, std::string>> &friendNames,
259  const std::vector<std::vector<std::string>> &friendFileNames)
260 {
261  std::vector<std::vector<Long64_t>> friendEntries;
262  const auto nFriends = friendNames.size();
263  for (auto i = 0u; i < nFriends; ++i) {
264  std::vector<Long64_t> nEntries;
265  const auto &thisFriendName = friendNames[i].first;
266  const auto &thisFriendFiles = friendFileNames[i];
267  for (const auto &fname : thisFriendFiles) {
268  std::unique_ptr<TFile> f(TFile::Open(fname.c_str()));
269  TTree *t = nullptr; // owned by TFile
270  f->GetObject(thisFriendName.c_str(), t);
271  nEntries.emplace_back(t->GetEntries());
272  }
273  friendEntries.emplace_back(std::move(nEntries));
274  }
275 
276  return friendEntries;
277 }
278 
279 ////////////////////////////////////////////////////////////////////////
280 /// Return the full path of the tree
281 static std::string GetTreeFullPath(const TTree &tree)
282 {
283  // Case 1: this is a TChain: we get the name out of the first TChainElement
284  if (0 == strcmp("TChain", tree.ClassName())) {
285  auto &chain = dynamic_cast<const TChain &>(tree);
286  auto files = chain.GetListOfFiles();
287  if (files && 0 != files->GetEntries()) {
288  return files->At(0)->GetName();
289  }
290  }
291 
292  // Case 2: this is a TTree: we get the full path of it
293  if (auto motherDir = tree.GetDirectory()) {
294  // We have 2 subcases (ROOT-9948):
295  // - 1. motherDir is a TFile
296  // - 2. motherDir is a directory
297  // If 1. we just return the name of the tree, if 2. we reconstruct the path
298  // to the file.
299  if (motherDir->InheritsFrom("TFile")) {
300  return tree.GetName();
301  }
302  std::string fullPath(motherDir->GetPath());
303  fullPath += "/";
304  fullPath += tree.GetName();
305  return fullPath;
306  }
307 
308  // We do our best and return the name of the tree
309  return tree.GetName();
310 }
311 
312 } // namespace Internal
313 } // namespace ROOT
314 
315 ////////////////////////////////////////////////////////////////////////////////
316 /// Get and store the names, aliases and file names of the friends of the tree.
317 /// \param[in] tree The main tree whose friends to
318 ///
319 /// Note that "friends of friends" and circular references in the lists of friends are not supported.
320 Internal::FriendInfo TTreeProcessorMT::GetFriendInfo(TTree &tree)
321 {
322  std::vector<Internal::NameAlias> friendNames;
323  std::vector<std::vector<std::string>> friendFileNames;
324 
325  const auto friends = tree.GetListOfFriends();
326  if (!friends)
327  return Internal::FriendInfo();
328 
329  for (auto fr : *friends) {
330  const auto frTree = static_cast<TFriendElement *>(fr)->GetTree();
331 
332  // Check if friend tree has an alias
333  const auto realName = frTree->GetName();
334  const auto alias = tree.GetFriendAlias(frTree);
335  if (alias) {
336  friendNames.emplace_back(std::make_pair(realName, std::string(alias)));
337  } else {
338  friendNames.emplace_back(std::make_pair(realName, ""));
339  }
340 
341  // Store the file names of the friend tree
342  friendFileNames.emplace_back();
343  auto &fileNames = friendFileNames.back();
344  const bool isChain = tree.IsA() == TChain::Class();
345  if (isChain) {
346  const auto frChain = static_cast<TChain *>(frTree);
347  for (auto f : *(frChain->GetListOfFiles())) {
348  fileNames.emplace_back(f->GetTitle());
349  }
350  } else {
351  const auto f = frTree->GetCurrentFile();
352  if (!f)
353  throw std::runtime_error("Friend trees with no associated file are not supported.");
354  fileNames.emplace_back(f->GetName());
355  }
356  }
357 
358  return Internal::FriendInfo{std::move(friendNames), std::move(friendFileNames)};
359 }
360 
361 ////////////////////////////////////////////////////////////////////////////////
362 /// Retrieve the name of the first TTree in the first input file, else throw.
363 std::string TTreeProcessorMT::FindTreeName()
364 {
365  std::string treeName;
366 
367  if (fFileNames.empty())
368  throw std::runtime_error("Empty list of files and no tree name provided");
369 
370  ::TDirectory::TContext ctxt(gDirectory);
371  std::unique_ptr<TFile> f(TFile::Open(fFileNames[0].c_str()));
372  TIter next(f->GetListOfKeys());
373  while (TKey *key = (TKey *)next()) {
374  const char *className = key->GetClassName();
375  if (strcmp(className, "TTree") == 0) {
376  treeName = key->GetName();
377  break;
378  }
379  }
380  if (treeName.empty())
381  throw std::runtime_error("Cannot find any tree in file " + fFileNames[0]);
382 
383  return treeName;
384 }
385 
386 ////////////////////////////////////////////////////////////////////////
387 /// Constructor based on a file name.
388 /// \param[in] filename Name of the file containing the tree to process.
389 /// \param[in] treename Name of the tree to process. If not provided,
390 /// the implementation will automatically search for a
391 /// tree in the file.
392 TTreeProcessorMT::TTreeProcessorMT(std::string_view filename, std::string_view treename)
393  : fFileNames({std::string(filename)}), fTreeName(treename.empty() ? FindTreeName() : treename), fFriendInfo()
394 {
395 }
396 
////////////////////////////////////////////////////////////////////////
/// Convert a list of file names given as string views into owning strings, preserving order.
/// \throws std::runtime_error if the list is empty.
std::vector<std::string> CheckAndConvert(const std::vector<std::string_view> &views)
{
   if (views.empty())
      throw std::runtime_error("The provided list of file names is empty");

   return std::vector<std::string>(views.begin(), views.end());
}
408 
409 ////////////////////////////////////////////////////////////////////////
410 /// Constructor based on a collection of file names.
411 /// \param[in] filenames Collection of the names of the files containing the tree to process.
412 /// \param[in] treename Name of the tree to process. If not provided,
413 /// the implementation will automatically search for a
414 /// tree in the collection of files.
415 TTreeProcessorMT::TTreeProcessorMT(const std::vector<std::string_view> &filenames, std::string_view treename)
416  : fFileNames(CheckAndConvert(filenames)), fTreeName(treename.empty() ? FindTreeName() : treename), fFriendInfo()
417 {
418 }
419 
420 std::vector<std::string> GetFilesFromTree(TTree &tree)
421 {
422  std::vector<std::string> filenames;
423 
424  const bool isChain = tree.IsA() == TChain::Class();
425  if (isChain) {
426  TObjArray *filelist = static_cast<TChain &>(tree).GetListOfFiles();
427  const auto nFiles = filelist->GetEntries();
428  if (nFiles == 0)
429  throw std::runtime_error("The provided chain of files is empty");
430  filenames.reserve(nFiles);
431  for (auto f : *filelist)
432  filenames.emplace_back(f->GetTitle());
433  } else {
434  TFile *f = tree.GetCurrentFile();
435  if (!f) {
436  const auto msg = "The specified TTree is not linked to any file, in-memory-only trees are not supported.";
437  throw std::runtime_error(msg);
438  }
439 
440  filenames.emplace_back(f->GetName());
441  }
442 
443  return filenames;
444 }
445 
////////////////////////////////////////////////////////////////////////
/// Constructor based on a TTree and a TEntryList.
/// \param[in] tree Tree or chain of files containing the tree to process. Its file names, full path
///                 and friends are extracted at construction time.
/// \param[in] entries List of entry numbers to process, stored in fEntryList. If it contains no entries,
///                    all entries of the tree are processed.
/// \throws std::runtime_error if the files of the tree, or of one of its friends, cannot be determined.
TTreeProcessorMT::TTreeProcessorMT(TTree &tree, const TEntryList &entries)
   : fFileNames(GetFilesFromTree(tree)), fTreeName(ROOT::Internal::GetTreeFullPath(tree)), fEntryList(entries),
     fFriendInfo(GetFriendInfo(tree))
{
}
455 
456 ////////////////////////////////////////////////////////////////////////
457 /// Constructor based on a TTree.
458 /// \param[in] tree Tree or chain of files containing the tree to process.
459 TTreeProcessorMT::TTreeProcessorMT(TTree &tree) : TTreeProcessorMT(tree, TEntryList()) {}
460 
//////////////////////////////////////////////////////////////////////////////
/// Process the entries of a TTree in parallel. The user-provided function
/// receives a TTreeReader which can be used to iterate on a subrange of
/// entries
/// ~~~{.cpp}
/// TTreeProcessorMT::Process([](TTreeReader& readerSubRange) {
///    // Select branches to read
///    while (readerSubRange.Next()) {
///       // Use content of current entry
///    }
/// });
/// ~~~
/// The user needs to be aware that each of the subranges can potentially
/// be processed in parallel. This means that the code of the user function
/// should be thread safe.
///
/// One task is spawned per input file. Each of them in turn spawns one task per entry range
/// (a cluster, or a group of fused clusters) of that file.
///
/// \param[in] func User-defined function that processes a subrange of entries
void TTreeProcessorMT::Process(std::function<void(TTreeReader &)> func)
{
   const std::vector<Internal::NameAlias> &friendNames = fFriendInfo.fFriendNames;
   const std::vector<std::vector<std::string>> &friendFileNames = fFriendInfo.fFriendFileNames;

   // If an entry list or friend trees are present, we need to generate clusters with global entry numbers,
   // so we do it here for all files.
   const bool hasFriends = !friendNames.empty();
   const bool hasEntryList = fEntryList.GetN() > 0;
   const bool shouldRetrieveAllClusters = hasFriends || hasEntryList;
   const auto clustersAndEntries =
      shouldRetrieveAllClusters ? Internal::MakeClusters(fTreeName, fFileNames) : Internal::ClustersAndEntries{};
   const auto &clusters = clustersAndEntries.first;
   const auto &entries = clustersAndEntries.second;

   // Retrieve number of entries for each file for each friend tree
   const auto friendEntries =
      hasFriends ? Internal::GetFriendEntries(friendNames, friendFileNames) : std::vector<std::vector<Long64_t>>{};

   TThreadExecutor pool;
   // Parent task, spawns tasks that process each of the entry clusters for each input file
   using Internal::EntryCluster;
   auto processFile = [&](std::size_t fileIdx) {
      // theseFiles contains either all files or just the single file to process.
      // The conditional yields a temporary (a copy of fFileNames in the first case); binding it to a
      // const reference extends its lifetime until the end of this task.
      const auto &theseFiles = shouldRetrieveAllClusters ? fFileNames : std::vector<std::string>({fFileNames[fileIdx]});
      // Evaluate clusters (with local entry numbers) and number of entries for this file, if needed
      const auto theseClustersAndEntries =
         shouldRetrieveAllClusters ? Internal::ClustersAndEntries{} : Internal::MakeClusters(fTreeName, theseFiles);

      // All clusters for the file to process, either with global or local entry numbers
      const auto &thisFileClusters = shouldRetrieveAllClusters ? clusters[fileIdx] : theseClustersAndEntries.first[0];

      // Either all number of entries or just the ones for this file
      const auto &theseEntries =
         shouldRetrieveAllClusters ? entries : std::vector<Long64_t>({theseClustersAndEntries.second[0]});

      auto processCluster = [&](const Internal::EntryCluster &c) {
         // elist keeps the entry list read by `reader` (if any) alive until func returns.
         // GetTreeReader takes fEntryList by value, so each call iterates its own copy.
         // NOTE(review): fTreeView presumably provides one TTreeView per thread (see class docs) — confirm.
         std::unique_ptr<TTreeReader> reader;
         std::unique_ptr<TEntryList> elist;
         std::tie(reader, elist) = fTreeView->GetTreeReader(c.start, c.end, fTreeName, theseFiles, fFriendInfo,
                                                            fEntryList, theseEntries, friendEntries);
         func(*reader);
      };

      pool.Foreach(processCluster, thisFileClusters);
   };

   std::vector<std::size_t> fileIdxs(fFileNames.size());
   std::iota(fileIdxs.begin(), fileIdxs.end(), 0u);

   // Enable this IMT use case (activate its locks)
   Internal::TParTreeProcessingRAII ptpRAII;

   pool.Foreach(processFile, fileIdxs);
}
533 
534 ////////////////////////////////////////////////////////////////////////
535 /// \brief Sets the maximum number of tasks created per file, per worker.
536 /// \return The maximum number of tasks created per file, per worker
537 unsigned int TTreeProcessorMT::GetMaxTasksPerFilePerWorker()
538 {
539  return fgMaxTasksPerFilePerWorker;
540 }
541 
542 ////////////////////////////////////////////////////////////////////////
543 /// \brief Sets the maximum number of tasks created per file, per worker.
544 /// \param[in] maxTasksPerFile Name of the file containing the tree to process.
545 ///
546 /// This allows to create a reasonable number of tasks even if any of the
547 /// processed files features a bad clustering, for example with a lot of
548 /// entries and just a few entries per cluster.
549 void TTreeProcessorMT::SetMaxTasksPerFilePerWorker(unsigned int maxTasksPerFile)
550 {
551  fgMaxTasksPerFilePerWorker = maxTasksPerFile;
552 }