Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
XrdProofdProtocol.cxx
Go to the documentation of this file.
1 // @(#)root/proofd:$Id$
2 // Author: Gerardo Ganis 12/12/2005
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // XrdProofdProtocol //
15 // //
16 // Authors: G. Ganis, CERN, 2005 //
17 // //
18 // XrdProtocol implementation to coordinate 'proofserv' applications. //
19 // //
20 //////////////////////////////////////////////////////////////////////////
21 
22 #include "XrdProofdPlatform.h"
23 
24 #include "XpdSysError.h"
25 #include "XpdSysLogger.h"
26 
27 #include "XrdSys/XrdSysPriv.hh"
28 #include "XrdOuc/XrdOucStream.hh"
29 
30 #include "XrdVersion.hh"
31 #include "Xrd/XrdBuffer.hh"
32 #include "Xrd/XrdScheduler.hh"
33 
34 #include "XrdProofdClient.h"
35 #include "XrdProofdClientMgr.h"
36 #include "XrdProofdConfig.h"
37 #include "XrdProofdManager.h"
38 #include "XrdProofdNetMgr.h"
39 #include "XrdProofdPriorityMgr.h"
40 #include "XrdProofdProofServMgr.h"
41 #include "XrdProofdProtocol.h"
42 #include "XrdProofdResponse.h"
43 #include "XrdProofdProofServ.h"
44 #include "XrdProofSched.h"
45 #include "XrdROOT.h"
46 #include "rpdconn.h"
47 
48 // Tracing utils
49 #include "XrdProofdTrace.h"
50 XrdOucTrace *XrdProofdTrace = 0;
51 
52 // Loggers: we need two to avoid deadlocks
53 static XrdSysLogger gMainLogger;
54 
55 //
56 // Static area: general protocol managing section
57 int XrdProofdProtocol::fgCount = 0;
58 XpdObjectQ XrdProofdProtocol::fgProtStack("ProtStack",
59  "xproofd protocol anchor");
60 XrdSysRecMutex XrdProofdProtocol::fgBMutex; // Buffer management mutex
61 XrdBuffManager *XrdProofdProtocol::fgBPool = 0;
62 int XrdProofdProtocol::fgMaxBuffsz= 0;
63 XrdSysError XrdProofdProtocol::fgEDest(0, "xpd");
64 XrdSysLogger *XrdProofdProtocol::fgLogger = 0;
65 //
66 // Static area: protocol configuration section
67 bool XrdProofdProtocol::fgConfigDone = 0;
68 //
69 int XrdProofdProtocol::fgReadWait = 0;
70 // Cluster manager
71 XrdProofdManager *XrdProofdProtocol::fgMgr = 0;
72 
73 // Effective uid
74 int XrdProofdProtocol::fgEUidAtStartup = -1;
75 
76 // Local definitions
77 #define MAX_ARGS 128
78 
79 // Macros used to set conditional options
80 #ifndef XPDCOND
81 #define XPDCOND(n,ns) ((n == -1 && ns == -1) || (n > 0 && n >= ns))
82 #endif
83 #ifndef XPDSETSTRING
84 #define XPDSETSTRING(n,ns,c,s) \
85  { if (XPDCOND(n,ns)) { \
86  SafeFree(c); c = strdup(s.c_str()); ns = n; }}
87 #endif
88 
89 #ifndef XPDADOPTSTRING
90 #define XPDADOPTSTRING(n,ns,c,s) \
91  { char *t = 0; \
92  XPDSETSTRING(n, ns, t, s); \
93  if (t && strlen(t)) { \
94  SafeFree(c); c = t; \
95  } else \
96  SafeFree(t); }
97 #endif
98 
99 #ifndef XPDSETINT
100 #define XPDSETINT(n,ns,i,s) \
101  { if (XPDCOND(n,ns)) { \
102  i = strtol(s.c_str(),0,10); ns = n; }}
103 #endif
104 
105 typedef struct {
106  kXR_int32 ptyp; // must be always 0 !
107  kXR_int32 rlen;
108  kXR_int32 pval;
109  kXR_int32 styp;
110 } hs_response_t;
111 
112 typedef struct ResetCtrlcGuard {
113  XrdProofdProtocol *xpd;
114  int type;
115  ResetCtrlcGuard(XrdProofdProtocol *p, int t) : xpd(p), type(t) { }
116  ~ResetCtrlcGuard() { if (xpd && type != kXP_ctrlc) xpd->ResetCtrlC(); }
117 } ResetCtrlcGuard_t;
118 
119 //
120 // Derivation of XrdProofdConfig to read the port from the config file
121 class XrdProofdProtCfg : public XrdProofdConfig {
122 public:
123  int fPort; // The port on which we listen
124  XrdProofdProtCfg(const char *cfg, XrdSysError *edest = 0);
125  int DoDirective(XrdProofdDirective *, char *, XrdOucStream *, bool);
126  void RegisterDirectives();
127 };
128 
129 ////////////////////////////////////////////////////////////////////////////////
130 /// Constructor
131 
132 XrdProofdProtCfg::XrdProofdProtCfg(const char *cfg, XrdSysError *edest)
133  : XrdProofdConfig(cfg, edest)
134 {
135  fPort = -1;
136  RegisterDirectives();
137 }
138 
139 ////////////////////////////////////////////////////////////////////////////////
140 /// Register directives for configuration
141 
142 void XrdProofdProtCfg::RegisterDirectives()
143 {
144  Register("port", new XrdProofdDirective("port", this, &DoDirectiveClass));
145  Register("xrd.protocol", new XrdProofdDirective("xrd.protocol", this, &DoDirectiveClass));
146 }
147 
148 ////////////////////////////////////////////////////////////////////////////////
149 /// Parse directives
150 
151 int XrdProofdProtCfg::DoDirective(XrdProofdDirective *d,
152  char *val, XrdOucStream *cfg, bool)
153 {
154  if (!d) return -1;
155 
156  XrdOucString port(val);
157  if (d->fName == "xrd.protocol") {
158  port = cfg->GetWord();
159  port.replace("xproofd:", "");
160  } else if (d->fName != "port") {
161  return -1;
162  }
163  if (port.length() > 0) {
164  fPort = strtol(port.c_str(), 0, 10);
165  }
166  fPort = (fPort < 0) ? XPD_DEF_PORT : fPort;
167  return 0;
168 }
169 
170 #if (ROOTXRDVERS >= 300030000)
171 XrdVERSIONINFO(XrdgetProtocol,xproofd);
172 XrdVERSIONINFO(XrdgetProtocolPort,xproofd);
173 #endif
174 
175 extern "C" {
176 ////////////////////////////////////////////////////////////////////////////////
177 /// This protocol is meant to live in a shared library. The interface below is
178 /// used by the server to obtain a copy of the protocol object that can be used
179 /// to decide whether or not a link is talking a particular protocol.
180 
181 XrdProtocol *XrdgetProtocol(const char *, char *parms, XrdProtocol_Config *pi)
182 {
183  // Return the protocol object to be used if static init succeeds
184  if (XrdProofdProtocol::Configure(parms, pi)) {
185 
186  return (XrdProtocol *) new XrdProofdProtocol(pi);
187  }
188  return (XrdProtocol *)0;
189 }
190 
191 ////////////////////////////////////////////////////////////////////////////////
192 /// This function is called early on to determine the port we need to use. The
193 /// The default is ostensibly 1093 but can be overidden; which we allow.
194 
195 int XrdgetProtocolPort(const char * /*pname*/, char * /*parms*/, XrdProtocol_Config *pi)
196 {
197  // Default XPD_DEF_PORT (1093)
198  int port = XPD_DEF_PORT;
199 
200  if (pi) {
201  XrdProofdProtCfg pcfg(pi->ConfigFN, pi->eDest);
202  // Init some relevant quantities for tracing
203  XrdProofdTrace = new XrdOucTrace(pi->eDest);
204  pcfg.Config(0);
205 
206  if (pcfg.fPort > 0) {
207  port = pcfg.fPort;
208  } else {
209  port = (pi && pi->Port > 0) ? pi->Port : XPD_DEF_PORT;
210  }
211  }
212  return port;
213 }}
214 
215 ////////////////////////////////////////////////////////////////////////////////
216 /// Protocol constructor
217 
218 XrdProofdProtocol::XrdProofdProtocol(XrdProtocol_Config *pi)
219  : XrdProtocol("xproofd protocol handler"), fProtLink(this)
220 {
221  fLink = 0;
222  fArgp = 0;
223  fPClient = 0;
224  fSecClient = 0;
225  fAuthProt = 0;
226  fResponses.reserve(10);
227 
228  fStdErrFD = (pi && pi->eDest) ? pi->eDest->baseFD() : fileno(stderr);
229 
230  // Instantiate a Proofd protocol object
231  Reset();
232 }
233 
234 ////////////////////////////////////////////////////////////////////////////////
235 /// Get response instance corresponding to stream ID 'sid'
236 
237 XrdProofdResponse *XrdProofdProtocol::Response(kXR_unt16 sid)
238 {
239  XPDLOC(ALL, "Protocol::Response")
240 
241  TRACE(HDBG, "sid: "<<sid<<", size: "<<fResponses.size());
242 
243  if (sid > 0)
244  if (sid <= fResponses.size())
245  return fResponses[sid-1];
246 
247  return (XrdProofdResponse *)0;
248 }
249 
250 ////////////////////////////////////////////////////////////////////////////////
251 /// Create new response instance for stream ID 'sid'
252 
253 XrdProofdResponse *XrdProofdProtocol::GetNewResponse(kXR_unt16 sid)
254 {
255  XPDLOC(ALL, "Protocol::GetNewResponse")
256 
257  XrdOucString msg;
258  XPDFORM(msg, "sid: %d", sid);
259  if (sid > 0) {
260  if (sid > fResponses.size()) {
261  if (sid > fResponses.capacity()) {
262  int newsz = (sid < 2 * fResponses.capacity()) ? 2 * fResponses.capacity() : sid+1 ;
263  fResponses.reserve(newsz);
264  if (TRACING(DBG)) {
265  msg += " new capacity: ";
266  msg += (int) fResponses.capacity();
267  }
268  }
269  int nnew = sid - fResponses.size();
270  while (nnew--)
271  fResponses.push_back(new XrdProofdResponse());
272  if (TRACING(DBG)) {
273  msg += "; new size: ";
274  msg += (int) fResponses.size();
275  }
276  }
277  } else {
278  TRACE(XERR,"wrong sid: "<<sid);
279  return (XrdProofdResponse *)0;
280  }
281 
282  TRACE(DBG, msg);
283 
284  // Done
285  return fResponses[sid-1];
286 }
287 
288 ////////////////////////////////////////////////////////////////////////////////
289 /// Check whether the request matches this protocol
290 
291 XrdProtocol *XrdProofdProtocol::Match(XrdLink *lp)
292 {
293  XPDLOC(ALL, "Protocol::Match")
294 
295  struct ClientInitHandShake hsdata;
296  char *hsbuff = (char *)&hsdata;
297 
298  static hs_response_t hsresp = {0, 0, kXR_int32(htonl(XPROOFD_VERSBIN)), 0};
299 
300  XrdProtocol *xp = nullptr;
301  int dlen;
302  TRACE(HDBG, "enter");
303 
304  XrdOucString emsg;
305  // Peek at the first 20 bytes of data
306  if ((dlen = lp->Peek(hsbuff,sizeof(hsdata),fgReadWait)) != sizeof(hsdata)) {
307  if (dlen <= 0) lp->setEtext("Match: handshake not received");
308  if (dlen == 12) {
309  // Check if it is a request to open a file via 'rootd', unsupported
310  hsdata.first = ntohl(hsdata.first);
311  if (hsdata.first == 8) {
312  emsg = "rootd-file serving not supported any-longer";
313  }
314  if (emsg.length() > 0) {
315  lp->setEtext(emsg.c_str());
316  } else {
317  lp->setEtext("link transfered");
318  }
319  return xp;
320  }
321  TRACE(XERR, "peeked incomplete or empty information! (dlen: "<<dlen<<" bytes)");
322  return xp;
323  }
324 
325  // If this is is not our protocol, we check if it a data serving request via xrootd
326  hsdata.third = ntohl(hsdata.third);
327  if (dlen != sizeof(hsdata) || hsdata.first || hsdata.second
328  || !(hsdata.third == 1) || hsdata.fourth || hsdata.fifth) {
329 
330  // Check if it is a request to open a file via 'xrootd'
331  if (fgMgr->Xrootd() && (xp = fgMgr->Xrootd()->Match(lp))) {
332  TRACE(ALL, "matched xrootd protocol on link: serving a file");
333  } else {
334  TRACE(XERR, "failed to match any known or enabled protocol");
335  }
336  return xp;
337  }
338 
339  // Respond to this request with the handshake response
340  if (!lp->Send((char *)&hsresp, sizeof(hsresp))) {
341  lp->setEtext("Match: handshake failed");
342  TRACE(XERR, "handshake failed");
343  return xp;
344  }
345 
346  // We can now read all 20 bytes and discard them (no need to wait for it)
347  int len = sizeof(hsdata);
348  if (lp->Recv(hsbuff, len) != len) {
349  lp->setEtext("Match: reread failed");
350  TRACE(XERR, "reread failed");
351  return xp;
352  }
353 
354  // Get a protocol object off the stack (if none, allocate a new one)
355  XrdProofdProtocol *xpp = nullptr;
356  if (!(xpp = fgProtStack.Pop()))
357  xpp = new XrdProofdProtocol();
358 
359  // Bind the protocol to the link and return the protocol
360  xpp->fLink = lp;
361  snprintf(xpp->fSecEntity.prot, XrdSecPROTOIDSIZE, "host");
362  xpp->fSecEntity.host = strdup((char *)lp->Host());
363 
364  // Dummy data used by 'proofd'
365  kXR_int32 dum[2];
366  if (xpp->GetData("dummy",(char *)&dum[0],sizeof(dum)) != 0) {
367  xpp->Recycle(0,0,0);
368  }
369 
370  xp = (XrdProtocol *) xpp;
371 
372  // We are done
373  return xp;
374 }
375 
376 ////////////////////////////////////////////////////////////////////////////////
377 /// Return statistics info about the protocol.
378 /// Not really implemented yet: this is a reduced XrdXrootd version.
379 
380 int XrdProofdProtocol::Stats(char *buff, int blen, int)
381 {
382  static char statfmt[] = "<stats id=\"xproofd\"><num>%ld</num></stats>";
383 
384  // If caller wants only size, give it to them
385  if (!buff)
386  return sizeof(statfmt)+16;
387 
388  // We have only one statistic -- number of successful matches
389  return snprintf(buff, blen, statfmt, fgCount);
390 }
391 
392 ////////////////////////////////////////////////////////////////////////////////
393 /// Reset static and local vars
394 
395 void XrdProofdProtocol::Reset()
396 {
397  // Init local vars
398  fLink = 0;
399  fPid = -1;
400  fArgp = 0;
401  fStatus = 0;
402  fClntCapVer = 0;
403  fConnType = kXPD_ClientMaster;
404  fSuperUser = 0;
405  fPClient = 0;
406  fUserIn = "";
407  fGroupIn = "";
408  fCID = -1;
409  fTraceID = "";
410  fAdminPath = "";
411  if (fAuthProt) {
412  fAuthProt->Delete();
413  fAuthProt = 0;
414  }
415  fSecEntity = XrdSecEntity();
416  // Cleanup existing XrdProofdResponse objects
417  std::vector<XrdProofdResponse *>::iterator ii = fResponses.begin(); // One per each logical connection
418  while (ii != fResponses.end()) {
419  (*ii)->Reset();
420  ++ii;
421  }
422 }
423 
424 ////////////////////////////////////////////////////////////////////////////////
425 /// Protocol configuration tool
426 /// Function: Establish configuration at load time.
427 /// Output: 1 upon success or 0 otherwise.
428 
429 int XrdProofdProtocol::Configure(char *parms, XrdProtocol_Config *pi)
430 {
431  XPDLOC(ALL, "Protocol::Configure")
432 
433  XrdOucString mp;
434 
435  // Only once
436  if (fgConfigDone)
437  return 1;
438  fgConfigDone = 1;
439 
440  // Copy out the special info we want to use at top level
441  fgLogger = pi->eDest->logger();
442  fgEDest.logger(fgLogger);
443  if (XrdProofdTrace) delete XrdProofdTrace; // It could have been initialized in XrdgetProtocolPort
444  XrdProofdTrace = new XrdOucTrace(&fgEDest);
445  fgBPool = pi->BPool;
446  fgReadWait = pi->readWait;
447 
448  // Pre-initialize some i/o values
449  fgMaxBuffsz = fgBPool->MaxSize();
450 
451  // Schedule protocol object cleanup; the maximum number of objects
452  // and the max age are taken from XrdXrootdProtocol: this may need
453  // some optimization in the future.
454 #if 1
455  fgProtStack.Set(pi->Sched, XrdProofdTrace, TRACE_MEM);
456  fgProtStack.Set((pi->ConnMax/3 ? pi->ConnMax/3 : 30), 60*60);
457 #else
458  fgProtStack.Set(pi->Sched, 3600);
459 #endif
460 
461  // Default tracing options: always trace logins and errors for all
462  // domains; if the '-d' option was specified on the command line then
463  // trace also REQ and FORM.
464  // NB: these are superseeded by settings in the config file (xpd.trace)
465  XrdProofdTrace->What = TRACE_DOMAINS;
466  TRACESET(XERR, 1);
467  TRACESET(LOGIN, 1);
468  TRACESET(RSP, 0);
469  if (pi->DebugON)
470  XrdProofdTrace->What |= (TRACE_REQ | TRACE_FORK);
471 
472  // Work as root to avoid contineous changes of the effective user
473  // (users are logged in their box after forking)
474  fgEUidAtStartup = geteuid();
475  if (!getuid()) XrdSysPriv::ChangePerm((uid_t)0, (gid_t)0);
476 
477  // Process the config file for directives meaningful to us
478  // Create and Configure the manager
479  fgMgr = new XrdProofdManager(parms, pi, &fgEDest);
480  if (fgMgr->Config(0)) return 0;
481  mp = "global manager created";
482  TRACE(ALL, mp);
483 
484  // Issue herald indicating we configured successfully
485  TRACE(ALL, "xproofd protocol version "<<XPROOFD_VERSION<<
486  " build "<<XrdVERSION<<" successfully loaded");
487 
488  // Return success
489  return 1;
490 }
491 
492 ////////////////////////////////////////////////////////////////////////////////
493 /// Process the information received on the active link.
494 /// (We ignore the argument here)
495 
496 int XrdProofdProtocol::Process(XrdLink *)
497 {
498  XPDLOC(ALL, "Protocol::Process")
499 
500  int rc = 0;
501  TRACET(TraceID(), DBG, "instance: " << this);
502 
503  // Read the next request header
504  if ((rc = GetData("request", (char *)&fRequest, sizeof(fRequest))) != 0)
505  return rc;
506  TRACET(TraceID(), HDBG, "after GetData: rc: " << rc);
507 
508  // Deserialize the data
509  fRequest.header.requestid = ntohs(fRequest.header.requestid);
510  fRequest.header.dlen = ntohl(fRequest.header.dlen);
511 
512  // Get response object
513  kXR_unt16 sid;
514  memcpy((void *)&sid, (const void *)&(fRequest.header.streamid[0]), 2);
515  XrdProofdResponse *response = 0;
516  if (!(response = Response(sid))) {
517  if (!(response = GetNewResponse(sid))) {
518  TRACET(TraceID(), XERR, "could not get Response instance for rid: "<< sid);
519  return rc;
520  }
521  }
522  // Set the stream ID for the reply
523  response->Set(fRequest.header.streamid);
524  response->Set(fLink);
525 
526  TRACET(TraceID(), REQ, "sid: " << sid << ", req id: " << fRequest.header.requestid <<
527  " (" << XrdProofdAux::ProofRequestTypes(fRequest.header.requestid)<<
528  ")" << ", dlen: " <<fRequest.header.dlen);
529 
530  // Every request has an associated data length. It better be >= 0 or we won't
531  // be able to know how much data to read.
532  if (fRequest.header.dlen < 0) {
533  response->Send(kXR_ArgInvalid, "Process: Invalid request data length");
534  return fLink->setEtext("Process: protocol data length error");
535  }
536 
537  // Read any argument data at this point, except when the request is to forward
538  // a buffer: the argument may have to be segmented and we're not prepared to do
539  // that here.
540  if (fRequest.header.requestid != kXP_sendmsg && fRequest.header.dlen) {
541  if ((fArgp = GetBuff(fRequest.header.dlen+1, fArgp)) == 0) {
542  response->Send(kXR_ArgTooLong, "fRequest.argument is too long");
543  return rc;
544  }
545  if ((rc = GetData("arg", fArgp->buff, fRequest.header.dlen)))
546  return rc;
547  fArgp->buff[fRequest.header.dlen] = '\0';
548  }
549 
550  // Continue with request processing at the resume point
551  return Process2();
552 }
553 
554 ////////////////////////////////////////////////////////////////////////////////
555 /// Local processing method: here the request is dispatched to the appropriate
556 /// method
557 
558 int XrdProofdProtocol::Process2()
559 {
560  XPDLOC(ALL, "Protocol::Process2")
561 
562  int rc = 0;
563  XPD_SETRESP(this, "Process2");
564 
565  TRACET(TraceID(), REQ, "req id: " << fRequest.header.requestid << " (" <<
566  XrdProofdAux::ProofRequestTypes(fRequest.header.requestid) << ")");
567 
568  ResetCtrlcGuard_t ctrlcguard(this, fRequest.header.requestid);
569 
570  // If the user is logged in check if the wanted action is to be done by us
571  if (fStatus && (fStatus & XPD_LOGGEDIN)) {
572  // Record time of the last action
573  TouchAdminPath();
574  // We must have a client instance if here
575  if (!fPClient) {
576  TRACET(TraceID(), XERR, "client undefined!!! ");
577  response->Send(kXR_InvalidRequest,"client undefined!!! ");
578  return 0;
579  }
580  bool formgr = 0;
581  switch(fRequest.header.requestid) {
582  case kXP_ctrlc:
583  rc = CtrlC();
584  break;
585  case kXP_touch:
586  // Reset the asked-to-touch flag, if it was never set
587  fPClient->Touch(1);
588  break;
589  case kXP_interrupt:
590  rc = Interrupt();
591  break;
592  case kXP_ping:
593  rc = Ping();
594  break;
595  case kXP_sendmsg:
596  rc = SendMsg();
597  break;
598  case kXP_urgent:
599  rc = Urgent();
600  break;
601  default:
602  formgr = 1;
603  }
604  if (!formgr) {
605  // Check the link
606  if (!fLink || (fLink->FDnum() <= 0)) {
607  TRACE(XERR, "link is undefined! ");
608  return -1;
609  }
610  return rc;
611  }
612  }
613 
614  // The request is for the manager
615  rc = fgMgr->Process(this);
616  // Check the link
617  if (!fLink || (fLink->FDnum() <= 0)) {
618  TRACE(XERR, "link is undefined! ");
619  return -1;
620  }
621  return rc;
622 }
623 
624 ////////////////////////////////////////////////////////////////////////////////
625 /// Recycle call. Release the instance and give it back to the stack.
626 
627 void XrdProofdProtocol::Recycle(XrdLink *, int, const char *)
628 {
629  XPDLOC(ALL, "Protocol::Recycle")
630 
631  const char *srvtype[6] = {"ANY", "MasterWorker", "MasterMaster",
632  "ClientMaster", "Internal", "Admin"};
633  XrdOucString buf;
634 
635  // Document the disconnect
636  if (fPClient)
637  XPDFORM(buf, "user %s disconnected; type: %s", fPClient->User(),
638  srvtype[fConnType+1]);
639  else
640  XPDFORM(buf, "user disconnected; type: %s", srvtype[fConnType+1]);
641  TRACET(TraceID(), LOGIN, buf);
642 
643  // If we have a buffer, release it
644  if (fArgp) {
645  fgBPool->Release(fArgp);
646  fArgp = 0;
647  }
648 
649  // Locate the client instance
650  XrdProofdClient *pmgr = fPClient;
651 
652  if (pmgr) {
653  if (!Internal()) {
654 
655  TRACE(REQ,"External disconnection of protocol associated with pid "<<fPid);
656 
657  // Write disconnection file
658  XrdOucString discpath(fAdminPath);
659  discpath.replace("/cid", "/disconnected");
660  FILE *fd = fopen(discpath.c_str(), "w");
661  if (!fd && errno != ENOENT) {
662  TRACE(XERR, "unable to create path: " <<discpath<<" (errno: "<<errno<<")");
663  } else if (fd) {
664  fclose(fd);
665  }
666 
667  // Remove protocol and response from attached client/proofserv instances
668  // Set reconnect flag if proofserv instances attached to this client are still running
669  pmgr->ResetClientSlot(fCID);
670  if(fgMgr && fgMgr->SessionMgr()) {
671  XrdSysMutexHelper mhp(fgMgr->SessionMgr()->Mutex());
672 
673  fgMgr->SessionMgr()->DisconnectFromProofServ(fPid);
674  if((fConnType == 0) && fgMgr->SessionMgr()->Alive(this)) {
675  TRACE(REQ, "Non-destroyed proofserv processes attached to this protocol ("<<this<<
676  "), setting reconnect time");
677  fgMgr->SessionMgr()->SetReconnectTime(true);
678  }
679  fgMgr->SessionMgr()->CheckActiveSessions(0);
680  } else {
681  TRACE(XERR, "No XrdProofdMgr ("<<fgMgr<<") or SessionMgr ("
682  <<(fgMgr ? fgMgr->SessionMgr() : (void *) -1)<<")")
683  }
684 
685  } else {
686 
687  // Internal connection: we need to remove this instance from the list
688  // of proxy servers and to notify the attached clients.
689  // Tell the session manager that this session has gone
690  if (fgMgr && fgMgr->SessionMgr()) {
691  XrdSysMutexHelper mhp(fgMgr->SessionMgr()->Mutex());
692  TRACE(HDBG, "fAdminPath: "<<fAdminPath);
693  buf.assign(fAdminPath, fAdminPath.rfind('/') + 1, -1);
694  fgMgr->SessionMgr()->DeleteFromSessions(buf.c_str());
695  // Move the entry to the terminated sessions area
696  fgMgr->SessionMgr()->MvSession(buf.c_str());
697  }
698  else {
699  TRACE(XERR,"No XrdProofdMgr ("<<fgMgr<<") or SessionMgr ("<<fgMgr->SessionMgr()<<")")
700  }
701  }
702  }
703  // Set fields to starting point (debugging mostly)
704  Reset();
705 
706  // Push ourselves on the stack
707  fgProtStack.Push(&fProtLink);
708 #if 0
709  if(fgProtStack.Push(&fProtLink) != 0) {
710  XrdProofdProtocol *xp = fProtLink.objectItem();
711  fProtLink.setItem(0);
712  delete xp;
713  }
714 #endif
715 }
716 
717 ////////////////////////////////////////////////////////////////////////////////
718 /// Allocate a buffer to handle quantum bytes; if argp points to an existing
719 /// buffer, its size is checked and re-allocated if needed
720 
721 XrdBuffer *XrdProofdProtocol::GetBuff(int quantum, XrdBuffer *argp)
722 {
723  XPDLOC(ALL, "Protocol::GetBuff")
724 
725  TRACE(HDBG, "len: "<<quantum);
726 
727  // If we are given an existing buffer, we keep it if we use at least half
728  // of it; otherwise we take a smaller one
729  if (argp) {
730  if (quantum >= argp->bsize / 2 && quantum <= argp->bsize)
731  return argp;
732  }
733 
734  // Release the buffer if too small
735  XrdSysMutexHelper mh(fgBMutex);
736  if (argp)
737  fgBPool->Release(argp);
738 
739  // Obtain a new one
740  if ((argp = fgBPool->Obtain(quantum)) == 0) {
741  TRACE(XERR, "could not get requested buffer (size: "<<quantum<<
742  ") = insufficient memory");
743  } else {
744  TRACE(HDBG, "quantum: "<<quantum<<
745  ", buff: "<<(void *)(argp->buff)<<", bsize:"<<argp->bsize);
746  }
747 
748  // Done
749  return argp;
750 }
751 
752 ////////////////////////////////////////////////////////////////////////////////
753 /// Release a buffer previously allocated via GetBuff
754 
755 void XrdProofdProtocol::ReleaseBuff(XrdBuffer *argp)
756 {
757  XrdSysMutexHelper mh(fgBMutex);
758  fgBPool->Release(argp);
759 }
760 
761 ////////////////////////////////////////////////////////////////////////////////
762 /// Get data from the open link
763 
764 int XrdProofdProtocol::GetData(const char *dtype, char *buff, int blen)
765 {
766  XPDLOC(ALL, "Protocol::GetData")
767 
768  int rlen;
769 
770  // Read the data but reschedule the link if we have not received all of the
771  // data within the timeout interval.
772  TRACET(TraceID(), HDBG, "dtype: "<<(dtype ? dtype : " - ")<<", blen: "<<blen);
773 
774  // No need to lock:the link is disable while we are here
775  rlen = fLink->Recv(buff, blen, fgReadWait);
776  if (rlen < 0) {
777  if (rlen != -ENOMSG && rlen != -ECONNRESET) {
778  XrdOucString emsg = "link read error: errno: ";
779  emsg += -rlen;
780  TRACET(TraceID(), XERR, emsg.c_str());
781  return (fLink ? fLink->setEtext(emsg.c_str()) : -1);
782  } else {
783  TRACET(TraceID(), HDBG, "connection closed by peer (errno: "<<-rlen<<")");
784  return -1;
785  }
786  }
787  if (rlen < blen) {
788  TRACET(TraceID(), DBG, dtype << " timeout; read " <<rlen <<" of " <<blen <<" bytes - rescheduling");
789  return 1;
790  }
791  TRACET(TraceID(), HDBG, "rlen: "<<rlen);
792 
793  return 0;
794 }
795 
796 ////////////////////////////////////////////////////////////////////////////////
797 /// Send data over the open link. Segmentation is done here, if required.
798 
799 int XrdProofdProtocol::SendData(XrdProofdProofServ *xps,
800  kXR_int32 sid, XrdSrvBuffer **buf, bool savebuf)
801 {
802  XPDLOC(ALL, "Protocol::SendData")
803 
804  int rc = 0;
805 
806  TRACET(TraceID(), HDBG, "length: "<<fRequest.header.dlen<<" bytes ");
807 
808  // Buffer length
809  int len = fRequest.header.dlen;
810 
811  // Quantum size
812  int quantum = (len > fgMaxBuffsz ? fgMaxBuffsz : len);
813 
814  // Get a buffer
815  XrdBuffer *argp = XrdProofdProtocol::GetBuff(quantum);
816  if (!argp) return -1;
817 
818  // Now send over all of the data as unsolicited messages
819  XrdOucString msg;
820  while (len > 0) {
821 
822  XrdProofdResponse *response = (sid > -1) ? xps->Response() : 0;
823 
824  if ((rc = GetData("data", argp->buff, quantum))) {
825  { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
826  return -1;
827  }
828  if (buf && !(*buf) && savebuf)
829  *buf = new XrdSrvBuffer(argp->buff, quantum, 1);
830  // Send
831  if (sid > -1) {
832  if (TRACING(HDBG))
833  XPDFORM(msg, "EXT: server ID: %d, sending: %d bytes", sid, quantum);
834  if (!response || response->Send(kXR_attn, kXPD_msgsid, sid,
835  argp->buff, quantum) != 0) {
836  { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
837  XPDFORM(msg, "EXT: server ID: %d, problems sending: %d bytes to server",
838  sid, quantum);
839  TRACET(TraceID(), XERR, msg);
840  return -1;
841  }
842  } else {
843 
844  // Get ID of the client
845  int cid = ntohl(fRequest.sendrcv.cid);
846  if (TRACING(HDBG))
847  XPDFORM(msg, "INT: client ID: %d, sending: %d bytes", cid, quantum);
848  if (xps->SendData(cid, argp->buff, quantum) != 0) {
849  { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
850  XPDFORM(msg, "INT: client ID: %d, problems sending: %d bytes to client",
851  cid, quantum);
852  TRACET(TraceID(), XERR, msg);
853  return -1;
854  }
855  }
856  TRACET(TraceID(), HDBG, msg);
857  // Next segment
858  len -= quantum;
859  if (len < quantum)
860  quantum = len;
861  }
862 
863  // Release the buffer
864  { XrdSysMutexHelper mh(fgBMutex); fgBPool->Release(argp); }
865 
866  // Done
867  return 0;
868 }
869 
870 ////////////////////////////////////////////////////////////////////////////////
871 /// Send data over the open client links of session 'xps'.
872 /// Used when all the connected clients are eligible to receive the message.
873 /// Segmentation is done here, if required.
874 
875 int XrdProofdProtocol::SendDataN(XrdProofdProofServ *xps,
876  XrdSrvBuffer **buf, bool savebuf)
877 {
878  XPDLOC(ALL, "Protocol::SendDataN")
879 
880  int rc = 0;
881 
882  TRACET(TraceID(), HDBG, "length: "<<fRequest.header.dlen<<" bytes ");
883 
884  // Buffer length
885  int len = fRequest.header.dlen;
886 
887  // Quantum size
888  int quantum = (len > fgMaxBuffsz ? fgMaxBuffsz : len);
889 
890  // Get a buffer
891  XrdBuffer *argp = XrdProofdProtocol::GetBuff(quantum);
892  if (!argp) return -1;
893 
894  // Now send over all of the data as unsolicited messages
895  while (len > 0) {
896  if ((rc = GetData("data", argp->buff, quantum))) {
897  XrdProofdProtocol::ReleaseBuff(argp);
898  return -1;
899  }
900  if (buf && !(*buf) && savebuf)
901  *buf = new XrdSrvBuffer(argp->buff, quantum, 1);
902 
903  // Send to connected clients
904  if (xps->SendDataN(argp->buff, quantum) != 0) {
905  XrdProofdProtocol::ReleaseBuff(argp);
906  return -1;
907  }
908 
909  // Next segment
910  len -= quantum;
911  if (len < quantum)
912  quantum = len;
913  }
914 
915  // Release the buffer
916  XrdProofdProtocol::ReleaseBuff(argp);
917 
918  // Done
919  return 0;
920 }
921 
922 ////////////////////////////////////////////////////////////////////////////////
923 /// Handle a request to forward a message to another process
924 
925 int XrdProofdProtocol::SendMsg()
926 {
927  XPDLOC(ALL, "Protocol::SendMsg")
928 
929  static const char *crecv[5] = {"master proofserv", "top master",
930  "client", "undefined", "any"};
931  int rc = 0;
932 
933  XPD_SETRESP(this, "SendMsg");
934 
935  // Unmarshall the data
936  int psid = ntohl(fRequest.sendrcv.sid);
937  int opt = ntohl(fRequest.sendrcv.opt);
938 
939  XrdOucString msg;
940  // Find server session
941  XrdProofdProofServ *xps = 0;
942  if (!fPClient || !(xps = fPClient->GetServer(psid))) {
943  XPDFORM(msg, "%s: session ID not found: %d", (Internal() ? "INT" : "EXT"), psid);
944  TRACET(TraceID(), XERR, msg.c_str());
945  response->Send(kXR_InvalidRequest, msg.c_str());
946  return 0;
947  }
948 
949  // Message length
950  int len = fRequest.header.dlen;
951 
952  if (!Internal()) {
953 
954  if (TRACING(HDBG)) {
955  // Notify
956  XPDFORM(msg, "EXT: sending %d bytes to proofserv (psid: %d, xps: %p, status: %d,"
957  " cid: %d)", len, psid, xps, xps->Status(), fCID);
958  TRACET(TraceID(), HDBG, msg.c_str());
959  }
960 
961  // Send to proofsrv our client ID
962  if (fCID == -1) {
963  TRACET(TraceID(), REQ, "EXT: error getting clientSID");
964  response->Send(kXP_ServerError,"EXT: getting clientSID");
965  return 0;
966  }
967  if (SendData(xps, fCID)) {
968  TRACET(TraceID(), REQ, "EXT: error sending message to proofserv");
969  response->Send(kXP_reconnecting,"EXT: sending message to proofserv");
970  return 0;
971  }
972 
973  // Notify to user
974  response->Send();
975 
976  } else {
977 
978  if (TRACING(HDBG)) {
979  // Notify
980  XPDFORM(msg, "INT: sending %d bytes to client/master (psid: %d, xps: %p, status: %d)",
981  len, psid, xps, xps->Status());
982  TRACET(TraceID(), HDBG, msg.c_str());
983  }
984  bool saveStartMsg = 0;
985  XrdSrvBuffer *savedBuf = 0;
986  // Additional info about the message
987  if (opt & kXPD_setidle) {
988  TRACET(TraceID(), DBG, "INT: setting proofserv in 'idle' state");
989  xps->SetStatus(kXPD_idle);
990  PostSession(-1, fPClient->UI().fUser.c_str(),
991  fPClient->UI().fGroup.c_str(), xps);
992  } else if (opt & kXPD_querynum) {
993  TRACET(TraceID(), DBG, "INT: got message with query number");
994  } else if (opt & kXPD_startprocess) {
995  TRACET(TraceID(), DBG, "INT: setting proofserv in 'running' state");
996  xps->SetStatus(kXPD_running);
997  PostSession(1, fPClient->UI().fUser.c_str(),
998  fPClient->UI().fGroup.c_str(), xps);
999  // Save start processing message for later clients
1000  xps->DeleteStartMsg();
1001  saveStartMsg = 1;
1002  } else if (opt & kXPD_logmsg) {
1003  // We broadcast log messages only not idle to catch the
1004  // result from processing
1005  if (xps->Status() == kXPD_running) {
1006  TRACET(TraceID(), DBG, "INT: broadcasting log message");
1007  opt |= kXPD_fb_prog;
1008  }
1009  }
1010  bool fbprog = (opt & kXPD_fb_prog);
1011 
1012  if (!fbprog) {
1013  //
1014  // The message is strictly for the client requiring it
1015  if (SendData(xps, -1, &savedBuf, saveStartMsg) != 0) {
1016  response->Send(kXP_reconnecting,
1017  "SendMsg: INT: session is reconnecting: retry later");
1018  return 0;
1019  }
1020  } else {
1021  // Send to all connected clients
1022  if (SendDataN(xps, &savedBuf, saveStartMsg) != 0) {
1023  response->Send(kXP_reconnecting,
1024  "SendMsg: INT: session is reconnecting: retry later");
1025  return 0;
1026  }
1027  }
1028  // Save start processing messages, if required
1029  if (saveStartMsg)
1030  xps->SetStartMsg(savedBuf);
1031 
1032  if (TRACING(DBG)) {
1033  int ii = xps->SrvType();
1034  if (ii > 3) ii = 3;
1035  if (ii < 0) ii = 4;
1036  XPDFORM(msg, "INT: message sent to %s (%d bytes)", crecv[ii], len);
1037  TRACET(TraceID(), DBG, msg);
1038  }
1039  // Notify to proofsrv
1040  response->Send();
1041  }
1042 
1043  // Over
1044  return 0;
1045 }
1046 
1047 ////////////////////////////////////////////////////////////////////////////////
1048 /// Handle generic request of a urgent message to be forwarded to the server
1049 
1050 int XrdProofdProtocol::Urgent()
1051 {
1052  XPDLOC(ALL, "Protocol::Urgent")
1053 
1054  unsigned int rc = 0;
1055 
1056  XPD_SETRESP(this, "Urgent");
1057 
1058  // Unmarshall the data
1059  int psid = ntohl(fRequest.proof.sid);
1060  int type = ntohl(fRequest.proof.int1);
1061  int int1 = ntohl(fRequest.proof.int2);
1062  int int2 = ntohl(fRequest.proof.int3);
1063 
1064  TRACET(TraceID(), REQ, "psid: "<<psid<<", type: "<< type);
1065 
1066  // Find server session
1067  XrdProofdProofServ *xps = 0;
1068  if (!fPClient || !(xps = fPClient->GetServer(psid))) {
1069  TRACET(TraceID(), XERR, "session ID not found: "<<psid);
1070  response->Send(kXR_InvalidRequest,"Urgent: session ID not found");
1071  return 0;
1072  }
1073 
1074  TRACET(TraceID(), DBG, "xps: "<<xps<<", status: "<<xps->Status());
1075 
1076  // Check ID matching
1077  if (!xps->Match(psid)) {
1078  response->Send(kXP_InvalidRequest,"Urgent: IDs do not match - do nothing");
1079  return 0;
1080  }
1081 
1082  // Check the link to the session
1083  if (!xps->Response()) {
1084  response->Send(kXP_InvalidRequest,"Urgent: session response object undefined - do nothing");
1085  return 0;
1086  }
1087 
1088  // Prepare buffer
1089  int len = 3 *sizeof(kXR_int32);
1090  char *buf = new char[len];
1091  // Type
1092  kXR_int32 itmp = static_cast<kXR_int32>(htonl(type));
1093  memcpy(buf, &itmp, sizeof(kXR_int32));
1094  // First info container
1095  itmp = static_cast<kXR_int32>(htonl(int1));
1096  memcpy(buf + sizeof(kXR_int32), &itmp, sizeof(kXR_int32));
1097  // Second info container
1098  itmp = static_cast<kXR_int32>(htonl(int2));
1099  memcpy(buf + 2 * sizeof(kXR_int32), &itmp, sizeof(kXR_int32));
1100  // Send over
1101  if (xps->Response()->Send(kXR_attn, kXPD_urgent, buf, len) != 0) {
1102  response->Send(kXP_ServerError,
1103  "Urgent: could not propagate request to proofsrv");
1104  return 0;
1105  }
1106 
1107  // Notify to user
1108  response->Send();
1109  TRACET(TraceID(), DBG, "request propagated to proofsrv");
1110 
1111  // Over
1112  return 0;
1113 }
1114 
1115 ////////////////////////////////////////////////////////////////////////////////
1116 /// Handle an interrupt request
1117 
1118 int XrdProofdProtocol::Interrupt()
1119 {
1120  XPDLOC(ALL, "Protocol::Interrupt")
1121 
1122  int rc = 0;
1123 
1124  XPD_SETRESP(this, "Interrupt");
1125 
1126  // Unmarshall the data
1127  int psid = ntohl(fRequest.interrupt.sid);
1128  int type = ntohl(fRequest.interrupt.type);
1129  TRACET(TraceID(), REQ, "psid: "<<psid<<", type:"<<type);
1130 
1131  // Find server session
1132  XrdProofdProofServ *xps = 0;
1133  if (!fPClient || !(xps = fPClient->GetServer(psid))) {
1134  TRACET(TraceID(), XERR, "session ID not found: "<<psid);
1135  response->Send(kXR_InvalidRequest,"Interrupt: session ID not found");
1136  return 0;
1137  }
1138 
1139  if (xps) {
1140 
1141  // Check ID matching
1142  if (!xps->Match(psid)) {
1143  response->Send(kXP_InvalidRequest,"Interrupt: IDs do not match - do nothing");
1144  return 0;
1145  }
1146 
1147  XrdOucString msg;
1148  XPDFORM(msg, "xps: %p, link ID: %s, proofsrv PID: %d",
1149  xps, xps->Response()->TraceID(), xps->SrvPID());
1150  TRACET(TraceID(), DBG, msg.c_str());
1151 
1152  // Propagate the type as unsolicited
1153  if (xps->Response()->Send(kXR_attn, kXPD_interrupt, type) != 0) {
1154  response->Send(kXP_ServerError,
1155  "Interrupt: could not propagate interrupt code to proofsrv");
1156  return 0;
1157  }
1158 
1159  // Notify to user
1160  response->Send();
1161  TRACET(TraceID(), DBG, "interrupt propagated to proofsrv");
1162  }
1163 
1164  // Over
1165  return 0;
1166 }
1167 
1168 ////////////////////////////////////////////////////////////////////////////////
1169 /// Handle a ping request.
1170 /// For internal connections, ping is done asynchronously to avoid locking
1171 /// problems; the session checker verifies that the admin file has been touched
1172 /// recently enough; touching is done in Process2, so we have nothing to do here
1173 
1174 int XrdProofdProtocol::Ping()
1175 {
1176  XPDLOC(ALL, "Protocol::Ping")
1177 
1178  int rc = 0;
1179  if (Internal()) {
1180  if (TRACING(HDBG)) {
1181  XPD_SETRESP(this, "Ping");
1182  TRACET(TraceID(), HDBG, "INT: nothing to do ");
1183  }
1184  return 0;
1185  }
1186  XPD_SETRESP(this, "Ping");
1187 
1188  // Unmarshall the data
1189  int psid = ntohl(fRequest.sendrcv.sid);
1190  int asyncopt = ntohl(fRequest.sendrcv.opt);
1191 
1192  TRACET(TraceID(), REQ, "psid: "<<psid<<", async: "<<asyncopt);
1193 
1194  // For connections to servers find the server session; manager connections
1195  // (psid == -1) do not have any session attached
1196  XrdProofdProofServ *xps = 0;
1197  if (!fPClient || (psid > -1 && !(xps = fPClient->GetServer(psid)))) {
1198  TRACET(TraceID(), XERR, "session ID not found: "<<psid);
1199  response->Send(kXR_InvalidRequest,"session ID not found");
1200  return 0;
1201  }
1202 
1203  // For manager connections we are done
1204  kXR_int32 pingres = (psid > -1) ? 0 : 1;
1205  if (psid > -1 && xps && xps->IsValid()) {
1206 
1207  TRACET(TraceID(), DBG, "EXT: psid: "<<psid);
1208 
1209  // This is the max time we will privide an answer
1210  kXR_int32 checkfq = fgMgr->SessionMgr()->CheckFrequency();
1211 
1212  // If asynchronous return the timeout for an answer
1213  if (asyncopt == 1) {
1214  TRACET(TraceID(), DBG, "EXT: async: notifying timeout to client: "<<checkfq<<" secs");
1215  response->Send(kXR_ok, checkfq);
1216  }
1217 
1218  // Admin path
1219  XrdOucString path(xps->AdminPath());
1220  if (path.length() <= 0) {
1221  TRACET(TraceID(), XERR, "EXT: admin path is empty! - protocol error");
1222  if (asyncopt == 0)
1223  response->Send(kXP_ServerError, "EXT: admin path is empty! - protocol error");
1224  return 0;
1225  }
1226  path += ".status";
1227 
1228  // Current time
1229  int now = time(0);
1230 
1231  // Stat the admin file
1232  struct stat st0;
1233  if (stat(path.c_str(), &st0) != 0) {
1234  TRACET(TraceID(), XERR, "EXT: cannot stat admin path: "<<path);
1235  if (asyncopt == 0)
1236  response->Send(kXP_ServerError, "EXT: cannot stat admin path");
1237  return 0;
1238  }
1239 
1240  // Take the pid
1241  int pid = xps->SrvPID();
1242  // If the session is alive ...
1243  if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
1244  // If it as not touched during the last ~checkfq secs we ask for a refresh
1245  if ((now - st0.st_mtime) > checkfq - 5) {
1246  // Send the request (asking for further propagation)
1247  if (xps->VerifyProofServ(1) != 0) {
1248  TRACET(TraceID(), XERR, "EXT: could not send verify request to proofsrv");
1249  if (asyncopt == 0)
1250  response->Send(kXP_ServerError, "EXT: could not verify reuqest to proofsrv");
1251  return 0;
1252  }
1253  // Wait for the action for checkfq secs, checking every 1 sec
1254  struct stat st1;
1255  int ns = checkfq;
1256  while (ns--) {
1257  if (stat(path.c_str(), &st1) == 0) {
1258  if (st1.st_mtime > st0.st_mtime) {
1259  pingres = 1;
1260  break;
1261  }
1262  }
1263  // Wait 1 sec
1264  TRACET(TraceID(), DBG, "EXT: waiting "<<ns<<" secs for session "<<pid<<
1265  " to touch the admin path");
1266  sleep(1);
1267  }
1268 
1269  } else {
1270  // Session is alive
1271  pingres = 1;
1272  }
1273  } else {
1274  // Session is dead
1275  pingres = 0;
1276  }
1277 
1278  // Notify the client
1279  TRACET(TraceID(), DBG, "EXT: notified the result to client: "<<pingres);
1280  if (asyncopt == 0) {
1281  response->Send(kXR_ok, pingres);
1282  } else {
1283  // Prepare buffer for asynchronous notification
1284  int len = sizeof(kXR_int32);
1285  char *buf = new char[len];
1286  // Option
1287  kXR_int32 ifw = (kXR_int32)0;
1288  ifw = static_cast<kXR_int32>(htonl(ifw));
1289  memcpy(buf, &ifw, sizeof(kXR_int32));
1290  response->Send(kXR_attn, kXPD_ping, buf, len);
1291  }
1292  return 0;
1293  } else if (psid > -1) {
1294  // This is a failure for connections to sessions
1295  TRACET(TraceID(), XERR, "session ID not found: "<<psid);
1296  }
1297 
1298  // Send the result
1299  response->Send(kXR_ok, pingres);
1300 
1301  // Done
1302  return 0;
1303 }
1304 
1305 ////////////////////////////////////////////////////////////////////////////////
1306 /// Post change of session status
1307 
1308 void XrdProofdProtocol::PostSession(int on, const char *u, const char *g,
1309  XrdProofdProofServ *xps)
1310 {
1311  XPDLOC(ALL, "Protocol::PostSession")
1312 
1313  // Tell the priority manager
1314  if (fgMgr && fgMgr->PriorityMgr()) {
1315  int pid = (xps) ? xps->SrvPID() : -1;
1316  if (pid < 0) {
1317  TRACE(XERR, "undefined session or process id");
1318  return;
1319  }
1320  XrdOucString buf;
1321  XPDFORM(buf, "%d %s %s %d", on, u, g, pid);
1322 
1323  if (fgMgr->PriorityMgr()->Pipe()->Post(XrdProofdPriorityMgr::kChangeStatus,
1324  buf.c_str()) != 0) {
1325  TRACE(XERR, "problem posting the prority manager pipe");
1326  }
1327  }
1328  // Tell the scheduler
1329  if (fgMgr && fgMgr->ProofSched()) {
1330  if (on == -1 && xps && xps->SrvType() == kXPD_TopMaster) {
1331  TRACE(DBG, "posting the scheduler pipe");
1332  if (fgMgr->ProofSched()->Pipe()->Post(XrdProofSched::kReschedule, 0) != 0) {
1333  TRACE(XERR, "problem posting the scheduler pipe");
1334  }
1335  }
1336  }
1337  // Tell the session manager
1338  if (fgMgr && fgMgr->SessionMgr()) {
1339  if (fgMgr->SessionMgr()->Pipe()->Post(XrdProofdProofServMgr::kChgSessionSt, 0) != 0) {
1340  TRACE(XERR, "problem posting the session manager pipe");
1341  }
1342  }
1343  // Done
1344  return;
1345 }
1346 
1347 ////////////////////////////////////////////////////////////////////////////////
1348 /// Recording time of the last request on this instance
1349 
1350 void XrdProofdProtocol::TouchAdminPath()
1351 {
1352  XPDLOC(ALL, "Protocol::TouchAdminPath")
1353 
1354  XPD_SETRESPV(this, "TouchAdminPath");
1355  TRACET(TraceID(), HDBG, fAdminPath);
1356 
1357  if (fAdminPath.length() > 0) {
1358  int rc = 0;
1359  if ((rc = XrdProofdAux::Touch(fAdminPath.c_str())) != 0) {
1360  // In the case the file was not found and the connetion is internal
1361  // try also the terminated sessions, as the file could have been moved
1362  // in the meanwhile
1363  XrdOucString apath = fAdminPath;
1364  if (rc == -ENOENT && Internal()) {
1365  apath.replace("/activesessions/", "/terminatedsessions/");
1366  apath.replace(".status", "");
1367  rc = XrdProofdAux::Touch(apath.c_str());
1368  }
1369  if (rc != 0 && rc != -ENOENT) {
1370  const char *type = Internal() ? "internal" : "external";
1371  TRACET(TraceID(), XERR, type<<": problems touching "<<apath<<"; errno: "<<-rc);
1372  }
1373  }
1374  }
1375  // Done
1376  return;
1377 }
1378 
1379 ////////////////////////////////////////////////////////////////////////////////
1380 /// Set and propagate a Ctrl-C request
1381 
1382 int XrdProofdProtocol::CtrlC()
1383 {
1384  XPDLOC(ALL, "Protocol::CtrlC")
1385 
1386  TRACET(TraceID(), ALL, "handling request");
1387 
1388  { XrdSysMutexHelper mhp(fCtrlcMutex);
1389  fIsCtrlC = 1;
1390  }
1391 
1392  // Propagate now
1393  if (fgMgr) {
1394  if (fgMgr->SrvType() != kXPD_Worker) {
1395  if (fgMgr->NetMgr()) {
1396  fgMgr->NetMgr()->BroadcastCtrlC(Client()->User());
1397  }
1398  }
1399  }
1400 
1401  // Over
1402  return 0;
1403 }