40 TList TProofMgr::fgListOfManagers;
41 TProofMgr_t TProofMgr::fgTXProofMgrHook = 0;
62 TProofMgr::TProofMgr(
const char *url, Int_t,
const char *alias)
63 : TNamed(
"",
""), fRemoteProtocol(-1), fServType(kXProofd),
64 fSessions(0), fIntHandler(0)
69 fUrl = (!url || strlen(url) <= 0) ? TUrl(
"proof://localhost") : TUrl(url);
72 if (!strcmp(fUrl.GetProtocol(), TUrl(
"a").GetProtocol()))
73 fUrl.SetProtocol(
"proof");
76 if (fUrl.GetPort() == TUrl(
"a").GetPort()) {
80 Int_t port = gSystem->GetServiceByName(
"proofd");
83 Info(
"TProofMgr",
"service 'proofd' not found by GetServiceByName"
84 ": using default IANA assigned tcp port 1093");
88 Info(
"TProofMgr",
"port from GetServiceByName: %d", port);
94 if (strcmp(fUrl.GetHost(),
"__lite__")) {
95 if (strcmp(fUrl.GetHost(), fUrl.GetHostFQDN()))
96 fUrl.SetHost(fUrl.GetHostFQDN());
99 SetName(fUrl.GetUrl(kTRUE));
103 SetAlias(fUrl.GetHost());
109 TProofMgr::~TProofMgr()
111 SafeDelete(fSessions);
112 SafeDelete(fIntHandler);
114 fgListOfManagers.Remove(
this);
115 gROOT->GetListOfProofs()->Remove(
this);
123 TProof *TProofMgr::AttachSession(Int_t
id, Bool_t gui)
125 TProofDesc *d = GetProofDesc(
id);
127 return AttachSession(d, gui);
129 Info(
"AttachSession",
"invalid proofserv id (%d)",
id);
138 TProof *TProofMgr::AttachSession(TProofDesc *d, Bool_t)
141 Warning(
"AttachSession",
"invalid description object - do nothing");
147 return d->GetProof();
149 Warning(
"AttachSession",
"session not available - do nothing");
159 void TProofMgr::DetachSession(Int_t
id, Option_t *opt)
162 Warning(
"DetachSession",
"invalid TProofMgr - do nothing");
168 TProofDesc *d = GetProofDesc(
id);
171 d->GetProof()->Detach(opt);
172 TProof *p = d->GetProof();
173 fSessions->Remove(d);
178 }
else if (
id == 0) {
183 TIter nxd(fSessions);
185 while ((d = (TProofDesc *)nxd())) {
187 d->GetProof()->Detach(opt);
188 TProof *p = d->GetProof();
189 fSessions->Remove(d);
203 void TProofMgr::DetachSession(TProof *p, Option_t *opt)
206 Warning(
"DetachSession",
"invalid TProofMgr - do nothing");
212 TProofDesc *d = GetProofDesc(p);
216 d->GetProof()->Detach(opt);
217 fSessions->Remove(d);
228 TList *TProofMgr::QuerySessions(Option_t *opt)
230 if (opt && !strncasecmp(opt,
"L",1))
236 fSessions =
new TList();
237 fSessions->SetOwner();
241 if (gROOT->GetListOfProofs()) {
243 TIter nxp(gROOT->GetListOfProofs());
247 while ((o = nxp())) {
248 if (o->InheritsFrom(TProof::Class())) {
251 if (MatchUrl(p->GetUrl())) {
252 if (!(fSessions->FindObject(p->GetSessionTag()))) {
253 Int_t st = (p->IsIdle()) ? TProofDesc::kIdle
254 : TProofDesc::kRunning;
256 new TProofDesc(p->GetName(), p->GetTitle(), p->GetUrl(),
257 ++ns, p->GetSessionID(), st, p);
266 if (fSessions->GetSize() > 0) {
267 TIter nxd(fSessions);
269 while ((d = (TProofDesc *)nxd())) {
271 if (!(gROOT->GetListOfProofs()->FindObject(d->GetProof()))) {
272 fSessions->Remove(d);
275 if (opt && !strncasecmp(opt,
"S",1))
294 Int_t TProofMgr::SendMsgToUsers(
const char *,
const char *)
296 Warning(
"SendMsgToUsers",
"functionality not supported");
306 Int_t TProofMgr::Reset(Bool_t,
const char *)
308 Warning(
"Reset",
"functionality not supported");
316 void TProofMgr::ShowWorkers()
318 AbstractMethod(
"ShowWorkers");
324 TProofDesc *TProofMgr::GetProofDesc(Int_t
id)
331 TIter nxd(fSessions);
332 while ((d = (TProofDesc *)nxd())) {
345 TProofDesc *TProofMgr::GetProofDesc(TProof *p)
352 TIter nxd(fSessions);
353 while ((d = (TProofDesc *)nxd())) {
354 if (p == d->GetProof())
366 void TProofMgr::DiscardSession(TProof *p)
371 TIter nxd(fSessions);
372 while ((d = (TProofDesc *)nxd())) {
373 if (p == d->GetProof()) {
374 fSessions->Remove(d);
386 TProof *TProofMgr::CreateSession(
const char *cfg,
387 const char *cfgdir, Int_t loglevel)
391 fUrl.SetOptions(
"std");
394 TProof *p =
new TProof(fUrl.GetUrl(), cfg, cfgdir, loglevel, 0,
this);
396 if (p && p->IsValid()) {
402 if (fSessions->Last())
403 ns = ((TProofDesc *)(fSessions->Last()))->GetLocalId() + 1;
406 fSessions =
new TList;
410 Int_t st = (p->IsIdle()) ? TProofDesc::kIdle : TProofDesc::kRunning ;
412 new TProofDesc(p->GetName(), p->GetTitle(), p->GetUrl(),
413 ns, p->GetSessionID(), st, p);
418 if (gDebug > 0) Error(
"CreateSession",
"PROOF session creation failed");
430 Bool_t TProofMgr::MatchUrl(
const char *url)
435 if (!strcmp(u.GetProtocol(), TUrl(
"a").GetProtocol()))
436 u.SetProtocol(
"proof");
439 if (u.GetPort() == TUrl(
"a").GetPort()) {
440 Int_t port = gSystem->GetServiceByName(
"proofd");
447 if (!strcmp(u.GetHostFQDN(), fUrl.GetHostFQDN()))
448 if (u.GetPort() == fUrl.GetPort())
449 if (strlen(u.GetUser()) <= 0 || !strcmp(u.GetUser(),fUrl.GetUser()))
459 TList *TProofMgr::GetListOfManagers()
462 if (gROOT->GetListOfProofs()) {
463 TIter nxp(gROOT->GetListOfProofs());
465 while ((o = nxp())) {
466 if (o->InheritsFrom(TProofMgr::Class()) && !fgListOfManagers.FindObject(o))
467 fgListOfManagers.Add(o);
472 if (fgListOfManagers.GetSize() > 0) {
473 TIter nxp(&fgListOfManagers);
476 while ((o = nxp())) {
477 if (!(gROOT->GetListOfProofs()->FindObject(o))) {
478 fgListOfManagers.Remove(o);
480 TProofMgr *p = (TProofMgr *)o;
482 Printf(
"// #%d: \"%s\" (%s)", ++nm, p->GetName(), p->GetTitle());
487 Printf(
"No managers found");
491 return &fgListOfManagers;
498 TProofMgr *TProofMgr::Create(
const char *uin, Int_t loglevel,
499 const char *alias, Bool_t xpd)
503 Bool_t isLite = kFALSE;
507 TString proto = u.GetProtocol();
508 if (proto.IsNull()) {
509 u.SetUrl(gEnv->GetValue(
"Proof.LocalDefault",
"lite://"));
510 proto = u.GetProtocol();
512 TString host = u.GetHost();
513 if (proto ==
"lite" || host ==
"__lite__" ) {
516 u.SetHost(
"__lite__");
517 u.SetProtocol(
"proof");
520 ::Info(
"TProofMgr::Create",
"'lite' not yet supported on Windows");
528 if (!strcmp(u.GetProtocol(), TUrl(
"a").GetProtocol()))
529 u.SetProtocol(
"proof");
530 if (u.GetPort() == TUrl(
"a").GetPort())
535 const char *url = u.GetUrl();
538 TList *lm = TProofMgr::GetListOfManagers();
541 while ((m = (TProofMgr *)nxm())) {
543 if (m->MatchUrl(url))
return m;
545 fgListOfManagers.Remove(m);
554 return new TProofMgrLite(url, loglevel, alias);
558 Bool_t trystd = kTRUE;
562 TProofMgr_t cm = TProofMgr::GetXProofMgrHook();
564 m = (TProofMgr *) (*cm)(url, loglevel, alias);
566 trystd = (m && !(m->IsValid()) && m->IsProofd()) ? kTRUE : kFALSE;
573 m =
new TProofMgr(url, loglevel, alias);
578 fgListOfManagers.Add(m);
579 if (m->IsValid() && !(m->IsProofd())) {
580 R__LOCKGUARD(gROOTMutex);
581 gROOT->GetListOfProofs()->Add(m);
582 gROOT->GetListOfSockets()->Add(m);
595 TProofMgr_t TProofMgr::GetXProofMgrHook()
597 if (!fgTXProofMgrHook) {
599 TString prooflib =
"libProofx";
601 if ((p = gSystem->DynamicPathName(prooflib, kTRUE))) {
603 if (gSystem->Load(prooflib) == -1)
604 ::Error(
"TProofMgr::GetXProofMgrCtor",
605 "can't load %s", prooflib.Data());
607 ::Error(
"TProofMgr::GetXProofMgrCtor",
608 "can't locate %s", prooflib.Data());
612 return fgTXProofMgrHook;
618 void TProofMgr::SetTXProofMgrHook(TProofMgr_t pmh)
620 fgTXProofMgrHook = pmh;
630 Int_t TProofMgr::Ping(
const char *url, Bool_t checkxrd)
632 if (!url || (url && strlen(url) <= 0)) {
633 ::Error(
"TProofMgr::Ping",
"empty url - fail");
639 if (!strcmp(u.GetProtocol(),
"http") && u.GetPort() == 80) {
648 Int_t oldLevel = gErrorIgnoreLevel;
649 gErrorIgnoreLevel = kSysError+1;
650 TSocket s(u.GetHost(), u.GetPort());
651 if (!(s.IsValid())) {
653 ::Info(
"TProofMgr::Ping",
"could not open connection to %s:%d", u.GetHost(), u.GetPort());
654 gErrorIgnoreLevel = oldLevel;
660 memset(&initHS, 0,
sizeof(initHS));
661 int len =
sizeof(initHS);
663 initHS.fourth = (int)host2net((
int)4);
664 initHS.fifth = (int)host2net((
int)2012);
665 if ((writeCount = s.SendRaw(&initHS, len)) != len) {
667 ::Info(
"TProofMgr::Ping",
"1st: wrong number of bytes sent: %d (expected: %d)",
669 gErrorIgnoreLevel = oldLevel;
673 initHS.third = (int)host2net((
int)1);
674 if ((writeCount = s.SendRaw(&initHS, len)) != len) {
676 ::Info(
"TProofMgr::Ping",
"1st: wrong number of bytes sent: %d (expected: %d)",
678 gErrorIgnoreLevel = oldLevel;
683 dum[0] = (int)host2net((
int)4);
684 dum[1] = (int)host2net((
int)2012);
685 if ((writeCount = s.SendRaw(&dum[0],
sizeof(dum))) !=
sizeof(dum)) {
687 ::Info(
"TProofMgr::Ping",
"2nd: wrong number of bytes sent: %d (expected: %d)",
688 writeCount, (
int)
sizeof(dum));
689 gErrorIgnoreLevel = oldLevel;
696 int readCount = s.RecvRaw(&type, len);
697 if (readCount != len) {
699 ::Info(
"TProofMgr::Ping",
"1st: wrong number of bytes read: %d (expected: %d)",
701 gErrorIgnoreLevel = oldLevel;
705 type = net2host(type);
710 readCount = s.RecvRaw(&xbody, len);
711 if (readCount != len) {
713 ::Info(
"TProofMgr::Ping",
"2nd: wrong number of bytes read: %d (expected: %d)",
715 gErrorIgnoreLevel = oldLevel;
718 xbody.protover = net2host(xbody.protover);
719 xbody.msgval = net2host(xbody.msglen);
720 xbody.msglen = net2host(xbody.msgval);
722 }
else if (type == 8) {
724 if (gDebug > 0) ::Info(
"TProofMgr::Ping",
"server is old %s", (checkxrd ?
"ROOTD" :
"PROOFD"));
725 gErrorIgnoreLevel = oldLevel;
729 if (gDebug > 0) ::Info(
"TProofMgr::Ping",
"unknown server type: %d", type);
730 gErrorIgnoreLevel = oldLevel;
735 gErrorIgnoreLevel = oldLevel;
744 void TProofMgr::ReplaceSubdirs(
const char *fn, TString &fdst, TList &dirph)
746 if (!fn || (fn && strlen(fn) <= 0))
return;
747 if (dirph.GetSize() <= 0)
return;
753 while (dd.Tokenize(d, from,
"/")) {
754 if (!d.IsNull()) dirs.Add(
new TObjString(d));
756 if (dirs.GetSize() <= 0)
return;
757 dirs.SetOwner(kTRUE);
760 TParameter<Int_t> *pi = 0;
761 while ((pi = (TParameter<Int_t> *) nxph())) {
762 if (pi->GetVal() < dirs.GetSize()) {
763 TObjString *os = (TObjString *) dirs.At(pi->GetVal());
764 if (os) fdst.ReplaceAll(pi->GetName(), os->GetName());
766 ::Warning(
"TProofMgr::ReplaceSubdirs",
767 "requested directory level '%s' is not available in the file path",
801 TFileCollection *TProofMgr::UploadFiles(TList *src,
802 const char *mss,
const char *dest)
804 TFileCollection *ds = 0;
807 if (!src || (src && src->GetSize() <= 0)) {
808 ::Warning(
"TProofMgr::UploadFiles",
"list is empty!");
811 if (!mss || (mss && strlen(mss) <= 0)) {
812 ::Warning(
"TProofMgr::UploadFiles",
"MSS is undefined!");
819 if (dest && strlen(dest) > 0) {
820 TString dst(dest), dt;
822 TRegexp re(
"<d+[0-9]>");
823 while (dst.Tokenize(dt, from,
"/")) {
824 if (dt.Contains(re)) {
825 TParameter<Int_t> *pi =
new TParameter<Int_t>(dt, -1);
826 dt.ReplaceAll(
"<d",
"");
827 dt.ReplaceAll(
">",
"");
829 pi->SetVal(dt.Atoi());
836 dirph.SetOwner(kTRUE);
839 TString sForm = TString::Format(
"%%0%dd",
840 Int_t(TMath::Log10(src->GetEntries()+1)));
843 ds =
new TFileCollection();
849 while ((o = nxf())) {
851 if (!strcmp(o->ClassName(),
"TFileInfo")) {
852 if (!(fi = dynamic_cast<TFileInfo *>(o))) {
853 ::Warning(
"TProofMgr::UploadFiles",
854 "object of class name '%s' does not cast to %s - ignore",
855 o->ClassName(), o->ClassName());
858 furl = fi->GetFirstUrl();
859 }
else if (!strcmp(o->ClassName(),
"TObjString")) {
860 if (!(os = dynamic_cast<TObjString *>(o))) {
861 ::Warning(
"TProofMgr::UploadFiles",
862 "object of class name '%s' does not cast to %s - ignore",
863 o->ClassName(), o->ClassName());
866 furl =
new TUrl(os->GetName());
868 ::Warning(
"TProofMgr::UploadFiles",
869 "object of unsupported class '%s' found in list - ignore", o->ClassName());
874 if (gSystem->AccessPathName(furl->GetUrl()) == kFALSE) {
878 if (dest && strlen(dest) > 0) {
881 fdst += TString::Format(
"/%s", furl->GetFile());
885 if (fdst.Contains(
"<bn>")) fdst.ReplaceAll(
"<bn>", gSystem->BaseName(furl->GetFile()));
886 if (fdst.Contains(
"<fn>")) fdst.ReplaceAll(
"<fn>", furl->GetFile());
887 if (fdst.Contains(
"<bs>")) {
889 TString bs(gSystem->BaseName(furl->GetFile()));
890 Int_t idx = bs.Last(
'.');
891 if (idx != kNPOS) bs.Remove(idx);
892 fdst.ReplaceAll(
"<bs>", bs.Data());
894 if (fdst.Contains(
"<ex>")) {
896 TString ex(furl->GetFile());
897 Int_t idx = ex.Last(
'.');
898 if (idx != kNPOS) ex.Remove(0, idx+1);
900 fdst.ReplaceAll(
"<ex>", ex);
902 if (fdst.Contains(
"<pa>")) {
903 fdst.ReplaceAll(
"<pa>",
904 gSystem->BaseName(gSystem
905 ->DirName(furl->GetFile())));
908 if (fdst.Contains(
"<gp>")) {
909 fdst.ReplaceAll(
"<gp>",
910 gSystem->BaseName(gSystem
912 ->DirName(furl->GetFile()))));
918 if (fdst.Contains(
"<sn>")) {
919 TString skn = TString::Format(
"%d", kn);
920 fdst.ReplaceAll(
"<sn>", skn);
922 if (fdst.Contains(
"<s0>")) {
923 TString skn = TString::Format(sForm.Data(), kn);
924 fdst.ReplaceAll(
"<s0>", skn);
929 UserGroup_t *pw = gSystem->GetUserInfo();
931 if (fdst.Contains(
"<us>")) fdst.ReplaceAll(
"<us>", pw->fUser);
932 if (fdst.Contains(
"<gr>")) fdst.ReplaceAll(
"<gr>", pw->fGroup);
935 if (gProof && fdst.Contains(
"<pg>"))
936 fdst.ReplaceAll(
"<pg>", gProof->GetGroup());
939 if (dirph.GetSize() > 0)
940 TProofMgr::ReplaceSubdirs(gSystem->DirName(furl->GetFile()), fdst, dirph);
947 ::Info(
"TProofMgr::UploadFiles",
"uploading '%s' to '%s'", furl->GetUrl(), fdst.Data());
948 if (TFile::Cp(furl->GetUrl(), fdst.Data())) {
950 ds->Add(
new TFileInfo(fdst.Data()));
952 ::Error(
"TProofMgr::UploadFiles",
"file %s was not copied", furl->GetUrl());
986 TFileCollection *TProofMgr::UploadFiles(
const char *srcfiles,
987 const char *mss,
const char *dest)
989 TFileCollection *ds = 0;
992 if (!srcfiles || (srcfiles && strlen(srcfiles) <= 0)) {
993 ::Error(
"TProofMgr::UploadFiles",
"input text file or directory undefined!");
996 if (!mss || (mss && strlen(mss) <= 0)) {
997 ::Error(
"TProofMgr::UploadFiles",
"MSS is undefined!");
1001 TString inpath(gSystem->ExpandPathName(srcfiles));
1004 if (gSystem->GetPathInfo(inpath.Data(), fst)) {
1005 ::Error(
"TProofMgr::UploadFiles",
1006 "could not get information about the input path '%s':"
1007 " make sure that it exists and is readable", srcfiles);
1016 if (R_ISREG(fst.fMode)) {
1019 f.open(inpath.Data(), std::ifstream::out);
1022 line.ReadToDelim(f);
1023 line.Strip(TString::kTrailing,
'\n');
1025 if (line.BeginsWith(
"#"))
continue;
1026 if (gSystem->AccessPathName(line, kReadPermission) == kFALSE)
1027 files.Add(
new TFileInfo(line));
1031 ::Error(
"TProofMgr::UploadFiles",
"unable to open file '%s'", srcfiles);
1033 }
else if (R_ISDIR(fst.fMode)) {
1035 void *dirp = gSystem->OpenDirectory(inpath.Data());
1037 const char *ent = 0;
1038 while ((ent = gSystem->GetDirEntry(dirp))) {
1039 if (!strcmp(ent,
".") || !strcmp(ent,
".."))
continue;
1040 line.Form(
"%s/%s", inpath.Data(), ent);
1041 if (gSystem->AccessPathName(line, kReadPermission) == kFALSE)
1042 files.Add(
new TFileInfo(line));
1044 gSystem->FreeDirectory(dirp);
1046 ::Error(
"TProofMgr::UploadFiles",
"unable to open directory '%s'", inpath.Data());
1049 ::Error(
"TProofMgr::UploadFiles",
1050 "input path '%s' is neither a regular file nor a directory!", inpath.Data());
1053 if (files.GetSize() <= 0) {
1054 ::Warning(
"TProofMgr::UploadFiles",
"no files found in file or directory '%s'", inpath.Data());
1056 ds = TProofMgr::UploadFiles(&files, mss, dest);
1065 Int_t TProofMgr::Rm(
const char *what,
const char *,
const char *)
1070 Error(
"Rm",
"invalid TProofMgr - do nothing");
1074 if (!what || (what && strlen(what) <= 0)) {
1075 Error(
"Rm",
"path undefined!");
1080 if (!strcmp(u.GetProtocol(),
"file")) {
1081 rc = gSystem->Unlink(u.GetFile());
1083 rc = gSystem->Unlink(what);
1086 return (rc == 0) ? 0 : -1;
1093 ClassImp(TProofDesc);
1098 void TProofDesc::Print(Option_t *)
const
1100 const char *st[] = {
"unknown",
"idle",
"processing",
"shutting down"};
1102 Printf(
"// # %d", fLocalId);
1103 Printf(
"// alias: %s, url: \"%s\"", GetTitle(), GetUrl());
1104 Printf(
"// tag: %s", GetName());
1105 Printf(
"// status: %s, attached: %s (remote ID: %d)",st[fStatus+1], (fProof ?
"YES" :
"NO"), fRemoteId);