29 # include "XrdNet/XrdNetAddr.hh"
31 #include "Xrd/XrdBuffer.hh"
36 #include "XrdOuc/XrdOucStream.hh"
37 #include "XrdSys/XrdSysPlatform.hh"
55 int MessageSender(
const char *msg,
int len,
void *arg)
57 XrdProofdResponse *r = (XrdProofdResponse *) arg;
59 return r->Send(kXR_attn, kXPD_srvmsg, 2, (
char *) msg, len);
67 XrdProofdNetMgr::XrdProofdNetMgr(XrdProofdManager *mgr,
68 XrdProtocol_Config *pi, XrdSysError *e)
69 : XrdProofdConfig(pi->ConfigFN, e)
72 fResourceType = kRTNone;
74 fPROOFcfg.fMtime = -1;
81 fNumLocalWrks = XrdProofdAux::GetNumCPUs();
92 void XrdProofdNetMgr::RegisterDirectives()
94 Register(
"adminreqto",
new XrdProofdDirective(
"adminreqto",
this, &DoDirectiveClass));
95 Register(
"resource",
new XrdProofdDirective(
"resource",
this, &DoDirectiveClass));
96 Register(
"worker",
new XrdProofdDirective(
"worker",
this, &DoDirectiveClass));
97 Register(
"localwrks",
new XrdProofdDirective(
"localwrks", (
void *)&fNumLocalWrks, &DoDirectiveInt));
103 XrdProofdNetMgr::~XrdProofdNetMgr()
107 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
108 while (w != fRegWorkers.end()) {
110 w = fRegWorkers.erase(w);
112 w = fDfltWorkers.begin();
113 while (w != fDfltWorkers.end()) {
115 w = fDfltWorkers.erase(w);
124 int XrdProofdNetMgr::Config(
bool rcf)
126 XPDLOC(NMGR,
"NetMgr::Config")
129 XrdSysMutexHelper mhp(fMutex);
132 std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
133 while (w != fWorkers.end()) {
135 w = fWorkers.erase(w);
138 XrdOucString mm(
"master ", 128);
142 fWorkers.push_back(
new XrdProofWorker(mm.c_str()));
145 if (XrdProofdConfig::Config(rcf) != 0) {
146 XPDERR(
"problems parsing file ");
151 msg = (rcf) ?
"re-configuring" :
"configuring";
154 if (fMgr->SrvType() != kXPD_Worker || fMgr->SrvType() == kXPD_AnyServer) {
155 TRACE(ALL,
"PROOF config file: " <<
156 ((fPROOFcfg.fName.length() > 0) ? fPROOFcfg.fName.c_str() :
"none"));
157 if (fResourceType == kRTStatic) {
161 if (fPROOFcfg.fName.length() > 0) {
163 if (ReadPROOFcfg() == 0) {
164 TRACE(ALL,
"PROOF config file will " <<
165 ((fReloadPROOFcfg) ?
"" :
"not ") <<
"be reloaded upon change");
168 if (!fDfltFallback) {
169 XPDERR(
"unable to find valid information in PROOF config file " <<
171 fPROOFcfg.fMtime = -1;
174 TRACE(ALL,
"file " << fPROOFcfg.fName <<
" cannot be parsed: use default configuration to start with");
180 CreateDefaultPROOFcfg();
182 }
else if (fResourceType == kRTNone && fWorkers.size() <= 1) {
184 CreateDefaultPROOFcfg();
192 XrdProofConn::SetRetryParam(1, 1);
194 EnvPutInt(NAME_REQUESTTIMEOUT, fRequestTO);
197 XPDFORM(msg,
"%d worker nodes defined at start-up", fWorkers.size() - 1);
207 int XrdProofdNetMgr::DoDirective(XrdProofdDirective *d,
208 char *val, XrdOucStream *cfg,
bool rcf)
210 XPDLOC(NMGR,
"NetMgr::DoDirective")
216 if (d->fName == "resource") {
217 return DoDirectiveResource(val, cfg, rcf);
218 }
else if (d->fName ==
"adminreqto") {
219 return DoDirectiveAdminReqTO(val, cfg, rcf);
220 }
else if (d->fName ==
"worker") {
221 return DoDirectiveWorker(val, cfg, rcf);
224 TRACE(XERR,
"unknown directive: " << d->fName);
232 void XrdProofdNetMgr::BalanceNodesOrder()
234 list<XrdProofWorker *>::const_iterator iter, iter2;
235 list<XrdProofWorker *>::iterator iter3;
237 map<XrdProofWorker *, BalancerInfo> info;
239 unsigned int min = UINT_MAX;
241 unsigned int total = 0, total_perit = 0;
243 unsigned int total_added = 0;
245 list<XrdProofWorker *> tempNodes;
250 for (iter = fNodes.begin(); iter != fNodes.end(); ++iter) {
258 info[*iter].available = 0;
259 for (iter2 = fWorkers.begin(); iter2 != fWorkers.end(); ++iter2) {
260 if ((*iter)->Matches(*iter2)) {
261 info[*iter].available++;
264 info[*iter].added = 0;
266 if (info[*iter].available > 1 && info[*iter].available < min)
267 min = info[*iter].available;
269 total += info[*iter].available;
274 for (iter = fNodes.begin(); iter != fNodes.end(); ++iter) {
275 if (info[*iter].available > 1) {
276 info[*iter].per_iteration = (
unsigned int)floor((
double)info[*iter].available / (double)min);
278 info[*iter].per_iteration = 1;
281 total_perit += info[*iter].per_iteration;
286 tempNodes.push_back(fWorkers.front());
290 while (total_added < total) {
291 for (map<XrdProofWorker *, BalancerInfo>::iterator i = info.begin(); i != info.end(); ++i) {
292 if (i->second.added < i->second.available) {
294 unsigned int to_add = xrdmin(i->second.per_iteration,
295 (i->second.available - i->second.added));
297 for (
unsigned int j = 0; j < to_add; j++) {
298 tempNodes.push_back(i->first);
300 i->second.added += to_add;
301 total_added += to_add;
314 iter3 = ++(fWorkers.begin());
315 while (iter3 != fWorkers.end()) {
320 if (count(++(tempNodes.begin()), tempNodes.end(), *iter3) == 0) {
322 for (iter2 = ++(tempNodes.begin()); iter2 != tempNodes.end(); ++iter2) {
323 if ((*iter2)->Matches(*iter3)) {
325 (*iter2)->MergeProofServs(*(*iter3));
328 fWorkers.erase(iter3++);
339 fWorkers = tempNodes;
345 int XrdProofdNetMgr::DoDirectiveAdminReqTO(
char *val, XrdOucStream *cfg,
bool)
352 if (fMgr->Host() && cfg)
353 if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
358 int to = strtol(val, 0, 10);
359 fRequestTO = (to > 0) ? to : fRequestTO;
366 int XrdProofdNetMgr::DoDirectiveResource(
char *val, XrdOucStream *cfg,
bool)
368 XPDLOC(NMGR,
"NetMgr::DoDirectiveResource")
374 if (!strcmp("static", val)) {
377 fResourceType = kRTStatic;
378 while ((val = cfg->GetWord()) && val[0]) {
380 if (s.beginswith(
"ucfg:")) {
381 fWorkerUsrCfg = s.endswith(
"yes") ? 1 : 0;
382 }
else if (s.beginswith(
"reload:")) {
383 fReloadPROOFcfg = (s.endswith(
"1") || s.endswith(
"yes")) ? 1 : 0;
384 }
else if (s.beginswith(
"dfltfallback:")) {
385 fDfltFallback = (s.endswith(
"1") || s.endswith(
"yes")) ? 1 : 0;
386 }
else if (s.beginswith(
"wmx:")) {
387 }
else if (s.beginswith(
"selopt:")) {
390 fPROOFcfg.fName = val;
391 if (fPROOFcfg.fName.beginswith(
"sm:")) {
392 fPROOFcfg.fName.replace(
"sm:",
"");
394 XrdProofdAux::Expand(fPROOFcfg.fName);
396 if (access(fPROOFcfg.fName.c_str(), R_OK)) {
397 if (errno == ENOENT) {
398 TRACE(ALL,
"WARNING: configuration file does not exists: " << fPROOFcfg.fName);
400 TRACE(XERR,
"configuration file cannot be read: " << fPROOFcfg.fName);
401 fPROOFcfg.fName =
"";
402 fPROOFcfg.fMtime = -1;
414 int XrdProofdNetMgr::DoDirectiveWorker(
char *val, XrdOucStream *cfg,
bool)
416 XPDLOC(NMGR,
"NetMgr::DoDirectiveWorker")
423 XrdSysMutexHelper mhp(fMutex);
427 XrdOucString wrd(cfg->GetWord());
428 if (wrd.length() > 0) {
431 char rest[2048] = {0};
432 cfg->GetRest((
char *)&rest[0], 2048);
433 XPDFORM(line,
"%s %s", wrd.c_str(), rest);
435 if (wrd ==
"master" || wrd ==
"node") {
437 XrdProofWorker *pw =
new XrdProofWorker(line.c_str());
438 if (pw->fHost.beginswith(
"localhost") ||
439 pw->Matches(fMgr->Host())) {
441 XrdProofWorker *fw = fWorkers.front();
442 fw->Reset(line.c_str());
448 int ir = line.find(
"repeat=");
449 if (ir != STR_NPOS) {
450 XrdOucString r(line, ir + strlen(
"repeat="));
451 r.erase(r.find(
' '));
453 if (nr < 0 || !XPD_LONGOK(nr)) nr = 1;
454 TRACE(DBG,
"found repeat = " << nr);
458 XrdProofdMultiStr mline(line.c_str());
459 if (mline.IsValid()) {
460 TRACE(DBG,
"found multi-line with: " << mline.N() <<
" tokens");
461 for (
int i = 0; i < mline.N(); i++) {
462 TRACE(HDBG,
"found token: " << mline.Get(i));
463 fWorkers.push_back(
new XrdProofWorker(mline.Get(i).c_str()));
466 TRACE(DBG,
"found line: " << line);
467 fWorkers.push_back(
new XrdProofWorker(line.c_str()));
486 int XrdProofdNetMgr::BroadcastCtrlC(
const char *usr)
488 XPDLOC(NMGR,
"NetMgr::BroadcastCtrlC")
493 std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
494 XrdProofWorker *w = 0;
495 while (iw != fNodes.end()) {
496 if ((w = *iw) && w->fType !=
'M') {
498 bool us = (((w->fHost.find(
"localhost") != STR_NPOS ||
499 XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
500 (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
505 XrdOucString u = (w->fUser.length() > 0) ? w->fUser : usr;
506 if (u.length() <= 0) u = fMgr->EffectiveUser();
509 if (w->fPort != -1) {
513 TRACE(HDBG,
"sending request to: "<<u);
515 XrdProofConn *conn = GetProofConn(u.c_str());
516 if (conn && conn->IsValid()) {
518 XPClientRequest reqhdr;
519 memset(&reqhdr, 0,
sizeof(reqhdr));
520 conn->SetSID(reqhdr.header.streamid);
521 reqhdr.proof.requestid = kXP_ctrlc;
522 reqhdr.proof.sid = 0;
523 reqhdr.proof.dlen = 0;
525 if (XPD::clientMarshall(&reqhdr) != 0) {
526 TRACE(XERR,
"problems marshalling request");
529 if (conn->LowWrite(&reqhdr, 0, 0) != kOK) {
530 TRACE(XERR,
"problems sending ctrl-c request to server " << u);
537 TRACE(DBG,
"broadcast request for ourselves: ignore");
552 int XrdProofdNetMgr::Broadcast(
int type,
const char *msg,
const char *usr,
553 XrdProofdResponse *r,
bool notify,
int subtype)
555 XPDLOC(NMGR,
"NetMgr::Broadcast")
557 unsigned
int nok = 0;
558 TRACE(REQ, "type: " << type);
561 std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
562 XrdProofWorker *w = 0;
563 XrdClientMessage *xrsp = 0;
564 while (iw != fNodes.end()) {
565 if ((w = *iw) && w->fType !=
'M') {
567 bool us = (((w->fHost.find(
"localhost") != STR_NPOS ||
568 XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
569 (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
574 XrdOucString u = (w->fUser.length() > 0) ? w->fUser : usr;
575 if (u.length() <= 0) u = fMgr->EffectiveUser();
578 if (w->fPort != -1) {
583 int srvtype = (w->fType !=
'W') ? (kXR_int32) kXPD_Master
584 : (kXR_int32) kXPD_Worker;
585 TRACE(HDBG,
"sending request to " << u);
587 if (!(xrsp = Send(u.c_str(), type, msg, srvtype, r, notify, subtype))) {
588 TRACE(XERR,
"problems sending request to " << u);
595 TRACE(DBG,
"broadcast request for ourselves: ignore");
603 return (nok == fNodes.size()) ? 0 : -1;
609 XrdProofConn *XrdProofdNetMgr::GetProofConn(
const char *url)
614 XrdOucString buf =
" Manager connection from ";
620 XrdSysMutexHelper mhp(fMutex);
621 p =
new XrdProofConn(url, m, -1, -1, 0, buf.c_str());
623 if (p && !(p->IsValid())) SafeDelete(p);
633 XrdClientMessage *XrdProofdNetMgr::Send(
const char *url,
int type,
634 const char *msg,
int srvtype,
635 XrdProofdResponse *r,
bool notify,
638 XPDLOC(NMGR,
"NetMgr::Send")
640 XrdClientMessage *xrsp = 0;
641 TRACE(REQ, "type: " << type);
643 if (!url || strlen(url) <= 0)
647 XrdProofConn *conn = GetProofConn(url);
650 if (conn && conn->IsValid()) {
651 XrdOucString notifymsg(
"Send: ");
653 XPClientRequest reqhdr;
656 memset(&reqhdr, 0,
sizeof(reqhdr));
657 conn->SetSID(reqhdr.header.streamid);
658 reqhdr.header.requestid = kXP_admin;
659 reqhdr.proof.int1 = type;
662 notifymsg +=
"change-of-ROOT version request to ";
664 notifymsg +=
" msg: ";
666 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
667 buf = (msg) ? (
const void *)msg : buf;
669 case kCleanupSessions:
670 notifymsg +=
"cleanup request to ";
672 notifymsg +=
" for user: ";
674 reqhdr.proof.int2 = (kXR_int32) srvtype;
675 reqhdr.proof.sid = -1;
676 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
677 buf = (msg) ? (
const void *)msg : buf;
680 notifymsg +=
"exec ";
681 notifymsg += subtype;
682 notifymsg +=
"request for ";
684 reqhdr.proof.int2 = (kXR_int32) subtype;
685 reqhdr.proof.sid = -1;
686 reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
687 buf = (msg) ? (
const void *)msg : buf;
691 TRACE(XERR,
"invalid request type " << type);
697 r->Send(kXR_attn, kXPD_srvmsg, 0, (
char *) notifymsg.c_str(), notifymsg.length());
700 conn->SetAsync(conn, &MessageSender, (
void *)r);
704 xrsp = conn->SendReq(&reqhdr, buf, vout,
"NetMgr::Send");
707 conn->SetAsync(0, 0, (
void *)0);
710 if (r && !xrsp && conn->GetLastErr()) {
711 XrdOucString cmsg = url;
713 cmsg += conn->GetLastErr();
714 r->Send(kXR_attn, kXPD_srvmsg, (
char *) cmsg.c_str(), cmsg.length());
721 TRACE(XERR,
"could not open connection to " << url);
723 XrdOucString cmsg =
"failure attempting connection to ";
725 r->Send(kXR_attn, kXPD_srvmsg, (
char *) cmsg.c_str(), cmsg.length());
738 bool XrdProofdNetMgr::IsLocal(
const char *host,
bool checkport)
740 XPDLOC(NMGR,
"NetMgr::IsLocal")
743 if (host && strlen(host) > 0) {
744 XrdClientUrlInfo uu(host);
745 if (uu.Port <= 0) uu.Port = 1093;
748 char *fqn = XrdSysDNS::getHostName(uu.Host.c_str());
751 aNA.Set(uu.Host.c_str());
752 char *fqn = (
char *) aNA.Name();
754 TRACE(HDBG,
"fqn: '"<<fqn<<
"' mgrh: '"<<fMgr->Host()<<
"'");
755 if (fqn && (strstr(fqn,
"localhost") || !strcmp(fqn,
"127.0.0.1") ||
756 !strcmp(fMgr->Host(), fqn))) {
757 if (!checkport || (uu.Port == fMgr->Port()))
771 int XrdProofdNetMgr::ReadBuffer(XrdProofdProtocol *p)
773 XPDLOC(NMGR,
"NetMgr::ReadBuffer")
776 XPD_SETRESP(p, "ReadBuffer");
782 kXR_int64 ofs = ntohll(p->Request()->readbuf.ofs);
783 int len = ntohl(p->Request()->readbuf.len);
789 int dlen = p->Request()->header.dlen;
790 int grep = ntohl(p->Request()->readbuf.int1);
793 if (dlen > 0 && p->Argp()->buff) {
794 file =
new char[dlen+1];
795 memcpy(file, p->Argp()->buff, dlen);
798 XrdClientUrlInfo ui(file);
799 if (ui.Host.length() > 0) {
801 local = XrdProofdNetMgr::IsLocal(ui.Host.c_str());
803 memcpy(file, ui.File.c_str(), ui.File.length());
804 file[ui.File.length()] = 0;
805 blen = ui.File.length();
806 TRACEP(p, DBG,
"file is LOCAL");
812 pattern =
new char[len + 1];
816 pattern[i++] = file[j++];
818 filen = strdup(file);
819 filen[blen - len] = 0;
820 TRACEP(p, DBG,
"grep operation " << grep <<
", pattern:" << pattern);
823 emsg =
"file name not found";
824 TRACEP(p, XERR, emsg);
825 response->Send(kXR_InvalidRequest, emsg.c_str());
829 TRACEP(p, REQ,
"file: " << filen <<
", ofs: " << ofs <<
", len: " << len <<
830 ", pattern: " << pattern);
832 TRACEP(p, REQ,
"file: " << file <<
", ofs: " << ofs <<
", len: " << len);
842 buf = ReadBufferLocal(filen, pattern, lout, grep);
845 buf = ReadBufferLocal(file, ofs, lout);
849 XrdClientUrlInfo u(file);
850 if (u.User.length() <= 0)
851 u.User = p->Client()->User() ? p->Client()->User() : fMgr->EffectiveUser();
852 buf = ReadBufferRemote(u.GetUrl().c_str(), file, ofs, lout, grep);
860 XPDFORM(emsg,
"nothing found by 'grep' in %s, pattern: %s", filen, pattern);
861 TRACEP(p, DBG, emsg);
866 XPDFORM(emsg,
"could not read buffer from %s %s",
867 (local) ?
"local file " :
"remote file ", file);
868 TRACEP(p, XERR, emsg);
869 response->Send(kXR_InvalidRequest, emsg.c_str());
875 emsg =
"nothing found in ";
876 emsg += (grep > 0) ? filen : file;
877 TRACEP(p, DBG, emsg);
884 response->Send(buf, lout);
890 SafeDelArray(pattern);
901 int XrdProofdNetMgr::LocateLocalFile(XrdOucString &file)
903 XPDLOC(NMGR,
"NetMgr::LocateLocalFile")
906 if (file.length() <= 0 || file.find('*') == STR_NPOS) return 0;
910 int isl = file.rfind('/');
911 if (isl != STR_NPOS) {
912 fn.assign(file, isl + 1, -1);
913 dn.assign(file, 0, isl);
921 DIR *dirp = opendir(dn.c_str());
923 XPDFORM(emsg,
"cannot open '%s' - errno: %d", dn.c_str(), errno);
924 TRACE(XERR, emsg.c_str());
927 struct dirent *ent = 0;
929 while ((ent = readdir(dirp))) {
930 if (!strncmp(ent->d_name,
".", 1) || !strncmp(ent->d_name,
"..", 2))
934 if (sent.matches(fn.c_str()) > 0)
break;
940 if (sent.length() > 0) {
941 XPDFORM(file,
"%s%s", dn.c_str(), sent.c_str());
956 char *XrdProofdNetMgr::ReadBufferLocal(
const char *path, kXR_int64 ofs,
int &len)
958 XPDLOC(NMGR,
"NetMgr::ReadBufferLocal")
961 TRACE(REQ, "file: " << path << ", ofs: " << ofs << ", len: " << len);
964 if (!path || strlen(path) <= 0) {
965 TRACE(XERR,
"path undefined!");
970 XrdOucString spath(path);
971 if (LocateLocalFile(spath) != 0) {
972 TRACE(XERR,
"path cannot be resolved! (" << path <<
")");
975 const char *file = spath.c_str();
978 int fd = open(file, O_RDONLY);
980 emsg =
"could not open ";
988 if (fstat(fd, &st) != 0) {
989 emsg =
"could not get size of file with stat: errno: ";
995 off_t ltot = st.st_size;
999 kXR_int64 start = ofs;
1000 off_t fst = (start < 0) ? ltot + start : start;
1001 fst = (fst < 0) ? 0 : ((fst >= ltot) ? ltot - 1 : fst);
1003 kXR_int64 end = fst + len;
1004 off_t lst = (end >= ltot) ? ltot : ((end > fst) ? end : ltot);
1005 TRACE(DBG,
"file size: " << ltot <<
", read from: " << fst <<
" to " << lst);
1011 char *buf = (
char *)malloc(len + 1);
1013 emsg =
"could not allocate enough memory on the heap: errno: ";
1022 lseek(fd, fst, SEEK_SET);
1028 while ((nr = read(fd, buf + pos, left)) < 0 && errno == EINTR)
1031 TRACE(XERR,
"error reading from file: errno: " << errno);
1039 }
while (nr > 0 && left > 0);
1043 TRACE(HDBG,
"read " << nr <<
" bytes: " << buf);
1059 char *XrdProofdNetMgr::ReadBufferLocal(
const char *path,
1060 const char *pat,
int &len,
int opt)
1062 XPDLOC(NMGR,
"NetMgr::ReadBufferLocal")
1065 TRACE(REQ, "file: " << path << ", pat: " << pat << ", len: " << len);
1068 if (!path || strlen(path) <= 0) {
1069 TRACE(XERR,
"file path undefined!");
1074 XrdOucString spath(path);
1075 if (LocateLocalFile(spath) != 0) {
1076 TRACE(XERR,
"path cannot be resolved! (" << path <<
")");
1079 const char *file = spath.c_str();
1083 if (stat(file, &st) != 0) {
1084 emsg =
"could not get size of file with stat: errno: ";
1089 off_t ltot = st.st_size;
1094 if (pat && strlen(pat) > 0) {
1095 lcmd = strlen(pat) + strlen(file) + 20;
1096 cmd =
new char[lcmd];
1098 snprintf(cmd, lcmd,
"grep %s %s", pat, file);
1099 }
else if (opt == 2) {
1100 snprintf(cmd, lcmd,
"grep -v %s %s", pat, file);
1101 }
else if (opt == 3) {
1102 snprintf(cmd, lcmd,
"cat %s | %s", file, pat);
1104 snprintf(cmd, lcmd,
"cat %s", file);
1107 lcmd = strlen(file) + 10;
1108 cmd =
new char[lcmd];
1109 snprintf(cmd, lcmd,
"cat %s", file);
1111 TRACE(DBG,
"cmd: " << cmd);
1114 FILE *fp = popen(cmd,
"r");
1116 emsg =
"could not run '";
1129 int bufsiz = 0, left = 0, lines = 0;
1130 while ((ltot > 0) && fgets(line,
sizeof(line), fp)) {
1132 int llen = strlen(line);
1136 if (!buf || (llen > left)) {
1137 int dsiz = 100 * ((int)((len + llen) / lines) + 1);
1138 dsiz = (dsiz > llen) ? dsiz : llen;
1140 buf = (
char *)realloc(buf, bufsiz + 1);
1144 emsg =
"could not allocate enough memory on the heap: errno: ";
1151 memcpy(buf + len, line, llen);
1155 fprintf(stderr,
"line: %s", line);
1180 char *XrdProofdNetMgr::ReadBufferRemote(
const char *url,
const char *file,
1181 kXR_int64 ofs,
int &len,
int grep)
1183 XPDLOC(NMGR,
"NetMgr::ReadBufferRemote")
1185 TRACE(REQ, "url: " << (url ? url : "undef") <<
1186 ", file: " << (file ? file : "undef") << ", ofs: " << ofs <<
1187 ", len: " << len << ", grep: " << grep);
1190 if (!file || strlen(file) <= 0) {
1191 TRACE(XERR,
"file undefined!");
1194 XrdClientUrlInfo u(url);
1195 if (!url || strlen(url) <= 0) {
1197 u.TakeUrl(XrdOucString(file));
1198 if (u.User.length() <= 0) u.User = fMgr->EffectiveUser();
1202 XrdProofConn *conn = GetProofConn(u.GetUrl().c_str());
1205 if (conn && conn->IsValid()) {
1207 XPClientRequest reqhdr;
1208 memset(&reqhdr, 0,
sizeof(reqhdr));
1209 conn->SetSID(reqhdr.header.streamid);
1210 reqhdr.header.requestid = kXP_readbuf;
1211 reqhdr.readbuf.ofs = ofs;
1212 reqhdr.readbuf.len = len;
1213 reqhdr.readbuf.int1 = grep;
1214 reqhdr.header.dlen = strlen(file);
1215 const void *btmp = (
const void *) file;
1218 XrdClientMessage *xrsp =
1219 conn->SendReq(&reqhdr, btmp, vout,
"NetMgr::ReadBufferRemote");
1222 if (xrsp && buf && (xrsp->DataLen() > 0)) {
1223 len = xrsp->DataLen();
1225 if (xrsp && !(xrsp->IsError()))
1246 char *XrdProofdNetMgr::ReadLogPaths(
const char *url,
const char *msg,
int isess)
1248 XPDLOC(NMGR,
"NetMgr::ReadLogPaths")
1250 TRACE(REQ, "url: " << (url ? url : "undef") <<
1251 ", msg: " << (msg ? msg : "undef") << ", isess: " << isess);
1254 if (!url || strlen(url) <= 0) {
1255 TRACE(XERR,
"url undefined!");
1260 XrdProofConn *conn = GetProofConn(url);
1263 if (conn && conn->IsValid()) {
1265 XPClientRequest reqhdr;
1266 memset(&reqhdr, 0,
sizeof(reqhdr));
1267 conn->SetSID(reqhdr.header.streamid);
1268 reqhdr.header.requestid = kXP_admin;
1269 reqhdr.proof.int1 = kQueryLogPaths;
1270 reqhdr.proof.int2 = isess;
1271 reqhdr.proof.sid = -1;
1272 reqhdr.header.dlen = msg ? strlen(msg) : 0;
1273 const void *btmp = (
const void *) msg;
1276 XrdClientMessage *xrsp =
1277 conn->SendReq(&reqhdr, btmp, vout,
"NetMgr::ReadLogPaths");
1280 if (xrsp && buf && (xrsp->DataLen() > 0)) {
1281 int len = xrsp->DataLen();
1282 buf = (
char *) realloc((
void *)buf, len + 1);
1304 char *XrdProofdNetMgr::ReadLogPaths(
const char *msg,
int isess)
1306 XPDLOC(NMGR,
"NetMgr::ReadLogPaths")
1308 TRACE(REQ, "msg: " << (msg ? msg : "undef") << ", isess: " << isess);
1310 char *buf = 0, *pbuf = buf;
1313 std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
1314 XrdProofWorker *w = 0;
1315 while (iw != fNodes.end()) {
1318 bool us = (((w->fHost.find(
"localhost") != STR_NPOS ||
1319 XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
1320 (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
1323 XrdOucString u = fMgr->EffectiveUser();
1326 if (w->fPort != -1) {
1331 char *bmst = fMgr->NetMgr()->ReadLogPaths(u.c_str(), msg, isess);
1333 len += strlen(bmst) + 1;
1334 buf = (
char *) realloc((
void *)buf, len);
1335 pbuf = buf + len - strlen(bmst) - 1;
1336 memcpy(pbuf, bmst, strlen(bmst) + 1);
1342 TRACE(DBG,
"request for ourselves: ignore");
1357 void XrdProofdNetMgr::CreateDefaultPROOFcfg()
1359 XPDLOC(NMGR,
"NetMgr::CreateDefaultPROOFcfg")
1361 TRACE(DBG, "enter: local workers: " << fNumLocalWrks);
1364 XrdSysMutexHelper mhp(fMutex);
1369 if (fDfltWorkers.size() < 1) {
1371 XrdOucString mm(
"master ", 128);
1373 fDfltWorkers.push_back(
new XrdProofWorker(mm.c_str()));
1376 int nwrk = fNumLocalWrks;
1378 mm =
"worker localhost port=";
1381 fDfltWorkers.push_back(
new XrdProofWorker(mm.c_str()));
1382 TRACE(DBG,
"added line: " << mm);
1388 std::list<XrdProofWorker *>::iterator w = fDfltWorkers.begin();
1389 for (; w != fDfltWorkers.end(); ++w) {
1390 fWorkers.push_back(*w);
1393 TRACE(DBG,
"done: " << fWorkers.size() - 1 <<
" workers");
1406 std::list<XrdProofWorker *> *XrdProofdNetMgr::GetActiveWorkers()
1408 XPDLOC(NMGR,
"NetMgr::GetActiveWorkers")
1410 XrdSysMutexHelper mhp(fMutex);
1412 if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
1414 if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
1415 if (fDfltFallback) {
1417 CreateDefaultPROOFcfg();
1418 TRACE(DBG,
"parsing of " << fPROOFcfg.fName <<
" failed: use default settings");
1420 TRACE(XERR,
"unable to read the configuration file");
1421 return (std::list<XrdProofWorker *> *)0;
1425 TRACE(DBG,
"returning list with " << fWorkers.size() <<
" entries");
1427 if (TRACING(HDBG)) Dump();
1435 void XrdProofdNetMgr::Dump()
1437 const char *xpdloc =
"NetMgr::Dump";
1439 XrdSysMutexHelper mhp(fMutex);
1441 XPDPRT(
"+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1442 XPDPRT(
"+ Active workers status");
1443 XPDPRT(
"+ Size: " << fWorkers.size());
1446 std::list<XrdProofWorker *>::iterator iw;
1447 for (iw = fWorkers.begin(); iw != fWorkers.end(); ++iw) {
1448 XPDPRT(
"+ wrk: " << (*iw)->fHost <<
":" << (*iw)->fPort <<
" type:" << (*iw)->fType <<
1449 " active sessions:" << (*iw)->Active());
1452 XPDPRT(
"+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1459 std::list<XrdProofWorker *> *XrdProofdNetMgr::GetNodes()
1461 XPDLOC(NMGR,
"NetMgr::GetNodes")
1463 XrdSysMutexHelper mhp(fMutex);
1465 if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
1467 if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
1468 if (fDfltFallback) {
1470 CreateDefaultPROOFcfg();
1471 TRACE(DBG,
"parsing of " << fPROOFcfg.fName <<
" failed: use default settings");
1473 TRACE(XERR,
"unable to read the configuration file");
1474 return (std::list<XrdProofWorker *> *)0;
1478 TRACE(DBG,
"returning list with " << fNodes.size() <<
" entries");
1488 int XrdProofdNetMgr::ReadPROOFcfg(
bool reset)
1490 XPDLOC(NMGR,
"NetMgr::ReadPROOFcfg")
1492 TRACE(REQ, "saved time of last modification: " << fPROOFcfg.fMtime);
1495 XrdSysMutexHelper mhp(fMutex);
1498 if (fPROOFcfg.fName.length() <= 0)
1503 if (stat(fPROOFcfg.fName.c_str(), &st) != 0) {
1506 if (errno == ENOENT) fPROOFcfg.fMtime = -1;
1507 if (!fDfltFallback) {
1508 TRACE(XERR,
"unable to stat file: " << fPROOFcfg.fName <<
" - errno: " << errno);
1510 TRACE(ALL,
"file " << fPROOFcfg.fName <<
" cannot be parsed: use default configuration");
1514 TRACE(DBG,
"time of last modification: " << st.st_mtime);
1517 if (st.st_mtime <= fPROOFcfg.fMtime)
1521 fPROOFcfg.fMtime = st.st_mtime;
1525 if (!(fin = fopen(fPROOFcfg.fName.c_str(),
"r"))) {
1526 if (fWorkers.size() > 1) {
1527 TRACE(XERR,
"unable to fopen file: " << fPROOFcfg.fName <<
" - errno: " << errno);
1528 TRACE(XERR,
"continuing with existing list of workers.");
1541 if (fRegWorkers.size() < 1) {
1542 XrdOucString mm(
"master ", 128);
1544 fRegWorkers.push_back(
new XrdProofWorker(mm.c_str()));
1547 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1550 for (; w != fRegWorkers.end(); ++w) {
1558 while (fgets(lin,
sizeof(lin), fin)) {
1561 while (lin[p++] ==
' ') {
1565 if (lin[p] ==
'\0' || lin[p] ==
'\n')
1573 if (lin[strlen(lin)-1] ==
'\n')
1574 lin[strlen(lin)-1] =
'\0';
1576 TRACE(DBG,
"found line: " << lin);
1579 XrdProofWorker *pw =
new XrdProofWorker(lin);
1581 const char *pfx[2] = {
"master",
"node" };
1582 if (!strncmp(lin, pfx[0], strlen(pfx[0])) ||
1583 !strncmp(lin, pfx[1], strlen(pfx[1]))) {
1585 if (pw->fHost.beginswith(
"localhost") ||
1586 pw->Matches(fMgr->Host())) {
1588 XrdProofWorker *fw = fRegWorkers.front();
1595 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1599 while (w != fRegWorkers.end()) {
1600 if (!((*w)->fActive)) {
1601 if ((*w)->fHost == pw->fHost && (*w)->fPort == pw->fPort) {
1613 fRegWorkers.push_back(pw);
1622 std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1623 while (w != fRegWorkers.end()) {
1624 if ((*w)->fActive) {
1625 fWorkers.push_back(*w);
1639 return ((nw == 0) ? -1 : 0);
1648 int XrdProofdNetMgr::FindUniqueNodes()
1650 XPDLOC(NMGR,
"NetMgr::FindUniqueNodes")
1652 TRACE(REQ, "
# workers: " << fWorkers.size());
1658 if (fWorkers.size() > 1) {
1659 std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
1661 for (; w != fWorkers.end(); ++w)
if ((*w)->fActive) {
1663 std::list<XrdProofWorker *>::iterator n;
1664 for (n = fNodes.begin() ; n != fNodes.end(); ++n) {
1665 if ((*n)->Matches(*w)) {
1671 fNodes.push_back(*w);
1674 TRACE(REQ,
"found " << fNodes.size() <<
" unique nodes");
1677 return fNodes.size();