/// Key-name building blocks for the blobs written to / read from the ntuple's TDirectory.
/// NOTE(review): this excerpt looks like a source-browser listing; the leading numbers
/// ("33", "34", ...) appear to be original line numbers and some original lines are absent.
// Separator between the cluster id and the page index in page-payload key names.
33 static constexpr
const char* kKeySeparator =
"_";
// Key of the serialized descriptor footer blob (written by the sink, read by the source).
34 static constexpr
const char* kKeyNTupleFooter =
"NTPLF";
// Key of the serialized descriptor header blob (written by the sink, read by the source).
35 static constexpr
const char* kKeyNTupleHeader =
"NTPLH";
// Prefix of page-payload keys; full key is prefix + clusterId + "_" + pageIndex, e.g. "NTPLP0_3".
36 static constexpr
const char* kKeyPagePayload =
"NTPLP";
/// Creates a page sink that writes the ntuple `ntupleName` into the ROOT file at `path`.
/// The file is opened via TFile::Open with the "RECREATE" option, and the compression
/// setting from fOptions (presumably the write options stored by the RPageSink base) is applied.
/// NOTE(review): listing lines 45 and 50-51 (presumably the braces) are absent from this excerpt.
40 ROOT::Experimental::Detail::RPageSinkRoot::RPageSinkRoot(std::string_view ntupleName, std::string_view path,
41 const RNTupleWriteOptions &options)
42 : RPageSink(ntupleName, options)
43 , fMetrics(
"RPageSinkRoot")
// Pages handed out by ReservePage() come from this heap-backed allocator.
44 , fPageAllocator(std::make_unique<RPageAllocatorHeap>())
46 R__WARNING_HERE(
"NTuple") <<
"The RNTuple file format will change. " <<
47 "Do not store real data with this version of RNTuple!";
48 fFile = std::unique_ptr<TFile>(TFile::Open(std::string(path).c_str(),
"RECREATE"));
// NOTE(review): fFile is dereferenced without a null check; if TFile::Open can fail by returning
// nullptr (e.g. unwritable path), this crashes — confirm and consider checking/reporting.
49 fFile->SetCompressionSettings(fOptions.GetCompression());
/// Destructor of the page sink.
/// NOTE(review): its body (listing lines 53-57) is not part of this excerpt, so whether it closes
/// fFile or flushes pending data cannot be confirmed from here.
52 ROOT::Experimental::Detail::RPageSinkRoot::~RPageSinkRoot()
/// Creates the ntuple's sub-directory (named after fNTupleName) in the output file and writes the
/// serialized descriptor header there as an RNTupleBlob under the key kKeyNTupleHeader.
/// The RNTupleModel argument is unnamed and unused in the visible lines.
58 void ROOT::Experimental::Detail::RPageSinkRoot::DoCreate(
const RNTupleModel & )
60 fDirectory = fFile->mkdir(fNTupleName.c_str());
62 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
// Two calls: the nullptr call is used to obtain the byte count that sizes the buffer below.
63 auto szHeader = descriptor.SerializeHeader(
nullptr);
// NOTE(review): this new[] allocation is not released in the visible lines (listing lines 68-70
// are absent) — confirm a matching delete[] exists, or hold it in std::unique_ptr<unsigned char[]>.
64 auto buffer =
new unsigned char[szHeader];
65 descriptor.SerializeHeader(buffer);
66 ROOT::Experimental::Internal::RNTupleBlob blob(szHeader, buffer);
67 fDirectory->WriteObject(&blob, kKeyNTupleHeader);
/// Writes one page of `columnHandle`'s column to the ntuple directory and fills in its locator.
/// By default the page buffer is written as-is (page.GetSize() bytes). On the packing path, whose
/// guarding condition is not visible here, the elements are first packed into a freshly allocated
/// buffer of ceil(nElements * bitsOnStorage / 8) bytes.
/// The payload key is kKeyPagePayload + fLastClusterId + kKeySeparator + fLastPageIdx.
/// In the locator, fPosition is the page index used in that key (fLastPageIdx is then
/// post-incremented) and fBytesOnStorage is the number of bytes written.
/// NOTE(review): the return statement (listing line 98) is absent from this excerpt.
71 ROOT::Experimental::RClusterDescriptor::RLocator
72 ROOT::Experimental::Detail::RPageSinkRoot::DoCommitPage(ColumnHandle_t columnHandle,
const RPage &page)
// Default: write directly from the in-memory page buffer.
74 unsigned char *buffer =
reinterpret_cast<unsigned char *
>(page.GetBuffer());
75 auto packedBytes = page.GetSize();
76 auto element = columnHandle.fColumn->GetElement();
77 const auto isMappable = element->IsMappable();
// NOTE(review): the guarding condition (listing lines 78-79) is absent from this excerpt;
// presumably `if (!isMappable) {`, since isMappable is otherwise unused in the visible lines.
// (bits + 7) / 8 rounds the total bit count up to whole bytes.
80 packedBytes = (page.GetNElements() * element->GetBitsOnStorage() + 7) / 8;
// NOTE(review): not released in the visible lines (listing lines 90-94 absent) — confirm a matching
// delete[] after WriteObject, otherwise every packed page leaks.
81 buffer =
new unsigned char[packedBytes];
82 element->Pack(buffer, page.GetBuffer(), page.GetNElements());
85 ROOT::Experimental::Internal::RNTupleBlob pagePayload(packedBytes, buffer);
86 std::string keyName = std::string(kKeyPagePayload) +
87 std::to_string(fLastClusterId) + kKeySeparator +
88 std::to_string(fLastPageIdx);
89 fDirectory->WriteObject(&pagePayload, keyName.c_str());
95 RClusterDescriptor::RLocator result;
96 result.fPosition = fLastPageIdx++;
97 result.fBytesOnStorage = packedBytes;
/// Cluster-commit hook. The NTupleSize_t argument is unnamed and unused in the visible lines, and the
/// visible return value is a default-constructed RLocator.
/// NOTE(review): listing lines 103-104 are absent, so any per-cluster bookkeeping (e.g. of
/// fLastClusterId / fLastPageIdx, which DoCommitPage puts into page key names) cannot be confirmed here.
101 ROOT::Experimental::RClusterDescriptor::RLocator
102 ROOT::Experimental::Detail::RPageSinkRoot::DoCommitCluster(ROOT::Experimental::NTupleSize_t )
105 return RClusterDescriptor::RLocator();
/// Serializes the descriptor footer and writes it as an RNTupleBlob under kKeyNTupleFooter in the
/// ntuple directory.
/// NOTE(review): listing lines 109-112 and 119+ are absent from this excerpt.
108 void ROOT::Experimental::Detail::RPageSinkRoot::DoCommitDataset()
113 const auto &descriptor = fDescriptorBuilder.GetDescriptor();
// The nullptr call is used to obtain the byte count that sizes the buffer below.
114 auto szFooter = descriptor.SerializeFooter(
nullptr);
// NOTE(review): not released in the visible lines — confirm a matching delete[] follows, or use
// std::unique_ptr<unsigned char[]>.
115 auto buffer =
new unsigned char[szFooter];
116 descriptor.SerializeFooter(buffer);
117 ROOT::Experimental::Internal::RNTupleBlob footerBlob(szFooter, buffer);
118 fDirectory->WriteObject(&footerBlob, kKeyNTupleFooter);
/// Allocates a page for `columnHandle`'s column from the sink's page allocator. The page holds
/// `nElements` elements, each of the column element's size.
/// NOTE(review): the condition guarding the fallback to kDefaultElementsPerPage (listing lines
/// 124-125) is absent; presumably it applies only when nElements == 0 — confirm.
122 ROOT::Experimental::Detail::RPage
123 ROOT::Experimental::Detail::RPageSinkRoot::ReservePage(ColumnHandle_t columnHandle, std::size_t nElements)
126 nElements = kDefaultElementsPerPage;
127 auto elementSize = columnHandle.fColumn->GetElement()->GetSize();
128 return fPageAllocator->NewPage(columnHandle.fId, elementSize, nElements);
/// Hands `page` to the sink's page allocator (the one ReservePage() draws from) for deletion.
131 void ROOT::Experimental::Detail::RPageSinkRoot::ReleasePage(RPage &page)
133 fPageAllocator->DeletePage(page);
/// Wraps caller-provided memory `mem` in an RPage for column `columnId`; no memory is allocated in
/// the visible lines. The RPage is constructed with elementSize * nElements as its byte size and
/// elementSize as its element size.
/// NOTE(review): the return statement (listing line 145) is absent from this excerpt.
140 ROOT::Experimental::Detail::RPage ROOT::Experimental::Detail::RPageAllocatorKey::NewPage(
141 ColumnId_t columnId,
void *mem, std::size_t elementSize, std::size_t nElements)
143 RPage newPage(columnId, mem, elementSize * nElements, elementSize);
// Presumably marks all nElements slots as occupied; TryGrow's return value is ignored here.
144 newPage.TryGrow(nElements);
/// Releases the memory behind a page whose buffer is the content of `payload`.
/// Asserts that the page's buffer is payload->fContent, then free()s that content. The content
/// must therefore come from malloc-family allocation, as the unpack path in
/// PopulatePageFromCluster provides; presumably the same holds for content produced by
/// ReadObject — confirm.
/// NOTE(review): listing lines 150-152 and 155+ are absent; whether `payload` itself is deleted
/// cannot be confirmed from this excerpt.
148 void ROOT::Experimental::Detail::RPageAllocatorKey::DeletePage(
149 const RPage& page, ROOT::Experimental::Internal::RNTupleBlob *payload)
153 R__ASSERT(page.GetBuffer() == payload->fContent);
154 free(payload->fContent);
/// Creates a page source that reads the ntuple `ntupleName` from the ROOT file at `path`, opened
/// via TFile::Open with the "READ" option. Pages are created by an RPageAllocatorKey and cached in
/// a shared RPagePool.
/// NOTE(review): fFile is not checked for null in the visible lines (listing lines 170-172 absent),
/// and later code such as DoAttach dereferences it directly.
162 ROOT::Experimental::Detail::RPageSourceRoot::RPageSourceRoot(std::string_view ntupleName, std::string_view path,
163 const RNTupleReadOptions &options)
164 : RPageSource(ntupleName, options)
165 , fMetrics(
"RPageSourceRoot")
166 , fPageAllocator(std::make_unique<RPageAllocatorKey>())
167 , fPagePool(std::make_shared<RPagePool>())
169 fFile = std::unique_ptr<TFile>(TFile::Open(std::string(path).c_str(),
"READ"));
/// Destructor of the page source.
/// NOTE(review): its body (listing lines 174-178) is not part of this excerpt.
173 ROOT::Experimental::Detail::RPageSourceRoot::~RPageSourceRoot()
/// Locates the ntuple's directory (named after fNTupleName) and builds the descriptor from two
/// blobs stored there: the header under kKeyNTupleHeader and the footer under kKeyNTupleFooter.
/// After use, each blob's content is released with free() and the blob object with delete.
/// NOTE(review): the results of GetDirectory, GetKey and ReadObject are dereferenced without null
/// checks, so a missing ntuple name or key would crash rather than report an error — confirm whether
/// callers guarantee their presence.
180 ROOT::Experimental::RNTupleDescriptor ROOT::Experimental::Detail::RPageSourceRoot::DoAttach()
182 fDirectory = fFile->GetDirectory(fNTupleName.c_str());
183 RNTupleDescriptorBuilder descBuilder;
// Header blob: initializes the builder.
185 auto keyRawNTupleHeader = fDirectory->GetKey(kKeyNTupleHeader);
186 auto ntupleRawHeader = keyRawNTupleHeader->ReadObject<ROOT::Experimental::Internal::RNTupleBlob>();
187 descBuilder.SetFromHeader(ntupleRawHeader->fContent);
188 free(ntupleRawHeader->fContent);
189 delete ntupleRawHeader;
// Footer blob: adds the cluster information to the builder.
191 auto keyRawNTupleFooter = fDirectory->GetKey(kKeyNTupleFooter);
192 auto ntupleRawFooter = keyRawNTupleFooter->ReadObject<ROOT::Experimental::Internal::RNTupleBlob>();
193 descBuilder.AddClustersFromFooter(ntupleRawFooter->fContent);
194 free(ntupleRawFooter->fContent);
195 delete ntupleRawFooter;
197 return descBuilder.MoveDescriptor();
/// Loads the page of `columnHandle`'s column that contains cluster-local element `clusterIndex` of
/// cluster `clusterDescriptor`, and registers it with the page pool. Presumably it then returns the
/// page; listing lines 249-252 are absent.
/// The page is found by a linear scan over the column's page range that accumulates element counts.
/// For non-mappable elements the payload is unpacked into a malloc'd buffer, which replaces the
/// blob's content.
201 ROOT::Experimental::Detail::RPage ROOT::Experimental::Detail::RPageSourceRoot::PopulatePageFromCluster(
202 ColumnHandle_t columnHandle,
const RClusterDescriptor &clusterDescriptor, ClusterSize_t::ValueType clusterIndex)
204 auto columnId = columnHandle.fId;
205 auto clusterId = clusterDescriptor.GetId();
206 const auto &pageRange = clusterDescriptor.GetPageRange(columnId);
// firstInPage tracks the cluster-local index of the first element of the page being scanned.
// NOTE(review): the loop's match branch (listing lines 213-215) is absent; presumably it sets
// pageInfo = pi and breaks, which the asserts below rely on.
209 RClusterDescriptor::RPageRange::RPageInfo pageInfo;
210 decltype(clusterIndex) firstInPage = 0;
211 for (const auto &pi : pageRange.fPageInfos) {
212 if (firstInPage + pi.fNElements > clusterIndex) {
216 firstInPage += pi.fNElements;
// The selected page must cover clusterIndex: firstInPage <= clusterIndex < firstInPage + fNElements.
218 R__ASSERT(firstInPage <= clusterIndex);
219 R__ASSERT((firstInPage + pageInfo.fNElements) > clusterIndex);
// Same key scheme as the writer: kKeyPagePayload + clusterId + kKeySeparator + page position.
223 std::string keyName = std::string(kKeyPagePayload) +
224 std::to_string(clusterId) + kKeySeparator +
225 std::to_string(pageInfo.fLocator.fPosition);
// NOTE(review): GetKey/ReadObject results are dereferenced without null checks.
226 auto pageKey = fDirectory->GetKey(keyName.c_str());
227 auto pagePayload = pageKey->ReadObject<ROOT::Experimental::Internal::RNTupleBlob>();
229 unsigned char *buffer = pagePayload->fContent;
230 auto element = columnHandle.fColumn->GetElement();
231 auto elementSize = element->GetSize();
232 if (!element->IsMappable()) {
// Unpack into elementSize bytes per element. malloc matches the free() in
// RPageAllocatorKey::DeletePage.
233 auto pageSize = elementSize * pageInfo.fNElements;
234 buffer =
reinterpret_cast<unsigned char *
>(malloc(pageSize));
235 R__ASSERT(buffer !=
nullptr);
236 element->Unpack(buffer, pagePayload->fContent, pageInfo.fNElements);
// The unpacked buffer replaces the packed content as the blob's payload.
237 free(pagePayload->fContent);
238 pagePayload->fContent = buffer;
239 pagePayload->fSize = pageSize;
// The page window starts at the column's first element index in this cluster plus the page offset.
242 auto indexOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
243 auto newPage = fPageAllocator->NewPage(columnId, pagePayload->fContent, elementSize, pageInfo.fNElements);
244 newPage.SetWindow(indexOffset + firstInPage, RPage::RClusterInfo(clusterId, indexOffset));
// The deleter reinterprets userData as the RNTupleBlob* and releases it via RPageAllocatorKey::DeletePage.
// NOTE(review): the userData argument and the return (listing lines 249-252) are absent; presumably
// pagePayload is passed as userData.
245 fPagePool->RegisterPage(newPage,
246 RPageDeleter([](
const RPage &page,
void *userData)
248 RPageAllocatorKey::DeletePage(page, reinterpret_cast<ROOT::Experimental::Internal::RNTupleBlob *>(userData));
/// Returns the page of `columnHandle`'s column that contains global element `globalIndex`.
/// The page pool is consulted first. On a miss, the owning cluster is looked up in fDescriptor, the
/// index is converted to a cluster-local one, and PopulatePageFromCluster loads the page.
/// NOTE(review): listing lines 260-261 (presumably `return cachedPage;`) are absent from this excerpt.
254 ROOT::Experimental::Detail::RPage ROOT::Experimental::Detail::RPageSourceRoot::PopulatePage(
255 ColumnHandle_t columnHandle, NTupleSize_t globalIndex)
257 auto columnId = columnHandle.fId;
258 auto cachedPage = fPagePool->GetPage(columnId, globalIndex);
259 if (!cachedPage.IsNull())
262 auto clusterId = fDescriptor.FindClusterId(columnId, globalIndex);
263 R__ASSERT(clusterId != kInvalidDescriptorId);
264 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
// Cluster-local index = global index minus the column's first element index in that cluster.
265 auto selfOffset = clusterDescriptor.GetColumnRange(columnId).fFirstElementIndex;
266 R__ASSERT(selfOffset <= globalIndex);
267 return PopulatePageFromCluster(columnHandle, clusterDescriptor, globalIndex - selfOffset);
/// Returns the page of `columnHandle`'s column that contains the element addressed by `clusterIndex`
/// (a cluster id plus an index within that cluster). The page pool is consulted first; otherwise the
/// page is loaded via PopulatePageFromCluster using the cluster-local index directly.
/// NOTE(review): listing lines 279-280 (presumably `return cachedPage;`) are absent from this excerpt.
271 ROOT::Experimental::Detail::RPage ROOT::Experimental::Detail::RPageSourceRoot::PopulatePage(
272 ColumnHandle_t columnHandle,
const RClusterIndex &clusterIndex)
274 auto clusterId = clusterIndex.GetClusterId();
275 auto index = clusterIndex.GetIndex();
276 auto columnId = columnHandle.fId;
277 auto cachedPage = fPagePool->GetPage(columnId, clusterIndex);
278 if (!cachedPage.IsNull())
281 R__ASSERT(clusterId != kInvalidDescriptorId);
282 const auto &clusterDescriptor = fDescriptor.GetClusterDescriptor(clusterId);
283 return PopulatePageFromCluster(columnHandle, clusterDescriptor, index);
/// Returns `page` to the page pool (rather than deleting it directly).
286 void ROOT::Experimental::Detail::RPageSourceRoot::ReleasePage(RPage &page)
288 fPagePool->ReturnPage(page);
/// Creates a new RPageSourceRoot for the same ntuple, using the same read options. The file is
/// reopened by its name as reported by fFile->GetName().
291 std::unique_ptr<ROOT::Experimental::Detail::RPageSource> ROOT::Experimental::Detail::RPageSourceRoot::Clone()
const
293 return std::make_unique<RPageSourceRoot>(fNTupleName, fFile->GetName(), fOptions);