diff --git a/lib/floe/runner.rb b/lib/floe/runner.rb index ffd72fbc6..4623dc86f 100644 --- a/lib/floe/runner.rb +++ b/lib/floe/runner.rb @@ -27,6 +27,13 @@ def for_resource(resource) scheme = resource.split("://").first resolve_scheme(scheme) || raise(ArgumentError, "Invalid resource scheme [#{scheme}]") end + + def runners + @runners.each_value.map do |runner| + runner = runner.call if runner.kind_of?(Proc) + runner + end + end end # Run a command asynchronously and create a runner_context @@ -75,8 +82,9 @@ def cleanup(_runner_context) raise NotImplementedError, "Must be implemented in a subclass" end - def wait(timeout: nil, events: %i[create update delete]) - raise NotImplementedError, "Must be implemented in a subclass" - end + # running in another thread, this watches for async events + # def wait(timeout: nil, events: %i[create update delete]) + # raise NotImplementedError, "Must be implemented in a subclass" + # end end end diff --git a/lib/floe/workflow.rb b/lib/floe/workflow.rb index 92ce77a2b..719d2359e 100644 --- a/lib/floe/workflow.rb +++ b/lib/floe/workflow.rb @@ -20,16 +20,21 @@ def wait(workflows, timeout: nil, &block) workflows = [workflows] if workflows.kind_of?(self) logger.info("checking #{workflows.count} workflows...") - run_until = Time.now.utc + timeout if timeout.to_i > 0 - ready = [] - queue = Queue.new - wait_thread = Thread.new do - loop do - Runner.for_resource("docker").wait do |event, runner_context| - queue.push([event, runner_context]) + run_until = Time.now.utc + timeout if timeout.to_i > 0 + ready = [] + queue = Queue.new + wait_threads = + Runner.runners.map do |runner| + next unless runner.respond_to?(:wait) + + Thread.new do + loop do + runner.wait do |event, runner_context| + queue.push([event, runner_context]) + end + end end end - end loop do ready = workflows.select(&:step_nonblock_ready?) @@ -81,7 +86,7 @@ def wait(workflows, timeout: nil, &block) logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready") ready ensure - wait_thread&.kill + wait_threads.compact.map(&:kill) end end diff --git a/lib/floe/workflow/context.rb b/lib/floe/workflow/context.rb index 92af20228..8978dcb2d 100644 --- a/lib/floe/workflow/context.rb +++ b/lib/floe/workflow/context.rb @@ -24,6 +24,10 @@ def execution @context["Execution"] end + def object + [execution["_object_type"], execution["_object_id"]&.id] + end + def started? execution.key?("StartTime") end