Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
RPageStorageRaw.cxx
Go to the documentation of this file.
1 /// \file RPageStorageRaw.cxx
2 /// \ingroup NTuple ROOT7
3 /// \author Jakob Blomer <jblomer@cern.ch>
4 /// \date 2018-10-04
5 /// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6 /// is welcome!
7 
8 /*************************************************************************
9  * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10  * All rights reserved. *
11  * *
12  * For the licensing terms see $ROOTSYS/LICENSE. *
13  * For the list of contributors see $ROOTSYS/README/CREDITS. *
14  *************************************************************************/
15 
16 #include <ROOT/RPageStorageRaw.hxx>
17 #include <ROOT/RColumn.hxx>
18 #include <ROOT/RLogger.hxx>
20 #include <ROOT/RPage.hxx>
21 #include <ROOT/RPageAllocator.hxx>
22 #include <ROOT/RPagePool.hxx>
23 #include <ROOT/RRawFile.hxx>
24 
25 #include <Compression.h>
26 #include <RZip.h>
27 #include <TError.h>
28 
29 #include <cstdio>
30 #include <cstring>
31 #include <iostream>
32 
33 ROOT::Experimental::Detail::RPageSinkRaw::RPageSinkRaw(std::string_view ntupleName, std::string_view path,
34  const RNTupleWriteOptions &options)
35  : RPageSink(ntupleName, options)
36  , fMetrics("RPageSinkRaw")
37  , fPageAllocator(std::make_unique<RPageAllocatorHeap>())
38  , fZipBuffer(std::make_unique<std::array<char, kMaxPageSize>>())
39 {
40  R__WARNING_HERE("NTuple") << "The RNTuple file format will change. " <<
41  "Do not store real data with this version of RNTuple!";
42  fFile = fopen(std::string(path).c_str(), "wb");
43  R__ASSERT(fFile);
44 }
45 
46 ROOT::Experimental::Detail::RPageSinkRaw::~RPageSinkRaw()
47 {
48  if (fFile)
49  fclose(fFile);
50 }
51 
52 void ROOT::Experimental::Detail::RPageSinkRaw::Write(const void *buffer, std::size_t nbytes)
53 {
54  R__ASSERT(fFile);
55  auto written = fwrite(buffer, 1, nbytes, fFile);
56  R__ASSERT(written == nbytes);
57  fFilePos += written;
58 }
59 
60 void ROOT::Experimental::Detail::RPageSinkRaw::DoCreate(const RNTupleModel & /* model */)
61 {
62  const auto &descriptor = fDescriptorBuilder.GetDescriptor();
63  auto szHeader = descriptor.SerializeHeader(nullptr);
64  auto buffer = new unsigned char[szHeader];
65  descriptor.SerializeHeader(buffer);
66  Write(buffer, szHeader);
67  delete[] buffer;
68  fClusterStart = fFilePos;
69 }
70 
71 ROOT::Experimental::RClusterDescriptor::RLocator
72 ROOT::Experimental::Detail::RPageSinkRaw::DoCommitPage(ColumnHandle_t columnHandle, const RPage &page)
73 {
74  unsigned char *buffer = reinterpret_cast<unsigned char *>(page.GetBuffer());
75  bool isAdoptedBuffer = true;
76  auto packedBytes = page.GetSize();
77  auto element = columnHandle.fColumn->GetElement();
78  const auto isMappable = element->IsMappable();
79 
80  if (!isMappable) {
81  packedBytes = (page.GetNElements() * element->GetBitsOnStorage() + 7) / 8;
82  buffer = new unsigned char[packedBytes];
83  element->Pack(buffer, page.GetBuffer(), page.GetNElements());
84  isAdoptedBuffer = false;
85  }
86 
87  if (fOptions.GetCompression() % 100 != 0) {
88  R__ASSERT(packedBytes <= kMaxPageSize);
89  auto level = fOptions.GetCompression() % 100;
90  auto algorithm = static_cast<ROOT::RCompressionSetting::EAlgorithm::EValues>(fOptions.GetCompression() / 100);
91  int szZipBuffer = kMaxPageSize;
92  int szSource = packedBytes;
93  char *source = reinterpret_cast<char *>(buffer);
94  int zipBytes = 0;
95  R__zipMultipleAlgorithm(level, &szSource, source, &szZipBuffer, fZipBuffer->data(), &zipBytes, algorithm);
96  if ((zipBytes > 0) && (zipBytes < szSource)) {
97  if (!isAdoptedBuffer)
98  delete[] buffer;
99  buffer = reinterpret_cast<unsigned char *>(fZipBuffer->data());
100  packedBytes = zipBytes;
101  isAdoptedBuffer = true;
102  }
103  }
104 
105  RClusterDescriptor::RLocator result;
106  result.fPosition = fFilePos;
107  result.fBytesOnStorage = packedBytes;
108  Write(buffer, packedBytes);
109 
110  if (!isAdoptedBuffer)
111  delete[] buffer;
112 
113  return result;
114 }
115 
116 ROOT::Experimental::RClusterDescriptor::RLocator
117 ROOT::Experimental::Detail::RPageSinkRaw::DoCommitCluster(ROOT::Experimental::NTupleSize_t /* nEntries */)
118 {
119  RClusterDescriptor::RLocator result;
120  result.fPosition = fClusterStart;
121  result.fBytesOnStorage = fFilePos - fClusterStart;
122  fClusterStart = fFilePos;
123  return result;
124 }
125 
126 void ROOT::Experimental::Detail::RPageSinkRaw::DoCommitDataset()
127 {
128  const auto &descriptor = fDescriptorBuilder.GetDescriptor();
129  auto szFooter = descriptor.SerializeFooter(nullptr);
130  auto buffer = new unsigned char[szFooter];
131  descriptor.SerializeFooter(buffer);
132  Write(buffer, szFooter);
133  delete[] buffer;
134 }
135 
136 ROOT::Experimental::Detail::RPage
137 ROOT::Experimental::Detail::RPageSinkRaw::ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
138 {
139  if (nElements == 0)
140  nElements = kDefaultElementsPerPage;
141  auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
142  return fPageAllocator->NewPage(columnHandle.fId, elementSize, nElements);
143 }
144 
145 void ROOT::Experimental::Detail::RPageSinkRaw::ReleasePage(RPage &page)
146 {
147  fPageAllocator->DeletePage(page);
148 }
149 
150 ////////////////////////////////////////////////////////////////////////////////
151 
152 
153 ROOT::Experimental::Detail::RPage ROOT::Experimental::Detail::RPageAllocatorFile::NewPage(
154  ColumnId_t columnId, void *mem, std::size_t elementSize, std::size_t nElements)
155 {
156  RPage newPage(columnId, mem, elementSize * nElements, elementSize);
157  newPage.TryGrow(nElements);
158  return newPage;
159 }
160 
161 void ROOT::Experimental::Detail::RPageAllocatorFile::DeletePage(const RPage& page)
162 {
163  if (page.IsNull())
164  return;
165  free(page.GetBuffer());
166 }
167 
168 
169 ////////////////////////////////////////////////////////////////////////////////
170 
171 
172 ROOT::Experimental::Detail::RPageSourceRaw::RPageSourceRaw(std::string_view ntupleName,
173  const RNTupleReadOptions &options)
174  : RPageSource(ntupleName, options)
175  , fPageAllocator(std::make_unique<RPageAllocatorFile>())
176  , fPagePool(std::make_shared<RPagePool>())
177  , fUnzipBuffer(std::make_unique<std::array<unsigned char, kMaxPageSize>>())
178  , fMetrics("RPageSourceRaw")
179 {
180  fCtrNRead = fMetrics.MakeCounter<decltype(fCtrNRead)>("nRead", "", "number of read() calls");
181  fCtrSzRead = fMetrics.MakeCounter<decltype(fCtrSzRead)>("szRead", "B", "volume read from file");
182  fCtrSzUnzip = fMetrics.MakeCounter<decltype(fCtrSzUnzip)>("szUnzip", "B", "volume after unzipping");
183  fCtrNPages = fMetrics.MakeCounter<decltype(fCtrNPages)>("nPages", "", "number of populated pages");
184  fCtrTimeWallRead = fMetrics.MakeCounter<decltype(fCtrTimeWallRead)>(
185  "timeWallRead", "ns", "wall clock time spent reading");
186  fCtrTimeCpuRead = fMetrics.MakeCounter<decltype(fCtrTimeCpuRead)>("timeCpuRead", "ns", "CPU time spent reading");
187  fCtrTimeWallUnzip = fMetrics.MakeCounter<decltype(fCtrTimeWallUnzip)>(
188  "timeWallUnzip", "ns", "wall clock time spent decompressing");
189  fCtrTimeCpuUnzip = fMetrics.MakeCounter<decltype(fCtrTimeCpuUnzip)>(
190  "timeCpuUnzip", "ns", "CPU time spent decompressing");
191 }
192 
193 ROOT::Experimental::Detail::RPageSourceRaw::RPageSourceRaw(std::string_view ntupleName, std::string_view path,
194  const RNTupleReadOptions &options)
195  : RPageSourceRaw(ntupleName, options)
196 {
197  fFile = ROOT::Internal::RRawFile::Create(path);
198  R__ASSERT(fFile);
199  R__ASSERT(fFile->GetFeatures() & ROOT::Internal::RRawFile::kFeatureHasSize);
200 }
201 
202 
203 ROOT::Experimental::Detail::RPageSourceRaw::~RPageSourceRaw()
204 {
205 }
206 
207 
208 void ROOT::Experimental::Detail::RPageSourceRaw::Read(void *buffer, std::size_t nbytes, std::uint64_t offset)
209 {
210  RNTuplePlainTimer timer(*fCtrTimeWallRead, *fCtrTimeCpuRead);
211  auto nread = fFile->ReadAt(buffer, nbytes, offset);
212  R__ASSERT(nread == nbytes);
213  fCtrSzRead->Add(nread);
214  fCtrNRead->Inc();
215 }
216 
217 
218 ROOT::Experimental::RNTupleDescriptor ROOT::Experimental::Detail::RPageSourceRaw::DoAttach()
219 {
220  unsigned char postscript[RNTupleDescriptor::kNBytesPostscript];
221  auto fileSize = fFile->GetSize();
222  R__ASSERT(fileSize != ROOT::Internal::RRawFile::kUnknownFileSize);
223  R__ASSERT(fileSize >= RNTupleDescriptor::kNBytesPostscript);
224  auto offset = fileSize - RNTupleDescriptor::kNBytesPostscript;
225  Read(postscript, RNTupleDescriptor::kNBytesPostscript, offset);
226 
227  std::uint32_t szHeader;
228  std::uint32_t szFooter;
229  RNTupleDescriptor::LocateMetadata(postscript, szHeader, szFooter);
230  R__ASSERT(fileSize >= szHeader + szFooter);
231 
232  unsigned char *header = new unsigned char[szHeader];
233  unsigned char *footer = new unsigned char[szFooter];
234  Read(header, szHeader, 0);
235  Read(footer, szFooter, fileSize - szFooter);
236 
237  RNTupleDescriptorBuilder descBuilder;
238  descBuilder.SetFromHeader(header);
239  descBuilder.AddClustersFromFooter(footer);
240  delete[] header;
241  delete[] footer;
242 
243  return descBuilder.MoveDescriptor();
244 }
245 
246 
247 ROOT::Experimental::Detail::RPage ROOT::Experimental::Detail::RPageSourceRaw::PopulatePageFromCluster(
248  ColumnHandle_t columnHandle, const RClusterDescriptor &clusterDescriptor, ClusterSize_t::ValueType clusterIndex)
249 {
250  fCtrNPages->Inc();
251  auto columnId = columnHandle.fId;
252  auto clusterId = clusterDescriptor.GetId();
253  const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
254 
255  // TODO(jblomer): binary search
256  RClusterDescriptor::RPageRange::RPageInfo pageInfo;
257  decltype(clusterIndex) firstInPage = 0;
258  for (const auto &pi : pageRange.fPageInfos) {
259  if (firstInPage + pi.fNElements > clusterIndex) {
260  pageInfo = pi;
261  break;
262  }
263  firstInPage += pi.fNElements;
264  }
265  R__ASSERT(firstInPage <= clusterIndex);
266  R__ASSERT((firstInPage + pageInfo.fNElements) > clusterIndex);
267 
268  auto element = columnHandle.fColumn->GetElement();
269  auto elementSize = element->GetSize();
270 
271  auto pageSize = pageInfo.fLocator.fBytesOnStorage;
272  void *pageBuffer = malloc(std::max(pageSize, static_cast<std::uint32_t>(elementSize * pageInfo.fNElements)));
273  R__ASSERT(pageBuffer);
274  Read(pageBuffer, pageSize, pageInfo.fLocator.fPosition);
275 
276  auto bytesOnStorage = (element->GetBitsOnStorage() * pageInfo.fNElements + 7) / 8;
277  if (pageSize != bytesOnStorage) {
278  RNTuplePlainTimer timer(*fCtrTimeWallUnzip, *fCtrTimeCpuUnzip);
279 
280  R__ASSERT(bytesOnStorage <= kMaxPageSize);
281  // We do have the unzip information in the column range, but here we simply use the value from
282  // the R__zip header
283  int szUnzipBuffer = kMaxPageSize;
284  int szSource = pageSize;
285  unsigned char *source = reinterpret_cast<unsigned char *>(pageBuffer);
286  int unzipBytes = 0;
287  R__unzip(&szSource, source, &szUnzipBuffer, fUnzipBuffer->data(), &unzipBytes);
288  R__ASSERT(unzipBytes > static_cast<int>(pageSize));
289  memcpy(pageBuffer, fUnzipBuffer->data(), unzipBytes);
290  pageSize = unzipBytes;
291  fCtrSzUnzip->Add(unzipBytes);
292  }
293 
294  if (!element->IsMappable()) {
295  pageSize = elementSize * pageInfo.fNElements;
296  auto unpackedBuffer = reinterpret_cast<unsigned char *>(malloc(pageSize));
297  R__ASSERT(unpackedBuffer != nullptr);
298  element->Unpack(unpackedBuffer, pageBuffer, pageInfo.fNElements);
299  free(pageBuffer);
300  pageBuffer = unpackedBuffer;
301  }
302 
303  auto indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
304  auto newPage = fPageAllocator->NewPage(columnId, pageBuffer, elementSize, pageInfo.fNElements);
305  newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
306  fPagePool->RegisterPage(newPage,
307  RPageDeleter([](const RPage &page, void * /*userData*/)
308  {
309  RPageAllocatorFile::DeletePage(page);
310  }, nullptr));
311  return newPage;
312 }
313 
314 
315 ROOT::Experimental::Detail::RPage ROOT::Experimental::Detail::RPageSourceRaw::PopulatePage(
316  ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
317 {
318  auto columnId = columnHandle.fId;
319  auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
320  if (!cachedPage.IsNull())
321  return cachedPage;
322 
323  auto clusterId = fDescriptor.FindClusterId(columnId, globalIndex);
324  R__ASSERT(clusterId != kInvalidDescriptorId);
325  const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
326  auto selfOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
327  R__ASSERT(selfOffset <= globalIndex);
328  return PopulatePageFromCluster(columnHandle, clusterDescriptor, globalIndex - selfOffset);
329 }
330 
331 
332 ROOT::Experimental::Detail::RPage ROOT::Experimental::Detail::RPageSourceRaw::PopulatePage(
333  ColumnHandle_t columnHandle, const RClusterIndex &clusterIndex)
334 {
335  auto clusterId = clusterIndex.GetClusterId();
336  auto index = clusterIndex.GetIndex();
337  auto columnId = columnHandle.fId;
338  auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
339  if (!cachedPage.IsNull())
340  return cachedPage;
341 
342  R__ASSERT(clusterId != kInvalidDescriptorId);
343  const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
344  return PopulatePageFromCluster(columnHandle, clusterDescriptor, index);
345 }
346 
347 void ROOT::Experimental::Detail::RPageSourceRaw::ReleasePage(RPage &page)
348 {
349  fPagePool->ReturnPage(page);
350 }
351 
352 std::unique_ptr<ROOT::Experimental::Detail::RPageSource> ROOT::Experimental::Detail::RPageSourceRaw::Clone() const
353 {
354  auto clone = new RPageSourceRaw(fNTupleName, fOptions);
355  clone->fFile = fFile->Clone();
356  return std::unique_ptr<RPageSourceRaw>(clone);
357 }