Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TProofServ.cxx
Go to the documentation of this file.
1 // @(#)root/proof:$Id$
2 // Author: Fons Rademakers 16/02/97
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TProofServ
13 \ingroup proofkernel
14 
15 Class providing the PROOF server. It can act either as the master
16 server or as a slave server, depending on its startup arguments. It
17 receives and handles messages coming from the client or from the
18 master server.
19 
20 */
21 
22 #include "RConfigure.h"
23 #include <ROOT/RConfig.hxx>
24 #include "Riostream.h"
25 
26 #ifdef WIN32
27  #include <process.h>
28  #include <io.h>
29  #include "snprintf.h"
30  typedef long off_t;
31 #endif
32 #include <errno.h>
33 #include <time.h>
34 #include <fcntl.h>
35 #include <sys/types.h>
36 #include <sys/stat.h>
37 #ifndef WIN32
38 #include <sys/wait.h>
39 #endif
40 #include <cstdlib>
41 
42 // To handle exceptions
43 #include <exception>
44 #include <new>
45 
46 using namespace std;
47 
48 #if (defined(__FreeBSD__) && (__FreeBSD__ < 4)) || \
49  (defined(__APPLE__) && (!defined(MAC_OS_X_VERSION_10_3) || \
50  (MAC_OS_X_VERSION_MAX_ALLOWED < MAC_OS_X_VERSION_10_3)))
51 #include <sys/file.h>
52 #define lockf(fd, op, sz) flock((fd), (op))
53 #ifndef F_LOCK
54 #define F_LOCK (LOCK_EX | LOCK_NB)
55 #endif
56 #ifndef F_ULOCK
57 #define F_ULOCK LOCK_UN
58 #endif
59 #endif
60 
61 #include "TProofServ.h"
62 #include "TDSetProxy.h"
63 #include "TEnv.h"
64 #include "TError.h"
65 #include "TEventList.h"
66 #include "TEntryList.h"
67 #include "TException.h"
68 #include "TFile.h"
69 #include "THashList.h"
70 #include "TInterpreter.h"
71 #include "TKey.h"
72 #include "TMessage.h"
73 #include "TVirtualPerfStats.h"
74 #include "TProofDebug.h"
75 #include "TProof.h"
76 #include "TVirtualProofPlayer.h"
77 #include "TProofQueryResult.h"
78 #include "TQueryResultManager.h"
79 #include "TRegexp.h"
80 #include "TROOT.h"
81 #include "TSocket.h"
82 #include "TStopwatch.h"
83 #include "TSystem.h"
84 #include "TTimeStamp.h"
85 #include "TUrl.h"
86 #include "TPluginManager.h"
87 #include "TObjString.h"
88 #include "compiledata.h"
89 #include "TProofResourcesStatic.h"
90 #include "TProofNodeInfo.h"
91 #include "TFileInfo.h"
92 #include "TClass.h"
93 #include "TSQLServer.h"
94 #include "TSQLResult.h"
95 #include "TSQLRow.h"
96 #include "TPRegexp.h"
97 #include "TParameter.h"
98 #include "TMap.h"
99 #include "TSortedList.h"
100 #include "TParameter.h"
101 #include "TFileCollection.h"
102 #include "TLockFile.h"
103 #include "TDataSetManagerFile.h"
104 #include "TProofProgressStatus.h"
105 #include "TServerSocket.h"
106 #include "TMonitor.h"
107 #include "TProofOutputFile.h"
108 #include "TSelector.h"
109 #include "TPackMgr.h"
110 
111 // global proofserv handle
112 TProofServ *gProofServ = 0;
113 
114 // debug hook
115 static volatile Int_t gProofServDebug = 1;
116 
117 // Syslog control
118 Int_t TProofServ::fgLogToSysLog = 0;
119 TString TProofServ::fgSysLogService("proof");
120 TString TProofServ::fgSysLogEntity("undef:default");
121 
122 // File where to log: default stderr
123 FILE *TProofServ::fgErrorHandlerFile = 0;
124 
125 // Integrate with crash reporter.
126 #ifdef __APPLE__
127 extern "C" {
128 static const char *__crashreporter_info__ = 0;
129 asm(".desc ___crashreporter_info__, 0x10");
130 }
131 #endif
132 
133 // To control allowed actions while processing
134 Int_t TProofServ::fgRecursive = 0;
135 
136 // Last message and entry before exceptions
137 TString TProofServ::fgLastMsg("<undef>");
138 Long64_t TProofServ::fgLastEntry = -1;
139 
140 // Memory controllers
141 Long_t TProofServ::fgVirtMemMax = -1;
142 Long_t TProofServ::fgResMemMax = -1;
143 Float_t TProofServ::fgMemHWM = 0.80;
144 Float_t TProofServ::fgMemStop = 0.95;
145 
146 // Async Logger
147 static void SendAsynMsg(const char *msg) {
148  if (gProofServ) gProofServ->SendAsynMessage(msg, kTRUE);
149 }
150 
151 //----- Termination signal handler ---------------------------------------------
152 ////////////////////////////////////////////////////////////////////////////////
153 
154 class TProofServTerminationHandler : public TSignalHandler {
155  TProofServ *fServ;
156 public:
157  TProofServTerminationHandler(TProofServ *s)
158  : TSignalHandler(kSigTermination, kFALSE) { fServ = s; }
159  Bool_t Notify();
160 };
161 
162 ////////////////////////////////////////////////////////////////////////////////
163 /// Handle this interrupt
164 
165 Bool_t TProofServTerminationHandler::Notify()
166 {
167  Printf("Received SIGTERM: terminating");
168  fServ->HandleTermination();
169  return kTRUE;
170 }
171 
172 //----- Interrupt signal handler -----------------------------------------------
173 ////////////////////////////////////////////////////////////////////////////////
174 
175 class TProofServInterruptHandler : public TSignalHandler {
176  TProofServ *fServ;
177 public:
178  TProofServInterruptHandler(TProofServ *s)
179  : TSignalHandler(kSigUrgent, kFALSE) { fServ = s; }
180  Bool_t Notify();
181 };
182 
183 ////////////////////////////////////////////////////////////////////////////////
184 /// Handle this interrupt
185 
186 Bool_t TProofServInterruptHandler::Notify()
187 {
188  fServ->HandleUrgentData();
189  if (TROOT::Initialized()) {
190  Throw(GetSignal());
191  }
192  return kTRUE;
193 }
194 
195 //----- SigPipe signal handler -------------------------------------------------
196 ////////////////////////////////////////////////////////////////////////////////
197 
198 class TProofServSigPipeHandler : public TSignalHandler {
199  TProofServ *fServ;
200 public:
201  TProofServSigPipeHandler(TProofServ *s) : TSignalHandler(kSigPipe, kFALSE)
202  { fServ = s; }
203  Bool_t Notify();
204 };
205 
206 ////////////////////////////////////////////////////////////////////////////////
207 /// Handle this signal
208 
209 Bool_t TProofServSigPipeHandler::Notify()
210 {
211  fServ->HandleSigPipe();
212  return kTRUE;
213 }
214 
215 //----- Input handler for messages from parent or master -----------------------
216 ////////////////////////////////////////////////////////////////////////////////
217 
218 class TProofServInputHandler : public TFileHandler {
219  TProofServ *fServ;
220 public:
221  TProofServInputHandler(TProofServ *s, Int_t fd) : TFileHandler(fd, 1)
222  { fServ = s; }
223  Bool_t Notify();
224  Bool_t ReadNotify() { return Notify(); }
225 };
226 
227 ////////////////////////////////////////////////////////////////////////////////
228 /// Handle this input
229 
230 Bool_t TProofServInputHandler::Notify()
231 {
232  fServ->HandleSocketInput();
233  return kTRUE;
234 }
235 
236 TString TProofServLogHandler::fgPfx = ""; // Default prefix to be prepended to messages
237 Int_t TProofServLogHandler::fgCmdRtn = 0; // Return code of the command execution (available only
238  // after closing the pipe)
239 ////////////////////////////////////////////////////////////////////////////////
240 /// Execute 'cmd' in a pipe and handle output messages from the related file
241 
242 TProofServLogHandler::TProofServLogHandler(const char *cmd,
243  TSocket *s, const char *pfx)
244  : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
245 {
246  ResetBit(kFileIsPipe);
247  fgCmdRtn = 0;
248  fFile = 0;
249  if (s && cmd) {
250  fFile = gSystem->OpenPipe(cmd, "r");
251  if (fFile) {
252  SetFd(fileno(fFile));
253  // Notify what already in the file
254  Notify();
255  // Used in the destructor
256  SetBit(kFileIsPipe);
257  } else {
258  fSocket = 0;
259  Error("TProofServLogHandler", "executing command in pipe");
260  fgCmdRtn = -1;
261  }
262  } else {
263  Error("TProofServLogHandler",
264  "undefined command (%p) or socket (%p)", (int *)cmd, s);
265  }
266 }
267 ////////////////////////////////////////////////////////////////////////////////
268 /// Handle available message from the open file 'f'
269 
270 TProofServLogHandler::TProofServLogHandler(FILE *f, TSocket *s, const char *pfx)
271  : TFileHandler(-1, 1), fSocket(s), fPfx(pfx)
272 {
273  ResetBit(kFileIsPipe);
274  fgCmdRtn = 0;
275  fFile = 0;
276  if (s && f) {
277  fFile = f;
278  SetFd(fileno(fFile));
279  // Notify what already in the file
280  Notify();
281  } else {
282  Error("TProofServLogHandler", "undefined file (%p) or socket (%p)", f, s);
283  }
284 }
285 ////////////////////////////////////////////////////////////////////////////////
286 /// Destructor: close the pipe, if any, and record the command return code
287 
288 TProofServLogHandler::~TProofServLogHandler()
289 {
290  if (TestBit(kFileIsPipe) && fFile) {
291  Int_t rc = gSystem->ClosePipe(fFile);
292 #ifdef WIN32
293  fgCmdRtn = rc;
294 #else
295  fgCmdRtn = WIFEXITED(rc) ? WEXITSTATUS(rc) : -1;
296 #endif
297  }
298  fFile = 0;
299  fSocket = 0;
300  ResetBit(kFileIsPipe);
301 }
302 ////////////////////////////////////////////////////////////////////////////////
303 /// Handle available message in the open file
304 
305 Bool_t TProofServLogHandler::Notify()
306 {
307  if (IsValid()) {
308  TMessage m(kPROOF_MESSAGE);
309  // Read buffer
310  char line[4096];
311  char *plf = 0;
312  while (fgets(line, sizeof(line), fFile)) {
313  if ((plf = strchr(line, '\n')))
314  *plf = 0;
315  // Create log string
316  TString log;
317  if (fPfx.Length() > 0) {
318  // Prepend prefix specific to this instance
319  log.Form("%s: %s", fPfx.Data(), line);
320  } else if (fgPfx.Length() > 0) {
321  // Prepend default prefix
322  log.Form("%s: %s", fgPfx.Data(), line);
323  } else {
324  // Nothing to prepend
325  log = line;
326  }
327  // Send the message one level up
328  m.Reset(kPROOF_MESSAGE);
329  m << log;
330  fSocket->Send(m);
331  }
332  }
333  return kTRUE;
334 }
335 ////////////////////////////////////////////////////////////////////////////////
336 /// Static method to set the default prefix
337 
338 void TProofServLogHandler::SetDefaultPrefix(const char *pfx)
339 {
340  fgPfx = pfx;
341 }
342 ////////////////////////////////////////////////////////////////////////////////
343 /// Static method to get the return code from the execution of a command via
344 /// the pipe. This is always 0 when the log handler is not used with a pipe
345 
346 Int_t TProofServLogHandler::GetCmdRtn()
347 {
348  return fgCmdRtn;
349 }
350 
351 ////////////////////////////////////////////////////////////////////////////////
352 /// Init a guard for executing a command in a pipe
353 
354 TProofServLogHandlerGuard::TProofServLogHandlerGuard(const char *cmd, TSocket *s,
355  const char *pfx, Bool_t on)
356 {
357  fExecHandler = 0;
358  if (cmd && on) {
359  fExecHandler = new TProofServLogHandler(cmd, s, pfx);
360  if (fExecHandler->IsValid()) {
361  gSystem->AddFileHandler(fExecHandler);
362  } else {
363  Error("TProofServLogHandlerGuard","invalid handler");
364  }
365  } else {
366  if (on)
367  Error("TProofServLogHandlerGuard","undefined command");
368  }
369 }
370 
371 ////////////////////////////////////////////////////////////////////////////////
372 /// Init a guard for handling the messages available in an open file
373 
374 TProofServLogHandlerGuard::TProofServLogHandlerGuard(FILE *f, TSocket *s,
375  const char *pfx, Bool_t on)
376 {
377  fExecHandler = 0;
378  if (f && on) {
379  fExecHandler = new TProofServLogHandler(f, s, pfx);
380  if (fExecHandler->IsValid()) {
381  gSystem->AddFileHandler(fExecHandler);
382  } else {
383  Error("TProofServLogHandlerGuard","invalid handler");
384  }
385  } else {
386  if (on)
387  Error("TProofServLogHandlerGuard","undefined file");
388  }
389 }
390 
391 ////////////////////////////////////////////////////////////////////////////////
392 /// Close a guard for executing a command in a pipe
393 
394 TProofServLogHandlerGuard::~TProofServLogHandlerGuard()
395 {
396  if (fExecHandler && fExecHandler->IsValid()) {
397  gSystem->RemoveFileHandler(fExecHandler);
398  SafeDelete(fExecHandler);
399  }
400 }
401 
402 //--- Special timer to control delayed shutdowns ----------------------------//
403 ////////////////////////////////////////////////////////////////////////////////
404 /// Constructor
405 
406 TShutdownTimer::TShutdownTimer(TProofServ *p, Int_t delay)
407  : TTimer(delay, kFALSE), fProofServ(p)
408 {
409  fTimeout = gEnv->GetValue("ProofServ.ShutdownTimeout", 20);
410  // Backward compaitibility: until 5.32 the variable was called ProofServ.ShutdonwTimeout
411  fTimeout = gEnv->GetValue("ProofServ.ShutdonwTimeout", fTimeout);
412 }
413 
414 ////////////////////////////////////////////////////////////////////////////////
415 /// Handle expiration of the shutdown timer. In the case of low activity the
416 /// process will be aborted.
417 
418 Bool_t TShutdownTimer::Notify()
419 {
420  if (gDebug > 0)
421  printf("TShutdownTimer::Notify: checking activity on the input socket\n");
422 
423  // Check activity on the socket
424  TSocket *xs = 0;
425  if (fProofServ && (xs = fProofServ->GetSocket())) {
426  TTimeStamp now;
427  TTimeStamp ts = xs->GetLastUsage();
428  Long_t dt = (Long_t)(now.GetSec() - ts.GetSec()) * 1000 +
429  (Long_t)(now.GetNanoSec() - ts.GetNanoSec()) / 1000000 ;
430  if (dt > fTimeout * 60000) {
431  printf("TShutdownTimer::Notify: input socket: %p: did not show any activity"
432  " during the last %d mins: aborting\n", xs, fTimeout);
433  // At this point we lost our controller: we need to abort to avoid
434  // hidden timeouts or loops
435  gSystem->Abort();
436  } else {
437  if (gDebug > 0)
438  printf("TShutdownTimer::Notify: input socket: %p: show activity"
439  " %ld secs ago\n", xs, dt / 60000);
440  }
441  }
442  // Needed for the next shot
443  Reset();
444  return kTRUE;
445 }
446 
447 //--- Synchronous timer used to reap children processes change of state ------//
448 ////////////////////////////////////////////////////////////////////////////////
449 /// Destructor
450 
451 TReaperTimer::~TReaperTimer()
452 {
453  if (fChildren) {
454  fChildren->SetOwner(kTRUE);
455  delete fChildren;
456  fChildren = 0;
457  }
458 }
459 
460 ////////////////////////////////////////////////////////////////////////////////
461 /// Add an entry for 'pid' in the internal list
462 
463 void TReaperTimer::AddPid(Int_t pid)
464 {
465  if (pid > 0) {
466  if (!fChildren)
467  fChildren = new TList;
468  TString spid;
469  spid.Form("%d", pid);
470  fChildren->Add(new TParameter<Int_t>(spid.Data(), pid));
471  TurnOn();
472  }
473 }
474 
475 ////////////////////////////////////////////////////////////////////////////////
476 /// Check if any of the registered children has changed its state.
477 /// Unregister those that are gone.
478 
479 Bool_t TReaperTimer::Notify()
480 {
481  if (fChildren) {
482  TIter nxp(fChildren);
483  TParameter<Int_t> *p = 0;
484  while ((p = (TParameter<Int_t> *)nxp())) {
485  int status;
486 #ifndef WIN32
487  pid_t pid;
488  do {
489  pid = waitpid(p->GetVal(), &status, WNOHANG);
490  } while (pid < 0 && errno == EINTR);
491 #else
492  intptr_t pid;
493  pid = _cwait(&status, (intptr_t)p->GetVal(), 0);
494 #endif
495  if (pid > 0 && pid == p->GetVal()) {
496  // Remove from the list
497  fChildren->Remove(p);
498  delete p;
499  }
500  }
501  }
502 
503  // Stop the timer if no children
504  if (!fChildren || fChildren->GetSize() <= 0) {
505  Stop();
506  } else {
507  // Needed for the next shot
508  Reset();
509  }
510  return kTRUE;
511 }
512 
513 //--- Special timer to terminate idle sessions ----------------------------//
514 ////////////////////////////////////////////////////////////////////////////////
515 /// Handle expiration of the idle timer. The session will just be terminated.
516 
517 Bool_t TIdleTOTimer::Notify()
518 {
519  Info ("Notify", "session idle for more then %lld secs: terminating", Long64_t(fTime)/1000);
520 
521  if (fProofServ) {
522  // Set the status to timed-out
523  Int_t uss_rc = -1;
524  if ((uss_rc = fProofServ->UpdateSessionStatus(4)) != 0)
525  Warning("Notify", "problems updating session status (errno: %d)", -uss_rc);
526  // Send a terminate request
527  TString msg;
528  if (fProofServ->GetProtocol() < 29) {
529  msg.Form("\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n"
530  "// Please IGNORE any error message possibly displayed below\n//",
531  gSystem->HostName(), fProofServ->GetSessionTag(), Long64_t(fTime)/1000);
532  } else {
533  msg.Form("\n//\n// PROOF session at %s (%s) terminated because idle for more than %lld secs\n//",
534  gSystem->HostName(), fProofServ->GetSessionTag(), Long64_t(fTime)/1000);
535  }
536  fProofServ->SendAsynMessage(msg.Data());
537  fProofServ->Terminate(0);
538  Reset();
539  Stop();
540  } else {
541  Warning("Notify", "fProofServ undefined!");
542  Start(-1, kTRUE);
543  }
544  return kTRUE;
545 }
546 
547 ClassImp(TProofServ);
548 
549 // Hook to the constructor. This is needed to avoid using the plugin manager
550 // which may create problems in multi-threaded environments.
551 extern "C" {
552  TApplication *GetTProofServ(Int_t *argc, char **argv, FILE *flog)
553  { return new TProofServ(argc, argv, flog); }
554 }
555 
556 ////////////////////////////////////////////////////////////////////////////////
557 /// Main constructor. Create an application environment. The TProofServ
558 /// environment provides an eventloop via inheritance of TApplication.
559 /// Actual server creation work is done in CreateServer() to allow
560 /// overloading.
561 
562 TProofServ::TProofServ(Int_t *argc, char **argv, FILE *flog)
563  : TApplication("proofserv", argc, argv, 0, -1)
564 {
565  // If test and tty, we are done
566  Bool_t xtest = (argc && *argc == 1) ? kTRUE : kFALSE;
567  if (xtest) {
568  Printf("proofserv: command line testing: OK");
569  exit(0);
570  }
571 
572  // Read session specific rootrc file
573  TString rcfile = gSystem->Getenv("ROOTRCFILE") ? gSystem->Getenv("ROOTRCFILE")
574  : "session.rootrc";
575  if (!gSystem->AccessPathName(rcfile, kReadPermission))
576  gEnv->ReadFile(rcfile, kEnvChange);
577 
578  // Upper limit on Virtual Memory (in kB)
579  fgVirtMemMax = gEnv->GetValue("Proof.VirtMemMax",-1);
580  if (fgVirtMemMax < 0 && gSystem->Getenv("PROOF_VIRTMEMMAX")) {
581  Long_t mmx = strtol(gSystem->Getenv("PROOF_VIRTMEMMAX"), 0, 10);
582  if (mmx < kMaxLong && mmx > 0)
583  fgVirtMemMax = mmx * 1024;
584  }
585  // Old variable for backward compatibility
586  if (fgVirtMemMax < 0 && gSystem->Getenv("ROOTPROOFASHARD")) {
587  Long_t mmx = strtol(gSystem->Getenv("ROOTPROOFASHARD"), 0, 10);
588  if (mmx < kMaxLong && mmx > 0)
589  fgVirtMemMax = mmx * 1024;
590  }
591  // Upper limit on Resident Memory (in kB)
592  fgResMemMax = gEnv->GetValue("Proof.ResMemMax",-1);
593  if (fgResMemMax < 0 && gSystem->Getenv("PROOF_RESMEMMAX")) {
594  Long_t mmx = strtol(gSystem->Getenv("PROOF_RESMEMMAX"), 0, 10);
595  if (mmx < kMaxLong && mmx > 0)
596  fgResMemMax = mmx * 1024;
597  }
598  // Thresholds for warnings and stop processing
599  fgMemStop = gEnv->GetValue("Proof.MemStop", 0.95);
600  fgMemHWM = gEnv->GetValue("Proof.MemHWM", 0.80);
601  if (fgVirtMemMax > 0 || fgResMemMax > 0) {
602  if ((fgMemStop < 0.) || (fgMemStop > 1.)) {
603  Warning("TProofServ", "requested memory fraction threshold to stop processing"
604  " (MemStop) out of range [0,1] - ignoring");
605  fgMemStop = 0.95;
606  }
607  if ((fgMemHWM < 0.) || (fgMemHWM > fgMemStop)) {
608  Warning("TProofServ", "requested memory fraction threshold for warning and finer monitoring"
609  " (MemHWM) out of range [0,MemStop] - ignoring");
610  fgMemHWM = 0.80;
611  }
612  }
613 
614  // Wait (loop) to allow debugger to connect
615  Bool_t test = (argc && *argc >= 4 && !strcmp(argv[3], "test")) ? kTRUE : kFALSE;
616  if ((gEnv->GetValue("Proof.GdbHook",0) == 3 && !test) ||
617  (gEnv->GetValue("Proof.GdbHook",0) == 4 && test)) {
618  while (gProofServDebug)
619  ;
620  }
621 
622  // Test instance
623  if (argc && *argc >= 4)
624  if (!strcmp(argv[3], "test"))
625  fService = "prooftest";
626 
627  // crude check on number of arguments
628  if (argc && *argc < 2) {
629  Error("TProofServ", "Must have at least 1 arguments (see proofd).");
630  exit(1);
631  }
632 
633  // Set global to this instance
634  gProofServ = this;
635 
636  // Log control flags
637  fSendLogToMaster = kFALSE;
638 
639  // Abort on higher than kSysError's and set error handler
640  gErrorAbortLevel = kSysError + 1;
641  SetErrorHandlerFile(stderr);
642  SetErrorHandler(ErrorHandler);
643 
644  fNcmd = 0;
645  fGroupPriority = 100;
646  fInterrupt = kFALSE;
647  fProtocol = 0;
648  fOrdinal = gEnv->GetValue("ProofServ.Ordinal", "-1");
649  fGroupId = -1;
650  fGroupSize = 0;
651  fRealTime = 0.0;
652  fCpuTime = 0.0;
653  fProof = 0;
654  fPlayer = 0;
655  fSocket = 0;
656 
657  fTotSessions = -1;
658  fActSessions = -1;
659  fEffSessions = -1.;
660  fPackMgr = 0;
661 
662  fLogFile = flog;
663  fLogFileDes = -1;
664 
665  fArchivePath = "";
666  // Init lockers
667  fCacheLock = 0;
668  fQueryLock = 0;
669 
670  fQMgr = 0;
671  fWaitingQueries = new TList;
672  fIdle = kTRUE;
673  fQuerySeqNum = -1;
674 
675  fQueuedMsg = new TList;
676 
677  fRealTimeLog = kFALSE;
678 
679  fShutdownTimer = 0;
680  fReaperTimer = 0;
681  fIdleTOTimer = 0;
682 
683  fDataSetManager = 0; // Initialized in Setup()
684  fDataSetStgRepo = 0; // Initialized in Setup()
685 
686  fInputHandler = 0;
687 
688  // Quotas disabled by default
689  fMaxQueries = -1;
690  fMaxBoxSize = -1;
691  fHWMBoxSize = -1;
692 
693  // Submerger quantities
694  fMergingSocket = 0;
695  fMergingMonitor = 0;
696  fMergedWorkers = 0;
697 
698  // Bit to flg high-memory footprint
699  ResetBit(TProofServ::kHighMemory);
700 
701  // Max message size
702  fMsgSizeHWM = gEnv->GetValue("ProofServ.MsgSizeHWM", 1000000);
703 
704  // Message compression
705  fCompressMsg = gEnv->GetValue("ProofServ.CompressMessage", 0);
706 
707  gProofDebugLevel = gEnv->GetValue("Proof.DebugLevel",0);
708  fLogLevel = gProofDebugLevel;
709 
710  gProofDebugMask = (TProofDebug::EProofDebugMask) gEnv->GetValue("Proof.DebugMask",~0);
711  if (gProofDebugLevel > 0)
712  Info("TProofServ", "DebugLevel %d Mask 0x%x", gProofDebugLevel, gProofDebugMask);
713 
714  // Max log file size
715  fLogFileMaxSize = -1;
716  TString logmx = gEnv->GetValue("ProofServ.LogFileMaxSize", "");
717  if (!logmx.IsNull()) {
718  Long64_t xf = 1;
719  if (!logmx.IsDigit()) {
720  if (logmx.EndsWith("K")) {
721  xf = 1024;
722  logmx.Remove(TString::kTrailing, 'K');
723  } else if (logmx.EndsWith("M")) {
724  xf = 1024*1024;
725  logmx.Remove(TString::kTrailing, 'M');
726  } else if (logmx.EndsWith("G")) {
727  xf = 1024*1024*1024;
728  logmx.Remove(TString::kTrailing, 'G');
729  }
730  }
731  if (logmx.IsDigit()) {
732  fLogFileMaxSize = logmx.Atoi() * xf;
733  if (fLogFileMaxSize > 0)
734  Info("TProofServ", "keeping the log file size within %lld bytes", fLogFileMaxSize);
735  } else {
736  logmx = gEnv->GetValue("ProofServ.LogFileMaxSize", "");
737  Warning("TProofServ", "bad formatted log file size limit ignored: '%s'", logmx.Data());
738  }
739  }
740 
741  // Parse options
742  GetOptions(argc, argv);
743 
744  // Default prefix in the form '<role>-<ordinal>'
745  fPrefix = (IsMaster() ? "Mst-" : "Wrk-");
746  if (test) fPrefix = "Test";
747  if (fOrdinal != "-1")
748  fPrefix += fOrdinal;
749  TProofServLogHandler::SetDefaultPrefix(fPrefix);
750 
751  // Syslog control
752  TString slog = gEnv->GetValue("ProofServ.LogToSysLog", "");
753  if (!(slog.IsNull())) {
754  if (slog.IsDigit()) {
755  fgLogToSysLog = slog.Atoi();
756  } else {
757  char c = (slog[0] == 'M' || slog[0] == 'm') ? 'm' : 'a';
758  c = (slog[0] == 'W' || slog[0] == 'w') ? 'w' : c;
759  Bool_t dosyslog = ((c == 'm' && IsMaster()) ||
760  (c == 'w' && !IsMaster()) || c == 'a') ? kTRUE : kFALSE;
761  if (dosyslog) {
762  slog.Remove(0,1);
763  if (slog.IsDigit()) fgLogToSysLog = slog.Atoi();
764  if (fgLogToSysLog <= 0)
765  Warning("TProofServ", "request for syslog logging ineffective!");
766  }
767  }
768  }
769  // Initialize proper service if required
770  if (fgLogToSysLog > 0) {
771  fgSysLogService = (IsMaster()) ? "proofm" : "proofw";
772  if (fOrdinal != "-1") fgSysLogService += TString::Format("-%s", fOrdinal.Data());
773  gSystem->Openlog(fgSysLogService, kLogPid | kLogCons, kLogLocal5);
774  }
775 
776  // Enable optimized sending of streamer infos to use embedded backward/forward
777  // compatibility support between different ROOT versions and different versions of
778  // users classes
779  Bool_t enableSchemaEvolution = gEnv->GetValue("Proof.SchemaEvolution",1);
780  if (enableSchemaEvolution) {
781  TMessage::EnableSchemaEvolutionForAll();
782  } else {
783  Info("TProofServ", "automatic schema evolution in TMessage explicitly disabled");
784  }
785 }
786 
787 ////////////////////////////////////////////////////////////////////////////////
788 /// Finalize the server setup. If master, create the TProof instance to talk
789 /// to the worker or submaster nodes.
790 /// Return 0 on success, -1 on error
791 
792 Int_t TProofServ::CreateServer()
793 {
794  // Get socket to be used (setup in proofd)
795  TString opensock = gSystem->Getenv("ROOTOPENSOCK");
796  if (opensock.Length() <= 0)
797  opensock = gEnv->GetValue("ProofServ.OpenSock", "-1");
798  Int_t sock = opensock.Atoi();
799  if (sock <= 0) {
800  Fatal("CreateServer", "Invalid socket descriptor number (%d)", sock);
801  return -1;
802  }
803  fSocket = new TSocket(sock);
804 
805  // Set compression level, if any
806  fSocket->SetCompressionSettings(fCompressMsg);
807 
808  // debug hooks
809  if (IsMaster()) {
810  // wait (loop) in master to allow debugger to connect
811  if (gEnv->GetValue("Proof.GdbHook",0) == 1) {
812  while (gProofServDebug)
813  ;
814  }
815  } else {
816  // wait (loop) in slave to allow debugger to connect
817  if (gEnv->GetValue("Proof.GdbHook",0) == 2) {
818  while (gProofServDebug)
819  ;
820  }
821  }
822 
823  if (gProofDebugLevel > 0)
824  Info("CreateServer", "Service %s ConfDir %s IsMaster %d\n",
825  GetService(), GetConfDir(), (Int_t)fMasterServ);
826 
827  if (Setup() != 0) {
828  // Setup failure
829  LogToMaster();
830  SendLogFile();
831  Terminate(0);
832  return -1;
833  }
834 
835  // Set the default prefix in the form '<role>-<ordinal>' (it was already done
836  // in the constructor, but for standard PROOF the ordinal number is only set in
837  // Setup(), so we need to do it again here)
838  TString pfx = (IsMaster() ? "Mst-" : "Wrk-");
839  pfx += GetOrdinal();
840  TProofServLogHandler::SetDefaultPrefix(pfx);
841 
842  if (!fLogFile) {
843  RedirectOutput();
844  // If for some reason we failed setting a redirection file for the logs
845  // we cannot continue
846  if (!fLogFile || (fLogFileDes = fileno(fLogFile)) < 0) {
847  LogToMaster();
848  SendLogFile(-98);
849  Terminate(0);
850  return -1;
851  }
852  } else {
853  // Use the file already open by pmain
854  if ((fLogFileDes = fileno(fLogFile)) < 0) {
855  LogToMaster();
856  SendLogFile(-98);
857  Terminate(0);
858  return -1;
859  }
860  }
861 
862  // Send message of the day to the client
863  if (IsMaster()) {
864  if (CatMotd() == -1) {
865  LogToMaster();
866  SendLogFile(-99);
867  Terminate(0);
868  return -1;
869  }
870  }
871 
872  // Everybody expects std::iostream to be available, so load it...
873  ProcessLine("#include <iostream>", kTRUE);
874  ProcessLine("#include <string>",kTRUE); // for std::string std::iostream.
875 
876  // The following libs are also useful to have, make sure they are loaded...
877  //gROOT->LoadClass("TMinuit", "Minuit");
878  //gROOT->LoadClass("TPostScript", "Postscript");
879 
880  // Load user functions
881  const char *logon;
882  logon = gEnv->GetValue("Proof.Load", (char *)0);
883  if (logon) {
884  char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
885  if (mac)
886  ProcessLine(TString::Format(".L %s", logon), kTRUE);
887  delete [] mac;
888  }
889 
890  // Execute logon macro
891  logon = gEnv->GetValue("Proof.Logon", (char *)0);
892  if (logon && !NoLogOpt()) {
893  char *mac = gSystem->Which(TROOT::GetMacroPath(), logon, kReadPermission);
894  if (mac)
895  ProcessFile(logon);
896  delete [] mac;
897  }
898 
899  // Save current interpreter context
900  gInterpreter->SaveContext();
901  gInterpreter->SaveGlobalsContext();
902 
903  // Install interrupt and message input handlers
904  gSystem->AddSignalHandler(new TProofServTerminationHandler(this));
905  gSystem->AddSignalHandler(new TProofServInterruptHandler(this));
906  fInputHandler = new TProofServInputHandler(this, sock);
907  gSystem->AddFileHandler(fInputHandler);
908 
909  // if master, start slave servers
910  if (IsMaster()) {
911  TString master = "proof://__master__";
912  TInetAddress a = gSystem->GetSockName(sock);
913  if (a.IsValid()) {
914  master += ":";
915  master += a.GetPort();
916  }
917 
918  // Get plugin manager to load appropriate TProof from
919  TPluginManager *pm = gROOT->GetPluginManager();
920  if (!pm) {
921  Error("CreateServer", "no plugin manager found");
922  SendLogFile(-99);
923  Terminate(0);
924  return -1;
925  }
926 
927  // Find the appropriate handler
928  TPluginHandler *h = pm->FindHandler("TProof", fConfFile);
929  if (!h) {
930  Error("CreateServer", "no plugin found for TProof with a"
931  " config file of '%s'", fConfFile.Data());
932  SendLogFile(-99);
933  Terminate(0);
934  return -1;
935  }
936 
937  // load the plugin
938  if (h->LoadPlugin() == -1) {
939  Error("CreateServer", "plugin for TProof could not be loaded");
940  SendLogFile(-99);
941  Terminate(0);
942  return -1;
943  }
944 
945  // make instance of TProof
946  fProof = reinterpret_cast<TProof*>(h->ExecPlugin(5, master.Data(),
947  fConfFile.Data(),
948  GetConfDir(),
949  fLogLevel, 0));
950  if (!fProof || !fProof->IsValid()) {
951  Error("CreateServer", "plugin for TProof could not be executed");
952  SafeDelete(fProof);
953  SendLogFile(-99);
954  Terminate(0);
955  return -1;
956  }
957  // Find out if we are a master in direct contact only with workers
958  fEndMaster = fProof->IsEndMaster();
959 
960  SendLogFile();
961  }
962 
963  // Setup the shutdown timer
964  if (!fShutdownTimer) {
965  // Check activity on socket every 5 mins
966  fShutdownTimer = new TShutdownTimer(this, 300000);
967  fShutdownTimer->Start(-1, kFALSE);
968  }
969 
970  // Check if schema evolution is effective: clients running versions <=17 do not
971  // support that: send a warning message
972  if (fProtocol <= 17) {
973  TString msg;
974  msg.Form("Warning: client version is too old: automatic schema evolution is ineffective.\n"
975  " This may generate compatibility problems between streamed objects.\n"
976  " The advise is to move to ROOT >= 5.21/02 .");
977  SendAsynMessage(msg.Data());
978  }
979 
980  // Setup the idle timer
981  if (IsMaster() && !fIdleTOTimer) {
982  // Check activity on socket every 5 mins
983  Int_t idle_to = gEnv->GetValue("ProofServ.IdleTimeout", -1);
984  if (idle_to > 0) {
985  fIdleTOTimer = new TIdleTOTimer(this, idle_to * 1000);
986  fIdleTOTimer->Start(-1, kTRUE);
987  if (gProofDebugLevel > 0)
988  Info("CreateServer", " idle timer started (%d secs)", idle_to);
989  } else if (gProofDebugLevel > 0) {
990  Info("CreateServer", " idle timer not started (no idle timeout requested)");
991  }
992  }
993 
994  // Done
995  return 0;
996 }
997 
998 ////////////////////////////////////////////////////////////////////////////////
999 /// Cleanup. Not really necessary since after this dtor there is no
1000 /// life anyway.
1001 
1002 TProofServ::~TProofServ()
1003 {
1004  SafeDelete(fWaitingQueries);
1005  SafeDelete(fSocket);
1006  SafeDelete(fPackMgr);
1007  SafeDelete(fCacheLock);
1008  SafeDelete(fQueryLock);
1009  SafeDelete(fDataSetManager);
1010  SafeDelete(fDataSetStgRepo);
1011  close(fLogFileDes);
1012 }
1013 
1014 ////////////////////////////////////////////////////////////////////////////////
1015 /// Print message of the day (in the file pointed by the env PROOFMOTD
1016 /// or from fConfDir/etc/proof/motd). The motd is not shown more than
1017 /// once a day. If the file pointed by env PROOFNOPROOF exists (or the
1018 /// file fConfDir/etc/proof/noproof exists), show its contents and close
1019 /// the connection.
1020 
1021 Int_t TProofServ::CatMotd()
1022 {
1023  TString lastname;
1024  FILE *motd;
1025  Bool_t show = kFALSE;
1026 
1027  // If we are disabled just print the message and close the connection
1028  TString motdname(GetConfDir());
1029  // The env variable PROOFNOPROOF allows to put the file in an alternative
1030  // location not overwritten by a new installation
1031  if (gSystem->Getenv("PROOFNOPROOF")) {
1032  motdname = gSystem->Getenv("PROOFNOPROOF");
1033  } else {
1034  motdname += "/etc/proof/noproof";
1035  }
1036  if ((motd = fopen(motdname, "r"))) {
1037  Int_t c;
1038  printf("\n");
1039  while ((c = getc(motd)) != EOF)
1040  putchar(c);
1041  fclose(motd);
1042  printf("\n");
1043 
1044  return -1;
1045  }
1046 
1047  // get last modification time of the file ~/proof/.prooflast
1048  lastname = TString(GetWorkDir()) + "/.prooflast";
1049  char *last = gSystem->ExpandPathName(lastname.Data());
1050  Long64_t size;
1051  Long_t id, flags, modtime, lasttime = 0;
1052  if (gSystem->GetPathInfo(last, &id, &size, &flags, &lasttime) == 1)
1053  lasttime = 0;
1054 
1055  // show motd at least once per day
1056  if (time(0) - lasttime > (time_t)86400)
1057  show = kTRUE;
1058 
1059  // The env variable PROOFMOTD allows to put the file in an alternative
1060  // location not overwritten by a new installation
1061  if (gSystem->Getenv("PROOFMOTD")) {
1062  motdname = gSystem->Getenv("PROOFMOTD");
1063  } else {
1064  motdname = GetConfDir();
1065  motdname += "/etc/proof/motd";
1066  }
1067  if (gSystem->GetPathInfo(motdname, &id, &size, &flags, &modtime) == 0) {
1068  if (modtime > lasttime || show) {
1069  if ((motd = fopen(motdname, "r"))) {
1070  Int_t c;
1071  printf("\n");
1072  while ((c = getc(motd)) != EOF)
1073  putchar(c);
1074  fclose(motd);
1075  printf("\n");
1076  }
1077  }
1078  }
1079 
1080  if (lasttime)
1081  gSystem->Unlink(last);
1082  Int_t fd = creat(last, 0600);
1083  if (fd >= 0) close(fd);
1084  delete [] last;
1085 
1086  return 0;
1087 }
1088 
1089 ////////////////////////////////////////////////////////////////////////////////
1090 /// Get object with name "name;cycle" (e.g. "aap;2") from master or client.
1091 /// This method is called by TDirectory::Get() in case the object can not
1092 /// be found locally.
1093 
1094 TObject *TProofServ::Get(const char *namecycle)
1095 {
1096  if (fSocket->Send(namecycle, kPROOF_GETOBJECT) < 0) {
1097  Error("Get", "problems sending request");
1098  return (TObject *)0;
1099  }
1100 
1101  TObject *idcur = 0;
1102 
1103  Bool_t notdone = kTRUE;
1104  while (notdone) {
1105  TMessage *mess = 0;
1106  if (fSocket->Recv(mess) < 0)
1107  return 0;
1108  Int_t what = mess->What();
1109  if (what == kMESS_OBJECT) {
1110  idcur = mess->ReadObject(mess->GetClass());
1111  notdone = kFALSE;
1112  } else {
1113  Int_t xrc = HandleSocketInput(mess, kFALSE);
1114  if (xrc == -1) {
1115  Error("Get", "command %d cannot be executed while processing", what);
1116  } else if (xrc == -2) {
1117  Error("Get", "unknown command %d ! Protocol error?", what);
1118  }
1119  }
1120  delete mess;
1121  }
1122 
1123  return idcur;
1124 }
1125 
1126 ////////////////////////////////////////////////////////////////////////////////
1127 /// Reset the compute time
1128 
1129 void TProofServ::RestartComputeTime()
1130 {
1131  fCompute.Stop();
1132  if (fPlayer) {
1133  TProofProgressStatus *status = fPlayer->GetProgressStatus();
1134  if (status) status->SetLearnTime(fCompute.RealTime());
1135  Info("RestartComputeTime", "compute time restarted after %f secs (%d entries)",
1136  fCompute.RealTime(), fPlayer->GetLearnEntries());
1137  }
1138  fCompute.Start(kFALSE);
1139 }
1140 
1141 ////////////////////////////////////////////////////////////////////////////////
1142 /// Get next range of entries to be processed on this server.
1143 
1144 TDSetElement *TProofServ::GetNextPacket(Long64_t totalEntries)
1145 {
1146  Long64_t bytesRead = 0;
1147 
1148  if (gPerfStats) bytesRead = gPerfStats->GetBytesRead();
1149 
1150  if (fCompute.Counter() > 0)
1151  fCompute.Stop();
1152 
1153  TMessage req(kPROOF_GETPACKET);
1154  Double_t cputime = fCompute.CpuTime();
1155  Double_t realtime = fCompute.RealTime();
1156 
1157  if (fProtocol > 18) {
1158  req << fLatency.RealTime();
1159  TProofProgressStatus *status = 0;
1160  if (fPlayer) {
1161  fPlayer->UpdateProgressInfo();
1162  status = fPlayer->GetProgressStatus();
1163  } else {
1164  Error("GetNextPacket", "no progress status object");
1165  return 0;
1166  }
1167  // the CPU and wallclock proc times are kept in the TProofServ and here
1168  // added to the status object in the fPlayer.
1169  if (status->GetEntries() > 0) {
1170  PDB(kLoop, 2) status->Print(GetOrdinal());
1171  status->IncProcTime(realtime);
1172  status->IncCPUTime(cputime);
1173  }
1174  // Flag cases with problems in opening files
1175  if (totalEntries < 0) status->SetBit(TProofProgressStatus::kFileNotOpen);
1176  // Add to the message
1177  req << status;
1178  // Send tree cache information
1179  Long64_t cacheSize = (fPlayer) ? fPlayer->GetCacheSize() : -1;
1180  Int_t learnent = (fPlayer) ? fPlayer->GetLearnEntries() : -1;
1181  req << cacheSize << learnent;
1182 
1183  // Sent over the number of entries in the file, used by packetizer do not relying
1184  // on initial validation. Also, -1 means that the file could not be open, which is
1185  // used to flag files as missing
1186  req << totalEntries;
1187 
1188  // Send the time spent in saving the partial result to file
1189  if (fProtocol > 34) req << fSaveOutput.RealTime();
1190 
1191  PDB(kLoop, 1) {
1192  PDB(kLoop, 2) status->Print();
1193  Info("GetNextPacket","cacheSize: %lld, learnent: %d", cacheSize, learnent);
1194  }
1195  // Reset the status bits
1196  status->ResetBit(TProofProgressStatus::kFileNotOpen);
1197  status->ResetBit(TProofProgressStatus::kFileCorrupted);
1198  status = 0; // status is owned by the player.
1199  } else {
1200  req << fLatency.RealTime() << realtime << cputime
1201  << bytesRead << totalEntries;
1202  if (fPlayer)
1203  req << fPlayer->GetEventsProcessed();
1204  }
1205 
1206  fLatency.Start();
1207  Int_t rc = fSocket->Send(req);
1208  if (rc <= 0) {
1209  Error("GetNextPacket","Send() failed, returned %d", rc);
1210  return 0;
1211  }
1212 
1213  // Save the current output
1214  if (fPlayer) {
1215  fSaveOutput.Start();
1216  if (fPlayer->SavePartialResults(kFALSE) < 0)
1217  Warning("GetNextPacket", "problems saving partial results");
1218  fSaveOutput.Stop();
1219  }
1220 
1221  TDSetElement *e = 0;
1222  Bool_t notdone = kTRUE;
1223  while (notdone) {
1224 
1225  TMessage *mess;
1226  if ((rc = fSocket->Recv(mess)) <= 0) {
1227  fLatency.Stop();
1228  Error("GetNextPacket","Recv() failed, returned %d", rc);
1229  return 0;
1230  }
1231 
1232  Int_t xrc = 0;
1233  TString file, dir, obj;
1234 
1235  Int_t what = mess->What();
1236 
1237  switch (what) {
1238  case kPROOF_GETPACKET:
1239 
1240  fLatency.Stop();
1241  (*mess) >> e;
1242  if (e != 0) {
1243  fCompute.Start();
1244  PDB(kLoop, 2) Info("GetNextPacket", "'%s' '%s' '%s' %lld %lld",
1245  e->GetFileName(), e->GetDirectory(),
1246  e->GetObjName(), e->GetFirst(),e->GetNum());
1247  } else {
1248  PDB(kLoop, 2) Info("GetNextPacket", "Done");
1249  }
1250  notdone = kFALSE;
1251  break;
1252 
1253  case kPROOF_STOPPROCESS:
1254  // if a kPROOF_STOPPROCESS message is returned to kPROOF_GETPACKET
1255  // GetNextPacket() will return 0 and the TPacketizer and hence
1256  // TEventIter will be stopped
1257  fLatency.Stop();
1258  PDB(kLoop, 2) Info("GetNextPacket:kPROOF_STOPPROCESS","received");
1259  break;
1260 
1261  default:
1262  xrc = HandleSocketInput(mess, kFALSE);
1263  if (xrc == -1) {
1264  Error("GetNextPacket", "command %d cannot be executed while processing", what);
1265  } else if (xrc == -2) {
1266  Error("GetNextPacket", "unknown command %d ! Protocol error?", what);
1267  }
1268  break;
1269  }
1270 
1271  delete mess;
1272 
1273  }
1274 
1275  // Done
1276  return e;
1277 }
1278 
1279 ////////////////////////////////////////////////////////////////////////////////
1280 /// Get and handle command line options. Fixed format:
1281 /// "proofserv"|"proofslave" <confdir>
1282 
1283 void TProofServ::GetOptions(Int_t *argc, char **argv)
1284 {
1285  Bool_t xtest = (argc && *argc > 3 && !strcmp(argv[3], "test")) ? kTRUE : kFALSE;
1286 
1287  // If test and tty
1288  if (xtest && !(isatty(0) == 0 || isatty(1) == 0)) {
1289  Printf("proofserv: command line testing: OK");
1290  exit(0);
1291  }
1292 
1293  if (!argc || (argc && *argc <= 1)) {
1294  Fatal("GetOptions", "Must be started from proofd with arguments");
1295  exit(1);
1296  }
1297 
1298  if (!strcmp(argv[1], "proofserv")) {
1299  fMasterServ = kTRUE;
1300  fEndMaster = kTRUE;
1301  } else if (!strcmp(argv[1], "proofslave")) {
1302  fMasterServ = kFALSE;
1303  fEndMaster = kFALSE;
1304  } else {
1305  Fatal("GetOptions", "Must be started as 'proofserv' or 'proofslave'");
1306  exit(1);
1307  }
1308 
1309  fService = argv[1];
1310 
1311  // Confdir
1312  if (!(gSystem->Getenv("ROOTCONFDIR"))) {
1313  Fatal("GetOptions", "ROOTCONFDIR shell variable not set");
1314  exit(1);
1315  }
1316  fConfDir = gSystem->Getenv("ROOTCONFDIR");
1317 }
1318 
1319 ////////////////////////////////////////////////////////////////////////////////
1320 /// Handle input coming from the client or from the master server.
1321 
1322 void TProofServ::HandleSocketInput()
1323 {
1324  // The idle timeout guard: stops the timer and restarts when we return from here
1325  TIdleTOTimerGuard itg(fIdleTOTimer);
1326 
1327  Bool_t all = (fgRecursive > 0) ? kFALSE : kTRUE;
1328  fgRecursive++;
1329 
1330  TMessage *mess;
1331  Int_t rc = 0;
1332  TString exmsg;
1333 
1334  // Check log file length (before the action, so we have the chance to keep the
1335  // latest logs)
1336  TruncateLogFile();
1337 
1338  try {
1339 
1340  // Get message
1341  if (fSocket->Recv(mess) <= 0 || !mess) {
1342  // Pending: do something more intelligent here
1343  // but at least get a message in the log file
1344  Error("HandleSocketInput", "retrieving message from input socket");
1345  Terminate(0);
1346  return;
1347  }
1348  Int_t what = mess->What();
1349  PDB(kCollect, 1)
1350  Info("HandleSocketInput", "got type %d from '%s'", what, fSocket->GetTitle());
1351 
1352  fNcmd++;
1353 
1354  if (fProof) fProof->SetActive();
1355 
1356  Bool_t doit = kTRUE;
1357 
1358  while (doit) {
1359 
1360  // Process the message
1361  rc = HandleSocketInput(mess, all);
1362  if (rc < 0) {
1363  TString emsg;
1364  if (rc == -1) {
1365  emsg.Form("HandleSocketInput: command %d cannot be executed while processing", what);
1366  } else if (rc == -3) {
1367  emsg.Form("HandleSocketInput: message %d undefined! Protocol error?", what);
1368  } else {
1369  emsg.Form("HandleSocketInput: unknown command %d! Protocol error?", what);
1370  }
1371  SendAsynMessage(emsg.Data());
1372  } else if (rc == 2) {
1373  // Add to the queue
1374  fQueuedMsg->Add(mess);
1375  PDB(kGlobal, 1)
1376  Info("HandleSocketInput", "message of type %d enqueued; sz: %d",
1377  what, fQueuedMsg->GetSize());
1378  mess = 0;
1379  }
1380 
1381  // Still something to do?
1382  doit = 0;
1383  if (fgRecursive == 1 && fQueuedMsg->GetSize() > 0) {
1384  // Add to the queue
1385  PDB(kCollect, 1)
1386  Info("HandleSocketInput", "processing enqueued message of type %d; left: %d",
1387  what, fQueuedMsg->GetSize());
1388  all = 1;
1389  SafeDelete(mess);
1390  mess = (TMessage *) fQueuedMsg->First();
1391  if (mess) fQueuedMsg->Remove(mess);
1392  doit = 1;
1393  }
1394  }
1395 
1396  } catch (std::bad_alloc &) {
1397  // Memory allocation problem:
1398  exmsg.Form("caught exception 'bad_alloc' (memory leak?) %s %lld",
1399  fgLastMsg.Data(), fgLastEntry);
1400  } catch (std::exception &exc) {
1401  // Standard exception caught
1402  exmsg.Form("caught standard exception '%s' %s %lld",
1403  exc.what(), fgLastMsg.Data(), fgLastEntry);
1404  } catch (int i) {
1405  // Other exception caught
1406  exmsg.Form("caught exception throwing %d %s %lld",
1407  i, fgLastMsg.Data(), fgLastEntry);
1408  } catch (const char *str) {
1409  // Other exception caught
1410  exmsg.Form("caught exception throwing '%s' %s %lld",
1411  str, fgLastMsg.Data(), fgLastEntry);
1412  } catch (...) {
1413  // Caught other exception
1414  exmsg.Form("caught exception <unknown> %s %lld",
1415  fgLastMsg.Data(), fgLastEntry);
1416  }
1417 
1418  // Terminate on exception
1419  if (!exmsg.IsNull()) {
1420  // Save info in the log file too
1421  Error("HandleSocketInput", "%s", exmsg.Data());
1422  // Try to warn the user
1423  SendAsynMessage(TString::Format("%s: %s", GetOrdinal(), exmsg.Data()));
1424  // Terminate
1425  Terminate(0);
1426  }
1427 
1428  // Terminate also if a high memory footprint was detected before the related
1429  // exception was thrwon
1430  if (TestBit(TProofServ::kHighMemory)) {
1431  // Save info in the log file too
1432  exmsg.Form("high-memory footprint detected during Process(...) - terminating");
1433  Error("HandleSocketInput", "%s", exmsg.Data());
1434  // Try to warn the user
1435  SendAsynMessage(TString::Format("%s: %s", GetOrdinal(), exmsg.Data()));
1436  // Terminate
1437  Terminate(0);
1438  }
1439 
1440  fgRecursive--;
1441 
1442  if (fProof) {
1443  // If something wrong went on during processing and we do not have
1444  // any worker anymore, we shutdown this session
1445  Bool_t masterOnly = gEnv->GetValue("Proof.MasterOnly", kFALSE);
1446  Bool_t dynamicStartup = gEnv->GetValue("Proof.DynamicStartup", kFALSE);
1447  Int_t ngwrks = fProof->GetListOfActiveSlaves()->GetSize() + fProof->GetListOfInactiveSlaves()->GetSize();
1448  if (rc == 0 && ngwrks == 0 && !masterOnly && !dynamicStartup) {
1449  SendAsynMessage(" *** No workers left: cannot continue! Terminating ... *** ");
1450  Terminate(0);
1451  }
1452  fProof->SetActive(kFALSE);
1453  // Reset PROOF to running state
1454  fProof->SetRunStatus(TProof::kRunning);
1455  }
1456 
1457  // Cleanup
1458  SafeDelete(mess);
1459 }
1460 
1461 ////////////////////////////////////////////////////////////////////////////////
1462 /// Process input coming from the client or from the master server.
1463 /// If 'all' is kFALSE, process only those messages that can be handled
1464 /// during query processing.
1465 /// Returns -1 if the message could not be processed, <-1 if something went
1466 /// wrong. Returns 1 if the action may have changed the parallel state.
1467 /// Returns 2 if the message has to be enqueued.
1468 /// Returns 0 otherwise
1469 
1470 Int_t TProofServ::HandleSocketInput(TMessage *mess, Bool_t all)
1471 {
1472  static TStopwatch timer;
1473  char str[2048];
1474  Bool_t aborted = kFALSE;
1475 
1476  if (!mess) return -3;
1477 
1478  Int_t what = mess->What();
1479  PDB(kCollect, 1)
1480  Info("HandleSocketInput", "processing message type %d from '%s'",
1481  what, fSocket->GetTitle());
1482 
1483  timer.Start();
1484 
1485  Int_t rc = 0, lirc = 0;
1486  TString slb;
1487  TString *pslb = (fgLogToSysLog > 0) ? &slb : (TString *)0;
1488 
1489  switch (what) {
1490 
1491  case kMESS_CINT:
1492  if (all) {
1493  mess->ReadString(str, sizeof(str));
1494  // Make sure that the relevant files are available
1495  TString fn;
1496 
1497  Bool_t hasfn = TProof::GetFileInCmd(str, fn);
1498 
1499  if (IsParallel() && fProof && !fProof->UseDynamicStartup()) {
1500  fProof->SendCommand(str);
1501  } else {
1502  PDB(kGlobal, 1)
1503  Info("HandleSocketInput:kMESS_CINT", "processing: %s...", str);
1504  TString ocwd;
1505  if (hasfn) {
1506  fCacheLock->Lock();
1507  ocwd = gSystem->WorkingDirectory();
1508  gSystem->ChangeDirectory(fCacheDir.Data());
1509  }
1510  ProcessLine(str);
1511  if (hasfn) {
1512  gSystem->ChangeDirectory(ocwd);
1513  fCacheLock->Unlock();
1514  }
1515  }
1516 
1517  LogToMaster();
1518  } else {
1519  rc = -1;
1520  }
1521  SendLogFile();
1522  if (pslb) slb = str;
1523  break;
1524 
1525  case kMESS_STRING:
1526  if (all) {
1527  mess->ReadString(str, sizeof(str));
1528  } else {
1529  rc = -1;
1530  }
1531  break;
1532 
1533  case kMESS_OBJECT:
1534  if (all) {
1535  mess->ReadObject(mess->GetClass());
1536  } else {
1537  rc = -1;
1538  }
1539  break;
1540 
1541  case kPROOF_GROUPVIEW:
1542  if (all) {
1543  mess->ReadString(str, sizeof(str));
1544  // coverity[secure_coding]
1545  sscanf(str, "%d %d", &fGroupId, &fGroupSize);
1546  } else {
1547  rc = -1;
1548  }
1549  break;
1550 
1551  case kPROOF_LOGLEVEL:
1552  { UInt_t mask;
1553  mess->ReadString(str, sizeof(str));
1554  sscanf(str, "%d %u", &fLogLevel, &mask);
1555  Bool_t levelchanged = (fLogLevel != gProofDebugLevel) ? kTRUE : kFALSE;
1556  gProofDebugLevel = fLogLevel;
1557  gProofDebugMask = (TProofDebug::EProofDebugMask) mask;
1558  if (levelchanged)
1559  Info("HandleSocketInput:kPROOF_LOGLEVEL", "debug level set to %d (mask: 0x%x)",
1560  gProofDebugLevel, gProofDebugMask);
1561  if (IsMaster())
1562  fProof->SetLogLevel(fLogLevel, mask);
1563  }
1564  break;
1565 
1566  case kPROOF_PING:
1567  { if (IsMaster())
1568  fProof->Ping();
1569  // do nothing (ping is already acknowledged)
1570  }
1571  break;
1572 
1573  case kPROOF_PRINT:
1574  mess->ReadString(str, sizeof(str));
1575  Print(str);
1576  LogToMaster();
1577  SendLogFile();
1578  break;
1579 
1580  case kPROOF_RESET:
1581  if (all) {
1582  mess->ReadString(str, sizeof(str));
1583  Reset(str);
1584  } else {
1585  rc = -1;
1586  }
1587  break;
1588 
1589  case kPROOF_STATUS:
1590  Warning("HandleSocketInput:kPROOF_STATUS",
1591  "kPROOF_STATUS message is obsolete");
1592  if (fSocket->Send(fProof->GetParallel(), kPROOF_STATUS) < 0)
1593  Warning("HandleSocketInput:kPROOF_STATUS", "problem sending of request");
1594  break;
1595 
1596  case kPROOF_GETSTATS:
1597  SendStatistics();
1598  break;
1599 
1600  case kPROOF_GETPARALLEL:
1601  SendParallel();
1602  break;
1603 
1604  case kPROOF_STOP:
1605  if (all) {
1606  if (IsMaster()) {
1607  TString ord;
1608  *mess >> ord;
1609  PDB(kGlobal, 1)
1610  Info("HandleSocketInput:kPROOF_STOP", "request for worker %s", ord.Data());
1611  if (fProof) fProof->TerminateWorker(ord);
1612  } else {
1613  PDB(kGlobal, 1)
1614  Info("HandleSocketInput:kPROOF_STOP", "got request to terminate");
1615  Terminate(0);
1616  }
1617  } else {
1618  rc = -1;
1619  }
1620  break;
1621 
1622  case kPROOF_STOPPROCESS:
1623  if (all) {
1624  // this message makes only sense when the query is being processed,
1625  // however the message can also be received if the user pressed
1626  // ctrl-c, so ignore it!
1627  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_STOPPROCESS","enter");
1628  } else {
1629  Long_t timeout = -1;
1630  (*mess) >> aborted;
1631  if (fProtocol > 9)
1632  (*mess) >> timeout;
1633  PDB(kGlobal, 1)
1634  Info("HandleSocketInput:kPROOF_STOPPROCESS",
1635  "recursive mode: enter %d, %ld", aborted, timeout);
1636  if (fProof)
1637  // On the master: propagate further
1638  fProof->StopProcess(aborted, timeout);
1639  else
1640  // Worker: actually stop processing
1641  if (fPlayer)
1642  fPlayer->StopProcess(aborted, timeout);
1643  }
1644  break;
1645 
1646  case kPROOF_PROCESS:
1647  {
1648  TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
1649  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_PROCESS","enter");
1650  HandleProcess(mess, pslb);
1651  // The log file is send either in HandleProcess or HandleSubmergers.
1652  // The reason is that the order of various messages depend on the
1653  // processing mode (sync/async) and/or merging mode
1654  }
1655  break;
1656 
1657  case kPROOF_SENDOUTPUT:
1658  {
1659  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_SENDOUTPUT",
1660  "worker was asked to send output to master");
1661  Int_t sorc = 0;
1662  if (SendResults(fSocket, fPlayer->GetOutputList()) != 0) {
1663  Error("HandleSocketInput:kPROOF_SENDOUTPUT", "problems sending output list");
1664  sorc = 1;
1665  }
1666  // Signal the master that we are idle
1667  fSocket->Send(kPROOF_SETIDLE);
1668  SetIdle(kTRUE);
1669  DeletePlayer();
1670  SendLogFile(sorc);
1671  }
1672  break;
1673 
1674  case kPROOF_QUERYLIST:
1675  {
1676  HandleQueryList(mess);
1677  // Notify
1678  SendLogFile();
1679  }
1680  break;
1681 
1682  case kPROOF_REMOVE:
1683  {
1684  HandleRemove(mess, pslb);
1685  // Notify
1686  SendLogFile();
1687  }
1688  break;
1689 
1690  case kPROOF_RETRIEVE:
1691  {
1692  HandleRetrieve(mess, pslb);
1693  // Notify
1694  SendLogFile();
1695  }
1696  break;
1697 
1698  case kPROOF_ARCHIVE:
1699  {
1700  HandleArchive(mess, pslb);
1701  // Notify
1702  SendLogFile();
1703  }
1704  break;
1705 
1706  case kPROOF_MAXQUERIES:
1707  { PDB(kGlobal, 1)
1708  Info("HandleSocketInput:kPROOF_MAXQUERIES", "Enter");
1709  TMessage m(kPROOF_MAXQUERIES);
1710  m << fMaxQueries;
1711  fSocket->Send(m);
1712  // Notify
1713  SendLogFile();
1714  }
1715  break;
1716 
1717  case kPROOF_CLEANUPSESSION:
1718  if (all) {
1719  PDB(kGlobal, 1)
1720  Info("HandleSocketInput:kPROOF_CLEANUPSESSION", "Enter");
1721  TString stag;
1722  (*mess) >> stag;
1723  if (fQMgr && fQMgr->CleanupSession(stag) == 0) {
1724  Printf("Session %s cleaned up", stag.Data());
1725  } else {
1726  Printf("Could not cleanup session %s", stag.Data());
1727  }
1728  } else {
1729  rc = -1;
1730  }
1731  // Notify
1732  SendLogFile();
1733  break;
1734 
1735  case kPROOF_GETENTRIES:
1736  { PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Enter");
1737  Bool_t isTree;
1738  TString filename;
1739  TString dir;
1740  TString objname("undef");
1741  Long64_t entries = -1;
1742 
1743  if (all) {
1744  (*mess) >> isTree >> filename >> dir >> objname;
1745  PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
1746  "Report size of object %s (%s) in dir %s in file %s",
1747  objname.Data(), isTree ? "T" : "O",
1748  dir.Data(), filename.Data());
1749  entries = TDSet::GetEntries(isTree, filename, dir, objname);
1750  PDB(kGlobal, 2) Info("HandleSocketInput:kPROOF_GETENTRIES",
1751  "Found %lld %s", entries, isTree ? "entries" : "objects");
1752  } else {
1753  rc = -1;
1754  }
1755  TMessage answ(kPROOF_GETENTRIES);
1756  answ << entries << objname;
1757  SendLogFile(); // in case of error messages
1758  fSocket->Send(answ);
1759  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETENTRIES", "Done");
1760  }
1761  break;
1762 
1763  case kPROOF_CHECKFILE:
1764  if (!all && fProtocol <= 19) {
1765  // Come back later
1766  rc = 2;
1767  } else {
1768  // Handle file checking request
1769  HandleCheckFile(mess, pslb);
1770  FlushLogFile(); // Avoid sending (error) messages at next action
1771  }
1772  break;
1773 
1774  case kPROOF_SENDFILE:
1775  if (!all && fProtocol <= 19) {
1776  // Come back later
1777  rc = 2;
1778  } else {
1779  mess->ReadString(str, sizeof(str));
1780  Long_t size;
1781  Int_t bin, fw = 1;
1782  char name[1024];
1783  if (fProtocol > 5) {
1784  sscanf(str, "%1023s %d %ld %d", name, &bin, &size, &fw);
1785  } else {
1786  sscanf(str, "%1023s %d %ld", name, &bin, &size);
1787  }
1788  TString fnam(name);
1789  Bool_t copytocache = kTRUE;
1790  if (fnam.BeginsWith("cache:")) {
1791  fnam.ReplaceAll("cache:", TString::Format("%s/", fCacheDir.Data()));
1792  copytocache = kFALSE;
1793  }
1794 
1795  Int_t rfrc = 0;
1796  if (size > 0) {
1797  rfrc = ReceiveFile(fnam, bin ? kTRUE : kFALSE, size);
1798  } else {
1799  // Take it from the cache
1800  if (!fnam.BeginsWith(fCacheDir.Data())) {
1801  fnam.Insert(0, TString::Format("%s/", fCacheDir.Data()));
1802  }
1803  }
1804  if (rfrc == 0) {
1805  // copy file to cache if not a PAR file
1806  if (copytocache && size > 0 && !fPackMgr->IsInDir(name))
1807  gSystem->Exec(TString::Format("%s %s %s", kCP, fnam.Data(), fCacheDir.Data()));
1808  if (IsMaster() && fw == 1) {
1809  Int_t opt = TProof::kForward | TProof::kCp;
1810  if (bin)
1811  opt |= TProof::kBinary;
1812  PDB(kGlobal, 1)
1813  Info("HandleSocketInput","forwarding file: %s", fnam.Data());
1814  if (fProof->SendFile(fnam, opt, (copytocache ? "cache" : "")) < 0) {
1815  Error("HandleSocketInput", "forwarding file: %s", fnam.Data());
1816  }
1817  }
1818  if (fProtocol > 19) fSocket->Send(kPROOF_SENDFILE);
1819  } else {
1820  // There was an error
1821  SendLogFile(1);
1822  }
1823  }
1824  break;
1825 
1826  case kPROOF_LOGFILE:
1827  {
1828  Int_t start, end;
1829  (*mess) >> start >> end;
1830  PDB(kGlobal, 1)
1831  Info("HandleSocketInput:kPROOF_LOGFILE",
1832  "Logfile request - byte range: %d - %d", start, end);
1833 
1834  LogToMaster();
1835  SendLogFile(0, start, end);
1836  }
1837  break;
1838 
1839  case kPROOF_PARALLEL:
1840  if (all) {
1841  if (IsMaster()) {
1842  Int_t nodes;
1843  Bool_t random = kFALSE;
1844  (*mess) >> nodes;
1845  if ((mess->BufferSize() > mess->Length()))
1846  (*mess) >> random;
1847  if (fProof) fProof->SetParallel(nodes, random);
1848  rc = 1;
1849  }
1850  } else {
1851  rc = -1;
1852  }
1853  // Notify
1854  SendLogFile();
1855  break;
1856 
1857  case kPROOF_CACHE:
1858  if (!all && fProtocol <= 19) {
1859  // Come back later
1860  rc = 2;
1861  } else {
1862  TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
1863  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_CACHE","enter");
1864  Int_t hcrc = HandleCache(mess, pslb);
1865  // Notify
1866  SendLogFile(hcrc);
1867  }
1868  break;
1869 
1870  case kPROOF_WORKERLISTS:
1871  { Int_t wlrc = -1;
1872  if (all) {
1873  if (IsMaster())
1874  wlrc = HandleWorkerLists(mess);
1875  else
1876  Warning("HandleSocketInput:kPROOF_WORKERLISTS",
1877  "Action meaning-less on worker nodes: protocol error?");
1878  } else {
1879  rc = -1;
1880  }
1881  // Notify
1882  SendLogFile(wlrc);
1883  }
1884  break;
1885 
1886  case kPROOF_GETSLAVEINFO:
1887  if (all) {
1888  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Enter");
1889  if (IsMaster()) {
1890 
1891  Bool_t ok = kTRUE;
1892  // if the session does not have workers and is in the dynamic mode
1893  if (fProof->UseDynamicStartup()) {
1894  ok = kFALSE;
1895  // get the a list of workers and start them
1896  Int_t pc = 0;
1897  TList* workerList = new TList();
1898  EQueryAction retVal = GetWorkers(workerList, pc);
1899  if (retVal != TProofServ::kQueryStop && retVal != TProofServ::kQueryEnqueued) {
1900  Int_t ret = fProof->AddWorkers(workerList);
1901  if (ret < 0) {
1902  Error("HandleSocketInput:kPROOF_GETSLAVEINFO",
1903  "adding a list of worker nodes returned: %d", ret);
1904  }
1905  } else {
1906  Error("HandleSocketInput:kPROOF_GETSLAVEINFO",
1907  "getting list of worker nodes returned: %d", retVal);
1908  }
1909  ok = kTRUE;
1910  }
1911  if (ok) {
1912  TList *info = fProof->GetListOfSlaveInfos();
1913  TMessage answ(kPROOF_GETSLAVEINFO);
1914  answ << info;
1915  fSocket->Send(answ);
1916  // stop the workers
1917  if (IsMaster() && fProof->UseDynamicStartup()) fProof->RemoveWorkers(0);
1918  }
1919  } else {
1920  TMessage answ(kPROOF_GETSLAVEINFO);
1921  TList *info = new TList;
1922  TSlaveInfo *wi = new TSlaveInfo(GetOrdinal(), TUrl(gSystem->HostName()).GetHostFQDN(), 0, "", GetDataDir());
1923  SysInfo_t si;
1924  gSystem->GetSysInfo(&si);
1925  wi->SetSysInfo(si);
1926  info->Add(wi);
1927  answ << (TList *)info;
1928  fSocket->Send(answ);
1929  info->SetOwner(kTRUE);
1930  delete info;
1931  }
1932 
1933  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETSLAVEINFO", "Done");
1934  } else {
1935  TMessage answ(kPROOF_GETSLAVEINFO);
1936  answ << (TList *)0;
1937  fSocket->Send(answ);
1938  rc = -1;
1939  }
1940  break;
1941 
1942  case kPROOF_GETTREEHEADER:
1943  if (all) {
1944  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Enter");
1945 
1946  TVirtualProofPlayer *p = TVirtualProofPlayer::Create("slave", 0, fSocket);
1947  if (p) {
1948  p->HandleGetTreeHeader(mess);
1949  delete p;
1950  } else {
1951  Error("HandleSocketInput:kPROOF_GETTREEHEADER", "could not create TProofPlayer instance!");
1952  }
1953 
1954  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETTREEHEADER", "Done");
1955  } else {
1956  TMessage answ(kPROOF_GETTREEHEADER);
1957  answ << TString("Failed") << (TObject *)0;
1958  fSocket->Send(answ);
1959  rc = -1;
1960  }
1961  break;
1962 
1963  case kPROOF_GETOUTPUTLIST:
1964  { PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Enter");
1965  TList* outputList = 0;
1966  if (IsMaster()) {
1967  outputList = fProof->GetOutputList();
1968  if (!outputList)
1969  outputList = new TList();
1970  } else {
1971  outputList = new TList();
1972  if (fProof->GetPlayer()) {
1973  TList *olist = fProof->GetPlayer()->GetOutputList();
1974  TIter next(olist);
1975  TObject *o;
1976  while ( (o = next()) ) {
1977  outputList->Add(new TNamed(o->GetName(), ""));
1978  }
1979  }
1980  }
1981  outputList->SetOwner();
1982  TMessage answ(kPROOF_GETOUTPUTLIST);
1983  answ << outputList;
1984  fSocket->Send(answ);
1985  delete outputList;
1986  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_GETOUTPUTLIST", "Done");
1987  }
1988  break;
1989 
1990  case kPROOF_VALIDATE_DSET:
1991  if (all) {
1992  PDB(kGlobal, 1)
1993  Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Enter");
1994 
1995  TDSet* dset = 0;
1996  (*mess) >> dset;
1997 
1998  if (IsMaster()) fProof->ValidateDSet(dset);
1999  else dset->Validate();
2000 
2001  TMessage answ(kPROOF_VALIDATE_DSET);
2002  answ << dset;
2003  fSocket->Send(answ);
2004  delete dset;
2005  PDB(kGlobal, 1)
2006  Info("HandleSocketInput:kPROOF_VALIDATE_DSET", "Done");
2007  } else {
2008  rc = -1;
2009  }
2010  // Notify
2011  SendLogFile();
2012  break;
2013 
2014  case kPROOF_DATA_READY:
2015  if (all) {
2016  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Enter");
2017  TMessage answ(kPROOF_DATA_READY);
2018  if (IsMaster()) {
2019  Long64_t totalbytes = 0, bytesready = 0;
2020  Bool_t dataready = fProof->IsDataReady(totalbytes, bytesready);
2021  answ << dataready << totalbytes << bytesready;
2022  } else {
2023  Error("HandleSocketInput:kPROOF_DATA_READY",
2024  "This message should not be sent to slaves");
2025  answ << kFALSE << Long64_t(0) << Long64_t(0);
2026  }
2027  fSocket->Send(answ);
2028  PDB(kGlobal, 1) Info("HandleSocketInput:kPROOF_DATA_READY", "Done");
2029  } else {
2030  TMessage answ(kPROOF_DATA_READY);
2031  answ << kFALSE << Long64_t(0) << Long64_t(0);
2032  fSocket->Send(answ);
2033  rc = -1;
2034  }
2035  // Notify
2036  SendLogFile();
2037  break;
2038 
2039  case kPROOF_DATASETS:
2040  { Int_t dsrc = -1;
2041  if (fProtocol > 16) {
2042  dsrc = HandleDataSets(mess, pslb);
2043  } else {
2044  Error("HandleSocketInput", "old client: no or incompatible dataset support");
2045  }
2046  SendLogFile(dsrc);
2047  }
2048  break;
2049 
2050  case kPROOF_SUBMERGER:
2051  { HandleSubmerger(mess);
2052  }
2053  break;
2054 
2055  case kPROOF_LIB_INC_PATH:
2056  if (all) {
2057  lirc = HandleLibIncPath(mess);
2058  } else {
2059  rc = -1;
2060  }
2061  // Notify the client
2062  if (lirc > 0) SendLogFile();
2063  break;
2064 
2065  case kPROOF_REALTIMELOG:
2066  { Bool_t on;
2067  (*mess) >> on;
2068  PDB(kGlobal, 1)
2069  Info("HandleSocketInput:kPROOF_REALTIMELOG",
2070  "setting real-time logging %s", (on ? "ON" : "OFF"));
2071  fRealTimeLog = on;
2072  // Forward the request to lower levels
2073  if (IsMaster())
2074  fProof->SetRealTimeLog(on);
2075  }
2076  break;
2077 
2078  case kPROOF_FORK:
2079  if (all) {
2080  HandleFork(mess);
2081  LogToMaster();
2082  } else {
2083  rc = -1;
2084  }
2085  SendLogFile();
2086  break;
2087 
2088  case kPROOF_STARTPROCESS:
2089  if (all) {
2090  // This message resumes the session; should not come during processing.
2091 
2092  if (WaitingQueries() == 0) {
2093  Error("HandleSocketInput", "no queries enqueued");
2094  break;
2095  }
2096 
2097  // Similar to handle process
2098  // get the list of workers and start them
2099  TList *workerList = (fProof->UseDynamicStartup()) ? new TList : (TList *)0;
2100  Int_t pc = 0;
2101  EQueryAction retVal = GetWorkers(workerList, pc, kTRUE);
2102 
2103  if (retVal == TProofServ::kQueryOK) {
2104  Int_t ret = 0;
2105  if (workerList && (ret = fProof->AddWorkers(workerList)) < 0) {
2106  Error("HandleSocketInput", "adding a list of worker nodes returned: %d", ret);
2107  } else {
2108  ProcessNext(pslb);
2109  // Set idle
2110  SetIdle(kTRUE);
2111  // Signal the client that we are idle
2112  TMessage m(kPROOF_SETIDLE);
2113  Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
2114  m << waiting;
2115  fSocket->Send(m);
2116  }
2117  } else {
2118  if (retVal == TProofServ::kQueryStop) {
2119  Error("HandleSocketInput", "error getting list of worker nodes");
2120  } else if (retVal != TProofServ::kQueryEnqueued) {
2121  Warning("HandleSocketInput", "query was re-queued!");
2122  } else {
2123  Error("HandleSocketInput", "unexpected answer: %d", retVal);
2124  break;
2125  }
2126  }
2127 
2128  }
2129  break;
2130 
2131  case kPROOF_GOASYNC:
2132  { // The client requested to switch to asynchronous mode:
2133  // communicate the sequential number of the running query for later
2134  // identification, if any
2135  if (!IsIdle() && fPlayer) {
2136  // Get query currently being processed
2137  TProofQueryResult *pq = (TProofQueryResult *) fPlayer->GetCurrentQuery();
2138  TMessage m(kPROOF_QUERYSUBMITTED);
2139  m << pq->GetSeqNum() << kFALSE;
2140  fSocket->Send(m);
2141  } else {
2142  // Idle or undefined: nothing to do; ignore
2143  SendAsynMessage("Processing request to go asynchronous:"
2144  " idle or undefined player - ignoring");
2145  }
2146  }
2147  break;
2148 
2149  case kPROOF_ECHO:
2150  { // Echo request: an object has been sent along. If the object is a
2151  // string, it is simply echoed back to the client from the master
2152  // and each worker. Elsewhere, the output of TObject::Print() is
2153  // sent. Received object is disposed after usage.
2154 
2155  TObject *obj = mess->ReadObject(0x0); // class type ignored
2156 
2157  if (IsMaster()) {
2158  // We are on master
2159  // dbTODO: forward on dynamic startup when wrks are up
2160  if (IsParallel() && fProof && !fProof->UseDynamicStartup()) {
2161  fProof->Echo(obj); // forward to lower layer
2162  }
2163  }
2164 
2165  TMessage rmsg(kPROOF_MESSAGE);
2166  TString smsg;
2167 
2168  if (obj->InheritsFrom(TObjString::Class())) {
2169  // It's a string: echo it
2170  smsg.Form("Echo response from %s:%s: %s",
2171  gSystem->HostName(), GetOrdinal(),
2172  ((TObjString *)obj)->String().Data());
2173  }
2174  else {
2175  // Not a string: collect Print() output and send it
2176 
2177  // Output to tempfile
2178  TString tmpfn = "echo-out-";
2179  FILE *tf = gSystem->TempFileName(tmpfn, fDataDir);
2180  if (!tf || (gSystem->RedirectOutput(tmpfn.Data()) == -1)) {
2181  Error("HandleSocketInput", "Can't redirect output");
2182  if (tf) {
2183  fclose(tf);
2184  gSystem->Unlink(tmpfn);
2185  }
2186  rc = -1;
2187  delete obj;
2188  break;
2189  }
2190  //cout << obj->ClassName() << endl;
2191  obj->Print();
2192  gSystem->RedirectOutput(0x0); // restore
2193  fclose(tf);
2194 
2195  // Read file back and send it via message
2196  smsg.Form("*** Echo response from %s:%s ***\n",
2197  gSystem->HostName(), GetOrdinal());
2198  TMacro *fr = new TMacro();
2199  fr->ReadFile(tmpfn);
2200  TIter nextLine(fr->GetListOfLines());
2201  TObjString *line;
2202  while (( line = (TObjString *)nextLine() )) {
2203  smsg.Append( line->String() );
2204  }
2205 
2206  // Close the reader (TMacro) and remove file
2207  delete fr;
2208  gSystem->Unlink(tmpfn);
2209  }
2210 
2211  // Send message and dispose object
2212  rmsg << smsg;
2213  GetSocket()->Send(rmsg);
2214  delete obj;
2215  }
2216  break;
2217 
2218  default:
2219  Error("HandleSocketInput", "unknown command %d", what);
2220  rc = -2;
2221  break;
2222  }
2223 
2224  fRealTime += (Float_t)timer.RealTime();
2225  fCpuTime += (Float_t)timer.CpuTime();
2226 
2227  if (!(slb.IsNull()) || fgLogToSysLog > 1) {
2228  TString s;
2229  s.Form("%s %d %.3f %.3f %s", fgSysLogEntity.Data(),
2230  what, timer.RealTime(), timer.CpuTime(), slb.Data());
2231  gSystem->Syslog(kLogNotice, s.Data());
2232  }
2233 
2234  // Done
2235  return rc;
2236 }
2237 
2238 ////////////////////////////////////////////////////////////////////////////////
2239 /// Accept and merge results from a set of workers
2240 
2241 Bool_t TProofServ::AcceptResults(Int_t connections, TVirtualProofPlayer *mergerPlayer)
2242 {
2243  TMessage *mess = new TMessage();
2244  Int_t mergedWorkers = 0;
2245 
2246  PDB(kSubmerger, 1) Info("AcceptResults", "enter");
2247 
2248  // Overall result of this procedure
2249  Bool_t result = kTRUE;
2250 
2251  fMergingMonitor = new TMonitor();
2252  fMergingMonitor->Add(fMergingSocket);
2253 
2254  Int_t numworkers = 0;
2255  while (fMergingMonitor->GetActive() > 0 && mergedWorkers < connections) {
2256 
2257  TSocket *s = fMergingMonitor->Select();
2258  if (!s) {
2259  Info("AcceptResults", "interrupt!");
2260  result = kFALSE;
2261  break;
2262  }
2263 
2264  if (s == fMergingSocket) {
2265  // New incoming connection
2266  TSocket *sw = fMergingSocket->Accept();
2267  if (sw && sw != (TSocket *)(-1)) {
2268  fMergingMonitor->Add(sw);
2269 
2270  PDB(kSubmerger, 2)
2271  Info("AcceptResults", "connection from a worker accepted on merger %s ",
2272  fOrdinal.Data());
2273  // All assigned workers are connected
2274  if (++numworkers >= connections)
2275  fMergingMonitor->Remove(fMergingSocket);
2276  } else {
2277  PDB(kSubmerger, 1)
2278  Info("AcceptResults", "spurious signal found of merging socket");
2279  }
2280  } else {
2281  if (s->Recv(mess) < 0) {
2282  Error("AcceptResults", "problems receiving message");
2283  continue;
2284  }
2285  PDB(kSubmerger, 2)
2286  Info("AcceptResults", "message received: %d ", (mess ? mess->What() : 0));
2287  if (!mess) {
2288  Error("AcceptResults", "message received: %p ", mess);
2289  continue;
2290  }
2291  Int_t type = 0;
2292 
2293  // Read output objec(s) from the received message
2294  while ((mess->BufferSize() > mess->Length())) {
2295  (*mess) >> type;
2296 
2297  PDB(kSubmerger, 2) Info("AcceptResults", " type %d ", type);
2298  if (type == 2) {
2299  mergedWorkers++;
2300  PDB(kSubmerger, 2)
2301  Info("AcceptResults",
2302  "a new worker has been mergerd. Total merged workers: %d",
2303  mergedWorkers);
2304  }
2305  TObject *o = mess->ReadObject(TObject::Class());
2306  if (mergerPlayer->AddOutputObject(o) == 1) {
2307  // Remove the object if it has been merged
2308  PDB(kSubmerger, 2) Info("AcceptResults", "removing %p (has been merged)", o);
2309  SafeDelete(o);
2310  } else
2311  PDB(kSubmerger, 2) Info("AcceptResults", "%p not merged yet", o);
2312  }
2313  }
2314  }
2315  fMergingMonitor->DeActivateAll();
2316 
2317  TList* sockets = fMergingMonitor->GetListOfDeActives();
2318  Int_t size = sockets->GetSize();
2319  for (Int_t i =0; i< size; ++i){
2320  ((TSocket*)(sockets->At(i)))->Close();
2321  PDB(kSubmerger, 2) Info("AcceptResults", "closing socket");
2322  delete ((TSocket*)(sockets->At(i)));
2323  }
2324 
2325  fMergingMonitor->RemoveAll();
2326  SafeDelete(fMergingMonitor);
2327 
2328  PDB(kSubmerger, 2) Info("AcceptResults", "exit: %d", result);
2329  return result;
2330 }
2331 
2332 ////////////////////////////////////////////////////////////////////////////////
2333 /// Handle Out-Of-Band data sent by the master or client.
2334 
2335 void TProofServ::HandleUrgentData()
2336 {
2337  char oob_byte;
2338  Int_t n, nch, wasted = 0;
2339 
2340  const Int_t kBufSize = 1024;
2341  char waste[kBufSize];
2342 
2343  // Real-time notification of messages
2344  TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
2345 
2346  PDB(kGlobal, 5)
2347  Info("HandleUrgentData", "handling oob...");
2348 
2349  // Receive the OOB byte
2350  while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
2351  if (n == -2) { // EWOULDBLOCK
2352  //
2353  // The OOB data has not yet arrived: flush the input stream
2354  //
2355  // In some systems (Solaris) regular recv() does not return upon
2356  // receipt of the oob byte, which makes the below call to recv()
2357  // block indefinitely if there are no other data in the queue.
2358  // FIONREAD ioctl can be used to check if there are actually any
2359  // data to be flushed. If not, wait for a while for the oob byte
2360  // to arrive and try to read it again.
2361  //
2362  fSocket->GetOption(kBytesToRead, nch);
2363  if (nch == 0) {
2364  gSystem->Sleep(1000);
2365  continue;
2366  }
2367 
2368  if (nch > kBufSize) nch = kBufSize;
2369  n = fSocket->RecvRaw(waste, nch);
2370  if (n <= 0) {
2371  Error("HandleUrgentData", "error receiving waste");
2372  break;
2373  }
2374  wasted = 1;
2375  } else {
2376  Error("HandleUrgentData", "error receiving OOB");
2377  return;
2378  }
2379  }
2380 
2381  PDB(kGlobal, 5)
2382  Info("HandleUrgentData", "got OOB byte: %d\n", oob_byte);
2383 
2384  if (fProof) fProof->SetActive();
2385 
2386  switch (oob_byte) {
2387 
2388  case TProof::kHardInterrupt:
2389  Info("HandleUrgentData", "*** Hard Interrupt");
2390 
2391  // If master server, propagate interrupt to slaves
2392  if (IsMaster())
2393  fProof->Interrupt(TProof::kHardInterrupt);
2394 
2395  // Flush input socket
2396  while (1) {
2397  Int_t atmark;
2398 
2399  fSocket->GetOption(kAtMark, atmark);
2400 
2401  if (atmark) {
2402  // Send the OOB byte back so that the client knows where
2403  // to stop flushing its input stream of obsolete messages
2404  n = fSocket->SendRaw(&oob_byte, 1, kOob);
2405  if (n <= 0)
2406  Error("HandleUrgentData", "error sending OOB");
2407  break;
2408  }
2409 
2410  // find out number of bytes to read before atmark
2411  fSocket->GetOption(kBytesToRead, nch);
2412  if (nch == 0) {
2413  gSystem->Sleep(1000);
2414  continue;
2415  }
2416 
2417  if (nch > kBufSize) nch = kBufSize;
2418  n = fSocket->RecvRaw(waste, nch);
2419  if (n <= 0) {
2420  Error("HandleUrgentData", "error receiving waste (2)");
2421  break;
2422  }
2423  }
2424 
2425  SendLogFile();
2426 
2427  break;
2428 
2429  case TProof::kSoftInterrupt:
2430  Info("HandleUrgentData", "Soft Interrupt");
2431 
2432  // If master server, propagate interrupt to slaves
2433  if (IsMaster())
2434  fProof->Interrupt(TProof::kSoftInterrupt);
2435 
2436  if (wasted) {
2437  Error("HandleUrgentData", "soft interrupt flushed stream");
2438  break;
2439  }
2440 
2441  Interrupt();
2442 
2443  SendLogFile();
2444 
2445  break;
2446 
2447  case TProof::kShutdownInterrupt:
2448  Info("HandleUrgentData", "Shutdown Interrupt");
2449 
2450  // If master server, propagate interrupt to slaves
2451  if (IsMaster())
2452  fProof->Interrupt(TProof::kShutdownInterrupt);
2453 
2454  Terminate(0);
2455 
2456  break;
2457 
2458  default:
2459  Error("HandleUrgentData", "unexpected OOB byte");
2460  break;
2461  }
2462 
2463  if (fProof) fProof->SetActive(kFALSE);
2464 }
2465 
2466 ////////////////////////////////////////////////////////////////////////////////
2467 /// Called when the client is not alive anymore (i.e. when kKeepAlive
2468 /// has failed).
2469 
2470 void TProofServ::HandleSigPipe()
2471 {
2472  // Real-time notification of messages
2473  TProofServLogHandlerGuard hg(fLogFile, fSocket, "", fRealTimeLog);
2474 
2475  if (IsMaster()) {
2476  // Check if we are here because client is closed. Try to ping client,
2477  // if that works it we are here because some slave died
2478  if (fSocket->Send(kPROOF_PING | kMESS_ACK) < 0) {
2479  Info("HandleSigPipe", "keepAlive probe failed");
2480  // Tell slaves we are going to close since there is no client anymore
2481 
2482  fProof->SetActive();
2483  fProof->Interrupt(TProof::kShutdownInterrupt);
2484  fProof->SetActive(kFALSE);
2485  Terminate(0);
2486  }
2487  } else {
2488  Info("HandleSigPipe", "keepAlive probe failed");
2489  Terminate(0); // will not return from here....
2490  }
2491 }
2492 
2493 ////////////////////////////////////////////////////////////////////////////////
2494 /// True if in parallel mode.
2495 
2496 Bool_t TProofServ::IsParallel() const
2497 {
2498  if (IsMaster() && fProof)
2499  return fProof->IsParallel() || fProof->UseDynamicStartup() ;
2500 
2501  // false in case we are a slave
2502  return kFALSE;
2503 }
2504 
2505 ////////////////////////////////////////////////////////////////////////////////
2506 /// Print status of slave server.
2507 
2508 void TProofServ::Print(Option_t *option) const
2509 {
2510  if (IsMaster() && fProof)
2511  fProof->Print(option);
2512  else
2513  Printf("This is worker %s", gSystem->HostName());
2514 }
2515 
2516 ////////////////////////////////////////////////////////////////////////////////
2517 /// Redirect stdout to a log file. This log file will be flushed to the
2518 /// client or master after each command.
2519 
2520 void TProofServ::RedirectOutput(const char *dir, const char *mode)
2521 {
2522  char logfile[512];
2523 
2524  TString sdir = (dir && strlen(dir) > 0) ? dir : fSessionDir.Data();
2525  if (IsMaster()) {
2526  snprintf(logfile, 512, "%s/master-%s.log", sdir.Data(), fOrdinal.Data());
2527  } else {
2528  snprintf(logfile, 512, "%s/worker-%s.log", sdir.Data(), fOrdinal.Data());
2529  }
2530 
2531  if ((freopen(logfile, mode, stdout)) == 0)
2532  SysError("RedirectOutput", "could not freopen stdout (%s)", logfile);
2533 
2534  if ((dup2(fileno(stdout), fileno(stderr))) < 0)
2535  SysError("RedirectOutput", "could not redirect stderr");
2536 
2537  if ((fLogFile = fopen(logfile, "r")) == 0)
2538  SysError("RedirectOutput", "could not open logfile '%s'", logfile);
2539 
2540  // from this point on stdout and stderr are properly redirected
2541  if (fProtocol < 4 && fWorkDir != TString::Format("~/%s", kPROOF_WorkDir)) {
2542  Warning("RedirectOutput", "no way to tell master (or client) where"
2543  " to upload packages");
2544  }
2545 }
2546 
2547 ////////////////////////////////////////////////////////////////////////////////
2548 /// Reset PROOF environment to be ready for execution of next command.
2549 
2550 void TProofServ::Reset(const char *dir)
2551 {
2552  // First go to new directory. Check first that we got a reasonable path;
2553  // in PROOF-Lite it may not be the case
2554  TString dd(dir);
2555  if (!dd.BeginsWith("proofserv")) {
2556  Int_t ic = dd.Index(":");
2557  if (ic != kNPOS)
2558  dd.Replace(0, ic, "proofserv");
2559  }
2560  gDirectory->cd(dd.Data());
2561 
2562  // Clear interpreter environment.
2563  gROOT->Reset();
2564 
2565  // Make sure current directory is empty (don't delete anything when
2566  // we happen to be in the ROOT memory only directory!?)
2567  if (gDirectory != gROOT) {
2568  gDirectory->Delete();
2569  }
2570 
2571  if (IsMaster()) fProof->SendCurrentState();
2572 }
2573 
2574 ////////////////////////////////////////////////////////////////////////////////
2575 /// Receive a file, either sent by a client or a master server.
/// If bin is true it is a binary file; otherwise it is an ASCII
/// file and Windows \r characters need to be checked for. Returns -1 in
/// case of error, 0 otherwise.
2579 
2580 Int_t TProofServ::ReceiveFile(const char *file, Bool_t bin, Long64_t size)
2581 {
2582  if (size <= 0) return 0;
2583 
2584  // open file, overwrite already existing file
2585  Int_t fd = open(file, O_CREAT | O_TRUNC | O_WRONLY, 0600);
2586  if (fd < 0) {
2587  SysError("ReceiveFile", "error opening file %s", file);
2588  return -1;
2589  }
2590 
2591  const Int_t kMAXBUF = 16384; //32768 //16384 //65536;
2592  char buf[kMAXBUF], cpy[kMAXBUF];
2593 
2594  Int_t left, r;
2595  Long64_t filesize = 0;
2596 
2597  while (filesize < size) {
2598  left = Int_t(size - filesize);
2599  if (left > kMAXBUF)
2600  left = kMAXBUF;
2601  r = fSocket->RecvRaw(&buf, left);
2602  if (r > 0) {
2603  char *p = buf;
2604 
2605  filesize += r;
2606  while (r) {
2607  Int_t w;
2608 
2609  if (!bin) {
2610  Int_t k = 0, i = 0, j = 0;
2611  char *q;
2612  while (i < r) {
2613  if (p[i] == '\r') {
2614  i++;
2615  k++;
2616  }
2617  cpy[j++] = buf[i++];
2618  }
2619  q = cpy;
2620  r -= k;
2621  w = write(fd, q, r);
2622  } else {
2623  w = write(fd, p, r);
2624  }
2625 
2626  if (w < 0) {
2627  SysError("ReceiveFile", "error writing to file %s", file);
2628  close(fd);
2629  return -1;
2630  }
2631  r -= w;
2632  p += w;
2633  }
2634  } else if (r < 0) {
2635  Error("ReceiveFile", "error during receiving file %s", file);
2636  close(fd);
2637  return -1;
2638  }
2639  }
2640 
2641  close(fd);
2642 
2643  if (chmod(file, 0644) != 0)
2644  Warning("ReceiveFile", "error setting mode 0644 on file %s", file);
2645 
2646  return 0;
2647 }
2648 
2649 ////////////////////////////////////////////////////////////////////////////////
2650 /// Main server eventloop.
2651 
2652 void TProofServ::Run(Bool_t retrn)
2653 {
2654  // Setup the server
2655  if (CreateServer() == 0) {
2656 
2657  // Run the main event loop
2658  TApplication::Run(retrn);
2659  }
2660 }
2661 
2662 ////////////////////////////////////////////////////////////////////////////////
2663 /// Send log file to master.
2664 /// If start > -1 send only bytes in the range from start to end,
2665 /// if end <= start send everything from start.
2666 
2667 void TProofServ::SendLogFile(Int_t status, Int_t start, Int_t end)
2668 {
2669  // Determine the number of bytes left to be read from the log file.
2670  fflush(stdout);
2671 
2672  // On workers we do not send the logs to masters (to avoid duplication of
2673  // text) unless asked explicitly, e.g. after an Exec(...) request.
2674  if (!IsMaster()) {
2675  if (!fSendLogToMaster) {
2676  FlushLogFile();
2677  } else {
2678  // Decide case by case
2679  LogToMaster(kFALSE);
2680  }
2681  }
2682 
2683  off_t ltot=0, lnow=0;
2684  Int_t left = -1;
2685  Bool_t adhoc = kFALSE;
2686 
2687  if (fLogFileDes > -1) {
2688  ltot = lseek(fileno(stdout), (off_t) 0, SEEK_END);
2689  lnow = lseek(fLogFileDes, (off_t) 0, SEEK_CUR);
2690 
2691  if (ltot >= 0 && lnow >= 0) {
2692  if (start > -1) {
2693  lseek(fLogFileDes, (off_t) start, SEEK_SET);
2694  if (end <= start || end > ltot)
2695  end = ltot;
2696  left = (Int_t)(end - start);
2697  if (end < ltot)
2698  left++;
2699  adhoc = kTRUE;
2700  } else {
2701  left = (Int_t)(ltot - lnow);
2702  }
2703  }
2704  }
2705 
2706  if (left > 0) {
2707  if (fSocket->Send(left, kPROOF_LOGFILE) < 0) {
2708  SysError("SendLogFile", "error sending kPROOF_LOGFILE");
2709  return;
2710  }
2711 
2712  const Int_t kMAXBUF = 32768; //16384 //65536;
2713  char buf[kMAXBUF];
2714  Int_t wanted = (left > kMAXBUF) ? kMAXBUF : left;
2715  Int_t len;
2716  do {
2717  while ((len = read(fLogFileDes, buf, wanted)) < 0 &&
2718  TSystem::GetErrno() == EINTR)
2719  TSystem::ResetErrno();
2720 
2721  if (len < 0) {
2722  SysError("SendLogFile", "error reading log file");
2723  break;
2724  }
2725 
2726  if (end == ltot && len == wanted)
2727  buf[len-1] = '\n';
2728 
2729  if (fSocket->SendRaw(buf, len) < 0) {
2730  SysError("SendLogFile", "error sending log file");
2731  break;
2732  }
2733 
2734  // Update counters
2735  left -= len;
2736  wanted = (left > kMAXBUF) ? kMAXBUF : left;
2737 
2738  } while (len > 0 && left > 0);
2739  }
2740 
2741  // Restore initial position if partial send
2742  if (adhoc && lnow >=0 )
2743  lseek(fLogFileDes, lnow, SEEK_SET);
2744 
2745  TMessage mess(kPROOF_LOGDONE);
2746  if (IsMaster())
2747  mess << status << (fProof ? fProof->GetParallel() : 0);
2748  else
2749  mess << status << (Int_t) 1;
2750 
2751  if (fSocket->Send(mess) < 0) {
2752  SysError("SendLogFile", "error sending kPROOF_LOGDONE");
2753  return;
2754  }
2755 
2756  PDB(kGlobal, 1) Info("SendLogFile", "kPROOF_LOGDONE sent");
2757 }
2758 
2759 ////////////////////////////////////////////////////////////////////////////////
2760 /// Send statistics of slave server to master or client.
2761 
2762 void TProofServ::SendStatistics()
2763 {
2764  Long64_t bytesread = TFile::GetFileBytesRead();
2765  Float_t cputime = fCpuTime, realtime = fRealTime;
2766  if (IsMaster()) {
2767  bytesread = fProof->GetBytesRead();
2768  cputime = fProof->GetCpuTime();
2769  }
2770 
2771  TMessage mess(kPROOF_GETSTATS);
2772  TString workdir = gSystem->WorkingDirectory(); // expect TString on other side
2773  mess << bytesread << realtime << cputime << workdir;
2774  if (fProtocol >= 4) mess << TString(gProofServ->GetWorkDir());
2775  mess << TString(gProofServ->GetImage());
2776  fSocket->Send(mess);
2777 }
2778 
2779 ////////////////////////////////////////////////////////////////////////////////
2780 /// Send number of parallel nodes to master or client.
2781 
2782 void TProofServ::SendParallel(Bool_t async)
2783 {
2784  Int_t nparallel = 0;
2785  if (IsMaster()) {
2786  PDB(kGlobal, 2)
2787  Info("SendParallel", "Will invoke AskParallel()");
2788  fProof->AskParallel();
2789  PDB(kGlobal, 2)
2790  Info("SendParallel", "Will invoke GetParallel()");
2791  nparallel = fProof->GetParallel();
2792  } else {
2793  nparallel = 1;
2794  }
2795 
2796  TMessage mess(kPROOF_GETPARALLEL);
2797  mess << nparallel << async;
2798  fSocket->Send(mess);
2799 }
2800 
2801 ////////////////////////////////////////////////////////////////////////////////
/// Send the startup message over the socket, exchange the protocol level
/// with the remote side and set up the working and session directories.
/// Return 0 on success, -1 on failure
2804 
2805 Int_t TProofServ::Setup()
2806 {
2807  char str[512];
2808 
2809  if (IsMaster()) {
2810  snprintf(str, 512, "**** Welcome to the PROOF server @ %s ****", gSystem->HostName());
2811  } else {
2812  snprintf(str, 512, "**** PROOF slave server @ %s started ****", gSystem->HostName());
2813  }
2814 
2815  if (fSocket->Send(str) != 1+static_cast<Int_t>(strlen(str))) {
2816  Error("Setup", "failed to send proof server startup message");
2817  return -1;
2818  }
2819 
2820  // exchange protocol level between client and master and between
2821  // master and slave
2822  Int_t what;
2823  if (fSocket->Recv(fProtocol, what) != 2*sizeof(Int_t)) {
2824  Error("Setup", "failed to receive remote proof protocol");
2825  return -1;
2826  }
2827  if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != 2*sizeof(Int_t)) {
2828  Error("Setup", "failed to send local proof protocol");
2829  return -1;
2830  }
2831 
2832  // If old version, setup authentication related stuff
2833  if (fProtocol < 5) {
2834  TString wconf;
2835  if (OldAuthSetup(wconf) != 0) {
2836  Error("Setup", "OldAuthSetup: failed to setup authentication");
2837  return -1;
2838  }
2839  if (IsMaster()) {
2840  fConfFile = wconf;
2841  fWorkDir.Form("~/%s", kPROOF_WorkDir);
2842  } else {
2843  if (fProtocol < 4) {
2844  fWorkDir.Form("~/%s", kPROOF_WorkDir);
2845  } else {
2846  fWorkDir = wconf;
2847  if (fWorkDir.IsNull()) fWorkDir.Form("~/%s", kPROOF_WorkDir);
2848  }
2849  }
2850  } else {
2851 
2852  // Receive some useful information
2853  TMessage *mess;
2854  if ((fSocket->Recv(mess) <= 0) || !mess) {
2855  Error("Setup", "failed to receive ordinal and config info");
2856  return -1;
2857  }
2858  if (IsMaster()) {
2859  (*mess) >> fUser >> fOrdinal >> fConfFile;
2860  fWorkDir = gEnv->GetValue("ProofServ.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
2861  } else {
2862  (*mess) >> fUser >> fOrdinal >> fWorkDir;
2863  if (fWorkDir.IsNull())
2864  fWorkDir = gEnv->GetValue("ProofServ.Sandbox", TString::Format("~/%s", kPROOF_WorkDir));
2865  }
2866  // Set the correct prefix
2867  if (fOrdinal != "-1")
2868  fPrefix += fOrdinal;
2869  TProofServLogHandler::SetDefaultPrefix(fPrefix);
2870  delete mess;
2871  }
2872 
2873  if (IsMaster()) {
2874 
2875  // strip off any prooftype directives
2876  TString conffile = fConfFile;
2877  conffile.Remove(0, 1 + conffile.Index(":"));
2878 
2879  // parse config file to find working directory
2880  TProofResourcesStatic resources(fConfDir, conffile);
2881  if (resources.IsValid()) {
2882  if (resources.GetMaster()) {
2883  TString tmpWorkDir = resources.GetMaster()->GetWorkDir();
2884  if (tmpWorkDir != "")
2885  fWorkDir = tmpWorkDir;
2886  }
2887  } else {
2888  Info("Setup", "invalid config file %s (missing or unreadable",
2889  resources.GetFileName().Data());
2890  }
2891  }
2892 
2893  // Set $HOME and $PATH. The HOME directory was already set to the
2894  // user's home directory by proofd.
2895  gSystem->Setenv("HOME", gSystem->HomeDirectory());
2896 
2897  // Add user name in case of non default workdir
2898  if (fWorkDir.BeginsWith("/") &&
2899  !fWorkDir.BeginsWith(gSystem->HomeDirectory())) {
2900  if (!fWorkDir.EndsWith("/"))
2901  fWorkDir += "/";
2902  UserGroup_t *u = gSystem->GetUserInfo();
2903  if (u) {
2904  fWorkDir += u->fUser;
2905  delete u;
2906  }
2907  }
2908 
2909  // Goto to the main PROOF working directory
2910  char *workdir = gSystem->ExpandPathName(fWorkDir.Data());
2911  fWorkDir = workdir;
2912  delete [] workdir;
2913  if (gProofDebugLevel > 0)
2914  Info("Setup", "working directory set to %s", fWorkDir.Data());
2915 
2916  // host first name
2917  TString host = gSystem->HostName();
2918  if (host.Index(".") != kNPOS)
2919  host.Remove(host.Index("."));
2920 
2921  // Session tag
2922  fSessionTag.Form("%s-%s-%ld-%d", fOrdinal.Data(), host.Data(),
2923  (Long_t)TTimeStamp().GetSec(),gSystem->GetPid());
2924  fTopSessionTag = fSessionTag;
2925 
2926  // create session directory and make it the working directory
2927  fSessionDir = fWorkDir;
2928  if (IsMaster())
2929  fSessionDir += "/master-";
2930  else
2931  fSessionDir += "/slave-";
2932  fSessionDir += fSessionTag;
2933 
2934  // Common setup
2935  if (SetupCommon() != 0) {
2936  Error("Setup", "common setup failed");
2937  return -1;
2938  }
2939 
2940  // Incoming OOB should generate a SIGURG
2941  fSocket->SetOption(kProcessGroup, gSystem->GetPid());
2942 
2943  // Send packets off immediately to reduce latency
2944  fSocket->SetOption(kNoDelay, 1);
2945 
2946  // Check every two hours if client is still alive
2947  fSocket->SetOption(kKeepAlive, 1);
2948 
2949  // Done
2950  return 0;
2951 }
2952 
2953 ////////////////////////////////////////////////////////////////////////////////
2954 /// Common part (between TProofServ and TXProofServ) of the setup phase.
2955 /// Return 0 on success, -1 on error
2956 
2957 Int_t TProofServ::SetupCommon()
2958 {
2959  // deny write access for group and world
2960  gSystem->Umask(022);
2961 
2962 #ifdef R__UNIX
2963  // Add bindir to PATH
2964  TString path(gSystem->Getenv("PATH"));
2965  TString bindir(TROOT::GetBinDir());
2966  // Augment PATH, if required
2967  // ^<compiler>, <compiler>, ^<sysbin>, <sysbin>
2968  TString paths = gEnv->GetValue("ProofServ.BinPaths", "");
2969  if (paths.Length() > 0) {
2970  Int_t icomp = 0;
2971  if (paths.Contains("^<compiler>"))
2972  icomp = 1;
2973  else if (paths.Contains("<compiler>"))
2974  icomp = -1;
2975  if (icomp != 0) {
2976 # ifdef COMPILER
2977  TString compiler = COMPILER;
2978  if (compiler.Index("is ") != kNPOS)
2979  compiler.Remove(0, compiler.Index("is ") + 3);
2980  compiler = gSystem->DirName(compiler);
2981  if (icomp == 1) {
2982  if (!bindir.IsNull()) bindir += ":";
2983  bindir += compiler;
2984  } else if (icomp == -1) {
2985  if (!path.IsNull()) path += ":";
2986  path += compiler;
2987  }
2988 #endif
2989  }
2990  Int_t isysb = 0;
2991  if (paths.Contains("^<sysbin>"))
2992  isysb = 1;
2993  else if (paths.Contains("<sysbin>"))
2994  isysb = -1;
2995  if (isysb != 0) {
2996  if (isysb == 1) {
2997  if (!bindir.IsNull()) bindir += ":";
2998  bindir += "/bin:/usr/bin:/usr/local/bin";
2999  } else if (isysb == -1) {
3000  if (!path.IsNull()) path += ":";
3001  path += "/bin:/usr/bin:/usr/local/bin";
3002  }
3003  }
3004  }
3005  // Final insert
3006  if (!bindir.IsNull()) bindir += ":";
3007  path.Insert(0, bindir);
3008  gSystem->Setenv("PATH", path);
3009 #endif
3010 
3011  if (gSystem->AccessPathName(fWorkDir)) {
3012  gSystem->mkdir(fWorkDir, kTRUE);
3013  if (!gSystem->ChangeDirectory(fWorkDir)) {
3014  Error("SetupCommon", "can not change to PROOF directory %s",
3015  fWorkDir.Data());
3016  return -1;
3017  }
3018  } else {
3019  if (!gSystem->ChangeDirectory(fWorkDir)) {
3020  gSystem->Unlink(fWorkDir);
3021  gSystem->mkdir(fWorkDir, kTRUE);
3022  if (!gSystem->ChangeDirectory(fWorkDir)) {
3023  Error("SetupCommon", "can not change to PROOF directory %s",
3024  fWorkDir.Data());
3025  return -1;
3026  }
3027  }
3028  }
3029 
3030  // Set group
3031  fGroup = gEnv->GetValue("ProofServ.ProofGroup", "default");
3032 
3033  // Check and make sure "cache" directory exists
3034  fCacheDir = gEnv->GetValue("ProofServ.CacheDir",
3035  TString::Format("%s/%s", fWorkDir.Data(), kPROOF_CacheDir));
3036  ResolveKeywords(fCacheDir);
3037  if (gSystem->AccessPathName(fCacheDir))
3038  gSystem->mkdir(fCacheDir, kTRUE);
3039  if (gProofDebugLevel > 0)
3040  Info("SetupCommon", "cache directory set to %s", fCacheDir.Data());
3041  fCacheLock =
3042  new TProofLockPath(TString::Format("%s/%s%s",
3043  gSystem->TempDirectory(), kPROOF_CacheLockFile,
3044  TString(fCacheDir).ReplaceAll("/","%").Data()));
3045  // Make also sure the cache path is in the macro path
3046  TProof::AssertMacroPath(TString::Format("%s/.", fCacheDir.Data()));
3047 
3048  // Check and make sure "packages" directory exists
3049  TString packdir = gEnv->GetValue("ProofServ.PackageDir",
3050  TString::Format("%s/%s", fWorkDir.Data(), kPROOF_PackDir));
3051  ResolveKeywords(packdir);
3052  if (gSystem->AccessPathName(packdir))
3053  gSystem->mkdir(packdir, kTRUE);
3054  fPackMgr = new TPackMgr(packdir);
3055  fPackMgr->SetLogger(SendAsynMsg);
3056  // Notification message
3057  TString noth;
3058  const char *k = (IsMaster()) ? "Mst" : "Wrk";
3059  noth.Form("%s-%s", k, fOrdinal.Data());
3060  fPackMgr->SetPrefix(noth.Data());
3061  if (gProofDebugLevel > 0)
3062  Info("SetupCommon", "package directory set to %s", packdir.Data());
3063 
3064  // Check and make sure "data" directory exists
3065  fDataDir = gEnv->GetValue("ProofServ.DataDir","");
3066  Ssiz_t isep = kNPOS;
3067  if (fDataDir.IsNull()) {
3068  // Use default
3069  fDataDir.Form("%s/%s/<ord>/<stag>", fWorkDir.Data(), kPROOF_DataDir);
3070  } else if ((isep = fDataDir.Last(' ')) != kNPOS) {
3071  fDataDirOpts = fDataDir(isep + 1, fDataDir.Length());
3072  fDataDir.Remove(isep);
3073  }
3074  ResolveKeywords(fDataDir);
3075  if (gSystem->AccessPathName(fDataDir))
3076  if (gSystem->mkdir(fDataDir, kTRUE) != 0) {
3077  Warning("SetupCommon", "problems creating path '%s' (errno: %d)",
3078  fDataDir.Data(), TSystem::GetErrno());
3079  }
3080  if (gProofDebugLevel > 0)
3081  Info("SetupCommon", "data directory set to %s", fDataDir.Data());
3082 
3083  // Check and apply possible options
3084  // (see http://root.cern.ch/drupal/content/configuration-reference-guide#datadir)
3085  TString dataDirOpts = gEnv->GetValue("ProofServ.DataDirOpts","");
3086  if (!dataDirOpts.IsNull()) {
3087  // Do they apply to this server type
3088  Bool_t doit = kTRUE;
3089  if ((IsMaster() && !dataDirOpts.Contains("M")) ||
3090  (!IsMaster() && !dataDirOpts.Contains("W"))) doit = kFALSE;
3091  if (doit) {
3092  // Get the wanted mode
3093  UInt_t m = 0755;
3094  if (dataDirOpts.Contains("g")) m = 0775;
3095  if (dataDirOpts.Contains("a") || dataDirOpts.Contains("o")) m = 0777;
3096  if (gProofDebugLevel > 0)
3097  Info("SetupCommon", "requested mode for data directories is '%o'", m);
3098  // Loop over paths
3099  FileStat_t st;
3100  TString p, subp;
3101  Int_t from = 0;
3102  if (fDataDir.BeginsWith("/")) p = "/";
3103  while (fDataDir.Tokenize(subp, from, "/")) {
3104  if (subp.IsNull()) continue;
3105  p += subp;
3106  if (gSystem->GetPathInfo(p, st) == 0) {
3107  if (st.fUid == (Int_t) gSystem->GetUid() && st.fGid == (Int_t) gSystem->GetGid()) {
3108  if (gSystem->Chmod(p.Data(), m) != 0) {
3109  Warning("SetupCommon", "problems setting mode '%o' on path '%s' (errno: %d)",
3110  m, p.Data(), TSystem::GetErrno());
3111  break;
3112  }
3113  }
3114  p += "/";
3115  } else {
3116  Warning("SetupCommon", "problems stat-ing path '%s' (errno: %d; datadir: %s)",
3117  p.Data(), TSystem::GetErrno(), fDataDir.Data());
3118  break;
3119  }
3120  }
3121  }
3122  }
3123 
3124  // List of directories where to look for global packages
3125  TString globpack = gEnv->GetValue("Proof.GlobalPackageDirs","");
3126 
3127  ResolveKeywords(globpack);
3128  Int_t nglb = TPackMgr::RegisterGlobalPath(globpack);
3129  Info("SetupCommon", " %d global package directories registered", nglb);
3130  FlushLogFile();
3131 
3132  // Check the session dir
3133  if (fSessionDir != gSystem->WorkingDirectory()) {
3134  ResolveKeywords(fSessionDir);
3135  if (gSystem->AccessPathName(fSessionDir))
3136  gSystem->mkdir(fSessionDir, kTRUE);
3137  if (!gSystem->ChangeDirectory(fSessionDir)) {
3138  Error("SetupCommon", "can not change to working directory '%s'",
3139  fSessionDir.Data());
3140  return -1;
3141  }
3142  }
3143  gSystem->Setenv("PROOF_SANDBOX", fSessionDir);
3144  if (gProofDebugLevel > 0)
3145  Info("SetupCommon", "session dir is '%s'", fSessionDir.Data());
3146 
3147  // On masters, check and make sure that "queries" and "datasets"
3148  // directories exist
3149  if (IsMaster()) {
3150 
3151  // Make sure that the 'queries' dir exist
3152  fQueryDir = fWorkDir;
3153  fQueryDir += TString("/") + kPROOF_QueryDir;
3154  ResolveKeywords(fQueryDir);
3155  if (gSystem->AccessPathName(fQueryDir))
3156  gSystem->mkdir(fQueryDir, kTRUE);
3157  fQueryDir += TString("/session-") + fTopSessionTag;
3158  if (gSystem->AccessPathName(fQueryDir))
3159  gSystem->mkdir(fQueryDir, kTRUE);
3160  if (gProofDebugLevel > 0)
3161  Info("SetupCommon", "queries dir is %s", fQueryDir.Data());
3162 
3163  // Create 'queries' locker instance and lock it
3164  fQueryLock = new TProofLockPath(TString::Format("%s/%s%s-%s",
3165  gSystem->TempDirectory(),
3166  kPROOF_QueryLockFile, fSessionTag.Data(),
3167  TString(fQueryDir).ReplaceAll("/","%").Data()));
3168  fQueryLock->Lock();
3169  // Create the query manager
3170  fQMgr = new TQueryResultManager(fQueryDir, fSessionTag, fSessionDir,
3171  fQueryLock, 0);
3172  }
3173 
3174  // Server image
3175  fImage = gEnv->GetValue("ProofServ.Image", "");
3176 
3177  // Get the group priority
3178  if (IsMaster()) {
3179  // Send session tag to client
3180  TMessage m(kPROOF_SESSIONTAG);
3181  m << fTopSessionTag << fGroup << fUser;
3182  fSocket->Send(m);
3183  // Group priority
3184  fGroupPriority = GetPriority();
3185  // Dataset manager instance via plug-in
3186  TPluginHandler *h = 0;
3187  TString dsms = gEnv->GetValue("Proof.DataSetManager", "");
3188  if (!dsms.IsNull()) {
3189  TString dsm;
3190  Int_t from = 0;
3191  while (dsms.Tokenize(dsm, from, ",")) {
3192  if (fDataSetManager && !fDataSetManager->TestBit(TObject::kInvalidObject)) {
3193  Warning("SetupCommon", "a valid dataset manager already initialized");
3194  Warning("SetupCommon", "support for multiple managers not yet available");
3195  break;
3196  }
3197  // Get plugin manager to load the appropriate TDataSetManager
3198  if (gROOT->GetPluginManager()) {
3199  // Find the appropriate handler
3200  h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", dsm);
3201  if (h && h->LoadPlugin() != -1) {
3202  // make instance of the dataset manager
3203  fDataSetManager =
3204  reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3, fGroup.Data(),
3205  fUser.Data(), dsm.Data()));
3206  }
3207  }
3208  }
3209  // Check the result of the dataset manager initialization
3210  if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
3211  Warning("SetupCommon", "dataset manager plug-in initialization failed");
3212  SendAsynMessage("TXProofServ::SetupCommon: dataset manager plug-in initialization failed");
3213  SafeDelete(fDataSetManager);
3214  }
3215  } else {
3216  // Initialize the default dataset manager
3217  TString opts("Av:");
3218  TString dsetdir = gEnv->GetValue("ProofServ.DataSetDir", "");
3219  if (dsetdir.IsNull()) {
3220  // Use the default in the sandbox
3221  dsetdir.Form("%s/%s", fWorkDir.Data(), kPROOF_DataSetDir);
3222  if (gSystem->AccessPathName(fDataSetDir))
3223  gSystem->MakeDirectory(fDataSetDir);
3224  opts += "Sb:";
3225  }
3226  // Find the appropriate handler
3227  if (!h) {
3228  h = gROOT->GetPluginManager()->FindHandler("TDataSetManager", "file");
3229  if (h && h->LoadPlugin() == -1) h = 0;
3230  }
3231  if (h) {
3232  // make instance of the dataset manager
3233  TString oo = TString::Format("dir:%s opt:%s", dsetdir.Data(), opts.Data());
3234  fDataSetManager = reinterpret_cast<TDataSetManager*>(h->ExecPlugin(3,
3235  fGroup.Data(), fUser.Data(), oo.Data()));
3236  }
3237  if (fDataSetManager && fDataSetManager->TestBit(TObject::kInvalidObject)) {
3238  Warning("SetupCommon", "default dataset manager plug-in initialization failed");
3239  SafeDelete(fDataSetManager);
3240  }
3241  }
3242  // Dataset manager for staging requests
3243  TString dsReqCfg = gEnv->GetValue("Proof.DataSetStagingRequests", "");
3244  if (!dsReqCfg.IsNull()) {
3245  TPMERegexp reReqDir("(^| )(dir:)?([^ ]+)( |$)");
3246 
3247  if (reReqDir.Match(dsReqCfg) == 5) {
3248  TString dsDirFmt;
3249  dsDirFmt.Form("dir:%s perms:open", reReqDir[3].Data());
3250  fDataSetStgRepo = new TDataSetManagerFile("_stage_", "_stage_",
3251  dsDirFmt);
3252  if (fDataSetStgRepo &&
3253  fDataSetStgRepo->TestBit(TObject::kInvalidObject)) {
3254  Warning("SetupCommon",
3255  "failed init of dataset staging requests repository");
3256  SafeDelete(fDataSetStgRepo);
3257  }
3258  } else {
3259  Warning("SetupCommon",
3260  "specify, with [dir:]<path>, a valid path for staging requests");
3261  }
3262  } else if (gProofDebugLevel > 0) {
3263  Warning("SetupCommon", "no repository for staging requests available");
3264  }
3265  }
3266 
3267  // Quotas
3268  TString quotas = gEnv->GetValue(TString::Format("ProofServ.UserQuotas.%s", fUser.Data()),"");
3269  if (quotas.IsNull())
3270  quotas = gEnv->GetValue(TString::Format("ProofServ.UserQuotasByGroup.%s", fGroup.Data()),"");
3271  if (quotas.IsNull())
3272  quotas = gEnv->GetValue("ProofServ.UserQuotas", "");
3273  if (!quotas.IsNull()) {
3274  // Parse it; format ("maxquerykept=10 hwmsz=800m maxsz=1g")
3275  TString tok;
3276  Ssiz_t from = 0;
3277  while (quotas.Tokenize(tok, from, " ")) {
3278  // Set max number of query results to keep
3279  if (tok.BeginsWith("maxquerykept=")) {
3280  tok.ReplaceAll("maxquerykept=","");
3281  if (tok.IsDigit())
3282  fMaxQueries = tok.Atoi();
3283  else
3284  Info("SetupCommon",
3285  "parsing 'maxquerykept' :ignoring token %s : not a digit", tok.Data());
3286  }
3287  // Set High-Water-Mark or max on the sandbox size
3288  const char *ksz[2] = {"hwmsz=", "maxsz="};
3289  for (Int_t j = 0; j < 2; j++) {
3290  if (tok.BeginsWith(ksz[j])) {
3291  tok.ReplaceAll(ksz[j],"");
3292  Long64_t fact = -1;
3293  if (!tok.IsDigit()) {
3294  // Parse (k, m, g)
3295  tok.ToLower();
3296  const char *s[3] = {"k", "m", "g"};
3297  Int_t i = 0, ki = 1024;
3298  while (fact < 0) {
3299  if (tok.EndsWith(s[i++]))
3300  fact = ki;
3301  else
3302  ki *= 1024;
3303  }
3304  tok.Remove(tok.Length()-1);
3305  }
3306  if (tok.IsDigit()) {
3307  if (j == 0)
3308  fHWMBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
3309  else
3310  fMaxBoxSize = (fact > 0) ? tok.Atoi() * fact : tok.Atoi();
3311  } else {
3312  TString ssz(ksz[j], strlen(ksz[j])-1);
3313  Info("SetupCommon", "parsing '%s' : ignoring token %s", ssz.Data(), tok.Data());
3314  }
3315  }
3316  }
3317  }
3318  }
3319 
3320  // Apply quotas, if any
3321  if (IsMaster() && fQMgr)
3322  if (fQMgr->ApplyMaxQueries(fMaxQueries) != 0)
3323  Warning("SetupCommon", "problems applying fMaxQueries");
3324 
3325  // Send "ROOTversion|ArchCompiler" flag
3326  if (fProtocol > 12) {
3327  TString vac = gROOT->GetVersion();
3328  vac += TString::Format(":%s", gROOT->GetGitCommit());
3329  TString rtag = gEnv->GetValue("ProofServ.RootVersionTag", "");
3330  if (rtag.Length() > 0)
3331  vac += TString::Format(":%s", rtag.Data());
3332  vac += TString::Format("|%s-%s",gSystem->GetBuildArch(), gSystem->GetBuildCompilerVersion());
3333  TMessage m(kPROOF_VERSARCHCOMP);
3334  m << vac;
3335  fSocket->Send(m);
3336  }
3337 
3338  // Set user vars in TProof
3339  TString all_vars(gSystem->Getenv("PROOF_ALLVARS"));
3340  TString name;
3341  Int_t from = 0;
3342  while (all_vars.Tokenize(name, from, ",")) {
3343  if (!name.IsNull()) {
3344  TString value = gSystem->Getenv(name);
3345  TProof::AddEnvVar(name, value);
3346  }
3347  }
3348 
3349  if (fgLogToSysLog > 0) {
3350  // Set the syslog entity (all the information is available now)
3351  if (!(fUser.IsNull()) && !(fGroup.IsNull())) {
3352  fgSysLogEntity.Form("%s:%s", fUser.Data(), fGroup.Data());
3353  } else if (!(fUser.IsNull()) && fGroup.IsNull()) {
3354  fgSysLogEntity.Form("%s:default", fUser.Data());
3355  } else if (fUser.IsNull() && !(fGroup.IsNull())) {
3356  fgSysLogEntity.Form("undef:%s", fGroup.Data());
3357  }
3358  // Log the beginning of this session
3359  TString s;
3360  s.Form("%s 0 %.3f %.3f", fgSysLogEntity.Data(), fRealTime, fCpuTime);
3361  gSystem->Syslog(kLogNotice, s.Data());
3362  }
3363 
3364  if (gProofDebugLevel > 0)
3365  Info("SetupCommon", "successfully completed");
3366 
3367  // Done
3368  return 0;
3369 }
3370 
3371 ////////////////////////////////////////////////////////////////////////////////
3372 /// Terminate the proof server.
3373 
3374 void TProofServ::Terminate(Int_t status)
3375 {
3376  if (fgLogToSysLog > 0) {
3377  TString s;
3378  s.Form("%s -1 %.3f %.3f %d", fgSysLogEntity.Data(), fRealTime, fCpuTime, status);
3379  gSystem->Syslog(kLogNotice, s.Data());
3380  }
3381 
3382  // Notify the memory footprint
3383  ProcInfo_t pi;
3384  if (!gSystem->GetProcInfo(&pi)){
3385  Info("Terminate", "process memory footprint: %ld/%ld kB virtual, %ld/%ld kB resident ",
3386  pi.fMemVirtual, fgVirtMemMax, pi.fMemResident, fgResMemMax);
3387  }
3388 
3389  // Cleanup session directory
3390  if (status == 0) {
3391  // make sure we remain in a "connected" directory
3392  gSystem->ChangeDirectory("/");
3393  // needed in case fSessionDir is on NFS ?!
3394  gSystem->MakeDirectory(fSessionDir+"/.delete");
3395  gSystem->Exec(TString::Format("%s %s", kRM, fSessionDir.Data()));
3396  }
3397 
3398  // Cleanup queries directory if empty
3399  if (IsMaster()) {
3400  if (!(fQMgr && fQMgr->Queries() && fQMgr->Queries()->GetSize())) {
3401  // make sure we remain in a "connected" directory
3402  gSystem->ChangeDirectory("/");
3403  // needed in case fQueryDir is on NFS ?!
3404  gSystem->MakeDirectory(fQueryDir+"/.delete");
3405  gSystem->Exec(TString::Format("%s %s", kRM, fQueryDir.Data()));
3406  // Remove lock file
3407  if (fQueryLock)
3408  gSystem->Unlink(fQueryLock->GetName());
3409  }
3410 
3411  // Unlock the query dir owned by this session
3412  if (fQueryLock)
3413  fQueryLock->Unlock();
3414  }
3415 
3416  // Cleanup data directory if empty
3417  if (!fDataDir.IsNull() && !gSystem->AccessPathName(fDataDir, kWritePermission)) {
3418  if (UnlinkDataDir(fDataDir))
3419  Info("Terminate", "data directory '%s' has been removed", fDataDir.Data());
3420  }
3421 
3422  // Remove input handler to avoid spurious signals in socket
3423  // selection for closing activities executed upon exit()
3424  TIter next(gSystem->GetListOfFileHandlers());
3425  TObject *fh = 0;
3426  while ((fh = next())) {
3427  TProofServInputHandler *ih = dynamic_cast<TProofServInputHandler *>(fh);
3428  if (ih)
3429  gSystem->RemoveFileHandler(ih);
3430  }
3431 
3432  // Stop processing events
3433  gSystem->ExitLoop();
3434 
3435  // Exit() is called in pmain
3436 }
3437 
3438 ////////////////////////////////////////////////////////////////////////////////
3439 /// Scan recursively the datadir and unlink it if empty
3440 /// Return kTRUE if it can be unlinked, kFALSE otherwise
3441 
3442 Bool_t TProofServ::UnlinkDataDir(const char *path)
3443 {
3444  if (!path || strlen(path) <= 0) return kFALSE;
3445 
3446  Bool_t dorm = kTRUE;
3447  void *dirp = gSystem->OpenDirectory(path);
3448  if (dirp) {
3449  TString fpath;
3450  const char *ent = 0;
3451  while (dorm && (ent = gSystem->GetDirEntry(dirp))) {
3452  if (!strcmp(ent, ".") || !strcmp(ent, "..")) continue;
3453  fpath.Form("%s/%s", path, ent);
3454  FileStat_t st;
3455  if (gSystem->GetPathInfo(fpath, st) == 0 && R_ISDIR(st.fMode)) {
3456  dorm = UnlinkDataDir(fpath);
3457  } else {
3458  dorm = kFALSE;
3459  }
3460  }
3461  // Close the directory
3462  gSystem->FreeDirectory(dirp);
3463  } else {
3464  // Cannot open the directory
3465  dorm = kFALSE;
3466  }
3467 
3468  // Do remove, if required
3469  if (dorm && gSystem->Unlink(path) != 0)
3470  Warning("UnlinkDataDir", "data directory '%s' is empty but could not be removed", path);
3471  // done
3472  return dorm;
3473 }
3474 
3475 ////////////////////////////////////////////////////////////////////////////////
3476 /// Static function that returns kTRUE in case we are a PROOF server.
3477 
3478 Bool_t TProofServ::IsActive()
3479 {
3480  return gProofServ ? kTRUE : kFALSE;
3481 }
3482 
3483 ////////////////////////////////////////////////////////////////////////////////
3484 /// Static function returning pointer to global object gProofServ.
3485 /// Mainly for use via CINT, where the gProofServ symbol might be
3486 /// deleted from the symbol table.
3487 
3488 TProofServ *TProofServ::This()
3489 {
3490  return gProofServ;
3491 }
3492 
3493 ////////////////////////////////////////////////////////////////////////////////
3494 /// Setup authentication related stuff for old versions.
3495 /// Provided for backward compatibility.
3496 
3497 Int_t TProofServ::OldAuthSetup(TString &conf)
3498 {
3499  OldProofServAuthSetup_t oldAuthSetupHook = 0;
3500 
3501  if (!oldAuthSetupHook) {
3502  // Load libraries needed for (server) authentication ...
3503  TString authlib = "libRootAuth";
3504  char *p = 0;
3505  // The generic one
3506  if ((p = gSystem->DynamicPathName(authlib, kTRUE))) {
3507  delete[] p;
3508  if (gSystem->Load(authlib) == -1) {
3509  Error("OldAuthSetup", "can't load %s",authlib.Data());
3510  return kFALSE;
3511  }
3512  } else {
3513  Error("OldAuthSetup", "can't locate %s",authlib.Data());
3514  return -1;
3515  }
3516  //
3517  // Locate OldProofServAuthSetup
3518  Func_t f = gSystem->DynFindSymbol(authlib,"OldProofServAuthSetup");
3519  if (f)
3520  oldAuthSetupHook = (OldProofServAuthSetup_t)(f);
3521  else {
3522  Error("OldAuthSetup", "can't find OldProofServAuthSetup");
3523  return -1;
3524  }
3525  }
3526  //
3527  // Setup
3528  return (*oldAuthSetupHook)(fSocket, IsMaster(), fProtocol,
3529  fUser, fOrdinal, conf);
3530 }
3531 
3532 ////////////////////////////////////////////////////////////////////////////////
3533 /// Create a TProofQueryResult instance for this query.
3534 
3535 TProofQueryResult *TProofServ::MakeQueryResult(Long64_t nent,
3536  const char *opt,
3537  TList *inlist, Long64_t fst,
3538  TDSet *dset, const char *selec,
3539  TObject *elist)
3540 {
3541  // Increment sequential number
3542  Int_t seqnum = -1;
3543  if (fQMgr) {
3544  fQMgr->IncrementSeqNum();
3545  seqnum = fQMgr->SeqNum();
3546  }
3547 
3548  // Locally we always use the current streamer
3549  Bool_t olds = (dset && dset->TestBit(TDSet::kWriteV3)) ? kTRUE : kFALSE;
3550  if (olds)
3551  dset->SetWriteV3(kFALSE);
3552 
3553  // Create the instance and add it to the list
3554  TProofQueryResult *pqr = new TProofQueryResult(seqnum, opt, inlist, nent,
3555  fst, dset, selec, elist);
3556  // Title is the session identifier
3557  pqr->SetTitle(gSystem->BaseName(fQueryDir));
3558 
3559  // Restore old streamer info
3560  if (olds)
3561  dset->SetWriteV3(kTRUE);
3562 
3563  return pqr;
3564 }
3565 
3566 ////////////////////////////////////////////////////////////////////////////////
3567 /// Set query in running state.
3568 
3569 void TProofServ::SetQueryRunning(TProofQueryResult *pq)
3570 {
3571  // Record current position in the log file at start
3572  fflush(stdout);
3573  Int_t startlog = lseek(fileno(stdout), (off_t) 0, SEEK_END);
3574 
3575  // Add some header to logs
3576  Printf(" ");
3577  Info("SetQueryRunning", "starting query: %d", pq->GetSeqNum());
3578 
3579  // Build the list of loaded PAR packages
3580  TString parlist = "";
3581  fPackMgr->GetEnabledPackages(parlist);
3582 
3583  if (fProof) {
3584  // Set in running state
3585  pq->SetRunning(startlog, parlist, fProof->GetParallel());
3586 
3587  // Bytes and CPU at start (we will calculate the differential at end)
3588  pq->SetProcessInfo(pq->GetEntries(),
3589  fProof->GetCpuTime(), fProof->GetBytesRead());
3590  } else {
3591  // Set in running state
3592  pq->SetRunning(startlog, parlist, -1);
3593 
3594  // Bytes and CPU at start (we will calculate the differential at end)
3595  pq->SetProcessInfo(pq->GetEntries(), float(0.), 0);
3596  }
3597 }
3598 
3599 ////////////////////////////////////////////////////////////////////////////////
3600 /// Handle archive request.
3601 
3602 void TProofServ::HandleArchive(TMessage *mess, TString *slb)
3603 {
3604  PDB(kGlobal, 1)
3605  Info("HandleArchive", "Enter");
3606 
3607  TString queryref;
3608  TString path;
3609  (*mess) >> queryref >> path;
3610 
3611  if (slb) slb->Form("%s %s", queryref.Data(), path.Data());
3612 
3613  // If this is a set default action just save the default
3614  if (queryref == "Default") {
3615  fArchivePath = path;
3616  Info("HandleArchive",
3617  "default path set to %s", fArchivePath.Data());
3618  return;
3619  }
3620 
3621  Int_t qry = -1;
3622  TString qdir;
3623  TProofQueryResult *pqr = fQMgr ? fQMgr->LocateQuery(queryref, qry, qdir) : 0;
3624  TProofQueryResult *pqm = pqr;
3625 
3626  if (path.Length() <= 0) {
3627  if (fArchivePath.Length() <= 0) {
3628  Info("HandleArchive",
3629  "archive paths are not defined - do nothing");
3630  return;
3631  }
3632  if (qry > 0) {
3633  path.Form("%s/session-%s-%d.root",
3634  fArchivePath.Data(), fTopSessionTag.Data(), qry);
3635  } else {
3636  path = queryref;
3637  path.ReplaceAll(":q","-");
3638  path.Insert(0, TString::Format("%s/",fArchivePath.Data()));
3639  path += ".root";
3640  }
3641  }
3642 
3643  // Build file name for specific query
3644  if (!pqr || qry < 0) {
3645  TString fout = qdir;
3646  fout += "/query-result.root";
3647 
3648  TFile *f = TFile::Open(fout,"READ");
3649  pqr = 0;
3650  if (f) {
3651  f->ReadKeys();
3652  TIter nxk(f->GetListOfKeys());
3653  TKey *k = 0;
3654  while ((k = (TKey *)nxk())) {
3655  if (!strcmp(k->GetClassName(), "TProofQueryResult")) {
3656  pqr = (TProofQueryResult *) f->Get(k->GetName());
3657  if (pqr)
3658  break;
3659  }
3660  }
3661  f->Close();
3662  delete f;
3663  } else {
3664  Info("HandleArchive",
3665  "file cannot be open (%s)",fout.Data());
3666  return;
3667  }
3668  }
3669 
3670  if (pqr) {
3671 
3672  PDB(kGlobal, 1) Info("HandleArchive",
3673  "archive path for query #%d: %s",
3674  qry, path.Data());
3675  TFile *farc = 0;
3676  if (gSystem->AccessPathName(path))
3677  farc = TFile::Open(path,"NEW");
3678  else
3679  farc = TFile::Open(path,"UPDATE");
3680  if (!farc || !(farc->IsOpen())) {
3681  Info("HandleArchive",
3682  "archive file cannot be open (%s)",path.Data());
3683  return;
3684  }
3685  farc->cd();
3686 
3687  // Update query status
3688  pqr->SetArchived(path);
3689  if (pqm)
3690  pqm->SetArchived(path);
3691 
3692  // Write to file
3693  pqr->Write();
3694 
3695  // Update temporary files too
3696  if (qry > -1 && fQMgr)
3697  fQMgr->SaveQuery(pqr);
3698 
3699  // Notify
3700  Info("HandleArchive",
3701  "results of query %s archived to file %s",
3702  queryref.Data(), path.Data());
3703  }
3704 
3705  // Done
3706  return;
3707 }
3708 
3709 ////////////////////////////////////////////////////////////////////////////////
3710 /// Get a map {server-name, list-of-files} for collection 'fc' to be used in
3711 /// TPacketizerFile. Returns a pointer to the map (ownership of the caller).
3712 /// Or (TMap *)0 and an error message in emsg.
3713 
3714 TMap *TProofServ::GetDataSetNodeMap(TFileCollection *fc, TString &emsg)
3715 {
3716  TMap *fcmap = 0;
3717  emsg = "";
3718 
3719  // Sanity checks
3720  if (!fc) {
3721  emsg.Form("file collection undefined!");
3722  return fcmap;
3723  }
3724 
3725  // Prepare data set map
3726  fcmap = new TMap();
3727 
3728  TIter nxf(fc->GetList());
3729  TFileInfo *fiind = 0;
3730  TString key;
3731  while ((fiind = (TFileInfo *)nxf())) {
3732  TUrl *xurl = fiind->GetCurrentUrl();
3733  // Find the key for this server
3734  key.Form("%s://%s", xurl->GetProtocol(), xurl->GetHostFQDN());
3735  if (xurl->GetPort() > 0)
3736  key += TString::Format(":%d", xurl->GetPort());
3737  // Get the map entry for this key
3738  TPair *ent = 0;
3739  THashList* l = 0;
3740  if ((ent = (TPair *) fcmap->FindObject(key.Data()))) {
3741  // Attach to the list
3742  l = (THashList *) ent->Value();
3743  } else {
3744  // Create list
3745  l = new THashList;
3746  l->SetOwner(kTRUE);
3747  // Add it to the map
3748  fcmap->Add(new TObjString(key.Data()), l);
3749  }
3750  // Add fileinfo with index to list
3751  l->Add(fiind);
3752  }
3753 
3754  // Done
3755  return fcmap;
3756 }
3757 
3758 ////////////////////////////////////////////////////////////////////////////////
3759 /// Handle processing request.
3760 
3761 void TProofServ::HandleProcess(TMessage *mess, TString *slb)
3762 {
3763  PDB(kGlobal, 1)
3764  Info("HandleProcess", "Enter");
3765 
3766  // Nothing to do for slaves if we are not idle
3767  if (!IsTopMaster() && !IsIdle())
3768  return;
3769 
3770  TDSet *dset;
3771  TString filename, opt;
3772  TList *input;
3773  Long64_t nentries, first;
3774  TEventList *evl = 0;
3775  TEntryList *enl = 0;
3776  Bool_t sync;
3777 
3778  (*mess) >> dset >> filename >> input >> opt >> nentries >> first >> evl >> sync;
3779  // Get entry list information, if any (support started with fProtocol == 15)
3780  if ((mess->BufferSize() > mess->Length()) && fProtocol > 14)
3781  (*mess) >> enl;
3782  Bool_t hasNoData = (!dset || dset->TestBit(TDSet::kEmpty)) ? kTRUE : kFALSE;
3783 
3784  // Priority to the entry list
3785  TObject *elist = (enl) ? (TObject *)enl : (TObject *)evl;
3786  if (enl && evl)
3787  // Cannot specify both at the same time
3788  SafeDelete(evl);
3789  if ((!hasNoData) && elist)
3790  dset->SetEntryList(elist);
3791 
3792  if (IsTopMaster()) {
3793 
3794  TString emsg;
3795  // Make sure the dataset contains the information needed
3796  if ((!hasNoData) && dset->GetListOfElements()->GetSize() == 0) {
3797  if (TProof::AssertDataSet(dset, input, fDataSetManager, emsg) != 0) {
3798  SendAsynMessage(TString::Format("AssertDataSet on %s: %s",
3799  fPrefix.Data(), emsg.Data()));
3800  Error("HandleProcess", "AssertDataSet: %s", emsg.Data());
3801  // To terminate collection
3802  if (sync) SendLogFile();
3803  return;
3804  }
3805  } else if (hasNoData) {
3806  // Check if we are required to process with TPacketizerFile a registered dataset
3807  TNamed *ftp = dynamic_cast<TNamed *>(input->FindObject("PROOF_FilesToProcess"));
3808  if (ftp) {
3809  TString dsn(ftp->GetTitle());
3810  if (!dsn.Contains(":") || dsn.BeginsWith("dataset:")) {
3811  dsn.ReplaceAll("dataset:", "");
3812  // Get the map for TPacketizerFile
3813  // Make sure we have something in input and a dataset manager
3814  if (!fDataSetManager) {
3815  emsg.Form("dataset manager not initialized!");
3816  } else {
3817  TFileCollection *fc = 0;
3818  // Get the dataset
3819  if (!(fc = fDataSetManager->GetDataSet(dsn))) {
3820  emsg.Form("requested dataset '%s' does not exists", dsn.Data());
3821  } else {
3822  TMap *fcmap = GetDataSetNodeMap(fc, emsg);
3823  if (fcmap) {
3824  input->Remove(ftp);
3825  delete ftp;
3826  fcmap->SetOwner(kTRUE);
3827  fcmap->SetName("PROOF_FilesToProcess");
3828  input->Add(fcmap);
3829  }
3830  }
3831  }
3832  if (!emsg.IsNull()) {
3833  SendAsynMessage(TString::Format("HandleProcess on %s: %s",
3834  fPrefix.Data(), emsg.Data()));
3835  Error("HandleProcess", "%s", emsg.Data());
3836  // To terminate collection
3837  if (sync) SendLogFile();
3838  return;
3839  }
3840  }
3841  }
3842  }
3843 
3844  TProofQueryResult *pq = 0;
3845 
3846  // Create instance of query results; we set ownership of the input list
3847  // to the TQueryResult object, to avoid too many instantiations
3848  pq = MakeQueryResult(nentries, opt, 0, first, 0, filename, 0);
3849 
3850  // Prepare the input list and transfer it into the TQueryResult object
3851  if (dset) input->Add(dset);
3852  if (elist) input->Add(elist);
3853  pq->SetInputList(input, kTRUE);
3854 
3855  // Clear the list
3856  input->Clear("nodelete");
3857  SafeDelete(input);
3858 
3859  // Save input data, if any
3860  if (TProof::SaveInputData(pq, fCacheDir.Data(), emsg) != 0)
3861  Warning("HandleProcess", "could not save input data: %s", emsg.Data());
3862 
3863  // If not a draw action add the query to the main list
3864  if (!(pq->IsDraw())) {
3865  if (fQMgr) {
3866  if (fQMgr->Queries()) fQMgr->Queries()->Add(pq);
3867  // Also save it to queries dir
3868  fQMgr->SaveQuery(pq);
3869  }
3870  }
3871 
3872  // Add anyhow to the waiting lists
3873  QueueQuery(pq);
3874 
3875  // Call get Workers
3876  // if we are not idle the scheduler will just enqueue the query and
3877  // send a resume message later.
3878 
3879  Bool_t enqueued = kFALSE;
3880  Int_t pc = 0;
3881  // if the session does not have workers and is in the dynamic mode
3882  if (fProof->UseDynamicStartup()) {
3883  // get the a list of workers and start them
3884  TList* workerList = new TList();
3885  EQueryAction retVal = GetWorkers(workerList, pc);
3886  if (retVal == TProofServ::kQueryStop) {
3887  Error("HandleProcess", "error getting list of worker nodes");
3888  // To terminate collection
3889  if (sync) SendLogFile();
3890  return;
3891  } else if (retVal == TProofServ::kQueryEnqueued) {
3892  // change to an asynchronous query
3893  enqueued = kTRUE;
3894  Info("HandleProcess", "query %d enqueued", pq->GetSeqNum());
3895  } else {
3896  Int_t ret = fProof->AddWorkers(workerList);
3897  if (ret < 0) {
3898  Error("HandleProcess", "Adding a list of worker nodes returned: %d",
3899  ret);
3900  // To terminate collection
3901  if (sync) SendLogFile();
3902  return;
3903  }
3904  }
3905  } else {
3906  EQueryAction retVal = GetWorkers(0, pc);
3907  if (retVal == TProofServ::kQueryStop) {
3908  Error("HandleProcess", "error getting list of worker nodes");
3909  // To terminate collection
3910  if (sync) SendLogFile();
3911  return;
3912  } else if (retVal == TProofServ::kQueryEnqueued) {
3913  // change to an asynchronous query
3914  enqueued = kTRUE;
3915  Info("HandleProcess", "query %d enqueued", pq->GetSeqNum());
3916  } else if (retVal != TProofServ::kQueryOK) {
3917  Error("HandleProcess", "unknown return value: %d", retVal);
3918  // To terminate collection
3919  if (sync) SendLogFile();
3920  return;
3921  }
3922  }
3923 
3924  // If the client submission was asynchronous, signal the submission of
3925  // the query and communicate the assigned sequential number for later
3926  // identification
3927  TMessage m(kPROOF_QUERYSUBMITTED);
3928  if (!sync || enqueued) {
3929  m << pq->GetSeqNum() << kFALSE;
3930  fSocket->Send(m);
3931  }
3932 
3933  // Nothing more to do if we are not idle
3934  if (!IsIdle()) {
3935  // Notify submission
3936  Info("HandleProcess",
3937  "query \"%s:%s\" submitted", pq->GetTitle(), pq->GetName());
3938  return;
3939  }
3940 
3941  // Process
3942  // in the static mode, if a session is enqueued it will be processed after current query
3943  // (there is no way to enqueue if idle).
3944  // in the dynamic mode we will process here only if the session was idle and got workers!
3945  Bool_t doprocess = kFALSE;
3946  while (WaitingQueries() > 0 && !enqueued) {
3947  doprocess = kTRUE;
3948  //
3949  ProcessNext(slb);
3950  // avoid processing async queries sent during processing in dyn mode
3951  if (fProof->UseDynamicStartup())
3952  enqueued = kTRUE;
3953 
3954  } // Loop on submitted queries
3955 
3956  // Set idle
3957  SetIdle(kTRUE);
3958 
3959  // Reset mergers
3960  fProof->ResetMergers();
3961 
3962  // kPROOF_SETIDLE sets the client to idle; in asynchronous mode clients monitor
3963  // TProof::IsIdle for to check the readiness of a query, so we need to send this
3964  // before to be sure thatn everything about a query is received by the client
3965  if (!sync) SendLogFile();
3966 
3967  // Signal the client that we are idle
3968  if (doprocess) {
3969  m.Reset(kPROOF_SETIDLE);
3970  Bool_t waiting = (WaitingQueries() > 0) ? kTRUE : kFALSE;
3971  m << waiting;
3972  fSocket->Send(m);
3973  }
3974 
3975  // In synchronous mode TProof::Collect is terminated by the reception of the
3976  // log file and subsequent submissions are controlled by TProof::IsIdle(), so
3977  // this must be last one to be sent
3978  if (sync) SendLogFile();
3979 
3980  // Set idle
3981  SetIdle(kTRUE);
3982 
3983  } else {
3984 
3985  // Reset compute stopwatch: we include all what done from now on
3986  fCompute.Reset();
3987  fCompute.Start();
3988 
3989  // Set not idle
3990  SetIdle(kFALSE);
3991 
3992  // Cleanup the player
3993  Bool_t deleteplayer = kTRUE;
3994  MakePlayer();
3995 
3996  // Setup data set
3997  if (dset && (dset->IsA() == TDSetProxy::Class()))
3998  ((TDSetProxy*)dset)->SetProofServ(this);
3999 
4000  // Get input data, if any
4001  TString emsg;
4002  if (TProof::GetInputData(input, fCacheDir.Data(), emsg) != 0)
4003  Warning("HandleProcess", "could not get input data: %s", emsg.Data());
4004 
4005  // Get query sequential number
4006  if (TProof::GetParameter(input, "PROOF_QuerySeqNum", fQuerySeqNum) != 0)
4007  Warning("HandleProcess", "could not get query sequential number!");
4008 
4009  // Make the ordinal number available in the selector
4010  TObject *nord = 0;
4011  while ((nord = input->FindObject("PROOF_Ordinal")))
4012  input->Remove(nord);
4013  input->Add(new TNamed("PROOF_Ordinal", GetOrdinal()));
4014 
4015  // Set input
4016  TIter next(input);
4017  TObject *o = 0;
4018  while ((o = next())) {
4019  PDB(kGlobal, 2) Info("HandleProcess", "adding: %s", o->GetName());
4020  fPlayer->AddInput(o);
4021  }
4022 
4023  // Check if a TSelector object is passed via input list
4024  TObject *obj = 0;
4025  TSelector *selector_obj = 0;
4026  TIter nxt(input);
4027  while ((obj = nxt())){
4028  if (obj->InheritsFrom("TSelector")) {
4029  selector_obj = (TSelector *) obj;
4030  filename = selector_obj->ClassName();
4031  Info("HandleProcess", "selector obj for '%s' found", selector_obj->ClassName());
4032  break;
4033  }
4034  }
4035 
4036  // Signal the master that we are starting processing
4037  fSocket->Send(kPROOF_STARTPROCESS);
4038 
4039  // Reset latency stopwatch
4040  fLatency.Reset();
4041  fSaveOutput.Reset();
4042 
4043  // Process
4044  PDB(kGlobal, 1) Info("HandleProcess", "calling %s::Process()", fPlayer->IsA()->GetName());
4045 
4046  if (selector_obj){
4047  Info("HandleProcess", "calling fPlayer->Process() with selector object: %s", selector_obj->ClassName());
4048  fPlayer->Process(dset, selector_obj, opt, nentries, first);
4049  }
4050  else {
4051  Info("HandleProcess", "calling fPlayer->Process() with selector name: %s", filename.Data());
4052  fPlayer->Process(dset, filename, opt, nentries, first);
4053  }
4054 
4055  // Return number of events processed
4056  TMessage m(kPROOF_STOPPROCESS);
4057  Bool_t abort = (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted) ? kFALSE : kTRUE;
4058  if (fProtocol > 18) {
4059  TProofProgressStatus* status =
4060  new TProofProgressStatus(fPlayer->GetEventsProcessed(),
4061  gPerfStats?gPerfStats->GetBytesRead():0);
4062  if (status)
4063  m << status << abort;
4064  if (slb)
4065  slb->Form("%d %lld %lld", fPlayer->GetExitStatus(),
4066  status->GetEntries(), status->GetBytesRead());
4067  SafeDelete(status);
4068  } else {
4069  m << fPlayer->GetEventsProcessed() << abort;
4070  if (slb)
4071  slb->Form("%d %lld -1", fPlayer->GetExitStatus(), fPlayer->GetEventsProcessed());
4072  }
4073 
4074  fSocket->Send(m);
4075  PDB(kGlobal, 2)
4076  Info("TProofServ::Handleprocess",
4077  "worker %s has finished processing with %d objects in output list",
4078  GetOrdinal(), fPlayer->GetOutputList()->GetEntries());
4079 
4080  // Cleanup the input data set info
4081  SafeDelete(dset);
4082  SafeDelete(enl);
4083  SafeDelete(evl);
4084 
4085  Bool_t outok = (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted &&
4086  fPlayer->GetOutputList()) ? kTRUE : kFALSE;
4087  if (outok) {
4088  // Check if in controlled output sending mode or submerging
4089  Int_t cso = 0;
4090  Bool_t isSubMerging = kFALSE;
4091 
4092  // Check if we are in merging mode (i.e. parameter PROOF_UseMergers exists)
4093  Int_t nm = 0;
4094  if (TProof::GetParameter(input, "PROOF_UseMergers", nm) == 0) {
4095  isSubMerging = (nm >= 0) ? kTRUE : kFALSE;
4096  }
4097  if (!isSubMerging) {
4098  cso = gEnv->GetValue("Proof.ControlSendOutput", 1);
4099  if (TProof::GetParameter(input, "PROOF_ControlSendOutput", cso) != 0)
4100  cso = gEnv->GetValue("Proof.ControlSendOutput", 1);
4101  }
4102 
4103  if (cso > 0) {
4104 
4105  // Control output sending mode: wait for the master to ask for the objects.
4106  // Allows controls of memory usage on the master.
4107  TMessage msg(kPROOF_SENDOUTPUT);
4108  fSocket->Send(msg);
4109 
4110  // Set idle
4111  SetIdle(kTRUE);
4112 
4113  // Do not cleanup the player yet: it will be used in sending output activities
4114  deleteplayer = kFALSE;
4115 
4116  PDB(kGlobal, 1)
4117  Info("HandleProcess", "controlled mode: worker %s has finished,"
4118  " sizes sent to master", fOrdinal.Data());
4119  } else {
4120 
4121  // Check if we are in merging mode (i.e. parameter PROOF_UseMergers exists)
4122  if (TestBit(TProofServ::kHighMemory)) {
4123  if (isSubMerging)
4124  Info("HandleProcess", "submerging disabled because of high-memory case");
4125  isSubMerging = kFALSE;
4126  } else {
4127  PDB(kGlobal, 2) Info("HandleProcess", "merging mode check: %d", isSubMerging);
4128  }
4129 
4130  if (!IsMaster() && isSubMerging) {
4131  // Worker in merging mode.
4132  //----------------------------
4133  // First, it reports only the size of its output to the master
4134  // + port on which it can possibly accept outputs from other workers if it becomes a merger
4135  // Master will later tell it where it should send the output (either to the master or to some merger)
4136  // or if it should become a merger
4137 
4138  TMessage msg_osize(kPROOF_SUBMERGER);
4139  msg_osize << Int_t(TProof::kOutputSize);
4140  msg_osize << fPlayer->GetOutputList()->GetEntries();
4141 
4142  fMergingSocket = new TServerSocket(0);
4143  Int_t merge_port = 0;
4144  if (fMergingSocket) {
4145  PDB(kGlobal, 2)
4146  Info("HandleProcess", "possible port for merging connections: %d",
4147  fMergingSocket->GetLocalPort());
4148  merge_port = fMergingSocket->GetLocalPort();
4149  }
4150  msg_osize << merge_port;
4151  fSocket->Send(msg_osize);
4152 
4153  // Set idle
4154  SetIdle(kTRUE);
4155 
4156  // Do not cleanup the player yet: it will be used in sub-merging activities
4157  deleteplayer = kFALSE;
4158 
4159  PDB(kSubmerger, 2) Info("HandleProcess", "worker %s has finished", fOrdinal.Data());
4160 
4161  } else {
4162  // Sub-master OR worker not in merging mode
4163  // ---------------------------------------------
4164  PDB(kGlobal, 2) Info("HandleProcess", "sending result directly to master");
4165  if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
4166  Warning("HandleProcess","problems sending output list");
4167 
4168  // Masters reset the mergers, if any
4169  if (IsMaster()) fProof->ResetMergers();
4170 
4171  // Signal the master that we are idle
4172  fSocket->Send(kPROOF_SETIDLE);
4173 
4174  // Set idle
4175  SetIdle(kTRUE);
4176 
4177  // Notify the user
4178  SendLogFile();
4179  }
4180 
4181 
4182 
4183  }
4184 
4185  } else {
4186  // No output list
4187  if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted)
4188  Warning("HandleProcess","the output list is empty!");
4189  if (SendResults(fSocket) != 0)
4190  Warning("HandleProcess", "problems sending output list");
4191 
4192  // Masters reset the mergers, if any
4193  if (IsMaster()) fProof->ResetMergers();
4194 
4195  // Signal the master that we are idle
4196  fSocket->Send(kPROOF_SETIDLE);
4197 
4198  // Set idle
4199  SetIdle(kTRUE);
4200 
4201  // Notify the user
4202  SendLogFile();
4203  }
4204 
4205  // Prevent from double-deleting in input
4206  TIter nex(input);
4207  while ((obj = nex())) {
4208  if (obj->InheritsFrom("TSelector")) input->Remove(obj);
4209  }
4210 
4211  // Make also sure the input list objects are deleted
4212  fPlayer->GetInputList()->SetOwner(0);
4213 
4214  // Remove possible inputs from a file and the file, if any
4215  TList *added = dynamic_cast<TList *>(input->FindObject("PROOF_InputObjsFromFile"));
4216  if (added) {
4217  if (added->GetSize() > 0) {
4218  // The file must be the last one
4219  TFile *f = dynamic_cast<TFile *>(added->Last());
4220  if (f) {
4221  added->Remove(f);
4222  TIter nxo(added);
4223  while ((o = nxo())) { input->Remove(o); }
4224  input->Remove(added);
4225  added->SetOwner(kFALSE);
4226  added->Clear();
4227  f->Close();
4228  delete f;
4229  }
4230  }
4231  SafeDelete(added);
4232  }
4233  input->SetOwner();
4234  SafeDelete(input);
4235 
4236  // Cleanup if required
4237  if (deleteplayer) DeletePlayer();
4238  }
4239 
4240  PDB(kGlobal, 1) Info("HandleProcess", "done");
4241 
4242  // Done
4243  return;
4244 }
4245 
4246 ////////////////////////////////////////////////////////////////////////////////
4247 /// Sends all objects from the given list to the specified socket
4248 
4249 Int_t TProofServ::SendResults(TSocket *sock, TList *outlist, TQueryResult *pq)
4250 {
4251  PDB(kOutput, 2) Info("SendResults", "enter");
4252 
4253  TString msg;
4254  if (fProtocol > 23 && outlist) {
4255  // Send objects in bunches of max fMsgSizeHWM bytes to optimize transfer
4256  // Objects are merged one-by-one by the client
4257  // Messages for objects
4258  TMessage mbuf(kPROOF_OUTPUTOBJECT);
4259  // Objects in the output list
4260  Int_t olsz = outlist->GetSize();
4261  if (IsTopMaster() && pq) {
4262  msg.Form("%s: merging output objects ... done ",
4263  fPrefix.Data());
4264  SendAsynMessage(msg.Data());
4265  // Message for the client
4266  msg.Form("%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
4267  SendAsynMessage(msg.Data(), kFALSE);
4268  // Send light query info
4269  mbuf << (Int_t) 0;
4270  mbuf.WriteObject(pq);
4271  if (sock->Send(mbuf) < 0) return -1;
4272  }
4273  // Objects in the output list
4274  Int_t ns = 0, np = 0;
4275  TIter nxo(outlist);
4276  TObject *o = 0;
4277  Int_t totsz = 0, objsz = 0;
4278  mbuf.Reset();
4279  while ((o = nxo())) {
4280  if (mbuf.Length() > fMsgSizeHWM) {
4281  PDB(kOutput, 1)
4282  Info("SendResults",
4283  "message has %d bytes: limit of %lld bytes reached - sending ...",
4284  mbuf.Length(), fMsgSizeHWM);
4285  // Compress the message, if required; for these messages we do it already
4286  // here so we get the size; TXSocket does not do it twice.
4287  if (GetCompressionLevel() > 0) {
4288  mbuf.SetCompressionSettings(fCompressMsg);
4289  mbuf.Compress();
4290  objsz = mbuf.CompLength();
4291  } else {
4292  objsz = mbuf.Length();
4293  }
4294  totsz += objsz;
4295  if (IsTopMaster()) {
4296  msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
4297  fPrefix.Data(), ns, olsz, objsz);
4298  SendAsynMessage(msg.Data(), kFALSE);
4299  }
4300  if (sock->Send(mbuf) < 0) return -1;
4301  // Reset the message
4302  mbuf.Reset();
4303  np = 0;
4304  }
4305  ns++;
4306  np++;
4307  mbuf << (Int_t) ((ns >= olsz) ? 2 : 1);
4308  mbuf << o;
4309  }
4310  if (np > 0) {
4311  // Compress the message, if required; for these messages we do it already
4312  // here so we get the size; TXSocket does not do it twice.
4313  if (GetCompressionLevel() > 0) {
4314  mbuf.SetCompressionSettings(fCompressMsg);
4315  mbuf.Compress();
4316  objsz = mbuf.CompLength();
4317  } else {
4318  objsz = mbuf.Length();
4319  }
4320  totsz += objsz;
4321  if (IsTopMaster()) {
4322  msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
4323  fPrefix.Data(), ns, olsz, objsz);
4324  SendAsynMessage(msg.Data(), kFALSE);
4325  }
4326  if (sock->Send(mbuf) < 0) return -1;
4327  }
4328  if (IsTopMaster()) {
4329  // Send total size
4330  msg.Form("%s: grand total: sent %d objects, size: %d bytes ",
4331  fPrefix.Data(), olsz, totsz);
4332  SendAsynMessage(msg.Data());
4333  }
4334  } else if (fProtocol > 10 && outlist) {
4335 
4336  // Send objects one-by-one to optimize transfer and merging
4337  // Messages for objects
4338  TMessage mbuf(kPROOF_OUTPUTOBJECT);
4339  // Objects in the output list
4340  Int_t olsz = outlist->GetSize();
4341  if (IsTopMaster() && pq) {
4342  msg.Form("%s: merging output objects ... done ",
4343  fPrefix.Data());
4344  SendAsynMessage(msg.Data());
4345  // Message for the client
4346  msg.Form("%s: objects merged; sending output: %d objs", fPrefix.Data(), olsz);
4347  SendAsynMessage(msg.Data(), kFALSE);
4348  // Send light query info
4349  mbuf << (Int_t) 0;
4350  mbuf.WriteObject(pq);
4351  if (sock->Send(mbuf) < 0) return -1;
4352  }
4353 
4354  Int_t ns = 0;
4355  Int_t totsz = 0, objsz = 0;
4356  TIter nxo(fPlayer->GetOutputList());
4357  TObject *o = 0;
4358  while ((o = nxo())) {
4359  ns++;
4360  mbuf.Reset();
4361  Int_t type = (Int_t) ((ns >= olsz) ? 2 : 1);
4362  mbuf << type;
4363  mbuf.WriteObject(o);
4364  // Compress the message, if required; for these messages we do it already
4365  // here so we get the size; TXSocket does not do it twice.
4366  if (GetCompressionLevel() > 0) {
4367  mbuf.SetCompressionSettings(fCompressMsg);
4368  mbuf.Compress();
4369  objsz = mbuf.CompLength();
4370  } else {
4371  objsz = mbuf.Length();
4372  }
4373  totsz += objsz;
4374  if (IsTopMaster()) {
4375  msg.Form("%s: objects merged; sending obj %d/%d (%d bytes) ",
4376  fPrefix.Data(), ns, olsz, objsz);
4377  SendAsynMessage(msg.Data(), kFALSE);
4378  }
4379  if (sock->Send(mbuf) < 0) return -1;
4380  }
4381  // Total size
4382  if (IsTopMaster()) {
4383  // Send total size
4384  msg.Form("%s: grand total: sent %d objects, size: %d bytes ",
4385  fPrefix.Data(), olsz, totsz);
4386  SendAsynMessage(msg.Data());
4387  }
4388 
4389  } else if (IsTopMaster() && fProtocol > 6 && outlist) {
4390 
4391  // Buffer to be sent
4392  TMessage mbuf(kPROOF_OUTPUTLIST);
4393  mbuf.WriteObject(pq);
4394  // Sizes
4395  Int_t blen = mbuf.CompLength();
4396  Int_t olsz = outlist->GetSize();
4397  // Message for the client
4398  msg.Form("%s: sending output: %d objs, %d bytes", fPrefix.Data(), olsz, blen);
4399  SendAsynMessage(msg.Data(), kFALSE);
4400  if (sock->Send(mbuf) < 0) return -1;
4401 
4402  } else {
4403  if (outlist) {
4404  PDB(kGlobal, 2) Info("SendResults", "sending output list");
4405  } else {
4406  PDB(kGlobal, 2) Info("SendResults", "notifying failure or abort");
4407  }
4408  if (sock->SendObject(outlist, kPROOF_OUTPUTLIST) < 0) return -1;
4409  }
4410 
4411  PDB(kOutput,2) Info("SendResults", "done");
4412 
4413  // Done
4414  return 0;
4415 }
4416 
4417 ////////////////////////////////////////////////////////////////////////////////
4418 /// process the next query from the queue of submitted jobs.
4419 /// to be called on the top master only.
4420 
4421 void TProofServ::ProcessNext(TString *slb)
4422 {
4423  TDSet *dset = 0;
4424  TString filename, opt;
4425  TList *input = 0;
4426  Long64_t nentries = -1, first = 0;
4427 
4428  // TObject *elist = 0;
4429  TProofQueryResult *pq = 0;
4430 
4431  TObject* obj = 0;
4432  TSelector* selector_obj = 0;
4433 
4434  // Process
4435 
4436  // Reset compute stopwatch: we include all what done from now on
4437  fCompute.Reset();
4438  fCompute.Start();
4439 
4440  // Get next query info (also removes query from the list)
4441  pq = NextQuery();
4442  if (pq) {
4443 
4444  // Set not idle
4445  SetIdle(kFALSE);
4446  opt = pq->GetOptions();
4447  input = pq->GetInputList();
4448  nentries = pq->GetEntries();
4449  first = pq->GetFirst();
4450  filename = pq->GetSelecImp()->GetName();
4451  Ssiz_t id = opt.Last('#');
4452  if (id != kNPOS && id < opt.Length() - 1) {
4453  filename += opt(id + 1, opt.Length());
4454  // Remove it from 'opt' so user found on the workers what they specified
4455  opt.Remove(id);
4456  }
4457  // Attach to data set and entry- (or event-) list (if any)
4458  TObject *o = 0;
4459  if ((o = pq->GetInputObject("TDSet"))) {
4460  dset = (TDSet *) o;
4461  } else {
4462  // Should never get here
4463  Error("ProcessNext", "no TDset object: cannot continue");
4464  return;
4465  }
4466  // elist = 0;
4467  // if ((o = pq->GetInputObject("TEntryList")))
4468  // elist = o;
4469  // else if ((o = pq->GetInputObject("TEventList")))
4470  // elist = o;
4471 
4472  // Expand selector files
4473  if (pq->GetSelecImp()) {
4474  gSystem->Exec(TString::Format("%s %s", kRM, pq->GetSelecImp()->GetName()));
4475  pq->GetSelecImp()->SaveSource(pq->GetSelecImp()->GetName());
4476  }
4477  if (pq->GetSelecHdr() &&
4478  !strstr(pq->GetSelecHdr()->GetName(), "TProofDrawHist")) {
4479  gSystem->Exec(TString::Format("%s %s", kRM, pq->GetSelecHdr()->GetName()));
4480  pq->GetSelecHdr()->SaveSource(pq->GetSelecHdr()->GetName());
4481  }
4482 
4483  // Taking out a TSelector object from input list
4484  TIter nxt(input);
4485  while ((obj = nxt())){
4486  if (obj->InheritsFrom("TSelector") &&
4487  !strcmp(pq->GetSelecImp()->GetName(), obj->ClassName())) {
4488  selector_obj = (TSelector *) obj;
4489  Info("ProcessNext", "found object for selector '%s'", obj->ClassName());
4490  break;
4491  }
4492  }
4493 
4494  } else {
4495  // Should never get here
4496  Error("ProcessNext", "empty waiting queries list!");
4497  return;
4498  }
4499 
4500  // Set in running state
4501  SetQueryRunning(pq);
4502 
4503  // Save to queries dir, if not standard draw
4504  if (fQMgr) {
4505  if (!(pq->IsDraw()))
4506  fQMgr->SaveQuery(pq);
4507  else
4508  fQMgr->IncrementDrawQueries();
4509  fQMgr->ResetTime();
4510  }
4511 
4512  // Signal the client that we are starting a new query
4513  TMessage m(kPROOF_STARTPROCESS);
4514  m << TString(pq->GetSelecImp()->GetName())
4515  << dset->GetNumOfFiles()
4516  << pq->GetFirst() << pq->GetEntries();
4517  fSocket->Send(m);
4518 
4519  // Create player
4520  MakePlayer();
4521 
4522  // Add query results to the player lists
4523  fPlayer->AddQueryResult(pq);
4524 
4525  // Set query currently processed
4526  fPlayer->SetCurrentQuery(pq);
4527 
4528  // Setup data set
4529  if (dset->IsA() == TDSetProxy::Class())
4530  ((TDSetProxy*)dset)->SetProofServ(this);
4531 
4532  // Add the unique query tag as TNamed object to the input list
4533  // so that it is available in TSelectors for monitoring
4534  TString qid = TString::Format("%s:%s",pq->GetTitle(),pq->GetName());
4535  input->Add(new TNamed("PROOF_QueryTag", qid.Data()));
4536  // ... and the sequential number
4537  fQuerySeqNum = pq->GetSeqNum();
4538  input->Add(new TParameter<Int_t>("PROOF_QuerySeqNum", fQuerySeqNum));
4539 
4540  // Check whether we have to enforce the use of submergers, but only if the user did
4541  // not express itself on the subject
4542  if (gEnv->Lookup("Proof.UseMergers") && !input->FindObject("PROOF_UseMergers")) {
4543  Int_t smg = gEnv->GetValue("Proof.UseMergers",-1);
4544  if (smg >= 0) {
4545  input->Add(new TParameter<Int_t>("PROOF_UseMergers", smg));
4546  PDB(kSubmerger, 2) Info("ProcessNext", "PROOF_UseMergers set to %d", smg);
4547  if (gEnv->Lookup("Proof.MergersByHost")) {
4548  Int_t mbh = gEnv->GetValue("Proof.MergersByHost", 0);
4549  if (mbh != 0) {
4550  // Administrator settings have the priority
4551  TObject *o = 0;
4552  if ((o = input->FindObject("PROOF_MergersByHost"))) { input->Remove(o); delete o; }
4553  input->Add(new TParameter<Int_t>("PROOF_MergersByHost", mbh));
4554  PDB(kSubmerger, 2) Info("ProcessNext", "submergers setup by host/node");
4555  }
4556  }
4557  }
4558  }
4559 
4560  // Set input
4561  TIter next(input);
4562  TObject *o = 0;
4563  while ((o = next())) {
4564  PDB(kGlobal, 2) Info("ProcessNext", "adding: %s", o->GetName());
4565  fPlayer->AddInput(o);
4566  }
4567 
4568  // Remove the list of the missing files from the original list, if any
4569  if ((o = input->FindObject("MissingFiles"))) input->Remove(o);
4570 
4571  // Process
4572  PDB(kGlobal, 1) Info("ProcessNext", "calling %s::Process()", fPlayer->IsA()->GetName());
4573  if (selector_obj){
4574  Info("ProcessNext", "calling fPlayer->Process() with selector object: %s", selector_obj->ClassName());
4575  fPlayer->Process(dset, selector_obj, opt, nentries, first);
4576  }
4577  else {
4578  Info("ProcessNext", "calling fPlayer->Process() with selector name: %s", filename.Data());
4579  fPlayer->Process(dset, filename, opt, nentries, first);
4580  }
4581 
4582  // This is the end of merging
4583  fPlayer->SetMerging(kFALSE);
4584 
4585  // Return number of events processed
4586  Bool_t abort =
4587  (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) ? kTRUE : kFALSE;
4588  if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kFinished) {
4589  m.Reset(kPROOF_STOPPROCESS);
4590  // message sent from worker to the master
4591  if (fProtocol > 18) {
4592  TProofProgressStatus* status = fPlayer->GetProgressStatus();
4593  m << status << abort;
4594  status = 0; // the status belongs to the player.
4595  } else if (fProtocol > 8) {
4596  m << fPlayer->GetEventsProcessed() << abort;
4597  } else {
4598  m << fPlayer->GetEventsProcessed();
4599  }
4600  fSocket->Send(m);
4601  }
4602 
4603  // Register any dataset produced during this processing, if required
4604  if (fDataSetManager && fPlayer->GetOutputList()) {
4605  TNamed *psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet");
4606  if (psr) {
4607  TString emsg;
4608  if (RegisterDataSets(input, fPlayer->GetOutputList(), fDataSetManager, emsg) != 0)
4609  Warning("ProcessNext", "problems registering produced datasets: %s", emsg.Data());
4610  do {
4611  fPlayer->GetOutputList()->Remove(psr);
4612  delete psr;
4613  } while ((psr = (TNamed *) fPlayer->GetOutputList()->FindObject("PROOFSERV_RegisterDataSet")));
4614  }
4615  }
4616 
4617  // Complete filling of the TQueryResult instance
4618  if (fQMgr && !pq->IsDraw()) {
4619  if (!abort) fProof->AskStatistics();
4620  if (fQMgr->FinalizeQuery(pq, fProof, fPlayer))
4621  fQMgr->SaveQuery(pq, fMaxQueries);
4622  }
4623 
4624  // If we were requested to save results on the master and we are not in save-to-file mode
4625  // then we save the results
4626  if (IsTopMaster() && fPlayer->GetOutputList()) {
4627  Bool_t save = kTRUE;
4628  TIter nxo(fPlayer->GetOutputList());
4629  TObject *xo = 0;
4630  while ((xo = nxo())) {
4631  if (xo->InheritsFrom("TProofOutputFile") && xo->TestBit(TProofOutputFile::kSwapFile)) {
4632  save = kFALSE;
4633  break;
4634  }
4635  }
4636  if (save) {
4637  TNamed *nof = (TNamed *) input->FindObject("PROOF_DefaultOutputOption");
4638  if (nof) {
4639  TString oopt(nof->GetTitle());
4640  if (oopt.BeginsWith("of:")) {
4641  oopt.Replace(0, 3, "");
4642  if (!oopt.IsNull()) fPlayer->SetOutputFilePath(oopt);
4643  fPlayer->SavePartialResults(kTRUE, kTRUE);
4644  }
4645  }
4646  }
4647  }
4648 
4649  // Send back the results
4650  TQueryResult *pqr = pq->CloneInfo();
4651  // At least the TDSet name in the light object
4652  Info("ProcessNext", "adding info about dataset '%s' in the light query result", dset->GetName());
4653  TList rin;
4654  TDSet *ds = new TDSet(dset->GetName(), dset->GetObjName());
4655  rin.Add(ds);
4656  if (pqr) pqr->SetInputList(&rin, kTRUE);
4657  if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted && fPlayer->GetOutputList()) {
4658  PDB(kGlobal, 2)
4659  Info("ProcessNext", "sending results");
4660  TQueryResult *xpq = (pqr && fProtocol > 10) ? pqr : pq;
4661  if (SendResults(fSocket, fPlayer->GetOutputList(), xpq) != 0)
4662  Warning("ProcessNext", "problems sending output list");
4663  if (slb) slb->Form("%d %lld %lld %.3f", fPlayer->GetExitStatus(), pq->GetEntries(),
4664  pq->GetBytes(), pq->GetUsedCPU());
4665  } else {
4666  if (fPlayer->GetExitStatus() != TVirtualProofPlayer::kAborted)
4667  Warning("ProcessNext","the output list is empty!");
4668  if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
4669  Warning("ProcessNext", "problems sending output list");
4670  if (slb) slb->Form("%d -1 -1 %.3f", fPlayer->GetExitStatus(), pq->GetUsedCPU());
4671  }
4672 
4673  // Remove aborted queries from the list
4674  if (fPlayer->GetExitStatus() == TVirtualProofPlayer::kAborted) {
4675  SafeDelete(pqr);
4676  if (fQMgr) fQMgr->RemoveQuery(pq);
4677  } else {
4678  // Keep in memory only light infor about a query
4679  if (!(pq->IsDraw()) && pqr) {
4680  if (fQMgr && fQMgr->Queries()) {
4681  fQMgr->Queries()->Add(pqr);
4682  // Remove from the fQueries list
4683  fQMgr->Queries()->Remove(pq);
4684  }
4685  // These removes 'pq' from the internal player list and
4686  // deletes it; in this way we do not attempt a double delete
4687  // when destroying the player
4688  fPlayer->RemoveQueryResult(TString::Format("%s:%s",
4689  pq->GetTitle(), pq->GetName()));
4690  }
4691  }
4692 
4693  DeletePlayer();
4694  if (IsMaster() && fProof->UseDynamicStartup())
4695  // stop the workers
4696  fProof->RemoveWorkers(0);
4697 }
4698 
4699 ////////////////////////////////////////////////////////////////////////////////
4700 /// Register TFileCollections in 'out' as datasets according to the rules in 'in'
4701 
4702 Int_t TProofServ::RegisterDataSets(TList *in, TList *out,
4703  TDataSetManager *dsm, TString &msg)
4704 {
4705  PDB(kDataset, 1)
4706  ::Info("TProofServ::RegisterDataSets",
4707  "enter: %d objs in the output list", (out ? out->GetSize() : -1));
4708 
4709  if (!in || !out || !dsm) {
4710  ::Error("TProofServ::RegisterDataSets", "invalid inputs: %p, %p, %p", in, out, dsm);
4711  return 0;
4712  }
4713  msg = "";
4714  THashList tags;
4715  TList torm;
4716  TIter nxo(out);
4717  TObject *o = 0;
4718  while ((o = nxo())) {
4719  // Only file collections TFileCollection
4720  TFileCollection *ds = dynamic_cast<TFileCollection*> (o);
4721  if (ds) {
4722  // Origin of this dataset
4723  ds->SetTitle(gSystem->HostName());
4724  // The tag and register option
4725  TNamed *fcn = 0;
4726  TString tag = TString::Format("DATASET_%s", ds->GetName());
4727  if (!(fcn = (TNamed *) out->FindObject(tag))) continue;
4728  // If this tag is in the list of processed tags, flag it for removal
4729  if (tags.FindObject(tag)) {
4730  torm.Add(o);
4731  continue;
4732  }
4733  // Register option
4734  TString regopt(fcn->GetTitle());
4735  // Sort according to the internal index, if required
4736  if (regopt.Contains(":sortidx:")) {
4737  ds->Sort(kTRUE);
4738  regopt.ReplaceAll(":sortidx:", "");
4739  }
4740  // Register this dataset
4741  if (dsm->TestBit(TDataSetManager::kAllowRegister)) {
4742  // Extract the list
4743  if (ds->GetList()->GetSize() > 0) {
4744  // Register the dataset (quota checks are done inside here)
4745  const char *vfmsg = regopt.Contains("V") ? " and verifying" : "";
4746  msg.Form("Registering%s dataset '%s' ... ", vfmsg, ds->GetName());
4747  // Always allow verification for this action
4748  Bool_t allowVerify = dsm->TestBit(TDataSetManager::kAllowVerify) ? kTRUE : kFALSE;
4749  if (regopt.Contains("V") && !allowVerify) dsm->SetBit(TDataSetManager::kAllowVerify);
4750  // Main action
4751  Int_t rc = dsm->RegisterDataSet(ds->GetName(), ds, regopt);
4752  // Reset to the previous state if needed
4753  if (regopt.Contains("V") && !allowVerify) dsm->ResetBit(TDataSetManager::kAllowVerify);
4754  if (rc != 0) {
4755  ::Warning("TProofServ::RegisterDataSets",
4756  "failure registering or verifying dataset '%s'", ds->GetName());
4757  msg.Form("Registering%s dataset '%s' ... failed! See log for more details", vfmsg, ds->GetName());
4758  } else {
4759  ::Info("TProofServ::RegisterDataSets", "dataset '%s' successfully registered%s",
4760  ds->GetName(), (strlen(vfmsg) > 0) ? " and verified" : "");
4761  msg.Form("Registering%s dataset '%s' ... OK", vfmsg, ds->GetName());
4762  // Add tag to the list of processed tags to avoid double processing
4763  // (there may be more objects with the same name, created by each worker)
4764  tags.Add(new TObjString(tag));
4765  }
4766  // Notify
4767  PDB(kDataset, 2) {
4768  ::Info("TProofServ::RegisterDataSets", "printing collection");
4769  ds->Print("F");
4770  }
4771  } else {
4772  ::Warning("TProofServ::RegisterDataSets", "collection '%s' is empty", o->GetName());
4773  }
4774  } else {
4775  ::Info("TProofServ::RegisterDataSets", "dataset registration not allowed");
4776  return -1;
4777  }
4778  }
4779  }
4780  // Cleanup all temporary stuff possibly created by each worker
4781  TIter nxrm(&torm);
4782  while ((o = nxrm())) out->Remove(o);
4783  torm.SetOwner(kTRUE);
4784  // Remove tags
4785  TIter nxtg(&tags);
4786  while((o = nxtg())) {
4787  TObject *oo = 0;
4788  while ((oo = out->FindObject(o->GetName()))) { out->Remove(oo); }
4789  }
4790  tags.SetOwner(kTRUE);
4791 
4792  PDB(kDataset, 1) ::Info("TProofServ::RegisterDataSets", "exit");
4793  // Done
4794  return 0;
4795 }
4796 
4797 ////////////////////////////////////////////////////////////////////////////////
4798 /// Handle request for list of queries.
4799 
4800 void TProofServ::HandleQueryList(TMessage *mess)
4801 {
4802  PDB(kGlobal, 1)
4803  Info("HandleQueryList", "Enter");
4804 
4805  Bool_t all;
4806  (*mess) >> all;
4807 
4808  TList *ql = new TList;
4809  Int_t ntot = 0, npre = 0, ndraw= 0;
4810  if (fQMgr) {
4811  if (all) {
4812  // Rescan
4813  TString qdir = fQueryDir;
4814  Int_t idx = qdir.Index("session-");
4815  if (idx != kNPOS)
4816  qdir.Remove(idx);
4817  fQMgr->ScanPreviousQueries(qdir);
4818  // Send also information about previous queries, if any
4819  if (fQMgr->PreviousQueries()) {
4820  TIter nxq(fQMgr->PreviousQueries());
4821  TProofQueryResult *pqr = 0;
4822  while ((pqr = (TProofQueryResult *)nxq())) {
4823  ntot++;
4824  pqr->fSeqNum = ntot;
4825  ql->Add(pqr);
4826  }
4827  }
4828  }
4829 
4830  npre = ntot;
4831  if (fQMgr->Queries()) {
4832  // Add info about queries in this session
4833  TIter nxq(fQMgr->Queries());
4834  TProofQueryResult *pqr = 0;
4835  TQueryResult *pqm = 0;
4836  while ((pqr = (TProofQueryResult *)nxq())) {
4837  ntot++;
4838  if ((pqm = pqr->CloneInfo())) {
4839  pqm->fSeqNum = ntot;
4840  ql->Add(pqm);
4841  } else {
4842  Warning("HandleQueryList", "unable to clone TProofQueryResult '%s:%s'",
4843  pqr->GetName(), pqr->GetTitle());
4844  }
4845  }
4846  }
4847  // Number of draw queries
4848  ndraw = fQMgr->DrawQueries();
4849  }
4850 
4851  TMessage m(kPROOF_QUERYLIST);
4852  m << npre << ndraw << ql;
4853  fSocket->Send(m);
4854  delete ql;
4855 
4856  // Done
4857  return;
4858 }
4859 
4860 ////////////////////////////////////////////////////////////////////////////////
4861 /// Handle remove request.
4862 
4863 void TProofServ::HandleRemove(TMessage *mess, TString *slb)
4864 {
4865  PDB(kGlobal, 1)
4866  Info("HandleRemove", "Enter");
4867 
4868  TString queryref;
4869  (*mess) >> queryref;
4870 
4871  if (slb) *slb = queryref;
4872 
4873  if (queryref == "cleanupqueue") {
4874  // Remove pending requests
4875  Int_t pend = CleanupWaitingQueries();
4876  // Notify
4877  Info("HandleRemove", "%d queries removed from the waiting list", pend);
4878  // We are done
4879  return;
4880  }
4881 
4882  if (queryref == "cleanupdir") {
4883 
4884  // Cleanup previous sessions results
4885  Int_t nd = (fQMgr) ? fQMgr->CleanupQueriesDir() : -1;
4886 
4887  // Notify
4888  Info("HandleRemove", "%d directories removed", nd);
4889  // We are done
4890  return;
4891  }
4892 
4893 
4894  if (fQMgr) {
4895  TProofLockPath *lck = 0;
4896  if (fQMgr->LockSession(queryref, &lck) == 0) {
4897 
4898  // Remove query
4899  TList qtorm;
4900  fQMgr->RemoveQuery(queryref, &qtorm);
4901  CleanupWaitingQueries(kFALSE, &qtorm);
4902 
4903  // Unlock and remove the lock file
4904  if (lck) {
4905  gSystem->Unlink(lck->GetName());
4906  SafeDelete(lck);
4907  }
4908 
4909  // We are done
4910  return;
4911  }
4912  } else {
4913  Warning("HandleRemove", "query result manager undefined!");
4914  }
4915 
4916  // Notify failure
4917  Info("HandleRemove",
4918  "query %s could not be removed (unable to lock session)", queryref.Data());
4919 
4920  // Done
4921  return;
4922 }
4923 
4924 ////////////////////////////////////////////////////////////////////////////////
4925 /// Handle retrieve request.
4926 
4927 void TProofServ::HandleRetrieve(TMessage *mess, TString *slb)
4928 {
4929  PDB(kGlobal, 1)
4930  Info("HandleRetrieve", "Enter");
4931 
4932  TString queryref;
4933  (*mess) >> queryref;
4934 
4935  if (slb) *slb = queryref;
4936 
4937  // Parse reference string
4938  Int_t qry = -1;
4939  TString qdir;
4940  if (fQMgr) fQMgr->LocateQuery(queryref, qry, qdir);
4941 
4942  TString fout = qdir;
4943  fout += "/query-result.root";
4944 
4945  TFile *f = TFile::Open(fout,"READ");
4946  TProofQueryResult *pqr = 0;
4947  if (f) {
4948  f->ReadKeys();
4949  TIter nxk(f->GetListOfKeys());
4950  TKey *k = 0;
4951  while ((k = (TKey *)nxk())) {
4952  if (!strcmp(k->GetClassName(), "TProofQueryResult")) {
4953  pqr = (TProofQueryResult *) f->Get(k->GetName());
4954  // For backward compatibility
4955  if (pqr && fProtocol < 13) {
4956  TDSet *d = 0;
4957  TObject *o = 0;
4958  TIter nxi(pqr->GetInputList());
4959  while ((o = nxi()))
4960  if ((d = dynamic_cast<TDSet *>(o)))
4961  break;
4962  d->SetWriteV3(kTRUE);
4963  }
4964  if (pqr) {
4965 
4966  // Message for the client
4967  Float_t qsz = (Float_t) f->GetSize();
4968  Int_t ilb = 0;
4969  static const char *clb[4] = { "bytes", "KB", "MB", "GB" };
4970  while (qsz > 1000. && ilb < 3) {
4971  qsz /= 1000.;
4972  ilb++;
4973  }
4974  SendAsynMessage(TString::Format("%s: sending result of %s:%s (%.1f %s)",
4975  fPrefix.Data(), pqr->GetTitle(), pqr->GetName(),
4976  qsz, clb[ilb]));
4977  fSocket->SendObject(pqr, kPROOF_RETRIEVE);
4978  } else {
4979  Info("HandleRetrieve",
4980  "query not found in file %s",fout.Data());
4981  // Notify not found
4982  fSocket->SendObject(0, kPROOF_RETRIEVE);
4983  }
4984  break;
4985  }
4986  }
4987  f->Close();
4988  delete f;
4989  } else {
4990  Info("HandleRetrieve",
4991  "file cannot be open (%s)",fout.Data());
4992  // Notify not found
4993  fSocket->SendObject(0, kPROOF_RETRIEVE);
4994  return;
4995  }
4996 
4997  // Done
4998  return;
4999 }
5000 
5001 ////////////////////////////////////////////////////////////////////////////////
5002 /// Handle lib, inc search paths modification request
5003 
5004 Int_t TProofServ::HandleLibIncPath(TMessage *mess)
5005 {
5006  TString type;
5007  Bool_t add;
5008  TString path;
5009  Int_t rc = 1;
5010  (*mess) >> type >> add >> path;
5011  if (mess->BufferSize() > mess->Length()) (*mess) >> rc;
5012 
5013  // Check type of action
5014  if ((type != "lib") && (type != "inc")) {
5015  Error("HandleLibIncPath","unknown action type: %s", type.Data());
5016  return rc;
5017  }
5018 
5019  // Separators can be either commas or blanks
5020  path.ReplaceAll(","," ");
5021 
5022  // Decompose lists
5023  TObjArray *op = 0;
5024  if (path.Length() > 0 && path != "-") {
5025  if (!(op = path.Tokenize(" "))) {
5026  Error("HandleLibIncPath","decomposing path %s", path.Data());
5027  return rc;
5028  }
5029  }
5030 
5031  if (add) {
5032 
5033  if (type == "lib") {
5034 
5035  // Add libs
5036  TIter nxl(op, kIterBackward);
5037  TObjString *lib = 0;
5038  while ((lib = (TObjString *) nxl())) {
5039  // Expand path
5040  TString xlib = lib->GetName();
5041  gSystem->ExpandPathName(xlib);
5042  // Add to the dynamic lib search path if it exists and can be read
5043  if (!gSystem->AccessPathName(xlib, kReadPermission)) {
5044  TString newlibpath = gSystem->GetDynamicPath();
5045  // In the first position after the working dir
5046  Int_t pos = 0;
5047  if (newlibpath.BeginsWith(".:"))
5048  pos = 2;
5049  if (newlibpath.Index(xlib) == kNPOS) {
5050  newlibpath.Insert(pos,TString::Format("%s:", xlib.Data()));
5051  gSystem->SetDynamicPath(newlibpath);
5052  }
5053  } else {
5054  Info("HandleLibIncPath",
5055  "libpath %s does not exist or cannot be read - not added", xlib.Data());
5056  }
5057  }
5058 
5059  // Forward the request, if required
5060  if (IsMaster())
5061  fProof->AddDynamicPath(path);
5062 
5063  } else {
5064 
5065  // Add incs
5066  TIter nxi(op);
5067  TObjString *inc = 0;
5068  while ((inc = (TObjString *) nxi())) {
5069  // Expand path
5070  TString xinc = inc->GetName();
5071  gSystem->ExpandPathName(xinc);
5072  // Add to the dynamic lib search path if it exists and can be read
5073  if (!gSystem->AccessPathName(xinc, kReadPermission)) {
5074  TString curincpath = gSystem->GetIncludePath();
5075  if (curincpath.Index(xinc) == kNPOS)
5076  gSystem->AddIncludePath(TString::Format("-I%s", xinc.Data()));
5077  } else
5078  Info("HandleLibIncPath",
5079  "incpath %s does not exist or cannot be read - not added", xinc.Data());
5080  }
5081 
5082  // Forward the request, if required
5083  if (IsMaster())
5084  fProof->AddIncludePath(path);
5085  }
5086 
5087 
5088  } else {
5089 
5090  if (type == "lib") {
5091 
5092  // Remove libs
5093  TIter nxl(op);
5094  TObjString *lib = 0;
5095  while ((lib = (TObjString *) nxl())) {
5096  // Expand path
5097  TString xlib = lib->GetName();
5098  gSystem->ExpandPathName(xlib);
5099  // Remove from the dynamic lib search path
5100  TString newlibpath = gSystem->GetDynamicPath();
5101  newlibpath.ReplaceAll(TString::Format("%s:", xlib.Data()),"");
5102  gSystem->SetDynamicPath(newlibpath);
5103  }
5104 
5105  // Forward the request, if required
5106  if (IsMaster())
5107  fProof->RemoveDynamicPath(path);
5108 
5109  } else {
5110 
5111  // Remove incs
5112  TIter nxi(op);
5113  TObjString *inc = 0;
5114  while ((inc = (TObjString *) nxi())) {
5115  TString newincpath = gSystem->GetIncludePath();
5116  newincpath.ReplaceAll(TString::Format("-I%s", inc->GetName()),"");
5117  // Remove the interpreter path (added anyhow internally)
5118  newincpath.ReplaceAll(gInterpreter->GetIncludePath(),"");
5119  gSystem->SetIncludePath(newincpath);
5120  }
5121 
5122  // Forward the request, if required
5123  if (IsMaster())
5124  fProof->RemoveIncludePath(path);
5125  }
5126  }
5127  // Done
5128  return rc;
5129 }
5130 
5131 ////////////////////////////////////////////////////////////////////////////////
5132 /// Handle file checking request.
5133 
5134 void TProofServ::HandleCheckFile(TMessage *mess, TString *slb)
5135 {
5136  TString filenam;
5137  TMD5 md5;
5138  UInt_t opt = TProof::kUntar;
5139 
5140  TMessage reply(kPROOF_CHECKFILE);
5141 
5142  // Parse message
5143  (*mess) >> filenam >> md5;
5144  if ((mess->BufferSize() > mess->Length()) && (fProtocol > 8))
5145  (*mess) >> opt;
5146 
5147  if (slb) *slb = filenam;
5148 
5149  if (filenam.BeginsWith("-")) {
5150  // install package:
5151  // compare md5's, untar, store md5 in PROOF-INF, remove par file
5152  Int_t st = 0;
5153  Bool_t err = kFALSE;
5154  filenam = filenam.Strip(TString::kLeading, '-');
5155  TString packnam = filenam;
5156  packnam.Remove(packnam.Length() - 4); // strip off ".par"
5157  // compare md5's to check if transmission was ok
5158  TMD5 *md5local = fPackMgr->GetMD5(packnam);
5159  if (md5local && md5 == (*md5local)) {
5160  if ((opt & TProof::kRemoveOld)) {
5161  if ((st = fPackMgr->Clean(packnam)))
5162  Error("HandleCheckFile", "failure cleaning %s", packnam.Data());
5163  }
5164  // Unpack
5165  st = fPackMgr->Unpack(packnam, md5local);
5166  if (st == 0) {
5167  // Notify the client
5168  reply << (Int_t)1;
5169  PDB(kPackage, 1)
5170  Info("HandleCheckFile",
5171  "package %s installed on node", filenam.Data());
5172  } else {
5173  if (st == -2) {
5174  Error("HandleCheckFile", "gunzip not found");
5175  } else {
5176  Error("HandleCheckFile", "package %s did not unpack into %s",
5177  filenam.Data(), packnam.Data());
5178  }
5179  reply << (Int_t)0;
5180  if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
5181  err = kTRUE;
5182  }
5183  } else {
5184  reply << (Int_t)0;
5185  if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
5186  err = kTRUE;
5187  PDB(kPackage, 1)
5188  Error("HandleCheckFile",
5189  "package %s not yet on node", filenam.Data());
5190  }
5191 
5192  // Note: Originally an fPackageLock->Unlock() call was made
5193  // after the if-else statement below. With multilevel masters,
5194  // submasters still check to make sure the package exists with
5195  // the correct md5 checksum and need to do a read lock there.
5196  // As yet locking is not that sophisicated so the lock must
5197  // be released below before the call to fProof->UploadPackage().
5198  if (err) {
5199  // delete par file in case of error
5200  fPackMgr->Remove(filenam);
5201  } else if (IsMaster()) {
5202  // forward to workers
5203  TString parpath;
5204  fPackMgr->GetParPath(filenam, parpath);
5205  if (fProof->UploadPackage(parpath,
5206  (TProof::EUploadPackageOpt)opt) != 0)
5207  Info("HandleCheckFile",
5208  "problems uploading package %s", parpath.Data());
5209  }
5210  delete md5local;
5211  fSocket->Send(reply);
5212 
5213  } else if (filenam.BeginsWith("+") || filenam.BeginsWith("=")) {
5214  filenam.Remove(0,1);
5215 
5216  TString parname = filenam;
5217  // If remote install it from there
5218  TFile::EFileType ft = TFile::GetType(filenam);
5219  if (ft == TFile::kWeb || ft == TFile::kNet) {
5220  parname = gSystem->BaseName(filenam);
5221  if (fPackMgr->Install(filenam) < 0) {
5222  Warning("HandleCheckFile",
5223  "problems installing package %s", filenam.Data());
5224 
5225  }
5226  }
5227 
5228  // check file in package directory
5229  TMD5 *md5local = fPackMgr->ReadMD5(parname);
5230 
5231  if (md5local && md5 == (*md5local)) {
5232  // package already on server, unlock directory
5233  reply << (Int_t)1;
5234  PDB(kPackage, 1)
5235  Info("HandleCheckFile",
5236  "package %s already on node", parname.Data());
5237  if (IsMaster()) {
5238  Int_t xrc = 0;
5239  TString par = filenam;
5240  if (ft != TFile::kWeb && ft != TFile::kNet) {
5241  xrc = fPackMgr->GetParPath(filenam, par);
5242  }
5243  if (xrc == 0) {
5244  if (fProof->UploadPackage(par) != 0)
5245  Warning("HandleCheckFile",
5246  "problems uploading package %s", par.Data());
5247  }
5248  }
5249 
5250  } else {
5251  reply << (Int_t)0;
5252  if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
5253  PDB(kPackage, 1)
5254  Info("HandleCheckFile",
5255  "package %s not yet on node", filenam.Data());
5256  }
5257  delete md5local;
5258  fSocket->Send(reply);
5259 
5260  } else {
5261  // check file in cache directory
5262  TString cachef = fCacheDir + "/" + filenam;
5263  fCacheLock->Lock();
5264  TMD5 *md5local = TMD5::FileChecksum(cachef);
5265 
5266  if (md5local && md5 == (*md5local)) {
5267  reply << (Int_t)1;
5268  PDB(kCache, 1)
5269  Info("HandleCheckFile", "file %s already on node", filenam.Data());
5270  } else {
5271  reply << (Int_t)0;
5272  if (fProtocol <= 19) reply.Reset(kPROOF_FATAL);
5273  PDB(kCache, 1)
5274  Info("HandleCheckFile", "file %s not yet on node", filenam.Data());
5275  }
5276  delete md5local;
5277  fSocket->Send(reply);
5278  fCacheLock->Unlock();
5279  }
5280 }
5281 
5282 ////////////////////////////////////////////////////////////////////////////////
5283 /// Handle here all cache and package requests.
5284 
5285 Int_t TProofServ::HandleCache(TMessage *mess, TString *slb)
5286 {
5287  PDB(kGlobal, 1)
5288  Info("HandleCache", "Enter");
5289 
5290  Int_t status = 0;
5291  Int_t type = 0;
5292  Bool_t all = kFALSE;
5293  TMessage msg;
5294  Bool_t fromglobal = kFALSE;
5295  Int_t chkveropt = TPackMgr::kCheckROOT; // Default: check ROOT version
5296  TPackMgr *packmgr = 0;
5297 
5298  // Notification message
5299  TString noth;
5300  const char *k = (IsMaster()) ? "Mst" : "Wrk";
5301  noth.Form("%s-%s", k, fOrdinal.Data());
5302 
5303  TList *optls = 0;
5304  TString packagedir, package, pdir, ocwd, file;
5305  (*mess) >> type;
5306  switch (type) {
5307  case TProof::kShowCache:
5308  (*mess) >> all;
5309  printf("*** File cache %s:%s ***\n", gSystem->HostName(),
5310  fCacheDir.Data());
5311  fflush(stdout);
5312  PDB(kCache, 1) {
5313  gSystem->Exec(TString::Format("%s -a %s", kLS, fCacheDir.Data()));
5314  } else {
5315  gSystem->Exec(TString::Format("%s %s", kLS, fCacheDir.Data()));
5316  }
5317  if (IsMaster() && all)
5318  fProof->ShowCache(all);
5319  LogToMaster();
5320  if (slb) slb->Form("%d %d", type, all);
5321  break;
5322  case TProof::kClearCache:
5323  file = "";
5324  if ((mess->BufferSize() > mess->Length())) (*mess) >> file;
5325  fCacheLock->Lock();
5326  if (file.IsNull() || file == "*") {
5327  gSystem->Exec(TString::Format("%s %s/* %s/.*.binversion", kRM, fCacheDir.Data(), fCacheDir.Data()));
5328  } else {
5329  gSystem->Exec(TString::Format("%s %s/%s", kRM, fCacheDir.Data(), file.Data()));
5330  }
5331  fCacheLock->Unlock();
5332  if (IsMaster())
5333  fProof->ClearCache(file);
5334  if (slb) slb->Form("%d %s", type, file.Data());
5335  break;
5336  case TProof::kShowPackages:
5337  (*mess) >> all;
5338  fPackMgr->Show();
5339  if (IsMaster() && all)
5340  fProof->ShowPackages(all);
5341  LogToMaster();
5342  if (slb) slb->Form("%d %d", type, all);
5343  break;
5344  case TProof::kClearPackages:
5345  if ((status = fPackMgr->Unload(0)) == 0) {
5346  fPackMgr->Remove();
5347  if (IsMaster())
5348  status = fProof->ClearPackages();
5349  }
5350  if (slb) slb->Form("%d %d", type, status);
5351  break;
5352  case TProof::kClearPackage:
5353  (*mess) >> package;
5354  if ((status = fPackMgr->Unload(package)) == 0) {
5355  fPackMgr->Remove(package);
5356  if (IsMaster())
5357  status = fProof->ClearPackage(package);
5358  }
5359  if (slb) slb->Form("%d %s %d", type, package.Data(), status);
5360  break;
5361  case TProof::kBuildPackage:
5362  (*mess) >> package;
5363  if ((mess->BufferSize() > mess->Length())) (*mess) >> chkveropt;
5364 
5365  if (!(packmgr = TPackMgr::GetPackMgr(package.Data(), fPackMgr))) {
5366  // Package not found
5367  SendAsynMessage(TString::Format("%s: kBuildPackage: failure locating %s ...",
5368  noth.Data(), package.Data()));
5369  status = -1;
5370  break;
5371  }
5372  fromglobal = (packmgr == fPackMgr) ? kFALSE : kTRUE;
5373  packagedir = packmgr->GetTitle();
5374 
5375  if (IsMaster() && !fromglobal) {
5376  // Make sure package is available on all slaves, even new ones
5377  TString par;
5378  Int_t xrc = packmgr->GetParPath(package, par);
5379  if (xrc != 0 || fProof->UploadPackage(par) != 0) {
5380  Warning("HandleCache",
5381  "kBuildPackage: problems forwarding package %s to workers", package.Data());
5382  SendAsynMessage(TString::Format("%s: kBuildPackage: problems forwarding package %s to workers ...",
5383  noth.Data(), package.Data()));
5384  }
5385  }
5386 
5387  if (!status) {
5388 
5389  PDB(kPackage, 1)
5390  Info("HandleCache",
5391  "kBuildPackage: package %s exists and has PROOF-INF directory", package.Data());
5392 
5393  // Forward build command to slaves, but don't wait for results
5394  if (IsMaster())
5395  fProof->BuildPackage(package, TProof::kBuildOnSlavesNoWait);
5396 
5397  // Build here
5398  status = packmgr->Build(package.Data(), chkveropt);
5399  }
5400 
5401  if (status) {
5402  // Notify the upper level
5403  SendAsynMessage(TString::Format("%s: failure building %s ... (status: %d)", noth.Data(), package.Data(), status));
5404  } else {
5405  // collect built results from slaves
5406  if (IsMaster())
5407  status = fProof->BuildPackage(package, TProof::kCollectBuildResults);
5408  PDB(kPackage, 1)
5409  Info("HandleCache", "package %s successfully built", package.Data());
5410  }
5411  if (slb) slb->Form("%d %s %d %d", type, package.Data(), status, chkveropt);
5412  break;
5413 
5414  case TProof::kLoadPackage:
5415  (*mess) >> package;
5416 
5417  // Get argument, if any
5418  if ((mess->BufferSize() > mess->Length())) (*mess) >> optls;
5419  // Load the package
5420  if ((status = fPackMgr->Load(package.Data(), optls)) < 0) {
5421 
5422  // Notify the upper level
5423  SendAsynMessage(TString::Format("%s: failure loading %s, args: %p (%d) ...",
5424  noth.Data(), package.Data(), optls,
5425  (optls && optls->GetSize() > 0) ? optls->GetSize() : 0));
5426 
5427  } else {
5428 
5429  if (IsMaster()) {
5430  if (optls && optls->GetSize() > 0) {
5431  // List argument
5432  status = fProof->LoadPackage(package, kFALSE, optls);
5433  } else {
5434  // No argument
5435  status = fProof->LoadPackage(package);
5436  }
5437  }
5438 
5439  PDB(kPackage, 1)
5440  Info("HandleCache", "package %s successfully loaded", package.Data());
5441  }
5442 
5443  if (slb) slb->Form("%d %s %d", type, package.Data(), status);
5444  break;
5445 
5446  case TProof::kShowEnabledPackages:
5447  (*mess) >> all;
5448  { TString title("*** Enabled packages ***");
5449  if (!IsMaster() || all) {
5450  title.Form("*** Enabled packages on %s %s on %s",
5451  (IsMaster()) ? "master" : "worker",
5452  fOrdinal.Data(), gSystem->HostName());
5453  }
5454  fPackMgr->ShowEnabled(title);
5455  }
5456  if (IsMaster() && all)
5457  fProof->ShowEnabledPackages(all);
5458  LogToMaster();
5459  if (slb) slb->Form("%d %d", type, all);
5460  break;
5461  case TProof::kShowSubCache:
5462  (*mess) >> all;
5463  if (IsMaster() && all)
5464  fProof->ShowCache(all);
5465  LogToMaster();
5466  if (slb) slb->Form("%d %d", type, all);
5467  break;
5468  case TProof::kClearSubCache:
5469  file = "";
5470  if ((mess->BufferSize() > mess->Length())) (*mess) >> file;
5471  if (IsMaster())
5472  fProof->ClearCache(file);
5473  if (slb) slb->Form("%d %s", type, file.Data());
5474  break;
5475  case TProof::kShowSubPackages:
5476  (*mess) >> all;
5477  if (IsMaster() && all)
5478  fProof->ShowPackages(all);
5479  LogToMaster();
5480  if (slb) slb->Form("%d %d", type, all);
5481  break;
5482  case TProof::kDisableSubPackages:
5483  if (IsMaster())
5484  fProof->DisablePackages();
5485  if (slb) slb->Form("%d", type);
5486  break;
5487  case TProof::kDisableSubPackage:
5488  (*mess) >> package;
5489  if (IsMaster())
5490  fProof->DisablePackage(package);
5491  if (slb) slb->Form("%d %s", type, package.Data());
5492  break;
5493  case TProof::kBuildSubPackage:
5494  (*mess) >> package;
5495  if ((mess->BufferSize() > mess->Length())) (*mess) >> chkveropt;
5496  if (IsMaster())
5497  fProof->BuildPackage(package, TProof::kBuildAll, chkveropt);
5498  if (slb) slb->Form("%d %s %d", type, package.Data(), chkveropt);
5499  break;
5500  case TProof::kUnloadPackage:
5501  (*mess) >> package;
5502  status = fPackMgr->Unload(package);
5503  if (IsMaster() && status == 0)
5504  status = fProof->UnloadPackage(package);
5505  if (slb) slb->Form("%d %s %d", type, package.Data(), status);
5506  break;
5507  case TProof::kDisablePackage:
5508  (*mess) >> package;
5509  fPackMgr->Remove(package);
5510  if (IsMaster())
5511  fProof->DisablePackage(package);
5512  if (slb) slb->Form("%d %s", type, package.Data());
5513  break;
5514  case TProof::kUnloadPackages:
5515  status = fPackMgr->Unload(0);
5516  if (IsMaster() && status == 0)
5517  status = fProof->UnloadPackages();
5518  if (slb) slb->Form("%d %s %d", type, package.Data(), status);
5519  break;
5520  case TProof::kDisablePackages:
5521  fPackMgr->Remove();
5522  if (IsMaster())
5523  fProof->DisablePackages();
5524  if (slb) slb->Form("%d %s", type, package.Data());
5525  break;
5526  case TProof::kListEnabledPackages:
5527  msg.Reset(kPROOF_PACKAGE_LIST);
5528  { TList *epl = fPackMgr->GetListOfEnabled();
5529  msg << type << epl;
5530  fSocket->Send(msg);
5531  epl->SetOwner();
5532  delete epl;
5533  }
5534  if (slb) slb->Form("%d", type);
5535  break;
5536  case TProof::kListPackages:
5537  {
5538  TList *pack = fPackMgr->GetList();
5539  msg.Reset(kPROOF_PACKAGE_LIST);
5540  msg << type << pack;
5541  fSocket->Send(msg);
5542  pack->SetOwner(kTRUE);
5543  delete pack;
5544  }
5545  if (slb) slb->Form("%d", type);
5546  break;
5547  case TProof::kLoadMacro:
5548  {
5549  (*mess) >> package;
5550 
5551  // By first forwarding the load command to the unique workers
5552  // and only then loading locally we load/build in parallel
5553  if (IsMaster())
5554  fProof->Load(package, kFALSE, kTRUE);
5555 
5556  // Atomic action
5557  fCacheLock->Lock();
5558 
5559  TString originalCwd = gSystem->WorkingDirectory();
5560  gSystem->ChangeDirectory(fCacheDir.Data());
5561 
5562  // Load the macro
5563  TString pack(package);
5564  Ssiz_t from = 0;
5565  if ((from = pack.Index(",")) != kNPOS) pack.Remove(from);
5566  Info("HandleCache", "loading macro %s ...", pack.Data());
5567  gROOT->ProcessLine(TString::Format(".L %s", pack.Data()));
5568 
5569  // Release atomicity
5570  gSystem->ChangeDirectory(originalCwd.Data());
5571  fCacheLock->Unlock();
5572 
5573  // Now we collect the result from the unique workers and send the load request
5574  // to the other workers (no compilation)
5575  if (IsMaster())
5576  fProof->Load(package, kFALSE, kFALSE);
5577 
5578  // Notify the upper level
5579  LogToMaster();
5580 
5581  if (slb) slb->Form("%d %s", type, package.Data());
5582  }
5583  break;
5584  default:
5585  Error("HandleCache", "unknown type %d", type);
5586  break;
5587  }
5588 
5589  // We are done
5590  return status;
5591 }
5592 
5593 ////////////////////////////////////////////////////////////////////////////////
5594 /// Handle here all requests to modify worker lists
5595 
5596 Int_t TProofServ::HandleWorkerLists(TMessage *mess)
5597 {
5598  PDB(kGlobal, 1)
5599  Info("HandleWorkerLists", "Enter");
5600 
5601  Int_t type = 0, rc = 0;
5602  TString ord;
5603 
5604  (*mess) >> type;
5605 
5606  switch (type) {
5607  case TProof::kActivateWorker:
5608  (*mess) >> ord;
5609  if (ord != "*" && !ord.BeginsWith(GetOrdinal()) && ord != "restore") break;
5610  if (fProof) {
5611  Int_t nact = fProof->GetListOfActiveSlaves()->GetSize();
5612  Int_t nactmax = fProof->GetListOfSlaves()->GetSize() -
5613  fProof->GetListOfBadSlaves()->GetSize();
5614  if (nact < nactmax || !IsEndMaster()) {
5615  Int_t nwc = fProof->ActivateWorker(ord);
5616  Int_t nactnew = fProof->GetListOfActiveSlaves()->GetSize();
5617  if (ord == "*") {
5618  if (nactnew == nactmax) {
5619  PDB(kGlobal, 1) Info("HandleWorkerList", "all workers (re-)activated");
5620  } else {
5621  if (IsEndMaster())
5622  PDB(kGlobal, 1) Info("HandleWorkerList", "%d workers could not be (re-)activated", nactmax - nactnew);
5623  }
5624  } else if (ord == "restore") {
5625  if (nwc > 0) {
5626  PDB(kGlobal, 1) Info("HandleWorkerList","active worker(s) restored");
5627  } else {
5628  Error("HandleWorkerList", "some active worker(s) could not be restored; check logs");
5629  }
5630  } else {
5631  if (nactnew == (nact + nwc)) {
5632  if (nwc > 0)
5633  PDB(kGlobal, 1) Info("HandleWorkerList","worker(s) %s (re-)activated", ord.Data());
5634  } else {
5635  if (nwc != -2 && IsEndMaster()) {
5636  Error("HandleWorkerList", "some worker(s) could not be (re-)activated;"
5637  " # of actives: %d --> %d (nwc: %d)",
5638  nact, nactnew, nwc);
5639  }
5640  rc = (nwc < 0) ? nwc : -1;
5641  }
5642  }
5643  } else {
5644  PDB(kGlobal, 1) Info("HandleWorkerList","all workers are already active");
5645  }
5646  } else {
5647  Warning("HandleWorkerList","undefined PROOF session: protocol error?");
5648  }
5649  break;
5650  case TProof::kDeactivateWorker:
5651  (*mess) >> ord;
5652  if (ord != "*" && !ord.BeginsWith(GetOrdinal()) && ord != "restore") break;
5653  if (fProof) {
5654  Int_t nact = fProof->GetListOfActiveSlaves()->GetSize();
5655  if (nact > 0) {
5656  Int_t nwc = fProof->DeactivateWorker(ord);
5657  Int_t nactnew = fProof->GetListOfActiveSlaves()->GetSize();
5658  if (ord == "*") {
5659  if (nactnew == 0) {
5660  PDB(kGlobal, 1) Info("HandleWorkerList","all workers deactivated");
5661  } else {
5662  if (IsEndMaster())
5663  PDB(kGlobal, 1) Info("HandleWorkerList","%d workers could not be deactivated", nactnew);
5664  }
5665  } else {
5666  if (nactnew == (nact - nwc)) {
5667  if (nwc > 0)
5668  PDB(kGlobal, 1) Info("HandleWorkerList","worker(s) %s deactivated", ord.Data());
5669  } else {
5670  if (nwc != -2 && IsEndMaster()) {
5671  Error("HandleWorkerList", "some worker(s) could not be deactivated:"
5672  " # of actives: %d --> %d (nwc: %d)",
5673  nact, nactnew, nwc);
5674  }
5675  rc = (nwc < 0) ? nwc : -1;
5676  }
5677  }
5678  } else {
5679  PDB(kGlobal, 1) Info("HandleWorkerList","all workers are already inactive");
5680  }
5681  } else {
5682  Warning("HandleWorkerList","undefined PROOF session: protocol error?");
5683  }
5684  break;
5685  default:
5686  Warning("HandleWorkerList","unknown action type (%d)", type);
5687  rc = -1;
5688  }
5689  // Done
5690  return rc;
5691 }
5692 
5693 ////////////////////////////////////////////////////////////////////////////////
5694 /// Get list of workers to be used from now on.
5695 /// The list must be provided by the caller.
5696 
5697 TProofServ::EQueryAction TProofServ::GetWorkers(TList *workers,
5698  Int_t & /* prioritychange */,
5699  Bool_t /* resume */)
5700 {
5701  // Parse the config file
5702  TProofResourcesStatic *resources =
5703  new TProofResourcesStatic(fConfDir, fConfFile);
5704  fConfFile = resources->GetFileName(); // Update the global file name (with path)
5705  PDB(kGlobal,1)
5706  Info("GetWorkers", "using PROOF config file: %s", fConfFile.Data());
5707 
5708  // Get the master
5709  TProofNodeInfo *master = resources->GetMaster();
5710  if (!master) {
5711  PDB(kAll,1)
5712  Info("GetWorkers",
5713  "no appropriate master line found in %s", fConfFile.Data());
5714  return kQueryStop;
5715  } else {
5716  // Set image if not yet done and available
5717  if (fImage.IsNull() && strlen(master->GetImage()) > 0)
5718  fImage = master->GetImage();
5719  }
5720 
5721  // Fill submaster or worker list
5722  if (workers) {
5723  if (resources->GetSubmasters() && resources->GetSubmasters()->GetSize() > 0) {
5724  PDB(kAll,1)
5725  resources->GetSubmasters()->Print();
5726  TProofNodeInfo *ni = 0;
5727  TIter nw(resources->GetSubmasters());
5728  while ((ni = (TProofNodeInfo *) nw()))
5729  workers->Add(new TProofNodeInfo(*ni));
5730  } else if (resources->GetWorkers() && resources->GetWorkers()->GetSize() > 0) {
5731  PDB(kAll,1)
5732  resources->GetWorkers()->Print();
5733  TProofNodeInfo *ni = 0;
5734  TIter nw(resources->GetWorkers());
5735  while ((ni = (TProofNodeInfo *) nw()))
5736  workers->Add(new TProofNodeInfo(*ni));
5737  }
5738  }
5739 
5740  // We are done
5741  return kQueryOK;
5742 }
5743 
5744 ////////////////////////////////////////////////////////////////////////////////
/// Set the file stream where to log (default stderr).
/// If ferr == 0 the default (stderr) is restored.
/// Returns the stream that was in use before the call.
5748 
5749 FILE *TProofServ::SetErrorHandlerFile(FILE *ferr)
5750 {
5751  FILE *oldferr = fgErrorHandlerFile;
5752  fgErrorHandlerFile = (ferr) ? ferr : stderr;
5753  return oldferr;
5754 }
5755 
5756 ////////////////////////////////////////////////////////////////////////////////
5757 /// The PROOF error handler function. It prints the message on fgErrorHandlerFile and
5758 /// if abort is set it aborts the application.
5759 
5760 void TProofServ::ErrorHandler(Int_t level, Bool_t abort, const char *location,
5761  const char *msg)
5762 {
5763  if (gErrorIgnoreLevel == kUnset) {
5764  gErrorIgnoreLevel = 0;
5765  if (gEnv) {
5766  TString lvl = gEnv->GetValue("Root.ErrorIgnoreLevel", "Print");
5767  if (!lvl.CompareTo("Print", TString::kIgnoreCase))
5768  gErrorIgnoreLevel = kPrint;
5769  else if (!lvl.CompareTo("Info", TString::kIgnoreCase))
5770  gErrorIgnoreLevel = kInfo;
5771  else if (!lvl.CompareTo("Warning", TString::kIgnoreCase))
5772  gErrorIgnoreLevel = kWarning;
5773  else if (!lvl.CompareTo("Error", TString::kIgnoreCase))
5774  gErrorIgnoreLevel = kError;
5775  else if (!lvl.CompareTo("Break", TString::kIgnoreCase))
5776  gErrorIgnoreLevel = kBreak;
5777  else if (!lvl.CompareTo("SysError", TString::kIgnoreCase))
5778  gErrorIgnoreLevel = kSysError;
5779  else if (!lvl.CompareTo("Fatal", TString::kIgnoreCase))
5780  gErrorIgnoreLevel = kFatal;
5781  }
5782  }
5783 
5784  if (level < gErrorIgnoreLevel)
5785  return;
5786 
5787  // Always communicate errors via SendLogFile
5788  if (level >= kError && gProofServ)
5789  gProofServ->LogToMaster();
5790 
5791  Bool_t tosyslog = (fgLogToSysLog > 2) ? kTRUE : kFALSE;
5792 
5793  const char *type = 0;
5794  ELogLevel loglevel = kLogInfo;
5795 
5796  Int_t ipos = (location) ? strlen(location) : 0;
5797 
5798  if (level >= kPrint) {
5799  loglevel = kLogInfo;
5800  type = "Print";
5801  }
5802  if (level >= kInfo) {
5803  loglevel = kLogInfo;
5804  char *ps = location ? (char *) strrchr(location, '|') : (char *)0;
5805  if (ps) {
5806  ipos = (int)(ps - (char *)location);
5807  type = "SvcMsg";
5808  } else {
5809  type = "Info";
5810  }
5811  }
5812  if (level >= kWarning) {
5813  loglevel = kLogWarning;
5814  type = "Warning";
5815  }
5816  if (level >= kError) {
5817  loglevel = kLogErr;
5818  type = "Error";
5819  }
5820  if (level >= kBreak) {
5821  loglevel = kLogErr;
5822  type = "*** Break ***";
5823  }
5824  if (level >= kSysError) {
5825  loglevel = kLogErr;
5826  type = "SysError";
5827  }
5828  if (level >= kFatal) {
5829  loglevel = kLogErr;
5830  type = "Fatal";
5831  }
5832 
5833 
5834  TString buf;
5835 
5836  // Time stamp
5837  TTimeStamp ts;
5838  TString st(ts.AsString("lc"),19);
5839 
5840  if (!location || ipos == 0 ||
5841  (level >= kPrint && level < kInfo) ||
5842  (level >= kBreak && level < kSysError)) {
5843  fprintf(fgErrorHandlerFile, "%s %5d %s | %s: %s\n", st(11,8).Data(),
5844  gSystem->GetPid(),
5845  (gProofServ ? gProofServ->GetPrefix() : "proof"),
5846  type, msg);
5847  if (tosyslog)
5848  buf.Form("%s: %s:%s", fgSysLogEntity.Data(), type, msg);
5849  } else {
5850  fprintf(fgErrorHandlerFile, "%s %5d %s | %s in <%.*s>: %s\n", st(11,8).Data(),
5851  gSystem->GetPid(),
5852  (gProofServ ? gProofServ->GetPrefix() : "proof"),
5853  type, ipos, location, msg);
5854  if (tosyslog)
5855  buf.Form("%s: %s:<%.*s>: %s", fgSysLogEntity.Data(), type, ipos, location, msg);
5856  }
5857  fflush(fgErrorHandlerFile);
5858 
5859  if (tosyslog)
5860  gSystem->Syslog(loglevel, buf);
5861 
5862 #ifdef __APPLE__
5863  if (__crashreporter_info__)
5864  delete [] __crashreporter_info__;
5865  __crashreporter_info__ = StrDup(buf);
5866 #endif
5867 
5868  if (abort) {
5869 
5870  static Bool_t recursive = kFALSE;
5871 
5872  if (gProofServ != 0 && !recursive) {
5873  recursive = kTRUE;
5874  if (gProofServ->GetSocket()) gProofServ->GetSocket()->Send(kPROOF_FATAL);
5875  recursive = kFALSE;
5876  }
5877 
5878  fprintf(fgErrorHandlerFile, "aborting\n");
5879  fflush(fgErrorHandlerFile);
5880  gSystem->StackTrace();
5881  gSystem->Abort();
5882  }
5883 }
5884 
5885 ////////////////////////////////////////////////////////////////////////////////
5886 /// Make player instance.
5887 
5888 void TProofServ::MakePlayer()
5889 {
5890  TVirtualProofPlayer *p = 0;
5891 
5892  // Cleanup first
5893  DeletePlayer();
5894 
5895  if (IsParallel()) {
5896  // remote mode
5897  p = fProof->MakePlayer();
5898  } else {
5899  // slave or sequential mode
5900  p = TVirtualProofPlayer::Create("slave", 0, fSocket);
5901  if (IsMaster())
5902  fProof->SetPlayer(p);
5903  }
5904 
5905  // set player
5906  fPlayer = p;
5907 }
5908 
5909 ////////////////////////////////////////////////////////////////////////////////
5910 /// Delete player instance.
5911 
5912 void TProofServ::DeletePlayer()
5913 {
5914  if (IsMaster()) {
5915  PDB(kGlobal, 1) {
5916  fCompute.Stop();
5917  Printf(" +++ Latest processing times: %f s (CPU: %f s)",
5918  fCompute.RealTime(), fCompute.CpuTime());
5919  }
5920  if (fProof) fProof->SetPlayer(0);
5921  } else {
5922  SafeDelete(fPlayer);
5923  }
5924  fPlayer = 0;
5925 }
5926 
5927 ////////////////////////////////////////////////////////////////////////////////
/// Get the processing priority for the group the user belongs to. This
/// priority is a number (0 - 100) determined by a scheduler (third
/// party process) based on some basic priority the group has, e.g.
/// we might want to give users in a specific group (e.g. promptana)
/// a higher priority than users in other groups, and on the analysis
/// of historical logging data (i.e. usage of CPU by the group in a
/// previous time slot, as recorded in TPerfStats::WriteQueryLog()).
///
/// Currently the group priority is obtained by a query in a SQL DB
/// table proofpriority, which has the format:
///
///     CREATE TABLE proofpriority (
///        id          INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
///        group       VARCHAR(32) NOT NULL,
///        priority    INT
///     )
5943 
5944 Int_t TProofServ::GetPriority()
5945 {
5946  TString sqlserv = gEnv->GetValue("ProofServ.QueryLogDB","");
5947  TString sqluser = gEnv->GetValue("ProofServ.QueryLogUser","");
5948  TString sqlpass = gEnv->GetValue("ProofServ.QueryLogPasswd","");
5949 
5950  Int_t priority = 100;
5951 
5952  if (sqlserv == "")
5953  return priority;
5954 
5955  TString sql;
5956  sql.Form("SELECT priority WHERE group='%s' FROM proofpriority", fGroup.Data());
5957 
5958  // open connection to SQL server
5959  TSQLServer *db = TSQLServer::Connect(sqlserv, sqluser, sqlpass);
5960 
5961  if (!db || db->IsZombie()) {
5962  Error("GetPriority", "failed to connect to SQL server %s as %s %s",
5963  sqlserv.Data(), sqluser.Data(), sqlpass.Data());
5964  printf("%s\n", sql.Data());
5965  } else {
5966  TSQLResult *res = db->Query(sql);
5967 
5968  if (!res) {
5969  Error("GetPriority", "query into proofpriority failed");
5970  Printf("%s", sql.Data());
5971  } else {
5972  TSQLRow *row = res->Next(); // first row is header
5973  if (row) {
5974  priority = atoi(row->GetField(0));
5975  delete row;
5976  } else {
5977  Error("GetPriority", "first row is header is NULL");
5978  }
5979  }
5980  delete res;
5981  }
5982  delete db;
5983 
5984  return priority;
5985 }
5986 
5987 ////////////////////////////////////////////////////////////////////////////////
5988 /// Send an asychronous message to the master / client .
5989 /// Masters will forward up the message to the client.
5990 /// The client prints 'msg' of stderr and adds a '\n'/'\r' depending on
5991 /// 'lf' being kTRUE (default) or kFALSE.
5992 /// Returns the return value from TSocket::Send(TMessage &) .
5993 
5994 void TProofServ::SendAsynMessage(const char *msg, Bool_t lf)
5995 {
5996  static TMessage m(kPROOF_MESSAGE);
5997 
5998  // To leave a track in the output file ... if requested
5999  // (clients will be notified twice)
6000  PDB(kAsyn,1)
6001  Info("SendAsynMessage","%s", (msg ? msg : "(null)"));
6002 
6003  if (fSocket && msg) {
6004  m.Reset(kPROOF_MESSAGE);
6005  m << TString(msg) << lf;
6006  if (fSocket->Send(m) <= 0)
6007  Warning("SendAsynMessage", "could not send message '%s'", msg);
6008  }
6009 
6010  return;
6011 }
6012 
6013 ////////////////////////////////////////////////////////////////////////////////
6014 /// Reposition the read pointer in the log file to the very end.
6015 /// This allows to "hide" useful debug messages during normal operations
6016 /// while preserving the possibility to have them in case of problems.
6017 
6018 void TProofServ::FlushLogFile()
6019 {
6020  off_t lend = lseek(fileno(stdout), (off_t)0, SEEK_END);
6021  if (lend >= 0) lseek(fLogFileDes, lend, SEEK_SET);
6022 }
6023 
6024 ////////////////////////////////////////////////////////////////////////////////
6025 /// Truncate the log file to the 80% of the required max size if this
6026 /// is set.
6027 
6028 void TProofServ::TruncateLogFile()
6029 {
6030 #ifndef WIN32
6031  TString emsg;
6032  if (fLogFileMaxSize > 0 && fLogFileDes > 0) {
6033  fflush(stdout);
6034  struct stat st;
6035  if (fstat(fLogFileDes, &st) == 0) {
6036  if (st.st_size >= fLogFileMaxSize) {
6037  off_t truncsz = (off_t) (( fLogFileMaxSize * 80 ) / 100 );
6038  if (truncsz < 100) {
6039  emsg.Form("+++ WARNING +++: %s: requested truncate size too small"
6040  " (%lld,%lld) - ignore ", fPrefix.Data(), (Long64_t) truncsz, fLogFileMaxSize);
6041  SendAsynMessage(emsg.Data());
6042  return;
6043  }
6044  TSystem::ResetErrno();
6045  while (ftruncate(fileno(stdout), truncsz) != 0 &&
6046  (TSystem::GetErrno() == EINTR)) {
6047  TSystem::ResetErrno();
6048  }
6049  if (TSystem::GetErrno() > 0) {
6050  Error("TruncateLogFile", "truncating to %lld bytes; file size is %lld bytes (errno: %d)",
6051  (Long64_t)truncsz, (Long64_t)st.st_size, TSystem::GetErrno());
6052  emsg.Form("+++ WARNING +++: %s: problems truncating log file to %lld bytes; file size is %lld bytes"
6053  " (errno: %d)", fPrefix.Data(), (Long64_t)truncsz, (Long64_t)st.st_size, TSystem::GetErrno());
6054  SendAsynMessage(emsg.Data());
6055  } else {
6056  Info("TruncateLogFile", "file truncated to %lld bytes (80%% of %lld); file size was %lld bytes ",
6057  (Long64_t)truncsz, fLogFileMaxSize, (Long64_t)st.st_size);
6058  emsg.Form("+++ WARNING +++: %s: log file truncated to %lld bytes (80%% of %lld)",
6059  fPrefix.Data(), (Long64_t)truncsz, fLogFileMaxSize);
6060  SendAsynMessage(emsg.Data());
6061  }
6062  }
6063  } else {
6064  emsg.Form("+++ WARNING +++: %s: could not stat log file descriptor"
6065  " for truncation (errno: %d)", fPrefix.Data(), TSystem::GetErrno());
6066  SendAsynMessage(emsg.Data());
6067  }
6068  }
6069 #endif
6070 }
6071 
6072 ////////////////////////////////////////////////////////////////////////////////
6073 /// Exception handler: we do not try to recover here, just exit.
6074 
6075 void TProofServ::HandleException(Int_t sig)
6076 {
6077  Error("HandleException", "caugth exception triggered by signal '%d' %s %lld",
6078  sig, fgLastMsg.Data(), fgLastEntry);
6079  // Description
6080  TString emsg;
6081  emsg.Form("%s: caught exception triggered by signal '%d' %s %lld",
6082  GetOrdinal(), sig, fgLastMsg.Data(), fgLastEntry);
6083  // Try to warn the user
6084  SendAsynMessage(emsg.Data());
6085 
6086  gSystem->Exit(sig);
6087 }
6088 
6089 ////////////////////////////////////////////////////////////////////////////////
6090 /// Handle here requests about datasets.
6091 
6092 Int_t TProofServ::HandleDataSets(TMessage *mess, TString *slb)
6093 {
6094  if (gDebug > 0)
6095  Info("HandleDataSets", "enter");
6096 
6097  // We need a dataset manager
6098  if (!fDataSetManager) {
6099  Warning("HandleDataSets", "no data manager is available to fullfil the request");
6100  return -1;
6101  }
6102 
6103  // Used in most cases
6104  TString dsUser, dsGroup, dsName, dsTree, uri, opt;
6105  Int_t rc = 0;
6106 
6107  // Invalid characters in dataset URI
6108  TPMERegexp reInvalid("[^A-Za-z0-9._-]"); // from ParseUri
6109 
6110  // Message type
6111  Int_t type = 0;
6112  (*mess) >> type;
6113 
6114  switch (type) {
6115  case TProof::kCheckDataSetName:
6116  //
6117  // Check whether this dataset exist
6118  {
6119  (*mess) >> uri;
6120  if (slb) slb->Form("%d %s", type, uri.Data());
6121  if (fDataSetManager->ExistsDataSet(uri))
6122  // Dataset name does exist
6123  return -1;
6124  }
6125  break;
6126  case TProof::kRegisterDataSet:
6127  // list size must be above 0
6128  {
6129  if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
6130  (*mess) >> uri;
6131  (*mess) >> opt;
6132  if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
6133  // Extract the list
6134  TFileCollection *dataSet =
6135  dynamic_cast<TFileCollection*> ((mess->ReadObject(TFileCollection::Class())));
6136  if (!dataSet || dataSet->GetList()->GetSize() == 0) {
6137  Error("HandleDataSets", "can not save an empty list.");
6138  return -1;
6139  }
6140  // Register the dataset (quota checks are done inside here)
6141  rc = fDataSetManager->RegisterDataSet(uri, dataSet, opt);
6142  delete dataSet;
6143  return rc;
6144  } else {
6145  Info("HandleDataSets", "dataset registration not allowed");
6146  if (slb) slb->Form("%d notallowed", type);
6147  return -1;
6148  }
6149  }
6150  break;
6151 
6152  case TProof::kRequestStaging:
6153  {
6154  (*mess) >> uri; // TString
6155 
6156  if (!fDataSetStgRepo) {
6157  Error("HandleDataSets",
6158  "no dataset staging request repository available");
6159  return -1;
6160  }
6161 
6162  // Transform input URI in a valid dataset name
6163  TString validUri = uri;
6164  while (reInvalid.Substitute(validUri, "_")) {}
6165 
6166  // Check if dataset exists beforehand: if it does, staging has
6167  // already been requested
6168  if (fDataSetStgRepo->ExistsDataSet(validUri.Data())) {
6169  Warning("HandleDataSets", "staging of %s already requested",
6170  uri.Data());
6171  return -1;
6172  }
6173 
6174  // Try to get dataset from current manager
6175  TFileCollection *fc = fDataSetManager->GetDataSet(uri.Data());
6176  if (!fc || (fc->GetNFiles() == 0)) {
6177  Error("HandleDataSets", "empty dataset or no dataset returned");
6178  if (fc) delete fc;
6179  return -1;
6180  }
6181 
6182  // Reset all staged bits and remove unnecessary URLs (all but last)
6183  TIter it(fc->GetList());
6184  TFileInfo *fi;
6185  while ((fi = dynamic_cast<TFileInfo *>(it.Next()))) {
6186  fi->ResetBit(TFileInfo::kStaged);
6187  Int_t nToErase = fi->GetNUrls() - 1;
6188  for (Int_t i=0; i<nToErase; i++)
6189  fi->RemoveUrlAt(0);
6190  }
6191 
6192  fc->Update(); // absolutely necessary
6193 
6194  // Save request
6195  fDataSetStgRepo->ParseUri(validUri, &dsGroup, &dsUser, &dsName);
6196  if (fDataSetStgRepo->WriteDataSet(dsGroup, dsUser,
6197  dsName, fc) == 0) {
6198  // Error, can't save dataset
6199  Error("HandleDataSets",
6200  "can't register staging request for %s", uri.Data());
6201  delete fc;
6202  return -1;
6203  }
6204 
6205  Info("HandleDataSets",
6206  "Staging request registered for %s", uri.Data());
6207 
6208  delete fc;
6209  return 0; // success (-1 == failure)
6210  }
6211  break;
6212 
6213  case TProof::kStagingStatus:
6214  {
6215  if (!fDataSetStgRepo) {
6216  Error("HandleDataSets",
6217  "no dataset staging request repository available");
6218  return -1;
6219  }
6220 
6221  (*mess) >> uri; // TString
6222 
6223  // Transform URI in a valid dataset name
6224  while (reInvalid.Substitute(uri, "_")) {}
6225 
6226  // Get the list
6227  TFileCollection *fc = fDataSetStgRepo->GetDataSet(uri.Data());
6228  if (fc) {
6229  fSocket->SendObject(fc, kMESS_OK);
6230  delete fc;
6231  return 0;
6232  }
6233  else {
6234  // No such dataset: not an error, but don't send message
6235  Info("HandleDataSets", "no pending staging request for %s",
6236  uri.Data());
6237  return 0;
6238  }
6239  }
6240  break;
6241 
6242  case TProof::kCancelStaging:
6243  {
6244  if (!fDataSetStgRepo) {
6245  Error("HandleDataSets",
6246  "no dataset staging request repository available");
6247  return -1;
6248  }
6249 
6250  (*mess) >> uri;
6251 
6252  // Transform URI in a valid dataset name
6253  while (reInvalid.Substitute(uri, "_")) {}
6254 
6255  if (!fDataSetStgRepo->RemoveDataSet(uri.Data()))
6256  return -1; // failure
6257 
6258  return 0; // success
6259  }
6260  break;
6261 
6262  case TProof::kShowDataSets:
6263  {
6264  (*mess) >> uri >> opt;
6265  if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
6266  // Show content
6267  fDataSetManager->ShowDataSets(uri, opt);
6268  }
6269  break;
6270 
6271  case TProof::kGetDataSets:
6272  {
6273  (*mess) >> uri >> opt;
6274  if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
6275  // Get the datasets and fill a map
6276  UInt_t omsk = (UInt_t)TDataSetManager::kExport;
6277  Ssiz_t kLite = opt.Index(":lite:", 0, TString::kIgnoreCase);
6278  if (kLite != kNPOS) {
6279  omsk |= (UInt_t)TDataSetManager::kReadShort;
6280  opt.Remove(kLite, strlen(":lite:"));
6281  }
6282  TMap *returnMap = fDataSetManager->GetDataSets(uri, omsk);
6283  // If defines, option gives the name of a server for which to extract the information
6284  if (returnMap && !opt.IsNull()) {
6285  // The return map will be in the form </group/user/datasetname> --> <dataset>
6286  TMap *rmap = new TMap;
6287  TObject *k = 0;
6288  TFileCollection *fc = 0, *xfc = 0;
6289  TIter nxd(returnMap);
6290  while ((k = nxd()) && (fc = (TFileCollection *) returnMap->GetValue(k))) {
6291  // Get subset on specified server, if any
6292  if ((xfc = fc->GetFilesOnServer(opt.Data()))) {
6293  rmap->Add(new TObjString(k->GetName()), xfc);
6294  }
6295  }
6296  returnMap->DeleteAll();
6297  if (rmap->GetSize() > 0) {
6298  returnMap = rmap;
6299  } else {
6300  Info("HandleDataSets", "no dataset found on server '%s'", opt.Data());
6301  delete rmap;
6302  returnMap = 0;
6303  }
6304  }
6305  if (returnMap) {
6306  // Send them back
6307  fSocket->SendObject(returnMap, kMESS_OK);
6308  returnMap->DeleteAll();
6309  } else {
6310  // Failure
6311  return -1;
6312  }
6313  }
6314  break;
6315  case TProof::kGetDataSet:
6316  {
6317  (*mess) >> uri >> opt;
6318  if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
6319  // Get the list
6320  TFileCollection *fileList = fDataSetManager->GetDataSet(uri,opt);
6321  if (fileList) {
6322  fSocket->SendObject(fileList, kMESS_OK);
6323  delete fileList;
6324  } else {
6325  // Failure
6326  return -1;
6327  }
6328  }
6329  break;
6330  case TProof::kRemoveDataSet:
6331  {
6332  if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
6333  (*mess) >> uri;
6334  if (slb) slb->Form("%d %s", type, uri.Data());
6335  if (!fDataSetManager->RemoveDataSet(uri)) {
6336  // Failure
6337  return -1;
6338  }
6339  } else {
6340  Info("HandleDataSets", "dataset creation / removal not allowed");
6341  if (slb) slb->Form("%d notallowed", type);
6342  return -1;
6343  }
6344  }
6345  break;
6346  case TProof::kVerifyDataSet:
6347  {
6348  if (fDataSetManager->TestBit(TDataSetManager::kAllowVerify)) {
6349  (*mess) >> uri >> opt;
6350  if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
6351  TProofServLogHandlerGuard hg(fLogFile, fSocket);
6352  rc = fDataSetManager->ScanDataSet(uri, opt);
6353  // TODO: verify in parallel:
6354  // - dataset = GetDataSet(uri)
6355  // - TList flist; TDataSetManager::ScanDataSet(dataset, ..., &flist)
6356  // - fPlayer->Process( ... flist ...) // needs to be developed
6357  // - dataset->Integrate(flist) (perhaps automatic; flist object owned by dataset)
6358  // - RegisterDataSet(uri, dataset, "OT")
6359  } else {
6360  Info("HandleDataSets", "dataset verification not allowed");
6361  return -1;
6362  }
6363  }
6364  break;
6365  case TProof::kGetQuota:
6366  {
6367  if (fDataSetManager->TestBit(TDataSetManager::kCheckQuota)) {
6368  if (slb) slb->Form("%d", type);
6369  TMap *groupQuotaMap = fDataSetManager->GetGroupQuotaMap();
6370  if (groupQuotaMap) {
6371  // Send result
6372  fSocket->SendObject(groupQuotaMap, kMESS_OK);
6373  } else {
6374  return -1;
6375  }
6376  } else {
6377  Info("HandleDataSets", "quota control disabled");
6378  if (slb) slb->Form("%d disabled", type);
6379  return -1;
6380  }
6381  }
6382  break;
6383  case TProof::kShowQuota:
6384  {
6385  if (fDataSetManager->TestBit(TDataSetManager::kCheckQuota)) {
6386  if (slb) slb->Form("%d", type);
6387  (*mess) >> opt;
6388  // Display quota information
6389  fDataSetManager->ShowQuota(opt);
6390  } else {
6391  Info("HandleDataSets", "quota control disabled");
6392  if (slb) slb->Form("%d disabled", type);
6393  }
6394  }
6395  break;
6396  case TProof::kSetDefaultTreeName:
6397  {
6398  if (fDataSetManager->TestBit(TDataSetManager::kAllowRegister)) {
6399  (*mess) >> uri;
6400  if (slb) slb->Form("%d %s", type, uri.Data());
6401  rc = fDataSetManager->ScanDataSet(uri, (UInt_t)TDataSetManager::kSetDefaultTree);
6402  } else {
6403  Info("HandleDataSets", "kSetDefaultTreeName: modification of dataset info not allowed");
6404  if (slb) slb->Form("%d notallowed", type);
6405  return -1;
6406  }
6407  }
6408  break;
6409  case TProof::kCache:
6410  {
6411  (*mess) >> uri >> opt;
6412  if (slb) slb->Form("%d %s %s", type, uri.Data(), opt.Data());
6413  if (opt == "show") {
6414  // Show cache content
6415  fDataSetManager->ShowCache(uri);
6416  } else if (opt == "clear") {
6417  // Clear cache content
6418  fDataSetManager->ClearCache(uri);
6419  } else {
6420  Error("HandleDataSets", "kCache: unknown action: %s", opt.Data());
6421  }
6422  }
6423  break;
6424  default:
6425  rc = -1;
6426  Error("HandleDataSets", "unknown type %d", type);
6427  break;
6428  }
6429 
6430  // We are done
6431  return rc;
6432 }
6433 
6434 ////////////////////////////////////////////////////////////////////////////////
6435 /// Handle a message of type kPROOF_SUBMERGER
6436 
6437 void TProofServ::HandleSubmerger(TMessage *mess)
6438 {
6439  // Message type
6440  Int_t type = 0;
6441  (*mess) >> type;
6442 
6443  TString msg;
6444  switch (type) {
6445  case TProof::kOutputSize:
6446  break;
6447 
6448  case TProof::kSendOutput:
6449  {
6450  Bool_t deleteplayer = kTRUE;
6451  if (!IsMaster()) {
6452  if (fMergingMonitor) {
6453  Info("HandleSubmerger", "kSendOutput: interrupting ...");
6454  fMergingMonitor->Interrupt();
6455  }
6456  if (fMergingSocket) {
6457  if (fMergingMonitor) fMergingMonitor->Remove(fMergingSocket);
6458  fMergingSocket->Close();
6459  SafeDelete(fMergingSocket);
6460  }
6461 
6462  TString name;
6463  Int_t port = 0;
6464  Int_t merger_id = -1;
6465  (*mess) >> merger_id >> name >> port;
6466  PDB(kSubmerger, 1)
6467  Info("HandleSubmerger","worker %s redirected to merger #%d %s:%d", fOrdinal.Data(), merger_id, name.Data(), port);
6468 
6469  TSocket *t = 0;
6470  if (name.Length() > 0 && port > 0 && (t = new TSocket(name, port)) && t->IsValid()) {
6471 
6472  PDB(kSubmerger, 2) Info("HandleSubmerger",
6473  "kSendOutput: worker asked for sending output to merger #%d %s:%d",
6474  merger_id, name.Data(), port);
6475 
6476  if (SendResults(t, fPlayer->GetOutputList()) != 0) {
6477  msg.Form("worker %s cannot send results to merger #%d at %s:%d", GetPrefix(), merger_id, name.Data(), port);
6478  PDB(kSubmerger, 2) Info("HandleSubmerger",
6479  "kSendOutput: %s - inform the master", msg.Data());
6480  SendAsynMessage(msg);
6481  // Results not send
6482  TMessage answ(kPROOF_SUBMERGER);
6483  answ << Int_t(TProof::kMergerDown);
6484  answ << merger_id;
6485  fSocket->Send(answ);
6486  } else {
6487  // Worker informs master that it had sent its output to the merger
6488  TMessage answ(kPROOF_SUBMERGER);
6489  answ << Int_t(TProof::kOutputSent);
6490  answ << merger_id;
6491  fSocket->Send(answ);
6492 
6493  PDB(kSubmerger, 2) Info("HandleSubmerger", "kSendOutput: worker sent its output");
6494  fSocket->Send(kPROOF_SETIDLE);
6495  SetIdle(kTRUE);
6496  SendLogFile();
6497  }
6498  } else {
6499 
6500  if (name == "master") {
6501  PDB(kSubmerger, 2) Info("HandleSubmerger",
6502  "kSendOutput: worker was asked for sending output to master");
6503  if (SendResults(fSocket, fPlayer->GetOutputList()) != 0)
6504  Warning("HandleSubmerger", "problems sending output list");
6505  // Signal the master that we are idle
6506  fSocket->Send(kPROOF_SETIDLE);
6507  SetIdle(kTRUE);
6508  SendLogFile();
6509 
6510  } else if (!t || !(t->IsValid())) {
6511  msg.Form("worker %s could not open a valid socket to merger #%d at %s:%d",
6512  GetPrefix(), merger_id, name.Data(), port);
6513  PDB(kSubmerger, 2) Info("HandleSubmerger",
6514  "kSendOutput: %s - inform the master", msg.Data());
6515  SendAsynMessage(msg);
6516  // Results not send
6517  TMessage answ(kPROOF_SUBMERGER);
6518  answ << Int_t(TProof::kMergerDown);
6519  answ << merger_id;
6520  fSocket->Send(answ);
6521  deleteplayer = kFALSE;
6522  }
6523 
6524  SafeDelete(t);
6525 
6526  }
6527 
6528  } else {
6529  Error("HandleSubmerger", "kSendOutput: received not on worker");
6530  }
6531 
6532  // Cleanup
6533  if (deleteplayer) DeletePlayer();
6534  }
6535  break;
6536  case TProof::kBeMerger:
6537  {
6538  Bool_t deleteplayer = kTRUE;
6539  if (!IsMaster()) {
6540  Int_t merger_id = -1;
6541  //Int_t merger_port = 0;
6542  Int_t connections = 0;
6543  (*mess) >> merger_id >> connections;
6544  PDB(kSubmerger, 2)
6545  Info("HandleSubmerger", "worker %s established as merger", fOrdinal.Data());
6546 
6547  PDB(kSubmerger, 2)
6548  Info("HandleSubmerger",
6549  "kBeMerger: worker asked for being merger #%d for %d connections",
6550  merger_id, connections);
6551 
6552  TVirtualProofPlayer *mergerPlayer = TVirtualProofPlayer::Create("remote",fProof,0);
6553 
6554  if (mergerPlayer) {
6555  PDB(kSubmerger, 2) Info("HandleSubmerger",
6556  "kBeMerger: mergerPlayer created (%p) ", mergerPlayer);
6557 
6558  // This may be used internally
6559  mergerPlayer->SetBit(TVirtualProofPlayer::kIsSubmerger);
6560 
6561  // Accept results from assigned workers
6562  if (AcceptResults(connections, mergerPlayer)) {
6563  PDB(kSubmerger, 2)
6564  Info("HandleSubmerger", "kBeMerger: all outputs from workers accepted");
6565 
6566  PDB(kSubmerger, 2)
6567  Info("","adding own output to the list on %s", fOrdinal.Data());
6568 
6569  // Add own results to the output list.
6570  // On workers the player does not own the output list, which is owned
6571  // by the selector and deleted in there
6572  // On workers the player does not own the output list, which is owned
6573  // by the selector and deleted in there
6574  TIter nxo(fPlayer->GetOutputList());
6575  TObject * o = 0;
6576  while ((o = nxo())) {
6577  if ((mergerPlayer->AddOutputObject(o) != 1)) {
6578  // Remove the object if it has not been merged: it is owned
6579  // now by the merger player (in its output list)
6580  if (fPlayer->GetOutputList()) {
6581  PDB(kSubmerger, 2)
6582  Info("HandleSocketInput", "removing merged object (%p)", o);
6583  fPlayer->GetOutputList()->Remove(o);
6584  }
6585  }
6586  }
6587  PDB(kSubmerger, 2) Info("HandleSubmerger","kBeMerger: own outputs added");
6588  PDB(kSubmerger, 2) Info("HandleSubmerger","starting delayed merging on %s", fOrdinal.Data());
6589 
6590  // Delayed merging if neccessary
6591  mergerPlayer->MergeOutput(kTRUE);
6592 
6593  PDB(kSubmerger, 2) mergerPlayer->GetOutputList()->Print("all");
6594 
6595  PDB(kSubmerger, 2) Info("HandleSubmerger", "delayed merging on %s finished ", fOrdinal.Data());
6596  PDB(kSubmerger, 2) Info("HandleSubmerger", "%s sending results to master ", fOrdinal.Data());
6597  // Send merged results to master
6598  if (SendResults(fSocket, mergerPlayer->GetOutputList()) != 0)
6599  Warning("HandleSubmerger","kBeMerger: problems sending output list");
6600  if (mergerPlayer->GetOutputList())
6601  mergerPlayer->GetOutputList()->SetOwner(kTRUE);
6602 
6603  PDB(kSubmerger, 2) Info("HandleSubmerger","kBeMerger: results sent to master");
6604  // Signal the master that we are idle
6605  fSocket->Send(kPROOF_SETIDLE);
6606  SetIdle(kTRUE);
6607  SendLogFile();
6608  } else {
6609  // Results from all assigned workers not accepted
6610  TMessage answ(kPROOF_SUBMERGER);
6611  answ << Int_t(TProof::kMergerDown);
6612  answ << merger_id;
6613  fSocket->Send(answ);
6614  deleteplayer = kFALSE;
6615  }
6616  // Reset
6617  SafeDelete(mergerPlayer);
6618 
6619  } else {
6620  Warning("HandleSubmerger","kBeMerger: problems craeting the merger player!");
6621  // Results from all assigned workers not accepted
6622  TMessage answ(kPROOF_SUBMERGER);
6623  answ << Int_t(TProof::kMergerDown);
6624  answ << merger_id;
6625  fSocket->Send(answ);
6626  deleteplayer = kFALSE;
6627  }
6628  } else {
6629  Error("HandleSubmerger","kSendOutput: received not on worker");
6630  }
6631 
6632  // Cleanup
6633  if (deleteplayer) DeletePlayer();
6634  }
6635  break;
6636 
6637  case TProof::kMergerDown:
6638  break;
6639 
6640  case TProof::kStopMerging:
6641  {
6642  // Received only in case of forced termination of merger by master
6643  PDB(kSubmerger, 2) Info("HandleSubmerger", "kStopMerging");
6644  if (fMergingMonitor) {
6645  Info("HandleSubmerger", "kStopMerging: interrupting ...");
6646  fMergingMonitor->Interrupt();
6647  }
6648  }
6649  break;
6650 
6651  case TProof::kOutputSent:
6652  break;
6653  }
6654 }
6655 
6656 ////////////////////////////////////////////////////////////////////////////////
6657 /// Cloning itself via fork. Not implemented
6658 
6659 void TProofServ::HandleFork(TMessage *)
6660 {
6661  Info("HandleFork", "fork cloning not implemented");
6662 }
6663 
6664 ////////////////////////////////////////////////////////////////////////////////
6665 /// Fork a child.
6666 /// If successful, return 0 in the child process and the child pid in the parent
6667 /// process. The child pid is registered for reaping.
6668 /// Return <0 in the parent process in case of failure.
6669 
6670 Int_t TProofServ::Fork()
6671 {
6672 #ifndef WIN32
6673  // Fork
6674  pid_t pid;
6675  if ((pid = fork()) < 0) {
6676  Error("Fork", "failed to fork");
6677  return pid;
6678  }
6679 
6680  // Nothing else to do in the child
6681  if (!pid) return pid;
6682 
6683  // Make sure that the reaper timer is started
6684  if (!fReaperTimer) {
6685  fReaperTimer = new TReaperTimer(1000);
6686  fReaperTimer->Start(-1);
6687  }
6688 
6689  // Register the new child
6690  fReaperTimer->AddPid(pid);
6691 
6692  // Done
6693  return pid;
6694 #else
6695  Warning("Fork", "Functionality not provided under windows");
6696  return -1;
6697 #endif
6698 }
6699 
6700 ////////////////////////////////////////////////////////////////////////////////
6701 /// Replace <ord>, <user>, <u>, <group>, <stag>, <qnum>, <file>, <rver> and
6702 /// <build> placeholders in fname.
6703 /// Here, <rver> is the root version in integer form, e.g. 53403, and <build> a
6704 /// string including version, architecture and compiler version, e.g.
6705 /// '53403_linuxx8664gcc_gcc46' .
6706 
6707 void TProofServ::ResolveKeywords(TString &fname, const char *path)
6708 {
6709  // Replace <user>, if any
6710  if (fname.Contains("<user>")) {
6711  if (gProofServ && gProofServ->GetUser() && strlen(gProofServ->GetUser())) {
6712  fname.ReplaceAll("<user>", gProofServ->GetUser());
6713  } else if (gProof && gProof->GetUser() && strlen(gProof->GetUser())) {
6714  fname.ReplaceAll("<user>", gProof->GetUser());
6715  } else {
6716  fname.ReplaceAll("<user>", "nouser");
6717  }
6718  }
6719  // Replace <us>, if any
6720  if (fname.Contains("<u>")) {
6721  if (gProofServ && gProofServ->GetUser() && strlen(gProofServ->GetUser())) {
6722  TString u(gProofServ->GetUser()[0]);
6723  fname.ReplaceAll("<u>", u);
6724  } else if (gProof && gProof->GetUser() && strlen(gProof->GetUser())) {
6725  TString u(gProof->GetUser()[0]);
6726  fname.ReplaceAll("<u>", u);
6727  } else {
6728  fname.ReplaceAll("<u>", "n");
6729  }
6730  }
6731  // Replace <group>, if any
6732  if (fname.Contains("<group>")) {
6733  if (gProofServ && gProofServ->GetGroup() && strlen(gProofServ->GetGroup())) {
6734  fname.ReplaceAll("<group>", gProofServ->GetGroup());
6735  } else if (gProof && gProof->GetGroup() && strlen(gProof->GetGroup())) {
6736  fname.ReplaceAll("<group>", gProof->GetGroup());
6737  } else {
6738  fname.ReplaceAll("<group>", "default");
6739  }
6740  }
6741  // Replace <stag>, if any
6742  if (fname.Contains("<stag>")) {
6743  if (gProofServ && gProofServ->GetSessionTag() && strlen(gProofServ->GetSessionTag())) {
6744  fname.ReplaceAll("<stag>", gProofServ->GetSessionTag());
6745  } else if (gProof && gProof->GetSessionTag() && strlen(gProof->GetSessionTag())) {
6746  fname.ReplaceAll("<stag>", gProof->GetSessionTag());
6747  } else {
6748  ::Warning("TProofServ::ResolveKeywords", "session tag undefined: ignoring");
6749  }
6750  }
6751  // Replace <ord>, if any
6752  if (fname.Contains("<ord>")) {
6753  if (gProofServ && gProofServ->GetOrdinal() && strlen(gProofServ->GetOrdinal()))
6754  fname.ReplaceAll("<ord>", gProofServ->GetOrdinal());
6755  else
6756  ::Warning("TProofServ::ResolveKeywords", "ordinal number undefined: ignoring");
6757  }
6758  // Replace <qnum>, if any
6759  if (fname.Contains("<qnum>")) {
6760  if (gProofServ && gProofServ->GetQuerySeqNum() && gProofServ->GetQuerySeqNum() > 0)
6761  fname.ReplaceAll("<qnum>", TString::Format("%d", gProofServ->GetQuerySeqNum()).Data());
6762  else
6763  ::Warning("TProofServ::ResolveKeywords", "query seqeuntial number undefined: ignoring");
6764  }
6765  // Replace <file>, if any
6766  if (fname.Contains("<file>") && path && strlen(path) > 0) {
6767  fname.ReplaceAll("<file>", path);
6768  }
6769  // Replace <rver>, if any
6770  if (fname.Contains("<rver>")) {
6771  TString v = TString::Format("%d", gROOT->GetVersionInt());
6772  fname.ReplaceAll("<rver>", v);
6773  }
6774  // Replace <build>, if any
6775  if (fname.Contains("<build>")) {
6776  TString b = TString::Format("%d_%s_%s", gROOT->GetVersionInt(), gSystem->GetBuildArch(),
6777  gSystem->GetBuildCompilerVersion());
6778  fname.ReplaceAll("<build>", b);
6779  }
6780 }
6781 
6782 ////////////////////////////////////////////////////////////////////////////////
6783 /// Return the status of this session:
6784 /// 0 idle
6785 /// 1 running
6786 /// 2 being terminated (currently unused)
6787 /// 3 queued
6788 /// 4 idle timed-out (not set in here but in TIdleTOTimer::Notify)
6789 /// This is typically run in the reader thread, so access needs to be protected
6790 
6791 Int_t TProofServ::GetSessionStatus()
6792 {
6793  std::lock_guard<std::recursive_mutex> lock(fQMtx);
6794  Int_t st = (fIdle) ? 0 : 1;
6795  if (fIdle && fWaitingQueries->GetSize() > 0) st = 3;
6796  return st;
6797 }
6798 
6799 ////////////////////////////////////////////////////////////////////////////////
6800 /// Update the session status in the relevant file. The status is taken from
6801 /// GetSessionStatus() unless xst >= 0, in which case xst is used.
6802 /// Return 0 on success, -errno if the file could not be opened.
6803 
6804 Int_t TProofServ::UpdateSessionStatus(Int_t xst)
6805 {
6806  FILE *fs = fopen(fAdminPath.Data(), "w");
6807  if (fs) {
6808  Int_t st = (xst < 0) ? GetSessionStatus() : xst;
6809  fprintf(fs, "%d", st);
6810  fclose(fs);
6811  PDB(kGlobal, 2)
6812  Info("UpdateSessionStatus", "status (=%d) update in path: %s", st, fAdminPath.Data());
6813  } else {
6814  return -errno;
6815  }
6816  // Done
6817  return 0;
6818 }
6819 
6820 ////////////////////////////////////////////////////////////////////////////////
6821 /// Return the idle status
6822 
6823 Bool_t TProofServ::IsIdle()
6824 {
6825  std::lock_guard<std::recursive_mutex> lock(fQMtx);
6826  return fIdle;
6827 }
6828 
6829 ////////////////////////////////////////////////////////////////////////////////
6830 /// Change the idle status
6831 
6832 void TProofServ::SetIdle(Bool_t st)
6833 {
6834  std::lock_guard<std::recursive_mutex> lock(fQMtx);
6835  fIdle = st;
6836 }
6837 
6838 ////////////////////////////////////////////////////////////////////////////////
6839 /// Return kTRUE if the session is waiting for the OK to start processing
6840 
6841 Bool_t TProofServ::IsWaiting()
6842 {
6843  std::lock_guard<std::recursive_mutex> lock(fQMtx);
6844  if (fIdle && fWaitingQueries->GetSize() > 0) return kTRUE;
6845  return kFALSE;
6846 }
6847 
6848 ////////////////////////////////////////////////////////////////////////////////
6849 /// Return the number of waiting queries
6850 
6851 Int_t TProofServ::WaitingQueries()
6852 {
6853  std::lock_guard<std::recursive_mutex> lock(fQMtx);
6854  return fWaitingQueries->GetSize();
6855 }
6856 
6857 ////////////////////////////////////////////////////////////////////////////////
6858 /// Add a query to the waiting list
6859 /// Returns the number of queries in the list
6860 
6861 Int_t TProofServ::QueueQuery(TProofQueryResult *pq)
6862 {
6863  std::lock_guard<std::recursive_mutex> lock(fQMtx);
6864  fWaitingQueries->Add(pq);
6865  return fWaitingQueries->GetSize();
6866 }
6867 
6868 ////////////////////////////////////////////////////////////////////////////////
6869 /// Get the next query from the waiting list.
6870 /// The query is removed from the list.
6871 
6872 TProofQueryResult *TProofServ::NextQuery()
6873 {
6874  std::lock_guard<std::recursive_mutex> lock(fQMtx);
6875  TProofQueryResult *pq = (TProofQueryResult *) fWaitingQueries->First();
6876  fWaitingQueries->Remove(pq);
6877  return pq;
6878 }
6879 
6880 ////////////////////////////////////////////////////////////////////////////////
6881 /// Cleanup the waiting queries list. The objects are deleted if 'del' is true.
6882 /// If 'qls' is non null, only objects in 'qls' are removed.
6883 /// Returns the number of queries removed from the waiting list
6884 
6885 Int_t TProofServ::CleanupWaitingQueries(Bool_t del, TList *qls)
6886 {
6887  std::lock_guard<std::recursive_mutex> lock(fQMtx);
6888  Int_t ncq = 0;
6889  if (qls) {
6890  TIter nxq(qls);
6891  TObject *o = 0;
6892  while ((o = nxq())) {
6893  if (fWaitingQueries->FindObject(o)) ncq++;
6894  fWaitingQueries->Remove(o);
6895  if (del) delete o;
6896  }
6897  } else {
6898  ncq = fWaitingQueries->GetSize();
6899  fWaitingQueries->SetOwner(del);
6900  fWaitingQueries->Delete();
6901  }
6902  // Done
6903  return ncq;
6904 }
6905 
6906 ////////////////////////////////////////////////////////////////////////////////
6907 /// Set the message to be sent back in case of exceptions
6908 
6909 void TProofServ::SetLastMsg(const char *lastmsg)
6910 {
6911  fgLastMsg = lastmsg;
6912 }
6913 
6914 ////////////////////////////////////////////////////////////////////////////////
6915 /// Set the last entry before exception
6916 
6917 void TProofServ::SetLastEntry(Long64_t entry)
6918 {
6919  fgLastEntry = entry;
6920 }
6921 
6922 ////////////////////////////////////////////////////////////////////////////////
6923 /// VirtMemMax getter
6924 
6925 Long_t TProofServ::GetVirtMemMax()
6926 {
6927  return fgVirtMemMax;
6928 }
6929 ////////////////////////////////////////////////////////////////////////////////
6930 /// ResMemMax getter
6931 
6932 Long_t TProofServ::GetResMemMax()
6933 {
6934  return fgResMemMax;
6935 }
6936 ////////////////////////////////////////////////////////////////////////////////
6937 /// MemHWM getter
6938 
6939 Float_t TProofServ::GetMemHWM()
6940 {
6941  return fgMemHWM;
6942 }
6943 ////////////////////////////////////////////////////////////////////////////////
6944 /// MemStop getter
6945 
6946 Float_t TProofServ::GetMemStop()
6947 {
6948  return fgMemStop;
6949 }
6950 
6951 ////////////////////////////////////////////////////////////////////////////////
6952 /// Extract LOCALDATASERVER info in 'dsrv'
6953 
6954 void TProofServ::GetLocalServer(TString &dsrv)
6955 {
6956  // Check if a local data server has been specified
6957  if (gSystem->Getenv("LOCALDATASERVER")) {
6958  dsrv = gSystem->Getenv("LOCALDATASERVER");
6959  if (!dsrv.EndsWith("/")) dsrv += "/";
6960  }
6961 
6962  // Done
6963  return;
6964 }
6965 
6966 ////////////////////////////////////////////////////////////////////////////////
6967 /// If 'path' is local and 'dsrv' is Xrootd, apply 'path.Localroot' settings,
6968 /// if any.
6969 /// The final path via the server is dsrv+path.
6970 
6971 void TProofServ::FilterLocalroot(TString &path, const char *dsrv)
6972 {
6973  TUrl u(path, kTRUE);
6974  if (!strcmp(u.GetProtocol(), "file")) {
6975  // Remove prefix, if any, if included and if Xrootd
6976  TString pfx = gEnv->GetValue("Path.Localroot","");
6977  if (!pfx.IsNull() && !strncmp(u.GetFile(), pfx.Data(), pfx.Length())) {
6978  TString srvp = TUrl(dsrv).GetProtocol();
6979  if (srvp == "root" || srvp == "xrd") path.Remove(0, pfx.Length());
6980  }
6981  }
6982 
6983  // Done
6984  return;
6985 }
6986 
6987 ////////////////////////////////////////////////////////////////////////////////
6988 /// Locks the directory. Waits if lock is hold by an other process.
6989 /// Returns 0 on success, -1 in case of error.
6990 
6991 Int_t TProofLockPath::Lock()
6992 {
6993  const char *pname = GetName();
6994 
6995  if (gSystem->AccessPathName(pname))
6996  fLockId = open(pname, O_CREAT|O_RDWR, 0644);
6997  else
6998  fLockId = open(pname, O_RDWR);
6999 
7000  if (fLockId == -1) {
7001  SysError("Lock", "cannot open lock file %s", pname);
7002  return -1;
7003  }
7004 
7005  PDB(kPackage, 2)
7006  Info("Lock", "%d: locking file %s ...", gSystem->GetPid(), pname);
7007  // lock the file
7008 #if !defined(R__WIN32) && !defined(R__WINGCC)
7009  if (lockf(fLockId, F_LOCK, (off_t) 1) == -1) {
7010  SysError("Lock", "error locking %s", pname);
7011  close(fLockId);
7012  fLockId = -1;
7013  return -1;
7014  }
7015 #endif
7016 
7017  PDB(kPackage, 2)
7018  Info("Lock", "%d: file %s locked", gSystem->GetPid(), pname);
7019 
7020  return 0;
7021 }
7022 
7023 ////////////////////////////////////////////////////////////////////////////////
7024 /// Unlock the directory. Returns 0 in case of success,
7025 /// -1 in case of error.
7026 
7027 Int_t TProofLockPath::Unlock()
7028 {
7029  if (!IsLocked())
7030  return 0;
7031 
7032  PDB(kPackage, 2)
7033  Info("Lock", "%d: unlocking file %s ...", gSystem->GetPid(), GetName());
7034  // unlock the file
7035  lseek(fLockId, 0, SEEK_SET);
7036 #if !defined(R__WIN32) && !defined(R__WINGCC)
7037  if (lockf(fLockId, F_ULOCK, (off_t)1) == -1) {
7038  SysError("Unlock", "error unlocking %s", GetName());
7039  close(fLockId);
7040  fLockId = -1;
7041  return -1;
7042  }
7043 #endif
7044 
7045  PDB(kPackage, 2)
7046  Info("Unlock", "%d: file %s unlocked", gSystem->GetPid(), GetName());
7047 
7048  close(fLockId);
7049  fLockId = -1;
7050 
7051  return 0;
7052 }