Skip to content

Commit

Permalink
added new alert core services to jupyter test
Browse files Browse the repository at this point in the history
  • Loading branch information
wagmarcel committed Nov 18, 2024
1 parent 027aaf7 commit af7704b
Showing 1 changed file with 337 additions and 27 deletions.
364 changes: 337 additions & 27 deletions semantic-model/shacl2flink/debug/notebooks/core-services.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,33 @@
"outputs": [],
"source": [
"%%flink_gateway_sql_query\n",
"/* aggregates entities from attributes topic and insert it every 2 seconds in update channel\n",
" /* aggregates entities from attributes topic and insert it every 2 seconds in update channel\n",
" part of CORE SERVICES */\n",
"-- insert into ngsild_updates\n",
"select 'update', true, true, '[{\"id\": \"' || id || '\",' || LISTAGG(attributeagg) || '}]'\n",
" from (\n",
" select window_start, window_end, B.entityId as id, '\"' || B.name || '\":[' || LISTAGG(B.indexagg) || ']' as attributeagg\n",
" from(\n",
" select window_start, window_end, entityId, name,\n",
" '{\"observedAt\": \"' || LAST_VALUE(DATE_FORMAT(A.`ts`, 'yyyy-MM-dd''T''HH:mm:ss.') || CAST(EXTRACT(MILLISECOND from A.`ts`) as STRING)) || 'Z\", \"type\": \"'\n",
" || LAST_VALUE(A.`type`)\n",
" || '\", \"datasetId\": \"'\n",
" || IF(A.`https://uri.etsi.org/ngsi-ld/datasetId` IS NOT NULL, A.`https://uri.etsi.org/ngsi-ld/datasetId`, '@none')\n",
" || IF(LAST_VALUE(A.`type`) = 'https://uri.etsi.org/ngsi-ld/Relationship','\", \"object\": \"', '\", \"value\": ')\n",
" || IF(LAST_VALUE(A.`type`) = 'https://uri.etsi.org/ngsi-ld/Relationship', LAST_VALUE(A.`https://uri.etsi.org/ngsi-ld/hasObject`) || '\"',\n",
" \n",
" IF((LAST_VALUE(A.nodeType) <> '@json' AND LAST_VALUE(A.nodeType) <> '@id') OR LAST_VALUE(A.nodeType) IS NULL, '\"' || LAST_VALUE(A.`https://uri.etsi.org/ngsi-ld/hasValue`) || '\"',\n",
" /* @id needs encoding with \"@id\": value */\n",
" IF(LAST_VALUE(A.nodeType) = '@id', '{\"@id\": \"' || LAST_VALUE(A.`https://uri.etsi.org/ngsi-ld/hasValue`) || '\"}',\n",
" /* @json is put directly into value field */\n",
" IF(LAST_VALUE(A.nodeType) = '@json', LAST_VALUE(A.`https://uri.etsi.org/ngsi-ld/hasValue`), 'null' /*should not happen */)))\n",
" )\n",
" || '}' as indexagg\n",
" FROM TABLE( TUMBLE(TABLE attributes, DESCRIPTOR(ts), INTERVAL {{.Values.flink.ngsildUpdateWindow|squote}} second)) as A\n",
" WHERE A.entityId IS NOT NULL\n",
" GROUP BY A.entityId, A.index, A.`https://uri.etsi.org/ngsi-ld/datasetId`, A.name, window_start, window_end) as B\n",
" GROUP BY window_start, window_end, entityId, name\n",
" ) GROUP BY window_start, window_end, id;"
" --insert into ngsild_updates\n",
" select 'update', true, true, '[{\"id\": \"' || id || '\",' || LISTAGG(attributeagg) || '}]'\n",
" from (\n",
" select window_start, window_end, B.entityId as id, '\"' || B.name || '\":[' || LISTAGG(B.indexagg) || ']' as attributeagg\n",
" from(\n",
" select window_start, window_end, entityId, name, `datasetId`,\n",
" '{\"observedAt\": \"' || LAST_VALUE(DATE_FORMAT(A.`ts`, 'yyyy-MM-dd''T''HH:mm:ss.') || CAST(EXTRACT(MILLISECOND from A.`ts`) as STRING)) || 'Z\", \"type\": \"'\n",
" || LAST_VALUE(A.`type`)\n",
" || '\", \"datasetId\": \"'\n",
" || IF(A.`datasetId` IS NOT NULL, A.`datasetId`, '@none')\n",
" || IF(LAST_VALUE(A.`type`) = 'https://uri.etsi.org/ngsi-ld/Relationship','\", \"object\": \"', '\", \"value\": ')\n",
" || IF(LAST_VALUE(A.`type`) = 'https://uri.etsi.org/ngsi-ld/Relationship', LAST_VALUE(A.`attributeValue`) || '\"',\n",
" \n",
" IF((LAST_VALUE(A.nodeType) <> '@json' AND LAST_VALUE(A.nodeType) <> '@id') OR LAST_VALUE(A.nodeType) IS NULL, '\"' || LAST_VALUE(A.`attributeValue`) || '\"',\n",
" /* @id needs encoding with \"@id\": value */\n",
" IF(LAST_VALUE(A.nodeType) = '@id', '{\"@id\": \"' || LAST_VALUE(A.`attributeValue`) || '\"}',\n",
" /* @json is put directly into value field */\n",
" IF(LAST_VALUE(A.nodeType) = '@json', LAST_VALUE(A.`attributeValue`), 'null' /*should not happen */)))\n",
" )\n",
" || '}' as indexagg\n",
" FROM TABLE( TUMBLE(TABLE attributes, DESCRIPTOR(ts), INTERVAL {{.Values.flink.ngsildUpdateWindow|squote}} second)) as A\n",
" WHERE A.entityId IS NOT NULL and (A.deleted IS NULL or A.deleted = false) and (A.synched IS NULL or A.synched = false)\n",
" GROUP BY A.id, A.entityId, A.name, A.`datasetId`, window_start, window_end) as B\n",
" GROUP BY window_start, window_end, B.entityId, B.name\n",
" ) GROUP BY window_start, window_end, id;"
]
},
{
Expand All @@ -64,7 +64,317 @@
"id": "fb5f3608-2af3-43d8-ba14-c77f17549714",
"metadata": {},
"outputs": [],
"source": []
"source": [
"%%flink_gateway_sql_query\n",
"\n",
"select window_start, window_end, B.entityId as id, '\"' || B.name || '\":[' || LISTAGG(B.indexagg) || ']' as attributeagg\n",
" from(\n",
" select window_start, window_end, entityId, name, `datasetId`,\n",
" '{\"observedAt\": \"' || LAST_VALUE(DATE_FORMAT(A.`ts`, 'yyyy-MM-dd''T''HH:mm:ss.') || CAST(EXTRACT(MILLISECOND from A.`ts`) as STRING)) || 'Z\", \"type\": \"'\n",
" || LAST_VALUE(A.`type`)\n",
" || '\", \"datasetId\": \"'\n",
" || IF(A.`datasetId` IS NOT NULL, A.`datasetId`, '@none')\n",
" || IF(LAST_VALUE(A.`type`) = 'https://uri.etsi.org/ngsi-ld/Relationship','\", \"object\": \"', '\", \"value\": ')\n",
" || IF(LAST_VALUE(A.`type`) = 'https://uri.etsi.org/ngsi-ld/Relationship', LAST_VALUE(A.`attributeValue`) || '\"',\n",
" \n",
" IF((LAST_VALUE(A.nodeType) <> '@json' AND LAST_VALUE(A.nodeType) <> '@id') OR LAST_VALUE(A.nodeType) IS NULL, '\"' || LAST_VALUE(A.`attributeValue`) || '\"',\n",
" /* @id needs encoding with \"@id\": value */\n",
" IF(LAST_VALUE(A.nodeType) = '@id', '{\"@id\": \"' || LAST_VALUE(A.`attributeValue`) || '\"}',\n",
" /* @json is put directly into value field */\n",
" IF(LAST_VALUE(A.nodeType) = '@json', LAST_VALUE(A.`attributeValue`), 'null' /*should not happen */)))\n",
" )\n",
" || '}' as indexagg\n",
" FROM TABLE( TUMBLE(TABLE attributes, DESCRIPTOR(ts), INTERVAL '0.001' second)) as A\n",
" WHERE A.entityId IS NOT NULL and (A.deleted IS NULL or A.deleted = false) and (A.synched IS NULL or A.synched = false)\n",
" GROUP BY A.id, A.entityId, A.name, A.`datasetId`, window_start, window_end) as B\n",
" GROUP BY window_start, window_end, B.entityId, B.name;"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "07cb1f25-9514-45c1-8be1-a5be99d08e42",
"metadata": {},
"outputs": [],
"source": [
"%%flink_gateway_sql_prepare\n",
"DROP TABLE IF EXISTS `alerts`;\n",
"CREATE TABLE `alerts` (\n",
" `resource` STRING,\n",
" `event` STRING,\n",
" `environment` STRING,\n",
" `service` ARRAY < STRING >,\n",
" `severity` STRING,\n",
" `customer` STRING,\n",
" `text` STRING,\n",
" PRIMARY KEY (resource, event) NOT ENFORCED\n",
") WITH (\n",
" 'connector' = 'upsert-kafka',\n",
" 'value.format' = 'json',\n",
" 'value.json.fail-on-missing-field' = 'False',\n",
" 'value.json.ignore-parse-errors' = 'True',\n",
" 'key.format' = 'json',\n",
" 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',\n",
" 'topic' = 'iff.alerts'\n",
");\n",
"DROP TABLE IF EXISTS `alerts`;\n",
"CREATE TABLE `alerts` (\n",
" `resource` STRING,\n",
" `event` STRING,\n",
" `environment` STRING,\n",
" `service` ARRAY < STRING >,\n",
" `severity` STRING,\n",
" `customer` STRING,\n",
" `text` STRING\n",
") WITH (\n",
" 'connector' = 'kafka',\n",
" 'format' = 'json',\n",
" 'json.fail-on-missing-field' = 'False',\n",
" 'json.ignore-parse-errors' = 'True',\n",
" 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',\n",
" 'scan.startup.mode' = 'latest-offset',\n",
" 'topic' = 'iff.alerts'\n",
");\n",
"DROP TABLE IF EXISTS `alerts_bulk`;\n",
"CREATE TABLE `alerts_bulk` (\n",
" `resource` STRING,\n",
" `event` STRING,\n",
" `environment` STRING,\n",
" `service` ARRAY < STRING >,\n",
" `severity` STRING,\n",
" `customer` STRING,\n",
" `text` STRING,\n",
" watermark FOR ts AS ts - INTERVAL '0.001' SECONDS,\n",
" `ts` TIMESTAMP(3) METADATA\n",
" FROM\n",
" 'timestamp' VIRTUAL,\n",
" PRIMARY KEY (resource, event) NOT ENFORCED\n",
") WITH (\n",
" 'connector' = 'upsert-kafka',\n",
" 'value.format' = 'json',\n",
" 'value.json.fail-on-missing-field' = 'False',\n",
" 'value.json.ignore-parse-errors' = 'True',\n",
" 'key.format' = 'json',\n",
" 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',\n",
" 'topic' = 'iff.alerts.bulk'\n",
");\n",
"DROP TABLE IF EXISTS `attributes`;\n",
" CREATE TABLE `attributes` (\n",
" `id` STRING,\n",
" `entityId` STRING,\n",
" `name` STRING,\n",
" `nodeType` STRING,\n",
" `valueType` STRING,\n",
" `index` INTEGER,\n",
" `type` STRING,\n",
" `https://uri.etsi.org/ngsi-ld/datasetId` STRING,\n",
" `https://uri.etsi.org/ngsi-ld/hasValue` STRING,\n",
" `https://uri.etsi.org/ngsi-ld/hasObject` STRING,\n",
" `ts` TIMESTAMP(3) METADATA\n",
" FROM\n",
" 'timestamp' VIRTUAL,\n",
" watermark FOR ts AS ts\n",
") WITH (\n",
" 'connector' = 'kafka',\n",
" 'format' = 'json',\n",
" 'json.fail-on-missing-field' = 'False',\n",
" 'json.ignore-parse-errors' = 'True',\n",
" 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',\n",
" 'scan.startup.mode' = 'latest-offset',\n",
" 'topic' = 'iff.ngsild.attributes'\n",
");\n",
"DROP TABLE IF EXISTS `attributes_writeback`;\n",
" CREATE TABLE `attributes_writeback` (\n",
" `id` STRING,\n",
" `entityId` STRING,\n",
" `name` STRING,\n",
" `nodeType` STRING,\n",
" `valueType` STRING,\n",
" `index` INTEGER,\n",
" `type` STRING,\n",
" `https://uri.etsi.org/ngsi-ld/datasetId` STRING,\n",
" `https://uri.etsi.org/ngsi-ld/hasValue` STRING,\n",
" `https://uri.etsi.org/ngsi-ld/hasObject` STRING,\n",
" PRIMARY KEY (id, index) NOT ENFORCED\n",
") WITH (\n",
" 'connector' = 'upsert-kafka',\n",
" 'value.format' = 'json',\n",
" 'value.json.fail-on-missing-field' = 'False',\n",
" 'value.json.ignore-parse-errors' = 'True',\n",
" 'key.format' = 'json',\n",
" 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',\n",
" 'topic' = 'iff.ngsild.attributes'\n",
");\n",
"DROP TABLE IF EXISTS `attributes_insert`;\n",
" CREATE TABLE `attributes_insert` (\n",
" `id` STRING,\n",
" `entityId` STRING,\n",
" `name` STRING,\n",
" `nodeType` STRING,\n",
" `valueType` STRING,\n",
" `index` INTEGER,\n",
" `type` STRING,\n",
" `https://uri.etsi.org/ngsi-ld/datasetId` STRING,\n",
" `https://uri.etsi.org/ngsi-ld/hasValue` STRING,\n",
" `https://uri.etsi.org/ngsi-ld/hasObject` STRING,\n",
" `ts` TIMESTAMP(3) METADATA\n",
" FROM\n",
" 'timestamp' VIRTUAL,\n",
" watermark FOR `ts` AS `ts`,\n",
" PRIMARY KEY (id, index) NOT ENFORCED\n",
") WITH (\n",
" 'connector' = 'upsert-kafka',\n",
" 'value.format' = 'json',\n",
" 'value.json.fail-on-missing-field' = 'False',\n",
" 'value.json.ignore-parse-errors' = 'True',\n",
" 'key.format' = 'json',\n",
" 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',\n",
" 'topic' = 'iff.ngsild.attributes_insert'\n",
");\n",
"DROP TABLE IF EXISTS `attributes_insert_filter`;\n",
" CREATE TABLE `attributes_insert_filter` (\n",
" `id` STRING,\n",
" `entityId` STRING,\n",
" `name` STRING,\n",
" `nodeType` STRING,\n",
" `valueType` STRING,\n",
" `index` INTEGER,\n",
" `type` STRING,\n",
" `https://uri.etsi.org/ngsi-ld/datasetId` STRING,\n",
" `https://uri.etsi.org/ngsi-ld/hasValue` STRING,\n",
" `https://uri.etsi.org/ngsi-ld/hasObject` STRING,\n",
" `ts` TIMESTAMP(3) METADATA\n",
" FROM\n",
" 'timestamp',\n",
" watermark FOR `ts` AS `ts`\n",
") WITH (\n",
" 'connector' = 'kafka',\n",
" 'format' = 'json',\n",
" 'json.fail-on-missing-field' = 'False',\n",
" 'json.ignore-parse-errors' = 'True',\n",
" 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',\n",
" 'scan.startup.mode' = 'latest-offset',\n",
" 'topic' = 'iff.ngsild.attributes_insert'\n",
");\n",
"DROP TABLE IF EXISTS `ngsild_updates`;\n",
" CREATE TABLE `ngsild_updates` (\n",
" `op` STRING,\n",
" `overwriteOrReplace` Boolean,\n",
" `noForward` Boolean,\n",
" `entities` STRING\n",
") WITH (\n",
" 'connector' = 'kafka',\n",
" 'format' = 'json',\n",
" 'json.fail-on-missing-field' = 'False',\n",
" 'json.ignore-parse-errors' = 'True',\n",
" 'properties.bootstrap.servers' = 'my-cluster-kafka-bootstrap:9092',\n",
" 'scan.startup.mode' = 'latest-offset',\n",
" 'topic' = 'iff.ngsild-updates'\n",
");\n",
"DROP VIEW IF EXISTS `attributes_insert_view`;\n",
"CREATE VIEW `attributes_insert_view` AS\n",
"SELECT\n",
" id,\n",
" entityId,\n",
" name,\n",
" nodeType,\n",
" valueType,\n",
" index,\n",
" `type`,\n",
" 'https://uri.etsi.org/ngsi-ld/datasetId',\n",
" `https://uri.etsi.org/ngsi-ld/hasValue`,\n",
" `https://uri.etsi.org/ngsi-ld/hasObject`,\n",
" `ts` FROM (\n",
" SELECT\n",
" *,\n",
" ROW_NUMBER() OVER (\n",
" PARTITION BY `id`,\n",
" `index` \n",
" ORDER BY\n",
" ts DESC\n",
" ) AS rownum \n",
" FROM\n",
" `attributes_insert_filter`\n",
" ) \n",
"WHERE\n",
" rownum = 1\n",
" and entityId is NOT NULL;"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "80ec8cdd-cbe9-4fb0-bff1-4c64717a18bc",
"metadata": {},
"outputs": [],
"source": [
"%flink_sql_replace -s '{{{{.Values.flink.alertWindow | squote}}}}' -r \"'0.001'\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9c4e1396-99c4-4749-b88d-6bd25ff26496",
"metadata": {},
"outputs": [],
"source": [
"%%flink_gateway_sql_query\n",
" \n",
"WITH WindowedAlerts AS (\n",
" SELECT \n",
" `resource`, \n",
" `event`,\n",
" `environment`,\n",
" `service`,\n",
" `customer`,\n",
" LAST_VALUE(severity) AS `severity`,\n",
" LAST_VALUE(`text`) AS `text`,\n",
" TUMBLE_START(`ts`, INTERVAL '0.001' SECOND) AS window_start,\n",
" TUMBLE_END(`ts`, INTERVAL '0.001' SECOND) AS window_end,\n",
" TUMBLE_ROWTIME(`ts`, INTERVAL '0.001' SECOND) AS window_time\n",
" FROM TABLE(\n",
" TUMBLE(TABLE alerts_bulk, DESCRIPTOR(ts), INTERVAL '0.001' SECOND)\n",
" )\n",
" GROUP BY \n",
" `resource`, \n",
" `event`, \n",
" `service`, \n",
" `customer`, \n",
" `environment`, \n",
" TUMBLE(`ts`, INTERVAL '0.001' SECOND)\n",
"),\n",
"AlertsWithPreviousSeverity AS (\n",
" SELECT\n",
" `resource`,\n",
" `event`,\n",
" `environment`,\n",
" `service`,\n",
" `customer`,\n",
" `severity`,\n",
" `text`,\n",
" `window_end`,\n",
" `window_time`,\n",
" LAG(`severity`) OVER (\n",
" PARTITION BY `resource`, `event`, `service`, `customer`, `environment`\n",
" ORDER BY `window_time`\n",
" ) AS prev_severity\n",
" FROM WindowedAlerts\n",
")\n",
"SELECT \n",
" `resource`,\n",
" `event`,\n",
" `environment`,\n",
" `service`,\n",
" `severity`,\n",
" `customer`,\n",
" `text`\n",
"FROM \n",
" AlertsWithPreviousSeverity\n",
"WHERE \n",
" prev_severity IS NULL OR severity <> prev_severity;\n",
"\n",
"\n"
]
}
],
"metadata": {
Expand Down

0 comments on commit af7704b

Please sign in to comment.