// Factory function: allocates a new TXSlave from the given arguments and
// returns it through a TSlave pointer.
46 TSlave *GetTXSlave(
const char *url,
const char *ord, Int_t perf,
47 const char *image, TProof *proof, Int_t stype,
48 const char *workdir,
const char *msd, Int_t nwk)
// All nine arguments are forwarded, unchanged and in the same order, to the
// TXSlave constructor.
50 return ((TSlave *)(
new TXSlave(url, ord, perf, image,
51 proof, stype, workdir, msd, nwk)));
// Hands the address of GetTXSlave to TSlave via SetTXSlaveHook.
57 TSlave::SetTXSlaveHook(&GetTXSlave);
// File-local XSlaveInit instance.
// NOTE(review): presumably constructing it is what runs the hook
// registration above at load time - confirm against the XSlaveInit definition.
59 static XSlaveInit xslave_init;
// Forwards an error report to the global ::ErrorHandler, reporting the
// location as "TXSlave::<location>".
//   level    - passed through unchanged to ::ErrorHandler
//   location - name placed after the "TXSlave::" prefix
//   fmt, va  - format string and argument list, passed through unchanged
69 void TXSlave::DoError(
int level,
const char *location,
const char *fmt, va_list va)
const
71 ::ErrorHandler(level, Form(
"TXSlave::%s", location), fmt, va);
// Signal handler derived from TSignalHandler that keeps a TXSocket pointer.
77 class TXSlaveInterruptHandler :
public TSignalHandler {
// The constructor registers for kSigInterrupt (second base argument kFALSE)
// and stores the socket, which defaults to 0, in fSocket.
81 TXSlaveInterruptHandler(TXSocket *s = 0)
82 : TSignalHandler(kSigInterrupt, kFALSE), fSocket(s) { }
// Handler callback: logs that an interrupt signal is being processed and
// calls SetInterrupt() on the stored socket.
// NOTE(review): fSocket is dereferenced here although the constructor allows
// it to be 0 - confirm whether a guard exists on a line not shown.
89 Bool_t TXSlaveInterruptHandler::Notify()
91 Info(
"Notify",
"Processing interrupt signal ...");
95 fSocket->SetInterrupt();
// Constructor. Default-constructs the TSlave base, records the work
// directory and slave type, registers the TXSocketHandler with gSystem and
// sets the TXSocket location label.
103 TXSlave::TXSlave(
const char *url,
const char *ord, Int_t perf,
104 const char *image, TProof *proof, Int_t stype,
105 const char *workdir,
const char *msd, Int_t nwk) : TSlave()
108 fProofWorkDir = workdir;
// stype arrives as a plain Int_t and is cast to the ESlaveType enum.
113 fSlaveType = (ESlaveType)stype;
// Obtain the socket handler from TXSocketHandler and add it as a file handler.
120 TXSocketHandler *sh = TXSocketHandler::GetSocketHandler();
121 gSystem->AddFileHandler(sh);
// Location label is "master" when fProof reports IsMaster(), else "client".
123 TXSocket::SetLocation((fProof->IsMaster()) ?
"master" :
"client");
// Initialises the connection of this slave: fixes up the URL and port, builds
// the session alias (optionally carrying the config file and environment
// settings), opens the TXSocket and copies the resulting connection data
// into this object and into fProof.
//   host  - NOTE(review): its use is not visible in these lines; presumably
//           it seeds `url` - confirm.
//   stype - slave type, compared against kSlave / kMaster below
133 void TXSlave::Init(
const char *host, Int_t stype)
// Use the same protocol as the URL of the owning PROOF session.
140 url.SetProtocol(fProof->fUrl.GetProtocol());
// The port equals the one a bare TUrl("a") reports.
// NOTE(review): presumably this means no explicit port was given - confirm.
142 if (url.GetPort() == TUrl(
"a").GetPort()) {
// Look the port up through the system service name "proofd".
144 Int_t port = gSystem->GetServiceByName(
"proofd");
147 Info(
"Init",
"service 'proofd' not found by GetServiceByName"
148 ": using default IANA assigned tcp port 1093");
152 Info(
"Init",
"port from GetServiceByName: %d", port);
// Record host name (FQDN) and port; the password field of the URL is stored
// as the group.
158 fName = url.GetHostFQDN();
159 fPort = url.GetPort();
161 fGroup = url.GetPasswd();
// Non-empty, all-digit URL options select attach mode; the number is then
// used as psid, otherwise psid is kPROOF_Protocol.
166 TString opts(url.GetOptions());
167 Bool_t attach = (opts.Length() > 0 && opts.IsDigit()) ? kTRUE : kFALSE;
168 Int_t psid = (attach) ? opts.Atoi() : kPROOF_Protocol;
// The alias starts as the PROOF session title and is overwritten per case.
173 TString alias = fProof->GetTitle();
// Case: running on a master, target is a slave.
174 if (fProof->IsMaster() && stype == kSlave) {
178 alias.Form(
"session-%s|ord:%s", fProof->GetName(), fOrdinal.Data());
179 }
// Case: running on a master, target is a master. Two alias forms are visible,
// one of which adds "|plite:<fNWrks>"; the selecting condition is not shown.
else if (fProof->IsMaster() && stype == kMaster) {
184 alias.Form(
"session-%s|ord:%s|plite:%d", fProof->GetName(), fOrdinal.Data(), fNWrks);
187 alias.Form(
"session-%s|ord:%s", fProof->GetName(), fOrdinal.Data());
189 }
// Case: running on a client, target is a master. Mode is 'A' when attaching,
// 'M' otherwise.
else if (!fProof->IsMaster() && stype == kMaster) {
190 iam =
"Local Client";
191 mode = (attach) ?
'A' :
'M';
// Error message for a PROOF / slave-type combination that is not supported.
193 Error(
"Init",
"Impossible PROOF <-> SlaveType Configuration Requested");
// Append the config file to the alias when one is set and fNWrks <= 1.
198 if (fProof->fConfFile.Length() > 0 && fNWrks <= 1)
199 alias += Form(
"|cf:%s",fProof->fConfFile.Data());
// This condition holds when there is no manager or its remote protocol is
// above 1001; the lines below collect the environment settings to send.
203 if (!fProof->GetManager() ||
204 fProof->GetManager()->GetRemoteProtocol() > 1001) {
// If XrdSecPROTOCOL is set in the process environment, drop any existing
// TProof entry for it and re-add it with the current value.
207 if (gSystem->Getenv(
"XrdSecPROTOCOL")) {
208 TProof::DelEnvVar(
"XrdSecPROTOCOL");
209 TProof::AddEnvVar(
"XrdSecPROTOCOL", gSystem->Getenv(
"XrdSecPROTOCOL"));
// Walk the TProof environment list and accumulate "name=value" entries.
211 const TList *envs = TProof::GetEnvVars();
214 for (TObject *o = next(); o != 0; o = next()) {
215 TNamed *env =
dynamic_cast<TNamed*
>(o);
// NOTE(review): the statement guarded by this test is not visible -
// presumably it appends a separator between entries; confirm.
217 if (!envlist.IsNull())
219 envlist += Form(
"%s=%s", env->GetName(), env->GetTitle());
// With a manager and a non-null env list, log that the settings are NOT sent
// and report the remote protocol.
224 if (fProof->GetManager() && TProof::GetEnvVars())
225 Info(
"Init",
"** NOT ** sending user envs - RemoteProtocol : %d",
226 fProof->GetManager()->GetRemoteProtocol());
// Attach the collected settings to the alias.
230 if (!envlist.IsNull())
231 alias += Form(
"|envs:%s", envlist.Data());
// Open the connection; `this` is passed as the last constructor argument.
235 if (!(fSocket =
new TXSocket(url.GetUrl(kTRUE), mode, psid,
236 -1, alias, fProof->GetLogLevel(),
this))) {
238 Error(
"Init",
"while opening the connection to %s - exit", url.GetUrl(kTRUE));
// The socket object exists but reports itself as not valid.
243 if (!(fSocket->IsValid())) {
246 Error("Init", "some severe error occurred while opening "
247 "the connection at %s - exit", url.GetUrl(kTRUE));
// Take the user name from the socket.
250 fUser = ((TXSocket *)fSocket)->fUser;
251 PDB(kGlobal,3) Info("Init","%s: fUser is .... %s", iam.Data(), fUser.Data());
// The socket title is set to this slave's ordinal.
257 fSocket->SetTitle(fOrdinal);
// No manager, settings were collected, and the remote xrdproofd version is
// <= 1001: log that they were sent but are unsupported remotely.
260 if (!fProof->GetManager() && !envlist.IsNull() &&
261 ((TXSocket *)fSocket)->GetXrdProofdVersion() <= 1001) {
262 Info(
"Init",
"user envs setting sent but unsupported remotely - RemoteProtocol : %d",
263 ((TXSocket *)fSocket)->GetXrdProofdVersion());
// Give the socket a back-reference to the PROOF session.
267 ((TXSocket *)fSocket)->fReference = fProof;
// Copy remote protocol, server type and session id from the connection.
270 fProtocol = fSocket->GetRemoteProtocol();
273 fProof->fServType = TProofMgr::kXProofd;
276 fProof->fSessionID = ((TXSocket *)fSocket)->GetSessionID();
// Remove the socket from gROOT's list of sockets, with the gROOTMutex lock
// guard in place.
286 R__LOCKGUARD(gROOTMutex);
287 gROOT->GetListOfSockets()->Remove(fSocket);
291 fUser = ((TXSocket *)fSocket)->fUser;
293 Info(
"Init",
"%s: fUser is .... %s", iam.Data(), fUser.Data());
// Parses the buffer held by the socket: the part before the "|log:" marker
// is used as data pool URL, and fWorkDir is trimmed at its last '.'.
303 void TXSlave::ParseBuffer()
// Work on a copy of the socket's buffer.
306 TString buffer(((TXSocket *)fSocket)->fBuffer);
307 if (!buffer.IsNull()) {
308 Ssiz_t ilog = buffer.Index(
"|log:");
// Text before the marker, or the whole buffer when there is no marker, is
// the data pool URL; it is only set when non-empty.
311 TString dpu = (ilog != kNPOS) ? buffer(0, ilog) : buffer;
312 if (dpu.Length() > 0) fProof->SetDataPoolUrl(dpu);
// Drop everything up to and including the marker; sizeof("|log:") - 1 is the
// marker length without its terminating NUL.
316 buffer.Remove(0, ilog +
sizeof(
"|log:") - 1);
// Cut fWorkDir at its last '.' if there is one.
318 if ((ilog = fWorkDir.Last(
'.')) != kNPOS) fWorkDir.Remove(ilog);
320 Info(
"ParseBuffer",
"workdir is: %s", fWorkDir.Data());
321 }
// Otherwise, for protocol versions above 31, warn that the log path is missing.
else if (fProtocol > 31) {
322 Warning(
"ParseBuffer",
"expected log path not found in received startup buffer!");
// Completes the server setup over the already opened socket. Both parameters
// are unnamed and therefore unused.
// NOTE(review): the return values are not visible in these lines.
334 Int_t TXSlave::SetupServ(Int_t,
const char *)
// Receive the startup message; a result <= 0 is reported as a failure.
340 if (fSocket->Recv(buf,
sizeof(buf), what) <= 0) {
341 Error(
"SetupServ",
"failed to receive slave startup message");
// The remote side answered with kMESS_NOTOK.
348 if (what == kMESS_NOTOK) {
// Error message for a remote protocol that is too old (must be >= 4).
356 Error(
"SetupServ",
"incompatible PROOF versions (remote version "
357 "must be >= 4, is %d)", fProtocol);
// Propagate the protocol to the PROOF session.
363 fProof->fProtocol = fProtocol;
// Set the kNoDelay socket option to 1.
366 fSocket->SetOption(kNoDelay, 1);
// NOTE(review): presumably closes this slave's connection; the body and the
// meaning of `opt` are not visible here - confirm against the full definition.
383 void TXSlave::Close(Option_t *opt)
// Pings the remote side through the socket, passing this slave's ordinal.
// Returns 0 when the socket's Ping() reports success, -1 when it reports
// failure or when this slave is not valid.
397 Int_t TXSlave::Ping()
399 if (!IsValid())
return -1;
401 return (((TXSocket *)fSocket)->Ping(GetOrdinal()) ? 0 : -1);
// Calls RemoteTouch() on the socket; does nothing when this slave is not valid.
407 void TXSlave::Touch()
409 if (!IsValid())
return;
411 ((TXSocket *)fSocket)->RemoteTouch();
// Handles an interrupt request of the given type; does nothing when this
// slave is not valid.
419 void TXSlave::Interrupt(Int_t type)
421 if (!IsValid())
return;
// Local interrupt: handled on this side through the current monitor and the
// socket.
423 if (type == TProof::kLocalInterrupt) {
// If the socket is among the active sockets of the current monitor,
// deactivate it there.
430 TMonitor *mon = fProof->fCurrentMonitor;
431 if (mon && fSocket && mon->GetListOfActives()->FindObject(fSocket)) {
434 Info(
"Interrupt",
"%p: deactivating from monitor %p",
this, mon);
435 mon->DeActivate(fSocket);
438 Warning(
"Interrupt",
"%p: reference to PROOF missing",
this);
// Call PostSemAll() on the socket.
// NOTE(review): presumably this releases anything waiting on it - confirm.
442 if (fSocket) ((TXSocket *)fSocket)->PostSemAll();
// Send the interrupt of the requested type through the socket.
447 if (fSocket) ((TXSocket *)fSocket)->SendInterrupt(type);
448 Info(
"Interrupt",
"Interrupt of type %d sent", type);
// Sends an urgent TXSocket::kStopProcess request through the socket; does
// nothing when this slave is not valid.
//   abort   - sent as an Int_t in the request
//   timeout - forwarded unchanged in the request
455 void TXSlave::StopProcess(Bool_t abort, Int_t timeout)
457 if (!IsValid())
return;
459 ((TXSocket *)fSocket)->SendUrgent(TXSocket::kStopProcess, (Int_t)abort, timeout);
461 Info(
"StopProcess",
"Request of type %d sent over", abort);
// Exchanges protocol numbers with a proofd server over socket `s`: sends the
// client protocol and decodes the remote protocol from the reply.
// NOTE(review): the return statements are not visible here; presumably the
// remote protocol (rproto) is returned - confirm.
468 Int_t TXSlave::GetProofdProtocol(TSocket *s)
// Send sizeof(cproto) raw bytes taken from the client protocol number
// formatted as " %d".
473 Int_t len =
sizeof(cproto);
474 memcpy((
char *)&cproto,
475 Form(
" %d", TSocket::GetClientProtocol()),len);
476 Int_t ns = s->SendRaw(&cproto, len);
// Error report for the send step.
478 ::Error(
"TXSlave::GetProofdProtocol",
479 "sending %d bytes to proofd server [%s:%d]",
480 len, (s->GetInetAddress()).GetHostName(), s->GetPort());
// Read the reply into ibuf.
487 Int_t nr = s->RecvRaw(ibuf, len);
489 ::Error(
"TXSlave::GetProofdProtocol",
490 "reading %d bytes from proofd server [%s:%d]",
491 len, (s->GetInetAddress()).GetHostName(), s->GetPort());
// If the first word, converted with net2host, is kROOTD_PROTOCOL, the second
// word is the remote protocol.
494 Int_t kind = net2host(ibuf[0]);
495 if (kind == kROOTD_PROTOCOL) {
496 rproto = net2host(ibuf[1]);
// Second layout: the kind is in the second word and the protocol number is
// read separately, then converted with net2host.
498 kind = net2host(ibuf[1]);
499 if (kind == kROOTD_PROTOCOL) {
500 len =
sizeof(rproto);
501 nr = s->RecvRaw(&rproto, len);
503 ::Error(
"TXSlave::GetProofdProtocol",
504 "reading %d bytes from proofd server [%s:%d]",
505 len, (s->GetInetAddress()).GetHostName(), s->GetPort());
508 rproto = net2host(rproto);
// Log both received words and the decoded remote protocol.
512 ::Info(
"TXSlave::GetProofdProtocol",
513 "remote proofd: buf1: %d, buf2: %d rproto: %d",
514 net2host(ibuf[0]),net2host(ibuf[1]),rproto);
// Forwards the request to TXSocket::SendCoordinator on this slave's socket
// and returns its result unchanged.
// NOTE(review): no IsValid() check is visible before fSocket is used - confirm.
524 TObjString *TXSlave::SendCoordinator(Int_t kind,
const char *msg, Int_t int2)
526 return ((TXSocket *)fSocket)->SendCoordinator(kind, msg, int2);
// Sends `alias` to the coordinator as a kSessionAlias request; does nothing
// when this slave is not valid. The returned object is not used here.
534 void TXSlave::SetAlias(
const char *alias)
537 if (!IsValid())
return;
539 ((TXSocket *)fSocket)->SendCoordinator(kSessionAlias, alias);
// Sends a kGroupProperties request carrying the group name and priority to
// the coordinator. Returns -1 when this slave is not valid.
// NOTE(review): the return value after sending is not visible here.
549 Int_t TXSlave::SendGroupPriority(
const char *grp, Int_t priority)
552 if (!IsValid())
return -1;
554 ((TXSocket *)fSocket)->SendCoordinator(kGroupProperties, grp, priority);
// Error callback for the connection. Tries a reconnect when the error record
// asks for it; otherwise removes the interrupt handlers, marks the socket as
// interrupted and posts a fatal message on it.
//   in - opaque pointer, cast to XHandleErr_t when non-null
// NOTE(review): the return values are not visible in these lines.
562 Bool_t TXSlave::HandleError(
const void *in)
564 XHandleErr_t *herr = in ? (XHandleErr_t *)in : 0;
// fOpt == 1 in the error record: attempt to reconnect.
567 if (fSocket && herr && (herr->fOpt == 1)) {
569 ((TXSocket *)fSocket)->Reconnect();
570 if (fSocket && fSocket->IsValid()) {
// Ordinal "0" is reported as the master, any other ordinal as a node.
572 if (!strcmp(GetOrdinal(),
"0")) {
573 Printf(
"Proof: connection to master at %s:%d re-established",
574 GetName(), GetPort());
576 Printf(
"Proof: connection to node '%s' at %s:%d re-established",
577 GetOrdinal(), GetName(), GetPort());
// Log the state at entry; -1 is printed for validity when fSocket is null.
585 Info(
"HandleError",
"%p:%s:%s got called ... fProof: %p, fSocket: %p (valid: %d)",
586 this, fName.Data(), fOrdinal.Data(), fProof, fSocket,
587 (fSocket ? (Int_t)fSocket->IsValid() : -1));
// Switch off this slave's interrupt handler.
591 SetInterruptHandler(kFALSE);
// Remove the interrupt handler of the PROOF session, if it has one.
596 if (fProof->fIntHandler)
597 fProof->fIntHandler->Remove();
599 Info(
"HandleError",
"%p: proof: %p",
this, fProof);
// On the socket: set the session id to -1, set the interrupt flag and post
// a kPROOF_FATAL message.
603 ((TXSocket *)fSocket)->SetSessionID(-1);
605 ((TXSocket *)fSocket)->SetInterrupt();
608 ((TXSocket *)fSocket)->PostMsg(kPROOF_FATAL);
// On a master: send a kPROOF_MESSAGE through the gProofServ socket; the text
// built here says the worker was removed from the active list.
612 if (fProof->IsMaster()) {
613 TString msg(Form(
"Worker '%s-%s' has been removed from the active list",
614 fName.Data(), fOrdinal.Data()));
615 TMessage m(kPROOF_MESSAGE);
618 gProofServ->GetSocket()->Send(m);
620 Warning(
"HandleError",
"%p: global reference to TProofServ missing",
this);
623 Warning(
"HandleError",
"%p: reference to PROOF missing",
this);
626 Printf(
"TXSlave::HandleError: %p: DONE ... ",
this);
// Input callback for the connection: either flags the socket as ready in the
// current monitor or calls TProof::CollectInputFrom on it. The argument is
// unnamed and unused.
// NOTE(review): the return values are not visible in these lines.
635 Bool_t TXSlave::HandleInput(
const void *)
640 TMonitor *mon = fProof->fCurrentMonitor;
643 Info(
"HandleInput",
"%p: %s: proof: %p, mon: %p",
644 this, GetOrdinal(), fProof, mon);
// The socket is active in the current monitor: mark it ready there.
646 if (mon && mon->IsActive(fSocket)) {
649 Info(
"HandleInput",
"%p: %s: posting monitor %p",
this, GetOrdinal(), mon);
650 mon->SetReady(fSocket);
// Log messages for the path that calls TProof::CollectInputFrom directly.
655 Info(
"HandleInput",
"%p: %s: not active in current monitor"
656 " - calling TProof::CollectInputFrom",
659 Info(
"HandleInput",
"%p: %s: calling TProof::CollectInputFrom",
// A negative result from CollectInputFrom is tested here; the guarded
// statement is not visible.
663 if (fProof->CollectInputFrom(fSocket) < 0)
668 Warning(
"HandleInput",
"%p: %s: reference to PROOF missing",
this, GetOrdinal());
// Installs or removes this slave's interrupt handler.
// NOTE(review): the conditions are not visible; presumably `on` true creates
// the handler and `on` false removes it - confirm.
679 void TXSlave::SetInterruptHandler(Bool_t on)
682 Info(
"SetInterruptHandler",
"enter: %d", on);
// Create a handler bound to this slave's socket.
686 fIntHandler =
new TXSlaveInterruptHandler((TXSocket *)fSocket);
// Remove the existing handler.
690 fIntHandler->Remove();
// Logs the socket pointer and calls Flush() on TXSocket::fgPipe for this
// slave's socket.
697 void TXSlave::FlushSocket()
700 Info(
"FlushSocket",
"enter: %p", fSocket);
703 TXSocket::fgPipe.Flush(fSocket);