Skip to content

Commit

Permalink
proxy: stats proxybe to show bad backends
Browse files Browse the repository at this point in the history
stats command intended for backend state information.

so far just outputs "bad_label" [count] for each backend currently
marked bad. The count is the number of backend objects marked bad.

In "per worker thread mode" (the default), if all workers recognize the
backend as down, the count will equal the number of worker threads. If
the IO thread is in use, the count will be 1.

If a backend has normal status, it is not included in the output.

Also fixes a bug with reconnects:

- If a remote server is listening for new connections but not accepting
  them, the backend will fail in readvalidate and never get marked bad.
- Now we only reset the fail counter and remove bad status if a backend
  is fully validated.
  • Loading branch information
dormando committed Mar 29, 2024
1 parent 53f6650 commit ea66fe0
Show file tree
Hide file tree
Showing 8 changed files with 173 additions and 8 deletions.
46 changes: 44 additions & 2 deletions proto_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,39 @@ void process_proxy_funcstats(void *arg, ADD_STAT add_stats, conn *c) {
snprintf(key_str, STAT_KEY_LEN-1, "slots_%s", name);
APPEND_STAT(key_str, "%d", slots);
} else {
// TODO: Is it safe to delete keys in the middle here?
// not worried at all about just leaking memory here.
// TODO: It is safe to delete keys here. Slightly complex so low
// priority.
}
}

pthread_mutex_unlock(&ctx->sharedvm_lock);
}

void process_proxy_bestats(void *arg, ADD_STAT add_stats, conn *c) {
char key_str[STAT_KEY_LEN];
if (!arg) {
return;
}
proxy_ctx_t *ctx = arg;
lua_State *L = ctx->proxy_sharedvm;
pthread_mutex_lock(&ctx->sharedvm_lock);

// iterate all of the listed backends
lua_pushnil(L);
while (lua_next(L, SHAREDVM_BACKEND_IDX) != 0) {
int n = lua_tointeger(L, -1);
lua_pop(L, 1); // drop the value, leave the key.
if (n != 0) {
// now grab the name key.
const char *name = lua_tostring(L, -1);
snprintf(key_str, STAT_KEY_LEN-1, "bad_%s", name);
APPEND_STAT(key_str, "%d", n);
} else {
// delete keys of backends that are no longer bad or no longer
// exist to keep the table small.
const char *name = lua_tostring(L, -1);
lua_pushnil(L);
lua_setfield(L, SHAREDVM_BACKEND_IDX, name);
}
}

Expand Down Expand Up @@ -310,6 +341,7 @@ void *proxy_init(bool use_uring, bool proxy_memprofile) {
// constantly fetch them from registry.
lua_newtable(ctx->proxy_sharedvm); // fgen count
lua_newtable(ctx->proxy_sharedvm); // fgen slot count
lua_newtable(ctx->proxy_sharedvm); // backend down status

// Create/start the IO thread, which we need before servers
// start getting created.
Expand Down Expand Up @@ -1305,6 +1337,16 @@ void mcp_sharedvm_delta(proxy_ctx_t *ctx, int tidx, const char *name, int delta)
pthread_mutex_unlock(&ctx->sharedvm_lock);
}

// Drops the entry keyed by `name` from the shared VM table that lives at
// stack index `tidx` (one of the SHAREDVM_*_IDX slots).
// The shared VM lock is held while the Lua state is touched.
void mcp_sharedvm_remove(proxy_ctx_t *ctx, int tidx, const char *name) {
    lua_State *vm = ctx->proxy_sharedvm;

    pthread_mutex_lock(&ctx->sharedvm_lock);
    // Assigning nil to a field is how a key is removed from a Lua table.
    lua_pushnil(vm);
    lua_setfield(vm, tidx, name);
    pthread_mutex_unlock(&ctx->sharedvm_lock);
}

static void *mcp_profile_alloc(void *ud, void *ptr, size_t osize,
size_t nsize) {
struct mcp_memprofile *prof = ud;
Expand Down
1 change: 1 addition & 0 deletions proto_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
void proxy_stats(void *arg, ADD_STAT add_stats, conn *c);
void process_proxy_stats(void *arg, ADD_STAT add_stats, conn *c);
void process_proxy_funcstats(void *arg, ADD_STAT add_stats, conn *c);
void process_proxy_bestats(void *arg, ADD_STAT add_stats, conn *c);

/* proxy mode handlers */
int try_read_command_proxy(conn *c);
Expand Down
2 changes: 2 additions & 0 deletions proto_text.c
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,8 @@ static void process_stat(conn *c, token_t *tokens, const size_t ntokens) {
process_proxy_stats(settings.proxy_ctx, &append_stats, c);
} else if (strcmp(subcommand, "proxyfuncs") == 0) {
process_proxy_funcstats(settings.proxy_ctx, &append_stats, c);
} else if (strcmp(subcommand, "proxybe") == 0) {
process_proxy_bestats(settings.proxy_ctx, &append_stats, c);
#endif
} else {
/* getting here means that the subcommand is either engine specific or
Expand Down
2 changes: 2 additions & 0 deletions proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ struct mcp_memprofile {

#define SHAREDVM_FGEN_IDX 1
#define SHAREDVM_FGENSLOT_IDX 2
#define SHAREDVM_BACKEND_IDX 3

// all possible commands.
#define CMD_FIELDS \
Expand Down Expand Up @@ -556,6 +557,7 @@ typedef struct {
// utils
bool proxy_bufmem_checkadd(LIBEVENT_THREAD *t, int len);
void mcp_sharedvm_delta(proxy_ctx_t *ctx, int tidx, const char *name, int delta);
void mcp_sharedvm_remove(proxy_ctx_t *ctx, int tidx, const char *name);

// networking interface
void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct event_base *base);
Expand Down
28 changes: 23 additions & 5 deletions proxy_network.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,13 @@ static int _proxy_beconn_checkconnect(struct mcp_backendconn_s *be) {
_reset_bad_backend(be, P_BE_FAIL_CONNECTING);
return -1;
}
P_DEBUG("%s: backend connected (%s:%s)\n", __func__, be->be_parent->name, be->be_parent->port);
P_DEBUG("%s: backend connected [fd: %d] (%s:%s)\n", __func__, mcmc_fd(be->client), be->be_parent->name, be->be_parent->port);
be->connecting = false;
be->state = mcp_backend_read;
be->bad = false;

// seed the failure time for the flap check.
gettimeofday(&be->last_failed, NULL);
be->depth = 0; // was set to INT_MAX if bad, need to reset.
be->failed_count = 0;

be->validating = true;
// TODO: make validation optional.
Expand Down Expand Up @@ -197,6 +196,11 @@ static void _cleanup_backend(mcp_backend_t *be) {
assert(STAILQ_EMPTY(&bec->io_head));

mcmc_disconnect(bec->client);

if (bec->bad) {
mcp_sharedvm_delta(bec->event_thread->ctx, SHAREDVM_BACKEND_IDX,
bec->be_parent->label, -1);
}
}
// - free be->client
free(bec->client);
Expand Down Expand Up @@ -773,6 +777,8 @@ static void _backend_reschedule(struct mcp_backendconn_s *be) {
if (!be->bad) {
P_DEBUG("%s: marking backend as bad\n", __func__);
STAT_INCR(be->event_thread->ctx, backend_marked_bad, 1);
mcp_sharedvm_delta(be->event_thread->ctx, SHAREDVM_BACKEND_IDX,
be->be_parent->label, 1);
LOGGER_LOG(NULL, LOG_PROXYEVENTS, LOGGER_PROXY_BE_ERROR, NULL, badtext, be->be_parent->name, be->be_parent->port, be->be_parent->label, 0, NULL, 0, retry_time);
}
be->bad = true;
Expand Down Expand Up @@ -836,7 +842,7 @@ static void _backend_flap_check(struct mcp_backendconn_s *be, enum proxy_be_fail
// _must_ be called from within the event thread.
static void _reset_bad_backend(struct mcp_backendconn_s *be, enum proxy_be_failures err) {
io_pending_proxy_t *io = NULL;
P_DEBUG("%s: resetting bad backend: %s\n", __func__, proxy_be_failure_text[err]);
P_DEBUG("%s: resetting bad backend: [fd: %d] %s\n", __func__, mcmc_fd(be->client), proxy_be_failure_text[err]);
// Can't use STAILQ_FOREACH() since return_io_pending() free's the current
// io. STAILQ_FOREACH_SAFE maybe?
int depth = be->depth;
Expand Down Expand Up @@ -989,7 +995,7 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) {
struct timeval tmp_time = be->tunables.read;

if (which & EV_TIMEOUT) {
P_DEBUG("%s: backend timed out while connecting\n", __func__);
P_DEBUG("%s: backend timed out while connecting [fd: %d]\n", __func__, mcmc_fd(be->client));
if (be->connecting) {
_reset_bad_backend(be, P_BE_FAIL_CONNTIMEOUT);
} else {
Expand Down Expand Up @@ -1079,6 +1085,18 @@ static void proxy_beconn_handler(const int fd, const short which, void *arg) {
// switch to the primary persistent read event.
if (!be->validating) {
_set_main_event(be, be->event_thread->base, EV_READ|EV_PERSIST, NULL, proxy_backend_handler);

// we're happily validated and switching to normal processing, so
// _now_ the backend is no longer "bad".
// If we reset the failed count earlier we then can fail the
// validation loop indefinitely without ever being marked bad.
if (be->bad) {
// was bad, need to mark as no longer bad in shared space.
mcp_sharedvm_delta(be->event_thread->ctx, SHAREDVM_BACKEND_IDX,
be->be_parent->label, -1);
}
be->bad = false;
be->failed_count = 0;
}
}

Expand Down
18 changes: 17 additions & 1 deletion t/lib/MemcachedTest.pm
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,8 @@ sub _accept_backend {
$be->autoflush(1);
Test::More::ok(defined $be, "mock backend created");
Test::More::like(<$be>, qr/version/, "received version command");
print $be "VERSION 1.0.0-mock\r\n";
my $sent = $be->send("VERSION 1.0.0-mock\r\n");
Test::More::cmp_ok($sent, '>', 0, "wrote VERSION back to backend");

return $be;
}
Expand All @@ -582,6 +583,21 @@ sub accept_backend {
$self->{_be}->[$idx] = _accept_backend($self->{_srv}->[$idx]);
}

# Close the backend handle stored at slot $idx of this object's backend
# list. Returns the result of the handle's close method.
sub close_backend {
    my ($self, $idx) = @_;
    my $backend = $self->{_be}->[$idx];
    return $backend->close;
}

# Check whether the handle stored at slot $idx of this object's server
# list is readable, i.e. has something waiting to be accepted.
#
# $wait is the timeout in seconds passed to IO::Select::can_read. It
# defaults to 1 when omitted. An explicit 0 is honoured as a non-blocking
# poll; the previous `shift || 1` silently turned 0 into a 1 second wait.
#
# Returns whatever can_read returns in the caller's context.
sub srv_accept_waiting {
    my ($self, $idx, $wait) = @_;
    $wait = 1 unless defined $wait;

    my $sel = IO::Select->new();
    $sel->add($self->{_srv}->[$idx]);
    return $sel->can_read($wait);
}

sub set_c {
my $self = shift;
my $sel = $self->{_csel};
Expand Down
18 changes: 18 additions & 0 deletions t/proxybestats.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- Applies the backend tunables for this test, then builds and returns a
-- pool of three backends labelled b1..b3 on 127.0.0.1, ports 12131..12133.
function mcp_config_pools()
    mcp.backend_read_timeout(0.25)
    mcp.backend_connect_timeout(0.5)
    mcp.backend_failure_limit(1)
    mcp.backend_retry_waittime(0.5) -- fast retry for test

    -- Label suffix and port offset share the same index.
    local backends = {}
    for i = 1, 3 do
        backends[i] = mcp.backend('b' .. i, '127.0.0.1', 12130 + i)
    end
    return mcp.pool(backends)
end

-- not making requests, just opening/closing backends
-- Keeps the pool in a global and attaches an MG handler that always
-- answers with a fixed error string; the handler never uses the pool.
function mcp_config_routes(p)
    pool = p -- stash this in a global.

    local function reply_nothing(r)
        return "SERVER_ERROR nothing\r\n"
    end
    mcp.attach(mcp.CMD_MG, reply_nothing)
end
66 changes: 66 additions & 0 deletions t/proxybestats.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#!/usr/bin/env perl
# Check that "stats proxybe" reports backends as they go bad and recover.

use strict;
use warnings;
use Test::More;
use FindBin qw($Bin);
use lib "$Bin/lib";
use Carp qw(croak);
use MemcachedTest;
use IO::Socket qw(AF_INET SOCK_STREAM);
use IO::Select;
use Data::Dumper qw/Dumper/;

unless (supports_proxy()) {
    plan skip_all => 'proxy not enabled';
    exit 0;
}

# Same three ports as the backends declared in t/proxybestats.lua.
my $mock = Memcached::ProxyTest->new(servers => [12131, 12132, 12133]);

my $p_srv = new_memcached('-o proxy_config=./t/proxybestats.lua -t 1');
my $proxy_sock = $p_srv->sock;
$proxy_sock->autoflush(1);

$mock->set_c($proxy_sock);
$mock->accept_backends();

# Could have multiple sockets queued as the proxy retries. We can get all
# the way through read validation then not know if the socket was closed
# until the _next_ time we read from it.
# So we can either try to read and accept in a loop, or keep checking if
# there's a conn waiting to be accepted. This helper does the latter for
# the backend at the given index.
my $reaccept_pending = sub {
    my ($idx) = @_;
    while ($mock->srv_accept_waiting($idx, 1)) {
        $mock->accept_backend($idx);
    }
    return;
};

subtest 'cause bad backends' => sub {
    my $stats = mem_stats($proxy_sock, "proxybe");
    is(scalar(keys %{$stats}), 0, "no bad backends");

    $mock->close_backend(0);
    # wait for retry to mark it bad.
    sleep 1;
    $stats = mem_stats($proxy_sock, "proxybe");
    is($stats->{bad_b1}, 1, "b1 is now bad");

    $reaccept_pending->(0);
    $stats = mem_stats($proxy_sock, "proxybe");
    is(scalar(keys %{$stats}), 0, "no bad backends anymore");

    # Same cycle again, this time with two backends down at once.
    $mock->close_backend(1);
    $mock->close_backend(2);
    sleep 1;
    $stats = mem_stats($proxy_sock, "proxybe");
    is($stats->{bad_b2}, 1, "b2 is now bad");
    is($stats->{bad_b3}, 1, "b3 is now bad");
    for my $idx (1 .. 2) {
        $reaccept_pending->($idx);
    }
    $stats = mem_stats($proxy_sock, "proxybe");
    is(scalar(keys %{$stats}), 0, "no bad backends anymore");
};

done_testing();

0 comments on commit ea66fe0

Please sign in to comment.