Skip to content

Commit

Permalink
Add Supra.stream_by
Browse files Browse the repository at this point in the history
  • Loading branch information
sax committed Jul 26, 2024
1 parent 57fc096 commit 57ec3e1
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 0 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## Unreleased

- Add `Supra.stream_by` for streaming queries outside of transactions.

## v1.0.0

- Verify support for Elixir 1.17.0.
Expand Down
65 changes: 65 additions & 0 deletions lib/supra.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ defmodule Supra do
@type result(type) :: {:ok, type} | {:error, Ecto.Changeset.t(type)}
@type result(ok_t, error_t) :: {:ok, ok_t} | {:error, Ecto.Changeset.t(error_t)}

# Keyword list of options accepted by `stream_by/3`.
@type stream_opts() :: [stream_opt()]
# A single `stream_by/3` option: the repo module used to run queries, the direction in which
# batches are iterated, or preloads applied to each batch.
@type stream_opt() :: {:repo, module()} | {:order, :asc | :desc} | {:preload, term()}

# # #

@doc "Returns the number of rows in `queryable`"
Expand Down Expand Up @@ -45,4 +48,66 @@ defmodule Supra do
@spec limit(Ecto.Queryable.t(), non_neg_integer()) :: Ecto.Queryable.t()
def limit(queryable, count) when is_integer(count) and count >= 0 do
  # The guard means negative or non-integer counts never match this clause.
  # `count` is pinned so the runtime value is interpolated into the query.
  Ecto.Query.limit(queryable, ^count)
end

@doc """
Lazily streams the results of an Ecto query without requiring a transaction.

Rows are fetched in batches ordered by `field`. Every batch after the first selects only rows
whose `field` value lies strictly beyond the last value already emitted, so:

- `field` must be non-nullable;
- rows sharing a `field` value with the last row of a batch can be skipped, so `field`
  should be unique;
- any `order_by` already present on the query is replaced by ordering on `field`.

No query is executed until the returned stream is enumerated.

## Options

- `repo :: module()` required - The `Ecto.Repo` used to execute queries. An error is raised
  when the stream is enumerated if this option is missing.
- `order :: :asc | :desc` default `:asc` - The order in which to iterate over batches.
- `preload :: term()` optional - A set of preloads to apply to each batch before emitting
  its members to the stream.
"""
@spec stream_by(Ecto.Query.t(), atom(), stream_opts()) :: Enum.t()
def stream_by(query, field, opts) do
  # The accumulator is the last `field` value seen; `nil` means "start from the beginning".
  fetch_batch = fn cursor -> Supra.Stream.get_next_batch(query, field, cursor, opts) end

  nil
  |> Stream.unfold(fetch_batch)
  # Each unfolded element is a whole batch (a list); flatten batches into single rows.
  |> Stream.concat()
end

defmodule Stream do
  # Internal batching helpers for `Supra.stream_by/3`. The functions are public only because
  # they are called from the parent module; they are not meant to be used directly.
  @moduledoc false

  require Ecto.Query

  # Number of rows fetched by each batch query.
  @batch_size 100

  # Callback for `Stream.unfold/2`.
  #
  # Runs the query for the batch that follows `last_field_value` (`nil` means "first batch")
  # and returns `{batch, new_last_field_value}`, or `nil` once a query returns no rows, which
  # terminates the stream.
  #
  # Raises a `RuntimeError` when `opts` has no `:repo`.
  def get_next_batch(query, field, last_field_value, opts) do
    repo =
      Keyword.get(opts, :repo) ||
        raise(
          "Supra.stream_by/3 requires a :repo option naming the Ecto.Repo used to run queries"
        )

    # Any ordering on the incoming query is dropped: batches must be ordered by `field` alone
    # for the "greater/less than the last value" cursor to be meaningful.
    case query_batch(
           repo,
           Ecto.Query.exclude(query, :order_by),
           field,
           last_field_value,
           Keyword.get(opts, :order, :asc),
           Keyword.get(opts, :preload, [])
         ) do
      [] ->
        nil

      batch ->
        last = List.last(batch)
        {batch, Map.get(last, field)}
    end
  end

  # Fetches up to `@batch_size` rows ordered by `field` in direction `order`, restricted to
  # rows beyond `last_field_value` when one is given, then applies `preloads` to the batch.
  def query_batch(repo, query, field, last_field_value, order, preloads) do
    query
    |> resume_after(field, order, last_field_value)
    |> Ecto.Query.order_by([{^order, ^field}])
    |> Ecto.Query.limit(^@batch_size)
    |> repo.all()
    |> repo.preload(preloads)
  end

  # Restricts `query` to rows whose `field` is strictly beyond `last_value` in the given
  # direction: greater than for `:asc`, less than for `:desc`.
  def next_batch(query, field, :asc, last_value),
    do: Ecto.Query.where(query, [entity], field(entity, ^field) > ^last_value)

  def next_batch(query, field, :desc, last_value),
    do: Ecto.Query.where(query, [entity], field(entity, ^field) < ^last_value)

  # Only `nil` means "no cursor yet". A truthiness check would also treat a legitimate
  # `false` cursor value as "start over" and re-fetch the first batch forever.
  defp resume_after(query, _field, _order, nil), do: query

  defp resume_after(query, field, order, last_value),
    do: next_batch(query, field, order, last_value)
end
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ defmodule Supra.MixProject do
{:dialyxir, "~> 1.1", only: [:dev, :test], runtime: false},
{:ecto, "~> 3.0"},
{:ecto_sql, "~> 3.0", optional: true},
{:ecto_temp, "~> 1.2", only: :test, runtime: false},
{:ex_doc, "~> 0.28", only: [:docs, :dev], runtime: false},
{:mix_audit, "~> 2.0", only: :dev, runtime: false},
{:mix_test_watch, "~> 1.2", runtime: false, only: :dev},
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"},
"ecto": {:hex, :ecto, "3.11.2", "e1d26be989db350a633667c5cda9c3d115ae779b66da567c68c80cfb26a8c9ee", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3c38bca2c6f8d8023f2145326cc8a80100c3ffe4dcbd9842ff867f7fc6156c65"},
"ecto_sql": {:hex, :ecto_sql, "3.11.3", "4eb7348ff8101fbc4e6bbc5a4404a24fecbe73a3372d16569526b0cf34ebc195", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.11.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e5f36e3d736b99c7fee3e631333b8394ade4bafe9d96d35669fca2d81c2be928"},
"ecto_temp": {:hex, :ecto_temp, "1.2.0", "9bda89268d7dd244a1323f92a7cc40d3ad070c89f0fe6c1db729db48d261db8e", [:mix], [{:ecto, ">= 3.0.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:ecto_sql, "> 3.0.0", [hex: :ecto_sql, repo: "hexpm", optional: false]}], "hexpm", "fc9c9d501c48c169e8c435c2e3c364e4a6f708b45859ab6bc64690e5f4803be7"},
"erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"},
"ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"},
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
Expand Down
99 changes: 99 additions & 0 deletions test/supra_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,103 @@ defmodule SupraTest do
assert Test.Schemas.House |> Supra.limit(1) |> Supra.count(repo: Test.Repo) == 1
end
end

# Exercises `Supra.stream_by/3` against a temporary table, covering datasets smaller and
# larger than the batch size of 100 used by `Supra.Stream`, and queries that already carry
# a `where` clause.
describe "stream_by" do
# NOTE(review): `deftemptable` and `create_temp_tables` are not defined in this view;
# presumably they are provided by `use EctoTemp` — confirm.
use EctoTemp, repo: Test.Repo

require EctoTemp.Factory

# Temporary table with a single non-null string column, `value`.
deftemptable :data_temp do
column(:value, :string, null: false)
end

setup do
create_temp_tables()
:ok
end

# Schema mapped onto the "data_temp" table; it declares no primary key.
defmodule Data do
use Ecto.Schema
import Ecto.Changeset, only: [cast: 3]

@primary_key false
schema "data_temp" do
field(:value, :string)
end

# Zero-pads the string form of `int` to four characters so that values sort
# lexicographically in numeric order (e.g. 7 -> "0007").
def padded(int),
do: String.pad_leading(to_string(int), 4, "0")

def changeset(attrs), do: cast(%__MODULE__{}, Map.new(attrs), ~w[value]a)

# Query builders used by the tests below.
defmodule Query do
import Ecto.Query
def base, do: from(_ in Data, as: :data)

# Keeps only rows whose `value` sorts after "value-<padded int>".
def where_greater_than(query \\ base(), int) do
value = "value-#{Data.padded(int)}"
where(query, [data: d], d.value > ^value)
end
end
end

# Inserts rows "value-0001" through "value-<max>", where `max` comes from each test's
# `@tag max_value: ...`.
setup %{max_value: max} do
for int <- 1..max do
EctoTemp.Factory.insert(:data_temp, value: "value-#{Data.padded(int)}")
end

:ok
end

# NOTE(review): `assert_eq` is not defined in this view; presumably it is imported at the
# top of the test module — confirm.

# 50 rows fit in a single batch.
@tag max_value: 50
test "handles datasets less than batch size" do
Supra.stream_by(Data.Query.base(), :value, repo: Test.Repo)
|> Enum.count()
|> assert_eq(50)

assert [%{value: "value-0001"}] =
Supra.stream_by(Data.Query.base(), :value, repo: Test.Repo)
|> Stream.take(1)
|> Enum.to_list()

assert [%{value: "value-0050"}] =
Supra.stream_by(Data.Query.base(), :value, order: :desc, repo: Test.Repo)
|> Stream.take(1)
|> Enum.to_list()
end

# 345 rows span several batches, the last of which is only partially filled.
@tag max_value: 345
test "handles datasets greater than batch size" do
Supra.stream_by(Data.Query.base(), :value, repo: Test.Repo)
|> Enum.count()
|> assert_eq(345)

assert [%{value: "value-0001"}] =
Supra.stream_by(Data.Query.base(), :value, repo: Test.Repo)
|> Stream.take(1)
|> Enum.to_list()

assert [%{value: "value-0345"}] =
Supra.stream_by(Data.Query.base(), :value, order: :desc, repo: Test.Repo)
|> Stream.take(1)
|> Enum.to_list()
end

# The caller's `where` must be kept alongside the batching condition: of the 250 rows only
# those after "value-0177" (73 rows) should be streamed.
@tag max_value: 250
test "handles queries with existing where clauses" do
Supra.stream_by(Data.Query.where_greater_than(177), :value, repo: Test.Repo)
|> Enum.count()
|> assert_eq(250 - 177)

assert [%{value: "value-0178"}] =
Supra.stream_by(Data.Query.where_greater_than(177), :value, repo: Test.Repo)
|> Stream.take(1)
|> Enum.to_list()

assert [%{value: "value-0250"}] =
Supra.stream_by(Data.Query.where_greater_than(177), :value, order: :desc, repo: Test.Repo)
|> Stream.take(1)
|> Enum.to_list()
end
end
end

0 comments on commit 57ec3e1

Please sign in to comment.