
Creating Tables and Writing Query Results to External Systems


This session discusses how to connect and integrate Flink SQL with external systems.

Specifically, you will learn

  • how to create tables to read and write data.
  • how to write query results to external systems.

Slides

Hands-On Exercises

Flink SQL and the SQL CLI client support the INSERT INTO clause to write the result of a SELECT query into a table. The table must have been previously registered as a sink table. The training environment does not provide pre-defined sink tables, so we have to define them ourselves.

You can define a table with a CREATE TABLE DDL statement. Since Flink doesn't store data itself but uses external storage systems, a table definition requires additional information about the external system and possibly about the format in which the data should be stored.
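
In general, writing query results to an external system therefore consists of two steps: a CREATE TABLE statement that registers the sink table together with its connector properties, followed by an INSERT INTO statement that runs the query. The following is a minimal sketch of that pattern; the table names, columns, topic, and server address are placeholders and not part of the training environment.

-- Hypothetical sink table; names and options are placeholders.
CREATE TABLE MySink (
  someKey BIGINT,
  someValue STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'my-sink-topic',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
);

-- Continuously write the result of a SELECT query into the sink table.
INSERT INTO MySink
SELECT someKey, someValue
FROM MySource;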

In the following exercises we provide the DDL statements to define the tables and describe their properties.

Writing an Append-only Table to Kafka

Specify a query that computes the number of rides that started within the last 10 minutes and write the result of the query to a table that is backed by a Kafka topic.

As a first step, we have to define the table to which we will write the data. The table should be backed by a Kafka topic and the records should be encoded as JSON text (like our input topics).

We define the table with the following DDL statement.

CREATE TABLE TenMinPsgCnts (
  cntStart TIMESTAMP(3),
  cntEnd TIMESTAMP(3),
  cnt BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'TenMinPsgCnts',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'flinksql',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

As you can see, the table definition includes all details that Flink needs to talk to Kafka, such as the hostname, port, and topic name. Being an append-only table, TenMinPsgCnts only accepts append-only results, i.e., streams that do not update previously emitted results.
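
To illustrate the difference (this example is not part of the exercise): a non-windowed aggregation continuously updates its per-key counts as new rows arrive and therefore produces an updating result, which this append-only table would reject. A windowed aggregation, in contrast, emits each window's result only once and is append-only.

-- Hypothetical example of an updating query: every new ride changes a
-- previously emitted count, so this result cannot be written to the
-- append-only TenMinPsgCnts table.
SELECT taxiId, COUNT(*) AS cnt
FROM Rides
GROUP BY taxiId;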

Now you have to write the SQL query that writes the requested result into the TenMinPsgCnts table. Note that the schema of the query result must exactly match the schema of the result table, which consists of three columns: a start timestamp, an end timestamp, and a count:

Flink SQL> DESCRIBE TenMinPsgCnts;
root
 |-- cntStart: TIMESTAMP(3)
 |-- cntEnd: TIMESTAMP(3)
 |-- cnt: BIGINT

You can monitor the Kafka topic of the TenMinPsgCnts table by running the following command in the folder that contains the docker-compose.yml file:

docker-compose exec kafka kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning

The result is encoded as JSON and should look as follows.

{"cntStart":1356998400000,"cntEnd":1356999000000,"cnt":2864}
{"cntStart":1356999000000,"cntEnd":1356999600000,"cnt":6884}
{"cntStart":1356999600000,"cntEnd":1357000200000,"cnt":9391}
{"cntStart":1357000200000,"cntEnd":1357000800000,"cnt":10158}
{"cntStart":1357000800000,"cntEnd":1357001400000,"cnt":10702}
{"cntStart":1357001400000,"cntEnd":1357002000000,"cnt":10817}
{"cntStart":1357002000000,"cntEnd":1357002600000,"cnt":10958}
{"cntStart":1357002600000,"cntEnd":1357003200000,"cnt":10946}

Note: An INSERT INTO query cannot be stopped from the CLI client yet. Please use the Flink web UI (at http://localhost:8081) to inspect the query and cancel it.

Click to see the solution.
INSERT INTO TenMinPsgCnts 
SELECT 
  TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart, 
  TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
  COUNT(*) AS cnt 
FROM Rides 
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);

Writing an Append-only Table as Partitioned CSV Files to S3

Specify a query that cleanses the data of the Rides table by joining the start and end events and filtering out all rides that do not start and end in New York City. The result of the cleansing query shall be written as CSV files to a filesystem in hourly partitions.

First, we define the sink table CleansedRides which is backed by a directory in an S3-compatible storage system. The following DDL statement configures the filesystem connector to partition the rows based on the tHour attribute and write them to CSV-formatted files.

CREATE TABLE CleansedRides (
  rideId BIGINT,
  taxiId BIGINT,
  startTime TIMESTAMP(3),
  endTime TIMESTAMP(3),
  startLon FLOAT,
  startLat FLOAT,
  endLon FLOAT,
  endLat FLOAT,
  psgCnt INT,
  tHour STRING)
PARTITIONED BY (tHour)
WITH (
  'connector'='filesystem',
  'path'='s3://sql-training/cleansed-rides',
  'format'='csv'
);

Note: We're using CSV here because it is a human-readable format. In a production setting, you would probably use a more read-efficient, columnar format, such as Apache Parquet or Apache ORC, both of which are also supported by Flink.
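
For example, switching the table to Parquet would only require changing the format property. This is a sketch, assuming the flink-parquet format dependency is available in the environment; the table name and path are hypothetical variations of the ones above.

-- Same schema as CleansedRides, but written as Parquet files.
CREATE TABLE CleansedRidesParquet (
  rideId BIGINT,
  taxiId BIGINT,
  startTime TIMESTAMP(3),
  endTime TIMESTAMP(3),
  startLon FLOAT,
  startLat FLOAT,
  endLon FLOAT,
  endLat FLOAT,
  psgCnt INT,
  tHour STRING)
PARTITIONED BY (tHour)
WITH (
  'connector' = 'filesystem',
  'path' = 's3://sql-training/cleansed-rides-parquet',
  'format' = 'parquet'
);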

The training environment includes Minio, an S3-compatible storage system. You can access its web UI by opening http://localhost:9000 and entering 'flink-sql' for both user name and password. The cleansed-rides folder won't be created until you submit a query that writes to the CleansedRides table. The filesystem connector writes partitions as part files. A part file can only be downloaded via the web UI after it has been completed and a new part file has been started.

The content of a part file should look as follows:

9,2013000009,"2013-01-01 00:00:00","2013-01-01 00:02:44",-73.99988,40.743343,-74.00371,40.74828,1
15,2013000015,"2013-01-01 00:00:00","2013-01-01 00:03:00",-73.95412,40.778343,-73.94182,40.79548,2
14,2013000014,"2013-01-01 00:00:00","2013-01-01 00:04:00",-74.00797,40.747204,-74.0073,40.73818,6
34,2013000034,"2013-01-01 00:00:00","2013-01-01 00:04:00",-73.934555,40.750957,-73.91633,40.76224,5
35,2013000035,"2013-01-01 00:00:00","2013-01-01 00:03:00",-74.0014,40.72261,-73.99818,40.729485,6

Note: An INSERT INTO query cannot be stopped from the CLI client yet. Please use the Flink web UI (at http://localhost:8081) to inspect the query and cancel it.

Click here for hints.
  • To join the start and end events, you should use a time-windowed join with the same join condition as the solution of the "Compute the Ride Duration" exercise.
  • Use the built-in DATE_FORMAT function to format the tHour field.
  • Use the user-defined function isInNYC to check if a pair of longitude-latitude coordinates is located in New York City.

Click to see the solution.
INSERT INTO CleansedRides 
SELECT 
  s.rideId, s.taxiId, s.rideTime, CAST(e.rideTime AS TIMESTAMP(3)), 
  s.lon, s.lat, e.lon, e.lat, s.psgCnt, 
  DATE_FORMAT(s.rideTime, 'yyyy-MM-dd_HH-00')
FROM 
  (SELECT * FROM Rides WHERE isStart AND isInNYC(lon, lat)) s,
  (SELECT * FROM Rides WHERE NOT isStart AND isInNYC(lon, lat)) e
WHERE 
  s.rideId = e.rideId AND 
  e.rideTime BETWEEN s.rideTime AND s.rideTime + INTERVAL '2' HOUR;

Maintaining a Continuously Updated Materialized View in MySQL

Specify a query that maintains a materialized view consisting of the number of departing rides per area in a MySQL table AreaCnts.

First, we define the sink table AreaCnts, which is backed by a table of the same name in MySQL, with the following DDL statement.

CREATE TABLE AreaCnts (
  areaId INT PRIMARY KEY NOT ENFORCED,
  cnt BIGINT
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql:3306/flinksql',
  'table-name' = 'AreaCnts',
  'username' = 'flink',
  'password' = 'secret',
  'sink.buffer-flush.interval' = '1s',
  'sink.buffer-flush.max-rows' = '1000',
  'sink.max-retries' = '3'
);

AreaCnts is an upsert table backed by a MySQL table. It accepts insert-only results as well as updating results, as long as the updates can be applied based on a key.
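
For reference, the backing table on the MySQL side looks roughly like the following sketch; the training environment already provides this table, and its exact definition may differ.

-- Hypothetical MySQL-side definition of the backing table.
-- The primary key on areaId is what allows rows to be upserted by key.
CREATE TABLE AreaCnts (
  areaId INT PRIMARY KEY,
  cnt BIGINT
);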

Now you have to specify a query that computes the number of departing rides per area and writes the result to AreaCnts.

Again, the schema of the query result and the schema of the sink table (AreaCnts) must match exactly. AreaCnts consists of an area identifier and a count:

Flink SQL> DESCRIBE AreaCnts;
root
 |-- areaId: INT
 |-- cnt: BIGINT

To check whether your query is producing a correct result, you can start the MySQL CLI client by running the following command in the folder that contains the docker-compose.yml file:

docker-compose exec mysql mysql -D flinksql -u flink --password=secret

Once you have started the MySQL CLI client, you can query the MySQL table with regular SQL, for example:

mysql> SELECT * FROM AreaCnts ORDER BY cnt DESC LIMIT 10;
+---------+-----+
| areaId  | cnt |
+---------+-----+
| 8252892 |  90 |
|   51781 |  44 |
|   45548 |  32 |
|   46298 |  28 |
|   50801 |  22 |
|   52542 |  20 |
|   51047 |  20 |
|   51532 |  19 |
|   49551 |  18 |
|   54285 |  18 |
+---------+-----+
10 rows in set (0.01 sec)

By repeatedly running the same query, you can observe how Flink maintains the query result as a materialized view in the MySQL table.

Note: An INSERT INTO query cannot be stopped from the CLI client yet. Please use the Flink web UI (at http://localhost:8081) to inspect the query and cancel it.

Click to see the solution.
INSERT INTO AreaCnts
SELECT
  toAreaId(lon, lat) AS areaId,
  COUNT(*) AS cnt
FROM Rides
WHERE isStart
GROUP BY toAreaId(lon, lat);