Logo ROOT   6.30.04
Reference Guide
 All Namespaces Files Pages
RCsvDS.cxx
Go to the documentation of this file.
1 // Author: Enric Tejedor CERN 10/2017
2 
3 /*************************************************************************
4  * Copyright (C) 1995-2017, Rene Brun and Fons Rademakers. *
5  * All rights reserved. *
6  * *
7  * For the licensing terms see $ROOTSYS/LICENSE. *
8  * For the list of contributors see $ROOTSYS/README/CREDITS. *
9  *************************************************************************/
10 
11 // clang-format off
12 /** \class ROOT::RDF::RCsvDS
13  \ingroup dataframe
14  \brief RDataFrame data source class for reading CSV files.
15 
16 The RCsvDS class implements a CSV file reader for RDataFrame.
17 
18 An RDataFrame that reads from a CSV file can be constructed using the factory method
19 ROOT::RDF::MakeCsvDataFrame, which accepts three parameters:
20 1. Path to the CSV file.
21 2. Boolean that specifies whether the first row of the CSV file contains headers or
22 not (optional, default `true`). If `false`, header names will be automatically generated as Col0, Col1, ..., ColN.
23 3. Delimiter (optional, default ',').
24 
25 The types of the columns in the CSV file are automatically inferred. The supported
26 types are:
27 - Integer: stored as a 64-bit long long int.
28 - Floating point number: stored with double precision.
29 - Boolean: matches the literals `true` and `false`.
30 - String: stored as an std::string, matches anything that does not fall into any of the
31 previous types.
32 
33 These are some formatting rules expected by the RCsvDS implementation:
34 - All records must have the same number of fields, in the same order.
35 - Any field may be quoted.
36 ~~~
37  "1997","Ford","E350"
38 ~~~
39 - Fields with embedded delimiters (e.g. comma) must be quoted.
40 ~~~
41  1997,Ford,E350,"Super, luxurious truck"
42 ~~~
43 - Fields with double-quote characters must be quoted, and each of the embedded
44 double-quote characters must be represented by a pair of double-quote characters.
45 ~~~
46  1997,Ford,E350,"Super, ""luxurious"" truck"
47 ~~~
48 - Fields with embedded line breaks are not supported, even when quoted.
49 ~~~
50  1997,Ford,E350,"Go get one now
51  they are going fast"
52 ~~~
53 - Spaces are considered part of a field and are not ignored.
54 ~~~
55  1997, Ford , E350
56  not same as
57  1997,Ford,E350
58  but same as
59  1997, "Ford" , E350
60 ~~~
61 - If a header row is provided, it must contain column names for each of the fields.
62 ~~~
63  Year,Make,Model
64  1997,Ford,E350
65  2000,Mercury,Cougar
66 ~~~
67 
68 When the lines chunk size is -1, RCsvDS reads the entire CSV file content into memory before
69 RDataFrame starts processing it; otherwise the file is read in chunks of that many lines. Before
70 creating a CSV RDataFrame, check both how much memory is available and the size of the CSV file.
71 */
72 // clang-format on
73 
74 #include <ROOT/RDF/Utils.hxx>
75 #include <ROOT/TSeq.hxx>
76 #include <ROOT/RCsvDS.hxx>
77 #include <ROOT/RMakeUnique.hxx>
78 #include <TError.h>
79 
80 #include <algorithm>
81 #include <iostream>
82 #include <sstream>
83 #include <string>
84 
85 namespace ROOT {
86 
87 namespace RDF {
88 
89 std::string RCsvDS::AsString()
90 {
91  return "CSV data source";
92 }
93 
94 // Regular expressions for type inference
95 TRegexp RCsvDS::intRegex("^[-+]?[0-9]+$");
96 TRegexp RCsvDS::doubleRegex1("^[-+]?[0-9]+\\.[0-9]*$");
97 TRegexp RCsvDS::doubleRegex2("^[-+]?[0-9]*\\.[0-9]+$");
98 TRegexp RCsvDS::doubleRegex3("^[-+]?[0-9]*\\.[0-9]+[eEdDqQ][-+]?[0-9]+$");
99 TRegexp RCsvDS::trueRegex("^true$");
100 TRegexp RCsvDS::falseRegex("^false$");
101 
102 const std::map<RCsvDS::ColType_t, std::string>
103  RCsvDS::fgColTypeMap({{'b', "bool"}, {'d', "double"}, {'l', "Long64_t"}, {'s', "std::string"}});
104 
105 void RCsvDS::FillHeaders(const std::string &line)
106 {
107  auto columns = ParseColumns(line);
108  for (auto &col : columns) {
109  fHeaders.emplace_back(col);
110  }
111 }
112 
113 void RCsvDS::FillRecord(const std::string &line, Record_t &record)
114 {
115  std::istringstream lineStream(line);
116  auto i = 0U;
117 
118  auto columns = ParseColumns(line);
119 
120  for (auto &col : columns) {
121  auto colType = fColTypes[fHeaders[i]];
122 
123  switch (colType) {
124  case 'd': {
125  record.emplace_back(new double(std::stod(col)));
126  break;
127  }
128  case 'l': {
129  record.emplace_back(new Long64_t(std::stoll(col)));
130  break;
131  }
132  case 'b': {
133  auto b = new bool();
134  record.emplace_back(b);
135  std::istringstream is(col);
136  is >> std::boolalpha >> *b;
137  break;
138  }
139  case 's': {
140  record.emplace_back(new std::string(col));
141  break;
142  }
143  }
144  ++i;
145  }
146 }
147 
148 void RCsvDS::GenerateHeaders(size_t size)
149 {
150  for (size_t i = 0; i < size; ++i) {
151  fHeaders.push_back("Col" + std::to_string(i));
152  }
153 }
154 
155 std::vector<void *> RCsvDS::GetColumnReadersImpl(std::string_view colName, const std::type_info &ti)
156 {
157  const auto colType = GetType(colName);
158 
159  if ((colType == 'd' && typeid(double) != ti) || (colType == 'l' && typeid(Long64_t) != ti) ||
160  (colType == 's' && typeid(std::string) != ti) || (colType == 'b' && typeid(bool) != ti)) {
161  std::string err = "The type selected for column \"";
162  err += colName;
163  err += "\" does not correspond to column type, which is ";
164  err += fgColTypeMap.at(colType);
165  throw std::runtime_error(err);
166  }
167 
168  const auto &colNames = GetColumnNames();
169  const auto index = std::distance(colNames.begin(), std::find(colNames.begin(), colNames.end(), colName));
170  std::vector<void *> ret(fNSlots);
171  for (auto slot : ROOT::TSeqU(fNSlots)) {
172  auto &val = fColAddresses[index][slot];
173  if (ti == typeid(double)) {
174  val = &fDoubleEvtValues[index][slot];
175  } else if (ti == typeid(Long64_t)) {
176  val = &fLong64EvtValues[index][slot];
177  } else if (ti == typeid(std::string)) {
178  val = &fStringEvtValues[index][slot];
179  } else {
180  val = &fBoolEvtValues[index][slot];
181  }
182  ret[slot] = &val;
183  }
184  return ret;
185 }
186 
187 void RCsvDS::InferColTypes(std::vector<std::string> &columns)
188 {
189  auto i = 0U;
190  for (auto &col : columns) {
191  InferType(col, i);
192  ++i;
193  }
194 }
195 
196 void RCsvDS::InferType(const std::string &col, unsigned int idxCol)
197 {
198  ColType_t type;
199  int dummy;
200 
201  if (intRegex.Index(col, &dummy) != -1) {
202  type = 'l'; // Long64_t
203  } else if (doubleRegex1.Index(col, &dummy) != -1 ||
204  doubleRegex2.Index(col, &dummy) != -1 ||
205  doubleRegex3.Index(col, &dummy) != -1) {
206  type = 'd'; // double
207  } else if (trueRegex.Index(col, &dummy) != -1 || falseRegex.Index(col, &dummy) != -1) {
208  type = 'b'; // bool
209  } else { // everything else is a string
210  type = 's'; // std::string
211  }
212  // TODO: Date
213 
214  fColTypes[fHeaders[idxCol]] = type;
215  fColTypesList.push_back(type);
216 }
217 
218 std::vector<std::string> RCsvDS::ParseColumns(const std::string &line)
219 {
220  std::vector<std::string> columns;
221 
222  for (size_t i = 0; i < line.size(); ++i) {
223  i = ParseValue(line, columns, i);
224  }
225 
226  return columns;
227 }
228 
229 size_t RCsvDS::ParseValue(const std::string &line, std::vector<std::string> &columns, size_t i)
230 {
231  std::stringstream val;
232  bool quoted = false;
233 
234  for (; i < line.size(); ++i) {
235  if (line[i] == fDelimiter && !quoted) {
236  break;
237  } else if (line[i] == '"') {
238  // Keep just one quote for escaped quotes, none for the normal quotes
239  if (line[i + 1] != '"') {
240  quoted = !quoted;
241  } else {
242  val << line[++i];
243  }
244  } else {
245  val << line[i];
246  }
247  }
248 
249  columns.emplace_back(val.str());
250 
251  return i;
252 }
253 
254 ////////////////////////////////////////////////////////////////////////
255 /// Constructor to create a CSV RDataSource for RDataFrame.
256 /// \param[in] fileName Path of the CSV file.
257 /// \param[in] readHeaders `true` if the CSV file contains headers as first row, `false` otherwise
258 /// (default `true`).
259 /// \param[in] delimiter Delimiter character (default ',').
260 RCsvDS::RCsvDS(std::string_view fileName, bool readHeaders, char delimiter, Long64_t linesChunkSize) // TODO: Let users specify types?
261  : fReadHeaders(readHeaders),
262  fStream(std::string(fileName)),
263  fDelimiter(delimiter),
264  fLinesChunkSize(linesChunkSize)
265 {
266  std::string line;
267 
268  // Read the headers if present
269  if (fReadHeaders) {
270  if (std::getline(fStream, line) && !line.empty()) {
271  FillHeaders(line);
272  } else {
273  std::string msg = "Error reading headers of CSV file ";
274  msg += fileName;
275  throw std::runtime_error(msg);
276  }
277  }
278 
279  fDataPos = fStream.tellg();
280  bool eof = false;
281  do {
282  eof = !std::getline(fStream, line);
283  } while (line.empty());
284  if (!eof) {
285  auto columns = ParseColumns(line);
286 
287  // Generate headers if not present
288  if (!fReadHeaders) {
289  GenerateHeaders(columns.size());
290  }
291 
292  // Infer types of columns with first record
293  InferColTypes(columns);
294 
295  // rewind
296  fStream.seekg(fDataPos);
297  } else {
298  std::string msg = "Could not infer column types of CSV file ";
299  msg += fileName;
300  throw std::runtime_error(msg);
301  }
302 }
303 
304 void RCsvDS::FreeRecords()
305 {
306  for (auto &record : fRecords) {
307  for (size_t i = 0; i < record.size(); ++i) {
308  void *p = record[i];
309  const auto colType = fColTypes[fHeaders[i]];
310  switch (colType) {
311  case 'd': {
312  delete static_cast<double *>(p);
313  break;
314  }
315  case 'l': {
316  delete static_cast<Long64_t *>(p);
317  break;
318  }
319  case 'b': {
320  delete static_cast<bool *>(p);
321  break;
322  }
323  case 's': {
324  delete static_cast<std::string *>(p);
325  break;
326  }
327  }
328  }
329  }
330  fRecords.clear();
331 }
332 
333 ////////////////////////////////////////////////////////////////////////
334 /// Destructor.
335 RCsvDS::~RCsvDS()
336 {
337  FreeRecords();
338 }
339 
340 void RCsvDS::Finalise()
341 {
342  fStream.clear();
343  fStream.seekg(fDataPos);
344  fProcessedLines = 0ULL;
345  fEntryRangesRequested = 0ULL;
346  FreeRecords();
347 }
348 
349 const std::vector<std::string> &RCsvDS::GetColumnNames() const
350 {
351  return fHeaders;
352 }
353 
354 std::vector<std::pair<ULong64_t, ULong64_t>> RCsvDS::GetEntryRanges()
355 {
356 
357  // Read records and store them in memory
358  auto linesToRead = fLinesChunkSize;
359  FreeRecords();
360 
361  std::string line;
362  while ((-1LL == fLinesChunkSize || 0 != linesToRead) && std::getline(fStream, line)) {
363  if (line.empty()) continue; // skip empty lines
364  fRecords.emplace_back();
365  FillRecord(line, fRecords.back());
366  --linesToRead;
367  }
368 
369  if (gDebug > 0) {
370  if (fLinesChunkSize == -1LL) {
371  Info("GetEntryRanges", "Attempted to read entire CSV file into memory, %lu lines read", fRecords.size());
372  } else {
373  Info("GetEntryRanges", "Attempted to read chunk of %lld lines of CSV file into memory, %lu lines read", fLinesChunkSize, fRecords.size());
374  }
375  }
376 
377  std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
378  const auto nRecords = fRecords.size();
379  if (0 == nRecords)
380  return entryRanges;
381 
382  const auto chunkSize = nRecords / fNSlots;
383  const auto remainder = 1U == fNSlots ? 0 : nRecords % fNSlots;
384  auto start = 0ULL == fEntryRangesRequested ? 0ULL : fProcessedLines;
385  auto end = start;
386 
387  for (auto i : ROOT::TSeqU(fNSlots)) {
388  start = end;
389  end += chunkSize;
390  entryRanges.emplace_back(start, end);
391  (void)i;
392  }
393  entryRanges.back().second += remainder;
394 
395  fProcessedLines += nRecords;
396  fEntryRangesRequested++;
397 
398  return entryRanges;
399 }
400 
401 RCsvDS::ColType_t RCsvDS::GetType(std::string_view colName) const
402 {
403  if (!HasColumn(colName)) {
404  std::string msg = "The dataset does not have column ";
405  msg += colName;
406  throw std::runtime_error(msg);
407  }
408 
409  return fColTypes.at(colName.data());
410 }
411 
412 std::string RCsvDS::GetTypeName(std::string_view colName) const
413 {
414  return fgColTypeMap.at(GetType(colName));
415 }
416 
417 bool RCsvDS::HasColumn(std::string_view colName) const
418 {
419  return fHeaders.end() != std::find(fHeaders.begin(), fHeaders.end(), colName);
420 }
421 
422 bool RCsvDS::SetEntry(unsigned int slot, ULong64_t entry)
423 {
424  // Here we need to normalise the entry to the number of lines we already processed.
425  const auto offset = (fEntryRangesRequested - 1) * fLinesChunkSize;
426  const auto recordPos = entry - offset;
427  int colIndex = 0;
428  for (auto &colType : fColTypesList) {
429  auto dataPtr = fRecords[recordPos][colIndex];
430  switch (colType) {
431  case 'd': {
432  fDoubleEvtValues[colIndex][slot] = *static_cast<double *>(dataPtr);
433  break;
434  }
435  case 'l': {
436  fLong64EvtValues[colIndex][slot] = *static_cast<Long64_t *>(dataPtr);
437  break;
438  }
439  case 'b': {
440  fBoolEvtValues[colIndex][slot] = *static_cast<bool *>(dataPtr);
441  break;
442  }
443  case 's': {
444  fStringEvtValues[colIndex][slot] = *static_cast<std::string *>(dataPtr);
445  break;
446  }
447  }
448  colIndex++;
449  }
450  return true;
451 }
452 
453 void RCsvDS::SetNSlots(unsigned int nSlots)
454 {
455  R__ASSERT(0U == fNSlots && "Setting the number of slots even if the number of slots is different from zero.");
456 
457  fNSlots = nSlots;
458 
459  const auto nColumns = fHeaders.size();
460  // Initialise the entire set of addresses
461  fColAddresses.resize(nColumns, std::vector<void *>(fNSlots, nullptr));
462 
463  // Initialize the per event data holders
464  fDoubleEvtValues.resize(nColumns, std::vector<double>(fNSlots));
465  fLong64EvtValues.resize(nColumns, std::vector<Long64_t>(fNSlots));
466  fStringEvtValues.resize(nColumns, std::vector<std::string>(fNSlots));
467  fBoolEvtValues.resize(nColumns, std::deque<bool>(fNSlots));
468 }
469 
470 std::string RCsvDS::GetLabel()
471 {
472  return "RCsv";
473 }
474 
475 RDataFrame MakeCsvDataFrame(std::string_view fileName, bool readHeaders, char delimiter, Long64_t linesChunkSize)
476 {
477  ROOT::RDataFrame tdf(std::make_unique<RCsvDS>(fileName, readHeaders, delimiter, linesChunkSize));
478  return tdf;
479 }
480 
481 } // ns RDF
482 
483 } // ns ROOT