Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
parallelMergeServer.C
Go to the documentation of this file.
1 /// \file
2 /// \ingroup tutorial_net
3 /// This script shows how to make a simple iterative server that
4 /// can accept connections while handling currently open connections.
5 /// Compare this script to hserv.C that blocks on accept.
6 /// In this script a server socket is created and added to a monitor.
7 /// A monitor object is used to monitor connection requests on
8 /// the server socket. After accepting the connection
9 /// the new socket is added to the monitor and immediately ready
10 /// for use. Once two connections are accepted the server socket
11 /// is removed from the monitor and closed. The monitor continues
12 /// monitoring the sockets.
13 ///
14 /// To run this demo do the following:
15 /// - Open three windows
16 /// - Start ROOT in all three windows
17 /// - Execute in the first window: .x hserv2.C
18 /// - Execute in the second and third windows: .x hclient.C
19 ///
20 /// \macro_code
21 ///
22 /// \author Fons Rademakers
23 
24 #include "TMessage.h"
25 #include "TBenchmark.h"
26 #include "TSocket.h"
27 #include "TH2.h"
28 #include "TTree.h"
29 #include "TMemFile.h"
30 #include "TRandom.h"
31 #include "TError.h"
32 #include "TFileMerger.h"
33 
34 #include "TServerSocket.h"
35 #include "TPad.h"
36 #include "TCanvas.h"
37 #include "TMonitor.h"
38 
39 #include "TFileCacheWrite.h"
40 #include "TSystem.h"
41 #include "THashTable.h"
42 
43 #include "TMath.h"
44 #include "TTimeStamp.h"
45 
46 const int kIncremental = 0;
47 const int kReplaceImmediately = 1;
48 const int kReplaceWait = 2;
49 
50 #include "TKey.h"
51 static Bool_t R__NeedInitialMerge(TDirectory *dir)
52 {
53 
54  if (dir==0) return kFALSE;
55 
56  TIter nextkey(dir->GetListOfKeys());
57  TKey *key;
58  while( (key = (TKey*)nextkey()) ) {
59  TClass *cl = TClass::GetClass(key->GetClassName());
60  if (cl->InheritsFrom(TDirectory::Class())) {
61  TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
62  if (!subdir) {
63  subdir = (TDirectory *)key->ReadObj();
64  }
65  if (R__NeedInitialMerge(subdir)) {
66  return kTRUE;
67  }
68  } else {
69  if (0 != cl->GetResetAfterMerge()) {
70  return kTRUE;
71  }
72  }
73  }
74  return kFALSE;
75 }
76 
77 static void R__DeleteObject(TDirectory *dir, Bool_t withReset)
78 {
79  if (dir==0) return;
80 
81  TIter nextkey(dir->GetListOfKeys());
82  TKey *key;
83  while( (key = (TKey*)nextkey()) ) {
84  TClass *cl = TClass::GetClass(key->GetClassName());
85  if (cl->InheritsFrom(TDirectory::Class())) {
86  TDirectory *subdir = (TDirectory *)dir->GetList()->FindObject(key->GetName());
87  if (!subdir) {
88  subdir = (TDirectory *)key->ReadObj();
89  }
90  R__DeleteObject(subdir,withReset);
91  } else {
92  Bool_t todelete = kFALSE;
93  if (withReset) {
94  todelete = (0 != cl->GetResetAfterMerge());
95  } else {
96  todelete = (0 == cl->GetResetAfterMerge());
97  }
98  if (todelete) {
99  key->Delete();
100  dir->GetListOfKeys()->Remove(key);
101  delete key;
102  }
103  }
104  }
105 }
106 
107 static void R__MigrateKey(TDirectory *destination, TDirectory *source)
108 {
109  if (destination==0 || source==0) return;
110 
111  TIter nextkey(source->GetListOfKeys());
112  TKey *key;
113  while( (key = (TKey*)nextkey()) ) {
114  TClass *cl = TClass::GetClass(key->GetClassName());
115  if (cl->InheritsFrom(TDirectory::Class())) {
116  TDirectory *source_subdir = (TDirectory *)source->GetList()->FindObject(key->GetName());
117  if (!source_subdir) {
118  source_subdir = (TDirectory *)key->ReadObj();
119  }
120  TDirectory *destination_subdir = destination->GetDirectory(key->GetName());
121  if (!destination_subdir) {
122  destination_subdir = destination->mkdir(key->GetName());
123  }
124  R__MigrateKey(destination,source);
125  } else {
126  TKey *oldkey = destination->GetKey(key->GetName());
127  if (oldkey) {
128  oldkey->Delete();
129  delete oldkey;
130  }
131  TKey *newkey = new TKey(destination,*key,0 /* pidoffset */); // a priori the file are from the same client ..
132  destination->GetFile()->SumBuffer(newkey->GetObjlen());
133  newkey->WriteFile(0);
134  if (destination->GetFile()->TestBit(TFile::kWriteError)) {
135  return;
136  }
137  }
138  }
139  destination->SaveSelf();
140 }
141 
142 struct ClientInfo
143 {
144  TFile *fFile; // This object does *not* own the file, it will be own by the owner of the ClientInfo.
145  TString fLocalName;
146  UInt_t fContactsCount;
147  TTimeStamp fLastContact;
148  Double_t fTimeSincePrevContact;
149 
150  ClientInfo() : fFile(0), fLocalName(), fContactsCount(0), fTimeSincePrevContact(0) {}
151  ClientInfo(const char *filename, UInt_t clientId) : fFile(0), fContactsCount(0), fTimeSincePrevContact(0) {
152  fLocalName.Form("%s-%d-%d",filename,clientId,gSystem->GetPid());
153  }
154 
155  void Set(TFile *file)
156  {
157  // Register the new file as coming from this client.
158  if (file != fFile) {
159  // We need to keep any of the keys from the previous file that
160  // are not in the new file.
161  if (fFile) {
162  R__MigrateKey(fFile,file);
163  // delete the previous memory file (if any)
164  delete file;
165  } else {
166  fFile = file;
167  }
168  }
169  TTimeStamp now;
170  fTimeSincePrevContact = now.AsDouble() - fLastContact.AsDouble();
171  fLastContact = now;
172  ++fContactsCount;
173  }
174 };
175 
176 struct ParallelFileMerger : public TObject
177 {
178  typedef std::vector<ClientInfo> ClientColl_t;
179 
180  TString fFilename;
181  TBits fClientsContact; //
182  UInt_t fNClientsContact; //
183  ClientColl_t fClients;
184  TTimeStamp fLastMerge;
185  TFileMerger fMerger;
186 
187  ParallelFileMerger(const char *filename, Bool_t writeCache = kFALSE) : fFilename(filename), fNClientsContact(0), fMerger(kFALSE,kTRUE)
188  {
189  // Default constructor.
190 
191  fMerger.SetPrintLevel(0);
192  fMerger.OutputFile(filename,"RECREATE");
193  if (writeCache) new TFileCacheWrite(fMerger.GetOutputFile(),32*1024*1024);
194  }
195 
196  ~ParallelFileMerger()
197  {
198  // Destructor.
199 
200  for(unsigned int f = 0 ; f < fClients.size(); ++f) {
201  fprintf(stderr,"Client %d reported %u times\n",f,fClients[f].fContactsCount);
202  }
203  for( ClientColl_t::iterator iter = fClients.begin();
204  iter != fClients.end();
205  ++iter)
206  {
207  delete iter->fFile;
208  }
209  }
210 
211  ULong_t Hash() const
212  {
213  // Return hash value for this object.
214  return fFilename.Hash();
215  }
216 
217  const char *GetName() const
218  {
219  // Return the name of the object which is the name of the output file.
220  return fFilename;
221  }
222 
223  Bool_t InitialMerge(TFile *input)
224  {
225  // Initial merge of the input to copy the resetable object (TTree) into the output
226  // and remove them from the input file.
227 
228  fMerger.AddFile(input);
229 
230  Bool_t result = fMerger.PartialMerge(TFileMerger::kIncremental | TFileMerger::kResetable);
231 
232  R__DeleteObject(input,kTRUE);
233  return result;
234  }
235 
236  Bool_t Merge()
237  {
238  // Merge the current inputs into the output file.
239 
240  R__DeleteObject(fMerger.GetOutputFile(),kFALSE); // Remove object that can *not* be incrementally merge and will *not* be reset by the client code.
241  for(unsigned int f = 0 ; f < fClients.size(); ++f) {
242  fMerger.AddFile(fClients[f].fFile);
243  }
244  Bool_t result = fMerger.PartialMerge(TFileMerger::kAllIncremental);
245 
246  // Remove any 'resetable' object (like TTree) from the input file so that they will not
247  // be re-merged. Keep only the object that always need to be re-merged (Histograms).
248  for(unsigned int f = 0 ; f < fClients.size(); ++f) {
249  if (fClients[f].fFile) {
250  R__DeleteObject(fClients[f].fFile,kTRUE);
251  } else {
252  // We back up the file (probably due to memory constraint)
253  TFile *file = TFile::Open(fClients[f].fLocalName,"UPDATE");
254  R__DeleteObject(file,kTRUE); // Remove object that can be incrementally merge and will be reset by the client code.
255  file->Write();
256  delete file;
257  }
258  }
259  fLastMerge = TTimeStamp();
260  fNClientsContact = 0;
261  fClientsContact.Clear();
262 
263  return result;
264  }
265 
266  Bool_t NeedFinalMerge()
267  {
268  // Return true, if there is any data that has not been merged.
269 
270  return fClientsContact.CountBits() > 0;
271  }
272 
273  Bool_t NeedMerge(Float_t clientThreshold)
274  {
275  // Return true, if enough client have reported
276 
277  if (fClients.size()==0) {
278  return kFALSE;
279  }
280 
281  // Calculate average and rms of the time between the last 2 contacts.
282  Double_t sum = 0;
283  Double_t sum2 = 0;
284  for(unsigned int c = 0 ; c < fClients.size(); ++c) {
285  sum += fClients[c].fTimeSincePrevContact;
286  sum2 += fClients[c].fTimeSincePrevContact*fClients[c].fTimeSincePrevContact;
287  }
288  Double_t avg = sum / fClients.size();
289  Double_t sigma = sum2 ? TMath::Sqrt( sum2 / fClients.size() - avg*avg) : 0;
290  Double_t target = avg + 2*sigma;
291  TTimeStamp now;
292  if ( (now.AsDouble() - fLastMerge.AsDouble()) > target) {
293 // Float_t cut = clientThreshold * fClients.size();
294 // if (!(fClientsContact.CountBits() > cut )) {
295 // for(unsigned int c = 0 ; c < fClients.size(); ++c) {
296 // fprintf(stderr,"%d:%f ",c,fClients[c].fTimeSincePrevContact);
297 // }
298 // fprintf(stderr,"merge:%f avg:%f target:%f\n",(now.AsDouble() - fLastMerge.AsDouble()),avg,target);
299 // }
300  return kTRUE;
301  }
302  Float_t cut = clientThreshold * fClients.size();
303  return fClientsContact.CountBits() > cut || fNClientsContact > 2*cut;
304  }
305 
306  void RegisterClient(UInt_t clientId, TFile *file)
307  {
308  // Register that a client has sent a file.
309 
310  ++fNClientsContact;
311  fClientsContact.SetBitNumber(clientId);
312  if (fClients.size() < clientId+1) {
313  fClients.push_back( ClientInfo(fFilename,clientId) );
314  }
315  fClients[clientId].Set(file);
316  }
317 
318  ClassDef(ParallelFileMerger,0);
319 };
320 
321 void parallelMergeServer(bool cache = false) {
322  // Open a server socket looking for connections on a named service or
323  // on a specified port.
324  //TServerSocket *ss = new TServerSocket("rootserv", kTRUE);
325  TServerSocket *ss = new TServerSocket(1095, kTRUE, 100);
326  if (!ss->IsValid()) {
327  return;
328  }
329 
330  TMonitor *mon = new TMonitor;
331 
332  mon->Add(ss);
333 
334  UInt_t clientCount = 0;
335  UInt_t clientIndex = 0;
336 
337  THashTable mergers;
338 
339  enum StatusKind {
340  kStartConnection = 0,
341  kProtocol = 1,
342 
343  kProtocolVersion = 1
344  };
345 
346  printf("fastMergeServerHist ready to accept connections\n");
347  while (1) {
348  TMessage *mess;
349  TSocket *s;
350 
351  // NOTE: this needs to be update to handle the case where the client
352  // dies.
353  s = mon->Select();
354 
355  if (s->IsA() == TServerSocket::Class()) {
356  if (clientCount > 100) {
357  printf("only accept 100 clients connections\n");
358  mon->Remove(ss);
359  ss->Close();
360  } else {
361  TSocket *client = ((TServerSocket *)s)->Accept();
362  client->Send(clientIndex, kStartConnection);
363  client->Send(kProtocolVersion, kProtocol);
364  ++clientCount;
365  ++clientIndex;
366  mon->Add(client);
367  printf("Accept %d connections\n",clientCount);
368  }
369  continue;
370  }
371 
372  s->Recv(mess);
373 
374  if (mess==0) {
375  Error("fastMergeServer","The client did not send a message\n");
376  } else if (mess->What() == kMESS_STRING) {
377  char str[64];
378  mess->ReadString(str, 64);
379  printf("Client %d: %s\n", clientCount, str);
380  mon->Remove(s);
381  printf("Client %d: bytes recv = %d, bytes sent = %d\n", clientCount, s->GetBytesRecv(),
382  s->GetBytesSent());
383  s->Close();
384  --clientCount;
385  if (mon->GetActive() == 0 || clientCount == 0) {
386  printf("No more active clients... stopping\n");
387  break;
388  }
389  } else if (mess->What() == kMESS_ANY) {
390 
391  Long64_t length;
392  TString filename;
393  Int_t clientId;
394  mess->ReadInt(clientId);
395  mess->ReadTString(filename);
396  mess->ReadLong64(length); // '*mess >> length;' is broken in CINT for Long64_t.
397 
398  // Info("fastMergeServerHist","Received input from client %d for %s",clientId,filename.Data());
399 
400  TMemFile *transient = new TMemFile(filename,mess->Buffer() + mess->Length(),length,"UPDATE"); // UPDATE because we need to remove the TTree after merging them.
401  mess->SetBufferOffset(mess->Length()+length);
402 
403  const Float_t clientThreshold = 0.75; // control how often the histogram are merged. Here as soon as half the clients have reported.
404 
405  ParallelFileMerger *info = (ParallelFileMerger*)mergers.FindObject(filename);
406  if (!info) {
407  info = new ParallelFileMerger(filename,cache);
408  mergers.Add(info);
409  }
410 
411  if (R__NeedInitialMerge(transient)) {
412  info->InitialMerge(transient);
413  }
414  info->RegisterClient(clientId,transient);
415  if (info->NeedMerge(clientThreshold)) {
416  // Enough clients reported.
417  Info("fastMergeServerHist","Merging input from %ld clients (%d)",info->fClients.size(),clientId);
418  info->Merge();
419  }
420  transient = 0;
421  } else if (mess->What() == kMESS_OBJECT) {
422  printf("got object of class: %s\n", mess->GetClass()->GetName());
423  } else {
424  printf("*** Unexpected message ***\n");
425  }
426 
427  delete mess;
428  }
429 
430  TIter next(&mergers);
431  ParallelFileMerger *info;
432  while ( (info = (ParallelFileMerger*)next()) ) {
433  if (info->NeedFinalMerge())
434  {
435  info->Merge();
436  }
437  }
438 
439  mergers.Delete();
440  delete mon;
441  delete ss;
442 }