// Constructor.
// Parameters, as named in the signature: qdir and stag are C strings
// (presumably the queries directory and the session tag - TODO confirm against
// the member initialisation, which is not part of this excerpt), lck is a
// TProofLockPath pointer and logfile is a stdio stream that may be null.
42 TQueryResultManager::TQueryResultManager(
const char *qdir,
const char *stag,
44 TProofLockPath *lck, FILE *logfile)
// Log to the supplied stream; fall back to stdout when logfile is null.
55 fLogFile = (logfile) ? logfile : stdout;
// Destructor. Of the cleanup performed, only the release of the
// fPreviousQueries list is visible in this excerpt.
62 TQueryResultManager::~TQueryResultManager()
65 SafeDelete(fPreviousQueries);
// Copy into the query result pq the log lines found in fLogFile from offset
// pq->fStartLog onward: every line read is handed to pq->AddLogLine().
// The current offset of the log file is saved first and restored at the end.
72 void TQueryResultManager::AddLogFile(TProofQueryResult *pq)
// Save the current offset of the log file descriptor; a negative result is
// reported together with errno.
82 if ((lnow = lseek(fileno(fLogFile), (off_t) 0, SEEK_CUR)) < 0) {
83 Error(
"AddLogFile",
"problems lseeking current position on log file (errno: %d)", errno);
// Offset at which the log of this query begins.
88 Int_t start = pq->fStartLog;
// Reposition the descriptor at that offset (any guard around this call is
// not part of this excerpt).
90 lseek(fileno(fLogFile), (off_t) start, SEEK_SET);
// Size constant for the line buffer; the declaration of line itself is not
// shown here (presumably char line[kMAXBUF] - TODO confirm).
93 const Int_t kMAXBUF = 4096;
// Read the log line by line, dropping a trailing newline before storing.
// NOTE(review): line[strlen(line)-1] assumes the string returned by fgets is
// never empty - confirm this holds for the log content.
95 while (fgets(line,
sizeof(line), fLogFile)) {
96 if (line[strlen(line)-1] ==
'\n')
97 line[strlen(line)-1] = 0;
98 pq->AddLogLine((
const char *)line);
// Restore the saved offset, but only when it was obtained successfully.
102 if (lnow >= 0) lseek(fileno(fLogFile), lnow, SEEK_SET);
// Walk the top-level queries directory and remove session sub-directories
// with the command held in kRM. The list of previous queries is discarded
// first. The returned Int_t values are set in lines not part of this excerpt.
// NOTE(review): the Info and Warning calls below are tagged RemoveQuery although
// they are issued from CleanupQueriesDir - confirm whether that is intended.
107 Int_t TQueryResultManager::CleanupQueriesDir()
// Drop whatever was cached about previous queries.
112 if (fPreviousQueries) {
113 fPreviousQueries->Delete();
114 SafeDelete(fPreviousQueries);
// Derive the top-level directory: keep fQueryDir up to and including the
// kPROOF_QueryDir component and cut everything after it.
118 TString queriesdir = fQueryDir;
119 queriesdir = queriesdir.Remove(queriesdir.Index(kPROOF_QueryDir) +
120 strlen(kPROOF_QueryDir));
121 void *dirs = gSystem->OpenDirectory(queriesdir);
// Loop over the directory entries.
124 while ((sess = (
char *) gSystem->GetDirEntry(dirs))) {
// Entries whose name does not begin with the word session are singled out
// here (presumably skipped; the statement taken is not shown).
127 if (strlen(sess) < 7 || strncmp(sess,
"session",7))
// Entries whose name contains the tag of the current session are singled out
// here (presumably skipped as well - TODO confirm).
131 if (strstr(sess, fSessionTag))
// Full path of the session directory, then remove it via the shell.
136 qdir.Form(
"%s/%s", queriesdir.Data(), sess);
138 Info("RemoveQuery", "removing directory: %s", qdir.Data());
139 gSystem->Exec(Form("%s %s", kRM, qdir.Data()));
143 gSystem->FreeDirectory(dirs);
// Reported when the queries directory could not be opened (per the message
// text; the enclosing else branch is not shown).
145 Warning(
"RemoveQuery",
"cannot open queries directory: %s", queriesdir.Data());
// Scan the directory dir for session sub-directories and, for each query
// found there, open its query-result.root file, look for objects of class
// TProofQueryResult and store a clone of their info in fPreviousQueries.
157 void TQueryResultManager::ScanPreviousQueries(
const char *dir)
// Start from scratch: discard any list built by an earlier scan.
160 if (fPreviousQueries) {
161 fPreviousQueries->Delete();
162 SafeDelete(fPreviousQueries);
// Outer loop: entries of dir (one per session).
166 void *dirs = gSystem->OpenDirectory(dir);
168 while ((sess = (
char *) gSystem->GetDirEntry(dirs))) {
// Entries not starting with the word session are singled out here
// (presumably skipped; the statement taken is not shown).
171 if (strlen(sess) < 7 || strncmp(sess,
"session",7))
// Entries containing the tag of the current session are singled out here
// (presumably skipped - TODO confirm).
175 if (strstr(sess, fSessionTag))
// Inner loop: entries of the session directory (one per query).
179 void *dirq = gSystem->OpenDirectory(Form(
"%s/%s", dir, sess));
181 while ((qry = (
char *) gSystem->GetDirEntry(dirq))) {
// Path of the file holding the result of this query.
188 TString fn = Form(
"%s/%s/%s/query-result.root", dir, sess, qry);
189 TFile *f = TFile::Open(fn);
// Look through the keys of the file for TProofQueryResult objects.
192 TIter nxk(f->GetListOfKeys());
194 TProofQueryResult *pqr = 0;
195 while ((k = (TKey *)nxk())) {
196 if (!strcmp(k->GetClassName(),
"TProofQueryResult")) {
197 pqr = (TProofQueryResult *) f->Get(k->GetName());
// Keep only a clone produced by CloneInfo(), not the object read from file.
199 TQueryResult *qr = pqr->CloneInfo();
// Create the list on first use.
201 if (!fPreviousQueries)
202 fPreviousQueries =
new TList;
// Queries whose status is beyond kRunning are added to the list.
203 if (qr->GetStatus() > TQueryResult::kRunning) {
204 fPreviousQueries->Add(qr);
// Otherwise try to lock the session the query belongs to; what is done on
// success is not part of this excerpt.
208 TProofLockPath *lck = 0;
209 if (LockSession(qr->GetTitle(), &lck) == 0) {
// Reported when the clone could not be obtained (per the message text).
216 Warning(
"ScanPreviousQueries",
"unable to clone TProofQueryResult '%s:%s'",
217 pqr->GetName(), pqr->GetTitle());
// Release the directory handles of the inner and outer loops.
226 gSystem->FreeDirectory(dirq);
228 gSystem->FreeDirectory(dirs);
// Enforce a limit (mxq) on the number of stored query results. The visible
// code collects every query-result.root file under the sessions directory
// together with its modification time, then walks the collection in reverse
// sorted order, unlinking result files and removing session directories whose
// query counter has dropped to zero or below. The stop condition of the final
// loop and the returned Int_t values are in lines not part of this excerpt.
236 Int_t TQueryResultManager::ApplyMaxQueries(Int_t mxq)
// sl: sorted list of modification times, stored as strings.
243 TSortedList *sl =
new TSortedList;
// hl: hash list of TNamed objects, name = modification time, title = file path.
246 THashList *hl =
new THashList;
// dl: list of per-session counters, one TParameter<Int_t> per session directory.
250 TList *dl =
new TList;
// Starting point is fQueryDir; idx locates the session- component in it (how
// idx is used is not shown - presumably to cut dir back to the parent).
254 TString dir = fQueryDir;
255 Int_t idx = dir.Index(
"session-");
// Outer loop: entries of dir (one per session).
258 void *dirs = gSystem->OpenDirectory(dir);
260 while ((sess = (
char *) gSystem->GetDirEntry(dirs))) {
// Entries not starting with the word session are singled out here
// (presumably skipped; the statement taken is not shown).
263 if (strlen(sess) < 7 || strncmp(sess,
"session",7))
// Entries containing the tag of the current session are singled out here
// (presumably skipped - TODO confirm).
267 if (strstr(sess, fSessionTag))
// Inner loop: entries of the session directory (one per query).
272 void *dirq = gSystem->OpenDirectory(Form(
"%s/%s", dir.Data(), sess));
274 while ((qry = (
char *) gSystem->GetDirEntry(dirq))) {
281 TString fn = Form(
"%s/%s/%s/query-result.root", dir.Data(), sess, qry);
// A non-zero return from GetPathInfo means the file could not be stated: the
// directory that would contain it is unlinked.
284 if (gSystem->GetPathInfo(fn, st)) {
286 Info("ApplyMaxQueries","file '%s' cannot be stated: remove it", fn.Data());
287 gSystem->Unlink(gSystem->DirName(fn));
// Record the modification time in the sorted list and map it to the file path.
// NOTE(review): the time string is the lookup key in hl, so two files with an
// identical fMtime share a key - verify how FindObject resolves that below.
292 sl->Add(new TObjString(TString::Format("%ld", st.fMtime)));
293 hl->Add(new TNamed((const
char*)TString::Format("%ld",st.fMtime), fn.Data()));
296 gSystem->FreeDirectory(dirq);
// Register the counter nq for this session directory (nq is maintained in
// lines not shown; presumably the number of queries found - TODO confirm).
299 dl->Add(new TParameter<Int_t>(TString::Format("%s/%s", dir.Data(), sess), nq));
// Remove the whole session directory (the condition guarding this call is not
// part of this excerpt).
302 gSystem->Exec(TString::Format("%s -fr %s/%s", kRM, dir.Data(), sess));
304 gSystem->FreeDirectory(dirs);
// Second phase: walk the sorted times backward.
307 TIter nxq(sl, kIterBackward);
310 while ((os = (TObjString *)nxq())) {
// Find the file path registered under this time string.
// NOTE(review): nm comes from a dynamic_cast and is dereferenced below; any
// null check sits in lines not shown - confirm.
316 TNamed *nm =
dynamic_cast<TNamed *
>(hl->FindObject(os->GetName()));
// Remove the result file.
318 gSystem->Unlink(nm->GetTitle());
// Two DirName steps up from the file path give the session directory, which
// is the name under which the counter was stored in dl.
320 TString tdir(gSystem->DirName(nm->GetTitle()));
321 tdir = gSystem->DirName(tdir.Data());
322 TParameter<Int_t> *nq =
dynamic_cast<TParameter<Int_t>*
>(dl->FindObject(tdir));
// Current counter value (its update happens in a line not shown).
324 Int_t val = nq->GetVal();
// Once the counter is zero or negative the session directory itself is removed.
326 if (nq->GetVal() <= 0)
328 gSystem->Exec(Form(
"%s -fr %s", kRM, tdir.Data()));
// Try to take the query lock of the session identified by sessiontag.
// On success a newly allocated TProofLockPath is handed back through lck.
// ScanPreviousQueries and CleanupSession treat a return value of 0 as success;
// the actual return statements are in lines not part of this excerpt.
348 Int_t TQueryResultManager::LockSession(
const char *sessiontag, TProofLockPath **lck)
// Special-case a tag that contains the tag of the current session (the action
// taken is not shown).
351 if (strstr(sessiontag, fSessionTag))
// Reported when no place to return the locker was given (per the message
// text; the test itself is not shown).
355 Error(
"LockSession",
"locker space undefined");
// Validate the tag layout against the pattern session-*-*-*-*.
361 TString stag = sessiontag;
362 TRegexp re(
"session-.*-.*-.*-.*");
363 Int_t i1 = stag.Index(re);
365 Error(
"LockSession",
"bad format: %s", sessiontag);
// Strip the session- prefix so that stag holds the bare tag.
368 stag.ReplaceAll(
"session-",
"");
// Position of a trailing query reference of the form :q<number>, if any (its
// removal is presumably done in a line not shown - TODO confirm).
371 Int_t i2 = stag.Index(
":q");
// Build a path from fSessionDir cut right after the master- component (further
// composition of parlog, if any, is not shown).
376 TString parlog = fSessionDir;
377 parlog = parlog.Remove(parlog.Index(
"master-")+strlen(
"master-"));
// TSystem::AccessPathName returns false when the path is accessible, so this
// branch is taken when parlog exists: the parent is considered still running.
379 if (!gSystem->AccessPathName(parlog)) {
381 Info("LockSession", "parent still running: do nothing");
// Name of the lock file of the other session: the name of the own lock with
// the own session tag replaced by the requested one.
387 TString qlock = fLock->GetName();
388 qlock.ReplaceAll(fSessionTag, stag);
// If that lock file is accessible, create the locker and try to lock it.
390 if (!gSystem->AccessPathName(qlock)) {
391 *lck =
new TProofLockPath(qlock);
392 if (((*lck)->Lock()) < 0) {
393 Error(
"LockSession",
"problems locking query lock file");
// Remove the query directory belonging to the session sessiontag, after having
// locked that session. The returned Int_t values are set in lines not part of
// this excerpt.
407 Int_t TQueryResultManager::CleanupSession(
const char *sessiontag)
// Reported for a missing tag (per the message text; the test is not shown).
410 Error(
"CleanupSession",
"session tag undefined");
// Derive the directory of the other session from the own one by swapping the
// session-<own tag> component for sessiontag.
415 TString qdir = fQueryDir;
416 qdir.ReplaceAll(Form(
"session-%s", fSessionTag.Data()), sessiontag);
// Position of a trailing :q<number> query reference, if present (how idx is
// used is not shown).
417 Int_t idx = qdir.Index(
":q");
// AccessPathName returns true when the path is NOT accessible.
420 if (gSystem->AccessPathName(qdir)) {
421 Info(
"CleanupSession",
"query dir %s does not exist", qdir.Data());
// Take the lock of the session; 0 means the lock was obtained.
425 TProofLockPath *lck = 0;
426 if (LockSession(sessiontag, &lck) == 0) {
// Remove the directory through the shell, then delete the lock file.
429 gSystem->Exec(Form(
"%s %s", kRM, qdir.Data()));
433 gSystem->Unlink(lck->GetName());
// Reported when the session could not be locked.
442 Info(
"CleanupSession",
"could not lock session %s", sessiontag);
// Write the query result qr to a ROOT file. The file is fout when given,
// otherwise query-result.root inside the directory of the query.
// Null results and draw queries are singled out at the top (presumably the
// function returns for them - the statement is not part of this excerpt).
450 void TQueryResultManager::SaveQuery(TProofQueryResult *qr,
const char *fout)
452 if (!qr || qr->IsDraw())
// Directory of the query: <fQueryDir>/<sequence number>.
456 TString querydir = Form(
"%s/%d",fQueryDir.Data(), qr->GetSeqNum());
// AccessPathName returns true when the path is NOT accessible: create the
// directory in that case.
459 if (gSystem->AccessPathName(querydir))
460 gSystem->MakeDirectory(querydir);
// Output file name: explicit argument or the default inside querydir.
461 TString ofn = fout ? fout : Form(
"%s/query-result.root", querydir.Data());
// RECREATE is the TFile open mode used for the output file.
464 TFile *f = TFile::Open(ofn,
"RECREATE");
// Record the output file in the result unless the result is already archived.
467 if (!(qr->IsArchived()))
468 qr->SetResultFile(ofn);
// Remove the query identified by the reference string queryref: take it out
// of the internal lists and delete its directory on disk. When otherlist is
// given, the removed object is appended to it.
479 void TQueryResultManager::RemoveQuery(
const char *queryref, TList *otherlist)
482 Info("RemoveQuery", "Enter");
// Resolve the reference; qry and qdir are filled by LocateQuery (their
// declarations are in lines not part of this excerpt).
487 TProofQueryResult *pqr = LocateQuery(queryref, qry, qdir);
// Detach from the list of current queries and hand over to the caller list.
491 fQueries->Remove(pqr);
492 if (otherlist) otherlist->Add(pqr);
// Detach from the list of previous queries (which of the two removals applies
// is decided in lines not shown).
494 fPreviousQueries->Remove(pqr);
// Delete the directory of the query through the shell.
501 Info("RemoveQuery", "removing directory: %s", qdir.Data());
502 gSystem->Exec(Form("%s %s", kRM, qdir.Data()));
// Remove the query qr: delete its directory on disk and take it out of
// fQueries. The visible code also inserts a clone made with CloneInfo() at the
// position qr had in the list; presumably that is the soft variant selected
// by the soft flag - TODO confirm, the branching is not part of this excerpt.
512 void TQueryResultManager::RemoveQuery(TQueryResult *qr, Bool_t soft)
515 Info("RemoveQuery", "Enter");
// Directory of the query: top-level queries directory (fQueryDir cut right
// after kPROOF_QueryDir), then the title of qr, then its sequence number.
521 TString qdir = fQueryDir;
522 qdir = qdir.Remove(qdir.Index(kPROOF_QueryDir)+strlen(kPROOF_QueryDir));
523 qdir = Form("%s/%s/%d", qdir.Data(), qr->GetTitle(), qr->GetSeqNum());
525 Info("RemoveQuery", "removing directory: %s", qdir.Data());
526 gSystem->Exec(Form("%s %s", kRM, qdir.Data()));
// Replace the entry by its info-only clone at the same index.
530 TQueryResult *qrn = qr->CloneInfo();
531 Int_t idx = fQueries->IndexOf(qr);
533 fQueries->AddAt(qrn, idx);
// Finally drop the original object from the list.
537 fQueries->Remove(qr);
// Find the query described by queryref.
// queryref is either a plain number or a string containing a session tag and
// a :q<number> suffix. On output qry receives the parsed sequence number and
// qdir the directory of the query. The result pointer pqr starts out null;
// the return statements are in lines not part of this excerpt.
549 TProofQueryResult *TQueryResultManager::LocateQuery(TString queryref, Int_t &qry, TString &qdir)
551 TProofQueryResult *pqr = 0;
// Case 1: the reference is made of digits only - it is the sequence number.
556 if (queryref.IsDigit()) {
557 qry = queryref.Atoi();
558 }
// Case 2: the reference carries the tag of the current session - drop
// everything up to and including :q and parse what follows as the number.
else if (queryref.Contains(fSessionTag)) {
559 Int_t i1 = queryref.Index(
":q");
561 queryref.Remove(0,i1+2);
562 qry = queryref.Atoi();
// Query of the current session: look for a matching sequence number (the
// iterator nxq is declared in a line not shown; presumably over fQueries).
571 Info("LocateQuery", "local query: %d", qry);
576 while ((pqr = (TProofQueryResult *) nxq())) {
577 if (pqr->GetSeqNum() == qry) {
// Its directory is <fQueryDir>/<sequence number>.
579 qdir = Form(
"%s/%d", fQueryDir.Data(), qry);
// Query of an earlier session: search the list of previous queries for an
// entry whose title and name both occur in the reference string.
587 Info("LocateQuery", "previously processed query: %s", queryref.Data());
590 if (fPreviousQueries) {
591 TIter nxq(fPreviousQueries);
592 while ((pqr = (TProofQueryResult *) nxq())) {
593 if (queryref.Contains(pqr->GetTitle()) &&
594 queryref.Contains(pqr->GetName()))
// Turn the reference into a relative path by replacing :q with a slash, then
// append it to the top-level queries directory. qdir is expected to hold a
// path containing kPROOF_QueryDir at this point; that assignment is in a line
// not shown.
599 queryref.ReplaceAll(
":q",
"/");
601 qdir = qdir.Remove(qdir.Index(kPROOF_QueryDir)+strlen(kPROOF_QueryDir));
602 qdir = Form(
"%s/%s", qdir.Data(), queryref.Data());
// Close the bookkeeping of the query pq once processing is over: translate the
// exit status of the player into a query status, store the number of processed
// events and the CPU time used, and record the end of the query together with
// the output list. The returned Bool_t values are set in lines not part of
// this excerpt.
612 Bool_t TQueryResultManager::FinalizeQuery(TProofQueryResult *pq,
613 TProof *proof, TVirtualProofPlayer *player)
// All three inputs are required; a warning listing the pointers is issued
// when any of them is null.
615 if (!pq || !proof || !player) {
616 Warning(
"FinalizeQuery",
"bad inputs: query = %p, proof = %p, player: %p ",
617 pq ? pq : 0, proof ? proof : 0, player ? player : 0);
// Collect what is needed from the query, the player and the session.
621 Int_t qn = pq->GetSeqNum();
622 Long64_t np = player->GetEventsProcessed();
623 TVirtualProofPlayer::EExitStatus est = player->GetExitStatus();
624 TList *out = player->GetOutputList();
626 Float_t cpu = proof->GetCpuTime();
627 Long64_t bytes = proof->GetBytesRead();
// Query status defaults to aborted; the cases below overwrite it.
629 TQueryResult::EQueryStatus st = TQueryResult::kAborted;
631 PDB(kGlobal, 2) Info("FinalizeQuery","query
#%d", qn);
634 Info("FinalizeQuery","%.1f %lld", cpu, bytes);
// Map the exit status of the player onto the status of the query.
639 case TVirtualProofPlayer::kAborted:
641 Info("FinalizeQuery", "query %d has been ABORTED <====", qn);
645 case TVirtualProofPlayer::kStopped:
647 Info("FinalizeQuery",
648 "query %d has been STOPPED: %lld events processed", qn, np);
649 st = TQueryResult::kStopped;
651 case TVirtualProofPlayer::kFinished:
653 Info("FinalizeQuery",
654 "query %d has been completed: %lld events processed", qn, np);
655 st = TQueryResult::kCompleted;
// Any other exit status is only reported.
658 Warning("FinalizeQuery",
659 "query %d: unknown exit status (%d)", qn, player->GetExitStatus());
// CPU accounting: the value stored for this query is the session total minus
// what pq->GetUsedCPU() reports.
666 Info("FinalizeQuery", "cpu: %.4f, saved: %.4f, master: %.4f",
667 cpu, pq->GetUsedCPU() ,GetCpuTime());
670 pq->SetProcessInfo(np, cpu - pq->GetUsedCPU());
// Record the final status and the output list in the query result.
671 pq->RecordEnd(st, out);
// Save the query pq while keeping the number of kept queries below mxq.
// When the limit is reached, already stored queries are searched for
// candidates to remove; if room cannot be made a failure message is produced.
// Several statements of this function are not part of this excerpt.
683 void TQueryResultManager::SaveQuery(TProofQueryResult *pq, Int_t mxq)
// Limit reached: look for queries that can be dropped.
687 if (fQueries && fKeptQueries >= mxq) {
// fcom and farc hold the candidates found by the scan below; judging from the
// tests, farc is an archived query and fcom one whose status is beyond
// kRunning (the assignments are in lines not shown).
689 TQueryResult *fcom = 0;
690 TQueryResult *farc = 0;
692 TQueryResult *qr = 0;
693 while (fKeptQueries >= mxq) {
// The iterator nxq is declared in a line not shown (presumably over fQueries).
694 while ((qr = (TQueryResult *) nxq())) {
695 if (qr->IsArchived()) {
696 if (qr->GetOutputList() && !farc)
698 }
else if (qr->GetStatus() > TQueryResult::kRunning && !fcom) {
// No candidate of either kind was found (the action taken is not shown).
704 if (!farc && !fcom) {
// Remove the archived candidate with the soft flag set to kTRUE (the condition
// selecting this call is not shown).
707 RemoveQuery(farc, kTRUE);
// Room available: the actual save happens in lines not shown.
717 if (fKeptQueries < mxq) {
// Otherwise build the failure message...
722 emsg.Form(
"Too many saved queries (%d): cannot save %s:%s",
723 fKeptQueries, pq->GetTitle(), pq->GetName());
// ...and deliver it through gProofServ or as a local warning (the test choosing
// between the two is not shown).
725 gProofServ->SendAsynMessage(emsg.Data());
727 Warning(
"SaveQuery",
"%s", emsg.Data());