12 #ifndef ROOT_TThreadedObject
13 #define ROOT_TThreadedObject
35 namespace TThreadedObjectUtils {
38 inline unsigned GetTThreadedObjectIndex() {
39 static unsigned fgTThreadedObjectIndex = 0;
40 return fgTThreadedObjectIndex++;
43 template<typename T, bool ISHISTO = std::is_base_of<TH1,T>::value>
45 static T* Detach(T* obj) {
51 struct Detacher<T, true>{
52 static T* Detach(T* obj) {
53 obj->SetDirectory(
nullptr);
54 obj->ResetBit(kMustCleanup);
60 template<class T, bool isCopyConstructible = std::is_copy_constructible<T>::value>
62 static T *Clone(
const T *obj, TDirectory* d =
nullptr) {
65 TDirectory::TContext ctxt(d);
70 return Detacher<T>::Detach(clone);
75 struct Cloner<T, false> {
76 static T *Clone(
const T *obj, TDirectory* d =
nullptr) {
79 TDirectory::TContext ctxt(d);
80 clone = (T*)obj->Clone();
82 clone = (T*)obj->Clone();
88 template<class T, bool ISHISTO = std::is_base_of<TH1,T>::value>
90 static std::vector<TDirectory*> Create(
unsigned maxSlots) {
91 std::string dirName =
"__TThreaded_dir_";
92 dirName += std::to_string(ROOT::Internal::TThreadedObjectUtils::GetTThreadedObjectIndex()) +
"_";
93 std::vector<TDirectory*> dirs;
94 dirs.reserve(maxSlots);
95 for (
unsigned i=0; i< maxSlots;++i) {
96 auto dir = gROOT->mkdir((dirName+std::to_string(i)).c_str());
97 dirs.emplace_back(dir);
104 struct DirCreator<T, true>{
105 static std::vector<TDirectory*> Create(
unsigned maxSlots) {
106 std::vector<TDirectory*> dirs(maxSlots,
nullptr);
114 namespace TThreadedObjectUtils {
117 using MergeFunctionType = std::function<void(std::shared_ptr<T>, std::vector<std::shared_ptr<T>>&)>;
120 void MergeTObjects(std::shared_ptr<T> target, std::vector<std::shared_ptr<T>> &objs)
125 for (
auto obj : objs) {
126 if (obj && obj != target) objTList.Add(obj.get());
128 target->Merge(&objTList);
153 class TThreadedObject {
155 static unsigned fgMaxSlots;
156 TThreadedObject(
const TThreadedObject&) =
delete;
160 template<
class ...ARGS>
161 TThreadedObject(ARGS&&... args)
163 const auto imtPoolSize = ROOT::GetImplicitMTPoolSize();
164 fMaxSlots = (64 > imtPoolSize) ? fgMaxSlots : imtPoolSize;
165 fObjPointers = std::vector<std::shared_ptr<T>>(fMaxSlots,
nullptr);
166 fDirectories = Internal::TThreadedObjectUtils::DirCreator<T>::Create(fMaxSlots);
168 TDirectory::TContext ctxt(fDirectories[0]);
169 fModel.reset(Internal::TThreadedObjectUtils::Detacher<T>::Detach(
new T(std::forward<ARGS>(args)...)));
175 std::shared_ptr<T> GetAtSlot(
unsigned i)
177 if ( i >= fObjPointers.size()) {
178 Warning(
"TThreadedObject::GetAtSlot",
"Maximum number of slots reached.");
181 auto objPointer = fObjPointers[i];
183 objPointer.reset(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel.get(), fDirectories[i]));
184 fObjPointers[i] = objPointer;
190 void SetAtSlot(
unsigned i, std::shared_ptr<T> v)
199 std::shared_ptr<T> GetAtSlotUnchecked(
unsigned i)
const
201 return fObjPointers[i];
209 T* GetAtSlotRaw(
unsigned i)
const
211 return fObjPointers[i].get();
229 std::shared_ptr<T> Get()
231 return GetAtSlot(GetThisSlotNumber());
243 std::shared_ptr<T> Merge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
247 Warning(
"TThreadedObject::Merge",
"This object was already merged. Returning the previous result.");
248 return fObjPointers[0];
250 mergeFunction(fObjPointers[0], fObjPointers);
252 return fObjPointers[0];
259 std::unique_ptr<T> SnapshotMerge(TThreadedObjectUtils::MergeFunctionType<T> mergeFunction = TThreadedObjectUtils::MergeTObjects<T>)
262 Warning(
"TThreadedObject::SnapshotMerge",
"This object was already merged. Returning the previous result.");
263 return std::unique_ptr<T>(Internal::TThreadedObjectUtils::Cloner<T>::Clone(fObjPointers[0].
get()));
265 auto targetPtr = Internal::TThreadedObjectUtils::Cloner<T>::Clone(fModel.get());
266 std::shared_ptr<T> targetPtrShared(targetPtr, [](T *) {});
267 mergeFunction(targetPtrShared, fObjPointers);
268 return std::unique_ptr<T>(targetPtr);
273 std::unique_ptr<T> fModel;
274 std::vector<std::shared_ptr<T>> fObjPointers;
275 std::vector<TDirectory*> fDirectories;
276 std::map<std::thread::id, unsigned> fThrIDSlotMap;
277 unsigned fCurrMaxSlotIndex = 0;
278 bool fIsMerged =
false;
279 ROOT::TSpinMutex fThrIDSlotMutex;
282 unsigned GetThisSlotNumber()
284 const auto thisThreadID = std::this_thread::get_id();
287 std::lock_guard<ROOT::TSpinMutex> lg(fThrIDSlotMutex);
288 auto thisSlotNumIt = fThrIDSlotMap.find(thisThreadID);
289 if (thisSlotNumIt != fThrIDSlotMap.end())
return thisSlotNumIt->second;
290 thisIndex = fCurrMaxSlotIndex++;
291 fThrIDSlotMap[thisThreadID] = thisIndex;
298 template<
class T>
unsigned TThreadedObject<T>::fgMaxSlots = 64;
309 std::string printValue(ROOT::TThreadedObject<T> *val)
311 auto model = ((std::unique_ptr<T>*)(val))->
get();
312 std::ostringstream ret;
313 ret <<
"A wrapper to make object instances thread private, lazily. "
314 <<
"The model which is replicated is " << printValue(model);