Skip to content

Commit

Permalink
Merge pull request #2 from moonlibs/discovery-enhanced
Browse files Browse the repository at this point in the history
feat: fix retriabe with timeout+deadline and implements box.error
  • Loading branch information
ochaton authored Aug 11, 2023
2 parents 4fd3463 + 4f18ed5 commit 2360ac1
Showing 1 changed file with 71 additions and 18 deletions.
89 changes: 71 additions & 18 deletions discovery/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

---@type DiscoveryPool
local M = {
_VERSION = '0.10.0',
_VERSION = '0.10.1',
}
local log = require 'log'
local fiber = require 'fiber'
Expand Down Expand Up @@ -352,44 +352,88 @@ function M:rebuild()
end

local function tail_call(self, ctx, pcall_ok, ...)
local now = fiber.time()
ctx.total_time = now-ctx.started_at
ctx.execution_time = now-ctx.executed_at

if pcall_ok then
log.verbose("[Proxy=ok] to %s in %.5fs (total: %.5fs)",
ctx.addr, fiber.time()-ctx.executed_at, fiber.time()-ctx.started_at)
ctx.addr, now-ctx.executed_at, now-ctx.started_at)
return ...
end

log.error("call %s to {%s} (attempt=%s,retriable=%s,duration=%.3fs,total=%.3fs,left=%.3fs) failed with: %s",
ctx.method, ctx.addr, ctx.attempt, ctx.retriable,
fiber.time()-ctx.executed_at,
fiber.time()-ctx.started_at,
ctx.deadline - fiber.time(),
ctx.execution_time,
ctx.total_time,
ctx.deadline - now,
...
)

local err = ...
ctx.last_error = box.error.new{
type = 'DiscoveryError',
code = M.errors.execution_failed,
reason = tostring(err),
}

if ctx.retriable then
ctx.last_error = ...
-- local json = require 'json'
-- log.warn("retrying call(%s, %s, %s, %s)",
-- ctx.method, json.encode(ctx.args), json.encode(ctx.opts), json.encode(ctx))
return self:call(ctx.method, ctx.args, ctx.opts, ctx)
end

error(...)
ctx.last_error:raise()
end

M.errors = {
no_route_for_call = 0x100,
execution_timed_out = 0x101,
execution_failed = 0x102,
}

---Calls given method on connection pool
---@param method string
---@param args any[]?
---@param opts table?
---@param ctx table?
---@return ...
function M:call(method, args, opts, ctx)
args = args or {}
opts = opts or {}
opts.timeout = opts.timeout or self.upstream.net_box_timeout

ctx = ctx or {
started_at = fiber.time(),
attempt = 0,
method = method,
max_attempts = tonumber(opts.max_attempts),
args = args,
}
opts.timeout = tonumber(opts.timeout) or self.upstream.net_box_timeout

if type(ctx) ~= 'table' then
ctx = {}
end

ctx.started_at = ctx.started_at or fiber.time()
ctx.attempt = ctx.attempt or 0
ctx.method = ctx.method or method
ctx.max_attempts = ctx.max_attempts or tonumber(opts.max_attempts)
ctx.args = ctx.args or args

local deadline = ctx.deadline
if not deadline then
deadline = tonumber(opts.deadline)
end
if not deadline then
if ctx.max_attempts then
deadline = fiber.time() + opts.timeout * ctx.max_attempts
else
deadline = fiber.time() + opts.timeout
end
end

local deadline = ctx.deadline or opts.deadline or (fiber.time() + opts.timeout)
if deadline < fiber.time() then
error(("Timeout for discovery of %s exceeded"):format(method))
ctx.total_time = fiber.time() - ctx.started_at
ctx.execution_time = ctx.execution_time or -1
box.error{
reason = ("Timeout for discovery of %s exceeded"):format(method),
type = 'DiscoveryError',
code = M.errors.execution_timed_out,
}
end

ctx.deadline = deadline
Expand All @@ -400,6 +444,15 @@ function M:call(method, args, opts, ctx)
log.verbose("No nodes available for %s. Waiting %.3fs", method, deadline - fiber.time())
self.conds[method] = self.conds[method] or fiber.cond()
self.conds[method]:wait(deadline - fiber.time())
if deadline < fiber.time() then
ctx.total_time = fiber.time() - ctx.started_at
ctx.execution_time = ctx.execution_time or -1
box.error{
reason = ("No route for call %s exceeded"):format(method),
type = 'DiscoveryError',
code = M.errors.no_route_for_call,
}
end
return self:call(method, args, opts, ctx)
end

Expand Down

0 comments on commit 2360ac1

Please sign in to comment.