// Definitions of the static TUDPSocket byte counters; both start at zero.
// fgBytesSent is incremented by Send()/SendRaw(), fgBytesRecv by Recv().
39 ULong64_t TUDPSocket::fgBytesSent = 0;
40 ULong64_t TUDPSocket::fgBytesRecv = 0;
// Constructor taking a resolved address and a service name.
// The host name of `addr` and `service` are passed to the TNamed base, and the
// compression setting starts at kUseGlobal. The port is looked up with
// gSystem->GetServiceByName(service); OpenConnection is only called when the
// resulting port is not -1. The visible code clears kBrokenConn and adds this
// object to gROOT->GetListOfSockets() after taking gROOTMutex.
// NOTE(review): the statements guarded by the "root"/"proof" checks and the
// remaining OpenConnection arguments are not visible in this excerpt.
55 TUDPSocket::TUDPSocket(TInetAddress addr,
const char *service)
56 : TNamed(addr.GetHostName(), service), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
65 if (fService.Contains(
"root"))
67 if (fService.Contains(
"proof"))
70 fAddress.fPort = gSystem->GetServiceByName(service);
75 ResetBit(TUDPSocket::kBrokenConn);
77 if (fAddress.GetPort() != -1) {
78 fSocket = gSystem->OpenConnection(addr.GetHostName(), fAddress.GetPort(),
82 R__LOCKGUARD(gROOTMutex);
83 gROOT->GetListOfSockets()->Add(
this);
// Constructor taking a resolved address and a numeric port.
// The service name is derived from the port via gSystem->GetServiceByPort(port)
// and the port is stored in fAddress.fPort. kBrokenConn is cleared, a connection
// is opened to the host of `addr`, and this object is added to
// gROOT->GetListOfSockets() after taking gROOTMutex.
// NOTE(review): the statements guarded by the "root"/"proof" checks and the
// remaining OpenConnection arguments are not visible in this excerpt.
101 TUDPSocket::TUDPSocket(TInetAddress addr, Int_t port)
102 : TNamed(addr.GetHostName(),
""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
107 fService = gSystem->GetServiceByPort(port);
111 if (fService.Contains(
"root"))
113 if (fService.Contains(
"proof"))
116 fAddress.fPort = port;
122 ResetBit(TUDPSocket::kBrokenConn);
124 fSocket = gSystem->OpenConnection(addr.GetHostName(), fAddress.GetPort(),
129 R__LOCKGUARD(gROOTMutex);
130 gROOT->GetListOfSockets()->Add(
this);
// Constructor taking a host name and a service name.
// `host` and `service` are passed to the TNamed base and the compression setting
// starts at kUseGlobal. The address is resolved with gSystem->GetHostByName(host),
// the port with gSystem->GetServiceByName(service), and the object is renamed to
// the resolved host name. OpenConnection is only called when the port is not -1.
// The visible code clears kBrokenConn and adds this object to
// gROOT->GetListOfSockets() after taking gROOTMutex.
// NOTE(review): the statements guarded by the "root"/"proof" checks are not
// visible in this excerpt.
144 TUDPSocket::TUDPSocket(
const char *host,
const char *service)
145 : TNamed(host, service), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
154 if (fService.Contains(
"root"))
156 if (fService.Contains(
"proof"))
158 fAddress = gSystem->GetHostByName(host);
159 fAddress.fPort = gSystem->GetServiceByName(service);
160 SetName(fAddress.GetHostName());
165 ResetBit(TUDPSocket::kBrokenConn);
167 if (fAddress.GetPort() != -1) {
// The protocol selector must be spelled "udp", exactly as the (url, port) and
// (sockpath) constructors pass it; a misspelled value does not select UDP.
168 fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), -1,
"udp");
170 R__LOCKGUARD(gROOTMutex);
171 gROOT->GetListOfSockets()->Add(
this);
// Constructor taking a URL string and a numeric port.
// The host part of `url` is passed to the TNamed base. The host is extracted
// again from fUrl, the service name is derived from the port, the address is
// resolved with gSystem->GetHostByName(host), and the object is renamed to the
// resolved host name. kBrokenConn is cleared, a connection is opened with
// protocol "udp" (third argument -1), and this object is added to
// gROOT->GetListOfSockets() after taking gROOTMutex.
// NOTE(review): where fUrl is assigned and what the "root"/"proof" checks
// guard is not visible in this excerpt.
189 TUDPSocket::TUDPSocket(
const char *url, Int_t port)
190 : TNamed(TUrl(url).GetHost(),
""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
196 TString host(TUrl(fUrl).GetHost());
198 fService = gSystem->GetServiceByPort(port);
202 if (fUrl.Contains(
"root"))
204 if (fUrl.Contains(
"proof"))
206 fAddress = gSystem->GetHostByName(host);
207 fAddress.fPort = port;
208 SetName(fAddress.GetHostName());
214 ResetBit(TUDPSocket::kBrokenConn);
216 fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), -1,
"udp");
220 R__LOCKGUARD(gROOTMutex);
221 gROOT->GetListOfSockets()->Add(
this);
// Constructor taking a socket path.
// The object name is set to "unix:<sockpath>", kBrokenConn is cleared, and the
// connection is opened with OpenConnection(sockpath, -1, -1, "udp"). This object
// is then added to gROOT->GetListOfSockets() after taking gROOTMutex.
// NOTE(review): other member initialisation is not visible in this excerpt.
232 TUDPSocket::TUDPSocket(
const char *sockpath) : TNamed(sockpath,
""),
233 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
245 fName.Form(
"unix:%s", sockpath);
251 ResetBit(TUDPSocket::kBrokenConn);
253 fSocket = gSystem->OpenConnection(sockpath, -1, -1,
"udp");
255 R__LOCKGUARD(gROOTMutex);
256 gROOT->GetListOfSockets()->Add(
this);
// Constructor taking an existing descriptor.
// Name and title start empty; the service is set from kSOCKD, kBrokenConn is
// cleared, and the peer address is queried with gSystem->GetPeerName(fSocket).
// This object is then added to gROOT->GetListOfSockets() after taking gROOTMutex.
// NOTE(review): how `desc` is validated and stored in fSocket is not visible
// in this excerpt.
264 TUDPSocket::TUDPSocket(Int_t desc) : TNamed(
"",
""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
271 fService = (
char *)kSOCKD;
277 ResetBit(TUDPSocket::kBrokenConn);
281 fAddress = gSystem->GetPeerName(fSocket);
282 R__LOCKGUARD(gROOTMutex);
283 gROOT->GetListOfSockets()->Add(
this);
// Constructor taking an existing descriptor together with a socket path.
// The object name is set to "unix:<sockpath>", kBrokenConn is cleared, and this
// object is added to gROOT->GetListOfSockets() after taking gROOTMutex.
// NOTE(review): how `desc` is validated and stored in fSocket is not visible
// in this excerpt.
293 TUDPSocket::TUDPSocket(Int_t desc,
const char *sockpath) : TNamed(sockpath,
""),
294 fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
306 fName.Form(
"unix:%s", sockpath);
312 ResetBit(TUDPSocket::kBrokenConn);
316 R__LOCKGUARD(gROOTMutex);
317 gROOT->GetListOfSockets()->Add(
this);
// Copy constructor.
// Copies the TNamed part plus service, remote and local addresses, per-socket
// byte counters, compression setting, security context, remote protocol and
// server type from `s`. kBrokenConn is cleared on the copy, and the copy is
// added to gROOT->GetListOfSockets() after taking gROOTMutex.
// NOTE(review): the handling of fSocket and any other members is not visible
// in this excerpt.
326 TUDPSocket::TUDPSocket(
const TUDPSocket &s) : TNamed(s)
329 fService = s.fService;
330 fAddress = s.fAddress;
331 fLocalAddress = s.fLocalAddress;
332 fBytesSent = s.fBytesSent;
333 fBytesRecv = s.fBytesRecv;
334 fCompress = s.fCompress;
335 fSecContext = s.fSecContext;
336 fRemoteProtocol = s.fRemoteProtocol;
337 fServType = s.fServType;
340 ResetBit(TUDPSocket::kBrokenConn);
343 R__LOCKGUARD(gROOTMutex);
344 gROOT->GetListOfSockets()->Add(
this);
// Close the socket.
// `force` is true only when `option` is non-null and equals "force"; it is
// forwarded to gSystem->CloseConnection(fSocket, force). The object is removed
// from gROOT->GetListOfSockets() after taking gROOTMutex, and fLastUsageMtx is
// released with SafeDelete.
// NOTE(review): any guard around the close/remove calls is not visible in this
// excerpt.
354 void TUDPSocket::Close(Option_t *option)
356 Bool_t force = option ? (!strcmp(option,
"force") ? kTRUE : kFALSE) : kFALSE;
359 gSystem->CloseConnection(fSocket, force);
360 R__LOCKGUARD(gROOTMutex);
361 gROOT->GetListOfSockets()->Remove(
this);
366 SafeDelete(fLastUsageMtx);
// Return the local address of the socket.
// fLocalAddress is filled lazily: when its port is still -1 it is fetched with
// gSystem->GetSockName(fSocket) and then returned. A default-constructed
// TInetAddress is returned on the other path.
// NOTE(review): the condition selecting between the two returns is not visible
// in this excerpt.
373 TInetAddress TUDPSocket::GetLocalInetAddress()
376 if (fLocalAddress.GetPort() == -1)
377 fLocalAddress = gSystem->GetSockName(fSocket);
378 return fLocalAddress;
380 return TInetAddress();
// Return the local port of the socket.
// When the cached local address has port -1, GetLocalInetAddress() is called
// first to fill it; the port of fLocalAddress is then returned.
// NOTE(review): any enclosing guard and fallback return are not visible in
// this excerpt.
387 Int_t TUDPSocket::GetLocalPort()
390 if (fLocalAddress.GetPort() == -1)
391 GetLocalInetAddress();
392 return fLocalAddress.GetPort();
// Wait on this socket for the condition described by `interest`.
// A TFileHandler is built from fSocket and `interest` and handed to
// gSystem->Select together with `timeout`; the result is stored in rc.
// NOTE(review): the declaration of rc and the return statement are not
// visible in this excerpt.
407 Int_t TUDPSocket::Select(Int_t interest, Long_t timeout)
412 TFileHandler fh(fSocket, interest);
415 rc = gSystem->Select(&fh, timeout);
// Send a message identified by `kind`.
// Delegates to Send(const TMessage &) and checks for a negative result.
// NOTE(review): construction of `mess` and the return statements are not
// visible in this excerpt.
427 Int_t TUDPSocket::Send(Int_t kind)
432 if ((nsent = Send(mess)) < 0)
// Send a status value in a message identified by `kind`.
// Delegates to Send(const TMessage &) and checks for a negative result.
// NOTE(review): construction of `mess`, how `status` is written into it, and
// the return statements are not visible in this excerpt.
445 Int_t TUDPSocket::Send(Int_t status, Int_t kind)
451 if ((nsent = Send(mess)) < 0)
// Send a character string in a message identified by `kind`.
// The string is written into the message only when `str` is non-null. The
// message goes through Send(const TMessage &); on the visible success path the
// return value is that result minus sizeof(Int_t).
// NOTE(review): construction of `mess` and the error return are not visible
// in this excerpt.
464 Int_t TUDPSocket::Send(
const char *str, Int_t kind)
467 if (str) mess.WriteString(str);
470 if ((nsent = Send(mess)) < 0)
473 return nsent -
sizeof(Int_t);
// Send a TMessage over the socket.
// Steps visible in this excerpt, in order:
//  - errno is reset and -1 is returned when fSocket is -1;
//  - a message that is in reading mode is rejected with an Error;
//  - SendStreamerInfos() and SendProcessIDs() are called for the message;
//  - if the socket has a compression level > 0 and the message has level 0,
//    the socket's fCompress is applied to the message (via const_cast);
//  - a message with compression level > 0 is compressed, and the compressed
//    buffer/length are used for sending when a compressed buffer exists;
//  - the buffer is written with gSystem->SendRaw; a result <= 0 reaches code
//    that sets kBrokenConn;
//  - fgBytesSent is increased by the number of bytes sent;
//  - for messages carrying kMESS_ACK an acknowledgement is read with
//    gSystem->RecvRaw and must start with "ok", otherwise "bad
//    acknowledgement" is reported;
//  - the visible final return is nsent minus sizeof(UInt_t).
// NOTE(review): the exact conditions under which kBrokenConn is set, the
// error return values and the per-socket counter update are not visible in
// this excerpt.
486 Int_t TUDPSocket::Send(
const TMessage &mess)
488 TSystem::ResetErrno();
490 if (fSocket == -1)
return -1;
492 if (mess.IsReading()) {
493 Error(
"Send",
"cannot send a message used for reading");
498 SendStreamerInfos(mess);
501 SendProcessIDs(mess);
505 if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
506 const_cast<TMessage&>(mess).SetCompressionSettings(fCompress);
508 if (mess.GetCompressionLevel() > 0)
509 const_cast<TMessage&>(mess).Compress();
511 char *mbuf = mess.Buffer();
512 Int_t mlen = mess.Length();
513 if (mess.CompBuffer()) {
514 mbuf = mess.CompBuffer();
515 mlen = mess.CompLength();
518 ResetBit(TUDPSocket::kBrokenConn);
520 if ((nsent = gSystem->SendRaw(fSocket, mbuf, mlen, 0)) <= 0) {
523 SetBit(TUDPSocket::kBrokenConn);
530 fgBytesSent += nsent;
533 if (mess.What() & kMESS_ACK) {
534 TSystem::ResetErrno();
535 ResetBit(TUDPSocket::kBrokenConn);
538 if ((n = gSystem->RecvRaw(fSocket, buf,
sizeof(buf), 0)) < 0) {
541 SetBit(TUDPSocket::kBrokenConn);
547 if (strncmp(buf,
"ok", 2)) {
548 Error(
"Send",
"bad acknowledgement");
557 return nsent -
sizeof(UInt_t);
// Send an object in a message identified by `kind`.
// The object is serialised into the message with WriteObject and the message
// is sent through Send(const TMessage &); a negative result is checked.
// NOTE(review): construction of `mess` and the return statements are not
// visible in this excerpt.
566 Int_t TUDPSocket::SendObject(
const TObject *obj, Int_t kind)
570 mess.WriteObject(obj);
574 if ((nsent = Send(mess)) < 0)
// Send `length` bytes from `buffer` without any message framing.
// errno is reset and -1 is returned when fSocket is -1. kBrokenConn is cleared
// before the write; the bytes go out through gSystem->SendRaw with `opt` cast
// to int, and a result <= 0 reaches code that sets kBrokenConn. fgBytesSent is
// increased by the number of bytes sent.
// NOTE(review): the exact kBrokenConn condition, the per-socket counter update
// and the return statements are not visible in this excerpt.
586 Int_t TUDPSocket::SendRaw(
const void *buffer, Int_t length, ESendRecvOptions opt)
588 TSystem::ResetErrno();
590 if (fSocket == -1)
return -1;
592 ResetBit(TUDPSocket::kBrokenConn);
594 if ((nsent = gSystem->SendRaw(fSocket, buffer, length, (
int) opt)) <= 0) {
597 SetBit(TUDPSocket::kBrokenConn);
604 fgBytesSent += nsent;
// Send the TStreamerInfo objects referenced by `mess`.
// Runs only when mess.fInfos exists and has entries. Each info is identified
// by GetNumber(); fBitsInfo is tested for that number and the bit is set for
// infos handled here (presumably infos whose bit is already set are skipped --
// the statement after the test is not visible). A TList (`minilist`) is
// allocated and an Info line is printed per streamer info. The list is written
// into a kMESS_STREAMERINFO message, that message's own fInfos is cleared, and
// it is sent with Send(); a negative result produces a Warning.
// NOTE(review): the conditions around allocating `minilist`, printing the
// Info line and cleaning up the list are not visible in this excerpt.
616 void TUDPSocket::SendStreamerInfos(
const TMessage &mess)
618 if (mess.fInfos && mess.fInfos->GetEntries()) {
619 TIter next(mess.fInfos);
622 while ((info = (TStreamerInfo*)next())) {
623 Int_t uid = info->GetNumber();
624 if (fBitsInfo.TestBitNumber(uid))
626 fBitsInfo.SetBitNumber(uid);
628 minilist =
new TList();
630 Info(
"SendStreamerInfos",
"sending TStreamerInfo: %s, version = %d",
631 info->GetName(),info->GetClassVersion());
635 TMessage messinfo(kMESS_STREAMERINFO);
636 messinfo.WriteObject(minilist);
639 messinfo.fInfos->Clear();
640 if (Send(messinfo) < 0)
641 Warning(
"SendStreamerInfos",
"problems sending TStreamerInfo's ...");
// Send the TProcessID objects referenced by `mess`.
// Runs only when bit 0 of the message is set. The global list from
// TProcessID::GetPIDs() is walked by index; entries that are null or whose bit
// (unique ID + 1) is not set in the message are filtered by the visible test.
// fUUIDs is allocated as a TList, looked up by the pid's title, and the title
// is added to it as a TObjString. A TList (`minilist`) is allocated and an
// Info line is printed per process ID. The list is written into a
// kMESS_PROCESSID message and sent with Send(); a negative result produces a
// Warning.
// NOTE(review): the statements following the filter and FindObject tests, the
// conditions around the allocations, and list cleanup are not visible in this
// excerpt.
651 void TUDPSocket::SendProcessIDs(
const TMessage &mess)
653 if (mess.TestBitNumber(0)) {
654 TObjArray *pids = TProcessID::GetPIDs();
655 Int_t npids = pids->GetEntries();
658 for (Int_t ipid = 0; ipid < npids; ipid++) {
659 pid = (TProcessID*)pids->At(ipid);
660 if (!pid || !mess.TestBitNumber(pid->GetUniqueID()+1))
665 fUUIDs =
new TList();
667 if (fUUIDs->FindObject(pid->GetTitle()))
670 fUUIDs->Add(
new TObjString(pid->GetTitle()));
672 minilist =
new TList();
674 Info(
"SendProcessIDs",
"sending TProcessID: %s", pid->GetTitle());
678 TMessage messpid(kMESS_PROCESSID);
679 messpid.WriteObject(minilist);
681 if (Send(messpid) < 0)
682 Warning(
"SendProcessIDs",
"problems sending TProcessID's ...");
// Receive a character string of at most `max` bytes into `str`.
// kBrokenConn is cleared, then Recv(str, max, kind) is called; a result <= 0
// reaches code that sets kBrokenConn. A received kind other than kMESS_STRING
// is reported with an Error.
// NOTE(review): the exact kBrokenConn condition and the return statements are
// not visible in this excerpt.
694 Int_t TUDPSocket::Recv(
char *str, Int_t max)
698 ResetBit(TUDPSocket::kBrokenConn);
699 if ((n = Recv(str, max, kind)) <= 0) {
701 SetBit(TUDPSocket::kBrokenConn);
707 if (kind != kMESS_STRING) {
708 Error(
"Recv",
"got message of wrong kind (expected %d, got %d)",
// Receive a character string of at most `max` bytes and report its kind.
// kBrokenConn is cleared, then a message is read with Recv(TMessage *&); a
// result <= 0 reaches code that sets kBrokenConn. The string is read out of the
// message only when its buffer size exceeds sizeof(Int_t).
// NOTE(review): how `kind` is filled, the else branch, message deletion and
// the return statements are not visible in this excerpt.
722 Int_t TUDPSocket::Recv(
char *str, Int_t max, Int_t &kind)
727 ResetBit(TUDPSocket::kBrokenConn);
728 if ((n = Recv(mess)) <= 0) {
730 SetBit(TUDPSocket::kBrokenConn);
738 if (mess->BufferSize() > (Int_t)
sizeof(Int_t))
739 mess->ReadString(str, max);
// Receive a status value and message kind.
// kBrokenConn is cleared, then a message is read with Recv(TMessage *&); a
// result <= 0 reaches code that sets kBrokenConn.
// NOTE(review): how `status` and `kind` are filled and the return statements
// are not visible in this excerpt.
755 Int_t TUDPSocket::Recv(Int_t &status, Int_t &kind)
760 ResetBit(TUDPSocket::kBrokenConn);
761 if ((n = Recv(mess)) <= 0) {
763 SetBit(TUDPSocket::kBrokenConn);
// Receive a TMessage; on success `mess` points to a newly allocated message.
// Steps visible in this excerpt, in order:
//  - errno is reset and kBrokenConn is cleared;
//  - a UInt_t length is read with gSystem->RecvRaw; on a result <= 0, the
//    values 0 and -5 lead to kBrokenConn being set;
//  - a buffer of len + sizeof(UInt_t) bytes is allocated and the payload is
//    read into it after the first sizeof(UInt_t) bytes, with the same
//    0 / -5 handling;
//  - fBytesRecv and fgBytesRecv grow by the payload size plus sizeof(UInt_t);
//  - a TMessage is constructed over the buffer;
//  - RecvStreamerInfos() and RecvProcessIDs() are given the message first;
//  - if the message carries kMESS_ACK, the two bytes "ok" are sent back with
//    gSystem->SendRaw (a negative result reaches code that sets kBrokenConn)
//    and the kMESS_ACK bit is cleared from the message's What().
// NOTE(review): byte-order conversion of `len`, buffer cleanup on error, what
// happens after the streamer-info/process-ID checks succeed, and the return
// statements are not visible in this excerpt.
784 Int_t TUDPSocket::Recv(TMessage *&mess)
786 TSystem::ResetErrno();
794 ResetBit(TUDPSocket::kBrokenConn);
797 if ((n = gSystem->RecvRaw(fSocket, &len,
sizeof(UInt_t), 0)) <= 0) {
798 if (n == 0 || n == -5) {
800 SetBit(TUDPSocket::kBrokenConn);
808 ResetBit(TUDPSocket::kBrokenConn);
809 char *buf =
new char[len+
sizeof(UInt_t)];
810 if ((n = gSystem->RecvRaw(fSocket, buf+
sizeof(UInt_t), len, 0)) <= 0) {
811 if (n == 0 || n == -5) {
813 SetBit(TUDPSocket::kBrokenConn);
821 fBytesRecv += n +
sizeof(UInt_t);
822 fgBytesRecv += n +
sizeof(UInt_t);
824 mess =
new TMessage(buf, len+
sizeof(UInt_t));
827 if (RecvStreamerInfos(mess))
831 if (RecvProcessIDs(mess))
834 if (mess->What() & kMESS_ACK) {
835 ResetBit(TUDPSocket::kBrokenConn);
836 char ok[2] = {
'o',
'k' };
838 if ((n2 = gSystem->SendRaw(fSocket, ok,
sizeof(ok), 0)) < 0) {
841 SetBit(TUDPSocket::kBrokenConn);
848 mess->SetWhat(mess->What() & ~kMESS_ACK);
// Receive up to `length` bytes into `buffer` without any message framing.
// errno is reset; -1 is returned when fSocket is -1 and 0 when `length` is 0.
// kBrokenConn is cleared before the read, which goes through gSystem->RecvRaw
// with `opt` cast to int. On a result <= 0, the values 0 and -5 lead to
// kBrokenConn being set.
// NOTE(review): byte-counter updates and the remaining return statements are
// not visible in this excerpt.
867 Int_t TUDPSocket::RecvRaw(
void *buffer, Int_t length, ESendRecvOptions opt)
869 TSystem::ResetErrno();
871 if (fSocket == -1)
return -1;
872 if (length == 0)
return 0;
874 ResetBit(TUDPSocket::kBrokenConn);
876 if ((n = gSystem->RecvRaw(fSocket, buffer, length, (
int) opt)) <= 0) {
877 if (n == 0 || n == -5) {
879 SetBit(TUDPSocket::kBrokenConn);
// Handle a message that carries TStreamerInfo objects.
// Acts only when mess->What() equals kMESS_STREAMERINFO: a TList is read from
// the message and its links are traversed twice, each time starting from
// FirstLink(). In both traversals every TStreamerInfo's first element is
// inspected and `isstl` is true when that element exists and is named "This";
// an "importing TStreamerInfo" Info line is printed with the name and class
// version.
// NOTE(review): how `isstl` selects which infos each traversal processes,
// the actual import call, list cleanup and the return values are not visible
// in this excerpt.
898 Bool_t TUDPSocket::RecvStreamerInfos(TMessage *mess)
900 if (mess->What() == kMESS_STREAMERINFO) {
901 TList *list = (TList*)mess->ReadObject(TList::Class());
904 TObjLink *lnk = list->FirstLink();
907 info = (TStreamerInfo*)lnk->GetObject();
908 TObject *element = info->GetElements()->UncheckedAt(0);
909 Bool_t isstl = element && strcmp(
"This",element->GetName())==0;
913 Info(
"RecvStreamerInfos",
"importing TStreamerInfo: %s, version = %d",
914 info->GetName(), info->GetClassVersion());
919 lnk = list->FirstLink();
921 info = (TStreamerInfo*)lnk->GetObject();
922 TObject *element = info->GetElements()->UncheckedAt(0);
923 Bool_t isstl = element && strcmp(
"This",element->GetName())==0;
927 Info(
"RecvStreamerInfos",
"importing TStreamerInfo: %s, version = %d",
928 info->GetName(), info->GetClassVersion());
// Handle a message that carries TProcessID objects.
// Acts only when mess->What() equals kMESS_PROCESSID: a TList is read from the
// message and, for each received pid, the global list from
// TProcessID::GetPIDs() is scanned for an entry with the same title. The
// visible import path prints an "importing TProcessID" Info line, increments
// the pid's count, and sets its unique ID to its index in the global list.
// NOTE(review): what happens when a matching title is found, how the pid is
// inserted into the global list, list cleanup and the return values are not
// visible in this excerpt.
945 Bool_t TUDPSocket::RecvProcessIDs(TMessage *mess)
947 if (mess->What() == kMESS_PROCESSID) {
948 TList *list = (TList*)mess->ReadObject(TList::Class());
951 while ((pid = (TProcessID*)next())) {
953 TObjArray *pidslist = TProcessID::GetPIDs();
954 TIter nextpid(pidslist);
956 while ((p = (TProcessID*)nextpid())) {
957 if (!strcmp(p->GetTitle(), pid->GetTitle())) {
965 Info(
"RecvProcessIDs",
"importing TProcessID: %s", pid->GetTitle());
966 pid->IncrementCount();
968 Int_t ind = pidslist->IndexOf(pid);
969 pid->SetUniqueID((UInt_t)ind);
// Set socket option `opt` to `val`.
// Returns -1 when fSocket is -1; otherwise returns the result of
// gSystem->SetSockOpt(fSocket, opt, val).
983 Int_t TUDPSocket::SetOption(ESockOptions opt, Int_t val)
985 if (fSocket == -1)
return -1;
987 return gSystem->SetSockOpt(fSocket, opt, val);
// Read socket option `opt` into `val`.
// Returns -1 when fSocket is -1; otherwise returns the result of
// gSystem->GetSockOpt(fSocket, opt, &val).
993 Int_t TUDPSocket::GetOption(ESockOptions opt, Int_t &val)
995 if (fSocket == -1)
return -1;
997 return gSystem->GetSockOpt(fSocket, opt, &val);
1005 Int_t TUDPSocket::GetErrorCode()
const
// Change the compression algorithm encoded in fCompress.
// fCompress is stored as 100 * algorithm + level. An `algorithm` that is
// negative or >= EAlgorithm::kUndefined is replaced by 0. When fCompress is
// negative the level ELevel::kUseMin is used with the new algorithm; the other
// visible path keeps the current level (fCompress % 100).
// NOTE(review): the branch structure between the two assignments is inferred
// from the visible `if`; the intermediate line is not shown in this excerpt.
1016 void TUDPSocket::SetCompressionAlgorithm(Int_t algorithm)
1018 if (algorithm < 0 || algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
1019 if (fCompress < 0) {
1021 fCompress = 100 * algorithm + ROOT::RCompressionSetting::ELevel::kUseMin;
1023 int level = fCompress % 100;
1024 fCompress = 100 * algorithm + level;
// Change the compression level encoded in fCompress.
// `level` is clamped to the range 0..99. On the visible path the current
// algorithm is taken as fCompress / 100, replaced by 0 when it is
// >= EAlgorithm::kUndefined, and fCompress becomes 100 * algorithm + level.
// NOTE(review): the statements executed when fCompress is negative are not
// visible in this excerpt.
1031 void TUDPSocket::SetCompressionLevel(Int_t level)
1033 if (level < 0) level = 0;
1034 if (level > 99) level = 99;
1035 if (fCompress < 0) {
1039 int algorithm = fCompress / 100;
1040 if (algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
1041 fCompress = 100 * algorithm + level;
// Store `settings` directly in fCompress; the visible code applies no
// validation or clamping.
1069 void TUDPSocket::SetCompressionSettings(Int_t settings)
1071 fCompress = settings;
// Report a network error for location `where`.
// `err` is clamped before use as an index: values below 0 become 0 and values
// of kErrError or above become kErrError. The matching entry of gRootdErrStr
// is then printed through the global ::Error.
1077 void TUDPSocket::NetError(
const char *where, Int_t err)
1080 err = (err < kErrError) ? ((err > -1) ? err : 0) : kErrError;
1083 ::Error(where,
"%s", gRootdErrStr[err]);
1089 ULong64_t TUDPSocket::GetSocketBytesSent()
1097 ULong64_t TUDPSocket::GetSocketBytesRecv()