Skip to content

Commit

Permalink
Don't require a connection id to use mg_wakeup
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshilliard committed Jun 14, 2024
1 parent 5084279 commit 7f366cb
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 26 deletions.
4 changes: 2 additions & 2 deletions mongoose.c
Original file line number Diff line number Diff line change
Expand Up @@ -7862,7 +7862,7 @@ static void wufn(struct mg_connection *c, int ev, void *ev_data) {
if (c->recv.len >= sizeof(*id)) {
struct mg_connection *t;
for (t = c->mgr->conns; t != NULL; t = t->next) {
if (t->id == *id) {
if (t->id == *id || 0 == *id) {
struct mg_str data = mg_str_n((char *) c->recv.buf + sizeof(*id),
c->recv.len - sizeof(*id));
mg_call(t, MG_EV_WAKEUP, &data);
Expand Down Expand Up @@ -7901,7 +7901,7 @@ bool mg_wakeup_init(struct mg_mgr *mgr) {

bool mg_wakeup(struct mg_mgr *mgr, unsigned long conn_id, const void *buf,
size_t len) {
if (mgr->pipe != MG_INVALID_SOCKET && conn_id > 0) {
if (mgr->pipe != MG_INVALID_SOCKET && conn_id >= 0) {
char *extended_buf = (char *) alloca(len + sizeof(conn_id));
memcpy(extended_buf, &conn_id, sizeof(conn_id));
memcpy(extended_buf + sizeof(conn_id), buf, len);
Expand Down
4 changes: 2 additions & 2 deletions src/sock.c
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ static void wufn(struct mg_connection *c, int ev, void *ev_data) {
if (c->recv.len >= sizeof(*id)) {
struct mg_connection *t;
for (t = c->mgr->conns; t != NULL; t = t->next) {
if (t->id == *id) {
if (t->id == *id || 0 == *id) {
struct mg_str data = mg_str_n((char *) c->recv.buf + sizeof(*id),
c->recv.len - sizeof(*id));
mg_call(t, MG_EV_WAKEUP, &data);
Expand Down Expand Up @@ -666,7 +666,7 @@ bool mg_wakeup_init(struct mg_mgr *mgr) {

bool mg_wakeup(struct mg_mgr *mgr, unsigned long conn_id, const void *buf,
size_t len) {
if (mgr->pipe != MG_INVALID_SOCKET && conn_id > 0) {
if (mgr->pipe != MG_INVALID_SOCKET && conn_id >= 0) {
char *extended_buf = (char *) alloca(len + sizeof(conn_id));
memcpy(extended_buf, &conn_id, sizeof(conn_id));
memcpy(extended_buf + sizeof(conn_id), buf, len);
Expand Down
28 changes: 6 additions & 22 deletions tutorials/core/multi-threaded-12m/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,19 @@ static void start_thread(void *(*f)(void *), void *p) {
#endif
}

struct thread_data {
struct mg_mgr *mgr;
unsigned long conn_id; // Parent connection ID
};

static void *thread_function(void *param) {
struct thread_data *p = (struct thread_data *) param;
struct mg_mgr *mgr = (struct mg_mgr *) param;
printf("THREAD STARTED\n");
for (;;) {
sleep(2);
mg_wakeup(p->mgr, p->conn_id, "hi!", 3); // Send to parent
mg_wakeup(mgr, 0, "hi!", 3); // Send to parent
}
// Free all resources that were passed to us
free(p);
return NULL;
}

// HTTP request callback
static void fn(struct mg_connection *c, int ev, void *ev_data) {
if (ev == MG_EV_OPEN && c->is_listening) {
// Start worker thread
struct thread_data *data = calloc(1, sizeof(*data)); // Worker owns it
data->conn_id = c->id;
data->mgr = c->mgr;
start_thread(thread_function, data); // Start thread and pass data
} else if (ev == MG_EV_HTTP_MSG) {
if (ev == MG_EV_HTTP_MSG) {
struct mg_http_message *hm = (struct mg_http_message *) ev_data;
if (mg_match(hm->uri, mg_str("/websocket"), NULL)) {
mg_ws_upgrade(c, hm, NULL); // Upgrade HTTP to Websocket
Expand All @@ -68,12 +55,8 @@ static void fn(struct mg_connection *c, int ev, void *ev_data) {
} else if (ev == MG_EV_WAKEUP) {
struct mg_str *data = (struct mg_str *) ev_data;
// Broadcast message to all connected websocket clients.
// Traverse over all connections
for (struct mg_connection *wc = c->mgr->conns; wc != NULL; wc = wc->next) {
// Send only to marked connections
if (wc->data[0] == 'W')
mg_ws_send(wc, data->buf, data->len, WEBSOCKET_OP_TEXT);
}
if (c->data[0] == 'W')
mg_ws_send(c, data->buf, data->len, WEBSOCKET_OP_TEXT);
}
}

Expand All @@ -83,6 +66,7 @@ int main(void) {
mg_log_set(MG_LL_DEBUG); // Set debug log level
mg_http_listen(&mgr, "http://localhost:8000", fn, NULL); // Create listener
mg_wakeup_init(&mgr); // Initialise wakeup socket pair
start_thread(thread_function, &mgr); // Start thread and pass mgr
for (;;) { // Event loop
mg_mgr_poll(&mgr, 1000);
}
Expand Down

0 comments on commit 7f366cb

Please sign in to comment.