Skip to content

Commit

Permalink
KafkaPort test stubs
Browse files Browse the repository at this point in the history
  • Loading branch information
Neil Stephens committed Jul 14, 2024
1 parent db15542 commit 760fd4d
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Code/Ports/KafkaPort/KafkaPort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ void KafkaPort::Disable()

void KafkaPort::ProcessElements(const Json::Value& JSONRoot)
{
if(!JSONRoot.isObject()) return;

auto pConf = static_cast<KafkaPortConf*>(this->pConf.get());
//JSON member NativeKafkaProperties has all the kafka property strings that are directly passed to the kafka stack
if(JSONRoot.isMember("NativeKafkaProperties"))
Expand Down
29 changes: 28 additions & 1 deletion Code/Ports/KafkaPort/KafkaProducerPort.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,51 @@
#include "KafkaProducerPort.h"
#include "KafkaPortConf.h"
#include <kafka/KafkaProducer.h>
#include <opendatacon/spdlog.h>

void KafkaProducerPort::Build()
{
//create a kafka producer
auto pConf = static_cast<KafkaPortConf*>(this->pConf.get());

if(!pConf->NativeKafkaProperties.contains("bootstrap.servers"))
{
pConf->NativeKafkaProperties.put("bootstrap.servers", "localhost:9092");
if(auto log = spdlog::get("KafkaPort"))
log->error("bootstrap.servers property not found, defaulting to localhost:9092");
}

if(pConf->NativeKafkaProperties.getProperty("enable.manual.events.poll") == "false")
if(auto log = spdlog::get("KafkaPort"))
log->warn("enable.manual.events.poll property is set to false, forcing to true");
pConf->NativeKafkaProperties.put("enable.manual.events.poll", "true");

pConf->NativeKafkaProperties.put("error_cb", [this](const kafka::Error& error)
{
if(auto log = spdlog::get("KafkaPort"))
log->error("{}: {}",Name,error.toString());
});

pConf->NativeKafkaProperties.put("log_cb", [this](int level, const char* filename, int lineno, const char* msg)
{
auto spdlog_lvl = spdlog::level::level_enum(6-level);
if(auto log = spdlog::get("KafkaPort"))
log->log(spdlog_lvl,"{} ({}:{}): {}",Name,filename,lineno,msg);
});

pConf->NativeKafkaProperties.put("stats_cb", [this](const std::string& jsonString)
{
if(auto log = spdlog::get("KafkaPort"))
log->info("{}: Statistics: {}",Name,jsonString);
});

//TODO: consider also forcing enable.idempotence=true depending on the retry model
// see https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#idempotent-producer
//TODO: consider also setting the acks property to "all" depending on the retry model

//TODO: use a factory function to create the producer or return an existing one
// depending on the configuration, ports could share a producer or have their own
pKafkaProducer = std::make_shared<KCP::KafkaProducer>(pConf->NativeKafkaProperties); // <--- TODO: check if this can throw
pKafkaProducer = std::make_shared<KCP::KafkaProducer>(pConf->NativeKafkaProperties); // <--- FIXME: this can throw - catch it, and at least log it

//TODO: set up a polling loop using asio to call pKafkaProducer->pollEvents() at a regular interval
// to ensure we get callbacks in the case there's no following Event
Expand Down
4 changes: 4 additions & 0 deletions Code/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ if(LUAPORT)
add_subdirectory(LuaPort_tests)
endif()

if(KAFKAPORT)
add_subdirectory(KafkaPort_tests)
endif()

if(FULL)
add_subdirectory(Integration_tests)
endif()
87 changes: 87 additions & 0 deletions Code/tests/KafkaPort_tests/Basic.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/* opendatacon
*
* Copyright (c) 2014:
*
* DCrip3fJguWgVCLrZFfA7sIGgvx1Ou3fHfCxnrz4svAi
* yxeOtDhDCXf1Z4ApgXvX5ahqQmzRfJ2DoX8S05SqHA==
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Created on: 14/07/2024
* Author: Neil Stephens <dearknarl@gmail.com>
*/
#include "Helpers.h"
#include "../PortLoader.h"
#include "../ThreadPool.h"
#include <catch.hpp>
#include <opendatacon/asio.h>
#include <thread>

#define SUITE(name) "KafkaPortBasicsTestSuite - " name

TEST_CASE(SUITE("ConstructBuildEnableDisableDestroy"))
{
TestSetup();
auto portlib = LoadModule(GetLibFileName("KafkaPort"));
REQUIRE(portlib);

//Go through the typical life cycle of a port, albeit short
{
newptr newPort = GetPortCreator(portlib, "KafkaProducer");
REQUIRE(newPort);
delptr deletePort = GetPortDestroyer(portlib, "KafkaProducer");
REQUIRE(deletePort);

//TODO: put something in the config
std::shared_ptr<DataPort> PUT(newPort("PortUnderTest", "", ""), deletePort);

PUT->Build();

ThreadPool thread_pool(1);

PUT->Enable();
PUT->Disable();
}

TestTearDown();
UnLoadModule(portlib);
}

TEST_CASE(SUITE("ConstructBuildEnableDestroy"))
{
TestSetup();
auto portlib = LoadModule(GetLibFileName("KafkaPort"));
REQUIRE(portlib);

//Test the destruction of an enabled port (happens in case of exception)
{
newptr newPort = GetPortCreator(portlib, "KafkaProducer");
REQUIRE(newPort);
delptr deletePort = GetPortDestroyer(portlib, "KafkaProducer");
REQUIRE(deletePort);

//TODO: put something in the config
std::shared_ptr<DataPort> PUT(newPort("PortUnderTest", "", ""), deletePort);

PUT->Build();

ThreadPool thread_pool(1);

PUT->Enable();
PUT.reset();
}

TestTearDown();
UnLoadModule(portlib);
}
29 changes: 29 additions & 0 deletions Code/tests/KafkaPort_tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# opendatacon
#
# Copyright (c) 2014:
#
# DCrip3fJguWgVCLrZFfA7sIGgvx1Ou3fHfCxnrz4svAi
# yxeOtDhDCXf1Z4ApgXvX5ahqQmzRfJ2DoX8S05SqHA==
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
project(KafkaPort_tests)

file(GLOB ${PROJECT_NAME}_SRC *.cpp *.h ../PortLoader.cpp ../PortLoader.h)

add_executable(${PROJECT_NAME} ${${PROJECT_NAME}_SRC})
target_link_libraries(${PROJECT_NAME} ODC ${DL})

install(TARGETS ${PROJECT_NAME} RUNTIME DESTINATION ${INSTALLDIR_BINS})
set_target_properties(${PROJECT_NAME} PROPERTIES FOLDER tests)
add_test(NAME CTEST_${PROJECT_NAME} COMMAND ${PROJECT_NAME})
52 changes: 52 additions & 0 deletions Code/tests/KafkaPort_tests/CatchStart.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/* opendatacon
*
* Copyright (c) 2014:
*
* DCrip3fJguWgVCLrZFfA7sIGgvx1Ou3fHfCxnrz4svAi
* yxeOtDhDCXf1Z4ApgXvX5ahqQmzRfJ2DoX8S05SqHA==
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
*/
#define CATCH_CONFIG_RUNNER

#include "Helpers.h"
#include <opendatacon/util.h>
#include <catch.hpp>

spdlog::level::level_enum log_level = spdlog::level::off;

int main( int argc, char* argv[] )
{
int new_argc = argc;
char** new_argv = argv;
if (argc > 1)
{
std::string level_str = argv[1];
log_level = spdlog::level::from_str(level_str);
if (log_level == spdlog::level::off && level_str != "off")
{
std::cout << "KafkaPort tests: optional log level as first arg. Choose from:" << std::endl;
for (uint8_t i = 0; i < 7; i++)
std::cout << spdlog::level::level_string_views[i].data() << std::endl;
}
else
{
new_argc = argc - 1;
new_argv = argv + 1;
}
}

return Catch::Session().run(new_argc, new_argv);
}
64 changes: 64 additions & 0 deletions Code/tests/KafkaPort_tests/EndToEnd.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/* opendatacon
*
* Copyright (c) 2014:
*
* DCrip3fJguWgVCLrZFfA7sIGgvx1Ou3fHfCxnrz4svAi
* yxeOtDhDCXf1Z4ApgXvX5ahqQmzRfJ2DoX8S05SqHA==
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
*/
#include "Helpers.h"
#include "../PortLoader.h"
#include "../ThreadPool.h"
#include <catch.hpp>
#include <opendatacon/asio.h>
#include <thread>
#include <map>

#define SUITE(name) "KafkaPortEndToEndTestSuite - " name

TEST_CASE(SUITE("Produce"))
{
TestSetup();

auto portlib = LoadModule(GetLibFileName("KafkaPort"));
REQUIRE(portlib);
{
newptr newPort = GetPortCreator(portlib, "KafkaProducer");
REQUIRE(newPort);
delptr deletePort = GetPortDestroyer(portlib, "KafkaProducer");
REQUIRE(deletePort);

//TODO: spin up a kafka cluster

//TODO: put something in the config
std::shared_ptr<DataPort> PUT(newPort("PortUnderTest", "", ""), deletePort);
PUT->Build();

ThreadPool thread_pool(1);

PUT->Enable();

//TODO: load a message into the port

//TODO: spin up a consumer to check the message was received

PUT->Disable();
}
//Unload the library
UnLoadModule(portlib);
TestTearDown();
}

60 changes: 60 additions & 0 deletions Code/tests/KafkaPort_tests/Helpers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/* opendatacon
*
* Copyright (c) 2014:
*
* DCrip3fJguWgVCLrZFfA7sIGgvx1Ou3fHfCxnrz4svAi
* yxeOtDhDCXf1Z4ApgXvX5ahqQmzRfJ2DoX8S05SqHA==
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Helpers.h
*
* Created on: 14/07/2024
* Author: Neil Stephens <dearknarl@gmail.com>
*/

#ifndef TESTHELPERS_H
#define TESTHELPERS_H

#include "opendatacon/Platform.h"
#include "opendatacon/util.h"
#include <spdlog/sinks/stdout_color_sinks.h>

extern spdlog::level::level_enum log_level;

inline void TestSetup()
{
auto console_sink = std::make_shared<spdlog::sinks::stdout_color_sink_mt>();
auto pLibLogger = std::make_shared<spdlog::logger>("KafkaPort", console_sink);
pLibLogger->set_level(log_level);
odc::spdlog_register_logger(pLibLogger);

// We need an opendatacon logger to catch config file parsing errors
auto pODCLogger = std::make_shared<spdlog::logger>("opendatacon", console_sink);
pODCLogger->set_level(log_level);
odc::spdlog_register_logger(pODCLogger);

static std::once_flag once_flag;
std::call_once(once_flag,[]()
{
InitLibaryLoading();
});
}
inline void TestTearDown()
{
odc::spdlog_drop_all(); // Close off everything
}

#endif // TESTHELPERS_H

0 comments on commit 760fd4d

Please sign in to comment.