42 TQueryResultManager::TQueryResultManager(
const char *qdir, 
const char *stag,
 
   44                                          TProofLockPath *lck, FILE *logfile)
 
   55    fLogFile         = (logfile) ? logfile : stdout;
 
   62 TQueryResultManager::~TQueryResultManager()
 
   65    SafeDelete(fPreviousQueries);
 
   72 void TQueryResultManager::AddLogFile(TProofQueryResult *pq)
 
   82    if ((lnow = lseek(fileno(fLogFile), (off_t) 0, SEEK_CUR)) < 0) {
 
   83       Error(
"AddLogFile", 
"problems lseeking current position on log file (errno: %d)", errno);
 
   88    Int_t start = pq->fStartLog;
 
   90       lseek(fileno(fLogFile), (off_t) start, SEEK_SET);
 
   93    const Int_t kMAXBUF = 4096;
 
   95    while (fgets(line, 
sizeof(line), fLogFile)) {
 
   96       if (line[strlen(line)-1] == 
'\n')
 
   97          line[strlen(line)-1] = 0;
 
   98       pq->AddLogLine((
const char *)line);
 
  102    if (lnow >= 0) lseek(fileno(fLogFile), lnow, SEEK_SET);
 
  107 Int_t TQueryResultManager::CleanupQueriesDir()
 
  112    if (fPreviousQueries) {
 
  113       fPreviousQueries->Delete();
 
  114       SafeDelete(fPreviousQueries);
 
  118    TString queriesdir = fQueryDir;
 
  119    queriesdir = queriesdir.Remove(queriesdir.Index(kPROOF_QueryDir) +
 
  120                                   strlen(kPROOF_QueryDir));
 
  121    void *dirs = gSystem->OpenDirectory(queriesdir);
 
  124       while ((sess = (
char *) gSystem->GetDirEntry(dirs))) {
 
  127          if (strlen(sess) < 7 || strncmp(sess,
"session",7))
 
  131          if (strstr(sess, fSessionTag))
 
  136          qdir.Form(
"%s/%s", queriesdir.Data(), sess);
 
  138             Info("RemoveQuery", "removing directory: %s", qdir.Data());
 
  139          gSystem->Exec(Form("%s %s", kRM, qdir.Data()));
 
  143       gSystem->FreeDirectory(dirs);
 
  145       Warning(
"RemoveQuery", 
"cannot open queries directory: %s", queriesdir.Data());
 
  157 void TQueryResultManager::ScanPreviousQueries(
const char *dir)
 
  160    if (fPreviousQueries) {
 
  161       fPreviousQueries->Delete();
 
  162       SafeDelete(fPreviousQueries);
 
  166    void *dirs = gSystem->OpenDirectory(dir);
 
  168    while ((sess = (
char *) gSystem->GetDirEntry(dirs))) {
 
  171       if (strlen(sess) < 7 || strncmp(sess,
"session",7))
 
  175       if (strstr(sess, fSessionTag))
 
  179       void *dirq = gSystem->OpenDirectory(Form(
"%s/%s", dir, sess));
 
  181       while ((qry = (
char *) gSystem->GetDirEntry(dirq))) {
 
  188          TString fn = Form(
"%s/%s/%s/query-result.root", dir, sess, qry);
 
  189          TFile *f = TFile::Open(fn);
 
  192             TIter nxk(f->GetListOfKeys());
 
  194             TProofQueryResult *pqr = 0;
 
  195             while ((k = (TKey *)nxk())) {
 
  196                if (!strcmp(k->GetClassName(), 
"TProofQueryResult")) {
 
  197                   pqr = (TProofQueryResult *) f->Get(k->GetName());
 
  199                      TQueryResult *qr = pqr->CloneInfo();
 
  201                         if (!fPreviousQueries)
 
  202                            fPreviousQueries = 
new TList;
 
  203                         if (qr->GetStatus() > TQueryResult::kRunning) {
 
  204                            fPreviousQueries->Add(qr);
 
  208                            TProofLockPath *lck = 0;
 
  209                            if (LockSession(qr->GetTitle(), &lck) == 0) {
 
  216                         Warning(
"ScanPreviousQueries", 
"unable to clone TProofQueryResult '%s:%s'",
 
  217                                                        pqr->GetName(), pqr->GetTitle());
 
  226       gSystem->FreeDirectory(dirq);
 
  228    gSystem->FreeDirectory(dirs);
 
  236 Int_t TQueryResultManager::ApplyMaxQueries(Int_t mxq)
 
  243    TSortedList *sl = 
new TSortedList;
 
  246    THashList *hl = 
new THashList;
 
  250    TList *dl = 
new TList;
 
  254    TString dir = fQueryDir;
 
  255    Int_t idx = dir.Index(
"session-");
 
  258    void *dirs = gSystem->OpenDirectory(dir);
 
  260    while ((sess = (
char *) gSystem->GetDirEntry(dirs))) {
 
  263       if (strlen(sess) < 7 || strncmp(sess,
"session",7))
 
  267       if (strstr(sess, fSessionTag))
 
  272       void *dirq = gSystem->OpenDirectory(Form(
"%s/%s", dir.Data(), sess));
 
  274       while ((qry = (
char *) gSystem->GetDirEntry(dirq))) {
 
  281          TString fn = Form(
"%s/%s/%s/query-result.root", dir.Data(), sess, qry);
 
  284          if (gSystem->GetPathInfo(fn, st)) {
 
  286                Info("ApplyMaxQueries","file '%s' cannot be stated: remove it", fn.Data());
 
  287             gSystem->Unlink(gSystem->DirName(fn));
 
  292          sl->Add(new TObjString(TString::Format("%ld", st.fMtime)));
 
  293          hl->Add(new TNamed((const 
char*)TString::Format("%ld",st.fMtime), fn.Data()));
 
  296       gSystem->FreeDirectory(dirq);
 
  299          dl->Add(new TParameter<Int_t>(TString::Format("%s/%s", dir.Data(), sess), nq));
 
  302          gSystem->Exec(TString::Format("%s -fr %s/%s", kRM, dir.Data(), sess));
 
  304    gSystem->FreeDirectory(dirs);
 
  307    TIter nxq(sl, kIterBackward);
 
  310    while ((os = (TObjString *)nxq())) {
 
  316          TNamed *nm = 
dynamic_cast<TNamed *
>(hl->FindObject(os->GetName()));
 
  318             gSystem->Unlink(nm->GetTitle());
 
  320             TString tdir(gSystem->DirName(nm->GetTitle()));
 
  321             tdir = gSystem->DirName(tdir.Data());
 
  322             TParameter<Int_t> *nq = 
dynamic_cast<TParameter<Int_t>*
>(dl->FindObject(tdir));
 
  324                Int_t val = nq->GetVal();
 
  326                if (nq->GetVal() <= 0)
 
  328                   gSystem->Exec(Form(
"%s -fr %s", kRM, tdir.Data()));
 
  348 Int_t TQueryResultManager::LockSession(
const char *sessiontag, TProofLockPath **lck)
 
  351    if (strstr(sessiontag, fSessionTag))
 
  355       Error(
"LockSession",
"locker space undefined");
 
  361    TString stag = sessiontag;
 
  362    TRegexp re(
"session-.*-.*-.*-.*");
 
  363    Int_t i1 = stag.Index(re);
 
  365       Error(
"LockSession",
"bad format: %s", sessiontag);
 
  368    stag.ReplaceAll(
"session-",
"");
 
  371    Int_t i2 = stag.Index(
":q");
 
  376    TString parlog = fSessionDir;
 
  377    parlog = parlog.Remove(parlog.Index(
"master-")+strlen(
"master-"));
 
  379    if (!gSystem->AccessPathName(parlog)) {
 
  381          Info("LockSession", "parent still running: do nothing");
 
  387       TString qlock = fLock->GetName();
 
  388       qlock.ReplaceAll(fSessionTag, stag);
 
  390       if (!gSystem->AccessPathName(qlock)) {
 
  391          *lck = 
new TProofLockPath(qlock);
 
  392          if (((*lck)->Lock()) < 0) {
 
  393             Error(
"LockSession",
"problems locking query lock file");
 
  407 Int_t TQueryResultManager::CleanupSession(
const char *sessiontag)
 
  410       Error(
"CleanupSession",
"session tag undefined");
 
  415    TString qdir = fQueryDir;
 
  416    qdir.ReplaceAll(Form(
"session-%s", fSessionTag.Data()), sessiontag);
 
  417    Int_t idx = qdir.Index(
":q");
 
  420    if (gSystem->AccessPathName(qdir)) {
 
  421       Info(
"CleanupSession",
"query dir %s does not exist", qdir.Data());
 
  425    TProofLockPath *lck = 0;
 
  426    if (LockSession(sessiontag, &lck) == 0) {
 
  429       gSystem->Exec(Form(
"%s %s", kRM, qdir.Data()));
 
  433          gSystem->Unlink(lck->GetName());
 
  442    Info(
"CleanupSession", 
"could not lock session %s", sessiontag);
 
  450 void TQueryResultManager::SaveQuery(TProofQueryResult *qr, 
const char *fout)
 
  452    if (!qr || qr->IsDraw())
 
  456    TString querydir = Form(
"%s/%d",fQueryDir.Data(), qr->GetSeqNum());
 
  459    if (gSystem->AccessPathName(querydir))
 
  460       gSystem->MakeDirectory(querydir);
 
  461    TString ofn = fout ? fout : Form(
"%s/query-result.root", querydir.Data());
 
  464    TFile *f = TFile::Open(ofn, 
"RECREATE");
 
  467       if (!(qr->IsArchived()))
 
  468          qr->SetResultFile(ofn);
 
  479 void TQueryResultManager::RemoveQuery(
const char *queryref, TList *otherlist)
 
  482       Info("RemoveQuery", "Enter");
 
  487    TProofQueryResult *pqr = LocateQuery(queryref, qry, qdir);
 
  491          fQueries->Remove(pqr);
 
  492          if (otherlist) otherlist->Add(pqr);
 
  494          fPreviousQueries->Remove(pqr);
 
  501       Info("RemoveQuery", "removing directory: %s", qdir.Data());
 
  502    gSystem->Exec(Form("%s %s", kRM, qdir.Data()));
 
  512 void TQueryResultManager::RemoveQuery(TQueryResult *qr, Bool_t soft)
 
  515       Info("RemoveQuery", "Enter");
 
  521    TString qdir = fQueryDir;
 
  522    qdir = qdir.Remove(qdir.Index(kPROOF_QueryDir)+strlen(kPROOF_QueryDir));
 
  523    qdir = Form("%s/%s/%d", qdir.Data(), qr->GetTitle(), qr->GetSeqNum());
 
  525       Info("RemoveQuery", "removing directory: %s", qdir.Data());
 
  526    gSystem->Exec(Form("%s %s", kRM, qdir.Data()));
 
  530       TQueryResult *qrn = qr->CloneInfo();
 
  531       Int_t idx = fQueries->IndexOf(qr);
 
  533          fQueries->AddAt(qrn, idx);
 
  537    fQueries->Remove(qr);
 
  549 TProofQueryResult *TQueryResultManager::LocateQuery(TString queryref, Int_t &qry, TString &qdir)
 
  551    TProofQueryResult *pqr = 0;
 
  556    if (queryref.IsDigit()) {
 
  557       qry = queryref.Atoi();
 
  558    } 
else if (queryref.Contains(fSessionTag)) {
 
  559       Int_t i1 = queryref.Index(
":q");
 
  561          queryref.Remove(0,i1+2);
 
  562          qry = queryref.Atoi();
 
  571          Info("LocateQuery", "local query: %d", qry);
 
  576          while ((pqr = (TProofQueryResult *) nxq())) {
 
  577             if (pqr->GetSeqNum() == qry) {
 
  579                qdir = Form(
"%s/%d", fQueryDir.Data(), qry);
 
  587          Info("LocateQuery", "previously processed query: %s", queryref.Data());
 
  590       if (fPreviousQueries) {
 
  591          TIter nxq(fPreviousQueries);
 
  592          while ((pqr = (TProofQueryResult *) nxq())) {
 
  593             if (queryref.Contains(pqr->GetTitle()) &&
 
  594                 queryref.Contains(pqr->GetName()))
 
  599       queryref.ReplaceAll(
":q",
"/");
 
  601       qdir = qdir.Remove(qdir.Index(kPROOF_QueryDir)+strlen(kPROOF_QueryDir));
 
  602       qdir = Form(
"%s/%s", qdir.Data(), queryref.Data());
 
  612 Bool_t TQueryResultManager::FinalizeQuery(TProofQueryResult *pq,
 
  613                                           TProof *proof, TVirtualProofPlayer *player)
 
  615    if (!pq || !proof || !player) {
 
  616       Warning(
"FinalizeQuery", 
"bad inputs: query = %p, proof = %p, player: %p ",
 
  617               pq ? pq : 0, proof ? proof : 0, player ? player : 0);
 
  621    Int_t qn = pq->GetSeqNum();
 
  622    Long64_t np = player->GetEventsProcessed();
 
  623    TVirtualProofPlayer::EExitStatus est = player->GetExitStatus();
 
  624    TList *out = player->GetOutputList();
 
  626    Float_t cpu = proof->GetCpuTime();
 
  627    Long64_t bytes = proof->GetBytesRead();
 
  629    TQueryResult::EQueryStatus st = TQueryResult::kAborted;
 
  631    PDB(kGlobal, 2) Info("FinalizeQuery","query 
#%d", qn); 
  634       Info("FinalizeQuery","%.1f %lld", cpu, bytes);
 
  639    case TVirtualProofPlayer::kAborted:
 
  641          Info("FinalizeQuery", "query %d has been ABORTED <====", qn);
 
  645    case TVirtualProofPlayer::kStopped:
 
  647          Info("FinalizeQuery",
 
  648               "query %d has been STOPPED: %lld events processed", qn, np);
 
  649       st = TQueryResult::kStopped;
 
  651    case TVirtualProofPlayer::kFinished:
 
  653          Info("FinalizeQuery",
 
  654               "query %d has been completed: %lld events processed", qn, np);
 
  655       st = TQueryResult::kCompleted;
 
  658       Warning("FinalizeQuery",
 
  659               "query %d: unknown exit status (%d)", qn, player->GetExitStatus());
 
  666       Info("FinalizeQuery", "cpu: %.4f, saved: %.4f, master: %.4f",
 
  667                             cpu, pq->GetUsedCPU() ,GetCpuTime());
 
  670    pq->SetProcessInfo(np, cpu - pq->GetUsedCPU());
 
  671    pq->RecordEnd(st, out);
 
  683 void TQueryResultManager::SaveQuery(TProofQueryResult *pq, Int_t mxq)
 
  687       if (fQueries && fKeptQueries >= mxq) {
 
  689          TQueryResult *fcom = 0;
 
  690          TQueryResult *farc = 0;
 
  692          TQueryResult *qr = 0;
 
  693          while (fKeptQueries >= mxq) {
 
  694             while ((qr = (TQueryResult *) nxq())) {
 
  695                if (qr->IsArchived()) {
 
  696                   if (qr->GetOutputList() && !farc)
 
  698                } 
else if (qr->GetStatus() > TQueryResult::kRunning && !fcom) {
 
  704             if (!farc && !fcom) {
 
  707                RemoveQuery(farc, kTRUE);
 
  717       if (fKeptQueries < mxq) {
 
  722          emsg.Form(
"Too many saved queries (%d): cannot save %s:%s",
 
  723                    fKeptQueries, pq->GetTitle(), pq->GetName());
 
  725             gProofServ->SendAsynMessage(emsg.Data());
 
  727             Warning(
"SaveQuery", 
"%s", emsg.Data());