Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TThreadedObject.hxx
Go to the documentation of this file.
1 // @(#)root/thread:$Id$
2 // Author: Danilo Piparo, CERN 11/2/2016
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2018, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TThreadedObject
13 #define ROOT_TThreadedObject
14 
15 #include "TList.h"
16 #include "TError.h"
17 
18 #include <functional>
19 #include <map>
20 #include <memory>
21 #include <mutex>
22 #include <string>
23 #include <thread>
24 #include <vector>
25 
26 #include "ROOT/TSpinMutex.hxx"
27 #include "TROOT.h"
28 
29 class TH1;
30 
31 namespace ROOT {
32 
33  namespace Internal {
34 
35  namespace TThreadedObjectUtils {
36 
/// Hand out the index used to identify a TThreadedObject.
/// The first call returns 0 and every subsequent call returns the previous
/// value plus one. The counter is a plain function-local static `unsigned`
/// (no atomic, no lock).
/// \return The index assigned by this call.
inline unsigned GetTThreadedObjectIndex()
{
   static unsigned nextIndex = 0;
   const unsigned assigned = nextIndex;
   ++nextIndex;
   return assigned;
}
42 
43  template<typename T, bool ISHISTO = std::is_base_of<TH1,T>::value>
44  struct Detacher{
45  static T* Detach(T* obj) {
46  return obj;
47  }
48  };
49 
50  template<typename T>
51  struct Detacher<T, true>{
52  static T* Detach(T* obj) {
53  obj->SetDirectory(nullptr);
54  obj->ResetBit(kMustCleanup);
55  return obj;
56  }
57  };
58 
59  /// Return a copy of the object or a "Clone" if the copy constructor is not implemented.
60  template<class T, bool isCopyConstructible = std::is_copy_constructible<T>::value>
61  struct Cloner {
62  static T *Clone(const T *obj, TDirectory* d = nullptr) {
63  T* clone;
64  if (d){
65  TDirectory::TContext ctxt(d);
66  clone = new T(*obj);
67  } else {
68  clone = new T(*obj);
69  }
70  return Detacher<T>::Detach(clone);
71  }
72  };
73 
74  template<class T>
75  struct Cloner<T, false> {
76  static T *Clone(const T *obj, TDirectory* d = nullptr) {
77  T* clone;
78  if (d){
79  TDirectory::TContext ctxt(d);
80  clone = (T*)obj->Clone();
81  } else {
82  clone = (T*)obj->Clone();
83  }
84  return clone;
85  }
86  };
87 
88  template<class T, bool ISHISTO = std::is_base_of<TH1,T>::value>
89  struct DirCreator{
90  static std::vector<TDirectory*> Create(unsigned maxSlots) {
91  std::string dirName = "__TThreaded_dir_";
92  dirName += std::to_string(ROOT::Internal::TThreadedObjectUtils::GetTThreadedObjectIndex()) + "_";
93  std::vector<TDirectory*> dirs;
94  dirs.reserve(maxSlots);
95  for (unsigned i=0; i< maxSlots;++i) {
96  auto dir = gROOT->mkdir((dirName+std::to_string(i)).c_str());
97  dirs.emplace_back(dir);
98  }
99  return dirs;
100  }
101  };
102 
103  template<class T>
104  struct DirCreator<T, true>{
105  static std::vector<TDirectory*> Create(unsigned maxSlots) {
106  std::vector<TDirectory*> dirs(maxSlots, nullptr);
107  return dirs;
108  }
109  };
110 
111  } // End of namespace TThreadedObjectUtils
112  } // End of namespace Internals
113 
114  namespace TThreadedObjectUtils {
115 
116  template<class T>
117  using MergeFunctionType = std::function<void(std::shared_ptr<T>, std::vector<std::shared_ptr<T>>&)>;
118  /// Merge TObjects
119  template<class T>
120  void MergeTObjects(std::shared_ptr<T> target, std::vector<std::shared_ptr<T>> &objs)
121  {
122  if (!target) return;
123  TList objTList;
124  // Cannot do better than this
125  for (auto obj : objs) {
126  if (obj && obj != target) objTList.Add(obj.get());
127  }
128  target->Merge(&objTList);
129  }
130  } // end of namespace TThreadedObjectUtils
131 
132  /**
133  * \class ROOT::TThreadedObject
134  * \brief A wrapper to make object instances thread private, lazily.
135  * \tparam T Class of the object to be made thread private (e.g. TH1F)
136  * \ingroup Multicore
137  *
138  * A wrapper which makes objects thread private. The methods of the underlying
139  * object can be invoked via the the arrow operator. The object is created in
140  * a specific thread lazily, i.e. upon invocation of one of its methods.
141  * The correct object pointer from within a particular thread can be accessed
142  * with the overloaded arrow operator or with the Get method.
143  * In case an elaborate thread management is in place, e.g. in presence of
144  * stream of operations or "processing slots", it is also possible to
145  * manually select the correct object pointer explicitly.
146  * The default size of the threaded objects is 64. This size can be extended
147  * manually via the fgMaxSlots parameter. The size of individual instances
148  * is automatically extended if the size of the implicit MT pool is bigger
149  * than 64.
150  *
151  */
152  template<class T>
153  class TThreadedObject {
154  public:
155  static unsigned fgMaxSlots; ///< The maximum number of processing slots (distinct threads) which the instances can manage
156  TThreadedObject(const TThreadedObject&) = delete;
157  /// Construct the TThreaded object and the "model" of the thread private
158  /// objects.
159  /// \tparam ARGS Arguments of the constructor of T
160  template<class ...ARGS>
161  TThreadedObject(ARGS&&... args)
162  {
163  const auto imtPoolSize = ROOT::GetImplicitMTPoolSize();
164  fMaxSlots = (64 > imtPoolSize) ? fgMaxSlots : imtPoolSize;
165  fObjPointers = std::vector<std::shared_ptr<T>>(fMaxSlots, nullptr);
166  fDirectories = Internal::TThreadedObjectUtils::DirCreator<T>::Create(fMaxSlots);
167 
168  TDirectory::TContext ctxt(fDirectories[0]);
169  fModel.reset(Internal::TThreadedObjectUtils::Detacher<T>::Detach(new T(std::forward<ARGS>(args)...)));
170  }
171 
172  /// Access a particular processing slot. This
173  /// method is *thread-unsafe*: it cannot be invoked from two different
174  /// threads with the same argument.
175  std::shared_ptr<T> GetAtSlot(unsigned i)
176  {
177  if ( i >= fObjPointers.size()) {
178  Warning("TThreadedObject::GetAtSlot", "Maximum number of slots reached.");
179  return nullptr;
180  }
181  auto objPointer = fObjPointers[i];
182  if (!objPointer) {
183  objPointer.reset(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel.get(), fDirectories[i]));
184  fObjPointers[i] = objPointer;
185  }
186  return objPointer;
187  }
188 
189  /// Set the value of a particular slot.
190  void SetAtSlot(unsigned i, std::shared_ptr<T> v)
191  {
192  fObjPointers[i] = v;
193  }
194 
195  /// Access a particular slot which corresponds to a single thread.
196  /// This is in general faster than the GetAtSlot method but it is
197  /// responsibility of the caller to make sure that an object is
198  /// initialised for the particular slot.
199  std::shared_ptr<T> GetAtSlotUnchecked(unsigned i) const
200  {
201  return fObjPointers[i];
202  }
203 
204  /// Access a particular slot which corresponds to a single thread.
205  /// This overload is faster than the GetAtSlotUnchecked method but
206  /// the caller is responsible to make sure that an object is
207  /// initialised for the particular slot and that the returned pointer
208  /// will not outlive the TThreadedObject that returned it.
209  T* GetAtSlotRaw(unsigned i) const
210  {
211  return fObjPointers[i].get();
212  }
213 
214  /// Access the pointer corresponding to the current slot. This method is
215  /// not adequate for being called inside tight loops as it implies a
216  /// lookup in a mapping between the threadIDs and the slot indices.
217  /// A good practice consists in copying the pointer onto the stack and
218  /// proceed with the loop as shown in this work item (psudo-code) which
219  /// will be sent to different threads:
220  /// ~~~{.cpp}
221  /// auto workItem = [](){
222  /// auto objPtr = tthreadedObject.Get();
223  /// for (auto i : ROOT::TSeqI(1000)) {
224  /// // tthreadedObject->FastMethod(i); // don't do this! Inefficient!
225  /// objPtr->FastMethod(i);
226  /// }
227  /// }
228  /// ~~~
229  std::shared_ptr<T> Get()
230  {
231  return GetAtSlot(GetThisSlotNumber());
232  }
233 
234  /// Access the wrapped object and allow to call its methods.
235  T *operator->()
236  {
237  return Get().get();
238  }
239 
240  /// Merge all the thread private objects. Can be called once: it does not
241  /// create any new object but destroys the present bookkeping collapsing
242  /// all objects into the one at slot 0.
243  std::shared_ptr<T> Merge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
244  {
245  // We do not return if we already merged.
246  if (fIsMerged) {
247  Warning("TThreadedObject::Merge", "This object was already merged. Returning the previous result.");
248  return fObjPointers[0];
249  }
250  mergeFunction(fObjPointers[0], fObjPointers);
251  fIsMerged = true;
252  return fObjPointers[0];
253  }
254 
255  /// Merge all the thread private objects. Can be called many times. It
256  /// does create a new instance of class T to represent the "Sum" object.
257  /// This method is not thread safe: correct or acceptable behaviours
258  /// depend on the nature of T and of the merging function.
259  std::unique_ptr<T> SnapshotMerge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
260  {
261  if (fIsMerged) {
262  Warning("TThreadedObject::SnapshotMerge", "This object was already merged. Returning the previous result.");
263  return std::unique_ptr<T>(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fObjPointers[0].get()));
264  }
265  auto targetPtr = Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel.get());
266  std::shared_ptr<T> targetPtrShared(targetPtr, [](T *) {});
267  mergeFunction(targetPtrShared, fObjPointers);
268  return std::unique_ptr<T>(targetPtr);
269  }
270 
271  private:
272  unsigned fMaxSlots; ///< The size of the instance
273  std::unique_ptr<T> fModel; ///< Use to store a "model" of the object
274  std::vector<std::shared_ptr<T>> fObjPointers; ///< A pointer per thread is kept.
275  std::vector<TDirectory*> fDirectories; ///< A TDirectory per thread is kept.
276  std::map<std::thread::id, unsigned> fThrIDSlotMap; ///< A mapping between the thread IDs and the slots
277  unsigned fCurrMaxSlotIndex = 0; ///< The maximum slot index
278  bool fIsMerged = false; ///< Remember if the objects have been merged already
279  ROOT::TSpinMutex fThrIDSlotMutex; ///< Mutex to protect the ID-slot map access
280 
281  /// Get the slot number for this threadID.
282  unsigned GetThisSlotNumber()
283  {
284  const auto thisThreadID = std::this_thread::get_id();
285  unsigned thisIndex;
286  {
287  std::lock_guard<ROOT::TSpinMutex> lg(fThrIDSlotMutex);
288  auto thisSlotNumIt = fThrIDSlotMap.find(thisThreadID);
289  if (thisSlotNumIt != fThrIDSlotMap.end()) return thisSlotNumIt->second;
290  thisIndex = fCurrMaxSlotIndex++;
291  fThrIDSlotMap[thisThreadID] = thisIndex;
292  }
293  return thisIndex;
294  }
295 
296  };
297 
298  template<class T> unsigned TThreadedObject<T>::fgMaxSlots = 64;
299 
300 } // End ROOT namespace
301 
302 #include <sstream>
303 
304 ////////////////////////////////////////////////////////////////////////////////
305 /// Print a TThreadedObject at the prompt:
306 
307 namespace cling {
308  template<class T>
309  std::string printValue(ROOT::TThreadedObject<T> *val)
310  {
311  auto model = ((std::unique_ptr<T>*)(val))->get();
312  std::ostringstream ret;
313  ret << "A wrapper to make object instances thread private, lazily. "
314  << "The model which is replicated is " << printValue(model);
315  return ret.str();
316  }
317 }
318 
319 
320 #endif