Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TTreeCacheUnzip.cxx
Go to the documentation of this file.
1 // Authors: Rene Brun 04/06/2006
2 // Leandro Franco 10/04/2008
3 // Fabrizio Furano (CERN) Aug 2009
4 
5 /*************************************************************************
6  * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
7  * All rights reserved. *
8  * *
9  * For the licensing terms see $ROOTSYS/LICENSE. *
10  * For the list of contributors see $ROOTSYS/README/CREDITS. *
11  *************************************************************************/
12 
13 /**
14 \class TTreeCacheUnzip
15 \ingroup tree
16 
17 A TTreeCache which exploits parallelized decompression of its own content.
18 
19 */
20 
21 #include "TTreeCacheUnzip.h"
22 #include "TBranch.h"
23 #include "TChain.h"
24 #include "TEnv.h"
25 #include "TEventList.h"
26 #include "TFile.h"
27 #include "TMath.h"
28 #include "TMutex.h"
29 #include "ROOT/RMakeUnique.hxx"
30 
31 #ifdef R__USE_IMT
32 #include "ROOT/TThreadExecutor.hxx"
33 #include "ROOT/TTaskGroup.hxx"
34 #endif
35 
36 extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
37 extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
38 
39 TTreeCacheUnzip::EParUnzipMode TTreeCacheUnzip::fgParallel = TTreeCacheUnzip::kDisable;
40 
41 // The unzip cache does not consume memory by itself, it just allocates in advance
42 // mem blocks which are then picked as they are by the baskets.
43 // Hence there is no good reason to limit it too much
44 Double_t TTreeCacheUnzip::fgRelBuffSize = .5;
45 
46 ClassImp(TTreeCacheUnzip);
47 
48 ////////////////////////////////////////////////////////////////////////////////
49 /// Clear all baskets' state arrays.
50 
51 void TTreeCacheUnzip::UnzipState::Clear(Int_t size) {
52  for (Int_t i = 0; i < size; i++) {
53  if (!fUnzipLen.empty()) fUnzipLen[i] = 0;
54  if (fUnzipChunks) {
55  if (fUnzipChunks[i]) fUnzipChunks[i].reset();
56  }
57  if (fUnzipStatus) fUnzipStatus[i].store(0);
58  }
59 }
60 
61 ////////////////////////////////////////////////////////////////////////////////
62 
63 Bool_t TTreeCacheUnzip::UnzipState::IsUntouched(Int_t index) const {
64  return fUnzipStatus[index].load() == kUntouched;
65 }
66 
67 ////////////////////////////////////////////////////////////////////////////////
68 
69 Bool_t TTreeCacheUnzip::UnzipState::IsProgress(Int_t index) const {
70  return fUnzipStatus[index].load() == kProgress;
71 }
72 
73 ////////////////////////////////////////////////////////////////////////////////
74 
75 Bool_t TTreeCacheUnzip::UnzipState::IsFinished(Int_t index) const {
76  return fUnzipStatus[index].load() == kFinished;
77 }
78 
79 ////////////////////////////////////////////////////////////////////////////////
80 /// Check if the basket is unzipped already. We must make sure the length in
81 /// fUnzipLen is larger than 0.
82 
83 Bool_t TTreeCacheUnzip::UnzipState::IsUnzipped(Int_t index) const {
84  return (fUnzipStatus[index].load() == kFinished) && (fUnzipChunks[index].get()) && (fUnzipLen[index] > 0);
85 }
86 
87 ////////////////////////////////////////////////////////////////////////////////
88 /// Reset all baskets' state arrays. This function is only called by main
89 /// thread and parallel processing from upper layers should be disabled such
90 /// as IMT in TTree::GetEntry(). Other threads should not call this function
91 /// since it is not thread-safe.
92 
93 void TTreeCacheUnzip::UnzipState::Reset(Int_t oldSize, Int_t newSize) {
94  std::vector<Int_t> aUnzipLen = std::vector<Int_t>(newSize, 0);
95  std::unique_ptr<char[]> *aUnzipChunks = new std::unique_ptr<char[]>[newSize];
96  std::atomic<Byte_t> *aUnzipStatus = new std::atomic<Byte_t>[newSize];
97 
98  for (Int_t i = 0; i < newSize; ++i)
99  aUnzipStatus[i].store(0);
100 
101  for (Int_t i = 0; i < oldSize; i++) {
102  aUnzipLen[i] = fUnzipLen[i];
103  aUnzipChunks[i] = std::move(fUnzipChunks[i]);
104  aUnzipStatus[i].store(fUnzipStatus[i].load());
105  }
106 
107  if (fUnzipChunks) delete [] fUnzipChunks;
108  if (fUnzipStatus) delete [] fUnzipStatus;
109 
110  fUnzipLen = aUnzipLen;
111  fUnzipChunks = aUnzipChunks;
112  fUnzipStatus = aUnzipStatus;
113 }
114 
115 ////////////////////////////////////////////////////////////////////////////////
116 /// Set cache as finished.
117 /// There are three scenarios that a basket is set as finished:
118 /// 1. The basket has already been unzipped.
119 /// 2. The thread is aborted from unzipping process.
120 /// 3. To avoid other tasks/threads work on this basket,
121 /// main thread marks the basket as finished and designates itself
122 /// to unzip this basket.
123 
124 void TTreeCacheUnzip::UnzipState::SetFinished(Int_t index) {
125  fUnzipLen[index] = 0;
126  fUnzipChunks[index].reset();
127  fUnzipStatus[index].store((Byte_t)kFinished);
128 }
129 
130 ////////////////////////////////////////////////////////////////////////////////
131 
132 void TTreeCacheUnzip::UnzipState::SetMissed(Int_t index) {
133  fUnzipChunks[index].reset();
134  fUnzipStatus[index].store((Byte_t)kFinished);
135 }
136 
137 ////////////////////////////////////////////////////////////////////////////////
138 
139 void TTreeCacheUnzip::UnzipState::SetUnzipped(Int_t index, char* buf, Int_t len) {
140  // Update status array at the very end because we need to be synchronous with the main thread.
141  fUnzipLen[index] = len;
142  fUnzipChunks[index].reset(buf);
143  fUnzipStatus[index].store((Byte_t)kFinished);
144 }
145 
146 ////////////////////////////////////////////////////////////////////////////////
147 /// Start unzipping the basket if it is untouched yet.
148 
149 Bool_t TTreeCacheUnzip::UnzipState::TryUnzipping(Int_t index) {
150  Byte_t oldValue = kUntouched;
151  Byte_t newValue = kProgress;
152  return fUnzipStatus[index].compare_exchange_weak(oldValue, newValue, std::memory_order_release, std::memory_order_relaxed);
153 }
154 
155 ////////////////////////////////////////////////////////////////////////////////
156 
157 TTreeCacheUnzip::TTreeCacheUnzip() : TTreeCache(),
158  fAsyncReading(kFALSE),
159  fEmpty(kTRUE),
160  fCycle(0),
161  fNseekMax(0),
162  fUnzipGroupSize(0),
163  fUnzipBufferSize(0),
164  fNFound(0),
165  fNMissed(0),
166  fNStalls(0),
167  fNUnzip(0)
168 {
169  // Default Constructor.
170  Init();
171 }
172 
173 ////////////////////////////////////////////////////////////////////////////////
174 /// Constructor.
175 
176 TTreeCacheUnzip::TTreeCacheUnzip(TTree *tree, Int_t buffersize) : TTreeCache(tree,buffersize),
177  fAsyncReading(kFALSE),
178  fEmpty(kTRUE),
179  fCycle(0),
180  fNseekMax(0),
181  fUnzipGroupSize(0),
182  fUnzipBufferSize(0),
183  fNFound(0),
184  fNMissed(0),
185  fNStalls(0),
186  fNUnzip(0)
187 {
188  Init();
189 }
190 
191 ////////////////////////////////////////////////////////////////////////////////
192 /// Initialization procedure common to all the constructors.
193 
194 void TTreeCacheUnzip::Init()
195 {
196 #ifdef R__USE_IMT
197  fUnzipTaskGroup.reset();
198 #endif
199  fIOMutex = std::make_unique<TMutex>(kTRUE);
200 
201  fCompBuffer = new char[16384];
202  fCompBufferSize = 16384;
203 
204  fUnzipGroupSize = 102400; // Each task unzips at least 100 KB
205 
206  if (fgParallel == kDisable) {
207  fParallel = kFALSE;
208  }
209  else if(fgParallel == kEnable || fgParallel == kForce) {
210  fUnzipBufferSize = Long64_t(fgRelBuffSize * GetBufferSize());
211 
212  if(gDebug > 0)
213  Info("TTreeCacheUnzip", "Enabling Parallel Unzipping");
214 
215  fParallel = kTRUE;
216 
217  }
218  else {
219  Warning("TTreeCacheUnzip", "Parallel Option unknown");
220  }
221 
222  // Check if asynchronous reading is supported by this TFile specialization
223  if (gEnv->GetValue("TFile.AsyncReading", 1)) {
224  if (fFile && !(fFile->ReadBufferAsync(0, 0)))
225  fAsyncReading = kTRUE;
226  }
227 
228 }
229 
230 ////////////////////////////////////////////////////////////////////////////////
231 /// Destructor. (in general called by the TFile destructor)
232 
233 TTreeCacheUnzip::~TTreeCacheUnzip()
234 {
235  ResetCache();
236  fUnzipState.Clear(fNseekMax);
237 }
238 
239 ////////////////////////////////////////////////////////////////////////////////
240 /// Add a branch to the list of branches to be stored in the cache
241 /// this function is called by TBranch::GetBasket
242 /// Returns:
243 /// - 0 branch added or already included
244 /// - -1 on error
245 
246 Int_t TTreeCacheUnzip::AddBranch(TBranch *b, Bool_t subbranches /*= kFALSE*/)
247 {
248  return TTreeCache::AddBranch(b, subbranches);
249 }
250 
251 ////////////////////////////////////////////////////////////////////////////////
252 /// Add a branch to the list of branches to be stored in the cache
253 /// this function is called by TBranch::GetBasket
254 /// Returns:
255 /// - 0 branch added or already included
256 /// - -1 on error
257 
258 Int_t TTreeCacheUnzip::AddBranch(const char *branch, Bool_t subbranches /*= kFALSE*/)
259 {
260  return TTreeCache::AddBranch(branch, subbranches);
261 }
262 
263 ////////////////////////////////////////////////////////////////////////////////
264 
265 Bool_t TTreeCacheUnzip::FillBuffer()
266 {
267 
268  if (fNbranches <= 0) return kFALSE;
269 
270  // Fill the cache buffer with the branches in the cache.
271  fIsTransferred = kFALSE;
272 
273  TTree *tree = ((TBranch*)fBranches->UncheckedAt(0))->GetTree();
274  Long64_t entry = tree->GetReadEntry();
275 
276  // If the entry is in the range we previously prefetched, there is
277  // no point in retrying. Note that this will also return false
278  // during the training phase (fEntryNext is then set intentional to
279  // the end of the training phase).
280  if (fEntryCurrent <= entry && entry < fEntryNext) return kFALSE;
281 
282  // Triggered by the user, not the learning phase
283  if (entry == -1) entry = 0;
284 
285  TTree::TClusterIterator clusterIter = tree->GetClusterIterator(entry);
286  fEntryCurrent = clusterIter();
287  fEntryNext = clusterIter.GetNextEntry();
288 
289  if (fEntryCurrent < fEntryMin) fEntryCurrent = fEntryMin;
290  if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
291  if (fEntryNext > fEntryMax) fEntryNext = fEntryMax;
292 
293  // Check if owner has a TEventList set. If yes we optimize for this
294  // Special case reading only the baskets containing entries in the
295  // list.
296  TEventList *elist = fTree->GetEventList();
297  Long64_t chainOffset = 0;
298  if (elist) {
299  if (fTree->IsA() ==TChain::Class()) {
300  TChain *chain = (TChain*)fTree;
301  Int_t t = chain->GetTreeNumber();
302  chainOffset = chain->GetTreeOffset()[t];
303  }
304  }
305 
306  //clear cache buffer
307  TFileCacheRead::Prefetch(0,0);
308 
309  //store baskets
310  for (Int_t i = 0; i < fNbranches; i++) {
311  TBranch *b = (TBranch*)fBranches->UncheckedAt(i);
312  if (b->GetDirectory() == 0) continue;
313  if (b->GetDirectory()->GetFile() != fFile) continue;
314  Int_t nb = b->GetMaxBaskets();
315  Int_t *lbaskets = b->GetBasketBytes();
316  Long64_t *entries = b->GetBasketEntry();
317  if (!lbaskets || !entries) continue;
318  //we have found the branch. We now register all its baskets
319  //from the requested offset to the basket below fEntrymax
320  Int_t blistsize = b->GetListOfBaskets()->GetSize();
321  for (Int_t j=0;j<nb;j++) {
322  // This basket has already been read, skip it
323  if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j)) continue;
324 
325  Long64_t pos = b->GetBasketSeek(j);
326  Int_t len = lbaskets[j];
327  if (pos <= 0 || len <= 0) continue;
328  //important: do not try to read fEntryNext, otherwise you jump to the next autoflush
329  if (entries[j] >= fEntryNext) continue;
330  if (entries[j] < entry && (j < nb - 1 && entries[j+1] <= entry)) continue;
331  if (elist) {
332  Long64_t emax = fEntryMax;
333  if (j < nb - 1) emax = entries[j+1] - 1;
334  if (!elist->ContainsRange(entries[j] + chainOffset, emax + chainOffset)) continue;
335  }
336  fNReadPref++;
337 
338  TFileCacheRead::Prefetch(pos, len);
339  }
340  if (gDebug > 0) printf("Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n", entry, ((TBranch*)fBranches->UncheckedAt(i))->GetName(), fEntryNext, fNseek, fNtot);
341  }
342 
343  // Now fix the size of the status arrays
344  ResetCache();
345  fIsLearning = kFALSE;
346 
347  return kTRUE;
348 }
349 
350 ////////////////////////////////////////////////////////////////////////////////
351 /// Change the underlying buffer size of the cache.
352 /// Returns:
353 /// - 0 if the buffer content is still available
354 /// - 1 if some or all of the buffer content has been made unavailable
355 /// - -1 on error
356 
357 Int_t TTreeCacheUnzip::SetBufferSize(Int_t buffersize)
358 {
359  Int_t res = TTreeCache::SetBufferSize(buffersize);
360  if (res < 0) {
361  return res;
362  }
363  fUnzipBufferSize = Long64_t(fgRelBuffSize * GetBufferSize());
364  ResetCache();
365  return 1;
366 }
367 
368 ////////////////////////////////////////////////////////////////////////////////
369 /// Set the minimum and maximum entry number to be processed
370 /// this information helps to optimize the number of baskets to read
371 /// when prefetching the branch buffers.
372 
373 void TTreeCacheUnzip::SetEntryRange(Long64_t emin, Long64_t emax)
374 {
375  TTreeCache::SetEntryRange(emin, emax);
376 }
377 
378 ////////////////////////////////////////////////////////////////////////////////
379 /// It's the same as TTreeCache::StopLearningPhase but we guarantee that
380 /// we start the unzipping just after getting the buffers
381 
382 void TTreeCacheUnzip::StopLearningPhase()
383 {
384  TTreeCache::StopLearningPhase();
385 }
386 
387 ////////////////////////////////////////////////////////////////////////////////
388 ///update pointer to current Tree and recompute pointers to the branches in the cache
389 
390 void TTreeCacheUnzip::UpdateBranches(TTree *tree)
391 {
392  TTreeCache::UpdateBranches(tree);
393 }
394 
395 ////////////////////////////////////////////////////////////////////////////////
396 // //
397 // From now on we have the methods concerning the threading part of the cache //
398 // //
399 ////////////////////////////////////////////////////////////////////////////////
400 
401 ////////////////////////////////////////////////////////////////////////////////
402 /// Static function that returns the parallel option
403 /// (to indicate an additional thread)
404 
405 TTreeCacheUnzip::EParUnzipMode TTreeCacheUnzip::GetParallelUnzip()
406 {
407  return fgParallel;
408 }
409 
410 ////////////////////////////////////////////////////////////////////////////////
411 /// Static function that tells wether the multithreading unzipping is activated
412 
413 Bool_t TTreeCacheUnzip::IsParallelUnzip()
414 {
415  if (fgParallel == kEnable || fgParallel == kForce)
416  return kTRUE;
417 
418  return kFALSE;
419 }
420 
421 ////////////////////////////////////////////////////////////////////////////////
422 /// Static function that (de)activates multithreading unzipping
423 ///
424 /// The possible options are:
425 /// - kEnable _Enable_ it, which causes an automatic detection and launches the
426 /// additional thread if the number of cores in the machine is greater than
427 /// one
428 /// - kDisable _Disable_ will not activate the additional thread.
429 /// - kForce _Force_ will start the additional thread even if there is only one
430 /// core. the default will be taken as kEnable.
431 ///
432 /// Returns 0 if there was an error, 1 otherwise.
433 
434 Int_t TTreeCacheUnzip::SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option)
435 {
436  if(fgParallel == kEnable || fgParallel == kForce || fgParallel == kDisable) {
437  fgParallel = option;
438  return 1;
439  }
440  return 0;
441 }
442 
443 ////////////////////////////////////////////////////////////////////////////////
444 // //
445 // From now on we have the methods concerning the unzipping part of the cache //
446 // //
447 ////////////////////////////////////////////////////////////////////////////////
448 
449 ////////////////////////////////////////////////////////////////////////////////
450 /// Read the logical record header from the buffer buf.
451 /// That must be the pointer tho the header part not the object by itself and
452 /// must contain data of at least maxbytes
453 /// Returns nread;
454 ///
455 /// In output arguments:
456 ///
457 /// - nbytes : number of bytes in record
458 /// if negative, this is a deleted record
459 /// if 0, cannot read record, wrong value of argument first
460 /// - objlen : uncompressed object size
461 /// - keylen : length of logical record header
462 ///
463 /// Note that the arguments objlen and keylen are returned only
464 /// if maxbytes >=16
465 /// Note: This was adapted from TFile... so some things dont apply
466 
467 Int_t TTreeCacheUnzip::GetRecordHeader(char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
468 {
469  Version_t versionkey;
470  Short_t klen;
471  UInt_t datime;
472  Int_t nb = 0, olen;
473  Int_t nread = maxbytes;
474  frombuf(buf, &nb);
475  nbytes = nb;
476  if (nb < 0) return nread;
477  // const Int_t headerSize = Int_t(sizeof(nb) +sizeof(versionkey) +sizeof(olen) +sizeof(datime) +sizeof(klen));
478  const Int_t headerSize = 16;
479  if (nread < headerSize) return nread;
480  frombuf(buf, &versionkey);
481  frombuf(buf, &olen);
482  frombuf(buf, &datime);
483  frombuf(buf, &klen);
484  if (!olen) olen = nbytes - klen;
485  objlen = olen;
486  keylen = klen;
487  return nread;
488 }
489 
490 ////////////////////////////////////////////////////////////////////////////////
491 /// This will delete the list of buffers that are in the unzipping cache
492 /// and will reset certain values in the cache.
493 /// This name is ambiguos because the method doesn't reset the whole cache,
494 /// only the part related to the unzipping
495 /// Note: This method is completely different from TTreeCache::ResetCache(),
496 /// in that method we were cleaning the prefetching buffer while here we
497 /// delete the information about the unzipped buffers
498 
499 void TTreeCacheUnzip::ResetCache()
500 {
501  // Reset all the lists and wipe all the chunks
502  fCycle++;
503  fUnzipState.Clear(fNseekMax);
504 
505  if(fNseekMax < fNseek){
506  if (gDebug > 0)
507  Info("ResetCache", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
508 
509  fUnzipState.Reset(fNseekMax, fNseek);
510  fNseekMax = fNseek;
511  }
512  fEmpty = kTRUE;
513 }
514 
515 ////////////////////////////////////////////////////////////////////////////////
516 /// This inflates a basket in the cache.. passing the data to a new
517 /// buffer that will only wait there to be read...
518 /// This function is responsible to update corresponding elements in
519 /// fUnzipStatus, fUnzipChunks and fUnzipLen. Since we use atomic variables
520 /// in fUnzipStatus to exclusively unzip the basket, we must update
521 /// fUnzipStatus after fUnzipChunks and fUnzipLen and make sure fUnzipChunks
522 /// and fUnzipLen are ready before main thread fetch the data.
523 
524 Int_t TTreeCacheUnzip::UnzipCache(Int_t index)
525 {
526  Int_t myCycle;
527  const Int_t hlen = 128;
528  Int_t objlen = 0, keylen = 0;
529  Int_t nbytes = 0;
530  Int_t readbuf = 0;
531 
532  Long64_t rdoffs = 0;
533  Int_t rdlen = 0;
534 
535  // To synchronize with the 'paging'
536  myCycle = fCycle;
537  rdoffs = fSeek[index];
538  rdlen = fSeekLen[index];
539 
540  Int_t loc = -1;
541  if (!fNseek || fIsLearning) {
542  return 1;
543  }
544 
545  if ((myCycle != fCycle) || !fIsTransferred) {
546  fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
547  return 1;
548  }
549 
550  // Prepare a memory buffer of adequate size
551  char* locbuff = 0;
552  if (rdlen > 16384) {
553  locbuff = new char[rdlen];
554  } else if (rdlen * 3 < 16384) {
555  locbuff = new char[rdlen * 2];
556  } else {
557  locbuff = new char[16384];
558  }
559 
560  readbuf = ReadBufferExt(locbuff, rdoffs, rdlen, loc);
561 
562  if (readbuf <= 0) {
563  fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
564  if (locbuff) delete [] locbuff;
565  return -1;
566  }
567 
568  GetRecordHeader(locbuff, hlen, nbytes, objlen, keylen);
569 
570  Int_t len = (objlen > nbytes - keylen) ? keylen + objlen : nbytes;
571  // If the single unzipped chunk is really too big, reset it to not processable
572  // I.e. mark it as done but set the pointer to 0
573  // This block will be unzipped synchronously in the main thread
574  // TODO: ROOT internally breaks zipped buffers into 16MB blocks, we can probably still unzip in parallel.
575  if (len > 4 * fUnzipBufferSize) {
576  if (gDebug > 0)
577  Info("UnzipCache", "Block %d is too big, skipping.", index);
578 
579  fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
580  if (locbuff) delete [] locbuff;
581  return 0;
582  }
583 
584  // Unzip it into a new blk
585  char *ptr = 0;
586  Int_t loclen = UnzipBuffer(&ptr, locbuff);
587  if ((loclen > 0) && (loclen == objlen + keylen)) {
588  if ((myCycle != fCycle) || !fIsTransferred) {
589  fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
590  if (locbuff) delete [] locbuff;
591  return 1;
592  }
593  fUnzipState.SetUnzipped(index, ptr, loclen); // Set it as done
594  fNUnzip++;
595  } else {
596  fUnzipState.SetFinished(index); // Set it as not done, main thread will take charge
597  }
598 
599  if (locbuff) delete [] locbuff;
600  return 0;
601 }
602 
603 #ifdef R__USE_IMT
604 ////////////////////////////////////////////////////////////////////////////////
605 /// We create a TTaskGroup and asynchronously maps each group of baskets(> 100 kB in total)
606 /// to a task. In TTaskGroup, we use TThreadExecutor to do the actually work of unzipping
607 /// a group of basket. The purpose of creating TTaskGroup is to avoid competing with main thread.
608 
609 Int_t TTreeCacheUnzip::CreateTasks()
610 {
611  auto mapFunction = [&]() {
612  auto unzipFunction = [&](const std::vector<Int_t> &indices) {
613  // If cache is invalidated and we should return immediately.
614  if (!fIsTransferred) return nullptr;
615 
616  for (auto ii : indices) {
617  if(fUnzipState.TryUnzipping(ii)) {
618  Int_t res = UnzipCache(ii);
619  if(res)
620  if (gDebug > 0)
621  Info("UnzipCache", "Unzipping failed or cache is in learning state");
622  }
623  }
624  return nullptr;
625  };
626 
627  Int_t accusz = 0;
628  std::vector<std::vector<Int_t>> basketIndices;
629  std::vector<Int_t> indices;
630  if (fUnzipGroupSize <= 0) fUnzipGroupSize = 102400;
631  for (Int_t i = 0; i < fNseek; i++) {
632  while (accusz < fUnzipGroupSize) {
633  accusz += fSeekLen[i];
634  indices.push_back(i);
635  i++;
636  if (i >= fNseek) break;
637  }
638  if (i < fNseek) i--;
639  basketIndices.push_back(indices);
640  indices.clear();
641  accusz = 0;
642  }
643  ROOT::TThreadExecutor pool;
644  pool.Foreach(unzipFunction, basketIndices);
645  };
646 
647  fUnzipTaskGroup.reset(new ROOT::Experimental::TTaskGroup());
648  fUnzipTaskGroup->Run(mapFunction);
649 
650  return 0;
651 }
652 #endif
653 
654 ////////////////////////////////////////////////////////////////////////////////
655 /// We try to read a buffer that has already been unzipped
656 /// Returns -1 in case of read failure, 0 in case it's not in the
657 /// cache and n>0 in case read from cache (number of bytes copied).
658 /// pos and len are the original values as were passed to ReadBuffer
659 /// but instead we will return the inflated buffer.
660 /// Note!! : If *buf == 0 we will allocate the buffer and it will be the
661 /// responsability of the caller to free it... it is useful for example
662 /// to pass it to the creator of TBuffer
663 
664 Int_t TTreeCacheUnzip::GetUnzipBuffer(char **buf, Long64_t pos, Int_t len, Bool_t *free)
665 {
666  Int_t res = 0;
667  Int_t loc = -1;
668 
669  // We go straight to TTreeCache/TfileCacheRead, in order to get the info we need
670  // pointer to the original zipped chunk
671  // its index in the original unsorted offsets lists
672  //
673  // Actually there are situations in which copying the buffer is not
674  // useful. But the choice is among doing once more a small memcpy or a binary search in a large array. I prefer the former.
675  // Also, here we prefer not to trigger the (re)population of the chunks in the TFileCacheRead. That is
676  // better to be done in the main thread.
677 
678  Int_t myCycle = fCycle;
679 
680  if (fParallel && !fIsLearning) {
681 
682  if(fNseekMax < fNseek){
683  if (gDebug > 0)
684  Info("GetUnzipBuffer", "Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
685 
686  fUnzipState.Reset(fNseekMax, fNseek);
687  fNseekMax = fNseek;
688  }
689 
690  loc = (Int_t)TMath::BinarySearch(fNseek, fSeekSort, pos);
691  if ((fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc])) {
692 
693  // The buffer is, at minimum, in the file cache. We must know its index in the requests list
694  // In order to get its info
695  Int_t seekidx = fSeekIndex[loc];
696 
697  do {
698 
699  // If the block is ready we get it immediately.
700  // And also we don't have to alloc the blks. This is supposed to be
701  // the main thread of the app.
702  if (fUnzipState.IsUnzipped(seekidx)) {
703  if(!(*buf)) {
704  *buf = fUnzipState.fUnzipChunks[seekidx].get();
705  fUnzipState.fUnzipChunks[seekidx].release();
706  *free = kTRUE;
707  } else {
708  memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
709  fUnzipState.fUnzipChunks[seekidx].reset();
710  *free = kFALSE;
711  }
712 
713  fNFound++;
714  return fUnzipState.fUnzipLen[seekidx];
715  }
716 
717  // If the requested basket is being unzipped by a background task, we try to steal a blk to unzip.
718  Int_t reqi = -1;
719 
720  if (fUnzipState.IsProgress(seekidx)) {
721  if (fEmpty) {
722  for (Int_t ii = 0; ii < fNseek; ++ii) {
723  Int_t idx = (seekidx + 1 + ii) % fNseek;
724  if (fUnzipState.IsUntouched(idx)) {
725  if(fUnzipState.TryUnzipping(idx)) {
726  reqi = idx;
727  break;
728  }
729  }
730  }
731  if (reqi < 0) {
732  fEmpty = kFALSE;
733  } else {
734  UnzipCache(reqi);
735  }
736  }
737 
738  if ( myCycle != fCycle ) {
739  if (gDebug > 0)
740  Info("GetUnzipBuffer", "Sudden paging Break!!! fNseek: %d, fIsLearning:%d",
741  fNseek, fIsLearning);
742 
743  seekidx = -1;
744  break;
745  }
746  }
747 
748  } while (fUnzipState.IsProgress(seekidx));
749 
750  // Here the block is not pending. It could be done or aborted or not yet being processed.
751  if ( (seekidx >= 0) && (fUnzipState.IsUnzipped(seekidx)) ) {
752  if(!(*buf)) {
753  *buf = fUnzipState.fUnzipChunks[seekidx].get();
754  fUnzipState.fUnzipChunks[seekidx].release();
755  *free = kTRUE;
756  } else {
757  memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
758  fUnzipState.fUnzipChunks[seekidx].reset();
759  *free = kFALSE;
760  }
761 
762  fNStalls++;
763  return fUnzipState.fUnzipLen[seekidx];
764  } else {
765  // This is a complete miss. We want to avoid the background tasks
766  // to try unzipping this block in the future.
767  fUnzipState.SetMissed(seekidx);
768  }
769  } else {
770  loc = -1;
771  fIsTransferred = kFALSE;
772  }
773  }
774 
775  if (len > fCompBufferSize) {
776  if(fCompBuffer) delete [] fCompBuffer;
777  fCompBuffer = new char[len];
778  fCompBufferSize = len;
779  } else {
780  if (fCompBufferSize > len * 4) {
781  if(fCompBuffer) delete [] fCompBuffer;
782  fCompBuffer = new char[len*2];
783  fCompBufferSize = len * 2;
784  }
785  }
786 
787  res = 0;
788  if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
789  // Cache is invalidated and we need to wait for all unzipping tasks to befinished before fill new baskets in cache.
790 #ifdef R__USE_IMT
791  if(ROOT::IsImplicitMTEnabled() && fUnzipTaskGroup) {
792  fUnzipTaskGroup->Cancel();
793  fUnzipTaskGroup.reset();
794  }
795 #endif
796  {
797  // Fill new baskets into cache.
798  R__LOCKGUARD(fIOMutex.get());
799  fFile->Seek(pos);
800  res = fFile->ReadBuffer(fCompBuffer, len);
801  } // end of lock scope
802 #ifdef R__USE_IMT
803  if(ROOT::IsImplicitMTEnabled()) {
804  CreateTasks();
805  }
806 #endif
807  }
808 
809  if (res) res = -1;
810 
811  if (!res) {
812  res = UnzipBuffer(buf, fCompBuffer);
813  *free = kTRUE;
814  }
815 
816  if (!fIsLearning) {
817  fNMissed++;
818  }
819 
820  return res;
821 }
822 
823 ////////////////////////////////////////////////////////////////////////////////
824 /// static function: Sets the unzip relatibe buffer size
825 
826 void TTreeCacheUnzip::SetUnzipRelBufferSize(Float_t relbufferSize)
827 {
828  fgRelBuffSize = relbufferSize;
829 }
830 
831 ////////////////////////////////////////////////////////////////////////////////
832 /// Sets the size for the unzipping cache... by default it should be
833 /// two times the size of the prefetching cache
834 
835 void TTreeCacheUnzip::SetUnzipBufferSize(Long64_t bufferSize)
836 {
837  fUnzipBufferSize = bufferSize;
838 }
839 
840 ////////////////////////////////////////////////////////////////////////////////
841 /// Unzips a ROOT specific buffer... by reading the header at the beginning.
842 /// returns the size of the inflated buffer or -1 if error
843 /// Note!! : If *dest == 0 we will allocate the buffer and it will be the
844 /// responsability of the caller to free it... it is useful for example
845 /// to pass it to the creator of TBuffer
846 /// src is the original buffer with the record (header+compressed data)
847 /// *dest is the inflated buffer (including the header)
848 
849 Int_t TTreeCacheUnzip::UnzipBuffer(char **dest, char *src)
850 {
851  Int_t uzlen = 0;
852  Bool_t alloc = kFALSE;
853 
854  // Here we read the header of the buffer
855  const Int_t hlen = 128;
856  Int_t nbytes = 0, objlen = 0, keylen = 0;
857  GetRecordHeader(src, hlen, nbytes, objlen, keylen);
858 
859  if (!(*dest)) {
860  /* early consistency check */
861  UChar_t *bufcur = (UChar_t *) (src + keylen);
862  Int_t nin, nbuf;
863  if(objlen > nbytes - keylen && R__unzip_header(&nin, bufcur, &nbuf) != 0) {
864  Error("UnzipBuffer", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
865  uzlen = -1;
866  return uzlen;
867  }
868  Int_t l = keylen + objlen;
869  *dest = new char[l];
870  alloc = kTRUE;
871  }
872  // Must unzip the buffer
873  // fSeekPos[ind]; adress of zipped buffer
874  // fSeekLen[ind]; len of the zipped buffer
875  // &fBuffer[fSeekPos[ind]]; memory address
876 
877  // This is similar to TBasket::ReadBasketBuffers
878  Bool_t oldCase = objlen == nbytes - keylen
879  && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel() != 0
880  && fFile->GetVersion() <= 30401;
881 
882  if (objlen > nbytes-keylen || oldCase) {
883 
884  // Copy the key
885  memcpy(*dest, src, keylen);
886  uzlen += keylen;
887 
888  char *objbuf = *dest + keylen;
889  UChar_t *bufcur = (UChar_t *) (src + keylen);
890  Int_t nin, nbuf;
891  Int_t nout = 0;
892  Int_t noutot = 0;
893 
894  while (1) {
895  Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
896  if (hc != 0) break;
897  if (gDebug > 2)
898  Info("UnzipBuffer", " nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
899  nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
900  if (oldCase && (nin > objlen || nbuf > objlen)) {
901  if (gDebug > 2)
902  Info("UnzipBuffer", "oldcase objlen :%d ", objlen);
903 
904  //buffer was very likely not compressed in an old version
905  memcpy(*dest + keylen, src + keylen, objlen);
906  uzlen += objlen;
907  return uzlen;
908  }
909 
910  R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
911 
912  if (gDebug > 2)
913  Info("UnzipBuffer", "R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
914  nin, bufcur, nbuf, objbuf, nout);
915 
916  if (!nout) break;
917  noutot += nout;
918  if (noutot >= objlen) break;
919  bufcur += nin;
920  objbuf += nout;
921  }
922 
923  if (noutot != objlen) {
924  Error("UnzipBuffer", "nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
925  nbytes,keylen,objlen, noutot,nout,nin,nbuf);
926  uzlen = -1;
927  if(alloc) delete [] *dest;
928  *dest = 0;
929  return uzlen;
930  }
931  uzlen += objlen;
932  } else {
933  memcpy(*dest, src, keylen);
934  uzlen += keylen;
935  memcpy(*dest + keylen, src + keylen, objlen);
936  uzlen += objlen;
937  }
938  return uzlen;
939 }
940 
941 ////////////////////////////////////////////////////////////////////////////////
942 
943 void TTreeCacheUnzip::Print(Option_t* option) const {
944 
945  printf("******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
946  printf("Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
947  printf("Number of blocks unzipped by threads: %d\n", fNUnzip);
948  printf("Number of hits: %d\n", fNFound);
949  printf("Number of stalls: %d\n", fNStalls);
950  printf("Number of misses: %d\n", fNMissed);
951 
952  TTreeCache::Print(option);
953 }
954 
955 ////////////////////////////////////////////////////////////////////////////////
956 
957 Int_t TTreeCacheUnzip::ReadBufferExt(char *buf, Long64_t pos, Int_t len, Int_t &loc) {
958  R__LOCKGUARD(fIOMutex.get());
959  return TTreeCache::ReadBufferExt(buf, pos, len, loc);
960 }