27 #include <XrdCl/XrdClURL.hh>
28 #include <XrdCl/XrdClFile.hh>
29 #include <XrdCl/XrdClXRootDResponses.hh>
30 #include <XrdCl/XrdClDefaultEnv.hh>
31 #include <XrdVersion.hh>
// Response handler for asynchronous opens. It derives from
// XrdCl::ResponseHandler and reports the outcome of the open back to a
// TNetXNGFile through TNetXNGFile::SetAsyncOpenStatus().
38 class TAsyncOpenHandler:
public XrdCl::ResponseHandler
// Constructor: takes the file being opened and marks its asynchronous open
// status as "in progress".
45 TAsyncOpenHandler(TNetXNGFile *file)
48 fFile->SetAsyncOpenStatus(TFile::kAOSInProgress);
// Callback receiving the status and response of the open request. The
// visible statements set the file's status to kAOSSuccess or kAOSFailure;
// the condition selecting between them is not visible in this listing
// (presumably status->IsOK() -- TODO confirm).
55 virtual void HandleResponse(XrdCl::XRootDStatus *status,
56 XrdCl::AnyObject *response)
60 fFile->SetAsyncOpenStatus(TFile::kAOSSuccess);
64 fFile->SetAsyncOpenStatus(TFile::kAOSFailure);
// Response handler for asynchronous vector reads. It derives from
// XrdCl::ResponseHandler and records the status of one request in a slot of
// a status vector supplied by the caller.
80 class TAsyncReadvHandler:
public XrdCl::ResponseHandler
// Constructor: stores the status vector, the index of the slot this handler
// fills, and a semaphore pointer.
87 TAsyncReadvHandler(std::vector<XrdCl::XRootDStatus*> *statuses,
89 TSemaphore *semaphore):
90 fStatuses(statuses), fStatusIndex(statusIndex), fSemaphore(semaphore) {}
// Callback: stores the received status pointer at position fStatusIndex.
// at() is bounds-checked, so an out-of-range index throws.
// NOTE(review): how fSemaphore is used here is not visible in this listing
// (presumably it is posted to wake the waiting reader -- TODO confirm).
97 virtual void HandleResponse(XrdCl::XRootDStatus *status,
98 XrdCl::AnyObject *response)
100 fStatuses->at(fStatusIndex) = status;
// Status vector filled by the handlers.
107 std::vector<XrdCl::XRootDStatus*> *fStatuses;
// Semaphore pointer passed in by the creator of the handler.
109 TSemaphore *fSemaphore;
// ROOT class-implementation macro invoked for TNetXNGFile.
113 ClassImp(TNetXNGFile);
// Convenience constructor: delegates to the other TNetXNGFile constructor,
// passing 0 as the second argument and forwarding url, mode, title,
// compress, netopt and parallelopen unchanged.
// NOTE(review): the second argument is presumably the "lurl" parameter of the
// delegated-to constructor -- its parameter list is only partly visible here.
125 TNetXNGFile::TNetXNGFile(
const char *url,
130 Bool_t parallelopen) :
131 TNetXNGFile(url,0,mode,title,compress,netopt,parallelopen){}
// Main constructor: initialises the TFile base with lurl when it is non-null
// (otherwise url) and type "NET", then opens the remote file either
// asynchronously or synchronously.
// NOTE(review): several lines of this constructor (remaining parameters,
// braces, returns, the test choosing between async and sync open) are not
// visible in this listing.
133 TNetXNGFile::TNetXNGFile(
const char *url,
139 Bool_t parallelopen) :
140 TFile((lurl ? lurl : url),
"NET", title, compress)
142 using namespace XrdCl;
// Log level: the XRD_LOGLEVEL environment variable wins; if it is unset the
// NetXNG.Debug resource is used. Nothing is set when both are empty.
145 TString val = gSystem->Getenv(
"XRD_LOGLEVEL");
146 if (val.IsNull()) val = gEnv->GetValue(
"NetXNG.Debug",
"");
147 if (!val.IsNull()) XrdCl::DefaultEnv::SetLogLevel(val.Data());
// Build the XrdCl URL from the input URL with its anchor cleared.
153 TUrl urlnoanchor(url);
154 urlnoanchor.SetAnchor(
"");
155 fUrl =
new URL(std::string(urlnoanchor.GetUrl()));
// Condition variable that Init() waits on and SetAsyncOpenStatus() signals.
159 fInitCondVar =
new XrdSysCondVar();
// The protocol of the XrdCl URL is forced to "root".
160 fUrl->SetProtocol(std::string(
"root"));
// Defaults; SetEnv() and GetVectorReadLimits() may assign these members again.
161 fQueryReadVParams = 1;
162 fReadvIorMax = 2097136;
// Translate the textual open mode into fOption / fMode; a negative result
// is reported as an error.
165 if (ParseOpenMode(mode, fOption, fMode, kTRUE)<0) {
166 Error(
"Open",
"could not parse open mode %s", mode);
// When a monitoring writer is installed, create an owning list of open
// phases and report the "xrdopen" phase.
175 if (gMonitoringWriter) {
177 fOpenPhases =
new TList;
178 fOpenPhases->SetOwner();
180 gMonitoringWriter->SendFileOpenProgress(
this, fOpenPhases,
"xrdopen",
// Asynchronous open: the request is issued with a TAsyncOpenHandler that
// reports the result back to this object.
187 TAsyncOpenHandler *handler =
new TAsyncOpenHandler(
this);
188 status = fFile->Open(fUrl->GetURL(), fMode, Access::None, handler);
189 if (!status.IsOK()) {
190 Error(
"Open",
"%s", status.ToStr().c_str());
// Synchronous open.
197 status = fFile->Open(fUrl->GetURL(), fMode);
198 if (!status.IsOK()) {
// With XRootD >= 4 (XrdVNUMBER >= 40000) a status code of errRedirect stores
// the status's error message in fNewUrl; errors are reported via Error().
199 #if XrdVNUMBER >= 40000
200 if( status.code == errRedirect )
201 fNewUrl = status.GetErrorMessage().c_str();
203 Error(
"Open",
"%s", status.ToStr().c_str());
205 Error(
"Open",
"%s", status.ToStr().c_str());
// Tests on the open flags; the statements they guard are not visible in
// this listing (presumably they set the writable / create state -- TODO
// confirm).
211 if( (fMode & OpenFlags::New) || (fMode & OpenFlags::Delete) ||
212 (fMode & OpenFlags::Update) )
217 if( (fMode & OpenFlags::New) || (fMode & OpenFlags::Delete) )
// Query the vector-read limits.
222 GetVectorReadLimits();
// Destructor of TNetXNGFile.
228 TNetXNGFile::~TNetXNGFile()
// Initialise the file. Waits for a pending asynchronous open if needed,
// reports the "rootinit" and "endopen" phases to the monitoring writer when
// one is installed, and queries the vector-read limits.
// NOTE(review): the guards and the call that performs the actual
// initialisation are not visible in this listing.
241 void TNetXNGFile::Init(Bool_t create)
243 using namespace XrdCl;
// Debug message for a repeated call (the enclosing test is not visible).
246 if (gDebug > 1) Info(
"Init",
"TFile::Init already called once");
// The file is not open yet and an asynchronous open is still in progress:
// block on the condition variable that SetAsyncOpenStatus() signals.
251 if (!IsOpen() && fAsyncOpenStatus == kAOSInProgress) {
252 fInitCondVar->Wait();
256 if (gMonitoringWriter)
257 gMonitoringWriter->SendFileOpenProgress(
this, fOpenPhases,
"rootinit",
264 if (gMonitoringWriter)
265 gMonitoringWriter->SendFileOpenProgress(
this, fOpenPhases,
"endopen",
269 GetVectorReadLimits();
// Return the size of the remote file as reported by XrdCl::File::Stat().
// NOTE(review): the early-exit paths and return statements are not visible
// in this listing.
276 Long64_t TNetXNGFile::GetSize()
const
278 using namespace XrdCl;
// Stat is forced by default; the statement guarded by the read-only test
// below is not visible (presumably it clears forceStat -- TODO confirm).
284 bool forceStat =
true;
285 if( fMode == XrdCl::OpenFlags::Read )
289 if( !fFile->Stat( forceStat, info ).IsOK() )
291 Long64_t size = info->GetSize();
// Report whether the underlying XrdCl file object is open.
299 Bool_t TNetXNGFile::IsOpen()
const
301 return fFile->IsOpen();
// Record the new asynchronous-open status and signal fInitCondVar, the
// condition variable Init() waits on while an open is in progress.
307 void TNetXNGFile::SetAsyncOpenStatus(EAsyncOpenStatus status)
309 fAsyncOpenStatus = status;
311 fInitCondVar->Signal();
// Close the remote file; a failed close is reported through Error().
// The option argument is unnamed and therefore unused.
// NOTE(review): statements before the XrdCl close call are not visible in
// this listing.
320 void TNetXNGFile::Close(
const Option_t *)
324 XrdCl::XRootDStatus status = fFile->Close();
325 if (!status.IsOK()) {
326 Error(
"Close",
"%s", status.ToStr().c_str());
// Reopen the file in a different mode by closing it and opening it again.
// Only READ and UPDATE are accepted as the new mode.
// NOTE(review): the return statements and the assignment of the new mode to
// fMode are not visible in this listing.
340 Int_t TNetXNGFile::ReOpen(Option_t *modestr)
342 using namespace XrdCl;
344 OpenFlags::Flags mode;
346 Int_t parseres = ParseOpenMode(modestr, newOpt, mode, kFALSE);
// Reject unparsable modes and anything other than Read or Update.
349 if (parseres<0 || (mode != OpenFlags::Read && mode != OpenFlags::Update)) {
350 Error(
"ReOpen",
"mode must be either READ or UPDATE, not %s", modestr);
// Special case: the requested mode equals the current one, or Update is
// requested on a file opened as New. The guarded statement is not visible
// (presumably an early return -- TODO confirm).
355 if (mode == fMode || (mode == OpenFlags::Update
356 && fMode == OpenFlags::New)) {
360 XRootDStatus st = fFile->Close();
362 Error(
"ReOpen",
"%s", st.ToStr().c_str());
368 st = fFile->Open(fUrl->GetURL(), fMode);
370 Error(
"ReOpen",
"%s", st.ToStr().c_str());
// Read `length` bytes into `buffer` starting at the offset returned by
// GetRelOffset(); forwards to the three-argument overload.
384 Bool_t TNetXNGFile::ReadBuffer(
char *buffer, Int_t length)
386 return ReadBuffer(buffer, GetRelOffset(), length);
// Read `length` bytes into `buffer`. The read cache is tried first, then a
// synchronous XrdCl read is issued; byte counters and monitoring hooks are
// updated afterwards.
// NOTE(review): guards, return statements and the statement that applies
// `position` to the file offset are not visible in this listing -- the read
// itself is issued at fOffset.
397 Bool_t TNetXNGFile::ReadBuffer(
char *buffer, Long64_t position, Int_t length)
399 using namespace XrdCl;
401 Info(
"ReadBuffer",
"offset: %lld length: %d", position, length);
// A non-zero result from the cache layer is handled in lines that are not
// visible here.
410 if ((status = ReadBufferViaCache(buffer, length))) {
// Take a start timestamp only when performance statistics are collected.
417 if (gPerfStats) start = TTimeStamp();
420 uint32_t bytesRead = 0;
421 XRootDStatus st = fFile->Read(fOffset, length, buffer, bytesRead);
423 Info(
"ReadBuffer",
"%s bytes read: %u", st.ToStr().c_str(), bytesRead);
426 Error(
"ReadBuffer",
"%s", st.ToStr().c_str());
// A short read is reported as an error.
430 if ((Int_t)bytesRead != length) {
431 Error(
"ReadBuffer",
"error reading all requested bytes, got %u of %d",
// Advance the offset and update the byte counters.
437 fOffset += bytesRead;
438 fBytesRead += bytesRead;
439 fgBytesRead += bytesRead;
444 gPerfStats->FileReadEvent(
this, (Int_t)bytesRead, start);
446 if (gMonitoringWriter)
447 gMonitoringWriter->SendFileReadProgress(
this);
// Vector read: fetch nbuffs segments, described by position[i] / length[i],
// into one contiguous buffer. Segments are turned into XrdCl chunks, grouped
// into chunk lists, and each chunk list is requested with an asynchronous
// XrdCl::File::VectorRead whose status is collected by a TAsyncReadvHandler.
// NOTE(review): guards, cleanup, return statements and part of the loop
// bodies are not visible in this listing.
464 Bool_t TNetXNGFile::ReadBuffers(
char *buffer, Long64_t *position, Int_t *length,
467 using namespace XrdCl;
473 std::vector<ChunkList> chunkLists;
475 std::vector<XRootDStatus*> *statuses;
476 TSemaphore *semaphore;
477 Int_t totalBytes = 0;
// Write cursor into the caller's buffer.
479 char *cursor = buffer;
482 if (gPerfStats) start = TTimeStamp();
// Shift every requested position by the archive offset. This modifies the
// caller's position array in place.
485 for (Int_t i = 0; i < nbuffs; i++)
486 position[i] += fArchiveOffset;
489 for (Int_t i = 0; i < nbuffs; ++i) {
490 totalBytes += length[i];
// A segment longer than fReadvIorMax is split into nsplit pieces of
// fReadvIorMax bytes plus a final piece of `rem` bytes.
493 if (length[i] > fReadvIorMax) {
494 Int_t nsplit = length[i] / fReadvIorMax;
495 Int_t rem = length[i] % fReadvIorMax;
499 for (j = 0; j < nsplit; ++j) {
500 offset = position[i] + (j * fReadvIorMax);
501 chunks.push_back(ChunkInfo(offset, fReadvIorMax, cursor));
502 cursor += fReadvIorMax;
// After the loop j equals nsplit, so this offset addresses the remainder.
506 offset = position[i] + (j * fReadvIorMax);
507 chunks.push_back(ChunkInfo(offset, rem, cursor));
// Segment within the limit: a single chunk.
510 chunks.push_back(ChunkInfo(position[i], length[i], cursor));
// Flush the pending chunks into a chunk list once fReadvIovMax chunks have
// accumulated; any surplus is carried over into the next list.
515 if ((Int_t) chunks.size() == fReadvIovMax) {
516 chunkLists.push_back(chunks);
517 chunks = ChunkList();
518 }
else if ((Int_t) chunks.size() > fReadvIovMax) {
519 chunkLists.push_back(ChunkList(chunks.begin(),
520 chunks.begin() + fReadvIovMax));
521 chunks = ChunkList(chunks.begin() + fReadvIovMax, chunks.end());
// Remaining chunks form the last chunk list.
526 if( !chunks.empty() )
527 chunkLists.push_back(chunks);
529 TAsyncReadvHandler *handler;
// One status slot per chunk list; the semaphore starts at 0.
531 semaphore =
new TSemaphore(0);
532 statuses =
new std::vector<XRootDStatus*>(chunkLists.size());
// Issue one asynchronous vector read per chunk list; the handler receives
// the index of its status slot.
535 std::vector<ChunkList>::iterator it;
536 for (it = chunkLists.begin(); it != chunkLists.end(); ++it)
538 handler =
new TAsyncReadvHandler(statuses, it - chunkLists.begin(),
540 status = fFile->VectorRead(*it, 0, handler);
542 if (!status.IsOK()) {
543 Error(
"ReadBuffers",
"%s", status.ToStr().c_str());
// NOTE(review): the body of this loop is not visible (presumably it waits
// on the semaphore once per request -- TODO confirm).
549 for (it = chunkLists.begin(); it != chunkLists.end(); ++it) {
// Inspect the collected statuses. On an error path the remaining statuses
// are visited by the inner loop (its body is not visible here).
554 for (it = chunkLists.begin(); it != chunkLists.end(); ++it) {
555 XRootDStatus *st = statuses->at(it - chunkLists.begin());
558 Error(
"ReadBuffers",
"%s", st->ToStr().c_str());
559 for( ; it != chunkLists.end(); ++it )
561 st = statuses->at( it - chunkLists.begin() );
// Update the byte counters with the total requested size.
573 fBytesRead += totalBytes;
574 fgBytesRead += totalBytes;
579 fOffset = position[0];
580 gPerfStats->FileReadEvent(
this, totalBytes, start);
583 if (gMonitoringWriter)
584 gMonitoringWriter->SendFileReadProgress(
this);
// Write `length` bytes from `buffer` at the current offset. The write cache
// is tried first, then a synchronous XrdCl write is issued and the write
// counters are updated.
// NOTE(review): guards, return statements and any offset update are not
// visible in this listing.
598 Bool_t TNetXNGFile::WriteBuffer(
const char *buffer, Int_t length)
600 using namespace XrdCl;
// Message emitted for a non-writable file (the test is not visible).
608 Info(
"WriteBuffer",
"file not writable");
614 if ((status = WriteBufferViaCache(buffer, length))) {
621 XRootDStatus st = fFile->Write(fOffset, length, buffer);
623 Error(
"WriteBuffer",
"%s", st.ToStr().c_str());
629 fBytesWrite += length;
630 fgBytesWrite += length;
// Synchronise the remote file through XrdCl::File::Sync(). Emits an
// informational message for non-writable files and reports the outcome of
// the sync.
// NOTE(review): the tests guarding each message are not visible in this
// listing.
637 void TNetXNGFile::Flush()
644 Info(
"Flush",
"file not writable - do nothing");
652 XrdCl::XRootDStatus status = fFile->Sync();
654 Error(
"Flush",
"%s", status.ToStr().c_str());
657 Info(
"Flush",
"XrdClient::Sync succeeded.");
// Set the file offset by forwarding both arguments to SetOffset().
666 void TNetXNGFile::Seek(Long64_t offset, ERelativeTo position)
668 SetOffset(offset, position);
// Translate a textual open mode into XrdCl open flags. `modestr` receives
// the upper-cased input; `mode` receives the flag:
//   NEW / CREATE -> OpenFlags::New
//   RECREATE     -> OpenFlags::Delete
//   UPDATE       -> OpenFlags::Update
//   READ         -> OpenFlags::Read
// NOTE(review): the last parameter, the handling of unknown modes and the
// return values are not visible in this listing. The final visible
// assignment falls back to OpenFlags::Read under a condition that is not
// shown.
682 Int_t TNetXNGFile::ParseOpenMode(Option_t *in, TString &modestr,
683 XrdCl::OpenFlags::Flags &mode,
686 using namespace XrdCl;
687 modestr = ToUpper(TString(in));
689 if (modestr ==
"NEW" || modestr ==
"CREATE") mode = OpenFlags::New;
690 else if (modestr ==
"RECREATE") mode = OpenFlags::Delete;
691 else if (modestr ==
"UPDATE") mode = OpenFlags::Update;
692 else if (modestr ==
"READ") mode = OpenFlags::Read;
698 mode = OpenFlags::Read;
// Check whether the file object can be used. Emits an error when the object
// is a zombie and another when the remote file is not open.
// NOTE(review): the tests and return statements are not visible in this
// listing.
707 Bool_t TNetXNGFile::IsUseable()
const
710 Error(
"TNetXNGFile",
"Object is in 'zombie' state");
715 Error(
"TNetXNGFile",
"The remote file is not open");
// Ask the data server for its vector-read limits (readv_ior_max and
// readv_iov_max) and store them in fReadvIorMax / fReadvIovMax.
// NOTE(review): guards, return statements and several declarations are not
// visible in this listing.
726 Bool_t TNetXNGFile::GetVectorReadLimits()
728 using namespace XrdCl;
// The query can be switched off through fQueryReadVParams (the guarded
// statement is not visible here).
734 if (!fQueryReadVParams)
737 #if XrdVNUMBER >= 40000
// XRootD >= 4: inspect the last URL. A "file" protocol on host "localhost"
// is treated as a local redirect.
739 fFile->GetProperty(
"LastURL",lasturl);
743 if(lrl.GetProtocol().compare(
"file") == 0 &&
744 lrl.GetHostId().compare(
"localhost") == 0){
746 Info(
"GetVectorReadLimits",
"Local redirect, using default values");
// XRootD >= 4 reads the data server from the "DataServer" file property.
// The alternative declaration further down uses File::GetDataServer(); the
// preprocessor lines separating the two are not visible here.
750 std::string dataServerStr;
751 if( !fFile->GetProperty(
"DataServer", dataServerStr ) )
753 URL dataServer(dataServerStr);
755 URL dataServer(fFile->GetDataServer());
757 FileSystem fs(dataServer);
// Config query for both limits in one request.
760 arg.FromString(std::string(
"readv_ior_max readv_iov_max"));
762 XRootDStatus status = fs.Query(QueryCode::Config, arg, response);
// Split the response on newlines; exactly two entries are expected.
769 std::vector<TString> resps;
770 while (TString(response->ToString()).Tokenize(token, from,
"\n"))
771 resps.push_back(token);
773 if (resps.size() != 2)
// Only purely numeric answers are accepted.
776 if (resps[0].IsDigit())
777 fReadvIorMax = resps[0].Atoi();
779 if (resps[1].IsDigit())
780 fReadvIovMax = resps[1].Atoi();
// Special handling when the server reports 0x7FFFFFFF for readv_iov_max.
// The statements between this test and the fReadvIorMax assignment are not
// visible in this listing.
786 if( fReadvIovMax == 0x7FFFFFFF )
789 fReadvIorMax = 2097136;
798 void TNetXNGFile::SetEnv()
800 XrdCl::Env *env = XrdCl::DefaultEnv::GetEnv();
801 const char *cenv = 0;
804 val = gEnv->GetValue(
"NetXNG.ConnectionWindow",
"");
805 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XRD_CONNECTIONWINDOW"))
806 || strlen(cenv) <= 0))
807 env->PutInt(
"ConnectionWindow", val.Atoi());
809 val = gEnv->GetValue(
"NetXNG.ConnectionRetry",
"");
810 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XRD_CONNECTIONRETRY"))
811 || strlen(cenv) <= 0))
812 env->PutInt(
"RequestTimeout", val.Atoi());
814 val = gEnv->GetValue(
"NetXNG.RequestTimeout",
"");
815 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XRD_REQUESTTIMEOUT"))
816 || strlen(cenv) <= 0))
817 env->PutInt(
"RequestTimeout", val.Atoi());
819 val = gEnv->GetValue(
"NetXNG.SubStreamsPerChannel",
"");
820 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XRD_SUBSTREAMSPERCHANNEL"))
821 || strlen(cenv) <= 0))
822 env->PutInt(
"SubStreamsPerChannel", val.Atoi());
824 val = gEnv->GetValue(
"NetXNG.TimeoutResolution",
"");
825 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XRD_TIMEOUTRESOLUTION"))
826 || strlen(cenv) <= 0))
827 env->PutInt(
"TimeoutResolution", val.Atoi());
829 val = gEnv->GetValue(
"NetXNG.StreamErrorWindow",
"");
830 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XRD_STREAMERRORWINDOW"))
831 || strlen(cenv) <= 0))
832 env->PutInt(
"StreamErrorWindow", val.Atoi());
834 val = gEnv->GetValue(
"NetXNG.RunForkHandler",
"");
835 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XRD_RUNFORKHANDLER"))
836 || strlen(cenv) <= 0))
837 env->PutInt(
"RunForkHandler", val.Atoi());
839 val = gEnv->GetValue(
"NetXNG.RedirectLimit",
"");
840 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XRD_REDIRECTLIMIT"))
841 || strlen(cenv) <= 0))
842 env->PutInt(
"RedirectLimit", val.Atoi());
844 val = gEnv->GetValue(
"NetXNG.WorkerThreads",
"");
845 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XRD_WORKERTHREADS"))
846 || strlen(cenv) <= 0))
847 env->PutInt(
"WorkerThreads", val.Atoi());
849 val = gEnv->GetValue(
"NetXNG.CPChunkSize",
"");
850 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XRD_CPCHUNKSIZE"))
851 || strlen(cenv) <= 0))
852 env->PutInt(
"CPChunkSize", val.Atoi());
854 val = gEnv->GetValue(
"NetXNG.CPParallelChunks",
"");
855 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XRD_CPPARALLELCHUNKS"))
856 || strlen(cenv) <= 0))
857 env->PutInt(
"CPParallelChunks", val.Atoi());
859 val = gEnv->GetValue(
"NetXNG.PollerPreference",
"");
860 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XRD_POLLERPREFERENCE"))
861 || strlen(cenv) <= 0))
862 env->PutString(
"PollerPreference", val.Data());
864 val = gEnv->GetValue(
"NetXNG.ClientMonitor",
"");
865 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XRD_CLIENTMONITOR"))
866 || strlen(cenv) <= 0))
867 env->PutString(
"ClientMonitor", val.Data());
869 val = gEnv->GetValue(
"NetXNG.ClientMonitorParam",
"");
870 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XRD_CLIENTMONITORPARAM"))
871 || strlen(cenv) <= 0))
872 env->PutString(
"ClientMonitorParam", val.Data());
874 fQueryReadVParams = gEnv->GetValue(
"NetXNG.QueryReadVParams", 1);
875 env->PutInt(
"MultiProtocol", gEnv->GetValue(
"TFile.CrossProtocolRedirects", 1));
879 netrc.Form(
"%s/.rootnetrc", gSystem->HomeDirectory());
880 gSystem->Setenv(
"XrdSecNETRC", netrc.Data());
883 val = gEnv->GetValue(
"XSec.Pwd.ALogFile",
"");
884 if (val.Length() > 0)
885 gSystem->Setenv(
"XrdSecPWDALOGFILE", val.Data());
887 val = gEnv->GetValue(
"XSec.Pwd.ServerPuk",
"");
888 if (val.Length() > 0)
889 gSystem->Setenv(
"XrdSecPWDSRVPUK", val.Data());
891 val = gEnv->GetValue(
"XSec.GSI.CAdir",
"");
892 if (val.Length() > 0)
893 gSystem->Setenv(
"XrdSecGSICADIR", val.Data());
895 val = gEnv->GetValue(
"XSec.GSI.CRLdir",
"");
896 if (val.Length() > 0)
897 gSystem->Setenv(
"XrdSecGSICRLDIR", val.Data());
899 val = gEnv->GetValue(
"XSec.GSI.CRLextension",
"");
900 if (val.Length() > 0)
901 gSystem->Setenv(
"XrdSecGSICRLEXT", val.Data());
903 val = gEnv->GetValue(
"XSec.GSI.UserCert",
"");
904 if (val.Length() > 0)
905 gSystem->Setenv(
"XrdSecGSIUSERCERT", val.Data());
907 val = gEnv->GetValue(
"XSec.GSI.UserKey",
"");
908 if (val.Length() > 0)
909 gSystem->Setenv(
"XrdSecGSIUSERKEY", val.Data());
911 val = gEnv->GetValue(
"XSec.GSI.UserProxy",
"");
912 if (val.Length() > 0)
913 gSystem->Setenv(
"XrdSecGSIUSERPROXY", val.Data());
915 val = gEnv->GetValue(
"XSec.GSI.ProxyValid",
"");
916 if (val.Length() > 0)
917 gSystem->Setenv(
"XrdSecGSIPROXYVALID", val.Data());
919 val = gEnv->GetValue(
"XSec.GSI.ProxyKeyBits",
"");
920 if (val.Length() > 0)
921 gSystem->Setenv(
"XrdSecGSIPROXYKEYBITS", val.Data());
923 val = gEnv->GetValue(
"XSec.GSI.ProxyForward",
"0");
924 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XrdSecGSIPROXYDEPLEN"))
925 || strlen(cenv) <= 0))
926 gSystem->Setenv(
"XrdSecGSIPROXYDEPLEN", val.Data());
928 val = gEnv->GetValue(
"XSec.GSI.CheckCRL",
"1");
929 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XrdSecGSICRLCHECK"))
930 || strlen(cenv) <= 0))
931 gSystem->Setenv(
"XrdSecGSICRLCHECK", val.Data());
933 val = gEnv->GetValue(
"XSec.GSI.DelegProxy",
"0");
934 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XrdSecGSIDELEGPROXY"))
935 || strlen(cenv) <= 0))
936 gSystem->Setenv(
"XrdSecGSIDELEGPROXY", val.Data());
938 val = gEnv->GetValue(
"XSec.GSI.SignProxy",
"1");
939 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XrdSecGSISIGNPROXY"))
940 || strlen(cenv) <= 0))
941 gSystem->Setenv(
"XrdSecGSISIGNPROXY", val.Data());
943 val = gEnv->GetValue(
"XSec.Pwd.AutoLogin",
"1");
944 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XrdSecPWDAUTOLOG"))
945 || strlen(cenv) <= 0))
946 gSystem->Setenv(
"XrdSecPWDAUTOLOG", val.Data());
948 val = gEnv->GetValue(
"XSec.Pwd.VerifySrv",
"1");
949 if (val.Length() > 0 && (!(cenv = gSystem->Getenv(
"XrdSecPWDVERIFYSRV"))
950 || strlen(cenv) <= 0))
951 gSystem->Setenv(
"XrdSecPWDVERIFYSRV", val.Data());