Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TSlave.cxx
Go to the documentation of this file.
1 // @(#)root/proof:$Id$
2 // Author: Fons Rademakers 14/02/97
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TSlave
13 \ingroup proofkernel
14 
15 Class describing a PROOF worker server. It contains information like the
16 worker's host name, ordinal number, performance index, socket, etc.
17 Objects of this class can only be created via TProof member functions.
18 
19 */
20 
21 #include <stdlib.h>
22 
23 #include "RConfigure.h"
24 #include "TApplication.h"
25 #include "TSlave.h"
26 #include "TSlaveLite.h"
27 #include "TProof.h"
28 #include "TSystem.h"
29 #include "TEnv.h"
30 #include "TROOT.h"
31 #include "TUrl.h"
32 #include "TMessage.h"
33 #include "TError.h"
34 #include "TVirtualMutex.h"
35 #include "TSocket.h"
36 #include "TObjString.h"
37 
38 ClassImp(TSlave);
39 
40 // Hook for the TXSlave constructor
41 TSlave_t TSlave::fgTXSlaveHook = 0;
42 
43 ////////////////////////////////////////////////////////////////////////////////
44 /// Create a PROOF slave object. Called via the TProof ctor.
45 
46 TSlave::TSlave(const char *url, const char *ord, Int_t perf,
47  const char *image, TProof *proof, Int_t stype,
48  const char *workdir, const char *msd, Int_t)
49  : fImage(image), fProofWorkDir(workdir),
50  fWorkDir(workdir), fPort(-1),
51  fOrdinal(ord), fPerfIdx(perf),
52  fProtocol(0), fSocket(0), fProof(proof),
53  fInput(0), fBytesRead(0), fRealTime(0),
54  fCpuTime(0), fSlaveType((ESlaveType)stype), fStatus(TSlave::kInvalid),
55  fParallel(0), fMsd(msd)
56 {
57  fName = TUrl(url).GetHostFQDN();
58  fPort = TUrl(url).GetPort();
59 
60  Init(url, -1, stype);
61 }
62 
63 ////////////////////////////////////////////////////////////////////////////////
64 /// Default constructor used by derived classes
65 
66 TSlave::TSlave()
67 {
68  fPort = -1;
69  fOrdinal = "-1";
70  fPerfIdx = -1;
71  fProof = 0;
72  fSlaveType = kMaster;
73  fProtocol = 0;
74  fSocket = 0;
75  fInput = 0;
76  fBytesRead = 0;
77  fRealTime = 0;
78  fCpuTime = 0;
79  fStatus = kInvalid;
80  fParallel = 0;
81 }
82 
83 ////////////////////////////////////////////////////////////////////////////////
84 /// Init a PROOF slave object. Called via the TSlave ctor.
85 /// The Init method is technology specific and is overwritten by derived
86 /// classes.
87 
88 void TSlave::Init(const char *host, Int_t port, Int_t stype)
89 {
90  // The url contains information about the server type: make sure
91  // it is 'proofd' or alike
92  TString proto = fProof->fUrl.GetProtocol();
93  proto.Insert(5, 'd');
94 
95  TUrl hurl(host);
96  hurl.SetProtocol(proto);
97  if (port > 0)
98  hurl.SetPort(port);
99 
100  // Add information about our status (Client or Master)
101  TString iam;
102  if (fProof->IsMaster() && stype == kSlave) {
103  iam = "Master";
104  hurl.SetOptions("SM");
105  } else if (fProof->IsMaster() && stype == kMaster) {
106  iam = "Master";
107  hurl.SetOptions("MM");
108  } else if (!fProof->IsMaster() && stype == kMaster) {
109  iam = "Local Client";
110  hurl.SetOptions("MC");
111  } else {
112  Error("Init","Impossible PROOF <-> SlaveType Configuration Requested");
113  R__ASSERT(0);
114  }
115 
116  // Open authenticated connection to remote PROOF slave server.
117  // If a connection was already open (fSocket != 0), re-use it
118  // to perform authentication (optimization needed to avoid a double
119  // opening in case this is called by TXSlave).
120  Int_t wsize = 65536;
121  fSocket = TSocket::CreateAuthSocket(hurl.GetUrl(), 0, wsize, fSocket);
122 
123  if (!fSocket || !fSocket->IsAuthenticated()) {
124  SafeDelete(fSocket);
125  return;
126  }
127 
128  // Remove socket from global TROOT socket list. Only the TProof object,
129  // representing all slave sockets, will be added to this list. This will
130  // ensure the correct termination of all proof servers in case the
131  // root session terminates.
132  {
133  R__LOCKGUARD(gROOTMutex);
134  gROOT->GetListOfSockets()->Remove(fSocket);
135  }
136 
137  // Fill some useful info
138  fUser = fSocket->GetSecContext()->GetUser();
139  PDB(kGlobal,3) {
140  Info("Init","%s: fUser is .... %s", iam.Data(), fUser.Data());
141  }
142 
143  if (fSocket->GetRemoteProtocol() >= 14 ) {
144  TMessage m(kPROOF_SETENV);
145 
146  const TList *envs = TProof::GetEnvVars();
147  if (envs != 0 ) {
148  TIter next(envs);
149  for (TObject *o = next(); o != 0; o = next()) {
150  TNamed *env = dynamic_cast<TNamed*>(o);
151  if (env != 0) {
152  TString def = Form("%s=%s", env->GetName(), env->GetTitle());
153  const char *p = def.Data();
154  m << p;
155  }
156  }
157  }
158  fSocket->Send(m);
159  } else {
160  Info("Init","** NOT ** Sending kPROOF_SETENV RemoteProtocol : %d",
161  fSocket->GetRemoteProtocol());
162  }
163 
164  char buf[512];
165  fSocket->Recv(buf, sizeof(buf));
166  if (strcmp(buf, "Okay")) {
167  Printf("%s", buf);
168  SafeDelete(fSocket);
169  return;
170  }
171 
172 }
173 
174 ////////////////////////////////////////////////////////////////////////////////
175 /// Init a PROOF slave object. Called via the TSlave ctor.
176 /// The Init method is technology specific and is overwritten by derived
177 /// classes.
178 
179 Int_t TSlave::SetupServ(Int_t stype, const char *conffile)
180 {
181  // get back startup message of proofserv (we are now talking with
182  // the real proofserver and not anymore with the proofd front-end)
183  Int_t what;
184  char buf[512];
185  if (fSocket->Recv(buf, sizeof(buf), what) <= 0) {
186  Error("SetupServ", "failed to receive slave startup message");
187  SafeDelete(fSocket);
188  return -1;
189  }
190 
191  if (what == kMESS_NOTOK) {
192  SafeDelete(fSocket);
193  return -1;
194  }
195 
196  // exchange protocol level between client and master and between
197  // master and slave
198  if (fSocket->Send(kPROOF_Protocol, kROOTD_PROTOCOL) != 2*sizeof(Int_t)) {
199  Error("SetupServ", "failed to send local PROOF protocol");
200  SafeDelete(fSocket);
201  return -1;
202  }
203 
204  if (fSocket->Recv(fProtocol, what) != 2*sizeof(Int_t)) {
205  Error("SetupServ", "failed to receive remote PROOF protocol");
206  SafeDelete(fSocket);
207  return -1;
208  }
209 
210  // protocols less than 4 are incompatible
211  if (fProtocol < 4) {
212  Error("SetupServ", "incompatible PROOF versions (remote version"
213  " must be >= 4, is %d)", fProtocol);
214  SafeDelete(fSocket);
215  return -1;
216  }
217 
218  fProof->fProtocol = fProtocol; // protocol of last slave on master
219 
220  if (fProtocol < 5) {
221  //
222  // Setup authentication related stuff for ald versions
223  Bool_t isMaster = (stype == kMaster);
224  TString wconf = isMaster ? TString(conffile) : fProofWorkDir;
225  if (OldAuthSetup(isMaster, wconf) != 0) {
226  Error("SetupServ", "OldAuthSetup: failed to setup authentication");
227  SafeDelete(fSocket);
228  return -1;
229  }
230  } else {
231  //
232  // Send ordinal (and config) info to slave (or master)
233  TMessage mess;
234  if (stype == kMaster)
235  mess << fUser << fOrdinal << TString(conffile);
236  else
237  mess << fUser << fOrdinal << fProofWorkDir;
238 
239  if (fSocket->Send(mess) < 0) {
240  Error("SetupServ", "failed to send ordinal and config info");
241  SafeDelete(fSocket);
242  return -1;
243  }
244  }
245 
246  // set some socket options
247  fSocket->SetOption(kNoDelay, 1);
248 
249  // Set active state
250  fStatus = kActive;
251 
252  // We are done
253  return 0;
254 }
255 
256 ////////////////////////////////////////////////////////////////////////////////
257 /// Init a PROOF slave object using the connection opened via s. Used to
258 /// avoid double opening when an attempt via TXSlave found a remote proofd.
259 
260 void TSlave::Init(TSocket *s, Int_t stype)
261 {
262  fSocket = s;
263  TSlave::Init(s->GetInetAddress().GetHostName(), s->GetPort(), stype);
264 }
265 
266 ////////////////////////////////////////////////////////////////////////////////
267 /// Destroy slave.
268 
269 TSlave::~TSlave()
270 {
271  Close();
272 }
273 
274 ////////////////////////////////////////////////////////////////////////////////
275 /// Close slave socket.
276 
277 void TSlave::Close(Option_t *opt)
278 {
279  if (fSocket) {
280 
281  // If local client ...
282  if (!(fProof->IsMaster()) && !strncasecmp(opt,"S",1)) {
283  // ... tell master and slaves to stop
284  Interrupt(TProof::kShutdownInterrupt);
285  }
286 
287  // deactivate used sec context if talking to proofd daemon running
288  // an old protocol (sec context disactivated remotely)
289  TSecContext *sc = fSocket->GetSecContext();
290  if (sc && sc->IsActive()) {
291  TIter last(sc->GetSecContextCleanup(), kIterBackward);
292  TSecContextCleanup *nscc = 0;
293  while ((nscc = (TSecContextCleanup *)last())) {
294  if (nscc->GetType() == TSocket::kPROOFD &&
295  nscc->GetProtocol() < 9) {
296  sc->DeActivate("");
297  break;
298  }
299  }
300  }
301  }
302 
303  SafeDelete(fInput);
304  SafeDelete(fSocket);
305 }
306 
307 ////////////////////////////////////////////////////////////////////////////////
308 /// Used to sort slaves by performance index.
309 
310 Int_t TSlave::Compare(const TObject *obj) const
311 {
312  const TSlave *sl = dynamic_cast<const TSlave*>(obj);
313 
314  if (!sl) {
315  Error("Compare", "input is not a TSlave object");
316  return 0;
317  }
318 
319  if (fPerfIdx > sl->GetPerfIdx()) return 1;
320  if (fPerfIdx < sl->GetPerfIdx()) return -1;
321  const char *myord = GetOrdinal();
322  const char *otherord = sl->GetOrdinal();
323  while (myord && otherord) {
324  Int_t myval = atoi(myord);
325  Int_t otherval = atoi(otherord);
326  if (myval < otherval) return 1;
327  if (myval > otherval) return -1;
328  myord = strchr(myord, '.');
329  if (myord) myord++;
330  otherord = strchr(otherord, '.');
331  if (otherord) otherord++;
332  }
333  if (myord) return -1;
334  if (otherord) return 1;
335  return 0;
336 }
337 
338 ////////////////////////////////////////////////////////////////////////////////
339 /// Printf info about slave.
340 
341 void TSlave::Print(Option_t *) const
342 {
343  TString sc;
344 
345  const char *sst[] = { "invalid" , "valid", "inactive" };
346  Int_t st = fSocket ? ((fStatus == kInactive) ? 2 : 1) : 0;
347 
348  Printf("*** Worker %s (%s)", fOrdinal.Data(), sst[st]);
349  Printf(" Host name: %s", GetName());
350  Printf(" Port number: %d", GetPort());
351  Printf(" Worker session tag: %s", GetSessionTag());
352  Printf(" ROOT version|rev|tag: %s", GetROOTVersion());
353  Printf(" Architecture-Compiler: %s", GetArchCompiler());
354  if (fSocket) {
355  if (strlen(GetGroup()) > 0) {
356  Printf(" User/Group: %s/%s", GetUser(), GetGroup());
357  } else {
358  Printf(" User: %s", GetUser());
359  }
360  if (fSocket->GetSecContext())
361  Printf(" Security context: %s", fSocket->GetSecContext()->AsString(sc));
362  Printf(" Proofd protocol version: %d", fSocket->GetRemoteProtocol());
363  Printf(" Image name: %s", GetImage());
364  Printf(" Working directory: %s", GetWorkDir());
365  Printf(" Performance index: %d", GetPerfIdx());
366  Printf(" MB's processed: %.2f", float(GetBytesRead())/(1024*1024));
367  Printf(" MB's sent: %.2f", float(fSocket->GetBytesRecv())/(1024*1024));
368  Printf(" MB's received: %.2f", float(fSocket->GetBytesSent())/(1024*1024));
369  Printf(" Real time used (s): %.3f", GetRealTime());
370  Printf(" CPU time used (s): %.3f", GetCpuTime());
371  } else {
372  if (strlen(GetGroup()) > 0) {
373  Printf(" User/Group: %s/%s", GetUser(), GetGroup());
374  } else {
375  Printf(" User: %s", GetUser());
376  }
377  Printf(" Security context:");
378  Printf(" Proofd protocol version:");
379  Printf(" Image name: %s", GetImage());
380  Printf(" Working directory: %s", GetWorkDir());
381  Printf(" Performance index: %d", GetPerfIdx());
382  Printf(" MB's processed: %.2f", float(GetBytesRead())/(1024*1024));
383  Printf(" MB's sent:");
384  Printf(" MB's received:");
385  Printf(" Real time used (s): %.3f", GetRealTime());
386  Printf(" CPU time used (s): %.3f", GetCpuTime());
387  }
388 }
389 
390 ////////////////////////////////////////////////////////////////////////////////
391 /// Adopt and register input handler for this slave. Handler will be deleted
392 /// by the slave.
393 
394 void TSlave::SetInputHandler(TFileHandler *ih)
395 {
396  fInput = ih;
397  fInput->Add();
398 }
399 
400 ////////////////////////////////////////////////////////////////////////////////
401 /// Setup authentication related stuff for old versions.
402 /// Provided for backward compatibility.
403 
404 Int_t TSlave::OldAuthSetup(Bool_t master, TString wconf)
405 {
406  static OldSlaveAuthSetup_t oldAuthSetupHook = 0;
407 
408  if (!oldAuthSetupHook) {
409  // Load libraries needed for (server) authentication ...
410  TString authlib = "libRootAuth";
411  char *p = 0;
412  // The generic one
413  if ((p = gSystem->DynamicPathName(authlib, kTRUE))) {
414  delete[] p;
415  if (gSystem->Load(authlib) == -1) {
416  Error("OldAuthSetup", "can't load %s",authlib.Data());
417  return kFALSE;
418  }
419  } else {
420  Error("OldAuthSetup", "can't locate %s",authlib.Data());
421  return -1;
422  }
423  //
424  // Locate OldSlaveAuthSetup
425  Func_t f = gSystem->DynFindSymbol(authlib,"OldSlaveAuthSetup");
426  if (f)
427  oldAuthSetupHook = (OldSlaveAuthSetup_t)(f);
428  else {
429  Error("OldAuthSetup", "can't find OldSlaveAuthSetup");
430  return -1;
431  }
432  }
433  //
434  // Setup
435  return (*oldAuthSetupHook)(fSocket, master, fOrdinal, wconf);
436 }
437 
438 ////////////////////////////////////////////////////////////////////////////////
439 /// Static method returning the appropriate TSlave object for the remote
440 /// server.
441 
442 TSlave *TSlave::Create(const char *url, const char *ord, Int_t perf,
443  const char *image, TProof *proof, Int_t stype,
444  const char *workdir, const char *msd, Int_t nwk)
445 {
446  TSlave *s = 0;
447 
448  // Check if we are setting up a lite version
449  if (!strcmp(url, "lite")) {
450  return new TSlaveLite(ord, perf, image, proof, stype, workdir, msd);
451  }
452 
453  // No need to try a XPD connection in some well defined cases
454  Bool_t tryxpd = kTRUE;
455  if (!(proof->IsMaster())) {
456  if (proof->IsProofd())
457  tryxpd = kFALSE;
458  } else {
459  if (gApplication && (gApplication->Argc() < 3 ||
460  (gApplication->Argc() > 2 && gApplication->Argv(2) &&
461  strncmp(gApplication->Argv(2),"xpd",3))))
462  tryxpd = kFALSE;
463  }
464 
465  // We do this without the plugin manager because it blocks the CINT mutex
466  // breaking the parallel startup
467  if (!fgTXSlaveHook) {
468 
469  // Load the library containing TXSlave ...
470  TString proofxlib = "libProofx";
471  char *p = 0;
472  if ((p = gSystem->DynamicPathName(proofxlib, kTRUE))) {
473  delete[] p;
474  if (gSystem->Load(proofxlib) == -1)
475  ::Error("TSlave::Create", "can't load %s", proofxlib.Data());
476  } else
477  ::Error("TSlave::Create", "can't locate %s", proofxlib.Data());
478  }
479 
480  // Load the right class
481  if (fgTXSlaveHook && tryxpd) {
482  s = (*fgTXSlaveHook)(url, ord, perf, image, proof, stype, workdir, msd, nwk);
483  } else {
484  s = new TSlave(url, ord, perf, image, proof, stype, workdir, msd);
485  }
486 
487  return s;
488 }
489 
490 ////////////////////////////////////////////////////////////////////////////////
491 /// Ping the remote master or slave servers.
492 /// Returns 0 if ok, -1 in case of error
493 
494 Int_t TSlave::Ping()
495 {
496  if (!IsValid()) return -1;
497 
498  TMessage mess(kPROOF_PING | kMESS_ACK);
499  fSocket->Send(mess);
500  if (fSocket->Send(mess) == -1) {
501  Warning("Ping","%s: acknowledgement not received", GetOrdinal());
502  return -1;
503  }
504  return 0;
505 }
506 
507 ////////////////////////////////////////////////////////////////////////////////
508 /// Send interrupt OOB byte to master or slave servers.
509 /// Returns 0 if ok, -1 in case of error
510 
511 void TSlave::Interrupt(Int_t type)
512 {
513  if (!IsValid()) return;
514 
515  char oobc = (char) type;
516  const int kBufSize = 1024;
517  char waste[kBufSize];
518 
519  // Send one byte out-of-band message to server
520  if (fSocket->SendRaw(&oobc, 1, kOob) <= 0) {
521  Error("Interrupt", "error sending oobc to slave %s", GetOrdinal());
522  return;
523  }
524 
525  if (type == TProof::kHardInterrupt) {
526  char oob_byte;
527  int n, nch, nbytes = 0, nloop = 0;
528 
529  // Receive the OOB byte
530  while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
531  if (n == -2) { // EWOULDBLOCK
532  //
533  // The OOB data has not yet arrived: flush the input stream
534  //
535  // In some systems (Solaris) regular recv() does not return upon
536  // receipt of the oob byte, which makes the below call to recv()
537  // block indefinitely if there are no other data in the queue.
538  // FIONREAD ioctl can be used to check if there are actually any
539  // data to be flushed. If not, wait for a while for the oob byte
540  // to arrive and try to read it again.
541  //
542  fSocket->GetOption(kBytesToRead, nch);
543  if (nch == 0) {
544  gSystem->Sleep(1000);
545  continue;
546  }
547 
548  if (nch > kBufSize) nch = kBufSize;
549  n = fSocket->RecvRaw(waste, nch);
550  if (n <= 0) {
551  Error("Interrupt", "error receiving waste from slave %s",
552  GetOrdinal());
553  break;
554  }
555  nbytes += n;
556  } else if (n == -3) { // EINVAL
557  //
558  // The OOB data has not arrived yet
559  //
560  gSystem->Sleep(100);
561  if (++nloop > 100) { // 10 seconds time-out
562  Error("Interrupt", "server %s does not respond", GetOrdinal());
563  break;
564  }
565  } else {
566  Error("Interrupt", "error receiving OOB from server %s",
567  GetOrdinal());
568  break;
569  }
570  }
571 
572  //
573  // Continue flushing the input socket stream until the OOB
574  // mark is reached
575  //
576  while (1) {
577  int atmark;
578 
579  fSocket->GetOption(kAtMark, atmark);
580 
581  if (atmark)
582  break;
583 
584  // find out number of bytes to read before atmark
585  fSocket->GetOption(kBytesToRead, nch);
586  if (nch == 0) {
587  gSystem->Sleep(1000);
588  continue;
589  }
590 
591  if (nch > kBufSize) nch = kBufSize;
592  n = fSocket->RecvRaw(waste, nch);
593  if (n <= 0) {
594  Error("Interrupt", "error receiving waste (2) from slave %s",
595  GetOrdinal());
596  break;
597  }
598  nbytes += n;
599  }
600  if (nbytes > 0) {
601  if (fProof->IsMaster())
602  Info("Interrupt", "slave %s:%s synchronized: %d bytes discarded",
603  GetName(), GetOrdinal(), nbytes);
604  else
605  Info("Interrupt", "PROOF synchronized: %d bytes discarded", nbytes);
606  }
607 
608  // Get log file from master or slave after a hard interrupt
609  fProof->Collect(this);
610 
611  } else if (type == TProof::kSoftInterrupt) {
612 
613  // Get log file from master or slave after a soft interrupt
614  fProof->Collect(this);
615 
616  } else if (type == TProof::kShutdownInterrupt) {
617 
618  ; // nothing expected to be returned
619 
620  } else {
621 
622  // Unexpected message, just receive log file
623  fProof->Collect(this);
624  }
625 }
626 
627 ////////////////////////////////////////////////////////////////////////////////
628 /// Sent stop/abort request to PROOF server.
629 
630 void TSlave::StopProcess(Bool_t abort, Int_t timeout)
631 {
632  // Notify the remote counterpart
633  TMessage msg(kPROOF_STOPPROCESS);
634  msg << abort;
635  if (fProof->fProtocol > 9)
636  msg << timeout;
637  fSocket->Send(msg);
638 }
639 
640 ////////////////////////////////////////////////////////////////////////////////
641 /// Send message to intermediate coordinator. Only meaningful when there is one,
642 /// i.e. in XPD framework
643 
644 TObjString *TSlave::SendCoordinator(Int_t, const char *, Int_t)
645 {
646  if (gDebug > 0)
647  Info("SendCoordinator","method not implemented for this communication layer");
648  return 0;
649 }
650 
651 ////////////////////////////////////////////////////////////////////////////////
652 /// Set an alias for this session. If reconnection is supported, the alias
653 /// will be communicated to the remote coordinator so that it can be recovered
654 /// when reconnecting
655 
656 void TSlave::SetAlias(const char *)
657 {
658  if (gDebug > 0)
659  Info("SetAlias","method not implemented for this communication layer");
660  return;
661 }
662 
663 ////////////////////////////////////////////////////////////////////////////////
664 /// Set hook to TXSlave ctor
665 
666 void TSlave::SetTXSlaveHook(TSlave_t xslavehook)
667 {
668  fgTXSlaveHook = xslavehook;
669 }