Skip to content

Commit

Permalink
generic runners
Browse files Browse the repository at this point in the history
  • Loading branch information
kbrock committed Jun 13, 2024
1 parent e967a86 commit 7e419b0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 9 deletions.
7 changes: 7 additions & 0 deletions lib/floe/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ def for_resource(resource)
scheme = resource.split("://").first
resolve_scheme(scheme) || raise(ArgumentError, "Invalid resource scheme [#{scheme}]")
end

def runners
@runners.each_value.map do |runner|
runner = runner.call if runner.kind_of?(Proc)
runner
end
end
end

# Run a command asynchronously and create a runner_context
Expand Down
23 changes: 14 additions & 9 deletions lib/floe/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,21 @@ def wait(workflows, timeout: nil, &block)
workflows = [workflows] if workflows.kind_of?(self)
logger.info("checking #{workflows.count} workflows...")

run_until = Time.now.utc + timeout if timeout.to_i > 0
ready = []
queue = Queue.new
wait_thread = Thread.new do
loop do
Runner.for_resource("docker").wait do |event, runner_context|
queue.push([event, runner_context])
run_until = Time.now.utc + timeout if timeout.to_i > 0
ready = []
queue = Queue.new
wait_threads =
Runner.runners.map do |runner|
next unless runner.respond_to?(:wait)

Thread.new do
loop do
runner.wait do |event, runner_context|
queue.push([event, runner_context])
end
end
end
end
end

loop do
ready = workflows.select(&:step_nonblock_ready?)
Expand Down Expand Up @@ -81,7 +86,7 @@ def wait(workflows, timeout: nil, &block)
logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready")
ready
ensure
wait_thread&.kill
wait_threads.compact.map(&:kill)
end
end

Expand Down

0 comments on commit 7e419b0

Please sign in to comment.