Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TSelVerifyDataSet.cxx
Go to the documentation of this file.
1 // @(#)root/proof:$Id$
2 // Author: Sangsu Ryu 28/06/2011
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 /** \class TSelVerifyDataSet
13 \ingroup proofkernel
14 
15 Selector to verify dataset in parallel on workers
16 
17 */
18 
19 #define TSelVerifyDataSet_cxx
20 
21 #include "TSelVerifyDataSet.h"
22 #include "TDataSetManager.h"
23 #include "TDSet.h"
24 #include "TParameter.h"
25 #include "TTree.h"
26 #include "TFile.h"
27 #include "TNamed.h"
28 #include "TSystem.h"
29 #include "TROOT.h"
30 #include "TEnv.h"
31 #include "TFileStager.h"
32 #include "TProofDebug.h"
33 #include "TProofServ.h"
34 #include "TFileCollection.h"
35 #include "TFileInfo.h"
36 
37 ClassImp(TSelVerifyDataSet);
38 
39 ////////////////////////////////////////////////////////////////////////////////
40 /// Constructor
41 
42 TSelVerifyDataSet::TSelVerifyDataSet(TTree *)
43 {
44  InitMembers();
45 }
46 
47 ////////////////////////////////////////////////////////////////////////////////
48 /// Constructor
49 
50 TSelVerifyDataSet::TSelVerifyDataSet()
51 {
52  InitMembers();
53 }
54 
55 ////////////////////////////////////////////////////////////////////////////////
56 /// Initialize members
57 
58 void TSelVerifyDataSet::InitMembers()
59 {
60  fFopt = -1;
61  fSopt = 0;
62  fRopt = 0;
63 
64  fAllf = 0;
65  fCheckstg = 0;
66  fNonStgf = 0;
67  fReopen = 0;
68  fTouch = 0;
69  fStgf = 0;
70  fNoaction = 0;
71  fFullproc = 0;
72  fLocateonly = 0;
73  fStageonly = 0;
74  fDoall = 0;
75  fGetlistonly = 0;
76  fScanlist = 0;
77  fDbg = 0;
78 
79  fChangedDs = kFALSE;
80  fTouched = 0;
81  fOpened = 0;
82  fDisappeared = 0;
83  fSubDataSet = 0;
84 }
85 
86 ////////////////////////////////////////////////////////////////////////////////
87 /// Worker Begin
88 
89 void TSelVerifyDataSet::SlaveBegin(TTree *)
90 {
91  TString dsname, opts;
92 
93  TNamed* par = dynamic_cast<TNamed*>(fInput->FindObject("PROOF_VerifyDataSet"));
94  if (par) {
95  dsname = par->GetTitle();
96  } else {
97  Abort("cannot find dataset name: cannot continue", kAbortProcess);
98  return;
99  }
100 
101  par = dynamic_cast<TNamed*>(fInput->FindObject("PROOF_VerifyDataSetOption"));
102  if (par) {
103  opts = par->GetTitle();
104  } else {
105  Abort("cannot find verify options: cannot continue", kAbortProcess);
106  return;
107  }
108 
109  par = dynamic_cast<TNamed*>(fInput->FindObject("PROOF_MSS"));
110  if (par) {
111  fMss = par->GetTitle();
112  PDB(kSelector, 2) Info("SlaveBegin", "dataset MSS: '%s'", fMss.Data());
113  }
114 
115  par = dynamic_cast<TNamed*>(fInput->FindObject("PROOF_StageOption"));
116  if (par) {
117  fStageopts = par->GetTitle();
118  PDB(kSelector, 2) Info("SlaveBegin", "dataset stage options: '%s'", fStageopts.Data());
119  }
120 
121  // Extract the directives
122  UInt_t o = 0;
123  if (!opts.IsNull()) {
124  // Selection options
125  if (strstr(opts, "allfiles:") || strchr(opts, 'A'))
126  o |= TDataSetManager::kAllFiles;
127  else if (strstr(opts, "staged:") || strchr(opts, 'D'))
128  o |= TDataSetManager::kStagedFiles;
129  // Pre-action options
130  if (strstr(opts, "open:") || strchr(opts, 'O'))
131  o |= TDataSetManager::kReopen;
132  if (strstr(opts, "touch:") || strchr(opts, 'T'))
133  o |= TDataSetManager::kTouch;
134  if (strstr(opts, "nostagedcheck:") || strchr(opts, 'I'))
135  o |= TDataSetManager::kNoStagedCheck;
136  // Process options
137  if (strstr(opts, "noaction:") || strchr(opts, 'N'))
138  o |= TDataSetManager::kNoAction;
139  if (strstr(opts, "locateonly:") || strchr(opts, 'L'))
140  o |= TDataSetManager::kLocateOnly;
141  if (strstr(opts, "stageonly:") || strchr(opts, 'S'))
142  o |= TDataSetManager::kStageOnly;
143  // Auxilliary options
144  if (strstr(opts, "verbose:") || strchr(opts, 'V'))
145  o |= TDataSetManager::kDebug;
146  } else {
147  // Default
148  o = TDataSetManager::kReopen | TDataSetManager::kDebug;
149  }
150 
151  PDB(kSelector, 1) Info("SlaveBegin", "o=%d", o);
152  // File selection
153  fFopt = ((o & TDataSetManager::kAllFiles)) ? -1 : 0;
154  if (fFopt >= 0) {
155  if ((o & TDataSetManager::kStagedFiles)) {
156  fFopt = 10;
157  } else {
158  if ((o & TDataSetManager::kReopen)) fFopt++;
159  if ((o & TDataSetManager::kTouch)) fFopt++;
160  }
161  if ((o & TDataSetManager::kNoStagedCheck)) fFopt += 100;
162  } else {
163  if ((o & TDataSetManager::kStagedFiles) || (o & TDataSetManager::kReopen) || (o & TDataSetManager::kTouch)) {
164  Warning("SlaveBegin", "kAllFiles mode: ignoring kStagedFiles or kReopen"
165  " or kTouch requests");
166  }
167  if ((o & TDataSetManager::kNoStagedCheck)) fFopt -= 100;
168  }
169  PDB(kSelector, 1) Info("SlaveBegin", "fFopt=%d", fFopt);
170 
171  // Type of action
172  fSopt = ((o & TDataSetManager::kNoAction)) ? -1 : 0;
173  if (fSopt >= 0) {
174  if ((o & TDataSetManager::kLocateOnly) && (o & TDataSetManager::kStageOnly)) {
175  Error("SlaveBegin", "kLocateOnly and kStageOnly cannot be processed concurrently");
176  return;
177  }
178  if ((o & TDataSetManager::kLocateOnly)) fSopt = 1;
179  if ((o & TDataSetManager::kStageOnly)) fSopt = 2;
180  } else if ((o & TDataSetManager::kLocateOnly) || (o & TDataSetManager::kStageOnly)) {
181  Warning("SlaveBegin", "kNoAction mode: ignoring kLocateOnly or kStageOnly requests");
182  }
183  PDB(kSelector, 1) Info("SlaveBegin", "fSopt=%d", fSopt);
184 
185  fDbg = ((o & TDataSetManager::kDebug)) ? kTRUE : kFALSE;
186 
187  // File selection, Reopen and Touch options
188  fAllf = (fFopt == -1) ? kTRUE : kFALSE;
189  fCheckstg = (fFopt >= 100 || fFopt < -1) ? kFALSE : kTRUE;
190  if (fFopt >= 0) fFopt %= 100;
191  fNonStgf = (fFopt >= 0 && fFopt < 10) ? kTRUE : kFALSE;
192  fReopen = (fFopt >= 1 && fFopt < 10) ? kTRUE : kFALSE;
193  fTouch = (fFopt >= 2 && fFopt < 10) ? kTRUE : kFALSE;
194  fStgf = (fFopt == 10) ? kTRUE : kFALSE;
195 
196  PDB(kSelector, 1) Info("SlaveBegin",
197  "fAllf=%d fCheckstg=%d fNonStgf=%d fReopen=%d fTouch=%d fStgf=%d",
198  fAllf, fCheckstg, fNonStgf, fReopen, fTouch, fStgf);
199 
200  // File processing options
201  fNoaction = (fSopt == -1) ? kTRUE : kFALSE;
202  fFullproc = (fSopt == 0) ? kTRUE : kFALSE;
203  fLocateonly = (fSopt == 1) ? kTRUE : kFALSE;
204  fStageonly = (fSopt == 2) ? kTRUE : kFALSE;
205 
206  PDB(kSelector, 1) Info("SlaveBegin",
207  "fNoaction=%d fFullproc=%d fLocateonly=%d fStageonly=%d",
208  fNoaction, fFullproc, fLocateonly, fStageonly);
209 
210  // Run options
211  fDoall = (fRopt == 0) ? kTRUE : kFALSE;
212  fGetlistonly = (fRopt == 1) ? kTRUE : kFALSE;
213  fScanlist = (fRopt == 2) ? kTRUE : kFALSE;
214 
215  PDB(kSelector, 1) Info("SlaveBegin",
216  "fDoall=%d fGetlistonly=%d fScanlist=%d",
217  fDoall, fGetlistonly, fScanlist);
218 
219  TString hostname(TUrl(gSystem->HostName()).GetHostFQDN());
220  TString thisordinal = gProofServ ? gProofServ->GetOrdinal() : "n.d";
221  TString title =
222  TString::Format("TSelVerifyDataSet_%s_%s", hostname.Data(), thisordinal.Data());
223  fSubDataSet= new TFileCollection(dsname, title);
224 }
225 
226 ////////////////////////////////////////////////////////////////////////////////
227 /// Process a single entry
228 
229 Bool_t TSelVerifyDataSet::Process(Long64_t entry)
230 {
231  TDSetElement *fCurrent = 0;
232  TPair *elemPair = 0;
233  if (fInput && (elemPair = dynamic_cast<TPair *>
234  (fInput->FindObject("PROOF_CurrentElement")))) {
235  if ((fCurrent = dynamic_cast<TDSetElement *>(elemPair->Value())))
236  Info("Process", "entry %lld: file: '%s'", entry, fCurrent->GetName());
237  }
238  if (!fCurrent) {
239  Error("Process", "entry %lld: current element not found!", entry);
240  return kFALSE;
241  }
242 
243  TFileInfo *fileInfo = dynamic_cast<TFileInfo*>(fCurrent->GetAssocObj(0));
244  if (!fileInfo) {
245  Error("Process", "can not get TFileInfo; returning");
246  return kFALSE;
247  }
248 
249  PDB(kSelector, 1) {
250  Info("Process", "input fileinfo: ");
251  fileInfo->Print("L");
252  }
253 
254  TFileStager *stager = 0;
255  Bool_t createStager = kFALSE;
256 
257  TFileInfo* newfileinfo = new TFileInfo(*fileInfo);
258  newfileinfo->SetIndex(fileInfo->GetIndex());
259 
260  if (fDoall || fGetlistonly) {
261 
262  stager = (fMss && strlen(fMss) > 0) ? TFileStager::Open(fMss) : 0;
263  createStager = (stager) ? kFALSE : kTRUE;
264 
265  // Check which files have been staged, this can be replaced by a bulk command,
266  // once it exists in the xrdclient
267 
268  // For real time monitoring
269  gSystem->DispatchOneEvent(kTRUE);
270 
271  Bool_t changed = kFALSE;
272  Bool_t touched = kFALSE;
273  Bool_t disappeared = kFALSE;
274 
275  TDataSetManager::CheckStagedStatus(newfileinfo, fFopt, -1, 0, stager, createStager,
276  fDbg, changed, touched, disappeared);
277 
278  if (changed) fChangedDs = kTRUE;
279  if (touched) fTouched++;
280  if (disappeared) fDisappeared++;
281 
282  SafeDelete(stager);
283 
284  PDB(kSelector, 1) Info("Process",
285  "fChangedDs = %d, fTouched = %d disappeared = %d",
286  fChangedDs, fTouched, fDisappeared);
287 
288  // If required to only get the list we are done
289  if (fGetlistonly) {
290  Info("Process", "updated fileinfo: ");
291  newfileinfo->Print("F");
292  fSubDataSet->Add(newfileinfo);
293  return kTRUE;
294  }
295  }
296 
297  if (!fNoaction && (fDoall || fScanlist)) {
298 
299  // Point to the fileinfo
300  //newStagedFiles = (!fDoall && fScanlist && flist) ? flist : newStagedFiles;
301  if (!fDoall && fScanlist) {
302  SafeDelete(newfileinfo);
303  newfileinfo = new TFileInfo(*fileInfo);
304  newfileinfo->SetIndex(fileInfo->GetIndex());
305  }
306 
307  // Loop over now staged files
308  PDB(kSelector, 1) Info("Process",
309  "file appear to be newly staged; %s",
310  newfileinfo->GetFirstUrl()->GetUrl());
311 
312  // If staging files, prepare the stager
313  if (fLocateonly || fStageonly) {
314  stager = (fMss && strlen(fMss) > 0) ? TFileStager::Open(fMss) : 0;
315  createStager = (stager) ? kFALSE : kTRUE;
316  }
317 
318  // Process the file
319  Bool_t changed = kFALSE;
320  Bool_t opened = kFALSE;
321  TDataSetManager::ProcessFile(newfileinfo, fSopt, fCheckstg, fDoall, stager, createStager, fStageopts,
322  fDbg, changed, opened);
323 
324  if (changed) fChangedDs = kTRUE;
325  if (opened) fOpened++;
326  }
327 
328  PDB(kSelector, 1) {
329  Info("Process", "updated fileinfo: ");
330  newfileinfo->Print("L");
331  }
332  fSubDataSet->Add(newfileinfo);
333 
334  return kTRUE;
335 }
336 
337 ////////////////////////////////////////////////////////////////////////////////
338 /// Worker Terminate
339 
340 void TSelVerifyDataSet::SlaveTerminate()
341 {
342  if (fSubDataSet) {
343  fSubDataSet->Update();
344  if (fSubDataSet->GetNFiles() > 0) {
345  fOutput->Add(fSubDataSet);
346  Info("SlaveTerminate",
347  "sub-dataset '%s' added to the output list (%lld files)",
348  fSubDataSet->GetTitle(), fSubDataSet->GetNFiles());
349  }
350  // Add information for registration
351  fOutput->Add(new TNamed(TString::Format("DATASET_%s", fSubDataSet->GetName()).Data(),"OT:sortidx:"));
352  fOutput->Add(new TNamed("PROOFSERV_RegisterDataSet", ""));
353  }
354 
355  // Send the number of files disppeared, opened and mark 'changed'if any fileinfo in the dataset has changed
356  TString hostname(TUrl(gSystem->HostName()).GetHostFQDN());
357  TString thisordinal = gProofServ ? gProofServ->GetOrdinal() : "n.d";
358  TString sfdisppeared= TString::Format("PROOF_NoFilesDisppeared_%s_%s", hostname.Data(), thisordinal.Data());
359  fOutput->Add(new TParameter<Int_t>(sfdisppeared.Data(), fDisappeared));
360  TString sfOpened= TString::Format("PROOF_NoFilesOpened_%s_%s", hostname.Data(), thisordinal.Data());
361  fOutput->Add(new TParameter<Int_t>(sfOpened.Data(), fOpened));
362  TString sfTouched = TString::Format("PROOF_NoFilesTouched_%s_%s", hostname.Data(), thisordinal.Data());
363  fOutput->Add(new TParameter<Int_t>(sfTouched.Data(), fTouched));
364  TString schanged= TString::Format("PROOF_DataSetChanged_%s_%s", hostname.Data(), thisordinal.Data());
365  fOutput->Add(new TParameter<Bool_t>(schanged.Data(), fChangedDs));
366 }