23 #include "RConfigure.h"
// Factory hook used by TSlave::Create to build xproofd-style workers; it starts
// out null and is set via TSlave::SetTXSlaveHook.
// NOTE(review): the leading numbers on the lines of this listing look like
// original line numbers fused in during extraction; they are not valid C++
// and should be stripped when restoring the real source.
41 TSlave_t TSlave::fgTXSlaveHook = 0;
// Construct a worker descriptor.
//   url     - parsed with TUrl to obtain fName (host FQDN) and fPort
//   ord     - worker ordinal string, stored in fOrdinal
//   perf    - performance index, stored in fPerfIdx
//   image   - stored in fImage
//   proof   - owning TProof session, stored (not copied) in fProof
//   stype   - slave type, cast to ESlaveType and stored in fSlaveType
//   workdir - stored in both fProofWorkDir and fWorkDir
//   msd     - stored in fMsd
//   (last Int_t is unnamed and not used by the visible code)
// The socket starts null and the status starts as TSlave::kInvalid.
46 TSlave::TSlave(
const char *url,
const char *ord, Int_t perf,
47 const char *image, TProof *proof, Int_t stype,
48 const char *workdir,
const char *msd, Int_t)
49 : fImage(image), fProofWorkDir(workdir),
50 fWorkDir(workdir), fPort(-1),
51 fOrdinal(ord), fPerfIdx(perf),
52 fProtocol(0), fSocket(0), fProof(proof),
53 fInput(0), fBytesRead(0), fRealTime(0),
54 fCpuTime(0), fSlaveType((ESlaveType)stype), fStatus(TSlave::kInvalid),
55 fParallel(0), fMsd(msd)
// fPort was initialised to -1 above and is overwritten here from the URL.
// NOTE(review): TUrl(url) is constructed twice; parsing once into a local
// would avoid the duplicate work. The body's brace lines and any further
// statements are not present in this listing.
57 fName = TUrl(url).GetHostFQDN();
58 fPort = TUrl(url).GetPort();
// Connect to and authenticate with the worker/master at host:port.
//   host, port - target of the connection (used to build `hurl`; the
//                construction lines are not present in this listing)
//   stype      - kSlave or kMaster; combined with fProof->IsMaster() to pick
//                the connection options sent in the URL
// The URL protocol is copied from the session URL (fProof->fUrl).
88 void TSlave::Init(
const char *host, Int_t port, Int_t stype)
92 TString proto = fProof->fUrl.GetProtocol();
96 hurl.SetProtocol(proto);
// Connection option by role:
//   master -> slave : "SM"
//   master -> master: "MM"
//   client -> master: "MC" (and `iam` becomes "Local Client")
// Any other combination reaches the "Impossible ..." error below
// (presumably via an else branch whose line is missing here).
102 if (fProof->IsMaster() && stype == kSlave) {
104 hurl.SetOptions(
"SM");
105 }
else if (fProof->IsMaster() && stype == kMaster) {
107 hurl.SetOptions(
"MM");
108 }
else if (!fProof->IsMaster() && stype == kMaster) {
109 iam =
"Local Client";
110 hurl.SetOptions(
"MC");
112 Error(
"Init",
"Impossible PROOF <-> SlaveType Configuration Requested");
// Open an authenticated socket. fSocket is passed back in as the last argument.
121 fSocket = TSocket::CreateAuthSocket(hurl.GetUrl(), 0, wsize, fSocket);
// Failure path: no socket or not authenticated. The visible cleanup removes
// the socket from gROOT's socket list while holding gROOTMutex.
123 if (!fSocket || !fSocket->IsAuthenticated()) {
133 R__LOCKGUARD(gROOTMutex);
134 gROOT->GetListOfSockets()->Remove(fSocket);
// Record the authenticated user name from the socket's security context.
// NOTE(review): GetSecContext() is dereferenced without a visible null check;
// confirm it is always set for an authenticated socket.
138 fUser = fSocket->GetSecContext()->GetUser();
140 Info(
"Init",
"%s: fUser is .... %s", iam.Data(), fUser.Data());
// Remote protocol >= 14: forward the TProof environment variables as
// "NAME=VALUE" strings in a kPROOF_SETENV message. Older protocols only log.
143 if (fSocket->GetRemoteProtocol() >= 14 ) {
144 TMessage m(kPROOF_SETENV);
146 const TList *envs = TProof::GetEnvVars();
149 for (TObject *o = next(); o != 0; o = next()) {
// NOTE(review): `env` is dereferenced on the next visible line; the line in
// between (151) is missing from this listing. Confirm it null-checks the
// dynamic_cast result.
150 TNamed *env =
dynamic_cast<TNamed*
>(o);
152 TString def = Form(
"%s=%s", env->GetName(), env->GetTitle());
153 const char *p = def.Data();
160 Info(
"Init",
"** NOT ** Sending kPROOF_SETENV RemoteProtocol : %d",
161 fSocket->GetRemoteProtocol());
// Read the remote reply into `buf` and check that it is exactly "Okay".
165 fSocket->Recv(buf,
sizeof(buf));
166 if (strcmp(buf,
"Okay")) {
// Perform the startup handshake with the remote server over fSocket.
//   stype    - kMaster selects master-style configuration (conffile);
//              otherwise fProofWorkDir is used
//   conffile - configuration file name sent to a master
// The visible steps are: read the startup message, exchange PROOF protocol
// versions, run old-style authentication setup, send user/ordinal/config,
// and enable kNoDelay on the socket. Each visible failure reports an Error.
179 Int_t TSlave::SetupServ(Int_t stype,
const char *conffile)
185 if (fSocket->Recv(buf,
sizeof(buf), what) <= 0) {
186 Error(
"SetupServ",
"failed to receive slave startup message");
// The handling of kMESS_NOTOK is on lines missing from this listing.
191 if (what == kMESS_NOTOK) {
// Send our protocol. Success means exactly two Int_t were written
// (the message type plus the value).
198 if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != 2*
sizeof(Int_t)) {
199 Error(
"SetupServ",
"failed to send local PROOF protocol");
204 if (fSocket->Recv(fProtocol, what) != 2*
sizeof(Int_t)) {
205 Error(
"SetupServ",
"failed to receive remote PROOF protocol");
// The version check guarding this message is on a missing line.
212 Error(
"SetupServ",
"incompatible PROOF versions (remote version"
213 " must be >= 4, is %d)", fProtocol);
// Propagate the negotiated remote protocol to the owning session.
218 fProof->fProtocol = fProtocol;
// Old-style authentication: masters get the config file, others get the
// PROOF work directory.
223 Bool_t isMaster = (stype == kMaster);
224 TString wconf = isMaster ? TString(conffile) : fProofWorkDir;
225 if (OldAuthSetup(isMaster, wconf) != 0) {
226 Error(
"SetupServ",
"OldAuthSetup: failed to setup authentication");
// Send user, ordinal and either the config file (master) or the work dir
// (presumably via an `else` on the missing line 236).
234 if (stype == kMaster)
235 mess << fUser << fOrdinal << TString(conffile);
237 mess << fUser << fOrdinal << fProofWorkDir;
239 if (fSocket->Send(mess) < 0) {
240 Error(
"SetupServ",
"failed to send ordinal and config info");
247 fSocket->SetOption(kNoDelay, 1);
// Overload of Init that takes an existing socket: forwards its remote host
// name and port to Init(host, port, stype).
// NOTE(review): `s` is dereferenced without a visible null check.
260 void TSlave::Init(TSocket *s, Int_t stype)
263 TSlave::Init(s->GetInetAddress().GetHostName(), s->GetPort(), stype);
// Close the connection to this worker.
//   opt - if this process is not a master and opt starts with 's' or 'S'
//         (case-insensitive, first character only), a shutdown interrupt is
//         sent first
277 void TSlave::Close(Option_t *opt)
282 if (!(fProof->IsMaster()) && !strncasecmp(opt,
"S",1)) {
284 Interrupt(TProof::kShutdownInterrupt);
// If the socket has an active security context, walk its cleanup list
// backwards looking for kPROOFD entries with protocol < 9. What is done with
// matches is on lines missing from this listing.
// NOTE(review): fSocket is dereferenced here; confirm a guard exists on the
// missing lines between 284 and 289.
289 TSecContext *sc = fSocket->GetSecContext();
290 if (sc && sc->IsActive()) {
291 TIter last(sc->GetSecContextCleanup(), kIterBackward);
292 TSecContextCleanup *nscc = 0;
293 while ((nscc = (TSecContextCleanup *)last())) {
294 if (nscc->GetType() == TSocket::kPROOFD &&
295 nscc->GetProtocol() < 9) {
// Ordering used when sorting workers.
// Primary key: performance index (a higher fPerfIdx returns 1).
// Secondary key: the ordinal, read as dot-separated integer components
// (atoi per component) and compared left to right with inverted sense:
// a smaller component returns 1. If one ordinal has components left after
// the other is exhausted, this side returns -1 when it is the longer one,
// and 1 when the other is longer.
// Non-TSlave input reports an Error (the null check and return value are on
// missing lines).
310 Int_t TSlave::Compare(
const TObject *obj)
const
312 const TSlave *sl =
dynamic_cast<const TSlave*
>(obj);
315 Error(
"Compare",
"input is not a TSlave object");
319 if (fPerfIdx > sl->GetPerfIdx())
return 1;
320 if (fPerfIdx < sl->GetPerfIdx())
return -1;
321 const char *myord = GetOrdinal();
322 const char *otherord = sl->GetOrdinal();
323 while (myord && otherord) {
324 Int_t myval = atoi(myord);
325 Int_t otherval = atoi(otherord);
326 if (myval < otherval)
return 1;
327 if (myval > otherval)
return -1;
// NOTE(review): line 329 is missing from this listing. Confirm it advances
// myord past the '.' (as line 331 does for otherord); otherwise the next
// atoi() would read the '.' itself.
328 myord = strchr(myord,
'.');
330 otherord = strchr(otherord,
'.');
331 if (otherord) otherord++;
333 if (myord)
return -1;
334 if (otherord)
return 1;
// Print a summary of this worker through Printf. The option argument is unused.
// The header status is derived from st:
//   0 "invalid"  - no socket
//   1 "valid"    - socket present, status != kInactive
//   2 "inactive" - socket present, status == kInactive
341 void TSlave::Print(Option_t *)
const
345 const char *sst[] = {
"invalid" ,
"valid",
"inactive" };
346 Int_t st = fSocket ? ((fStatus == kInactive) ? 2 : 1) : 0;
348 Printf(
"*** Worker %s (%s)", fOrdinal.Data(), sst[st]);
349 Printf(
" Host name: %s", GetName());
350 Printf(
" Port number: %d", GetPort());
351 Printf(
" Worker session tag: %s", GetSessionTag());
352 Printf(
" ROOT version|rev|tag: %s", GetROOTVersion());
353 Printf(
" Architecture-Compiler: %s", GetArchCompiler());
// Detailed section that reads fSocket. The guarding condition (line 354) and
// the declaration of `sc` used below are not present in this listing.
355 if (strlen(GetGroup()) > 0) {
356 Printf(
" User/Group: %s/%s", GetUser(), GetGroup());
358 Printf(
" User: %s", GetUser());
360 if (fSocket->GetSecContext())
361 Printf(
" Security context: %s", fSocket->GetSecContext()->AsString(sc));
362 Printf(
" Proofd protocol version: %d", fSocket->GetRemoteProtocol());
363 Printf(
" Image name: %s", GetImage());
364 Printf(
" Working directory: %s", GetWorkDir());
365 Printf(
" Performance index: %d", GetPerfIdx());
366 Printf(
" MB's processed: %.2f",
float(GetBytesRead())/(1024*1024));
// NOTE(review): "sent" prints the socket's bytes *received* and "received"
// prints its bytes *sent*. This presumably reports traffic from the worker's
// point of view; confirm it is intentional.
367 Printf(
" MB's sent: %.2f",
float(fSocket->GetBytesRecv())/(1024*1024));
368 Printf(
" MB's received: %.2f",
float(fSocket->GetBytesSent())/(1024*1024));
369 Printf(
" Real time used (s): %.3f", GetRealTime());
370 Printf(
" CPU time used (s): %.3f", GetCpuTime());
// Fallback section (its `else` line is not in this listing). It prints the
// same fields but leaves the socket-dependent values empty.
372 if (strlen(GetGroup()) > 0) {
373 Printf(
" User/Group: %s/%s", GetUser(), GetGroup());
375 Printf(
" User: %s", GetUser());
377 Printf(
" Security context:");
378 Printf(
" Proofd protocol version:");
379 Printf(
" Image name: %s", GetImage());
380 Printf(
" Working directory: %s", GetWorkDir());
381 Printf(
" Performance index: %d", GetPerfIdx());
382 Printf(
" MB's processed: %.2f",
float(GetBytesRead())/(1024*1024));
383 Printf(
" MB's sent:");
384 Printf(
" MB's received:");
385 Printf(
" Real time used (s): %.3f", GetRealTime());
386 Printf(
" CPU time used (s): %.3f", GetCpuTime());
// Install the file handler `ih` for input from this worker.
// The body is not present in this listing.
394 void TSlave::SetInputHandler(TFileHandler *ih)
// Run old-style authentication setup by calling the OldSlaveAuthSetup entry
// point from libRootAuth, and return its result.
//   master - forwarded to the hook (true when setting up a master)
//   wconf  - forwarded to the hook (config file or work dir, chosen by caller)
// The entry point is resolved on first use and cached in a function-local
// static.
404 Int_t TSlave::OldAuthSetup(Bool_t master, TString wconf)
406 static OldSlaveAuthSetup_t oldAuthSetupHook = 0;
408 if (!oldAuthSetupHook) {
// Locate and load libRootAuth; report when it cannot be found or loaded.
410 TString authlib =
"libRootAuth";
413 if ((p = gSystem->DynamicPathName(authlib, kTRUE))) {
415 if (gSystem->Load(authlib) == -1) {
416 Error(
"OldAuthSetup",
"can't load %s",authlib.Data());
420 Error(
"OldAuthSetup",
"can't locate %s",authlib.Data());
// Resolve the symbol and cache it.
// NOTE(review): the early returns after each Error are on missing lines;
// confirm the call at 435 cannot be reached with a null hook.
425 Func_t f = gSystem->DynFindSymbol(authlib,
"OldSlaveAuthSetup");
427 oldAuthSetupHook = (OldSlaveAuthSetup_t)(f);
429 Error(
"OldAuthSetup",
"can't find OldSlaveAuthSetup");
435 return (*oldAuthSetupHook)(fSocket, master, fOrdinal, wconf);
// Factory for worker objects.
//   url == "lite" -> TSlaveLite (the url itself is not forwarded)
//   otherwise     -> the fgTXSlaveHook product when the hook is set and
//                    `tryxpd` holds; else a plain TSlave
// The remaining parameters are forwarded unchanged. nwk is passed only to
// the hook, not to the TSlave constructor.
// NOTE(review): url is passed to strcmp without a null check.
442 TSlave *TSlave::Create(
const char *url,
const char *ord, Int_t perf,
443 const char *image, TProof *proof, Int_t stype,
444 const char *workdir,
const char *msd, Int_t nwk)
449 if (!strcmp(url,
"lite")) {
450 return new TSlaveLite(ord, perf, image, proof, stype, workdir, msd);
// On a client (non-master) `tryxpd` may be changed depending on
// proof->IsProofd() and on whether argv[2] starts with "xpd". The
// assignments themselves are on lines missing from this listing.
454 Bool_t tryxpd = kTRUE;
455 if (!(proof->IsMaster())) {
456 if (proof->IsProofd())
459 if (gApplication && (gApplication->Argc() < 3 ||
460 (gApplication->Argc() > 2 && gApplication->Argv(2) &&
461 strncmp(gApplication->Argv(2),
"xpd",3))))
// No hook yet: try to load libProofx. Presumably loading it registers the
// hook via SetTXSlaveHook; confirm against libProofx.
467 if (!fgTXSlaveHook) {
470 TString proofxlib =
"libProofx";
472 if ((p = gSystem->DynamicPathName(proofxlib, kTRUE))) {
474 if (gSystem->Load(proofxlib) == -1)
475 ::Error(
"TSlave::Create",
"can't load %s", proofxlib.Data());
477 ::Error(
"TSlave::Create",
"can't locate %s", proofxlib.Data());
481 if (fgTXSlaveHook && tryxpd) {
482 s = (*fgTXSlaveHook)(url, ord, perf, image, proof, stype, workdir, msd, nwk);
484 s =
new TSlave(url, ord, perf, image, proof, stype, workdir, msd);
// Body fragment of the ping routine (the function header is not present in
// this listing; the name comes from the Warning call below).
// It returns -1 for an invalid worker, otherwise it sends a
// kPROOF_PING | kMESS_ACK message.
496 if (!IsValid())
return -1;
498 TMessage mess(kPROOF_PING | kMESS_ACK);
// NOTE(review): this branch fires when Send() itself fails (-1), but the
// message says the acknowledgement was not received; consider wording it as
// a send failure.
500 if (fSocket->Send(mess) == -1) {
501 Warning(
"Ping",
"%s: acknowledgement not received", GetOrdinal());
// Send an interrupt of the given type to the worker as a single out-of-band
// byte, then resynchronise according to the type.
//   type - TProof::kHardInterrupt, kSoftInterrupt or kShutdownInterrupt
// Does nothing for an invalid worker.
511 void TSlave::Interrupt(Int_t type)
513 if (!IsValid())
return;
// The interrupt type is truncated to one char for the OOB send.
515 char oobc = (char) type;
516 const int kBufSize = 1024;
517 char waste[kBufSize];
520 if (fSocket->SendRaw(&oobc, 1, kOob) <= 0) {
521 Error(
"Interrupt",
"error sending oobc to slave %s", GetOrdinal());
// Hard interrupt: wait for the worker's OOB reply. While RecvRaw(kOob) keeps
// failing, query pending in-band bytes, sleep (gSystem->Sleep(1000)), and
// discard up to kBufSize bytes at a time into `waste`. A return of -3 is
// reported as "does not respond". Some loop conditions are on missing lines.
525 if (type == TProof::kHardInterrupt) {
527 int n, nch, nbytes = 0, nloop = 0;
530 while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
542 fSocket->GetOption(kBytesToRead, nch);
544 gSystem->Sleep(1000);
548 if (nch > kBufSize) nch = kBufSize;
549 n = fSocket->RecvRaw(waste, nch);
551 Error(
"Interrupt",
"error receiving waste from slave %s",
556 }
else if (n == -3) {
562 Error(
"Interrupt",
"server %s does not respond", GetOrdinal());
566 Error(
"Interrupt",
"error receiving OOB from server %s",
// Second drain phase: check the OOB mark (kAtMark) and keep discarding
// in-band data.
579 fSocket->GetOption(kAtMark, atmark);
585 fSocket->GetOption(kBytesToRead, nch);
587 gSystem->Sleep(1000);
591 if (nch > kBufSize) nch = kBufSize;
592 n = fSocket->RecvRaw(waste, nch);
594 Error(
"Interrupt",
"error receiving waste (2) from slave %s",
// Report how many bytes were discarded: a per-worker message on a master,
// a generic one otherwise. Then collect pending replies from this worker.
601 if (fProof->IsMaster())
602 Info(
"Interrupt",
"slave %s:%s synchronized: %d bytes discarded",
603 GetName(), GetOrdinal(), nbytes);
605 Info(
"Interrupt",
"PROOF synchronized: %d bytes discarded", nbytes);
609 fProof->Collect(
this);
611 }
else if (type == TProof::kSoftInterrupt) {
// Soft interrupt: only the Collect call is visible in this listing.
614 fProof->Collect(
this);
616 }
else if (type == TProof::kShutdownInterrupt) {
// Shutdown interrupt: only the Collect call is visible in this listing.
623 fProof->Collect(
this);
// Ask the worker to stop the current processing with a kPROOF_STOPPROCESS
// message.
//   abort, timeout - message contents; how they are added (and what depends
//                    on fProof->fProtocol > 9) is on lines missing from this
//                    listing
630 void TSlave::StopProcess(Bool_t abort, Int_t timeout)
633 TMessage msg(kPROOF_STOPPROCESS);
635 if (fProof->fProtocol > 9)
// Not implemented for this communication layer. All parameters are unnamed
// (ignored), and the visible code only logs an Info message. The return
// statement is not present in this listing.
644 TObjString *TSlave::SendCoordinator(Int_t,
const char *, Int_t)
647 Info(
"SendCoordinator",
"method not implemented for this communication layer");
// Not implemented for this communication layer. The alias argument is
// unnamed (ignored), and only an Info message is logged.
656 void TSlave::SetAlias(
const char *)
659 Info(
"SetAlias",
"method not implemented for this communication layer");
// Register the factory that TSlave::Create uses to build xproofd-style workers
// (stored in the static fgTXSlaveHook).
666 void TSlave::SetTXSlaveHook(TSlave_t xslavehook)
668 fgTXSlaveHook = xslavehook;