Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
TMessage.cxx
Go to the documentation of this file.
1 // @(#)root/net:$Id$
2 // Author: Fons Rademakers 19/12/96
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TMessage //
15 // //
16 // Message buffer class used for serializing objects and sending them //
17 // over a network. This class inherits from TBuffer the basic I/O //
18 // serializer. //
19 // //
20 //////////////////////////////////////////////////////////////////////////
21 
22 #include "TMessage.h"
23 #include "Compression.h"
24 #include "TVirtualStreamerInfo.h"
25 #include "TList.h"
26 #include "Bytes.h"
27 #include "TProcessID.h"
28 #include "RZip.h"
29 
// Class-wide schema evolution flag, initially off. It is set through
// EnableSchemaEvolutionForAll() and read back by UsesSchemaEvolutionForAll().
30 Bool_t TMessage::fgEvolution = kFALSE;
31 
32 
33 ClassImp(TMessage);
34 
35 ////////////////////////////////////////////////////////////////////////////////
36 /// Create a TMessage object for storing objects. The "what" integer
37 /// describes the type of message. Predefined ROOT system message types
38 /// can be found in MessageTypes.h. Make sure your own message types are
39 /// unique from the ROOT defined message types (i.e. 0 - 10000 are
40 /// reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
41 /// will wait for an acknowledgment from the remote side. This makes
42 /// the sending process synchronous. In case you OR "what" with kMESS_ZIP,
43 /// the message will be compressed in TSocket using the zip algorithm
44 /// (only if message is > 256 bytes).
45 
46 TMessage::TMessage(UInt_t what, Int_t bufsiz) :
47  TBufferFile(TBuffer::kWrite, bufsiz + 2*sizeof(UInt_t)),
48  fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
49 {
50  // space at the beginning of the message reserved for the message length
51  UInt_t reserved = 0;
52  *this << reserved;
53 
54  fWhat = what;
55  *this << what;
56 
57  fClass = nullptr;
58  fBufComp = nullptr;
59  fBufCompCur = nullptr;
60  fCompPos = nullptr;
61  fInfos = nullptr;
62  fEvolution = kFALSE;
63 
64  SetBit(kCannotHandleMemberWiseStreaming);
65 }
66 
67 ////////////////////////////////////////////////////////////////////////////////
68 /// Create a TMessage object for reading objects. The objects will be
69 /// read from buf. Use the What() method to get the message type.
70 
71 TMessage::TMessage(void *buf, Int_t bufsize) : TBufferFile(TBuffer::kRead, bufsize, buf),
72  fCompress(ROOT::RCompressionSetting::EAlgorithm::kUseGlobal)
73 {
74  // skip space at the beginning of the message reserved for the message length
75  fBufCur += sizeof(UInt_t);
76 
77  *this >> fWhat;
78 
79  fBufComp = nullptr;
80  fBufCompCur = nullptr;
81  fCompPos = nullptr;
82  fInfos = nullptr;
83  fEvolution = kFALSE;
84 
85  if (fWhat & kMESS_ZIP) {
86  // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
87  fBufComp = fBuffer;
88  fBufCompCur = fBuffer + bufsize;
89  fBuffer = nullptr;
90  Uncompress();
91  }
92 
93  if (fWhat == kMESS_OBJECT) {
94  InitMap();
95  fClass = ReadClass(); // get first the class stored in message
96  SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
97  ResetMap();
98  } else {
99  fClass = nullptr;
100  }
101 }
102 
103 ////////////////////////////////////////////////////////////////////////////////
104 /// Destructor
105 
106 TMessage::~TMessage()
107 {
108  delete [] fBufComp;
109  delete fInfos;
110 }
111 
112 ////////////////////////////////////////////////////////////////////////////////
113 /// Static function enabling or disabling the automatic schema evolution.
114 /// By default schema evolution support is off.
115 
116 void TMessage::EnableSchemaEvolutionForAll(Bool_t enable)
117 {
118  fgEvolution = enable;
119 }
120 
121 ////////////////////////////////////////////////////////////////////////////////
122 /// Static function returning status of global schema evolution.
123 
124 Bool_t TMessage::UsesSchemaEvolutionForAll()
125 {
126  return fgEvolution;
127 }
128 
129 ////////////////////////////////////////////////////////////////////////////////
130 /// Force writing the TStreamerInfo to the message.
131 
132 void TMessage::ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t /* force */)
133 {
134  if (fgEvolution || fEvolution) {
135  if (!fInfos) fInfos = new TList();
136  fInfos->Add(info);
137  }
138 }
139 
140 ////////////////////////////////////////////////////////////////////////////////
141 /// Change a buffer that was received into one that can be sent, i.e.
142 /// forward a just received message.
143 
144 void TMessage::Forward()
145 {
146  if (IsReading()) {
147  SetWriteMode();
148  SetBufferOffset(fBufSize);
149  SetBit(kCannotHandleMemberWiseStreaming);
150 
151  if (fBufComp) {
152  fCompPos = fBufCur;
153  }
154  }
155 }
156 
157 ////////////////////////////////////////////////////////////////////////////////
158 /// Remember that the StreamerInfo is being used in writing.
159 ///
160 /// When support for schema evolution is enabled the list of TStreamerInfo
161 /// used to stream this object is kept in fInfos. This information is used
162 /// by TSocket::Send that sends this list through the socket. This list is in
163 /// turn used by TSocket::Recv to store the TStreamerInfo objects in the
164 /// relevant TClass in case the TClass does not know yet about a particular
165 /// class version. This feature is implemented to support clients and servers
166 /// with either different ROOT versions or different user classes versions.
167 
168 void TMessage::TagStreamerInfo(TVirtualStreamerInfo *info)
169 {
170  if (fgEvolution || fEvolution) {
171  if (!fInfos) fInfos = new TList();
172  fInfos->Add(info);
173  }
174 }
175 
176 ////////////////////////////////////////////////////////////////////////////////
177 /// Reset the message buffer so we can use (i.e. fill) it again.
178 
179 void TMessage::Reset()
180 {
181  SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
182  ResetMap();
183 
184  if (fBufComp) {
185  delete [] fBufComp;
186  fBufComp = nullptr;
187  fBufCompCur = nullptr;
188  fCompPos = nullptr;
189  }
190 
191  if (fgEvolution || fEvolution) {
192  if (fInfos)
193  fInfos->Clear();
194  }
195  fBitsPIDs.ResetAllBits();
196 }
197 
198 ////////////////////////////////////////////////////////////////////////////////
199 /// Set the message length at the beginning of the message buffer.
200 /// This method is only called by TSocket::Send().
201 
202 void TMessage::SetLength() const
203 {
204  if (IsWriting()) {
205  char *buf = Buffer();
206  if (buf)
207  tobuf(buf, (UInt_t)(Length() - sizeof(UInt_t)));
208 
209  if (fBufComp) {
210  buf = fBufComp;
211  tobuf(buf, (UInt_t)(CompLength() - sizeof(UInt_t)));
212  }
213  }
214 }
215 
216 ////////////////////////////////////////////////////////////////////////////////
217 /// Using this method one can change the message type a-posteriori
218 /// In case you OR "what" with kMESS_ACK, the message will wait for
219 /// an acknowledgment from the remote side. This makes the sending
220 /// process synchronous.
221 
222 void TMessage::SetWhat(UInt_t what)
223 {
224  fWhat = what;
225 
226  char *buf = Buffer();
227  if (buf) {
228  buf += sizeof(UInt_t); // skip reserved length space
229  tobuf(buf, what);
230  }
231 
232  if (fBufComp) {
233  buf = fBufComp;
234  buf += sizeof(UInt_t); // skip reserved length space
235  tobuf(buf, what | kMESS_ZIP);
236  }
237 }
238 
239 ////////////////////////////////////////////////////////////////////////////////
240 /// Set compression algorithm
241 
242 void TMessage::SetCompressionAlgorithm(Int_t algorithm)
243 {
244  if (algorithm < 0 || algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
245  Int_t newCompress;
246  if (fCompress < 0) {
247  newCompress = 100 * algorithm + ROOT::RCompressionSetting::ELevel::kUseMin;
248  } else {
249  int level = fCompress % 100;
250  newCompress = 100 * algorithm + level;
251  }
252  if (newCompress != fCompress && fBufComp) {
253  delete [] fBufComp;
254  fBufComp = nullptr;
255  fBufCompCur = nullptr;
256  fCompPos = nullptr;
257  }
258  fCompress = newCompress;
259 }
260 
261 ////////////////////////////////////////////////////////////////////////////////
262 /// Set compression level
263 
264 void TMessage::SetCompressionLevel(Int_t level)
265 {
266  if (level < 0) level = 0;
267  if (level > 99) level = 99;
268  Int_t newCompress;
269  if (fCompress < 0) {
270  newCompress = level;
271  } else {
272  int algorithm = fCompress / 100;
273  if (algorithm >= ROOT::RCompressionSetting::EAlgorithm::kUndefined) algorithm = 0;
274  newCompress = 100 * algorithm + level;
275  }
276  if (newCompress != fCompress && fBufComp) {
277  delete [] fBufComp;
278  fBufComp = nullptr;
279  fBufCompCur = nullptr;
280  fCompPos = nullptr;
281  }
282  fCompress = newCompress;
283 }
284 
285 ////////////////////////////////////////////////////////////////////////////////
286 /// Set compression settings
287 
288 void TMessage::SetCompressionSettings(Int_t settings)
289 {
290  if (settings != fCompress && fBufComp) {
291  delete [] fBufComp;
292  fBufComp = nullptr;
293  fBufCompCur = nullptr;
294  fCompPos = nullptr;
295  }
296  fCompress = settings;
297 }
298 
299 ////////////////////////////////////////////////////////////////////////////////
300 /// Compress the message. The message will only be compressed if the
301 /// compression level > 0 and if the message is > 256 bytes.
302 /// Returns -1 in case of error (when compression fails or
303 /// when the message increases in size in some pathological cases),
304 /// otherwise returns 0.
305 
306 Int_t TMessage::Compress()
307 {
308  Int_t compressionLevel = GetCompressionLevel();
309  Int_t compressionAlgorithm = GetCompressionAlgorithm();
310  if (compressionLevel <= 0) {
311  // no compression specified
312  if (fBufComp) {
313  delete [] fBufComp;
314  fBufComp = nullptr;
315  fBufCompCur = nullptr;
316  fCompPos = nullptr;
317  }
318  return 0;
319  }
320 
321  if (fBufComp && fCompPos == fBufCur) {
322  // the message was already compressed
323  return 0;
324  }
325 
326  // remove any existing compressed buffer before compressing modified message
327  if (fBufComp) {
328  delete [] fBufComp;
329  fBufComp = nullptr;
330  fBufCompCur = nullptr;
331  fCompPos = nullptr;
332  }
333 
334  if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
335  // this message is too small to be compressed
336  return 0;
337  }
338 
339  if (!Buffer()) {
340  // error condition, should never happen
341  return -1;
342  }
343 
344  Int_t hdrlen = 2*sizeof(UInt_t);
345  Int_t messlen = Length() - hdrlen;
346  Int_t nbuffers = 1 + (messlen - 1) / kMAXZIPBUF;
347  Int_t chdrlen = 3*sizeof(UInt_t); // compressed buffer header length
348  Int_t buflen = std::max(512, chdrlen + messlen + 9*nbuffers);
349  fBufComp = new char[buflen];
350  char *messbuf = Buffer() + hdrlen;
351  char *bufcur = fBufComp + chdrlen;
352  Int_t noutot = 0;
353  Int_t nzip = 0;
354  Int_t nout, bufmax;
355  for (Int_t i = 0; i < nbuffers; ++i) {
356  if (i == nbuffers - 1)
357  bufmax = messlen - nzip;
358  else
359  bufmax = kMAXZIPBUF;
360  R__zipMultipleAlgorithm(compressionLevel, &bufmax, messbuf, &bufmax, bufcur, &nout,
361  static_cast<ROOT::RCompressionSetting::EAlgorithm::EValues>(compressionAlgorithm));
362  if (nout == 0 || nout >= messlen) {
363  //this happens when the buffer cannot be compressed
364  delete [] fBufComp;
365  fBufComp = nullptr;
366  fBufCompCur = nullptr;
367  fCompPos = nullptr;
368  return -1;
369  }
370  bufcur += nout;
371  noutot += nout;
372  messbuf += kMAXZIPBUF;
373  nzip += kMAXZIPBUF;
374  }
375  fBufCompCur = bufcur;
376  fCompPos = fBufCur;
377 
378  bufcur = fBufComp;
379  tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
380  Int_t what = fWhat | kMESS_ZIP;
381  tobuf(bufcur, what);
382  tobuf(bufcur, Length()); // original uncompressed buffer length
383 
384  return 0;
385 }
386 
387 ////////////////////////////////////////////////////////////////////////////////
388 /// Uncompress the message. The message will only be uncompressed when
389 /// kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
390 
391 Int_t TMessage::Uncompress()
392 {
393  if (!fBufComp || !(fWhat & kMESS_ZIP))
394  return -1;
395 
396  Int_t buflen;
397  Int_t hdrlen = 2*sizeof(UInt_t);
398  char *bufcur1 = fBufComp + hdrlen;
399  frombuf(bufcur1, &buflen);
400  UChar_t *bufcur = (UChar_t*)bufcur1;
401 
402  /* early consistency check */
403  Int_t nin, nbuf;
404  if(R__unzip_header(&nin, bufcur, &nbuf)!=0) {
405  Error("Uncompress", "Inconsistency found in header (nin=%d, nbuf=%d)", nin, nbuf);
406  return -1;
407  }
408 
409  fBuffer = new char[buflen];
410  fBufSize = buflen;
411  fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
412  fBufMax = fBuffer + fBufSize;
413  char *messbuf = fBuffer + hdrlen;
414 
415  Int_t nout;
416  Int_t noutot = 0;
417  while (1) {
418  Int_t hc = R__unzip_header(&nin, bufcur, &nbuf);
419  if (hc!=0) break;
420  R__unzip(&nin, bufcur, &nbuf, (unsigned char*) messbuf, &nout);
421  if (!nout) break;
422  noutot += nout;
423  if (noutot >= buflen - hdrlen) break;
424  bufcur += nin;
425  messbuf += nout;
426  }
427 
428  fWhat &= ~kMESS_ZIP;
429  fCompress = 1;
430 
431  return 0;
432 }
433 
434 ////////////////////////////////////////////////////////////////////////////////
435 /// Check if the ProcessID pid is already in the message.
436 /// If not, then:
437 /// - mark bit 0 of fBitsPIDs to indicate that a ProcessID has been found
438 /// - mark bit uid+1 where uid is the uid of the ProcessID
439 
440 UShort_t TMessage::WriteProcessID(TProcessID *pid)
441 {
442  if (fBitsPIDs.TestBitNumber(0)) return 0;
443  if (!pid)
444  pid = TProcessID::GetPID();
445  if (!pid) return 0;
446  fBitsPIDs.SetBitNumber(0);
447  UInt_t uid = pid->GetUniqueID();
448  fBitsPIDs.SetBitNumber(uid+1);
449  return 1;
450 }