Skip to content

Commit

Permalink
Revert "Revert "Rename buffer to chunk""
Browse files Browse the repository at this point in the history
This reverts commit c6ceaf5.
  • Loading branch information
antonmi committed Apr 22, 2024
1 parent 5459588 commit c187a5f
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 21 deletions.
16 changes: 8 additions & 8 deletions lib/gen_mix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@ defmodule Strom.GenMix do

use GenServer

@buffer 1
@chunk 1

defstruct pid: nil,
inputs: [],
outputs: [],
opts: [],
running: false,
buffer: @buffer,
chunk: @chunk,
producers: %{},
consumers: %{}

Expand All @@ -21,7 +21,7 @@ defmodule Strom.GenMix do
def start(%__MODULE__{opts: opts} = gen_mix) when is_list(opts) do
gen_mix = %{
gen_mix
| buffer: Keyword.get(opts, :buffer, @buffer)
| chunk: Keyword.get(opts, :chunk, @chunk)
}

start_link(gen_mix)
Expand All @@ -44,17 +44,17 @@ defmodule Strom.GenMix do
GenServer.call(pid, :stop)
end

defp run_inputs(streams, pid, buffer) do
defp run_inputs(streams, pid, chunk) do
Enum.reduce(streams, %{}, fn {{name, fun}, stream}, acc ->
task = async_run_stream({name, fun}, stream, buffer, pid)
task = async_run_stream({name, fun}, stream, chunk, pid)
Map.put(acc, {name, fun}, task)
end)
end

defp async_run_stream({name, fun}, stream, buffer, pid) do
defp async_run_stream({name, fun}, stream, chunk, pid) do
Task.async(fn ->
stream
|> Stream.chunk_every(buffer)
|> Stream.chunk_every(chunk)
|> Stream.each(fn chunk ->
{chunk, _} = Enum.split_with(chunk, fun)
GenServer.cast(pid, {:new_data, {name, fun}, chunk})
Expand Down Expand Up @@ -97,7 +97,7 @@ defmodule Strom.GenMix do
{Map.put(flow, name, stream), mix}
end)

producers = run_inputs(input_streams, mix.pid, mix.buffer)
producers = run_inputs(input_streams, mix.pid, mix.chunk)

flow =
flow
Expand Down
16 changes: 8 additions & 8 deletions lib/transformer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ defmodule Strom.Transformer do

use GenServer

@buffer 1
@chunk 1

defstruct pid: nil,
running: false,
opts: [],
buffer: @buffer,
chunk: @chunk,
function: nil,
acc: nil,
names: [],
Expand All @@ -63,7 +63,7 @@ defmodule Strom.Transformer do

@spec start(__MODULE__.t()) :: __MODULE__.t()
def start(%__MODULE__{opts: opts} = transformer) do
transformer = %{transformer | buffer: Keyword.get(opts, :buffer, @buffer)}
transformer = %{transformer | chunk: Keyword.get(opts, :chunk, @chunk)}

{:ok, pid} = start_link(transformer)
__state__(pid)
Expand Down Expand Up @@ -139,17 +139,17 @@ defmodule Strom.Transformer do

def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

defp run_inputs(streams, pid, buffer) do
defp run_inputs(streams, pid, chunk) do
Enum.reduce(streams, %{}, fn {{name, fun, acc}, stream}, streams_acc ->
task = async_run_stream({name, fun, acc}, stream, buffer, pid)
task = async_run_stream({name, fun, acc}, stream, chunk, pid)
Map.put(streams_acc, name, task)
end)
end

defp async_run_stream({name, fun, acc}, stream, buffer, pid) do
defp async_run_stream({name, fun, acc}, stream, chunk, pid) do
Task.async(fn ->
stream
|> Stream.chunk_every(buffer)
|> Stream.chunk_every(chunk)
|> Stream.transform(acc, fn chunk, acc ->
{chunk, new_acc} =
Enum.reduce(chunk, {[], acc}, fn el, {events, acc} ->
Expand Down Expand Up @@ -183,7 +183,7 @@ defmodule Strom.Transformer do

@impl true
def handle_call({:run_inputs, streams_to_call}, _from, %__MODULE__{} = transformer) do
tasks = run_inputs(streams_to_call, transformer.pid, transformer.buffer)
tasks = run_inputs(streams_to_call, transformer.pid, transformer.chunk)

{:reply, :ok, %{transformer | running: true, tasks: tasks}}
end
Expand Down
4 changes: 2 additions & 2 deletions test/examples/parcels_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,9 @@ defmodule Strom.Examples.ParcelsTest do
def components() do
[
source(:orders, ReadLines.new("test/examples/parcels/orders.csv")),
transform([:orders], &__MODULE__.build_order/1, nil, buffer: 1000),
transform([:orders], &__MODULE__.build_order/1, nil, chunk: 1000),
source(:parcels, ReadLines.new("test/examples/parcels/parcels.csv")),
transform([:parcels], &__MODULE__.build_parcel/1, nil, buffer: 1000),
transform([:parcels], &__MODULE__.build_parcel/1, nil, chunk: 1000),
mix([:orders, :parcels], :mixed),
transform([:mixed], &ParcelsFlow.force_order/2, %{}),
transform([:mixed], &ParcelsFlow.decide/2, %{}),
Expand Down
2 changes: 1 addition & 1 deletion test/mixer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Strom.MixerTest do
setup do
mixer =
[:stream1, :stream2]
|> Mixer.new(:stream, buffer: 1)
|> Mixer.new(:stream, chunk: 1)
|> Mixer.start()

%{mixer: mixer}
Expand Down
4 changes: 2 additions & 2 deletions test/transformer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ defmodule Strom.TransformerTest do

transformer =
[:numbers1, :numbers2]
|> Transformer.new(fun, 100, buffer: 2)
|> Transformer.new(fun, 100, chunk: 2)
|> Transformer.start()

assert transformer.buffer == 2
assert transformer.chunk == 2

flow = %{numbers1: [1, 2, 3, 4, 5], numbers2: [6, 7, 8, 9, 10], numbers3: [0, 0, 0, 0, 0]}

Expand Down

0 comments on commit c187a5f

Please sign in to comment.