From 6e89b36a12541c323b591fc204179b5fbd0e642a Mon Sep 17 00:00:00 2001
From: Antoine Balliet
Date: Mon, 23 Dec 2024 16:34:05 +0100
Subject: [PATCH] doc: example with debezium

---
 .../kafka/config/kafka_debezium.example.yml   | 112 ++++++++++++++++++
 bizon/transform/transform.py                  |  12 +-
 2 files changed, 120 insertions(+), 4 deletions(-)
 create mode 100644 bizon/connectors/sources/kafka/config/kafka_debezium.example.yml

diff --git a/bizon/connectors/sources/kafka/config/kafka_debezium.example.yml b/bizon/connectors/sources/kafka/config/kafka_debezium.example.yml
new file mode 100644
index 0000000..8b2b518
--- /dev/null
+++ b/bizon/connectors/sources/kafka/config/kafka_debezium.example.yml
@@ -0,0 +1,112 @@
+name: Kafka debezium messages to bigquery streaming
+
+source:
+  name: kafka
+  stream: topic
+
+  sync_mode: full_refresh
+
+  force_ignore_checkpoint: true
+
+  topic:
+
+  nb_bytes_schema_id: 8
+
+  batch_size: 1000
+  consumer_timeout: 10
+  bootstrap_servers:
+  group_id:
+
+  authentication:
+    type: basic
+
+    schema_registry_url:
+    schema_registry_username:
+    schema_registry_password:
+
+    params:
+      username:
+      password:
+
+destination:
+  name: bigquery_streaming
+
+  config:
+    buffer_size: 50
+    bq_max_rows_per_request: 10000
+    buffer_flush_timeout: 30
+
+    table_id:
+    dataset_id:
+    dataset_location: US
+    project_id:
+
+    unnest: true
+
+    time_partitioning:
+      # Mandatory if unnested
+      field: __event_timestamp
+
+    record_schema:
+      - name: account_id
+        type: INTEGER
+        mode: REQUIRED
+
+      - name: team_id
+        type: INTEGER
+        mode: REQUIRED
+
+      - name: user_id
+        type: INTEGER
+        mode: REQUIRED
+
+      - name: __deleted
+        type: BOOLEAN
+        mode: NULLABLE
+
+      - name: __cluster
+        type: STRING
+        mode: NULLABLE
+
+      - name: __kafka_partition
+        type: INTEGER
+        mode: NULLABLE
+
+      - name: __kafka_offset
+        type: INTEGER
+        mode: NULLABLE
+
+      - name: __event_timestamp
+        type: TIMESTAMP
+        mode: NULLABLE
+
+transforms:
+- label: debezium
+  python: |
+    from datetime import datetime
+
+    cluster = data['value']['source']['name'].replace('_', '-')
+    partition = data['partition']
+    offset = data['offset']
+
+    kafka_timestamp = datetime.utcfromtimestamp(data['value']['source']['ts_ms'] / 1000).strftime('%Y-%m-%d %H:%M:%S.%f')
+
+    deleted = False
+
+    if data['value']['op'] == 'd':
+        data = data['value']['before']
+        deleted = True
+    else:
+        data = data['value']['after']
+
+    data['__deleted'] = deleted
+    data['__cluster'] = cluster
+    data['__kafka_partition'] = partition
+    data['__kafka_offset'] = offset
+    data['__event_timestamp'] = kafka_timestamp
+
+engine:
+  queue:
+    type: python_queue
+    config:
+      max_nb_messages: 1000000
\ No newline at end of file
diff --git a/bizon/transform/transform.py b/bizon/transform/transform.py
index 4deacbd..e41c8c3 100644
--- a/bizon/transform/transform.py
+++ b/bizon/transform/transform.py
@@ -14,13 +14,16 @@ def __init__(self, transforms: list[TransformModel]):
 
     def apply_transforms(self, df_source_records: pl.DataFrame) -> pl.DataFrame:
         """Apply transformation on df_source_records"""
 
+        # Process the transformations
         for transform in self.transforms:
 
             logger.debug(f"Applying transform {transform.label}")
 
             # Create a function to be executed in the desired context
-            def my_transform(data: dict) -> str:
+            def my_transform(data: str) -> str:
+
+                data = json.loads(data)
 
                 # Start writing here
                 local_vars = {"data": data}
@@ -33,9 +36,10 @@ def my_transform(data: dict) -> str:
                 # Stop writing here
                 return json.dumps(local_vars["data"])
 
-            transformed_source_records = [
-                my_transform(row) for row in df_source_records["data"].str.json_decode().to_list()
-            ]
+            transformed_source_records = []
+
+            for row in df_source_records["data"].to_list():
+                transformed_source_records.append(my_transform(row))
 
             df_source_records = df_source_records.with_columns(
                 pl.Series("data", transformed_source_records, dtype=pl.String).alias("data")
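Note (not part of the patch): below is a minimal sketch of what the "debezium" transform in the YAML example does to a single record. The sample envelope is hypothetical and only assumes the standard Debezium fields (op, before, after, source.name, source.ts_ms) plus the partition and offset keys the Kafka source is expected to attach.

import json
from datetime import datetime

# Hypothetical Debezium envelope, as the transform would receive it in `data`.
data = {
    "partition": 3,
    "offset": 42917,
    "value": {
        "op": "u",  # 'd' would route through the 'before' image instead
        "before": None,
        "after": {"account_id": 1, "team_id": 7, "user_id": 99},
        "source": {"name": "prod_cluster", "ts_ms": 1703343245000},
    },
}

# Same steps as the `python: |` block in the YAML example above.
cluster = data["value"]["source"]["name"].replace("_", "-")
partition = data["partition"]
offset = data["offset"]
kafka_timestamp = datetime.utcfromtimestamp(
    data["value"]["source"]["ts_ms"] / 1000
).strftime("%Y-%m-%d %H:%M:%S.%f")

deleted = False
if data["value"]["op"] == "d":
    data = data["value"]["before"]
    deleted = True
else:
    data = data["value"]["after"]

data["__deleted"] = deleted
data["__cluster"] = cluster
data["__kafka_partition"] = partition
data["__kafka_offset"] = offset
data["__event_timestamp"] = kafka_timestamp

print(json.dumps(data, indent=2))
# The result keeps account_id/team_id/user_id and adds the __deleted, __cluster,
# __kafka_partition, __kafka_offset and __event_timestamp fields declared in
# record_schema, which bigquery_streaming then unnests into the target table.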