From e25d67ec52c63c2d1e3d2b75f475a89d29c8ceec Mon Sep 17 00:00:00 2001 From: Keenan Brock Date: Tue, 14 May 2024 12:43:23 -0400 Subject: [PATCH 1/3] generic runners --- lib/floe/runner.rb | 4 ++++ lib/floe/workflow.rb | 23 ++++++++++++++--------- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/lib/floe/runner.rb b/lib/floe/runner.rb index 4fb89b88..c769b43e 100644 --- a/lib/floe/runner.rb +++ b/lib/floe/runner.rb @@ -27,6 +27,10 @@ def for_resource(resource) scheme = resource.split("://").first resolve_scheme(scheme) || raise(ArgumentError, "Invalid resource scheme [#{scheme}]") end + + def runners + @runners.each_value.map { |runner| runner.kind_of?(Proc) ? runner.call : runner } + end end # Run a command asynchronously and create a runner_context diff --git a/lib/floe/workflow.rb b/lib/floe/workflow.rb index 47543509..2899b8c3 100644 --- a/lib/floe/workflow.rb +++ b/lib/floe/workflow.rb @@ -20,16 +20,21 @@ def wait(workflows, timeout: nil, &block) workflows = [workflows] if workflows.kind_of?(self) logger.info("checking #{workflows.count} workflows...") - run_until = Time.now.utc + timeout if timeout.to_i > 0 - ready = [] - queue = Queue.new - wait_thread = Thread.new do - loop do - Runner.for_resource("docker").wait do |event, runner_context| - queue.push([event, runner_context]) + run_until = Time.now.utc + timeout if timeout.to_i > 0 + ready = [] + queue = Queue.new + wait_threads = + Runner.runners.map do |runner| + next unless runner.respond_to?(:wait) + + Thread.new do + loop do + runner.wait do |event, runner_context| + queue.push([event, runner_context]) + end + end end end - end loop do ready = workflows.select(&:step_nonblock_ready?) @@ -81,7 +86,7 @@ def wait(workflows, timeout: nil, &block) logger.info("checking #{workflows.count} workflows...Complete - #{ready.count} ready") ready ensure - wait_thread&.kill + wait_threads.compact.map(&:kill) end end From d60853171b3ee75396538b43c0748a458200918e Mon Sep 17 00:00:00 2001 From: Keenan Brock Date: Tue, 30 Apr 2024 22:26:52 -0400 Subject: [PATCH 2/3] Add synchronous awesome spawn runner --- examples/awesome.asl | 27 +++++++++++++ lib/floe/awesome_runner.rb | 69 ++++++++++++++++++++++++++++++++ lib/floe/cli.rb | 1 + spec/awesome_runner_spec.rb | 78 +++++++++++++++++++++++++++++++++++++ 4 files changed, 175 insertions(+) create mode 100644 examples/awesome.asl create mode 100644 lib/floe/awesome_runner.rb create mode 100644 spec/awesome_runner_spec.rb diff --git a/examples/awesome.asl b/examples/awesome.asl new file mode 100644 index 00000000..24e497a3 --- /dev/null +++ b/examples/awesome.asl @@ -0,0 +1,27 @@ +{ + "Comment": "Directory Listing", + "StartAt": "a", + "States": { + "a":{ + "Type": "Pass", + "Next": "b" + }, + "b": { + "Type": "Wait", + "Seconds": 1, + "Next": "ls" + }, + "ls": { + "Type": "Task", + "Resource": "awesome://ls -l Gemfile", + "Comment": "awesome://ls -l $FILENAME", + "Next": "c", + "Parameters": { + "FILENAME" : "Gemfile" + } + }, + "c": { + "Type": "Succeed" + } + } +} diff --git a/lib/floe/awesome_runner.rb b/lib/floe/awesome_runner.rb new file mode 100644 index 00000000..2d165f21 --- /dev/null +++ b/lib/floe/awesome_runner.rb @@ -0,0 +1,69 @@ +# frozen_string_literal: true + +module Floe + class AwesomeRunner < Floe::Runner + SCHEME = "awesome" + SCHEME_PREFIX = "#{SCHEME}://" + SCHEME_OFFSET = SCHEME.length + 3 + + def initialize(_options = {}) + require "awesome_spawn" + + super + end + + # @return [Hash] runner_context + def run_async!(resource, params = {}, _secrets = {}, _context = {}) + raise ArgumentError, "Invalid resource" unless resource&.start_with?(SCHEME_PREFIX) + + args = resource[SCHEME_OFFSET..].split + method = args.shift + + runner_context = {} + + # TODO: fix sanitization preventing params in args (e.g.: $PARAM1 => \$PARAM1) + result = AwesomeSpawn.run(method, :env => params, :params => args) + self.class.populate_results!(runner_context, :result => result) + runner_context + end + + def status!(runner_context) + end + + def running?(runner_context) + !runner_context["Output"] + end + + def success?(runner_context) + !runner_context["Error"] + end + + def output(runner_context) + runner_context["Output"] + end + + def cleanup(runner_context) + end + + # internal methods + + def self.command_error_cause(command_result) + command_result.error.nil? || command_result.error.empty? ? command_result.output.to_s : command_result.error.to_s + end + + def self.populate_results!(runner_context, result: nil, error: nil) + error ||= command_error_cause(result) if result&.failure? + + if error + runner_context["Output"] = {"Error" => "States.TaskFailed", "Cause" => error} + runner_context["Error"] = true + else + runner_context["Output"] = {"Result" => result.output.chomp.split("\n")} + end + + runner_context + end + end +end + +Floe::Runner.register_scheme(Floe::AwesomeRunner::SCHEME, Floe::AwesomeRunner.new) diff --git a/lib/floe/cli.rb b/lib/floe/cli.rb index 03b448c0..29690766 100644 --- a/lib/floe/cli.rb +++ b/lib/floe/cli.rb @@ -4,6 +4,7 @@ def initialize require "optimist" require "floe" require "floe/container_runner" + require "floe/awesome_runner" require "logger" Floe.logger = Logger.new($stdout) diff --git a/spec/awesome_runner_spec.rb b/spec/awesome_runner_spec.rb new file mode 100644 index 00000000..94bdf923 --- /dev/null +++ b/spec/awesome_runner_spec.rb @@ -0,0 +1,78 @@ +require_relative "../lib/floe/awesome_runner" + +RSpec.describe Floe::AwesomeRunner, :uses_awesome_spawn => true do + let(:subject) { described_class.new(runner_options) } + let(:runner_options) { {} } + let(:container_id) { SecureRandom.hex } + + # let(:workflow) { make_workflow(ctx, {"State" => {"Type" => "Task", "Resource" => resource, "Parameters" => {"var1.$" => "$.foo.bar"}, "End" => true}}) } + + describe "#run_async!" do + it "raises an exception without a resource" do + expect { subject.run_async!(nil) }.to raise_error(ArgumentError, "Invalid resource") + end + + it "raises an exception for an invalid resource uri" do + expect { subject.run_async!("arn:abcd:efgh") }.to raise_error(ArgumentError, "Invalid resource") + end + + it "calls command run with the command name" do + stub_good_run("ls", :params => [], :env => {}, :output => "file\nlisting\n") + + subject.run_async!("awesome://ls") + end + + it "passes environment variables to command run" do + stub_good_run("ls", :params => [], :env => {"FOO" => "BAR"}, :output => "file\nlisting\n") + + subject.run_async!("awesome://ls", {"FOO" => "BAR"}) + end + end + + # describe "#status!" do + # let(:runner_context) { {"container_ref" => container_id} } + + # it "returns the updated container_state" do + # stub_good_run!("ls", :params => ["inspect", container_id], :output => "[{\"State\": {\"Running\": true}}]") + + # subject.status!(runner_context) + + # expect(runner_context).to include("container_state" => {"Running" => true}) + # end + # end + + describe "#running?" do + # it "retuns true when running" do + # runner_context = {"container_ref" => container_id, "container_state" => {"Running" => true}} + # expect(subject.running?(runner_context)).to be_truthy + # end + + # it "retuns false when not running" do + # runner_context = {"container_ref" => container_id, "container_state" => {"Running" => false, "ExitCode" => 0}} + # expect(subject.running?(runner_context)).to be_falsey + # end + end + + describe "#success?" do + # it "retuns true when successful" do + # runner_context = {"container_ref" => container_id, "container_state" => {"Running" => false, "ExitCode" => 0}} + # expect(subject.success?(runner_context)).to be_truthy + # end + + # it "retuns false when not successful" do + # runner_context = {"container_ref" => container_id, "container_state" => {"Running" => false, "ExitCode" => 1}} + # expect(subject.success?(runner_context)).to be_falsey + # end + end + + describe "#output" do + let(:runner_context) { {"Output" => ["output1", "output2"]} } + + it "returns log output" do + expect(subject.output(runner_context)).to eq(["output1", "output2"]) + end + end + + # describe "#cleanup" do + # end +end From 514bed0f45d1a697883b8c510ff51ed0539d6e0c Mon Sep 17 00:00:00 2001 From: Keenan Brock Date: Tue, 30 Apr 2024 16:09:32 -0400 Subject: [PATCH 3/3] Async version of the awesome runner --- lib/floe/awesome_runner.rb | 67 +++++++++++++++++++++++++++++++++++-- spec/awesome_runner_spec.rb | 2 ++ 2 files changed, 66 insertions(+), 3 deletions(-) diff --git a/lib/floe/awesome_runner.rb b/lib/floe/awesome_runner.rb index 2d165f21..8520d9d5 100644 --- a/lib/floe/awesome_runner.rb +++ b/lib/floe/awesome_runner.rb @@ -1,14 +1,59 @@ # frozen_string_literal: true +require "concurrent/array" + module Floe + class AwesomeProcess < Thread + attr_reader :result + attr_accessor :error + + def initialize(queue, context, *args) + self.report_on_exception = true + @processed = false + @context = context + + # Don't like changing the value of context here, + # but want to make sure thread is set before the `queue.push` + # `queue.pop` will look potentially at status, which is through thread + context["thread"] = self + + super do + @result = AwesomeSpawn.run(*args) + + # this is changing the value of the context + # in the non-main thread + # Potential race condition here + Floe::AwesomeRunner.populate_results!(@context, :result => @result) + + # trigger an event + queue.push(["delete", context]) + rescue => err + # Shouldn't ever get in here + @error = err + + Floe::AwesomeRunner.populate_results!(@context, :error => err) + + # trigger an event + queue.push(["delete", context]) + end + end + end + class AwesomeRunner < Floe::Runner SCHEME = "awesome" SCHEME_PREFIX = "#{SCHEME}://" SCHEME_OFFSET = SCHEME.length + 3 + # only exposed for tests + # use wait instead + attr_reader :queue + def initialize(_options = {}) require "awesome_spawn" + # events triggered + @queue = Queue.new + super end @@ -21,13 +66,18 @@ def run_async!(resource, params = {}, _secrets = {}, _context = {}) runner_context = {} - # TODO: fix sanitization preventing params in args (e.g.: $PARAM1 => \$PARAM1) - result = AwesomeSpawn.run(method, :env => params, :params => args) - self.class.populate_results!(runner_context, :result => result) + # NOTE: this adds itself to the runner_context + AwesomeProcess.new(@queue, runner_context, method, :env => params, :params => args) + runner_context end def status!(runner_context) + # check if it has no output (i.e.: we think it is running) but it is not running + if !runner_context.key?("Output") && !runner_context["thread"]&.alive? + runner_context["Output"] = {"Error" => "Lambda.Unknown", "Cause" => "no output and no thread"} + runner_context["Error"] = true + end end def running?(runner_context) @@ -43,6 +93,17 @@ def output(runner_context) end def cleanup(runner_context) + runner_context["thread"] = nil + end + + def wait(timeout: nil, _events: %i[create update delete]) + # TODO: implement whole interface + raise "wait needs a block and doesn't support timeout" unless timeout.nil? && block_given? + + loop do + event_context = @queue.pop + yield event_context if block_given? + end end # internal methods diff --git a/spec/awesome_runner_spec.rb b/spec/awesome_runner_spec.rb index 94bdf923..aafedc32 100644 --- a/spec/awesome_runner_spec.rb +++ b/spec/awesome_runner_spec.rb @@ -20,12 +20,14 @@ stub_good_run("ls", :params => [], :env => {}, :output => "file\nlisting\n") subject.run_async!("awesome://ls") + subject.queue.pop end it "passes environment variables to command run" do stub_good_run("ls", :params => [], :env => {"FOO" => "BAR"}, :output => "file\nlisting\n") subject.run_async!("awesome://ls", {"FOO" => "BAR"}) + subject.queue.pop end end