Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
RColumn.hxx
Go to the documentation of this file.
1 /// \file ROOT/RColumn.hxx
2 /// \ingroup NTuple ROOT7
3 /// \author Jakob Blomer <jblomer@cern.ch>
4 /// \date 2018-10-09
5 /// \warning This is part of the ROOT 7 prototype! It will change without notice. It might trigger earthquakes. Feedback
6 /// is welcome!
7 
8 /*************************************************************************
9  * Copyright (C) 1995-2019, Rene Brun and Fons Rademakers. *
10  * All rights reserved. *
11  * *
12  * For the licensing terms see $ROOTSYS/LICENSE. *
13  * For the list of contributors see $ROOTSYS/README/CREDITS. *
14  *************************************************************************/
15 
16 #ifndef ROOT7_RColumn
17 #define ROOT7_RColumn
18 
19 #include <ROOT/RColumnElement.hxx>
20 #include <ROOT/RColumnModel.hxx>
21 #include <ROOT/RNTupleUtil.hxx>
22 #include <ROOT/RPage.hxx>
23 #include <ROOT/RPageStorage.hxx>
24 
25 #include <TError.h>
26 
27 #include <memory>
28 #include <vector>
29 
30 namespace ROOT {
31 namespace Experimental {
32 namespace Detail {
33 
34 // clang-format off
35 /**
36 \class ROOT::Experimental::RColumn
37 \ingroup NTuple
38 \brief A column is a storage-backed array of a simple, fixed-size type, from which pages can be mapped into memory.
39 
40 On the primitives data layer, the RColumn and RColumnElement are the equivalents to RField and RTreeValue on the
41 logical data layer.
42 */
43 // clang-format on
44 class RColumn {
45 private:
46  RColumnModel fModel;
47  /**
48  * Columns belonging to the same field are distinguished by their order. E.g. for an std::string field, there is
49  * the offset column with index 0 and the character value column with index 1.
50  */
51  std::uint32_t fIndex;
52  RPageSink *fPageSink;
53  RPageSource *fPageSource;
54  RPageStorage::ColumnHandle_t fHandleSink;
55  RPageStorage::ColumnHandle_t fHandleSource;
56  /// Open page into which new elements are being written
57  RPage fHeadPage;
58  /// The number of elements written resp. available in the column
59  NTupleSize_t fNElements;
60  /// The currently mapped page for reading
61  RPage fCurrentPage;
62  /// The column id is used to find matching pages with content when reading
63  ColumnId_t fColumnIdSource;
64  /// Used to pack and unpack pages on writing/reading
65  std::unique_ptr<RColumnElementBase> fElement;
66 
67  RColumn(const RColumnModel &model, std::uint32_t index);
68 
69 public:
70  template <typename CppT, EColumnType ColumnT>
71  static RColumn *Create(const RColumnModel &model, std::uint32_t index) {
72  R__ASSERT(model.GetType() == ColumnT);
73  auto column = new RColumn(model, index);
74  column->fElement = std::unique_ptr<RColumnElementBase>(new RColumnElement<CppT, ColumnT>(nullptr));
75  return column;
76  }
77 
78  RColumn(const RColumn&) = delete;
79  RColumn &operator =(const RColumn&) = delete;
80  ~RColumn();
81 
82  void Connect(DescriptorId_t fieldId, RPageStorage *pageStorage);
83 
84  void Append(const RColumnElementBase &element) {
85  void *dst = fHeadPage.TryGrow(1);
86  if (dst == nullptr) {
87  Flush();
88  dst = fHeadPage.TryGrow(1);
89  R__ASSERT(dst != nullptr);
90  }
91  element.WriteTo(dst, 1);
92  fNElements++;
93  }
94 
95  void AppendV(const RColumnElementBase &elemArray, std::size_t count) {
96  void *dst = fHeadPage.TryGrow(count);
97  if (dst == nullptr) {
98  for (unsigned i = 0; i < count; ++i) {
99  Append(RColumnElementBase(elemArray, i));
100  }
101  return;
102  }
103  elemArray.WriteTo(dst, count);
104  fNElements += count;
105  }
106 
107  void Read(const NTupleSize_t globalIndex, RColumnElementBase *element) {
108  if (!fCurrentPage.Contains(globalIndex)) {
109  MapPage(globalIndex);
110  }
111  void *src = static_cast<unsigned char *>(fCurrentPage.GetBuffer()) +
112  (globalIndex - fCurrentPage.GetGlobalRangeFirst()) * element->GetSize();
113  element->ReadFrom(src, 1);
114  }
115 
116  void Read(const RClusterIndex &clusterIndex, RColumnElementBase *element) {
117  if (!fCurrentPage.Contains(clusterIndex)) {
118  MapPage(clusterIndex);
119  }
120  void *src = static_cast<unsigned char *>(fCurrentPage.GetBuffer()) +
121  (clusterIndex.GetIndex() - fCurrentPage.GetClusterRangeFirst()) * element->GetSize();
122  element->ReadFrom(src, 1);
123  }
124 
125  void ReadV(const NTupleSize_t globalIndex, const ClusterSize_t::ValueType count, RColumnElementBase *elemArray) {
126  if (!fCurrentPage.Contains(globalIndex)) {
127  MapPage(globalIndex);
128  }
129  NTupleSize_t idxInPage = globalIndex - fCurrentPage.GetGlobalRangeFirst();
130 
131  void *src = static_cast<unsigned char *>(fCurrentPage.GetBuffer()) + idxInPage * elemArray->GetSize();
132  if (globalIndex + count <= fCurrentPage.GetGlobalRangeLast() + 1) {
133  elemArray->ReadFrom(src, count);
134  } else {
135  ClusterSize_t::ValueType nBatch = fCurrentPage.GetNElements() - idxInPage;
136  elemArray->ReadFrom(src, nBatch);
137  RColumnElementBase elemTail(*elemArray, nBatch);
138  ReadV(globalIndex + nBatch, count - nBatch, &elemTail);
139  }
140  }
141 
142  void ReadV(const RClusterIndex &clusterIndex, const ClusterSize_t::ValueType count, RColumnElementBase *elemArray)
143  {
144  if (!fCurrentPage.Contains(clusterIndex)) {
145  MapPage(clusterIndex);
146  }
147  NTupleSize_t idxInPage = clusterIndex.GetIndex() - fCurrentPage.GetClusterRangeFirst();
148 
149  void* src = static_cast<unsigned char *>(fCurrentPage.GetBuffer()) + idxInPage * elemArray->GetSize();
150  if (clusterIndex.GetIndex() + count <= fCurrentPage.GetClusterRangeLast() + 1) {
151  elemArray->ReadFrom(src, count);
152  } else {
153  ClusterSize_t::ValueType nBatch = fCurrentPage.GetNElements() - idxInPage;
154  elemArray->ReadFrom(src, nBatch);
155  RColumnElementBase elemTail(*elemArray, nBatch);
156  ReadV(RClusterIndex(clusterIndex.GetClusterId(), clusterIndex.GetIndex() + nBatch), count - nBatch, &elemTail);
157  }
158  }
159 
160  template <typename CppT, EColumnType ColumnT>
161  CppT *Map(const NTupleSize_t globalIndex) {
162  if (!fCurrentPage.Contains(globalIndex)) {
163  MapPage(globalIndex);
164  }
165  return reinterpret_cast<CppT*>(
166  static_cast<unsigned char *>(fCurrentPage.GetBuffer()) +
167  (globalIndex - fCurrentPage.GetGlobalRangeFirst()) * RColumnElement<CppT, ColumnT>::kSize);
168  }
169 
170  template <typename CppT, EColumnType ColumnT>
171  CppT *Map(const RClusterIndex &clusterIndex) {
172  if (!fCurrentPage.Contains(clusterIndex)) {
173  MapPage(clusterIndex);
174  }
175  return reinterpret_cast<CppT*>(
176  static_cast<unsigned char *>(fCurrentPage.GetBuffer()) +
177  (clusterIndex.GetIndex() - fCurrentPage.GetClusterRangeFirst()) * RColumnElement<CppT, ColumnT>::kSize);
178  }
179 
180  NTupleSize_t GetGlobalIndex(const RClusterIndex &clusterIndex) {
181  if (!fCurrentPage.Contains(clusterIndex)) {
182  MapPage(clusterIndex);
183  }
184  return fCurrentPage.GetClusterInfo().GetIndexOffset() + clusterIndex.GetIndex();
185  }
186 
187  RClusterIndex GetClusterIndex(NTupleSize_t globalIndex) {
188  if (!fCurrentPage.Contains(globalIndex)) {
189  MapPage(globalIndex);
190  }
191  return RClusterIndex(fCurrentPage.GetClusterInfo().GetId(),
192  globalIndex - fCurrentPage.GetClusterInfo().GetIndexOffset());
193  }
194 
195  /// For offset columns only, look at the two adjacent values that define a collection's coordinates
196  void GetCollectionInfo(const NTupleSize_t globalIndex, RClusterIndex *collectionStart, ClusterSize_t *collectionSize)
197  {
198  auto idxStart = (globalIndex == 0) ? 0 : *Map<ClusterSize_t, EColumnType::kIndex>(globalIndex - 1);
199  auto idxEnd = *Map<ClusterSize_t, EColumnType::kIndex>(globalIndex);
200  auto selfOffset = fCurrentPage.GetClusterInfo().GetIndexOffset();
201  if (globalIndex == selfOffset) {
202  // Passed cluster boundary
203  idxStart = 0;
204  }
205  *collectionSize = idxEnd - idxStart;
206  *collectionStart = RClusterIndex(fCurrentPage.GetClusterInfo().GetId(), idxStart);
207  }
208 
209  void GetCollectionInfo(const RClusterIndex &clusterIndex,
210  RClusterIndex *collectionStart, ClusterSize_t *collectionSize)
211  {
212  auto index = clusterIndex.GetIndex();
213  auto idxStart = (index == 0) ? 0 : *Map<ClusterSize_t, EColumnType::kIndex>(clusterIndex - 1);
214  auto idxEnd = *Map<ClusterSize_t, EColumnType::kIndex>(clusterIndex);
215  *collectionSize = idxEnd - idxStart;
216  *collectionStart = RClusterIndex(clusterIndex.GetClusterId(), idxStart);
217  }
218 
219  /// Get the currently active cluster id
220  void GetSwitchInfo(NTupleSize_t globalIndex, RClusterIndex *varIndex, std::uint32_t *tag) {
221  auto varSwitch = Map<RColumnSwitch, EColumnType::kSwitch>(globalIndex);
222  *varIndex = RClusterIndex(fCurrentPage.GetClusterInfo().GetId(), varSwitch->GetIndex());
223  *tag = varSwitch->GetTag();
224  }
225 
226  void Flush();
227  void MapPage(const NTupleSize_t index);
228  void MapPage(const RClusterIndex &clusterIndex);
229  NTupleSize_t GetNElements() const { return fNElements; }
230  RColumnElementBase *GetElement() const { return fElement.get(); }
231  const RColumnModel &GetModel() const { return fModel; }
232  std::uint32_t GetIndex() const { return fIndex; }
233  ColumnId_t GetColumnIdSource() const { return fColumnIdSource; }
234  RPageSource *GetPageSource() const { return fPageSource; }
235  RPageStorage::ColumnHandle_t GetHandleSource() const { return fHandleSource; }
236  RPageStorage::ColumnHandle_t GetHandleSink() const { return fHandleSink; }
237  RNTupleVersion GetVersion() const { return RNTupleVersion(); }
238 };
239 
240 } // namespace Detail
241 
242 } // namespace Experimental
243 } // namespace ROOT
244 
245 #endif