Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Map state add tolerated failure #282

Merged
merged 6 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/floe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
require_relative "floe/workflow/states/non_terminal_mixin"
require_relative "floe/workflow/states/parallel"
require_relative "floe/workflow/states/pass"
require_relative "floe/workflow/states/retry_catch_mixin"
require_relative "floe/workflow/states/succeed"
require_relative "floe/workflow/states/task"
require_relative "floe/workflow/states/wait"
Expand Down
47 changes: 39 additions & 8 deletions lib/floe/workflow/states/map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

require_relative "input_output_mixin"
require_relative "non_terminal_mixin"
require_relative "retry_catch_mixin"

module Floe
class Workflow
module States
class Map < Floe::Workflow::State
include InputOutputMixin
include NonTerminalMixin
include RetryCatchMixin

attr_reader :end, :next, :parameters, :input_path, :output_path, :result_path,
:result_selector, :retry, :catch, :item_processor, :items_path,
Expand Down Expand Up @@ -36,8 +38,8 @@ def initialize(workflow, name, payload)
@item_batcher = payload["ItemBatcher"]
@result_writer = payload["ResultWriter"]
@max_concurrency = payload["MaxConcurrency"]&.to_i
@tolerated_failure_percentage = payload["ToleratedFailurePercentage"]
@tolerated_failure_count = payload["ToleratedFailureCount"]
@tolerated_failure_percentage = payload["ToleratedFailurePercentage"]&.to_i
@tolerated_failure_count = payload["ToleratedFailureCount"]&.to_i

validate_state!(workflow)
end
Expand All @@ -58,8 +60,14 @@ def start(context)
end

def finish(context)
result = context.state["ItemProcessorContext"].map { |ctx| Context.new(ctx).output }
context.output = process_output(context, result)
if success?(context)
result = each_item_processor(context).map(&:output)
context.output = process_output(context, result)
else
error = parse_error(context)
retry_state!(context, error) || catch_error!(context, error) || fail_workflow!(context, error)
end

super
end

Expand All @@ -77,27 +85,46 @@ def end?
end

def ready?(context)
!context.state_started? || context.state["ItemProcessorContext"].any? { |ctx| item_processor.step_nonblock_ready?(Context.new(ctx)) }
!context.state_started? || each_item_processor(context).any? { |ctx| item_processor.step_nonblock_ready?(ctx) }
end

def wait_until(context)
context.state["ItemProcessorContext"].filter_map { |ctx| item_processor.wait_until(Context.new(ctx)) }.min
each_item_processor(context).filter_map { |ctx| item_processor.wait_until(ctx) }.min
end

def waiting?(context)
context.state["ItemProcessorContext"].any? { |ctx| item_processor.waiting?(Context.new(ctx)) }
each_item_processor(context).any? { |ctx| item_processor.waiting?(ctx) }
end

def running?(context)
!ended?(context)
end

def ended?(context)
context.state["ItemProcessorContext"].all? { |ctx| Context.new(ctx).ended? }
each_item_processor(context).all?(&:ended?)
end

def success?(context)
contexts = each_item_processor(context)
num_failed = contexts.count(&:failed?)
total = contexts.count

return true if num_failed.zero? || total.zero?
return true if tolerated_failure_percentage && tolerated_failure_percentage == 100
# Some have failed, check the tolerated_failure thresholds to see if
# we should fail the whole state.
return true if tolerated_failure_count && num_failed < tolerated_failure_count
return true if tolerated_failure_percentage && (100 * num_failed / total.to_f) < tolerated_failure_percentage
Comment on lines +116 to +117
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is supposed to happen if both count and percentage are given (or is that not allowed)? I can think of percentages where you want, say, the minimum of 50% or 3. In that case, I think you need to check both clauses, and then return, as opposed to bailing out on the first one.

(This could have fallen through with the flip from failed? to success?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

specs tend to say one vs the other.
In the branch that I had with all error checkings, I only allow or the other.

Also, you should not be able to state Next and End at the same time.
But in the short term, we've been letting these cases slide

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the states language spec:

If a "ToleratedFailurePercentage" field and a "ToleratedFailureCount" field are both specified, the Map State will fail if either threshold is breached.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an additional nuance here that I'm wondering if we need to code explicitly (not 100% sure). The spec says

A Map State MAY have a "ToleratedFailurePercentage" field whose value MUST be a number between zero and 100. Its default value is zero, which means the Map State will fail if any (i.e. more than 0%) of its items fail. A "ToleratedFailurePercentage" value of 100 means the interpreter will continue starting iterations even if all items fail.

So if 0 or 100 are specified, then those have defined meanings, and I'm concerned that silly floating point math might let those fall through the cracks? I wonder if we should have at least some tests for those specific values.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another very strange question but is it possible for total to be 0 here or does some earlier check avoid that? Asking because this is a potential divide by zero here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the states language spec:

If a "ToleratedFailurePercentage" field and a "ToleratedFailureCount" field are both specified, the Map State will fail if either threshold is breached.

Right this will report a failure if either threshold is hit

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another very strange question but is it possible for total to be 0 here or does some earlier check avoid that? Asking because this is a potential divide by zero here.

Technically this will return early if total is zero because num_failed will also be zero, but I can add an additional / explicit total.zero? above

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right this will report a failure if either threshold is hit

The way it's coded though, I'm not sure it does? Taking a (very) contrived example, if we had 4 items, 2 failures, threshold count of 2, and the threshold % of 25%, then the current code will return success true, but should return false.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay I added an explicit check for ToleratedFailurePercentage==100 (interesting the spec says it is an integer not a float so that made == 100 easier, I did &.to_i in the initialize)
I think all concerns here are convered.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way it's coded though, I'm not sure it does? Taking a (very) contrived example, if we had 4 items, 2 failures, threshold count of 2, and the threshold % of 25%, then the current code will return success true, but should return false.

Oh yeah, this did flip when going from failed? to success?


false
end

private

def each_item_processor(context)
context.state["ItemProcessorContext"].map { |ctx| Context.new(ctx) }
end

def step_nonblock!(context)
item_processor_context = Context.new(context.state["ItemProcessorContext"][context.state["Iteration"]])
item_processor.run_nonblock(item_processor_context) if item_processor.step_nonblock_ready?(item_processor_context)
Expand All @@ -109,6 +136,10 @@ def step_nonblock!(context)
end
end

def parse_error(context)
each_item_processor(context).detect(&:failed?)&.output&.dig("Error")
end

def validate_state!(workflow)
validate_state_next!(workflow)
end
Expand Down
57 changes: 57 additions & 0 deletions lib/floe/workflow/states/retry_catch_mixin.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# frozen_string_literal: true

module Floe
class Workflow
module States
module RetryCatchMixin
def find_retrier(error)
self.retry.detect { |r| r.match_error?(error) }
end

def find_catcher(error)
self.catch.detect { |c| c.match_error?(error) }
end

def retry_state!(context, error)
retrier = find_retrier(error["Error"]) if error
return if retrier.nil?

# If a different retrier is hit reset the context
if !context["State"].key?("RetryCount") || context["State"]["Retrier"] != retrier.error_equals
context["State"]["RetryCount"] = 0
context["State"]["Retrier"] = retrier.error_equals
end

context["State"]["RetryCount"] += 1

return if context["State"]["RetryCount"] > retrier.max_attempts

wait_until!(context, :seconds => retrier.sleep_duration(context["State"]["RetryCount"]))
context.next_state = context.state_name
context.output = error
logger.info("Running state: [#{long_name}] with input [#{context.json_input}] got error[#{context.json_output}]...Retry - delay: #{wait_until(context)}")
true
end

def catch_error!(context, error)
catcher = find_catcher(error["Error"]) if error
return if catcher.nil?

context.next_state = catcher.next
context.output = catcher.result_path.set(context.input, error)
logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...CatchError - next state: [#{context.next_state}] output: [#{context.json_output}]")
Comment on lines +37 to +42
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

catcher and retrier always seemed like the same thing.

you try and match it, and if it matches, then you set the next_state / output

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are similar for sure, not sure the same thing though

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignore ^

I can play with this refactor later


true
end

def fail_workflow!(context, error)
# next_state is nil, and will be set to nil again in super
# keeping in here for completeness
context.next_state = nil
context.output = error
logger.error("Running state: [#{long_name}] with input [#{context.json_input}]...Complete workflow - output: [#{context.json_output}]")
end
end
end
end
end
53 changes: 5 additions & 48 deletions lib/floe/workflow/states/task.rb
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
# frozen_string_literal: true

require_relative "input_output_mixin"
require_relative "non_terminal_mixin"
require_relative "retry_catch_mixin"
Comment on lines +3 to +5
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like these belong with the other requires in the global floe.rb

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is there as well but keeping those alphabetic means this fails to resolve the constant.
Alternatively, I can move all of the _mixin requires above the others in floe.rb

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like making the require not alphabetical.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the require being in the place that needs it and not trying to solve a dependency graph in floe.rb 😆

Copy link
Member

@kbrock kbrock Oct 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

huh.

We have been putting all requires up front in floe.rb.
(I did not just say: we have been doing it this way and we have to continue doing that)

Do we want to get away from that?

wondering if InputOutputMixin and the others do not belong in /states/.
That way we can just require these and then all states and not have to deal with orders.

Does look like these 3 mixins are used by only states, so alternatively we can just move the mixins up front.

require_relative "floe/workflow/state"
require_relative "floe/workflow/states/input_output_mixin"
require_relative "floe/workflow/states/non_terminal_mixin"
# require_relative "floe/workflow/states/*_mixin"
require_relative "floe/workflow/states/choice"
require_relative "floe/workflow/states/fail"
require_relative "floe/workflow/states/map"
require_relative "floe/workflow/states/parallel"
require_relative "floe/workflow/states/pass"
require_relative "floe/workflow/states/succeed"
require_relative "floe/workflow/states/task"
require_relative "floe/workflow/states/wait"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have been putting all requires up front in floe.rb.
(I did not just say: we have been doing it this way and we have to continue doing that)

No I don't think we need to stop doing that, I think of require 'floe' as "just require everything" but if a particular class needs something specific it should require it that way a caller could require 'floe/workflow/context' and not having to bring in the memory of the entire gem.


module Floe
class Workflow
module States
class Task < Floe::Workflow::State
include InputOutputMixin
include NonTerminalMixin
include RetryCatchMixin

attr_reader :credentials, :end, :heartbeat_seconds, :next, :parameters,
:result_selector, :resource, :timeout_seconds, :retry, :catch,
Expand Down Expand Up @@ -82,54 +87,6 @@ def success?(context)
runner.success?(context.state["RunnerContext"])
end

def find_retrier(error)
self.retry.detect { |r| r.match_error?(error) }
end

def find_catcher(error)
self.catch.detect { |c| c.match_error?(error) }
end

def retry_state!(context, error)
retrier = find_retrier(error["Error"]) if error
return if retrier.nil?

# If a different retrier is hit reset the context
if !context["State"].key?("RetryCount") || context["State"]["Retrier"] != retrier.error_equals
context["State"]["RetryCount"] = 0
context["State"]["Retrier"] = retrier.error_equals
end

context["State"]["RetryCount"] += 1

return if context["State"]["RetryCount"] > retrier.max_attempts

Comment on lines -94 to -106
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always wanted the retrier / catcher to act just like a State.

You ask - do you have a retrier/catcher for me?
And if you do, then you run_nonblock it just like you do for the current state.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ignore ^ - I can play with this later

wait_until!(context, :seconds => retrier.sleep_duration(context["State"]["RetryCount"]))
context.next_state = context.state_name
context.output = error
logger.info("Running state: [#{long_name}] with input [#{context.json_input}] got error[#{context.json_output}]...Retry - delay: #{wait_until(context)}")
true
end

def catch_error!(context, error)
catcher = find_catcher(error["Error"]) if error
return if catcher.nil?

context.next_state = catcher.next
context.output = catcher.result_path.set(context.input, error)
logger.info("Running state: [#{long_name}] with input [#{context.json_input}]...CatchError - next state: [#{context.next_state}] output: [#{context.json_output}]")

true
end

def fail_workflow!(context, error)
# next_state is nil, and will be set to nil again in super
# keeping in here for completeness
context.next_state = nil
context.output = error
logger.error("Running state: [#{long_name}] with input [#{context.json_input}]...Complete workflow - output: [#{context.json_output}]")
end

def parse_error(output)
return if output.nil?
return output if output.kind_of?(Hash)
Expand Down
120 changes: 120 additions & 0 deletions spec/workflow/states/map_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
}
}
end

let(:tolerated_failure_count) { nil }
let(:tolerated_failure_percentage) { nil }
let(:workflow) do
payload = {
"Validate-All" => {
Expand All @@ -38,6 +41,10 @@
"End" => true,
}
}

payload["Validate-All"]["ToleratedFailureCount"] = tolerated_failure_count if tolerated_failure_count
payload["Validate-All"]["ToleratedFailurePercentage"] = tolerated_failure_percentage if tolerated_failure_percentage

make_workflow(ctx, payload)
end

Expand Down Expand Up @@ -145,4 +152,117 @@
end
end
end

describe "#running?" do
before { state.start(ctx) }

context "with all iterations ended" do
before { ctx.state["ItemProcessorContext"].each { |ctx| ctx["Execution"]["EndTime"] = Time.now.utc } }

it "returns false" do
expect(state.running?(ctx)).to be_falsey
end
end

context "with some iterations not ended" do
before { ctx.state["ItemProcessorContext"][0]["Execution"]["EndTime"] = Time.now.utc }

it "returns true" do
expect(state.running?(ctx)).to be_truthy
end
end
end

describe "#ended?" do
before { state.start(ctx) }

context "with all iterations ended" do
before { ctx.state["ItemProcessorContext"].each { |ctx| ctx["Execution"]["EndTime"] = Time.now.utc } }

it "returns true" do
expect(state.ended?(ctx)).to be_truthy
end
end

context "with some iterations not ended" do
before { ctx.state["ItemProcessorContext"][0]["Execution"]["EndTime"] = Time.now.utc }

it "returns false" do
expect(state.ended?(ctx)).to be_falsey
end
end
end

describe "#success?" do
before { state.start(ctx) }

context "with no failed iterations" do
it "returns true" do
expect(state.success?(ctx)).to be_truthy
end
end

context "with no iterations" do
let(:input) { {"detail" => {"shipped" => []}} }

it "returns true" do
expect(state.success?(ctx)).to be_truthy
end
end

context "with all iterations failed" do
before { ctx.state["ItemProcessorContext"].each { |ctx| ctx["State"] = {"Output" => {"Error" => "FAILED!"}}} }

it "returns false" do
expect(state.success?(ctx)).to be_falsey
end
end

context "with mixed successful and failed iterations" do
before do
ctx.state["ItemProcessorContext"][0]["State"] = {"Output" => {"Error" => "FAILED!"}}
ctx.state["ItemProcessorContext"][2]["State"] = {"Output" => {"Error" => "FAILED!"}}
end

it "returns true" do
expect(state.success?(ctx)).to be_falsey
end

context "with ToleratedFailureCount" do
context "greater than the number of failures" do
let(:tolerated_failure_count) { 3 }

it "returns false" do
expect(state.success?(ctx)).to be_truthy
end
end

context "less than the number of failures" do
let(:tolerated_failure_count) { 1 }

it "returns true" do
expect(state.success?(ctx)).to be_falsey
end
end
end

context "with ToleratedFailurePercentage" do
context "greater than the number of failures" do
let(:tolerated_failure_percentage) { 50 }

it "returns false" do
expect(state.success?(ctx)).to be_truthy
end
end

context "less than the number of failures" do
let(:tolerated_failure_percentage) { 10 }

it "returns true" do
expect(state.success?(ctx)).to be_falsey
end
end
end
end
end
end