13 #ifndef ROOT_TProofServ
14 #define ROOT_TProofServ
37 class TDataSetManager;
38 class TDataSetManagerFile;
41 class TFileCollection;
52 class TQueryResultManager;
57 class TVirtualProofPlayer;
// Function-pointer type: the pointed-to function returns Int_t and takes
// (TSocket *, Bool_t, Int_t, TString &, TString &, TString &).
// NOTE(review): presumably the signature of a legacy authentication hook used
// by OldAuthSetup() -- confirm against the code that resolves the symbol.
62 typedef Int_t (*OldProofServAuthSetup_t)(TSocket *, Bool_t, Int_t,
63 TString &, TString &, TString &);
// TProofServ: application class deriving publicly from TApplication.
// NOTE(review): access specifiers are not visible in this excerpt, so the
// visibility of the members that follow cannot be read off from here.
66 class TProofServ :
public TApplication {
// TProofServLite and TXProofServ get access to the non-public members.
68 friend class TProofServLite;
69 friend class TXProofServ;
// kHighMemory is defined as BIT(17).
72 enum EStatusBits { kHighMemory = BIT(17) };
// Result codes; EQueryAction is the return type of GetWorkers().
73 enum EQueryAction { kQueryOK, kQueryModify, kQueryStop, kQueryEnqueued };
// Instance and static state of TProofServ as visible in this excerpt. A note is
// stated as fact only when an accessor shown elsewhere in the class confirms
// it; everything else is inferred from the name and marked accordingly.

// Returned by GetTopSessionTag().
84 TString fTopSessionTag;
// fCacheLock is returned by GetCacheLock(). fQueryLock has the same type;
// presumably it protects query data -- TODO confirm.
94 TProofLockPath *fCacheLock;
95 TProofLockPath *fQueryLock;
// NOTE(review): presumably created/destroyed via MakePlayer()/DeletePlayer().
99 TVirtualProofPlayer *fPlayer;
// NOTE(review): presumably the size limit consulted by TruncateLogFile().
102 Long64_t fLogFileMaxSize;
109 Int_t fGroupPriority;
117 TStopwatch fSaveOutput;
// Returned by GetEffSessions().
122 Float_t fEffSessions;
124 TFileHandler *fInputHandler;
126 TQueryResultManager *fQMgr;
// NOTE(review): presumably the list manipulated by WaitingQueries(),
// QueueQuery(), NextQuery() and CleanupWaitingQueries(), with fQMtx as its
// lock -- confirm in the implementation file.
128 TList *fWaitingQueries;
130 std::recursive_mutex fQMtx;
// Timer objects; their classes are declared further down in this header.
138 TShutdownTimer *fShutdownTimer;
139 TReaperTimer *fReaperTimer;
140 TIdleTOTimer *fIdleTOTimer;
144 TDataSetManager* fDataSetManager;
145 TDataSetManagerFile *fDataSetStgRepo;
// Written by LogToMaster().
147 Bool_t fSendLogToMaster;
// NOTE(review): merging-related state, presumably used by AcceptResults() /
// HandleSubmerger() -- confirm.
149 TServerSocket *fMergingSocket;
150 TMonitor *fMergingMonitor;
151 Int_t fMergedWorkers;
155 Long64_t fMaxBoxSize;
156 Long64_t fHWMBoxSize;
// Static memory limits/thresholds. Static accessors with matching names
// (GetVirtMemMax(), GetResMemMax(), GetMemHWM(), GetMemStop()) are declared in
// this class; their bodies are not visible here.
159 static Long_t fgVirtMemMax;
160 static Long_t fgResMemMax;
162 static Float_t fgMemHWM;
163 static Float_t fgMemStop;
// Returned by GetMsgSizeHWM().
166 Long64_t fMsgSizeHWM;
// NOTE(review): presumably the stream installed by SetErrorHandlerFile() and
// used by ErrorHandler() -- confirm.
168 static FILE *fgErrorHandlerFile;
169 static Int_t fgRecursive;
// Static syslog-related settings (meaning inferred from names only).
172 static Int_t fgLogToSysLog;
173 static TString fgSysLogService;
174 static TString fgSysLogEntity;
// Method declarations only; bodies are defined elsewhere unless noted.
// Descriptions beyond the signatures are inferred from names -- verify against
// the implementation file before relying on them.

// Defined inline at the end of this header: yields -1 when fCompressMsg is
// negative, otherwise fCompressMsg % 100.
176 Int_t GetCompressionLevel()
const;
// Both arguments are optional: dir defaults to 0 and mode defaults to "w".
178 void RedirectOutput(
const char *dir = 0,
const char *mode =
"w");
180 Int_t UnloadPackage(
const char *package);
181 Int_t UnloadPackages();
182 Int_t OldAuthSetup(TString &wconf);
// Query bookkeeping helpers.
186 TProofQueryResult *MakeQueryResult(Long64_t nentries,
const char *opt,
187 TList *inl, Long64_t first, TDSet *dset,
188 const char *selec, TObject *elist);
189 void SetQueryRunning(TProofQueryResult *pq);
// outlist and pq are optional (default 0).
192 Int_t SendResults(TSocket *sock, TList *outlist = 0, TQueryResult *pq = 0);
193 Bool_t AcceptResults(Int_t connections, TVirtualProofPlayer *mergerPlayer);
// st defaults to kTRUE.
196 void SetIdle(Bool_t st = kTRUE);
// Waiting-query queue operations.
198 Int_t WaitingQueries();
199 Int_t QueueQuery(TProofQueryResult *pq);
200 TProofQueryResult *NextQuery();
// del defaults to kTRUE, qls to 0.
201 Int_t CleanupWaitingQueries(Bool_t del = kTRUE, TList *qls = 0);
// Virtual per-message handlers. Each receives the incoming TMessage; several
// also take an optional TString *slb (default 0).
// NOTE(review): the role of slb is not determinable from this header.
204 virtual void HandleArchive(TMessage *mess, TString *slb = 0);
205 virtual Int_t HandleCache(TMessage *mess, TString *slb = 0);
206 virtual void HandleCheckFile(TMessage *mess, TString *slb = 0);
207 virtual Int_t HandleDataSets(TMessage *mess, TString *slb = 0);
208 virtual void HandleSubmerger(TMessage *mess);
209 virtual void HandleFork(TMessage *mess);
210 virtual Int_t HandleLibIncPath(TMessage *mess);
211 virtual void HandleProcess(TMessage *mess, TString *slb = 0);
212 virtual void HandleQueryList(TMessage *mess);
213 virtual void HandleRemove(TMessage *mess, TString *slb = 0);
214 virtual void HandleRetrieve(TMessage *mess, TString *slb = 0);
215 virtual Int_t HandleWorkerLists(TMessage *mess);
// Virtual hooks that derived classes may override.
217 virtual void ProcessNext(TString *slb = 0);
218 virtual Int_t Setup();
220 virtual void MakePlayer();
221 virtual void DeletePlayer();
223 virtual Int_t Fork();
224 Int_t GetSessionStatus();
226 Bool_t UnlinkDataDir(
const char *path);
// Static state; static setters SetLastMsg() / SetLastEntry() are declared in
// this class (bodies not visible here).
228 static TString fgLastMsg;
229 static Long64_t fgLastEntry;
// Constructor: takes pointers to the argument count and argument vector plus an
// optional FILE* (flog, default 0).
// NOTE(review): flog is presumably the log stream -- confirm in the definition.
232 TProofServ(Int_t *argc,
char **argv, FILE *flog = 0);
// Virtual destructor.
233 virtual ~TProofServ();
// Virtual; returns an Int_t status. Defined elsewhere.
235 virtual Int_t CreateServer();
// Inline const accessors. Each one returns the named data member unchanged.
// Most of the members returned here are declared outside this excerpt, so
// their exact types are not visible.
// NOTE(review): several accessors return `const char *` directly from a member;
// fTopSessionTag is a TString, so that accessor relies on an implicit
// conversion. If the other members are also string objects, the returned
// pointer is presumably only valid while the member is not modified -- confirm.
237 TProof *GetProof()
const {
return fProof; }
238 const char *GetService()
const {
return fService; }
239 const char *GetConfDir()
const {
return fConfDir; }
240 const char *GetConfFile()
const {
return fConfFile; }
241 const char *GetUser()
const {
return fUser; }
242 const char *GetGroup()
const {
return fGroup; }
243 const char *GetWorkDir()
const {
return fWorkDir; }
244 const char *GetImage()
const {
return fImage; }
245 const char *GetSessionTag()
const {
return fSessionTag; }
246 const char *GetTopSessionTag()
const {
return fTopSessionTag; }
247 const char *GetSessionDir()
const {
return fSessionDir; }
248 const char *GetCacheDir()
const {
return fCacheDir; }
// Returns the package manager pointer itself (non-const pointee).
249 TPackMgr *GetPackMgr()
const {
return fPackMgr; }
250 const char *GetDataDir()
const {
return fDataDir; }
251 const char *GetDataDirOpts()
const {
return fDataDirOpts; }
252 Int_t GetProtocol()
const {
return fProtocol; }
253 const char *GetOrdinal()
const {
return fOrdinal; }
254 Int_t GetGroupId()
const {
return fGroupId; }
255 Int_t GetGroupSize()
const {
return fGroupSize; }
256 Int_t GetLogLevel()
const {
return fLogLevel; }
257 TSocket *GetSocket()
const {
return fSocket; }
// Timing values are returned as Float_t.
258 Float_t GetRealTime()
const {
return fRealTime; }
259 Float_t GetCpuTime()
const {
return fCpuTime; }
260 Int_t GetQuerySeqNum()
const {
return fQuerySeqNum; }
// Session counters.
262 Int_t GetTotSessions()
const {
return fTotSessions; }
263 Int_t GetActSessions()
const {
return fActSessions; }
264 Float_t GetEffSessions()
const {
return fEffSessions; }
// Takes the argument count by pointer and the argument vector; defined
// elsewhere.
266 void GetOptions(Int_t *argc,
char **argv);
// Forwards to fPackMgr->GetListOfEnabled().
// NOTE(review): fPackMgr is dereferenced without a null check here; calling
// this before the package manager exists would dereference a null pointer if
// fPackMgr can be null at that point -- confirm where fPackMgr is initialized.
267 TList *GetEnabledPackages()
const {
return fPackMgr->GetListOfEnabled(); }
// Static accessors; bodies are not visible here. Presumably they return the
// static members fgVirtMemMax, fgResMemMax, fgMemHWM and fgMemStop -- confirm.
269 static Long_t GetVirtMemMax();
270 static Long_t GetResMemMax();
271 static Float_t GetMemHWM();
272 static Float_t GetMemStop();
// Returns fMsgSizeHWM.
274 Long64_t GetMsgSizeHWM()
const {
return fMsgSizeHWM; }
// Returns fPrefix (declared outside this excerpt).
276 const char *GetPrefix()
const {
return fPrefix; }
// Operations, event handlers and state predicates. Unless a body is shown
// inline, the method is defined elsewhere and the comments only restate what
// the signature shows.
279 void TruncateLogFile();
// Returns fCacheLock (non-const accessor).
281 TProofLockPath *GetCacheLock() {
return fCacheLock; }
// Virtual; fills/uses `workers`, may write `prioritychange` (passed by
// non-const reference) and returns an EQueryAction code. resume defaults to
// kFALSE.
283 virtual EQueryAction GetWorkers(TList *workers, Int_t &prioritychange,
284 Bool_t resume = kFALSE);
// Virtual event handlers.
285 virtual void HandleException(Int_t sig);
// Two overloads: one processes a given message, the other takes no arguments.
286 virtual Int_t HandleSocketInput(TMessage *mess, Bool_t all);
287 virtual void HandleSocketInput();
288 virtual void HandleUrgentData();
289 virtual void HandleSigPipe();
// Default implementation calls Terminate(0).
290 virtual void HandleTermination() { Terminate(0); }
// Sets fInterrupt to kTRUE.
291 void Interrupt() { fInterrupt = kTRUE; }
// Predicates returning the corresponding flag members.
292 Bool_t IsEndMaster()
const {
return fEndMaster; }
293 Bool_t IsMaster()
const {
return fMasterServ; }
294 Bool_t IsParallel()
const;
// True when fOrdinal compares equal to "0".
// NOTE(review): this is a value comparison only if fOrdinal is a string class
// with operator== (its declaration is outside this excerpt) -- confirm.
295 Bool_t IsTopMaster()
const {
return fOrdinal ==
"0"; }
// retrn defaults to kFALSE.
297 void Run(Bool_t retrn = kFALSE);
// option defaults to the empty string.
299 void Print(Option_t *option=
"")
const;
301 void RestartComputeTime();
303 TObject *Get(
const char *namecycle);
// totalEntries defaults to -1.
304 TDSetElement *GetNextPacket(Long64_t totalEntries = -1);
// Virtual no-op by default; the argument is unnamed and ignored.
305 virtual void ReleaseWorker(
const char *) { }
306 void Reset(
const char *dir);
307 Int_t ReceiveFile(
const char *file, Bool_t bin, Long64_t size);
// lf defaults to kTRUE.
308 void SendAsynMessage(
const char *msg, Bool_t lf = kTRUE);
// Defaults: status = 0, start = -1, end = -1.
309 virtual void SendLogFile(Int_t status = 0, Int_t start = -1, Int_t end = -1);
310 void SendStatistics();
// async defaults to kFALSE.
311 void SendParallel(Bool_t async = kFALSE);
// xst defaults to -1.
313 Int_t UpdateSessionStatus(Int_t xst = -1);
// Virtual no-ops in this class; derived classes may override.
316 virtual void DisableTimeout() { }
317 virtual void EnableTimeout() { }
319 virtual void Terminate(Int_t status);
// Stores `on` (default kTRUE) in fSendLogToMaster.
322 void LogToMaster(Bool_t on = kTRUE) { fSendLogToMaster = on; }
// Static utilities; all bodies are defined elsewhere.
// Takes a FILE* and returns a FILE*.
// NOTE(review): presumably returns the previously installed stream -- confirm.
324 static FILE *SetErrorHandlerFile(FILE *ferr);
// NOTE(review): this declaration is truncated in the visible text -- the
// parameter list is not closed before the next declaration starts. The missing
// tail must be restored from the original header.
325 static void ErrorHandler(Int_t level, Bool_t abort,
const char *location,
// fname is passed by non-const reference; path is optional (default 0).
328 static void ResolveKeywords(TString &fname,
const char *path = 0);
// Static setters; presumably write fgLastMsg / fgLastEntry -- confirm.
330 static void SetLastMsg(
const char *lastmsg);
331 static void SetLastEntry(Long64_t lastentry);
// path is passed by non-const reference; url defaults to "root://dum/".
334 static void FilterLocalroot(TString &path,
const char *url =
"root://dum/");
// dsrv is an output/in-out string passed by reference.
335 static void GetLocalServer(TString &dsrv);
// emsg is passed by non-const reference (presumably receives an error text).
338 static TMap *GetDataSetNodeMap(TFileCollection *fc, TString &emsg);
339 static Int_t RegisterDataSets(TList *in, TList *out, TDataSetManager *dsm, TString &e);
341 static Bool_t IsActive();
// Returns a TProofServ pointer (presumably the current instance -- confirm).
342 static TProofServ *This();
// ROOT ClassDef macro with class version argument 0.
344 ClassDef(TProofServ,0)
// Global TProofServ pointer, declared via the R__EXTERN macro (definition lives
// elsewhere). NOTE(review): presumably the process-wide server instance.
347 R__EXTERN TProofServ *gProofServ;
// TProofLockPath: TNamed-derived object representing a lockable path.
// Lock() and Unlock() are used below but their declarations are not visible in
// this excerpt.
349 class TProofLockPath :
public TNamed {
// Passes `path` for both TNamed constructor arguments and starts with
// fLockId = -1, i.e. in the state IsLocked() reports as not locked.
354 TProofLockPath(
const char *path) : TNamed(path,path), fLockId(-1) { }
// Releases the lock on destruction if it is still held.
355 ~TProofLockPath() {
if (IsLocked()) Unlock(); }
// Locked means fLockId is greater than -1.
360 Bool_t IsLocked()
const {
return (fLockId > -1); }
// TProofLockPathGuard: scope guard around a TProofLockPath pointer. The
// constructor calls Lock() and the destructor calls Unlock(); a null pointer is
// tolerated and makes both a no-op.
// NOTE(review): the constructor is not `explicit` and no copy operations are
// declared in the visible lines; a copied guard would call Unlock() a second
// time on the same locker. Any result of Lock() is also not checked here.
363 class TProofLockPathGuard {
// The guarded locker; not owned (never deleted in the visible code).
365 TProofLockPath *fLocker;
368 TProofLockPathGuard(TProofLockPath *l) { fLocker = l;
if (fLocker) fLocker->Lock(); }
369 ~TProofLockPathGuard() {
if (fLocker) fLocker->Unlock(); }
// TProofServLogHandler: TFileHandler-derived handler constructed either from a
// command string or from an existing FILE*, together with a TSocket and an
// optional prefix. The fFile and fSocket members used by IsValid() are declared
// outside this excerpt.
375 class TProofServLogHandler :
public TFileHandler {
// Static state. Static functions SetDefaultPrefix() and GetCmdRtn() are
// declared below; presumably they write fgPfx and read fgCmdRtn -- confirm.
381 static TString fgPfx;
382 static Int_t fgCmdRtn;
// kFileIsPipe is defined as BIT(23).
385 enum EStatusBits { kFileIsPipe = BIT(23) };
// Construct from a command string; pfx defaults to "".
386 TProofServLogHandler(
const char *cmd, TSocket *s,
const char *pfx =
"");
// Construct from an already-open FILE*; pfx defaults to "".
387 TProofServLogHandler(FILE *f, TSocket *s,
const char *pfx =
"");
388 virtual ~TProofServLogHandler();
// kTRUE only when both fFile and fSocket are non-null.
390 Bool_t IsValid() {
return ((fFile && fSocket) ? kTRUE : kFALSE); }
// Read notifications are forwarded to Notify().
393 Bool_t ReadNotify() {
return Notify(); }
395 static void SetDefaultPrefix(
const char *pfx);
396 static Int_t GetCmdRtn();
// TProofServLogHandlerGuard: holds a TProofServLogHandler pointer. Its two
// constructors mirror the handler's (command string or FILE*, plus socket and
// prefix) with an extra `on` flag that defaults to kTRUE. Bodies are defined
// elsewhere.
// NOTE(review): presumably creates the handler on construction and removes it
// in the destructor, with `on` enabling/disabling that -- confirm.
401 class TProofServLogHandlerGuard {
404 TProofServLogHandler *fExecHandler;
407 TProofServLogHandlerGuard(
const char *cmd, TSocket *s,
408 const char *pfx =
"", Bool_t on = kTRUE);
409 TProofServLogHandlerGuard(FILE *f, TSocket *s,
410 const char *pfx =
"", Bool_t on = kTRUE);
411 virtual ~TProofServLogHandlerGuard();
// TShutdownTimer: TTimer-derived timer that keeps a pointer to a TProofServ.
// The constructor takes that pointer and an Int_t delay; its body is defined
// elsewhere.
416 class TShutdownTimer :
public TTimer {
418 TProofServ *fProofServ;
422 TShutdownTimer(TProofServ *p, Int_t delay);
// TReaperTimer: TTimer-derived timer that tracks process ids added through
// AddPid(). The fChildren member is declared outside this excerpt.
429 class TReaperTimer :
public TTimer {
// frequency defaults to 1000 and is passed to TTimer together with kTRUE;
// fChildren starts as 0 (null).
434 TReaperTimer(Long_t frequency = 1000) : TTimer(frequency, kTRUE), fChildren(0) { }
435 virtual ~TReaperTimer();
// Registers a pid; body defined elsewhere.
437 void AddPid(Int_t pid);
// TIdleTOTimer: TTimer-derived timer that keeps a pointer to a TProofServ.
443 class TIdleTOTimer :
public TTimer {
445 TProofServ *fProofServ;
// Passes (delay, kTRUE) to the TTimer base and stores p in fProofServ.
448 TIdleTOTimer(TProofServ *p, Int_t delay) : TTimer(delay, kTRUE), fProofServ(p) { }
// TIdleTOTimerGuard: scope guard that stops a TIdleTOTimer for the guard's
// lifetime and starts it again on destruction. A null timer pointer is
// tolerated and makes both steps a no-op.
453 class TIdleTOTimerGuard {
// The guarded timer; not owned (never deleted in the visible code).
456 TIdleTOTimer *fIdleTOTimer;
// Stores the timer and calls Stop() on it.
459 TIdleTOTimerGuard(TIdleTOTimer *t) : fIdleTOTimer(t) {
if (fIdleTOTimer) fIdleTOTimer->Stop(); }
// Calls Start(-1, kTRUE) on the timer.
// NOTE(review): -1 presumably means "reuse the timer's current interval" --
// confirm against the TTimer::Start documentation.
460 virtual ~TIdleTOTimerGuard() {
if (fIdleTOTimer) fIdleTOTimer->Start(-1, kTRUE); }
464 inline Int_t TProofServ::GetCompressionLevel()
const
466 return (fCompressMsg < 0) ? -1 : fCompressMsg % 100;