diff --git a/include/internal/csv_stat.cpp b/include/internal/csv_stat.cpp index 0abfc4a8..874a6682 100644 --- a/include/internal/csv_stat.cpp +++ b/include/internal/csv_stat.cpp @@ -75,7 +75,7 @@ namespace csv { return ret; } - CSV_INLINE void CSVStat::calcChunk() { + CSV_INLINE void CSVStat::calc_chunk() { /** Only create stats counters the first time **/ if (dtypes.empty()) { /** Go through all records and calculate specified statistics */ @@ -110,12 +110,12 @@ namespace csv { /** Chunk rows */ if (this->records.size() == CALC_CHUNK_SIZE) { - calcChunk(); + calc_chunk(); } } if (!this->records.empty()) { - calcChunk(); + calc_chunk(); } } diff --git a/include/internal/csv_stat.hpp b/include/internal/csv_stat.hpp index ed3b8cd8..eab73434 100644 --- a/include/internal/csv_stat.hpp +++ b/include/internal/csv_stat.hpp @@ -51,7 +51,7 @@ namespace csv { void dtype(CSVField&, const size_t&); void calc(); - void calcChunk(); + void calc_chunk(); void calc_worker(const size_t&); CSVReader reader; diff --git a/single_include/csv.hpp b/single_include/csv.hpp index 01de78b7..8fb9287c 100644 --- a/single_include/csv.hpp +++ b/single_include/csv.hpp @@ -6335,7 +6335,7 @@ namespace csv { std::vector get_col_names() const; int index_of(csv::string_view col_name) const; ///@} - + /** @name CSV Metadata: Attributes */ ///@{ /** Whether or not the file or stream contains valid CSV rows, @@ -6378,7 +6378,7 @@ namespace csv { std::unique_ptr parser = nullptr; /** Queue of parsed CSV rows */ - RowCollection records = RowCollection(100); + std::unique_ptr records{new RowCollection(100)}; size_t n_cols = 0; /**< The number of columns in this CSV */ size_t _n_rows = 0; /**< How many rows (minus header) have been read so far */ @@ -6408,6 +6408,7 @@ namespace csv { void trim_header(); }; } + /** @file * Calculates statistics from CSV files */ @@ -6459,6 +6460,7 @@ namespace csv { void dtype(CSVField&, const size_t&); void calc(); + void calc_chunk(); void calc_worker(const size_t&); CSVReader reader; @@ -6576,6 +6578,9 @@ namespace csv { * @tparam OutputStream The output stream, e.g. `std::ofstream`, `std::stringstream` * @tparam Delim The delimiter character * @tparam Quote The quote character + * @tparam Flush True: flush after every writing function, + * false: you need to flush explicitly if needed. + * In both cases the destructor will flush. * * @par Hint * Use the aliases csv::CSVWriter to write CSV @@ -6588,7 +6593,7 @@ namespace csv { * @par Example w/ std::tuple * @snippet test_write_csv.cpp CSV Writer Tuple Example */ - template + template class DelimWriter { public: /** Construct a DelimWriter over the specified output stream @@ -6605,6 +6610,13 @@ namespace csv { */ DelimWriter(const std::string& filename) : DelimWriter(std::ifstream(filename)) {}; + /** Destructor will flush remaining data + * + */ + ~DelimWriter() { + out.flush(); + } + /** Format a sequence of strings and write to CSV according to RFC 4180 * * @warning This does not check to make sure row lengths are consistent @@ -6620,7 +6632,7 @@ namespace csv { if (i + 1 != Size) out << Delim; } - out << std::endl; + end_out(); return *this; } @@ -6651,10 +6663,17 @@ namespace csv { i++; } - out << std::endl; + end_out(); return *this; } + /** Flushes the written data + * + */ + void flush() { + out.flush(); + } + private: template< typename T, @@ -6734,7 +6753,13 @@ namespace csv { template typename std::enable_if::type write_tuple(const std::tuple& record) { (void)record; - out << std::endl; + end_out(); + } + + /** Ends a line in 'out' and flushes, if Flush is true.*/ + void end_out() { + out << '\n'; + IF_CONSTEXPR(Flush) out.flush(); } OutputStream & out; @@ -6748,19 +6773,19 @@ namespace csv { * @note Use `csv::make_csv_writer()` to in instatiate this class over * an actual output stream. */ - template - using CSVWriter = DelimWriter; + template + using CSVWriter = DelimWriter; /** Class for writing tab-separated values files -* + * * @sa csv::DelimWriter::write_row() * @sa csv::DelimWriter::operator<<() * * @note Use `csv::make_tsv_writer()` to in instatiate this class over * an actual output stream. */ - template - using TSVWriter = DelimWriter; + template + using TSVWriter = DelimWriter; /** Return a csv::CSVWriter over the output stream */ template @@ -6768,11 +6793,23 @@ namespace csv { return CSVWriter(out, quote_minimal); } + /** Return a buffered csv::CSVWriter over the output stream (does not auto flush) */ + template + inline CSVWriter make_csv_writer_buffered(OutputStream& out, bool quote_minimal=true) { + return CSVWriter(out, quote_minimal); + } + /** Return a csv::TSVWriter over the output stream */ template inline TSVWriter make_tsv_writer(OutputStream& out, bool quote_minimal=true) { return TSVWriter(out, quote_minimal); } + + /** Return a buffered csv::TSVWriter over the output stream (does not auto flush) */ + template + inline TSVWriter make_tsv_writer_buffered(OutputStream& out, bool quote_minimal=true) { + return TSVWriter(out, quote_minimal); + } ///@} } @@ -7041,6 +7078,7 @@ namespace csv { } } + namespace csv { namespace internals { CSV_INLINE std::vector ColNames::get_col_names() const { @@ -7173,8 +7211,9 @@ namespace csv { for (size_t i = 0; i < row.size(); i++) { ret << row[i]; if (i + 1 < row.size()) ret << delim; - else ret << std::endl; + else ret << '\n'; } + ret.flush(); return ret.str(); } @@ -7190,7 +7229,7 @@ namespace csv { auto trim_chars = format.get_trim_chars(); std::stringstream source(head.data()); RowCollection rows; - + StreamParser parser(source, format); parser.set_output(rows); parser.next(); @@ -7231,7 +7270,7 @@ namespace csv { double final_score = 0; size_t header_row = 0; - // Final score is equal to the largest + // Final score is equal to the largest // row size times rows of that size for (auto& pair : row_tally) { auto row_size = pair.first; @@ -7334,7 +7373,7 @@ namespace csv { CSV_INLINE CSVFormat CSVReader::get_format() const { CSVFormat new_format = this->_format; - // Since users are normally not allowed to set + // Since users are normally not allowed to set // column names and header row simulatenously, // we will set the backing variables directly here new_format.col_names = this->col_names->get_col_names(); @@ -7365,12 +7404,12 @@ namespace csv { CSV_INLINE void CSVReader::trim_header() { if (!this->header_trimmed) { - for (int i = 0; i <= this->_format.header && !this->records.empty(); i++) { + for (int i = 0; i <= this->_format.header && !this->records->empty(); i++) { if (i == this->_format.header && this->col_names->empty()) { - this->set_col_names(this->records.pop_front()); + this->set_col_names(this->records->pop_front()); } else { - this->records.pop_front(); + this->records->pop_front(); } } @@ -7400,9 +7439,9 @@ namespace csv { */ CSV_INLINE bool CSVReader::read_csv(size_t bytes) { // Tell read_row() to listen for CSV rows - this->records.notify_all(); + this->records->notify_all(); - this->parser->set_output(this->records); + this->parser->set_output(*this->records); this->parser->next(bytes); if (!this->header_trimmed) { @@ -7410,7 +7449,7 @@ namespace csv { } // Tell read_row() to stop waiting - this->records.kill_all(); + this->records->kill_all(); return true; } @@ -7431,10 +7470,10 @@ namespace csv { */ CSV_INLINE bool CSVReader::read_row(CSVRow &row) { while (true) { - if (this->records.empty()) { - if (this->records.is_waitable()) + if (this->records->empty()) { + if (this->records->is_waitable()) // Reading thread is currently active => wait for it to populate records - this->records.wait(); + this->records->wait(); else if (this->parser->eof()) // End of file and no more records return false; @@ -7446,9 +7485,9 @@ namespace csv { this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE); } } - else if (this->records.front().size() != this->n_cols && + else if (this->records->front().size() != this->n_cols && this->_format.variable_column_policy != VariableColumnPolicy::KEEP) { - auto errored_row = this->records.pop_front(); + auto errored_row = this->records->pop_front(); if (this->_format.variable_column_policy == VariableColumnPolicy::THROW) { if (errored_row.size() < this->n_cols) @@ -7458,15 +7497,16 @@ namespace csv { } } else { - row = std::move(this->records.pop_front()); + row = std::move(this->records->pop_front()); this->_n_rows++; return true; } } - + return false; } } + /** @file * Defines an input iterator for csv::CSVReader */ @@ -7475,15 +7515,15 @@ namespace csv { namespace csv { /** Return an iterator to the first row in the reader */ CSV_INLINE CSVReader::iterator CSVReader::begin() { - if (this->records.empty()) { + if (this->records->empty()) { this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE); this->read_csv_worker.join(); // Still empty => return end iterator - if (this->records.empty()) return this->end(); + if (this->records->empty()) return this->end(); } - CSVReader::iterator ret(this, std::move(this->records.pop_front())); + CSVReader::iterator ret(this, std::move(this->records->pop_front())); return ret; } @@ -7528,6 +7568,7 @@ namespace csv { return temp; } } + /** @file * Defines the data type used for storing information about a CSV row */ @@ -8058,17 +8099,9 @@ namespace csv { return ret; } - CSV_INLINE void CSVStat::calc() { - constexpr size_t CALC_CHUNK_SIZE = 5000; - - while (true) { - /** Chunk rows */ - for (auto& row : reader) { - if (this->records.size() < CALC_CHUNK_SIZE) { - this->records.push_back(std::move(row)); - } - } - + CSV_INLINE void CSVStat::calc_chunk() { + /** Only create stats counters the first time **/ + if (dtypes.empty()) { /** Go through all records and calculate specified statistics */ for (size_t i = 0; i < this->get_col_names().size(); i++) { dtypes.push_back({}); @@ -8079,18 +8112,34 @@ namespace csv { maxes.push_back(NAN); n.push_back(0); } + } + + // Start threads + std::vector pool; + for (size_t i = 0; i < this->get_col_names().size(); i++) + pool.push_back(std::thread(&CSVStat::calc_worker, this, i)); - std::vector pool; + // Block until done + for (auto& th : pool) + th.join(); - // Start threads - for (size_t i = 0; i < this->get_col_names().size(); i++) - pool.push_back(std::thread(&CSVStat::calc_worker, this, i)); + this->records.clear(); + } - // Block until done - for (auto& th : pool) - th.join(); + CSV_INLINE void CSVStat::calc() { + constexpr size_t CALC_CHUNK_SIZE = 5000; + + for (auto& row : reader) { + this->records.push_back(std::move(row)); + + /** Chunk rows */ + if (this->records.size() == CALC_CHUNK_SIZE) { + calc_chunk(); + } + } - if (reader.eof()) break; + if (!this->records.empty()) { + calc_chunk(); } } diff --git a/single_include_test/csv.hpp b/single_include_test/csv.hpp index d4af6c41..8fb9287c 100644 --- a/single_include_test/csv.hpp +++ b/single_include_test/csv.hpp @@ -1828,12 +1828,14 @@ using shared_ummap_sink = basic_shared_mmap_sink; #include #if defined(_WIN32) -#include -#define WIN32_LEAN_AND_MEAN -#undef max -#undef min +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif +# include +# undef max +# undef min #elif defined(__linux__) -#include +# include #endif /** Helper macro which should be #defined as "inline" @@ -6333,7 +6335,7 @@ namespace csv { std::vector get_col_names() const; int index_of(csv::string_view col_name) const; ///@} - + /** @name CSV Metadata: Attributes */ ///@{ /** Whether or not the file or stream contains valid CSV rows, @@ -6376,7 +6378,7 @@ namespace csv { std::unique_ptr parser = nullptr; /** Queue of parsed CSV rows */ - RowCollection records = RowCollection(100); + std::unique_ptr records{new RowCollection(100)}; size_t n_cols = 0; /**< The number of columns in this CSV */ size_t _n_rows = 0; /**< How many rows (minus header) have been read so far */ @@ -6406,6 +6408,7 @@ namespace csv { void trim_header(); }; } + /** @file * Calculates statistics from CSV files */ @@ -6457,6 +6460,7 @@ namespace csv { void dtype(CSVField&, const size_t&); void calc(); + void calc_chunk(); void calc_worker(const size_t&); CSVReader reader; @@ -6574,6 +6578,9 @@ namespace csv { * @tparam OutputStream The output stream, e.g. `std::ofstream`, `std::stringstream` * @tparam Delim The delimiter character * @tparam Quote The quote character + * @tparam Flush True: flush after every writing function, + * false: you need to flush explicitly if needed. + * In both cases the destructor will flush. * * @par Hint * Use the aliases csv::CSVWriter to write CSV @@ -6586,7 +6593,7 @@ namespace csv { * @par Example w/ std::tuple * @snippet test_write_csv.cpp CSV Writer Tuple Example */ - template + template class DelimWriter { public: /** Construct a DelimWriter over the specified output stream @@ -6603,6 +6610,13 @@ namespace csv { */ DelimWriter(const std::string& filename) : DelimWriter(std::ifstream(filename)) {}; + /** Destructor will flush remaining data + * + */ + ~DelimWriter() { + out.flush(); + } + /** Format a sequence of strings and write to CSV according to RFC 4180 * * @warning This does not check to make sure row lengths are consistent @@ -6618,7 +6632,7 @@ namespace csv { if (i + 1 != Size) out << Delim; } - out << std::endl; + end_out(); return *this; } @@ -6649,10 +6663,17 @@ namespace csv { i++; } - out << std::endl; + end_out(); return *this; } + /** Flushes the written data + * + */ + void flush() { + out.flush(); + } + private: template< typename T, @@ -6732,7 +6753,13 @@ namespace csv { template typename std::enable_if::type write_tuple(const std::tuple& record) { (void)record; - out << std::endl; + end_out(); + } + + /** Ends a line in 'out' and flushes, if Flush is true.*/ + void end_out() { + out << '\n'; + IF_CONSTEXPR(Flush) out.flush(); } OutputStream & out; @@ -6746,19 +6773,19 @@ namespace csv { * @note Use `csv::make_csv_writer()` to in instatiate this class over * an actual output stream. */ - template - using CSVWriter = DelimWriter; + template + using CSVWriter = DelimWriter; /** Class for writing tab-separated values files -* + * * @sa csv::DelimWriter::write_row() * @sa csv::DelimWriter::operator<<() * * @note Use `csv::make_tsv_writer()` to in instatiate this class over * an actual output stream. */ - template - using TSVWriter = DelimWriter; + template + using TSVWriter = DelimWriter; /** Return a csv::CSVWriter over the output stream */ template @@ -6766,11 +6793,23 @@ namespace csv { return CSVWriter(out, quote_minimal); } + /** Return a buffered csv::CSVWriter over the output stream (does not auto flush) */ + template + inline CSVWriter make_csv_writer_buffered(OutputStream& out, bool quote_minimal=true) { + return CSVWriter(out, quote_minimal); + } + /** Return a csv::TSVWriter over the output stream */ template inline TSVWriter make_tsv_writer(OutputStream& out, bool quote_minimal=true) { return TSVWriter(out, quote_minimal); } + + /** Return a buffered csv::TSVWriter over the output stream (does not auto flush) */ + template + inline TSVWriter make_tsv_writer_buffered(OutputStream& out, bool quote_minimal=true) { + return TSVWriter(out, quote_minimal); + } ///@} } @@ -6957,6 +6996,8 @@ namespace csv { if (this->field_length == 0) { quote_escape = true; data_pos++; + if (field_start == UNINITIALIZED_FIELD && !ws_flag(in[data_pos])) + field_start = (int)(data_pos - current_row_start()); break; } @@ -7037,6 +7078,7 @@ namespace csv { } } + namespace csv { namespace internals { CSV_INLINE std::vector ColNames::get_col_names() const { @@ -7169,8 +7211,9 @@ namespace csv { for (size_t i = 0; i < row.size(); i++) { ret << row[i]; if (i + 1 < row.size()) ret << delim; - else ret << std::endl; + else ret << '\n'; } + ret.flush(); return ret.str(); } @@ -7186,7 +7229,7 @@ namespace csv { auto trim_chars = format.get_trim_chars(); std::stringstream source(head.data()); RowCollection rows; - + StreamParser parser(source, format); parser.set_output(rows); parser.next(); @@ -7227,7 +7270,7 @@ namespace csv { double final_score = 0; size_t header_row = 0; - // Final score is equal to the largest + // Final score is equal to the largest // row size times rows of that size for (auto& pair : row_tally) { auto row_size = pair.first; @@ -7330,7 +7373,7 @@ namespace csv { CSV_INLINE CSVFormat CSVReader::get_format() const { CSVFormat new_format = this->_format; - // Since users are normally not allowed to set + // Since users are normally not allowed to set // column names and header row simulatenously, // we will set the backing variables directly here new_format.col_names = this->col_names->get_col_names(); @@ -7361,12 +7404,12 @@ namespace csv { CSV_INLINE void CSVReader::trim_header() { if (!this->header_trimmed) { - for (int i = 0; i <= this->_format.header && !this->records.empty(); i++) { + for (int i = 0; i <= this->_format.header && !this->records->empty(); i++) { if (i == this->_format.header && this->col_names->empty()) { - this->set_col_names(this->records.pop_front()); + this->set_col_names(this->records->pop_front()); } else { - this->records.pop_front(); + this->records->pop_front(); } } @@ -7396,9 +7439,9 @@ namespace csv { */ CSV_INLINE bool CSVReader::read_csv(size_t bytes) { // Tell read_row() to listen for CSV rows - this->records.notify_all(); + this->records->notify_all(); - this->parser->set_output(this->records); + this->parser->set_output(*this->records); this->parser->next(bytes); if (!this->header_trimmed) { @@ -7406,7 +7449,7 @@ namespace csv { } // Tell read_row() to stop waiting - this->records.kill_all(); + this->records->kill_all(); return true; } @@ -7427,10 +7470,10 @@ namespace csv { */ CSV_INLINE bool CSVReader::read_row(CSVRow &row) { while (true) { - if (this->records.empty()) { - if (this->records.is_waitable()) + if (this->records->empty()) { + if (this->records->is_waitable()) // Reading thread is currently active => wait for it to populate records - this->records.wait(); + this->records->wait(); else if (this->parser->eof()) // End of file and no more records return false; @@ -7442,9 +7485,9 @@ namespace csv { this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE); } } - else if (this->records.front().size() != this->n_cols && + else if (this->records->front().size() != this->n_cols && this->_format.variable_column_policy != VariableColumnPolicy::KEEP) { - auto errored_row = this->records.pop_front(); + auto errored_row = this->records->pop_front(); if (this->_format.variable_column_policy == VariableColumnPolicy::THROW) { if (errored_row.size() < this->n_cols) @@ -7454,15 +7497,16 @@ namespace csv { } } else { - row = std::move(this->records.pop_front()); + row = std::move(this->records->pop_front()); this->_n_rows++; return true; } } - + return false; } } + /** @file * Defines an input iterator for csv::CSVReader */ @@ -7471,15 +7515,15 @@ namespace csv { namespace csv { /** Return an iterator to the first row in the reader */ CSV_INLINE CSVReader::iterator CSVReader::begin() { - if (this->records.empty()) { + if (this->records->empty()) { this->read_csv_worker = std::thread(&CSVReader::read_csv, this, internals::ITERATION_CHUNK_SIZE); this->read_csv_worker.join(); // Still empty => return end iterator - if (this->records.empty()) return this->end(); + if (this->records->empty()) return this->end(); } - CSVReader::iterator ret(this, std::move(this->records.pop_front())); + CSVReader::iterator ret(this, std::move(this->records->pop_front())); return ret; } @@ -7524,6 +7568,7 @@ namespace csv { return temp; } } + /** @file * Defines the data type used for storing information about a CSV row */ @@ -8054,17 +8099,9 @@ namespace csv { return ret; } - CSV_INLINE void CSVStat::calc() { - constexpr size_t CALC_CHUNK_SIZE = 5000; - - while (true) { - /** Chunk rows */ - for (auto& row : reader) { - if (this->records.size() < CALC_CHUNK_SIZE) { - this->records.push_back(std::move(row)); - } - } - + CSV_INLINE void CSVStat::calc_chunk() { + /** Only create stats counters the first time **/ + if (dtypes.empty()) { /** Go through all records and calculate specified statistics */ for (size_t i = 0; i < this->get_col_names().size(); i++) { dtypes.push_back({}); @@ -8075,18 +8112,34 @@ namespace csv { maxes.push_back(NAN); n.push_back(0); } + } + + // Start threads + std::vector pool; + for (size_t i = 0; i < this->get_col_names().size(); i++) + pool.push_back(std::thread(&CSVStat::calc_worker, this, i)); + + // Block until done + for (auto& th : pool) + th.join(); + + this->records.clear(); + } - std::vector pool; + CSV_INLINE void CSVStat::calc() { + constexpr size_t CALC_CHUNK_SIZE = 5000; - // Start threads - for (size_t i = 0; i < this->get_col_names().size(); i++) - pool.push_back(std::thread(&CSVStat::calc_worker, this, i)); + for (auto& row : reader) { + this->records.push_back(std::move(row)); - // Block until done - for (auto& th : pool) - th.join(); + /** Chunk rows */ + if (this->records.size() == CALC_CHUNK_SIZE) { + calc_chunk(); + } + } - if (reader.eof()) break; + if (!this->records.empty()) { + calc_chunk(); } } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 0868584f..df9412d1 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -10,7 +10,7 @@ target_sources(csv_test test_csv_iterator.cpp test_csv_row.cpp test_csv_row_json.cpp - #test_csv_stat.cpp + test_csv_stat.cpp test_guess_csv.cpp test_read_csv.cpp test_read_csv_file.cpp diff --git a/tests/data b/tests/data index 63e4f385..5e94ae8e 160000 --- a/tests/data +++ b/tests/data @@ -1 +1 @@ -Subproject commit 63e4f385ccd8cc7e43ba678fb712be4185ec2352 +Subproject commit 5e94ae8e97c5fd447e1c37ca12eb614e2483710e diff --git a/tests/test_csv_stat.cpp b/tests/test_csv_stat.cpp index 918c9566..0f2d621a 100644 --- a/tests/test_csv_stat.cpp +++ b/tests/test_csv_stat.cpp @@ -60,7 +60,8 @@ TEST_CASE( "Statistics - Rows of Integers", "[read_csv_stat]" ) { TEST_CASE( "Statistics - persons.csv", "[test_stat_person]" ) { CSVStat reader(PERSONS_CSV); - REQUIRE( ceil(reader.get_mean()[1]) == 42 ); + REQUIRE(reader.get_maxes()[0] == 49999); + REQUIRE( ceil(reader.get_mean()[2]) == 42 ); } TEST_CASE("Data Types - persons.csv", "test_dtypes_person]") {