diff --git a/src/acl.cpp b/src/acl.cpp index 07b6c4978..dd06443e6 100644 --- a/src/acl.cpp +++ b/src/acl.cpp @@ -33,6 +33,7 @@ extern "C" { } #include #include +#include /* ============================================================================= * Global state for ACLs @@ -248,6 +249,13 @@ user *ACLCreateUser(const char *name, size_t namelen) { if (raxFind(Users,(unsigned char*)name,namelen) != raxNotFound) return NULL; user *u = (user*)zmalloc(sizeof(*u), MALLOC_LOCAL); u->name = sdsnewlen(name,namelen); + + if (!strcasecmp(cserver.default_user_namespace, "::auto")) { + u->ns_name = generateAutoNamespaceName(); + } else { + u->ns_name = sdsnew(cserver.default_user_namespace); + } + u->flags = USER_FLAG_DISABLED | g_pserver->acl_pubsub_default; u->allowed_subcommands = NULL; u->passwords = listCreate(); @@ -288,6 +296,7 @@ user *ACLCreateUnlinkedUser(void) { * will not remove the user from the Users global radix tree. */ void ACLFreeUser(user *u) { sdsfree(u->name); + sdsfree(u->ns_name); listRelease(u->passwords); listRelease(u->patterns); listRelease(u->channels); @@ -338,6 +347,8 @@ void ACLCopyUser(user *dst, user *src) { memcpy(dst->allowed_commands,src->allowed_commands, sizeof(dst->allowed_commands)); dst->flags = src->flags; + sdsfree(dst->ns_name); + dst->ns_name = sdsdup(src->ns_name); ACLResetSubcommands(dst); /* Copy the allowed subcommands array of array of SDS strings. */ if (src->allowed_subcommands) { @@ -669,6 +680,13 @@ sds ACLDescribeUser(user *u) { sds rules = ACLDescribeUserCommandRules(u); res = sdscatsds(res,rules); sdsfree(rules); + + /* namespace */ + if (strcmp(u->ns_name, cserver.default_user_namespace) != 0) { + res = sdscatlen(res, " ", 1); + res = sdscatsds(res, u->ns_name); + } + return res; } @@ -801,6 +819,9 @@ void ACLAddAllowedSubcommand(user *u, unsigned long id, const char *sub) { * reset Performs the following actions: resetpass, resetkeys, off, * -@all. 
The user returns to the same state it has immediately * after its creation. + * :: Sets the namespace for the user. + * :: is the default namespace + * ::auto will generate a new namespace with a random uuid * * The 'op' string must be null terminated. The 'oplen' argument should * specify the length of the 'op' string in case the caller requires to pass @@ -1000,6 +1021,14 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) { errno = ENOENT; return C_ERR; } + } else if (op[0] == ':' && op[1] == ':') { + if (!strcasecmp(op, "::auto")) { + u->ns_name = generateAutoNamespaceName(); + } else { + sds ns = sdsnewlen(op,oplen); + sdsfree(u->ns_name); + u->ns_name = ns; + } } else if (!strcasecmp(op,"reset")) { serverAssert(ACLSetUser(u,"resetpass",-1) == C_OK); serverAssert(ACLSetUser(u,"resetkeys",-1) == C_OK); @@ -1114,6 +1143,8 @@ int ACLAuthenticateUser(client *c, robj *username, robj *password) { if (ACLCheckUserCredentials(username,password) == C_OK) { c->authenticated = 1; c->user = ACLGetUserByName((sds)ptrFromObj(username),sdslen((sds)ptrFromObj(username))); + c->ns = getNamespace(c->user->ns_name); + selectDbNamespaced(c, c->db->mapped_id); moduleNotifyUserChanged(c); return C_OK; } else { @@ -1969,7 +2000,7 @@ void aclCommand(client *c) { return; } - addReplyMapLen(c,5); + addReplyMapLen(c,6); /* Flags */ addReplyBulkCString(c,"flags"); @@ -2030,6 +2061,10 @@ void aclCommand(client *c) { addReplyBulkCBuffer(c,thispat,sdslen(thispat)); } } + + /* namespace */ + addReplyBulkCString(c,"namespace"); + addReplyBulkCString(c, u->ns_name); } else if ((!strcasecmp(sub,"list") || !strcasecmp(sub,"users")) && c->argc == 2) { diff --git a/src/aof.cpp b/src/aof.cpp index df326ead6..9a129ec2d 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -731,10 +731,14 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a /* The DB this command was targeting is not the same as the last command * we appended. To issue a SELECT command is needed. 
*/ - if (dictid != g_pserver->aof_selected_db) { - char seldb[64]; - + if (dictid != -1 && dictid != g_pserver->aof_selected_db) { + char seldb[64], mappeddb[64]; snprintf(seldb,sizeof(seldb),"%d",dictid); + snprintf(mappeddb,sizeof(seldb),"%d",g_pserver->db[dictid].mapped_id); + + buf = sdscatprintf(buf, "*4\r\n$8\r\nALLOCATE\r\n$%lu\r\n%s\r\n$%lu\r\n%s\r\n$%lu\r\n%s\r\n", + sdslen(g_pserver->db[dictid].ns->name), g_pserver->db[dictid].ns->name, strlen(seldb), seldb, strlen(mappeddb), mappeddb); + buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", (unsigned long)strlen(seldb),seldb); g_pserver->aof_selected_db = dictid; @@ -767,7 +771,8 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a struct client *createAOFClient(void) { struct client *c =(client*) zmalloc(sizeof(*c), MALLOC_LOCAL); - selectDb(c,0); + c->ns = g_pserver->default_namespace; + selectDbNamespaced(c, 0); c->id = CLIENT_ID_AOF; /* So modules can identify it's the AOF client. */ c->conn = NULL; c->iel = IDX_EVENT_LOOP_MAIN; @@ -1546,12 +1551,22 @@ int rewriteAppendOnlyFileRio(rio *aof) { long long updated_time = 0; for (j = 0; j < cserver.dbnum; j++) { + char allocatecmd[] = "*4\r\n$8\r\nALLOCATE\r\n"; char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; redisDb *db = g_pserver->db+j; dict *d = db->dict; if (dictSize(d) == 0) continue; + serverAssert(db->ns); + serverAssert(db->mapped_id >= 0); + di = dictGetSafeIterator(d); + /* ALLOCATE DB */ + if (rioWrite(aof,allocatecmd,sizeof(allocatecmd)-1) == 0) goto werr; + if (rioWriteBulkString(aof,db->ns->name, sdslen(db->ns->name)) == 0) goto werr; + if (rioWriteBulkLongLong(aof,db->id) == 0) goto werr; + if (rioWriteBulkLongLong(aof,db->mapped_id) == 0) goto werr; + /* SELECT the new DB */ if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; if (rioWriteBulkLongLong(aof,j) == 0) goto werr; diff --git a/src/cluster.cpp b/src/cluster.cpp index 4f1cc7710..fb413ead4 100644 --- a/src/cluster.cpp +++ 
b/src/cluster.cpp @@ -2196,24 +2196,32 @@ int clusterProcessPacket(clusterLink *link) { } } else if (type == CLUSTERMSG_TYPE_PUBLISH) { robj *channel, *message; - uint32_t channel_len, message_len; + sds ns_name; + redisNamespace *ns; + uint32_t ns_len, channel_len, message_len; + + ns_len = ntohl(hdr->data.publish.msg.ns_len); + ns_name = sdsnewlen((char*)hdr->data.publish.msg.bulk_data,ns_len); + ns = getNamespace(ns_name); /* Don't bother creating useless objects if there are no * Pub/Sub subscribers. */ - if (dictSize(g_pserver->pubsub_channels) || - dictSize(g_pserver->pubsub_patterns)) + if (!ns || + dictSize(ns->pubsub_channels) || + dictSize(ns->pubsub_patterns)) { channel_len = ntohl(hdr->data.publish.msg.channel_len); message_len = ntohl(hdr->data.publish.msg.message_len); channel = createStringObject( - (char*)hdr->data.publish.msg.bulk_data,channel_len); + (char*)hdr->data.publish.msg.bulk_data+ns_len,channel_len); message = createStringObject( - (char*)hdr->data.publish.msg.bulk_data+channel_len, + (char*)hdr->data.publish.msg.bulk_data+ns_len+channel_len, message_len); - pubsubPublishMessage(channel,message); + pubsubPublishMessage(ns,channel,message); decrRefCount(channel); decrRefCount(message); } + sdsfree(ns_name); } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) { if (!sender) return 1; /* We don't know that node. */ clusterSendFailoverAuthIfNeeded(sender,hdr); @@ -2754,22 +2762,24 @@ void clusterBroadcastPong(int target) { /* Send a PUBLISH message. * * If link is NULL, then the message is broadcasted to the whole cluster. 
*/ -void clusterSendPublish(clusterLink *link, robj *channel, robj *message) { +void clusterSendPublish(redisNamespace *ns, clusterLink *link, robj *channel, robj *message) { unsigned char *payload; clusterMsg buf[1]; clusterMsg *hdr = (clusterMsg*) buf; uint32_t totlen; - uint32_t channel_len, message_len; + uint32_t channel_len, message_len, ns_len; channel = getDecodedObject(channel); message = getDecodedObject(message); channel_len = sdslen(szFromObj(channel)); message_len = sdslen(szFromObj(message)); + ns_len = sdslen(ns->name); clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH); totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len; + totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len + ns_len; + hdr->data.publish.msg.ns_len = htonl(ns_len); hdr->data.publish.msg.channel_len = htonl(channel_len); hdr->data.publish.msg.message_len = htonl(message_len); hdr->totlen = htonl(totlen); @@ -2782,7 +2792,8 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message) { memcpy(payload,hdr,sizeof(*hdr)); hdr = (clusterMsg*) payload; } - memcpy(hdr->data.publish.msg.bulk_data,ptrFromObj(channel),sdslen(szFromObj(channel))); + memcpy(hdr->data.publish.msg.bulk_data,ns->name,ns_len); + memcpy(hdr->data.publish.msg.bulk_data+ns_len,ptrFromObj(channel),sdslen(szFromObj(channel))); memcpy(hdr->data.publish.msg.bulk_data+sdslen(szFromObj(channel)), ptrFromObj(message),sdslen(szFromObj(message))); @@ -2888,8 +2899,8 @@ int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uin * cluster. In the future we'll try to get smarter and avoiding propagating those * messages to hosts without receives for a given channel. 
* -------------------------------------------------------------------------- */ -void clusterPropagatePublish(robj *channel, robj *message) { - clusterSendPublish(NULL, channel, message); +void clusterPropagatePublish(redisNamespace *ns, robj *channel, robj *message) { + clusterSendPublish(ns, NULL, channel, message); } /* ----------------------------------------------------------------------------- diff --git a/src/cluster.h b/src/cluster.h index db82c3f24..7ea2f8bce 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -209,6 +209,7 @@ typedef struct { } clusterMsgDataFail; typedef struct { + uint32_t ns_len; uint32_t channel_len; uint32_t message_len; unsigned char bulk_data[8]; /* 8 bytes just as placeholder. */ diff --git a/src/config.cpp b/src/config.cpp index aa3b32ebb..20ef79953 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -542,7 +542,11 @@ void loadServerConfigFromString(char *config) { if (port < 0 || port > 65535 || *ptr != '\0') { err= "Invalid master port"; goto loaderr; } - replicationAddMaster(argv[1], port); + + redisMasterConnInfo *masterInfo = (redisMasterConnInfo*)zmalloc(sizeof(redisMasterConnInfo), MALLOC_LOCAL); + masterInfo->ip = sdsnew(argv[1]); + masterInfo->port = port; + listAddNodeTail(g_pserver->repl_init_masters, masterInfo); } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) { if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) { err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN"; @@ -2667,7 +2671,7 @@ standardConfig configs[] = { createStringConfig("bgsave_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->bgsave_cpulist, NULL, NULL, NULL), createStringConfig("ignore-warnings", NULL, MODIFIABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->ignore_warnings, "", NULL, NULL), createStringConfig("proc-title-template", NULL, MODIFIABLE_CONFIG, ALLOW_EMPTY_STRING, cserver.proc_title_template, CONFIG_DEFAULT_PROC_TITLE_TEMPLATE, isValidProcTitleTemplate, updateProcTitleTemplate), - + 
createStringConfig("acl-namespace-default", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, cserver.default_user_namespace, "::", NULL, NULL), //TODO: validation /* SDS Configs */ createSDSConfig("masterauth", NULL, MODIFIABLE_CONFIG | SENSITIVE_CONFIG, EMPTY_STRING_IS_NULL, cserver.default_masterauth, NULL, NULL, updateMasterAuthConfig), createSDSConfig("requirepass", NULL, MODIFIABLE_CONFIG | SENSITIVE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->requirepass, NULL, NULL, updateRequirePass), @@ -2685,6 +2689,7 @@ standardConfig configs[] = { /* Integer configs */ createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, cserver.dbnum, 16, INTEGER_CONFIG, NULL, NULL), + createIntConfig("databases-per-namespace", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, cserver.ns_dbnum, INT_MAX, INTEGER_CONFIG, NULL, NULL), createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, g_pserver->port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */ createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL), createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->cluster_slave_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* Slave max data age factor. */ diff --git a/src/db.cpp b/src/db.cpp index 88d43eb25..3382b76d7 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -494,6 +494,13 @@ long long emptyDbStructure(redisDb *dbarray, int dbnum, int async, } /* Because all keys of database are removed, reset average ttl. 
*/ dbarray[j].avg_ttl = 0; + + if (dbnum == -1 && dbarray[j].ns && g_pserver->db == dbarray) { + dbarray[j].ns->db[dbarray[j].mapped_id] = nullptr; + dbarray[j].mapped_id = -1; + dbarray[j].ns = nullptr; + g_pserver->last_allocated_db = -1; + } } return removed; @@ -643,6 +650,84 @@ int selectDb(client *c, int id) { if (id < 0 || id >= cserver.dbnum) return C_ERR; c->db = &g_pserver->db[id]; + c->ns = c->db->ns; + serverAssertWithInfo(NULL,NULL,c->db->mapped_id >= 0); + serverAssertWithInfo(NULL,NULL,c->db->ns); + return C_OK; +} + +void mapDb(redisNamespace *ns, int global_db, int ns_db, int do_propagate) { + //serverLog(LL_WARNING, "about to map %s %d %d", ns->name, global_db, ns_db); + serverAssertWithInfo(NULL, NULL, global_db >= 0 && global_db < cserver.dbnum); + serverAssertWithInfo(NULL, NULL, ns_db >= 0 && ns_db < std::min(cserver.dbnum, cserver.ns_dbnum)); + serverAssertWithInfo(NULL, NULL, !g_pserver->db[global_db].ns || (g_pserver->db[global_db].ns == ns && g_pserver->db[global_db].mapped_id == ns_db)); + + g_pserver->last_allocated_db = std::max(g_pserver->last_allocated_db, global_db); + g_pserver->db[global_db].ns = ns; + g_pserver->db[global_db].mapped_id = ns_db; + ns->db[ns_db] = &g_pserver->db[global_db]; + + if (do_propagate > 0) { + /* propagate allocation to make sure mapping is the same on all nodes */ + robj *argv[4]; + argv[0] = shared.allocate; + argv[1] = createObject(OBJ_STRING, sdsdup(ns->name)); + argv[2] = createObject(OBJ_STRING, sdsfromlonglong(global_db)); + argv[3] = createObject(OBJ_STRING, sdsfromlonglong(ns_db)); + + serverLog(LL_WARNING, "propagate %s %d %d", ns->name, global_db, ns_db); + if (do_propagate == 1) + propagate(cserver.allocateCommand, -1, argv, 4, PROPAGATE_AOF|PROPAGATE_REPL); + else + alsoPropagate(cserver.allocateCommand, -1, argv, 4, PROPAGATE_AOF|PROPAGATE_REPL); + } +} + +void printDbMap() { + serverLog(LL_NOTICE, "------ database mapping ------"); + for (int i=0; i %d (%s) @ %d keys", i, 
g_pserver->db[i].mapped_id, + g_pserver->db[i].ns ? g_pserver->db[i].ns->name : "", (int) dictSize(g_pserver->db[i].dict)); + } + serverLog(LL_NOTICE, "------ default ns mapping ------"); + for (int i=0; idefault_namespace->db[i]) { + serverLog(LL_NOTICE, "%d => %d (%s) @ %d keys", i, g_pserver->default_namespace->db[i]->mapped_id, + g_pserver->default_namespace->db[i]->ns ? g_pserver->default_namespace->db[i]->ns->name + : "", + (int) dictSize(g_pserver->default_namespace->db[i]->dict)); + } + } + serverLog(LL_NOTICE, "---- end database mapping ----"); +} + +redisDb *allocateDb(client *c, int id) { + if (id < 0 || id >= std::min(cserver.dbnum, cserver.ns_dbnum)) return nullptr; + + if (!c->ns->db[id]) { + /* allocate database from global pool */ + int found = 0; + for (int i=g_pserver->last_allocated_db+1;idb[i].ns) { + mapDb(c->ns, i, id, 1); + found=1; + break; + } + } + + if (!found) { + serverLog(LL_WARNING, "We ran out of databases!!"); + return nullptr; + } + } + return c->ns->db[id]; +} + +int selectDbNamespaced(client *c, int id) { + redisDb *db = allocateDb(c, id); + if (db == nullptr) return C_ERR; + + c->db = db; return C_OK; } @@ -716,8 +801,17 @@ int getFlushCommandFlags(client *c, int *flags) { } /* Flushes the whole server data set. 
*/ -void flushAllDataAndResetRDB(int flags) { - g_pserver->dirty += emptyDb(-1,flags,NULL); +void flushAllDataAndResetRDB(redisNamespace *ns, int flags) { + if (ns == NULL) { + g_pserver->dirty += emptyDb(-1, flags, NULL); + } else { + for (int i=0; idb[i]) { + g_pserver->dirty += emptyDb(ns->db[i]->id,flags,NULL); + } + } + } + if (g_pserver->child_type == CHILD_TYPE_RDB) killRDBChild(); if (g_pserver->saveparamslen > 0) { /* Normally rdbSave() will reset dirty, but we don't want this here @@ -765,7 +859,7 @@ void flushdbCommand(client *c) { void flushallCommand(client *c) { int flags; if (getFlushCommandFlags(c,&flags) == C_ERR) return; - flushAllDataAndResetRDB(flags); + flushAllDataAndResetRDB(c->ns, flags); addReply(c,shared.ok); } @@ -825,13 +919,48 @@ void selectCommand(client *c) { addReplyError(c,"SELECT is not allowed in cluster mode"); return; } - if (selectDb(c,id) == C_ERR) { + + bool is_replication = ((c->flags & CLIENT_MASTER) || c->id == CLIENT_ID_AOF); + if (is_replication && selectDb(c, id) == C_ERR) { + addReplyError(c,"DB index is out of range"); + } else if (!is_replication && selectDbNamespaced(c, id) == C_ERR) { addReplyError(c,"DB index is out of range"); } else { addReply(c,shared.ok); } } +void selectNsCommand(client *c) { + //TODO: validate namespace name + c->ns = getNamespace(szFromObj(c->argv[2])); + selectCommand(c); +} + +void allocateCommand(client *c) { + int global_db, ns_db; + + bool is_replication = ((c->flags & CLIENT_MASTER) || c->id == CLIENT_ID_AOF); + if (!is_replication) { + addReplyError(c,"ALLOCATE is only allowed for replication"); + return; + } + + //TODO: validate namespace name + redisNamespace *ns = getNamespace(szFromObj(c->argv[1])); + + if (getIntFromObjectOrReply(c, c->argv[2], &global_db, NULL) != C_OK) + return; + + if (getIntFromObjectOrReply(c, c->argv[3], &ns_db, NULL) != C_OK) + return; + + serverLog(LL_WARNING, "allocate %s %d %d", ns->name, global_db, ns_db); + + //TODO: error handling + mapDb(ns, 
global_db, ns_db, 0); + addReply(c,shared.ok); +} + void randomkeyCommand(client *c) { robj *key; @@ -1259,7 +1388,7 @@ void moveCommand(client *c) { if (getIntFromObjectOrReply(c, c->argv[2], &dbid, NULL) != C_OK) return; - if (selectDb(c,dbid) == C_ERR) { + if (selectDbNamespaced(c, dbid) == C_ERR) { addReplyError(c,"DB index is out of range"); return; } @@ -1334,7 +1463,7 @@ void copyCommand(client *c) { if (getIntFromObjectOrReply(c, c->argv[j+1], &dbid, NULL) != C_OK) return; - if (selectDb(c, dbid) == C_ERR) { + if (selectDbNamespaced(c, dbid) == C_ERR) { addReplyError(c,"DB index is out of range"); return; } @@ -1438,11 +1567,13 @@ void scanDatabaseForReadyLists(redisDb *db) { * * Returns C_ERR if at least one of the DB ids are out of range, otherwise * C_OK is returned. */ -int dbSwapDatabases(int id1, int id2) { - if (id1 < 0 || id1 >= cserver.dbnum || - id2 < 0 || id2 >= cserver.dbnum) return C_ERR; +int dbSwapDatabases(client *c, int id1, int id2) { + if (id1 < 0 || id1 >= std::min(cserver.dbnum, cserver.ns_dbnum) || + id2 < 0 || id2 >= std::min(cserver.dbnum, cserver.ns_dbnum)) return C_ERR; if (id1 == id2) return C_OK; - redisDb *db1 = &g_pserver->db[id1], *db2 = &g_pserver->db[id2]; + + redisDb *db1 = allocateDb(c, id1); + redisDb *db2 = allocateDb(c, id2); /* Swap hash tables. Note that we don't swap blocking_keys, * ready_keys and watched_keys, since we want clients to @@ -1492,7 +1623,7 @@ void swapdbCommand(client *c) { return; /* Swap... 
*/ - if (dbSwapDatabases(id1,id2) == C_ERR) { + if (dbSwapDatabases(c, id1,id2) == C_ERR) { addReplyError(c,"DB index is out of range"); return; } else { diff --git a/src/debug.cpp b/src/debug.cpp index 8dc71bd78..a6dcf8af4 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -567,9 +567,11 @@ NULL serverLog(LL_WARNING,"DB reloaded by DEBUG RELOAD"); addReply(c,shared.ok); } else if (!strcasecmp(szFromObj(c->argv[1]),"loadaof")) { + serverLog(LL_WARNING,"Append Only File loaded by DEBUG LOADAOF started"); if (g_pserver->aof_state != AOF_OFF) flushAppendOnlyFile(1); emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); protectClient(c); + int ret = loadAppendOnlyFile(g_pserver->aof_filename); unprotectClient(c); if (ret != C_OK) { @@ -577,7 +579,7 @@ NULL return; } g_pserver->dirty = 0; /* Prevent AOF / replication */ - serverLog(LL_WARNING,"Append Only File loaded by DEBUG LOADAOF"); + serverLog(LL_WARNING,"Append Only File loaded by DEBUG LOADAOF done"); addReply(c,shared.ok); } else if (!strcasecmp(szFromObj(c->argv[1]),"object") && c->argc == 3) { dictEntry *de; @@ -840,18 +842,18 @@ NULL sdsfree(stats); return; } - if (dbid < 0 || dbid >= cserver.dbnum) { + if (dbid < 0 || dbid >= std::min(cserver.dbnum, cserver.ns_dbnum)) { sdsfree(stats); addReplyError(c,"Out of range database"); return; } stats = sdscatprintf(stats,"[Dictionary HT]\n"); - dictGetStats(buf,sizeof(buf),g_pserver->db[dbid].dict); + dictGetStats(buf,sizeof(buf),c->ns->db[dbid]->dict); stats = sdscat(stats,buf); stats = sdscatprintf(stats,"[Expires set]\n"); - g_pserver->db[dbid].setexpire->getstats(buf, sizeof(buf)); + c->ns->db[dbid]->setexpire->getstats(buf, sizeof(buf)); stats = sdscat(stats, buf); addReplyVerbatim(c,stats,sdslen(stats),"txt"); @@ -972,6 +974,7 @@ void _serverAssertPrintClientInfo(const client *c) { bugReportStart(); serverLog(LL_WARNING,"=== ASSERTION FAILED CLIENT CONTEXT ==="); + serverLog(LL_WARNING,"client->id = %llu", (unsigned long long) c->id); serverLog(LL_WARNING,"client->flags = %llu", 
(unsigned long long) c->flags); serverLog(LL_WARNING,"client->conn = %s", connGetInfo(c->conn, conninfo, sizeof(conninfo))); serverLog(LL_WARNING,"client->argc = %d", c->argc); @@ -1035,6 +1038,7 @@ void _serverAssertPrintObject(robj_roptr o) { void _serverAssertWithInfo(const client *c, robj_roptr o, const char *estr, const char *file, int line) { if (c) _serverAssertPrintClientInfo(c); if (o) _serverAssertPrintObject(o); + printDbMap(); _serverAssert(estr,file,line); } diff --git a/src/defrag.cpp b/src/defrag.cpp index b693e29bb..0463e8fc7 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -981,8 +981,19 @@ long defragOtherGlobals() { /* there are many more pointers to defrag (e.g. client argv, output / aof buffers, etc. * but we assume most of these are short lived, we only need to defrag allocations * that remain static for a long time */ - defragged += activeDefragSdsDict(g_pserver->lua_scripts, DEFRAG_SDS_DICT_VAL_IS_STROB); - defragged += activeDefragSdsListAndDict(g_pserver->repl_scriptcache_fifo, g_pserver->repl_scriptcache_dict, DEFRAG_SDS_DICT_NO_VAL); + + redisNamespace *ns; + dictEntry *de; + dictIterator *di; + di = dictGetSafeIterator(g_pserver->namespaces); + while((de = dictNext(di)) != NULL) { + ns = (struct redisNamespace *) dictGetVal(de); + defragged += activeDefragSdsDict(ns->lua_scripts, DEFRAG_SDS_DICT_VAL_IS_STROB); + defragged += activeDefragSdsListAndDict(ns->repl_scriptcache_fifo, ns->repl_scriptcache_dict, DEFRAG_SDS_DICT_NO_VAL); + } + dictReleaseIterator(di); + + defragged += activeDefragSdsDict(g_pserver->namespaces, DEFRAG_SDS_DICT_VAL_VOID_PTR); defragged += moduleDefragGlobals(); return defragged; } diff --git a/src/module.cpp b/src/module.cpp index fb2a66fc1..968afc8e2 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -2097,10 +2097,9 @@ int RM_GetClientInfoById(void *ci, uint64_t id) { /* Publish a message to subscribers (see PUBLISH command). 
*/ int RM_PublishMessage(RedisModuleCtx *ctx, RedisModuleString *channel, RedisModuleString *message) { - UNUSED(ctx); - int receivers = pubsubPublishMessage(channel, message); + int receivers = pubsubPublishMessage(ctx->client->ns, channel, message); if (g_pserver->cluster_enabled) - clusterPropagatePublish(channel, message); + clusterPropagatePublish(ctx->client->ns, channel, message); return receivers; } @@ -2293,7 +2292,7 @@ int RM_AvoidReplicaTraffic() { * returns back to the original one, it should call RedisModule_GetSelectedDb() * before in order to restore the old DB number before returning. */ int RM_SelectDb(RedisModuleCtx *ctx, int newid) { - int retval = selectDb(ctx->client,newid); + int retval = selectDbNamespaced(ctx->client,newid); return (retval == C_OK) ? REDISMODULE_OK : REDISMODULE_ERR; } @@ -2513,7 +2512,7 @@ int RM_SetAbsExpire(RedisModuleKey *key, mstime_t expire) { * When async is set to true, db contents will be freed by a background thread. */ void RM_ResetDataset(int restart_aof, int async) { if (restart_aof && g_pserver->aof_state != AOF_OFF) stopAppendOnly(); - flushAllDataAndResetRDB(async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS); + flushAllDataAndResetRDB(NULL, async? EMPTYDB_ASYNC: EMPTYDB_NO_FLAGS); if (g_pserver->aof_enabled && restart_aof) restartAOFAfterSYNC(); } @@ -4215,6 +4214,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch /* We do not want to allow block, the module do not expect it */ c->flags |= CLIENT_DENY_BLOCKING; c->db = ctx->client->db; + c->ns = ctx->client->ns; c->argv = argv; c->argc = argc; if (ctx->module) ctx->module->in_call++; @@ -8527,7 +8527,7 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data) { * up to the specific event setup to change it when it makes * sense. For instance for FLUSHDB events we select the correct * DB automatically. */ - selectDb(ctx.client, 0); + selectDbNamespaced(ctx.client, 0); /* Event specific context and data pointer setup. 
*/ if (eid == REDISMODULE_EVENT_CLIENT_CHANGE) { diff --git a/src/networking.cpp b/src/networking.cpp index b9307e18a..2ca8d7df0 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -140,12 +140,12 @@ client *createClient(connection *conn, int iel) { connSetPrivateData(conn, c); } - selectDb(c,0); uint64_t client_id; client_id = g_pserver->next_client_id.fetch_add(1); c->iel = iel; fastlock_init(&c->lock, "client"); c->id = client_id; + c->ns = g_pserver->default_namespace; c->resp = 2; c->conn = conn; c->name = NULL; @@ -172,6 +172,7 @@ client *createClient(connection *conn, int iel) { /* If the default user does not require authentication, the user is * directly authenticated. */ clientSetDefaultAuth(c); + selectDbNamespaced(c,0); c->replstate = REPL_STATE_NONE; c->repl_put_online_on_ack = 0; c->reploff = 0; @@ -2732,7 +2733,7 @@ sds catClientInfoString(sds s, client *client) { total_mem += zmalloc_size(client->argv); return sdscatfmt(s, - "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I", + "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i(%i) sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I ns=%s", (unsigned long long) client->id, getClientPeerId(client), getClientSockname(client), @@ -2741,6 +2742,7 @@ sds catClientInfoString(sds s, client *client) { (long long)(g_pserver->unixtime - client->ctime), (long long)(g_pserver->unixtime - client->lastinteraction), flags, + client->db->mapped_id, client->db->id, (int) dictSize(client->pubsub_channels), (int) listLength(client->pubsub_patterns), @@ -2755,7 +2757,8 @@ sds catClientInfoString(sds s, client *client) { events, client->lastcmd ? client->lastcmd->name : "NULL", client->user ? client->user->name : "(superuser)", - (client->flags & CLIENT_TRACKING) ? 
(long long) client->client_tracking_redirection : -1); + (client->flags & CLIENT_TRACKING) ? (long long) client->client_tracking_redirection : -1, + client->ns ? client->ns->name : "(unset)"); } sds getAllClientsInfoString(int type) { @@ -2836,7 +2839,7 @@ void resetCommand(client *c) { } if (c->flags & CLIENT_TRACKING) disableTracking(c); - selectDb(c,0); + c->resp = 2; clientSetDefaultAuth(c); @@ -2846,6 +2849,9 @@ void resetCommand(client *c) { pubsubUnsubscribeAllChannels(c,0); pubsubUnsubscribeAllPatterns(c,0); + c->ns = g_pserver->default_namespace; + selectDbNamespaced(c,0); + if (c->name) { decrRefCount(c->name); c->name = NULL; diff --git a/src/notify.cpp b/src/notify.cpp index fa3133bd1..fe686a2fc 100644 --- a/src/notify.cpp +++ b/src/notify.cpp @@ -101,6 +101,7 @@ sds keyspaceEventsFlagsToString(int flags) { void notifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) { sds chan; robj *chanobj, *eventobj; + redisDb *db = g_pserver->db + dbid; int len = -1; char buf[24]; @@ -108,7 +109,7 @@ void notifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) { * This bypasses the notifications configuration, but the module engine * will only call event subscribers if the event type matches the types * they are interested in. */ - moduleNotifyKeyspaceEvent(type, event, key, dbid); + moduleNotifyKeyspaceEvent(type, event, key, dbid); /* If notifications for this class of events are off, return ASAP. */ if (!(g_pserver->notify_keyspace_events & type)) return; @@ -118,24 +119,24 @@ void notifyKeyspaceEvent(int type, const char *event, robj *key, int dbid) { /* __keyspace@__: notifications. 
*/ if (g_pserver->notify_keyspace_events & NOTIFY_KEYSPACE) { chan = sdsnewlen("__keyspace@",11); - len = ll2string(buf,sizeof(buf),dbid); + len = ll2string(buf,sizeof(buf), db->mapped_id); chan = sdscatlen(chan, buf, len); chan = sdscatlen(chan, "__:", 3); chan = sdscatsds(chan, szFromObj(key)); chanobj = createObject(OBJ_STRING, chan); - pubsubPublishMessage(chanobj, eventobj); + pubsubPublishMessage(db->ns, chanobj, eventobj); decrRefCount(chanobj); } /* __keyevent@__: notifications. */ if (g_pserver->notify_keyspace_events & NOTIFY_KEYEVENT) { chan = sdsnewlen("__keyevent@",11); - if (len == -1) len = ll2string(buf,sizeof(buf),dbid); + if (len == -1) len = ll2string(buf,sizeof(buf),db->mapped_id); chan = sdscatlen(chan, buf, len); chan = sdscatlen(chan, "__:", 3); chan = sdscatsds(chan, szFromObj(eventobj)); chanobj = createObject(OBJ_STRING, chan); - pubsubPublishMessage(chanobj, key); + pubsubPublishMessage(db->ns, chanobj, key); decrRefCount(chanobj); } decrRefCount(eventobj); diff --git a/src/object.cpp b/src/object.cpp index c7496f222..23c23faa5 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1112,15 +1112,26 @@ struct redisMemOverhead *getMemoryOverheadData(void) { mh->aof_buffer = mem; mem_total+=mem; - mem = g_pserver->lua_scripts_mem; - mem += dictSize(g_pserver->lua_scripts) * sizeof(dictEntry) + - dictSlots(g_pserver->lua_scripts) * sizeof(dictEntry*); - mem += dictSize(g_pserver->repl_scriptcache_dict) * sizeof(dictEntry) + - dictSlots(g_pserver->repl_scriptcache_dict) * sizeof(dictEntry*); - if (listLength(g_pserver->repl_scriptcache_fifo) > 0) { - mem += listLength(g_pserver->repl_scriptcache_fifo) * (sizeof(listNode) + - sdsZmallocSize((sds)listNodeValue(listFirst(g_pserver->repl_scriptcache_fifo)))); + mem = 0; + + redisNamespace *ns; + dictEntry *de; + dictIterator *di; + di = dictGetSafeIterator(g_pserver->namespaces); + while((de = dictNext(di)) != NULL) { + ns = (struct redisNamespace *) dictGetVal(de); + mem += ns->lua_scripts_mem; + 
mem += dictSize(ns->lua_scripts) * sizeof(dictEntry) + + dictSlots(ns->lua_scripts) * sizeof(dictEntry*); + mem += dictSize(ns->repl_scriptcache_dict) * sizeof(dictEntry) + + dictSlots(ns->repl_scriptcache_dict) * sizeof(dictEntry*); + if (listLength(ns->repl_scriptcache_fifo) > 0) { + mem += listLength(ns->repl_scriptcache_fifo) * (sizeof(listNode) + + sdsZmallocSize((sds)listNodeValue(listFirst(ns->repl_scriptcache_fifo)))); + } } + dictReleaseIterator(di); + mh->lua_caches = mem; mem_total+=mem; @@ -1234,7 +1245,8 @@ sds getMemoryDoctorReport(void) { } /* Too many scripts are cached? */ - if (dictSize(g_pserver->lua_scripts) > 1000) { + //TODO: all namespaces + if (dictSize(g_pserver->default_namespace->lua_scripts) > 1000) { many_scripts = 1; num_reports++; } diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 76bf8d22a..a4c5eb561 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -145,10 +145,10 @@ int pubsubSubscribeChannel(client *c, robj *channel) { retval = 1; incrRefCount(channel); /* Add the client to the channel -> list of clients hash table */ - de = dictFind(g_pserver->pubsub_channels,channel); + de = dictFind(c->ns->pubsub_channels,channel); if (de == NULL) { clients = listCreate(); - dictAdd(g_pserver->pubsub_channels,channel,clients); + dictAdd(c->ns->pubsub_channels,channel,clients); incrRefCount(channel); } else { clients = (list*)dictGetVal(de); @@ -174,7 +174,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) { if (dictDelete(c->pubsub_channels,channel) == DICT_OK) { retval = 1; /* Remove the client from the channel -> clients list hash table */ - de = dictFind(g_pserver->pubsub_channels,channel); + de = dictFind(c->ns->pubsub_channels,channel); serverAssertWithInfo(c,NULL,de != NULL); clients = (list*)dictGetVal(de); ln = listSearchKey(clients,c); @@ -184,7 +184,7 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) { /* Free the list and associated hash entry at all if this was * the latest client, so 
that it will be possible to abuse * Redis PUBSUB creating millions of channels. */ - dictDelete(g_pserver->pubsub_channels,channel); + dictDelete(c->ns->pubsub_channels,channel); } } /* Notify the client */ @@ -205,10 +205,10 @@ int pubsubSubscribePattern(client *c, robj *pattern) { listAddNodeTail(c->pubsub_patterns,pattern); incrRefCount(pattern); /* Add the client to the pattern -> list of clients hash table */ - de = dictFind(g_pserver->pubsub_patterns,pattern); + de = dictFind(c->ns->pubsub_patterns,pattern); if (de == NULL) { clients = listCreate(); - dictAdd(g_pserver->pubsub_patterns,pattern,clients); + dictAdd(c->ns->pubsub_patterns,pattern,clients); incrRefCount(pattern); } else { clients = (list*)dictGetVal(de); @@ -233,7 +233,7 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { retval = 1; listDelNode(c->pubsub_patterns,ln); /* Remove the client from the pattern -> clients list hash table */ - de = dictFind(g_pserver->pubsub_patterns,pattern); + de = dictFind(c->ns->pubsub_patterns,pattern); serverAssertWithInfo(c,NULL,de != NULL); clients = (list*)dictGetVal(de); ln = listSearchKey(clients,c); @@ -242,7 +242,7 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { if (listLength(clients) == 0) { /* Free the list and associated hash entry at all if this was * the latest client. 
*/ - dictDelete(g_pserver->pubsub_patterns,pattern); + dictDelete(c->ns->pubsub_patterns,pattern); } } /* Notify the client */ @@ -291,8 +291,9 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) { } /* Publish a message */ -int pubsubPublishMessage(robj *channel, robj *message) { +int pubsubPublishMessage(redisNamespace *ns, robj *channel, robj *message) { serverAssert(GlobalLocksAcquired()); + serverAssert(ns); int receivers = 0; dictEntry *de; dictIterator *di; @@ -300,7 +301,7 @@ int pubsubPublishMessage(robj *channel, robj *message) { listIter li; /* Send to clients listening for that channel */ - de = dictFind(g_pserver->pubsub_channels,channel); + de = dictFind(ns->pubsub_channels,channel); if (de) { list *list = reinterpret_cast<::list*>(dictGetVal(de)); listNode *ln; @@ -320,7 +321,7 @@ int pubsubPublishMessage(robj *channel, robj *message) { } } /* Send to clients listening to matching channels */ - di = dictGetIterator(g_pserver->pubsub_patterns); + di = dictGetIterator(ns->pubsub_patterns); if (di) { channel = getDecodedObject(channel); while((de = dictNext(di)) != NULL) { @@ -423,9 +424,9 @@ void punsubscribeCommand(client *c) { /* PUBLISH */ void publishCommand(client *c) { - int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]); + int receivers = pubsubPublishMessage(c->ns,c->argv[1],c->argv[2]); if (g_pserver->cluster_enabled) - clusterPropagatePublish(c->argv[1],c->argv[2]); + clusterPropagatePublish(c->ns,c->argv[1],c->argv[2]); else forceCommandPropagation(c,PROPAGATE_REPL); addReplyLongLong(c,receivers); @@ -450,7 +451,7 @@ NULL { /* PUBSUB CHANNELS [] */ sds pat = (c->argc == 2) ? 
NULL : szFromObj(c->argv[2]);
-        dictIterator *di = dictGetIterator(g_pserver->pubsub_channels);
+        dictIterator *di = dictGetIterator(c->ns->pubsub_channels);
         dictEntry *de;
         long mblen = 0;
         void *replylen;
@@ -475,14 +476,14 @@ NULL
             addReplyArrayLen(c,(c->argc-2)*2);
             for (j = 2; j < c->argc; j++) {
-                list *l = (list*)dictFetchValue(g_pserver->pubsub_channels,c->argv[j]);
+                list *l = (list*)dictFetchValue(c->ns->pubsub_channels,c->argv[j]);
                 addReplyBulk(c,c->argv[j]);
                 addReplyLongLong(c,l ? listLength(l) : 0);
             }
         } else if (!strcasecmp(szFromObj(c->argv[1]),"numpat") && c->argc == 2) {
             /* PUBSUB NUMPAT */
-            addReplyLongLong(c,dictSize(g_pserver->pubsub_patterns));
+            addReplyLongLong(c,dictSize(c->ns->pubsub_patterns));
         } else {
             addReplySubcommandSyntaxError(c);
         }
diff --git a/src/rdb.cpp b/src/rdb.cpp
index dfeaaa54b..f716a754b 100644
--- a/src/rdb.cpp
+++ b/src/rdb.cpp
@@ -1283,6 +1283,7 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
  * error. */
 int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
     dictIterator *di = NULL;
+    dictIterator *di_ns = NULL;
     dictEntry *de;
     char magic[10];
     uint64_t cksum;
@@ -1302,7 +1303,8 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
     for (j = 0; j < cserver.dbnum; j++) {
         redisDb *db = g_pserver->db+j;
         dict *d = db->dict;
-        if (dictSize(d) == 0) continue;
+        if (db->mapped_id < 0) continue;
+        serverAssert(db->ns);
         di = dictGetSafeIterator(d);
         /* Write the SELECT DB opcode */
@@ -1316,7 +1318,12 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
         if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
         if (rdbSaveLen(rdb,db_size) == -1) goto werr;
         if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
-
+
+        if (rdbSaveAuxFieldStrStr(rdb,"keydb-namespace", db->ns->name) == -1) goto werr;
+        if (rdbSaveAuxFieldStrInt(rdb,"keydb-namespace-dbid", db->mapped_id) == -1) goto werr;
+
+        if (db_size == 0) { dictReleaseIterator(di); di = NULL; continue; }
+
         /* Iterate this DB writing every entry */
size_t ckeysExpired = 0; while((de = dictNext(di)) != NULL) { @@ -1349,15 +1356,27 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { * the script cache as well: on successful PSYNC after a restart, we need * to be able to process any EVALSHA inside the replication backlog the * master will send us. */ - if (rsi && dictSize(g_pserver->lua_scripts)) { - di = dictGetIterator(g_pserver->lua_scripts); - while((de = dictNext(di)) != NULL) { - robj *body = (robj*)dictGetVal(de); - if (rdbSaveAuxField(rdb,"lua",3,szFromObj(body),sdslen(szFromObj(body))) == -1) - goto werr; + if (rsi) { + redisNamespace *ns; + dictEntry *de_ns; + di_ns = dictGetSafeIterator(g_pserver->namespaces); + while((de_ns = dictNext(di_ns)) != NULL) { + ns = (struct redisNamespace *) dictGetVal(de_ns); + if (dictSize(ns->lua_scripts)) { + di = dictGetIterator(ns->lua_scripts); + while ((de = dictNext(di)) != NULL) { + robj *body = (robj *) dictGetVal(de); + if (rdbSaveAuxField(rdb, "keydb-namespace-lua", 19, ns->name, sdslen(ns->name)) == -1) + goto werr; + if (rdbSaveAuxField(rdb, "lua", 3, szFromObj(body), sdslen(szFromObj(body))) == -1) + goto werr; + } + dictReleaseIterator(di); + di = NULL; /* So that we don't release it again on error. */ + } } - dictReleaseIterator(di); - di = NULL; /* So that we don't release it again on error. 
*/ + dictReleaseIterator(di_ns); + di_ns = NULL; } if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr; @@ -1375,6 +1394,7 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { werr: if (error) *error = errno; if (di) dictReleaseIterator(di); + if (di_ns) dictReleaseIterator(di_ns); return C_ERR; } @@ -2551,6 +2571,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { bool fLastKeyExpired = false; int error; long long empty_keys_skipped = 0, expired_keys_skipped = 0, keys_loaded = 0; + client *lua_client = createClient(nullptr, IDX_EVENT_LOOP_MAIN); rdb->update_cksum = rdbLoadProgressCallback; rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes; @@ -2618,6 +2639,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { exit(1); } db = g_pserver->db+dbid; + db->ns = g_pserver->default_namespace; continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_RESIZEDB) { /* RESIZEDB: Hint about the size of the keys in the currently @@ -2657,7 +2679,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { if (rsi) rsi->repl_offset = strtoll(szFromObj(auxval),NULL,10); } else if (!strcasecmp(szFromObj(auxkey),"lua")) { /* Load the script back in memory. */ - if (luaCreateFunction(NULL,g_pserver->lua,auxval) == NULL) { + if (luaCreateFunction(lua_client, lua_client->ns->lua, auxval) == NULL) { rdbReportCorruptRDB( "Can't load Lua script from RDB file! 
" "BODY: %s", (char*)ptrFromObj(auxval)); @@ -2710,6 +2732,25 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { decrRefCount(subexpireKey); subexpireKey = nullptr; } + } else if (!strcasecmp(szFromObj(auxkey), "keydb-namespace")) { + db->ns = getNamespace(szFromObj(auxval)); + } else if (!strcasecmp(szFromObj(auxkey), "keydb-namespace-lua")) { + lua_client->ns = getNamespace(szFromObj(auxval)); + } else if (!strcasecmp(szFromObj(auxkey), "keydb-namespace-dbid")) { + long long max_db = std::min(cserver.dbnum,cserver.ns_dbnum); + long long mapped_id = strtoll(szFromObj(auxval),NULL,10); + serverAssert(db->ns); + serverAssert(mapped_id >= 0); + if (mapped_id >= max_db) { + serverLog(LL_WARNING, + "FATAL: Data file was created with a KeyDB " + "server configured to handle more than %d " + "namespaced databases. Exiting\n", (int)max_db); + exit(1); + } + + db->mapped_id = (int) mapped_id; + db->ns->db[db->mapped_id] = db; } else { /* We ignore fields we don't understand, as by AUX field * contract. */ diff --git a/src/replication.cpp b/src/replication.cpp index 45143005d..5c13813f1 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3285,7 +3285,8 @@ void freeMasterInfo(redisMaster *mi) zfree(mi->masteruser); if (mi->repl_transfer_tmpfile) zfree(mi->repl_transfer_tmpfile); - delete mi->staleKeyMap; + if (mi->staleKeyMap != nullptr) + delete mi->staleKeyMap; if (mi->cached_master != nullptr) freeClientAsync(mi->cached_master); if (mi->master != nullptr) @@ -3792,10 +3793,10 @@ void refreshGoodSlavesCount(void) { */ /* Initialize the script cache, only called at startup. 
*/ -void replicationScriptCacheInit(void) { - g_pserver->repl_scriptcache_size = 10000; - g_pserver->repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL); - g_pserver->repl_scriptcache_fifo = listCreate(); +void replicationScriptCacheInit(redisNamespace *ns) { + ns->repl_scriptcache_size = 10000; + ns->repl_scriptcache_dict = dictCreate(&replScriptCacheDictType,NULL); + ns->repl_scriptcache_fifo = listCreate(); } /* Empty the script cache. Should be called every time we are no longer sure @@ -3810,38 +3811,49 @@ void replicationScriptCacheInit(void) { * to reclaim otherwise unused memory. */ void replicationScriptCacheFlush(void) { - dictEmpty(g_pserver->repl_scriptcache_dict,NULL); - listRelease(g_pserver->repl_scriptcache_fifo); - g_pserver->repl_scriptcache_fifo = listCreate(); + redisNamespace *ns; + dictEntry *de; + dictIterator *di; + + di = dictGetSafeIterator(g_pserver->namespaces); + while((de = dictNext(di)) != NULL) { + ns = (struct redisNamespace *) dictGetVal(de); + if (listLength(ns->repl_scriptcache_fifo) != 0) { + dictEmpty(ns->repl_scriptcache_dict, NULL); + listRelease(ns->repl_scriptcache_fifo); + ns->repl_scriptcache_fifo = listCreate(); + } + } + dictReleaseIterator(di); } /* Add an entry into the script cache, if we reach max number of entries the * oldest is removed from the list. */ -void replicationScriptCacheAdd(sds sha1) { +void replicationScriptCacheAdd(redisNamespace *ns, sds sha1) { int retval; sds key = sdsdup(sha1); /* Evict oldest. 
*/ - if (listLength(g_pserver->repl_scriptcache_fifo) == g_pserver->repl_scriptcache_size) + if (listLength(ns->repl_scriptcache_fifo) == ns->repl_scriptcache_size) { - listNode *ln = listLast(g_pserver->repl_scriptcache_fifo); + listNode *ln = listLast(ns->repl_scriptcache_fifo); sds oldest = (sds)listNodeValue(ln); - retval = dictDelete(g_pserver->repl_scriptcache_dict,oldest); + retval = dictDelete(ns->repl_scriptcache_dict,oldest); serverAssert(retval == DICT_OK); - listDelNode(g_pserver->repl_scriptcache_fifo,ln); + listDelNode(ns->repl_scriptcache_fifo,ln); } /* Add current. */ - retval = dictAdd(g_pserver->repl_scriptcache_dict,key,NULL); - listAddNodeHead(g_pserver->repl_scriptcache_fifo,key); + retval = dictAdd(ns->repl_scriptcache_dict,key,NULL); + listAddNodeHead(ns->repl_scriptcache_fifo,key); serverAssert(retval == DICT_OK); } /* Returns non-zero if the specified entry exists inside the cache, that is, * if all the slaves are aware of this script SHA1. */ -int replicationScriptCacheExists(sds sha1) { - return dictFind(g_pserver->repl_scriptcache_dict,sha1) != NULL; +int replicationScriptCacheExists(redisNamespace *ns, sds sha1) { + return dictFind(ns->repl_scriptcache_dict,sha1) != NULL; } /* ----------------------- SYNCHRONOUS REPLICATION -------------------------- @@ -4215,8 +4227,7 @@ void replicationCron(void) { * free our Replication Script Cache as there is no need to propagate * EVALSHA at all. 
*/ if (listLength(g_pserver->slaves) == 0 && - g_pserver->aof_state == AOF_OFF && - listLength(g_pserver->repl_scriptcache_fifo) != 0) + g_pserver->aof_state == AOF_OFF) { replicationScriptCacheFlush(); } @@ -4697,6 +4708,7 @@ void replicaReplayCommand(client *c) cFake->lock.lock(); cFake->authenticated = c->authenticated; cFake->user = c->user; + cFake->ns = c->ns; cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2])); selectDb(cFake, c->db->id); auto ccmdPrev = serverTL->commandsExecuted; diff --git a/src/scripting.cpp b/src/scripting.cpp index edf989706..7574725f0 100644 --- a/src/scripting.cpp +++ b/src/scripting.cpp @@ -605,6 +605,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { c->argv = argv; c->argc = argc; c->user = g_pserver->lua_caller->user; + c->ns = g_pserver->lua_caller->ns; /* Process module hooks */ moduleCallCommandFilters(c); @@ -1144,31 +1145,31 @@ void scriptingEnableGlobalsProtection(lua_State *lua) { * in order to reset the Lua scripting environment. * * However it is simpler to just call scriptingReset() that does just that. 
*/ -void scriptingInit(int setup) { - lua_State *lua = lua_open(); +void scriptingSetup() { - if (setup) { - for (int iel = 0; iel < cserver.cthreads; ++iel) - { - g_pserver->rgthreadvar[iel].lua_client = createClient(nullptr, iel); - g_pserver->rgthreadvar[iel].lua_client->flags |= CLIENT_LUA; - /* We do not want to allow blocking commands inside Lua */ - g_pserver->rgthreadvar[iel].lua_client->flags |= CLIENT_DENY_BLOCKING; - } - g_pserver->lua_timedout = 0; - g_pserver->lua_caller = NULL; - g_pserver->lua_cur_script = NULL; - ldbInit(); + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + g_pserver->rgthreadvar[iel].lua_client = createClient(nullptr, iel); + g_pserver->rgthreadvar[iel].lua_client->flags |= CLIENT_LUA; + /* We do not want to allow blocking commands inside Lua */ + g_pserver->rgthreadvar[iel].lua_client->flags |= CLIENT_DENY_BLOCKING; } + g_pserver->lua_timedout = 0; + g_pserver->lua_caller = NULL; + g_pserver->lua_cur_script = NULL; + ldbInit(); +} +void scriptingInit(redisNamespace *ns) { + lua_State *lua = lua_open(); luaLoadLibraries(lua); luaRemoveUnsupportedFunctions(lua); /* Initialize a dictionary we use to map SHAs to scripts. * This is useful for replication, as we need to replicate EVALSHA * as EVAL, so we need to remember the associated script. */ - g_pserver->lua_scripts = dictCreate(&shaScriptObjectDictType,NULL); - g_pserver->lua_scripts_mem = 0; + ns->lua_scripts = dictCreate(&shaScriptObjectDictType,NULL); + ns->lua_scripts_mem = 0; /* Register the redis commands table and fields */ lua_newtable(lua); @@ -1316,23 +1317,23 @@ void scriptingInit(int setup) { * to global variables. */ scriptingEnableGlobalsProtection(lua); - g_pserver->lua = lua; + ns->lua = lua; } /* Release resources related to Lua scripting. * This function is used in order to reset the scripting environment. 
*/ -void scriptingRelease(int async) { +void scriptingRelease(redisNamespace *ns, int async) { if (async) - freeLuaScriptsAsync(g_pserver->lua_scripts); + freeLuaScriptsAsync(ns->lua_scripts); else - dictRelease(g_pserver->lua_scripts); - g_pserver->lua_scripts_mem = 0; - lua_close(g_pserver->lua); + dictRelease(ns->lua_scripts); + ns->lua_scripts_mem = 0; + lua_close(ns->lua); } -void scriptingReset(int async) { - scriptingRelease(async); - scriptingInit(0); +void scriptingReset(redisNamespace *ns, int async) { + scriptingRelease(ns, async); + scriptingInit(ns); } /* Set an array of Redis String Objects as a Lua array (table) stored into a @@ -1421,7 +1422,7 @@ sds luaCreateFunction(client *c, lua_State *lua, robj *body) { sha1hex(funcname+2,(char*)ptrFromObj(body),sdslen((sds)ptrFromObj(body))); sds sha = sdsnewlen(funcname+2,40); - if ((de = dictFind(g_pserver->lua_scripts,sha)) != NULL) { + if ((de = dictFind(c->ns->lua_scripts,sha)) != NULL) { sdsfree(sha); return (sds)dictGetKey(de); } @@ -1459,9 +1460,9 @@ sds luaCreateFunction(client *c, lua_State *lua, robj *body) { /* We also save a SHA1 -> Original script map in a dictionary * so that we can replicate / write in the AOF all the * EVALSHA commands as EVAL using the original script. */ - int retval = dictAdd(g_pserver->lua_scripts,sha,body); + int retval = dictAdd(c->ns->lua_scripts,sha,body); serverAssertWithInfo(c ? 
c : serverTL->lua_client,NULL,retval == DICT_OK); - g_pserver->lua_scripts_mem += sdsZmallocSize(sha) + getStringObjectSdsUsedMemory(body); + c->ns->lua_scripts_mem += sdsZmallocSize(sha) + getStringObjectSdsUsedMemory(body); incrRefCount(body); return sha; } @@ -1522,7 +1523,7 @@ void resetLuaClient(void) { } void evalGenericCommand(client *c, int evalsha) { - lua_State *lua = g_pserver->lua; + lua_State *lua = c->ns->lua; char funcname[43]; long long numkeys; long long initial_server_dirty = g_pserver->dirty; @@ -1626,7 +1627,7 @@ void evalGenericCommand(client *c, int evalsha) { lua_sethook(lua,luaMaskCountHook,LUA_MASKCOUNT,100000); delhook = 1; } else if (ldb.active) { - lua_sethook(g_pserver->lua,luaLdbLineHook,LUA_MASKLINE|LUA_MASKCOUNT,100000); + lua_sethook(c->ns->lua,luaLdbLineHook,LUA_MASKLINE|LUA_MASKCOUNT,100000); delhook = 1; } @@ -1709,13 +1710,13 @@ void evalGenericCommand(client *c, int evalsha) { * flush our cache of scripts that can be replicated as EVALSHA, while * for AOF we need to do so every time we rewrite the AOF file. */ if (evalsha && !g_pserver->lua_replicate_commands) { - if (!replicationScriptCacheExists((sds)ptrFromObj(c->argv[1]))) { + if (!replicationScriptCacheExists(c->ns, (sds)ptrFromObj(c->argv[1]))) { /* This script is not in our script cache, replicate it as * EVAL, then add it into the script cache, as from now on * slaves and AOF know about it. 
*/ - robj *script = (robj*)dictFetchValue(g_pserver->lua_scripts,ptrFromObj(c->argv[1])); + robj *script = (robj*)dictFetchValue(c->ns->lua_scripts,ptrFromObj(c->argv[1])); - replicationScriptCacheAdd((sds)ptrFromObj(c->argv[1])); + replicationScriptCacheAdd(c->ns, (sds)ptrFromObj(c->argv[1])); serverAssertWithInfo(c,NULL,script != NULL); /* If the script did not produce any changes in the dataset we want @@ -1798,7 +1799,7 @@ NULL addReplyError(c,"SCRIPT FLUSH only support SYNC|ASYNC option"); return; } - scriptingReset(async); + scriptingReset(c->ns, async); addReply(c,shared.ok); replicationScriptCacheFlush(); g_pserver->dirty++; /* Propagating this command is a good idea. */ @@ -1807,13 +1808,13 @@ NULL addReplyArrayLen(c, c->argc-2); for (j = 2; j < c->argc; j++) { - if (dictFind(g_pserver->lua_scripts,ptrFromObj(c->argv[j]))) + if (dictFind(c->ns->lua_scripts,ptrFromObj(c->argv[j]))) addReply(c,shared.cone); else addReply(c,shared.czero); } } else if (c->argc == 3 && !strcasecmp((const char*)ptrFromObj(c->argv[1]),"load")) { - sds sha = luaCreateFunction(c,g_pserver->lua,c->argv[2]); + sds sha = luaCreateFunction(c, c->ns->lua,c->argv[2]); if (sha == NULL) return; /* The error was sent by luaCreateFunction(). */ addReplyBulkCBuffer(c,sha,40); forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF); @@ -1822,6 +1823,8 @@ NULL addReplyError(c,"-NOTBUSY No scripts in execution right now."); } else if (g_pserver->lua_caller->flags & CLIENT_MASTER) { addReplyError(c,"-UNKILLABLE The busy script was sent by a master instance in the context of replication and cannot be killed."); + } else if (g_pserver->lua_caller->ns != c->ns) { + addReplyError(c,"-UNKILLABLE Script is not yours to kill"); } else if (g_pserver->lua_write_dirty) { addReplyError(c,"-UNKILLABLE Sorry the script already executed write commands against the dataset. 
You can either wait the script termination or kill the server in a hard way using the SHUTDOWN NOSAVE command."); } else { diff --git a/src/sentinel.cpp b/src/sentinel.cpp index 42bbdc396..2115d2b33 100644 --- a/src/sentinel.cpp +++ b/src/sentinel.cpp @@ -734,7 +734,7 @@ void sentinelEvent(int level, const char *type, sentinelRedisInstance *ri, if (level != LL_DEBUG) { channel = createStringObject(type,strlen(type)); payload = createStringObject(msg,strlen(msg)); - pubsubPublishMessage(channel,payload); + pubsubPublishMessage(g_pserver->default_namespace, channel,payload); decrRefCount(channel); decrRefCount(payload); } diff --git a/src/server.cpp b/src/server.cpp index 7dc38179a..2f169efab 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1169,7 +1169,15 @@ struct redisCommand redisCommandTable[] = { {"failover",failoverCommand,-1, "admin no-script ok-stale", - 0,NULL,0,0,0,0,0,0} + 0,NULL,0,0,0,0,0,0}, + + {"allocate",allocateCommand,4, + "ok-loading fast ok-stale @keyspace", + 0,NULL,0,0,0,0,0,0}, + + {"selectns",selectNsCommand,3, + "admin ok-loading fast ok-stale @keyspace", + 0,NULL,0,0,0,0,0,0}, }; /*============================ Utility functions ============================ */ @@ -1602,6 +1610,18 @@ dictType modulesDictType = { NULL /* allow to expand */ }; +/* Namespaces dictionary type. Keys are namespace name, + * values are pointer to redisNamespace struct. */ +dictType namespaceDictType = { + dictSdsCaseHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCaseCompare, /* key compare */ + dictSdsDestructor, /* key destructor */ + NULL, /* val destructor */ + NULL /* allow to expand */ +}; + /* Migrate cache dict type. 
*/ dictType migrateCacheDictType = { dictSdsHash, /* hash function */ @@ -2104,8 +2124,17 @@ void cronUpdateMemoryStats() { /* LUA memory isn't part of zmalloc_used, but it is part of the process RSS, * so we must deduct it in order to be able to calculate correct * "allocator fragmentation" ratio */ - size_t lua_memory = lua_gc(g_pserver->lua,LUA_GCCOUNT,0)*1024LL; - g_pserver->cron_malloc_stats.allocator_resident = g_pserver->cron_malloc_stats.process_rss - lua_memory; + g_pserver->cron_malloc_stats.allocator_resident = g_pserver->cron_malloc_stats.process_rss; + + redisNamespace *ns; + dictEntry *de; + dictIterator *di; + di = dictGetSafeIterator(g_pserver->namespaces); + while((de = dictNext(di)) != NULL) { + ns = (struct redisNamespace *) dictGetVal(de); + g_pserver->cron_malloc_stats.allocator_resident -= lua_gc(ns->lua,LUA_GCCOUNT,0)*1024LL; + } + dictReleaseIterator(di); } if (!g_pserver->cron_malloc_stats.allocator_active) g_pserver->cron_malloc_stats.allocator_active = g_pserver->cron_malloc_stats.allocator_resident; @@ -2733,6 +2762,7 @@ void createSharedObjects(void) { "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", dictid_len, dictid_str))); } + shared.messagebulk = makeObjectShared("$7\r\nmessage\r\n",13); shared.pmessagebulk = makeObjectShared("$8\r\npmessage\r\n",14); shared.subscribebulk = makeObjectShared("$9\r\nsubscribe\r\n",15); @@ -2741,6 +2771,7 @@ void createSharedObjects(void) { shared.punsubscribebulk = makeObjectShared("$12\r\npunsubscribe\r\n",19); /* Shared command names */ + shared.allocate = makeObjectShared("ALLOCATE", 8); shared.del = makeObjectShared("DEL",3); shared.unlink = makeObjectShared("UNLINK",6); shared.rpop = makeObjectShared("RPOP",4); @@ -2906,6 +2937,7 @@ void initServerConfig(void) { /* Replication related */ g_pserver->masters = listCreate(); + g_pserver->repl_init_masters = listCreate(); g_pserver->enable_multimaster = CONFIG_DEFAULT_ENABLE_MULTIMASTER; g_pserver->repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; 
g_pserver->master_repl_offset = 0; @@ -2962,6 +2994,7 @@ void initServerConfig(void) { cserver.hdelCommand = lookupCommandByCString("hdel"); cserver.zremCommand = lookupCommandByCString("zrem"); cserver.lmoveCommand = lookupCommandByCString("lmove"); + cserver.allocateCommand = lookupCommandByCString("allocate"); /* Debugging */ g_pserver->watchdog_period = 0; @@ -3348,6 +3381,16 @@ void resetServerStats(void) { g_pserver->stat_total_error_replies = 0; g_pserver->stat_dump_payload_sanitizations = 0; g_pserver->aof_delayed_fsync = 0; + + redisNamespace *ns; + dictEntry *de; + dictIterator *di; + di = dictGetSafeIterator(g_pserver->namespaces); + while((de = dictNext(di)) != NULL) { + ns = (struct redisNamespace *) dictGetVal(de); + ns->stat_numcommands = 0; + } + dictReleaseIterator(di); } /* Make the thread killable at any time, so that kill threads functions @@ -3472,13 +3515,47 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) } } -void initServer(void) { - signal(SIGHUP, SIG_IGN); - signal(SIGPIPE, SIG_IGN); - setupSignalHandlers(); - makeThreadKillable(); +sds generateAutoNamespaceName() { + char uuid[37]; + uuid_t uuid_bin; + uuid_generate(uuid_bin); + uuid_unparse(uuid_bin, uuid); + + //TODO: for readability we might want to add username + // but because we are operating on a fake user here + // the actual user name is not available. 
+ sds auto_ns = sdsempty(); + return sdscatfmt(auto_ns, "::%s", uuid); +} +redisNamespace *getNamespace(const char* name) { + sds ns_name = sdsnew(name); + redisNamespace *ns = (redisNamespace *) dictFetchValue(g_pserver->namespaces, ns_name); + if (ns == NULL) { + ns = (redisNamespace *)zmalloc(sizeof(redisNamespace), MALLOC_LOCAL); + ns = new(ns) redisNamespace; + ns->name = sdsnew(ns_name); + ns->pubsub_channels = dictCreate(&keylistDictType,NULL); + ns->pubsub_patterns = dictCreate(&keylistDictType,NULL); + ns->db = (redisDb**)zcalloc(sizeof(void *)*std::min(cserver.dbnum,cserver.ns_dbnum), MALLOC_LOCAL); + ns->stat_numcommands = 0; + ns->stat_commands = (redisNamespaceCommandStats*)zcalloc(sizeof(redisNamespaceCommandStats)*dictSize(g_pserver->commands), MALLOC_LOCAL); + resetCommandTableStatsNs(ns); + scriptingInit(ns); + replicationScriptCacheInit(ns); + + dictAdd(g_pserver->namespaces, sdsnew(ns_name), ns); + } + sdsfree(ns_name); + + return ns; +} + +void initDb(void) { + g_pserver->namespaces = dictCreate(&namespaceDictType,NULL); + g_pserver->default_namespace = getNamespace("::"); g_pserver->db = (redisDb*)zmalloc(sizeof(redisDb)*cserver.dbnum, MALLOC_LOCAL); + g_pserver->last_allocated_db = -1; /* Create the Redis databases, and initialize other internal state. 
*/ for (int j = 0; j < cserver.dbnum; j++) { @@ -3490,11 +3567,19 @@ void initServer(void) { g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); g_pserver->db[j].watched_keys = dictCreate(&keylistDictType,NULL); g_pserver->db[j].id = j; + g_pserver->db[j].mapped_id = -1; g_pserver->db[j].avg_ttl = 0; g_pserver->db[j].last_expire_set = 0; g_pserver->db[j].defrag_later = listCreate(); listSetFreeMethod(g_pserver->db[j].defrag_later,(void (*)(const void*))sdsfree); } +} + +void initServer(void) { + signal(SIGHUP, SIG_IGN); + signal(SIGPIPE, SIG_IGN); + setupSignalHandlers(); + makeThreadKillable(); /* Fixup Master Client Database */ listIter li; @@ -3541,8 +3626,6 @@ void initServer(void) { serverLog(LL_NOTICE, "monotonic clock: %s", clk_msg); evictionPoolAlloc(); /* Initialize the LRU keys pool. */ - g_pserver->pubsub_channels = dictCreate(&keylistDictType,NULL); - g_pserver->pubsub_patterns = dictCreate(&keylistDictType,NULL); g_pserver->cronloops = 0; g_pserver->propagate_in_transaction = 0; g_pserver->client_pause_in_transaction = 0; @@ -3625,8 +3708,7 @@ void initServer(void) { uuid_generate((unsigned char*)cserver.uuid); if (g_pserver->cluster_enabled) clusterInit(); - replicationScriptCacheInit(); - scriptingInit(1); + scriptingSetup(); slowlogInit(); latencyMonitorInit(); @@ -3738,6 +3820,7 @@ void populateCommandTable(void) { void resetCommandTableStats(void) { struct redisCommand *c; + redisNamespace *ns; dictEntry *de; dictIterator *di; @@ -3751,6 +3834,28 @@ void resetCommandTableStats(void) { } dictReleaseIterator(di); + di = dictGetSafeIterator(g_pserver->namespaces); + while((de = dictNext(di)) != NULL) { + ns = (redisNamespace *) dictGetVal(de); + resetCommandTableStatsNs(ns); + } + dictReleaseIterator(di); +} + +void resetCommandTableStatsNs(redisNamespace *ns) { + struct redisCommand *c; + dictEntry *de; + dictIterator *di; + + di = dictGetSafeIterator(g_pserver->commands); + while((de = dictNext(di)) != NULL) { + c = 
(struct redisCommand *) dictGetVal(de); + ns->stat_commands[c->id].microseconds = 0; + ns->stat_commands[c->id].calls = 0; + ns->stat_commands[c->id].rejected_calls = 0; + ns->stat_commands[c->id].failed_calls = 0; + } + dictReleaseIterator(di); } static void zfree_noconst(void *p) { @@ -4030,6 +4135,7 @@ void call(client *c, int flags) { * the same error twice. */ if ((g_pserver->stat_total_error_replies - prev_err_count) > 0) { real_cmd->failed_calls++; + c->ns->stat_commands[real_cmd->id].failed_calls++; } /* After executing command, we will close the client after writing entire @@ -4089,6 +4195,8 @@ void call(client *c, int flags) { if (flags & CMD_CALL_STATS) { real_cmd->microseconds += duration; real_cmd->calls++; + c->ns->stat_commands[real_cmd->id].microseconds += duration; + c->ns->stat_commands[real_cmd->id].calls++; } /* Propagate the command into the AOF and replication link */ @@ -4194,6 +4302,7 @@ void call(client *c, int flags) { } g_pserver->stat_numcommands++; + c->ns->stat_numcommands++; serverTL->fixed_time_expire--; prev_err_count = g_pserver->stat_total_error_replies; @@ -5110,7 +5219,7 @@ sds genRedisInfoString(const char *section) { size_t zmalloc_used = zmalloc_used_memory(); size_t total_system_mem = cserver.system_memory_size; const char *evict_policy = evictPolicyToString(); - long long memory_lua = g_pserver->lua ? (long long)lua_gc(g_pserver->lua,LUA_GCCOUNT,0)*1024 : 0; + long long memory_lua = g_pserver->default_namespace->lua ? 
(long long)lua_gc(g_pserver->default_namespace->lua,LUA_GCCOUNT,0)*1024 : 0; //TODO: all namespaces
     struct redisMemOverhead *mh = getMemoryOverheadData();

     /* Peak memory is updated from time to time by serverCron() so it
@@ -5192,7 +5301,7 @@ sds genRedisInfoString(const char *section) {
             used_memory_lua_hmem,
             (long long) mh->lua_caches,
             used_memory_scripts_hmem,
-            dictSize(g_pserver->lua_scripts),
+            dictSize(g_pserver->default_namespace->lua_scripts),
             g_pserver->maxmemory,
             maxmemory_hmem,
             evict_policy,
@@ -5409,8 +5518,8 @@ sds genRedisInfoString(const char *section) {
             g_pserver->stat_evictedkeys,
             g_pserver->stat_keyspace_hits,
             g_pserver->stat_keyspace_misses,
-            dictSize(g_pserver->pubsub_channels),
-            dictSize(g_pserver->pubsub_patterns),
+            dictSize(g_pserver->default_namespace->pubsub_channels), //TODO: all namespaces
+            dictSize(g_pserver->default_namespace->pubsub_patterns), //TODO: all namespaces
             g_pserver->stat_fork_time,
             g_pserver->stat_total_forks,
             dictSize(g_pserver->migrate_cached_sockets),
@@ -5657,6 +5766,31 @@ sds genRedisInfoString(const char *section) {
             if (tmpsafe != NULL) zfree(tmpsafe);
         }
         dictReleaseIterator(di);
+
+        dictIterator *di_ns;
+        dictEntry *de_ns;
+        di_ns = dictGetSafeIterator(g_pserver->namespaces);
+        while((de_ns = dictNext(di_ns)) != NULL) {
+            redisNamespace *ns = (redisNamespace *) dictGetVal(de_ns);
+            if (sections++) info = sdscat(info, "\r\n");
+            info = sdscatprintf(info, "# Commandstats %s\r\n", ns->name);
+            di = dictGetSafeIterator(g_pserver->commands);
+            while ((de = dictNext(di)) != NULL) {
+                char *tmpsafe;
+                c = (struct redisCommand *) dictGetVal(de);
+                if (!ns->stat_commands[c->id].calls && !ns->stat_commands[c->id].failed_calls && !ns->stat_commands[c->id].rejected_calls)
+                    continue;
+                info = sdscatprintf(info,
+                                    "cmdstat_%s:calls=%lld,usec=%lld,usec_per_call=%.2f"
+                                    ",rejected_calls=%lld,failed_calls=%lld\r\n",
+                                    getSafeInfoString(c->name, strlen(c->name), &tmpsafe), ns->stat_commands[c->id].calls, ns->stat_commands[c->id].microseconds,
+                                    (ns->stat_commands[c->id].calls == 0) ? 0 : ((float) ns->stat_commands[c->id].microseconds / ns->stat_commands[c->id].calls),
+                                    ns->stat_commands[c->id].rejected_calls, ns->stat_commands[c->id].failed_calls);
+                if (tmpsafe != NULL) zfree(tmpsafe);
+            }
+            dictReleaseIterator(di);
+        }
+        dictReleaseIterator(di_ns);
     }

     /* Error statistics */
     if (allsections || defsections || !strcasecmp(section,"errorstats")) {
@@ -6817,6 +6951,24 @@ int main(int argc, char **argv) {

     validateConfiguration();

+    initDb();
+
+    if (listLength(g_pserver->repl_init_masters)) {
+        listIter li;
+        listNode *ln;
+        listRewind(g_pserver->repl_init_masters, &li);
+        while ((ln = listNext(&li)))
+        {
+            struct redisMasterConnInfo *mi = (struct redisMasterConnInfo*)listNodeValue(ln);
+
+            replicationAddMaster(mi->ip, mi->port);
+            sdsfree(mi->ip);
+            zfree(mi);
+            listDelNode(g_pserver->repl_init_masters, ln);
+        }
+    }
+    listRelease(g_pserver->repl_init_masters);
+
     for (int iel = 0; iel < cserver.cthreads; ++iel)
     {
         initServerThread(g_pserver->rgthreadvar+iel, iel == IDX_EVENT_LOOP_MAIN);
diff --git a/src/server.h b/src/server.h
index 22ec2fa07..86a1b7d97 100644
--- a/src/server.h
+++ b/src/server.h
@@ -970,6 +970,8 @@ typedef struct clientReplyBlock {
 #endif
 } clientReplyBlock;

+struct redisNamespace;
+
 /* Redis database representation. There are multiple databases identified
  * by integers from 0 (the default database) up to the max configured
  * database. The database number is the 'id' field in the structure.
*/ @@ -980,6 +982,7 @@ struct redisDb { ~redisDb(); + redisNamespace *ns; ::dict *dict; /* The keyspace for this DB */ expireset *setexpire; expireset::setiter expireitr; @@ -987,12 +990,33 @@ struct redisDb { ::dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ ::dict *ready_keys; /* Blocked keys that received a PUSH */ ::dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ - int id; /* Database ID */ + int id; /* Global database ID */ + int mapped_id; /* Namespaced database ID */ long long last_expire_set; /* when the last expire was set */ double avg_ttl; /* Average TTL, just for stats */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ }; +struct redisNamespaceCommandStats { + long long microseconds, calls, rejected_calls, failed_calls; +}; + +struct redisNamespace { + sds name; + redisDb **db; + ::dict *pubsub_channels; /* Map channels to list of subscribed clients */ + ::dict *pubsub_patterns; /* A dict of pubsub_patterns */ + ::dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */ + lua_State *lua; /* The Lua interpreter. We use one per namespace */ + unsigned long long lua_scripts_mem; /* Cached scripts' memory + oh */ + ::dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */ + list *repl_scriptcache_fifo; /* First in, first out LRU eviction. */ + unsigned int repl_scriptcache_size; /* Max number of elements. */ + + redisNamespaceCommandStats *stat_commands; + long long stat_numcommands; +}; + /* Declare database backup that include redis main DBs and slots to keys map. * Definition is in db.c. We can't define it here since we define CLUSTER_SLOTS * in cluster.h. */ @@ -1102,6 +1126,7 @@ typedef struct readyList { typedef struct { sds name; /* The username as an SDS string. */ + sds ns_name; /* The namespace of this user as an SDS string. 
*/ uint64_t flags; /* See USER_FLAG_* */ /* The bit in allowed_commands is set if this user has the right to @@ -1141,6 +1166,7 @@ typedef struct client { connection *conn; int resp; /* RESP protocol version. Can be 2 or 3. */ redisDb *db; /* Pointer to currently SELECTed DB. */ + redisNamespace *ns; /* Pointer to currently active namespace. */ robj *name; /* As set by CLIENT SETNAME. */ sds querybuf; /* Buffer we use to accumulate client queries. */ size_t qb_pos; /* The position we have read in querybuf. */ @@ -1286,7 +1312,7 @@ struct sharedObjectsStruct { *time, *pxat, *px, *retrycount, *force, *justid, *lastid, *ping, *replping, *setid, *keepttl, *load, *createconsumer, *getack, *special_asterick, *special_equals, *default_username, - *hdel, *zrem, *mvccrestore, *pexpirememberat, *redacted, + *hdel, *zrem, *mvccrestore, *pexpirememberat, *redacted, *allocate, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ @@ -1499,6 +1525,11 @@ struct redisServerThreadVars { std::vector vecclientsProcess; }; +struct redisMasterConnInfo { + char *ip; + int port; +}; + struct redisMaster { char *masteruser; /* AUTH with this user and masterauth with master */ char *masterauth; /* AUTH with this password with master */ @@ -1551,11 +1582,12 @@ struct redisServerConst { *zpopmaxCommand, *sremCommand, *execCommand, *expireCommand, *pexpireCommand, *xclaimCommand, *xgroupCommand, *rreplayCommand, *rpoplpushCommand, - *hdelCommand, *zremCommand, *lmoveCommand; + *hdelCommand, *zremCommand, *lmoveCommand, *allocateCommand; /* Configuration */ char *default_masteruser; /* AUTH with this user and masterauth with master */ char *default_masterauth; /* AUTH with this password with master */ + char *default_user_namespace; /* default user namespace */ int verbosity; /* Loglevel in keydb.conf */ int maxidletime; /* Client timeout in seconds */ int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. 
*/ @@ -1571,6 +1603,7 @@ struct redisServerConst { unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */ size_t client_max_querybuf_len; /* Limit for client query buffer length */ int dbnum; /* Total number of configured DBs */ + int ns_dbnum; /* Maximum number of configured databases per namespace */ int supervised; /* 1 if supervised, 0 otherwise. */ int supervised_mode; /* See SUPERVISED_* */ int daemonize; /* True if running as a daemon */ @@ -1599,7 +1632,10 @@ struct redisServer { mode_t umask; /* The umask value of the process on startup */ std::atomic hz; /* serverCron() calls frequency in hertz */ int in_fork_child; /* indication that this is a fork child */ + int last_allocated_db; /* store last allocated db, to speed up allocation */ redisDb *db; + redisNamespace *default_namespace; + ::dict *namespaces; ::dict *commands; /* Command table */ ::dict *orig_commands; /* Command table before command renaming. */ @@ -1834,7 +1870,8 @@ struct redisServer { int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. */ /* Replication (replica) */ list *masters; - int enable_multimaster; + list *repl_init_masters; /* temp variable to hold the master ip/port pairs to init after config parsing */ + int enable_multimaster; int repl_timeout; /* Timeout after N seconds of master idle */ int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */ @@ -1847,10 +1884,6 @@ struct redisServer { int slave_announce_port; /* Give the master this listening port. */ char *slave_announce_ip; /* Give the master this ip address. */ int repl_slave_lazy_flush; /* Lazy FLUSHALL before loading DB? */ - /* Replication script cache. */ - ::dict *repl_scriptcache_dict; /* SHA1 all slaves are aware of. */ - list *repl_scriptcache_fifo; /* First in, first out LRU eviction. 
*/ - unsigned int repl_scriptcache_size; /* Max number of elements. */ /* Synchronous replication. */ list *clients_waiting_acks; /* Clients waiting in WAIT command. */ int get_ack_from_slaves; /* If true we send REPLCONF GETACK. */ @@ -1901,8 +1934,6 @@ struct redisServer { size_t blocking_op_nesting; /* Nesting level of blocking operation, used to reset blocked_last_cron. */ long long blocked_last_cron; /* Indicate the mstime of the last time we did cron jobs from a blocking operation */ /* Pubsub */ - dict *pubsub_channels; /* Map channels to list of subscribed clients */ - dict *pubsub_patterns; /* A dict of pubsub_patterns */ int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an xor of NOTIFY_... flags. */ /* Cluster */ @@ -1929,11 +1960,8 @@ struct redisServer { is down? */ int cluster_config_file_lock_fd; /* cluster config fd, will be flock */ /* Scripting */ - lua_State *lua; /* The Lua interpreter. We use just one for all clients */ client *lua_caller = nullptr; /* The client running EVAL right now, or NULL */ char* lua_cur_script = nullptr; /* SHA1 of the script currently running, or NULL */ - ::dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */ - unsigned long long lua_scripts_mem; /* Cached scripts' memory + oh */ mstime_t lua_time_limit; /* Script timeout in milliseconds */ monotime lua_time_start; /* monotonic timer to detect timed-out script */ mstime_t lua_time_snapshot; /* Snapshot of mstime when script is started */ @@ -2134,6 +2162,10 @@ extern dictType sdsReplyDictType; * Functions prototypes *----------------------------------------------------------------------------*/ +/* Namespaces */ +redisNamespace *getNamespace(const char* name); +sds generateAutoNamespaceName(); + /* Modules */ void moduleInitModulesSystem(void); void moduleInitModulesSystemLast(void); @@ -2423,10 +2455,10 @@ void resizeReplicationBacklog(long long newsize); struct redisMaster *replicationAddMaster(char *ip, int port); void 
replicationUnsetMaster(struct redisMaster *mi); void refreshGoodSlavesCount(void); -void replicationScriptCacheInit(void); +void replicationScriptCacheInit(redisNamespace *ns); void replicationScriptCacheFlush(void); -void replicationScriptCacheAdd(sds sha1); -int replicationScriptCacheExists(sds sha1); +void replicationScriptCacheAdd(redisNamespace *ns, sds sha1); +int replicationScriptCacheExists(redisNamespace *ns, sds sha1); void processClientsWaitingReplicas(void); void unblockClientWaitingReplicas(client *c); int replicationCountAcksByOffset(long long offset); @@ -2633,6 +2665,7 @@ void updateDictResizePolicy(void); int htNeedsResize(dict *dict); void populateCommandTable(void); void resetCommandTableStats(void); +void resetCommandTableStatsNs(redisNamespace *ns); void resetErrorTableStats(void); void adjustOpenFilesLimit(void); void incrementErrorCount(const char *fullerr, size_t namelen); @@ -2700,7 +2733,7 @@ int hashZiplistValidateIntegrity(unsigned char *zl, size_t size, int deep); /* Pub / Sub */ int pubsubUnsubscribeAllChannels(client *c, int notify); int pubsubUnsubscribeAllPatterns(client *c, int notify); -int pubsubPublishMessage(robj *channel, robj *message); +int pubsubPublishMessage(redisNamespace *ns, robj *channel, robj *message); void addReplyPubsubMessage(client *c, robj *channel, robj *msg); /* Keyspace events notification */ @@ -2760,7 +2793,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. 
*/ long long emptyDb(int dbnum, int flags, void(callback)(void*)); long long emptyDbStructure(redisDb *dbarray, int dbnum, int async, void(callback)(void*)); -void flushAllDataAndResetRDB(int flags); +void flushAllDataAndResetRDB(redisNamespace *ns, int flags); long long dbTotalServerKeyCount(); dbBackup *backupDb(void); void restoreDbBackup(dbBackup *buckup); @@ -2768,6 +2801,8 @@ void discardDbBackup(dbBackup *buckup, int flags, void(callback)(void*)); int selectDb(client *c, int id); +int selectDbNamespaced(client *c, int id); +void printDbMap(); void signalModifiedKey(client *c, redisDb *db, robj *key); void signalFlushedDb(int dbid, int async); unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count); @@ -2807,7 +2842,7 @@ void clusterInit(void); extern "C" unsigned short crc16(const char *buf, int len); unsigned int keyHashSlot(char *key, int keylen); void clusterCron(void); -void clusterPropagatePublish(robj *channel, robj *message); +void clusterPropagatePublish(redisNamespace *ns, robj *channel, robj *message); void migrateCloseTimedoutSockets(void); void clusterBeforeSleep(void); int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, unsigned char *payload, uint32_t len); @@ -2829,7 +2864,8 @@ int redis_check_rdb_main(int argc, const char **argv, FILE *fp); int redis_check_aof_main(int argc, char **argv); /* Scripting */ -void scriptingInit(int setup); +void scriptingSetup(); +void scriptingInit(redisNamespace *ns); int ldbRemoveChild(pid_t pid); void ldbKillForkedSessions(void); int ldbPendingChildren(void); @@ -2922,6 +2958,8 @@ void incrbyCommand(client *c); void decrbyCommand(client *c); void incrbyfloatCommand(client *c); void selectCommand(client *c); +void selectNsCommand(client *c); +void allocateCommand(client *c); void swapdbCommand(client *c); void randomkeyCommand(client *c); void keysCommand(client *c); diff --git a/tests/assets/user-namespaces.acl 
b/tests/assets/user-namespaces.acl new file mode 100644 index 000000000..f852698f8 --- /dev/null +++ b/tests/assets/user-namespaces.acl @@ -0,0 +1,4 @@ +user alice on ~* +@all -@dangerous -@admin >alice ::ns1 +user bob on ~* +@all -@dangerous -@admin >bob ::ns2 +user b2 on ~* +@all -@dangerous -@admin >b2 ::ns2 +user default on nopass ~* +@all :: diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 104c299be..e43b41f04 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -1,7 +1,7 @@ start_server {tags {"introspection"}} { test {CLIENT LIST} { r client list - } {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client*} + } {*addr=*:* fd=* age=* idle=* flags=N db=9* sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client* ns=*} test {CLIENT LIST with IDs} { set myid [r client id] @@ -11,7 +11,7 @@ start_server {tags {"introspection"}} { test {CLIENT INFO} { r client info - } {*addr=*:* fd=* age=* idle=* flags=N db=9 sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client*} + } {*addr=*:* fd=* age=* idle=* flags=N db=9* sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client* ns=*} test {MONITOR can log executed commands} { set rd [redis_deferring_client] @@ -147,6 +147,7 @@ start_server {tags {"introspection"}} { supervised syslog-facility databases + databases-per-namespace io-threads logfile unixsocketperm @@ -160,6 +161,7 @@ start_server {tags {"introspection"}} { active-replica bind set-proc-title + acl-namespace-default } if {!$::tls} { diff --git a/tests/unit/namespace.tcl b/tests/unit/namespace.tcl new file mode 100644 index 000000000..7d5f60ad2 --- /dev/null +++ b/tests/unit/namespace.tcl @@ -0,0 +1,101 @@ +set server_path [tmpdir "server.namepsace"] +exec cp -f 
tests/assets/user-namespaces.acl $server_path +start_server [list overrides [list "dir" $server_path "aclfile" "user-namespaces.acl"]] { + # user alice on ~* +@all -@dangerous -@admin >alice ::ns1 + # user bob on ~* +@all -@dangerous -@admin >bob ::ns2 + # user b2 on ~* +@all -@dangerous -@admin >b2 ::ns2 + # user default on nopass ~* +@all :: + + test {::auto generates unique namespace} { + r ACL setuser auto1 on ~* +@all -@dangerous -@admin >auto ::auto + set ns1 [dict get [r ACL getuser auto1] namespace] + assert_match "::*-*-*-*" $ns1 + r ACL setuser auto2 on ~* +@all -@dangerous -@admin >auto ::auto + set ns2 [dict get [r ACL getuser auto2] namespace] + assert_match "::*-*-*-*" $ns2 + assert {$ns1 != $ns2} + } + + test {alice can set keys in multiple databases} { + r AUTH alice alice + r SELECT 0 + r SET name alice0 + r SELECT 1 + r SET name alice1 + r SELECT 2 + r SET name alice2 + } + + test {bob does not have acces to the latest db of alice} { + r AUTH bob bob + assert_equal "" [r GET name] + } + + test {bob can set keys in multiple databases} { + r AUTH bob bob + r SELECT 0 + r SET name bob0 + r SELECT 1 + r SET name bob1 + r SELECT 2 + r SET name bob2 + } + + test {alice's keys are not overwritten by bob} { + r AUTH alice alice + r SELECT 0 + assert_equal "alice0" [r GET name] + r SELECT 1 + assert_equal "alice1" [r GET name] + r SELECT 2 + assert_equal "alice2" [r GET name] + } + + test {bob's keys are not overwritten by alice} { + r AUTH bob bob + r SELECT 0 + assert_equal "bob0" [r GET name] + r SELECT 1 + assert_equal "bob1" [r GET name] + r SELECT 2 + assert_equal "bob2" [r GET name] + } + + test {b2 can see bob's keys} { + r AUTH b2 b2 + r SELECT 0 + assert_equal "bob0" [r GET name] + r SELECT 1 + assert_equal "bob1" [r GET name] + r SELECT 2 + assert_equal "bob2" [r GET name] + } + + test {admin can select databases in different namespaces} { + r AUTH default "" + r SELECTNS 0 ::ns1 + assert_equal "alice0" [r GET name] + r SELECTNS 1 ::ns1 + 
assert_equal "alice1" [r GET name] + r SELECTNS 0 ::ns2 + assert_equal "bob0" [r GET name] + r SELECTNS 1 ::ns2 + assert_equal "bob1" [r GET name] + } + + test {Scripts commands should not affect other namespaces} { + r AUTH alice alice + set sha [r SCRIPT LOAD "return true"] + assert_equal "1" [r SCRIPT EXISTS $sha] + r AUTH bob bob + assert_equal "0" [r SCRIPT EXISTS $sha] + set sha2 [r SCRIPT LOAD "return true"] + assert_equal $sha $sha2 + r SCRIPT FLUSH + assert_equal "0" [r SCRIPT EXISTS $sha] + r AUTH alice alice + assert_equal "1" [r SCRIPT EXISTS $sha] + r EVALSHA $sha 0 + } +} + diff --git a/tests/unit/rreplay.tcl b/tests/unit/rreplay.tcl index e11030f95..53537f85b 100644 --- a/tests/unit/rreplay.tcl +++ b/tests/unit/rreplay.tcl @@ -2,6 +2,7 @@ start_server {tags {"rreplay"} overrides {active-replica yes}} { test {RREPLAY use current db} { r debug force-master yes + r allocate :: 4 0 r select 4 r set dbnum invalid r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n" @@ -11,6 +12,7 @@ start_server {tags {"rreplay"} overrides {active-replica yes}} { test {RREPLAY db different} { r debug force-master yes + r allocate :: 2 2 r select 4 r set testkey four r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2