Skip to content

Commit 95dd960

Browse files
committed
proxy: per-backend automatic logging
Options similar to mcp.log_reqsample() are applied to responses as they are read off the network. This can be a lot faster than relying on callbacks to do the logging when you are not logging inline with a route handler. It is also simpler in some cases (i.e. routelib).
1 parent 33cf4cc commit 95dd960

File tree

8 files changed

+233
-39
lines changed

8 files changed

+233
-39
lines changed

proto_proxy.c

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1196,8 +1196,6 @@ mcp_resp_t *mcp_prep_bare_resobj(lua_State *L, LIBEVENT_THREAD *t) {
11961196

11971197
void mcp_set_resobj(mcp_resp_t *r, mcp_request_t *rq, mcp_backend_t *be, LIBEVENT_THREAD *t) {
11981198
memset(r, 0, sizeof(mcp_resp_t));
1199-
r->buf = NULL;
1200-
r->blen = 0;
12011199
r->thread = t;
12021200
assert(r->thread != NULL);
12031201
gettimeofday(&r->start, NULL);
@@ -1224,20 +1222,7 @@ void mcp_set_resobj(mcp_resp_t *r, mcp_request_t *rq, mcp_backend_t *be, LIBEVEN
12241222
}
12251223

12261224
r->cmd = rq->pr.command;
1227-
1228-
strncpy(r->be_name, be->name, MAX_NAMELEN+1);
1229-
strncpy(r->be_port, be->port, MAX_PORTLEN+1);
1230-
1231-
}
1232-
1233-
mcp_resp_t *mcp_prep_resobj(lua_State *L, mcp_request_t *rq, mcp_backend_t *be, LIBEVENT_THREAD *t) {
1234-
mcp_resp_t *r = lua_newuserdatauv(L, sizeof(mcp_resp_t), 0);
1235-
mcp_set_resobj(r, rq, be, t);
1236-
1237-
luaL_getmetatable(L, "mcp.response");
1238-
lua_setmetatable(L, -2);
1239-
1240-
return r;
1225+
r->be = be;
12411226
}
12421227

12431228
void mcp_resp_set_elapsed(mcp_resp_t *r) {

proxy.h

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,15 @@ struct proxy_tunables {
227227
bool down; // backend is forced into a down/bad state.
228228
};
229229

230+
// Automatic logging options; embedded in both the backend label and the
// backend object.
struct proxy_logging {
    // Latency threshold, kept in microseconds (the user supplies
    // milliseconds): responses slower than this are logged.
    unsigned int deadline;
    // Sampling rate for responses that are not otherwise logged.
    unsigned int rate;
    // When set, error responses are always logged.
    bool all_errors;
    // Optional detail strings attached to the log line, chosen by outcome.
    char *detail_hit;
    char *detail_ok;
    char *detail_err;
};
238+
230239
typedef STAILQ_HEAD(globalobj_head_s, mcp_globalobj_s) globalobj_head_t;
231240
typedef struct {
232241
lua_State *proxy_state; // main configuration vm
@@ -382,7 +391,9 @@ struct mcp_backend_label_s {
382391
char label[MAX_LABELLEN+1];
383392
size_t llen; // cache label length for small speedup in pool creation.
384393
int conncount; // number of sockets to make.
394+
bool use_logging;
385395
struct proxy_tunables tunables;
396+
struct proxy_logging logging;
386397
};
387398

388399
// lua object wrapper meant to own a malloc'ed conn structure
@@ -435,13 +446,15 @@ struct mcp_backend_s {
435446
bool transferred; // if beconn has been shipped to owner thread.
436447
bool use_io_thread; // note if this backend is worker-local or not.
437448
bool stacked; // if backend already queued for syscalls.
449+
bool use_logging; // if automatic logging is enabled.
438450
STAILQ_ENTRY(mcp_backend_s) beconn_next; // stack for connecting conns
439451
STAILQ_ENTRY(mcp_backend_s) be_next; // stack for backends
440452
iop_head_t iop_head; // stack of inbound requests.
453+
struct proxy_logging logging;
454+
struct proxy_tunables tunables; // this gets copied a few times for speed.
441455
char name[MAX_NAMELEN+1];
442456
char port[MAX_PORTLEN+1];
443457
char label[MAX_LABELLEN+1];
444-
struct proxy_tunables tunables; // this gets copied a few times for speed.
445458
struct mcp_backendconn_s be[];
446459
};
447460
typedef STAILQ_HEAD(be_head_s, mcp_backend_s) be_head_t;
@@ -494,6 +507,7 @@ typedef struct {
494507
char *buf; // response line + potentially value.
495508
mc_resp *cresp; // client mc_resp object during extstore fetches.
496509
LIBEVENT_THREAD *thread; // cresp's owner thread needed for extstore cleanup.
510+
mcp_backend_t *be; // backend that generated this response
497511
unsigned int blen; // total size of the value to read.
498512
struct timeval start; // time this object was created.
499513
long elapsed; // time elapsed once handled.
@@ -502,8 +516,6 @@ typedef struct {
502516
uint8_t cmd; // from parser (pr.command)
503517
uint8_t extra; // ascii multiget hack for memory accounting. extra blen.
504518
enum mcp_resp_mode mode; // reply mode (for noreply fixing)
505-
char be_name[MAX_NAMELEN+1];
506-
char be_port[MAX_PORTLEN+1];
507519
} mcp_resp_t;
508520

509521
// re-cast an io_pending_t into this more descriptive structure.
@@ -600,7 +612,6 @@ void proxy_init_event_thread(proxy_event_thread_t *t, proxy_ctx_t *ctx, struct e
600612
void *proxy_event_thread(void *arg);
601613
void proxy_run_backend_queue(be_head_t *head);
602614
struct mcp_backendconn_s *proxy_choose_beconn(mcp_backend_t *be);
603-
mcp_resp_t *mcp_prep_resobj(lua_State *L, mcp_request_t *rq, mcp_backend_t *be, LIBEVENT_THREAD *t);
604615
mcp_resp_t *mcp_prep_bare_resobj(lua_State *L, LIBEVENT_THREAD *t);
605616
void mcp_resp_set_elapsed(mcp_resp_t *r);
606617
io_pending_proxy_t *mcp_queue_rctx_io(mcp_rcontext_t *rctx, mcp_request_t *rq, mcp_backend_t *be, mcp_resp_t *r);
@@ -787,7 +798,7 @@ int mcplib_funcgen_gc(lua_State *L);
787798
void mcp_funcgen_reference(lua_State *L);
788799
void mcp_funcgen_dereference(lua_State *L, mcp_funcgen_t *fgen);
789800
void mcp_rcontext_push_rqu_res(lua_State *L, mcp_rcontext_t *rctx, int handle);
790-
801+
void mcplib_rqu_log(mcp_request_t *rq, mcp_resp_t *rs, int cfd);
791802

792803
int mcplib_factory_command_new(lua_State *L);
793804

proxy_internal.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1719,8 +1719,9 @@ static inline int _mcplib_internal_run(LIBEVENT_THREAD *t, mcp_request_t *rq, mc
17191719
}
17201720

17211721
// in case someone logs this response it should make sense.
1722-
memcpy(r->be_name, "internal", strlen("internal"));
1723-
memcpy(r->be_port, "0", 1);
1722+
// FIXME: make a decision here.
1723+
//memcpy(r->be_name, "internal", strlen("internal"));
1724+
//memcpy(r->be_port, "0", 1);
17241725

17251726
// TODO: r-> will need status/code/mode copied from resp.
17261727
r->cresp = resp;

proxy_lua.c

Lines changed: 156 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,78 @@ static int mcplib_backend_wrap_gc(lua_State *L) {
285285
}
286286

287287
// Garbage-collection handler for backend label objects (presumably installed
// as __gc on the "mcp.backend" metatable). Releases the heap-allocated log
// tag strings owned by the label.
//
// Always returns 0 (no values are pushed back to Lua).
static int mcplib_backend_gc(lua_State *L) {
    mcp_backend_label_t *be = lua_touserdata(L, 1);

    // free(NULL) is a no-op, so unset tags need no guard. The pointers are
    // cleared afterwards so a stale label can never be double-freed.
    free(be->logging.detail_hit);
    be->logging.detail_hit = NULL;
    free(be->logging.detail_ok);
    be->logging.detail_ok = NULL;
    free(be->logging.detail_err);
    be->logging.detail_err = NULL;

    return 0;
}
298+
299+
// Copies the optional string option `field` from the table on top of the Lua
// stack into a freshly malloc'ed, NUL-terminated buffer stored in *dst.
// Any previous value in *dst is released. The stack is left unchanged.
// Raises a Lua error if the field is present but not a string, or if the
// allocation fails.
static void _mcplib_backend_log_tag(lua_State *L, const char *field, char **dst) {
    if (lua_getfield(L, -1, field) != LUA_TNIL) {
        size_t tlen = 0;
        const char *tag = luaL_checklstring(L, -1, &tlen);
        char *copy = malloc(tlen + 1);
        if (copy == NULL) {
            proxy_lua_error(L, "out of memory allocating backend log tag");
            return;
        }
        memcpy(copy, tag, tlen);
        copy[tlen] = '\0';
        free(*dst);
        *dst = copy;
    }
    lua_pop(L, 1);
}

// Parses the "log" option table (expected on top of the Lua stack) into
// be->logging and enables automatic logging for the backend label.
//
// Recognized keys, all optional:
//   deadline - integer milliseconds, >= 0; stored as microseconds.
//   rate     - integer sample rate, >= 0.
//   errors   - boolean; always log error responses.
//   tag_hit, tag_ok, tag_err - strings copied onto the heap; the label owns
//                              the copies.
//
// Raises a Lua error on invalid values. Returns 0 and leaves the stack as it
// found it.
static int _mcplib_backend_log(lua_State *L, mcp_backend_label_t *be) {
    // largest value representable in the unsigned int fields.
    const uint64_t uint_max = (unsigned int)-1;

    be->use_logging = true;

    if (lua_getfield(L, -1, "deadline") != LUA_TNIL) {
        // keep the full lua_Integer width so range checks see the real value.
        lua_Integer deadline = luaL_checkinteger(L, -1);
        if (deadline < 0) {
            proxy_lua_error(L, "backend log deadline must be >= 0");
        }
        if ((uint64_t)deadline > uint_max / 1000) {
            proxy_lua_error(L, "backend log deadline is too large");
        }
        // user supplies milliseconds; store microseconds.
        be->logging.deadline = (unsigned int)deadline * 1000;
    }
    lua_pop(L, 1);

    if (lua_getfield(L, -1, "rate") != LUA_TNIL) {
        lua_Integer rate = luaL_checkinteger(L, -1);
        if (rate < 0) {
            proxy_lua_error(L, "backend log sample rate must be >= 0");
        }
        if ((uint64_t)rate > uint_max) {
            proxy_lua_error(L, "backend log sample rate is too large");
        }
        be->logging.rate = (unsigned int)rate;
    }
    lua_pop(L, 1);

    if (lua_getfield(L, -1, "errors") != LUA_TNIL) {
        luaL_checktype(L, -1, LUA_TBOOLEAN);
        be->logging.all_errors = lua_toboolean(L, -1) ? true : false;
    }
    lua_pop(L, 1);

    _mcplib_backend_log_tag(L, "tag_hit", &be->logging.detail_hit);
    _mcplib_backend_log_tag(L, "tag_ok", &be->logging.detail_ok);
    _mcplib_backend_log_tag(L, "tag_err", &be->logging.detail_err);

    return 0;
}
290361

291362
// backend label object; given to pools which then find or create backend
@@ -298,15 +369,18 @@ static int mcplib_backend(lua_State *L) {
298369
size_t llen = 0;
299370
size_t nlen = 0;
300371
size_t plen = 0;
301-
proxy_ctx_t *ctx = PROXY_GET_CTX(L);
302-
mcp_backend_label_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_label_t), 0);
303-
memset(be, 0, sizeof(*be));
304372
const char *label;
305373
const char *name;
306374
const char *port;
375+
proxy_ctx_t *ctx = PROXY_GET_CTX(L);
376+
mcp_backend_label_t *be = lua_newuserdatauv(L, sizeof(mcp_backend_label_t), 0);
377+
memset(be, 0, sizeof(*be));
307378
// copy global defaults for tunables.
308379
memcpy(&be->tunables, &ctx->tunables, sizeof(be->tunables));
309380
be->conncount = 1; // one connection per backend as default.
381+
// set the metatable early so the GC handler can free partial allocations
382+
luaL_getmetatable(L, "mcp.backend");
383+
lua_setmetatable(L, -2); // set metatable to userdata.
310384

311385
if (lua_istable(L, 1)) {
312386

@@ -455,6 +529,14 @@ static int mcplib_backend(lua_State *L) {
455529
}
456530
lua_pop(L, 1);
457531

532+
if (lua_getfield(L, 1, "log") != LUA_TNIL) {
533+
if (lua_istable(L, -1)) {
534+
_mcplib_backend_log(L, be);
535+
} else {
536+
proxy_lua_error(L, "backend log option must be a table");
537+
}
538+
}
539+
lua_pop(L, 1);
458540
} else {
459541
label = luaL_checklstring(L, 1, &llen);
460542
name = luaL_checklstring(L, 2, &nlen);
@@ -486,8 +568,6 @@ static int mcplib_backend(lua_State *L) {
486568
if (lua_istable(L, 1)) {
487569
lua_pop(L, 3); // drop label, name, port.
488570
}
489-
luaL_getmetatable(L, "mcp.backend");
490-
lua_setmetatable(L, -2); // set metatable to userdata.
491571

492572
return 1; // return be object.
493573
}
@@ -530,12 +610,27 @@ static mcp_backend_wrap_t *_mcplib_make_backendconn(lua_State *L, mcp_backend_la
530610
proxy_lua_error(L, "out of memory allocating backend connection");
531611
return NULL;
532612
}
613+
533614
bew->be = be;
534615

535616
strncpy(be->name, bel->name, MAX_NAMELEN+1);
536617
strncpy(be->port, bel->port, MAX_PORTLEN+1);
537618
strncpy(be->label, bel->label, MAX_LABELLEN+1);
538619
memcpy(&be->tunables, &bel->tunables, sizeof(bel->tunables));
620+
memcpy(&be->logging, &bel->logging, sizeof(bel->logging));
621+
be->use_logging = bel->use_logging;
622+
// TODO: check for errors.
623+
// not really going to happen and if it does the tag just blanks out..
624+
if (bel->logging.detail_hit) {
625+
be->logging.detail_hit = strdup(bel->logging.detail_hit);
626+
}
627+
if (bel->logging.detail_ok) {
628+
be->logging.detail_ok = strdup(bel->logging.detail_ok);
629+
}
630+
if (bel->logging.detail_err) {
631+
be->logging.detail_err = strdup(bel->logging.detail_err);
632+
}
633+
539634
be->conncount = bel->conncount;
540635
STAILQ_INIT(&be->iop_head);
541636

@@ -1397,8 +1492,8 @@ static int mcplib_log_req(lua_State *L) {
13971492
rtype = rs->resp.type;
13981493
rcode = rs->resp.code;
13991494
rstatus = rs->status;
1400-
rname = rs->be_name;
1401-
rport = rs->be_port;
1495+
rname = rs->be->name;
1496+
rport = rs->be->port;
14021497
elapsed = rs->elapsed;
14031498
}
14041499
size_t dlen = 0;
@@ -1432,6 +1527,57 @@ static uint32_t _mcp_nextrand(uint32_t *s) {
14321527
return result;
14331528
}
14341529

1530+
// Automatic per-backend logging of a completed response.
//
// rq  - request that produced the response; its raw request line is logged.
// rs  - response object; rs->be supplies the logging options and the backend
//       name/port. Nothing is logged when rs->be is NULL.
// cfd - client file descriptor, passed through to the log entry.
//
// A response is logged when the first matching rule applies:
//   1. all_errors is set and the status is not MCMC_OK,
//   2. a deadline is set and the elapsed time exceeds it,
//   3. a sample rate is set and the random draw selects this response
//      (approximately one in `rate`).
void mcplib_rqu_log(mcp_request_t *rq, mcp_resp_t *rs, int cfd) {
    mcp_backend_t *be = rs->be;
    if (be == NULL) {
        // no backend attached, so there are no logging options to apply.
        return;
    }

    LIBEVENT_THREAD *t = rs->thread;
    logger *l = t->l;
    struct proxy_logging *pl = &be->logging;

    int rtype = rs->resp.type;
    int rcode = rs->resp.code;
    int rstatus = rs->status;
    long elapsed = rs->elapsed;

    bool do_log = false;
    if (pl->all_errors && rstatus != MCMC_OK) {
        do_log = true;
    } else if (pl->deadline > 0 && (long long)elapsed > (long long)pl->deadline) {
        // compared as signed 64bit so a negative elapsed never converts into
        // a huge unsigned value on platforms where long is 32 bits.
        do_log = true;
    } else if (pl->rate > 0) {
        // slightly biased random-to-rate without adding a loop, which is
        // completely fine for this use case.
        uint32_t rnd = ((uint64_t)_mcp_nextrand(t->proxy_rng) * (uint64_t)pl->rate) >> 32;
        if (rnd == 0) {
            do_log = true;
        }
    }

    if (!do_log) {
        return;
    }

    char *rname = be->name;
    char *rport = be->port;
    size_t dlen = 0;
    const char *detail = NULL;

    // FIXME: confirm this is what we want.
    // I think this returns HIT in odd circumstances.
    // might have to check for a "get class" cmd.
    if (rstatus == MCMC_OK) {
        if (rcode != MCMC_CODE_END) {
            detail = pl->detail_hit;
        } else {
            detail = pl->detail_ok;
        }
    } else {
        detail = pl->detail_err;
    }
    if (detail) {
        dlen = strlen(detail);
    }

    logger_log(l, LOGGER_PROXY_REQ, NULL, rq->pr.request, rq->pr.reqlen, elapsed, rtype, rcode, rstatus, cfd, detail, dlen, rname, rport);
}
14351581

14361582
// (milliseconds, sample_rate, allerrors, request, resp, "detail")
14371583
static int mcplib_log_reqsample(lua_State *L) {
@@ -1459,8 +1605,8 @@ static int mcplib_log_reqsample(lua_State *L) {
14591605
rtype = rs->resp.type;
14601606
rcode = rs->resp.code;
14611607
rstatus = rs->status;
1462-
rname = rs->be_name;
1463-
rport = rs->be_port;
1608+
rname = rs->be->name;
1609+
rport = rs->be->port;
14641610
elapsed = rs->elapsed;
14651611
}
14661612
size_t dlen = 0;

proxy_luafgen.c

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1084,6 +1084,10 @@ int mcp_process_rqueue_return(mcp_rcontext_t *rctx, int handle, mcp_resp_t *res)
10841084
}
10851085
}
10861086

1087+
if (res->be && res->be->use_logging) {
1088+
mcplib_rqu_log(rqu->rq, res, rctx->conn_fd);
1089+
}
1090+
10871091
if (rqu->cb_ref) {
10881092
lua_settop(rctx->Lc, 0);
10891093
lua_rawgeti(rctx->Lc, LUA_REGISTRYINDEX, rqu->cb_ref);
@@ -1117,6 +1121,7 @@ int mcp_process_rqueue_return(mcp_rcontext_t *rctx, int handle, mcp_resp_t *res)
11171121
// we settop _before_ calling cb's and
11181122
// _before_ setting up for a coro resume.
11191123
}
1124+
11201125
rqu->flags |= flag;
11211126
return rqu->flags;
11221127
}
@@ -1150,10 +1155,7 @@ void mcp_run_rcontext_handle(mcp_rcontext_t *rctx, int handle) {
11501155
if (rqu->obj_type == RQUEUE_TYPE_POOL) {
11511156
mcp_request_t *rq = rqu->rq;
11521157
mcp_backend_t *be = mcplib_pool_proxy_call_helper(rqu->obj, MCP_PARSER_KEY(rq->pr), rq->pr.klen);
1153-
// FIXME: queue requires conn because we're stacking objects
1154-
// into the connection for later submission, which means we
1155-
// absolutely cannot queue things once *c becomes invalid.
1156-
// need to assert/block this from happening.
1158+
11571159
mcp_set_resobj(rqu->res_obj, rq, be, rctx->fgen->thread);
11581160
io_pending_proxy_t *p = mcp_queue_rctx_io(rctx, rq, be, rqu->res_obj);
11591161
p->return_cb = proxy_return_rqu_cb;

0 commit comments

Comments
 (0)