Skip to content

Commit

Permalink
meta: E flag for overriding CAS value
Browse files Browse the repository at this point in the history
Ecas means: if operation succeeds, set the item's CAS value to this.
This allows external versioning of item data. This could be a row
version, a time stamp, a crc32 or 64bit data integrity hash, and so on.

Overriding the CAS value may make some features not work as well (e.g.,
runtime enable/disable of `stats sizes`).
  • Loading branch information
dormando committed Apr 5, 2024
1 parent ea66fe0 commit 22480de
Show file tree
Hide file tree
Showing 12 changed files with 167 additions and 77 deletions.
11 changes: 11 additions & 0 deletions doc/protocol.txt
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,7 @@ The flags used by the 'mg' command are:
- v: return item value in <data block>

These flags can modify the item:
- E(token): use token as new CAS value if item is modified
- N(token): vivify on miss, takes TTL as an argument
- R(token): if remaining TTL is less than token, win for recache
- T(token): update remaining TTL
Expand Down Expand Up @@ -619,6 +620,13 @@ The data block for a metaget response is optional, requiring this flag to be
passed in. The response code also changes from "HD" to "VA <size>"

These flags can modify the item:
- E(token): use token as new CAS value if item is modified

Normally when an item is created it is given a CAS value from an internal
atomically incrementing counter. The E flag overrides that CAS (an 8 byte
unsigned integer) with the specified value. This is useful for using an
external system to version cache data (row versions, clocks, etc).

- N(token): vivify on miss, takes TTL as an argument

Used to help with so called "dog piling" problems with recaching of popular
Expand Down Expand Up @@ -721,6 +729,7 @@ The flags used by the 'ms' command are:
- b: interpret key as base64 encoded binary value (see metaget)
- c: return CAS value if successfully stored.
- C(token): compare CAS value when storing item
- E(token): use token as new CAS value (see metaget for detail)
- F(token): set client flags to token (32 bit unsigned numeric)
- I: invalidate. set-to-invalid if supplied CAS is older than item's CAS
- k: return key as a token
Expand Down Expand Up @@ -815,6 +824,7 @@ The flags used by the 'md' command are:

- b: interpret key as base64 encoded binary value (see metaget)
- C(token): compare CAS value
- E(token): use token as new CAS value (see metaget for detail)
- I: invalidate. mark as stale, bumps CAS.
- k: return key
- O(token): opaque to copy back.
Expand Down Expand Up @@ -888,6 +898,7 @@ The flags used by the 'ma' command are:

- b: interpret key as base64 encoded binary value (see metaget)
- C(token): compare CAS value (see mset)
- E(token): use token as new CAS value (see metaget for detail)
- N(token): auto create item on miss with supplied TTL
- J(token): initial value to use if auto created after miss (default 0)
- D(token): delta to apply (decimal unsigned 64-bit number, default 1)
Expand Down
10 changes: 5 additions & 5 deletions items.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ static unsigned int sizes[LARGEST_ID];
static uint64_t sizes_bytes[LARGEST_ID];
static unsigned int *stats_sizes_hist = NULL;
static int stats_sizes_buckets = 0;
static uint64_t cas_id = 0;
static uint64_t cas_id = 1;

static volatile int do_run_lru_maintainer_thread = 0;
static pthread_mutex_t lru_maintainer_lock = PTHREAD_MUTEX_INITIALIZER;
Expand Down Expand Up @@ -488,7 +488,7 @@ static void item_unlink_q(item *it) {
pthread_mutex_unlock(&lru_locks[it->slabs_clsid]);
}

int do_item_link(item *it, const uint32_t hv) {
int do_item_link(item *it, const uint32_t hv, const uint64_t cas) {
MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
it->it_flags |= ITEM_LINKED;
Expand All @@ -501,7 +501,7 @@ int do_item_link(item *it, const uint32_t hv) {
STATS_UNLOCK();

/* Allocate a new CAS ID on link. */
ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
ITEM_set_cas(it, cas);
assoc_insert(it, hv);
item_link_q(it);
refcount_incr(it);
Expand Down Expand Up @@ -581,13 +581,13 @@ void do_item_update(item *it) {
}
}

int do_item_replace(item *it, item *new_it, const uint32_t hv) {
int do_item_replace(item *it, item *new_it, const uint32_t hv, const uint64_t cas) {
MEMCACHED_ITEM_REPLACE(ITEM_key(it), it->nkey, it->nbytes,
ITEM_key(new_it), new_it->nkey, new_it->nbytes);
assert((it->it_flags & ITEM_SLABBED) == 0);

do_item_unlink(it, hv);
return do_item_link(new_it, hv);
return do_item_link(new_it, hv, cas);
}

void item_flush_expired(void) {
Expand Down
4 changes: 2 additions & 2 deletions items.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ item *do_item_alloc_pull(const size_t ntotal, const unsigned int id);
void item_free(item *it);
bool item_size_ok(const size_t nkey, const client_flags_t flags, const int nbytes);

int do_item_link(item *it, const uint32_t hv); /** may fail if transgresses limits */
int do_item_link(item *it, const uint32_t hv, const uint64_t cas); /** may fail if transgresses limits */
void do_item_unlink(item *it, const uint32_t hv);
void do_item_unlink_nolock(item *it, const uint32_t hv);
void do_item_remove(item *it);
void do_item_update(item *it); /** update LRU time to current and reposition */
void do_item_update_nolock(item *it);
int do_item_replace(item *it, item *new_it, const uint32_t hv);
int do_item_replace(item *it, item *new_it, const uint32_t hv, const uint64_t cas);
void do_item_link_fixup(item *it);

int item_is_flushed(item *it);
Expand Down
8 changes: 4 additions & 4 deletions memcached.c
Original file line number Diff line number Diff line change
Expand Up @@ -1596,7 +1596,7 @@ static int _store_item_copy_data(int comm, item *old_it, item *new_it, item *add
*
* Returns the state of storage.
*/
enum store_item_type do_store_item(item *it, int comm, LIBEVENT_THREAD *t, const uint32_t hv, int *nbytes, uint64_t *cas, bool cas_stale) {
enum store_item_type do_store_item(item *it, int comm, LIBEVENT_THREAD *t, const uint32_t hv, int *nbytes, uint64_t *cas, uint64_t cas_in, bool cas_stale) {
char *key = ITEM_key(it);
item *old_it = do_item_get(key, it->nkey, hv, t, DONT_UPDATE);
enum store_item_type stored = NOT_STORED;
Expand Down Expand Up @@ -1712,7 +1712,7 @@ enum store_item_type do_store_item(item *it, int comm, LIBEVENT_THREAD *t, const

if (do_store) {
STORAGE_delete(t->storage, old_it);
item_replace(old_it, it, hv);
item_replace(old_it, it, hv, cas_in);
stored = STORED;
}

Expand Down Expand Up @@ -1750,7 +1750,7 @@ enum store_item_type do_store_item(item *it, int comm, LIBEVENT_THREAD *t, const
}

if (do_store) {
do_item_link(it, hv);
do_item_link(it, hv, cas_in);
stored = STORED;
}
}
Expand Down Expand Up @@ -2357,7 +2357,7 @@ enum delta_result_type do_add_delta(LIBEVENT_THREAD *t, const char *key, const s
}
memcpy(ITEM_data(new_it), buf, res);
memcpy(ITEM_data(new_it) + res, "\r\n", 2);
item_replace(it, new_it, hv);
item_replace(it, new_it, hv, (settings.use_cas) ? get_cas_id() : 0);
// Overwrite the older item's CAS with our new CAS since we're
// returning the CAS of the old item below.
ITEM_set_cas(it, (settings.use_cas) ? ITEM_get_cas(new_it) : 0);
Expand Down
6 changes: 3 additions & 3 deletions memcached.h
Original file line number Diff line number Diff line change
Expand Up @@ -953,7 +953,7 @@ enum delta_result_type do_add_delta(LIBEVENT_THREAD *t, const char *key,
const int64_t delta, char *buf,
uint64_t *cas, const uint32_t hv,
item **it_ret);
enum store_item_type do_store_item(item *item, int comm, LIBEVENT_THREAD *t, const uint32_t hv, int *nbytes, uint64_t *cas, bool cas_stale);
enum store_item_type do_store_item(item *item, int comm, LIBEVENT_THREAD *t, const uint32_t hv, int *nbytes, uint64_t *cas, const uint64_t cas_in, bool cas_stale);
void thread_io_queue_add(LIBEVENT_THREAD *t, int type, void *ctx, io_queue_stack_cb cb);
void conn_io_queue_setup(conn *c);
io_queue_t *conn_io_queue_get(conn *c, int type);
Expand Down Expand Up @@ -1009,7 +1009,7 @@ item *item_get_locked(const char *key, const size_t nkey, LIBEVENT_THREAD *t, co
item *item_touch(const char *key, const size_t nkey, uint32_t exptime, LIBEVENT_THREAD *t);
int item_link(item *it);
void item_remove(item *it);
int item_replace(item *it, item *new_it, const uint32_t hv);
int item_replace(item *it, item *new_it, const uint32_t hv, const uint64_t cas_in);
void item_unlink(item *it);

void item_lock(uint32_t hv);
Expand All @@ -1035,7 +1035,7 @@ LIBEVENT_THREAD *get_worker_thread(int id);
void append_stat(const char *name, ADD_STAT add_stats, conn *c,
const char *fmt, ...);

enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, int *nbytes, uint64_t *cas, bool cas_stale);
enum store_item_type store_item(item *item, int comm, LIBEVENT_THREAD *t, int *nbytes, uint64_t *cas, const uint64_t cas_in, bool cas_stale);

/* Protocol related code */
void out_string(conn *c, const char *str);
Expand Down
4 changes: 2 additions & 2 deletions proto_bin.c
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ static void complete_incr_bin(conn *c, char *extbuf) {
memcpy(ITEM_data(it) + res, "\r\n", 2);
c->thread->cur_sfd = c->sfd; // for store_item logging.

if (store_item(it, NREAD_ADD, c->thread, NULL, &cas, CAS_NO_STALE)) {
if (store_item(it, NREAD_ADD, c->thread, NULL, &cas, (settings.use_cas) ? get_cas_id() : 0, CAS_NO_STALE)) {
c->cas = cas;
write_bin_response(c, &rsp->message.body, 0, 0, sizeof(rsp->message.body.value));
} else {
Expand Down Expand Up @@ -386,7 +386,7 @@ static void complete_update_bin(conn *c) {

uint64_t cas = 0;
c->thread->cur_sfd = c->sfd; // for store_item logging.
ret = store_item(it, c->cmd, c->thread, NULL, &cas, CAS_NO_STALE);
ret = store_item(it, c->cmd, c->thread, NULL, &cas, (settings.use_cas) ? get_cas_id() : 0, CAS_NO_STALE);
c->cas = cas;

#ifdef ENABLE_DTRACE
Expand Down
36 changes: 26 additions & 10 deletions proto_text.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ typedef struct token_s {
size_t length;
} token_t;

static void _finalize_mset(conn *c, int nbytes, enum store_item_type ret) {
static void _finalize_mset(conn *c, int nbytes, enum store_item_type ret, uint64_t cas) {
mc_resp *resp = c->resp;
item *it = c->item;
conn_set_state(c, conn_new_cmd);
Expand Down Expand Up @@ -103,7 +103,7 @@ static void _finalize_mset(conn *c, int nbytes, enum store_item_type ret) {
// We don't have the CAS until this point, which is why we
// generate this line so late.
META_CHAR(p, 'c');
p = itoa_u64(c->cas, p);
p = itoa_u64(cas, p);
break;
case 's':
// Get final item size, ie from append/prepend
Expand Down Expand Up @@ -181,7 +181,8 @@ void complete_nread_ascii(conn *c) {
} else {
uint64_t cas = 0;
c->thread->cur_sfd = c->sfd; // cuddle sfd for logging.
ret = store_item(it, comm, c->thread, &nbytes, &cas, c->set_stale);
ret = store_item(it, comm, c->thread, &nbytes, &cas, c->cas ? c->cas : get_cas_id(), c->set_stale);
c->cas = 0;

#ifdef ENABLE_DTRACE
switch (c->cmd) {
Expand Down Expand Up @@ -213,8 +214,7 @@ void complete_nread_ascii(conn *c) {
#endif

if (c->mset_res) {
c->cas = cas;
_finalize_mset(c, nbytes, ret);
_finalize_mset(c, nbytes, ret, cas);
} else {
switch (ret) {
case STORED:
Expand Down Expand Up @@ -915,6 +915,7 @@ struct _meta_flags {
unsigned int set_stale :1;
unsigned int no_reply :1;
unsigned int has_cas :1;
unsigned int has_cas_in :1;
unsigned int new_ttl :1;
unsigned int key_binary:1;
char mode; // single character mode switch, common to ms/ma
Expand All @@ -923,6 +924,7 @@ struct _meta_flags {
rel_time_t recache_time;
client_flags_t client_flags;
uint64_t req_cas_id;
uint64_t cas_id_in; // client supplied next-CAS
uint64_t delta; // ma
uint64_t initial; // ma
};
Expand Down Expand Up @@ -1028,6 +1030,14 @@ static int _meta_flag_preparse(token_t *tokens, const size_t start,
of->has_cas = true;
}
break;
case 'E': // ms, md, ma
if (!safe_strtoull(tokens[i].value+1, &of->cas_id_in)) {
*errstr = "CLIENT_ERROR bad token in command line format";
of->has_error = true;
} else {
of->has_cas_in = true;
}
break;
case 'M': // mset and marithmetic mode switch
if (tokens[i].length != 2) {
*errstr = "CLIENT_ERROR incorrect length for M token";
Expand Down Expand Up @@ -1136,7 +1146,7 @@ static void process_mget_command(conn *c, token_t *tokens, const size_t ntokens)
// I look forward to the day I get rid of this :)
memcpy(ITEM_data(it), "\r\n", 2);
// NOTE: This initializes the CAS value.
do_item_link(it, hv);
do_item_link(it, hv, of.has_cas_in ? of.cas_id_in : get_cas_id());
item_created = true;
}
}
Expand Down Expand Up @@ -1440,8 +1450,8 @@ static void process_mset_command(conn *c, token_t *tokens, const size_t ntokens)

// Set noreply after tokens are understood.
c->noreply = of.no_reply;
// Clear cas return value
c->cas = 0;
// Set cas return value
c->cas = of.has_cas_in ? of.cas_id_in : get_cas_id();
exptime = of.exptime;

bool has_error = false;
Expand Down Expand Up @@ -1677,7 +1687,7 @@ static void process_mdelete_command(conn *c, token_t *tokens, const size_t ntoke
// Also need to remove TOKEN_SENT, so next client can win.
it->it_flags &= ~ITEM_TOKEN_SENT;

ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
ITEM_set_cas(it, of.has_cas_in ? of.cas_id_in : get_cas_id());

// Clients can noreply nominal responses.
if (c->noreply)
Expand Down Expand Up @@ -1797,6 +1807,11 @@ static void process_marithmetic_command(conn *c, token_t *tokens, const size_t n
if (c->noreply)
resp->skip = true;
// *it was filled, set the status below.
if (of.has_cas_in) {
// override the CAS. slightly inefficient but fixing that can wait
// until the next time do_add_delta is changed.
ITEM_set_cas(it, of.cas_id_in);
}
break;
case NON_NUMERIC:
errstr = "CLIENT_ERROR cannot increment or decrement non-numeric value";
Expand All @@ -1815,7 +1830,8 @@ static void process_marithmetic_command(conn *c, token_t *tokens, const size_t n
if (it != NULL) {
memcpy(ITEM_data(it), tmpbuf, vlen);
memcpy(ITEM_data(it) + vlen, "\r\n", 2);
if (do_store_item(it, NREAD_ADD, c->thread, hv, NULL, NULL, CAS_NO_STALE)) {
if (do_store_item(it, NREAD_ADD, c->thread, hv, NULL, NULL,
of.has_cas_in ? of.cas_id_in : get_cas_id(), CAS_NO_STALE)) {
item_created = true;
} else {
// Not sure how we can get here if we're holding the lock.
Expand Down
26 changes: 21 additions & 5 deletions proxy_internal.c
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ static void process_update_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *re
return;
}

int ret = store_item(it, comm, t, NULL, NULL, CAS_NO_STALE);
int ret = store_item(it, comm, t, NULL, NULL, (settings.use_cas) ? get_cas_id() : 0, CAS_NO_STALE);
switch (ret) {
case STORED:
pout_string(resp, "STORED");
Expand Down Expand Up @@ -586,6 +586,7 @@ struct _meta_flags {
unsigned int set_stale :1;
unsigned int no_reply :1;
unsigned int has_cas :1;
unsigned int has_cas_in :1;
unsigned int new_ttl :1;
unsigned int key_binary:1;
char mode; // single character mode switch, common to ms/ma
Expand All @@ -594,6 +595,7 @@ struct _meta_flags {
rel_time_t recache_time;
client_flags_t client_flags;
uint64_t req_cas_id;
uint64_t cas_id_in; // client supplied next-CAS
uint64_t delta; // ma
uint64_t initial; // ma
};
Expand Down Expand Up @@ -701,6 +703,14 @@ static int _meta_flag_preparse(mcp_parser_t *pr, const size_t start,
of->has_cas = true;
}
break;
case 'E': // ms, md, ma
if (!safe_strtoull(&pr->request[pr->tokens[i]+1], &of->cas_id_in)) {
*errstr = "CLIENT_ERROR bad token in command line format";
of->has_error = true;
} else {
of->has_cas_in = true;
}
break;
case 'M': // mset and marithmetic mode switch
// FIXME: this used to error if the token isn't a single byte.
// It probably should still?
Expand Down Expand Up @@ -796,7 +806,7 @@ static void process_mget_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp
// I look forward to the day I get rid of this :)
memcpy(ITEM_data(it), "\r\n", 2);
// NOTE: This initializes the CAS value.
do_item_link(it, hv);
do_item_link(it, hv, of.has_cas_in ? of.cas_id_in : get_cas_id());
item_created = true;
}
}
Expand Down Expand Up @@ -1173,7 +1183,7 @@ static void process_mset_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *resp

uint64_t cas = 0;
int nbytes = 0;
int ret = store_item(it, comm, t, &nbytes, &cas, set_stale);
int ret = store_item(it, comm, t, &nbytes, &cas, of.has_cas_in ? of.cas_id_in : get_cas_id(), set_stale);
switch (ret) {
case STORED:
memcpy(p, "HD", 2);
Expand Down Expand Up @@ -1324,7 +1334,7 @@ static void process_mdelete_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *r
// Also need to remove TOKEN_SENT, so next client can win.
it->it_flags &= ~ITEM_TOKEN_SENT;

ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
ITEM_set_cas(it, of.has_cas_in ? of.cas_id_in : get_cas_id());

// Clients can noreply nominal responses.
if (of.no_reply)
Expand Down Expand Up @@ -1440,6 +1450,11 @@ static void process_marithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_res
//if (c->noreply)
// resp->skip = true;
// *it was filled, set the status below.
if (of.has_cas_in) {
// override the CAS. slightly inefficient but fixing that can wait
// until the next time do_add_delta is changed.
ITEM_set_cas(it, of.cas_id_in);
}
cas = ITEM_get_cas(it);
break;
case NON_NUMERIC:
Expand All @@ -1459,7 +1474,8 @@ static void process_marithmetic_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_res
if (it != NULL) {
memcpy(ITEM_data(it), tmpbuf, vlen);
memcpy(ITEM_data(it) + vlen, "\r\n", 2);
if (do_store_item(it, NREAD_ADD, t, hv, NULL, &cas, CAS_NO_STALE)) {
if (do_store_item(it, NREAD_ADD, t, hv, NULL, &cas,
of.has_cas_in ? of.cas_id_in : get_cas_id(), CAS_NO_STALE)) {
item_created = true;
} else {
// Not sure how we can get here if we're holding the lock.
Expand Down
Loading

0 comments on commit 22480de

Please sign in to comment.