From 8c09c078b8f9df6a252cbf591569e600eeae247f Mon Sep 17 00:00:00 2001 From: Damir Zainullin Date: Wed, 3 Jul 2024 23:30:20 +0200 Subject: [PATCH 1/6] Add mqtt process plugin --- include/ipfixprobe/ipfix-elements.hpp | 17 ++ process/mqtt.cpp | 251 ++++++++++++++++++++++++++ process/mqtt.hpp | 177 ++++++++++++++++++ 3 files changed, 445 insertions(+) create mode 100644 process/mqtt.cpp create mode 100644 process/mqtt.hpp diff --git a/include/ipfixprobe/ipfix-elements.hpp b/include/ipfixprobe/ipfix-elements.hpp index c1c64d52..0f4d1f0a 100644 --- a/include/ipfixprobe/ipfix-elements.hpp +++ b/include/ipfixprobe/ipfix-elements.hpp @@ -295,6 +295,14 @@ namespace ipxp { #define NTS_TIME_DISTRIBUTION(F) F(8057, 1031, 4, nullptr) #define NTS_SWITCHING_RATIO(F) F(8057, 1032, 4, nullptr) +#define MQTT_TYPE_CUMULATIVE(F) F(8057, 1033, 2, nullptr) +#define MQTT_VERSION(F) F(8057, 1034, 1, nullptr) +#define MQTT_CONNECTION_FLAGS(F) F(8057, 1035, 1, nullptr) +#define MQTT_KEEP_ALIVE(F) F(8057, 1036, 2, nullptr) +#define MQTT_LAST_RETURN_CODE(F) F(8057, 1037, 1, nullptr) +#define MQTT_PUBLISH_FLAGS(F) F(8057, 1038, 1, nullptr) +#define MQTT_TOPICS(F) F(8057, 1039, -1, nullptr) + #define MPLS_TOP_LABEL_STACK_SECTION F(0, 70, -1, nullptr) @@ -443,6 +451,15 @@ namespace ipxp { F(SIP_USER_AGENT) \ F(SIP_REQUEST_URI) \ F(SIP_VIA) + +#define IPFIX_MQTT_TEMPLATE(F) \ + F(MQTT_TYPE_CUMULATIVE) \ + F(MQTT_VERSION) \ + F(MQTT_CONNECTION_FLAGS) \ + F(MQTT_KEEP_ALIVE) \ + F(MQTT_LAST_RETURN_CODE) \ + F(MQTT_PUBLISH_FLAGS) \ + F(MQTT_TOPICS) #define IPFIX_PSTATS_TEMPLATE(F) \ F(STATS_PCKT_SIZES) \ diff --git a/process/mqtt.cpp b/process/mqtt.cpp new file mode 100644 index 00000000..ba7b2292 --- /dev/null +++ b/process/mqtt.cpp @@ -0,0 +1,251 @@ +/** +* \file mqtt.hpp +* \brief MQTT plugin for ipfixprobe +* \author Damir Zainullin +* \date 2024 + */ +/* +* Copyright (C) 2023 CESNET +* +* LICENSE TERMS +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* 1. Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* 2. Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in +* the documentation and/or other materials provided with the +* distribution. +* 3. Neither the name of the Company nor the names of its contributors +* may be used to endorse or promote products derived from this +* software without specific prior written permission. + */ +#include "mqtt.hpp" +#include "cstring" +#ifdef DEBUG_MQTT +static const bool debug_mqtt = true; +#else +static const bool debug_mqtt = false; +#endif +namespace ipxp { + +int RecordExtMQTT::REGISTERED_ID = -1; + +__attribute__((constructor)) static void register_this_plugin(){ + static PluginRecord rec = PluginRecord("mqtt", []() { return new MQTTPlugin(); }); + register_plugin(&rec); + RecordExtMQTT::REGISTERED_ID = register_extension(); +} + +int MQTTPlugin::post_create(Flow &rec, const Packet &pkt){ + if (has_mqtt_protocol_name(reinterpret_cast(pkt.payload), pkt.payload_len)) + add_ext_mqtt(reinterpret_cast(pkt.payload), pkt.payload_len, rec); + return 0; +} + +int MQTTPlugin::pre_update(Flow &rec, Packet &pkt){ + const char* payload = reinterpret_cast(pkt.payload); + RecordExt* ext = rec.get_extension(RecordExtMQTT::REGISTERED_ID); + if (ext == nullptr) { + return 0; + }else{ + parse_mqtt(payload, pkt.payload_len, static_cast(ext)); + } + return 0; +} + +/** + * \brief Read variable integer as defined in http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html. + * \param [in] data Pointer to IP payload. + * \param [in] payload_len IP payload length. + * \param [in] last_byte Next after last read byte. + * \return Pair of read integer and bool. Bool is false in case read was unsuccessful. + */ +std::pair MQTTPlugin::read_variable_int(const char* data, int payload_len, uint32_t& last_byte)const noexcept{ + uint32_t res = 0; + bool next; + for( next = true; next && last_byte < (uint32_t)payload_len; last_byte++){ + res <<= 8; + res |= data[last_byte]; + next = (data[last_byte] & 0b1000'0000); + } + return last_byte == (uint32_t)payload_len && next ? std::make_pair(0u, false) : std::make_pair(res, true); +} + +/** + * \brief Read utf8 encoded string as defined in http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html. + * \param [in] data Pointer to IP payload. + * \param [in] payload_len IP payload length. + * \param [in] last_byte Next after last read byte. + * \return Tuple of read string, its length and bool. Bool is false in case read was unsuccessful. + */ +std::tuple MQTTPlugin::read_utf8_string(const char* data, int payload_len, uint32_t& last_byte) const noexcept{ + if (last_byte + 2 >= (uint32_t)payload_len) + return {0, {}, false}; + uint16_t string_length = ntohs(*(uint16_t*)&data[last_byte]); + last_byte += 2; + if (last_byte + string_length >= (uint32_t)payload_len) + return {0, {}, false}; + return {string_length, std::string_view(&data[last_byte], string_length), true}; +} + +/** + * \brief Parse buffer to check if it contains MQTT packets. + * \param [in] data Pointer to IP payload. + * \param [in] payload_len IP payload length. + * \param [in,out] rec Record to write MQTT data in. + * \return True if buffer contains set of valid mqtt packets. + */ +bool MQTTPlugin::parse_mqtt(const char* data, int payload_len, RecordExtMQTT* rec) noexcept{ + if (payload_len <= 0) + return false; + uint32_t last_byte = 0; + // Each tcp segment may contain more MQTT packets + while(last_byte < (uint32_t)payload_len) { + uint8_t type, flags; + type = flags = data[last_byte++]; + type >>= 4; + flags &= 0b00001111; + rec->type_cumulative |= (0b1 << type); + + auto [remaining_length, success] = read_variable_int(data, payload_len, last_byte); + if (!success || last_byte + remaining_length > (uint32_t) payload_len) { + if constexpr (debug_mqtt) + std::cout << "Invalid remaining length read" << std::endl; + return false; + } + auto first_byte_after_payload = remaining_length + last_byte; + // Connect packet + if (type == 1) { + if (!has_mqtt_protocol_name(data, payload_len)){ + if constexpr (debug_mqtt) + std::cout << "Connection packet doesn't have MQTT label" << std::endl; + return false; + } + last_byte += 6; // Skip "MQTT" label(and its 2-byte length) + rec->version = data[last_byte++]; + //Only MQTT v3.1.1 and v5.0 are supported + if (rec->version != 4 && rec->version != 5){ + if constexpr (debug_mqtt) + std::cout << "Unsupported mqtt version" << std::endl; + return false; + } + rec->connection_flags = data[last_byte++]; + rec->keep_alive = ntohs(*(uint16_t*) &data[last_byte]); + } + // Connect ACK packet + else if (type == 2) { + rec->session_present_flag = data[last_byte++] & 0b1; /// Connect Acknowledge Flags + rec->connection_return_code = data[last_byte++]; + } + // Publish packet + else if (type == 3) { + rec->publish_flags |= flags; + auto [str_len, str, success] = read_utf8_string(data, payload_len, last_byte); + if (!success) { + if constexpr (debug_mqtt) + std::cout << "Invalid utf8 string read" << std::endl; + return false; + } + if (str.find('#') != std::string::npos) { + if constexpr (debug_mqtt) + std::cout << "Topic name contains wildcard char" << std::endl; + return false; + } + // Use '#' as delimiter, as '#' and '?' are only forbidden characters for topic name + if (rec->topics.count++ < maximal_topic_count) { + rec->topics.str += std::move(std::string(str.begin(), str.end()).append("#")); + } + } + // Disconnect packet + else if (type == 14) { + flow_flush = true; + } + // Read packet identifier + /*if ( (rec->type >= 3 && rec->type <= 11) || (rec->type == 3 && ((rec->flags >> 1) & 0b11)!= 0)){ + rec->id = *((const uint16_t*)(&data[last_byte])); last_byte += 2; + }*/ + + // Read properties (only for MQTT v5.0) + /*if (rec->type <= 11 || rec->type >= 14){ + rec->property_length = read_variable_int(data, payload_len, last_byte); + read_properties(data, payload_len, last_byte, rec); + }*/ + + last_byte = first_byte_after_payload; // Skip rest of payload + } + return true; +} + +int MQTTPlugin::post_update(Flow &rec, const Packet &pkt){ + if (flow_flush){ + flow_flush = false; + return FLOW_FLUSH; + } + return 0; +} + +void MQTTPlugin::pre_export(Flow &rec){ + if constexpr (debug_mqtt){ + RecordExtMQTT* ext = static_cast(rec.get_extension(RecordExtMQTT::REGISTERED_ID)); + if (ext == nullptr) + return; + /*std::cout << "MQTT Exports: type cumulative = " << std::to_string(ext->type_cumulative) + << ", version = " << std::to_string(ext->version) << ", con_flags = " + << std::to_string(ext->connection_flags) << ",keep_alive = " + << std::to_string(ext->keep_alive)<< ",\nsession_present= " + << std::to_string(ext->session_present_flag) << ", last return code= " + << std::to_string(ext->last_connect_return_code) << ", publish flags= " + << std::to_string(ext->publish_flags) << ",\npublished topics: "; + for(auto it = ext->topics.begin(); it != ext->topics.end(); it++) + std::cout << (it == ext->topics.begin() ? "" : ", ") << *it; + std::cout << std::endl;*/ + } + return; +} + +/** + * \brief Parse buffer to check if it contains MQTT packets. + * \param [in] data Pointer to IP payload. + * \param [in] payload_len IP payload length. + * \return True if buffer starts with MQTT label as part of connection mqtt packet. + */ +bool MQTTPlugin::has_mqtt_protocol_name(const char* data, int payload_len) const noexcept{ + if (payload_len <= 1) + return false; + auto pos = 1u; + if (auto [_, success] = read_variable_int(data,payload_len, pos); !success) + return false; + auto [string_length, str, success] = read_utf8_string(data,payload_len, pos); + return success && str == "MQTT"; +} + +void MQTTPlugin::add_ext_mqtt(const char* data, int payload_len, Flow& flow) +{ + if (recPrealloc == nullptr) { + recPrealloc = new RecordExtMQTT(); + } + if (!parse_mqtt(data, payload_len, recPrealloc)) + return ; + flow.add_extension(recPrealloc); + recPrealloc = nullptr; +} + +void MQTTPlugin::init(const char *params){ + MQTTOptionsParser parser; + try { + parser.parse(params); + } catch (ParserError &e) { + throw PluginError(e.what()); + } + maximal_topic_count = parser.m_maximal_topic_count; +} + +ProcessPlugin* MQTTPlugin::copy() +{ + return new MQTTPlugin(*this); +} + +} // namespace ipxp \ No newline at end of file diff --git a/process/mqtt.hpp b/process/mqtt.hpp new file mode 100644 index 00000000..de752a67 --- /dev/null +++ b/process/mqtt.hpp @@ -0,0 +1,177 @@ +/** +* \file mqtt.hpp +* \brief MQTT plugin for ipfixprobe +* \author Damir Zainullin +* \date 2024 +*/ +/* +* Copyright (C) 2023 CESNET +* +* LICENSE TERMS +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* 1. Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* 2. Redistributions in binary form must reproduce the above copyright +* notice, this list of conditions and the following disclaimer in +* the documentation and/or other materials provided with the +* distribution. +* 3. Neither the name of the Company nor the names of its contributors +* may be used to endorse or promote products derived from this +* software without specific prior written permission. +*/ +#ifndef CACHE_CPP_TPLUGIN_HPP +#define CACHE_CPP_TPLUGIN_HPP + + +#ifdef WITH_NEMEA +#include "fields.h" +#endif + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ipxp { + +#define MQTT_UNIREC_TEMPLATE "MQTT_TYPE_CUMULATIVE, MQTT_VERSION, MQTT_CONNECTION_FLAGS, MQTT_KEEP_ALIVE, MQTT_CONNECTION_RETURN_CODE, MQTT_PUBLISH_FLAGS, MQTT_TOPICS" + +UR_FIELDS ( + uint16 MQTT_TYPE_CUMULATIVE, + uint8 MQTT_VERSION, + uint8 MQTT_CONNECTION_FLAGS, + uint16 MQTT_KEEP_ALIVE, + uint8 MQTT_CONNECTION_RETURN_CODE, + uint8 MQTT_PUBLISH_FLAGS, + string MQTT_TOPICS +) + +class MQTTOptionsParser : public OptionsParser +{ +public: + uint32_t m_maximal_topic_count; ///< Maximal count of topics from Publish packet header to store for each flow + + MQTTOptionsParser() : OptionsParser("mqtt", "Parse MQTT traffic"), m_maximal_topic_count(0) + { + register_option( + "tc", + "topiccount", + "count", + "Export first tc topics from Publish packet header. Topics are separated by #. Default value is 0.", + [this](const char *arg){try { + m_maximal_topic_count = str2num(arg); + } catch(std::invalid_argument &e) { + return false; + } + return true; + }, + OptionFlags::RequiredArgument); + } +}; + +struct RecordExtMQTT : public RecordExt { + static int REGISTERED_ID; + RecordExtMQTT() : + RecordExt(REGISTERED_ID), + type_cumulative(0), + version(0), + connection_flags(0), + keep_alive(0), + session_present_flag(false), + connection_return_code(0), + publish_flags(0) {} + + uint16_t type_cumulative; ///< Types of packets presented during communication and session present flag. DISCONNECT(1b) | PINGRESP(1b) | PINGREQ(1b) | UNSUBACK(1b) | UNSUBSCRIBE(1b) | SUBACK(1b) | SUBSCRIBE(1b) | PUBCOMP(1b) | PUBREL(1b) | PUBREC(1b) | PUBACK(1b) | PUBLISH(1b) | CONNACK(1b) | CONNECT(1b) | session present(1b) + uint8_t version; ///< Used version of MQTT from last connection packet + //Connect + uint8_t connection_flags; ///< Last connection flags: Username flag(1b) | Password flag(1b) | Will retain(1b) | Will QoS(2b) | Clean Session(1b) | 0(1b) + uint16_t keep_alive; ///< Last connection keep alive (seconds) + //CONNACK + bool session_present_flag; ///< Session present bit from last connack flags. First bit of type_cumulative + uint8_t connection_return_code; ///< Value of last connection return code from CONNACK header + //PUBLISH + uint8_t publish_flags; ///< Cumulative of Publish header flags + struct { + std::string str; + uint32_t count = 0; + } topics; ///< Struct to keep all recorded and concatenated topics from Publish header and its count + + virtual int fill_ipfix(uint8_t *buffer, int size) + { + auto max_length = 8u + topics.str.size() + 3u; + if ((uint32_t)size < max_length) + return -1; + *(uint16_t*) (buffer) = ntohs(type_cumulative | session_present_flag); + *(buffer + 2) = version; + *(buffer + 3) = connection_flags; + *(uint16_t*) (buffer + 4) = ntohs(keep_alive); + *(buffer + 6) = connection_return_code; + *(buffer + 7) = publish_flags; + auto total_length = 8u; + total_length += variable2ipfix_buffer(buffer + total_length,(uint8_t*) topics.str.c_str(), topics.str.size()); + return total_length; + } + + const char **get_ipfix_tmplt() const + { + static const char *ipfix_template[] = { + IPFIX_MQTT_TEMPLATE(IPFIX_FIELD_NAMES) + nullptr + }; + return ipfix_template; + } +#ifdef WITH_NEMEA + void fill_unirec(ur_template_t *tmplt, void *record) override + { + ur_set(tmplt, record, F_MQTT_TYPE_CUMULATIVE, type_cumulative | session_present_flag); + ur_set(tmplt, record, F_MQTT_VERSION, version); + ur_set(tmplt, record, F_MQTT_CONNECTION_FLAGS, connection_flags); + ur_set(tmplt, record, F_MQTT_KEEP_ALIVE, keep_alive); + ur_set(tmplt, record, F_MQTT_CONNECTION_RETURN_CODE, connection_return_code); + ur_set(tmplt, record, F_MQTT_PUBLISH_FLAGS, publish_flags); + ur_set_string(tmplt, record, F_MQTT_TOPICS, topics.str.c_str()); + } + + const char *get_unirec_tmplt() const + { + return MQTT_UNIREC_TEMPLATE; + } +#endif +}; + + + +class MQTTPlugin : public ProcessPlugin +{ +public: + int post_create(Flow &rec, const Packet &pkt) override; + int pre_update(Flow &rec, Packet &pkt) override; + void pre_export(Flow &rec) override; + int post_update(Flow &rec, const Packet &pkt) override; + RecordExt *get_ext() const { return new RecordExtMQTT(); } + OptionsParser *get_parser() const { return new MQTTOptionsParser(); } + std::string get_name() const { return "mqtt"; } + ProcessPlugin *copy(); + void init(const char *params) override; +private: + bool flow_flush = false; ///< Tell storage plugin to flush current Flow. + uint32_t maximal_topic_count = 0; ///< Maximal count of topics from Publish packet header to store for each flow + RecordExtMQTT *recPrealloc; ///< Preallocated extension. + bool parse_mqtt(const char* data, int payload_len, RecordExtMQTT* rec) noexcept; + void add_ext_mqtt(const char *data, int payload_len, Flow &flow); + std::pair read_variable_int(const char* data, int payload_len, uint32_t& last_byte)const noexcept; + std::tuple read_utf8_string(const char* data, int payload_len, uint32_t& last_byte) const noexcept; + bool has_mqtt_protocol_name(const char* data, int payload_len) const noexcept; + +}; + +} // namespace ipxp + +#endif // CACHE_CPP_TPLUGIN_HPP From a561e052b26055005b6fe8188b87a346f5e43952 Mon Sep 17 00:00:00 2001 From: Damir Zainullin Date: Wed, 3 Jul 2024 23:38:10 +0200 Subject: [PATCH 2/6] Add mqtt plugin tests --- pcaps/mqtt.pcap | Bin 0 -> 3148 bytes tests/functional/Makefile.am | 7 +++++-- tests/functional/mqtt.sh | 8 ++++++++ tests/functional/reference/mqtt | 6 ++++++ 4 files changed, 19 insertions(+), 2 deletions(-) create mode 100644 pcaps/mqtt.pcap create mode 100755 tests/functional/mqtt.sh create mode 100644 tests/functional/reference/mqtt diff --git a/pcaps/mqtt.pcap b/pcaps/mqtt.pcap new file mode 100644 index 0000000000000000000000000000000000000000..c4fb7205636ca95edc05fe109fb9160514f3e722 GIT binary patch literal 3148 zcmb7`ZA@Eb6vv;ImjYvgg9%$0Tm~W_Ewn%b?j?&11`(LHY^dcWr7ciPU%2~56^R>;&CGM2_O@_Wn(ZVfy_c5f ze}4b-oO_?J&6~p}2_a>>pWQBj&j;?15*1OZ_(tY{$;On}cvDlVCXFeuv!>LFvNXn= zD_5#=va*Vp)FWp2l~!Arqfn(05~jT>;q)NnlkeRH3)ElTi336#f$a7&YMvxK=PEk2et0W7lJTZ@YF_1J{@! zA%P!ktsan76B0rS33-0BejnKvyd~*ETu8pIy}D*NQKqZ1C&tfzd*yOhtmKYW4|PgH z!dQ)?#%cLCIWn9Cmc3L_5hx|QaY@ud%|^U|K+VGNgqMrq8WX~$dGn(rw`N{@^gfau ztnWlU)RpOm6HyOzGeGmdgS{?I(91_%M(SE0bxzVHT{@A`vsEUz$MnH=Xta^CJt1Pt z0IhM#2F1JCzC^981QN{K9cF{lYT`KP8*k!x25FhrY=u&x1R2y9^L94;TvY^r$MjK$ zdelto$Pm?0M(fxH9v|}5F^P3JVVOoq)aio6Kwr;qim!pj5lkOCWw8^e^Vyi7b7Nel z3oAZ5Rw{XWC*^JO=*)u72az%EUJ~j1WK&yPTa8++u7OWjz6(;!d2!@SE%p=>;-S7i z`4jf_>ZrG`H^xyLYM8;ke*KF}AGAM(zRogIiH8$?#X!#Gy^PUcUt=dJ=dg%#2DDPv z_qBO1YI*vqdc3~An!nSCJ;n6lw>tJKTE{g}9d~FQ>-y@d!wJg>eO)`FpIkMY{LSdX zcGUUJCg@B?$#etVbw9=i7tB#!&x|%}otC@X9%V+gQxo<6dYP1fy}ZPGdpQ|}dZ0Ux zy?k(7=;d8Gt7+CU8_X!T^_ZmrxG{ZrSh)~K*(ybB^Psgz*;<7?+dKkn^4vuCjAj$b zM%~1{B;LEJ7e8I|(4zMYPc>85r) zJr`eh!Zq`r8DLkPmensFWd`;HT<`Sf;mTj=;jbThJv<$OdZ4?E9EL_Bc2B8#*z7+61c;J5I?W;SC%N>polMhYFA1C^1bnoz;_Sc!(8pz09XS8HBjp$ z-aw$%sXg;O{+`*%Z&Aj%^^7S2z?exHJ?~rMoyjb7lM#y~GhkM_P(1lO28`mIjx zLod!tL|!C=7v;KdOqEF;X!TGrmzX8P;kJ4_!snk`AFGkRFi=}(a9V!r>O zydb6Kb*cjgPuKZ}A9PyoH$=v{&jSYkKNuc_AV^^^zY-TrtfNnrOGq3ku(!aWp5+(^ tYi~6h;b>jg&KO;XaVeZ>@wYoJq`wg0zP7;s1;X{a=n+H^T(1q(_#dqP0(JlZ literal 0 HcmV?d00001 diff --git a/tests/functional/Makefile.am b/tests/functional/Makefile.am index 52737c21..5d2ac57a 100644 --- a/tests/functional/Makefile.am +++ b/tests/functional/Makefile.am @@ -19,7 +19,8 @@ TESTS=basic.sh \ wg.sh \ ssadetector.sh \ vlan.sh \ - nettisa.sh + nettisa.sh \ + mqtt.sh if WITH_QUIC TESTS+=\ @@ -50,6 +51,7 @@ EXTRA_DIST=common.sh \ nettisa.sh \ ssadetector.sh \ vlan.sh \ + mqtt.sh \ reference/basic \ reference/basicplus \ reference/pstats \ @@ -72,7 +74,8 @@ EXTRA_DIST=common.sh \ reference/quic \ reference/ssadetector \ reference/vlan \ - reference/nettisa + reference/nettisa \ + reference/mqtt clean-local: rm -rf output diff --git a/tests/functional/mqtt.sh b/tests/functional/mqtt.sh new file mode 100755 index 00000000..ff5ee910 --- /dev/null +++ b/tests/functional/mqtt.sh @@ -0,0 +1,8 @@ +#!/bin/sh + +test -z "$srcdir" && export srcdir=. + +. $srcdir/common.sh + +run_plugin_test mqtt "$pcap_dir/mqtt.pcap" + diff --git a/tests/functional/reference/mqtt b/tests/functional/reference/mqtt new file mode 100644 index 00000000..0c00ee30 --- /dev/null +++ b/tests/functional/reference/mqtt @@ -0,0 +1,6 @@ +91.121.93.94,172.28.8.57,205,56,0,2024-07-03T15:15:02.107896,2024-07-03T15:15:02.209210,00:00:00:00:00:00,00:15:5d:a0:07:5d,3,1,1883,60,16398,45601,0,2,0,0,4,6,25,24,"" +91.121.93.94,172.28.8.57,216,56,0,2024-07-03T15:15:02.230420,2024-07-03T15:15:02.331357,00:00:00:00:00:00,00:15:5d:a0:07:5d,3,1,1883,666,16399,57281,0,0,0,0,4,6,25,24,"" +91.121.93.94,172.28.8.57,263,168,0,2024-07-03T15:15:02.352824,2024-07-03T15:15:02.754817,00:00:00:00:00:00,00:15:5d:a0:07:5d,4,3,1883,60,16622,50195,0,2,0,5,4,6,24,24,"" +91.121.93.94,172.28.8.57,274,196,0,2024-07-03T15:15:02.776140,2024-07-03T15:15:03.177116,00:00:00:00:00:00,00:15:5d:a0:07:5d,4,3,1883,60,17166,36367,0,2,0,1,4,6,24,24,"" +91.121.93.94,172.28.8.57,82,56,0,2024-07-03T15:15:03.198739,2024-07-03T15:15:03.218922,00:00:00:00:00:00,00:15:5d:a0:07:5d,1,1,1883,60,6,38687,0,194,5,0,4,6,24,24,"" +ipaddr DST_IP,ipaddr SRC_IP,uint64 BYTES,uint64 BYTES_REV,uint64 LINK_BIT_FIELD,time TIME_FIRST,time TIME_LAST,macaddr DST_MAC,macaddr SRC_MAC,uint32 PACKETS,uint32 PACKETS_REV,uint16 DST_PORT,uint16 MQTT_KEEP_ALIVE,uint16 MQTT_TYPE_CUMULATIVE,uint16 SRC_PORT,uint8 DIR_BIT_FIELD,uint8 MQTT_CONNECTION_FLAGS,uint8 MQTT_CONNECTION_RETURN_CODE,uint8 MQTT_PUBLISH_FLAGS,uint8 MQTT_VERSION,uint8 PROTOCOL,uint8 TCP_FLAGS,uint8 TCP_FLAGS_REV,string MQTT_TOPICS \ No newline at end of file From bfe514ab1557d64ecf5f4c04d23c305f6d763068 Mon Sep 17 00:00:00 2001 From: Damir Zainullin Date: Wed, 3 Jul 2024 23:41:36 +0200 Subject: [PATCH 3/6] Change c++ version to c++17 --- Makefile.am | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/Makefile.am b/Makefile.am index 4169e0a5..3bb727d2 100644 --- a/Makefile.am +++ b/Makefile.am @@ -11,7 +11,7 @@ DISTCHECK_CONFIGURE_FLAGS="--with-systemdsystemunitdir=$$dc_install_base/$(syste ipfixprobe_LDFLAGS=-lpthread -ldl -latomic ipfixprobe_CFLAGS=-I$(srcdir)/include/ -fPIC -ipfixprobe_CXXFLAGS=-std=gnu++11 -Wno-write-strings -I$(srcdir)/include/ -fPIC +ipfixprobe_CXXFLAGS=-std=gnu++17 -Wno-write-strings -I$(srcdir)/include/ -fPIC if OS_CYGWIN ipfixprobe_CXXFLAGS+=-Wl,--export-all-symbols @@ -143,7 +143,9 @@ ipfixprobe_process_src=\ process/flow_hash.hpp \ process/flow_hash.cpp \ process/mpls.hpp \ - process/mpls.cpp + process/mpls.cpp \ + process/mqtt.hpp \ + process/mqtt.cpp if WITH_QUIC ipfixprobe_process_src+=\ From 8fa5bc7ad73f7d4d12bcf1ef6c9620bd15ed9ba8 Mon Sep 17 00:00:00 2001 From: Damir Zainullin Date: Thu, 4 Jul 2024 16:47:55 +0200 Subject: [PATCH 4/6] Add get_text() for mqtt plugin --- process/mqtt.hpp | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/process/mqtt.hpp b/process/mqtt.hpp index de752a67..05e03ae1 100644 --- a/process/mqtt.hpp +++ b/process/mqtt.hpp @@ -38,6 +38,7 @@ #include #include #include +#include "sstream" namespace ipxp { @@ -144,6 +145,18 @@ struct RecordExtMQTT : public RecordExt { return MQTT_UNIREC_TEMPLATE; } #endif + std::string get_text() const override + { + std::ostringstream out; + out << "type_cumulative=" << type_cumulative + << ",version=" << std::to_string(version) + << ",connection_flags=" << std::to_string(connection_flags) + << ",keep_alive=" << keep_alive + << ",connection_return_code=" << std::to_string(connection_return_code) + << ",publish_flags=" << std::to_string(publish_flags) + << ",topics=\"" << topics.str << "\""; + return out.str(); + } }; From 4e6a7006ba0a222303b42c8e1cbb33ff95553970 Mon Sep 17 00:00:00 2001 From: Damir Zainullin Date: Thu, 4 Jul 2024 16:59:55 +0200 Subject: [PATCH 5/6] Add MQTT plugin description to README --- README.md | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/README.md b/README.md index 2d19cf2a..75fbc3b3 100644 --- a/README.md +++ b/README.md @@ -352,6 +352,20 @@ List of unirec fields exported together with basic flow fields on interface by P | DNS_RR_TTL | uint32 | resource record TTL field | | DNS_IP | ipaddr | IP address from PTR, A or AAAA record | + +### MQTT +List of unirec fields exported together with basic flow fields on interface by MQTT plugin. + +| Output field | Type | Description | +|:-----------------------------:|:------:|:-----------------------------------------------------:| +| MQTT_TYPE_CUMULATIVE | uint16 | types of packets and session present flag cumulative | +| MQTT_VERSION | uint8 | MQTT version | +| MQTT_CONNECTION_FLAGS | uint8 | last CONNECT packet flags | +| MQTT_KEEP_ALIVE | uint16 | last CONNECT keep alive | +| MQTT_CONNECTION_RETURN_CODE | uint8 | last CONNECT return code | +| MQTT_PUBLISH_FLAGS | uint8 | cumulative of PUBLISH packet flags | +| MQTT_TOPICS | string | topics from PUBLISH packets headers | + ### SIP List of unirec fields exported together with basic flow fields on interface by SIP plugin. From a2e029f3313923143a15adca8af02ed67a61a845 Mon Sep 17 00:00:00 2001 From: Damir Zainullin Date: Tue, 16 Jul 2024 05:24:10 +0200 Subject: [PATCH 6/6] Fix code formatting --- process/mqtt.cpp | 153 ++++++++++++++++-------------------- process/mqtt.hpp | 201 ++++++++++++++++++++++++----------------------- 2 files changed, 173 insertions(+), 181 deletions(-) diff --git a/process/mqtt.cpp b/process/mqtt.cpp index ba7b2292..9eadda2c 100644 --- a/process/mqtt.cpp +++ b/process/mqtt.cpp @@ -1,29 +1,32 @@ /** -* \file mqtt.hpp -* \brief MQTT plugin for ipfixprobe -* \author Damir Zainullin -* \date 2024 + * \file mqtt.hpp + * \brief MQTT plugin for ipfixprobe + * \author Damir Zainullin + * \date 2024 */ /* -* Copyright (C) 2023 CESNET -* -* LICENSE TERMS -* -* Redistribution and use in source and binary forms, with or without -* modification, are permitted provided that the following conditions -* are met: -* 1. Redistributions of source code must retain the above copyright -* notice, this list of conditions and the following disclaimer. -* 2. Redistributions in binary form must reproduce the above copyright -* notice, this list of conditions and the following disclaimer in -* the documentation and/or other materials provided with the -* distribution. -* 3. Neither the name of the Company nor the names of its contributors -* may be used to endorse or promote products derived from this -* software without specific prior written permission. + * Copyright (C) 2023 CESNET + * + * LICENSE TERMS + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. */ + #include "mqtt.hpp" -#include "cstring" + +#include + #ifdef DEBUG_MQTT static const bool debug_mqtt = true; #else @@ -33,60 +36,67 @@ namespace ipxp { int RecordExtMQTT::REGISTERED_ID = -1; -__attribute__((constructor)) static void register_this_plugin(){ +__attribute__((constructor)) static void register_this_plugin() +{ static PluginRecord rec = PluginRecord("mqtt", []() { return new MQTTPlugin(); }); register_plugin(&rec); RecordExtMQTT::REGISTERED_ID = register_extension(); } -int MQTTPlugin::post_create(Flow &rec, const Packet &pkt){ +int MQTTPlugin::post_create(Flow& rec, const Packet& pkt) +{ if (has_mqtt_protocol_name(reinterpret_cast(pkt.payload), pkt.payload_len)) add_ext_mqtt(reinterpret_cast(pkt.payload), pkt.payload_len, rec); return 0; } -int MQTTPlugin::pre_update(Flow &rec, Packet &pkt){ +int MQTTPlugin::pre_update(Flow& rec, Packet& pkt) +{ const char* payload = reinterpret_cast(pkt.payload); RecordExt* ext = rec.get_extension(RecordExtMQTT::REGISTERED_ID); if (ext == nullptr) { return 0; - }else{ + } else { parse_mqtt(payload, pkt.payload_len, static_cast(ext)); } return 0; } /** - * \brief Read variable integer as defined in http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html. - * \param [in] data Pointer to IP payload. - * \param [in] payload_len IP payload length. - * \param [in] last_byte Next after last read byte. - * \return Pair of read integer and bool. Bool is false in case read was unsuccessful. + * \brief Read variable integer as defined in + * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html. \param [in] data Pointer to IP + * payload. \param [in] payload_len IP payload length. \param [in] last_byte Next after last read + * byte. \return Pair of read integer and bool. Bool is false in case read was unsuccessful. */ -std::pair MQTTPlugin::read_variable_int(const char* data, int payload_len, uint32_t& last_byte)const noexcept{ +std::pair +MQTTPlugin::read_variable_int(const char* data, int payload_len, uint32_t& last_byte) const noexcept +{ uint32_t res = 0; bool next; - for( next = true; next && last_byte < (uint32_t)payload_len; last_byte++){ + for (next = true; next && last_byte < (uint32_t) payload_len; last_byte++) { res <<= 8; res |= data[last_byte]; next = (data[last_byte] & 0b1000'0000); } - return last_byte == (uint32_t)payload_len && next ? std::make_pair(0u, false) : std::make_pair(res, true); + return last_byte == (uint32_t) payload_len && next ? std::make_pair(0u, false) + : std::make_pair(res, true); } /** - * \brief Read utf8 encoded string as defined in http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html. - * \param [in] data Pointer to IP payload. - * \param [in] payload_len IP payload length. - * \param [in] last_byte Next after last read byte. - * \return Tuple of read string, its length and bool. Bool is false in case read was unsuccessful. + * \brief Read utf8 encoded string as defined in + * http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html. \param [in] data Pointer to IP + * payload. \param [in] payload_len IP payload length. \param [in] last_byte Next after last read + * byte. \return Tuple of read string, its length and bool. Bool is false in case read was + * unsuccessful. */ -std::tuple MQTTPlugin::read_utf8_string(const char* data, int payload_len, uint32_t& last_byte) const noexcept{ - if (last_byte + 2 >= (uint32_t)payload_len) +std::tuple +MQTTPlugin::read_utf8_string(const char* data, int payload_len, uint32_t& last_byte) const noexcept +{ + if (last_byte + 2 >= (uint32_t) payload_len) return {0, {}, false}; - uint16_t string_length = ntohs(*(uint16_t*)&data[last_byte]); + uint16_t string_length = ntohs(*(uint16_t*) &data[last_byte]); last_byte += 2; - if (last_byte + string_length >= (uint32_t)payload_len) + if (last_byte + string_length >= (uint32_t) payload_len) return {0, {}, false}; return {string_length, std::string_view(&data[last_byte], string_length), true}; } @@ -98,12 +108,13 @@ std::tuple MQTTPlugin::read_utf8_string(const * \param [in,out] rec Record to write MQTT data in. * \return True if buffer contains set of valid mqtt packets. */ -bool MQTTPlugin::parse_mqtt(const char* data, int payload_len, RecordExtMQTT* rec) noexcept{ +bool MQTTPlugin::parse_mqtt(const char* data, int payload_len, RecordExtMQTT* rec) noexcept +{ if (payload_len <= 0) return false; uint32_t last_byte = 0; // Each tcp segment may contain more MQTT packets - while(last_byte < (uint32_t)payload_len) { + while (last_byte < (uint32_t) payload_len) { uint8_t type, flags; type = flags = data[last_byte++]; type >>= 4; @@ -119,15 +130,15 @@ bool MQTTPlugin::parse_mqtt(const char* data, int payload_len, RecordExtMQTT* re auto first_byte_after_payload = remaining_length + last_byte; // Connect packet if (type == 1) { - if (!has_mqtt_protocol_name(data, payload_len)){ + if (!has_mqtt_protocol_name(data, payload_len)) { if constexpr (debug_mqtt) std::cout << "Connection packet doesn't have MQTT label" << std::endl; return false; } last_byte += 6; // Skip "MQTT" label(and its 2-byte length) rec->version = data[last_byte++]; - //Only MQTT v3.1.1 and v5.0 are supported - if (rec->version != 4 && rec->version != 5){ + // Only MQTT v3.1.1 and v5.0 are supported + if (rec->version != 4 && rec->version != 5) { if constexpr (debug_mqtt) std::cout << "Unsupported mqtt version" << std::endl; return false; @@ -163,62 +174,35 @@ bool MQTTPlugin::parse_mqtt(const char* data, int payload_len, RecordExtMQTT* re else if (type == 14) { flow_flush = true; } - // Read packet identifier - /*if ( (rec->type >= 3 && rec->type <= 11) || (rec->type == 3 && ((rec->flags >> 1) & 0b11)!= 0)){ - rec->id = *((const uint16_t*)(&data[last_byte])); last_byte += 2; - }*/ - - // Read properties (only for MQTT v5.0) - /*if (rec->type <= 11 || rec->type >= 14){ - rec->property_length = read_variable_int(data, payload_len, last_byte); - read_properties(data, payload_len, last_byte, rec); - }*/ last_byte = first_byte_after_payload; // Skip rest of payload } return true; } -int MQTTPlugin::post_update(Flow &rec, const Packet &pkt){ - if (flow_flush){ +int MQTTPlugin::post_update(Flow& rec, const Packet& pkt) +{ + if (flow_flush) { flow_flush = false; return FLOW_FLUSH; } return 0; } -void MQTTPlugin::pre_export(Flow &rec){ - if constexpr (debug_mqtt){ - RecordExtMQTT* ext = static_cast(rec.get_extension(RecordExtMQTT::REGISTERED_ID)); - if (ext == nullptr) - return; - /*std::cout << "MQTT Exports: type cumulative = " << std::to_string(ext->type_cumulative) - << ", version = " << std::to_string(ext->version) << ", con_flags = " - << std::to_string(ext->connection_flags) << ",keep_alive = " - << std::to_string(ext->keep_alive)<< ",\nsession_present= " - << std::to_string(ext->session_present_flag) << ", last return code= " - << std::to_string(ext->last_connect_return_code) << ", publish flags= " - << std::to_string(ext->publish_flags) << ",\npublished topics: "; - for(auto it = ext->topics.begin(); it != ext->topics.end(); it++) - std::cout << (it == ext->topics.begin() ? "" : ", ") << *it; - std::cout << std::endl;*/ - } - return; -} - /** * \brief Parse buffer to check if it contains MQTT packets. * \param [in] data Pointer to IP payload. * \param [in] payload_len IP payload length. * \return True if buffer starts with MQTT label as part of connection mqtt packet. */ -bool MQTTPlugin::has_mqtt_protocol_name(const char* data, int payload_len) const noexcept{ +bool MQTTPlugin::has_mqtt_protocol_name(const char* data, int payload_len) const noexcept +{ if (payload_len <= 1) return false; auto pos = 1u; - if (auto [_, success] = read_variable_int(data,payload_len, pos); !success) + if (auto [_, success] = read_variable_int(data, payload_len, pos); !success) return false; - auto [string_length, str, success] = read_utf8_string(data,payload_len, pos); + auto [string_length, str, success] = read_utf8_string(data, payload_len, pos); return success && str == "MQTT"; } @@ -228,16 +212,17 @@ void MQTTPlugin::add_ext_mqtt(const char* data, int payload_len, Flow& flow) recPrealloc = new RecordExtMQTT(); } if (!parse_mqtt(data, payload_len, recPrealloc)) - return ; + return; flow.add_extension(recPrealloc); recPrealloc = nullptr; } -void MQTTPlugin::init(const char *params){ +void MQTTPlugin::init(const char* params) +{ MQTTOptionsParser parser; try { parser.parse(params); - } catch (ParserError &e) { + } catch (ParserError& e) { throw PluginError(e.what()); } maximal_topic_count = parser.m_maximal_topic_count; diff --git a/process/mqtt.hpp b/process/mqtt.hpp index 05e03ae1..ae1c676a 100644 --- a/process/mqtt.hpp +++ b/process/mqtt.hpp @@ -1,74 +1,78 @@ /** -* \file mqtt.hpp -* \brief MQTT plugin for ipfixprobe -* \author Damir Zainullin -* \date 2024 -*/ + * \file mqtt.hpp + * \brief MQTT plugin for ipfixprobe + * \author Damir Zainullin + * \date 2024 + */ /* -* Copyright (C) 2023 CESNET -* -* LICENSE TERMS -* -* Redistribution and use in source and binary forms, with or without -* modification, are permitted provided that the following conditions -* are met: -* 1. Redistributions of source code must retain the above copyright -* notice, this list of conditions and the following disclaimer. -* 2. Redistributions in binary form must reproduce the above copyright -* notice, this list of conditions and the following disclaimer in -* the documentation and/or other materials provided with the -* distribution. -* 3. Neither the name of the Company nor the names of its contributors -* may be used to endorse or promote products derived from this -* software without specific prior written permission. -*/ + * Copyright (C) 2023 CESNET + * + * LICENSE TERMS + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of the Company nor the names of its contributors + * may be used to endorse or promote products derived from this + * software without specific prior written permission. + */ #ifndef CACHE_CPP_TPLUGIN_HPP #define CACHE_CPP_TPLUGIN_HPP - #ifdef WITH_NEMEA #include "fields.h" #endif -#include +#include "sstream" + +#include #include -#include -#include #include +#include +#include +#include #include #include -#include -#include "sstream" namespace ipxp { -#define MQTT_UNIREC_TEMPLATE "MQTT_TYPE_CUMULATIVE, MQTT_VERSION, MQTT_CONNECTION_FLAGS, MQTT_KEEP_ALIVE, MQTT_CONNECTION_RETURN_CODE, MQTT_PUBLISH_FLAGS, MQTT_TOPICS" +#define MQTT_UNIREC_TEMPLATE "MQTT_TYPE_CUMULATIVE, MQTT_VERSION, MQTT_CONNECTION_FLAGS,\ + MQTT_KEEP_ALIVE, MQTT_CONNECTION_RETURN_CODE, MQTT_PUBLISH_FLAGS, MQTT_TOPICS" -UR_FIELDS ( - uint16 MQTT_TYPE_CUMULATIVE, - uint8 MQTT_VERSION, - uint8 MQTT_CONNECTION_FLAGS, - uint16 MQTT_KEEP_ALIVE, - uint8 MQTT_CONNECTION_RETURN_CODE, - uint8 MQTT_PUBLISH_FLAGS, - string MQTT_TOPICS -) + UR_FIELDS( + uint16 MQTT_TYPE_CUMULATIVE, + uint8 MQTT_VERSION, + uint8 MQTT_CONNECTION_FLAGS, + uint16 MQTT_KEEP_ALIVE, + uint8 MQTT_CONNECTION_RETURN_CODE, + uint8 MQTT_PUBLISH_FLAGS, + string MQTT_TOPICS) -class MQTTOptionsParser : public OptionsParser -{ + class MQTTOptionsParser : public OptionsParser { public: - uint32_t m_maximal_topic_count; ///< Maximal count of topics from Publish packet header to store for each flow + uint32_t m_maximal_topic_count; ///< Maximal count of topics from Publish packet header to store + ///< for each flow - MQTTOptionsParser() : OptionsParser("mqtt", "Parse MQTT traffic"), m_maximal_topic_count(0) + MQTTOptionsParser() + : OptionsParser("mqtt", "Parse MQTT traffic") + , m_maximal_topic_count(0) { register_option( "tc", "topiccount", "count", - "Export first tc topics from Publish packet header. Topics are separated by #. Default value is 0.", - [this](const char *arg){try { + "Export first tc topics from Publish packet header. Topics are separated by #. Default " + "value is 0.", + [this](const char* arg) { + try { m_maximal_topic_count = str2num(arg); - } catch(std::invalid_argument &e) { + } catch (std::invalid_argument& e) { return false; } return true; @@ -79,35 +83,43 @@ class MQTTOptionsParser : public OptionsParser struct RecordExtMQTT : public RecordExt { static int REGISTERED_ID; - RecordExtMQTT() : - RecordExt(REGISTERED_ID), - type_cumulative(0), - version(0), - connection_flags(0), - keep_alive(0), - session_present_flag(false), - connection_return_code(0), - publish_flags(0) {} - - uint16_t type_cumulative; ///< Types of packets presented during communication and session present flag. DISCONNECT(1b) | PINGRESP(1b) | PINGREQ(1b) | UNSUBACK(1b) | UNSUBSCRIBE(1b) | SUBACK(1b) | SUBSCRIBE(1b) | PUBCOMP(1b) | PUBREL(1b) | PUBREC(1b) | PUBACK(1b) | PUBLISH(1b) | CONNACK(1b) | CONNECT(1b) | session present(1b) + RecordExtMQTT() + : RecordExt(REGISTERED_ID) + , type_cumulative(0) + , version(0) + , connection_flags(0) + , keep_alive(0) + , session_present_flag(false) + , connection_return_code(0) + , publish_flags(0) + { + } + + uint16_t type_cumulative; /**< Types of packets presented during communication and session + present flag. DISCONNECT(1b) | PINGRESP(1b) | PINGREQ(1b) | UNSUBACK(1b) | UNSUBSCRIBE(1b) | + SUBACK(1b) | SUBSCRIBE(1b) | PUBCOMP(1b) | PUBREL(1b) | PUBREC(1b) | PUBACK(1b) | PUBLISH(1b) | + CONNACK(1b) | CONNECT(1b) | session present(1b) */ uint8_t version; ///< Used version of MQTT from last connection packet - //Connect - uint8_t connection_flags; ///< Last connection flags: Username flag(1b) | Password flag(1b) | Will retain(1b) | Will QoS(2b) | Clean Session(1b) | 0(1b) + // Connect + uint8_t connection_flags; /**< Last connection flags: Username flag(1b) | Password flag(1b) + | Will retain(1b) | Will QoS(2b) | Clean Session(1b) | 0(1b) */ uint16_t keep_alive; ///< Last connection keep alive (seconds) - //CONNACK - bool session_present_flag; ///< Session present bit from last connack flags. First bit of type_cumulative + // CONNACK + bool session_present_flag; ///< Session present bit from last connack flags. First bit of + ///< type_cumulative uint8_t connection_return_code; ///< Value of last connection return code from CONNACK header - //PUBLISH + // PUBLISH uint8_t publish_flags; ///< Cumulative of Publish header flags struct { std::string str; uint32_t count = 0; - } topics; ///< Struct to keep all recorded and concatenated topics from Publish header and its count + } topics; ///< Struct to keep all recorded and concatenated topics from Publish header and its + ///< count - virtual int fill_ipfix(uint8_t *buffer, int size) + virtual int fill_ipfix(uint8_t* buffer, int size) { auto max_length = 8u + topics.str.size() + 3u; - if ((uint32_t)size < max_length) + if ((uint32_t) size < max_length) return -1; *(uint16_t*) (buffer) = ntohs(type_cumulative | session_present_flag); *(buffer + 2) = version; @@ -116,20 +128,20 @@ struct RecordExtMQTT : public RecordExt { *(buffer + 6) = connection_return_code; *(buffer + 7) = publish_flags; auto total_length = 8u; - total_length += variable2ipfix_buffer(buffer + total_length,(uint8_t*) topics.str.c_str(), topics.str.size()); + total_length += variable2ipfix_buffer( + buffer + total_length, + (uint8_t*) topics.str.c_str(), + topics.str.size()); return total_length; } - const char **get_ipfix_tmplt() const + const char** get_ipfix_tmplt() const { - static const char *ipfix_template[] = { - IPFIX_MQTT_TEMPLATE(IPFIX_FIELD_NAMES) - nullptr - }; + static const char* ipfix_template[] = {IPFIX_MQTT_TEMPLATE(IPFIX_FIELD_NAMES) nullptr}; return ipfix_template; } #ifdef WITH_NEMEA - void fill_unirec(ur_template_t *tmplt, void *record) override + void fill_unirec(ur_template_t* tmplt, void* record) override { ur_set(tmplt, record, F_MQTT_TYPE_CUMULATIVE, type_cumulative | session_present_flag); ur_set(tmplt, record, F_MQTT_VERSION, version); @@ -140,49 +152,44 @@ struct RecordExtMQTT : public RecordExt { ur_set_string(tmplt, record, F_MQTT_TOPICS, topics.str.c_str()); } - const char *get_unirec_tmplt() const - { - return MQTT_UNIREC_TEMPLATE; - } + const char* get_unirec_tmplt() const { return MQTT_UNIREC_TEMPLATE; } #endif std::string get_text() const override { std::ostringstream out; - out << "type_cumulative=" << type_cumulative - << ",version=" << std::to_string(version) + out << "type_cumulative=" << type_cumulative << ",version=" << std::to_string(version) << ",connection_flags=" << std::to_string(connection_flags) << ",keep_alive=" << keep_alive << ",connection_return_code=" << std::to_string(connection_return_code) - << ",publish_flags=" << std::to_string(publish_flags) - << ",topics=\"" << topics.str << "\""; + << ",publish_flags=" << std::to_string(publish_flags) << ",topics=\"" << topics.str + << "\""; return out.str(); } }; - - -class MQTTPlugin : public ProcessPlugin -{ +class MQTTPlugin : public ProcessPlugin { public: - int post_create(Flow &rec, const Packet &pkt) override; - int pre_update(Flow &rec, Packet &pkt) override; - void pre_export(Flow &rec) override; - int post_update(Flow &rec, const Packet &pkt) override; - RecordExt *get_ext() const { return new RecordExtMQTT(); } - OptionsParser *get_parser() const { return new MQTTOptionsParser(); } + int post_create(Flow& rec, const Packet& pkt) override; + int pre_update(Flow& rec, Packet& pkt) override; + int post_update(Flow& rec, const Packet& pkt) override; + RecordExt* get_ext() const { return new RecordExtMQTT(); } + OptionsParser* get_parser() const { return new MQTTOptionsParser(); } std::string get_name() const { return "mqtt"; } - ProcessPlugin *copy(); - void init(const char *params) override; + ProcessPlugin* copy(); + void init(const char* params) override; + private: bool flow_flush = false; ///< Tell storage plugin to flush current Flow. - uint32_t maximal_topic_count = 0; ///< Maximal count of topics from Publish packet header to store for each flow - RecordExtMQTT *recPrealloc; ///< Preallocated extension. + uint32_t maximal_topic_count + = 0; ///< Maximal count of topics from Publish packet header to store for each flow + RecordExtMQTT* recPrealloc; ///< Preallocated extension. bool parse_mqtt(const char* data, int payload_len, RecordExtMQTT* rec) noexcept; - void add_ext_mqtt(const char *data, int payload_len, Flow &flow); - std::pair read_variable_int(const char* data, int payload_len, uint32_t& last_byte)const noexcept; - std::tuple read_utf8_string(const char* data, int payload_len, uint32_t& last_byte) const noexcept; + void add_ext_mqtt(const char* data, int payload_len, Flow& flow); + std::pair + read_variable_int(const char* data, int payload_len, uint32_t& last_byte) const noexcept; + std::tuple + read_utf8_string(const char* data, int payload_len, uint32_t& last_byte) const noexcept; bool has_mqtt_protocol_name(const char* data, int payload_len) const noexcept; - }; } // namespace ipxp