Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TFilePrefetch.cxx
Go to the documentation of this file.
1 // @(#)root/io:$Id$
2 // Author: Elvin Sindrilaru 19/05/2011
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #include "TFilePrefetch.h"
13 #include "TTimeStamp.h"
14 #include "TVirtualPerfStats.h"
15 #include "TVirtualMonitoring.h"
16 
17 #include <iostream>
18 #include <string>
19 #include <sstream>
20 #include <cstdio>
21 #include <cstdlib>
22 #include <cctype>
23 #include <cassert>
24 
// Number of blocks at which the read list is considered full: once it holds
// this many, the oldest block is evicted or recycled.
static constexpr int kMAX_READ_SIZE = 2;
26 
/// Convert one hexadecimal digit character to its numeric value (0-15).
/// Any character that is not a hexadecimal digit yields 0.
inline int xtod(char c)
{
   if (c >= '0' && c <= '9')
      return c - '0';
   if (c >= 'A' && c <= 'F')
      return c - 'A' + 10;
   if (c >= 'a' && c <= 'f')
      return c - 'a' + 10;
   return 0;
}
28 
29 using namespace std;
30 
31 ClassImp(TFilePrefetch);
32 
33 /**
34 \class TFilePrefetch
35 \ingroup IO
36 
37 The prefetching mechanism uses two classes (TFilePrefetch and
38 TFPBlock) to prefetch in advance a block of tree entries. There is
39 a thread which takes care of actually transferring the blocks and
40 making them available to the main requesting thread. Therefore,
41 the time spent by the main thread waiting for the data before
42 processing considerably decreases. Besides the prefetching
43 mechanisms there is also a local caching option which can be
44 enabled by the user. Both capabilities are disabled by default
45 and must be explicitly enabled by the user.
46 */
47 
48 
49 ////////////////////////////////////////////////////////////////////////////////
50 /// Constructor.
51 
52 TFilePrefetch::TFilePrefetch(TFile* file) :
53  fFile(file),
54  fConsumer(0),
55  fThreadJoined(kTRUE),
56  fPrefetchFinished(kFALSE)
57 {
58  fPendingBlocks = new TList();
59  fReadBlocks = new TList();
60 
61  fPendingBlocks->SetOwner();
62  fReadBlocks->SetOwner();
63 
64  fSemChangeFile = new TSemaphore(0);
65 }
66 
67 ////////////////////////////////////////////////////////////////////////////////
68 /// Destructor
69 
70 TFilePrefetch::~TFilePrefetch()
71 {
72  if (!fThreadJoined) {
73  WaitFinishPrefetch();
74  }
75 
76  SafeDelete(fConsumer);
77  SafeDelete(fPendingBlocks);
78  SafeDelete(fReadBlocks);
79  SafeDelete(fSemChangeFile);
80 }
81 
82 
83 ////////////////////////////////////////////////////////////////////////////////
84 /// Killing the async prefetching thread
85 
86 void TFilePrefetch::WaitFinishPrefetch()
87 {
88  // Inform the consumer thread that prefetching is over
89  {
90  std::lock_guard<std::mutex> lk(fMutexPendingList);
91  fPrefetchFinished = kTRUE;
92  }
93  fNewBlockAdded.notify_one();
94 
95  fConsumer->Join();
96  fThreadJoined = kTRUE;
97  fPrefetchFinished = kFALSE;
98 }
99 
100 
101 ////////////////////////////////////////////////////////////////////////////////
102 /// Read one block and insert it in prefetchBuffers list.
103 
104 void TFilePrefetch::ReadAsync(TFPBlock* block, Bool_t &inCache)
105 {
106  char* path = 0;
107 
108  if (CheckBlockInCache(path, block)){
109  block->SetBuffer(GetBlockFromCache(path, block->GetDataSize()));
110  inCache = kTRUE;
111  }
112  else{
113  fFile->ReadBuffers(block->GetBuffer(), block->GetPos(), block->GetLen(), block->GetNoElem());
114  if (fFile->GetArchive()) {
115  for (Int_t i = 0; i < block->GetNoElem(); i++)
116  block->SetPos(i, block->GetPos(i) - fFile->GetArchiveOffset());
117  }
118  inCache =kFALSE;
119  }
120  delete[] path;
121 }
122 
123 ////////////////////////////////////////////////////////////////////////////////
124 /// Get blocks specified in prefetchBlocks.
125 
126 void TFilePrefetch::ReadListOfBlocks()
127 {
128  Bool_t inCache = kFALSE;
129  TFPBlock* block = 0;
130 
131  while((block = GetPendingBlock())){
132  ReadAsync(block, inCache);
133  AddReadBlock(block);
134  if (!inCache)
135  SaveBlockInCache(block);
136  }
137 }
138 
139 ////////////////////////////////////////////////////////////////////////////////
140 /// Search for a requested element in a block and return the index.
141 
142 Bool_t TFilePrefetch::BinarySearchReadList(TFPBlock* blockObj, Long64_t offset, Int_t len, Int_t* index)
143 {
144  Int_t first = 0, last = -1, mid = -1;
145  last = (Int_t) blockObj->GetNoElem()-1;
146 
147  while (first <= last){
148  mid = first + (last - first) / 2;
149  if ((offset >= blockObj->GetPos(mid) && offset <= (blockObj->GetPos(mid) + blockObj->GetLen(mid))
150  && ( (offset + len) <= blockObj->GetPos(mid) + blockObj->GetLen(mid)))){
151 
152  *index = mid;
153  return true;
154  }
155  else if (blockObj->GetPos(mid) < offset){
156  first = mid + 1;
157  }
158  else{
159  last = mid - 1;
160  }
161  }
162  return false;
163 }
164 
165 ////////////////////////////////////////////////////////////////////////////////
166 /// Return the time spent wating for buffer to be read in microseconds.
167 
168 Long64_t TFilePrefetch::GetWaitTime()
169 {
170  return Long64_t(fWaitTime.RealTime()*1.e+6);
171 }
172 
173 ////////////////////////////////////////////////////////////////////////////////
174 /// Return a prefetched element.
175 
176 Bool_t TFilePrefetch::ReadBuffer(char* buf, Long64_t offset, Int_t len)
177 {
178  Bool_t found = false;
179  TFPBlock* blockObj = 0;
180  Int_t index = -1;
181 
182  std::unique_lock<std::mutex> lk(fMutexReadList);
183  while (1){
184  TIter iter(fReadBlocks);
185  while ((blockObj = (TFPBlock*) iter.Next())){
186  index = -1;
187  if (BinarySearchReadList(blockObj, offset, len, &index)){
188  found = true;
189  break;
190  }
191  }
192  if (found)
193  break;
194  else{
195  fWaitTime.Start(kFALSE);
196  fReadBlockAdded.wait(lk); //wait for a new block to be added
197  fWaitTime.Stop();
198  }
199  }
200 
201  if (found){
202  char *pBuff = blockObj->GetPtrToPiece(index);
203  pBuff += (offset - blockObj->GetPos(index));
204  memcpy(buf, pBuff, len);
205  }
206  return found;
207 }
208 
209 ////////////////////////////////////////////////////////////////////////////////
210 /// Create a TFPBlock object or recycle one and add it to the prefetchBlocks list.
211 
212 void TFilePrefetch::ReadBlock(Long64_t* offset, Int_t* len, Int_t nblock)
213 {
214  TFPBlock* block = CreateBlockObj(offset, len, nblock);
215  AddPendingBlock(block);
216 }
217 
218 ////////////////////////////////////////////////////////////////////////////////
219 /// Safe method to add a block to the pendingList.
220 
221 void TFilePrefetch::AddPendingBlock(TFPBlock* block)
222 {
223  fMutexPendingList.lock();
224  fPendingBlocks->Add(block);
225  fMutexPendingList.unlock();
226 
227  fNewBlockAdded.notify_one();
228 }
229 
230 ////////////////////////////////////////////////////////////////////////////////
231 /// Safe method to remove a block from the pendingList.
232 
233 TFPBlock* TFilePrefetch::GetPendingBlock()
234 {
235  TFPBlock* block = 0;
236 
237  // Use the semaphore to deal with the case when the file pointer
238  // is changed on the fly by TChain
239  fSemChangeFile->Post();
240  std::unique_lock<std::mutex> lk(fMutexPendingList);
241  // Wait unless there is a pending block or prefetching is over
242  fNewBlockAdded.wait(lk, [&]{ return fPendingBlocks->GetSize() > 0 || fPrefetchFinished; });
243  lk.unlock();
244  fSemChangeFile->Wait();
245 
246  lk.lock();
247  if (fPendingBlocks->GetSize()){
248  block = (TFPBlock*)fPendingBlocks->First();
249  block = (TFPBlock*)fPendingBlocks->Remove(block);
250  }
251  return block;
252 }
253 
254 ////////////////////////////////////////////////////////////////////////////////
255 /// Safe method to add a block to the readList.
256 
257 void TFilePrefetch::AddReadBlock(TFPBlock* block)
258 {
259  fMutexReadList.lock();
260 
261  if (fReadBlocks->GetSize() >= kMAX_READ_SIZE){
262  TFPBlock* movedBlock = (TFPBlock*) fReadBlocks->First();
263  movedBlock = (TFPBlock*)fReadBlocks->Remove(movedBlock);
264  delete movedBlock;
265  movedBlock = 0;
266  }
267 
268  fReadBlocks->Add(block);
269  fMutexReadList.unlock();
270 
271  //signal the addition of a new block
272  fReadBlockAdded.notify_one();
273 }
274 
275 
276 ////////////////////////////////////////////////////////////////////////////////
277 /// Create a new block or recycle an old one.
278 
279 TFPBlock* TFilePrefetch::CreateBlockObj(Long64_t* offset, Int_t* len, Int_t noblock)
280 {
281  TFPBlock* blockObj = 0;
282 
283  fMutexReadList.lock();
284 
285  if (fReadBlocks->GetSize() >= kMAX_READ_SIZE){
286  blockObj = static_cast<TFPBlock*>(fReadBlocks->First());
287  fReadBlocks->Remove(blockObj);
288  fMutexReadList.unlock();
289  blockObj->ReallocBlock(offset, len, noblock);
290  }
291  else{
292  fMutexReadList.unlock();
293  blockObj = new TFPBlock(offset, len, noblock);
294  }
295  return blockObj;
296 }
297 
298 ////////////////////////////////////////////////////////////////////////////////
299 /// Return reference to the consumer thread.
300 
301 TThread* TFilePrefetch::GetThread() const
302 {
303  return fConsumer;
304 }
305 
306 
307 ////////////////////////////////////////////////////////////////////////////////
308 /// Change the file
309 ///
310 /// When prefetching is enabled we also need to:
311 /// - make sure the async thread is not doing any work
312 /// - clear all blocks from prefetching and read list
313 /// - reset the file pointer
314 
315 void TFilePrefetch::SetFile(TFile *file, TFile::ECacheAction action)
316 {
317  if (action == TFile::kDisconnect) {
318  if (!fThreadJoined) {
319  fSemChangeFile->Wait();
320  }
321 
322  if (fFile) {
323  // Remove all pending and read blocks
324  fMutexPendingList.lock();
325  fPendingBlocks->Clear();
326  fMutexPendingList.unlock();
327 
328  fMutexReadList.lock();
329  fReadBlocks->Clear();
330  fMutexReadList.unlock();
331  }
332 
333  fFile = file;
334  if (!fThreadJoined) {
335  fSemChangeFile->Post();
336  }
337  } else {
338  // kDoNotDisconnect must reconnect to the same file
339  assert((fFile == file) && "kDoNotDisconnect must reattach to the same file");
340  }
341 }
342 
343 
344 ////////////////////////////////////////////////////////////////////////////////
345 /// Used to start the consumer thread.
346 
347 Int_t TFilePrefetch::ThreadStart()
348 {
349  int rc;
350 
351  fConsumer = new TThread((TThread::VoidRtnFunc_t) ThreadProc, (void*) this);
352  rc = fConsumer->Run();
353  if ( !rc ) {
354  fThreadJoined = kFALSE;
355  }
356  return rc;
357 }
358 
359 
360 ////////////////////////////////////////////////////////////////////////////////
361 /// Execution loop of the consumer thread.
362 
363 TThread::VoidRtnFunc_t TFilePrefetch::ThreadProc(void* arg)
364 {
365  TFilePrefetch* pClass = (TFilePrefetch*) arg;
366 
367  while (!pClass->IsPrefetchFinished()) {
368  pClass->ReadListOfBlocks();
369  }
370 
371  return (TThread::VoidRtnFunc_t) 1;
372 }
373 
374 //############################# CACHING PART ###################################
375 
376 ////////////////////////////////////////////////////////////////////////////////
377 /// Sum up individual hex values to obtain a decimal value.
378 
379 Int_t TFilePrefetch::SumHex(const char *hex)
380 {
381  Int_t result = 0;
382  const char* ptr = hex;
383 
384  for(Int_t i=0; i < (Int_t)strlen(hex); i++)
385  result += xtod(ptr[i]);
386 
387  return result;
388 }
389 
390 ////////////////////////////////////////////////////////////////////////////////
391 /// Test if the block is in cache.
392 
393 Bool_t TFilePrefetch::CheckBlockInCache(char*& path, TFPBlock* block)
394 {
395  if (fPathCache == "")
396  return false;
397 
398  Bool_t found = false;
399  TString fullPath(fPathCache); // path of the cached files.
400 
401  Int_t value = 0;
402 
403  if (!gSystem->OpenDirectory(fullPath))
404  gSystem->mkdir(fullPath);
405 
406  //dir is SHA1 value modulo 16; filename is the value of the SHA1(offset+len)
407  TMD5* md = new TMD5();
408 
409  TString concatStr;
410  for (Int_t i=0; i < block->GetNoElem(); i++){
411  concatStr.Form("%lld", block->GetPos(i));
412  md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
413  }
414 
415  md->Final();
416  TString fileName( md->AsString() );
417  value = SumHex(fileName);
418  value = value % 16;
419  TString dirName;
420  dirName.Form("%i", value);
421 
422  fullPath += "/" + dirName + "/" + fileName;
423 
424  FileStat_t stat;
425  if (gSystem->GetPathInfo(fullPath, stat) == 0) {
426  path = new char[fullPath.Length() + 1];
427  strlcpy(path, fullPath,fullPath.Length() + 1);
428  found = true;
429  } else
430  found = false;
431 
432  delete md;
433  return found;
434 }
435 
436 ////////////////////////////////////////////////////////////////////////////////
437 /// Return a buffer from cache.
438 
439 char* TFilePrefetch::GetBlockFromCache(const char* path, Int_t length)
440 {
441  char *buffer = 0;
442  TString strPath = path;
443 
444  strPath += "?filetype=raw";
445  TFile* file = new TFile(strPath);
446 
447  Double_t start = 0;
448  if (gPerfStats != 0) start = TTimeStamp();
449 
450  buffer = (char*) calloc(length, sizeof(char));
451  file->ReadBuffer(buffer, 0, length);
452 
453  fFile->fBytesRead += length;
454  fFile->fgBytesRead += length;
455  fFile->SetReadCalls(fFile->GetReadCalls() + 1);
456  fFile->fgReadCalls++;
457 
458  if (gMonitoringWriter)
459  gMonitoringWriter->SendFileReadProgress(fFile);
460  if (gPerfStats != 0) {
461  gPerfStats->FileReadEvent(fFile, length, start);
462  }
463 
464  file->Close();
465  delete file;
466  return buffer;
467 }
468 
469 ////////////////////////////////////////////////////////////////////////////////
470 /// Save the block content in cache.
471 
472 void TFilePrefetch::SaveBlockInCache(TFPBlock* block)
473 {
474  if (fPathCache == "")
475  return;
476 
477  //dir is SHA1 value modulo 16; filename is the value of the SHA1
478  TMD5* md = new TMD5();
479 
480  TString concatStr;
481  for(Int_t i=0; i< block->GetNoElem(); i++){
482  concatStr.Form("%lld", block->GetPos(i));
483  md->Update((UChar_t*)concatStr.Data(), concatStr.Length());
484  }
485  md->Final();
486 
487  TString fileName( md->AsString() );
488  Int_t value = SumHex(fileName);
489  value = value % 16;
490 
491  TString fullPath( fPathCache );
492  TString dirName;
493  dirName.Form("%i", value);
494  fullPath += ("/" + dirName);
495 
496  if (!gSystem->OpenDirectory(fullPath))
497  gSystem->mkdir(fullPath);
498 
499  TFile* file = 0;
500  fullPath += ("/" + fileName);
501  FileStat_t stat;
502  if (gSystem->GetPathInfo(fullPath, stat) == 0) {
503  fullPath += "?filetype=raw";
504  file = TFile::Open(fullPath, "update");
505  } else{
506  fullPath += "?filetype=raw";
507  file = TFile::Open(fullPath, "new");
508  }
509 
510  if (file) {
511  // coverity[unchecked_value] We do not print error message, have not error
512  // return code and close the file anyway, not need to check the return value.
513  file->WriteBuffer(block->GetBuffer(), block->GetDataSize());
514  file->Close();
515  delete file;
516  }
517  delete md;
518 }
519 
520 
521 ////////////////////////////////////////////////////////////////////////////////
522 /// Set the path of the cache directory.
523 
524 Bool_t TFilePrefetch::SetCache(const char* path)
525 {
526  fPathCache = path;
527 
528  if (!gSystem->OpenDirectory(path)){
529  return (!gSystem->mkdir(path) ? true : false);
530  }
531 
532  // Directory already exists
533  return true;
534 }
535