diff --git a/include/nng/nng.h b/include/nng/nng.h index d087805d7..4d21287c5 100644 --- a/include/nng/nng.h +++ b/include/nng/nng.h @@ -640,8 +640,11 @@ NNG_DECL void nng_aio_finish(nng_aio *, int); // final argument is passed to the cancelfn. The final argument of the // cancellation function is the error number (will not be zero) corresponding // to the reason for cancellation, e.g. NNG_ETIMEDOUT or NNG_ECANCELED. +// This returns false if the operation cannot be deferred (because the AIO +// has been stopped with nng_aio_stop.) If it does so, then the aio's +// completion callback will fire with a result of NNG_ECLOSED. typedef void (*nng_aio_cancelfn)(nng_aio *, void *, int); -NNG_DECL void nng_aio_defer(nng_aio *, nng_aio_cancelfn, void *); +NNG_DECL bool nng_aio_defer(nng_aio *, nng_aio_cancelfn, void *); // nng_aio_sleep does a "sleeping" operation, basically does nothing // but wait for the specified number of milliseconds to expire, then diff --git a/src/core/aio.c b/src/core/aio.c index eaff5c805..0ac70a0d2 100644 --- a/src/core/aio.c +++ b/src/core/aio.c @@ -8,6 +8,7 @@ // found online at https://opensource.org/licenses/MIT. // +#include "core/aio.h" #include "core/nng_impl.h" #include "core/taskq.h" #include @@ -347,14 +348,14 @@ nni_aio_begin(nni_aio *aio) aio->a_count = 0; aio->a_cancel_fn = NULL; aio->a_abort = false; + aio->a_expire_ok = false; + aio->a_sleep = false; // We should not reschedule anything at this point. if (aio->a_stop || eq->eq_stop) { aio->a_result = NNG_ECANCELED; aio->a_cancel_fn = NULL; aio->a_expire = NNI_TIME_NEVER; - aio->a_sleep = false; - aio->a_expire_ok = false; nni_mtx_unlock(&eq->eq_mtx); return (NNG_ECANCELED); @@ -408,6 +409,65 @@ nni_aio_schedule(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) return (0); } +bool +nni_aio_defer(nni_aio *aio, nni_aio_cancel_fn cancel, void *data) +{ + nni_aio_expire_q *eq = aio->a_expire_q; + bool timeout = false; + + if (!aio->a_sleep && !aio->a_use_expire) { + // Convert the relative timeout to an absolute timeout. + switch (aio->a_timeout) { + case NNG_DURATION_ZERO: + timeout = true; + break; + case NNG_DURATION_INFINITE: + case NNG_DURATION_DEFAULT: + aio->a_expire = NNI_TIME_NEVER; + break; + default: + aio->a_expire = nni_clock() + aio->a_timeout; + break; + } + } else if (aio->a_use_expire && aio->a_expire <= nni_clock()) { + timeout = true; + } + + nni_mtx_lock(&eq->eq_mtx); + if (timeout) { + aio->a_sleep = false; + aio->a_result = aio->a_expire_ok ? 0 : NNG_ETIMEDOUT; + nni_mtx_unlock(&eq->eq_mtx); + nni_task_dispatch(&aio->a_task); + return (false); + } + if (aio->a_abort) { + aio->a_sleep = false; + nni_mtx_unlock(&eq->eq_mtx); + nni_task_dispatch(&aio->a_task); + return (false); + } + if (aio->a_stop || eq->eq_stop) { + aio->a_sleep = false; + aio->a_result = NNG_ECLOSED; + nni_mtx_unlock(&eq->eq_mtx); + nni_task_dispatch(&aio->a_task); + return (false); + } + + NNI_ASSERT(aio->a_cancel_fn == NULL); + aio->a_cancel_fn = cancel; + aio->a_cancel_arg = data; + + // We only schedule expiration if we have a way for the expiration + // handler to actively cancel it. + if ((aio->a_expire != NNI_TIME_NEVER) && (cancel != NULL)) { + nni_aio_expire_add(aio); + } + nni_mtx_unlock(&eq->eq_mtx); + return (true); +} + // nni_aio_abort is called by a consumer which guarantees that the aio // is still valid. void @@ -766,7 +826,6 @@ nni_sleep_cancel(nng_aio *aio, void *arg, int rv) void nni_sleep_aio(nng_duration ms, nng_aio *aio) { - int rv; if (nni_aio_begin(aio) != 0) { return; } @@ -788,9 +847,8 @@ nni_sleep_aio(nng_duration ms, nng_aio *aio) aio->a_expire = ms == NNG_DURATION_INFINITE ? NNI_TIME_NEVER : nni_clock() + ms; - if ((rv = nni_aio_schedule(aio, nni_sleep_cancel, NULL)) != 0) { - nni_aio_finish_error(aio, rv); - } + // we don't do anything else here, so we can ignore the return + (void) nni_aio_defer(aio, nni_sleep_cancel, NULL); } static bool diff --git a/src/core/aio.h b/src/core/aio.h index 9491a2fa9..8628d8ef4 100644 --- a/src/core/aio.h +++ b/src/core/aio.h @@ -164,8 +164,17 @@ extern void nni_aio_bump_count(nni_aio *, size_t); // is returned. (In that case the caller should probably either return an // error to its caller, or possibly cause an asynchronous error by calling // nni_aio_finish_error on this aio.) +// +// NB: This function should be called while holding the lock that will be used +// to cancel the operation. Otherwise a race can occur where the operation +// cannot be canceled, which can lead to apparent hangs. extern int nni_aio_schedule(nni_aio *, nni_aio_cancel_fn, void *); +// nni_aio_defer is just like nni_io_schedule, but it also calls the callback +// automatically if the operation cannot be started because the AIO is stopped +// or was canceled before this call (but after nni_aio_begin). +extern bool nni_aio_defer(nni_aio *, nni_aio_cancel_fn, void *); + extern void nni_sleep_aio(nni_duration, nni_aio *); // nni_aio_completion_list is used after removing the aio from an diff --git a/src/core/device.c b/src/core/device.c index 815fafccc..7084d3e41 100644 --- a/src/core/device.c +++ b/src/core/device.c @@ -228,9 +228,8 @@ nni_device(nni_aio *aio, nni_sock *s1, nni_sock *s2) nni_aio_finish_error(aio, rv); return; } - if ((rv = nni_aio_schedule(aio, device_cancel, d)) != 0) { + if (!nni_aio_defer(aio, device_cancel, d)) { nni_mtx_unlock(&device_mtx); - nni_aio_finish_error(aio, rv); nni_reap(&device_reap, d); } device_start(d, aio); diff --git a/src/core/sockfd.c b/src/core/sockfd.c index 787a0783d..1a0e792ff 100644 --- a/src/core/sockfd.c +++ b/src/core/sockfd.c @@ -115,7 +115,6 @@ static void sfd_listener_accept(void *arg, nng_aio *aio) { sfd_listener *l = arg; - int rv; if (nni_aio_begin(aio) != 0) { return; @@ -129,9 +128,7 @@ sfd_listener_accept(void *arg, nng_aio *aio) if (l->listen_cnt) { sfd_start_conn(l, aio); - } else if ((rv = nni_aio_schedule(aio, sfd_cancel_accept, l)) != 0) { - nni_aio_finish_error(aio, rv); - } else { + } else if (nni_aio_defer(aio, sfd_cancel_accept, l)) { nni_aio_list_append(&l->accept_q, aio); } nni_mtx_unlock(&l->mtx); diff --git a/src/core/tcp.c b/src/core/tcp.c index d2e084930..75b938b0d 100644 --- a/src/core/tcp.c +++ b/src/core/tcp.c @@ -177,7 +177,6 @@ static void tcp_dialer_dial(void *arg, nng_aio *aio) { tcp_dialer *d = arg; - int rv; if (nni_aio_begin(aio) != 0) { return; } @@ -187,9 +186,8 @@ tcp_dialer_dial(void *arg, nng_aio *aio) nni_aio_finish_error(aio, NNG_ECLOSED); return; } - if ((rv = nni_aio_schedule(aio, tcp_dial_cancel, d)) != 0) { + if (!nni_aio_defer(aio, tcp_dial_cancel, d)) { nni_mtx_unlock(&d->mtx); - nni_aio_finish_error(aio, rv); return; } nni_list_append(&d->conaios, aio); diff --git a/src/nng.c b/src/nng.c index 2276f2e1a..cdcdecd9b 100644 --- a/src/nng.c +++ b/src/nng.c @@ -2062,10 +2062,10 @@ nng_aio_finish(nng_aio *aio, int rv) nni_aio_finish(aio, rv, nni_aio_count(aio)); } -void +bool nng_aio_defer(nng_aio *aio, nng_aio_cancelfn fn, void *arg) { - nni_aio_schedule(aio, fn, arg); + return (nni_aio_defer(aio, fn, arg)); } bool diff --git a/src/platform/posix/posix_ipcconn.c b/src/platform/posix/posix_ipcconn.c index 224828ff2..ea4f58280 100644 --- a/src/platform/posix/posix_ipcconn.c +++ b/src/platform/posix/posix_ipcconn.c @@ -246,16 +246,14 @@ static void ipc_send(void *arg, nni_aio *aio) { ipc_conn *c = arg; - int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&c->mtx); - if ((rv = nni_aio_schedule(aio, ipc_cancel, c)) != 0) { + if (!nni_aio_defer(aio, ipc_cancel, c)) { nni_mtx_unlock(&c->mtx); - nni_aio_finish_error(aio, rv); return; } nni_aio_list_append(&c->writeq, aio); diff --git a/src/sp/transport/inproc/inproc.c b/src/sp/transport/inproc/inproc.c index 903174992..fcf755662 100644 --- a/src/sp/transport/inproc/inproc.c +++ b/src/sp/transport/inproc/inproc.c @@ -193,7 +193,6 @@ inproc_pipe_send(void *arg, nni_aio *aio) { inproc_pipe *pipe = arg; inproc_queue *queue = pipe->send_queue; - int rv; if (nni_aio_begin(aio) != 0) { // No way to give the message back to the protocol, so @@ -204,9 +203,8 @@ inproc_pipe_send(void *arg, nni_aio *aio) } nni_mtx_lock(&queue->lock); - if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) { + if (!nni_aio_defer(aio, inproc_queue_cancel, queue)) { nni_mtx_unlock(&queue->lock); - nni_aio_finish_error(aio, rv); return; } nni_aio_list_append(&queue->writers, aio); @@ -219,16 +217,14 @@ inproc_pipe_recv(void *arg, nni_aio *aio) { inproc_pipe *pipe = arg; inproc_queue *queue = pipe->recv_queue; - int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&queue->lock); - if ((rv = nni_aio_schedule(aio, inproc_queue_cancel, queue)) != 0) { + if (!nni_aio_defer(aio, inproc_queue_cancel, queue)) { nni_mtx_unlock(&queue->lock); - nni_aio_finish_error(aio, rv); return; } nni_aio_list_append(&queue->readers, aio); @@ -463,7 +459,6 @@ inproc_ep_connect(void *arg, nni_aio *aio) { inproc_ep *ep = arg; inproc_ep *server; - int rv; if (nni_aio_begin(aio) != 0) { return; @@ -486,9 +481,8 @@ inproc_ep_connect(void *arg, nni_aio *aio) // We don't have to worry about the case where a zero timeout // on connect was specified, as there is no option to specify // that in the upper API. - if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) { + if (!nni_aio_defer(aio, inproc_ep_cancel, ep)) { nni_mtx_unlock(&nni_inproc.mx); - nni_aio_finish_error(aio, rv); return; } @@ -523,7 +517,6 @@ static void inproc_ep_accept(void *arg, nni_aio *aio) { inproc_ep *ep = arg; - int rv; if (nni_aio_begin(aio) != 0) { return; @@ -533,9 +526,8 @@ inproc_ep_accept(void *arg, nni_aio *aio) // We need not worry about the case where a non-blocking // accept was tried -- there is no API to do such a thing. - if ((rv = nni_aio_schedule(aio, inproc_ep_cancel, ep)) != 0) { + if (!nni_aio_defer(aio, inproc_ep_cancel, ep)) { nni_mtx_unlock(&nni_inproc.mx); - nni_aio_finish_error(aio, rv); return; } diff --git a/src/sp/transport/ws/websocket.c b/src/sp/transport/ws/websocket.c index f962adbff..ab139a48d 100644 --- a/src/sp/transport/ws/websocket.c +++ b/src/sp/transport/ws/websocket.c @@ -126,15 +126,13 @@ static void wstran_pipe_recv(void *arg, nni_aio *aio) { ws_pipe *p = arg; - int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, wstran_pipe_recv_cancel, p)) != 0) { + if (!nni_aio_defer(aio, wstran_pipe_recv_cancel, p)) { nni_mtx_unlock(&p->mtx); - nni_aio_finish_error(aio, rv); return; } p->user_rxaio = aio; @@ -161,7 +159,6 @@ static void wstran_pipe_send(void *arg, nni_aio *aio) { ws_pipe *p = arg; - int rv; if (nni_aio_begin(aio) != 0) { // No way to give the message back to the protocol, so @@ -171,9 +168,8 @@ wstran_pipe_send(void *arg, nni_aio *aio) return; } nni_mtx_lock(&p->mtx); - if ((rv = nni_aio_schedule(aio, wstran_pipe_send_cancel, p)) != 0) { + if (!nni_aio_defer(aio, wstran_pipe_send_cancel, p)) { nni_mtx_unlock(&p->mtx); - nni_aio_finish_error(aio, rv); return; } p->user_txaio = aio; @@ -271,7 +267,6 @@ static void wstran_listener_accept(void *arg, nni_aio *aio) { ws_listener *l = arg; - int rv; // We already bound, so we just need to look for an available // pipe (created by the handler), and match it. @@ -280,9 +275,8 @@ wstran_listener_accept(void *arg, nni_aio *aio) return; } nni_mtx_lock(&l->mtx); - if ((rv = nni_aio_schedule(aio, wstran_listener_cancel, l)) != 0) { + if (!nni_aio_defer(aio, wstran_listener_cancel, l)) { nni_mtx_unlock(&l->mtx); - nni_aio_finish_error(aio, rv); return; } nni_list_append(&l->aios, aio); @@ -311,16 +305,14 @@ static void wstran_dialer_connect(void *arg, nni_aio *aio) { ws_dialer *d = arg; - int rv; if (nni_aio_begin(aio) != 0) { return; } nni_mtx_lock(&d->mtx); - if ((rv = nni_aio_schedule(aio, wstran_dialer_cancel, d)) != 0) { + if (!nni_aio_defer(aio, wstran_dialer_cancel, d)) { nni_mtx_unlock(&d->mtx); - nni_aio_finish_error(aio, rv); return; } NNI_ASSERT(nni_list_empty(&d->aios)); diff --git a/src/supplemental/http/http_conn.c b/src/supplemental/http/http_conn.c index 33ed70bb6..9ff2c9978 100644 --- a/src/supplemental/http/http_conn.c +++ b/src/supplemental/http/http_conn.c @@ -363,8 +363,6 @@ http_rd_cancel(nni_aio *aio, void *arg, int rv) static void http_rd_submit(nni_http_conn *conn, nni_aio *aio, enum read_flavor flavor) { - int rv; - if (nni_aio_begin(aio) != 0) { return; } @@ -372,8 +370,7 @@ http_rd_submit(nni_http_conn *conn, nni_aio *aio, enum read_flavor flavor) nni_aio_finish_error(aio, NNG_ECLOSED); return; } - if ((rv = nni_aio_schedule(aio, http_rd_cancel, conn)) != 0) { - nni_aio_finish_error(aio, rv); + if (!nni_aio_defer(aio, http_rd_cancel, conn)) { return; } conn->rd_flavor = flavor; @@ -483,8 +480,6 @@ http_wr_cancel(nni_aio *aio, void *arg, int rv) static void http_wr_submit(nni_http_conn *conn, nni_aio *aio, enum write_flavor flavor) { - int rv; - if (nni_aio_begin(aio) != 0) { return; } @@ -492,8 +487,7 @@ http_wr_submit(nni_http_conn *conn, nni_aio *aio, enum write_flavor flavor) nni_aio_finish_error(aio, NNG_ECLOSED); return; } - if ((rv = nni_aio_schedule(aio, http_wr_cancel, conn)) != 0) { - nni_aio_finish_error(aio, rv); + if (!nni_aio_defer(aio, http_wr_cancel, conn)) { return; } conn->wr_flavor = flavor;