Skip to content

Commit

Permalink
LZ4 - add special magic number
Browse files Browse the repository at this point in the history
For better defferentiation between normal and compressed traffic.
  • Loading branch information
BonnyAD9 committed Mar 28, 2024
1 parent 6690040 commit 395f01b
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 28 deletions.
35 changes: 13 additions & 22 deletions output/ipfix.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -804,12 +804,6 @@ void IPFIXExporter::send_data()
* Loop ends when len = create_data_packet() is 0
*/
while (true) {
// reset for every new connection
// (reset is faster when requested before getWriteBuffer())
if (sequenceNum == 0) {
packetDataBuffer.requestReset();
}

pkt.data = packetDataBuffer.getWriteBuffer(mtu);
if (!pkt.data) {
// this should never happen because packetDataBuffer
Expand Down Expand Up @@ -1065,7 +1059,7 @@ int IPFIXExporter::reconnect()
// compress buffer implementation

CompressBuffer::CompressBuffer() :
shouldCompress(false), shouldReset(false), uncompressed(nullptr),
shouldCompress(false), shouldResetConnection(true), uncompressed(nullptr),
uncompressedSize(0), compressed(nullptr), compressedSize(0), readIndex(0),
readSize(0), lastReadIndex(0), lastReadSize(0), lz4Stream(nullptr) {}

Expand Down Expand Up @@ -1098,7 +1092,7 @@ int CompressBuffer::init(bool compress, size_t compressSize, size_t writeSize)
return -1;
}

shouldReset = true;
shouldResetConnection = true;

return 0;
}
Expand All @@ -1115,10 +1109,8 @@ uint8_t *CompressBuffer::getWriteBuffer(size_t requiredSize) {

if (readIndex != 0 && readSize + requiredSize <= uncompressedSize) {
if (readSize != 0) {
// move the written data in the buffer to the start of the buffer
memmove(uncompressed, uncompressed + readIndex, sizeof(uint8_t) * readSize);
// the old data is definitely invalid
shouldReset = true;
// getWriteBuffer was called multiple times and it is a problem
return nullptr;
}

// if readSize is 0, this just wraps the circular buffer to the begining
Expand All @@ -1138,7 +1130,7 @@ uint8_t *CompressBuffer::getWriteBuffer(size_t requiredSize) {

// reset the stream if the data is not on the same position
if (shouldCompress && newPtr != uncompressed) {
requestReset();
requestConnectionReset();
}

uncompressed = reinterpret_cast<uint8_t *>(newPtr);
Expand Down Expand Up @@ -1188,7 +1180,7 @@ int CompressBuffer::compress() {
auto com = compressed;
auto comSize = compressedSize;

if (shouldReset) {
if (shouldResetConnection) {
// when reset, the buffer must start at 0
if (readIndex != 0) {
memmove(uncompressed, uncompressed + readIndex, readSize);
Expand All @@ -1198,8 +1190,8 @@ int CompressBuffer::compress() {

// fill the info about new stream

// set the 0 to tell the reciever that this is new stream
*reinterpret_cast<uint32_t *>(com) = 0;
// set the magic number
*reinterpret_cast<uint32_t *>(com) = ntohl(LZ4_MAGIC);
com += 4;
comSize -= 4;

Expand All @@ -1209,7 +1201,7 @@ int CompressBuffer::compress() {
htonl(uncompressedSize + compressedSize);
com += sizeof(ipfix_start_compress_header_t);
comSize -= sizeof(ipfix_start_compress_header_t);
shouldReset = false;
shouldResetConnection = false;
}

// set the info about the current block
Expand All @@ -1230,7 +1222,6 @@ int CompressBuffer::compress() {
);

if (res == 0) {
requestReset();
return -1;
}

Expand All @@ -1254,7 +1245,7 @@ uint8_t *CompressBuffer::reviveLast() {
readIndex = lastReadIndex;

if (shouldCompress) {
requestReset();
requestConnectionReset();
}

return uncompressed + readIndex;
Expand All @@ -1264,7 +1255,7 @@ void CompressBuffer::shrinkTo(size_t size) {
readSize = std::min(readSize, size);
}

void CompressBuffer::requestReset() {
void CompressBuffer::requestConnectionReset() {
if (!shouldCompress) {
return;
}
Expand All @@ -1273,7 +1264,7 @@ void CompressBuffer::requestReset() {
if (readSize == 0) {
readIndex = 0;
}
shouldReset = true;
shouldResetConnection = true;
}

void CompressBuffer::close() {
Expand Down Expand Up @@ -1303,7 +1294,7 @@ void CompressBuffer::close() {
lz4Stream = nullptr;
}

shouldReset = false;
shouldResetConnection = false;
shouldCompress = false;
readIndex = 0;
lastReadIndex = 0;
Expand Down
18 changes: 12 additions & 6 deletions output/ipfix.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ class CompressBuffer {
// is valid only if no data has been written to the end of the block that
// will be discarded by this call.
//
// The compression can be reset using requestReset(), this will make it so
// The compression can be reset using requestConnectionReset(), this will make it so
// that when decompressing you don't need the previous blocks, but the
// compression will be less effective.
//
Expand All @@ -315,9 +315,9 @@ class CompressBuffer {
// Ideal workflow:
//
// 1. init the buffer with init()
// 2. reset if needed with requestReset()
// 2. reset if needed with requestConnectionReset()
// 3. write to the buffer with getWriteBuffer()
// ideally call getWriteBuffer() only once
// you can call getWriteBuffer() only once
// 4. shrink the data if needed with shrinkTo()
// 5. get the written data with compress() and getCompressed()
// 6. try to avoid calling reviveLast()
Expand Down Expand Up @@ -408,7 +408,7 @@ class CompressBuffer {
* @brief requests that the compression is reset
*
*/
void requestReset();
void requestConnectionReset();

/**
* @brief frees all allocated memory
Expand All @@ -419,14 +419,20 @@ class CompressBuffer {
// the maximum aditional size required to send metadata that are needed to
// to decompress the data, the +4 is there for four 0 bytes that identify
// ipfix_start_compress_header_t
static constexpr size_t C_ADD_SIZE = sizeof(ipfix_compress_header_t) + sizeof(ipfix_start_compress_header_t) + 4;
static constexpr size_t C_ADD_SIZE =
sizeof(ipfix_compress_header_t)
+ sizeof(ipfix_start_compress_header_t)
+ sizeof(uint32_t) * 2;

// LZ4c
static constexpr uint32_t LZ4_MAGIC = 0x4c5a3463;

private:

// false if the buffer is in non-compression mode
bool shouldCompress;
// true if the lz4Stream should be reset
bool shouldReset;
bool shouldResetConnection;

// the buffer with uncompressed data
uint8_t *uncompressed;
Expand Down

0 comments on commit 395f01b

Please sign in to comment.