Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TProofOutputFile.cxx
Go to the documentation of this file.
1 // @(#)root/proof:$Id$
2 // Author: Long Tran-Thanh 14/09/07
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2002, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TProofOutputFile
13 \ingroup proofkernel
14 
15 Class to steer the merging of files produced on the workers
16 
17 */
18 
19 #include "TProofOutputFile.h"
20 #include <TEnv.h>
21 #include <TError.h>
22 #include <TFileCollection.h>
23 #include <TFileInfo.h>
24 #include <TFileMerger.h>
25 #include <TFile.h>
26 #include <TList.h>
27 #include <TObjArray.h>
28 #include <TObject.h>
29 #include <TObjString.h>
30 #include <TProofDebug.h>
31 #include <TProofServ.h>
32 #include <TSystem.h>
33 #include <TUUID.h>
34 
35 ClassImp(TProofOutputFile);
36 
37 ////////////////////////////////////////////////////////////////////////////////
38 /// Main constructor
39 
40 TProofOutputFile::TProofOutputFile(const char *path,
41  ERunType type, UInt_t opt, const char *dsname)
42  : TNamed(path, ""), fRunType(type), fTypeOpt(opt)
43 {
44  fIsLocal = kFALSE;
45  fMerged = kFALSE;
46  fMerger = 0;
47  fDataSet = 0;
48  ResetBit(TProofOutputFile::kRetrieve);
49  ResetBit(TProofOutputFile::kSwapFile);
50 
51  Init(path, dsname);
52 }
53 
54 ////////////////////////////////////////////////////////////////////////////////
55 /// Constructor with the old signature, kept for convenience and backward compatibility.
56 /// Options:
57 /// 'M' merge: finally merge the created files
58 /// 'L' local: copy locally the files before merging (implies 'M')
59 /// 'D' dataset: create a TFileCollection
60 /// 'R' register: dataset run with dataset registration
61 /// 'O' overwrite: force dataset replacement during registration
62 /// 'V' verify: verify the registered dataset
63 /// 'H' merge histograms in one go (option to TFileMerger)
64 /// Special 'option' values for backward compatibility:
65 /// "" equivalent to "M"
66 /// "LOCAL" equivalent to "ML" or "L"
67 
68 TProofOutputFile::TProofOutputFile(const char *path,
69  const char *option, const char *dsname)
70  : TNamed(path, "")
71 {
72  fIsLocal = kFALSE;
73  fMerged = kFALSE;
74  fMerger = 0;
75  fDataSet = 0;
76  fMergeHistosOneGo = kFALSE;
77 
78  // Fill the run type and option type
79  fRunType = kMerge;
80  fTypeOpt = kRemote;
81  if (option && strlen(option) > 0) {
82  TString opt(option);
83  if (opt.Contains("L") || (opt == "LOCAL")) fTypeOpt = kLocal;
84  if (opt.Contains("H")) fMergeHistosOneGo = kTRUE;
85  if (!opt.Contains("M") && opt.Contains("D")) {
86  // Dataset creation mode
87  fRunType = kDataset;
88  fTypeOpt = kCreate;
89  if (opt.Contains("R")) fTypeOpt = (ETypeOpt) (fTypeOpt | kRegister);
90  if (opt.Contains("O")) fTypeOpt = (ETypeOpt) (fTypeOpt | kOverwrite);
91  if (opt.Contains("V")) fTypeOpt = (ETypeOpt) (fTypeOpt | kVerify);
92  }
93  }
94 
95  Init(path, dsname);
96 }
97 
98 ////////////////////////////////////////////////////////////////////////////////
99 /// Initializer. Called by all constructors
100 
101 void TProofOutputFile::Init(const char *path, const char *dsname)
102 {
103  fLocalHost = TUrl(gSystem->HostName()).GetHostFQDN();
104  Int_t port = gEnv->GetValue("ProofServ.XpdPort", -1);
105  if (port > -1) {
106  fLocalHost += ":";
107  fLocalHost += port;
108  }
109 
110  TString xpath(path);
111  // Resolve the relevant placeholders in fFileName (e.g. root://a.ser.ver//data/dir/<group>/<user>/file)
112  TProofServ::ResolveKeywords(xpath, 0);
113  TUrl u(xpath, kTRUE);
114  // File name
115  fFileName = u.GetFile();
116  // The name is used to identify this entity
117  SetName(gSystem->BaseName(fFileName.Data()));
118  // The title is the dataset name in the case such option is chosen.
119  // In the merging case it can be the final location of the file on the client if the retrieve
120  // option is chosen; if the case, this set in TProofPlayer::MergeOutputFiles.
121  if (fRunType == kDataset) {
122  if (dsname && strlen(dsname) > 0) {
123  // This is the dataset name in case such option is chosen
124  SetTitle(dsname);
125  } else {
126  // Default dataset name
127  SetTitle(GetName());
128  }
129  }
130  // Options and anchor, if any
131  if (u.GetOptions() && strlen(u.GetOptions()) > 0)
132  fOptionsAnchor += TString::Format("?%s", u.GetOptions());
133  if (u.GetAnchor() && strlen(u.GetAnchor()) > 0)
134  fOptionsAnchor += TString::Format("#%s", u.GetAnchor());
135  // Path
136  fIsLocal = kFALSE;
137  fDir = u.GetUrl();
138  Int_t pos = fDir.Index(fFileName);
139  if (pos != kNPOS) fDir.Remove(pos);
140  fRawDir = fDir;
141 
142  if (fDir.BeginsWith("file:")) {
143  fIsLocal = kTRUE;
144  // For local files, the user is allowed to create files under the specified directory.
145  // If this is not the case, the file is rooted automatically to the assigned dir which
146  // is the datadir for dataset creation runs, and the working dir for merging runs
147  TString dirPath = gSystem->DirName(fFileName);
148  fFileName = gSystem->BaseName(fFileName);
149  if (AssertDir(dirPath) != 0)
150  Error("Init", "problems asserting path '%s'", dirPath.Data());
151  TString dirData = (!IsMerge() && gProofServ) ? gProofServ->GetDataDir()
152  : gSystem->WorkingDirectory();
153  if ((dirPath[0] == '/') && gSystem->AccessPathName(dirPath, kWritePermission)) {
154  Warning("Init", "not allowed to create files under '%s' - chrooting to '%s'",
155  dirPath.Data(), dirData.Data());
156  dirPath.Insert(0, dirData);
157  } else if (dirPath.BeginsWith("..")) {
158  dirPath.Remove(0, 2);
159  if (dirPath[0] != '/') dirPath.Insert(0, "/");
160  dirPath.Insert(0, dirData);
161  } else if (dirPath[0] == '.' || dirPath[0] == '~') {
162  dirPath.Remove(0, 1);
163  if (dirPath[0] != '/') dirPath.Insert(0, "/");
164  dirPath.Insert(0, dirData);
165  } else if (dirPath.IsNull()) {
166  dirPath = dirData;
167  }
168  // Make sure that session-tag, ordinal and query sequential number are present otherwise
169  // we may override outputs from other workers
170  if (gProofServ) {
171  if (!IsMerge() || (!dirPath.BeginsWith(gProofServ->GetDataDir()) &&
172  !dirPath.BeginsWith(gSystem->WorkingDirectory()))) {
173  if (!dirPath.Contains(gProofServ->GetOrdinal())) {
174  if (!dirPath.EndsWith("/")) dirPath += "/";
175  dirPath += gProofServ->GetOrdinal();
176  }
177  }
178  if (!IsMerge()) {
179  if (!dirPath.Contains(gProofServ->GetSessionTag())) {
180  if (!dirPath.EndsWith("/")) dirPath += "/";
181  dirPath += gProofServ->GetSessionTag();
182  }
183  if (!dirPath.Contains("<qnum>")) {
184  if (!dirPath.EndsWith("/")) dirPath += "/";
185  dirPath += "<qnum>";
186  }
187  // Resolve the relevant placeholders
188  TProofServ::ResolveKeywords(dirPath, 0);
189  }
190  }
191  // Save the raw directory
192  fRawDir = dirPath;
193  // Make sure the the path exists
194  if (AssertDir(dirPath) != 0)
195  Error("Init", "problems asserting path '%s'", dirPath.Data());
196  // Take into account local server settings
197  TProofServ::GetLocalServer(fDir);
198  TProofServ::FilterLocalroot(dirPath, fDir);
199  // The path to be used to address the file
200  fDir += dirPath;
201  }
202  // Notify
203  Info("Init", "dir: %s (raw: %s)", fDir.Data(), fRawDir.Data());
204 
205  // Default output file name
206  ResetBit(TProofOutputFile::kOutputFileNameSet);
207  fOutputFileName = "<file>";
208  if (gEnv->Lookup("Proof.OutputFile")) {
209  fOutputFileName = gEnv->GetValue("Proof.OutputFile", "<file>");
210  SetBit(TProofOutputFile::kOutputFileNameSet);
211  }
212  // Add default file name
213  TString fileName = path;
214  if (!fileName.EndsWith(".root")) fileName += ".root";
215  // Make sure that the file name was inserted (may not happen if the placeholder <file> is missing)
216  if (!fOutputFileName.IsNull() && !fOutputFileName.Contains("<file>")) {
217  if (!fOutputFileName.EndsWith("/")) fOutputFileName += "/";
218  fOutputFileName += fileName;
219  }
220  // Resolve placeholders
221  fileName.ReplaceAll("<ord>",""); // No ordinal in the final merged file
222  TProofServ::ResolveKeywords(fOutputFileName, fileName);
223  Info("Init", "output file url: %s", fOutputFileName.Data());
224  // Fill ordinal
225  fWorkerOrdinal = "<ord>";
226  TProofServ::ResolveKeywords(fWorkerOrdinal, 0);
227 }
228 
229 ////////////////////////////////////////////////////////////////////////////////
230 /// Main destructor
231 
232 TProofOutputFile::~TProofOutputFile()
233 {
234  if (fDataSet) delete fDataSet;
235  if (fMerger) delete fMerger;
236 }
237 
238 ////////////////////////////////////////////////////////////////////////////////
239 /// Set the name of the output file; in the form of an Url.
240 
241 void TProofOutputFile::SetOutputFileName(const char *name)
242 {
243  if (name && strlen(name) > 0) {
244  fOutputFileName = name;
245  TProofServ::ResolveKeywords(fOutputFileName);
246  PDB(kOutput,1) Info("SetOutputFileName", "output file url: %s", fOutputFileName.Data());
247  } else {
248  fOutputFileName = "";
249  }
250  SetBit(TProofOutputFile::kOutputFileNameSet);
251 }
252 
253 ////////////////////////////////////////////////////////////////////////////////
254 /// Open the file using the unique temporary name
255 
256 TFile* TProofOutputFile::OpenFile(const char* opt)
257 {
258  if (fFileName.IsNull()) return 0;
259 
260  // Create the path
261  TString fileLoc;
262  fileLoc.Form("%s/%s%s", fRawDir.Data(), fFileName.Data(), fOptionsAnchor.Data());
263 
264  // Open the file
265  TFile *retFile = TFile::Open(fileLoc, opt);
266 
267  return retFile;
268 }
269 
270 ////////////////////////////////////////////////////////////////////////////////
271 /// Adopt a file already open.
272 /// Return 0 if OK, -1 in case of failure
273 
274 Int_t TProofOutputFile::AdoptFile(TFile *f)
275 {
276  if (!f || (f && f->IsZombie())) {
277  Error("AdoptFile", "file is undefined or zombie!");
278  return -1;
279  }
280  const TUrl *u = f->GetEndpointUrl();
281  if (!u) {
282  Error("AdoptFile", "file end-point url is undefined!");
283  return -1;
284  }
285 
286  // Set the name and dir
287  fIsLocal = kFALSE;
288  if (!strcmp(u->GetProtocol(), "file")) {
289  fIsLocal = kTRUE;
290  fDir = u->GetFile();
291  } else {
292  fDir = u->GetUrl();
293  }
294  fFileName = gSystem->BaseName(fDir.Data());
295  fDir.ReplaceAll(fFileName, "");
296  fRawDir = fDir;
297 
298  // If local remove prefix, if any
299  if (fIsLocal) {
300  TString localDS;
301  TProofServ::GetLocalServer(localDS);
302  if (!localDS.IsNull()) {
303  TProofServ::FilterLocalroot(fDir, localDS);
304  fDir.Insert(0, localDS);
305  }
306  }
307 
308  return 0;
309 }
310 
311 ////////////////////////////////////////////////////////////////////////////////
312 /// Merge objects from the list into this object
313 
314 Long64_t TProofOutputFile::Merge(TCollection* list)
315 {
316  PDB(kOutput,2) Info("Merge","enter: merge? %d", IsMerge());
317 
318  // Needs somethign to merge
319  if(!list || list->IsEmpty()) return 0;
320 
321  if (IsMerge()) {
322  // Build-up the merger
323  TString fileLoc;
324  TString outputFileLoc = (fOutputFileName.IsNull()) ? fFileName : fOutputFileName;
325  // Get the file merger instance
326  Bool_t localMerge = (fRunType == kMerge && fTypeOpt == kLocal) ? kTRUE : kFALSE;
327  TFileMerger *merger = GetFileMerger(localMerge);
328  if (!merger) {
329  Error("Merge", "could not instantiate the file merger");
330  return -1;
331  }
332 
333  if (!fMerged) {
334  merger->OutputFile(outputFileLoc);
335  fileLoc.Form("%s/%s", fDir.Data(), GetFileName());
336  AddFile(merger, fileLoc);
337  fMerged = kTRUE;
338  }
339 
340  TIter next(list);
341  TObject *o = 0;
342  while((o = next())) {
343  TProofOutputFile *pFile = dynamic_cast<TProofOutputFile *>(o);
344  if (pFile) {
345  fileLoc.Form("%s/%s", pFile->GetDir(), pFile->GetFileName());
346  AddFile(merger, fileLoc);
347  }
348  }
349  } else {
350  // Get the reference MSS url, if any
351  TUrl mssUrl(gEnv->GetValue("ProofServ.PoolUrl",""));
352  // Build-up the TFileCollection
353  TFileCollection *dataset = GetFileCollection();
354  if (!dataset) {
355  Error("Merge", "could not instantiate the file collection");
356  return -1;
357  }
358  fMerged = kTRUE;
359  TString path;
360  TFileInfo *fi = 0;
361  // If new, add ourseelves
362  dataset->Update();
363  PDB(kOutput,2) Info("Merge","dataset: %s (nfiles: %lld)", dataset->GetName(), dataset->GetNFiles());
364  if (dataset->GetNFiles() == 0) {
365  // Save the export and raw urls
366  path.Form("%s/%s%s", GetDir(), GetFileName(), GetOptionsAnchor());
367  fi = new TFileInfo(path);
368  // Add also an URL with the redirector path, if any
369  if (mssUrl.IsValid()) {
370  TUrl ur(fi->GetFirstUrl()->GetUrl());
371  ur.SetProtocol(mssUrl.GetProtocol());
372  ur.SetHost(mssUrl.GetHost());
373  ur.SetPort(mssUrl.GetPort());
374  if (mssUrl.GetUser() && strlen(mssUrl.GetUser()) > 0)
375  ur.SetUser(mssUrl.GetUser());
376  fi->AddUrl(ur.GetUrl());
377  }
378  // Add special local URL to keep track of the file
379  path.Form("%s/%s?node=%s", GetDir(kTRUE), GetFileName(), GetLocalHost());
380  fi->AddUrl(path);
381  PDB(kOutput,2) fi->Print();
382  // Now add to the dataset
383  dataset->Add(fi);
384  }
385 
386  TIter next(list);
387  TObject *o = 0;
388  while((o = next())) {
389  TProofOutputFile *pFile = dynamic_cast<TProofOutputFile *>(o);
390  if (pFile) {
391  // Save the export and raw urls
392  path.Form("%s/%s%s", pFile->GetDir(), pFile->GetFileName(), pFile->GetOptionsAnchor());
393  fi = new TFileInfo(path);
394  // Add also an URL with the redirector path, if any
395  if (mssUrl.IsValid()) {
396  TUrl ur(fi->GetFirstUrl()->GetUrl());
397  ur.SetProtocol(mssUrl.GetProtocol());
398  ur.SetHost(mssUrl.GetHost());
399  ur.SetPort(mssUrl.GetPort());
400  if (mssUrl.GetUser() && strlen(mssUrl.GetUser()) > 0)
401  ur.SetUser(mssUrl.GetUser());
402  fi->AddUrl(ur.GetUrl());
403  }
404  // Add special local URL to keep track of the file
405  path.Form("%s/%s?node=%s", pFile->GetDir(kTRUE), pFile->GetFileName(), pFile->GetLocalHost());
406  fi->AddUrl(path);
407  PDB(kOutput,2) fi->Print();
408  // Now add to the dataset
409  dataset->Add(fi);
410  }
411  }
412  }
413  PDB(kOutput,2) Info("Merge","Done");
414 
415  // Done
416  return 0;
417 }
418 
419 ////////////////////////////////////////////////////////////////////////////////
420 /// Dump the class content
421 
422 void TProofOutputFile::Print(Option_t *) const
423 {
424  Info("Print","-------------- %s : start (%s) ------------", GetName(), fLocalHost.Data());
425  Info("Print"," dir: %s", fDir.Data());
426  Info("Print"," raw dir: %s", fRawDir.Data());
427  Info("Print"," file name: %s%s", fFileName.Data(), fOptionsAnchor.Data());
428  if (IsMerge()) {
429  Info("Print"," run type: create a merged file");
430  Info("Print"," merging option: %s",
431  (fTypeOpt == kLocal) ? "local copy" : "keep remote");
432  } else {
433  TString opt;
434  if ((fTypeOpt & kRegister)) opt += "R";
435  if ((fTypeOpt & kOverwrite)) opt += "O";
436  if ((fTypeOpt & kVerify)) opt += "V";
437  Info("Print"," run type: create dataset (name: '%s', opt: '%s')",
438  GetTitle(), opt.Data());
439  }
440  Info("Print"," output file name: %s", fOutputFileName.Data());
441  Info("Print"," ordinal: %s", fWorkerOrdinal.Data());
442  Info("Print","-------------- %s : done -------------", GetName());
443 
444  return;
445 }
446 
447 ////////////////////////////////////////////////////////////////////////////////
448 /// Notify error message
449 
450 void TProofOutputFile::NotifyError(const char *msg)
451 {
452  if (msg) {
453  if (gProofServ)
454  gProofServ->SendAsynMessage(msg);
455  else
456  Printf("%s", msg);
457  } else {
458  Info("NotifyError","called with empty message");
459  }
460 
461  return;
462 }
463 
464 ////////////////////////////////////////////////////////////////////////////////
465 /// Add file to merger, checking the result
466 
467 void TProofOutputFile::AddFile(TFileMerger *merger, const char *path)
468 {
469  if (merger && path) {
470  if (!merger->AddFile(path))
471  NotifyError(Form("TProofOutputFile::AddFile:"
472  " error from TFileMerger::AddFile(%s)", path));
473  }
474 }
475 
476 ////////////////////////////////////////////////////////////////////////////////
477 /// Unlink path
478 
479 void TProofOutputFile::Unlink(const char *path)
480 {
481  if (path) {
482  if (!gSystem->AccessPathName(path)) {
483  if (gSystem->Unlink(path) != 0)
484  NotifyError(Form("TProofOutputFile::Unlink:"
485  " error from TSystem::Unlink(%s)", path));
486  }
487  }
488 }
489 
490 ////////////////////////////////////////////////////////////////////////////////
491 /// Get instance of the file collection to be used in 'dataset' mode
492 
493 TFileCollection *TProofOutputFile::GetFileCollection()
494 {
495  if (!fDataSet)
496  fDataSet = new TFileCollection(GetTitle());
497  return fDataSet;
498 }
499 
500 ////////////////////////////////////////////////////////////////////////////////
501 /// Get instance of the file merger to be used in 'merge' mode
502 
503 TFileMerger *TProofOutputFile::GetFileMerger(Bool_t local)
504 {
505  if (!fMerger)
506  fMerger = new TFileMerger(local, fMergeHistosOneGo);
507  return fMerger;
508 }
509 
510 ////////////////////////////////////////////////////////////////////////////////
511 /// Assert directory path 'dirpath', with the ownership of the last already
512 /// existing subpath.
513 /// Return 0 on success, -1 on error
514 
515 Int_t TProofOutputFile::AssertDir(const char *dirpath)
516 {
517  TString existsPath(dirpath);
518  TList subPaths;
519  while (existsPath != "/" && existsPath != "." && gSystem->AccessPathName(existsPath)) {
520  subPaths.AddFirst(new TObjString(gSystem->BaseName(existsPath)));
521  existsPath = gSystem->DirName(existsPath);
522  }
523  subPaths.SetOwner(kTRUE);
524  FileStat_t st;
525  if (gSystem->GetPathInfo(existsPath, st) == 0) {
526  TString xpath = existsPath;
527  TIter nxp(&subPaths);
528  TObjString *os = 0;
529  while ((os = (TObjString *) nxp())) {
530  xpath += TString::Format("/%s", os->GetName());
531  if (gSystem->mkdir(xpath, kTRUE) == 0) {
532  if (gSystem->Chmod(xpath, (UInt_t) st.fMode) != 0)
533  ::Warning("TProofOutputFile::AssertDir", "problems setting mode on '%s'", xpath.Data());
534  } else {
535  ::Error("TProofOutputFile::AssertDir", "problems creating path '%s'", xpath.Data());
536  return -1;
537  }
538  }
539  } else {
540  ::Warning("TProofOutputFile::AssertDir", "could not get info for path '%s': will only try to create"
541  " the full path w/o trying to set the mode", existsPath.Data());
542  if (gSystem->mkdir(existsPath, kTRUE) != 0) {
543  ::Error("TProofOutputFile::AssertDir", "problems creating path '%s'", existsPath.Data());
544  return -1;
545  }
546  }
547  // Done
548  return 0;
549 }