Skip to content

Commit

Permalink
feat: add idempotencekey to ksql
Browse files Browse the repository at this point in the history
  • Loading branch information
pgallik committed Dec 8, 2023
1 parent 3b04ab2 commit 570c1eb
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
16 changes: 16 additions & 0 deletions .ksql/ALL_01_ADDRESS_SNAPSHOT_OSLO_STREAM_V2.ksql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
CREATE OR REPLACE STREAM IF NOT EXISTS address_snapshot_oslo_stream_v2 (
messageKey varchar KEY,
headers ARRAY<STRUCT<key STRING, value BYTES>> HEADERS,
`@context` varchar,
`@type` varchar,
identificator STRUCT<id varchar, naamruimte varchar, objectId varchar, versieId varchar>,
gemeente STRUCT<objectId varchar, detail varchar, gemeentenaam STRUCT<geografischenaam STRUCT<spelling varchar, taal varchar>>>,
postinfo STRUCT<objectId varchar, detail varchar>,
straatnaam STRUCT<objectId varchar, detail varchar, straatnaam STRUCT<geografischeNaam STRUCT<spelling varchar, taal varchar>>>,
huisnummer varchar,
busnummer varchar,
volledigAdres STRUCT<geografischeNaam STRUCT<spelling varchar, taal varchar>>,
adresPositie STRUCT<geometrie STRUCT<type varchar, gml varchar>, positieGeometrieMethode varchar, positieSpecificatie varchar>,
adresStatus varchar,
officieelToegekend boolean)
WITH (KAFKA_TOPIC='address.snapshot.oslo', VALUE_FORMAT='JSON');
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ WITH (KAFKA_TOPIC='address.snapshot.oslo.flatten.integrationdb', PARTITIONS=1, V
AS
SELECT
CAST(REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (s,x) => x) AS INT) PersistentLocalId,

CAST(FROM_BYTES(FILTER(headers, (x) => (x->key = 'IdempotenceKey'))[1]->VALUE, 'utf8') AS BIGINT) as IdempotenceKey,

IFNULL(GEMEENTE->OBJECTID, '') as "NisCode",
IFNULL(POSTINFO->OBJECTID, '') as "PostalCode",
CAST(IFNULL(STRAATNAAM->OBJECTID, '') AS INT) as "StreetNamePersistentLocalId",
Expand All @@ -22,5 +23,5 @@ SELECT
PARSE_TIMESTAMP(IDENTIFICATOR->VERSIEID, 'yyyy-MM-dd''T''HH:mm:ssXXX', 'UTC') as "VersionTimestamp",
CASE WHEN IDENTIFICATOR->ID is null THEN TRUE ELSE FALSE END as "IsRemoved"

FROM ADDRESS_SNAPSHOT_OSLO_STREAM
FROM ADDRESS_SNAPSHOT_OSLO_STREAM_V2
PARTITION BY CAST(REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (s,x) => x) AS INT);

0 comments on commit 570c1eb

Please sign in to comment.