Skip to content

Commit bc05e42

Browse files
committed
proxy: internal backend for V2 API
Replaces `res = mcp.internal(req)` with the standard V2 API flow. Create a handle for an fgen using `mcp.internal_backend` as an argument, then call wait against it like a normal pool. Code is incomplete but the basics work.
1 parent 83425f3 commit bc05e42

File tree

7 files changed

+139
-50
lines changed

7 files changed

+139
-50
lines changed

proxy.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,7 @@ struct _io_pending_proxy_t {
521521
// original struct ends here
522522

523523
mcp_rcontext_t *rctx; // pointer to request context.
524+
mcp_resp_t *client_resp; // reference (currently pointing to a lua object)
524525
int queue_handle; // queue slot to return this result to
525526
bool ascii_multiget; // passed on from mcp_r_t
526527
union {
@@ -541,7 +542,6 @@ struct _io_pending_proxy_t {
541542
struct iovec iov[2]; // request string + tail buffer
542543
int iovcnt; // 1 or 2...
543544
unsigned int iovbytes; // total bytes in the iovec
544-
mcp_resp_t *client_resp; // reference (currently pointing to a lua object)
545545
bool flushed; // whether we've fully written this request to a backend.
546546
bool background; // dummy IO for backgrounded awaits
547547
};
@@ -608,6 +608,7 @@ io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, m
608608
// internal request interface
609609
int mcplib_internal(lua_State *L);
610610
int mcplib_internal_run(mcp_rcontext_t *rctx);
611+
void *mcp_rcontext_internal(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_resp_t *r);
611612

612613
// user stats interface
613614
#define MAX_USTATS_DEFAULT 1024
@@ -697,10 +698,11 @@ struct mcp_funcgen_router {
697698

698699
#define RQUEUE_TYPE_NONE 0
699700
#define RQUEUE_TYPE_POOL 1
700-
#define RQUEUE_TYPE_FGEN 2
701-
#define RQUEUE_TYPE_UOBJ 3 // user tracked object types past this point
702-
#define RQUEUE_TYPE_UOBJ_REQ 4
703-
#define RQUEUE_TYPE_UOBJ_RES 5
701+
#define RQUEUE_TYPE_INT 2
702+
#define RQUEUE_TYPE_FGEN 3
703+
#define RQUEUE_TYPE_UOBJ 4 // user tracked object types past this point
704+
#define RQUEUE_TYPE_UOBJ_REQ 5
705+
#define RQUEUE_TYPE_UOBJ_RES 6
704706
#define RQUEUE_ASSIGNED (1<<0)
705707
#define RQUEUE_R_RESUME (1<<1)
706708
#define RQUEUE_R_GOOD (1<<3)

proxy_internal.c

Lines changed: 46 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1637,30 +1637,8 @@ static void process_marithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_res
16371637

16381638
/*** Lua and internal handler ***/
16391639

1640-
int mcplib_internal(lua_State *L) {
1641-
luaL_checkudata(L, 1, "mcp.request");
1642-
mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0);
1643-
memset(r, 0, sizeof(mcp_resp_t));
1644-
luaL_getmetatable(L, "mcp.response");
1645-
lua_setmetatable(L, -2);
1646-
1647-
lua_pushinteger(L, MCP_YIELD_INTERNAL);
1648-
return lua_yield(L, 2);
1649-
}
1650-
1651-
// we're pretending to be p_c_ascii(), but reusing our already tokenized code.
1652-
// the text parser should eventually move to the new tokenizer and we can
1653-
// merge all of this code together.
1654-
int mcplib_internal_run(mcp_rcontext_t *rctx) {
1655-
lua_State *L = rctx->Lc;
1656-
mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
1657-
mcp_resp_t *r = luaL_checkudata(L, 2, "mcp.response");
1658-
mc_resp *resp = resp_start_unlinked(rctx->c);
1659-
LIBEVENT_THREAD *t = rctx->c->thread;
1640+
static inline int _mcplib_internal_run(LIBEVENT_THREAD *t, mcp_request_t *rq, mcp_resp_t *r, mc_resp *resp) {
16601641
mcp_parser_t *pr = &rq->pr;
1661-
if (resp == NULL) {
1662-
return -1;
1663-
}
16641642

16651643
// TODO: meta no-op isn't handled here. haven't decided how yet.
16661644
switch (rq->pr.command) {
@@ -1751,6 +1729,51 @@ int mcplib_internal_run(mcp_rcontext_t *rctx) {
17511729
// Always return OK from here as this is signalling an internal error.
17521730
r->status = MCMC_OK;
17531731

1732+
return 0;
1733+
}
1734+
1735+
int mcplib_internal(lua_State *L) {
1736+
luaL_checkudata(L, 1, "mcp.request");
1737+
mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0);
1738+
memset(r, 0, sizeof(mcp_resp_t));
1739+
luaL_getmetatable(L, "mcp.response");
1740+
lua_setmetatable(L, -2);
1741+
1742+
lua_pushinteger(L, MCP_YIELD_INTERNAL);
1743+
return lua_yield(L, 2);
1744+
}
1745+
1746+
// V2 API internal handling.
1747+
void *mcp_rcontext_internal(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_resp_t *r) {
1748+
LIBEVENT_THREAD *t = rctx->fgen->thread;
1749+
mc_resp *resp = resp_start_unlinked(rctx->c);
1750+
if (resp == NULL) {
1751+
return NULL;
1752+
}
1753+
1754+
// TODO: release resp here instead on error?
1755+
if (_mcplib_internal_run(t, rq, r, resp) != 0) {
1756+
return NULL;
1757+
}
1758+
1759+
return resp;
1760+
}
1761+
1762+
// we're pretending to be p_c_ascii(), but reusing our already tokenized code.
1763+
// the text parser should eventually move to the new tokenizer and we can
1764+
// merge all of this code together.
1765+
int mcplib_internal_run(mcp_rcontext_t *rctx) {
1766+
lua_State *L = rctx->Lc;
1767+
mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request");
1768+
mcp_resp_t *r = luaL_checkudata(L, 2, "mcp.response");
1769+
mc_resp *resp = resp_start_unlinked(rctx->c);
1770+
LIBEVENT_THREAD *t = rctx->c->thread;
1771+
if (resp == NULL) {
1772+
return -1;
1773+
}
1774+
1775+
_mcplib_internal_run(t, rq, r, resp);
1776+
17541777
// resp object is associated with the
17551778
// response object, which is about a
17561779
// kilobyte.

proxy_lua.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1801,6 +1801,10 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
18011801
luaL_newmetatable(L, "mcp.funcgen");
18021802
lua_pop(L, 1);
18031803

1804+
// mt for magical null wrapper for using internal cache as backend
1805+
luaL_newmetatable(L, "mcp.internal_be");
1806+
lua_pop(L, 1);
1807+
18041808
luaL_newlibtable(L, mcplib_f_routes);
18051809
} else {
18061810
// Change the extra space override for the configuration VM to just point
@@ -1834,6 +1838,12 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
18341838
luaL_newlibtable(L, mcplib_f_config);
18351839
}
18361840

1841+
// Create magic empty value to pass as an internal backend.
1842+
lua_newuserdatauv(L, 1, 0);
1843+
luaL_getmetatable(L, "mcp.internal_be");
1844+
lua_setmetatable(L, -2);
1845+
lua_setfield(L, -2, "internal_backend");
1846+
18371847
// create main library table.
18381848
//luaL_newlib(L, mcplib_f);
18391849
// TODO (v2): luaL_newlibtable() just pre-allocs the exact number of things

proxy_luafgen.c

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ static void mcp_rcontext_cleanup(lua_State *L, mcp_funcgen_t *fgen, mcp_rcontext
7171
// cleanup of request queue entries. recurse funcgen cleanup.
7272
for (int x = 0; x < fgen->max_queues; x++) {
7373
struct mcp_rqueue_s *rqu = &rctx->qslots[x];
74-
if (rqu->obj_type == RQUEUE_TYPE_POOL) {
74+
if (rqu->obj_type == RQUEUE_TYPE_POOL || rqu->obj_type == RQUEUE_TYPE_INT) {
7575
// nothing to do.
7676
} else if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
7777
// don't need to recurse, just free the subrctx.
@@ -221,7 +221,7 @@ static int _mcplib_funcgen_gencall(lua_State *L) {
221221
struct mcp_rqueue_s *frqu = &fgen->queue_list[x];
222222
struct mcp_rqueue_s *rqu = &rc->qslots[x];
223223
rqu->obj_type = frqu->obj_type;
224-
if (frqu->obj_type == RQUEUE_TYPE_POOL) {
224+
if (frqu->obj_type == RQUEUE_TYPE_POOL || frqu->obj_type == RQUEUE_TYPE_INT) {
225225
rqu->obj_ref = 0;
226226
rqu->obj = frqu->obj;
227227
mcp_resp_t *r = mcp_prep_bare_resobj(L, fgen->thread);
@@ -641,6 +641,7 @@ int mcplib_funcgen_new_handle(lua_State *L) {
641641
mcp_funcgen_t *fgen = lua_touserdata(L, 1);
642642
mcp_pool_proxy_t *pp = NULL;
643643
mcp_funcgen_t *fg = NULL;
644+
void *test = NULL;
644645

645646
if (fgen->ready) {
646647
proxy_lua_error(L, "cannot modify function generator after calling ready");
@@ -649,6 +650,8 @@ int mcplib_funcgen_new_handle(lua_State *L) {
649650

650651
if ((pp = luaL_testudata(L, 2, "mcp.pool_proxy")) != NULL) {
651652
// good.
653+
} else if ((test = luaL_testudata(L, 2, "mcp.internal_be")) != NULL) {
654+
// also good.
652655
} else if ((fg = luaL_testudata(L, 2, "mcp.funcgen")) != NULL) {
653656
if (fg->is_router) {
654657
proxy_lua_error(L, "cannot assign a router to a handle in new_handle");
@@ -682,6 +685,11 @@ int mcplib_funcgen_new_handle(lua_State *L) {
682685
rqu->obj_ref = luaL_ref(L, LUA_REGISTRYINDEX);
683686
rqu->obj_type = RQUEUE_TYPE_POOL;
684687
rqu->obj = pp;
688+
} else if (test) {
689+
// pops test from the stack
690+
rqu->obj_ref = luaL_ref(L, LUA_REGISTRYINDEX);
691+
rqu->obj_type = RQUEUE_TYPE_INT;
692+
rqu->obj = test;
685693
} else {
686694
// pops the fgen from the stack.
687695
mcp_funcgen_reference(L);
@@ -1151,6 +1159,34 @@ void mcp_run_rcontext_handle(mcp_rcontext_t *rctx, int handle) {
11511159
p->return_cb = proxy_return_rqu_cb;
11521160
p->queue_handle = handle;
11531161
rctx->pending_reqs++;
1162+
} else if (rqu->obj_type == RQUEUE_TYPE_INT) {
1163+
mcp_request_t *rq = rqu->rq;
1164+
mc_resp *resp = mcp_rcontext_internal(rctx, rq, rqu->res_obj);
1165+
if (resp == NULL) {
1166+
// FIXME: error handling.
1167+
} else if (resp->io_pending) {
1168+
resp->io_pending->return_cb = proxy_return_rqu_cb;
1169+
// Add io object to extstore submission queue.
1170+
io_queue_t *q = thread_io_queue_get(rctx->fgen->thread, IO_QUEUE_EXTSTORE);
1171+
io_pending_proxy_t *io = (io_pending_proxy_t *)resp->io_pending;
1172+
io->queue_handle = handle;
1173+
io->client_resp = rqu->res_obj;
1174+
1175+
STAILQ_INSERT_TAIL(&q->stack, (io_pending_t *)io, iop_next);
1176+
1177+
io->rctx = rctx;
1178+
io->c = rctx->c;
1179+
io->ascii_multiget = rq->ascii_multiget;
1180+
// mark the buffer into the mcp_resp for freeing later.
1181+
rqu->res_obj->buf = io->eio.buf;
1182+
rctx->pending_reqs++;
1183+
} else {
1184+
io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, NULL, NULL, rqu->res_obj);
1185+
p->return_cb = proxy_return_rqu_cb;
1186+
p->queue_handle = handle;
1187+
p->background = true;
1188+
rctx->pending_reqs++;
1189+
}
11541190
} else if (rqu->obj_type == RQUEUE_TYPE_FGEN) {
11551191
// TODO: NULL the ->c post-return?
11561192
mcp_rcontext_t *subrctx = rqu->obj;

t/proxyinternal.lua

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,14 @@ function mcp_config_pools()
22
return true
33
end
44

5-
-- Do specialized testing based on the key prefix.
65
function mcp_config_routes(zones)
7-
mcp.attach(mcp.CMD_ANY_STORAGE, function(r)
8-
return mcp.internal(r)
9-
end)
6+
local fg = mcp.funcgen_new()
7+
local h = fg:new_handle(mcp.internal_backend)
8+
fg:ready({ n = "internal", f = function(rctx)
9+
return function(r)
10+
return rctx:enqueue_and_wait(r, h)
11+
end
12+
end})
13+
14+
mcp.attach(mcp.CMD_ANY_STORAGE, fg)
1015
end

t/proxyinternal2.lua

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,20 @@ function mcp_config_pools()
22
return true
33
end
44

5-
local result_leak = {}
6-
-- Do specialized testing based on the key prefix.
75
function mcp_config_routes(zones)
8-
mcp.attach(mcp.CMD_ANY_STORAGE, function(r)
9-
local cmd = r:command()
10-
if cmd == mcp.CMD_GET or cmd == mcp.CMD_MG then
11-
-- marking the object as <close> will clean up its internal
12-
-- references as soon as it drops out of scope.
13-
-- it is an error to try to use this 'res' outside of this 'if'
14-
-- statement!
15-
local res <close> = mcp.internal(r)
16-
local res2 = mcp.internal(r)
17-
res2:close() -- test manual closing.
18-
-- uncomment to test effects of leaking a res obj
19-
table.insert(result_leak, res)
6+
local fg = mcp.funcgen_new()
7+
local h1 = fg:new_handle(mcp.internal_backend)
8+
local h2 = fg:new_handle(mcp.internal_backend)
9+
fg:ready({ n = "internal", f = function(rctx)
10+
return function(r)
11+
-- ensure we can't leak by grabbing a result we then don't use.
12+
local cmd = r:command()
13+
if cmd == mcp.CMD_GET or cmd == mcp.CMD_MG then
14+
local res1 = rctx:enqueue_and_wait(r, h1)
15+
end
16+
return rctx:enqueue_and_wait(r, h2)
2017
end
21-
return mcp.internal(r)
22-
end)
18+
end})
19+
20+
mcp.attach(mcp.CMD_ANY_STORAGE, fg)
2321
end

t/proxyinternal3.lua

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
function mcp_config_pools()
2+
return true
3+
end
4+
5+
function mcp_config_routes(p)
6+
local fg = mcp.funcgen_new()
7+
local h = fg:new_handle(mcp.internal_backend)
8+
fg:ready({ n = "internal", f = function(rctx)
9+
return function(r)
10+
return rctx:enqueue_and_wait(r, h)
11+
end
12+
end})
13+
14+
mcp.attach(mcp.CMD_ANY_STORAGE, fg)
15+
end

0 commit comments

Comments
 (0)