Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
XrdProofdNetMgr.cxx
Go to the documentation of this file.
1 // @(#)root/proofd:$Id$
2 // Author: G. Ganis Jan 2008
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 #include "XrdProofdPlatform.h"
12 
13 //////////////////////////////////////////////////////////////////////////
14 // //
15 // XrdProofdNetMgr //
16 // //
17 // Authors: G. Ganis, CERN, 2008 //
18 // //
19 // Manages connections between PROOF server daemons //
20 // //
21 //////////////////////////////////////////////////////////////////////////
22 
23 #include "XrdProofdNetMgr.h"
24 
25 #include "XrdProofdXrdVers.h"
26 #ifndef ROOT_XrdFour
27 # include "XpdSysDNS.h"
28 #else
29 # include "XrdNet/XrdNetAddr.hh"
30 #endif
31 #include "Xrd/XrdBuffer.hh"
36 #include "XrdOuc/XrdOucStream.hh"
37 #include "XrdSys/XrdSysPlatform.hh"
38 
39 #include "XrdProofdClient.h"
40 #include "XrdProofdManager.h"
41 #include "XrdProofdProtocol.h"
42 #include "XrdProofdResponse.h"
43 #include "XrdProofWorker.h"
44 
45 // Tracing utilities
46 #include "XrdProofdTrace.h"
47 
48 #include <algorithm>
49 #include <limits>
50 #include <math.h>
51 
52 ////////////////////////////////////////////////////////////////////////////////
53 /// Send up a message from the server
54 
55 int MessageSender(const char *msg, int len, void *arg)
56 {
57  XrdProofdResponse *r = (XrdProofdResponse *) arg;
58  if (r) {
59  return r->Send(kXR_attn, kXPD_srvmsg, 2, (char *) msg, len);
60  }
61  return -1;
62 }
63 
64 ////////////////////////////////////////////////////////////////////////////////
65 /// Constructor
66 
67 XrdProofdNetMgr::XrdProofdNetMgr(XrdProofdManager *mgr,
68  XrdProtocol_Config *pi, XrdSysError *e)
69  : XrdProofdConfig(pi->ConfigFN, e)
70 {
71  fMgr = mgr;
72  fResourceType = kRTNone;
73  fPROOFcfg.fName = "";
74  fPROOFcfg.fMtime = -1;
75  fReloadPROOFcfg = 1;
76  fDfltFallback = 0;
77  fDfltWorkers.clear();
78  fRegWorkers.clear();
79  fWorkers.clear();
80  fNodes.clear();
81  fNumLocalWrks = XrdProofdAux::GetNumCPUs();
82  fWorkerUsrCfg = 0;
83  fRequestTO = 30;
84 
85  // Configuration directives
86  RegisterDirectives();
87 }
88 
89 ////////////////////////////////////////////////////////////////////////////////
90 /// Register config directives
91 
92 void XrdProofdNetMgr::RegisterDirectives()
93 {
94  Register("adminreqto", new XrdProofdDirective("adminreqto", this, &DoDirectiveClass));
95  Register("resource", new XrdProofdDirective("resource", this, &DoDirectiveClass));
96  Register("worker", new XrdProofdDirective("worker", this, &DoDirectiveClass));
97  Register("localwrks", new XrdProofdDirective("localwrks", (void *)&fNumLocalWrks, &DoDirectiveInt));
98 }
99 
100 ////////////////////////////////////////////////////////////////////////////////
101 /// Destructor
102 
103 XrdProofdNetMgr::~XrdProofdNetMgr()
104 {
105  // Cleanup the worker lists
106  // (the nodes list points to the same object, no cleanup is needed)
107  std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
108  while (w != fRegWorkers.end()) {
109  delete *w;
110  w = fRegWorkers.erase(w);
111  }
112  w = fDfltWorkers.begin();
113  while (w != fDfltWorkers.end()) {
114  delete *w;
115  w = fDfltWorkers.erase(w);
116  }
117  fWorkers.clear();
118 }
119 
120 ////////////////////////////////////////////////////////////////////////////////
121 /// Run configuration and parse the entered config directives.
122 /// Return 0 on success, -1 on error
123 
124 int XrdProofdNetMgr::Config(bool rcf)
125 {
126  XPDLOC(NMGR, "NetMgr::Config")
127 
128  // Lock the method to protect the lists.
129  XrdSysMutexHelper mhp(fMutex);
130 
131  // Cleanup the worker list
132  std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
133  while (w != fWorkers.end()) {
134  delete *w;
135  w = fWorkers.erase(w);
136  }
137  // Create a default master line
138  XrdOucString mm("master ", 128);
139  mm += fMgr->Host();
140  mm += " port=";
141  mm += fMgr->Port();
142  fWorkers.push_back(new XrdProofWorker(mm.c_str()));
143 
144  // Run first the configurator
145  if (XrdProofdConfig::Config(rcf) != 0) {
146  XPDERR("problems parsing file ");
147  return -1;
148  }
149 
150  XrdOucString msg;
151  msg = (rcf) ? "re-configuring" : "configuring";
152  TRACE(ALL, msg);
153 
154  if (fMgr->SrvType() != kXPD_Worker || fMgr->SrvType() == kXPD_AnyServer) {
155  TRACE(ALL, "PROOF config file: " <<
156  ((fPROOFcfg.fName.length() > 0) ? fPROOFcfg.fName.c_str() : "none"));
157  if (fResourceType == kRTStatic) {
158  // Initialize the list of workers if a static config has been required
159  // Default file path, if none specified
160  bool dodefault = 1;
161  if (fPROOFcfg.fName.length() > 0) {
162  // Load file content in memory
163  if (ReadPROOFcfg() == 0) {
164  TRACE(ALL, "PROOF config file will " <<
165  ((fReloadPROOFcfg) ? "" : "not ") << "be reloaded upon change");
166  dodefault = 0;
167  } else {
168  if (!fDfltFallback) {
169  XPDERR("unable to find valid information in PROOF config file " <<
170  fPROOFcfg.fName);
171  fPROOFcfg.fMtime = -1;
172  return 0;
173  } else {
174  TRACE(ALL, "file " << fPROOFcfg.fName << " cannot be parsed: use default configuration to start with");
175  }
176  }
177  }
178  if (dodefault) {
179  // Use default
180  CreateDefaultPROOFcfg();
181  }
182  } else if (fResourceType == kRTNone && fWorkers.size() <= 1) {
183  // Nothing defined: use default
184  CreateDefaultPROOFcfg();
185  }
186 
187  // Find unique nodes
188  FindUniqueNodes();
189  }
190 
191  // For connection to the other xproofds we try only once
192  XrdProofConn::SetRetryParam(1, 1);
193  // Request Timeout
194  EnvPutInt(NAME_REQUESTTIMEOUT, fRequestTO);
195 
196  // Notification
197  XPDFORM(msg, "%d worker nodes defined at start-up", fWorkers.size() - 1);
198  TRACE(ALL, msg);
199 
200  // Done
201  return 0;
202 }
203 
204 ////////////////////////////////////////////////////////////////////////////////
205 /// Update the priorities of the active sessions.
206 
207 int XrdProofdNetMgr::DoDirective(XrdProofdDirective *d,
208  char *val, XrdOucStream *cfg, bool rcf)
209 {
210  XPDLOC(NMGR, "NetMgr::DoDirective")
211 
212  if (!d)
213  // undefined inputs
214  return -1;
215 
216  if (d->fName == "resource") {
217  return DoDirectiveResource(val, cfg, rcf);
218  } else if (d->fName == "adminreqto") {
219  return DoDirectiveAdminReqTO(val, cfg, rcf);
220  } else if (d->fName == "worker") {
221  return DoDirectiveWorker(val, cfg, rcf);
222  }
223 
224  TRACE(XERR, "unknown directive: " << d->fName);
225 
226  return -1;
227 }
228 
229 ////////////////////////////////////////////////////////////////////////////////
230 /// Indices (this will be used twice).
231 
232 void XrdProofdNetMgr::BalanceNodesOrder()
233 {
234  list<XrdProofWorker *>::const_iterator iter, iter2;
235  list<XrdProofWorker *>::iterator iter3; // Not const, less efficient.
236  // Map to store information of the balancer.
237  map<XrdProofWorker *, BalancerInfo> info;
238  // Node with minimum number of workers distinct to 1.
239  unsigned int min = UINT_MAX;
240  // Total number of nodes and per iteration assignments.
241  unsigned int total = 0, total_perit = 0;
242  // Number of iterations to get every node filled.
243  unsigned int total_added = 0;
244  // Temporary list to store the balanced configuration
245  list<XrdProofWorker *> tempNodes;
246  // Flag for the search and destroy loop.
247  bool deleted;
248 
249  // Fill the information store with the first data (number of nodes).
250  for (iter = fNodes.begin(); iter != fNodes.end(); ++iter) {
251  // The next code is not the same as this:
252  //info[*iter].available = count(fWorkers.begin(), fWorkers.end(), *iter);
253  // The previous piece of STL code only checks the pointer of the value
254  // stored on the list, altough it is more efficient, it needs that repeated
255  // nodes point to the same object. To allow hybrid configurations, we are
256  // doing a 'manually' matching since statically configured nodes are
257  // created in multiple ways.
258  info[*iter].available = 0;
259  for (iter2 = fWorkers.begin(); iter2 != fWorkers.end(); ++iter2) {
260  if ((*iter)->Matches(*iter2)) {
261  info[*iter].available++;
262  }
263  }
264  info[*iter].added = 0;
265  // Calculate the minimum greater than 1.
266  if (info[*iter].available > 1 && info[*iter].available < min)
267  min = info[*iter].available;
268  // Calculate the totals.
269  total += info[*iter].available;
270  }
271 
272  // Now, calculate the number of workers to add in each iteration of the
273  // round robin, scaling to the smaller number.
274  for (iter = fNodes.begin(); iter != fNodes.end(); ++iter) {
275  if (info[*iter].available > 1) {
276  info[*iter].per_iteration = (unsigned int)floor((double)info[*iter].available / (double)min);
277  } else {
278  info[*iter].per_iteration = 1;
279  }
280  // Calculate the totals.
281  total_perit += info[*iter].per_iteration;
282  }
283 
284  // Since we are going to substitute the list, don't forget to recover the
285  // default node at the fist time.
286  tempNodes.push_back(fWorkers.front());
287 
288  // Finally, do the round robin assignment of nodes.
289  // Stop when every node has its workers processed.
290  while (total_added < total) {
291  for (map<XrdProofWorker *, BalancerInfo>::iterator i = info.begin(); i != info.end(); ++i) {
292  if (i->second.added < i->second.available) {
293  // Be careful with the remainders (on prime number of nodes).
294  unsigned int to_add = xrdmin(i->second.per_iteration,
295  (i->second.available - i->second.added));
296  // Then add the nodes.
297  for (unsigned int j = 0; j < to_add; j++) {
298  tempNodes.push_back(i->first);
299  }
300  i->second.added += to_add;
301  total_added += to_add;
302  }
303  }
304  }
305 
306  // Since we are mergin nodes in only one object, we must merge the current
307  // sessions of the static nodes (that can be distinct objects that represent
308  // the same node) and delete the orphaned objects. If, in the future, we can
309  // assure that every worker has only one object in the list, this is not more
310  // necessary. The things needed to change are the DoDirectiveWorker, it must
311  // search for a node before inserting it, and in the repeat directive insert
312  // the same node always. Also the default configuration methods (there are
313  // two in this class) must be updated.
314  iter3 = ++(fWorkers.begin());
315  while (iter3 != fWorkers.end()) {
316  deleted = false;
317  // If the worker is not in the fWorkers list, we must process it. Note that
318  // std::count() uses a plain comparison between values, in this case, we
319  // are comparing pointers (numbers, at the end).
320  if (count(++(tempNodes.begin()), tempNodes.end(), *iter3) == 0) {
321  // Search for an object that matches with this in the temp list.
322  for (iter2 = ++(tempNodes.begin()); iter2 != tempNodes.end(); ++iter2) {
323  if ((*iter2)->Matches(*iter3)) {
324  // Copy data and delete the *iter object.
325  (*iter2)->MergeProofServs(*(*iter3));
326  deleted = true;
327  delete *iter3;
328  fWorkers.erase(iter3++);
329  break;
330  }
331  }
332  }
333  // Do not forget to increase the iterator.
334  if (!deleted)
335  ++iter3;
336  }
337 
338  // Then, substitute the current fWorkers list with the balanced one.
339  fWorkers = tempNodes;
340 }
341 
342 ////////////////////////////////////////////////////////////////////////////////
343 /// Process 'adminreqto' directive
344 
345 int XrdProofdNetMgr::DoDirectiveAdminReqTO(char *val, XrdOucStream *cfg, bool)
346 {
347  if (!val)
348  // undefined inputs
349  return -1;
350 
351  // Check deprecated 'if' directive
352  if (fMgr->Host() && cfg)
353  if (XrdProofdAux::CheckIf(cfg, fMgr->Host()) == 0)
354  return 0;
355 
356  // Timeout on requested broadcasted to workers; there are 4 attempts,
357  // so the real timeout is 4 x fRequestTO
358  int to = strtol(val, 0, 10);
359  fRequestTO = (to > 0) ? to : fRequestTO;
360  return 0;
361 }
362 
363 ////////////////////////////////////////////////////////////////////////////////
364 /// Process 'resource' directive
365 
366 int XrdProofdNetMgr::DoDirectiveResource(char *val, XrdOucStream *cfg, bool)
367 {
368  XPDLOC(NMGR, "NetMgr::DoDirectiveResource")
369 
370  if (!val || !cfg)
371  // undefined inputs
372  return -1;
373 
374  if (!strcmp("static", val)) {
375  // We just take the path of the config file here; the
376  // rest is used by the static scheduler
377  fResourceType = kRTStatic;
378  while ((val = cfg->GetWord()) && val[0]) {
379  XrdOucString s(val);
380  if (s.beginswith("ucfg:")) {
381  fWorkerUsrCfg = s.endswith("yes") ? 1 : 0;
382  } else if (s.beginswith("reload:")) {
383  fReloadPROOFcfg = (s.endswith("1") || s.endswith("yes")) ? 1 : 0;
384  } else if (s.beginswith("dfltfallback:")) {
385  fDfltFallback = (s.endswith("1") || s.endswith("yes")) ? 1 : 0;
386  } else if (s.beginswith("wmx:")) {
387  } else if (s.beginswith("selopt:")) {
388  } else {
389  // Config file
390  fPROOFcfg.fName = val;
391  if (fPROOFcfg.fName.beginswith("sm:")) {
392  fPROOFcfg.fName.replace("sm:", "");
393  }
394  XrdProofdAux::Expand(fPROOFcfg.fName);
395  // Make sure it exists and can be read
396  if (access(fPROOFcfg.fName.c_str(), R_OK)) {
397  if (errno == ENOENT) {
398  TRACE(ALL, "WARNING: configuration file does not exists: " << fPROOFcfg.fName);
399  } else {
400  TRACE(XERR, "configuration file cannot be read: " << fPROOFcfg.fName);
401  fPROOFcfg.fName = "";
402  fPROOFcfg.fMtime = -1;
403  }
404  }
405  }
406  }
407  }
408  return 0;
409 }
410 
411 ////////////////////////////////////////////////////////////////////////////////
412 /// Process 'worker' directive
413 
414 int XrdProofdNetMgr::DoDirectiveWorker(char *val, XrdOucStream *cfg, bool)
415 {
416  XPDLOC(NMGR, "NetMgr::DoDirectiveWorker")
417 
418  if (!val || !cfg)
419  // undefined inputs
420  return -1;
421 
422  // Lock the method to protect the lists.
423  XrdSysMutexHelper mhp(fMutex);
424 
425  // Get the full line (w/o heading keyword)
426  cfg->RetToken();
427  XrdOucString wrd(cfg->GetWord());
428  if (wrd.length() > 0) {
429  // Build the line
430  XrdOucString line;
431  char rest[2048] = {0};
432  cfg->GetRest((char *)&rest[0], 2048);
433  XPDFORM(line, "%s %s", wrd.c_str(), rest);
434  // Parse it now
435  if (wrd == "master" || wrd == "node") {
436  // Init a master instance
437  XrdProofWorker *pw = new XrdProofWorker(line.c_str());
438  if (pw->fHost.beginswith("localhost") ||
439  pw->Matches(fMgr->Host())) {
440  // Replace the default line (the first with what found in the file)
441  XrdProofWorker *fw = fWorkers.front();
442  fw->Reset(line.c_str());
443  }
444  SafeDelete(pw);
445  } else {
446  // How many lines like this?
447  int nr = 1;
448  int ir = line.find("repeat=");
449  if (ir != STR_NPOS) {
450  XrdOucString r(line, ir + strlen("repeat="));
451  r.erase(r.find(' '));
452  nr = r.atoi();
453  if (nr < 0 || !XPD_LONGOK(nr)) nr = 1;
454  TRACE(DBG, "found repeat = " << nr);
455  }
456  while (nr--) {
457  // Build the worker object
458  XrdProofdMultiStr mline(line.c_str());
459  if (mline.IsValid()) {
460  TRACE(DBG, "found multi-line with: " << mline.N() << " tokens");
461  for (int i = 0; i < mline.N(); i++) {
462  TRACE(HDBG, "found token: " << mline.Get(i));
463  fWorkers.push_back(new XrdProofWorker(mline.Get(i).c_str()));
464  }
465  } else {
466  TRACE(DBG, "found line: " << line);
467  fWorkers.push_back(new XrdProofWorker(line.c_str()));
468  }
469  }
470  }
471  }
472 
473  // Necessary for the balancer when Bonjour is enabled. Note that this balancer
474  // can also be enabled with a static configuration. By this time is disabled
475  // due to its experimental status.
476  FindUniqueNodes();
477  //BalanceNodesOrder();
478 
479  return 0;
480 }
481 
482 ////////////////////////////////////////////////////////////////////////////////
483 /// Broadcast a ctrlc interrupt
484 /// Return 0 on success, -1 on error
485 
486 int XrdProofdNetMgr::BroadcastCtrlC(const char *usr)
487 {
488  XPDLOC(NMGR, "NetMgr::BroadcastCtrlC")
489 
490  int rc = 0;
491 
492  // Loop over unique nodes
493  std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
494  XrdProofWorker *w = 0;
495  while (iw != fNodes.end()) {
496  if ((w = *iw) && w->fType != 'M') {
497  // Do not send it to ourselves
498  bool us = (((w->fHost.find("localhost") != STR_NPOS ||
499  XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
500  (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
501  if (!us) {
502  // Create 'url'
503  // We use the enforced username if specified in the config file; this is the case
504  // of user-dedicated daemons with mapped usernames, like PoD@gLite ...
505  XrdOucString u = (w->fUser.length() > 0) ? w->fUser : usr;
506  if (u.length() <= 0) u = fMgr->EffectiveUser();
507  u += '@';
508  u += w->fHost;
509  if (w->fPort != -1) {
510  u += ':';
511  u += w->fPort;
512  }
513  TRACE(HDBG, "sending request to: "<<u);
514  // Get a connection to the server
515  XrdProofConn *conn = GetProofConn(u.c_str());
516  if (conn && conn->IsValid()) {
517  // Prepare request
518  XPClientRequest reqhdr;
519  memset(&reqhdr, 0, sizeof(reqhdr));
520  conn->SetSID(reqhdr.header.streamid);
521  reqhdr.proof.requestid = kXP_ctrlc;
522  reqhdr.proof.sid = 0;
523  reqhdr.proof.dlen = 0;
524  // We need the right order
525  if (XPD::clientMarshall(&reqhdr) != 0) {
526  TRACE(XERR, "problems marshalling request");
527  return -1;
528  }
529  if (conn->LowWrite(&reqhdr, 0, 0) != kOK) {
530  TRACE(XERR, "problems sending ctrl-c request to server " << u);
531  }
532  // Clean it up, to avoid leaving open tcp connection possibly going forever
533  // into CLOSE_WAIT
534  SafeDelete(conn);
535  }
536  } else {
537  TRACE(DBG, "broadcast request for ourselves: ignore");
538  }
539  }
540  // Next worker
541  ++iw;
542  }
543 
544  // Done
545  return rc;
546 }
547 
548 ////////////////////////////////////////////////////////////////////////////////
549 /// Broadcast request to known potential sub-nodes.
550 /// Return 0 on success, -1 on error
551 
552 int XrdProofdNetMgr::Broadcast(int type, const char *msg, const char *usr,
553  XrdProofdResponse *r, bool notify, int subtype)
554 {
555  XPDLOC(NMGR, "NetMgr::Broadcast")
556 
557  unsigned int nok = 0;
558  TRACE(REQ, "type: " << type);
559 
560  // Loop over unique nodes
561  std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
562  XrdProofWorker *w = 0;
563  XrdClientMessage *xrsp = 0;
564  while (iw != fNodes.end()) {
565  if ((w = *iw) && w->fType != 'M') {
566  // Do not send it to ourselves
567  bool us = (((w->fHost.find("localhost") != STR_NPOS ||
568  XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
569  (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
570  if (!us) {
571  // Create 'url'
572  // We use the enforced username if specified in the config file; this is the case
573  // of user-dedicated daemons with mapped usernames, like PoD@gLite ...
574  XrdOucString u = (w->fUser.length() > 0) ? w->fUser : usr;
575  if (u.length() <= 0) u = fMgr->EffectiveUser();
576  u += '@';
577  u += w->fHost;
578  if (w->fPort != -1) {
579  u += ':';
580  u += w->fPort;
581  }
582  // Type of server
583  int srvtype = (w->fType != 'W') ? (kXR_int32) kXPD_Master
584  : (kXR_int32) kXPD_Worker;
585  TRACE(HDBG, "sending request to " << u);
586  // Send request
587  if (!(xrsp = Send(u.c_str(), type, msg, srvtype, r, notify, subtype))) {
588  TRACE(XERR, "problems sending request to " << u);
589  } else {
590  nok++;
591  }
592  // Cleanup answer
593  SafeDelete(xrsp);
594  } else {
595  TRACE(DBG, "broadcast request for ourselves: ignore");
596  }
597  }
598  // Next worker
599  ++iw;
600  }
601 
602  // Done
603  return (nok == fNodes.size()) ? 0 : -1;
604 }
605 
606 ////////////////////////////////////////////////////////////////////////////////
607 /// Get a XrdProofConn for url; create a new one if not available
608 
609 XrdProofConn *XrdProofdNetMgr::GetProofConn(const char *url)
610 {
611  XrdProofConn *p = 0;
612 
613  // If not found create a new one
614  XrdOucString buf = " Manager connection from ";
615  buf += fMgr->Host();
616  buf += "|ord:000";
617  char m = 'A'; // log as admin
618 
619  {
620  XrdSysMutexHelper mhp(fMutex);
621  p = new XrdProofConn(url, m, -1, -1, 0, buf.c_str());
622  }
623  if (p && !(p->IsValid())) SafeDelete(p);
624 
625  // Done
626  return p;
627 }
628 
629 ////////////////////////////////////////////////////////////////////////////////
630 /// Broadcast request to known potential sub-nodes.
631 /// Return 0 on success, -1 on error
632 
633 XrdClientMessage *XrdProofdNetMgr::Send(const char *url, int type,
634  const char *msg, int srvtype,
635  XrdProofdResponse *r, bool notify,
636  int subtype)
637 {
638  XPDLOC(NMGR, "NetMgr::Send")
639 
640  XrdClientMessage *xrsp = 0;
641  TRACE(REQ, "type: " << type);
642 
643  if (!url || strlen(url) <= 0)
644  return xrsp;
645 
646  // Get a connection to the server
647  XrdProofConn *conn = GetProofConn(url);
648 
649  bool ok = 1;
650  if (conn && conn->IsValid()) {
651  XrdOucString notifymsg("Send: ");
652  // Prepare request
653  XPClientRequest reqhdr;
654  const void *buf = 0;
655  char **vout = 0;
656  memset(&reqhdr, 0, sizeof(reqhdr));
657  conn->SetSID(reqhdr.header.streamid);
658  reqhdr.header.requestid = kXP_admin;
659  reqhdr.proof.int1 = type;
660  switch (type) {
661  case kROOTVersion:
662  notifymsg += "change-of-ROOT version request to ";
663  notifymsg += url;
664  notifymsg += " msg: ";
665  notifymsg += msg;
666  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
667  buf = (msg) ? (const void *)msg : buf;
668  break;
669  case kCleanupSessions:
670  notifymsg += "cleanup request to ";
671  notifymsg += url;
672  notifymsg += " for user: ";
673  notifymsg += msg;
674  reqhdr.proof.int2 = (kXR_int32) srvtype;
675  reqhdr.proof.sid = -1;
676  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
677  buf = (msg) ? (const void *)msg : buf;
678  break;
679  case kExec:
680  notifymsg += "exec ";
681  notifymsg += subtype;
682  notifymsg += "request for ";
683  notifymsg += msg;
684  reqhdr.proof.int2 = (kXR_int32) subtype;
685  reqhdr.proof.sid = -1;
686  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
687  buf = (msg) ? (const void *)msg : buf;
688  break;
689  default:
690  ok = 0;
691  TRACE(XERR, "invalid request type " << type);
692  break;
693  }
694 
695  // Notify the client that we are sending the request
696  if (r && notify)
697  r->Send(kXR_attn, kXPD_srvmsg, 0, (char *) notifymsg.c_str(), notifymsg.length());
698 
699  // Activate processing of unsolicited responses
700  conn->SetAsync(conn, &MessageSender, (void *)r);
701 
702  // Send over
703  if (ok)
704  xrsp = conn->SendReq(&reqhdr, buf, vout, "NetMgr::Send");
705 
706  // Deactivate processing of unsolicited responses
707  conn->SetAsync(0, 0, (void *)0);
708 
709  // Print error msg, if any
710  if (r && !xrsp && conn->GetLastErr()) {
711  XrdOucString cmsg = url;
712  cmsg += ": ";
713  cmsg += conn->GetLastErr();
714  r->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
715  }
716  // Clean it up, to avoid leaving open tcp connection possibly going forever
717  // into CLOSE_WAIT
718  SafeDelete(conn);
719 
720  } else {
721  TRACE(XERR, "could not open connection to " << url);
722  if (r) {
723  XrdOucString cmsg = "failure attempting connection to ";
724  cmsg += url;
725  r->Send(kXR_attn, kXPD_srvmsg, (char *) cmsg.c_str(), cmsg.length());
726  }
727  }
728 
729  // Done
730  return xrsp;
731 }
732 
733 ////////////////////////////////////////////////////////////////////////////////
734 /// Check if 'host' is this local host. If checkport is true,
735 /// matching of the local port with the one implied by host is also checked.
736 /// Return 1 if 'local', 0 otherwise
737 
738 bool XrdProofdNetMgr::IsLocal(const char *host, bool checkport)
739 {
740  XPDLOC(NMGR, "NetMgr::IsLocal")
741 
742  int rc = 0;
743  if (host && strlen(host) > 0) {
744  XrdClientUrlInfo uu(host);
745  if (uu.Port <= 0) uu.Port = 1093;
746  // Fully qualified name
747 #ifndef ROOT_XrdFour
748  char *fqn = XrdSysDNS::getHostName(uu.Host.c_str());
749 #else
750  XrdNetAddr aNA;
751  aNA.Set(uu.Host.c_str());
752  char *fqn = (char *) aNA.Name();
753 #endif
754  TRACE(HDBG, "fqn: '"<<fqn<<"' mgrh: '"<<fMgr->Host()<<"'");
755  if (fqn && (strstr(fqn, "localhost") || !strcmp(fqn, "127.0.0.1") ||
756  !strcmp(fMgr->Host(), fqn))) {
757  if (!checkport || (uu.Port == fMgr->Port()))
758  rc = 1;
759  }
760 #ifndef ROOT_XrdFour
761  SafeFree(fqn);
762 #endif
763  }
764  // Done
765  return rc;
766 }
767 
768 ////////////////////////////////////////////////////////////////////////////////
769 /// Process a readbuf request
770 
771 int XrdProofdNetMgr::ReadBuffer(XrdProofdProtocol *p)
772 {
773  XPDLOC(NMGR, "NetMgr::ReadBuffer")
774 
775  int rc = 0;
776  XPD_SETRESP(p, "ReadBuffer");
777 
778  XrdOucString emsg;
779 
780  // Unmarshall the data
781  //
782  kXR_int64 ofs = ntohll(p->Request()->readbuf.ofs);
783  int len = ntohl(p->Request()->readbuf.len);
784 
785  // Find out the file name
786  char *file = 0;
787  char *filen = 0;
788  char *pattern = 0;
789  int dlen = p->Request()->header.dlen;
790  int grep = ntohl(p->Request()->readbuf.int1);
791  int blen = dlen;
792  bool local = 0;
793  if (dlen > 0 && p->Argp()->buff) {
794  file = new char[dlen+1];
795  memcpy(file, p->Argp()->buff, dlen);
796  file[dlen] = 0;
797  // Check if local
798  XrdClientUrlInfo ui(file);
799  if (ui.Host.length() > 0) {
800  // Check locality
801  local = XrdProofdNetMgr::IsLocal(ui.Host.c_str());
802  if (local) {
803  memcpy(file, ui.File.c_str(), ui.File.length());
804  file[ui.File.length()] = 0;
805  blen = ui.File.length();
806  TRACEP(p, DBG, "file is LOCAL");
807  }
808  }
809  // If grep, extract the pattern
810  if (grep > 0) {
811  // 'grep' operation: len is the length of the 'pattern' to be grepped
812  pattern = new char[len + 1];
813  int j = blen - len;
814  int i = 0;
815  while (j < blen)
816  pattern[i++] = file[j++];
817  pattern[i] = 0;
818  filen = strdup(file);
819  filen[blen - len] = 0;
820  TRACEP(p, DBG, "grep operation " << grep << ", pattern:" << pattern);
821  }
822  } else {
823  emsg = "file name not found";
824  TRACEP(p, XERR, emsg);
825  response->Send(kXR_InvalidRequest, emsg.c_str());
826  return 0;
827  }
828  if (grep) {
829  TRACEP(p, REQ, "file: " << filen << ", ofs: " << ofs << ", len: " << len <<
830  ", pattern: " << pattern);
831  } else {
832  TRACEP(p, REQ, "file: " << file << ", ofs: " << ofs << ", len: " << len);
833  }
834 
835  // Get the buffer
836  int lout = len;
837  char *buf = 0;
838  if (local) {
839  if (grep > 0) {
840  // Grep local file
841  lout = blen; // initial length
842  buf = ReadBufferLocal(filen, pattern, lout, grep);
843  } else {
844  // Read portion of local file
845  buf = ReadBufferLocal(file, ofs, lout);
846  }
847  } else {
848  // Read portion of remote file
849  XrdClientUrlInfo u(file);
850  if (u.User.length() <= 0)
851  u.User = p->Client()->User() ? p->Client()->User() : fMgr->EffectiveUser();
852  buf = ReadBufferRemote(u.GetUrl().c_str(), file, ofs, lout, grep);
853  }
854 
855  bool sent = 0;
856  if (!buf) {
857  if (lout > 0) {
858  if (grep > 0) {
859  if (TRACING(DBG)) {
860  XPDFORM(emsg, "nothing found by 'grep' in %s, pattern: %s", filen, pattern);
861  TRACEP(p, DBG, emsg);
862  }
863  response->Send();
864  sent = 1;
865  } else {
866  XPDFORM(emsg, "could not read buffer from %s %s",
867  (local) ? "local file " : "remote file ", file);
868  TRACEP(p, XERR, emsg);
869  response->Send(kXR_InvalidRequest, emsg.c_str());
870  sent = 1;
871  }
872  } else {
873  // Just got an empty buffer
874  if (TRACING(DBG)) {
875  emsg = "nothing found in ";
876  emsg += (grep > 0) ? filen : file;
877  TRACEP(p, DBG, emsg);
878  }
879  }
880  }
881 
882  // Send back to user
883  if (!sent)
884  response->Send(buf, lout);
885 
886  // Cleanup
887  SafeFree(buf);
888  SafeDelArray(file);
889  SafeFree(filen);
890  SafeDelArray(pattern);
891 
892  // Done
893  return 0;
894 }
895 
896 ////////////////////////////////////////////////////////////////////////////////
897 /// Locate the exact file path allowing for wildcards '*' in the file name.
898 /// In case of success, returns 0 and fills file wity the first matching instance.
899 /// Return -1 if no matching pat is found.
900 
901 int XrdProofdNetMgr::LocateLocalFile(XrdOucString &file)
902 {
903  XPDLOC(NMGR, "NetMgr::LocateLocalFile")
904 
905  // If no wild cards or empty, nothing to do
906  if (file.length() <= 0 || file.find('*') == STR_NPOS) return 0;
907 
908  // Locate the file name and the dir
909  XrdOucString fn, dn;
910  int isl = file.rfind('/');
911  if (isl != STR_NPOS) {
912  fn.assign(file, isl + 1, -1);
913  dn.assign(file, 0, isl);
914  } else {
915  fn = file;
916  dn = "./";
917  }
918 
919  XrdOucString emsg;
920  // Scan the dir
921  DIR *dirp = opendir(dn.c_str());
922  if (!dirp) {
923  XPDFORM(emsg, "cannot open '%s' - errno: %d", dn.c_str(), errno);
924  TRACE(XERR, emsg.c_str());
925  return -1;
926  }
927  struct dirent *ent = 0;
928  XrdOucString sent;
929  while ((ent = readdir(dirp))) {
930  if (!strncmp(ent->d_name, ".", 1) || !strncmp(ent->d_name, "..", 2))
931  continue;
932  // Check the match
933  sent = ent->d_name;
934  if (sent.matches(fn.c_str()) > 0) break;
935  sent = "";
936  }
937  closedir(dirp);
938 
939  // If found fill a new output
940  if (sent.length() > 0) {
941  XPDFORM(file, "%s%s", dn.c_str(), sent.c_str());
942  return 0;
943  }
944 
945  // Not found
946  return -1;
947 }
948 
949 ////////////////////////////////////////////////////////////////////////////////
950 /// Read a buffer of length 'len' at offset 'ofs' of local file 'path'; the
951 /// returned buffer must be freed by the caller.
952 /// Wild cards '*' are allowed in the file name of 'path'; the first matching
953 /// instance is taken.
954 /// Returns 0 in case of error.
955 
956 char *XrdProofdNetMgr::ReadBufferLocal(const char *path, kXR_int64 ofs, int &len)
957 {
958  XPDLOC(NMGR, "NetMgr::ReadBufferLocal")
959 
960  XrdOucString emsg;
961  TRACE(REQ, "file: " << path << ", ofs: " << ofs << ", len: " << len);
962 
963  // Check input
964  if (!path || strlen(path) <= 0) {
965  TRACE(XERR, "path undefined!");
966  return (char *)0;
967  }
968 
969  // Locate the path resolving wild cards
970  XrdOucString spath(path);
971  if (LocateLocalFile(spath) != 0) {
972  TRACE(XERR, "path cannot be resolved! (" << path << ")");
973  return (char *)0;
974  }
975  const char *file = spath.c_str();
976 
977  // Open the file in read mode
978  int fd = open(file, O_RDONLY);
979  if (fd < 0) {
980  emsg = "could not open ";
981  emsg += file;
982  TRACE(XERR, emsg);
983  return (char *)0;
984  }
985 
986  // Size of the output
987  struct stat st;
988  if (fstat(fd, &st) != 0) {
989  emsg = "could not get size of file with stat: errno: ";
990  emsg += (int)errno;
991  TRACE(XERR, emsg);
992  close(fd);
993  return (char *)0;
994  }
995  off_t ltot = st.st_size;
996 
997  // Estimate offsets of the requested range
998  // Start from ...
999  kXR_int64 start = ofs;
1000  off_t fst = (start < 0) ? ltot + start : start;
1001  fst = (fst < 0) ? 0 : ((fst >= ltot) ? ltot - 1 : fst);
1002  // End at ...
1003  kXR_int64 end = fst + len;
1004  off_t lst = (end >= ltot) ? ltot : ((end > fst) ? end : ltot);
1005  TRACE(DBG, "file size: " << ltot << ", read from: " << fst << " to " << lst);
1006 
1007  // Number of bytes to be read
1008  len = lst - fst;
1009 
1010  // Output buffer
1011  char *buf = (char *)malloc(len + 1);
1012  if (!buf) {
1013  emsg = "could not allocate enough memory on the heap: errno: ";
1014  emsg += (int)errno;
1015  XPDERR(emsg);
1016  close(fd);
1017  return (char *)0;
1018  }
1019 
1020  // Reposition, if needed
1021  if (fst >= 0)
1022  lseek(fd, fst, SEEK_SET);
1023 
1024  int left = len;
1025  int pos = 0;
1026  int nr = 0;
1027  do {
1028  while ((nr = read(fd, buf + pos, left)) < 0 && errno == EINTR)
1029  errno = 0;
1030  if (nr < 0) {
1031  TRACE(XERR, "error reading from file: errno: " << errno);
1032  break;
1033  }
1034 
1035  // Update counters
1036  pos += nr;
1037  left -= nr;
1038 
1039  } while (nr > 0 && left > 0);
1040 
1041  // Termination
1042  buf[len] = 0;
1043  TRACE(HDBG, "read " << nr << " bytes: " << buf);
1044 
1045  // Close file
1046  close(fd);
1047 
1048  // Done
1049  return buf;
1050 }
1051 
1052 ////////////////////////////////////////////////////////////////////////////////
1053 /// Grep lines matching 'pat' form 'path'; the returned buffer (length in 'len')
1054 /// must be freed by the caller.
1055 /// Wild cards '*' are allowed in the file name of 'path'; the first matching
1056 /// instance is taken.
1057 /// Returns 0 in case of error.
1058 
1059 char *XrdProofdNetMgr::ReadBufferLocal(const char *path,
1060  const char *pat, int &len, int opt)
1061 {
1062  XPDLOC(NMGR, "NetMgr::ReadBufferLocal")
1063 
1064  XrdOucString emsg;
1065  TRACE(REQ, "file: " << path << ", pat: " << pat << ", len: " << len);
1066 
1067  // Check input
1068  if (!path || strlen(path) <= 0) {
1069  TRACE(XERR, "file path undefined!");
1070  return (char *)0;
1071  }
1072 
1073  // Locate the path resolving wild cards
1074  XrdOucString spath(path);
1075  if (LocateLocalFile(spath) != 0) {
1076  TRACE(XERR, "path cannot be resolved! (" << path << ")");
1077  return (char *)0;
1078  }
1079  const char *file = spath.c_str();
1080 
1081  // Size of the output
1082  struct stat st;
1083  if (stat(file, &st) != 0) {
1084  emsg = "could not get size of file with stat: errno: ";
1085  emsg += (int)errno;
1086  TRACE(XERR, emsg);
1087  return (char *)0;
1088  }
1089  off_t ltot = st.st_size;
1090 
1091  // The grep command
1092  char *cmd = 0;
1093  int lcmd = 0;
1094  if (pat && strlen(pat) > 0) {
1095  lcmd = strlen(pat) + strlen(file) + 20;
1096  cmd = new char[lcmd];
1097  if (opt == 1) {
1098  snprintf(cmd, lcmd, "grep %s %s", pat, file);
1099  } else if (opt == 2) {
1100  snprintf(cmd, lcmd, "grep -v %s %s", pat, file);
1101  } else if (opt == 3) {
1102  snprintf(cmd, lcmd, "cat %s | %s", file, pat);
1103  } else { // should not be here
1104  snprintf(cmd, lcmd, "cat %s", file);
1105  }
1106  } else {
1107  lcmd = strlen(file) + 10;
1108  cmd = new char[lcmd];
1109  snprintf(cmd, lcmd, "cat %s", file);
1110  }
1111  TRACE(DBG, "cmd: " << cmd);
1112 
1113  // Execute the command in a pipe
1114  FILE *fp = popen(cmd, "r");
1115  if (!fp) {
1116  emsg = "could not run '";
1117  emsg += cmd;
1118  emsg += "'";
1119  TRACE(XERR, emsg);
1120  delete[] cmd;
1121  return (char *)0;
1122  }
1123  delete[] cmd;
1124 
1125  // Read line by line
1126  len = 0;
1127  char *buf = 0;
1128  char line[2048];
1129  int bufsiz = 0, left = 0, lines = 0;
1130  while ((ltot > 0) && fgets(line, sizeof(line), fp)) {
1131  // Parse the line
1132  int llen = strlen(line);
1133  ltot -= llen;
1134  lines++;
1135  // (Re-)allocate the buffer
1136  if (!buf || (llen > left)) {
1137  int dsiz = 100 * ((int)((len + llen) / lines) + 1);
1138  dsiz = (dsiz > llen) ? dsiz : llen;
1139  bufsiz += dsiz;
1140  buf = (char *)realloc(buf, bufsiz + 1);
1141  left += dsiz;
1142  }
1143  if (!buf) {
1144  emsg = "could not allocate enough memory on the heap: errno: ";
1145  emsg += (int)errno;
1146  TRACE(XERR, emsg);
1147  pclose(fp);
1148  return (char *)0;
1149  }
1150  // Add line to the buffer
1151  memcpy(buf + len, line, llen);
1152  len += llen;
1153  left -= llen;
1154  if (TRACING(HDBG))
1155  fprintf(stderr, "line: %s", line);
1156  }
1157 
1158  // Check the result and terminate the buffer
1159  if (buf) {
1160  if (len > 0) {
1161  buf[len] = 0;
1162  } else {
1163  free(buf);
1164  buf = 0;
1165  }
1166  }
1167 
1168  // Close file
1169  pclose(fp);
1170 
1171  // Done
1172  return buf;
1173 }
1174 
1175 ////////////////////////////////////////////////////////////////////////////////
1176 /// Send a read buffer request of length 'len' at offset 'ofs' for remote file
1177 /// defined by 'url'; the returned buffer must be freed by the caller.
1178 /// Returns 0 in case of error.
1179 
1180 char *XrdProofdNetMgr::ReadBufferRemote(const char *url, const char *file,
1181  kXR_int64 ofs, int &len, int grep)
1182 {
1183  XPDLOC(NMGR, "NetMgr::ReadBufferRemote")
1184 
1185  TRACE(REQ, "url: " << (url ? url : "undef") <<
1186  ", file: " << (file ? file : "undef") << ", ofs: " << ofs <<
1187  ", len: " << len << ", grep: " << grep);
1188 
1189  // Check input
1190  if (!file || strlen(file) <= 0) {
1191  TRACE(XERR, "file undefined!");
1192  return (char *)0;
1193  }
1194  XrdClientUrlInfo u(url);
1195  if (!url || strlen(url) <= 0) {
1196  // Use file as url
1197  u.TakeUrl(XrdOucString(file));
1198  if (u.User.length() <= 0) u.User = fMgr->EffectiveUser();
1199  }
1200 
1201  // Get a connection (logs in)
1202  XrdProofConn *conn = GetProofConn(u.GetUrl().c_str());
1203 
1204  char *buf = 0;
1205  if (conn && conn->IsValid()) {
1206  // Prepare request
1207  XPClientRequest reqhdr;
1208  memset(&reqhdr, 0, sizeof(reqhdr));
1209  conn->SetSID(reqhdr.header.streamid);
1210  reqhdr.header.requestid = kXP_readbuf;
1211  reqhdr.readbuf.ofs = ofs;
1212  reqhdr.readbuf.len = len;
1213  reqhdr.readbuf.int1 = grep;
1214  reqhdr.header.dlen = strlen(file);
1215  const void *btmp = (const void *) file;
1216  char **vout = &buf;
1217  // Send over
1218  XrdClientMessage *xrsp =
1219  conn->SendReq(&reqhdr, btmp, vout, "NetMgr::ReadBufferRemote");
1220 
1221  // If positive answer
1222  if (xrsp && buf && (xrsp->DataLen() > 0)) {
1223  len = xrsp->DataLen();
1224  } else {
1225  if (xrsp && !(xrsp->IsError()))
1226  // The buffer was just empty: do not call it error
1227  len = 0;
1228  SafeFree(buf);
1229  }
1230 
1231  // Clean the message
1232  SafeDelete(xrsp);
1233  // Clean it up, to avoid leaving open tcp connection possibly going forever
1234  // into CLOSE_WAIT
1235  SafeDelete(conn);
1236  }
1237 
1238  // Done
1239  return buf;
1240 }
1241 
1242 ////////////////////////////////////////////////////////////////////////////////
1243 /// Get log paths from next tier; used in multi-master setups
1244 /// Returns 0 in case of error.
1245 
1246 char *XrdProofdNetMgr::ReadLogPaths(const char *url, const char *msg, int isess)
1247 {
1248  XPDLOC(NMGR, "NetMgr::ReadLogPaths")
1249 
1250  TRACE(REQ, "url: " << (url ? url : "undef") <<
1251  ", msg: " << (msg ? msg : "undef") << ", isess: " << isess);
1252 
1253  // Check input
1254  if (!url || strlen(url) <= 0) {
1255  TRACE(XERR, "url undefined!");
1256  return (char *)0;
1257  }
1258 
1259  // Get a connection (logs in)
1260  XrdProofConn *conn = GetProofConn(url);
1261 
1262  char *buf = 0;
1263  if (conn && conn->IsValid()) {
1264  // Prepare request
1265  XPClientRequest reqhdr;
1266  memset(&reqhdr, 0, sizeof(reqhdr));
1267  conn->SetSID(reqhdr.header.streamid);
1268  reqhdr.header.requestid = kXP_admin;
1269  reqhdr.proof.int1 = kQueryLogPaths;
1270  reqhdr.proof.int2 = isess;
1271  reqhdr.proof.sid = -1;
1272  reqhdr.header.dlen = msg ? strlen(msg) : 0;
1273  const void *btmp = (const void *) msg;
1274  char **vout = &buf;
1275  // Send over
1276  XrdClientMessage *xrsp =
1277  conn->SendReq(&reqhdr, btmp, vout, "NetMgr::ReadLogPaths");
1278 
1279  // If positive answer
1280  if (xrsp && buf && (xrsp->DataLen() > 0)) {
1281  int len = xrsp->DataLen();
1282  buf = (char *) realloc((void *)buf, len + 1);
1283  if (buf)
1284  buf[len] = 0;
1285  } else {
1286  SafeFree(buf);
1287  }
1288 
1289  // Clean the message
1290  SafeDelete(xrsp);
1291  // Clean it up, to avoid leaving open tcp connection possibly going forever
1292  // into CLOSE_WAIT
1293  SafeDelete(conn);
1294  }
1295 
1296  // Done
1297  return buf;
1298 }
1299 
1300 ////////////////////////////////////////////////////////////////////////////////
1301 /// Get log paths from next tier; used in multi-master setups
1302 /// Returns 0 in case of error.
1303 
1304 char *XrdProofdNetMgr::ReadLogPaths(const char *msg, int isess)
1305 {
1306  XPDLOC(NMGR, "NetMgr::ReadLogPaths")
1307 
1308  TRACE(REQ, "msg: " << (msg ? msg : "undef") << ", isess: " << isess);
1309 
1310  char *buf = 0, *pbuf = buf;
1311  int len = 0;
1312  // Loop over unique nodes
1313  std::list<XrdProofWorker *>::iterator iw = fNodes.begin();
1314  XrdProofWorker *w = 0;
1315  while (iw != fNodes.end()) {
1316  if ((w = *iw)) {
1317  // Do not send it to ourselves
1318  bool us = (((w->fHost.find("localhost") != STR_NPOS ||
1319  XrdOucString(fMgr->Host()).find(w->fHost.c_str()) != STR_NPOS)) &&
1320  (w->fPort == -1 || w->fPort == fMgr->Port())) ? 1 : 0;
1321  if (!us) {
1322  // Create 'url'
1323  XrdOucString u = fMgr->EffectiveUser();
1324  u += '@';
1325  u += w->fHost;
1326  if (w->fPort != -1) {
1327  u += ':';
1328  u += w->fPort;
1329  }
1330  // Ask the node
1331  char *bmst = fMgr->NetMgr()->ReadLogPaths(u.c_str(), msg, isess);
1332  if (bmst) {
1333  len += strlen(bmst) + 1;
1334  buf = (char *) realloc((void *)buf, len);
1335  pbuf = buf + len - strlen(bmst) - 1;
1336  memcpy(pbuf, bmst, strlen(bmst) + 1);
1337  buf[len - 1] = 0;
1338  pbuf = buf + len;
1339  free(bmst);
1340  }
1341  } else {
1342  TRACE(DBG, "request for ourselves: ignore");
1343  }
1344  }
1345  // Next worker
1346  ++iw;
1347  }
1348 
1349  // Done
1350  return buf;
1351 }
1352 
1353 ////////////////////////////////////////////////////////////////////////////////
1354 /// Fill-in fWorkers for a localhost based on the number of
1355 /// workers fNumLocalWrks.
1356 
1357 void XrdProofdNetMgr::CreateDefaultPROOFcfg()
1358 {
1359  XPDLOC(NMGR, "NetMgr::CreateDefaultPROOFcfg")
1360 
1361  TRACE(DBG, "enter: local workers: " << fNumLocalWrks);
1362 
1363  // Lock the method to protect the lists.
1364  XrdSysMutexHelper mhp(fMutex);
1365 
1366  // Cleanup the worker list
1367  fWorkers.clear();
1368  // The first time we need to create the default workers
1369  if (fDfltWorkers.size() < 1) {
1370  // Create a default master line
1371  XrdOucString mm("master ", 128);
1372  mm += fMgr->Host();
1373  fDfltWorkers.push_back(new XrdProofWorker(mm.c_str()));
1374 
1375  // Create 'localhost' lines for each worker
1376  int nwrk = fNumLocalWrks;
1377  if (nwrk > 0) {
1378  mm = "worker localhost port=";
1379  mm += fMgr->Port();
1380  while (nwrk--) {
1381  fDfltWorkers.push_back(new XrdProofWorker(mm.c_str()));
1382  TRACE(DBG, "added line: " << mm);
1383  }
1384  }
1385  }
1386 
1387  // Copy the list
1388  std::list<XrdProofWorker *>::iterator w = fDfltWorkers.begin();
1389  for (; w != fDfltWorkers.end(); ++w) {
1390  fWorkers.push_back(*w);
1391  }
1392 
1393  TRACE(DBG, "done: " << fWorkers.size() - 1 << " workers");
1394 
1395  // Find unique nodes
1396  FindUniqueNodes();
1397 
1398  // We are done
1399  return;
1400 }
1401 
1402 ////////////////////////////////////////////////////////////////////////////////
1403 /// Return the list of workers after having made sure that the info is
1404 /// up-to-date
1405 
1406 std::list<XrdProofWorker *> *XrdProofdNetMgr::GetActiveWorkers()
1407 {
1408  XPDLOC(NMGR, "NetMgr::GetActiveWorkers")
1409 
1410  XrdSysMutexHelper mhp(fMutex);
1411 
1412  if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
1413  // Check if there were any changes in the config file
1414  if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
1415  if (fDfltFallback) {
1416  // Use default settings
1417  CreateDefaultPROOFcfg();
1418  TRACE(DBG, "parsing of " << fPROOFcfg.fName << " failed: use default settings");
1419  } else {
1420  TRACE(XERR, "unable to read the configuration file");
1421  return (std::list<XrdProofWorker *> *)0;
1422  }
1423  }
1424  }
1425  TRACE(DBG, "returning list with " << fWorkers.size() << " entries");
1426 
1427  if (TRACING(HDBG)) Dump();
1428 
1429  return &fWorkers;
1430 }
1431 
1432 ////////////////////////////////////////////////////////////////////////////////
1433 /// Dump status
1434 
1435 void XrdProofdNetMgr::Dump()
1436 {
1437  const char *xpdloc = "NetMgr::Dump";
1438 
1439  XrdSysMutexHelper mhp(fMutex);
1440 
1441  XPDPRT("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1442  XPDPRT("+ Active workers status");
1443  XPDPRT("+ Size: " << fWorkers.size());
1444  XPDPRT("+ ");
1445 
1446  std::list<XrdProofWorker *>::iterator iw;
1447  for (iw = fWorkers.begin(); iw != fWorkers.end(); ++iw) {
1448  XPDPRT("+ wrk: " << (*iw)->fHost << ":" << (*iw)->fPort << " type:" << (*iw)->fType <<
1449  " active sessions:" << (*iw)->Active());
1450  }
1451  XPDPRT("+ ");
1452  XPDPRT("+++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
1453 }
1454 
1455 ////////////////////////////////////////////////////////////////////////////////
1456 /// Return the list of unique nodes after having made sure that the info is
1457 /// up-to-date
1458 
1459 std::list<XrdProofWorker *> *XrdProofdNetMgr::GetNodes()
1460 {
1461  XPDLOC(NMGR, "NetMgr::GetNodes")
1462 
1463  XrdSysMutexHelper mhp(fMutex);
1464 
1465  if (fResourceType == kRTStatic && fPROOFcfg.fName.length() > 0) {
1466  // Check if there were any changes in the config file
1467  if (fReloadPROOFcfg && ReadPROOFcfg(1) != 0) {
1468  if (fDfltFallback) {
1469  // Use default settings
1470  CreateDefaultPROOFcfg();
1471  TRACE(DBG, "parsing of " << fPROOFcfg.fName << " failed: use default settings");
1472  } else {
1473  TRACE(XERR, "unable to read the configuration file");
1474  return (std::list<XrdProofWorker *> *)0;
1475  }
1476  }
1477  }
1478  TRACE(DBG, "returning list with " << fNodes.size() << " entries");
1479 
1480  return &fNodes;
1481 }
1482 
1483 ////////////////////////////////////////////////////////////////////////////////
1484 /// Read PROOF config file and load the information in fWorkers.
1485 /// NB: 'master' information here is ignored, because it is passed
1486 /// via the 'xpd.workdir' and 'xpd.image' config directives
1487 
1488 int XrdProofdNetMgr::ReadPROOFcfg(bool reset)
1489 {
1490  XPDLOC(NMGR, "NetMgr::ReadPROOFcfg")
1491 
1492  TRACE(REQ, "saved time of last modification: " << fPROOFcfg.fMtime);
1493 
1494  // Lock the method to protect the lists.
1495  XrdSysMutexHelper mhp(fMutex);
1496 
1497  // Check inputs
1498  if (fPROOFcfg.fName.length() <= 0)
1499  return -1;
1500 
1501  // Get the modification time
1502  struct stat st;
1503  if (stat(fPROOFcfg.fName.c_str(), &st) != 0) {
1504  // If the file disappeared, reset the modification time so that we are sure
1505  // to reload it if it comes back
1506  if (errno == ENOENT) fPROOFcfg.fMtime = -1;
1507  if (!fDfltFallback) {
1508  TRACE(XERR, "unable to stat file: " << fPROOFcfg.fName << " - errno: " << errno);
1509  } else {
1510  TRACE(ALL, "file " << fPROOFcfg.fName << " cannot be parsed: use default configuration");
1511  }
1512  return -1;
1513  }
1514  TRACE(DBG, "time of last modification: " << st.st_mtime);
1515 
1516  // File should be loaded only once
1517  if (st.st_mtime <= fPROOFcfg.fMtime)
1518  return 0;
1519 
1520  // Save the modification time
1521  fPROOFcfg.fMtime = st.st_mtime;
1522 
1523  // Open the defined path.
1524  FILE *fin = 0;
1525  if (!(fin = fopen(fPROOFcfg.fName.c_str(), "r"))) {
1526  if (fWorkers.size() > 1) {
1527  TRACE(XERR, "unable to fopen file: " << fPROOFcfg.fName << " - errno: " << errno);
1528  TRACE(XERR, "continuing with existing list of workers.");
1529  return 0;
1530  } else {
1531  return -1;
1532  }
1533  }
1534 
1535  if (reset) {
1536  // Cleanup the worker list
1537  fWorkers.clear();
1538  }
1539 
1540  // Add default a master line if not yet there
1541  if (fRegWorkers.size() < 1) {
1542  XrdOucString mm("master ", 128);
1543  mm += fMgr->Host();
1544  fRegWorkers.push_back(new XrdProofWorker(mm.c_str()));
1545  } else {
1546  // Deactivate all current active workers
1547  std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1548  // Skip the master line
1549  ++w;
1550  for (; w != fRegWorkers.end(); ++w) {
1551  (*w)->fActive = 0;
1552  }
1553  }
1554 
1555  // Read now the directives
1556  int nw = 0;
1557  char lin[2048];
1558  while (fgets(lin, sizeof(lin), fin)) {
1559  // Skip empty lines
1560  int p = 0;
1561  while (lin[p++] == ' ') {
1562  ;
1563  }
1564  p--;
1565  if (lin[p] == '\0' || lin[p] == '\n')
1566  continue;
1567 
1568  // Skip comments
1569  if (lin[0] == '#')
1570  continue;
1571 
1572  // Remove trailing '\n';
1573  if (lin[strlen(lin)-1] == '\n')
1574  lin[strlen(lin)-1] = '\0';
1575 
1576  TRACE(DBG, "found line: " << lin);
1577 
1578  // Parse the line
1579  XrdProofWorker *pw = new XrdProofWorker(lin);
1580 
1581  const char *pfx[2] = { "master", "node" };
1582  if (!strncmp(lin, pfx[0], strlen(pfx[0])) ||
1583  !strncmp(lin, pfx[1], strlen(pfx[1]))) {
1584  // Init a master instance
1585  if (pw->fHost.beginswith("localhost") ||
1586  pw->Matches(fMgr->Host())) {
1587  // Replace the default line (the first with what found in the file)
1588  XrdProofWorker *fw = fRegWorkers.front();
1589  fw->Reset(lin);
1590  }
1591  // Ignore it
1592  SafeDelete(pw);
1593  } else {
1594  // Check if we have already it
1595  std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1596  // Skip the master line
1597  ++w;
1598  bool haveit = 0;
1599  while (w != fRegWorkers.end()) {
1600  if (!((*w)->fActive)) {
1601  if ((*w)->fHost == pw->fHost && (*w)->fPort == pw->fPort) {
1602  (*w)->fActive = 1;
1603  haveit = 1;
1604  break;
1605  }
1606  }
1607  // Go to next
1608  ++w;
1609  }
1610  // If we do not have it, build a new worker object
1611  if (!haveit) {
1612  // Keep it
1613  fRegWorkers.push_back(pw);
1614  } else {
1615  // Drop it
1616  SafeDelete(pw);
1617  }
1618  }
1619  }
1620 
1621  // Copy the active workers
1622  std::list<XrdProofWorker *>::iterator w = fRegWorkers.begin();
1623  while (w != fRegWorkers.end()) {
1624  if ((*w)->fActive) {
1625  fWorkers.push_back(*w);
1626  nw++;
1627  }
1628  ++w;
1629  }
1630 
1631  // Close files
1632  fclose(fin);
1633 
1634  // Find unique nodes
1635  if (reset)
1636  FindUniqueNodes();
1637 
1638  // We are done
1639  return ((nw == 0) ? -1 : 0);
1640 }
1641 
1642 ////////////////////////////////////////////////////////////////////////////////
1643 /// Scan fWorkers for unique nodes (stored in fNodes).
1644 /// Return the number of unque nodes.
1645 /// NB: 'master' information here is ignored, because it is passed
1646 /// via the 'xpd.workdir' and 'xpd.image' config directives
1647 
1648 int XrdProofdNetMgr::FindUniqueNodes()
1649 {
1650  XPDLOC(NMGR, "NetMgr::FindUniqueNodes")
1651 
1652  TRACE(REQ, "# workers: " << fWorkers.size());
1653 
1654  // Cleanup the nodes list
1655  fNodes.clear();
1656 
1657  // Build the list of unique nodes (skip the master line);
1658  if (fWorkers.size() > 1) {
1659  std::list<XrdProofWorker *>::iterator w = fWorkers.begin();
1660  ++w;
1661  for (; w != fWorkers.end(); ++w) if ((*w)->fActive) {
1662  bool add = 1;
1663  std::list<XrdProofWorker *>::iterator n;
1664  for (n = fNodes.begin() ; n != fNodes.end(); ++n) {
1665  if ((*n)->Matches(*w)) {
1666  add = 0;
1667  break;
1668  }
1669  }
1670  if (add)
1671  fNodes.push_back(*w);
1672  }
1673  }
1674  TRACE(REQ, "found " << fNodes.size() << " unique nodes");
1675 
1676  // We are done
1677  return fNodes.size();
1678 }