12 #ifndef ROOT_TProofPlayer
13 #define ROOT_TProofPlayer
47 class TVirtualPacketizer;
// TProofPlayer derives from TVirtualProofPlayer and declares the processing,
// output, query-result, progress and feedback entry points listed below.
// NOTE(review): this listing looks like a lossy extraction of the header: each
// statement carries a leading number, and access specifiers, some declarations
// and the closing brace of the class are not visible. The comments describe
// only what the visible lines show.
60 class TProofPlayer :
public TVirtualProofPlayer {
// Data members. Their exact roles are defined by the implementation, which is
// not visible here; the notes below only point at the inline code that uses them.
70 TClass *fSelectorClass;
71 TTimer *fFeedbackTimer;
72 Long_t fFeedbackPeriod;
// fExitStatus is written by SetExitStatus() and read by GetExitStatus().
75 EExitStatus fExitStatus;
76 Long64_t fTotalEvents;
// Dereferenced by GetEventsProcessed() / AddEventsProcessed() and returned by
// GetProgressStatus().
77 TProofProgressStatus *fProgressStatus;
79 Long64_t fReadBytesRun;
80 Long64_t fReadCallsRun;
81 Long64_t fProcessedRun;
// Copied into fQuery by RestorePreviousQuery().
85 TQueryResult *fPreviousQuery;
// Assigned by SetMaxDrawQueries().
87 Int_t fMaxDrawQueries;
// Presumably guards the stop-timer handling -- TODO confirm in the implementation.
90 std::mutex fStopTimerMtx;
92 TTimer *fDispatchTimer;
94 TTimer *fProcTimeTimer;
95 TStopwatch *fProcTime;
// Assigned by SetOutputFilePath().
97 TString fOutputFilePath;
99 Long_t fSaveMemThreshold;
100 Bool_t fSavePartialResults;
101 Bool_t fSaveResultsPerPacket;
103 static THashList *fgDrawInputPars;
// Returns this object as an untyped pointer.
105 void *GetSender() {
return this; }
// Virtual hooks; derived classes in this file override several of them.
107 virtual Int_t DrawCanvas(TObject *obj);
109 virtual void SetupFeedback();
111 virtual void MergeOutput(Bool_t savememvalues = kFALSE);
114 virtual void StopFeedback();
// Members of a helper named TCleanup whose class header is not visible in this
// listing: the constructor stores the player pointer and the destructor calls
// StopFeedback() on it. fPlayer is dereferenced without a null check.
119 TProofPlayer *fPlayer;
121 TCleanup(TProofPlayer *p) : fPlayer(p) { }
122 ~TCleanup() { fPlayer->StopFeedback(); }
125 Int_t AssertSelector(
const char *selector_file);
// All four arguments are passed by non-const reference.
126 Bool_t CheckMemUsage(Long64_t &mfreq, Bool_t &w80r, Bool_t &w80v, TString &wmsg);
128 void MapOutputListToDataMembers()
const;
// Status flags defined with BIT(15) through BIT(18).
131 enum EStatusBits { kDispatchOneEvent = BIT(15), kIsProcessing = BIT(16),
132 kMaxProcTimeReached = BIT(17), kMaxProcTimeExtended = BIT(18) };
// Constructor (the TProof argument defaults to 0) and virtual destructor.
134 TProofPlayer(TProof *proof = 0);
135 virtual ~TProofPlayer();
// Process overloads: the selector is passed either as a C string or as a
// TSelector object. Defaults: empty option, nentries = -1, firstentry = 0.
137 Long64_t Process(TDSet *set,
138 const char *selector, Option_t *option =
"",
139 Long64_t nentries = -1, Long64_t firstentry = 0);
140 Long64_t Process(TDSet *set,
141 TSelector *selector, Option_t *option =
"",
142 Long64_t nentries = -1, Long64_t firstentry = 0);
143 virtual Bool_t JoinProcess(TList *workers);
// This class has no packetizer: the accessor always returns 0.
144 TVirtualPacketizer *GetPacketizer()
const {
return 0; }
145 Long64_t Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE);
146 Long64_t Finalize(TQueryResult *qr);
147 Long64_t DrawSelect(TDSet *set,
const char *varexp,
148 const char *selection, Option_t *option =
"",
149 Long64_t nentries = -1, Long64_t firstentry = 0);
// selector and objname are non-const references, so they can be written by the call.
150 Int_t GetDrawArgs(
const char *var,
const char *sel, Option_t *opt,
151 TString &selector, TString &objname);
152 void HandleGetTreeHeader(TMessage *mess);
153 void HandleRecvHisto(TMessage *mess);
154 void FeedBackCanvas(
const char *name, Bool_t create);
156 void StopProcess(Bool_t abort, Int_t timeout = -1);
157 void AddInput(TObject *inp);
159 TObject *GetOutput(
const char *name)
const;
160 TList *GetOutputList()
const;
// Inline accessors returning fInput, fQueryResults and fQuery. Those members
// are not declared in the visible lines.
161 TList *GetInputList()
const {
return fInput; }
162 TList *GetListOfResults()
const {
return fQueryResults; }
163 void AddQueryResult(TQueryResult *q);
164 TQueryResult *GetCurrentQuery()
const {
return fQuery; }
165 TQueryResult *GetQueryResult(
const char *ref);
166 void RemoveQueryResult(
const char *ref);
167 void SetCurrentQuery(TQueryResult *q);
// Plain assignments to fMaxDrawQueries and fQuery respectively.
168 void SetMaxDrawQueries(Int_t max) { fMaxDrawQueries = max; }
169 void RestorePreviousQuery() { fQuery = fPreviousQuery; }
170 Int_t AddOutputObject(TObject *obj);
171 void AddOutput(TList *out);
172 void StoreOutput(TList *out);
173 void StoreFeedback(TObject *slave, TList *out);
// Progress overloads. Each variant taking a TSlave* ignores that argument and
// forwards to the matching overload without it.
174 void Progress(Long64_t total, Long64_t processed);
175 void Progress(TSlave *, Long64_t total, Long64_t processed)
176 { Progress(total, processed); }
177 void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
178 Float_t initTime, Float_t procTime,
179 Float_t evtrti, Float_t mbrti);
// NOTE(review): the inline body opened below is not closed in this listing;
// the line carrying the remaining arguments appears to be missing.
180 void Progress(TSlave *, Long64_t total, Long64_t processed, Long64_t bytesread,
181 Float_t initTime, Float_t procTime,
182 Float_t evtrti, Float_t mbrti)
183 { Progress(total, processed, bytesread, initTime, procTime,
185 void Progress(TProofProgressInfo *pi);
186 void Progress(TSlave *, TProofProgressInfo *pi) { Progress(pi); }
187 void Feedback(TList *objs);
189 TDrawFeedback *CreateDrawFeedback(TProof *p);
190 void SetDrawFeedbackOption(TDrawFeedback *f, Option_t *opt);
191 void DeleteDrawFeedback(TDrawFeedback *f);
193 TDSetElement *GetNextPacket(TSlave *slave, TMessage *r);
195 Int_t ReinitSelector(TQueryResult *qr);
// The six range arguments are non-const references, so they can be written by the call.
197 void UpdateAutoBin(
const char *name,
198 Double_t& xmin, Double_t& xmax,
199 Double_t& ymin, Double_t& ymax,
200 Double_t& zmin, Double_t& zmax);
// Always kFALSE in this class; derived classes in this file redeclare it.
202 Bool_t IsClient()
const {
return kFALSE; }
204 void SetExitStatus(EExitStatus st) { fExitStatus = st; }
205 EExitStatus GetExitStatus()
const {
return fExitStatus; }
// Both accessors dereference fProgressStatus directly, without a null check.
206 Long64_t GetEventsProcessed()
const {
return fProgressStatus->GetEntries(); }
207 void AddEventsProcessed(Long64_t ev) { fProgressStatus->IncEntries(ev); }
209 void SetDispatchTimer(Bool_t on = kTRUE);
210 void SetStopTimer(Bool_t on = kTRUE,
211 Bool_t abort = kFALSE, Int_t timeout = 0);
// Virtual hooks with empty bodies in this class.
213 virtual void SetInitTime() { }
215 virtual void SetMerging(Bool_t = kTRUE) { }
217 Long64_t GetCacheSize();
218 Int_t GetLearnEntries();
// Stores the given path in fOutputFilePath.
220 void SetOutputFilePath(
const char *fp) { fOutputFilePath = fp; }
221 Int_t SavePartialResults(Bool_t queryend = kFALSE, Bool_t force = kFALSE);
223 void SetProcessing(Bool_t on = kTRUE);
224 TProofProgressStatus *GetProgressStatus()
const {
return fProgressStatus; }
226 void UpdateProgressInfo();
// ROOT dictionary macro; the class version argument is 0.
228 ClassDef(TProofPlayer,0)
// TProofPlayerLocal derives from TProofPlayer.
// NOTE(review): access specifiers, the fIsClient declaration and the closing
// brace of the class are not visible in this listing.
234 class TProofPlayerLocal :
public TProofPlayer {
// Feedback setup/stop have empty bodies in this class.
240 void SetupFeedback() { }
241 void StopFeedback() { }
// The constructor stores the flag (default kTRUE) in fIsClient; the
// destructor body is empty.
244 TProofPlayerLocal(Bool_t client = kTRUE) : fIsClient(client) { }
245 virtual ~TProofPlayerLocal() { }
// Returns the flag stored by the constructor.
247 Bool_t IsClient()
const {
return fIsClient; }
// Process overloads without a TDSet argument; declared only, no body here.
248 Long64_t Process(
const char *selector, Long64_t nentries = -1, Option_t *option =
"");
249 Long64_t Process(TSelector *selector, Long64_t nentries = -1, Option_t *option =
"");
// The TDSet-based overloads forward unchanged to TProofPlayer::Process.
250 Long64_t Process(TDSet *set,
251 const char *selector, Option_t *option =
"",
252 Long64_t nentries = -1, Long64_t firstentry = 0) {
253 return TProofPlayer::Process(set, selector, option, nentries, firstentry); }
254 Long64_t Process(TDSet *set,
255 TSelector *selector, Option_t *option =
"",
256 Long64_t nentries = -1, Long64_t firstentry = 0) {
257 return TProofPlayer::Process(set, selector, option, nentries, firstentry); }
// ROOT dictionary macro; the class version argument is 0.
258 ClassDef(TProofPlayerLocal,0)
// TProofPlayerRemote derives from TProofPlayer and redeclares its processing,
// output-merging, progress and feedback entry points.
// NOTE(review): access specifiers, several member declarations (fProof,
// fOutputLists, fFeedback, fMergeFiles, fDSet, fProcPackets, fNumMergers are
// initialized by the constructor but not declared in the visible lines) and the
// closing brace of the class are not visible in this listing.
279 class TProofPlayerRemote :
public TProofPlayer {
285 TList *fFeedbackLists;
// Returned by GetPacketizer().
286 TVirtualPacketizer *fPacketizer;
289 ErrorHandlerFunc_t fErrorHandler;
// Initialized to kTRUE by the constructor.
290 Bool_t fMergeTH1OneByOne;
292 TMessage *fProcessMessage;
293 TString fSelectorFileName;
295 TStopwatch *fMergeSTW;
298 virtual Bool_t HandleTimer(TTimer *timer);
299 Int_t InitPacketizer(TDSet *dset, Long64_t nentries,
300 Long64_t first,
const char *defpackunit,
301 const char *defpackdata);
302 TList *MergeFeedback();
303 Bool_t MergeOutputFiles();
304 void NotifyMemory(TObject *obj);
305 void SetLastMergingMsg(TObject *obj);
306 virtual Bool_t SendSelector(
const char *selector_file);
// Returns the TProof pointer stored by the constructor.
307 TProof *GetProof()
const {
return fProof; }
308 void SetupFeedback();
310 void SetSelectorDataMembersFromOutputList();
// Stores the proof pointer, sets the listed pointer/counter members to 0 and
// fMergeFiles to kFALSE, sets fMergeTH1OneByOne to kTRUE, and allocates a new
// TProofProgressStatus that is assigned to fProgressStatus.
313 TProofPlayerRemote(TProof *proof = 0) : fProof(proof), fOutputLists(0), fFeedback(0),
314 fFeedbackLists(0), fPacketizer(0),
315 fMergeFiles(kFALSE), fDSet(0), fErrorHandler(0),
316 fMergeTH1OneByOne(kTRUE), fProcPackets(0),
317 fProcessMessage(0), fMergeSTW(0), fNumMergers(0)
318 { fProgressStatus =
new TProofProgressStatus(); }
319 virtual ~TProofPlayerRemote();
// Process overloads with the same parameter lists and defaults as in TProofPlayer.
320 virtual Long64_t Process(TDSet *set,
const char *selector,
321 Option_t *option =
"", Long64_t nentries = -1,
322 Long64_t firstentry = 0);
323 virtual Long64_t Process(TDSet *set, TSelector *selector,
324 Option_t *option =
"", Long64_t nentries = -1,
325 Long64_t firstentry = 0);
326 virtual Bool_t JoinProcess(TList *workers);
327 virtual Long64_t Finalize(Bool_t force = kFALSE, Bool_t sync = kFALSE);
328 virtual Long64_t Finalize(TQueryResult *qr);
329 Long64_t DrawSelect(TDSet *set,
const char *varexp,
330 const char *selection, Option_t *option =
"",
331 Long64_t nentries = -1, Long64_t firstentry = 0);
333 void RedirectOutput(Bool_t on = kTRUE);
334 void StopProcess(Bool_t abort, Int_t timeout = -1);
335 void StoreOutput(TList *out);
336 virtual void StoreFeedback(TObject *slave, TList *out);
// merged is a non-const reference, so it can be written by the call.
337 Int_t Incorporate(TObject *obj, TList *out, Bool_t &merged);
338 TObject *HandleHistogram(TObject *obj, Bool_t &merged);
339 Bool_t HistoSameAxis(TH1 *h0, TH1 *h1);
340 Int_t AddOutputObject(TObject *obj);
341 void AddOutput(TList *out);
342 virtual void MergeOutput(Bool_t savememvalues = kFALSE);
// Progress overloads. Each variant taking a TSlave* ignores that argument and
// forwards to the matching overload without it.
343 void Progress(Long64_t total, Long64_t processed);
344 void Progress(TSlave*, Long64_t total, Long64_t processed)
345 { Progress(total, processed); }
346 void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
347 Float_t initTime, Float_t procTime,
348 Float_t evtrti, Float_t mbrti);
// NOTE(review): the inline body opened below is not closed in this listing;
// the line carrying the remaining arguments appears to be missing.
349 void Progress(TSlave *, Long64_t total, Long64_t processed, Long64_t bytesread,
350 Float_t initTime, Float_t procTime,
351 Float_t evtrti, Float_t mbrti)
352 { Progress(total, processed, bytesread, initTime, procTime,
354 void Progress(TProofProgressInfo *pi);
355 void Progress(TSlave *, TProofProgressInfo *pi) { Progress(pi); }
356 void Feedback(TList *objs);
357 TDSetElement *GetNextPacket(TSlave *slave, TMessage *r);
// Unlike the TProofPlayer version (which returns 0), this returns fPacketizer.
358 TVirtualPacketizer *GetPacketizer()
const {
return fPacketizer; }
// Declared only; the definition is not in this listing.
360 Bool_t IsClient()
const;
364 void SetMerging(Bool_t on = kTRUE);
// ROOT dictionary macro; the class version argument is 0.
366 ClassDef(TProofPlayerRemote,0)
// TProofPlayerSlave derives from TProofPlayer.
// NOTE(review): access specifiers, the fSocket / fFeedback declarations and the
// closing brace of the class are not visible in this listing.
372 class TProofPlayerSlave :
public TProofPlayer {
378 Bool_t HandleTimer(TTimer *timer);
381 void SetupFeedback();
// Stores the socket pointer (default 0) in fSocket and sets fFeedback to 0.
385 TProofPlayerSlave(TSocket *socket = 0) : fSocket(socket), fFeedback(0) { }
387 void HandleGetTreeHeader(TMessage *mess);
// ROOT dictionary macro; the class version argument is 0.
389 ClassDef(TProofPlayerSlave,0)
// TProofPlayerSuperMaster derives from TProofPlayerRemote.
// NOTE(review): access specifiers, possibly some member declarations and the
// closing brace of the class are not visible in this listing.
395 class TProofPlayerSuperMaster :
public TProofPlayerRemote {
// Numeric arrays; presumably one entry per slave, going by the names --
// TODO confirm against the implementation.
398 TArrayL64 fSlaveProgress;
399 TArrayL64 fSlaveTotals;
400 TArrayL64 fSlaveBytesRead;
401 TArrayF fSlaveInitTime;
402 TArrayF fSlaveProcTime;
403 TArrayF fSlaveEvtRti;
// Initialized to kFALSE by the constructor.
409 Bool_t fReturnFeedback;
412 Bool_t HandleTimer(TTimer *timer);
413 void SetupFeedback();
// Passes proof to the TProofPlayerRemote constructor and clears fReturnFeedback;
// the destructor body is empty.
416 TProofPlayerSuperMaster(TProof *proof = 0) :
417 TProofPlayerRemote(proof), fReturnFeedback(kFALSE) { }
418 virtual ~TProofPlayerSuperMaster() { }
// The C-string selector overload is declared only; the TSelector overload
// forwards unchanged to TProofPlayerRemote::Process.
420 Long64_t Process(TDSet *set,
const char *selector,
421 Option_t *option =
"", Long64_t nentries = -1,
422 Long64_t firstentry = 0);
423 Long64_t Process(TDSet *set, TSelector *selector,
424 Option_t *option =
"", Long64_t nentries = -1,
425 Long64_t firstentry = 0)
426 {
return TProofPlayerRemote::Process(set, selector, option,
427 nentries, firstentry); }
// Progress overloads without a TSlave argument forward to the
// TProofPlayerRemote implementations.
428 void Progress(Long64_t total, Long64_t processed)
429 { TProofPlayerRemote::Progress(total, processed); }
430 void Progress(Long64_t total, Long64_t processed, Long64_t bytesread,
431 Float_t initTime, Float_t procTime,
432 Float_t evtrti, Float_t mbrti)
433 { TProofPlayerRemote::Progress(total, processed, bytesread,
434 initTime, procTime, evtrti, mbrti); }
435 void Progress(TProofProgressInfo *pi) { TProofPlayerRemote::Progress(pi); }
// Progress overloads taking a named TSlave argument are declared only; unlike
// the TProofPlayerRemote inline versions, they have no body in this header.
436 void Progress(TSlave *sl, Long64_t total, Long64_t processed);
437 void Progress(TSlave *sl, Long64_t total, Long64_t processed, Long64_t bytesread,
438 Float_t initTime, Float_t procTime,
439 Float_t evtrti, Float_t mbrti);
440 void Progress(TSlave *sl, TProofProgressInfo *pi);
// ROOT dictionary macro; the class version argument is 0.
442 ClassDef(TProofPlayerSuperMaster,0)