24 #include "XrdNet/XrdNet.hh"
37 XrdProofdClient::XrdProofdClient(XrdProofUI ui,
bool master,
bool changeown,
38 XrdSysError *,
const char *adminpath,
int rtime)
39 : fSandbox(ui, master, changeown)
41 XPDLOC(CMGR,
"Client::Client")
49 fChangeOwn = changeown;
50 fReconnectTimeOut = rtime;
53 XPDFORM(fAdminPath, "%s/%s.%s", adminpath, ui.fUser.c_str(), ui.fGroup.c_str());
55 if (stat(adminpath, &st) != 0) {
56 TRACE(XERR,
"problems stating admin path "<<adminpath<<
"; errno = "<<errno);
60 XrdProofdAux::GetUserInfo(st.st_uid, effui);
61 if (XrdProofdAux::AssertDir(fAdminPath.c_str(), effui, 1) != 0)
65 if (fSandbox.IsValid()) fIsValid = 1;
71 XrdProofdClient::~XrdProofdClient()
78 bool XrdProofdClient::Match(
const char *usr,
const char *grp)
80 if (!fIsValid)
return 0;
82 bool rc = (usr && !strcmp(usr, User())) ? 1 : 0;
83 if (rc && grp && strlen(grp) > 0)
84 rc = (grp && Group() && !strcmp(grp, Group())) ? 1 : 0;
93 int XrdProofdClient::GetClientID(XrdProofdProtocol *p)
95 XPDLOC(CMGR,
"Client::GetClientID")
99 { XrdSysMutexHelper mh(fMutex);
100 if (!fIsValid)
return -1;
102 for (ic = 0; ic < (int)fClients.size() ; ic++) {
103 if (fClients[ic] && !fClients[ic]->IsValid()) {
104 int rtime = fClients[ic]->ResetTime();
105 if ((rtime >= 0) && ((time(0) - rtime) < fReconnectTimeOut)) {
117 if (ic >= (
int)fClients.capacity())
118 fClients.reserve(2*fClients.capacity());
121 cid =
new XrdClientID();
122 fClients.push_back(cid);
123 sz = fClients.size();
131 memcpy((
void *)&sid, (
const void *)&(p->Request()->header.streamid[0]), 2);
135 TRACE(DBG,
"size = "<<sz<<
", ic = "<<ic);
145 int XrdProofdClient::ReserveClientID(
int cid)
147 XPDLOC(CMGR,
"Client::ReserveClientID")
152 int sz = 0, newsz = 0;
153 { XrdSysMutexHelper mh(fMutex);
154 if (!fIsValid)
return -1;
155 if (cid >= (
int)fClients.size()) {
158 newsz = fClients.capacity();
159 if (cid >= (
int)fClients.capacity()) {
160 newsz = 2 * fClients.capacity();
161 newsz = (cid < newsz) ? newsz : cid + 1;
162 fClients.reserve(newsz);
166 while (cid >= (
int)fClients.size())
167 fClients.push_back(
new XrdClientID());
169 sz = fClients.size();
172 TRACE(DBG,
"cid = "<<cid<<
", size = "<<sz<<
", capacity = "<<newsz);
182 XrdProofdProofServ *XrdProofdClient::GetFreeServObj()
184 XPDLOC(CMGR,
"Client::GetFreeServObj")
186 int ic = 0, newsz = 0, sz = 0;
187 XrdProofdProofServ *xps = 0;
189 { XrdSysMutexHelper mh(fMutex);
190 if (!fIsValid)
return xps;
193 for (ic = 0; ic < (int)fProofServs.size() ; ic++) {
194 if (fProofServs[ic] && !(fProofServs[ic]->IsValid())) {
195 fProofServs[ic]->SetValid();
201 if (ic >= (
int)fProofServs.capacity()) {
202 newsz = 2 * fProofServs.capacity();
203 fProofServs.reserve(newsz);
205 if (ic >= (
int)fProofServs.size()) {
207 fProofServs.push_back(
new XrdProofdProofServ());
209 sz = fProofServs.size();
211 xps = fProofServs[ic];
219 XPDFORM(msg,
"new capacity = %d, size = %d, ic = %d, xps = %p",
222 XPDFORM(msg,
"size = %d, ic = %d, xps = %p", sz, ic, xps);
234 XrdProofdProofServ *XrdProofdClient::GetServObj(
int id)
236 XPDLOC(CMGR,
"Client::GetServObj")
238 TRACE(DBG, "
id: "<<
id);
241 TRACE(XERR,
"invalid input: id: "<<
id);
242 return (XrdProofdProofServ *)0;
245 XrdOucString dmsg, emsg;
246 XrdProofdProofServ *xps = 0;
247 int siz = 0, cap = 0;
248 { XrdSysMutexHelper mh(fMutex);
249 if (!fIsValid)
return xps;
250 siz = fProofServs.size();
251 cap = fProofServs.capacity();
253 TRACE(DBG,
"size = "<<siz<<
"; capacity = "<<cap);
255 { XrdSysMutexHelper mh(fMutex);
256 if (!fIsValid)
return xps;
257 if (
id < (
int)fProofServs.size()) {
258 if (!(xps = fProofServs[
id])) {
259 emsg =
"instance in use or undefined! protocol error";
263 if (
id >= (
int)fProofServs.capacity()) {
264 int newsz = 2 * fProofServs.capacity();
265 newsz = (
id < newsz) ? newsz :
id+1;
266 fProofServs.reserve(newsz);
267 cap = fProofServs.capacity();
269 int nnew =
id - fProofServs.size() + 1;
271 fProofServs.push_back(
new XrdProofdProofServ());
272 xps = fProofServs[id];
280 { XrdSysMutexHelper mh(fMutex);
282 siz = fProofServs.size();
283 cap = fProofServs.capacity();
286 TRACE(DBG,
"size = "<<siz<<
" (capacity = "<<cap<<
"); id = "<<
id);
296 XrdProofdProofServ *XrdProofdClient::GetServer(XrdProofdProtocol *p)
298 XPDLOC(CMGR,
"Client::GetServer")
300 TRACE(DBG, "enter: p: " << p);
303 XrdProofdProofServ *xps = 0;
304 std::vector<XrdProofdProofServ *>::iterator ip;
305 XrdSysMutexHelper mh(fMutex);
306 if (!fIsValid) return xps;
307 for (ip = fProofServs.begin(); ip != fProofServs.end(); ++ip) {
309 if (xps && xps->SrvPID() == p->Pid())
320 XrdProofdProofServ *XrdProofdClient::GetServer(
int psid)
322 XrdSysMutexHelper mh(fMutex);
323 if (fIsValid && psid > -1 && psid < (
int) fProofServs.size())
324 return fProofServs.at(psid);
326 return (XrdProofdProofServ *)0;
332 void XrdProofdClient::EraseServer(
int psid)
334 XPDLOC(CMGR,
"Client::EraseServer")
336 TRACE(DBG, "enter: psid: " << psid);
338 XrdProofdProofServ *xps = 0;
339 std::vector<XrdProofdProofServ *>::iterator ip;
340 XrdSysMutexHelper mh(fMutex);
341 if (!fIsValid) return;
343 for (ip = fProofServs.begin(); ip != fProofServs.end(); ++ip) {
345 if (xps && xps->Match(psid)) {
356 int XrdProofdClient::GetTopServers()
358 XPDLOC(CMGR,
"Client::GetTopServers")
362 XrdProofdProofServ *xps = 0;
363 std::vector<XrdProofdProofServ *>::iterator ip;
364 XrdSysMutexHelper mh(fMutex);
365 if (!fIsValid) return nv;
366 for (ip = fProofServs.begin(); ip != fProofServs.end(); ++ip) {
367 if ((xps = *ip) && xps->IsValid() && (xps->SrvType() == kXPD_TopMaster)) {
368 TRACE(DBG,
"found potentially valid topmaster session: pid "<<xps->SrvPID());
380 int XrdProofdClient::ResetClientSlot(
int ic)
382 XPDLOC(CMGR,
"Client::ResetClientSlot")
384 TRACE(DBG, "enter: ic: " << ic);
386 XrdSysMutexHelper mh(fMutex);
388 if (ic >= 0 && ic < (
int) fClients.size()) {
389 fClients[ic]->Reset();
400 XrdProofdProtocol *XrdProofdClient::GetProtocol(
int ic)
402 XPDLOC(CMGR,
"Client::GetProtocol")
404 TRACE(DBG, "enter: ic: " << ic);
406 XrdProofdProtocol *p = 0;
408 XrdSysMutexHelper mh(fMutex);
410 if (ic >= 0 && ic < (
int) fClients.size()) {
411 p = fClients[ic]->P();
421 int XrdProofdClient::SetClientID(
int cid, XrdProofdProtocol *p)
423 XPDLOC(CMGR,
"Client::SetClientID")
425 TRACE(DBG, "cid: "<< cid <<", p: " << p);
427 XrdSysMutexHelper mh(fMutex);
428 if (!fIsValid) return -1;
430 if (cid >= 0 && cid < (
int) fClients.size()) {
431 if (fClients[cid] && (fClients[cid]->P() != p))
432 fClients[cid]->Reset();
433 fClients[cid]->SetP(p);
436 memcpy((
void *)&sid, (
const void *)&(p->Request()->header.streamid[0]), 2);
437 fClients[cid]->SetSid(sid);
448 void XrdProofdClient::Broadcast(
const char *msg)
450 XPDLOC(CMGR,
"Client::Broadcast")
453 if (msg && (len = strlen(msg)) > 0) {
457 XrdClientID *cid = 0;
458 XrdSysMutexHelper mh(fMutex);
459 for (ic = 0; ic < (int) fClients.size(); ic++) {
460 if ((cid = fClients.at(ic)) && cid->P() && cid->P()->ConnType() == kXPD_ClientMaster) {
462 if (cid->P()->Link()) {
463 TRACE(ALL,
" sending to: "<<cid->P()->Link()->ID);
464 XrdProofdResponse *response = cid->R();
466 response->Send(kXR_attn, kXPD_srvmsg, (
char *) msg, len);
482 int XrdProofdClient::Touch(
bool reset)
491 if (fAskedToTouch)
return 1;
495 XrdClientID *cid = 0;
496 XrdSysMutexHelper mh(fMutex);
497 for (ic = 0; ic < (int) fClients.size(); ic++) {
499 if ((cid = fClients.at(ic)) && cid->P() && cid->P()->ProofProtocol() > 17) {
500 if (cid->P()->ConnType() != kXPD_Internal) {
501 XrdProofdResponse *response = cid->R();
502 if (response) response->Send(kXR_attn, kXPD_touch, (
char *)0, 0);
517 bool XrdProofdClient::VerifySession(XrdProofdProofServ *xps, XrdProofdResponse *r)
519 XPDLOC(CMGR,
"Client::VerifySession")
521 if (!xps || !(xps->IsValid())) {
522 TRACE(XERR,
" session undefined or invalid");
527 XrdOucString path(xps->AdminPath());
528 if (path.length() <= 0) {
529 TRACE(XERR,
"admin path is empty! - protocol error");
536 if (stat(path.c_str(), &st0) != 0) {
537 TRACE(XERR,
"cannot stat admin path: "<<path);
541 if (now >= st0.st_mtime && (now - st0.st_mtime) <= 1)
return 1;
542 TRACE(ALL,
"admin path: "<<path<<
", mtime: "<< st0.st_mtime <<
", now: "<< now);
545 int pid = xps->SrvPID();
547 if (XrdProofdAux::VerifyProcessByID(pid) != 0) {
549 if (xps->VerifyProofServ(0) != 0) {
550 TRACE(XERR,
"could not send verify request to proofsrv");
559 if (stat(path.c_str(), &st1) == 0) {
560 if (st1.st_mtime > st0.st_mtime) {
565 TRACE(HDBG,
"waiting "<<ns<<
" secs for session "<<pid<<
566 " to touch the admin path");
568 XPDFORM(notmsg,
"verifying existing sessions, %d seconds ...", ns);
569 r->Send(kXR_attn, kXPD_srvmsg, 0, (
char *) notmsg.c_str(), notmsg.length());
585 void XrdProofdClient::SkipSessionsCheck(std::list<XrdProofdProofServ *> *active,
586 XrdOucString &emsg, XrdProofdResponse *r)
588 XPDLOC(CMGR,
"Client::SkipSessionsCheck")
590 XrdProofdProofServ *xps = 0;
591 std::vector<XrdProofdProofServ *>::iterator ip;
592 for (ip = fProofServs.begin(); ip != fProofServs.end(); ++ip) {
593 if ((xps = *ip) && xps->IsValid() && (xps->SrvType() == kXPD_TopMaster)) {
594 if (VerifySession(xps, r)) {
596 if (active) active->push_back(xps);
598 if (xps->SrvPID() > 0) {
599 if (emsg.length() <= 0)
600 emsg =
"ignoring (apparently) non-responding session(s): ";
603 emsg += xps->SrvPID();
605 TRACE(ALL,
"session "<<xps->SrvPID()<<
" does not react: dead?");
610 TRACE(HDBG,
"found: " << active->size() <<
" sessions");
619 XrdOucString XrdProofdClient::ExportSessions(XrdOucString &emsg,
620 XrdProofdResponse *r)
622 XrdOucString out, buf;
625 std::list<XrdProofdProofServ *> active;
626 SkipSessionsCheck(&active, emsg, r);
629 XrdProofdProofServ *xps = 0;
630 out += (int) active.size();
631 std::list<XrdProofdProofServ *>::iterator ia;
632 for (ia = active.begin(); ia != active.end(); ++ia) {
633 if ((xps = *ia) && xps->IsValid()) {
646 void XrdProofdClient::TerminateSessions(
int srvtype, XrdProofdProofServ *ref,
647 const char *msg, XrdProofdPipe *pipe,
650 XPDLOC(CMGR,
"Client::TerminateSessions")
654 XrdProofdProofServ *s = 0;
655 for (is = 0; is < (
int) fProofServs.size(); is++) {
656 if ((s = fProofServs.at(is)) && s->IsValid() && (!ref || ref == s) &&
657 (s->SrvType() == srvtype || (srvtype == kXPD_AnyServer))) {
658 TRACE(DBG,
"terminating " << s->SrvPID());
660 if (srvtype == kXPD_TopMaster && msg && strlen(msg) > 0)
665 s->TerminateProofServ(changeown);
668 XrdOucString tag =
"-";
670 if (fSandbox.GuessTag(tag, 1) == 0)
671 fSandbox.RemoveSession(tag.c_str());
676 XrdOucString buf(s->AdminPath());
677 buf.erase(0, buf.rfind(
'/') + 1);
678 TRACE(DBG,
"posting kSessionRemoval with: '"<<buf<<
"'");
679 if ((rc = pipe->Post(XrdProofdProofServMgr::kSessionRemoval, buf.c_str())) != 0) {
680 TRACE(XERR,
"problem posting the pipe; errno: "<<-rc);
693 void XrdProofdClient::ResetSessions()
697 XrdSysMutexHelper mh(fMutex);
698 std::vector<XrdProofdProofServ *>::iterator ip;
699 for (ip = fProofServs.begin(); ip != fProofServs.end(); ++ip) {
701 if (*ip) (*ip)->Reset();