Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion headers/irismqttclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,22 @@
#include <mosquittopp.h>
#include "headers/topic.h"

struct MessageRecord {
std::string topic;
std::string payload;
};

class IrisMQTTClient : public mosqpp::mosquittopp {
private:
std::vector<Topic>* topics; // Pointer to topics list
pqxx::connection* conn;
std::unordered_map<std::string, Topic> topic_map;
int batch_size;
std::vector<int> id_list;
std::vector<MessageRecord> message_buffer;

public:
IrisMQTTClient(const char *id, const char *host, const int port, const int timeout, std::vector<Topic>* topics_list, pqxx::connection* conn);
IrisMQTTClient(const char *id, const char *host, const int port, const int timeout, std::vector<Topic>* topics_list, const int batchSize, pqxx::connection* conn);

void on_message(const struct mosquitto_message *message) override;

Expand Down
8 changes: 7 additions & 1 deletion iris.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ int main() {
std::string postgresqluser = load_string_value_from_config("./config.json", "postgresqluser");
std::string postgresqlpassword = load_string_value_from_config("./config.json", "postgresqlpassword");

// Load the batch size
int batchSize = load_int_value_from_config("./config.json", "batchsize");

// Ensure it's at least 1
batchSize = std::max(batchSize, 1);

// The MQTT settings are required
if (mqtthost == "" || mqttport == 0 || mqtttimeout == 0) {
return 1;
Expand Down Expand Up @@ -162,7 +168,7 @@ int main() {
// Initialize MQTT
mosqpp::lib_init();
std::string unique_client_id = generate_unique_mqtt_client_id();
IrisMQTTClient client(unique_client_id.c_str(), mqtthost.c_str(), mqttport, mqtttimeout, &topics, conn);
IrisMQTTClient client(unique_client_id.c_str(), mqtthost.c_str(), mqttport, mqtttimeout, &topics, batchSize, conn);

// Subscribe to each topic from the config file
for (const auto &topic : topics) {
Expand Down
40 changes: 36 additions & 4 deletions src/irismqttclient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ void insert_message(pqxx::connection* conn, const std::string& topic, const std:
}


IrisMQTTClient::IrisMQTTClient(const char *id, const char *host, int port, int timeout, std::vector<Topic> *topics_list, pqxx::connection* dbconn)
: mosquittopp(id), topics(topics_list), conn(dbconn) {
IrisMQTTClient::IrisMQTTClient(const char *id, const char *host, int port, int timeout, std::vector<Topic> *topics_list, int batchSize, pqxx::connection* dbconn)
: mosquittopp(id), topics(topics_list), batch_size(batchSize), conn(dbconn) {
connect(host, port, timeout);

std::cout << "Connected to queue" << std::endl;
Expand All @@ -117,7 +117,7 @@ void IrisMQTTClient::on_message(const struct mosquitto_message *message) {
try {
aes_key = topic_map.at(message_topic).aes_key; // ✅ Throws if key is missing
} catch (const std::out_of_range&) {
std::cerr << "Error: topic not found in topic_map\n";
// We don't actually have to do anything here
}

if (aes_key != "") {
Expand All @@ -126,5 +126,37 @@ void IrisMQTTClient::on_message(const struct mosquitto_message *message) {

std::cout << message_topic << ": " << message_payload << std::endl;

insert_message(conn, message_topic, message_payload);
MessageRecord rec;
rec.topic = message_topic;
rec.payload = message_payload;

if (batch_size == 1) {
insert_message(conn, message_topic, message_payload);
}
else {
message_buffer.push_back(std::move(rec));

if ((int)message_buffer.size() >= batch_size) {
// Bulk insert all of them in one transaction:
pqxx::work txn(*conn);
{
// Build an INSERT … VALUES (…) query with multiple rows.
std::ostringstream sql;
sql << "INSERT INTO messages (topic, payload) VALUES ";
for (size_t i = 0; i < message_buffer.size(); ++i) {
auto &r = message_buffer[i];
sql << "("
<< txn.quote(r.topic) << ", "
<< txn.quote(r.payload)
<< ")";
if (i + 1 < message_buffer.size()) sql << ",";
}
txn.exec0(sql.str());
}
txn.commit();

// Clear the buffer for the next batch:
message_buffer.clear();
}
}
}