Skip to content

Commit

Permalink
meta: md with I and x removes value
Browse files Browse the repository at this point in the history
If `md` is given the `x` flags it will remove the value but leave the
item. When combined with 'I' it will mark this new item as stale as
well.

This allows the user to create a tombstone of a previous value with a
new TTL.
  • Loading branch information
dormando committed Apr 12, 2024
1 parent 6537dcd commit 563d05a
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 9 deletions.
7 changes: 7 additions & 0 deletions doc/protocol.txt
Original file line number Diff line number Diff line change
Expand Up @@ -830,6 +830,7 @@ The flags used by the 'md' command are:
- O(token): opaque to copy back.
- q: noreply
- T(token): updates TTL, only when paired with the 'I' flag
- x: removes the item value, but leaves the item.

The flags are now repeated with detailed information where useful:

Expand Down Expand Up @@ -858,6 +859,12 @@ When marking an item as stale with 'I', the 'T' flag can be used to update the
TTL as well; limiting the amount of time an item will live while stale and
waiting to be recached.

- x: removes the item value, but leaves the item.

This deletes the value off of an item (by replacing it with an empty value
item atomically). Combined with I this can leave what is effectively a
tombstone of a previous value.

Meta Arithmetic
---------------

Expand Down
38 changes: 34 additions & 4 deletions proto_text.c
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,7 @@ struct _meta_flags {
unsigned int has_cas_in :1;
unsigned int new_ttl :1;
unsigned int key_binary:1;
unsigned int remove_val:1;
char mode; // single character mode switch, common to ms/ma
rel_time_t exptime;
rel_time_t autoviv_exptime;
Expand Down Expand Up @@ -1016,6 +1017,9 @@ static int _meta_flag_preparse(token_t *tokens, const size_t start,
case 'q':
of->no_reply = 1;
break;
case 'x':
of->remove_val = 1;
break;
// mset-related.
case 'F':
if (!safe_strtoflags(tokens[i].value+1, &of->client_flags)) {
Expand Down Expand Up @@ -1610,7 +1614,7 @@ static void process_mdelete_command(conn *c, token_t *tokens, const size_t ntoke
size_t nkey;
item *it = NULL;
int i;
uint32_t hv;
uint32_t hv = 0;
struct _meta_flags of = {0}; // option bitflags.
char *errstr = "CLIENT_ERROR bad command line format";
assert(c != NULL);
Expand Down Expand Up @@ -1676,6 +1680,25 @@ static void process_mdelete_command(conn *c, token_t *tokens, const size_t ntoke
goto cleanup;
}

// If requested, create a new empty tombstone item.
if (of.remove_val) {
item *new_it = item_alloc(key, nkey, of.client_flags, of.exptime, 2);
if (new_it != NULL) {
memcpy(ITEM_data(new_it), "\r\n", 2);
if (do_store_item(new_it, NREAD_SET, c->thread, hv, NULL, NULL, ITEM_get_cas(it), CAS_NO_STALE)) {
do_item_remove(it);
it = new_it;
} else {
do_item_remove(new_it);
memcpy(resp->wbuf, "NS", 2);
goto cleanup;
}
} else {
errstr = "SERVER_ERROR out of memory";
goto error;
}
}

// If we're to set this item as stale, we don't actually want to
// delete it. We mark the stale bit, bump CAS, and update exptime if
// we were supplied a new TTL.
Expand All @@ -1688,19 +1711,21 @@ static void process_mdelete_command(conn *c, token_t *tokens, const size_t ntoke
it->it_flags &= ~ITEM_TOKEN_SENT;

ITEM_set_cas(it, of.has_cas_in ? of.cas_id_in : get_cas_id());

// Clients can noreply nominal responses.
if (c->noreply)
resp->skip = true;

memcpy(resp->wbuf, "HD", 2);
} else {
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[ITEM_clsid(it)].delete_hits++;
pthread_mutex_unlock(&c->thread->stats.mutex);

LOGGER_LOG(NULL, LOG_DELETIONS, LOGGER_DELETIONS, it, LOG_TYPE_META_DELETE);
do_item_unlink(it, hv);
STORAGE_delete(c->thread->storage, it);
if (!of.remove_val) {
do_item_unlink(it, hv);
STORAGE_delete(c->thread->storage, it);
}
if (c->noreply)
resp->skip = true;
memcpy(resp->wbuf, "HD", 2);
Expand All @@ -1727,6 +1752,11 @@ static void process_mdelete_command(conn *c, token_t *tokens, const size_t ntoke
conn_set_state(c, conn_new_cmd);
return;
error:
// cleanup if an error happens after we fetched an item.
if (it) {
do_item_remove(it);
item_unlock(hv);
}
out_errstring(c, errstr);
}

Expand Down
38 changes: 33 additions & 5 deletions proxy_internal.c
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ struct _meta_flags {
unsigned int has_cas_in :1;
unsigned int new_ttl :1;
unsigned int key_binary:1;
unsigned int remove_val:1;
char mode; // single character mode switch, common to ms/ma
rel_time_t exptime;
rel_time_t autoviv_exptime;
Expand Down Expand Up @@ -689,6 +690,9 @@ static int _meta_flag_preparse(mcp_parser_t *pr, const size_t start,
case 'q':
of->no_reply = 1;
break;
case 'x':
of->remove_val = 1;
break;
// mset-related.
case 'F':
if (!safe_strtoflags(&pr->request[pr->tokens[i]+1], &of->client_flags)) {
Expand Down Expand Up @@ -1323,6 +1327,25 @@ static void process_mdelete_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *r
goto cleanup;
}

// If requested, create a new empty tombstone item.
if (of.remove_val) {
item *new_it = item_alloc(key, nkey, of.client_flags, of.exptime, 2);
if (new_it != NULL) {
memcpy(ITEM_data(new_it), "\r\n", 2);
if (do_store_item(new_it, NREAD_SET, t, hv, NULL, NULL, ITEM_get_cas(it), CAS_NO_STALE)) {
do_item_remove(it);
it = new_it;
} else {
do_item_remove(new_it);
memcpy(resp->wbuf, "NS", 2);
goto cleanup;
}
} else {
errstr = "SERVER_ERROR out of memory";
goto error;
}
}

// If we're to set this item as stale, we don't actually want to
// delete it. We mark the stale bit, bump CAS, and update exptime if
// we were supplied a new TTL.
Expand All @@ -1335,18 +1358,18 @@ static void process_mdelete_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *r
it->it_flags &= ~ITEM_TOKEN_SENT;

ITEM_set_cas(it, of.has_cas_in ? of.cas_id_in : get_cas_id());

// Clients can noreply nominal responses.
if (of.no_reply)
resp->skip = true;

memcpy(resp->wbuf, "HD", 2);
} else {
pthread_mutex_lock(&t->stats.mutex);
t->stats.slab_stats[ITEM_clsid(it)].delete_hits++;
pthread_mutex_unlock(&t->stats.mutex);

do_item_unlink(it, hv);
STORAGE_delete(t->storage, it);
if (!of.remove_val) {
do_item_unlink(it, hv);
STORAGE_delete(t->storage, it);
}
if (of.no_reply)
resp->skip = true;
memcpy(resp->wbuf, "HD", 2);
Expand All @@ -1373,6 +1396,11 @@ static void process_mdelete_cmd(LIBEVENT_THREAD *t, mcp_parser_t *pr, mc_resp *r
//conn_set_state(c, conn_new_cmd);
return;
error:
// cleanup if an error happens after we fetched an item.
if (it) {
do_item_remove(it);
item_unlock(hv);
}
pout_errstring(resp, errstr);
}

Expand Down
33 changes: 33 additions & 0 deletions t/metaget.t
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ subtest 'marith with CAS override' => sub {

{
note "pipeline test";
my $sock = $server->new_sock;
print $sock "ms foo 2 T100\r\nna\r\n";
like(scalar <$sock>, qr/^HD/, "set foo");
print $sock "mg foo s\r\nmg foo s\r\nquit\r\nmg foo s\r\n";
Expand All @@ -741,6 +742,34 @@ subtest 'marith with CAS override' => sub {
is(scalar <$sock>, undef, "final get didn't run");
}

subtest 'md x and I flags' => sub {
my $k = 'mdx';
print $sock "ms $k 2 T50\r\nmx\r\n";
like(scalar <$sock>, qr/^HD/, "set $k");

my $res = mget($sock, $k, 't v');
is($res->{val}, 'mx', 'seed value as expected');

print $sock "md $k x\r\n";
like(scalar <$sock>, qr/^HD/, "mdelete with x");

$res = mget($sock, $k, 't v');
is($res->{val}, '', 'value zeroed out');

# re-set for x + I
print $sock "ms $k 2 T50\r\nmz\r\n";
like(scalar <$sock>, qr/^HD/, "set $k");

$res = mget($sock, $k, 't v');
is($res->{val}, 'mz', 'seed value as expected');

print $sock "md $k x I\r\n";
like(scalar <$sock>, qr/^HD/, "mdelete with x and I");
$res = mget($sock, $k, 't v');
is($res->{val}, '', 'value zeroed out');
ok(find_flags($res, 'XW'), "got win and stale flags back");
};

# TODO: move wait_for_ext into Memcached.pm
sub wait_for_ext {
my $sock = shift;
Expand Down Expand Up @@ -902,6 +931,10 @@ sub mget_res {
} elsif ($resp =~ m/^HD\s*([^\r]+)\r\n/gm) {
$r{flags} = $1;
$r{hd} = 1;
} elsif ($resp =~ m/^EN/gm) {
# do nothing?
} else {
die "Unable to parse mget response: $resp";
}

return \%r;
Expand Down

0 comments on commit 563d05a

Please sign in to comment.