Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TXSocket.h
Go to the documentation of this file.
1 // @(#)root/proofx:$Id$
2 // Author: G. Ganis Oct 2005
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 #ifndef ROOT_TXSocket
13 #define ROOT_TXSocket
14 
15 //////////////////////////////////////////////////////////////////////////
16 // //
17 // TXSocket //
18 // //
19 // High level handler of connections to xproofd. //
20 // //
21 //////////////////////////////////////////////////////////////////////////
22 
23 #define DFLT_CONNECTMAXTRY 10
24 
25 #include "TSemaphore.h"
26 #include "TString.h"
27 #include "TList.h"
28 #include "TMessage.h"
29 #include "TUrl.h"
30 #include "TSocket.h"
31 #ifndef __XPTYPES_H
32 #include "XProtocol/XPtypes.hh"
33 #endif
35 
36 #include <list>
37 #include <mutex>
38 
39 class TObjString;
40 class TXSockBuf;
41 class TXSockPipe;
42 class TXHandler;
43 class TXSocketHandler;
44 class XrdClientMessage;
45 class XrdProofConn;
46 
47 // To transmit info to Handlers
48 typedef struct {
49  Int_t fInt1;
50  Int_t fInt2;
51  Int_t fInt3;
52  Int_t fInt4;
53 } XHandleIn_t;
54 typedef struct {
55  Int_t fOpt;
56  const char *fMsg;
57 } XHandleErr_t;
58 
59 class TXSocket : public TSocket, public XrdClientAbsUnsolMsgHandler {
60 
61 friend class TXProofMgr;
62 friend class TXProofServ;
63 friend class TXSlave;
64 friend class TXSocketHandler;
65 friend class TXSockPipe;
66 friend class TXUnixSocket;
67 
68 private:
69  char fMode; // 'e' (def) or 'i' (internal - proofsrv)
70  kXR_int32 fSendOpt; // Options for sending messages
71  Short_t fSessionID; // proofsrv: remote ID of connected session
72  TString fUser; // Username used for login
73  TString fHost; // Remote host
74  Int_t fPort; // Remote port
75 
76  Int_t fLogLevel; // Log level to be transmitted to servers
77 
78  TString fBuffer; // Container for exchanging information
79  TObject *fReference; // Generic object reference of this socket
80  TXHandler *fHandler; // Handler of asynchronous events (input, error)
81 
82  XrdProofConn *fConn; // instance of the underlying connection module
83 
84  // Asynchronous messages
85  TSemaphore fASem; // Control access to conn async msg queue
86  std::recursive_mutex fAMtx; // To protect async msg queue
87  Bool_t fAWait; // kTRUE if waiting at the async msg queue
88  std::list<TXSockBuf *> fAQue; // list of asynchronous messages
89  Int_t fByteLeft; // bytes left in the first buffer
90  Int_t fByteCur; // current position in the first buffer
91  TXSockBuf *fBufCur; // current read buffer
92 
93  TSemaphore fAsynProc; // Control actions while processing async messages
94 
95  // Interrupts
96  std::recursive_mutex fIMtx; // To protect interrupt queue
97  kXR_int32 fILev; // Highest received interrupt
98  Bool_t fIForward; // Whether the interrupt should be propagated
99 
100  // Process ID of the instatiating process (to signal interrupts)
101  Int_t fPid;
102 
103  // Whether to timeout or not
104  Bool_t fDontTimeout; // If true wait forever for incoming messages
105  Bool_t fRDInterrupt; // To interrupt waiting for messages
106 
107  // Version of the remote XrdProofdProtocol
108  Int_t fXrdProofdVersion;
109 
110  // Static area for input handling
111  static TXSockPipe fgPipe; // Pipe for input monitoring
112  static TString fgLoc; // Location string
113  static Bool_t fgInitDone; // Avoid initializing more than once
114 
115  // List of spare buffers
116  static std::mutex fgSMtx; // To protect spare list
117  static std::list<TXSockBuf *> fgSQue; // list of spare buffers
118 
119  // Manage asynchronous message
120  Int_t PickUpReady();
121  TXSockBuf *PopUpSpare(Int_t sz);
122  void PushBackSpare();
123 
124  // Post a message into the queue for asynchronous processing
125  void PostMsg(Int_t type, const char *msg = 0);
126 
127  // Wake up all threads waiting for at the semaphore (used by TXSlave)
128  void PostSemAll();
129 
130  // Auxilliary
131  Int_t GetLowSocket() const;
132 
133  static void SetLocation(const char *loc = ""); // Set location string
134 
135  static void InitEnvs(); // Initialize environment variables
136 
137 public:
138  // Should be the same as in proofd/src/XrdProofdProtocol::Urgent
139  enum EUrgentMsgType { kStopProcess = 2000 };
140 
141  TXSocket(const char *url, Char_t mode = 'M', Int_t psid = -1, Char_t ver = -1,
142  const char *logbuf = 0, Int_t loglevel = -1, TXHandler *handler = 0);
143  virtual ~TXSocket();
144 
145  virtual void Close(Option_t *opt = "");
146  Bool_t Create(Bool_t attach = kFALSE);
147  void DisconnectSession(Int_t id, Option_t *opt = "");
148 
149  void DoError(int level,
150  const char *location, const char *fmt, va_list va) const;
151 
152  virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s,
153  XrdClientMessage *msg);
154 
155  virtual Int_t GetClientID() const { return -1; }
156  virtual Int_t GetClientIDSize() const { return 1; }
157  Int_t GetLogConnID() const;
158  Int_t GetOpenError() const;
159  Int_t GetServType() const;
160  Int_t GetSessionID() const;
161  Int_t GetXrdProofdVersion() const { return fXrdProofdVersion; }
162 
163  Bool_t IsValid() const;
164  Bool_t IsServProofd();
165  virtual void RemoveClientID() { }
166  virtual void SetClientID(Int_t) { }
167  void SetSendOpt(ESendRecvOptions o) { fSendOpt = o; }
168  void SetSessionID(Int_t id);
169 
170  // Send interfaces
171  Int_t Send(const TMessage &mess);
172  Int_t Send(Int_t kind) { return TSocket::Send(kind); }
173  Int_t Send(Int_t status, Int_t kind)
174  { return TSocket::Send(status, kind); }
175  Int_t Send(const char *mess, Int_t kind = kMESS_STRING)
176  { return TSocket::Send(mess, kind); }
177  Int_t SendRaw(const void *buf, Int_t len,
178  ESendRecvOptions opt = kDontBlock);
179 
180  TObjString *SendCoordinator(Int_t kind, const char *msg = 0, Int_t int2 = 0,
181  Long64_t l64 = 0, Int_t int3 = 0, const char *opt = 0);
182 
183  // Recv interfaces
184  Int_t Recv(TMessage *&mess);
185  Int_t Recv(Int_t &status, Int_t &kind)
186  { return TSocket::Recv(status, kind); }
187  Int_t Recv(char *mess, Int_t max)
188  { return TSocket::Recv(mess, max); }
189  Int_t Recv(char *mess, Int_t max, Int_t &kind)
190  { return TSocket::Recv(mess, max, kind); }
191  Int_t RecvRaw(void *buf, Int_t len,
192  ESendRecvOptions opt = kDefault);
193 
194  // Interrupts
195  Int_t SendInterrupt(Int_t type);
196  Int_t GetInterrupt(Bool_t &forward);
197 
198  // Urgent message
199  void SendUrgent(Int_t type, Int_t int1, Int_t int2);
200 
201  // Interrupt the low level socket
202  void SetInterrupt(Bool_t i = kTRUE);
203  inline Bool_t IsInterrupt() { std::lock_guard<std::recursive_mutex> lock(fAMtx);
204  return fRDInterrupt; }
205 
206  // Set / Check async msg queue waiting status
207  inline void SetAWait(Bool_t w = kTRUE) {
208  std::lock_guard<std::recursive_mutex> lock(fAMtx);
209  fAWait = w; }
210  inline Bool_t IsAWait() { std::lock_guard<std::recursive_mutex> lock(fAMtx);
211  return fAWait; }
212  // Flush the asynchronous queue
213  Int_t Flush();
214 
215  // Ping the counterpart
216  Bool_t Ping(const char *ord = 0);
217 
218  // Request remote touch of the admin file associated with this connection
219  void RemoteTouch();
220  // Propagate a Ctrl-C
221  void CtrlC();
222 
223  // Standard options cannot be set
224  Int_t SetOption(ESockOptions, Int_t) { return 0; }
225 
226  // Disable / Enable read timeout
227  void DisableTimeout() { fDontTimeout = kTRUE; }
228  void EnableTimeout() { fDontTimeout = kFALSE; }
229 
230  // Try reconnection after error
231  virtual Int_t Reconnect();
232 
233  ClassDef(TXSocket, 0) //A high level connection class for PROOF
234 };
235 
236 
237 //
238 // The following structure is used to store buffers received asynchronously
239 //
240 class TXSockBuf {
241 public:
242  Int_t fSiz;
243  Int_t fLen;
244  Char_t *fBuf;
245  Bool_t fOwn;
246  Int_t fCid;
247 
248  TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1);
249  ~TXSockBuf();
250 
251  void Resize(Int_t sz);
252 
253  static Long64_t BuffMem();
254  static Long64_t GetMemMax();
255  static void SetMemMax(Long64_t memmax);
256 
257 private:
258  Char_t *fMem;
259  static Long64_t fgBuffMem; // Total allocated memory
260  static Long64_t fgMemMax; // Max allocated memory allowed
261 };
262 
263 //
264 // The following class describes internal pipes
265 //
266 class TXSockPipe {
267 public:
268 
269  TXSockPipe(const char *loc = "");
270  virtual ~TXSockPipe();
271 
272  Bool_t IsValid() const { return ((fPipe[0] >= 0 && fPipe[1] >= 0) ? kTRUE : kFALSE); }
273 
274  TXSocket *GetLastReady();
275 
276  Int_t GetRead() const { return fPipe[0]; }
277  Int_t Post(TSocket *s); // Notify socket ready via global pipe
278  Int_t Clean(TSocket *s); // Clean previous pipe notification
279  Int_t Flush(TSocket *s); // Remove any instance of 's' from the pipe
280  void DumpReadySock();
281 
282  void SetLoc(const char *loc = "") { fLoc = loc; }
283 
284 private:
285  std::recursive_mutex fMutex; // Protect access to the sockets-ready list
286  Int_t fPipe[2]; // Pipe for input monitoring
287  TString fLoc; // Location string
288  TList fReadySock; // List of sockets ready to be read
289 };
290 
291 //
292 // Guard for a semaphore
293 //
294 class TXSemaphoreGuard {
295 public:
296 
297  TXSemaphoreGuard(TSemaphore *sem) : fSem(sem), fValid(kTRUE) { if (!fSem || fSem->TryWait()) fValid = kFALSE; }
298  virtual ~TXSemaphoreGuard() { if (fValid && fSem) fSem->Post(); }
299 
300  Bool_t IsValid() const { return fValid; }
301 
302 private:
303  TSemaphore *fSem;
304  Bool_t fValid;
305 };
306 
307 #endif