diff --git a/discovery/init.lua b/discovery/init.lua index 871338d..e7bd4af 100644 --- a/discovery/init.lua +++ b/discovery/init.lua @@ -2,7 +2,7 @@ ---@type DiscoveryPool local M = { - _VERSION = '0.10.0', + _VERSION = '0.10.1', } local log = require 'log' local fiber = require 'fiber' @@ -352,44 +352,88 @@ function M:rebuild() end local function tail_call(self, ctx, pcall_ok, ...) + local now = fiber.time() + ctx.total_time = now-ctx.started_at + ctx.execution_time = now-ctx.executed_at + if pcall_ok then log.verbose("[Proxy=ok] to %s in %.5fs (total: %.5fs)", - ctx.addr, fiber.time()-ctx.executed_at, fiber.time()-ctx.started_at) + ctx.addr, now-ctx.executed_at, now-ctx.started_at) return ... end log.error("call %s to {%s} (attempt=%s,retriable=%s,duration=%.3fs,total=%.3fs,left=%.3fs) failed with: %s", ctx.method, ctx.addr, ctx.attempt, ctx.retriable, - fiber.time()-ctx.executed_at, - fiber.time()-ctx.started_at, - ctx.deadline - fiber.time(), + ctx.execution_time, + ctx.total_time, + ctx.deadline - now, ... ) + local err = ... + ctx.last_error = box.error.new{ + type = 'DiscoveryError', + code = M.errors.execution_failed, + reason = tostring(err), + } + if ctx.retriable then - ctx.last_error = ... + -- local json = require 'json' + -- log.warn("retrying call(%s, %s, %s, %s)", + -- ctx.method, json.encode(ctx.args), json.encode(ctx.opts), json.encode(ctx)) return self:call(ctx.method, ctx.args, ctx.opts, ctx) end - error(...) + ctx.last_error:raise() end +M.errors = { + no_route_for_call = 0x100, + execution_timed_out = 0x101, + execution_failed = 0x102, +} + +---Calls given method on connection pool +---@param method string +---@param args any[]? +---@param opts table? +---@param ctx table? +---@return ... function M:call(method, args, opts, ctx) args = args or {} opts = opts or {} - opts.timeout = opts.timeout or self.upstream.net_box_timeout - - ctx = ctx or { - started_at = fiber.time(), - attempt = 0, - method = method, - max_attempts = tonumber(opts.max_attempts), - args = args, - } + opts.timeout = tonumber(opts.timeout) or self.upstream.net_box_timeout + + if type(ctx) ~= 'table' then + ctx = {} + end + + ctx.started_at = ctx.started_at or fiber.time() + ctx.attempt = ctx.attempt or 0 + ctx.method = ctx.method or method + ctx.max_attempts = ctx.max_attempts or tonumber(opts.max_attempts) + ctx.args = ctx.args or args + + local deadline = ctx.deadline + if not deadline then + deadline = tonumber(opts.deadline) + end + if not deadline then + if ctx.max_attempts then + deadline = fiber.time() + opts.timeout * ctx.max_attempts + else + deadline = fiber.time() + opts.timeout + end + end - local deadline = ctx.deadline or opts.deadline or (fiber.time() + opts.timeout) if deadline < fiber.time() then - error(("Timeout for discovery of %s exceeded"):format(method)) + ctx.total_time = fiber.time() - ctx.started_at + ctx.execution_time = ctx.execution_time or -1 + box.error{ + reason = ("Timeout for discovery of %s exceeded"):format(method), + type = 'DiscoveryError', + code = M.errors.execution_timed_out, + } end ctx.deadline = deadline @@ -400,6 +444,15 @@ function M:call(method, args, opts, ctx) log.verbose("No nodes available for %s. Waiting %.3fs", method, deadline - fiber.time()) self.conds[method] = self.conds[method] or fiber.cond() self.conds[method]:wait(deadline - fiber.time()) + if deadline < fiber.time() then + ctx.total_time = fiber.time() - ctx.started_at + ctx.execution_time = ctx.execution_time or -1 + box.error{ + reason = ("No route for call %s exceeded"):format(method), + type = 'DiscoveryError', + code = M.errors.no_route_for_call, + } + end return self:call(method, args, opts, ctx) end