Skip to content

Commit

Permalink
Allow preferred nodes to be configured
Browse files Browse the repository at this point in the history
If a cluster is distributed geographically, it can be preferable to query the local nodes. For readonly commands, we have the posibility to chose a subset of the clsuter nodes that will be prioritized when choosing a node. A specific local slave can therefore be used.
  • Loading branch information
pwdng committed Mar 30, 2018
1 parent 3088665 commit e149ba3
Show file tree
Hide file tree
Showing 11 changed files with 310 additions and 19 deletions.
17 changes: 17 additions & 0 deletions corvus.conf
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,30 @@ syslog 0
#
# * `master`, forward all reading commands to master, the default
# * `read-slave-only`, forward all reading commands to slaves
# * `read-preferred-only` forward all reading commands to a slave or master based on the preferred_node list
# * `both`, forward reading commands to both master and slaves
#
# If new slaves are added to the cluster, `PROXY UPDATESLOTMAP` should be emmited
# to tell corvus to use the newly added slaves.
#
# read-strategy master

#
# When the `read-preferred-only` strategy is configured, the nodes from the `preferred_nodes` list
# will be preferred to any other node for a given slot.
# This is useful if a cluster is distributed geographically. It can be preferable to query the local nodes
# If the cluster shows a preferred node as disconnected, it won't be used but the cluster status will be
# polled until all preferred nodes are available again.
#
# preferred_nodes localhost:8003,localhost:8004

#
# When `read-preferred` read-strategy is used and the cluster shows a preferred node to be disconnected,
# an active polling of the cluster configuration is started to determine as soon as possible when the
# preferred node is back online so that it can be used again.
#
polling_interval 30

# Slowlog
# The following two configs are almost the same with redis.
# Every command whose lantency exceeds `slowlog-log-slower-than` will be considered a slow command,
Expand Down
77 changes: 73 additions & 4 deletions src/config.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ const char * CONFIG_OPTIONS[] = {
"metric_interval",
"stats",
"read-strategy",
"preferred_nodes",
"polling_interval",
"requirepass",
"client_timeout",
"server_timeout",
Expand All @@ -48,6 +50,9 @@ void config_init()
config.bind = 12345;
config.node = cv_calloc(1, sizeof(struct node_conf));
config.node->refcount = 1;
config.preferred_node = cv_calloc(1, sizeof(struct node_conf));
config.preferred_node->refcount = 1;
config.polling_interval = 10;
config.thread = DEFAULT_THREAD;
config.loglevel = INFO;
config.syslog = 0;
Expand All @@ -56,7 +61,7 @@ void config_init()
config.server_timeout = 0;
config.bufsize = DEFAULT_BUFSIZE;
config.requirepass = NULL;
config.readslave = config.readmasterslave = false;
config.readslave = config.readmasterslave = config.readpreferred = false;
config.slowlog_max_len = 1024;
config.slowlog_log_slower_than = -1;
config.slowlog_statsd_enabled = 0;
Expand All @@ -68,6 +73,7 @@ void config_init()
void config_free()
{
config_node_dec_ref(config.node);
config_node_dec_ref(config.preferred_node);
pthread_mutex_destroy(&lock_conf_node);
pthread_mutex_destroy(&lock_config_rewrite);
}
Expand Down Expand Up @@ -106,6 +112,25 @@ void config_set_node(struct node_conf *node)
config_node_dec_ref(oldnode);
}

struct node_conf *config_get_preferred_node()
{
pthread_mutex_lock(&lock_conf_node);
struct node_conf *node = config.preferred_node;
int refcount = ATOMIC_INC(node->refcount, 1);
pthread_mutex_unlock(&lock_conf_node);
assert(refcount >= 1);
return node;
}

void config_set_preferred_node(struct node_conf *node)
{
pthread_mutex_lock(&lock_conf_node);
struct node_conf *oldnode = config.preferred_node;
config.preferred_node = node;
pthread_mutex_unlock(&lock_conf_node);
config_node_dec_ref(oldnode);
}

void config_node_dec_ref(struct node_conf *node)
{
int refcount = ATOMIC_DEC(node->refcount, 1);
Expand Down Expand Up @@ -134,6 +159,24 @@ void config_node_to_str(char *str, size_t max_len)
config_node_dec_ref(nodes);
}

void config_preferred_node_to_str(char *str, size_t max_len)
{
struct node_conf *nodes = config_get_preferred_node();
char buf[ADDRESS_LEN + 1];
for (size_t i = 0; i != nodes->len; i++) {
struct address *addr = &nodes->addr[i];
size_t len = snprintf(buf, max_len, "%s:%u",
addr->ip, addr->port);
size_t comma_len = i > 0 ? 1 : 0;
if (len + comma_len > max_len) break;
if (comma_len) *str++ = ',';
strcpy(str, buf);
str += len;
max_len -= len + comma_len;
}
config_node_dec_ref(nodes);
}

int parse_int(char *s, int *result)
{
const int max = 100000000;
Expand Down Expand Up @@ -188,11 +231,15 @@ int config_add(char *name, char *value)
config.readslave = true;
}
} else if (strcmp(name, "read-strategy") == 0) {
config.readpreferred = false;
if (strcmp(value, "read-slave-only") == 0) {
config.readmasterslave = false;
config.readslave = true;
} else if (strcmp(value, "both") == 0) {
config.readmasterslave = config.readslave = true;
} else if (strcmp(value, "read-preferred") == 0) {
config.readmasterslave = config.readslave = false;
config.readpreferred = true;
} else {
config.readmasterslave = config.readslave = false;
}
Expand Down Expand Up @@ -224,6 +271,9 @@ int config_add(char *name, char *value)
} else if (strcmp(name, "metric_interval") == 0) {
TRY_PARSE_INT();
config.metric_interval = val > 0 ? val : 10;
} else if (strcmp(name, "polling_interval") == 0) {
TRY_PARSE_INT();
config.polling_interval = val > 0 ? val : 10;
} else if (strcmp(name, "loglevel") == 0) {
if (strcasecmp(value, "debug") == 0) {
ATOMIC_SET(config.loglevel, DEBUG);
Expand All @@ -247,7 +297,7 @@ int config_add(char *name, char *value)
config.requirepass = cv_calloc(strlen(value) + 1, sizeof(char));
memcpy(config.requirepass, value, strlen(value));
}
} else if (strcmp(name, "node") == 0) {
} else if (strcmp(name, "node") == 0 || strcmp(name, "preferred_nodes") == 0) {
// strtok will modify `value` to tokenize it.
// Copy it first in case value is a string literal
char buf[strlen(value) + 1];
Expand All @@ -266,14 +316,18 @@ int config_add(char *name, char *value)
p = strtok(NULL, ",");
}
if (addr_cnt == 0) {
LOG(WARN, "received empty node value in config set");
LOG(WARN, "received empty %s value in config set", name);
return CORVUS_ERR;
}
struct node_conf *newnode = cv_malloc(sizeof(struct node_conf));
newnode->addr = addr;
newnode->len = addr_cnt;
newnode->refcount = 1;
config_set_node(newnode);
if (strcmp(name, "node") == 0) {
config_set_node(newnode);
} else {
config_set_preferred_node(newnode);
}
} else if (strcmp(name, "slowlog-log-slower-than") == 0) {
TRY_PARSE_INT();
ATOMIC_SET(config.slowlog_log_slower_than, val);
Expand All @@ -297,6 +351,8 @@ int config_get(const char *name, char *value, size_t max_len)
snprintf(value, max_len, "%u", config.bind);
} else if (strcmp(name, "node") == 0) {
config_node_to_str(value, max_len);
} else if (strcmp(name, "preferred_nodes") == 0) {
config_preferred_node_to_str(value, max_len);
} else if (strcmp(name, "thread") == 0) {
snprintf(value, max_len, "%d", config.thread);
} else if (strcmp(name, "loglevel") == 0) {
Expand Down Expand Up @@ -335,6 +391,8 @@ int config_get(const char *name, char *value, size_t max_len)
snprintf(value, max_len, "%d", config.slowlog_max_len);
} else if (strcmp(name, "slowlog-statsd-enabled") == 0) {
strncpy(value, BOOL_STR(config.slowlog_statsd_enabled), max_len);
} else if (strcmp(name, "polling_interval") == 0) {
snprintf(value, max_len, "%d", config.polling_interval);
} else {
return CORVUS_ERR;
}
Expand Down Expand Up @@ -612,3 +670,14 @@ bool config_option_changable(const char *option)
}
return false;
}

bool config_is_preferred_node(struct address *node)
{
struct node_conf *preferred_node = config_get_preferred_node();
for (size_t i = 0; i < preferred_node->len; i++) {
if (socket_cmp(node, &preferred_node->addr[i]) == 0) {
return true;
}
}
return false;
}
5 changes: 5 additions & 0 deletions src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ struct corvus_config {
char cluster[CLUSTER_NAME_SIZE + 1];
uint16_t bind;
struct node_conf *node;
struct node_conf *preferred_node; /* List of nodes that should be set a higher priority */
int thread;
int loglevel;
bool syslog;
Expand All @@ -25,6 +26,8 @@ struct corvus_config {
bool stats;
bool readslave;
bool readmasterslave;
bool readpreferred;
uint16_t polling_interval; /* Intervall to use when polling for cluster configuration */
char *requirepass;
int64_t client_timeout;
int64_t server_timeout;
Expand All @@ -46,5 +49,7 @@ void config_set_node(struct node_conf *node);
void config_node_dec_ref(struct node_conf *node);
int config_add(char *name, char *value);
bool config_option_changable(const char *option);
struct node_conf *config_get_preferred_node();
bool config_is_preferred_node(struct address *node);

#endif /* end of include guard: CONFIG_H */
11 changes: 9 additions & 2 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -325,14 +325,21 @@ struct connection *conn_get_server(struct context *ctx, uint16_t slot,
bool readonly = false;

if (slot_get_node_addr(slot, &info)) {
addr = &info.nodes[0];
addr = &info.nodes[0].addr;
if (access != CMD_ACCESS_WRITE && config.readslave && info.index > 1) {
int r = rand_r(&ctx->seed);
if (!config.readmasterslave || r % info.index != 0) {
int i = r % (info.index - 1);
addr = &info.nodes[++i];
addr = &info.nodes[++i].addr;
readonly = true;
}
} else if (access != CMD_ACCESS_WRITE && config.readpreferred) {
/*
* If readpreferred is set and we're not running a 'write' command
* use the first node available that's in the preferred list.
*/
addr = &info.preferred_nodes[0].addr;
readonly = true;
}
if (addr->port > 0) {
return conn_get_server_from_pool(ctx, addr, readonly);
Expand Down
Loading

0 comments on commit e149ba3

Please sign in to comment.