From 0478328c1e7e4d73adaa5b482b68bc8b5afd8d63 Mon Sep 17 00:00:00 2001 From: Uriel Katz Date: Mon, 23 Nov 2015 15:18:40 +0200 Subject: [PATCH] Added support for listening to same port from multiple twemproxy processes --- README.md | 1 + src/nc_conf.c | 11 ++++++++++ src/nc_conf.h | 2 ++ src/nc_connection.c | 3 +++ src/nc_connection.h | 1 + src/nc_proxy.c | 53 ++++++++++++++++++++++++++++++++++----------- src/nc_server.h | 1 + src/nc_util.c | 21 ++++++++++++++++++ src/nc_util.h | 1 + 9 files changed, 81 insertions(+), 13 deletions(-) diff --git a/README.md b/README.md index 08298b37..520b9509 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,7 @@ Twemproxy can be configured through a YAML file specified by the -c or --conf-fi + **auto_eject_hosts**: A boolean value that controls if server should be ejected temporarily when it fails consecutively server_failure_limit times. See [liveness recommendations](notes/recommendation.md#liveness) for information. Defaults to false. + **server_retry_timeout**: The timeout value in msec to wait for before retrying on a temporarily ejected server, when auto_eject_host is set to true. Defaults to 30000 msec. + **server_failure_limit**: The number of consecutive failures on a server that would lead to it being temporarily ejected when auto_eject_host is set to true. Defaults to 2. ++ **reuse_port**: A boolean value that controls if a server pool will use the SO_REUSEPORT option for its socket. Defaults to false. + **servers**: A list of server address, port and weight (name:port:weight or ip:port:weight) for this server pool. diff --git a/src/nc_conf.c b/src/nc_conf.c index fa720796..8df4dc13 100644 --- a/src/nc_conf.c +++ b/src/nc_conf.c @@ -106,6 +106,10 @@ static struct command conf_commands[] = { conf_set_num, offsetof(struct conf_pool, server_failure_limit) }, + { string("reuse_port"), + conf_set_bool, + offsetof(struct conf_pool, reuse_port) }, + { string("servers"), conf_add_server, offsetof(struct conf_pool, server) }, @@ -205,6 +209,7 @@ conf_pool_init(struct conf_pool *cp, struct string *name) cp->server_connections = CONF_UNSET_NUM; cp->server_retry_timeout = CONF_UNSET_NUM; cp->server_failure_limit = CONF_UNSET_NUM; + cp->reuse_port = CONF_UNSET_NUM; array_null(&cp->server); @@ -302,6 +307,7 @@ conf_pool_each_transform(void *elem, void *data) sp->server_failure_limit = (uint32_t)cp->server_failure_limit; sp->auto_eject_hosts = cp->auto_eject_hosts ? 1 : 0; sp->preconnect = cp->preconnect ? 1 : 0; + sp->reuse_port = cp->reuse_port ? 1 : 0; status = server_init(&sp->server, &cp->server, sp); if (status != NC_OK) { @@ -352,6 +358,7 @@ conf_dump(struct conf *cf) cp->server_retry_timeout); log_debug(LOG_VVERB, " server_failure_limit: %d", cp->server_failure_limit); + log_debug(LOG_VVERB, " reuse_port: %d", cp->reuse_port); nserver = array_n(&cp->server); log_debug(LOG_VVERB, " servers: %"PRIu32"", nserver); @@ -1269,6 +1276,10 @@ conf_validate_pool(struct conf *cf, struct conf_pool *cp) cp->server_failure_limit = CONF_DEFAULT_SERVER_FAILURE_LIMIT; } + if (cp->reuse_port == CONF_UNSET_NUM) { + cp->reuse_port = CONF_DEFAULT_REUSE_PORT; + } + if (!cp->redis && cp->redis_auth.len > 0) { log_error("conf: directive \"redis_auth:\" is only valid for a redis pool"); return NC_ERROR; diff --git a/src/nc_conf.h b/src/nc_conf.h index 296afc46..ce562736 100644 --- a/src/nc_conf.h +++ b/src/nc_conf.h @@ -54,6 +54,7 @@ #define CONF_DEFAULT_SERVER_FAILURE_LIMIT 2 #define CONF_DEFAULT_SERVER_CONNECTIONS 1 #define CONF_DEFAULT_KETAMA_PORT 11211 +#define CONF_DEFAULT_REUSE_PORT false #define CONF_DEFAULT_TCPKEEPALIVE false struct conf_listen { @@ -93,6 +94,7 @@ struct conf_pool { int server_connections; /* server_connections: */ int server_retry_timeout; /* server_retry_timeout: in msec */ int server_failure_limit; /* server_failure_limit: */ + int reuse_port; /* reuse_port: */ struct array server; /* servers: conf_server[] */ unsigned valid:1; /* valid? */ }; diff --git a/src/nc_connection.c b/src/nc_connection.c index a0d1d9da..e41de7d0 100644 --- a/src/nc_connection.c +++ b/src/nc_connection.c @@ -156,8 +156,10 @@ _conn_get(void) conn->eof = 0; conn->done = 0; conn->redis = 0; + conn->reuse_port = 0; conn->authenticated = 0; + ntotal_conn++; ncurr_conn++; @@ -256,6 +258,7 @@ conn_get_proxy(void *owner) } conn->redis = pool->redis; + conn->reuse_port = pool->reuse_port; conn->proxy = 1; diff --git a/src/nc_connection.h b/src/nc_connection.h index 7ece25ef..06a0353a 100644 --- a/src/nc_connection.h +++ b/src/nc_connection.h @@ -89,6 +89,7 @@ struct conn { unsigned done:1; /* done? aka close? */ unsigned redis:1; /* redis? */ unsigned authenticated:1; /* authenticated? */ + unsigned reuse_port:1; /* reuse_port? */ }; TAILQ_HEAD(conn_tqh, conn); diff --git a/src/nc_proxy.c b/src/nc_proxy.c index 38cdd9e6..34fc6e4f 100644 --- a/src/nc_proxy.c +++ b/src/nc_proxy.c @@ -90,7 +90,7 @@ proxy_close(struct context *ctx, struct conn *conn) } static rstatus_t -proxy_reuse(struct conn *p) +proxy_reuse(struct conn *p, bool reuse_addr) { rstatus_t status; struct sockaddr_un *un; @@ -98,18 +98,30 @@ proxy_reuse(struct conn *p) switch (p->family) { case AF_INET: case AF_INET6: - status = nc_set_reuseaddr(p->sd); + if (reuse_addr) { + status = nc_set_reuseaddr(p->sd); + } + else { + status = nc_set_reuseport(p->sd); + } break; case AF_UNIX: - /* - * bind() will fail if the pathname already exist. So, we call unlink() - * to delete the pathname, in case it already exists. If it does not - * exist, unlink() returns error, which we ignore - */ - un = (struct sockaddr_un *) p->addr; - unlink(un->sun_path); - status = NC_OK; + // Shouldn't accept reuseport for Unix domain sockets + if (reuse_addr) { + /* + * bind() will fail if the pathname already exist. So, we call unlink() + * to delete the pathname, in case it already exists. If it does not + * exist, unlink() returns error, which we ignore + */ + un = (struct sockaddr_un *) p->addr; + unlink(un->sun_path); + status = NC_OK; + } + else { + errno = ENOPROTOOPT; + status = NC_ERROR; + } break; default: @@ -134,14 +146,29 @@ proxy_listen(struct context *ctx, struct conn *p) return NC_ERROR; } - status = proxy_reuse(p); + status = proxy_reuse(p, true); if (status < 0) { - log_error("reuse of addr '%.*s' for listening on p %d failed: %s", + log_error("reuse_addr of addr '%.*s' for listening on p %d failed: %s", pool->addrstr.len, pool->addrstr.data, p->sd, strerror(errno)); return NC_ERROR; } + /* + (c) Copyright IBM Corp. 2015 + */ + if (p->reuse_port) { + status = proxy_reuse(p, false); + if (status < 0) { + log_error("reuse_port of addr '%.*s' for listening on p %d failed: %s\n\ +NOTICE that the reuse_port option is available only on BSD \ +distros or linux distros with linux kernel version 3.9 or above", + pool->addrstr.len, pool->addrstr.data, p->sd, + strerror(errno)); + return NC_ERROR; + } + } + status = bind(p->sd, p->addr, p->addrlen); if (status < 0) { log_error("bind on p %d to addr '%.*s' failed: %s", p->sd, @@ -294,7 +321,7 @@ proxy_accept(struct context *ctx, struct conn *p) return NC_OK; } - /* + /* * Workaround of https://github.com/twitter/twemproxy/issues/97 * * We should never reach here because the check for conn_ncurr_cconn() diff --git a/src/nc_server.h b/src/nc_server.h index 11cdaa77..eef63802 100644 --- a/src/nc_server.h +++ b/src/nc_server.h @@ -120,6 +120,7 @@ struct server_pool { unsigned auto_eject_hosts:1; /* auto_eject_hosts? */ unsigned preconnect:1; /* preconnect? */ unsigned redis:1; /* redis? */ + unsigned reuse_port:1; /* reuse_port? */ unsigned tcpkeepalive:1; /* tcpkeepalive? */ }; diff --git a/src/nc_util.c b/src/nc_util.c index e2768089..dbd5c898 100644 --- a/src/nc_util.c +++ b/src/nc_util.c @@ -75,6 +75,27 @@ nc_set_reuseaddr(int sd) return setsockopt(sd, SOL_SOCKET, SO_REUSEADDR, &reuse, len); } + +/* +(c) Copyright IBM Corp. 2015 +*/ +int +nc_set_reuseport(int sd) +{ +#ifdef SO_REUSEPORT + int reuse; + socklen_t len; + + reuse = 1; + len = sizeof(reuse); + + return setsockopt(sd, SOL_SOCKET, SO_REUSEPORT, &reuse, len); +#else + errno = ENOPROTOOPT; + return -1; +#endif + +} /* * Disable Nagle algorithm on TCP socket. * diff --git a/src/nc_util.h b/src/nc_util.h index 986b7aef..8d433b35 100644 --- a/src/nc_util.h +++ b/src/nc_util.h @@ -81,6 +81,7 @@ int nc_set_blocking(int sd); int nc_set_nonblocking(int sd); int nc_set_reuseaddr(int sd); +int nc_set_reuseport(int sd); int nc_set_tcpnodelay(int sd); int nc_set_linger(int sd, int timeout); int nc_set_sndbuf(int sd, int size);