44 #include <sys/socket.h>
55 XrdOucTrace *XrdProofdTrace = 0;
56 static XrdSysLogger eLogger;
57 static XrdSysError eDest(0,
"Proofx");
60 ULong64_t TSocket::fgBytesSent;
61 ULong64_t TSocket::fgBytesRecv;
71 void TXSocket::DoError(
int level,
const char *location,
const char *fmt, va_list va)
const
73 ::ErrorHandler(level, Form(
"TXSocket::%s", location), fmt, va);
79 class TXSocketPingHandler :
public TFileHandler {
82 TXSocketPingHandler(TXSocket *s, Int_t fd)
83 : TFileHandler(fd, 1) { fSocket = s; }
85 Bool_t ReadNotify() {
return Notify(); }
91 Bool_t TXSocketPingHandler::Notify()
93 fSocket->Ping(
"ping handler");
99 Bool_t TXSocket::fgInitDone = kFALSE;
102 TXSockPipe TXSocket::fgPipe;
103 TString TXSocket::fgLoc =
"undef";
106 std::mutex TXSocket::fgSMtx;
107 std::list<TXSockBuf *> TXSocket::fgSQue;
108 Long64_t TXSockBuf::fgBuffMem = 0;
109 Long64_t TXSockBuf::fgMemMax = 10485760;
127 TXSocket::TXSocket(
const char *url, Char_t m, Int_t psid, Char_t capver,
128 const char *logbuf, Int_t loglevel, TXHandler *handler)
129 : TSocket(), fMode(m), fLogLevel(loglevel),
130 fBuffer(logbuf), fConn(0), fASem(0), fAsynProc(1),
131 fDontTimeout(kFALSE), fRDInterrupt(kFALSE), fXrdProofdVersion(-1)
135 eDest.logger(&eLogger);
137 XrdProofdTrace =
new XrdOucTrace(&eDest);
156 fRemoteProtocol = -1;
158 fSendOpt = (fMode ==
'i') ? (kXPD_internal | kXPD_async) : kXPD_async;
159 fSessionID = (fMode ==
'C') ? -1 : psid;
167 if (!fgPipe.IsValid()) {
168 Error(
"TXSocket",
"internal pipe is invalid");
174 fAddress = gSystem->GetHostByName(u.GetHost());
175 u.SetProtocol(
"proof", kTRUE);
176 fAddress.fPort = (u.GetPort() > 0) ? u.GetPort() : 1093;
185 char md = (fMode !=
'A' && fMode !=
'C') ? fMode :
'M';
186 fConn =
new XrdProofConn(url, md, psid, capver,
this, fBuffer.Data());
187 if (!fConn || !(fConn->IsValid())) {
188 if (fConn->GetServType() != XrdProofConn::kSTProofd)
190 Error(
"TXSocket",
"fatal error occurred while opening a connection"
191 " to server [%s]: %s", url, fConn->GetLastErr());
196 fUser = fConn->fUser.c_str();
197 fHost = fConn->fHost.c_str();
198 fPort = fConn->fPort;
201 if (fMode ==
'm' || fMode ==
's' || fMode ==
'M' || fMode ==
'A'|| fMode ==
'L') {
205 Error(
"TXSocket",
"create or attach failed (%s)",
206 ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() :
"-"));
214 fXrdProofdVersion = fConn->fRemoteProtocol;
215 fRemoteProtocol = fConn->fRemoteProtocol;
219 fUrl = fConn->fUrl.GetUrl().c_str();
220 fAddress = gSystem->GetHostByName(fConn->fUrl.Host.c_str());
221 fAddress.fPort = fPort;
224 fPid = gSystem->GetPid();
231 TXSocket::~TXSocket()
242 void TXSocket::SetLocation(
const char *loc)
256 void TXSocket::SetSessionID(Int_t
id)
268 void TXSocket::DisconnectSession(Int_t
id, Option_t *opt)
273 Info(
"DisconnectSession",
"not connected: nothing to do");
277 Bool_t shutdown = opt && (strchr(opt,
'S') || strchr(opt,
's'));
278 Bool_t all = opt && (strchr(opt,
'A') || strchr(opt,
'a'));
280 if (
id > -1 || all) {
282 XPClientRequest Request;
283 memset(&Request, 0,
sizeof(Request) );
284 fConn->SetSID(Request.header.streamid);
286 Request.proof.requestid = kXP_destroy;
288 Request.proof.requestid = kXP_detach;
289 Request.proof.sid = id;
292 XrdClientMessage *xrsp =
293 fConn->SendReq(&Request, (
const void *)0, 0,
"DisconnectSession");
296 if (!xrsp && fConn->GetLastErr())
297 Printf(
"%s: %s", fHost.Data(), fConn->GetLastErr());
311 void TXSocket::Close(Option_t *opt)
313 Int_t to = gEnv->GetValue(
"XProof.AsynProcSemTimeout", 60);
314 if (fAsynProc.Wait(to*1000) != 0)
315 Warning(
"Close",
"could not hold semaphore for async messages after %d sec: closing anyhow (may give error messages)", to);
318 TXSocket::fgPipe.Flush(
this);
323 Info(
"Close",
"no connection: nothing to do");
336 Int_t sessID = fSessionID;
337 if (o.Index(
"#") != kNPOS) {
338 o.Remove(0,o.Index(
"#")+1);
339 if (o.Index(
"#") != kNPOS) {
340 o.Remove(o.Index(
"#"));
341 sessID = o.IsDigit() ? o.Atoi() : sessID;
347 DisconnectSession(sessID, opt);
368 UnsolRespProcResult TXSocket::ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *,
371 UnsolRespProcResult rc = kUNSOL_KEEP;
374 TXSemaphoreGuard semg(&fAsynProc);
375 if (!semg.IsValid()) {
376 Error(
"ProcessUnsolicitedMsg",
"%p: async semaphore taken by Close()! Should not be here!",
this);
377 return kUNSOL_CONTINUE;
382 Info(
"ProcessUnsolicitedMsg",
"%p: got empty message: skipping",
this);
384 return kUNSOL_CONTINUE;
387 Info(
"ProcessUnsolicitedMsg",
"%p: got message with status: %d, len: %d bytes (ID: %d)",
388 this, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
393 if (m->GetStatusCode() != XrdClientMessage::kXrdMSC_timeout) {
395 Info(
"ProcessUnsolicitedMsg",
"%p: got error from underlying connection",
this);
396 XHandleErr_t herr = {1, 0};
397 if (!fHandler || fHandler->HandleError((
const void *)&herr)) {
399 Info(
"ProcessUnsolicitedMsg",
"%p: handler undefined or recovery failed",
this);
409 Info(
"ProcessUnsolicitedMsg",
"%p: underlying connection timed out",
this);
412 return kUNSOL_CONTINUE;
417 if (fConn && !m->MatchStreamid(fConn->fStreamid)) {
419 Info(
"ProcessUnsolicitedMsg",
"%p: IDs do not match: {%d, %d}",
this, fConn->fStreamid, m->HeaderSID());
420 return kUNSOL_CONTINUE;
425 if ((len = m->DataLen()) < (
int)
sizeof(kXR_int32)) {
426 Error(
"ProcessUnsolicitedMsg",
"empty or bad-formed message - disabling");
427 PostMsg(kPROOF_STOP);
436 memcpy(&acod, m->GetData(),
sizeof(kXR_int32));
438 Info(
"ProcessUnsolicitedMsg",
"%p: got acod %d (%x): message has status: %d, len: %d bytes (ID: %d)",
439 this, acod, acod, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
442 void *pdata = (
void *)((
char *)(m->GetData()) +
sizeof(kXR_int32));
443 len -=
sizeof(kXR_int32);
445 Info(
"ProcessUnsolicitedMsg",
"%p: got action: %d (%d bytes) (ID: %d)",
446 this, acod, len, m->HeaderSID());
449 fgPipe.DumpReadySock();
459 ilev = TProof::kPing;
464 lab = !lab ?
"kXPD_interrupt" : lab;
465 { std::lock_guard<std::recursive_mutex> lock(fIMtx);
466 if (acod == kXPD_interrupt) {
467 memcpy(&ilev, pdata,
sizeof(kXR_int32));
468 ilev = net2host(ilev);
470 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
471 len -=
sizeof(kXR_int32);
476 memcpy(&ifw, pdata,
sizeof(kXR_int32));
479 Info(
"ProcessUnsolicitedMsg",
"%s: forwarding option: %d", lab, ifw);
484 fIForward = (ifw == 1) ? kTRUE : kFALSE;
488 XHandleIn_t hin = {acod, 0, 0, 0};
490 fHandler->HandleInput((
const void *)&hin);
492 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
503 memcpy(&opt, pdata,
sizeof(kXR_int32));
506 Info(
"ProcessUnsolicitedMsg",
"kXPD_timer: found opt: %d", opt);
508 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
509 len -=
sizeof(kXR_int32);
513 memcpy(&delay, pdata,
sizeof(kXR_int32));
514 delay = net2host(delay);
516 Info(
"ProcessUnsolicitedMsg",
"kXPD_timer: found delay: %d", delay);
518 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
519 len -=
sizeof(kXR_int32);
524 XHandleIn_t hin = {acod, opt, delay, 0};
526 fHandler->HandleInput((
const void *)&hin);
528 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
535 kXR_int32 inflate = 1000;
537 memcpy(&inflate, pdata,
sizeof(kXR_int32));
538 inflate = net2host(inflate);
540 Info(
"ProcessUnsolicitedMsg",
"kXPD_inflate: factor: %d", inflate);
542 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
543 len -=
sizeof(kXR_int32);
547 XHandleIn_t hin = {acod, inflate, 0, 0};
549 fHandler->HandleInput((
const void *)&hin);
551 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
558 kXR_int32 priority = -1;
560 memcpy(&priority, pdata,
sizeof(kXR_int32));
561 priority = net2host(priority);
563 Info(
"ProcessUnsolicitedMsg",
"kXPD_priority: priority: %d", priority);
565 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
566 len -=
sizeof(kXR_int32);
570 XHandleIn_t hin = {acod, priority, 0, 0};
572 fHandler->HandleInput((
const void *)&hin);
574 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
583 XHandleIn_t hin = {acod, 0, 0, 0};
585 fHandler->HandleInput((
const void *)&hin);
587 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
597 memcpy(&type, pdata,
sizeof(kXR_int32));
598 type = net2host(type);
600 Info(
"ProcessUnsolicitedMsg",
"kXPD_urgent: found type: %d", type);
602 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
603 len -=
sizeof(kXR_int32);
608 memcpy(&int1, pdata,
sizeof(kXR_int32));
609 int1 = net2host(int1);
611 Info(
"ProcessUnsolicitedMsg",
"kXPD_urgent: found int1: %d", int1);
613 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
614 len -=
sizeof(kXR_int32);
619 memcpy(&int2, pdata,
sizeof(kXR_int32));
620 int2 = net2host(int2);
622 Info(
"ProcessUnsolicitedMsg",
"kXPD_urgent: found int2: %d", int2);
624 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
625 len -=
sizeof(kXR_int32);
630 XHandleIn_t hin = {acod, type, int1, int2};
632 fHandler->HandleInput((
const void *)&hin);
634 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
640 { std::lock_guard<std::recursive_mutex> lock(fAMtx);
643 TXSockBuf *b = PopUpSpare(len);
645 Error(
"ProcessUnsolicitedMsg",
"could allocate spare buffer");
648 memcpy(b->fBuf, pdata, len);
662 Info(
"ProcessUnsolicitedMsg",
"%p: %s: posting semaphore: %p (%d bytes)",
663 this, GetTitle(), &fASem, len);
669 Info(
"ProcessUnsolicitedMsg",
670 "kXPD_feedback treatment not yet implemented");
678 memcpy(&opt, pdata,
sizeof(kXR_int32));
680 if (opt >= 0 && opt <= 4) {
682 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
683 len -=
sizeof(kXR_int32);
690 Printf(
"| %.*s", len, (
char *)pdata);
691 }
else if (opt == 2) {
693 Printf(
"%.*s", len, (
char *)pdata);
694 }
else if (opt == 3) {
696 fprintf(stderr,
"%.*s", len, (
char *)pdata);
697 }
else if (opt == 4) {
699 fprintf(stderr,
"%.*s\r", len, (
char *)pdata);
703 Printf(
"| Message from server:");
704 Printf(
"| %.*s", len, (
char *)pdata);
712 Printf(
"| Error condition occured: message from server:");
713 Printf(
"| %.*s", len, (
char *)pdata);
717 fHandler->HandleError();
719 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
724 { std::lock_guard<std::recursive_mutex> lock(fAMtx);
728 memcpy(&cid, pdata,
sizeof(kXR_int32));
732 Info(
"ProcessUnsolicitedMsg",
"found cid: %d", cid);
735 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
736 len -=
sizeof(kXR_int32);
739 TXSockBuf *b = PopUpSpare(len);
741 Error(
"ProcessUnsolicitedMsg",
"could allocate spare buffer");
744 memcpy(b->fBuf, pdata, len);
761 Info(
"ProcessUnsolicitedMsg",
"%p: cid: %d, posting semaphore: %p (%d bytes)",
762 this, cid, &fASem, len);
770 { TString what = TString::Format(
"%.*s", len, (
char *)pdata);
771 if (what.BeginsWith(
"idle-timeout")) {
773 PostMsg(kPROOF_FATAL, kPROOF_WorkerIdleTO);
776 Printf(
"| %s", what.Data());
779 fHandler->HandleError();
781 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
789 PostMsg(kPROOF_TOUCH);
794 PostMsg(kPROOF_STARTPROCESS);
796 case kXPD_clusterinfo:
800 kXR_int32 nsess = -1, nacti = -1, neffs = -1;
803 memcpy(&nsess, pdata,
sizeof(kXR_int32));
804 nsess = net2host(nsess);
805 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
806 len -=
sizeof(kXR_int32);
808 memcpy(&nacti, pdata,
sizeof(kXR_int32));
809 nacti = net2host(nacti);
810 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
811 len -=
sizeof(kXR_int32);
813 memcpy(&neffs, pdata,
sizeof(kXR_int32));
814 neffs = net2host(neffs);
815 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
816 len -=
sizeof(kXR_int32);
819 Info(
"ProcessUnsolicitedMsg",
"kXPD_clusterinfo: # sessions: %d,"
820 " # active: %d, # effective: %f", nsess, nacti, neffs/1000.);
823 XHandleIn_t hin = {acod, nsess, nacti, neffs};
825 fHandler->HandleInput((
const void *)&hin);
827 Error(
"ProcessUnsolicitedMsg",
"handler undefined");
831 Error(
"ProcessUnsolicitedMsg",
"%p: unknown action code: %d received from '%s' - disabling",
832 this, acod, GetTitle());
833 PostMsg(kPROOF_STOP);
848 void TXSocket::PostMsg(Int_t type,
const char *msg)
854 if (msg && strlen(msg) > 0)
861 char *mbuf = m.Buffer();
862 Int_t mlen = m.Length();
863 if (m.CompBuffer()) {
864 mbuf = m.CompBuffer();
865 mlen = m.CompLength();
870 std::lock_guard<std::recursive_mutex> lock(fAMtx);
873 TXSockBuf *b = PopUpSpare(mlen);
875 Error(
"PostMsg",
"could allocate spare buffer");
880 memcpy(b->fBuf, mbuf, mlen);
894 Info(
"PostMsg",
"%p: posting type %d to semaphore: %p (%d bytes)",
895 this, type, &fASem, mlen);
905 void TXSocket::PostSemAll()
907 std::lock_guard<std::recursive_mutex> lock(fAMtx);
910 while (fASem.TryWait() != 1)
919 Int_t TXSocket::GetLogConnID()
const
921 return (fConn ? fConn->GetLogConnID() : -1);
927 Int_t TXSocket::GetOpenError()
const
929 return (fConn ? fConn->GetOpenError() : -1);
935 Int_t TXSocket::GetServType()
const
937 return (fConn ? fConn->GetServType() : -1);
943 Int_t TXSocket::GetSessionID()
const
945 return (fConn ? fConn->GetSessionID() : -1);
951 Bool_t TXSocket::IsValid()
const
953 return (fConn ? (fConn->IsValid()) : kFALSE);
959 Bool_t TXSocket::IsServProofd()
961 if (fConn && (fConn->GetServType() == XrdProofConn::kSTProofd))
972 Int_t TXSocket::GetInterrupt(Bool_t &forward)
975 Info(
"GetInterrupt",
"%p: waiting to lock mutex",
this);
977 std::lock_guard<std::recursive_mutex> lock(fIMtx);
985 Error(
"GetInterrupt",
"value is unset (%d) - protocol error",fILev);
1004 Int_t TXSocket::Flush()
1007 list<TXSockBuf *> splist;
1008 list<TXSockBuf *>::iterator i;
1010 { std::lock_guard<std::recursive_mutex> lock(fAMtx);
1013 if (fAQue.size() > 0) {
1016 Int_t sz = fAQue.size();
1018 for (i = fAQue.begin(); i != fAQue.end();) {
1020 splist.push_back(*i);
1028 if (fASem.TryWait() == 1)
1029 Printf(
"Warning in TXSocket::Flush: semaphore counter already 0 (sz: %d)", sz);
1036 { std::lock_guard<std::mutex> lock(fgSMtx);
1037 if (splist.size() > 0) {
1038 for (i = splist.begin(); i != splist.end();) {
1039 fgSQue.push_back(*i);
1040 i = splist.erase(i);
1053 Bool_t TXSocket::Create(Bool_t attach)
1058 Info(
"Create",
"not connected: nothing to do");
1062 Int_t retriesleft = gEnv->GetValue(
"XProof.CreationRetries", 4);
1064 while (retriesleft--) {
1066 XPClientRequest reqhdr;
1069 memset( &reqhdr, 0,
sizeof(reqhdr));
1070 fConn->SetSID(reqhdr.header.streamid);
1073 if (fMode ==
'A' || attach) {
1074 reqhdr.header.requestid = kXP_attach;
1075 reqhdr.proof.sid = fSessionID;
1077 reqhdr.header.requestid = kXP_create;
1081 reqhdr.proof.int1 = fLogLevel;
1084 const void *buf = (
const void *)(fBuffer.Data());
1085 reqhdr.header.dlen = fBuffer.Length();
1087 Info(
"Create",
"sending %d bytes to server", reqhdr.header.dlen);
1091 Info(
"Create",
"creating session of server %s", fUrl.Data());
1095 XrdClientMessage *xrsp = fConn->SendReq(&reqhdr, buf,
1096 &answData,
"TXSocket::Create", 0);
1097 struct ServerResponseBody_Protocol *srvresp = (
struct ServerResponseBody_Protocol *)answData;
1105 void *pdata = (
void *)(xrsp->GetData());
1106 Int_t len = xrsp->DataLen();
1108 if (len >= (Int_t)
sizeof(kXR_int32)) {
1111 memcpy(&psid, pdata,
sizeof(kXR_int32));
1112 fSessionID = net2host(psid);
1113 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
1114 len -=
sizeof(kXR_int32);
1116 Error(
"Create",
"session ID is undefined!");
1118 if (srvresp) free(srvresp);
1122 if (len >= (Int_t)
sizeof(kXR_int16)) {
1125 memcpy(&dver, pdata,
sizeof(kXR_int16));
1126 fRemoteProtocol = net2host(dver);
1127 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int16));
1128 len -=
sizeof(kXR_int16);
1130 Warning(
"Create",
"protocol version of the remote PROOF undefined!");
1133 if (fRemoteProtocol == 0) {
1135 len +=
sizeof(kXR_int16);
1137 memcpy(&dver, pdata,
sizeof(kXR_int32));
1138 fRemoteProtocol = net2host(dver);
1139 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int32));
1140 len -=
sizeof(kXR_int32);
1142 if (len >= (Int_t)
sizeof(kXR_int16)) {
1145 memcpy(&dver, pdata,
sizeof(kXR_int16));
1146 fXrdProofdVersion = net2host(dver);
1147 pdata = (
void *)((
char *)pdata +
sizeof(kXR_int16));
1148 len -=
sizeof(kXR_int16);
1150 Warning(
"Create",
"version of the remote XrdProofdProtocol undefined!");
1156 char *url =
new char[len+1];
1157 memcpy(url, pdata, len);
1165 if (srvresp) free(srvresp);
1171 Ssiz_t ilog = kNPOS;
1172 if (retriesleft <= 0 && fConn->GetLastErr()) {
1173 fBuffer = fConn->GetLastErr();
1174 if ((ilog = fBuffer.Index(
"|log:")) != kNPOS) fBuffer.Remove(0, ilog);
1177 if (fConn->GetOpenError() == kXP_TooManySess) {
1180 if (srvresp) free(srvresp);
1184 if ((retriesleft <= 0 || gDebug > 0) && fConn->GetLastErr()) {
1185 TString emsg(fConn->GetLastErr());
1186 if ((ilog = emsg.Index(
"|log:")) != kNPOS) emsg.Remove(ilog);
1187 Printf(
"%s: %s", fHost.Data(), emsg.Data());
1193 Info(
"Create",
"creation/attachment attempt failed: %d attempts left", retriesleft);
1194 if (retriesleft <= 0)
1195 Error(
"Create",
"%d creation/attachment attempts failed: no attempts left",
1196 gEnv->GetValue(
"XProof.CreationRetries", 4));
1198 if (srvresp) free(srvresp);
1207 "problems creating or attaching to a remote server (%s)",
1208 ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() :
"-"));
1218 Int_t TXSocket::SendRaw(
const void *buffer, Int_t length, ESendRecvOptions opt)
1220 TSystem::ResetErrno();
1223 fSendOpt = (opt == kDontBlock) ? (kXPD_async | fSendOpt)
1224 : (~kXPD_async & fSendOpt) ;
1227 XPClientRequest Request;
1228 memset( &Request, 0,
sizeof(Request) );
1229 fConn->SetSID(Request.header.streamid);
1230 Request.sendrcv.requestid = kXP_sendmsg;
1231 Request.sendrcv.sid = fSessionID;
1232 Request.sendrcv.opt = fSendOpt;
1233 Request.sendrcv.cid = GetClientID();
1234 Request.sendrcv.dlen = length;
1236 Info(
"SendRaw",
"sending %d bytes to server", Request.sendrcv.dlen);
1239 XrdClientMessage *xrsp = fConn->SendReq(&Request, buffer, 0,
"SendRaw");
1243 Int_t nsent = length;
1246 fBytesSent += length;
1258 if (fConn->GetLastErr())
1259 Printf(
"%s: %s", fHost.Data(), fConn->GetLastErr());
1261 Printf(
"%s: error occured but no message from server", fHost.Data());
1265 Error(
"SendRaw",
"%s: problems sending %d bytes to server",
1266 fHost.Data(), length);
1275 Bool_t TXSocket::Ping(
const char *ord)
1277 TSystem::ResetErrno();
1280 Info(
"Ping",
"%p: %s: sid: %d",
this, ord ? ord :
"int", fSessionID);
1284 Error(
"Ping",
"not connected: nothing to do");
1289 kXR_int32 options = (fMode ==
'i') ? kXPD_internal : 0;
1292 XPClientRequest Request;
1293 memset( &Request, 0,
sizeof(Request) );
1294 fConn->SetSID(Request.header.streamid);
1295 Request.sendrcv.requestid = kXP_ping;
1296 Request.sendrcv.sid = fSessionID;
1297 Request.sendrcv.opt = options;
1298 Request.sendrcv.dlen = 0;
1301 Bool_t res = kFALSE;
1304 XrdClientMessage *xrsp =
1305 fConn->SendReq(&Request, (
const void *)0, &pans,
"Ping");
1306 kXR_int32 *pres = (kXR_int32 *) pans;
1309 if (xrsp && xrsp->HeaderStatus() == kXR_ok) {
1310 *pres = net2host(*pres);
1311 res = (*pres == 1) ? kTRUE : kFALSE;
1316 if (fConn->GetLastErr())
1317 Printf(
"%s: %s", fHost.Data(), fConn->GetLastErr());
1322 if (pans) free(pans);
1325 if (XPD::clientMarshall(&Request) == 0) {
1326 XReqErrorType e = fConn->LowWrite(&Request, 0, 0);
1327 res = (e == kOK) ? kTRUE : kFALSE;
1329 Error(
"Ping",
"%p: int: problems marshalling request",
this);
1335 Error(
"Ping",
"%p: %s: problems sending ping to server",
this, ord ? ord :
"int");
1336 }
else if (gDebug > 0) {
1337 Info(
"Ping",
"%p: %s: sid: %d OK",
this, ord ? ord :
"int", fSessionID);
1347 void TXSocket::RemoteTouch()
1349 TSystem::ResetErrno();
1352 Info(
"RemoteTouch",
"%p: sending touch request to %s",
this, GetName());
1356 Error(
"RemoteTouch",
"not connected: nothing to do");
1361 XPClientRequest Request;
1362 memset( &Request, 0,
sizeof(Request) );
1363 fConn->SetSID(Request.header.streamid);
1364 Request.sendrcv.requestid = kXP_touch;
1365 Request.sendrcv.sid = fSessionID;
1366 Request.sendrcv.opt = 0;
1367 Request.sendrcv.dlen = 0;
1370 if (XPD::clientMarshall(&Request) != 0) {
1371 Error(
"Touch",
"%p: problems marshalling request ",
this);
1374 if (fConn->LowWrite(&Request, 0, 0) != kOK)
1375 Error(
"Touch",
"%p: problems sending touch request to server",
this);
1385 void TXSocket::CtrlC()
1387 TSystem::ResetErrno();
1390 Info(
"CtrlC",
"%p: sending ctrl-c request to %s",
this, GetName());
1394 Error(
"CtrlC",
"not connected: nothing to do");
1399 XPClientRequest Request;
1400 memset( &Request, 0,
sizeof(Request) );
1401 fConn->SetSID(Request.header.streamid);
1402 Request.proof.requestid = kXP_ctrlc;
1403 Request.proof.sid = 0;
1404 Request.proof.dlen = 0;
1407 if (XPD::clientMarshall(&Request) != 0) {
1408 Error(
"CtrlC",
"%p: problems marshalling request ",
this);
1411 if (fConn->LowWrite(&Request, 0, 0) != kOK)
1412 Error(
"CtrlC",
"%p: problems sending ctrl-c request to server",
this);
1421 Int_t TXSocket::PickUpReady()
1427 Info(
"PickUpReady",
"%p: %s: going to sleep",
this, GetTitle());
1430 if (!fDontTimeout) {
1431 static Int_t timeout = gEnv->GetValue(
"XProof.ReadTimeout", 300) * 1000;
1432 static Int_t dt = 2000;
1434 SetInterrupt(kFALSE);
1435 while (to && !IsInterrupt()) {
1437 if (fASem.Wait(dt) != 0) {
1440 Error(
"PickUpReady",
"error waiting at semaphore");
1444 Info(
"PickUpReady",
"%p: %s: got timeout: retring (%d secs)",
1445 this, GetTitle(), to/1000);
1452 if (IsInterrupt()) {
1454 Info(
"PickUpReady",
"interrupted");
1455 SetInterrupt(kFALSE);
1462 if (fASem.Wait() != 0) {
1463 Error(
"PickUpReady",
"error waiting at semaphore");
1470 Info(
"PickUpReady",
"%p: %s: waken up",
this, GetTitle());
1472 std::lock_guard<std::recursive_mutex> lock(fAMtx);
1475 if (fAQue.size() <= 0) {
1476 Error(
"PickUpReady",
"queue is empty - protocol error ?");
1479 if (!(fBufCur = fAQue.front())) {
1480 Error(
"PickUpReady",
"got invalid buffer - protocol error ?");
1487 fByteLeft = fBufCur->fLen;
1490 Info(
"PickUpReady",
"%p: %s: got message (%d bytes)",
1491 this, GetTitle(), (Int_t)(fBufCur ? fBufCur->fLen : 0));
1494 fBytesRecv += fBufCur->fLen;
1497 if (fBufCur->fCid > -1 && fBufCur->fCid != GetClientID())
1498 SetClientID(fBufCur->fCid);
1512 TXSockBuf *TXSocket::PopUpSpare(Int_t size)
1515 static Int_t nBuf = 0;
1517 std::lock_guard<std::mutex> lock(fgSMtx);
1520 if (fgSQue.size() > 0) {
1521 list<TXSockBuf *>::iterator i;
1522 for (i = fgSQue.begin(); i != fgSQue.end(); ++i) {
1523 maxsz = ((*i)->fSiz > maxsz) ? (*i)->fSiz : maxsz;
1524 if ((*i) && (*i)->fSiz >= size) {
1527 Info(
"PopUpSpare",
"asked: %d, spare: %d/%d, REUSE buf %p, sz: %d",
1528 size, (
int) fgSQue.size(), nBuf, buf, buf->fSiz);
1535 buf = fgSQue.front();
1538 Info(
"PopUpSpare",
"asked: %d, spare: %d/%d, maxsz: %d, RESIZE buf %p, sz: %d",
1539 size, (
int) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
1546 buf =
new TXSockBuf((
char *)malloc(size), size);
1550 Info(
"PopUpSpare",
"asked: %d, spare: %d/%d, maxsz: %d, NEW buf %p, sz: %d",
1551 size, (
int) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
1560 void TXSocket::PushBackSpare()
1562 std::lock_guard<std::mutex> lock(fgSMtx);
1565 Info(
"PushBackSpare",
"release buf %p, sz: %d (BuffMem: %lld)",
1566 fBufCur, fBufCur->fSiz, TXSockBuf::BuffMem());
1568 if (TXSockBuf::BuffMem() < TXSockBuf::GetMemMax()) {
1569 fgSQue.push_back(fBufCur);
1581 Int_t TXSocket::RecvRaw(
void *buffer, Int_t length, ESendRecvOptions)
1584 if (!buffer || (length <= 0))
1588 if (!fBufCur && (PickUpReady() != 0))
1592 if (fByteLeft >= length) {
1593 memcpy(buffer, fBufCur->fBuf + fByteCur, length);
1595 if ((fByteLeft -= length) <= 0)
1603 memcpy(buffer, fBufCur->fBuf + fByteCur, fByteLeft);
1604 Int_t at = fByteLeft;
1605 Int_t tobecopied = length - fByteLeft;
1607 while (tobecopied > 0) {
1609 if (PickUpReady() != 0)
1612 Int_t ncpy = (fByteLeft > tobecopied) ? tobecopied : fByteLeft;
1613 memcpy((
void *)((Char_t *)buffer+at), fBufCur->fBuf, ncpy);
1615 if ((fByteLeft -= ncpy) <= 0)
1625 fBytesRecv += length;
1626 fgBytesRecv += length;
1638 Int_t TXSocket::SendInterrupt(Int_t type)
1640 TSystem::ResetErrno();
1643 XPClientRequest Request;
1644 memset(&Request, 0,
sizeof(Request) );
1645 fConn->SetSID(Request.header.streamid);
1646 if (type == (Int_t) TProof::kShutdownInterrupt)
1647 Request.interrupt.requestid = kXP_destroy;
1649 Request.interrupt.requestid = kXP_interrupt;
1650 Request.interrupt.sid = fSessionID;
1651 Request.interrupt.type = type;
1652 Request.interrupt.dlen = 0;
1655 XrdClientMessage *xrsp =
1656 fConn->SendReq(&Request, (
const void *)0, 0,
"SendInterrupt");
1666 if (fConn->GetLastErr())
1667 Printf(
"%s: %s", fHost.Data(), fConn->GetLastErr());
1671 Error(
"SendInterrupt",
"problems sending interrupt to server");
1677 void TXSocket::SetInterrupt(Bool_t i)
1679 std::lock_guard<std::recursive_mutex> lock(fAMtx);
1681 if (i && fConn) fConn->SetInterrupt();
1682 if (i && fAWait) fASem.Post();
1689 Int_t TXSocket::Send(
const TMessage &mess)
1691 TSystem::ResetErrno();
1693 if (mess.IsReading()) {
1694 Error(
"Send",
"cannot send a message used for reading");
1699 SendStreamerInfos(mess);
1702 SendProcessIDs(mess);
1706 if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
1707 const_cast<TMessage&>(mess).SetCompressionSettings(fCompress);
1709 if (mess.GetCompressionLevel() > 0)
1710 const_cast<TMessage&>(mess).Compress();
1712 char *mbuf = mess.Buffer();
1713 Int_t mlen = mess.Length();
1714 if (mess.CompBuffer()) {
1715 mbuf = mess.CompBuffer();
1716 mlen = mess.CompLength();
1720 kXR_int32 fSendOptDefault = fSendOpt;
1721 switch (mess.What()) {
1722 case kPROOF_PROCESS:
1723 fSendOpt |= kXPD_process;
1725 case kPROOF_PROGRESS:
1726 case kPROOF_FEEDBACK:
1727 fSendOpt |= kXPD_fb_prog;
1729 case kPROOF_QUERYSUBMITTED:
1730 fSendOpt |= kXPD_querynum;
1731 fSendOpt |= kXPD_fb_prog;
1733 case kPROOF_STARTPROCESS:
1734 fSendOpt |= kXPD_startprocess;
1735 fSendOpt |= kXPD_fb_prog;
1737 case kPROOF_STOPPROCESS:
1738 fSendOpt |= kXPD_fb_prog;
1740 case kPROOF_SETIDLE:
1741 fSendOpt |= kXPD_setidle;
1742 fSendOpt |= kXPD_fb_prog;
1744 case kPROOF_LOGFILE:
1745 case kPROOF_LOGDONE:
1746 if (GetClientIDSize() <= 1)
1747 fSendOpt |= kXPD_logmsg;
1754 Info(
"Send",
"sending type %d (%d bytes) to '%s'", mess.What(), mlen, GetTitle());
1756 Int_t nsent = SendRaw(mbuf, mlen);
1757 fSendOpt = fSendOptDefault;
1762 fBytesSent += nsent;
1763 fgBytesSent += nsent;
1765 return nsent -
sizeof(UInt_t);
1774 Int_t TXSocket::Recv(TMessage *&mess)
1776 TSystem::ResetErrno();
1786 if ((n = RecvRaw(&len,
sizeof(UInt_t))) <= 0) {
1790 len = net2host(len);
1792 char *buf =
new char[len+
sizeof(UInt_t)];
1793 if ((n = RecvRaw(buf+
sizeof(UInt_t), len)) <= 0) {
1799 fBytesRecv += n +
sizeof(UInt_t);
1800 fgBytesRecv += n +
sizeof(UInt_t);
1802 mess =
new TMessage(buf, len+
sizeof(UInt_t));
1805 if (RecvStreamerInfos(mess))
1809 if (RecvProcessIDs(mess))
1812 if (mess->What() & kMESS_ACK) {
1814 mess->SetWhat(mess->What() & ~kMESS_ACK);
1825 TObjString *TXSocket::SendCoordinator(Int_t kind,
const char *msg, Int_t int2,
1826 Long64_t l64, Int_t int3,
const char *)
1828 TObjString *sout = 0;
1831 XPClientRequest reqhdr;
1832 const void *buf = 0;
1835 memset(&reqhdr, 0,
sizeof(reqhdr));
1836 fConn->SetSID(reqhdr.header.streamid);
1837 reqhdr.header.requestid = kXP_admin;
1838 reqhdr.proof.int1 = kind;
1839 reqhdr.proof.int2 = int2;
1842 case kQueryROOTVersions:
1843 case kQuerySessions:
1845 reqhdr.proof.sid = 0;
1846 reqhdr.header.dlen = 0;
1847 vout = (
char **)&bout;
1849 case kCleanupSessions:
1850 reqhdr.proof.int2 = (int2 == 1) ? (kXR_int32) kXPD_AnyServer
1851 : (kXR_int32) kXPD_TopMaster;
1852 reqhdr.proof.int3 = int2;
1853 reqhdr.proof.sid = fSessionID;
1854 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1855 buf = (msg) ? (
const void *)msg : buf;
1861 reqhdr.proof.sid = fSessionID;
1862 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1863 buf = (msg) ? (
const void *)msg : buf;
1864 vout = (
char **)&bout;
1866 case kQueryLogPaths:
1867 vout = (
char **)&bout;
1868 reqhdr.proof.int3 = int3;
1869 case kReleaseWorker:
1870 case kSendMsgToUser:
1871 case kGroupProperties:
1874 reqhdr.proof.sid = fSessionID;
1875 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1876 buf = (msg) ? (
const void *)msg : buf;
1879 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1880 buf = (msg) ? (
const void *)msg : buf;
1883 reqhdr.proof.sid = fSessionID;
1884 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1886 buf = (
const void *)msg;
1887 vout = (
char **)&bout;
1890 reqhdr.header.requestid = kXP_readbuf;
1891 reqhdr.readbuf.ofs = l64;
1892 reqhdr.readbuf.len = int2;
1893 if (int3 > 0 && fXrdProofdVersion < 1003) {
1894 Info(
"SendCoordinator",
"kReadBuffer: old server (ver %d < 1003):"
1895 " grep functionality not supported", fXrdProofdVersion);
1898 reqhdr.readbuf.int1 = int3;
1899 if (!msg || strlen(msg) <= 0) {
1900 Info(
"SendCoordinator",
"kReadBuffer: file path undefined");
1903 reqhdr.header.dlen = strlen(msg);
1904 buf = (
const void *)msg;
1905 vout = (
char **)&bout;
1908 Info(
"SendCoordinator",
"unknown message kind: %d", kind);
1913 Bool_t noterr = (gDebug > 0) ? kTRUE : kFALSE;
1914 XrdClientMessage *xrsp =
1915 fConn->SendReq(&reqhdr, buf, vout,
"TXSocket::SendCoordinator", noterr);
1920 if (bout && (xrsp->DataLen() > 0))
1921 sout =
new TObjString(TString(bout,xrsp->DataLen()));
1929 if (fConn->GetLastErr())
1930 Printf(
"%s: %s", fHost.Data(), fConn->GetLastErr());
1942 void TXSocket::SendUrgent(Int_t type, Int_t int1, Int_t int2)
1944 TSystem::ResetErrno();
1947 XPClientRequest Request;
1948 memset(&Request, 0,
sizeof(Request) );
1949 fConn->SetSID(Request.header.streamid);
1950 Request.proof.requestid = kXP_urgent;
1951 Request.proof.sid = fSessionID;
1952 Request.proof.int1 = type;
1953 Request.proof.int2 = int1;
1954 Request.proof.int3 = int2;
1955 Request.proof.dlen = 0;
1958 XrdClientMessage *xrsp =
1959 fConn->SendReq(&Request, (
const void *)0, 0,
"SendUrgent");
1967 if (fConn->GetLastErr())
1968 Printf(
"%s: %s", fHost.Data(), fConn->GetLastErr());
1977 Int_t TXSocket::GetLowSocket()
const {
1978 return (fConn ? fConn->GetLowSocket() : -1);
1984 void TXSocket::InitEnvs()
1987 Int_t deb = gEnv->GetValue(
"XProof.Debug", -1);
1988 EnvPutInt(NAME_DEBUG, deb);
1990 XrdProofdTrace->What |= TRACE_REQ;
1992 XrdProofdTrace->What |= TRACE_DBG;
1994 XrdProofdTrace->What |= TRACE_ALL;
1997 const char *cenv = 0;
2000 TString allowCO = gEnv->GetValue(
"XProof.ConnectDomainAllowRE",
"");
2001 if (allowCO.Length() > 0)
2002 EnvPutString(NAME_CONNECTDOMAINALLOW_RE, allowCO.Data());
2005 TString denyCO = gEnv->GetValue(
"XProof.ConnectDomainDenyRE",
"");
2006 if (denyCO.Length() > 0)
2007 EnvPutString(NAME_CONNECTDOMAINDENY_RE, denyCO.Data());
2010 XrdProofConn::SetRetryParam(-1, -1);
2011 Int_t maxRetries = gEnv->GetValue(
"XProof.FirstConnectMaxCnt",5);
2012 EnvPutInt(NAME_FIRSTCONNECTMAXCNT, maxRetries);
2013 Int_t connTO = gEnv->GetValue(
"XProof.ConnectTimeout", 2);
2014 EnvPutInt(NAME_CONNECTTIMEOUT, connTO);
2017 Int_t recoTO = gEnv->GetValue(
"XProof.ReconnectWait",
2018 DFLT_RECONNECTWAIT);
2019 if (recoTO == DFLT_RECONNECTWAIT) {
2021 recoTO = gEnv->GetValue(
"XProof.ReconnectTimeout",
2022 DFLT_RECONNECTWAIT);
2024 EnvPutInt(NAME_RECONNECTWAIT, recoTO);
2027 Int_t requTO = gEnv->GetValue(
"XProof.RequestTimeout", 150);
2028 EnvPutInt(NAME_REQUESTTIMEOUT, requTO);
2031 EnvPutInt(NAME_KEEPSOCKOPENIFNOTXRD, 0);
2034 TString socks4Host = gEnv->GetValue(
"XNet.SOCKS4Host",
"");
2035 Int_t socks4Port = gEnv->GetValue(
"XNet.SOCKS4Port",-1);
2036 if (socks4Port > 0) {
2037 if (socks4Host.IsNull())
2039 socks4Host =
"127.0.0.1";
2040 EnvPutString(NAME_SOCKS4HOST, socks4Host.Data());
2041 EnvPutInt(NAME_SOCKS4PORT, socks4Port);
2045 TString autolog = gEnv->GetValue(
"XSec.Pwd.AutoLogin",
"1");
2046 if (autolog.Length() > 0 &&
2047 (!(cenv = gSystem->Getenv(
"XrdSecPWDAUTOLOG")) || strlen(cenv) <= 0))
2048 gSystem->Setenv(
"XrdSecPWDAUTOLOG",autolog.Data());
2052 netrc.Form(
"%s/.rootnetrc",gSystem->HomeDirectory());
2053 gSystem->Setenv(
"XrdSecNETRC", netrc.Data());
2055 TString alogfile = gEnv->GetValue(
"XSec.Pwd.ALogFile",
"");
2056 if (alogfile.Length() > 0)
2057 gSystem->Setenv(
"XrdSecPWDALOGFILE",alogfile.Data());
2059 TString verisrv = gEnv->GetValue(
"XSec.Pwd.VerifySrv",
"1");
2060 if (verisrv.Length() > 0 &&
2061 (!(cenv = gSystem->Getenv(
"XrdSecPWDVERIFYSRV")) || strlen(cenv) <= 0))
2062 gSystem->Setenv(
"XrdSecPWDVERIFYSRV",verisrv.Data());
2064 TString srvpuk = gEnv->GetValue(
"XSec.Pwd.ServerPuk",
"");
2065 if (srvpuk.Length() > 0)
2066 gSystem->Setenv(
"XrdSecPWDSRVPUK",srvpuk.Data());
2069 TString cadir = gEnv->GetValue(
"XSec.GSI.CAdir",
"");
2070 if (cadir.Length() > 0)
2071 gSystem->Setenv(
"XrdSecGSICADIR",cadir.Data());
2073 TString crldir = gEnv->GetValue(
"XSec.GSI.CRLdir",
"");
2074 if (crldir.Length() > 0)
2075 gSystem->Setenv(
"XrdSecGSICRLDIR",crldir.Data());
2077 TString crlext = gEnv->GetValue(
"XSec.GSI.CRLextension",
"");
2078 if (crlext.Length() > 0)
2079 gSystem->Setenv(
"XrdSecGSICRLEXT",crlext.Data());
2081 TString ucert = gEnv->GetValue(
"XSec.GSI.UserCert",
"");
2082 if (ucert.Length() > 0)
2083 gSystem->Setenv(
"XrdSecGSIUSERCERT",ucert.Data());
2085 TString ukey = gEnv->GetValue(
"XSec.GSI.UserKey",
"");
2086 if (ukey.Length() > 0)
2087 gSystem->Setenv(
"XrdSecGSIUSERKEY",ukey.Data());
2089 TString upxy = gEnv->GetValue(
"XSec.GSI.UserProxy",
"");
2090 if (upxy.Length() > 0)
2091 gSystem->Setenv(
"XrdSecGSIUSERPROXY",upxy.Data());
2093 TString valid = gEnv->GetValue(
"XSec.GSI.ProxyValid",
"");
2094 if (valid.Length() > 0)
2095 gSystem->Setenv(
"XrdSecGSIPROXYVALID",valid.Data());
2097 TString deplen = gEnv->GetValue(
"XSec.GSI.ProxyForward",
"0");
2098 if (deplen.Length() > 0 &&
2099 (!(cenv = gSystem->Getenv(
"XrdSecGSIPROXYDEPLEN")) || strlen(cenv) <= 0))
2100 gSystem->Setenv(
"XrdSecGSIPROXYDEPLEN",deplen.Data());
2102 TString pxybits = gEnv->GetValue(
"XSec.GSI.ProxyKeyBits",
"");
2103 if (pxybits.Length() > 0)
2104 gSystem->Setenv(
"XrdSecGSIPROXYKEYBITS",pxybits.Data());
2106 TString crlcheck = gEnv->GetValue(
"XSec.GSI.CheckCRL",
"1");
2107 if (crlcheck.Length() > 0 &&
2108 (!(cenv = gSystem->Getenv(
"XrdSecGSICRLCHECK")) || strlen(cenv) <= 0))
2109 gSystem->Setenv(
"XrdSecGSICRLCHECK",crlcheck.Data());
2111 TString delegpxy = gEnv->GetValue(
"XSec.GSI.DelegProxy",
"0");
2112 if (delegpxy.Length() > 0 &&
2113 (!(cenv = gSystem->Getenv(
"XrdSecGSIDELEGPROXY")) || strlen(cenv) <= 0))
2114 gSystem->Setenv(
"XrdSecGSIDELEGPROXY",delegpxy.Data());
2116 TString signpxy = gEnv->GetValue(
"XSec.GSI.SignProxy",
"1");
2117 if (signpxy.Length() > 0 &&
2118 (!(cenv = gSystem->Getenv(
"XrdSecGSISIGNPROXY")) || strlen(cenv) <= 0))
2119 gSystem->Setenv(
"XrdSecGSISIGNPROXY",signpxy.Data());
2122 if (gEnv->GetValue(
"XNet.PrintTAG",0) == 1)
2123 ::Info(
"TXSocket",
"(C) 2005 CERN TXSocket (XPROOF client) %s",
2124 gROOT->GetVersion());
2133 Int_t TXSocket::Reconnect()
2136 Info(
"Reconnect",
"%p (c:%p, v:%d): trying to reconnect to %s (logid: %d)",
2137 this, fConn, (fConn ? fConn->IsValid() : 0),
2138 fUrl.Data(), (fConn ? fConn->GetLogConnID() : -1));
2141 Int_t tryreconnect = gEnv->GetValue(
"TXSocket.Reconnect", 0);
2142 if (tryreconnect == 0 || fXrdProofdVersion < 1005) {
2143 if (tryreconnect == 0)
2144 Info(
"Reconnect",
"%p: reconnection attempts explicitly disabled!",
this);
2146 Info(
"Reconnect",
"%p: server does not support reconnections (protocol: %d < 1005)",
2147 this, fXrdProofdVersion);
2153 Info(
"Reconnect",
"%p: locking phyconn: %p",
this, fConn->fPhyConn);
2155 if (fConn->IsValid()) {
2157 if (fMode ==
'm' || fMode ==
's' || fMode ==
'M' || fMode ==
'A') {
2159 if (!Create(kTRUE)) {
2161 Error(
"TXSocket",
"create or attach failed (%s)",
2162 ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() :
"-"));
2172 Info(
"Reconnect",
"%p (c:%p): attempt %s (logid: %d)",
this, fConn,
2173 (fConn->IsValid() ?
"succeeded!" :
"failed"),
2174 fConn->GetLogConnID() );
2176 Info(
"Reconnect",
"%p (c:0x0): attempt failed",
this);
2181 return ((fConn && fConn->IsValid()) ? 0 : -1);
2187 TXSockBuf::TXSockBuf(Char_t *bp, Int_t sz, Bool_t own)
2199 TXSockBuf::~TXSockBuf()
2210 void TXSockBuf::Resize(Int_t sz)
2213 if ((fMem = (Char_t *)realloc(fMem, sz))) {
2214 fgBuffMem += (sz - fSiz);
2230 Long64_t TXSockBuf::BuffMem()
2238 Long64_t TXSockBuf::GetMemMax()
2246 void TXSockBuf::SetMemMax(Long64_t memmax)
2248 fgMemMax = memmax > 0 ? memmax : fgMemMax;
2259 TXSockPipe::TXSockPipe(
const char *loc) : fLoc(loc)
2262 if (pipe(fPipe) != 0) {
2263 Printf(
"TXSockPipe: problem initializing pipe for socket inputs");
2273 TXSockPipe::~TXSockPipe()
2275 if (fPipe[0] >= 0) close(fPipe[0]);
2276 if (fPipe[1] >= 0) close(fPipe[1]);
2284 Int_t TXSockPipe::Post(TSocket *s)
2286 if (!IsValid() || !s)
return -1;
2290 { std::lock_guard<std::recursive_mutex> lock(fMutex);
2296 if (write(fPipe[1],(
const void *)&c,
sizeof(Char_t)) < 1) {
2297 Printf(
"TXSockPipe::Post: %s: can't notify pipe", fLoc.Data());
2300 if (gDebug > 2) sz = fReadySock.GetSize();
2304 Printf(
"TXSockPipe::Post: %s: %p: pipe posted (pending %d) (descriptor: %d)",
2305 fLoc.Data(), s, sz, fPipe[1]);
2313 Int_t TXSockPipe::Clean(TSocket *s)
2316 if (!IsValid() || !s)
return -1;
2321 { std::lock_guard<std::recursive_mutex> lock(fMutex);
2322 if (read(fPipe[0],(
void *)&c,
sizeof(Char_t)) < 1) {
2323 Printf(
"TXSockPipe::Clean: %s: can't read from pipe", fLoc.Data());
2327 fReadySock.Remove(s);
2329 if (gDebug > 2) sz = fReadySock.GetSize();
2333 Printf(
"TXSockPipe::Clean: %s: %p: pipe cleaned (pending %d) (descriptor: %d)",
2334 fLoc.Data(), s, sz, fPipe[0]);
2344 Int_t TXSockPipe::Flush(TSocket *s)
2347 if (!IsValid() || !s)
return -1;
2351 { std::lock_guard<std::recursive_mutex> lock(fMutex);
2352 o = fReadySock.FindObject(s);
2356 fReadySock.Remove(s);
2357 o = fReadySock.FindObject(s);
2360 if (read(fPipe[0],(
void *)&c,
sizeof(Char_t)) < 1)
2361 Printf(
"TXSockPipe::Flush: %s: can't read from pipe", fLoc.Data());
2365 ((TXSocket *)s)->Flush();
2369 Printf(
"TXSockPipe::Flush: %s: %p: pipe flushed", fLoc.Data(), s);
2378 void TXSockPipe::DumpReadySock()
2380 std::lock_guard<std::recursive_mutex> lock(fMutex);
2382 TString buf = Form(
"%d |", fReadySock.GetSize());
2383 TIter nxs(&fReadySock);
2386 buf += Form(
" %p",o);
2387 Printf(
"TXSockPipe::DumpReadySock: %s: list content: %s", fLoc.Data(), buf.Data());
2393 TXSocket *TXSockPipe::GetLastReady()
2395 std::lock_guard<std::recursive_mutex> lock(fMutex);
2397 return (TXSocket *) fReadySock.Last();