Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1522,6 +1522,8 @@ sds generateClusterSlotResponse(int resp) {
}
}
setDeferredArrayLen(recording_client, slot_replylen, num_primaries);
/* For cluster slots, deferred length should put all data in reply list, not buffer */
serverAssert(recording_client->bufpos == 0);
sds cluster_slot_response = aggregateClientOutputBuffer(recording_client);
deleteCachedResponseClient(recording_client);
return cluster_slot_response;
Expand Down
10 changes: 10 additions & 0 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -1385,6 +1385,8 @@ int VM_CreateCommand(ValkeyModuleCtx *ctx,
serverAssert(hashtableAdd(server.commands, cp->serverCmd));
serverAssert(hashtableAdd(server.orig_commands, cp->serverCmd));
cp->serverCmd->id = ACLGetCommandID(declared_name); /* ID used for ACL. */
/* Invalidate COMMAND response cache since we added a new command */
invalidateCommandCache();
return VALKEYMODULE_OK;
}

Expand Down Expand Up @@ -12530,6 +12532,12 @@ int moduleFreeCommand(struct ValkeyModule *module, struct serverCommand *cmd) {
hdr_close(cmd->latency_histogram);
cmd->latency_histogram = NULL;
}
for (int i = 0; i < RESP_CACHE_INDEX_MAX; i++) {
if (cmd->info_cache[i]) {
sdsfree(cmd->info_cache[i]);
cmd->info_cache[i] = NULL;
}
}
moduleFreeArgs(cmd->args, cmd->num_args);
zfree(cp);

Expand Down Expand Up @@ -12571,6 +12579,8 @@ void moduleUnregisterCommands(struct ValkeyModule *module) {
zfree(cmd);
}
hashtableResetIterator(&iter);
/* Invalidate COMMAND response cache since we removed commands */
invalidateCommandCache();
}

/* We parse argv to add sds "NAME VALUE" pairs to the server.module_configs_queue list of configs.
Expand Down
12 changes: 8 additions & 4 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -464,17 +464,21 @@ int prepareClientToWrite(client *c) {
return C_OK;
}

/* Returns everything in the client reply linked list in a SDS format.
/* Returns everything in the client reply buffer and linked list in a SDS format.
* This should only be used only with a caching client. */
sds aggregateClientOutputBuffer(client *c) {
sds cmd_response = sdsempty();

/* First, collect from the fixed buffer if any */
if (c->bufpos > 0) {
cmd_response = sdscatlen(cmd_response, c->buf, c->bufpos);
}

/* Then, collect from the reply list */
listIter li;
listNode *ln;
clientReplyBlock *val_block;
listRewind(c->reply, &li);

/* Here, c->buf is not used, thus we confirm c->bufpos remains 0. */
serverAssert(c->bufpos == 0);
while ((ln = listNext(&li)) != NULL) {
val_block = (clientReplyBlock *)listNodeValue(ln);
cmd_response = sdscatlen(cmd_response, val_block->buf, val_block->used);
Expand Down
105 changes: 83 additions & 22 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2342,6 +2342,9 @@ void initServerConfig(void) {
* valkey.conf using the rename-command directive. */
server.commands = hashtableCreate(&commandSetType);
server.orig_commands = hashtableCreate(&originalCommandSetType);
for (int i = 0; i < RESP_CACHE_INDEX_MAX; i++) {
server.command_response_cache[i] = NULL;
}
populateCommandTable();

/* Debugging */
Expand Down Expand Up @@ -3282,6 +3285,11 @@ int populateCommandStructure(struct serverCommand *c) {
* has been issued for the first time */
c->latency_histogram = NULL;

/* Initialize command info cache */
for (int i = 0; i < RESP_CACHE_INDEX_MAX; i++) {
c->info_cache[i] = NULL;
}

/* Handle the legacy range spec and the "movablekeys" flag (must be done after populating all key specs). */
populateCommandLegacyRangeSpec(c);

Expand Down Expand Up @@ -5222,30 +5230,53 @@ void addReplyCommandSubCommands(client *c,
hashtableResetIterator(&iter);
}

/* Forward declaration */
void addReplyCommandInfo(client *c, struct serverCommand *cmd);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a block of forward declarations near the top of this file where we could move this declaration, under

/*============================ Internal prototypes ========================== */


/* Generate and cache the command info response for a given protocol version */
static void cacheCommandInfo(struct serverCommand *cmd, int resp) {
client *caching_client = createCachedResponseClient(resp);

int firstkey = 0, lastkey = 0, keystep = 0;
if (cmd->legacy_range_key_spec.begin_search_type != KSPEC_BS_INVALID) {
firstkey = cmd->legacy_range_key_spec.bs.index.pos;
lastkey = cmd->legacy_range_key_spec.fk.range.lastkey;
if (lastkey >= 0) lastkey += firstkey;
keystep = cmd->legacy_range_key_spec.fk.range.keystep;
}

addReplyArrayLen(caching_client, 10);
addReplyBulkCBuffer(caching_client, cmd->fullname, sdslen(cmd->fullname));
addReplyLongLong(caching_client, cmd->arity);
addReplyFlagsForCommand(caching_client, cmd);
addReplyLongLong(caching_client, firstkey);
addReplyLongLong(caching_client, lastkey);
addReplyLongLong(caching_client, keystep);
addReplyCommandCategories(caching_client, cmd);
addReplyCommandTips(caching_client, cmd);
addReplyCommandKeySpecs(caching_client, cmd);
addReplyCommandSubCommands(caching_client, cmd, addReplyCommandInfo, 0);

cmd->info_cache[RESP_CACHE_INDEX(resp)] = aggregateClientOutputBuffer(caching_client);

deleteCachedResponseClient(caching_client);
}

/* Output the representation of a server command. Used by the COMMAND command and COMMAND INFO. */
void addReplyCommandInfo(client *c, struct serverCommand *cmd) {
if (!cmd) {
addReplyNull(c);
} else {
int firstkey = 0, lastkey = 0, keystep = 0;
if (cmd->legacy_range_key_spec.begin_search_type != KSPEC_BS_INVALID) {
firstkey = cmd->legacy_range_key_spec.bs.index.pos;
lastkey = cmd->legacy_range_key_spec.fk.range.lastkey;
if (lastkey >= 0) lastkey += firstkey;
keystep = cmd->legacy_range_key_spec.fk.range.keystep;
/* Use cached response if available for the client's protocol version */
int cache_idx = RESP_CACHE_INDEX(c->resp);
sds cache = cmd->info_cache[cache_idx];

if (cache == NULL) {
cacheCommandInfo(cmd, c->resp);
cache = cmd->info_cache[cache_idx];
Comment on lines +5274 to +5276
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar here. We can let the other function just return the response instead of caching it to avoid some dependency on side-effects between these two functions:

Suggested change
if (cache == NULL) {
cacheCommandInfo(cmd, c->resp);
cache = cmd->info_cache[cache_idx];
if (cache == NULL) {
cache = generateCommandInfoResponse(cmd, c->resp);
cmd->info_cache[cache_idx] = cache;

}

addReplyArrayLen(c, 10);
addReplyBulkCBuffer(c, cmd->fullname, sdslen(cmd->fullname));
addReplyLongLong(c, cmd->arity);
addReplyFlagsForCommand(c, cmd);
addReplyLongLong(c, firstkey);
addReplyLongLong(c, lastkey);
addReplyLongLong(c, keystep);
addReplyCommandCategories(c, cmd);
addReplyCommandTips(c, cmd);
addReplyCommandKeySpecs(c, cmd);
addReplyCommandSubCommands(c, cmd, addReplyCommandInfo, 0);

addReplyProto(c, cache, sdslen(cache));
}
}

Expand Down Expand Up @@ -5370,17 +5401,47 @@ void getKeysSubcommand(client *c) {
getKeysSubcommandImpl(c, 0);
}

/* COMMAND (no args) */
void commandCommand(client *c) {
/* Invalidate the cached COMMAND response when command table changes */
void invalidateCommandCache(void) {
for (int i = 0; i < RESP_CACHE_INDEX_MAX; i++) {
if (server.command_response_cache[i]) {
sdsfree(server.command_response_cache[i]);
server.command_response_cache[i] = NULL;
}
}
}

/* Generate and cache the full COMMAND response */
static void cacheCommandResponse(int resp) {
client *caching_client = createCachedResponseClient(resp);

hashtableIterator iter;
void *next;
addReplyArrayLen(c, hashtableSize(server.commands));
addReplyArrayLen(caching_client, hashtableSize(server.commands));
hashtableInitIterator(&iter, server.commands, 0);
while (hashtableNext(&iter, &next)) {
struct serverCommand *cmd = next;
addReplyCommandInfo(c, cmd);
addReplyCommandInfo(caching_client, cmd);
}
hashtableResetIterator(&iter);

server.command_response_cache[RESP_CACHE_INDEX(resp)] = aggregateClientOutputBuffer(caching_client);

deleteCachedResponseClient(caching_client);
}

/* COMMAND (no args) */
void commandCommand(client *c) {
/* Use cached response if available for the client's protocol version */
int cache_idx = RESP_CACHE_INDEX(c->resp);
sds cache = server.command_response_cache[cache_idx];

if (cache == NULL) {
cacheCommandResponse(c->resp);
cache = server.command_response_cache[cache_idx];
}
Comment on lines +5437 to +5442
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we rely on cacheCommandResponse populating a specific global and then we fetch it afterwards. It looks like an implicit dependency that we can avoid.

Consider returning the response from cachecCommandResponse, like we do in generateClusterSlotResponse, and where the corresponding call looks like this:

    sds cached_reply = server.cached_cluster_slot_info[conn_type];
    if (!cached_reply) {
        cached_reply = generateClusterSlotResponse(c->resp);
        server.cached_cluster_slot_info[conn_type] = cached_reply;
    }


addReplyProto(c, cache, sdslen(cache));
}

/* COMMAND COUNT */
Expand Down
6 changes: 6 additions & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1566,6 +1566,9 @@ struct malloc_stats {
#define CACHE_CONN_TYPE_RESP3 (1 << 2)
#define CACHE_CONN_TYPE_MAX (1 << 3)

#define RESP_CACHE_INDEX_MAX 2 /* [0]=RESP2, [1]=RESP3 */
#define RESP_CACHE_INDEX(resp) ((resp) == 3 ? 1 : 0) /* Convert RESP version to cache array index */

/*-----------------------------------------------------------------------------
* TLS Context Configuration
*----------------------------------------------------------------------------*/
Expand Down Expand Up @@ -1677,6 +1680,7 @@ struct valkeyServer {
serverDb **db; /* each db created when it's first used */
hashtable *commands; /* Command table */
hashtable *orig_commands; /* Command table before command renaming. */
sds command_response_cache[RESP_CACHE_INDEX_MAX]; /* Cached COMMAND response: [0]=RESP2, [1]=RESP3 */
aeEventLoop *el;
_Atomic AeIoState io_poll_state; /* Indicates the state of the IO polling. */
int io_ae_fired_events; /* Number of poll events received by the IO thread. */
Expand Down Expand Up @@ -2626,6 +2630,7 @@ struct serverCommand {
* (not the fullname), and the value is the serverCommand structure pointer. */
struct serverCommand *parent;
struct ValkeyModuleCommand *module_cmd; /* A pointer to the module command data (NULL if native command) */
sds info_cache[RESP_CACHE_INDEX_MAX]; /* Cached COMMAND INFO response: [0]=RESP2, [1]=RESP3 */
};

struct serverError {
Expand Down Expand Up @@ -3783,6 +3788,7 @@ void commandCommand(client *c);
void commandCountCommand(client *c);
void commandListCommand(client *c);
void commandInfoCommand(client *c);
void invalidateCommandCache(void);
void commandGetKeysCommand(client *c);
void commandGetKeysAndFlagsCommand(client *c);
void commandHelpCommand(client *c);
Expand Down
Loading
Loading