Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TUDPSocket.cxx
Go to the documentation of this file.
1 // @(#)root/net:$Id$
2 // Author: Marcelo Sousa 26/10/2011
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2011, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TUDPSocket //
15 // //
16 // This class implements UDP client sockets. A socket is an endpoint //
17 // for communication between two machines. //
18 // The actual work is done via the TSystem class (either TUnixSystem //
19 // or TWinNTSystem). //
20 // //
21 //////////////////////////////////////////////////////////////////////////
22 
23 #include "Bytes.h"
24 #include "Compression.h"
25 #include "NetErrors.h"
26 #include "TEnv.h"
27 #include "TError.h"
28 #include "TMessage.h"
29 #include "TUDPSocket.h"
30 #include "TPluginManager.h"
31 #include "TROOT.h"
32 #include "TString.h"
33 #include "TSystem.h"
34 #include "TUrl.h"
35 #include "TVirtualAuth.h"
36 #include "TStreamerInfo.h"
37 #include "TProcessID.h"
38 
39 ULong64_t TUDPSocket::fgBytesSent = 0;
40 ULong64_t TUDPSocket::fgBytesRecv = 0;
41 
42 
43 ClassImp(TUDPSocket);
44 
45 ////////////////////////////////////////////////////////////////////////////////
46 /// Create a socket. Connect to the named service at address addr.
47 /// Use tcpwindowsize to specify the size of the receive buffer, it has
48 /// to be specified here to make sure the window scale option is set (for
49 /// tcpwindowsize > 65KB and for platforms supporting window scaling).
50 /// Returns when connection has been accepted by remote side. Use IsValid()
51 /// to check the validity of the socket. Every socket is added to the TROOT
52 /// sockets list which will make sure that any open sockets are properly
53 /// closed on program termination.
54 
55 TUDPSocket::TUDPSocket(TInetAddress addr, const char *service)
56  : TNamed(addr.GetHostName(), service), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
57 {
58  R__ASSERT(gROOT);
59  R__ASSERT(gSystem);
60 
61  fService = service;
62  fSecContext = 0;
63  fRemoteProtocol= -1;
64  fServType = kSOCKD;
65  if (fService.Contains("root"))
66  fServType = kROOTD;
67  if (fService.Contains("proof"))
68  fServType = kPROOFD;
69  fAddress = addr;
70  fAddress.fPort = gSystem->GetServiceByName(service);
71  fBytesSent = 0;
72  fBytesRecv = 0;
73  fUUIDs = 0;
74  fLastUsageMtx = 0;
75  ResetBit(TUDPSocket::kBrokenConn);
76 
77  if (fAddress.GetPort() != -1) {
78  fSocket = gSystem->OpenConnection(addr.GetHostName(), fAddress.GetPort(),
79  -1, "upd");
80 
81  if (fSocket != -1) {
82  R__LOCKGUARD(gROOTMutex);
83  gROOT->GetListOfSockets()->Add(this);
84  }
85  } else
86  fSocket = -1;
87 
88 }
89 
90 
91 ////////////////////////////////////////////////////////////////////////////////
92 /// Create a socket. Connect to the specified port # at address addr.
93 /// Use tcpwindowsize to specify the size of the receive buffer, it has
94 /// to be specified here to make sure the window scale option is set (for
95 /// tcpwindowsize > 65KB and for platforms supporting window scaling).
96 /// Returns when connection has been accepted by remote side. Use IsValid()
97 /// to check the validity of the socket. Every socket is added to the TROOT
98 /// sockets list which will make sure that any open sockets are properly
99 /// closed on program termination.
100 
101 TUDPSocket::TUDPSocket(TInetAddress addr, Int_t port)
102  : TNamed(addr.GetHostName(), ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
103 {
104  R__ASSERT(gROOT);
105  R__ASSERT(gSystem);
106 
107  fService = gSystem->GetServiceByPort(port);
108  fSecContext = 0;
109  fRemoteProtocol= -1;
110  fServType = kSOCKD;
111  if (fService.Contains("root"))
112  fServType = kROOTD;
113  if (fService.Contains("proof"))
114  fServType = kPROOFD;
115  fAddress = addr;
116  fAddress.fPort = port;
117  SetTitle(fService);
118  fBytesSent = 0;
119  fBytesRecv = 0;
120  fUUIDs = 0;
121  fLastUsageMtx = 0;
122  ResetBit(TUDPSocket::kBrokenConn);
123 
124  fSocket = gSystem->OpenConnection(addr.GetHostName(), fAddress.GetPort(),
125  -1, "upd");
126  if (fSocket == -1)
127  fAddress.fPort = -1;
128  else {
129  R__LOCKGUARD(gROOTMutex);
130  gROOT->GetListOfSockets()->Add(this);
131  }
132 }
133 
134 ////////////////////////////////////////////////////////////////////////////////
135 /// Create a socket. Connect to named service on the remote host.
136 /// Use tcpwindowsize to specify the size of the receive buffer, it has
137 /// to be specified here to make sure the window scale option is set (for
138 /// tcpwindowsize > 65KB and for platforms supporting window scaling).
139 /// Returns when connection has been accepted by remote side. Use IsValid()
140 /// to check the validity of the socket. Every socket is added to the TROOT
141 /// sockets list which will make sure that any open sockets are properly
142 /// closed on program termination.
143 
144 TUDPSocket::TUDPSocket(const char *host, const char *service)
145  : TNamed(host, service), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
146 {
147  R__ASSERT(gROOT);
148  R__ASSERT(gSystem);
149 
150  fService = service;
151  fSecContext = 0;
152  fRemoteProtocol= -1;
153  fServType = kSOCKD;
154  if (fService.Contains("root"))
155  fServType = kROOTD;
156  if (fService.Contains("proof"))
157  fServType = kPROOFD;
158  fAddress = gSystem->GetHostByName(host);
159  fAddress.fPort = gSystem->GetServiceByName(service);
160  SetName(fAddress.GetHostName());
161  fBytesSent = 0;
162  fBytesRecv = 0;
163  fUUIDs = 0;
164  fLastUsageMtx = 0;
165  ResetBit(TUDPSocket::kBrokenConn);
166 
167  if (fAddress.GetPort() != -1) {
168  fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), -1, "upd");
169  if (fSocket != -1) {
170  R__LOCKGUARD(gROOTMutex);
171  gROOT->GetListOfSockets()->Add(this);
172  }
173  } else
174  fSocket = -1;
175 }
176 
177 ////////////////////////////////////////////////////////////////////////////////
178 /// Create a socket; see CreateAuthSocket for the form of url.
179 /// Connect to the specified port # on the remote host.
180 /// If user is specified in url, try authentication as user.
181 /// Use tcpwindowsize to specify the size of the receive buffer, it has
182 /// to be specified here to make sure the window scale option is set (for
183 /// tcpwindowsize > 65KB and for platforms supporting window scaling).
184 /// Returns when connection has been accepted by remote side. Use IsValid()
185 /// to check the validity of the socket. Every socket is added to the TROOT
186 /// sockets list which will make sure that any open sockets are properly
187 /// closed on program termination.
188 
189 TUDPSocket::TUDPSocket(const char *url, Int_t port)
190  : TNamed(TUrl(url).GetHost(), ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
191 {
192  R__ASSERT(gROOT);
193  R__ASSERT(gSystem);
194 
195  fUrl = TString(url);
196  TString host(TUrl(fUrl).GetHost());
197 
198  fService = gSystem->GetServiceByPort(port);
199  fSecContext = 0;
200  fRemoteProtocol= -1;
201  fServType = kSOCKD;
202  if (fUrl.Contains("root"))
203  fServType = kROOTD;
204  if (fUrl.Contains("proof"))
205  fServType = kPROOFD;
206  fAddress = gSystem->GetHostByName(host);
207  fAddress.fPort = port;
208  SetName(fAddress.GetHostName());
209  SetTitle(fService);
210  fBytesSent = 0;
211  fBytesRecv = 0;
212  fUUIDs = 0;
213  fLastUsageMtx = 0;
214  ResetBit(TUDPSocket::kBrokenConn);
215 
216  fSocket = gSystem->OpenConnection(host, fAddress.GetPort(), -1, "udp");
217  if (fSocket == -1) {
218  fAddress.fPort = -1;
219  } else {
220  R__LOCKGUARD(gROOTMutex);
221  gROOT->GetListOfSockets()->Add(this);
222  }
223 }
224 
225 ////////////////////////////////////////////////////////////////////////////////
226 /// Create a socket in the Unix domain on 'sockpath'.
227 /// Returns when connection has been accepted by the server. Use IsValid()
228 /// to check the validity of the socket. Every socket is added to the TROOT
229 /// sockets list which will make sure that any open sockets are properly
230 /// closed on program termination.
231 
232 TUDPSocket::TUDPSocket(const char *sockpath) : TNamed(sockpath, ""),
233  fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
234 {
235  R__ASSERT(gROOT);
236  R__ASSERT(gSystem);
237 
238  fUrl = sockpath;
239 
240  fService = "unix";
241  fSecContext = 0;
242  fRemoteProtocol= -1;
243  fServType = kSOCKD;
244  fAddress.fPort = -1;
245  fName.Form("unix:%s", sockpath);
246  SetTitle(fService);
247  fBytesSent = 0;
248  fBytesRecv = 0;
249  fUUIDs = 0;
250  fLastUsageMtx = 0;
251  ResetBit(TUDPSocket::kBrokenConn);
252 
253  fSocket = gSystem->OpenConnection(sockpath, -1, -1, "udp");
254  if (fSocket > 0) {
255  R__LOCKGUARD(gROOTMutex);
256  gROOT->GetListOfSockets()->Add(this);
257  }
258 }
259 
260 ////////////////////////////////////////////////////////////////////////////////
261 /// Create a socket. The socket will adopt previously opened TCP socket with
262 /// descriptor desc.
263 
264 TUDPSocket::TUDPSocket(Int_t desc) : TNamed("", ""), fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
265 {
266  R__ASSERT(gROOT);
267  R__ASSERT(gSystem);
268 
269  fSecContext = 0;
270  fRemoteProtocol = 0;
271  fService = (char *)kSOCKD;
272  fServType = kSOCKD;
273  fBytesSent = 0;
274  fBytesRecv = 0;
275  fUUIDs = 0;
276  fLastUsageMtx = 0;
277  ResetBit(TUDPSocket::kBrokenConn);
278 
279  if (desc >= 0) {
280  fSocket = desc;
281  fAddress = gSystem->GetPeerName(fSocket);
282  R__LOCKGUARD(gROOTMutex);
283  gROOT->GetListOfSockets()->Add(this);
284  } else
285  fSocket = -1;
286 }
287 
288 ////////////////////////////////////////////////////////////////////////////////
289 /// Create a socket. The socket will adopt previously opened Unix socket with
290 /// descriptor desc. The sockpath arg is for info purposes only. Use
291 /// this method to adopt e.g. a socket created via socketpair().
292 
293 TUDPSocket::TUDPSocket(Int_t desc, const char *sockpath) : TNamed(sockpath, ""),
294  fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
295 {
296  R__ASSERT(gROOT);
297  R__ASSERT(gSystem);
298 
299  fUrl = sockpath;
300 
301  fService = "unix";
302  fSecContext = 0;
303  fRemoteProtocol= -1;
304  fServType = kSOCKD;
305  fAddress.fPort = -1;
306  fName.Form("unix:%s", sockpath);
307  SetTitle(fService);
308  fBytesSent = 0;
309  fBytesRecv = 0;
310  fUUIDs = 0;
311  fLastUsageMtx = 0;
312  ResetBit(TUDPSocket::kBrokenConn);
313 
314  if (desc >= 0) {
315  fSocket = desc;
316  R__LOCKGUARD(gROOTMutex);
317  gROOT->GetListOfSockets()->Add(this);
318  } else
319  fSocket = -1;
320 }
321 
322 
323 ////////////////////////////////////////////////////////////////////////////////
324 /// TUDPSocket copy ctor.
325 
326 TUDPSocket::TUDPSocket(const TUDPSocket &s) : TNamed(s)
327 {
328  fSocket = s.fSocket;
329  fService = s.fService;
330  fAddress = s.fAddress;
331  fLocalAddress = s.fLocalAddress;
332  fBytesSent = s.fBytesSent;
333  fBytesRecv = s.fBytesRecv;
334  fCompress = s.fCompress;
335  fSecContext = s.fSecContext;
336  fRemoteProtocol = s.fRemoteProtocol;
337  fServType = s.fServType;
338  fUUIDs = 0;
339  fLastUsageMtx = 0;
340  ResetBit(TUDPSocket::kBrokenConn);
341 
342  if (fSocket != -1) {
343  R__LOCKGUARD(gROOTMutex);
344  gROOT->GetListOfSockets()->Add(this);
345  }
346 }
347 
348 ////////////////////////////////////////////////////////////////////////////////
349 /// Close the socket. If option is "force", calls shutdown(id,2) to
350 /// shut down the connection. This will close the connection also
351 /// for the parent of this process. Also called via the dtor (without
352 /// option "force", call explicitly Close("force") if this is desired).
353 
354 void TUDPSocket::Close(Option_t *option)
355 {
356  Bool_t force = option ? (!strcmp(option, "force") ? kTRUE : kFALSE) : kFALSE;
357 
358  if (fSocket != -1) {
359  gSystem->CloseConnection(fSocket, force);
360  R__LOCKGUARD(gROOTMutex);
361  gROOT->GetListOfSockets()->Remove(this);
362  }
363  fSocket = -1;
364 
365  SafeDelete(fUUIDs);
366  SafeDelete(fLastUsageMtx);
367 }
368 
369 ////////////////////////////////////////////////////////////////////////////////
370 /// Return internet address of local host to which the socket is bound.
371 /// In case of error TInetAddress::IsValid() returns kFALSE.
372 
373 TInetAddress TUDPSocket::GetLocalInetAddress()
374 {
375  if (IsValid()) {
376  if (fLocalAddress.GetPort() == -1)
377  fLocalAddress = gSystem->GetSockName(fSocket);
378  return fLocalAddress;
379  }
380  return TInetAddress();
381 }
382 
383 ////////////////////////////////////////////////////////////////////////////////
384 /// Return the local port # to which the socket is bound.
385 /// In case of error return -1.
386 
387 Int_t TUDPSocket::GetLocalPort()
388 {
389  if (IsValid()) {
390  if (fLocalAddress.GetPort() == -1)
391  GetLocalInetAddress();
392  return fLocalAddress.GetPort();
393  }
394  return -1;
395 }
396 
397 ////////////////////////////////////////////////////////////////////////////////
398 /// Waits for this socket to change status. If interest=kRead,
399 /// the socket will be watched to see if characters become available for
400 /// reading; if interest=kWrite the socket will be watched to
401 /// see if a write will not block.
402 /// The argument 'timeout' specifies a maximum time to wait in millisec.
403 /// Default no timeout.
404 /// Returns 1 if a change of status of interest has been detected within
405 /// timeout; 0 in case of timeout; < 0 if an error occured.
406 
407 Int_t TUDPSocket::Select(Int_t interest, Long_t timeout)
408 {
409  Int_t rc = 1;
410 
411  // Associate a TFileHandler to this socket
412  TFileHandler fh(fSocket, interest);
413 
414  // Wait for an event now
415  rc = gSystem->Select(&fh, timeout);
416 
417  return rc;
418 }
419 
420 ////////////////////////////////////////////////////////////////////////////////
421 /// Send a single message opcode. Use kind (opcode) to set the
422 /// TMessage "what" field. Returns the number of bytes that were sent
423 /// (always sizeof(Int_t)) and -1 in case of error. In case the kind has
424 /// been or'ed with kMESS_ACK, the call will only return after having
425 /// received an acknowledgement, making the sending process synchronous.
426 
427 Int_t TUDPSocket::Send(Int_t kind)
428 {
429  TMessage mess(kind);
430 
431  Int_t nsent;
432  if ((nsent = Send(mess)) < 0)
433  return -1;
434 
435  return nsent;
436 }
437 
438 ////////////////////////////////////////////////////////////////////////////////
439 /// Send a status and a single message opcode. Use kind (opcode) to set the
440 /// TMessage "what" field. Returns the number of bytes that were sent
441 /// (always 2*sizeof(Int_t)) and -1 in case of error. In case the kind has
442 /// been or'ed with kMESS_ACK, the call will only return after having
443 /// received an acknowledgement, making the sending process synchronous.
444 
445 Int_t TUDPSocket::Send(Int_t status, Int_t kind)
446 {
447  TMessage mess(kind);
448  mess << status;
449 
450  Int_t nsent;
451  if ((nsent = Send(mess)) < 0)
452  return -1;
453 
454  return nsent;
455 }
456 
457 ////////////////////////////////////////////////////////////////////////////////
458 /// Send a character string buffer. Use kind to set the TMessage "what" field.
459 /// Returns the number of bytes in the string str that were sent and -1 in
460 /// case of error. In case the kind has been or'ed with kMESS_ACK, the call
461 /// will only return after having received an acknowledgement, making the
462 /// sending process synchronous.
463 
464 Int_t TUDPSocket::Send(const char *str, Int_t kind)
465 {
466  TMessage mess(kind);
467  if (str) mess.WriteString(str);
468 
469  Int_t nsent;
470  if ((nsent = Send(mess)) < 0)
471  return -1;
472 
473  return nsent - sizeof(Int_t); // - TMessage::What()
474 }
475 
476 ////////////////////////////////////////////////////////////////////////////////
477 /// Send a TMessage object. Returns the number of bytes in the TMessage
478 /// that were sent and -1 in case of error. In case the TMessage::What
479 /// has been or'ed with kMESS_ACK, the call will only return after having
480 /// received an acknowledgement, making the sending process synchronous.
481 /// Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.
482 /// Returns -5 if pipe broken or reset by peer (EPIPE || ECONNRESET).
483 /// support for streaming TStreamerInfo added by Rene Brun May 2008
484 /// support for streaming TProcessID added by Rene Brun June 2008
485 
486 Int_t TUDPSocket::Send(const TMessage &mess)
487 {
488  TSystem::ResetErrno();
489 
490  if (fSocket == -1) return -1;
491 
492  if (mess.IsReading()) {
493  Error("Send", "cannot send a message used for reading");
494  return -1;
495  }
496 
497  // send streamer infos in case schema evolution is enabled in the TMessage
498  SendStreamerInfos(mess);
499 
500  // send the process id's so TRefs work
501  SendProcessIDs(mess);
502 
503  mess.SetLength(); //write length in first word of buffer
504 
505  if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
506  const_cast<TMessage&>(mess).SetCompressionSettings(fCompress);
507 
508  if (mess.GetCompressionLevel() > 0)
509  const_cast<TMessage&>(mess).Compress();
510 
511  char *mbuf = mess.Buffer();
512  Int_t mlen = mess.Length();
513  if (mess.CompBuffer()) {
514  mbuf = mess.CompBuffer();
515  mlen = mess.CompLength();
516  }
517 
518  ResetBit(TUDPSocket::kBrokenConn);
519  Int_t nsent;
520  if ((nsent = gSystem->SendRaw(fSocket, mbuf, mlen, 0)) <= 0) {
521  if (nsent == -5) {
522  // Connection reset by peer or broken
523  SetBit(TUDPSocket::kBrokenConn);
524  Close();
525  }
526  return nsent;
527  }
528 
529  fBytesSent += nsent;
530  fgBytesSent += nsent;
531 
532  // If acknowledgement is desired, wait for it
533  if (mess.What() & kMESS_ACK) {
534  TSystem::ResetErrno();
535  ResetBit(TUDPSocket::kBrokenConn);
536  char buf[2];
537  Int_t n = 0;
538  if ((n = gSystem->RecvRaw(fSocket, buf, sizeof(buf), 0)) < 0) {
539  if (n == -5) {
540  // Connection reset by peer or broken
541  SetBit(TUDPSocket::kBrokenConn);
542  Close();
543  } else
544  n = -1;
545  return n;
546  }
547  if (strncmp(buf, "ok", 2)) {
548  Error("Send", "bad acknowledgement");
549  return -1;
550  }
551  fBytesRecv += 2;
552  fgBytesRecv += 2;
553  }
554 
555  Touch(); // update usage timestamp
556 
557  return nsent - sizeof(UInt_t); //length - length header
558 }
559 
560 ////////////////////////////////////////////////////////////////////////////////
561 /// Send an object. Returns the number of bytes sent and -1 in case of error.
562 /// In case the "kind" has been or'ed with kMESS_ACK, the call will only
563 /// return after having received an acknowledgement, making the sending
564 /// synchronous.
565 
566 Int_t TUDPSocket::SendObject(const TObject *obj, Int_t kind)
567 {
568  //stream object to message buffer
569  TMessage mess(kind);
570  mess.WriteObject(obj);
571 
572  //now sending the object itself
573  Int_t nsent;
574  if ((nsent = Send(mess)) < 0)
575  return -1;
576 
577  return nsent;
578 }
579 
580 ////////////////////////////////////////////////////////////////////////////////
581 /// Send a raw buffer of specified length. Using option kOob one can send
582 /// OOB data. Returns the number of bytes sent or -1 in case of error.
583 /// Returns -4 in case of kNoBlock and errno == EWOULDBLOCK.
584 /// Returns -5 if pipe broken or reset by peer (EPIPE || ECONNRESET).
585 
586 Int_t TUDPSocket::SendRaw(const void *buffer, Int_t length, ESendRecvOptions opt)
587 {
588  TSystem::ResetErrno();
589 
590  if (fSocket == -1) return -1;
591 
592  ResetBit(TUDPSocket::kBrokenConn);
593  Int_t nsent;
594  if ((nsent = gSystem->SendRaw(fSocket, buffer, length, (int) opt)) <= 0) {
595  if (nsent == -5) {
596  // Connection reset or broken: close
597  SetBit(TUDPSocket::kBrokenConn);
598  Close();
599  }
600  return nsent;
601  }
602 
603  fBytesSent += nsent;
604  fgBytesSent += nsent;
605 
606  Touch(); // update usage timestamp
607 
608  return nsent;
609 }
610 
611 ////////////////////////////////////////////////////////////////////////////////
612 /// Check if TStreamerInfo must be sent. The list of TStreamerInfo of classes
613 /// in the object in the message is in the fInfos list of the message.
614 /// We send only the TStreamerInfos not yet sent on this socket.
615 
616 void TUDPSocket::SendStreamerInfos(const TMessage &mess)
617 {
618  if (mess.fInfos && mess.fInfos->GetEntries()) {
619  TIter next(mess.fInfos);
620  TStreamerInfo *info;
621  TList *minilist = 0;
622  while ((info = (TStreamerInfo*)next())) {
623  Int_t uid = info->GetNumber();
624  if (fBitsInfo.TestBitNumber(uid))
625  continue; //TStreamerInfo had already been sent
626  fBitsInfo.SetBitNumber(uid);
627  if (!minilist)
628  minilist = new TList();
629  if (gDebug > 0)
630  Info("SendStreamerInfos", "sending TStreamerInfo: %s, version = %d",
631  info->GetName(),info->GetClassVersion());
632  minilist->Add(info);
633  }
634  if (minilist) {
635  TMessage messinfo(kMESS_STREAMERINFO);
636  messinfo.WriteObject(minilist);
637  delete minilist;
638  if (messinfo.fInfos)
639  messinfo.fInfos->Clear();
640  if (Send(messinfo) < 0)
641  Warning("SendStreamerInfos", "problems sending TStreamerInfo's ...");
642  }
643  }
644 }
645 
646 ////////////////////////////////////////////////////////////////////////////////
647 /// Check if TProcessIDs must be sent. The list of TProcessIDs
648 /// in the object in the message is found by looking in the TMessage bits.
649 /// We send only the TProcessIDs not yet send on this socket.
650 
651 void TUDPSocket::SendProcessIDs(const TMessage &mess)
652 {
653  if (mess.TestBitNumber(0)) {
654  TObjArray *pids = TProcessID::GetPIDs();
655  Int_t npids = pids->GetEntries();
656  TProcessID *pid;
657  TList *minilist = 0;
658  for (Int_t ipid = 0; ipid < npids; ipid++) {
659  pid = (TProcessID*)pids->At(ipid);
660  if (!pid || !mess.TestBitNumber(pid->GetUniqueID()+1))
661  continue;
662  //check if a pid with this title has already been sent through the socket
663  //if not add it to the fUUIDs list
664  if (!fUUIDs) {
665  fUUIDs = new TList();
666  } else {
667  if (fUUIDs->FindObject(pid->GetTitle()))
668  continue;
669  }
670  fUUIDs->Add(new TObjString(pid->GetTitle()));
671  if (!minilist)
672  minilist = new TList();
673  if (gDebug > 0)
674  Info("SendProcessIDs", "sending TProcessID: %s", pid->GetTitle());
675  minilist->Add(pid);
676  }
677  if (minilist) {
678  TMessage messpid(kMESS_PROCESSID);
679  messpid.WriteObject(minilist);
680  delete minilist;
681  if (Send(messpid) < 0)
682  Warning("SendProcessIDs", "problems sending TProcessID's ...");
683  }
684  }
685 }
686 
687 ////////////////////////////////////////////////////////////////////////////////
688 /// Receive a character string message of maximum max length. The expected
689 /// message must be of type kMESS_STRING. Returns length of received string
690 /// (can be 0 if otherside of connection is closed) or -1 in case of error
691 /// or -4 in case a non-blocking socket would block (i.e. there is nothing
692 /// to be read).
693 
694 Int_t TUDPSocket::Recv(char *str, Int_t max)
695 {
696  Int_t n, kind;
697 
698  ResetBit(TUDPSocket::kBrokenConn);
699  if ((n = Recv(str, max, kind)) <= 0) {
700  if (n == -5) {
701  SetBit(TUDPSocket::kBrokenConn);
702  n = -1;
703  }
704  return n;
705  }
706 
707  if (kind != kMESS_STRING) {
708  Error("Recv", "got message of wrong kind (expected %d, got %d)",
709  kMESS_STRING, kind);
710  return -1;
711  }
712 
713  return n;
714 }
715 
716 ////////////////////////////////////////////////////////////////////////////////
717 /// Receive a character string message of maximum max length. Returns in
718 /// kind the message type. Returns length of received string+4 (can be 0 if
719 /// other side of connection is closed) or -1 in case of error or -4 in
720 /// case a non-blocking socket would block (i.e. there is nothing to be read).
721 
722 Int_t TUDPSocket::Recv(char *str, Int_t max, Int_t &kind)
723 {
724  Int_t n;
725  TMessage *mess;
726 
727  ResetBit(TUDPSocket::kBrokenConn);
728  if ((n = Recv(mess)) <= 0) {
729  if (n == -5) {
730  SetBit(TUDPSocket::kBrokenConn);
731  n = -1;
732  }
733  return n;
734  }
735 
736  kind = mess->What();
737  if (str) {
738  if (mess->BufferSize() > (Int_t)sizeof(Int_t)) // if mess contains more than kind
739  mess->ReadString(str, max);
740  else
741  str[0] = 0;
742  }
743 
744  delete mess;
745 
746  return n; // number of bytes read (len of str + sizeof(kind)
747 }
748 
749 ////////////////////////////////////////////////////////////////////////////////
750 /// Receives a status and a message type. Returns length of received
751 /// integers, 2*sizeof(Int_t) (can be 0 if other side of connection
752 /// is closed) or -1 in case of error or -4 in case a non-blocking
753 /// socket would block (i.e. there is nothing to be read).
754 
755 Int_t TUDPSocket::Recv(Int_t &status, Int_t &kind)
756 {
757  Int_t n;
758  TMessage *mess;
759 
760  ResetBit(TUDPSocket::kBrokenConn);
761  if ((n = Recv(mess)) <= 0) {
762  if (n == -5) {
763  SetBit(TUDPSocket::kBrokenConn);
764  n = -1;
765  }
766  return n;
767  }
768 
769  kind = mess->What();
770  (*mess) >> status;
771 
772  delete mess;
773 
774  return n; // number of bytes read (2 * sizeof(Int_t)
775 }
776 
777 ////////////////////////////////////////////////////////////////////////////////
778 /// Receive a TMessage object. The user must delete the TMessage object.
779 /// Returns length of message in bytes (can be 0 if other side of connection
780 /// is closed) or -1 in case of error or -4 in case a non-blocking socket
781 /// would block (i.e. there is nothing to be read) or -5 if pipe broken
782 /// or reset by peer (EPIPE || ECONNRESET). In those case mess == 0.
783 
784 Int_t TUDPSocket::Recv(TMessage *&mess)
785 {
786  TSystem::ResetErrno();
787 
788  if (fSocket == -1) {
789  mess = 0;
790  return -1;
791  }
792 
793 oncemore:
794  ResetBit(TUDPSocket::kBrokenConn);
795  Int_t n;
796  UInt_t len;
797  if ((n = gSystem->RecvRaw(fSocket, &len, sizeof(UInt_t), 0)) <= 0) {
798  if (n == 0 || n == -5) {
799  // Connection closed, reset or broken
800  SetBit(TUDPSocket::kBrokenConn);
801  Close();
802  }
803  mess = 0;
804  return n;
805  }
806  len = net2host(len); //from network to host byte order
807 
808  ResetBit(TUDPSocket::kBrokenConn);
809  char *buf = new char[len+sizeof(UInt_t)];
810  if ((n = gSystem->RecvRaw(fSocket, buf+sizeof(UInt_t), len, 0)) <= 0) {
811  if (n == 0 || n == -5) {
812  // Connection closed, reset or broken
813  SetBit(TUDPSocket::kBrokenConn);
814  Close();
815  }
816  delete [] buf;
817  mess = 0;
818  return n;
819  }
820 
821  fBytesRecv += n + sizeof(UInt_t);
822  fgBytesRecv += n + sizeof(UInt_t);
823 
824  mess = new TMessage(buf, len+sizeof(UInt_t));
825 
826  // receive any streamer infos
827  if (RecvStreamerInfos(mess))
828  goto oncemore;
829 
830  // receive any process ids
831  if (RecvProcessIDs(mess))
832  goto oncemore;
833 
834  if (mess->What() & kMESS_ACK) {
835  ResetBit(TUDPSocket::kBrokenConn);
836  char ok[2] = { 'o', 'k' };
837  Int_t n2 = 0;
838  if ((n2 = gSystem->SendRaw(fSocket, ok, sizeof(ok), 0)) < 0) {
839  if (n2 == -5) {
840  // Connection reset or broken
841  SetBit(TUDPSocket::kBrokenConn);
842  Close();
843  }
844  delete mess;
845  mess = 0;
846  return n2;
847  }
848  mess->SetWhat(mess->What() & ~kMESS_ACK);
849 
850  fBytesSent += 2;
851  fgBytesSent += 2;
852  }
853 
854  Touch(); // update usage timestamp
855 
856  return n;
857 }
858 
859 ////////////////////////////////////////////////////////////////////////////////
860 /// Receive a raw buffer of specified length bytes. Using option kPeek
861 /// one can peek at incoming data. Returns number of received bytes.
862 /// Returns -1 in case of error. In case of opt == kOob: -2 means
863 /// EWOULDBLOCK and -3 EINVAL. In case of non-blocking mode (kNoBlock)
864 /// -4 means EWOULDBLOCK. Returns -5 if pipe broken or reset by
865 /// peer (EPIPE || ECONNRESET).
866 
867 Int_t TUDPSocket::RecvRaw(void *buffer, Int_t length, ESendRecvOptions opt)
868 {
869  TSystem::ResetErrno();
870 
871  if (fSocket == -1) return -1;
872  if (length == 0) return 0;
873 
874  ResetBit(TUDPSocket::kBrokenConn);
875  Int_t n;
876  if ((n = gSystem->RecvRaw(fSocket, buffer, length, (int) opt)) <= 0) {
877  if (n == 0 || n == -5) {
878  // Connection closed, reset or broken
879  SetBit(TUDPSocket::kBrokenConn);
880  Close();
881  }
882  return n;
883  }
884 
885  fBytesRecv += n;
886  fgBytesRecv += n;
887 
888  Touch(); // update usage timestamp
889 
890  return n;
891 }
892 
893 ////////////////////////////////////////////////////////////////////////////////
894 /// Receive a message containing streamer infos. In case the message contains
895 /// streamer infos they are imported, the message will be deleted and the
896 /// method returns kTRUE.
897 
898 Bool_t TUDPSocket::RecvStreamerInfos(TMessage *mess)
899 {
900  if (mess->What() == kMESS_STREAMERINFO) {
901  TList *list = (TList*)mess->ReadObject(TList::Class());
902  TIter next(list);
903  TStreamerInfo *info;
904  TObjLink *lnk = list->FirstLink();
905  // First call BuildCheck for regular class
906  while (lnk) {
907  info = (TStreamerInfo*)lnk->GetObject();
908  TObject *element = info->GetElements()->UncheckedAt(0);
909  Bool_t isstl = element && strcmp("This",element->GetName())==0;
910  if (!isstl) {
911  info->BuildCheck();
912  if (gDebug > 0)
913  Info("RecvStreamerInfos", "importing TStreamerInfo: %s, version = %d",
914  info->GetName(), info->GetClassVersion());
915  }
916  lnk = lnk->Next();
917  }
918  // Then call BuildCheck for stl class
919  lnk = list->FirstLink();
920  while (lnk) {
921  info = (TStreamerInfo*)lnk->GetObject();
922  TObject *element = info->GetElements()->UncheckedAt(0);
923  Bool_t isstl = element && strcmp("This",element->GetName())==0;
924  if (isstl) {
925  info->BuildCheck();
926  if (gDebug > 0)
927  Info("RecvStreamerInfos", "importing TStreamerInfo: %s, version = %d",
928  info->GetName(), info->GetClassVersion());
929  }
930  lnk = lnk->Next();
931  }
932  delete list;
933  delete mess;
934 
935  return kTRUE;
936  }
937  return kFALSE;
938 }
939 
940 ////////////////////////////////////////////////////////////////////////////////
941 /// Receive a message containing process ids. In case the message contains
942 /// process ids they are imported, the message will be deleted and the
943 /// method returns kTRUE.
944 
945 Bool_t TUDPSocket::RecvProcessIDs(TMessage *mess)
946 {
947  if (mess->What() == kMESS_PROCESSID) {
948  TList *list = (TList*)mess->ReadObject(TList::Class());
949  TIter next(list);
950  TProcessID *pid;
951  while ((pid = (TProcessID*)next())) {
952  // check that a similar pid is not already registered in fgPIDs
953  TObjArray *pidslist = TProcessID::GetPIDs();
954  TIter nextpid(pidslist);
955  TProcessID *p;
956  while ((p = (TProcessID*)nextpid())) {
957  if (!strcmp(p->GetTitle(), pid->GetTitle())) {
958  delete pid;
959  pid = 0;
960  break;
961  }
962  }
963  if (pid) {
964  if (gDebug > 0)
965  Info("RecvProcessIDs", "importing TProcessID: %s", pid->GetTitle());
966  pid->IncrementCount();
967  pidslist->Add(pid);
968  Int_t ind = pidslist->IndexOf(pid);
969  pid->SetUniqueID((UInt_t)ind);
970  }
971  }
972  delete list;
973  delete mess;
974 
975  return kTRUE;
976  }
977  return kFALSE;
978 }
979 
980 ////////////////////////////////////////////////////////////////////////////////
981 /// Set socket options.
982 
983 Int_t TUDPSocket::SetOption(ESockOptions opt, Int_t val)
984 {
985  if (fSocket == -1) return -1;
986 
987  return gSystem->SetSockOpt(fSocket, opt, val);
988 }
989 
990 ////////////////////////////////////////////////////////////////////////////////
991 /// Get socket options. Returns -1 in case of error.
992 
993 Int_t TUDPSocket::GetOption(ESockOptions opt, Int_t &val)
994 {
995  if (fSocket == -1) return -1;
996 
997  return gSystem->GetSockOpt(fSocket, opt, &val);
998 }
999 
1000 ////////////////////////////////////////////////////////////////////////////////
1001 /// Returns error code. Meaning depends on context where it is called.
1002 /// If no error condition returns 0 else a value < 0.
1003 /// For example see TServerSocket ctor.
1004 
1005 Int_t TUDPSocket::GetErrorCode() const
1006 {
1007  if (!IsValid())
1008  return fSocket;
1009 
1010  return 0;
1011 }
1012 
1013 ////////////////////////////////////////////////////////////////////////////////
1014 /// See comments for function SetCompressionSettings
1015 
1016 void TUDPSocket::SetCompressionAlgorithm(Int_t algorithm)
1017 {
1018  if (algorithm < 0 || algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
1019  if (fCompress < 0) {
1020  // if the level is not defined yet use 4 as a default (with ZLIB was 1)
1021  fCompress = 100 * algorithm + ROOT::RCompressionSetting::ELevel::kUseMin;
1022  } else {
1023  int level = fCompress % 100;
1024  fCompress = 100 * algorithm + level;
1025  }
1026 }
1027 
1028 ////////////////////////////////////////////////////////////////////////////////
1029 /// See comments for function SetCompressionSettings
1030 
1031 void TUDPSocket::SetCompressionLevel(Int_t level)
1032 {
1033  if (level < 0) level = 0;
1034  if (level > 99) level = 99;
1035  if (fCompress < 0) {
1036  // if the algorithm is not defined yet use 0 as a default
1037  fCompress = level;
1038  } else {
1039  int algorithm = fCompress / 100;
1040  if (algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
1041  fCompress = 100 * algorithm + level;
1042  }
1043 }
1044 
1045 ////////////////////////////////////////////////////////////////////////////////
1046 /// Used to specify the compression level and algorithm:
1047 /// settings = 100 * algorithm + level
1048 ///
1049 /// level = 0, objects written to this file will not be compressed.
1050 /// level = 1, minimal compression level but fast.
1051 /// ....
1052 /// level = 9, maximal compression level but slower and might use more memory.
1053 /// (For the currently supported algorithms, the maximum level is 9)
1054 /// If compress is negative it indicates the compression level is not set yet.
1055 ///
1056 /// The enumeration ROOT::RCompressionSetting::EAlgorithm associates each
1057 /// algorithm with a number. There is a utility function to help
1058 /// to set the value of the argument. For example,
1059 /// ROOT::CompressionSettings(ROOT::kLZMA, 1)
1060 /// will build an integer which will set the compression to use
1061 /// the LZMA algorithm and compression level 1. These are defined
1062 /// in the header file Compression.h.
1063 ///
1064 /// Note that the compression settings may be changed at any time.
1065 /// The new compression settings will only apply to branches created
1066 /// or attached after the setting is changed and other objects written
1067 /// after the setting is changed.
1068 
1069 void TUDPSocket::SetCompressionSettings(Int_t settings)
1070 {
1071  fCompress = settings;
1072 }
1073 
1074 ////////////////////////////////////////////////////////////////////////////////
1075 /// Print error string depending on error code.
1076 
1077 void TUDPSocket::NetError(const char *where, Int_t err)
1078 {
1079  // Make sure it is in range
1080  err = (err < kErrError) ? ((err > -1) ? err : 0) : kErrError;
1081 
1082  if (gDebug > 0)
1083  ::Error(where, "%s", gRootdErrStr[err]);
1084 }
1085 
1086 ////////////////////////////////////////////////////////////////////////////////
1087 /// Get total number of bytes sent via all sockets.
1088 
1089 ULong64_t TUDPSocket::GetSocketBytesSent()
1090 {
1091  return fgBytesSent;
1092 }
1093 
1094 ////////////////////////////////////////////////////////////////////////////////
1095 /// Get total number of bytes received via all sockets.
1096 
1097 ULong64_t TUDPSocket::GetSocketBytesRecv()
1098 {
1099  return fgBytesRecv;
1100 }