Skip to content

Commit

Permalink
Add Map ItemBatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
agrare committed Nov 5, 2024
1 parent 1e674ae commit d691724
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 1 deletion.
1 change: 1 addition & 0 deletions lib/floe.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
require_relative "floe/workflow/choice_rule/and"
require_relative "floe/workflow/choice_rule/data"
require_relative "floe/workflow/context"
require_relative "floe/workflow/item_batcher"
require_relative "floe/workflow/item_processor"
require_relative "floe/workflow/intrinsic_function"
require_relative "floe/workflow/intrinsic_function/parser"
Expand Down
29 changes: 29 additions & 0 deletions lib/floe/workflow/item_batcher.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# frozen_string_literal: true

module Floe
class Workflow
class ItemBatcher
include ValidationMixin

attr_reader :name, :batch_input, :max_items_per_batch, :max_items_per_batch_path, :max_input_bytes_per_batch, :max_input_bytes_per_batch_path

def initialize(payload, name)
@name = name

@batch_input = PayloadTemplate.new(payload["BatchInput"]) if payload["BatchInput"]
@max_items_per_batch = payload["MaxItemsPerBatch"]
@max_input_bytes_per_batch = payload["MaxInputBytesPerBatch"]

@max_items_per_batch_path = ReferencePath.new(payload["MaxItemsPerBatchPath"]) if payload["MaxItemsPerBatchPath"]
@max_input_bytes_per_batch_path = ReferencePath.new(payload["MaxInputBytesPerBatchPath"]) if payload["MaxInputBytesPerBatchPath"]

if [max_items_per_batch, max_input_bytes_per_batch, max_items_per_batch_path, max_input_bytes_per_batch_path].all?(&:nil?)
parser_error!("must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\", \"MaxInputBytesPerBatch\", \"MaxInputBytesPerBatchPath\"")
end

parser_error!("must not specify both \"MaxItemsPerBatch\" and \"MaxItemsPerBatchPath\"") if max_items_per_batch && max_items_per_batch_path
parser_error!("must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"") if max_input_bytes_per_batch && max_input_bytes_per_batch_path
end
end
end
end
2 changes: 1 addition & 1 deletion lib/floe/workflow/states/map.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def initialize(workflow, name, payload)
@items_path = ReferencePath.new(payload.fetch("ItemsPath", "$"))
@item_reader = payload["ItemReader"]
@item_selector = payload["ItemSelector"]
@item_batcher = payload["ItemBatcher"]
@item_batcher = ItemBatcher.new(payload["ItemBatcher"], name + ["ItemBatcher"]) if payload["ItemBatcher"]
@result_writer = payload["ResultWriter"]
@max_concurrency = payload["MaxConcurrency"]&.to_i
@tolerated_failure_percentage = payload["ToleratedFailurePercentage"]&.to_i
Expand Down
83 changes: 83 additions & 0 deletions spec/workflow/item_batcher_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
RSpec.describe Floe::Workflow::ItemBatcher do
let(:subject) { described_class.new(payload, ["Map", "ItemBatcher"]) }

describe "#initialize" do
context "with no MaxItems or MaxInputBytes" do
let(:payload) { {} }

it "raises an exception" do
expect { subject }
.to raise_error(
Floe::InvalidWorkflowError,
"Map.ItemBatcher must have one of \"MaxItemsPerBatch\", \"MaxItemsPerBatchPath\", \"MaxInputBytesPerBatch\", \"MaxInputBytesPerBatchPath\""
)
end
end

context "with MaxItemsPerBatch" do
let(:payload) { {"MaxItemsPerBatch" => 10} }

it "returns an ItemBatcher" do
expect(subject).to be_kind_of(described_class)
end

it "sets max_items_per_batch" do
expect(subject.max_items_per_batch).to eq(payload["MaxItemsPerBatch"])
end
end

context "with MaxInputBytesPerBatch" do
let(:payload) { {"MaxInputBytesPerBatch" => 1_024} }

it "returns an ItemBatcher" do
expect(subject).to be_kind_of(described_class)
end

it "sets max_input_bytes_per_batch" do
expect(subject.max_input_bytes_per_batch).to eq(payload["MaxInputBytesPerBatch"])
end
end

context "with MaxItemsPerBatchPath" do
let(:payload) { {"MaxItemsPerBatchPath" => "$.maxBatchItems"} }

it "returns an ItemBatcher" do
expect(subject).to be_kind_of(described_class)
end

it "sets max_items_per_batch_path" do
expect(subject.max_items_per_batch_path).to be_kind_of(Floe::Workflow::ReferencePath)
expect(subject.max_items_per_batch_path).to have_attributes(:path => ["maxBatchItems"])
end
end

context "with MaxInputBytesPerBatchPath" do
let(:payload) { {"MaxInputBytesPerBatchPath" => "$.batchSize"} }

it "returns an ItemBatcher" do
expect(subject).to be_kind_of(described_class)
end

it "sets max_input_bytes_per_batch_path" do
expect(subject.max_input_bytes_per_batch_path).to be_kind_of(Floe::Workflow::ReferencePath)
expect(subject.max_input_bytes_per_batch_path).to have_attributes(:path => ["batchSize"])
end
end

context "with MaxItemsPerBatch and MaxItemsPerBatchPath" do
let(:payload) { {"MaxItemsPerBatch" => 10, "MaxItemsPerBatchPath" => "$.maxBatchItems"} }

it "raises an exception" do
expect { subject }.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher must not specify both \"MaxItemsPerBatch\" and \"MaxItemsPerBatchPath\"")
end
end

context "with MaxInputBytesPerBatch and MaxInputBytesPerBatchPath" do
let(:payload) { {"MaxInputBytesPerBatch" => 1_024, "MaxInputBytesPerBatchPath" => "$.batchSize"} }

it "raises an exception" do
expect { subject }.to raise_error(Floe::InvalidWorkflowError, "Map.ItemBatcher must not specify both \"MaxInputBytesPerBatch\" and \"MaxInputBytesPerBatchPath\"")
end
end
end
end

0 comments on commit d691724

Please sign in to comment.