diff --git a/CMakeLists.txt b/CMakeLists.txt index 18e4f22..e4b4db7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -3,6 +3,7 @@ cmake_minimum_required(VERSION 3.0) set(SRCS "src/rbdns.cpp" "src/rbjson.cpp" + "src/rbprotocol.cpp" "src/rbprotocoludp.cpp" "src/rbprotocolws.cpp" "src/rbtcp.cpp" diff --git a/library.json b/library.json index e89909f..ce9ad03 100644 --- a/library.json +++ b/library.json @@ -14,7 +14,7 @@ "maintainer": true } ], - "version": "11.3.1", + "version": "12.0.0", "frameworks": ["espidf", "arduino"], "platforms": "espressif32" } diff --git a/src/rbdns.cpp b/src/rbdns.cpp index cd3bdee..1f000e0 100644 --- a/src/rbdns.cpp +++ b/src/rbdns.cpp @@ -105,11 +105,18 @@ ssize_t DnsServer::receivePacket(std::vector& buff, struct sockaddr_in ssize_t msg_size; while(true) { - msg_size = recvfrom(m_socket, (void*)buff.data(), buff.size(), MSG_PEEK, NULL, NULL); + msg_size = recvfrom(m_socket, (void*)buff.data(), buff.size(), MSG_PEEK | MSG_DONTWAIT, NULL, NULL); if (msg_size < 0) { const auto err = errno; - if (err == EBADF) + if (err == EAGAIN) { + vTaskDelay(pdMS_TO_TICKS(30)); + continue; + } + + if (err == EBADF) { return -EBADF; + } + ESP_LOGE(TAG, "error in recvfrom: %d %s!", err, strerror(err)); return -1; } @@ -169,7 +176,7 @@ uint8_t *DnsServer::parseDnsName(uint8_t *src_data, size_t maxlen, std::string& ssize_t DnsServer::processDnsQuestion(std::vector& buff, ssize_t req_size) { dns_header_t *header = (dns_header_t *)buff.data(); - ESP_LOGE(TAG, "DNS query with header id: 0x%X, flags: 0x%X, qd_count: %d", + ESP_LOGD(TAG, "DNS query with header id: 0x%X, flags: 0x%X, qd_count: %d", ntohs(header->id), ntohs(header->flags), ntohs(header->qd_count)); // Not a standard query @@ -213,7 +220,7 @@ ssize_t DnsServer::processDnsQuestion(std::vector& buff, ssize_t req_si uint16_t qd_type = ntohs(question->type); uint16_t qd_class = ntohs(question->clazz); - ESP_LOGE(TAG, "Received type: %d | Class: %d | Question for: %s", qd_type, qd_class, hostname.c_str()); + ESP_LOGD(TAG, "Received type: %d | Class: %d | Question for: %s", qd_type, qd_class, hostname.c_str()); if (qd_type != QD_TYPE_A) { cur_question_ptr = name_end_ptr + sizeof(dns_question_t); @@ -227,7 +234,7 @@ ssize_t DnsServer::processDnsQuestion(std::vector& buff, ssize_t req_si answer->clazz = htons(qd_class); answer->ttl = htonl(ANS_TTL_SEC); - ESP_LOGE(TAG, "Answer with PTR offset: 0x%" PRIX16 " and IP 0x%" PRIX32, ntohs(answer->ptr_offset), cur_esp_ip); + ESP_LOGD(TAG, "Answer with PTR offset: 0x%" PRIX16 " and IP 0x%" PRIX32, ntohs(answer->ptr_offset), cur_esp_ip); /* TODO: add support for custom DNS entries, now it always returns ESP's IP. diff --git a/src/rbprotocol.cpp b/src/rbprotocol.cpp new file mode 100644 index 0000000..7981056 --- /dev/null +++ b/src/rbprotocol.cpp @@ -0,0 +1,453 @@ +#include + +#include "rbprotocol.h" +#include "rbprotocoludp.h" +#include "rbprotocolws.h" + +#define RBPROT_TAG "RbProtocol" + +#define MS_TO_TICKS(ms) ((portTICK_PERIOD_MS <= ms) ? (ms / portTICK_PERIOD_MS) : 1) + +#define MUST_ARRIVE_TIMER_PERIOD MS_TO_TICKS(100) +#define MUST_ARRIVE_ATTEMPTS 15 + +namespace rb { + +using namespace rb::internal; + +Protocol::Protocol(const char* owner, const char* name, const char* description, Protocol::callback_t callback) { + m_owner = owner; + m_name = name; + m_desc = description; + m_callback = callback; + + m_sendQueue = xQueueCreate(32, sizeof(QueueItem)); + + m_read_counter = 0; + m_write_counter = 0; + + m_mustarrive_e = 0; + m_mustarrive_f = 0xFFFFFFFF; + + m_task_send = nullptr; + m_task_recv = nullptr; + + m_udp = nullptr; + m_ws = nullptr; + + memset(&m_possessed_addr, 0, sizeof(m_possessed_addr)); +} + +Protocol::~Protocol() { + stop(); + vQueueDelete(m_sendQueue); +} + +esp_err_t Protocol::start(const ProtocolConfig& cfg) { + std::lock_guard lock(m_mutex); + if (m_task_send != nullptr) { + return ESP_ERR_INVALID_STATE; + } + + if(!cfg.enable_udp && !cfg.enable_ws) { + ESP_LOGE(RBPROT_TAG, "One of enable_udp, enable_ws must be true!"); + return ESP_ERR_INVALID_ARG; + } + + std::unique_ptr udp; + std::unique_ptr ws; + esp_err_t err; + + if(cfg.enable_udp) { + udp.reset(new ProtBackendUdp()); + err = udp->start(cfg.udp_port); + if(err != ESP_OK) { + return err; + } + } + + if(cfg.enable_ws) { + ws.reset(new ProtBackendWs()); + err = ws->start(cfg.ws_register_with_webserver); + if(err != ESP_OK) { + return err; + } + } + + m_udp = udp.release(); + m_ws = ws.release(); + + xTaskCreate(&Protocol::send_task, "rbctrl_send", 2560, this, 9, &m_task_send); + xTaskCreate(&Protocol::recv_task, "rbctrl_recv", 4096, this, 10, &m_task_recv); + return ESP_OK; +} + +void Protocol::stop() { + std::lock_guard lock(m_mutex); + if (m_task_send == nullptr) { + return; + } + + QueueItem it = { }; + xQueueSendToFront(m_sendQueue, &it, portMAX_DELAY); + xTaskNotify(m_task_recv, 0, eNoAction); + + delete m_udp; + delete m_ws; + m_udp = nullptr; + m_ws = nullptr; + + m_task_send = nullptr; + m_task_recv = nullptr; +} + +bool Protocol::is_addr_empty(const ProtocolAddr& addr) const { + return addr.kind == ProtBackendType::PROT_NONE; +} + +bool Protocol::is_addr_same(const ProtocolAddr& a, const internal::ProtocolAddr& b) const { + return memcmp(&a, &b, sizeof(ProtocolAddr)) == 0; +} + +bool Protocol::get_possessed_addr(ProtocolAddr& addr) const { + std::lock_guard lock(m_mutex); + if (is_addr_empty(m_possessed_addr)) + return false; + addr = m_possessed_addr; + return true; +} + +bool Protocol::is_possessed() const { + m_mutex.lock(); + bool res = !is_addr_empty(m_possessed_addr); + m_mutex.unlock(); + return res; +} + +bool Protocol::is_mustarrive_complete(uint32_t id) const { + if (id == UINT32_MAX) + return true; + + std::lock_guard l(m_mustarrive_mutex); + for (const auto& it : m_mustarrive_queue) { + if (it.id == id) + return false; + } + return true; +} + +void Protocol::send(const char* cmd, rbjson::Object* obj) { + ProtocolAddr addr; + if (!get_possessed_addr(addr)) { + ESP_LOGW(RBPROT_TAG, "can't send, the device was not possessed yet."); + return; + } + send(addr, cmd, obj); +} + +void Protocol::send(const ProtocolAddr& addr, const char* cmd, rbjson::Object* obj) { + std::unique_ptr autoptr; + if (obj == NULL) { + obj = new rbjson::Object(); + autoptr.reset(obj); + } + + obj->set("c", new rbjson::String(cmd)); + send(addr, obj); +} + +void Protocol::send(const ProtocolAddr& addr, rbjson::Object* obj) { + m_mutex.lock(); + const int n = m_write_counter++; + m_mutex.unlock(); + + obj->set("n", new rbjson::Number(n)); + const auto str = obj->str(); + send(addr, str.c_str(), str.size()); +} + +void Protocol::send(const ProtocolAddr& addr, const char* buf) { + send(addr, buf, strlen(buf)); +} + +void Protocol::send(const ProtocolAddr& addr, const char* buf, size_t size) { + if (size == 0) + return; + + QueueItem it; + it.addr = addr; + it.buf = new char[size]; + it.size = size; + memcpy(it.buf, buf, size); + + if (xQueueSend(m_sendQueue, &it, pdMS_TO_TICKS(200)) != pdTRUE) { + ESP_LOGE(RBPROT_TAG, "failed to send - queue full!"); + delete[] it.buf; + } +} + +uint32_t Protocol::send_mustarrive(const char* cmd, rbjson::Object* params) { + ProtocolAddr addr; + if (!get_possessed_addr(addr)) { + ESP_LOGW(RBPROT_TAG, "can't send, the device was not possessed yet."); + delete params; + return UINT32_MAX; + } + + if (params == NULL) { + params = new rbjson::Object(); + } + + params->set("c", cmd); + + MustArrive mr; + mr.pkt = params; + mr.attempts = 0; + + m_mustarrive_mutex.lock(); + const uint32_t id = m_mustarrive_e++; + mr.id = id; + params->set("e", mr.id); + m_mustarrive_queue.emplace_back(mr); + send(addr, params); + m_mustarrive_mutex.unlock(); + + return id; +} + +void Protocol::send_log(const char* fmt, ...) { + va_list args; + va_start(args, fmt); + send_log(fmt, args); + va_end(args); +} + +void Protocol::send_log(const char* fmt, va_list args) { + char static_buf[256]; + std::unique_ptr dyn_buf; + char* used_buf = static_buf; + + int fmt_len = vsnprintf(static_buf, sizeof(static_buf), fmt, args); + if (fmt_len >= sizeof(static_buf)) { + dyn_buf.reset(new char[fmt_len + 1]); + used_buf = dyn_buf.get(); + vsnprintf(dyn_buf.get(), fmt_len + 1, fmt, args); + } + + send_log(std::string(used_buf)); +} + +void Protocol::send_log(const std::string& str) { + rbjson::Object* pkt = new rbjson::Object(); + pkt->set("msg", str); + send_mustarrive("log", pkt); +} + +void Protocol::handle_msg(const ProtocolAddr& addr, rbjson::Object* pkt) { + const auto cmd = pkt->getString("c"); + + if (cmd == "discover") { + std::unique_ptr res(new rbjson::Object()); + res->set("c", "found"); + res->set("owner", m_owner); + res->set("name", m_name); + res->set("desc", m_desc); + + const auto str = res->str(); + send(addr, str.c_str(), str.size()); + return; + } + + if (!pkt->contains("n")) { + ESP_LOGE(RBPROT_TAG, "packet does not have counter!"); + return; + } + + const bool isPossessCmd = cmd == "possess"; + + const int counter = pkt->getInt("n"); + if (counter == -1 || isPossessCmd) { + m_read_counter = 0; + m_mutex.lock(); + m_write_counter = 0; + m_mutex.unlock(); + } else if (counter < m_read_counter && m_read_counter - counter < 25) { + return; + } else { + m_read_counter = counter; + } + + if (is_addr_empty(m_possessed_addr) || isPossessCmd) { + m_mutex.lock(); + if (!is_addr_same(m_possessed_addr, addr)) { + m_possessed_addr = addr; + } + m_mustarrive_e = 0; + m_mustarrive_f = 0xFFFFFFFF; + m_write_counter = -1; + m_read_counter = -1; + m_mutex.unlock(); + + m_mustarrive_mutex.lock(); + for (auto it : m_mustarrive_queue) { + delete it.pkt; + } + m_mustarrive_queue.clear(); + m_mustarrive_mutex.unlock(); + } + + if (pkt->contains("f")) { + { + std::unique_ptr resp(new rbjson::Object); + resp->set("c", cmd); + resp->set("f", pkt->getInt("f")); + send(addr, resp.get()); + } + + int f = pkt->getInt("f"); + if (f <= m_mustarrive_f && m_mustarrive_f != 0xFFFFFFFF) { + return; + } else { + m_mustarrive_f = f; + } + } else if (pkt->contains("e")) { + uint32_t e = pkt->getInt("e"); + m_mustarrive_mutex.lock(); + for (auto itr = m_mustarrive_queue.begin(); itr != m_mustarrive_queue.end(); ++itr) { + if ((*itr).id == e) { + delete (*itr).pkt; + m_mustarrive_queue.erase(itr); + break; + } + } + m_mustarrive_mutex.unlock(); + return; + } + + if (isPossessCmd) { + ESP_LOGI(RBPROT_TAG, "We are possessed!"); + send_log("The device %s has been possessed!\n", m_name); + } + + if (m_callback != NULL) { + m_callback(cmd, pkt); + } +} + +void Protocol::resend_mustarrive_locked() { + ProtocolAddr possesed_addr; + get_possessed_addr(possesed_addr); + + for (auto itr = m_mustarrive_queue.begin(); itr != m_mustarrive_queue.end();) { + // Websocket protocol does not resend + if (possesed_addr.kind == ProtBackendType::PROT_UDP) { + m_mutex.lock(); + if(m_udp) { + itr->pkt->set("n", m_write_counter++); + m_udp->resend_mustarrive(possesed_addr, itr->pkt); + } + m_mutex.unlock(); + } + + if (++(*itr).attempts >= MUST_ARRIVE_ATTEMPTS) { + delete (*itr).pkt; + itr = m_mustarrive_queue.erase(itr); + } else { + ++itr; + } + } +} + +void Protocol::send_task(void *selfVoid) { + auto& self = *((Protocol*)selfVoid); + + TickType_t mustarrive_next; + QueueItem it; + + mustarrive_next = xTaskGetTickCount() + MUST_ARRIVE_TIMER_PERIOD; + + while (true) { + for (uint8_t i = 0; i < 16 && xQueueReceive(self.m_sendQueue, &it, MS_TO_TICKS(10)) == pdTRUE; ++i) { + if(it.addr.kind == ProtBackendType::PROT_NONE) { + goto exit; + } + + self.m_mutex.lock(); + switch(it.addr.kind) { + case ProtBackendType::PROT_UDP: + if(self.m_udp) { + self.m_udp->send_from_queue(it); + } + break; + case ProtBackendType::PROT_WS: + if(self.m_ws) { + self.m_ws->send_from_queue(it); + } + break; + case ProtBackendType::PROT_NONE: + break; + } + self.m_mutex.unlock(); + + delete[] it.buf; + } + + if (xTaskGetTickCount() >= mustarrive_next) { + self.m_mustarrive_mutex.lock(); + if (self.m_mustarrive_queue.size() != 0) { + self.resend_mustarrive_locked(); + } + self.m_mustarrive_mutex.unlock(); + mustarrive_next = xTaskGetTickCount() + MUST_ARRIVE_TIMER_PERIOD; + } + } + +exit: + vTaskDelete(nullptr); +} + +void Protocol::recv_task(void *selfVoid) { + auto& self = *((Protocol*)selfVoid); + + ProtocolAddr recv_addr; + std::vector buf; + buf.resize(64); + + while(xTaskNotifyWait(0, 0, NULL, 0) == pdFALSE) { + bool received_msg = false; + + self.m_mutex.lock(); + if(self.m_udp) { + auto pkt = self.m_udp->recv_iter(buf, recv_addr); + if(pkt) { + self.m_mutex.unlock(); + self.handle_msg(recv_addr, pkt.get()); + pkt.reset(); + received_msg = true; + self.m_mutex.lock(); + } + } + + if(self.m_ws) { + auto pkt = self.m_ws->recv_iter(buf, recv_addr); + if(pkt) { + self.m_mutex.unlock(); + self.handle_msg(recv_addr, pkt.get()); + pkt.reset(); + received_msg = true; + } else { + self.m_mutex.unlock(); + } + } else { + self.m_mutex.unlock(); + } + + if(!received_msg) { + vTaskDelay(MS_TO_TICKS(10)); + } + } + + vTaskDelete(nullptr); +} + +}; diff --git a/src/rbprotocol.h b/src/rbprotocol.h index 6036792..54ea942 100644 --- a/src/rbprotocol.h +++ b/src/rbprotocol.h @@ -1,12 +1,140 @@ #pragma once -#include "rbprotocoludp.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#include "rbjson.h" #define RBPROTOCOL_AXIS_MIN (-32767) //!< Minimal value of axes in "joy" command #define RBPROTOCOL_AXIS_MAX (32767) //!< Maximal value of axes in "joy" command - -// Backwards compatibility namespace rb { -using Protocol = ProtocolUdp; + +namespace internal { + enum ProtBackendType : uint8_t { + PROT_NONE = 0, + PROT_UDP = 1, + PROT_WS = 2, + }; + + struct ProtocolAddrUdp { + struct in_addr ip; + uint16_t port; + }; + + struct ProtocolAddrWs { + int fd; + }; + + struct ProtocolAddr { + union { + ProtocolAddrUdp udp; + ProtocolAddrWs ws; + }; + ProtBackendType kind; + }; + + struct QueueItem { + ProtocolAddr addr; + char* buf; + uint16_t size; + }; + + class ProtBackendUdp; + class ProtBackendWs; +}; + +struct ProtocolConfig { + bool enable_udp; + bool enable_ws; + bool ws_register_with_webserver; + uint16_t udp_port; +}; + +class Protocol { +public: + typedef std::function callback_t; + + static constexpr const ProtocolConfig DEFAULT_CONFIG = { + .enable_udp = true, + .enable_ws = true, + .ws_register_with_webserver = true, + .udp_port = 42424, + }; + + Protocol(const char* owner, const char* name, const char* description, callback_t callback = nullptr); + ~Protocol(); + + esp_err_t start(const ProtocolConfig& cfg = DEFAULT_CONFIG); + void stop(); + + void send(const char* cmd, rbjson::Object* params = NULL); + uint32_t send_mustarrive(const char* cmd, rbjson::Object* params = NULL); + + void send_log(const char* fmt, ...); + void send_log(const char* fmt, va_list args); + void send_log(const std::string& str); + + bool is_possessed() const; //!< Returns true of the device is possessed (somebody connected to it) + bool is_mustarrive_complete(uint32_t id) const; + + TaskHandle_t getTaskSend() const { return m_task_send; } + TaskHandle_t getTaskRecv() const { return m_task_recv; } + +private: + Protocol(Protocol&) = delete; + + struct MustArrive { + rbjson::Object* pkt; + uint32_t id; + int16_t attempts; + }; + + static void send_task(void *selfVoid); + static void recv_task(void *selfVoid); + + void handle_msg(const internal::ProtocolAddr& addr, rbjson::Object* pkt); + void resend_mustarrive_locked(); + + bool get_possessed_addr(internal::ProtocolAddr& addr) const; + bool is_addr_empty(const internal::ProtocolAddr& addr) const; + bool is_addr_same(const internal::ProtocolAddr& a, const internal::ProtocolAddr& b) const; + + void send(const internal::ProtocolAddr& addr, const char* command, rbjson::Object* obj); + void send(const internal::ProtocolAddr& addr, rbjson::Object* obj); + void send(const internal::ProtocolAddr& addr, const char* buf); + void send(const internal::ProtocolAddr& addr, const char* buf, size_t size); + + const char* m_owner; + const char* m_name; + const char* m_desc; + + callback_t m_callback; + + int32_t m_read_counter; + int32_t m_write_counter; + internal::ProtocolAddr m_possessed_addr; + QueueHandle_t m_sendQueue; + internal::ProtBackendUdp *m_udp; + internal::ProtBackendWs *m_ws; + mutable std::mutex m_mutex; + + uint32_t m_mustarrive_e; + uint32_t m_mustarrive_f; + std::vector m_mustarrive_queue; + mutable std::mutex m_mustarrive_mutex; + + TaskHandle_t m_task_send; + TaskHandle_t m_task_recv; +}; + }; diff --git a/src/rbprotocolbase.h b/src/rbprotocolbase.h deleted file mode 100644 index 383fe99..0000000 --- a/src/rbprotocolbase.h +++ /dev/null @@ -1,364 +0,0 @@ -#pragma once - -#include -#include -#include -#include -#include -#include -#include - -#include "rbjson.h" - -#define RBPROT_TAG "RbProtocol" - -namespace rb { - - -class ProtocolBase { -public: - virtual ~ProtocolBase() {} - - virtual void send(const char* cmd, rbjson::Object* params = NULL) = 0; - virtual uint32_t send_mustarrive(const char* cmd, rbjson::Object* params = NULL) = 0; - - virtual bool is_possessed() const = 0; //!< Returns true of the device is possessed (somebody connected to it) - virtual bool is_mustarrive_complete(uint32_t id) const = 0; - - void send_log(const char* fmt, ...) { - va_list args; - va_start(args, fmt); - send_log(fmt, args); - va_end(args); - } - - void send_log(const char* fmt, va_list args) { - char static_buf[256]; - std::unique_ptr dyn_buf; - char* used_buf = static_buf; - - int fmt_len = vsnprintf(static_buf, sizeof(static_buf), fmt, args); - if (fmt_len >= sizeof(static_buf)) { - dyn_buf.reset(new char[fmt_len + 1]); - used_buf = dyn_buf.get(); - vsnprintf(dyn_buf.get(), fmt_len + 1, fmt, args); - } - - send_log(std::string(used_buf)); - } - - void send_log(const std::string& str) { - rbjson::Object* pkt = new rbjson::Object(); - pkt->set("msg", str); - send_mustarrive("log", pkt); - } -}; - -template -class ProtocolImplBase : public ProtocolBase { -public: - typedef std::function callback_t; - - ProtocolImplBase(const char* owner, const char* name, const char* description, callback_t callback = nullptr); - virtual ~ProtocolImplBase() { - stop(); - } - - virtual void stop(); - - void send(const char* cmd, rbjson::Object* params = NULL); - uint32_t send_mustarrive(const char* cmd, rbjson::Object* params = NULL); - - bool is_possessed() const; //!< Returns true of the device is possessed (somebody connected to it) - bool is_mustarrive_complete(uint32_t id) const; - - TaskHandle_t getTaskSend() const { return m_task_send; } - TaskHandle_t getTaskRecv() const { return m_task_recv; } - -protected: - struct MustArrive { - rbjson::Object* pkt; - uint32_t id; - int16_t attempts; - }; - - struct QueueItem { - AddrT addr; - char* buf; - uint16_t size; - }; - - void handle_msg(const AddrT& addr, rbjson::Object* pkt); - - bool get_possessed_addr(AddrT& addr) const; - virtual bool is_addr_empty(const AddrT& addr) const = 0; - virtual bool is_addr_same(const AddrT& a, const AddrT& b) const = 0; - - void send(const AddrT& addr, const char* command, rbjson::Object* obj); - void send(const AddrT& addr, rbjson::Object* obj); - void send(const AddrT& addr, const char* buf); - void send(const AddrT& addr, const char* buf, size_t size); - - const char* m_owner; - const char* m_name; - const char* m_desc; - - callback_t m_callback; - - int32_t m_read_counter; - int32_t m_write_counter; - AddrT m_possessed_addr; - QueueHandle_t m_sendQueue; - mutable std::mutex m_mutex; - - uint32_t m_mustarrive_e; - uint32_t m_mustarrive_f; - std::vector m_mustarrive_queue; - mutable std::mutex m_mustarrive_mutex; - - TaskHandle_t m_task_send; - TaskHandle_t m_task_recv; -}; - - -template -ProtocolImplBase::ProtocolImplBase(const char* owner, const char* name, const char* description, ProtocolImplBase::callback_t callback) { - m_owner = owner; - m_name = name; - m_desc = description; - m_callback = callback; - - m_sendQueue = xQueueCreate(32, sizeof(QueueItem)); - - m_read_counter = 0; - m_write_counter = 0; - - m_mustarrive_e = 0; - m_mustarrive_f = 0xFFFFFFFF; - - m_task_send = nullptr; - m_task_recv = nullptr; -} - -template -void ProtocolImplBase::stop() { - std::lock_guard lock(m_mutex); - if (m_task_send == nullptr) { - return; - } - - QueueItem it = { }; - xQueueSend(m_sendQueue, &it, portMAX_DELAY); - - m_task_send = nullptr; - m_task_recv = nullptr; -} - - -template -bool ProtocolImplBase::get_possessed_addr(AddrT& addr) const { - std::lock_guard lock(m_mutex); - if (is_addr_empty(m_possessed_addr)) - return false; - addr = m_possessed_addr; - return true; -} - -template -bool ProtocolImplBase::is_possessed() const { - m_mutex.lock(); - bool res = !is_addr_empty(m_possessed_addr); - m_mutex.unlock(); - return res; -} - -template -bool ProtocolImplBase::is_mustarrive_complete(uint32_t id) const { - if (id == UINT32_MAX) - return true; - - std::lock_guard l(m_mustarrive_mutex); - for (const auto& it : m_mustarrive_queue) { - if (it.id == id) - return false; - } - return true; -} - -template -void ProtocolImplBase::send(const char* cmd, rbjson::Object* obj) { - AddrT addr; - if (!get_possessed_addr(addr)) { - ESP_LOGW(RBPROT_TAG, "can't send, the device was not possessed yet."); - return; - } - send(addr, cmd, obj); -} - -template -void ProtocolImplBase::send(const AddrT& addr, const char* cmd, rbjson::Object* obj) { - std::unique_ptr autoptr; - if (obj == NULL) { - obj = new rbjson::Object(); - autoptr.reset(obj); - } - - obj->set("c", new rbjson::String(cmd)); - send(addr, obj); -} - -template -void ProtocolImplBase::send(const AddrT& addr, rbjson::Object* obj) { - m_mutex.lock(); - const int n = m_write_counter++; - m_mutex.unlock(); - - obj->set("n", new rbjson::Number(n)); - const auto str = obj->str(); - send(addr, str.c_str(), str.size()); -} - -template -void ProtocolImplBase::send(const AddrT& addr, const char* buf) { - send(addr, buf, strlen(buf)); -} - -template -void ProtocolImplBase::send(const AddrT& addr, const char* buf, size_t size) { - if (size == 0) - return; - - QueueItem it; - it.addr = addr; - it.buf = new char[size]; - it.size = size; - memcpy(it.buf, buf, size); - - if (xQueueSend(m_sendQueue, &it, pdMS_TO_TICKS(200)) != pdTRUE) { - ESP_LOGE(RBPROT_TAG, "failed to send - queue full!"); - delete[] it.buf; - } -} - -template -uint32_t ProtocolImplBase::send_mustarrive(const char* cmd, rbjson::Object* params) { - AddrT addr; - if (!get_possessed_addr(addr)) { - ESP_LOGW(RBPROT_TAG, "can't send, the device was not possessed yet."); - return UINT32_MAX; - } - - if (params == NULL) { - params = new rbjson::Object(); - } - - params->set("c", cmd); - - MustArrive mr; - mr.pkt = params; - mr.attempts = 0; - - m_mustarrive_mutex.lock(); - const uint32_t id = m_mustarrive_e++; - mr.id = id; - params->set("e", mr.id); - m_mustarrive_queue.emplace_back(mr); - send(addr, params); - m_mustarrive_mutex.unlock(); - - return id; -} - -template -void ProtocolImplBase::handle_msg(const AddrT& addr, rbjson::Object* pkt) { - const auto cmd = pkt->getString("c"); - - if (cmd == "discover") { - std::unique_ptr res(new rbjson::Object()); - res->set("c", "found"); - res->set("owner", m_owner); - res->set("name", m_name); - res->set("desc", m_desc); - - const auto str = res->str(); - send(addr, str.c_str(), str.size()); - return; - } - - if (!pkt->contains("n")) { - ESP_LOGE(RBPROT_TAG, "packet does not have counter!"); - return; - } - - const bool isPossessCmd = cmd == "possess"; - - const int counter = pkt->getInt("n"); - if (counter == -1 || isPossessCmd) { - m_read_counter = 0; - m_mutex.lock(); - m_write_counter = 0; - m_mutex.unlock(); - } else if (counter < m_read_counter && m_read_counter - counter < 25) { - return; - } else { - m_read_counter = counter; - } - - if (is_addr_empty(m_possessed_addr) || isPossessCmd) { - m_mutex.lock(); - if (!is_addr_same(m_possessed_addr, addr)) { - m_possessed_addr = addr; - } - m_mustarrive_e = 0; - m_mustarrive_f = 0xFFFFFFFF; - m_write_counter = -1; - m_read_counter = -1; - m_mutex.unlock(); - - m_mustarrive_mutex.lock(); - for (auto it : m_mustarrive_queue) { - delete it.pkt; - } - m_mustarrive_queue.clear(); - m_mustarrive_mutex.unlock(); - } - - if (pkt->contains("f")) { - { - std::unique_ptr resp(new rbjson::Object); - resp->set("c", cmd); - resp->set("f", pkt->getInt("f")); - send(addr, resp.get()); - } - - int f = pkt->getInt("f"); - if (f <= m_mustarrive_f && m_mustarrive_f != 0xFFFFFFFF) { - return; - } else { - m_mustarrive_f = f; - } - } else if (pkt->contains("e")) { - uint32_t e = pkt->getInt("e"); - m_mustarrive_mutex.lock(); - for (auto itr = m_mustarrive_queue.begin(); itr != m_mustarrive_queue.end(); ++itr) { - if ((*itr).id == e) { - delete (*itr).pkt; - m_mustarrive_queue.erase(itr); - break; - } - } - m_mustarrive_mutex.unlock(); - return; - } - - if (isPossessCmd) { - ESP_LOGI(RBPROT_TAG, "We are possessed!"); - send_log("The device %s has been possessed!\n", m_name); - } - - if (m_callback != NULL) { - m_callback(cmd, pkt); - } -} - -}; diff --git a/src/rbprotocolmulti.hpp b/src/rbprotocolmulti.hpp deleted file mode 100644 index 3822b3c..0000000 --- a/src/rbprotocolmulti.hpp +++ /dev/null @@ -1,59 +0,0 @@ -#pragma once - - -#include "rbprotocolws.h" -#include "rbprotocoludp.h" - -namespace rb { - -class ProtocolMulti : public ProtocolBase { -public: - ProtocolMulti(ProtocolUdp& udp, ProtocolWs& ws) : m_udp(udp), m_ws(ws) { - - } - - void send(const char* cmd, rbjson::Object* params = NULL) { - if(m_udp.is_possessed()) { - m_udp.send(cmd, params); - } - if(m_ws.is_possessed()) { - m_ws.send(cmd, params); - } - } - - uint32_t send_mustarrive(const char* cmd, rbjson::Object* params = NULL) { - const auto udp_possessed = m_udp.is_possessed(); - const auto ws_possesed = m_ws.is_possessed(); - if(!udp_possessed && !ws_possesed) { - return UINT32_MAX; - } - - uint32_t id = 0; - - if(udp_possessed) { - id |= (m_udp.send(cmd, params) & 0xFFFF) << 16; - } - if(ws_possesed) { - id |= m_ws.send(cmd, params) & 0xFFFF; - } - return id; - } - - virtual bool is_possessed() const { - return m_udp.is_possessed() || m_ws.is_possessed(); - } - - virtual bool is_mustarrive_complete(uint32_t id) const { - if(id > 0xFFFF) { - return m_udp.is_mustarrive_complete(id >> 16); - } else { - return m_ws.is_mustarrive_complete(id); - } - } - -private: - ProtocolUdp& m_udp; - ProtocolWs& m_ws; -}; - -}; diff --git a/src/rbprotocoludp.cpp b/src/rbprotocoludp.cpp index aee5a3a..ce8191f 100644 --- a/src/rbprotocoludp.cpp +++ b/src/rbprotocoludp.cpp @@ -1,56 +1,36 @@ -#include -#include -#include -#include -#include -#include +#include -#include "esp_log.h" - -#include "lwip/err.h" -#include "lwip/sockets.h" -#include "lwip/sys.h" - -#include "rbjson.h" #include "rbprotocoludp.h" -using namespace rbjson; - -#define MS_TO_TICKS(ms) ((portTICK_PERIOD_MS <= ms) ? (ms / portTICK_PERIOD_MS) : 1) - -#define MUST_ARRIVE_TIMER_PERIOD MS_TO_TICKS(100) -#define MUST_ARRIVE_ATTEMPTS 15 +#define RBPROT_TAG "RBProtBackendUdp" namespace rb { +namespace internal { - -ProtocolUdp::ProtocolUdp(const char* owner, const char* name, const char* description, ProtocolUdp::callback_t callback) : - ProtocolImplBase(owner, name, description, callback) { - +ProtBackendUdp::ProtBackendUdp() { m_socket = -1; - - memset(&m_possessed_addr, 0, sizeof(UdpSockAddr)); } - -void ProtocolUdp::start(uint16_t port) { - std::lock_guard lock(m_mutex); - if (m_task_send != nullptr) { - return; +ProtBackendUdp::~ProtBackendUdp() { + if (m_socket != -1) { + close(m_socket); + m_socket = -1; } +} +esp_err_t ProtBackendUdp::start(uint16_t port) { m_socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); if (m_socket == -1) { ESP_LOGE(RBPROT_TAG, "failed to create socket: %s", strerror(errno)); - return; + return ESP_ERR_INVALID_STATE; } - int enable = 1; + const int enable = 1; if (setsockopt(m_socket, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int)) == -1) { ESP_LOGE(RBPROT_TAG, "failed to set SO_REUSEADDR: %s", strerror(errno)); close(m_socket); m_socket = -1; - return; + return ESP_ERR_INVALID_STATE; } struct sockaddr_in addr_bind; @@ -63,29 +43,13 @@ void ProtocolUdp::start(uint16_t port) { ESP_LOGE(RBPROT_TAG, "failed to bind socket: %s", strerror(errno)); close(m_socket); m_socket = -1; - return; - } - - xTaskCreate(&ProtocolUdp::send_task_trampoline, "rbctrl_send", 2560, this, 9, &m_task_send); - xTaskCreate(&ProtocolUdp::recv_task_trampoline, "rbctrl_recv", 4096, this, 10, &m_task_recv); -} - -void ProtocolUdp::stop() { - ProtocolImplBase::stop(); - - if (m_socket != -1) { - close(m_socket); - m_socket = -1; + return ESP_ERR_INVALID_STATE; } -} -void ProtocolUdp::send_task_trampoline(void* ctrl) { - ((ProtocolUdp*)ctrl)->send_task(); + return ESP_OK; } -void ProtocolUdp::send_task() { - TickType_t mustarrive_next; - QueueItem it; +void ProtBackendUdp::send_from_queue(const QueueItem& it) { struct sockaddr_in send_addr = { .sin_len = sizeof(struct sockaddr_in), .sin_family = AF_INET, @@ -94,44 +58,16 @@ void ProtocolUdp::send_task() { .sin_zero = { 0 }, }; - m_mutex.lock(); - const int socket_fd = m_socket; - m_mutex.unlock(); - - mustarrive_next = xTaskGetTickCount() + MUST_ARRIVE_TIMER_PERIOD; - - while (true) { - for (size_t i = 0; xQueueReceive(m_sendQueue, &it, MS_TO_TICKS(10)) == pdTRUE && i < 16; ++i) { - if (it.buf == nullptr) { - goto exit; - } - - send_addr.sin_port = it.addr.port; - send_addr.sin_addr = it.addr.ip; - - int res = ::sendto(socket_fd, it.buf, it.size, 0, (struct sockaddr*)&send_addr, sizeof(struct sockaddr_in)); - if (res < 0) { - ESP_LOGE(RBPROT_TAG, "error in sendto: %d %s!", errno, strerror(errno)); - } - delete[] it.buf; - } + send_addr.sin_port = it.addr.udp.port; + send_addr.sin_addr = it.addr.udp.ip; - if (xTaskGetTickCount() >= mustarrive_next) { - m_mustarrive_mutex.lock(); - if (m_mustarrive_queue.size() != 0) { - resend_mustarrive_locked(); - } - m_mustarrive_mutex.unlock(); - mustarrive_next = xTaskGetTickCount() + MUST_ARRIVE_TIMER_PERIOD; - } + int res = ::sendto(m_socket, it.buf, it.size, 0, (struct sockaddr*)&send_addr, sizeof(struct sockaddr_in)); + if (res < 0) { + ESP_LOGE(RBPROT_TAG, "error in sendto: %d %s!", errno, strerror(errno)); } - -exit: - vTaskDelete(nullptr); } -void ProtocolUdp::resend_mustarrive_locked() { - bool possesed; +void ProtBackendUdp::resend_mustarrive(const ProtocolAddr& addr, const rbjson::Object* pkt) { struct sockaddr_in send_addr = { .sin_len = sizeof(struct sockaddr_in), .sin_family = AF_INET, @@ -139,100 +75,54 @@ void ProtocolUdp::resend_mustarrive_locked() { .sin_addr = { 0 }, .sin_zero = { 0 }, }; - { - UdpSockAddr addr; - if((possesed = get_possessed_addr(addr))) { - send_addr.sin_port = addr.port; - send_addr.sin_addr = addr.ip; - } - } - - for (auto itr = m_mustarrive_queue.begin(); itr != m_mustarrive_queue.end();) { - if (possesed) { - m_mutex.lock(); - const int n = m_write_counter++; - m_mutex.unlock(); + send_addr.sin_port = addr.udp.port; + send_addr.sin_addr = addr.udp.ip; - (*itr).pkt->set("n", n); - - const auto str = (*itr).pkt->str(); - - int res = ::sendto(m_socket, str.c_str(), str.size(), 0, (struct sockaddr*)&send_addr, sizeof(struct sockaddr_in)); - if (res < 0) { - ESP_LOGE(RBPROT_TAG, "error in sendto: %d %s!", errno, strerror(errno)); - } - } - - if (++(*itr).attempts >= MUST_ARRIVE_ATTEMPTS) { - delete (*itr).pkt; - itr = m_mustarrive_queue.erase(itr); - } else { - ++itr; - } + const auto str = pkt->str(); + int res = ::sendto(m_socket, str.c_str(), str.size(), 0, (struct sockaddr*)&send_addr, sizeof(struct sockaddr_in)); + if (res < 0) { + ESP_LOGE(RBPROT_TAG, "error in sendto: %d %s!", errno, strerror(errno)); } } -void ProtocolUdp::recv_task_trampoline(void* ctrl) { - ((ProtocolUdp*)ctrl)->recv_task(); -} - -void ProtocolUdp::recv_task() { - m_mutex.lock(); - const int socket_fd = m_socket; - m_mutex.unlock(); - - struct sockaddr_in addr; - socklen_t addr_len; - size_t buf_size = 64; - char* buf = (char*)malloc(buf_size); - ssize_t res; - +std::unique_ptr ProtBackendUdp::recv_iter(std::vector& buf, ProtocolAddr& out_received_addr) { + ssize_t received_len = 0; while (true) { - while (true) { - res = recvfrom(socket_fd, buf, buf_size, MSG_PEEK, NULL, NULL); - if (res < 0) { - const auto err = errno; + received_len = recvfrom(m_socket, buf.data(), buf.size(), MSG_PEEK | MSG_DONTWAIT, NULL, NULL); + if (received_len < 0) { + const auto err = errno; + if(err != EAGAIN) { // with MSG_DONTWAIT, it means no message available ESP_LOGE(RBPROT_TAG, "error in recvfrom: %d %s!", err, strerror(err)); - if (err == EBADF) - goto exit; - vTaskDelay(MS_TO_TICKS(10)); - continue; } - - if (res < buf_size) - break; - buf_size += 16; - buf = (char*)realloc(buf, buf_size); + return nullptr; } - addr_len = sizeof(struct sockaddr_in); - const auto pop_res = recvfrom(socket_fd, buf, 0, 0, (struct sockaddr*)&addr, &addr_len); - if (pop_res < 0) { - const auto err = errno; - ESP_LOGE(RBPROT_TAG, "error in recvfrom: %d %s!", err, strerror(err)); - if (err == EBADF) - goto exit; - vTaskDelay(MS_TO_TICKS(10)); - continue; - } + if (received_len < buf.size()) + break; + buf.resize(buf.size() + 16); + } - { - std::unique_ptr pkt(parse(buf, res)); - if (!pkt) { - ESP_LOGE(RBPROT_TAG, "failed to parse the packet's json"); - } else { - UdpSockAddr sa = { - .ip = addr.sin_addr, - .port = addr.sin_port, - }; - handle_msg(sa, pkt.get()); - } - } + struct sockaddr_in addr; + socklen_t addr_len = sizeof(struct sockaddr_in); + + const auto pop_res = recvfrom(m_socket, buf.data(), 0, 0, (struct sockaddr*)&addr, &addr_len); + if (pop_res < 0) { + const auto err = errno; + ESP_LOGE(RBPROT_TAG, "error in recvfrom: %d %s!", err, strerror(err)); + return nullptr; } -exit: - free(buf); - vTaskDelete(nullptr); + std::unique_ptr pkt(rbjson::parse((char*)buf.data(), received_len)); + if (!pkt) { + ESP_LOGE(RBPROT_TAG, "failed to parse the packet's json"); + return nullptr; + } + + out_received_addr.kind = ProtBackendType::PROT_UDP; + out_received_addr.udp.port = addr.sin_port; + out_received_addr.udp.ip = addr.sin_addr; + return pkt; } -}; // namespace rb +}; +}; diff --git a/src/rbprotocoludp.h b/src/rbprotocoludp.h index b5fe946..1c2c7e6 100644 --- a/src/rbprotocoludp.h +++ b/src/rbprotocoludp.h @@ -1,62 +1,26 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -#include "rbjson.h" -#include "rbprotocolbase.h" +#include "rbprotocol.h" namespace rb { +namespace internal { -#define RBPROTOCOL_PORT 42424 //!< The default RBProtocol port -struct UdpSockAddr { - struct in_addr ip; - uint16_t port; -}; - -/** - * \brief Class that manages the RBProtocol communication - */ -class ProtocolUdp : public ProtocolImplBase { +class ProtBackendUdp { public: - typedef ProtocolImplBase::callback_t callback_t; - - /** - * The onPacketReceivedCallback is called when a packet arrives. - * It runs on a separate task, only single packet is processed at a time. - */ - ProtocolUdp(const char* owner, const char* name, const char* description, callback_t callback = nullptr); - ~ProtocolUdp() {} + ProtBackendUdp(); + ~ProtBackendUdp(); - void start(uint16_t port = RBPROTOCOL_PORT); //!< Start listening for UDP packets on port - void stop(); //!< Stop listening + esp_err_t start(uint16_t port); -private: - static void send_task_trampoline(void* ctrl); - void send_task(); - void resend_mustarrive_locked(); - - static void recv_task_trampoline(void* ctrl); - void recv_task(); - - bool is_addr_empty(const UdpSockAddr& addr) const { - return addr.port == 0; - } + void send_from_queue(const QueueItem& it); + void resend_mustarrive(const ProtocolAddr& addr, const rbjson::Object* pkt); - bool is_addr_same(const UdpSockAddr& a, const UdpSockAddr& b) const { - return a.ip.s_addr == b.ip.s_addr && a.port == b.port; - } + std::unique_ptr recv_iter(std::vector& buf, ProtocolAddr& out_received_addr); +private: int m_socket; }; }; +}; diff --git a/src/rbprotocolws.cpp b/src/rbprotocolws.cpp index a5f9879..6e01bcc 100644 --- a/src/rbprotocolws.cpp +++ b/src/rbprotocolws.cpp @@ -1,167 +1,104 @@ -#include -#include -#include -#include -#include -#include +#include -#include "esp_log.h" - -#include "lwip/err.h" -#include "lwip/sockets.h" -#include "lwip/sys.h" - -#include "rbjson.h" #include "rbprotocolws.h" - #include "rbwebserver_internal.h" -using namespace rbjson; - -#define MS_TO_TICKS(ms) ((portTICK_PERIOD_MS <= ms) ? (ms / portTICK_PERIOD_MS) : 1) -#define MUST_ARRIVE_TIMER_PERIOD MS_TO_TICKS(100) -#define MUST_ARRIVE_ATTEMPTS 15 +#define RBPROT_TAG "RBProtBackendWs" #define WS_OPCODE_CONTINUE 0x00 #define WS_OPCODE_TEXT 0x01 #define WS_OPCODE_CLOSE 0x08 namespace rb { +namespace internal { -ProtocolWs::ProtocolWs(const char* owner, const char* name, const char* description, ProtocolWs::callback_t callback) : - ProtocolImplBase(owner, name, description, callback) { - m_task_send = nullptr; - m_task_recv = nullptr; - - memset(&m_possessed_addr, 0, sizeof(WsAddr)); - - xTaskCreate(&ProtocolWs::recv_task_trampoline, "rbctrl_recv", 4096, this, 10, &m_task_recv); - xTaskCreate(&ProtocolWs::send_task_trampoline, "rbctrl_send", 2560, this, 9, &m_task_send); -} +ProtBackendWs::ProtBackendWs() { -void ProtocolWs::start() { - rb_web_set_wsprotocol(this); } -void ProtocolWs::stop() { - if(m_task_recv) { - rb_web_clear_wsprotocol(NULL); - xTaskNotify(m_task_recv, 0, eNoAction); - } - - ProtocolImplBase::stop(); +ProtBackendWs::~ProtBackendWs() { + rb_web_clear_wsprotocol(this); - m_client_fd_mu.lock(); - for(auto& client : m_client_fd) { + m_clients_mu.lock(); + for(auto& client : m_clients) { close(client->fd); } - m_client_fd.clear(); - m_client_fd_mu.unlock(); -} - -void ProtocolWs::addClient(int fd) { - m_client_fd_mu.lock(); - m_client_fd.push_back(std::unique_ptr(new Client(fd))); - m_client_fd_mu.unlock(); + m_clients.clear(); + m_clients_mu.unlock(); } -void ProtocolWs::close_client(int fd) { - m_client_fd_mu.lock(); - for(auto itr = m_client_fd.begin(); itr != m_client_fd.end(); ++itr) { - if(itr->get()->fd == fd) { - itr = m_client_fd.erase(itr); - close(fd); - break; - } +esp_err_t ProtBackendWs::start(bool register_with_webserver) { + if(register_with_webserver) { + rb_web_set_wsprotocol(this); } - m_client_fd_mu.unlock(); + return ESP_OK; } -void ProtocolWs::send_task_trampoline(void* ctrl) { - ((ProtocolWs*)ctrl)->send_task(); -} - -void ProtocolWs::send_task() { - TickType_t mustarrive_next; - QueueItem it; - +void ProtBackendWs::send_from_queue(const QueueItem& it) { uint8_t ws_header[4]; + ws_header[0] = (1 << 7) | WS_OPCODE_TEXT; // FIN flag + text opcode - mustarrive_next = xTaskGetTickCount() + MUST_ARRIVE_TIMER_PERIOD; - - while (true) { - for (size_t i = 0; xQueueReceive(m_sendQueue, &it, MS_TO_TICKS(10)) == pdTRUE && i < 16; ++i) { - if (it.buf == nullptr) { - if(it.addr.fd == 0) { - goto exit; - } - - ws_header[0] = (1 << 7) | WS_OPCODE_CLOSE; - } else { - ws_header[0] = (1 << 7) | WS_OPCODE_TEXT; // FIN flag + text opcode - } + if(it.size <= 125) { + ws_header[1] = it.size; + } else { + ws_header[1] = 126; + ws_header[2] = it.size >> 8; + ws_header[3] = it.size & 0xFF; + } - if(it.size <= 125) { - ws_header[1] = it.size; - } else { - ws_header[1] = 126; - ws_header[2] = it.size >> 8; - ws_header[3] = it.size & 0xFF; - } + int res = ::send(it.addr.ws.fd, ws_header, it.size <= 125 ? 2 : 4, 0); + if (res < 0) { + ESP_LOGE(RBPROT_TAG, "error in sendto: %d %s!", errno, strerror(errno)); + close_client(it.addr.ws.fd); + return; + } - int res = ::send(it.addr.fd, ws_header, it.size <= 125 ? 2 : 4, 0); - if (res < 0) { - ESP_LOGE(RBPROT_TAG, "error in sendto: %d %s!", errno, strerror(errno)); - close_client(it.addr.fd); - delete[] it.buf; - continue; - } + res = ::send(it.addr.ws.fd, it.buf, it.size, 0); + if (res < 0) { + ESP_LOGE(RBPROT_TAG, "error in sendto: %d %s!", errno, strerror(errno)); + close_client(it.addr.ws.fd); + } +} - if(it.buf == nullptr) { - close_client(it.addr.fd); - continue; - } +void ProtBackendWs::addClient(int fd) { + m_clients_mu.lock(); + m_clients.push_back(std::unique_ptr(new Client(fd))); + m_clients_mu.unlock(); +} - res = ::send(it.addr.fd, it.buf, it.size, 0); - if (res < 0) { - ESP_LOGE(RBPROT_TAG, "error in sendto: %d %s!", errno, strerror(errno)); - close_client(it.addr.fd); - } - delete[] it.buf; - } +void ProtBackendWs::close_client(int fd) { + m_clients_mu.lock(); + close_client_locked(fd); + m_clients_mu.unlock(); +} - if (xTaskGetTickCount() >= mustarrive_next) { - m_mustarrive_mutex.lock(); - if (m_mustarrive_queue.size() != 0) { - resend_mustarrive_locked(); - } - m_mustarrive_mutex.unlock(); - mustarrive_next = xTaskGetTickCount() + MUST_ARRIVE_TIMER_PERIOD; +void ProtBackendWs::close_client_locked(int fd) { + for(auto itr = m_clients.begin(); itr != m_clients.end(); ++itr) { + if(itr->get()->fd == fd) { + itr = m_clients.erase(itr); + close(fd); + return; } } - -exit: - vTaskDelete(nullptr); } -void ProtocolWs::resend_mustarrive_locked() { - for (auto itr = m_mustarrive_queue.begin(); itr != m_mustarrive_queue.end();) { - if ((*itr).attempts != -1 && ++(*itr).attempts >= MUST_ARRIVE_ATTEMPTS) { - delete (*itr).pkt; - itr = m_mustarrive_queue.erase(itr); - } else { - ++itr; - } +void ProtBackendWs::close_client_locked_gracefully(int fd) { + const uint8_t ws_header[2] = { + (1 << 7) | WS_OPCODE_CLOSE, + 0, + }; + + int res = ::send(fd, ws_header, 2, 0); + if (res < 0) { + ESP_LOGE(RBPROT_TAG, "error in sendto while gracefully closing: %d %s!", errno, strerror(errno)); } -} -void ProtocolWs::recv_task_trampoline(void* ctrl) { - ((ProtocolWs*)ctrl)->recv_task(); + close_client_locked(fd); } static int read_at_least(int fd, uint8_t *buf, size_t n) { - int res = ::recv(fd, buf, n, MSG_PEEK); + int res = ::recv(fd, buf, n, MSG_PEEK | MSG_DONTWAIT); if(res < 0) { if(errno == EWOULDBLOCK) { return 0; @@ -178,7 +115,7 @@ static int read_at_least(int fd, uint8_t *buf, size_t n) { return ::recv(fd, buf, n, 0); } -int ProtocolWs::process_client_header(Client &client, std::vector& buf) { +int ProtBackendWs::process_client_header(Client &client, std::vector& buf) { client.flags = buf[0]; const int mask = buf[1] >> 7; @@ -209,7 +146,7 @@ int ProtocolWs::process_client_header(Client &client, std::vector& buf) return 0; } -int ProtocolWs::process_client(Client &client, std::vector& buf) { +int ProtBackendWs::process_client(Client &client, std::vector& buf) { switch(client.state) { case ClientState::INITIAL: { int n = read_at_least(client.fd, buf.data(), 2); @@ -283,84 +220,56 @@ int ProtocolWs::process_client(Client &client, std::vector& buf) { return 0; } -void ProtocolWs::process_client_fully_received_locked(Client &client) { - const WsAddr sa = { .fd = client.fd }; +std::unique_ptr ProtBackendWs::process_client_fully_received_locked(ProtBackendWs::Client &client, ProtocolAddr& out_received_addr) { + client.state = ClientState::INITIAL; if(client.opcode() == WS_OPCODE_CLOSE) { - const QueueItem it = { - .addr = sa, - .buf = nullptr, - .size = 0 - }; - xQueueSend(m_sendQueue, &it, portMAX_DELAY); + close_client_locked_gracefully(client.fd); + return nullptr; } else { - ESP_LOGV(RBPROT_TAG, "parsing message %d %.*s", sa.fd, client.payload.size(), (char*)client.payload.data()); + ESP_LOGV(RBPROT_TAG, "parsing message %d %.*s", client.fd, client.payload.size(), (char*)client.payload.data()); - std::unique_ptr pkt(parse((char*)client.payload.data(), client.payload.size())); + std::unique_ptr pkt(rbjson::parse((char*)client.payload.data(), client.payload.size())); if (!pkt) { ESP_LOGE(RBPROT_TAG, "failed to parse the packet's json"); - return; + return nullptr; } - m_client_fd_mu.unlock(); - handle_msg(sa, pkt.get()); - m_client_fd_mu.lock(); + out_received_addr.kind = ProtBackendType::PROT_WS; + out_received_addr.ws.fd = client.fd; + return pkt; } - } -void ProtocolWs::recv_task() { - std::vector buf; - buf.resize(64); +std::unique_ptr ProtBackendWs::recv_iter(std::vector& buf, ProtocolAddr& out_received_addr) { + std::lock_guard lock(m_clients_mu); - while(xTaskNotifyWait(0, 0, NULL, 0) == pdFALSE) { - bool some_fully_received = false; + for(auto itr = m_clients.begin(); itr != m_clients.end();) { + auto& client = *itr->get(); - m_client_fd_mu.lock(); - for(auto itr = m_client_fd.begin(); itr != m_client_fd.end();) { - auto& client = *itr->get(); - - if(process_client(client, buf) < 0) { - close(client.fd); - itr = m_client_fd.erase(itr); - continue; - } else { - some_fully_received |= (client.state == ClientState::FULLY_RECEIVED); - } - - ++itr; + if(process_client(client, buf) < 0) { + close(client.fd); + itr = m_clients.erase(itr); + continue; + } else if(client.state == ClientState::FULLY_RECEIVED) { + return process_client_fully_received_locked(client, out_received_addr); } - while(some_fully_received) { - some_fully_received = false; - for(auto& client : m_client_fd) { - if(client->state != ClientState::FULLY_RECEIVED) { - continue; - } - some_fully_received = true; - - client->state = ClientState::INITIAL; - process_client_fully_received_locked(*client.get()); - break; - } - } - - m_client_fd_mu.unlock(); - - vTaskDelay(pdMS_TO_TICKS(10)); + ++itr; } - vTaskDelete(nullptr); + return nullptr; } +}; +}; + extern "C" void rb_web_ws_new_connection(void *wsProtocolInstance, int fd) { if(wsProtocolInstance == NULL) { close(fd); return; } - auto prot = (ProtocolWs*)wsProtocolInstance; + auto prot = (rb::internal::ProtBackendWs*)wsProtocolInstance; prot->addClient(fd); } - -}; // namespace rb diff --git a/src/rbprotocolws.h b/src/rbprotocolws.h index 682a6e3..ea459f4 100644 --- a/src/rbprotocolws.h +++ b/src/rbprotocolws.h @@ -1,39 +1,20 @@ #pragma once -#include -#include -#include -#include -#include -#include -#include -#include - -#include "rbjson.h" -#include "rbprotocolbase.h" +#include "rbprotocol.h" namespace rb { +namespace internal { -struct WsAddr { - int fd; -}; - -/** - * \brief Class that manages the RBProtocol communication - */ -class ProtocolWs : public ProtocolImplBase { +class ProtBackendWs { public: - typedef ProtocolImplBase::callback_t callback_t; + ProtBackendWs(); + ~ProtBackendWs(); - /** - * The onPacketReceivedCallback is called when a packet arrives. - * It runs on a separate task, only single packet is processed at a time. - */ - ProtocolWs(const char* owner, const char* name, const char* description, callback_t callback = nullptr); - ~ProtocolWs() {} + esp_err_t start(bool register_with_webserver); - void start(); - void stop(); + void send_from_queue(const QueueItem& it); + + std::unique_ptr recv_iter(std::vector& buf, ProtocolAddr& out_received_addr); void addClient(int fd); @@ -63,35 +44,19 @@ class ProtocolWs : public ProtocolImplBase { int fd; uint8_t flags; ClientState state; - }; - static void send_task_trampoline(void* ctrl); - void send_task(); - void resend_mustarrive_locked(); - - static void recv_task_trampoline(void* ctrl); - void recv_task(); - int process_client(Client &client, std::vector& buf); int process_client_header(Client &client, std::vector& buf); - void process_client_fully_received_locked(Client &client); + std::unique_ptr process_client_fully_received_locked(Client &client, ProtocolAddr& out_received_addr); void close_client(int fd); + void close_client_locked(int fd); + void close_client_locked_gracefully(int fd); - bool is_addr_empty(const WsAddr& addr) const { - return addr.fd == 0; - } - - bool is_addr_same(const WsAddr& a, const WsAddr& b) const { - return a.fd == b.fd; - } - - TaskHandle_t m_task_send; - TaskHandle_t m_task_recv; - - std::vector> m_client_fd; - std::mutex m_client_fd_mu; + std::vector> m_clients; + std::mutex m_clients_mu; }; }; +}; diff --git a/src/rbwebserver.c b/src/rbwebserver.c index e85cb49..ed6b271 100644 --- a/src/rbwebserver.c +++ b/src/rbwebserver.c @@ -115,10 +115,12 @@ static ssize_t rio_read(rio_t* rp, char* usrbuf, size_t n) { int cnt; while (rp->rio_cnt <= 0) { /* refill if buf is empty */ - rp->rio_cnt = read(rp->rio_fd, rp->rio_buf, - sizeof(rp->rio_buf)); + rp->rio_cnt = recv(rp->rio_fd, rp->rio_buf, + sizeof(rp->rio_buf), MSG_DONTWAIT); if (rp->rio_cnt < 0) { - if (errno != EINTR) /* interrupted by sig handler return */ + if(errno == EAGAIN) { + vTaskDelay(pdMS_TO_TICKS(10)); + } else if (errno != EINTR) /* interrupted by sig handler return */ return -1; } else if (rp->rio_cnt == 0) /* EOF */ return 0; @@ -252,31 +254,32 @@ static int prepare_gzip(http_request* req) { static int is_local_host(const char *hostHeader) { const int hostHeaderLen = strlen(hostHeader) - 2; // ignore \r\n if(hostHeaderLen < 0) { - return 1; + return true; } if(hostHeaderLen >= 7 && hostHeaderLen <= 15) { int dots = 0; bool is_ip = true; - for(const char *c = hostHeader; *c; ++c) { - if(*c == '.') { + for(int i = 0; i < hostHeaderLen; ++i) { + char c = hostHeader[i]; + if(c == '.') { ++dots; - } else if(*c < '0' || *c > '9') { + } else if(c < '0' || c > '9') { is_ip = false; break; } } if(is_ip && dots == 3) { - return 1; + return true; } } const char *localHostname = rb_dn_get_local_hostname(); if(strlen(localHostname) == hostHeaderLen && memcmp(localHostname, hostHeader, hostHeaderLen) == 0) { - return 1; + return true; } - return 0; + return false; } static void parse_request(int fd, http_request* req) { @@ -526,6 +529,7 @@ static void tiny_web_task(void* portPtr) { while (1) { connfd = accept(listenfd, (SA*)&clientaddr, &clientlen); + if (connfd >= 0) { if(process(connfd, &clientaddr) <= 0) { close(connfd); diff --git a/test-inis/esp32-idf5-idf.ini b/test-inis/esp32-idf5-idf.ini index 1c6f859..b3eaee0 100644 --- a/test-inis/esp32-idf5-idf.ini +++ b/test-inis/esp32-idf5-idf.ini @@ -9,7 +9,7 @@ ; http://docs.platformio.org/page/projectconf.html [env:esp32dev] -platform = espressif32@6.3.2 +platform = espressif32@6.4.0 board = esp32dev framework = espidf diff --git a/test-inis/esp32s3-idf4-arduino.ini b/test-inis/esp32s3-idf4-arduino.ini index 7b0da2f..73c3bee 100644 --- a/test-inis/esp32s3-idf4-arduino.ini +++ b/test-inis/esp32s3-idf4-arduino.ini @@ -9,7 +9,7 @@ ; http://docs.platformio.org/page/projectconf.html [env:esp32dev] -platform = espressif32@6.3.2 +platform = espressif32@6.4.0 board = esp32-s3-devkitc-1 framework = arduino