13 #include "XrdNet/XrdNet.hh" 
   27 XrdProofdProofServ::XrdProofdProofServ()
 
   29    fMutex = 
new XrdSysRecMutex;
 
   37    fSrvType = kXPD_AnyServer;
 
   47    fSetIdleTime = time(0);
 
   66 XrdProofdProofServ::~XrdProofdProofServ()
 
   71    std::vector<XrdClientID *>::iterator i;
 
   72    for (i = fClients.begin(); i != fClients.end(); ++i)
 
   84    unlink(fUNIXSockPath.c_str());
 
   92 static int DecreaseWorkerCounters(
const char *, XrdProofWorker *w, 
void *x)
 
   94    XPDLOC(PMGR, 
"DecreaseWorkerCounters")
 
   96    XrdProofdProofServ *xps = (XrdProofdProofServ *)x;
 
   99       w->RemoveProofServ(xps);
 
  100       TRACE(REQ, w->fHost.c_str() <<
" done");
 
  112 static int DumpWorkerCounters(
const char *k, XrdProofWorker *w, 
void *)
 
  114    XPDLOC(PMGR, 
"DumpWorkerCounters")
 
  117       TRACE(ALL, k <<
" : "<<w->fHost.c_str()<<
":"<<w->fPort <<
" act: "<<w->Active());
 
  129 void XrdProofdProofServ::ClearWorkers()
 
  131    XrdSysMutexHelper mhp(fMutex);
 
  134    fWorkers.Apply(DecreaseWorkerCounters, 
this);
 
  141 void XrdProofdProofServ::AddWorker(
const char *o, XrdProofWorker *w)
 
  143    if (!o || !w) 
return;
 
  145    XrdSysMutexHelper mhp(fMutex);
 
  147    fWorkers.Add(o, w, 0, Hash_keepdata);
 
  153 void XrdProofdProofServ::RemoveWorker(
const char *o)
 
  155    XPDLOC(SMGR, 
"ProofServ::RemoveWorker")
 
  159    TRACE(DBG,"removing: "<<o);
 
  161    XrdSysMutexHelper mhp(fMutex);
 
  163    XrdProofWorker *w = fWorkers.Find(o);
 
  164    if (w) w->RemoveProofServ(this);
 
  166    if (TRACING(HDBG)) fWorkers.Apply(DumpWorkerCounters, 0);
 
  173 int XrdProofdProofServ::Reset(const 
char *msg, 
int type)
 
  175    XPDLOC(SMGR, 
"ProofServ::Reset")
 
  181    XPDFORM(fn, "%s.status", fAdminPath.c_str());
 
  182    FILE *fpid = fopen(fn.c_str(), "r");
 
  185       if (fgets(line, 
sizeof(line), fpid)) {
 
  186          if (line[strlen(line)-1] == 
'\n') line[strlen(line)-1] = 0;
 
  189          TRACE(XERR,
"problems reading from file "<<fn);
 
  193    TRACE(DBG,
"file: "<<fn<<
", st:"<<st);
 
  194    XrdSysMutexHelper mhp(fMutex);
 
  197       Broadcast(
"idle-timeout", type);
 
  199       Broadcast(msg, type);
 
  202    if (fSrvType == kXPD_TopMaster) rc = 1;
 
  212 void XrdProofdProofServ::Reset()
 
  214    XrdSysMutexHelper mhp(fMutex);
 
  229    fDisconnectTime = -1;
 
  235    fSrvType = kXPD_AnyServer;
 
  255 void XrdProofdProofServ::DeleteUNIXSock()
 
  258    unlink(fUNIXSockPath.c_str());
 
  265 bool XrdProofdProofServ::SkipCheck()
 
  267    XrdSysMutexHelper mhp(fMutex);
 
  269    bool rc = fSkipCheck;
 
  277 XrdClientID *XrdProofdProofServ::GetClientID(
int cid)
 
  279    XPDLOC(SMGR, 
"ProofServ::GetClientID")
 
  281    XrdClientID *csid = 0;
 
  284       TRACE(XERR, 
"negative ID: protocol error!");
 
  289    {  XrdSysMutexHelper mhp(fMutex);
 
  296       if (cid < (
int)fClients.size()) {
 
  297          csid = fClients.at(cid);
 
  302             XPDFORM(msg, 
"cid: %d, size: %d", cid, fClients.size());
 
  308          if (cid >= (
int)fClients.capacity())
 
  309             fClients.reserve(2*fClients.capacity());
 
  312          int ic = (int)fClients.size();
 
  313          for (; ic <= cid; ic++)
 
  314             fClients.push_back((csid = 
new XrdClientID()));
 
  318             XPDFORM(msg, 
"cid: %d, new size: %d", cid, fClients.size());
 
  331 int XrdProofdProofServ::FreeClientID(
int pid)
 
  333    XPDLOC(SMGR, 
"ProofServ::FreeClientID")
 
  335    TRACE(DBG, "svrPID: "<<fSrvPID<< ", pid: "<<pid<<", session status: "<<
 
  336               fStatus<<", 
# clients: "<< fNClients); 
  339       TRACE(XERR, 
"undefined pid!");
 
  342    if (!IsValid()) 
return rc;
 
  344    {  XrdSysMutexHelper mhp(fMutex);
 
  347       std::vector<XrdClientID *>::iterator i;
 
  348       for (i = fClients.begin(); i != fClients.end(); ++i) {
 
  349          if ((*i) && (*i)->P()) {
 
  350             if ((*i)->P()->Pid() == pid || (*i)->P()->Pid() == -1) {
 
  351                if (fProtocol == (*i)->P()) {
 
  356                if (fParent == (*i)) SetParent(0);
 
  360                   fDisconnectTime = time(0);
 
  367    if (TRACING(REQ) && (rc == 0)) {
 
  369       TRACE(REQ, spid<<
": slot for client pid: "<<pid<<
" has been reset");
 
  380 int XrdProofdProofServ::GetNClients(
bool check)
 
  382    XrdSysMutexHelper mhp(fMutex);
 
  387       std::vector<XrdClientID *>::iterator i;
 
  388       for (i = fClients.begin(); i != fClients.end(); ++i) {
 
  389          if ((*i) && (*i)->P() && (*i)->P()->Link()) fNClients++;
 
  401 int XrdProofdProofServ::DisconnectTime()
 
  403    XrdSysMutexHelper mhp(fMutex);
 
  406    if (fDisconnectTime > 0)
 
  407       disct = time(0) - fDisconnectTime;
 
  408    return ((disct > 0) ? disct : -1);
 
  415 int XrdProofdProofServ::IdleTime()
 
  417    XrdSysMutexHelper mhp(fMutex);
 
  420    if (fStatus == kXPD_idle)
 
  421       idlet = time(0) - fSetIdleTime;
 
  422    return ((idlet > 0) ? idlet : -1);
 
  429 void XrdProofdProofServ::SetIdle()
 
  431    XrdSysMutexHelper mhp(fMutex);
 
  434    fSetIdleTime = time(0);
 
  441 void XrdProofdProofServ::SetRunning()
 
  443    XrdSysMutexHelper mhp(fMutex);
 
  445    fStatus = kXPD_running;
 
  452 void XrdProofdProofServ::Broadcast(
const char *msg, 
int type)
 
  454    XPDLOC(SMGR, 
"ProofServ::Broadcast")
 
  457    int clproto = (type >= kXPD_wrkmortem) ? 18 : -1;
 
  461    if (msg && (len = strlen(msg)) > 0) {
 
  462       XrdProofdProtocol *p = 0;
 
  463       int ic = 0, ncz = 0, sid = -1;
 
  464       { XrdSysMutexHelper mhp(fMutex); ncz = (int) fClients.size(); }
 
  465       for (ic = 0; ic < ncz; ic++) {
 
  466          {  XrdSysMutexHelper mhp(fMutex);
 
  467             p = fClients.at(ic)->P();
 
  468             sid = fClients.at(ic)->Sid(); }
 
  470          if (p && XPD_CLNT_VERSION_OK(p, clproto)) {
 
  471             XrdProofdResponse *response = p->Response(sid);
 
  473                response->Send(kXR_attn, (XProofActionCode)type, (
void *)msg, len);
 
  476                XPDFORM(m, 
"response instance for sid: %d not found", sid);
 
  485       XPDFORM(m, 
"type: %d, message: '%s' notified to %d clients", type, msg, nc);
 
  498 int XrdProofdProofServ::TerminateProofServ(
bool changeown)
 
  500    XPDLOC(SMGR, 
"ProofServ::TerminateProofServ")
 
  503    TRACE(DBG, "ord: " << fOrdinal << ", pid: " << pid);
 
  508       XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
 
  509       if (XrdProofdAux::KillProcess(pid, 0, ui, changeown) != 0) {
 
  510          TRACE(XERR, 
"ord: problems signalling process: "<<fSrvPID);
 
  512       XrdSysMutexHelper mhp(fMutex);
 
  529 int XrdProofdProofServ::VerifyProofServ(
bool forward)
 
  531    XPDLOC(SMGR, 
"ProofServ::VerifyProofServ")
 
  533    TRACE(DBG, "ord: " << fOrdinal<< ", pid: " << fSrvPID);
 
  539    int len = sizeof(kXR_int32);
 
  540    char *buf = new 
char[len];
 
  542    kXR_int32 ifw = (forward) ? (kXR_int32)1 : (kXR_int32)0;
 
  543    ifw = static_cast<kXR_int32>(htonl(ifw));
 
  544    memcpy(buf, &ifw, sizeof(kXR_int32));
 
  546    {  XrdSysMutexHelper mhp(fMutex);
 
  548       if (!fResponse || fResponse->Send(kXR_attn, kXPD_ping, buf, len) != 0) {
 
  549          msg = 
"could not propagate ping to proofsrv";
 
  568 int XrdProofdProofServ::BroadcastPriority(
int priority)
 
  570    XPDLOC(SMGR, 
"ProofServ::BroadcastPriority")
 
  572    XrdSysMutexHelper mhp(fMutex);
 
  575    int len = sizeof(kXR_int32);
 
  576    char *buf = new 
char[len];
 
  577    kXR_int32 itmp = priority;
 
  578    itmp = static_cast<kXR_int32>(htonl(itmp));
 
  579    memcpy(buf, &itmp, sizeof(kXR_int32));
 
  581    if (!fResponse || fResponse->Send(kXR_attn, kXPD_priority, buf, len) != 0) {
 
  583       TRACE(XERR,
"problems telling proofserv");
 
  588    TRACE(DBG, 
"priority "<<priority<<
" sent over");
 
  596 int XrdProofdProofServ::SendData(
int cid, 
void *buff, 
int len)
 
  598    XPDLOC(SMGR, 
"ProofServ::SendData")
 
  600    TRACE(HDBG, "length: "<<len<<" bytes (cid: "<<cid<<")");
 
  606    XrdClientID *csid = 0;
 
  607    {  XrdSysMutexHelper mhp(fMutex);
 
  608       if (cid < 0 || cid > (
int)(fClients.size() - 1) || !(csid = fClients.at(cid))) {
 
  609          XPDFORM(msg, 
"client ID not found (cid: %d, size: %d)", cid, fClients.size());
 
  612       if (!rs && !(csid->R())) {
 
  613          XPDFORM(msg, 
"client not connected: csid: %p, cid: %d, fSid: %d",
 
  614                        csid, cid, csid->Sid());
 
  623       XrdProofdResponse *response = csid->R() ? csid->R() : 0;
 
  625          if (!response->Send(kXR_attn, kXPD_msg, buff, len))
 
  640 int XrdProofdProofServ::SendDataN(
void *buff, 
int len)
 
  642    XPDLOC(SMGR, 
"ProofServ::SendDataN")
 
  644    TRACE(HDBG, "length: "<<len<<" bytes");
 
  648    XrdSysMutexHelper mhp(fMutex);
 
  651    XrdClientID *csid = 0;
 
  653    for (ic = 0; ic < (
int) fClients.size(); ic++) {
 
  654       if ((csid = fClients.at(ic)) && csid->P()) {
 
  655          XrdProofdResponse *resp = csid->R();
 
  656          if (!resp || resp->Send(kXR_attn, kXPD_msg, buff, len) != 0)
 
  668 void XrdProofdProofServ::ExportBuf(XrdOucString &buf)
 
  670    XPDLOC(SMGR, 
"ProofServ::ExportBuf")
 
  674    XrdOucString tag, alias;
 
  675    {  XrdSysMutexHelper mhp(fMutex);
 
  681    XPDFORM(buf, 
" | %d %s %s %d %d", 
id, tag.c_str(), alias.c_str(), status, nc);
 
  682    TRACE(HDBG, 
"buf: "<< buf);
 
  691 int XrdProofdProofServ::CreateUNIXSock(XrdSysError *edest)
 
  693    XPDLOC(SMGR, 
"ProofServ::CreateUNIXSock")
 
  699        TRACE(DBG,
"UNIX socket exists already! ("<<fUNIXSockPath<<
")");
 
  704    fUNIXSock = 
new XrdNet(edest);
 
  707    if (fAdminPath.length() > 0) {
 
  708       FILE *fadm = fopen(fAdminPath.c_str(), 
"a");
 
  712          TRACE(XERR, 
"unable to open / create admin path "<< fAdminPath << 
"; errno = "<<errno);
 
  719    if (unlink(fUNIXSockPath.c_str()) != 0 && (errno != ENOENT)) {
 
  720       XPDPRT(
"WARNING: path exists: unable to delete it:" 
  721                " try to use it anyway " <<fUNIXSockPath);
 
  728       if ((fd = open(fUNIXSockPath.c_str(), O_EXCL | O_RDWR | O_CREAT, 0700)) < 0) {
 
  729          TRACE(XERR, 
"unable to create path: " <<fUNIXSockPath);
 
  736       if (fUNIXSock->Bind((
char *)fUNIXSockPath.c_str())) {
 
  737          TRACE(XERR, 
" problems binding to UNIX socket; path: " <<fUNIXSockPath);
 
  740          TRACE(DBG, 
"path for UNIX for socket is " <<fUNIXSockPath);
 
  742       TRACE(XERR, 
"unable to open / create path for UNIX socket; tried path "<< fUNIXSockPath);
 
  749       XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
 
  750       if (chown(fUNIXSockPath.c_str(), ui.fUid, ui.fGid) != 0) {
 
  751          TRACE(XERR, 
"unable to change ownership of the UNIX socket"<<fUNIXSockPath);
 
  763 int XrdProofdProofServ::SetAdminPath(
const char *a, 
bool assert, 
bool setown)
 
  765    XPDLOC(SMGR, 
"ProofServ::SetAdminPath")
 
  767    XrdSysMutexHelper mhp(fMutex);
 
  772    if (!assert) return 0;
 
  775    FILE *fpid = fopen(a, "a");
 
  779       TRACE(XERR, 
"unable to open / create admin path "<< fAdminPath << 
"; errno = "<<errno);
 
  785    XPDFORM(fn, 
"%s.status", a);
 
  786    if ((fpid = fopen(fn.c_str(), 
"a"))) {
 
  787       fprintf(fpid, 
"%d", fStatus);
 
  790       TRACE(XERR, 
"unable to open / create status path "<< fn << 
"; errno = "<<errno);
 
  797       if (XrdProofdAux::GetUserInfo(fClient.c_str(), ui) != 0) {
 
  798          TRACE(XERR, 
"unable to get info for user "<<fClient<<
"; errno = "<<errno);
 
  801       if (XrdProofdAux::ChangeOwn(fn.c_str(), ui) != 0) {
 
  802          TRACE(XERR, 
"unable to give ownership of the status file "<< fn << 
" to user; errno = "<<errno);
 
  816 int XrdProofdProofServ::Resume()
 
  818    XPDLOC(SMGR, 
"ProofServ::Resume")
 
  820    TRACE(REQ, "ord: " << fOrdinal<< ", pid: " << fSrvPID);
 
  825    {  XrdSysMutexHelper mhp(fMutex);
 
  827       if (!fResponse || fResponse->Send(kXR_attn, kXPD_resume, 0, 0) != 0) {
 
  828          msg = 
"could not propagate resume to proofsrv";
 
  844 static int ExportWorkerDescription(
const char *k, XrdProofWorker *w, 
void *s)
 
  846    XPDLOC(PMGR, 
"ExportWorkerDescription")
 
  848    XrdOucString *wrks = (XrdOucString *)s;
 
  851       if (w->fType == 
'M') {
 
  852          if (wrks->length() > 0) wrks->insert(
'&',0);
 
  853          wrks->insert(w->Export(), 0);
 
  856          if (wrks->length() > 0)
 
  859          (*wrks) += w->Export(k);
 
  861       TRACE(HDBG, k <<
" : "<<w->fHost.c_str()<<
":"<<w->fPort <<
" act: "<<w->Active());
 
  873 void XrdProofdProofServ::ExportWorkers(XrdOucString &wrks)
 
  875    XrdSysMutexHelper mhp(fMutex);
 
  877    fWorkers.Apply(ExportWorkerDescription, (
void *)&wrks);
 
  883 void XrdProofdProofServ::DumpQueries()
 
  885    XPDLOC(PMGR, 
"DumpQueries")
 
  887    XrdSysMutexHelper mhp(fMutex);
 
  889    TRACE(ALL," ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
 
  890    TRACE(ALL," +++ client: "<<fClient<<", session: "<< fSrvPID <<
 
  891              ", 
# of queries: "<< fQueries.size()); 
  892    std::list<XrdProofQuery *>::iterator ii;
 
  894    for (ii = fQueries.begin(); ii != fQueries.end(); ++ii) {
 
  896       TRACE(ALL,
" +++ #"<<i<<
" tag:"<< (*ii)->GetTag()<<
" dset: "<<
 
  897                 (*ii)->GetDSName()<<
" size:"<<(*ii)->GetDSSize());
 
  899    TRACE(ALL,
" ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
 
  905 XrdProofQuery *XrdProofdProofServ::GetQuery(
const char *tag)
 
  907    XrdProofQuery *q = 0;
 
  908    if (!tag || strlen(tag) <= 0) 
return q;
 
  910    XrdSysMutexHelper mhp(fMutex);
 
  912    if (fQueries.size() <= 0) 
return q;
 
  914    std::list<XrdProofQuery *>::iterator ii;
 
  915    for (ii = fQueries.begin(); ii != fQueries.end(); ++ii) {
 
  917       if (!strcmp(tag, q->GetTag())) 
break;
 
  927 void XrdProofdProofServ::RemoveQuery(
const char *tag)
 
  929    XrdProofQuery *q = 0;
 
  930    if (!tag || strlen(tag) <= 0) 
return;
 
  932    XrdSysMutexHelper mhp(fMutex);
 
  934    if (fQueries.size() <= 0) 
return;
 
  936    std::list<XrdProofQuery *>::iterator ii;
 
  937    for (ii = fQueries.begin(); ii != fQueries.end(); ++ii) {
 
  939       if (!strcmp(tag, q->GetTag())) 
break;
 
  955 static int CountEffectiveSessions(
const char *, XrdProofWorker *w, 
void *s)
 
  957    int *actw = (
int *)s;
 
  959       *actw += w->GetNActiveSessions();
 
  973 void XrdProofdProofServ::SendClusterInfo(
int nsess, 
int nacti)
 
  975    XPDLOC(PMGR, 
"SendClusterInfo")
 
  978    if (fWorkers.Num() <= 0) return;
 
  981    fWorkers.Apply(CountEffectiveSessions, (
void *)&actw);
 
  983    int neffs = (actw*1000)/fWorkers.Num();
 
  984    TRACE(DBG, "
# sessions: "<<nsess<<", # active: "<<nacti<<", # effective: "<<neffs/1000.); 
  986    XrdSysMutexHelper mhp(fMutex);
 
  989    int len = 3*
sizeof(kXR_int32);
 
  990    char *buf = 
new char[len];
 
  992    kXR_int32 itmp = nsess;
 
  993    itmp = 
static_cast<kXR_int32
>(htonl(itmp));
 
  994    memcpy(buf + off, &itmp, 
sizeof(kXR_int32));
 
  995    off += 
sizeof(kXR_int32);
 
  997    itmp = 
static_cast<kXR_int32
>(htonl(itmp));
 
  998    memcpy(buf + off, &itmp, 
sizeof(kXR_int32));
 
  999    off += 
sizeof(kXR_int32);
 
 1001    itmp = 
static_cast<kXR_int32
>(htonl(itmp));
 
 1002    memcpy(buf + off, &itmp, 
sizeof(kXR_int32));
 
 1004    if (!fResponse || fResponse->Send(kXR_attn, kXPD_clusterinfo, buf, len) != 0) {
 
 1006       TRACE(XERR,
"problems sending proofserv");
 
 1016 int XrdProofdProofServ::CheckSession(
bool oldvers, 
bool isrec,
 
 1017                                       int shutopt, 
int shutdel, 
bool changeown, 
int &nc)
 
 1019    XPDLOC(PMGR, 
"SendClusterInfo")
 
 1024    {  XrdSysMutexHelper mhp(fMutex);
 
 1026       bool skipcheck = fSkipCheck;
 
 1029       if (!skipcheck || oldvers) {
 
 1032          std::vector<XrdClientID *>::iterator i;
 
 1033          for (i = fClients.begin(); i != fClients.end(); ++i) {
 
 1034             if ((*i) && (*i)->P() && (*i)->P()->Link()) nc++;
 
 1037          if (nc <= 0 && (!isrec || oldvers)) {
 
 1038             int idlet = -1, disct = -1, now = time(0);
 
 1039             if (fStatus == kXPD_idle)
 
 1040                idlet = now - fSetIdleTime;
 
 1041             if (idlet <= 0) idlet = -1;
 
 1042             if (fDisconnectTime > 0)
 
 1043                disct = now - fDisconnectTime;
 
 1044             if (disct <= 0) disct = -1;
 
 1045             if ((fSrvType != kXPD_TopMaster) ||
 
 1046                 (shutopt == 1 && (idlet >= shutdel)) ||
 
 1047                 (shutopt == 2 && (disct >= shutdel))) {
 
 1051                   XrdProofdAux::GetUserInfo(fClient.c_str(), ui);
 
 1052                   if (XrdProofdAux::KillProcess(fSrvPID, 0, ui, changeown) != 0) {
 
 1053                      XPDFORM(emsg, 
"ord: problems signalling process: %d", fSrvPID);
 
 1063    if (emsg.length() > 0) {
 
 1064       TRACE(XERR,emsg.c_str());