26 # include "snprintf.h"
28 #include "RConfigure.h"
58 Int_t TProofLite::fgWrksMax = -2;
72 TProofLite::TProofLite(
const char *url,
const char *conffile,
const char *confdir,
73 Int_t loglevel,
const char *alias, TProofMgr *mgr)
84 fReInvalid =
new TPMERegexp(
"[^A-Za-z0-9._-]");
91 fServType = TProofMgr::kProofLite;
98 if (fManager) SetBit(TProof::kIsClient);
99 SetBit(TProof::kIsMaster);
102 if (!gSystem->Getenv(
"ROOTPROOFCLIENT")) gSystem->Setenv(
"ROOTPROOFCLIENT",
"");
105 fUrl.SetProtocol(
"proof");
106 fUrl.SetHost(
"__lite__");
110 if (strlen(fUrl.GetUser()) <= 0) {
112 UserGroup_t *pw = gSystem->GetUserInfo();
114 fUrl.SetUser(pw->fUser);
118 fMaster = gSystem->HostName();
121 ParseConfigField(conffile);
126 if ((fNWorkers = GetNumberOfWorkers(url)) > 0) {
130 Int_t port = gEnv->GetValue(
"ProofServ.XpdPort", 1093);
131 stup.Form(
"%s @ %s:%d ", gProofServ->GetOrdinal(), gSystem->HostName(), port);
133 Printf(
" +++ Starting PROOF-Lite %swith %d workers +++", stup.Data(), fNWorkers);
135 Init(url, conffile, confdir, loglevel, alias);
139 if (!gROOT->GetListOfProofs()->FindObject(
this))
140 gROOT->GetListOfProofs()->Add(
this);
154 Int_t TProofLite::Init(
const char *,
const char *conffile,
155 const char *confdir, Int_t loglevel,
const char *)
162 fTty = (isatty(0) == 0 || isatty(1) == 0) ? kFALSE : kTRUE;
164 if (TestBit(TProof::kIsMaster)) {
166 if (!conffile || !conffile[0])
167 fConfFile = kPROOF_ConfFile;
168 if (!confdir || !confdir[0])
169 fConfDir = kPROOF_ConfDir;
172 fConfFile = conffile;
176 if (CreateSandbox() != 0) {
177 Error(
"Init",
"could not create/assert sandbox for this session");
182 TString sockpathdir = gEnv->GetValue(
"ProofLite.SockPathDir", gSystem->TempDirectory());
183 if (sockpathdir.IsNull()) sockpathdir = gSystem->TempDirectory();
184 if (sockpathdir(sockpathdir.Length()-1) ==
'/') sockpathdir.Remove(sockpathdir.Length()-1);
185 fSockPath.Form(
"%s/plite-%d", sockpathdir.Data(), gSystem->GetPid());
186 if (fSockPath.Length() > 104) {
188 Error(
"Init",
"Unix socket path '%s' is too long (%d bytes):",
189 fSockPath.Data(), fSockPath.Length());
190 Error(
"Init",
"use 'ProofLite.SockPathDir' to create it under a directory different"
191 " from '%s'", sockpathdir.Data());
195 fLogLevel = loglevel;
196 fProtocol = kPROOF_Protocol;
197 fSendGroupView = kTRUE;
201 fRecvMessages =
new TList;
202 fRecvMessages->SetOwner(kTRUE);
205 fAvailablePackages = 0;
206 fEnabledPackages = 0;
207 fEndMaster = TestBit(TProof::kIsMaster) ? kTRUE : kFALSE;
209 ResetBit(TProof::kNewInputData);
211 fEnabledPackagesOnCluster =
new TList;
212 fEnabledPackagesOnCluster->SetOwner();
215 fCollectTimeout = gEnv->GetValue(
"Proof.CollectTimeout", -1);
218 fDynamicStartup = kFALSE;
219 fDynamicStartupStep = -1;
220 fDynamicStartupNMax = -1;
221 TString dynconf = gEnv->GetValue(
"Proof.SimulateDynamicStartup",
"");
222 if (dynconf.Length() > 0) {
223 fDynamicStartup = kTRUE;
224 fLastPollWorkers_s = time(0);
228 if (dynconf.Tokenize(p, from,
":"))
229 if (p.IsDigit()) fDynamicStartupStep = p.Atoi();
230 if (dynconf.Tokenize(p, from,
":"))
231 if (p.IsDigit()) fDynamicStartupNMax = p.Atoi();
236 fProgressDialogStarted = kFALSE;
240 if (TestBit(TProof::kIsClient)) {
241 fLogFileName = Form(
"%s/session-%s.log", fWorkDir.Data(), GetName());
242 if ((fLogFileW = fopen(fLogFileName.Data(),
"w")) == 0)
243 Error(
"Init",
"could not create temporary logfile %s", fLogFileName.Data());
244 if ((fLogFileR = fopen(fLogFileName.Data(),
"r")) == 0)
245 Error(
"Init",
"could not open logfile %s for reading", fLogFileName.Data());
247 fLogToWindowOnly = kFALSE;
249 fCacheLock =
new TProofLockPath(TString::Format(
"%s/%s%s", gSystem->TempDirectory(),
250 kPROOF_CacheLockFile,
251 TString(fCacheDir).ReplaceAll(
"/",
"%").Data()));
254 fQueryLock =
new TProofLockPath(TString::Format(
"%s/%s%s-%s", gSystem->TempDirectory(),
255 kPROOF_QueryLockFile, GetName(),
256 TString(fQueryDir).ReplaceAll(
"/",
"%").Data()));
259 fQMgr =
new TQueryResultManager(fQueryDir, GetName(), fWorkDir,
260 fQueryLock, fLogFileW);
263 Int_t maxq = gEnv->GetValue(
"ProofLite.MaxQueriesSaved", 10);
264 if (fQMgr && fQMgr->ApplyMaxQueries(maxq) != 0)
265 Warning(
"Init",
"problems applying fMaxQueries");
267 if (InitDataSetManager() != 0)
268 Warning(
"Init",
"problems initializing the dataset manager");
293 fFeedback =
new TList;
294 fFeedback->SetOwner();
295 fFeedback->SetName(
"FeedbackList");
299 fSlaves =
new TSortedList(kSortDescending);
300 fActiveSlaves =
new TList;
301 fInactiveSlaves =
new TList;
302 fUniqueSlaves =
new TList;
303 fAllUniqueSlaves =
new TList;
304 fNonUniqueMasters =
new TList;
305 fBadSlaves =
new TList;
306 fAllMonitor =
new TMonitor;
307 fActiveMonitor =
new TMonitor;
308 fUniqueMonitor =
new TMonitor;
309 fAllUniqueMonitor =
new TMonitor;
313 fTerminatedSlaveInfos =
new TList;
314 fTerminatedSlaveInfos->SetOwner(kTRUE);
318 fForkStartup = kFALSE;
319 if (gEnv->GetValue(
"ProofLite.ForkStartup", 0) != 0) {
321 fForkStartup = kTRUE;
323 Warning(
"Init",
"fork-based workers startup is not available on Windows - ignoring");
328 if (TestBit(TProof::kIsClient)) {
331 TString globpack = gEnv->GetValue(
"Proof.GlobalPackageDirs",
"");
332 TProofServ::ResolveKeywords(globpack);
333 Int_t nglb = TPackMgr::RegisterGlobalPath(globpack);
335 Info(
"Init",
" %d global package directories registered", nglb);
339 if (SetupWorkers(0) != 0) {
340 Error(
"Init",
"problems setting up workers");
348 fAllMonitor->DeActivateAll();
351 GoParallel(-1, kFALSE);
360 ActivateAsyncInput();
362 SetRunStatus(TProof::kRunning);
365 R__LOCKGUARD(gROOTMutex);
366 gROOT->GetListOfSockets()->Add(
this);
370 return fActiveSlaves->GetSize();
375 TProofLite::~TProofLite()
380 if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
382 gSystem->MakeDirectory(fQueryDir+
"/.delete");
383 gSystem->Exec(Form(
"%s %s", kRM, fQueryDir.Data()));
388 gSystem->Unlink(fQueryLock->GetName());
389 fQueryLock->Unlock();
392 SafeDelete(fReInvalid);
393 SafeDelete(fDataSetManager);
394 SafeDelete(fDataSetStgRepo);
397 SafeDelete(fServSock);
398 gSystem->Unlink(fSockPath);
406 Int_t TProofLite::GetNumberOfWorkers(
const char *url)
408 Bool_t notify = kFALSE;
409 if (fgWrksMax == -2) {
411 TString sysname =
"system.rootrc";
412 char *s = gSystem->ConcatFileName(TROOT::GetEtcDir(), sysname);
414 sysenv.ReadFile(s, kEnvGlobal);
415 fgWrksMax = sysenv.GetValue(
"ProofLite.MaxWorkers", -1);
420 if (fgWrksMax == 0) {
421 ::Error(
"TProofLite::GetNumberOfWorkers",
422 "PROOF-Lite disabled by the system administrator: sorry!");
428 Bool_t urlSetting = kFALSE;
429 if (url && strlen(url)) {
431 Int_t in = nw.Index(
"workers=");
433 nw.Remove(0, in + strlen(
"workers="));
434 while (!nw.IsDigit())
435 nw.Remove(nw.Length()-1);
437 if ((nWorkers = nw.Atoi()) <= 0) {
438 ::Warning(
"TProofLite::GetNumberOfWorkers",
439 "number of workers specified by 'workers='"
440 " is non-positive: using default");
447 if (!urlSetting && fgProofEnvList) {
449 TNamed *nm = (TNamed *) fgProofEnvList->FindObject(
"PROOF_NWORKERS");
453 if ((nWorkers = nw.Atoi()) == 0) {
454 ::Warning(
"TProofLite::GetNumberOfWorkers",
455 "number of workers specified by 'workers='"
456 " is non-positive: using default");
462 nWorkers = gEnv->GetValue(
"ProofLite.Workers", -1);
465 if (gSystem->GetSysInfo(&si) == 0 && si.fCpus > 2) {
471 if (notify) notify = kFALSE;
475 if (fgWrksMax > 0 && fgWrksMax < nWorkers) {
477 ::Warning(
"TProofLite::GetNumberOfWorkers",
"number of PROOF-Lite workers limited by"
478 " the system administrator to %d", fgWrksMax);
479 nWorkers = fgWrksMax;
489 Int_t TProofLite::SetupWorkers(Int_t opt, TList *startedWorkers)
493 if ((fServSock =
new TServerSocket(fSockPath))) {
494 R__LOCKGUARD(gROOTMutex);
496 gROOT->GetListOfSockets()->Remove(fServSock);
499 if (!fServSock || !fServSock->IsValid()) {
500 Error(
"SetupWorkers",
501 "unable to create server socket for internal communications");
502 SetBit(kInvalidObject);
507 TMonitor *mon =
new TMonitor;
512 Int_t nWrksDone = 0, nWrksTot = -1;
516 nWrksTot = fForkStartup ? 1 : fNWorkers;
520 for (; ord < nWrksTot; ord++) {
523 const char *o = (gProofServ) ? gProofServ->GetOrdinal() :
"0";
524 fullord.Form(
"%s.%d", o, ord);
527 SetProofServEnv(fullord);
530 if ((wrk = CreateSlave(
"lite", fullord, 100, fImage, fWorkDir)))
534 NotifyStartUp(
"Opening connections to workers", ++nWrksDone, nWrksTot);
539 Warning(
"SetupWorkers",
"standard startup: workers already started");
542 nWrksTot = fNWorkers - 1;
547 for (; ord < nWrksTot; ord++) {
550 const char *o = (gProofServ) ? gProofServ->GetOrdinal() :
"0";
551 fullord.Form(
"%s.%d", o, ord + 1);
552 if (!clones.IsNull()) clones +=
" ";
556 if ((wrk = CreateSlave(
"lite", fullord, -1, fImage, fWorkDir)))
560 NotifyStartUp(
"Opening connections to workers", ++nWrksDone, nWrksTot);
565 TMessage m(kPROOF_FORK);
567 Broadcast(m, kActive);
572 nWrksTot = started.GetSize();
574 Int_t to = gEnv->GetValue(
"ProofLite.StartupTimeOut", 5) * 1000;
575 while (started.GetSize() > 0 && nSelects < nWrksTot) {
578 TSocket *xs = mon->Select(to);
582 if (xs == (TSocket *) -1)
continue;
585 TSocket *s = fServSock->Accept();
586 if (s && s->IsValid()) {
589 if (s->Recv(msg) < 0) {
590 Warning(
"SetupWorkers",
"problems receiving message from accepted socket!");
596 if ((wrk = (TSlave *) started.FindObject(ord))) {
606 { R__LOCKGUARD(gROOTMutex);
607 gROOT->GetListOfSockets()->Remove(s);
609 if (wrk->IsValid()) {
611 wrk->SetInputHandler(
new TProofInputHandler(
this, wrk->GetSocket()));
616 wrk->SetupServ(TSlave::kSlave, 0);
621 if (wrk->IsValid()) {
622 if (opt == 1) fActiveSlaves->Add(wrk);
623 fAllMonitor->Add(wrk->GetSocket());
625 if (startedWorkers) startedWorkers->Add(wrk);
627 NotifyStartUp(
"Setting up worker servers", ++nWrksDone, nWrksTot);
630 fBadSlaves->Add(wrk);
634 Warning(
"SetupWorkers",
"received empty message from accepted socket!");
641 mon->DeActivateAll();
645 if (!gROOT->IsBatch() && !fProgressDialog) {
646 if ((fProgressDialog =
647 gROOT->GetPluginManager()->FindHandler(
"TProofProgressDialog")))
648 if (fProgressDialog->LoadPlugin() == -1)
667 void TProofLite::NotifyStartUp(
const char *action, Int_t done, Int_t tot)
669 Int_t frac = (Int_t) (done*100.)/tot;
672 snprintf(msg, 512,
"%s: OK (%d workers) \n",
675 snprintf(msg, 512,
"%s: %d out of %d (%d %%)\r",
676 action, done, tot, frac);
678 fprintf(stderr,
"%s", msg);
684 Int_t TProofLite::SetProofServEnv(
const char *ord)
687 if (!ord || strlen(ord) <= 0) {
688 Error(
"SetProofServEnv",
"ordinal string undefined");
693 TString rcfile(Form(
"%s/worker-%s.rootrc", fWorkDir.Data(), ord));
694 FILE *frc = fopen(rcfile.Data(),
"w");
696 Error(
"SetProofServEnv",
"cannot open rc file %s", rcfile.Data());
701 fprintf(frc,
"# The session working dir\n");
702 fprintf(frc,
"ProofServ.SessionDir: %s/worker-%s\n", fWorkDir.Data(), ord);
705 fprintf(frc,
"# Session tag\n");
706 fprintf(frc,
"ProofServ.SessionTag: %s\n", GetName());
709 fprintf(frc,
"# Proof Log/Debug level\n");
710 fprintf(frc,
"Proof.DebugLevel: %d\n", gDebug);
713 fprintf(frc,
"# Ordinal number\n");
714 fprintf(frc,
"ProofServ.Ordinal: %s\n", ord);
717 fprintf(frc,
"# ROOT Version tag\n");
718 fprintf(frc,
"ProofServ.RootVersionTag: %s\n", gROOT->GetVersion());
721 TString sandbox = fSandbox;
722 if (GetSandbox(sandbox, kFALSE,
"ProofServ.Sandbox") != 0)
723 Warning(
"SetProofServEnv",
"problems getting sandbox string for worker");
724 fprintf(frc,
"# Users sandbox\n");
725 fprintf(frc,
"ProofServ.Sandbox: %s\n", sandbox.Data());
728 fprintf(frc,
"# Users cache\n");
729 fprintf(frc,
"ProofServ.CacheDir: %s\n", fCacheDir.Data());
732 fprintf(frc,
"# Users packages\n");
733 fprintf(frc,
"ProofServ.PackageDir: %s\n", fPackMgr->GetDir());
736 fprintf(frc,
"# Server image\n");
737 fprintf(frc,
"ProofServ.Image: %s\n", fImage.Data());
740 fprintf(frc,
"# Open socket\n");
741 fprintf(frc,
"ProofServ.OpenSock: %s\n", fSockPath.Data());
744 fprintf(frc,
"# Client Protocol\n");
745 fprintf(frc,
"ProofServ.ClientVersion: %d\n", kPROOF_Protocol);
751 TString envfile(Form(
"%s/worker-%s.env", fWorkDir.Data(), ord));
752 FILE *fenv = fopen(envfile.Data(),
"w");
754 Error(
"SetProofServEnv",
"cannot open env file %s", envfile.Data());
758 fprintf(fenv,
"export ROOTSYS=%s\n", TROOT::GetRootSys().Data());
760 fprintf(fenv,
"export ROOTCONFDIR=%s\n", TROOT::GetRootSys().Data());
762 fprintf(fenv,
"export TMPDIR=%s\n", gSystem->TempDirectory());
764 TString logfile(Form(
"%s/worker-%s.log", fWorkDir.Data(), ord));
765 fprintf(fenv,
"export ROOTPROOFLOGFILE=%s\n", logfile.Data());
767 fprintf(fenv,
"export ROOTRCFILE=%s\n", rcfile.Data());
769 fprintf(fenv,
"export ROOTVERSIONTAG=%s\n", gROOT->GetVersion());
771 fprintf(fenv,
"export ROOTPROOFLITE=%d\n", fNWorkers);
773 fprintf(fenv,
"export LOCALDATASERVER=\"file://\"\n");
775 if (fgProofEnvList) {
777 TIter nxenv(fgProofEnvList);
779 while ((env = (TNamed *)nxenv())) {
780 TString senv(env->GetTitle());
781 ResolveKeywords(senv, ord, logfile.Data());
782 fprintf(fenv,
"export %s=%s\n", env->GetName(), senv.Data());
783 if (namelist.Length() > 0)
785 namelist += env->GetName();
787 fprintf(fenv,
"export PROOF_ALLVARS=%s\n", namelist.Data());
801 void TProofLite::ResolveKeywords(TString &s,
const char *ord,
804 if (!logfile)
return;
807 if (s.Contains(
"<logfilewrk>") && logfile) {
808 TString lfr(logfile);
809 if (lfr.EndsWith(
".log")) lfr.Remove(lfr.Last(
'.'));
810 s.ReplaceAll(
"<logfilewrk>", lfr.Data());
814 if (gSystem->Getenv(
"USER") && s.Contains(
"<user>")) {
815 s.ReplaceAll(
"<user>", gSystem->Getenv(
"USER"));
819 if (gSystem->Getenv(
"ROOTSYS") && s.Contains(
"<rootsys>")) {
820 s.ReplaceAll(
"<rootsys>", gSystem->Getenv(
"ROOTSYS"));
824 if (s.Contains(
"<cpupin>")) {
826 Int_t n = o.Index(
'.');
834 const TList *envVars = GetEnvVars();
837 var =
dynamic_cast<TNamed *
>(envVars->FindObject(
"PROOF_SLAVE_CPUPIN_ORDER"));
838 if (var) cpuPinList = var->GetTitle();
845 if (gSystem->GetSysInfo(&si) == 0 && (si.fCpus > 0))
850 if (cpuPinList.IsNull() || (cpuPinList ==
"*")) {
857 n = n % (cpuPinList.CountChar(
'+')+1);
860 for (Int_t i=0; cpuPinList.Tokenize(tok, from,
"\\+"); i++) {
862 n = (tok.Atoi() % nCpus);
873 s.ReplaceAll(
"<cpupin>", o);
880 Int_t TProofLite::CreateSandbox()
883 if (GetSandbox(fSandbox, kTRUE,
"ProofLite.Sandbox") != 0)
return -1;
886 TString packdir = gEnv->GetValue(
"Proof.PackageDir",
"");
887 if (packdir.IsNull())
888 packdir.Form(
"%s/%s", fSandbox.Data(), kPROOF_PackDir);
889 if (AssertPath(packdir, kTRUE) != 0)
return -1;
890 fPackMgr =
new TPackMgr(packdir);
893 fCacheDir = gEnv->GetValue(
"Proof.CacheDir",
"");
894 if (fCacheDir.IsNull())
895 fCacheDir.Form(
"%s/%s", fSandbox.Data(), kPROOF_CacheDir);
896 if (AssertPath(fCacheDir, kTRUE) != 0)
return -1;
899 fDataSetDir = gEnv->GetValue(
"Proof.DataSetDir",
"");
900 if (fDataSetDir.IsNull())
901 fDataSetDir.Form(
"%s/%s", fSandbox.Data(), kPROOF_DataSetDir);
902 if (AssertPath(fDataSetDir, kTRUE) != 0)
return -1;
906 stag.Form(
"%s-%d-%d", gSystem->HostName(), (int)time(0), gSystem->GetPid());
907 SetName(stag.Data());
909 Int_t subpath = gEnv->GetValue(
"ProofLite.SubPath", 1);
913 sessdir = gSystem->WorkingDirectory();
914 sessdir.ReplaceAll(gSystem->HomeDirectory(),
"");
915 sessdir.ReplaceAll(
"/",
"-");
916 sessdir.Replace(0,1,
"/",1);
917 sessdir.Insert(0, fSandbox.Data());
924 fWorkDir.Form(
"%s/session-%s", sessdir.Data(), stag.Data());
925 if (AssertPath(fWorkDir, kTRUE) != 0)
return -1;
929 lastsess.Form(
"%s/last-lite-session", sessdir.Data());
930 gSystem->Unlink(lastsess);
931 gSystem->Symlink(fWorkDir, lastsess);
934 fQueryDir = gEnv->GetValue(
"Proof.QueryDir",
"");
935 if (fQueryDir.IsNull())
936 fQueryDir.Form(
"%s/%s", sessdir.Data(), kPROOF_QueryDir);
937 if (AssertPath(fQueryDir, kTRUE) != 0)
return -1;
949 void TProofLite::Print(Option_t *option)
const
952 if (gProofServ) ord.Form(
"%s ", gProofServ->GetOrdinal());
954 Printf(
"*** PROOF-Lite cluster %s(parallel mode, %d workers):", ord.Data(), GetParallel());
956 Printf(
"*** PROOF-Lite cluster %s(sequential mode)", ord.Data());
959 TString url(gSystem->HostName());
961 Int_t port = gEnv->GetValue(
"ProofServ.XpdPort", 1093);
962 if (port > -1) url.Form(
"%s:%d",gSystem->HostName(), port);
963 Printf(
"URL: %s", url.Data());
965 Printf(
"Host name: %s", gSystem->HostName());
967 Printf(
"User: %s", GetUser());
968 TString ver(gROOT->GetVersion());
969 ver += TString::Format(
"|%s", gROOT->GetGitCommit());
970 if (gSystem->Getenv(
"ROOTVERSIONTAG"))
971 ver += TString::Format(
"|%s", gSystem->Getenv(
"ROOTVERSIONTAG"));
972 Printf(
"ROOT version|rev|tag: %s", ver.Data());
973 Printf(
"Architecture-Compiler: %s-%s", gSystem->GetBuildArch(),
974 gSystem->GetBuildCompilerVersion());
975 Printf(
"Protocol version: %d", GetClientProtocol());
976 Printf(
"Working directory: %s", gSystem->WorkingDirectory());
977 Printf(
"Communication path: %s", fSockPath.Data());
978 Printf(
"Log level: %d", GetLogLevel());
979 Printf(
"Number of workers: %d", GetNumberOfSlaves());
980 Printf(
"Number of active workers: %d", GetNumberOfActiveSlaves());
981 Printf(
"Number of unique workers: %d", GetNumberOfUniqueSlaves());
982 Printf(
"Number of inactive workers: %d", GetNumberOfInactiveSlaves());
983 Printf(
"Number of bad workers: %d", GetNumberOfBadSlaves());
984 Printf(
"Total MB's processed: %.2f",
float(GetBytesRead())/(1024*1024));
985 Printf(
"Total real time used (s): %.3f", GetRealTime());
986 Printf(
"Total CPU time used (s): %.3f", GetCpuTime());
987 if (TString(option).Contains(
"a", TString::kIgnoreCase) && GetNumberOfSlaves()) {
988 Printf(
"List of workers:");
989 TIter nextslave(fSlaves);
990 while (TSlave* sl = dynamic_cast<TSlave*>(nextslave())) {
1000 TProofQueryResult *TProofLite::MakeQueryResult(Long64_t nent,
const char *opt,
1001 Long64_t fst, TDSet *dset,
1007 fQMgr->IncrementSeqNum();
1008 seqnum = fQMgr->SeqNum();
1012 TProofQueryResult *pqr =
new TProofQueryResult(seqnum, opt,
1013 fPlayer->GetInputList(), nent,
1015 (dset ? dset->GetEntryList() : 0));
1017 pqr->SetTitle(GetName());
1025 void TProofLite::SetQueryRunning(TProofQueryResult *pq)
1029 Int_t startlog = lseek(fileno(fLogFileW), (off_t) 0, SEEK_END);
1033 Info(
"SetQueryRunning",
"starting query: %d", pq->GetSeqNum());
1036 TString parlist =
"";
1037 fPackMgr->GetEnabledPackages(parlist);
1040 pq->SetRunning(startlog, parlist, GetParallel());
1044 pq->SetProcessInfo(pq->GetEntries(), GetCpuTime(), GetBytesRead());
1053 Long64_t TProofLite::DrawSelect(TDSet *dset,
const char *varexp,
1054 const char *selection, Option_t *option,
1055 Long64_t nentries, Long64_t first)
1057 if (!IsValid())
return -1;
1061 Info(
"DrawSelect",
"not idle, asynchronous Draw not supported");
1064 TString opt(option);
1065 Int_t idx = opt.Index(
"ASYN", 0, TString::kIgnoreCase);
1067 opt.Replace(idx,4,
"");
1071 fSelection = selection;
1073 return Process(dset,
"draw:", opt, nentries, first);
1083 Long64_t TProofLite::Process(TDSet *dset,
const char *selector, Option_t *option,
1084 Long64_t nentries, Long64_t first)
1089 TString opt(option), optfb, outfile;
1091 if (opt.Contains(
"fb=") || opt.Contains(
"feedback=")) SetFeedback(opt, optfb, 0);
1093 if (HandleOutputOptions(opt, outfile, 0) != 0)
return -1;
1096 fSync = (GetQueryMode(opt) == kSync);
1098 Info(
"Process",
"asynchronous mode not yet supported in PROOF-Lite");
1104 Info(
"Process",
"not idle: cannot accept queries");
1109 if (IsIdle() && fRunningDSets && fRunningDSets->GetSize() > 0) {
1110 fRunningDSets->SetOwner(kTRUE);
1111 fRunningDSets->Delete();
1114 if (!IsValid() || !fQMgr || !fPlayer) {
1115 Error(
"Process",
"invalid sesion or query-result manager undefined!");
1121 if (!fPlayer->GetInputList()->FindObject(
"PROOF_MaxSlavesPerNode"))
1122 SetParameter(
"PROOF_MaxSlavesPerNode", (Long_t)0);
1124 Bool_t hasNoData = (!dset || (dset && dset->TestBit(TDSet::kEmpty))) ? kTRUE : kFALSE;
1130 if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
1131 if (TProof::AssertDataSet(dset, fPlayer->GetInputList(), fDataSetManager, emsg) != 0) {
1132 Error(
"Process",
"from AssertDataSet: %s", emsg.Data());
1135 if (dset->GetListOfElements()->GetSize() == 0) {
1136 Error(
"Process",
"no files to process!");
1139 }
else if (hasNoData) {
1141 TNamed *ftp =
dynamic_cast<TNamed *
>(fPlayer->GetInputList()->FindObject(
"PROOF_FilesToProcess"));
1143 TString dsn(ftp->GetTitle());
1144 if (!dsn.Contains(
":") || dsn.BeginsWith(
"dataset:")) {
1145 dsn.ReplaceAll(
"dataset:",
"");
1147 if (!fDataSetManager) {
1148 emsg.Form(
"dataset manager not initialized!");
1150 TFileCollection *fc = 0;
1152 if (!(fc = fDataSetManager->GetDataSet(dsn))) {
1153 emsg.Form(
"requested dataset '%s' does not exists", dsn.Data());
1155 TMap *fcmap = TProofServ::GetDataSetNodeMap(fc, emsg);
1157 fPlayer->GetInputList()->Remove(ftp);
1159 fcmap->SetOwner(kTRUE);
1160 fcmap->SetName(
"PROOF_FilesToProcess");
1161 fPlayer->GetInputList()->Add(fcmap);
1165 if (!emsg.IsNull()) {
1166 Error(
"HandleProcess",
"%s", emsg.Data());
1173 TString selec(selector), varexp, selection, objname;
1175 if (selec.BeginsWith(
"draw:")) {
1177 selection = fSelection;
1179 if (fPlayer->GetDrawArgs(varexp, selection, opt, selec, objname) != 0) {
1180 Error(
"Process",
"draw query: error parsing arguments '%s', '%s', '%s'",
1181 varexp.Data(), selection.Data(), opt.Data());
1187 TProofQueryResult *pq = MakeQueryResult(nentries, opt, first, 0, selec);
1191 Bool_t savequeries =
1192 (!strcmp(gEnv->GetValue(
"ProofLite.AutoSaveQueries",
"off"),
"on")) ? kTRUE : kFALSE;
1195 Int_t memqueries = gEnv->GetValue(
"ProofLite.MaxQueriesMemory", 1);
1198 if (!(pq->IsDraw())) {
1199 if (fQMgr->Queries()) {
1200 if (memqueries != 0) fQMgr->Queries()->Add(pq);
1201 if (memqueries >= 0 && fQMgr->Queries()->GetSize() > memqueries) {
1203 TObject *qfst = fQMgr->Queries()->First();
1204 fQMgr->Queries()->Remove(qfst);
1209 if (savequeries) fQMgr->SaveQuery(pq);
1213 fSeqNum = pq->GetSeqNum();
1216 SetQueryRunning(pq);
1219 if (!(pq->IsDraw())) {
1220 if (savequeries) fQMgr->SaveQuery(pq);
1222 fQMgr->IncrementDrawQueries();
1226 if (!gROOT->IsBatch()) {
1227 Int_t dsz = (dset && dset->GetListOfElements()) ? dset->GetListOfElements()->GetSize() : -1;
1228 if (fProgressDialog &&
1229 !TestBit(kUsingSessionGui) && TestBit(kUseProgressDialog)) {
1230 if (!fProgressDialogStarted) {
1231 fProgressDialog->ExecPlugin(5,
this, selec.Data(), dsz,
1233 fProgressDialogStarted = kTRUE;
1235 ResetProgressDialog(selec.Data(), dsz, first, nentries);
1238 ResetBit(kUsingSessionGui);
1242 if (!(pq->IsDraw()))
1243 fPlayer->AddQueryResult(pq);
1246 fPlayer->SetCurrentQuery(pq);
1250 TNamed *qtag = (TNamed *) fPlayer->GetInputList()->FindObject(
"PROOF_QueryTag");
1252 qtag->SetTitle(Form(
"%s:%s",pq->GetTitle(),pq->GetName()));
1254 TObject *o = fPlayer->GetInputList()->FindObject(
"PROOF_QueryTag");
1255 if (o) fPlayer->GetInputList()->Remove(o);
1256 fPlayer->AddInput(
new TNamed(
"PROOF_QueryTag",
1257 Form(
"%s:%s",pq->GetTitle(),pq->GetName())));
1261 SetRunStatus(TProof::kRunning);
1265 TSignalHandler *sh = 0;
1268 sh = gSystem->RemoveSignalHandler(gApplication->GetSignalHandler());
1272 fOutputList.Clear();
1275 TList *startedWorkers = 0;
1277 startedWorkers =
new TList;
1278 startedWorkers->SetOwner(kFALSE);
1279 SetupWorkers(1, startedWorkers);
1286 if (!(pq->IsDraw())) {
1287 if (selector && strlen(selector)) {
1288 rv = fPlayer->Process(dset, selec, opt, nentries, first);
1290 rv = fPlayer->Process(dset, fSelector, opt, nentries, first);
1293 rv = fPlayer->DrawSelect(dset, varexp, selection, opt, nentries, first);
1298 Float_t rt = fQuerySTW.RealTime();
1300 TQueryResult *qr = GetQueryResult();
1302 qr->SetTermTime(rt);
1307 if (!optfb.IsNull()) SetFeedback(opt, optfb, 1);
1312 if (fForkStartup && startedWorkers) {
1313 RemoveWorkers(startedWorkers);
1314 SafeDelete(startedWorkers);
1319 gSystem->AddSignalHandler(sh);
1322 if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kFinished) {
1323 Bool_t abort = (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted)
1325 if (abort) fPlayer->StopProcess(kTRUE);
1326 Emit(
"StopProcess(Bool_t)", abort);
1330 pq->SetOutputList(fPlayer->GetOutputList(), kFALSE);
1332 QueryResultReady(Form(
"%s:%s", pq->GetTitle(), pq->GetName()));
1338 if (rv == 0 && dset && !dset->TestBit(TDSet::kEmpty) && pq->GetInputList()) {
1339 pq->GetInputList()->Add(dset);
1340 if (dset->GetEntryList())
1341 pq->GetInputList()->Add(dset->GetEntryList());
1345 if (fDataSetManager && fPlayer->GetOutputList()) {
1346 TNamed *psr = (TNamed *) fPlayer->GetOutputList()->FindObject(
"PROOFSERV_RegisterDataSet");
1349 if (TProofServ::RegisterDataSets(fPlayer->GetInputList(),
1350 fPlayer->GetOutputList(), fDataSetManager, err) != 0)
1351 Warning(
"ProcessNext",
"problems registering produced datasets: %s", err.Data());
1352 fPlayer->GetOutputList()->Remove(psr);
1359 if (!(pq->IsDraw())) {
1360 if (fQMgr->FinalizeQuery(pq,
this, fPlayer)) {
1361 if (savequeries) fQMgr->SaveQuery(pq, -1);
1366 if (fPlayer && fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) {
1367 if (fPlayer->GetListOfResults()) fPlayer->GetListOfResults()->Remove(pq);
1368 if (fQMgr) fQMgr->RemoveQuery(pq);
1371 QueryResultReady(Form(
"%s:%s", pq->GetTitle(), pq->GetName()));
1373 if (!(pq->IsDraw()) && memqueries >= 0) {
1374 if (fQMgr && fQMgr->Queries()) {
1375 TQueryResult *pqr = pq->CloneInfo();
1376 if (pqr) fQMgr->Queries()->Add(pqr);
1378 fQMgr->Queries()->Remove(pq);
1383 msg.Form(
"Lite-0: all output objects have been merged ");
1384 fprintf(stderr,
"%s\n", msg.Data());
1387 if (!fPerfTree.IsNull()) {
1388 if (SavePerfTree() != 0) Error(
"Process",
"saving performance info ...");
1394 if (HandleOutputOptions(opt, outfile, 1) != 0)
return -1;
1398 TParameter<Long64_t> *sst =
1399 (TParameter<Long64_t> *) fOutputList.FindObject(
"PROOF_SelectorStatus");
1400 if (sst) rv = sst->GetVal();
1412 Int_t TProofLite::InitDataSetManager()
1414 fDataSetManager = 0;
1417 TString user(
"???"), group(
"default");
1418 UserGroup_t *pw = gSystem->GetUserInfo();
1425 TPluginHandler *h = 0;
1426 TString dsm = gEnv->GetValue(
"Proof.DataSetManager",
"");
1427 if (!dsm.IsNull()) {
1429 if (gROOT->GetPluginManager()) {
1431 h = gROOT->GetPluginManager()->FindHandler(
"TDataSetManager", dsm);
1432 if (h && h->LoadPlugin() != -1) {
1435 reinterpret_cast<TDataSetManager*
>(h->ExecPlugin(3, group.Data(),
1436 user.Data(), dsm.Data()));
1440 if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
1441 Warning(
"InitDataSetManager",
"dataset manager plug-in initialization failed");
1442 SafeDelete(fDataSetManager);
1446 if (!fDataSetManager) {
1447 TString opts(
"Av:");
1448 TString dsetdir = gEnv->GetValue(
"ProofServ.DataSetDir",
"");
1449 if (dsetdir.IsNull()) {
1451 dsetdir = fDataSetDir;
1456 h = gROOT->GetPluginManager()->FindHandler(
"TDataSetManager",
"file");
1457 if (h && h->LoadPlugin() == -1) h = 0;
1461 fDataSetManager =
reinterpret_cast<TDataSetManager*
>(h->ExecPlugin(3,
1462 group.Data(), user.Data(),
1463 Form(
"dir:%s opt:%s", dsetdir.Data(), opts.Data())));
1465 if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
1466 Warning(
"InitDataSetManager",
"default dataset manager plug-in initialization failed");
1467 SafeDelete(fDataSetManager);
1471 if (gDebug > 0 && fDataSetManager) {
1472 Info(
"InitDataSetManager",
"datasetmgr Cq: %d, Ar: %d, Av: %d, Ti: %d, Sb: %d",
1473 fDataSetManager->TestBit(TDataSetManager::kCheckQuota),
1474 fDataSetManager->TestBit(TDataSetManager::kAllowRegister),
1475 fDataSetManager->TestBit(TDataSetManager::kAllowVerify),
1476 fDataSetManager->TestBit(TDataSetManager::kTrustInfo),
1477 fDataSetManager->TestBit(TDataSetManager::kIsSandbox));
1481 TString dsReqCfg = gEnv->GetValue(
"Proof.DataSetStagingRequests",
"");
1482 if (!dsReqCfg.IsNull()) {
1483 TPMERegexp reReqDir(
"(^| )(dir:)?([^ ]+)( |$)");
1485 if (reReqDir.Match(dsReqCfg) == 5) {
1487 dsDirFmt.Form(
"dir:%s perms:open", reReqDir[3].Data());
1488 fDataSetStgRepo =
new TDataSetManagerFile(
"_stage_",
"_stage_", dsDirFmt);
1489 if (fDataSetStgRepo && fDataSetStgRepo->TestBit(TObject::kInvalidObject)) {
1490 Warning(
"InitDataSetManager",
"failed init of dataset staging requests repository");
1491 SafeDelete(fDataSetStgRepo);
1494 Warning(
"InitDataSetManager",
"specify, with [dir:]<path>, a valid path for staging requests");
1496 }
else if (gDebug > 0) {
1497 Warning(
"InitDataSetManager",
"no repository for staging requests available");
1501 return (fDataSetManager ? 0 : -1);
1508 void TProofLite::ShowCache(Bool_t)
1510 if (!IsValid())
return;
1512 Printf(
"*** Local file cache %s ***", fCacheDir.Data());
1513 gSystem->Exec(Form(
"%s %s", kLS, fCacheDir.Data()));
1519 void TProofLite::ClearCache(
const char *file)
1521 if (!IsValid())
return;
1524 if (!file || strlen(file) <= 0) {
1525 gSystem->Exec(Form(
"%s %s/*", kRM, fCacheDir.Data()));
1527 gSystem->Exec(Form(
"%s %s/%s", kRM, fCacheDir.Data(), file));
1529 fCacheLock->Unlock();
1539 Int_t TProofLite::Load(
const char *macro, Bool_t notOnClient, Bool_t uniqueOnly,
1542 if (!IsValid())
return -1;
1544 if (!macro || !macro[0]) {
1545 Error(
"Load",
"need to specify a macro name");
1549 TString macs(macro), mac;
1551 while (macs.Tokenize(mac, from,
",")) {
1553 if (CopyMacroToCache(mac) < 0)
return -1;
1556 TString macn = gSystem->BaseName(mac);
1557 macn.Remove(macn.Last(
'.'));
1560 TString cacheDir = fCacheDir;
1561 gSystem->ExpandPathName(cacheDir);
1562 void * dirp = gSystem->OpenDirectory(cacheDir);
1565 while ((e = gSystem->GetDirEntry(dirp))) {
1566 if (!strncmp(e, macn.Data(), macn.Length())) {
1567 TString fncache = Form(
"%s/%s", cacheDir.Data(), e);
1568 cachedFiles.Add(
new TObjString(fncache.Data()));
1571 gSystem->FreeDirectory(dirp);
1576 return TProof::Load(macro, notOnClient, uniqueOnly, wrks);
1593 Int_t TProofLite::CopyMacroToCache(
const char *macro, Int_t headerRequired,
1594 TSelector **selector, Int_t opt, TList *)
1597 TString cacheDir = fCacheDir;
1598 gSystem->ExpandPathName(cacheDir);
1599 TProofLockPath *cacheLock = fCacheLock;
1602 TString name = macro;
1603 TString acmode, args, io;
1604 name = gSystem->SplitAclicMode(name, acmode, args, io);
1607 Info("CopyMacroToCache", "enter: names: %s, %s", macro, name.Data());
1610 if (gSystem->AccessPathName(name, kReadPermission)) {
1611 Error(
"CopyMacroToCache",
"file %s not found or not readable", name.Data());
1616 TString mp(TROOT::GetMacroPath());
1617 TString np(gSystem->DirName(name));
1620 if (!mp.BeginsWith(np) && !mp.Contains(
":"+np)) {
1621 Int_t ip = (mp.BeginsWith(
".:")) ? 2 : 0;
1623 TROOT::SetMacroPath(mp);
1625 Info("CopyMacroToCache", "macro path set to '%s'", TROOT::GetMacroPath());
1630 Int_t dot = name.Last('.');
1631 const
char *hext[] = {
".h",
".hh",
"" };
1632 TString hname, checkedext;
1634 while (strlen(hext[i]) > 0) {
1635 hname = name(0, dot);
1637 if (!gSystem->AccessPathName(hname, kReadPermission))
1639 if (!checkedext.IsNull()) checkedext +=
",";
1640 checkedext += hext[i];
1644 if (hname.IsNull() && headerRequired == 1) {
1645 Error(
"CopyMacroToCache",
"header file for %s not found or not readable "
1646 "(checked extensions: %s)", name.Data(), checkedext.Data());
1649 if (headerRequired < 0)
1655 Bool_t useCacheBinaries = kFALSE;
1656 TString cachedname = Form(
"%s/%s", cacheDir.Data(), gSystem->BaseName(name));
1657 TString cachedhname;
1658 if (!hname.IsNull())
1659 cachedhname = Form(
"%s/%s", cacheDir.Data(), gSystem->BaseName(hname));
1660 if (!gSystem->AccessPathName(cachedname, kReadPermission)) {
1661 TMD5 *md5 = TMD5::FileChecksum(name);
1662 TMD5 *md5cache = TMD5::FileChecksum(cachedname);
1663 if (md5 && md5cache && (*md5 == *md5cache))
1664 useCacheBinaries = kTRUE;
1665 if (!hname.IsNull()) {
1666 if (!gSystem->AccessPathName(cachedhname, kReadPermission)) {
1667 TMD5 *md5h = TMD5::FileChecksum(hname);
1668 TMD5 *md5hcache = TMD5::FileChecksum(cachedhname);
1669 if (md5h && md5hcache && (*md5h != *md5hcache))
1670 useCacheBinaries = kFALSE;
1672 SafeDelete(md5hcache);
1676 SafeDelete(md5cache);
1680 TString vername(Form(
".%s", name.Data()));
1681 dot = vername.Last(
'.');
1683 vername.Remove(dot);
1684 vername +=
".binversion";
1685 Bool_t savever = kFALSE;
1688 if (useCacheBinaries) {
1690 FILE *f = fopen(Form(
"%s/%s", cacheDir.Data(), vername.Data()),
"r");
1696 if (!f || v != gROOT->GetVersion() || r != gROOT->GetGitCommit())
1697 useCacheBinaries = kFALSE;
1701 TString binname = gSystem->BaseName(name);
1702 dot = binname.Last(
'.');
1704 binname.Replace(dot,1,
"_");
1705 TString pcmname = TString::Format(
"%s_ACLiC_dict_rdict.pcm", binname.Data());
1708 FileStat_t stlocal, stcache;
1710 if (useCacheBinaries) {
1713 dirp = gSystem->OpenDirectory(cacheDir);
1716 while ((e = gSystem->GetDirEntry(dirp))) {
1717 if (!strncmp(e, binname.Data(), binname.Length()) ||
1718 !strncmp(e, pcmname.Data(), pcmname.Length())) {
1719 TString fncache = Form(
"%s/%s", cacheDir.Data(), e);
1720 Bool_t docp = kTRUE;
1721 if (!gSystem->GetPathInfo(fncache, stcache)) {
1722 Int_t rc = gSystem->GetPathInfo(e, stlocal);
1723 if (rc == 0 && (stlocal.fMtime >= stcache.fMtime))
1727 gSystem->Exec(Form(
"%s %s", kRM, e));
1729 Info("CopyMacroToCache",
1730 "retrieving %s from cache", fncache.Data());
1731 gSystem->Exec(Form("%s %s %s", kCP, fncache.Data(), e));
1736 gSystem->FreeDirectory(dirp);
1739 cacheLock->Unlock();
1743 if (!(*selector = TSelector::GetSelector(macro))) {
1744 Error(
"CopyMacroToCache",
"could not create a selector from %s", macro);
1751 TList *cachedFiles =
new TList;
1753 dirp = gSystem->OpenDirectory(
".");
1756 while ((e = gSystem->GetDirEntry(dirp))) {
1757 if (!strncmp(e, binname.Data(), binname.Length()) ||
1758 !strncmp(e, pcmname.Data(), pcmname.Length())) {
1759 Bool_t docp = kTRUE;
1760 if (!gSystem->GetPathInfo(e, stlocal)) {
1761 TString fncache = Form(
"%s/%s", cacheDir.Data(), e);
1762 Int_t rc = gSystem->GetPathInfo(fncache, stcache);
1763 if (rc == 0 && (stlocal.fMtime <= stcache.fMtime))
1767 gSystem->Exec(Form(
"%s %s", kRM, fncache.Data()));
1769 Info("CopyMacroToCache","caching %s ...", e);
1770 gSystem->Exec(Form("%s %s %s", kCP, e, fncache.Data()));
1774 cachedFiles->Add(new TObjString(fncache.Data()));
1778 gSystem->FreeDirectory(dirp);
1783 FILE *f = fopen(Form(
"%s/%s", cacheDir.Data(), vername.Data()),
"w");
1785 fputs(gROOT->GetVersion(), f);
1786 fputs(Form(
"\n%s", gROOT->GetGitCommit()), f);
1792 if (!useCacheBinaries) {
1793 gSystem->Exec(Form(
"%s %s", kRM, cachedname.Data()));
1795 Info("CopyMacroToCache","caching %s ...", name.Data());
1796 gSystem->Exec(Form("%s %s %s", kCP, name.Data(), cachedname.Data()));
1797 if (!hname.IsNull()) {
1798 gSystem->Exec(Form(
"%s %s", kRM, cachedhname.Data()));
1800 Info("CopyMacroToCache","caching %s ...", hname.Data());
1801 gSystem->Exec(Form("%s %s %s", kCP, hname.Data(), cachedhname.Data()));
1805 cachedFiles->Add(
new TObjString(cachedname.Data()));
1806 if (!hname.IsNull())
1807 cachedFiles->Add(
new TObjString(cachedhname.Data()));
1810 cacheLock->Unlock();
1812 cachedFiles->SetOwner();
1821 Int_t TProofLite::CleanupSandbox()
1823 Int_t maxold = gEnv->GetValue(
"Proof.MaxOldSessions", 1);
1825 if (maxold < 0)
return 0;
1827 TSortedList *olddirs =
new TSortedList(kFALSE);
1829 TString sandbox = gSystem->DirName(fWorkDir.Data());
1831 void *dirp = gSystem->OpenDirectory(sandbox);
1834 while ((e = gSystem->GetDirEntry(dirp))) {
1835 if (!strncmp(e,
"session-", 8) && !strstr(e, GetName())) {
1837 Int_t i = d.Last(
'-');
1838 if (i != kNPOS) d.Remove(i);
1840 if (i != kNPOS) d.Remove(0,i+1);
1841 TString path = Form(
"%s/%s", sandbox.Data(), e);
1842 olddirs->Add(
new TNamed(d, path));
1845 gSystem->FreeDirectory(dirp);
1849 Bool_t notify = kTRUE;
1850 while (olddirs->GetSize() > maxold) {
1851 if (notify && gDebug > 0)
1852 Printf(
"Cleaning sandbox at: %s", sandbox.Data());
1854 TNamed *n = (TNamed *) olddirs->Last();
1856 gSystem->Exec(Form(
"%s %s", kRM, n->GetTitle()));
1863 olddirs->SetOwner();
1873 TList *TProofLite::GetListOfQueries(Option_t *opt)
1875 Bool_t all = ((strchr(opt,
'A') || strchr(opt,
'a'))) ? kTRUE : kFALSE;
1877 TList *ql =
new TList;
1878 Int_t ntot = 0, npre = 0, ndraw= 0;
1882 TString qdir = fQueryDir;
1883 Int_t idx = qdir.Index(
"session-");
1886 fQMgr->ScanPreviousQueries(qdir);
1888 if (fQMgr->PreviousQueries()) {
1889 TIter nxq(fQMgr->PreviousQueries());
1890 TProofQueryResult *pqr = 0;
1891 while ((pqr = (TProofQueryResult *)nxq())) {
1893 pqr->fSeqNum = ntot;
1900 if (fQMgr->Queries()) {
1902 TIter nxq(fQMgr->Queries());
1903 TProofQueryResult *pqr = 0;
1904 TQueryResult *pqm = 0;
1905 while ((pqr = (TProofQueryResult *)nxq())) {
1907 if ((pqm = pqr->CloneInfo())) {
1908 pqm->fSeqNum = ntot;
1911 Warning(
"GetListOfQueries",
"unable to clone TProofQueryResult '%s:%s'",
1912 pqr->GetName(), pqr->GetTitle());
1917 ndraw = fQMgr->DrawQueries();
1920 fOtherQueries = npre;
1921 fDrawQueries = ndraw;
1942 Bool_t TProofLite::RegisterDataSet(
const char *uri,
1943 TFileCollection *dataSet,
const char* optStr)
1945 if (!fDataSetManager) {
1946 Info(
"RegisterDataSet",
"dataset manager not available");
1950 if (!uri || strlen(uri) <= 0) {
1951 Info(
"RegisterDataSet",
"specifying a dataset name is mandatory");
1955 Bool_t parallelverify = kFALSE;
1956 TString sopt(optStr);
1957 if (sopt.Contains(
"V") && !sopt.Contains(
"S")) {
1959 parallelverify = kTRUE;
1960 sopt.ReplaceAll(
"V",
"");
1963 sopt.ReplaceAll(
"S",
"");
1965 Bool_t result = kTRUE;
1966 if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
1968 if (!dataSet || dataSet->GetList()->GetSize() == 0) {
1969 Error(
"RegisterDataSet",
"can not save an empty list.");
1973 result = (fDataSetManager->RegisterDataSet(uri, dataSet, sopt) == 0)
1976 Info(
"RegisterDataSet",
"dataset registration not allowed");
1981 Error(
"RegisterDataSet",
"dataset was not saved");
1984 if (!parallelverify)
return result;
1988 if (VerifyDataSet(uri, sopt) < 0){
1989 Error(
"RegisterDataSet",
"problems verifying dataset '%s'", uri);
2002 Int_t TProofLite::SetDataSetTreeName(
const char *dataset,
const char *treename)
2004 if (!fDataSetManager) {
2005 Info(
"ExistsDataSet",
"dataset manager not available");
2009 if (!dataset || strlen(dataset) <= 0) {
2010 Info(
"SetDataSetTreeName",
"specifying a dataset name is mandatory");
2014 if (!treename || strlen(treename) <= 0) {
2015 Info(
"SetDataSetTreeName",
"specifying a tree name is mandatory");
2020 TString fragment(treename);
2021 if (!fragment.BeginsWith(
"/")) fragment.Insert(0,
"/");
2022 uri.SetFragment(fragment);
2024 return fDataSetManager->ScanDataSet(uri.GetUri().Data(),
2025 (UInt_t)TDataSetManager::kSetDefaultTree);
2031 Bool_t TProofLite::ExistsDataSet(
const char *uri)
2033 if (!fDataSetManager) {
2034 Info(
"ExistsDataSet",
"dataset manager not available");
2038 if (!uri || strlen(uri) <= 0) {
2039 Error(
"ExistsDataSet",
"dataset name missing");
2044 return fDataSetManager->ExistsDataSet(uri);
2050 TMap *TProofLite::GetDataSets(
const char *uri,
const char *srvex)
2052 if (!fDataSetManager) {
2053 Info(
"GetDataSets",
"dataset manager not available");
2058 if (srvex && strlen(srvex) > 0) {
2059 return fDataSetManager->GetSubDataSets(uri, srvex);
2061 UInt_t opt = (UInt_t)TDataSetManager::kExport;
2062 return fDataSetManager->GetDataSets(uri, opt);
2070 void TProofLite::ShowDataSets(
const char *uri,
const char *opt)
2072 if (!fDataSetManager) {
2073 Info(
"GetDataSet",
"dataset manager not available");
2077 fDataSetManager->ShowDataSets(uri, opt);
2084 TFileCollection *TProofLite::GetDataSet(
const char *uri,
const char *)
2086 if (!fDataSetManager) {
2087 Info(
"GetDataSet",
"dataset manager not available");
2088 return (TFileCollection *)0;
2091 if (!uri || strlen(uri) <= 0) {
2092 Info(
"GetDataSet",
"specifying a dataset name is mandatory");
2097 return fDataSetManager->GetDataSet(uri);
2104 Int_t TProofLite::RemoveDataSet(
const char *uri,
const char *)
2106 if (!fDataSetManager) {
2107 Info(
"RemoveDataSet",
"dataset manager not available");
2111 if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
2112 if (!fDataSetManager->RemoveDataSet(uri)) {
2117 Info(
"RemoveDataSet",
"dataset creation / removal not allowed");
2131 Bool_t TProofLite::RequestStagingDataSet(
const char *dataset)
2134 Error(
"RequestStagingDataSet",
"invalid dataset specified");
2138 if (!fDataSetStgRepo) {
2139 Error(
"RequestStagingDataSet",
"no dataset staging request repository available");
2143 TString dsUser, dsGroup, dsName, dsTree;
2146 TString validUri = dataset;
2147 while (fReInvalid->Substitute(validUri,
"_")) {}
2150 if (fDataSetStgRepo->ExistsDataSet(validUri.Data())) {
2151 Warning(
"RequestStagingDataSet",
"staging of %s already requested", dataset);
2156 TFileCollection *fc = fDataSetManager->GetDataSet(dataset);
2157 if (!fc || (fc->GetNFiles() == 0)) {
2158 Error(
"RequestStagingDataSet",
"empty dataset or no dataset returned");
2164 TIter it(fc->GetList());
2166 while ((fi = dynamic_cast<TFileInfo *>(it.Next()))) {
2167 fi->ResetBit(TFileInfo::kStaged);
2168 Int_t nToErase = fi->GetNUrls() - 1;
2169 for (Int_t i=0; i<nToErase; i++)
2176 fDataSetStgRepo->ParseUri(validUri, &dsGroup, &dsUser, &dsName);
2177 if (fDataSetStgRepo->WriteDataSet(dsGroup, dsUser, dsName, fc) == 0) {
2179 Error(
"RequestStagingDataSet",
"can't register staging request for %s", dataset);
2184 Info(
"RequestStagingDataSet",
"Staging request registered for %s", dataset);
2195 Bool_t TProofLite::CancelStagingDataSet(
const char *dataset)
2198 Error(
"CancelStagingDataSet",
"invalid dataset specified");
2202 if (!fDataSetStgRepo) {
2203 Error(
"CancelStagingDataSet",
"no dataset staging request repository available");
2208 TString validUri = dataset;
2209 while (fReInvalid->Substitute(validUri,
"_")) {}
2211 if (!fDataSetStgRepo->RemoveDataSet(validUri.Data()))
2223 TFileCollection *TProofLite::GetStagingStatusDataSet(
const char *dataset)
2226 Error(
"GetStagingStatusDataSet",
"invalid dataset specified");
2230 if (!fDataSetStgRepo) {
2231 Error(
"GetStagingStatusDataSet",
"no dataset staging request repository available");
2236 TString validUri = dataset;
2237 while (fReInvalid->Substitute(validUri,
"_")) {}
2240 TFileCollection *fc = fDataSetStgRepo->GetDataSet(validUri.Data());
2243 Info(
"GetStagingStatusDataSet",
"no pending staging request for %s", dataset);
2255 Int_t TProofLite::VerifyDataSet(
const char *uri,
const char *optStr)
2257 if (!fDataSetManager) {
2258 Info(
"VerifyDataSet",
"dataset manager not available");
2263 TString sopt(optStr);
2264 if (sopt.Contains(
"S")) {
2266 if (fDataSetManager->TestBit(TDataSetManager::kAllowVerify)) {
2267 rc = fDataSetManager->ScanDataSet(uri);
2269 Info(
"VerifyDataSet",
"dataset verification not allowed");
2276 return VerifyDataSetParallel(uri, optStr);
2282 void TProofLite::ClearDataSetCache(
const char *dataset)
2284 if (fDataSetManager) fDataSetManager->ClearCache(dataset);
2292 void TProofLite::ShowDataSetCache(
const char *dataset)
2295 if (fDataSetManager) fDataSetManager->ShowCache(dataset);
2310 void TProofLite::SendInputDataFile()
2314 PrepareInputDataFile(dataFile);
2317 if (dataFile.Length() > 0) {
2319 if (!dataFile.BeginsWith(fCacheDir)) {
2322 dst.Form(
"%s/%s", fCacheDir.Data(), gSystem->BaseName(dataFile));
2324 if (!gSystem->AccessPathName(dst))
2325 gSystem->Unlink(dst);
2327 if (gSystem->CopyFile(dataFile, dst) != 0)
2328 Warning(
"SendInputDataFile",
"problems copying '%s' to '%s'",
2329 dataFile.Data(), dst.Data());
2333 AddInput(
new TNamed(
"PROOF_InputDataFile", Form(
"%s", gSystem->BaseName(dataFile))));
2340 Int_t TProofLite::Remove(
const char *ref, Bool_t all)
2343 Info("Remove", "Enter: %s, %d", ref, all);
2348 fPlayer->RemoveQueryResult(ref);
2351 TString queryref(ref);
2353 if (queryref ==
"cleanupdir") {
2356 Int_t nd = (fQMgr) ? fQMgr->CleanupQueriesDir() : -1;
2359 Info(
"Remove",
"%d directories removed", nd);
2366 TProofLockPath *lck = 0;
2367 if (fQMgr->LockSession(queryref, &lck) == 0) {
2370 fQMgr->RemoveQuery(queryref, 0);
2374 gSystem->Unlink(lck->GetName());
2382 Warning(
"Remove",
"query result manager undefined!");
2387 "query %s could not be removed (unable to lock session)", queryref.Data());
2397 TTree *TProofLite::GetTreeHeader(TDSet *dset)
2401 Error(
"GetTreeHeader",
"undefined TDSet");
2406 TDSetElement *e = dset->Next();
2407 Long64_t entries = 0;
2410 PDB(kGlobal, 1) Info("GetTreeHeader", "empty TDSet");
2412 f = TFile::Open(e->GetFileName());
2415 t = (TTree*) f->Get(e->GetObjName());
2417 t->SetMaxVirtualSize(0);
2419 entries = t->GetEntries();
2422 while ((e = dset->Next()) != 0) {
2423 TFile *f1 = TFile::Open(e->GetFileName());
2425 TTree *t1 = (TTree*) f1->Get(e->GetObjName());
2427 entries += t1->GetEntries();
2433 t->SetMaxEntryLoop(entries);
2450 void TProofLite::FindUniqueSlaves()
2452 fUniqueSlaves->Clear();
2453 fUniqueMonitor->RemoveAll();
2454 fAllUniqueSlaves->Clear();
2455 fAllUniqueMonitor->RemoveAll();
2456 fNonUniqueMasters->Clear();
2458 if (fActiveSlaves->GetSize() <= 0)
return;
2460 TSlave *wrk =
dynamic_cast<TSlave*
>(fActiveSlaves->First());
2462 Error(
"FindUniqueSlaves",
"first object in fActiveSlaves not a TSlave: embarrasing!");
2465 fUniqueSlaves->Add(wrk);
2466 fAllUniqueSlaves->Add(wrk);
2467 fUniqueMonitor->Add(wrk->GetSocket());
2468 fAllUniqueMonitor->Add(wrk->GetSocket());
2471 fUniqueMonitor->DeActivateAll();
2472 fAllUniqueMonitor->DeActivateAll();
2479 void TProofLite::ShowData()
2481 if (!IsValid())
return;
2484 TList *wrki = GetListOfSlaveInfos();
2487 while ((wi = (TSlaveInfo *) nxwi())) {
2488 ShowDataDir(wi->GetDataDir());
2495 void TProofLite::ShowDataDir(
const char *dirname)
2497 if (!dirname)
return;
2500 if (gSystem->GetPathInfo(dirname, dirst) != 0)
return;
2501 if (!R_ISDIR(dirst.fMode))
return;
2503 void *dirp = gSystem->OpenDirectory(dirname);
2505 const char *ent = 0;
2506 while ((ent = gSystem->GetDirEntry(dirp))) {
2507 fn.Form(
"%s/%s", dirname, ent);
2509 if (gSystem->GetPathInfo(fn.Data(), st) == 0) {
2510 if (R_ISREG(st.fMode)) {
2511 Printf(
"lite:0| %s", fn.Data());
2512 }
else if (R_ISREG(st.fMode)) {
2513 ShowDataDir(fn.Data());
2528 Int_t TProofLite::PollForNewWorkers()
2531 if (fDynamicStartupNMax <= 0) {
2533 if (gSystem->GetSysInfo(&si) == 0 && si.fCpus > 2) {
2534 fDynamicStartupNMax = si.fCpus;
2536 fDynamicStartupNMax = 2;
2539 if (fNWorkers >= fDynamicStartupNMax) {
2541 Info(
"PollForNewWorkers",
"max reached: %d workers started", fNWorkers);
2542 fDynamicStartup = kFALSE;
2547 Int_t nAdd = (fDynamicStartupStep > 0) ? fDynamicStartupStep : 1;
2550 TMonitor *mon =
new TMonitor;
2551 mon->Add(fServSock);
2555 Int_t nWrksDone = 0, nWrksTot = -1;
2558 nWrksTot = fNWorkers + nAdd;
2561 Int_t ord = fNWorkers;
2562 for (; ord < nWrksTot; ord++) {
2565 fullord = Form(
"0.%d", ord);
2568 SetProofServEnv(fullord);
2571 if ((wrk = CreateSlave(
"lite", fullord, 100, fImage, fWorkDir)))
2575 Info("PollForNewWorkers", "additional worker '%s' started", fullord.Data());
2578 NotifyStartUp("Opening connections to workers", ++nWrksDone, nWrksTot);
2581 fNWorkers = nWrksTot;
2584 TList *addedWorkers = new TList();
2585 addedWorkers->SetOwner(kFALSE);
2589 nWrksTot = started.GetSize();
2591 Int_t to = gEnv->GetValue("ProofLite.StartupTimeOut", 5) * 1000;
2592 while (started.GetSize() > 0 && nSelects < nWrksTot) {
2595 TSocket *xs = mon->Select(to);
2599 if (xs == (TSocket *) -1)
continue;
2602 TSocket *s = fServSock->Accept();
2603 if (s && s->IsValid()) {
2606 if (s->Recv(msg) < 0) {
2607 Warning(
"PollForNewWorkers",
"problems receiving message from accepted socket!");
2612 if ((wrk = (TSlave *) started.FindObject(fullord))) {
2614 started.Remove(wrk);
2622 { R__LOCKGUARD(gROOTMutex);
2623 gROOT->GetListOfSockets()->Remove(s);
2625 if (wrk->IsValid()) {
2627 wrk->SetInputHandler(
new TProofInputHandler(
this, wrk->GetSocket()));
2632 wrk->SetupServ(TSlave::kSlave, 0);
2637 if (wrk->IsValid()) {
2638 fActiveSlaves->Add(wrk);
2639 fAllMonitor->Add(wrk->GetSocket());
2641 addedWorkers->Add(wrk);
2643 NotifyStartUp(
"Setting up added worker servers", ++nWrksDone, nWrksTot);
2646 fBadSlaves->Add(wrk);
2650 Warning(
"PollForNewWorkers",
"received empty message from accepted socket!");
2657 mon->DeActivateAll();
2660 Broadcast(kPROOF_GETSTATS, addedWorkers);
2661 Collect(addedWorkers, fCollectTimeout);
2668 SendCurrentState(addedWorkers);
2671 SetupWorkersEnv(addedWorkers, kTRUE);
2678 Info("PollForNewWorkers", "Will send the PROCESS message to selected workers");
2679 fPlayer->JoinProcess(addedWorkers);
2683 Collect(addedWorkers);
2686 TIter naw(addedWorkers);
2687 while ((wrk = (TSlave *)naw())) {
2688 fActiveMonitor->Add(wrk->GetSocket());
2691 delete addedWorkers;