The API bridges write data to a Kafka topic, Azure Event Hubs, or Fabric Event Streams. From Event Hubs and Event Streams, you can easily feed this data into a Fabric Eventhouse or an Azure Data Explorer (Kusto) database to analyze and visualize it.
This document explains how to set up a Kusto database and ingest data from the API bridges. The principles are the same for all the bridges.
First, you need access to Microsoft Fabric. If you don't have access yet, you can easily set up a trial account for yourself by following the instructions in the Microsoft Fabric documentation.
To set up a KQL database in Microsoft Fabric, follow these steps:
- Create an Eventhouse or reuse an existing one.
- Create a KQL database in the Eventhouse.
Once you have set up the KQL database, collect these two pieces of information:
- The `databaseName` of the KQL database.
- The `clusterUri` of the Eventhouse. You can find this in the Eventhouse overview in the Microsoft Fabric portal under "Eventhouse details".
First, you need an Azure subscription. If you don't have one yet, you can easily set up a trial account for yourself by following the instructions in the Azure documentation.
To set up a KQL database in Azure Data Explorer (Kusto), follow the steps in the Create a Kusto cluster and database article.
Once you have set up the KQL database, collect these two pieces of information:
- The `databaseName` of the KQL database.
- The `clusterUri` of the Kusto cluster. You can find this in the Azure portal under "Overview" in the Azure Data Explorer (Kusto) resource. It's the value of the "URI" field.
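For illustration only: assuming a hypothetical cluster named `mycluster` in the West Europe region and a database named `bridgedata`, the two values would look like this:

```
databaseName: bridgedata
clusterUri:   https://mycluster.westeurope.kusto.windows.net
```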
The KQL scripts in the projects contain the necessary table schemas, materialized views, and update policies to organize the event data in the KQL database. Consider this your "bronze" layer.
The scripts are located here:
- GTFS: gtfs.kql
- NOAA: noaa.kql
- PegelOnline: pegelonline.kql
- RSS: rss.kql
You can run the scripts one by one in the Kusto Query Language (KQL) editor in the Fabric or Azure database portal, or you can use the `Kusto.Cli` tool, for which you will find two helper scripts in the `tools` directory.
To install the Kusto CLI tool, run the following command:
```
.\tools\install-kusto-cli.ps1
```
The script downloads the Kusto CLI tool, installs it into the `KustoCLI` directory of your user profile, and adds the `tools` directory inside it to the user's PATH. If you run the script from Windows PowerShell, the .NET 4.7 version is installed; if you run it from PowerShell 7+, the .NET 6.0 version is installed.
To run the KQL script, use the following command:
```
.\tools\run-kql-script.ps1 -clusterUri '<clusterUri>' -databaseName '<databaseName>' -script .\gtfs\kql\gtfs.kql
```
Replace `<clusterUri>` and `<databaseName>` with the values you collected earlier.
This assumes that you run the script from the root of the repository. If you run it from a different location, adjust the path to the script accordingly.
The script creates a table for each payload schema of every event that is sent by the bridge. The table contains the "top level" fields of the payload schema flattened into columns. If a payload field is an object or an array, the column type is `dynamic`. The CloudEvents metadata is stored in the table as well, with each attribute name prefixed with `___`.
Example:
```
.create-merge table [Trips] (
[routeId]: string,
[serviceDates]: dynamic,
[serviceExceptions]: dynamic,
[tripId]: string,
[tripHeadsign]: string,
[tripShortName]: string,
[directionId]: string,
[blockId]: string,
[shapeId]: string,
[wheelchairAccessible]: string,
[bikesAllowed]: string,
[___type]: string,
[___source]: string,
[___id]: string,
[___time]: datetime,
[___subject]: string
);
```
The script also creates a materialized view for each payload schema that gives you the latest event for each unique combination of the CloudEvents `___type`, `___source`, and `___subject`. This is useful for querying the latest state of each entity. The bridge will generally populate the `___subject` with a key value for the entity (the `tripId` in the example above), so you can easily query the latest state and avoid reading duplicates in the form of older versions.
```
.create materialized-view with (backfill=true) TripsLatest on table Trips {
Trips | summarize arg_max(___time, *) by ___type, ___source, ___subject
}
```
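As a usage example, a query like the following sketch reads the current state of up to ten trips from the view; the projected columns are from the `Trips` schema shown above:

```
TripsLatest
| project tripId, routeId, tripHeadsign, directionId, ___time
| take 10
```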
You will also find that the input schema's descriptive metadata is added to the table as table and column docstrings. Inside the docstrings, you may find two fields:
- `description`: A human-readable description of the field.
- `schema`: An Apache Avro schema that describes the field's data type and possible values. This is added if the available type information is richer than what can be expressed in KQL.
Example:
```
.alter table [Trips] column-docstrings (
[routeId]: "{\"description\": \"Identifies a route.\"}",
[tripId]: "{\"description\": \"Identifies a trip.\"}",
[tripHeadsign]: "{\"description\": \"Text that appears on signage identifying the trip's destination to riders.\", \"schema\": [\"null\", \"string\"]}",
[tripShortName]: "{\"description\": \"Public facing text used to identify the trip to riders.\", \"schema\": [\"null\", \"string\"]}",
[directionId]: "{\"description\": \"Indicates the direction of travel for a trip.\", \"schema\": {\"type\": \"enum\", \"name\": \"DirectionId\", \"namespace\": \"GeneralTransitFeedStatic\", \"symbols\": [\"OUTBOUND\", \"INBOUND\"], \"doc\": \"Indicates the direction of travel for a trip. Symbols: OUTBOUND - Travel in one direction; INBOUND - Travel in the opposite direction.\"}}",
[blockId]: "{\"description\": \"Identifies the block to which the trip belongs.\", \"schema\": [\"null\", \"string\"]}",
[shapeId]: "{\"description\": \"Identifies a geospatial shape describing the vehicle travel path for a trip.\", \"schema\": [\"null\", \"string\"]}",
[wheelchairAccessible]: "{\"description\": \"Indicates wheelchair accessibility.\", \"schema\": {\"type\": \"enum\", \"name\": \"WheelchairAccessible\", \"namespace\": \"GeneralTransitFeedStatic\", \"symbols\": [\"NO_INFO\", \"WHEELCHAIR_ACCESSIBLE\", \"NOT_WHEELCHAIR_ACCESSIBLE\"], \"doc\": \"Indicates wheelchair accessibility. Symbols: NO_INFO - No accessibility information for the trip; WHEELCHAIR_ACCESSIBLE - Vehicle can accommodate at least one rider in a wheelchair; NOT_WHEELCHAIR_ACCESSIBLE - No riders in wheelchairs can be accommodated on this trip.\"}}",
[bikesAllowed]: "{\"description\": \"Indicates whether bikes are allowed.\", \"schema\": {\"type\": \"enum\", \"name\": \"BikesAllowed\", \"namespace\": \"GeneralTransitFeedStatic\", \"symbols\": [\"NO_INFO\", \"BICYCLE_ALLOWED\", \"BICYCLE_NOT_ALLOWED\"], \"doc\": \"Indicates whether bikes are allowed. Symbols: NO_INFO - No bike information for the trip; BICYCLE_ALLOWED - Vehicle can accommodate at least one bicycle; BICYCLE_NOT_ALLOWED - No bicycles are allowed on this trip.\"}}",
[___type]: 'Event type',
[___source]: 'Context origin/source of the event',
[___id]: 'Event identifier',
[___time]: 'Event generation time',
[___subject]: 'Context subject of the event'
);
```
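To inspect the docstrings after the script has run, you can retrieve the table schema as JSON; the `DocString` property of each column carries this metadata:

```
.show table Trips schema as json
```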
The script also creates an update policy for each table that imports "its" events from the `_cloudevents_dispatch` table. This is explained further in the next section.
```
.alter table [Trips] policy update
[{
"IsEnabled": true,
"Source": "_cloudevents_dispatch",
"Query": "_cloudevents_dispatch | where (specversion == '1.0' and type == 'GeneralTransitFeedStatic.Trips') | project['routeId'] = tostring(data.['routeId']),['serviceDates'] = todynamic(data.['serviceDates']),['serviceExceptions'] = todynamic(data.['serviceExceptions']),['tripId'] = tostring(data.['tripId']),['tripHeadsign'] = tostring(data.['tripHeadsign']),['tripShortName'] = tostring(data.['tripShortName']),['directionId'] = tostring(data.['directionId']),['blockId'] = tostring(data.['blockId']),['shapeId'] = tostring(data.['shapeId']),['wheelchairAccessible'] = tostring(data.['wheelchairAccessible']),['bikesAllowed'] = tostring(data.['bikesAllowed']),___type = type,___source = source,___id = ['id'],___time = ['time'],___subject = subject",
"IsTransactional": false,
"PropagateIngestionProperties": true,
}]
```
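To verify that the policy is in place after running the script, you can query it back:

```
.show table Trips policy update
```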
Finally, there are two ingestion mappings created for each table. The `_json_flat` mapping assumes that the arriving events are already in the flattened form of the table schema. The `_json_ce_structured` mapping assumes that the arriving events are in CloudEvents structured JSON format and extracts the payload columns from the `data` field.
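You can list the mappings the script created for a table, for example for `Trips`:

```
.show table Trips ingestion json mappings
```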
Now that you have set up the KQL database and added the necessary schemas, you still need to wire up the API bridge to the KQL database.
The table schema elements added by the script allow for several avenues for ingesting data. We will discuss two of them here: ingesting directly from the Event Stream or Event Hub into the `_cloudevents_dispatch` table, and pushing pre-processed data directly into the event tables with Azure Stream Analytics.
In this scenario, you will set up your KQL database to ingest data from the Event Stream or Event Hub into the `_cloudevents_dispatch` table, using the `_cloudevents_dispatch_json` JSON ingestion mapping.
Before you start, the bridge should be running, or should at least have run once, to send some events to the target Event Stream or Event Hub. Data must already be present in the Event Stream or Event Hub to complete the setup.
- In the Microsoft Fabric portal, navigate to the Eventhouse and database you set up.
- Find the "Get Data" button in the ribbon, and click it or expand its menu. Select either "Event Hub" or "Event Stream", depending on which you are publishing to.
- In the "Pick a destination table and configure the source" dialog, select the `_cloudevents_dispatch` table, then complete the source configuration. Detailed instructions for how to connect to Event Hubs and Event Streams sources can be found in the documentation.
- It is recommended to open the "Advanced filters" section and set the "Event retrieval start date" to a date well before you started the bridge for the first time.
- Click "Next". On the upper right-hand side of the "Inspect the data" dialog, make sure that "Format" is set to "JSON", then click "Advanced", select "Existing mapping", and choose the `_cloudevents_dispatch_json` mapping.
- Click "Finish" to start the ingestion.
Once you've completed these steps, events will start showing up in the `_cloudevents_dispatch` table, and the update policies will take care of importing the events into the per-event-type tables.
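To confirm that events are arriving, a quick count by event type against the dispatch table works well; the per-event-type tables, such as `Trips`, should fill up moments later:

```
_cloudevents_dispatch
| summarize count() by type
```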
With Azure Data Explorer, the steps are similar to the ones for Microsoft Fabric, but the portal entry point is different.
Follow the instructions in the Create an Event Hubs data connection for Azure Data Explorer article, and select the table and mappings as described above.
If you are using Azure Stream Analytics to pre-process the data from the Event Hub or Event Stream, you can push the event data directly into the event tables.
- In the Azure portal, navigate to the Azure Stream Analytics job that you have set up to process the data from the Event Hub or Event Stream.
- In the job's "Overview" pane, click on the "Outputs" tab and add a new output.
- Select "Azure Data Explorer" as the output type.
- Configure the output with the connection string of the Azure Data Explorer cluster and the database name.
- In the "Table" field, enter the name of the event table you want to target.
In the "Query" field, you can use the SELECT
statement to map the incoming
data to the table schema, filtered by type
. For example:
```
SELECT
data.routeId as routeId,
data.serviceDates as serviceDates,
data.serviceExceptions as serviceExceptions,
data.tripId as tripId,
data.tripHeadsign as tripHeadsign,
data.tripShortName as tripShortName,
data.directionId as directionId,
data.blockId as blockId,
data.shapeId as shapeId,
data.wheelchairAccessible as wheelchairAccessible,
data.bikesAllowed as bikesAllowed,
type as ___type,
source as ___source,
id as ___id,
time as ___time,
subject as ___subject
INTO
Trips
FROM
input_stream
WHERE
specversion = '1.0' AND type = 'GeneralTransitFeedStatic.Trips'
```
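Once the Stream Analytics job is running, a sketch like the following verifies that rows are landing in the target table and how fresh they are:

```
Trips
| summarize Rows = count(), Latest = max(___time)
```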