Skip to content

Commit 0245dcc

Browse files
authored
Merge pull request #3144 from rockwotj/pg_snap
pgcdc: fix snapshot consistency
2 parents 9a386af + 8da2870 commit 0245dcc

File tree

13 files changed

+621
-413
lines changed

13 files changed

+621
-413
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,20 @@ All notable changes to this project will be documented in this file.
1313
- Field `avro.preserve_logical_types` for processor `schema_registry_decode` was added to preserve logical types instead of decoding them as their primitive representation. (@rockwotj)
1414
- Processor `schema_registry_decode` now adds metadata `schema_id` for the schema's ID in the schema registry. (@rockwotj)
1515
- Field `schema_evolution.processors` added to `snowpipe_streaming` to support side effects or enrichment during schema evolution. (@rockwotj)
16+
- Field `unchanged_toast_value` added to `postgres_cdc` to control the value substituted for unchanged TOAST values when a table does not have full replica identity. (@rockwotj)
17+
18+
### Fixed
19+
20+
- Fix a snapshot stream consistency issue with `postgres_cdc` where data could be missed if writes were happening during the snapshot phase. (@rockwotj)
21+
- Fix an issue where `@table` metadata was quoted during the snapshot phase in `postgres_cdc`. (@rockwotj)
1622

1723
### Changed
1824

1925
- Field `avro_raw_json` was deprecated in favor of `avro.raw_unions` for processor `schema_registry_decode`. (@rockwotj)
2026
- The `snowpipe_streaming` output now has better error handling for authentication failures when uploading to cloud storage. (@rockwotj)
2127
- Field `schema_evolution.new_column_type_mapping` for `snowpipe_streaming` is deprecated and can be replaced with `schema_evolution.processors`. (@rockwotj)
2228
- Increased the default values for `max_message_bytes` and `broker_write_max_bytes` by using IEC units instead of SI units. This better matches defaults in Redpanda and Kafka. (@rockwotj)
29+
- Dropped support for PostgreSQL 10 and 11 in `postgres_cdc`. (@rockwotj)
2330

2431
## 4.45.1 - 2025-01-17
2532

docs/modules/components/pages/inputs/pg_stream.adoc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ input:
8888
pg_standby_timeout: 10s
8989
pg_wal_monitor_interval: 3s
9090
max_parallel_snapshot_tables: 1
91+
unchanged_toast_value: null
9192
auto_replay_nacks: true
9293
batching:
9394
count: 0
@@ -285,6 +286,21 @@ Int specifies a number of tables that will be processed in parallel during the s
285286
286287
*Default*: `1`
287288
289+
=== `unchanged_toast_value`
290+
291+
The value to emit when there are unchanged TOAST values in the stream. This occurs for updates and deletes where REPLICA IDENTITY is not FULL.
292+
293+
294+
*Type*: `unknown`
295+
296+
*Default*: `null`
297+
298+
```yml
299+
# Examples
300+
301+
unchanged_toast_value: __redpanda_connect_unchanged_toast_value__
302+
```
303+
288304
=== `auto_replay_nacks`
289305
290306
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.

docs/modules/components/pages/inputs/postgres_cdc.adoc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ input:
8383
pg_standby_timeout: 10s
8484
pg_wal_monitor_interval: 3s
8585
max_parallel_snapshot_tables: 1
86+
unchanged_toast_value: null
8687
auto_replay_nacks: true
8788
batching:
8889
count: 0
@@ -280,6 +281,21 @@ Int specifies a number of tables that will be processed in parallel during the s
280281
281282
*Default*: `1`
282283
284+
=== `unchanged_toast_value`
285+
286+
The value to emit when there are unchanged TOAST values in the stream. This occurs for updates and deletes where REPLICA IDENTITY is not FULL.
287+
288+
289+
*Type*: `unknown`
290+
291+
*Default*: `null`
292+
293+
```yml
294+
# Examples
295+
296+
unchanged_toast_value: __redpanda_connect_unchanged_toast_value__
297+
```
298+
283299
=== `auto_replay_nacks`
284300
285301
Whether messages that are rejected (nacked) at the output level should be automatically replayed indefinitely, eventually resulting in back pressure if the cause of the rejections is persistent. If set to `false` these messages will instead be deleted. Disabling auto replays can greatly improve memory efficiency of high throughput streams as the original shape of the data can be discarded immediately upon consumption and mutation.

internal/impl/postgresql/input_pg_stream.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ const (
4040
fieldSlotName = "slot_name"
4141
fieldBatching = "batching"
4242
fieldMaxParallelSnapshotTables = "max_parallel_snapshot_tables"
43+
fieldUnchangedToastValue = "unchanged_toast_value"
4344

4445
shutdownTimeout = 5 * time.Second
4546
)
@@ -111,6 +112,11 @@ This input adds the following metadata fields to each message:
111112
Field(service.NewIntField(fieldMaxParallelSnapshotTables).
112113
Description("Int specifies a number of tables that will be processed in parallel during the snapshot processing stage").
113114
Default(1)).
115+
Field(service.NewAnyField(fieldUnchangedToastValue).
116+
Description("The value to emit when there are unchanged TOAST values in the stream. This occurs for updates and deletes where REPLICA IDENTITY is not FULL.").
117+
Default(nil).
118+
Example("__redpanda_connect_unchanged_toast_value__").
119+
Advanced()).
114120
Field(service.NewAutoRetryNacksToggleField()).
115121
Field(service.NewBatchPolicyField(fieldBatching))
116122
}
@@ -131,6 +137,7 @@ func newPgStreamInput(conf *service.ParsedConfig, mgr *service.Resources) (s ser
131137
maxParallelSnapshotTables int
132138
pgStandbyTimeout time.Duration
133139
batching service.BatchPolicy
140+
unchangedToastValue any
134141
)
135142

136143
if err := license.CheckRunningEnterprise(mgr); err != nil {
@@ -206,6 +213,10 @@ func newPgStreamInput(conf *service.ParsedConfig, mgr *service.Resources) (s ser
206213
return nil, err
207214
}
208215

216+
if unchangedToastValue, err = conf.FieldAny(fieldUnchangedToastValue); err != nil {
217+
return nil, err
218+
}
219+
209220
pgConnConfig, err := pgconn.ParseConfigWithOptions(dsn, pgconn.ParseConfigOptions{
210221
// Don't support dynamic reading of password
211222
GetSSLPassword: func(context.Context) string { return "" },
@@ -237,6 +248,7 @@ func newPgStreamInput(conf *service.ParsedConfig, mgr *service.Resources) (s ser
237248
WalMonitorInterval: walMonitorInterval,
238249
MaxParallelSnapshotTables: maxParallelSnapshotTables,
239250
Logger: mgr.Logger(),
251+
UnchangedToastValue: unchangedToastValue,
240252
},
241253
batching: batching,
242254
checkpointLimit: checkpointLimit,

0 commit comments

Comments
 (0)