27 #include "Xrd/XrdBuffer.hh"
29 #include "Xrd/XrdScheduler.hh"
30 #include "XrdNet/XrdNet.hh"
31 #include "XrdOuc/XrdOucRash.hh"
32 #include "XrdOuc/XrdOucStream.hh"
34 #include "XrdSys/XrdSysPlugin.hh"
52 XrdProofGroupMgr *fGroupMgr;
54 } XpdBroadcastPriority_t;
56 XrdProofdManager *fMgr;
57 XrdProofdClient *fClient;
// PutEnv(x,e): when e is true, hand x to putenv(); otherwise release x with delete[].
// putenv() keeps the pointer it is given (POSIX), which is presumably why x is
// not freed in the first branch - confirm against the callers.
63 #define PutEnv(x,e) { if (e) { putenv(x); } else { delete[] x; } }
// File-scope context object; its address is the argument given to the cron and
// recover threads started further down in this file.
69 static XpdManagerCron_t fManagerCron;
// Entry point of the session manager cron thread (it is the function passed to
// XrdSysThread::Run by XrdProofdProofServMgr::Config).
// p points to an XpdManagerCron_t from which the session manager and the
// scheduler are taken. The visible body polls the manager pipe, dispatches on
// the type of the received message and runs the periodic session checks.
// NOTE(review): this listing is an excerpt - the source lines between the
// numbered ones (braces, declarations, returns) are not shown, so only part of
// the control flow can be read here.
82 void *XrdProofdProofServCron(
void *p)
84 XPDLOC(SMGR,
"ProofServCron")
// Unpack the context handed over by the thread starter.
86 XpdManagerCron_t *mc = (XpdManagerCron_t *)p;
87 XrdProofdProofServMgr *mgr = mc->fSessionMgr;
88 XrdProofSched *sched = mc->fProofSched;
90 TRACE(XERR,
"undefined session manager: cannot start");
// Upper bound, in seconds, for a single wait on the pipe (see waitt below).
96 int quickcheckfreq = 5;
100 int lastrun = time(0);
101 int lastcheck = lastrun, ckfreq = mgr->CheckFrequency(), waitt = 0;
// deltat is one tenth of the check frequency, never below 1.
102 int deltat = ((int)(0.1*ckfreq) >= 1) ? (
int)(0.1*ckfreq) : 1;
103 int maxdelay = 5*ckfreq;
104 mgr->SetNextSessionsCheck(lastcheck + ckfreq);
105 TRACE(ALL,
"next full sessions check in "<<ckfreq<<
" secs");
// Wait for the time left until the next check; quickcheckfreq is used instead
// when that time is larger than quickcheckfreq or not positive.
110 waitt = ckfreq - (time(0) - lastcheck);
111 if (waitt > quickcheckfreq || waitt <= 0)
112 waitt = quickcheckfreq;
113 int pollRet = mgr->Pipe()->Poll(waitt);
119 if ((rc = mgr->Pipe()->Recv(msg)) != 0) {
120 TRACE(XERR,
"problems receiving message; errno: "<<-rc);
// Dispatch on msg.Type().
// kSessionRemoval: the message carries the session id (fpid); the session is
// deleted from the manager, its admin file is moved and the scheduler pipe is
// posted with kReschedule.
124 if (msg.Type() == XrdProofdProofServMgr::kSessionRemoval) {
127 if ((rc = msg.Get(fpid)) != 0) {
128 TRACE(XERR,
"kSessionRemoval: problems receiving process ID (buf: '"<<
129 msg.Buf()<<
"'); errno: "<<-rc);
132 XrdSysMutexHelper mhp(mgr->Mutex());
134 mgr->DeleteFromSessions(fpid.c_str());
136 mgr->MvSession(fpid.c_str());
139 if (sched->Pipe()->Post(XrdProofSched::kReschedule, 0) != 0) {
140 TRACE(XERR,
"kSessionRemoval: problem posting the scheduler pipe");
144 TRACE(REQ,
"kSessionRemoval: session: "<<fpid<<
145 " has been removed from the active list");
146 }
else if (msg.Type() == XrdProofdProofServMgr::kClientDisconnect) {
148 TRACE(XERR,
"obsolete type: XrdProofdProofServMgr::kClientDisconnect");
149 }
else if (msg.Type() == XrdProofdProofServMgr::kCleanSessions) {
// kCleanSessions: the message carries a user name and a server type.
151 XpdSrvMgrCreateCnt cnt(mgr, XrdProofdProofServMgr::kCleanSessionsCnt);
154 int svrtype = kXPD_AnyServer;
155 rc = (rc == 0) ? msg.Get(svrtype) : rc;
157 TRACE(XERR,
"kCleanSessions: problems parsing message (buf: '"<<
158 msg.Buf()<<
"'); errno: "<<-rc);
162 TRACE(REQ,
"kCleanSessions: request for user: '"<<usr<<
"', server type: "<<svrtype);
164 mgr->CleanClientSessions(usr.c_str(), svrtype);
166 mgr->CleanupLostProofServ();
167 }
else if (msg.Type() == XrdProofdProofServMgr::kProcessReq) {
// kProcessReq: release the semaphore that Process() waits on.
169 mgr->ProcessSem()->Post();
170 }
else if (msg.Type() == XrdProofdProofServMgr::kChgSessionSt) {
172 mgr->BroadcastClusterInfo();
174 TRACE(XERR,
"unknown type: "<<msg.Type());
// Periodic part.
// NOTE(review): it looks like the check is postponed by 5 secs while
// (now - lastrun) < maxdelay and forced afterwards, with the kProcessCnt
// counter reset; the enclosing condition on cnt is not shown - confirm.
183 int cnt = mgr->CheckCounter(XrdProofdProofServMgr::kProcessCnt);
185 if ((now - lastrun) < maxdelay) {
187 lastcheck = now + 5 - ckfreq;
188 mgr->SetNextSessionsCheck(now + 5);
190 TRACE(ALL,
"postponing sessions check (will retry in 5 secs)");
194 TRACE(ALL,
"Max time without checks reached ("<<maxdelay<<
"): force a session check");
196 mgr->UpdateCounter(XrdProofdProofServMgr::kProcessCnt, -cnt);
// full is true once now is within deltat of the scheduled check time.
200 bool full = (now > mgr->NextSessionsCheck() - deltat) ? 1 : 0;
203 mgr->CheckActiveSessions();
204 mgr->CheckTerminatedSessions();
205 if (clnlostscale <= 0) {
206 mgr->CleanupLostProofServ();
212 int cursess = mgr->CurrentSessions(1);
213 TRACE(ALL, cursess <<
" sessions are currently active");
217 mgr->SetNextSessionsCheck(lastcheck + mgr->CheckFrequency());
219 TRACE(ALL,
"next sessions check in "<<mgr->CheckFrequency()<<
" secs");
221 TRACE(HDBG,
"nothing to do; "<<mgr->NextSessionsCheck()-now<<
" secs to full check");
// Entry point of the session recover thread (it is the function passed to
// XrdSysThread::Run by PrepareSessionRecovering).
// p points to an XpdManagerCron_t; the session manager taken from it runs
// RecoverActiveSessions() and the outcome is logged.
// NOTE(review): the conditions selecting the three log messages are on lines
// not shown in this excerpt.
240 void *XrdProofdProofServRecover(
void *p)
242 XPDLOC(SMGR,
"ProofServRecover")
244 XpdManagerCron_t *mc = (XpdManagerCron_t *)p;
245 XrdProofdProofServMgr *mgr = mc->fSessionMgr;
247 TRACE(XERR,
"undefined session manager: cannot start");
// rc is reported below as the number of sessions not recovered.
252 int rc = mgr->RecoverActiveSessions();
256 TRACE(ALL,
"timeout recovering sessions: "<<rc<<
" sessions not recovered");
258 TRACE(XERR,
"some problem occured while recovering sessions");
260 TRACE(ALL,
"recovering successfully terminated");
// Constructor. Initialises the XrdProofdConfig base with the config file name
// taken from pi, sets the default timeouts and counters, checks that the
// internal pipe is valid and registers the configuration directives.
// NOTE(review): the use of mgr and the body of the counter loop are on lines
// not shown in this excerpt.
270 XrdProofdProofServMgr::XrdProofdProofServMgr(XrdProofdManager *mgr,
271 XrdProtocol_Config *pi, XrdSysError *e)
272 : XrdProofdConfig(pi->ConfigFN, e), fProcessSem(0)
274 XPDLOC(SMGR,
"XrdProofdProofServMgr")
277 fLogger = pi->eDest->logger();
279 fActiveSessions.clear();
283 fReconnectTimeOut = 300;
284 fNextSessionsCheck = -1;
286 for (
int i = 0; i < PSMMAXCNTS; i++) {
289 fCurrentSessions = 0;
// Defaults: the termination and verify timeouts are derived from the check
// frequency (30 - 10 and 3 * 30 respectively).
296 fCheckFrequency = 30;
297 fTerminationTimeOut = fCheckFrequency - 10;
298 fVerifyTimeOut = 3 * fCheckFrequency;
299 fRecoverTimeOut = 10;
302 fParentExecs =
"xproofd,xrootd";
306 fRecoverDeadline = -1;
309 if (!fPipe.IsValid()) {
310 TRACE(XERR,
"unable to generate pipe for the session poller");
315 RegisterDirectives();
// Run the configuration of the session manager.
// rcf is true for a re-configuration; in that case the informational printouts
// guarded by notify are suppressed and, when ReadFile(0) is non-zero, the
// RC and ENV lists are cleared before parsing.
// The visible steps are: parse the directives, build and assert the admin
// paths for active and terminated sessions, print the RC / ENV settings,
// prepare the recovery of active sessions and start the cron thread.
// NOTE(review): return statements and several conditions are on lines not
// shown in this excerpt.
322 int XrdProofdProofServMgr::Config(
bool rcf)
324 XPDLOC(SMGR,
"ProofServMgr::Config")
326 XrdSysMutexHelper mhp(fEnvsMutex);
328 bool notify = (rcf) ? 0 : 1;
329 if (rcf && ReadFile(0)) {
331 fProofServRCs.clear();
332 fProofServEnvs.clear();
338 if (XrdProofdConfig::Config(rcf) != 0) {
339 TRACE(XERR,
"problems parsing file ");
344 msg = (rcf) ?
"re-configuring" :
"configuring";
345 if (notify) XPDPRT(msg);
348 XPDFORM(msg,
"setting internal timeout to %d secs", fInternalWait);
349 if (notify) XPDPRT(msg);
352 msg =
"client sessions shutdown after disconnection";
353 if (fShutdownOpt > 0) {
354 XPDFORM(msg,
"client sessions kept %sfor %d secs after disconnection",
355 (fShutdownOpt == 1) ?
"idle " :
"", fShutdownDelay);
357 if (notify) XPDPRT(msg);
// Admin paths are sub-directories of the manager admin path.
361 fActiAdminPath = fMgr->AdminPath();
362 fActiAdminPath +=
"/activesessions";
363 fTermAdminPath = fMgr->AdminPath();
364 fTermAdminPath +=
"/terminatedsessions";
368 XrdProofdAux::GetUserInfo(fMgr->EffectiveUser(), ui);
369 if (XrdProofdAux::AssertDir(fActiAdminPath.c_str(), ui, 1) != 0) {
370 TRACE(XERR,
"unable to assert the admin path: "<<fActiAdminPath);
374 XPDPRT(
"active sessions admin path set to: "<<fActiAdminPath);
376 if (XrdProofdAux::AssertDir(fTermAdminPath.c_str(), ui, 1) != 0) {
377 TRACE(XERR,
"unable to assert the admin path "<<fTermAdminPath);
381 XPDPRT(
"terminated sessions admin path set to "<<fTermAdminPath);
385 XPDPRT(
"RC settings: "<< fProofServRCs.size());
386 if (fProofServRCs.size() > 0) {
387 std::list<XpdEnv>::iterator ircs = fProofServRCs.begin();
388 for ( ; ircs != fProofServRCs.end(); ++ircs) { (*ircs).Print(
"rc"); }
390 XPDPRT(
"ENV settings: "<< fProofServEnvs.size());
391 if (fProofServEnvs.size() > 0) {
392 std::list<XpdEnv>::iterator ienvs = fProofServEnvs.begin();
393 for ( ; ienvs != fProofServEnvs.end(); ++ienvs) { (*ienvs).Print(
"env"); }
398 XPDFORM(msg,
"using %s to start proofserv sessions", fUseFork ?
"fork()" :
"system()");
399 if (notify) XPDPRT(msg);
// A negative return from PrepareSessionRecovering() is reported as an error.
404 if ((nr = PrepareSessionRecovering()) < 0) {
405 TRACE(XERR,
"problems trying to recover active sessions");
407 XPDFORM(msg,
"%d active sessions have been recovered", nr);
// Fill the shared context and start the cron thread on it.
414 fManagerCron.fClientMgr = fMgr->ClientMgr();
415 fManagerCron.fSessionMgr =
this;
416 if (XrdSysThread::Run(&tid, XrdProofdProofServCron,
417 (
void *)&fManagerCron, 0,
"ProofServMgr cron thread") != 0) {
418 TRACE(XERR,
"could not start cron thread");
421 XPDPRT(
"cron thread started");
// Record a new active session: the session info built from the client of p and
// from s is saved to a file named <user>.<group>.<pid> under the active
// sessions admin path.
// NOTE(review): p itself is dereferenced without a null test in the visible
// lines; the return statements are on lines not shown in this excerpt.
431 int XrdProofdProofServMgr::AddSession(XrdProofdProtocol *p, XrdProofdProofServ *s)
433 XPDLOC(SMGR,
"ProofServMgr::AddSession")
435 TRACE(REQ, "adding new active session ...");
438 if (!s || !p->Client()) {
439 TRACE(XERR,
"invalid inputs: "<<(s ?
"" :
"s, ") <<
", "<< (p->Client() ?
"" :
"p->Client()"));
442 XrdProofdClient *c = p->Client();
446 XPDFORM(path,
"%s/%s.%s.%d", fActiAdminPath.c_str(), c->User(), c->Group(), s->SrvPID());
449 XrdProofSessionInfo info(c, s);
450 int rc = info.SaveToFile(path.c_str());
// Check whether fpid names the UNIX socket of a session.
// Names not ending with .sock give 0. Otherwise the full socket path is built
// under the active sessions admin path when needed and the matching admin path
// (same name without .sock) is checked with stat(); when that path does not
// exist and the kCreateCnt counter is not positive the socket file is removed.
// NOTE(review): the final return values are on lines not shown in this excerpt.
459 bool XrdProofdProofServMgr::IsSessionSocket(
const char *fpid)
461 XPDLOC(SMGR,
"ProofServMgr::IsSessionSocket")
// fpid is validated only below, so it must not be streamed unguarded here:
// inserting a null const char* with operator<< is undefined behaviour for
// standard streams. Same guard as used by TouchSession.
463 TRACE(REQ, "checking "<<(fpid ? fpid : "<nul>")<<" ...");
466 if (!fpid || strlen(fpid) <= 0) {
467 TRACE(XERR,
"invalid input: "<<(fpid ? fpid :
"<nul>"));
472 XrdOucString spath(fpid);
473 if (!spath.endswith(
".sock"))
return 0;
474 if (!spath.beginswith(fActiAdminPath.c_str())) {
476 XPDFORM(spath,
"%s/%s", fActiAdminPath.c_str(), fpid);
// Admin path of the session: the socket path without its .sock suffix.
478 XrdOucString apath = spath;
479 apath.replace(
".sock",
"");
483 if (stat(apath.c_str(), &st) != 0 && (errno == ENOENT)) {
485 if (CheckCounter(kCreateCnt) <= 0) {
486 unlink(spath.c_str());
487 TRACE(REQ,
"missing admin path: removing "<<spath<<
" ...");
// Move the admin file of session fpid from the active sessions area to the
// terminated sessions area.
// The old path is built under the active admin path when fpid is not already
// a full path, with any .status suffix removed; the socket file and the status
// file of the session are unlinked; the admin file is then renamed and, on
// success or when it did not exist (ENOENT), the new path is touched.
// NOTE(review): the assignments giving npath and spath their final values and
// the return statements are on lines not shown in this excerpt.
498 int XrdProofdProofServMgr::MvSession(
const char *fpid)
500 XPDLOC(SMGR,
"ProofServMgr::MvSession")
// fpid is validated only below, so it must not be streamed unguarded here:
// inserting a null const char* with operator<< is undefined behaviour for
// standard streams. Same guard as used by TouchSession.
502 TRACE(REQ, "moving "<<(fpid ? fpid : "<nul>")<<" ...");
505 if (!fpid || strlen(fpid) <= 0) {
506 TRACE(XERR,
"invalid input: "<<(fpid ? fpid :
"<nul>"));
511 XrdOucString opath(fpid), npath;
512 if (!opath.beginswith(fActiAdminPath.c_str())) {
514 XPDFORM(opath,
"%s/%s", fActiAdminPath.c_str(), fpid);
515 opath.replace(
".status",
"");
518 opath.replace(
".status",
"");
522 npath.replace(fActiAdminPath.c_str(), fTermAdminPath.c_str());
525 XrdOucString spath = opath;
// A missing file (ENOENT) is not reported as a problem for either unlink.
527 if (unlink(spath.c_str()) != 0 && errno != ENOENT)
528 TRACE(XERR,
"problems removing the UNIX socket path: "<<spath<<
"; errno: "<<errno);
529 spath.replace(
".sock",
".status");
530 if (unlink(spath.c_str()) != 0 && errno != ENOENT)
531 TRACE(XERR,
"problems removing the status file: "<<spath<<
"; errno: "<<errno);
536 if ((rc = rename(opath.c_str(), npath.c_str())) == 0 || (errno == ENOENT)) {
539 TouchSession(fpid, npath.c_str());
543 TRACE(XERR,
"session pid file cannot be moved: "<<opath<<
544 "; target file: "<<npath<<
"; errno: "<<errno);
// Remove the admin file of session fpid from the terminated sessions area.
// NOTE(review): the return statements are on lines not shown in this excerpt.
551 int XrdProofdProofServMgr::RmSession(
const char *fpid)
553 XPDLOC(SMGR,
"ProofServMgr::RmSession")
// fpid is validated only below, so it must not be streamed unguarded here:
// inserting a null const char* with operator<< is undefined behaviour for
// standard streams. Same guard as used by TouchSession.
555 TRACE(REQ, "removing "<<(fpid ? fpid : "<nul>")<<" ...");
558 if (!fpid || strlen(fpid) <= 0) {
559 TRACE(XERR,
"invalid input: "<< (fpid ? fpid :
"<nul>"));
565 XPDFORM(path,
"%s/%s", fTermAdminPath.c_str(), fpid);
568 if (unlink(path.c_str()) == 0)
571 TRACE(XERR,
"session pid file cannot be unlinked: "<<
572 path<<
"; error: "<<errno);
// Update the time stamps of the admin file of session fpid.
// When fpath is null or empty the file touched is <fpid>.status under the
// active sessions admin path, otherwise it is fpath itself. utime() is called
// with a null times argument, i.e. the stamps are set to the current time.
// NOTE(review): the return statements are on lines not shown in this excerpt.
579 int XrdProofdProofServMgr::TouchSession(
const char *fpid,
const char *fpath)
581 XPDLOC(SMGR,
"ProofServMgr::TouchSession")
583 TRACE(REQ, "touching "<<(fpid ? fpid : "<nul>")<<", "<<(fpath ? fpath : "<nul>")<<" ...");
586 if (!fpid || strlen(fpid) <= 0) {
587 TRACE(XERR,
"invalid input: "<<(fpid ? fpid :
"<nul>"));
592 XrdOucString path(fpath);
593 if (!fpath || !fpath[0])
594 XPDFORM(path,
"%s/%s.status", fActiAdminPath.c_str(), fpid);
597 if (utime(path.c_str(), 0) == 0)
600 TRACE(XERR,
"time stamps for session pid file cannot be updated: "<<
601 path<<
"; error: "<<errno);
// Check how long ago the admin file of session fpid was last modified.
// The file is looked up under fpath when that is non-empty, otherwise under
// the active sessions admin path. to is the timeout in seconds; when it is
// not positive fVerifyTimeOut is used instead.
// NOTE(review): the comparison of deltat with xto and the return values are
// on lines not shown in this excerpt.
611 int XrdProofdProofServMgr::VerifySession(
const char *fpid,
612 int to,
const char *fpath)
614 XPDLOC(SMGR,
"ProofServMgr::VerifySession")
617 if (!fpid || strlen(fpid) <= 0) {
618 TRACE(XERR,
"invalid input: "<<(fpid ? fpid :
"<nul>"));
624 if (fpath && strlen(fpath) > 0)
625 XPDFORM(path,
"%s/%s", fpath, fpid);
627 XPDFORM(path,
"%s/%s", fActiAdminPath.c_str(), fpid);
635 if (stat(path.c_str(), &st)) {
636 TRACE(XERR,
"session status file cannot be stat'ed: "<<
637 path<<
"; error: "<<errno);
// Effective timeout and seconds elapsed since the last modification.
641 int xto = (to > 0) ? to : fVerifyTimeOut;
642 deltat = time(0) - st.st_mtime;
644 if (path.endswith(
".status")) {
646 path.erase(path.rfind(
".status"));
649 TRACE(DBG,
"admin path for session "<<fpid<<
" hase not been touched"
650 " since at least "<< xto <<
" secs");
660 TRACE(DBG,
"admin path for session "<<fpid<<
" was touched " <<
661 deltat <<
" secs ago");
// Remove session fpid from the in-memory tables.
// The lookup key is what follows the last dot of fpid once any .status suffix
// has been removed. The session found under that key is reset, taken out of
// the list of active sessions and deleted from the sessions hash.
// NOTE(review): the null test on xps and the return statements are on lines
// not shown in this excerpt.
669 int XrdProofdProofServMgr::DeleteFromSessions(
const char *fpid)
671 XPDLOC(SMGR,
"ProofServMgr::DeleteFromSessions")
// fpid is validated only below, so it must not be streamed unguarded here:
// inserting a null const char* with operator<< is undefined behaviour for
// standard streams. Same guard as used by TouchSession.
673 TRACE(REQ, "session: "<<(fpid ? fpid : "<nul>"));
676 if (!fpid || strlen(fpid) <= 0) {
677 TRACE(XERR,
"invalid input: "<<(fpid ? fpid :
"<nul>"));
681 XrdOucString key = fpid;
682 key.replace(
".status",
"");
683 key.erase(0, key.rfind(
'.') + 1);
684 XrdProofdProofServ *xps = 0;
685 { XrdSysMutexHelper mhp(fMutex); xps = fSessions.Find(key.c_str()); }
689 XPDFORM(msg,
"session: %s terminated by peer", fpid);
692 int tp = xps->Reset(msg.c_str(), kXPD_wrkmortem);
694 XrdSysMutexHelper mhp(fMutex);
// The count of current sessions is decreased only when Reset() returned 1.
695 if (tp == 1) fCurrentSessions--;
697 fActiveSessions.remove(xps);
700 { XrdSysMutexHelper mhp(fMutex); rc = fSessions.Del(key.c_str()); }
// Scan the active sessions admin directory and prepare the recovery of the
// sessions found there, then start the recover thread.
// Entries whose name starts with a dot, whose pid does not parse to a positive
// value, or which carry a suffix after the pid are skipped. Entries that could
// not be kept are moved to the terminated area with MvSession().
// NOTE(review): the closing of dir, the exact branch leading to MvSession()
// and the return values are on lines not shown in this excerpt.
709 int XrdProofdProofServMgr::PrepareSessionRecovering()
711 XPDLOC(SMGR,
"ProofServMgr::PrepareSessionRecovering")
714 DIR *dir = opendir(fActiAdminPath.c_str());
716 TRACE(XERR,
"cannot open dir "<<fActiAdminPath<<
" ; error: "<<errno);
719 TRACE(REQ,
"preparing recovering of active sessions ...");
722 fRecoverClients =
new std::list<XpdClientSessions *>;
723 struct dirent *ent = 0;
724 while ((ent = (
struct dirent *)readdir(dir))) {
725 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
727 XrdOucString rest, a;
728 int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, a);
729 if (!XPD_LONGOK(pid) || pid <= 0)
continue;
730 if (a.length() > 0)
continue;
733 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
734 if (ResolveSession(ent->d_name) == 0) {
735 TRACE(DBG,
"found active session: "<<pid);
741 MvSession(ent->d_name);
748 { XrdSysMutexHelper mhp(fRecoverMutex); nrc = fRecoverClients->size(); }
// Fill the shared context and start the recover thread on it.
753 fManagerCron.fClientMgr = fMgr->ClientMgr();
754 fManagerCron.fSessionMgr =
this;
755 fManagerCron.fProofSched = fMgr->ProofSched();
756 if (XrdSysThread::Run(&tid, XrdProofdProofServRecover, (
void *)&fManagerCron,
757 0,
"ProofServMgr session recover thread") != 0) {
758 TRACE(XERR,
"could not start session recover thread");
761 XPDPRT(
"session recover thread started");
764 if (fMgr->ClientMgr() && fMgr->ClientMgr()->GetNClients() <= 0)
// Recover the sessions of the clients listed in fRecoverClients.
// A deadline of fRecoverTimeOut seconds per client is set; clients whose list
// of sessions is empty are removed from the list; the loop goes on while the
// deadline has not passed. At the end the sessions still listed are counted
// into rc, the list is deleted and the deadline is reset to -1.
// NOTE(review): the loop header, the recovery calls and the return statements
// are on lines not shown in this excerpt.
780 int XrdProofdProofServMgr::RecoverActiveSessions()
782 XPDLOC(SMGR,
"ProofServMgr::RecoverActiveSessions")
786 if (!fRecoverClients) {
788 TRACE(XERR,
"recovering clients list undefined");
793 { XrdSysMutexHelper mhp(fRecoverMutex); nrc = fRecoverClients->size(); }
794 TRACE(REQ,
"start recovering of "<<nrc<<
" clients");
797 { XrdSysMutexHelper mhp(fRecoverMutex);
798 fRecoverDeadline = time(0) + fRecoverTimeOut * nrc; }
802 XpdClientSessions *cls = 0;
807 { XrdSysMutexHelper mhp(fRecoverMutex); cls = fRecoverClients->front(); }
// The per-client mutex is taken first, then the mutex of the recover list.
813 { XrdSysMutexHelper mhp(cls->fMutex);
814 if (cls->fProofServs.size() <= 0) {
815 XrdSysMutexHelper mhpr(fRecoverMutex);
816 fRecoverClients->remove(cls);
818 if ((nrc = fRecoverClients->size()) <= 0)
823 TRACE(REQ, nrc<<
" clients still to recover");
826 { XrdSysMutexHelper mhp(fRecoverMutex);
827 go = (time(0) < fRecoverDeadline) ?
true :
false; }
// Count the sessions that are still waiting to be recovered.
834 { XrdSysMutexHelper mhp(fRecoverMutex);
835 if (fRecoverClients->size() > 0) {
836 std::list<XpdClientSessions* >::iterator ii = fRecoverClients->begin();
837 for (; ii != fRecoverClients->end(); ++ii) {
838 rc += (*ii)->fProofServs.size();
844 { XrdSysMutexHelper mhp(fRecoverMutex);
845 fRecoverClients->clear();
846 delete fRecoverClients;
848 fRecoverDeadline = -1;
// Tell whether the client identified by usr and grp is in the list of clients
// being recovered; when a match is found deadline receives fRecoverDeadline.
// NOTE(review): the last parameter of the signature (deadline, as used by
// Attach), the input test and the return statement are on lines not shown in
// this excerpt.
860 bool XrdProofdProofServMgr::IsClientRecovering(
const char *usr,
const char *grp,
863 XPDLOC(SMGR,
"ProofServMgr::IsClientRecovering")
866 TRACE(XERR,
"invalid inputs: usr: "<<(usr ? usr :
"")<<
", grp:"<<(grp ? grp :
"")<<
" ...");
// The list of recovering clients is scanned under fRecoverMutex.
872 { XrdSysMutexHelper mhp(fRecoverMutex);
873 if (fRecoverClients && fRecoverClients->size() > 0) {
874 std::list<XpdClientSessions *>::iterator ii = fRecoverClients->begin();
875 for (; ii != fRecoverClients->end(); ++ii) {
876 if ((*ii)->fClient && (*ii)->fClient->Match(usr, grp)) {
878 deadline = fRecoverDeadline;
884 TRACE(DBG,
"checking usr: "<<usr<<
", grp:"<<grp<<
" ... recovering? "<<
885 rc<<
", until: "<<deadline);
// Go through the active sessions admin directory and check each session.
// Only entries whose suffix is status and whose pid parses to a positive value
// are considered; socket files recognised by IsSessionSocket() are skipped.
// For each entry the session object is looked up, the admin file is verified
// and, when rmsession ends up set, the session is moved to the terminated
// area. With verify set, sessions that are not old-version ones are also
// asked to verify their proofserv.
// NOTE(review): the assignment of key, the null test on xps, the closing of
// dir and the return values are on lines not shown in this excerpt.
897 int XrdProofdProofServMgr::CheckActiveSessions(
bool verify)
899 XPDLOC(SMGR,
"ProofServMgr::CheckActiveSessions")
901 TRACE(REQ, "checking active sessions ...");
904 DIR *dir = opendir(fActiAdminPath.c_str());
906 TRACE(XERR,
"cannot open dir "<<fActiAdminPath<<
" ; error: "<<errno);
911 struct dirent *ent = 0;
912 while ((ent = (
struct dirent *)readdir(dir))) {
913 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
916 if (strstr(ent->d_name,
".sock") && IsSessionSocket(ent->d_name))
continue;
918 XrdOucString rest, key, after;
919 int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, after);
921 if (after !=
"status")
continue;
923 if (!XPD_LONGOK(pid) || pid <= 0)
continue;
926 XrdProofdProofServ *xps = 0;
927 { XrdSysMutexHelper mhp(fMutex);
928 xps = fSessions.Find(key.c_str());
931 bool sessionalive = (VerifySession(ent->d_name) == 0) ? 1 : 0;
934 if (!xps->IsValid() || !sessionalive) rmsession = 1;
938 if (sessionalive)
continue;
// A session counts as old version unless its ROOT server protocol is >= 18.
943 bool oldvers = (xps && xps->ROOT() && xps->ROOT()->SrvProtVers() >= 18) ? 0 : 1;
949 rmsession = xps->CheckSession(oldvers, IsReconnecting(),
950 fShutdownOpt, fShutdownDelay, fMgr->ChangeOwn(), nc);
956 if (!rmsession && verify && !oldvers) {
957 if (xps->VerifyProofServ(0) != 0) {
962 TRACE(REQ,
"session: "<<ent->d_name<<
"; nc: "<<nc<<
"; rm: "<<rmsession);
965 MvSession(ent->d_name);
// Go through the terminated sessions admin directory. For entries whose admin
// file is older than fTerminationTimeOut seconds, or cannot be stat'ed, the
// process is killed if it is still found and the admin file is removed.
// NOTE(review): st is read in the trace and in the age test even when stat()
// failed (rcst != 0); the outcome is the same because of the rcst test, but
// the value read is indeterminate in that case - confirm.
// NOTE(review): the closing of dir and the return values are on lines not
// shown in this excerpt.
979 int XrdProofdProofServMgr::CheckTerminatedSessions()
981 XPDLOC(SMGR,
"ProofServMgr::CheckTerminatedSessions")
983 TRACE(REQ, "checking terminated sessions ...");
986 DIR *dir = opendir(fTermAdminPath.c_str());
988 TRACE(XERR,
"cannot open dir "<<fTermAdminPath<<
" ; error: "<<errno);
994 struct dirent *ent = 0;
995 while ((ent = (
struct dirent *)readdir(dir))) {
996 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
998 XrdOucString rest, a;
999 int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, a);
1000 if (!XPD_LONGOK(pid) || pid <= 0)
continue;
// The current time is taken once and reused for the following entries.
1003 now = (now > 0) ? now : time(0);
1007 XPDFORM(path,
"%s/%s", fTermAdminPath.c_str(), ent->d_name);
1011 int rcst = stat(path.c_str(), &st);
1012 TRACE(DBG, pid<<
": rcst: "<<rcst<<
", now - mtime: "<<now - st.st_mtime<<
" secs")
1013 if ((now - st.st_mtime) > fTerminationTimeOut || rcst != 0) {
1015 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
// The owner of the session is read from the admin file before the kill.
1017 XrdProofSessionInfo info(path.c_str());
1019 XrdProofdAux::GetUserInfo(info.fUser.c_str(), ui);
1020 XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn());
1023 RmSession(ent->d_name);
// Clean the sessions of user usr with server type srvtype.
// A null or empty usr, or the name all, selects every user; kXPD_AnyServer as
// srvtype selects every server type.
// First pass, terminated sessions area: matching processes that are still
// found are killed with the second argument of KillProcess set to 1 and the
// admin file is removed. Second pass, active sessions area: matching
// processes are recorded in tobedel, killed with that argument set to 0 and
// the admin file is moved to the terminated area. Finally the recorded pids
// are looked up in the sessions hash and deleted from it when not active.
// NOTE(review): the closing of dir, the body of the last loop and the return
// values are on lines not shown in this excerpt.
1038 int XrdProofdProofServMgr::CleanClientSessions(
const char *usr,
int srvtype)
1040 XPDLOC(SMGR,
"ProofServMgr::CleanClientSessions")
// A null usr is a valid input here (it means all users, see below), so it
// must not be streamed unguarded: inserting a null const char* with
// operator<< is undefined behaviour for standard streams.
1042 TRACE(REQ, "cleaning "<<(usr ? usr : "<nul>")<<" ...");
1045 bool all = (!usr || strlen(usr) <= 0 || !strcmp(usr, "all")) ? 1 : 0;
1050 XrdProofdAux::GetUserInfo(usr, ui);
1051 XrdOucString path, rest, key, a;
1054 XrdSysRecMutex *mtx = 0;
1060 XrdProofdClient *c = fMgr->ClientMgr()->GetClient(usr);
1061 if (c) mtx = c->Mutex();
1064 std::list<int> tobedel;
1065 { XrdSysMutexHelper mtxh(mtx);
1068 DIR *dir = opendir(fTermAdminPath.c_str());
1070 TRACE(XERR,
"cannot open dir "<<fTermAdminPath<<
" ; error: "<<errno);
1073 struct dirent *ent = 0;
1074 while ((ent = (
struct dirent *)readdir(dir))) {
1076 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
1078 int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, a);
1079 if (!XPD_LONGOK(pid) || pid <= 0)
continue;
1081 XPDFORM(path,
"%s/%s", fTermAdminPath.c_str(), ent->d_name);
1082 XrdProofSessionInfo info(path.c_str());
1084 if (!all && info.fUser != usr)
continue;
1086 if (srvtype != kXPD_AnyServer && info.fSrvType != srvtype)
continue;
1089 XrdProofdAux::GetUserInfo(info.fUser.c_str(), ui);
1091 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
1093 XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn());
1096 RmSession(ent->d_name);
// Second pass: the active sessions area.
1104 dir = opendir(fActiAdminPath.c_str());
1106 TRACE(XERR,
"cannot open dir "<<fActiAdminPath<<
" ; error: "<<errno);
1111 struct dirent *ent = 0;
1112 while ((ent = (
struct dirent *)readdir(dir))) {
1114 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
1116 int pid = XrdProofdAux::ParsePidPath(ent->d_name, rest, a);
1117 if (a ==
"status")
continue;
1118 if (!XPD_LONGOK(pid) || pid <= 0)
continue;
1120 XPDFORM(path,
"%s/%s", fActiAdminPath.c_str(), ent->d_name);
1121 XrdProofSessionInfo info(path.c_str());
1122 if (!all && info.fUser != usr)
continue;
1124 if (srvtype != kXPD_AnyServer && info.fSrvType != srvtype)
continue;
1127 XrdProofdAux::GetUserInfo(info.fUser.c_str(), ui);
1129 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
1131 tobedel.push_back(pid);
1133 XrdProofdAux::KillProcess(pid, 0, ui, fMgr->ChangeOwn());
1136 MvSession(ent->d_name);
// Last step: drop the recorded pids from the sessions hash.
1143 std::list<int>::iterator ii = tobedel.begin();
1144 while (ii != tobedel.end()) {
1145 XPDFORM(key,
"%d", *ii);
1146 XrdSysMutexHelper mhp(fMutex);
1147 XrdProofdProofServ *xps = fSessions.Find(key.c_str());
1149 std::list<XrdProofdProofServ *>::iterator ixps = fActiveSessions.begin();
1150 while (ixps != fActiveSessions.end()) {
1157 if (!active) fSessions.Del(key.c_str());
// Register the configuration directives handled by this class.
// proofservmgr, putenv, putrc and shutdown are routed to this object through
// DoDirectiveClass; intwait and reconnto set an int member; proofplugin and
// proofservparents set a string member.
// NOTE(review): the Register( call opening the intwait entry is on a line not
// shown in this excerpt.
1168 void XrdProofdProofServMgr::RegisterDirectives()
1171 Register(
"proofservmgr",
new XrdProofdDirective(
"proofservmgr",
this, &DoDirectiveClass));
1172 Register(
"putenv",
new XrdProofdDirective(
"putenv",
this, &DoDirectiveClass));
1173 Register(
"putrc",
new XrdProofdDirective(
"putrc",
this, &DoDirectiveClass));
1174 Register(
"shutdown",
new XrdProofdDirective(
"shutdown",
this, &DoDirectiveClass));
1177 new XrdProofdDirective(
"intwait", (
void *)&fInternalWait, &DoDirectiveInt));
1178 Register(
"reconnto",
1179 new XrdProofdDirective(
"reconnto", (
void *)&fReconnectTimeOut, &DoDirectiveInt));
1181 Register(
"proofplugin",
1182 new XrdProofdDirective(
"proofplugin", (
void *)&fProofPlugin, &DoDirectiveString));
1183 Register(
"proofservparents",
1184 new XrdProofdDirective(
"proofservparents", (
void *)&fParentExecs, &DoDirectiveString));
// Dispatch directive d to the matching handler according to its name
// (proofservmgr, putenv, putrc, shutdown) and return what the handler returns.
// Any other name is reported as an unknown directive.
// NOTE(review): the input tests and the final return are on lines not shown
// in this excerpt.
1190 int XrdProofdProofServMgr::DoDirective(XrdProofdDirective *d,
1191 char *val, XrdOucStream *cfg,
bool rcf)
1193 XPDLOC(SMGR,
"ProofServMgr::DoDirective")
1199 if (d->fName == "proofservmgr") {
1200 return DoDirectiveProofServMgr(val, cfg, rcf);
1201 }
else if (d->fName ==
"putenv") {
1202 return DoDirectivePutEnv(val, cfg, rcf);
1203 }
else if (d->fName ==
"putrc") {
1204 return DoDirectivePutRc(val, cfg, rcf);
1205 }
else if (d->fName ==
"shutdown") {
1206 return DoDirectiveShutdown(val, cfg, rcf);
1208 TRACE(XERR,
"unknown directive: "<<d->fName);
// Handle the proofservmgr directive.
// Each word is a key:value token; the keys recognised are checkfq, termto,
// verifyto, recoverto, checklost and usefork, and the value is parsed as a
// base-10 integer. After the optional host condition has been checked the
// parsed values are applied: checkfq, termto and recoverto only when
// positive, verifyto only when larger than the check frequency plus one,
// checklost and usefork as 0/1 flags.
// NOTE(review): the declarations of the parsed values, the loop header and
// the return statements are on lines not shown in this excerpt.
1216 int XrdProofdProofServMgr::DoDirectiveProofServMgr(
char *val, XrdOucStream *cfg,
bool rcf)
1218 XPDLOC(SMGR,
"ProofServMgr::DoDirectiveProofServMgr")
1236 XrdOucString tok(val);
1237 if (tok.beginswith(
"checkfq:")) {
1238 tok.replace(
"checkfq:",
"");
1239 checkfq = strtol(tok.c_str(), 0, 10);
1240 }
else if (tok.beginswith(
"termto:")) {
1241 tok.replace(
"termto:",
"");
1242 termto = strtol(tok.c_str(), 0, 10);
1243 }
else if (tok.beginswith(
"verifyto:")) {
1244 tok.replace(
"verifyto:",
"");
1245 verifyto = strtol(tok.c_str(), 0, 10);
1246 }
else if (tok.beginswith(
"recoverto:")) {
1247 tok.replace(
"recoverto:",
"");
1248 recoverto = strtol(tok.c_str(), 0, 10);
1249 }
else if (tok.beginswith(
"checklost:")) {
1250 tok.replace(
"checklost:",
"");
1251 checklost = strtol(tok.c_str(), 0, 10);
1252 }
else if (tok.beginswith(
"usefork:")) {
1253 tok.replace(
"usefork:",
"");
1254 usefork = strtol(tok.c_str(), 0, 10);
1257 val = cfg->GetWord();
1261 if (fMgr->Host() && cfg)
1262 if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
// Apply the parsed values; members keep their current value otherwise.
1266 fCheckFrequency = (XPD_LONGOK(checkfq) && checkfq > 0) ? checkfq : fCheckFrequency;
1267 fTerminationTimeOut = (XPD_LONGOK(termto) && termto > 0) ? termto : fTerminationTimeOut;
1268 fVerifyTimeOut = (XPD_LONGOK(verifyto) && (verifyto > fCheckFrequency + 1))
1269 ? verifyto : fVerifyTimeOut;
1270 fRecoverTimeOut = (XPD_LONGOK(recoverto) && recoverto > 0) ? recoverto : fRecoverTimeOut;
1271 if (XPD_LONGOK(checklost)) fCheckLost = (checklost != 0) ? 1 : 0;
1272 if (XPD_LONGOK(usefork)) fUseFork = (usefork != 0) ? 1 : 0;
1275 XPDFORM(msg,
"checkfq: %d s, termto: %d s, verifyto: %d s, recoverto: %d s, checklost: %d, usefork: %d",
1276 fCheckFrequency, fTerminationTimeOut, fVerifyTimeOut, fRecoverTimeOut, fCheckLost, fUseFork);
// Handle the putenv directive: the words are parsed with ExtractEnv() and the
// result is appended to fProofServEnvs with FillEnvList().
// -1 is returned when the parsed name does not contain an equal sign.
// NOTE(review): the input tests, the declaration of hex and the final return
// are on lines not shown in this excerpt.
1285 int XrdProofdProofServMgr::DoDirectivePutEnv(
char *val, XrdOucStream *cfg,
bool)
1292 XrdOucString users, groups, rcval, rcnam;
1293 int smi = -1, smx = -1, vmi = -1, vmx = -1;
1295 ExtractEnv(val, cfg, users, groups, rcval, rcnam, smi, smx, vmi, vmx, hex);
1298 int iequ = rcnam.find(
'=');
1299 if (iequ == STR_NPOS)
return -1;
1303 FillEnvList(&fProofServEnvs, rcnam.c_str(), rcval.c_str(),
1304 users.c_str(), groups.c_str(), smi, smx, vmi, vmx, hex);
// Handle the putrc directive: the words are parsed with ExtractEnv() and the
// result is appended to fProofServRCs with FillEnvList().
// NOTE(review): the input tests, the declaration of hex and the return are on
// lines not shown in this excerpt.
1316 int XrdProofdProofServMgr::DoDirectivePutRc(
char *val, XrdOucStream *cfg,
bool)
1323 XrdOucString users, groups, rcval, rcnam;
1324 int smi = -1, smx = -1, vmi = -1, vmx = -1;
1326 ExtractEnv(val, cfg, users, groups, rcval, rcnam, smi, smx, vmi, vmx, hex);
1329 FillEnvList(&fProofServRCs, rcnam.c_str(), rcval.c_str(),
1330 users.c_str(), groups.c_str(), smi, smx, vmi, vmx, hex);
// Parse the words of a putenv / putrc directive, starting from val and going
// on with cfg->GetWord().
// Words starting with u: and g: are the user and group selectors; a word
// starting with s: is a number or a min-max range stored in smi / smx; a word
// starting with v: is a number or a min-max range stored in vmi / vmx, with a
// leading x tested separately.
// NOTE(review): the assignments to users, groups, ssvn, sver, hex, rcval and
// rcnam are on lines not shown in this excerpt; presumably the leading x
// marks a hexadecimal version (hex) - confirm.
1338 void XrdProofdProofServMgr::ExtractEnv(
char *val, XrdOucStream *cfg,
1339 XrdOucString &users, XrdOucString &groups,
1340 XrdOucString &rcval, XrdOucString &rcnam,
1341 int &smi,
int &smx,
int &vmi,
int &vmx,
bool &hex)
1343 XrdOucString ssvn, sver;
1345 while (val && val[0]) {
1346 if (!strncmp(val,
"u:", 2)) {
1349 }
else if (!strncmp(val,
"g:", 2)) {
1352 }
else if (!strncmp(val,
"s:", 2)) {
// Range form min-max: each side is converted only when it is all digits.
1355 idash = ssvn.find(
'-');
1356 if (idash != STR_NPOS) {
1357 if (ssvn.isdigit(0, idash-1)) smi = ssvn.atoi(0, idash-1);
1358 if (ssvn.isdigit(idash+1)) smx = ssvn.atoi(idash+1);
1360 if (ssvn.isdigit()) smi = ssvn.atoi();
1362 }
else if (!strncmp(val,
"v:", 2)) {
1366 if (sver.beginswith(
'x')) {
1370 idash = sver.find(
'-');
1371 if (idash != STR_NPOS) {
1372 if (sver.isdigit(0, idash-1)) vmi = sver.atoi(0, idash-1);
1373 if (sver.isdigit(idash+1)) vmx = sver.atoi(idash+1);
1375 if (sver.isdigit()) vmi = sver.atoi();
1378 if (rcval.length() > 0) {
1385 val = cfg->GetWord();
// Fill the list el with entries for the setting nam = val.
// usrs and grps are comma-separated lists of users and groups; smi / smx and
// vmi / vmx are the minimum / maximum bounds given with the s: and v:
// selectors. Positive version bounds are first converted with
// XpdEnv::ToVersCode(), hex being passed through to it.
// With users given, the entry is reset for every user, and for every
// (user, group) pair when groups are given too; with only groups given, it is
// reset for every group.
// NOTE(review): the declarations of the tokenizer cursors and the statements
// that follow each Reset() are on lines not shown in this excerpt.
1394 void XrdProofdProofServMgr::FillEnvList(std::list<XpdEnv> *el,
const char *nam,
const char *val,
1395 const char *usrs,
const char *grps,
1396 int smi,
int smx,
int vmi,
int vmx,
bool hex)
1398 XPDLOC(SMGR,
"ProofServMgr::FillEnvList")
1401 TRACE(ALL,
"env list undefined!");
1405 XrdOucString users(usrs), groups(grps);
1407 if (vmi > 0) vmi = XpdEnv::ToVersCode(vmi, hex);
1408 if (vmx > 0) vmx = XpdEnv::ToVersCode(vmx, hex);
1410 XpdEnv xpe(nam, val, users.c_str(), groups.c_str(), smi, smx, vmi, vmx);
1411 if (users.length() > 0) {
1414 while ((from = users.tokenize(usr, from,
',')) != -1) {
1415 if (usr.length() > 0) {
1416 if (groups.length() > 0) {
// The group list has its own cursor, fromg. Restarting from the user cursor
// (from) instead would never advance the start position: the same token
// would be returned on every pass (endless loop) or groups would be skipped.
1419 while ((fromg = groups.tokenize(grp, fromg,
',')) != -1) {
1420 if (grp.length() > 0) {
1421 xpe.Reset(nam, val, usr.c_str(), grp.c_str(), smi, smx, vmi, vmx);
1426 xpe.Reset(nam, val, usr.c_str(), 0, smi, smx, vmi, vmx);
1432 if (groups.length() > 0) {
1435 while ((fromg = groups.tokenize(grp, fromg,
',')) != -1) {
1436 if (grp.length() > 0) {
1437 xpe.Reset(nam, val, 0, grp.c_str(), smi, smx, vmi, vmx);
// Handle the shutdown directive.
// The first word is the option, accepted when between 0 and 2; the second
// word is the delay, whose last character may be s, m or h, or must be a
// digit (character codes 48 to 57). After the optional host condition has
// been checked, option and delay replace the current settings only when they
// are larger than -1.
// NOTE(review): the declarations of opt and delay, the handling of the unit
// suffixes and the return statements are on lines not shown in this excerpt;
// presumably s, m and h stand for seconds, minutes and hours - confirm.
1452 int XrdProofdProofServMgr::DoDirectiveShutdown(
char *val, XrdOucStream *cfg,
bool)
1462 int dp = strtol(val,0,10);
1463 if (dp >= 0 && dp <= 2)
1466 if ((val = cfg->GetWord())) {
1467 int l = strlen(val);
1469 XrdOucString tval = val;
1471 if (val[l-1] ==
's') {
1473 }
else if (val[l-1] ==
'm') {
1476 }
else if (val[l-1] ==
'h') {
1479 }
else if (val[l-1] < 48 || val[l-1] > 57) {
1483 int de = strtol(val,0,10);
1490 if (fMgr->Host() && cfg)
1491 if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
1495 fShutdownOpt = (opt > -1) ? opt : fShutdownOpt;
1496 fShutdownDelay = (delay > -1) ? delay : fShutdownDelay;
// Process a request from protocol instance p.
// The client mutex is held for the call. A kProcessReq message is posted on
// the internal pipe and the call waits on fProcessSem for the authorization to
// proceed (the cron thread posts that semaphore when it reads the message);
// a server error is sent when posting or waiting fails. The request is then
// dispatched on its request id; an invalid request answer is sent with the
// message built in emsg.
// NOTE(review): the value of twait, the cases of the switch and the return
// statements are on lines not shown in this excerpt.
1504 int XrdProofdProofServMgr::Process(XrdProofdProtocol *p)
1506 XPDLOC(SMGR,
"ProofServMgr::Process")
1509 XPD_SETRESP(p, "Process");
1511 TRACEP(p, REQ, "enter: req
id: " << p->Request()->header.requestid << " (" <<
1512 XrdProofdAux::ProofRequestTypes(p->Request()->header.requestid) << ")");
1514 XrdSysMutexHelper mtxh(p->Client()->Mutex());
1517 XrdOucString emsg("Invalid request code: ");
1521 if (Pipe()->Post(XrdProofdProofServMgr::kProcessReq, 0) != 0) {
1522 response->Send(kXR_ServerError,
1523 "ProofServMgr::Process: error posting internal pipe for authorization to proceed");
1526 if (fProcessSem.Wait(twait) != 0) {
1527 response->Send(kXR_ServerError,
1528 "ProofServMgr::Process: timed-out waiting for authorization to proceed - retry later");
// Counter object for kProcessCnt, the counter tested by the cron thread.
1533 XpdSrvMgrCreateCnt cnt(
this, kProcessCnt);
1535 switch(p->Request()->header.requestid) {
1545 emsg += p->Request()->header.requestid;
1550 response->Send(kXR_InvalidRequest, emsg.c_str());
// Attach the client behind p to an existing session.
// The session id is read from the request. While the session cannot be found
// and the client is reported as recovering, the lookup is retried until the
// deadline; otherwise an invalid request answer is sent. Once the session is
// found the client id slot for this connection is obtained, the parent is set
// if there is none, and the answer carrying the session id, the server
// protocol version and XPROOFD_VERSBIN is sent; for a client-master
// connection it also carries the pool URL followed by the namespace. If the
// session is running and has a start message, that message is sent too.
// NOTE(review): the wait between retries, the use of sid and csid and the
// return statements are on lines not shown in this excerpt.
1557 int XrdProofdProofServMgr::Attach(XrdProofdProtocol *p)
1559 XPDLOC(SMGR,
"ProofServMgr::Attach")
1561 int psid = -1, rc = 0;
1562 XPD_SETRESP(p, "Attach");
1565 psid = ntohl(p->Request()->proof.sid);
1566 TRACEP(p, REQ, "psid: "<<psid<<", CID = "<<p->CID());
1569 XrdProofdClient *c = p->Client();
1571 TRACEP(p, XERR,
"client instance undefined");
1572 response->Send(kXR_ServerError,
"client instance undefined");
1578 XrdProofdProofServ *xps = 0;
// deadline stays negative until IsClientRecovering() or the default sets it.
1580 int deadline = -1, defdeadline = now + fRecoverTimeOut;
1581 while ((deadline < 0) || (now < deadline)) {
1582 if (!(xps = c->GetServer(psid)) || !xps->IsValid()) {
1584 if (!IsClientRecovering(c->User(), c->Group(), deadline)) {
1586 TRACEP(p, XERR,
"session ID not found: "<<psid);
1587 response->Send(kXR_InvalidRequest,
"session ID not found");
1591 deadline = (deadline > 0) ? deadline : defdeadline;
1602 if (!xps || !xps->IsValid()) {
1603 TRACEP(p, XERR,
"session ID not found: "<<psid);
1604 response->Send(kXR_InvalidRequest,
"session ID not found");
1607 TRACEP(p, DBG,
"xps: "<<xps<<
", status: "<< xps->Status());
// The stream id is the first two bytes of the request header.
1611 memcpy((
void *)&sid, (
const void *)&(p->Request()->header.streamid[0]), 2);
1615 XrdClientID *csid = xps->GetClientID(p->CID());
1620 if (!(xps->Parent()))
1621 xps->SetParent(csid);
1624 int protvers = (xps && xps->ROOT()) ? xps->ROOT()->SrvProtVers() : -1;
1625 if (p->ConnType() == kXPD_ClientMaster) {
1627 XrdOucString dpu = fMgr->PoolURL();
1628 if (!dpu.endswith(
'/'))
1630 dpu += fMgr->NameSpace();
1631 response->SendI(psid, protvers, (kXR_int16)XPROOFD_VERSBIN,
1632 (
void *) dpu.c_str(), dpu.length());
1634 response->SendI(psid, protvers, (kXR_int16)XPROOFD_VERSBIN);
1637 if (xps->Status() == kXPD_running && xps->StartMsg()) {
1638 TRACEP(p, XERR,
"sending start process message ("<<xps->StartMsg()->fSize<<
" bytes)");
1639 response->Send(kXR_attn, kXPD_msg,
1640 xps->StartMsg()->fBuff, xps->StartMsg()->fSize);
// Get a free session object from the client of p and set it up: client name,
// server type, parent client id and ROOT version are taken from p. sid
// receives the stream id, copied from the first two bytes of the request
// header. For a client-master connection using a ROOT version other than the
// default one, a notice is sent through r.
// NOTE(review): the use of csid before SetParent() and the return statement
// are on lines not shown in this excerpt.
1650 XrdProofdProofServ *XrdProofdProofServMgr::PrepareProofServ(XrdProofdProtocol *p,
1651 XrdProofdResponse *r,
1652 unsigned short &sid)
1654 XPDLOC(SMGR,
"ProofServMgr::PrepareProofServ")
1657 XrdProofdProofServ *xps = p->Client()->GetFreeServObj();
1658 xps->SetClient(p->Client()->User());
1659 xps->SetSrvType(p->ConnType());
1662 memcpy((
void *)&sid, (const
void *)&(p->Request()->header.streamid[0]), 2);
1665 XrdClientID *csid = xps->GetClientID(p->CID());
1669 xps->SetParent(csid);
1672 xps->SetROOT(p->Client()->ROOT());
1674 XPDFORM(msg, "using ROOT version: %s", xps->ROOT()->Export());
1675 TRACEP(p, REQ, msg);
1676 if (p->ConnType() == kXPD_ClientMaster) {
1678 if (fMgr && p->Client()->ROOT() != fMgr->ROOTMgr()->DefaultVersion()) {
1679 XPDFORM(msg,
"++++ Using NON-default ROOT version: %s ++++\n", xps->ROOT()->Export());
1680 r->Send(kXR_attn, kXPD_srvmsg, (
char *) msg.c_str(), msg.length());
// Extract the fields of the create request buffer of p.
// The buffer is a sequence of fields separated by a vertical bar. The session
// tag is the text before the first bar. The other fields are found by their
// marker: ord: gives the ordinal (master-worker and master-master connections
// only), cf: the config file, plite: the number of PROOF-Lite workers and
// envs: the user environment settings. Each field is obtained by assigning
// the whole buffer to the output string and erasing what precedes the marker
// and what follows the next bar. intwait starts from fInternalWait and can be
// overridden by a PROOF_INTWAIT= entry found in the user settings.
// NOTE(review): the fallbacks used when a marker is missing and the
// conversion of the PROOF_INTWAIT value are on lines not shown in this
// excerpt.
1691 void XrdProofdProofServMgr::ParseCreateBuffer(XrdProofdProtocol *p,
1692 XrdProofdProofServ *xps,
1693 XrdOucString &tag, XrdOucString &ord,
1694 XrdOucString &cffile,
1695 XrdOucString &uenvs,
int &intwait)
1697 XPDLOC(SMGR,
"ProofServMgr::ParseCreateBuffer")
1700 char *buf = p->Argp()->buff;
1701 int len = p->Request()->proof.dlen;
1704 tag.assign(buf,0,len-1);
1706 TRACEP(p, DBG, "received buf: "<<tag);
1708 tag.erase(tag.find('|'));
1709 xps->SetTag(tag.c_str());
1710 TRACEP(p, DBG, "tag: "<<tag);
1714 if ((p->ConnType() == kXPD_MasterWorker) || (p->ConnType() == kXPD_MasterMaster)) {
1715 ord.assign(buf,0,len-1);
1716 int iord = ord.find(
"|ord:");
1717 if (iord != STR_NPOS) {
1718 ord.erase(0,iord+5);
1719 ord.erase(ord.find(
"|"));
1723 xps->SetOrdinal(ord.c_str());
1726 cffile.assign(buf,0,len-1);
1727 int icf = cffile.find(
"|cf:");
1728 if (icf != STR_NPOS) {
1729 cffile.erase(0,icf+4);
1730 cffile.erase(cffile.find(
"|"));
1735 XrdOucString plitenwk;
1736 plitenwk.assign(buf,0,len-1);
1737 int inwk = plitenwk.find(
"|plite:");
1738 if (inwk != STR_NPOS) {
1739 plitenwk.erase(0,inwk+7);
1740 plitenwk.erase(plitenwk.find(
"|"));
1741 int nwk = plitenwk.atoi();
1743 xps->SetPLiteNWrks(nwk);
1744 TRACEP(p, DBG,
"P-Lite master with "<<nwk<<
" workers (0 means # or cores)");
1749 uenvs.assign(buf,0,len-1);
1750 int ienv = uenvs.find(
"|envs:");
1751 if (ienv != STR_NPOS) {
1752 uenvs.erase(0,ienv+6);
1753 uenvs.erase(uenvs.find(
"|"));
1754 xps->SetUserEnvs(uenvs.c_str());
// Default internal wait; a user setting, if any, is looked for below.
1759 intwait = fInternalWait;
1760 if (uenvs.length() > 0) {
1761 TRACEP(p, DBG,
"user envs: "<<uenvs);
1763 if ((iiw = uenvs.find(
"PROOF_INTWAIT=")) != STR_NPOS) {
1764 XrdOucString s(uenvs, iiw + strlen(
"PROOF_INTWAIT="));
1765 s.erase(s.find(
','));
1768 TRACEP(p, ALL,
"startup internal wait set by user to "<<intwait);
// Handle a request to create a new PROOF session on behalf of the client
// attached to 'p'. Visible steps:
//   1. enforce the scheduler's maximum number of sessions (client-master
//      connections only);
//   2. prepare the session object and parse the request buffer;
//   3. acquire the fork semaphore and create two internal pipes;
//   4. fork: the branch taken when Fork() returns 0 receives the admin and
//      socket paths over 'fpc', sets ownerships/environment, reports over
//      'fcp' and calls execv() on the proofserv executable;
//   5. the other branch sets up the admin path and UNIX socket, sends them
//      over 'fpc', waits on 'fcp' for the setup status, then waits for the
//      callback of the new server and registers the session.
// NOTE(review): return statements, several declarations (e.g. 'pid', 'emsg',
// 'intwait', 'pathrc', 'xmsg') and the closing braces are not visible in this
// excerpt; the statement order of the pipe handshake must be preserved.
1777 int XrdProofdProofServMgr::Create(XrdProofdProtocol *p)
1779 XPDLOC(SMGR,
"ProofServMgr::Create")
1781 int psid = -1, rc = 0;
1782 XPD_SETRESP(p, "Create");
1784 TRACEP(p, DBG, "enter");
1787 XpdSrvMgrCreateGuard mcGuard;
// Refuse the request if the configured maximum number of sessions is reached
// (a non-positive 'mxsess' disables the check)
1790 int mxsess = (fMgr && fMgr->ProofSched()) ? fMgr->ProofSched()->MaxSessions() : -1;
1791 if (p->ConnType() == kXPD_ClientMaster && mxsess > 0) {
1792 XrdSysMutexHelper mhp(fMutex);
1793 int cursess = CurrentSessions();
1794 TRACEP(p,ALL,
" cursess: "<<cursess);
1795 if (mxsess <= cursess) {
1796 XPDFORM(msg,
" ++++ Max number of sessions reached (%d) - please retry later ++++ \n", cursess);
1797 response->Send(kXR_attn, kXPD_srvmsg, (
char *) msg.c_str(), msg.length());
1798 response->Send(kXP_TooManySess,
"cannot start a new session");
1802 mcGuard.Set(&fCurrentSessions);
// Account for this thread among those creating a session
1806 XpdSrvMgrCreateCnt cnt(
this, kCreateCnt);
1808 int nc = CheckCounter(kCreateCnt);
1809 TRACEP(p, DBG, nc <<
" threads are creating a new session");
// Session object and request parameters
1814 XrdProofdProofServ *xps = PrepareProofServ(p, response, sid);
1818 int loglevel = ntohl(p->Request()->proof.int1);
1822 XrdOucString tag, ord, cffile, uenvs;
1823 ParseCreateBuffer(p, xps, tag, ord, cffile, uenvs, intwait);
1826 TRACEP(p, DBG,
"{ord,cfg,psid,cid,log}: {"<<ord<<
","<<cffile<<
","<<psid
1827 <<
","<<p->CID()<<
","<<loglevel<<
"}");
// Wait for the fork semaphore; give up with a server error on failure
1832 if (fForkSem.Wait(10) != 0) {
1835 response->Send(kXP_ServerError,
"timed-out acquiring fork semaphore");
// Pipes for the setup handshake: in the code below 'fpc' is posted by the
// parent and read by the child, 'fcp' is posted by the child and read by the
// parent
1840 XrdProofdPipe fpc, fcp;
1841 if (!(fpc.IsValid()) || !(fcp.IsValid())) {
1844 response->Send(kXP_ServerError,
1845 "unable to create pipes for communication during setup");
// Environment descriptor handed to SetProofServEnv; tag directories are first
// computed with pid 0
1850 ProofServEnv_t in = {xps, loglevel, cffile.c_str(),
"",
"", tag.c_str(),
"",
"", 1};
1851 GetTagDirs(0, p, xps, in.fSessionTag, in.fTopSessionTag, in.fSessionDir, in.fWrkDir);
1855 TRACEP(p, FORK,
"Forking external proofsrv");
// ---- Branch taken when Fork() returns 0 (the spawned process) ----
1856 if (!(pid = fMgr->Sched()->Fork(
"proofsrv"))) {
// Recompute the tag directories now that the own pid is known
1859 GetTagDirs((
int)getpid(),
1860 p, xps, in.fSessionTag, in.fTopSessionTag, in.fSessionDir, in.fWrkDir);
1863 FormFileNameInSessionDir(p, xps, in.fSessionDir.c_str(),
"log", in.fLogFile);
// Redirect logging to the session log file, if a logger is available
1866 if (fLogger) fLogger->Bind(in.fLogFile.c_str());
1867 TRACE(FORK,
"log file: "<<in.fLogFile);
1869 XrdOucString pmsg =
"*** spawned child process ";
1870 pmsg += (int) getpid();
// Give the log file to the client's uid/gid; a failure is only traced
1875 if (chown(in.fLogFile.c_str(), p->Client()->UI().fUid, p->Client()->UI().fGid) != 0)
1876 TRACE(XERR,
"chown on '"<<in.fLogFile.c_str()<<
"'; errno: "<<errno);
1879 XrdOucString path, sockpath, emsg;
// Receive the admin path from the parent; a negative message type signals
// that the parent failed to set it up
1882 if (fpc.Poll() < 0) {
1883 TRACE(XERR,
"error while polling to receive the admin path from parent - EXIT" );
1886 if (fpc.Recv(xmsg) != 0) {
1887 TRACE(XERR,
"error reading message while waiting for the admin path from parent - EXIT" );
1890 if (xmsg.Type() < 0) {
1891 TRACE(XERR,
"the parent failed to setup the admin path - EXIT" );
1896 xps->SetAdminPath(path.c_str(), 0, fMgr->ChangeOwn());
1897 TRACE(FORK,
"admin path: "<<path);
// Receive the UNIX socket path from the parent, same protocol as above
1901 if (fpc.Poll() < 0) {
1902 TRACE(XERR,
"error while polling to receive the sock path from parent - EXIT" );
1905 if (fpc.Recv(xmsg) != 0) {
1906 TRACE(XERR,
"error reading message while waiting for the sock path from parent - EXIT" );
1909 if (xmsg.Type() < 0) {
1910 TRACE(XERR,
"the parent failed to setup the sock path - EXIT" );
1914 sockpath = xmsg.Buf();
1915 xps->SetUNIXSockPath(sockpath.c_str());
1916 TRACE(FORK,
"UNIX sock path: "<<sockpath);
// Decide whether the data directory has to be asserted: the DataDirOpts
// string is searched for 'M' (non-worker servers) or 'W' (workers).
// NOTE(review): the bodies of the two branches are not visible here;
// presumably they clear 'asserdatadir' - confirm.
1919 bool asserdatadir = 1;
1920 int srvtype = xps->SrvType();
1921 TRACE(ALL,
"srvtype = "<< srvtype);
1922 if (xps->SrvType() != kXPD_Worker && !strchr(fMgr->DataDirOpts(),
'M')) {
1924 }
else if (xps->SrvType() == kXPD_Worker && !strchr(fMgr->DataDirOpts(),
'W')) {
// Ordinal and tag are passed on only when the data dir must be asserted
1927 const char *pord = asserdatadir ? ord.c_str() : 0;
1928 const char *ptag = asserdatadir ? in.fSessionTag.c_str() : 0;
// Failures from here on are reported to the parent by posting type 0 with an
// error message on 'fcp'
1929 if (SetUserOwnerships(p, pord, ptag) != 0) {
1930 emsg =
"SetUserOwnerships did not return OK - EXIT";
1932 if (fcp.Post(0, emsg.c_str()) != 0)
1933 TRACE(XERR,
"cannot write to internal pipe; errno: "<<errno);
1938 if (SetUserEnvironment(p) != 0) {
1939 emsg =
"SetUserEnvironment did not return OK - EXIT";
1941 if (fcp.Post(0, emsg.c_str()) != 0)
1942 TRACE(XERR,
"cannot write to internal pipe; errno: "<<errno);
// Argument vector for execv(); zero-initialised, so the unused last slot
// terminates the list
1946 char *argvv[7] = {0};
1950 emsg =
"XrdProofdManager instance undefined!";
1952 if (fcp.Post(0, emsg.c_str()) != 0)
1953 TRACE(XERR,
"cannot write to internal pipe; errno: "<<errno);
// Fourth argument: "xpdpath:<admin path>" when the manager has an admin
// path, otherwise the parent process id
1957 if (fMgr->AdminPath()) {
1959 size_t len = strlen(fMgr->AdminPath()) + strlen(
"xpdpath:") + 1;
1960 sxpd =
new char[len];
1961 snprintf(sxpd, len,
"xpdpath:%s", fMgr->AdminPath());
1964 sxpd =
new char[10];
1965 snprintf(sxpd, 10,
"%d", getppid());
// Log level and server type as decimal strings
1969 char slog[10] = {0};
1970 snprintf(slog, 10,
"%d", loglevel);
1973 char ssrv[10] = {0};
1974 snprintf(ssrv, 10,
"%d", xps->SrvType());
1977 argvv[0] = (
char *) xps->ROOT()->PrgmSrv();
1978 argvv[1] = (
char *)((p->ConnType() == kXPD_MasterWorker) ?
"proofslave"
1980 argvv[2] = (
char *)
"xpd";
1981 argvv[3] = (
char *)sxpd;
1982 argvv[4] = (
char *)slog;
1983 argvv[5] = (
char *)ssrv;
// Set up the proofserv environment
1987 if (SetProofServEnv(p, (
void *)&in) != 0) {
1988 emsg =
"SetProofServEnv did not return OK - EXIT";
1990 if (fcp.Post(0, emsg.c_str()) != 0)
1991 TRACE(XERR,
"cannot write to internal pipe; errno: "<<errno);
1994 TRACE(FORK, (
int)getpid() <<
": proofserv env set up");
// Success: post type 1 with the log file path to the parent
1998 if (fcp.Post(1, xps->Fileout()) != 0) {
1999 TRACE(XERR,
"cannot write log file path to internal pipe; errno: "<<errno);
2002 TRACE(FORK, (
int)getpid()<<
": log file path communicated");
// Unblock SIGUSR1 and SIGUSR2 for this thread before exec
2006 sigemptyset(&myset);
2007 sigaddset(&myset, SIGUSR1);
2008 sigaddset(&myset, SIGUSR2);
2009 pthread_sigmask(SIG_UNBLOCK, &myset, 0);
2015 TRACE(FORK, (
int)getpid()<<
": user: "<<p->Client()->User()<<
2016 ", uid: "<<getuid()<<
", euid:"<<geteuid()<<
2017 ", psrv: "<<xps->ROOT()->PrgmSrv()<<
", argvv[1]: "<<argvv[1]);
// Replace the process image with the proofserv executable; execv() returns
// only on failure
2019 execv(xps->ROOT()->PrgmSrv(), argvv);
2022 TRACE(XERR,
"returned from execv: bad, bad sign !!! errno:" << (
int)errno);
// ---- Parent side ----
2033 response->Send(kXP_ServerError,
"could not fork agent");
2037 TRACEP(p, FORK,
"Parent process: child is "<<pid);
// Recompute tag directories and log file name using the child's pid
2041 GetTagDirs((
int)pid, p, xps, in.fSessionTag, in.fTopSessionTag, in.fSessionDir, in.fWrkDir);
2044 FormFileNameInSessionDir(p, xps, in.fSessionDir.c_str(),
"log", in.fLogFile);
2046 TRACEP(p, FORK,
"log file: "<<in.fLogFile);
// Prefix used in messages written to the log file: "wrk-<ord>:" or "mst-<ord>:"
2050 XPDFORM(npfx,
"%s-%s:", (p->ConnType() == kXPD_MasterWorker) ?
"wrk" :
"mst", xps->Ordinal());
// Drop any UNIX socket still attached to the session object
2053 if (xps->UNIXSock()) {
2054 TRACEP(p, FORK,
"current UNIX sock: "<<xps->UNIXSock() <<
", path: "<<xps->UNIXSockPath());
2055 xps->DeleteUNIXSock();
// Admin path "<active admin dir>/<user>.<group>.<pid>" and socket path
// "<sock dir>/xpd.<port>.<pid>"
2060 XrdOucString path, sockpath;
2061 XPDFORM(path,
"%s/%s.%s.%d", fActiAdminPath.c_str(),
2062 p->Client()->User(), p->Client()->Group(), pid);
2064 XPDFORM(sockpath,
"%s/xpd.%d.%d", fMgr->SockPathDir(), fMgr->Port(), pid);
// The socket path must fit in sockaddr_un::sun_path (with terminator)
2065 struct sockaddr_un unserver;
2066 if (sockpath.length() > (int)(
sizeof(unserver.sun_path) - 1)) {
2067 emsg =
"socket path very long (";
2068 emsg += sockpath.length();
2069 emsg +=
"): this may lead to stack corruption!";
2070 emsg +=
" Use xpd.sockpathdir to change it";
2071 TRACEP(p, XERR, emsg.c_str());
// Set the admin path and send it to the child (type 0); on failure post
// type -1 so that the child can stop
2074 if (!pathrc && !(pathrc = xps->SetAdminPath(path.c_str(), 1, fMgr->ChangeOwn()))) {
2076 if ((pathrc = fpc.Post(0, path.c_str())) != 0) {
2077 emsg =
"failed to communicating path to child";
2078 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2079 TRACEP(p, XERR, emsg.c_str());
2082 emsg =
"failed to setup child admin path";
2084 if ((pathrc = fpc.Post(-1, path.c_str())) != 0) {
2085 emsg +=
": failed communicating failure to child";
2086 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2087 TRACEP(p, XERR, emsg.c_str());
// Create the UNIX socket and hand it over to the client's uid/gid
2092 xps->SetUNIXSockPath(sockpath.c_str());
2093 if ((pathrc = xps->CreateUNIXSock(fEDest)) != 0) {
2095 emsg =
"failure creating UNIX socket on " ;
2097 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2098 TRACEP(p, XERR, emsg.c_str());
2102 TRACEP(p, FORK,
"UNIX sock: "<<xps->UNIXSockPath());
2103 if ((pathrc = chown(sockpath.c_str(), p->Client()->UI().fUid, p->Client()->UI().fGid)) != 0) {
2104 emsg =
"failure changing ownership of the UNIX socket on " ;
2106 emsg +=
"; errno: " ;
2108 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2109 TRACEP(p, XERR, emsg.c_str());
// Send the socket path to the child (type 0) or signal the failure (type -1)
2115 if ((pathrc = fpc.Post(0, sockpath.c_str())) != 0) {
2116 emsg =
"failed to communicating path to child";
2117 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2118 TRACEP(p, XERR, emsg.c_str());
2121 emsg =
"failed to setup child admin path";
2123 if ((pathrc = fpc.Post(-1, sockpath.c_str())) != 0) {
2124 emsg +=
": failed communicating failure to child";
2125 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2126 TRACEP(p, XERR, emsg.c_str());
// Path setup failed: kill the child and report the error to the client
2133 XrdProofdAux::KillProcess(pid, 1, p->Client()->UI(), fMgr->ChangeOwn());
2136 emsg += in.fLogFile;
2137 emsg.insert(npfx, 0);
2138 response->Send(kXP_ServerError, emsg.c_str());
2142 TRACEP(p, FORK,
"waiting for client setup status ...");
2144 emsg =
"proofserv setup";
// Wait for the child's setup status on 'fcp': at most 10 polls, each called
// with argument 2 (2 s according to the trace message below)
2148 int ntry = 10, prc = 0, rst = -1;
2149 while (prc == 0 && ntry--) {
2151 if ((prc = fcp.Poll(2)) > 0) {
2154 if (fcp.Recv(xmsg) != 0) {
2155 emsg =
"error receiving message from pipe";
2156 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2157 TRACEP(p, XERR, emsg.c_str());
2164 XrdOucString xbuf = xmsg.Buf();
2165 if (xbuf.length() <= 0) {
2166 emsg =
"error reading buffer {logfile, error message} from message received on the pipe";
2167 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2168 TRACEP(p, XERR, emsg.c_str());
// The buffer is the log file path; the session tag is derived from it by
// dropping the last path component and everything up to and including
// "session-"
2174 xps->SetFileout(xbuf.c_str());
2176 XrdOucString stag(xbuf);
2177 stag.erase(stag.rfind(
'/'));
2178 stag.erase(0, stag.find(
"session-") + strlen(
"session-"));
2179 xps->SetTag(stag.c_str());
2186 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2187 TRACEP(p, XERR, emsg.c_str());
2191 }
else if (prc < 0) {
2192 emsg =
"error receive status-of-setup from pipe";
2193 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2194 TRACEP(p, XERR, emsg.c_str());
2197 TRACEP(p, FORK,
"receiving status-of-setup from pipe: waiting 2 s ..."<<pid);
2205 TRACEP(p, FORK,
"tags: tag:"<<in.fSessionTag<<
" top:"<<in.fTopSessionTag<<
" xps:"<<xps->Tag());
// Setup failed or timed out: kill the child and report to the client
2210 emsg =
"failure setting up proofserv" ;
2211 if (prc == 0) emsg +=
": timed-out receiving status-of-setup from pipe";
2213 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2216 XrdProofdAux::KillProcess(pid, 1, p->Client()->UI(), fMgr->ChangeOwn());
2219 emsg += in.fLogFile;
2220 TRACEP(p, XERR, emsg.c_str());
2221 emsg.insert(npfx, 0);
2222 response->Send(kXP_ServerError, emsg.c_str());
// Setup OK: answer the client with session id, server protocol version,
// daemon version and an info string that includes the log file path
2230 info += xps->Fileout();
2232 response->SendI(psid, xps->ROOT()->SrvProtVers(), (kXR_int16)XPROOFD_VERSBIN,
2233 (
void *) info.c_str(), info.length());
2237 TRACEP(p, FORK,
"server launched: wait for callback ");
2240 xps->SetSrvPID(pid);
// Wait for the new server to call back on the UNIX socket; on failure try to
// kill it and notify the client
2243 if (AcceptPeer(xps, intwait, emsg) != 0) {
2244 emsg =
"problems accepting callback: ";
2246 if (XrdProofdAux::KillProcess(pid, 0, p->Client()->UI(), fMgr->ChangeOwn()) != 0)
2247 emsg +=
"process could not be killed - pid: ";
2249 emsg +=
"process killed - pid: ";
2252 XrdProofdAux::LogEmsgToFile(in.fLogFile.c_str(), emsg.c_str(), npfx.c_str());
2256 TRACEP(p, XERR, emsg.c_str());
2257 emsg.insert(npfx, 0);
2258 response->Send(kXR_attn, kXPD_errmsg, (
char *) emsg.c_str(), emsg.length());
2262 xps->SetGroup(p->Client()->Group());
// Apply the priority settings for this user to the child process
2266 if (fMgr->PriorityMgr()->SetProcessPriority(xps->SrvPID(),
2267 p->Client()->User(), dp) != 0) {
2268 TRACEP(p, XERR,
"problems changing child process priority");
2269 }
else if (dp > 0) {
2270 TRACEP(p, DBG,
"priority of the child process changed by " << dp <<
" units");
2273 XrdClientID *cid = xps->Parent();
2274 TRACEP(p, FORK,
"xps: "<<xps<<
", ClientID: "<<(
int *)cid<<
" (sid: "<<sid<<
")"<<
" NClients: "<<xps->GetNClients(1));
// Record the session tag in the client sandbox; a failure is only traced
2277 if (p->Client()->Sandbox()->AddSession(xps->Tag()) == -1)
2278 TRACEP(p, REQ,
"problems recording session in sandbox");
// Register the session, keyed by the child pid, under the manager mutex
2284 XrdOucString key; key += pid;
2285 { XrdSysMutexHelper mh(fMutex);
2286 fSessions.Add(key.c_str(), xps, 0, Hash_keepdata);
2287 fActiveSessions.push_back(xps);
2292 if (!xps->IsValid()) {
2294 TRACEP(p, XERR,
"PROOF session is invalid: protocol error? " <<emsg);
// Build the admin path "<active admin dir>/<user>.<group>." for session 'xps',
// appending 'pid' when it is positive, and set it on the session object.
// The path is asserted (second argument of SetAdminPath) only when pid > 0.
// On failure 'emsg' is filled with a description.
// NOTE(review): the remaining parameters and the return statements are not
// visible in this excerpt.
2304 int XrdProofdProofServMgr::CreateAdminPath(XrdProofdProofServ *xps,
2305 XrdProofdProtocol *p,
int pid,
2309 bool assert = (pid > 0) ? 1 : 0;
2310 XPDFORM(path,
"%s/%s.%s.", fActiAdminPath.c_str(),
2311 p->Client()->User(), p->Client()->Group());
2312 if (pid > 0) path += pid;
2313 if (xps->SetAdminPath(path.c_str(), assert, fMgr->ChangeOwn()) != 0) {
2314 XPDFORM(emsg,
"failure setting admin path '%s'", path.c_str());
// Create the UNIX socket for session 'xps' at
// "<sock dir>/xpd.<port>.<pid of this process>.<seq>" and change its
// permissions to 0755.
//   xps  - session object receiving the socket path and the socket
//   p    - protocol instance (used for tracing)
//   seq  - sequence number making the path unique within this process
//   emsg - output: description of the failure, if any
// NOTE(review): return statements are not visible in this excerpt.
2325 int XrdProofdProofServMgr::CreateSockPath(XrdProofdProofServ *xps,
2326 XrdProofdProtocol *p,
2327 unsigned int seq, XrdOucString &emsg)
2329 XPDLOC(SMGR,
"ProofServMgr::CreateSockPath")
2331 XrdOucString sockpath;
2333 XPDFORM(sockpath, "%s/xpd.%d.%d.%u", fMgr->SockPathDir(), fMgr->Port(), getpid(), seq);
2334 TRACEP(p, ALL, "socket path: " << sockpath);
// The path must fit in sockaddr_un::sun_path (with terminator)
2335 struct sockaddr_un unserver;
2336 if (sockpath.length() > (
int)(sizeof(unserver.sun_path) - 1)) {
2337 XPDFORM(emsg,
"socket path very long (%d): this may lead to stack corruption! ", sockpath.length());
2341 xps->SetUNIXSockPath(sockpath.c_str());
2342 if (xps->CreateUNIXSock(fEDest) != 0) {
2344 XPDFORM(emsg,
"failure creating UNIX socket on '%s'", sockpath.c_str());
2347 if (chmod(sockpath.c_str(), 0755) != 0) {
2348 XPDFORM(emsg,
"failure changing permissions of the UNIX socket on '%s'; errno: %d",
2349 sockpath.c_str(), (int)errno);
// Send the content of the file 'errlog' to the client through 'r' as a
// sequence of kXPD_srvmsg attention messages, framed by a header line and
// separator lines. The file is read in pieces of at most 2048 bytes.
// Failures to open, stat or read the file are reported to the client with a
// message of the same kind.
// NOTE(review): the read loop header, the declarations of 'buf' and 'left',
// and the close of the descriptor are not visible in this excerpt.
2360 void XrdProofdProofServMgr::SendErrLog(
const char *errlog, XrdProofdResponse *r)
2362 XPDLOC(SMGR,
"ProofServMgr::SendErrLog")
2364 XrdOucString emsg("An error occured: the content of errlog follows:");
2365 r->Send(kXR_attn, kXPD_srvmsg, (
char *) emsg.c_str(), emsg.length());
2366 emsg = "------------------------------------------------\n";
2367 r->Send(kXR_attn, kXPD_srvmsg, 2, (
char *) emsg.c_str(), emsg.length());
// Open the log file read-only
2369 int ierr = open(errlog, O_RDONLY);
2371 XPDFORM(emsg,
"cannot open '%s' (errno: %d)", errlog, errno);
2372 r->Send(kXR_attn, kXPD_srvmsg, 2, (
char *) emsg.c_str(), emsg.length());
// The file size determines how many bytes will be sent
2376 if (fstat(ierr, &st) != 0) {
2377 XPDFORM(emsg,
"cannot stat '%s' (errno: %d)", errlog, errno);
2378 r->Send(kXR_attn, kXPD_srvmsg, 2, (
char *) emsg.c_str(), emsg.length());
2382 off_t len = st.st_size;
2383 TRACE(ALL,
" reading "<<len<<
" bytes from "<<errlog);
2384 ssize_t chunk = 2048, nb, nr;
// Read at most 'chunk' bytes at a time and forward what was read
2388 nb = (left > chunk) ? chunk : left;
2389 if ((nr = read(ierr, buf, nb)) < 0) {
2390 XPDFORM(emsg,
"problems reading from '%s' (errno: %d)", errlog, errno);
2391 r->Send(kXR_attn, kXPD_srvmsg, 2, (
char *) emsg.c_str(), emsg.length());
2396 r->Send(kXR_attn, kXPD_srvmsg, 2, buf, nr);
// Closing separator
2400 emsg =
"------------------------------------------------";
2401 r->Send(kXR_attn, kXPD_srvmsg, 2, (
char *) emsg.c_str(), emsg.length());
// Rebuild the in-memory description of the session whose admin file is
// "<active admin dir>/<fpid>" and queue it for recovery.
// Visible steps: read the session info from the admin file; skip sessions
// whose server protocol version is below 18; get the client instance and a
// server object; fill the server object from the session info; create its
// UNIX socket; append it to the entry of that client in fRecoverClients
// (creating the entry if missing) while holding fRecoverMutex.
//   fpid - name of the session admin file
// NOTE(review): return values and the declaration of 'psid' are not visible
// in this excerpt.
2410 int XrdProofdProofServMgr::ResolveSession(
const char *fpid)
2412 XPDLOC(SMGR,
"ProofServMgr::ResolveSession")
2414 TRACE(REQ, "resolving "<< (fpid ? fpid : "<nul>")<<" ...");
// A non-empty name, a client manager and the recovery list are all required
2417 if (!fpid || strlen(fpid)<= 0 || !(fMgr->ClientMgr()) || !fRecoverClients) {
2418 TRACE(XERR,
"invalid inputs: "<<(fpid ? fpid :
"<nul>")<<
", "<<fMgr->ClientMgr()<<
2419 ", "<<fRecoverClients);
2425 XPDFORM(path,
"%s/%s", fActiAdminPath.c_str(), fpid);
// Load the session information from the admin file
2428 XrdProofSessionInfo si(path.c_str());
// Sessions with protocol version < 18 are not recovered
2431 if (si.fSrvProtVers < 18) {
2432 TRACE(DBG,
"session does not support recovering: protocol "
2433 <<si.fSrvProtVers<<
" < 18");
2438 XrdProofdClient *c = fMgr->ClientMgr()->GetClient(si.fUser.c_str(), si.fGroup.c_str(),
2439 si.fUnixPath.c_str());
2441 TRACE(DBG,
"client instance not initialized");
2447 XrdProofdProofServ *xps = c->GetServObj(psid);
2449 TRACE(DBG,
"server object not initialized");
// Fill the server object and create the socket the server will call back on
2454 si.FillProofServ(*xps, fMgr->ROOTMgr());
2455 if (xps->CreateUNIXSock(fEDest) != 0) {
2457 TRACE(XERR,
"failure creating UNIX socket on " << xps->UNIXSockPath());
// Add the session to the list of sessions to recover for this client
2466 XrdSysMutexHelper mhp(fRecoverMutex);
2467 std::list<XpdClientSessions *>::iterator ii = fRecoverClients->begin();
2468 while (ii != fRecoverClients->end()) {
2469 if ((*ii)->fClient == c)
2473 if (ii != fRecoverClients->end()) {
2474 (*ii)->fProofServs.push_back(xps);
2476 XpdClientSessions *cl =
new XpdClientSessions(c);
2477 cl->fProofServs.push_back(xps);
2478 fRecoverClients->push_back(cl);
// Try to re-establish the connection with the sessions listed in 'cl'.
// For a session taken from the front of cl->fProofServs (and moved to the
// back), AcceptPeer is called with a wait argument of 1. On success the
// session is added to fSessions (keyed by its server pid) and to
// fActiveSessions, the admin path is set on its protocol and the session is
// removed from cl->fProofServs. All accesses to cl->fProofServs are done
// under cl->fMutex.
// NOTE(review): the loop construct, the return values and the declarations
// of 'nps', 'left' and 'emsg' are not visible in this excerpt.
2488 int XrdProofdProofServMgr::Recover(XpdClientSessions *cl)
2490 XPDLOC(SMGR,
"ProofServMgr::Recover")
2493 TRACE(XERR,
"invalid input!");
2497 TRACE(DBG,
"client: "<< cl->fClient->User());
2501 XrdProofdProofServ *xps = 0;
// Number of sessions to process
2503 { XrdSysMutexHelper mhp(cl->fMutex); nps = cl->fProofServs.size(); }
// Take the first session and move it to the end of the list
2506 { XrdSysMutexHelper mhp(cl->fMutex); xps = cl->fProofServs.front();
2507 cl->fProofServs.remove(xps); cl->fProofServs.push_back(xps); }
// Wait for the callback; a "timeout" message is traced at debug level only
2510 if (AcceptPeer(xps, 1, emsg) != 0) {
2511 if (emsg ==
"timeout") {
2512 TRACE(DBG,
"timeout while accepting callback");
2514 TRACE(XERR,
"problems accepting callback: "<<emsg);
// Register the recovered session
2518 XrdOucString key; key += xps->SrvPID();
2519 fSessions.Add(key.c_str(), xps, 0, Hash_keepdata);
2520 fActiveSessions.push_back(xps);
2521 xps->Protocol()->SetAdminPath(xps->AdminPath());
2523 { XrdSysMutexHelper mhp(cl->fMutex); cl->fProofServs.remove(xps); }
2528 int pid = xps->SrvPID();
2530 { XrdSysMutexHelper mhp(cl->fMutex); left = cl->fProofServs.size(); }
2531 XPDPRT(
"session for "<<cl->fClient->User()<<
"."<<cl->fClient->Group()<<
2532 " successfully recovered ("<<left<<
" left); pid: "<<pid);
2541 #ifndef ROOT_XrdFour
// Variant compiled when ROOT_XrdFour is not defined.
// Wait for the proofserv of session 'xps' to connect on the session's UNIX
// socket, then set up the protocol on the accepted connection.
//   xps - session object; must be non-null and own a UNIX socket
//   to  - wait time passed to Accept (seconds, per the trace message)
//   msg - output: description of the failure, if any
// NOTE(review): return values are not visible in this excerpt.
2547 int XrdProofdProofServMgr::AcceptPeer(XrdProofdProofServ *xps,
2548 int to, XrdOucString &msg)
2550 XPDLOC(SMGR,
"ProofServMgr::AcceptPeer")
2553 XrdNetPeer peerpsrv;
2556 if (!xps || !xps->UNIXSock()) {
2557 XPDFORM(msg,
"session pointer undefined or socket invalid: %p", xps);
2560 TRACE(REQ,
"waiting for server callback for "<<to<<
" secs ... on "<<xps->UNIXSockPath());
2563 if (!(xps->UNIXSock()->Accept(peerpsrv, XRDNET_NODNTRIM, to))) {
2569 if (SetupProtocol(peerpsrv, xps, msg) != 0) {
2570 msg =
"could not assert connected peer: ";
// Variant taking an XrdNetPeer.
// Attach a protocol instance to the connection accepted from a proofserv.
// Visible steps: set the peer host name from "localhost"; allocate a link for
// the peer; match a new XrdProofdProtocol against the link; set the session
// admin path on the matched protocol; run the handshake via Process(); attach
// the link to the poller; bind protocol and link; schedule the link; store
// the protocol in 'xps'.
//   peerpsrv - accepted peer
//   xps      - session object the connection belongs to
//   msg      - output: description of the failure, if any
// NOTE(review): cleanup on failure, return values and the declarations of
// 'lnkopts' and 'go' are not visible in this excerpt.
2581 int XrdProofdProofServMgr::SetupProtocol(XrdNetPeer &peerpsrv,
2582 XrdProofdProofServ *xps, XrdOucString &msg)
2584 XPDLOC(SMGR,
"ProofServMgr::SetupProtocol")
2587 XrdLink *linkpsrv = 0;
2588 XrdProtocol *xp = 0;
// Replace the peer name, releasing the previous string first
2593 if (peerpsrv.InetName) free(peerpsrv.InetName);
2594 peerpsrv.InetName = XrdSysDNS::getHostName("localhost");
2597 if (!(linkpsrv = XrdLink::Alloc(peerpsrv, lnkopts))) {
2598 msg =
"could not allocate network object: ";
2604 peerpsrv.InetBuff = 0;
2605 TRACE(DBG,
"connection accepted: matching protocol ... ");
2607 XrdProofdProtocol *p =
new XrdProofdProtocol();
2608 if (!(xp = p->Match(linkpsrv))) {
2609 msg =
"match failed: protocol error: ";
// The admin path is set before Process() is called
2617 XrdOucString apath(xps->AdminPath());
2619 ((XrdProofdProtocol *)xp)->SetAdminPath(apath.c_str());
2621 if (xp->Process(linkpsrv) != 0) {
2622 msg =
"handshake with internal link failed: ";
2628 if (go && !XrdPoll::Attach(linkpsrv)) {
2629 msg =
"could not attach new internal link to poller: ";
2641 linkpsrv->setProtocol(xp);
2643 TRACE(REQ,
"Protocol "<<xp<<
" attached to link "<<linkpsrv<<
" ("<< peerpsrv.InetName <<
")");
2646 fMgr->Sched()->Schedule((XrdJob *)linkpsrv);
2649 xps->SetProtocol((XrdProofdProtocol *)xp);
// Variant using XrdNetAddr for the accepted connection.
// Wait for the proofserv of session 'xps' to connect on the session's UNIX
// socket, then set up the protocol on the accepted connection.
//   xps - session object; must be non-null and own a UNIX socket
//   to  - wait time passed to Accept (seconds, per the trace message)
//   msg - output: description of the failure, if any
// NOTE(review): the declaration of 'netaddr' and the return values are not
// visible in this excerpt.
2662 int XrdProofdProofServMgr::AcceptPeer(XrdProofdProofServ *xps,
2663 int to, XrdOucString &msg)
2665 XPDLOC(SMGR,
"ProofServMgr::AcceptPeer")
2671 if (!xps || !xps->UNIXSock()) {
2672 XPDFORM(msg,
"session pointer undefined or socket invalid: %p", xps);
2675 TRACE(REQ,
"waiting for server callback for "<<to<<
" secs ... on "<<xps->UNIXSockPath());
2678 if (!(xps->UNIXSock()->Accept(netaddr, 0, to))) {
2684 if (SetupProtocol(netaddr, xps, msg) != 0) {
2685 msg =
"could not assert connected peer: ";
// Variant taking an XrdNetAddr.
// Attach a protocol instance to the connection accepted from a proofserv.
// Visible steps: allocate a link for the address; match a new
// XrdProofdProtocol against the link; set the session admin path on the
// matched protocol; run the handshake via Process(); attach the link to the
// poller; bind protocol and link; schedule the link; store the protocol in
// 'xps'.
//   netaddr - address of the accepted connection
//   xps     - session object the connection belongs to
//   msg     - output: description of the failure, if any
// NOTE(review): cleanup on failure, return values and the declarations of
// 'lnkopts' and 'go' are not visible in this excerpt.
2696 int XrdProofdProofServMgr::SetupProtocol(XrdNetAddr &netaddr,
2697 XrdProofdProofServ *xps, XrdOucString &msg)
2699 XPDLOC(SMGR,
"ProofServMgr::SetupProtocol")
2702 XrdLink *linkpsrv = 0;
2703 XrdProtocol *xp = 0;
2708 if (!(linkpsrv = XrdLink::Alloc(netaddr, lnkopts))) {
2709 msg =
"could not allocate network object: ";
2714 TRACE(DBG,
"connection accepted: matching protocol ... ");
2716 XrdProofdProtocol *p =
new XrdProofdProtocol();
2717 if (!(xp = p->Match(linkpsrv))) {
2718 msg =
"match failed: protocol error: ";
// The admin path is set before Process() is called
2726 XrdOucString apath(xps->AdminPath());
2728 ((XrdProofdProtocol *)xp)->SetAdminPath(apath.c_str());
2730 if (xp->Process(linkpsrv) != 0) {
2731 msg =
"handshake with internal link failed: ";
2737 if (go && !XrdPoll::Attach(linkpsrv)) {
2738 msg =
"could not attach new internal link to poller: ";
2750 linkpsrv->setProtocol(xp);
2752 TRACE(REQ,
"Protocol "<<xp<<
" attached to link "<<linkpsrv<<
" ("<< netaddr.Name() <<
")");
2755 fMgr->Sched()->Schedule((XrdJob *)linkpsrv);
2758 xps->SetProtocol((XrdProofdProtocol *)xp);
// Handle a request to detach the connection 'p' from a session.
// The session id is read from the request (network byte order); if the
// client has no such session an invalid-request error is sent, otherwise the
// client ID slot identified by p->Pid() is freed on the session.
// NOTE(review): the acknowledgement to the client and the return values are
// not visible in this excerpt.
2769 int XrdProofdProofServMgr::Detach(XrdProofdProtocol *p)
2771 XPDLOC(SMGR,
"ProofServMgr::Detach")
2773 int psid = -1, rc = 0;
2774 XPD_SETRESP(p, "Detach");
2777 psid = ntohl(p->Request()->proof.sid);
2778 TRACEP(p, REQ, "psid: "<<psid);
// Locate the session among those of this client
2781 XrdProofdProofServ *xps = 0;
2782 if (!p->Client() || !(xps = p->Client()->GetServer(psid))) {
2783 TRACEP(p, XERR,
"session ID not found: "<<psid);
2784 response->Send(kXR_InvalidRequest,
"session ID not found");
2787 xps->FreeClientID(p->Pid());
// Handle a request to destroy sessions of the client attached to 'p'.
// The session id is read from the request (network byte order). The two
// messages below show two modes: destroying one reference session or all
// sessions. The termination is delegated to the client object and the time
// of the request is recorded in fDestroyTimes, keyed by 'p'.
// NOTE(review): the condition selecting between the single-session and the
// all-sessions mode, and the return values, are not visible in this excerpt.
2798 int XrdProofdProofServMgr::Destroy(XrdProofdProtocol *p)
2800 XPDLOC(SMGR,
"ProofServMgr::Destroy")
2802 int psid = -1, rc = 0;
2803 XPD_SETRESP(p, "Destroy");
2806 psid = ntohl(p->Request()->proof.sid);
2807 TRACEP(p, REQ, "psid: "<<psid);
2812 XrdProofdProofServ *xpsref = 0;
// Look up the reference session; failing that is an invalid request
2815 if (!p->Client() || !(xpsref = p->Client()->GetServer(psid))) {
2816 TRACEP(p, XERR,
"reference session ID not found");
2817 response->Send(kXR_InvalidRequest,
"reference session ID not found");
2820 XPDFORM(msg,
"session %d destroyed by %s", xpsref->SrvPID(), p->Link()->ID);
2822 XPDFORM(msg,
"all sessions destroyed by %s", p->Link()->ID);
// Terminate via the client object, passing this manager's pipe
2826 p->Client()->TerminateSessions(kXPD_AnyServer, xpsref,
2827 msg.c_str(), Pipe(), fMgr->ChangeOwn());
// Remember when this protocol instance asked for the destruction
2830 fDestroyTimes[p] = time(0);
// Callback with the (key, item, argument) signature used with
// XrdOucHash<XpdEnv>::Apply. For an entry with a non-empty setting it
// resolves keywords in the setting, writes it as one line to the file
// xwe->fEnv and then hands a heap copy to the PutEnv macro, which either
// exports it with putenv() or deletes it, depending on xwe->fExport.
//   env - environment entry to write
//   s   - pointer to an XpdWriteEnv_t describing manager, client, output
//         file and export flag
// NOTE(review): return values and the declaration of 'emsg' are not visible
// in this excerpt.
2842 static int WriteSessEnvs(
const char *, XpdEnv *env,
void *s)
2844 XPDLOC(SMGR,
"WriteSessEnvs")
2848 XpdWriteEnv_t *xwe = (XpdWriteEnv_t *)s;
2850 if (env && xwe && xwe->fMgr && xwe->fClient && xwe->fEnv) {
2851 if (env->fEnv.length() > 0) {
2853 xwe->fMgr->ResolveKeywords(env->fEnv, xwe->fClient);
// putenv() keeps the pointer it is given, hence the dedicated heap copy
2855 char *ev =
new char[env->fEnv.length()+1];
2856 strncpy(ev, env->fEnv.c_str(), env->fEnv.length());
2857 ev[env->fEnv.length()] = 0;
2858 fprintf(xwe->fEnv,
"%s\n", ev);
2860 PutEnv(ev, xwe->fExport);
2865 emsg =
"some input undefined";
2869 TRACE(XERR,
"protocol error: "<<emsg);
// Prepare the environment for a proofserv about to be started on behalf of
// the client attached to 'p'.
// Visible steps: validate the inputs; set the basic ROOT-related environment
// through SetProofServEnv(fMgr, ROOT); format a few "NAME=value" strings
// (session dir, log level, ordinal, version tag); open the env file for
// writing and write the session settings to it; handle security-related
// settings when an authentication protocol is attached; add the matching
// entries of fProofServEnvs and the user-provided settings; finally create a
// "last-worker-session" / "last-master-session" symlink in the user sandbox.
//   p     - protocol instance of the requesting connection
//   input - pointer to a ProofServEnv_t describing the session
// NOTE(review): this excerpt does not show the allocation of 'ev', what is
// done with the formatted strings (presumably exported via the PutEnv
// macro), the construction of the env file name, the closing of the file or
// the return values.
2877 int XrdProofdProofServMgr::SetProofServEnvOld(XrdProofdProtocol *p,
void *input)
2879 XPDLOC(SMGR,
"ProofServMgr::SetProofServEnvOld")
2884 if (!p || !p->Client() || !input) {
2885 TRACE(XERR,
"at leat one input is invalid - cannot continue");
// Basic environment depending only on the manager and the ROOT version
2890 if (SetProofServEnv(fMgr, p->Client()->ROOT()) != 0) {
2891 TRACE(XERR,
"problems setting basic environment - exit");
2895 ProofServEnv_t *in = (ProofServEnv_t *)input;
2898 XrdProofdProofServ *xps = in->fPS;
2900 TRACE(XERR,
"unable to get instance of proofserv proxy");
2903 int psid = xps->ID();
2904 TRACE(REQ,
"psid: "<<psid<<
", log: "<<in->fLogLevel);
// Sandbox directory of the user
2907 XrdOucString udir = p->Client()->Sandbox()->Dir();
2908 TRACE(DBG,
"working dir for "<<p->Client()->User()<<
" is: "<<udir);
// Session working directory
2910 size_t len = strlen(
"ROOTPROOFSESSDIR=") + in->fWrkDir.length() + 2;
2912 snprintf(ev, len,
"ROOTPROOFSESSDIR=%s", in->fWrkDir.c_str());
// Log level
2917 len = strlen(
"ROOTPROOFLOGLEVEL=") + 5;
2919 snprintf(ev, len,
"ROOTPROOFLOGLEVEL=%d", in->fLogLevel);
// Ordinal number
2924 len = strlen(
"ROOTPROOFORDINAL=")+strlen(xps->Ordinal()) + 2;
2926 snprintf(ev, len,
"ROOTPROOFORDINAL=%s", xps->Ordinal());
// ROOT version tag
2931 len = strlen(
"ROOTVERSIONTAG=")+strlen(p->Client()->ROOT()->Tag())+2;
2933 snprintf(ev, len,
"ROOTVERSIONTAG=%s", p->Client()->ROOT()->Tag());
// Open the environment file
2938 TRACE(DBG,
"creating env file");
2939 XrdOucString envfile = in->fWrkDir;
2941 FILE *fenv = fopen(envfile.c_str(),
"w");
2944 "unable to open env file: "<<envfile);
2947 TRACE(DBG,
"environment file: "<< envfile);
// Settings related to the authentication protocol, if one is attached
2950 if (p->AuthProt()) {
// XrdSecENVS holds a comma-separated list of settings; each non-empty token
// is written to the env file
2953 XrdOucString secenvs(getenv(
"XrdSecENVS"));
2954 if (secenvs.length() > 0) {
2958 while ((from = secenvs.tokenize(env, from,
',')) != -1) {
2959 if (env.length() > 0) {
2961 ev =
new char[env.length()+1];
2962 strncpy(ev, env.c_str(), env.length());
2963 ev[env.length()] = 0;
2965 fprintf(fenv,
"%s\n", ev);
// Credentials: "XrdSecCREDS=" followed by the raw credentials buffer
2972 XrdSecCredentials *creds = p->AuthProt()->getCredentials();
2974 len = strlen(
"XrdSecCREDS=")+creds->size;
2975 ev =
new char[len + 1];
2976 strcpy(ev,
"XrdSecCREDS=");
2977 memcpy(ev + strlen(
"XrdSecCREDS="), creds->buffer, creds->size);
2980 TRACE(DBG,
"XrdSecCREDS set");
// Save the credentials under <sandbox>/.creds using the fCredsSaver hook
2982 XrdOucString credsdir = udir;
2983 credsdir +=
"/.creds";
2985 if (!XrdProofdAux::AssertDir(credsdir.c_str(), p->Client()->UI(), fMgr->ChangeOwn())) {
2986 if ((*fCredsSaver)(creds, credsdir.c_str(), p->Client()->UI()) != 0) {
2987 TRACE(DBG,
"problems in saving authentication creds under "<<credsdir);
2990 TRACE(XERR,
"unable to create creds dir: "<<credsdir);
// Session settings written to the env file, one NAME=value per line
2999 fprintf(fenv,
"ROOTSYS=%s\n", xps->ROOT()->Dir());
3002 fprintf(fenv,
"ROOTCONFDIR=%s\n", xps->ROOT()->Dir());
3005 fprintf(fenv,
"ROOTTMPDIR=%s\n", fMgr->TMPdir());
3008 fprintf(fenv,
"ROOTXPDPORT=%d\n", fMgr->Port());
3011 fprintf(fenv,
"ROOTPROOFWORKDIR=%s\n", udir.c_str());
3014 fprintf(fenv,
"ROOTPROOFSESSIONTAG=%s\n", in->fSessionTag.c_str());
3017 if (fMgr->NetMgr()->WorkerUsrCfg())
3018 fprintf(fenv,
"ROOTUSEUSERCFG=1\n");
3021 fprintf(fenv,
"ROOTOPENSOCK=%s\n", xps->UNIXSockPath());
3024 fprintf(fenv,
"ROOTENTITY=%s@%s\n", p->Client()->User(), p->Link()->Host());
3027 fprintf(fenv,
"ROOTSESSIONID=%d\n", psid);
3030 fprintf(fenv,
"ROOTCLIENTID=%d\n", p->CID());
3033 fprintf(fenv,
"ROOTPROOFCLNTVERS=%d\n", p->ProofProtocol());
3036 fprintf(fenv,
"ROOTPROOFORDINAL=%s\n", xps->Ordinal());
3039 if (getenv(
"ROOTVERSIONTAG"))
3040 fprintf(fenv,
"ROOTVERSIONTAG=%s\n", getenv(
"ROOTVERSIONTAG"));
3043 if (in->fCfg.length() > 0)
3044 fprintf(fenv,
"ROOTPROOFCFGFILE=%s\n", in->fCfg.c_str());
// The log file path is also recorded on the session object
3047 fprintf(fenv,
"ROOTPROOFLOGFILE=%s\n", in->fLogFile.c_str());
3048 xps->SetFileout(in->fLogFile.c_str());
// Configured proofserv settings: collect, per name, the entries matching
// user, group and ROOT version code; where an entry with the same name is
// already collected the match values are compared. The collected entries are
// then written via WriteSessEnvs. Done under fEnvsMutex.
3051 { XrdSysMutexHelper mhp(fEnvsMutex);
3052 if (fProofServEnvs.size() > 0) {
3054 XrdOucHash<XpdEnv> sessenvs;
3055 std::list<XpdEnv>::iterator ienvs = fProofServEnvs.begin();
3056 for ( ; ienvs != fProofServEnvs.end(); ++ienvs) {
3057 int envmatch = (*ienvs).Matches(p->Client()->User(), p->Client()->Group(),
3058 p->Client()->ROOT()->VersionCode());
3059 if (envmatch >= 0) {
3060 XpdEnv *env = sessenvs.Find((*ienvs).fName.c_str());
3062 int envmtcex = env->Matches(p->Client()->User(), p->Client()->Group(),
3063 p->Client()->ROOT()->VersionCode());
3064 if (envmatch > envmtcex) {
3067 sessenvs.Rep(env->fName.c_str(), env, 0, Hash_keepdata);
3072 sessenvs.Add(env->fName.c_str(), env, 0, Hash_keepdata);
3074 TRACE(HDBG,
"Adding: "<<(*ienvs).fEnv);
3077 XpdWriteEnv_t xpwe = {fMgr, p->Client(), fenv, in->fOld};
3078 sessenvs.Apply(WriteSessEnvs, (
void *)&xpwe);
// User-provided settings: comma-separated NAME=value tokens; tokens without
// '=' are ignored
3083 if (xps->UserEnvs() &&
3084 strlen(xps->UserEnvs()) && strstr(xps->UserEnvs(),
"=")) {
3086 XrdOucString ue = xps->UserEnvs();
3087 XrdOucString env, namelist;
3088 int from = 0, ieq = -1;
3089 while ((from = ue.tokenize(env, from,
',')) != -1) {
3090 if (env.length() > 0 && (ieq = env.find(
'=')) != -1) {
3092 ResolveKeywords(env, in);
3093 ev =
new char[env.length()+1];
3094 strncpy(ev, env.c_str(), env.length());
3095 ev[env.length()] = 0;
3097 fprintf(fenv,
"%s\n", ev);
3100 if (namelist.length() > 0)
// PROOF_ALLVARS carries 'namelist'; NOTE(review): how 'namelist' is filled
// is not visible here - presumably with the names of the user variables.
3106 len = strlen(
"PROOF_ALLVARS=") + namelist.length() + 2;
3108 snprintf(ev, len,
"PROOF_ALLVARS=%s", namelist.c_str());
3110 fprintf(fenv,
"%s\n", ev);
// Symlink in the sandbox pointing to the session directory
3118 TRACE(DBG,
"creating symlink");
3119 XrdOucString syml = udir;
3120 if (p->ConnType() == kXPD_MasterWorker)
3121 syml +=
"/last-worker-session";
3123 syml +=
"/last-master-session";
3124 if (XrdProofdAux::SymLink(in->fSessionDir.c_str(), syml.c_str()) != 0) {
3125 TRACE(XERR,
"problems creating symlink to last session (errno: "<<errno<<
")");
// Format the basic environment settings that depend only on the manager
// 'mgr' and the ROOT version 'r': the library path (XPD_LIBPATH, made of the
// ROOT lib dir optionally followed by the manager's bare library path),
// ROOTSYS, ROOTBINDIR, ROOTCONFDIR and TMPDIR.
// NOTE(review): the allocation of 'ev', the calls that export the formatted
// strings (presumably putenv) and the return values are not visible in this
// excerpt; the "XrdROOT instance undefined!" trace suggests that a null 'r'
// is handled as an error - confirm.
3136 int XrdProofdProofServMgr::SetProofServEnv(XrdProofdManager *mgr, XrdROOT *r)
3138 XPDLOC(SMGR,
"ProofServMgr::SetProofServEnv")
3143 TRACE(REQ, "ROOT dir: "<< (r ? r->Dir() : "*** undef ***"));
// Library path: "<XPD_LIBPATH>=<libdir>[:<bare lib path>]"
3146 char *libdir = (
char *) r->LibDir();
3148 if (mgr->BareLibPath() && strlen(mgr->BareLibPath()) > 0) {
3149 len = 32 + strlen(libdir) + strlen(mgr->BareLibPath());
3150 ldpath =
new char[len];
3151 snprintf(ldpath, len,
"%s=%s:%s", XPD_LIBPATH, libdir, mgr->BareLibPath());
3153 len = 32 + strlen(libdir);
3154 ldpath =
new char[len];
3155 snprintf(ldpath, len,
"%s=%s", XPD_LIBPATH, libdir);
// ROOTSYS
3159 char *rootsys = (
char *) r->Dir();
3160 len = 15 + strlen(rootsys);
3162 snprintf(ev, len,
"ROOTSYS=%s", rootsys);
// ROOTBINDIR
3166 char *bindir = (
char *) r->BinDir();
3167 len = 15 + strlen(bindir);
3169 snprintf(ev, len,
"ROOTBINDIR=%s", bindir);
// ROOTCONFDIR is taken from the ROOT data dir
3173 char *confdir = (
char *) r->DataDir();
3174 len = 20 + strlen(confdir);
3176 snprintf(ev, len,
"ROOTCONFDIR=%s", confdir);
// TMPDIR
3180 len = 20 + strlen(mgr->TMPdir());
3182 snprintf(ev, len,
"TMPDIR=%s", mgr->TMPdir());
3190 TRACE(XERR,
"XrdROOT instance undefined!");
// Build in 'outfn' the name of a file located in the session directory,
// following the pattern "%s/%s-%s-%s.%s".
// The manager host name is truncated at its first '.', and the role is
// "worker" for master-worker connections, "master" otherwise.
//   p          - protocol instance (provides the connection type)
//   xps        - session object (provides the ordinal)
//   sessiondir - session directory
//   extension  - file extension
//   outfn      - output: resulting file name
// NOTE(review): the argument list of the final XPDFORM is not visible in
// this excerpt; presumably (sessiondir, role, ordinal, host, extension) -
// confirm.
3196 void XrdProofdProofServMgr::FormFileNameInSessionDir(XrdProofdProtocol *p,
3197 XrdProofdProofServ *xps,
3198 const char *sessiondir,
3199 const char *extension,
3200 XrdOucString &outfn)
3202 XrdOucString host = fMgr->Host();
3203 XrdOucString ord = xps->Ordinal();
// Keep only the part of the host name preceding the first '.'
3207 if (host.find(
".") != STR_NPOS)
3208 host.erase(host.find(
"."));
3210 if (p->ConnType() == kXPD_MasterWorker) role =
"worker";
3211 else role =
"master";
3216 XPDFORM(outfn,
"%s/%s-%s-%s.%s",
// Determine session tag, top session tag, session directory and session
// working directory for session 'xps'.
// The function has distinct paths depending on 'pid': one before the
// "else if (pid > 0)" branch below (Create() calls it with pid 0 before
// forking), one for pid > 0, and an error trace for negative values.
//   pid        - 0, or the pid of the proofserv process
//   p          - protocol instance of the requesting connection
//   xps        - session object; its tag is read and may be set
//   sesstag    - output: session tag, starting with "<host>-<time>-"
//   topsesstag - output: tag of the top session
//   sessiondir - output: session directory
//   sesswrkdir - output: "<sessiondir>/worker-<ord>-<tag>" or
//                "<sessiondir>/master-<ord>-<tag>"
// NOTE(review): the first branch condition, the initialisation of
// 'sessiondir' and the code completing 'sesstag' with the pid are not
// visible in this excerpt.
3228 void XrdProofdProofServMgr::GetTagDirs(
int pid,
3229 XrdProofdProtocol *p, XrdProofdProofServ *xps,
3230 XrdOucString &sesstag, XrdOucString &topsesstag,
3231 XrdOucString &sessiondir, XrdOucString &sesswrkdir)
3233 XPDLOC(SMGR,
"GetTagDirs")
3236 XrdOucString udir = p->Client()->Sandbox()->Dir();
// Tag prefix: short host name (up to the first '.') and current time
3241 XrdOucString host = fMgr->Host();
3242 if (host.find(
".") != STR_NPOS)
3243 host.erase(host.find(
"."));
3244 XPDFORM(sesstag,
"%s-%d-", host.c_str(), (int)time(0));
// Client-master connections start a new top session; for the others the top
// tag is the session tag already stored in 'xps', without "session-"
3248 if (p->ConnType() == kXPD_ClientMaster) {
3249 sessiondir +=
"/session-";
3250 sessiondir += sesstag;
3251 topsesstag = sesstag;
3254 sessiondir += xps->Tag();
3255 topsesstag = xps->Tag();
3256 topsesstag.replace(
"session-",
"");
3258 if (XrdProofdAux::AssertDir(sessiondir.c_str(), p->Client()->UI(),
3259 fMgr->ChangeOwn()) == -1) {
3260 TRACE(XERR,
"problems asserting dir '"<<sessiondir<<
"' - errno: "<<errno);
3265 }
else if (pid > 0) {
3271 if (p->ConnType() == kXPD_ClientMaster) {
3272 topsesstag = sesstag;
3274 xps->SetTag(sesstag.c_str());
// The directory is asserted only when called from the process 'pid' itself
3278 if (pid == (
int) getpid()) {
3279 if (XrdProofdAux::AssertDir(sessiondir.c_str(), p->Client()->UI(),
3280 fMgr->ChangeOwn()) == -1) {
// Working directory inside the session directory
3286 sesswrkdir = sessiondir;
3287 if (p->ConnType() == kXPD_MasterWorker) {
3288 XPDFORM(sesswrkdir,
"%s/worker-%s-%s", sessiondir.c_str(), xps->Ordinal(), sesstag.c_str());
3290 XPDFORM(sesswrkdir,
"%s/master-%s-%s", sessiondir.c_str(), xps->Ordinal(), sesstag.c_str());
3293 TRACE(XERR,
"negative pid ("<<pid<<
"): should not have got here!");
// Callback applied to each entry of a hash of XpdEnv objects (it is passed to
// XrdOucHash<XpdEnv>::Apply in CreateProofServRootRc). 'f' is the FILE* of the
// rootrc file being written, 'erc' the entry to write.
// A non-empty entry is written as one line, except entries mentioning
// "Proof.DataSetManager", for which a message is traced; presumably those are
// skipped (the 'else' keyword is not visible in this excerpt - confirm).
// NOTE(review): return values and the conditions guarding the error path are
// not visible here.
3303 static int WriteSessRCs(
const char *, XpdEnv *erc,
void *f)
3305 XPDLOC(SMGR,
"WriteSessRCs")
3308 FILE *frc = (FILE *)f;
3310 XrdOucString rc = erc->fEnv;
3311 if (rc.length() > 0) {
3312 if (rc.find(
"Proof.DataSetManager") != STR_NPOS) {
3313 TRACE(ALL,
"Proof.DataSetManager ignored: use xpd.datasetsrc to define dataset managers");
3315 fprintf(frc,
"%s\n", rc.c_str());
// Error reporting for undefined inputs
3321 emsg =
"file or input entry undefined";
3325 TRACE(XERR,
"protocol error: "<<emsg);
// Set up the environment for a proofserv session described by 'input'
// (a ProofServEnv_t) on behalf of protocol instance 'p'.
// Visible steps: validate the inputs, delegate to SetProofServEnvOld() for
// server protocol versions in [0,13], change to the session directory, set the
// basic environment, create the session rootrc and env files and finally
// create a "last-*-session" symlink in the user sandbox.
// NOTE(review): the return statements are not visible in this excerpt.
3332 int XrdProofdProofServMgr::SetProofServEnv(XrdProofdProtocol *p,
void *input)
3334 XPDLOC(SMGR,
"ProofServMgr::SetProofServEnv")
// Input validation
3337 if (!p || !p->Client() || !input) {
3338 TRACE(XERR,
"at leat one input is invalid - cannot continue");
// Older ROOT server protocols (0 <= version < 14) use the legacy setup;
// -1 means that no ROOT version is attached to the client
3343 int rootvers = p->Client()->ROOT() ? p->Client()->ROOT()->SrvProtVers() : -1;
3344 TRACE(DBG,
"rootvers: "<< rootvers);
3345 if (rootvers < 14 && rootvers > -1)
3346 return SetProofServEnvOld(p, input);
3348 ProofServEnv_t *in = (ProofServEnv_t *)input;
// The session proxy object
3351 XrdProofdProofServ *xps = in->fPS;
3353 TRACE(XERR,
"unable to get instance of proofserv proxy");
3356 int psid = xps->ID();
3357 TRACE(REQ,
"psid: "<<psid<<
", log: "<<in->fLogLevel);
// Trace the relevant directories and tags
3360 XrdOucString udir = p->Client()->Sandbox()->Dir();
3361 TRACE(DBG,
"sandbox for "<<p->Client()->User()<<
" is: "<<udir);
3362 TRACE(DBG,
"session unique tag "<<in->fSessionTag);
3363 TRACE(DBG,
"session dir " << in->fSessionDir);
3364 TRACE(DBG,
"session working dir:" << in->fWrkDir);
// Move to the session directory
3367 if (XrdProofdAux::ChangeToDir(in->fSessionDir.c_str(), p->Client()->UI(),
3368 fMgr->ChangeOwn()) != 0) {
3369 TRACE(XERR,
"couldn't change directory to " << in->fSessionDir);
// Basic environment derived from the ROOT version and the manager
3374 if (SetProofServEnv(fMgr, p->Client()->ROOT()) != 0) {
3375 TRACE(XERR,
"problems setting basic environment - exit");
// Session-specific rootrc and env files, both named after the session dir
3380 TRACE(DBG,
"creating rc and env files");
3381 XrdOucString rcfile, envfile;
3382 FormFileNameInSessionDir(p, xps, in->fSessionDir.c_str(),
"rootrc", rcfile);
3383 if (CreateProofServRootRc(p, in, rcfile.c_str()) != 0) {
3384 TRACE(XERR,
"problems creating RC file "<<rcfile.c_str());
3388 FormFileNameInSessionDir(p, xps, in->fSessionDir.c_str(),
"env", envfile);
3389 if (CreateProofServEnvFile(p, in, envfile.c_str(), rcfile.c_str()) != 0) {
3390 TRACE(XERR,
"problems creating environment file "<<envfile.c_str());
// Symlink in the sandbox pointing to the session directory; the link name
// depends on the connection type (the 'else' keyword between the two
// assignments is not visible in this excerpt)
3396 TRACE(REQ,
"creating symlink");
3397 XrdOucString syml = udir;
3398 if (p->ConnType() == kXPD_MasterWorker)
3399 syml +=
"/last-worker-session";
3401 syml +=
"/last-master-session";
3402 if (XrdProofdAux::SymLink(in->fSessionDir.c_str(), syml.c_str()) != 0) {
3403 TRACE(XERR,
"problems creating symlink to "
3404 " last session (errno: "<<errno<<
")");
// Write the environment file 'envfn' for the proofserv session described by
// 'input' (a ProofServEnv_t). Each variable is written as a "NAME=value" line
// to the file and the same string is passed to the PutEnv macro together with
// in->fOld. 'rcfn' is the path of the session rootrc file, exported as
// ROOTRCFILE.
// NOTE(review): declarations/allocations of 'ev' and 'len', the fclose() of
// the file and the return statements are not visible in this excerpt.
3417 int XrdProofdProofServMgr::CreateProofServEnvFile(XrdProofdProtocol *p,
void *input,
3418 const char *envfn,
const char *rcfn)
3420 XPDLOC(SMGR,
"ProofServMgr::CreateProofServEnvFile")
// All inputs must be defined and the two file names non-empty
3423 if (!p || !input || (!envfn ||
3424 (envfn && strlen(envfn) <= 0)) || (!rcfn || (rcfn && strlen(rcfn) <= 0))) {
3425 TRACE(XERR,
"invalid inputs!");
3430 ProofServEnv_t *in = (ProofServEnv_t *)input;
// The session proxy object
3433 XrdProofdProofServ *xps = in->fPS;
3435 TRACE(XERR,
"unable to get instance of proofserv proxy");
// Open (truncate/create) the environment file
3439 FILE *fenv = fopen(envfn,
"w");
3441 TRACE(XERR,
"unable to open env file: "<<envfn);
3444 TRACE(REQ,
"environment file: "<< envfn);
// Authentication-related variables, only when an authentication protocol is
// attached to 'p'
3449 if (p->AuthProt()) {
// The XrdSecENVS variable holds a comma-separated list of settings; each
// non-empty token is copied to a heap buffer, written out and exported
3452 XrdOucString secenvs(getenv(
"XrdSecENVS"));
3453 if (secenvs.length() > 0) {
3457 while ((from = secenvs.tokenize(env, from,
',')) != -1) {
3458 if (env.length() > 0) {
3460 ev =
new char[env.length()+1];
3461 strncpy(ev, env.c_str(), env.length());
3462 ev[env.length()] = 0;
3463 fprintf(fenv,
"%s\n", ev);
3465 PutEnv(ev, in->fOld);
// Credentials: "XrdSecCREDS=" followed by the raw credentials buffer.
// NOTE(review): the terminating null for this buffer is presumably set on a
// line not visible in this excerpt - confirm.
3471 XrdSecCredentials *creds = p->AuthProt()->getCredentials();
3473 int lev = strlen(
"XrdSecCREDS=") + creds->size;
3474 ev =
new char[lev+1];
3475 strncpy(ev,
"XrdSecCREDS=", lev);
3476 memcpy(ev+strlen(
"XrdSecCREDS="), creds->buffer, creds->size);
3478 PutEnv(ev, in->fOld);
3479 TRACE(DBG,
"XrdSecCREDS set");
// Save the credentials under "<sandbox>/.creds" through the fCredsSaver hook
3482 XrdOucString credsdir = p->Client()->Sandbox()->Dir();
3483 credsdir +=
"/.creds";
3485 if (!XrdProofdAux::AssertDir(credsdir.c_str(), p->Client()->UI(), fMgr->ChangeOwn())) {
3486 if ((*fCredsSaver)(creds, credsdir.c_str(), p->Client()->UI()) != 0) {
3487 TRACE(DBG,
"problems in saving authentication creds under "<<credsdir);
3490 TRACE(XERR,
"unable to create creds dir: "<<credsdir);
// Variables written to the file only (no PutEnv visible for these).
// NOTE(review): getenv(XPD_LIBPATH) is passed to "%s" without a null check.
3499 fprintf(fenv,
"%s=%s\n", XPD_LIBPATH, getenv(XPD_LIBPATH));
3502 fprintf(fenv,
"ROOTSYS=%s\n", xps->ROOT()->Dir());
// NOTE(review): ROOTCONFDIR is written with ROOT()->Dir(), i.e. the same value
// as ROOTSYS, whereas SetProofServEnv(mgr, r) builds ROOTCONFDIR from
// DataDir() - confirm which one is intended.
3505 fprintf(fenv,
"ROOTCONFDIR=%s\n", xps->ROOT()->Dir());
3508 fprintf(fenv,
"TMPDIR=%s\n", fMgr->TMPdir());
// ROOTRCFILE: path of the session rootrc file
3512 len = strlen(
"ROOTRCFILE=") + strlen(rcfn) + 2;
3514 snprintf(ev, len,
"ROOTRCFILE=%s", rcfn);
3515 fprintf(fenv,
"%s\n", ev);
3517 PutEnv(ev, in->fOld);
// ROOTVERSIONTAG: tag of the ROOT version attached to the client
3521 len = strlen(
"ROOTVERSIONTAG=") + strlen(p->Client()->ROOT()->Tag()) + 2;
3523 snprintf(ev, len,
"ROOTVERSIONTAG=%s", p->Client()->ROOT()->Tag());
3524 fprintf(fenv,
"%s\n", ev);
3526 PutEnv(ev, in->fOld);
// ROOTPROOFLOGFILE: session log file, also recorded on the session object
3530 len = strlen(
"ROOTPROOFLOGFILE=") + in->fLogFile.length() + 2;
3532 snprintf(ev, len,
"ROOTPROOFLOGFILE=%s", in->fLogFile.c_str());
3533 fprintf(fenv,
"%s\n", ev);
3534 xps->SetFileout(in->fLogFile.c_str());
3536 PutEnv(ev, in->fOld);
// LOCALDATASERVER: "root://<host>" after keyword resolution by the manager
3540 XrdOucString locdatasrv;
3541 XPDFORM(locdatasrv,
"root://%s", fMgr->Host());
3543 int nrk = fMgr->ResolveKeywords(locdatasrv, p->Client());
3544 TRACE(HDBG, nrk <<
" placeholders resolved for LOCALDATASERVER");
3545 len = strlen(
"LOCALDATASERVER=") + locdatasrv.length() + 2;
3547 snprintf(ev, len,
"LOCALDATASERVER=%s", locdatasrv.c_str());
3548 fprintf(fenv,
"%s\n", ev);
3550 PutEnv(ev, in->fOld);
// XRDCF: the configuration file in use
3554 len = strlen(
"XRDCF=") + strlen(CfgFile()) + 2;
3556 snprintf(ev, len,
"XRDCF=%s", CfgFile());
3557 fprintf(fenv,
"%s\n", ev);
3559 PutEnv(ev, in->fOld);
// Additional variables from fProofServEnvs, accessed under fEnvsMutex.
// Entries matching this user/group/ROOT version are collected by name in a
// hash; when an entry with the same name already exists the match values are
// compared before replacing it. The hash is then applied to WriteSessEnvs.
// NOTE(review): the conditions selecting between Rep() and Add() are only
// partly visible in this excerpt.
3563 { XrdSysMutexHelper mhp(fEnvsMutex);
3564 if (fProofServEnvs.size() > 0) {
3566 XrdOucHash<XpdEnv> sessenvs;
3567 std::list<XpdEnv>::iterator ienvs = fProofServEnvs.begin();
3568 for ( ; ienvs != fProofServEnvs.end(); ++ienvs) {
3569 int envmatch = (*ienvs).Matches(p->Client()->User(), p->Client()->Group(),
3570 p->Client()->ROOT()->VersionCode());
3571 if (envmatch >= 0) {
3572 XpdEnv *env = sessenvs.Find((*ienvs).fName.c_str());
3574 int envmtcex = env->Matches(p->Client()->User(), p->Client()->Group(),
3575 p->Client()->ROOT()->VersionCode());
3576 if (envmatch > envmtcex) {
3579 sessenvs.Rep(env->fName.c_str(), env, 0, Hash_keepdata);
3584 sessenvs.Add(env->fName.c_str(), env, 0, Hash_keepdata);
3586 TRACE(HDBG,
"Adding: "<<(*ienvs).fEnv);
3589 XpdWriteEnv_t xpwe = {fMgr, p->Client(), fenv, in->fOld};
3590 sessenvs.Apply(WriteSessEnvs, (
void *)&xpwe);
// User-provided variables: a comma-separated list of "name=value" settings
// stored on the session object
3594 if (xps->UserEnvs() &&
3595 strlen(xps->UserEnvs()) && strstr(xps->UserEnvs(),
"=")) {
3597 XrdOucString ue = xps->UserEnvs();
3598 XrdOucString env, namelist;
3599 int from = 0, ieq = -1;
3600 while ((from = ue.tokenize(env, from,
',')) != -1) {
3601 if (env.length() > 0 && (ieq = env.find(
'=')) != -1) {
// Resolve placeholders, then export; settings containing "WRAPPERCMD" are not
// written to the file for PROOF-Lite sessions
3603 ResolveKeywords(env, in);
3604 ev =
new char[env.length()+1];
3605 strncpy(ev, env.c_str(), env.length());
3606 ev[env.length()] = 0;
3607 if (env.find(
"WRAPPERCMD") == STR_NPOS || !xps->IsPLite())
3608 fprintf(fenv,
"%s\n", ev);
3610 PutEnv(ev, in->fOld);
3611 if (env.find(
"WRAPPERCMD") == STR_NPOS || !xps->IsPLite()) {
3613 if (namelist.length() > 0)
// PROOF_ALLVARS: exports 'namelist' (how the list is filled is not visible in
// this excerpt; presumably the names of the user variables)
3620 len = strlen(
"PROOF_ALLVARS=") + namelist.length() + 2;
3622 snprintf(ev, len,
"PROOF_ALLVARS=%s", namelist.c_str());
3623 fprintf(fenv,
"%s\n", ev);
3625 PutEnv(ev, in->fOld);
// Write the session rootrc file 'rcfn' for the proofserv session described by
// 'input' (a ProofServEnv_t). The file is a sequence of "# comment" lines and
// "Key: value" settings derived from the manager configuration, the client
// and the session object. A "session.rootrc" symlink to the file is also
// created.
// NOTE(review): the fclose() of the file and the return statements are not
// visible in this excerpt.
3639 int XrdProofdProofServMgr::CreateProofServRootRc(XrdProofdProtocol *p,
3640 void *input,
const char *rcfn)
3642 XPDLOC(SMGR,
"ProofServMgr::CreateProofServRootRc")
// All inputs must be defined and the file name non-empty
3645 if (!p || !input || (!rcfn || (rcfn && strlen(rcfn) <= 0))) {
3646 TRACE(XERR,
"invalid inputs!");
3651 ProofServEnv_t *in = (ProofServEnv_t *)input;
// The session proxy object
3654 XrdProofdProofServ *xps = in->fPS;
3656 TRACE(XERR,
"unable to get instance of proofserv proxy");
3659 int psid = xps->ID();
// Open (truncate/create) the rootrc file
3661 FILE *frc = fopen(rcfn,
"w");
3663 TRACE(XERR,
"unable to open rootrc file: "<<rcfn);
// Symlink with a fixed, relative name pointing to the file
3668 if (XrdProofdAux::SymLink(rcfn,
"session.rootrc") != 0) {
3669 TRACE(XERR,
"problems creating symlink to 'session.rootrc' (errno: "<<errno<<
")");
3672 TRACE(REQ,
"session rootrc file: "<< rcfn);
// Listening port
3675 fprintf(frc,
"# XrdProofdProtocol listening port\n");
3676 fprintf(frc,
"ProofServ.XpdPort: %d\n", fMgr->Port());
// Local root prefix, when configured
3679 if (fMgr->LocalROOT() && strlen(fMgr->LocalROOT()) > 0) {
3680 fprintf(frc,
"# Prefix to be prepended to local paths\n");
3681 fprintf(frc,
"Path.Localroot: %s\n", fMgr->LocalROOT());
// Pool URL, when configured (the statement executed when the URL does not end
// with "/" is not visible in this excerpt; presumably a "/" is appended)
3685 if (fMgr->PoolURL() && strlen(fMgr->PoolURL()) > 0) {
3686 XrdOucString purl(fMgr->PoolURL());
3687 if (!purl.endswith(
"/"))
3689 fprintf(frc,
"# URL for the data pool entry-point\n");
3690 fprintf(frc,
"ProofServ.PoolUrl: %s\n", purl.c_str());
// Session working directory
3695 fprintf(frc,
"# The session working dir\n");
3696 fprintf(frc,
"ProofServ.SessionDir: %s\n", in->fWrkDir.c_str());
// Log level
3700 fprintf(frc,
"# Proof Log/Debug level\n");
3701 fprintf(frc,
"Proof.DebugLevel: %d\n", in->fLogLevel);
// Ordinal
3704 fprintf(frc,
"# Ordinal number\n");
3705 fprintf(frc,
"ProofServ.Ordinal: %s\n", xps->Ordinal());
// ROOT version tag, when a ROOT version is attached to the client
3708 if (p->Client()->ROOT()) {
3709 fprintf(frc,
"# ROOT Version tag\n");
3710 fprintf(frc,
"ProofServ.RootVersionTag: %s\n", p->Client()->ROOT()->Tag());
// Group
3713 if (p->Client()->Group()) {
3714 fprintf(frc,
"# Proof group\n");
3715 fprintf(frc,
"ProofServ.ProofGroup: %s\n", p->Client()->Group());
// Group configuration file
3719 if (fMgr->GroupsMgr() && fMgr->GroupsMgr()->GetCfgFile()) {
3720 fprintf(frc,
"# File with group information\n");
3721 fprintf(frc,
"Proof.GroupFile: %s\n", fMgr->GroupsMgr()->GetCfgFile());
// User sandbox
3725 XrdOucString udir = p->Client()->Sandbox()->Dir();
3726 fprintf(frc,
"# Users sandbox\n");
3727 fprintf(frc,
"ProofServ.Sandbox: %s\n", udir.c_str());
// Server image, when configured
3730 if (fMgr->Image() && strlen(fMgr->Image()) > 0) {
3731 fprintf(frc,
"# Server image\n");
3732 fprintf(frc,
"ProofServ.Image: %s\n", fMgr->Image());
// Session tags
3737 fprintf(frc,
"# Session tag\n");
3738 fprintf(frc,
"ProofServ.SessionTag: %s\n", in->fSessionTag.c_str());
3739 fprintf(frc,
"# Top Session tag\n");
3740 fprintf(frc,
"ProofServ.TopSessionTag: %s\n", in->fTopSessionTag.c_str());
// Admin path: the plain path for server protocol versions below 27 (or when
// no ROOT version is attached); the ".status" form is presumably the 'else'
// branch, whose keyword is not visible in this excerpt
3744 fprintf(frc,
"# Session admin path\n");
3745 int proofvrs = (p->Client()->ROOT()) ? p->Client()->ROOT()->SrvProtVers() : -1;
3746 if (proofvrs < 0 || proofvrs < 27) {
3748 fprintf(frc,
"ProofServ.AdminPath: %s\n", xps->AdminPath());
3752 fprintf(frc,
"ProofServ.AdminPath: %s.status\n", xps->AdminPath());
// User-specific config files
3757 if (fMgr->NetMgr()->WorkerUsrCfg()) {
3758 fprintf(frc,
"# Whether user specific config files are enabled\n");
3759 fprintf(frc,
"ProofServ.UseUserCfg: 1\n");
// UNIX socket path
3762 fprintf(frc,
"# Open socket\n");
3763 fprintf(frc,
"ProofServ.OpenSock: %s\n", xps->UNIXSockPath());
// Entity: "user[:group]@host"; the form without group is presumably the
// 'else' branch (keyword not visible in this excerpt)
3765 fprintf(frc,
"# Entity\n");
3766 if (p->Client()->UI().fGroup.length() > 0)
3767 fprintf(frc,
"ProofServ.Entity: %s:%s@%s\n",
3768 p->Client()->User(), p->Client()->UI().fGroup.c_str(), p->Link()->Host());
3770 fprintf(frc,
"ProofServ.Entity: %s@%s\n", p->Client()->User(), p->Link()->Host());
// Session ID
3774 fprintf(frc,
"# Session ID\n");
3775 fprintf(frc,
"ProofServ.SessionID: %d\n", psid);
// Client ID
3778 fprintf(frc,
"# Client ID\n");
3779 fprintf(frc,
"ProofServ.ClientID: %d\n", p->CID());
// Client protocol version
3782 fprintf(frc,
"# Client Protocol\n");
3783 fprintf(frc,
"ProofServ.ClientVersion: %d\n", p->ProofProtocol());
// Configuration: an explicit request in in->fCfg ("masteronly" or a config
// file name) takes the first branch; the statements after it select a value
// from the manager / session state (super-master, PROOF-Lite, plugin).
// NOTE(review): the 'else' keywords separating these branches are not visible
// in this excerpt.
3786 if (in->fCfg.length() > 0) {
3787 if (in->fCfg ==
"masteronly") {
3788 fprintf(frc,
"# MasterOnly option\n");
3790 fprintf(frc,
"Proof.MasterOnly: 1\n");
3792 fprintf(frc,
"# Config file\n");
3794 fprintf(frc,
"ProofServ.ProofConfFile: %s\n", in->fCfg.c_str());
3797 fprintf(frc,
"# Config file\n");
3798 if (fMgr->IsSuperMst()) {
3799 fprintf(frc,
"ProofServ.ProofConfFile: sm:\n");
3800 }
else if (xps->IsPLite()) {
3801 fprintf(frc,
"ProofServ.ProofConfFile: lite:\n");
3802 fprintf(frc,
"# Number of ProofLite workers\n");
3803 fprintf(frc,
"ProofLite.Workers: %d\n", xps->PLiteNWrks());
3804 fprintf(frc,
"# Users sandbox\n");
3805 fprintf(frc,
"ProofLite.Sandbox: %s\n", udir.c_str());
3806 fprintf(frc,
"# No subpaths\n");
3807 fprintf(frc,
"ProofLite.SubPath: 0\n");
3808 }
else if (fProofPlugin.length() > 0) {
3809 fprintf(frc,
"ProofServ.ProofConfFile: %s\n", fProofPlugin.c_str());
// Fixed XrdClient defaults
3815 fprintf(frc,
"# Default settings for XrdClient\n");
3816 fprintf(frc,
"XNet.FirstConnectMaxCnt 3\n");
3817 fprintf(frc,
"XNet.ConnectTimeout 5\n");
// Work-around written only for ROOT versions older than 5.24/00
3820 int vrscode = (p->Client()->ROOT()) ? p->Client()->ROOT()->VersionCode() : -1;
3821 if (vrscode > 0 && vrscode < XrdROOT::GetVersionCode(5,24,0)) {
3822 fprintf(frc,
"# Force remote reading also for local files to avoid a wrong TTreeCache initialization\n");
3823 fprintf(frc,
"Path.ForceRemote 1\n");
// Additional settings from fProofServRCs, accessed under fEnvsMutex: entries
// are collected by name in a hash (comparing match values when a name is
// already present) and then written through the WriteSessRCs callback.
// NOTE(review): the conditions selecting between Rep() and Add() are only
// partly visible in this excerpt.
3827 { XrdSysMutexHelper mhp(fEnvsMutex);
3828 if (fProofServRCs.size() > 0) {
3829 fprintf(frc,
"# Additional rootrcs (xpd.putrc directives)\n");
3831 XrdOucHash<XpdEnv> sessrcs;
3832 std::list<XpdEnv>::iterator ircs = fProofServRCs.begin();
3833 for ( ; ircs != fProofServRCs.end(); ++ircs) {
3834 int rcmatch = (*ircs).Matches(p->Client()->User(), p->Client()->Group(),
3835 p->Client()->ROOT()->VersionCode());
3837 XpdEnv *rcenv = sessrcs.Find((*ircs).fName.c_str());
3839 int rcmtcex = rcenv->Matches(p->Client()->User(), p->Client()->Group(),
3840 p->Client()->ROOT()->VersionCode());
3841 if (rcmatch > rcmtcex) {
3844 sessrcs.Rep(rcenv->fName.c_str(), rcenv, 0, Hash_keepdata);
3849 sessrcs.Add(rcenv->fName.c_str(), rcenv, 0, Hash_keepdata);
3851 TRACE(HDBG,
"Adding: "<<(*ircs).fEnv);
3854 sessrcs.Apply(WriteSessRCs, (
void *)frc);
// Dataset sources: one "Proof.DataSetManager: " line listing the sources
// separated by ", " (only the fObscure part of each item is visible here)
3858 if (fMgr->DataSetSrcs()->size() > 0) {
3859 fprintf(frc,
"# Dataset sources\n");
3860 XrdOucString rc(
"Proof.DataSetManager: ");
3861 std::list<XrdProofdDSInfo *>::iterator ii;
3862 for (ii = fMgr->DataSetSrcs()->begin(); ii != fMgr->DataSetSrcs()->end(); ++ii) {
3863 if (ii != fMgr->DataSetSrcs()->begin()) rc +=
", ";
3870 rc += (*ii)->fObscure;
3872 fprintf(frc,
"%s\n", rc.c_str());
// Staging requests repository, when configured
3876 if (strlen(fMgr->StageReqRepo()) > 0) {
3877 fprintf(frc,
"# Dataset staging requests repository\n");
3878 fprintf(frc,
"Proof.DataSetStagingRequests: %s\n", fMgr->StageReqRepo());
// Data directory: "<datadir>/<group>/<user>/<ordinal>/<sessiontag>", followed
// by the URL options when these are configured
3882 if (fMgr->DataDir() && strlen(fMgr->DataDir()) > 0) {
3883 fprintf(frc,
"# Data directory\n");
3885 XPDFORM(rc,
"ProofServ.DataDir: %s/%s/%s/%s/%s", fMgr->DataDir(),
3886 p->Client()->Group(), p->Client()->User(), xps->Ordinal(),
3887 in->fSessionTag.c_str());
3888 if (fMgr->DataDirUrlOpts() && strlen(fMgr->DataDirUrlOpts()) > 0) {
3889 fprintf(frc,
"%s %s\n", rc.c_str(), fMgr->DataDirUrlOpts());
3891 fprintf(frc,
"%s\n", rc.c_str());
// Look for "proofserv" processes that have lost their controlling daemon and
// kill them. For each process the admin path is extracted from the
// "xpdpath:" token of its command line; the daemon PID is read from
// "<apath>/xrootd.pid" and verified against fParentExecs; the PIDs found in
// the "activesessions" / "terminatedsessions" sub-directories are recorded as
// controlled.
// NOTE(review): several declarations, conditions and the return statements
// are not visible in this excerpt, including the condition under which the
// "disabled ..." message is traced.
3909 int XrdProofdProofServMgr::CleanupLostProofServ()
3911 XPDLOC(SMGR,
"ProofServMgr::CleanupLostProofServ")
3914 TRACE(REQ,
"disabled ...");
3918 TRACE(REQ,
"checking for orphalin proofserv processes ...");
// List of running proofserv processes (pid -> command line, judging from the
// use of 'cmd' below - the assignment of 'cmd' is not visible here)
3922 std::map<int,XrdOucString> procs;
3923 if (XrdProofdAux::GetProcesses(
"proofserv", &procs) <= 0) {
3924 TRACE(DBG,
" no proofservs around: nothing to do");
// User info for the effective user, later passed to KillProcess
3929 if (XrdProofdAux::GetUserInfo(fMgr->EffectiveUser(), ui) != 0) {
3930 TRACE(DBG,
"problems getting info for user " << fMgr->EffectiveUser());
// Caches: PIDs found in session directories, and daemon PIDs already verified
3935 XrdOucRash<int, int> controlled, xrdproc;
// Session directories already scanned
3938 XrdOucHash<XrdOucString> sessionspaths;
3942 XrdOucString cmd, apath, pidpath, sessiondir, emsg, rest, after;
3943 std::map<int,XrdOucString>::iterator ip;
3944 for (ip = procs.begin(); ip != procs.end(); ++ip) {
// Extract the admin path from the "xpdpath:<path>" token
3947 if ((ia = cmd.find(
"xpdpath:")) != STR_NPOS) {
3948 cmd.tokenize(apath, ia,
' ');
3949 apath.replace(
"xpdpath:",
"");
3950 if (apath.length() <= 0) {
3951 TRACE(ALL,
"admin path not found; initial cmd line: "<<cmd);
// PID of the daemon owning this admin path; the verification result is cached
3955 XPDFORM(pidpath,
"%s/xrootd.pid", apath.c_str());
3956 TRACE(ALL,
"pidpath: "<<pidpath);
3957 int xpid = XrdProofdAux::GetIDFromPath(pidpath.c_str(), emsg);
3958 int *alive = xrdproc.Find(xpid);
3960 a = (XrdProofdAux::VerifyProcessByID(xpid, fParentExecs.c_str())) ? 1 : 0;
3961 xrdproc.Add(xpid, a);
// Scan the two session sub-directories, once per path
3969 const char *subdir[2] = {
"activesessions",
"terminatedsessions"};
3970 for (
int i = 0; i < 2; i++) {
3971 XPDFORM(sessiondir,
"%s/%s", apath.c_str(), subdir[i]);
3972 if (!sessionspaths.Find(sessiondir.c_str())) {
3973 DIR *sdir = opendir(sessiondir.c_str());
// NOTE(review): the directory opened is 'sessiondir' but the message prints
// 'apath' - confirm whether 'sessiondir' was intended.
3975 XPDFORM(emsg,
"cannot open '%s' - errno: %d", apath.c_str(), errno);
3976 TRACE(XERR, emsg.c_str());
3979 struct dirent *sent = 0;
3980 while ((sent = readdir(sdir))) {
// Entries starting with '.' are matched here (the first test already covers
// ".."); presumably they are skipped - the statement is not visible
3981 if (!strncmp(sent->d_name,
".", 1) || !strncmp(sent->d_name,
"..", 2))
3984 int ppid = XrdProofdAux::ParsePidPath(sent->d_name, rest, after);
3986 controlled.Add(ppid, ppid);
3989 sessionspaths.Add(sessiondir.c_str(), 0, 0, Hash_data_is_key);
// A process found among the controlled ones is fine
3991 ok = (controlled.Find(pid)) ? 1 : ok;
3998 TRACE(ALL,
"process: "<<pid<<
" lost its controller: killing");
3999 if (XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn()) == 0)
// Kill "proofserv" processes: all of them when 'all' is true, otherwise only
// those belonging to user 'usr' (which must then be defined). Processes whose
// parent is this process, or whose parent is not a verified daemon, are the
// candidates; in multi-user mode with all == false the session owner is also
// checked through GetActiveSession().
// The implementation is platform specific: /proc status files, Solaris
// psinfo, the BSD/macOS process list, and a generic "ps | grep" fallback.
// NOTE(review): the opening "#if", "#else" and "#endif" directives, several
// declarations and the return statements are not visible in this excerpt.
4019 int XrdProofdProofServMgr::CleanupProofServ(
bool all,
const char *usr)
4021 XPDLOC(SMGR,
"ProofServMgr::CleanupProofServ")
4023 TRACE(REQ, "all: "<<all<<", usr: " << (usr ? usr : "undef"));
// Name of the program to look for
4027 const
char *pn = "proofserv";
4034 TRACE(DBG,
"usr must be defined for all = FALSE");
4037 if (XrdProofdAux::GetUserInfo(usr, ui) != 0) {
4038 TRACE(DBG,
"problems getting info for user " << usr);
// --- First platform branch: scan the numeric entries of /proc and parse the
// "Name:", "Pid:", "PPid:" and "Uid:" lines of a per-process file (the file
// name construction is not visible; presumably "/proc/<pid>/status")
4046 DIR *dir = opendir(
"/proc");
4048 XrdOucString emsg(
"cannot open /proc - errno: ");
4050 TRACE(DBG, emsg.c_str());
4054 struct dirent *ent = 0;
4055 while ((ent = readdir(dir))) {
4056 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
4057 if (DIGIT(ent->d_name[0])) {
4058 XrdOucString fn(
"/proc/", 256);
4062 FILE *ffn = fopen(fn.c_str(),
"r");
4064 XrdOucString emsg(
"cannot open file ");
4065 emsg += fn; emsg +=
" - errno: "; emsg += errno;
// Flags tracking which checks are still pending; the uid check is not needed
// when 'all' is requested
4070 bool xname = 1, xpid = 1, xppid = 1;
4071 bool xuid = (all) ? 0 : 1;
4074 char line[2048] = { 0 };
4075 while (fgets(line,
sizeof(line), ffn) &&
4076 (xname || xpid || xppid || xuid)) {
4078 if (xname && strstr(line,
"Name:")) {
4079 if (!strstr(line, pn))
4083 if (xpid && strstr(line,
"Pid:")) {
4084 pid = (int) XrdProofdAux::GetLong(&line[strlen(
"Pid:")]);
4087 if (xppid && strstr(line,
"PPid:")) {
4088 ppid = (int) XrdProofdAux::GetLong(&line[strlen(
"PPid:")]);
4090 if (ppid != getpid() && XrdProofdAux::VerifyProcessByID(ppid, fParentExecs.c_str()))
4095 if (xuid && strstr(line,
"Uid:")) {
4096 int uid = (int) XrdProofdAux::GetLong(&line[strlen(
"Uid:")]);
// All checks satisfied
4104 if (!xname && !xpid && !xppid && !xuid) {
4107 if (fMgr->MultiUser() && !all) {
4111 XrdProofdProofServ *srv = GetActiveSession(pid);
4112 if (!srv || (srv && !strcmp(usr, srv->Client())))
4116 if (XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn()) == 0)
// --- Solaris branch: read a psinfo_t record for each numeric /proc entry
4124 #elif defined(__sun)
4127 DIR *dir = opendir(
"/proc");
4129 XrdOucString emsg(
"cannot open /proc - errno: ");
4135 struct dirent *ent = 0;
4136 while ((ent = readdir(dir))) {
4137 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
continue;
4138 if (DIGIT(ent->d_name[0])) {
4139 XrdOucString fn(
"/proc/", 256);
4143 int ffd = open(fn.c_str(), O_RDONLY);
4145 XrdOucString emsg(
"cannot open file ");
4146 emsg += fn; emsg +=
" - errno: "; emsg += errno;
4152 bool xuid = (all) ? 0 : 1;
4156 if (read(ffd, &psi,
sizeof(psinfo_t)) !=
sizeof(psinfo_t)) {
4157 XrdOucString emsg(
"cannot read ");
4158 emsg += fn; emsg +=
": errno: "; emsg += errno;
4168 if (!strstr(psi.pr_fname, pn))
4174 if (refuid == psi.pr_uid)
4178 int ppid = psi.pr_ppid;
4179 if (ppid != getpid() && XrdProofdAux::VerifyProcessByID(ppid, fParentExecs.c_str())) {
4186 if (!xname && !xppid && !xuid) {
4188 if (fMgr->MultiUser() && !all) {
4192 XrdProofdProofServ *srv = GetActiveSession(psi.pr_pid);
4193 if (!srv || (srv && !strcmp(usr, srv->Client())))
4197 if (XrdProofdAux::KillProcess(psi.pr_pid, 1, ui, fMgr->ChangeOwn()) == 0)
// --- BSD / macOS branch: walk the process list returned by GetMacProcList
4205 #elif defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__APPLE__)
4211 if ((ern = XrdProofdAux::GetMacProcList(&pl, np)) != 0) {
4212 XrdOucString emsg(
"cannot get the process list: errno: ");
4221 if (strstr(pl[ii].kp_proc.p_comm, pn)) {
4222 if (all || (
int)(pl[ii].kp_eproc.e_ucred.cr_uid) == refuid) {
4224 int ppid = pl[ii].kp_eproc.e_ppid;
4226 if (ppid != getpid()) {
4229 if (strstr(pl[jj].kp_proc.p_comm,
"xrootd") &&
4230 pl[jj].kp_proc.p_pid == ppid) {
4238 if (fMgr->MultiUser() && !all) {
// NOTE(review): the entry being examined is pl[ii], but the two calls below
// index pl[np] - this looks like it should be pl[ii]; confirm. Also, unlike
// the other branches, the KillProcess result is not compared with 0.
4242 XrdProofdProofServ *srv = GetActiveSession(pl[np].kp_proc.p_pid);
4243 if (!srv || (srv && !strcmp(usr, srv->Client())))
4248 if (XrdProofdAux::KillProcess(pl[np].kp_proc.p_pid, 1, ui, fMgr->ChangeOwn()))
// --- Generic fallback: parse the output of a "ps ... | grep proofserv"
// command (the options appended to 'cmd' are not visible in this excerpt)
4260 XrdOucString cmd =
"ps ";
4264 const char *cusr = (usr && strlen(usr) && fSuperUser) ? usr : fPClient->ID();
4266 const char *cusr = (usr && strlen(usr)) ? usr : 0;
4280 cmd +=
" | grep proofserv 2>/dev/null";
// String form of this process ID, searched for in each output line
4284 snprintf(cpid, 10,
"%d", getpid());
4287 XrdOucString pids =
":";
4288 FILE *fp = popen(cmd.c_str(),
"r");
4290 char line[2048] = { 0 };
4291 while (fgets(line,
sizeof(line), fp)) {
4293 char *px = strstr(line,
"xpd");
4297 char *pi = strstr(px+3, cpid);
4301 int ppid = (int) XrdProofdAux::GetLong(pi);
4302 TRACE(HDBG,
"found alternative parent ID: "<< ppid);
4304 if (XrdProofdAux::VerifyProcessByID(ppid, fParentExecs.c_str()))
// The PID is read from the text following the user name
4310 from += strlen(cusr);
4311 int pid = (int) XrdProofdAux::GetLong(&line[from]);
4313 if (fMgr->MultiUser() && !all) {
4317 XrdProofdProofServ *srv = GetActiveSession(pid);
4318 if (!srv || (srv && !strcmp(usr, srv->Client())))
4323 if (XrdProofdAux::KillProcess(pid, 1, ui, fMgr->ChangeOwn()) == 0)
// Create, and set ownership/permissions on, the per-user directories needed
// by a session of the client attached to 'p':
//  - for each local, writable dataset source: "<url>/<group>" (mode 0777) and
//    "<url>/<group>/<user>" (mode 0755);
//  - when a data directory with options is configured and both 'ord' and
//    'stag' are given: "<datadir>/<group>" (mode 0777) and the three nested
//    levels "<user>", "<ord>", "<stag>" below it;
//  - when ownership changes are enabled, the sandbox-related path 'creds'.
// Failures are traced; NOTE(review): the return statements are not visible in
// this excerpt.
4341 int XrdProofdProofServMgr::SetUserOwnerships(XrdProofdProtocol *p,
4342 const char *ord,
const char *stag)
4344 XPDLOC(SMGR,
"ProofServMgr::SetUserOwnerships")
4346 TRACE(REQ, "enter");
// Dataset source directories
4350 if (fMgr->DataSetSrcs()->size() > 0) {
// The group-level directory is asserted with the info of the user the daemon
// was started as
4352 XrdProofdAux::GetUserInfo(XrdProofdProtocol::EUidAtStartup(), ui);
4353 std::list<XrdProofdDSInfo *>::iterator ii;
4354 for (ii = fMgr->DataSetSrcs()->begin(); ii != fMgr->DataSetSrcs()->end(); ++ii) {
4355 TRACE(ALL,
"Checking dataset source: url:"<<(*ii)->fUrl<<
", local:"
4356 <<(*ii)->fLocal<<
", rw:"<<(*ii)->fRW);
4357 if ((*ii)->fLocal && (*ii)->fRW) {
4359 XPDFORM(d,
"%s/%s", ((*ii)->fUrl).c_str(), p->Client()->UI().fGroup.c_str());
4360 if (XrdProofdAux::AssertDir(d.c_str(), ui, fMgr->ChangeOwn()) == 0) {
4361 if (XrdProofdAux::ChangeMod(d.c_str(), 0777) == 0) {
// The user-level directory is asserted with the client's own user info
4362 XPDFORM(d,
"%s/%s/%s", ((*ii)->fUrl).c_str(), p->Client()->UI().fGroup.c_str(),
4363 p->Client()->UI().fUser.c_str());
4364 if (XrdProofdAux::AssertDir(d.c_str(), p->Client()->UI(), fMgr->ChangeOwn()) == 0) {
4365 if (XrdProofdAux::ChangeMod(d.c_str(), 0755) != 0) {
4366 TRACE(XERR,
"problems setting permissions 0755 on: "<<d);
4369 TRACE(XERR,
"problems asserting: "<<d);
4372 TRACE(XERR,
"problems setting permissions 0777 on: "<<d);
4375 TRACE(XERR,
"problems asserting: "<<d);
// Data directories
4383 if (fMgr->DataDir() && strlen(fMgr->DataDir()) > 0 &&
4384 fMgr->DataDirOpts() && strlen(fMgr->DataDirOpts()) > 0 && ord && stag) {
4386 XrdProofdAux::GetUserInfo(XrdProofdProtocol::EUidAtStartup(), ui);
4387 XrdOucString dgr, dus[3];
4388 XPDFORM(dgr,
"%s/%s", fMgr->DataDir(), p->Client()->UI().fGroup.c_str());
4389 if (XrdProofdAux::AssertDir(dgr.c_str(), ui, fMgr->ChangeOwn()) == 0) {
4390 if (XrdProofdAux::ChangeMod(dgr.c_str(), 0777) == 0) {
// Mode for the user-level directories, from the option letters:
// default 0755, 'g' -> 0775, 'a' or 'o' -> 0777
4391 unsigned int mode = 0755;
4392 if (strchr(fMgr->DataDirOpts(),
'g')) mode = 0775;
4393 if (strchr(fMgr->DataDirOpts(),
'a') || strchr(fMgr->DataDirOpts(),
'o')) mode = 0777;
4394 XPDFORM(dus[0],
"%s/%s", dgr.c_str(), p->Client()->UI().fUser.c_str());
4395 XPDFORM(dus[1],
"%s/%s", dus[0].c_str(), ord);
4396 XPDFORM(dus[2],
"%s/%s", dus[1].c_str(), stag);
4397 for (
int i = 0; i < 3; i++) {
4398 if (XrdProofdAux::AssertDir(dus[i].c_str(), p->Client()->UI(), fMgr->ChangeOwn()) == 0) {
4399 if (XrdProofdAux::ChangeMod(dus[i].c_str(), mode) != 0) {
// Save and restore the std::cerr format flags around the octal output
4400 std::ios_base::fmtflags oflags = std::cerr.flags();
4401 TRACE(XERR,
"problems setting permissions "<< oct << mode<<
" on: "<<dus[i]);
4402 std::cerr.flags(oflags);
4405 TRACE(XERR,
"problems asserting: "<<dus[i]);
4410 TRACE(XERR,
"problems setting permissions 0777 on: "<<dgr);
4413 TRACE(XERR,
"problems asserting: "<<dgr);
// Ownership of a path under the sandbox (any suffix appended to 'creds' is
// not visible in this excerpt)
4417 if (fMgr->ChangeOwn()) {
4419 XrdOucString creds(p->Client()->Sandbox()->Dir());
4421 if (XrdProofdAux::ChangeOwn(creds.c_str(), p->Client()->UI()) != 0) {
4422 TRACE(XERR,
"can't change ownership of "<<creds);
// Set up the process environment for the client attached to 'p': change to
// the sandbox directory, prepare "HOME=<sandbox>" and "USER=<user>" strings,
// initialise the supplementary groups and, when ownership changes are
// enabled, switch to the client's uid/gid.
// NOTE(review): the declaration of 'len', the calls consuming 'h' and 'u'
// (presumably putenv) and the return statements are not visible in this
// excerpt.
4438 int XrdProofdProofServMgr::SetUserEnvironment(XrdProofdProtocol *p)
4440 XPDLOC(SMGR,
"ProofServMgr::SetUserEnvironment")
4442 TRACE(REQ, "enter");
// Move to the sandbox
4444 if (XrdProofdAux::ChangeToDir(p->Client()->Sandbox()->Dir(),
4445 p->Client()->UI(), fMgr->ChangeOwn()) != 0) {
4446 TRACE(XERR,
"couldn't change directory to "<< p->Client()->Sandbox()->Dir());
// HOME points to the sandbox
4452 len = 8 + strlen(p->Client()->Sandbox()->Dir());
4453 char *h =
new char[len];
4454 snprintf(h, len,
"HOME=%s", p->Client()->Sandbox()->Dir());
4456 TRACE(DBG,
"set "<<h);
// USER is the client user name
4459 len = 8 + strlen(p->Client()->User());
4460 char *u =
new char[len];
4461 snprintf(u, len,
"USER=%s", p->Client()->User());
4463 TRACE(DBG,
"set "<<u);
// Supplementary groups: needs privileges, acquired through a guard object,
// when the effective uid differs from the target one
4467 TRACE(DBG,
"setting ACLs");
4468 if (fMgr->ChangeOwn() && (int) geteuid() != p->Client()->UI().fUid) {
4470 XrdSysPrivGuard pGuard((uid_t)0, (gid_t)0);
4471 if (XpdBadPGuard(pGuard, p->Client()->UI().fUid)) {
4472 TRACE(XERR,
"could not get privileges");
4476 initgroups(p->Client()->UI().fUser.c_str(), p->Client()->UI().fGid);
// Switch to the target user identity
4479 if (fMgr->ChangeOwn()) {
4481 TRACE(DBG,
"acquiring target user identity: "<<(uid_t)p->Client()->UI().fUid<<
4482 ", "<<(gid_t)p->Client()->UI().fGid);
4483 if (XrdSysPriv::ChangePerm((uid_t)p->Client()->UI().fUid,
4484 (gid_t)p->Client()->UI().fGid) != 0) {
4485 TRACE(XERR,
"can't acquire "<< p->Client()->UI().fUser <<
" identity");
// Return the session registered in fSessions under the key obtained by
// appending 'pid' to an empty string; the result is whatever
// fSessions.Find() returns for that key (presumably null when not found).
4498 XrdProofdProofServ *XrdProofdProofServMgr::GetActiveSession(
int pid)
4500 XrdOucString key; key += pid;
4501 return fSessions.Find(key.c_str());
// Callback applied to each session in the sessions hash (it is passed to
// fSessions.Apply in BroadcastPriorities). 's' points to an
// XpdBroadcastPriority_t carrying the group manager and a broadcast counter.
// For a valid, running, non-master session whose group has active members,
// the group priority scaled by 100 is broadcast to the session.
// NOTE(review): the counter update, the return values and the conditions
// guarding the error path are not visible in this excerpt.
4507 static int BroadcastPriority(
const char *, XrdProofdProofServ *ps,
void *s)
4509 XPDLOC(SMGR,
"BroadcastPriority")
4511 XpdBroadcastPriority_t *bp = (XpdBroadcastPriority_t *)s;
4513 int nb = *(bp->fNBroadcast);
4517 if (ps->IsValid() && (ps->Status() == kXPD_running) &&
4518 !(ps->SrvType() == kXPD_Master)) {
// Group lookup needs both a group name and a group manager
4519 XrdProofGroup *g = (ps->Group() && bp->fGroupMgr)
4520 ? bp->fGroupMgr->GetGroup(ps->Group()) : 0;
4521 TRACE(DBG,
"group: "<< g<<
", client: "<<ps->Client());
4522 if (g && g->Active() > 0) {
4523 TRACE(DBG,
"priority: "<< g->Priority()<<
" active: "<<g->Active());
// The priority is sent as an integer: group priority times 100
4524 int prio = (int) (g->Priority() * 100);
4525 ps->BroadcastPriority(prio);
4532 emsg =
"input entry undefined";
4536 TRACE(XERR,
"protocol error: "<<emsg);
// Count the non-worker sessions in fActiveSessions ('act' counts those in
// running state; 'tot' is presumably incremented for each non-worker session
// on a line not visible here) and send the two numbers to every running
// non-worker session.
// NOTE(review): the iterator increments and the condition selecting the
// "No master or submaster" message are not visible in this excerpt.
4543 void XrdProofdProofServMgr::BroadcastClusterInfo()
4545 XPDLOC(SMGR,
"ProofServMgr::BroadcastClusterInfo")
4547 TRACE(REQ, "enter");
// First pass: counting
4549 int tot = 0, act = 0;
4550 std::list<XrdProofdProofServ *>::iterator si = fActiveSessions.begin();
4551 while (si != fActiveSessions.end()) {
4552 if ((*si)->SrvType() != kXPD_Worker) {
4554 if ((*si)->Status() == kXPD_running) act++;
4559 XPDPRT(
"tot: "<<tot<<
", act: "<<act);
// Second pass: notify
4561 si = fActiveSessions.begin();
4562 while (si != fActiveSessions.end()) {
4563 if ((*si)->Status() == kXPD_running &&
4564 (*si)->SrvType() != kXPD_Worker) (*si)->SendClusterInfo(tot, act);
4568 TRACE(DBG,
"No master or submaster controlled by this manager");
// Apply the BroadcastPriority callback to all sessions in fSessions, passing
// the group manager (null when fMgr is undefined) and the address of the
// counter 'nb'.
// NOTE(review): the declaration of 'nb' and the return statement are not
// visible in this excerpt; presumably 'nb' is returned.
4576 int XrdProofdProofServMgr::BroadcastPriorities()
4578 XPDLOC(SMGR,
"ProofServMgr::BroadcastPriorities")
4580 TRACE(REQ, "enter");
4583 XpdBroadcastPriority_t bp = { (fMgr ? fMgr->GroupsMgr() : 0), &nb };
4584 fSessions.Apply(BroadcastPriority, (
void *)&bp);
// Check the reconnection state: when fReconnectTime is set (>= 0), the time
// elapsed since then is compared with fReconnectTimeOut.
// NOTE(review): the declaration of 'rect' and the return statements are not
// visible in this excerpt; presumably true is returned while the elapsed time
// is below the timeout - confirm.
4594 bool XrdProofdProofServMgr::IsReconnecting()
4597 if (fReconnectTime >= 0) {
4598 rect = time(0) - fReconnectTime;
4599 if (rect < fReconnectTimeOut)
// Set fReconnectTime, under fMutex, either to the current time or to -1.
// NOTE(review): the branch on 'on' is not visible in this excerpt; presumably
// the current time is stored when 'on' is true and -1 otherwise.
4610 void XrdProofdProofServMgr::SetReconnectTime(
bool on)
4612 XrdSysMutexHelper mhp(fMutex);
4615 fReconnectTime = time(0);
4617 fReconnectTime = -1;
// Check, under fMutex, whether protocol instance 'p' is listed in
// fDestroyTimes with a timestamp more recent than fReconnectTimeOut seconds
// ago, in which case 'alive' is set to false. Entries handled by the erase
// call are removed from the map (presumably the expired ones; the 'else'
// keyword is not visible here).
// NOTE(review): the declarations of 'alive' and 'now', the iterator increment
// for entries that are kept and the return statement are not visible in this
// excerpt.
4625 bool XrdProofdProofServMgr::Alive(XrdProofdProtocol* p)
4627 XrdSysMutexHelper mhp(fMutex);
4631 std::map<XrdProofdProtocol*,int>::iterator iter = fDestroyTimes.begin();
4632 while (iter != fDestroyTimes.end()) {
4633 int rect = now - iter->second;
4634 if (rect < fReconnectTimeOut) {
4635 if (p == iter->first) alive =
false;
// erase() returns the iterator following the removed element
4638 iter = fDestroyTimes.erase(iter);
// Callback applied to each session in the sessions hash (it is passed to
// fSessions.Apply in DisconnectFromProofServ). 's' points to an int holding a
// process ID, which is forwarded to the session's FreeClientID().
// NOTE(review): the validity check on 'ps' guarding the error message and the
// return values are not visible in this excerpt.
4648 static int FreeClientID(
const char *, XrdProofdProofServ *ps,
void *s)
4650 XPDLOC(SMGR,
"FreeClientID")
4652 int pid = *((
int *)s);
4655 ps->FreeClientID(pid);
4661 TRACE(XERR,
"protocol error: undefined session!");
// Apply the FreeClientID callback to every session in fSessions, passing the
// address of 'pid'; the operation is done while holding fMutex.
4669 void XrdProofdProofServMgr::DisconnectFromProofServ(
int pid)
4671 XrdSysMutexHelper mhp(fMutex);
4673 fSessions.Apply(FreeClientID, (
void *)&pid);
// Callback applied to each session in the sessions hash (it is passed to
// fSessions.Apply in CurrentSessions). 's' points to an int counter which is
// incremented for each session of type kXPD_TopMaster.
// NOTE(review): the validity checks guarding the error path and the return
// values are not visible in this excerpt.
4679 static int CountTopMasters(
const char *, XrdProofdProofServ *ps,
void *s)
4681 XPDLOC(SMGR,
"CountTopMasters")
4683 int *ntm = (
int *)s;
4687 if (ps->SrvType() == kXPD_TopMaster) (*ntm)++;
4691 emsg =
"input entry undefined";
4695 TRACE(XERR,
"protocol error: "<<emsg);
// Return fCurrentSessions, the number of top-master sessions. The counter can
// be reset and recomputed by applying CountTopMasters to fSessions; the
// operation is done while holding fMutex.
// NOTE(review): the condition guarding the recount is not visible in this
// excerpt; presumably it is done only when 'recalculate' is true.
4702 int XrdProofdProofServMgr::CurrentSessions(
bool recalculate)
4704 XPDLOC(SMGR,
"ProofServMgr::CurrentSessions")
4706 TRACE(REQ, "enter");
4708 XrdSysMutexHelper mhp(fMutex);
4710 fCurrentSessions = 0;
4711 fSessions.Apply(CountTopMasters, (
void *)&fCurrentSessions);
4715 return fCurrentSessions;
// Replace, in place, the placeholders found in 's':
//   <logfilemst>  (non-worker sessions) and
//   <logfilewrk>  (worker sessions)      -> in->fLogFile without a trailing ".log"
//   <user>                               -> value of the USER environment variable
//   <rootsys>                            -> value of the ROOTSYS environment variable
// The last two are replaced only when the corresponding variable is set.
// NOTE(review): the declaration of 'isWorker' is not visible in this excerpt.
4722 void XrdProofdProofServMgr::ResolveKeywords(XrdOucString &s, ProofServEnv_t *in)
4727 if (in->fPS->SrvType() == kXPD_Worker) isWorker = 1;
// Log file placeholders: only the one matching the session role is resolved
4730 if (!isWorker && s.find(
"<logfilemst>") != STR_NPOS) {
4731 XrdOucString lfr(in->fLogFile);
4732 if (lfr.endswith(
".log")) lfr.erase(lfr.rfind(
".log"));
4733 s.replace(
"<logfilemst>", lfr);
4734 }
else if (isWorker && s.find(
"<logfilewrk>") != STR_NPOS) {
4735 XrdOucString lfr(in->fLogFile);
4736 if (lfr.endswith(
".log")) lfr.erase(lfr.rfind(
".log"));
4737 s.replace(
"<logfilewrk>", lfr);
// User name
4741 if (getenv(
"USER") && s.find(
"<user>") != STR_NPOS) {
4742 XrdOucString usr(getenv(
"USER"));
4743 s.replace(
"<user>", usr);
// ROOT installation directory
4747 if (getenv(
"ROOTSYS") && s.find(
"<rootsys>") != STR_NPOS) {
4748 XrdOucString rootsys(getenv(
"ROOTSYS"));
4749 s.replace(
"<rootsys>", rootsys);
// Build a session-info record from client 'c' and session 's'. Either pointer
// may be null: string members then default to "", numeric members to -1 and
// fStatus to kXPD_unknown. The ROOT-related members additionally require
// s->ROOT() to be defined.
4760 XrdProofSessionInfo::XrdProofSessionInfo(XrdProofdClient *c, XrdProofdProofServ *s)
// Client-related members
4765 fUser = c ? c->User() :
"";
4766 fGroup = c ? c->Group() :
"";
// Session-related members
4769 fPid = s ? s->SrvPID() : -1;
4770 fID = s ? s->ID() : -1;
4771 fSrvType = s ? s->SrvType() : -1;
4772 fPLiteNWrks = s ? s->PLiteNWrks() : -1;
4773 fStatus = s ? s->Status() : kXPD_unknown;
4774 fOrdinal = s ? s->Ordinal() :
"";
4775 fTag = s ? s->Tag() :
"";
4776 fAlias = s ? s->Alias() :
"";
4777 fLogFile = s ? s->Fileout() :
"";
4778 fROOTTag = (s && s->ROOT())? s->ROOT()->Tag() :
"";
4779 fSrvProtVers = (s && s->ROOT()) ? s->ROOT()->SrvProtVers() : -1;
4780 fUserEnvs = s ? s->UserEnvs() :
"";
4781 fAdminPath = s ? s->AdminPath() :
"";
4782 fUnixPath = s ? s->UNIXSockPath() :
"";
// Copy the content of this record into session object 's'. The ROOT version
// is looked up in 'rmgr' by tag; the trace message and the DefaultVersion()
// assignment cover the case where the tag is not available (presumably the
// 'else' branch, whose keyword is not visible in this excerpt).
// NOTE(review): some setters (original lines 4794-4797) are not visible in
// this excerpt, and 'rmgr' is dereferenced without a visible null check.
4788 void XrdProofSessionInfo::FillProofServ(XrdProofdProofServ &s, XrdROOTMgr *rmgr)
4790 XPDLOC(SMGR,
"SessionInfo::FillProofServ")
4792 s.SetClient(fUser.c_str());
4793 s.SetGroup(fGroup.c_str());
4798 s.SetSrvType(fSrvType);
4799 s.SetPLiteNWrks(fPLiteNWrks);
4800 s.SetStatus(fStatus);
4801 s.SetOrdinal(fOrdinal.c_str());
4802 s.SetTag(fTag.c_str());
4803 s.SetAlias(fAlias.c_str());
4804 s.SetFileout(fLogFile.c_str());
// ROOT version: by tag when still available, the default one otherwise
4806 if (rmgr->GetVersion(fROOTTag.c_str())) {
4807 s.SetROOT(rmgr->GetVersion(fROOTTag.c_str()));
4809 TRACE(ALL,
"ROOT version '"<< fROOTTag <<
4810 "' not availabe anymore: setting the default");
4811 s.SetROOT(rmgr->DefaultVersion());
4814 s.SetUserEnvs(fUserEnvs.c_str());
4815 s.SetAdminPath(fAdminPath.c_str(), 0, 0);
4816 s.SetUNIXSockPath(fUnixPath.c_str());
// Save this record to 'file' as text, one group of fields per line:
//   <user> <group>
//   <unix socket path>
//   <pid> <id> <srvtype> <plite-nwrks>
//   <ordinal> <tag> <alias>
//   <log file>
//   <srv protocol version> <ROOT tag>
//   [empty line followed by the user envs, when defined]
// The file mode is then changed to 0666.
// NOTE(review): the fclose() call and the return statements are not visible
// in this excerpt.
4822 int XrdProofSessionInfo::SaveToFile(
const char *file)
4824 XPDLOC(SMGR,
"SessionInfo::SaveToFile")
// The file name must be defined and non-empty
4827 if (!file || strlen(file) <= 0) {
4828 TRACE(XERR,
"invalid input: "<< (file ? file :
"<nul>"));
// NOTE(review): this message is traced before the file is opened and written
4831 TRACE(HDBG,
"session saved to file: "<<file);
// Open (truncate/create) the file and write the fields
4834 FILE *fpid = fopen(file,
"w");
4836 fprintf(fpid,
"%s %s\n", fUser.c_str(), fGroup.c_str());
4837 fprintf(fpid,
"%s\n", fUnixPath.c_str());
4838 fprintf(fpid,
"%d %d %d %d\n", fPid, fID, fSrvType, fPLiteNWrks);
4839 fprintf(fpid,
"%s %s %s\n", fOrdinal.c_str(), fTag.c_str(), fAlias.c_str());
4840 fprintf(fpid,
"%s\n", fLogFile.c_str());
4841 fprintf(fpid,
"%d %s\n", fSrvProtVers, fROOTTag.c_str());
4842 if (fUserEnvs.length() > 0)
4843 fprintf(fpid,
"\n%s", fUserEnvs.c_str());
// Make the file readable and writable by everybody
4848 if (chmod(file, 0666) != 0) {
4849 TRACE(XERR,
"could not change mode to 0666 on file "<<
4850 file<<
"; error: "<<errno);
// Error path for a file that could not be opened
4856 TRACE(XERR,
"session pid file cannot be (re-)created: "<<
4857 file<<
"; error: "<<errno);
// Reset this session-info record; the status goes back to kXPD_unknown.
4864 void XrdProofSessionInfo::Reset()
4872 fStatus = kXPD_unknown;
// Fill this record from the text file 'file'. The records are consumed in the
// order in which SaveToFile writes them: user/group, UNIX socket path,
// pid/id/server type/..., ordinal/tag/alias, log file, protocol version/ROOT
// tag, then whatever is left in the file. The last access time of the file is
// recorded in fLastAccess, and the session status is read from a second,
// companion file whose name is built from 'file'.
// A null or empty 'file' is reported as invalid input.
// NOTE(review): every "strip the trailing newline" statement below indexes
// line[strlen(line)-1]; if fgets ever yields an empty string (e.g. a record
// starting with a NUL byte) this reads line[-1] - confirm this cannot happen.
4888 int XrdProofSessionInfo::ReadFromFile(
const char *file)
4890 XPDLOC(SMGR,
"SessionInfo::ReadFromFile")
// Reject a missing or empty path
4895 if (!file || strlen(file) <= 0) {
4896 TRACE(XERR,
"invalid input: "<<(file ? file :
"<nul>"));
4901 FILE *fpid = fopen(file,
"r");
4904 XrdOucString sline, t;
// Record 1: "<user> <group>"
4906 if (fgets(line,
sizeof(line), fpid)) {
4907 if (line[strlen(line)-1] ==
'\n') line[strlen(line)-1] =
'\0';
4909 if ((from = sline.tokenize(fUser, from,
' ')) == -1)
4910 TRACE(XERR,
"warning: fUser: corrupted line? "<<line<<
" (file: "<<file<<
")");
4911 if ((from = sline.tokenize(fGroup, from,
' ')) == -1)
4912 TRACE(XERR,
"warning: fGroup: corrupted line? "<<line<<
" (file: "<<file<<
")");
// Record 2: a single value; SaveToFile writes the UNIX socket path here
// (presumably stored into fUnixPath - confirm)
4914 if (fgets(line,
sizeof(line), fpid)) {
4915 if (line[strlen(line)-1] ==
'\n') line[strlen(line)-1] =
'\0';
// Record 3: space-separated integers, extracted one token at a time into 't'
4918 if (fgets(line,
sizeof(line), fpid)) {
4919 if (line[strlen(line)-1] ==
'\n') line[strlen(line)-1] =
'\0';
4922 if ((from = sline.tokenize(t, from,
' ')) == -1)
4923 TRACE(XERR,
"warning: fPid: corrupted line? "<<line<<
" (file: "<<file<<
")");
4925 if ((from = sline.tokenize(t, from,
' ')) == -1)
4926 TRACE(XERR,
"warning: fID: corrupted line? "<<line<<
" (file: "<<file<<
")");
4928 if ((from = sline.tokenize(t, from,
' ')) == -1)
4929 TRACE(XERR,
"warning: fSrvType: corrupted line? "<<line<<
" (file: "<<file<<
")");
4930 fSrvType = t.atoi();
// Record 4: "<ordinal> <tag> <alias>"; a missing alias is not treated as
// a corruption and is only reported at HDBG level
4932 if (fgets(line,
sizeof(line), fpid)) {
4933 if (line[strlen(line)-1] ==
'\n') line[strlen(line)-1] =
'\0';
4936 if ((from = sline.tokenize(fOrdinal, from,
' ')) == -1)
4937 TRACE(XERR,
"warning: fOrdinal: corrupted line? "<<line<<
" (file: "<<file<<
")");
4938 if ((from = sline.tokenize(fTag, from,
' ')) == -1)
4939 TRACE(XERR,
"warning: fTag: corrupted line? "<<line<<
" (file: "<<file<<
")");
4940 if ((from = sline.tokenize(fAlias, from,
' ')) == -1)
4941 TRACE(HDBG,
"fAlias undefined "<<line);
// Record 5: a single value; SaveToFile writes the log file path here
// (presumably stored into fLogFile - confirm)
4943 if (fgets(line,
sizeof(line), fpid)) {
4944 if (line[strlen(line)-1] ==
'\n') line[strlen(line)-1] =
'\0';
// Record 6: "<server protocol version> <ROOT tag>"
4947 if (fgets(line,
sizeof(line), fpid)) {
4948 if (line[strlen(line)-1] ==
'\n') line[strlen(line)-1] =
'\0';
4951 if ((from = sline.tokenize(t, from,
' ')) == -1)
4952 TRACE(XERR,
"warning: fSrvProtVers: corrupted line? "<<line<<
" (file: "<<file<<
")");
4953 fSrvProtVers = t.atoi();
4954 if ((from = sline.tokenize(fROOTTag, from,
' ')) == -1)
4955 TRACE(XERR,
"warning: fROOTTag: corrupted line? "<<line<<
" (file: "<<file<<
")");
// Remainder of the file (SaveToFile puts the user environment settings
// there): work out how many bytes are left and read them in chunks of at
// most 4095 bytes directly from the descriptor.
// NOTE(review): the offset returned by lseek() is the one of the underlying
// descriptor, which the buffered fgets() calls above may already have moved
// past the logical read position; moreover the second lseek() leaves the
// descriptor at end-of-file, so unless it is repositioned before the read()
// below, that read() returns 0 - confirm the remainder is really picked up.
4959 off_t lnow = lseek(fileno(fpid), (off_t) 0, SEEK_CUR);
4960 off_t ltot = lseek(fileno(fpid), (off_t) 0, SEEK_END);
4961 int left = (int)(ltot - lnow);
4964 int wanted = (left > 4095) ? 4095 : left;
4965 while ((len = read(fileno(fpid), line, wanted)) < 0 &&
// A failed or short read is singled out by this test
4968 if (len < 0 || len < wanted) {
4976 }
while (len > 0 && left > 0);
// Remember when the session file was last accessed
4983 if (!stat(file, &st))
4984 fLastAccess = st.st_atime;
4986 TRACE(XERR,
"session file cannot be open: "<< file<<
"; error: "<<errno);
// Companion status file: its name is derived from 'file' and its first line
// holds the numeric session status
4991 XrdOucString fs(file);
4993 fpid = fopen(fs.c_str(),
"r");
4996 if (fgets(line,
sizeof(line), fpid)) {
4997 if (line[strlen(line)-1] ==
'\n') line[strlen(line)-1] = 0;
4998 fStatus = atoi(line);
// A missing status file is only reported at DBG level
5003 TRACE(DBG,
"no session status file for: "<< fs<<
"; session was probably terminated");
// Check whether this env setting applies to user 'usr', group 'grp' and
// version code 'ver'.
// Returns -1 as soon as one of the checks fails: the user does not match
// fUsers, the group does not match fGroups, or 'ver' lies outside the
// [fVerMin, fVerMax] range (a bound <= 0 is not enforced). The match counts
// returned by XrdOucString::matches are collected in nmtc / nmtcg.
5016 int XpdEnv::Matches(
const char *usr,
const char *grp,
int ver)
5018 XPDLOC(SMGR,
"XpdEnv::Matches")
// User filter: applied only when a users specification is set; a zero
// result from matches() means no match
5022 if (fUsers.length() > 0) {
5023 XrdOucString u(usr);
5024 if ((nmtc = u.matches(fUsers.c_str())) == 0)
return -1;
// Group filter: same logic as for the user
5031 if (fGroups.length() > 0) {
5032 XrdOucString g(grp);
5033 if ((nmtcg = g.matches(fGroups.c_str())) == 0)
return -1;
// Presumably the no-group-filter case: the whole length of 'grp' is
// counted as matching - confirm
5035 nmtcg = strlen(grp);
5039 TRACE(HDBG, fEnv <<
", u:"<<usr<<
", g:"<<grp<<
" --> nmtc: "<<nmtc);
// Version window: each bound is enforced only when positive
5042 TRACE(HDBG, fEnv <<
", ver:"<<ver);
5043 if (fVerMin > 0 && ver < fVerMin)
return -1;
5044 if (fVerMax > 0 && ver > fVerMax)
return -1;
// Convert the version number 'ver' into a packed version code of the form
// (major << 16) + (minor << 8) + patch.
// 'hex' presumably selects how 'ver' is split into its components (base 256
// versus base 100, see the two patch computations below) - confirm.
5054 int XpdEnv::ToVersCode(
int ver,
bool hex)
// Components start as "undefined" (-1); 'xv' is a working copy of the input
5056 int maj = -1, min = -1, ptc = -1, xv = ver;
// Patch level when the components are laid out in base 256
5061 ptc = xv - min * 256;
// Patch level when the components are laid out in base 100
5066 ptc = xv - min * 100;
// Pack the three components into a single integer
5069 int vc = (maj << 16) + (min << 8) + ptc;
// Trace a one-line summary of this env setting: the env string, the users
// and groups it applies to ("allusers" / "allgroups" when the respective
// specification is empty), the SVN revision range and the version range.
// 'what' is presumably the label used for the trace location - confirm.
5076 void XpdEnv::Print(
const char *what)
// Version bounds are shown as "-1" unless replaced by the decoded values
5080 XrdOucString vmi("-1"), vmx("-1");
// Unpack fVerMin, stored as (major << 16) + (minor << 8) + patch.
// NOTE(review): the three components are concatenated without separators
// ("%d%d%d"); confirm this is the intended rendering.
5082 int maj = (fVerMin >> 16);
5083 int min = ((fVerMin - maj * 65536) >> 8);
5084 int ptc = fVerMin - maj * 65536 - min * 256;
5085 XPDFORM(vmi,
"%d%d%d", maj, min, ptc);
// Same decoding for fVerMax
5088 int maj = (fVerMax >> 16);
5089 int min = ((fVerMax - maj * 65536) >> 8);
5090 int ptc = fVerMax - maj * 65536 - min * 256;
5091 XPDFORM(vmx,
"%d%d%d", maj, min, ptc);
// Labels used when no explicit users / groups specification is set
5093 XrdOucString u(
"allusers"), g(
"allgroups");
5094 if (fUsers.length() > 0) u = fUsers;
// The groups specification belongs in 'g': assigning it to 'u' (as done
// previously) replaced the users label and always printed "allgroups"
5095 if (fGroups.length() > 0) g = fGroups;
5097 TRACE(ALL,
"'"<<fEnv<<
"' {"<<u<<
"|"<<g<<
5098 "} svn:["<<fSvnMin<<
","<<fSvnMax<<
"] vers:["<<vmi<<
","<<vmx<<
"]");