28 #include "XrdOuc/XrdOucStream.hh"
30 #include "XrdVersion.hh"
31 #include "Xrd/XrdBuffer.hh"
32 #include "Xrd/XrdScheduler.hh"
// File-scope state shared by every XrdProofdProtocol instance.
// XrdProofdTrace starts out null; it is allocated in XrdgetProtocolPort() and
// replaced in XrdProofdProtocol::Configure().
50 XrdOucTrace *XrdProofdTrace = 0;
53 static XrdSysLogger gMainLogger;
// Counter printed by Stats().
57 int XrdProofdProtocol::fgCount = 0;
// Queue of protocol objects: Match() pops from it, Recycle() pushes onto it.
58 XpdObjectQ XrdProofdProtocol::fgProtStack(
"ProtStack",
59 "xproofd protocol anchor");
// fgBMutex is held around the fgBPool calls made in GetBuff(), ReleaseBuff()
// and SendData().
60 XrdSysRecMutex XrdProofdProtocol::fgBMutex;
61 XrdBuffManager *XrdProofdProtocol::fgBPool = 0;
// Set from fgBPool->MaxSize() in Configure(); SendData()/SendDataN() never
// request a chunk larger than this.
62 int XrdProofdProtocol::fgMaxBuffsz= 0;
// Error destination; Configure() attaches the logger to it.
63 XrdSysError XrdProofdProtocol::fgEDest(0,
"xpd");
64 XrdSysLogger *XrdProofdProtocol::fgLogger = 0;
67 bool XrdProofdProtocol::fgConfigDone = 0;
// Wait value handed to the link Peek()/Recv() calls; copied from
// pi->readWait in Configure().
69 int XrdProofdProtocol::fgReadWait = 0;
// Global manager instance, created in Configure().
71 XrdProofdManager *XrdProofdProtocol::fgMgr = 0;
// Effective uid as returned by geteuid() in Configure(); -1 until then.
74 int XrdProofdProtocol::fgEUidAtStartup = -1;
// Helper macros used when applying configuration values.
// XPDCOND(n,ns) is true when both arguments are -1, or when n is positive and
// not smaller than ns. XPDSETSTRING and XPDSETINT only act when XPDCOND(n,ns)
// holds, and in that case they also copy n into ns. XPDSETINT stores the
// base-10 strtol() conversion of s.c_str() into i.
81 #define XPDCOND(n,ns) ((n == -1 && ns == -1) || (n > 0 && n >= ns))
// XPDSETSTRING: release c with SafeFree, then point it to a strdup'ed copy of
// s.c_str().
84 #define XPDSETSTRING(n,ns,c,s) \
85 { if (XPDCOND(n,ns)) { \
86 SafeFree(c); c = strdup(s.c_str()); ns = n; }}
// XPDADOPTSTRING is only defined when not already provided elsewhere.
89 #ifndef XPDADOPTSTRING
90 #define XPDADOPTSTRING(n,ns,c,s) \
92 XPDSETSTRING(n, ns, t, s); \
93 if (t && strlen(t)) { \
100 #define XPDSETINT(n,ns,i,s) \
101 { if (XPDCOND(n,ns)) { \
102 i = strtol(s.c_str(),0,10); ns = n; }}
// Guard object holding a protocol pointer and a request type.
// Its destructor calls ResetCtrlC() on the protocol, except when the pointer
// is null or the stored request type is kXP_ctrlc.
112 typedef struct ResetCtrlcGuard {
113 XrdProofdProtocol *xpd;
115 ResetCtrlcGuard(XrdProofdProtocol *p,
int t) : xpd(p), type(t) { }
116 ~ResetCtrlcGuard() {
if (xpd && type != kXP_ctrlc) xpd->ResetCtrlC(); }
// Configuration helper derived from XrdProofdConfig.
// In this file it is used by XrdgetProtocolPort() to read the port (fPort)
// from the "port" and "xrd.protocol" directives.
121 class XrdProofdProtCfg :
public XrdProofdConfig {
// cfg: configuration file name; edest: optional error destination.
124 XrdProofdProtCfg(
const char *cfg, XrdSysError *edest = 0);
// Directive handler; extracts the port value.
125 int DoDirective(XrdProofdDirective *,
char *, XrdOucStream *,
bool);
// Registers the directives handled by DoDirective().
126 void RegisterDirectives();
// Constructor: forwards the config file name and the error destination to
// the XrdProofdConfig base, then registers this class's directives.
132 XrdProofdProtCfg::XrdProofdProtCfg(
const char *cfg, XrdSysError *edest)
133 : XrdProofdConfig(cfg, edest)
136 RegisterDirectives();
// Register the two directives handled by this class, "port" and
// "xrd.protocol". Both are bound to this object through DoDirectiveClass.
142 void XrdProofdProtCfg::RegisterDirectives()
144 Register(
"port",
new XrdProofdDirective(
"port",
this, &DoDirectiveClass));
145 Register(
"xrd.protocol",
new XrdProofdDirective(
"xrd.protocol",
this, &DoDirectiveClass));
// Handle one registered directive and extract the port number into fPort.
//   d   : directive being processed; its fName selects the branch taken
//   val : initial port string (copied into the local `port`)
//   cfg : configuration stream, read for the "xrd.protocol" directive
// The last (bool) parameter is unnamed and not used.
151 int XrdProofdProtCfg::DoDirective(XrdProofdDirective *d,
152 char *val, XrdOucStream *cfg,
bool)
156 XrdOucString port(val);
// For "xrd.protocol" the port is taken from the next word on the stream,
// with the "xproofd:" prefix removed.
157 if (d->fName ==
"xrd.protocol") {
158 port = cfg->GetWord();
159 port.replace(
"xproofd:",
"");
160 }
else if (d->fName !=
"port") {
// A non-empty string is converted with strtol() in base 10.
163 if (port.length() > 0) {
164 fPort = strtol(port.c_str(), 0, 10);
// Negative values are replaced by the default port.
166 fPort = (fPort < 0) ? XPD_DEF_PORT : fPort;
// Version information records for the two plugin entry points below; this
// section is only compiled when ROOTXRDVERS >= 300030000.
170 #if (ROOTXRDVERS >= 300030000)
171 XrdVERSIONINFO(XrdgetProtocol,xproofd);
172 XrdVERSIONINFO(XrdgetProtocolPort,xproofd);
// Plugin entry point returning the protocol object.
//   parms : protocol parameters, passed on to Configure()
//   pi    : protocol configuration, passed to Configure() and the constructor
// The first (const char *) argument is unnamed and unused.
// When XrdProofdProtocol::Configure() returns non-zero a new
// XrdProofdProtocol is returned; a null pointer is returned otherwise.
181 XrdProtocol *XrdgetProtocol(
const char *,
char *parms, XrdProtocol_Config *pi)
184 if (XrdProofdProtocol::Configure(parms, pi)) {
186 return (XrdProtocol *)
new XrdProofdProtocol(pi);
188 return (XrdProtocol *)0;
// Plugin entry point returning the port to be used by the protocol.
// Only `pi` is used; the first two arguments are unnamed.
195 int XrdgetProtocolPort(
const char * ,
char * , XrdProtocol_Config *pi)
// Start from the default port.
198 int port = XPD_DEF_PORT;
// Configuration helper built on the config file and error destination in pi.
201 XrdProofdProtCfg pcfg(pi->ConfigFN, pi->eDest);
// Allocate the global trace object on the same error destination.
203 XrdProofdTrace =
new XrdOucTrace(pi->eDest);
// A positive fPort on the helper is tested first.
206 if (pcfg.fPort > 0) {
// Fallback expression: pi->Port when positive, else the default port.
209 port = (pi && pi->Port > 0) ? pi->Port : XPD_DEF_PORT;
// Constructor. Passes the handler name to the XrdProtocol base and binds
// fProtLink to this object.
//   pi : protocol configuration; may be null
218 XrdProofdProtocol::XrdProofdProtocol(XrdProtocol_Config *pi)
219 : XrdProtocol(
"xproofd protocol handler"), fProtLink(this)
// Reserve room for 10 response objects up front.
226 fResponses.reserve(10);
// Descriptor used for stderr: the error destination's baseFD() when pi and
// pi->eDest are set, otherwise fileno(stderr).
228 fStdErrFD = (pi && pi->eDest) ? pi->eDest->baseFD() : fileno(stderr);
// Look up the response object associated with stream id `sid`.
// Returns the stored pointer, or a null pointer when none is available.
237 XrdProofdResponse *XrdProofdProtocol::Response(kXR_unt16 sid)
239 XPDLOC(ALL,
"Protocol::Response")
241 TRACE(HDBG, "sid: "<<sid<<", size: "<<fResponses.size());
// The entry for stream id `sid` is stored at index sid-1.
244 if (sid <= fResponses.size())
245 return fResponses[sid-1];
247 return (XrdProofdResponse *)0;
// Return the response object for stream id `sid`, extending fResponses with
// new XrdProofdResponse objects when `sid` is beyond the current size.
// A null pointer is returned on the "wrong sid" path.
253 XrdProofdResponse *XrdProofdProtocol::GetNewResponse(kXR_unt16 sid)
255 XPDLOC(ALL,
"Protocol::GetNewResponse")
258 XPDFORM(msg, "sid: %d", sid);
260 if (sid > fResponses.size()) {
// Grow the capacity first when needed: double it, unless that still does not
// exceed sid, in which case sid+1 is used.
261 if (sid > fResponses.capacity()) {
262 int newsz = (sid < 2 * fResponses.capacity()) ? 2 * fResponses.capacity() : sid+1 ;
263 fResponses.reserve(newsz);
265 msg +=
" new capacity: ";
266 msg += (int) fResponses.capacity();
// Number of entries missing to reach index sid-1.
269 int nnew = sid - fResponses.size();
271 fResponses.push_back(
new XrdProofdResponse());
273 msg +=
"; new size: ";
274 msg += (int) fResponses.size();
278 TRACE(XERR,
"wrong sid: "<<sid);
279 return (XrdProofdResponse *)0;
// As in Response(), the entry for `sid` is stored at index sid-1.
285 return fResponses[sid-1];
// Check whether the link `lp` speaks the xproofd protocol.
// The visible steps are: peek the client handshake, validate its fields
// (falling back to the xrootd protocol matcher when they do not fit), send
// the server handshake, consume the peeked bytes, and obtain a protocol
// object for the link. `xp` holds the protocol pointer produced here.
291 XrdProtocol *XrdProofdProtocol::Match(XrdLink *lp)
293 XPDLOC(ALL,
"Protocol::Match")
295 struct ClientInitHandShake hsdata;
296 char *hsbuff = (
char *)&hsdata;
// Handshake reply, built once; carries XPROOFD_VERSBIN in network byte order.
298 static hs_response_t hsresp = {0, 0, kXR_int32(htonl(XPROOFD_VERSBIN)), 0};
300 XrdProtocol *xp =
nullptr;
302 TRACE(HDBG,
"enter");
// Peek a full handshake structure, using fgReadWait as the wait value.
306 if ((dlen = lp->Peek(hsbuff,
sizeof(hsdata),fgReadWait)) !=
sizeof(hsdata)) {
307 if (dlen <= 0) lp->setEtext(
"Match: handshake not received");
// A first word equal to 8 is reported as the unsupported rootd file serving.
310 hsdata.first = ntohl(hsdata.first);
311 if (hsdata.first == 8) {
312 emsg =
"rootd-file serving not supported any-longer";
314 if (emsg.length() > 0) {
315 lp->setEtext(emsg.c_str());
317 lp->setEtext(
"link transfered");
321 TRACE(XERR,
"peeked incomplete or empty information! (dlen: "<<dlen<<
" bytes)");
// Handshake validation: the test below is true (mismatch) unless the full
// structure was read, `third` equals 1 and all other fields are zero.
326 hsdata.third = ntohl(hsdata.third);
327 if (dlen !=
sizeof(hsdata) || hsdata.first || hsdata.second
328 || !(hsdata.third == 1) || hsdata.fourth || hsdata.fifth) {
// Give the xrootd protocol, when the manager provides one, a chance to match.
331 if (fgMgr->Xrootd() && (xp = fgMgr->Xrootd()->Match(lp))) {
332 TRACE(ALL,
"matched xrootd protocol on link: serving a file");
334 TRACE(XERR,
"failed to match any known or enabled protocol");
// Reply with the server handshake; a zero return from Send() is a failure.
340 if (!lp->Send((
char *)&hsresp,
sizeof(hsresp))) {
341 lp->setEtext(
"Match: handshake failed");
342 TRACE(XERR,
"handshake failed");
// Read the handshake again with Recv(); the earlier call was only a Peek().
347 int len =
sizeof(hsdata);
348 if (lp->Recv(hsbuff, len) != len) {
349 lp->setEtext(
"Match: reread failed");
350 TRACE(XERR,
"reread failed");
// Take a protocol object from the stack, or allocate one when it is empty.
355 XrdProofdProtocol *xpp =
nullptr;
356 if (!(xpp = fgProtStack.Pop()))
357 xpp =
new XrdProofdProtocol();
// Security entity: protocol name "host" and a strdup'ed copy of the link host.
361 snprintf(xpp->fSecEntity.prot, XrdSecPROTOIDSIZE,
"host");
362 xpp->fSecEntity.host = strdup((
char *)lp->Host());
// Read sizeof(dum) further bytes from the link into `dum`.
366 if (xpp->GetData(
"dummy",(
char *)&dum[0],
sizeof(dum)) != 0) {
370 xp = (XrdProtocol *) xpp;
// Produce the statistics record for this protocol.
//   buff : output buffer
//   blen : size of `buff`
// The third (int) argument is unnamed and unused.
// Two returns are visible: a size estimate (format length plus 16), and the
// snprintf() result after writing the formatted fgCount into `buff`.
380 int XrdProofdProtocol::Stats(
char *buff,
int blen,
int)
// NOTE(review): the format uses %ld while fgCount is defined as int in this
// file - confirm the specifier matches the argument type.
382 static char statfmt[] =
"<stats id=\"xproofd\"><num>%ld</num></stats>";
386 return sizeof(statfmt)+16;
389 return snprintf(buff, blen, statfmt, fgCount);
// Reset the state of this protocol object.
// Visible steps: the connection type goes back to kXPD_ClientMaster, the
// security entity is replaced by a default-constructed one, and all entries
// of fResponses are visited.
395 void XrdProofdProtocol::Reset()
403 fConnType = kXPD_ClientMaster;
415 fSecEntity = XrdSecEntity();
417 std::vector<XrdProofdResponse *>::iterator ii = fResponses.begin();
418 while (ii != fResponses.end()) {
// Static, process-wide configuration of the protocol.
//   parms : protocol parameters, handed to the XrdProofdManager constructor
//   pi    : protocol configuration (logger, scheduler, limits, ...)
// Returns 0 when the manager configuration fails (see below).
429 int XrdProofdProtocol::Configure(
char *parms, XrdProtocol_Config *pi)
431 XPDLOC(ALL,
"Protocol::Configure")
// Logging: take the logger from pi->eDest, attach it to fgEDest and replace
// any existing trace object by one bound to fgEDest.
441 fgLogger = pi->eDest->logger();
442 fgEDest.logger(fgLogger);
443 if (XrdProofdTrace) delete XrdProofdTrace;
444 XrdProofdTrace = new XrdOucTrace(&fgEDest);
446 fgReadWait = pi->readWait;
// Largest buffer size the pool can provide.
449 fgMaxBuffsz = fgBPool->MaxSize();
// Protocol object stack settings; the first numeric argument is a third of
// pi->ConnMax, or 30 when that is zero.
455 fgProtStack.Set(pi->Sched, XrdProofdTrace, TRACE_MEM);
456 fgProtStack.Set((pi->ConnMax/3 ? pi->ConnMax/3 : 30), 60*60);
458 fgProtStack.Set(pi->Sched, 3600);
// Trace mask: all domains, then request and fork tracing added.
465 XrdProofdTrace->What = TRACE_DOMAINS;
470 XrdProofdTrace->What |= (TRACE_REQ | TRACE_FORK);
// Record the effective uid; when the real uid is 0, ChangePerm() is called
// with uid 0 / gid 0.
474 fgEUidAtStartup = geteuid();
475 if (!getuid()) XrdSysPriv::ChangePerm((uid_t)0, (gid_t)0);
// Create the global manager; a non-zero return from its Config() makes
// Configure() return 0.
479 fgMgr =
new XrdProofdManager(parms, pi, &fgEDest);
480 if (fgMgr->Config(0))
return 0;
481 mp =
"global manager created";
485 TRACE(ALL,
"xproofd protocol version "<<XPROOFD_VERSION<<
486 " build "<<XrdVERSION<<
" successfully loaded");
// Read one request from the link and prepare it for processing.
// The XrdLink argument is unnamed; the member fLink is used instead.
// Visible steps: read the request header, convert it to host byte order,
// find or create the response object for its stream id, validate the data
// length and read the request argument into fArgp.
496 int XrdProofdProtocol::Process(XrdLink *)
498 XPDLOC(ALL,
"Protocol::Process")
501 TRACET(TraceID(), DBG, "instance: " << this);
// Read sizeof(fRequest) bytes into the request structure.
504 if ((rc = GetData("request", (
char *)&fRequest, sizeof(fRequest))) != 0)
506 TRACET(TraceID(), HDBG, "after GetData: rc: " << rc);
// Request id and data length arrive in network byte order.
509 fRequest.header.requestid = ntohs(fRequest.header.requestid);
510 fRequest.header.dlen = ntohl(fRequest.header.dlen);
// The stream id is the first two bytes of header.streamid, copied as-is.
514 memcpy((
void *)&sid, (const
void *)&(fRequest.header.streamid[0]), 2);
// Use the existing response object for this stream id, or create one.
515 XrdProofdResponse *response = 0;
516 if (!(response = Response(sid))) {
517 if (!(response = GetNewResponse(sid))) {
518 TRACET(TraceID(), XERR,
"could not get Response instance for rid: "<< sid);
// Bind the response object to the stream id and to the link.
523 response->Set(fRequest.header.streamid);
524 response->Set(fLink);
526 TRACET(TraceID(), REQ,
"sid: " << sid <<
", req id: " << fRequest.header.requestid <<
527 " (" << XrdProofdAux::ProofRequestTypes(fRequest.header.requestid)<<
528 ")" <<
", dlen: " <<fRequest.header.dlen);
// A negative data length is reported to the client and set as link error.
532 if (fRequest.header.dlen < 0) {
533 response->Send(kXR_ArgInvalid,
"Process: Invalid request data length");
534 return fLink->setEtext(
"Process: protocol data length error");
// For every request but kXP_sendmsg that carries data: get a buffer of
// dlen+1 bytes, read the argument into it and NUL-terminate it.
540 if (fRequest.header.requestid != kXP_sendmsg && fRequest.header.dlen) {
541 if ((fArgp = GetBuff(fRequest.header.dlen+1, fArgp)) == 0) {
542 response->Send(kXR_ArgTooLong,
"fRequest.argument is too long");
545 if ((rc = GetData(
"arg", fArgp->buff, fRequest.header.dlen)))
547 fArgp->buff[fRequest.header.dlen] =
'\0';
// Second processing stage: act on the request stored in fRequest.
// Visible steps: set up the response object and a ResetCtrlcGuard for the
// request id, test the XPD_LOGGEDIN bit of fStatus, dispatch on the request
// id, and hand the request to the manager with fgMgr->Process(this). The link
// is checked (non-null, FDnum() > 0) both before and after that call.
558 int XrdProofdProtocol::Process2()
560 XPDLOC(ALL,
"Protocol::Process2")
563 XPD_SETRESP(this, "Process2");
565 TRACET(TraceID(), REQ, "req
id: " << fRequest.header.requestid << " (" <<
566 XrdProofdAux::ProofRequestTypes(fRequest.header.requestid) << ")");
// The guard's destructor calls ResetCtrlC() unless the request is kXP_ctrlc.
568 ResetCtrlcGuard_t ctrlcguard(this, fRequest.header.requestid);
571 if (fStatus && (fStatus & XPD_LOGGEDIN)) {
576 TRACET(TraceID(), XERR,
"client undefined!!! ");
577 response->Send(kXR_InvalidRequest,
"client undefined!!! ");
581 switch(fRequest.header.requestid) {
606 if (!fLink || (fLink->FDnum() <= 0)) {
607 TRACE(XERR,
"link is undefined! ");
615 rc = fgMgr->Process(
this);
617 if (!fLink || (fLink->FDnum() <= 0)) {
618 TRACE(XERR,
"link is undefined! ");
// Called when the connection served by this object goes away.
// All three arguments are unnamed and unused.
// Visible steps: log the disconnection, give fArgp back to the buffer pool,
// create a "disconnected" marker file next to the admin path, update the
// session manager, and push this object onto the protocol stack for reuse.
627 void XrdProofdProtocol::Recycle(XrdLink *,
int,
const char *)
629 XPDLOC(ALL,
"Protocol::Recycle")
// Names indexed by fConnType+1 (so index 0, "ANY", corresponds to -1).
631 const
char *srvtype[6] = {
"ANY",
"MasterWorker",
"MasterMaster",
632 "ClientMaster",
"Internal",
"Admin"};
// Log message, with or without the user name.
637 XPDFORM(buf,
"user %s disconnected; type: %s", fPClient->User(),
638 srvtype[fConnType+1]);
640 XPDFORM(buf,
"user disconnected; type: %s", srvtype[fConnType+1]);
641 TRACET(TraceID(), LOGIN, buf);
645 fgBPool->Release(fArgp);
650 XrdProofdClient *pmgr = fPClient;
655 TRACE(REQ,
"External disconnection of protocol associated with pid "<<fPid);
// Marker file: the admin path with "/cid" replaced by "/disconnected".
// A failed fopen() is traced unless errno is ENOENT.
658 XrdOucString discpath(fAdminPath);
659 discpath.replace(
"/cid",
"/disconnected");
660 FILE *fd = fopen(discpath.c_str(),
"w");
661 if (!fd && errno != ENOENT) {
662 TRACE(XERR,
"unable to create path: " <<discpath<<
" (errno: "<<errno<<
")");
669 pmgr->ResetClientSlot(fCID);
// Session manager update, done while holding the session manager mutex.
670 if(fgMgr && fgMgr->SessionMgr()) {
671 XrdSysMutexHelper mhp(fgMgr->SessionMgr()->Mutex());
673 fgMgr->SessionMgr()->DisconnectFromProofServ(fPid);
// With connection type 0 and sessions still alive for this protocol, the
// reconnect time is set.
674 if((fConnType == 0) && fgMgr->SessionMgr()->Alive(
this)) {
675 TRACE(REQ,
"Non-destroyed proofserv processes attached to this protocol ("<<
this<<
676 "), setting reconnect time");
677 fgMgr->SessionMgr()->SetReconnectTime(
true);
679 fgMgr->SessionMgr()->CheckActiveSessions(0);
681 TRACE(XERR,
"No XrdProofdMgr ("<<fgMgr<<
") or SessionMgr ("
682 <<(fgMgr ? fgMgr->SessionMgr() : (
void *) -1)<<
")")
// Session bookkeeping keyed on the last component of the admin path
// (everything after the final '/').
690 if (fgMgr && fgMgr->SessionMgr()) {
691 XrdSysMutexHelper mhp(fgMgr->SessionMgr()->Mutex());
692 TRACE(HDBG,
"fAdminPath: "<<fAdminPath);
693 buf.assign(fAdminPath, fAdminPath.rfind(
'/') + 1, -1);
694 fgMgr->SessionMgr()->DeleteFromSessions(buf.c_str());
696 fgMgr->SessionMgr()->MvSession(buf.c_str());
// NOTE(review): this message calls fgMgr->SessionMgr() without the
// `fgMgr ? ... : ...` guard used in the similar message above - confirm
// fgMgr cannot be null when this trace is reached.
699 TRACE(XERR,
"No XrdProofdMgr ("<<fgMgr<<
") or SessionMgr ("<<fgMgr->SessionMgr()<<
")")
// Hand this object back to the protocol stack.
// NOTE(review): two Push() calls are visible here; presumably they belong to
// alternative preprocessor branches - confirm.
707 fgProtStack.Push(&fProtLink);
709 if(fgProtStack.Push(&fProtLink) != 0) {
710 XrdProofdProtocol *xp = fProtLink.objectItem();
711 fProtLink.setItem(0);
// Obtain a buffer able to hold `quantum` bytes from the buffer pool.
//   quantum : requested size in bytes
//   argp    : buffer currently held by the caller, if any
// Pool operations are done while holding fgBMutex.
721 XrdBuffer *XrdProofdProtocol::GetBuff(
int quantum, XrdBuffer *argp)
723 XPDLOC(ALL,
"Protocol::GetBuff")
725 TRACE(HDBG, "len: "<<quantum);
// Size test on the current buffer: quantum lies between half of its size and
// its full size.
730 if (quantum >= argp->bsize / 2 && quantum <= argp->bsize)
735 XrdSysMutexHelper mh(fgBMutex);
// Give the old buffer back, then ask the pool for a new one.
737 fgBPool->Release(argp);
740 if ((argp = fgBPool->Obtain(quantum)) == 0) {
741 TRACE(XERR,
"could not get requested buffer (size: "<<quantum<<
742 ") = insufficient memory");
744 TRACE(HDBG,
"quantum: "<<quantum<<
745 ", buff: "<<(
void *)(argp->buff)<<
", bsize:"<<argp->bsize);
// Return `argp` to the buffer pool while holding fgBMutex.
755 void XrdProofdProtocol::ReleaseBuff(XrdBuffer *argp)
757 XrdSysMutexHelper mh(fgBMutex);
758 fgBPool->Release(argp);
// Read `blen` bytes from the link into `buff`.
//   dtype : label for the data being read, used in trace messages only
//   buff  : destination buffer
//   blen  : number of bytes wanted
// The visible branches distinguish a read error, a connection closed by the
// peer, and a short read that is traced as a timeout.
764 int XrdProofdProtocol::GetData(
const char *dtype,
char *buff,
int blen)
766 XPDLOC(ALL,
"Protocol::GetData")
772 TRACET(TraceID(), HDBG, "dtype: "<<(dtype ? dtype : " - ")<<", blen: "<<blen);
// Receive with fgReadWait as the wait value.
775 rlen = fLink->Recv(buff, blen, fgReadWait);
// Results other than -ENOMSG and -ECONNRESET are reported as link read errors
// and set as the link error text.
777 if (rlen != -ENOMSG && rlen != -ECONNRESET) {
778 XrdOucString emsg =
"link read error: errno: ";
780 TRACET(TraceID(), XERR, emsg.c_str());
781 return (fLink ? fLink->setEtext(emsg.c_str()) : -1);
783 TRACET(TraceID(), HDBG,
"connection closed by peer (errno: "<<-rlen<<
")");
788 TRACET(TraceID(), DBG, dtype <<
" timeout; read " <<rlen <<
" of " <<blen <<
" bytes - rescheduling");
791 TRACET(TraceID(), HDBG,
"rlen: "<<rlen);
// Read the data part of the current request from the link and forward it.
//   xps     : session the data relates to
//   sid     : when > -1 the session's response object is used as destination
//   buf     : optional output slot for a saved copy of the data
//   savebuf : when true and *buf is still null, a XrdSrvBuffer built from the
//             data is stored in *buf
// Returns -1 when no buffer can be obtained. The buffer is given back to the
// pool, under fgBMutex, on every visible exit path.
799 int XrdProofdProtocol::SendData(XrdProofdProofServ *xps,
800 kXR_int32 sid, XrdSrvBuffer **buf,
bool savebuf)
802 XPDLOC(ALL,
"Protocol::SendData")
806 TRACET(TraceID(), HDBG, "length: "<<fRequest.header.dlen<<" bytes ");
809 int len = fRequest.header.dlen;
// Chunk size: the request length, capped at the pool's maximum buffer size.
812 int quantum = (len > fgMaxBuffsz ? fgMaxBuffsz : len);
815 XrdBuffer *argp = XrdProofdProtocol::GetBuff(quantum);
816 if (!argp) return -1;
822 XrdProofdResponse *response = (sid > -1) ? xps->Response() : 0;
// Read one chunk from the link.
824 if ((rc = GetData(
"data", argp->buff, quantum))) {
825 { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
828 if (buf && !(*buf) && savebuf)
829 *buf =
new XrdSrvBuffer(argp->buff, quantum, 1);
// "EXT" messages: the chunk goes out through the session's response object
// as a kXR_attn / kXPD_msgsid message.
833 XPDFORM(msg,
"EXT: server ID: %d, sending: %d bytes", sid, quantum);
834 if (!response || response->Send(kXR_attn, kXPD_msgsid, sid,
835 argp->buff, quantum) != 0) {
836 { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
837 XPDFORM(msg,
"EXT: server ID: %d, problems sending: %d bytes to server",
839 TRACET(TraceID(), XERR, msg);
// "INT" messages: the chunk goes to the client id taken from the request,
// through xps->SendData().
845 int cid = ntohl(fRequest.sendrcv.cid);
847 XPDFORM(msg,
"INT: client ID: %d, sending: %d bytes", cid, quantum);
848 if (xps->SendData(cid, argp->buff, quantum) != 0) {
849 { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
850 XPDFORM(msg,
"INT: client ID: %d, problems sending: %d bytes to client",
852 TRACET(TraceID(), XERR, msg);
856 TRACET(TraceID(), HDBG, msg);
864 { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
// Read the data part of the current request from the link and pass it to
// xps->SendDataN().
//   xps     : session whose SendDataN() receives the data
//   buf     : optional output slot for a saved copy of the data
//   savebuf : when true and *buf is still null, a XrdSrvBuffer built from the
//             data is stored in *buf
// Returns -1 when no buffer can be obtained. The buffer is released through
// ReleaseBuff() on every visible exit path.
875 int XrdProofdProtocol::SendDataN(XrdProofdProofServ *xps,
876 XrdSrvBuffer **buf,
bool savebuf)
878 XPDLOC(ALL,
"Protocol::SendDataN")
882 TRACET(TraceID(), HDBG, "length: "<<fRequest.header.dlen<<" bytes ");
885 int len = fRequest.header.dlen;
// Chunk size: the request length, capped at the pool's maximum buffer size.
888 int quantum = (len > fgMaxBuffsz ? fgMaxBuffsz : len);
891 XrdBuffer *argp = XrdProofdProtocol::GetBuff(quantum);
892 if (!argp) return -1;
896 if ((rc = GetData(
"data", argp->buff, quantum))) {
897 XrdProofdProtocol::ReleaseBuff(argp);
900 if (buf && !(*buf) && savebuf)
901 *buf =
new XrdSrvBuffer(argp->buff, quantum, 1);
904 if (xps->SendDataN(argp->buff, quantum) != 0) {
905 XrdProofdProtocol::ReleaseBuff(argp);
916 XrdProofdProtocol::ReleaseBuff(argp);
// Handle a request to forward a message between a client and a session.
// Messages labelled "EXT" are forwarded to the proofserv with SendData();
// messages labelled "INT" come from the session side, may update the session
// status depending on the option bits, and are forwarded with SendData() or
// SendDataN().
925 int XrdProofdProtocol::SendMsg()
927 XPDLOC(ALL,
"Protocol::SendMsg")
// Receiver names used in the final trace message, indexed by xps->SrvType().
929 static const
char *crecv[5] = {
"master proofserv",
"top master",
930 "client",
"undefined",
"any"};
933 XPD_SETRESP(
this,
"SendMsg");
// Session id and option word, converted from network byte order.
936 int psid = ntohl(fRequest.sendrcv.sid);
937 int opt = ntohl(fRequest.sendrcv.opt);
// Locate the session; an unknown id is reported as an invalid request.
941 XrdProofdProofServ *xps = 0;
942 if (!fPClient || !(xps = fPClient->GetServer(psid))) {
943 XPDFORM(msg,
"%s: session ID not found: %d", (Internal() ?
"INT" :
"EXT"), psid);
944 TRACET(TraceID(), XERR, msg.c_str());
945 response->Send(kXR_InvalidRequest, msg.c_str());
950 int len = fRequest.header.dlen;
956 XPDFORM(msg,
"EXT: sending %d bytes to proofserv (psid: %d, xps: %p, status: %d,"
957 " cid: %d)", len, psid, xps, xps->Status(), fCID);
958 TRACET(TraceID(), HDBG, msg.c_str());
963 TRACET(TraceID(), REQ,
"EXT: error getting clientSID");
964 response->Send(kXP_ServerError,
"EXT: getting clientSID");
// Forward to the proofserv; a failure is answered with kXP_reconnecting.
967 if (SendData(xps, fCID)) {
968 TRACET(TraceID(), REQ,
"EXT: error sending message to proofserv");
969 response->Send(kXP_reconnecting,
"EXT: sending message to proofserv");
980 XPDFORM(msg,
"INT: sending %d bytes to client/master (psid: %d, xps: %p, status: %d)",
981 len, psid, xps, xps->Status());
982 TRACET(TraceID(), HDBG, msg.c_str());
984 bool saveStartMsg = 0;
985 XrdSrvBuffer *savedBuf = 0;
// Option bits: kXPD_setidle and kXPD_startprocess change the session status
// and call PostSession() with -1 and 1 respectively.
987 if (opt & kXPD_setidle) {
988 TRACET(TraceID(), DBG,
"INT: setting proofserv in 'idle' state");
989 xps->SetStatus(kXPD_idle);
990 PostSession(-1, fPClient->UI().fUser.c_str(),
991 fPClient->UI().fGroup.c_str(), xps);
992 }
else if (opt & kXPD_querynum) {
993 TRACET(TraceID(), DBG,
"INT: got message with query number");
994 }
else if (opt & kXPD_startprocess) {
995 TRACET(TraceID(), DBG,
"INT: setting proofserv in 'running' state");
996 xps->SetStatus(kXPD_running);
997 PostSession(1, fPClient->UI().fUser.c_str(),
998 fPClient->UI().fGroup.c_str(), xps);
1000 xps->DeleteStartMsg();
1002 }
else if (opt & kXPD_logmsg) {
// Log messages of a running session get the kXPD_fb_prog bit added.
1005 if (xps->Status() == kXPD_running) {
1006 TRACET(TraceID(), DBG,
"INT: broadcasting log message");
1007 opt |= kXPD_fb_prog;
1010 bool fbprog = (opt & kXPD_fb_prog);
// Two forwarding calls are visible: SendData() with sid -1 and SendDataN();
// a failure of either is answered with kXP_reconnecting.
1015 if (SendData(xps, -1, &savedBuf, saveStartMsg) != 0) {
1016 response->Send(kXP_reconnecting,
1017 "SendMsg: INT: session is reconnecting: retry later");
1022 if (SendDataN(xps, &savedBuf, saveStartMsg) != 0) {
1023 response->Send(kXP_reconnecting,
1024 "SendMsg: INT: session is reconnecting: retry later");
// Hand the saved buffer over to the session.
1030 xps->SetStartMsg(savedBuf);
1033 int ii = xps->SrvType();
1036 XPDFORM(msg,
"INT: message sent to %s (%d bytes)", crecv[ii], len);
1037 TRACET(TraceID(), DBG, msg);
// Forward an urgent message to the proofserv of a session.
// The message consists of three 32-bit integers (type, int1, int2) taken from
// the request and sent in network byte order.
1050 int XrdProofdProtocol::Urgent()
1052 XPDLOC(ALL,
"Protocol::Urgent")
1054 unsigned
int rc = 0;
1056 XPD_SETRESP(this, "Urgent");
// Request fields, converted from network byte order. Note the shift in names:
// proof.int1 is the type, proof.int2 and proof.int3 are the two payload ints.
1059 int psid = ntohl(fRequest.proof.sid);
1060 int type = ntohl(fRequest.proof.int1);
1061 int int1 = ntohl(fRequest.proof.int2);
1062 int int2 = ntohl(fRequest.proof.int3);
1064 TRACET(TraceID(), REQ, "psid: "<<psid<<", type: "<< type);
// Locate the session; an unknown id is reported as an invalid request.
1067 XrdProofdProofServ *xps = 0;
1068 if (!fPClient || !(xps = fPClient->GetServer(psid))) {
1069 TRACET(TraceID(), XERR,
"session ID not found: "<<psid);
1070 response->Send(kXR_InvalidRequest,
"Urgent: session ID not found");
1074 TRACET(TraceID(), DBG,
"xps: "<<xps<<
", status: "<<xps->Status());
// The session must match the id and must have a response object.
1077 if (!xps->Match(psid)) {
1078 response->Send(kXP_InvalidRequest,
"Urgent: IDs do not match - do nothing");
1083 if (!xps->Response()) {
1084 response->Send(kXP_InvalidRequest,
"Urgent: session response object undefined - do nothing");
// Build the 3-integer payload.
// NOTE(review): no delete[] for this allocation is visible in this listing -
// confirm who releases `buf`.
1089 int len = 3 *
sizeof(kXR_int32);
1090 char *buf =
new char[len];
1092 kXR_int32 itmp =
static_cast<kXR_int32
>(htonl(type));
1093 memcpy(buf, &itmp,
sizeof(kXR_int32));
1095 itmp =
static_cast<kXR_int32
>(htonl(int1));
1096 memcpy(buf +
sizeof(kXR_int32), &itmp,
sizeof(kXR_int32));
1098 itmp =
static_cast<kXR_int32
>(htonl(int2));
1099 memcpy(buf + 2 *
sizeof(kXR_int32), &itmp,
sizeof(kXR_int32));
// Send it through the session's response object as kXR_attn / kXPD_urgent.
1101 if (xps->Response()->Send(kXR_attn, kXPD_urgent, buf, len) != 0) {
1102 response->Send(kXP_ServerError,
1103 "Urgent: could not propagate request to proofsrv");
1109 TRACET(TraceID(), DBG,
"request propagated to proofsrv");
// Forward an interrupt request to the proofserv of a session.
// The interrupt type from the request is sent through the session's response
// object as a kXR_attn / kXPD_interrupt message.
1118 int XrdProofdProtocol::Interrupt()
1120 XPDLOC(ALL,
"Protocol::Interrupt")
1124 XPD_SETRESP(this, "Interrupt");
// Session id and interrupt type, converted from network byte order.
1127 int psid = ntohl(fRequest.interrupt.sid);
1128 int type = ntohl(fRequest.interrupt.type);
1129 TRACET(TraceID(), REQ, "psid: "<<psid<<", type:"<<type);
// Locate the session; an unknown id is reported as an invalid request.
1132 XrdProofdProofServ *xps = 0;
1133 if (!fPClient || !(xps = fPClient->GetServer(psid))) {
1134 TRACET(TraceID(), XERR,
"session ID not found: "<<psid);
1135 response->Send(kXR_InvalidRequest,
"Interrupt: session ID not found");
1142 if (!xps->Match(psid)) {
1143 response->Send(kXP_InvalidRequest,
"Interrupt: IDs do not match - do nothing");
1148 XPDFORM(msg,
"xps: %p, link ID: %s, proofsrv PID: %d",
1149 xps, xps->Response()->TraceID(), xps->SrvPID());
1150 TRACET(TraceID(), DBG, msg.c_str());
// Propagate the interrupt; a failure is reported as a server error.
1153 if (xps->Response()->Send(kXR_attn, kXPD_interrupt, type) != 0) {
1154 response->Send(kXP_ServerError,
1155 "Interrupt: could not propagate interrupt code to proofsrv");
1161 TRACET(TraceID(), DBG,
"interrupt propagated to proofsrv");
// Handle a ping request.
// For messages labelled "INT" there is nothing to do. For "EXT" messages the
// visible code checks that the session's proofserv is alive, using the
// modification time of the session admin path, and reports the result
// (`pingres`) to the client either synchronously or as a kXR_attn message.
1174 int XrdProofdProtocol::Ping()
1176 XPDLOC(ALL,
"Protocol::Ping")
1180 if (TRACING(HDBG)) {
1181 XPD_SETRESP(
this,
"Ping");
1182 TRACET(TraceID(), HDBG,
"INT: nothing to do ");
1186 XPD_SETRESP(
this,
"Ping");
// Session id and async option, converted from network byte order.
1189 int psid = ntohl(fRequest.sendrcv.sid);
1190 int asyncopt = ntohl(fRequest.sendrcv.opt);
1192 TRACET(TraceID(), REQ,
"psid: "<<psid<<
", async: "<<asyncopt);
// A session lookup is only done for psid > -1.
1196 XrdProofdProofServ *xps = 0;
1197 if (!fPClient || (psid > -1 && !(xps = fPClient->GetServer(psid)))) {
1198 TRACET(TraceID(), XERR,
"session ID not found: "<<psid);
1199 response->Send(kXR_InvalidRequest,
"session ID not found");
// Initial result: 0 when a session id was given, 1 otherwise.
1204 kXR_int32 pingres = (psid > -1) ? 0 : 1;
1205 if (psid > -1 && xps && xps->IsValid()) {
1207 TRACET(TraceID(), DBG,
"EXT: psid: "<<psid);
1210 kXR_int32 checkfq = fgMgr->SessionMgr()->CheckFrequency();
// In async mode the check frequency is sent to the client right away.
1213 if (asyncopt == 1) {
1214 TRACET(TraceID(), DBG,
"EXT: async: notifying timeout to client: "<<checkfq<<
" secs");
1215 response->Send(kXR_ok, checkfq);
// The session admin path must be defined and stat-able.
1219 XrdOucString path(xps->AdminPath());
1220 if (path.length() <= 0) {
1221 TRACET(TraceID(), XERR,
"EXT: admin path is empty! - protocol error");
1223 response->Send(kXP_ServerError,
"EXT: admin path is empty! - protocol error");
1233 if (stat(path.c_str(), &st0) != 0) {
1234 TRACET(TraceID(), XERR,
"EXT: cannot stat admin path: "<<path);
1236 response->Send(kXP_ServerError,
"EXT: cannot stat admin path");
// Check the proofserv process by its pid.
1241 int pid = xps->SrvPID();
1243 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
// When the admin path was last modified more than (checkfq - 5) seconds ago,
// a verify request is sent to the proofserv.
1245 if ((now - st0.st_mtime) > checkfq - 5) {
1247 if (xps->VerifyProofServ(1) != 0) {
1248 TRACET(TraceID(), XERR,
"EXT: could not send verify request to proofsrv");
1250 response->Send(kXP_ServerError,
"EXT: could not verify reuqest to proofsrv");
// Stat the path again and compare its modification time with the first one.
1257 if (stat(path.c_str(), &st1) == 0) {
1258 if (st1.st_mtime > st0.st_mtime) {
1264 TRACET(TraceID(), DBG,
"EXT: waiting "<<ns<<
" secs for session "<<pid<<
1265 " to touch the admin path");
// Report the result: directly in sync mode (asyncopt == 0), otherwise through
// a kXR_attn / kXPD_ping message carrying a 32-bit zero.
1279 TRACET(TraceID(), DBG,
"EXT: notified the result to client: "<<pingres);
1280 if (asyncopt == 0) {
1281 response->Send(kXR_ok, pingres);
// NOTE(review): no delete[] for this allocation is visible in this listing -
// confirm who releases `buf`.
1284 int len =
sizeof(kXR_int32);
1285 char *buf =
new char[len];
1287 kXR_int32 ifw = (kXR_int32)0;
1288 ifw =
static_cast<kXR_int32
>(htonl(ifw));
1289 memcpy(buf, &ifw,
sizeof(kXR_int32));
1290 response->Send(kXR_attn, kXPD_ping, buf, len);
1293 }
else if (psid > -1) {
1295 TRACET(TraceID(), XERR,
"session ID not found: "<<psid);
1299 response->Send(kXR_ok, pingres);
// Notify the manager components about a change in a session's state.
//   on  : state change code; SendMsg() passes 1 for 'running', -1 for 'idle'
//   u   : user name
//   g   : group name
//   xps : session concerned; may be null
// Each notification is posted on the pipe of the corresponding manager, and
// only when that manager exists.
1308 void XrdProofdProtocol::PostSession(
int on,
const char *u,
const char *g,
1309 XrdProofdProofServ *xps)
1311 XPDLOC(ALL,
"Protocol::PostSession")
// Priority manager: post "<on> <user> <group> <pid>" as a kChangeStatus
// message; pid is -1 when no session is given.
1314 if (fgMgr && fgMgr->PriorityMgr()) {
1315 int pid = (xps) ? xps->SrvPID() : -1;
1317 TRACE(XERR,
"undefined session or process id");
1321 XPDFORM(buf,
"%d %s %s %d", on, u, g, pid);
1323 if (fgMgr->PriorityMgr()->Pipe()->Post(XrdProofdPriorityMgr::kChangeStatus,
1324 buf.c_str()) != 0) {
1325 TRACE(XERR,
"problem posting the prority manager pipe");
// Scheduler: a kReschedule message is posted only when on == -1 and the
// session is a top master.
1329 if (fgMgr && fgMgr->ProofSched()) {
1330 if (on == -1 && xps && xps->SrvType() == kXPD_TopMaster) {
1331 TRACE(DBG,
"posting the scheduler pipe");
1332 if (fgMgr->ProofSched()->Pipe()->Post(XrdProofSched::kReschedule, 0) != 0) {
1333 TRACE(XERR,
"problem posting the scheduler pipe");
// Session manager: post a kChgSessionSt message.
1338 if (fgMgr && fgMgr->SessionMgr()) {
1339 if (fgMgr->SessionMgr()->Pipe()->Post(XrdProofdProofServMgr::kChgSessionSt, 0) != 0) {
1340 TRACE(XERR,
"problem posting the session manager pipe");
// Touch the admin path of this connection, when one is defined.
// When the first attempt returns -ENOENT on an internal connection, the
// equivalent path under "/terminatedsessions/" (without the ".status"
// suffix) is tried. Failures other than -ENOENT are traced.
1350 void XrdProofdProtocol::TouchAdminPath()
1352 XPDLOC(ALL,
"Protocol::TouchAdminPath")
1354 XPD_SETRESPV(this, "TouchAdminPath");
1355 TRACET(TraceID(), HDBG, fAdminPath);
1357 if (fAdminPath.length() > 0) {
1359 if ((rc = XrdProofdAux::Touch(fAdminPath.c_str())) != 0) {
// Fallback path, derived from a copy so that fAdminPath itself is unchanged.
1363 XrdOucString apath = fAdminPath;
1364 if (rc == -ENOENT && Internal()) {
1365 apath.replace(
"/activesessions/",
"/terminatedsessions/");
1366 apath.replace(
".status",
"");
1367 rc = XrdProofdAux::Touch(apath.c_str());
// A missing path (-ENOENT) is not reported.
1369 if (rc != 0 && rc != -ENOENT) {
1370 const char *type = Internal() ?
"internal" :
"external";
1371 TRACET(TraceID(), XERR, type<<
": problems touching "<<apath<<
"; errno: "<<-rc);
1382 int XrdProofdProtocol::CtrlC()
1384 XPDLOC(ALL,
"Protocol::CtrlC")
1386 TRACET(TraceID(), ALL, "handling request");
1388 { XrdSysMutexHelper mhp(fCtrlcMutex);
1394 if (fgMgr->SrvType() != kXPD_Worker) {
1395 if (fgMgr->NetMgr()) {
1396 fgMgr->NetMgr()->BroadcastCtrlC(Client()->User());