89 std::string RCsvDS::AsString()
91 return "CSV data source";
95 TRegexp RCsvDS::intRegex(
"^[-+]?[0-9]+$");
96 TRegexp RCsvDS::doubleRegex1(
"^[-+]?[0-9]+\\.[0-9]*$");
97 TRegexp RCsvDS::doubleRegex2(
"^[-+]?[0-9]*\\.[0-9]+$");
98 TRegexp RCsvDS::doubleRegex3(
"^[-+]?[0-9]*\\.[0-9]+[eEdDqQ][-+]?[0-9]+$");
99 TRegexp RCsvDS::trueRegex(
"^true$");
100 TRegexp RCsvDS::falseRegex(
"^false$");
102 const std::map<RCsvDS::ColType_t, std::string>
103 RCsvDS::fgColTypeMap({{
'b',
"bool"}, {
'd',
"double"}, {
'l',
"Long64_t"}, {
's',
"std::string"}});
105 void RCsvDS::FillHeaders(
const std::string &line)
107 auto columns = ParseColumns(line);
108 for (
auto &col : columns) {
109 fHeaders.emplace_back(col);
/// Parse one CSV line and append one heap-allocated value per column to `record`.
/// The line is split with ParseColumns(); for every column the type code is looked up
/// in fColTypes under the corresponding header name. The conversions visible here are
/// std::stod -> double, std::stoll -> Long64_t, std::boolalpha extraction -> bool and a
/// plain copy -> std::string. The pointers stored in `record` are raw; FreeRecords()
/// contains the matching typed deletes.
/// \param[in] line One non-header line of the CSV file.
/// \param[out] record Receives one pointer per parsed column.
/// NOTE(review): this listing skips some source lines (the dispatch on colType and the
/// declarations of `i` and `b` are not visible) - confirm details against the full file.
113 void RCsvDS::FillRecord(
const std::string &line, Record_t &record)
// NOTE(review): lineStream is not referenced by any line visible in this listing.
115 std::istringstream lineStream(line);
118 auto columns = ParseColumns(line);
120 for (
auto &col : columns) {
// operator[] is used for both lookups: `i` indexes the header of the current column.
121 auto colType = fColTypes[fHeaders[i]];
// std::stod / std::stoll throw if the text cannot be converted.
125 record.emplace_back(
new double(std::stod(col)));
129 record.emplace_back(
new Long64_t(std::stoll(col)));
// The pointer is stored first, then the pointee is filled from the textual "true"/"false".
134 record.emplace_back(b);
135 std::istringstream is(col);
136 is >> std::boolalpha >> *b;
140 record.emplace_back(
new std::string(col));
148 void RCsvDS::GenerateHeaders(
size_t size)
150 for (
size_t i = 0; i < size; ++i) {
151 fHeaders.push_back(
"Col" + std::to_string(i));
/// Provide, for every slot, the address through which values of column `colName` are read.
/// The requested type `ti` must agree with the column's type code ('d' double, 'l' Long64_t,
/// 's' std::string, 'b' bool); otherwise a std::runtime_error is thrown.
/// \param[in] colName Name of the column.
/// \param[in] ti      Type the caller wants to read the column as.
/// \throws std::runtime_error on a type mismatch (GetType() also throws for unknown columns).
/// NOTE(review): the lines that fill and return `ret` are not part of this listing.
155 std::vector<void *> RCsvDS::GetColumnReadersImpl(std::string_view colName,
const std::type_info &ti)
157 const auto colType = GetType(colName);
// Reject the request when the column's type code and the requested C++ type disagree.
159 if ((colType ==
'd' &&
typeid(
double) != ti) || (colType ==
'l' &&
typeid(Long64_t) != ti) ||
160 (colType ==
's' &&
typeid(std::string) != ti) || (colType ==
'b' &&
typeid(
bool) != ti)) {
161 std::string err =
"The type selected for column \"";
163 err +=
"\" does not correspond to column type, which is ";
164 err += fgColTypeMap.at(colType);
165 throw std::runtime_error(err);
// Position of the column within the list of column names; used to index the per-column buffers.
168 const auto &colNames = GetColumnNames();
169 const auto index = std::distance(colNames.begin(), std::find(colNames.begin(), colNames.end(), colName));
170 std::vector<void *> ret(fNSlots);
171 for (
auto slot : ROOT::TSeqU(fNSlots)) {
// `val` aliases the slot's entry in fColAddresses; it is pointed at the buffer matching `ti`.
172 auto &val = fColAddresses[index][slot];
173 if (ti ==
typeid(
double)) {
174 val = &fDoubleEvtValues[index][slot];
175 }
else if (ti ==
typeid(Long64_t)) {
176 val = &fLong64EvtValues[index][slot];
177 }
else if (ti ==
typeid(std::string)) {
178 val = &fStringEvtValues[index][slot];
// NOTE(review): presumably the remaining (bool) case - the branch header is not visible here.
180 val = &fBoolEvtValues[index][slot];
/// Walk over the columns of one parsed CSV line.
/// \param[in] columns Column values of a single line.
/// NOTE(review): the loop body is not part of this listing; presumably it calls InferType()
/// for each column - confirm against the full file.
187 void RCsvDS::InferColTypes(std::vector<std::string> &columns)
190 for (
auto &col : columns) {
/// Classify the text of one cell and record the resulting type code for its column.
/// The text is tested, in order, against intRegex, then the three double regexes, then
/// trueRegex/falseRegex. The resulting `type` is stored in fColTypes under the column's
/// header name and appended to fColTypesList.
/// \param[in] col    Text of the cell.
/// \param[in] idxCol Index of the column, used to look up its header.
/// NOTE(review): the assignments to `type` in each branch are not visible in this listing;
/// presumably 'l', 'd' and 'b' respectively with 's' as fallback - confirm.
196 void RCsvDS::InferType(
const std::string &col,
unsigned int idxCol)
// Index() returning -1 means "no match".
201 if (intRegex.Index(col, &dummy) != -1) {
203 }
else if (doubleRegex1.Index(col, &dummy) != -1 ||
204 doubleRegex2.Index(col, &dummy) != -1 ||
205 doubleRegex3.Index(col, &dummy) != -1) {
207 }
else if (trueRegex.Index(col, &dummy) != -1 || falseRegex.Index(col, &dummy) != -1) {
// Keep both views in sync: by header name (map) and by column position (list).
214 fColTypes[fHeaders[idxCol]] = type;
215 fColTypesList.push_back(type);
/// Split a CSV line into its column values.
/// \param[in] line The raw line.
/// NOTE(review): the return statement is not part of this listing; presumably `columns`.
218 std::vector<std::string> RCsvDS::ParseColumns(
const std::string &line)
220 std::vector<std::string> columns;
// ParseValue() appends one value to `columns` and returns the index at which it stopped;
// the loop's ++i then steps past that position.
222 for (
size_t i = 0; i < line.size(); ++i) {
223 i = ParseValue(line, columns, i);
/// Read one value from `line` starting at index `i` and append it to `columns`.
/// \param[in]     line    The raw line being parsed.
/// \param[in,out] columns Receives the parsed value.
/// \param[in]     i       Index at which to start scanning.
/// NOTE(review): most of the body (the `quoted` flag handling, what is written to `val`, and
/// the returned index) is not part of this listing - confirm against the full file.
229 size_t RCsvDS::ParseValue(
const std::string &line, std::vector<std::string> &columns,
size_t i)
231 std::stringstream val;
234 for (; i < line.size(); ++i) {
// A delimiter only has its separating role outside of quotes.
235 if (line[i] == fDelimiter && !quoted) {
237 }
else if (line[i] ==
'"') {
// Look ahead one character to tell a lone quote from a doubled quote.
// `line` is const, so line[i + 1] at the last index yields the terminating '\0'.
239 if (line[i + 1] !=
'"') {
249 columns.emplace_back(val.str());
/// Open a CSV file and prepare headers and column types.
/// \param[in] fileName       Path of the CSV file; used to open fStream.
/// \param[in] readHeaders    Stored in fReadHeaders.
/// \param[in] delimiter      Column separator character, stored in fDelimiter.
/// \param[in] linesChunkSize Stored in fLinesChunkSize (GetEntryRanges() treats -1 as "no limit").
/// \throws std::runtime_error with "Error reading headers of CSV file ..." or
///         "Could not infer column types of CSV file ...".
/// NOTE(review): the conditions guarding several statements below are not part of this
/// listing; the comments describe only what the visible lines do.
260 RCsvDS::RCsvDS(std::string_view fileName,
bool readHeaders,
char delimiter, Long64_t linesChunkSize)
261 : fReadHeaders(readHeaders),
262 fStream(std::string(fileName)),
263 fDelimiter(delimiter),
264 fLinesChunkSize(linesChunkSize)
// A first, non-empty line must be readable.
270 if (std::getline(fStream, line) && !line.empty()) {
273 std::string msg =
"Error reading headers of CSV file ";
275 throw std::runtime_error(msg);
// Remember the current stream position so reading can be restarted from here.
279 fDataPos = fStream.tellg();
// Keep reading while the line obtained is empty.
282 eof = !std::getline(fStream, line);
283 }
while (line.empty());
285 auto columns = ParseColumns(line);
// Synthetic names ("Col0", "Col1", ...) sized after the number of parsed columns.
289 GenerateHeaders(columns.size());
293 InferColTypes(columns);
// Rewind to the remembered position after the line was consumed for type inference.
296 fStream.seekg(fDataPos);
298 std::string msg =
"Could not infer column types of CSV file ";
300 throw std::runtime_error(msg);
/// Release the values stored in fRecords.
/// Every element of every record is deleted through a pointer cast to the concrete type
/// of its column (double, Long64_t, bool or std::string).
/// NOTE(review): the dispatch on colType, the definition of `p` and any clearing of
/// fRecords are not part of this listing.
304 void RCsvDS::FreeRecords()
306 for (
auto &record : fRecords) {
307 for (
size_t i = 0; i < record.size(); ++i) {
// The i-th element of a record belongs to the i-th header; its type code selects the cast.
309 const auto colType = fColTypes[fHeaders[i]];
312 delete static_cast<double *
>(p);
316 delete static_cast<Long64_t *
>(p);
320 delete static_cast<bool *
>(p);
324 delete static_cast<std::string *
>(p);
/// Reset the reading state: rewind the stream to fDataPos and zero the counters of
/// processed lines and of requested entry ranges.
/// NOTE(review): the listing skips line numbers inside this function, so further
/// statements may exist - confirm against the full file.
340 void RCsvDS::Finalise()
343 fStream.seekg(fDataPos);
344 fProcessedLines = 0ULL;
345 fEntryRangesRequested = 0ULL;
/// Return the column names as a const reference to a vector of strings.
/// NOTE(review): the body is not part of this listing; presumably it returns fHeaders.
349 const std::vector<std::string> &RCsvDS::GetColumnNames()
const
/// Read the next chunk of lines into fRecords and split the resulting entries among slots.
/// \return Pairs of entry indices, one per slot (see the loop over fNSlots below).
/// NOTE(review): several statements (e.g. how linesToRead is updated, how `end` is computed,
/// the return) are not part of this listing - confirm against the full file.
354 std::vector<std::pair<ULong64_t, ULong64_t>> RCsvDS::GetEntryRanges()
358 auto linesToRead = fLinesChunkSize;
// A chunk size of -1 means "no limit"; otherwise stop once linesToRead reaches 0.
362 while ((-1LL == fLinesChunkSize || 0 != linesToRead) && std::getline(fStream, line)) {
// Empty lines produce no record.
363 if (line.empty())
continue;
364 fRecords.emplace_back();
365 FillRecord(line, fRecords.back());
// NOTE(review): fRecords.size() is a size_t but is printed with %lu; %zu would be the
// portable conversion specifier.
370 if (fLinesChunkSize == -1LL) {
371 Info(
"GetEntryRanges",
"Attempted to read entire CSV file into memory, %lu lines read", fRecords.size());
373 Info(
"GetEntryRanges",
"Attempted to read chunk of %lld lines of CSV file into memory, %lu lines read", fLinesChunkSize, fRecords.size());
377 std::vector<std::pair<ULong64_t, ULong64_t>> entryRanges;
378 const auto nRecords = fRecords.size();
// Even share per slot; the leftover entries are added to the last range below.
382 const auto chunkSize = nRecords / fNSlots;
383 const auto remainder = 1U == fNSlots ? 0 : nRecords % fNSlots;
// Entry numbering continues from the lines processed so far, except on the first request.
384 auto start = 0ULL == fEntryRangesRequested ? 0ULL : fProcessedLines;
387 for (
auto i : ROOT::TSeqU(fNSlots)) {
390 entryRanges.emplace_back(start, end);
393 entryRanges.back().second += remainder;
395 fProcessedLines += nRecords;
396 fEntryRangesRequested++;
401 RCsvDS::ColType_t RCsvDS::GetType(std::string_view colName)
const
403 if (!HasColumn(colName)) {
404 std::string msg =
"The dataset does not have column ";
406 throw std::runtime_error(msg);
409 return fColTypes.at(colName.data());
412 std::string RCsvDS::GetTypeName(std::string_view colName)
const
414 return fgColTypeMap.at(GetType(colName));
417 bool RCsvDS::HasColumn(std::string_view colName)
const
419 return fHeaders.end() != std::find(fHeaders.begin(), fHeaders.end(), colName);
/// Copy the values of one entry from fRecords into the per-slot value buffers.
/// \param[in] slot  Slot whose buffers are written.
/// \param[in] entry Global entry number to load.
/// NOTE(review): the dispatch on colType, the handling of colIndex and the returned value
/// are not part of this listing - confirm against the full file.
422 bool RCsvDS::SetEntry(
unsigned int slot, ULong64_t entry)
// fRecords only holds the current chunk: translate the global entry number into a
// position inside it by subtracting the lines covered by the previously requested chunks.
425 const auto offset = (fEntryRangesRequested - 1) * fLinesChunkSize;
426 const auto recordPos = entry - offset;
// One iteration per column, in column order.
428 for (
auto &colType : fColTypesList) {
429 auto dataPtr = fRecords[recordPos][colIndex];
// The stored pointer is cast back to the column's concrete type and the value is copied
// into the buffer of that type for this slot.
432 fDoubleEvtValues[colIndex][slot] = *
static_cast<double *
>(dataPtr);
436 fLong64EvtValues[colIndex][slot] = *
static_cast<Long64_t *
>(dataPtr);
440 fBoolEvtValues[colIndex][slot] = *
static_cast<bool *
>(dataPtr);
444 fStringEvtValues[colIndex][slot] = *
static_cast<std::string *
>(dataPtr);
/// Size the per-column, per-slot buffers.
/// Must be called while fNSlots is still zero (enforced with R__ASSERT).
/// \param[in] nSlots Number of slots.
/// NOTE(review): the assignment of fNSlots is not part of this listing; presumably it is
/// set from nSlots before the buffers are resized - confirm.
453 void RCsvDS::SetNSlots(
unsigned int nSlots)
455 R__ASSERT(0U == fNSlots &&
"Setting the number of slots even if the number of slots is different from zero.");
459 const auto nColumns = fHeaders.size();
// One address per (column, slot), initially null.
461 fColAddresses.resize(nColumns, std::vector<void *>(fNSlots,
nullptr));
// One value buffer per supported type, each nColumns x fNSlots.
464 fDoubleEvtValues.resize(nColumns, std::vector<double>(fNSlots));
465 fLong64EvtValues.resize(nColumns, std::vector<Long64_t>(fNSlots));
466 fStringEvtValues.resize(nColumns, std::vector<std::string>(fNSlots));
// std::deque<bool> rather than std::vector<bool>: presumably because the address of an
// individual element is taken elsewhere (GetColumnReadersImpl), which vector<bool> does not allow.
467 fBoolEvtValues.resize(nColumns, std::deque<bool>(fNSlots));
/// Return a label for this data source as a std::string.
/// NOTE(review): the body is not part of this listing.
470 std::string RCsvDS::GetLabel()
/// Factory: build an RDataFrame that reads from a CSV file.
/// All arguments are passed unchanged to the RCsvDS constructor; the data source is created
/// with std::make_unique and handed to the RDataFrame constructor.
/// \param[in] fileName       Path of the CSV file.
/// \param[in] readHeaders    Passed to RCsvDS.
/// \param[in] delimiter      Passed to RCsvDS.
/// \param[in] linesChunkSize Passed to RCsvDS.
/// NOTE(review): the return statement is not part of this listing; presumably `tdf`.
475 RDataFrame MakeCsvDataFrame(std::string_view fileName,
bool readHeaders,
char delimiter, Long64_t linesChunkSize)
477 ROOT::RDataFrame tdf(std::make_unique<RCsvDS>(fileName, readHeaders, delimiter, linesChunkSize));