23 #include "XrdOuc/XrdOucStream.hh"
// Members of the argument bundle that SetNiceValues passes to the
// CreateActiveList callback through the opaque pointer of fSessions.Apply.
// NOTE(review): the opening of this typedef and at least one more member
// (an error flag, used as cal->error) are on lines missing from this excerpt.
// Group manager used to resolve the group of each session entry
36 XrdProofGroupMgr *fGroupMgr;
// List filled by CreateActiveList, kept ordered by effective fraction
37 std::list<XrdProofdSessionEntry *> *fSortedList;
39 } XpdCreateActiveList_t;
// Entry point of the poller thread that Config starts via XrdSysThread::Run;
// p is the XrdProofdPriorityMgr instance passed as the thread argument.
// The visible statements wait on the manager pipe, receive a message,
// dispatch on its type (kChangeStatus, kSetGroupPriority) and then call
// SetNiceValues on the manager.
// NOTE(review): the loop structure, braces and several guards are on lines
// missing from this excerpt.
51 void *XrdProofdPriorityCron(
void *p)
53 XPDLOC(PMGR,
"PriorityCron")
// Recover the manager from the opaque thread argument
55 XrdProofdPriorityMgr *mgr = (XrdProofdPriorityMgr *)p;
57 TRACE(REQ,
"undefined manager: cannot start");
// Wait for activity on the manager pipe; -1 is presumably an infinite
// timeout -- TODO confirm against the pipe API
63 int pollRet = mgr->Pipe()->Poll(-1);
// A non-zero return from Recv is reported as an error (negated as errno)
67 if ((rc = mgr->Pipe()->Recv(msg)) != 0) {
68 XPDERR(
"problems receiving message; errno: "<<-rc);
// kChangeStatus: a session identified by pid is added to or removed from
// the set of tracked sessions
72 if (msg.Type() == XrdProofdPriorityMgr::kChangeStatus) {
73 XrdOucString usr, grp;
74 int opt = 0, pid = -1;
// Chained extraction: each Get runs only while rc is still 0, so the
// first failure is preserved in rc
76 rc = (rc == 0) ? msg.Get(usr) : rc;
77 rc = (rc == 0) ? msg.Get(grp) : rc;
78 rc = (rc == 0) ? msg.Get(pid) : rc;
80 XPDERR(
"kChangeStatus: problems parsing message : '"<<msg.Buf()<<
"'; errno: "<<-rc);
// opt selects between removal and addition; the selecting conditions
// are not visible in this excerpt
85 mgr->RemoveSession(pid);
88 mgr->AddSession(usr.c_str(), grp.c_str(), pid);
90 XPDERR(
"kChangeStatus: invalid opt: "<< opt);
92 }
// kSetGroupPriority: change the priority of a group
else if (msg.Type() == XrdProofdPriorityMgr::kSetGroupPriority) {
96 rc = (rc == 0) ? msg.Get(prio) : rc;
98 XPDERR(
"kSetGroupPriority: problems parsing message; errno: "<<-rc);
102 mgr->SetGroupPriority(grp.c_str(), prio);
104 XPDERR(
"unknown message type: "<< msg.Type());
// Recompute the nice values; a non-zero return is logged
107 if (mgr->SetNiceValues() != 0) {
108 XPDERR(
"problem setting nice values ");
// Constructor. Forwards the configuration file name (pi->ConfigFN) and the
// error handler e to the XrdProofdConfig base class.
// NOTE(review): pi is dereferenced in the initializer list without a visible
// null check; how mgr is stored is on lines missing from this excerpt.
120 XrdProofdPriorityMgr::XrdProofdPriorityMgr(XrdProofdManager *mgr,
121 XrdProtocol_Config *pi, XrdSysError *e)
122 : XrdProofdConfig(pi->ConfigFN, e)
124 XPDLOC(PMGR,
"XrdProofdPriorityMgr")
// Scheduling starts switched off
127 fSchedOpt = kXPD_sched_off;
// The pipe is what the poller thread (XrdProofdPriorityCron) listens on
132 if (!fPipe.IsValid()) {
133 TRACE(XERR,
"unable to generate pipe for the priority poller");
// Register the configuration directives handled by this manager
138 RegisterDirectives();
// Callback run by fPriorities.Apply in Config for each registered priority
// change. p is the priority entry; s is the opaque argument, which Config
// passes as the XrdSysError logger. The key argument is unused.
144 static int DumpPriorityChanges(
const char *, XrdProofdPriority *p,
void *s)
146 XPDLOC(PMGR,
"DumpPriorityChanges")
// Recover the logger from the opaque argument
148 XrdSysError *e = (XrdSysError *)s;
// Build the notification from the delta priority and the user name(s)
152 XPDFORM(msg,
"priority will be changed by %d for user(s): %s",
153 p->fDeltaPriority, p->fUser.c_str());
// Run the configuration of the priority manager. rcf is forwarded to the
// base-class Config and selects between the "re-configuring" and
// "configuring" wording of the log message.
// The visible statements parse the configuration, report the requested
// priority changes, describe the scheduling mode and start the poller thread.
// NOTE(review): return statements and braces are on lines missing from this
// excerpt.
167 int XrdProofdPriorityMgr::Config(
bool rcf)
169 XPDLOC(PMGR,
"PriorityMgr::Config")
// Parse the directives through the base class; non-zero means failure
172 if (XrdProofdConfig::Config(rcf) != 0) {
173 XPDERR(
"problems parsing file ");
178 msg = (rcf) ?
"re-configuring" :
"configuring";
// Report each registered priority change, if any
182 if (fPriorities.Num() > 0) {
183 fPriorities.Apply(DumpPriorityChanges, (
void *)fEDest);
185 TRACE(ALL,
"no priority changes requested");
// Scheduling is only described when there is more than one group and the
// scheduling option is not off
189 if (fMgr->GroupsMgr() && fMgr->GroupsMgr()->Num() > 1 && fSchedOpt != kXPD_sched_off) {
190 XPDFORM(msg,
"worker sched based on '%s' priorities",
191 (fSchedOpt == kXPD_sched_central) ?
"central" :
"local");
// Start the poller thread, with this object as its argument
198 if (XrdSysThread::Run(&tid, XrdProofdPriorityCron,
199 (
void *)
this, 0,
"PriorityMgr poller thread") != 0) {
200 XPDERR(
"could not start poller thread");
203 TRACE(ALL,
"poller thread started");
// Register the two configuration directives handled by this class,
// "schedopt" and "priority". Both are bound to this object with
// DoDirectiveClass as handler.
// NOTE(review): ownership of the directive objects allocated with new is
// presumably taken by Register -- TODO confirm.
213 void XrdProofdPriorityMgr::RegisterDirectives()
215 Register(
"schedopt",
new XrdProofdDirective(
"schedopt",
this, &DoDirectiveClass));
216 Register(
"priority",
new XrdProofdDirective(
"priority",
this, &DoDirectiveClass));
// Dispatch a parsed directive to its handler according to d->fName:
// "priority" goes to DoDirectivePriority, "schedopt" to DoDirectiveSchedOpt.
// val, cfg and rcf are forwarded unchanged. Any other name is traced as an
// unknown directive.
// NOTE(review): the validation of d and the return value for the unknown
// case are on lines missing from this excerpt.
222 int XrdProofdPriorityMgr::DoDirective(XrdProofdDirective *d,
223 char *val, XrdOucStream *cfg,
bool rcf)
225 XPDLOC(PMGR,
"PriorityMgr::DoDirective")
231 if (d->fName == "priority") {
232 return DoDirectivePriority(val, cfg, rcf);
233 }
else if (d->fName ==
"schedopt") {
234 return DoDirectiveSchedOpt(val, cfg, rcf);
236 TRACE(XERR,
"unknown directive: "<<d->fName);
// Set the priority of group grp to the given value and switch the scheduling
// option to central.
// NOTE(review): whether g is checked for null before use cannot be seen in
// this excerpt -- confirm.
243 void XrdProofdPriorityMgr::SetGroupPriority(
const char *grp,
int priority)
// Look the group up by name in the group manager
245 XrdProofGroup *g = fMgr->GroupsMgr()->GetGroup(grp);
// The group stores its priority as a float
247 g->SetPriority((
float)priority);
250 SetSchedOpt(kXPD_sched_central);
// Callback applied by SetNiceValues to every entry of fSessions via
// fSessions.Apply, with a null opaque argument. The key and the opaque
// argument are unnamed and therefore unused.
// NOTE(review): the body is on lines missing from this excerpt.
259 static int ResetEntryPriority(
const char *, XrdProofdSessionEntry *e,
void *)
// Callback applied by SetNiceValues to every entry of fSessions. s points to
// an XpdCreateActiveList_t carrying the group manager, the output list and an
// error flag. The entry e is inserted into the list so that entries stay
// ordered by decreasing effective fraction. On any problem the error flag is
// set and a message is traced.
// NOTE(review): the conditions guarding each step (null checks, nsrv > 0)
// and the return values are on lines missing from this excerpt.
274 static int CreateActiveList(
const char *, XrdProofdSessionEntry *e,
void *s)
276 XPDLOC(PMGR,
"CreateActiveList")
// Recover the argument bundle from the opaque pointer
278 XpdCreateActiveList_t *cal = (XpdCreateActiveList_t *)s;
282 XrdProofGroupMgr *gm = cal->fGroupMgr;
283 std::list<XrdProofdSessionEntry *> *sorted = cal->fSortedList;
// Resolve the group this session belongs to
285 XrdProofGroup *g = gm->GetGroup(e->fGroup.c_str());
// Share of the group effective fraction per active member
// NOTE(review): divides by g->Active(); a guard against zero is not visible
// in this excerpt -- confirm
287 float ef = g->FracEff() / g->Active();
// Number of active sessions of this user within the group
288 int nsrv = g->Active(e->fUser.c_str());
// Insert before the first element whose fraction is not larger than ef
293 std::list<XrdProofdSessionEntry *>::iterator ssvi;
294 for (ssvi = sorted->begin() ; ssvi != sorted->end(); ++ssvi) {
295 if (ef >= (*ssvi)->fFracEff) {
296 sorted->insert(ssvi, e);
// Otherwise the entry goes to the end of the list
302 sorted->push_back(e);
// Error descriptions for the failing cases
307 emsg =
"no srv sessions for active client";
310 emsg =
"group not found: "; emsg += e->fGroup.c_str();
313 emsg =
"group manager undefined";
316 emsg =
"input structure or entry undefined";
// Flag the failure to the caller, which checks the flag after Apply
320 if (cal) cal->error = 1;
321 TRACE(XERR, (e ? e->fUser :
"---") <<
": protocol error: "<<emsg);
// Recalculate the nice values of the tracked sessions from the effective
// fractions of their groups. opt is traced here; its further use is not
// visible in this excerpt.
// Visible steps: return early when scheduling does not apply, reset the
// priority of every entry, update the effective fractions of the groups,
// build a list of sessions sorted by decreasing effective fraction, then
// assign a nice value to each session in that list.
// NOTE(review): return statements, braces and several conditions are on
// lines missing from this excerpt.
334 int XrdProofdPriorityMgr::SetNiceValues(
int opt)
336 XPDLOC(PMGR,
"PriorityMgr::SetNiceValues")
338 TRACE(REQ, "------------------- Start ----------------------");
340 TRACE(REQ, "opt: "<<opt);
// Nothing to do without a group manager, with at most one group, or with
// scheduling switched off
342 if (!fMgr->GroupsMgr() || fMgr->GroupsMgr()->Num() <= 1 || !IsSchedOn()) {
344 TRACE(REQ,
"------------------- End ------------------------");
349 int nact = fSessions.Num();
350 TRACE(DBG, fMgr->GroupsMgr()->Num()<<
" groups, " << nact<<
" active sessions");
// Apply ResetEntryPriority to every tracked session
354 fSessions.Apply(ResetEntryPriority, 0);
356 TRACE(REQ,
"------------------- End ------------------------");
// Hold the manager mutex for the rest of the scope
360 XrdSysMutexHelper mtxh(&fMutex);
// Refresh the effective fractions of the groups; non-zero means failure
364 if ((rc = fMgr->GroupsMgr()->SetEffectiveFractions(IsSchedOn())) != 0) {
366 TRACE(XERR,
"failure from SetEffectiveFractions");
371 TRACE(DBG,
"creating a list of active sessions sorted by decreasing effective fraction ");
372 std::list<XrdProofdSessionEntry *> sorted;
// Argument bundle for the callback: group manager, output list, error flag
373 XpdCreateActiveList_t cal = { fMgr->GroupsMgr(), &sorted, 0 };
375 fSessions.Apply(CreateActiveList, (
void *)&cal);
// Debug dump of the sorted list
380 std::list<XrdProofdSessionEntry *>::iterator ssvi;
382 for (ssvi = sorted.begin() ; ssvi != sorted.end(); ++ssvi)
383 TRACE(HDBG, i++ <<
" eff: "<< (*ssvi)->fFracEff);
386 TRACE(DBG,
"calculating nice values");
// The first element carries the largest effective fraction
// NOTE(review): dereferences sorted.begin(); a guard against an empty list
// is not visible in this excerpt -- confirm
389 ssvi = sorted.begin();
390 float xmax = (*ssvi)->fFracEff;
// The session with the largest fraction gets nice = 20 - fPriorityMax
393 int nice = 20 - fPriorityMax;
394 (*ssvi)->SetPriority(nice);
// Remaining sessions: scale by their fraction relative to xmax over the
// span fPriorityMax - fPriorityMin (the expression continues on a line
// missing from this excerpt)
397 while (ssvi != sorted.end()) {
398 int xpri = (int) ((*ssvi)->fFracEff / xmax * (fPriorityMax - fPriorityMin))
401 TRACE(DBG,
" --> nice value for client "<< (*ssvi)->fUser<<
" is "<<nice);
402 (*ssvi)->SetPriority(nice);
406 TRACE(XERR,
"negative or null max effective fraction: "<<xmax);
410 TRACE(XERR,
"failure from CreateActiveList");
413 TRACE(REQ,
"------------------- End ------------------------");
// Handle the "priority" directive. val holds the priority delta as text; cfg
// provides the remaining words of the directive, where an optional "if"
// clause may follow. The unnamed bool argument is unused.
// NOTE(review): validation of val and cfg, how the "if" argument is applied
// to the entry, and the return value are on lines missing from this excerpt.
422 int XrdProofdPriorityMgr::DoDirectivePriority(
char *val, XrdOucStream *cfg,
bool)
// Parse the delta as a base-10 integer; strtol yields 0 for non-numeric text
429 int dp = strtol(val,0,10);
// By default the change applies to the "*" user pattern
430 XrdProofdPriority *p =
new XrdProofdPriority(
"*", dp);
// Optional "if" clause followed by a non-empty word
432 if ((val = cfg->GetWord()) && !strncmp(val,
"if",2)) {
433 if ((val = cfg->GetWord()) && val[0]) {
// Store the entry keyed by its user string
// NOTE(review): Rep presumably replaces an existing entry -- confirm
438 fPriorities.Rep(p->fUser.c_str(), p);
// Handle the "schedopt" directive. Words are read one at a time from val and
// cfg: "min:<n>" and "max:<n>" give priority bounds, and the scheduling mode
// may be set to central or local. Bounds are accepted only in the range
// 1..40, and fPriorityMin is lowered to fPriorityMax if it exceeds it.
// The unnamed bool argument is unused.
// NOTE(review): declarations of pmin, pmax and opt, the numeric conversion of
// the bounds, the final assignment of the mode and the return value are on
// lines missing from this excerpt.
445 int XrdProofdPriorityMgr::DoDirectiveSchedOpt(
char *val, XrdOucStream *cfg,
bool)
447 XPDLOC(PMGR,
"PriorityMgr::DoDirectiveSchedOpt")
// Process each word of the directive
457 while (val && val[0]) {
458 XrdOucString o = val;
// Strip the "min:" prefix to leave the number
459 if (o.beginswith(
"min:")) {
461 o.replace(
"min:",
"");
463 }
// Strip the "max:" prefix to leave the number
else if (o.beginswith(
"max:")) {
465 o.replace(
"max:",
"");
469 opt = kXPD_sched_central;
470 else if (o ==
"local")
471 opt = kXPD_sched_local;
// Evaluate a host-conditional clause against the local host name
474 if (fMgr->Host() && cfg)
475 if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
// Next word
// NOTE(review): cfg is dereferenced here although it is null-checked just
// above -- confirm whether a guard exists on the missing lines
478 val = cfg->GetWord();
// Keep the current bound when the parsed value is outside 1..40
484 fPriorityMin = (pmin >= 1 && pmin <= 40) ? pmin : fPriorityMin;
486 fPriorityMax = (pmax >= 1 && pmax <= 40) ? pmax : fPriorityMax;
// Make the bounds consistent by lowering the minimum to the maximum
491 if (fPriorityMin > fPriorityMax) {
492 TRACE(XERR,
"inconsistent value for fPriorityMin (> fPriorityMax) ["<<
493 fPriorityMin <<
", "<<fPriorityMax<<
"] - correcting");
494 fPriorityMin = fPriorityMax;
// Remove the session with process id pid from fSessions.
// Returns whatever fSessions.Del returns for the key.
504 int XrdProofdPriorityMgr::RemoveSession(
int pid)
// Sessions are keyed by the textual form of the process id
506 XrdOucString key; key += pid;
507 return fSessions.Del(key.c_str());
// Register a session for user u in group g running as process pid.
// A new XrdProofdSessionEntry is stored in fSessions under the pid; Rep is
// used on one path and Add on the other.
// NOTE(review): the condition choosing between Rep and Add (presumably
// whether oldent is non-null) and the return value are on lines missing from
// this excerpt.
514 int XrdProofdPriorityMgr::AddSession(
const char *u,
const char *g,
int pid)
// Sessions are keyed by the textual form of the process id
517 XrdOucString key; key += pid;
// Check whether an entry already exists for this pid
518 XrdProofdSessionEntry *oldent = fSessions.Find(key.c_str());
521 fSessions.Rep(key.c_str(),
new XrdProofdSessionEntry(u, g, pid));
523 fSessions.Add(key.c_str(),
new XrdProofdSessionEntry(u, g, pid));
// Change the priority of process pid by the delta registered for user.
// dp is an in/out parameter: it is overwritten with the fDeltaPriority of the
// entry found for the user, and the new priority is the current priority of
// the process plus dp.
// On a setpriority failure the function returns -errno, or -1 if errno is 0.
// NOTE(review): the remaining return statements, braces and the declaration
// of ui are on lines missing from this excerpt.
534 int XrdProofdPriorityMgr::SetProcessPriority(
int pid,
const char *user,
int &dp)
536 XPDLOC(PMGR,
"PriorityMgr::SetProcessPriority")
// Look for a priority change registered for this user
539 if (fPriorities.Num() > 0) {
540 XrdProofdPriority *pu = fPriorities.Find(user);
542 dp = pu->fDeltaPriority;
// Read the current priority of the process. -1 is a legitimate priority, so
// errno is consulted to recognise a real failure.
// NOTE(review): this only works if errno is cleared before the call; such a
// reset is not visible in this excerpt -- confirm
545 int priority = XPPM_NOPRIORITY;
546 if ((priority = getpriority(PRIO_PROCESS, pid)) == -1 && errno != 0) {
547 TRACE(XERR,
"getpriority: errno: " << errno);
551 int newp = priority + dp;
// Acquire uid 0 / gid 0 for the scope of the guard so the priority can be
// changed
553 XrdProofdAux::GetUserInfo(geteuid(), ui);
554 XrdSysPrivGuard pGuard((uid_t)0, (gid_t)0);
555 if (XpdBadPGuard(pGuard, ui.fUid)) {
556 TRACE(XERR,
"could not get privileges");
559 TRACE(REQ,
"got privileges ");
561 if (setpriority(PRIO_PROCESS, pid, newp) != 0) {
562 TRACE(XERR,
"setpriority: errno: " << errno);
563 return ((errno != 0) ? -errno : -1);
// Read the priority back to verify the change
// NOTE(review): a mismatch is only reported when errno is also non-zero, so
// a silently clamped value would pass unnoticed -- confirm this is intended
565 if ((getpriority(PRIO_PROCESS, pid)) != newp && errno != 0) {
566 TRACE(XERR,
"did not succeed: errno: " << errno);
// Constructor. Records user u, group g and process id pid, starts with a zero
// effective fraction, and stores the current priority of the process as the
// default priority.
// NOTE(review): the condition that detects a getpriority failure, and any
// reset of errno before the call, are on lines missing from this excerpt.
582 XrdProofdSessionEntry::XrdProofdSessionEntry(
const char *u,
const char *g,
int pid)
583 : fUser(u), fGroup(g), fPid(pid), fFracEff(0.)
585 XPDLOC(PMGR,
"XrdProofdSessionEntry")
// Both priorities start as the XPPM_NOPRIORITY sentinel
587 fPriority = XPPM_NOPRIORITY;
588 fDefaultPriority = XPPM_NOPRIORITY;
// Query the current priority of the process
590 int prio = getpriority(PRIO_PROCESS, pid);
592 TRACE(XERR,
" getpriority: errno: " << errno);
// Remember the value as the priority to restore later
595 fDefaultPriority = prio;
// Destructor. Asks SetPriority to put the process back to the default
// priority recorded at construction.
601 XrdProofdSessionEntry::~XrdProofdSessionEntry()
603 SetPriority(fDefaultPriority);
// Change the priority of the process fPid and remember the value in
// fPriority. Nothing is done when the requested value equals the stored one.
// The change is made under a privilege guard for uid 0 / gid 0.
// NOTE(review): return statements, braces and the declaration of ui are on
// lines missing from this excerpt.
609 int XrdProofdSessionEntry::SetPriority(
int priority)
611 XPDLOC(PMGR,
"SessionEntry::SetPriority")
// NOTE(review): as written, every value other than the XPPM_NOPRIORITY
// sentinel is replaced by fDefaultPriority, so explicit nice values from
// SetNiceValues are discarded and the sentinel itself is passed through to
// setpriority. The comparison looks inverted (== seems intended) -- confirm
// before changing.
613 if (priority != XPPM_NOPRIORITY)
614 priority = fDefaultPriority;
// Skip the system call when the priority is already the requested one
616 if (priority != fPriority) {
// Acquire uid 0 / gid 0 for the scope of the guard
619 XrdProofdAux::GetUserInfo(geteuid(), ui);
620 XrdSysPrivGuard pGuard((uid_t)0, (gid_t)0);
621 if (XpdBadPGuard(pGuard, ui.fUid)) {
622 TRACE(XERR,
"could not get privileges");
626 if (setpriority(PRIO_PROCESS, fPid, priority) != 0) {
627 TRACE(XERR,
"setpriority: errno: " << errno);
// Record the priority now in effect
630 fPriority = priority;