Skip to content

Commit

Permalink
Add semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
1technophile committed Nov 8, 2023
1 parent bd3219f commit 5656a5e
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 17 deletions.
3 changes: 3 additions & 0 deletions main/User_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ const byte mac[] = {0xDE, 0xED, 0xBA, 0xFE, 0x54, 0x95}; //W5100 ethernet shield
#ifndef GeneralTimeOut
# define GeneralTimeOut 20 // time out if a task is stuck in seconds (should be more than TimeBetweenReadingRN8209/1000) and more than 3 seconds, the WDT will reset the ESP, used also for MQTT connection
#endif
#ifndef queueTimeout
# define queueTimeout 5000 // time out for the queue in milliseconds
#endif

#if defined(ESP8266) || defined(ESP32)
// Uncomment to use a device running TheengsGateway to decode BLE data. (https://github.com/theengs/gateway)
Expand Down
68 changes: 51 additions & 17 deletions main/main.ino
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ static String ota_server_cert = "";
# include <nvs.h>
# include <nvs_flash.h>

SemaphoreHandle_t jsonQueueMutex = xSemaphoreCreateMutex();
bool ProcessLock = false; // Process lock when we want to use a critical function like OTA for example
# if !defined(NO_INT_TEMP_READING)
// ESP32 internal temperature reading
Expand Down Expand Up @@ -426,18 +427,47 @@ void pubMainCore(JsonObject& data) {
// Add a document to the queue
void enqueueJsonObject(const StaticJsonDocument<JSON_MSG_BUFFER>& jsonDoc) {
if (jsonDoc.size() == 0) {
Log.error(F("Empty JSON, not enqueued" CR));
Log.error(F("Empty JSON, not processed" CR));
return;
}
#ifdef ESP32
unsigned long startMillis = millis();
unsigned long currentMillis;
bool queueFull = true;

while (true) {
if (xSemaphoreTake(jsonQueueMutex, pdMS_TO_TICKS(queueTimeout)) == pdTRUE) {
queueFull = (jsonQueue.size() >= QueueSize);
xSemaphoreGive(jsonQueueMutex);

if (!queueFull) break;

currentMillis = millis();
if (currentMillis - startMillis >= queueTimeout) {
Log.error(F("Wait queueTimeout, queue still full. Exiting." CR));
yield();
return;
}
} else {
Log.error(F("Failed to take jsonQueueMutex" CR));
return;
}
}

Log.trace(F("Enqueue JSON" CR));
#if defined(ESP8266) || defined(ESP32) || defined(__AVR_ATmega2560__) || defined(__AVR_ATmega1280__)
JsonBundle bundle;
bundle.doc = jsonDoc;
jsonQueue.push(bundle);
Log.trace(F("Queue length: %d" CR), jsonQueue.size());

if (xSemaphoreTake(jsonQueueMutex, pdMS_TO_TICKS(queueTimeout)) == pdTRUE) {
jsonQueue.push(bundle);
xSemaphoreGive(jsonQueueMutex);
} else {
Log.error(F("Failed to take jsonQueueMutex" CR));
}
#else
// Pub to main core
JsonObject jsonObj = jsonDoc.to<JsonObject>();
StaticJsonDocument<JSON_MSG_BUFFER> mutableJsonDoc = jsonDoc;
JsonObject jsonObj = mutableJsonDoc.to<JsonObject>();
pubMainCore(jsonObj); // Arduino UNO or other boards with small memory, we don't store and directly publish
#endif
}
Expand Down Expand Up @@ -484,20 +514,24 @@ void buildTopicFromId(JsonObject& Jsondata, const char* origin) {

// Empty the documents queue
void emptyQueue() {
while (true) {
JsonBundle bundle;

if (jsonQueue.empty()) {
break;
}
Log.trace(F("Dequeue JSON" CR));
bundle = jsonQueue.front();
jsonQueue.pop();
//Empty queue
while (jsonQueue.empty()) {
JsonBundle bundle;
if (xSemaphoreTake(jsonQueueMutex, pdMS_TO_TICKS(10)) == pdTRUE) {
Log.trace(F("Dequeue JSON" CR));
bundle = jsonQueue.front();
jsonQueue.pop();
xSemaphoreGive(jsonQueueMutex);
} else {
Log.error(F("Failed to take jsonQueueMutex" CR));
continue;
}

JsonObject obj = bundle.doc.as<JsonObject>();
pubMainCore(obj);
queueLengthSum++;
}
JsonObject obj = bundle.doc.as<JsonObject>();
pubMainCore(obj);
queueLengthSum++;
}
}
#else
void emptyQueue() {}
Expand Down

0 comments on commit 5656a5e

Please sign in to comment.