46 const int kIncremental = 0;
47 const int kReplaceImmediately = 1;
48 const int kReplaceWait = 2;
51 static Bool_t R__NeedInitialMerge(TDirectory *dir)
54 if (dir==0)
return kFALSE;
56 TIter nextkey(dir->GetListOfKeys());
58 while( (key = (TKey*)nextkey()) ) {
59 TClass *cl = TClass::GetClass(key->GetClassName());
60 if (cl->InheritsFrom(TDirectory::Class())) {
61 TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
63 subdir = (TDirectory *)key->ReadObj();
65 if (R__NeedInitialMerge(subdir)) {
69 if (0 != cl->GetResetAfterMerge()) {
77 static void R__DeleteObject(TDirectory *dir, Bool_t withReset)
81 TIter nextkey(dir->GetListOfKeys());
83 while( (key = (TKey*)nextkey()) ) {
84 TClass *cl = TClass::GetClass(key->GetClassName());
85 if (cl->InheritsFrom(TDirectory::Class())) {
86 TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
88 subdir = (TDirectory *)key->ReadObj();
90 R__DeleteObject(subdir,withReset);
92 Bool_t todelete = kFALSE;
94 todelete = (0 != cl->GetResetAfterMerge());
96 todelete = (0 == cl->GetResetAfterMerge());
100 dir->GetListOfKeys()->Remove(key);
107 static void R__MigrateKey(TDirectory *destination, TDirectory *source)
109 if (destination==0 || source==0)
return;
111 TIter nextkey(source->GetListOfKeys());
113 while( (key = (TKey*)nextkey()) ) {
114 TClass *cl = TClass::GetClass(key->GetClassName());
115 if (cl->InheritsFrom(TDirectory::Class())) {
116 TDirectory *source_subdir = (TDirectory *)source->GetList()->FindObject(key->GetName());
117 if (!source_subdir) {
118 source_subdir = (TDirectory *)key->ReadObj();
120 TDirectory *destination_subdir = destination->GetDirectory(key->GetName());
121 if (!destination_subdir) {
122 destination_subdir = destination->mkdir(key->GetName());
124 R__MigrateKey(destination,source);
126 TKey *oldkey = destination->GetKey(key->GetName());
131 TKey *newkey =
new TKey(destination,*key,0 );
132 destination->GetFile()->SumBuffer(newkey->GetObjlen());
133 newkey->WriteFile(0);
134 if (destination->GetFile()->TestBit(TFile::kWriteError)) {
139 destination->SaveSelf();
146 UInt_t fContactsCount;
147 TTimeStamp fLastContact;
148 Double_t fTimeSincePrevContact;
150 ClientInfo() : fFile(0), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
151 ClientInfo(
const char *filename, UInt_t clientId) : fFile(0), fContactsCount(0), fTimeSincePrevContact(0) {
152 fLocalName.Form(
"%s-%d-%d",filename,clientId,gSystem->GetPid());
155 void Set(TFile *file)
162 R__MigrateKey(fFile,file);
170 fTimeSincePrevContact = now.AsDouble() - fLastContact.AsDouble();
176 struct ParallelFileMerger :
public TObject
178 typedef std::vector<ClientInfo> ClientColl_t;
181 TBits fClientsContact;
182 UInt_t fNClientsContact;
183 ClientColl_t fClients;
184 TTimeStamp fLastMerge;
187 ParallelFileMerger(
const char *filename, Bool_t writeCache = kFALSE) : fFilename(filename), fNClientsContact(0), fMerger(kFALSE,kTRUE)
191 fMerger.SetPrintLevel(0);
192 fMerger.OutputFile(filename,
"RECREATE");
193 if (writeCache)
new TFileCacheWrite(fMerger.GetOutputFile(),32*1024*1024);
196 ~ParallelFileMerger()
200 for(
unsigned int f = 0 ; f < fClients.size(); ++f) {
201 fprintf(stderr,
"Client %d reported %u times\n",f,fClients[f].fContactsCount);
203 for( ClientColl_t::iterator iter = fClients.begin();
204 iter != fClients.end();
214 return fFilename.Hash();
217 const char *GetName()
const
223 Bool_t InitialMerge(TFile *input)
228 fMerger.AddFile(input);
230 Bool_t result = fMerger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kResetable);
232 R__DeleteObject(input,kTRUE);
240 R__DeleteObject(fMerger.GetOutputFile(),kFALSE);
241 for(
unsigned int f = 0 ; f < fClients.size(); ++f) {
242 fMerger.AddFile(fClients[f].fFile);
244 Bool_t result = fMerger.PartialMerge(TFileMerger::kAllIncremental);
248 for(
unsigned int f = 0 ; f < fClients.size(); ++f) {
249 if (fClients[f].fFile) {
250 R__DeleteObject(fClients[f].fFile,kTRUE);
253 TFile *file = TFile::Open(fClients[f].fLocalName,
"UPDATE");
254 R__DeleteObject(file,kTRUE);
259 fLastMerge = TTimeStamp();
260 fNClientsContact = 0;
261 fClientsContact.Clear();
266 Bool_t NeedFinalMerge()
270 return fClientsContact.CountBits() > 0;
273 Bool_t NeedMerge(Float_t clientThreshold)
277 if (fClients.size()==0) {
284 for(
unsigned int c = 0 ; c < fClients.size(); ++c) {
285 sum += fClients[c].fTimeSincePrevContact;
286 sum2 += fClients[c].fTimeSincePrevContact*fClients[c].fTimeSincePrevContact;
288 Double_t avg = sum / fClients.size();
289 Double_t sigma = sum2 ? TMath::Sqrt( sum2 / fClients.size() - avg*avg) : 0;
290 Double_t target = avg + 2*sigma;
292 if ( (now.AsDouble() - fLastMerge.AsDouble()) > target) {
302 Float_t cut = clientThreshold * fClients.size();
303 return fClientsContact.CountBits() > cut || fNClientsContact > 2*cut;
306 void RegisterClient(UInt_t clientId, TFile *file)
311 fClientsContact.SetBitNumber(clientId);
312 if (fClients.size() < clientId+1) {
313 fClients.push_back( ClientInfo(fFilename,clientId) );
315 fClients[clientId].Set(file);
318 ClassDef(ParallelFileMerger,0);
321 void parallelMergeServer(
bool cache =
false) {
325 TServerSocket *ss =
new TServerSocket(1095, kTRUE, 100);
326 if (!ss->IsValid()) {
330 TMonitor *mon =
new TMonitor;
334 UInt_t clientCount = 0;
335 UInt_t clientIndex = 0;
340 kStartConnection = 0,
346 printf(
"fastMergeServerHist ready to accept connections\n");
355 if (s->IsA() == TServerSocket::Class()) {
356 if (clientCount > 100) {
357 printf(
"only accept 100 clients connections\n");
361 TSocket *client = ((TServerSocket *)s)->Accept();
362 client->Send(clientIndex, kStartConnection);
363 client->Send(kProtocolVersion, kProtocol);
367 printf(
"Accept %d connections\n",clientCount);
375 Error(
"fastMergeServer",
"The client did not send a message\n");
376 }
else if (mess->What() == kMESS_STRING) {
378 mess->ReadString(str, 64);
379 printf(
"Client %d: %s\n", clientCount, str);
381 printf(
"Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
385 if (mon->GetActive() == 0 || clientCount == 0) {
386 printf(
"No more active clients... stopping\n");
389 }
else if (mess->What() == kMESS_ANY) {
394 mess->ReadInt(clientId);
395 mess->ReadTString(filename);
396 mess->ReadLong64(length);
400 TMemFile *
transient =
new TMemFile(filename,mess->Buffer() + mess->Length(),length,
"UPDATE");
401 mess->SetBufferOffset(mess->Length()+length);
403 const Float_t clientThreshold = 0.75;
405 ParallelFileMerger *info = (ParallelFileMerger*)mergers.FindObject(filename);
407 info =
new ParallelFileMerger(filename,cache);
411 if (R__NeedInitialMerge(
transient)) {
412 info->InitialMerge(
transient);
414 info->RegisterClient(clientId,
transient);
415 if (info->NeedMerge(clientThreshold)) {
417 Info(
"fastMergeServerHist",
"Merging input from %ld clients (%d)",info->fClients.size(),clientId);
421 }
else if (mess->What() == kMESS_OBJECT) {
422 printf(
"got object of class: %s\n", mess->GetClass()->GetName());
424 printf(
"*** Unexpected message ***\n");
430 TIter next(&mergers);
431 ParallelFileMerger *info;
432 while ( (info = (ParallelFileMerger*)next()) ) {
433 if (info->NeedFinalMerge())