Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement hash load balancing method #13

Merged
merged 1 commit into from
Jul 4, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,22 @@ Takes an optional id parameter, this *must* be unique if multiple instances of u
Initialises the background thread, should be called in `init_worker_by_lua`

### connect
`syntax: ok, err = upstream:connect(client?)`
`syntax: ok, err = upstream:connect(client?, key?)`

Attempts to connect to a host in the defined pools in priority order using the selected load balancing method.
Returns a connected socket and a table containing the connected `host`, `poolid` and `pool`, or nil and an error message.

When passed a [socket](https://github.com/openresty/lua-nginx-module#ngxsockettcp) or resty module it will return the same object after successful connection or nil.

Additionally, the hash method accepts an optional `key`, which is hashed to determine the host for the connection. If no `key` is given, `ngx.var.remote_addr` is used. The `key` is ignored when the pool's method is round robin.

```lua
resty_redis = require('resty.redis')
local redis = resty_redis.new()

local redis, err = upstream:connect(redis)
local key = ngx.req.get_headers()["X-Forwarded-For"]

local redis, err = upstream:connect(redis, key)

if not redis then
ngx.log(ngx.ERR, err)
Expand Down Expand Up @@ -250,7 +254,7 @@ Returns a new api object using the provided upstream object.
`syntax: ok, err = api:set_method(poolid, method)`

Sets the load balancing method for the specified pool.
Currently only randomised round robin is supported.
Currently the randomised round robin (`round_robin`) and hash (`hash`) methods are supported.

### create_pool
`syntax: ok, err = api:create_pool(pool)`
Expand All @@ -259,7 +263,7 @@ Creates a new pool from a table of options, `pool` must contain at least 1 key `

Other valid options are

* `method` Balancing method, currently only `round_robin` is supported
* `method` Balancing method, either `round_robin` or `hash`
* `timeout` Connection timeout in ms
* `priority` Higher priority pools are used later
* `read_timeout`
Expand Down
110 changes: 107 additions & 3 deletions lib/resty/upstream/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,111 @@ function _M.connect_failed(self, host, poolid, failed_hosts)
end


--- Pick the host that a pre-computed hash value maps to.
-- Hosts are laid out end to end on a line of length `weight_sum`, each one
-- occupying `weight` units; the host whose segment contains `hash` is chosen.
-- @param vars table with the fields `available_hosts` (array of host tables,
--   each carrying a numeric `weight`), `weight_sum` and `hash`
-- @return the selected host table, or nil when `available_hosts` is empty
local function get_hash_host(vars)
    local hosts = vars.available_hosts
    local hostcount = #hosts

    if hostcount == 0 then
        return nil
    end

    local h = vars.hash
    local weight_sum = vars.weight_sum

    -- With no usable total weight (or a NaN hash, which is what `x % 0`
    -- yields on LuaJIT) there is nothing to distribute over: fall back to the
    -- first host rather than spinning forever on zero-weight entries.
    if not weight_sum or weight_sum <= 0 or h ~= h then
        return hosts[1]
    end

    -- No-op for hashes already reduced modulo weight_sum; keeps the walk
    -- below bounded to a single pass for any other input.
    h = h % weight_sum

    for i = 1, hostcount do
        local weight = hosts[i].weight
        if h < weight then
            return hosts[i]
        end
        h = h - weight
    end

    -- Only reachable if weight_sum disagrees with the individual weights.
    return hosts[hostcount]
end

--- Build the state needed to hash a key onto the currently usable hosts.
-- A host is usable when its `up` field is truthy and its id is not present
-- in `failed_hosts`.
-- @param hosts array of host tables (fields read: `id`, `up`, `weight`)
-- @param failed_hosts table keyed by host id; truthy values exclude a host
-- @param key value passed to `ngx.crc32_short` to produce the hash
-- @return table with `available_hosts` (array), `weight_sum` (total weight of
--   the available hosts) and `hash` (crc32 of `key` modulo `weight_sum`, or 0
--   when no host is available)
local function get_hash_vars(hosts, failed_hosts, key)
    local available_hosts = {} -- new tab needed here
    local n = 0
    local weight_sum = 0

    for i = 1, #hosts do
        local host = hosts[i]

        if host.up and not failed_hosts[host.id] then
            n = n + 1
            available_hosts[n] = host
            weight_sum = weight_sum + host.weight
        end
    end

    -- With nothing to choose from, skip hashing entirely: this avoids the
    -- modulo-by-zero (NaN on LuaJIT) and a pointless crc32 computation.
    local hash = 0
    if n > 0 then
        hash = ngx.crc32_short(key) % weight_sum
    end

    return {
        available_hosts = available_hosts,
        weight_sum = weight_sum,
        hash = hash,
    }
end

--- Hash load balancing method.
-- Maps `key` (or `ngx.var.remote_addr` when no key is given) onto one of the
-- pool's usable hosts and connects `sock` to it. When a connection fails the
-- host is reported through `self:connect_failed` and the key is re-hashed
-- over the remaining hosts until one connects or none are left.
-- @param self upstream object (uses `get_failed_hosts` and `connect_failed`)
-- @param pool pool table (fields read: `hosts`, `id`)
-- @param sock object exposing `connect(host, port)`
-- @param key optional hash key
-- @return connected (result of `sock:connect`, or nil), sock, the host table
--   (an empty table when nothing connected), and the last connect error
_M.available_methods.hash = function(self, pool, sock, key)
    local hash_key = key or ngx.var.remote_addr
    local poolid = pool.id
    local hosts = pool.hosts

    local failed_hosts = self:get_failed_hosts(poolid)

    -- A single-host pool has nothing to balance, so skip the hashing.
    if #hosts == 1 then
        local only = hosts[1]
        if only.up == false or failed_hosts[only.id] then
            return nil, sock, {}, nil
        end

        local ok, connect_err = sock:connect(only.host, only.port)
        if not ok then
            self:connect_failed(only, poolid, failed_hosts)
        end
        return ok, sock, only, connect_err
    end

    local last_err
    while true do
        -- Hash over whatever hosts are still usable on this attempt
        local candidate = get_hash_host(get_hash_vars(hosts, failed_hosts, hash_key))
        if not candidate then
            -- Ran out of hosts, give up on this pool (go to next pool)
            break
        end

        local ok, connect_err = sock:connect(candidate.host, candidate.port)
        if ok then
            return ok, sock, candidate, connect_err
        end

        -- Mark the host bad; the next iteration rehashes without it
        last_err = connect_err
        self:connect_failed(candidate, poolid, failed_hosts)
    end

    -- All hosts have failed
    return nil, sock, {}, last_err
end

local function select_weighted_rr_host(hosts, failed_hosts, round_robin_vars)
local idx = round_robin_vars.idx
local cw = round_robin_vars.cw
Expand Down Expand Up @@ -569,7 +674,6 @@ local function get_round_robin_vars(self, pool)
return round_robin_vars
end


_M.available_methods.round_robin = function(self, pool, sock)
local hosts = pool.hosts
local poolid = pool.id
Expand Down Expand Up @@ -616,7 +720,7 @@ _M.available_methods.round_robin = function(self, pool, sock)
end


function _M.connect(self, sock)
function _M.connect(self, sock, key)
-- Get pool data
local priority_index, err = self:get_priority_index()
if not priority_index then
Expand Down Expand Up @@ -652,7 +756,7 @@ function _M.connect(self, sock)
set_timeout(sock, pool.timeout)

-- Load balance between available hosts using specified method
connected, sock, host, err = available_methods[pool.method](self, pool, sock)
connected, sock, host, err = available_methods[pool.method](self, pool, sock, key)

if connected then
-- Return connected socket!
Expand Down
Loading