diff --git a/.ksql/INTEGRATIONDB/INTEGRATIONDB_ADDRESS_CONNECTOR.sql b/.ksql/INTEGRATIONDB/INTEGRATIONDB_ADDRESS_CONNECTOR.sql new file mode 100644 index 000000000..10e9ceb30 --- /dev/null +++ b/.ksql/INTEGRATIONDB/INTEGRATIONDB_ADDRESS_CONNECTOR.sql @@ -0,0 +1,28 @@ +CREATE SINK CONNECTOR `AddressIntegrationDbConnector` with ( + "topics"= 'address.snapshot.oslo.flatten.integrationdb', + "input.data.format"= 'JSON_SR', + "input.key.format"= 'JSON_SR', + "delete.enabled"= false, + "connector.class"= 'PostgresSink', + "name"= 'AddressIntegrationDbConnector', + "kafka.auth.mode"= 'KAFKA_API_KEY', + "kafka.api.key"= '***', --clear value + "kafka.api.secret"= '***', --clear value + "connection.host"= '***', --clear value + "connection.port"= '5432', + "connection.user"= '***', --clear value + "connection.password"= '***', --clear value + "db.name"= 'postgres', + "ssl.mode"= 'require', + "insert.mode"= 'UPSERT', + "table.name.format"= 'Integration.Address', + "table.types"= 'TABLE', + "db.timezone"= 'UTC', + "pk.mode"= 'record_key', + "pk.fields"= 'PersistentLocalId', + "auto.create"= false, + "auto.evolve"= false, + "quote.sql.identifiers"= 'ALWAYS', + "batch.sizes"= 3000, + "tasks.max"= 1 + ); diff --git a/.ksql/INTEGRATIONDB/INTEGRATIONDB_ADDRESS_SNAPSHOT_OSLO_STREAM_FLATTEN.sql b/.ksql/INTEGRATIONDB/INTEGRATIONDB_ADDRESS_SNAPSHOT_OSLO_STREAM_FLATTEN.sql new file mode 100644 index 000000000..49ba42cae --- /dev/null +++ b/.ksql/INTEGRATIONDB/INTEGRATIONDB_ADDRESS_SNAPSHOT_OSLO_STREAM_FLATTEN.sql @@ -0,0 +1,26 @@ +CREATE OR REPLACE STREAM IF NOT EXISTS ADDRESS_SNAPSHOT_OSLO_STREAM_FLATTEN_INTEGRATIONDB +WITH (KAFKA_TOPIC='address.snapshot.oslo.flatten.integrationdb', PARTITIONS=1, VALUE_FORMAT='JSON_SR', KEY_FORMAT='JSON_SR') +AS +SELECT + CAST(REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (s,x) => x) AS INT) PersistentLocalId, + + IFNULL(GEMEENTE->OBJECTID, '') as "NisCode", + IFNULL(POSTINFO->OBJECTID, '') as "PostalCode", + CAST(IFNULL(STRAATNAAM->OBJECTID, '') AS INT) as "StreetNamePersistentLocalId", + ADRESSTATUS as "Status", + HUISNUMMER as "HouseNumber", + BUSNUMMER as "BoxNumber", + VOLLEDIGADRES->GEOGRAFISCHENAAM->SPELLING as "FullName", + ADRESPOSITIE->GEOMETRIE->GML as "GeometryGml", + ADRESPOSITIE->POSITIEGEOMETRIEMETHODE as "PositionMethod", + ADRESPOSITIE->POSITIESPECIFICATIE as "PositionSpecification", + OFFICIEELTOEGEKEND as "IsOfficiallyAssigned", + + IDENTIFICATOR->ID as "PuriId", + IDENTIFICATOR->NAAMRUIMTE as "Namespace", + IDENTIFICATOR->VERSIEID as "VersionString", + PARSE_TIMESTAMP("VersionString", 'yyyy-MM-dd''T''HH:mm:ssXXX', 'UTC') as "VersionTimestamp", + CASE WHEN IDENTIFICATOR->ID is null THEN TRUE ELSE FALSE END as "IsRemoved" + +FROM ADDRESS_SNAPSHOT_OSLO_STREAM +PARTITION BY CAST(REDUCE(SPLIT(URL_EXTRACT_PATH(MESSAGEKEY), '/'), '', (s,x) => x) AS INT); \ No newline at end of file