25 static const int kMAX_READ_SIZE    = 2;   
 
   27 inline int xtod(
char c) { 
return (c>=
'0' && c<=
'9') ? c-
'0' : ((c>=
'A' && c<=
'F') ? c-
'A'+10 : ((c>=
'a' && c<=
'f') ? c-
'a'+10 : 0)); }
 
   31 ClassImp(TFilePrefetch);
 
   52 TFilePrefetch::TFilePrefetch(TFile* file) :
 
   56   fPrefetchFinished(kFALSE)
 
   58    fPendingBlocks    = 
new TList();
 
   59    fReadBlocks       = 
new TList();
 
   61    fPendingBlocks->SetOwner();
 
   62    fReadBlocks->SetOwner();
 
   64    fSemChangeFile    = 
new TSemaphore(0);
 
   70 TFilePrefetch::~TFilePrefetch()
 
   76    SafeDelete(fConsumer);
 
   77    SafeDelete(fPendingBlocks);
 
   78    SafeDelete(fReadBlocks);
 
   79    SafeDelete(fSemChangeFile);
 
   86 void TFilePrefetch::WaitFinishPrefetch()
 
   90       std::lock_guard<std::mutex> lk(fMutexPendingList);
 
   91       fPrefetchFinished = kTRUE;
 
   93    fNewBlockAdded.notify_one();
 
   96    fThreadJoined = kTRUE;
 
   97    fPrefetchFinished = kFALSE;
 
  104 void TFilePrefetch::ReadAsync(TFPBlock* block, Bool_t &inCache)
 
  108    if (CheckBlockInCache(path, block)){
 
  109       block->SetBuffer(GetBlockFromCache(path, block->GetDataSize()));
 
  113       fFile->ReadBuffers(block->GetBuffer(), block->GetPos(), block->GetLen(), block->GetNoElem());
 
  114       if (fFile->GetArchive()) {
 
  115          for (Int_t i = 0; i < block->GetNoElem(); i++)
 
  116             block->SetPos(i, block->GetPos(i) - fFile->GetArchiveOffset());
 
  126 void TFilePrefetch::ReadListOfBlocks()
 
  128    Bool_t inCache = kFALSE;
 
  131    while((block = GetPendingBlock())){
 
  132       ReadAsync(block, inCache);
 
  135          SaveBlockInCache(block);
 
  142 Bool_t TFilePrefetch::BinarySearchReadList(TFPBlock* blockObj, Long64_t offset, Int_t len, Int_t* index)
 
  144    Int_t first = 0, last = -1, mid = -1;
 
  145    last = (Int_t) blockObj->GetNoElem()-1;
 
  147    while (first <= last){
 
  148       mid = first + (last - first) / 2;
 
  149       if ((offset >= blockObj->GetPos(mid) && offset <= (blockObj->GetPos(mid) + blockObj->GetLen(mid))
 
  150            && ( (offset + len) <= blockObj->GetPos(mid) + blockObj->GetLen(mid)))){
 
  155       else if (blockObj->GetPos(mid) < offset){
 
  168 Long64_t TFilePrefetch::GetWaitTime()
 
  170    return Long64_t(fWaitTime.RealTime()*1.e+6);
 
  176 Bool_t TFilePrefetch::ReadBuffer(
char* buf, Long64_t offset, Int_t len)
 
  178    Bool_t found = 
false;
 
  179    TFPBlock* blockObj = 0;
 
  182    std::unique_lock<std::mutex> lk(fMutexReadList);
 
  184       TIter iter(fReadBlocks);
 
  185       while ((blockObj = (TFPBlock*) iter.Next())){
 
  187          if (BinarySearchReadList(blockObj, offset, len, &index)){
 
  195          fWaitTime.Start(kFALSE);
 
  196          fReadBlockAdded.wait(lk); 
 
  202       char *pBuff = blockObj->GetPtrToPiece(index);
 
  203       pBuff += (offset - blockObj->GetPos(index));
 
  204       memcpy(buf, pBuff, len);
 
  212 void TFilePrefetch::ReadBlock(Long64_t* offset, Int_t* len, Int_t nblock)
 
  214    TFPBlock* block = CreateBlockObj(offset, len, nblock);
 
  215    AddPendingBlock(block);
 
  221 void TFilePrefetch::AddPendingBlock(TFPBlock* block)
 
  223    fMutexPendingList.lock();
 
  224    fPendingBlocks->Add(block);
 
  225    fMutexPendingList.unlock();
 
  227    fNewBlockAdded.notify_one();
 
  233 TFPBlock* TFilePrefetch::GetPendingBlock()
 
  239    fSemChangeFile->Post();
 
  240    std::unique_lock<std::mutex> lk(fMutexPendingList);
 
  242    fNewBlockAdded.wait(lk, [&]{ 
return fPendingBlocks->GetSize() > 0 || fPrefetchFinished; });
 
  244    fSemChangeFile->Wait();
 
  247    if (fPendingBlocks->GetSize()){
 
  248       block = (TFPBlock*)fPendingBlocks->First();
 
  249       block = (TFPBlock*)fPendingBlocks->Remove(block);
 
  257 void TFilePrefetch::AddReadBlock(TFPBlock* block)
 
  259    fMutexReadList.lock();
 
  261    if (fReadBlocks->GetSize() >= kMAX_READ_SIZE){
 
  262       TFPBlock* movedBlock = (TFPBlock*) fReadBlocks->First();
 
  263       movedBlock = (TFPBlock*)fReadBlocks->Remove(movedBlock);
 
  268    fReadBlocks->Add(block);
 
  269    fMutexReadList.unlock();
 
  272    fReadBlockAdded.notify_one();
 
  279 TFPBlock* TFilePrefetch::CreateBlockObj(Long64_t* offset, Int_t* len, Int_t noblock)
 
  281    TFPBlock* blockObj = 0;
 
  283    fMutexReadList.lock();
 
  285    if (fReadBlocks->GetSize() >= kMAX_READ_SIZE){
 
  286       blockObj = 
static_cast<TFPBlock*
>(fReadBlocks->First());
 
  287       fReadBlocks->Remove(blockObj);
 
  288       fMutexReadList.unlock();
 
  289       blockObj->ReallocBlock(offset, len, noblock);
 
  292       fMutexReadList.unlock();
 
  293       blockObj = 
new TFPBlock(offset, len, noblock);
 
  301 TThread* TFilePrefetch::GetThread()
 const 
  315 void TFilePrefetch::SetFile(TFile *file, TFile::ECacheAction action)
 
  317    if (action == TFile::kDisconnect) {
 
  318       if (!fThreadJoined) {
 
  319         fSemChangeFile->Wait();
 
  324         fMutexPendingList.lock();
 
  325         fPendingBlocks->Clear();
 
  326         fMutexPendingList.unlock();
 
  328         fMutexReadList.lock();
 
  329         fReadBlocks->Clear();
 
  330         fMutexReadList.unlock();
 
  334       if (!fThreadJoined) {
 
  335         fSemChangeFile->Post();
 
  339       assert((fFile == file) && 
"kDoNotDisconnect must reattach to the same file");
 
  347 Int_t TFilePrefetch::ThreadStart()
 
  351    fConsumer = 
new TThread((TThread::VoidRtnFunc_t) ThreadProc, (
void*) 
this);
 
  352    rc = fConsumer->Run();
 
  354       fThreadJoined = kFALSE;
 
  363 TThread::VoidRtnFunc_t TFilePrefetch::ThreadProc(
void* arg)
 
  365    TFilePrefetch* pClass = (TFilePrefetch*) arg;
 
  367    while (!pClass->IsPrefetchFinished()) {
 
  368       pClass->ReadListOfBlocks();
 
  371    return (TThread::VoidRtnFunc_t) 1;
 
  379 Int_t TFilePrefetch::SumHex(
const char *hex)
 
  382    const char* ptr = hex;
 
  384    for(Int_t i=0; i < (Int_t)strlen(hex); i++)
 
  385       result += xtod(ptr[i]);
 
  393 Bool_t TFilePrefetch::CheckBlockInCache(
char*& path, TFPBlock* block)
 
  395    if (fPathCache == 
"")
 
  398    Bool_t found = 
false;
 
  399    TString fullPath(fPathCache); 
 
  403    if (!gSystem->OpenDirectory(fullPath))
 
  404       gSystem->mkdir(fullPath);
 
  407    TMD5* md = 
new TMD5();
 
  410    for (Int_t i=0; i < block->GetNoElem(); i++){
 
  411       concatStr.Form(
"%lld", block->GetPos(i));
 
  412       md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
 
  416    TString fileName( md->AsString() );
 
  417    value = SumHex(fileName);
 
  420    dirName.Form(
"%i", value);
 
  422    fullPath += 
"/" + dirName + 
"/" + fileName;
 
  425    if (gSystem->GetPathInfo(fullPath, stat) == 0) {
 
  426       path = 
new char[fullPath.Length() + 1];
 
  427       strlcpy(path, fullPath,fullPath.Length() + 1);
 
  439 char* TFilePrefetch::GetBlockFromCache(
const char* path, Int_t length)
 
  442    TString strPath = path;
 
  444    strPath += 
"?filetype=raw";
 
  445    TFile* file = 
new TFile(strPath);
 
  448    if (gPerfStats != 0) start = TTimeStamp();
 
  450    buffer = (
char*) calloc(length, 
sizeof(
char));
 
  451    file->ReadBuffer(buffer, 0, length);
 
  453    fFile->fBytesRead  += length;
 
  454    fFile->fgBytesRead += length;
 
  455    fFile->SetReadCalls(fFile->GetReadCalls() + 1);
 
  456    fFile->fgReadCalls++;
 
  458    if (gMonitoringWriter)
 
  459       gMonitoringWriter->SendFileReadProgress(fFile);
 
  460    if (gPerfStats != 0) {
 
  461       gPerfStats->FileReadEvent(fFile, length, start);
 
  472 void TFilePrefetch::SaveBlockInCache(TFPBlock* block)
 
  474    if (fPathCache == 
"")
 
  478    TMD5* md = 
new TMD5();
 
  481    for(Int_t i=0; i< block->GetNoElem(); i++){
 
  482       concatStr.Form(
"%lld", block->GetPos(i));
 
  483       md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
 
  487    TString fileName( md->AsString() );
 
  488    Int_t value = SumHex(fileName);
 
  491    TString fullPath( fPathCache );
 
  493    dirName.Form(
"%i", value);
 
  494    fullPath += (
"/" + dirName);
 
  496    if (!gSystem->OpenDirectory(fullPath))
 
  497       gSystem->mkdir(fullPath);
 
  500    fullPath += (
"/" + fileName);
 
  502    if (gSystem->GetPathInfo(fullPath, stat) == 0) {
 
  503       fullPath += 
"?filetype=raw";
 
  504       file = TFile::Open(fullPath, 
"update");
 
  506       fullPath += 
"?filetype=raw";
 
  507       file = TFile::Open(fullPath, 
"new");
 
  513       file->WriteBuffer(block->GetBuffer(), block->GetDataSize());
 
  524 Bool_t TFilePrefetch::SetCache(
const char* path)
 
  528   if (!gSystem->OpenDirectory(path)){
 
  529     return (!gSystem->mkdir(path) ? 
true : 
false);