From 57ec3e1f48b9a27891edcebd045f40abac7b4826 Mon Sep 17 00:00:00 2001 From: Eric Saxby Date: Fri, 26 Jul 2024 13:04:59 -0700 Subject: [PATCH] Add Supra.stream_by --- CHANGELOG.md | 2 + lib/supra.ex | 65 +++++++++++++++++++++++++++++ mix.exs | 1 + mix.lock | 1 + test/supra_test.exs | 99 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 168 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index d6fb713..8d4036f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## Unreleased +- Add `Supra.stream_by` for streaming queries outside of transactions. + ## v1.0.0 - Verify support for Elixir 1.17.0. diff --git a/lib/supra.ex b/lib/supra.ex index 6b22a1d..be226c0 100644 --- a/lib/supra.ex +++ b/lib/supra.ex @@ -14,6 +14,9 @@ defmodule Supra do @type result(type) :: {:ok, type} | {:error, Ecto.Changeset.t(type)} @type result(ok_t, error_t) :: {:ok, ok_t} | {:error, Ecto.Changeset.t(error_t)} + @type stream_opts() :: [stream_opt()] + @type stream_opt() :: {:repo, module()} | {:order, :asc | :desc} | {:preload, term()} + # # # @doc "Returns the number of rows in `queryable`" @@ -45,4 +48,66 @@ defmodule Supra do @spec limit(Ecto.Queryable.t(), non_neg_integer()) :: Ecto.Queryable.t() def limit(queryable, count) when is_integer(count) and count >= 0, do: queryable |> Ecto.Query.limit(^count) + + @doc """ + Streams an Ecto query without requiring a transaction. + Must be given the name of a non-nullable field to iterate over in batches. + + ## Options + + - `repo :: module()` required - An `Ecto.Repo` execute queries. + - `order :: :asc | :desc` default `:asc` - The order in which to iterate over batches. + - `preload :: term()` optional - An optional set of preloads to apply to each batch before + emitting members to the stream. + """ + @spec stream_by(Ecto.Query.t(), atom(), stream_opts()) :: Enum.t() + def stream_by(query, field, opts), + do: + Stream.unfold(nil, &Supra.Stream.get_next_batch(query, field, &1, opts)) + |> Stream.flat_map(& &1) + + defmodule Stream do + require Ecto.Query + + @batch_size 100 + + def get_next_batch(query, field, last_field_value, opts) do + repo = Keyword.get(opts, :repo) || raise("") + + case query_batch( + repo, + Ecto.Query.exclude(query, :order_by), + field, + last_field_value, + Keyword.get(opts, :order, :asc), + Keyword.get(opts, :preload, []) + ) do + [] -> + nil + + batch -> + last = List.last(batch) + {batch, Map.get(last, field)} + end + end + + def query_batch(repo, query, field, last_field_value, order, preloads) do + query + |> then(fn query -> + if last_field_value, + do: next_batch(query, field, order, last_field_value), + else: query + end) + |> Ecto.Query.order_by([{^order, ^field}]) + |> Ecto.Query.limit(^@batch_size) + |> repo.all() + |> repo.preload(preloads) + end + + def next_batch(query, field, :asc, last_value), + do: Ecto.Query.where(query, [entity], field(entity, ^field) > ^last_value) + + def next_batch(query, field, :desc, last_value), + do: Ecto.Query.where(query, [entity], field(entity, ^field) < ^last_value) + end end diff --git a/mix.exs b/mix.exs index e92d8de..58ab59b 100644 --- a/mix.exs +++ b/mix.exs @@ -54,6 +54,7 @@ defmodule Supra.MixProject do {:dialyxir, "~> 1.1", only: [:dev, :test], runtime: false}, {:ecto, "~> 3.0"}, {:ecto_sql, "~> 3.0", optional: true}, + {:ecto_temp, "~> 1.2", only: :test, runtime: false}, {:ex_doc, "~> 0.28", only: [:docs, :dev], runtime: false}, {:mix_audit, "~> 2.0", only: :dev, runtime: false}, {:mix_test_watch, "~> 1.2", runtime: false, only: :dev}, diff --git a/mix.lock b/mix.lock index 048999d..7d2fc8d 100644 --- a/mix.lock +++ b/mix.lock @@ -7,6 +7,7 @@ "earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"}, "ecto": {:hex, :ecto, "3.11.2", "e1d26be989db350a633667c5cda9c3d115ae779b66da567c68c80cfb26a8c9ee", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3c38bca2c6f8d8023f2145326cc8a80100c3ffe4dcbd9842ff867f7fc6156c65"}, "ecto_sql": {:hex, :ecto_sql, "3.11.3", "4eb7348ff8101fbc4e6bbc5a4404a24fecbe73a3372d16569526b0cf34ebc195", [:mix], [{:db_connection, "~> 2.4.1 or ~> 2.5", [hex: :db_connection, repo: "hexpm", optional: false]}, {:ecto, "~> 3.11.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:myxql, "~> 0.6", [hex: :myxql, repo: "hexpm", optional: true]}, {:postgrex, "~> 0.16 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}, {:tds, "~> 2.1.1 or ~> 2.2", [hex: :tds, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4.0 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "e5f36e3d736b99c7fee3e631333b8394ade4bafe9d96d35669fca2d81c2be928"}, + "ecto_temp": {:hex, :ecto_temp, "1.2.0", "9bda89268d7dd244a1323f92a7cc40d3ad070c89f0fe6c1db729db48d261db8e", [:mix], [{:ecto, ">= 3.0.0", [hex: :ecto, repo: "hexpm", optional: false]}, {:ecto_sql, "> 3.0.0", [hex: :ecto_sql, repo: "hexpm", optional: false]}], "hexpm", "fc9c9d501c48c169e8c435c2e3c364e4a6f708b45859ab6bc64690e5f4803be7"}, "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, "ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"}, "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, diff --git a/test/supra_test.exs b/test/supra_test.exs index 48cf42b..a323578 100644 --- a/test/supra_test.exs +++ b/test/supra_test.exs @@ -39,4 +39,103 @@ defmodule SupraTest do assert Test.Schemas.House |> Supra.limit(1) |> Supra.count(repo: Test.Repo) == 1 end end + + describe "stream_by" do + use EctoTemp, repo: Test.Repo + + require EctoTemp.Factory + + deftemptable :data_temp do + column(:value, :string, null: false) + end + + setup do + create_temp_tables() + :ok + end + + defmodule Data do + use Ecto.Schema + import Ecto.Changeset, only: [cast: 3] + + @primary_key false + schema "data_temp" do + field(:value, :string) + end + + def padded(int), + do: String.pad_leading(to_string(int), 4, "0") + + def changeset(attrs), do: cast(%__MODULE__{}, Map.new(attrs), ~w[value]a) + + defmodule Query do + import Ecto.Query + def base, do: from(_ in Data, as: :data) + + def where_greater_than(query \\ base(), int) do + value = "value-#{Data.padded(int)}" + where(query, [data: d], d.value > ^value) + end + end + end + + setup %{max_value: max} do + for int <- 1..max do + EctoTemp.Factory.insert(:data_temp, value: "value-#{Data.padded(int)}") + end + + :ok + end + + @tag max_value: 50 + test "handles datasets less than batch size" do + Supra.stream_by(Data.Query.base(), :value, repo: Test.Repo) + |> Enum.count() + |> assert_eq(50) + + assert [%{value: "value-0001"}] = + Supra.stream_by(Data.Query.base(), :value, repo: Test.Repo) + |> Stream.take(1) + |> Enum.to_list() + + assert [%{value: "value-0050"}] = + Supra.stream_by(Data.Query.base(), :value, order: :desc, repo: Test.Repo) + |> Stream.take(1) + |> Enum.to_list() + end + + @tag max_value: 345 + test "handles datasets greater than batch size" do + Supra.stream_by(Data.Query.base(), :value, repo: Test.Repo) + |> Enum.count() + |> assert_eq(345) + + assert [%{value: "value-0001"}] = + Supra.stream_by(Data.Query.base(), :value, repo: Test.Repo) + |> Stream.take(1) + |> Enum.to_list() + + assert [%{value: "value-0345"}] = + Supra.stream_by(Data.Query.base(), :value, order: :desc, repo: Test.Repo) + |> Stream.take(1) + |> Enum.to_list() + end + + @tag max_value: 250 + test "handles queries with existing where clauses" do + Supra.stream_by(Data.Query.where_greater_than(177), :value, repo: Test.Repo) + |> Enum.count() + |> assert_eq(250 - 177) + + assert [%{value: "value-0178"}] = + Supra.stream_by(Data.Query.where_greater_than(177), :value, repo: Test.Repo) + |> Stream.take(1) + |> Enum.to_list() + + assert [%{value: "value-0250"}] = + Supra.stream_by(Data.Query.where_greater_than(177), :value, order: :desc, repo: Test.Repo) + |> Stream.take(1) + |> Enum.to_list() + end + end end