/// Default constructor.
/// Default-constructs the TMPWorker base, the file-name list and the tree name; sets the tree,
/// file and entry-list pointers to null and the first entry to 0.
/// Tree-cache members: no cache object (0), learning flag kFALSE, cache usage kTRUE, size -1.
/// A negative fCacheSize is later replaced by tree->GetCacheSize() in SetupTreeCache.
46 TMPWorkerTree::TMPWorkerTree()
47 : TMPWorker(), fFileNames(), fTreeName(), fTree(nullptr), fFile(nullptr), fEntryList(nullptr), fFirstEntry(0),
48 fTreeCache(0), fTreeCacheIsLearning(kFALSE), fUseTreeCache(kTRUE), fCacheSize(-1)
/// Constructor for a worker that reads its tree from a list of files.
/// \param fileNames  names of the files to process; copied into fFileNames
/// \param entries    entry list pointer stored in fEntryList (may be null; users check it)
/// \param treeName   name of the tree to look up in each file; copied into fTreeName
///                   (RetrieveTree has a separate path for an empty name)
/// \param nWorkers   forwarded to the TMPWorker base constructor
/// \param maxEntries forwarded to the TMPWorker base constructor
/// \param firstEntry stored in fFirstEntry
/// fTree and fFile start as null; the tree cache starts unset, not learning, enabled.
/// NOTE(review): the initializer list continues beyond the last line shown in this excerpt.
53 TMPWorkerTree::TMPWorkerTree(
const std::vector<std::string> &fileNames, TEntryList *entries,
54 const std::string &treeName, UInt_t nWorkers, ULong64_t maxEntries, ULong64_t firstEntry)
55 : TMPWorker(nWorkers, maxEntries), fFileNames(fileNames), fTreeName(treeName), fTree(nullptr), fFile(nullptr),
56 fEntryList(entries), fFirstEntry(firstEntry), fTreeCache(0), fTreeCacheIsLearning(kFALSE), fUseTreeCache(kTRUE),
/// Constructor for a worker that processes an already available TTree.
/// \param tree       tree pointer stored in fTree
/// \param entries    entry list pointer stored in fEntryList
/// \param nWorkers   forwarded to the TMPWorker base constructor
/// \param maxEntries forwarded to the TMPWorker base constructor
/// fFileNames and fTreeName are not listed in the initializer list, so they are
/// default-initialized; fFile starts as null and fCacheSize as -1.
/// NOTE(review): the end of the parameter list is not visible in this excerpt; `firstEntry`,
/// used to initialize fFirstEntry, is presumably declared there — confirm.
62 TMPWorkerTree::TMPWorkerTree(TTree *tree, TEntryList *entries, UInt_t nWorkers, ULong64_t maxEntries,
64 : TMPWorker(nWorkers, maxEntries), fTree(tree), fFile(nullptr), fEntryList(entries), fFirstEntry(firstEntry),
65 fTreeCache(0), fTreeCacheIsLearning(kFALSE), fUseTreeCache(kTRUE), fCacheSize(-1)
/// Destructor.
/// NOTE(review): the body is not visible in this excerpt; presumably it releases the currently
/// open file (see CloseFile) — TODO confirm.
70 TMPWorkerTree::~TMPWorkerTree()
/// Reads the tree-cache settings from the gEnv configuration.
///  - "MultiProc.UseTreeCache" (default 1): any value other than 1 sets fUseTreeCache to kFALSE.
///  - "MultiProc.CacheSize" (default -1): stored in fCacheSize.
78 void TMPWorkerTree::Setup()
// Cache usage is only ever switched off here; fUseTreeCache is not set back to kTRUE.
80 Int_t uc = gEnv->GetValue(
"MultiProc.UseTreeCache", 1);
81 if (uc != 1) fUseTreeCache = kFALSE;
// Requested cache size; a negative value is replaced in SetupTreeCache by tree->GetCacheSize().
82 fCacheSize = gEnv->GetValue(
"MultiProc.CacheSize", -1);
/// Releases the file currently held in fFile (presumably; only one statement of the body is
/// visible in this excerpt).
88 void TMPWorkerTree::CloseFile()
// Unregister the read cache associated with fTree from the file by passing a null cache.
// NOTE(review): fFile is dereferenced here; a guard against a null fFile, if there is one, is
// on a line not shown in this excerpt — confirm before relying on it.
92 if (fTree) fFile->SetCacheRead(0, fTree);
/// Opens the file `fileName` with TFile::Open.
/// \param fileName name (path or URL accepted by TFile::Open) of the file to open
/// On failure (null pointer or zombie file) the message "could not open file <fileName>" is
/// sent through SendError with MPCode::kProcError.
/// NOTE(review): the return statements are not visible in this excerpt; LoadTree compares the
/// result against nullptr, so a null return on failure is presumably intended — confirm.
101 TFile *TMPWorkerTree::OpenFile(
const std::string& fileName)
104 TFile *fp = TFile::Open(fileName.c_str());
// Both a null result and a zombie TFile are treated as an open failure.
105 if (fp ==
nullptr || fp->IsZombie()) {
// Compose the error text and report it.
106 std::stringstream ss;
107 ss <<
"could not open file " << fileName;
108 std::string errmsg = ss.str();
109 SendError(errmsg, MPCode::kProcError);
/// Looks up the tree to process in the open file `fp`.
/// \param fp file to search; it is dereferenced without a visible null check
/// If fTreeName is empty, the file's key list is scanned for keys whose class name is exactly
/// "TTree" or "TNtuple" and the matching object is read with fp->Get(). Otherwise the object
/// named fTreeName is read directly.
/// If no tree was obtained, "cannot find tree with name <fTreeName> in file <file name>" is sent
/// through SendError with MPCode::kProcError.
/// NOTE(review): no `break` is visible in the key loop, so if several keys match, the last one
/// may be the one kept — confirm against the full source. The return statement is not visible.
119 TTree *TMPWorkerTree::RetrieveTree(TFile *fp)
123 TTree *tree =
nullptr;
// Empty tree name: search the file for a tree instead of reading one by name.
124 if(fTreeName ==
"") {
// Guard against a null key list before iterating over it.
127 if (fp->GetListOfKeys()) {
128 for(
auto k : *fp->GetListOfKeys()) {
// The list elements are cast to TKey without a runtime type check.
129 TKey *key =
static_cast<TKey*
>(k);
// Match on the stored class name; only the two exact names below are accepted.
130 if (!strcmp(key->GetClassName(),
"TTree") || !strcmp(key->GetClassName(),
"TNtuple"))
131 tree =
static_cast<TTree*
>(fp->Get(key->GetName()));
// Read the object named fTreeName; the result is cast to TTree* without a runtime type check.
135 tree =
static_cast<TTree*
>(fp->Get(fTreeName.c_str()));
// Nothing found: report the error to the client.
137 if (tree ==
nullptr) {
138 std::stringstream ss;
139 ss <<
"cannot find tree with name " << fTreeName <<
" in file " << fp->GetName();
140 std::string errmsg = ss.str();
141 SendError(errmsg, MPCode::kProcError);
/// Configures the read cache used for `tree`.
/// \param tree tree whose cache is set up; it is dereferenced without a visible null check
/// NOTE(review): the conditions that select between the statements below (use of the cache,
/// presence of a current file, first-time versus repeated setup) are on lines not shown in this
/// excerpt; each comment describes only the visible statement that follows it.
151 void TMPWorkerTree::SetupTreeCache(TTree *tree)
// File the tree is currently attached to; its read cache is queried and set below.
154 TFile *curfile = tree->GetCurrentFile();
// Apply the configured cache size to the tree, then fetch the tree's read cache from the file
// and keep it in fTreeCache.
157 tree->SetCacheSize(fCacheSize);
158 fTreeCache = (TTreeCache *)curfile->GetCacheRead(tree);
// A negative configured size is replaced by the size the tree reports.
159 if (fCacheSize < 0) fCacheSize = tree->GetCacheSize();
// Point the kept cache at this tree's branches, reset it, and register it on the file as the
// tree's read cache.
161 fTreeCache->UpdateBranches(tree);
162 fTreeCache->ResetCache();
163 curfile->SetCacheRead(fTreeCache, tree);
// Record the value reported by the cache's IsLearning().
166 fTreeCacheIsLearning = fTreeCache->IsLearning();
// Presumably reached when the tree has no current file (curfile is null) — confirm.
169 Warning(
"SetupTreeCache",
"default tree does not have a file attached: corruption? Tree cache untouched");
// Cache size 0; presumably the path taken when fUseTreeCache is false — confirm.
173 tree->SetCacheSize(0);
/// Initializes the worker, then converts the global entry limit into this worker's share.
/// \param fd      forwarded to TMPWorker::Init
/// \param workerN forwarded to TMPWorker::Init
180 void TMPWorkerTree::Init(Int_t fd, UInt_t workerN)
// The base Init runs first; EvalMaxEntries calls GetNWorker(), which presumably depends on it.
183 TMPWorker::Init(fd, workerN);
// fMaxNEntries is overwritten with the value EvalMaxEntries computes from it.
184 fMaxNEntries = EvalMaxEntries(fMaxNEntries);
/// Computes this worker's share of a global maximum number of entries.
/// \param maxEntries global limit to split among fNWorkers workers
/// \return maxEntries/fNWorkers (integer division) for workers whose index GetNWorker() is below
///         fNWorkers-1; otherwise maxEntries minus (fNWorkers-1) such shares, i.e. the quotient
///         plus the remainder of the division.
/// NOTE(review): the lines between the signature and the first `if` are not visible in this
/// excerpt; there may be an early return for special values of maxEntries — confirm.
190 ULong64_t TMPWorkerTree::EvalMaxEntries(ULong64_t maxEntries)
195 if(GetNWorker() < fNWorkers-1)
196 return maxEntries/fNWorkers;
// Remaining case: take whatever is left after the equal shares.
198 return maxEntries - (fNWorkers-1)*(maxEntries/fNWorkers);
/// Dispatches a received message according to its code (msg.first).
/// \param msg code/buffer pair received from the client
/// kProcRange, kProcFile and kProcTree share one branch and kSendResult has its own. The
/// remaining visible statements build the reply "S<worker number>: unknown code received: <code>"
/// and send it with MPCode::kError.
/// NOTE(review): the bodies of the first two branches and the line introducing the error path
/// are not visible in this excerpt; presumably they call Process(code, msg) and SendResult(),
/// and the error reply sits in a final `else` — confirm.
204 void TMPWorkerTree::HandleInput(MPCodeBufPair& msg)
206 UInt_t code = msg.first;
// The three processing requests are handled by the same branch.
208 if (code == MPCode::kProcRange
209 || code == MPCode::kProcFile
210 || code == MPCode::kProcTree) {
213 }
else if (code == MPCode::kSendResult) {
// Error reply identifying this worker and the code that was not recognized.
218 std::string reply =
"S" + std::to_string(GetNWorker());
219 reply +=
": unknown code received: " + std::to_string(code);
220 MPSend(GetSocket(), MPCode::kError, reply.c_str());
/// Finalizes the selector on the worker and sends its output to the client.
229 void TMPWorkerTreeSel::SendResult()
// SlaveTerminate() is called before the output list is read and sent.
232 fSelector.SlaveTerminate();
// The selector's output list is sent on the worker socket with code MPCode::kProcResult.
233 MPSend(GetSocket(), MPCode::kProcResult, fSelector.GetOutputList());
/// Runs the selector over the entry range selected for this request.
/// \param code request code, forwarded to LoadTree
/// \param msg  request message, forwarded to LoadTree
/// NOTE(review): the declarations of start, finish, enl and errmsg, the body of the LoadTree
/// failure branch, and any conditions around SlaveBegin/Init are on lines not shown in this
/// excerpt; the comments describe only the visible statements.
237 void TMPWorkerTreeSel::Process(UInt_t code, MPCodeBufPair &msg)
// LoadTree fills start, finish and enl (and errmsg on failure); non-zero means failure.
246 if (LoadTree(code, msg, start, finish, &enl, errmsg) != 0) {
// Selector start-up on the worker, called with a null argument.
252 fSelector.SlaveBegin(
nullptr);
// Attach the selector to the tree held in fTree.
256 fSelector.Init(fTree);
// Process the half-open range [start, finish). When an entry list is present, the loop index is
// a position in that list and is translated to a tree entry number by enl->GetEntry().
258 for (Long64_t entry = start; entry < finish; ++entry) {
259 Long64_t e = (enl) ? enl->GetEntry(entry) : entry;
260 fSelector.Process(e);
// Count the whole range as processed; LoadTree compares this counter with fMaxNEntries.
264 fProcessedEntries += finish - start;
// Send MPCode::kIdling on the worker socket; presumably signals readiness for more work.
266 MPSend(GetSocket(), MPCode::kIdling);
273 Int_t TMPWorkerTree::LoadTree(UInt_t code, MPCodeBufPair &msg, Long64_t &start, Long64_t &finish, TEntryList **enl,
284 UInt_t nProcessed = 0;
285 Bool_t setupcache =
true;
287 std::string mgroot =
"[S" + std::to_string(GetNWorker()) +
"]: ";
290 if (code == MPCode::kProcTree) {
292 mgroot +=
"MPCode::kProcTree: ";
295 if(fTree ==
nullptr) {
296 errmsg = mgroot + std::string(
"tree undefined!");
301 nProcessed = ReadBuffer<UInt_t>(msg.second.get());
306 Long64_t nEntries = fTree->GetEntries();
307 UInt_t nBunch = nEntries / fNWorkers;
308 UInt_t rangeN = nProcessed % fNWorkers;
309 start = rangeN * nBunch;
310 if (rangeN < (fNWorkers - 1)) {
311 finish = (rangeN+1)*nBunch;
319 if (fTree->GetCurrentFile()) {
321 if ((fFile = TFile::Open(fTree->GetCurrentFile()->GetName())) && !fFile->IsZombie()) {
322 if (!(tree = (TTree *) fFile->Get(fTree->GetName()))) {
323 errmsg = mgroot + std::string(
"unable to retrieve tree from open file ") +
324 std::string(fTree->GetCurrentFile()->GetName());
331 errmsg = mgroot + std::string(
"unable to open file ") + std::string(fTree->GetCurrentFile()->GetName());
332 if (fFile && fFile->IsZombie())
delete fFile;
339 if (code == MPCode::kProcRange) {
340 mgroot +=
"MPCode::kProcRange: ";
342 nProcessed = ReadBuffer<UInt_t>(msg.second.get());
344 fileN = nProcessed / fNWorkers;
345 }
else if (code == MPCode::kProcFile) {
346 mgroot +=
"MPCode::kProcFile: ";
348 fileN = ReadBuffer<UInt_t>(msg.second.get());
350 errmsg +=
"MPCode undefined!";
355 if (fFile && strcmp(fFileNames[fileN].c_str(), fFile->GetName())) CloseFile();
357 fFile = OpenFile(fFileNames[fileN]);
358 if (fFile ==
nullptr) {
360 errmsg = mgroot + std::string(
"unable to open file ") + fFileNames[fileN];
367 tree = RetrieveTree(fFile);
368 if (tree ==
nullptr) {
370 errmsg = mgroot + std::string(
"unable to retrieve tree from open file ") + fFileNames[fileN];
375 setupcache = (tree != fTree) ?
true :
false;
381 if (code == MPCode::kProcRange) {
384 Long64_t nEntries = tree->GetEntries();
385 UInt_t nBunch = nEntries / fNWorkers;
386 if(nEntries % fNWorkers) nBunch++;
387 UInt_t rangeN = nProcessed % fNWorkers;
388 start = rangeN * nBunch;
389 if(rangeN < (fNWorkers-1))
390 finish = (rangeN+1)*nBunch;
395 finish = tree->GetEntries();
400 if (setupcache) SetupTreeCache(fTree);
403 if (fEntryList && enl) {
404 if ((*enl = fEntryList->GetEntryList(fTree->GetName(), TUrl(fFile->GetName()).GetFile()))) {
406 if (code == MPCode::kProcRange) {
409 ULong64_t nEntries = (*enl)->GetN();
410 UInt_t nBunch = nEntries / fNWorkers;
411 if (nEntries % fNWorkers) nBunch++;
412 UInt_t rangeN = nProcessed % fNWorkers;
413 start = rangeN * nBunch;
414 if (rangeN < (fNWorkers - 1))
415 finish = (rangeN + 1) * nBunch;
420 finish = (*enl)->GetN();
423 Warning(
"LoadTree",
"failed to get entry list for: %s %s", fTree->GetName(), TUrl(fFile->GetName()).GetFile());
430 if (fProcessedEntries + finish - start > fMaxNEntries)
431 finish = start + fMaxNEntries - fProcessedEntries;
433 if (gDebug > 0 && fFile)
434 Info(
"LoadTree",
"%s %d %d file: %s %lld %lld", mgroot.c_str(), nProcessed, fileN, fFile->GetName(), start,