Skip to content

Commit b49f82a

Browse files
author
fvanroie
committed
mqtt_enqueue_message
1 parent 75af4ae commit b49f82a

File tree

1 file changed

+39
-31
lines changed

1 file changed

+39
-31
lines changed

src/mqtt/hasp_mqtt_esp.cpp

Lines changed: 39 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ int mqttQos = 0;
6464
esp_mqtt_client_handle_t mqttClient;
6565
static esp_mqtt_client_config_t mqtt_cfg;
6666

67-
extern const uint8_t rootca_crt_bundle_start[] asm("_binary_data_cert_x509_crt_bundle_bin_start");
68-
extern const uint8_t rootca_crt_bundle_end[] asm("_binary_data_cert_x509_crt_bundle_bin_end");
67+
// extern const uint8_t rootca_crt_bundle_start[] asm("_binary_data_cert_x509_crt_bundle_bin_start");
68+
// extern const uint8_t rootca_crt_bundle_end[] asm("_binary_data_cert_x509_crt_bundle_bin_end");
6969

7070
bool last_mqtt_state = false;
7171
bool current_mqtt_state = false;
@@ -104,7 +104,7 @@ void mqtt_run_scripts()
104104
void mqtt_disconnected()
105105
{
106106
current_mqtt_state = false; // now we are disconnected
107-
mqtt_run_scripts();
107+
// mqtt_run_scripts();
108108
mqtt_reconnect_counter++;
109109
}
110110

@@ -115,7 +115,7 @@ void mqtt_connected()
115115
current_mqtt_state = true; // now we are connected
116116
LOG_VERBOSE(TAG_MQTT, F("%s"), current_mqtt_state ? PSTR(D_SERVICE_CONNECTED) : PSTR(D_SERVICE_DISCONNECTED));
117117
}
118-
mqtt_run_scripts();
118+
// mqtt_run_scripts();
119119
}
120120

121121
int mqttPublish(const char* topic, const char* payload, size_t len, bool retain)
@@ -190,6 +190,36 @@ static inline size_t mqtt_msg_length(size_t len)
190190
return (len / 64) * 64 + 64;
191191
}
192192

193+
void mqtt_enqueue_message(const char* topic, const char* payload, size_t payload_len)
194+
{
195+
// Add new message to the queue
196+
mqtt_message_t data;
197+
198+
size_t topic_len = strlen(topic);
199+
data.topic = (char*)hasp_calloc(sizeof(char), mqtt_msg_length(topic_len + 1));
200+
data.payload = (char*)hasp_calloc(sizeof(char), mqtt_msg_length(payload_len + 1));
201+
202+
if(!data.topic || !data.payload) {
203+
LOG_ERROR(TAG_MQTT_RCV, D_ERROR_OUT_OF_MEMORY);
204+
hasp_free(data.topic);
205+
hasp_free(data.payload);
206+
return;
207+
}
208+
memcpy(data.topic, topic, topic_len);
209+
memcpy(data.payload, payload, payload_len);
210+
211+
{
212+
size_t attempt = 0;
213+
while(xQueueSend(queue, &data, (TickType_t)0) == errQUEUE_FULL && attempt < 100) {
214+
delay(5);
215+
attempt++;
216+
};
217+
if(attempt >= 100) {
218+
LOG_ERROR(TAG_MQTT_RCV, D_ERROR_OUT_OF_MEMORY);
219+
}
220+
}
221+
}
222+
193223
void mqtt_process_topic_payload(const char* topic, const char* payload, unsigned int length)
194224
{
195225
if(gui_acquire(pdMS_TO_TICKS(30))) {
@@ -198,30 +228,7 @@ void mqtt_process_topic_payload(const char* topic, const char* payload, unsigned
198228
dispatch_topic_payload(topic, payload, length > 0, TAG_MQTT);
199229
gui_release();
200230
} else {
201-
// Add new message to the queue
202-
mqtt_message_t data;
203-
204-
size_t topic_len = strlen(topic);
205-
size_t payload_len = length;
206-
data.topic = (char*)hasp_calloc(sizeof(char), mqtt_msg_length(topic_len + 1));
207-
data.payload = (char*)hasp_calloc(sizeof(char), mqtt_msg_length(payload_len + 1));
208-
209-
if(!data.topic || !data.payload) {
210-
LOG_ERROR(TAG_MQTT_RCV, D_ERROR_OUT_OF_MEMORY);
211-
hasp_free(data.topic);
212-
hasp_free(data.payload);
213-
return;
214-
}
215-
memcpy(data.topic, topic, topic_len);
216-
memcpy(data.payload, payload, payload_len);
217-
218-
{
219-
size_t attempt = 0;
220-
while(xQueueSend(queue, &data, (TickType_t)0) == errQUEUE_FULL && attempt < 100) {
221-
delay(5);
222-
attempt++;
223-
};
224-
}
231+
mqtt_enqueue_message(topic, payload, length);
225232
}
226233
}
227234

@@ -353,7 +360,7 @@ void onMqttConnect(esp_mqtt_client_handle_t client)
353360
String subtopic = F(MQTT_TOPIC_CUSTOM "/#");
354361
mqttSubscribeTo(mqttGroupCommandTopic + subtopic);
355362
mqttSubscribeTo(mqttNodeCommandTopic + subtopic);
356-
#endif
363+
#endif
357364

358365
/* Home Assistant auto-configuration */
359366
#ifdef HASP_USE_HA
@@ -465,8 +472,8 @@ static esp_err_t mqtt_event_handler(esp_mqtt_event_handle_t event)
465472
void mqttSetup()
466473
{
467474
queue = xQueueCreate(64, sizeof(mqtt_message_t));
468-
//esp_crt_bundle_set(rootca_crt_bundle_start, rootca_crt_bundle_end-rootca_crt_bundle_start);
469-
arduino_esp_crt_bundle_set(rootca_crt_bundle_start);
475+
// esp_crt_bundle_set(rootca_crt_bundle_start, rootca_crt_bundle_end-rootca_crt_bundle_start);
476+
// arduino_esp_crt_bundle_set(rootca_crt_bundle_start);
470477
mqttStart();
471478
}
472479

@@ -489,6 +496,7 @@ IRAM_ATTR void mqttLoop(void)
489496

490497
void mqttEvery5Seconds(bool networkIsConnected)
491498
{
499+
mqtt_run_scripts();
492500
// if(mqttEnabled && networkIsConnected && !mqttClientConnected) {
493501
// LOG_TRACE(TAG_MQTT, F(D_MQTT_RECONNECTING));
494502
// mqttStart();

0 commit comments

Comments
 (0)