diff --git a/lib/extensions/postgres_cdc_rls/replication_poller.ex b/lib/extensions/postgres_cdc_rls/replication_poller.ex index 1bb14b586..3e6d1f2e9 100644 --- a/lib/extensions/postgres_cdc_rls/replication_poller.ex +++ b/lib/extensions/postgres_cdc_rls/replication_poller.ex @@ -95,6 +95,11 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do record_list_changes_telemetry(time, tenant_id) case handle_list_changes_result(list_changes, subscribers_nodes_table, tenant_id, rate_counter_args) do + {:ok, :peek_empty} -> + Backoff.reset(backoff) + poll_ref = Process.send_after(self(), :poll, poll_interval_ms) + {:noreply, %{state | backoff: backoff, poll_ref: poll_ref}} + {:ok, row_count} -> Backoff.reset(backoff) @@ -180,6 +185,14 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do ) end + defp handle_list_changes_result( + {:ok, %Postgrex.Result{num_rows: 1, rows: [[nil, nil, nil, _, _, _, nil, [], ["peek_empty"]]]}}, + _subscribers_nodes_table, + _tenant_id, + _rate_counter_args + ), + do: {:ok, :peek_empty} + defp handle_list_changes_result( {:ok, %Postgrex.Result{ diff --git a/lib/realtime/tenants/migrations.ex b/lib/realtime/tenants/migrations.ex index 954f06390..3a87be211 100644 --- a/lib/realtime/tenants/migrations.ex +++ b/lib/realtime/tenants/migrations.ex @@ -81,7 +81,8 @@ defmodule Realtime.Tenants.Migrations do CreateMessagesReplayIndex, BroadcastSendIncludePayloadId, AddActionToSubscriptions, - FilterActionPostgresChanges + FilterActionPostgresChanges, + CreatePeekAndListChangesFunction } @migrations [ @@ -151,7 +152,8 @@ defmodule Realtime.Tenants.Migrations do {20_250_905_041_441, CreateMessagesReplayIndex}, {20_251_103_001_201, BroadcastSendIncludePayloadId}, {20_251_120_212_548, AddActionToSubscriptions}, - {20_251_120_215_549, FilterActionPostgresChanges} + {20_251_120_215_549, FilterActionPostgresChanges}, + {20_260_210_000_000, CreatePeekAndListChangesFunction} ] defstruct [:tenant_external_id, :settings, migrations_ran: 0] diff --git a/lib/realtime/tenants/repo/migrations/20260210000000_create_peek_and_list_changes_function.ex b/lib/realtime/tenants/repo/migrations/20260210000000_create_peek_and_list_changes_function.ex new file mode 100644 index 000000000..33e4d277b --- /dev/null +++ b/lib/realtime/tenants/repo/migrations/20260210000000_create_peek_and_list_changes_function.ex @@ -0,0 +1,159 @@ +defmodule Realtime.Tenants.Migrations.CreatePeekAndListChangesFunction do + @moduledoc false + + use Ecto.Migration + + def up do + execute(""" + create or replace function realtime.list_changes(publication name, slot_name name, max_changes int, max_record_bytes int) + returns setof realtime.wal_rls + language sql + as $$ + with peek as ( + select 1 + from pg_logical_slot_peek_changes( + slot_name, null, 1, + 'include-pk', 'true', + 'include-transaction', 'false', + 'include-timestamp', 'true', + 'include-type-oids', 'true', + 'format-version', '2' + ) + limit 1 + ), + pub as ( + select + concat_ws( + ',', + case when bool_or(pubinsert) then 'insert' else null end, + case when bool_or(pubupdate) then 'update' else null end, + case when bool_or(pubdelete) then 'delete' else null end + ) as w2j_actions, + coalesce( + string_agg( + realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass), + ',' + ) filter (where ppt.tablename is not null and ppt.tablename not like '% %'), + '' + ) w2j_add_tables + from + pg_publication pp + left join pg_publication_tables ppt + on pp.pubname = ppt.pubname + where + pp.pubname = publication + and exists (select 1 from peek) + group by + pp.pubname + limit 1 + ), + w2j as ( + select + x.*, pub.w2j_add_tables + from + pub, + pg_logical_slot_get_changes( + slot_name, null, max_changes, + 'include-pk', 'true', + 'include-transaction', 'false', + 'include-timestamp', 'true', + 'include-type-oids', 'true', + 'format-version', '2', + 'actions', pub.w2j_actions, + 'add-tables', pub.w2j_add_tables + ) x + ) + select + xyz.wal, + xyz.is_rls_enabled, + xyz.subscription_ids, + xyz.errors + from + w2j, + realtime.apply_rls( + wal := w2j.data::jsonb, + max_record_bytes := max_record_bytes + ) xyz(wal, is_rls_enabled, subscription_ids, errors) + where + w2j.w2j_add_tables <> '' + and xyz.subscription_ids[1] is not null + + union all + + select + null::jsonb, + null::boolean, + '{}'::uuid[], + '{peek_empty}'::text[] + where + not exists (select 1 from peek) + $$; + """) + end + + def down do + execute(""" + create or replace function realtime.list_changes(publication name, slot_name name, max_changes int, max_record_bytes int) + returns setof realtime.wal_rls + language sql + set log_min_messages to 'fatal' + as $$ + with pub as ( + select + concat_ws( + ',', + case when bool_or(pubinsert) then 'insert' else null end, + case when bool_or(pubupdate) then 'update' else null end, + case when bool_or(pubdelete) then 'delete' else null end + ) as w2j_actions, + coalesce( + string_agg( + realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass), + ',' + ) filter (where ppt.tablename is not null and ppt.tablename not like '% %'), + '' + ) w2j_add_tables + from + pg_publication pp + left join pg_publication_tables ppt + on pp.pubname = ppt.pubname + where + pp.pubname = publication + group by + pp.pubname + limit 1 + ), + w2j as ( + select + x.*, pub.w2j_add_tables + from + pub, + pg_logical_slot_get_changes( + slot_name, null, max_changes, + 'include-pk', 'true', + 'include-transaction', 'false', + 'include-timestamp', 'true', + 'include-type-oids', 'true', + 'format-version', '2', + 'actions', pub.w2j_actions, + 'add-tables', pub.w2j_add_tables + ) x + ) + select + xyz.wal, + xyz.is_rls_enabled, + xyz.subscription_ids, + xyz.errors + from + w2j, + realtime.apply_rls( + wal := w2j.data::jsonb, + max_record_bytes := max_record_bytes + ) xyz(wal, is_rls_enabled, subscription_ids, errors) + where + w2j.w2j_add_tables <> '' + and xyz.subscription_ids[1] is not null + $$; + """) + end +end diff --git a/test/integration/replications_test.exs b/test/integration/replications_test.exs new file mode 100644 index 000000000..6046cf76e --- /dev/null +++ b/test/integration/replications_test.exs @@ -0,0 +1,190 @@ +defmodule Realtime.Integration.ReplicationsTest do + use Realtime.DataCase, async: false + + alias Extensions.PostgresCdcRls.Replications + alias Extensions.PostgresCdcRls.Subscriptions + alias Realtime.Database + + @publication "supabase_realtime_test" + @poll_interval 100 + + setup do + tenant = Containers.checkout_tenant(run_migrations: true) + + {:ok, conn} = + tenant + |> Database.from_tenant("realtime_rls") + |> Map.from_struct() + |> Keyword.new() + |> Postgrex.start_link() + + slot_name = "supabase_realtime_test_slot_#{System.unique_integer([:positive])}" + + on_exit(fn -> + try do + Postgrex.query(conn, "select pg_drop_replication_slot($1)", [slot_name]) + catch + _, _ -> :ok + end + end) + + {:ok, subscription_params} = Subscriptions.parse_subscription_params(%{"event" => "*", "schema" => "public"}) + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}] + {:ok, _} = Subscriptions.create(conn, @publication, params_list, self(), self()) + {:ok, _} = Replications.prepare_replication(conn, slot_name) + + # Drain any setup changes + Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576) + + %{conn: conn, slot_name: slot_name} + end + + describe "replication polling lifecycle" do + test "prepare, poll, consume full cycle", %{conn: conn, slot_name: slot_name} do + {time, result} = + :timer.tc(fn -> + Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576) + end) + + assert {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} = result + assert Enum.at(sentinel, 8) == ["peek_empty"] + assert time < 50_000, "Expected peek short-circuit under 50ms, took #{div(time, 1000)}ms" + + Process.sleep(@poll_interval) + + Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_1')", []) + Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_2')", []) + Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_3')", []) + + Process.sleep(@poll_interval) + + {:ok, %Postgrex.Result{num_rows: 3, rows: rows}} = + Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576) + + [row | _] = rows + assert Enum.at(row, 0) == "INSERT" + assert Enum.at(row, 1) == "public" + assert Enum.at(row, 2) == "test" + + Process.sleep(@poll_interval) + + {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} = + Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576) + + assert Enum.at(sentinel, 8) == ["peek_empty"] + end + + test "polls empty multiple times then captures a change when it arrives", %{conn: conn, slot_name: slot_name} do + for _ <- 1..5 do + {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} = + Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576) + + assert Enum.at(sentinel, 8) == ["peek_empty"] + Process.sleep(@poll_interval) + end + + Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('delayed_arrival')", []) + Process.sleep(@poll_interval) + + {:ok, %Postgrex.Result{num_rows: 1, rows: [row]}} = + Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576) + + assert Enum.at(row, 0) == "INSERT" + assert Enum.at(row, 1) == "public" + assert Enum.at(row, 2) == "test" + + Process.sleep(@poll_interval) + + {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} = + Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576) + + assert Enum.at(sentinel, 8) == ["peek_empty"] + end + + test "prepare_replication is idempotent", %{conn: conn, slot_name: slot_name} do + {:ok, _} = Replications.prepare_replication(conn, slot_name) + Process.sleep(@poll_interval) + {:ok, _} = Replications.prepare_replication(conn, slot_name) + end + + test "terminate_backend returns slot_not_found for unknown slots", %{conn: conn} do + assert {:error, :slot_not_found} = + Replications.terminate_backend(conn, "nonexistent_slot_#{System.unique_integer([:positive])}") + end + + test "get_pg_stat_activity_diff returns elapsed seconds for active connection", %{conn: conn} do + {:ok, %Postgrex.Result{rows: [[pid]]}} = Postgrex.query(conn, "SELECT pg_backend_pid()", []) + Postgrex.query!(conn, "SET application_name = 'realtime_rls'", []) + Process.sleep(@poll_interval) + + assert {:ok, diff} = Replications.get_pg_stat_activity_diff(conn, pid) + assert is_integer(diff) + end + end + + describe "peek vs RLS distinction" do + setup do + tenant = Containers.checkout_tenant(run_migrations: true) + + {:ok, conn} = + tenant + |> Database.from_tenant("realtime_rls") + |> Map.from_struct() + |> Keyword.new() + |> Postgrex.start_link() + + slot_name = "supabase_realtime_rls_slot_#{System.unique_integer([:positive])}" + + on_exit(fn -> + try do + Postgrex.query(conn, "select pg_drop_replication_slot($1)", [slot_name]) + catch + _, _ -> :ok + end + end) + + {:ok, subscription_params} = + Subscriptions.parse_subscription_params(%{ + "event" => "*", + "schema" => "public", + "table" => "test", + "filter" => "details=eq.no_match" + }) + + params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}] + {:ok, _} = Subscriptions.create(conn, @publication, params_list, self(), self()) + {:ok, _} = Replications.prepare_replication(conn, slot_name) + + Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576) + + %{conn: conn, slot_name: slot_name} + end + + test "returns 0 rows when WAL changes exist but are filtered by subscription", %{ + conn: conn, + slot_name: slot_name + } do + # Peek is empty - sentinel row + {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} = + Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576) + + assert Enum.at(sentinel, 8) == ["peek_empty"] + + # Insert a row that doesn't match the filter (details != "no_match") + Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('rls_filtered')", []) + Process.sleep(@poll_interval) + + # WAL changes consumed but subscription filter doesn't match - 0 rows, no sentinel + {:ok, %Postgrex.Result{num_rows: 0}} = + Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576) + + Process.sleep(@poll_interval) + + # After consumption, peek is empty again - sentinel returned + {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} = + Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576) + + assert Enum.at(sentinel, 8) == ["peek_empty"] + end + end +end