From 7e419b09f31293e9a46949dab9f4ab26595f6ec3 Mon Sep 17 00:00:00 2001 From: Keenan Brock Date: Tue, 14 May 2024 12:43:23 -0400 Subject: [PATCH] generic runners --- lib/floe/runner.rb | 7 +++++++ lib/floe/workflow.rb | 23 ++++++++++++++--------- 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/lib/floe/runner.rb b/lib/floe/runner.rb index 4fb89b88..29bf0884 100644 --- a/lib/floe/runner.rb +++ b/lib/floe/runner.rb @@ -27,6 +27,13 @@ def for_resource(resource) scheme = resource.split("://").first resolve_scheme(scheme) || raise(ArgumentError, "Invalid resource scheme [#{scheme}]") end + + def runners + @runners.each_value.map do |runner| + runner = runner.call if runner.kind_of?(Proc) + runner + end + end end # Run a command asynchronously and create a runner_context diff --git a/lib/floe/workflow.rb b/lib/floe/workflow.rb index cb0993ef..012d1631 100644 --- a/lib/floe/workflow.rb +++ b/lib/floe/workflow.rb @@ -20,16 +20,21 @@ def wait(workflows, timeout: nil, &block) workflows = [workflows] if workflows.kind_of?(self) logger.info("checking #{workflows.count} workflows...") - run_until = Time.now.utc + timeout if timeout.to_i > 0 - ready = [] - queue = Queue.new - wait_thread = Thread.new do - loop do - Runner.for_resource("docker").wait do |event, runner_context| - queue.push([event, runner_context]) + run_until = Time.now.utc + timeout if timeout.to_i > 0 + ready = [] + queue = Queue.new + wait_threads = + Runner.runners.map do |runner| + next unless runner.respond_to?(:wait) + + Thread.new do + loop do + runner.wait do |event, runner_context| + queue.push([event, runner_context]) + end + end end end - end loop do ready = workflows.select(&:step_nonblock_ready?) @@ -81,7 +86,7 @@ def wait(workflows, timeout: nil, &block) logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready") ready ensure - wait_thread&.kill + wait_threads.compact.map(&:kill) end end