Skip to content

Commit

Permalink
proxy: wait timeout API
Browse files Browse the repository at this point in the history
give request contexts the ability to time out on wait operations. this
allows moving along the logic without cancelling in flight requests or
resetting backends.
  • Loading branch information
dormando committed Apr 2, 2024
1 parent ea66fe0 commit 8cf0d0f
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 37 deletions.
4 changes: 3 additions & 1 deletion proto_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -841,7 +841,9 @@ static void _proxy_run_tresp_to_resp(mc_resp *tresp, mc_resp *resp) {
int proxy_run_rcontext(mcp_rcontext_t *rctx) {
int nresults = 0;
lua_State *Lc = rctx->Lc;
int cores = lua_resume(Lc, NULL, 1, &nresults);
assert(rctx->lua_narg != 0);
int cores = lua_resume(Lc, NULL, rctx->lua_narg, &nresults);
rctx->lua_narg = 1; // reset to default since not-default is uncommon.
size_t rlen = 0;
conn *c = rctx->c;
mc_resp *resp = rctx->resp;
Expand Down
10 changes: 9 additions & 1 deletion proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ struct mcp_memprofile {
uint64_t alloc_bytes[8];
};

// for various time conversion functions
#define NANOSECONDS(x) ((x) * 1E9 + 0.5)
#define MICROSECONDS(x) ((x) * 1E6 + 0.5)

// Note: value created from thin air. Could be shorter.
#define MCP_REQUEST_MAXLEN KEY_MAX_LENGTH * 2

Expand Down Expand Up @@ -614,7 +618,8 @@ enum mcp_rqueue_e {
QWAIT_OK,
QWAIT_GOOD,
QWAIT_FASTGOOD,
QWAIT_HANDLE
QWAIT_HANDLE,
QWAIT_SLEEP,
};

enum mcp_funcgen_router_e {
Expand Down Expand Up @@ -707,13 +712,15 @@ struct mcp_rcontext_s {
int parent_handle; // queue slot in parent rctx
int conn_fd; // fd of the originating client, as *c can become invalid
enum mcp_rqueue_e wait_mode;
uint8_t lua_narg; // number of responses to push when yield resuming.
bool first_queue; // HACK
lua_State *Lc; // coroutine thread pointer.
mcp_request_t *request; // ptr to the above reference.
mcp_rcontext_t *parent; // parent rctx in the call graph
conn *c; // associated client object.
mc_resp *resp; // top level response object to fill.
mcp_funcgen_t *fgen; // parent function generator context.
struct event timeout_event; // for *_wait_timeout() and sleep() calls
struct mcp_rqueue_s qslots[]; // queueable slots.
};

Expand All @@ -723,6 +730,7 @@ int mcp_process_rqueue_return(mcp_rcontext_t *rctx, int handle, mcp_resp_t *res)
int mcplib_rcontext_handle_set_cb(lua_State *L);
int mcplib_rcontext_enqueue(lua_State *L);
int mcplib_rcontext_wait_cond(lua_State *L);
int mcplib_rcontext_wait_cond_timeout(lua_State *L);
int mcplib_rcontext_wait_handle(lua_State *L);
int mcplib_rcontext_enqueue_and_wait(lua_State *L);
int mcplib_rcontext_res_good(lua_State *L);
Expand Down
5 changes: 1 addition & 4 deletions proxy_lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@

#include "proxy.h"

// sad, I had to look this up...
#define NANOSECONDS(x) ((x) * 1E9 + 0.5)
#define MICROSECONDS(x) ((x) * 1E6 + 0.5)

// func prototype example:
// static int fname (lua_State *L)
// normal library open:
Expand Down Expand Up @@ -1491,6 +1487,7 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
{"handle_set_cb", mcplib_rcontext_handle_set_cb},
{"enqueue", mcplib_rcontext_enqueue},
{"wait_cond", mcplib_rcontext_wait_cond},
{"wait_cond_timeout", mcplib_rcontext_wait_cond_timeout},
{"enqueue_and_wait", mcplib_rcontext_enqueue_and_wait},
{"wait_handle", mcplib_rcontext_wait_handle},
{"res_good", mcplib_rcontext_res_good},
Expand Down
133 changes: 102 additions & 31 deletions proxy_luafgen.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,24 @@ static mcp_funcgen_t *mcp_funcgen_route(lua_State *L, mcp_funcgen_t *fgen, mcp_p
static int mcp_funcgen_router_cleanup(lua_State *L, mcp_funcgen_t *fgen);
static void _mcplib_funcgen_cache(mcp_funcgen_t *fgen, mcp_rcontext_t *rctx);
static void mcp_funcgen_cleanup(lua_State *L, mcp_funcgen_t *fgen);
static void mcp_resume_rctx_from_cb(mcp_rcontext_t *rctx);
static void proxy_return_rqu_cb(io_pending_t *pending);

static inline void _mcp_queue_hack(conn *c) {
if (c) {
// HACK
// see notes above proxy_run_rcontext.
// in case the above resume calls queued new work, we have to submit
// it to the backend handling system here.
for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
if (q->stack_ctx != NULL) {
io_queue_cb_t *qcb = thread_io_queue_get(c->thread, q->type);
qcb->submit_cb(q);
}
}
}
}

// If we're GC'ed but not closed, it means it was created but never
// attached to a function, so ensure everything is closed properly.
int mcplib_funcgen_gc(lua_State *L) {
Expand All @@ -22,6 +38,37 @@ int mcplib_funcgen_gc(lua_State *L) {
return 0;
}

// handler for *_wait_cond_timeout() variants and sleep calls
static void mcp_funcgen_wait_handler(const int fd, const short which, void *arg) {
mcp_rcontext_t *rctx = arg;

// if we were in waiting: reset wait mode, push wait_done + boolean true
// if we were in sleep: reset wait mode.
// immediately resume.
lua_settop(rctx->Lc, 0);
rctx->wait_count = 0;
rctx->lua_narg = 2;
if (rctx->wait_mode == QWAIT_HANDLE) {
// if timed out then we shouldn't have a result. just push nil.
lua_pushnil(rctx->Lc);
} else if (rctx->wait_mode == QWAIT_SLEEP) {
// no extra arg.
rctx->lua_narg = 1;
} else {
// how many results were processed
lua_pushinteger(rctx->Lc, rctx->wait_done);
}
// "timed out"
lua_pushboolean(rctx->Lc, 1);

rctx->wait_mode = QWAIT_IDLE;

mcp_resume_rctx_from_cb(rctx);

// like proxy_return_rqu_dummy_cb, need the HACK section.
_mcp_queue_hack(rctx->c);
}

// For describing functions which generate functions which can execute
// requests.
// These "generator functions" handle pre-allocating and creating a memory
Expand Down Expand Up @@ -144,6 +191,7 @@ static int _mcplib_funcgen_gencall(lua_State *L) {
lua_setmetatable(L, -2);
// allow the rctx to reference the function generator.
rc->fgen = fgen;
rc->lua_narg = 1;

// initialize the queue slots based on the fgen parent
for (int x = 0; x < fgen->max_queues; x++) {
Expand Down Expand Up @@ -231,6 +279,8 @@ static int _mcplib_funcgen_gencall(lua_State *L) {
LIBEVENT_THREAD *t = PROXY_GET_THR(L);
mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGENSLOT_IDX, fgen->name, 1);

event_assign(&rc->timeout_event, t->base, -1, EV_TIMEOUT, mcp_funcgen_wait_handler, rc);

// return the fgen.
// FIXME: just return 0? need to adjust caller to not mis-ref the
// generator function.
Expand Down Expand Up @@ -884,19 +934,8 @@ static void proxy_return_rqu_dummy_cb(io_pending_t *pending) {
mcp_resume_rctx_from_cb(rctx);

do_cache_free(p->thread->io_cache, p);
// We always need a C object right now, but just in case.
if (c) {
// HACK
// see notes above proxy_run_rcontext.
// in case the above resume calls queued new work, we have to submit
// it to the backend handling system here.
for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
if (q->stack_ctx != NULL) {
io_queue_cb_t *qcb = thread_io_queue_get(c->thread, q->type);
qcb->submit_cb(q);
}
}
}

_mcp_queue_hack(c);
}

void mcp_process_rctx_wait(mcp_rcontext_t *rctx, int handle) {
Expand Down Expand Up @@ -949,6 +988,9 @@ void mcp_process_rctx_wait(mcp_rcontext_t *rctx, int handle) {
rqu->state = RQUEUE_WAITED;
}
break;
case QWAIT_SLEEP:
assert(1 == 0); // should not get here.
break;
}

assert(rctx->pending_reqs != 0);
Expand All @@ -965,6 +1007,11 @@ void mcp_process_rctx_wait(mcp_rcontext_t *rctx, int handle) {
}
rctx->wait_mode = QWAIT_IDLE;

// nuke alarm if set.
if (event_pending(&rctx->timeout_event, EV_TIMEOUT, NULL)) {
event_del(&rctx->timeout_event);
}

mcp_resume_rctx_from_cb(rctx);
}
}
Expand Down Expand Up @@ -1044,19 +1091,7 @@ static void proxy_return_rqu_cb(io_pending_t *pending) {

do_cache_free(p->thread->io_cache, p);

// We always need a C object right now, but just in case.
if (c) {
// HACK
// see notes above proxy_run_rcontext.
// in case the above resume calls queued new work, we have to submit
// it to the backend handling system here.
for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {
if (q->stack_ctx != NULL) {
io_queue_cb_t *qcb = thread_io_queue_get(c->thread, q->type);
qcb->submit_cb(q);
}
}
}
_mcp_queue_hack(c);
}

void mcp_run_rcontext_handle(mcp_rcontext_t *rctx, int handle) {
Expand Down Expand Up @@ -1096,9 +1131,7 @@ void mcp_run_rcontext_handle(mcp_rcontext_t *rctx, int handle) {
// TODO: one more function to wait on a list of handles? to queue and wait on
// a list of handles? expand wait_cond()

// takes num, filter mode
int mcplib_rcontext_wait_cond(lua_State *L) {
mcp_rcontext_t *rctx = lua_touserdata(L, 1);
static inline int _mcplib_rcontext_wait_prep(lua_State *L, mcp_rcontext_t *rctx) {
int mode = QWAIT_ANY;
int wait = 0;

Expand All @@ -1116,7 +1149,6 @@ int mcplib_rcontext_wait_cond(lua_State *L) {
proxy_lua_error(L, "wait count for wait_cond must be positive");
return 0;
}
rctx->wait_count = wait;
}

if (lua_isnumber(L, 3)) {
Expand All @@ -1133,11 +1165,22 @@ int mcplib_rcontext_wait_cond(lua_State *L) {
proxy_lua_error(L, "invalid mode sent to wait_cond");
return 0;
}

rctx->wait_count = wait;
rctx->wait_done = 0;
rctx->wait_mode = mode;

return 0;
}

// takes num, filter mode
int mcplib_rcontext_wait_cond(lua_State *L) {
mcp_rcontext_t *rctx = lua_touserdata(L, 1);

_mcplib_rcontext_wait_prep(L, rctx);

// waiting for none, meaning just execute the queues.
if (wait == 0) {
if (rctx->wait_count == 0) {
io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, NULL, NULL, NULL);
p->return_cb = proxy_return_rqu_dummy_cb;
p->await_background = true;
Expand All @@ -1149,6 +1192,34 @@ int mcplib_rcontext_wait_cond(lua_State *L) {
return lua_yield(L, 1);
}

int mcplib_rcontext_wait_cond_timeout(lua_State *L) {
mcp_rcontext_t *rctx = lua_touserdata(L, 1);

_mcplib_rcontext_wait_prep(L, rctx);

// FIXME: could also just skip the timeout portion and not error?
if (rctx->wait_count == 0) {
proxy_lua_error(L, "wait_cond_timeout requires non zero wait_count");
}

if (lua_isnumber(L, 4)) {
lua_Number secondsf = lua_tonumber(L, -1);
int pending = event_pending(&rctx->timeout_event, EV_TIMEOUT, NULL);
if ((pending & (EV_TIMEOUT)) == 0) {
struct timeval tv = { .tv_sec = 0, .tv_usec = 0 };
lua_Integer secondsi = (lua_Integer) secondsf;
lua_Number subseconds = secondsf - secondsi;

tv.tv_sec = secondsi;
tv.tv_usec = MICROSECONDS(subseconds);
event_add(&rctx->timeout_event, &tv);
}
}

lua_pushinteger(L, MCP_YIELD_WAITCOND);
return lua_yield(L, 1);
}

int mcplib_rcontext_enqueue_and_wait(lua_State *L) {
mcp_rcontext_t *rctx = lua_touserdata(L, 1);
mcp_request_t *rq = luaL_checkudata(L, 2, "mcp.request");
Expand Down
57 changes: 57 additions & 0 deletions t/proxyrctxtimeout.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
function mcp_config_pools()
local b1 = mcp.backend('b1', '127.0.0.1', 12141)
local b2 = mcp.backend('b2', '127.0.0.1', 12142)
local b3 = mcp.backend('b3', '127.0.0.1', 12143)

return {
z1 = mcp.pool({b1}),
z2 = mcp.pool({b2}),
z3 = mcp.pool({b3})
}
end

function cond_timeout(p)
local fgen = mcp.funcgen_new()
local near = fgen:new_handle(p.z1)
local far = { fgen:new_handle(p.z2),
fgen:new_handle(p.z3) }

local all = { near, far[1], far[2] }

fgen:ready({ n = "cond_timeout", f = function(rctx)
return function(r)
rctx:enqueue(r, near)
local done, timeout = rctx:wait_cond_timeout(1, mcp.WAIT_GOOD, 0.5)

if timeout then
rctx:enqueue(r, far)
local done = rctx:wait_cond(1, mcp.WAIT_GOOD)
for x=1,#all do
local res = rctx:res_any(all[x])
if res then
return res
end
end
return "SERVER_ERROR no responses\r\n"
else
return rctx:res_any(near)
end
end
end})
return fgen
end

-- TODO: different cond_timeout test with 2/3 instead of 1

function mcp_config_routes(p)
local map = {
["cond_timeout"] = cond_timeout(p),
}

-- defaults are fine. "prefix/etc"
local router = mcp.router_new({
map = map,
})

mcp.attach(mcp.CMD_MG, router)
end
Loading

0 comments on commit 8cf0d0f

Please sign in to comment.