Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/fix-last-processed-lsn-missing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@core/sync-service": patch
---

Fix a crash when the LsnTracker ETS table is empty during a long-poll timeout: `get_last_processed_lsn` now returns `nil` instead of crashing, the API falls back to the shape's own offset, and the request's read-only flag is aligned with the runtime status. Also fix a stale `flushed_wal` (always 0) being used when populating the LsnTracker during replication slot creation.
12 changes: 5 additions & 7 deletions packages/sync-service/lib/electric/lsn_tracker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,12 @@ defmodule Electric.LsnTracker do
initialize_last_processed_lsn(stack_ref, Lsn.from_integer(lsn))
end

@spec get_last_processed_lsn(stack_ref()) :: Lsn.t()
@spec get_last_processed_lsn(stack_ref()) :: Lsn.t() | nil
def get_last_processed_lsn(stack_ref) do
[last_processed_lsn: lsn] =
stack_ref
|> table()
|> :ets.lookup(:last_processed_lsn)

lsn
case stack_ref |> table() |> :ets.lookup(:last_processed_lsn) do
[{:last_processed_lsn, lsn}] -> lsn
[] -> nil
end
end

@spec broadcast_last_seen_lsn(stack_ref(), Lsn.t() | non_neg_integer()) :: :ok
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ defmodule Electric.Postgres.ReplicationClient do

@impl true
def handle_result(result_list_or_error, state) do
{current_step, next_step, extra_info, return_val} =
{current_step, next_step, extra_info, updated_state, return_val} =
ConnectionSetup.process_query_result(result_list_or_error, state)

if current_step == :identify_system,
Expand All @@ -248,13 +248,17 @@ defmodule Electric.Postgres.ReplicationClient do

# for new slots, always reset the last processed LSN
if current_step == :create_slot and extra_info == :created_new_slot do
Electric.LsnTracker.set_last_processed_lsn(state.stack_id, state.flushed_wal)
Electric.LsnTracker.set_last_processed_lsn(state.stack_id, updated_state.flushed_wal)
notify_created_new_slot(state)
end

# for existing slots, populate the last processed LSN if not present
if current_step == :query_slot_flushed_lsn,
do: Electric.LsnTracker.initialize_last_processed_lsn(state.stack_id, state.flushed_wal)
do:
Electric.LsnTracker.initialize_last_processed_lsn(
state.stack_id,
updated_state.flushed_wal
)

if next_step == :ready_to_stream,
do: notify_ready_to_stream(state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do

# Process the result of executing the query, pick the next step and return the `{:query, ...}`
# tuple for it.
@spec process_query_result(query_result, state) :: {step, step, extra_info, callback_return}
@spec process_query_result(query_result, state) ::
{step, step, extra_info, state, callback_return}
def process_query_result(result, %{step: step} = state) do
{extra_info, state} =
case dispatch_query_result(step, result, state) do
Expand All @@ -41,7 +42,7 @@ defmodule Electric.Postgres.ReplicationClient.ConnectionSetup do
end

next_step = next_step(state)
{step, next_step, extra_info, query_for_step(next_step, %{state | step: next_step})}
{step, next_step, extra_info, state, query_for_step(next_step, %{state | step: next_step})}
end

# Instruct `Postgrex.ReplicationConnection` to switch the connection into the logical
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,15 @@ defmodule Electric.Replication.ShapeLogCollector do
end

def handle_call(:mark_as_ready, _from, state) do
lsn = LsnTracker.get_last_processed_lsn(state.stack_id)
offset = LogOffset.new(Lsn.to_integer(lsn), :infinity)
offset =
case LsnTracker.get_last_processed_lsn(state.stack_id) do
%Lsn{} = lsn ->
LogOffset.new(Lsn.to_integer(lsn), :infinity)

nil ->
raise "LsnTracker must be populated before marking shape_log_collector as ready"
end

Electric.StatusMonitor.mark_shape_log_collector_ready(state.stack_id, self())
{:reply, :ok, Map.put(state, :last_processed_offset, offset)}
end
Expand Down
27 changes: 14 additions & 13 deletions packages/sync-service/lib/electric/shapes/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -459,21 +459,17 @@ defmodule Electric.Shapes.Api do
%{request | new_changes_pid: self(), new_changes_ref: ref}
end

# In read-only mode, LsnTracker isn't populated (no replication connection).
# Use the shape's own latest offset as the LSN — it's the Postgres LSN of
# the last transaction persisted for this shape, which is the best available
# value without the replication stream or persisting the LsnTracker or loading
# all shapes' latest offsets up front. If we need stronger guarantees on this
# we should be persisting the LsnTracker updates to a file.
defp determine_global_last_seen_lsn(%Request{read_only?: true} = request) do
%{request | global_last_seen_lsn: request.last_offset.tx_offset}
end

# When the LsnTracker is populated (active mode), use the global last
# processed LSN. Otherwise fall back to the shape's own latest offset —
# the Postgres LSN of the last transaction persisted for this shape.
# This covers read-only mode (no replication connection) and transient
# states where the LsnTracker has been reset (e.g. process restarts).
defp determine_global_last_seen_lsn(%Request{} = request) do
offset =
request.api.stack_id
|> Electric.LsnTracker.get_last_processed_lsn()
|> Electric.Postgres.Lsn.to_integer()
case Electric.LsnTracker.get_last_processed_lsn(request.api.stack_id) do
nil -> request.last_offset.tx_offset
lsn -> Electric.Postgres.Lsn.to_integer(lsn)
end

%{request | global_last_seen_lsn: offset}
end
Expand Down Expand Up @@ -900,6 +896,11 @@ defmodule Electric.Shapes.Api do

cond do
request.read_only? or status.shape == :read_only ->
# Align the request flag with the current runtime status so that
# downstream functions (determine_log_chunk_offset, get_merged_log_stream,
# etc.) use the correct read-only strategy.
request = %{request | read_only?: true}

# No consumer is running (or it stopped), so check if the
# active instance has flushed new data to disk.
case check_for_disk_updates(request) do
Expand Down
4 changes: 4 additions & 0 deletions packages/sync-service/test/electric/lsn_tracker_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ defmodule Electric.LsnTrackerTest do
:ok
end

test "returns nil when not yet populated", %{stack_id: stack_id} do
assert LsnTracker.get_last_processed_lsn(stack_id) == nil
end

test "returns inital lsn", %{stack_id: stack_id} do
lsn = Lsn.from_integer(7)
LsnTracker.set_last_processed_lsn(stack_id, lsn)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
defmodule Electric.Postgres.ReplicationClient.ConnectionSetupTest do
  use ExUnit.Case, async: true

  alias Electric.Postgres.ReplicationClient.ConnectionSetup
  alias Electric.Postgres.ReplicationClient.State
  alias Electric.Postgres.Lsn

  # Builds a replication client state whose `flushed_wal` starts at 0, then
  # applies the per-test `overrides` on top of it.
  #
  # `struct!/2` is used instead of `Map.merge/2` so that a misspelled override
  # key raises a `KeyError` rather than being silently added to the struct.
  defp base_state(overrides) do
    struct!(
      %State{
        handle_event: fn _, _ -> :ok end,
        publication_name: "test_pub",
        slot_name: "test_slot",
        display_settings: ["SET dummy = 'test'"],
        flushed_wal: 0
      },
      overrides
    )
  end

  describe "process_query_result/2 returns updated state" do
    test "create_slot result includes updated flushed_wal in returned state" do
      slot_lsn = "0/1A2B3C4"
      expected_wal = slot_lsn |> Lsn.from_string() |> Lsn.to_integer()

      state = base_state(%{step: :create_slot})

      # Result in the shape of a slot-creation reply, with the slot's
      # consistent point set to `slot_lsn`.
      create_result = [
        %Postgrex.Result{
          command: :create,
          columns: ["slot_name", "consistent_point", "snapshot_name", "output_plugin"],
          rows: [["test_slot", slot_lsn, nil, "pgoutput"]],
          num_rows: 1
        }
      ]

      # `assert` on the match so that a wrong tuple shape or a missing
      # `:created_new_slot` marker is reported as a test failure with a diff.
      assert {_step, _next_step, :created_new_slot, updated_state, _return_val} =
               ConnectionSetup.process_query_result(create_result, state)

      # The input state had flushed_wal == 0; the returned state must carry
      # the LSN reported by the query result.
      assert updated_state.flushed_wal == expected_wal
    end

    test "query_slot_flushed_lsn result includes updated flushed_wal in returned state" do
      slot_lsn = "0/5D6E7F8"
      expected_wal = slot_lsn |> Lsn.from_string() |> Lsn.to_integer()

      # `flushed_wal: 0` is repeated here to make the starting value explicit.
      state = base_state(%{step: :query_slot_flushed_lsn, flushed_wal: 0})

      query_result = [
        %Postgrex.Result{
          command: :select,
          columns: ["confirmed_flush_lsn"],
          rows: [[slot_lsn]],
          num_rows: 1
        }
      ]

      assert {_step, _next_step, _extra_info, updated_state, _return_val} =
               ConnectionSetup.process_query_result(query_result, state)

      assert updated_state.flushed_wal == expected_wal
    end
  end
end
66 changes: 66 additions & 0 deletions packages/sync-service/test/electric/shapes/api_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1595,6 +1595,72 @@ defmodule Electric.Shapes.ApiTest do
assert request.global_last_seen_lsn == @start_offset_50.tx_offset
end

# Validates a request while the stack status is active but the LsnTracker
# holds no value, and expects `global_last_seen_lsn` to be taken from the
# shape's own offset instead.
test "falls back to shape offset for global_last_seen_lsn when LsnTracker is not populated",
ctx do
# Simulate active mode but with an empty LsnTracker — e.g. after a
# ShapeStatusOwner crash/restart that wipes the ETS table.
# Register the test process in the registry under the key that
# Electric.Shapes.Supervisor.name/1 resolves to for this stack.
# NOTE(review): presumably needed so the stack looks like it has a running
# shapes supervisor — confirm against the status checks.
{:via, _, {registry_name, registry_key}} = Electric.Shapes.Supervisor.name(ctx.stack_id)
{:ok, _} = Registry.register(registry_name, registry_key, nil)
Electric.LsnTracker.initialize(ctx.stack_id)
# Do NOT populate the LsnTracker — leave it empty
set_status_to_active(ctx)

# Resolve the shape handle to the test handle with @start_offset_50 as its
# latest offset, without going through the real ShapeCache.
Repatch.patch(Electric.ShapeCache, :resolve_shape_handle, fn @test_shape_handle,
@test_shape,
_stack_id,
_opts ->
{@test_shape_handle, @start_offset_50}
end)

assert {:ok, request} =
Api.validate(ctx.api, %{
table: "public.users",
handle: @test_shape_handle,
offset: "#{@start_offset_50}"
})

# The request must stay in active (non read-only) mode.
assert request.read_only? == false
# Falls back to the shape's last offset when LsnTracker is empty
assert request.global_last_seen_lsn == @start_offset_50.tx_offset
end

# Serves a live request with an empty LsnTracker and no new log entries, and
# expects a normal "up-to-date" response rather than a crash.
# NOTE(review): the tag presumably shortens the long-poll timeout (in ms) picked
# up by the test setup — confirm against the setup block.
@tag long_poll_timeout: 100
test "long poll timeout does not crash when LsnTracker is not populated", ctx do
# Simulate active mode but with an empty LsnTracker
{:via, _, {registry_name, registry_key}} = Electric.Shapes.Supervisor.name(ctx.stack_id)
{:ok, _} = Registry.register(registry_name, registry_key, nil)
# Initialize the tracker but never set a last processed LSN.
Electric.LsnTracker.initialize(ctx.stack_id)
set_status_to_active(ctx)

# The shape exists, its snapshot has started, and it resolves to @test_offset.
patch_shape_cache(
has_shape?: fn @test_shape_handle, _opts -> true end,
await_snapshot_start: fn @test_shape_handle, _ -> :started end,
resolve_shape_handle: fn @test_shape_handle, @test_shape, _stack_id, _opts ->
{@test_shape_handle, @test_offset}
end
)

# Storage reports no chunk end offset and an empty log stream, so the live
# request has nothing new to return.
patch_storage(
for_shape: fn @test_shape_handle, _opts -> @test_opts end,
get_chunk_end_log_offset: fn _, @test_opts -> nil end,
get_log_stream: fn @test_offset, _, @test_opts -> [] end
)

assert {:ok, request} =
Api.validate(ctx.api, %{
table: "public.users",
offset: "#{@test_offset}",
handle: @test_shape_handle,
live: true
})

# Still in active mode; the response must be a 200 with no changes and a
# single up-to-date control message.
assert request.read_only? == false
assert response = Api.serve_shape_response(request)
assert response.status == 200
assert response.no_changes
assert [%{headers: %{control: "up-to-date"}}] = response_body(response)
end

@tag stack_ready_timeout: 100
test "waits for active when shape not found in read-only mode", ctx do
set_read_only(ctx)
Expand Down
Loading