Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TBufferMerger.cxx
Go to the documentation of this file.
1 // @(#)root/io:$Id$
2 // Author: Philippe Canal, Witold Pokorski, and Guilherme Amadio
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2017, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #include "ROOT/TBufferMerger.hxx"
13 
14 #include "TBufferFile.h"
15 #include "TError.h"
16 #include "TROOT.h"
17 #include "TVirtualMutex.h"
18 
19 #include <utility>
20 
21 namespace ROOT {
22 namespace Experimental {
23 
24 TBufferMerger::TBufferMerger(const char *name, Option_t *option, Int_t compress)
25 {
26  // We cannot chain constructors or use in-place initialization here because
27  // instantiating a TBufferMerger should not alter gDirectory's state.
28  TDirectory::TContext ctxt;
29  Init(std::unique_ptr<TFile>(TFile::Open(name, option, /* title */ name, compress)));
30 }
31 
32 TBufferMerger::TBufferMerger(std::unique_ptr<TFile> output)
33 {
34  Init(std::move(output));
35 }
36 
37 void TBufferMerger::Init(std::unique_ptr<TFile> output)
38 {
39  if (!output || !output->IsWritable() || output->IsZombie())
40  Error("TBufferMerger", "cannot write to output file");
41 
42  fMerger.OutputFile(std::move(output));
43 }
44 
45 TBufferMerger::~TBufferMerger()
46 {
47  for (const auto &f : fAttachedFiles)
48  if (!f.expired()) Fatal("TBufferMerger", " TBufferMergerFiles must be destroyed before the server");
49 
50  if (!fQueue.empty())
51  Merge();
52 }
53 
54 std::shared_ptr<TBufferMergerFile> TBufferMerger::GetFile()
55 {
56  R__LOCKGUARD(gROOTMutex);
57  std::shared_ptr<TBufferMergerFile> f(new TBufferMergerFile(*this));
58  gROOT->GetListOfFiles()->Remove(f.get());
59  fAttachedFiles.push_back(f);
60  return f;
61 }
62 
63 size_t TBufferMerger::GetQueueSize() const
64 {
65  return fQueue.size();
66 }
67 
68 void TBufferMerger::Push(TBufferFile *buffer)
69 {
70  {
71  std::lock_guard<std::mutex> lock(fQueueMutex);
72  fBuffered += buffer->BufferSize();
73  fQueue.push(buffer);
74  }
75 
76  if (fBuffered > fAutoSave)
77  Merge();
78 }
79 
80 size_t TBufferMerger::GetAutoSave() const
81 {
82  return fAutoSave;
83 }
84 
85 const char *TBufferMerger::GetMergeOptions()
86 {
87  return fMerger.GetMergeOptions();
88 }
89 
90 
91 void TBufferMerger::SetAutoSave(size_t size)
92 {
93  fAutoSave = size;
94 }
95 
96 void TBufferMerger::SetMergeOptions(const TString& options)
97 {
98  fMerger.SetMergeOptions(options);
99 }
100 
101 void TBufferMerger::Merge()
102 {
103  if (fMergeMutex.try_lock()) {
104  std::queue<TBufferFile *> queue;
105  {
106  std::lock_guard<std::mutex> q(fQueueMutex);
107  std::swap(queue, fQueue);
108  fBuffered = 0;
109  }
110 
111  while (!queue.empty()) {
112  std::unique_ptr<TBufferFile> buffer{queue.front()};
113  fMerger.AddAdoptFile(new TMemFile(fMerger.GetOutputFileName(), std::move(buffer)));
114  queue.pop();
115  }
116 
117  fMerger.PartialMerge();
118  fMerger.Reset();
119  fMergeMutex.unlock();
120  }
121 }
122 
123 } // namespace Experimental
124 } // namespace ROOT