Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TPSocket.cxx
Go to the documentation of this file.
1 // @(#)root/net:$Id$
2 // Author: Fons Rademakers 22/1/2001
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2001, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TPSocket //
15 // //
16 // This class implements parallel client sockets. A parallel socket is //
17 // an endpoint for communication between two machines. It is parallel //
18 // because several TSockets are open at the same time to the same //
19 // destination. This especially speeds up communication over Big Fat //
20 // Pipes (i.e. high bandwidth, high latency WAN connections). //
21 // //
22 //////////////////////////////////////////////////////////////////////////
23 
24 #include "TPSocket.h"
25 #include "TUrl.h"
26 #include "TServerSocket.h"
27 #include "TMonitor.h"
28 #include "TSystem.h"
29 #include "TMessage.h"
30 #include "Bytes.h"
31 #include "TROOT.h"
32 #include "TError.h"
33 #include "TVirtualMutex.h"
34 
35 ClassImp(TPSocket);
36 
37 ////////////////////////////////////////////////////////////////////////////////
38 /// Create a parallel socket. Connect to the named service at address addr.
39 /// Use tcpwindowsize to specify the size of the receive buffer, it has
40 /// to be specified here to make sure the window scale option is set (for
41 /// tcpwindowsize > 65KB and for platforms supporting window scaling).
42 /// Returns when connection has been accepted by remote side. Use IsValid()
43 /// to check the validity of the socket. Every socket is added to the TROOT
44 /// sockets list which will make sure that any open sockets are properly
45 /// closed on program termination.
46 
47 TPSocket::TPSocket(TInetAddress addr, const char *service, Int_t size,
48  Int_t tcpwindowsize) : TSocket(addr, service)
49 {
50  fSize = size;
51  Init(tcpwindowsize);
52 }
53 
54 ////////////////////////////////////////////////////////////////////////////////
55 /// Create a parallel socket. Connect to the specified port # at address addr.
56 /// Use tcpwindowsize to specify the size of the receive buffer, it has
57 /// to be specified here to make sure the window scale option is set (for
58 /// tcpwindowsize > 65KB and for platforms supporting window scaling).
59 /// Returns when connection has been accepted by remote side. Use IsValid()
60 /// to check the validity of the socket. Every socket is added to the TROOT
61 /// sockets list which will make sure that any open sockets are properly
62 /// closed on program termination.
63 
64 TPSocket::TPSocket(TInetAddress addr, Int_t port, Int_t size,
65  Int_t tcpwindowsize) : TSocket(addr, port)
66 {
67  fSize = size;
68  Init(tcpwindowsize);
69 }
70 
71 ////////////////////////////////////////////////////////////////////////////////
72 /// Create a parallel socket. Connect to named service on the remote host.
73 /// Use tcpwindowsize to specify the size of the receive buffer, it has
74 /// to be specified here to make sure the window scale option is set (for
75 /// tcpwindowsize > 65KB and for platforms supporting window scaling).
76 /// Returns when connection has been accepted by remote side. Use IsValid()
77 /// to check the validity of the socket. Every socket is added to the TROOT
78 /// sockets list which will make sure that any open sockets are properly
79 /// closed on program termination.
80 
81 TPSocket::TPSocket(const char *host, const char *service, Int_t size,
82  Int_t tcpwindowsize) : TSocket(host, service)
83 {
84  fSize = size;
85  Init(tcpwindowsize);
86 }
87 
88 ////////////////////////////////////////////////////////////////////////////////
89 /// Create a parallel socket. Connect to specified port # on the remote host.
90 /// Use tcpwindowsize to specify the size of the receive buffer, it has
91 /// to be specified here to make sure the window scale option is set (for
92 /// tcpwindowsize > 65KB and for platforms supporting window scaling).
93 /// Returns when connection has been accepted by remote side. Use IsValid()
94 /// to check the validity of the socket. Every socket is added to the TROOT
95 /// sockets list which will make sure that any open sockets are properly
96 /// closed on program termination.
97 
98 TPSocket::TPSocket(const char *host, Int_t port, Int_t size,
99  Int_t tcpwindowsize)
100  : TSocket(host, port, (Int_t)(size > 1 ? -1 : tcpwindowsize))
101 {
102  // To avoid uninitialization problems when Init is not called ...
103  fSockets = 0;
104  fWriteMonitor = 0;
105  fReadMonitor = 0;
106  fWriteBytesLeft = 0;
107  fReadBytesLeft = 0;
108  fWritePtr = 0;
109  fReadPtr = 0;
110 
111  // set to the real value only at end (except for old servers)
112  fSize = 1;
113 
114  // to control the flow
115  Bool_t valid = TSocket::IsValid();
116 
117  // check if we are called from CreateAuthSocket()
118  Bool_t authreq = kFALSE;
119  char *pauth = (char *)strstr(host, "?A");
120  if (pauth) {
121  authreq = kTRUE;
122  }
123 
124  // perhaps we can use fServType here ... to be checked
125  Bool_t rootdSrv = (strstr(host,"rootd")) ? kTRUE : kFALSE;
126 
127  // try authentication , if required
128  if (authreq) {
129  if (valid) {
130  if (!Authenticate(TUrl(host).GetUser())) {
131  if (rootdSrv && (fRemoteProtocol > 0 && fRemoteProtocol < 10)) {
132  // We failed because we are talking to an old
133  // server: we need to re-open the connection
134  // and communicate the size first
135  Int_t tcpw = (size > 1 ? -1 : tcpwindowsize);
136  TSocket *ns = new TSocket(host, port, tcpw);
137  if (ns->IsValid()) {
138  R__LOCKGUARD(gROOTMutex);
139  gROOT->GetListOfSockets()->Remove(ns);
140  fSocket = ns->GetDescriptor();
141  fSize = size;
142  Init(tcpwindowsize);
143  }
144  if ((valid = IsValid())) {
145  if (!Authenticate(TUrl(host).GetUser())) {
146  TSocket::Close();
147  valid = kFALSE;
148  }
149  }
150  } else {
151  TSocket::Close();
152  valid = kFALSE;
153  }
154  }
155  }
156  // reset url to the original state
157  *pauth = '\0';
158  SetUrl(host);
159  }
160 
161  // open the sockets ...
162  if (!rootdSrv || fRemoteProtocol > 9) {
163  if (valid) {
164  fSize = size;
165  Init(tcpwindowsize);
166  }
167  }
168 }
169 
170 ////////////////////////////////////////////////////////////////////////////////
171 /// Create a parallel socket on a connection already opened via
172 /// TSocket sock.
173 /// This constructor is provided to optimize TNetFile opening when
174 /// instantiated via a call to TXNetFile.
175 /// Returns when connection has been accepted by remote side. Use IsValid()
176 /// to check the validity of the socket. Every socket is added to the TROOT
177 /// sockets list which will make sure that any open sockets are properly
178 /// closed on program termination.
179 
180 TPSocket::TPSocket(const char *host, Int_t port, Int_t size, TSocket *sock)
181 {
182  // To avoid uninitialization problems when Init is not called ...
183  fSockets = 0;
184  fWriteMonitor = 0;
185  fReadMonitor = 0;
186  fWriteBytesLeft = 0;
187  fReadBytesLeft = 0;
188  fWritePtr = 0;
189  fReadPtr = 0;
190 
191  // set to the real value only at end (except for old servers)
192  fSize = 1;
193 
194  // We need a opened connection
195  if (!sock) return;
196 
197  // Now import existing socket info
198  fSocket = sock->GetDescriptor();
199  fService = sock->GetService();
200  fAddress = sock->GetInetAddress();
201  fLocalAddress = sock->GetLocalInetAddress();
202  fBytesSent = sock->GetBytesSent();
203  fBytesRecv = sock->GetBytesRecv();
204  fCompress = sock->GetCompressionSettings();
205  fSecContext = sock->GetSecContext();
206  fRemoteProtocol = sock->GetRemoteProtocol();
207  fServType = (TSocket::EServiceType)sock->GetServType();
208  fTcpWindowSize = sock->GetTcpWindowSize();
209 
210  // to control the flow
211  Bool_t valid = sock->IsValid();
212 
213  // check if we are called from CreateAuthSocket()
214  Bool_t authreq = kFALSE;
215  char *pauth = (char *)strstr(host, "?A");
216  if (pauth) {
217  authreq = kTRUE;
218  }
219 
220  // perhaps we can use fServType here ... to be checked
221  Bool_t rootdSrv = (strstr(host,"rootd")) ? kTRUE : kFALSE;
222 
223  // try authentication , if required
224  if (authreq) {
225  if (valid) {
226  if (!Authenticate(TUrl(host).GetUser())) {
227  if (rootdSrv && (fRemoteProtocol > 0 && fRemoteProtocol < 10)) {
228  // We failed because we are talking to an old
229  // server: we need to re-open the connection
230  // and communicate the size first
231  Int_t tcpw = (size > 1 ? -1 : fTcpWindowSize);
232  TSocket *ns = new TSocket(host, port, tcpw);
233  if (ns->IsValid()) {
234  R__LOCKGUARD(gROOTMutex);
235  gROOT->GetListOfSockets()->Remove(ns);
236  fSocket = ns->GetDescriptor();
237  fSize = size;
238  Init(fTcpWindowSize);
239  }
240  if ((valid = IsValid())) {
241  if (!Authenticate(TUrl(host).GetUser())) {
242  TSocket::Close();
243  valid = kFALSE;
244  }
245  }
246  } else {
247  TSocket::Close();
248  valid = kFALSE;
249  }
250  }
251  }
252  // reset url to the original state
253  *pauth = '\0';
254  SetUrl(host);
255  }
256 
257  // open the sockets ...
258  if (!rootdSrv || fRemoteProtocol > 9) {
259  if (valid) {
260  fSize = size;
261  Init(fTcpWindowSize, sock);
262  }
263  }
264 
265  // Add to the list if everything OK
266  if (IsValid()) {
267  R__LOCKGUARD(gROOTMutex);
268  gROOT->GetListOfSockets()->Add(this);
269  }
270 }
271 
272 ////////////////////////////////////////////////////////////////////////////////
273 /// Create a parallel socket. This ctor is called by TPServerSocket.
274 
275 TPSocket::TPSocket(TSocket *pSockets[], Int_t size)
276 {
277  fSockets = pSockets;
278  fSize = size;
279 
280  // set descriptor if simple socket (needed when created
281  // by TPServerSocket)
282  if (fSize <= 1)
283  fSocket = fSockets[0]->GetDescriptor();
284 
285  // set socket options (no blocking and no delay)
286  SetOption(kNoDelay, 1);
287  if (fSize > 1)
288  SetOption(kNoBlock, 1);
289 
290  fWriteMonitor = new TMonitor;
291  fReadMonitor = new TMonitor;
292  fWriteBytesLeft = new Int_t[fSize];
293  fReadBytesLeft = new Int_t[fSize];
294  fWritePtr = new char*[fSize];
295  fReadPtr = new char*[fSize];
296 
297  for (int i = 0; i < fSize; i++) {
298  fWriteMonitor->Add(fSockets[i], TMonitor::kWrite);
299  fReadMonitor->Add(fSockets[i], TMonitor::kRead);
300  }
301  fWriteMonitor->DeActivateAll();
302  fReadMonitor->DeActivateAll();
303 
304  SetName(fSockets[0]->GetName());
305  SetTitle(fSockets[0]->GetTitle());
306  fAddress = fSockets[0]->GetInetAddress();
307 
308  {
309  R__LOCKGUARD(gROOTMutex);
310  gROOT->GetListOfSockets()->Add(this);
311  }
312 }
313 
314 ////////////////////////////////////////////////////////////////////////////////
315 /// Cleanup the parallel socket.
316 
317 TPSocket::~TPSocket()
318 {
319  Close();
320 
321  delete fWriteMonitor;
322  delete fReadMonitor;
323  delete [] fWriteBytesLeft;
324  delete [] fReadBytesLeft;
325  delete [] fWritePtr;
326  delete [] fReadPtr;
327 }
328 
329 ////////////////////////////////////////////////////////////////////////////////
330 /// Close a parallel socket. If option is "force", calls shutdown(id,2) to
331 /// shut down the connection. This will close the connection also
332 /// for the parent of this process. Also called via the dtor (without
333 /// option "force", call explicitly Close("force") if this is desired).
334 
335 void TPSocket::Close(Option_t *option)
336 {
337 
338  if (!IsValid()) {
339  // if closing happens too early (e.g. timeout) the underlying
340  // socket may still be open
341  TSocket::Close(option);
342  return;
343  }
344 
345  if (fSize <= 1) {
346  TSocket::Close(option);
347  } else {
348  for (int i = 0; i < fSize; i++) {
349  fSockets[i]->Close(option);
350  delete fSockets[i];
351  }
352  }
353  delete [] fSockets;
354  fSockets = 0;
355 
356  {
357  R__LOCKGUARD(gROOTMutex);
358  gROOT->GetListOfSockets()->Remove(this);
359  }
360 }
361 
362 ////////////////////////////////////////////////////////////////////////////////
363 /// Create a parallel socket to the specified host.
364 
365 void TPSocket::Init(Int_t tcpwindowsize, TSocket *sock)
366 {
367  fSockets = 0;
368  fWriteMonitor = 0;
369  fReadMonitor = 0;
370  fWriteBytesLeft = 0;
371  fReadBytesLeft = 0;
372  fWritePtr = 0;
373  fReadPtr = 0;
374 
375  if ((sock && !sock->IsValid()) || !TSocket::IsValid())
376  return;
377 
378  Int_t i = 0;
379 
380  if (fSize <= 1) {
381  // check if single mode
382  fSize = 1;
383 
384  // set socket options (no delay)
385  if (sock)
386  sock->SetOption(kNoDelay, 1);
387  else
388  TSocket::SetOption(kNoDelay, 1);
389 
390  // if yes, communicate this to server
391  // (size = 0 for backward compatibility)
392  if (sock) {
393  if (sock->Send((Int_t)0, (Int_t)0) < 0)
394  Warning("Init", "%p: problems sending (0,0)", sock);
395  } else {
396  if (TSocket::Send((Int_t)0, (Int_t)0) < 0)
397  Warning("Init", "problems sending (0,0)");
398  }
399 
400  // needs to fill additional private members
401  fSockets = new TSocket*[1];
402  fSockets[0]= (TSocket *)this;
403 
404  } else {
405 
406  // create server that will be used to accept the parallel sockets from
407  // the remote host, use port=0 to scan for a free port
408  TServerSocket ss(0, kFALSE, fSize, tcpwindowsize);
409 
410  // send the local port number of the just created server socket and the
411  // number of desired parallel sockets
412  if (sock) {
413  if (sock->Send(ss.GetLocalPort(), fSize) < 0)
414  Warning("Init", "%p: problems sending size", sock);
415  } else {
416  if (TSocket::Send(ss.GetLocalPort(), fSize) < 0)
417  Warning("Init", "problems sending size");
418  }
419 
420  fSockets = new TSocket*[fSize];
421 
422  // establish fSize parallel socket connections between client and server
423  for (i = 0; i < fSize; i++) {
424  fSockets[i] = ss.Accept();
425  R__LOCKGUARD(gROOTMutex);
426  gROOT->GetListOfSockets()->Remove(fSockets[i]);
427  }
428 
429  // set socket options (no blocking and no delay)
430  SetOption(kNoDelay, 1);
431  SetOption(kNoBlock, 1);
432 
433  // close original socket
434  if (sock)
435  sock->Close();
436  else
437  gSystem->CloseConnection(fSocket, kFALSE);
438  fSocket = -1;
439  }
440 
441  fWriteMonitor = new TMonitor;
442  fReadMonitor = new TMonitor;
443  fWriteBytesLeft = new Int_t[fSize];
444  fReadBytesLeft = new Int_t[fSize];
445  fWritePtr = new char*[fSize];
446  fReadPtr = new char*[fSize];
447 
448  for (i = 0; i < fSize; i++) {
449  fWriteMonitor->Add(fSockets[i], TMonitor::kWrite);
450  fReadMonitor->Add(fSockets[i], TMonitor::kRead);
451  }
452  fWriteMonitor->DeActivateAll();
453  fReadMonitor->DeActivateAll();
454 }
455 
456 ////////////////////////////////////////////////////////////////////////////////
457 /// Return internet address of local host to which the socket is bound.
458 /// In case of error TInetAddress::IsValid() returns kFALSE.
459 
460 TInetAddress TPSocket::GetLocalInetAddress()
461 {
462  if (fSize<= 1)
463  return TSocket::GetLocalInetAddress();
464 
465  if (IsValid()) {
466  if (fLocalAddress.GetPort() == -1)
467  fLocalAddress = gSystem->GetSockName(fSockets[0]->GetDescriptor());
468  return fLocalAddress;
469  }
470  return TInetAddress();
471 }
472 
473 ////////////////////////////////////////////////////////////////////////////////
474 /// Return socket descriptor
475 
476 Int_t TPSocket::GetDescriptor() const
477 {
478  if (fSize <= 1)
479  return TSocket::GetDescriptor();
480 
481  return fSockets ? fSockets[0]->GetDescriptor() : -1;
482 
483 }
484 
485 ////////////////////////////////////////////////////////////////////////////////
486 /// Send a TMessage object. Returns the number of bytes in the TMessage
487 /// that were sent and -1 in case of error. In case the TMessage::What
488 /// has been or'ed with kMESS_ACK, the call will only return after having
489 /// received an acknowledgement, making the sending process synchronous.
490 /// Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.
491 
492 Int_t TPSocket::Send(const TMessage &mess)
493 {
494  if (!fSockets || fSize <= 1)
495  return TSocket::Send(mess); // only the case when called via Init()
496 
497  if (!IsValid()) {
498  return -1;
499  }
500 
501  if (mess.IsReading()) {
502  Error("Send", "cannot send a message used for reading");
503  return -1;
504  }
505 
506  // send streamer infos in case schema evolution is enabled in the TMessage
507  SendStreamerInfos(mess);
508 
509  // send the process id's so TRefs work
510  SendProcessIDs(mess);
511 
512  mess.SetLength(); //write length in first word of buffer
513 
514  if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
515  const_cast<TMessage&>(mess).SetCompressionSettings(fCompress);
516 
517  if (mess.GetCompressionLevel() > 0)
518  const_cast<TMessage&>(mess).Compress();
519 
520  char *mbuf = mess.Buffer();
521  Int_t mlen = mess.Length();
522  if (mess.CompBuffer()) {
523  mbuf = mess.CompBuffer();
524  mlen = mess.CompLength();
525  }
526 
527  Int_t nsent, ulen = (Int_t) sizeof(UInt_t);
528  // send length
529  if ((nsent = SendRaw(mbuf, ulen, kDefault)) <= 0)
530  return nsent;
531 
532  // send buffer (this might go in parallel)
533  if ((nsent = SendRaw(mbuf+ulen, mlen-ulen, kDefault)) <= 0)
534  return nsent;
535 
536  // if acknowledgement is desired, wait for it
537  if (mess.What() & kMESS_ACK) {
538  char buf[2];
539  if (RecvRaw(buf, sizeof(buf), kDefault) < 0)
540  return -1;
541  if (strncmp(buf, "ok", 2)) {
542  Error("Send", "bad acknowledgement");
543  return -1;
544  }
545  }
546 
547  return nsent; //length - length header
548 }
549 
550 ////////////////////////////////////////////////////////////////////////////////
551 /// Send a raw buffer of specified length. Returns the number of bytes
552 /// sent and -1 in case of error.
553 
554 Int_t TPSocket::SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt)
555 {
556  if (fSize == 1)
557  return TSocket::SendRaw(buffer,length,opt);
558 
559  if (!fSockets) return -1;
560 
561  // if data buffer size < 4K use only one socket
562  Int_t i, nsocks = fSize, len = length;
563  if (len < 4096)
564  nsocks = 1;
565 
566  ESendRecvOptions sendopt = kDontBlock;
567  if (nsocks == 1)
568  sendopt = kDefault;
569 
570  if (opt != kDefault) {
571  nsocks = 1;
572  sendopt = opt;
573  }
574 
575  if (nsocks == 1)
576  fSockets[0]->SetOption(kNoBlock, 0);
577  else
578  fSockets[0]->SetOption(kNoBlock, 1);
579 
580  // setup pointer appropriately for transferring data equally on the
581  // parallel sockets
582  for (i = 0; i < nsocks; i++) {
583  fWriteBytesLeft[i] = len/nsocks;
584  fWritePtr[i] = (char *)buffer + (i*fWriteBytesLeft[i]);
585  fWriteMonitor->Activate(fSockets[i]);
586  }
587  fWriteBytesLeft[nsocks-1] += len%nsocks;
588 
589  // send the data on the parallel sockets
590  while (len > 0) {
591  TSocket *s = fWriteMonitor->Select();
592  for (int is = 0; is < nsocks; is++) {
593  if (s == fSockets[is]) {
594  if (fWriteBytesLeft[is] > 0) {
595  Int_t nsent;
596 again:
597  ResetBit(TSocket::kBrokenConn);
598  if ((nsent = fSockets[is]->SendRaw(fWritePtr[is],
599  fWriteBytesLeft[is],
600  sendopt)) <= 0) {
601  if (nsent == -4) {
602  // got EAGAIN/EWOULDBLOCK error, keep trying...
603  goto again;
604  }
605  fWriteMonitor->DeActivateAll();
606  if (nsent == -5) {
607  // connection reset by peer or broken ...
608  SetBit(TSocket::kBrokenConn);
609  Close();
610  }
611  return -1;
612  }
613  if (opt == kDontBlock) {
614  fWriteMonitor->DeActivateAll();
615  return nsent;
616  }
617  fWriteBytesLeft[is] -= nsent;
618  fWritePtr[is] += nsent;
619  len -= nsent;
620  }
621  }
622  }
623  }
624  fWriteMonitor->DeActivateAll();
625 
626  return length;
627 }
628 
629 ////////////////////////////////////////////////////////////////////////////////
630 /// Receive a TMessage object. The user must delete the TMessage object.
631 /// Returns length of message in bytes (can be 0 if other side of connection
632 /// is closed) or -1 in case of error or -4 in case a non-blocking socket would
633 /// block (i.e. there is nothing to be read). In those cases mess == 0.
634 
635 Int_t TPSocket::Recv(TMessage *&mess)
636 {
637  if (fSize <= 1)
638  return TSocket::Recv(mess);
639 
640  if (!IsValid()) {
641  mess = 0;
642  return -1;
643  }
644 
645 oncemore:
646  Int_t n;
647  UInt_t len;
648  if ((n = RecvRaw(&len, sizeof(UInt_t), kDefault)) <= 0) {
649  mess = 0;
650  return n;
651  }
652  len = net2host(len); //from network to host byte order
653 
654  char *buf = new char[len+sizeof(UInt_t)];
655  if ((n = RecvRaw(buf+sizeof(UInt_t), len, kDefault)) <= 0) {
656  delete [] buf;
657  mess = 0;
658  return n;
659  }
660 
661  mess = new TMessage(buf, len+sizeof(UInt_t));
662 
663  // receive any streamer infos
664  if (RecvStreamerInfos(mess))
665  goto oncemore;
666 
667  // receive any process ids
668  if (RecvProcessIDs(mess))
669  goto oncemore;
670 
671  if (mess->What() & kMESS_ACK) {
672  char ok[2] = { 'o', 'k' };
673  if (SendRaw(ok, sizeof(ok), kDefault) < 0) {
674  delete mess;
675  mess = 0;
676  return -1;
677  }
678  mess->SetWhat(mess->What() & ~kMESS_ACK);
679  }
680 
681  return n;
682 }
683 
684 ////////////////////////////////////////////////////////////////////////////////
685 /// Receive a raw buffer of specified length. Returns the number of bytes
686 /// received or -1 in case of error.
687 
688 Int_t TPSocket::RecvRaw(void *buffer, Int_t length, ESendRecvOptions opt)
689 {
690  if (fSize <= 1)
691  return TSocket::RecvRaw(buffer,length,opt);
692 
693  if (!fSockets) return -1;
694 
695  // if data buffer size < 4K use only one socket
696  Int_t i, nsocks = fSize, len = length;
697  if (len < 4096)
698  nsocks = 1;
699 
700  ESendRecvOptions recvopt = kDontBlock;
701  if (nsocks == 1)
702  recvopt = kDefault;
703 
704  if (opt != kDefault) {
705  nsocks = 1;
706  recvopt = opt;
707  }
708 
709  if (nsocks == 1)
710  fSockets[0]->SetOption(kNoBlock, 0);
711  else
712  fSockets[0]->SetOption(kNoBlock, 1);
713 
714  // setup pointer appropriately for transferring data equally on the
715  // parallel sockets
716  for (i = 0; i < nsocks; i++) {
717  fReadBytesLeft[i] = len/nsocks;
718  fReadPtr[i] = (char *)buffer + (i*fReadBytesLeft[i]);
719  fReadMonitor->Activate(fSockets[i]);
720  }
721  fReadBytesLeft[nsocks-1] += len%nsocks;
722 
723  // start receiving data on all sockets. Receive data as and when
724  // they are available on a socket by by using select.
725  // Exit the loop as soon as all data has been received.
726  while (len > 0) {
727  TSocket *s = fReadMonitor->Select();
728  for (int is = 0; is < nsocks; is++) {
729  if (s == fSockets[is]) {
730  if (fReadBytesLeft[is] > 0) {
731  Int_t nrecv;
732  ResetBit(TSocket::kBrokenConn);
733  if ((nrecv = fSockets[is]->RecvRaw(fReadPtr[is],
734  fReadBytesLeft[is],
735  recvopt)) <= 0) {
736  fReadMonitor->DeActivateAll();
737  if (nrecv == -5) {
738  // connection reset by peer or broken ...
739  SetBit(TSocket::kBrokenConn);
740  Close();
741  }
742  return -1;
743  }
744  if (opt == kDontBlock) {
745  fReadMonitor->DeActivateAll();
746  return nrecv;
747  }
748  fReadBytesLeft[is] -= nrecv;
749  fReadPtr[is] += nrecv;
750  len -= nrecv;
751  }
752  }
753  }
754  }
755  fReadMonitor->DeActivateAll();
756 
757  return length;
758 }
759 
760 ////////////////////////////////////////////////////////////////////////////////
761 /// Set socket options.
762 
763 Int_t TPSocket::SetOption(ESockOptions opt, Int_t val)
764 {
765  if (fSize <= 1)
766  return TSocket::SetOption(opt,val);
767 
768  Int_t ret = 0;
769  for (int i = 0; i < fSize; i++)
770  ret = fSockets[i]->SetOption(opt, val);
771  return ret;
772 }
773 
774 ////////////////////////////////////////////////////////////////////////////////
775 /// Get socket options. Returns -1 in case of error.
776 
777 Int_t TPSocket::GetOption(ESockOptions opt, Int_t &val)
778 {
779  if (fSize <= 1)
780  return TSocket::GetOption(opt,val);
781 
782  Int_t ret = 0;
783  for (int i = 0; i < fSize; i++)
784  ret = fSockets[i]->GetOption(opt, val);
785  return ret;
786 }
787 
788 ////////////////////////////////////////////////////////////////////////////////
789 /// Returns error code. Meaning depends on context where it is called.
790 /// If no error condition returns 0 else a value < 0.
791 
792 Int_t TPSocket::GetErrorCode() const
793 {
794  if (fSize <= 1)
795  return TSocket::GetErrorCode();
796 
797  return fSockets[0] ? fSockets[0]->GetErrorCode() : 0;
798 }