Joining Dynamic Tables
Flink SQL supports three common ways to join streaming data:
- Interval Joins
- Temporal Table Joins
- Regular (Fully-Materializing) Joins
This session shows you when and how to use the different types of joins.
Interval Joins
An interval join is a join whose condition includes a time interval predicate. The predicate defines how far apart the timestamps of two records, one from each input table, may be for the pair to qualify for the join.
Due to the temporal relationship of the input tables, the join operator does not need to maintain any of the input tables completely in state. Instead, only the relevant tail of the stream is kept in state.
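To make the state argument concrete, here is a minimal Python sketch of an interval join. It is not how Flink implements the operator: the class `IntervalJoin`, its method names, and the simplified watermark handling (no late rows are assumed) are all hypothetical and exist only for illustration. Timestamps are plain seconds.

```python
from collections import defaultdict


class IntervalJoin:
    """Illustrative interval join (hypothetical helper, not Flink's operator).

    Joins a left and a right row with the same key when
    left_ts - lower <= right_ts <= left_ts + upper.
    """

    def __init__(self, lower, upper):
        self.lower = lower
        self.upper = upper
        self.left = defaultdict(list)   # key -> [(ts, row)]
        self.right = defaultdict(list)  # key -> [(ts, row)]

    def _matches(self, left_ts, right_ts):
        return left_ts - self.lower <= right_ts <= left_ts + self.upper

    def on_left(self, key, ts, row):
        self.left[key].append((ts, row))
        return [(row, r) for r_ts, r in self.right.get(key, [])
                if self._matches(ts, r_ts)]

    def on_right(self, key, ts, row):
        self.right[key].append((ts, row))
        return [(l, row) for l_ts, l in self.left.get(key, [])
                if self._matches(l_ts, ts)]

    def on_watermark(self, watermark):
        # Assumption: no row with a timestamp below the watermark arrives
        # later. A buffered row that can no longer match any future row
        # is dropped, so only the tail of each stream stays in state.
        self._evict(self.left, self.upper, watermark)
        self._evict(self.right, self.lower, watermark)

    @staticmethod
    def _evict(buffer, slack, watermark):
        for key in list(buffer):
            kept = [(ts, row) for ts, row in buffer[key]
                    if ts + slack >= watermark]
            if kept:
                buffer[key] = kept
            else:
                del buffer[key]

    def state_size(self):
        return (sum(len(rows) for rows in self.left.values())
                + sum(len(rows) for rows in self.right.values()))


# A payment may happen up to 300 seconds before the ride end event.
join = IntervalJoin(lower=300, upper=0)
out1 = join.on_right("ride-1", 1000, {"tip": 2.5})   # payment event
out2 = join.on_left("ride-1", 1200, {"psgCnt": 2})   # ride end event
size_before = join.state_size()
join.on_watermark(1400)
size_after = join.state_size()
```

In this sketch both rows are buffered until the watermark passes the point where they could still find a partner. After that the state is empty again, regardless of how many rows have been processed in total.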
Compute the average tip per hour of day and number of passengers.
Hints:
- The tip paid for a ride is contained in the Fares table.
- The payment event is expected to happen at most 5 minutes before the trip ends. Hence, the payTime timestamp of a Fares record is at most 5 minutes earlier and not later than the rideTime of the corresponding ride's end event.
- Use the built-in HOUR function to extract the hour of a timestamp.
The output should look similar to:
hourOfDay avgTip
0 0.9319049
1 1.100541
2 1.1744025
3 1.2137822
4 1.1707343
5 1.1629586
6 1.1505183
Solution:
SELECT
  HOUR(r.rideTime) AS hourOfDay,
  r.psgCnt,
  AVG(f.tip) AS avgTip
FROM
  Rides r,
  Fares f
WHERE
  r.rideId = f.rideId AND
  NOT r.isStart AND
  f.payTime BETWEEN r.rideTime - INTERVAL '5' MINUTE AND r.rideTime
GROUP BY
  HOUR(r.rideTime), r.psgCnt;
The query joins ride end events of the Rides table with payment events of the Fares table on the rideId key and their timestamps. The interval join condition joins a payment and a ride event if the payTime attribute is at most 5 minutes earlier and not later than the rideTime attribute. After both tables are joined, the query groups by the hour of day (computed with the HOUR function) and the passenger count, and computes the average tip per group.
In this exercise we want to compute the duration of every taxi ride, i.e., the time between its start and end event, in minutes. This means that we need to join the start event and the end event of each ride based on the ride id.
We are only interested in rides that start and end in New York City and take less than two hours.
Hints:
- Filter the Rides table to separate start and end events.
- Use an interval join to associate the start and end events of a ride. Only join an end event if it arrives within 2 hours after a start event has arrived.
- Use the built-in function TIMESTAMPDIFF(timepointunit, timepoint1, timepoint2) to compute the difference of two timestamps in minutes.
The output should look similar to:
rideId durationMin
52693 13
46868 24
53226 12
53629 11
55651 7
43220 31
53082 12
54716 9
55125 9
57211 4
44795 28
53563 12
Solution:
SELECT
  s.rideId,
  TIMESTAMPDIFF(MINUTE, s.rideTime, e.rideTime) AS durationMin
FROM
  (SELECT * FROM Rides WHERE isStart AND isInNYC(lon, lat)) s,
  (SELECT * FROM Rides WHERE NOT isStart AND isInNYC(lon, lat)) e
WHERE
  s.rideId = e.rideId AND
  e.rideTime BETWEEN s.rideTime AND s.rideTime + INTERVAL '2' HOUR;
This query performs an interval join between two subqueries. The first subquery s returns the start events within New York City. The second subquery e returns the end events within New York City.
Both subqueries are joined on the rideId column. Additionally, the WHERE clause specifies time constraints between an end event and a start event. This ensures that a start event and an end event are joined only if the end event occurs within 2 hours after the start event.
The resulting durationMin column contains the duration between both timestamps in minutes.
Temporal Table Joins
A temporal table join joins a streaming table with a temporal table. A temporal table maintains the history of a table and is able to return the table's rows for a specific point in time. For each record of the streaming table, the join looks up the rows of the version of the temporal table that corresponds to the timestamp of the current record and joins them according to additional join predicates.
Due to the temporal condition, the join operator only holds the relevant history of the temporal table in state and does not store the streaming table at all.
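The versioned lookup described above can be sketched in a few lines of Python. This is an illustration under assumptions, not Flink's implementation: the class `TemporalTable`, its methods, and the choice that a version becomes valid exactly at its own timestamp are hypothetical. Each streaming record only triggers a lookup and is never stored, while old versions of the table can be dropped once no future record can refer to them.

```python
import bisect
from collections import defaultdict


class TemporalTable:
    """Keeps the version history of a keyed table (hypothetical helper)."""

    def __init__(self):
        self._times = defaultdict(list)  # key -> sorted version timestamps
        self._rows = defaultdict(list)   # key -> rows aligned with _times

    def upsert(self, key, ts, row):
        # Insert the new version at its position in time order.
        i = bisect.bisect_right(self._times[key], ts)
        self._times[key].insert(i, ts)
        self._rows[key].insert(i, row)

    def lookup(self, key, ts):
        # Return the latest version with a timestamp <= ts, or None.
        times = self._times.get(key, [])
        i = bisect.bisect_right(times, ts)
        return self._rows[key][i - 1] if i > 0 else None

    def expire_before(self, watermark):
        # Assumption: no lookup with a timestamp below the watermark follows.
        # Keep the version valid at the watermark and all later versions.
        for key, times in self._times.items():
            drop = max(bisect.bisect_right(times, watermark) - 1, 0)
            del times[:drop]
            del self._rows[key][:drop]

    def version_count(self):
        return sum(len(times) for times in self._times.values())


drivers = TemporalTable()
drivers.upsert(7, 0, "driver-A")     # taxi 7 is driven by A from t=0
drivers.upsert(7, 100, "driver-B")   # driver change at t=100

at_50 = drivers.lookup(7, 50)        # ride of taxi 7 starting at t=50
at_100 = drivers.lookup(7, 100)      # ride starting exactly at the change
unknown = drivers.lookup(8, 50)      # taxi without any version

drivers.expire_before(120)
remaining = drivers.version_count()
at_150 = drivers.lookup(7, 150)
```

After `expire_before(120)` only the version that is still valid remains for taxi 7, which mirrors the idea that the join holds just the relevant history in state.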
Identify all drivers who served at least 10 passengers within 15 minutes.
Hints:
- The temporal table function Drivers provides the driver that operated a taxi at a specific point in time.
The output should look similar to:
driverId srvdPsgCnt t
2013000155 12 2013-01-01 00:00:00.0
2013000233 12 2013-01-01 00:00:00.0
2013000230 31 2013-01-01 00:00:00.0
2013001174 12 2013-01-01 00:00:00.0
2013000014 12 2013-01-01 00:00:00.0
2013000595 12 2013-01-01 00:00:00.0
2013002453 12 2013-01-01 00:00:00.0
2013000124 12 2013-01-01 00:00:00.0
2013000117 18 2013-01-01 00:00:00.0
Solution:
SELECT
  d.driverId,
  SUM(r.psgCnt) AS srvdPsgCnt,
  TUMBLE_START(r.rideTime, INTERVAL '15' MINUTE) AS t
FROM
  Rides r,
  LATERAL TABLE(Drivers(r.rideTime)) d
WHERE
  r.taxiId = d.taxiId AND
  r.isStart
GROUP BY
  d.driverId,
  TUMBLE(r.rideTime, INTERVAL '15' MINUTE)
HAVING SUM(r.psgCnt) >= 10;
The query joins each ride start event of the Rides table with the temporal Drivers table and enriches it with the driver who was driving the taxi when the ride started. Subsequently, the query groups the enriched data by the driverId and a 15 minute window and computes the number of served passengers. The final HAVING clause removes all drivers who did not serve at least 10 passengers.
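The grouping and filtering step can be mimicked over a small in-memory list, which may help to see what the window and the HAVING clause do. The sketch below is a batch-style illustration only: the function names `tumble_start` and `busy_drivers`, the tuple layout, and the use of integer seconds as timestamps are assumptions, and the rides are taken to be already enriched with a driver id.

```python
from collections import defaultdict

WINDOW_SECONDS = 15 * 60


def tumble_start(ts, size=WINDOW_SECONDS):
    # Start of the fixed, non-overlapping window that contains ts.
    return ts - ts % size


def busy_drivers(enriched_rides, min_passengers=10):
    # enriched_rides: iterable of (driver_id, ride_start_ts, psg_cnt)
    totals = defaultdict(int)
    for driver_id, ts, psg_cnt in enriched_rides:
        totals[(driver_id, tumble_start(ts))] += psg_cnt
    # Equivalent of the HAVING clause: keep only groups at or above the limit.
    return {group: total for group, total in totals.items()
            if total >= min_passengers}


rides = [
    ("d1", 10, 4), ("d1", 500, 6),   # same window [0, 900): 10 passengers
    ("d1", 900, 3),                  # next window, below the limit
    ("d2", 100, 9),                  # below the limit
]
result = busy_drivers(rides)
```

Here only driver `d1` in the window starting at 0 passes the filter. The same driver may appear again for a later window if the limit is reached there as well.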
Regular (Fully-Materializing) Joins
Regular joins are joins without a temporal join condition. Due to the lack of a temporal join condition, both input tables of the join need to be fully materialized as query state. This can become a problem if one (or both) of the tables grows without bound, unless you configure the query to automatically clean up stale rows (rows that have not been joined or updated for a certain time).
Regular joins work fine if both input tables are continuously updated but are bounded in size.
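The following Python sketch illustrates why a regular join over append-only inputs needs ever more state. It is a simplified model, not Flink's operator, and the class `RegularJoin` with its methods is hypothetical. Because there is no time bound, every row of both inputs has to stay available for rows that may arrive later.

```python
from collections import defaultdict


class RegularJoin:
    """Inner equi-join that keeps every input row in state (illustration)."""

    def __init__(self):
        self.left = defaultdict(list)   # key -> all left rows seen so far
        self.right = defaultdict(list)  # key -> all right rows seen so far

    def on_left(self, key, row):
        self.left[key].append(row)
        return [(row, r) for r in self.right.get(key, [])]

    def on_right(self, key, row):
        self.right[key].append(row)
        return [(l, row) for l in self.left.get(key, [])]

    def state_size(self):
        return (sum(len(rows) for rows in self.left.values())
                + sum(len(rows) for rows in self.right.values()))


join = RegularJoin()
emitted = []
for i in range(1000):
    emitted += join.on_left(i % 10, ("L", i))   # 1000 appended left rows
emitted += join.on_right(3, ("R", 0))           # joins with all rows of key 3
total_state = join.state_size()
```

In this model the state holds all 1001 input rows, and it keeps growing with every new row because nothing ever signals that an old row can be discarded.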
Compute for each taxi the total number of shifts (i.e., driver changes) and the number of served passengers.
Hints:
- Compute the number of shifts and passengers in two separate subqueries and join the results on the taxiId attribute.
The output should look similar to:
taxiId shiftCnt psgCnt
2013009497 1 26
2013009261 1 8
2013003937 2 27
2013008238 1 30
2013006197 1 12
2013001680 1 29
2013005414 2 10
2013004937 1 16
Solution:
SELECT s.taxiId, s.shiftCnt, p.psgCnt
FROM
  (SELECT taxiId, COUNT(*) AS shiftCnt
   FROM DriverChanges
   GROUP BY taxiId) s
JOIN
  (SELECT taxiId, SUM(psgCnt) AS psgCnt
   FROM Rides
   WHERE isStart
   GROUP BY taxiId) p
ON s.taxiId = p.taxiId;
The first subquery computes the number of shifts by counting the number of driver changes per taxi. The second subquery sums the number of passengers per taxi. Finally, the outer query joins the result of both subqueries.
Since both subqueries produce updating tables, which are bounded by the number of distinct taxiId values, we can join their results with a fully materializing join without running into state size issues.
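To see why the state stays bounded here, consider this small Python model of a join between two updating tables that each hold one row per key. It is a sketch under assumptions: the class `UpsertJoin` and its methods are hypothetical, and real retraction or changelog handling is reduced to overwriting the latest value per taxi.

```python
class UpsertJoin:
    """Joins two updating tables that hold one row per taxiId (illustration)."""

    def __init__(self):
        self.shifts = {}      # taxiId -> latest shiftCnt
        self.passengers = {}  # taxiId -> latest psgCnt

    def _result(self, taxi_id):
        # Emit the current join result once both sides have a row for the key.
        if taxi_id in self.shifts and taxi_id in self.passengers:
            return (taxi_id, self.shifts[taxi_id], self.passengers[taxi_id])
        return None

    def on_shift_count(self, taxi_id, shift_cnt):
        self.shifts[taxi_id] = shift_cnt      # overwrite, do not append
        return self._result(taxi_id)

    def on_passenger_count(self, taxi_id, psg_cnt):
        self.passengers[taxi_id] = psg_cnt    # overwrite, do not append
        return self._result(taxi_id)

    def state_size(self):
        return len(self.shifts) + len(self.passengers)


join = UpsertJoin()
first = join.on_shift_count("taxi-1", 1)       # no passenger row yet
latest = None
for total in range(1, 1001):                   # 1000 updates of the same sum
    latest = join.on_passenger_count("taxi-1", total)
join.on_shift_count("taxi-2", 1)
size = join.state_size()
```

Although a thousand updates were processed, the model stores only three rows, one per key and side, so the state depends on the number of distinct taxis and not on the number of input events.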
Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.