Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TProofCondor.cxx
Go to the documentation of this file.
1 // @(#)root/proof:$Id$
2 // Author: Fons Rademakers 13/02/97
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
//////////////////////////////////////////////////////////////////////////
//                                                                      //
// TProofCondor                                                         //
//                                                                      //
// PROOF master variant whose worker nodes are obtained through Condor. //
// It claims Condor nodes (COD claims), starts the slave servers on     //
// them and, like TProof, keeps track of how many slaves are running    //
// and of their status, broadcasts messages to all slaves, collects     //
// results, etc.                                                        //
//                                                                      //
//////////////////////////////////////////////////////////////////////////
22 
#include "TProofCondor.h"

#include "TCondor.h"
#include "TList.h"
#include "TMap.h"
#include "TMessage.h"
#include "TMonitor.h"
#include "TProofNodeInfo.h"
#include "TProofResourcesStatic.h"
#include "TProofServ.h"
#include "TSlave.h"
#include "TSocket.h"
#include "TString.h"
#include "TTimer.h"

#include <memory>
37 
38 ClassImp(TProofCondor);
39 
40 ////////////////////////////////////////////////////////////////////////////////
41 /// Start proof using condor
42 
43 TProofCondor::TProofCondor(const char *masterurl, const char *conffile,
44  const char *confdir, Int_t loglevel,
45  const char *, TProofMgr *mgr)
46  : fCondor(0), fTimer(0)
47 {
48  // Default initializations
49  InitMembers();
50 
51  // This may be needed during init
52  fManager = mgr;
53 
54  fUrl = TUrl(masterurl);
55 
56  if (!conffile || !conffile[0]) {
57  conffile = kPROOF_ConfFile;
58  } else if (!strncasecmp(conffile, "condor:", 7)) {
59  conffile+=7;
60  }
61 
62  if (!confdir || !confdir[0]) {
63  confdir = kPROOF_ConfDir;
64  }
65 
66  Init(masterurl, conffile, confdir, loglevel);
67 }
68 
69 ////////////////////////////////////////////////////////////////////////////////
70 /// Clean up Condor PROOF environment.
71 
72 TProofCondor::~TProofCondor()
73 {
74  SafeDelete(fCondor);
75  SafeDelete(fTimer);
76 }
77 
78 ////////////////////////////////////////////////////////////////////////////////
79 /// Setup Condor workers using dynamic information
80 
81 Bool_t TProofCondor::StartSlaves(Bool_t)
82 {
83  fCondor = new TCondor;
84  TString jobad = GetJobAd();
85 
86  fImage = fCondor->GetImage(gSystem->HostName());
87  if (fImage.Length() == 0) {
88  Error("StartSlaves", "Empty Condor image found for system %s",
89  gSystem->HostName());
90  return kFALSE;
91  }
92 
93  TList claims;
94  if (fConfFile.IsNull()) {
95  // startup all slaves if no config file given
96  TList *condorclaims = fCondor->Claim(9999, jobad);
97  TIter nextclaim(condorclaims);
98  while (TObject *o = nextclaim()) claims.Add(o);
99  } else {
100  // parse config file
101  TProofResourcesStatic *resources = new TProofResourcesStatic(fConfDir, fConfFile);
102  fConfFile = resources->GetFileName(); // Update the global file name (with path)
103  PDB(kGlobal,1) Info("StartSlaves", "using PROOF config file: %s", fConfFile.Data());
104 
105  // Get all workers
106  TList *workerList = resources->GetWorkers();
107  if (workerList->GetSize() == 0) {
108  Error("StartSlaves", "Found no condorworkers in %s", fConfFile.Data());
109  return kFALSE;
110  }
111 
112  // check for valid slave lines and claim condor nodes
113  Int_t ord = 0;
114 
115  // Loop over all workers and start them
116  TListIter next(workerList);
117  TObject *to;
118  TProofNodeInfo *worker;
119  int nSlavesDone = 0;
120  while ((to = next())) {
121  // Get the next worker from the list
122  worker = (TProofNodeInfo *)to;
123 
124  // Read back worker node info
125  const Char_t *image = worker->GetImage().Data();
126  const Char_t *workdir = worker->GetWorkDir().Data();
127  Int_t perfidx = worker->GetPerfIndex();
128 
129  gSystem->Sleep(10 /* ms */);
130  TCondorSlave* csl = fCondor->Claim(worker->GetNodeName().Data(), jobad);
131  if (csl) {
132  csl->fPerfIdx = perfidx;
133  csl->fImage = image;
134  csl->fWorkDir = gSystem->ExpandPathName(workdir);
135  TString fullord = TString(gProofServ->GetOrdinal()) + "." + ((Long_t) ord);
136  csl->fOrdinal = fullord.Data();
137  claims.Add(csl);
138  ord++;
139  }
140 
141  // Notify claim creation
142  nSlavesDone++;
143  TMessage m(kPROOF_SERVERSTARTED);
144  m << TString("Creating COD Claim") << workerList->GetSize()
145  << nSlavesDone << (csl != 0);
146  gProofServ->GetSocket()->Send(m);
147 
148  } // end while (worker loop)
149 
150  // Cleanup
151  delete resources;
152  resources = 0;
153  } // end else (parse config file)
154 
155  Long_t delay = 500; // timer delay 0.5s
156  Int_t ntries = 20; // allow 20 tries (must be > 1 for algorithm to work)
157  Int_t trial = 1;
158  Int_t idx = 0;
159 
160  int nClaims = claims.GetSize();
161  int nClaimsDone = 0;
162  while (claims.GetSize() > 0) {
163  TCondorSlave* c = 0;
164 
165  // Get Condor Slave
166  if (trial == 1) {
167  c = dynamic_cast<TCondorSlave*>(claims.At(idx));
168  } else {
169  TPair *p = dynamic_cast<TPair*>(claims.At(idx));
170  if (p) {
171  TTimer *t = dynamic_cast<TTimer*>(p->Value());
172  if (t) {
173  // wait remaining time
174  Long64_t wait = t->GetAbsTime()-gSystem->Now();
175  if (wait > 0) gSystem->Sleep((UInt_t)wait);
176  c = dynamic_cast<TCondorSlave*>(p->Key());
177  }
178  }
179  }
180 
181  // create slave
182  TSlave *slave = 0;
183  if (c) slave = CreateSlave(Form("%s:%d", c->fHostname.Data(), c->fPort), c->fOrdinal,
184  c->fPerfIdx, c->fImage, c->fWorkDir);
185 
186  // add slave to appropriate list
187  if (trial < ntries) {
188  if (slave && slave->IsValid()) {
189  fSlaves->Add(slave);
190  if (trial == 1) {
191  claims.Remove(c);
192  } else {
193  TPair *p = dynamic_cast<TPair*>(claims.Remove(c));
194  if (p) {
195  TTimer *xt = dynamic_cast<TTimer*>(p->Value());
196  if (xt) delete xt;
197  delete p;
198  }
199  }
200  nClaimsDone++;
201  TMessage m(kPROOF_SERVERSTARTED);
202  m << TString("Opening connections to workers") << nClaims
203  << nClaimsDone << kTRUE;
204  gProofServ->GetSocket()->Send(m);
205  } else if (slave) {
206  if (trial == 1) {
207  TTimer* timer = new TTimer(delay);
208  TPair *p = new TPair(c, timer);
209  claims.RemoveAt(idx);
210  claims.AddAt(p, idx);
211  } else {
212  TPair *p = dynamic_cast<TPair*>(claims.At(idx));
213  if (p && p->Value()) {
214  TTimer *xt = dynamic_cast<TTimer*>(p->Value());
215  if (xt) xt->Reset();
216  }
217  }
218  delete slave;
219  idx++;
220  } else {
221  Warning("StartSlaves", "could not create TSlave object!");
222  }
223  } else {
224  if (slave) {
225  fSlaves->Add(slave);
226  TPair *p = dynamic_cast<TPair*>(claims.Remove(c));
227  if (p && p->Value()) {
228  TTimer *xt = dynamic_cast<TTimer*>(p->Value());
229  delete xt;
230  }
231  if (p) delete p;
232 
233  nClaimsDone++;
234  TMessage m(kPROOF_SERVERSTARTED);
235  m << TString("Opening connections to workers") << nClaims
236  << nClaimsDone << slave->IsValid();
237  gProofServ->GetSocket()->Send(m);
238  } else {
239  Warning("StartSlaves", "could not create TSlave object!");
240  }
241  }
242 
243  if (idx>=claims.GetSize()) {
244  trial++;
245  idx = 0;
246  }
247  }
248 
249  // Here we finalize the server startup: in this way the bulk
250  // of remote operations are almost parallelized
251  TIter nxsl(fSlaves);
252  TSlave *sl = 0;
253  int nSlavesDone = 0, nSlavesTotal = fSlaves->GetSize();
254  while ((sl = (TSlave *) nxsl())) {
255 
256  // Finalize setup of the server
257  if (sl->IsValid()) {
258  sl->SetupServ(TSlave::kSlave, 0);
259  }
260 
261  if (sl->IsValid()) {
262  fAllMonitor->Add(sl->GetSocket());
263  } else {
264  fBadSlaves->Add(sl);
265  }
266 
267  // Notify end of startup operations
268  nSlavesDone++;
269  TMessage m(kPROOF_SERVERSTARTED);
270  Bool_t wrkvalid = sl->IsValid() ? kTRUE : kFALSE;
271  m << TString("Setting up worker servers") << nSlavesTotal
272  << nSlavesDone << wrkvalid;
273  gProofServ->GetSocket()->Send(m);
274  }
275 
276  return kTRUE;
277 }
278 
279 ////////////////////////////////////////////////////////////////////////////////
280 /// Suspend or resume PROOF via Condor.
281 
282 void TProofCondor::SetActive(Bool_t active)
283 {
284  if (fTimer == 0) {
285  fTimer = new TTimer();
286  }
287  if (active) {
288  PDB(kCondor,1) Info("SetActive","-- Condor Resume --");
289  fTimer->Stop();
290  if (fCondor->GetState() == TCondor::kSuspended)
291  fCondor->Resume();
292  } else {
293 #if 1
294  return; // don't suspend for the moment
295 #else
296  Int_t delay = 60000; // milli seconds
297  PDB(kCondor,1) Info("SetActive","-- Delayed Condor Suspend (%d msec / to %lld) --",
298  delay, delay + Long64_t(gSystem->Now()));
299  fTimer->Connect("Timeout()", "TCondor", fCondor, "Suspend()");
300  fTimer->Start(10000, kTRUE); // single shot
301 #endif
302  }
303 }
304 
305 ////////////////////////////////////////////////////////////////////////////////
306 /// Get job Ad
307 
308 TString TProofCondor::GetJobAd()
309 {
310  TString ad;
311 
312  ad = "JobUniverse = 5\n"; // vanilla
313  ad += Form("Cmd = \"%s/bin/proofd\"\n", GetConfDir());
314  ad += Form("Iwd = \"%s\"\n", gSystem->TempDirectory());
315  ad += "In = \"/dev/null\"\n";
316  ad += Form("Out = \"%s/proofd.out.$(Port)\"\n", gSystem->TempDirectory());
317  ad += Form("Err = \"%s/proofd.err.$(Port)\"\n", gSystem->TempDirectory());
318  ad += Form("Args = \"-f -p $(Port) -d %d %s\"\n", GetLogLevel(), GetConfDir());
319 
320  return ad;
321 }