diff --git a/proto_proxy.c b/proto_proxy.c index 20ac55ebd7..857d78b607 100644 --- a/proto_proxy.c +++ b/proto_proxy.c @@ -331,6 +331,10 @@ void *proxy_init(bool use_uring, bool proxy_memprofile) { luaL_openlibs(L); // NOTE: might need to differentiate the libs yes? proxy_register_libs(ctx, NULL, L); + // Create the cron table. + lua_newtable(L); + ctx->cron_ref = luaL_ref(L, LUA_REGISTRYINDEX); + ctx->cron_next = INT_MAX; // set up the shared state VM. Used by short-lock events (counters/state) // for global visibility. diff --git a/proxy.h b/proxy.h index 1c50a0c262..1f0fb6ce20 100644 --- a/proxy.h +++ b/proxy.h @@ -233,6 +233,9 @@ typedef struct { pthread_cond_t manager_cond; pthread_mutex_t sharedvm_lock; // protect statevm above globalobj_head_t manager_head; // stack for pool deallocation. + int config_generation; // counter tracking config reloads + int cron_ref; // reference to lua cron table + int cron_next; // next cron to sleep to / execute bool worker_done; // signal variable for the worker lock/cond system. bool worker_failed; // covered by worker_lock as well. bool use_uring; // use IO_URING for backend connections. @@ -288,6 +291,7 @@ enum mcp_backend_states { mcp_backend_next_close, // complete current request, then close socket }; +typedef struct mcp_cron_s mcp_cron_t; typedef struct mcp_backend_wrap_s mcp_backend_wrap_t; typedef struct mcp_backend_label_s mcp_backend_label_t; typedef struct mcp_backend_s mcp_backend_t; @@ -333,6 +337,13 @@ struct mcp_request_s { char request[]; }; +struct mcp_cron_s { + uint32_t gen; + uint32_t next; + uint32_t every; + bool repeat; +}; + typedef STAILQ_HEAD(io_head_s, _io_pending_proxy_t) io_head_t; #define MAX_LABELLEN 512 #define MAX_NAMELEN 255 diff --git a/proxy_config.c b/proxy_config.c index ce462ce339..ae8d5ad941 100644 --- a/proxy_config.c +++ b/proxy_config.c @@ -123,6 +123,128 @@ static void *_proxy_manager_thread(void *arg) { return NULL; } +static void proxy_config_reload(proxy_ctx_t *ctx) { + LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "start"); + STAT_INCR(ctx, config_reloads, 1); + // gen. used for tracking object lifecycles over time. + // ie: ensuring old things are unloaded. + ctx->config_generation++; + lua_State *L = ctx->proxy_state; + lua_settop(L, 0); // clear off any crud that could have been left on the stack. + + // The main stages of config reload are: + // - load and execute the config file + // - run mcp_config_pools() + // - for each worker: + // - copy and execute new lua code + // - copy selector table + // - run mcp_config_routes() + + if (proxy_load_config(ctx) != 0) { + // Failed to load. log and wait for a retry. + STAT_INCR(ctx, config_reload_fails, 1); + LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed"); + return; + } + + // TODO (v2): create a temporary VM to test-load the worker code into. + // failing to load partway through the worker VM reloads can be + // critically bad if we're not careful about references. + // IE: the config VM _must_ hold references to selectors and backends + // as long as they exist in any worker for any reason. + + for (int x = 0; x < settings.num_threads; x++) { + LIBEVENT_THREAD *thr = get_worker_thread(x); + + pthread_mutex_lock(&ctx->worker_lock); + ctx->worker_done = false; + ctx->worker_failed = false; + proxy_reload_notify(thr); + while (!ctx->worker_done) { + // in case of spurious wakeup. + pthread_cond_wait(&ctx->worker_cond, &ctx->worker_lock); + } + pthread_mutex_unlock(&ctx->worker_lock); + + // Code load bailed. + if (ctx->worker_failed) { + STAT_INCR(ctx, config_reload_fails, 1); + LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed"); + return; + } + } + + lua_pop(ctx->proxy_state, 1); // drop config_pools return value + LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "done"); +} + +// Very basic scheduler. Unsorted because we don't expect a huge list of +// functions to run. +static void proxy_run_crons(proxy_ctx_t *ctx) { + lua_State *L = ctx->proxy_state; + assert(lua_gettop(L) == 0); + assert(ctx->cron_ref); + struct timespec now; + + // Fetch the cron table. Created on startup so must exist. + lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->cron_ref); + + clock_gettime(CLOCK_REALTIME, &now); + if (ctx->cron_next <= now.tv_sec) { + ctx->cron_next = INT_MAX; + } else { + // no crons ready. + return; + } + LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "cronstart"); + + // Loop the cron entries. + lua_pushnil(L); + while (lua_next(L, 1) != 0) { + const char *key = lua_tostring(L, -2); + mcp_cron_t *ce = lua_touserdata(L, -1); + int idx = lua_absindex(L, -1); + + // check generation. + if (ctx->config_generation != ce->gen) { + // remove entry. + lua_pushnil(L); + lua_setfield(L, 1, key); + } else if (ce->next <= now.tv_sec) { + // grab func and execute it + lua_getiuservalue(L, idx, 1); + // no arguments or return values + int res = lua_pcall(L, 0, 0, 0); + if (res != LUA_OK) { + LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(L, -1)); + lua_pop(L, 1); // drop error. + } + + if (ce->repeat) { + ce->next = now.tv_sec + ce->every; + // if rescheduled, check next against ctx. update if sooner + if (ctx->cron_next > ce->next) { + ctx->cron_next = ce->next; + } + } else { + // non-repeating cron. delete entry. + lua_pushnil(L); + lua_setfield(L, 1, key); + } + } else { + // not scheduled to run now, but check if we're next. + if (ctx->cron_next > ce->next) { + ctx->cron_next = ce->next; + } + } + + lua_pop(L, 1); // drop value so we can loop. + } + + lua_pop(L, 1); // drop cron table. + LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "crondone"); +} + // Thread handling the configuration reload sequence. // TODO (v2): get a logger instance. // TODO (v2): making this "safer" will require a few phases of work. @@ -136,60 +258,23 @@ static void *_proxy_manager_thread(void *arg) { // the old structures where marked dirty. static void *_proxy_config_thread(void *arg) { proxy_ctx_t *ctx = arg; + struct timespec wait = {0}; logger_create(); pthread_mutex_lock(&ctx->config_lock); pthread_cond_signal(&ctx->config_cond); while (1) { ctx->loading = false; - pthread_cond_wait(&ctx->config_cond, &ctx->config_lock); - LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "start"); - STAT_INCR(ctx, config_reloads, 1); - lua_State *L = ctx->proxy_state; - lua_settop(L, 0); // clear off any crud that could have been left on the stack. - - // The main stages of config reload are: - // - load and execute the config file - // - run mcp_config_pools() - // - for each worker: - // - copy and execute new lua code - // - copy selector table - // - run mcp_config_routes() - - if (proxy_load_config(ctx) != 0) { - // Failed to load. log and wait for a retry. - STAT_INCR(ctx, config_reload_fails, 1); - LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed"); - continue; - } - // TODO (v2): create a temporary VM to test-load the worker code into. - // failing to load partway through the worker VM reloads can be - // critically bad if we're not careful about references. - // IE: the config VM _must_ hold references to selectors and backends - // as long as they exist in any worker for any reason. - - for (int x = 0; x < settings.num_threads; x++) { - LIBEVENT_THREAD *thr = get_worker_thread(x); - - pthread_mutex_lock(&ctx->worker_lock); - ctx->worker_done = false; - ctx->worker_failed = false; - proxy_reload_notify(thr); - while (!ctx->worker_done) { - // in case of spurious wakeup. - pthread_cond_wait(&ctx->worker_cond, &ctx->worker_lock); - } - pthread_mutex_unlock(&ctx->worker_lock); + // cron only thinks in whole seconds. + wait.tv_sec = ctx->cron_next; + pthread_cond_timedwait(&ctx->config_cond, &ctx->config_lock, &wait); - // Code load bailed. - if (ctx->worker_failed) { - STAT_INCR(ctx, config_reload_fails, 1); - LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed"); - continue; - } + proxy_run_crons(ctx); + + if (ctx->loading) { + proxy_config_reload(ctx); } - LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "done"); } return NULL; diff --git a/proxy_lua.c b/proxy_lua.c index c69a181087..f545a73b79 100644 --- a/proxy_lua.c +++ b/proxy_lua.c @@ -26,6 +26,96 @@ static lua_Integer _mcplib_backend_get_waittime(lua_Number secondsf) { return secondsi; } +// take string, table as arg: +// name, { every =, rerun = false, func = f } +// repeat defaults to true +static int mcplib_register_cron(lua_State *L) { + proxy_ctx_t *ctx = PROXY_GET_CTX(L); + const char *name = luaL_checkstring(L, 1); + luaL_checktype(L, 2, LUA_TTABLE); + + // reserve an upvalue for storing the function. + mcp_cron_t *ce = lua_newuserdatauv(L, sizeof(mcp_cron_t), 1); + memset(ce, 0, sizeof(*ce)); + + // default repeat. + ce->repeat = true; + // sync config generation. + ce->gen = ctx->config_generation; + + if (lua_getfield(L, 2, "func") != LUA_TNIL) { + luaL_checktype(L, -1, LUA_TFUNCTION); + lua_setiuservalue(L, 3, 1); // pop value + } else { + proxy_lua_error(L, "proxy cron entry missing 'func' field"); + return 0; + } + + if (lua_getfield(L, 2, "rerun") != LUA_TNIL) { + int rerun = lua_toboolean(L, -1); + if (!rerun) { + ce->repeat = false; + } + } + lua_pop(L, 1); // pop val or nil + + // TODO: set a limit on 'every' so we don't have to worry about + // underflows. a year? a month? + if (lua_getfield(L, 2, "every") != LUA_TNIL) { + luaL_checktype(L, -1, LUA_TNUMBER); + int every = lua_tointeger(L, -1); + if (every < 1) { + proxy_lua_error(L, "proxy cron entry 'every' must be > 0"); + return 0; + } + ce->every = every; + } else { + proxy_lua_error(L, "proxy cron entry missing 'every' field"); + return 0; + } + lua_pop(L, 1); // pop val or nil + + // schedule the next cron run + struct timespec now; + clock_gettime(CLOCK_REALTIME, &now); + ce->next = now.tv_sec + ce->every; + // we may adjust ce->next shortly, so don't update global yet. + + // valid cron entry, now place into cron table. + lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->cron_ref); + + // first, check if a cron of this name already exists. + // if so and the 'every' field matches, inherit its 'next' field + // so we don't perpetually reschedule all crons. + if (lua_getfield(L, -1, name) != LUA_TNIL) { + mcp_cron_t *oldce = lua_touserdata(L, -1); + if (ce->every == oldce->every) { + ce->next = oldce->next; + } + } + lua_pop(L, 1); // drop val/nil + + lua_pushvalue(L, 3); // duplicate cron entry + lua_setfield(L, -2, name); // pop duplicate cron entry + lua_pop(L, 1); // drop cron table + + // update central cron sleep. + if (ctx->cron_next > ce->next) { + ctx->cron_next = ce->next; + } + + return 0; +} + +// just set ctx->loading = true +// called from config thread, so config_lock must be held, so it's safe to +// modify protected ctx contents. +static int mcplib_config_reload(lua_State *L) { + proxy_ctx_t *ctx = PROXY_GET_CTX(L); + ctx->loading = true; + return 0; +} + static int mcplib_time_real_millis(lua_State *L) { struct timespec now; clock_gettime(CLOCK_REALTIME, &now); @@ -1526,6 +1616,8 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) { {"tcp_keepalive", mcplib_tcp_keepalive}, {"active_req_limit", mcplib_active_req_limit}, {"buffer_memory_limit", mcplib_buffer_memory_limit}, + {"config_reload", mcplib_config_reload}, + {"register_cron", mcplib_register_cron}, {NULL, NULL} }; diff --git a/t/proxycron.lua b/t/proxycron.lua new file mode 100644 index 0000000000..1a5b5ed53d --- /dev/null +++ b/t/proxycron.lua @@ -0,0 +1,54 @@ +function set_crons() + mcp.register_cron("foo", + { every = 2, func = function() + foo_run = 1 + end }) + + mcp.register_cron("reload", + { every = 3, func = function() + if foo_run and once_run then + mcp.config_reload() + end + end }) + + -- will run once per reload. + mcp.register_cron("once", + { every = 1, rerun = false, func = function() + once_run = 1 + end }) +end + +function set_crons2() + mcp.register_cron("bar", + { every = 2, func = function() + bar_run = 1 + end }) + + mcp.register_cron("reload", + { every = 3, func = function() + -- ensure the old crons didn't also run. + if bar_run and not foo_run and once_again and not once_run then + mcp.config_reload() + end + end }) + + -- will run once per reload. + mcp.register_cron("onceagain", + { every = 3, rerun = false, func = function() + once_again = 1 + end }) +end + +function mcp_config_pools() + if foo_run == nil then + set_crons() + else + foo_run = nil + once_run = nil + set_crons2() + end +end + +function mcp_config_routes() + -- do nothing. +end diff --git a/t/proxycron.t b/t/proxycron.t new file mode 100644 index 0000000000..f20fafc9fe --- /dev/null +++ b/t/proxycron.t @@ -0,0 +1,48 @@ +#!/usr/bin/env perl +# Basic testing of cron functionality. + +use strict; +use warnings; +use Test::More; +use FindBin qw($Bin); +use lib "$Bin/lib"; +use Carp qw(croak); +use MemcachedTest; +use IO::Socket qw(AF_INET SOCK_STREAM); +use IO::Select; +use Data::Dumper qw/Dumper/; + +if (!supports_proxy()) { + plan skip_all => 'proxy not enabled'; + exit 0; +} + +my $p_srv = new_memcached('-o proxy_config=./t/proxycron.lua -t 1'); +my $ps = $p_srv->sock; +$ps->autoflush(1); + +sub wait_reload { + my $w = shift; + while (my $line = <$w>) { + if ($line =~ m/type=proxy_conf status=done/) { + last; + } elsif ($line =~ m/status=cronstart/) { + pass("cron started"); + } elsif ($line =~ m/status=crondone/) { + pass("cron done"); + } + } + pass("reload complete"); +} + +# The crons will do some fiddling then issue a self-reload twice. +subtest 'wait for reload' => sub { + my $w = $p_srv->new_sock; + print $w "watch proxyevents\n"; + is(<$w>, "OK\r\n", "watcher enabled"); + + wait_reload($w); + wait_reload($w); +}; + +done_testing();