Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TParallelMergingFile.cxx
Go to the documentation of this file.
1 // @(#)root/net:$Id$
2 // Author: Philippe Canal October 2011.
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2011, Rene Brun, Fons Rademakers and al. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TParallelMergingFile //
15 // //
16 // Specialization of TMemFile to connect to a parallel file merger. //
17 // Upon a call to UploadAndReset, the content already written to the //
18 // file is uploaded to the server and the objects implementing the //
19 // function ResetAfterMerge (like TTree) are reset. //
20 // The parallel file merger will then collate the information coming //
21 // from this client and any other client into the file described by //
22 // the filename of this object. //
23 // //
24 //////////////////////////////////////////////////////////////////////////
25 
26 #include "TParallelMergingFile.h"
27 #include "TSocket.h"
28 #include "TArrayC.h"
29 
30 ////////////////////////////////////////////////////////////////////////////////
31 /// Constructor.
32 /// We do no yet open any connection to the server. This will be done at the
33 /// time the first upload will be requested.
34 
35 TParallelMergingFile::TParallelMergingFile(const char *filename, Option_t *option /* = "" */,
36  const char *ftitle /* = "" */, Int_t compress /* = 1 */) :
37  TMemFile(filename,option,ftitle,compress),fSocket(0),fServerIdx(-1),fServerVersion(0),fClassSent(0),fMessage(kMESS_OBJECT)
38 {
39  TString serverurl = strstr(fUrl.GetOptions(),"pmerge=");
40  if (serverurl.Length()) {
41  serverurl.ReplaceAll("pmerge=","pmerge://");
42  fServerLocation = TUrl(serverurl);
43  }
44 }
45 
46 ////////////////////////////////////////////////////////////////////////////////
47 /// Destructor.
48 
49 TParallelMergingFile::~TParallelMergingFile()
50 {
51  // We need to call Close, right here so that it is executed _before_
52  // the data member of TParallelMergingFile are destructed.
53  Close();
54  delete fClassSent;
55 }
56 
57 ////////////////////////////////////////////////////////////////////////////////
58 
59 void TParallelMergingFile::Close(Option_t *option)
60 {
61  TMemFile::Close(option);
62  if (fSocket) {
63  if (0==fSocket->Send("Finished")) { // tell server we are finished
64  Warning("Close","Failed to send the finishing message to the server %s:%d",fServerLocation.GetHost(),fServerLocation.GetPort());
65  }
66  fSocket->Close();
67  delete fSocket;
68  }
69  fSocket = 0;
70 }
71 
72 ////////////////////////////////////////////////////////////////////////////////
73 /// Upload the current file data to the merging server.
74 /// Reset the file and return true in case of success.
75 
76 Bool_t TParallelMergingFile::UploadAndReset()
77 {
78  // Open connection to server
79  if (fSocket == 0) {
80  const char *host = fServerLocation.GetHost();
81  Int_t port = fServerLocation.GetPort();
82  if (host == 0 || host[0] == '\0') {
83  host = "localhost";
84  }
85  if (port <= 0) {
86  port = 1095;
87  }
88  fSocket = new TSocket(host,port);
89  if (!fSocket->IsValid()) {
90  Error("UploadAndReset","Could not contact the server %s:%d\n",host,port);
91  delete fSocket;
92  fSocket = 0;
93  return kFALSE;
94  }
95  // Wait till we get the start message
96  // server tells us who we are
97  Int_t kind;
98  Int_t n = fSocket->Recv(fServerIdx, kind);
99 
100  if (n < 0 && kind != 0 /* kStartConnection */)
101  {
102  Error("UploadAndReset","Unexpected server message: kind=%d idx=%d\n",kind,fServerIdx);
103  delete fSocket;
104  fSocket = 0;
105  return kTRUE;
106  }
107  n = fSocket->Recv(fServerVersion, kind);
108  if (n < 0 && kind != 1 /* kProtocol */)
109  {
110  Fatal("UploadAndReset","Unexpected server message: kind=%d status=%d\n",kind,fServerVersion);
111  } else {
112  Info("UploadAndReset","Connected to fastMergeServer version %d with index %d\n",fServerVersion,fServerIdx);
113  }
114  TMessage::EnableSchemaEvolutionForAll(kTRUE);
115  }
116 
117  fMessage.Reset(kMESS_ANY); // re-use TMessage object
118  fMessage.WriteInt(fServerIdx);
119  fMessage.WriteTString(GetName());
120  fMessage.WriteLong64(GetEND());
121  CopyTo(fMessage);
122 
123  // FIXME: CXX17: Use init-statement in if to declare `error` variable
124  int error;
125  if ((error = fSocket->Send(fMessage)) <= 0) {
126  Error("UploadAndReset","Upload to the merging server failed with %d\n",error);
127  delete fSocket;
128  fSocket = 0;
129  return kFALSE;
130  }
131 
132  // Record the StreamerInfo we sent over.
133  Int_t isize = fClassIndex->GetSize();
134  if (!fClassSent) {
135  fClassSent = new TArrayC(isize);
136  } else {
137  if (isize > fClassSent->GetSize()) {
138  fClassSent->Set(isize);
139  }
140  }
141  for(Int_t c = 0; c < isize; ++c) {
142  if (fClassIndex->fArray[c]) {
143  fClassSent->fArray[c] = 1;
144  }
145  }
146  ResetAfterMerge(0);
147 
148  return kTRUE;
149 }
150 
151 ////////////////////////////////////////////////////////////////////////////////
152 /// Write memory objects to this file and upload them to the parallel merge server.
153 /// Then reset all the resetable object (those with a ResetAfterMerge routine,
154 /// like TTree).
155 ///
156 /// Loop on all objects in memory (including subdirectories).
157 /// A new key is created in the KEYS linked list for each object.
158 /// The list of keys is then saved on the file (via WriteKeys)
159 /// as a single data record.
160 /// For values of opt see TObject::Write().
161 /// The directory header info is rewritten on the directory header record.
162 /// The linked list of FREE segments is written.
163 /// The file header is written (bytes 1->fBEGIN).
164 
165 Int_t TParallelMergingFile::Write(const char *, Int_t opt, Int_t bufsiz)
166 {
167  Int_t nbytes = TMemFile::Write(0,opt,bufsiz);
168  if (nbytes) {
169  UploadAndReset();
170  }
171  return nbytes;
172 }
173 
174 ////////////////////////////////////////////////////////////////////////////////
175 /// One can not save a const TDirectory object.
176 
177 Int_t TParallelMergingFile::Write(const char *n, Int_t opt, Int_t bufsize) const
178 {
179  Error("Write const","A const TFile object should not be saved. We try to proceed anyway.");
180  return const_cast<TParallelMergingFile*>(this)->Write(n, opt, bufsize);
181 }
182 
183 ////////////////////////////////////////////////////////////////////////////////
184 /// Write the list of TStreamerInfo as a single object in this file
185 /// The class Streamer description for all classes written to this file
186 /// is saved. See class TStreamerInfo.
187 
188 void TParallelMergingFile::WriteStreamerInfo()
189 {
190  if (!fWritable) return;
191  if (!fClassIndex) return;
192  //no need to update the index if no new classes added to the file
193  if (fClassIndex->fArray[0] == 0) return;
194 
195  // clear fClassIndex for anything we already sent.
196  if (fClassSent) {
197  Int_t isize = fClassIndex->GetSize();
198  Int_t ssize = fClassSent->GetSize();
199  for(Int_t c = 0; c < isize && c < ssize; ++c) {
200  if (fClassSent->fArray[c]) {
201  fClassIndex->fArray[c] = 0;
202  }
203  }
204  }
205 
206  TMemFile::WriteStreamerInfo();
207 }