50 #include "XrdOuc/XrdOucString.hh"
51 #include "XrdOuc/XrdOucStream.hh"
// Thread routine of the scheduler cron job: Config() passes it to
// XrdSysThread::Run with the owning XrdProofSched as thread argument.
// NOTE(review): this excerpt carries fused line-number prefixes and has gaps
// between statements, so the notes below cover only the visible lines.
88 void *XrdProofSchedCron(
void *p)
90 XPDLOC(SCHED,
"SchedCron")
// The opaque thread argument is interpreted as the scheduler instance.
92 XrdProofSched *sched = (XrdProofSched *)p;
// Error trace for a missing scheduler (the guarding test is not visible here).
94 TRACE(XERR,
"undefined scheduler: cannot start");
// lastcheck: time of the latest check; ckfreq: interval between checks.
// Both are combined with time(0) values below, i.e. they are in seconds.
99 int lastcheck = time(0), ckfreq = sched->CheckFrequency(), deltat = 0;
// deltat is the time still left before the next check; <= 0 means it is due.
102 if ((deltat = ckfreq - (time(0) - lastcheck)) <= 0)
// Poll the scheduler pipe, passing the remaining time (presumably used as
// the poll timeout - TODO confirm against the pipe class).
104 int pollRet = sched->Pipe()->Poll(deltat);
// A non-zero return from Recv is reported as an error; -rc is printed as errno.
110 if ((rc = sched->Pipe()->Recv(msg)) != 0) {
111 XPDERR(
"problems receiving message; errno: "<<-rc);
// Message dispatch: kReschedule is recognised explicitly ...
116 if (msg.Type() == XrdProofSched::kReschedule) {
118 TRACE(ALL,
"received kReschedule");
// ... any other message type is traced as an error.
125 TRACE(XERR,
"unknown type: "<<msg.Type());
// Traced when the periodic checks are run (the branch condition is not visible).
130 TRACE(ALL,
"running regular checks");
// Comparison predicate for workers, passed to XrdProofWorker::Sort in
// GetWorkers(). Evaluates to true only when both pointers are non-null and
// 'lhs' has strictly fewer active sessions than 'rhs'; a null pointer on
// either side always yields false.
145 static bool XpdWrkComp(XrdProofWorker *&lhs, XrdProofWorker *&rhs)
147 return ((lhs && rhs &&
148 lhs->GetNActiveSessions() < rhs->GetNActiveSessions()) ? 1 : 0);
// Free-function entry point for scheduler directives: forwards the call to
// ProcessDirective() of the XrdProofSched instance stored in d->fVal and
// returns its result.
154 int DoSchedDirective(XrdProofdDirective *d,
char *val, XrdOucStream *cfg,
bool rcf)
// Guard against a missing directive or a directive without an attached
// scheduler (what is returned in that case is not visible in this excerpt).
156 if (!d || !(d->fVal))
160 return ((XrdProofSched *)d->fVal)->ProcessDirective(d, val, cfg, rcf);
// Constructor. 'cfn' and 'e' are handed to the XrdProofdConfig base class;
// how 'mgr' and 'grpmgr' are stored is not visible in this excerpt.
166 XrdProofSched::XrdProofSched(
const char *name,
167 XrdProofdManager *mgr, XrdProofGroupMgr *grpmgr,
168 const char *cfn, XrdSysError *e)
169 : XrdProofdConfig(cfn, e)
// Zero the name buffer first, then copy one byte less than was zeroed, so the
// last zeroed byte is left as terminator.
179 memset(fName, 0, kXPSMXNMLEN);
// NOTE(review): memcpy always reads kXPSMXNMLEN-1 bytes from 'name', whatever
// its actual length; confirm callers pass storage at least that large (and
// that a null 'name' is excluded on a line not shown here).
181 memcpy(fName, name, kXPSMXNMLEN-1);
// Make the directives handled by this class known to the configuration parser.
184 RegisterDirectives();
// Registers the two directives handled by the scheduler, "schedparam" and
// "resource". Each one is bound to this object, with DoDirectiveClass as the
// callback.
190 void XrdProofSched::RegisterDirectives()
192 Register(
"schedparam",
new XrdProofdDirective(
"schedparam",
this, &DoDirectiveClass));
193 Register(
"resource",
new XrdProofdDirective(
"resource",
this, &DoDirectiveClass));
// Dispatches a parsed directive by name: "schedparam" goes to
// DoDirectiveSchedParam(), "resource" to DoDirectiveResource(); in both
// cases the handler's result is returned. The visible dispatch is the same
// as the one in ProcessDirective().
199 int XrdProofSched::DoDirective(XrdProofdDirective *d,
200 char *val, XrdOucStream *cfg,
bool rcf)
202 XPDLOC(SCHED,
"Sched::DoDirective")
208 if (d->fName == "schedparam") {
209 return DoDirectiveSchedParam(val, cfg, rcf);
210 }
else if (d->fName ==
"resource") {
211 return DoDirectiveResource(val, cfg, rcf);
// Any other directive name is traced as an error (the value returned
// afterwards is not visible here).
213 TRACE(XERR,
"unknown directive: "<<d->fName);
// Resets scheduling parameters. Only three assignments are visible in this
// excerpt: round-robin worker selection, a nodes fraction of 0.5 (used as a
// multiplier in GetNumWorkers()) and a check frequency of 30 (presumably the
// value the cron thread reads through CheckFrequency(), in seconds - TODO
// confirm).
221 void XrdProofSched::ResetParameters()
226 fWorkerSel = kSSORoundRobin;
229 fNodesFraction = 0.5;
230 fCheckFrequency = 30;
// Configures the scheduler: runs the base-class configuration, formats a
// summary of the resulting parameters and starts the cron thread.
// 'rcf' is passed unchanged to XrdProofdConfig::Config(). The return values
// are not visible in this excerpt.
238 int XrdProofSched::Config(
bool rcf)
240 XPDLOC(SCHED,
"Sched::Config")
// A non-zero result from the base-class parser is reported as an error.
243 if (XrdProofdConfig::Config(rcf) != 0) {
244 XPDERR(
"problems parsing file ");
// Summary of the effective settings, formatted into 'msg'.
254 XPDFORM(msg,
"maxsess: %d, maxrun: %d, maxwrks: %d, selopt: %d, fifo:%d",
255 fMaxSessions, fMaxRunning, fWorkerMax, fWorkerSel, fUseFIFO);
// Start XrdProofSchedCron in its own thread with this scheduler as argument;
// a non-zero result from Run is reported as a failure to start.
261 if (XrdSysThread::Run(&tid, XrdProofSchedCron,
262 (
void *)
this, 0,
"Scheduler cron thread") != 0) {
263 XPDERR(
"could not start cron thread");
267 TRACE(ALL,
"cron thread started");
// Adds 'query' to session 'xps' and, when needed, puts the session itself
// into the scheduler queue fQueue. The return value is not visible in this
// excerpt.
278 int XrdProofSched::Enqueue(XrdProofdProofServ *xps, XrdProofQuery *query)
// The session is queued only when its own Enqueue() returns exactly 1
// (presumably meaning this is its first pending query - TODO confirm).
282 if (xps->Enqueue(query) == 1) {
283 std::list<XrdProofdProofServ *>::iterator ii;
// Find the first queued session whose status is kXPD_running.
284 for (ii = fQueue.begin(); ii != fQueue.end(); ++ii) {
285 if ((*ii)->Status() == kXPD_running)
break;
// If one was found, the new session is inserted in front of it; the
// push_back below is presumably the alternative branch when none was found
// (the 'else' line is not visible here).
287 if (ii != fQueue.end()) {
288 fQueue.insert(ii, xps);
290 fQueue.push_back(xps);
// Dump the queue only when debug tracing is enabled.
293 if (TRACING(DBG)) DumpQueues(
"Enqueue");
// Writes the content of the session queue to the trace output: an opening
// banner, the caller tag 'prefix' when given, the number of waiting sessions,
// one line per queued session, and a closing banner.
301 void XrdProofSched::DumpQueues(
const char *prefix)
303 XPDLOC(SCHED,
"DumpQueues")
305 TRACE(ALL," ++++++++++++++++++++ DumpQueues ++++++++++++++++++++++++++++++++ ");
306 if (prefix) TRACE(ALL, " +++ Called from: "<<prefix);
307 TRACE(ALL," +++
# of waiting sessions: "<<fQueue.size());
308 std::list<XrdProofdProofServ *>::iterator ii;
// Each entry is printed with a running counter (pre-incremented, so the
// first printed number is one above the initial value of 'i', whose
// declaration is not visible here), its client and its number of queries.
310 for (ii = fQueue.begin(); ii != fQueue.end(); ++ii) {
311 TRACE(ALL,
" +++ #"<<++i<<
" client:"<< (*ii)->Client()<<
" # of queries: "<< (*ii)->Queries()->size());
313 TRACE(ALL,
" ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ ");
// Looks for the first usable session at the head of the queue.
// NOTE(review): the return statement and any empty-queue guard are not
// visible in this excerpt.
322 XrdProofdProofServ *XrdProofSched::FirstSession()
328 XrdProofdProofServ *xps = fQueue.front();
// While the head entry is non-null but not valid, take the head again
// (presumably after removing the invalid entry on the line missing between
// these two - TODO confirm).
329 while (xps && !(xps->IsValid())) {
331 xps = fQueue.front();
// Dump the queue only when debug tracing is enabled.
333 if (TRACING(DBG)) DumpQueues(
"FirstSession");
// Computes how many workers to give to session 'xps', from the free capacity
// of the active workers, the configured nodes fraction and a priority factor
// derived from the groups of the active sessions.
// NOTE(review): declarations of nFreeCPUs and priority, and the return
// statement, are not visible in this excerpt.
341 int XrdProofSched::GetNumWorkers(XrdProofdProofServ *xps)
343 XPDLOC(SCHED,
"Sched::GetNumWorkers")
// Pass 1: count free slots over the active workers.
347 std::list<XrdProofWorker *> *wrks = fMgr->NetMgr()->GetActiveWorkers();
348 std::list<XrdProofWorker *>::iterator iter;
349 for (iter = wrks->begin(); iter != wrks->end(); ++iter) {
350 TRACE(DBG, (*iter)->fImage<<
" : # act: "<<(*iter)->fProofServs.size());
// Only entries whose type is neither 'M' nor 'S' and which hold fewer proof
// servers than fOptWrksPerUnit contribute; each adds its unused slots.
351 if ((*iter)->fType !=
'M' && (*iter)->fType !=
'S'
352 && (
int) (*iter)->fProofServs.size() < fOptWrksPerUnit)
354 nFreeCPUs += fOptWrksPerUnit - (*iter)->fProofServs.size();
// Group of the requesting session; stays null without a group manager or
// when the session has no group.
358 XrdProofGroup *grp = 0;
359 if (fGrpMgr && xps->Group())
360 grp = fGrpMgr->GetGroup(xps->Group());
// Pass 2: sum the group priorities over all active sessions that have a group.
362 std::list<XrdProofdProofServ *> *sessions = fMgr->SessionMgr()->ActiveSessions();
363 std::list<XrdProofdProofServ *>::iterator sesIter;
364 float summedPriority = 0;
365 for (sesIter = sessions->begin(); sesIter != sessions->end(); ++sesIter) {
366 if ((*sesIter)->Group()) {
367 XrdProofGroup *g = fGrpMgr->GetGroup((*sesIter)->Group());
// NOTE(review): 'g' here and 'grp' below are dereferenced; their null checks
// are not visible in this excerpt - confirm they exist on the missing lines.
369 summedPriority += g->Priority();
// Priority factor: this group's priority relative to the average priority
// of the active sessions (group priority * number of sessions / sum).
372 if (summedPriority > 0)
373 priority = (grp->Priority() * sessions->size()) / summedPriority;
// Scale the free slots by the nodes fraction and the priority factor, then
// clamp: never below fMinForQuery; when the result reaches the size of the
// worker list it becomes size - 1.
376 int nWrks = (int)(nFreeCPUs * fNodesFraction * priority);
377 if (nWrks <= fMinForQuery) {
378 nWrks = fMinForQuery;
379 }
else if (nWrks >= (
int) wrks->size()) {
380 nWrks = wrks->size() - 1;
382 TRACE(DBG, nFreeCPUs<<
" : "<< nWrks);
// Selects the workers for session 'xps' and appends them to 'wrks';
// 'querytag' identifies the query the request is made for. Depending on the
// configuration the selection is load-based, random, round-robin or "all
// workers". When nothing can be assigned the query may be enqueued instead.
// NOTE(review): this excerpt has many missing lines (declarations of ok,
// isDynamic, fd, seed, i, j, iw, namx; every return statement; several
// braces and else keywords), so the comments describe the visible statements
// only and the return codes are not documented.
397 int XrdProofSched::GetWorkers(XrdProofdProofServ *xps,
398 std::list<XrdProofWorker *> *wrks,
399 const char *querytag)
401 XPDLOC(SCHED,
"Sched::GetWorkers")
409 TRACE(REQ, "enter: query tag: "<< ((querytag) ? querytag : ""));
// Prefix match against XPD_GW_Static that leaves out the constant's last
// character from the comparison.
413 if (querytag && !strncmp(querytag, XPD_GW_Static, strlen(XPD_GW_Static) - 1)) {
// The session already holds workers: see whether the request is for the
// query it is currently processing.
418 if (querytag && xps && xps->Workers()->Num() > 0) {
419 if (TRACING(REQ)) xps->DumpQueries();
420 const char *cqtag = (xps->CurrentQuery()) ? xps->CurrentQuery()->GetTag() :
"undef";
421 TRACE(REQ,
"current query tag: "<< cqtag );
// Same tag: the query is removed from the session and the existing
// assignment is traced as valid.
422 if (!strcmp(querytag, cqtag)) {
424 xps->RemoveQuery(cqtag);
425 TRACE(REQ,
"current assignment for session "<< xps->SrvPID() <<
" is valid");
// FIFO mode with workers already assigned: enqueue the query unless the
// session already knows it.
439 if (fUseFIFO && xps->Workers()->Num() > 0) {
440 if (!xps->GetQuery(querytag))
441 Enqueue(xps,
new XrdProofQuery(querytag));
442 if (TRACING(DBG)) xps->DumpQueries();
444 TRACE(REQ,
"session has already assigned workers: enqueue");
// List of active workers from the network manager; a missing manager or a
// null list is rejected.
450 std::list<XrdProofWorker *> *acws = 0;
452 if (!fMgr || !(acws = fMgr->NetMgr()->GetActiveWorkers()))
// The first list entry is kept as 'mst' and is always the first element
// pushed to 'wrks' (presumably the master node - TODO confirm).
456 XrdProofWorker *mst = acws->front();
// Load-based selection: sort the workers with XpdWrkComp and ask
// GetNumWorkers() how many to assign.
460 if (fWorkerSel == kSSOLoadBased) {
465 XrdProofWorker::Sort(acws, XpdWrkComp);
468 int nw = GetNumWorkers(xps);
472 wrks->push_back(mst);
474 std::list<XrdProofWorker *>::iterator nxWrk = acws->begin();
479 wrks->push_back(*nxWrk);
// Path taken when no worker could be assigned: enqueue the query if new.
487 if (!xps->GetQuery(querytag))
488 Enqueue(xps,
new XrdProofQuery(querytag));
489 if (TRACING(DBG)) xps->DumpQueries();
491 TRACE(REQ,
"no workers currently available: session enqueued");
495 wrks->push_back(mst);
// Limit applied per worker: fMaxRunning when a query tag is given and it
// differs from XPD_GW_Static, fMaxSessions otherwise.
504 std::list<XrdProofWorker *> *acwseff = 0;
505 int maxnum = (querytag && strcmp(querytag, XPD_GW_Static)) ? fMaxRunning : fMaxSessions;
// Build a filtered list (owned by this function, released below) holding
// only the workers whose Active() count is below the limit.
509 acwseff =
new std::list<XrdProofWorker *>;
510 std::list<XrdProofWorker *>::iterator xWrk = acws->begin();
511 if ((*xWrk)->Active() < maxnum) {
512 acwseff->push_back(*xWrk);
514 for (; xWrk != acws->end(); ++xWrk) {
515 if ((*xWrk)->Active() < maxnum) {
516 acwseff->push_back(*xWrk);
520 }
else if (!fUseFIFO) {
521 TRACE(REQ,
"max number of sessions reached - ("<< maxnum <<
")");
// Release the filtered list when 'ok' is false.
524 if (!ok) {
delete acwseff; acwseff = 0; }
// Variant of the limit check based on the session count of 'mst' alone.
531 int nactsess = mst->GetNActiveSessions();
532 TRACE(REQ,
"act sess ... " << nactsess);
533 if (nactsess < maxnum) {
535 }
else if (!fUseFIFO) {
536 TRACE(REQ,
"max number of sessions reached - ("<< maxnum <<
")");
// NOTE(review): with 'ok' false, the working list is replaced by 'acwseff';
// the surrounding context is not visible - confirm the intended condition.
539 if (!ok) acws = acwseff;
// A missing list, or a list with at most one entry, means there is nothing
// to assign besides 'mst': enqueue the query, or trace an error.
544 if (!acws || (acws && acws->size() <= 1)) {
548 if (!xps->GetQuery(querytag))
549 Enqueue(xps,
new XrdProofQuery(querytag));
550 if (TRACING(REQ)) xps->DumpQueries();
552 TRACE(REQ,
"no workers currently available: session enqueued");
556 TRACE(XERR,
"no worker available: do nothing");
// Non-dynamic request for a session that already has workers.
563 if (!isDynamic && (xps->Workers()->Num() > 0)) {
570 wrks->push_back(mst);
// A subset has to be chosen only when a positive worker cap is set and it is
// smaller than the number of available entries.
572 if (fWorkerMax > 0 && fWorkerMax < (
int) acws->size()) {
// Random selection.
575 if (fWorkerSel == kSSORandom) {
// Function-level static flag, presumably so that seeding happens only once.
577 static bool rndmInit = 0;
579 const char *randdev =
"/dev/urandom";
582 if ((fd = open(randdev, O_RDONLY)) != -1) {
583 if (read(fd, &seed,
sizeof(seed)) !=
sizeof(seed)) {
584 TRACE(XERR,
"problems reading seed; errno: "<< errno);
// walloc: per-index weights used for the weighted draw; vwrk: the workers in
// the same index order.
592 int nwt = acws->size();
593 std::vector<int> walloc(nwt, 0);
594 std::vector<XrdProofWorker *> vwrk(nwt);
599 std::list<XrdProofWorker *>::iterator iwk = acws->begin();
// First pass: walloc[i] accumulates the running sum of Active() counts and
// namx tracks the largest single count.
601 for ( ; iwk != acws->end(); ++iwk) {
603 int na = (*iwk)->Active();
605 walloc[i] = na + walloc[i-1];
607 namx = (na > namx) ? na : namx;
// Second pass: turn the running sums into cumulative weights; each worker
// then weighs (namx - its active count + 1), so less loaded workers get a
// larger share of the range.
611 for (i = 1; i < nwt; i++) {
613 walloc[i] = namx*i - walloc[i] + i;
// Total weight, used as the modulus of the random draw.
617 int natot = walloc[nwt - 1];
// Draw until a valid index (1 .. nwt-1) is obtained or maxAtt attempts are used.
622 int maxAtt = 10000, natt = 0;
624 while ((iw < 1 || iw >= nwt) && natt < maxAtt) {
625 int jw = rand() % natot;
626 for (i = 0; i < nwt; i++) {
627 if (jw < walloc[i]) {
630 for (j = i; j < nwt; j++) {
643 wrks->push_back(vwrk[iw]);
646 TRACE(XERR,
"random generation failed");
// Round-robin selection: fNextWrk is the position to continue from; it is
// checked against the list size both before and while advancing, and the
// iterator restarts from the beginning of the list.
653 if (fNextWrk >= (
int) acws->size())
656 std::list<XrdProofWorker *>::iterator nxWrk = acws->begin();
659 while (iw != fNextWrk) {
665 wrks->push_back(*nxWrk);
668 if (fNextWrk >= (
int) acws->size()) {
671 nxWrk = acws->begin();
// No cap applies: walk the whole list and hand out its entries.
677 std::list<XrdProofWorker *>::iterator iw = acws->begin();
679 while (iw != acws->end()) {
681 wrks->push_back(*iw);
// A result with at most one entry is traced as "no worker found".
687 if (wrks->size() <= 1) {
688 TRACE(XERR,
"no worker found: do nothing");
// Release the filtered list if it is still allocated.
693 if (acwseff) {
delete acwseff; acwseff = 0; }
// Tries to serve the session at the head of the queue by asking the manager
// for workers for its current query.
// NOTE(review): declarations of qtag and wrks and all return statements are
// not visible in this excerpt.
706 int XrdProofSched::Reschedule()
708 XPDLOC(SCHED,
"Sched::Reschedule")
710 if (fUseFIFO && TRACING(DBG)) DumpQueues("Reschedule");
// Nothing is attempted on an empty queue.
712 if (!fQueue.empty()) {
715 XrdProofdProofServ *xps = FirstSession();
// Error trace for an undefined session (the guarding test is not visible here).
717 TRACE(XERR,
"got undefined session: protocol error!");
// Take the tag of the current query; a tag starting with XPD_GW_Static is
// replaced by that constant with its ":" removed.
723 if (xps && xps->CurrentQuery()) {
724 qtag = xps->CurrentQuery()->GetTag();
725 if (qtag.beginswith(XPD_GW_Static)) {
726 qtag = XPD_GW_Static;
727 qtag.replace(
":",
"");
// Ask the manager for the workers; a negative result is traced as a failure.
730 if (fMgr->GetWorkers(wrks, xps, qtag.c_str()) < 0 ) {
732 TRACE(XERR,
"failure from GetWorkers: protocol error!");
// A non-empty answer other than XPD_GW_QueryEnqueued (the body of this
// branch is largely not visible here).
737 if (wrks.length() > 0 && wrks != XPD_GW_QueryEnqueued) {
// A session with more than one query is put back at the end of the queue.
745 if (xps->Queries()->size() > 1)
746 fQueue.push_back(xps);
747 if (TRACING(DBG)) DumpQueues(
"Reschedule 2");
// Appends a textual description of the scheduler state to 'sbuf': the
// selection mode, the worker cap, and one entry per active worker.
// The return value is not visible in this excerpt.
760 int XrdProofSched::ExportInfo(XrdOucString &sbuf)
// The name table is indexed with fWorkerSel + 1, so a selection value of -1
// maps to "all", 0 to "round-robin", 1 to "random" and 2 to "load-based".
763 const char *osel[] = {
"all",
"round-robin",
"random",
"load-based"};
764 sbuf +=
"Selection: ";
765 sbuf += osel[fWorkerSel+1];
// The worker cap is printed only for selection values above -1.
766 if (fWorkerSel > -1) {
767 sbuf +=
", max workers: ";
768 sbuf += fWorkerMax; sbuf +=
" &";
// One entry per active worker: type, host, port (only when above -1) and
// the Active() count labelled as sessions.
772 std::list<XrdProofWorker *> *acws = fMgr->NetMgr()->GetActiveWorkers();
773 std::list<XrdProofWorker *>::iterator iw;
774 for (iw = acws->begin(); iw != acws->end(); ++iw) {
775 sbuf += (*iw)->fType;
776 sbuf +=
": "; sbuf += (*iw)->fHost;
777 if ((*iw)->fPort > -1) {
778 sbuf +=
":"; sbuf += (*iw)->fPort;
781 sbuf +=
" sessions: "; sbuf += (*iw)->Active();
// Handles a directive on behalf of DoSchedDirective(): "schedparam" goes to
// DoDirectiveSchedParam(), "resource" to DoDirectiveResource(); in both
// cases the handler's result is returned. The visible dispatch is the same
// as the one in DoDirective().
792 int XrdProofSched::ProcessDirective(XrdProofdDirective *d,
793 char *val, XrdOucStream *cfg,
bool rcf)
795 XPDLOC(SCHED,
"Sched::ProcessDirective")
801 if (d->fName == "schedparam") {
802 return DoDirectiveSchedParam(val, cfg, rcf);
803 }
else if (d->fName ==
"resource") {
804 return DoDirectiveResource(val, cfg, rcf);
// Any other directive name is traced as an error (the value returned
// afterwards is not visible here).
806 TRACE(XERR,
"unknown directive: "<<d->fName);
// Parses the words of a "schedparam" directive. Each word is a "key:value"
// token; 'val' is the first word and further words are read from 'cfg'.
// The unnamed bool parameter is not used. Recognised keys:
//   wmx:          -> fWorkerMax      (integer, base 10)
//   mxsess:       -> fMaxSessions    (integer, base 10)
//   mxrun:        -> fMaxRunning     (integer, base 10)
//   selopt:       -> fWorkerSel      (ends with "random" or "load"; see below)
//   fraction:     -> fNodesFraction  (floating point)
//   optnwrks:     -> fOptWrksPerUnit (integer, base 10)
//   minforquery:  -> fMinForQuery    (integer, base 10)
//   queue:        -> checked for ending with "fifo"
// NOTE(review): the declaration of 's' and the return statements are not
// visible in this excerpt; strtol/strtod are called without an end pointer,
// so malformed numbers are not detected here.
813 int XrdProofSched::DoDirectiveSchedParam(
char *val, XrdOucStream *cfg,
bool)
815 XPDLOC(SCHED,
"Sched::DoDirectiveSchedParam")
// Loop over the words until a null or empty one is met.
822 while (val && val[0]) {
824 if (s.beginswith(
"wmx:")) {
825 s.replace(
"wmx:",
"");
826 fWorkerMax = strtol(s.c_str(), (
char **)0, 10);
827 }
else if (s.beginswith(
"mxsess:")) {
828 s.replace(
"mxsess:",
"");
829 fMaxSessions = strtol(s.c_str(), (
char **)0, 10);
830 }
else if (s.beginswith(
"mxrun:")) {
831 s.replace(
"mxrun:",
"");
832 fMaxRunning = strtol(s.c_str(), (
char **)0, 10);
833 }
else if (s.beginswith(
"selopt:")) {
834 if (s.endswith(
"random"))
835 fWorkerSel = kSSORandom;
836 else if (s.endswith(
"load"))
837 fWorkerSel = kSSOLoadBased;
// Round-robin is presumably the fallback for any other selopt value (the
// 'else' line is not visible here).
839 fWorkerSel = kSSORoundRobin;
840 }
else if (s.beginswith(
"fraction:")) {
841 s.replace(
"fraction:",
"");
842 fNodesFraction = strtod(s.c_str(), (
char **)0);
843 }
else if (s.beginswith(
"optnwrks:")) {
844 s.replace(
"optnwrks:",
"");
845 fOptWrksPerUnit = strtol(s.c_str(), (
char **)0, 10);
846 }
else if (s.beginswith(
"minforquery:")) {
847 s.replace(
"minforquery:",
"");
848 fMinForQuery = strtol(s.c_str(), (
char **)0, 10);
849 }
else if (s.beginswith(
"queue:")) {
850 if (s.endswith(
"fifo")) {
853 }
else if (strncmp(val,
"default", 7)) {
// Move on to the next word of the directive.
858 val = cfg->GetWord();
// Consistency pass: with a positive session limit, a negative running limit
// or one larger than the session limit is set to the session limit.
863 if (fMaxSessions > 0) {
866 if (fMaxRunning < 0 || fMaxRunning > fMaxSessions)
867 fMaxRunning = fMaxSessions;
// Load-based selection combined with a positive fMaxRunning only triggers
// this warning.
871 if (fWorkerSel == kSSOLoadBased && fMaxRunning > 0) {
872 TRACE(ALL,
"WARNING: in Load-Based mode the max number of sessions"
873 " to be run is determined dynamically");
882 int XrdProofSched::DoDirectiveResource(
char *val, XrdOucStream *cfg,
bool)
889 if (strncmp(val,
"static", 6) && strncmp(val,
"default", 7))
892 while ((val = cfg->GetWord()) && val[0]) {
894 if (s.beginswith(
"wmx:")) {
895 s.replace(
"wmx:",
"");
896 fWorkerMax = strtol(s.c_str(), (
char **)0, 10);
897 }
else if (s.beginswith(
"mxsess:")) {
898 s.replace(
"mxsess:",
"");
899 fMaxSessions = strtol(s.c_str(), (
char **)0, 10);
900 }
else if (s.beginswith(
"selopt:")) {
901 if (s.endswith(
"random"))
902 fWorkerSel = kSSORandom;
904 fWorkerSel = kSSORoundRobin;