Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TProofServ.h
Go to the documentation of this file.
1 // @(#)root/proof:$Id$
2 // Author: Fons Rademakers 16/02/97
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 
13 #ifndef ROOT_TProofServ
14 #define ROOT_TProofServ
15 
16 //////////////////////////////////////////////////////////////////////////
17 // //
18 // TProofServ //
19 // //
// TProofServ is the PROOF server. It can act either as the master      //
// server or as a slave server, depending on its startup arguments. It  //
// receives and handles messages coming from the client or from the     //
// master server.                                                       //
24 // //
25 //////////////////////////////////////////////////////////////////////////
26 
27 #include "TApplication.h"
28 #include "TString.h"
29 #include "TSysEvtHandler.h"
30 #include "TStopwatch.h"
31 #include "TTimer.h"
32 #include "TPackMgr.h"
33 #include "TProofQueryResult.h"
34 
35 #include <mutex>
36 
37 class TDataSetManager;
38 class TDataSetManagerFile;
39 class TDSet;
40 class TDSetElement;
41 class TFileCollection;
42 class TFileHandler;
43 class THashList;
44 class TIdleTOTimer;
45 class TList;
46 class TMap;
47 class TMessage;
48 class TMonitor;
49 class TPackMgr;
50 class TProof;
51 class TProofLockPath;
52 class TQueryResultManager;
53 class TReaperTimer;
54 class TServerSocket;
55 class TShutdownTimer;
56 class TSocket;
57 class TVirtualProofPlayer;
58 
59 // Hook to external function setting up authentication related stuff
60 // for old versions.
61 // For backward compatibility
62 typedef Int_t (*OldProofServAuthSetup_t)(TSocket *, Bool_t, Int_t,
63  TString &, TString &, TString &);
64 
65 
66 class TProofServ : public TApplication {
67 
68 friend class TProofServLite;
69 friend class TXProofServ;
70 
71 public:
72  enum EStatusBits { kHighMemory = BIT(17) };
73  enum EQueryAction { kQueryOK, kQueryModify, kQueryStop, kQueryEnqueued };
74 
75 private:
76  TString fService; //service we are running, either "proofserv" or "proofslave"
77  TString fUser; //user as which we run
78  TString fGroup; //group the user belongs to
79  TString fConfDir; //directory containing cluster config information
80  TString fConfFile; //file containing config information
81  TString fWorkDir; //directory containing all proof related info
82  TString fImage; //image name of the session
83  TString fSessionTag; //tag for the server session
84  TString fTopSessionTag; //tag for the global session
85  TString fSessionDir; //directory containing session dependent files
86  TPackMgr *fPackMgr; // Default package manager
87  TString fCacheDir; //directory containing cache of user files
88  TString fQueryDir; //directory containing query results and status
89  TString fDataSetDir; //directory containing info about known data sets
90  TString fDataDir; //directory containing data files produced during queries
91  TString fDataDirOpts; //Url type options for fDataDir
92  TString fAdminPath; //admin path for this session
93  TString fOutputFile; //path with the temporary results of the current or last query
94  TProofLockPath *fCacheLock; //cache dir locker
95  TProofLockPath *fQueryLock; //query dir locker
96  TString fArchivePath; //default archive path
97  TSocket *fSocket; //socket connection to client
98  TProof *fProof; //PROOF talking to slave servers
99  TVirtualProofPlayer *fPlayer; //actual player
100  FILE *fLogFile; //log file
101  Int_t fLogFileDes; //log file descriptor
102  Long64_t fLogFileMaxSize; //max size for log files (enabled if > 0)
103  Int_t fProtocol; //protocol version number
104  TString fOrdinal; //slave ordinal number
105  Int_t fGroupId; //slave unique id in the active slave group
106  Int_t fGroupSize; //size of the active slave group
107  Int_t fLogLevel; //debug logging level
108  Int_t fNcmd; //command history number
109  Int_t fGroupPriority; //priority of group the user belongs to (0 - 100)
110  Bool_t fEndMaster; //true for a master in direct contact only with workers
111  Bool_t fMasterServ; //true if we are a master server
112  Bool_t fInterrupt; //if true macro execution will be stopped
113  Float_t fRealTime; //real time spent executing commands
114  Float_t fCpuTime; //CPU time spent executing commands
115  TStopwatch fLatency; //measures latency of packet requests
116  TStopwatch fCompute; //measures time spent processing a packet
117  TStopwatch fSaveOutput; //measures time spent saving the partial result
118  Int_t fQuerySeqNum; //sequential number of the current or last query
119 
120  Int_t fTotSessions; //Total number of PROOF sessions on the cluster
121  Int_t fActSessions; //Total number of active PROOF sessions on the cluster
122  Float_t fEffSessions; //Effective Number of PROOF sessions on the assigned machines
123 
124  TFileHandler *fInputHandler; //Input socket handler
125 
126  TQueryResultManager *fQMgr; //Query-result manager
127 
128  TList *fWaitingQueries; //list of TProofQueryResult waiting to be processed
129  Bool_t fIdle; //TRUE if idle
130  std::recursive_mutex fQMtx; // To protect async msg queue
131 
132  TList *fQueuedMsg; //list of messages waiting to be processed
133 
134  TString fPrefix; //Prefix identifying the node
135 
136  Bool_t fRealTimeLog; //TRUE if log messages should be send back in real-time
137 
138  TShutdownTimer *fShutdownTimer; // Timer used to shutdown out-of-control sessions
139  TReaperTimer *fReaperTimer; // Timer used to control children state
140  TIdleTOTimer *fIdleTOTimer; // Timer used to control children state
141 
142  Int_t fCompressMsg; // Compression level for messages
143 
144  TDataSetManager* fDataSetManager; // dataset manager
145  TDataSetManagerFile *fDataSetStgRepo; // repository for staging requests
146 
147  Bool_t fSendLogToMaster; // On workers, controls logs sending to master
148 
149  TServerSocket *fMergingSocket; // Socket used for merging outputs if submerger
150  TMonitor *fMergingMonitor; // Monitor for merging sockets
151  Int_t fMergedWorkers; // Number of workers merged
152 
153  // Quotas (-1 to disable)
154  Int_t fMaxQueries; //Max number of queries fully kept
155  Long64_t fMaxBoxSize; //Max size of the sandbox
156  Long64_t fHWMBoxSize; //High-Water-Mark on the sandbox size
157 
158  // Memory limits (-1 to disable) set by envs ROOTPROFOASHARD, PROOF_VIRTMEMMAX, PROOF_RESMEMMAX
159  static Long_t fgVirtMemMax; //Hard limit enforced by the system (in kB)
160  static Long_t fgResMemMax; //Hard limit on the resident memory checked
161  //in TProofPlayer::Process (in kB)
162  static Float_t fgMemHWM; // Threshold fraction of max for warning and finer monitoring
163  static Float_t fgMemStop; // Fraction of max for stop processing
164 
165  // In bytes; default is 1MB
166  Long64_t fMsgSizeHWM; //High-Water-Mark on the size of messages with results
167 
168  static FILE *fgErrorHandlerFile; // File where to log
169  static Int_t fgRecursive; // Keep track of recursive inputs during processing
170 
171  // Control sending information to syslog
172  static Int_t fgLogToSysLog; // >0 sent to syslog too
173  static TString fgSysLogService; // name of the syslog service (eg: proofm-0, proofw-0.67)
174  static TString fgSysLogEntity; // logging entity (<user>:<group>)
175 
176  Int_t GetCompressionLevel() const;
177 
178  void RedirectOutput(const char *dir = 0, const char *mode = "w");
179  Int_t CatMotd();
180  Int_t UnloadPackage(const char *package);
181  Int_t UnloadPackages();
182  Int_t OldAuthSetup(TString &wconf);
183  Int_t GetPriority();
184 
185  // Query handlers
186  TProofQueryResult *MakeQueryResult(Long64_t nentries, const char *opt,
187  TList *inl, Long64_t first, TDSet *dset,
188  const char *selec, TObject *elist);
189  void SetQueryRunning(TProofQueryResult *pq);
190 
191  // Results handling
192  Int_t SendResults(TSocket *sock, TList *outlist = 0, TQueryResult *pq = 0);
193  Bool_t AcceptResults(Int_t connections, TVirtualProofPlayer *mergerPlayer);
194 
195  // Waiting queries handlers
196  void SetIdle(Bool_t st = kTRUE);
197  Bool_t IsWaiting();
198  Int_t WaitingQueries();
199  Int_t QueueQuery(TProofQueryResult *pq);
200  TProofQueryResult *NextQuery();
201  Int_t CleanupWaitingQueries(Bool_t del = kTRUE, TList *qls = 0);
202 
203 protected:
204  virtual void HandleArchive(TMessage *mess, TString *slb = 0);
205  virtual Int_t HandleCache(TMessage *mess, TString *slb = 0);
206  virtual void HandleCheckFile(TMessage *mess, TString *slb = 0);
207  virtual Int_t HandleDataSets(TMessage *mess, TString *slb = 0);
208  virtual void HandleSubmerger(TMessage *mess);
209  virtual void HandleFork(TMessage *mess);
210  virtual Int_t HandleLibIncPath(TMessage *mess);
211  virtual void HandleProcess(TMessage *mess, TString *slb = 0);
212  virtual void HandleQueryList(TMessage *mess);
213  virtual void HandleRemove(TMessage *mess, TString *slb = 0);
214  virtual void HandleRetrieve(TMessage *mess, TString *slb = 0);
215  virtual Int_t HandleWorkerLists(TMessage *mess);
216 
217  virtual void ProcessNext(TString *slb = 0);
218  virtual Int_t Setup();
219  Int_t SetupCommon();
220  virtual void MakePlayer();
221  virtual void DeletePlayer();
222 
223  virtual Int_t Fork();
224  Int_t GetSessionStatus();
225  Bool_t IsIdle();
226  Bool_t UnlinkDataDir(const char *path);
227 
228  static TString fgLastMsg; // Message about status before exception
229  static Long64_t fgLastEntry; // Last entry before exception
230 
231 public:
232  TProofServ(Int_t *argc, char **argv, FILE *flog = 0);
233  virtual ~TProofServ();
234 
235  virtual Int_t CreateServer();
236 
237  TProof *GetProof() const { return fProof; }
238  const char *GetService() const { return fService; }
239  const char *GetConfDir() const { return fConfDir; }
240  const char *GetConfFile() const { return fConfFile; }
241  const char *GetUser() const { return fUser; }
242  const char *GetGroup() const { return fGroup; }
243  const char *GetWorkDir() const { return fWorkDir; }
244  const char *GetImage() const { return fImage; }
245  const char *GetSessionTag() const { return fSessionTag; }
246  const char *GetTopSessionTag() const { return fTopSessionTag; }
247  const char *GetSessionDir() const { return fSessionDir; }
248  const char *GetCacheDir() const { return fCacheDir; }
249  TPackMgr *GetPackMgr() const { return fPackMgr; }
250  const char *GetDataDir() const { return fDataDir; }
251  const char *GetDataDirOpts() const { return fDataDirOpts; }
252  Int_t GetProtocol() const { return fProtocol; }
253  const char *GetOrdinal() const { return fOrdinal; }
254  Int_t GetGroupId() const { return fGroupId; }
255  Int_t GetGroupSize() const { return fGroupSize; }
256  Int_t GetLogLevel() const { return fLogLevel; }
257  TSocket *GetSocket() const { return fSocket; }
258  Float_t GetRealTime() const { return fRealTime; }
259  Float_t GetCpuTime() const { return fCpuTime; }
260  Int_t GetQuerySeqNum() const { return fQuerySeqNum; }
261 
262  Int_t GetTotSessions() const { return fTotSessions; }
263  Int_t GetActSessions() const { return fActSessions; }
264  Float_t GetEffSessions() const { return fEffSessions; }
265 
266  void GetOptions(Int_t *argc, char **argv);
267  TList *GetEnabledPackages() const { return fPackMgr->GetListOfEnabled(); }
268 
269  static Long_t GetVirtMemMax();
270  static Long_t GetResMemMax();
271  static Float_t GetMemHWM();
272  static Float_t GetMemStop();
273 
274  Long64_t GetMsgSizeHWM() const { return fMsgSizeHWM; }
275 
276  const char *GetPrefix() const { return fPrefix; }
277 
278  void FlushLogFile();
279  void TruncateLogFile(); // Called also by TDSetProxy::Next()
280 
281  TProofLockPath *GetCacheLock() { return fCacheLock; } //cache dir locker; used by TProofPlayer
282 
283  virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange,
284  Bool_t resume = kFALSE);
285  virtual void HandleException(Int_t sig);
286  virtual Int_t HandleSocketInput(TMessage *mess, Bool_t all);
287  virtual void HandleSocketInput();
288  virtual void HandleUrgentData();
289  virtual void HandleSigPipe();
290  virtual void HandleTermination() { Terminate(0); }
291  void Interrupt() { fInterrupt = kTRUE; }
292  Bool_t IsEndMaster() const { return fEndMaster; }
293  Bool_t IsMaster() const { return fMasterServ; }
294  Bool_t IsParallel() const;
295  Bool_t IsTopMaster() const { return fOrdinal == "0"; }
296 
297  void Run(Bool_t retrn = kFALSE);
298 
299  void Print(Option_t *option="") const;
300 
301  void RestartComputeTime();
302 
303  TObject *Get(const char *namecycle);
304  TDSetElement *GetNextPacket(Long64_t totalEntries = -1);
305  virtual void ReleaseWorker(const char *) { }
306  void Reset(const char *dir);
307  Int_t ReceiveFile(const char *file, Bool_t bin, Long64_t size);
308  void SendAsynMessage(const char *msg, Bool_t lf = kTRUE);
309  virtual void SendLogFile(Int_t status = 0, Int_t start = -1, Int_t end = -1);
310  void SendStatistics();
311  void SendParallel(Bool_t async = kFALSE);
312 
313  Int_t UpdateSessionStatus(Int_t xst = -1);
314 
315  // Disable / Enable read timeout
316  virtual void DisableTimeout() { }
317  virtual void EnableTimeout() { }
318 
319  virtual void Terminate(Int_t status);
320 
321  // Log control
322  void LogToMaster(Bool_t on = kTRUE) { fSendLogToMaster = on; }
323 
324  static FILE *SetErrorHandlerFile(FILE *ferr);
325  static void ErrorHandler(Int_t level, Bool_t abort, const char *location,
326  const char *msg);
327 
328  static void ResolveKeywords(TString &fname, const char *path = 0);
329 
330  static void SetLastMsg(const char *lastmsg);
331  static void SetLastEntry(Long64_t lastentry);
332 
333  // To handle local data server related paths
334  static void FilterLocalroot(TString &path, const char *url = "root://dum/");
335  static void GetLocalServer(TString &dsrv);
336 
337  // To prepara ethe map of files to process
338  static TMap *GetDataSetNodeMap(TFileCollection *fc, TString &emsg);
339  static Int_t RegisterDataSets(TList *in, TList *out, TDataSetManager *dsm, TString &e);
340 
341  static Bool_t IsActive();
342  static TProofServ *This();
343 
344  ClassDef(TProofServ,0) //PROOF Server Application Interface
345 };
346 
347 R__EXTERN TProofServ *gProofServ;
348 
349 class TProofLockPath : public TNamed {
350 private:
351  Int_t fLockId; //file id of dir lock
352 
353 public:
354  TProofLockPath(const char *path) : TNamed(path,path), fLockId(-1) { }
355  ~TProofLockPath() { if (IsLocked()) Unlock(); }
356 
357  Int_t Lock();
358  Int_t Unlock();
359 
360  Bool_t IsLocked() const { return (fLockId > -1); }
361 };
362 
363 class TProofLockPathGuard {
364 private:
365  TProofLockPath *fLocker; //locker instance
366 
367 public:
368  TProofLockPathGuard(TProofLockPath *l) { fLocker = l; if (fLocker) fLocker->Lock(); }
369  ~TProofLockPathGuard() { if (fLocker) fLocker->Unlock(); }
370 };
371 
372 //----- Handles output from commands executed externally via a pipe. ---------//
373 //----- The output is redirected one level up (i.e., to master or client). ---//
374 //______________________________________________________________________________
375 class TProofServLogHandler : public TFileHandler {
376 private:
377  TSocket *fSocket; // Socket where to redirect the message
378  FILE *fFile; // File connected with the open pipe
379  TString fPfx; // Prefix to be prepended to messages
380 
381  static TString fgPfx; // Default prefix to be prepended to messages
382  static Int_t fgCmdRtn; // Return code of the command execution (available only
383  // after closing the pipe)
384 public:
385  enum EStatusBits { kFileIsPipe = BIT(23) };
386  TProofServLogHandler(const char *cmd, TSocket *s, const char *pfx = "");
387  TProofServLogHandler(FILE *f, TSocket *s, const char *pfx = "");
388  virtual ~TProofServLogHandler();
389 
390  Bool_t IsValid() { return ((fFile && fSocket) ? kTRUE : kFALSE); }
391 
392  Bool_t Notify();
393  Bool_t ReadNotify() { return Notify(); }
394 
395  static void SetDefaultPrefix(const char *pfx);
396  static Int_t GetCmdRtn();
397 };
398 
399 //--- Guard class: close pipe, deactivatethe related descriptor --------------//
400 //______________________________________________________________________________
401 class TProofServLogHandlerGuard {
402 
403 private:
404  TProofServLogHandler *fExecHandler;
405 
406 public:
407  TProofServLogHandlerGuard(const char *cmd, TSocket *s,
408  const char *pfx = "", Bool_t on = kTRUE);
409  TProofServLogHandlerGuard(FILE *f, TSocket *s,
410  const char *pfx = "", Bool_t on = kTRUE);
411  virtual ~TProofServLogHandlerGuard();
412 };
413 
414 //--- Special timer to control delayed shutdowns
415 //______________________________________________________________________________
416 class TShutdownTimer : public TTimer {
417 private:
418  TProofServ *fProofServ;
419  Int_t fTimeout;
420 
421 public:
422  TShutdownTimer(TProofServ *p, Int_t delay);
423 
424  Bool_t Notify();
425 };
426 
427 //--- Synchronous timer used to reap children processes change of state
428 //______________________________________________________________________________
429 class TReaperTimer : public TTimer {
430 private:
431  TList *fChildren; // List of children (forked) processes
432 
433 public:
434  TReaperTimer(Long_t frequency = 1000) : TTimer(frequency, kTRUE), fChildren(0) { }
435  virtual ~TReaperTimer();
436 
437  void AddPid(Int_t pid);
438  Bool_t Notify();
439 };
440 
441 //--- Special timer to terminate idle sessions
442 //______________________________________________________________________________
443 class TIdleTOTimer : public TTimer {
444 private:
445  TProofServ *fProofServ;
446 
447 public:
448  TIdleTOTimer(TProofServ *p, Int_t delay) : TTimer(delay, kTRUE), fProofServ(p) { }
449 
450  Bool_t Notify();
451 };
452 //______________________________________________________________________________
453 class TIdleTOTimerGuard {
454 
455 private:
456  TIdleTOTimer *fIdleTOTimer;
457 
458 public:
459  TIdleTOTimerGuard(TIdleTOTimer *t) : fIdleTOTimer(t) { if (fIdleTOTimer) fIdleTOTimer->Stop(); }
460  virtual ~TIdleTOTimerGuard() { if (fIdleTOTimer) fIdleTOTimer->Start(-1, kTRUE); }
461 };
462 
463 //______________________________________________________________________________
464 inline Int_t TProofServ::GetCompressionLevel() const
465 {
466  return (fCompressMsg < 0) ? -1 : fCompressMsg % 100;
467 }
468 
469 #endif