Skip to content

Commit

Permalink
websocket transport: use inline SP objects
Browse files Browse the repository at this point in the history
This also fixes a possible race in the listener that may cause
connections to be dropped incorrectly, if the connection arrives
before the common layer has posted an accept request.

Instead we save the connection and potentially match later, like
we do for the other protocols that need to negotiate.
  • Loading branch information
gdamore committed Dec 16, 2024
1 parent 58df588 commit a43eb95
Showing 1 changed file with 87 additions and 73 deletions.
160 changes: 87 additions & 73 deletions src/sp/transport/ws/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ struct ws_dialer {
nni_mtx mtx;
nni_aio connaio;
nng_stream_dialer *dialer;
nni_dialer *ndialer;
bool started;
};

Expand All @@ -37,20 +38,26 @@ struct ws_listener {
nni_mtx mtx;
nni_aio accaio;
nng_stream_listener *listener;
nni_listener *nlistener;
nni_list wait_pipes;
bool started;
};

struct ws_pipe {
nni_mtx mtx;
bool closed;
uint16_t peer;
nni_aio *user_txaio;
nni_aio *user_rxaio;
nni_aio txaio;
nni_aio rxaio;
nng_stream *ws;
nni_mtx mtx;
bool closed;
uint16_t peer;
nni_aio *user_txaio;
nni_aio *user_rxaio;
nni_aio txaio;
nni_aio rxaio;
nng_stream *ws;
nni_pipe *npipe;
nni_list_node node;
};

static void wstran_listener_match(ws_listener *l);

static void
wstran_pipe_send_cb(void *arg)
{
Expand Down Expand Up @@ -189,8 +196,14 @@ wstran_pipe_stop(void *arg)
static int
wstran_pipe_init(void *arg, nni_pipe *pipe)
{
NNI_ARG_UNUSED(arg);
NNI_ARG_UNUSED(pipe);
ws_pipe *p = arg;

p->npipe = pipe;
nni_mtx_init(&p->mtx);

// Initialize AIOs.
nni_aio_init(&p->txaio, wstran_pipe_send_cb, p);
nni_aio_init(&p->rxaio, wstran_pipe_recv_cb, p);
return (0);
}

Expand All @@ -204,7 +217,6 @@ wstran_pipe_fini(void *arg)
nni_aio_fini(&p->txaio);

nni_mtx_fini(&p->mtx);
NNI_FREE_STRUCT(p);
}

static void
Expand All @@ -218,25 +230,6 @@ wstran_pipe_close(void *arg)
nng_stream_close(p->ws);
}

static int
wstran_pipe_alloc(ws_pipe **pipep, void *ws)
{
ws_pipe *p;

if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
p->ws = ws;
nni_mtx_init(&p->mtx);

// Initialize AIOs.
nni_aio_init(&p->txaio, wstran_pipe_send_cb, p);
nni_aio_init(&p->rxaio, wstran_pipe_recv_cb, p);

*pipep = p;
return (0);
}

static uint16_t
wstran_pipe_peer(void *arg)
{
Expand All @@ -253,7 +246,6 @@ wstran_listener_bind(void *arg, nng_url *url)

if ((rv = nng_stream_listener_listen(l->listener)) == 0) {
int port;
l->started = true;
nng_stream_listener_get_int(
l->listener, NNG_OPT_TCP_BOUND_PORT, &port);
url->u_port = (uint32_t) port;
Expand Down Expand Up @@ -293,9 +285,11 @@ wstran_listener_accept(void *arg, nni_aio *aio)
return;
}
nni_list_append(&l->aios, aio);
if (aio == nni_list_first(&l->aios)) {
if (!l->started) {
l->started = true;
nng_stream_listener_accept(l->listener, &l->accaio);
}
wstran_listener_match(l);
nni_mtx_unlock(&l->mtx);
}

Expand Down Expand Up @@ -356,6 +350,7 @@ wstran_pipe_getopt(
}

static nni_sp_pipe_ops ws_pipe_ops = {
.p_size = sizeof(ws_pipe),
.p_init = wstran_pipe_init,
.p_fini = wstran_pipe_fini,
.p_stop = wstran_pipe_stop,
Expand Down Expand Up @@ -383,7 +378,6 @@ wstran_dialer_fini(void *arg)
nng_stream_dialer_free(d->dialer);
nni_aio_fini(&d->connaio);
nni_mtx_fini(&d->mtx);
NNI_FREE_STRUCT(d);
}

static void
Expand All @@ -403,7 +397,6 @@ wstran_listener_fini(void *arg)
nng_stream_listener_free(l->listener);
nni_aio_fini(&l->accaio);
nni_mtx_fini(&l->mtx);
NNI_FREE_STRUCT(l);
}

static void
Expand All @@ -422,7 +415,6 @@ wstran_connect_cb(void *arg)
}
if ((uaio = nni_list_first(&d->aios)) == NULL) {
// The client stopped caring about this!
nng_stream_stop(ws);
nng_stream_free(ws);
nni_mtx_unlock(&d->mtx);
return;
Expand All @@ -431,14 +423,15 @@ wstran_connect_cb(void *arg)
NNI_ASSERT(nni_list_empty(&d->aios));
if ((rv = nni_aio_result(caio)) != 0) {
nni_aio_finish_error(uaio, rv);
} else if ((rv = wstran_pipe_alloc(&p, ws)) != 0) {
nng_stream_stop(ws);
} else if ((rv = nni_pipe_alloc_dialer((void **) &p, d->ndialer)) !=
0) {
nng_stream_free(ws);
nni_aio_finish_error(uaio, rv);
} else {
p->peer = d->peer;
p->ws = ws;

nni_aio_set_output(uaio, 0, p);
nni_aio_set_output(uaio, 0, p->npipe);
nni_aio_finish(uaio, 0, 0);
}
nni_mtx_unlock(&d->mtx);
Expand All @@ -457,66 +450,90 @@ static void
wstran_listener_close(void *arg)
{
ws_listener *l = arg;
ws_pipe *p;

nni_aio_close(&l->accaio);
NNI_LIST_FOREACH (&l->wait_pipes, p) {
nni_pipe_close(p->npipe);
}
nng_stream_listener_close(l->listener);
}

static void
wstran_listener_match(ws_listener *l)
{
nni_aio *uaio;
ws_pipe *p;
if (((uaio = nni_list_first(&l->aios)) == NULL) ||
((p = nni_list_first(&l->wait_pipes)) == NULL)) {
return;
}

nni_list_remove(&l->wait_pipes, p);
nni_aio_list_remove(uaio);

nni_aio_set_output(uaio, 0, p->npipe);
nni_aio_finish(uaio, 0, 0);
}

static void
wstran_accept_cb(void *arg)
{
ws_listener *l = arg;
nni_aio *aaio = &l->accaio;
nni_aio *uaio;
int rv;
ws_pipe *p;
nng_stream *ws;

nni_mtx_lock(&l->mtx);

ws = nni_aio_get_output(aaio, 0);
uaio = nni_list_first(&l->aios);
if ((rv = nni_aio_result(aaio)) != 0) {
if (uaio != NULL) {
nni_aio_list_remove(uaio);
nni_aio_finish_error(uaio, rv);
}
} else {
nng_stream *ws = nni_aio_get_output(aaio, 0);
if (uaio != NULL) {
ws_pipe *p;
// Make a pipe
nni_aio_list_remove(uaio);
if ((rv = wstran_pipe_alloc(&p, ws)) != 0) {
nng_stream_close(ws);
nni_aio_finish_error(uaio, rv);
} else {
p->peer = l->peer;

nni_aio_set_output(uaio, 0, p);
nni_aio_finish(uaio, 0, 0);
}
}
goto error;
}
if (!nni_list_empty(&l->aios)) {
nng_stream_listener_accept(l->listener, aaio);

rv = nni_pipe_alloc_listener((void **) &p, l->nlistener);
if (rv != 0) {
nng_stream_free(ws);
goto error;
}
p->peer = l->peer;
p->ws = ws;

nni_list_append(&l->wait_pipes, p);
wstran_listener_match(l);
nng_stream_listener_accept(l->listener, aaio);
nni_mtx_unlock(&l->mtx);
return;

error:

// possibly report this upstream
if ((uaio = nni_list_first(&l->aios)) != NULL) {
nni_aio_list_remove(uaio);
nni_aio_finish_error(uaio, rv);
}
nng_stream_listener_accept(l->listener, aaio);
nni_mtx_unlock(&l->mtx);
}

static int
wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
{
ws_dialer *d;
ws_dialer *d = (void *) dp;
nni_sock *s = nni_dialer_sock(ndialer);
int rv;
char name[64];

if ((d = NNI_ALLOC_STRUCT(d)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&d->mtx);

nni_aio_list_init(&d->aios);
nni_aio_init(&d->connaio, wstran_connect_cb, d);

d->peer = nni_sock_peer_id(s);
d->peer = nni_sock_peer_id(s);
d->ndialer = ndialer;

snprintf(
name, sizeof(name), "%s.sp.nanomsg.org", nni_sock_peer_name(s));
Expand All @@ -526,29 +543,26 @@ wstran_dialer_init(void **dp, nng_url *url, nni_dialer *ndialer)
d->dialer, NNI_OPT_WS_MSGMODE, true)) != 0) ||
((rv = nng_stream_dialer_set_string(
d->dialer, NNG_OPT_WS_PROTOCOL, name)) != 0)) {
wstran_dialer_fini(d);
return (rv);
}

*dp = d;
return (0);
}

static int
wstran_listener_init(void **lp, nng_url *url, nni_listener *listener)
{
ws_listener *l;
ws_listener *l = (void *) lp;
int rv;
nni_sock *s = nni_listener_sock(listener);
char name[64];

if ((l = NNI_ALLOC_STRUCT(l)) == NULL) {
return (NNG_ENOMEM);
}
l->nlistener = listener;
nni_mtx_init(&l->mtx);

nni_aio_list_init(&l->aios);
nni_aio_init(&l->accaio, wstran_accept_cb, l);
NNI_LIST_INIT(&l->wait_pipes, ws_pipe, node);

l->peer = nni_sock_peer_id(s);

Expand All @@ -560,10 +574,8 @@ wstran_listener_init(void **lp, nng_url *url, nni_listener *listener)
l->listener, NNI_OPT_WS_MSGMODE, true)) != 0) ||
((rv = nng_stream_listener_set_string(
l->listener, NNG_OPT_WS_PROTOCOL, name)) != 0)) {
wstran_listener_fini(l);
return (rv);
}
*lp = l;
return (0);
}

Expand Down Expand Up @@ -669,6 +681,7 @@ wstran_listener_set_tls(void *arg, nng_tls_config *tls)
}

static nni_sp_dialer_ops ws_dialer_ops = {
.d_size = sizeof(ws_dialer),
.d_init = wstran_dialer_init,
.d_fini = wstran_dialer_fini,
.d_connect = wstran_dialer_connect,
Expand All @@ -681,6 +694,7 @@ static nni_sp_dialer_ops ws_dialer_ops = {
};

static nni_sp_listener_ops ws_listener_ops = {
.l_size = sizeof(ws_listener),
.l_init = wstran_listener_init,
.l_fini = wstran_listener_fini,
.l_bind = wstran_listener_bind,
Expand Down

0 comments on commit a43eb95

Please sign in to comment.