27 #include "Xrd/XrdBuffer.hh"
28 #include "Xrd/XrdScheduler.hh"
30 #include "XrdOuc/XrdOucStream.hh"
51 static int ExportCpCmd(
const char *k, XpdAdminCpCmd *cc,
void *s)
53 XPDLOC(PMGR,
"ExportCpCmd")
55 XrdOucString *ccs = (XrdOucString *)s;
57 if (ccs->length() > 0) *ccs +=
",";
61 TRACE(DBG, k <<
" : "<<cc->fCmd<<
" fmt: '"<<cc->fFmt<<
"'");
73 XrdProofdAdmin::XrdProofdAdmin(XrdProofdManager *mgr,
74 XrdProtocol_Config *pi, XrdSysError *e)
75 : XrdProofdConfig(pi->ConfigFN, e)
80 fAllowedCpCmds.Add(
"file",
new XpdAdminCpCmd(
"cp",
"cp -rp %s %s",1));
81 fAllowedCpCmds.Add(
"root",
new XpdAdminCpCmd(
"xrdcp",
"xrdcp %s %s",1));
82 fAllowedCpCmds.Add(
"xrd",
new XpdAdminCpCmd(
"xrdcp",
"xrdcp %s %s",1));
83 #if !defined(__APPLE__)
84 fAllowedCpCmds.Add(
"http",
new XpdAdminCpCmd(
"wget",
"wget %s -O %s",0));
85 fAllowedCpCmds.Add(
"https",
new XpdAdminCpCmd(
"wget",
"wget %s -O %s",0));
87 fAllowedCpCmds.Add(
"http",
new XpdAdminCpCmd(
"curl",
"curl %s -o %s",0));
88 fAllowedCpCmds.Add(
"https",
new XpdAdminCpCmd(
"curl",
"curl %s -o %s",0));
91 fAllowedCpCmds.Apply(ExportCpCmd, (
void *)&fCpCmds);
100 void XrdProofdAdmin::RegisterDirectives()
102 Register(
"exportpath",
new XrdProofdDirective(
"exportpath",
this, &DoDirectiveClass));
103 Register(
"cpcmd",
new XrdProofdDirective(
"cpcmd",
this, &DoDirectiveClass));
109 int XrdProofdAdmin::Process(XrdProofdProtocol *p,
int type)
111 XPDLOC(ALL,
"Admin::Process")
114 XPD_SETRESP(p, "Process");
116 TRACEP(p, REQ, "req
id: " << type << " ("<<
117 XrdProofdAux::AdminMsgType(type) << ")");
122 return QueryMssUrl(p);
124 return QuerySessions(p);
126 return QueryLogPaths(p);
127 case kCleanupSessions:
128 return CleanupSessions(p);
130 return SendMsgToUser(p);
131 case kGroupProperties:
132 return SetGroupProperties(p);
134 return GetWorkers(p);
136 return QueryWorkers(p);
137 case kQueryROOTVersions:
138 return QueryROOTVersions(p);
140 return SetROOTVersion(p);
142 return SetSessionAlias(p);
144 return SetSessionTag(p);
146 return ReleaseWorker(p);
156 emsg +=
"Invalid type: ";
162 response->Send(kXR_InvalidRequest, emsg.c_str());
172 int XrdProofdAdmin::Config(
bool rcf)
174 XPDLOC(ALL,
"Admin::Config")
177 if (XrdProofdConfig::Config(rcf) != 0) {
178 XPDERR(
"problems parsing file ");
183 msg = (rcf) ?
"re-configuring" :
"configuring";
184 TRACE(ALL, msg.c_str());
187 if (fExportPaths.size() > 0) {
188 TRACE(ALL,
"additional paths which can be browsed by all users: ");
189 std::list<XrdOucString>::iterator is = fExportPaths.begin();
190 while (is != fExportPaths.end()) { TRACE(ALL,
" "<<*is); ++is; }
193 TRACE(ALL,
"allowed/supported copy commands: "<<fCpCmds);
202 int XrdProofdAdmin::DoDirective(XrdProofdDirective *d,
203 char *val, XrdOucStream *cfg,
bool rcf)
205 XPDLOC(SMGR,
"Admin::DoDirective")
211 if (d->fName == "exportpath") {
212 return DoDirectiveExportPath(val, cfg, rcf);
213 }
else if (d->fName ==
"cpcmd") {
214 return DoDirectiveCpCmd(val, cfg, rcf);
216 TRACE(XERR,
"unknown directive: "<<d->fName);
224 int XrdProofdAdmin::DoDirectiveExportPath(
char *val, XrdOucStream *cfg,
bool)
226 XPDLOC(SMGR,
"Admin::DoDirectiveExportPath")
232 TRACE(ALL,"val: "<<val);
235 XrdOucString tkns(val), tkn;
237 while ((from = tkns.tokenize(tkn, from,
' ')) != STR_NPOS) {
238 fExportPaths.push_back(tkn);
241 val = cfg->GetWord();
251 int XrdProofdAdmin::DoDirectiveCpCmd(
char *val, XrdOucStream *cfg,
bool)
253 XPDLOC(SMGR,
"Admin::DoDirectiveCpCmd")
259 XrdOucString proto, cpcmd, fmt;
260 bool canput = 0, isfmt = 0, rm = 0;
263 XrdOucString tkn(val);
264 if (proto.length() <= 0) {
266 if (proto.beginswith(
'-')) {
271 }
else if (cpcmd.length() <= 0) {
273 }
else if (tkn.beginswith(
"put:")) {
275 if (tkn ==
"put:1") canput = 1;
276 }
else if (tkn.beginswith(
"fmt:")) {
277 fmt.assign(tkn, 4, -1);
286 val = cfg->GetWord();
291 fAllowedCpCmds.Del(proto.c_str());
292 }
else if (cpcmd.length() > 0 && fmt.length() > 0) {
295 fmt.insert(cpcmd, 0);
296 fAllowedCpCmds.Rep(proto.c_str(),
new XpdAdminCpCmd(cpcmd.c_str(),fmt.c_str(),canput));
298 TRACE(ALL,
"incomplete information: ignoring!");
303 fAllowedCpCmds.Apply(ExportCpCmd, (
void *)&fCpCmds);
312 int XrdProofdAdmin::QueryMssUrl(XrdProofdProtocol *p)
314 XPDLOC(ALL,
"Admin::QueryMssUrl")
317 XPD_SETRESP(p, "QueryMssUrl");
319 XrdOucString msg = fMgr->PoolURL();
321 msg += fMgr->NameSpace();
323 TRACEP(p, DBG, "sending: "<<msg);
326 response->Send((
void *)msg.c_str(), msg.length()+1);
335 int XrdProofdAdmin::QueryROOTVersions(XrdProofdProtocol *p)
337 XPDLOC(ALL,
"Admin::QueryROOTVersions")
340 XPD_SETRESP(p, "QueryROOTVersions");
342 XrdOucString msg = fMgr->ROOTMgr()->ExportVersions(p->Client()->ROOT());
344 TRACEP(p, DBG, "sending: "<<msg);
347 response->Send((
void *)msg.c_str(), msg.length()+1);
356 int XrdProofdAdmin::SetROOTVersion(XrdProofdProtocol *p)
358 XPDLOC(ALL,
"Admin::SetROOTVersion")
361 XPD_SETRESP(p, "SetROOTVersion");
364 const
char *t = p->Argp() ? (const
char *) p->Argp()->buff : "default";
365 int len = p->Argp() ? p->Request()->header.dlen : strlen("default");
366 XrdOucString tag(t,len);
371 if (tag.beginswith("u:")) {
373 usr.erase(usr.rfind(
' '));
374 usr.replace(
"u:",
"");
376 tag.erase(0,tag.find(
' ') + 1);
378 TRACEP(p, REQ,
"usr: "<<usr<<
", version tag: "<< tag);
382 XrdProofdClient *c = p->Client();
384 if (usr.length() > 0) {
386 if (usr.find(
':') != STR_NPOS) {
388 grp.erase(grp.rfind(
':'));
389 usr.erase(0,usr.find(
':') + 1);
392 (fMgr->GroupsMgr()) ? fMgr->GroupsMgr()->GetUserGroup(usr.c_str()) : 0;
393 grp = g ? g->Name() :
"default";
395 if (usr != p->Client()->User()) {
396 if (!p->SuperUser()) {
397 usr.insert(
"not allowed to change settings for usr '", 0);
399 TRACEP(p, XERR, usr.c_str());
400 response->Send(kXR_InvalidRequest, usr.c_str());
404 if (!(c = fMgr->ClientMgr()->GetClient(usr.c_str(), grp.c_str()))) {
406 XrdOucString emsg(
"user not found or not allowed: ");
408 TRACEP(p, XERR, emsg.c_str());
409 response->Send(kXR_InvalidRequest, emsg.c_str());
416 XrdROOT *r = fMgr->ROOTMgr()->GetVersion(tag.c_str());
418 if (!r && tag ==
"default") {
420 r = fMgr->ROOTMgr()->DefaultVersion();
428 TRACEP(p, DBG,
"default changed to "<<c->ROOT()->Tag()<<
429 " for {client, group} = {"<<usr<<
", "<<grp<<
"} ("<<c<<
")");
432 if (fMgr->SrvType() != kXPD_Worker) {
433 XrdOucString buf(
"u:");
434 buf += c->UI().fUser;
437 int type = ntohl(p->Request()->proof.int1);
438 brc = fMgr->NetMgr()->Broadcast(type, buf.c_str(), p->Client()->User(), response);
445 tag.insert(
"tag '", 0);
446 tag +=
"' not found in the list of available ROOT versions on some worker nodes";
447 TRACEP(p, XERR, tag.c_str());
448 response->Send(kXR_InvalidRequest, tag.c_str());
451 tag.insert(
"tag '", 0);
452 tag +=
"' not found in the list of available ROOT versions";
453 TRACEP(p, XERR, tag.c_str());
454 response->Send(kXR_InvalidRequest, tag.c_str());
464 int XrdProofdAdmin::QueryWorkers(XrdProofdProtocol *p)
466 XPDLOC(ALL,
"Admin::QueryWorkers")
469 XPD_SETRESP(p, "QueryWorkers");
472 XrdOucString sbuf(1024);
473 fMgr->ProofSched()->ExportInfo(sbuf);
476 char *buf = (
char *) sbuf.c_str();
477 int len = sbuf.length() + 1;
478 TRACEP(p, DBG, "sending: "<<buf);
481 response->Send(buf, len);
490 int XrdProofdAdmin::GetWorkers(XrdProofdProtocol *p)
492 XPDLOC(ALL,
"Admin::GetWorkers")
495 XPD_SETRESP(p, "GetWorkers");
498 int psid = ntohl(p->Request()->proof.sid);
501 XrdProofdProofServ *xps = 0;
502 if (!p->Client() || !(xps = p->Client()->GetServer(psid))) {
503 TRACEP(p, XERR,
"session ID not found: "<<psid);
504 response->Send(kXR_InvalidRequest,
"session ID not found");
507 int pid = xps->SrvPID();
508 TRACEP(p, REQ,
"request from session "<<pid);
511 XrdOucString wrks(
"");
516 if (p->Request()->header.dlen > 0)
517 msg.assign((
const char *) p->Argp()->buff, 0, p->Request()->header.dlen);
518 if (fMgr->GetWorkers(wrks, xps, msg.c_str()) < 0 ) {
520 response->Send(kXR_InvalidRequest,
"GetWorkers failed");
526 char *buf = (
char *) wrks.c_str();
527 int len = wrks.length() + 1;
528 TRACEP(p, DBG,
"sending: "<<buf);
532 response->Send(buf, len);
535 response->Send(kXR_InvalidRequest,
"GetWorkers failed");
546 int XrdProofdAdmin::SetGroupProperties(XrdProofdProtocol *p)
548 XPDLOC(ALL,
"Admin::SetGroupProperties")
551 XPD_SETRESP(p, "SetGroupProperties");
554 int len = p->Request()->header.dlen;
555 char *grp = new
char[len+1];
556 memcpy(grp, p->Argp()->buff, len);
558 TRACEP(p, DBG, "request to change priority for group '"<< grp<<"'");
561 if (strcmp(grp, p->Client()->UI().fGroup.c_str())) {
562 TRACEP(p, XERR,
"received group does not match the user's one");
563 response->Send(kXR_InvalidRequest,
564 "SetGroupProperties: received group does not match the user's one");
570 int priority = ntohl(p->Request()->proof.int2);
573 if (fMgr && fMgr->PriorityMgr()) {
575 XPDFORM(buf,
"%s %d", grp, priority);
576 if (fMgr->PriorityMgr()->Pipe()->Post(XrdProofdPriorityMgr::kSetGroupPriority,
578 TRACEP(p, XERR,
"problem sending message on the pipe");
579 response->Send(kXR_ServerError,
580 "SetGroupProperties: problem sending message on the pipe");
587 TRACEP(p, REQ,
"priority for group '"<< grp<<
"' has been set to "<<priority);
601 int XrdProofdAdmin::SendMsgToUser(XrdProofdProtocol *p)
603 XPDLOC(ALL,
"Admin::SendMsgToUser")
606 XPD_SETRESP(p, "SendMsgToUser");
609 XrdProofdClient *tgtclnt = p->Client();
610 XrdProofdClient *c = 0;
611 std::list<XrdProofdClient *>::iterator i;
614 int len = p->Request()->header.dlen;
617 TRACEP(p, XERR,
"no message");
618 response->Send(kXR_InvalidRequest,
"SendMsgToUser: no message");
622 XrdOucString cmsg((
const char *)p->Argp()->buff, len);
624 if (cmsg.beginswith(
"u:")) {
626 int isp = cmsg.find(
' ');
627 if (isp != STR_NPOS) {
628 usr.assign(cmsg, 2, isp-1);
629 cmsg.erase(0, isp+1);
631 if (usr.length() > 0) {
632 TRACEP(p, REQ,
"request for user: '"<<usr<<
"'");
635 if ((c = fMgr->ClientMgr()->GetClient(usr.c_str(), 0))) {
641 TRACEP(p, XERR,
"target client not found");
642 response->Send(kXR_InvalidRequest,
643 "SendMsgToUser: target client not found");
649 if (cmsg.length() <= 0) {
651 TRACEP(p, XERR,
"no message after user specification");
652 response->Send(kXR_InvalidRequest,
653 "SendMsgToUser: no message after user specification");
658 if (!p->SuperUser()) {
659 if (usr.length() > 0) {
660 if (tgtclnt != p->Client()) {
661 TRACEP(p, XERR,
"not allowed to send messages to usr '"<<usr<<
"'");
662 response->Send(kXR_InvalidRequest,
663 "SendMsgToUser: not allowed to send messages to specified usr");
667 TRACEP(p, XERR,
"not allowed to send messages to connected users");
668 response->Send(kXR_InvalidRequest,
669 "SendMsgToUser: not allowed to send messages to connected users");
673 if (usr.length() <= 0) tgtclnt = 0;
677 fMgr->ClientMgr()->Broadcast(tgtclnt, cmsg.c_str());
689 int XrdProofdAdmin::QuerySessions(XrdProofdProtocol *p)
691 XPDLOC(ALL,
"Admin::QuerySessions")
694 XPD_SETRESP(p, "QuerySessions");
696 XrdOucString notmsg, msg;
698 XpdSrvMgrCreateCnt cnt(fMgr->SessionMgr(), XrdProofdProofServMgr::kProcessCnt);
699 msg = p->Client()->ExportSessions(notmsg, response);
702 if (notmsg.length() > 0) {
704 response->Send(kXR_attn, kXPD_srvmsg, 0, (
char *) notmsg.c_str(), notmsg.length());
707 TRACEP(p, DBG,
"sending: "<<msg);
710 response->Send((
void *)msg.c_str(), msg.length()+1);
719 int XrdProofdAdmin::QueryLogPaths(XrdProofdProtocol *p)
721 XPDLOC(ALL,
"Admin::QueryLogPaths")
724 XPD_SETRESP(p, "QueryLogPaths");
726 int ridx = ntohl(p->Request()->proof.int2);
727 bool broadcast = (ntohl(p->Request()->proof.int3) == 1) ? 1 : 0;
730 XrdOucString stag, master, user, ord, buf;
731 int len = p->Request()->header.dlen;
733 buf.assign(p->Argp()->buff,0,len-1);
734 int im = buf.find(
"|master:");
735 int iu = buf.find(
"|user:");
736 int io = buf.find(
"|ord:");
738 stag.erase(stag.find(
"|"));
739 if (im != STR_NPOS) {
740 master.assign(buf, im + strlen(
"|master:"));
741 master.erase(master.find(
"|"));
743 if (iu != STR_NPOS) {
744 user.assign(buf, iu + strlen(
"|user:"));
745 user.erase(user.find(
"|"));
747 if (io != STR_NPOS) {
748 ord.assign(buf, io + strlen(
"|ord:"));
749 ord.erase(ord.find(
"|"));
751 if (stag.beginswith(
'*'))
754 TRACEP(p, DBG,
"master: "<<master<<
", user: "<<user<<
", ord: "<<ord<<
", stag: "<<stag);
756 XrdProofdClient *client = (user.length() > 0) ? 0 : p->Client();
759 client = fMgr->ClientMgr()->GetClient(user.c_str(), 0);
761 TRACEP(p, XERR,
"query sess logs: client for '"<<user<<
"' not found");
762 response->Send(kXR_InvalidRequest,
"QueryLogPaths: query log: client not found");
766 XrdOucString tag = (stag ==
"" && ridx >= 0) ?
"last" : stag;
767 if (stag ==
"" && client->Sandbox()->GuessTag(tag, ridx) != 0) {
768 TRACEP(p, XERR,
"query sess logs: session tag not found");
769 response->Send(kXR_InvalidRequest,
"QueryLogPaths: query log: session tag not found");
776 if (master.length() <= 0) {
778 rmsg += tag; rmsg +=
"|";
780 rmsg += fMgr->PoolURL(); rmsg +=
"|";
784 XrdOucString sdir(client->Sandbox()->Dir());
789 DIR *dir = opendir(sdir.c_str());
791 XrdOucString msg(
"cannot open dir ");
792 msg += sdir; msg +=
" (errno: "; msg += errno; msg +=
")";
793 TRACEP(p, XERR, msg.c_str());
794 response->Send(kXR_InvalidRequest, msg.c_str());
799 XrdOucString wfile(sdir);
800 wfile +=
"/.workers";
801 bool ismaster = (access(wfile.c_str(), F_OK) == 0) ? 1 : 0;
804 XrdOucString xo, logtag, xf;
805 int ilog, idas, iund1, iund2;
806 struct dirent *ent = 0;
807 while ((ent = (
struct dirent *)readdir(dir))) {
808 if (!strcmp(ent->d_name,
".") || !strcmp(ent->d_name,
".."))
continue;
809 XPDFORM(xf,
"%s/%s", sdir.c_str(), (
const char *) ent->d_name);
811 if (stat(xf.c_str(), &st) != 0)
continue;
812 if (!S_ISREG(st.st_mode))
continue;
814 if (xo.matches(
"*-*-*-*-*.log") <= 0 && xo.matches(
"*-*-*-*-*.valgrind.log") <= 0)
continue;
815 TRACEP(p, ALL,
"xf: "<<xf<<
"; st_mode: "<<st.st_mode);
817 if ((ilog = xo.find(
".log")) != STR_NPOS) {
818 xo.replace(
".log",
"");
822 iund1 = xo.find(
"__");
823 if (iund1 != STR_NPOS) {
824 iund2 = xo.rfind(
"__");
825 if ((iund2 != STR_NPOS) && (iund2 != iund1)) {
828 logtag.erase(0, iund1+2);
832 if ((idas = xo.find(
'-')) != STR_NPOS) xo.erase(0, idas + 1);
833 if ((idas = xo.find(
'-')) != STR_NPOS) xo.erase(idas);
834 if (ord.length() > 0 && (ord == xo)) {
837 if (ismaster && !broadcast) {
838 if (!strncmp(ent->d_name,
"master-", 7)) recordinfo = 1;
844 rmsg +=
"|"; rmsg += xo;
845 if (logtag !=
"") { rmsg +=
'('; rmsg += logtag; rmsg +=
')'; }
846 rmsg +=
" proof://"; rmsg += fMgr->Host(); rmsg +=
':';
847 rmsg += fMgr->Port(); rmsg +=
'/';
848 rmsg += sdir; rmsg +=
'/'; rmsg += ent->d_name;
856 if (broadcast && ismaster) {
857 XrdOucString msg(tag);
861 msg += client->User();
862 char *bmst = fMgr->NetMgr()->ReadLogPaths(msg.c_str(), ridx);
867 }
else if (ismaster) {
870 FILE *f = fopen(wfile.c_str(),
"r");
873 while (fgets(ln,
sizeof(ln), f)) {
874 if (ln[strlen(ln)-1] ==
'\n')
875 ln[strlen(ln)-1] = 0;
877 char *ps = strchr(ln,
' ');
882 char *po = strchr(ps,
' ');
886 char *pp = strchr(po,
' ');
891 rmsg +=
"|"; rmsg += po; rmsg +=
" ";
892 if (master.length() > 0) {
896 rmsg += ln; rmsg +=
'/';
899 char *ppl = strrchr(pp,
'/');
900 pp = (ppl) ? ppl : pp;
903 bool ismst = (strstr(pp,
"master-")) ? 1 : 0;
905 XrdClientUrlInfo u((
const char *)&ln[0]);
906 XrdOucString msg(stag);
911 u.User = p->Client()->User() ? p->Client()->User() : fMgr->EffectiveUser();
912 char *bmst = fMgr->NetMgr()->ReadLogPaths(u.GetUrl().c_str(), msg.c_str(), ridx);
927 response->Send((
void *) rmsg.c_str(), rmsg.length()+1);
936 int XrdProofdAdmin::CleanupSessions(XrdProofdProtocol *p)
938 XPDLOC(ALL,
"Admin::CleanupSessions")
941 XPD_SETRESP(p, "CleanupSessions");
946 XrdProofdClient *tgtclnt = p->Client();
952 if (p->SuperUser()) {
953 int what = ntohl(p->Request()->proof.int2);
954 all = (what == 1) ? 1 : 0;
960 int len = p->Request()->header.dlen;
963 buf = p->Argp()->buff;
964 len = (len < 9) ? len : 8;
966 buf = (
char *) p->Client()->User();
967 len = strlen(p->Client()->User());
970 usr =
new char[len+1];
971 memcpy(usr, buf, len);
974 char *grp = strstr(usr,
":");
978 XrdProofdClient *c = fMgr->ClientMgr()->GetClient(usr, grp);
983 TRACEP(p, REQ,
"superuser, cleaning usr: "<< usr);
987 TRACEP(p, REQ,
"superuser, all sessions cleaned");
992 int len = strlen(tgtclnt->User()) + 1;
993 usr =
new char[len+1];
994 memcpy(usr, tgtclnt->User(), len);
1000 TRACEP(p, DBG,
"client '"<<usr<<
"' has no sessions - do nothing");
1004 bool hard = (ntohl(p->Request()->proof.int3) == 1 || p->ProofProtocol() < 18) ? 1 : 0;
1005 const char *lab = hard ?
"hard-reset" :
"soft-reset";
1008 if (fMgr->SrvType() != kXPD_Worker) {
1009 XPDFORM(cmsg,
"CleanupSessions: %s: signalling active sessions for termination", lab);
1010 response->Send(kXR_attn, kXPD_srvmsg, (
char *) cmsg.c_str(), cmsg.length());
1014 XPDFORM(cmsg,
"CleanupSessions: %s: cleaning up client: requested by: %s", lab, p->Link()->ID);
1015 int srvtype = ntohl(p->Request()->proof.int2);
1016 fMgr->ClientMgr()->TerminateSessions(tgtclnt, cmsg.c_str(), srvtype);
1019 if (hard && fMgr->SrvType() != kXPD_Worker) {
1022 XPDFORM(cmsg,
"CleanupSessions: %s: forwarding the reset request to next tier(s) ", lab);
1023 response->Send(kXR_attn, kXPD_srvmsg, 0, (
char *) cmsg.c_str(), cmsg.length());
1025 int type = ntohl(p->Request()->proof.int1);
1026 fMgr->NetMgr()->Broadcast(type, usr, p->Client()->User(), response, 1);
1034 while (twait-- > 0 &&
1035 fMgr->SessionMgr()->CheckCounter(XrdProofdProofServMgr::kCleanSessionsCnt) > 0) {
1037 XPDFORM(cmsg,
"CleanupSessions: %s: wait %d more seconds for completion ...", lab, twait);
1038 response->Send(kXR_attn, kXPD_srvmsg, 0, (
char *) cmsg.c_str(), cmsg.length());
1056 int XrdProofdAdmin::SetSessionAlias(XrdProofdProtocol *p)
1058 XPDLOC(ALL,
"Admin::SetSessionAlias")
1061 XPD_SETRESP(p, "SetSessionAlias");
1065 int psid = ntohl(p->Request()->proof.sid);
1066 XrdProofdProofServ *xps = 0;
1067 if (!p->Client() || !(xps = p->Client()->GetServer(psid))) {
1068 TRACEP(p, XERR,
"session ID not found: "<<psid);
1069 response->Send(kXR_InvalidRequest,
"SetSessionAlias: session ID not found");
1074 const char *msg = (
const char *) p->Argp()->buff;
1075 int len = p->Request()->header.dlen;
1076 if (len > kXPROOFSRVALIASMAX - 1)
1077 len = kXPROOFSRVALIASMAX - 1;
1080 if (len > 0 && msg) {
1083 XrdOucString alias(xps->Alias());
1084 TRACEP(p, DBG,
"session alias set to: "<<alias);
1098 int XrdProofdAdmin::SetSessionTag(XrdProofdProtocol *p)
1100 XPDLOC(ALL,
"Admin::SetSessionTag")
1103 XPD_SETRESP(p, "SetSessionTag");
1106 int psid = ntohl(p->Request()->proof.sid);
1107 XrdProofdProofServ *xps = 0;
1108 if (!p->Client() || !(xps = p->Client()->GetServer(psid))) {
1109 TRACEP(p, XERR,
"session ID not found: "<<psid);
1110 response->Send(kXR_InvalidRequest,
"SetSessionTag: session ID not found");
1115 const char *msg = (
const char *) p->Argp()->buff;
1116 int len = p->Request()->header.dlen;
1117 if (len > kXPROOFSRVTAGMAX - 1)
1118 len = kXPROOFSRVTAGMAX - 1;
1121 if (len > 0 && msg) {
1124 XrdOucString tag(xps->Tag());
1125 TRACEP(p, DBG,
"session tag set to: "<<tag);
1139 int XrdProofdAdmin::ReleaseWorker(XrdProofdProtocol *p)
1141 XPDLOC(ALL,
"Admin::ReleaseWorker")
1144 XPD_SETRESP(p, "ReleaseWorker");
1147 int psid = ntohl(p->Request()->proof.sid);
1148 XrdProofdProofServ *xps = 0;
1149 if (!p->Client() || !(xps = p->Client()->GetServer(psid))) {
1150 TRACEP(p, XERR,
"session ID not found: "<<psid);
1151 response->Send(kXR_InvalidRequest,
"ReleaseWorker: session ID not found");
1156 const char *msg = (
const char *) p->Argp()->buff;
1157 int len = p->Request()->header.dlen;
1158 if (len > kXPROOFSRVTAGMAX - 1)
1159 len = kXPROOFSRVTAGMAX - 1;
1162 if (len > 0 && msg) {
1163 xps->RemoveWorker(msg);
1164 TRACEP(p, DBG,
"worker \""<<msg<<
"\" released");
1165 if (TRACING(HDBG)) fMgr->NetMgr()->Dump();
1179 int XrdProofdAdmin::CheckForbiddenChars(
const char *s)
1182 if (!s || (len = strlen(s)) <= 0)
return 0;
1187 if (c ==
'(' || c ==
')' || c ==
'{' || c ==
'}' || c ==
';') {
1198 int XrdProofdAdmin::Exec(XrdProofdProtocol *p)
1200 XPDLOC(ALL,
"Admin::Exec")
1203 #if !defined(__APPLE__)
1204 const char *cmds[] = {
"rm",
"ls",
"more",
"grep",
"tail",
"md5sum",
"stat",
"find" };
1206 const char *cmds[] = {
"rm",
"ls",
"more",
"grep",
"tail",
"md5",
"stat",
"find" };
1208 const char *actcmds[] = {
"remove",
"access",
"open",
"open",
"open",
"open",
"stat",
"find"};
1211 XPD_SETRESP(p,
"Exec");
1216 XrdProofdClient *tgtclnt = p->Client();
1218 emsg =
"client instance not found";
1219 TRACEP(p, XERR, emsg);
1220 response->Send(kXR_InvalidRequest, emsg.c_str());
1225 int action = ntohl(p->Request()->proof.int2);
1226 if (action < kRm || action > kFind) {
1227 emsg =
"unknown action type: ";
1229 TRACEP(p, XERR, emsg);
1230 response->Send(kXR_InvalidRequest, emsg.c_str());
1235 int dlen = p->Request()->header.dlen;
1236 XrdOucString msg, node, path, opt;
1237 if (dlen > 0 && p->Argp()->buff) {
1238 msg.assign((
const char *)p->Argp()->buff, 0, dlen);
1242 if ((from = msg.tokenize(node, from,
'|')) != -1) {
1243 if ((from = msg.tokenize(path, from,
'|')) != -1) {
1244 from = msg.tokenize(opt, from,
'|');
1246 emsg =
"'path' not found in message";
1249 emsg =
"'node' not found in message";
1251 if (emsg.length() > 0) {
1252 TRACEP(p, XERR, emsg);
1253 response->Send(kXR_InvalidRequest, emsg.c_str());
1259 if (CheckForbiddenChars(path.c_str()) != 0) {
1260 emsg =
"none of the characters '(){};' are allowed in path string ("; emsg += path; emsg +=
")";
1261 TRACEP(p, XERR, emsg);
1262 response->Send(kXR_InvalidRequest, emsg.c_str());
1265 if (CheckForbiddenChars(opt.c_str()) != 0) {
1266 emsg =
"none of the characters '(){};' are allowed in opt string ("; emsg += opt; emsg +=
")";
1267 TRACEP(p, XERR, emsg);
1268 response->Send(kXR_InvalidRequest, emsg.c_str());
1273 XrdOucString result;
1274 bool islocal = fMgr->NetMgr()->IsLocal(node.c_str(), 1);
1275 if (fMgr->SrvType() != kXPD_Worker) {
1276 int type = ntohl(p->Request()->proof.int1);
1277 if (node ==
"all") {
1278 if (action == kStat || action == kMd5sum) {
1279 emsg =
"action cannot be run in mode 'all' - running on master only";
1280 response->Send(kXR_attn, kXPD_srvmsg, 2, (
char *)emsg.c_str(), emsg.length());
1282 fMgr->NetMgr()->Broadcast(type, msg.c_str(), p->Client()->User(), response, 0, action);
1284 }
else if (!islocal) {
1286 XrdOucString u = (p->Client()->User()) ? p->Client()->User() : fMgr->EffectiveUser();
1289 TRACEP(p, HDBG,
"sending request to "<<u);
1291 XrdClientMessage *xrsp;
1292 if (!(xrsp = fMgr->NetMgr()->Send(u.c_str(), type, msg.c_str(), 0, response, 0, action))) {
1293 TRACEP(p, XERR,
"problems sending request to "<<u);
1295 if (action == kStat || action == kMd5sum) {
1297 result.assign((
const char *) xrsp->GetData(), 0, xrsp->DataLen());
1298 }
else if (action == kRm) {
1309 if (node !=
"all" && !islocal) {
1311 if (result.length() > 0) {
1312 response->Send(result.c_str());
1321 XrdOucString cmd, pfx(fMgr->Host());
1322 pfx +=
":"; pfx += fMgr->Port();
1325 if (node !=
"all") {
1326 if (action != kStat && action != kMd5sum && action != kRm) {
1327 emsg =
"Node: "; emsg += pfx;
1329 response->Send(kXR_attn, kXPD_srvmsg, 2, (
char *)emsg.c_str(), emsg.length());
1338 XrdOucString fullpath(path);
1340 bool haswild = (fullpath.find(
'*') != STR_NPOS) ? 1 : 0;
1341 int check = (action == kMore || action == kTail ||
1342 action == kGrep || action == kMd5sum) ? 2 : 1;
1343 if ((action == kRm || action == kLs) && haswild) check = 0;
1346 if ((rccp = CheckPath(p->SuperUser(), tgtclnt->Sandbox()->Dir(),
1347 fullpath, check, sandbox, &st, emsg)) != 0) {
1349 emsg = cmds[action];
1350 emsg +=
": cannot ";
1351 emsg += actcmds[action];
1354 emsg +=
"': No such file or directory";
1355 }
else if (rccp == -3) {
1356 emsg = cmds[action];
1357 emsg +=
": cannot stat ";
1359 emsg +=
": errno: ";
1360 emsg += (int) errno;
1361 }
else if (rccp == -4) {
1362 emsg = cmds[action];
1365 emsg +=
": Is not a regular file";
1367 TRACEP(p, XERR, emsg);
1368 response->Send(kXR_InvalidRequest, emsg.c_str());
1373 if (action == kRm) {
1377 emsg =
"not allowed to rm with wild cards on path: ";
1379 TRACEP(p, XERR, emsg);
1380 response->Send(kXR_InvalidRequest, emsg.c_str());
1383 if ((
int) st.st_uid != tgtclnt->UI().fUid || (int) st.st_gid != tgtclnt->UI().fGid) {
1384 emsg =
"rm on path: ";
1386 emsg +=
" requires ownership; path owned by: (";
1387 emsg += (int) st.st_uid; emsg +=
",";
1388 emsg += (
int) st.st_gid; emsg +=
")";
1389 TRACEP(p, XERR, emsg);
1390 response->Send(kXR_InvalidRequest, emsg.c_str());
1395 const char *sbdir[5] = {
"queries",
"packages",
"cache",
"datasets",
"data"};
1396 while (fullpath.endswith(
'/'))
1397 fullpath.erasefromend(1);
1398 XrdOucString sball(tgtclnt->Sandbox()->Dir()), sball1 = sball;
1399 sball +=
"/*"; sball1 +=
"/*/";
1400 if (fullpath == sball || fullpath == sball1) {
1401 emsg =
"removing all sandbox directory is not allowed: ";
1403 TRACEP(p, XERR, emsg);
1404 response->Send(kXR_InvalidRequest, emsg.c_str());
1409 if (fullpath.endswith(sbdir[kk])) {
1410 emsg =
"removing a basic sandbox directory is not allowed: ";
1412 TRACEP(p, XERR, emsg);
1413 response->Send(kXR_InvalidRequest, emsg.c_str());
1421 if (opt.length() <= 0) opt =
"-f";
1422 cmd +=
" "; cmd += opt;
1423 cmd +=
" "; cmd += fullpath;
1428 XrdOucString rederr;
1432 if (opt.length() <= 0) opt =
"-C";
1450 if (action != kFind) {
1451 if (cmd.length() > 0) cmd +=
" ";
1452 if (opt.length() > 0) { cmd += opt; cmd +=
" ";}
1455 cmd +=
" "; cmd += fullpath;
1456 if (opt.length() > 0) { cmd +=
" "; cmd += opt; }
1458 if (rederr.length() > 0) cmd += rederr;
1463 if (ExecCmd(p, response, action, cmd.c_str(), emsg) != 0) {
1464 TRACEP(p, XERR, emsg);
1465 response->Send(kXR_ServerError, emsg.c_str());
1471 response->Send(emsg.c_str());
1474 response->Send(
"OK");
1493 int XrdProofdAdmin::ExecCmd(XrdProofdProtocol *p, XrdProofdResponse *r,
1494 int action,
const char *cmd, XrdOucString &emsg)
1496 XPDLOC(ALL,
"Admin::ExecCmd")
1499 XrdOucString pfx = emsg;
1503 if (!cmd || strlen(cmd) <= 0) {
1504 emsg =
"undefined command!";
1510 if (!pp.IsValid()) {
1511 emsg =
"cannot create the pipe";
1516 TRACEP(p, DBG,
"forking to execute in the private sandbox");
1518 if (!(pid = fMgr->Sched()->Fork(
"adminexeccmd"))) {
1522 if (fMgr->SessionMgr()->SetUserEnvironment(p) != 0) {
1523 emsg =
"SetUserEnvironment did not return OK";
1527 if (action == kStat) {
1529 if ((stat(cmd, &st)) != 0) {
1530 if (errno == ENOENT) {
1531 emsg +=
"stat: cannot stat `";
1533 emsg +=
"': No such file or directory";
1535 emsg +=
"stat: cannot stat ";
1537 emsg +=
": errno: ";
1538 emsg += (int) errno;
1543 int islink = S_ISLNK(st.st_mode);
1544 snprintf(msg, 256,
"%ld %ld %d %d %d %lld %ld %d", (
long)st.st_dev,
1545 (
long)st.st_ino, st.st_mode, (
int)(st.st_uid),
1546 (
int)(st.st_gid), (kXR_int64)st.st_size, st.st_mtime, islink);
1551 FILE *fp = popen(cmd,
"r");
1553 emsg =
"could not run '"; emsg += cmd; emsg +=
"'";
1557 int pfxlen = pfx.length();
1561 int bufsiz = 1024, left = bufsiz - 1, lines = 0;
1562 while (fgets(line,
sizeof(line), fp)) {
1564 int llen = strlen(line);
1567 if (lines == 1 && action == kMd5sum) {
1568 if (line[llen-1] ==
'\n') {
1569 line[llen-1] =
'\0';
1572 #if !defined(__APPLE__)
1574 XrdOucString sl(line);
1575 sl.tokenize(emsg, 0,
' ');
1578 XrdOucString sl(line), tkn;
1580 while ((from = sl.tokenize(tkn, from,
' ')) != STR_NPOS) {
1587 if ((llen + pfxlen) > left) {
1589 if (buf[len-1] ==
'\n') buf[len-1] =
'\0';
1590 if (r->Send(kXR_attn, kXPD_srvmsg, 2, (
char *) &buf[0], len) != 0) {
1591 emsg =
"error sending message to requester";
1601 memcpy(buf+len, pfx.c_str(), pfxlen);
1606 memcpy(buf+len, line, llen);
1610 if (lines > 0 && !(lines % 10)) {
1612 if (p->Link()->Peek(&b[0], 1, 0) == 1) {
1613 p->Process(p->Link());
1614 if (p->IsCtrlC())
break;
1621 if (buf[len-1] ==
'\n') buf[len-1] =
'\0';
1622 if (r->Send(kXR_attn, kXPD_srvmsg, 2, (
char *) &buf[0], len) != 0) {
1623 emsg =
"error sending message to requester";
1629 if ((rcpc = pclose(fp)) == -1) {
1630 emsg =
"could not close the command pipe";
1633 if (WEXITSTATUS(rcpc) != 0) {
1634 emsg =
"failure: return code: ";
1635 emsg += (int) WEXITSTATUS(rcpc);
1644 if (pp.Post(-1, emsg.c_str()) != 0) rc = 1;
1648 if (pp.Post(0, emsg.c_str()) != 0) rc = 1;
1656 emsg =
"forking failed - errno: "; emsg += (int) errno;
1661 TRACEP(p, DBG,
"forking OK: wait for information");
1664 int prc = 0, rst = -1;
1666 while (rst < 0 && rc >= 0) {
1667 while ((prc = pp.Poll(60)) > 0) {
1669 if (pp.Recv(msg) != 0) {
1670 emsg =
"error receiving message from pipe";
1679 if (buf.length() <= 0) {
1680 emsg =
"error reading string from received message";
1686 if (action == kMd5sum || action == kStat) {
1688 if (buf.length() <= 0) {
1689 emsg =
"error reading string from received message";
1700 emsg =
"timeout from poll";
1702 }
else if (prc < 0) {
1703 emsg =
"error from poll - errno: "; emsg += -prc;
1715 int XrdProofdAdmin::CheckPath(
bool superuser,
const char *sbdir,
1716 XrdOucString &fullpath,
int check,
bool &sandbox,
1717 struct stat *st, XrdOucString &emsg)
1719 if (!sbdir || strlen(sbdir) <= 0) {
1720 emsg =
"CheckPath: sandbox dir undefined!";
1725 XrdOucString path(fullpath);
1727 if (path.beginswith(
'/')) {
1729 if (fullpath.beginswith(sbdir)) sandbox = 1;
1731 if (path.beginswith(
"../")) path.erase(0,2);
1732 if (path.beginswith(
"./") || path.beginswith(
"~/")) path.erase(0,1);
1733 if (!path.beginswith(
"/")) path.insert(
'/',0);
1738 fullpath.replace(
"//",
"/");
1741 if (!sandbox && !superuser) {
1743 std::list<XrdOucString>::iterator si = fExportPaths.begin();
1744 while (si != fExportPaths.end()) {
1745 if (path.beginswith((*si).c_str())) {
1752 emsg =
"CheckPath: not allowed to run the requested action on ";
1758 if (check > 0 && st) {
1760 if (stat(fullpath.c_str(), st) != 0) {
1761 if (errno == ENOENT) {
1769 if ((check == 2) && !S_ISREG(st->st_mode))
return -4;
1779 int XrdProofdAdmin::GetFile(XrdProofdProtocol *p)
1781 XPDLOC(ALL,
"Admin::GetFile")
1784 XPD_SETRESP(p, "GetFile");
1789 XrdProofdClient *tgtclnt = p->Client();
1791 emsg =
"client instance not found";
1792 TRACEP(p, XERR, emsg);
1793 response->Send(kXR_InvalidRequest, emsg.c_str());
1798 int dlen = p->Request()->header.dlen;
1800 if (dlen > 0 && p->Argp()->buff) {
1801 path.assign((
const char *)p->Argp()->buff, 0, dlen);
1802 if (path.length() <= 0) {
1803 TRACEP(p, XERR,
"path missing!");
1804 response->Send(kXR_InvalidRequest,
"path missing!");
1811 XrdOucString fullpath(path);
1812 bool sandbox = 0, check = 2;
1815 if ((rccp = CheckPath(p->SuperUser(), tgtclnt->Sandbox()->Dir(),
1816 fullpath, check, sandbox, &st, emsg)) != 0) {
1818 emsg =
"Cannot open `";
1820 emsg +=
"': No such file or directory";
1821 }
else if (rccp == -3) {
1822 emsg =
"Cannot stat `";
1824 emsg +=
"': errno: ";
1825 emsg += (int) errno;
1826 }
else if (rccp == -4) {
1828 emsg +=
" is not a regular file";
1830 TRACEP(p, XERR, emsg);
1831 response->Send(kXR_InvalidRequest, emsg.c_str());
1837 if (!pp.IsValid()) {
1838 emsg =
"cannot create the pipe for internal communications";
1839 TRACEP(p, XERR, emsg);
1840 response->Send(kXR_InvalidRequest, emsg.c_str());
1844 TRACEP(p, DBG,
"forking to execute in the private sandbox");
1846 if (!(pid = fMgr->Sched()->Fork(
"admingetfile"))) {
1851 if (fMgr->SessionMgr()->SetUserEnvironment(p) != 0) {
1852 emsg =
"SetUserEnvironment did not return OK";
1857 int fd = open(fullpath.c_str(), O_RDONLY);
1859 emsg =
"cannot open file: ";
1861 emsg +=
" - errno:";
1862 emsg += (int) errno;
1863 TRACEP(p, XERR, emsg);
1864 response->Send(kXR_ServerError, emsg.c_str());
1870 snprintf(sizmsg, 64,
"%lld", (kXR_int64) st.st_size);
1871 response->Send((
const char *) &sizmsg[0]);
1872 TRACEP(p, XERR,
"size is "<<sizmsg<<
" bytes");
1875 const int kMAXBUF = 16384;
1878 lseek(fd, pos, SEEK_SET);
1880 while (rc == 0 && pos < st.st_size) {
1881 off_t left = st.st_size - pos;
1882 if (left > kMAXBUF) left = kMAXBUF;
1885 while ((siz = read(fd, &buf[0], left)) < 0 && errno == EINTR)
1887 if (siz < 0 || siz != left) {
1888 emsg =
"error reading from file: errno: ";
1889 emsg += (int) errno;
1895 if ((src = response->Send(kXR_attn, kXPD_msg, (
void *)&buf[0], left)) != 0) {
1896 emsg =
"error reading from file: errno: ";
1904 if (pp.Post(0,
"") != 0) {
1913 TRACEP(p, XERR, emsg);
1914 response->Send(kXR_attn, kXPD_srvmsg, 0, (
char *) emsg.c_str(), emsg.length());
1922 if (pp.Post(-1, emsg.c_str()) != 0) rc = 1;
1925 if (pp.Post(1,
"") != 0) rc = 1;
1934 emsg =
"forking failed - errno: "; emsg += (int) errno;
1935 TRACEP(p, XERR, emsg);
1936 response->Send(kXR_ServerError, emsg.c_str());
1941 TRACEP(p, DBG,
"forking OK: execution will continue in the child process");
1944 int prc = 0, rst = 0;
1946 while (rst == 0 && rc >= 0) {
1947 while ((prc = pp.Poll(60)) > 0) {
1949 if (pp.Recv(msg) != 0) {
1950 emsg =
"error receiving message from pipe";
1961 if (emsg.length() <= 0) {
1962 emsg =
"error reading string from received message";
1966 }
else if (rst > 0) {
1972 emsg =
"timeout from poll";
1974 }
else if (prc < 0) {
1975 emsg =
"error from poll - errno: "; emsg += -prc;
1981 TRACEP(p, DBG,
"execution over: "<< ((rc == 0) ?
"ok" :
"failed"));
1990 int XrdProofdAdmin::PutFile(XrdProofdProtocol *p)
1992 XPDLOC(ALL,
"Admin::PutFile")
1995 XPD_SETRESP(p, "PutFile");
2000 XrdProofdClient *tgtclnt = p->Client();
2002 emsg =
"client instance not found";
2003 TRACEP(p, XERR, emsg);
2004 response->Send(kXR_InvalidRequest, emsg.c_str());
2009 kXR_int64 size = -1;
2010 int dlen = p->Request()->header.dlen;
2011 XrdOucString cmd, path, ssiz, opt;
2012 if (dlen > 0 && p->Argp()->buff) {
2013 cmd.assign((
const char *)p->Argp()->buff, 0, dlen);
2014 if (cmd.length() <= 0) {
2015 TRACEP(p, XERR,
"input buffer missing!");
2016 response->Send(kXR_InvalidRequest,
"input buffer missing!");
2020 if ((from = cmd.tokenize(path, from,
' ')) < 0) {
2021 TRACEP(p, XERR,
"cannot resolve path!");
2022 response->Send(kXR_InvalidRequest,
"cannot resolve path!");
2025 if ((from = cmd.tokenize(ssiz, from,
' ')) < 0) {
2026 TRACEP(p, XERR,
"cannot resolve word with size!");
2027 response->Send(kXR_InvalidRequest,
"cannot resolve word with size!");
2031 size = atoll(ssiz.c_str());
2033 TRACEP(p, XERR,
"cannot resolve size!");
2034 response->Send(kXR_InvalidRequest,
"cannot resolve size!");
2038 cmd.tokenize(opt, from,
' ');
2040 TRACEP(p, DBG,
"path: '"<<path<<
"'; size: "<<size<<
" bytes; opt: '"<<opt<<
"'");
2043 kXR_unt32 openflags = O_WRONLY | O_TRUNC | O_CREAT;
2044 kXR_unt32 modeflags = 0600;
2048 XrdOucString fullpath(path);
2049 bool sandbox = 0, check = 1;
2052 if ((rccp = CheckPath(p->SuperUser(), tgtclnt->Sandbox()->Dir(),
2053 fullpath, check, sandbox, &st, emsg)) != 0) {
2057 emsg +=
"' exists but cannot be stat: errno: ";
2058 emsg += (int) errno;
2061 TRACEP(p, XERR, emsg);
2062 response->Send(kXR_InvalidRequest, emsg.c_str());
2067 if (opt ==
"force") {
2068 openflags = O_WRONLY | O_TRUNC;
2072 emsg +=
"' exists; user option 'force' to override it";
2073 TRACEP(p, XERR, emsg);
2074 response->Send(kXR_InvalidRequest, emsg.c_str());
2081 if (!pp.IsValid()) {
2082 emsg =
"cannot create the pipe for internal communications";
2083 TRACEP(p, XERR, emsg);
2084 response->Send(kXR_InvalidRequest, emsg.c_str());
2088 TRACEP(p, DBG,
"forking to execute in the private sandbox");
2090 if (!(pid = fMgr->Sched()->Fork(
"adminputfile"))) {
2094 if (fMgr->SessionMgr()->SetUserEnvironment(p) != 0) {
2095 emsg =
"SetUserEnvironment did not return OK";
2099 int fd = open(fullpath.c_str(), openflags, modeflags);
2101 emsg =
"cannot open file: ";
2103 emsg +=
" - errno: ";
2104 emsg += (int) errno;
2105 TRACEP(p, XERR, emsg);
2106 response->Send(kXR_ServerError, emsg.c_str());
2111 response->Send(
"OK");
2113 const int kMAXBUF = XrdProofdProtocol::MaxBuffsz();
2115 XrdBuffer *argp = XrdProofdProtocol::GetBuff(kMAXBUF);
2117 emsg =
"cannot get buffer to read data out";
2121 kXR_int64 filesize = 0, left = 0;
2122 while (rc == 0 && filesize < size) {
2123 left = size - filesize;
2124 if (left > kMAXBUF) left = kMAXBUF;
2126 TRACEP(p, ALL,
"receiving "<<left<<
" ...");
2127 if ((rc = p->GetData(
"data", argp->buff, left))) {
2128 XrdProofdProtocol::ReleaseBuff(argp);
2129 emsg =
"cannot read data out";
2136 char *b = argp->buff;
2140 while ((w = write(fd, b, r)) < 0 && errno == EINTR)
2143 emsg =
"error writing to unit: ";
2152 if (pp.Post(0,
"") != 0) {
2160 XrdProofdProtocol::ReleaseBuff(argp);
2163 TRACEP(p, XERR, emsg);
2164 response->Send(kXR_attn, kXPD_srvmsg, 0, (
char *) emsg.c_str(), emsg.length());
2171 if (pp.Post(-1, emsg.c_str()) != 0) rc = 1;
2174 if (pp.Post(1,
"") != 0) rc = 1;
2182 emsg =
"forking failed - errno: "; emsg += (int) errno;
2183 TRACEP(p, XERR, emsg);
2184 response->Send(kXR_ServerError, emsg.c_str());
2189 TRACEP(p, DBG,
"forking OK: execution will continue in the child process");
2192 int prc = 0, rst = 0;
2194 while (rst == 0 && rc >= 0) {
2195 while ((prc = pp.Poll(60)) > 0) {
2197 if (pp.Recv(msg) != 0) {
2198 emsg =
"error receiving message from pipe";
2209 if (emsg.length() <= 0) {
2210 emsg =
"error reading string from received message";
2214 }
else if (rst > 0) {
2220 emsg =
"timeout from poll";
2222 }
else if (prc < 0) {
2223 emsg =
"error from poll - errno: "; emsg += -prc;
2229 TRACEP(p, DBG,
"execution over: "<< ((rc == 0) ?
"ok" :
"failed"));
2238 int XrdProofdAdmin::CpFile(XrdProofdProtocol *p)
2240 XPDLOC(ALL,
"Admin::CpFile")
2243 XPD_SETRESP(p, "CpFile");
2248 XrdProofdClient *tgtclnt = p->Client();
2250 emsg =
"client instance not found";
2251 TRACEP(p, XERR, emsg);
2252 response->Send(kXR_InvalidRequest, emsg.c_str());
2257 int dlen = p->Request()->header.dlen;
2258 XrdOucString buf, src, dst, fmt;
2259 if (dlen > 0 && p->Argp()->buff) {
2260 buf.assign((
const char *)p->Argp()->buff, 0, dlen);
2261 if (buf.length() <= 0) {
2262 TRACEP(p, XERR,
"input buffer missing!");
2263 response->Send(kXR_InvalidRequest,
"input buffer missing!");
2267 if ((from = buf.tokenize(src, from,
' ')) < 0) {
2268 TRACEP(p, XERR,
"cannot resolve src path!");
2269 response->Send(kXR_InvalidRequest,
"cannot resolve src path!");
2272 if ((from = buf.tokenize(dst, from,
' ')) < 0) {
2273 TRACEP(p, XERR,
"cannot resolve dst path!");
2274 response->Send(kXR_InvalidRequest,
"cannot resolve dst path!");
2278 fmt.assign(buf, from);
2280 TRACEP(p, DBG,
"src: '"<<src<<
"'; dst: '"<<dst<<
"'; fmt: '"<<fmt<<
"'");
2284 XrdClientUrlInfo usrc(src.c_str());
2285 if (usrc.Proto.length() > 0 && usrc.Proto !=
"file") {
2287 if (!fAllowedCpCmds.Find(usrc.Proto.c_str())) {
2288 TRACEP(p, XERR,
"protocol for source file not supported");
2289 response->Send(kXR_InvalidRequest,
"protocol for source file not supported");
2293 if (usrc.Proto ==
"file") src = usrc.File;
2295 XrdClientUrlInfo udst(dst.c_str());
2296 if (udst.Proto.length() > 0 && udst.Proto !=
"file") {
2298 if (!fAllowedCpCmds.Find(udst.Proto.c_str())) {
2299 TRACEP(p, XERR,
"protocol for destination file not supported");
2300 response->Send(kXR_InvalidRequest,
"protocol for destination file not supported");
2304 if (udst.Proto ==
"file") dst = udst.File;
2310 XpdAdminCpCmd *xc = 0;
2311 if (!locsrc && !locdst) {
2313 TRACEP(p, XERR,
"At least destination or source must be local");
2314 response->Send(kXR_InvalidRequest,
"At least destination or source must be local");
2316 }
else if (!locdst) {
2318 xc = fAllowedCpCmds.Find(udst.Proto.c_str());
2320 TRACEP(p, XERR,
"not allowed to create destination file with the chosen protocol");
2321 response->Send(kXR_InvalidRequest,
"not allowed to create destination file with the chosen protocol");
2326 }
else if (!locsrc) {
2328 xc = fAllowedCpCmds.Find(usrc.Proto.c_str());
2333 xc = fAllowedCpCmds.Find(
"file");
2337 XrdOucString srcpath(src), dstpath(dst);
2338 bool sbsrc = 0, sbdst = 0;
2339 struct stat stsrc, stdst;
2340 int rccpsrc = 0, rccpdst = 0;
2341 if (loc2loc || loc2rem) {
2342 if ((rccpsrc = CheckPath(p->SuperUser(), tgtclnt->Sandbox()->Dir(),
2343 srcpath, 2, sbsrc, &stsrc, emsg)) != 0) {
2344 if (rccpsrc == -2) {
2346 emsg +=
": cannot open `";
2348 emsg +=
"': No such file or directory";
2349 }
else if (rccpsrc == -3) {
2351 emsg +=
": cannot stat ";
2353 emsg +=
": errno: ";
2354 emsg += (int) errno;
2355 }
else if (rccpsrc == -4) {
2359 emsg +=
": Is not a regular file";
2361 TRACEP(p, XERR, emsg);
2362 response->Send(kXR_InvalidRequest, emsg.c_str());
2366 if (loc2loc || rem2loc) {
2367 if ((rccpdst = CheckPath(p->SuperUser(), tgtclnt->Sandbox()->Dir(),
2368 dstpath, 0, sbdst, &stdst, emsg)) != 0) {
2369 if (rccpdst == -2) {
2371 emsg +=
": cannot open `";
2373 emsg +=
"': No such file or directory";
2374 }
else if (rccpdst == -3) {
2376 emsg +=
": cannot stat ";
2378 emsg +=
": errno: ";
2379 emsg += (int) errno;
2380 }
else if (rccpdst == -4) {
2384 emsg +=
": Is not a regular file";
2386 TRACEP(p, XERR, emsg);
2387 response->Send(kXR_InvalidRequest, emsg.c_str());
2393 if (fmt.length() <= 0) {
2396 if (!fmt.beginswith(xc->fCmd)) {
2398 fmt.insert(xc->fCmd, 0);
2400 if (fmt.find(
"%s") == STR_NPOS) {
2401 fmt.insert(
" %s %s", -1);
2407 XrdProofdAux::Form(cmd, fmt.c_str(), srcpath.c_str(), dstpath.c_str());
2409 TRACEP(p, DBG,
"Executing command: " << cmd);
2413 if (!pp.IsValid()) {
2414 emsg =
"cannot create the pipe";
2415 TRACEP(p, XERR, emsg);
2416 response->Send(kXR_ServerError, emsg.c_str());
2421 TRACEP(p, DBG,
"forking to execute in the private sandbox");
2423 if (!(pid = fMgr->Sched()->Fork(
"admincpfile"))) {
2427 if (fMgr->SessionMgr()->SetUserEnvironment(p) != 0) {
2428 emsg =
"SetUserEnvironment did not return OK";
2432 FILE *fp = popen(cmd.c_str(),
"r");
2434 emsg =
"could not run '"; emsg += cmd; emsg +=
"'";
2439 while (fgets(line,
sizeof(line), fp)) {
2441 int llen = strlen(line);
2442 if (llen > 0 && line[llen-1] ==
'\n') {
2443 line[llen-1] =
'\0';
2448 response->Send(kXR_attn, kXPD_srvmsg, 4, (
char *) &line[0], llen) != 0) {
2449 emsg =
"error sending message to requester";
2455 if (p->Link()->Peek(&b[0], 1, 0) == 1) {
2456 p->Process(p->Link());
2457 if (p->IsCtrlC())
break;
2460 if (pp.Post(0,
"") != 0) {
2467 if ((rcpc = pclose(fp)) == -1) {
2468 emsg =
"error while trying to close the command pipe";
2471 if (WEXITSTATUS(rcpc) != 0) {
2472 emsg =
"return code: ";
2473 emsg += (int) WEXITSTATUS(rcpc);
2477 char cp[1] = {
'\n'};
2478 if (response->Send(kXR_attn, kXPD_srvmsg, 3, (
char *) &cp[0], 1) != 0) {
2479 emsg =
"error sending progress notification to requester";
2487 if (pp.Post(-1, emsg.c_str()) != 0) rc = 1;
2491 if (pp.Post(1,
"") != 0) rc = 1;
2499 emsg =
"forking failed - errno: "; emsg += (int) errno;
2504 TRACEP(p, DBG,
"forking OK: wait for execution");
2507 int prc = 0, rst = 0;
2509 while (rst == 0 && rc >= 0) {
2510 while ((prc = pp.Poll(60)) > 0) {
2512 if (pp.Recv(msg) != 0) {
2513 emsg =
"error receiving message from pipe";;
2524 if (emsg.length() <= 0)
2525 emsg =
"error reading string from received message";
2526 }
else if (rst == 1) {
2532 emsg =
"timeout from poll";
2534 }
else if (prc < 0) {
2535 emsg =
"error from poll - errno: "; emsg += -prc;
2541 TRACEP(p, DBG,
"execution over: "<< ((rc == 0) ?
"ok" :
"failed"));
2544 emsg.insert(
"failure: ", 0);
2545 TRACEP(p, XERR, emsg);
2546 response->Send(kXR_ServerError, emsg.c_str());
2548 response->Send(
"OK");