Skip to content

Commit

Permalink
tls transport: inline aios
Browse files Browse the repository at this point in the history
  • Loading branch information
gdamore committed Dec 16, 2024
1 parent 0de15f0 commit 23da22c
Showing 1 changed file with 49 additions and 53 deletions.
102 changes: 49 additions & 53 deletions src/sp/transport/tls/tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ struct tlstran_pipe {
size_t gotrxhead;
size_t wanttxhead;
size_t wantrxhead;
nni_aio *txaio;
nni_aio *rxaio;
nni_aio *negoaio;
nni_aio txaio;
nni_aio rxaio;
nni_aio negoaio;
nni_msg *rxmsg;
nni_mtx mtx;
};
Expand All @@ -67,8 +67,8 @@ struct tlstran_ep {
nng_stream_dialer *dialer;
nng_stream_listener *listener;
nni_aio *useraio;
nni_aio *connaio;
nni_aio *timeaio;
nni_aio connaio;
nni_aio timeaio;
nni_list busypipes; // busy pipes -- ones passed to socket
nni_list waitpipes; // pipes waiting to match to socket
nni_list negopipes; // pipes busy negotiating
Expand Down Expand Up @@ -110,9 +110,9 @@ tlstran_pipe_close(void *arg)
{
tlstran_pipe *p = arg;

nni_aio_close(p->rxaio);
nni_aio_close(p->txaio);
nni_aio_close(p->negoaio);
nni_aio_close(&p->rxaio);
nni_aio_close(&p->txaio);
nni_aio_close(&p->negoaio);

nng_stream_close(p->tls);
}
Expand All @@ -122,9 +122,9 @@ tlstran_pipe_stop(void *arg)
{
tlstran_pipe *p = arg;

nni_aio_stop(p->rxaio);
nni_aio_stop(p->txaio);
nni_aio_stop(p->negoaio);
nni_aio_stop(&p->rxaio);
nni_aio_stop(&p->txaio);
nni_aio_stop(&p->negoaio);
}

static int
Expand Down Expand Up @@ -152,9 +152,9 @@ tlstran_pipe_fini(void *arg)
nni_mtx_unlock(&ep->mtx);
}
nng_stream_free(p->tls);
nni_aio_free(p->rxaio);
nni_aio_free(p->txaio);
nni_aio_free(p->negoaio);
nni_aio_fini(&p->rxaio);
nni_aio_fini(&p->txaio);
nni_aio_fini(&p->negoaio);
nni_msg_free(p->rxmsg);
NNI_FREE_STRUCT(p);
}
Expand All @@ -163,20 +163,15 @@ static int
tlstran_pipe_alloc(tlstran_pipe **pipep)
{
tlstran_pipe *p;
int rv;

if ((p = NNI_ALLOC_STRUCT(p)) == NULL) {
return (NNG_ENOMEM);
}
nni_mtx_init(&p->mtx);

if (((rv = nni_aio_alloc(&p->txaio, tlstran_pipe_send_cb, p)) != 0) ||
((rv = nni_aio_alloc(&p->rxaio, tlstran_pipe_recv_cb, p)) != 0) ||
((rv = nni_aio_alloc(&p->negoaio, tlstran_pipe_nego_cb, p)) !=
0)) {
tlstran_pipe_fini(p);
return (rv);
}
nni_aio_init(&p->txaio, tlstran_pipe_send_cb, p);
nni_aio_init(&p->rxaio, tlstran_pipe_recv_cb, p);
nni_aio_init(&p->negoaio, tlstran_pipe_nego_cb, p);
nni_aio_list_init(&p->recvq);
nni_aio_list_init(&p->sendq);
nni_atomic_flag_reset(&p->reaped);
Expand Down Expand Up @@ -219,7 +214,7 @@ tlstran_pipe_nego_cb(void *arg)
{
tlstran_pipe *p = arg;
tlstran_ep *ep = p->ep;
nni_aio *aio = p->negoaio;
nni_aio *aio = &p->negoaio;
nni_aio *uaio;
int rv;

Expand Down Expand Up @@ -302,7 +297,7 @@ tlstran_pipe_send_cb(void *arg)
nni_aio *aio;
size_t n;
nni_msg *msg;
nni_aio *txaio = p->txaio;
nni_aio *txaio = &p->txaio;

nni_mtx_lock(&p->mtx);
aio = nni_list_first(&p->sendq);
Expand Down Expand Up @@ -347,12 +342,12 @@ tlstran_pipe_recv_cb(void *arg)
int rv;
size_t n;
nni_msg *msg;
nni_aio *rxaio = p->rxaio;
nni_aio *rxaio = &p->rxaio;

nni_mtx_lock(&p->mtx);
aio = nni_list_first(&p->recvq);

if ((rv = nni_aio_result(p->rxaio)) != 0) {
if ((rv = nni_aio_result(rxaio)) != 0) {
goto recv_error;
}

Expand Down Expand Up @@ -453,7 +448,7 @@ tlstran_pipe_send_cancel(nni_aio *aio, void *arg, int rv)
// The callback on the txaio will cause the user aio to
// be canceled too.
if (nni_list_first(&p->sendq) == aio) {
nni_aio_abort(p->txaio, rv);
nni_aio_abort(&p->txaio, rv);
nni_mtx_unlock(&p->mtx);
return;
}
Expand Down Expand Up @@ -482,7 +477,7 @@ tlstran_pipe_send_start(tlstran_pipe *p)

NNI_PUT64(p->txlen, len);

txaio = p->txaio;
txaio = &p->txaio;
niov = 0;
iov[niov].iov_buf = p->txlen;
iov[niov].iov_len = sizeof(p->txlen);
Expand Down Expand Up @@ -542,7 +537,7 @@ tlstran_pipe_recv_cancel(nni_aio *aio, void *arg, int rv)
// The callback on the rxaio will cause the user aio to
// be canceled too.
if (nni_list_first(&p->recvq) == aio) {
nni_aio_abort(p->rxaio, rv);
nni_aio_abort(&p->rxaio, rv);
nni_mtx_unlock(&p->mtx);
return;
}
Expand All @@ -559,7 +554,7 @@ tlstran_pipe_recv_start(tlstran_pipe *p)
NNI_ASSERT(p->rxmsg == NULL);

// Schedule a read of the IPC header.
aio = p->rxaio;
aio = &p->rxaio;
iov.iov_buf = p->rxlen;
iov.iov_len = sizeof(p->rxlen);
nni_aio_set_iov(aio, 1, &iov);
Expand Down Expand Up @@ -622,11 +617,11 @@ tlstran_pipe_start(tlstran_pipe *p, nng_stream *conn, tlstran_ep *ep)
p->wanttxhead = 8;
iov.iov_len = 8;
iov.iov_buf = &p->txlen[0];
nni_aio_set_iov(p->negoaio, 1, &iov);
nni_aio_set_iov(&p->negoaio, 1, &iov);
nni_list_append(&ep->negopipes, p);

nni_aio_set_timeout(p->negoaio, 10000); // 10 sec timeout to negotiate
nng_stream_send(p->tls, p->negoaio);
nni_aio_set_timeout(&p->negoaio, 10000); // 10 sec timeout to negotiate
nng_stream_send(p->tls, &p->negoaio);
}

static void
Expand All @@ -643,8 +638,8 @@ tlstran_ep_fini(void *arg)
nni_mtx_unlock(&ep->mtx);
nng_stream_dialer_free(ep->dialer);
nng_stream_listener_free(ep->listener);
nni_aio_free(ep->timeaio);
nni_aio_free(ep->connaio);
nni_aio_fini(&ep->timeaio);
nni_aio_fini(&ep->connaio);

nni_mtx_fini(&ep->mtx);
NNI_FREE_STRUCT(ep);
Expand All @@ -655,8 +650,8 @@ tlstran_ep_stop(void *arg)
{
tlstran_ep *ep = arg;

nni_aio_stop(ep->timeaio);
nni_aio_stop(ep->connaio);
nni_aio_stop(&ep->timeaio);
nni_aio_stop(&ep->connaio);
nng_stream_dialer_stop(ep->dialer);
nng_stream_listener_stop(ep->listener);
}
Expand All @@ -669,7 +664,7 @@ tlstran_ep_close(void *arg)

nni_mtx_lock(&ep->mtx);
ep->closed = true;
nni_aio_close(ep->timeaio);
nni_aio_close(&ep->timeaio);

if (ep->dialer != NULL) {
nng_stream_dialer_close(ep->dialer);
Expand Down Expand Up @@ -697,16 +692,16 @@ static void
tlstran_timer_cb(void *arg)
{
tlstran_ep *ep = arg;
if (nni_aio_result(ep->timeaio) == 0) {
nng_stream_listener_accept(ep->listener, ep->connaio);
if (nni_aio_result(&ep->timeaio) == 0) {
nng_stream_listener_accept(ep->listener, &ep->connaio);
}
}

static void
tlstran_accept_cb(void *arg)
{
tlstran_ep *ep = arg;
nni_aio *aio = ep->connaio;
nni_aio *aio = &ep->connaio;
tlstran_pipe *p;
int rv;
nng_stream *conn;
Expand All @@ -730,7 +725,7 @@ tlstran_accept_cb(void *arg)
goto error;
}
tlstran_pipe_start(p, conn, ep);
nng_stream_listener_accept(ep->listener, ep->connaio);
nng_stream_listener_accept(ep->listener, aio);
nni_mtx_unlock(&ep->mtx);
return;

Expand All @@ -746,15 +741,15 @@ tlstran_accept_cb(void *arg)
case NNG_ENOMEM:
case NNG_ENOFILES:
// We need to cool down here, to avoid spinning.
nng_sleep_aio(10, ep->timeaio);
nng_sleep_aio(10, &ep->timeaio);
break;

default:
// Start another accept. This is done because we want to
// ensure that TLS negotiations are disconnected from
// the upper layer accept logic.
if (!ep->closed) {
nng_stream_listener_accept(ep->listener, ep->connaio);
nng_stream_listener_accept(ep->listener, &ep->connaio);
}
break;
}
Expand All @@ -765,7 +760,7 @@ static void
tlstran_dial_cb(void *arg)
{
tlstran_ep *ep = arg;
nni_aio *aio = ep->connaio;
nni_aio *aio = &ep->connaio;
tlstran_pipe *p;
int rv;
nng_stream *conn;
Expand Down Expand Up @@ -850,13 +845,13 @@ tlstran_ep_init_dialer(void **dp, nng_url *url, nni_dialer *ndialer)
return (NNG_EADDRINVAL);
}

if (((rv = tlstran_ep_init(&ep, url, sock)) != 0) ||
((rv = nni_aio_alloc(&ep->connaio, tlstran_dial_cb, ep)) != 0)) {
if ((rv = tlstran_ep_init(&ep, url, sock)) != 0) {
return (rv);
}

if ((rv != 0) ||
((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0)) {
nni_aio_init(&ep->connaio, tlstran_dial_cb, ep);

if ((rv = nng_stream_dialer_alloc_url(&ep->dialer, url)) != 0) {
tlstran_ep_fini(ep);
return (rv);
}
Expand All @@ -883,12 +878,13 @@ tlstran_ep_init_listener(void **lp, nng_url *url, nni_listener *nlistener)
return (NNG_EADDRINVAL);
}
if (((rv = tlstran_ep_init(&ep, url, sock)) != 0) ||
((rv = nni_aio_alloc(&ep->connaio, tlstran_accept_cb, ep)) != 0) ||
((rv = nni_aio_alloc(&ep->timeaio, tlstran_timer_cb, ep)) != 0) ||
((rv = nng_stream_listener_alloc_url(&ep->listener, url)) != 0)) {
tlstran_ep_fini(ep);
return (rv);
}
nni_aio_init(&ep->connaio, tlstran_accept_cb, ep);
nni_aio_init(&ep->timeaio, tlstran_timer_cb, ep);

#ifdef NNG_ENABLE_STATS
nni_listener_add_stat(nlistener, &ep->st_rcv_max);
#endif
Expand Down Expand Up @@ -936,7 +932,7 @@ tlstran_ep_connect(void *arg, nni_aio *aio)
}
ep->useraio = aio;

nng_stream_dialer_dial(ep->dialer, ep->connaio);
nng_stream_dialer_dial(ep->dialer, &ep->connaio);
nni_mtx_unlock(&ep->mtx);
}

Expand Down Expand Up @@ -987,7 +983,7 @@ tlstran_ep_accept(void *arg, nni_aio *aio)
ep->useraio = aio;
if (!ep->started) {
ep->started = true;
nng_stream_listener_accept(ep->listener, ep->connaio);
nng_stream_listener_accept(ep->listener, &ep->connaio);
} else {
tlstran_ep_match(ep);
}
Expand Down

0 comments on commit 23da22c

Please sign in to comment.