Skip to content

Commit

Permalink
Merge branch 'tcp-bugfix-decoder' into 'master'
Browse files Browse the repository at this point in the history
TCP input: fix bug when detecting decoder of connection

See merge request monitoring/ipfixcol2!19
  • Loading branch information
sedmicha committed Sep 8, 2024
2 parents f259d54 + c8b105d commit 6380631
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 39 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ endif()
# Versions and other informations
set(IPFIXCOL_VERSION_MAJOR 2)
set(IPFIXCOL_VERSION_MINOR 7)
set(IPFIXCOL_VERSION_PATCH 0)
set(IPFIXCOL_VERSION_PATCH 1)
set(IPFIXCOL_VERSION
${IPFIXCOL_VERSION_MAJOR}.${IPFIXCOL_VERSION_MINOR}.${IPFIXCOL_VERSION_PATCH})

Expand Down
7 changes: 2 additions & 5 deletions src/plugins/input/tcp/src/Acceptor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,18 @@
#include <ipfixcol2.h> // ipx_ctx_t, ipx_strerror, IPX_CTX_WARNING

#include "ClientManager.hpp" // ClientManager
#include "DecoderFactory.hpp" // DecoderFactory
#include "Config.hpp" // Config
#include "UniqueFd.hpp" // UniqueFd
#include "IpAddress.hpp" // IpAddress, IpVersion

namespace tcp_in {

Acceptor::Acceptor(ClientManager &clients, DecoderFactory factory, ipx_ctx_t *ctx) :
Acceptor::Acceptor(ClientManager &clients, ipx_ctx_t *ctx) :
m_epoll(),
m_sockets(),
m_pipe_in(),
m_pipe_out(),
m_clients(clients),
m_factory(std::move(factory)),
m_thread(),
m_ctx(ctx)
{
Expand Down Expand Up @@ -231,8 +229,7 @@ void Acceptor::mainloop() {
}

try {
auto decoder = m_factory.detect_decoder(new_sd.get());
m_clients.add_connection(std::move(new_sd), std::move(decoder));
m_clients.add_connection(std::move(new_sd));
} catch (std::exception &ex) {
IPX_CTX_ERROR(m_ctx, "Acceptor: %s", ex.what());
}
Expand Down
4 changes: 1 addition & 3 deletions src/plugins/input/tcp/src/Acceptor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,10 @@ class Acceptor {
* @brief Creates the acceptor thread.
*
* @param clients Reference to client manager.
* @param factory Initialized decoder factory.
* @param config File configuration.
* @param ctx The plugin context.
*/
Acceptor(ClientManager &clients, DecoderFactory factory, ipx_ctx_t *ctx);
Acceptor(ClientManager &clients, ipx_ctx_t *ctx);

// force that acceptor stays in its original memory (so that `this` pointer stays valid on the
// other thread)
Expand Down Expand Up @@ -79,7 +78,6 @@ class Acceptor {

/** Accepted clients. */
ClientManager &m_clients;
DecoderFactory m_factory;
std::thread m_thread;
ipx_ctx_t *m_ctx;
};
Expand Down
15 changes: 5 additions & 10 deletions src/plugins/input/tcp/src/ClientManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@
#include <ipfixcol2.h> // ipx_strerror, ipx_ctx_t, ipx_session, IPX_CTX_INFO, IPX_CTX_WARNING

#include "Connection.hpp" // Connection
#include "Decoder.hpp" // Decoder
#include "UniqueFd.hpp" // UniqueFd

namespace tcp_in {

ClientManager::ClientManager(ipx_ctx_t *ctx) :
ClientManager::ClientManager(ipx_ctx_t *ctx, DecoderFactory factory) :
m_ctx(ctx),
m_epoll(),
m_mutex(),
m_connections()
m_connections(),
m_factory(std::move(factory))
{}

void ClientManager::add_connection(UniqueFd fd, std::unique_ptr<Decoder> decoder) {
void ClientManager::add_connection(UniqueFd fd) {
const char *err_str;

// get the flags and set it to non-blocking mode
Expand All @@ -57,17 +57,12 @@ void ClientManager::add_connection(UniqueFd fd, std::unique_ptr<Decoder> decoder

int borrowed_fd = fd.get();

std::unique_ptr<Connection> connection(new Connection(std::move(fd), std::move(decoder)));
std::unique_ptr<Connection> connection(new Connection(std::move(fd), m_factory, m_ctx));

auto net = &connection->get_session()->tcp.net;
std::array<char, INET6_ADDRSTRLEN> src_addr_str{};
inet_ntop(net->l3_proto, &net->addr_src, src_addr_str.begin(), src_addr_str.size());
IPX_CTX_INFO(m_ctx, "New exporter connected from '%s'.", src_addr_str.begin());
IPX_CTX_INFO(
m_ctx,
"Using %s Decoder for the new connection",
connection->get_decoder().get_name()
);

std::lock_guard<std::mutex> lock(m_mutex);

Expand Down
8 changes: 4 additions & 4 deletions src/plugins/input/tcp/src/ClientManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include <ipfixcol2.h> // ipx_session, ipx_ctx_t

#include "Connection.hpp" // Connection
#include "Decoder.hpp" // Decoder
#include "DecoderFactory.hpp" // Decoder
#include "UniqueFd.hpp" // UniqueFd
#include "Epoll.hpp" // Epoll

Expand All @@ -31,15 +31,14 @@ class ClientManager {
* @brief Creates client manager with no clients.
* @throws when fails to create epoll
*/
ClientManager(ipx_ctx_t *ctx);
ClientManager(ipx_ctx_t *ctx, DecoderFactory factory);

/**
* @brief Adds connection to the vector and epoll.
* @param fd file descriptor of the new tcp connection.
* @param decoder decoder for the connection.
* @throws when fails to add the new connection to epoll or when fails to create new session.
*/
void add_connection(UniqueFd fd, std::unique_ptr<Decoder> decoder);
void add_connection(UniqueFd fd);

/**
* @brief Removes connection from the vector based on its session. This is safe only for the
Expand Down Expand Up @@ -69,6 +68,7 @@ class ClientManager {
Epoll m_epoll;
std::mutex m_mutex;
std::vector<std::unique_ptr<Connection>> m_connections;
DecoderFactory m_factory;
};

} // namespace tcp_in
Expand Down
23 changes: 16 additions & 7 deletions src/plugins/input/tcp/src/Connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,11 @@

namespace tcp_in {

Connection::Connection(UniqueFd fd, std::unique_ptr<Decoder> decoder) :
m_session(nullptr),
Connection::Connection(UniqueFd fd, DecoderFactory &factory, ipx_ctx *ctx) :
m_fd(std::move(fd)),
m_new_connnection(true),
m_decoder(std::move(decoder))
m_factory(factory),
m_ctx(ctx)
{
if (!m_decoder) {
throw std::runtime_error("Decoder was null.");
}
const char *err_str;

sockaddr_storage src_addr;
Expand Down Expand Up @@ -96,6 +92,19 @@ Connection::Connection(UniqueFd fd, std::unique_ptr<Decoder> decoder) :
}

bool Connection::receive(ipx_ctx_t *ctx) {
if (!m_decoder) {
m_decoder = m_factory.detect_decoder(m_fd.get());
if (!m_decoder) {
return true;
}

IPX_CTX_INFO(
m_ctx,
"Using %s Decoder for the new connection",
m_decoder->get_name()
);
}

auto &buffer = m_decoder->decode();
buffer.process_decoded([=](ByteVector &&msg) { send_msg(ctx, std::move(msg)); });
return !buffer.is_eof_reached();
Expand Down
16 changes: 11 additions & 5 deletions src/plugins/input/tcp/src/Connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "ByteVector.hpp" // ByteVector
#include "Decoder.hpp" // Decoder
#include "DecoderFactory.hpp"
#include "UniqueFd.hpp" // UniqueFd

namespace tcp_in {
Expand All @@ -26,10 +27,10 @@ class Connection {
/**
* @brief Creates new connection with this TCP connection file descriptor.
* @param fd File descriptor of the new tcp connection.
* @param decoder Decoder to use in this connection.
* @param factory The decoder factory to decide the decoder of this connection
* @throws when fails to create new session
*/
Connection(UniqueFd fd, std::unique_ptr<Decoder> decoder);
Connection(UniqueFd fd, DecoderFactory& factory, ipx_ctx_t *ctx);

Connection(const Connection &) = delete;

Expand Down Expand Up @@ -69,13 +70,18 @@ class Connection {
private:
void send_msg(ipx_ctx_t *ctx, ByteVector &&msg);

ipx_session *m_session;
/** TCP file descriptor */
UniqueFd m_fd;
/** Decoder factory */
DecoderFactory &m_factory;
/** Plugin context for logging */
ipx_ctx_t *m_ctx;
/** The session identifier */
ipx_session *m_session = nullptr;
/** true if this connection didn't receive any full messages, otherwise false. */
bool m_new_connnection;
bool m_new_connnection = true;
/** selected decoder or nullptr. */
std::unique_ptr<Decoder> m_decoder;
std::unique_ptr<Decoder> m_decoder = nullptr;
};

} // namespace tcp_in
7 changes: 6 additions & 1 deletion src/plugins/input/tcp/src/DecoderFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ std::unique_ptr<Decoder> DecoderFactory::detect_decoder(int fd) {

std::array<uint8_t, MAX_MAGIC_LEN> buf{};

auto res = recv(fd, buf.begin(), buf.size(), MSG_PEEK | MSG_WAITALL);
auto res = recv(fd, buf.begin(), buf.size(), MSG_PEEK | MSG_DONTWAIT);
if (res == EAGAIN || res == EWOULDBLOCK) {
// Not enough data yet
return nullptr;
}

if (res == -1) {
const char *err_msg;
ipx_strerror(errno, err_msg);
Expand Down
3 changes: 2 additions & 1 deletion src/plugins/input/tcp/src/DecoderFactory.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class DecoderFactory {
* constructs it. This function may block if the decoder cannot be determined without recieving
* more data.
* @param fd TCP stream file descriptor
* @return Instance of the correct decoder, nullptr no decoder matches the data.
* @throws std::runtime_error on socket error or if no decoder matches the data
* @return Instance of the correct decoder, nullptr if there is not enough data to decide the decoder yet
*/
std::unique_ptr<Decoder> detect_decoder(int fd);

Expand Down
4 changes: 2 additions & 2 deletions src/plugins/input/tcp/src/Plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ namespace tcp_in {

Plugin::Plugin(ipx_ctx_t *ctx, Config &config) :
m_ctx(ctx),
m_clients(ctx),
m_acceptor(m_clients, DecoderFactory(), ctx)
m_clients(ctx, DecoderFactory()),
m_acceptor(m_clients, ctx)
{
m_acceptor.bind_addresses(config);
m_acceptor.start();
Expand Down

0 comments on commit 6380631

Please sign in to comment.