Skip to content

Commit

Permalink
fixes #1663 Request/Reply Protocol Throughput and Scalability
Browse files Browse the repository at this point in the history
This eliminates the req protocol's use of nni_timer (and of setting
a single timer node per request).  This was problematic because it
devolves into O(n^2) as we wind up inserting timer nodes and having
to scan the list for the timer node.

The solution is to use a single scan - stop worrying about insertion,
but instead use a coarse granularity timer (defaults to 1 second)
for retries.  Then do the O(n) scan just once per interval.

A new option, NNG_OPT_REQ_RESENDTICK, can be used to change the tick
interval for cases (like unit tests) where more fine grained timing
is required.
  • Loading branch information
gdamore committed Dec 17, 2023
1 parent cc58517 commit 74bc9ea
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 89 deletions.
25 changes: 21 additions & 4 deletions docs/man/nng_req.7.adoc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
= nng_req(7)
//
// Copyright 2018 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2023 Staysail Systems, Inc. <info@staysail.tech>
// Copyright 2018 Capitar IT Group BV <info@capitar.com>
//
// This document is supplied under the terms of the MIT License, a
Expand Down Expand Up @@ -101,9 +101,26 @@ The following protocol-specific option is available.
When a new request is started, a timer of this duration is also started.
If no reply is received before this timer expires, then the request will
be resent.
(Requests are also automatically resent if the peer to whom
the original request was sent disconnects, or if a peer becomes available
while the requester is waiting for an available peer.)
+
(Requests are also automatically resent if the peer to whom
the original request was sent disconnects, or if a peer becomes available
while the requester is waiting for an available peer.)
+
Resending may be deferred up to the value of the `NNG_OPT_REQ_RESENDTICK` parameter.

((`NNG_OPT_REQ_RESENDTICK`))::

(xref:nng_duration.5.adoc[`nng_duration`])
This is the granularity of the clock that is used to check for resending.
The default is one second. Setting this to a shorter interval (a higher tick
rate) will allow for more timely resending to occur, but may incur significant
additional overhead when the socket has many outstanding requests (contexts).
+
When there are no outstanding requests that have a resend time set, then
the clock does not tick at all.
+
This option is shared for all contexts on a socket, and is only available for the socket itself.


=== Protocol Headers

Expand Down
1 change: 1 addition & 0 deletions include/nng/protocol/reqrep0/req.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ NNG_DECL int nng_req0_open_raw(nng_socket *);
#define NNG_REQ0_PEER_NAME "rep"

#define NNG_OPT_REQ_RESENDTIME "req:resend-time"
#define NNG_OPT_REQ_RESENDTICK "req:resend-tick"

#ifdef __cplusplus
}
Expand Down
158 changes: 110 additions & 48 deletions src/sp/protocol/reqrep0/req.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,44 +21,49 @@ typedef struct req0_ctx req0_ctx;

static void req0_run_send_queue(req0_sock *, nni_aio_completions *);
static void req0_ctx_reset(req0_ctx *);
static void req0_ctx_timeout(void *);
static void req0_pipe_fini(void *);
static void req0_ctx_fini(void *);
static void req0_ctx_init(void *, void *);
static void req0_retry_cb(void *);

// A req0_ctx is a "context" for the request. It uses most of the
// socket, but keeps track of its own outstanding replays, the request ID,
// and so forth.
struct req0_ctx {
req0_sock *sock;
nni_list_node sock_node; // node on the socket context list
nni_list_node send_node; // node on the send_queue
nni_list_node pipe_node; // node on the pipe list
uint32_t request_id; // request ID, without high bit set
nni_aio *recv_aio; // user aio waiting to recv - only one!
nni_aio *send_aio; // user aio waiting to send
nng_msg *req_msg; // request message (owned by protocol)
size_t req_len; // length of request message (for stats)
nng_msg *rep_msg; // reply message
nni_timer_node timer;
nni_duration retry;
bool conn_reset; // sent message w/o retry, peer disconnect
req0_sock *sock;
nni_list_node sock_node; // node on the socket context list
nni_list_node send_node; // node on the send_queue
nni_list_node pipe_node; // node on the pipe list
nni_list_node retry_node; // node on the socket retry list
uint32_t request_id; // request ID, without high bit set
nni_aio *recv_aio; // user aio waiting to recv - only one!
nni_aio *send_aio; // user aio waiting to send
nng_msg *req_msg; // request message (owned by protocol)
size_t req_len; // length of request message (for stats)
nng_msg *rep_msg; // reply message
nni_duration retry;
nni_time retry_time; // retry after this expires
bool conn_reset; // sent message w/o retry, peer disconnect
};

// A req0_sock is our per-socket protocol private structure.
struct req0_sock {
nni_duration retry;
bool closed;
bool retry_active; // true if retry aio running
nni_atomic_int ttl;
req0_ctx master; // base socket master
nni_list ready_pipes;
nni_list busy_pipes;
nni_list stop_pipes;
nni_list contexts;
nni_list send_queue; // contexts waiting to send.
nni_id_map requests; // contexts by request ID
nni_list retry_queue;
nni_aio retry_aio; // retry timer
nni_id_map requests; // contexts by request ID
nni_pollable readable;
nni_pollable writable;
nni_duration retry_tick; // clock interval for retry timer
nni_mtx mtx;
};

Expand Down Expand Up @@ -95,16 +100,20 @@ req0_sock_init(void *arg, nni_sock *sock)
NNI_LIST_INIT(&s->busy_pipes, req0_pipe, node);
NNI_LIST_INIT(&s->stop_pipes, req0_pipe, node);
NNI_LIST_INIT(&s->send_queue, req0_ctx, send_node);
NNI_LIST_INIT(&s->retry_queue, req0_ctx, retry_node);
NNI_LIST_INIT(&s->contexts, req0_ctx, sock_node);

// this is "semi random" start for request IDs.
s->retry = NNI_SECOND * 60;
s->retry = NNI_SECOND * 60;
s->retry_tick = NNI_SECOND; // how often we check for retries

req0_ctx_init(&s->master, s);

nni_pollable_init(&s->writable);
nni_pollable_init(&s->readable);

nni_aio_init(&s->retry_aio, req0_retry_cb, s);

nni_atomic_init(&s->ttl);
nni_atomic_set(&s->ttl, 8);
}
Expand All @@ -130,6 +139,7 @@ req0_sock_fini(void *arg)
{
req0_sock *s = arg;

nni_aio_stop(&s->retry_aio);
nni_mtx_lock(&s->mtx);
NNI_ASSERT(nni_list_empty(&s->busy_pipes));
NNI_ASSERT(nni_list_empty(&s->stop_pipes));
Expand All @@ -140,6 +150,7 @@ req0_sock_fini(void *arg)
nni_pollable_fini(&s->readable);
nni_pollable_fini(&s->writable);
nni_id_map_fini(&s->requests);
nni_aio_fini(&s->retry_aio);
nni_mtx_fini(&s->mtx);
}

Expand Down Expand Up @@ -236,12 +247,9 @@ req0_pipe_close(void *arg)
ctx->conn_reset = true;
}
} else {
// Reset the timer on this so it expires immediately.
// This is actually easier than canceling the timer and
// running the send_queue separately. (In particular,
// it avoids a potential deadlock on cancelling the
// timer.)
nni_timer_schedule(&ctx->timer, NNI_TIME_ZERO);
// Reset the retry time to make it expire immediately.
// The timer should already be running.
ctx->retry_time = nni_clock();
}
}
nni_mtx_unlock(&s->mtx);
Expand Down Expand Up @@ -363,16 +371,41 @@ req0_recv_cb(void *arg)
}

static void
req0_ctx_timeout(void *arg)
req0_retry_cb(void *arg)
{
req0_ctx *ctx = arg;
req0_sock *s = ctx->sock;

req0_sock *s = arg;
req0_ctx *ctx;
nni_time now;
bool reschedule = false;

// The design of this is that retries are infrequent, because
// we should normally be succeeding. We also hope that we are not
// executing this linear scan of all requests too often, once
// per clock tick is all we want.
now = nni_clock();
nni_mtx_lock(&s->mtx);
if ((ctx->req_msg != NULL) && (!s->closed)) {
if (s->closed || (nni_aio_result(&s->retry_aio) != 0)) {
nni_mtx_unlock(&s->mtx);
return;
}

NNI_LIST_FOREACH (&s->retry_queue, ctx) {
if (ctx->retry_time > now || (ctx->req_msg == NULL)) {
continue;
}
if (!nni_list_node_active(&ctx->send_node)) {
nni_list_append(&s->send_queue, ctx);
}
reschedule = true;
}
if (!nni_list_empty(&s->retry_queue)) {
// if there are still jobs in the queue waiting to be
// retried, do them.
nni_sleep_aio(s->retry_tick, &s->retry_aio);
} else {
s->retry_active = false;
}
if (reschedule) {
req0_run_send_queue(s, NULL);
}
nni_mtx_unlock(&s->mtx);
Expand All @@ -384,8 +417,6 @@ req0_ctx_init(void *arg, void *sock)
req0_sock *s = sock;
req0_ctx *ctx = arg;

nni_timer_init(&ctx->timer, req0_ctx_timeout, ctx);

nni_mtx_lock(&s->mtx);
ctx->sock = s;
ctx->recv_aio = NULL;
Expand Down Expand Up @@ -415,9 +446,6 @@ req0_ctx_fini(void *arg)
req0_ctx_reset(ctx);
nni_list_remove(&s->contexts, ctx);
nni_mtx_unlock(&s->mtx);

nni_timer_cancel(&ctx->timer);
nni_timer_fini(&ctx->timer);
}

static int
Expand Down Expand Up @@ -448,20 +476,20 @@ req0_run_send_queue(req0_sock *s, nni_aio_completions *sent_list)
return;
}

// We have a place to send it, so do the send.
// We have a place to send it, so send it.
// If a sending error occurs that causes the message to
// be dropped, we rely on the resend timer to pick it up.
// We also notify the completion callback if this is the
// first send attempt.
nni_list_remove(&s->send_queue, ctx);

// Schedule a resubmit timer. We only do this if we got
// Schedule a retry. We only do this if we got
// a pipe to send to. Otherwise, we should get handled
// the next time that the send_queue is run. We don't do this
// if the retry is "disabled" with NNG_DURATION_INFINITE.
if (ctx->retry > 0) {
nni_timer_schedule(
&ctx->timer, nni_clock() + ctx->retry);
nni_list_node_remove(&ctx->retry_node);
nni_list_append(&s->retry_queue, ctx);
}

// Put us on the pipe list of active contexts.
Expand Down Expand Up @@ -489,7 +517,7 @@ req0_run_send_queue(req0_sock *s, nni_aio_completions *sent_list)
}

// At this point, we will never give this message back to
// to the user, so we don't have to worry about making it
// the user, so we don't have to worry about making it
// unique. We can freely clone it.
nni_msg_clone(ctx->req_msg);
nni_aio_set_msg(&p->aio_send, ctx->req_msg);
Expand All @@ -503,16 +531,7 @@ req0_ctx_reset(req0_ctx *ctx)
req0_sock *s = ctx->sock;
// Call with sock lock held!

// We cannot safely "wait" using nni_timer_cancel, but this removes
// any scheduled timer activation. If the timeout is already running
// concurrently, it will still run. It should do nothing, because
// we toss the request. There is still a very narrow race if the
// timeout fires, but doesn't actually start running before we
// both finish this function, *and* manage to reschedule another
// request. The consequence of that occurring is that the request
// will be emitted on the wire twice. This is not actually tragic.
nni_timer_schedule(&ctx->timer, NNI_TIME_NEVER);

nni_list_node_remove(&ctx->retry_node);
nni_list_node_remove(&ctx->pipe_node);
nni_list_node_remove(&ctx->send_node);
if (ctx->request_id != 0) {
Expand Down Expand Up @@ -561,7 +580,7 @@ req0_ctx_cancel_recv(nni_aio *aio, void *arg, int rv)
// entire state machine. This allows us to preserve the
// semantic of exactly one receive operation per send
// operation, and should be the least surprising for users. The
// main consequence is that if a receive operation is completed
// main consequence is that if the operation is completed
// (in error or otherwise), the user must submit a new send
// operation to restart the state machine.
req0_ctx_reset(ctx);
Expand Down Expand Up @@ -713,6 +732,15 @@ req0_ctx_send(void *arg, nni_aio *aio)
ctx->send_aio = aio;
nni_aio_set_msg(aio, NULL);

if (ctx->retry > 0) {
ctx->retry_time = nni_clock() + ctx->retry;
nni_list_append(&s->retry_queue, ctx);
if (!s->retry_active) {
s->retry_active = true;
nni_sleep_aio(s->retry_tick, &s->retry_aio);
}
}

// Stick us on the send_queue list.
nni_list_append(&s->send_queue, ctx);

Expand Down Expand Up @@ -771,6 +799,34 @@ req0_sock_get_resend_time(void *arg, void *buf, size_t *szp, nni_opt_type t)
return (req0_ctx_get_resend_time(&s->master, buf, szp, t));
}

// Setter for the NNG_OPT_REQ_RESENDTICK socket option.
//
// The caller-supplied value is decoded with nni_copyin_ms() and, only if
// that succeeds, stored as the socket's retry_tick while holding the
// socket mutex.  On a decode failure the stored tick is left untouched
// and the error from nni_copyin_ms() is returned as-is.
static int
req0_sock_set_resend_tick(
    void *arg, const void *buf, size_t sz, nni_opt_type t)
{
	req0_sock   *sock = arg;
	nng_duration interval;
	int          rv;

	rv = nni_copyin_ms(&interval, buf, sz, t);
	if (rv != 0) {
		return (rv);
	}

	// retry_tick is read under the same lock when the retry
	// sleep is (re)armed, so update it under the lock as well.
	nni_mtx_lock(&sock->mtx);
	sock->retry_tick = interval;
	nni_mtx_unlock(&sock->mtx);

	return (rv);
}

// Getter for the NNG_OPT_REQ_RESENDTICK socket option.
//
// Takes a snapshot of the socket's retry_tick under the socket mutex,
// then hands that snapshot to nni_copyout_ms() after the lock has been
// released.  The return value is whatever nni_copyout_ms() reports.
static int
req0_sock_get_resend_tick(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
	req0_sock   *sock = arg;
	nng_duration interval;

	nni_mtx_lock(&sock->mtx);
	interval = sock->retry_tick;
	nni_mtx_unlock(&sock->mtx);

	return (nni_copyout_ms(interval, buf, szp, t));
}

static int
req0_sock_get_send_fd(void *arg, void *buf, size_t *szp, nni_opt_type t)
{
Expand Down Expand Up @@ -846,6 +902,12 @@ static nni_option req0_sock_options[] = {
.o_name = NNG_OPT_SENDFD,
.o_get = req0_sock_get_send_fd,
},
{
.o_name = NNG_OPT_REQ_RESENDTICK,
.o_get = req0_sock_get_resend_tick,
.o_set = req0_sock_set_resend_tick,
},

// terminate list
{
.o_name = NULL,
Expand Down
Loading

0 comments on commit 74bc9ea

Please sign in to comment.