diff --git a/headers/irismqttclient.h b/headers/irismqttclient.h index 49a1afe..2bfa5ef 100644 --- a/headers/irismqttclient.h +++ b/headers/irismqttclient.h @@ -8,14 +8,22 @@ #include #include "headers/topic.h" +struct MessageRecord { + std::string topic; + std::string payload; +}; + class IrisMQTTClient : public mosqpp::mosquittopp { private: std::vector* topics; // Pointer to topics list pqxx::connection* conn; std::unordered_map topic_map; + int batch_size; + std::vector id_list; + std::vector message_buffer; public: - IrisMQTTClient(const char *id, const char *host, const int port, const int timeout, std::vector* topics_list, pqxx::connection* conn); + IrisMQTTClient(const char *id, const char *host, const int port, const int timeout, std::vector* topics_list, const int batchSize, pqxx::connection* conn); void on_message(const struct mosquitto_message *message) override; diff --git a/iris.cpp b/iris.cpp index a06e241..85b6e80 100644 --- a/iris.cpp +++ b/iris.cpp @@ -118,6 +118,12 @@ int main() { std::string postgresqluser = load_string_value_from_config("./config.json", "postgresqluser"); std::string postgresqlpassword = load_string_value_from_config("./config.json", "postgresqlpassword"); + // Load the batch size + int batchSize = load_int_value_from_config("./config.json", "batchsize"); + + // Ensure it's at least 1 + batchSize = std::max(batchSize, 1); + // The MQTT settings are required if (mqtthost == "" || mqttport == 0 || mqtttimeout == 0) { return 1; @@ -162,7 +168,7 @@ int main() { // Initialize MQTT mosqpp::lib_init(); std::string unique_client_id = generate_unique_mqtt_client_id(); - IrisMQTTClient client(unique_client_id.c_str(), mqtthost.c_str(), mqttport, mqtttimeout, &topics, conn); + IrisMQTTClient client(unique_client_id.c_str(), mqtthost.c_str(), mqttport, mqtttimeout, &topics, batchSize, conn); // Subscribe to each topic from the config file for (const auto &topic : topics) { diff --git a/src/irismqttclient.cpp b/src/irismqttclient.cpp index 092d342..0d1cbf7 100644 --- a/src/irismqttclient.cpp +++ b/src/irismqttclient.cpp @@ -96,8 +96,8 @@ void insert_message(pqxx::connection* conn, const std::string& topic, const std: } -IrisMQTTClient::IrisMQTTClient(const char *id, const char *host, int port, int timeout, std::vector *topics_list, pqxx::connection* dbconn) - : mosquittopp(id), topics(topics_list), conn(dbconn) { +IrisMQTTClient::IrisMQTTClient(const char *id, const char *host, int port, int timeout, std::vector *topics_list, int batchSize, pqxx::connection* dbconn) + : mosquittopp(id), topics(topics_list), batch_size(batchSize), conn(dbconn) { connect(host, port, timeout); std::cout << "Connected to queue" << std::endl; @@ -117,7 +117,7 @@ void IrisMQTTClient::on_message(const struct mosquitto_message *message) { try { aes_key = topic_map.at(message_topic).aes_key; // ✅ Throws if key is missing } catch (const std::out_of_range&) { - std::cerr << "Error: topic not found in topic_map\n"; + // We don't actually have to do anything here } if (aes_key != "") { @@ -126,5 +126,37 @@ void IrisMQTTClient::on_message(const struct mosquitto_message *message) { std::cout << message_topic << ": " << message_payload << std::endl; - insert_message(conn, message_topic, message_payload); + MessageRecord rec; + rec.topic = message_topic; + rec.payload = message_payload; + + if (batch_size == 1) { + insert_message(conn, message_topic, message_payload); + } + else { + message_buffer.push_back(std::move(rec)); + + if ((int)message_buffer.size() >= batch_size) { + // Bulk insert all of them in one transaction: + pqxx::work txn(*conn); + { + // Build an INSERT … VALUES (…) query with multiple rows. + std::ostringstream sql; + sql << "INSERT INTO messages (topic, payload) VALUES "; + for (size_t i = 0; i < message_buffer.size(); ++i) { + auto &r = message_buffer[i]; + sql << "(" + << txn.quote(r.topic) << ", " + << txn.quote(r.payload) + << ")"; + if (i + 1 < message_buffer.size()) sql << ","; + } + txn.exec0(sql.str()); + } + txn.commit(); + + // Clear the buffer for the next batch: + message_buffer.clear(); + } + } } \ No newline at end of file