From 5656a5eb8ec6f7c9272be5b0d443be8ca6d1c0b6 Mon Sep 17 00:00:00 2001 From: Florian <1technophile@users.noreply.github.com> Date: Wed, 8 Nov 2023 08:51:57 -0600 Subject: [PATCH] Add semaphore --- main/User_config.h | 3 ++ main/main.ino | 68 ++++++++++++++++++++++++++++++++++------------ 2 files changed, 54 insertions(+), 17 deletions(-) diff --git a/main/User_config.h b/main/User_config.h index 744030842e..5802a72219 100644 --- a/main/User_config.h +++ b/main/User_config.h @@ -180,6 +180,9 @@ const byte mac[] = {0xDE, 0xED, 0xBA, 0xFE, 0x54, 0x95}; //W5100 ethernet shield #ifndef GeneralTimeOut # define GeneralTimeOut 20 // time out if a task is stuck in seconds (should be more than TimeBetweenReadingRN8209/1000) and more than 3 seconds, the WDT will reset the ESP, used also for MQTT connection #endif +#ifndef queueTimeout +# define queueTimeout 5000 // time out for the queue in milliseconds +#endif #if defined(ESP8266) || defined(ESP32) // Uncomment to use a device running TheengsGateway to decode BLE data. (https://github.com/theengs/gateway) diff --git a/main/main.ino b/main/main.ino index 19814bc0fe..024040f7af 100644 --- a/main/main.ino +++ b/main/main.ino @@ -264,6 +264,7 @@ static String ota_server_cert = ""; # include # include +SemaphoreHandle_t jsonQueueMutex = xSemaphoreCreateMutex(); bool ProcessLock = false; // Process lock when we want to use a critical function like OTA for example # if !defined(NO_INT_TEMP_READING) // ESP32 internal temperature reading @@ -426,18 +427,47 @@ void pubMainCore(JsonObject& data) { // Add a document to the queue void enqueueJsonObject(const StaticJsonDocument& jsonDoc) { if (jsonDoc.size() == 0) { - Log.error(F("Empty JSON, not enqueued" CR)); + Log.error(F("Empty JSON, not processed" CR)); return; } +#ifdef ESP32 + unsigned long startMillis = millis(); + unsigned long currentMillis; + bool queueFull = true; + + while (true) { + if (xSemaphoreTake(jsonQueueMutex, pdMS_TO_TICKS(queueTimeout)) == pdTRUE) { + queueFull = (jsonQueue.size() >= QueueSize); + xSemaphoreGive(jsonQueueMutex); + + if (!queueFull) break; + + currentMillis = millis(); + if (currentMillis - startMillis >= queueTimeout) { + Log.error(F("Wait queueTimeout, queue still full. Exiting." CR)); + yield(); + return; + } + } else { + Log.error(F("Failed to take jsonQueueMutex" CR)); + return; + } + } + Log.trace(F("Enqueue JSON" CR)); -#if defined(ESP8266) || defined(ESP32) || defined(__AVR_ATmega2560__) || defined(__AVR_ATmega1280__) JsonBundle bundle; bundle.doc = jsonDoc; - jsonQueue.push(bundle); - Log.trace(F("Queue length: %d" CR), jsonQueue.size()); + + if (xSemaphoreTake(jsonQueueMutex, pdMS_TO_TICKS(queueTimeout)) == pdTRUE) { + jsonQueue.push(bundle); + xSemaphoreGive(jsonQueueMutex); + } else { + Log.error(F("Failed to take jsonQueueMutex" CR)); + } #else // Pub to main core - JsonObject jsonObj = jsonDoc.to(); + StaticJsonDocument mutableJsonDoc = jsonDoc; + JsonObject jsonObj = mutableJsonDoc.to(); pubMainCore(jsonObj); // Arduino UNO or other boards with small memory, we don't store and directly publish #endif } @@ -484,20 +514,24 @@ void buildTopicFromId(JsonObject& Jsondata, const char* origin) { // Empty the documents queue void emptyQueue() { - while (true) { - JsonBundle bundle; - if (jsonQueue.empty()) { - break; - } - Log.trace(F("Dequeue JSON" CR)); - bundle = jsonQueue.front(); - jsonQueue.pop(); + //Empty queue + while (jsonQueue.empty()) { + JsonBundle bundle; + if (xSemaphoreTake(jsonQueueMutex, pdMS_TO_TICKS(10)) == pdTRUE) { + Log.trace(F("Dequeue JSON" CR)); + bundle = jsonQueue.front(); + jsonQueue.pop(); + xSemaphoreGive(jsonQueueMutex); + } else { + Log.error(F("Failed to take jsonQueueMutex" CR)); + continue; + } - JsonObject obj = bundle.doc.as(); - pubMainCore(obj); - queueLengthSum++; - } + JsonObject obj = bundle.doc.as(); + pubMainCore(obj); + queueLengthSum++; + } } #else void emptyQueue() {}