23 #include "RConfigure.h" 
   41 TSlave_t TSlave::fgTXSlaveHook = 0;
 
   46 TSlave::TSlave(
const char *url, 
const char *ord, Int_t perf,
 
   47                const char *image, TProof *proof, Int_t stype,
 
   48                const char *workdir, 
const char *msd, Int_t)
 
   49   : fImage(image), fProofWorkDir(workdir),
 
   50     fWorkDir(workdir), fPort(-1),
 
   51     fOrdinal(ord), fPerfIdx(perf),
 
   52     fProtocol(0), fSocket(0), fProof(proof),
 
   53     fInput(0), fBytesRead(0), fRealTime(0),
 
   54     fCpuTime(0), fSlaveType((ESlaveType)stype), fStatus(TSlave::kInvalid),
 
   55     fParallel(0), fMsd(msd)
 
   57    fName = TUrl(url).GetHostFQDN();
 
   58    fPort = TUrl(url).GetPort();
 
   88 void TSlave::Init(
const char *host, Int_t port, Int_t stype)
 
   92    TString proto = fProof->fUrl.GetProtocol();
 
   96    hurl.SetProtocol(proto);
 
  102    if (fProof->IsMaster() && stype == kSlave) {
 
  104       hurl.SetOptions(
"SM");
 
  105    } 
else if (fProof->IsMaster() && stype == kMaster) {
 
  107       hurl.SetOptions(
"MM");
 
  108    } 
else if (!fProof->IsMaster() && stype == kMaster) {
 
  109       iam = 
"Local Client";
 
  110       hurl.SetOptions(
"MC");
 
  112       Error(
"Init",
"Impossible PROOF <-> SlaveType Configuration Requested");
 
  121    fSocket = TSocket::CreateAuthSocket(hurl.GetUrl(), 0, wsize, fSocket);
 
  123    if (!fSocket || !fSocket->IsAuthenticated()) {
 
  133       R__LOCKGUARD(gROOTMutex);
 
  134       gROOT->GetListOfSockets()->Remove(fSocket);
 
  138    fUser              = fSocket->GetSecContext()->GetUser();
 
  140       Info(
"Init",
"%s: fUser is .... %s", iam.Data(), fUser.Data());
 
  143    if (fSocket->GetRemoteProtocol() >= 14 ) {
 
  144       TMessage m(kPROOF_SETENV);
 
  146       const TList *envs = TProof::GetEnvVars();
 
  149          for (TObject *o = next(); o != 0; o = next()) {
 
  150             TNamed *env = 
dynamic_cast<TNamed*
>(o);
 
  152                TString def = Form(
"%s=%s", env->GetName(), env->GetTitle());
 
  153                const char *p = def.Data();
 
  160       Info(
"Init",
"** NOT ** Sending kPROOF_SETENV RemoteProtocol : %d",
 
  161          fSocket->GetRemoteProtocol());
 
  165    fSocket->Recv(buf, 
sizeof(buf));
 
  166    if (strcmp(buf, 
"Okay")) {
 
  179 Int_t TSlave::SetupServ(Int_t stype, 
const char *conffile)
 
  185    if (fSocket->Recv(buf, 
sizeof(buf), what) <= 0) {
 
  186       Error(
"SetupServ", 
"failed to receive slave startup message");
 
  191    if (what == kMESS_NOTOK) {
 
  198    if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != 2*
sizeof(Int_t)) {
 
  199       Error(
"SetupServ", 
"failed to send local PROOF protocol");
 
  204    if (fSocket->Recv(fProtocol, what) != 2*
sizeof(Int_t)) {
 
  205       Error(
"SetupServ", 
"failed to receive remote PROOF protocol");
 
  212       Error(
"SetupServ", 
"incompatible PROOF versions (remote version" 
  213                       " must be >= 4, is %d)", fProtocol);
 
  218    fProof->fProtocol   = fProtocol;   
 
  223       Bool_t isMaster = (stype == kMaster);
 
  224       TString wconf = isMaster ? TString(conffile) : fProofWorkDir;
 
  225       if (OldAuthSetup(isMaster, wconf) != 0) {
 
  226          Error(
"SetupServ", 
"OldAuthSetup: failed to setup authentication");
 
  234       if (stype == kMaster)
 
  235          mess << fUser << fOrdinal << TString(conffile);
 
  237          mess << fUser << fOrdinal << fProofWorkDir;
 
  239       if (fSocket->Send(mess) < 0) {
 
  240          Error(
"SetupServ", 
"failed to send ordinal and config info");
 
  247    fSocket->SetOption(kNoDelay, 1);
 
  260 void TSlave::Init(TSocket *s, Int_t stype)
 
  263    TSlave::Init(s->GetInetAddress().GetHostName(), s->GetPort(), stype);
 
  277 void TSlave::Close(Option_t *opt)
 
  282       if (!(fProof->IsMaster()) && !strncasecmp(opt,
"S",1)) {
 
  284          Interrupt(TProof::kShutdownInterrupt);
 
  289       TSecContext *sc = fSocket->GetSecContext();
 
  290       if (sc && sc->IsActive()) {
 
  291          TIter last(sc->GetSecContextCleanup(), kIterBackward);
 
  292          TSecContextCleanup *nscc = 0;
 
  293          while ((nscc = (TSecContextCleanup *)last())) {
 
  294             if (nscc->GetType() == TSocket::kPROOFD &&
 
  295                 nscc->GetProtocol() < 9) {
 
  310 Int_t TSlave::Compare(
const TObject *obj)
 const 
  312    const TSlave *sl = 
dynamic_cast<const TSlave*
>(obj);
 
  315       Error(
"Compare", 
"input is not a TSlave object");
 
  319    if (fPerfIdx > sl->GetPerfIdx()) 
return 1;
 
  320    if (fPerfIdx < sl->GetPerfIdx()) 
return -1;
 
  321    const char *myord = GetOrdinal();
 
  322    const char *otherord = sl->GetOrdinal();
 
  323    while (myord && otherord) {
 
  324       Int_t myval = atoi(myord);
 
  325       Int_t otherval = atoi(otherord);
 
  326       if (myval < otherval) 
return 1;
 
  327       if (myval > otherval) 
return -1;
 
  328       myord = strchr(myord, 
'.');
 
  330       otherord = strchr(otherord, 
'.');
 
  331       if (otherord) otherord++;
 
  333    if (myord) 
return -1;
 
  334    if (otherord) 
return 1;
 
  341 void TSlave::Print(Option_t *)
 const 
  345    const char *sst[] = { 
"invalid" , 
"valid", 
"inactive" };
 
  346    Int_t st = fSocket ? ((fStatus == kInactive) ? 2 : 1) : 0;
 
  348    Printf(
"*** Worker %s  (%s)", fOrdinal.Data(), sst[st]);
 
  349    Printf(
"    Host name:               %s", GetName());
 
  350    Printf(
"    Port number:             %d", GetPort());
 
  351    Printf(
"    Worker session tag:      %s", GetSessionTag());
 
  352    Printf(
"    ROOT version|rev|tag:    %s", GetROOTVersion());
 
  353    Printf(
"    Architecture-Compiler:   %s", GetArchCompiler());
 
  355       if (strlen(GetGroup()) > 0) {
 
  356          Printf(
"    User/Group:              %s/%s", GetUser(), GetGroup());
 
  358          Printf(
"    User:                    %s", GetUser());
 
  360       if (fSocket->GetSecContext())
 
  361          Printf(
"    Security context:        %s", fSocket->GetSecContext()->AsString(sc));
 
  362       Printf(
"    Proofd protocol version: %d", fSocket->GetRemoteProtocol());
 
  363       Printf(
"    Image name:              %s", GetImage());
 
  364       Printf(
"    Working directory:       %s", GetWorkDir());
 
  365       Printf(
"    Performance index:       %d", GetPerfIdx());
 
  366       Printf(
"    MB's processed:          %.2f", 
float(GetBytesRead())/(1024*1024));
 
  367       Printf(
"    MB's sent:               %.2f", 
float(fSocket->GetBytesRecv())/(1024*1024));
 
  368       Printf(
"    MB's received:           %.2f", 
float(fSocket->GetBytesSent())/(1024*1024));
 
  369       Printf(
"    Real time used (s):      %.3f", GetRealTime());
 
  370       Printf(
"    CPU time used (s):       %.3f", GetCpuTime());
 
  372       if (strlen(GetGroup()) > 0) {
 
  373          Printf(
"    User/Group:              %s/%s", GetUser(), GetGroup());
 
  375          Printf(
"    User:                    %s", GetUser());
 
  377       Printf(
"    Security context:");
 
  378       Printf(
"    Proofd protocol version:");
 
  379       Printf(
"    Image name:              %s", GetImage());
 
  380       Printf(
"    Working directory:       %s", GetWorkDir());
 
  381       Printf(
"    Performance index:       %d", GetPerfIdx());
 
  382       Printf(
"    MB's processed:          %.2f", 
float(GetBytesRead())/(1024*1024));
 
  383       Printf(
"    MB's sent:");
 
  384       Printf(
"    MB's received:");
 
  385       Printf(
"    Real time used (s):      %.3f", GetRealTime());
 
  386       Printf(
"    CPU time used (s):       %.3f", GetCpuTime());
 
  394 void TSlave::SetInputHandler(TFileHandler *ih)
 
  404 Int_t TSlave::OldAuthSetup(Bool_t master, TString wconf)
 
  406    static OldSlaveAuthSetup_t oldAuthSetupHook = 0;
 
  408    if (!oldAuthSetupHook) {
 
  410       TString authlib = 
"libRootAuth";
 
  413       if ((p = gSystem->DynamicPathName(authlib, kTRUE))) {
 
  415          if (gSystem->Load(authlib) == -1) {
 
  416             Error(
"OldAuthSetup", 
"can't load %s",authlib.Data());
 
  420          Error(
"OldAuthSetup", 
"can't locate %s",authlib.Data());
 
  425       Func_t f = gSystem->DynFindSymbol(authlib,
"OldSlaveAuthSetup");
 
  427          oldAuthSetupHook = (OldSlaveAuthSetup_t)(f);
 
  429          Error(
"OldAuthSetup", 
"can't find OldSlaveAuthSetup");
 
  435    return (*oldAuthSetupHook)(fSocket, master, fOrdinal, wconf);
 
  442 TSlave *TSlave::Create(
const char *url, 
const char *ord, Int_t perf,
 
  443                        const char *image, TProof *proof, Int_t stype,
 
  444                        const char *workdir, 
const char *msd, Int_t nwk)
 
  449    if (!strcmp(url, 
"lite")) {
 
  450       return new TSlaveLite(ord, perf, image, proof, stype, workdir, msd);
 
  454    Bool_t tryxpd = kTRUE;
 
  455    if (!(proof->IsMaster())) {
 
  456       if (proof->IsProofd())
 
  459       if (gApplication && (gApplication->Argc() < 3 ||
 
  460                           (gApplication->Argc() > 2 && gApplication->Argv(2) &&
 
  461                            strncmp(gApplication->Argv(2),
"xpd",3))))
 
  467    if (!fgTXSlaveHook) {
 
  470       TString proofxlib = 
"libProofx";
 
  472       if ((p = gSystem->DynamicPathName(proofxlib, kTRUE))) {
 
  474          if (gSystem->Load(proofxlib) == -1)
 
  475             ::Error(
"TSlave::Create", 
"can't load %s", proofxlib.Data());
 
  477          ::Error(
"TSlave::Create", 
"can't locate %s", proofxlib.Data());
 
  481    if (fgTXSlaveHook && tryxpd) {
 
  482       s = (*fgTXSlaveHook)(url, ord, perf, image, proof, stype, workdir, msd, nwk);
 
  484       s = 
new TSlave(url, ord, perf, image, proof, stype, workdir, msd);
 
  496    if (!IsValid()) 
return -1;
 
  498    TMessage mess(kPROOF_PING | kMESS_ACK);
 
  500    if (fSocket->Send(mess) == -1) {
 
  501       Warning(
"Ping",
"%s: acknowledgement not received", GetOrdinal());
 
  511 void TSlave::Interrupt(Int_t type)
 
  513    if (!IsValid()) 
return;
 
  515    char oobc = (char) type;
 
  516    const int kBufSize = 1024;
 
  517    char waste[kBufSize];
 
  520    if (fSocket->SendRaw(&oobc, 1, kOob) <= 0) {
 
  521       Error(
"Interrupt", 
"error sending oobc to slave %s", GetOrdinal());
 
  525    if (type == TProof::kHardInterrupt) {
 
  527       int   n, nch, nbytes = 0, nloop = 0;
 
  530       while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
 
  542             fSocket->GetOption(kBytesToRead, nch);
 
  544                gSystem->Sleep(1000);
 
  548             if (nch > kBufSize) nch = kBufSize;
 
  549             n = fSocket->RecvRaw(waste, nch);
 
  551                Error(
"Interrupt", 
"error receiving waste from slave %s",
 
  556          } 
else if (n == -3) {   
 
  562                Error(
"Interrupt", 
"server %s does not respond", GetOrdinal());
 
  566             Error(
"Interrupt", 
"error receiving OOB from server %s",
 
  579          fSocket->GetOption(kAtMark, atmark);
 
  585          fSocket->GetOption(kBytesToRead, nch);
 
  587             gSystem->Sleep(1000);
 
  591          if (nch > kBufSize) nch = kBufSize;
 
  592          n = fSocket->RecvRaw(waste, nch);
 
  594             Error(
"Interrupt", 
"error receiving waste (2) from slave %s",
 
  601          if (fProof->IsMaster())
 
  602             Info(
"Interrupt", 
"slave %s:%s synchronized: %d bytes discarded",
 
  603                  GetName(), GetOrdinal(), nbytes);
 
  605             Info(
"Interrupt", 
"PROOF synchronized: %d bytes discarded", nbytes);
 
  609       fProof->Collect(
this);
 
  611    } 
else if (type == TProof::kSoftInterrupt) {
 
  614       fProof->Collect(
this);
 
  616    } 
else if (type == TProof::kShutdownInterrupt) {
 
  623       fProof->Collect(
this);
 
  630 void TSlave::StopProcess(Bool_t abort, Int_t timeout)
 
  633    TMessage msg(kPROOF_STOPPROCESS);
 
  635    if (fProof->fProtocol > 9)
 
  644 TObjString *TSlave::SendCoordinator(Int_t, 
const char *, Int_t)
 
  647       Info(
"SendCoordinator",
"method not implemented for this communication layer");
 
  656 void TSlave::SetAlias(
const char *)
 
  659       Info(
"SetAlias",
"method not implemented for this communication layer");
 
  666 void TSlave::SetTXSlaveHook(TSlave_t xslavehook)
 
  668    fgTXSlaveHook = xslavehook;