Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Namespaces #374

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
3c4212f
implemented basic support for namespaces (this only adds support for …
ederuiter Oct 16, 2021
b8561cf
implemented database allocation + acl namespace support
ederuiter Oct 22, 2021
25e3744
make sure db allocation is always present when selecting a different …
ederuiter Oct 23, 2021
d3cd457
fix debug htstats to work with namespaces
ederuiter Oct 23, 2021
eb311cf
make sure fake replication client also has a namespace
ederuiter Oct 23, 2021
c8c4828
fix rreplay tests to also include allocate calls (this is needed as i…
ederuiter Oct 23, 2021
9adb05c
added actual namespace switching in ACLAuthenticateUser
ederuiter Oct 24, 2021
4254348
added basic namespace tests
ederuiter Oct 24, 2021
71aa119
Move master initialization out of config parsing; this caused issues …
ederuiter Oct 29, 2021
acda6f2
Fix flushall + emptyDbStucture + debug loadaof
ederuiter Oct 29, 2021
9b98faf
Added database mapping + namespace to client list output + added comp…
ederuiter Oct 29, 2021
05b631e
Namespace fixes for rdb/aof/scripting
ederuiter Oct 29, 2021
8434366
redisMaster->staleKeyMap is not always initialized, only delete it wh…
ederuiter Oct 29, 2021
c2efd10
added support for ::auto namespace + support for default user namespa…
ederuiter Nov 1, 2021
ace526f
forgot to commit generateAutoNamespaceName implementation
ederuiter Nov 1, 2021
d0e5165
Added SELECTNS admin command to switch to databases in other namespaces
ederuiter Nov 6, 2021
0f47216
Added last_allocated_db to prevent looping over all databases every time
ederuiter Nov 6, 2021
6cadb8c
Reset last_allocated_db after emptying all databases
ederuiter Nov 6, 2021
160d443
Move script cache to namespace
ederuiter Nov 6, 2021
ac5c4e3
Added command stats per namespace + output them in the info command
ederuiter Nov 7, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion src/acl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ user *ACLCreateUser(const char *name, size_t namelen) {
if (raxFind(Users,(unsigned char*)name,namelen) != raxNotFound) return NULL;
user *u = (user*)zmalloc(sizeof(*u), MALLOC_LOCAL);
u->name = sdsnewlen(name,namelen);
u->ns_name = sdsnew("::");
u->flags = USER_FLAG_DISABLED | g_pserver->acl_pubsub_default;
u->allowed_subcommands = NULL;
u->passwords = listCreate();
Expand Down Expand Up @@ -288,6 +289,7 @@ user *ACLCreateUnlinkedUser(void) {
* will not remove the user from the Users global radix tree. */
void ACLFreeUser(user *u) {
sdsfree(u->name);
sdsfree(u->ns_name);
listRelease(u->passwords);
listRelease(u->patterns);
listRelease(u->channels);
Expand Down Expand Up @@ -338,6 +340,8 @@ void ACLCopyUser(user *dst, user *src) {
memcpy(dst->allowed_commands,src->allowed_commands,
sizeof(dst->allowed_commands));
dst->flags = src->flags;
sdsfree(dst->ns_name);
dst->ns_name = sdsdup(src->ns_name);
ACLResetSubcommands(dst);
/* Copy the allowed subcommands array of array of SDS strings. */
if (src->allowed_subcommands) {
Expand Down Expand Up @@ -669,6 +673,13 @@ sds ACLDescribeUser(user *u) {
sds rules = ACLDescribeUserCommandRules(u);
res = sdscatsds(res,rules);
sdsfree(rules);

/* namespace */
if (strcmp(u->ns_name, DefaultUser->ns_name) != 0) {
res = sdscatlen(res, " ", 1);
res = sdscatsds(res, u->ns_name);
}

return res;
}

Expand Down Expand Up @@ -801,6 +812,10 @@ void ACLAddAllowedSubcommand(user *u, unsigned long id, const char *sub) {
* reset Performs the following actions: resetpass, resetkeys, off,
* -@all. The user returns to the same state it has immediately
* after its creation.
* ::<ns> Sets the namespace for the user.
* :: is the default namespace
* ::auto will generate a new namespace with a random uuid
* (only if the current user is still in the default namespace)
*
* The 'op' string must be null terminated. The 'oplen' argument should
* specify the length of the 'op' string in case the caller requires to pass
Expand Down Expand Up @@ -1000,6 +1015,11 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
errno = ENOENT;
return C_ERR;
}
} else if (op[0] == ':' && op[1] == ':') {
//TODO: implement ::auto
sds ns = sdsnewlen(op,oplen);
sdsfree(u->ns_name);
u->ns_name = ns;
} else if (!strcasecmp(op,"reset")) {
serverAssert(ACLSetUser(u,"resetpass",-1) == C_OK);
serverAssert(ACLSetUser(u,"resetkeys",-1) == C_OK);
Expand Down Expand Up @@ -1114,6 +1134,8 @@ int ACLAuthenticateUser(client *c, robj *username, robj *password) {
if (ACLCheckUserCredentials(username,password) == C_OK) {
c->authenticated = 1;
c->user = ACLGetUserByName((sds)ptrFromObj(username),sdslen((sds)ptrFromObj(username)));
c->ns = getNamespace(c->user->ns_name);
selectDbNamespaced(c, c->db->mapped_id);
moduleNotifyUserChanged(c);
return C_OK;
} else {
Expand Down Expand Up @@ -1969,7 +1991,7 @@ void aclCommand(client *c) {
return;
}

addReplyMapLen(c,5);
addReplyMapLen(c,6);

/* Flags */
addReplyBulkCString(c,"flags");
Expand Down Expand Up @@ -2030,6 +2052,10 @@ void aclCommand(client *c) {
addReplyBulkCBuffer(c,thispat,sdslen(thispat));
}
}

/* namespace */
addReplyBulkCString(c,"namespace");
addReplyBulkCString(c, u->ns_name);
} else if ((!strcasecmp(sub,"list") || !strcasecmp(sub,"users")) &&
c->argc == 2)
{
Expand Down
23 changes: 19 additions & 4 deletions src/aof.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -731,10 +731,14 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a

/* The DB this command was targeting is not the same as the last command
* we appended. To issue a SELECT command is needed. */
if (dictid != g_pserver->aof_selected_db) {
char seldb[64];

if (dictid != -1 && dictid != g_pserver->aof_selected_db) {
char seldb[64], mappeddb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
snprintf(mappeddb,sizeof(seldb),"%d",g_pserver->db[dictid].mapped_id);

buf = sdscatprintf(buf, "*4\r\n$8\r\nALLOCATE\r\n$%lu\r\n%s\r\n$%lu\r\n%s\r\n$%lu\r\n%s\r\n",
sdslen(g_pserver->db[dictid].ns->name), g_pserver->db[dictid].ns->name, strlen(seldb), seldb, strlen(mappeddb), mappeddb);

buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
g_pserver->aof_selected_db = dictid;
Expand Down Expand Up @@ -767,7 +771,8 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a
struct client *createAOFClient(void) {
struct client *c =(client*) zmalloc(sizeof(*c), MALLOC_LOCAL);

selectDb(c,0);
c->ns = g_pserver->default_namespace;
selectDbNamespaced(c, 0);
c->id = CLIENT_ID_AOF; /* So modules can identify it's the AOF client. */
c->conn = NULL;
c->iel = IDX_EVENT_LOOP_MAIN;
Expand Down Expand Up @@ -1546,12 +1551,22 @@ int rewriteAppendOnlyFileRio(rio *aof) {
long long updated_time = 0;

for (j = 0; j < cserver.dbnum; j++) {
char allocatecmd[] = "*4\r\n$8\r\nALLOCATE\r\n";
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = g_pserver->db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
serverAssert(db->ns);
serverAssert(db->mapped_id >= 0);

di = dictGetSafeIterator(d);

/* ALLOCATE DB */
if (rioWrite(aof,allocatecmd,sizeof(allocatecmd)-1) == 0) goto werr;
if (rioWriteBulkString(aof,db->ns->name, sdslen(db->ns->name)) == 0) goto werr;
if (rioWriteBulkLongLong(aof,db->id) == 0) goto werr;
if (rioWriteBulkLongLong(aof,db->mapped_id) == 0) goto werr;

/* SELECT the new DB */
if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
if (rioWriteBulkLongLong(aof,j) == 0) goto werr;
Expand Down
35 changes: 23 additions & 12 deletions src/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2196,24 +2196,32 @@ int clusterProcessPacket(clusterLink *link) {
}
} else if (type == CLUSTERMSG_TYPE_PUBLISH) {
robj *channel, *message;
uint32_t channel_len, message_len;
sds ns_name;
redisNamespace *ns;
uint32_t ns_len, channel_len, message_len;

ns_len = ntohl(hdr->data.publish.msg.ns_len);
ns_name = sdsnewlen((char*)hdr->data.publish.msg.bulk_data,ns_len);
ns = getNamespace(ns_name);

/* Don't bother creating useless objects if there are no
* Pub/Sub subscribers. */
if (dictSize(g_pserver->pubsub_channels) ||
dictSize(g_pserver->pubsub_patterns))
if (!ns ||
dictSize(ns->pubsub_channels) ||
dictSize(ns->pubsub_patterns))
{
channel_len = ntohl(hdr->data.publish.msg.channel_len);
message_len = ntohl(hdr->data.publish.msg.message_len);
channel = createStringObject(
(char*)hdr->data.publish.msg.bulk_data,channel_len);
(char*)hdr->data.publish.msg.bulk_data+ns_len,channel_len);
message = createStringObject(
(char*)hdr->data.publish.msg.bulk_data+channel_len,
(char*)hdr->data.publish.msg.bulk_data+ns_len+channel_len,
message_len);
pubsubPublishMessage(channel,message);
pubsubPublishMessage(ns,channel,message);
decrRefCount(channel);
decrRefCount(message);
}
sdsfree(ns_name);
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
if (!sender) return 1; /* We don't know that node. */
clusterSendFailoverAuthIfNeeded(sender,hdr);
Expand Down Expand Up @@ -2754,22 +2762,24 @@ void clusterBroadcastPong(int target) {
/* Send a PUBLISH message.
*
* If link is NULL, then the message is broadcasted to the whole cluster. */
void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
void clusterSendPublish(redisNamespace *ns, clusterLink *link, robj *channel, robj *message) {
unsigned char *payload;
clusterMsg buf[1];
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;
uint32_t channel_len, message_len;
uint32_t channel_len, message_len, ns_len;

channel = getDecodedObject(channel);
message = getDecodedObject(message);
channel_len = sdslen(szFromObj(channel));
message_len = sdslen(szFromObj(message));
ns_len = sdslen(ns->name);

clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len + ns_len;

hdr->data.publish.msg.ns_len = htonl(ns_len);
hdr->data.publish.msg.channel_len = htonl(channel_len);
hdr->data.publish.msg.message_len = htonl(message_len);
hdr->totlen = htonl(totlen);
Expand All @@ -2782,7 +2792,8 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
memcpy(payload,hdr,sizeof(*hdr));
hdr = (clusterMsg*) payload;
}
memcpy(hdr->data.publish.msg.bulk_data,ptrFromObj(channel),sdslen(szFromObj(channel)));
memcpy(hdr->data.publish.msg.bulk_data,ns->name,ns_len);
memcpy(hdr->data.publish.msg.bulk_data+ns_len,ptrFromObj(channel),sdslen(szFromObj(channel)));
memcpy(hdr->data.publish.msg.bulk_data+sdslen(szFromObj(channel)),
ptrFromObj(message),sdslen(szFromObj(message)));

Expand Down Expand Up @@ -2888,8 +2899,8 @@ int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uin
* cluster. In the future we'll try to get smarter and avoiding propagating those
* messages to hosts without receives for a given channel.
* -------------------------------------------------------------------------- */
void clusterPropagatePublish(robj *channel, robj *message) {
clusterSendPublish(NULL, channel, message);
void clusterPropagatePublish(redisNamespace *ns, robj *channel, robj *message) {
clusterSendPublish(ns, NULL, channel, message);
}

/* -----------------------------------------------------------------------------
Expand Down
1 change: 1 addition & 0 deletions src/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ typedef struct {
} clusterMsgDataFail;

typedef struct {
uint32_t ns_len;
uint32_t channel_len;
uint32_t message_len;
unsigned char bulk_data[8]; /* 8 bytes just as placeholder. */
Expand Down
7 changes: 6 additions & 1 deletion src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,11 @@ void loadServerConfigFromString(char *config) {
if (port < 0 || port > 65535 || *ptr != '\0') {
err= "Invalid master port"; goto loaderr;
}
replicationAddMaster(argv[1], port);

redisMasterConnInfo *masterInfo = (redisMasterConnInfo*)zmalloc(sizeof(redisMasterConnInfo), MALLOC_LOCAL);
masterInfo->ip = sdsnew(argv[1]);
masterInfo->port = port;
listAddNodeTail(g_pserver->repl_init_masters, masterInfo);
} else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) {
err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN";
Expand Down Expand Up @@ -2685,6 +2689,7 @@ standardConfig configs[] = {

/* Integer configs */
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, cserver.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("databases-per-namespace", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, cserver.ns_dbnum, INT_MAX, INTEGER_CONFIG, NULL, NULL),
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, g_pserver->port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->cluster_slave_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* Slave max data age factor. */
Expand Down
Loading