Commit ab192be

doc: example with kafka debezium (#8)
* doc: example with debezium
* fix: bigquery safe_cast timestamp parsing
1 parent aa38887 commit ab192be

File tree

3 files changed: +132 / -5 lines changed


bizon/connectors/destinations/bigquery_streaming/src/destination.py

Lines changed: 12 additions & 1 deletion
@@ -1,6 +1,7 @@
 import os
 import tempfile
 from concurrent.futures import ThreadPoolExecutor
+from datetime import datetime
 from typing import List, Tuple, Type

 import polars as pl
@@ -101,6 +102,16 @@ def append_rows_to_stream(
         response = write_client.append_rows(iter([request]))
         return response.code().name

+    def safe_cast_record_values(self, row: dict):
+        for col in self.config.record_schema:
+            if col.type in ["TIMESTAMP", "DATETIME"]:
+                if isinstance(row[col.name], int):
+                    if row[col.name] > datetime(9999, 12, 31).timestamp():
+                        row[col.name] = datetime.fromtimestamp(row[col.name] / 1_000_000).strftime("%Y-%m-%d %H:%M:%S.%f")
+                    else:
+                        row[col.name] = datetime.fromtimestamp(row[col.name]).strftime("%Y-%m-%d %H:%M:%S.%f")
+        return row
+
     @staticmethod
     def to_protobuf_serialization(TableRowClass: Type[Message], row: dict) -> bytes:
         """Convert a row to a Protobuf serialization."""
@@ -132,7 +143,7 @@ def load_to_bigquery_via_streaming(self, df_destination_records: pl.DataFrame) -
 
         if self.config.unnest:
             serialized_rows = [
-                self.to_protobuf_serialization(TableRowClass=TableRow, row=row)
+                self.to_protobuf_serialization(TableRowClass=TableRow, row=self.safe_cast_record_values(row))
                 for row in df_destination_records["source_data"].str.json_decode().to_list()
             ]
         else:
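The new safe_cast_record_values helper guards TIMESTAMP/DATETIME columns: any integer value larger than the year-9999 epoch in seconds is assumed to be a microsecond epoch (as Debezium typically emits) and is scaled down before formatting. A minimal standalone sketch of that heuristic, with a hypothetical helper name standing in for the method:

    from datetime import datetime

    # Integer epochs beyond year 9999 in seconds are treated as microseconds;
    # everything else is read as plain seconds.
    MAX_SECONDS_EPOCH = datetime(9999, 12, 31).timestamp()

    def cast_epoch_for_bigquery(value: int) -> str:  # hypothetical helper name
        if value > MAX_SECONDS_EPOCH:
            value = value / 1_000_000
        return datetime.fromtimestamp(value).strftime("%Y-%m-%d %H:%M:%S.%f")

    cast_epoch_for_bigquery(1_700_000_000_000_000)  # microsecond epoch
    cast_epoch_for_bigquery(1_700_000_000)          # second epoch, same string as above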
Lines changed: 112 additions & 0 deletions
@@ -0,0 +1,112 @@
+name: Kafka debezium messages to bigquery streaming
+
+source:
+  name: kafka
+  stream: topic
+
+  sync_mode: full_refresh
+
+  force_ignore_checkpoint: true
+
+  topic: <TOPIC_NAME>
+
+  nb_bytes_schema_id: 8
+
+  batch_size: 1000
+  consumer_timeout: 10
+  bootstrap_servers: <BOOTSTRAP_SERVERS>
+  group_id: <GROUP_ID>
+
+  authentication:
+    type: basic
+
+    schema_registry_url: <SCHEMA_REGISTRY_URL>
+    schema_registry_username: <SCHEMA_REGISTRY_USERNAME>
+    schema_registry_password: <SCHEMA_REGISTRY_PASSWORD>
+
+    params:
+      username: <USERNAME>
+      password: <PASSWORD>
+
+destination:
+  name: bigquery_streaming
+
+  config:
+    buffer_size: 50
+    bq_max_rows_per_request: 10000
+    buffer_flush_timeout: 30
+
+    table_id: <TABLE_ID>
+    dataset_id: <DATASET_ID>
+    dataset_location: US
+    project_id: <PROJECT_ID>
+
+    unnest: true
+
+    time_partitioning:
+      # Mandatory if unnested
+      field: __event_timestamp
+
+    record_schema:
+      - name: account_id
+        type: INTEGER
+        mode: REQUIRED
+
+      - name: team_id
+        type: INTEGER
+        mode: REQUIRED
+
+      - name: user_id
+        type: INTEGER
+        mode: REQUIRED
+
+      - name: __deleted
+        type: BOOLEAN
+        mode: NULLABLE
+
+      - name: __cluster
+        type: STRING
+        mode: NULLABLE
+
+      - name: __kafka_partition
+        type: INTEGER
+        mode: NULLABLE
+
+      - name: __kafka_offset
+        type: INTEGER
+        mode: NULLABLE
+
+      - name: __event_timestamp
+        type: TIMESTAMP
+        mode: NULLABLE
+
+transforms:
+  - label: debezium
+    python: |
+      from datetime import datetime
+
+      cluster = data['value']['source']['name'].replace('_', '-')
+      partition = data['partition']
+      offset = data['offset']
+
+      kafka_timestamp = datetime.utcfromtimestamp(data['value']['source']['ts_ms'] / 1000).strftime('%Y-%m-%d %H:%M:%S.%f')
+
+      deleted = False
+
+      if data['value']['op'] == 'd':
+          data = data['value']['before']
+          deleted = True
+      else:
+          data = data['value']['after']
+
+      data['__deleted'] = deleted
+      data['__cluster'] = cluster
+      data['__kafka_partition'] = partition
+      data['__kafka_offset'] = offset
+      data['__event_timestamp'] = kafka_timestamp
+
+engine:
+  queue:
+    type: python_queue
+    config:
+      max_nb_messages: 1000000
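The debezium transform above flattens the Debezium envelope into the columns declared in record_schema. As an illustration, here is the same logic applied to a made-up change event (field values are hypothetical; the structure follows the transform above):

    import json
    from datetime import datetime

    # Hypothetical decoded Kafka record, shaped like the `data` dict the transform receives.
    data = {
        "partition": 3,
        "offset": 42,
        "value": {
            "op": "u",
            "source": {"name": "prod_cluster", "ts_ms": 1700000000123},
            "before": None,
            "after": {"account_id": 1, "team_id": 7, "user_id": 99},
        },
    }

    cluster = data["value"]["source"]["name"].replace("_", "-")
    partition = data["partition"]
    offset = data["offset"]
    kafka_timestamp = datetime.utcfromtimestamp(data["value"]["source"]["ts_ms"] / 1000).strftime("%Y-%m-%d %H:%M:%S.%f")

    deleted = data["value"]["op"] == "d"
    data = data["value"]["before"] if deleted else data["value"]["after"]

    data["__deleted"] = deleted
    data["__cluster"] = cluster
    data["__kafka_partition"] = partition
    data["__kafka_offset"] = offset
    data["__event_timestamp"] = kafka_timestamp

    print(json.dumps(data))
    # {"account_id": 1, "team_id": 7, "user_id": 99, "__deleted": false,
    #  "__cluster": "prod-cluster", "__kafka_partition": 3, "__kafka_offset": 42,
    #  "__event_timestamp": "2023-11-14 22:13:20.123000"}

The resulting keys line up with the account_id/team_id/user_id columns plus the __-prefixed metadata columns in record_schema, which is what the unnest: true destination streams to BigQuery.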

bizon/transform/transform.py

Lines changed: 8 additions & 4 deletions
@@ -14,13 +14,16 @@ def __init__(self, transforms: list[TransformModel]):

    def apply_transforms(self, df_source_records: pl.DataFrame) -> pl.DataFrame:
        """Apply transformation on df_source_records"""
+
        # Process the transformations
        for transform in self.transforms:

            logger.debug(f"Applying transform {transform.label}")

            # Create a function to be executed in the desired context
-            def my_transform(data: dict) -> str:
+            def my_transform(data: str) -> str:
+
+                data = json.loads(data)

                # Start writing here
                local_vars = {"data": data}
@@ -33,9 +36,10 @@ def my_transform(data: dict) -> str:
                # Stop writing here
                return json.dumps(local_vars["data"])

-            transformed_source_records = [
-                my_transform(row) for row in df_source_records["data"].str.json_decode().to_list()
-            ]
+            transformed_source_records = []
+
+            for row in df_source_records["data"].to_list():
+                transformed_source_records.append(my_transform(row))

            df_source_records = df_source_records.with_columns(
                pl.Series("data", transformed_source_records, dtype=pl.String).alias("data")