From 4aa47fe74dac2cb36e926bc3a2c0f15a5714eb78 Mon Sep 17 00:00:00 2001
From: Neil Stephens <knarl@ratbert.lan>
Date: Sun, 21 Jul 2024 12:21:21 +1000
Subject: [PATCH] synchronise calls to KafkaClientCache::GetClient()

---
 Code/Ports/KafkaPort/KafkaClientCache.h | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/Code/Ports/KafkaPort/KafkaClientCache.h b/Code/Ports/KafkaPort/KafkaClientCache.h
index 0680e4c1..003c85f2 100644
--- a/Code/Ports/KafkaPort/KafkaClientCache.h
+++ b/Code/Ports/KafkaPort/KafkaClientCache.h
@@ -3,6 +3,7 @@
 #include <memory>
 #include <unordered_map>
 #include <string>
+#include <mutex>
 
 class KafkaClientCache
 {
@@ -42,11 +43,10 @@ class KafkaClientCache
 		}
 	}
 
-public:
-
 	// Clean up any expired clients
 	void Clean()
 	{
+		//we know mutex is locked, because this is only called from GetClient
 		std::set<std::string> to_delete;
 		for(auto& client : clients)
 		{
@@ -60,10 +60,14 @@ class KafkaClientCache
 		}
 	}
 
+public:
+
 	// Factory method to get a client from the cache, or create a new one if it doesn't exist
 	template<typename ClientType>
 	std::shared_ptr<ClientType> GetClient(const std::string& client_key, const kafka::Properties& properties, const size_t MaxPollIntervalms = std::numeric_limits<size_t>::max())
 	{
+		std::lock_guard lock(mtx);
+
 		if(auto client = clients[client_key].lock())
 		{
 			MaxPollTime(client_key, MaxPollIntervalms);
@@ -74,6 +78,7 @@ class KafkaClientCache
 		clients[client_key] = new_client;
 		poll_timers[client_key] = {std::numeric_limits<size_t>::max(),nullptr};
 		MaxPollTime(client_key, MaxPollIntervalms);
+		Clean();
 		return new_client;
 	}
 
@@ -107,6 +112,7 @@ class KafkaClientCache
 	}
 
 private:
+	std::mutex mtx;
 	std::unordered_map<std::string, std::weak_ptr<kafka::clients::KafkaClient>> clients;
 	std::unordered_map<std::string, std::pair<size_t,std::shared_ptr<asio::steady_timer>>> poll_timers;
 };
\ No newline at end of file