Hi, I've followed the documentation trying to use my own database and tables, but I'm unable to get this working.

Docker Compose

version: '3.6'
networks:
  app: {}
services:
  postgres:
    restart: always
    image: debezium/postgres:14-alpine
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=trading
    ports:
      - 5432:5432
    networks:
      - app
    volumes:
      - ./modules/forecasts/src/main/resources/db/migration/V1__baseline.sql:/docker-entrypoint-initdb.d/init.sql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres -d trading"]
      interval: 5s
      timeout: 20s
      retries: 15
  pulsar:
    restart: always
    image: apachepulsar/pulsar:2.10.1
    ports:
      - 6650:6650
      - 8080:8080
    command: >
      /bin/bash -c "bin/pulsar standalone"
    networks:
      - app
    volumes:
      - ./pulsarconf/standalone.conf:/pulsar/conf/standalone.conf
      - ./pulsarconf/connectors/pulsar-io-debezium-postgres-2.10.1.nar:/pulsar/connectors/debezium-2.10.1.nar
      - ./pulsarconf/debezium-postgres-config.yaml:/pulsar/conf/debezium-pg.yaml
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/admin/v2/brokers/health"]
      interval: 2s
      timeout: 5s
      retries: 15
      start_period: 2m

Debezium Postgres config

I've noticed that

tenant: "public"
namespace: "default"
name: "debezium-postgres-source"
topicName: "debezium-postgres-topic"
archive: "connectors/debezium-2.10.1.nar"
parallelism: 1
configs:
  database.hostname: "postgres"
  database.port: "5432"
  database.user: "postgres"
  database.password: "postgres"
  database.dbname: "trading"
  database.server.name: "dbserver"
  #plugin.name: "wal2json"
  plugin.name: "pgoutput"
  schema.whitelist: "trading"
  database.whitelist: "trading"
  table.whitelist: "trading.authors"
  pulsar.service.url: "pulsar://pulsar:6650"
  database.history.pulsar.service.url: "pulsar://pulsar:6650"

Pulsar Debezium Connector

Once Postgres and Pulsar are up, I run the following command to start the Pulsar Debezium connector.

$ docker-compose exec -T pulsar bin/pulsar-admin source localrun --source-config-file /pulsar/conf/debezium-pg.yaml
...
2022-09-20T08:10:22,579+0000 [pool-3-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:10:22,604+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = dbserver named = keep-alive
2022-09-20T08:10:22,604+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Creating thread debezium-postgresconnector-dbserver-keep-alive
2022-09-20T08:10:22,607+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Searching for WAL resume position

The log shows the last lines of output. Once there is some activity in my database (e.g. insert or update), I see this log only for the first operation. Then nothing else happens, no more logs, even if I continue to insert records.

2022-09-20T08:11:18,247+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.WalPositionLocator - First LSN 'LSN{0/1886078}' received
2022-09-20T08:11:18,247+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - WAL resume position 'LSN{0/1886078}' discovered
2022-09-20T08:11:18,248+0000 [pool-4-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:11:18,249+0000 [pool-5-thread-1] INFO io.debezium.jdbc.JdbcConnection - Connection gracefully closed
2022-09-20T08:11:18,256+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.PostgresReplicationConnection - Initializing PgOutput logical decoder publication
2022-09-20T08:11:18,268+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Requested thread factory for connector PostgresConnector, id = dbserver named = keep-alive
2022-09-20T08:11:18,268+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.util.Threads - Creating thread debezium-postgresconnector-dbserver-keep-alive
2022-09-20T08:11:18,269+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.PostgresStreamingChangeEventSource - Processing messages
2022-09-20T08:11:18,775+0000 [debezium-postgresconnector-dbserver-change-event-source-coordinator] INFO io.debezium.connector.postgresql.connection.WalPositionLocator - Message with LSN 'LSN{0/1886078}' arrived, switching off the filtering

PostgreSQL

On the Postgres side, everything seems alright. I'm able to create and update records.

$ select * from authors;
                  id                  |  name  |  website
--------------------------------------+--------+------------
 37d58e4c-8379-4fac-beb4-88f5d99adac5 | gvolpe | gvolpe.com
 d35b1167-8442-4dc3-bef4-589f36b982d7 | jdoe   |
(2 rows)

Consumer

The consumer doesn't get any events. This is the full log.

$ docker-compose exec pulsar bin/pulsar-client consume -s "cdc-authors" persistent://public/default/dbserver.trading.authors
2022-09-20T08:11:38,357+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConnectionPool - [[id: 0xd4bdab64, L:/127.0.0.1:59816 - R:localhost/127.0.0.1:6650]] Connected to server
2022-09-20T08:11:38,457+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Starting Pulsar consumer status recorder with config: {"topicNames":["persistent://public/default/dbserver.trading.authors"],"topicsPattern":null,"subscriptionName":"cdc-authors","subscriptionType":"Exclusive","subscriptionProperties":null,"subscriptionMode":"Durable","receiverQueueSize":1000,"acknowledgementsGroupTimeMicros":100000,"negativeAckRedeliveryDelayMicros":60000000,"maxTotalReceiverQueueSizeAcrossPartitions":50000,"consumerName":null,"ackTimeoutMillis":0,"tickDurationMillis":1000,"priorityLevel":0,"maxPendingChunkedMessage":10,"autoAckOldestChunkedMessageOnQueueFull":false,"expireTimeOfIncompleteChunkedMessageMillis":60000,"cryptoFailureAction":"FAIL","properties":{},"readCompacted":false,"subscriptionInitialPosition":"Latest","patternAutoDiscoveryPeriod":60,"regexSubscriptionMode":"PersistentOnly","deadLetterPolicy":null,"retryEnable":false,"autoUpdatePartitions":true,"autoUpdatePartitionsIntervalSeconds":60,"replicateSubscriptionState":false,"resetIncludeHead":false,"keySharedPolicy":null,"batchIndexAckEnabled":false,"ackReceiptEnabled":false,"poolMessages":true,"startPaused":false,"maxPendingChuckedMessage":10}
2022-09-20T08:11:38,470+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - Pulsar client config: {"serviceUrl":"pulsar://localhost:6650/","authPluginClassName":null,"authParams":null,"authParamMap":null,"operationTimeoutMs":30000,"lookupTimeoutMs":30000,"statsIntervalSeconds":60,"numIoThreads":1,"numListenerThreads":1,"connectionsPerBroker":1,"useTcpNoDelay":true,"useTls":false,"tlsTrustCertsFilePath":"","tlsAllowInsecureConnection":false,"tlsHostnameVerificationEnable":false,"concurrentLookupRequest":5000,"maxLookupRequest":50000,"maxLookupRedirects":20,"maxNumberOfRejectedRequestPerConnection":50,"keepAliveIntervalSeconds":30,"connectionTimeoutMs":10000,"requestTimeoutMs":60000,"initialBackoffIntervalNanos":100000000,"maxBackoffIntervalNanos":60000000000,"enableBusyWait":false,"listenerName":null,"useKeyStoreTls":false,"sslProvider":null,"tlsTrustStoreType":"JKS","tlsTrustStorePath":"","tlsTrustStorePassword":"","tlsCiphers":[],"tlsProtocols":[],"memoryLimitBytes":67108864,"proxyServiceUrl":null,"proxyProtocol":null,"enableTransaction":false,"dnsLookupBindAddress":null,"dnsLookupBindPort":0,"socks5ProxyAddress":null,"socks5ProxyUsername":null,"socks5ProxyPassword":null}
2022-09-20T08:11:38,479+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/dbserver.trading.authors][cdc-authors] Subscribing to topic on cnx [id: 0xd4bdab64, L:/127.0.0.1:59816 - R:localhost/127.0.0.1:6650], consumerId 0
2022-09-20T08:11:38,542+0000 [pulsar-client-io-1-1] INFO org.apache.pulsar.client.impl.ConsumerImpl - [persistent://public/default/dbserver.trading.authors][cdc-authors] Subscribed to topic on localhost/127.0.0.1:6650 -- consumer: 0

Topics

Before I run the consumer and after I produce SQL data, these are the only topics I see.

$ docker-compose exec -T pulsar bin/pulsar-admin topics list public/default
persistent://public/default/__transaction_buffer_snapshot
persistent://public/default/__change_events
persistent://public/default/debezium-postgres-source-debezium-offset-topic
persistent://public/default/debezium-postgres-topic

I would expect to have

Has anyone out there been able to get this working with Pulsar 2.10.1 or newer? Would appreciate any help, thanks!
Replies: 1 comment
Ok, managed to get this working with the following configuration:

tenant: "public"
namespace: "default"
name: "debezium-postgres-source"
topicName: "debezium-postgres-topic"
archive: "connectors/debezium-2.10.1.nar"
parallelism: 1
configs:
  database.hostname: "postgres"
  database.port: "5432"
  database.user: "postgres"
  database.password: "postgres"
  database.dbname: "trading"
  database.server.name: "dbserver"
  plugin.name: "pgoutput"
  schema.whitelist: "public"
  table.whitelist: "public.authors"
  pulsar.service.url: "pulsar://pulsar:6650"
  database.history.pulsar.service.url: "pulsar://pulsar:6650"

So it doesn't matter if one sets database.whitelist: "my…
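
For anyone landing here later, here is a minimal sketch of why the working configuration probably differs from the first attempt. It rests on two assumptions that are not confirmed in this thread: that the `authors` table was created in Postgres' default `public` schema (`trading` is the database name, not a schema), and that the Debezium Postgres connector names its topics `<database.server.name>.<schema>.<table>` under the source's tenant and namespace. The `debezium_topic` helper is hypothetical and only illustrates the naming; the two commented commands are untested suggestions for checking the schema and consuming from the resulting topic.

```shell
#!/bin/sh
# Hypothetical helper: builds the topic name under the ASSUMED convention
#   persistent://<tenant>/<namespace>/<database.server.name>.<schema>.<table>
debezium_topic() {
  # $1 tenant, $2 namespace, $3 database.server.name, $4 schema, $5 table
  printf 'persistent://%s/%s/%s.%s.%s\n' "$1" "$2" "$3" "$4" "$5"
}

# Values taken from the working configuration above.
TOPIC="$(debezium_topic public default dbserver public authors)"
echo "$TOPIC"

# Untested follow-ups; they need the running Compose stack, so they stay commented out:
# docker-compose exec -T postgres psql -U postgres -d trading \
#   -c "SELECT table_schema FROM information_schema.tables WHERE table_name = 'authors';"
# docker-compose exec pulsar bin/pulsar-client consume -s "cdc-authors" "$TOPIC" -n 0
```

If those assumptions hold, the consumer would need to subscribe to `dbserver.public.authors` rather than `dbserver.trading.authors`, which would also explain why the original subscription never received events.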