diff --git a/Code/Ports/KafkaPort/KafkaPort.cpp b/Code/Ports/KafkaPort/KafkaPort.cpp index ebcf8466..38473c9b 100644 --- a/Code/Ports/KafkaPort/KafkaPort.cpp +++ b/Code/Ports/KafkaPort/KafkaPort.cpp @@ -46,6 +46,8 @@ void KafkaPort::Disable() void KafkaPort::ProcessElements(const Json::Value& JSONRoot) { + if(!JSONRoot.isObject()) return; + auto pConf = static_cast(this->pConf.get()); //JSON member NativeKafkaProperties has all the kafka property strings that are directly passed to the kafka stack if(JSONRoot.isMember("NativeKafkaProperties")) diff --git a/Code/Ports/KafkaPort/KafkaProducerPort.cpp b/Code/Ports/KafkaPort/KafkaProducerPort.cpp index 2e545a42..a713ca4f 100644 --- a/Code/Ports/KafkaPort/KafkaProducerPort.cpp +++ b/Code/Ports/KafkaPort/KafkaProducerPort.cpp @@ -27,24 +27,51 @@ #include "KafkaProducerPort.h" #include "KafkaPortConf.h" #include +#include void KafkaProducerPort::Build() { //create a kafka producer auto pConf = static_cast(this->pConf.get()); + if(!pConf->NativeKafkaProperties.contains("bootstrap.servers")) + { + pConf->NativeKafkaProperties.put("bootstrap.servers", "localhost:9092"); + if(auto log = spdlog::get("KafkaPort")) + log->error("bootstrap.servers property not found, defaulting to localhost:9092"); + } + if(pConf->NativeKafkaProperties.getProperty("enable.manual.events.poll") == "false") if(auto log = spdlog::get("KafkaPort")) log->warn("enable.manual.events.poll property is set to false, forcing to true"); pConf->NativeKafkaProperties.put("enable.manual.events.poll", "true"); + pConf->NativeKafkaProperties.put("error_cb", [this](const kafka::Error& error) + { + if(auto log = spdlog::get("KafkaPort")) + log->error("{}: {}",Name,error.toString()); + }); + + pConf->NativeKafkaProperties.put("log_cb", [this](int level, const char* filename, int lineno, const char* msg) + { + auto spdlog_lvl = spdlog::level::level_enum(6-level); + if(auto log = spdlog::get("KafkaPort")) + log->log(spdlog_lvl,"{} ({}:{}): {}",Name,filename,lineno,msg); + }); + + pConf->NativeKafkaProperties.put("stats_cb", [this](const std::string& jsonString) + { + if(auto log = spdlog::get("KafkaPort")) + log->info("{}: Statistics: {}",Name,jsonString); + }); + //TODO: consider also forcing enable.idempotence=true depending on the retry model // see https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#idempotent-producer //TODO: consider also setting the acks property to "all" depending on the retry model //TODO: use a factory function to create the producer or return an existing one // depending on the configuration, ports could share a producer or have their own - pKafkaProducer = std::make_shared(pConf->NativeKafkaProperties); // <--- TODO: check if this can throw + pKafkaProducer = std::make_shared(pConf->NativeKafkaProperties); // <--- FIXME: this can throw - catch it, and at least log it //TODO: set up a polling loop using asio to call pKafkaProducer->pollEvents() at a regular interval // to ensure we get callbacks in the case there's no following Event diff --git a/Code/tests/CMakeLists.txt b/Code/tests/CMakeLists.txt index be152105..9fbc5e53 100644 --- a/Code/tests/CMakeLists.txt +++ b/Code/tests/CMakeLists.txt @@ -51,6 +51,10 @@ if(LUAPORT) add_subdirectory(LuaPort_tests) endif() +if(KAFKAPORT) +add_subdirectory(KafkaPort_tests) +endif() + if(FULL) add_subdirectory(Integration_tests) endif() diff --git a/Code/tests/KafkaPort_tests/Basic.cpp b/Code/tests/KafkaPort_tests/Basic.cpp new file mode 100644 index 00000000..5ca651dd --- /dev/null +++ b/Code/tests/KafkaPort_tests/Basic.cpp @@ -0,0 +1,87 @@ +/* opendatacon + * + * Copyright (c) 2014: + * + * DCrip3fJguWgVCLrZFfA7sIGgvx1Ou3fHfCxnrz4svAi + * yxeOtDhDCXf1Z4ApgXvX5ahqQmzRfJ2DoX8S05SqHA== + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Created on: 14/07/2024 + * Author: Neil Stephens + */ +#include "Helpers.h" +#include "../PortLoader.h" +#include "../ThreadPool.h" +#include +#include +#include + +#define SUITE(name) "KafkaPortBasicsTestSuite - " name + +TEST_CASE(SUITE("ConstructBuildEnableDisableDestroy")) +{ + TestSetup(); + auto portlib = LoadModule(GetLibFileName("KafkaPort")); + REQUIRE(portlib); + + //Go through the typical life cycle of a port, albeit short + { + newptr newPort = GetPortCreator(portlib, "KafkaProducer"); + REQUIRE(newPort); + delptr deletePort = GetPortDestroyer(portlib, "KafkaProducer"); + REQUIRE(deletePort); + + //TODO: put something in the config + std::shared_ptr PUT(newPort("PortUnderTest", "", ""), deletePort); + + PUT->Build(); + + ThreadPool thread_pool(1); + + PUT->Enable(); + PUT->Disable(); + } + + TestTearDown(); + UnLoadModule(portlib); +} + +TEST_CASE(SUITE("ConstructBuildEnableDestroy")) +{ + TestSetup(); + auto portlib = LoadModule(GetLibFileName("KafkaPort")); + REQUIRE(portlib); + + //Test the destruction of an enabled port (happens in case of exception) + { + newptr newPort = GetPortCreator(portlib, "KafkaProducer"); + REQUIRE(newPort); + delptr deletePort = GetPortDestroyer(portlib, "KafkaProducer"); + REQUIRE(deletePort); + + //TODO: put something in the config + std::shared_ptr PUT(newPort("PortUnderTest", "", ""), deletePort); + + PUT->Build(); + + ThreadPool thread_pool(1); + + PUT->Enable(); + PUT.reset(); + } + + TestTearDown(); + UnLoadModule(portlib); +} diff --git a/Code/tests/KafkaPort_tests/CMakeLists.txt b/Code/tests/KafkaPort_tests/CMakeLists.txt new file mode 100644 index 00000000..8106555e --- /dev/null +++ b/Code/tests/KafkaPort_tests/CMakeLists.txt @@ -0,0 +1,29 @@ +# opendatacon + # + # Copyright (c) 2014: + # + # DCrip3fJguWgVCLrZFfA7sIGgvx1Ou3fHfCxnrz4svAi + # yxeOtDhDCXf1Z4ApgXvX5ahqQmzRfJ2DoX8S05SqHA== + # + # Licensed under the Apache License, Version 2.0 (the "License"); + # you may not use this file except in compliance with the License. + # You may obtain a copy of the License at + # + # http://www.apache.org/licenses/LICENSE-2.0 + # + # Unless required by applicable law or agreed to in writing, software + # distributed under the License is distributed on an "AS IS" BASIS, + # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + # See the License for the specific language governing permissions and + # limitations under the License. + # +project(KafkaPort_tests) + +file(GLOB ${PROJECT_NAME}_SRC *.cpp *.h ../PortLoader.cpp ../PortLoader.h) + +add_executable(${PROJECT_NAME} ${${PROJECT_NAME}_SRC}) +target_link_libraries(${PROJECT_NAME} ODC ${DL}) + +install(TARGETS ${PROJECT_NAME} RUNTIME DESTINATION ${INSTALLDIR_BINS}) +set_target_properties(${PROJECT_NAME} PROPERTIES FOLDER tests) +add_test(NAME CTEST_${PROJECT_NAME} COMMAND ${PROJECT_NAME}) diff --git a/Code/tests/KafkaPort_tests/CatchStart.cpp b/Code/tests/KafkaPort_tests/CatchStart.cpp new file mode 100644 index 00000000..8f88cf9a --- /dev/null +++ b/Code/tests/KafkaPort_tests/CatchStart.cpp @@ -0,0 +1,52 @@ +/* opendatacon + * + * Copyright (c) 2014: + * + * DCrip3fJguWgVCLrZFfA7sIGgvx1Ou3fHfCxnrz4svAi + * yxeOtDhDCXf1Z4ApgXvX5ahqQmzRfJ2DoX8S05SqHA== + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + */ +#define CATCH_CONFIG_RUNNER + +#include "Helpers.h" +#include +#include + +spdlog::level::level_enum log_level = spdlog::level::off; + +int main( int argc, char* argv[] ) +{ + int new_argc = argc; + char** new_argv = argv; + if (argc > 1) + { + std::string level_str = argv[1]; + log_level = spdlog::level::from_str(level_str); + if (log_level == spdlog::level::off && level_str != "off") + { + std::cout << "KafkaPort tests: optional log level as first arg. Choose from:" << std::endl; + for (uint8_t i = 0; i < 7; i++) + std::cout << spdlog::level::level_string_views[i].data() << std::endl; + } + else + { + new_argc = argc - 1; + new_argv = argv + 1; + } + } + + return Catch::Session().run(new_argc, new_argv); +} diff --git a/Code/tests/KafkaPort_tests/EndToEnd.cpp b/Code/tests/KafkaPort_tests/EndToEnd.cpp new file mode 100644 index 00000000..adc2122d --- /dev/null +++ b/Code/tests/KafkaPort_tests/EndToEnd.cpp @@ -0,0 +1,64 @@ +/* opendatacon + * + * Copyright (c) 2014: + * + * DCrip3fJguWgVCLrZFfA7sIGgvx1Ou3fHfCxnrz4svAi + * yxeOtDhDCXf1Z4ApgXvX5ahqQmzRfJ2DoX8S05SqHA== + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + */ +#include "Helpers.h" +#include "../PortLoader.h" +#include "../ThreadPool.h" +#include +#include +#include +#include + +#define SUITE(name) "KafkaPortEndToEndTestSuite - " name + +TEST_CASE(SUITE("Produce")) +{ + TestSetup(); + + auto portlib = LoadModule(GetLibFileName("KafkaPort")); + REQUIRE(portlib); + { + newptr newPort = GetPortCreator(portlib, "KafkaProducer"); + REQUIRE(newPort); + delptr deletePort = GetPortDestroyer(portlib, "KafkaProducer"); + REQUIRE(deletePort); + + //TODO: spin up a kafka cluster + + //TODO: put something in the config + std::shared_ptr PUT(newPort("PortUnderTest", "", ""), deletePort); + PUT->Build(); + + ThreadPool thread_pool(1); + + PUT->Enable(); + + //TODO: load a message into the port + + //TODO: spin up a consumer to check the message was received + + PUT->Disable(); + } + //Unload the library + UnLoadModule(portlib); + TestTearDown(); +} + diff --git a/Code/tests/KafkaPort_tests/Helpers.h b/Code/tests/KafkaPort_tests/Helpers.h new file mode 100644 index 00000000..bb82baee --- /dev/null +++ b/Code/tests/KafkaPort_tests/Helpers.h @@ -0,0 +1,60 @@ +/* opendatacon + * + * Copyright (c) 2014: + * + * DCrip3fJguWgVCLrZFfA7sIGgvx1Ou3fHfCxnrz4svAi + * yxeOtDhDCXf1Z4ApgXvX5ahqQmzRfJ2DoX8S05SqHA== + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/* + * Helpers.h + * + * Created on: 14/07/2024 + * Author: Neil Stephens + */ + +#ifndef TESTHELPERS_H +#define TESTHELPERS_H + +#include "opendatacon/Platform.h" +#include "opendatacon/util.h" +#include + +extern spdlog::level::level_enum log_level; + +inline void TestSetup() +{ + auto console_sink = std::make_shared(); + auto pLibLogger = std::make_shared("KafkaPort", console_sink); + pLibLogger->set_level(log_level); + odc::spdlog_register_logger(pLibLogger); + + // We need an opendatacon logger to catch config file parsing errors + auto pODCLogger = std::make_shared("opendatacon", console_sink); + pODCLogger->set_level(log_level); + odc::spdlog_register_logger(pODCLogger); + + static std::once_flag once_flag; + std::call_once(once_flag,[]() + { + InitLibaryLoading(); + }); +} +inline void TestTearDown() +{ + odc::spdlog_drop_all(); // Close off everything +} + +#endif // TESTHELPERS_H +