Skip to content

Commit 994292e

Browse files
authored
Add support for pool_count (#636)
1 parent 53736de commit 994292e

File tree

4 files changed

+108
-61
lines changed

4 files changed

+108
-61
lines changed

.github/workflows/ci.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,10 +58,16 @@ jobs:
5858
- "11.11-alpine"
5959
- "9.6-alpine"
6060
- "9.5-alpine"
61+
include:
62+
- elixirbase: "1.11.4-erlang-23.3.4.9-alpine-3.16.9"
63+
postgres: "16.2-alpine"
64+
pool_count: "4"
6165
steps:
6266
- uses: earthly/actions-setup@v1
6367
- uses: actions/checkout@v3
6468
- name: test ecto_sql
69+
env:
70+
POOL_COUNT: ${{ matrix.pool_count || '1' }}
6571
run: earthly -P --ci --build-arg ELIXIR_BASE=${{matrix.elixirbase}} --build-arg POSTGRES=${{matrix.postgres}} +integration-test-postgres
6672

6773
test-mysql:

integration_test/myxql/test_helper.exs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@ Application.put_env(:ecto, :primary_key_type, :id)
55
Application.put_env(:ecto, :async_integration_tests, false)
66
Application.put_env(:ecto_sql, :lock_for_update, "FOR UPDATE")
77

8-
Code.require_file "../support/repo.exs", __DIR__
8+
Code.require_file("../support/repo.exs", __DIR__)
99

1010
# Configure MySQL connection
11-
Application.put_env(:ecto_sql, :mysql_test_url,
11+
Application.put_env(
12+
:ecto_sql,
13+
:mysql_test_url,
1214
"ecto://" <> (System.get_env("MYSQL_URL") || "root@127.0.0.1")
1315
)
1416

@@ -53,7 +55,8 @@ alias Ecto.Integration.PoolRepo
5355
Application.put_env(:ecto_sql, PoolRepo,
5456
adapter: Ecto.Adapters.MyXQL,
5557
url: Application.get_env(:ecto_sql, :mysql_test_url) <> "/ecto_test",
56-
pool_size: 10,
58+
pool_size: 5,
59+
pool_count: String.to_integer(System.get_env("POOL_COUNT", "1")),
5760
show_sensitive_data_on_connection_error: true
5861
)
5962

@@ -63,8 +66,8 @@ end
6366

6467
# Load support files
6568
ecto = Mix.Project.deps_paths()[:ecto]
66-
Code.require_file "#{ecto}/integration_test/support/schemas.exs", __DIR__
67-
Code.require_file "../support/migration.exs", __DIR__
69+
Code.require_file("#{ecto}/integration_test/support/schemas.exs", __DIR__)
70+
Code.require_file("../support/migration.exs", __DIR__)
6871

6972
defmodule Ecto.Integration.Case do
7073
use ExUnit.CaseTemplate
@@ -77,7 +80,7 @@ end
7780
{:ok, _} = Ecto.Adapters.MyXQL.ensure_all_started(TestRepo.config(), :temporary)
7881

7982
# Load up the repository, start it, and run migrations
80-
_ = Ecto.Adapters.MyXQL.storage_down(TestRepo.config())
83+
_ = Ecto.Adapters.MyXQL.storage_down(TestRepo.config())
8184
:ok = Ecto.Adapters.MyXQL.storage_up(TestRepo.config())
8285

8386
{:ok, _pid} = TestRepo.start_link()

integration_test/pg/test_helper.exs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ Application.put_env(:ecto, :async_integration_tests, true)
66
Application.put_env(:ecto_sql, :lock_for_update, "FOR UPDATE")
77

88
# Configure PG connection
9-
Application.put_env(:ecto_sql, :pg_test_url,
9+
Application.put_env(
10+
:ecto_sql,
11+
:pg_test_url,
1012
"ecto://" <> (System.get_env("PG_URL") || "postgres:postgres@127.0.0.1")
1113
)
1214

13-
Code.require_file "../support/repo.exs", __DIR__
15+
Code.require_file("../support/repo.exs", __DIR__)
1416

1517
# Define type module
1618
opts = if Code.ensure_loaded?(Duration), do: [interval_decode_type: Duration], else: []
@@ -59,21 +61,28 @@ end
5961

6062
pool_repo_config = [
6163
url: Application.get_env(:ecto_sql, :pg_test_url) <> "/ecto_test",
62-
pool_size: 10,
64+
pool_size: 5,
65+
pool_count: String.to_integer(System.get_env("POOL_COUNT", "1")),
6366
max_restarts: 20,
6467
max_seconds: 10
6568
]
6669

6770
Application.put_env(:ecto_sql, PoolRepo, pool_repo_config)
68-
Application.put_env(:ecto_sql, AdvisoryLockPoolRepo, pool_repo_config ++ [
69-
migration_source: "advisory_lock_schema_migrations",
70-
migration_lock: :pg_advisory_lock
71-
])
71+
72+
Application.put_env(
73+
:ecto_sql,
74+
AdvisoryLockPoolRepo,
75+
pool_repo_config ++
76+
[
77+
migration_source: "advisory_lock_schema_migrations",
78+
migration_lock: :pg_advisory_lock
79+
]
80+
)
7281

7382
# Load support files
7483
ecto = Mix.Project.deps_paths()[:ecto]
75-
Code.require_file "#{ecto}/integration_test/support/schemas.exs", __DIR__
76-
Code.require_file "../support/migration.exs", __DIR__
84+
Code.require_file("#{ecto}/integration_test/support/schemas.exs", __DIR__)
85+
Code.require_file("../support/migration.exs", __DIR__)
7786

7887
defmodule Ecto.Integration.Case do
7988
use ExUnit.CaseTemplate
@@ -86,7 +95,7 @@ end
8695
{:ok, _} = Ecto.Adapters.Postgres.ensure_all_started(TestRepo.config(), :temporary)
8796

8897
# Load up the repository, start it, and run migrations
89-
_ = Ecto.Adapters.Postgres.storage_down(TestRepo.config())
98+
_ = Ecto.Adapters.Postgres.storage_down(TestRepo.config())
9099
:ok = Ecto.Adapters.Postgres.storage_up(TestRepo.config())
91100

92101
{:ok, _pid} = TestRepo.start_link()
@@ -112,7 +121,9 @@ exclude_list = excludes ++ excludes_above_9_5
112121

113122
cond do
114123
Version.match?(version, "< 9.6.0") ->
115-
ExUnit.configure(exclude: exclude_list ++ excludes_below_9_6 ++ excludes_below_12_0 ++ excludes_below_15_0)
124+
ExUnit.configure(
125+
exclude: exclude_list ++ excludes_below_9_6 ++ excludes_below_12_0 ++ excludes_below_15_0
126+
)
116127

117128
Version.match?(version, "< 12.0.0") ->
118129
ExUnit.configure(exclude: exclude_list ++ excludes_below_12_0 ++ excludes_below_15_0)

lib/ecto/adapters/sql.ex

Lines changed: 71 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -480,8 +480,22 @@ defmodule Ecto.Adapters.SQL do
480480
disconnect_all(Ecto.Adapter.lookup_meta(repo), interval, opts)
481481
end
482482

483-
def disconnect_all(%{pid: pid} = _adapter_meta, interval, opts) do
484-
DBConnection.disconnect_all(pid, interval, opts)
483+
def disconnect_all(adapter_meta, interval, opts) do
484+
case adapter_meta do
485+
%{partition_supervisor: {name, count}} ->
486+
1..count
487+
|> Enum.map(fn i ->
488+
Task.async(fn ->
489+
DBConnection.disconnect_all({:via, PartitionSupervisor, {name, i}}, interval, opts)
490+
end)
491+
end)
492+
|> Task.await_many(:infinity)
493+
494+
:ok
495+
496+
%{pid: pool} ->
497+
DBConnection.disconnect_all(pool, interval, opts)
498+
end
485499
end
486500

487501
@doc """
@@ -646,7 +660,7 @@ defmodule Ecto.Adapters.SQL do
646660

647661
defp sql_call(adapter_meta, callback, args, params, opts) do
648662
%{pid: pool, telemetry: telemetry, sql: sql, opts: default_opts} = adapter_meta
649-
conn = get_conn_or_pool(pool)
663+
conn = get_conn_or_pool(pool, adapter_meta)
650664
opts = with_log(telemetry, params, opts ++ default_opts)
651665
args = args ++ [params, opts]
652666
apply(sql, callback, [conn | args])
@@ -662,7 +676,7 @@ defmodule Ecto.Adapters.SQL do
662676
end
663677

664678
@doc """
665-
Check if the given `table` exists.
679+
Checks if the given `table` exists.
666680
667681
Returns `true` if the `table` exists in the `repo`, otherwise `false`.
668682
The table is checked against the current database/schema in the connection.
@@ -702,7 +716,7 @@ defmodule Ecto.Adapters.SQL do
702716
def format_table(%{columns: columns, rows: rows}) do
703717
column_widths =
704718
[columns | rows]
705-
|> List.zip()
719+
|> Enum.zip()
706720
|> Enum.map(&Tuple.to_list/1)
707721
|> Enum.map(fn column_with_rows ->
708722
column_with_rows |> Enum.map(&binary_length/1) |> Enum.max()
@@ -733,7 +747,7 @@ defmodule Ecto.Adapters.SQL do
733747
defp cells(items, widths) do
734748
cell =
735749
[items, widths]
736-
|> List.zip()
750+
|> Enum.zip()
737751
|> Enum.map(fn {item, width} -> [?|, " ", format_item(item, width), " "] end)
738752

739753
[cell | [?|]]
@@ -827,6 +841,8 @@ defmodule Ecto.Adapters.SQL do
827841
@pool_opts [:timeout, :pool, :pool_size] ++
828842
[:queue_target, :queue_interval, :ownership_timeout, :repo]
829843

844+
@valid_log_levels ~w(false debug info notice warning error critical alert emergency)a
845+
830846
@doc false
831847
def init(connection, driver, config) do
832848
unless Code.ensure_loaded?(connection) do
@@ -845,24 +861,12 @@ defmodule Ecto.Adapters.SQL do
845861

846862
log = Keyword.get(config, :log, :debug)
847863

848-
valid_log_levels = [
849-
false,
850-
:debug,
851-
:info,
852-
:notice,
853-
:warning,
854-
:error,
855-
:critical,
856-
:alert,
857-
:emergency
858-
]
859-
860-
if log not in valid_log_levels do
864+
if log not in @valid_log_levels do
861865
raise """
862866
invalid value for :log option in Repo config
863867
864868
The accepted values for the :log option are:
865-
#{Enum.map_join(valid_log_levels, ", ", &inspect/1)}
869+
#{Enum.map_join(@valid_log_levels, ", ", &inspect/1)}
866870
867871
See https://hexdocs.pm/ecto/Ecto.Repo.html for more information.
868872
"""
@@ -872,35 +876,49 @@ defmodule Ecto.Adapters.SQL do
872876
telemetry_prefix = Keyword.fetch!(config, :telemetry_prefix)
873877
telemetry = {config[:repo], log, telemetry_prefix ++ [:query]}
874878

875-
config = adapter_config(config)
876-
opts = Keyword.take(config, @pool_opts)
877-
meta = %{telemetry: telemetry, sql: connection, stacktrace: stacktrace, opts: opts}
878-
{:ok, connection.child_spec(config), meta}
879-
end
879+
{name, config} = Keyword.pop(config, :name, config[:repo])
880+
{pool_count, config} = Keyword.pop(config, :pool_count, 1)
881+
{pool, config} = pool_config(config)
882+
child_spec = connection.child_spec(config)
880883

881-
defp adapter_config(config) do
882-
if Keyword.has_key?(config, :pool_timeout) do
883-
message = """
884-
:pool_timeout option no longer has an effect and has been replaced with an improved queuing system.
885-
See \"Queue config\" in DBConnection.start_link/2 documentation for more information.
886-
"""
884+
meta = %{
885+
telemetry: telemetry,
886+
sql: connection,
887+
stacktrace: stacktrace,
888+
opts: Keyword.take(config, @pool_opts)
889+
}
887890

888-
IO.warn(message)
889-
end
891+
if pool_count > 1 do
892+
if name == nil do
893+
raise ArgumentError, "the option :pool_count requires a :name"
894+
end
890895

891-
config
892-
|> Keyword.delete(:name)
893-
|> Keyword.update(:pool, DBConnection.ConnectionPool, &normalize_pool/1)
894-
end
896+
if pool == DBConnection.Ownership do
897+
raise ArgumentError, "the option :pool_count does not work with the SQL sandbox"
898+
end
895899

896-
defp normalize_pool(pool) do
897-
if Code.ensure_loaded?(pool) && function_exported?(pool, :unboxed_run, 2) do
898-
DBConnection.Ownership
900+
name = Module.concat(name, PartitionSupervisor)
901+
partition_opts = [name: name, child_spec: child_spec, partitions: pool_count]
902+
child_spec = Supervisor.child_spec({PartitionSupervisor, partition_opts}, [])
903+
{:ok, child_spec, Map.put(meta, :partition_supervisor, {name, pool_count})}
899904
else
900-
pool
905+
{:ok, child_spec, meta}
901906
end
902907
end
903908

909+
defp pool_config(config) do
910+
{pool, config} = Keyword.pop(config, :pool, DBConnection.ConnectionPool)
911+
912+
pool =
913+
if Code.ensure_loaded?(pool) && function_exported?(pool, :unboxed_run, 2) do
914+
DBConnection.Ownership
915+
else
916+
pool
917+
end
918+
919+
{pool, [pool: pool] ++ config}
920+
end
921+
904922
@doc false
905923
def checkout(adapter_meta, opts, callback) do
906924
checkout_or_transaction(:run, adapter_meta, opts, callback)
@@ -1385,11 +1403,20 @@ defmodule Ecto.Adapters.SQL do
13851403
end
13861404
end
13871405

1388-
apply(DBConnection, fun, [get_conn_or_pool(pool), callback, opts])
1406+
apply(DBConnection, fun, [get_conn_or_pool(pool, adapter_meta), callback, opts])
13891407
end
13901408

1391-
defp get_conn_or_pool(pool) do
1392-
Process.get(key(pool), pool)
1409+
defp get_conn_or_pool(pool, adapter_meta) do
1410+
case :erlang.get(key(pool)) do
1411+
:undefined ->
1412+
case adapter_meta do
1413+
%{partition_supervisor: {name, _}} -> {:via, PartitionSupervisor, {name, self()}}
1414+
_ -> pool
1415+
end
1416+
1417+
conn ->
1418+
conn
1419+
end
13931420
end
13941421

13951422
defp get_conn(pool) do

0 commit comments

Comments
 (0)