45 Int_t TProofPlayerLite::MakeSelector(
const char *selfile)
48 SafeDelete(fSelector);
49 if (!selfile || strlen(selfile) <= 0) {
50 Error(
"MakeSelector",
"input file path or name undefined");
55 if (!strchr(gSystem->BaseName(selfile),
'.')) {
57 Info(
"MakeSelector",
"selector name '%s' does not contain a '.':"
58 " no file to check, it will be loaded from a library", selfile);
59 if (!(fSelector = TSelector::GetSelector(selfile))) {
60 Error(
"MakeSelector",
"could not create a %s selector", selfile);
67 if (((TProofLite*)fProof)->CopyMacroToCache(selfile, 1, &fSelector, TProof::kCp | TProof::kCpBin) < 0)
80 Long64_t TProofPlayerLite::Process(TDSet *dset, TSelector *selector,
81 Option_t *option, Long64_t nentries,
85 Error(
"Process",
"selector object undefined");
90 if (selector != fSelector) {
91 SafeDelete(fSelector);
95 fCreateSelObj = kFALSE;
96 Long64_t rc = Process(dset, selector->ClassName(), option, nentries, first);
97 fCreateSelObj = kTRUE;
109 Long64_t TProofPlayerLite::Process(TDSet *dset,
const char *selector_file,
110 Option_t *option, Long64_t nentries,
113 PDB(kGlobal,1) Info("Process","Enter");
115 fExitStatus = kFinished;
117 if (!fProgressStatus) {
118 Error(
"Process",
"No progress status");
121 fProgressStatus->Reset();
125 fOutput =
new THashList;
129 TPerfStats::Setup(fInput);
130 TPerfStats::Start(fInput, fOutput);
134 TMessage mesg(kPROOF_PROCESS);
135 TString fn(gSystem->BaseName(selector_file));
138 Bool_t sync = (fProof->GetQueryMode(option) == TProof::kSync);
142 fOutputLists->Delete();
148 gSystem->RedirectOutput(fProof->fLogFileName);
150 Info(
"Process",
"starting new query");
154 if (MakeSelector(selector_file) != 0) {
156 gSystem->RedirectOutput(0);
161 fSelectorClass = fSelector->IsA();
164 if (!fCreateSelObj) {
167 if (fSelector->GetInputList() && fSelector->GetInputList()->GetSize() > 0) {
168 TIter nxi(fSelector->GetInputList());
170 while ((o = nxi())) {
171 if (!fInput->FindObject(o)) {
174 inputtmp =
new TList;
175 inputtmp->SetOwner(kFALSE);
181 fInput->Add(fSelector);
184 fSelector->SetInputList(fInput);
185 fSelector->SetOption(option);
186 if (fSelector->GetOutputList()) fSelector->GetOutputList()->Clear();
188 PDB(kLoop,1) Info("Process","Call Begin(0)");
192 gProof->SendInputDataFile();
195 if (fInput->FindObject("PROOF_StatsHist") != 0) {
196 if (!(fProcPackets = (TH1 *) fOutput->FindObject(
"PROOF_ProcPcktHist"))) {
197 Warning(
"Process",
"could not attach to histogram 'PROOF_ProcPcktHist'");
200 Info("Process", "attached to histogram 'PROOF_ProcPcktHist' to record"
201 " packets being processed");
205 PDB(kPacketizer,1) Info("Process","Create Proxy TDSet");
206 TDSet *set = new TDSetProxy(dset->GetType(), dset->GetObjName(),
207 dset->GetDirectory());
208 if (dset->TestBit(TDSet::kEmpty))
209 set->SetBit(TDSet::kEmpty);
210 fProof->SetParameter("PROOF_MaxSlavesPerNode", (Long_t) 0);
211 if (InitPacketizer(dset, nentries, first, "TPacketizerUnit", "TPacketizer") != 0) {
212 Error(
"Process",
"cannot init the packetizer");
213 fExitStatus = kAborted;
223 Long64_t memlogfreq = -1, mlf;
224 if ((mrc = TProof::GetParameter(fProof->GetInputList(),
"PROOF_MemLogFreq", mlf)) == 0) memlogfreq = mlf;
225 if (mrc != 0 && gSystem->Getenv(
"PROOF_MEMLOGFREQ")) {
226 TString clf(gSystem->Getenv(
"PROOF_MEMLOGFREQ"));
227 if (clf.IsDigit()) { memlogfreq = clf.Atoi(); mrc = 0; }
229 if (memlogfreq == 0) {
230 memlogfreq = fPacketizer->GetTotalEntries()/(fProof->GetParallel()*100);
231 if (memlogfreq <= 0) memlogfreq = 1;
233 if (mrc == 0) fProof->SetParameter(
"PROOF_MemLogFreq", memlogfreq);
237 fProof->SetParameter(
"PROOF_QueryTag", fProof->GetName());
239 fProof->SetParameter(
"PROOF_QuerySeqNum", fProof->fSeqNum);
242 gSystem->RedirectOutput(0);
244 TCleanup clean(
this);
247 TString opt = option;
250 Long64_t num = (fProof->IsParallel()) ? -1 : nentries;
251 Long64_t fst = (fProof->IsParallel()) ? -1 : first;
254 TEntryList *enl = (!fProof->IsMaster()) ? dynamic_cast<TEntryList *>(set->GetEntryList())
256 TEventList *evl = (!fProof->IsMaster() && !enl) ? dynamic_cast<TEventList *>(set->GetEntryList())
259 fProof->ResetMergePrg();
262 PDB(kGlobal,1) Info("Process","Calling Broadcast");
263 if (fProcessMessage) delete fProcessMessage;
264 fProcessMessage = new TMessage(kPROOF_PROCESS);
265 mesg << set << fn << fInput << opt << num << fst << evl << sync << enl;
266 (*fProcessMessage) << set << fn << fInput << opt << num << fst << evl << sync << enl;
267 Int_t nb = fProof->Broadcast(mesg);
268 PDB(kGlobal,1) Info("Process", "Broadcast called: %d workers notified", nb);
269 fProof->fNotIdle += nb;
272 fProof->fRedirLog = kTRUE;
278 PDB(kGlobal,1) Info("Process","Asynchronous processing:"
279 " activating CollectInputFrom");
283 return fProof->fSeqNum;
288 PDB(kGlobal,1) Info("Process","Synchronous processing: calling Collect");
293 fProof->fRedirLog = kFALSE;
295 if (!TSelector::IsStandardDraw(fn))
298 fPacketizer->StopProcess(kFALSE, kTRUE);
300 fPacketizer->SetBit(TVirtualPacketizer::kIsDone);
304 fQuery->SetProcessInfo(0, 0., fPacketizer->GetBytesRead(),
305 fPacketizer->GetInitTime(),
311 if (GetExitStatus() != TProofPlayer::kAborted)
312 rc = Finalize(kFALSE, sync);
318 while ((o = nxi())) fInput->Remove(o);
319 SafeDelete(inputtmp);
331 Long64_t TProofPlayerLite::Finalize(Bool_t force, Bool_t sync)
333 if (fOutputLists == 0) {
335 return fProof->Finalize(Form(
"%s:%s", fQuery->GetTitle(),
336 fQuery->GetName()), force);
344 Info(
"Finalize",
"query is undefined!");
352 if (fExitStatus != kAborted) {
359 if (ReinitSelector(fQuery) == -1) {
360 Info(
"Finalize",
"problems reinitializing selector \"%s\"",
361 fQuery->GetSelecImp()->GetName());
367 fSelector->SetInputList(fInput);
369 TList *output = fSelector->GetOutputList();
372 while(TObject* obj = next()) {
373 if (fProof->IsParallel() || DrawCanvas(obj) == 1)
379 Warning(
"Finalize",
"undefined output list in the selector! Protocol error?");
382 SetSelectorDataMembersFromOutputList();
384 PDB(kLoop,1) Info("Finalize","Call Terminate()");
385 fOutput->Clear("nodelete");
389 fProof->fQuerySTW.Reset();
391 fSelector->Terminate();
393 rv = fSelector->GetStatus();
397 while(TObject* o = it()) {
403 fQuery->SetOutputList(fOutput);
405 fQuery->SetFinalized();
407 Warning(
"Finalize",
"current TQueryResult object is undefined!");
410 if (!fCreateSelObj) {
411 fInput->Remove(fSelector);
412 fOutput->Remove(fSelector);
413 if (output) output->Remove(fSelector);
420 if (output) output->SetOwner(kFALSE);
421 SafeDelete(fSelector);
425 fOutput->SetOwner(kFALSE);
431 SafeDelete(fSelector);
432 if (!fCreateSelObj) fSelector = 0;
435 PDB(kGlobal,1) Info("Finalize","exit");
442 Bool_t TProofPlayerLite::HandleTimer(TTimer *)
445 Info("HandleTimer","Entry: %p", fFeedbackTimer);
447 if (fFeedbackTimer == 0) return kFALSE;
452 TList *fb = new TList;
455 TIter next(fFeedback);
456 while( TObjString *name = (TObjString*) next() ) {
457 TObject *o = fOutput->FindObject(name->GetName());
458 if (o != 0) fb->Add(o->Clone());
461 if (fb->GetSize() > 0)
462 StoreFeedback(
this, fb);
466 if (fFeedbackLists == 0) {
467 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
471 fb = MergeFeedback();
477 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
485 void TProofPlayerLite::SetupFeedback()
487 fFeedback = (TList*) fInput->FindObject(
"FeedbackList");
491 Info("SetupFeedback","\"FeedbackList\" found: %d objects", fFeedback->GetSize());
494 Info("SetupFeedback","\"FeedbackList\" NOT found");
497 if (fFeedback == 0 || fFeedback->GetSize() == 0) return;
500 SafeDelete(fFeedbackTimer);
501 fFeedbackPeriod = 2000;
502 TProof::GetParameter(fInput, "PROOF_FeedbackPeriod", fFeedbackPeriod);
503 fFeedbackTimer = new TTimer;
504 fFeedbackTimer->SetObject(this);
505 fFeedbackTimer->Start(fFeedbackPeriod, kTRUE);
511 void TProofPlayerLite::StoreFeedback(TObject *slave, TList *out)
514 Info("StoreFeedback","Enter (%p,%p,%d)", fFeedbackLists, out, (out ? out->GetSize() : -1));
518 Info("StoreFeedback","Leave (empty)");
522 if (fFeedbackLists == 0) {
523 PDB(kFeedback,2) Info("StoreFeedback","Create fFeedbackLists");
524 fFeedbackLists = new TList;
525 fFeedbackLists->SetOwner();
529 out->SetOwner(kFALSE);
532 while( (obj = next()) ) {
534 Info("StoreFeedback","Find '%s'", obj->GetName() );
536 TMap *map = (TMap*) fFeedbackLists->FindObject(obj->GetName());
539 Info("StoreFeedback", "map for '%s' not found (creating)", obj->GetName());
542 map->SetName(obj->GetName());
543 fFeedbackLists->Add(map);
546 Info("StoreFeedback","removing previous value");
547 if (map->GetValue(slave))
548 delete map->GetValue(slave);
551 map->Add(slave, obj);
556 Info("StoreFeedback","Leave");