Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions lib/extensions/postgres_cdc_rls/replication_poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
record_list_changes_telemetry(time, tenant_id)

case handle_list_changes_result(list_changes, subscribers_nodes_table, tenant_id, rate_counter_args) do
{:ok, :peek_empty} ->
Backoff.reset(backoff)
poll_ref = Process.send_after(self(), :poll, poll_interval_ms)
{:noreply, %{state | backoff: backoff, poll_ref: poll_ref}}

{:ok, row_count} ->
Backoff.reset(backoff)

Expand Down Expand Up @@ -180,6 +185,14 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
)
end

# Sentinel clause: matches a result holding exactly one row in which the
# first three columns and the seventh are nil, the eighth is an empty list,
# and the ninth is ["peek_empty"]. The remaining arguments are ignored and
# the row is collapsed into the {:ok, :peek_empty} tag.
defp handle_list_changes_result(
       {:ok,
        %Postgrex.Result{
          num_rows: 1,
          rows: [[nil, nil, nil, _, _, _, nil, [], ["peek_empty"]]]
        }},
       _subscribers_nodes_table,
       _tenant_id,
       _rate_counter_args
     ) do
  {:ok, :peek_empty}
end

defp handle_list_changes_result(
{:ok,
%Postgrex.Result{
Expand Down
6 changes: 4 additions & 2 deletions lib/realtime/tenants/migrations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ defmodule Realtime.Tenants.Migrations do
CreateMessagesReplayIndex,
BroadcastSendIncludePayloadId,
AddActionToSubscriptions,
FilterActionPostgresChanges
FilterActionPostgresChanges,
CreatePeekAndListChangesFunction
}

@migrations [
Expand Down Expand Up @@ -151,7 +152,8 @@ defmodule Realtime.Tenants.Migrations do
{20_250_905_041_441, CreateMessagesReplayIndex},
{20_251_103_001_201, BroadcastSendIncludePayloadId},
{20_251_120_212_548, AddActionToSubscriptions},
{20_251_120_215_549, FilterActionPostgresChanges}
{20_251_120_215_549, FilterActionPostgresChanges},
{20_260_210_000_000, CreatePeekAndListChangesFunction}
]

defstruct [:tenant_external_id, :settings, migrations_ran: 0]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
defmodule Realtime.Tenants.Migrations.CreatePeekAndListChangesFunction do
  @moduledoc false

  use Ecto.Migration

  # Definition installed by `up/0`.
  #
  # The `peek` CTE asks pg_logical_slot_peek_changes for at most one change.
  # The `pub` CTE only yields a row when `peek` did, so the
  # pg_logical_slot_get_changes call in `w2j` (cross-joined with `pub`) is
  # only reached in that case. When `peek` is empty, the second branch of the
  # UNION ALL emits a single sentinel row whose errors column is
  # '{peek_empty}'.
  @list_changes_with_peek """
  create or replace function realtime.list_changes(publication name, slot_name name, max_changes int, max_record_bytes int)
  returns setof realtime.wal_rls
  language sql
  as $$
  with peek as (
  select 1
  from pg_logical_slot_peek_changes(
  slot_name, null, 1,
  'include-pk', 'true',
  'include-transaction', 'false',
  'include-timestamp', 'true',
  'include-type-oids', 'true',
  'format-version', '2'
  )
  limit 1
  ),
  pub as (
  select
  concat_ws(
  ',',
  case when bool_or(pubinsert) then 'insert' else null end,
  case when bool_or(pubupdate) then 'update' else null end,
  case when bool_or(pubdelete) then 'delete' else null end
  ) as w2j_actions,
  coalesce(
  string_agg(
  realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass),
  ','
  ) filter (where ppt.tablename is not null and ppt.tablename not like '% %'),
  ''
  ) w2j_add_tables
  from
  pg_publication pp
  left join pg_publication_tables ppt
  on pp.pubname = ppt.pubname
  where
  pp.pubname = publication
  and exists (select 1 from peek)
  group by
  pp.pubname
  limit 1
  ),
  w2j as (
  select
  x.*, pub.w2j_add_tables
  from
  pub,
  pg_logical_slot_get_changes(
  slot_name, null, max_changes,
  'include-pk', 'true',
  'include-transaction', 'false',
  'include-timestamp', 'true',
  'include-type-oids', 'true',
  'format-version', '2',
  'actions', pub.w2j_actions,
  'add-tables', pub.w2j_add_tables
  ) x
  )
  select
  xyz.wal,
  xyz.is_rls_enabled,
  xyz.subscription_ids,
  xyz.errors
  from
  w2j,
  realtime.apply_rls(
  wal := w2j.data::jsonb,
  max_record_bytes := max_record_bytes
  ) xyz(wal, is_rls_enabled, subscription_ids, errors)
  where
  w2j.w2j_add_tables <> ''
  and xyz.subscription_ids[1] is not null

  union all

  select
  null::jsonb,
  null::boolean,
  '{}'::uuid[],
  '{peek_empty}'::text[]
  where
  not exists (select 1 from peek)
  $$;
  """

  # Definition installed by `down/0`: the same function without the `peek`
  # CTE and without the sentinel branch, declared with
  # `set log_min_messages to 'fatal'`.
  @list_changes_without_peek """
  create or replace function realtime.list_changes(publication name, slot_name name, max_changes int, max_record_bytes int)
  returns setof realtime.wal_rls
  language sql
  set log_min_messages to 'fatal'
  as $$
  with pub as (
  select
  concat_ws(
  ',',
  case when bool_or(pubinsert) then 'insert' else null end,
  case when bool_or(pubupdate) then 'update' else null end,
  case when bool_or(pubdelete) then 'delete' else null end
  ) as w2j_actions,
  coalesce(
  string_agg(
  realtime.quote_wal2json(format('%I.%I', schemaname, tablename)::regclass),
  ','
  ) filter (where ppt.tablename is not null and ppt.tablename not like '% %'),
  ''
  ) w2j_add_tables
  from
  pg_publication pp
  left join pg_publication_tables ppt
  on pp.pubname = ppt.pubname
  where
  pp.pubname = publication
  group by
  pp.pubname
  limit 1
  ),
  w2j as (
  select
  x.*, pub.w2j_add_tables
  from
  pub,
  pg_logical_slot_get_changes(
  slot_name, null, max_changes,
  'include-pk', 'true',
  'include-transaction', 'false',
  'include-timestamp', 'true',
  'include-type-oids', 'true',
  'format-version', '2',
  'actions', pub.w2j_actions,
  'add-tables', pub.w2j_add_tables
  ) x
  )
  select
  xyz.wal,
  xyz.is_rls_enabled,
  xyz.subscription_ids,
  xyz.errors
  from
  w2j,
  realtime.apply_rls(
  wal := w2j.data::jsonb,
  max_record_bytes := max_record_bytes
  ) xyz(wal, is_rls_enabled, subscription_ids, errors)
  where
  w2j.w2j_add_tables <> ''
  and xyz.subscription_ids[1] is not null
  $$;
  """

  # Replaces realtime.list_changes with the peek-first definition.
  def up, do: execute(@list_changes_with_peek)

  # Re-creates realtime.list_changes without the peek step.
  def down, do: execute(@list_changes_without_peek)
end
190 changes: 190 additions & 0 deletions test/integration/replications_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
defmodule Realtime.Integration.ReplicationsTest do
use Realtime.DataCase, async: false

alias Extensions.PostgresCdcRls.Replications
alias Extensions.PostgresCdcRls.Subscriptions
alias Realtime.Database

@publication "supabase_realtime_test"
@poll_interval 100

# Checks out a migrated tenant, opens a Postgrex connection to it, registers a
# subscription, prepares a uniquely named replication slot and drains it once.
# Returns `conn` and `slot_name` in the test context.
setup do
  tenant = Containers.checkout_tenant(run_migrations: true)

  conn_opts =
    tenant
    |> Database.from_tenant("realtime_rls")
    |> Map.from_struct()
    |> Keyword.new()

  {:ok, conn} = Postgrex.start_link(conn_opts)

  slot_name = "supabase_realtime_test_slot_#{System.unique_integer([:positive])}"

  # on_exit callbacks run in a separate process after the test process has
  # exited, and `conn` is linked to the test process, so it may already be
  # down by then. Drop the slot through a dedicated short-lived connection so
  # the cleanup is not silently skipped. Still best-effort: failures are
  # swallowed on purpose.
  on_exit(fn ->
    try do
      {:ok, cleanup_conn} = Postgrex.start_link(conn_opts)
      Postgrex.query(cleanup_conn, "select pg_drop_replication_slot($1)", [slot_name])
      GenServer.stop(cleanup_conn)
    catch
      _, _ -> :ok
    end
  end)

  {:ok, subscription_params} = Subscriptions.parse_subscription_params(%{"event" => "*", "schema" => "public"})
  params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}]
  {:ok, _} = Subscriptions.create(conn, @publication, params_list, self(), self())
  {:ok, _} = Replications.prepare_replication(conn, slot_name)

  # Drain any setup changes
  Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  %{conn: conn, slot_name: slot_name}
end

describe "replication polling lifecycle" do
# Walks one full polling cycle: empty slot -> three inserts -> three changes
# returned -> empty slot again. No wall-clock bound is asserted on the empty
# poll because timing thresholds are environment-sensitive and make CI flaky;
# the sentinel row is the functional evidence of the short-circuit.
test "prepare, poll, consume full cycle", %{conn: conn, slot_name: slot_name} do
  assert {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} =
           Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  assert Enum.at(sentinel, 8) == ["peek_empty"]

  Process.sleep(@poll_interval)

  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_1')", [])
  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_2')", [])
  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_3')", [])

  Process.sleep(@poll_interval)

  assert {:ok, %Postgrex.Result{num_rows: 3, rows: [row | _]}} =
           Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  assert Enum.at(row, 0) == "INSERT"
  assert Enum.at(row, 1) == "public"
  assert Enum.at(row, 2) == "test"

  Process.sleep(@poll_interval)

  # Everything was consumed above, so the next poll yields the sentinel again.
  assert {:ok, %Postgrex.Result{num_rows: 1, rows: [drained]}} =
           Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  assert Enum.at(drained, 8) == ["peek_empty"]
end
Comment on lines +43 to +75
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid a hard 50ms timing assertion to prevent flaky CI.

The strict bound is environment-sensitive and can intermittently fail even when behavior is correct. Consider relaxing the threshold or asserting functional behavior only.

🧪 Suggested relaxation
-      assert time < 50_000, "Expected peek short-circuit under 50ms, took #{div(time, 1000)}ms"
+      assert time < 500_000, "Expected peek short-circuit under 500ms, took #{div(time, 1000)}ms"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
test "prepare, poll, consume full cycle", %{conn: conn, slot_name: slot_name} do
{time, result} =
:timer.tc(fn ->
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
end)
assert {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} = result
assert Enum.at(sentinel, 8) == ["peek_empty"]
assert time < 50_000, "Expected peek short-circuit under 50ms, took #{div(time, 1000)}ms"
Process.sleep(@poll_interval)
Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_1')", [])
Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_2')", [])
Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_3')", [])
Process.sleep(@poll_interval)
{:ok, %Postgrex.Result{num_rows: 3, rows: rows}} =
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
[row | _] = rows
assert Enum.at(row, 0) == "INSERT"
assert Enum.at(row, 1) == "public"
assert Enum.at(row, 2) == "test"
Process.sleep(@poll_interval)
{:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} =
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
assert Enum.at(sentinel, 8) == ["peek_empty"]
end
test "prepare, poll, consume full cycle", %{conn: conn, slot_name: slot_name} do
{time, result} =
:timer.tc(fn ->
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
end)
assert {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} = result
assert Enum.at(sentinel, 8) == ["peek_empty"]
assert time < 500_000, "Expected peek short-circuit under 500ms, took #{div(time, 1000)}ms"
Process.sleep(@poll_interval)
Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_1')", [])
Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_2')", [])
Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('row_3')", [])
Process.sleep(@poll_interval)
{:ok, %Postgrex.Result{num_rows: 3, rows: rows}} =
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
[row | _] = rows
assert Enum.at(row, 0) == "INSERT"
assert Enum.at(row, 1) == "public"
assert Enum.at(row, 2) == "test"
Process.sleep(@poll_interval)
{:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} =
Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)
assert Enum.at(sentinel, 8) == ["peek_empty"]
end
🤖 Prompt for AI Agents
In `@test/integration/replications_test.exs` around lines 43 - 75, The test's
strict timing assertion for peek short-circuit (assert time < 50_000) is flaky;
update the assertion in the "prepare, poll, consume full cycle" test that
measures :timer.tc around Replications.list_changes to use a relaxed threshold
(e.g., assert time < 500_000, matching the suggested change above) or remove the
wall-clock assert entirely and instead assert functional behavior only (presence
of sentinel and its peek_empty value via Enum.at(sentinel, 8)), referencing the
measured variable time and the call to Replications.list_changes so you change
the correct assertion.


# Polls an idle slot five times (each poll must return the sentinel row), then
# inserts one row and checks that it is returned exactly once before the slot
# reports empty again. Result shapes are checked with `assert` so a mismatch
# produces an ExUnit diff instead of a bare MatchError.
test "polls empty multiple times then captures a change when it arrives", %{conn: conn, slot_name: slot_name} do
  for _ <- 1..5 do
    assert {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} =
             Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

    assert Enum.at(sentinel, 8) == ["peek_empty"]
    Process.sleep(@poll_interval)
  end

  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('delayed_arrival')", [])
  Process.sleep(@poll_interval)

  assert {:ok, %Postgrex.Result{num_rows: 1, rows: [row]}} =
           Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  assert Enum.at(row, 0) == "INSERT"
  assert Enum.at(row, 1) == "public"
  assert Enum.at(row, 2) == "test"

  Process.sleep(@poll_interval)

  assert {:ok, %Postgrex.Result{num_rows: 1, rows: [drained]}} =
           Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  assert Enum.at(drained, 8) == ["peek_empty"]
end

# The slot was already prepared once in setup; preparing it two more times must
# still succeed. `assert` is used so a failure is reported as an assertion
# with a diff rather than as a MatchError.
test "prepare_replication is idempotent", %{conn: conn, slot_name: slot_name} do
  assert {:ok, _} = Replications.prepare_replication(conn, slot_name)
  Process.sleep(@poll_interval)
  assert {:ok, _} = Replications.prepare_replication(conn, slot_name)
end

# Uses a freshly generated slot name that was never prepared and expects
# terminate_backend to report it as missing.
test "terminate_backend returns slot_not_found for unknown slots", %{conn: conn} do
  missing_slot = "nonexistent_slot_#{System.unique_integer([:positive])}"
  outcome = Replications.terminate_backend(conn, missing_slot)

  assert {:error, :slot_not_found} = outcome
end

# Looks up the backend pid of the test connection, tags the session with an
# application name, waits one poll interval and checks that the helper returns
# an integer for that pid.
# NOTE(review): presumably get_pg_stat_activity_diff filters on
# application_name = 'realtime_rls' - confirm against its implementation.
test "get_pg_stat_activity_diff returns elapsed seconds for active connection", %{conn: conn} do
  {:ok, %Postgrex.Result{rows: [[backend_pid]]}} =
    Postgrex.query(conn, "SELECT pg_backend_pid()", [])

  Postgrex.query!(conn, "SET application_name = 'realtime_rls'", [])
  Process.sleep(@poll_interval)

  assert {:ok, elapsed} = Replications.get_pg_stat_activity_diff(conn, backend_pid)
  assert is_integer(elapsed)
end
end

describe "peek vs RLS distinction" do
# Same preparation as a plain polling test, except the subscription is
# restricted to public.test with the filter "details=eq.no_match". Returns
# `conn` and `slot_name` in the test context.
setup do
  tenant = Containers.checkout_tenant(run_migrations: true)

  conn_opts =
    tenant
    |> Database.from_tenant("realtime_rls")
    |> Map.from_struct()
    |> Keyword.new()

  {:ok, conn} = Postgrex.start_link(conn_opts)

  slot_name = "supabase_realtime_rls_slot_#{System.unique_integer([:positive])}"

  # on_exit callbacks run in a separate process after the test process has
  # exited, and `conn` is linked to the test process, so it may already be
  # down by then. Drop the slot through a dedicated short-lived connection so
  # the cleanup is not silently skipped. Still best-effort: failures are
  # swallowed on purpose.
  on_exit(fn ->
    try do
      {:ok, cleanup_conn} = Postgrex.start_link(conn_opts)
      Postgrex.query(cleanup_conn, "select pg_drop_replication_slot($1)", [slot_name])
      GenServer.stop(cleanup_conn)
    catch
      _, _ -> :ok
    end
  end)

  {:ok, subscription_params} =
    Subscriptions.parse_subscription_params(%{
      "event" => "*",
      "schema" => "public",
      "table" => "test",
      "filter" => "details=eq.no_match"
    })

  params_list = [%{claims: %{"role" => "anon"}, id: UUID.uuid1(), subscription_params: subscription_params}]
  {:ok, _} = Subscriptions.create(conn, @publication, params_list, self(), self())
  {:ok, _} = Replications.prepare_replication(conn, slot_name)

  # Drain whatever the statements above wrote to the slot.
  Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  %{conn: conn, slot_name: slot_name}
end

# Distinguishes "nothing in the slot" (one sentinel row) from "changes were
# consumed but none matched the subscription" (zero rows). Result shapes are
# checked with `assert` so a mismatch produces an ExUnit diff instead of a
# bare MatchError.
test "returns 0 rows when WAL changes exist but are filtered by subscription", %{
  conn: conn,
  slot_name: slot_name
} do
  # Peek is empty - sentinel row
  assert {:ok, %Postgrex.Result{num_rows: 1, rows: [sentinel]}} =
           Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  assert Enum.at(sentinel, 8) == ["peek_empty"]

  # Insert a row that doesn't match the filter (details != "no_match")
  Postgrex.query!(conn, "INSERT INTO public.test (details) VALUES ('rls_filtered')", [])
  Process.sleep(@poll_interval)

  # WAL changes consumed but subscription filter doesn't match - 0 rows, no sentinel
  assert {:ok, %Postgrex.Result{num_rows: 0}} =
           Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  Process.sleep(@poll_interval)

  # After consumption, peek is empty again - sentinel returned
  assert {:ok, %Postgrex.Result{num_rows: 1, rows: [drained]}} =
           Replications.list_changes(conn, slot_name, @publication, 100, 1_048_576)

  assert Enum.at(drained, 8) == ["peek_empty"]
end
end
end