Skip to content

Commit

Permalink
more postgres replication
Browse files Browse the repository at this point in the history
  • Loading branch information
dingxiong committed Aug 31, 2024
1 parent db9f252 commit 95b72c4
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 8 deletions.
22 changes: 19 additions & 3 deletions _posts/2023-12-08-debezium.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,12 +174,25 @@ the latest insert lsn. So in this case, Debezium starts streaming changes from
latest lsn.

When Debezium reconnect, it tries to look up the offset checkpoint from Kafka.
If found, it uses the offset to start replication streaming. See
If found, it uses it to start replication streaming. See
[code](https://github.com/debezium/debezium/blob/493e1e23b0633a4e4d990e43733e249343599af5/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java#L143-L143).
If not found, for example, manually deleted, then it uses the lsn of
replication slot. See
However, on Postgres server side, it uses
`max(confirmed_flush_lsn of slot, user passed in lsn)`. So it is not guaranteed
that the lsn provided by Debezium will take effect. On the other hand, if not
found, for example, manually deleted, then it uses the lsn of the replication
slot. See
[code](https://github.com/debezium/debezium/blob/493e1e23b0633a4e4d990e43733e249343599af5/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java#L341-L341).

#### How should we move the offset forward?

It may happen that there is a burst of database updates and we want to skip
them. Then we need a way to reset the offset to the latest lsn. A simple
solution would be

1. Stop Debezium.
2. Delete replication slot `select pg_drop_replication_slot('xxxx');`
3. Start Debezium.

### Source Structure

Every Kafka message has a "source" field as below.
Expand Down Expand Up @@ -217,5 +230,8 @@ concatenating the two parts and converted to decimal. For example:
C0/F72DAA60 => echo $((16#C0F72DAA60)) = 828780685920
```

Note, the second part should take 8 bytes. For example, for `1/190D7D0` we
should patch it to `1/0190D7D0`.

The Java implementation is
[here](https://github.com/pgjdbc/pgjdbc/blob/e33be5c0481c22f4242a5d7ef2d2c09c8a17179f/pgjdbc/src/main/java/org/postgresql/replication/LogSequenceNumber.java#L44-L44).
31 changes: 26 additions & 5 deletions _posts/2024-08-25-postgres-replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ client, and stops after receiving a `CopyDone` message from the client. When
there is no new WALs, the server process enters a
[wait state](https://github.com/postgres/postgres/blob/a3e6c6f929912f928fa405909d17bcbf0c1b03ee/src/backend/replication/walsender.c#L2886).
This wait is implemeted using epoll or kevent depending on the underlying OS,
and it waits for two types of events: socket event from the client and
conditional variable events on new WAL. Every time when a new WAL is flushed to
and it waits for two types of events: socket events from the client and
conditional variable events of new WAL. Every time when a new WAL is flushed to
disk, this conditional variable is woke up. See
[code](https://github.com/postgres/postgres/blob/a3e6c6f929912f928fa405909d17bcbf0c1b03ee/src/backend/access/transam/xlog.c#L2499).
Note, many processes are running at the same time ^\_^.
Expand All @@ -267,9 +267,30 @@ $ ps -ef | grep -i postgres
```

One special note about the start `lsn`. When the start `lsn` provided in the
`START_REPLICATION` command is older than slot's `confirmed_flush_lsn`, then it
is reset to `confirmed_flush_lsn`. See
`START_REPLICATION` command is older than slot's `confirmed_flush_lsn`, it is
reset to `confirmed_flush_lsn`. See
[code](https://github.com/postgres/postgres/blob/a3e6c6f929912f928fa405909d17bcbf0c1b03ee/src/backend/replication/logical/logical.c#L567).
When the start `lsn` is newer than slot's `confirmed_flush_lsn`, then WALs
between `[confirmed_flush_lsn, provided lsn)` are ignored. The subtlety is that
the walsender process still processes the WALs in range
`[confirmed_flush_lsn, provided lsn]`, but these records are thrown away in
streaming. The core code is
[this function](https://github.com/postgres/postgres/blob/a3e6c6f929912f928fa405909d17bcbf0c1b03ee/src/backend/replication/logical/snapbuild.c#L433).
The corresponding stack path is

```
PostgresMain
-> exec_replication_command
-> WalSndLoop
-> XLogSendLogical
-> ogicalDecodingProcessRecord
-> heap_decode
-> SnapBuildProcessChange
-> ReorderBufferSetBaseSnapshot
-> AssertTXNLsnOrder
-> SnapBuildXactNeedsSkip
```

Also, when creating a new replication slot by
`pg_create_logical_replication_slot`, it will set the `confirmed_flush_lsn` to
`pg_current_wal_insert_lsn()`. See
Expand All @@ -294,7 +315,7 @@ postgres=# select pg_current_wal_insert_lsn();
0/190D798
```

Also, during the while loop, the client sends
During the while loop, the client sends
[Standby Status Update](https://github.com/postgres/postgres/blob/a3e6c6f929912f928fa405909d17bcbf0c1b03ee/src/bin/pg_basebackup/pg_recvlogical.c#L144)
messages to the server to inform the written and flushed `lsn`. On the server
side, the corresponding `lsn` in-memory fields are
Expand Down

0 comments on commit 95b72c4

Please sign in to comment.