From 72b215c46a3810155979ce17bc5651de44600f85 Mon Sep 17 00:00:00 2001 From: Tomas Cejka Date: Mon, 7 Aug 2023 16:38:32 +0200 Subject: [PATCH 1/6] build: add dependency on liblz4 due to IPFIX compression feature --- configure.ac | 5 +++++ ipfixprobe.spec.in | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/configure.ac b/configure.ac index 0933f6d4..ebc795c1 100644 --- a/configure.ac +++ b/configure.ac @@ -83,6 +83,11 @@ AC_DEFINE_DIR([DEFAULTSOCKETDIR], [defaultsocketdir], [Default path to socket di AC_CHECK_LIB(atomic, __atomic_store, [libatomic=yes], AC_MSG_ERROR([libatomic not found])) +PKG_CHECK_MODULES([LIBLZ4], [liblz4]) +CFLAGS="$LIBLZ4_CFLAGS $CFLAGS" +CXXFLAGS="$LIBLZ4_CFLAGS $CXXFLAGS" +LIBS="$LIBLZ4_LIBS $LIBS" + ### gtest AC_ARG_WITH([gtest], AC_HELP_STRING([--with-gtest],[Compile ipfixprobe with gtest framework]), diff --git a/ipfixprobe.spec.in b/ipfixprobe.spec.in index 9b039293..8af17f61 100644 --- a/ipfixprobe.spec.in +++ b/ipfixprobe.spec.in @@ -57,8 +57,8 @@ Vendor: CESNET, z.s.p.o. Packager: @USERNAME@ <@USERMAIL@> BuildRoot: %{_tmppath}/%{name}-%{version}-%{release} Summary: IPFIX flow exporter with various extending IPFIX elements exported by plugins. -Requires: libatomic fuse3 telemetry -BuildRequires: gcc gcc-c++ make doxygen pkgconfig libatomic telemetry +Requires: libatomic fuse3 telemetry lz4 +BuildRequires: gcc gcc-c++ make doxygen pkgconfig libatomic telemetry lz4-devel Provides: ipfixprobe %if %{with ndp} From 24ce7b1a7eb15e704e2fe3be09a005849c7158c5 Mon Sep 17 00:00:00 2001 From: Tomas Cejka Date: Mon, 7 Aug 2023 17:27:51 +0200 Subject: [PATCH 2/6] debian: add lz4 dependency --- debian/control | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/debian/control b/debian/control index 04bc258e..878e8b73 100644 --- a/debian/control +++ b/debian/control @@ -3,7 +3,7 @@ Section: net Priority: standard Maintainer: Tomas Cejka Build-Depends: autoconf (>=2.69), pkg-config, libtool, make (>=4.2.1), debhelper (>=9), - openssl, libpcap-dev, libpcap0.8, libssl-dev, libatomic1 + openssl, libpcap-dev, libpcap0.8, libssl-dev, libatomic1, liblz4-dev Standards-Version: 4.5.0 Homepage: https://github.com/CESNET/ipfixprobe Vcs-Git: https://github.com/CESNET/ipfixprobe From 86b69ea4707969bdb53ef8852fa7a2e727d06f20 Mon Sep 17 00:00:00 2001 From: Tomas Cejka Date: Mon, 7 Aug 2023 20:18:52 +0200 Subject: [PATCH 3/6] actions: add lz4 package for build --- .github/workflows/c-cpp.yml | 2 +- .github/workflows/codeql-analysis.yml | 2 +- .github/workflows/coverity.yml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/c-cpp.yml b/.github/workflows/c-cpp.yml index b052b111..cb4919c4 100644 --- a/.github/workflows/c-cpp.yml +++ b/.github/workflows/c-cpp.yml @@ -18,7 +18,7 @@ jobs: - name: Install dependencies run: | sudo apt-get update - sudo apt-get -y install git build-essential autoconf libtool libpcap-dev pkg-config libxml2-dev libunwind-dev libfuse3-dev fuse3 cmake + sudo apt-get -y install git build-essential autoconf libtool libpcap-dev pkg-config libxml2-dev libunwind-dev libfuse3-dev fuse3 cmake liblz4-dev ( git clone --depth 1 https://github.com/CESNET/nemea-framework /tmp/nemea-framework; cd /tmp/nemea-framework; ./bootstrap.sh &&./configure --bindir=/usr/bin/nemea/ -q &&make -j10 && sudo make install; sudo ldconfig) ( git clone --depth 1 https://github.com/CESNET/nemea-modules /tmp/nemea-modules; cd /tmp/nemea-modules; ./bootstrap.sh &&./configure --bindir=/usr/bin/nemea/ -q &&make -j10 && sudo make install; ) ( git clone -b release --depth 1 https://github.com/CESNET/telemetry /tmp/telemetry; cd /tmp/telemetry; mkdir build && cd build; cmake -DCMAKE_INSTALL_PREFIX=/usr .. &&make -j10 && sudo make install; ) diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index b5447f56..00e4d219 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -30,7 +30,7 @@ jobs: - name: Install dependencies run: | sudo apt-get update - sudo apt-get -y install git build-essential autoconf libtool libpcap-dev pkg-config libxml2-dev libfuse3-dev fuse3 cmake + sudo apt-get -y install git build-essential autoconf libtool libpcap-dev pkg-config libxml2-dev libfuse3-dev fuse3 cmake liblz4-dev ( git clone --depth 1 https://github.com/CESNET/nemea-framework /tmp/nemea-framework; cd /tmp/nemea-framework; ./bootstrap.sh &&./configure --bindir=/usr/bin/nemea/ -q &&make -j10 && sudo make install; sudo ldconfig) ( git clone -b release --depth 1 https://github.com/CESNET/telemetry /tmp/telemetry; cd /tmp/telemetry; mkdir build && cd build; cmake -DCMAKE_INSTALL_PREFIX=/usr .. &&make -j10 && sudo make install; ) # Initializes the CodeQL tools for scanning. diff --git a/.github/workflows/coverity.yml b/.github/workflows/coverity.yml index 8425d6df..779022b6 100644 --- a/.github/workflows/coverity.yml +++ b/.github/workflows/coverity.yml @@ -16,7 +16,7 @@ jobs: - name: Install dependencies run: | sudo apt-get update - sudo apt-get -y install git build-essential autoconf libtool libpcap-dev pkg-config libxml2-dev libfuse3-dev fuse3 cmake + sudo apt-get -y install git build-essential autoconf libtool libpcap-dev pkg-config libxml2-dev libfuse3-dev fuse3 cmake liblz4-dev ( git clone --depth 1 https://github.com/CESNET/nemea-framework /tmp/nemea-framework; cd /tmp/nemea-framework; ./bootstrap.sh &&./configure --bindir=/usr/bin/nemea/ -q &&make -j10 && sudo make install; sudo ldconfig) ( git clone --depth 1 https://github.com/CESNET/nemea-modules /tmp/nemea-modules; cd /tmp/nemea-modules; ./bootstrap.sh &&./configure --bindir=/usr/bin/nemea/ -q &&make -j10 && sudo make install; ) ( git clone -b release --depth 1 https://github.com/CESNET/telemetry /tmp/telemetry; cd /tmp/telemetry; mkdir build && cd build; cmake -DCMAKE_INSTALL_PREFIX=/usr .. &&make -j10 && sudo make install; ) From fafd5ff3800f8febdec684be9b4b70d02b7eaa91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Anton=C3=ADn=20=C5=A0tigler?= Date: Tue, 8 Aug 2023 09:16:44 +0200 Subject: [PATCH 4/6] ipfix plugin: add lz4 compression --- output/ipfix.cpp | 327 ++++++++++++++++++++++++++++++++++++++++++++--- output/ipfix.hpp | 226 +++++++++++++++++++++++++++++++- 2 files changed, 531 insertions(+), 22 deletions(-) diff --git a/output/ipfix.cpp b/output/ipfix.cpp index c5a3db4f..aefef4cd 100644 --- a/output/ipfix.cpp +++ b/output/ipfix.cpp @@ -52,6 +52,7 @@ #include #include #include +#include #define __STDC_FORMAT_MACROS #include @@ -141,11 +142,12 @@ IPFIXExporter::IPFIXExporter() : fd(-1), addrinfo(nullptr), host(""), port(4739), protocol(IPPROTO_TCP), ip(AF_UNSPEC), flags(0), non_blocking_tcp(false), + packetDataBuffer(), reconnectTimeout(RECONNECT_TIMEOUT), lastReconnect(0), odid(0), templateRefreshTime(TEMPLATE_REFRESH_TIME), templateRefreshPackets(TEMPLATE_REFRESH_PACKETS), dir_bit_field(0), - mtu(DEFAULT_MTU), packetDataBuffer(nullptr), + mtu(DEFAULT_MTU), tmpltMaxBufferSize(mtu - IPFIX_HEADER_SIZE) { } @@ -163,6 +165,13 @@ void IPFIXExporter::init(const char *params) } catch (ParserError &e) { throw PluginError(e.what()); } + + // check if both compression and udp is enabled + // (compression is not supported with udp) + if (parser.m_lz4_compression && parser.m_udp) { + throw PluginError("Compression (c) is not supported with udp (u)"); + } + verbose = parser.m_verbose; if (verbose) { fprintf(stderr, "VERBOSE: IPFIX export plugin init start\n"); @@ -175,6 +184,28 @@ void IPFIXExporter::init(const char *params) dir_bit_field = parser.m_dir; templateRefreshTime = parser.m_template_refresh_time; + int res; + // check if compression is enabled + if (parser.m_lz4_compression) { + res = packetDataBuffer.init( + true, + LZ4_COMPRESSBOUND(mtu) + CompressBuffer::C_ADD_SIZE, + // mtu * 3 is arbitrary value, it should be more than mtu * 2 + std::max(parser.m_lz4_buffer_size, mtu * 3) + ); + } else { + res = packetDataBuffer.init( + false, + 0, + mtu + ); + } + + if (res) { + packetDataBuffer.close(); + throw PluginError("not enough memory"); + } + if (parser.m_udp) { protocol = IPPROTO_UDP; } @@ -187,10 +218,6 @@ void IPFIXExporter::init(const char *params) throw PluginError("IPFIX message MTU size should be at least " + std::to_string(IPFIX_HEADER_SIZE)); } tmpltMaxBufferSize = mtu - IPFIX_HEADER_SIZE; - packetDataBuffer = (uint8_t *) malloc(sizeof(uint8_t) * mtu); - if (!packetDataBuffer) { - throw PluginError("not enough memory"); - } int ret = connect_to_collector(); if (ret) { @@ -255,10 +282,8 @@ void IPFIXExporter::close() } templates = nullptr; - if (packetDataBuffer != nullptr) { - free(packetDataBuffer); - packetDataBuffer = nullptr; - } + packetDataBuffer.close(); + if (extensions != nullptr) { delete [] extensions; extensions = nullptr; @@ -670,8 +695,8 @@ uint16_t IPFIXExporter::create_template_packet(ipfix_packet_t *packet) totalSize += IPFIX_HEADER_SIZE + IPFIX_SET_HEADER_SIZE; - /* Allocate memory for the packet */ - packet->data = (uint8_t *) malloc(sizeof(uint8_t)*(totalSize)); + /* Get memory for the packet */ + packet->data = packetDataBuffer.getWriteBuffer(totalSize); if (!packet->data) { return 0; } @@ -775,8 +800,6 @@ void IPFIXExporter::send_templates() /* After error, the plugin sends all templates after reconnection, * so we need not concern about it here */ send_packet(&pkt); - - free(pkt.data); } } @@ -786,10 +809,24 @@ void IPFIXExporter::send_templates() void IPFIXExporter::send_data() { ipfix_packet_t pkt; - pkt.data = packetDataBuffer; - /* Send all new templates */ - while (create_data_packet(&pkt)) { + /* Send all new templates + * Loop ends when len = create_data_packet() is 0 + */ + while (true) { + pkt.data = packetDataBuffer.getWriteBuffer(mtu); + if (!pkt.data) { + // this should never happen because packetDataBuffer + // should already have enough allocated memory + return; + } + + auto len = create_data_packet(&pkt); + packetDataBuffer.shrinkTo(len); + if (len == 0) { + return; + } + int ret = send_packet(&pkt); if (ret == 1) { /* Collector reconnected, resend the packet */ @@ -816,10 +853,13 @@ void IPFIXExporter::flush() /** * \brief Sends packet using UDP or TCP as defined in plugin configuration * + * The packet data is take from the packetDataBuffer. + * * When the collector disconnects, tries to reconnect and resend the data * * \param packet Packet to send - * \return 0 on success, -1 on socket error, 1 when data needs to be resent (after reconnect) + * \return 0 on success, -1 on socket error, -2 on compress error, + * 1 when data needs to be resent (after reconnect) */ int IPFIXExporter::send_packet(ipfix_packet_t *packet) { @@ -831,10 +871,13 @@ int IPFIXExporter::send_packet(ipfix_packet_t *packet) return -1; } + auto dataLen = packetDataBuffer.compress(); + auto data = packetDataBuffer.getCompressed(); + /* sendto() does not guarantee that everything will be send in one piece */ - while (sent < packet->length) { + while (sent < dataLen) { /* Send data to collector (TCP and SCTP ignores last two arguments) */ - ret = sendto(fd, (void *) (packet->data + sent), packet->length - sent, 0, + ret = sendto(fd, (void *) (data + sent), dataLen - sent, 0, addrinfo->ai_addr, addrinfo->ai_addrlen); /* Check that the data were sent correctly */ @@ -868,7 +911,7 @@ int IPFIXExporter::send_packet(ipfix_packet_t *packet) /* Reset the sequences number since it is unique per connection */ sequenceNum = 0; - ((ipfix_header_t *) packet->data)->sequenceNumber = 0; /* no need to change byteorder of 0 */ + ((ipfix_header_t *) packetDataBuffer.reviveLast())->sequenceNumber = 0; /* no need to change byteorder of 0 */ /* Say that we should try to connect and send data again */ return 1; @@ -1111,6 +1154,250 @@ int IPFIXExporter::reconnect() return 0; } +// compress buffer implementation + +CompressBuffer::CompressBuffer() : + shouldCompress(false), shouldResetConnection(true), uncompressed(nullptr), + uncompressedSize(0), compressed(nullptr), compressedSize(0), readIndex(0), + readSize(0), lastReadIndex(0), lastReadSize(0), lz4Stream(nullptr) {} + +int CompressBuffer::init(bool compress, size_t compressSize, size_t writeSize) +{ + shouldCompress = compress; + + if (compress && compressSize < C_ADD_SIZE) { + return -1; + } + + uncompressed = reinterpret_cast(malloc(sizeof(uint8_t) * writeSize)); + if (!uncompressed) { + return -1; + } + uncompressedSize = writeSize; + + if (!compress) { + return 0; + } + + compressed = reinterpret_cast(malloc(sizeof(uint8_t) * compressSize)); + if (!compressed) { + return -1; + } + compressedSize = compressSize; + + lz4Stream = LZ4_createStream(); + if (!lz4Stream) { + return -1; + } + + shouldResetConnection = true; + + return 0; +} + +uint8_t *CompressBuffer::getWriteBuffer(size_t requiredSize) { + // the contents can happily fit into the buffer + if (requiredSize <= uncompressedSize - readIndex - readSize) { + auto res = uncompressed + readIndex + readSize; + readSize += requiredSize; + return res; + } + + // readIndex is always 0 if the buffer is in non-compress mode + + if (readIndex != 0 && readSize + requiredSize <= uncompressedSize) { + if (readSize != 0) { + // getWriteBuffer was called multiple times and it is a problem + return nullptr; + } + + // if readSize is 0, this just wraps the circular buffer to the begining + readIndex = 0; + + auto res = uncompressed + readSize; + readSize += requiredSize; + return res; + } + + // now it is necesary to resize the buffer + auto newSize = readIndex + readSize + requiredSize; + auto newPtr = realloc(uncompressed, sizeof(uint8_t) * newSize); + if (!newPtr) { + return nullptr; + } + + // reset the stream if the data is not on the same position + if (shouldCompress && newPtr != uncompressed) { + requestConnectionReset(); + } + + uncompressed = reinterpret_cast(newPtr); + uncompressedSize = newSize; + + auto res = uncompressed + readIndex + readSize; + readSize += requiredSize; + return res; +} + +int CompressBuffer::compress() { + // The format is as follows: + // each time the block of compressed data is preceaded by + // the compression header that contains the size of the compressed + // block and the size of the data in the block after it is decompressed + // + // additionaly, with each reset this is also prepended with + // four 0 bytes to signify reset, and the start compress header + // which contains the circular buffer size, so when decompressing + // the buffers can be synchronized. + + if (readSize == 0) { + return 0; + } + + // when not compressing, just map the compressed buffer to the + // uncompressed buffer + if (!shouldCompress) { + compressed = uncompressed; + compressedSize = readSize; + // readIndex stays 0 + readSize = 0; + return compressedSize; + } + + // resize the buffer if it may not be large enough + if (compressedSize < LZ4_COMPRESSBOUND(readSize) + C_ADD_SIZE) { + auto newSize = LZ4_COMPRESSBOUND(readSize); + auto newPtr = realloc(compressed, newSize); + if (newPtr) { + compressedSize = newSize; + compressed = reinterpret_cast(newPtr); + } + // even if the reallocation fails the buffer may still be large enough + } + + auto com = compressed; + auto comSize = compressedSize; + + if (shouldResetConnection) { + // when reset, the buffer must start at 0 + if (readIndex != 0) { + memmove(uncompressed, uncompressed + readIndex, readSize); + readIndex = 0; + } + LZ4_resetStream(lz4Stream); + + // fill the info about new stream + + // set the magic number + *reinterpret_cast(com) = ntohl(LZ4_MAGIC); + com += 4; + comSize -= 4; + + // set the recommended ring buffer size - large enough ring buffer so + // that it doesn't need to be perfectly synchronized + reinterpret_cast(com)->bufferSize = + htonl(uncompressedSize + compressedSize); + com += sizeof(ipfix_start_compress_header_t); + comSize -= sizeof(ipfix_start_compress_header_t); + shouldResetConnection = false; + } + + // set the info about the current block + auto hdr = reinterpret_cast(com); + hdr->uncompressedSize = htons(readSize); + + com += sizeof(ipfix_compress_header_t); + comSize -= sizeof(ipfix_compress_header_t); + + // compress the data + auto res = LZ4_compress_fast_continue( + lz4Stream, + reinterpret_cast(uncompressed + readIndex), + reinterpret_cast(com), + readSize, + comSize, + 0 // 0 is default + ); + + if (res == 0) { + return -1; + } + + hdr->compressedSize = htons(res); + + lastReadIndex = readIndex; + lastReadSize = readSize; + + readIndex += readSize; + readSize = 0; + + return res + (com - compressed); +} + +const uint8_t *CompressBuffer::getCompressed() const { + return compressed; +} + +uint8_t *CompressBuffer::reviveLast() { + readSize = lastReadSize; + readIndex = lastReadIndex; + + if (shouldCompress) { + requestConnectionReset(); + } + + return uncompressed + readIndex; +} + +void CompressBuffer::shrinkTo(size_t size) { + readSize = std::min(readSize, size); +} + +void CompressBuffer::requestConnectionReset() { + if (!shouldCompress) { + return; + } + + // reset is costly when readIndex != 0 + if (readSize == 0) { + readIndex = 0; + } + shouldResetConnection = true; +} + +void CompressBuffer::close() { + if (uncompressed) { + free(uncompressed); + uncompressedSize = 0; + uncompressed = nullptr; + } + + readSize = 0; + lastReadSize = 0; + + if (!shouldCompress) { + compressed = nullptr; + compressedSize = 0; + return; + } + + if (compressed) { + free(compressed); + compressed = nullptr; + compressedSize = 0; + } + + if (lz4Stream) { + LZ4_freeStream(lz4Stream); + lz4Stream = nullptr; + } + + shouldResetConnection = false; + shouldCompress = false; + readIndex = 0; + lastReadIndex = 0; +} + #define GEN_FIELDS_SUMLEN_INT(FIELD) FIELD_LEN(FIELD) + #define GEN_FILLFIELDS_INT(TMPLT) IPFIX_FILL_FIELD(p, TMPLT); #define GEN_FILLFIELDS_MAXLEN(TMPLT) IPFIX_FILL_FIELD(p, TMPLT); diff --git a/output/ipfix.hpp b/output/ipfix.hpp index c3f53713..f1a1a4e0 100644 --- a/output/ipfix.hpp +++ b/output/ipfix.hpp @@ -40,6 +40,9 @@ #include #include +#include + +#include #include #include @@ -78,10 +81,13 @@ class IpfixOptParser : public OptionsParser uint32_t m_dir; uint32_t m_template_refresh_time; bool m_verbose; + int m_lz4_buffer_size; + bool m_lz4_compression; + IpfixOptParser() : OptionsParser("ipfix", "Output plugin for ipfix export"), m_host("127.0.0.1"), m_port(4739), m_mtu(DEFAULT_MTU), m_udp(false), m_non_blocking_tcp(false), m_id(DEFAULT_EXPORTER_ID), m_dir(0), - m_template_refresh_time(TEMPLATE_REFRESH_TIME), m_verbose(false) + m_template_refresh_time(TEMPLATE_REFRESH_TIME), m_verbose(false), m_lz4_buffer_size(0), m_lz4_compression(false) { register_option("h", "host", "ADDR", "Remote collector address", [this](const char *arg){m_host = arg; return true;}, OptionFlags::RequiredArgument); register_option("p", "port", "PORT", "Remote collector port", @@ -102,6 +108,10 @@ class IpfixOptParser : public OptionsParser [this](const char *arg){try {m_template_refresh_time = str2num(arg);} catch(std::invalid_argument &e) {return false;} return true;}, OptionFlags::RequiredArgument); register_option("v", "verbose", "", "Enable verbose mode", [this](const char *arg){m_verbose = true; return true;}, OptionFlags::NoArgument); + register_option("c", "lz4-compression", "", "Enable lz4 compression", [this](const char *arg){m_lz4_compression = true; return true;}, OptionFlags::NoArgument); + register_option("s", "lz4-buffer-size", "", "Lz4 compression buffer size (default (minimum): mtu*3)", + [this](const char *arg){try {m_lz4_buffer_size = str2num(arg);} catch(std::invalid_argument &e) {return false;} return true;}, + OptionFlags::RequiredArgument); } }; @@ -231,6 +241,217 @@ typedef struct ipfix_template_set_header { } ipfix_template_set_header_t; +/** + * @brief the header used for compressed data, all values are in big-endian + * + */ +typedef struct { + /** + * size of the data after it is decompressed (not including this header) + */ + uint16_t uncompressedSize; + + /** + * size of the data after when it is compressed (not including this header) + */ + uint16_t compressedSize; +} ipfix_compress_header_t; + +/** + * @brief the header that is used when the compress stream is reset to + * allow the reciever to use synchronized buffers when decompressing + * + */ +typedef struct { + /** + * @brief size of the used curcullar buffer, allows synchronization of the + * reciever buffer with the sender buffer + * + */ + uint32_t bufferSize; +} ipfix_start_compress_header_t; + +/** + * circular buffer with compression, it can also work in non-compression mode + * as a regural buffer + */ +class CompressBuffer { + // In non-compression mode: + // + // Acts as a buffer that can be extended by calls to getWriteBuffer(). + // You can call compress() to get all the data written since last call + // to compress(). + // + // In compression mode: + // + // This is circular buffer of blocks of memory where the blocks cannot be + // wrapped (part of the block on the end and part on the start of the + // buffer). + // + // Each block is single compression block and so the blocks + // are separated by calls to compress(). + // + // Data to the buffer is written using getWriteBuffer(), this + // requests block of memory in the buffer of the given size. + // If getWriteBuffer() is called again before calling compress() the + // block is extended. Extending the block may require moving the block + // or reallocation so rather request more memory and than call shrinkTo(). + // + // You can shorten the last block using the method shrinkTo(), this + // is valid only if no data has been written to the end of the block that + // will be discarded by this call. + // + // The compression can be reset using requestConnectionReset(), this will make it so + // that when decompressing you don't need the previous blocks, but the + // compression will be less effective. + // + // If you wan't to 'undo' the last compressed block you can call + // reviveLast(), this will reset the last block as if compress() was not + // called, and return the pointer to the block, but it will reset the + // compression. + // + // Ideal workflow: + // + // 1. init the buffer with init() + // 2. reset if needed with requestConnectionReset() + // 3. write to the buffer with getWriteBuffer() + // you can call getWriteBuffer() only once + // 4. shrink the data if needed with shrinkTo() + // 5. get the written data with compress() and getCompressed() + // 6. try to avoid calling reviveLast() + // 7. repeat from 2. + // 8. free all resources by calling close() + // + // this workflow works as expected in both compression and non-compression + // modes. +public: + /** + * @brief create uninitialized compressin buffer. Initialize it with the + * method init() + * + */ + CompressBuffer(); + ~CompressBuffer() + { + close(); + } + + /** + * @brief init the compression buffer, when it fails, you should call close + * + * @param compress when false, the buffer will be in non-compression mode + * @param compressSize size of the compress buffer, ignored in non-compression mode + * (the buffer will be resized automatically if needed) + * @param writeSize size of the write buffer + * (the buffer will be resized automatically if needed) + * + * @return 0 on success + */ + int init(bool compress, size_t compressSize, size_t writeSize); + + /** + * @brief Gets buffer to write to with the required size + * + * Note: calling this motehod multiple times before calling + * compress can be quite slow. (one call with larger required + * size may be faster than multiple calls that add up to the + * same size) + * + * @param requiredSize required size of the buffer + * @return pointer to the buffer with the required size, null on failure + */ + uint8_t *getWriteBuffer(size_t requiredSize); + + /** + * @brief compresses data written after last compress() call + * + * In non-compression mode returns the size of the written data. + * and prepares the uncompressed data for getCompressed() + * + * @return size of the data returned by getCompressed(), valid only until + * another call to compress(). + * Negative on error. + * + * In non-compression mode, the data returned by getCompressed() may + * be also invalid after a call to getWriteBuffer() + */ + int compress(); + + /** + * @brief gets the data compressed by call to compress(), the pointer and + * data is valid only until another call to compress(). + * The size of the data is the value returned by compress. + * + * In non-compression mode, returns the data written since the last + * compress() call, and the data is invalidated with call to + * getWriteBuffer(). + * + * @return compressed data of size returned by compress() + */ + const uint8_t *getCompressed() const; + + /** + * @brief sets the last compressed block as for compression and requests reset + */ + uint8_t *reviveLast(); + + /** + * @brief shrinks the uncompressed data size + * + * @param size size to shrink to + */ + void shrinkTo(size_t size); + + /** + * @brief requests that the compression is reset + * + */ + void requestConnectionReset(); + + /** + * @brief frees all allocated memory + * + */ + void close(); + + // the maximum aditional size required to send metadata that are needed to + // to decompress the data, the +4 is there for four 0 bytes that identify + // ipfix_start_compress_header_t + static constexpr size_t C_ADD_SIZE = + sizeof(ipfix_compress_header_t) + + sizeof(ipfix_start_compress_header_t) + + sizeof(uint32_t) * 2; + + // LZ4c + static constexpr uint32_t LZ4_MAGIC = 0x4c5a3463; + +private: + + // false if the buffer is in non-compression mode + bool shouldCompress; + // true if the lz4Stream should be reset + bool shouldResetConnection; + + // the buffer with uncompressed data + uint8_t *uncompressed; + size_t uncompressedSize; + + // the buffer with compressed data + uint8_t *compressed; + size_t compressedSize; + + // index to the uncompressed block of data + size_t readIndex; + size_t readSize; + + // last compressed data position + size_t lastReadIndex; + size_t lastReadSize; + + // compression stream used by lz4 + LZ4_stream_t *lz4Stream; +}; + class IPFIXExporter : public OutputPlugin { public: @@ -271,6 +492,8 @@ class IPFIXExporter : public OutputPlugin int flags; /**< getaddrinfo flags */ bool non_blocking_tcp; + CompressBuffer packetDataBuffer; + uint32_t reconnectTimeout; /**< Timeout between connection retries */ time_t lastReconnect; /**< Time in seconds of last connection retry */ uint32_t odid; /**< Observation Domain ID */ @@ -279,7 +502,6 @@ class IPFIXExporter : public OutputPlugin uint32_t dir_bit_field; /**< Direction bit field value. */ uint16_t mtu; /**< Max size of packet payload sent */ - uint8_t *packetDataBuffer; /**< Data buffer to store packet */ uint16_t tmpltMaxBufferSize; /**< Size of template buffer, tmpltBufferSize < packetDataBuffer */ void init_template_buffer(template_t *tmpl); From 836651902013dd649e30cb5444a8f401c83dc0bc Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Wed, 28 Aug 2024 13:15:47 +0200 Subject: [PATCH 5/6] README.md - add LZ4 compression info --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index 4b5a54f1..cf8e914f 100644 --- a/README.md +++ b/README.md @@ -114,6 +114,9 @@ The output flow records are composed of information provided by the enabled plug See `ipfixprobe -h output` for more information and complete list of output plugins and their parameters. +LZ4 compression: +ipfix plugin supports LZ4 compression algorithm over tcp. See plugin's help for more information. + ## Parameters ### Module specific parameters - `-i ARGS` Activate input plugin (-h input for help) From a72a1b80e02d48bd7ef45bab13772e329a3706ac Mon Sep 17 00:00:00 2001 From: Pavel Siska Date: Wed, 28 Aug 2024 13:39:51 +0200 Subject: [PATCH 6/6] ipfixprobed - add new option LZ4_COMPRESSION to init script and config example --- init/ipfixprobed | 7 ++++++- init/link0.conf.example | 3 +++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/init/ipfixprobed b/init/ipfixprobed index daf57e24..bb6676e2 100644 --- a/init/ipfixprobed +++ b/init/ipfixprobed @@ -118,7 +118,12 @@ if [ -e "$CONFFILE" ]; then output_affinity="@$OUTPUT_WORKER_CPU" fi - output="-o ipfix$output_affinity;host=${HOST:-127.0.0.1};port=${PORT:-4739};id=${LINK:-0};dir=${DIR:-0};${UDP_PARAM};${NON_BLOCKING_TCP_PARAM};template=${TEMPLATE_REFRESH_RATE:-300}" + LZ4_COMPRESSION_PARAM="" + if [[ $LZ4_COMPRESSION == "yes" ]]; then + LZ4_COMPRESSION_PARAM="lz4-compression"; + fi + + output="-o ipfix$output_affinity;host=${HOST:-127.0.0.1};port=${PORT:-4739};id=${LINK:-0};dir=${DIR:-0};${UDP_PARAM};${NON_BLOCKING_TCP_PARAM};${LZ4_COMPRESSION_PARAM};template=${TEMPLATE_REFRESH_RATE:-300}" telemetry="" if [ "$USE_FUSE" = "1" ]; then diff --git a/init/link0.conf.example b/init/link0.conf.example index 25e394a3..a3891cb5 100644 --- a/init/link0.conf.example +++ b/init/link0.conf.example @@ -197,6 +197,9 @@ TEMPLATE_REFRESH_RATE=300 # Define output worker (thread) affinity, e.g. CPU core isolated from the scheduler #OUTPUT_WORKER_CPU=12 +# Enable LZ4 compression (only with TCP) +LZ4_COMPRESSION=no + ####### Fuse telemetry USE_FUSE=0