39 ULong64_t TSocket::fgBytesSent = 0;
40 ULong64_t TSocket::fgBytesRecv = 0;
60 Int_t TSocket::fgClientProtocol = 17;
62 TVirtualMutex *gSocketAuthMutex = 0;
76 TSocket::TSocket(TInetAddress addr,
const char *service, Int_t tcpwindowsize)
77 : TNamed(addr.GetHostName(), service), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
86 if (fService.Contains(
"root"))
88 if (fService.Contains(
"proof"))
91 fAddress.fPort = gSystem->GetServiceByName(service);
94 fTcpWindowSize = tcpwindowsize;
97 ResetBit(TSocket::kBrokenConn);
99 if (fAddress.GetPort() != -1) {
100 fSocket = gSystem->OpenConnection(addr.GetHostName(), fAddress.GetPort(),
103 if (fSocket != kInvalid) {
104 gROOT->GetListOfSockets()->Add(
this);
121 TSocket::TSocket(TInetAddress addr, Int_t port, Int_t tcpwindowsize)
122 : TNamed(addr.GetHostName(),
""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
127 fService = gSystem->GetServiceByPort(port);
131 if (fService.Contains(
"root"))
133 if (fService.Contains(
"proof"))
136 fAddress.fPort = port;
140 fTcpWindowSize = tcpwindowsize;
143 ResetBit(TSocket::kBrokenConn);
145 fSocket = gSystem->OpenConnection(addr.GetHostName(), fAddress.GetPort(),
147 if (fSocket == kInvalid)
150 gROOT->GetListOfSockets()->Add(
this);
164 TSocket::TSocket(
const char *host,
const char *service, Int_t tcpwindowsize)
165 : TNamed(host, service), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
174 if (fService.Contains(
"root"))
176 if (fService.Contains(
"proof"))
178 fAddress = gSystem->GetHostByName(host);
179 fAddress.fPort = gSystem->GetServiceByName(service);
180 SetName(fAddress.GetHostName());
183 fTcpWindowSize = tcpwindowsize;
186 ResetBit(TSocket::kBrokenConn);
188 if (fAddress.GetPort() != -1) {
189 fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), tcpwindowsize);
190 if (fSocket != kInvalid) {
191 gROOT->GetListOfSockets()->Add(
this);
209 TSocket::TSocket(
const char *url, Int_t port, Int_t tcpwindowsize)
210 : TNamed(TUrl(url).GetHost(),
""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
216 TString host(TUrl(fUrl).GetHost());
218 fService = gSystem->GetServiceByPort(port);
222 if (fUrl.Contains(
"root"))
224 if (fUrl.Contains(
"proof"))
226 fAddress = gSystem->GetHostByName(host);
227 fAddress.fPort = port;
228 SetName(fAddress.GetHostName());
232 fTcpWindowSize = tcpwindowsize;
235 ResetBit(TSocket::kBrokenConn);
237 fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), tcpwindowsize);
238 if (fSocket == kInvalid) {
239 fAddress.fPort = kInvalid;
241 gROOT->GetListOfSockets()->Add(
this);
252 TSocket::TSocket(
const char *sockpath) : TNamed(sockpath,
""),
253 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
265 fName.Form(
"unix:%s", sockpath);
272 ResetBit(TSocket::kBrokenConn);
274 fSocket = gSystem->OpenConnection(sockpath, -1, -1);
276 gROOT->GetListOfSockets()->Add(
this);
284 TSocket::TSocket(Int_t desc) : TNamed(
"",
""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
291 fService = (
char *)kSOCKD;
298 ResetBit(TSocket::kBrokenConn);
302 fAddress = gSystem->GetPeerName(fSocket);
303 gROOT->GetListOfSockets()->Add(
this);
313 TSocket::TSocket(Int_t desc,
const char *sockpath) : TNamed(sockpath,
""),
314 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
326 fName.Form(
"unix:%s", sockpath);
333 ResetBit(TSocket::kBrokenConn);
337 gROOT->GetListOfSockets()->Add(
this);
346 TSocket::TSocket(
const TSocket &s) : TNamed(s)
349 fService = s.fService;
350 fAddress = s.fAddress;
351 fLocalAddress = s.fLocalAddress;
352 fBytesSent = s.fBytesSent;
353 fBytesRecv = s.fBytesRecv;
354 fCompress = s.fCompress;
355 fSecContext = s.fSecContext;
356 fRemoteProtocol = s.fRemoteProtocol;
357 fServType = s.fServType;
358 fTcpWindowSize = s.fTcpWindowSize;
361 ResetBit(TSocket::kBrokenConn);
363 if (fSocket != kInvalid) {
364 gROOT->GetListOfSockets()->Add(
this);
370 void TSocket::MarkBrokenConnection()
372 SetBit(TSocket::kBrokenConn);
374 gSystem->CloseConnection(fSocket, kFALSE);
375 fSocket = kInvalidStillInList;
379 SafeDelete(fLastUsageMtx);
388 void TSocket::Close(Option_t *option)
390 Bool_t force = option ? (!strcmp(option,
"force") ? kTRUE : kFALSE) : kFALSE;
392 if (fSocket != kInvalid) {
394 gSystem->CloseConnection(fSocket, force);
396 gROOT->GetListOfSockets()->Remove(
this);
401 SafeDelete(fLastUsageMtx);
408 TInetAddress TSocket::GetLocalInetAddress()
411 if (fLocalAddress.GetPort() == -1)
412 fLocalAddress = gSystem->GetSockName(fSocket);
413 return fLocalAddress;
415 return TInetAddress();
422 Int_t TSocket::GetLocalPort()
425 if (fLocalAddress.GetPort() == -1)
426 GetLocalInetAddress();
427 return fLocalAddress.GetPort();
442 Int_t TSocket::Select(Int_t interest, Long_t timeout)
447 TFileHandler fh(fSocket, interest);
450 rc = gSystem->Select(&fh, timeout);
462 Int_t TSocket::Send(Int_t kind)
467 if ((nsent = Send(mess)) < 0)
480 Int_t TSocket::Send(Int_t status, Int_t kind)
486 if ((nsent = Send(mess)) < 0)
499 Int_t TSocket::Send(
const char *str, Int_t kind)
502 if (str) mess.WriteString(str);
505 if ((nsent = Send(mess)) < 0)
508 return nsent -
sizeof(Int_t);
521 Int_t TSocket::Send(
const TMessage &mess)
523 TSystem::ResetErrno();
525 if (!IsValid())
return -1;
527 if (mess.IsReading()) {
528 Error(
"Send",
"cannot send a message used for reading");
533 SendStreamerInfos(mess);
536 SendProcessIDs(mess);
540 if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
541 const_cast<TMessage&>(mess).SetCompressionSettings(fCompress);
543 if (mess.GetCompressionLevel() > 0)
544 const_cast<TMessage&>(mess).Compress();
546 char *mbuf = mess.Buffer();
547 Int_t mlen = mess.Length();
548 if (mess.CompBuffer()) {
549 mbuf = mess.CompBuffer();
550 mlen = mess.CompLength();
553 ResetBit(TSocket::kBrokenConn);
555 if ((nsent = gSystem->SendRaw(fSocket, mbuf, mlen, 0)) <= 0) {
558 MarkBrokenConnection();
564 fgBytesSent += nsent;
567 if (mess.What() & kMESS_ACK) {
568 TSystem::ResetErrno();
569 ResetBit(TSocket::kBrokenConn);
572 if ((n = gSystem->RecvRaw(fSocket, buf,
sizeof(buf), 0)) < 0) {
575 MarkBrokenConnection();
580 if (strncmp(buf,
"ok", 2)) {
581 Error(
"Send",
"bad acknowledgement");
590 return nsent -
sizeof(UInt_t);
599 Int_t TSocket::SendObject(
const TObject *obj, Int_t kind)
603 mess.WriteObject(obj);
607 if ((nsent = Send(mess)) < 0)
619 Int_t TSocket::SendRaw(
const void *buffer, Int_t length, ESendRecvOptions opt)
621 TSystem::ResetErrno();
623 if (!IsValid())
return -1;
625 ResetBit(TSocket::kBrokenConn);
627 if ((nsent = gSystem->SendRaw(fSocket, buffer, length, (
int) opt)) <= 0) {
630 MarkBrokenConnection();
636 fgBytesSent += nsent;
648 void TSocket::SendStreamerInfos(
const TMessage &mess)
650 if (mess.fInfos && mess.fInfos->GetEntries()) {
651 TIter next(mess.fInfos);
654 while ((info = (TStreamerInfo*)next())) {
655 Int_t uid = info->GetNumber();
656 if (fBitsInfo.TestBitNumber(uid))
658 fBitsInfo.SetBitNumber(uid);
660 minilist =
new TList();
662 Info(
"SendStreamerInfos",
"sending TStreamerInfo: %s, version = %d",
663 info->GetName(),info->GetClassVersion());
667 TMessage messinfo(kMESS_STREAMERINFO);
668 messinfo.WriteObject(minilist);
671 messinfo.fInfos->Clear();
672 if (Send(messinfo) < 0)
673 Warning(
"SendStreamerInfos",
"problems sending TStreamerInfo's ...");
683 void TSocket::SendProcessIDs(
const TMessage &mess)
685 if (mess.TestBitNumber(0)) {
686 TObjArray *pids = TProcessID::GetPIDs();
687 Int_t npids = pids->GetEntries();
690 for (Int_t ipid = 0; ipid < npids; ipid++) {
691 pid = (TProcessID*)pids->At(ipid);
692 if (!pid || !mess.TestBitNumber(pid->GetUniqueID()+1))
697 fUUIDs =
new TList();
699 if (fUUIDs->FindObject(pid->GetTitle()))
702 fUUIDs->Add(
new TObjString(pid->GetTitle()));
704 minilist =
new TList();
706 Info(
"SendProcessIDs",
"sending TProcessID: %s", pid->GetTitle());
710 TMessage messpid(kMESS_PROCESSID);
711 messpid.WriteObject(minilist);
713 if (Send(messpid) < 0)
714 Warning(
"SendProcessIDs",
"problems sending TProcessID's ...");
726 Int_t TSocket::Recv(
char *str, Int_t max)
730 ResetBit(TSocket::kBrokenConn);
731 if ((n = Recv(str, max, kind)) <= 0) {
733 SetBit(TSocket::kBrokenConn);
739 if (kind != kMESS_STRING) {
740 Error(
"Recv",
"got message of wrong kind (expected %d, got %d)",
754 Int_t TSocket::Recv(
char *str, Int_t max, Int_t &kind)
759 ResetBit(TSocket::kBrokenConn);
760 if ((n = Recv(mess)) <= 0) {
762 SetBit(TSocket::kBrokenConn);
770 if (mess->BufferSize() > (Int_t)
sizeof(Int_t))
771 mess->ReadString(str, max);
787 Int_t TSocket::Recv(Int_t &status, Int_t &kind)
792 ResetBit(TSocket::kBrokenConn);
793 if ((n = Recv(mess)) <= 0) {
795 SetBit(TSocket::kBrokenConn);
816 Int_t TSocket::Recv(TMessage *&mess)
818 TSystem::ResetErrno();
826 ResetBit(TSocket::kBrokenConn);
829 if ((n = gSystem->RecvRaw(fSocket, &len,
sizeof(UInt_t), 0)) <= 0) {
830 if (n == 0 || n == -5) {
832 MarkBrokenConnection();
839 ResetBit(TSocket::kBrokenConn);
840 char *buf =
new char[len+
sizeof(UInt_t)];
841 if ((n = gSystem->RecvRaw(fSocket, buf+
sizeof(UInt_t), len, 0)) <= 0) {
842 if (n == 0 || n == -5) {
844 MarkBrokenConnection();
851 fBytesRecv += n +
sizeof(UInt_t);
852 fgBytesRecv += n +
sizeof(UInt_t);
854 mess =
new TMessage(buf, len+
sizeof(UInt_t));
857 if (RecvStreamerInfos(mess))
861 if (RecvProcessIDs(mess))
864 if (mess->What() & kMESS_ACK) {
865 ResetBit(TSocket::kBrokenConn);
866 char ok[2] = {
'o',
'k' };
868 if ((n2 = gSystem->SendRaw(fSocket, ok,
sizeof(ok), 0)) < 0) {
871 MarkBrokenConnection();
877 mess->SetWhat(mess->What() & ~kMESS_ACK);
896 Int_t TSocket::RecvRaw(
void *buffer, Int_t length, ESendRecvOptions opt)
898 TSystem::ResetErrno();
900 if (!IsValid())
return -1;
901 if (length == 0)
return 0;
903 ResetBit(TSocket::kBrokenConn);
905 if ((n = gSystem->RecvRaw(fSocket, buffer, length, (
int) opt)) <= 0) {
906 if (n == 0 || n == -5) {
908 MarkBrokenConnection();
926 Bool_t TSocket::RecvStreamerInfos(TMessage *mess)
928 if (mess->What() == kMESS_STREAMERINFO) {
929 TList *list = (TList*)mess->ReadObject(TList::Class());
932 TObjLink *lnk = list->FirstLink();
935 info = (TStreamerInfo*)lnk->GetObject();
936 TObject *element = info->GetElements()->UncheckedAt(0);
937 Bool_t isstl = element && strcmp(
"This",element->GetName())==0;
941 Info(
"RecvStreamerInfos",
"importing TStreamerInfo: %s, version = %d",
942 info->GetName(), info->GetClassVersion());
947 lnk = list->FirstLink();
949 info = (TStreamerInfo*)lnk->GetObject();
950 TObject *element = info->GetElements()->UncheckedAt(0);
951 Bool_t isstl = element && strcmp(
"This",element->GetName())==0;
955 Info(
"RecvStreamerInfos",
"importing TStreamerInfo: %s, version = %d",
956 info->GetName(), info->GetClassVersion());
973 Bool_t TSocket::RecvProcessIDs(TMessage *mess)
975 if (mess->What() == kMESS_PROCESSID) {
976 TList *list = (TList*)mess->ReadObject(TList::Class());
979 while ((pid = (TProcessID*)next())) {
981 TObjArray *pidslist = TProcessID::GetPIDs();
982 TIter nextpid(pidslist);
984 while ((p = (TProcessID*)nextpid())) {
985 if (!strcmp(p->GetTitle(), pid->GetTitle())) {
993 Info(
"RecvProcessIDs",
"importing TProcessID: %s", pid->GetTitle());
994 pid->IncrementCount();
996 Int_t ind = pidslist->IndexOf(pid);
997 pid->SetUniqueID((UInt_t)ind);
1011 Int_t TSocket::SetOption(ESockOptions opt, Int_t val)
1013 if (!IsValid())
return -1;
1015 return gSystem->SetSockOpt(fSocket, opt, val);
1021 Int_t TSocket::GetOption(ESockOptions opt, Int_t &val)
1023 if (!IsValid())
return -1;
1025 return gSystem->GetSockOpt(fSocket, opt, &val);
1033 Int_t TSocket::GetErrorCode()
const
1044 void TSocket::SetCompressionAlgorithm(Int_t algorithm)
1046 if (algorithm < 0 || algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
1047 if (fCompress < 0) {
1048 fCompress = 100 * algorithm + ROOT::RCompressionSetting::ELevel::kUseMin;
1050 int level = fCompress % 100;
1051 fCompress = 100 * algorithm + level;
1058 void TSocket::SetCompressionLevel(Int_t level)
1060 if (level < 0) level = 0;
1061 if (level > 99) level = 99;
1062 if (fCompress < 0) {
1066 int algorithm = fCompress / 100;
1067 if (algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
1068 fCompress = 100 * algorithm + level;
1096 void TSocket::SetCompressionSettings(Int_t settings)
1098 fCompress = settings;
1104 Bool_t TSocket::Authenticate(
const char *user)
1109 TString sproto = TUrl(fUrl).GetProtocol();
1110 if (sproto.Contains(
"sockd")) {
1112 }
else if (sproto.Contains(
"rootd")) {
1114 }
else if (sproto.Contains(
"proofd")) {
1115 fServType = kPROOFD;
1117 TString opt(TUrl(fUrl).GetOptions());
1119 if (!strncasecmp(opt,
"S", 1)) {
1120 if (Send(
"slave") < 0)
return rc;
1121 }
else if (!strncasecmp(opt,
"M", 1)) {
1122 if (Send(
"master") < 0)
return rc;
1124 Warning(
"Authenticate",
1125 "called by TSlave: unknown option '%c' %s",
1126 opt[0],
" - assuming Slave");
1127 if (Send(
"slave") < 0)
return rc;
1131 Info(
"Authenticate",
"Local protocol: %s",sproto.Data());
1134 Int_t kind = kROOTD_PROTOCOL;
1138 if (fRemoteProtocol == -1) {
1139 if (Send(Form(
" %d", fgClientProtocol), kROOTD_PROTOCOL) < 0) {
1142 if (Recv(fRemoteProtocol, kind) < 0) {
1149 if (kind == kROOTD_ERR) {
1150 fRemoteProtocol = 9;
1156 Bool_t runauth = kTRUE;
1157 if (fRemoteProtocol > 1000) {
1160 fRemoteProtocol %= 1000;
1165 TString host = GetInetAddress().GetHostName();
1169 TString alib =
"Xrd";
1170 if (fRemoteProtocol < 100) {
1177 gROOT->GetPluginManager()->FindHandler(
"TVirtualAuth", alib);
1178 if (!h || h->LoadPlugin() != 0) {
1179 Error(
"Authenticate",
1180 "could not load properly %s authentication plugin", alib.Data());
1185 TVirtualAuth *auth = (TVirtualAuth *)(h->ExecPlugin(0));
1187 Error(
"Authenticate",
"could not instantiate the interface class");
1191 Info(
"Authenticate",
"class for '%s' authentication loaded", alib.Data());
1193 Option_t *opts = (gROOT->IsProofServ()) ?
"P" :
"";
1194 if (!(auth->Authenticate(
this, host, user, opts))) {
1195 Error(
"Authenticate",
1196 "authentication attempt failed for %s@%s", user, host.Data());
1203 UserGroup_t *u = gSystem->GetUserInfo();
1205 if (Send(Form(
"%s %s", u->fUser.Data(), user), kROOTD_USER) < 0)
1206 Warning(
"Authenticate",
"problem sending kROOTD_USER (%s,%s)", u->fUser.Data(), user);
1209 if (Send(Form(
"-1 %s", user), kROOTD_USER) < 0)
1210 Warning(
"Authenticate",
"problem sending kROOTD_USER (-1,%s)", user);
1216 if (Recv(stat, kind) > 0) {
1218 if (kind == kROOTD_ERR) {
1220 TSocket::NetError(
"TSocket::Authenticate", stat);
1221 }
else if (kind == kROOTD_AUTH) {
1225 fSecContext =
new TSecContext(user, host, 0, -4, 0, 0);
1227 Info(
"Authenticate",
"no authentication required remotely");
1233 Info(
"Authenticate",
"expected message type %d, received %d",
1238 Info(
"Authenticate",
"error receiving message");
1288 TSocket *TSocket::CreateAuthSocket(
const char *url, Int_t size, Int_t tcpwindowsize,
1289 TSocket *opensock, Int_t *err)
1291 R__LOCKGUARD2(gSocketAuthMutex);
1297 Bool_t parallel = kFALSE;
1298 TString proto(TUrl(url).GetProtocol());
1299 TString protosave = proto;
1303 if (proto.EndsWith(
"up") || proto.EndsWith(
"ug")) {
1305 asfx.Remove(0,proto.Length()-2);
1306 proto.Resize(proto.Length()-2);
1307 }
else if (proto.EndsWith(
"s") || proto.EndsWith(
"k") ||
1308 proto.EndsWith(
"g") || proto.EndsWith(
"h")) {
1310 asfx.Remove(0,proto.Length()-1);
1311 proto.Resize(proto.Length()-1);
1315 if (((proto.EndsWith(
"p") || size > 1) &&
1316 !proto.BeginsWith(
"proof")) ||
1317 proto.BeginsWith(
"root") ) {
1319 if (proto.EndsWith(
"p"))
1320 proto.Resize(proto.Length()-1);
1324 if (!proto.BeginsWith(
"sock") && !proto.BeginsWith(
"proof") &&
1325 !proto.BeginsWith(
"root"))
1332 eurl.ReplaceAll(protosave,proto);
1340 if (opensock && opensock->IsValid())
1343 sock =
new TSocket(eurl, TUrl(url).GetPort(), tcpwindowsize);
1346 if (sock && sock->IsValid()) {
1347 if (!sock->Authenticate(TUrl(url).GetUser())) {
1350 *err = (Int_t)kErrAuthNotOK;
1351 if (sock->TestBit(TSocket::kBrokenConn)) *err = (Int_t)kErrConnectionRefused;
1364 if (eurl.Contains(
"?"))
1365 eurl.Resize(eurl.Index(
"?"));
1369 if (opensock && opensock->IsValid())
1370 sock =
new TPSocket(eurl, TUrl(url).GetPort(), size, opensock);
1372 sock =
new TPSocket(eurl, TUrl(url).GetPort(), size, tcpwindowsize);
1375 if (sock && !sock->IsAuthenticated()) {
1378 *err = (Int_t)kErrAuthNotOK;
1379 if (sock->TestBit(TSocket::kBrokenConn)) *err = (Int_t)kErrConnectionRefused;
1381 if (sock->IsValid())
1430 TSocket *TSocket::CreateAuthSocket(
const char *user,
const char *url,
1431 Int_t port, Int_t size, Int_t tcpwindowsize,
1432 TSocket *opensock, Int_t *err)
1434 R__LOCKGUARD2(gSocketAuthMutex);
1440 if (TString(TUrl(url).GetProtocol()).Length() > 0) {
1441 eurl += TString(TUrl(url).GetProtocol());
1442 eurl += TString(
"://");
1445 if (!user || strlen(user) > 0) {
1446 eurl += TString(user);
1447 eurl += TString(
"@");
1450 eurl += TString(TUrl(url).GetHost());
1452 eurl += TString(
":");
1453 eurl += (port > 0 ? port : 0);
1455 if (TString(TUrl(url).GetOptions()).Length() > 0) {
1456 eurl += TString(
"/?");
1457 eurl += TString(TUrl(url).GetOptions());
1461 return TSocket::CreateAuthSocket(eurl,size,tcpwindowsize,opensock,err);
1467 Int_t TSocket::GetClientProtocol()
1469 return fgClientProtocol;
1475 void TSocket::NetError(
const char *where, Int_t err)
1478 err = (err < kErrError) ? ((err > -1) ? err : 0) : kErrError;
1481 ::Error(where,
"%s", gRootdErrStr[err]);
1487 ULong64_t TSocket::GetSocketBytesSent()
1495 ULong64_t TSocket::GetSocketBytesRecv()