/// Signal handler bound to kSigInterrupt that carries a pointer to the
/// TProofMgr it serves. The copy constructor and the copy assignment
/// operator are only declared here (presumably to forbid copying - confirm
/// that they sit in a private section).
54 class TProofMgrInterruptHandler :
public TSignalHandler {
58 TProofMgrInterruptHandler(
const TProofMgrInterruptHandler&);
59 TProofMgrInterruptHandler& operator=(
const TProofMgrInterruptHandler&);
// Registers with TSignalHandler for kSigInterrupt (second argument kFALSE)
// and stores the manager pointer in fMgr.
61 TProofMgrInterruptHandler(TProofMgr *mgr)
62 : TSignalHandler(kSigInterrupt, kFALSE), fMgr(mgr) { }
/// Notification callback of the interrupt handler.
/// Acts only when both file descriptor 0 and file descriptor 1 are
/// terminals: it prints the manager URL and opens a fresh TXSocket to that
/// URL, passing the manager (cast to TXHandler) as the socket handler. The
/// new socket is then checked for validity.
69 Bool_t TProofMgrInterruptHandler::Notify()
72 if (isatty(0) != 0 && isatty(1) != 0) {
73 TString u = fMgr->GetUrl();
74 Printf(
"Opening new connection to %s", u.Data());
75 TXSocket *s =
new TXSocket(u,
'C', kPROOF_Protocol,
76 kXPROOF_Protocol, 0, -1, (TXHandler *)fMgr);
77 if (s && s->IsValid()) {
/// Factory function: heap-allocates a TXProofMgr from (url, l, al) and
/// returns it as a TProofMgr pointer. This is the function handed to
/// TProofMgr::SetTXProofMgrHook elsewhere in this file.
/// NOTE(review): the caller presumably takes ownership of the returned
/// object - confirm.
88 TProofMgr *GetTXProofMgr(
const char *url, Int_t l,
const char *al)
89 {
return ((TProofMgr *)
new TXProofMgr(url, l, al)); }
/// Registration helper: installs GetTXProofMgr as the TProofMgr hook via
/// TProofMgr::SetTXProofMgrHook (presumably from its constructor - confirm).
/// The file-static instance gxproofmgr_init below exists so that the
/// registration happens during static initialization.
91 class TXProofMgrInit {
94 TProofMgr::SetTXProofMgrHook(&GetTXProofMgr);
96 static TXProofMgrInit gxproofmgr_init;
/// Constructor.
/// Forwards url, dbg and alias to the TProofMgr base, marks the server type
/// as kXProofd and calls Init(dbg); a non-zero return value from Init()
/// takes the failure branch.
101 TXProofMgr::TXProofMgr(
const char *url, Int_t dbg,
const char *alias)
102 : TProofMgr(url, dbg, alias)
105 fServType = kXProofd;
108 if (Init(dbg) != 0) {
/// Opens the connection to the coordinator.
/// Builds the URL string from fUrl and creates a TXSocket with this manager
/// as its handler. If the socket could not be created or is not valid, an
/// error with the socket's open error code (or -1 without a socket) is
/// reported unless the server identified itself as proofd; in that case
/// fServType is switched to TProofMgr::kProofd. Further down the remote
/// protocol is cached, the socket is removed from gROOT's list of sockets
/// under gROOTMutex and a TProofMgrInterruptHandler is created for this
/// manager. The Int_t parameter is unnamed and therefore unused.
122 Int_t TXProofMgr::Init(Int_t)
126 TString u = fUrl.GetUrl(kTRUE);
129 if (!(fSocket =
new TXSocket(u,
'C', kPROOF_Protocol,
130 kXPROOF_Protocol, 0, -1,
this)) ||
131 !(fSocket->IsValid())) {
132 if (!fSocket || !(fSocket->IsServProofd()))
134 Error(
"Init",
"while opening the connection to %s - exit (error: %d)",
135 u.Data(), (fSocket ? fSocket->GetOpenError() : -1));
136 if (fSocket && fSocket->IsServProofd())
137 fServType = TProofMgr::kProofd;
142 fRemoteProtocol = fSocket->GetRemoteProtocol();
145 { R__LOCKGUARD(gROOTMutex);
146 gROOT->GetListOfSockets()->Remove(fSocket);
150 fIntHandler =
new TProofMgrInterruptHandler(
this);
/// Destructor of TXProofMgr.
159 TXProofMgr::~TXProofMgr()
/// Invalidates this manager.
/// While holding gROOTMutex, removes this object from gROOT's list of
/// sockets.
167 void TXProofMgr::SetInvalid()
174 { R__LOCKGUARD(gROOTMutex);
175 gROOT->GetListOfSockets()->Remove(
this);
/// Attaches to the PROOF session described by d.
/// Emits a warning and does nothing when the manager or the description
/// object is invalid. One branch returns the TProof already stored in the
/// description. Otherwise the session URL is built as "<url>/?<remote id>"
/// and a new TProof is created with this manager; when that TProof is valid
/// its status is derived from IsIdle() (kIdle or kRunning) and it is named
/// after the description, otherwise an error is reported.
184 TProof *TXProofMgr::AttachSession(TProofDesc *d, Bool_t gui)
187 Warning(
"AttachSession",
"invalid TXProofMgr - do nothing");
191 Warning(
"AttachSession",
"invalid description object - do nothing");
197 return d->GetProof();
200 TString u(Form(
"%s/?%d", fUrl.GetUrl(kTRUE), d->GetRemoteId()));
208 TProof *p =
new TProof(u, 0, 0, gDebug, 0,
this);
209 if (p && p->IsValid()) {
215 Int_t st = (p->IsIdle()) ? TProofDesc::kIdle
216 : TProofDesc::kRunning;
221 p->SetName(d->GetName());
225 Error(
"AttachSession",
"attaching to PROOF session");
/// Detaches the session with local identifier id.
/// Warns and does nothing when the manager is invalid. For a specific
/// session the description is looked up with GetProofDesc(id), the remote
/// session is disconnected through the socket and the description is removed
/// from fSessions. When id == 0 the option string gets an 'A' appended and
/// DisconnectSession(-1, ...) is sent, after which all entries of fSessions
/// are visited.
236 void TXProofMgr::DetachSession(Int_t
id, Option_t *opt)
239 Warning(
"DetachSession",
"invalid TXProofMgr - do nothing");
245 TProofDesc *d = GetProofDesc(
id);
248 fSocket->DisconnectSession(d->GetRemoteId(), opt);
249 TProof *p = d->GetProof();
250 fSessions->Remove(d);
254 }
else if (
id == 0) {
// NOTE(review): opt is formatted with %s below; confirm callers never pass
// a null opt.
258 TString o = Form(
"%sA",opt);
259 fSocket->DisconnectSession(-1, o);
263 TIter nxd(fSessions);
265 while ((d = (TProofDesc *)nxd())) {
266 TProof *p = d->GetProof();
/// Detaches the session served by the TProof instance p.
/// Warns and does nothing when the manager is invalid. The description is
/// looked up with GetProofDesc(p); the remote session is disconnected
/// through the socket with the given option and the description is removed
/// from fSessions.
280 void TXProofMgr::DetachSession(TProof *p, Option_t *opt)
283 Warning(
"DetachSession",
"invalid TXProofMgr - do nothing");
289 TProofDesc *d = GetProofDesc(p);
292 fSocket->DisconnectSession(d->GetRemoteId(), opt);
293 fSessions->Remove(d);
/// Checks whether url refers to the server this manager is connected to.
/// Warns when the manager is invalid. The candidate URL u (presumably built
/// from url - confirm) is normalized first: a protocol equal to the one a
/// TUrl built from "a" reports is replaced by "proof", and a port equal to
/// that TUrl's port triggers a lookup of the "proofd" service port. The
/// match then requires the same host (FQDN of u against fUrl's host), a port
/// equal to either fUrl's port or the socket's port, and a user that is
/// either empty or equal to fUrl's user.
308 Bool_t TXProofMgr::MatchUrl(
const char *url)
311 Warning(
"MatchUrl",
"invalid TXProofMgr - do nothing");
318 if (!strcmp(u.GetProtocol(), TUrl(
"a").GetProtocol()))
319 u.SetProtocol(
"proof");
321 if (u.GetPort() == TUrl(
"a").GetPort()) {
323 Int_t port = gSystem->GetServiceByName(
"proofd");
330 if (!strcmp(u.GetHostFQDN(), fUrl.GetHost()))
331 if (u.GetPort() == fUrl.GetPort() ||
332 u.GetPort() == fSocket->GetPort())
333 if (strlen(u.GetUser()) <= 0 || !strcmp(u.GetUser(),fUrl.GetUser()))
/// Prints the workers known to the coordinator.
/// Warns and does nothing when the manager is invalid. Sends kQueryWorkers,
/// splits the reply on '&' and prints every token prefixed with "+ ".
343 void TXProofMgr::ShowWorkers()
346 Warning(
"ShowWorkers",
"invalid TXProofMgr - do nothing");
351 TObjString *os = fSocket->SendCoordinator(kQueryWorkers);
353 TObjArray *oa = TString(os->GetName()).Tokenize(TString(
"&"));
357 while ((to = (TObjString *) nxos()))
359 Printf(
"+ %s", to->GetName());
/// Returns the mass-storage URL cached in fMssUrl.
/// The value is (re)fetched from the coordinator with kQueryMssUrl when the
/// cache is empty or retrieve is true; this requires a valid manager and an
/// XrdProofd version of at least 1007, otherwise an error is reported. When
/// the cached value is used while the manager is invalid, a warning notes
/// that the information may be stale.
/// The returned pointer refers to fMssUrl's internal buffer.
369 const char *TXProofMgr::GetMssUrl(Bool_t retrieve)
371 if (fMssUrl.IsNull() || retrieve) {
374 Error(
"GetMssUrl",
"invalid TXProofMgr - do nothing");
378 if (fSocket->GetXrdProofdVersion() < 1007) {
379 Error(
"GetMssUrl",
"functionality not supported by server");
382 TObjString *os = fSocket->SendCoordinator(kQueryMssUrl);
// NOTE(review): the following Printf looks like a leftover debug print -
// confirm it is intended.
384 Printf(
"os: '%s'", os->GetName());
385 fMssUrl = os->GetName();
388 Error(
"GetMssUrl",
"problems retrieving the required information");
391 }
else if (!IsValid()) {
392 Warning(
"GetMssUrl",
"TXProofMgr is now invalid: information may not be valid");
397 return fMssUrl.Data();
/// Queries the coordinator for the list of sessions and refreshes fSessions.
/// An option starting with 'L' (case-insensitive) is handled separately at
/// the top; an option starting with 'S' enables printing. Warns when the
/// manager is invalid. fSessions is created as an owning TList when needed.
/// The reply to kQuerySessions is split on '|': the first token, when
/// numeric, is the number of active sessions; each following token is parsed
/// as "<number> <tag> <alias> <number>" (space separated) and records that
/// fail to parse are skipped. Tags not yet in fSessions get a new TProofDesc
/// with the next local id; every received tag is also collected in a
/// temporary list (ocl) which is used afterwards to drop entries of
/// fSessions that were not reported.
403 TList *TXProofMgr::QuerySessions(Option_t *opt)
405 if (opt && !strncasecmp(opt,
"L",1))
411 Warning(
"QuerySessions",
"invalid TXProofMgr - do nothing");
417 fSessions =
new TList();
418 fSessions->SetOwner();
422 TList *ocl =
new TList;
423 TObjString *os = fSocket->SendCoordinator(kQuerySessions);
425 TObjArray *oa = TString(os->GetName()).Tokenize(TString(
"|"));
429 TObjString *to = (TObjString *) nxos();
// NOTE(review): opt is passed to strncasecmp here without the null guard
// used by the other opt checks in this function - confirm it cannot be null.
430 if (to && to->GetString().IsDigit() && !strncasecmp(opt,
"S",1))
431 Printf(
"// +++ %s session(s) currently active +++", to->GetName());
432 while ((to = (TObjString *) nxos())) {
434 Int_t
id = -1, st = -1;
// Skip leading blanks before tokenizing the record.
437 while (to->GetString()[from] ==
' ') { from++; }
438 if (!to->GetString().Tokenize(tk, from,
" ") || !tk.IsDigit())
continue;
440 if (!to->GetString().Tokenize(tg, from,
" "))
continue;
441 if (!to->GetString().Tokenize(al, from,
" "))
continue;
442 if (!to->GetString().Tokenize(tk, from,
" ") || !tk.IsDigit())
continue;
445 if (!(d = (TProofDesc *) fSessions->FindObject(tg))) {
446 Int_t locid = fSessions->GetSize() + 1;
447 d =
new TProofDesc(tg, al, GetUrl(), locid,
id, st, 0);
456 ocl->Add(
new TObjString(tg));
464 if (fSessions->GetSize() > 0) {
465 TIter nxd(fSessions);
467 while ((d = (TProofDesc *)nxd())) {
468 if (ocl->FindObject(d->GetName())) {
469 if (opt && !strncasecmp(opt,
"S",1))
472 fSessions->Remove(d);
/// Handles asynchronous input arriving on the manager socket.
/// With a valid socket, one message is received and its type is logged; one
/// branch answers by calling RemoteTouch() on the socket, while unknown
/// types only produce a warning. A warning is also issued when input
/// arrives while the socket is invalid. The argument is unused.
485 Bool_t TXProofMgr::HandleInput(
const void *)
487 if (fSocket && fSocket->IsValid()) {
489 if (fSocket->Recv(mess) >= 0) {
490 Int_t what = mess->What();
492 Info(
"HandleInput",
"%p: got message type: %d",
this, what);
495 fSocket->RemoteTouch();
498 Warning(
"HandleInput",
"%p: got unknown message type: %d",
this, what);
503 Warning(
"HandleInput",
"%p: got message but socket is invalid!",
this);
/// Handles an error notification for the manager connection.
/// in is interpreted as an XHandleErr_t; when its fOpt field equals 1 a
/// reconnection of the socket is attempted and success is announced on the
/// terminal. Further down, the current monitor of the TProof attached to
/// each entry of fSessions is interrupted, bracketed by progress printouts.
513 Bool_t TXProofMgr::HandleError(
const void *in)
515 XHandleErr_t *herr = in ? (XHandleErr_t *)in : 0;
518 if (fSocket && herr && (herr->fOpt == 1)) {
519 fSocket->Reconnect();
520 if (fSocket && fSocket->IsValid()) {
522 Printf(
"ProofMgr: connection to coordinator at %s re-established",
527 Printf(
"TXProofMgr::HandleError: %p: got called ...",
this);
530 if (fSessions && fSessions->GetSize() > 0) {
531 TIter nxd(fSessions);
533 while ((d = (TProofDesc *)nxd())) {
534 TProof *p = (TProof *) d->GetProof();
536 p->InterruptCurrentMonitor();
540 Printf(
"TXProofMgr::HandleError: %p: DONE ... ",
this);
/// Asks the coordinator to clean up sessions.
/// Warns and does nothing when the manager is invalid. Sends
/// kCleanupSessions with the user string usr and the hard flag converted to
/// 1 (hard) or 0.
557 Int_t TXProofMgr::Reset(Bool_t hard,
const char *usr)
561 Warning(
"Reset",
"invalid TXProofMgr - do nothing");
565 Int_t h = (hard) ? 1 : 0;
566 fSocket->SendCoordinator(kCleanupSessions, usr, h);
/// Builds a TProofLog describing the log files of a session.
/// Warns when the manager is invalid. isess is normalized to a non-positive
/// value; the session tag "NR" is treated as a special case. The
/// coordinator is queried with kQueryLogPaths (session tag, isess, and
/// rescan converted to 1/0). The reply is a '|'-separated string: first the
/// session tag, then the pool URL (a missing field produces a warning and a
/// null return), then one entry per log from which an ordinal and a URL are
/// extracted; URLs containing ".valgrind" get "-valgrind" appended to the
/// ordinal. When retrieval is enabled the logs are fetched with a grep
/// filter, which defaults to excluding "| SvcMsg" lines when pattern is
/// null; an empty pattern skips that retrieval call.
592 TProofLog *TXProofMgr::GetSessionLogs(Int_t isess,
const char *stag,
593 const char *pattern, Bool_t rescan)
597 Warning(
"GetSessionLogs",
"invalid TXProofMgr - do nothing");
604 isess = (isess > 0) ? -isess : isess;
608 TString sesstag(stag);
609 if (sesstag ==
"NR") {
615 Int_t xrs = (rescan) ? 1 : 0;
616 TObjString *os = fSocket->SendCoordinator(kQueryLogPaths, sesstag.Data(), isess, -1, xrs);
621 TString rs(os->GetName());
625 if (!rs.Tokenize(tag, from,
"|")) {
626 Warning(
"GetSessionLogs",
"Session tag undefined: corruption?\n"
627 " (received string: %s)", os->GetName());
628 return (TProofLog *)0;
632 if (!rs.Tokenize(purl, from,
"|")) {
633 Warning(
"GetSessionLogs",
"Pool URL undefined: corruption?\n"
634 " (received string: %s)", os->GetName());
635 return (TProofLog *)0;
639 pl =
new TProofLog(tag, GetUrl(),
this);
643 while (rs.Tokenize(to, from,
"|")) {
// NOTE(review): TString::Strip returns the stripped substring instead of
// modifying the string; the result looks unused here, so ord may keep its
// leading blanks - confirm.
646 ord.Strip(TString::kLeading,
' ');
648 if ((ii = ord.Index(
" ")) != kNPOS)
650 if ((ii = url.Index(
" ")) != kNPOS)
651 url.Remove(0, ii + 1);
653 if (url.Contains(
".valgrind")) ord +=
"-valgrind";
657 Info(
"GetSessionLogs",
"ord: %s, url: %s", ord.Data(), url.Data());
663 if (pl && retrieve) {
664 const char *pat = pattern ? pattern :
"-v \"| SvcMsg\"";
665 if (pat && strlen(pat) > 0)
666 pl->Retrieve(
"*", TProofLog::kGrep, 0, pat);
/// Reads len bytes at offset ofs from the remote file fin.
/// Warns and returns null when the manager is invalid; otherwise forwards
/// the request as kReadBuffer to the coordinator and returns its reply.
680 TObjString *TXProofMgr::ReadBuffer(
const char *fin, Long64_t ofs, Int_t len)
684 Warning(
"ReadBuffer",
"invalid TXProofMgr - do nothing");
685 return (TObjString *)0;
689 return fSocket->SendCoordinator(kReadBuffer, fin, len, ofs, 0);
/// Reads the lines of the remote file fin selected by pattern.
/// Warns and returns null when the manager is invalid. A pattern starting
/// with '|' is handled differently from a plain one. The request payload is
/// the file name immediately followed by the pattern text, sent as
/// kReadBuffer together with the pattern length and a type code.
697 TObjString *TXProofMgr::ReadBuffer(
const char *fin,
const char *pattern)
701 Warning(
"ReadBuffer",
"invalid TXProofMgr - do nothing");
702 return (TObjString *)0;
// NOTE(review): pattern is dereferenced here; confirm callers never pass a
// null pattern.
707 if (*pattern ==
'|') {
717 Int_t plen = strlen(ptr);
718 Int_t lfi = strlen(fin);
// NOTE(review): buf is allocated with new[] and handed to SendCoordinator
// in the return statement; no delete[] is visible - confirm who releases it.
719 char *buf =
new char[lfi + plen + 1];
720 memcpy(buf, fin, lfi);
721 memcpy(buf+lfi, ptr, plen);
725 return fSocket->SendCoordinator(kReadBuffer, buf, plen, 0, type);
/// Prints the ROOT versions available on the cluster.
/// Warns and does nothing when the manager is invalid. Sends
/// kQueryROOTVersions and prints the reply between two separator lines,
/// under a header naming the columns (tag, ROOT version, remote path, PROOF
/// version).
731 void TXProofMgr::ShowROOTVersions()
735 Warning(
"ShowROOTVersions",
"invalid TXProofMgr - do nothing");
740 TObjString *os = fSocket->SendCoordinator(kQueryROOTVersions);
743 Printf(
"----------------------------------------------------------\n");
744 Printf(
"Available versions (tag ROOT-vers remote-path PROOF-version):\n");
745 Printf(
"%s", os->GetName());
746 Printf(
"----------------------------------------------------------");
/// Selects the ROOT version identified by tag.
/// Warns when the manager is invalid. Sends kROOTVersion with the tag and
/// returns 0 when the socket's open error is still kXR_noErrorYet, -1
/// otherwise.
757 Int_t TXProofMgr::SetROOTVersion(
const char *tag)
761 Warning(
"SetROOTVersion",
"invalid TXProofMgr - do nothing");
766 fSocket->SendCoordinator(kROOTVersion, tag);
769 return (fSocket->GetOpenError() != kXR_noErrorYet) ? -1 : 0;
/// Sends a message to connected users.
/// msg is either the message text or the path of an existing file whose
/// content is sent; a null or empty msg is an error. The payload is
/// assembled in a local buffer of kMAXBUF (32768) bytes. When usr names a
/// specific user (non-empty and not exactly "*") the buffer starts with
/// "u:<usr> ". For a file, readability is checked, the size is obtained by
/// seeking to the end and rewinding, and the content is read with retries
/// on EINTR; content exceeding the available space is truncated with a
/// warning. A literal message longer than the available space is truncated
/// with the same warning. The buffer is finally sent as kSendMsgToUser.
780 Int_t TXProofMgr::SendMsgToUsers(
const char *msg,
const char *usr)
785 if (!msg || strlen(msg) <= 0) {
786 Error(
"SendMsgToUsers",
"no message to send - do nothing");
791 const Int_t kMAXBUF = 32768;
792 char buf[kMAXBUF] = {0};
// One byte is kept back from the usable space (presumably for the
// terminating null - confirm).
794 size_t space = kMAXBUF - 1;
798 if (usr && strlen(usr) > 0 && (strlen(usr) != 1 || usr[0] !=
'*')) {
799 lusr = (strlen(usr) + 3);
800 snprintf(buf, kMAXBUF,
"u:%s ", usr);
// AccessPathName() returning false is treated as "msg is an existing file".
807 if (!gSystem->AccessPathName(msg, kFileExists)) {
809 if (gSystem->AccessPathName(msg, kReadPermission)) {
810 Error(
"SendMsgToUsers",
"request to read message from unreadable file '%s'", msg);
815 if (!(f = fopen(msg,
"r"))) {
816 Error(
"SendMsgToUsers",
"file '%s' cannot be open", msg);
821 off_t rcsk = lseek(fileno(f), (off_t) 0, SEEK_END);
822 if ((rcsk != (off_t)(-1))) {
823 left = (size_t) rcsk;
824 if ((lseek(fileno(f), (off_t) 0, SEEK_SET) == (off_t)(-1))) {
825 Error(
"SendMsgToUsers",
"cannot rewind open file (seek to 0)");
830 Error(
"SendMsgToUsers",
"cannot get size of open file (seek to END)");
835 size_t wanted = left;
836 if (wanted > space) {
838 Warning(
"SendMsgToUsers",
839 "requested to send %lld bytes: max size is %lld bytes: truncating",
840 (Long64_t)left, (Long64_t)space);
843 while ((len = read(fileno(f), p, wanted)) < 0 &&
844 TSystem::GetErrno() == EINTR)
845 TSystem::ResetErrno();
847 SysError(
"SendMsgToUsers",
"error reading file");
852 left = (len >= (ssize_t)left) ? 0 : left - len;
// NOTE(review): the next chunk size is derived from left and kMAXBUF-1, not
// from the space still free in buf; verify that a file larger than the
// buffer cannot make a later read() write past the end of buf.
854 wanted = (left > kMAXBUF-1) ? kMAXBUF-1 : left;
856 }
while (len > 0 && left > 0);
862 if (len > (ssize_t)space) {
863 Warning(
"SendMsgToUsers",
864 "requested to send %lld bytes: max size is %lld bytes: truncating",
865 (Long64_t)len, (Long64_t)space);
875 fSocket->SendCoordinator(kSendMsgToUser, buf);
/// Runs the remote 'grep' action and prints its output.
/// Reports an error when the manager is invalid or when the XrdProofd
/// version is older than 1006. Delegates to Exec(kGrep, what, how, where)
/// and prints the reply when there is one.
883 void TXProofMgr::Grep(
const char *what,
const char *how,
const char *where)
887 Error(
"Grep",
"invalid TXProofMgr - do nothing");
891 if (fSocket->GetXrdProofdVersion() < 1006) {
892 Error(
"Grep",
"functionality not supported by server");
897 TObjString *os = Exec(kGrep, what, how, where);
900 if (os) Printf(
"%s", os->GetName());
/// Runs the remote 'find' action and prints its output.
/// Reports an error when the manager is invalid or when the XrdProofd
/// version is older than 1006 (the version found is included in that
/// message). Delegates to Exec(kFind, what, how, where) and prints the
/// reply when there is one.
909 void TXProofMgr::Find(
const char *what,
const char *how,
const char *where)
913 Error(
"Find",
"invalid TXProofMgr - do nothing");
917 if (fSocket->GetXrdProofdVersion() < 1006) {
918 Error(
"Find",
"functionality not supported by server (XrdProofd version: %d)",
919 fSocket->GetXrdProofdVersion());
924 TObjString *os = Exec(kFind, what, how, where);
927 if (os) Printf(
"%s", os->GetName());
/// Runs the remote 'ls' action and prints its output.
/// Reports an error when the manager is invalid or when the XrdProofd
/// version is older than 1006. Delegates to Exec(kLs, what, how, where) and
/// prints the reply when there is one.
936 void TXProofMgr::Ls(
const char *what,
const char *how,
const char *where)
940 Error(
"Ls",
"invalid TXProofMgr - do nothing");
944 if (fSocket->GetXrdProofdVersion() < 1006) {
945 Error(
"Ls",
"functionality not supported by server");
950 TObjString *os = Exec(kLs, what, how, where);
953 if (os) Printf(
"%s", os->GetName());
/// Runs the remote 'more' action and prints its output.
/// Reports an error when the manager is invalid or when the XrdProofd
/// version is older than 1006. Delegates to Exec(kMore, what, how, where)
/// and prints the reply when there is one.
962 void TXProofMgr::More(
const char *what,
const char *how,
const char *where)
966 Error(
"More",
"invalid TXProofMgr - do nothing");
970 if (fSocket->GetXrdProofdVersion() < 1006) {
971 Error(
"More",
"functionality not supported by server");
976 TObjString *os = Exec(kMore, what, how, where);
979 if (os) Printf(
"%s", os->GetName());
/// Removes a remote path through the 'rm' action.
/// Reports an error when the manager is invalid or when the XrdProofd
/// version is older than 1006. The option string how is scanned for
/// "--force" or a short option group containing 'f'. Without a force option
/// and with both fd 0 and fd 1 attached to a terminal, the user is asked for
/// confirmation in a loop until the answer is "N" or "Y"; an empty answer
/// counts as "N". The removal itself is delegated to
/// Exec(kRm, what, how, where); the reply is printed only when gDebug > 1.
990 Int_t TXProofMgr::Rm(
const char *what,
const char *how,
const char *where)
994 Error(
"Rm",
"invalid TXProofMgr - do nothing");
998 if (fSocket->GetXrdProofdVersion() < 1006) {
999 Error(
"Rm",
"functionality not supported by server");
// ans starts as "Y" so that the non-interactive path counts as confirmed.
1003 TString prompt, ans(
"Y"), opt(how);
1004 Bool_t force = kFALSE;
1005 if (!opt.IsNull()) {
1008 while (!force && opt.Tokenize(t, from,
" ")) {
1009 if (t ==
"--force") {
1011 }
else if (t.BeginsWith(
"-") && !t.BeginsWith(
"--") && t.Contains(
"f")) {
1017 if (!force && isatty(0) != 0 && isatty(1) != 0) {
1019 prompt.Form(
"Do you really want to remove '%s'? [N/y]", what);
1021 while (ans !=
"N" && ans !=
"Y") {
1022 ans = Getline(prompt.Data());
1023 ans.Remove(TString::kTrailing,
'\n');
1024 if (ans ==
"") ans =
"N";
1026 if (ans !=
"N" && ans !=
"Y")
1027 Printf(
"Please answer y, Y, n or N");
1033 TObjString *os = Exec(kRm, what, how, where);
1036 if (gDebug > 1) Printf(
"%s", os->GetName());
/// Runs the remote 'tail' action and prints its output.
/// Reports an error when the manager is invalid or when the XrdProofd
/// version is older than 1006. Delegates to Exec(kTail, what, how, where)
/// and prints the reply when there is one.
1052 void TXProofMgr::Tail(
const char *what,
const char *how,
const char *where)
1056 Error(
"Tail",
"invalid TXProofMgr - do nothing");
1060 if (fSocket->GetXrdProofdVersion() < 1006) {
1061 Error(
"Tail",
"functionality not supported by server");
1066 TObjString *os = Exec(kTail, what, how, where);
1069 if (os) Printf(
"%s", os->GetName());
/// Computes the MD5 checksum of a remote path and stores it in sum.
/// Reports an error when the manager is invalid, when the XrdProofd version
/// is older than 1006, or when where is "all" (a single node must be
/// specified). Delegates to Exec(kMd5sum, what, 0, where); the reply is
/// printed when gDebug > 1 and copied into sum.
1078 Int_t TXProofMgr::Md5sum(
const char *what, TString &sum,
const char *where)
1082 Error(
"Md5sum",
"invalid TXProofMgr - do nothing");
1086 if (fSocket->GetXrdProofdVersion() < 1006) {
1087 Error(
"Md5sum",
"functionality not supported by server");
1091 if (where && !strcmp(where,
"all")) {
1092 Error(
"Md5sum",
"cannot run on all nodes at once: please specify one");
1097 TObjString *os = Exec(kMd5sum, what, 0, where);
1101 if (gDebug > 1) Printf(
"%s", os->GetName());
1102 sum = os->GetName();
/// Retrieves the stat information of a remote path into st.
/// Reports an error when the manager is invalid, when the XrdProofd version
/// is older than 1006, or when where is "all" (a single node must be
/// specified). Delegates to Exec(kStat, what, 0, where). The reply holds
/// eight blank-separated numeric fields in this order: device, inode, mode,
/// uid, gid, size, modification time, is-link flag. In the tokenizing
/// variant any missing or non-numeric field, or a device value of -1, makes
/// the function return -1.
/// NOTE(review): two parsing strategies are visible (sscanf with %I64d or
/// %lld for the size, and TString::Tokenize); presumably they are selected by
/// preprocessor conditions - confirm which one is active.
1115 Int_t TXProofMgr::Stat(
const char *what, FileStat_t &st,
const char *where)
1119 Error(
"Stat",
"invalid TXProofMgr - do nothing");
1123 if (fSocket->GetXrdProofdVersion() < 1006) {
1124 Error(
"Stat",
"functionality not supported by server");
1128 if (where && !strcmp(where,
"all")) {
1129 Error(
"Stat",
"cannot run on all nodes at once: please specify one");
1134 TObjString *os = Exec(kStat, what, 0, where);
1138 if (gDebug > 1) Printf(
"%s", os->GetName());
1140 Int_t mode, uid, gid, islink;
1141 Long_t dev, ino, mtime;
1144 sscanf(os->GetName(),
"%ld %ld %d %d %d %I64d %ld %d", &dev, &ino, &mode,
1145 &uid, &gid, &size, &mtime, &islink);
1147 sscanf(os->GetName(),
"%ld %ld %d %d %d %lld %ld %d", &dev, &ino, &mode,
1148 &uid, &gid, &size, &mtime, &islink);
1159 st.fIsLink = (islink == 1);
1163 if (!os->GetString().Tokenize(tkn, from,
"[ ]+") || !tkn.IsDigit())
return -1;
1164 st.fDev = tkn.Atoi();
1165 if (st.fDev == -1)
return -1;
1166 if (!os->GetString().Tokenize(tkn, from,
"[ ]+") || !tkn.IsDigit())
return -1;
1167 st.fIno = tkn.Atoi();
1168 if (!os->GetString().Tokenize(tkn, from,
"[ ]+") || !tkn.IsDigit())
return -1;
1169 st.fMode = tkn.Atoi();
1170 if (!os->GetString().Tokenize(tkn, from,
"[ ]+") || !tkn.IsDigit())
return -1;
1171 st.fUid = tkn.Atoi();
1172 if (!os->GetString().Tokenize(tkn, from,
"[ ]+") || !tkn.IsDigit())
return -1;
1173 st.fGid = tkn.Atoi();
1174 if (!os->GetString().Tokenize(tkn, from,
"[ ]+") || !tkn.IsDigit())
return -1;
// The size is the only field converted with Atoll().
1175 st.fSize = tkn.Atoll();
1176 if (!os->GetString().Tokenize(tkn, from,
"[ ]+") || !tkn.IsDigit())
return -1;
1177 st.fMtime = tkn.Atoi();
1178 if (!os->GetString().Tokenize(tkn, from,
"[ ]+") || !tkn.IsDigit())
return -1;
1179 st.fIsLink = (tkn.Atoi() == 1) ? kTRUE : kFALSE;
/// Common back-end of the remote file actions (grep, find, ls, ...).
/// Returns null after an error message when the manager is invalid, when the
/// XrdProofd version is older than 1006, or when what is null or empty. For
/// kTail with a non-empty option string, the options in how are scanned
/// token by token and rewritten into "-c <n>", "-n <n>" or "-<n>" form. When
/// the command string is still empty it defaults to "<host>:<port>" of
/// fUrl. The request is sent as kExec with the action code; the interrupt
/// handler, if any, is installed for the duration of the call and removed
/// afterwards.
1203 TObjString *TXProofMgr::Exec(Int_t action,
1204 const char *what,
const char *how,
const char *where)
1208 Error(
"Exec",
"invalid TXProofMgr - do nothing");
1209 return (TObjString *)0;
1212 if (fSocket->GetXrdProofdVersion() < 1006) {
1213 Error(
"Exec",
"functionality not supported by server");
1214 return (TObjString *)0;
1217 if (!what || strlen(what) <= 0) {
1218 Error(
"Exec",
"specifying a path is mandatory");
1219 return (TObjString *)0;
1223 if (action == kTail && !opt.IsNull()) {
1225 TString opts(how), o;
// isc / isn flag that the previous token announced a byte count or a line
// count (presumably set by the "-c" / "-n" branches - confirm).
1227 Bool_t isc = kFALSE, isn = kFALSE;
1228 while (opts.Tokenize(o, from,
" ")) {
1230 if (!o.BeginsWith(
"-") && !isc && isn)
continue;
1232 opt.Form(
"-c %s", o.Data());
1236 opt.Form(
"-n %s", o.Data());
1241 }
else if (o ==
"-n") {
1243 }
else if (o ==
"--bytes=" || o ==
"--lines=") {
1245 }
else if (o.BeginsWith(
"-")) {
1246 o.Remove(TString::kLeading,
'-');
1247 if (o.IsDigit()) opt.Form(
"-%s", o.Data());
1254 if (cmd.IsNull()) cmd.Form(
"%s:%d", fUrl.GetHost(), fUrl.GetPort());
1261 if (fIntHandler) fIntHandler->Add();
1264 TObjString *os = fSocket->SendCoordinator(kExec, cmd.Data(), action);
1267 if (fIntHandler) fIntHandler->Remove();
/// Copies the remote file 'remote' to the local path 'local'.
/// Reports an error when the manager is invalid, when the XrdProofd version
/// is older than 1006, or when the remote path is empty. Recognized options
/// (searched in oo): "FORCE" and "SILENT". An empty local path defaults to
/// the base name of the remote file; a local path naming a directory gets
/// the remote base name appended. An existing local target must be a regular
/// file and writable according to the owner/group/other mode bits of the
/// current user. The remote MD5 sum is obtained via Md5sum(); unless forced,
/// an existing local file with the same checksum is reported instead of
/// being overwritten, and otherwise the user is asked for confirmation (an
/// answer starting with n/N or an empty answer returns 0).
/// The transfer removes the TXSocketHandler from the system file handlers,
/// sends kGetFile, parses the numeric size from the reply, and receives the
/// content in chunks of at most kMAXBUF (16384) bytes which are written to
/// the local file with retries on EINTR, updating the progress bar. The
/// socket handler is re-installed afterwards and the local MD5 sum is
/// compared with the remote one.
1281 Int_t TXProofMgr::GetFile(
const char *remote,
const char *local,
const char *opt)
1286 Error(
"GetFile",
"invalid TXProofMgr - do nothing");
1290 if (fSocket->GetXrdProofdVersion() < 1006) {
1291 Error(
"GetFile",
"functionality not supported by server");
1296 TString filerem(remote);
1297 if (filerem.IsNull()) {
1298 Error(
"GetFile",
"remote file path undefined");
1305 Bool_t force = (oo.Contains(
"FORCE")) ? kTRUE : kFALSE;
1306 Bool_t silent = (oo.Contains(
"SILENT")) ? kTRUE : kFALSE;
1309 TString fileloc(local);
1310 if (fileloc.IsNull()) {
1312 fileloc = gSystem->BaseName(filerem);
1314 gSystem->ExpandPathName(fileloc);
1318 UInt_t openflags = O_WRONLY | O_BINARY;
1320 UInt_t openflags = O_WRONLY;
1322 UInt_t openmode = 0600;
1325 UserGroup_t *ugloc = 0;
1328 if ((rcloc = gSystem->GetPathInfo(fileloc, stloc)) == 0) {
1329 if (R_ISDIR(stloc.fMode)) {
1331 if (!fileloc.EndsWith(
"/")) fileloc +=
"/";
1332 fileloc += gSystem->BaseName(filerem);
1334 rcloc = gSystem->GetPathInfo(fileloc, stloc);
1338 if (!R_ISREG(stloc.fMode)) {
1340 Printf(
"[GetFile] local file '%s' exists and is not regular: cannot continue",
1345 if (!(ugloc = gSystem->GetUserInfo(gSystem->GetUid()))) {
1346 Error(
"GetFile",
"cannot get user info for additional checks");
// Classify the current user against the file's ownership; exactly one of
// owner / group / other ends up true.
1350 Bool_t owner = (ugloc->fUid == stloc.fUid && ugloc->fGid == stloc.fGid) ? kTRUE : kFALSE;
1351 Bool_t group = (!owner && ugloc->fGid == stloc.fGid) ? kTRUE : kFALSE;
1352 Bool_t other = (!owner && !group) ? kTRUE : kFALSE;
1354 if ((owner && !(stloc.fMode & kS_IWUSR)) ||
1355 (group && !(stloc.fMode & kS_IWGRP)) || (other && !(stloc.fMode & kS_IWOTH))) {
1357 Printf(
"[GetFile] file '%s' exists: no permission to delete or overwrite the file", fileloc.Data());
1358 Printf(
"[GetFile] ownership: owner: %d, group: %d, other: %d", owner, group, other);
1359 Printf(
"[GetFile] mode: %x", stloc.fMode);
1364 openflags |= O_CREAT | O_TRUNC;
1367 openflags |= O_CREAT;
1371 openflags |= O_CREAT;
1376 if (Md5sum(filerem, remsum) != 0) {
1378 Printf(
"[GetFile] remote file '%s' does not exists or cannot be read", filerem.Data());
1384 if (rcloc == 0 && !force) {
// NOTE(review): md5loc is a raw pointer that is dereferenced below; confirm
// that it is null-checked and released on every path.
1385 TMD5 *md5loc = TMD5::FileChecksum(fileloc);
1387 if (remsum == md5loc->AsString()) {
1389 Printf(
"[GetFile] local file '%s' and remote file '%s' have the same MD5 check sum",
1390 fileloc.Data(), filerem.Data());
1391 Printf(
"[GetFile] use option 'force' to override");
1400 const char *a = Getline(
"Local file exists already: would you like to overwrite it? [N/y]");
1401 if (a[0] ==
'n' || a[0] ==
'N' || a[0] ==
'\0')
return 0;
1408 Int_t fdout = open(fileloc, openflags, openmode);
// NOTE(review): the message below prints 'local', not 'fileloc'; 'local'
// may be null or differ from the path actually opened - confirm.
1410 Error(
"GetFile",
"could not open local file '%s' for writing: errno: %d", local, errno);
1415 TString cmd(filerem);
// The socket handler is taken out of the event loop for the duration of the
// raw transfer and added back further down.
1419 gSystem->RemoveFileHandler(TXSocketHandler::GetSocketHandler());
1424 TObjString *os = fSocket->SendCoordinator(kGetFile, cmd.Data());
1428 TString ssz(os->GetName());
1429 ssz.ReplaceAll(
" ",
"");
1430 if (!ssz.IsDigit()) {
1431 Error(
"GetFile",
"received non-digit size string: '%s' ('%s')", os->GetName(), ssz.Data());
1435 Long64_t size = ssz.Atoll();
1437 Error(
"GetFile",
"received null or negative size: %lld", size);
1443 const Int_t kMAXBUF = 16384;
1448 Long64_t filesize = 0, left = 0;
1449 while (rc == 0 && filesize < size) {
1450 left = size - filesize;
1451 if (left > kMAXBUF) left = kMAXBUF;
1452 rec = fSocket->RecvRaw(&buf, left);
1453 filesize = (rec > 0) ? (filesize + rec) : filesize;
1459 while ((w = write(fdout, p, r)) < 0 && TSystem::GetErrno() == EINTR)
1460 TSystem::ResetErrno();
1462 SysError(
"GetFile",
"error writing to unit: %d", fdout);
1470 CpProgress(
"GetFile", filesize, size, &watch);
1471 }
else if (rec < 0) {
1473 Error(
"GetFile",
"error during receiving file");
1478 CpProgress(
"GetFile", filesize, size, &watch, kTRUE);
1481 Error(
"GetFile",
"size not received");
1486 gSystem->AddFileHandler(TXSocketHandler::GetSocketHandler());
// Post-transfer verification: the checksum of the new local file must match
// the remote checksum obtained before the transfer.
1495 std::unique_ptr<TMD5> md5loc(TMD5::FileChecksum(fileloc));
1496 if (!(md5loc.get())) {
1497 Error(
"GetFile",
"cannot get MD5 checksum of the new local file '%s'", fileloc.Data());
1499 }
else if (remsum != md5loc->AsString()) {
1500 Error(
"GetFile",
"checksums for the local copy and the remote file differ: {rem:%s,loc:%s}",
1501 remsum.Data(), md5loc->AsString());
/// Copies the local file 'local' to the remote path 'remote'.
/// Reports an error when the manager is invalid, when the XrdProofd version
/// is older than 1006, or when the local path is empty. An empty remote path
/// defaults to "~/<local base name>"; a remote path ending in '/' gets the
/// local base name appended. The local file must exist, be a regular file
/// and be readable according to the owner/group/other mode bits of the
/// current user; its MD5 sum is computed up front. When the remote file
/// already exists its checksum is compared: identical files are reported,
/// differing ones require confirmation from the user (an answer starting
/// with n/N or an empty answer returns 0).
/// The transfer removes the TXSocketHandler from the system file handlers,
/// sends kPutFile with "<remote> <size>" (plus " force" when forced), then
/// reads the local file in chunks of at most kMAXBUF (16384) bytes and
/// writes them raw to the connection, updating the progress bar. The socket
/// handler is re-installed afterwards and the remote MD5 sum is compared
/// with the local one.
1515 Int_t TXProofMgr::PutFile(
const char *local,
const char *remote,
const char *opt)
1520 Error(
"PutFile",
"invalid TXProofMgr - do nothing");
1524 if (fSocket->GetXrdProofdVersion() < 1006) {
1525 Error(
"PutFile",
"functionality not supported by server");
1530 TString fileloc(local);
1531 if (fileloc.IsNull()) {
1532 Error(
"PutFile",
"local file path undefined");
1535 gSystem->ExpandPathName(fileloc);
// NOTE(review): force requires the option string to equal "FORCE" exactly,
// whereas GetFile uses Contains("FORCE") - confirm the difference is
// intended.
1540 Bool_t force = (oo ==
"FORCE") ? kTRUE : kFALSE;
1543 TString filerem(remote);
1544 if (filerem.IsNull()) {
1546 filerem.Form(
"~/%s", gSystem->BaseName(fileloc));
1547 }
else if (filerem.EndsWith(
"/")) {
1549 filerem += gSystem->BaseName(fileloc);
1554 UInt_t openflags = O_RDONLY | O_BINARY;
1556 UInt_t openflags = O_RDONLY;
1562 if ((rcloc = gSystem->GetPathInfo(fileloc, stloc)) != 0 || !R_ISREG(stloc.fMode)) {
1564 const char *why = (rcloc == 0) ?
"is not regular" :
"does not exists";
1565 Printf(
"[PutFile] local file '%s' %s: cannot continue", fileloc.Data(), why);
1569 UserGroup_t *ugloc = 0;
1570 if (!(ugloc = gSystem->GetUserInfo(gSystem->GetUid()))) {
1571 Error(
"PutFile",
"cannot get user info for additional checks");
// Classify the current user against the file's ownership; exactly one of
// owner / group / other ends up true.
1575 Bool_t owner = (ugloc->fUid == stloc.fUid && ugloc->fGid == stloc.fGid) ? kTRUE : kFALSE;
1576 Bool_t group = (!owner && ugloc->fGid == stloc.fGid) ? kTRUE : kFALSE;
1577 Bool_t other = (!owner && !group) ? kTRUE : kFALSE;
1579 if ((owner && !(stloc.fMode & kS_IRUSR)) ||
1580 (group && !(stloc.fMode & kS_IRGRP)) || (other && !(stloc.fMode & kS_IROTH))) {
1581 Printf(
"[PutFile] file '%s': no permission to read the file", fileloc.Data());
1582 Printf(
"[PutFile] ownership: owner: %d, group: %d, other: %d", owner, group, other);
1583 Printf(
"[PutFile] mode: %x", stloc.fMode);
1589 TMD5 *md5loc = TMD5::FileChecksum(fileloc);
1591 Error(
"PutFile",
"cannot calculate the check sum for '%s'", fileloc.Data());
1594 locsum = md5loc->AsString();
1599 Bool_t same = kFALSE;
// A successful Stat() means the remote file already exists.
1602 if (Stat(filerem, strem) == 0) {
1603 if (Md5sum(filerem, remsum) != 0) {
1604 Printf(
"[PutFile] remote file exists but the check sum calculation failed");
1608 if (remsum == locsum) {
1610 Printf(
"[PutFile] local file '%s' and remote file '%s' have the same MD5 check sum",
1611 fileloc.Data(), filerem.Data());
1612 Printf(
"[PutFile] use option 'force' to override");
1619 const char *a = Getline(
"Remote file exists already: would you like to overwrite it? [N/y]");
1620 if (a[0] ==
'n' || a[0] ==
'N' || a[0] ==
'\0')
return 0;
1629 int fd = open(fileloc.Data(), openflags);
1631 Error(
"PutFile",
"cannot open file '%s': %d", fileloc.Data(), errno);
1637 cmd.Form(
"%s %lld", filerem.Data(), stloc.fSize);
1638 if (force) cmd +=
" force";
// The socket handler is taken out of the event loop for the duration of the
// raw transfer and added back further down.
1642 gSystem->RemoveFileHandler(TXSocketHandler::GetSocketHandler());
1647 TObjString *os = fSocket->SendCoordinator(kPutFile, cmd.Data());
1652 const Int_t kMAXBUF = 16384;
1656 lseek(fd, pos, SEEK_SET);
1659 while (rc == 0 && pos < stloc.fSize) {
1660 Long64_t left = stloc.fSize - pos;
1661 if (left > kMAXBUF) left = kMAXBUF;
1663 while ((siz = read(fd, &buf[0], left)) < 0 && TSystem::GetErrno() == EINTR)
1664 TSystem::ResetErrno();
// A short read (siz != left) is treated as an error, not retried.
1665 if (siz < 0 || siz != left) {
1666 Error(
"PutFile",
"error reading from file: errno: %d", errno);
1671 if ((src = fSocket->fConn->WriteRaw((
void *)&buf[0], left)) != left) {
1672 Error(
"PutFile",
"error sending over: errno: %d (rc: %d)", TSystem::GetErrno(), src);
1677 CpProgress(
"PutFile", pos, stloc.fSize, &watch);
1682 CpProgress(
"PutFile", pos, stloc.fSize, &watch, kTRUE);
1685 Error(
"PutFile",
"command could not be executed");
1690 gSystem->AddFileHandler(TXSocketHandler::GetSocketHandler());
// Post-transfer verification: the checksum of the new remote file must
// match the local checksum computed before the transfer.
1699 if (Md5sum(filerem, remsum) != 0) {
1700 Printf(
"[PutFile] cannot get MD5 checksum of the new remote file '%s'", filerem.Data());
1702 }
else if (remsum != locsum) {
1703 Printf(
"[PutFile] checksums for the local copy and the remote file differ: {rem:%s, loc:%s}",
1704 remsum.Data(), locsum.Data());
/// Draws a one-line transfer progress bar on stderr.
/// Does nothing when pfx or watch is null or when size is 0. The line shows
/// the total size in MB, a 20-slot bar ('=' for completed slots, '>' for the
/// current slot, '.' for remaining ones), the percentage done and the
/// average rate in MB/s computed from the stopwatch's real time. The line
/// ends with a carriage return so that the next call overwrites it; cr
/// additionally emits a newline. Pending events are processed via
/// gSystem->ProcessEvents() on each call.
1716 void TXProofMgr::CpProgress(
const char *pfx, Long64_t bytes,
1717 Long64_t size, TStopwatch *watch, Bool_t cr)
1720 if (!pfx || size == 0 || !watch)
return;
1722 fprintf(stderr,
"[%s] Total %.02f MB\t|", pfx, (Double_t)size/1048576);
1724 for (
int l = 0; l < 20; l++) {
1726 if (l < 20*bytes/size)
1727 fprintf(stderr,
"=");
1728 else if (l == 20*bytes/size)
1729 fprintf(stderr,
">");
1730 else if (l > 20*bytes/size)
1731 fprintf(stderr,
".");
1733 fprintf(stderr,
"=");
1736 gSystem->ProcessEvents();
// NOTE(review): copytime is used as a divisor below; confirm a zero elapsed
// time cannot produce a misleading rate on the first call.
1738 Double_t copytime = watch->RealTime();
1739 fprintf(stderr,
"| %.02f %% [%.01f MB/s]\r",
1740 100.0*bytes/size, bytes/copytime/1048576.);
1741 if (cr) fprintf(stderr,
"\n");
1750 Int_t TXProofMgr::Cp(
const char *src,
const char *dst,
const char *fmt)
1755 Error(
"Cp",
"invalid TXProofMgr - do nothing");
1759 if (fSocket->GetXrdProofdVersion() < 1006) {
1760 Error(
"Cp",
"functionality not supported by server");
1765 TString filesrc(src);
1766 if (filesrc.IsNull()) {
1767 Error(
"Cp",
"source file path undefined");
1771 TString filedst(dst);
1772 if (filedst.IsNull()) {
1773 filedst = gSystem->BaseName(TUrl(filesrc.Data()).GetFile());
1774 }
else if (filedst.EndsWith(
"/")) {
1776 filedst += gSystem->BaseName(filesrc);
1781 TUrl usrc = TUrl(filesrc.Data(), kTRUE).GetUrl();
1782 filesrc = usrc.GetUrl();
1783 if (!strcmp(usrc.GetProtocol(),
"file"))
1784 filesrc.Form(
"file://host/%s", usrc.GetFileAndOptions());
1785 TUrl udst = TUrl(filedst.Data(), kTRUE).GetUrl();
1786 filedst = udst.GetUrl();
1787 if (!strcmp(udst.GetProtocol(),
"file"))
1788 filedst.Form(
"file://host/%s", udst.GetFileAndOptions());
1792 cmd.Form(
"%s %s %s", filesrc.Data(), filedst.Data(), (fmt ? fmt :
""));
1795 if (fIntHandler) fIntHandler->Add();
1798 TObjString *os = fSocket->SendCoordinator(kCpFile, cmd.Data());
1801 if (fIntHandler) fIntHandler->Remove();
1805 if (gDebug > 0) Printf(
"%s", os->GetName());