Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: improve replication reliability with transaction acknowledgment #74

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions lib/walex/events/event_modules.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ defmodule WalEx.Events.EventModules do

@impl true
def handle_call({:process, txn, server}, _from, state) do
server
|> WalEx.Config.get_configs(:modules)
|> process_events(txn)
resp =
server
|> WalEx.Config.get_configs(:modules)
|> process_events(txn)

{:reply, :ok, state}
{:reply, resp, state}
end

defp process_events(nil, %{changes: [], commit_timestamp: _}), do: nil
defp process_events(nil, %{changes: [], commit_timestamp: _}), do: :ok

defp process_events(modules, txn) when is_list(modules) do
process_modules(modules, txn)
Expand All @@ -48,16 +49,30 @@ defmodule WalEx.Events.EventModules do
defp process_modules(modules, txn) do
functions = ~w(process_all process_insert process_update process_delete)a

Enum.each(modules, &process_module(&1, functions, txn))
Enum.reduce_while(modules, :ok, fn module_name, _acc ->
case process_module(module_name, functions, txn) do
:ok -> {:cont, :ok}
{:ok, _} = term -> {:cont, term}
term -> {:halt, term}
end
end)
end

defp process_module(module_name, functions, txn) do
Enum.each(functions, &apply_process_macro(&1, module_name, txn))
Enum.reduce_while(functions, :ok, fn function, _acc ->
case apply_process_macro(function, module_name, txn) do
:ok -> {:cont, :ok}
{:ok, _} = term -> {:cont, term}
term -> {:halt, term}
end
end)
end

defp apply_process_macro(function, module, txn) do
if Keyword.has_key?(module.__info__(:functions), function) do
apply(module, function, [txn])
else
:ok
end
end
end
4 changes: 2 additions & 2 deletions lib/walex/events/events.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ defmodule WalEx.Events do

@impl true
def handle_call({:process, txn, app_name}, _from, state) do
process_destinations(txn, app_name)
resp = process_destinations(txn, app_name)

{:reply, :ok, state}
{:reply, resp, state}
end

defp process_destinations(txn, app_name) do
Expand Down
17 changes: 16 additions & 1 deletion lib/walex/replication/publisher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ defmodule WalEx.Replication.Publisher do

alias WalEx.{Changes, Config, Events, Types}
alias WalEx.Decoder.Messages
alias WalEx.Replication.Server

require Logger

defmodule(State,
do:
Expand Down Expand Up @@ -72,7 +75,19 @@ defmodule WalEx.Replication.Publisher do
%State{transaction: {current_txn_lsn, txn}, relations: _relations} = state
)
when commit_lsn == current_txn_lsn do
Events.process(txn, app_name)
case Events.process(txn, app_name) do
:ok ->
Server.ack(commit_lsn, app_name)

{:ok, _} ->
Server.ack(commit_lsn, app_name)

term ->
Logger.error(
"Failed to process transaction for lsn: #{inspect(commit_lsn)} with term: #{inspect(term)}"
)
end

state
end

Expand Down
43 changes: 39 additions & 4 deletions lib/walex/replication/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ defmodule WalEx.Replication.Server do
slot_name: slot_name,
publication: publication,
durable_slot: durable_slot,
message_middleware: message_middleware
message_middleware: message_middleware,
wal_position: nil
}

{:ok, state}
Expand Down Expand Up @@ -113,7 +114,13 @@ defmodule WalEx.Replication.Server do
end

@impl true
def handle_result([%Postgrex.Result{} | _results], state = %{step: :create_slot}) do
def handle_result(
[%Postgrex.Result{columns: columns, rows: [row]} | _results],
state = %{step: :create_slot}
) do
consistent_point = columns |> Enum.zip(row) |> Enum.into(%{}) |> Map.get("consistent_point")
wal_position = String.split(consistent_point, "/") |> List.last() |> String.to_integer(16)
state = Map.put(state, :wal_position, wal_position)
start_replication_with_retry(state, 0, @initial_backoff)
end

Expand Down Expand Up @@ -152,8 +159,18 @@ defmodule WalEx.Replication.Server do
def handle_data(<<?k, wal_end::64, _clock::64, reply>>, state) do
messages =
case reply do
1 -> [<<?r, wal_end + 1::64, wal_end + 1::64, wal_end + 1::64, current_time()::64, 0>>]
0 -> []
1 ->
Logger.debug(
"standby status update, remote wal: #{wal_end} current wal: #{state.wal_position}"
)

[
<<?r, state.wal_position::64, state.wal_position::64, state.wal_position::64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this needed to be the wal_position+1 I think
Screenshot 2024-12-30 at 16 08 25

current_time()::64, 0>>
]

0 ->
[]
end

{:noreply, messages, state}
Expand All @@ -165,6 +182,24 @@ defmodule WalEx.Replication.Server do
{:query, query, %{state | step: :slot_exists}}
end

def handle_info({:ack_transaction, {_, wal_position}}, state) do
Logger.debug("moving wal position to #{wal_position}")
state = Map.put(state, :wal_position, wal_position)
{:noreply, state}
end

def ack(info, app_name) do
case Registry.lookup(:walex_registry, {__MODULE__, app_name}) do
[{pid, _}] ->
send(pid, {:ack_transaction, info})

[] ->
Logger.warning(
"Attempted to ack transaction but server process not found for #{app_name}"
)
end
end

defp set_pgx_replication_conn_opts(app_name) do
database_configs_keys = [
:hostname,
Expand Down
6 changes: 6 additions & 0 deletions test/walex/database_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ defmodule WalEx.DatabaseTest do

refute Process.info(database_pid)

Process.sleep(5000)

new_database_pid = get_database_pid(supervisor_pid)

assert is_pid(new_database_pid)
Expand Down Expand Up @@ -143,6 +145,8 @@ defmodule WalEx.DatabaseTest do

assert :ok == pg_restart()

Process.sleep(5000)

new_database_pid = get_database_pid(supervisor_pid)

assert is_pid(new_database_pid)
Expand Down Expand Up @@ -454,6 +458,8 @@ defmodule WalEx.DatabaseTest do
:timer.sleep(1000)
end)

IO.puts(capture_log)

assert capture_log =~ "on_update event occurred"
assert capture_log =~ "%WalEx.Event"
end
Expand Down
Loading