Skip to content

Commit b4b3211

Browse files
authored
REPLICATION: Allow starting one stream right after another (#693)
1 parent 1daa656 commit b4b3211

File tree

2 files changed

+48
-2
lines changed

2 files changed

+48
-2
lines changed

lib/postgrex/replication_connection.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -527,8 +527,8 @@ defmodule Postgrex.ReplicationConnection do
527527
defp handle_data([], s), do: {:keep_state, s}
528528

529529
defp handle_data([:copy_done | copies], %{state: {mod, mod_state}} = s) do
530-
with {:keep_state, s} <- handle(mod, :handle_data, [:done, mod_state], nil, s) do
531-
handle_data(copies, %{s | streaming: nil})
530+
with {:keep_state, s} <- handle(mod, :handle_data, [:done, mod_state], nil, %{s | streaming: nil}) do
531+
handle_data(copies, s)
532532
end
533533
end
534534

test/replication_connection_test.exs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,22 @@ defmodule ReplicationTest do
5050
{:noreply, [reply], pid}
5151
end
5252

53+
# This is part of the "stream_continuation" test and handles the COPY :done
54+
# state. It will start another stream right away by starting the replication
55+
# slot.
56+
def handle_data(:done, %{pid: pid, test: "stream_continuation"}) do
57+
send(pid, {:done, System.unique_integer([:monotonic])})
58+
query = "START_REPLICATION SLOT postgrex_test LOGICAL 0/0 (proto_version '1', publication_names 'postgrex_example')"
59+
60+
{:stream, query, [], pid}
61+
end
62+
63+
# This is part of the "stream_continuation" test and handles the COPY results.
64+
def handle_data(msg, %{pid: pid, test: "stream_continuation"} = s) do
65+
send(pid, {msg, System.unique_integer([:monotonic])})
66+
{:noreply, [], s}
67+
end
68+
5369
def handle_data(msg, pid) do
5470
send(pid, {msg, System.unique_integer([:monotonic])})
5571
{:noreply, [], pid}
@@ -80,6 +96,12 @@ defmodule ReplicationTest do
8096
{:query, query, {from, pid}}
8197
end
8298

99+
# This is part of the "stream_continuation" test and handles call that
100+
# triggers that chain of events.
101+
def handle_call({:query, query, %{test: "stream_continuation", next_query: _} = opts}, from, pid) do
102+
{:query, query, Map.merge(opts, %{from: from, pid: pid})}
103+
end
104+
83105
@impl true
84106
def handle_call({:disconnect, reason}, _, _) do
85107
{:disconnect, reason}
@@ -97,6 +119,12 @@ defmodule ReplicationTest do
97119
{:noreply, pid}
98120
end
99121

122+
# Handles the result of the "stream_continuation" query call. It is the results of the slot creation.
123+
def handle_result(results, %{from: from, test: "stream_continuation", next_query: next_query} = s) do
124+
Postgrex.ReplicationConnection.reply(from, {:ok, results})
125+
{:stream, next_query, [], Map.delete(s, :next_query)}
126+
end
127+
100128
@epoch DateTime.to_unix(~U[2000-01-01 00:00:00Z], :microsecond)
101129
defp current_time(), do: System.os_time(:microsecond) - @epoch
102130
end
@@ -288,6 +316,24 @@ defmodule ReplicationTest do
288316
# Can query after copy is done
289317
{:ok, [%Postgrex.Result{}]} = PR.call(context.repl, {:query, "SELECT 1"})
290318
end
319+
320+
test "allow replication stream right after a COPY stream", context do
321+
P.query!(context.pid, "INSERT INTO repl_test VALUES ($1, $2), ($3, $4)", [42, "42", 1, "1"])
322+
323+
query = "CREATE_REPLICATION_SLOT postgrex_test TEMPORARY LOGICAL pgoutput NOEXPORT_SNAPSHOT"
324+
next_query = "COPY repl_test TO STDOUT"
325+
326+
PR.call(
327+
context.repl,
328+
{:query, query, %{test: "stream_continuation", next_query: next_query}}
329+
)
330+
331+
assert_receive {"42\t42\n", i1}, @timeout
332+
assert_receive {"1\t1\n", i2} when i1 < i2, @timeout
333+
assert_receive {:done, i3} when i2 < i3, @timeout
334+
# Prior to allowing one stream to start after another, this would fail
335+
assert_receive <<?k, _::64, _::64, _>>, @timeout
336+
end
291337
end
292338

293339
defp start_replication(repl) do

0 commit comments

Comments
 (0)