Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TNetXNGFile.cxx
Go to the documentation of this file.
1 // @(#)root/netxng:$Id$
2 /*************************************************************************
3  * Copyright (C) 1995-2013, Rene Brun and Fons Rademakers. *
4  * All rights reserved. *
5  * *
6  * For the licensing terms see $ROOTSYS/LICENSE. *
7  * For the list of contributors see $ROOTSYS/README/CREDITS. *
8  *************************************************************************/
9 
10 ////////////////////////////////////////////////////////////////////////////////
11 // //
12 // TNetXNGFile //
13 // //
14 // Authors: Justin Salmon, Lukasz Janyst //
15 // CERN, 2013 //
16 // //
17 // Enables access to XRootD files using the new client. //
18 // //
19 ////////////////////////////////////////////////////////////////////////////////
20 
21 #include "TNetXNGFile.h"
22 #include "TEnv.h"
23 #include "TSystem.h"
24 #include "TTimeStamp.h"
25 #include "TVirtualPerfStats.h"
26 #include "TVirtualMonitoring.h"
27 #include <XrdCl/XrdClURL.hh>
28 #include <XrdCl/XrdClFile.hh>
29 #include <XrdCl/XrdClXRootDResponses.hh>
30 #include <XrdCl/XrdClDefaultEnv.hh>
31 #include <XrdVersion.hh>
32 #include <iostream>
33 
34 //------------------------------------------------------------------------------
35 // Open handler for async open requests
36 ////////////////////////////////////////////////////////////////////////////////
37 
38 class TAsyncOpenHandler: public XrdCl::ResponseHandler
39 {
40  public:
41  //------------------------------------------------------------------------
42  // Constructor
43  //////////////////////////////////////////////////////////////////////////
44 
45  TAsyncOpenHandler(TNetXNGFile *file)
46  {
47  fFile = file;
48  fFile->SetAsyncOpenStatus(TFile::kAOSInProgress);
49  }
50 
51  //------------------------------------------------------------------------
52  // Called when a response to open arrives
53  //////////////////////////////////////////////////////////////////////////
54 
55  virtual void HandleResponse(XrdCl::XRootDStatus *status,
56  XrdCl::AnyObject *response)
57  {
58  if (status->IsOK())
59  {
60  fFile->SetAsyncOpenStatus(TFile::kAOSSuccess);
61  }
62  else
63  {
64  fFile->SetAsyncOpenStatus(TFile::kAOSFailure);
65  }
66 
67  delete response;
68  delete status;
69  delete this;
70  }
71 
72  private:
73  TNetXNGFile *fFile;
74 };
75 
76 //------------------------------------------------------------------------------
77 // Async readv handler
78 ////////////////////////////////////////////////////////////////////////////////
79 
80 class TAsyncReadvHandler: public XrdCl::ResponseHandler
81 {
82  public:
83  //------------------------------------------------------------------------
84  // Constructor
85  //////////////////////////////////////////////////////////////////////////
86 
87  TAsyncReadvHandler(std::vector<XrdCl::XRootDStatus*> *statuses,
88  Int_t statusIndex,
89  TSemaphore *semaphore):
90  fStatuses(statuses), fStatusIndex(statusIndex), fSemaphore(semaphore) {}
91 
92 
93  //------------------------------------------------------------------------
94  // Handle readv response
95  //////////////////////////////////////////////////////////////////////////
96 
97  virtual void HandleResponse(XrdCl::XRootDStatus *status,
98  XrdCl::AnyObject *response)
99  {
100  fStatuses->at(fStatusIndex) = status;
101  fSemaphore->Post();
102  delete response;
103  delete this;
104  }
105 
106  private:
107  std::vector<XrdCl::XRootDStatus*> *fStatuses; // Pointer to status vector
108  Int_t fStatusIndex; // Index into status vector
109  TSemaphore *fSemaphore; // Synchronize the responses
110 };
111 
112 
113 ClassImp(TNetXNGFile);
114 
115 ////////////////////////////////////////////////////////////////////////////////
116 /// Constructor
117 ///
118 /// param url: URL of the entry-point server to be contacted
119 /// param mode: initial file access mode
120 /// param title: title of the file (shown by ROOT browser)
121 /// param compress: compression level and algorithm
122 /// param netopt: TCP window size in bytes (unused)
123 /// param parallelopen: open asynchronously
124 
125 TNetXNGFile::TNetXNGFile(const char *url,
126  Option_t *mode,
127  const char *title,
128  Int_t compress,
129  Int_t netopt,
130  Bool_t parallelopen) :
131  TNetXNGFile(url,0,mode,title,compress,netopt,parallelopen){}
132 
133 TNetXNGFile::TNetXNGFile(const char *url,
134  const char *lurl,
135  Option_t *mode,
136  const char *title,
137  Int_t compress,
138  Int_t /*netopt*/,
139  Bool_t parallelopen) :
140  TFile((lurl ? lurl : url), "NET", title, compress)
141 {
142  using namespace XrdCl;
143 
144  // Set the log level
145  TString val = gSystem->Getenv("XRD_LOGLEVEL");
146  if (val.IsNull()) val = gEnv->GetValue("NetXNG.Debug", "");
147  if (!val.IsNull()) XrdCl::DefaultEnv::SetLogLevel(val.Data());
148 
149  // Remove any anchor from the url. It may have been used by the base TFile
150  // constructor to setup a TArchiveFile but we should not pass it to the xroot
151  // client as a part of the filename
152  {
153  TUrl urlnoanchor(url);
154  urlnoanchor.SetAnchor("");
155  fUrl = new URL(std::string(urlnoanchor.GetUrl()));
156  }
157 
158  fFile = new File();
159  fInitCondVar = new XrdSysCondVar();
160  fUrl->SetProtocol(std::string("root"));
161  fQueryReadVParams = 1;
162  fReadvIorMax = 2097136;
163  fReadvIovMax = 1024;
164 
165  if (ParseOpenMode(mode, fOption, fMode, kTRUE)<0) {
166  Error("Open", "could not parse open mode %s", mode);
167  MakeZombie();
168  return;
169  }
170 
171  // Map ROOT and xrootd environment
172  SetEnv();
173 
174  // Init the monitoring system
175  if (gMonitoringWriter) {
176  if (!fOpenPhases) {
177  fOpenPhases = new TList;
178  fOpenPhases->SetOwner();
179  }
180  gMonitoringWriter->SendFileOpenProgress(this, fOpenPhases, "xrdopen",
181  kFALSE);
182  }
183 
184  XRootDStatus status;
185  if (parallelopen) {
186  // Open the file asynchronously
187  TAsyncOpenHandler *handler = new TAsyncOpenHandler(this);
188  status = fFile->Open(fUrl->GetURL(), fMode, Access::None, handler);
189  if (!status.IsOK()) {
190  Error("Open", "%s", status.ToStr().c_str());
191  MakeZombie();
192  }
193  return;
194  }
195 
196  // Open the file synchronously
197  status = fFile->Open(fUrl->GetURL(), fMode);
198  if (!status.IsOK()) {
199 #if XrdVNUMBER >= 40000
200  if( status.code == errRedirect )
201  fNewUrl = status.GetErrorMessage().c_str();
202  else
203  Error("Open", "%s", status.ToStr().c_str());
204 #else
205  Error("Open", "%s", status.ToStr().c_str());
206 #endif
207  MakeZombie();
208  return;
209  }
210 
211  if( (fMode & OpenFlags::New) || (fMode & OpenFlags::Delete) ||
212  (fMode & OpenFlags::Update) )
213  fWritable = true;
214 
215  // Initialize the file
216  bool create = false;
217  if( (fMode & OpenFlags::New) || (fMode & OpenFlags::Delete) )
218  create = true;
219  TFile::Init(create);
220 
221  // Get the vector read limits
222  GetVectorReadLimits();
223 }
224 
225 ////////////////////////////////////////////////////////////////////////////////
226 /// Destructor
227 
228 TNetXNGFile::~TNetXNGFile()
229 {
230  if (IsOpen())
231  Close();
232  delete fFile;
233  delete fUrl;
234  delete fInitCondVar;
235 }
236 
237 ////////////////////////////////////////////////////////////////////////////////
238 /// Initialize the file. Makes sure that the file is really open before
239 /// calling TFile::Init. It may block.
240 
241 void TNetXNGFile::Init(Bool_t create)
242 {
243  using namespace XrdCl;
244 
245  if (fInitDone) {
246  if (gDebug > 1) Info("Init", "TFile::Init already called once");
247  return;
248  }
249 
250  // If the async open didn't return yet, wait for it
251  if (!IsOpen() && fAsyncOpenStatus == kAOSInProgress) {
252  fInitCondVar->Wait();
253  }
254 
255  // Notify the monitoring system
256  if (gMonitoringWriter)
257  gMonitoringWriter->SendFileOpenProgress(this, fOpenPhases, "rootinit",
258  kFALSE);
259 
260  // Initialize the file
261  TFile::Init(create);
262 
263  // Notify the monitoring system
264  if (gMonitoringWriter)
265  gMonitoringWriter->SendFileOpenProgress(this, fOpenPhases, "endopen",
266  kTRUE);
267 
268  // Get the vector read limits
269  GetVectorReadLimits();
270 }
271 
272 ////////////////////////////////////////////////////////////////////////////////
273 /// Get the file size. Returns -1 in the case that the file could not be
274 /// stat'ed.
275 
276 Long64_t TNetXNGFile::GetSize() const
277 {
278  using namespace XrdCl;
279 
280  // Check the file isn't a zombie or closed
281  if (!IsUseable())
282  return -1;
283 
284  bool forceStat = true;
285  if( fMode == XrdCl::OpenFlags::Read )
286  forceStat = false;
287 
288  StatInfo *info = 0;
289  if( !fFile->Stat( forceStat, info ).IsOK() )
290  return -1;
291  Long64_t size = info->GetSize();
292  delete info;
293  return size;
294 }
295 
296 ////////////////////////////////////////////////////////////////////////////////
297 /// Check if the file is open
298 
299 Bool_t TNetXNGFile::IsOpen() const
300 {
301  return fFile->IsOpen();
302 }
303 
304 ////////////////////////////////////////////////////////////////////////////////
305 /// Set the status of an asynchronous file open
306 
307 void TNetXNGFile::SetAsyncOpenStatus(EAsyncOpenStatus status)
308 {
309  fAsyncOpenStatus = status;
310  // Unblock Init() if it is waiting
311  fInitCondVar->Signal();
312 }
313 
314 ////////////////////////////////////////////////////////////////////////////////
315 /// Close the file
316 ///
317 /// param option: if == "R", all TProcessIDs referenced by this file are
318 /// deleted (is this valid in xrootd context?)
319 
320 void TNetXNGFile::Close(const Option_t */*option*/)
321 {
322  TFile::Close();
323 
324  XrdCl::XRootDStatus status = fFile->Close();
325  if (!status.IsOK()) {
326  Error("Close", "%s", status.ToStr().c_str());
327  MakeZombie();
328  }
329 }
330 
331 ////////////////////////////////////////////////////////////////////////////////
332 /// Reopen the file with the new access mode
333 ///
334 /// param mode: the new access mode
335 /// returns: 0 in case the mode was successfully modified, 1 in case
336 /// the mode did not change (was already as requested or wrong
337 /// input arguments) and -1 in case of failure, in which case
338 /// the file cannot be used anymore
339 
340 Int_t TNetXNGFile::ReOpen(Option_t *modestr)
341 {
342  using namespace XrdCl;
343  TString newOpt;
344  OpenFlags::Flags mode;
345 
346  Int_t parseres = ParseOpenMode(modestr, newOpt, mode, kFALSE);
347 
348  // Only Read and Update are valid modes
349  if (parseres<0 || (mode != OpenFlags::Read && mode != OpenFlags::Update)) {
350  Error("ReOpen", "mode must be either READ or UPDATE, not %s", modestr);
351  return 1;
352  }
353 
354  // The mode is not really changing
355  if (mode == fMode || (mode == OpenFlags::Update
356  && fMode == OpenFlags::New)) {
357  return 1;
358  }
359 
360  XRootDStatus st = fFile->Close();
361  if (!st.IsOK()) {
362  Error("ReOpen", "%s", st.ToStr().c_str());
363  return 1;
364  }
365  fOption = newOpt;
366  fMode = mode;
367 
368  st = fFile->Open(fUrl->GetURL(), fMode);
369  if (!st.IsOK()) {
370  Error("ReOpen", "%s", st.ToStr().c_str());
371  return 1;
372  }
373 
374  return 0;
375 }
376 
377 ////////////////////////////////////////////////////////////////////////////////
378 /// Read a data chunk of the given size
379 ///
380 /// param buffer: a pointer to a buffer big enough to hold the data
381 /// param length: number of bytes to be read
382 /// returns: kTRUE in case of failure
383 
384 Bool_t TNetXNGFile::ReadBuffer(char *buffer, Int_t length)
385 {
386  return ReadBuffer(buffer, GetRelOffset(), length);
387 }
388 
389 ////////////////////////////////////////////////////////////////////////////////
390 /// Read a data chunk of the given size, starting from the given offset
391 ///
392 /// param buffer: a pointer to a buffer big enough to hold the data
393 /// param position: offset from the beginning of the file
394 /// param length: number of bytes to be read
395 /// returns: kTRUE in case of failure
396 
397 Bool_t TNetXNGFile::ReadBuffer(char *buffer, Long64_t position, Int_t length)
398 {
399  using namespace XrdCl;
400  if (gDebug > 0)
401  Info("ReadBuffer", "offset: %lld length: %d", position, length);
402 
403  // Check the file isn't a zombie or closed
404  if (!IsUseable())
405  return kTRUE;
406 
407  // Try to read from cache
408  SetOffset(position);
409  Int_t status;
410  if ((status = ReadBufferViaCache(buffer, length))) {
411  if (status == 2)
412  return kTRUE;
413  return kFALSE;
414  }
415 
416  Double_t start = 0;
417  if (gPerfStats) start = TTimeStamp();
418 
419  // Read the data
420  uint32_t bytesRead = 0;
421  XRootDStatus st = fFile->Read(fOffset, length, buffer, bytesRead);
422  if (gDebug > 0)
423  Info("ReadBuffer", "%s bytes read: %u", st.ToStr().c_str(), bytesRead);
424 
425  if (!st.IsOK()) {
426  Error("ReadBuffer", "%s", st.ToStr().c_str());
427  return kTRUE;
428  }
429 
430  if ((Int_t)bytesRead != length) {
431  Error("ReadBuffer", "error reading all requested bytes, got %u of %d",
432  bytesRead, length);
433  return kTRUE;
434  }
435 
436  // Bump the globals
437  fOffset += bytesRead;
438  fBytesRead += bytesRead;
439  fgBytesRead += bytesRead;
440  fReadCalls ++;
441  fgReadCalls ++;
442 
443  if (gPerfStats)
444  gPerfStats->FileReadEvent(this, (Int_t)bytesRead, start);
445 
446  if (gMonitoringWriter)
447  gMonitoringWriter->SendFileReadProgress(this);
448 
449  return kFALSE;
450 }
451 
452 ////////////////////////////////////////////////////////////////////////////////
453 /// Read scattered data chunks in one operation
454 ///
455 /// param buffer: a pointer to a buffer big enough to hold all of the
456 /// requested data
457 /// param position: position[i] is the seek position of chunk i of len
458 /// length[i]
459 /// param length: length[i] is the length of the chunk at offset
460 /// position[i]
461 /// param nbuffs: number of chunks
462 /// returns: kTRUE in case of failure
463 
464 Bool_t TNetXNGFile::ReadBuffers(char *buffer, Long64_t *position, Int_t *length,
465  Int_t nbuffs)
466 {
467  using namespace XrdCl;
468 
469  // Check the file isn't a zombie or closed
470  if (!IsUseable())
471  return kTRUE;
472 
473  std::vector<ChunkList> chunkLists;
474  ChunkList chunks;
475  std::vector<XRootDStatus*> *statuses;
476  TSemaphore *semaphore;
477  Int_t totalBytes = 0;
478  Long64_t offset = 0;
479  char *cursor = buffer;
480 
481  Double_t start = 0;
482  if (gPerfStats) start = TTimeStamp();
483 
484  if (fArchiveOffset)
485  for (Int_t i = 0; i < nbuffs; i++)
486  position[i] += fArchiveOffset;
487 
488  // Build a list of chunks. Put the buffers in the ChunkInfo's
489  for (Int_t i = 0; i < nbuffs; ++i) {
490  totalBytes += length[i];
491 
492  // If the length is bigger than max readv size, split into smaller chunks
493  if (length[i] > fReadvIorMax) {
494  Int_t nsplit = length[i] / fReadvIorMax;
495  Int_t rem = length[i] % fReadvIorMax;
496  Int_t j;
497 
498  // Add as many max-size chunks as are divisible
499  for (j = 0; j < nsplit; ++j) {
500  offset = position[i] + (j * fReadvIorMax);
501  chunks.push_back(ChunkInfo(offset, fReadvIorMax, cursor));
502  cursor += fReadvIorMax;
503  }
504 
505  // Add the remainder
506  offset = position[i] + (j * fReadvIorMax);
507  chunks.push_back(ChunkInfo(offset, rem, cursor));
508  cursor += rem;
509  } else {
510  chunks.push_back(ChunkInfo(position[i], length[i], cursor));
511  cursor += length[i];
512  }
513 
514  // If there are more than or equal to max chunks, make another chunk list
515  if ((Int_t) chunks.size() == fReadvIovMax) {
516  chunkLists.push_back(chunks);
517  chunks = ChunkList();
518  } else if ((Int_t) chunks.size() > fReadvIovMax) {
519  chunkLists.push_back(ChunkList(chunks.begin(),
520  chunks.begin() + fReadvIovMax));
521  chunks = ChunkList(chunks.begin() + fReadvIovMax, chunks.end());
522  }
523  }
524 
525  // Push back the last chunk list
526  if( !chunks.empty() )
527  chunkLists.push_back(chunks);
528 
529  TAsyncReadvHandler *handler;
530  XRootDStatus status;
531  semaphore = new TSemaphore(0);
532  statuses = new std::vector<XRootDStatus*>(chunkLists.size());
533 
534  // Read asynchronously but wait for all responses
535  std::vector<ChunkList>::iterator it;
536  for (it = chunkLists.begin(); it != chunkLists.end(); ++it)
537  {
538  handler = new TAsyncReadvHandler(statuses, it - chunkLists.begin(),
539  semaphore);
540  status = fFile->VectorRead(*it, 0, handler);
541 
542  if (!status.IsOK()) {
543  Error("ReadBuffers", "%s", status.ToStr().c_str());
544  return kTRUE;
545  }
546  }
547 
548  // Wait for all responses
549  for (it = chunkLists.begin(); it != chunkLists.end(); ++it) {
550  semaphore->Wait();
551  }
552 
553  // Check for errors
554  for (it = chunkLists.begin(); it != chunkLists.end(); ++it) {
555  XRootDStatus *st = statuses->at(it - chunkLists.begin());
556 
557  if (!st->IsOK()) {
558  Error("ReadBuffers", "%s", st->ToStr().c_str());
559  for( ; it != chunkLists.end(); ++it )
560  {
561  st = statuses->at( it - chunkLists.begin() );
562  delete st;
563  }
564  delete statuses;
565  delete semaphore;
566 
567  return kTRUE;
568  }
569  delete st;
570  }
571 
572  // Bump the globals
573  fBytesRead += totalBytes;
574  fgBytesRead += totalBytes;
575  fReadCalls ++;
576  fgReadCalls ++;
577 
578  if (gPerfStats) {
579  fOffset = position[0];
580  gPerfStats->FileReadEvent(this, totalBytes, start);
581  }
582 
583  if (gMonitoringWriter)
584  gMonitoringWriter->SendFileReadProgress(this);
585 
586  delete statuses;
587  delete semaphore;
588  return kFALSE;
589 }
590 
591 ////////////////////////////////////////////////////////////////////////////////
592 /// Write a data chunk
593 ///
594 /// param buffer: the data to be written
595 /// param length: the size of the buffer
596 /// returns: kTRUE in case of failure
597 
598 Bool_t TNetXNGFile::WriteBuffer(const char *buffer, Int_t length)
599 {
600  using namespace XrdCl;
601 
602  // Check the file isn't a zombie or closed
603  if (!IsUseable())
604  return kTRUE;
605 
606  if (!fWritable) {
607  if (gDebug > 1)
608  Info("WriteBuffer", "file not writable");
609  return kTRUE;
610  }
611 
612  // Check the write cache
613  Int_t status;
614  if ((status = WriteBufferViaCache(buffer, length))) {
615  if (status == 2)
616  return kTRUE;
617  return kFALSE;
618  }
619 
620  // Write the data
621  XRootDStatus st = fFile->Write(fOffset, length, buffer);
622  if (!st.IsOK()) {
623  Error("WriteBuffer", "%s", st.ToStr().c_str());
624  return kTRUE;
625  }
626 
627  // Bump the globals
628  fOffset += length;
629  fBytesWrite += length;
630  fgBytesWrite += length;
631 
632  return kFALSE;
633 }
634 
635 ////////////////////////////////////////////////////////////////////////////////
636 
637 void TNetXNGFile::Flush()
638 {
639  if (!IsUseable())
640  return;
641 
642  if (!fWritable) {
643  if (gDebug > 1)
644  Info("Flush", "file not writable - do nothing");
645  return;
646  }
647 
648  FlushWriteCache();
649 
650  //
651  // Flush via the remote xrootd
652  XrdCl::XRootDStatus status = fFile->Sync();
653  if( !status.IsOK() )
654  Error("Flush", "%s", status.ToStr().c_str());
655 
656  if (gDebug > 1)
657  Info("Flush", "XrdClient::Sync succeeded.");
658 }
659 
660 ////////////////////////////////////////////////////////////////////////////////
661 /// Set the position within the file
662 ///
663 /// param offset: the new offset relative to position
664 /// param position: the relative position, either kBeg, kCur or kEnd
665 
666 void TNetXNGFile::Seek(Long64_t offset, ERelativeTo position)
667 {
668  SetOffset(offset, position);
669 }
670 
671 ////////////////////////////////////////////////////////////////////////////////
672 /// Parse a file open mode given as a string into a canonically formatted
673 /// output mode string and an integer code that the xroot client can use
674 ///
675 /// param in: the file open mode as a string (in)
676 /// modestr: open mode string after parsing (out)
677 /// mode: correctly parsed option mode code (out)
678 /// assumeRead: if the open mode is not recognised assume read (in)
679 /// returns: 0 in case the mode was successfully parsed,
680 /// -1 in case of failure
681 
682 Int_t TNetXNGFile::ParseOpenMode(Option_t *in, TString &modestr,
683  XrdCl::OpenFlags::Flags &mode,
684  Bool_t assumeRead)
685 {
686  using namespace XrdCl;
687  modestr = ToUpper(TString(in));
688 
689  if (modestr == "NEW" || modestr == "CREATE") mode = OpenFlags::New;
690  else if (modestr == "RECREATE") mode = OpenFlags::Delete;
691  else if (modestr == "UPDATE") mode = OpenFlags::Update;
692  else if (modestr == "READ") mode = OpenFlags::Read;
693  else {
694  if (!assumeRead) {
695  return -1;
696  }
697  modestr = "READ";
698  mode = OpenFlags::Read;
699  }
700 
701  return 0;
702 }
703 
704 ////////////////////////////////////////////////////////////////////////////////
705 /// Check the file is open and isn't a zombie
706 
707 Bool_t TNetXNGFile::IsUseable() const
708 {
709  if (IsZombie()) {
710  Error("TNetXNGFile", "Object is in 'zombie' state");
711  return kFALSE;
712  }
713 
714  if (!IsOpen()) {
715  Error("TNetXNGFile", "The remote file is not open");
716  return kFALSE;
717  }
718 
719  return kTRUE;
720 }
721 
722 ////////////////////////////////////////////////////////////////////////////////
723 /// Find the server-specific readv config params. Returns kFALSE in case of
724 /// error, kTRUE otherwise.
725 
726 Bool_t TNetXNGFile::GetVectorReadLimits()
727 {
728  using namespace XrdCl;
729 
730  // Check the file isn't a zombie or closed
731  if (!IsUseable())
732  return kFALSE;
733 
734  if (!fQueryReadVParams)
735  return kTRUE;
736 
737 #if XrdVNUMBER >= 40000
738  std::string lasturl;
739  fFile->GetProperty("LastURL",lasturl);
740  URL lrl(lasturl);
741  //local redirect will split vector reads into multiple local reads anyway,
742  // so we are fine with the default values
743  if(lrl.GetProtocol().compare("file") == 0 &&
744  lrl.GetHostId().compare("localhost") == 0){
745  if (gDebug >= 1)
746  Info("GetVectorReadLimits","Local redirect, using default values");
747  return kTRUE;
748  }
749 
750  std::string dataServerStr;
751  if( !fFile->GetProperty( "DataServer", dataServerStr ) )
752  return kFALSE;
753  URL dataServer(dataServerStr);
754 #else
755  URL dataServer(fFile->GetDataServer());
756 #endif
757  FileSystem fs(dataServer);
758  Buffer arg;
759  Buffer *response;
760  arg.FromString(std::string("readv_ior_max readv_iov_max"));
761 
762  XRootDStatus status = fs.Query(QueryCode::Config, arg, response);
763  if (!status.IsOK())
764  return kFALSE;
765 
766  Ssiz_t from = 0;
767  TString token;
768 
769  std::vector<TString> resps;
770  while (TString(response->ToString()).Tokenize(token, from, "\n"))
771  resps.push_back(token);
772 
773  if (resps.size() != 2)
774  return kFALSE;
775 
776  if (resps[0].IsDigit())
777  fReadvIorMax = resps[0].Atoi();
778 
779  if (resps[1].IsDigit())
780  fReadvIovMax = resps[1].Atoi();
781 
782  delete response;
783 
784  // this is to workaround a dCache bug reported here:
785  // https://sft.its.cern.ch/jira/browse/ROOT-6639
786  if( fReadvIovMax == 0x7FFFFFFF )
787  {
788  fReadvIovMax = 1024;
789  fReadvIorMax = 2097136;
790  }
791 
792  return kTRUE;
793 }
794 
795 ////////////////////////////////////////////////////////////////////////////////
796 /// Map ROOT and xrootd environment variables
797 
798 void TNetXNGFile::SetEnv()
799 {
800  XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
801  const char *cenv = 0;
802  TString val;
803 
804  val = gEnv->GetValue("NetXNG.ConnectionWindow", "");
805  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CONNECTIONWINDOW"))
806  || strlen(cenv) <= 0))
807  env->PutInt("ConnectionWindow", val.Atoi());
808 
809  val = gEnv->GetValue("NetXNG.ConnectionRetry", "");
810  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CONNECTIONRETRY"))
811  || strlen(cenv) <= 0))
812  env->PutInt("RequestTimeout", val.Atoi());
813 
814  val = gEnv->GetValue("NetXNG.RequestTimeout", "");
815  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_REQUESTTIMEOUT"))
816  || strlen(cenv) <= 0))
817  env->PutInt("RequestTimeout", val.Atoi());
818 
819  val = gEnv->GetValue("NetXNG.SubStreamsPerChannel", "");
820  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_SUBSTREAMSPERCHANNEL"))
821  || strlen(cenv) <= 0))
822  env->PutInt("SubStreamsPerChannel", val.Atoi());
823 
824  val = gEnv->GetValue("NetXNG.TimeoutResolution", "");
825  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_TIMEOUTRESOLUTION"))
826  || strlen(cenv) <= 0))
827  env->PutInt("TimeoutResolution", val.Atoi());
828 
829  val = gEnv->GetValue("NetXNG.StreamErrorWindow", "");
830  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_STREAMERRORWINDOW"))
831  || strlen(cenv) <= 0))
832  env->PutInt("StreamErrorWindow", val.Atoi());
833 
834  val = gEnv->GetValue("NetXNG.RunForkHandler", "");
835  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_RUNFORKHANDLER"))
836  || strlen(cenv) <= 0))
837  env->PutInt("RunForkHandler", val.Atoi());
838 
839  val = gEnv->GetValue("NetXNG.RedirectLimit", "");
840  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_REDIRECTLIMIT"))
841  || strlen(cenv) <= 0))
842  env->PutInt("RedirectLimit", val.Atoi());
843 
844  val = gEnv->GetValue("NetXNG.WorkerThreads", "");
845  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_WORKERTHREADS"))
846  || strlen(cenv) <= 0))
847  env->PutInt("WorkerThreads", val.Atoi());
848 
849  val = gEnv->GetValue("NetXNG.CPChunkSize", "");
850  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CPCHUNKSIZE"))
851  || strlen(cenv) <= 0))
852  env->PutInt("CPChunkSize", val.Atoi());
853 
854  val = gEnv->GetValue("NetXNG.CPParallelChunks", "");
855  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CPPARALLELCHUNKS"))
856  || strlen(cenv) <= 0))
857  env->PutInt("CPParallelChunks", val.Atoi());
858 
859  val = gEnv->GetValue("NetXNG.PollerPreference", "");
860  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_POLLERPREFERENCE"))
861  || strlen(cenv) <= 0))
862  env->PutString("PollerPreference", val.Data());
863 
864  val = gEnv->GetValue("NetXNG.ClientMonitor", "");
865  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CLIENTMONITOR"))
866  || strlen(cenv) <= 0))
867  env->PutString("ClientMonitor", val.Data());
868 
869  val = gEnv->GetValue("NetXNG.ClientMonitorParam", "");
870  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XRD_CLIENTMONITORPARAM"))
871  || strlen(cenv) <= 0))
872  env->PutString("ClientMonitorParam", val.Data());
873 
874  fQueryReadVParams = gEnv->GetValue("NetXNG.QueryReadVParams", 1);
875  env->PutInt( "MultiProtocol", gEnv->GetValue("TFile.CrossProtocolRedirects", 1));
876 
877  // Old style netrc file
878  TString netrc;
879  netrc.Form("%s/.rootnetrc", gSystem->HomeDirectory());
880  gSystem->Setenv("XrdSecNETRC", netrc.Data());
881 
882  // For authentication
883  val = gEnv->GetValue("XSec.Pwd.ALogFile", "");
884  if (val.Length() > 0)
885  gSystem->Setenv("XrdSecPWDALOGFILE", val.Data());
886 
887  val = gEnv->GetValue("XSec.Pwd.ServerPuk", "");
888  if (val.Length() > 0)
889  gSystem->Setenv("XrdSecPWDSRVPUK", val.Data());
890 
891  val = gEnv->GetValue("XSec.GSI.CAdir", "");
892  if (val.Length() > 0)
893  gSystem->Setenv("XrdSecGSICADIR", val.Data());
894 
895  val = gEnv->GetValue("XSec.GSI.CRLdir", "");
896  if (val.Length() > 0)
897  gSystem->Setenv("XrdSecGSICRLDIR", val.Data());
898 
899  val = gEnv->GetValue("XSec.GSI.CRLextension", "");
900  if (val.Length() > 0)
901  gSystem->Setenv("XrdSecGSICRLEXT", val.Data());
902 
903  val = gEnv->GetValue("XSec.GSI.UserCert", "");
904  if (val.Length() > 0)
905  gSystem->Setenv("XrdSecGSIUSERCERT", val.Data());
906 
907  val = gEnv->GetValue("XSec.GSI.UserKey", "");
908  if (val.Length() > 0)
909  gSystem->Setenv("XrdSecGSIUSERKEY", val.Data());
910 
911  val = gEnv->GetValue("XSec.GSI.UserProxy", "");
912  if (val.Length() > 0)
913  gSystem->Setenv("XrdSecGSIUSERPROXY", val.Data());
914 
915  val = gEnv->GetValue("XSec.GSI.ProxyValid", "");
916  if (val.Length() > 0)
917  gSystem->Setenv("XrdSecGSIPROXYVALID", val.Data());
918 
919  val = gEnv->GetValue("XSec.GSI.ProxyKeyBits", "");
920  if (val.Length() > 0)
921  gSystem->Setenv("XrdSecGSIPROXYKEYBITS", val.Data());
922 
923  val = gEnv->GetValue("XSec.GSI.ProxyForward", "0");
924  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecGSIPROXYDEPLEN"))
925  || strlen(cenv) <= 0))
926  gSystem->Setenv("XrdSecGSIPROXYDEPLEN", val.Data());
927 
928  val = gEnv->GetValue("XSec.GSI.CheckCRL", "1");
929  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecGSICRLCHECK"))
930  || strlen(cenv) <= 0))
931  gSystem->Setenv("XrdSecGSICRLCHECK", val.Data());
932 
933  val = gEnv->GetValue("XSec.GSI.DelegProxy", "0");
934  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecGSIDELEGPROXY"))
935  || strlen(cenv) <= 0))
936  gSystem->Setenv("XrdSecGSIDELEGPROXY", val.Data());
937 
938  val = gEnv->GetValue("XSec.GSI.SignProxy", "1");
939  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecGSISIGNPROXY"))
940  || strlen(cenv) <= 0))
941  gSystem->Setenv("XrdSecGSISIGNPROXY", val.Data());
942 
943  val = gEnv->GetValue("XSec.Pwd.AutoLogin", "1");
944  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecPWDAUTOLOG"))
945  || strlen(cenv) <= 0))
946  gSystem->Setenv("XrdSecPWDAUTOLOG", val.Data());
947 
948  val = gEnv->GetValue("XSec.Pwd.VerifySrv", "1");
949  if (val.Length() > 0 && (!(cenv = gSystem->Getenv("XrdSecPWDVERIFYSRV"))
950  || strlen(cenv) <= 0))
951  gSystem->Setenv("XrdSecPWDVERIFYSRV", val.Data());
952 }