Skip to content

Commit

Permalink
proxy: internally track rctx:*_new objects
Browse files Browse the repository at this point in the history
We need to issue a request or result object cleanup _after_ a request
context has been completed and returned to the slot cache. Else request
or result buffer memory will hang around until the next time the slot is
used.

This requires internally tracking when these objects are created and
directly freeing them. We do this by adding extra rqu's in the existing
rcontext structure, so we can efficiently loop them instead of having to
go through lua.
  • Loading branch information
dormando committed Oct 30, 2024
1 parent 5c225c2 commit b038067
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 13 deletions.
7 changes: 6 additions & 1 deletion proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -653,6 +653,7 @@ struct mcp_funcgen_s {
int self_ref; // self-reference if we're attached anywhere
int argument_ref; // reference to an argument to pass to generator
int max_queues; // how many queue slots rctx's have
int extra_queues; // how many extra queue slots for object storage we have
unsigned int refcount; // reference counter
unsigned int total; // total contexts managed
unsigned int free; // free contexts
Expand Down Expand Up @@ -702,12 +703,16 @@ struct mcp_funcgen_router {
#define RQUEUE_TYPE_NONE 0
#define RQUEUE_TYPE_POOL 1
#define RQUEUE_TYPE_FGEN 2
#define RQUEUE_TYPE_UOBJ 3 // user tracked object types past this point
#define RQUEUE_TYPE_UOBJ_REQ 4
#define RQUEUE_TYPE_UOBJ_RES 5
#define RQUEUE_ASSIGNED (1<<0)
#define RQUEUE_R_RESUME (1<<1)
#define RQUEUE_R_GOOD (1<<3)
#define RQUEUE_R_OK (1<<4)
#define RQUEUE_R_ANY (1<<5)
#define RQUEUE_R_ERROR (1<<7)
#define RQUEUE_UOBJ_MAX UINT8_MAX

enum mcp_rqueue_state {
RQUEUE_IDLE = 0,
Expand Down Expand Up @@ -735,7 +740,6 @@ struct mcp_rcontext_s {
int request_ref; // top level request for this context.
int function_ref; // ref to the created route function.
int coroutine_ref; // ref to our encompassing coroutine.
unsigned int async_pending; // legacy async handling
int pending_reqs; // pending requests and sub-requests
unsigned int wait_count;
unsigned int wait_done; // TODO: change these variables to uint8's
Expand All @@ -744,6 +748,7 @@ struct mcp_rcontext_s {
int conn_fd; // fd of the originating client, as *c can become invalid
enum mcp_rqueue_e wait_mode;
uint8_t lua_narg; // number of responses to push when yield resuming.
uint8_t obj_count; // number of extra tracked req/res objects.
bool first_queue; // HACK
lua_State *Lc; // coroutine thread pointer.
mcp_request_t *request; // ptr to the above reference.
Expand Down
81 changes: 75 additions & 6 deletions proxy_luafgen.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ static void mcp_rcontext_cleanup(lua_State *L, mcp_funcgen_t *fgen, mcp_rcontext
}
}

// look for rctx-local objects.
if (rctx->obj_count) {
int lim = fgen->max_queues + rctx->obj_count;
for (int x = fgen->max_queues; x < lim; x++) {
struct mcp_rqueue_s *rqu = &rctx->qslots[x];
// Don't need to look at the type:
// - slot has to be freed (thus cleaned up) before getting here
// - any uobj is ref'ed into obj_ref
luaL_unref(L, LUA_REGISTRYINDEX, rqu->obj_ref);
rqu->obj_ref = 0;
}
}

// nuke alarm if set.
// should only be paranoia here, but just in case.
if (event_pending(&rctx->timeout_event, EV_TIMEOUT, NULL)) {
Expand Down Expand Up @@ -207,7 +220,8 @@ static int _mcplib_funcgen_gencall(lua_State *L) {
mcp_funcgen_t *fgen = luaL_checkudata(L, -2, "mcp.funcgen");
int fgen_idx = lua_absindex(L, -2);
// create the ctx object.
size_t rctx_len = sizeof(mcp_rcontext_t) + sizeof(struct mcp_rqueue_s) * fgen->max_queues;
int total_queues = fgen->max_queues + fgen->extra_queues;
size_t rctx_len = sizeof(mcp_rcontext_t) + sizeof(struct mcp_rqueue_s) * total_queues;
mcp_rcontext_t *rc = lua_newuserdatauv(L, rctx_len, 0);
memset(rc, 0, rctx_len);

Expand Down Expand Up @@ -368,6 +382,24 @@ static void _mcp_funcgen_return_rctx(mcp_rcontext_t *rctx) {
_mcp_funcgen_return_rctx(rqu->obj);
}
}

// look for rctx-local objects.
if (rctx->obj_count) {
int lim = fgen->max_queues + rctx->obj_count;
for (int x = fgen->max_queues; x < lim; x++) {
struct mcp_rqueue_s *rqu = &rctx->qslots[x];
if (rqu->obj_type == RQUEUE_TYPE_UOBJ_REQ) {
mcp_request_t *rq = rqu->obj;
mcp_request_cleanup(fgen->thread, rq);
} else if (rqu->obj_type == RQUEUE_TYPE_UOBJ_RES) {
mcp_resp_t *rs = rqu->obj;
mcp_response_cleanup(fgen->thread, rs);
} else {
// no known type. only crash the debug binary.
assert(1 == 0);
}
}
}
}

// TODO: check rctx->awaiting before returning?
Expand Down Expand Up @@ -707,6 +739,16 @@ int mcplib_funcgen_ready(lua_State *L) {
lua_pop(L, 1);
}

if (lua_getfield(L, 2, "u") == LUA_TNUMBER) {
int extra_queues = luaL_checkinteger(L, -1);
if (extra_queues < 1 || extra_queues > RQUEUE_UOBJ_MAX) {
proxy_lua_ferror(L, "user obj ('u') in fgen:ready must be between 1 and %d", RQUEUE_UOBJ_MAX);
return 0;
}
fgen->extra_queues = extra_queues;
}
lua_pop(L, 1);

// now we test the generator function and create the first slot.
lua_pushvalue(L, 1); // copy the funcgen to pass into gencall
lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->generator_ref); // for gencall
Expand Down Expand Up @@ -1447,22 +1489,49 @@ int mcplib_rcontext_tls_peer_cn(lua_State *L) {
return 1;
}

// TODO: stub function.
// need to attach request function to rcontext, by using another function that
// doesn't fill out the request info
// Creates request object that's tracked by request context so we can call
// cleanup routines post-run.
int mcplib_rcontext_request_new(lua_State *L) {
mcp_rcontext_t *rctx = lua_touserdata(L, 1);
if (rctx->obj_count == rctx->fgen->extra_queues) {
proxy_lua_error(L, "rctx request new: object count limit reached");
return 0;
}

// create new request object
mcp_parser_t pr = {0};
mcp_request_t *rq = mcp_new_request(L, &pr, " ", 1);

lua_pushvalue(L, -1); // dupe rq for the rqueue slot
struct mcp_rqueue_s *rqu = &rctx->qslots[rctx->fgen->max_queues + rctx->obj_count];
rctx->obj_count++;
// hold the request reference into the rctx for memory management.
rqu->obj_ref = luaL_ref(L, LUA_REGISTRYINDEX);
rqu->obj_type = RQUEUE_TYPE_UOBJ_REQ;
rqu->obj = rq;

mcp_new_request(L, &pr, " ", 1);
return 1;
}

// TODO: stub function, see request_new above.
int mcplib_rcontext_response_new(lua_State *L) {
mcp_rcontext_t *rctx = lua_touserdata(L, 1);
if (rctx->obj_count == rctx->fgen->extra_queues) {
proxy_lua_error(L, "rctx request new: object count limit reached");
return 0;
}

mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0);
memset(r, 0, sizeof(mcp_resp_t));
luaL_getmetatable(L, "mcp.response");
lua_setmetatable(L, -2);

lua_pushvalue(L, -1); // dupe rq for the rqueue slot
struct mcp_rqueue_s *rqu = &rctx->qslots[rctx->fgen->max_queues + rctx->obj_count];
rctx->obj_count++;
// hold the request reference into the rctx for memory management.
rqu->obj_ref = luaL_ref(L, LUA_REGISTRYINDEX);
rqu->obj_type = RQUEUE_TYPE_UOBJ_RES;
rqu->obj = r;
return 1;
}

Expand Down
4 changes: 0 additions & 4 deletions proxy_mutator.c
Original file line number Diff line number Diff line change
Expand Up @@ -955,8 +955,6 @@ static int mcp_mut_run(struct mcp_mut_run *run) {
// ensure space and/or allocate memory then seed our destination pointer.
if (mut->type == MUT_REQ) {
mcp_request_t *rq = run->arg;
// FIXME: cleanup should be managed by slot rctx.
mcp_request_cleanup(t, rq);
// future.. should be able to dynamically assign request buffer.
if (total > MCP_REQUEST_MAXLEN) {
proxy_lua_error(run->L, "mutator: new request is too long");
Expand Down Expand Up @@ -986,8 +984,6 @@ static int mcp_mut_run(struct mcp_mut_run *run) {
}
} else {
mcp_resp_t *rs = run->arg;
// FIXME: cleanup should be managed by slot rctx.
mcp_response_cleanup(t, rs);

rs->buf = malloc(total);
if (rs->buf == NULL) {
Expand Down
4 changes: 2 additions & 2 deletions t/proxymut.lua
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ function mcp_config_routes(p)
)

mgfg:ready({
n = "mgtest", f = function(rctx)
n = "mgtest", u = 2, f = function(rctx)
-- make blank request objects for handing to mutator

-- these objects must be made per slot (rctx)
Expand Down Expand Up @@ -90,7 +90,7 @@ function mcp_config_routes(p)
})

msfg:ready({
n = "mstest", f = function(rctx)
n = "mstest", u = 2, f = function(rctx)
return function(r)
local key = r:key()
-- test tree
Expand Down

0 comments on commit b038067

Please sign in to comment.