Skip to content

Commit

Permalink
proxy: config thread cron functions
Browse files Browse the repository at this point in the history
Adds a system for periodically running functions in the configuration
thread. These crons may also signal to reload the configuration after
they run. IE: you may shell out or run a module to fetch and download
new json data, then reload the configuration.

mcp.register_cron("name",
  { every = seconds, func = function()
    print("cron running")
  end })

Also accepts { rerun = false } to run the cron once after a reload. Can
be used to check something after a reload has run, or simply issue a
reload after an exact amount of time since the previous reload finished.

Crons that are not seen after a config reload are unloaded. IE: the
only crons that may run must have been registered during the last
configuration reload.

If a cron is overwriting itself, and the 'every' period has not changed,
it will "inherit" the next scheduled run time. Thus config reloads will
not interrupt the scheduling of crons that are not changing their time
schedule.

Also adds `mcp.config_reload()` which will schedule the system to reload
its configuration after the cron finishes running.
  • Loading branch information
dormando committed Apr 11, 2024
1 parent 22480de commit 8cb719a
Show file tree
Hide file tree
Showing 6 changed files with 339 additions and 45 deletions.
4 changes: 4 additions & 0 deletions proto_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,10 @@ void *proxy_init(bool use_uring, bool proxy_memprofile) {
luaL_openlibs(L);
// NOTE: might need to differentiate the libs yes?
proxy_register_libs(ctx, NULL, L);
// Create the cron table.
lua_newtable(L);
ctx->cron_ref = luaL_ref(L, LUA_REGISTRYINDEX);
ctx->cron_next = INT_MAX;

// set up the shared state VM. Used by short-lock events (counters/state)
// for global visibility.
Expand Down
11 changes: 11 additions & 0 deletions proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ typedef struct {
pthread_cond_t manager_cond;
pthread_mutex_t sharedvm_lock; // protect statevm above
globalobj_head_t manager_head; // stack for pool deallocation.
int config_generation; // counter tracking config reloads
int cron_ref; // reference to lua cron table
int cron_next; // next cron to sleep to / execute
bool worker_done; // signal variable for the worker lock/cond system.
bool worker_failed; // covered by worker_lock as well.
bool use_uring; // use IO_URING for backend connections.
Expand Down Expand Up @@ -288,6 +291,7 @@ enum mcp_backend_states {
mcp_backend_next_close, // complete current request, then close socket
};

typedef struct mcp_cron_s mcp_cron_t;
typedef struct mcp_backend_wrap_s mcp_backend_wrap_t;
typedef struct mcp_backend_label_s mcp_backend_label_t;
typedef struct mcp_backend_s mcp_backend_t;
Expand Down Expand Up @@ -333,6 +337,13 @@ struct mcp_request_s {
char request[];
};

struct mcp_cron_s {
uint32_t gen;
uint32_t next;
uint32_t every;
bool repeat;
};

typedef STAILQ_HEAD(io_head_s, _io_pending_proxy_t) io_head_t;
#define MAX_LABELLEN 512
#define MAX_NAMELEN 255
Expand Down
175 changes: 130 additions & 45 deletions proxy_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,128 @@ static void *_proxy_manager_thread(void *arg) {
return NULL;
}

static void proxy_config_reload(proxy_ctx_t *ctx) {
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "start");
STAT_INCR(ctx, config_reloads, 1);
// gen. used for tracking object lifecycles over time.
// ie: ensuring old things are unloaded.
ctx->config_generation++;
lua_State *L = ctx->proxy_state;
lua_settop(L, 0); // clear off any crud that could have been left on the stack.

// The main stages of config reload are:
// - load and execute the config file
// - run mcp_config_pools()
// - for each worker:
// - copy and execute new lua code
// - copy selector table
// - run mcp_config_routes()

if (proxy_load_config(ctx) != 0) {
// Failed to load. log and wait for a retry.
STAT_INCR(ctx, config_reload_fails, 1);
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
return;
}

// TODO (v2): create a temporary VM to test-load the worker code into.
// failing to load partway through the worker VM reloads can be
// critically bad if we're not careful about references.
// IE: the config VM _must_ hold references to selectors and backends
// as long as they exist in any worker for any reason.

for (int x = 0; x < settings.num_threads; x++) {
LIBEVENT_THREAD *thr = get_worker_thread(x);

pthread_mutex_lock(&ctx->worker_lock);
ctx->worker_done = false;
ctx->worker_failed = false;
proxy_reload_notify(thr);
while (!ctx->worker_done) {
// in case of spurious wakeup.
pthread_cond_wait(&ctx->worker_cond, &ctx->worker_lock);
}
pthread_mutex_unlock(&ctx->worker_lock);

// Code load bailed.
if (ctx->worker_failed) {
STAT_INCR(ctx, config_reload_fails, 1);
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
return;
}
}

lua_pop(ctx->proxy_state, 1); // drop config_pools return value
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "done");
}

// Very basic scheduler. Unsorted because we don't expect a huge list of
// functions to run.
static void proxy_run_crons(proxy_ctx_t *ctx) {
lua_State *L = ctx->proxy_state;
assert(lua_gettop(L) == 0);
assert(ctx->cron_ref);
struct timespec now;

// Fetch the cron table. Created on startup so must exist.
lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->cron_ref);

clock_gettime(CLOCK_REALTIME, &now);
if (ctx->cron_next <= now.tv_sec) {
ctx->cron_next = INT_MAX;
} else {
// no crons ready.
return;
}
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "cronstart");

// Loop the cron entries.
lua_pushnil(L);
while (lua_next(L, 1) != 0) {
const char *key = lua_tostring(L, -2);
mcp_cron_t *ce = lua_touserdata(L, -1);
int idx = lua_absindex(L, -1);

// check generation.
if (ctx->config_generation != ce->gen) {
// remove entry.
lua_pushnil(L);
lua_setfield(L, 1, key);
} else if (ce->next <= now.tv_sec) {
// grab func and execute it
lua_getiuservalue(L, idx, 1);
// no arguments or return values
int res = lua_pcall(L, 0, 0, 0);
if (res != LUA_OK) {
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_ERROR, NULL, lua_tostring(L, -1));
lua_pop(L, 1); // drop error.
}

if (ce->repeat) {
ce->next = now.tv_sec + ce->every;
// if rescheduled, check next against ctx. update if sooner
if (ctx->cron_next > ce->next) {
ctx->cron_next = ce->next;
}
} else {
// non-repeating cron. delete entry.
lua_pushnil(L);
lua_setfield(L, 1, key);
}
} else {
// not scheduled to run now, but check if we're next.
if (ctx->cron_next > ce->next) {
ctx->cron_next = ce->next;
}
}

lua_pop(L, 1); // drop value so we can loop.
}

lua_pop(L, 1); // drop cron table.
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "crondone");
}

// Thread handling the configuration reload sequence.
// TODO (v2): get a logger instance.
// TODO (v2): making this "safer" will require a few phases of work.
Expand All @@ -136,60 +258,23 @@ static void *_proxy_manager_thread(void *arg) {
// the old structures where marked dirty.
static void *_proxy_config_thread(void *arg) {
proxy_ctx_t *ctx = arg;
struct timespec wait = {0};

logger_create();
pthread_mutex_lock(&ctx->config_lock);
pthread_cond_signal(&ctx->config_cond);
while (1) {
ctx->loading = false;
pthread_cond_wait(&ctx->config_cond, &ctx->config_lock);
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "start");
STAT_INCR(ctx, config_reloads, 1);
lua_State *L = ctx->proxy_state;
lua_settop(L, 0); // clear off any crud that could have been left on the stack.

// The main stages of config reload are:
// - load and execute the config file
// - run mcp_config_pools()
// - for each worker:
// - copy and execute new lua code
// - copy selector table
// - run mcp_config_routes()

if (proxy_load_config(ctx) != 0) {
// Failed to load. log and wait for a retry.
STAT_INCR(ctx, config_reload_fails, 1);
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
continue;
}

// TODO (v2): create a temporary VM to test-load the worker code into.
// failing to load partway through the worker VM reloads can be
// critically bad if we're not careful about references.
// IE: the config VM _must_ hold references to selectors and backends
// as long as they exist in any worker for any reason.

for (int x = 0; x < settings.num_threads; x++) {
LIBEVENT_THREAD *thr = get_worker_thread(x);

pthread_mutex_lock(&ctx->worker_lock);
ctx->worker_done = false;
ctx->worker_failed = false;
proxy_reload_notify(thr);
while (!ctx->worker_done) {
// in case of spurious wakeup.
pthread_cond_wait(&ctx->worker_cond, &ctx->worker_lock);
}
pthread_mutex_unlock(&ctx->worker_lock);
// cron only thinks in whole seconds.
wait.tv_sec = ctx->cron_next;
pthread_cond_timedwait(&ctx->config_cond, &ctx->config_lock, &wait);

// Code load bailed.
if (ctx->worker_failed) {
STAT_INCR(ctx, config_reload_fails, 1);
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "failed");
continue;
}
proxy_run_crons(ctx);

if (ctx->loading) {
proxy_config_reload(ctx);
}
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_CONFIG, NULL, "done");
}

return NULL;
Expand Down
92 changes: 92 additions & 0 deletions proxy_lua.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,96 @@ static lua_Integer _mcplib_backend_get_waittime(lua_Number secondsf) {
return secondsi;
}

// take string, table as arg:
// name, { every =, rerun = false, func = f }
// repeat defaults to true
static int mcplib_register_cron(lua_State *L) {
proxy_ctx_t *ctx = PROXY_GET_CTX(L);
const char *name = luaL_checkstring(L, 1);
luaL_checktype(L, 2, LUA_TTABLE);

// reserve an upvalue for storing the function.
mcp_cron_t *ce = lua_newuserdatauv(L, sizeof(mcp_cron_t), 1);
memset(ce, 0, sizeof(*ce));

// default repeat.
ce->repeat = true;
// sync config generation.
ce->gen = ctx->config_generation;

if (lua_getfield(L, 2, "func") != LUA_TNIL) {
luaL_checktype(L, -1, LUA_TFUNCTION);
lua_setiuservalue(L, 3, 1); // pop value
} else {
proxy_lua_error(L, "proxy cron entry missing 'func' field");
return 0;
}

if (lua_getfield(L, 2, "rerun") != LUA_TNIL) {
int rerun = lua_toboolean(L, -1);
if (!rerun) {
ce->repeat = false;
}
}
lua_pop(L, 1); // pop val or nil

// TODO: set a limit on 'every' so we don't have to worry about
// underflows. a year? a month?
if (lua_getfield(L, 2, "every") != LUA_TNIL) {
luaL_checktype(L, -1, LUA_TNUMBER);
int every = lua_tointeger(L, -1);
if (every < 1) {
proxy_lua_error(L, "proxy cron entry 'every' must be > 0");
return 0;
}
ce->every = every;
} else {
proxy_lua_error(L, "proxy cron entry missing 'every' field");
return 0;
}
lua_pop(L, 1); // pop val or nil

// schedule the next cron run
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
ce->next = now.tv_sec + ce->every;
// we may adjust ce->next shortly, so don't update global yet.

// valid cron entry, now place into cron table.
lua_rawgeti(L, LUA_REGISTRYINDEX, ctx->cron_ref);

// first, check if a cron of this name already exists.
// if so and the 'every' field matches, inherit its 'next' field
// so we don't perpetually reschedule all crons.
if (lua_getfield(L, -1, name) != LUA_TNIL) {
mcp_cron_t *oldce = lua_touserdata(L, -1);
if (ce->every == oldce->every) {
ce->next = oldce->next;
}
}
lua_pop(L, 1); // drop val/nil

lua_pushvalue(L, 3); // duplicate cron entry
lua_setfield(L, -2, name); // pop duplicate cron entry
lua_pop(L, 1); // drop cron table

// update central cron sleep.
if (ctx->cron_next > ce->next) {
ctx->cron_next = ce->next;
}

return 0;
}

// just set ctx->loading = true
// called from config thread, so config_lock must be held, so it's safe to
// modify protected ctx contents.
static int mcplib_config_reload(lua_State *L) {
proxy_ctx_t *ctx = PROXY_GET_CTX(L);
ctx->loading = true;
return 0;
}

static int mcplib_time_real_millis(lua_State *L) {
struct timespec now;
clock_gettime(CLOCK_REALTIME, &now);
Expand Down Expand Up @@ -1526,6 +1616,8 @@ int proxy_register_libs(void *ctx, LIBEVENT_THREAD *t, void *state) {
{"tcp_keepalive", mcplib_tcp_keepalive},
{"active_req_limit", mcplib_active_req_limit},
{"buffer_memory_limit", mcplib_buffer_memory_limit},
{"config_reload", mcplib_config_reload},
{"register_cron", mcplib_register_cron},
{NULL, NULL}
};

Expand Down
Loading

0 comments on commit 8cb719a

Please sign in to comment.