13 #include "XrdNet/XrdNet.hh"
// Constructor. Allocates the XrdSysRecMutex that the other methods of this
// class lock through XrdSysMutexHelper, and initializes members to defaults.
27 XrdProofdProofServ::XrdProofdProofServ()
29 fMutex =
new XrdSysRecMutex;
// No specific server role assigned yet
37 fSrvType = kXPD_AnyServer;
// Reference timestamp later read by IdleTime() and CheckSession()
47 fSetIdleTime = time(0);
// Destructor. Iterates over the client slots in fClients and removes the
// UNIX socket path from the file system.
// NOTE(review): the loop body is not part of this excerpt; presumably it
// releases each slot -- confirm against the full source.
66 XrdProofdProofServ::~XrdProofdProofServ()
71 std::vector<XrdClientID *>::iterator i;
72 for (i = fClients.begin(); i != fClients.end(); ++i)
// The unlink() result is ignored here
84 unlink(fUNIXSockPath.c_str());
// Callback for fWorkers.Apply() (used by ClearWorkers()).
// Detaches the session passed through x from worker w and traces the
// worker host. The key argument is unnamed and unused.
//   w  worker entry being visited
//   x  opaque pointer, cast to XrdProofdProofServ *
92 static int DecreaseWorkerCounters(
const char *, XrdProofWorker *w,
void *x)
94 XPDLOC(PMGR,
"DecreaseWorkerCounters")
96 XrdProofdProofServ *xps = (XrdProofdProofServ *)x;
// Drop the association between this worker and the session
99 w->RemoveProofServ(xps);
100 TRACE(REQ, w->fHost.c_str() <<
" done");
// Callback for fWorkers.Apply() (used by RemoveWorker() when HDBG tracing
// is on). Traces the key, host, port and Active() value of each worker.
//   k  key under which the worker is stored
//   w  worker entry being visited
// The third argument is unnamed and unused.
112 static int DumpWorkerCounters(
const char *k, XrdProofWorker *w,
void *)
114 XPDLOC(PMGR,
"DumpWorkerCounters")
117 TRACE(ALL, k <<
" : "<<w->fHost.c_str()<<
":"<<w->fPort <<
" act: "<<w->Active());
// Detaches this session from every worker registered in fWorkers by
// applying DecreaseWorkerCounters to each entry while holding fMutex.
// NOTE(review): whether the table itself is emptied afterwards is not
// visible in this excerpt.
129 void XrdProofdProofServ::ClearWorkers()
131 XrdSysMutexHelper mhp(fMutex);
// The session pointer is handed to the callback as its opaque argument
134 fWorkers.Apply(DecreaseWorkerCounters,
this);
// Registers worker w in fWorkers under key o, holding fMutex.
//   o  key for the worker entry
//   w  worker to register
// Does nothing when either argument is null.
141 void XrdProofdProofServ::AddWorker(
const char *o, XrdProofWorker *w)
143 if (!o || !w)
return;
145 XrdSysMutexHelper mhp(fMutex);
// NOTE(review): Hash_keepdata presumably tells the table not to take
// ownership of w -- confirm against the XrdOucHash documentation.
147 fWorkers.Add(o, w, 0, Hash_keepdata);
// Looks up the worker stored under key o and detaches this session from
// it, holding fMutex.
//   o  key of the worker entry
// NOTE(review): removal of the entry from fWorkers is not visible in this
// excerpt.
153 void XrdProofdProofServ::RemoveWorker(
const char *o)
155 XPDLOC(SMGR,
"ProofServ::RemoveWorker")
159 TRACE(DBG,"removing: "<<o);
161 XrdSysMutexHelper mhp(fMutex);
// Find() yields null when the key is unknown, hence the guard below
163 XrdProofWorker *w = fWorkers.Find(o);
164 if (w) w->RemoveProofServ(this);
// With HDBG tracing enabled, dump the state of all remaining workers
166 if (TRACING(HDBG)) fWorkers.Apply(DumpWorkerCounters, 0);
// Resets the session after notifying the attached clients.
//   msg   text broadcast to the clients
//   type  notification type forwarded to Broadcast()
// Reads the first line of the status file (admin path + .status), then,
// holding fMutex, broadcasts either the fixed idle-timeout text or msg.
// The result rc is set to 1 when the session is a top master; its other
// values are assigned on lines not included in this excerpt.
173 int XrdProofdProofServ::Reset(const
char *msg,
int type)
175 XPDLOC(SMGR,
"ProofServ::Reset")
// Status file lives next to the admin path
181 XPDFORM(fn, "%s.status", fAdminPath.c_str());
182 FILE *fpid = fopen(fn.c_str(), "r");
// Read one line and strip a trailing newline.
// NOTE(review): strlen(line)-1 assumes the line is not empty; a line that
// starts with a NUL byte would index before the buffer -- confirm.
185 if (fgets(line,
sizeof(line), fpid)) {
186 if (line[strlen(line)-1] ==
'\n') line[strlen(line)-1] = 0;
189 TRACE(XERR,
"problems reading from file "<<fn);
193 TRACE(DBG,
"file: "<<fn<<
", st:"<<st);
194 XrdSysMutexHelper mhp(fMutex);
// NOTE(review): the condition choosing between the two broadcasts below is
// not visible here; presumably it depends on the status read above.
197 Broadcast(
"idle-timeout", type);
199 Broadcast(msg, type);
// Top masters report 1 to the caller
202 if (fSrvType == kXPD_TopMaster) rc = 1;
// Argument-less overload: puts members back to their default values while
// holding fMutex. Only two of the assignments are visible in this excerpt.
212 void XrdProofdProofServ::Reset()
214 XrdSysMutexHelper mhp(fMutex);
// -1 is the value DisconnectTime() treats as not set (it tests > 0)
229 fDisconnectTime = -1;
// Same initial server type as set by the constructor
235 fSrvType = kXPD_AnyServer;
// Removes the UNIX socket path from the file system.
// NOTE(review): release of the fUNIXSock object is not visible in this
// excerpt. The unlink() result is ignored.
255 void XrdProofdProofServ::DeleteUNIXSock()
258 unlink(fUNIXSockPath.c_str());
// Reads the fSkipCheck flag while holding fMutex.
// NOTE(review): the lines following the copy are not visible here;
// presumably the copy is returned -- confirm whether the flag is also reset.
265 bool XrdProofdProofServ::SkipCheck()
267 XrdSysMutexHelper mhp(fMutex);
269 bool rc = fSkipCheck;
// Returns the client slot with index cid, creating it when needed.
//   cid  index of the requested slot; a negative value is traced as a
//        protocol error
// Holding fMutex: if cid is below the current size the existing slot is
// used; otherwise new XrdClientID objects are appended until index cid
// exists, and csid is left pointing to the last one created.
277 XrdClientID *XrdProofdProofServ::GetClientID(
int cid)
279 XPDLOC(SMGR,
"ProofServ::GetClientID")
281 XrdClientID *csid = 0;
284 TRACE(XERR,
"negative ID: protocol error!");
289 { XrdSysMutexHelper mhp(fMutex);
// Fast path: the slot already exists
296 if (cid < (
int)fClients.size()) {
297 csid = fClients.at(cid);
302 XPDFORM(msg,
"cid: %d, size: %d", cid, fClients.size());
// Pre-allocate by doubling the capacity. This is only an optimization:
// push_back below grows the vector by itself if doubling is not enough.
308 if (cid >= (
int)fClients.capacity())
309 fClients.reserve(2*fClients.capacity());
// Fill every missing index up to and including cid
312 int ic = (int)fClients.size();
313 for (; ic <= cid; ic++)
314 fClients.push_back((csid =
new XrdClientID()));
318 XPDFORM(msg,
"cid: %d, new size: %d", cid, fClients.size());
// Releases the client slots that belong to the client process pid.
//   pid  process id of the client whose slots are to be freed
// Returns rc early when the session is not valid. Holding fMutex, every
// slot whose protocol reports Pid() equal to pid, or equal to -1, is
// treated as a match. The value of rc is assigned on lines not included in
// this excerpt.
331 int XrdProofdProofServ::FreeClientID(
int pid)
333 XPDLOC(SMGR,
"ProofServ::FreeClientID")
335 TRACE(DBG, "svrPID: "<<fSrvPID<< ", pid: "<<pid<<", session status: "<<
336 fStatus<<",
# clients: "<< fNClients);
339 TRACE(XERR,
"undefined pid!");
342 if (!IsValid())
return rc;
344 { XrdSysMutexHelper mhp(fMutex);
347 std::vector<XrdClientID *>::iterator i;
348 for (i = fClients.begin(); i != fClients.end(); ++i) {
// Skip empty slots and slots without a protocol attached
349 if ((*i) && (*i)->P()) {
350 if ((*i)->P()->Pid() == pid || (*i)->P()->Pid() == -1) {
// Special handling when the slot holds the protocol stored in fProtocol
// (the action taken is not visible in this excerpt)
351 if (fProtocol == (*i)->P()) {
// If the slot being freed is the parent, clear the parent reference
356 if (fParent == (*i)) SetParent(0);
// Remember when the client went away; read back by DisconnectTime()
360 fDisconnectTime = time(0);
367 if (TRACING(REQ) && (rc == 0)) {
369 TRACE(REQ, spid<<
": slot for client pid: "<<pid<<
" has been reset");
// Counts the connected clients, holding fMutex.
//   check  NOTE(review): its use is not visible in this excerpt; presumably
//          it selects whether fNClients is recomputed -- confirm.
// A slot counts as connected when it is non-null, has a protocol and that
// protocol has a link. The count is accumulated in member fNClients.
380 int XrdProofdProofServ::GetNClients(
bool check)
382 XrdSysMutexHelper mhp(fMutex);
387 std::vector<XrdClientID *>::iterator i;
388 for (i = fClients.begin(); i != fClients.end(); ++i) {
389 if ((*i) && (*i)->P() && (*i)->P()->Link()) fNClients++;
// Returns the number of seconds elapsed since the disconnection timestamp
// fDisconnectTime, read while holding fMutex.
// Returns -1 whenever the computed value is not strictly positive. The
// local disct is declared on a line not included in this excerpt.
401 int XrdProofdProofServ::DisconnectTime()
403 XrdSysMutexHelper mhp(fMutex);
// Only a positive timestamp counts as set
406 if (fDisconnectTime > 0)
407 disct = time(0) - fDisconnectTime;
408 return ((disct > 0) ? disct : -1);
// Returns the number of seconds elapsed since fSetIdleTime when the
// session status is kXPD_idle, read while holding fMutex.
// Returns -1 whenever the computed value is not strictly positive. The
// local idlet is declared on a line not included in this excerpt.
415 int XrdProofdProofServ::IdleTime()
417 XrdSysMutexHelper mhp(fMutex);
420 if (fStatus == kXPD_idle)
421 idlet = time(0) - fSetIdleTime;
422 return ((idlet > 0) ? idlet : -1);
// Records the current time in fSetIdleTime while holding fMutex.
// NOTE(review): the status change to idle is presumably done on a line not
// included in this excerpt -- confirm.
429 void XrdProofdProofServ::SetIdle()
431 XrdSysMutexHelper mhp(fMutex);
// Reference point for IdleTime()
434 fSetIdleTime = time(0);
// Sets the session status to kXPD_running while holding fMutex.
441 void XrdProofdProofServ::SetRunning()
443 XrdSysMutexHelper mhp(fMutex);
445 fStatus = kXPD_running;
// Sends msg to the attached clients as a kXR_attn notification.
//   msg   text to send; nothing is sent when it is null or empty
//   type  notification type, cast to XProofActionCode for the send
// fMutex is held only while reading the slot count and while fetching the
// protocol and stream id of each slot, not during the send itself.
452 void XrdProofdProofServ::Broadcast(
const char *msg,
int type)
454 XPDLOC(SMGR,
"ProofServ::Broadcast")
// Types from kXPD_wrkmortem upwards are checked against version 18 by
// XPD_CLNT_VERSION_OK below; other types use -1.
// NOTE(review): presumably 18 is a minimum client protocol version and -1
// means no restriction -- confirm against the macro definition.
457 int clproto = (type >= kXPD_wrkmortem) ? 18 : -1;
461 if (msg && (len = strlen(msg)) > 0) {
462 XrdProofdProtocol *p = 0;
463 int ic = 0, ncz = 0, sid = -1;
// Snapshot of the number of slots; the vector is re-read under lock below
464 { XrdSysMutexHelper mhp(fMutex); ncz = (int) fClients.size(); }
465 for (ic = 0; ic < ncz; ic++) {
// NOTE(review): the slot returned by at(ic) is dereferenced without a null
// check, whereas the iterator loops in this file test the slot first --
// confirm that slots can never be null here.
466 { XrdSysMutexHelper mhp(fMutex);
467 p = fClients.at(ic)->P();
468 sid = fClients.at(ic)->Sid(); }
470 if (p && XPD_CLNT_VERSION_OK(p, clproto)) {
// Response object associated with the stream id of this client
471 XrdProofdResponse *response = p->Response(sid);
473 response->Send(kXR_attn, (XProofActionCode)type, (
void *)msg, len);
476 XPDFORM(m,
"response instance for sid: %d not found", sid);
485 XPDFORM(m,
"type: %d, message: '%s' notified to %d clients", type, msg, nc);
// Signals the process attached to this session through
// XrdProofdAux::KillProcess.
//   changeown  forwarded unchanged to KillProcess
// The return value is assigned on lines not included in this excerpt.
498 int XrdProofdProofServ::TerminateProofServ(
bool changeown)
500 XPDLOC(SMGR,
"ProofServ::TerminateProofServ")
503 TRACE(DBG, "ord: " << fOrdinal << ", pid: " << pid);
// Resolve the identity of the client user; the result code is not checked
508 XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
// NOTE(review): the call uses the local pid while the error trace prints
// fSrvPID; presumably pid is a copy of fSrvPID -- confirm.
509 if (XrdProofdAux::KillProcess(pid, 0, ui, changeown) != 0) {
510 TRACE(XERR,
"ord: problems signalling process: "<<fSrvPID);
512 XrdSysMutexHelper mhp(fMutex);
// Sends a kXPD_ping notification to the process attached to this session.
//   forward  encoded as 1 (true) or 0 (false) in the message payload
// The payload is a single kXR_int32 converted with htonl. The send is done
// through fResponse while holding fMutex; a missing fResponse or a non-zero
// Send() result sets an error message.
// The return value is assigned on lines not included in this excerpt.
529 int XrdProofdProofServ::VerifyProofServ(
bool forward)
531 XPDLOC(SMGR,
"ProofServ::VerifyProofServ")
533 TRACE(DBG, "ord: " << fOrdinal<< ", pid: " << fSrvPID);
// NOTE(review): buf is allocated with new[]; its release is not visible in
// this excerpt -- confirm it is freed on every path.
539 int len = sizeof(kXR_int32);
540 char *buf = new
char[len];
542 kXR_int32 ifw = (forward) ? (kXR_int32)1 : (kXR_int32)0;
543 ifw = static_cast<kXR_int32>(htonl(ifw));
544 memcpy(buf, &ifw, sizeof(kXR_int32));
546 { XrdSysMutexHelper mhp(fMutex);
548 if (!fResponse || fResponse->Send(kXR_attn, kXPD_ping, buf, len) != 0) {
549 msg =
"could not propagate ping to proofsrv";
// Sends a kXPD_priority notification carrying priority to the process
// attached to this session, holding fMutex for the whole call.
//   priority  value sent as a single kXR_int32 converted with htonl
// A missing fResponse or a non-zero Send() result is traced as an error.
// The return value is assigned on lines not included in this excerpt.
568 int XrdProofdProofServ::BroadcastPriority(
int priority)
570 XPDLOC(SMGR,
"ProofServ::BroadcastPriority")
572 XrdSysMutexHelper mhp(fMutex);
// NOTE(review): buf is allocated with new[]; its release is not visible in
// this excerpt -- confirm it is freed on every path.
575 int len = sizeof(kXR_int32);
576 char *buf = new
char[len];
577 kXR_int32 itmp = priority;
578 itmp = static_cast<kXR_int32>(htonl(itmp));
579 memcpy(buf, &itmp, sizeof(kXR_int32));
581 if (!fResponse || fResponse->Send(kXR_attn, kXPD_priority, buf, len) != 0) {
583 TRACE(XERR,
"problems telling proofserv");
588 TRACE(DBG,
"priority "<<priority<<
" sent over");
// Sends a buffer to the client stored in slot cid as a kXPD_msg
// notification.
//   cid   index of the client slot
//   buff  data to send
//   len   number of bytes in buff
// The slot is looked up while holding fMutex. The return value and the
// local rs are handled on lines not included in this excerpt.
596 int XrdProofdProofServ::SendData(
int cid,
void *buff,
int len)
598 XPDLOC(SMGR,
"ProofServ::SendData")
600 TRACE(HDBG, "length: "<<len<<" bytes (cid: "<<cid<<")");
606 XrdClientID *csid = 0;
607 { XrdSysMutexHelper mhp(fMutex);
// Reject a negative index, an index past the last slot, or an empty slot.
// NOTE(review): with an empty vector size() - 1 wraps around before the
// cast to int; presumably this yields -1 so that every cid is rejected --
// confirm on the target platforms.
608 if (cid < 0 || cid > (
int)(fClients.size() - 1) || !(csid = fClients.at(cid))) {
609 XPDFORM(msg,
"client ID not found (cid: %d, size: %d)", cid, fClients.size());
// The slot must have a response object attached
612 if (!rs && !(csid->R())) {
613 XPDFORM(msg,
"client not connected: csid: %p, cid: %d, fSid: %d",
614 csid, cid, csid->Sid());
// NOTE(review): the conditional below evaluates to the same pointer as a
// plain csid->R() call, and response is dereferenced on the next visible
// line without a null test in between -- confirm a guard exists on the
// lines not shown.
623 XrdProofdResponse *response = csid->R() ? csid->R() : 0;
625 if (!response->Send(kXR_attn, kXPD_msg, buff, len))
// Sends a buffer as a kXPD_msg notification to every client slot that has
// a protocol attached, holding fMutex for the whole loop.
//   buff  data to send
//   len   number of bytes in buff
// A missing response object or a non-zero Send() result is treated as a
// failure; the action taken and the return value are on lines not included
// in this excerpt.
640 int XrdProofdProofServ::SendDataN(
void *buff,
int len)
642 XPDLOC(SMGR,
"ProofServ::SendDataN")
644 TRACE(HDBG, "length: "<<len<<" bytes");
648 XrdSysMutexHelper mhp(fMutex);
651 XrdClientID *csid = 0;
653 for (ic = 0; ic < (
int) fClients.size(); ic++) {
// Skip empty slots and slots without a protocol
654 if ((csid = fClients.at(ic)) && csid->P()) {
655 XrdProofdResponse *resp = csid->R();
656 if (!resp || resp->Send(kXR_attn, kXPD_msg, buff, len) != 0)
// Writes a one-line textual description of the session into buf.
//   buf  output string, overwritten with the fields id, tag, alias, status
//        and nc, in that order, separated by blanks after a leading bar
// The values are presumably collected inside the fMutex scope opened
// below; those assignments are not included in this excerpt.
668 void XrdProofdProofServ::ExportBuf(XrdOucString &buf)
670 XPDLOC(SMGR,
"ProofServ::ExportBuf")
674 XrdOucString tag, alias;
675 { XrdSysMutexHelper mhp(fMutex);
681 XPDFORM(buf,
" | %d %s %s %d %d",
id, tag.c_str(), alias.c_str(), status, nc);
682 TRACE(HDBG,
"buf: "<< buf);
// Creates the XrdNet object stored in fUNIXSock, binds it to the path in
// fUNIXSockPath and changes the ownership of that path to the client user.
//   edest  error destination passed to the XrdNet constructor
// The return value is assigned on lines not included in this excerpt.
691 int XrdProofdProofServ::CreateUNIXSock(XrdSysError *edest)
693 XPDLOC(SMGR,
"ProofServ::CreateUNIXSock")
// Traced when a socket already exists (the guarding test is not shown)
699 TRACE(DBG,
"UNIX socket exists already! ("<<fUNIXSockPath<<
")");
704 fUNIXSock =
new XrdNet(edest);
// Make sure the admin path exists: append mode creates the file if missing
707 if (fAdminPath.length() > 0) {
708 FILE *fadm = fopen(fAdminPath.c_str(),
"a");
712 TRACE(XERR,
"unable to open / create admin path "<< fAdminPath <<
"; errno = "<<errno);
// Remove any stale entry at the socket path; a missing entry (ENOENT) is
// not an error. Other failures only produce a warning.
719 if (unlink(fUNIXSockPath.c_str()) != 0 && (errno != ENOENT)) {
720 XPDPRT(
"WARNING: path exists: unable to delete it:"
721 " try to use it anyway " <<fUNIXSockPath);
// Create the path; O_EXCL makes the call fail if it already exists, and
// mode 0700 requests access for the owner only
728 if ((fd = open(fUNIXSockPath.c_str(), O_EXCL | O_RDWR | O_CREAT, 0700)) < 0) {
729 TRACE(XERR,
"unable to create path: " <<fUNIXSockPath);
// A non-zero Bind() result is treated as a failure
736 if (fUNIXSock->Bind((
char *)fUNIXSockPath.c_str())) {
737 TRACE(XERR,
" problems binding to UNIX socket; path: " <<fUNIXSockPath);
740 TRACE(DBG,
"path for UNIX for socket is " <<fUNIXSockPath);
742 TRACE(XERR,
"unable to open / create path for UNIX socket; tried path "<< fUNIXSockPath);
// NOTE(review): the GetUserInfo() result is not checked here, while
// SetAdminPath() does check it before using ui -- confirm this is intended.
749 XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
750 if (chown(fUNIXSockPath.c_str(), ui.fUid, ui.fGid) != 0) {
751 TRACE(XERR,
"unable to change ownership of the UNIX socket"<<fUNIXSockPath);
// Sets the admin path of the session and, on request, makes sure the admin
// file and its companion status file exist. Holds fMutex throughout.
//   a       admin path
//   assert  when false the function returns 0 without touching the files
//   setown  NOTE(review): its use is not visible in this excerpt;
//           presumably it enables the ownership change at the end
// Return values other than the early 0 are assigned on lines not included
// in this excerpt.
763 int XrdProofdProofServ::SetAdminPath(
const char *a,
bool assert,
bool setown)
765 XPDLOC(SMGR,
"ProofServ::SetAdminPath")
767 XrdSysMutexHelper mhp(fMutex);
772 if (!assert) return 0;
// Append mode creates the file if it does not exist yet
775 FILE *fpid = fopen(a, "a");
// NOTE(review): this error trace prints fAdminPath, not the argument a
// that was passed to fopen -- presumably fAdminPath was assigned from a on
// a line not shown; confirm.
779 TRACE(XERR,
"unable to open / create admin path "<< fAdminPath <<
"; errno = "<<errno);
// Companion status file: admin path with the .status suffix.
// NOTE(review): the file is opened in append mode, so a value written by a
// previous call would be kept and the new one added after it -- confirm
// this is intended, since Reset(msg, type) reads this file back.
785 XPDFORM(fn,
"%s.status", a);
786 if ((fpid = fopen(fn.c_str(),
"a"))) {
787 fprintf(fpid,
"%d", fStatus);
790 TRACE(XERR,
"unable to open / create status path "<< fn <<
"; errno = "<<errno);
// Hand the status file over to the client user
797 if (XrdProofdAux::GetUserInfo(fClient.c_str(), ui) != 0) {
798 TRACE(XERR,
"unable to get info for user "<<fClient<<
"; errno = "<<errno);
801 if (XrdProofdAux::ChangeOwn(fn.c_str(), ui) != 0) {
802 TRACE(XERR,
"unable to give ownership of the status file "<< fn <<
" to user; errno = "<<errno);
// Sends a kXPD_resume notification, without payload, to the process
// attached to this session. The send goes through fResponse while holding
// fMutex; a missing fResponse or a non-zero Send() result sets an error
// message. The return value is assigned on lines not included in this
// excerpt.
816 int XrdProofdProofServ::Resume()
818 XPDLOC(SMGR,
"ProofServ::Resume")
820 TRACE(REQ, "ord: " << fOrdinal<< ", pid: " << fSrvPID);
825 { XrdSysMutexHelper mhp(fMutex);
827 if (!fResponse || fResponse->Send(kXR_attn, kXPD_resume, 0, 0) != 0) {
828 msg =
"could not propagate resume to proofsrv";
// Callback for fWorkers.Apply() (used by ExportWorkers()).
// Adds the exported description of worker w to the string passed via s.
//   k  key under which the worker is stored
//   w  worker entry being visited
//   s  opaque pointer, cast to XrdOucString *
// Workers of type M are placed at the front of the string, preceded by an
// ampersand separator when the string is not empty; the other workers are
// appended at the end using Export(k).
844 static int ExportWorkerDescription(
const char *k, XrdProofWorker *w,
void *s)
846 XPDLOC(PMGR,
"ExportWorkerDescription")
848 XrdOucString *wrks = (XrdOucString *)s;
// Type M entry: insert the separator first, then the description, both at
// position 0, so the description ends up before the separator
851 if (w->fType ==
'M') {
852 if (wrks->length() > 0) wrks->insert(
'&',0);
853 wrks->insert(w->Export(), 0);
// Other entries: the statement guarded by this test is not visible here;
// presumably it appends the separator
856 if (wrks->length() > 0)
859 (*wrks) += w->Export(k);
861 TRACE(HDBG, k <<
" : "<<w->fHost.c_str()<<
":"<<w->fPort <<
" act: "<<w->Active());
// Builds the description of all workers of this session.
//   wrks  string receiving the descriptions; it is passed to
//         ExportWorkerDescription, which is applied to every entry of
//         fWorkers while holding fMutex
873 void XrdProofdProofServ::ExportWorkers(XrdOucString &wrks)
875 XrdSysMutexHelper mhp(fMutex);
877 fWorkers.Apply(ExportWorkerDescription, (
void *)&wrks);
// Traces the list of queries of this session, holding fMutex: a header
// with client, session pid and number of queries, then one line per query
// with its tag, dataset name and dataset size.
// The counter i printed for each query is declared and advanced on lines
// not included in this excerpt.
883 void XrdProofdProofServ::DumpQueries()
885 XPDLOC(PMGR,
"DumpQueries")
887 XrdSysMutexHelper mhp(fMutex);
889 TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
890 TRACE(ALL," +++ client: "<<fClient<<", session: "<< fSrvPID <<
891 ",
# of queries: "<< fQueries.size());
892 std::list<XrdProofQuery *>::iterator ii;
894 for (ii = fQueries.begin(); ii != fQueries.end(); ++ii) {
896 TRACE(ALL,
" +++ #"<<i<<
" tag:"<< (*ii)->GetTag()<<
" dset: "<<
897 (*ii)->GetDSName()<<
" size:"<<(*ii)->GetDSSize());
899 TRACE(ALL,
" ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
// Looks up a query by tag in fQueries, holding fMutex during the search.
//   tag  tag to search for, compared with strcmp against GetTag()
// Returns a null pointer when tag is null or empty, or when the list is
// empty. The final return is on a line not included in this excerpt.
905 XrdProofQuery *XrdProofdProofServ::GetQuery(
const char *tag)
907 XrdProofQuery *q = 0;
// strlen() is unsigned, so this test is true only for the empty string
908 if (!tag || strlen(tag) <= 0)
return q;
910 XrdSysMutexHelper mhp(fMutex);
// size() is unsigned, so this test is true only for an empty list
912 if (fQueries.size() <= 0)
return q;
914 std::list<XrdProofQuery *>::iterator ii;
915 for (ii = fQueries.begin(); ii != fQueries.end(); ++ii) {
// NOTE(review): q is presumably assigned from the current element on a
// line not shown, and reset when the tag does not match -- confirm, since
// otherwise a failed search would leave q at the last element.
917 if (!strcmp(tag, q->GetTag()))
break;
// Searches fQueries for the query with the given tag, holding fMutex.
//   tag  tag to search for, compared with strcmp against GetTag()
// Returns immediately when tag is null or empty, or when the list is
// empty. The removal itself is on lines not included in this excerpt.
927 void XrdProofdProofServ::RemoveQuery(
const char *tag)
929 XrdProofQuery *q = 0;
// strlen() is unsigned, so this test is true only for the empty string
930 if (!tag || strlen(tag) <= 0)
return;
932 XrdSysMutexHelper mhp(fMutex);
// size() is unsigned, so this test is true only for an empty list
934 if (fQueries.size() <= 0)
return;
936 std::list<XrdProofQuery *>::iterator ii;
937 for (ii = fQueries.begin(); ii != fQueries.end(); ++ii) {
// NOTE(review): q is presumably assigned from the current element on a
// line not shown -- confirm.
939 if (!strcmp(tag, q->GetTag()))
break;
// Callback for fWorkers.Apply() (used by SendClusterInfo()).
// Adds the number of active sessions of worker w to the integer that s
// points to. The key argument is unnamed and unused.
//   w  worker entry being visited
//   s  opaque pointer, cast to int *
955 static int CountEffectiveSessions(
const char *, XrdProofWorker *w,
void *s)
957 int *actw = (
int *)s;
959 *actw += w->GetNActiveSessions();
// Sends a kXPD_clusterinfo notification to the process attached to this
// session.
//   nsess  number of sessions, first value of the payload
//   nacti  number of active sessions; it is traced, and presumably it is
//          the second value of the payload (assignment not shown)
// Returns immediately when the session has no workers. The payload holds
// three kXR_int32 values, each converted with htonl.
973 void XrdProofdProofServ::SendClusterInfo(
int nsess,
int nacti)
975 XPDLOC(PMGR,
"SendClusterInfo")
// NOTE(review): fWorkers is read here and below before fMutex is taken,
// whereas ClearWorkers(), AddWorker() and ExportWorkers() lock first --
// confirm this is safe.
978 if (fWorkers.Num() <= 0) return;
// Sum of the active sessions over all workers of this session
981 fWorkers.Apply(CountEffectiveSessions, (
void *)&actw);
// Average per worker scaled by 1000, using integer division; the trace
// divides by 1000 again to show it as a fraction
983 int neffs = (actw*1000)/fWorkers.Num();
984 TRACE(DBG, "
# sessions: "<<nsess<<", # active: "<<nacti<<", # effective: "<<neffs/1000.);
986 XrdSysMutexHelper mhp(fMutex);
// NOTE(review): buf is allocated with new[]; its release is not visible in
// this excerpt -- confirm it is freed on every path.
989 int len = 3*
sizeof(kXR_int32);
990 char *buf =
new char[len];
// First value: nsess
992 kXR_int32 itmp = nsess;
993 itmp =
static_cast<kXR_int32
>(htonl(itmp));
994 memcpy(buf + off, &itmp,
sizeof(kXR_int32));
995 off +=
sizeof(kXR_int32);
// Second value (its assignment to itmp is not shown)
997 itmp =
static_cast<kXR_int32
>(htonl(itmp));
998 memcpy(buf + off, &itmp,
sizeof(kXR_int32));
999 off +=
sizeof(kXR_int32);
// Third value (its assignment to itmp is not shown)
1001 itmp =
static_cast<kXR_int32
>(htonl(itmp));
1002 memcpy(buf + off, &itmp,
sizeof(kXR_int32));
1004 if (!fResponse || fResponse->Send(kXR_attn, kXPD_clusterinfo, buf, len) != 0) {
1006 TRACE(XERR,
"problems sending proofserv");
// Checks whether the session should be shut down and, if so, signals its
// process.
//   oldvers    forces the check even when fSkipCheck is set, and lifts the
//              isrec restriction
//   isrec      when true (and oldvers is false) the shutdown logic is
//              skipped
//   shutopt    1 selects the idle-time criterion, 2 the disconnect-time
//              criterion
//   shutdel    threshold in seconds compared with the selected time
//   changeown  forwarded unchanged to XrdProofdAux::KillProcess
//   nc         output: number of connected clients counted here
// Only part of the function is included in this excerpt; the return value
// is assigned on lines not shown.
1016 int XrdProofdProofServ::CheckSession(
bool oldvers,
bool isrec,
1017 int shutopt,
int shutdel,
bool changeown,
int &nc)
// NOTE(review): the location label below reads SendClusterInfo although
// this is CheckSession -- it looks like a copy-paste leftover that makes
// trace output misleading; confirm and fix.
1019 XPDLOC(PMGR,
"SendClusterInfo")
1024 { XrdSysMutexHelper mhp(fMutex);
1026 bool skipcheck = fSkipCheck;
1029 if (!skipcheck || oldvers) {
// Count the slots that have a protocol with a link
1032 std::vector<XrdClientID *>::iterator i;
1033 for (i = fClients.begin(); i != fClients.end(); ++i) {
1034 if ((*i) && (*i)->P() && (*i)->P()->Link()) nc++;
// No client connected: evaluate the shutdown criteria
1037 if (nc <= 0 && (!isrec || oldvers)) {
// Same computation as IdleTime() and DisconnectTime(): non-positive
// elapsed times are normalized to -1
1038 int idlet = -1, disct = -1, now = time(0);
1039 if (fStatus == kXPD_idle)
1040 idlet = now - fSetIdleTime;
1041 if (idlet <= 0) idlet = -1;
1042 if (fDisconnectTime > 0)
1043 disct = now - fDisconnectTime;
1044 if (disct <= 0) disct = -1;
// Sessions that are not top masters always qualify; top masters qualify
// only when the selected time has reached the threshold
1045 if ((fSrvType != kXPD_TopMaster) ||
1046 (shutopt == 1 && (idlet >= shutdel)) ||
1047 (shutopt == 2 && (disct >= shutdel))) {
// The GetUserInfo() result code is not checked before ui is used
1051 XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
1052 if (XrdProofdAux::KillProcess(fSrvPID, 0, ui, changeown) != 0) {
1053 XPDFORM(emsg,
"ord: problems signalling process: %d", fSrvPID);
// Report any error message collected above
1063 if (emsg.length() > 0) {
1064 TRACE(XERR,emsg.c_str());