Skip to content

Commit

Permalink
proxy: allow early freeing rcontexts
Browse files Browse the repository at this point in the history
Any allocated request context would stay in memory attached to its
function generator until the next reload, which would replace the
function generators. A replaced fgen would eventually run out of attach
references and cleanup itself, removing all rcontexts.

A one-time burst of concurrent request contexts would thus take up
memory forever. With this change we use an easing function to slowly
free unneeded request contexts.

This can help for both memory usage scenarios and instances where the GC
is still used in the request path and tail latency can become poor.
  • Loading branch information
dormando committed Mar 8, 2024
1 parent 3304459 commit cf0af8b
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 85 deletions.
5 changes: 4 additions & 1 deletion proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -640,23 +640,25 @@ struct mcp_funcgen_router {
int map_ref;
};

#define FGEN_NAME_MAXLEN 80
struct mcp_funcgen_s {
LIBEVENT_THREAD *thread; // worker thread that created this funcgen.
int generator_ref; // reference to the generator function.
int self_ref; // self-reference if we're attached anywhere
int argument_ref; // reference to an argument to pass to generator
int name_ref; // reference to string name for the generator
int max_queues; // how many queue slots rctx's have
unsigned int refcount; // reference counter
unsigned int total; // total contexts managed
unsigned int free; // free contexts
unsigned int free_max; // size of list below.
unsigned int free_pressure; // "pressure" for when to early release rctx
unsigned int routecount; // total routes if this fgen is a router.
bool closed; // the hook holding this fgen has been replaced
bool ready; // if we're locked down or not.
mcp_rcontext_t **list;
struct mcp_rqueue_s *queue_list;
struct mcp_funcgen_router router;
char name[FGEN_NAME_MAXLEN+1]; // string name for the generator.
};

#define RQUEUE_TYPE_NONE 0
Expand Down Expand Up @@ -691,6 +693,7 @@ struct mcp_rqueue_s {
};

struct mcp_rcontext_s {
int self_ref; // reference to our own object
int request_ref; // top level request for this context.
int function_ref; // ref to the created route function.
int coroutine_ref; // ref to our encompassing coroutine.
Expand Down
183 changes: 99 additions & 84 deletions proxy_luafgen.c
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

static mcp_funcgen_t *mcp_funcgen_route(lua_State *L, mcp_funcgen_t *fgen, mcp_parser_t *pr);
static int mcp_funcgen_router_cleanup(lua_State *L, mcp_funcgen_t *fgen);
static void _mcplib_funcgen_cache(mcp_funcgen_t *fgen, mcp_rcontext_t *rctx);
static void mcp_funcgen_cleanup(lua_State *L, mcp_funcgen_t *fgen);
static void proxy_return_rqu_cb(io_pending_t *pending);

Expand All @@ -26,16 +27,91 @@ int mcplib_funcgen_gc(lua_State *L) {
// These "generator functions" handle pre-allocating and creating a memory
// heirarchy, allowing dynamic runtimes at high speed.

// must be called with fgen on top of stack in fgen->thread->L
static void mcp_rcontext_cleanup(mcp_funcgen_t *fgen, mcp_rcontext_t *rctx, int fgen_idx) {
lua_State *L = fgen->thread->L;
luaL_unref(L, LUA_REGISTRYINDEX, rctx->coroutine_ref);
luaL_unref(L, LUA_REGISTRYINDEX, rctx->function_ref);
if (rctx->request_ref) {
luaL_unref(L, LUA_REGISTRYINDEX, rctx->request_ref);
}
assert(rctx->pending_reqs == 0);

// cleanup of request queue entries. recurse funcgen cleanup.
for (int x = 0; x < fgen->max_queues; x++) {
struct mcp_rqueue_s *rqu = &rctx->qslots[x];
if (rqu->obj_type == RQUEUE_TYPE_POOL) {
// nothing to do.
} else if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
// don't need to recurse, just return the subrctx.
mcp_rcontext_t *subrctx = rqu->obj;
_mcplib_funcgen_cache(subrctx->fgen, subrctx);
} else if (rqu->obj_type != RQUEUE_TYPE_NONE) {
assert(1 == 0);
}

if (rqu->res_ref) {
luaL_unref(L, LUA_REGISTRYINDEX, rqu->res_ref);
rqu->res_ref = 0;
}

if (rqu->cb_ref) {
luaL_unref(L, LUA_REGISTRYINDEX, rqu->cb_ref);
rqu->cb_ref = 0;
}
}

lua_getiuservalue(L, fgen_idx, 1);
luaL_unref(L, -1, rctx->self_ref);
rctx->self_ref = 0;
lua_pop(L, 1); // drop freelist table

LIBEVENT_THREAD *t = PROXY_GET_THR(L);
mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGENSLOT_IDX, fgen->name, 1);
}

// TODO: switch from an array to a STAILQ so we can avoid the memory
// management and error handling.
// Realistically it's impossible for these to error so we're safe for now.
#define FGEN_FREE_PRESSURE_MAX 1000
#define FGEN_FREE_PRESSURE_DROP 100
static void _mcplib_funcgen_cache(mcp_funcgen_t *fgen, mcp_rcontext_t *rctx) {
if (fgen->free + 1 >= fgen->free_max) {
fgen->free_max *= 2;
fgen->list = realloc(fgen->list, fgen->free_max * sizeof(mcp_rcontext_t *));
bool do_cache = true;
// Easing algorithm to decide when to "early free" rctx slots:
// - If we recently allocated a slot, reset pressure.
// - Each time an rctx is freed and more than half of available rctx's are
// free, increase pressure.
// - If free rctx are less than half of total, reduce pressure.
// - If pressure is too high, immediately free the rctx, then drop the
// pressure slightly.
//
// This should allow bursty traffic to avoid spinning on alloc/frees,
// while one-time bursts will slowly free slots back down to a minimum of
// 1.
if (fgen->free > fgen->total/2) {
if (fgen->free_pressure++ > FGEN_FREE_PRESSURE_MAX) {
fgen->free_pressure -= FGEN_FREE_PRESSURE_DROP;
// do not cache the rctx
assert(fgen->self_ref);
lua_State *L = fgen->thread->L;
lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->self_ref);
mcp_rcontext_cleanup(fgen, rctx, lua_absindex(L, -1));
lua_pop(L, 1); // drop fgen
fgen->total--;
do_cache = false;
}
} else if (fgen->free_pressure > 0) {
fgen->free_pressure--;
}

if (do_cache) {
if (fgen->free + 1 >= fgen->free_max) {
fgen->free_max *= 2;
fgen->list = realloc(fgen->list, fgen->free_max * sizeof(mcp_rcontext_t *));
}
fgen->list[fgen->free] = rctx;
fgen->free++;
}
fgen->list[fgen->free] = rctx;
fgen->free++;

// we're closed and every outstanding request slot has been
// returned.
Expand Down Expand Up @@ -130,7 +206,7 @@ static int _mcplib_funcgen_gencall(lua_State *L) {
lua_getiuservalue(L, fgen_idx, 1); // get the reference table.
// rc, t -> t, rc
lua_rotate(L, -2, 1);
lua_rawseti(L, -2, fgen->total); // pop rcontext
rc->self_ref = luaL_ref(L, -2); // pop rcontext
lua_pop(L, 1); // pop ref table.

_mcplib_funcgen_cache(fgen, rc);
Expand All @@ -141,18 +217,8 @@ static int _mcplib_funcgen_gencall(lua_State *L) {
rc->coroutine_ref = luaL_ref(L, LUA_REGISTRYINDEX);

// increment the slot counter
const char *name = NULL;
if (fgen->name_ref) {
lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->name_ref);
name = lua_tostring(L, -1);
// popping early: string stays referenced in name_ref and no other lua
// code executing on this VM, so it's safe to reference immediately.
lua_pop(L, 1);
} else {
name = "anonymous";
}
LIBEVENT_THREAD *t = PROXY_GET_THR(L);
mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGENSLOT_IDX, name, 1);
mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGENSLOT_IDX, fgen->name, 1);

// return the fgen.
// FIXME: just return 0? need to adjust caller to not mis-ref the
Expand Down Expand Up @@ -241,6 +307,8 @@ mcp_rcontext_t *mcp_funcgen_get_rctx(lua_State *L, int fgen_ref, mcp_funcgen_t *
mcp_rcontext_t *rctx = NULL;
// nothing left in slot cache, generate a new function.
if (fgen->free == 0) {
// reset free pressure so we try to keep the rctx cached
fgen->free_pressure = 0;
// TODO (perf): pre-create this c closure somewhere hidden.
lua_pushcclosure(L, _mcplib_funcgen_gencall, 0);
// pull in the funcgen object
Expand All @@ -260,6 +328,7 @@ mcp_rcontext_t *mcp_funcgen_get_rctx(lua_State *L, int fgen_ref, mcp_funcgen_t *
}

rctx = fgen->list[fgen->free-1];
fgen->list[fgen->free-1] = NULL;
fgen->free--;

// on non-error, return the response object upward.
Expand Down Expand Up @@ -304,11 +373,13 @@ mcp_rcontext_t *mcp_funcgen_start(lua_State *L, mcp_funcgen_t *fgen, mcp_parser_
// calling either with self_ref set, or with fgen in stack -1 (ie; from GC
// function without ever being attached to anything)
static void mcp_funcgen_cleanup(lua_State *L, mcp_funcgen_t *fgen) {
int fgen_idx = 0;
lua_checkstack(L, 5); // paranoia. this can recurse from a router.
// pull the fgen into the stack.
if (fgen->self_ref) {
// pull self onto the stack and hold until the end of the func.
lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->self_ref);
fgen_idx = lua_absindex(L, -1); // remember fgen offset
// remove the C reference to the fgen
luaL_unref(L, LUA_REGISTRYINDEX, fgen->self_ref);
fgen->self_ref = 0;
Expand All @@ -323,52 +394,16 @@ static void mcp_funcgen_cleanup(lua_State *L, mcp_funcgen_t *fgen) {
}

// decrement the slot counter
const char *name = NULL;
if (fgen->name_ref) {
lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->name_ref);
name = lua_tostring(L, -1);
} else if (fgen->router.type != FGEN_ROUTER_NONE) {
name = "mcp_router";
} else {
name = "anonymous";
}
LIBEVENT_THREAD *t = PROXY_GET_THR(L);
mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGEN_IDX, name, -1);
mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGEN_IDX, fgen->name, -1);

// Walk every request context and issue cleanup.
for (int x = 0; x < fgen->total; x++) {
mcp_rcontext_t *rctx = fgen->list[x];

luaL_unref(L, LUA_REGISTRYINDEX, rctx->coroutine_ref);
luaL_unref(L, LUA_REGISTRYINDEX, rctx->function_ref);
if (rctx->request_ref) {
luaL_unref(L, LUA_REGISTRYINDEX, rctx->request_ref);
}
assert(rctx->pending_reqs == 0);

// cleanup of request queue entries. recurse funcgen cleanup.
for (int x = 0; x < fgen->max_queues; x++) {
struct mcp_rqueue_s *rqu = &rctx->qslots[x];
if (rqu->obj_type == RQUEUE_TYPE_POOL) {
// nothing to do.
} else if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
// don't need to recurse, just return the subrctx.
mcp_rcontext_t *subrctx = rqu->obj;
_mcplib_funcgen_cache(subrctx->fgen, subrctx);
} else if (rqu->obj_type != RQUEUE_TYPE_NONE) {
assert(1 == 0);
}

if (rqu->res_ref) {
luaL_unref(L, LUA_REGISTRYINDEX, rqu->res_ref);
rqu->res_ref = 0;
}

if (rqu->cb_ref) {
luaL_unref(L, LUA_REGISTRYINDEX, rqu->cb_ref);
rqu->cb_ref = 0;
}
if (rctx == NULL) {
continue;
}
mcp_rcontext_cleanup(fgen, rctx, fgen_idx);
}

if (fgen->argument_ref) {
Expand All @@ -381,9 +416,6 @@ static void mcp_funcgen_cleanup(lua_State *L, mcp_funcgen_t *fgen) {
fgen->generator_ref = 0;
}

// decrement the slot tracker. apply full delta at once for efficiency.
mcp_sharedvm_delta(t->proxy_ctx, SHAREDVM_FGENSLOT_IDX, name, -fgen->total);

if (fgen->queue_list) {
for (int x = 0; x < fgen->max_queues; x++) {
struct mcp_rqueue_s *rqu = &fgen->queue_list[x];
Expand All @@ -401,20 +433,7 @@ static void mcp_funcgen_cleanup(lua_State *L, mcp_funcgen_t *fgen) {
free(fgen->queue_list);
}

if (fgen->name_ref) {
lua_pop(L, 1); // pop the name string.
luaL_unref(L, LUA_REGISTRYINDEX, fgen->name_ref);
}

// Finally, get the rctx reference table and nil each reference to allow
// garbage collection to happen sooner on the rctx's
lua_getiuservalue(L, -1, 1);
for (int x = 0; x < fgen->total; x++) {
lua_pushnil(L);
lua_rawseti(L, -2, x+1);
}
// drop the reference table and the funcgen reference.
lua_pop(L, 2);
lua_pop(L, 1); // drop funcgen reference
}

// Must be called with the function generator at on top of stack
Expand Down Expand Up @@ -583,8 +602,11 @@ int mcplib_funcgen_ready(lua_State *L) {
}

if (lua_getfield(L, 2, "n") == LUA_TSTRING) {
fgen->name_ref = luaL_ref(L, LUA_REGISTRYINDEX);
size_t len = 0;
const char *name = lua_tolstring(L, -1, &len);
strncpy(fgen->name, name, FGEN_NAME_MAXLEN);
} else {
strncpy(fgen->name, "anonymous", FGEN_NAME_MAXLEN);
lua_pop(L, 1);
}

Expand All @@ -595,15 +617,7 @@ int mcplib_funcgen_ready(lua_State *L) {
lua_pop(L, 1); // drop extra funcgen ref.

// add us to the global state
const char *name = NULL;
if (fgen->name_ref) {
lua_rawgeti(L, LUA_REGISTRYINDEX, fgen->name_ref);
name = lua_tostring(L, -1);
lua_pop(L, 1);
} else {
name = "anonymous";
}
mcp_sharedvm_delta(fgen->thread->proxy_ctx, SHAREDVM_FGEN_IDX, name, 1);
mcp_sharedvm_delta(fgen->thread->proxy_ctx, SHAREDVM_FGEN_IDX, fgen->name, 1);

fgen->ready = true;
return 1;
Expand Down Expand Up @@ -1588,6 +1602,7 @@ int mcplib_router_new(lua_State *L) {

fgen->routecount = route_count;
memcpy(&fgen->router, &fr, sizeof(struct mcp_funcgen_router));
strncpy(fgen->name, "mcp_router", FGEN_NAME_MAXLEN);

// walk map table again, funcgen_ref everyone.
lua_createtable(L, 0, route_count);
Expand Down

0 comments on commit cf0af8b

Please sign in to comment.