Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TXProofServ.cxx
Go to the documentation of this file.
1 // @(#)root/proofx:$Id$
2 // Author: Gerardo Ganis 12/12/2005
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TXProofServ
13 \ingroup proofx
14 
15 This class implements the XProofD version of TProofServ, with respect to which it differs only
16 for the underlying connection technology.
17 
18 */
19 
20 #include "RConfigure.h"
21 #include <ROOT/RConfig.hxx>
22 #include "Riostream.h"
23 
24 #ifdef WIN32
25  #include <io.h>
26  typedef long off_t;
27 #endif
28 #include <sys/types.h>
29 #include <netinet/in.h>
30 #include <utime.h>
31 
32 #include "TXProofServ.h"
33 #include "TObjString.h"
34 #include "TEnv.h"
35 #include "TError.h"
36 #include "TException.h"
37 #include "THashList.h"
38 #include "TInterpreter.h"
39 #include "TParameter.h"
40 #include "TProofDebug.h"
41 #include "TProof.h"
42 #include "TProofPlayer.h"
43 #include "TQueryResultManager.h"
44 #include "TRegexp.h"
45 #include "TClass.h"
46 #include "TROOT.h"
47 #include "TSystem.h"
48 #include "TPluginManager.h"
49 #include "TXSocketHandler.h"
50 #include "TXUnixSocket.h"
51 #include "compiledata.h"
52 #include "TProofNodeInfo.h"
53 #include "XProofProtocol.h"
54 
57 
58 
// Debug hook: CreateServer() spins on this flag while it is non-zero when the
// "Proof.GdbHook" setting selects this process, so that a debugger can attach.
// NOTE(review): presumably the flag is cleared by hand from the attached
// debugger; nothing in this file ever resets it.
static volatile Int_t gProofServDebug = 1;
61 
62 //----- SigPipe signal handler -------------------------------------------------
63 ////////////////////////////////////////////////////////////////////////////////
64 
65 class TXProofServSigPipeHandler : public TSignalHandler {
66  TXProofServ *fServ;
67 public:
68  TXProofServSigPipeHandler(TXProofServ *s) : TSignalHandler(kSigInterrupt, kFALSE)
69  { fServ = s; }
70  Bool_t Notify();
71 };
72 
73 ////////////////////////////////////////////////////////////////////////////////
74 
/// Forward the signal to the server via HandleSigPipe().
/// Always returns kTRUE.
Bool_t TXProofServSigPipeHandler::Notify()
{
   fServ->HandleSigPipe();
   return kTRUE;
}
80 
81 //----- Termination signal handler ---------------------------------------------
82 ////////////////////////////////////////////////////////////////////////////////
83 
84 class TXProofServTerminationHandler : public TSignalHandler {
85  TXProofServ *fServ;
86 public:
87  TXProofServTerminationHandler(TXProofServ *s)
88  : TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
89  Bool_t Notify();
90 };
91 
92 ////////////////////////////////////////////////////////////////////////////////
93 
/// Log the reception of the termination signal and start the session
/// termination via HandleTermination().
/// Always returns kTRUE.
Bool_t TXProofServTerminationHandler::Notify()
{
   Printf("Received SIGTERM: terminating");

   fServ->HandleTermination();
   return kTRUE;
}
101 
102 //----- Seg violation signal handler ---------------------------------------------
103 ////////////////////////////////////////////////////////////////////////////////
104 
105 class TXProofServSegViolationHandler : public TSignalHandler {
106  TXProofServ *fServ;
107 public:
108  TXProofServSegViolationHandler(TXProofServ *s)
109  : TSignalHandler(kSigSegmentationViolation, kFALSE) { fServ = s; }
110  Bool_t Notify();
111 };
112 
113 ////////////////////////////////////////////////////////////////////////////////
114 
/// Print a banner announcing the segmentation violation and start the session
/// termination via HandleTermination().
/// Always returns kTRUE.
Bool_t TXProofServSegViolationHandler::Notify()
{
   Printf("**** ");
   Printf("**** Segmentation violation: terminating ****");
   Printf("**** ");
   fServ->HandleTermination();
   return kTRUE;
}
123 
124 //----- Input handler for messages from parent or master -----------------------
125 ////////////////////////////////////////////////////////////////////////////////
126 
127 class TXProofServInputHandler : public TFileHandler {
128  TXProofServ *fServ;
129 public:
130  TXProofServInputHandler(TXProofServ *s, Int_t fd) : TFileHandler(fd, 1)
131  { fServ = s; }
132  Bool_t Notify();
133  Bool_t ReadNotify() { return Notify(); }
134 };
135 
136 ////////////////////////////////////////////////////////////////////////////////
137 
138 Bool_t TXProofServInputHandler::Notify()
139 {
140  fServ->HandleSocketInput();
141  // This request has been completed: remove the client ID from the pipe
142  ((TXUnixSocket *) fServ->GetSocket())->RemoveClientID();
143  return kTRUE;
144 }
145 
// ROOT class implementation macro for TXProofServ
ClassImp(TXProofServ);
147 
148 // Hook to the constructor. This is needed to avoid using the plugin manager
149 // which may create problems in multi-threaded environments.
150 extern "C" {
151  TApplication *GetTXProofServ(Int_t *argc, char **argv, FILE *flog)
152  { return new TXProofServ(argc, argv, flog); }
153 }
154 
155 ////////////////////////////////////////////////////////////////////////////////
156 /// Main constructor
157 
158 TXProofServ::TXProofServ(Int_t *argc, char **argv, FILE *flog)
159  : TProofServ(argc, argv, flog)
160 {
161  fInterruptHandler = 0;
162  fInputHandler = 0;
163  fTerminated = kFALSE;
164 
165  // TODO:
166  // Int_t useFIFO = 0;
167 /* if (GetParameter(fProof->GetInputList(), "PROOF_UseFIFO", useFIFO) != 0) {
168  if (useFIFO == 1)
169  Info("", "enablig use of FIFO (if allowed by the server)");
170  else
171  Warning("", "unsupported strategy index (%d): ignore", strategy);
172  }
173 */
174 }
175 
176 ////////////////////////////////////////////////////////////////////////////////
177 /// Finalize the server setup. If master, create the TProof instance to talk
178 /// the worker or submaster nodes.
179 /// Return 0 on success, -1 on error
180 
181 Int_t TXProofServ::CreateServer()
182 {
183  Bool_t xtest = (Argc() > 3 && !strcmp(Argv(3), "test")) ? kTRUE : kFALSE;
184 
185  if (gProofDebugLevel > 0)
186  Info("CreateServer", "starting%s server creation", (xtest ? " test" : ""));
187 
188  // Get file descriptor for log file
189  if (fLogFile) {
190  // Use the file already open by pmain
191  if ((fLogFileDes = fileno(fLogFile)) < 0) {
192  Error("CreateServer", "resolving the log file description number");
193  return -1;
194  }
195  // Hide the session start-up logs unless we are in verbose mode
196  if (gProofDebugLevel <= 0)
197  lseek(fLogFileDes, (off_t) 0, SEEK_END);
198  }
199 
200  // Global location string in TXSocket
201  TXSocket::SetLocation((IsMaster()) ? "master" : "slave");
202 
203  // Set debug level in XrdClient
204  EnvPutInt(NAME_DEBUG, gEnv->GetValue("XNet.Debug", 0));
205 
206  // Get socket to be used to call back our xpd
207  if (xtest) {
208  // test session, just send the protocol version on the open pipe
209  // and exit
210  if (!(fSockPath = gSystem->Getenv("ROOTOPENSOCK"))) {
211  Error("CreateServer", "test: socket setup by xpd undefined");
212  return -1;
213  }
214  Int_t fpw = (Int_t) strtol(fSockPath.Data(), 0, 10);
215  int proto = htonl(kPROOF_Protocol);
216  fSockPath = "";
217  if (write(fpw, &proto, sizeof(proto)) != sizeof(proto)) {
218  Error("CreateServer", "test: sending protocol number");
219  return -1;
220  }
221  exit(0);
222  } else {
223  fSockPath = gEnv->GetValue("ProofServ.OpenSock", "");
224  if (fSockPath.Length() <= 0) {
225  Error("CreateServer", "socket setup by xpd undefined");
226  return -1;
227  }
228  TString entity = gEnv->GetValue("ProofServ.Entity", "");
229  if (entity.Length() > 0)
230  fSockPath.Insert(0,Form("%s/", entity.Data()));
231  }
232 
233  // Get open socket descriptor, if any
234  Int_t sockfd = -1;
235  const char *opensock = gSystem->Getenv("ROOTOPENSOCK");
236  if (opensock && strlen(opensock) > 0) {
237  TSystem::ResetErrno();
238  sockfd = (Int_t) strtol(opensock, 0, 10);
239  if (TSystem::GetErrno() == ERANGE) {
240  sockfd = -1;
241  Warning("CreateServer", "socket descriptor: wrong conversion from '%s'", opensock);
242  }
243  if (sockfd > 0 && gProofDebugLevel > 0)
244  Info("CreateServer", "using open connection (descriptor %d)", sockfd);
245  }
246 
247  // Get the sessions ID
248  Int_t psid = gEnv->GetValue("ProofServ.SessionID", -1);
249  if (psid < 0) {
250  Error("CreateServer", "Session ID undefined");
251  return -1;
252  }
253 
254  // Call back the server
255  fSocket = new TXUnixSocket(fSockPath, psid, -1, this, sockfd);
256  if (!fSocket || !(fSocket->IsValid())) {
257  Error("CreateServer", "Failed to open connection to XrdProofd coordinator");
258  return -1;
259  }
260  // Set compression level, if any
261  fSocket->SetCompressionSettings(fCompressMsg);
262 
263  // Set the title for debugging
264  TString tgt("client");
265  if (fOrdinal != "0") {
266  tgt = fOrdinal;
267  if (tgt.Last('.') != kNPOS) tgt.Remove(tgt.Last('.'));
268  }
269  fSocket->SetTitle(tgt);
270 
271  // Set the this as reference of this socket
272  ((TXSocket *)fSocket)->fReference = this;
273 
274  // Get socket descriptor
275  Int_t sock = fSocket->GetDescriptor();
276 
277  // Install message input handlers
278  fInputHandler =
279  TXSocketHandler::GetSocketHandler(new TXProofServInputHandler(this, sock), fSocket);
280  gSystem->AddFileHandler(fInputHandler);
281 
282  // Get the client ID
283  Int_t cid = gEnv->GetValue("ProofServ.ClientID", -1);
284  if (cid < 0) {
285  Error("CreateServer", "Client ID undefined");
286  SendLogFile();
287  return -1;
288  }
289  ((TXSocket *)fSocket)->SetClientID(cid);
290 
291  // debug hooks
292  if (IsMaster()) {
293  // wait (loop) in master to allow debugger to connect
294  if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
295  while (gProofServDebug)
296  ;
297  }
298  } else {
299  // wait (loop) in slave to allow debugger to connect
300  if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
301  while (gProofServDebug)
302  ;
303  }
304  }
305 
306  if (gProofDebugLevel > 0)
307  Info("CreateServer", "Service: %s, ConfDir: %s, IsMaster: %d",
308  fService.Data(), fConfDir.Data(), (Int_t)fMasterServ);
309 
310  if (Setup() == -1) {
311  // Setup failure
312  LogToMaster();
313  SendLogFile();
314  Terminate(0);
315  return -1;
316  }
317 
318  if (!fLogFile) {
319  RedirectOutput();
320  // If for some reason we failed setting a redirection file for the logs
321  // we cannot continue
322  if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
323  LogToMaster();
324  SendLogFile(-98);
325  Terminate(0);
326  return -1;
327  }
328  }
329 
330  // Send message of the day to the client
331  if (IsMaster()) {
332  if (CatMotd() == -1) {
333  LogToMaster();
334  SendLogFile(-99);
335  Terminate(0);
336  return -1;
337  }
338  }
339 
340  // Everybody expects iostream to be available, so load it...
341  ProcessLine("#include <iostream>", kTRUE);
342  ProcessLine("#include <string>",kTRUE); // for std::string iostream.
343 
344  // Load user functions
345  const char *logon;
346  logon = gEnv->GetValue("Proof.Load", (char *)0);
347  if (logon) {
348  char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
349  if (mac)
350  ProcessLine(Form(".L %s", logon), kTRUE);
351  delete [] mac;
352  }
353 
354  // Execute logon macro
355  logon = gEnv->GetValue("Proof.Logon", (char *)0);
356  if (logon && !NoLogOpt()) {
357  char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
358  if (mac)
359  ProcessFile(logon);
360  delete [] mac;
361  }
362 
363  // Save current interpreter context
364  gInterpreter->SaveContext();
365  gInterpreter->SaveGlobalsContext();
366 
367  // if master, start slave servers
368  if (IsMaster()) {
369  TString master;
370 
371  if (fConfFile.BeginsWith("lite:")) {
372  master = "lite://";
373  } else {
374  master.Form("proof://%s@__master__", fUser.Data());
375 
376  // Add port, if defined
377  Int_t port = gEnv->GetValue("ProofServ.XpdPort", -1);
378  if (port > -1) {
379  master += ":";
380  master += port;
381  }
382  }
383 
384  // Make sure that parallel startup via threads is not active
385  // (it is broken for xpd because of the locks on gInterpreterMutex)
386  gEnv->SetValue("Proof.ParallelStartup", 0);
387 
388  // Get plugin manager to load appropriate TProof from
389  TPluginManager *pm = gROOT->GetPluginManager();
390  if (!pm) {
391  Error("CreateServer", "no plugin manager found");
392  SendLogFile(-99);
393  Terminate(0);
394  return -1;
395  }
396 
397  // Find the appropriate handler
398  TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
399  if (!h) {
400  Error("CreateServer", "no plugin found for TProof with a"
401  " config file of '%s'", fConfFile.Data());
402  SendLogFile(-99);
403  Terminate(0);
404  return -1;
405  }
406 
407  // load the plugin
408  if (h->LoadPlugin() == -1) {
409  Error("CreateServer", "plugin for TProof could not be loaded");
410  SendLogFile(-99);
411  Terminate(0);
412  return -1;
413  }
414 
415  // Make instance of TProof
416  if (fConfFile.BeginsWith("lite:")) {
417  // Remove input and signal handlers to avoid spurious "signals"
418  // during startup
419  gSystem->RemoveFileHandler(fInputHandler);
420  fProof = reinterpret_cast<TProof*>(h->ExecPlugin(6, master.Data(),
421  0, 0,
422  fLogLevel,
423  fSessionDir.Data(), 0));
424  // Re-enable input and signal handlers
425  gSystem->AddFileHandler(fInputHandler);
426  } else {
427  fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
428  fConfFile.Data(),
429  fConfDir.Data(),
430  fLogLevel,
431  fTopSessionTag.Data()));
432  }
433 
434  // Save worker info
435  if (fProof) fProof->SaveWorkerInfo();
436 
437  if (!fProof || (fProof && !fProof->IsValid())) {
438  Error("CreateServer", "plugin for TProof could not be executed");
439  FlushLogFile();
440  delete fProof;
441  fProof = 0;
442  SendLogFile(-99);
443  Terminate(0);
444  return -1;
445  }
446  // Find out if we are a master in direct contact only with workers
447  fEndMaster = fProof->IsEndMaster();
448 
449  SendLogFile();
450  }
451 
452  // Setup the shutdown timer
453  if (!fShutdownTimer) {
454  // Check activity on socket every 5 mins
455  fShutdownTimer = new TShutdownTimer(this, 300000);
456  fShutdownTimer->Start(-1, kFALSE);
457  }
458 
459  // Check if schema evolution is effective: clients running versions <=17 do not
460  // support that: send a warning message
461  if (fProtocol <= 17) {
462  TString msg;
463  msg.Form("Warning: client version is too old: automatic schema evolution is ineffective.\n"
464  " This may generate compatibility problems between streamed objects.\n"
465  " The advise is to move to ROOT >= 5.21/02 .");
466  SendAsynMessage(msg.Data());
467  }
468 
469  // Setup the idle timer
470  if (IsMaster() && !fIdleTOTimer) {
471  // Check activity on socket every 5 mins
472  Int_t idle_to = gEnv->GetValue("ProofServ.IdleTimeout", -1);
473  if (idle_to > 0) {
474  fIdleTOTimer = new TIdleTOTimer(this, idle_to * 1000);
475  fIdleTOTimer->Start(-1, kTRUE);
476  if (gProofDebugLevel > 0)
477  Info("CreateServer", " idle timer started (%d secs)", idle_to);
478  } else if (gProofDebugLevel > 0) {
479  Info("CreateServer", " idle timer not started (no idle timeout requested)");
480  }
481  }
482 
483  // Done
484  return 0;
485 }
486 
487 ////////////////////////////////////////////////////////////////////////////////
/// Cleanup: delete the socket. Not really necessary since after this dtor
/// there is no life anyway.

TXProofServ::~TXProofServ()
{
   delete fSocket;
}
495 
496 ////////////////////////////////////////////////////////////////////////////////
497 /// Handle high priority data sent by the master or client.
498 
499 void TXProofServ::HandleUrgentData()
500 {
501  // Real-time notification of messages
502  TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
503 
504  // Get interrupt
505  Bool_t fw = kFALSE;
506  Int_t iLev = ((TXSocket *)fSocket)->GetInterrupt(fw);
507  if (iLev < 0) {
508  Error("HandleUrgentData", "error receiving interrupt");
509  return;
510  }
511 
512  PDB(kGlobal, 2)
513  Info("HandleUrgentData", "got interrupt: %d\n", iLev);
514 
515  if (fProof)
516  fProof->SetActive();
517 
518  switch (iLev) {
519 
520  case TProof::kPing:
521  PDB(kGlobal, 2)
522  Info("HandleUrgentData", "*** Ping");
523 
524  // If master server, propagate interrupt to slaves
525  if (fw && IsMaster()) {
526  Int_t nbad = fProof->fActiveSlaves->GetSize() - fProof->Ping();
527  if (nbad > 0) {
528  Info("HandleUrgentData","%d slaves did not reply to ping",nbad);
529  }
530  }
531 
532  // Touch the admin path to show we are alive
533  if (fAdminPath.IsNull()) {
534  fAdminPath = gEnv->GetValue("ProofServ.AdminPath", "");
535  }
536 
537  if (!fAdminPath.IsNull()) {
538  if (!fAdminPath.EndsWith(".status")) {
539  // Update file time stamps
540  if (utime(fAdminPath.Data(), 0) != 0)
541  Info("HandleUrgentData", "problems touching path: %s", fAdminPath.Data());
542  else
543  PDB(kGlobal, 2)
544  Info("HandleUrgentData", "touching path: %s", fAdminPath.Data());
545  } else {
546  // Update the status in the file
547  // 0 idle
548  // 1 running
549  // 2 being terminated (currently unused)
550  // 3 queued
551  // 4 idle timed-out
552  Int_t uss_rc = UpdateSessionStatus(-1);
553  if (uss_rc != 0)
554  Error("HandleUrgentData", "problems updating status path: %s (errno: %d)", fAdminPath.Data(), -uss_rc);
555  }
556  } else {
557  Info("HandleUrgentData", "admin path undefined");
558  }
559 
560  break;
561 
562  case TProof::kHardInterrupt:
563  Info("HandleUrgentData", "*** Hard Interrupt");
564 
565  // If master server, propagate interrupt to slaves
566  if (fw && IsMaster())
567  fProof->Interrupt(TProof::kHardInterrupt);
568 
569  // Flush input socket
570  ((TXSocket *)fSocket)->Flush();
571 
572  if (IsMaster())
573  SendLogFile();
574 
575  break;
576 
577  case TProof::kSoftInterrupt:
578  Info("HandleUrgentData", "Soft Interrupt");
579 
580  // If master server, propagate interrupt to slaves
581  if (fw && IsMaster())
582  fProof->Interrupt(TProof::kSoftInterrupt);
583 
584  Interrupt();
585 
586  if (IsMaster())
587  SendLogFile();
588 
589  break;
590 
591 
592  case TProof::kShutdownInterrupt:
593  Info("HandleUrgentData", "Shutdown Interrupt");
594 
595  // When returning for here connection are closed
596  HandleTermination();
597 
598  break;
599 
600  default:
601  Error("HandleUrgentData", "unexpected type: %d", iLev);
602  break;
603  }
604 
605 
606  if (fProof) fProof->SetActive(kFALSE);
607 }
608 
609 ////////////////////////////////////////////////////////////////////////////////
/// Called by the signal handler installed in Setup(); the signal is only
/// logged, no other action is taken.

void TXProofServ::HandleSigPipe()
{
   // Just notify: the session is left running

   Info("HandleSigPipe","got sigpipe ... do nothing");
}
618 
619 ////////////////////////////////////////////////////////////////////////////////
620 /// Called when the client is not alive anymore; terminate the session.
621 
622 void TXProofServ::HandleTermination()
623 {
624  // If master server, propagate interrupt to slaves
625  // (shutdown interrupt send internally).
626  if (IsMaster()) {
627 
628  // If not idle, try first to stop processing
629  if (!fIdle) {
630  // Remove pending requests
631  fWaitingQueries->Delete();
632  // Interrupt the current monitor
633  fProof->InterruptCurrentMonitor();
634  // Do not wait for ever, but al least 20 seconds
635  Long_t timeout = gEnv->GetValue("Proof.ShutdownTimeout", 60);
636  timeout = (timeout > 20) ? timeout : 20;
637  // Processing will be aborted
638  fProof->StopProcess(kTRUE, (Long_t) (timeout / 2));
639  // Receive end-of-processing messages, but do not wait for ever
640  fProof->Collect(TProof::kActive, timeout);
641  // Still not idle
642  if (!fIdle)
643  Warning("HandleTermination","processing could not be stopped");
644  }
645  // Close the session
646  if (fProof)
647  fProof->Close("S");
648  }
649 
650  Terminate(0); // will not return from here....
651 }
652 
653 ////////////////////////////////////////////////////////////////////////////////
654 /// Print the ProofServ logo on standard output.
655 /// Return 0 on success, -1 on error
656 
657 Int_t TXProofServ::Setup()
658 {
659  char str[512];
660 
661  if (IsMaster()) {
662  snprintf(str, 512, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
663  } else {
664  snprintf(str, 512, "**** PROOF worker server @ %s started ****", gSystem->HostName());
665  }
666 
667  if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
668  Error("Setup", "failed to send proof server startup message");
669  return -1;
670  }
671 
672  // Get client protocol
673  if ((fProtocol = gEnv->GetValue("ProofServ.ClientVersion", -1)) < 0) {
674  Error("Setup", "remote proof protocol missing");
675  return -1;
676  }
677 
678  // The local user
679  fUser = gEnv->GetValue("ProofServ.Entity", "");
680  if (fUser.Length() >= 0) {
681  if (fUser.Contains(":"))
682  fUser.Remove(fUser.Index(":"));
683  if (fUser.Contains("@"))
684  fUser.Remove(fUser.Index("@"));
685  } else {
686  UserGroup_t *pw = gSystem->GetUserInfo();
687  if (pw) {
688  fUser = pw->fUser;
689  delete pw;
690  }
691  }
692 
693  // Work dir and ...
694  if (IsMaster()) {
695  TString cf = gEnv->GetValue("ProofServ.ProofConfFile", "");
696  if (cf.Length() > 0)
697  fConfFile = cf;
698  }
699  fWorkDir = gEnv->GetValue("ProofServ.Sandbox", Form("~/%s", kPROOF_WorkDir));
700 
701  // Get Session tag
702  if ((fSessionTag = gEnv->GetValue("ProofServ.SessionTag", "-1")) == "-1") {
703  Error("Setup", "Session tag missing");
704  return -1;
705  }
706  // Get top session tag, i.e. the tag of the PROOF session
707  if ((fTopSessionTag = gEnv->GetValue("ProofServ.TopSessionTag", "-1")) == "-1") {
708  fTopSessionTag = "";
709  // Try to extract it from log file path (for backward compatibility)
710  if (gSystem->Getenv("ROOTPROOFLOGFILE")) {
711  fTopSessionTag = gSystem->DirName(gSystem->Getenv("ROOTPROOFLOGFILE"));
712  Ssiz_t lstl;
713  if ((lstl = fTopSessionTag.Last('/')) != kNPOS) fTopSessionTag.Remove(0, lstl + 1);
714  if (fTopSessionTag.BeginsWith("session-")) {
715  fTopSessionTag.Remove(0, strlen("session-"));
716  } else {
717  fTopSessionTag = "";
718  }
719  }
720  if (fTopSessionTag.IsNull()) {
721  Error("Setup", "top session tag missing");
722  return -1;
723  }
724  }
725 
726  // Make sure the process ID is in the tag
727  TString spid = Form("-%d", gSystem->GetPid());
728  if (!fSessionTag.EndsWith(spid)) {
729  Int_t nd = 0;
730  if ((nd = fSessionTag.CountChar('-')) >= 2) {
731  Int_t id = fSessionTag.Index("-", fSessionTag.Index("-") + 1);
732  if (id != kNPOS) fSessionTag.Remove(id);
733  } else if (nd != 1) {
734  Warning("Setup", "Wrong number of '-' in session tag: protocol error? %s", fSessionTag.Data());
735  }
736  // Add this process ID
737  fSessionTag += spid;
738  }
739  if (gProofDebugLevel > 0)
740  Info("Setup", "session tags: %s, %s", fTopSessionTag.Data(), fSessionTag.Data());
741 
742  // Get Session dir (sandbox)
743  if ((fSessionDir = gEnv->GetValue("ProofServ.SessionDir", "-1")) == "-1") {
744  Error("Setup", "Session dir missing");
745  return -1;
746  }
747 
748  // Goto to the main PROOF working directory
749  char *workdir = gSystem->ExpandPathName(fWorkDir.Data());
750  fWorkDir = workdir;
751  delete [] workdir;
752  if (gProofDebugLevel > 0)
753  Info("Setup", "working directory set to %s", fWorkDir.Data());
754 
755  // Common setup
756  if (SetupCommon() != 0) {
757  Error("Setup", "common setup failed");
758  return -1;
759  }
760 
761  // Send packages off immediately to reduce latency
762  fSocket->SetOption(kNoDelay, 1);
763 
764  // Check every two hours if client is still alive
765  fSocket->SetOption(kKeepAlive, 1);
766 
767  // Install SigPipe handler to handle kKeepAlive failure
768  gSystem->AddSignalHandler(new TXProofServSigPipeHandler(this));
769 
770  // Install Termination handler
771  gSystem->AddSignalHandler(new TXProofServTerminationHandler(this));
772 
773  // Install seg violation handler
774  gSystem->AddSignalHandler(new TXProofServSegViolationHandler(this));
775 
776  if (gProofDebugLevel > 0)
777  Info("Setup", "successfully completed");
778 
779  // Done
780  return 0;
781 }
782 
783 ////////////////////////////////////////////////////////////////////////////////
784 /// Get list of workers to be used from now on.
785 /// The list must be provided by the caller.
786 
787 TProofServ::EQueryAction TXProofServ::GetWorkers(TList *workers,
788  Int_t & /* prioritychange */,
789  Bool_t resume)
790 {
791  TProofServ::EQueryAction rc = kQueryStop;
792 
793  // User config files, when enabled, override cluster-wide configuration
794  if (gEnv->GetValue("ProofServ.UseUserCfg", 0) != 0) {
795  Int_t pc = 1;
796  return TProofServ::GetWorkers(workers, pc);
797  }
798 
799  // seqnum of the query for which we call getworkers
800  Bool_t dynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
801  TString seqnum = (dynamicStartup) ? "" : XPD_GW_Static;
802  if (!fWaitingQueries->IsEmpty()) {
803  if (resume) {
804  seqnum += ((TProofQueryResult *)(fWaitingQueries->First()))->GetSeqNum();
805  } else {
806  seqnum += ((TProofQueryResult *)(fWaitingQueries->Last()))->GetSeqNum();
807  }
808  }
809  // Send request to the coordinator
810  TObjString *os = 0;
811  if (dynamicStartup) {
812  // We wait dynto seconds for the first worker to come; -1 means forever
813  Int_t dynto = gEnv->GetValue("Proof.DynamicStartupTimeout", -1);
814  Bool_t doto = (dynto > 0) ? kTRUE : kFALSE;
815  while (!(os = ((TXSocket *)fSocket)->SendCoordinator(kGetWorkers, seqnum.Data()))) {
816  if (doto > 0 && --dynto < 0) break;
817  // Another second
818  gSystem->Sleep(1000);
819  }
820  } else {
821  os = ((TXSocket *)fSocket)->SendCoordinator(kGetWorkers, seqnum.Data());
822  }
823 
824  // The reply contains some information about the master (image, workdir)
825  // followed by the information about the workers; the tokens for each node
826  // are separated by '&'
827  if (os) {
828  TString fl(os->GetName());
829  if (fl.BeginsWith(XPD_GW_QueryEnqueued)) {
830  SendAsynMessage("+++ Query cannot be processed now: enqueued");
831  return kQueryEnqueued;
832  }
833 
834  // Honour a max number of workers request (typically when running in valgrind)
835  Int_t nwrks = -1;
836  Bool_t pernode = kFALSE;
837  if (gSystem->Getenv("PROOF_NWORKERS")) {
838  TString s(gSystem->Getenv("PROOF_NWORKERS"));
839  if (s.EndsWith("x")) {
840  pernode = kTRUE;
841  s.ReplaceAll("x", "");
842  }
843  if (s.IsDigit()) {
844  nwrks = s.Atoi();
845  if (!dynamicStartup && (nwrks > 0)) {
846  // Notify, except in dynamic workers mode to avoid flooding
847  TString msg;
848  if (pernode) {
849  msg.Form("+++ Starting max %d workers per node following the setting of PROOF_NWORKERS", nwrks);
850  } else {
851  msg.Form("+++ Starting max %d workers following the setting of PROOF_NWORKERS", nwrks);
852  }
853  SendAsynMessage(msg);
854  } else {
855  nwrks = -1;
856  }
857  } else {
858  pernode = kFALSE;
859  }
860  }
861 
862  TString tok;
863  Ssiz_t from = 0;
864  TList *nodecnt = (pernode) ? new TList : 0 ;
865  if (fl.Tokenize(tok, from, "&")) {
866  if (!tok.IsNull()) {
867  TProofNodeInfo *master = new TProofNodeInfo(tok);
868  if (!master) {
869  Error("GetWorkers", "no appropriate master line got from coordinator");
870  return kQueryStop;
871  } else {
872  // Set image if not yet done and available
873  if (fImage.IsNull() && strlen(master->GetImage()) > 0)
874  fImage = master->GetImage();
875  SafeDelete(master);
876  }
877  // Now the workers
878  while (fl.Tokenize(tok, from, "&")) {
879  if (!tok.IsNull()) {
880  if (nwrks == -1 || nwrks > 0) {
881  // We have the minimal set of information to start
882  rc = kQueryOK;
883  if (pernode && nodecnt) {
884  TProofNodeInfo *ni = new TProofNodeInfo(tok);
885  TParameter<Int_t> *p = 0;
886  Int_t nw = 0;
887  if (!(p = (TParameter<Int_t> *) nodecnt->FindObject(ni->GetNodeName().Data()))) {
888  p = new TParameter<Int_t>(ni->GetNodeName().Data(), nw);
889  nodecnt->Add(p);
890  }
891  nw = p->GetVal();
892  if (gDebug > 0)
893  Info("GetWorkers","%p: name: %s (%s) val: %d (nwrks: %d)",
894  p, p->GetName(), ni->GetNodeName().Data(), nw, nwrks);
895  if (nw < nwrks) {
896  if (workers) workers->Add(ni);
897  nw++;
898  p->SetVal(nw);
899  } else {
900  // Two many workers on this machine already
901  SafeDelete(ni);
902  }
903  } else {
904  if (workers)
905  workers->Add(new TProofNodeInfo(tok));
906  // Count down
907  if (nwrks != -1) nwrks--;
908  }
909  } else {
910  // Release this worker (to cleanup the session list in the coordinator and get a fresh
911  // and correct list next call)
912  TProofNodeInfo *ni = new TProofNodeInfo(tok);
913  ReleaseWorker(ni->GetOrdinal().Data());
914  }
915  }
916  }
917  }
918  }
919  // Cleanup
920  if (nodecnt) {
921  nodecnt->SetOwner(kTRUE);
922  SafeDelete(nodecnt);
923  }
924  }
925 
926  // We are done
927  return rc;
928 }
929 
930 ////////////////////////////////////////////////////////////////////////////////
931 /// Handle error on the input socket
932 
933 Bool_t TXProofServ::HandleError(const void *)
934 {
935  // Try reconnection
936  if (fSocket && !fSocket->IsValid()) {
937 
938  fSocket->Reconnect();
939  if (fSocket && fSocket->IsValid()) {
940  if (gDebug > 0)
941  Info("HandleError",
942  "%p: connection to local coordinator re-established", this);
943  FlushLogFile();
944  return kFALSE;
945  }
946  }
947  Printf("TXProofServ::HandleError: %p: got called ...", this);
948 
949  // If master server, propagate interrupt to slaves
950  // (shutdown interrupt send internally).
951  if (IsMaster())
952  fProof->Close("S");
953 
954  // Avoid communicating back anything to the coordinator (it is gone)
955  if (fSocket) ((TXSocket *)fSocket)->SetSessionID(-1);
956 
957  Terminate(0);
958 
959  Printf("TXProofServ::HandleError: %p: DONE ... ", this);
960 
961  // We are done
962  return kTRUE;
963 }
964 
965 ////////////////////////////////////////////////////////////////////////////////
966 /// Handle asynchronous input on the input socket
967 
968 Bool_t TXProofServ::HandleInput(const void *in)
969 {
970  if (gDebug > 2)
971  Printf("TXProofServ::HandleInput %p, in: %p", this, in);
972 
973  XHandleIn_t *hin = (XHandleIn_t *) in;
974  Int_t acod = (hin) ? hin->fInt1 : kXPD_msg;
975 
976  // Act accordingly
977  if (acod == kXPD_ping || acod == kXPD_interrupt) {
978  // Interrupt or Ping
979  HandleUrgentData();
980 
981  } else if (acod == kXPD_flush) {
982  // Flush stdout, so that we can access the full log file
983  Info("HandleInput","kXPD_flush: flushing log file (stdout)");
984  fflush(stdout);
985 
986  } else if (acod == kXPD_urgent) {
987  // Get type
988  Int_t type = hin->fInt2;
989  switch (type) {
990  case TXSocket::kStopProcess:
991  {
992  // Abort or Stop ?
993  Bool_t abort = (hin->fInt3 != 0) ? kTRUE : kFALSE;
994  // Timeout
995  Int_t timeout = hin->fInt4;
996  // Act now
997  if (fProof)
998  fProof->StopProcess(abort, timeout);
999  else
1000  if (fPlayer)
1001  fPlayer->StopProcess(abort, timeout);
1002  }
1003  break;
1004  default:
1005  Info("HandleInput","kXPD_urgent: unknown type: %d", type);
1006  }
1007 
1008  } else if (acod == kXPD_inflate) {
1009 
1010  // Obsolete type
1011  Warning("HandleInput", "kXPD_inflate: obsolete message type");
1012 
1013  } else if (acod == kXPD_priority) {
1014 
1015  // The factor is the priority to be propagated
1016  fGroupPriority = hin->fInt2;
1017  if (fProof)
1018  fProof->BroadcastGroupPriority(fGroup, fGroupPriority);
1019  // Notify
1020  Info("HandleInput", "kXPD_priority: group %s priority set to %f",
1021  fGroup.Data(), (Float_t) fGroupPriority / 100.);
1022 
1023  } else if (acod == kXPD_clusterinfo) {
1024 
1025  // Information about the cluster status
1026  fTotSessions = hin->fInt2;
1027  fActSessions = hin->fInt3;
1028  fEffSessions = (hin->fInt4)/1000.;
1029  // Notify
1030  Info("HandleInput", "kXPD_clusterinfo: tot: %d, act: %d, eff: %f",
1031  fTotSessions, fActSessions, fEffSessions);
1032 
1033  } else {
1034  // Standard socket input
1035  HandleSocketInput();
1036  // This request has been completed: remove the client ID from the pipe
1037  ((TXSocket *)fSocket)->RemoveClientID();
1038  }
1039 
1040  // We are done
1041  return kTRUE;
1042 }
1043 
1044 ////////////////////////////////////////////////////////////////////////////////
1045 /// Disable read timeout on the underlying socket
1046 
1047 void TXProofServ::DisableTimeout()
1048 {
1049  if (fSocket)
1050  ((TXSocket *)fSocket)->DisableTimeout();
1051 }
1052 
1053 ////////////////////////////////////////////////////////////////////////////////
1054 /// Enable read timeout on the underlying socket
1055 
1056 void TXProofServ::EnableTimeout()
1057 {
1058  if (fSocket)
1059  ((TXSocket *)fSocket)->EnableTimeout();
1060 }
1061 
1062 ////////////////////////////////////////////////////////////////////////////////
1063 /// Terminate the proof server.
1064 
1065 void TXProofServ::Terminate(Int_t status)
1066 {
1067  if (fTerminated)
1068  // Avoid doubling the exit operations
1069  exit(1);
1070  fTerminated = kTRUE;
1071 
1072  // Notify
1073  Info("Terminate", "starting session termination operations ...");
1074  if (fgLogToSysLog > 0) {
1075  TString s;
1076  s.Form("%s -1 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
1077  gSystem->Syslog(kLogNotice, s.Data());
1078  }
1079 
1080  // Notify the memory footprint
1081  ProcInfo_t pi;
1082  if (!gSystem->GetProcInfo(&pi)){
1083  Info("Terminate", "process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
1084  pi.fMemVirtual, fgVirtMemMax, pi.fMemResident, fgResMemMax);
1085  }
1086 
1087  // Deactivate current monitor, if any
1088  if (fProof)
1089  fProof->SetMonitor(0, kFALSE);
1090 
1091  // Cleanup session directory
1092  if (status == 0) {
1093  // make sure we remain in a "connected" directory
1094  gSystem->ChangeDirectory("/");
1095  // needed in case fSessionDir is on NFS ?!
1096  gSystem->MakeDirectory(fSessionDir+"/.delete");
1097  gSystem->Exec(Form("%s %s", kRM, fSessionDir.Data()));
1098  }
1099 
1100  // Cleanup queries directory if empty
1101  if (IsMaster()) {
1102  if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
1103  // make sure we remain in a "connected" directory
1104  gSystem->ChangeDirectory("/");
1105  // needed in case fQueryDir is on NFS ?!
1106  gSystem->MakeDirectory(fQueryDir+"/.delete");
1107  gSystem->Exec(Form("%s %s", kRM, fQueryDir.Data()));
1108  // Remove lock file
1109  if (fQueryLock)
1110  gSystem->Unlink(fQueryLock->GetName());
1111  }
1112 
1113  // Unlock the query dir owned by this session
1114  if (fQueryLock)
1115  fQueryLock->Unlock();
1116  } else {
1117  // Try to stop processing if any
1118  Bool_t abort = (status == 0) ? kFALSE : kTRUE;
1119  if (!fIdle && fPlayer)
1120  fPlayer->StopProcess(abort,1);
1121  gSystem->Sleep(2000);
1122  }
1123 
1124  // Cleanup data directory if empty
1125  if (!fDataDir.IsNull() && !gSystem->AccessPathName(fDataDir, kWritePermission)) {
1126  if (UnlinkDataDir(fDataDir))
1127  Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
1128  }
1129 
1130  // Remove input and signal handlers to avoid spurious "signals"
1131  // for closing activities executed upon exit()
1132  gSystem->RemoveFileHandler(fInputHandler);
1133 
1134  // Stop processing events (set a flag to exit the event loop)
1135  gSystem->ExitLoop();
1136 
1137  // We post the pipe once to wake up the main thread which is waiting for
1138  // activity on this socket; this fake activity will make it return and
1139  // eventually exit the loop.
1140  TXSocket::fgPipe.Post((TXSocket *)fSocket);
1141 
1142  // Notify
1143  Printf("Terminate: termination operations ended: quitting!");
1144 }
1145 
1146 ////////////////////////////////////////////////////////////////////////////////
1147 /// Try locking query area of session tagged sessiontag.
1148 /// The id of the locking file is returned in fid and must be
1149 /// unlocked via UnlockQueryFile(fid).
1150 
1151 Int_t TXProofServ::LockSession(const char *sessiontag, TProofLockPath **lck)
1152 {
1153  // We do not need to lock our own session
1154  if (strstr(sessiontag, fTopSessionTag))
1155  return 0;
1156 
1157  if (!lck) {
1158  Info("LockSession","locker space undefined");
1159  return -1;
1160  }
1161  *lck = 0;
1162 
1163  // Check the format
1164  TString stag = sessiontag;
1165  TRegexp re("session-.*-.*-.*");
1166  Int_t i1 = stag.Index(re);
1167  if (i1 == kNPOS) {
1168  Info("LockSession","bad format: %s", sessiontag);
1169  return -1;
1170  }
1171  stag.ReplaceAll("session-","");
1172 
1173  // Drop query number, if any
1174  Int_t i2 = stag.Index(":q");
1175  if (i2 != kNPOS)
1176  stag.Remove(i2);
1177 
1178  // Make sure that parent process does not exist anylonger
1179  TString parlog = fSessionDir;
1180  parlog = parlog.Remove(parlog.Index("master-")+strlen("master-"));
1181  parlog += stag;
1182  if (!gSystem->AccessPathName(parlog)) {
1183  Info("LockSession","parent still running: do nothing");
1184  return -1;
1185  }
1186 
1187  // Lock the query lock file
1188  TString qlock = fQueryLock->GetName();
1189  qlock.ReplaceAll(fTopSessionTag, stag);
1190 
1191  if (!gSystem->AccessPathName(qlock)) {
1192  *lck = new TProofLockPath(qlock);
1193  if (((*lck)->Lock()) < 0) {
1194  Info("LockSession","problems locking query lock file");
1195  SafeDelete(*lck);
1196  return -1;
1197  }
1198  }
1199 
1200  // We are done
1201  return 0;
1202 }
1203 
1204 ////////////////////////////////////////////////////////////////////////////////
1205 /// Send message to intermediate coordinator to release worker of last ordinal
1206 /// ord.
1207 
1208 void TXProofServ::ReleaseWorker(const char *ord)
1209 {
1210  if (gDebug > 2) Info("ReleaseWorker","releasing: %s", ord);
1211 
1212  ((TXSocket *)fSocket)->SendCoordinator(kReleaseWorker, ord);
1213 }