Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
XrdProofdProofServ.cxx
Go to the documentation of this file.
1 // @(#)root/proofd:$Id$
2 // Author: Gerardo Ganis 12/12/2005
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 #include <sys/stat.h>
12 
13 #include "XrdNet/XrdNet.hh"
14 
15 #include "XrdProofdAux.h"
16 #include "XrdProofdProofServ.h"
17 #include "XrdProofWorker.h"
18 #include "XrdProofSched.h"
19 #include "XrdProofdManager.h"
20 
21 // Tracing utils
22 #include "XrdProofdTrace.h"
23 
24 ////////////////////////////////////////////////////////////////////////////////
25 /// Constructor
26 
27 XrdProofdProofServ::XrdProofdProofServ()
28 {
29  fMutex = new XrdSysRecMutex;
30  fResponse = 0;
31  fProtocol = 0;
32  fParent = 0;
33  fPingSem = 0;
34  fStartMsg = 0;
35  fStatus = kXPD_idle;
36  fSrvPID = -1;
37  fSrvType = kXPD_AnyServer;
38  fPLiteNWrks = -1;
39  fID = -1;
40  fIsShutdown = false;
41  fIsValid = true; // It is created for a valid server ...
42  fSkipCheck = false;
43  fProtVer = -1;
44  fNClients = 0;
45  fClients.reserve(10);
46  fDisconnectTime = -1;
47  fSetIdleTime = time(0);
48  fROOT = 0;
49  // Strings
50  fAdminPath = "";
51  fAlias = "";
52  fClient = "";
53  fFileout = "";
54  fGroup = "";
55  fOrdinal = "";
56  fTag = "";
57  fUserEnvs = "";
58  fUNIXSock = 0;
59  fUNIXSockPath = "";
60  fQueries.clear();
61 }
62 
63 ////////////////////////////////////////////////////////////////////////////////
64 /// Destructor
65 
66 XrdProofdProofServ::~XrdProofdProofServ()
67 {
68  SafeDel(fStartMsg);
69  SafeDel(fPingSem);
70 
71  std::vector<XrdClientID *>::iterator i;
72  for (i = fClients.begin(); i != fClients.end(); ++i)
73  if (*i)
74  delete (*i);
75  fClients.clear();
76 
77  // Cleanup worker info
78  ClearWorkers();
79 
80  // Cleanup queries info
81  fQueries.clear();
82 
83  // Remove the associated UNIX socket path
84  unlink(fUNIXSockPath.c_str());
85 
86  SafeDel(fMutex);
87 }
88 
89 ////////////////////////////////////////////////////////////////////////////////
90 /// Decrease active session counters on worker w
91 
92 static int DecreaseWorkerCounters(const char *, XrdProofWorker *w, void *x)
93 {
94  XPDLOC(PMGR, "DecreaseWorkerCounters")
95 
96  XrdProofdProofServ *xps = (XrdProofdProofServ *)x;
97 
98  if (w && xps) {
99  w->RemoveProofServ(xps);
100  TRACE(REQ, w->fHost.c_str() <<" done");
101  // Check next
102  return 0;
103  }
104 
105  // Not enough info: stop
106  return 1;
107 }
108 
109 ////////////////////////////////////////////////////////////////////////////////
110 /// Decrease active session counters on worker w
111 
112 static int DumpWorkerCounters(const char *k, XrdProofWorker *w, void *)
113 {
114  XPDLOC(PMGR, "DumpWorkerCounters")
115 
116  if (w) {
117  TRACE(ALL, k <<" : "<<w->fHost.c_str()<<":"<<w->fPort <<" act: "<<w->Active());
118  // Check next
119  return 0;
120  }
121 
122  // Not enough info: stop
123  return 1;
124 }
125 
126 ////////////////////////////////////////////////////////////////////////////////
127 /// Decrease worker counters and clean-up the list
128 
129 void XrdProofdProofServ::ClearWorkers()
130 {
131  XrdSysMutexHelper mhp(fMutex);
132 
133  // Decrease workers' counters and remove this from workers
134  fWorkers.Apply(DecreaseWorkerCounters, this);
135  fWorkers.Purge();
136 }
137 
138 ////////////////////////////////////////////////////////////////////////////////
139 /// Add a worker assigned to this session with label 'o'
140 
141 void XrdProofdProofServ::AddWorker(const char *o, XrdProofWorker *w)
142 {
143  if (!o || !w) return;
144 
145  XrdSysMutexHelper mhp(fMutex);
146 
147  fWorkers.Add(o, w, 0, Hash_keepdata);
148 }
149 
150 ////////////////////////////////////////////////////////////////////////////////
151 /// Release worker assigned to this session with label 'o'
152 
153 void XrdProofdProofServ::RemoveWorker(const char *o)
154 {
155  XPDLOC(SMGR, "ProofServ::RemoveWorker")
156 
157  if (!o) return;
158 
159  TRACE(DBG,"removing: "<<o);
160 
161  XrdSysMutexHelper mhp(fMutex);
162 
163  XrdProofWorker *w = fWorkers.Find(o);
164  if (w) w->RemoveProofServ(this);
165  fWorkers.Del(o);
166  if (TRACING(HDBG)) fWorkers.Apply(DumpWorkerCounters, 0);
167 }
168 
169 ////////////////////////////////////////////////////////////////////////////////
170 /// Reset this instance, broadcasting a message to the clients.
171 /// return 1 if top master, 0 otherwise
172 
173 int XrdProofdProofServ::Reset(const char *msg, int type)
174 {
175  XPDLOC(SMGR, "ProofServ::Reset")
176 
177  int rc = 0;
178  // Read the status file
179  int st = -1;
180  XrdOucString fn;
181  XPDFORM(fn, "%s.status", fAdminPath.c_str());
182  FILE *fpid = fopen(fn.c_str(), "r");
183  if (fpid) {
184  char line[64];
185  if (fgets(line, sizeof(line), fpid)) {
186  if (line[strlen(line)-1] == '\n') line[strlen(line)-1] = 0;
187  st = atoi(line);
188  } else {
189  TRACE(XERR,"problems reading from file "<<fn);
190  }
191  fclose(fpid);
192  }
193  TRACE(DBG,"file: "<<fn<<", st:"<<st);
194  XrdSysMutexHelper mhp(fMutex);
195  // Broadcast msg
196  if (st == 4) {
197  Broadcast("idle-timeout", type);
198  } else {
199  Broadcast(msg, type);
200  }
201  // What kind of server is this?
202  if (fSrvType == kXPD_TopMaster) rc = 1;
203  // Reset instance
204  Reset();
205  // Done
206  return rc;
207 }
208 
209 ////////////////////////////////////////////////////////////////////////////////
210 /// Reset this instance
211 
212 void XrdProofdProofServ::Reset()
213 {
214  XrdSysMutexHelper mhp(fMutex);
215 
216  fResponse = 0;
217  fProtocol = 0;
218  fParent = 0;
219  SafeDel(fStartMsg);
220  SafeDel(fPingSem);
221  fSrvPID = -1;
222  fID = -1;
223  fIsShutdown = false;
224  fIsValid = false;
225  fSkipCheck = false;
226  fProtVer = -1;
227  fNClients = 0;
228  fClients.clear();
229  fDisconnectTime = -1;
230  fSetIdleTime = -1;
231  fROOT = 0;
232  // Cleanup worker info
233  ClearWorkers();
234  // ClearWorkers depends on the fSrvType and fStatus
235  fSrvType = kXPD_AnyServer;
236  fPLiteNWrks = -1;
237  fStatus = kXPD_idle;
238  // Cleanup queries info
239  fQueries.clear();
240  // Strings
241  fAdminPath = "";
242  fAlias = "";
243  fClient = "";
244  fFileout = "";
245  fGroup = "";
246  fOrdinal = "";
247  fTag = "";
248  fUserEnvs = "";
249  DeleteUNIXSock();
250 }
251 
252 ////////////////////////////////////////////////////////////////////////////////
253 /// Delete the current UNIX socket
254 
255 void XrdProofdProofServ::DeleteUNIXSock()
256 {
257  SafeDel(fUNIXSock);
258  unlink(fUNIXSockPath.c_str());
259  fUNIXSockPath = "";
260 }
261 
262 ////////////////////////////////////////////////////////////////////////////////
263 /// Return the value of fSkipCheck and reset it to false
264 
265 bool XrdProofdProofServ::SkipCheck()
266 {
267  XrdSysMutexHelper mhp(fMutex);
268 
269  bool rc = fSkipCheck;
270  fSkipCheck = false;
271  return rc;
272 }
273 
274 ////////////////////////////////////////////////////////////////////////////////
275 /// Get instance corresponding to cid
276 
277 XrdClientID *XrdProofdProofServ::GetClientID(int cid)
278 {
279  XPDLOC(SMGR, "ProofServ::GetClientID")
280 
281  XrdClientID *csid = 0;
282 
283  if (cid < 0) {
284  TRACE(XERR, "negative ID: protocol error!");
285  return csid;
286  }
287 
288  XrdOucString msg;
289  { XrdSysMutexHelper mhp(fMutex);
290 
291  // Count new attached client
292  fNClients++;
293 
294  // If in the allocate range reset the corresponding instance and
295  // return it
296  if (cid < (int)fClients.size()) {
297  csid = fClients.at(cid);
298  csid->Reset();
299 
300  // Notification message
301  if (TRACING(DBG)) {
302  XPDFORM(msg, "cid: %d, size: %d", cid, fClients.size());
303  }
304  }
305 
306  if (!csid) {
307  // If not, allocate a new one; we need to resize (double it)
308  if (cid >= (int)fClients.capacity())
309  fClients.reserve(2*fClients.capacity());
310 
311  // Allocate new elements (for fast access we need all of them)
312  int ic = (int)fClients.size();
313  for (; ic <= cid; ic++)
314  fClients.push_back((csid = new XrdClientID()));
315 
316  // Notification message
317  if (TRACING(DBG)) {
318  XPDFORM(msg, "cid: %d, new size: %d", cid, fClients.size());
319  }
320  }
321  }
322  TRACE(DBG, msg);
323 
324  // We are done
325  return csid;
326 }
327 
328 ////////////////////////////////////////////////////////////////////////////////
329 /// Free instance corresponding to protocol connecting process 'pid'
330 
331 int XrdProofdProofServ::FreeClientID(int pid)
332 {
333  XPDLOC(SMGR, "ProofServ::FreeClientID")
334 
335  TRACE(DBG, "svrPID: "<<fSrvPID<< ", pid: "<<pid<<", session status: "<<
336  fStatus<<", # clients: "<< fNClients);
337  int rc = -1;
338  if (pid <= 0) {
339  TRACE(XERR, "undefined pid!");
340  return rc;
341  }
342  if (!IsValid()) return rc;
343 
344  { XrdSysMutexHelper mhp(fMutex);
345 
346  // Remove this from the list of clients
347  std::vector<XrdClientID *>::iterator i;
348  for (i = fClients.begin(); i != fClients.end(); ++i) {
349  if ((*i) && (*i)->P()) {
350  if ((*i)->P()->Pid() == pid || (*i)->P()->Pid() == -1) {
351  if (fProtocol == (*i)->P()) {
352  SetProtocol(0);
353  SetConnection(0);
354  }
355  (*i)->Reset();
356  if (fParent == (*i)) SetParent(0);
357  fNClients--;
358  // Record time of last disconnection
359  if (fNClients <= 0)
360  fDisconnectTime = time(0);
361  rc = 0;
362  break;
363  }
364  }
365  }
366  }
367  if (TRACING(REQ) && (rc == 0)) {
368  int spid = SrvPID();
369  TRACE(REQ, spid<<": slot for client pid: "<<pid<<" has been reset");
370  }
371 
372  // Out of range
373  return rc;
374 }
375 
376 ////////////////////////////////////////////////////////////////////////////////
377 /// Get the number of connected clients. If check is true check that
378 /// they are still valid ones and free the slots for the invalid ones
379 
380 int XrdProofdProofServ::GetNClients(bool check)
381 {
382  XrdSysMutexHelper mhp(fMutex);
383 
384  if (check) {
385  fNClients = 0;
386  // Remove this from the list of clients
387  std::vector<XrdClientID *>::iterator i;
388  for (i = fClients.begin(); i != fClients.end(); ++i) {
389  if ((*i) && (*i)->P() && (*i)->P()->Link()) fNClients++;
390  }
391  }
392 
393  // Done
394  return fNClients;
395 }
396 
397 ////////////////////////////////////////////////////////////////////////////////
398 /// Return the time (in secs) all clients have been disconnected.
399 /// Return -1 if the session is running
400 
401 int XrdProofdProofServ::DisconnectTime()
402 {
403  XrdSysMutexHelper mhp(fMutex);
404 
405  int disct = -1;
406  if (fDisconnectTime > 0)
407  disct = time(0) - fDisconnectTime;
408  return ((disct > 0) ? disct : -1);
409 }
410 
411 ////////////////////////////////////////////////////////////////////////////////
412 /// Return the time (in secs) the session has been idle.
413 /// Return -1 if the session is running
414 
415 int XrdProofdProofServ::IdleTime()
416 {
417  XrdSysMutexHelper mhp(fMutex);
418 
419  int idlet = -1;
420  if (fStatus == kXPD_idle)
421  idlet = time(0) - fSetIdleTime;
422  return ((idlet > 0) ? idlet : -1);
423 }
424 
425 ////////////////////////////////////////////////////////////////////////////////
426 /// Set status to idle and update the related time stamp
427 ///
428 
429 void XrdProofdProofServ::SetIdle()
430 {
431  XrdSysMutexHelper mhp(fMutex);
432 
433  fStatus = kXPD_idle;
434  fSetIdleTime = time(0);
435 }
436 
437 ////////////////////////////////////////////////////////////////////////////////
438 /// Set status to running and reset the related time stamp
439 ///
440 
441 void XrdProofdProofServ::SetRunning()
442 {
443  XrdSysMutexHelper mhp(fMutex);
444 
445  fStatus = kXPD_running;
446  fSetIdleTime = -1;
447 }
448 
449 ////////////////////////////////////////////////////////////////////////////////
450 /// Broadcast message 'msg' at 'type' to the attached clients
451 
452 void XrdProofdProofServ::Broadcast(const char *msg, int type)
453 {
454  XPDLOC(SMGR, "ProofServ::Broadcast")
455 
456  // Backward-compatibility check
457  int clproto = (type >= kXPD_wrkmortem) ? 18 : -1;
458 
459  XrdOucString m;
460  int len = 0, nc = 0;
461  if (msg && (len = strlen(msg)) > 0) {
462  XrdProofdProtocol *p = 0;
463  int ic = 0, ncz = 0, sid = -1;
464  { XrdSysMutexHelper mhp(fMutex); ncz = (int) fClients.size(); }
465  for (ic = 0; ic < ncz; ic++) {
466  { XrdSysMutexHelper mhp(fMutex);
467  p = fClients.at(ic)->P();
468  sid = fClients.at(ic)->Sid(); }
469  // Send message
470  if (p && XPD_CLNT_VERSION_OK(p, clproto)) {
471  XrdProofdResponse *response = p->Response(sid);
472  if (response) {
473  response->Send(kXR_attn, (XProofActionCode)type, (void *)msg, len);
474  nc++;
475  } else {
476  XPDFORM(m, "response instance for sid: %d not found", sid);
477  }
478  }
479  if (m.length() > 0)
480  TRACE(XERR, m);
481  m = "";
482  }
483  }
484  if (TRACING(DBG)) {
485  XPDFORM(m, "type: %d, message: '%s' notified to %d clients", type, msg, nc);
486  XPDPRT(m);
487  }
488 }
489 
490 ////////////////////////////////////////////////////////////////////////////////
491 /// Terminate the associated process.
492 /// A shutdown interrupt message is forwarded.
493 /// If add is TRUE (default) the pid is added to the list of processes
494 /// requested to terminate.
495 /// Return the pid of tyhe terminated process on success, -1 if not allowed
496 /// or other errors occured.
497 
498 int XrdProofdProofServ::TerminateProofServ(bool changeown)
499 {
500  XPDLOC(SMGR, "ProofServ::TerminateProofServ")
501 
502  int pid = fSrvPID;
503  TRACE(DBG, "ord: " << fOrdinal << ", pid: " << pid);
504 
505  // Send a terminate signal to the proofserv
506  if (pid > -1) {
507  XrdProofUI ui;
508  XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
509  if (XrdProofdAux::KillProcess(pid, 0, ui, changeown) != 0) {
510  TRACE(XERR, "ord: problems signalling process: "<<fSrvPID);
511  }
512  XrdSysMutexHelper mhp(fMutex);
513  fIsShutdown = true;
514  }
515 
516  // Failed
517  return -1;
518 }
519 
520 ////////////////////////////////////////////////////////////////////////////////
521 /// Check if the associated proofserv process is alive. This is done
522 /// asynchronously by asking the process to callback and proof its vitality.
523 /// We do not block here: the caller may setup a waiting structure if
524 /// required.
525 /// If forward is true, the process will forward the request to the following
526 /// tiers.
527 /// Return 0 if the request was send successfully, -1 in case of error.
528 
529 int XrdProofdProofServ::VerifyProofServ(bool forward)
530 {
531  XPDLOC(SMGR, "ProofServ::VerifyProofServ")
532 
533  TRACE(DBG, "ord: " << fOrdinal<< ", pid: " << fSrvPID);
534 
535  int rc = 0;
536  XrdOucString msg;
537 
538  // Prepare buffer
539  int len = sizeof(kXR_int32);
540  char *buf = new char[len];
541  // Option
542  kXR_int32 ifw = (forward) ? (kXR_int32)1 : (kXR_int32)0;
543  ifw = static_cast<kXR_int32>(htonl(ifw));
544  memcpy(buf, &ifw, sizeof(kXR_int32));
545 
546  { XrdSysMutexHelper mhp(fMutex);
547  // Propagate the ping request
548  if (!fResponse || fResponse->Send(kXR_attn, kXPD_ping, buf, len) != 0) {
549  msg = "could not propagate ping to proofsrv";
550  rc = -1;
551  }
552  }
553  // Cleanup
554  delete[] buf;
555 
556  // Notify errors, if any
557  if (rc != 0)
558  TRACE(XERR, msg);
559 
560  // Done
561  return rc;
562 }
563 
564 ////////////////////////////////////////////////////////////////////////////////
565 /// Broadcast a new group priority value to the worker servers.
566 /// Called by masters.
567 
568 int XrdProofdProofServ::BroadcastPriority(int priority)
569 {
570  XPDLOC(SMGR, "ProofServ::BroadcastPriority")
571 
572  XrdSysMutexHelper mhp(fMutex);
573 
574  // Prepare buffer
575  int len = sizeof(kXR_int32);
576  char *buf = new char[len];
577  kXR_int32 itmp = priority;
578  itmp = static_cast<kXR_int32>(htonl(itmp));
579  memcpy(buf, &itmp, sizeof(kXR_int32));
580  // Send over
581  if (!fResponse || fResponse->Send(kXR_attn, kXPD_priority, buf, len) != 0) {
582  // Failure
583  TRACE(XERR,"problems telling proofserv");
584  SafeDelArray(buf);
585  return -1;
586  }
587  SafeDelArray(buf);
588  TRACE(DBG, "priority "<<priority<<" sent over");
589  // Done
590  return 0;
591 }
592 
593 ////////////////////////////////////////////////////////////////////////////////
594 /// Send data to client cid.
595 
596 int XrdProofdProofServ::SendData(int cid, void *buff, int len)
597 {
598  XPDLOC(SMGR, "ProofServ::SendData")
599 
600  TRACE(HDBG, "length: "<<len<<" bytes (cid: "<<cid<<")");
601 
602  int rs = 0;
603  XrdOucString msg;
604 
605  // Get corresponding instance
606  XrdClientID *csid = 0;
607  { XrdSysMutexHelper mhp(fMutex);
608  if (cid < 0 || cid > (int)(fClients.size() - 1) || !(csid = fClients.at(cid))) {
609  XPDFORM(msg, "client ID not found (cid: %d, size: %d)", cid, fClients.size());
610  rs = -1;
611  }
612  if (!rs && !(csid->R())) {
613  XPDFORM(msg, "client not connected: csid: %p, cid: %d, fSid: %d",
614  csid, cid, csid->Sid());
615  rs = -1;
616  }
617  }
618 
619  //
620  // The message is strictly for the client requiring it
621  if (!rs) {
622  rs = -1;
623  XrdProofdResponse *response = csid->R() ? csid->R() : 0;
624  if (response)
625  if (!response->Send(kXR_attn, kXPD_msg, buff, len))
626  rs = 0;
627  } else {
628  // Notify error
629  TRACE(XERR, msg);
630  }
631 
632  // Done
633  return rs;
634 }
635 
636 ////////////////////////////////////////////////////////////////////////////////
637 /// Send data over the open client links of this session.
638 /// Used when all the connected clients are eligible to receive the message.
639 
640 int XrdProofdProofServ::SendDataN(void *buff, int len)
641 {
642  XPDLOC(SMGR, "ProofServ::SendDataN")
643 
644  TRACE(HDBG, "length: "<<len<<" bytes");
645 
646  XrdOucString msg;
647 
648  XrdSysMutexHelper mhp(fMutex);
649 
650  // Send to connected clients
651  XrdClientID *csid = 0;
652  int ic = 0;
653  for (ic = 0; ic < (int) fClients.size(); ic++) {
654  if ((csid = fClients.at(ic)) && csid->P()) {
655  XrdProofdResponse *resp = csid->R();
656  if (!resp || resp->Send(kXR_attn, kXPD_msg, buff, len) != 0)
657  return -1;
658  }
659  }
660 
661  // Done
662  return 0;
663 }
664 
665 ////////////////////////////////////////////////////////////////////////////////
666 /// Fill buf with relevant info about this session
667 
668 void XrdProofdProofServ::ExportBuf(XrdOucString &buf)
669 {
670  XPDLOC(SMGR, "ProofServ::ExportBuf")
671 
672  buf = "";
673  int id, status, nc;
674  XrdOucString tag, alias;
675  { XrdSysMutexHelper mhp(fMutex);
676  id = fID;
677  status = fStatus;
678  nc = fNClients;
679  tag = fTag;
680  alias = fAlias; }
681  XPDFORM(buf, " | %d %s %s %d %d", id, tag.c_str(), alias.c_str(), status, nc);
682  TRACE(HDBG, "buf: "<< buf);
683 
684  // Done
685  return;
686 }
687 
688 ////////////////////////////////////////////////////////////////////////////////
689 /// Create UNIX socket for internal connections
690 
691 int XrdProofdProofServ::CreateUNIXSock(XrdSysError *edest)
692 {
693  XPDLOC(SMGR, "ProofServ::CreateUNIXSock")
694 
695  TRACE(DBG, "enter");
696 
697  // Make sure we do not have already a socket
698  if (fUNIXSock) {
699  TRACE(DBG,"UNIX socket exists already! ("<<fUNIXSockPath<<")");
700  return 0;
701  }
702 
703  // Create socket
704  fUNIXSock = new XrdNet(edest);
705 
706  // Make sure the admin path exists
707  if (fAdminPath.length() > 0) {
708  FILE *fadm = fopen(fAdminPath.c_str(), "a");
709  if (fadm) {
710  fclose(fadm);
711  } else {
712  TRACE(XERR, "unable to open / create admin path "<< fAdminPath << "; errno = "<<errno);
713  return -1;
714  }
715  }
716 
717  // Check the path
718  bool ok = 0;
719  if (unlink(fUNIXSockPath.c_str()) != 0 && (errno != ENOENT)) {
720  XPDPRT("WARNING: path exists: unable to delete it:"
721  " try to use it anyway " <<fUNIXSockPath);
722  ok = 1;
723  }
724 
725  // Create the path
726  int fd = 0;
727  if (!ok) {
728  if ((fd = open(fUNIXSockPath.c_str(), O_EXCL | O_RDWR | O_CREAT, 0700)) < 0) {
729  TRACE(XERR, "unable to create path: " <<fUNIXSockPath);
730  return -1;
731  }
732  close(fd);
733  }
734  if (fd > -1) {
735  // Change ownership
736  if (fUNIXSock->Bind((char *)fUNIXSockPath.c_str())) {
737  TRACE(XERR, " problems binding to UNIX socket; path: " <<fUNIXSockPath);
738  return -1;
739  } else
740  TRACE(DBG, "path for UNIX for socket is " <<fUNIXSockPath);
741  } else {
742  TRACE(XERR, "unable to open / create path for UNIX socket; tried path "<< fUNIXSockPath);
743  return -1;
744  }
745 
746  // Change ownership if running as super-user
747  if (!geteuid()) {
748  XrdProofUI ui;
749  XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
750  if (chown(fUNIXSockPath.c_str(), ui.fUid, ui.fGid) != 0) {
751  TRACE(XERR, "unable to change ownership of the UNIX socket"<<fUNIXSockPath);
752  return -1;
753  }
754  }
755 
756  // We are done
757  return 0;
758 }
759 
760 ////////////////////////////////////////////////////////////////////////////////
761 /// Set the admin path and make sure the file exists
762 
763 int XrdProofdProofServ::SetAdminPath(const char *a, bool assert, bool setown)
764 {
765  XPDLOC(SMGR, "ProofServ::SetAdminPath")
766 
767  XrdSysMutexHelper mhp(fMutex);
768 
769  fAdminPath = a;
770 
771  // If we are not asked to assert the file we are done
772  if (!assert) return 0;
773 
774  // Check if the session file exists
775  FILE *fpid = fopen(a, "a");
776  if (fpid) {
777  fclose(fpid);
778  } else {
779  TRACE(XERR, "unable to open / create admin path "<< fAdminPath << "; errno = "<<errno);
780  return -1;
781  }
782 
783  // Check if the status file exists
784  XrdOucString fn;
785  XPDFORM(fn, "%s.status", a);
786  if ((fpid = fopen(fn.c_str(), "a"))) {
787  fprintf(fpid, "%d", fStatus);
788  fclose(fpid);
789  } else {
790  TRACE(XERR, "unable to open / create status path "<< fn << "; errno = "<<errno);
791  return -1;
792  }
793 
794  if (setown) {
795  // Set the ownership of the status file to the user
796  XrdProofUI ui;
797  if (XrdProofdAux::GetUserInfo(fClient.c_str(), ui) != 0) {
798  TRACE(XERR, "unable to get info for user "<<fClient<<"; errno = "<<errno);
799  return -1;
800  }
801  if (XrdProofdAux::ChangeOwn(fn.c_str(), ui) != 0) {
802  TRACE(XERR, "unable to give ownership of the status file "<< fn << " to user; errno = "<<errno);
803  return -1;
804  }
805  }
806 
807  // Done
808  return 0;
809 }
810 
811 ////////////////////////////////////////////////////////////////////////////////
812 /// Send a resume message to the this session. It is assumed that the session
813 /// has at least one async query to process and will immediately send
814 /// a getworkers request (the workers are already assigned).
815 
816 int XrdProofdProofServ::Resume()
817 {
818  XPDLOC(SMGR, "ProofServ::Resume")
819 
820  TRACE(REQ, "ord: " << fOrdinal<< ", pid: " << fSrvPID);
821 
822  int rc = 0;
823  XrdOucString msg;
824 
825  { XrdSysMutexHelper mhp(fMutex);
826  //
827  if (!fResponse || fResponse->Send(kXR_attn, kXPD_resume, 0, 0) != 0) {
828  msg = "could not propagate resume to proofsrv";
829  rc = -1;
830  }
831  }
832 
833  // Notify errors, if any
834  if (rc != 0)
835  TRACE(XERR, msg);
836 
837  // Done
838  return rc;
839 }
840 
841 ////////////////////////////////////////////////////////////////////////////////
842 /// Decrease active session counters on worker w
843 
844 static int ExportWorkerDescription(const char *k, XrdProofWorker *w, void *s)
845 {
846  XPDLOC(PMGR, "ExportWorkerDescription")
847 
848  XrdOucString *wrks = (XrdOucString *)s;
849  if (w && wrks) {
850  // Master at the beginning
851  if (w->fType == 'M') {
852  if (wrks->length() > 0) wrks->insert('&',0);
853  wrks->insert(w->Export(), 0);
854  } else {
855  // Add separator if not the first
856  if (wrks->length() > 0)
857  (*wrks) += '&';
858  // Add export version of the info
859  (*wrks) += w->Export(k);
860  }
861  TRACE(HDBG, k <<" : "<<w->fHost.c_str()<<":"<<w->fPort <<" act: "<<w->Active());
862  // Check next
863  return 0;
864  }
865 
866  // Not enough info: stop
867  return 1;
868 }
869 
870 ////////////////////////////////////////////////////////////////////////////////
871 /// Export the assigned workers in the format understood by proofserv
872 
873 void XrdProofdProofServ::ExportWorkers(XrdOucString &wrks)
874 {
875  XrdSysMutexHelper mhp(fMutex);
876  wrks = "";
877  fWorkers.Apply(ExportWorkerDescription, (void *)&wrks);
878 }
879 
880 ////////////////////////////////////////////////////////////////////////////////
881 /// Export the assigned workers in the format understood by proofserv
882 
883 void XrdProofdProofServ::DumpQueries()
884 {
885  XPDLOC(PMGR, "DumpQueries")
886 
887  XrdSysMutexHelper mhp(fMutex);
888 
889  TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
890  TRACE(ALL," +++ client: "<<fClient<<", session: "<< fSrvPID <<
891  ", # of queries: "<< fQueries.size());
892  std::list<XrdProofQuery *>::iterator ii;
893  int i = 0;
894  for (ii = fQueries.begin(); ii != fQueries.end(); ++ii) {
895  i++;
896  TRACE(ALL," +++ #"<<i<<" tag:"<< (*ii)->GetTag()<<" dset: "<<
897  (*ii)->GetDSName()<<" size:"<<(*ii)->GetDSSize());
898  }
899  TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
900 }
901 
902 ////////////////////////////////////////////////////////////////////////////////
903 /// Get query with tag form the list of queries
904 
905 XrdProofQuery *XrdProofdProofServ::GetQuery(const char *tag)
906 {
907  XrdProofQuery *q = 0;
908  if (!tag || strlen(tag) <= 0) return q;
909 
910  XrdSysMutexHelper mhp(fMutex);
911 
912  if (fQueries.size() <= 0) return q;
913 
914  std::list<XrdProofQuery *>::iterator ii;
915  for (ii = fQueries.begin(); ii != fQueries.end(); ++ii) {
916  q = *ii;
917  if (!strcmp(tag, q->GetTag())) break;
918  q = 0;
919  }
920  // Done
921  return q;
922 }
923 
924 ////////////////////////////////////////////////////////////////////////////////
925 /// remove query with tag form the list of queries
926 
927 void XrdProofdProofServ::RemoveQuery(const char *tag)
928 {
929  XrdProofQuery *q = 0;
930  if (!tag || strlen(tag) <= 0) return;
931 
932  XrdSysMutexHelper mhp(fMutex);
933 
934  if (fQueries.size() <= 0) return;
935 
936  std::list<XrdProofQuery *>::iterator ii;
937  for (ii = fQueries.begin(); ii != fQueries.end(); ++ii) {
938  q = *ii;
939  if (!strcmp(tag, q->GetTag())) break;
940  q = 0;
941  }
942  // remove it
943  if (q) {
944  fQueries.remove(q);
945  delete q;
946  }
947 
948  // Done
949  return;
950 }
951 
952 ////////////////////////////////////////////////////////////////////////////////
953 /// Decrease active session counters on worker w
954 
955 static int CountEffectiveSessions(const char *, XrdProofWorker *w, void *s)
956 {
957  int *actw = (int *)s;
958  if (w && actw) {
959  *actw += w->GetNActiveSessions();
960  // Check next
961  return 0;
962  }
963 
964  // Not enough info: stop
965  return 1;
966 }
967 
968 ////////////////////////////////////////////////////////////////////////////////
969 /// Calculate the effective number of users on this session nodes
970 /// and communicate it to the master together with the total number
971 /// of sessions and the number of active sessions. for monitoring issues.
972 
973 void XrdProofdProofServ::SendClusterInfo(int nsess, int nacti)
974 {
975  XPDLOC(PMGR, "SendClusterInfo")
976 
977  // Only if we are active
978  if (fWorkers.Num() <= 0) return;
979 
980  int actw = 0;
981  fWorkers.Apply(CountEffectiveSessions, (void *)&actw);
982  // The number of effective sessions * 1000
983  int neffs = (actw*1000)/fWorkers.Num();
984  TRACE(DBG, "# sessions: "<<nsess<<", # active: "<<nacti<<", # effective: "<<neffs/1000.);
985 
986  XrdSysMutexHelper mhp(fMutex);
987 
988  // Prepare buffer
989  int len = 3*sizeof(kXR_int32);
990  char *buf = new char[len];
991  kXR_int32 off = 0;
992  kXR_int32 itmp = nsess;
993  itmp = static_cast<kXR_int32>(htonl(itmp));
994  memcpy(buf + off, &itmp, sizeof(kXR_int32));
995  off += sizeof(kXR_int32);
996  itmp = nacti;
997  itmp = static_cast<kXR_int32>(htonl(itmp));
998  memcpy(buf + off, &itmp, sizeof(kXR_int32));
999  off += sizeof(kXR_int32);
1000  itmp = neffs;
1001  itmp = static_cast<kXR_int32>(htonl(itmp));
1002  memcpy(buf + off, &itmp, sizeof(kXR_int32));
1003  // Send over
1004  if (!fResponse || fResponse->Send(kXR_attn, kXPD_clusterinfo, buf, len) != 0) {
1005  // Failure
1006  TRACE(XERR,"problems sending proofserv");
1007  }
1008  SafeDelArray(buf);
1009 }
1010 
1011 ////////////////////////////////////////////////////////////////////////////////
1012 /// Calculate the effective number of users on this session nodes
1013 /// and communicate it to the master together with the total number
1014 /// of sessions and the number of active sessions. for monitoring issues.
1015 
1016 int XrdProofdProofServ::CheckSession(bool oldvers, bool isrec,
1017  int shutopt, int shutdel, bool changeown, int &nc)
1018 {
1019  XPDLOC(PMGR, "SendClusterInfo")
1020 
1021  XrdOucString emsg;
1022  bool rmsession = 0;
1023  nc = -1;
1024  { XrdSysMutexHelper mhp(fMutex);
1025 
1026  bool skipcheck = fSkipCheck;
1027  fSkipCheck = false;
1028 
1029  if (!skipcheck || oldvers) {
1030  nc = 0;
1031  // Remove this from the list of clients
1032  std::vector<XrdClientID *>::iterator i;
1033  for (i = fClients.begin(); i != fClients.end(); ++i) {
1034  if ((*i) && (*i)->P() && (*i)->P()->Link()) nc++;
1035  }
1036  // Check if we need to shutdown it
1037  if (nc <= 0 && (!isrec || oldvers)) {
1038  int idlet = -1, disct = -1, now = time(0);
1039  if (fStatus == kXPD_idle)
1040  idlet = now - fSetIdleTime;
1041  if (idlet <= 0) idlet = -1;
1042  if (fDisconnectTime > 0)
1043  disct = now - fDisconnectTime;
1044  if (disct <= 0) disct = -1;
1045  if ((fSrvType != kXPD_TopMaster) ||
1046  (shutopt == 1 && (idlet >= shutdel)) ||
1047  (shutopt == 2 && (disct >= shutdel))) {
1048  // Send a terminate signal to the proofserv
1049  if (fSrvPID > -1) {
1050  XrdProofUI ui;
1051  XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
1052  if (XrdProofdAux::KillProcess(fSrvPID, 0, ui, changeown) != 0) {
1053  XPDFORM(emsg, "ord: problems signalling process: %d", fSrvPID);
1054  }
1055  fIsShutdown = true;
1056  }
1057  rmsession = 1;
1058  }
1059  }
1060  }
1061  }
1062  // Notify error, if any
1063  if (emsg.length() > 0) {
1064  TRACE(XERR,emsg.c_str());
1065  }
1066  // Done
1067  return rmsession;
1068 }