diff --git a/CMakeLists.txt b/CMakeLists.txt index 6057fddf8..8b63200ce 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -209,6 +209,35 @@ install(TARGETS ${PROJECT_NAME} EXPORT ${PROJECT_NAME}_targets DESTINATION "${OSI_INSTALL_LIB_DIR}" COMPONENT lib) + +##### TODO: WORK IN PROGRESS START + +# mcap support +set(TRACEFILE_MCAP_SOURCES mcap/osi_tracefile_mcap_reader.cpp mcap/osi_tracefile_mcap_writer.cpp) +add_library(${PROJECT_NAME}_mcap_static STATIC ${TRACEFILE_MCAP_SOURCES} mcap/include/osi_tracefile_mcap_reader.h mcap/include/osi_tracefile_mcap_writer.h) +add_library(${PROJECT_NAME}::${PROJECT_NAME}_mcap_static ALIAS ${PROJECT_NAME}_mcap_static) +add_dependencies(${PROJECT_NAME}_mcap_static ${PROJECT_NAME}) +target_include_directories(${PROJECT_NAME}_mcap_static + PUBLIC + $ + $ + PRIVATE + mcap/mcap/include +) +set_target_properties(${PROJECT_NAME}_mcap_static PROPERTIES CXX_STANDARD 17) + +find_package(PkgConfig REQUIRED) +pkg_check_modules(lz4 REQUIRED IMPORTED_TARGET liblz4) +pkg_check_modules(ZSTD REQUIRED libzstd) +target_link_libraries(${PROJECT_NAME}_mcap_static PUBLIC ${lz4_LIBRARIES} ${ZSTD_LIBRARIES} ${PROJECT_NAME}_pic) + +install(TARGETS ${PROJECT_NAME}_mcap_static + EXPORT ${PROJECT_NAME}_targets + ARCHIVE DESTINATION "${OSI_INSTALL_LIB_DIR}" COMPONENT lib + INCLUDES DESTINATION "${OSI_INSTALL_INCLUDE_DIR}") + +##### TODO: WORK IN PROGRESS END + # Copy proto headers to where they are expected by the package config file add_custom_command( TARGET ${PROJECT_NAME} POST_BUILD @@ -246,7 +275,7 @@ install(FILES COMPONENT dev) # Header files -install(FILES ${PROTO_HEADERS} ${FLAT_HEADERS} +install(FILES ${PROTO_HEADERS} ${FLAT_HEADERS} mcap/include/osi_tracefile_mcap_reader.h mcap/include/osi_tracefile_mcap_writer.h DESTINATION "${OSI_INSTALL_INCLUDE_DIR}") # Install the export set for use with the install-tree diff --git a/mcap/README.MD b/mcap/README.MD new file mode 100644 index 000000000..0d595ff37 --- /dev/null +++ b/mcap/README.MD @@ -0,0 +1,6 @@ +# Mcap Tracefile Support +This folder contains the official upstream MCAP c++ library as well as convince classes for c++ OSI trace file writer and reader support. + +# Work in Progress +- This is a work in progress and just a proof of concept. +- Major cleaning and refactoring is needed. diff --git a/mcap/include/osi_tracefile_mcap_reader.h b/mcap/include/osi_tracefile_mcap_reader.h new file mode 100644 index 000000000..8f1c20f26 --- /dev/null +++ b/mcap/include/osi_tracefile_mcap_reader.h @@ -0,0 +1,24 @@ +#ifndef MCAP_TRACEFILE_H +#define MCAP_TRACEFILE_H +#include <../mcap/include/mcap/mcap.hpp> +#include + +namespace osi3 +{ +class TracefileMcapReader { +public: + explicit TracefileMcapReader(std::filesystem::path mcap_file); + ~TracefileMcapReader() { reader_.close(); }; + bool has_next() const; + std::pair read_next(); +private: + mcap::McapReader reader_; + std::unique_ptr message_view_ = nullptr; + std::optional message_it_; +}; + + +} // namespace osi3 + + +#endif //MCAP_TRACEFILE_H diff --git a/mcap/include/osi_tracefile_mcap_writer.h b/mcap/include/osi_tracefile_mcap_writer.h new file mode 100644 index 000000000..3614fe3f9 --- /dev/null +++ b/mcap/include/osi_tracefile_mcap_writer.h @@ -0,0 +1,27 @@ +#ifndef OSI_TRACEFILE_MCAP_WRITER_H +#define OSI_TRACEFILE_MCAP_WRITER_H +#include <../mcap/include/mcap/mcap.hpp> +#include + +namespace osi3 +{ +class TracefileMcapWriter { +public: + explicit TracefileMcapWriter(std::filesystem::path mcap_file); + ~TracefileMcapWriter() { writer_.close(); }; + template void write(T top_level_message); + void close() { writer_.close(); } + +private: + void addOSITopLevelMessagesAsSchemata(); + void addCommonMetadata(); + mcap::McapWriter writer_; + std::map name_to_schema_ids_; + +}; + + +} // namespace osi3 + + +#endif //OSI_TRACEFILE_MCAP_WRITER_H diff --git a/mcap/mcap/LICENSE b/mcap/mcap/LICENSE new file mode 100644 index 000000000..0aa593ea8 --- /dev/null +++ b/mcap/mcap/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) Foxglove Technologies Inc + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/mcap/mcap/conanfile.py b/mcap/mcap/conanfile.py new file mode 100644 index 000000000..dc9bc2715 --- /dev/null +++ b/mcap/mcap/conanfile.py @@ -0,0 +1,28 @@ +from conans import ConanFile, tools + + +class McapConan(ConanFile): + name = "mcap" + version = "1.4.0" + url = "https://github.com/foxglove/mcap" + homepage = "https://github.com/foxglove/mcap" + description = "A C++ implementation of the MCAP file format" + license = "MIT" + topics = ("mcap", "serialization", "deserialization", "recording") + + settings = ("os", "compiler", "build_type", "arch") + requires = ("lz4/1.9.4", "zstd/1.5.2") + generators = "cmake" + + def validate(self): + tools.check_min_cppstd(self, "17") + + def configure(self): + pass + + def package(self): + self.copy(pattern="LICENSE", dst="licenses") + self.copy("include/*") + + def package_id(self): + self.info.header_only() diff --git a/mcap/mcap/include/mcap/crc32.hpp b/mcap/mcap/include/mcap/crc32.hpp new file mode 100644 index 000000000..7eff3f7cb --- /dev/null +++ b/mcap/mcap/include/mcap/crc32.hpp @@ -0,0 +1,108 @@ +#include +#include +#include + +namespace mcap::internal { + +/** + * Compute CRC32 lookup tables as described at: + * https://github.com/komrad36/CRC#option-6-1-byte-tabular + * + * An iteration of CRC computation can be performed on 8 bits of input at once. By pre-computing a + * table of the values of CRC(?) for all 2^8 = 256 possible byte values, during the final + * computation we can replace a loop over 8 bits with a single lookup in the table. + * + * For further speedup, we can also pre-compute the values of CRC(?0) for all possible bytes when a + * zero byte is appended. Then we can process two bytes of input at once by computing CRC(AB) = + * CRC(A0) ^ CRC(B), using one lookup in the CRC(?0) table and one lookup in the CRC(?) table. + * + * The same technique applies for any number of bytes to be processed at once, although the speed + * improvements diminish. + * + * @param Polynomial The binary representation of the polynomial to use (reversed, i.e. most + * significant bit represents x^0). + * @param NumTables The number of bytes of input that will be processed at once. + */ +template +struct CRC32Table { +private: + std::array table = {}; + +public: + constexpr CRC32Table() { + for (uint32_t i = 0; i < 256; i++) { + uint32_t r = i; + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + r = ((r & 1) * Polynomial) ^ (r >> 1); + table[i] = r; + } + for (size_t i = 256; i < table.size(); i++) { + uint32_t value = table[i - 256]; + table[i] = table[value & 0xff] ^ (value >> 8); + } + } + + constexpr uint32_t operator[](size_t index) const { + return table[index]; + } +}; + +inline uint32_t getUint32LE(const std::byte* data) { + return (uint32_t(data[0]) << 0) | (uint32_t(data[1]) << 8) | (uint32_t(data[2]) << 16) | + (uint32_t(data[3]) << 24); +} + +static constexpr CRC32Table<0xedb88320, 8> CRC32_TABLE; + +/** + * Initialize a CRC32 to all 1 bits. + */ +static constexpr uint32_t CRC32_INIT = 0xffffffff; + +/** + * Update a streaming CRC32 calculation. + * + * For performance, this implementation processes the data 8 bytes at a time, using the algorithm + * presented at: https://github.com/komrad36/CRC#option-9-8-byte-tabular + */ +inline uint32_t crc32Update(const uint32_t prev, const std::byte* const data, const size_t length) { + // Process bytes one by one until we reach the proper alignment. + uint32_t r = prev; + size_t offset = 0; + for (; (uintptr_t(data + offset) & alignof(uint32_t)) != 0 && offset < length; offset++) { + r = CRC32_TABLE[(r ^ uint8_t(data[offset])) & 0xff] ^ (r >> 8); + } + if (offset == length) { + return r; + } + + // Process 8 bytes (2 uint32s) at a time. + size_t remainingBytes = length - offset; + for (; remainingBytes >= 8; offset += 8, remainingBytes -= 8) { + r ^= getUint32LE(data + offset); + uint32_t r2 = getUint32LE(data + offset + 4); + r = CRC32_TABLE[0 * 256 + ((r2 >> 24) & 0xff)] ^ CRC32_TABLE[1 * 256 + ((r2 >> 16) & 0xff)] ^ + CRC32_TABLE[2 * 256 + ((r2 >> 8) & 0xff)] ^ CRC32_TABLE[3 * 256 + ((r2 >> 0) & 0xff)] ^ + CRC32_TABLE[4 * 256 + ((r >> 24) & 0xff)] ^ CRC32_TABLE[5 * 256 + ((r >> 16) & 0xff)] ^ + CRC32_TABLE[6 * 256 + ((r >> 8) & 0xff)] ^ CRC32_TABLE[7 * 256 + ((r >> 0) & 0xff)]; + } + + // Process any remaining bytes one by one. + for (; offset < length; offset++) { + r = CRC32_TABLE[(r ^ uint8_t(data[offset])) & 0xff] ^ (r >> 8); + } + return r; +} + +/** Finalize a CRC32 by inverting the output value. */ +inline uint32_t crc32Final(uint32_t crc) { + return crc ^ 0xffffffff; +} + +} // namespace mcap::internal diff --git a/mcap/mcap/include/mcap/errors.hpp b/mcap/mcap/include/mcap/errors.hpp new file mode 100644 index 000000000..c25c39a0a --- /dev/null +++ b/mcap/mcap/include/mcap/errors.hpp @@ -0,0 +1,120 @@ +#pragma once + +#include + +namespace mcap { + +/** + * @brief Status codes for MCAP readers and writers. + */ +enum class StatusCode { + Success = 0, + NotOpen, + InvalidSchemaId, + InvalidChannelId, + FileTooSmall, + ReadFailed, + MagicMismatch, + InvalidFile, + InvalidRecord, + InvalidOpCode, + InvalidChunkOffset, + InvalidFooter, + DecompressionFailed, + DecompressionSizeMismatch, + UnrecognizedCompression, + OpenFailed, + MissingStatistics, + InvalidMessageReadOptions, + NoMessageIndexesAvailable, + UnsupportedCompression, +}; + +/** + * @brief Wraps a status code and string message carrying additional context. + */ +struct [[nodiscard]] Status { + StatusCode code; + std::string message; + + Status() + : code(StatusCode::Success) {} + + Status(StatusCode code) + : code(code) { + switch (code) { + case StatusCode::Success: + break; + case StatusCode::NotOpen: + message = "not open"; + break; + case StatusCode::InvalidSchemaId: + message = "invalid schema id"; + break; + case StatusCode::InvalidChannelId: + message = "invalid channel id"; + break; + case StatusCode::FileTooSmall: + message = "file too small"; + break; + case StatusCode::ReadFailed: + message = "read failed"; + break; + case StatusCode::MagicMismatch: + message = "magic mismatch"; + break; + case StatusCode::InvalidFile: + message = "invalid file"; + break; + case StatusCode::InvalidRecord: + message = "invalid record"; + break; + case StatusCode::InvalidOpCode: + message = "invalid opcode"; + break; + case StatusCode::InvalidChunkOffset: + message = "invalid chunk offset"; + break; + case StatusCode::InvalidFooter: + message = "invalid footer"; + break; + case StatusCode::DecompressionFailed: + message = "decompression failed"; + break; + case StatusCode::DecompressionSizeMismatch: + message = "decompression size mismatch"; + break; + case StatusCode::UnrecognizedCompression: + message = "unrecognized compression"; + break; + case StatusCode::OpenFailed: + message = "open failed"; + break; + case StatusCode::MissingStatistics: + message = "missing statistics"; + break; + case StatusCode::InvalidMessageReadOptions: + message = "message read options conflict"; + break; + case StatusCode::NoMessageIndexesAvailable: + message = "file has no message indices"; + break; + case StatusCode::UnsupportedCompression: + message = "unsupported compression"; + break; + default: + message = "unknown"; + break; + } + } + + Status(StatusCode code, const std::string& message) + : code(code) + , message(message) {} + + bool ok() const { + return code == StatusCode::Success; + } +}; + +} // namespace mcap diff --git a/mcap/mcap/include/mcap/internal.hpp b/mcap/mcap/include/mcap/internal.hpp new file mode 100644 index 000000000..4faedd0b2 --- /dev/null +++ b/mcap/mcap/include/mcap/internal.hpp @@ -0,0 +1,189 @@ +#pragma once + +#include "types.hpp" +#include + +// Do not compile on systems with non-8-bit bytes +static_assert(std::numeric_limits::digits == 8); + +namespace mcap { + +namespace internal { + +constexpr uint64_t MinHeaderLength = /* magic bytes */ sizeof(Magic) + + /* opcode */ 1 + + /* record length */ 8 + + /* profile length */ 4 + + /* library length */ 4; +constexpr uint64_t FooterLength = /* opcode */ 1 + + /* record length */ 8 + + /* summary start */ 8 + + /* summary offset start */ 8 + + /* summary crc */ 4 + + /* magic bytes */ sizeof(Magic); + +inline std::string ToHex(uint8_t byte) { + std::string result{2, '\0'}; + result[0] = "0123456789ABCDEF"[(uint8_t(byte) >> 4) & 0x0F]; + result[1] = "0123456789ABCDEF"[uint8_t(byte) & 0x0F]; + return result; +} +inline std::string ToHex(std::byte byte) { + return ToHex(uint8_t(byte)); +} + +inline std::string to_string(const std::string& arg) { + return arg; +} +inline std::string to_string(std::string_view arg) { + return std::string(arg); +} +inline std::string to_string(const char* arg) { + return std::string(arg); +} +template +[[nodiscard]] inline std::string StrCat(T&&... args) { + using mcap::internal::to_string; + using std::to_string; + return ("" + ... + to_string(std::forward(args))); +} + +inline uint32_t KeyValueMapSize(const KeyValueMap& map) { + size_t size = 0; + for (const auto& [key, value] : map) { + size += 4 + key.size() + 4 + value.size(); + } + return (uint32_t)(size); +} + +inline const std::string CompressionString(Compression compression) { + switch (compression) { + case Compression::None: + default: + return std::string{}; + case Compression::Lz4: + return "lz4"; + case Compression::Zstd: + return "zstd"; + } +} + +inline uint16_t ParseUint16(const std::byte* data) { + return uint16_t(data[0]) | (uint16_t(data[1]) << 8); +} + +inline uint32_t ParseUint32(const std::byte* data) { + return uint32_t(data[0]) | (uint32_t(data[1]) << 8) | (uint32_t(data[2]) << 16) | + (uint32_t(data[3]) << 24); +} + +inline Status ParseUint32(const std::byte* data, uint64_t maxSize, uint32_t* output) { + if (maxSize < 4) { + const auto msg = StrCat("cannot read uint32 from ", maxSize, " bytes"); + return Status{StatusCode::InvalidRecord, msg}; + } + *output = ParseUint32(data); + return StatusCode::Success; +} + +inline uint64_t ParseUint64(const std::byte* data) { + return uint64_t(data[0]) | (uint64_t(data[1]) << 8) | (uint64_t(data[2]) << 16) | + (uint64_t(data[3]) << 24) | (uint64_t(data[4]) << 32) | (uint64_t(data[5]) << 40) | + (uint64_t(data[6]) << 48) | (uint64_t(data[7]) << 56); +} + +inline Status ParseUint64(const std::byte* data, uint64_t maxSize, uint64_t* output) { + if (maxSize < 8) { + const auto msg = StrCat("cannot read uint64 from ", maxSize, " bytes"); + return Status{StatusCode::InvalidRecord, msg}; + } + *output = ParseUint64(data); + return StatusCode::Success; +} + +inline Status ParseStringView(const std::byte* data, uint64_t maxSize, std::string_view* output) { + uint32_t size = 0; + if (auto status = ParseUint32(data, maxSize, &size); !status.ok()) { + const auto msg = StrCat("cannot read string size: ", status.message); + return Status{StatusCode::InvalidRecord, msg}; + } + if (uint64_t(size) > (maxSize - 4)) { + const auto msg = StrCat("string size ", size, " exceeds remaining bytes ", (maxSize - 4)); + return Status(StatusCode::InvalidRecord, msg); + } + *output = std::string_view(reinterpret_cast(data + 4), size); + return StatusCode::Success; +} + +inline Status ParseString(const std::byte* data, uint64_t maxSize, std::string* output) { + uint32_t size = 0; + if (auto status = ParseUint32(data, maxSize, &size); !status.ok()) { + return status; + } + if (uint64_t(size) > (maxSize - 4)) { + const auto msg = StrCat("string size ", size, " exceeds remaining bytes ", (maxSize - 4)); + return Status(StatusCode::InvalidRecord, msg); + } + *output = std::string(reinterpret_cast(data + 4), size); + return StatusCode::Success; +} + +inline Status ParseByteArray(const std::byte* data, uint64_t maxSize, ByteArray* output) { + uint32_t size = 0; + if (auto status = ParseUint32(data, maxSize, &size); !status.ok()) { + return status; + } + if (uint64_t(size) > (maxSize - 4)) { + const auto msg = StrCat("byte array size ", size, " exceeds remaining bytes ", (maxSize - 4)); + return Status(StatusCode::InvalidRecord, msg); + } + output->resize(size); + std::memcpy(output->data(), data + 4, size); + return StatusCode::Success; +} + +inline Status ParseKeyValueMap(const std::byte* data, uint64_t maxSize, KeyValueMap* output) { + uint32_t sizeInBytes = 0; + if (auto status = ParseUint32(data, maxSize, &sizeInBytes); !status.ok()) { + return status; + } + if (sizeInBytes > (maxSize - 4)) { + const auto msg = + StrCat("key-value map size ", sizeInBytes, " exceeds remaining bytes ", (maxSize - 4)); + return Status(StatusCode::InvalidRecord, msg); + } + + // Account for the byte size prefix in sizeInBytes to make the bounds checking + // below simpler + sizeInBytes += 4; + + output->clear(); + uint64_t pos = 4; + while (pos < sizeInBytes) { + std::string_view key; + if (auto status = ParseStringView(data + pos, sizeInBytes - pos, &key); !status.ok()) { + const auto msg = StrCat("cannot read key-value map key at pos ", pos, ": ", status.message); + return Status{StatusCode::InvalidRecord, msg}; + } + pos += 4 + key.size(); + std::string_view value; + if (auto status = ParseStringView(data + pos, sizeInBytes - pos, &value); !status.ok()) { + const auto msg = StrCat("cannot read key-value map value for key \"", key, "\" at pos ", pos, + ": ", status.message); + return Status{StatusCode::InvalidRecord, msg}; + } + pos += 4 + value.size(); + output->emplace(key, value); + } + return StatusCode::Success; +} + +inline std::string MagicToHex(const std::byte* data) { + return internal::ToHex(data[0]) + internal::ToHex(data[1]) + internal::ToHex(data[2]) + + internal::ToHex(data[3]) + internal::ToHex(data[4]) + internal::ToHex(data[5]) + + internal::ToHex(data[6]) + internal::ToHex(data[7]); +} + +} // namespace internal + +} // namespace mcap diff --git a/mcap/mcap/include/mcap/intervaltree.hpp b/mcap/mcap/include/mcap/intervaltree.hpp new file mode 100644 index 000000000..b9c2b6ea4 --- /dev/null +++ b/mcap/mcap/include/mcap/intervaltree.hpp @@ -0,0 +1,303 @@ +// Adapted from + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace mcap::internal { + +template +class Interval { +public: + Scalar start; + Scalar stop; + Value value; + Interval(const Scalar& s, const Scalar& e, const Value& v) + : start(std::min(s, e)) + , stop(std::max(s, e)) + , value(v) {} +}; + +template +Value intervalStart(const Interval& i) { + return i.start; +} + +template +Value intervalStop(const Interval& i) { + return i.stop; +} + +template +std::ostream& operator<<(std::ostream& out, const Interval& i) { + out << "Interval(" << i.start << ", " << i.stop << "): " << i.value; + return out; +} + +template +class IntervalTree { +public: + using interval = Interval; + using interval_vector = std::vector; + + struct IntervalStartCmp { + bool operator()(const interval& a, const interval& b) { + return a.start < b.start; + } + }; + + struct IntervalStopCmp { + bool operator()(const interval& a, const interval& b) { + return a.stop < b.stop; + } + }; + + IntervalTree() + : left(nullptr) + , right(nullptr) + , center(Scalar(0)) {} + + ~IntervalTree() = default; + + std::unique_ptr clone() const { + return std::unique_ptr(new IntervalTree(*this)); + } + + IntervalTree(const IntervalTree& other) + : intervals(other.intervals) + , left(other.left ? other.left->clone() : nullptr) + , right(other.right ? other.right->clone() : nullptr) + , center(other.center) {} + + IntervalTree& operator=(IntervalTree&&) = default; + IntervalTree(IntervalTree&&) = default; + + IntervalTree& operator=(const IntervalTree& other) { + center = other.center; + intervals = other.intervals; + left = other.left ? other.left->clone() : nullptr; + right = other.right ? other.right->clone() : nullptr; + return *this; + } + + IntervalTree(interval_vector&& ivals, std::size_t depth = 16, std::size_t minbucket = 64, + std::size_t maxbucket = 512, Scalar leftextent = 0, Scalar rightextent = 0) + : left(nullptr) + , right(nullptr) { + --depth; + const auto minmaxStop = std::minmax_element(ivals.begin(), ivals.end(), IntervalStopCmp()); + const auto minmaxStart = std::minmax_element(ivals.begin(), ivals.end(), IntervalStartCmp()); + if (!ivals.empty()) { + center = (minmaxStart.first->start + minmaxStop.second->stop) / 2; + } + if (leftextent == 0 && rightextent == 0) { + // sort intervals by start + std::sort(ivals.begin(), ivals.end(), IntervalStartCmp()); + } else { + assert(std::is_sorted(ivals.begin(), ivals.end(), IntervalStartCmp())); + } + if (depth == 0 || (ivals.size() < minbucket && ivals.size() < maxbucket)) { + std::sort(ivals.begin(), ivals.end(), IntervalStartCmp()); + intervals = std::move(ivals); + assert(is_valid().first); + return; + } else { + Scalar leftp = 0; + Scalar rightp = 0; + + if (leftextent || rightextent) { + leftp = leftextent; + rightp = rightextent; + } else { + leftp = ivals.front().start; + rightp = std::max_element(ivals.begin(), ivals.end(), IntervalStopCmp())->stop; + } + + interval_vector lefts; + interval_vector rights; + + for (typename interval_vector::const_iterator i = ivals.begin(); i != ivals.end(); ++i) { + const interval& interval = *i; + if (interval.stop < center) { + lefts.push_back(interval); + } else if (interval.start > center) { + rights.push_back(interval); + } else { + assert(interval.start <= center); + assert(center <= interval.stop); + intervals.push_back(interval); + } + } + + if (!lefts.empty()) { + left.reset(new IntervalTree(std::move(lefts), depth, minbucket, maxbucket, leftp, center)); + } + if (!rights.empty()) { + right.reset( + new IntervalTree(std::move(rights), depth, minbucket, maxbucket, center, rightp)); + } + } + assert(is_valid().first); + } + + // Call f on all intervals near the range [start, stop]: + template + void visit_near(const Scalar& start, const Scalar& stop, UnaryFunction f) const { + if (!intervals.empty() && !(stop < intervals.front().start)) { + for (auto& i : intervals) { + f(i); + } + } + if (left && start <= center) { + left->visit_near(start, stop, f); + } + if (right && stop >= center) { + right->visit_near(start, stop, f); + } + } + + // Call f on all intervals crossing pos + template + void visit_overlapping(const Scalar& pos, UnaryFunction f) const { + visit_overlapping(pos, pos, f); + } + + // Call f on all intervals overlapping [start, stop] + template + void visit_overlapping(const Scalar& start, const Scalar& stop, UnaryFunction f) const { + auto filterF = [&](const interval& interval) { + if (interval.stop >= start && interval.start <= stop) { + // Only apply f if overlapping + f(interval); + } + }; + visit_near(start, stop, filterF); + } + + // Call f on all intervals contained within [start, stop] + template + void visit_contained(const Scalar& start, const Scalar& stop, UnaryFunction f) const { + auto filterF = [&](const interval& interval) { + if (start <= interval.start && interval.stop <= stop) { + f(interval); + } + }; + visit_near(start, stop, filterF); + } + + interval_vector find_overlapping(const Scalar& start, const Scalar& stop) const { + interval_vector result; + visit_overlapping(start, stop, [&](const interval& interval) { + result.emplace_back(interval); + }); + return result; + } + + interval_vector find_contained(const Scalar& start, const Scalar& stop) const { + interval_vector result; + visit_contained(start, stop, [&](const interval& interval) { + result.push_back(interval); + }); + return result; + } + + bool empty() const { + if (left && !left->empty()) { + return false; + } + if (!intervals.empty()) { + return false; + } + if (right && !right->empty()) { + return false; + } + return true; + } + + template + void visit_all(UnaryFunction f) const { + if (left) { + left->visit_all(f); + } + std::for_each(intervals.begin(), intervals.end(), f); + if (right) { + right->visit_all(f); + } + } + + std::pair extent() const { + struct Extent { + std::pair x{std::numeric_limits::max(), + std::numeric_limits::min()}; + void operator()(const interval& interval) { + x.first = std::min(x.first, interval.start); + x.second = std::max(x.second, interval.stop); + } + }; + Extent extent; + + visit_all([&](const interval& interval) { + extent(interval); + }); + return extent.x; + } + + // Check all constraints. + // If first is false, second is invalid. + std::pair> is_valid() const { + const auto minmaxStop = + std::minmax_element(intervals.begin(), intervals.end(), IntervalStopCmp()); + const auto minmaxStart = + std::minmax_element(intervals.begin(), intervals.end(), IntervalStartCmp()); + + std::pair> result = { + true, {std::numeric_limits::max(), std::numeric_limits::min()}}; + if (!intervals.empty()) { + result.second.first = std::min(result.second.first, minmaxStart.first->start); + result.second.second = std::min(result.second.second, minmaxStop.second->stop); + } + if (left) { + auto valid = left->is_valid(); + result.first &= valid.first; + result.second.first = std::min(result.second.first, valid.second.first); + result.second.second = std::min(result.second.second, valid.second.second); + if (!result.first) { + return result; + } + if (valid.second.second >= center) { + result.first = false; + return result; + } + } + if (right) { + auto valid = right->is_valid(); + result.first &= valid.first; + result.second.first = std::min(result.second.first, valid.second.first); + result.second.second = std::min(result.second.second, valid.second.second); + if (!result.first) { + return result; + } + if (valid.second.first <= center) { + result.first = false; + return result; + } + } + if (!std::is_sorted(intervals.begin(), intervals.end(), IntervalStartCmp())) { + result.first = false; + } + return result; + } + +private: + interval_vector intervals; + std::unique_ptr left; + std::unique_ptr right; + Scalar center; +}; + +} // namespace mcap::internal diff --git a/mcap/mcap/include/mcap/mcap.hpp b/mcap/mcap/include/mcap/mcap.hpp new file mode 100644 index 000000000..71f479cd8 --- /dev/null +++ b/mcap/mcap/include/mcap/mcap.hpp @@ -0,0 +1,4 @@ +#pragma once + +#include "reader.hpp" +#include "writer.hpp" diff --git a/mcap/mcap/include/mcap/read_job_queue.hpp b/mcap/mcap/include/mcap/read_job_queue.hpp new file mode 100644 index 000000000..7faf4468a --- /dev/null +++ b/mcap/mcap/include/mcap/read_job_queue.hpp @@ -0,0 +1,147 @@ +#pragma once + +#include "types.hpp" +#include +#include + +namespace mcap::internal { + +// Helper for writing compile-time exhaustive variant visitors. +template +inline constexpr bool always_false_v = false; + +/** + * @brief A job to read a specific message at offset `offset` from the decompressed chunk + * stored in `chunkReaderIndex`. A timestamp is provided to order this job relative to other jobs. + */ +struct ReadMessageJob { + Timestamp timestamp; + RecordOffset offset; + size_t chunkReaderIndex; +}; + +/** + * @brief A job to decompress the chunk starting at `chunkStartOffset`. The message indices + * starting directly after the chunk record and ending at `messageIndexEndOffset` will be used to + * find specific messages within the chunk. + */ +struct DecompressChunkJob { + Timestamp messageStartTime; + Timestamp messageEndTime; + ByteOffset chunkStartOffset; + ByteOffset messageIndexEndOffset; +}; + +/** + * @brief A union of jobs that an indexed MCAP reader executes. + */ +using ReadJob = std::variant; + +/** + * @brief A priority queue of jobs for an indexed MCAP reader to execute. + */ +struct ReadJobQueue { +private: + bool reverse_ = false; + std::vector heap_; + + /** + * @brief return the timestamp key that should be used to compare jobs. + */ + static Timestamp TimeComparisonKey(const ReadJob& job, bool reverse) { + Timestamp result = 0; + std::visit( + [&](auto&& arg) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + result = arg.timestamp; + } else if constexpr (std::is_same_v) { + if (reverse) { + result = arg.messageEndTime; + } else { + result = arg.messageStartTime; + } + } else { + static_assert(always_false_v, "non-exhaustive visitor!"); + } + }, + job); + return result; + } + static RecordOffset PositionComparisonKey(const ReadJob& job, bool reverse) { + RecordOffset result; + std::visit( + [&](auto&& arg) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + result = arg.offset; + } else if constexpr (std::is_same_v) { + if (reverse) { + result.offset = arg.messageIndexEndOffset; + } else { + result.offset = arg.chunkStartOffset; + } + } else { + static_assert(always_false_v, "non-exhaustive visitor!"); + } + }, + job); + return result; + } + + static bool CompareForward(const ReadJob& a, const ReadJob& b) { + auto aTimestamp = TimeComparisonKey(a, false); + auto bTimestamp = TimeComparisonKey(b, false); + if (aTimestamp == bTimestamp) { + return PositionComparisonKey(a, false) > PositionComparisonKey(b, false); + } + return aTimestamp > bTimestamp; + } + + static bool CompareReverse(const ReadJob& a, const ReadJob& b) { + auto aTimestamp = TimeComparisonKey(a, true); + auto bTimestamp = TimeComparisonKey(b, true); + if (aTimestamp == bTimestamp) { + return PositionComparisonKey(a, true) < PositionComparisonKey(b, true); + } + return aTimestamp < bTimestamp; + } + +public: + explicit ReadJobQueue(bool reverse) + : reverse_(reverse) {} + void push(DecompressChunkJob&& decompressChunkJob) { + heap_.emplace_back(std::move(decompressChunkJob)); + if (!reverse_) { + std::push_heap(heap_.begin(), heap_.end(), CompareForward); + } else { + std::push_heap(heap_.begin(), heap_.end(), CompareReverse); + } + } + + void push(ReadMessageJob&& readMessageJob) { + heap_.emplace_back(std::move(readMessageJob)); + if (!reverse_) { + std::push_heap(heap_.begin(), heap_.end(), CompareForward); + } else { + std::push_heap(heap_.begin(), heap_.end(), CompareReverse); + } + } + + ReadJob pop() { + if (!reverse_) { + std::pop_heap(heap_.begin(), heap_.end(), CompareForward); + } else { + std::pop_heap(heap_.begin(), heap_.end(), CompareReverse); + } + auto popped = heap_.back(); + heap_.pop_back(); + return popped; + } + + size_t len() const { + return heap_.size(); + } +}; + +} // namespace mcap::internal diff --git a/mcap/mcap/include/mcap/reader.hpp b/mcap/mcap/include/mcap/reader.hpp new file mode 100644 index 000000000..907f9450e --- /dev/null +++ b/mcap/mcap/include/mcap/reader.hpp @@ -0,0 +1,733 @@ +#pragma once + +#include "intervaltree.hpp" +#include "read_job_queue.hpp" +#include "types.hpp" +#include "visibility.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace mcap { + +enum struct ReadSummaryMethod { + /** + * @brief Parse the Summary section to produce seeking indexes and summary + * statistics. If the Summary section is not present or corrupt, a failure + * Status is returned and the seeking indexes and summary statistics are not + * populated. + */ + NoFallbackScan, + /** + * @brief If the Summary section is missing or incomplete, allow falling back + * to reading the file sequentially to produce seeking indexes and summary + * statistics. + */ + AllowFallbackScan, + /** + * @brief Read the file sequentially from Header to DataEnd to produce seeking + * indexes and summary statistics. + */ + ForceScan, +}; + +/** + * @brief An abstract interface for reading MCAP data. + */ +struct MCAP_PUBLIC IReadable { + virtual ~IReadable() = default; + + /** + * @brief Returns the size of the file in bytes. + * + * @return uint64_t The total number of bytes in the MCAP file. + */ + virtual uint64_t size() const = 0; + /** + * @brief This method is called by MCAP reader classes when they need to read + * a portion of the file. + * + * @param output A pointer to a pointer to the buffer to write to. This method + * is expected to either maintain an internal buffer, read data into it, and + * update this pointer to point at the internal buffer, or update this + * pointer to point directly at the source data if possible. The pointer and + * data must remain valid and unmodified until the next call to read(). + * @param offset The offset in bytes from the beginning of the file to read. + * @param size The number of bytes to read. + * @return uint64_t Number of bytes actually read. This may be less than the + * requested size if the end of the file is reached. The output pointer must + * be readable from `output` to `output + size`. If the read fails, this + * method should return 0. + */ + virtual uint64_t read(std::byte** output, uint64_t offset, uint64_t size) = 0; +}; + +/** + * @brief IReadable implementation wrapping a FILE* pointer created by fopen() + * and a read buffer. + */ +class MCAP_PUBLIC FileReader final : public IReadable { +public: + FileReader(std::FILE* file); + + uint64_t size() const override; + uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override; + +private: + std::FILE* file_; + std::vector buffer_; + uint64_t size_; + uint64_t position_; +}; + +/** + * @brief IReadable implementation wrapping a std::ifstream input file stream. + */ +class MCAP_PUBLIC FileStreamReader final : public IReadable { +public: + FileStreamReader(std::ifstream& stream); + + uint64_t size() const override; + uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override; + +private: + std::ifstream& stream_; + std::vector buffer_; + uint64_t size_; + uint64_t position_; +}; + +/** + * @brief An abstract interface for compressed readers. + */ +class MCAP_PUBLIC ICompressedReader : public IReadable { +public: + virtual ~ICompressedReader() override = default; + + /** + * @brief Reset the reader state, clearing any internal buffers and state, and + * initialize with new compressed data. + * + * @param data Compressed data to read from. + * @param size Size of the compressed data in bytes. + * @param uncompressedSize Size of the data in bytes after decompression. A + * buffer of this size will be allocated for the uncompressed data. + */ + virtual void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) = 0; + /** + * @brief Report the current status of decompression. A StatusCode other than + * `StatusCode::Success` after `reset()` is called indicates the decompression + * was not successful and the reader is in an invalid state. + */ + virtual Status status() const = 0; +}; + +/** + * @brief A "null" compressed reader that directly passes through uncompressed + * data. No internal buffers are allocated. + */ +class MCAP_PUBLIC BufferReader final : public ICompressedReader { +public: + void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override; + uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override; + uint64_t size() const override; + Status status() const override; + + BufferReader() = default; + BufferReader(const BufferReader&) = delete; + BufferReader& operator=(const BufferReader&) = delete; + BufferReader(BufferReader&&) = delete; + BufferReader& operator=(BufferReader&&) = delete; + +private: + const std::byte* data_; + uint64_t size_; +}; + +#ifndef MCAP_COMPRESSION_NO_ZSTD +/** + * @brief ICompressedReader implementation that decompresses Zstandard + * (https://facebook.github.io/zstd/) data. + */ +class MCAP_PUBLIC ZStdReader final : public ICompressedReader { +public: + void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override; + uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override; + uint64_t size() const override; + Status status() const override; + + /** + * @brief Decompresses an entire Zstd-compressed chunk into `output`. + * + * @param data The Zstd-compressed input chunk. + * @param compressedSize The size of the Zstd-compressed input. + * @param uncompressedSize The size of the data once uncompressed. + * @param output The output vector. This will be resized to `uncompressedSize` to fit the data, + * or 0 if the decompression encountered an error. + * @return Status + */ + static Status DecompressAll(const std::byte* data, uint64_t compressedSize, + uint64_t uncompressedSize, ByteArray* output); + ZStdReader() = default; + ZStdReader(const ZStdReader&) = delete; + ZStdReader& operator=(const ZStdReader&) = delete; + ZStdReader(ZStdReader&&) = delete; + ZStdReader& operator=(ZStdReader&&) = delete; + +private: + Status status_; + ByteArray uncompressedData_; +}; +#endif + +#ifndef MCAP_COMPRESSION_NO_LZ4 +/** + * @brief ICompressedReader implementation that decompresses LZ4 + * (https://lz4.github.io/lz4/) data. + */ +class MCAP_PUBLIC LZ4Reader final : public ICompressedReader { +public: + void reset(const std::byte* data, uint64_t size, uint64_t uncompressedSize) override; + uint64_t read(std::byte** output, uint64_t offset, uint64_t size) override; + uint64_t size() const override; + Status status() const override; + + /** + * @brief Decompresses an entire LZ4-encoded chunk into `output`. + * + * @param data The LZ4-compressed input chunk. + * @param size The size of the LZ4-compressed input. + * @param uncompressedSize The size of the data once uncompressed. + * @param output The output vector. This will be resized to `uncompressedSize` to fit the data, + * or 0 if the decompression encountered an error. + * @return Status + */ + Status decompressAll(const std::byte* data, uint64_t size, uint64_t uncompressedSize, + ByteArray* output); + LZ4Reader(); + LZ4Reader(const LZ4Reader&) = delete; + LZ4Reader& operator=(const LZ4Reader&) = delete; + LZ4Reader(LZ4Reader&&) = delete; + LZ4Reader& operator=(LZ4Reader&&) = delete; + ~LZ4Reader() override; + +private: + void* decompressionContext_ = nullptr; // LZ4F_dctx* + Status status_; + const std::byte* compressedData_; + ByteArray uncompressedData_; + uint64_t compressedSize_; + uint64_t uncompressedSize_; +}; +#endif + +struct LinearMessageView; + +/** + * @brief Options for reading messages out of an MCAP file. + */ +struct MCAP_PUBLIC ReadMessageOptions { +public: + /** + * @brief Only messages with log timestamps greater or equal to startTime will be included. + */ + Timestamp startTime = 0; + /** + * @brief Only messages with log timestamps less than endTime will be included. + */ + Timestamp endTime = MaxTime; + /** + * @brief If provided, `topicFilter` is called on all topics found in the MCAP file. If + * `topicFilter` returns true for a given channel, messages from that channel will be included. + * if not provided, messages from all channels are provided. + */ + std::function topicFilter; + enum struct ReadOrder { FileOrder, LogTimeOrder, ReverseLogTimeOrder }; + /** + * @brief Set the expected order that messages should be returned in. + * if readOrder == FileOrder, messages will be returned in the order they appear in the MCAP file. + * if readOrder == LogTimeOrder, messages will be returned in ascending log time order. + * if readOrder == ReverseLogTimeOrder, messages will be returned in descending log time order. + */ + ReadOrder readOrder = ReadOrder::FileOrder; + + ReadMessageOptions(Timestamp start, Timestamp end) + : startTime(start) + , endTime(end) {} + + ReadMessageOptions() = default; + + /** + * @brief validate the configuration. + */ + Status validate() const; +}; + +/** + * @brief Provides a read interface to an MCAP file. + */ +class MCAP_PUBLIC McapReader final { +public: + ~McapReader(); + + /** + * @brief Opens an MCAP file for reading from an already constructed IReadable + * implementation. + * + * @param reader An implementation of the IReader interface that provides raw + * MCAP data. + * @return Status StatusCode::Success on success. If a non-success Status is + * returned, the data source is not considered open and McapReader is not + * usable until `open()` is called and a success response is returned. + */ + Status open(IReadable& reader); + /** + * @brief Opens an MCAP file for reading from a given filename. + * + * @param filename Filename to open. + * @return Status StatusCode::Success on success. If a non-success Status is + * returned, the data source is not considered open and McapReader is not + * usable until `open()` is called and a success response is returned. + */ + Status open(std::string_view filename); + /** + * @brief Opens an MCAP file for reading from a std::ifstream input file + * stream. + * + * @param stream Input file stream to read MCAP data from. + * @return Status StatusCode::Success on success. If a non-success Status is + * returned, the file is not considered open and McapReader is not usable + * until `open()` is called and a success response is returned. + */ + Status open(std::ifstream& stream); + + /** + * @brief Closes the MCAP file, clearing any internal data structures and + * state and dropping the data source reference. + * + */ + void close(); + + /** + * @brief Read and parse the Summary section at the end of the MCAP file, if + * available. This will populate internal indexes to allow for efficient + * summarization and random access. This method will automatically be called + * upon requesting summary data or first seek if Summary section parsing is + * allowed by the configuration options. + */ + Status readSummary( + ReadSummaryMethod method, const ProblemCallback& onProblem = [](const Status&) {}); + + /** + * @brief Returns an iterable view with `begin()` and `end()` methods for + * iterating Messages in the MCAP file. If a non-zero `startTime` is provided, + * this will first parse the Summary section (by calling `readSummary()`) if + * allowed by the configuration options and it has not been parsed yet. + * + * @param startTime Optional start time in nanoseconds. Messages before this + * time will not be returned. + * @param endTime Optional end time in nanoseconds. Messages equal to or after + * this time will not be returned. + */ + LinearMessageView readMessages(Timestamp startTime = 0, Timestamp endTime = MaxTime); + /** + * @brief Returns an iterable view with `begin()` and `end()` methods for + * iterating Messages in the MCAP file. If a non-zero `startTime` is provided, + * this will first parse the Summary section (by calling `readSummary()`) if + * allowed by the configuration options and it has not been parsed yet. + * + * @param onProblem A callback that will be called when a parsing error + * occurs. Problems can either be recoverable, indicating some data could + * not be read, or non-recoverable, stopping the iteration. + * @param startTime Optional start time in nanoseconds. Messages before this + * time will not be returned. + * @param endTime Optional end time in nanoseconds. Messages equal to or after + * this time will not be returned. + */ + LinearMessageView readMessages(const ProblemCallback& onProblem, Timestamp startTime = 0, + Timestamp endTime = MaxTime); + + /** + * @brief Returns an iterable view with `begin()` and `end()` methods for + * iterating Messages in the MCAP file. + * Uses the options from `options` to select the messages that are yielded. + */ + LinearMessageView readMessages(const ProblemCallback& onProblem, + const ReadMessageOptions& options); + + /** + * @brief Returns starting and ending byte offsets that must be read to + * iterate all messages in the given time range. If `readSummary()` has been + * successfully called and the recording contains Chunk records, this range + * will be narrowed to Chunk records that contain messages in the given time + * range. Otherwise, this range will be the entire Data section if the Data + * End record has been found or the entire file otherwise. + * + * This method is automatically used by `readMessages()`, and only needs to be + * called directly if the caller is manually constructing an iterator. + * + * @param startTime Start time in nanoseconds. + * @param endTime Optional end time in nanoseconds. + * @return Start and end byte offsets. + */ + std::pair byteRange(Timestamp startTime, + Timestamp endTime = MaxTime) const; + + /** + * @brief Returns a pointer to the IReadable data source backing this reader. + * Will return nullptr if the reader is not open. + */ + IReadable* dataSource(); + + /** + * @brief Returns the parsed Header record, if it has been encountered. + */ + const std::optional
& header() const; + /** + * @brief Returns the parsed Footer record, if it has been encountered. + */ + const std::optional