Skip to content

Commit

Permalink
aio: introduce nni_aio_defer
Browse files Browse the repository at this point in the history
This will replace nni_aio_schedule, and it includes finishing the
task if needed.  It does so without dropping the lock and so is
more efficient and race free.

This includes some conversion of some subsystems to it.
  • Loading branch information
gdamore committed Dec 16, 2024
1 parent 7c51e7b commit 779c343
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 53 deletions.
5 changes: 4 additions & 1 deletion include/nng/nng.h
Original file line number Diff line number Diff line change
Expand Up @@ -640,8 +640,11 @@ NNG_DECL void nng_aio_finish(nng_aio *, int);
// final argument is passed to the cancelfn. The final argument of the
// cancellation function is the error number (will not be zero) corresponding
// to the reason for cancellation, e.g. NNG_ETIMEDOUT or NNG_ECANCELED.
// This returns false if the operation cannot be deferred (because the AIO
// has been stopped with nng_aio_stop.) If it does so, then the aio's
// completion callback will fire with a result of NNG_ECLOSED.
typedef void (*nng_aio_cancelfn)(nng_aio *, void *, int);
NNG_DECL void nng_aio_defer(nng_aio *, nng_aio_cancelfn, void *);
NNG_DECL bool nng_aio_defer(nng_aio *, nng_aio_cancelfn, void *);

// nng_aio_sleep does a "sleeping" operation, basically does nothing
// but wait for the specified number of milliseconds to expire, then
Expand Down
70 changes: 64 additions & 6 deletions src/core/aio.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// found online at https://opensource.org/licenses/MIT.
//

#include "core/aio.h"
#include "core/nng_impl.h"
#include "core/taskq.h"
#include <string.h>
Expand Down Expand Up @@ -347,14 +348,14 @@ nni_aio_begin(nni_aio *aio)
aio->a_count = 0;
aio->a_cancel_fn = NULL;
aio->a_abort = false;
aio->a_expire_ok = false;
aio->a_sleep = false;

// We should not reschedule anything at this point.
if (aio->a_stop || eq->eq_stop) {
aio->a_result = NNG_ECANCELED;
aio->a_cancel_fn = NULL;
aio->a_expire = NNI_TIME_NEVER;
aio->a_sleep = false;
aio->a_expire_ok = false;
nni_mtx_unlock(&eq->eq_mtx);

return (NNG_ECANCELED);
Expand Down Expand Up @@ -408,6 +409,65 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
return (0);
}

bool
nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data)
{
nni_aio_expire_q *eq = aio->a_expire_q;
bool timeout = false;

if (!aio->a_sleep && !aio->a_use_expire) {
// Convert the relative timeout to an absolute timeout.
switch (aio->a_timeout) {
case NNG_DURATION_ZERO:
timeout = true;
break;

Check warning on line 423 in src/core/aio.c

View check run for this annotation

Codecov / codecov/patch

src/core/aio.c#L421-L423

Added lines #L421 - L423 were not covered by tests
case NNG_DURATION_INFINITE:
case NNG_DURATION_DEFAULT:
aio->a_expire = NNI_TIME_NEVER;
break;
default:
aio->a_expire = nni_clock() + aio->a_timeout;
break;
}
} else if (aio->a_use_expire && aio->a_expire <= nni_clock()) {
timeout = true;
}

Check warning on line 434 in src/core/aio.c

View check run for this annotation

Codecov / codecov/patch

src/core/aio.c#L433-L434

Added lines #L433 - L434 were not covered by tests

nni_mtx_lock(&eq->eq_mtx);
if (timeout) {
aio->a_sleep = false;
aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);

Check warning on line 442 in src/core/aio.c

View check run for this annotation

Codecov / codecov/patch

src/core/aio.c#L438-L442

Added lines #L438 - L442 were not covered by tests
}
if (aio->a_abort) {
aio->a_sleep = false;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);

Check warning on line 448 in src/core/aio.c

View check run for this annotation

Codecov / codecov/patch

src/core/aio.c#L445-L448

Added lines #L445 - L448 were not covered by tests
}
if (aio->a_stop || eq->eq_stop) {
aio->a_sleep = false;
aio->a_result = NNG_ECLOSED;
nni_mtx_unlock(&eq->eq_mtx);
nni_task_dispatch(&aio->a_task);
return (false);
}

NNI_ASSERT(aio->a_cancel_fn == NULL);
aio->a_cancel_fn = cancel;
aio->a_cancel_arg = data;

// We only schedule expiration if we have a way for the expiration
// handler to actively cancel it.
if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) {
nni_aio_expire_add(aio);
}
nni_mtx_unlock(&eq->eq_mtx);
return (true);
}

// nni_aio_abort is called by a consumer which guarantees that the aio
// is still valid.
void
Expand Down Expand Up @@ -766,7 +826,6 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv)
void
nni_sleep_aio(nng_duration ms, nng_aio *aio)
{
int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
Expand All @@ -788,9 +847,8 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio)
aio->a_expire =
ms == NNG_DURATION_INFINITE ? NNI_TIME_NEVER : nni_clock() + ms;

if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) {
nni_aio_finish_error(aio, rv);
}
// we don't do anything else here, so we can ignore the return
(void) nni_aio_defer(aio, nni_sleep_cancel, NULL);
}

static bool
Expand Down
9 changes: 9 additions & 0 deletions src/core/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,17 @@ extern void nni_aio_bump_count(nni_aio *, size_t);
// is returned. (In that case the caller should probably either return an
// error to its caller, or possibly cause an asynchronous error by calling
// nni_aio_finish_error on this aio.)
//
// NB: This function should be called while holding the lock that will be used
// to cancel the operation. Otherwise a race can occur where the operation
// cannot be canceled, which can lead to apparent hangs.
extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *);

// nni_aio_defer is just like nni_io_schedule, but it also calls the callback
// automatically if the operation cannot be started because the AIO is stopped
// or was canceled before this call (but after nni_aio_begin).
extern bool nni_aio_defer(nni_aio *, nni_aio_cancel_fn, void *);

extern void nni_sleep_aio(nni_duration, nni_aio *);

// nni_aio_completion_list is used after removing the aio from an
Expand Down
3 changes: 1 addition & 2 deletions src/core/device.c
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,8 @@ nni_device(nni_aio *aio, nni_sock *s1, nni_sock *s2)
nni_aio_finish_error(aio, rv);
return;
}
if ((rv = nni_aio_schedule(aio, device_cancel, d)) != 0) {
if (!nni_aio_defer(aio, device_cancel, d)) {
nni_mtx_unlock(&device_mtx);
nni_aio_finish_error(aio, rv);
nni_reap(&device_reap, d);
}
device_start(d, aio);
Expand Down
5 changes: 1 addition & 4 deletions src/core/sockfd.c
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ static void
sfd_listener_accept(void *arg, nng_aio *aio)
{
sfd_listener *l = arg;
int rv;

if (nni_aio_begin(aio) != 0) {
return;
Expand All @@ -129,9 +128,7 @@ sfd_listener_accept(void *arg, nng_aio *aio)

if (l->listen_cnt) {
sfd_start_conn(l, aio);
} else if ((rv = nni_aio_schedule(aio, sfd_cancel_accept, l)) != 0) {
nni_aio_finish_error(aio, rv);
} else {
} else if (nni_aio_defer(aio, sfd_cancel_accept, l)) {
nni_aio_list_append(&l->accept_q, aio);
}
nni_mtx_unlock(&l->mtx);
Expand Down
4 changes: 1 addition & 3 deletions src/core/tcp.c
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ static void
tcp_dialer_dial(void *arg, nng_aio *aio)
{
tcp_dialer *d = arg;
int rv;
if (nni_aio_begin(aio) != 0) {
return;
}
Expand All @@ -187,9 +186,8 @@ tcp_dialer_dial(void *arg, nng_aio *aio)
nni_aio_finish_error(aio, NNG_ECLOSED);
return;
}
if ((rv = nni_aio_schedule(aio, tcp_dial_cancel, d)) != 0) {
if (!nni_aio_defer(aio, tcp_dial_cancel, d)) {
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, rv);
return;
}
nni_list_append(&d->conaios, aio);
Expand Down
4 changes: 2 additions & 2 deletions src/nng.c
Original file line number Diff line number Diff line change
Expand Up @@ -2062,10 +2062,10 @@ nng_aio_finish(nng_aio *aio, int rv)
nni_aio_finish(aio, rv, nni_aio_count(aio));
}

void
bool
nng_aio_defer(nng_aio *aio, nng_aio_cancelfn fn, void *arg)
{
nni_aio_schedule(aio, fn, arg);
return (nni_aio_defer(aio, fn, arg));
}

bool
Expand Down
4 changes: 1 addition & 3 deletions src/platform/posix/posix_ipcconn.c
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,14 @@ static void
ipc_send(void *arg, nni_aio *aio)
{
ipc_conn *c = arg;
int rv;

if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&c->mtx);

if ((rv = nni_aio_schedule(aio, ipc_cancel, c)) != 0) {
if (!nni_aio_defer(aio, ipc_cancel, c)) {
nni_mtx_unlock(&c->mtx);
nni_aio_finish_error(aio, rv);
return;
}
nni_aio_list_append(&c->writeq, aio);
Expand Down
16 changes: 4 additions & 12 deletions src/sp/transport/inproc/inproc.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ inproc_pipe_send(void *arg, nni_aio *aio)
{
inproc_pipe *pipe = arg;
inproc_queue *queue = pipe->send_queue;
int rv;

if (nni_aio_begin(aio) != 0) {
// No way to give the message back to the protocol, so
Expand All @@ -204,9 +203,8 @@ inproc_pipe_send(void *arg, nni_aio *aio)
}

nni_mtx_lock(&queue->lock);
if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) {
if (!nni_aio_defer(aio, inproc_queue_cancel, queue)) {
nni_mtx_unlock(&queue->lock);
nni_aio_finish_error(aio, rv);
return;
}
nni_aio_list_append(&queue->writers, aio);
Expand All @@ -219,16 +217,14 @@ inproc_pipe_recv(void *arg, nni_aio *aio)
{
inproc_pipe *pipe = arg;
inproc_queue *queue = pipe->recv_queue;
int rv;

if (nni_aio_begin(aio) != 0) {
return;
}

nni_mtx_lock(&queue->lock);
if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) {
if (!nni_aio_defer(aio, inproc_queue_cancel, queue)) {
nni_mtx_unlock(&queue->lock);
nni_aio_finish_error(aio, rv);
return;
}
nni_aio_list_append(&queue->readers, aio);
Expand Down Expand Up @@ -463,7 +459,6 @@ inproc_ep_connect(void *arg, nni_aio *aio)
{
inproc_ep *ep = arg;
inproc_ep *server;
int rv;

if (nni_aio_begin(aio) != 0) {
return;
Expand All @@ -486,9 +481,8 @@ inproc_ep_connect(void *arg, nni_aio *aio)
// We don't have to worry about the case where a zero timeout
// on connect was specified, as there is no option to specify
// that in the upper API.
if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) {
if (!nni_aio_defer(aio, inproc_ep_cancel, ep)) {
nni_mtx_unlock(&nni_inproc.mx);
nni_aio_finish_error(aio, rv);
return;
}

Expand Down Expand Up @@ -523,7 +517,6 @@ static void
inproc_ep_accept(void *arg, nni_aio *aio)
{
inproc_ep *ep = arg;
int rv;

if (nni_aio_begin(aio) != 0) {
return;
Expand All @@ -533,9 +526,8 @@ inproc_ep_accept(void *arg, nni_aio *aio)

// We need not worry about the case where a non-blocking
// accept was tried -- there is no API to do such a thing.
if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) {
if (!nni_aio_defer(aio, inproc_ep_cancel, ep)) {
nni_mtx_unlock(&nni_inproc.mx);
nni_aio_finish_error(aio, rv);
return;
}

Expand Down
16 changes: 4 additions & 12 deletions src/sp/transport/ws/websocket.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,13 @@ static void
wstran_pipe_recv(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
int rv;

if (nni_aio_begin(aio) != 0) {
return;
}
nni_mtx_lock(&p->mtx);
if ((rv = nni_aio_schedule(aio, wstran_pipe_recv_cancel, p)) != 0) {
if (!nni_aio_defer(aio, wstran_pipe_recv_cancel, p)) {
nni_mtx_unlock(&p->mtx);
nni_aio_finish_error(aio, rv);
return;
}
p->user_rxaio = aio;
Expand All @@ -161,7 +159,6 @@ static void
wstran_pipe_send(void *arg, nni_aio *aio)
{
ws_pipe *p = arg;
int rv;

if (nni_aio_begin(aio) != 0) {
// No way to give the message back to the protocol, so
Expand All @@ -171,9 +168,8 @@ wstran_pipe_send(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&p->mtx);
if ((rv = nni_aio_schedule(aio, wstran_pipe_send_cancel, p)) != 0) {
if (!nni_aio_defer(aio, wstran_pipe_send_cancel, p)) {
nni_mtx_unlock(&p->mtx);
nni_aio_finish_error(aio, rv);
return;
}
p->user_txaio = aio;
Expand Down Expand Up @@ -271,7 +267,6 @@ static void
wstran_listener_accept(void *arg, nni_aio *aio)
{
ws_listener *l = arg;
int rv;

// We already bound, so we just need to look for an available
// pipe (created by the handler), and match it.
Expand All @@ -280,9 +275,8 @@ wstran_listener_accept(void *arg, nni_aio *aio)
return;
}
nni_mtx_lock(&l->mtx);
if ((rv = nni_aio_schedule(aio, wstran_listener_cancel, l)) != 0) {
if (!nni_aio_defer(aio, wstran_listener_cancel, l)) {
nni_mtx_unlock(&l->mtx);
nni_aio_finish_error(aio, rv);
return;
}
nni_list_append(&l->aios, aio);
Expand Down Expand Up @@ -311,16 +305,14 @@ static void
wstran_dialer_connect(void *arg, nni_aio *aio)
{
ws_dialer *d = arg;
int rv;

if (nni_aio_begin(aio) != 0) {
return;
}

nni_mtx_lock(&d->mtx);
if ((rv = nni_aio_schedule(aio, wstran_dialer_cancel, d)) != 0) {
if (!nni_aio_defer(aio, wstran_dialer_cancel, d)) {
nni_mtx_unlock(&d->mtx);
nni_aio_finish_error(aio, rv);
return;
}
NNI_ASSERT(nni_list_empty(&d->aios));
Expand Down
Loading

0 comments on commit 779c343

Please sign in to comment.