/// Construct a parallel socket from an inet address and a service name.
/// The TSocket base is built from (addr, service); `size` and
/// `tcpwindowsize` are not forwarded to the base constructor.
// NOTE(review): the constructor body is not visible in this listing.
47 TPSocket::TPSocket(TInetAddress addr,
const char *service, Int_t size,
48 Int_t tcpwindowsize) : TSocket(addr, service)
/// Construct a parallel socket from an inet address and a port number.
/// The TSocket base is built from (addr, port); `size` and
/// `tcpwindowsize` are not forwarded to the base constructor.
// NOTE(review): the constructor body is not visible in this listing.
64 TPSocket::TPSocket(TInetAddress addr, Int_t port, Int_t size,
65 Int_t tcpwindowsize) : TSocket(addr, port)
/// Construct a parallel socket from a host name and a service name.
/// The TSocket base is built from (host, service); `size` and
/// `tcpwindowsize` are not forwarded to the base constructor.
// NOTE(review): the constructor body is not visible in this listing.
81 TPSocket::TPSocket(
const char *host,
const char *service, Int_t size,
82 Int_t tcpwindowsize) : TSocket(host, service)
/// Construct a parallel socket from a host name and a port number.
/// The TSocket base is opened with TCP window size -1 when size > 1,
/// otherwise with `tcpwindowsize`.
// NOTE(review): several lines of this constructor (remaining parameter,
// braces, else-branches) are not visible in this listing; the comments
// below describe only the visible statements.
98 TPSocket::TPSocket(
const char *host, Int_t port, Int_t size,
100 : TSocket(host, port, (Int_t)(size > 1 ? -1 : tcpwindowsize))
// Remember whether the base connection came up.
115 Bool_t valid = TSocket::IsValid();
// Look for the "?A" marker inside the host string.
// NOTE(review): presumably this marker requests authentication
// (it sits next to `authreq`) -- confirm against the hidden lines.
118 Bool_t authreq = kFALSE;
119 char *pauth = (
char *)strstr(host,
"?A");
// The peer is treated as a rootd server when "rootd" occurs in `host`.
125 Bool_t rootdSrv = (strstr(host,
"rootd")) ? kTRUE : kFALSE;
// Authenticate as the user encoded in the host URL.
130 if (!Authenticate(TUrl(host).GetUser())) {
// Authentication failed against a rootd peer with protocol 1..9:
// open a fresh connection with the same window-size choice.
131 if (rootdSrv && (fRemoteProtocol > 0 && fRemoteProtocol < 10)) {
135 Int_t tcpw = (size > 1 ? -1 : tcpwindowsize);
136 TSocket *ns =
new TSocket(host, port, tcpw);
// Take the new socket out of the global socket list (under the ROOT
// mutex) and adopt its descriptor as this object's descriptor.
138 R__LOCKGUARD(gROOTMutex);
139 gROOT->GetListOfSockets()->Remove(ns);
140 fSocket = ns->GetDescriptor();
// Re-check validity and authenticate again on the new connection.
144 if ((valid = IsValid())) {
145 if (!Authenticate(TUrl(host).GetUser())) {
// Taken for non-rootd peers or rootd peers speaking protocol > 9.
// NOTE(review): the guarded statement is not visible here.
162 if (!rootdSrv || fRemoteProtocol > 9) {
/// Construct a parallel socket on top of an already existing TSocket.
/// Connection state is copied from `sock`, then the same
/// authentication / reopen sequence as the (host, port) constructor runs.
// NOTE(review): braces, else-branches and some statements are not
// visible in this listing; comments cover the visible statements only.
180 TPSocket::TPSocket(
const char *host, Int_t port, Int_t size, TSocket *sock)
// Mirror the state of the supplied socket: descriptor, service,
// remote/local addresses, byte counters, compression settings,
// security context, remote protocol, service type and TCP window size.
198 fSocket = sock->GetDescriptor();
199 fService = sock->GetService();
200 fAddress = sock->GetInetAddress();
201 fLocalAddress = sock->GetLocalInetAddress();
202 fBytesSent = sock->GetBytesSent();
203 fBytesRecv = sock->GetBytesRecv();
204 fCompress = sock->GetCompressionSettings();
205 fSecContext = sock->GetSecContext();
206 fRemoteProtocol = sock->GetRemoteProtocol();
207 fServType = (TSocket::EServiceType)sock->GetServType();
208 fTcpWindowSize = sock->GetTcpWindowSize();
// Validity is taken from the supplied socket, not from the base class.
211 Bool_t valid = sock->IsValid();
// Look for the "?A" marker inside the host string.
// NOTE(review): presumably an authentication request flag -- confirm.
214 Bool_t authreq = kFALSE;
215 char *pauth = (
char *)strstr(host,
"?A");
// The peer is treated as a rootd server when "rootd" occurs in `host`.
221 Bool_t rootdSrv = (strstr(host,
"rootd")) ? kTRUE : kFALSE;
// Authenticate as the user encoded in the host URL.
226 if (!Authenticate(TUrl(host).GetUser())) {
// Failed against a rootd peer with protocol 1..9: reopen the connection,
// using window size -1 when size > 1, else the copied fTcpWindowSize.
227 if (rootdSrv && (fRemoteProtocol > 0 && fRemoteProtocol < 10)) {
231 Int_t tcpw = (size > 1 ? -1 : fTcpWindowSize);
232 TSocket *ns =
new TSocket(host, port, tcpw);
// Remove the new socket from the global list (under the ROOT mutex)
// and adopt its descriptor.
234 R__LOCKGUARD(gROOTMutex);
235 gROOT->GetListOfSockets()->Remove(ns);
236 fSocket = ns->GetDescriptor();
// Set up the parallel sockets on the reopened connection (no `sock`).
238 Init(fTcpWindowSize);
// Re-check validity and authenticate again.
240 if ((valid = IsValid())) {
241 if (!Authenticate(TUrl(host).GetUser())) {
// Non-rootd peers, or rootd peers speaking protocol > 9.
258 if (!rootdSrv || fRemoteProtocol > 9) {
// Set up the parallel sockets, using the supplied socket.
// NOTE(review): whether this call sits directly under the condition
// above cannot be told from the visible lines.
261 Init(fTcpWindowSize, sock);
// Register this object in the global socket list under the ROOT mutex.
267 R__LOCKGUARD(gROOTMutex);
268 gROOT->GetListOfSockets()->Add(
this);
/// Construct a parallel socket from an array of already open sockets.
// NOTE(review): the assignments of fSockets and fSize from the arguments
// are not visible in this listing.
275 TPSocket::TPSocket(TSocket *pSockets[], Int_t size)
// The first socket's descriptor becomes this object's descriptor.
283 fSocket = fSockets[0]->GetDescriptor();
// Enable the kNoDelay and kNoBlock options via SetOption().
286 SetOption(kNoDelay, 1);
288 SetOption(kNoBlock, 1);
// Allocate the write/read monitors and the per-socket bookkeeping arrays
// (bytes left and current buffer position), each with fSize entries.
290 fWriteMonitor =
new TMonitor;
291 fReadMonitor =
new TMonitor;
292 fWriteBytesLeft =
new Int_t[fSize];
293 fReadBytesLeft =
new Int_t[fSize];
294 fWritePtr =
new char*[fSize];
295 fReadPtr =
new char*[fSize];
// Register every socket with both monitors ...
297 for (
int i = 0; i < fSize; i++) {
298 fWriteMonitor->Add(fSockets[i], TMonitor::kWrite);
299 fReadMonitor->Add(fSockets[i], TMonitor::kRead);
// ... and leave all of them deactivated for now.
301 fWriteMonitor->DeActivateAll();
302 fReadMonitor->DeActivateAll();
// Name, title and remote address are taken from the first socket.
304 SetName(fSockets[0]->GetName());
305 SetTitle(fSockets[0]->GetTitle());
306 fAddress = fSockets[0]->GetInetAddress();
// Register this object in the global socket list under the ROOT mutex.
309 R__LOCKGUARD(gROOTMutex);
310 gROOT->GetListOfSockets()->Add(
this);
/// Destructor: releases the write monitor and the per-socket
/// bytes-left arrays.
// NOTE(review): only these three deletions are visible in this listing;
// confirm that fReadMonitor, fWritePtr, fReadPtr and fSockets are released
// in the lines not shown.
317 TPSocket::~TPSocket()
321 delete fWriteMonitor;
323 delete [] fWriteBytesLeft;
324 delete [] fReadBytesLeft;
/// Close the parallel socket, forwarding `option` to every close call.
// NOTE(review): the conditions selecting between the statements below
// are not visible in this listing.
335 void TPSocket::Close(Option_t *option)
// Two separate calls to the base-class Close() are visible.
// NOTE(review): presumably they sit in different branches -- confirm.
341 TSocket::Close(option);
346 TSocket::Close(option);
// Close each of the fSize underlying sockets.
348 for (
int i = 0; i < fSize; i++) {
349 fSockets[i]->Close(option);
// Unregister this object from the global socket list under the ROOT mutex.
357 R__LOCKGUARD(gROOTMutex);
358 gROOT->GetListOfSockets()->Remove(
this);
/// Set up the underlying sockets, monitors and bookkeeping arrays.
/// \param tcpwindowsize  window size passed to the temporary server socket
/// \param sock           optional existing socket used for the handshake
///                       instead of the TSocket base
// NOTE(review): branch headers, braces and some statements are not visible
// in this listing; comments describe the visible statements only.
365 void TPSocket::Init(Int_t tcpwindowsize, TSocket *sock)
// Guard: a supplied socket must be valid, and so must the base socket.
375 if ((sock && !sock->IsValid()) || !TSocket::IsValid())
// Enable kNoDelay on whichever socket carries the handshake.
386 sock->SetOption(kNoDelay, 1);
388 TSocket::SetOption(kNoDelay, 1);
// Send the pair (0, 0) to the peer; a failure only produces a warning.
// NOTE(review): presumably this is the single-socket case -- confirm.
393 if (sock->Send((Int_t)0, (Int_t)0) < 0)
394 Warning(
"Init",
"%p: problems sending (0,0)", sock);
396 if (TSocket::Send((Int_t)0, (Int_t)0) < 0)
397 Warning(
"Init",
"problems sending (0,0)");
// The socket table then holds a single entry: this object itself.
401 fSockets =
new TSocket*[1];
402 fSockets[0]= (TSocket *)
this;
// Temporary server socket created with port argument 0, a backlog of
// fSize and the requested window size.
408 TServerSocket ss(0, kFALSE, fSize, tcpwindowsize);
// Tell the peer the server socket's local port and the number of
// sockets; a failure only produces a warning.
413 if (sock->Send(ss.GetLocalPort(), fSize) < 0)
414 Warning(
"Init",
"%p: problems sending size", sock);
416 if (TSocket::Send(ss.GetLocalPort(), fSize) < 0)
417 Warning(
"Init",
"problems sending size");
// Accept fSize connections and take each accepted socket out of the
// global socket list (under the ROOT mutex).
420 fSockets =
new TSocket*[fSize];
423 for (i = 0; i < fSize; i++) {
424 fSockets[i] = ss.Accept();
425 R__LOCKGUARD(gROOTMutex);
426 gROOT->GetListOfSockets()->Remove(fSockets[i]);
// Enable the kNoDelay and kNoBlock options via SetOption().
430 SetOption(kNoDelay, 1);
431 SetOption(kNoBlock, 1);
// Close the connection on the original descriptor fSocket.
437 gSystem->CloseConnection(fSocket, kFALSE);
// Allocate the monitors and the per-socket bookkeeping arrays.
441 fWriteMonitor =
new TMonitor;
442 fReadMonitor =
new TMonitor;
443 fWriteBytesLeft =
new Int_t[fSize];
444 fReadBytesLeft =
new Int_t[fSize];
445 fWritePtr =
new char*[fSize];
446 fReadPtr =
new char*[fSize];
// Register every socket with both monitors, then deactivate them all.
448 for (i = 0; i < fSize; i++) {
449 fWriteMonitor->Add(fSockets[i], TMonitor::kWrite);
450 fReadMonitor->Add(fSockets[i], TMonitor::kRead);
452 fWriteMonitor->DeActivateAll();
453 fReadMonitor->DeActivateAll();
/// Return the local inet address of this socket.
// NOTE(review): the conditions selecting between the three return paths
// are not visible in this listing.
460 TInetAddress TPSocket::GetLocalInetAddress()
// One path defers entirely to the base class.
463 return TSocket::GetLocalInetAddress();
// Otherwise, when the cached address has port -1, fetch it from the
// system using the first socket's descriptor and cache it.
466 if (fLocalAddress.GetPort() == -1)
467 fLocalAddress = gSystem->GetSockName(fSockets[0]->GetDescriptor());
468 return fLocalAddress;
// Fallback: a default-constructed address.
470 return TInetAddress();
/// Return the socket descriptor.
// NOTE(review): the condition guarding the first return is not visible
// in this listing.
476 Int_t TPSocket::GetDescriptor()
const
// One path defers to the base class.
479 return TSocket::GetDescriptor();
// Otherwise use the first socket's descriptor, or -1 when there is no
// socket table.
481 return fSockets ? fSockets[0]->GetDescriptor() : -1;
/// Send a TMessage over the parallel sockets.
// NOTE(review): return statements, braces and the declaration of `buf`
// are not visible in this listing.
492 Int_t TPSocket::Send(
const TMessage &mess)
// Without a socket table, or with at most one socket, use the plain
// TSocket implementation.
494 if (!fSockets || fSize <= 1)
495 return TSocket::Send(mess);
// A message in reading mode cannot be sent.
501 if (mess.IsReading()) {
502 Error(
"Send",
"cannot send a message used for reading");
// Send streamer infos and process IDs for this message first.
507 SendStreamerInfos(mess);
510 SendProcessIDs(mess);
// If this socket has a compression level but the message has none,
// apply the socket's compression settings to the message.
514 if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
515 const_cast<TMessage&>(mess).SetCompressionSettings(fCompress);
// Compress the message when it has a compression level set.
517 if (mess.GetCompressionLevel() > 0)
518 const_cast<TMessage&>(mess).Compress();
// Use the compressed buffer and length when one exists, otherwise the
// plain buffer.
520 char *mbuf = mess.Buffer();
521 Int_t mlen = mess.Length();
522 if (mess.CompBuffer()) {
523 mbuf = mess.CompBuffer();
524 mlen = mess.CompLength();
// The buffer goes out in two raw sends: the first sizeof(UInt_t) bytes,
// then the remaining mlen-ulen bytes.
527 Int_t nsent, ulen = (Int_t)
sizeof(UInt_t);
529 if ((nsent = SendRaw(mbuf, ulen, kDefault)) <= 0)
533 if ((nsent = SendRaw(mbuf+ulen, mlen-ulen, kDefault)) <= 0)
// When the message carries the kMESS_ACK flag, read the reply and
// require it to start with "ok".
537 if (mess.What() & kMESS_ACK) {
539 if (RecvRaw(buf,
sizeof(buf), kDefault) < 0)
541 if (strncmp(buf,
"ok", 2)) {
542 Error(
"Send",
"bad acknowledgement");
/// Send a raw buffer of `length` bytes, spread over the parallel sockets.
// NOTE(review): loop headers, return statements, the declaration of
// `nsent` and several branch bodies are not visible in this listing.
554 Int_t TPSocket::SendRaw(
const void *buffer, Int_t length, ESendRecvOptions opt)
// One path defers to the base class.
// NOTE(review): the guarding condition is not visible.
557 return TSocket::SendRaw(buffer,length,opt);
// No socket table: report failure.
559 if (!fSockets)
return -1;
562 Int_t i, nsocks = fSize, len = length;
// Default per-socket send mode.
566 ESendRecvOptions sendopt = kDontBlock;
// A non-default option is handled on the first socket only, whose
// kNoBlock option is switched off (0) and on (1) around it.
// NOTE(review): the statements between the two toggles are not visible.
570 if (opt != kDefault) {
576 fSockets[0]->SetOption(kNoBlock, 0);
578 fSockets[0]->SetOption(kNoBlock, 1);
// Split the buffer into nsocks slices of len/nsocks bytes each and
// activate every socket in the write monitor.
582 for (i = 0; i < nsocks; i++) {
583 fWriteBytesLeft[i] = len/nsocks;
584 fWritePtr[i] = (
char *)buffer + (i*fWriteBytesLeft[i]);
585 fWriteMonitor->Activate(fSockets[i]);
// The last socket additionally carries the remainder len%nsocks.
587 fWriteBytesLeft[nsocks-1] += len%nsocks;
// Take the socket returned by the write monitor and find its index.
591 TSocket *s = fWriteMonitor->Select();
592 for (
int is = 0; is < nsocks; is++) {
593 if (s == fSockets[is]) {
594 if (fWriteBytesLeft[is] > 0) {
// Clear the broken-connection bit before the send attempt.
597 ResetBit(TSocket::kBrokenConn);
598 if ((nsent = fSockets[is]->SendRaw(fWritePtr[is],
// NOTE(review): the handling around the next statements is only partly
// visible; kBrokenConn is set and all sockets are deactivated on some
// outcome of the send -- confirm the exact conditions.
605 fWriteMonitor->DeActivateAll();
608 SetBit(TSocket::kBrokenConn);
613 if (opt == kDontBlock) {
614 fWriteMonitor->DeActivateAll();
// Account for the bytes written and advance this socket's cursor.
617 fWriteBytesLeft[is] -= nsent;
618 fWritePtr[is] += nsent;
// Leave every socket deactivated again.
624 fWriteMonitor->DeActivateAll();
/// Receive a TMessage; on success `mess` points to a newly allocated
/// TMessage.
// NOTE(review): the declarations of `n` and `len`, the return statements
// and the error handling bodies are not visible in this listing.
635 Int_t TPSocket::Recv(TMessage *&mess)
// One path defers to the base class.
// NOTE(review): the guarding condition is not visible.
638 return TSocket::Recv(mess);
// First read a sizeof(UInt_t)-byte length field into `len`.
648 if ((n = RecvRaw(&len,
sizeof(UInt_t), kDefault)) <= 0) {
// Allocate room for the length field plus the payload, and receive the
// payload after the first sizeof(UInt_t) bytes.
654 char *buf =
new char[len+
sizeof(UInt_t)];
655 if ((n = RecvRaw(buf+
sizeof(UInt_t), len, kDefault)) <= 0) {
// Build the message from the received buffer.
661 mess =
new TMessage(buf, len+
sizeof(UInt_t));
// Let the streamer-info and process-ID handlers inspect the message.
664 if (RecvStreamerInfos(mess))
668 if (RecvProcessIDs(mess))
// When the sender flagged kMESS_ACK, reply with the two bytes "ok"
// and clear the flag on the received message.
671 if (mess->What() & kMESS_ACK) {
672 char ok[2] = {
'o',
'k' };
673 if (SendRaw(ok,
sizeof(ok), kDefault) < 0) {
678 mess->SetWhat(mess->What() & ~kMESS_ACK);
/// Receive `length` raw bytes into `buffer`, spread over the parallel
/// sockets.
// NOTE(review): loop headers, return statements, the declaration of
// `nrecv` and several branch bodies are not visible in this listing.
688 Int_t TPSocket::RecvRaw(
void *buffer, Int_t length, ESendRecvOptions opt)
// One path defers to the base class.
// NOTE(review): the guarding condition is not visible.
691 return TSocket::RecvRaw(buffer,length,opt);
// No socket table: report failure.
693 if (!fSockets)
return -1;
696 Int_t i, nsocks = fSize, len = length;
// Default per-socket receive mode.
700 ESendRecvOptions recvopt = kDontBlock;
// A non-default option is handled on the first socket only, whose
// kNoBlock option is switched off (0) and on (1) around it.
// NOTE(review): the statements between the two toggles are not visible.
704 if (opt != kDefault) {
710 fSockets[0]->SetOption(kNoBlock, 0);
712 fSockets[0]->SetOption(kNoBlock, 1);
// Split the buffer into nsocks slices of len/nsocks bytes each and
// activate every socket in the read monitor.
716 for (i = 0; i < nsocks; i++) {
717 fReadBytesLeft[i] = len/nsocks;
718 fReadPtr[i] = (
char *)buffer + (i*fReadBytesLeft[i]);
719 fReadMonitor->Activate(fSockets[i]);
// The last socket additionally receives the remainder len%nsocks.
721 fReadBytesLeft[nsocks-1] += len%nsocks;
// Take the socket returned by the read monitor and find its index.
727 TSocket *s = fReadMonitor->Select();
728 for (
int is = 0; is < nsocks; is++) {
729 if (s == fSockets[is]) {
730 if (fReadBytesLeft[is] > 0) {
// Clear the broken-connection bit before the receive attempt.
732 ResetBit(TSocket::kBrokenConn);
733 if ((nrecv = fSockets[is]->RecvRaw(fReadPtr[is],
// NOTE(review): the handling around the next statements is only partly
// visible; kBrokenConn is set and all sockets are deactivated on some
// outcome of the receive -- confirm the exact conditions.
736 fReadMonitor->DeActivateAll();
739 SetBit(TSocket::kBrokenConn);
744 if (opt == kDontBlock) {
745 fReadMonitor->DeActivateAll();
// Account for the bytes read and advance this socket's cursor.
748 fReadBytesLeft[is] -= nrecv;
749 fReadPtr[is] += nrecv;
// Leave every socket deactivated again.
755 fReadMonitor->DeActivateAll();
/// Set socket option `opt` to `val`.
// NOTE(review): the condition guarding the first return, the declaration
// of `ret` and the final return are not visible in this listing.
763 Int_t TPSocket::SetOption(ESockOptions opt, Int_t val)
// One path defers to the base class.
766 return TSocket::SetOption(opt,val);
// Otherwise apply the option to every socket; `ret` is overwritten on
// each iteration, so it ends up holding the last socket's result.
769 for (
int i = 0; i < fSize; i++)
770 ret = fSockets[i]->SetOption(opt, val);
/// Read socket option `opt` into `val`.
// NOTE(review): the condition guarding the first return, the declaration
// of `ret` and the final return are not visible in this listing.
777 Int_t TPSocket::GetOption(ESockOptions opt, Int_t &val)
// One path defers to the base class.
780 return TSocket::GetOption(opt,val);
// Otherwise query every socket in turn; both `ret` and `val` are
// overwritten on each iteration, so the last socket's values remain.
783 for (
int i = 0; i < fSize; i++)
784 ret = fSockets[i]->GetOption(opt, val);
/// Return the error code of this socket.
// NOTE(review): the condition guarding the first return is not visible
// in this listing.
792 Int_t TPSocket::GetErrorCode()
const
// One path defers to the base class.
795 return TSocket::GetErrorCode();
// Otherwise report the first socket's error code, or 0 when that entry
// is null.
// NOTE(review): fSockets[0] is read without a visible null check on
// fSockets itself, whereas GetDescriptor() tests fSockets -- confirm that
// fSockets cannot be null on this path.
797 return fSockets[0] ? fSockets[0]->GetErrorCode() : 0;