diff --git a/proxy.h b/proxy.h index e25ba4b4e..972e788af 100644 --- a/proxy.h +++ b/proxy.h @@ -521,6 +521,7 @@ struct _io_pending_proxy_t { // original struct ends here mcp_rcontext_t *rctx; // pointer to request context. + mcp_resp_t *client_resp; // reference (currently pointing to a lua object) int queue_handle; // queue slot to return this result to bool ascii_multiget; // passed on from mcp_r_t union { @@ -541,7 +542,6 @@ struct _io_pending_proxy_t { struct iovec iov[2]; // request string + tail buffer int iovcnt; // 1 or 2... unsigned int iovbytes; // total bytes in the iovec - mcp_resp_t *client_resp; // reference (currently pointing to a lua object) bool flushed; // whether we've fully written this request to a backend. bool background; // dummy IO for backgrounded awaits }; @@ -608,6 +608,7 @@ io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, m // internal request interface int mcplib_internal(lua_State *L); int mcplib_internal_run(mcp_rcontext_t *rctx); +void *mcp_rcontext_internal(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_resp_t *r); // user stats interface #define MAX_USTATS_DEFAULT 1024 @@ -697,10 +698,11 @@ struct mcp_funcgen_router { #define RQUEUE_TYPE_NONE 0 #define RQUEUE_TYPE_POOL 1 -#define RQUEUE_TYPE_FGEN 2 -#define RQUEUE_TYPE_UOBJ 3 // user tracked object types past this point -#define RQUEUE_TYPE_UOBJ_REQ 4 -#define RQUEUE_TYPE_UOBJ_RES 5 +#define RQUEUE_TYPE_INT 2 +#define RQUEUE_TYPE_FGEN 3 +#define RQUEUE_TYPE_UOBJ 4 // user tracked object types past this point +#define RQUEUE_TYPE_UOBJ_REQ 5 +#define RQUEUE_TYPE_UOBJ_RES 6 #define RQUEUE_ASSIGNED (1<<0) #define RQUEUE_R_RESUME (1<<1) #define RQUEUE_R_GOOD (1<<3) diff --git a/proxy_internal.c b/proxy_internal.c index fa1366a4a..71fe0a32e 100644 --- a/proxy_internal.c +++ b/proxy_internal.c @@ -1637,30 +1637,8 @@ static void process_marithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_res /*** Lua and internal handler ***/ -int mcplib_internal(lua_State *L) { - luaL_checkudata(L, 1, "mcp.request"); - mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0); - memset(r, 0, sizeof(mcp_resp_t)); - luaL_getmetatable(L, "mcp.response"); - lua_setmetatable(L, -2); - - lua_pushinteger(L, MCP_YIELD_INTERNAL); - return lua_yield(L, 2); -} - -// we're pretending to be p_c_ascii(), but reusing our already tokenized code. -// the text parser should eventually move to the new tokenizer and we can -// merge all of this code together. -int mcplib_internal_run(mcp_rcontext_t *rctx) { - lua_State *L = rctx->Lc; - mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request"); - mcp_resp_t *r = luaL_checkudata(L, 2, "mcp.response"); - mc_resp *resp = resp_start_unlinked(rctx->c); - LIBEVENT_THREAD *t = rctx->c->thread; +static inline int _mcplib_internal_run(LIBEVENT_THREAD *t, mcp_request_t *rq, mcp_resp_t *r, mc_resp *resp) { mcp_parser_t *pr = &rq->pr; - if (resp == NULL) { - return -1; - } // TODO: meta no-op isn't handled here. haven't decided how yet. switch (rq->pr.command) { @@ -1751,6 +1729,51 @@ int mcplib_internal_run(mcp_rcontext_t *rctx) { // Always return OK from here as this is signalling an internal error. r->status = MCMC_OK; + return 0; +} + +int mcplib_internal(lua_State *L) { + luaL_checkudata(L, 1, "mcp.request"); + mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0); + memset(r, 0, sizeof(mcp_resp_t)); + luaL_getmetatable(L, "mcp.response"); + lua_setmetatable(L, -2); + + lua_pushinteger(L, MCP_YIELD_INTERNAL); + return lua_yield(L, 2); +} + +// V2 API internal handling. +void *mcp_rcontext_internal(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_resp_t *r) { + LIBEVENT_THREAD *t = rctx->fgen->thread; + mc_resp *resp = resp_start_unlinked(rctx->c); + if (resp == NULL) { + return NULL; + } + + // TODO: release resp here instead on error? + if (_mcplib_internal_run(t, rq, r, resp) != 0) { + return NULL; + } + + return resp; +} + +// we're pretending to be p_c_ascii(), but reusing our already tokenized code. +// the text parser should eventually move to the new tokenizer and we can +// merge all of this code together. +int mcplib_internal_run(mcp_rcontext_t *rctx) { + lua_State *L = rctx->Lc; + mcp_request_t *rq = luaL_checkudata(L, 1, "mcp.request"); + mcp_resp_t *r = luaL_checkudata(L, 2, "mcp.response"); + mc_resp *resp = resp_start_unlinked(rctx->c); + LIBEVENT_THREAD *t = rctx->c->thread; + if (resp == NULL) { + return -1; + } + + _mcplib_internal_run(t, rq, r, resp); + // resp object is associated with the // response object, which is about a // kilobyte. diff --git a/proxy_lua.c b/proxy_lua.c index 92e527a91..8d6dbe8ef 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -1801,6 +1801,10 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) { luaL_newmetatable(L, "mcp.funcgen"); lua_pop(L, 1); + // mt for magical null wrapper for using internal cache as backend + luaL_newmetatable(L, "mcp.internal_be"); + lua_pop(L, 1); + luaL_newlibtable(L, mcplib_f_routes); } else { // Change the extra space override for the configuration VM to just point @@ -1834,6 +1838,12 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) { luaL_newlibtable(L, mcplib_f_config); } + // Create magic empty value to pass as an internal backend. + lua_newuserdatauv(L, 1, 0); + luaL_getmetatable(L, "mcp.internal_be"); + lua_setmetatable(L, -2); + lua_setfield(L, -2, "internal_backend"); + // create main library table. //luaL_newlib(L, mcplib_f); // TODO (v2): luaL_newlibtable() just pre-allocs the exact number of things diff --git a/proxy_luafgen.c b/proxy_luafgen.c index 4e7a3a1a2..121331acd 100644 --- a/proxy_luafgen.c +++ b/proxy_luafgen.c @@ -71,7 +71,7 @@ static void mcp_rcontext_cleanup(lua_State *L, mcp_funcgen_t *fgen, mcp_rcontext // cleanup of request queue entries. recurse funcgen cleanup. for (int x = 0; x < fgen->max_queues; x++) { struct mcp_rqueue_s *rqu = &rctx->qslots[x]; - if (rqu->obj_type == RQUEUE_TYPE_POOL) { + if (rqu->obj_type == RQUEUE_TYPE_POOL || rqu->obj_type == RQUEUE_TYPE_INT) { // nothing to do. } else if (rqu->obj_type == RQUEUE_TYPE_FGEN) { // don't need to recurse, just free the subrctx. @@ -221,7 +221,7 @@ static int _mcplib_funcgen_gencall(lua_State *L) { struct mcp_rqueue_s *frqu = &fgen->queue_list[x]; struct mcp_rqueue_s *rqu = &rc->qslots[x]; rqu->obj_type = frqu->obj_type; - if (frqu->obj_type == RQUEUE_TYPE_POOL) { + if (frqu->obj_type == RQUEUE_TYPE_POOL || frqu->obj_type == RQUEUE_TYPE_INT) { rqu->obj_ref = 0; rqu->obj = frqu->obj; mcp_resp_t *r = mcp_prep_bare_resobj(L, fgen->thread); @@ -530,7 +530,7 @@ static void mcp_funcgen_cleanup(lua_State *L, mcp_funcgen_t *fgen) { if (fgen->queue_list) { for (int x = 0; x < fgen->max_queues; x++) { struct mcp_rqueue_s *rqu = &fgen->queue_list[x]; - if (rqu->obj_type == RQUEUE_TYPE_POOL) { + if (rqu->obj_type == RQUEUE_TYPE_POOL || rqu->obj_type == RQUEUE_TYPE_INT) { // just the obj_ref luaL_unref(L, LUA_REGISTRYINDEX, rqu->obj_ref); } else if (rqu->obj_type == RQUEUE_TYPE_FGEN) { @@ -641,6 +641,7 @@ int mcplib_funcgen_new_handle(lua_State *L) { mcp_funcgen_t *fgen = lua_touserdata(L, 1); mcp_pool_proxy_t *pp = NULL; mcp_funcgen_t *fg = NULL; + void *test = NULL; if (fgen->ready) { proxy_lua_error(L, "cannot modify function generator after calling ready"); @@ -649,6 +650,8 @@ int mcplib_funcgen_new_handle(lua_State *L) { if ((pp = luaL_testudata(L, 2, "mcp.pool_proxy")) != NULL) { // good. + } else if ((test = luaL_testudata(L, 2, "mcp.internal_be")) != NULL) { + // also good. } else if ((fg = luaL_testudata(L, 2, "mcp.funcgen")) != NULL) { if (fg->is_router) { proxy_lua_error(L, "cannot assign a router to a handle in new_handle"); @@ -682,6 +685,11 @@ int mcplib_funcgen_new_handle(lua_State *L) { rqu->obj_ref = luaL_ref(L, LUA_REGISTRYINDEX); rqu->obj_type = RQUEUE_TYPE_POOL; rqu->obj = pp; + } else if (test) { + // pops test from the stack + rqu->obj_ref = luaL_ref(L, LUA_REGISTRYINDEX); + rqu->obj_type = RQUEUE_TYPE_INT; + rqu->obj = test; } else { // pops the fgen from the stack. mcp_funcgen_reference(L); @@ -1151,6 +1159,44 @@ void mcp_run_rcontext_handle(mcp_rcontext_t *rctx, int handle) { p->return_cb = proxy_return_rqu_cb; p->queue_handle = handle; rctx->pending_reqs++; + } else if (rqu->obj_type == RQUEUE_TYPE_INT) { + mcp_request_t *rq = rqu->rq; + mc_resp *resp = mcp_rcontext_internal(rctx, rq, rqu->res_obj); + if (resp == NULL) { + // NOTE: This can be OOM (no resp alloc) + // or bad parse (no such command) + // we _could_ set an ERRMSG here. + mcp_resp_t *r = rqu->res_obj; + r->status = MCMC_ERR; + r->resp.code = MCMC_CODE_SERVER_ERROR; + io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, NULL, NULL, rqu->res_obj); + p->return_cb = proxy_return_rqu_cb; + p->queue_handle = handle; + p->background = true; + rctx->pending_reqs++; + } else if (resp->io_pending) { + resp->io_pending->return_cb = proxy_return_rqu_cb; + // Add io object to extstore submission queue. + io_queue_t *q = thread_io_queue_get(rctx->fgen->thread, IO_QUEUE_EXTSTORE); + io_pending_proxy_t *io = (io_pending_proxy_t *)resp->io_pending; + io->queue_handle = handle; + io->client_resp = rqu->res_obj; + + STAILQ_INSERT_TAIL(&q->stack, (io_pending_t *)io, iop_next); + + io->rctx = rctx; + io->c = rctx->c; + io->ascii_multiget = rq->ascii_multiget; + // mark the buffer into the mcp_resp for freeing later. + rqu->res_obj->buf = io->eio.buf; + rctx->pending_reqs++; + } else { + io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, NULL, NULL, rqu->res_obj); + p->return_cb = proxy_return_rqu_cb; + p->queue_handle = handle; + p->background = true; + rctx->pending_reqs++; + } } else if (rqu->obj_type == RQUEUE_TYPE_FGEN) { // TODO: NULL the ->c post-return? mcp_rcontext_t *subrctx = rqu->obj; diff --git a/t/proxyinternal.lua b/t/proxyinternal.lua index c758bf98d..13b7e9d5c 100644 --- a/t/proxyinternal.lua +++ b/t/proxyinternal.lua @@ -2,9 +2,32 @@ function mcp_config_pools() return true end --- Do specialized testing based on the key prefix. +function make_sub() + local fg = mcp.funcgen_new() + local h = fg:new_handle(mcp.internal_backend) + fg:ready({ n = "subint", f = function(rctx) + return function(r) + return rctx:enqueue_and_wait(r, h) + end + end}) + return fg +end + function mcp_config_routes(zones) - mcp.attach(mcp.CMD_ANY_STORAGE, function(r) - return mcp.internal(r) - end) + local fg = mcp.funcgen_new() + local subfg = make_sub() + local h = fg:new_handle(mcp.internal_backend) + local hsub = fg:new_handle(subfg) + fg:ready({ n = "internal", f = function(rctx) + return function(r) + local k = r:key() + if string.find(k, "^/sub/") then + return rctx:enqueue_and_wait(r, hsub) + else + return rctx:enqueue_and_wait(r, h) + end + end + end}) + + mcp.attach(mcp.CMD_ANY_STORAGE, fg) end diff --git a/t/proxyinternal.t b/t/proxyinternal.t index 3ec3a2d29..09e820934 100644 --- a/t/proxyinternal.t +++ b/t/proxyinternal.t @@ -91,6 +91,9 @@ note "ascii basic"; # non-multiget get mode. print $ps "get /b/miss\r\n"; is(scalar <$ps>, "END\r\n", "basic miss"); + print $ps "get /sub/miss\r\n"; + is(scalar <$ps>, "END\r\n", "basic subrctx miss"); + check_version($ps); } @@ -108,10 +111,18 @@ note "ascii basic"; { print $ps "set /b/foo 0 0 2\r\nhi\r\n"; is(scalar <$ps>, "STORED\r\n", "int set"); + print $ps "set /sub/foo 0 0 2\r\nhi\r\n"; + is(scalar <$ps>, "STORED\r\n", "int set"); + print $ps "get /b/foo\r\n"; is(scalar <$ps>, "VALUE /b/foo 0 2\r\n", "get response"); is(scalar <$ps>, "hi\r\n", "get value"); is(scalar <$ps>, "END\r\n", "get END"); + + print $ps "get /sub/foo\r\n"; + is(scalar <$ps>, "VALUE /sub/foo 0 2\r\n", "get response"); + is(scalar <$ps>, "hi\r\n", "get value"); + is(scalar <$ps>, "END\r\n", "get END"); check_version($ps); } @@ -149,12 +160,21 @@ subtest 'fetch from extstore' => sub { my $data = 'x' x 1000; print $ps "set /b/ext 0 0 1000\r\n$data\r\n"; is(scalar <$ps>, "STORED\r\n", "int set for extstore"); + + print $ps "set /sub/ext 0 0 1000\r\n$data\r\n"; + is(scalar <$ps>, "STORED\r\n", "int set for subrctx extstore"); + wait_ext_flush($ps); print $ps "get /b/ext\r\n"; is(scalar <$ps>, "VALUE /b/ext 0 1000\r\n", "get response from extstore"); is(scalar <$ps>, "$data\r\n", "got data from extstore"); is(scalar <$ps>, "END\r\n", "get END"); + + print $ps "get /sub/ext\r\n"; + is(scalar <$ps>, "VALUE /sub/ext 0 1000\r\n", "get response from subrctx extstore"); + is(scalar <$ps>, "$data\r\n", "got data from extstore"); + is(scalar <$ps>, "END\r\n", "get END"); }; #diag "flood memory" diff --git a/t/proxyinternal2.lua b/t/proxyinternal2.lua index bc81632b4..3e5d6c97d 100644 --- a/t/proxyinternal2.lua +++ b/t/proxyinternal2.lua @@ -2,22 +2,20 @@ function mcp_config_pools() return true end -local result_leak = {} --- Do specialized testing based on the key prefix. function mcp_config_routes(zones) - mcp.attach(mcp.CMD_ANY_STORAGE, function(r) - local cmd = r:command() - if cmd == mcp.CMD_GET or cmd == mcp.CMD_MG then - -- marking the object as will clean up its internal - -- references as soon as it drops out of scope. - -- it is an error to try to use this 'res' outside of this 'if' - -- statement! - local res = mcp.internal(r) - local res2 = mcp.internal(r) - res2:close() -- test manual closing. - -- uncomment to test effects of leaking a res obj - table.insert(result_leak, res) + local fg = mcp.funcgen_new() + local h1 = fg:new_handle(mcp.internal_backend) + local h2 = fg:new_handle(mcp.internal_backend) + fg:ready({ n = "internal", f = function(rctx) + return function(r) + -- ensure we can't leak by grabbing a result we then don't use. + local cmd = r:command() + if cmd == mcp.CMD_GET or cmd == mcp.CMD_MG then + local res1 = rctx:enqueue_and_wait(r, h1) + end + return rctx:enqueue_and_wait(r, h2) end - return mcp.internal(r) - end) + end}) + + mcp.attach(mcp.CMD_ANY_STORAGE, fg) end diff --git a/t/proxyinternal3.lua b/t/proxyinternal3.lua new file mode 100644 index 000000000..688a061aa --- /dev/null +++ b/t/proxyinternal3.lua @@ -0,0 +1,15 @@ +function mcp_config_pools() + return true +end + +function mcp_config_routes(p) + local fg = mcp.funcgen_new() + local h = fg:new_handle(mcp.internal_backend) + fg:ready({ n = "internal", f = function(rctx) + return function(r) + return rctx:enqueue_and_wait(r, h) + end + end}) + + mcp.attach(mcp.CMD_ANY_STORAGE, fg) +end