From a6f20205a37ac5887cf47eaba403974a95f9cc3f Mon Sep 17 00:00:00 2001 From: Dohan Kim Date: Wed, 22 Jan 2025 21:21:54 +0900 Subject: [PATCH] Call disconnect on protocol when reconnecting in Replication connection (#726) --- lib/postgrex/replication_connection.ex | 10 ++++++++++ lib/postgrex/simple_connection.ex | 11 +++++++++++ 2 files changed, 21 insertions(+) diff --git a/lib/postgrex/replication_connection.ex b/lib/postgrex/replication_connection.ex index 6a5ec734..3354f17c 100644 --- a/lib/postgrex/replication_connection.ex +++ b/lib/postgrex/replication_connection.ex @@ -490,12 +490,22 @@ defmodule Postgrex.ReplicationConnection do {:keep_state, s, {:next_event, :internal, {:connect, :backoff}}} end + def handle_event(:internal, {:connect, :reconnect}, @state, %{protocol: protocol} = state) + when protocol != nil do + Protocol.disconnect(:reconnect, protocol) + {:keep_state, %{state | protocol: nil}, {:next_event, :internal, {:connect, :init}}} + end + def handle_event(:internal, {:connect, _info}, @state, %{state: {mod, mod_state}} = s) do case Protocol.connect(opts()) do {:ok, protocol} -> maybe_handle(mod, :handle_connect, [mod_state], %{s | protocol: protocol}) {:error, reason} -> + Logger.error( + "#{inspect(pid_or_name())} (#{inspect(mod)}) failed to connect to Postgres: #{Exception.format(:error, reason)}" + ) + if s.auto_reconnect do {:keep_state, s, {{:timeout, :backoff}, s.reconnect_backoff, nil}} else diff --git a/lib/postgrex/simple_connection.ex b/lib/postgrex/simple_connection.ex index 0f411341..eaef1f8e 100644 --- a/lib/postgrex/simple_connection.ex +++ b/lib/postgrex/simple_connection.ex @@ -362,6 +362,10 @@ defmodule Postgrex.SimpleConnection do end {:error, reason} -> + Logger.error( + "#{inspect(pid_or_name())} (#{inspect(mod)}) failed to connect to Postgres: #{Exception.format(:error, reason)}" + ) + if state.auto_reconnect do {:keep_state, state, {{:timeout, :backoff}, state.reconnect_backoff, nil}} else @@ -466,6 +470,13 @@ defmodule Postgrex.SimpleConnection do end end + defp pid_or_name do + case Process.info(self(), :registered_name) do + {:registered_name, atom} when is_atom(atom) -> atom + _ -> self() + end + end + defp opts(mod), do: Process.get(mod) defp put_opts(mod, opts), do: Process.put(mod, opts)