// ROOT ClassImp macro invocation for the TVirtualPacketizer class.
60 ClassImp(TVirtualPacketizer);
// Constructor. Reads optional settings from `input` through
// TProof::GetParameter and prepares the progress-reporting members:
//   - PROOF_MinPacketTime / PROOF_MaxPacketTime -> fMinPacketTime / fMaxPacketTime
//   - PROOF_ProgressCircularity                 -> fCircN (circular size of fCircProg)
//   - PROOF_StartProgressTimer, PROOF_ProgressPeriod -> progress timer fProgress
//   - PROOF_SaveProgressPerf                    -> optional fProgressPerf ntuple
//   - PROOF_RateEstimation                      -> fUseEstOpt
// NOTE(review): how `st` is stored (presumably into fProgressStatus) is not
// visible in this listing -- confirm against the full source.
65 TVirtualPacketizer::TVirtualPacketizer(TList *input, TProofProgressStatus *st)
// Optional override of the minimum packet time. A GetParameter return value
// of 0 is what selects the override branch. The Double_t value is narrowed
// with an (Int_t) cast before being stored, so any fractional part is dropped.
70 Double_t minPacketTime = 0;
71 if (TProof::GetParameter(input,
"PROOF_MinPacketTime", minPacketTime) == 0) {
72 Info(
"TVirtualPacketizer",
"setting minimum time for a packet to %f",
74 fMinPacketTime = (Int_t) minPacketTime;
// Same handling for the maximum packet time (also truncated via (Int_t)).
77 Double_t maxPacketTime = 0;
78 if (TProof::GetParameter(input,
"PROOF_MaxPacketTime", maxPacketTime) == 0) {
79 Info(
"TVirtualPacketizer",
"setting maximum packet time for a packet to %f",
81 fMaxPacketTime = (Int_t) maxPacketTime;
// Clear the kIsTree bit on this object.
83 ResetBit(TVirtualPacketizer::kIsTree);
// Record the packet-time settings as TParameter<Double_t> entries in a list
// named "PROOF_PacketizerConfigParams".
87 fConfigParams =
new TList;
88 fConfigParams->SetName(
"PROOF_PacketizerConfigParams");
89 fConfigParams->Add(
new TParameter<Double_t>(
"PROOF_MinPacketTime", fMinPacketTime));
90 fConfigParams->Add(
new TParameter<Double_t>(
"PROOF_MaxPacketTime", fMaxPacketTime));
// A null fProgressStatus is reported as an error.
93 if (!fProgressStatus) {
94 Error(
"TVirtualPacketizer",
"No progress status");
// Remember when this packetizer was created and flag it as initializing and
// not yet done.
105 fStartTime = gSystem->Now();
106 SetBit(TVirtualPacketizer::kIsInitializing);
107 ResetBit(TVirtualPacketizer::kIsDone);
// Ntuple of progress samples with columns tm:ev:mb:rc:al. It is made circular
// with size fCircN (which PROOF_ProgressCircularity may override) and its
// directory is set to 0.
113 fCircProg =
new TNtupleD(
"CircNtuple",
"Circular progress info",
"tm:ev:mb:rc:al");
115 TProof::GetParameter(input,
"PROOF_ProgressCircularity", fCircN);
116 fCircProg->SetCircular(fCircN);
117 fCircProg->SetDirectory(0);
// The progress timer defaults to "yes"; PROOF_StartProgressTimer may
// override that default.
122 TString startProgress(
"yes");
123 TProof::GetParameter(input,
"PROOF_StartProgressTimer", startProgress);
// On a master that is not the top master the timer is forced off.
125 if (gProofServ && gProofServ->IsMaster() && !gProofServ->IsTopMaster()) startProgress =
"no";
// Create the timer, make this object its target and start it with `period`
// (which PROOF_ProgressPeriod may override). The declaration and default of
// `period` are not visible in this listing.
131 if (startProgress ==
"yes") {
133 TProof::GetParameter(input,
"PROOF_ProgressPeriod", period);
134 fProgress =
new TTimer;
135 fProgress->SetObject(
this);
136 fProgress->Start(period, kFALSE);
// The performance ntuple (columns tm:aw:er:mb:ns) is created only when
// PROOF_SaveProgressPerf is present in `input`, equals "yes", and a progress
// timer exists.
141 TString saveProgressPerf(
"no");
142 if (TProof::GetParameter(input,
"PROOF_SaveProgressPerf", saveProgressPerf) == 0) {
143 if (fProgress && saveProgressPerf ==
"yes")
144 fProgressPerf =
new TNtuple(
"PROOF_ProgressPerfNtuple",
145 "{Active workers, evt rate, MB read} vs processing time",
"tm:aw:er:mb:ns");
152 fAWLastFill = kFALSE;
// Rate-estimation option: looked up in `input` first, with the
// "Proof.RateEstimation" environment setting (default "") as fallback.
// NOTE(review): the second half of the `||` condition and the declaration of
// `estopt` are not visible in this listing.
157 if (TProof::GetParameter(input,
"PROOF_RateEstimation", estopt) != 0 ||
160 estopt = gEnv->GetValue(
"Proof.RateEstimation",
"");
// Estimation is off unless the option is exactly "current" or "average".
162 fUseEstOpt = kEstOff;
163 if (estopt ==
"current")
164 fUseEstOpt = kEstCurrent;
165 else if (estopt ==
"average")
166 fUseEstOpt = kEstAverage;
// Destructor. Applies SafeDelete to the progress ntuples, the progress timer,
// the failed-packets container and the configuration parameter list.
172 TVirtualPacketizer::~TVirtualPacketizer()
174 SafeDelete(fCircProg);
175 SafeDelete(fProgress);
176 SafeDelete(fFailedPackets);
177 SafeDelete(fConfigParams);
178 SafeDelete(fProgressPerf);
// Determine the number of entries for the dataset element `e`.
// Opens the element's file, changes into the element's directory and then
// either reads the named object as a TTree and takes its entry count, or uses
// the number of keys in the directory.
// NOTE(review): the branch selecting between those two modes (presumably on
// `tree`), the return statements and any cleanup of `file` / restoring of
// `dirsave` are not visible in this listing -- confirm against the full source.
185 Long64_t TVirtualPacketizer::GetEntries(Bool_t tree, TDSetElement *e)
188 TFile *file = TFile::Open(e->GetFileName());
// Failure covers both a null pointer and a zombie file object; the message
// includes strerror() of the file's errno when a file object exists.
// NOTE(review): no delete of a zombie `file` is visible on this path --
// check for a leak.
190 if (!file || (file && file->IsZombie())) {
191 const char *emsg = (file) ? strerror(file->GetErrno()) :
"<undef>";
192 Error(
"GetEntries",
"Cannot open file: %s (%s)", e->GetFileName(), emsg);
// Keep the current directory before cd-ing into the element's directory.
196 TDirectory *dirsave = gDirectory;
197 if ( ! file->cd(e->GetDirectory()) ) {
198 Error(
"GetEntries",
"Cannot cd to: %s", e->GetDirectory() );
// After a successful cd, gDirectory is used as the directory to inspect.
202 TDirectory *dir = gDirectory;
// Tree mode: look up the key for the object name and read the object.
206 TKey *key = dir->GetKey(e->GetObjName());
208 Error(
"GetEntries",
"Cannot find tree \"%s\" in %s",
209 e->GetObjName(), e->GetFileName() );
213 TTree *t = (TTree *) key->ReadObj();
219 entries = (Long64_t) t->GetEntries();
// Non-tree mode: the entry count is the size of the directory's key list.
223 TList *keys = dir->GetListOfKeys();
224 entries = keys->GetSize();
// Base-class version: both parameters are unnamed and unused, and the body
// calls AbstractMethod("GetNextPacket").
// NOTE(review): the returned value is not visible in this listing.
235 TDSetElement *TVirtualPacketizer::GetNextPacket(TSlave *, TMessage *)
237 AbstractMethod(
"GetNextPacket");
// Stop-process hook. The first Bool_t argument is unnamed and unused here.
// When `stoptimer` is true, HandleTimer is invoked directly with a null timer.
// NOTE(review): other statements of the body may be missing from this listing.
244 void TVirtualPacketizer::StopProcess(Bool_t , Bool_t stoptimer)
247 if (stoptimer) HandleTimer(0);
// Build a new TDSetElement describing a packet taken from `base`.
// The new element copies the file name, object name and directory of `base`
// and uses `first` / `num` as its range arguments. For every friend element
// of `base`, a matching element with the same `first` / `num` is created and
// attached to the new packet with AddFriend(xfe, 0).
// NOTE(review): the trailing constructor arguments, the declaration of the
// iterator `nxf`, any guard around the Info() trace and the return statement
// are not visible in this listing.
255 TDSetElement* TVirtualPacketizer::CreateNewPacket(TDSetElement* base,
256 Long64_t first, Long64_t num)
258 TDSetElement* elem =
new TDSetElement(base->GetFileName(), base->GetObjName(),
259 base->GetDirectory(), first, num,
// Replicate the friends of `base` onto the new element.
263 TList *friends = base->GetListOfFriends();
266 TDSetElement *fe = 0;
267 while ((fe = (TDSetElement *) nxf())) {
269 Info("CreateNewPacket", "friend: file '%s', obj:'%s'",
270 fe->GetFileName(), fe->GetObjName());
271 TDSetElement *xfe = new TDSetElement(fe->GetFileName(), fe->GetObjName(),
272 fe->GetDirectory(), first, num);
274 elem->AddFriend(xfe, 0);
// Progress-timer callback (the TTimer argument is unnamed and unused).
// Computes the current progress estimates, records them in the circular
// ntuple fCircProg and, optionally, in fProgressPerf, then sends a
// kPROOF_PROGRESS message and/or reports to the local player.
// NOTE(review): several lines (braces, guards, else-branches, returns and the
// declaration of `all`) are missing from this listing; comments below that
// depend on them are marked as assumptions.
284 Bool_t TVirtualPacketizer::HandleTimer(TTimer *)
287 Info("HandleTimer", "fProgress: %p, isDone: %d",
288 fProgress, TestBit(TVirtualPacketizer::kIsDone));
// Nothing to report without a timer or once processing is done; an existing
// timer is stopped in that case.
290 if (fProgress == 0 || TestBit(TVirtualPacketizer::kIsDone)) {
292 if (fProgress) fProgress->Stop();
// Time since fStartTime, divided by 1000 (same computation as SetInitTime,
// which logs its result in seconds).
297 TTime tnow = gSystem->Now();
298 Float_t now = Long64_t(tnow - fStartTime) / (Float_t)1000.;
// Start from the actual counters; they may be replaced by estimates below.
299 Long64_t estent = GetEntriesProcessed();
300 Long64_t estmb = GetBytesRead();
301 Long64_t estrc = GetReadCalls();
// Rates start at -1 and are only computed further down.
304 Float_t evtrti = -1., mbrti = -1.;
305 if (TestBit(TVirtualPacketizer::kIsInitializing)) {
// Seed the circular ntuple with an all-zero row if it is still empty.
310 if (fCircProg->GetEntries() <= 0) {
311 fCircProg->Fill((Double_t)0., 0., 0., 0., 0.);
// presumably the non-initializing branch starts here -- TODO confirm
314 fTimeUpdt = now - fProcTime;
316 fProcTime = now - fInitTime;
// `ar` is the ntuple's argument buffer; GetEntry loads the most recent row.
318 Double_t *ar = fCircProg->GetArgs();
319 fCircProg->GetEntry(fCircProg->GetEntries()-1);
322 evtrti = GetCurrentRate(all);
323 Double_t xall = (all) ? 1. : 0.;
// Ask for estimated counters; if the estimated entries reach the total,
// fall back to the actual counters.
324 GetEstEntriesProcessed(0, estent, estmb, estrc);
325 if (estent >= fTotalEntries) {
326 estent = GetEntriesProcessed();
327 estmb = GetBytesRead();
328 estrc = GetReadCalls();
// Append a new sample row; bytes are converted to MB by dividing by 2^20.
331 Double_t evts = (Double_t) estent;
332 Double_t mbs = (estmb > 0) ? estmb / TMath::Power(2.,20.) : 0.;
333 Double_t rcs = (Double_t) estrc;
334 fCircProg->Fill((Double_t)fProcTime, evts, mbs, rcs, xall);
// Load the row before the one just appended to form deltas against it.
335 fCircProg->GetEntry(fCircProg->GetEntries()-2);
// Deltas since the previous sample; negative event/byte deltas are clamped
// to 0.
337 Double_t dt = (Double_t)fProcTime - ar[0];
338 Long64_t de = (evts > ar[1]) ? (Long64_t) (evts - ar[1]) : 0;
339 Long64_t db = (mbs > ar[2]) ? (Long64_t) ((mbs - ar[2])*TMath::Power(2.,20.)) : 0;
// NOTE(review): a null check on gPerfStats is not visible here -- confirm.
341 gPerfStats->RateEvent((Double_t)fProcTime, dt, de, db);
// mbrti: MB read per read call over the last interval (0 if no new calls or
// no new data).
343 Double_t rc = (Double_t)estrc - ar[3];
344 mbrti = (rc > 0 && mbs > ar[2]) ? (Float_t) (mbs - ar[2]) / rc : 0. ;
// Mark as done once all entries have actually been processed.
347 if (fTotalEntries > 0 && GetEntriesProcessed() >= fTotalEntries)
348 SetBit(TVirtualPacketizer::kIsDone);
350 Info("HandleTimer", "ent:%lld, bytes:%lld, proct:%f, evtrti:%f, mbrti:%f (%f,%f)",
351 estent, estmb, fProcTime, evtrti, mbrti, mbs, ar[2]);
// Build the progress message; its layout depends on the peer protocol:
// > 25 uses a TProofProgressInfo object, > 11 streams individual fields,
// and the remaining case sends only the total and processed entries.
356 TMessage m(kPROOF_PROGRESS);
357 if (gProofServ->GetProtocol() > 25) {
358 Int_t actw = GetActiveWorkers();
359 Int_t acts = gProofServ->GetActSessions();
360 Float_t effs = gProofServ->GetEffSessions();
361 if (fProgressPerf && estent > 0) {
// Reporting period: 1/100 of the projected total processing time
// (fTotalEntries / estent * fProcTime), raised to 5 when it falls in (0, 5).
363 if (fProcTime > 0.) {
364 fReportPeriod = (Float_t) fTotalEntries / (Double_t) estent * fProcTime / 100.;
365 if (fReportPeriod > 0. && fReportPeriod < 5.) fReportPeriod = 5.;
// First row of the performance ntuple: time and active workers only, the
// remaining columns are set to -1.
368 if (fProgressPerf->GetEntries() <= 0) {
370 fProgressPerf->Fill(fProcTime, (Float_t)actw, -1., -1., -1.);
// Compare against the most recently stored row.
373 Float_t *far = fProgressPerf->GetArgs();
374 fProgressPerf->GetEntry(fProgressPerf->GetEntries()-1);
375 Bool_t doReport = (fReportPeriod > 0. &&
376 (fProcTime - far[0]) >= fReportPeriod) ? kTRUE : kFALSE;
377 Float_t mbsread = estmb / 1024. / 1024.;
// If the number of active workers changed, store the last remembered values
// followed by the current ones; otherwise store a row only when the report
// period has elapsed.
// NOTE(review): a guard around the first Fill (original line 379) is not
// visible in this listing.
378 if (TMath::Abs((Float_t)actw - far[1]) > 0.1) {
380 fProgressPerf->Fill(fProcTimeLast, (Float_t)fActWrksLast,
381 fEvtRateLast, fMBsReadLast, fEffSessLast);
382 fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
383 fAWLastFill = kFALSE;
384 }
else if (doReport) {
385 fProgressPerf->Fill(fProcTime, (Float_t)actw, evtrti, mbsread, effs);
386 fAWLastFill = kFALSE;
// Remember the current values for the next call.
390 fProcTimeLast = fProcTime;
392 fEvtRateLast = evtrti;
393 fMBsReadLast = mbsread;
398 TProofProgressInfo pi(fTotalEntries, estent, estmb, fInitTime,
399 fProcTime, evtrti, mbrti, actw, acts, effs);
401 }
else if (gProofServ->GetProtocol() > 11) {
403 m << fTotalEntries << estent << estmb << fInitTime << fProcTime
407 m << fTotalEntries << GetEntriesProcessed();
// Send the message over the server socket.
410 gProofServ->GetSocket()->Send(m);
// When a gProof session with a player exists, report progress to it directly.
// presumably this is the branch taken when no server socket is used -- TODO confirm
413 if (gProof && gProof->GetPlayer()) {
415 gProof->GetPlayer()->Progress(fTotalEntries, estent, estmb,
416 fInitTime, fProcTime, evtrti, mbrti);
// Final done-check, same condition as above.
421 if (fTotalEntries > 0 && GetEntriesProcessed() >= fTotalEntries)
422 SetBit(TVirtualPacketizer::kIsDone);
// End the initialization phase. While the kIsInitializing bit is set, store
// the time elapsed since fStartTime (divided by 1000, logged as seconds) in
// fInitTime and clear the bit.
// NOTE(review): any guard around the Info() trace is not visible in this
// listing.
430 void TVirtualPacketizer::SetInitTime()
432 if (TestBit(TVirtualPacketizer::kIsInitializing)) {
433 fInitTime = Long64_t(gSystem->Now() - fStartTime) / (Float_t)1000.;
434 ResetBit(TVirtualPacketizer::kIsInitializing);
436 Info("SetInitTime","fInitTime set to %f s", fInitTime);
// Base-class version: the TList argument is unnamed and unused, and the body
// only emits a warning that adding workers is not implemented.
// NOTE(review): the returned value is not visible in this listing.
444 Int_t TVirtualPacketizer::AddWorkers(TList *)
446 Warning(
"AddWorkers",
"Not implemented for this packetizer");