Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions components/core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -704,6 +704,7 @@ set(SOURCE_FILES_unitTest
tests/test-clp_s-end_to_end.cpp
tests/test-clp_s-range_index.cpp
tests/test-clp_s-search.cpp
tests/test-clp_s-StringUtils.cpp
tests/test-EncodedVariableInterpreter.cpp
tests/test-encoding_methods.cpp
tests/test-ffi_IrUnitHandlerReq.cpp
Expand Down
6 changes: 6 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,12 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
"structurize-arrays",
po::bool_switch(&m_structurize_arrays),
"Structurize arrays instead of compressing them as clp strings."
)(
"sanitize-invalid-json",
po::bool_switch(&m_sanitize_invalid_json),
"Sanitize invalid JSON by escaping unescaped control characters (0x00-0x1F),"
" replacing invalid UTF-8 sequences with U+FFFD, and handling invalid"
" surrogate escapes. When disabled (default), parsing fails on invalid JSON."
)(
"disable-log-order",
po::bool_switch(&m_disable_log_order),
Expand Down
3 changes: 3 additions & 0 deletions components/core/src/clp_s/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ class CommandLineArguments {

/**
 * @return the value of m_structurize_arrays, which is bound to the "structurize-arrays" switch
 */
bool get_structurize_arrays() const { return m_structurize_arrays; }

/**
 * @return the value of m_sanitize_invalid_json, which is bound to the "sanitize-invalid-json"
 * switch and defaults to false
 */
bool get_sanitize_invalid_json() const { return m_sanitize_invalid_json; }

/**
 * @return the value of the m_ordered_decompression flag (defaults to false)
 */
bool get_ordered_decompression() const { return m_ordered_decompression; }

/**
 * @return the value stored in m_target_ordered_chunk_size
 */
size_t get_target_ordered_chunk_size() const { return m_target_ordered_chunk_size; }
Expand Down Expand Up @@ -202,6 +204,7 @@ class CommandLineArguments {
bool m_no_retain_float_format{false};
bool m_single_file_archive{false};
bool m_structurize_arrays{false};
bool m_sanitize_invalid_json{false};
bool m_ordered_decompression{false};
size_t m_target_ordered_chunk_size{};
bool m_print_ordered_chunk_stats{false};
Expand Down
155 changes: 149 additions & 6 deletions components/core/src/clp_s/JsonFileIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,25 @@
#include <cctype>
#include <cstring>

#include <fmt/format.h>
#include <spdlog/spdlog.h>

#include "Utils.hpp"

namespace clp_s {
// Builds the iterator: stores the sizing limits, allocates a read buffer of `buf_size` bytes plus
// SIMDJSON_PADDING bytes of padding, records the (optional) path used in log messages and the
// sanitization switch, then eagerly attempts to load the first batch of JSON.
//
// The result of read_new_json() is intentionally not inspected here; callers check get_error()
// right after construction to find out whether the initial read succeeded.
JsonFileIterator::JsonFileIterator(
        clp::ReaderInterface& reader,
        size_t max_document_size,
        bool sanitize_invalid_json,
        std::string path,
        size_t buf_size
)
        : m_buf_size(buf_size),
          m_max_document_size(max_document_size),
          m_buf(new char[buf_size + simdjson::SIMDJSON_PADDING]),
          // m_reader must appear exactly once in the initializer list, followed by the new
          // path/sanitization members.
          m_reader(reader),
          m_path(std::move(path)),
          m_sanitize_invalid_json(sanitize_invalid_json) {
    read_new_json();
}

Expand Down Expand Up @@ -64,6 +71,26 @@ bool JsonFileIterator::read_new_json() {
)
.get(m_stream);

// If sanitization is enabled and we encounter errors that can be fixed by sanitization,
// sanitize the buffer and retry parsing
if (m_sanitize_invalid_json) {
// Handle invalid UTF-8 sequences by replacing with U+FFFD
if (simdjson::error_code::UTF8_ERROR == error) {
// Return value intentionally ignored - in read_new_json we always retry after
// sanitization regardless of whether changes were made
static_cast<void>(sanitize_invalid_utf8_and_log());
error = m_parser.iterate_many(m_buf, m_buf_occupied, m_buf_occupied).get(m_stream);
}

// Handle unescaped control characters by escaping them to \u00XX format
if (simdjson::error_code::UNESCAPED_CHARS == error) {
// Return value intentionally ignored - in read_new_json we always retry after
// sanitization regardless of whether changes were made
static_cast<void>(sanitize_control_chars_and_log());
error = m_parser.iterate_many(m_buf, m_buf_occupied, m_buf_occupied).get(m_stream);
}
}

if (error) {
m_error_code = error;
return false;
Expand Down Expand Up @@ -95,6 +122,90 @@ size_t JsonFileIterator::skip_whitespace_and_get_truncated_bytes() {
return m_buf_occupied - m_next_document_position;
}

bool JsonFileIterator::sanitize_invalid_utf8_and_log() {
size_t const old_buf_occupied = m_buf_occupied;
// Note: sanitize_utf8_buffer may reallocate m_buf and will update m_buf_size by reference if
// reallocation is needed. This keeps m_buf_size in sync with the actual allocated buffer size.
auto const result = StringUtils::sanitize_utf8_buffer(
m_buf,
m_buf_size,
m_buf_occupied,
simdjson::SIMDJSON_PADDING
);
m_buf_occupied = result.new_buf_occupied;
m_sanitization_bytes_added += m_buf_occupied - old_buf_occupied;

if (result.sanitized_char_counts.empty()) {
return false;
}

size_t total_replaced = 0;
for (auto const& [ch, count] : result.sanitized_char_counts) {
total_replaced += count;
}
SPDLOG_WARN(
"Replaced {} invalid UTF-8 sequence(s) with U+FFFD{}. Buffer expanded by {} bytes "
"({} -> {}).",
total_replaced,
m_path.empty() ? "" : fmt::format(" in file '{}'", m_path),
static_cast<int64_t>(m_buf_occupied) - static_cast<int64_t>(old_buf_occupied),
old_buf_occupied,
m_buf_occupied
);
return true;
}

bool JsonFileIterator::sanitize_control_chars_and_log() {
size_t const old_buf_occupied = m_buf_occupied;
// Note: sanitize_json_buffer may reallocate m_buf and will update m_buf_size by reference if
// reallocation is needed. This keeps m_buf_size in sync with the actual allocated buffer size.
auto const result = StringUtils::sanitize_json_buffer(
m_buf,
m_buf_size,
m_buf_occupied,
simdjson::SIMDJSON_PADDING
);
m_buf_occupied = result.new_buf_occupied;
m_sanitization_bytes_added += m_buf_occupied - old_buf_occupied;

if (result.sanitized_char_counts.empty()) {
return false;
}

size_t total_sanitized = 0;
std::string char_details;
for (auto const& [ch, count] : result.sanitized_char_counts) {
if (false == char_details.empty()) {
char_details += ", ";
}
char_details += fmt::format("0x{:02x} ({})", static_cast<unsigned char>(ch), count);
total_sanitized += count;
}
SPDLOG_WARN(
"Escaped {} control character(s) in JSON{}: [{}]. Buffer expanded by {} bytes "
"({} -> {}).",
total_sanitized,
m_path.empty() ? "" : fmt::format(" in file '{}'", m_path),
char_details,
static_cast<int64_t>(m_buf_occupied) - static_cast<int64_t>(old_buf_occupied),
old_buf_occupied,
m_buf_occupied
);
return true;
}

bool JsonFileIterator::reinitialize_document_stream() {
auto error = m_parser.iterate_many(m_buf, m_buf_occupied, m_buf_occupied).get(m_stream);
if (error) {
m_error_code = error;
return false;
}
m_doc_it = m_stream.begin();
m_first_doc_in_buffer = true;
m_next_document_position = 0;
return true;
}

bool JsonFileIterator::get_json(simdjson::ondemand::document_stream::iterator& it) {
if (false == m_first_read) {
++m_doc_it;
Expand All @@ -118,6 +229,22 @@ bool JsonFileIterator::get_json(simdjson::ondemand::document_stream::iterator& i
return true;
} else if (m_doc_it.error() == simdjson::error_code::UTF8_ERROR) {
maybe_utf8_edge_case = true;
} else if (m_sanitize_invalid_json
&& m_doc_it.error() == simdjson::error_code::UNESCAPED_CHARS)
{
// Unescaped control characters detected during document iteration. Sanitize the
// buffer and re-setup the document stream to restart from the beginning.
if (false == sanitize_control_chars_and_log()) {
// Sanitization made no changes - report the original error to avoid infinite
// loop
m_error_code = m_doc_it.error();
return false;
}

if (false == reinitialize_document_stream()) {
return false;
}
continue;
} else {
m_error_code = m_doc_it.error();
return false;
Expand All @@ -137,8 +264,22 @@ bool JsonFileIterator::get_json(simdjson::ondemand::document_stream::iterator& i
// If we hit a UTF-8 error and either we have reached eof or the buffer occupancy is
// greater than the maximum document size we assume that the UTF-8 error must have been
// in the middle of the stream. Note: it is possible that the UTF-8 error is at the end
// of the stream and that this is actually a truncation error. Unfortunately the only
// way to check is to parse it ourselves, so we rely on this heuristic for now.
if (m_sanitize_invalid_json) {
// Sanitize invalid UTF-8 sequences and retry
if (false == sanitize_invalid_utf8_and_log()) {
// Sanitization made no changes - report the original error to avoid infinite
// loop
m_error_code = simdjson::error_code::UTF8_ERROR;
return false;
}

if (false == reinitialize_document_stream()) {
return false;
}
continue;
}
m_error_code = simdjson::error_code::UTF8_ERROR;
return false;
} else if (maybe_utf8_edge_case) {
Expand All @@ -151,10 +292,12 @@ bool JsonFileIterator::get_json(simdjson::ondemand::document_stream::iterator& i
size_t JsonFileIterator::get_num_bytes_consumed() {
    // Bytes added by sanitization are always included, since the sanitized content is what gets
    // compressed.
    size_t const total_bytes{m_bytes_read + m_sanitization_bytes_added};

    // If there are more documents left in the current buffer account for how much of the
    // buffer has been consumed, otherwise report the total so that we capture trailing
    // whitespace.
    if (m_doc_it != m_stream.end()) {
        return total_bytes - (m_buf_occupied - m_next_document_position);
    }
    return total_bytes;
}
} // namespace clp_s
40 changes: 37 additions & 3 deletions components/core/src/clp_s/JsonFileIterator.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef CLP_S_JSONFILEITERATOR_HPP
#define CLP_S_JSONFILEITERATOR_HPP

#include <string>

#include <simdjson.h>

#include "../clp/ReaderInterface.hpp"
Expand All @@ -11,19 +13,23 @@ class JsonFileIterator {
/**
* An iterator over an input stream containing json objects. JSON is parsed
* using simdjson's iterate_many. This allows simdjson to efficiently find
* delimiters between JSON objects, and if enabled parse JSON ahead of time
* in another thread while the JSON is being iterated over.
*
* The buffer grows automatically if there are JSON objects larger than the buffer size.
* The buffer is padded to be SIMDJSON_PADDING bytes larger than the specified size.
*
* @param reader the input stream containing JSON
* @param max_document_size the maximum allowed size of a single document
* @param sanitize_invalid_json whether to sanitize invalid JSON (control chars, invalid UTF-8)
* instead of reporting it as a parsing error
* @param path optional path to the file being read (used for logging)
* @param buf_size the initial buffer size
*/
explicit JsonFileIterator(
clp::ReaderInterface& reader,
size_t max_document_size,
bool sanitize_invalid_json,
std::string path = {},
size_t buf_size = 1024 * 1024 /* 1 MiB default */
);
~JsonFileIterator();
Expand All @@ -41,9 +47,11 @@ class JsonFileIterator {
[[nodiscard]] size_t truncated_bytes() const { return m_truncated_bytes; }

/**
 * Reports the size of the data handled so far, counted after sanitization: bytes inserted by
 * sanitization are added to the raw byte count.
 * @return total number of bytes read from the file, plus any bytes added by sanitization
 */
[[nodiscard]] size_t get_num_bytes_read() const {
    return m_bytes_read + m_sanitization_bytes_added;
}

/**
* Note: this method can not be const because checking if a simdjson iterator is at the end
Expand Down Expand Up @@ -73,14 +81,40 @@ class JsonFileIterator {
*/
[[nodiscard]] size_t skip_whitespace_and_get_truncated_bytes();

/**
* Sanitizes invalid UTF-8 sequences in the buffer by replacing them with U+FFFD, updates
* m_buf_occupied to the sanitized length, adds the resulting size change to
* m_sanitization_bytes_added, and logs a warning (mentioning the file path when one was
* provided) if changes were made.
* @return true if sanitization made changes, false otherwise
* @note May reallocate m_buf if buffer expansion is needed. m_buf_size is updated accordingly.
*/
[[nodiscard]] bool sanitize_invalid_utf8_and_log();

/**
* Sanitizes unescaped control characters in the buffer by escaping them to \\u00XX format,
* updates m_buf_occupied to the sanitized length, adds the resulting size change to
* m_sanitization_bytes_added, and logs a warning listing each escaped character with its count
* (mentioning the file path when one was provided) if changes were made.
* @return true if sanitization made changes, false otherwise
* @note May reallocate m_buf if buffer expansion is needed. m_buf_size is updated accordingly.
*/
[[nodiscard]] bool sanitize_control_chars_and_log();

/**
* Reinitializes the document stream after buffer sanitization.
* Rebuilds m_stream over the first m_buf_occupied bytes of m_buf and resets the iteration state
* (m_doc_it, m_first_doc_in_buffer, m_next_document_position) to start from the beginning of
* the buffer.
* @return true on success, false if iteration setup fails (m_error_code is set on failure)
*/
[[nodiscard]] bool reinitialize_document_stream();

size_t m_truncated_bytes{0};
size_t m_next_document_position{0};
size_t m_bytes_read{0};
size_t m_sanitization_bytes_added{0};
size_t m_buf_size{0};
size_t m_buf_occupied{0};
size_t m_max_document_size{0};
char* m_buf{nullptr};
clp::ReaderInterface& m_reader;
std::string m_path;
bool m_sanitize_invalid_json{false};
simdjson::ondemand::parser m_parser;
simdjson::ondemand::document_stream m_stream;
bool m_eof{false};
Expand Down
14 changes: 8 additions & 6 deletions components/core/src/clp_s/JsonParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ JsonParser::JsonParser(JsonParserOption const& option)
m_structurize_arrays(option.structurize_arrays),
m_record_log_order(option.record_log_order),
m_retain_float_format(option.retain_float_format),
m_sanitize_invalid_json(option.sanitize_invalid_json),
m_input_paths(option.input_paths),
m_network_auth(option.network_auth) {
if (false == m_timestamp_key.empty()) {
Expand Down Expand Up @@ -223,7 +224,7 @@ void JsonParser::parse_obj_in_array(simdjson::ondemand::object line, int32_t par
}

cur_field = *object_it_stack.top();
cur_key = cur_field.unescaped_key(true);
cur_key = cur_field.unescaped_key(m_sanitize_invalid_json);
cur_value = cur_field.value();

switch (cur_value.type()) {
Expand Down Expand Up @@ -302,7 +303,7 @@ void JsonParser::parse_obj_in_array(simdjson::ondemand::object line, int32_t par
break;
}
case simdjson::ondemand::json_type::string: {
std::string_view value = cur_value.get_string(true);
std::string_view value = cur_value.get_string(m_sanitize_invalid_json);
if (value.find(' ') != std::string::npos) {
node_id = m_archive_writer
->add_node(node_id_stack.top(), NodeType::ClpString, cur_key);
Expand Down Expand Up @@ -407,7 +408,7 @@ void JsonParser::parse_array(simdjson::ondemand::array array, int32_t parent_nod
break;
}
case simdjson::ondemand::json_type::string: {
std::string_view value = cur_value.get_string(true);
std::string_view value = cur_value.get_string(m_sanitize_invalid_json);
if (value.find(' ') != std::string::npos) {
node_id = m_archive_writer->add_node(parent_node_id, NodeType::ClpString, "");
} else {
Expand Down Expand Up @@ -452,7 +453,7 @@ void JsonParser::parse_line(
do {
if (false == object_stack.empty()) {
cur_field = *object_it_stack.top();
cur_key = cur_field.unescaped_key(true);
cur_key = cur_field.unescaped_key(m_sanitize_invalid_json);
line = cur_field.value();
}

Expand Down Expand Up @@ -555,7 +556,7 @@ void JsonParser::parse_line(
break;
}
case simdjson::ondemand::json_type::string: {
std::string_view value = line.get_string(true);
std::string_view value = line.get_string(m_sanitize_invalid_json);
auto const matches_timestamp
= m_archive_writer->matches_timestamp(node_id_stack.top(), cur_key);
if (matches_timestamp) {
Expand Down Expand Up @@ -668,7 +669,8 @@ auto JsonParser::ingest_json(
Path const& path,
std::string const& archive_creator_id
) -> bool {
JsonFileIterator json_file_iterator(*reader, m_max_document_size);
JsonFileIterator
json_file_iterator(*reader, m_max_document_size, m_sanitize_invalid_json, path.path);
if (simdjson::error_code::SUCCESS != json_file_iterator.get_error()) {
SPDLOG_ERROR(
"Encountered error - {} - while trying to parse {} after parsing 0 bytes",
Expand Down
Loading
Loading