39 #include <sys/types.h>
47 Bool_t TARInterruptHandler::Notify()
49 Info(
"Notify",
"Processing interrupt signal ...");
52 fApplicationRemote->Interrupt(kRRI_Hard);
58 ClassImp(TApplicationRemote);
60 static const char *gScript =
"roots";
61 static const char *gScriptCmd =
"\\\"%s %d localhost:%d/%s -d=%d\\\"";
63 static const char *gSshCmd =
"ssh %s -f4 %s -R %d:localhost:%d sh -c \
64 \"'(sh=\\`basename \'\\\\\\$SHELL\'\\`; \
65 if test xbash = x\'\\\\\\$sh\' -o xsh = x\'\\\\\\$sh\' -o xzsh = x\'\\\\\\$sh\' -o xdash = x\'\\\\\\$sh\'; then \
66 \'\\\\\\$SHELL\' -l -c %s; \
67 elif test xcsh = x\'\\\\\\$sh\' -o xtcsh = x\'\\\\\\$sh\' -o xksh = x\'\\\\\\$sh\'; then \
68 \'\\\\\\$SHELL\' -c %s; \
70 echo \\\"Unknown shell \'\\\\\\$SHELL\'\\\"; \
73 static const char *gSshCmd =
"ssh %s -f4 %s -R %d:localhost:%d sh -c \
74 \"'(sh=`basename $SHELL`; \
75 if test xbash = x$sh -o xsh = x$sh -o xzsh = x$sh -o xdash = x$sh; then \
77 elif test xcsh = x$sh -o xtcsh = x$sh -o xksh = x$sh; then \
80 echo \"Unknown shell $SHELL\"; \
84 Int_t TApplicationRemote::fgPortAttempts = 100;
85 Int_t TApplicationRemote::fgPortLower = 49152;
86 Int_t TApplicationRemote::fgPortUpper = 65535;
93 TApplicationRemote::TApplicationRemote(
const char *url, Int_t debug,
95 : TApplication(), fUrl(url)
98 fName = fUrl.GetHost();
99 if (strlen(fUrl.GetOptions()) > 0)
100 fName += Form(
"-%s", fUrl.GetOptions());
101 UserGroup_t *pw = gSystem->GetUserInfo(gSystem->GetEffectiveUid());
102 TString user = (pw) ? (
const char*) pw->fUser :
"";
104 if (strlen(fUrl.GetUser()) > 0 && user != fUrl.GetUser())
105 fName.Insert(0,Form(
"%s@", fUrl.GetUser()));
114 ResetBit(kCollecting);
118 Int_t na = fgPortAttempts;
119 Long64_t now = gSystem->Now();
120 std::default_random_engine randomEngine(now);
121 std::uniform_int_distribution<Int_t> randomPort(fgPortLower, fgPortUpper);
122 TServerSocket *ss = 0;
124 port = randomPort(randomEngine);
125 ss =
new TServerSocket(port);
129 if (!ss || !ss->IsValid()) {
130 Error(
"TApplicationRemote",
"unable to find a free port for connections");
131 SetBit(kInvalidObject);
136 TMonitor *mon =
new TMonitor;
140 Int_t rport = (port < fgPortUpper) ? port + 1 : port - 1;
141 TString sc = gScript;
142 if (script && *script) {
144 if (script[1] ==
'<') {
146 sc.Form(
"source %s; %s", script+2, gScript);
148 Error(
"TApplicationRemote",
"illegal script name <");
152 sc.ReplaceAll(
"\"",
"");
153 TString userhost = fUrl.GetHost();
154 if (strlen(fUrl.GetUser()) > 0)
155 userhost.Insert(0, Form(
"%s@", fUrl.GetUser()));
156 const char *verb =
"";
160 scriptCmd.Form(gScriptCmd, sc.Data(), kRRemote_Protocol, rport, fUrl.GetFile(), debug);
162 cmd.Form(gSshCmd, verb, userhost.Data(), rport, port, scriptCmd.Data(), scriptCmd.Data());
165 TApplication::NeedGraphicsLibs();
166 gApplication->InitializeGraphics();
169 Info(
"TApplicationRemote",
"executing: %s", cmd.Data());
170 if (gSystem->Exec(cmd) != 0) {
171 Info(
"TApplicationRemote",
"an error occured during SSH connection");
172 mon->DeActivateAll();
176 SetBit(kInvalidObject);
184 if (!(fSocket = ss->Accept())) {
185 Error(
"TApplicationRemote",
"failed to open connection");
186 SetBit(kInvalidObject);
191 mon->DeActivateAll();
198 if (fSocket->Recv(buf,
sizeof(buf), what) <= 0) {
199 Error(
"TApplicationRemote",
"failed to receive startup message");
201 SetBit(kInvalidObject);
207 if (fSocket->Recv(fProtocol, what) != 2*
sizeof(Int_t)) {
208 Error(
"TApplicationRemote",
"failed to receive remote server protocol");
210 SetBit(kInvalidObject);
213 if (fProtocol != kRRemote_Protocol)
214 Info(
"TApplicationRemote",
"server runs a different protocol version: %d (vs %d)",
215 fProtocol, kRRemote_Protocol);
219 if (fSocket->Recv(msg) < 0 || msg->What() != kMESS_ANY) {
220 Error(
"TApplicationRemote",
"failed to receive server info - protocol error");
222 SetBit(kInvalidObject);
228 (*msg) >> hostname >> fLogFilePath;
229 fUrl.SetHost(hostname);
232 fMonitor =
new TMonitor;
233 fMonitor->Add(fSocket);
236 fIntHandler =
new TARInterruptHandler(
this);
239 gROOT->GetListOfSockets()->Remove(fSocket);
240 gROOT->GetListOfSockets()->Add(
this);
242 fRootFiles =
new TList;
243 fRootFiles->SetName(
"Files");
255 TApplicationRemote::~TApplicationRemote()
257 gROOT->GetListOfSockets()->Remove(
this);
265 Int_t TApplicationRemote::Broadcast(
const TMessage &mess)
267 if (!IsValid())
return -1;
269 if (fSocket->Send(mess) == -1) {
270 Error(
"Broadcast",
"could not send message");
282 Int_t TApplicationRemote::Broadcast(
const char *str, Int_t kind, Int_t type)
285 if (kind == kMESS_ANY)
287 if (str) mess.WriteString(str);
288 return Broadcast(mess);
296 Int_t TApplicationRemote::BroadcastObject(
const TObject *obj, Int_t kind)
299 mess.WriteObject(obj);
300 return Broadcast(mess);
307 Int_t TApplicationRemote::BroadcastRaw(
const void *buffer, Int_t length)
309 if (!IsValid())
return -1;
311 if (fSocket->SendRaw(buffer, length) == -1) {
312 Error(
"Broadcast",
"could not send raw buffer");
325 Int_t TApplicationRemote::Collect(Long_t timeout)
328 fMonitor->ActivateAll();
329 if (!fMonitor->GetActive())
333 Long_t nto = timeout;
335 Info(
"Collect",
"active: %d", fMonitor->GetActive());
344 Int_t rc = 0, cnt = 0;
345 while (fMonitor->GetActive() && (nto < 0 || nto > 0)) {
348 TSocket *s = fMonitor->Select(1000);
350 if (s && s != (TSocket *)(-1)) {
352 if ((rc = CollectInput()) != 0) {
354 fMonitor->DeActivate(s);
356 Info(
"Collect",
"deactivating %p", s);
368 fMonitor->DeActivateAll();
370 if (s == (TSocket *)(-1) && nto > 0)
376 ResetBit(kCollecting);
380 fMonitor->DeActivateAll();
384 fIntHandler->Remove();
393 Int_t TApplicationRemote::CollectInput()
401 Bool_t delete_mess = kTRUE;
403 if (fSocket->Recv(mess) < 0) {
404 SetBit(kInvalidObject);
410 SetBit(kInvalidObject);
418 Info(
"CollectInput",
"what %d", what);
424 TObject *o = mess->ReadObject(mess->GetClass());
426 if (TString(o->ClassName()) ==
"TCanvas")
428 else if (TString(o->ClassName()) ==
"TRemoteObject") {
429 TRemoteObject *robj = (TRemoteObject *)o;
430 if (TString(robj->GetClassName()) ==
"TSystemDirectory") {
431 if (fWorkingDir == 0) {
432 fWorkingDir = (TRemoteObject *)o;
436 else if (TString(o->ClassName()) ==
"TList") {
437 TList *list = (TList *)o;
438 TRemoteObject *robj = (TRemoteObject *)list->First();
439 if (robj && (TString(robj->GetClassName()) ==
"TFile")) {
441 while ((robj = (TRemoteObject *)next())) {
442 if (!fRootFiles->FindObject(robj->GetName()))
443 fRootFiles->Add(robj);
445 gROOT->RefreshBrowsers();
458 Info(
"CollectInput",
"type %d", type);
464 mess->ReadString(str,
sizeof(str));
465 obj = gDirectory->Get(str);
467 fSocket->SendObject(obj);
469 Warning(
"CollectInput",
470 "server requested an object that we do not have");
471 fSocket->Send(kMESS_NOTOK);
493 SetBit(kInvalidObject);
496 Info(
"CollectInput",
"kRTT_LogDone: status %d", st);
504 (*mess) >> msg >> lfeed;
506 fprintf(stderr,
"%s\n", msg.Data());
508 fprintf(stderr,
"%s\r", msg.Data());
516 TMessage m(kMESS_ANY);
517 m << (Int_t) kRRT_SendFile;
520 char *imp = gSystem->Which(TROOT::GetMacroPath(), fname, kReadPermission);
522 Error(
"CollectInput",
"file %s not found in path(s) %s",
523 fname.Data(), TROOT::GetMacroPath());
524 m << (Bool_t) kFALSE;
527 TString impfile = imp;
529 Int_t dot = impfile.Last(
'.');
532 Bool_t hasHeader = kTRUE;
533 TString headfile = impfile;
535 headfile.Remove(dot);
537 if (gSystem->AccessPathName(headfile, kReadPermission)) {
538 TString h = headfile;
539 headfile.Remove(dot);
541 if (gSystem->AccessPathName(headfile, kReadPermission)) {
544 Info(
"CollectInput",
"no associated header file"
545 " found: tried: %s %s",
546 h.Data(), headfile.Data());
553 if (SendFile(impfile, kForce) == -1) {
554 Info(
"CollectInput",
"problems sending file %s", impfile.Data());
559 if (SendFile(headfile, kForce) == -1) {
560 Info(
"CollectInput",
"problems sending file %s", headfile.Data());
567 m << (Int_t) kRRT_SendFile;
568 m << (Bool_t) kFALSE;
574 Warning(
"CollectInput",
"unknown type received from server: %d", type);
582 Error(
"CollectInput",
"unknown command received from server: %d", what);
583 SetBit(kInvalidObject);
601 void TApplicationRemote::RecvLogFile(Int_t size)
603 const Int_t kMAXBUF = 16384;
607 Int_t fdout = fileno(stdout);
609 Warning(
"RecvLogFile",
"file descriptor for outputs undefined (%d):"
610 " will not log msgs", fdout);
613 lseek(fdout, (off_t) 0, SEEK_END);
618 while (filesize < size) {
619 left = Int_t(size - filesize);
622 rec = fSocket->RecvRaw(&buf, left);
623 filesize = (rec > 0) ? (filesize + rec) : filesize;
631 w = write(fdout, p, r);
634 SysError(
"RecvLogFile",
"error writing to unit: %d", fdout);
640 }
else if (rec < 0) {
641 Error(
"RecvLogFile",
"error during receiving log file");
651 Int_t TApplicationRemote::SendObject(
const TObject *obj)
653 if (!IsValid() || !obj)
return -1;
655 TMessage mess(kMESS_OBJECT);
656 mess.WriteObject(obj);
657 return Broadcast(mess);
673 Bool_t TApplicationRemote::CheckFile(
const char *file, Long_t modtime)
675 Bool_t sendto = kFALSE;
677 if (!IsValid())
return -1;
680 TString fn = gSystem->BaseName(file);
684 if (fFileList && (fs = (TARFileStat *) fFileList->FindObject(fn))) {
686 if (fs->fModtime != modtime) {
687 TMD5 *md5 = TMD5::FileChecksum(file);
689 if ((*md5) != fs->fMD5) {
692 fs->fModtime = modtime;
696 Error(
"CheckFile",
"could not calculate local MD5 check sum - dont send");
702 TMD5 *md5 = TMD5::FileChecksum(file);
704 fs =
new TARFileStat(fn, md5, modtime);
706 fFileList =
new THashList;
710 Error(
"CheckFile",
"could not calculate local MD5 check sum - dont send");
713 TMessage mess(kMESS_ANY);
714 mess << Int_t(kRRT_CheckFile) << TString(gSystem->BaseName(file)) << fs->fMD5;
718 fSocket->Recv(reply);
720 if (reply->What() == kMESS_ANY) {
724 (*reply) >> type >> uptodate;
725 if (type != kRRT_CheckFile) {
727 Warning(
"CheckFile",
"received wrong type:"
728 " %d (expected %d): protocol error?",
729 type, (Int_t)kRRT_CheckFile);
731 sendto = uptodate ? kFALSE : kTRUE;
734 Error(
"CheckFile",
"received wrong message: %d (expected %d)",
735 reply->What(), kMESS_ANY);
738 Error(
"CheckFile",
"received empty message");
761 Int_t TApplicationRemote::SendFile(
const char *file, Int_t opt,
const char *rfile)
763 if (!IsValid())
return -1;
766 Int_t fd = open(file, O_RDONLY);
768 Int_t fd = open(file, O_RDONLY | O_BINARY);
771 SysError(
"SendFile",
"cannot open file %s", file);
777 Long_t id, flags, modtime;
778 if (gSystem->GetPathInfo(file, &
id, &size, &flags, &modtime) == 1) {
779 Error(
"SendFile",
"cannot stat file %s", file);
784 Error(
"SendFile",
"empty file %s", file);
790 Bool_t bin = (opt & kBinary) ? kTRUE : kFALSE;
791 Bool_t force = (opt & kForce) ? kTRUE : kFALSE;
793 const Int_t kMAXBUF = 32768;
796 const char *fnam = (rfile) ? rfile : gSystem->BaseName(file);
798 Bool_t sendto = force ? kTRUE : CheckFile(file, modtime);
802 size = sendto ? size : 0;
804 if (gDebug > 1 && size > 0)
805 Info(
"SendFile",
"sending file %s", file);
807 snprintf(buf, kMAXBUF,
"%s %d %lld", fnam, bin, size);
808 if (Broadcast(buf, kMESS_ANY, kRRT_File) == -1) {
816 lseek(fd, 0, SEEK_SET);
820 while ((len = read(fd, buf, kMAXBUF)) < 0 && TSystem::GetErrno() == EINTR)
821 TSystem::ResetErrno();
824 SysError(
"SendFile",
"error reading from file %s", file);
830 if (len > 0 && fSocket->SendRaw(buf, len) == -1) {
831 SysError(
"SendFile",
"error writing to server @ %s:%d (now offline)",
832 fUrl.GetHost(), fUrl.GetPort());
842 if (!TestBit(kCollecting))
846 return IsValid() ? 0 : -1;
852 void TApplicationRemote::Terminate(Int_t status)
854 TMessage mess(kMESS_ANY);
855 mess << (Int_t)kRRT_Terminate << status;
858 SafeDelete(fRootFiles);
859 SafeDelete(fMonitor);
866 void TApplicationRemote::SetPortParam(Int_t lower, Int_t upper, Int_t attempts)
873 fgPortAttempts = attempts;
875 ::Info(
"TApplicationRemote::SetPortParam",
"port scan: %d attempts in [%d,%d]",
876 fgPortAttempts, fgPortLower, fgPortUpper);
885 Long_t TApplicationRemote::ProcessLine(
const char *line, Bool_t, Int_t *)
887 if (!line || !*line)
return 0;
889 if (!strncasecmp(line,
".q", 2)) {
891 gApplication->ProcessLine(
".R -close");
895 if (!strncmp(line,
"?", 1)) {
903 InitializeGraphics();
906 Broadcast(line, kMESS_CINT);
912 return (Long_t)fReceivedObject;
919 void TApplicationRemote::Print(Option_t *opt)
const
921 TString s(Form(
"OBJ: TApplicationRemote %s", fName.Data()));
922 Printf(
"%s", s.Data());
923 if (opt && opt[0] ==
'F') {
925 if (strlen(fUrl.GetUser()) > 0)
926 s += Form(
"%s@", fUrl.GetUser());
927 s += fUrl.GetHostFQDN();
928 s += Form(
" logfile: %s", fLogFilePath.Data());
929 Printf(
"%s", s.Data());
936 void TApplicationRemote::Interrupt(Int_t type)
938 if (!IsValid())
return;
943 Info(
"Interrupt",
"*** Ctrl-C not yet enabled *** (type= %d)", type);
947 char oobc = (char) type;
948 const int kBufSize = 1024;
949 char waste[kBufSize];
952 if (fSocket->SendRaw(&oobc, 1, kOob) <= 0) {
953 Error(
"Interrupt",
"error sending oobc to server");
957 if (type == kRRI_Hard) {
959 int n, nch, nbytes = 0, nloop = 0;
962 while ((n = fSocket->RecvRaw(&oob_byte, 1, kOob)) < 0) {
974 fSocket->GetOption(kBytesToRead, nch);
976 gSystem->Sleep(1000);
980 if (nch > kBufSize) nch = kBufSize;
981 n = fSocket->RecvRaw(waste, nch);
983 Error(
"Interrupt",
"error receiving waste from server");
987 }
else if (n == -3) {
993 Error(
"Interrupt",
"server does not respond");
997 Error(
"Interrupt",
"error receiving OOB from server");
1009 fSocket->GetOption(kAtMark, atmark);
1015 fSocket->GetOption(kBytesToRead, nch);
1017 gSystem->Sleep(1000);
1021 if (nch > kBufSize) nch = kBufSize;
1022 n = fSocket->RecvRaw(waste, nch);
1024 Error(
"Interrupt",
"error receiving waste (2) from server");
1030 Info(
"Interrupt",
"server synchronized: %d bytes discarded", nbytes);
1035 }
else if (type == kRRI_Soft) {
1040 }
else if (type == kRRI_Shutdown) {
1055 void TApplicationRemote::Browse(TBrowser *b)
1057 b->Add(fRootFiles,
"ROOT Files");
1058 b->Add(fWorkingDir, fWorkingDir->GetTitle());
1059 gROOT->RefreshBrowsers();