Skip to content

Commit f048613

Browse files
authored
Update pubsub consumer (#27)
* Update pubsub consumer * PR fixes
1 parent eb6272b commit f048613

File tree

7 files changed

+700
-3
lines changed

7 files changed

+700
-3
lines changed

config/base/test_websocket_client.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import websocket
1616
import json
1717
import sys
18-
import time
1918
from datetime import datetime
2019

2120

config/base/usdc_transfers_rpc.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pipelines:
3131
- "CCW67TSZV3SSS2HXMBQ5JFGCKJNXKZM7UQUWUZPUTHXSTZLEO7SJMI75"
3232
topics:
3333
# Transfer event signature: transfer(from: Address, to: Address, amount: i128)
34-
# Base64 encoded "transfer"
34+
# XDR-encoded topic filter (base64 format)
3535
- ["AAAADwAAAAh0cmFuc2Zlcg==", "*", "*", "*"]
3636

3737
# Pagination settings
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
# Kwickbit Payments to Google Pub/Sub V2 Format Example
2+
#
3+
# This configuration processes Kwickbit payment events and publishes them to
4+
# Google Pub/Sub using the V2 message format that matches downstream service expectations.
5+
#
6+
# V2 Message Format:
7+
# {
8+
# "chainIdentifier": "StellarMainnet" | "StellarTestnet",
9+
# "payload": {
10+
# "paymentId": "0x...",
11+
# "merchantAddress": "G...",
12+
# "amount": "1000000",
13+
# "royaltyFee": "50000",
14+
# "payerAddress": "G..."
15+
# },
16+
# "details": {
17+
# "hash": "abc123...",
18+
# "block": 12345,
19+
# "to": "G...",
20+
# "from": "G..."
21+
# }
22+
# }
23+
24+
pipelines:
25+
KwickbitPaymentsPubSubV2:
26+
source:
27+
type: SorobanSourceAdapter
28+
config:
29+
# RPC endpoint
30+
rpc_url: "https://rpc-testnet.nodeswithobsrvr.co"
31+
auth_header: "Api-Key YOUR_API_KEY"
32+
33+
# Start from recent ledger
34+
start_ledger: 1281900
35+
# No end_ledger = streaming mode
36+
37+
# Batch configuration
38+
batch_size: 10
39+
poll_interval: 5
40+
41+
# RPC method
42+
rpc_method: "getLedgers"
43+
44+
# Checkpointing
45+
checkpoint_dir: "/tmp/checkpoints/kwickbit-pubsub"
46+
checkpoint_interval: 100
47+
48+
processors:
49+
# Extract contract events from ledgers
50+
- type: ContractEvent
51+
config:
52+
# Filter for Kwickbit payment contract
53+
contract_id: "YOUR_KWICKBIT_CONTRACT_ID"
54+
55+
# Extract structured payment data from events
56+
- type: EventPaymentExtractor
57+
config:
58+
# Extract payment events from Kwickbit contract
59+
payment_contract_id: "YOUR_KWICKBIT_CONTRACT_ID"
60+
61+
consumers:
62+
# Publish to Google Pub/Sub in V2 format
63+
- type: PublishToGooglePubSubV2
64+
config:
65+
# Google Cloud Project ID
66+
project_id: "your-gcp-project-id"
67+
68+
# Pub/Sub topic name
69+
topic_id: "payments"
70+
71+
# Chain identifier (StellarMainnet or StellarTestnet)
72+
chain_identifier: "StellarTestnet"
73+
74+
# Authentication (choose one):
75+
76+
# Option 1: Credentials file path
77+
credentials_file: "/path/to/service-account-key.json"
78+
79+
# Option 2: Credentials JSON string
80+
# credentials_json: '{"type": "service_account", ...}'
81+
82+
# Option 3: Use environment variables
83+
# Set GCLOUD_PUBSUB_PUBLISHER_SERVICE_ACCOUNT_KEY or
84+
# GOOGLE_APPLICATION_CREDENTIALS environment variable
85+
86+
# Environment Variables:
87+
# - PUBSUB_PROJECT_ID: Override project_id config
88+
# - PUBSUB_EMULATOR_HOST: Use local Pub/Sub emulator (e.g., "localhost:8085")
89+
# - CHAIN_IDENTIFIER: Override chain_identifier (StellarMainnet or StellarTestnet)
90+
# - GCLOUD_PUBSUB_PUBLISHER_SERVICE_ACCOUNT_KEY: JSON credentials string
91+
# - GOOGLE_APPLICATION_CREDENTIALS: Path to credentials file
92+
93+
# Example using Pub/Sub Emulator (for local testing):
94+
# export PUBSUB_EMULATOR_HOST="localhost:8085"
95+
# export PUBSUB_PROJECT_ID="local-dev-project"
96+
# ./cdp-pipeline-workflow -config config/examples/kwickbit-payments-pubsub-v2.yaml
97+
98+
# Message Attributes Published:
99+
# - event_type: "payment"
100+
# - chain_identifier: "StellarMainnet" or "StellarTestnet"
101+
# - block_height: "12345"
102+
# - payment_id: "0x..."
103+
# - message_version: "v2"

config/examples/rpc-streaming-mode.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ pipelines:
2828
batch_size: 10 # Fetch 10 ledgers per batch
2929

3030
# Polling configuration
31-
poll_interval: 5 # Poll every 5 seconds when caught up (minimum 1s)
31+
poll_interval: 5 # REQUIRED: Poll every 5 seconds when caught up (minimum 1s)
3232

3333
# RPC method
3434
rpc_method: "getLedgers"

0 commit comments

Comments
 (0)