48 #include <sys/resource.h>
53 ClassImp(TFileMerger);
55 TClassRef R__TH1_Class(
"TH1");
56 TClassRef R__TTree_Class(
"TTree");
58 static const Int_t kCpProgress = BIT(14);
59 static const Int_t kCintFileNumber = 100;
64 static Int_t R__GetSystemMaxOpenedFiles()
68 maxfiles = _getmaxstdio();
71 if (getrlimit(RLIMIT_NOFILE,&filelimit)==0) {
72 maxfiles = filelimit.rlim_cur;
78 if (maxfiles > kCintFileNumber) {
79 return maxfiles - kCintFileNumber;
80 }
else if (maxfiles > 5) {
90 TFileMerger::TFileMerger(Bool_t isLocal, Bool_t histoOneGo)
91 : fMaxOpenedFiles( R__GetSystemMaxOpenedFiles() ),
92 fLocal(isLocal), fHistoOneGo(histoOneGo)
94 fMergeList.SetOwner(kTRUE);
95 fExcessFiles.SetOwner(kTRUE);
97 R__LOCKGUARD(gROOTMutex);
98 gROOT->GetListOfCleanups()->Add(
this);
104 TFileMerger::~TFileMerger()
107 R__LOCKGUARD(gROOTMutex);
108 gROOT->GetListOfCleanups()->Remove(
this);
110 SafeDelete(fOutputFile);
116 void TFileMerger::Reset()
120 fExcessFiles.Clear();
121 fObjectNames.Clear();
127 Bool_t TFileMerger::AddFile(
const char *url, Bool_t cpProgress)
129 if (fPrintLevel > 0) {
130 Printf(
"%s Source file %d: %s", fMsgPrefix.Data(), fFileList.GetEntries() + fExcessFiles.GetEntries() + 1, url);
136 if (fFileList.GetEntries() >= (fMaxOpenedFiles-1)) {
138 TObjString *urlObj =
new TObjString(url);
139 fMergeList.Add(urlObj);
141 urlObj =
new TObjString(url);
142 urlObj->SetBit(kCpProgress);
143 fExcessFiles.Add(urlObj);
148 TDirectory::TContext ctxt;
152 localcopy.Form(
"file:%s/ROOTMERGE-%s.root", gSystem->TempDirectory(), uuid.AsString());
153 if (!TFile::Cp(url, localcopy, cpProgress)) {
154 Error(
"AddFile",
"cannot get a local copy of file %s", url);
157 newfile = TFile::Open(localcopy,
"READ");
159 newfile = TFile::Open(url,
"READ");
163 if (newfile && newfile->IsZombie()) {
170 Error(
"AddFile",
"cannot open local copy %s of URL %s",
171 localcopy.Data(), url);
173 Error(
"AddFile",
"cannot open file %s", url);
176 if (fOutputFile && fOutputFile->GetCompressionLevel() != newfile->GetCompressionLevel()) fCompressionChange = kTRUE;
178 newfile->SetBit(kCanDelete);
179 fFileList.Add(newfile);
181 TObjString *urlObj =
new TObjString(url);
182 fMergeList.Add(urlObj);
194 Bool_t TFileMerger::AddFile(TFile *source, Bool_t cpProgress)
196 return AddFile(source,kFALSE,cpProgress);
205 Bool_t TFileMerger::AddAdoptFile(TFile *source, Bool_t cpProgress)
207 return AddFile(source,kTRUE,cpProgress);
216 Bool_t TFileMerger::AddFile(TFile *source, Bool_t own, Bool_t cpProgress)
218 if (source == 0 || source->IsZombie()) {
222 if (fPrintLevel > 0) {
223 Printf(
"%s Source file %d: %s",fMsgPrefix.Data(),fFileList.GetEntries()+1,source->GetName());
230 TDirectory::TContext ctxt;
231 if (fLocal && !source->InheritsFrom(TMemFile::Class())) {
233 localcopy.Form(
"file:%s/ROOTMERGE-%s.root", gSystem->TempDirectory(), uuid.AsString());
234 if (!source->Cp(localcopy, cpProgress)) {
235 Error(
"AddFile",
"cannot get a local copy of file %s", source->GetName());
238 newfile = TFile::Open(localcopy,
"READ");
240 if (newfile && newfile->IsZombie()) {
250 Error(
"AddFile",
"cannot open local copy %s of URL %s",
251 localcopy.Data(), source->GetName());
253 Error(
"AddFile",
"cannot open file %s", source->GetName());
256 if (fOutputFile && fOutputFile->GetCompressionSettings() != newfile->GetCompressionSettings()) fCompressionChange = kTRUE;
258 if (own || newfile != source) {
259 newfile->SetBit(kCanDelete);
261 newfile->ResetBit(kCanDelete);
263 fFileList.Add(newfile);
265 TObjString *urlObj =
new TObjString(source->GetName());
266 fMergeList.Add(urlObj);
268 if (newfile != source && own) {
278 Bool_t TFileMerger::OutputFile(
const char *outputfile, Bool_t force, Int_t compressionLevel)
280 return OutputFile(outputfile,(force?
"RECREATE":
"CREATE"),compressionLevel);
286 Bool_t TFileMerger::OutputFile(
const char *outputfile, Bool_t force)
288 Bool_t res = OutputFile(outputfile,(force?
"RECREATE":
"CREATE"),1);
289 fExplicitCompLevel = kFALSE;
300 Bool_t TFileMerger::OutputFile(
const char *outputfile,
const char *mode, Int_t compressionLevel)
303 TDirectory::TContext ctxt;
304 if (TFile *outputFile = TFile::Open(outputfile, mode,
"", compressionLevel))
305 return OutputFile(std::unique_ptr<TFile>(outputFile));
307 Error(
"OutputFile",
"cannot open the MERGER output file %s", fOutputFilename.Data());
314 Bool_t TFileMerger::OutputFile(std::unique_ptr<TFile> outputfile)
316 if (!outputfile || outputfile->IsZombie()) {
317 Error(
"OutputFile",
"cannot open the MERGER output file %s", (outputfile) ? outputfile->GetName() :
"");
321 if (!outputfile->IsWritable()) {
322 Error(
"OutputFile",
"output file %s is not writable", outputfile->GetName());
326 fExplicitCompLevel = kTRUE;
328 TFile *oldfile = fOutputFile;
333 fOutputFilename = outputfile->GetName();
335 TDirectory::TContext ctxt;
336 fOutputFile = outputfile.release();
346 Bool_t TFileMerger::OutputFile(
const char *outputfile,
const char *mode )
348 Bool_t res = OutputFile(outputfile,mode,1);
349 fExplicitCompLevel = kFALSE;
356 void TFileMerger::PrintFiles(Option_t *options)
358 fFileList.Print(options);
359 fExcessFiles.Print(options);
369 Bool_t TFileMerger::Merge(Bool_t)
371 return PartialMerge(kAll | kRegular);
379 Bool_t TFileMerger::MergeRecursive(TDirectory *target, TList *sourcelist, Int_t type )
381 Bool_t status = kTRUE;
382 Bool_t onlyListed = kFALSE;
383 if (fPrintLevel > 0) {
384 Printf(
"%s Target path: %s",fMsgPrefix.Data(),target->GetPath());
388 TString path(target->GetPath());
390 path.Remove(0, std::strlen(target->GetFile()->GetPath()));
392 Int_t nguess = sourcelist->GetSize()+1000;
393 THashList allNames(nguess);
394 allNames.SetOwner(kTRUE);
396 if (type & kSkipListed) {
397 TObjArray *arr = fObjectNames.Tokenize(
" ");
398 arr->SetOwner(kFALSE);
399 for (Int_t iname=0; iname<arr->GetEntriesFast(); iname++)
400 allNames.Add(arr->At(iname));
403 ((THashList*)target->GetList())->Rehash(nguess);
404 ((THashList*)target->GetListOfKeys())->Rehash(nguess);
406 TFileMergeInfo info(target);
407 info.fIOFeatures = fIOFeatures;
408 info.fOptions = fMergeOptions;
409 if (fFastMethod && ((type&kKeepCompression) || !fCompressionChange) ) {
410 info.fOptions.Append(
" fast");
414 TDirectory *current_sourcedir;
415 if (type & kIncremental) {
417 current_sourcedir = target;
419 current_file = (TFile*)sourcelist->First();
420 current_sourcedir = current_file->GetDirectory(path);
422 while (current_file || current_sourcedir) {
425 if (current_sourcedir && (current_file == 0 || current_sourcedir != target)) {
428 TIter nextkey( current_sourcedir->GetListOfKeys() );
432 while ( (key = (TKey*)nextkey())) {
437 Bool_t alreadyseen = (oldkeyname == key->GetName()) ? kTRUE : kFALSE;
440 if (strcmp(key->GetClassName(),
"TProcessID") == 0) { key->ReadObj();
continue;}
445 if (allNames.FindObject(key->GetName())) {
446 oldkeyname = key->GetName();
450 TClass *cl = TClass::GetClass(key->GetClassName());
452 Info(
"MergeRecursive",
"cannot indentify object type (%s), name: %s title: %s",
453 key->GetClassName(), key->GetName(), key->GetTitle());
458 if (cl->GetMerge() || cl->InheritsFrom(TDirectory::Class()) ||
459 (cl->IsTObject() && !cl->IsLoaded() &&
463 (cl->GetMethodWithPrototype(
"Merge",
"TCollection*,TFileMergeInfo*") ||
464 cl->GetMethodWithPrototype(
"Merge",
"TCollection*"))))
465 allNames.Add(
new TObjString(key->GetName()));
467 if (fNoTrees && cl->InheritsFrom(R__TTree_Class)) {
469 oldkeyname = key->GetName();
473 if (type & kOnlyListed) {
475 oldkeyname = key->GetName();
477 onlyListed = fObjectNames.Contains(oldkeyname);
478 oldkeyname = key->GetName();
479 if ((!onlyListed) && (!cl->InheritsFrom(TDirectory::Class())))
continue;
482 if (!(type&kResetable && type&kNonResetable)) {
484 if (!(type&kResetable)) {
485 if (cl->GetResetAfterMerge()) {
487 oldkeyname = key->GetName();
491 if (!(type&kNonResetable)) {
492 if (!cl->GetResetAfterMerge()) {
494 oldkeyname = key->GetName();
501 if (type & kIncremental) {
502 obj = current_sourcedir->GetList()->FindObject(key->GetName());
504 obj = key->ReadObj();
507 obj = key->ReadObj();
510 Info(
"MergeRecursive",
"could not read object for key {%s, %s}",
511 key->GetName(), key->GetTitle());
516 if (cl->IsTObject() && cl != obj->IsA()) {
517 Error(
"MergeRecursive",
"TKey and object retrieve disagree on type (%s vs %s). Continuing with %s.",
518 key->GetClassName(), obj->IsA()->GetName(), obj->IsA()->GetName());
521 Bool_t canBeMerged = kTRUE;
523 if ( cl->InheritsFrom( TDirectory::Class() ) ) {
530 if (type & kIncremental || alreadyseen) {
531 newdir = target->GetDirectory(obj->GetName());
533 newdir = target->mkdir( obj->GetName(), obj->GetTitle() );
537 newdir = target->mkdir( obj->GetName(), obj->GetTitle() );
546 if (onlyListed) type &= ~kOnlyListed;
547 status = MergeRecursive(newdir, sourcelist, type);
548 if (onlyListed) type |= kOnlyListed;
549 if (!status)
return status;
550 }
else if (cl->GetMerge()) {
553 if (alreadyseen)
continue;
556 Bool_t oneGo = fHistoOneGo && cl->InheritsFrom(R__TH1_Class);
559 TFile *nextsource = current_file ? (TFile*)sourcelist->After( current_file ) : (TFile*)sourcelist->First();
560 if (nextsource == 0) {
562 ROOT::MergeFunc_t func = cl->GetMerge();
563 func(obj, &inputs, &info);
564 info.fIsFirst = kFALSE;
568 TDirectory *ndir = nextsource->GetDirectory(path);
574 TKey *key2 = (TKey*)ndir->GetListOfKeys()->FindObject(key->GetName());
576 TObject *hobj = key2->ReadObj();
578 Info(
"MergeRecursive",
"could not read object for key {%s, %s}; skipping file %s",
579 key->GetName(), key->GetTitle(), nextsource->GetName());
580 nextsource = (TFile*)sourcelist->After(nextsource);
584 if (hobj->InheritsFrom(TCollection::Class())) {
585 ((TCollection*)hobj)->SetOwner();
587 hobj->ResetBit(kMustCleanup);
590 ROOT::MergeFunc_t func = cl->GetMerge();
591 Long64_t result = func(obj, &inputs, &info);
592 info.fIsFirst = kFALSE;
594 Error(
"MergeRecursive",
"calling Merge() on '%s' with the corresponding object in '%s'",
595 obj->GetName(), nextsource->GetName());
601 nextsource = (TFile*)sourcelist->After( nextsource );
602 }
while (nextsource);
604 if (oneGo || info.fIsFirst) {
605 ROOT::MergeFunc_t func = cl->GetMerge();
606 func(obj, &inputs, &info);
607 info.fIsFirst = kFALSE;
611 }
else if (cl->IsTObject() &&
612 cl->GetMethodWithPrototype(
"Merge",
"TCollection*,TFileMergeInfo*") ) {
616 if (alreadyseen)
continue;
620 listHargs.Form(
"(TCollection*)0x%lx,(TFileMergeInfo*)0x%lx", (ULong_t)&listH,(ULong_t)&info);
623 TFile *nextsource = current_file ? (TFile*)sourcelist->After( current_file ) : (TFile*)sourcelist->First();
624 if (nextsource == 0) {
627 obj->Execute(
"Merge", listHargs.Data(), &error);
628 info.fIsFirst = kFALSE;
630 Error(
"MergeRecursive",
"calling Merge() on '%s' with the corresponding object in '%s'",
631 obj->GetName(), key->GetName());
636 TDirectory *ndir = nextsource->GetDirectory(path);
642 TKey *key2 = (TKey*)ndir->GetListOfKeys()->FindObject(key->GetName());
644 TObject *hobj = key2->ReadObj();
646 Info(
"MergeRecursive",
"could not read object for key {%s, %s}; skipping file %s",
647 key->GetName(), key->GetTitle(), nextsource->GetName());
648 nextsource = (TFile*)sourcelist->After(nextsource);
652 if (hobj->InheritsFrom(TCollection::Class())) {
653 ((TCollection*)hobj)->SetOwner();
655 hobj->ResetBit(kMustCleanup);
658 obj->Execute(
"Merge", listHargs.Data(), &error);
659 info.fIsFirst = kFALSE;
661 Error(
"MergeRecursive",
"calling Merge() on '%s' with the corresponding object in '%s'",
662 obj->GetName(), nextsource->GetName());
667 nextsource = (TFile*)sourcelist->After( nextsource );
672 obj->Execute(
"Merge", listHargs.Data(), &error);
673 info.fIsFirst = kFALSE;
677 }
else if (cl->IsTObject() &&
678 cl->GetMethodWithPrototype(
"Merge",
"TCollection*") ) {
682 if (alreadyseen)
continue;
686 listHargs.Form(
"((TCollection*)0x%lx)", (ULong_t)&listH);
689 TFile *nextsource = current_file ? (TFile*)sourcelist->After( current_file ) : (TFile*)sourcelist->First();
690 if (nextsource == 0) {
693 obj->Execute(
"Merge", listHargs.Data(), &error);
695 Error(
"MergeRecursive",
"calling Merge() on '%s' with the corresponding object in '%s'",
696 obj->GetName(), key->GetName());
701 TDirectory *ndir = nextsource->GetDirectory(path);
707 TKey *key2 = (TKey*)ndir->GetListOfKeys()->FindObject(key->GetName());
709 TObject *hobj = key2->ReadObj();
711 Info(
"MergeRecursive",
"could not read object for key {%s, %s}; skipping file %s",
712 key->GetName(), key->GetTitle(), nextsource->GetName());
713 nextsource = (TFile*)sourcelist->After(nextsource);
717 if (hobj->InheritsFrom(TCollection::Class())) {
718 ((TCollection*)hobj)->SetOwner();
720 hobj->ResetBit(kMustCleanup);
723 obj->Execute(
"Merge", listHargs.Data(), &error);
724 info.fIsFirst = kFALSE;
726 Error(
"MergeRecursive",
"calling Merge() on '%s' with the corresponding object in '%s'",
727 obj->GetName(), nextsource->GetName());
732 nextsource = (TFile*)sourcelist->After( nextsource );
737 obj->Execute(
"Merge", listHargs.Data(), &error);
738 info.fIsFirst = kFALSE;
744 canBeMerged = kFALSE;
753 oldkeyname = key->GetName();
755 if(cl->InheritsFrom( TDirectory::Class() )) {
758 auto dirobj =
dynamic_cast<TDirectory*
>(obj);
759 TString dirpath(dirobj->GetPath());
761 dirpath.Remove(0, std::strlen(dirobj->GetFile()->GetPath()));
767 if (!(type&kIncremental) || dirobj->GetFile() != target) {
768 dirobj->ResetBit(kMustCleanup);
774 TFile *nextsource = current_file ? (TFile*)sourcelist->After( current_file ) : (TFile*)sourcelist->First();
776 TDirectory *ndir = nextsource->GetDirectory(dirpath);
779 ndir->ResetBit(kMustCleanup);
781 nextsource = (TFile*)sourcelist->After( nextsource );
783 }
else if (cl->InheritsFrom( TCollection::Class() )) {
785 if ( obj->Write( oldkeyname, canBeMerged ? TObject::kSingleKey | TObject::kOverwrite : TObject::kSingleKey) <= 0 ) {
788 ((TCollection*)obj)->SetOwner();
793 if (cl->IsTObject()) {
794 if ( obj->Write( oldkeyname, canBeMerged ? TObject::kOverwrite : 0) <= 0) {
797 obj->ResetBit(kMustCleanup);
799 if ( target->WriteObjectAny( (
void*)obj, cl, oldkeyname, canBeMerged ?
"OverWrite" :
"" ) <= 0) {
808 current_file = current_file ? (TFile*)sourcelist->After(current_file) : (TFile*)sourcelist->First();
810 current_sourcedir = current_file->GetDirectory(path);
812 current_sourcedir = 0;
816 if (!(type&kIncremental)) {
819 target->SaveSelf(kTRUE);
839 Bool_t TFileMerger::PartialMerge(Int_t in_type)
842 TString outf(fOutputFilename);
844 outf.Form(
"file:%s/FileMerger.root", gSystem->TempDirectory());
845 Info(
"PartialMerge",
"will merge the results to the file %s\n"
846 "since you didn't specify a merge filename",
847 TUrl(outf).GetFile());
849 if (!OutputFile(outf.Data())) {
855 if ((fFileList.GetEntries() == 1) && !fExcessFiles.GetEntries() &&
856 !(in_type & kIncremental) && !fCompressionChange && !fExplicitCompLevel) {
857 fOutputFile->Close();
858 SafeDelete(fOutputFile);
860 TFile *file = (TFile *) fFileList.First();
861 if (!file || (file && file->IsZombie())) {
862 Error(
"PartialMerge",
"one-file case: problem attaching to file");
865 Bool_t result = kTRUE;
866 if (!(result = file->Cp(fOutputFilename))) {
867 Error(
"PartialMerge",
"one-file case: could not copy '%s' to '%s'",
868 file->GetPath(), fOutputFilename.Data());
871 if (file->TestBit(kCanDelete)) file->Close();
874 if (fLocal && !file->InheritsFrom(TMemFile::Class())) {
875 TUrl u(file->GetPath(), kTRUE);
876 if (gSystem->Unlink(u.GetFile()) != 0)
877 Warning(
"PartialMerge",
"problems removing temporary local file '%s'", u.GetFile());
883 fOutputFile->SetBit(kMustCleanup);
885 TDirectory::TContext ctxt;
887 Bool_t result = kTRUE;
888 Int_t type = in_type;
889 while (result && fFileList.GetEntries()>0) {
890 result = MergeRecursive(fOutputFile, &fFileList, type);
893 TIter next(&fFileList);
895 while ((file = (TFile*) next())) {
897 if (file->TestBit(kCanDelete)) file->Close();
899 if(fLocal && !file->InheritsFrom(TMemFile::Class())) {
900 TString p(file->GetPath());
902 p = p(0, p.Index(
':',0));
907 if (result && fExcessFiles.GetEntries() > 0) {
912 type = type | kIncremental;
913 result = OpenExcessFiles();
917 Error(
"Merge",
"error during merge of your ROOT files");
920 if (in_type & kIncremental) {
921 fOutputFile->Write(
"",TObject::kOverwrite);
923 gROOT->GetListOfFiles()->Remove(fOutputFile);
924 fOutputFile->Close();
929 if (in_type & kIncremental) {
932 fOutputFile->ResetBit(kMustCleanup);
933 SafeDelete(fOutputFile);
941 Bool_t TFileMerger::OpenExcessFiles()
943 if (fPrintLevel > 0) {
944 Printf(
"%s Opening the next %d files", fMsgPrefix.Data(), TMath::Min(fExcessFiles.GetEntries(), fMaxOpenedFiles - 1));
947 TIter next(&fExcessFiles);
951 TDirectory::TContext ctxt;
952 while( nfiles < (fMaxOpenedFiles-1) && ( url = (TObjString*)next() ) ) {
956 localcopy.Form(
"file:%s/ROOTMERGE-%s.root", gSystem->TempDirectory(), uuid.AsString());
957 if (!TFile::Cp(url->GetName(), localcopy, url->TestBit(kCpProgress))) {
958 Error(
"OpenExcessFiles",
"cannot get a local copy of file %s", url->GetName());
961 newfile = TFile::Open(localcopy,
"READ");
963 newfile = TFile::Open(url->GetName(),
"READ");
968 Error(
"OpenExcessFiles",
"cannot open local copy %s of URL %s",
969 localcopy.Data(), url->GetName());
971 Error(
"OpenExcessFiles",
"cannot open file %s", url->GetName());
974 if (fOutputFile && fOutputFile->GetCompressionLevel() != newfile->GetCompressionLevel()) fCompressionChange = kTRUE;
976 newfile->SetBit(kCanDelete);
977 fFileList.Add(newfile);
979 fExcessFiles.Remove(url);
988 void TFileMerger::RecursiveRemove(TObject *obj)
990 if (obj == fOutputFile) {
991 Fatal(
"RecursiveRemove",
"Output file of the TFile Merger (targeting %s) has been deleted (likely due to a TTree larger than 100Gb)", fOutputFilename.Data());
1002 void TFileMerger::SetMaxOpenedFiles(Int_t newmax)
1004 Int_t sysmax = R__GetSystemMaxOpenedFiles();
1005 if (newmax < sysmax) {
1006 fMaxOpenedFiles = newmax;
1008 fMaxOpenedFiles = sysmax;
1010 if (fMaxOpenedFiles < 2) {
1011 fMaxOpenedFiles = 2;
1018 void TFileMerger::SetMsgPrefix(
const char *prefix)
1020 fMsgPrefix = prefix;