// C entry points of the compression library; used by UnzipBuffer() to inspect
// a compressed block's header and to decompress it.
36 extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout,
char *bufout, Int_t *nout);
37 extern "C" int R__unzip_header(Int_t *nin, UChar_t *bufin, Int_t *lout);
// Global parallel-unzip mode shared by all caches; disabled by default.
39 TTreeCacheUnzip::EParUnzipMode TTreeCacheUnzip::fgParallel = TTreeCacheUnzip::kDisable;
// Size of the pending-unzip buffer budget as a fraction of the cache buffer size
// (fUnzipBufferSize = fgRelBuffSize * GetBufferSize()).
44 Double_t TTreeCacheUnzip::fgRelBuffSize = .5;
46 ClassImp(TTreeCacheUnzip);
51 void TTreeCacheUnzip::UnzipState::Clear(Int_t size) {
52 for (Int_t i = 0; i < size; i++) {
53 if (!fUnzipLen.empty()) fUnzipLen[i] = 0;
55 if (fUnzipChunks[i]) fUnzipChunks[i].reset();
57 if (fUnzipStatus) fUnzipStatus[i].store(0);
63 Bool_t TTreeCacheUnzip::UnzipState::IsUntouched(Int_t index)
const {
64 return fUnzipStatus[index].load() == kUntouched;
69 Bool_t TTreeCacheUnzip::UnzipState::IsProgress(Int_t index)
const {
70 return fUnzipStatus[index].load() == kProgress;
75 Bool_t TTreeCacheUnzip::UnzipState::IsFinished(Int_t index)
const {
76 return fUnzipStatus[index].load() == kFinished;
83 Bool_t TTreeCacheUnzip::UnzipState::IsUnzipped(Int_t index)
const {
84 return (fUnzipStatus[index].load() == kFinished) && (fUnzipChunks[index].get()) && (fUnzipLen[index] > 0);
93 void TTreeCacheUnzip::UnzipState::Reset(Int_t oldSize, Int_t newSize) {
94 std::vector<Int_t> aUnzipLen = std::vector<Int_t>(newSize, 0);
95 std::unique_ptr<char[]> *aUnzipChunks =
new std::unique_ptr<char[]>[newSize];
96 std::atomic<Byte_t> *aUnzipStatus =
new std::atomic<Byte_t>[newSize];
98 for (Int_t i = 0; i < newSize; ++i)
99 aUnzipStatus[i].store(0);
101 for (Int_t i = 0; i < oldSize; i++) {
102 aUnzipLen[i] = fUnzipLen[i];
103 aUnzipChunks[i] = std::move(fUnzipChunks[i]);
104 aUnzipStatus[i].store(fUnzipStatus[i].load());
107 if (fUnzipChunks)
delete [] fUnzipChunks;
108 if (fUnzipStatus)
delete [] fUnzipStatus;
110 fUnzipLen = aUnzipLen;
111 fUnzipChunks = aUnzipChunks;
112 fUnzipStatus = aUnzipStatus;
124 void TTreeCacheUnzip::UnzipState::SetFinished(Int_t index) {
125 fUnzipLen[index] = 0;
126 fUnzipChunks[index].reset();
127 fUnzipStatus[index].store((Byte_t)kFinished);
132 void TTreeCacheUnzip::UnzipState::SetMissed(Int_t index) {
133 fUnzipChunks[index].reset();
134 fUnzipStatus[index].store((Byte_t)kFinished);
139 void TTreeCacheUnzip::UnzipState::SetUnzipped(Int_t index,
char* buf, Int_t len) {
141 fUnzipLen[index] = len;
142 fUnzipChunks[index].reset(buf);
143 fUnzipStatus[index].store((Byte_t)kFinished);
149 Bool_t TTreeCacheUnzip::UnzipState::TryUnzipping(Int_t index) {
150 Byte_t oldValue = kUntouched;
151 Byte_t newValue = kProgress;
152 return fUnzipStatus[index].compare_exchange_weak(oldValue, newValue, std::memory_order_release, std::memory_order_relaxed);
// Default constructor: default-constructs the TTreeCache base; asynchronous
// reading starts disabled.
// NOTE(review): the rest of the member-initializer list and the constructor
// body are not visible in this excerpt.
157 TTreeCacheUnzip::TTreeCacheUnzip() : TTreeCache(),
158 fAsyncReading(kFALSE),
// Constructs an unzip cache for `tree`; `buffersize` is forwarded to the
// TTreeCache base. Asynchronous reading starts disabled.
// NOTE(review): the remaining initializers and the body are not visible here.
176 TTreeCacheUnzip::TTreeCacheUnzip(TTree *tree, Int_t buffersize) : TTreeCache(tree,buffersize),
177 fAsyncReading(kFALSE),
// Common set-up. It:
//  - resets the unzip task group;
//  - creates the I/O mutex;
//  - allocates a 16384-byte scratch buffer for compressed data;
//  - applies the global parallel-unzip mode (fgParallel);
//  - probes whether asynchronous reading is available.
// NOTE(review): the body of the kDisable branch, the fParallel assignments and
// the debug guard around Info() are not visible in this excerpt.
194 void TTreeCacheUnzip::Init()
197 fUnzipTaskGroup.reset();
// Mutex serialising file reads (ReadBufferExt and the direct-read path lock it).
199 fIOMutex = std::make_unique<TMutex>(kTRUE);
// Scratch buffer for compressed blocks read on the calling thread; GetUnzipBuffer resizes it.
201 fCompBuffer =
new char[16384];
202 fCompBufferSize = 16384;
// Batch size, in compressed bytes, that CreateTasks uses to group baskets per task.
204 fUnzipGroupSize = 102400;
206 if (fgParallel == kDisable) {
209 else if(fgParallel == kEnable || fgParallel == kForce) {
// Budget for pending unzipped buffers, relative to the cache buffer size.
210 fUnzipBufferSize = Long64_t(fgRelBuffSize * GetBufferSize());
213 Info(
"TTreeCacheUnzip",
"Enabling Parallel Unzipping");
219 Warning(
"TTreeCacheUnzip",
"Parallel Option unknown");
// Asynchronous reading is used only if the TFile.AsyncReading resource allows it
// (default 1) and the file accepts a ReadBufferAsync(0, 0) probe (returns 0).
223 if (gEnv->GetValue(
"TFile.AsyncReading", 1)) {
224 if (fFile && !(fFile->ReadBufferAsync(0, 0)))
225 fAsyncReading = kTRUE;
// Destructor: releases the per-basket unzip state for the first fNseekMax slots.
// NOTE(review): other statements of the body are not visible in this excerpt.
// Confirm that the unzip task group is stopped before the state is cleared.
233 TTreeCacheUnzip::~TTreeCacheUnzip()
236 fUnzipState.Clear(fNseekMax);
246 Int_t TTreeCacheUnzip::AddBranch(TBranch *b, Bool_t subbranches )
248 return TTreeCache::AddBranch(b, subbranches);
258 Int_t TTreeCacheUnzip::AddBranch(
const char *branch, Bool_t subbranches )
260 return TTreeCache::AddBranch(branch, subbranches);
// Registers in the read cache the baskets that overlap the cluster containing
// the tree's current read entry. Only baskets of branches stored in fFile count.
// Returns kFALSE early when there are no branches, or when the entry already
// lies in [fEntryCurrent, fEntryNext).
// NOTE(review): the final return statement(s) are not visible in this excerpt.
265 Bool_t TTreeCacheUnzip::FillBuffer()
268 if (fNbranches <= 0)
return kFALSE;
271 fIsTransferred = kFALSE;
273 TTree *tree = ((TBranch*)fBranches->UncheckedAt(0))->GetTree();
274 Long64_t entry = tree->GetReadEntry();
// Nothing to do if the current entry is already covered by the cached range.
280 if (fEntryCurrent <= entry && entry < fEntryNext)
return kFALSE;
// A read entry of -1 (presumably "nothing read yet") starts from entry 0.
283 if (entry == -1) entry = 0;
// New cache window: the cluster containing `entry`, clipped to
// [fEntryMin, fEntryMax]. If fEntryMax is unset (<= 0), it becomes the entry count.
285 TTree::TClusterIterator clusterIter = tree->GetClusterIterator(entry);
286 fEntryCurrent = clusterIter();
287 fEntryNext = clusterIter.GetNextEntry();
289 if (fEntryCurrent < fEntryMin) fEntryCurrent = fEntryMin;
290 if (fEntryMax <= 0) fEntryMax = tree->GetEntries();
291 if (fEntryNext > fEntryMax) fEntryNext = fEntryMax;
// For a TChain, chainOffset (the current tree's offset) is added to local
// basket entry numbers before they are tested against the event list.
296 TEventList *elist = fTree->GetEventList();
297 Long64_t chainOffset = 0;
299 if (fTree->IsA() ==TChain::Class()) {
300 TChain *chain = (TChain*)fTree;
301 Int_t t = chain->GetTreeNumber();
302 chainOffset = chain->GetTreeOffset()[t];
// Prefetch(0,0) presumably resets the list of registered blocks — confirm
// against TFileCacheRead::Prefetch.
307 TFileCacheRead::Prefetch(0,0);
310 for (Int_t i = 0; i < fNbranches; i++) {
311 TBranch *b = (TBranch*)fBranches->UncheckedAt(i);
312 if (b->GetDirectory() == 0)
continue;
313 if (b->GetDirectory()->GetFile() != fFile)
continue;
314 Int_t nb = b->GetMaxBaskets();
315 Int_t *lbaskets = b->GetBasketBytes();
316 Long64_t *entries = b->GetBasketEntry();
317 if (!lbaskets || !entries)
continue;
// Skip these baskets:
//  - already loaded in memory;
//  - with an invalid seek or size;
//  - starting at or after fEntryNext;
//  - ending before `entry`;
//  - whose entry range is not selected by the event list.
320 Int_t blistsize = b->GetListOfBaskets()->GetSize();
321 for (Int_t j=0;j<nb;j++) {
323 if (j<blistsize && b->GetListOfBaskets()->UncheckedAt(j))
continue;
325 Long64_t pos = b->GetBasketSeek(j);
326 Int_t len = lbaskets[j];
327 if (pos <= 0 || len <= 0)
continue;
329 if (entries[j] >= fEntryNext)
continue;
330 if (entries[j] < entry && (j < nb - 1 && entries[j+1] <= entry))
continue;
332 Long64_t emax = fEntryMax;
333 if (j < nb - 1) emax = entries[j+1] - 1;
// NOTE(review): elist is dereferenced with no null check visible in this excerpt.
// A tree without an event list would crash here unless a guard sits on a
// missing line — confirm.
334 if (!elist->ContainsRange(entries[j] + chainOffset, emax + chainOffset))
continue;
338 TFileCacheRead::Prefetch(pos, len);
340 if (gDebug > 0) printf(
"Entry: %lld, registering baskets branch %s, fEntryNext=%lld, fNseek=%d, fNtot=%d\n", entry, ((TBranch*)fBranches->UncheckedAt(i))->GetName(), fEntryNext, fNseek, fNtot);
// Mark the learning phase as over.
345 fIsLearning = kFALSE;
// Changes the cache buffer size via TTreeCache, then rescales the pending-unzip
// budget to fgRelBuffSize times the new size.
// NOTE(review): the lines between the two statements and the return are not
// visible. Presumably `res` is checked and returned.
357 Int_t TTreeCacheUnzip::SetBufferSize(Int_t buffersize)
359 Int_t res = TTreeCache::SetBufferSize(buffersize);
363 fUnzipBufferSize = Long64_t(fgRelBuffSize * GetBufferSize());
// Sets the cached entry range [emin, emax] through TTreeCache::SetEntryRange.
// NOTE(review): any further statements of this override are not visible here.
373 void TTreeCacheUnzip::SetEntryRange(Long64_t emin, Long64_t emax)
375 TTreeCache::SetEntryRange(emin, emax);
// Ends the learning phase through TTreeCache::StopLearningPhase.
// NOTE(review): any further statements of this override are not visible here.
382 void TTreeCacheUnzip::StopLearningPhase()
384 TTreeCache::StopLearningPhase();
// Updates the cached branch list for `tree` through TTreeCache::UpdateBranches.
// NOTE(review): any further statements of this override are not visible here.
390 void TTreeCacheUnzip::UpdateBranches(TTree *tree)
392 TTreeCache::UpdateBranches(tree);
// Returns the parallel-unzip mode (presumably the global fgParallel).
// NOTE(review): the body is not visible in this excerpt.
405 TTreeCacheUnzip::EParUnzipMode TTreeCacheUnzip::GetParallelUnzip()
// Tests whether the global mode fgParallel is kEnable or kForce.
// NOTE(review): the rest of the body (the return values) is not visible here.
413 Bool_t TTreeCacheUnzip::IsParallelUnzip()
415 if (fgParallel == kEnable || fgParallel == kForce)
// Sets the global parallel-unzip mode.
// NOTE(review): the condition tests the CURRENT fgParallel rather than the
// requested `option`, so an invalid `option` is not rejected here. Confirm this
// is intended. The rest of the body is not visible in this excerpt.
434 Int_t TTreeCacheUnzip::SetParallelUnzip(TTreeCacheUnzip::EParUnzipMode option)
436 if(fgParallel == kEnable || fgParallel == kForce || fgParallel == kDisable) {
// Parses the key header at the start of `buf` (at most `maxbytes` bytes).
// It reports through the reference arguments, presumably:
//  - nbytes: the record's total size;
//  - objlen: the uncompressed object length;
//  - keylen: the key length.
// Returns early with nread (= maxbytes) when reading fails or fewer than 16
// bytes are available.
// NOTE(review): the declarations of nb/olen/klen/datime, the remaining frombuf
// calls, the output assignments and the final return are not visible here.
467 Int_t TTreeCacheUnzip::GetRecordHeader(
char *buf, Int_t maxbytes, Int_t &nbytes, Int_t &objlen, Int_t &keylen)
469 Version_t versionkey;
473 Int_t nread = maxbytes;
476 if (nb < 0)
return nread;
478 const Int_t headerSize = 16;
479 if (nread < headerSize)
return nread;
480 frombuf(buf, &versionkey);
482 frombuf(buf, &datime);
// If the header stores no object length, derive it as total size minus key length.
484 if (!olen) olen = nbytes - klen;
// Clears the unzip state of the first fNseekMax slots. If more baskets are now
// registered (fNseek > fNseekMax), it grows the state arrays to fNseek slots.
// NOTE(review): the update of fNseekMax after Reset and any other statements
// are not visible in this excerpt.
499 void TTreeCacheUnzip::ResetCache()
503 fUnzipState.Clear(fNseekMax);
505 if(fNseekMax < fNseek){
507 Info(
"ResetCache",
"Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
509 fUnzipState.Reset(fNseekMax, fNseek);
// Unzips basket slot `index` of the current cache contents into fUnzipState.
// Visible flow:
//  1. Read the compressed block (offset fSeek[index], length fSeekLen[index])
//     into a private buffer, locbuff.
//  2. Parse its key header.
//  3. Skip blocks larger than 4 * fUnzipBufferSize.
//  4. Unzip with UnzipBuffer and store the result via SetUnzipped.
// Every visible failure path marks the slot finished (SetFinished) and frees
// locbuff. The fCycle/fIsTransferred checks drop the work if the cache state
// changed meanwhile.
// NOTE(review): the declarations (rdoffs, rdlen, myCycle, locbuff, readbuf, loc,
// nbytes, ptr) and the return statements are not visible in this excerpt.
524 Int_t TTreeCacheUnzip::UnzipCache(Int_t index)
527 const Int_t hlen = 128;
528 Int_t objlen = 0, keylen = 0;
537 rdoffs = fSeek[index];
538 rdlen = fSeekLen[index];
// NOTE(review): the handling of an empty cache or the learning phase is not visible here.
541 if (!fNseek || fIsLearning) {
545 if ((myCycle != fCycle) || !fIsTransferred) {
546 fUnzipState.SetFinished(index);
// Size the private buffer: rdlen, 2 * rdlen for small blocks, or 16384 bytes.
// NOTE(review): the first branch's condition is not visible. GetRecordHeader below
// gets maxbytes = hlen = 128, which exceeds the buffer when rdlen < 64.
// Confirm it never reads that far.
553 locbuff =
new char[rdlen];
554 }
else if (rdlen * 3 < 16384) {
555 locbuff =
new char[rdlen * 2];
557 locbuff =
new char[16384];
// Read through the mutex-protected ReadBufferExt.
560 readbuf = ReadBufferExt(locbuff, rdoffs, rdlen, loc);
563 fUnzipState.SetFinished(index);
564 if (locbuff)
delete [] locbuff;
// Use the key header to compute the full (uncompressed) record length.
568 GetRecordHeader(locbuff, hlen, nbytes, objlen, keylen);
570 Int_t len = (objlen > nbytes - keylen) ? keylen + objlen : nbytes;
// Refuse blocks larger than four times the pending-unzip budget.
575 if (len > 4 * fUnzipBufferSize) {
577 Info(
"UnzipCache",
"Block %d is too big, skipping.", index);
579 fUnzipState.SetFinished(index);
580 if (locbuff)
delete [] locbuff;
// Keep the result only if it has the expected size and the cache state is unchanged.
586 Int_t loclen = UnzipBuffer(&ptr, locbuff);
587 if ((loclen > 0) && (loclen == objlen + keylen)) {
588 if ((myCycle != fCycle) || !fIsTransferred) {
589 fUnzipState.SetFinished(index);
590 if (locbuff)
delete [] locbuff;
593 fUnzipState.SetUnzipped(index, ptr, loclen);
596 fUnzipState.SetFinished(index);
599 if (locbuff)
delete [] locbuff;
// Launches unzipping of the baskets currently in the cache:
//  - Slots 0..fNseek-1 are grouped into batches of roughly fUnzipGroupSize
//    compressed bytes.
//  - A TThreadExecutor runs unzipFunction over the batches.
//  - All of this happens inside a task submitted to fUnzipTaskGroup
//    (presumably asynchronous; no Wait() call is visible here).
// NOTE(review): the lambdas capture by reference, and only members (via `this`)
// appear to be used, so this object must outlive the task group. GetUnzipBuffer
// cancels and resets fUnzipTaskGroup on its miss path; confirm the destructor
// does too. The declaration of accusz, the batch-reset lines and the return are
// not visible here.
609 Int_t TTreeCacheUnzip::CreateTasks()
611 auto mapFunction = [&]() {
612 auto unzipFunction = [&](
const std::vector<Int_t> &indices) {
// Skip the whole batch when the cache contents are not marked as transferred.
614 if (!fIsTransferred)
return nullptr;
616 for (
auto ii : indices) {
// Only the thread that moves a slot out of kUntouched unzips it.
617 if(fUnzipState.TryUnzipping(ii)) {
618 Int_t res = UnzipCache(ii);
621 Info(
"UnzipCache",
"Unzipping failed or cache is in learning state");
// Split slot indices into batches by accumulating fSeekLen until fUnzipGroupSize is reached.
628 std::vector<std::vector<Int_t>> basketIndices;
629 std::vector<Int_t> indices;
630 if (fUnzipGroupSize <= 0) fUnzipGroupSize = 102400;
631 for (Int_t i = 0; i < fNseek; i++) {
632 while (accusz < fUnzipGroupSize) {
633 accusz += fSeekLen[i];
634 indices.push_back(i);
636 if (i >= fNseek)
break;
639 basketIndices.push_back(indices);
// Unzip the batches in parallel, one batch per executor work item.
643 ROOT::TThreadExecutor pool;
644 pool.Foreach(unzipFunction, basketIndices);
// Replace any previous task group and submit the whole job to it.
647 fUnzipTaskGroup.reset(
new ROOT::Experimental::TTaskGroup());
648 fUnzipTaskGroup->Run(mapFunction);
// Returns in *buf the unzipped content of the basket at file offset `pos`
// (compressed length `len`). On the visible success paths it returns the length.
// In parallel mode (fParallel and not learning) it looks the basket up in the
// sorted seek table, then:
//  - if a worker already unzipped it, hands it over;
//  - if a worker is unzipping it, helps with other untouched baskets and waits;
//  - otherwise marks it missed.
// The fallback path reads the compressed block into fCompBuffer and unzips it
// on this thread.
// NOTE(review): several statements (declarations of loc/res, the *free handling,
// the stall accounting and the final return) are not visible in this excerpt.
664 Int_t TTreeCacheUnzip::GetUnzipBuffer(
char **buf, Long64_t pos, Int_t len, Bool_t *free)
// Snapshot of fCycle. A later mismatch presumably means the cache contents changed.
678 Int_t myCycle = fCycle;
680 if (fParallel && !fIsLearning) {
// Grow the per-basket state arrays if more baskets are registered than they hold.
682 if(fNseekMax < fNseek){
684 Info(
"GetUnzipBuffer",
"Changing fNseekMax from:%d to:%d", fNseekMax, fNseek);
686 fUnzipState.Reset(fNseekMax, fNseek);
// Locate `pos` in the sorted seek table. fSeekIndex maps the sorted position to
// the slot index used by fUnzipState.
690 loc = (Int_t)TMath::BinarySearch(fNseek, fSeekSort, pos);
691 if ((fCycle == myCycle) && (loc >= 0) && (loc < fNseek) && (pos == fSeekSort[loc])) {
695 Int_t seekidx = fSeekIndex[loc];
// Already unzipped. Two variants: transfer ownership of the chunk to the caller
// (release), or copy it into the caller's *buf and free the chunk.
// NOTE(review): the condition selecting between the two is not visible.
702 if (fUnzipState.IsUnzipped(seekidx)) {
704 *buf = fUnzipState.fUnzipChunks[seekidx].get();
705 fUnzipState.fUnzipChunks[seekidx].release();
708 memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
709 fUnzipState.fUnzipChunks[seekidx].reset();
714 return fUnzipState.fUnzipLen[seekidx];
// A worker is unzipping this basket. Meanwhile, claim and unzip other untouched
// baskets (scanning circularly from seekidx + 1), then wait until this basket
// leaves kProgress.
720 if (fUnzipState.IsProgress(seekidx)) {
722 for (Int_t ii = 0; ii < fNseek; ++ii) {
723 Int_t idx = (seekidx + 1 + ii) % fNseek;
724 if (fUnzipState.IsUntouched(idx)) {
725 if(fUnzipState.TryUnzipping(idx)) {
// Give up waiting if the cache cycle changed.
738 if ( myCycle != fCycle ) {
740 Info(
"GetUnzipBuffer",
"Sudden paging Break!!! fNseek: %d, fIsLearning:%d",
741 fNseek, fIsLearning);
748 }
while (fUnzipState.IsProgress(seekidx));
// After waiting: hand over or copy the result exactly as in the fast path above.
751 if ( (seekidx >= 0) && (fUnzipState.IsUnzipped(seekidx)) ) {
753 *buf = fUnzipState.fUnzipChunks[seekidx].get();
754 fUnzipState.fUnzipChunks[seekidx].release();
757 memcpy(*buf, fUnzipState.fUnzipChunks[seekidx].get(), fUnzipState.fUnzipLen[seekidx]);
758 fUnzipState.fUnzipChunks[seekidx].reset();
763 return fUnzipState.fUnzipLen[seekidx];
// No usable result from the workers: record a miss. This thread does the work below.
767 fUnzipState.SetMissed(seekidx);
771 fIsTransferred = kFALSE;
// Keep fCompBuffer between len and 4 * len bytes: grow it to len when too small,
// shrink it to 2 * len when more than four times too large.
775 if (len > fCompBufferSize) {
776 if(fCompBuffer)
delete [] fCompBuffer;
777 fCompBuffer =
new char[len];
778 fCompBufferSize = len;
780 if (fCompBufferSize > len * 4) {
781 if(fCompBuffer)
delete [] fCompBuffer;
782 fCompBuffer =
new char[len*2];
783 fCompBufferSize = len * 2;
// Try the read cache first. On a miss, cancel pending unzip tasks (when implicit
// MT is on) and read straight from the file under the I/O mutex.
788 if (!ReadBufferExt(fCompBuffer, pos, len, loc)) {
791 if(ROOT::IsImplicitMTEnabled() && fUnzipTaskGroup) {
792 fUnzipTaskGroup->Cancel();
793 fUnzipTaskGroup.reset();
798 R__LOCKGUARD(fIOMutex.get());
800 res = fFile->ReadBuffer(fCompBuffer, len);
// NOTE(review): the body of this branch is not visible in this excerpt.
803 if(ROOT::IsImplicitMTEnabled()) {
// Unzip the compressed block on this thread.
812 res = UnzipBuffer(buf, fCompBuffer);
826 void TTreeCacheUnzip::SetUnzipRelBufferSize(Float_t relbufferSize)
828 fgRelBuffSize = relbufferSize;
835 void TTreeCacheUnzip::SetUnzipBufferSize(Long64_t bufferSize)
837 fUnzipBufferSize = bufferSize;
// Decompresses the record in `src` (key header followed by payload) into *dest.
// The key bytes are copied unchanged. The payload is then either decompressed
// chunk by chunk with R__unzip until objlen bytes are produced, or copied
// verbatim when it is not compressed. On a size mismatch an error is reported
// and *dest is freed if it was allocated here.
// NOTE(review): the allocation of *dest, several declarations
// (nin, nbuf, nout, noutot) and the return statements are not visible here.
849 Int_t TTreeCacheUnzip::UnzipBuffer(
char **dest,
char *src)
// Set when *dest is allocated in this function; *dest is freed on error only if set.
852 Bool_t alloc = kFALSE;
855 const Int_t hlen = 128;
856 Int_t nbytes = 0, objlen = 0, keylen = 0;
857 GetRecordHeader(src, hlen, nbytes, objlen, keylen);
// For a compressed payload (objlen > nbytes - keylen), validate the compression header first.
861 UChar_t *bufcur = (UChar_t *) (src + keylen);
863 if(objlen > nbytes - keylen && R__unzip_header(&nin, bufcur, &nbuf) != 0) {
864 Error(
"UnzipBuffer",
"Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
// Uncompressed size of the full record (key + object).
868 Int_t l = keylen + objlen;
// Legacy layout: files with version <= 30401 may hold compressed data even though
// objlen == nbytes - keylen (only when the first branch has compression enabled).
878 Bool_t oldCase = objlen == nbytes - keylen
879 && ((TBranch*)fBranches->UncheckedAt(0))->GetCompressionLevel() != 0
880 && fFile->GetVersion() <= 30401;
// Compressed payload: copy the key as is, then decompress into *dest + keylen.
882 if (objlen > nbytes-keylen || oldCase) {
885 memcpy(*dest, src, keylen);
888 char *objbuf = *dest + keylen;
// NOTE(review): this `bufcur` shadows the one declared above and has the same initial value.
889 UChar_t *bufcur = (UChar_t *) (src + keylen);
895 Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
898 Info(
"UnzipBuffer",
" nin:%d, nbuf:%d, bufcur[3] :%d, bufcur[4] :%d, bufcur[5] :%d ",
899 nin, nbuf, bufcur[3], bufcur[4], bufcur[5]);
// Legacy case where the chunk sizes exceed objlen: the payload is copied verbatim instead.
900 if (oldCase && (nin > objlen || nbuf > objlen)) {
902 Info(
"UnzipBuffer",
"oldcase objlen :%d ", objlen);
905 memcpy(*dest + keylen, src + keylen, objlen);
910 R__unzip(&nin, bufcur, &nbuf, objbuf, &nout);
// NOTE(review): %p formally requires void* arguments; bufcur/objbuf should be cast.
913 Info(
"UnzipBuffer",
"R__unzip nin:%d, bufcur:%p, nbuf:%d, objbuf:%p, nout:%d",
914 nin, bufcur, nbuf, objbuf, nout);
918 if (noutot >= objlen)
break;
// The decompressed size must equal objlen. Otherwise report it and free *dest
// if it was allocated here.
923 if (noutot != objlen) {
924 Error(
"UnzipBuffer",
"nbytes = %d, keylen = %d, objlen = %d, noutot = %d, nout=%d, nin=%d, nbuf=%d",
925 nbytes,keylen,objlen, noutot,nout,nin,nbuf);
927 if(alloc)
delete [] *dest;
// Presumably the uncompressed case: copy key and object verbatim.
933 memcpy(*dest, src, keylen);
935 memcpy(*dest + keylen, src + keylen, objlen);
943 void TTreeCacheUnzip::Print(Option_t* option)
const {
945 printf(
"******TreeCacheUnzip statistics for file: %s ******\n",fFile->GetName());
946 printf(
"Max allowed mem for pending buffers: %lld\n", fUnzipBufferSize);
947 printf(
"Number of blocks unzipped by threads: %d\n", fNUnzip);
948 printf(
"Number of hits: %d\n", fNFound);
949 printf(
"Number of stalls: %d\n", fNStalls);
950 printf(
"Number of misses: %d\n", fNMissed);
952 TTreeCache::Print(option);
957 Int_t TTreeCacheUnzip::ReadBufferExt(
char *buf, Long64_t pos, Int_t len, Int_t &loc) {
958 R__LOCKGUARD(fIOMutex.get());
959 return TTreeCache::ReadBufferExt(buf, pos, len, loc);