// Constructs a page sink that writes an ntuple to a plain file at `path`.
// The ntuple name and write options are forwarded to the RPageSink base.
// NOTE(review): the leading integers on each line look like line numbers from
// a source listing, and brace-only lines are absent from this excerpt.
33 ROOT::Experimental::Detail::RPageSinkRaw::RPageSinkRaw(std::string_view ntupleName, std::string_view path,
34 const RNTupleWriteOptions &options)
35 : RPageSink(ntupleName, options)
// Metrics object labelled with the class name.
36 , fMetrics(
"RPageSinkRaw")
// Pages handed out by this sink come from a heap allocator.
37 , fPageAllocator(std::make_unique<RPageAllocatorHeap>())
// Scratch buffer of kMaxPageSize chars; DoCommitPage compresses into it.
38 , fZipBuffer(std::make_unique<std::array<char, kMaxPageSize>>())
// Warn the user that the on-disk format is not yet stable.
40 R__WARNING_HERE(
"NTuple") <<
"The RNTuple file format will change. " <<
41 "Do not store real data with this version of RNTuple!";
// Open the output file for binary writing; string_view is copied into a
// std::string to obtain a NUL-terminated path for fopen.
// NOTE(review): no check of the fopen result is visible in this excerpt — confirm.
42 fFile = fopen(std::string(path).c_str(),
"wb");
// Destructor of RPageSinkRaw. Its body is not visible in this excerpt.
// NOTE(review): presumably closes fFile, which the constructor opens — confirm.
46 ROOT::Experimental::Detail::RPageSinkRaw::~RPageSinkRaw()
// Writes `nbytes` bytes starting at `buffer` to fFile.
// A short write is treated as a fatal error via R__ASSERT.
// NOTE(review): an update of fFilePos is not visible in this excerpt, although
// other methods read fFilePos after calling Write — confirm it is advanced here.
52 void ROOT::Experimental::Detail::RPageSinkRaw::Write(
const void *buffer, std::size_t nbytes)
// Element size 1, so the return value is the number of bytes written.
55 auto written = fwrite(buffer, 1, nbytes, fFile);
56 R__ASSERT(written == nbytes);
// Serializes the ntuple header from the descriptor builder and writes it to
// the file. The model argument is unnamed and unused in the visible code.
60 void ROOT::Experimental::Detail::RPageSinkRaw::DoCreate(
const RNTupleModel & )
62 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
// First call with nullptr: the return value is used as the buffer size.
63 auto szHeader = descriptor.SerializeHeader(
nullptr);
// NOTE(review): the matching delete[] for this buffer is not visible in this
// excerpt — confirm it is released.
64 auto buffer =
new unsigned char[szHeader];
// Second call fills the buffer, which is then written out.
65 descriptor.SerializeHeader(buffer);
66 Write(buffer, szHeader);
// The first cluster begins at the current file position.
68 fClusterStart = fFilePos;
// Writes one page of a column to the file, optionally packing and compressing
// it first, and builds a locator holding the file position and stored size.
// NOTE(review): several guarding conditions, declarations and the return
// statement are not visible in this excerpt; see the notes below.
71 ROOT::Experimental::RClusterDescriptor::RLocator
72 ROOT::Experimental::Detail::RPageSinkRaw::DoCommitPage(ColumnHandle_t columnHandle,
const RPage &page)
// Start from the page's own memory; "adopted" means this function did not
// allocate `buffer` with new[].
74 unsigned char *buffer =
reinterpret_cast<unsigned char *
>(page.GetBuffer());
75 bool isAdoptedBuffer =
true;
76 auto packedBytes = page.GetSize();
77 auto element = columnHandle.fColumn->GetElement();
78 const auto isMappable = element->IsMappable();
// Packing path: bits on storage are rounded up to whole bytes, and the
// elements are packed into a freshly allocated buffer.
// NOTE(review): the condition guarding these lines is not visible here;
// presumably it is `!isMappable` — confirm.
81 packedBytes = (page.GetNElements() * element->GetBitsOnStorage() + 7) / 8;
82 buffer =
new unsigned char[packedBytes];
83 element->Pack(buffer, page.GetBuffer(), page.GetNElements());
84 isAdoptedBuffer =
false;
// The compression setting is split as algorithm * 100 + level; level 0
// skips the compression branch.
87 if (fOptions.GetCompression() % 100 != 0) {
// The source must not exceed kMaxPageSize, which is also the size of fZipBuffer.
88 R__ASSERT(packedBytes <= kMaxPageSize);
89 auto level = fOptions.GetCompression() % 100;
90 auto algorithm =
static_cast<ROOT::RCompressionSetting::EAlgorithm::EValues
>(fOptions.GetCompression() / 100);
91 int szZipBuffer = kMaxPageSize;
92 int szSource = packedBytes;
93 char *source =
reinterpret_cast<char *
>(buffer);
// NOTE(review): the declaration of zipBytes is not visible in this excerpt.
95 R__zipMultipleAlgorithm(level, &szSource, source, &szZipBuffer, fZipBuffer->data(), &zipBytes, algorithm);
// Use the compressed data only if compression produced output that is
// strictly smaller than the input.
96 if ((zipBytes > 0) && (zipBytes < szSource)) {
// Switch to the member zip buffer, which must not be freed here.
// NOTE(review): release of a buffer allocated by the packing path before it is
// replaced is not visible in this excerpt — confirm.
99 buffer =
reinterpret_cast<unsigned char *
>(fZipBuffer->data());
100 packedBytes = zipBytes;
101 isAdoptedBuffer =
true;
// Record where the page lands in the file and how many bytes it occupies,
// then write it.
105 RClusterDescriptor::RLocator result;
106 result.fPosition = fFilePos;
107 result.fBytesOnStorage = packedBytes;
108 Write(buffer, packedBytes);
// NOTE(review): the statement guarded by this condition is not visible;
// presumably it frees the locally allocated buffer — confirm.
110 if (!isAdoptedBuffer)
// Builds a locator for the cluster that has just been completed and marks the
// current file position as the start of the next cluster.
// The entry-count parameter is unnamed and unused in the visible code.
// NOTE(review): the return statement is not visible in this excerpt.
116 ROOT::Experimental::RClusterDescriptor::RLocator
117 ROOT::Experimental::Detail::RPageSinkRaw::DoCommitCluster(ROOT::Experimental::NTupleSize_t )
119 RClusterDescriptor::RLocator result;
// The cluster spans from its recorded start up to the current file position.
120 result.fPosition = fClusterStart;
121 result.fBytesOnStorage = fFilePos - fClusterStart;
122 fClusterStart = fFilePos;
// Serializes the ntuple footer from the descriptor builder and writes it to
// the file.
126 void ROOT::Experimental::Detail::RPageSinkRaw::DoCommitDataset()
128 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
// First call with nullptr: the return value is used as the buffer size.
129 auto szFooter = descriptor.SerializeFooter(
nullptr);
// NOTE(review): the matching delete[] for this buffer is not visible in this
// excerpt — confirm it is released.
130 auto buffer =
new unsigned char[szFooter];
// Second call fills the buffer, which is then written out.
131 descriptor.SerializeFooter(buffer);
132 Write(buffer, szFooter);
// Returns a new page for the given column, obtained from fPageAllocator and
// sized for `nElements` elements of the column's element size.
136 ROOT::Experimental::Detail::RPage
137 ROOT::Experimental::Detail::RPageSinkRaw::ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
// NOTE(review): the condition guarding this assignment is not visible in this
// excerpt; presumably the default applies only when the caller passes 0 — confirm.
140 nElements = kDefaultElementsPerPage;
141 auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
142 return fPageAllocator->NewPage(columnHandle.fId, elementSize, nElements);
// Hands a page back to fPageAllocator, the same allocator ReservePage
// obtains pages from.
145 void ROOT::Experimental::Detail::RPageSinkRaw::ReleasePage(RPage &page)
147 fPageAllocator->DeletePage(page);
// Wraps caller-provided memory `mem` in an RPage for column `columnId`.
// The RPage is constructed with a byte size of elementSize * nElements.
// NOTE(review): the return statement is not visible in this excerpt.
153 ROOT::Experimental::Detail::RPage ROOT::Experimental::Detail::RPageAllocatorFile::NewPage(
154 ColumnId_t columnId,
void *mem, std::size_t elementSize, std::size_t nElements)
156 RPage newPage(columnId, mem, elementSize * nElements, elementSize);
// NOTE(review): presumably marks all nElements of the page as in use — confirm
// against RPage::TryGrow.
157 newPage.TryGrow(nElements);
// Releases the memory behind `page` with free().
// The buffer must therefore come from malloc, as the buffers created in
// RPageSourceRaw::PopulatePageFromCluster do.
// NOTE(review): original lines 162-164 are not visible in this excerpt.
161 void ROOT::Experimental::Detail::RPageAllocatorFile::DeletePage(
const RPage& page)
165 free(page.GetBuffer());
// Constructs a page source without a file attached: sets up the page
// allocator, the page pool, the unzip buffer and the metrics counters.
// The path-taking constructor and Clone() both build on this one and assign
// fFile afterwards.
172 ROOT::Experimental::Detail::RPageSourceRaw::RPageSourceRaw(std::string_view ntupleName,
173 const RNTupleReadOptions &options)
174 : RPageSource(ntupleName, options)
175 , fPageAllocator(std::make_unique<RPageAllocatorFile>())
176 , fPagePool(std::make_shared<RPagePool>())
// Scratch buffer of kMaxPageSize bytes; pages are decompressed into it.
177 , fUnzipBuffer(std::make_unique<std::array<unsigned char, kMaxPageSize>>())
178 , fMetrics(
"RPageSourceRaw")
// Register the counters; the arguments appear to be (name, unit, description)
// — confirm against MakeCounter.
180 fCtrNRead = fMetrics.MakeCounter<decltype(fCtrNRead)>(
"nRead",
"",
"number of read() calls");
181 fCtrSzRead = fMetrics.MakeCounter<decltype(fCtrSzRead)>(
"szRead",
"B",
"volume read from file");
182 fCtrSzUnzip = fMetrics.MakeCounter<decltype(fCtrSzUnzip)>(
"szUnzip",
"B",
"volume after unzipping");
183 fCtrNPages = fMetrics.MakeCounter<decltype(fCtrNPages)>(
"nPages",
"",
"number of populated pages");
184 fCtrTimeWallRead = fMetrics.MakeCounter<decltype(fCtrTimeWallRead)>(
185 "timeWallRead",
"ns",
"wall clock time spent reading");
186 fCtrTimeCpuRead = fMetrics.MakeCounter<decltype(fCtrTimeCpuRead)>(
"timeCpuRead",
"ns",
"CPU time spent reading");
187 fCtrTimeWallUnzip = fMetrics.MakeCounter<decltype(fCtrTimeWallUnzip)>(
188 "timeWallUnzip",
"ns",
"wall clock time spent decompressing");
189 fCtrTimeCpuUnzip = fMetrics.MakeCounter<decltype(fCtrTimeCpuUnzip)>(
190 "timeCpuUnzip",
"ns",
"CPU time spent decompressing");
// Constructs a page source reading from `path`. It delegates the common setup
// to the (ntupleName, options) constructor and then opens the file through
// RRawFile::Create.
193 ROOT::Experimental::Detail::RPageSourceRaw::RPageSourceRaw(std::string_view ntupleName, std::string_view path,
194 const RNTupleReadOptions &options)
195 : RPageSourceRaw(ntupleName, options)
197 fFile = ROOT::Internal::RRawFile::Create(path);
// The file must report its size; DoAttach relies on GetSize().
// NOTE(review): original line 198 is not visible in this excerpt.
199 R__ASSERT(fFile->GetFeatures() & ROOT::Internal::RRawFile::kFeatureHasSize);
// Destructor of RPageSourceRaw. Its body is not visible in this excerpt.
203 ROOT::Experimental::Detail::RPageSourceRaw::~RPageSourceRaw()
// Reads exactly `nbytes` bytes at file offset `offset` into `buffer`.
// A short read is treated as a fatal error via R__ASSERT.
208 void ROOT::Experimental::Detail::RPageSourceRaw::Read(
void *buffer, std::size_t nbytes, std::uint64_t offset)
// Timer bound to the wall-clock and CPU read counters.
// NOTE(review): presumably records elapsed time when it goes out of scope —
// confirm against RNTuplePlainTimer.
210 RNTuplePlainTimer timer(*fCtrTimeWallRead, *fCtrTimeCpuRead);
211 auto nread = fFile->ReadAt(buffer, nbytes, offset);
212 R__ASSERT(nread == nbytes);
// Add the bytes read to the "szRead" counter.
// NOTE(review): an update of fCtrNRead is not visible in this excerpt.
213 fCtrSzRead->Add(nread);
// Reads the ntuple metadata from the file and returns the descriptor built
// from it. The postscript at the end of the file gives the header and footer
// sizes; the header is then read from the start of the file and the footer
// from its end.
218 ROOT::Experimental::RNTupleDescriptor ROOT::Experimental::Detail::RPageSourceRaw::DoAttach()
220 unsigned char postscript[RNTupleDescriptor::kNBytesPostscript];
221 auto fileSize = fFile->GetSize();
// The size must be known and the file at least as large as the postscript.
222 R__ASSERT(fileSize != ROOT::Internal::RRawFile::kUnknownFileSize);
223 R__ASSERT(fileSize >= RNTupleDescriptor::kNBytesPostscript);
// The postscript occupies the last kNBytesPostscript bytes of the file.
224 auto offset = fileSize - RNTupleDescriptor::kNBytesPostscript;
225 Read(postscript, RNTupleDescriptor::kNBytesPostscript, offset);
// szHeader and szFooter are filled in by LocateMetadata.
227 std::uint32_t szHeader;
228 std::uint32_t szFooter;
229 RNTupleDescriptor::LocateMetadata(postscript, szHeader, szFooter);
230 R__ASSERT(fileSize >= szHeader + szFooter);
// NOTE(review): the matching delete[] calls for header and footer are not
// visible in this excerpt — confirm they are released.
232 unsigned char *header =
new unsigned char[szHeader];
233 unsigned char *footer =
new unsigned char[szFooter];
// Header at offset 0; footer in the last szFooter bytes of the file.
234 Read(header, szHeader, 0);
235 Read(footer, szFooter, fileSize - szFooter);
// Feed both serialized parts to the builder, then move the result out.
237 RNTupleDescriptorBuilder descBuilder;
238 descBuilder.SetFromHeader(header);
239 descBuilder.AddClustersFromFooter(footer);
243 return descBuilder.MoveDescriptor();
// Loads the page of the given column that contains element `clusterIndex`
// (an index relative to the cluster), decompressing and unpacking it as
// needed, and registers the resulting page with the page pool.
// NOTE(review): several statements, including the loop body and the return,
// are not visible in this excerpt; see the notes below.
247 ROOT::Experimental::Detail::RPage ROOT::Experimental::Detail::RPageSourceRaw::PopulatePageFromCluster(
248 ColumnHandle_t columnHandle,
const RClusterDescriptor &clusterDescriptor, ClusterSize_t::ValueType clusterIndex)
251 auto columnId = columnHandle.fId;
252 auto clusterId = clusterDescriptor.GetId();
253 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
// Walk the column's pages, accumulating in firstInPage the index of the
// first element of the current page.
256 RClusterDescriptor::RPageRange::RPageInfo pageInfo;
257 decltype(clusterIndex) firstInPage = 0;
258 for (const auto &pi : pageRange.fPageInfos) {
// True for the page whose element range contains clusterIndex.
// NOTE(review): the body of this branch is not visible; presumably it copies
// `pi` into pageInfo and leaves the loop — confirm.
259 if (firstInPage + pi.fNElements > clusterIndex) {
263 firstInPage += pi.fNElements;
// Sanity checks: clusterIndex lies inside the selected page.
265 R__ASSERT(firstInPage <= clusterIndex);
266 R__ASSERT((firstInPage + pageInfo.fNElements) > clusterIndex);
268 auto element = columnHandle.fColumn->GetElement();
269 auto elementSize = element->GetSize();
// Allocate the larger of the on-storage size and elementSize * fNElements,
// so the unzipped data can later be copied back into the same buffer.
271 auto pageSize = pageInfo.fLocator.fBytesOnStorage;
272 void *pageBuffer = malloc(std::max(pageSize, static_cast<std::uint32_t>(elementSize * pageInfo.fNElements)));
273 R__ASSERT(pageBuffer);
274 Read(pageBuffer, pageSize, pageInfo.fLocator.fPosition);
// Size of the packed, uncompressed page (bits rounded up to whole bytes).
// A different stored size is taken to mean the page is compressed.
276 auto bytesOnStorage = (element->GetBitsOnStorage() * pageInfo.fNElements + 7) / 8;
277 if (pageSize != bytesOnStorage) {
278 RNTuplePlainTimer timer(*fCtrTimeWallUnzip, *fCtrTimeCpuUnzip);
// The unzipped page must fit into fUnzipBuffer, which holds kMaxPageSize bytes.
280 R__ASSERT(bytesOnStorage <= kMaxPageSize);
283 int szUnzipBuffer = kMaxPageSize;
284 int szSource = pageSize;
285 unsigned char *source =
reinterpret_cast<unsigned char *
>(pageBuffer);
// NOTE(review): the declaration of unzipBytes is not visible in this excerpt.
287 R__unzip(&szSource, source, &szUnzipBuffer, fUnzipBuffer->data(), &unzipBytes);
// Decompression must yield more bytes than were stored.
288 R__ASSERT(unzipBytes > static_cast<int>(pageSize));
// Copy the unzipped data back over the compressed bytes in pageBuffer.
289 memcpy(pageBuffer, fUnzipBuffer->data(), unzipBytes);
290 pageSize = unzipBytes;
291 fCtrSzUnzip->Add(unzipBytes);
// Non-mappable elements are unpacked into a second malloc'ed buffer of
// elementSize * fNElements bytes.
294 if (!element->IsMappable()) {
295 pageSize = elementSize * pageInfo.fNElements;
296 auto unpackedBuffer =
reinterpret_cast<unsigned char *
>(malloc(pageSize));
297 R__ASSERT(unpackedBuffer !=
nullptr);
298 element->Unpack(unpackedBuffer, pageBuffer, pageInfo.fNElements);
// NOTE(review): release of the previous pageBuffer before it is replaced is
// not visible in this excerpt — confirm.
300 pageBuffer = unpackedBuffer;
// Wrap the buffer in an RPage and set its window from the column's first
// element index plus the offset of the page within the cluster.
303 auto indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
304 auto newPage = fPageAllocator->NewPage(columnId, pageBuffer, elementSize, pageInfo.fNElements);
305 newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
// Register the page with the pool; the deleter releases the buffer through
// RPageAllocatorFile::DeletePage.
306 fPagePool->RegisterPage(newPage,
307 RPageDeleter([](
const RPage &page,
void * )
309 RPageAllocatorFile::DeletePage(page);
// Returns the page of the given column that contains the element at
// `globalIndex`. The page pool is consulted first; otherwise the owning
// cluster is looked up and the page is loaded via PopulatePageFromCluster.
315 ROOT::Experimental::Detail::RPage ROOT::Experimental::Detail::RPageSourceRaw::PopulatePage(
316 ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
318 auto columnId = columnHandle.fId;
319 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
// NOTE(review): the statement guarded by this condition is not visible;
// presumably it returns cachedPage — confirm.
320 if (!cachedPage.IsNull())
323 auto clusterId = fDescriptor.FindClusterId(columnId, globalIndex);
324 R__ASSERT(clusterId != kInvalidDescriptorId);
325 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
// Convert the global index into one relative to the cluster's first element
// for this column.
326 auto selfOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
327 R__ASSERT(selfOffset <= globalIndex);
328 return PopulatePageFromCluster(columnHandle, clusterDescriptor, globalIndex - selfOffset);
// Returns the page of the given column that contains the element addressed by
// `clusterIndex`, which carries a cluster id and an index within that cluster.
// The page pool is consulted first; otherwise the page is loaded via
// PopulatePageFromCluster.
332 ROOT::Experimental::Detail::RPage ROOT::Experimental::Detail::RPageSourceRaw::PopulatePage(
333 ColumnHandle_t columnHandle,
const RClusterIndex &clusterIndex)
335 auto clusterId = clusterIndex.GetClusterId();
336 auto index = clusterIndex.GetIndex();
337 auto columnId = columnHandle.fId;
338 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
// NOTE(review): the statement guarded by this condition is not visible;
// presumably it returns cachedPage — confirm.
339 if (!cachedPage.IsNull())
342 R__ASSERT(clusterId != kInvalidDescriptorId);
343 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
344 return PopulatePageFromCluster(columnHandle, clusterDescriptor, index);
// Hands a page back to the page pool, where PopulatePageFromCluster
// registers pages.
347 void ROOT::Experimental::Detail::RPageSourceRaw::ReleasePage(RPage &page)
349 fPagePool->ReturnPage(page);
// Creates a new RPageSourceRaw with the same ntuple name and options as this
// one, and gives it a clone of this source's file object.
352 std::unique_ptr<ROOT::Experimental::Detail::RPageSource> ROOT::Experimental::Detail::RPageSourceRaw::Clone()
const
// Built with the file-less constructor; fFile is assigned afterwards.
354 auto clone =
new RPageSourceRaw(fNTupleName, fOptions);
355 clone->fFile = fFile->Clone();
// Ownership of the raw pointer passes to the returned unique_ptr.
356 return std::unique_ptr<RPageSourceRaw>(clone);