Skip to content

Commit 6c624e4

Browse files
committed
return skipped upserts in bulk_create
1 parent 1130a02 commit 6c624e4

File tree

4 files changed

+132
-5
lines changed

4 files changed

+132
-5
lines changed

lib/data_layer.ex

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2029,6 +2029,66 @@ defmodule AshPostgres.DataLayer do
20292029
repo.insert_all(source, ecto_changesets, opts)
20302030
end)
20312031

2032+
result =
2033+
if options[:return_skipped_upsert?] do
2034+
identity = options[:identity]
2035+
[changeset | _] = changesets
2036+
2037+
results =
2038+
result
2039+
|> elem(1)
2040+
|> List.wrap()
2041+
|> Enum.reduce(%{}, fn r, acc ->
2042+
Map.put(acc, Map.take(r, identity.keys), r)
2043+
end)
2044+
2045+
ash_query =
2046+
resource
2047+
|> Ash.Query.do_filter(
2048+
or:
2049+
changesets
2050+
|> Enum.filter(fn changeset ->
2051+
not Map.has_key?(results, Map.take(changeset.attributes, identity.keys))
2052+
end)
2053+
|> Enum.map(fn changeset ->
2054+
changeset.attributes
2055+
|> Map.take(identity.keys)
2056+
|> Keyword.new()
2057+
end)
2058+
)
2059+
|> then(fn
2060+
query when is_nil(identity) or is_nil(identity.where) -> query
2061+
query -> Ash.Query.do_filter(query, identity.where)
2062+
end)
2063+
|> Ash.Query.set_tenant(changeset.tenant)
2064+
2065+
skipped_upserts =
2066+
with {:ok, ecto_query} <- Ash.Query.data_layer_query(ash_query),
2067+
{:ok, results} <- run_query(ecto_query, resource) do
2068+
results
2069+
|> Enum.map(fn result ->
2070+
Ash.Resource.put_metadata(result, :upsert_skipped, true)
2071+
end)
2072+
|> Enum.reduce(%{}, fn r, acc ->
2073+
Map.put(acc, Map.take(r, identity.keys), r)
2074+
end)
2075+
end
2076+
2077+
results =
2078+
changesets
2079+
|> Enum.map(fn changeset ->
2080+
identity =
2081+
changeset.attributes
2082+
|> Map.take(identity.keys)
2083+
2084+
Map.get(results, identity, Map.get(skipped_upserts, identity))
2085+
end)
2086+
2087+
{length(results), results}
2088+
else
2089+
result
2090+
end
2091+
20322092
case result do
20332093
{_, nil} ->
20342094
:ok
@@ -2039,6 +2099,7 @@ defmodule AshPostgres.DataLayer do
20392099

20402100
{:ok, results}
20412101
else
2102+
# TODO: what if there are less results than changesets because of upsert conditions?
20422103
{:ok,
20432104
Stream.zip_with(results, changesets, fn result, changeset ->
20442105
if !opts[:upsert?] do

mix.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,7 @@ defmodule AshPostgres.MixProject do
169169
{:ash, ash_version("~> 3.5 and >= 3.5.35")},
170170
{:spark, "~> 2.3 and >= 2.3.4"},
171171
{:ash_sql, ash_sql_version("~> 0.2 and >= 0.2.90")},
172-
{:igniter, "~> 0.6 and >= 0.6.14", optional: true},
172+
{:igniter, "~> 0.6 and >= 0.6.29", optional: true},
173173
{:ecto_sql, "~> 3.13"},
174174
{:ecto, "~> 3.13"},
175175
{:jason, "~> 1.0"},
@@ -186,7 +186,7 @@ defmodule AshPostgres.MixProject do
186186
{:credo, ">= 0.0.0", only: [:dev, :test], runtime: false},
187187
{:mix_audit, ">= 0.0.0", only: [:dev, :test], runtime: false},
188188
{:dialyxir, ">= 0.0.0", only: [:dev, :test], runtime: false},
189-
{:sobelow, ">= 0.0.0", only: [:dev, :test], runtime: false}
189+
{:mix_test_watch, "~> 1.0", only: [:dev, :test]}
190190
]
191191
end
192192

mix.lock

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,19 @@
3333
"mime": {:hex, :mime, "2.0.7", "b8d739037be7cd402aee1ba0306edfdef982687ee7e9859bee6198c1e7e2f128", [:mix], [], "hexpm", "6171188e399ee16023ffc5b76ce445eb6d9672e2e241d2df6050f3c771e80ccd"},
3434
"mint": {:hex, :mint, "1.7.1", "113fdb2b2f3b59e47c7955971854641c61f378549d73e829e1768de90fc1abf1", [:mix], [{:castore, "~> 0.1.0 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:hpax, "~> 0.1.1 or ~> 0.2.0 or ~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}], "hexpm", "fceba0a4d0f24301ddee3024ae116df1c3f4bb7a563a731f45fdfeb9d39a231b"},
3535
"mix_audit": {:hex, :mix_audit, "2.1.5", "c0f77cee6b4ef9d97e37772359a187a166c7a1e0e08b50edf5bf6959dfe5a016", [:make, :mix], [{:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:yaml_elixir, "~> 2.11", [hex: :yaml_elixir, repo: "hexpm", optional: false]}], "hexpm", "87f9298e21da32f697af535475860dc1d3617a010e0b418d2ec6142bc8b42d69"},
36+
"mix_test_watch": {:hex, :mix_test_watch, "1.3.0", "2ffc9f72b0d1f4ecf0ce97b044e0e3c607c3b4dc21d6228365e8bc7c2856dc77", [:mix], [{:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}], "hexpm", "f9e5edca976857ffac78632e635750d158df14ee2d6185a15013844af7570ffe"},
3637
"nimble_options": {:hex, :nimble_options, "1.1.1", "e3a492d54d85fc3fd7c5baf411d9d2852922f66e69476317787a7b2bb000a61b", [:mix], [], "hexpm", "821b2470ca9442c4b6984882fe9bb0389371b8ddec4d45a9504f00a66f650b44"},
3738
"nimble_parsec": {:hex, :nimble_parsec, "1.4.2", "8efba0122db06df95bfaa78f791344a89352ba04baedd3849593bfce4d0dc1c6", [:mix], [], "hexpm", "4b21398942dda052b403bbe1da991ccd03a053668d147d53fb8c4e0efe09c973"},
3839
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
3940
"owl": {:hex, :owl, "0.13.0", "26010e066d5992774268f3163506972ddac0a7e77bfe57fa42a250f24d6b876e", [:mix], [{:ucwidth, "~> 0.2", [hex: :ucwidth, repo: "hexpm", optional: true]}], "hexpm", "59bf9d11ce37a4db98f57cb68fbfd61593bf419ec4ed302852b6683d3d2f7475"},
4041
"postgrex": {:hex, :postgrex, "0.21.1", "2c5cc830ec11e7a0067dd4d623c049b3ef807e9507a424985b8dcf921224cd88", [:mix], [{:db_connection, "~> 2.1", [hex: :db_connection, repo: "hexpm", optional: false]}, {:decimal, "~> 1.5 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:table, "~> 0.1.0", [hex: :table, repo: "hexpm", optional: true]}], "hexpm", "27d8d21c103c3cc68851b533ff99eef353e6a0ff98dc444ea751de43eb48bdac"},
41-
"reactor": {:hex, :reactor, "0.16.0", "394087fe0f01b09e5cbcbf6525d9a54cd484582214e0e9e59f69ebc8d79eb70c", [:mix], [{:igniter, "~> 0.4", [hex: :igniter, repo: "hexpm", optional: true]}, {:iterex, "~> 0.1", [hex: :iterex, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:libgraph, "~> 0.16", [hex: :libgraph, repo: "hexpm", optional: false]}, {:spark, "~> 2.0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, "~> 0.2", [hex: :splode, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.2", [hex: :telemetry, repo: "hexpm", optional: false]}, {:yaml_elixir, "~> 2.11", [hex: :yaml_elixir, repo: "hexpm", optional: false]}, {:ymlr, "~> 5.0", [hex: :ymlr, repo: "hexpm", optional: false]}], "hexpm", "9ac43e70a9a36c5a016b02b6c068933dfd36edc0e3abd9cd6325a30194900c66"},
42+
"reactor": {:hex, :reactor, "0.17.0", "eb8bdb530dbae824e2d36a8538f8ec4f3aa7c2d1b61b04959fa787c634f88b49", [:mix], [{:igniter, "~> 0.4", [hex: :igniter, repo: "hexpm", optional: true]}, {:iterex, "~> 0.1", [hex: :iterex, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:libgraph, "~> 0.16", [hex: :libgraph, repo: "hexpm", optional: false]}, {:spark, ">= 2.3.3 and < 3.0.0-0", [hex: :spark, repo: "hexpm", optional: false]}, {:splode, "~> 0.2", [hex: :splode, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.2", [hex: :telemetry, repo: "hexpm", optional: false]}, {:yaml_elixir, "~> 2.11", [hex: :yaml_elixir, repo: "hexpm", optional: false]}, {:ymlr, "~> 5.0", [hex: :ymlr, repo: "hexpm", optional: false]}], "hexpm", "3c3bf71693adbad9117b11ec83cfed7d5851b916ade508ed9718de7ae165bf25"},
4243
"req": {:hex, :req, "0.5.15", "662020efb6ea60b9f0e0fac9be88cd7558b53fe51155a2d9899de594f9906ba9", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "a6513a35fad65467893ced9785457e91693352c70b58bbc045b47e5eb2ef0c53"},
4344
"rewrite": {:hex, :rewrite, "1.1.2", "f5a5d10f5fed1491a6ff48e078d4585882695962ccc9e6c779bae025d1f92eda", [:mix], [{:glob_ex, "~> 0.1", [hex: :glob_ex, repo: "hexpm", optional: false]}, {:sourceror, "~> 1.0", [hex: :sourceror, repo: "hexpm", optional: false]}, {:text_diff, "~> 0.1", [hex: :text_diff, repo: "hexpm", optional: false]}], "hexpm", "7f8b94b1e3528d0a47b3e8b7bfeca559d2948a65fa7418a9ad7d7712703d39d4"},
4445
"simple_sat": {:hex, :simple_sat, "0.1.3", "f650fc3c184a5fe741868b5ac56dc77fdbb428468f6dbf1978e14d0334497578", [:mix], [], "hexpm", "a54305066a356b7194dc81db2a89232bacdc0b3edaef68ed9aba28dcbc34887b"},
4546
"sobelow": {:hex, :sobelow, "0.14.0", "dd82aae8f72503f924fe9dd97ffe4ca694d2f17ec463dcfd365987c9752af6ee", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "7ecf91e298acfd9b24f5d761f19e8f6e6ac585b9387fb6301023f1f2cd5eed5f"},
4647
"sourceror": {:hex, :sourceror, "1.10.0", "38397dedbbc286966ec48c7af13e228b171332be1ad731974438c77791945ce9", [:mix], [], "hexpm", "29dbdfc92e04569c9d8e6efdc422fc1d815f4bd0055dc7c51b8800fb75c4b3f1"},
47-
"spark": {:hex, :spark, "2.3.4", "3fe37fdfa01e3f7c9f4ced16b7b9950d5bfbb7fab024148e1968b0460ce1336b", [:mix], [{:igniter, ">= 0.3.64 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: true]}, {:sourceror, "~> 1.2", [hex: :sourceror, repo: "hexpm", optional: true]}], "hexpm", "c0293d41461f1c1f1774379c668a240b008f4f8dcd550ea82b06163a55dcd53b"},
48+
"spark": {:hex, :spark, "2.3.5", "f30d30ecc3b4ab9b932d9aada66af7677fc1f297a2c349b0bcec3eafb9f996e8", [:mix], [{:igniter, ">= 0.3.64 and < 1.0.0-0", [hex: :igniter, repo: "hexpm", optional: true]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: true]}, {:sourceror, "~> 1.2", [hex: :sourceror, repo: "hexpm", optional: true]}], "hexpm", "0e9d339704d5d148f77f2b2fef3bcfc873a9e9bb4224fcf289c545d65827202f"},
4849
"spitfire": {:hex, :spitfire, "0.2.1", "29e154873f05444669c7453d3d931820822cbca5170e88f0f8faa1de74a79b47", [:mix], [], "hexpm", "6eeed75054a38341b2e1814d41bb0a250564092358de2669fdb57ff88141d91b"},
4950
"splode": {:hex, :splode, "0.2.9", "3a2776e187c82f42f5226b33b1220ccbff74f4bcc523dd4039c804caaa3ffdc7", [:mix], [], "hexpm", "8002b00c6e24f8bd1bcced3fbaa5c33346048047bb7e13d2f3ad428babbd95c3"},
5051
"statistex": {:hex, :statistex, "1.1.0", "7fec1eb2f580a0d2c1a05ed27396a084ab064a40cfc84246dbfb0c72a5c761e5", [:mix], [], "hexpm", "f5950ea26ad43246ba2cce54324ac394a4e7408fdcf98b8e230f503a0cba9cf5"},
@@ -53,6 +54,6 @@
5354
"text_diff": {:hex, :text_diff, "0.1.0", "1caf3175e11a53a9a139bc9339bd607c47b9e376b073d4571c031913317fecaa", [:mix], [], "hexpm", "d1ffaaecab338e49357b6daa82e435f877e0649041ace7755583a0ea3362dbd7"},
5455
"tz": {:hex, :tz, "0.28.1", "717f5ffddfd1e475e2a233e221dc0b4b76c35c4b3650b060c8e3ba29dd6632e9", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:mint, "~> 1.6", [hex: :mint, repo: "hexpm", optional: true]}], "hexpm", "bfdca1aa1902643c6c43b77c1fb0cb3d744fd2f09a8a98405468afdee0848c8a"},
5556
"yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"},
56-
"yaml_elixir": {:hex, :yaml_elixir, "2.11.0", "9e9ccd134e861c66b84825a3542a1c22ba33f338d82c07282f4f1f52d847bd50", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "53cc28357ee7eb952344995787f4bb8cc3cecbf189652236e9b163e8ce1bc242"},
57+
"yaml_elixir": {:hex, :yaml_elixir, "2.12.0", "30343ff5018637a64b1b7de1ed2a3ca03bc641410c1f311a4dbdc1ffbbf449c7", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "ca6bacae7bac917a7155dca0ab6149088aa7bc800c94d0fe18c5238f53b313c6"},
5758
"ymlr": {:hex, :ymlr, "5.1.4", "b924d61e1fc1ec371cde6ab3ccd9311110b1e052fc5c2460fb322e8380e7712a", [:mix], [], "hexpm", "75f16cf0709fbd911b30311a0359a7aa4b5476346c01882addefd5f2b1cfaa51"},
5859
}

test/bulk_create_test.exs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,71 @@ defmodule AshPostgres.BulkCreateTest do
171171
end)
172172
end
173173

174+
@tag :focus
175+
test "bulk upsert returns skipped records with return_skipped_upsert?" do
176+
assert [
177+
{:ok, %{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10}},
178+
{:ok, %{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20}},
179+
{:ok, %{title: "herbert", uniq_if_contains_foo: "3", price: 30}}
180+
] =
181+
Ash.bulk_create!(
182+
[
183+
%{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10},
184+
%{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20},
185+
%{title: "herbert", uniq_if_contains_foo: "3", price: 30}
186+
],
187+
Post,
188+
:create,
189+
return_stream?: true,
190+
return_records?: true
191+
)
192+
|> Enum.sort_by(fn {:ok, result} -> result.title end)
193+
194+
results =
195+
Ash.bulk_create!(
196+
[
197+
%{title: "fredfoo", uniq_if_contains_foo: "1foo", price: 10},
198+
%{title: "georgefoo", uniq_if_contains_foo: "2foo", price: 20_000},
199+
%{title: "herbert", uniq_if_contains_foo: "3", price: 30}
200+
],
201+
Post,
202+
:upsert_with_no_filter,
203+
return_stream?: true,
204+
upsert_condition: expr(price != upsert_conflict(:price)),
205+
return_errors?: true,
206+
return_records?: true,
207+
return_skipped_upsert?: true
208+
)
209+
|> Enum.sort_by(fn
210+
{:ok, result} ->
211+
result.title
212+
213+
_ ->
214+
nil
215+
end)
216+
217+
assert [
218+
{:ok, skipped},
219+
{:ok, updated},
220+
{:ok, no_conflict}
221+
] = results
222+
223+
# "fredfoo" was skipped because price matches (10 == 10)
224+
assert skipped.title == "fredfoo"
225+
assert skipped.price == 10
226+
assert Ash.Resource.get_metadata(skipped, :upsert_skipped) == true
227+
228+
# "georgefoo" was updated because price differs (20 -> 20_000)
229+
assert updated.title == "georgefoo"
230+
assert updated.price == 20_000
231+
refute Ash.Resource.get_metadata(updated, :upsert_skipped)
232+
233+
# "herbert" had no conflict (doesn't match identity)
234+
assert no_conflict.title == "herbert"
235+
assert no_conflict.price == 30
236+
refute Ash.Resource.get_metadata(no_conflict, :upsert_skipped)
237+
end
238+
174239
# confirmed that this doesn't work because it can't. An upsert must map to a potentially successful insert.
175240
# leaving this test here for posterity
176241
# test "bulk creates can upsert with id" do

0 commit comments

Comments
 (0)