testTMPIFile.C
/// \file
/// \ingroup tutorial_io
/// This macro shows how to use TMPIFile to simulate event reconstruction
/// and merge the output in parallel.
/// The JetEvent class is defined in $ROOTSYS/tutorials/tree/JetEvent.h and JetEvent.cxx.
///
/// To run this macro do the following:
/// ~~~{.bash}
/// mpirun -np 4 root -b -l -q testTMPIFile.C
/// ~~~
///
/// \macro_code
///
/// \author Taylor Childers, Yunsong Wang

#include "TMPIFile.h"

R__LOAD_LIBRARY(RMPI) // Work around autoloading issue when ROOT modules are enabled

#ifdef TMPI_SECOND_RUN
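// This macro is processed twice. On the first pass TMPI_SECOND_RUN is not
// defined, so the bootstrap version of testTMPIFile() at the bottom of this
// file runs: it initializes MPI, builds the JetEvent library, defines
// TMPI_SECOND_RUN and re-includes this file. On that second pass the code
// below is compiled and testTMPIFile(true) runs the actual test.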

#include <chrono>
#include <cmath>   // for std::abs
#include <sstream>
#include <thread>  // for std::this_thread::sleep_for

R__LOAD_LIBRARY(JetEvent_cxx)

/* ---------------------------------------------------------------------------

The idea of TMPIFile is to run N MPI ranks where some ranks produce data
(called workers) while other ranks collect data and write it to disk
(called collectors). The number of collectors is configurable and should be
tuned for each workflow and data size.

This example uses a typical event processing loop, where every N events the
TMPIFile::Sync() function is called. This call triggers the local TTree data
to be sent via MPI to the collector rank, where it is merged with the data
from all other worker ranks and written to a TFile.

An MPI sub-communicator is created for each collector, and the remaining
worker ranks are distributed equally among the collectors.

--------------------------------------------------------------------------- */
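
/* In outline, the pattern implemented below (using only the TMPIFile calls
   that appear in this macro) is:

      TMPIFile *f = new TMPIFile(filename, "RECREATE", N_collectors);
      if (f->IsCollector()) {
         f->RunCollector();   // collector ranks receive, merge and write data
      } else {
         // ... fill a local TTree, calling f->Sync() every few events ...
      }
      f->Close();
*/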

void test_tmpi()
{

   Int_t N_collectors = 2;    // specify how many collectors to run
   Int_t sync_rate = 2;       // workers sync every sync_rate events
   Int_t events_per_rank = 6; // total events each rank will produce then exit
   Int_t sleep_mean = 5;      // simulate compute time for event processing
   Int_t sleep_sigma = 2;     // variation in compute time

   // using JetEvent generator to create a data structure
   // these parameters control this generator
   Int_t jetm = 25;
   Int_t trackm = 60;
   Int_t hitam = 200;
   Int_t hitbm = 100;

   std::string treename = "test_tmpi";
   std::string branchname = "event";

   // set output filename
   std::stringstream smpifname;
   smpifname << "/tmp/merged_output_" << getpid() << ".root";

   // Create a new TMPIFile, passing the filename, setting read/write permissions
   // and setting the number of collectors.
   // If MPI_Init has not been called already, the TMPIFile constructor
   // will call it.
   TMPIFile *newfile = new TMPIFile(smpifname.str().c_str(), "RECREATE", N_collectors);
   // set random number seed that is based on MPI rank
   // this avoids producing the same events in each MPI rank
   gRandom->SetSeed(gRandom->GetSeed() + newfile->GetMPIGlobalRank());

   // only print log messages in MPI Rank 0
   if (newfile->GetMPIGlobalRank() == 0) {
      Info("test_tmpi", " running with parallel ranks: %d", newfile->GetMPIGlobalSize());
      Info("test_tmpi", " running with collecting ranks: %d", N_collectors);
      Info("test_tmpi", " running with working ranks: %d", (newfile->GetMPIGlobalSize() - N_collectors));
      Info("test_tmpi", " running with sync rate: %d", sync_rate);
      Info("test_tmpi", " running with events per rank: %d", events_per_rank);
      Info("test_tmpi", " running with sleep mean: %d", sleep_mean);
      Info("test_tmpi", " running with sleep sigma: %d", sleep_sigma);
      Info("test_tmpi", " running with seed: %u", gRandom->GetSeed());
   }

   // print filename for each collector Rank
   if (newfile->IsCollector()) {
      Info("Collector", "[%d]\troot output filename = %s", newfile->GetMPIGlobalRank(), smpifname.str().c_str());
   }

   // This if statement splits the run-time functionality of
   // workers and collectors.
   if (newfile->IsCollector()) {
      // Run by collector ranks
      // This will run until all workers have exited
      newfile->RunCollector();
   } else {
      // Run by worker ranks
      // these ranks generate data to be written to TMPIFile

      // create a TTree to store event data
      TTree *tree = new TTree(treename.c_str(), "Event example with Jets");
      // set the AutoFlush rate to be the same as the sync_rate
      // this synchronizes the TTree branch compression
      tree->SetAutoFlush(sync_rate);

      // Create our fake event data generator
      JetEvent *event = new JetEvent;

      // add our branch to the TTree
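      // Branch arguments: branch name, class name, address of the object
      // pointer, buffer size in bytes (8000), and split level (2)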
      tree->Branch(branchname.c_str(), "JetEvent", &event, 8000, 2);

      // monitor timing
      auto sync_start = std::chrono::high_resolution_clock::now();

      // generate the specified number of events
      for (int i = 0; i < events_per_rank; i++) {

         auto start = std::chrono::high_resolution_clock::now();
         // Generate one event
         event->Build(jetm, trackm, hitam, hitbm);

         auto evt_built = std::chrono::high_resolution_clock::now();
         double build_time = std::chrono::duration_cast<std::chrono::duration<double>>(evt_built - start).count();

         Info("Rank", "[%d] [%d]\tevt = %d;\tbuild_time = %f", newfile->GetMPIColor(), newfile->GetMPILocalRank(), i,
              build_time);

         // if our build time was significant, subtract that from the sleep time
         auto adjusted_sleep = (int)(sleep_mean - build_time);
         auto sleep = std::abs(gRandom->Gaus(adjusted_sleep, sleep_sigma));

         // simulate the time taken by more complicated event generation
         std::this_thread::sleep_for(std::chrono::seconds(int(sleep)));

         // Fill the tree
         tree->Fill();

         // every sync_rate events, call the TMPIFile::Sync() function
         // to trigger MPI collection of local data
         if ((i + 1) % sync_rate == 0) {
            // call TMPIFile::Sync()
            newfile->Sync();

            auto end = std::chrono::high_resolution_clock::now();
            double sync_time = std::chrono::duration_cast<std::chrono::duration<double>>(end - sync_start).count();
            Info("Rank", "[%d] [%d]\tevent collection time: %f", newfile->GetMPIColor(), newfile->GetMPILocalRank(),
                 sync_time);
            sync_start = std::chrono::high_resolution_clock::now();
         }
      }

      // synchronize any leftover events
      if (events_per_rank % sync_rate != 0) {
         newfile->Sync();
      }
   }

   // call Close on the file for clean exit.
   Info("Rank", "[%d] [%d]\tclosing file", newfile->GetMPIColor(), newfile->GetMPILocalRank());
   newfile->Close();

   // open file and test contents
   if (newfile->GetMPILocalRank() == 0) {
      TString filename = newfile->GetMPIFilename();
      Info("Rank", "[%d] [%d]\topening file: %s", newfile->GetMPIColor(), newfile->GetMPILocalRank(), filename.Data());
      TFile file(filename.Data());
      if (file.IsOpen()) {
         file.ls();
         TTree *tree = (TTree *)file.Get(treename.c_str());
         if (tree) {
            tree->Print();

            Info("Rank", "[%d] [%d]\tfile should have %d events and has %lld", newfile->GetMPIColor(),
                 newfile->GetMPILocalRank(), (newfile->GetMPILocalSize() - 1) * events_per_rank, tree->GetEntries());
         }
      }
   }
}

void testTMPIFile(Bool_t secRun)
{
   auto start = std::chrono::high_resolution_clock::now();

   test_tmpi();

   auto end = std::chrono::high_resolution_clock::now();
   double time = std::chrono::duration_cast<std::chrono::duration<double>>(end - start).count();
   std::string msg = "Total elapsed time: ";
   msg += std::to_string(time);
   Info("testTMPIFile", "%s", msg.c_str());
   Info("testTMPIFile", "exiting");
}

#else

void testTMPIFile()
{
   Int_t flag;
   MPI_Initialized(&flag);
   if (!flag) {
      MPI_Init(NULL, NULL);
   }

   // Get rank and size
   Int_t rank, size;
   MPI_Comm_rank(MPI_COMM_WORLD, &rank);
   MPI_Comm_size(MPI_COMM_WORLD, &size);

   // Process 0 generates the JetEvent library
   if (rank == 0) {
      TString tutdir = gROOT->GetTutorialDir();
      gSystem->Exec("cp " + tutdir + "/tree/JetEvent* .");
      gROOT->ProcessLine(".L JetEvent.cxx+");
   }
   // Wait until it's done
   MPI_Barrier(MPI_COMM_WORLD);

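   // Define TMPI_SECOND_RUN and re-include this macro so that the code in the
   // #ifdef TMPI_SECOND_RUN branch above is compiled, then run the actual test.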
   gROOT->ProcessLine("#define TMPI_SECOND_RUN yes");
   gROOT->ProcessLine("#include \"" __FILE__ "\"");
   gROOT->ProcessLine("testTMPIFile(true)");

   // TMPIFile will do MPI_Finalize() when closing the file
   Int_t finalized = 0;
   MPI_Finalized(&finalized);
   if (!finalized) {
      MPI_Finalize();
   }
}

#endif