// Invokes the ClassImp macro for TProofSuperMaster. The macro is defined
// outside this file; presumably it is ROOT's class-implementation hook --
// confirm against the ROOT headers.
39 ClassImp(TProofSuperMaster);
// Constructor: sets up a super-master PROOF session object.
//
// Parameters (as used by the visible statements):
//   masterurl - stored in fUrl (as a TUrl) and forwarded to Init().
//   conffile  - replaced by kPROOF_ConfFile when null or empty; a
//               case-insensitive "sm:" prefix is also tested for.
//   confdir   - replaced by kPROOF_ConfDir when null or empty.
//   loglevel  - forwarded to Init().
//   alias     - forwarded to Init().
//   mgr       - not referenced by any visible statement.
//
// NOTE(review): the numbers at the start of the lines look like listing line
// numbers left over from an extraction, and they skip; braces and some
// statements are not visible here -- confirm against the upstream file.
44 TProofSuperMaster::TProofSuperMaster(
const char *masterurl,
const char *conffile,
45 const char *confdir, Int_t loglevel,
46 const char *alias, TProofMgr *mgr)
// Remember the master URL in parsed form.
54 fUrl = TUrl(masterurl);
// Fall back to the default configuration file name when none is given.
56 if (!conffile || !conffile[0])
57 conffile = kPROOF_ConfFile;
// NOTE(review): the statement executed for the "sm:" prefix case is not
// visible in this view (presumably it skips the prefix) -- TODO confirm.
58 else if (!strncasecmp(conffile,
"sm:", 3))
// Fall back to the default configuration directory when none is given.
60 if (!confdir || !confdir[0])
61 confdir = kPROOF_ConfDir;
// Instance type: clear the client bit, set the master and top-master bits.
65 ResetBit(TProof::kIsClient);
66 SetBit(TProof::kIsMaster);
67 SetBit(TProof::kIsTopMaster);
// Run the common initialization with the (possibly defaulted) arguments.
69 Init(masterurl, conffile, confdir, loglevel, alias);
// Register this object in gROOT's list of proofs.
72 gROOT->GetListOfProofs()->Add(
this);
// Start the PROOF submasters reported by the proof server.
//
// The visible code works in three passes:
//   1. For every node returned by gProofServ->GetWorkers(), create a
//      submaster and notify the client ("Opening connections to submasters").
//   2. For every valid submaster, call SetupServ() with its configuration
//      file and notify the client ("Setting up submasters").
//   3. Walk the submasters that passed step 2 and report those that were
//      refused (status -99) or are no longer valid.
//
// The Bool_t parameter is unnamed and unused. The return statements are not
// visible in this view.
//
// NOTE(review): the numbers at the start of the lines look like listing line
// numbers left over from an extraction, and they skip; braces, `else` lines
// and several declarations (e.g. of `pc`, `ord`, `to`, `validPairs`, `slave`,
// `sc`) are not visible here -- confirm against the upstream file.
78 Bool_t TProofSuperMaster::StartSlaves(Bool_t)
// Heap-allocated list that receives the submaster node descriptions.
88 TList *submasterList =
new TList;
// Ask the proof server for the node list; kQueryStop is reported as an error.
90 if (gProofServ->GetWorkers(submasterList, pc) == TProofServ::kQueryStop) {
91 Error(
"StartSlaves",
"getting list of submaster nodes");
// Image name: taken from the proof server first.
94 fImage = gProofServ->GetImage();
// Alternative image name of the form "<host FQDN>:<work dir>".
// NOTE(review): the condition guarding this assignment is not visible.
96 fImage = Form(
"%s:%s", TUrl(gSystem->HostName()).GetHostFQDN(),
97 gProofServ->GetWorkDir());
// Progress counters sent to the client with each notification.
99 UInt_t nSubmasters = submasterList->GetSize();
100 UInt_t nSubmastersDone = 0;
// Submasters that complete the setup in pass 2.
102 TList validSubmasters;
// validPairs owns its (slave, config-file-name) TPair entries.
104 validPairs.SetOwner();
// Pass 1: loop over all submaster nodes and create the servers.
107 TListIter next(submasterList);
109 TProofNodeInfo *submaster;
110 while ((to = next())) {
112 submaster = (TProofNodeInfo *)to;
113 const Char_t *conffile = submaster->GetConfig();
114 const Char_t *image = submaster->GetImage();
115 const Char_t *msd = submaster->GetMsd();
116 Int_t sport = submaster->GetPort();
// Use the port of this master's URL instead.
// NOTE(review): the condition guarding this override is not visible.
118 sport = fUrl.GetPort();
// Full ordinal: "<ordinal of this server>.<ord>".
120 TString fullord = TString(gProofServ->GetOrdinal()) +
"." + ((Long_t) ord);
// URL of the submaster in "<node name>:<port>" form.
123 TUrl u(Form("%s:%d", submaster->GetNodeName().Data(), sport));
// When a group is defined, carry it in the password field of the URL.
125 if (strlen(gProofServ->GetGroup()) > 0) {
// Fill in the user from the proof server when the URL has none.
127 if (strlen(u.GetUser()) <= 0)
128 u.SetUser(gProofServ->GetUser());
129 u.SetPasswd(gProofServ->GetGroup());
// Create the submaster for this URL, ordinal, image and msd.
132 CreateSubmaster(u.GetUrl(), fullord, image, msd);
// Outcome flag reported to the client for this submaster.
136 Bool_t submasterOk = kTRUE;
// Valid submasters are queued, paired with their configuration file name,
// for the setup pass below.
138 if (slave->IsValid()) {
139 validPairs.Add(
new TPair(slave,
new TObjString(conffile)));
// Failure path: flag it and record the slave as bad.
// NOTE(review): the `else` introducing these two lines is not visible.
141 submasterOk = kFALSE;
142 fBadSlaves->Add(slave);
146 Info("StartSlaves","submaster on host %s created and"
147 " added to list", submaster->GetNodeName().Data());
// Notify the client: text, total count, done count and outcome flag.
151 TMessage m(kPROOF_SERVERSTARTED);
152 m << TString("Opening connections to submasters") << nSubmasters
153 << nSubmastersDone << submasterOk;
154 gProofServ->GetSocket()->Send(m);
// The node list is no longer needed.
161 SafeDelete(submasterList);
// Pass 2: finalize the server setup for the valid submasters.
167 TIter nxsc(&validPairs);
169 while ((sc = (TPair *) nxsc())) {
// Key is the slave, value is its configuration file name.
171 TSlave *sl = (TSlave *) sc->Key();
172 TObjString *cf = (TObjString *) sc->Value();
173 sl->SetupServ(TSlave::kMaster, cf->GetName());
176 Bool_t submasterOk = kTRUE;
// A protocol value of 1 is reported as incompatible.
180 if (fProtocol == 1) {
181 Error(
"StartSlaves",
"master and submaster protocols"
182 " not compatible (%d and %d)",
183 kPROOF_Protocol, fProtocol);
184 submasterOk = kFALSE;
// Good submaster: monitor its socket and keep it for pass 3.
// NOTE(review): the `else` lines around this section are not visible.
187 fAllMonitor->Add(sl->GetSocket());
188 validSubmasters.Add(sl);
191 submasterOk = kFALSE;
// Notify the client: text, total count, done count and outcome flag.
197 TMessage m(kPROOF_SERVERSTARTED);
198 m << TString(
"Setting up submasters") << nSubmasters
199 << nSubmastersDone << submasterOk;
200 gProofServ->GetSocket()->Send(m);
// Pass 3: report submasters that were refused or became invalid.
204 TIter nextSubmaster(&validSubmasters);
205 while (TSlave* sl = dynamic_cast<TSlave*>(nextSubmaster())) {
// Status -99 is reported as "not allowed to connect".
206 if (sl->GetStatus() == -99) {
207 Error(
"StartSlaves",
"not allowed to connect to PROOF master server");
212 if (!sl->IsValid()) {
213 Error(
"StartSlaves",
"failed to setup connection with PROOF master server");
// Process a data set by delegating to the player.
//
// Parameters: set, selector, option, nentries and first are forwarded
// unchanged to GetPlayer()->Process().
// Returns -1 when this session is not valid, otherwise whatever the player's
// Process() returns.
//
// NOTE(review): the numbers at the start of the lines look like listing line
// numbers left over from an extraction, and they skip; braces and the tail
// of the ExecPlugin() call are not visible here -- confirm against the
// upstream file.
229 Long64_t TProofSuperMaster::Process(TDSet *set,
const char *selector, Option_t *option,
230 Long64_t nentries, Long64_t first)
// Nothing can be processed on an invalid session.
232 if (!IsValid())
return -1;
// A player is required from here on.
234 R__ASSERT(GetPlayer());
// When a progress dialog plugin is available, execute it with 5 arguments;
// the visible ones are this object, the selector and the number of elements
// in the set.
236 if (GetProgressDialog())
237 GetProgressDialog()->ExecPlugin(5,
this, selector, set->GetListOfElements()->GetSize(),
// Hand the actual work to the player.
240 return GetPlayer()->Process(set, selector, option, nentries, first);
// Validate the elements of a TDSet by distributing them to the submasters.
//
// Visible steps:
//   1. Return immediately when dset already reports its elements as valid.
//   2. Group the active submasters by the name returned by GetMsd(), creating
//      for each new name a list of submasters and a sorted list of elements.
//   3. Put every not-yet-valid element into the element list whose name
//      matches the element's GetMsd(); report an error when none matches.
//   4. For each group, split the elements among its submasters and send
//      each submaster a TDSet in a kPROOF_VALIDATE_DSET message.
//
// NOTE(review): the numbers at the start of the lines look like listing line
// numbers left over from an extraction, and they skip; braces, `else` lines,
// null checks and the declarations of `msds`, `smholder`, `elemholder`,
// `smlist` and `nextSM` are not visible here -- confirm against the upstream
// file.
246 void TProofSuperMaster::ValidateDSet(TDSet *dset)
// Nothing to do when all elements are already valid.
248 if (dset->ElementsValid())
return;
// Clear the validity bookkeeping bits before re-validating.
251 dset->ResetBit(TDSet::kValidityChecked);
252 dset->ResetBit(TDSet::kSomeInvalid);
// elemholder owns the per-group element lists added to it below.
260 elemholder.SetOwner();
// Step 2: build the groups from the active submasters.
263 TIter nextSubmaster(GetListOfActiveSlaves());
264 while (TSlave *sl = dynamic_cast<TSlave*>(nextSubmaster())) {
// Look for an existing group with this submaster's msd name.
266 TPair *p =
dynamic_cast<TPair*
>(msds.FindObject(sl->GetMsd()));
// New group: a submaster list named after the msd ...
269 smlist->SetName(sl->GetMsd());
271 smholder.Add(smlist);
// ... and an element list, sorted in descending order, named "<msd>_elem".
272 TList *elemlist =
new TSortedList(kSortDescending);
273 elemlist->SetName(TString(sl->GetMsd())+
"_elem");
274 elemholder.Add(elemlist);
// The pair (submaster list, element list) represents the group in msds.
275 msds.Add(
new TPair(smlist, elemlist));
// Existing group: reuse its submaster list (the key of the pair).
277 smlist =
dynamic_cast<TList*
>(p->Key());
279 if (smlist) smlist->Add(sl);
// Step 3: assign every element that still needs validation to a group.
282 TIter nextElem(dset->GetListOfElements());
283 while (TDSetElement *elem = dynamic_cast<TDSetElement*>(nextElem())) {
// Elements already marked valid are skipped.
284 if (elem->GetValid())
continue;
285 TPair *p =
dynamic_cast<TPair*
>(msds.FindObject(elem->GetMsd()));
// The value of the pair is the element list of the group.
286 if (p && p->Value()) {
287 TList *xl =
dynamic_cast<TList*
>(p->Value());
288 if (xl) xl->Add(elem);
290 Error(
"ValidateDSet",
"no mass storage domain '%s' associated"
291 " with available submasters",
// Step 4: send each group's elements to its submasters.
301 while (TPair *msd = dynamic_cast<TPair*>(nextSM())) {
302 TList *sms =
dynamic_cast<TList*
>(msd->Key());
303 TList *setelements =
dynamic_cast<TList*
>(msd->Value());
// A size of -1 stands for a missing list; with nsms == -1 the loop below
// does not execute.
306 Int_t nsms = sms ? sms->GetSize() : -1;
307 Int_t nelements = setelements ? setelements->GetSize() : -1;
308 for (Int_t i=0; i<nsms; i++) {
// Temporary data set with the same type, object name and directory as dset.
310 TDSet set(dset->GetType(), dset->GetObjName(),
311 dset->GetDirectory());
// Submaster i gets the elements with index in
// [(i*nelements)/nsms, ((i+1)*nelements)/nsms).
312 for (Int_t j = (i*nelements)/nsms;
313 j < ((i+1)*nelements)/nsms;
315 TDSetElement *elem = setelements ?
316 dynamic_cast<TDSetElement*
>(setelements->At(j)) : (TDSetElement *)0;
// Copy the element's description into the temporary set.
318 set.Add(elem->GetFileName(), elem->GetObjName(),
319 elem->GetDirectory(), elem->GetFirst(),
320 elem->GetNum(), elem->GetMsd());
// Only non-empty sets are sent out.
324 if (set.GetListOfElements()->GetSize()>0) {
325 TMessage mesg(kPROOF_VALIDATE_DSET);
// The i-th entry of the group's submaster list is the receiver.
328 TSlave *sl =
dynamic_cast<TSlave*
>(sms->At(i));
332 "Sending TDSet with %d elements to worker %s"
333 " to be validated", set.GetListOfElements()->GetSize(),
335 sl->GetSocket()->Send(mesg);
338 Warning(
"ValidateDSet",
"not a TSlave object");
345 Info("ValidateDSet","Calling Collect");
// Create a player via TVirtualProofPlayer::Create(player, this, s) and
// install it with SetPlayer().
//
// Parameters: player - player name passed to Create(); s - socket passed to
// Create().
//
// NOTE(review): the numbers at the start of the lines look like listing line
// numbers left over from an extraction, and they skip; the braces, any
// default handling of `player` and the return statement are not visible
// here -- confirm against the upstream file.
355 TVirtualProofPlayer *TProofSuperMaster::MakePlayer(const
char *player, TSocket *s)
360 SetPlayer(TVirtualProofPlayer::Create(player,
this, s));