25 static const int kMAX_READ_SIZE = 2;
27 inline int xtod(
char c) {
return (c>=
'0' && c<=
'9') ? c-
'0' : ((c>=
'A' && c<=
'F') ? c-
'A'+10 : ((c>=
'a' && c<=
'f') ? c-
'a'+10 : 0)); }
31 ClassImp(TFilePrefetch);
52 TFilePrefetch::TFilePrefetch(TFile* file) :
56 fPrefetchFinished(kFALSE)
58 fPendingBlocks =
new TList();
59 fReadBlocks =
new TList();
61 fPendingBlocks->SetOwner();
62 fReadBlocks->SetOwner();
64 fSemChangeFile =
new TSemaphore(0);
70 TFilePrefetch::~TFilePrefetch()
76 SafeDelete(fConsumer);
77 SafeDelete(fPendingBlocks);
78 SafeDelete(fReadBlocks);
79 SafeDelete(fSemChangeFile);
86 void TFilePrefetch::WaitFinishPrefetch()
90 std::lock_guard<std::mutex> lk(fMutexPendingList);
91 fPrefetchFinished = kTRUE;
93 fNewBlockAdded.notify_one();
96 fThreadJoined = kTRUE;
97 fPrefetchFinished = kFALSE;
104 void TFilePrefetch::ReadAsync(TFPBlock* block, Bool_t &inCache)
108 if (CheckBlockInCache(path, block)){
109 block->SetBuffer(GetBlockFromCache(path, block->GetDataSize()));
113 fFile->ReadBuffers(block->GetBuffer(), block->GetPos(), block->GetLen(), block->GetNoElem());
114 if (fFile->GetArchive()) {
115 for (Int_t i = 0; i < block->GetNoElem(); i++)
116 block->SetPos(i, block->GetPos(i) - fFile->GetArchiveOffset());
126 void TFilePrefetch::ReadListOfBlocks()
128 Bool_t inCache = kFALSE;
131 while((block = GetPendingBlock())){
132 ReadAsync(block, inCache);
135 SaveBlockInCache(block);
142 Bool_t TFilePrefetch::BinarySearchReadList(TFPBlock* blockObj, Long64_t offset, Int_t len, Int_t* index)
144 Int_t first = 0, last = -1, mid = -1;
145 last = (Int_t) blockObj->GetNoElem()-1;
147 while (first <= last){
148 mid = first + (last - first) / 2;
149 if ((offset >= blockObj->GetPos(mid) && offset <= (blockObj->GetPos(mid) + blockObj->GetLen(mid))
150 && ( (offset + len) <= blockObj->GetPos(mid) + blockObj->GetLen(mid)))){
155 else if (blockObj->GetPos(mid) < offset){
168 Long64_t TFilePrefetch::GetWaitTime()
170 return Long64_t(fWaitTime.RealTime()*1.e+6);
176 Bool_t TFilePrefetch::ReadBuffer(
char* buf, Long64_t offset, Int_t len)
178 Bool_t found =
false;
179 TFPBlock* blockObj = 0;
182 std::unique_lock<std::mutex> lk(fMutexReadList);
184 TIter iter(fReadBlocks);
185 while ((blockObj = (TFPBlock*) iter.Next())){
187 if (BinarySearchReadList(blockObj, offset, len, &index)){
195 fWaitTime.Start(kFALSE);
196 fReadBlockAdded.wait(lk);
202 char *pBuff = blockObj->GetPtrToPiece(index);
203 pBuff += (offset - blockObj->GetPos(index));
204 memcpy(buf, pBuff, len);
212 void TFilePrefetch::ReadBlock(Long64_t* offset, Int_t* len, Int_t nblock)
214 TFPBlock* block = CreateBlockObj(offset, len, nblock);
215 AddPendingBlock(block);
221 void TFilePrefetch::AddPendingBlock(TFPBlock* block)
223 fMutexPendingList.lock();
224 fPendingBlocks->Add(block);
225 fMutexPendingList.unlock();
227 fNewBlockAdded.notify_one();
233 TFPBlock* TFilePrefetch::GetPendingBlock()
239 fSemChangeFile->Post();
240 std::unique_lock<std::mutex> lk(fMutexPendingList);
242 fNewBlockAdded.wait(lk, [&]{
return fPendingBlocks->GetSize() > 0 || fPrefetchFinished; });
244 fSemChangeFile->Wait();
247 if (fPendingBlocks->GetSize()){
248 block = (TFPBlock*)fPendingBlocks->First();
249 block = (TFPBlock*)fPendingBlocks->Remove(block);
257 void TFilePrefetch::AddReadBlock(TFPBlock* block)
259 fMutexReadList.lock();
261 if (fReadBlocks->GetSize() >= kMAX_READ_SIZE){
262 TFPBlock* movedBlock = (TFPBlock*) fReadBlocks->First();
263 movedBlock = (TFPBlock*)fReadBlocks->Remove(movedBlock);
268 fReadBlocks->Add(block);
269 fMutexReadList.unlock();
272 fReadBlockAdded.notify_one();
279 TFPBlock* TFilePrefetch::CreateBlockObj(Long64_t* offset, Int_t* len, Int_t noblock)
281 TFPBlock* blockObj = 0;
283 fMutexReadList.lock();
285 if (fReadBlocks->GetSize() >= kMAX_READ_SIZE){
286 blockObj =
static_cast<TFPBlock*
>(fReadBlocks->First());
287 fReadBlocks->Remove(blockObj);
288 fMutexReadList.unlock();
289 blockObj->ReallocBlock(offset, len, noblock);
292 fMutexReadList.unlock();
293 blockObj =
new TFPBlock(offset, len, noblock);
301 TThread* TFilePrefetch::GetThread()
const
315 void TFilePrefetch::SetFile(TFile *file, TFile::ECacheAction action)
317 if (action == TFile::kDisconnect) {
318 if (!fThreadJoined) {
319 fSemChangeFile->Wait();
324 fMutexPendingList.lock();
325 fPendingBlocks->Clear();
326 fMutexPendingList.unlock();
328 fMutexReadList.lock();
329 fReadBlocks->Clear();
330 fMutexReadList.unlock();
334 if (!fThreadJoined) {
335 fSemChangeFile->Post();
339 assert((fFile == file) &&
"kDoNotDisconnect must reattach to the same file");
347 Int_t TFilePrefetch::ThreadStart()
351 fConsumer =
new TThread((TThread::VoidRtnFunc_t) ThreadProc, (
void*)
this);
352 rc = fConsumer->Run();
354 fThreadJoined = kFALSE;
363 TThread::VoidRtnFunc_t TFilePrefetch::ThreadProc(
void* arg)
365 TFilePrefetch* pClass = (TFilePrefetch*) arg;
367 while (!pClass->IsPrefetchFinished()) {
368 pClass->ReadListOfBlocks();
371 return (TThread::VoidRtnFunc_t) 1;
379 Int_t TFilePrefetch::SumHex(
const char *hex)
382 const char* ptr = hex;
384 for(Int_t i=0; i < (Int_t)strlen(hex); i++)
385 result += xtod(ptr[i]);
393 Bool_t TFilePrefetch::CheckBlockInCache(
char*& path, TFPBlock* block)
395 if (fPathCache ==
"")
398 Bool_t found =
false;
399 TString fullPath(fPathCache);
403 if (!gSystem->OpenDirectory(fullPath))
404 gSystem->mkdir(fullPath);
407 TMD5* md =
new TMD5();
410 for (Int_t i=0; i < block->GetNoElem(); i++){
411 concatStr.Form(
"%lld", block->GetPos(i));
412 md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
416 TString fileName( md->AsString() );
417 value = SumHex(fileName);
420 dirName.Form(
"%i", value);
422 fullPath +=
"/" + dirName +
"/" + fileName;
425 if (gSystem->GetPathInfo(fullPath, stat) == 0) {
426 path =
new char[fullPath.Length() + 1];
427 strlcpy(path, fullPath,fullPath.Length() + 1);
439 char* TFilePrefetch::GetBlockFromCache(
const char* path, Int_t length)
442 TString strPath = path;
444 strPath +=
"?filetype=raw";
445 TFile* file =
new TFile(strPath);
448 if (gPerfStats != 0) start = TTimeStamp();
450 buffer = (
char*) calloc(length,
sizeof(
char));
451 file->ReadBuffer(buffer, 0, length);
453 fFile->fBytesRead += length;
454 fFile->fgBytesRead += length;
455 fFile->SetReadCalls(fFile->GetReadCalls() + 1);
456 fFile->fgReadCalls++;
458 if (gMonitoringWriter)
459 gMonitoringWriter->SendFileReadProgress(fFile);
460 if (gPerfStats != 0) {
461 gPerfStats->FileReadEvent(fFile, length, start);
472 void TFilePrefetch::SaveBlockInCache(TFPBlock* block)
474 if (fPathCache ==
"")
478 TMD5* md =
new TMD5();
481 for(Int_t i=0; i< block->GetNoElem(); i++){
482 concatStr.Form(
"%lld", block->GetPos(i));
483 md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
487 TString fileName( md->AsString() );
488 Int_t value = SumHex(fileName);
491 TString fullPath( fPathCache );
493 dirName.Form(
"%i", value);
494 fullPath += (
"/" + dirName);
496 if (!gSystem->OpenDirectory(fullPath))
497 gSystem->mkdir(fullPath);
500 fullPath += (
"/" + fileName);
502 if (gSystem->GetPathInfo(fullPath, stat) == 0) {
503 fullPath +=
"?filetype=raw";
504 file = TFile::Open(fullPath,
"update");
506 fullPath +=
"?filetype=raw";
507 file = TFile::Open(fullPath,
"new");
513 file->WriteBuffer(block->GetBuffer(), block->GetDataSize());
524 Bool_t TFilePrefetch::SetCache(
const char* path)
528 if (!gSystem->OpenDirectory(path)){
529 return (!gSystem->mkdir(path) ?
true :
false);