// Invokes the ClassImp macro for the TProofCondor class.
38 ClassImp(TProofCondor);
// Constructor.
//   masterurl - stored in fUrl (as a TUrl) and forwarded to Init().
//   conffile  - configuration file name; replaced by kPROOF_ConfFile when
//               null or empty.
//   confdir   - configuration directory; replaced by kPROOF_ConfDir when
//               null or empty.
//   loglevel  - forwarded unchanged to Init().
//   (5th arg) - unnamed const char*, not referenced in the visible lines.
//   mgr       - TProofMgr pointer; its use is not visible in this view.
// NOTE(review): several lines (braces, branch bodies) are not visible in this
// view; the comments below describe only the statements that are shown.
43 TProofCondor::TProofCondor(
const char *masterurl,
const char *conffile,
44 const char *confdir, Int_t loglevel,
45 const char *, TProofMgr *mgr)
// Both pointer members start out as null pointers.
46 : fCondor(0), fTimer(0)
54 fUrl = TUrl(masterurl);
// Fall back to the default configuration file name when none was supplied.
56 if (!conffile || !conffile[0]) {
57 conffile = kPROOF_ConfFile;
58 }
// Otherwise test for a case-insensitive "condor:" prefix (first 7 characters).
// The body of this branch is not visible here.
else if (!strncasecmp(conffile,
"condor:", 7)) {
// Fall back to the default configuration directory when none was supplied.
62 if (!confdir || !confdir[0]) {
63 confdir = kPROOF_ConfDir;
// Hand the (possibly defaulted) settings to Init().
66 Init(masterurl, conffile, confdir, loglevel);
// Destructor. The body is not visible in this view.
// NOTE(review): fCondor and fTimer are allocated with `new` elsewhere in this
// file; confirm that they are released here.
72 TProofCondor::~TProofCondor()
// Start the worker ("slave") servers using Condor claims.
//
// Visible phases:
//   1. Create the TCondor helper, build the job ad and look up the local image.
//   2. Collect claims: either a bulk Claim(9999, jobad) when fConfFile is
//      null, or one Claim() per worker listed by TProofResourcesStatic.
//   3. Loop over the collected claims, creating a TSlave for each via
//      CreateSlave(); entries may be wrapped in a TPair(claim, TTimer) that
//      is consulted before the next attempt.
//   4. For every slave in fSlaves call SetupServ() and register its socket
//      with fAllMonitor.
// Progress is reported through kPROOF_SERVERSTARTED messages sent on
// gProofServ's socket.
//
// The Bool_t parameter is unnamed and unused in the visible lines.
// NOTE(review): many lines (braces, declarations of `claims`, `to`, `ord`,
// `idx`, `trial`, `ntries`, `delay`, `slave`, `c`, `sl`, `nxsl`, return
// statements) are not visible in this view; comments describe only what is
// shown.
81 Bool_t TProofCondor::StartSlaves(Bool_t)
// Allocate the Condor interface object; stored in the fCondor member.
83 fCondor =
new TCondor;
84 TString jobad = GetJobAd();
// Ask Condor for the image associated with the local host name.
86 fImage = fCondor->GetImage(gSystem->HostName());
87 if (fImage.Length() == 0) {
88 Error(
"StartSlaves",
"Empty Condor image found for system %s",
// No configuration file: request claims in bulk and copy each returned object
// into `claims`.
94 if (fConfFile.IsNull()) {
96 TList *condorclaims = fCondor->Claim(9999, jobad);
97 TIter nextclaim(condorclaims);
98 while (TObject *o = nextclaim()) claims.Add(o);
// Configuration file given: read the worker list from it.
// NOTE(review): `resources` is allocated with `new`; no matching delete is
// visible in this view - confirm it is released.
101 TProofResourcesStatic *resources =
new TProofResourcesStatic(fConfDir, fConfFile);
// Replace fConfFile with the file name reported by the resources object.
102 fConfFile = resources->GetFileName();
103 PDB(kGlobal,1) Info("StartSlaves", "using PROOF config file: %s", fConfFile.Data());
106 TList *workerList = resources->GetWorkers();
107 if (workerList->GetSize() == 0) {
108 Error(
"StartSlaves",
"Found no condorworkers in %s", fConfFile.Data());
// Walk the configured workers, claiming each node by name.
116 TListIter next(workerList);
118 TProofNodeInfo *worker;
120 while ((to = next())) {
122 worker = (TProofNodeInfo *)to;
// NOTE(review): these pointers refer to the buffers of whatever GetImage() /
// GetWorkDir() return; they stay valid only if those return references rather
// than temporaries - confirm.
125 const Char_t *image = worker->GetImage().Data();
126 const Char_t *workdir = worker->GetWorkDir().Data();
127 Int_t perfidx = worker->GetPerfIndex();
130 TCondorSlave* csl = fCondor->Claim(worker->GetNodeName().Data(), jobad);
// NOTE(review): csl is dereferenced here while the message below encodes
// (csl != 0), so Claim() can apparently return null; a guard is not visible
// in this view - confirm one exists.
132 csl->fPerfIdx = perfidx;
// NOTE(review): if this ExpandPathName overload returns a heap-allocated
// char*, the buffer is not freed after being copied into fWorkDir - confirm.
134 csl->fWorkDir = gSystem->ExpandPathName(workdir);
// Ordinal is "<ordinal of gProofServ>.<ord>".
135 TString fullord = TString(gProofServ->GetOrdinal()) +
"." + ((Long_t) ord);
136 csl->fOrdinal = fullord.Data();
// Progress report: text, total number of workers, number done, and whether
// this claim succeeded.
143 TMessage m(kPROOF_SERVERSTARTED);
144 m << TString("Creating COD Claim") << workerList->GetSize()
145 << nSlavesDone << (csl != 0);
146 gProofServ->GetSocket()->Send(m);
// Remember the initial number of claims for the progress messages below.
160 int nClaims = claims.GetSize();
// Keep going until every claim has been removed from the list.
162 while (claims.GetSize() > 0) {
// The entry at idx is either a bare TCondorSlave or a TPair wrapping one;
// try the bare form first.
167 c =
dynamic_cast<TCondorSlave*
>(claims.At(idx));
169 TPair *p =
dynamic_cast<TPair*
>(claims.At(idx));
// NOTE(review): p and t come from dynamic_cast and are dereferenced; null
// checks are not visible in this view - confirm.
171 TTimer *t =
dynamic_cast<TTimer*
>(p->Value());
// Sleep for the time remaining until the timer's absolute time, if positive.
174 Long64_t wait = t->GetAbsTime()-gSystem->Now();
175 if (wait > 0) gSystem->Sleep((UInt_t)wait);
// The pair's key holds the claim itself.
176 c =
dynamic_cast<TCondorSlave*
>(p->Key());
// Create the slave for "<hostname>:<port>" using the claim's attributes.
183 if (c) slave = CreateSlave(Form(
"%s:%d", c->fHostname.Data(), c->fPort), c->fOrdinal,
184 c->fPerfIdx, c->fImage, c->fWorkDir);
// While attempts remain (trial < ntries) ...
187 if (trial < ntries) {
// ... a valid slave means this claim is finished: take it out of the list.
188 if (slave && slave->IsValid()) {
193 TPair *p =
dynamic_cast<TPair*
>(claims.Remove(c));
195 TTimer *xt =
dynamic_cast<TTimer*
>(p->Value());
201 TMessage m(kPROOF_SERVERSTARTED);
202 m << TString(
"Opening connections to workers") << nClaims
203 << nClaimsDone << kTRUE;
204 gProofServ->GetSocket()->Send(m);
// Wrap the claim in a TPair with a new TTimer(delay) and put the pair back
// at the same list position.
207 TTimer* timer =
new TTimer(delay);
208 TPair *p =
new TPair(c, timer);
209 claims.RemoveAt(idx);
210 claims.AddAt(p, idx);
// Entry is already a pair: fetch its existing timer (what is done with it is
// not visible here).
212 TPair *p =
dynamic_cast<TPair*
>(claims.At(idx));
213 if (p && p->Value()) {
214 TTimer *xt =
dynamic_cast<TTimer*
>(p->Value());
221 Warning(
"StartSlaves",
"could not create TSlave object!");
// Remove the claim from the list and fetch the timer attached to its pair,
// if any.
226 TPair *p =
dynamic_cast<TPair*
>(claims.Remove(c));
227 if (p && p->Value()) {
228 TTimer *xt =
dynamic_cast<TTimer*
>(p->Value());
// Progress report carrying the slave's validity flag.
234 TMessage m(kPROOF_SERVERSTARTED);
235 m << TString(
"Opening connections to workers") << nClaims
236 << nClaimsDone << slave->IsValid();
237 gProofServ->GetSocket()->Send(m);
239 Warning(
"StartSlaves",
"could not create TSlave object!");
// Index ran past the end of the (shrinking) list; the handling is not
// visible in this view.
243 if (idx>=claims.GetSize()) {
// Final phase: set up the server on every slave collected in fSlaves.
253 int nSlavesDone = 0, nSlavesTotal = fSlaves->GetSize();
254 while ((sl = (TSlave *) nxsl())) {
258 sl->SetupServ(TSlave::kSlave, 0);
// Register the slave's socket with the fAllMonitor member.
262 fAllMonitor->Add(sl->GetSocket());
// Progress report: text, total slaves, slaves done, validity of this one.
269 TMessage m(kPROOF_SERVERSTARTED);
270 Bool_t wrkvalid = sl->IsValid() ? kTRUE : kFALSE;
271 m << TString(
"Setting up worker servers") << nSlavesTotal
272 << nSlavesDone << wrkvalid;
273 gProofServ->GetSocket()->Send(m);
// Switch the Condor session between active and inactive.
//   active - requested state; the branch conditions that test it are not
//            visible in this view.
// Visible behaviour: a TTimer is allocated into fTimer; one path logs
// "Condor Resume" and checks whether fCondor is in the kSuspended state;
// the other path logs a delayed suspend and arms fTimer so that its
// Timeout() signal invokes Suspend() on fCondor.
282 void TProofCondor::SetActive(Bool_t active)
// NOTE(review): the guard around this allocation is not visible; confirm the
// timer is created only once.
285 fTimer =
new TTimer();
288 PDB(kCondor,1) Info("SetActive","-- Condor Resume --");
// The action taken when the state is kSuspended is not visible here.
290 if (fCondor->GetState() == TCondor::kSuspended)
// NOTE(review): the message reports `delay` (declaration not visible) but the
// timer below is started with the literal 10000 - confirm the two agree.
297 PDB(kCondor,1) Info("SetActive","-- Delayed Condor Suspend (%d msec / to %lld) --",
298 delay, delay + Long64_t(gSystem->Now()));
// Route the timer's Timeout() signal to the Suspend() slot of fCondor.
299 fTimer->Connect("Timeout()", "TCondor", fCondor, "Suspend()");
// Second argument kTRUE presumably selects single-shot mode - confirm against
// the TTimer::Start documentation.
300 fTimer->Start(10000, kTRUE);
// Build the text of the job ad used when claiming Condor resources.
// The ad is assembled in `ad` as newline-terminated "Name = value" entries:
//   JobUniverse, Cmd, Iwd, In, Out, Err and Args.
// Returns a TString; the declaration of `ad` and the return statement are not
// visible in this view.
308 TString TProofCondor::GetJobAd()
312 ad =
"JobUniverse = 5\n";
// Executable: <GetConfDir()>/bin/proofd.
313 ad += Form(
"Cmd = \"%s/bin/proofd\"\n", GetConfDir());
// Initial working directory: the system temporary directory.
314 ad += Form(
"Iwd = \"%s\"\n", gSystem->TempDirectory());
315 ad +=
"In = \"/dev/null\"\n";
// Output and error files go to the temporary directory; "$(Port)" is emitted
// literally into the ad text.
316 ad += Form(
"Out = \"%s/proofd.out.$(Port)\"\n", gSystem->TempDirectory());
317 ad += Form(
"Err = \"%s/proofd.err.$(Port)\"\n", gSystem->TempDirectory());
// Arguments: "-f -p $(Port) -d <GetLogLevel()> <GetConfDir()>".
318 ad += Form(
"Args = \"-f -p $(Port) -d %d %s\"\n", GetLogLevel(), GetConfDir());