diff --git a/_posts/2023-12-08-debezium.md b/_posts/2023-12-08-debezium.md
index 287423c..abc8e5e 100644
--- a/_posts/2023-12-08-debezium.md
+++ b/_posts/2023-12-08-debezium.md
@@ -174,12 +174,25 @@ the latest insert lsn. So in this case, Debezium starts streaming changes from
 latest lsn.
 
 When Debezium reconnect, it tries to look up the offset checkpoint from Kafka.
-If found, it uses the offset to start replication streaming. See
+If found, it uses it to start replication streaming. See
 [code](https://github.com/debezium/debezium/blob/493e1e23b0633a4e4d990e43733e249343599af5/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java#L143-L143).
-If not found, for example, manually deleted, then it uses the lsn of
-replication slot. See
+However, on the Postgres server side, it uses
+`max(confirmed_flush_lsn of slot, user passed in lsn)`. So it is not guaranteed
+that the lsn provided by Debezium will take effect. On the other hand, if not
+found, for example, manually deleted, then it uses the lsn of the replication
+slot. See
 [code](https://github.com/debezium/debezium/blob/493e1e23b0633a4e4d990e43733e249343599af5/debezium-connector-postgres/src/main/java/io/debezium/connector/postgresql/connection/PostgresReplicationConnection.java#L341-L341).
 
+#### How should we move the offset forward?
+
+It may happen that there is a burst of database updates and we want to skip
+them. Then we need a way to reset the offset to the latest lsn. A simple
+solution would be:
+
+1. Stop Debezium.
+2. Delete the replication slot: `select pg_drop_replication_slot('xxxx');`
+3. Start Debezium.
+
 ### Source Structure
 
 Every Kafka message has a "source" field as below.
@@ -217,5 +230,8 @@ concatenating the two parts and converted to decimal. For example:
 C0/F72DAA60 => echo $((16#C0F72DAA60)) = 828780685920
 ```
 
+Note, the second part should be zero-padded to 8 hex digits. For example,
+`1/190D7D0` should be padded to `1/0190D7D0`.
+
 The Java implementation is
 [here](https://github.com/pgjdbc/pgjdbc/blob/e33be5c0481c22f4242a5d7ef2d2c09c8a17179f/pgjdbc/src/main/java/org/postgresql/replication/LogSequenceNumber.java#L44-L44).
diff --git a/_posts/2024-08-25-postgres-replication.md b/_posts/2024-08-25-postgres-replication.md
index 777326d..93a7b5d 100644
--- a/_posts/2024-08-25-postgres-replication.md
+++ b/_posts/2024-08-25-postgres-replication.md
@@ -252,8 +252,8 @@ client, and stops after receiving a `CopyDone` message from the client. When
 there is no new WALs, the server process enters a
 [wait state](https://github.com/postgres/postgres/blob/a3e6c6f929912f928fa405909d17bcbf0c1b03ee/src/backend/replication/walsender.c#L2886).
 This wait is implemeted using epoll or kevent depending on the underlying OS,
-and it waits for two types of events: socket event from the client and
-conditional variable events on new WAL. Every time when a new WAL is flushed to
+and it waits for two types of events: socket events from the client and
+conditional variable events of new WAL. Every time when a new WAL is flushed to
 disk, this conditional variable is woke up. See
 [code](https://github.com/postgres/postgres/blob/a3e6c6f929912f928fa405909d17bcbf0c1b03ee/src/backend/access/transam/xlog.c#L2499).
 Note, many processes are running at the same time ^\_^.
@@ -267,9 +267,30 @@ $ ps -ef | grep -i postgres
 ```
 
 One special note about the start `lsn`. When the start `lsn` provided in the
-`START_REPLICATION` command is older than slot's `confirmed_flush_lsn`, then it
-is reset to `confirmed_flush_lsn`. See
+`START_REPLICATION` command is older than slot's `confirmed_flush_lsn`, it is
+reset to `confirmed_flush_lsn`. See
 [code](https://github.com/postgres/postgres/blob/a3e6c6f929912f928fa405909d17bcbf0c1b03ee/src/backend/replication/logical/logical.c#L567).
+When the start `lsn` is newer than the slot's `confirmed_flush_lsn`, WALs
+between `[confirmed_flush_lsn, provided lsn)` are ignored. The subtlety is that
+the walsender process still processes the WALs in range
+`[confirmed_flush_lsn, provided lsn]`, but these records are thrown away in
+streaming. The core code is
+[this function](https://github.com/postgres/postgres/blob/a3e6c6f929912f928fa405909d17bcbf0c1b03ee/src/backend/replication/logical/snapbuild.c#L433).
+The corresponding stack path is
+
+```
+PostgresMain
+  -> exec_replication_command
+  -> WalSndLoop
+  -> XLogSendLogical
+  -> LogicalDecodingProcessRecord
+  -> heap_decode
+  -> SnapBuildProcessChange
+  -> ReorderBufferSetBaseSnapshot
+  -> AssertTXNLsnOrder
+  -> SnapBuildXactNeedsSkip
+```
+
 Also, when creating a new replication slot by
 `pg_create_logical_replication_slot`, it will set the `confirmed_flush_lsn` to
 `pg_current_wal_insert_lsn()`. See
@@ -294,7 +315,7 @@ postgres=# select pg_current_wal_insert_lsn();
 0/190D798
 ```
 
-Also, during the while loop, the client sends
+During the while loop, the client sends
 [Standby Status Update](https://github.com/postgres/postgres/blob/a3e6c6f929912f928fa405909d17bcbf0c1b03ee/src/bin/pg_basebackup/pg_recvlogical.c#L144)
 messages to the server to inform the written and flushed `lsn`. On the server
 side, the corresponding `lsn` in-memory fields are