Skip to content

Commit

Permalink
Modify congestion SQL and readme
Browse files Browse the repository at this point in the history
  • Loading branch information
chmnata committed Sep 21, 2020
1 parent ead636f commit a9fadf7
Show file tree
Hide file tree
Showing 5 changed files with 481 additions and 0 deletions.
16 changes: 16 additions & 0 deletions congestion_data_aggregation/sql/ReadMe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Data source and data flow

## Automatic Pipeline

1) Citywide Travel Time Index
- Function `congestion.generate_citywide_tti_daily` insert an averaged daily hourly citywide travel time index from 6am to 11pm every month. It aggregates travel time on a link level up to segments, filtering the segments that does not have at least 80% of its lengeth worth of links. Highway uses a baseline of 25th percetile of travel time while the rest of the streets use a baseline of 10th percentile of travel time. Highway is defined in `congestion.highway_segments_v5`. This function runs monthly with `congestion_refresh` DAG.

2) Citywide Buffer Index
- Function `congestion.generate_citywide_bi_monthly` insert an averaged monthly hourly citywide buffer index from 6am to 11pm every month. It aggregates travel time on a link level up to segments, filtering the segments that does not have at least 80% of its length worth of links. This function runs monthly with `congestion_refresh` DAG.

3) Corridor Travel Time Index
- Function `congestion.generate_corridor_tti_weekly` insert an averaged weekly hourly corridor travel time index from 6am to 11pm every month for each corridor. It first aggregates travel time on a link level up to segments, filtering the segments that does not have at least 80% of its length worth of links. It then aggregates segments up to corridor, also filtering corridors that does not have at least 80% of its length worth of segments. Highway uses a baseline of 25th percetile of travel time while the rest of the streets use a baseline of 10th percentile of travel time. Highway is defined in `congestion.highway_segments_v5`. This function runs monthly with `congestion_refresh` DAG.

4) Corridor Buffer Index
- Function `congestion.generate_corridor_bi_monthly` insert an averaged monthly hourly citywide buffer index from 6am to 11pm every month. It aggregates travel time on a link level up to segments, filtering the segments that does not have at least 80% of its length worth of links. It then aggregates segments up to corridor, also filtering corridors that does not have at least 80% of its length worth of segments. This function runs monthly with `congestion_refresh` DAG.

104 changes: 104 additions & 0 deletions congestion_data_aggregation/sql/generate_citywide_bi_monthly.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
FUNCTION: generate_citywide_bi_hourly
PURPOSE: This function produces estimates of a city-wide buffer index (BI) for every hour of the day, every month
INPUTS: dt_: first day of month
*/

-- DROP FUNCTION congestion.generate_citywide_bi_monthly(date);

CREATE OR REPLACE FUNCTION congestion.generate_citywide_bi_monthly(
_dt date)

RETURNS void
LANGUAGE 'sql'

COST 100
VOLATILE SECURITY DEFINER

AS $BODY$

/*
speed_links: Produces estimates of the average speed for each 30-minute bin for each individual link (link_dir)
*/
WITH speed_links AS (
SELECT segment_id,
link_dir,
datetime_bin(tx,30) AS datetime_bin,
harmean(mean) AS spd_avg_all,
length AS link_length,
COUNT (DISTINCT tx) AS count_hc

FROM here.ta
INNER JOIN congestion.segment_links_v5_19_4 using (link_dir)
LEFT JOIN ref.holiday hol ON hol.dt = tx::date

WHERE hol.dt IS NULL AND
date_part('isodow'::text, tx::date) < 6 AND
(tx >= _dt AND tx < ( _dt + '1 mon'::interval))

GROUP BY segment_id,link_dir,datetime_bin(tx,60), link_length
),

/*
seg_tt: Produces estimates of the average travel time for each 30-minute bin for each individual segment (segment_id)
*/
seg_tt AS (
SELECT segment_id,
datetime_bin,
CASE WHEN SUM(link_length) >= 0.8 * b.length
THEN SUM(link_length / spd_avg_all * 3.6 ) * b.length / SUM(link_length)
END AS spd_avg_all,
SUM(link_length) / b.length * 100 AS data_pct_hc

FROM speed_links
INNER JOIN congestion.segments_v5 b USING (segment_id)

WHERE link_length / spd_avg_all IS NOT NULL

GROUP BY segment_id, datetime_bin, b.length
ORDER BY segment_id, datetime_bin
),

/*
seg_bi: Produces estimates of the average buffer index for each month for each 30-minute by segment (segment_id)
*/
seg_bi AS (
SELECT a.segment_id,
date_part('month'::text, a.datetime_bin) AS month,
datetime_bin::time without time zone AS time_bin,
count(a.datetime_bin) AS num_bins,
avg(a.spd_avg_all) AS avg_tt,
percentile_cont(0.95::double precision) WITHIN GROUP (ORDER BY a.spd_avg_all) as pct_95,
(percentile_cont(0.95::double precision) WITHIN GROUP (ORDER BY a.spd_avg_all) - avg(a.spd_avg_all))/ avg(a.spd_avg_all) AS bi

FROM seg_tt a

WHERE a.spd_avg_all IS NOT NULL

GROUP BY a.segment_id, (a.datetime_bin::time without time zone), date_part('month'::text, a.datetime_bin)
ORDER BY a.segment_id, date_part('month'::text, a.datetime_bin), (a.datetime_bin::time without time zone)
)

/*
Final Output: Inserts an estimate of the city-wide BI (weighted by length of segment and sqrt of AADT) into congestion.citywide_bi_monthly
*/
INSERT INTO congestion.citywide_bi_monthly(month, time_bin, num_segments, bi)

SELECT month::int,
time_bin,
count(segment_id) AS num_segments,
sum(bi * segments_v5.length * segment_aadt_final.aadt)/sum(segments_v5.length * segment_aadt_final.aadt) AS bi

FROM seg_bi
INNER join congestion.segments_v5 using (segment_id)
INNER join covid.segment_aadt_final USING (segment_id)

WHERE time_bin <@ '[06:00:00,23:00:00)'::timerange

GROUP BY month, time_bin
ORDER BY month, time_bin

$BODY$;

ALTER FUNCTION congestion.generate_citywide_bi_monthly(date)
OWNER TO congestion_admins;
99 changes: 99 additions & 0 deletions congestion_data_aggregation/sql/generate_citywide_tti_daily.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
FUNCTION: generate_citywide_tti_daily
PURPOSE: This function produces estimates of a city-wide travel time index (TTI) for every 30-minute of the day
INPUTS: dt_: day of aggregation
*/

-- DROP FUNCTION congestion.generate_citywide_tti_daily(date);

CREATE OR REPLACE FUNCTION congestion.generate_citywide_tti_daily(
_dt date)

RETURNS void
LANGUAGE 'sql'

COST 100
VOLATILE SECURITY DEFINER

AS $BODY$

/*
speed_links: Produces estimates of the average speed for each 30-minute bin for each individual link (link_dir)
*/
WITH speed_links AS (
SELECT segment_id,
link_dir,
length AS link_length,
datetime_bin(tx, 30) AS datetime_bin,
harmean(mean) AS spd_avg,
COUNT(DISTINCT tx) AS count_hc

FROM here.ta
INNER JOIN congestion.segment_links_v5_19_4 USING (link_dir)

WHERE tx <@ '[06:00:00, 23:00:00]'::timerange AND
(tx >= _dt AND tx < ( $1 + '1 day'::interval))

GROUP BY segment_id, link_dir, datetime_bin(tx, 30), length
),

/*
daily: Produces estimates of the average travel time for each 30-minute bin for each individual segment (segment_id), where at least 80% of the segment (by distance) has observations at the link (link_dir) level
*/
daily AS (
SELECT segment_id,
datetime_bin,
CASE WHEN SUM(link_length) >= 0.8 * b.length
THEN SUM(link_length / spd_avg * 3.6 ) * b.length / SUM(link_length)
ELSE
NULL
END AS segment_tt_avg

FROM speed_links
INNER JOIN congestion.segments_v5 b USING (segment_id)

WHERE link_length / spd_avg IS NOT NULL

GROUP BY segment_id, datetime_bin, b.length
ORDER BY segment_id, datetime_bin
),

/*
seg_tti: Produces estimates of the average travel time index (using 10th percentile travel times for highways and 25th percentile for other segments as a baseline) for each month for each hour by segment (segment_id)
*/
seg_tti AS (
SELECT segment_id,
dt,
time_bin,
CASE WHEN highway.segment_id IS NOT NULL
THEN tti.segment_tt_avg/b.tt_baseline_10pct_corr
ELSE
tti.segment_tt_avg/b.tt_baseline_25pct_corr
END AS tti

FROM daily tti
LEFT JOIN congestion.tt_segments_baseline_v5_2019_af b USING (segment_id)
LEFT JOIN congestion.highway_segments_v5 highway USING (segment_id)
)

/*
Final Output: Inserts an estimate of the city-wide TTI (weighted by length of segment and sqrt of AADT) into congestion.citywide_tti_daily
*/

INSERT INTO congestion.citywide_tti_daily(dt, time_bin, num_segments, tti)
SELECT dt,
time_bin,
count(seg_tti.segment_id) AS num_segments,
sum(seg_tti.tti * segments_v5.length * sqrt(segment_aadt_final.aadt)) / sum(segments_v5.length * sqrt(segment_aadt_final.aadt)) AS tti

FROM seg_tti
INNER JOIN congestion.segments_v5 USING (segment_id)
INNER JOIN covid.segment_aadt_final USING (segment_id)

GROUP BY dt, time_bin
ORDER BY dt, time_bin

$BODY$;

ALTER FUNCTION congestion.generate_citywide_tti_daily(date)
OWNER TO congestion_admins;
125 changes: 125 additions & 0 deletions congestion_data_aggregation/sql/generate_corridor_bi_monthly.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
FUNCTION: generate_corridor_bi_hourly
PURPOSE: This function produces estimates of a corridor-level buffer index (BI) for every hour of the day, every month
INPUTS: dt_: first day of month
*/

-- DROP FUNCTION congestion.generate_corridor_bi_monthly(date);

CREATE OR REPLACE FUNCTION congestion.generate_corridor_bi_monthly(
_dt date)

RETURNS void
LANGUAGE 'sql'

COST 100
VOLATILE SECURITY DEFINER

AS $BODY$

/*
speed_links: Produces estimates of the average speed for each 60-minute bin for each individual link (link_dir)
*/
WITH speed_links AS (
SELECT segment_id,
link_dir,
datetime_bin(tx,60) AS datetime_bin,
harmean(mean) AS spd_avg_all,
length AS link_length,
COUNT (DISTINCT tx) AS count_hc

FROM here.ta
INNER JOIN congestion.segment_links_v5_19_4 using (link_dir)
LEFT JOIN ref.holiday hol ON hol.dt = tx::date

WHERE hol.dt IS NULL AND
date_part('isodow'::text, tx::date) < 6 AND
(tx >= _dt AND tx < ( _dt + '1 mon'::interval))

GROUP BY segment_id, link_dir, datetime_bin(tx,60), link_length
),

/*
seg_tt: Produces estimates of the average travel time for each 60-minute bin for each individual segment (segment_id)
*/
seg_tt AS (
SELECT segment_id,
datetime_bin,
CASE WHEN SUM(link_length) >= 0.8 * b.length
THEN SUM(link_length / spd_avg_all * 3.6 ) * b.length / SUM(link_length)
END AS spd_avg_all,
SUM(link_length) / b.length * 100 AS data_pct_hc

FROM speed_links
INNER JOIN congestion.segments_v5 b USING (segment_id)

WHERE link_length / spd_avg_all IS NOT NULL

GROUP BY segment_id, datetime_bin, b.length
ORDER BY segment_id, datetime_bin
),

/*
seg_bi: Produces estimates of the average buffer index for each month for each hour by segment (segment_id)
*/
seg_bi AS (
SELECT a.segment_id,
date_trunc('month', datetime_bin) AS month,
datetime_bin::time without time zone AS time_bin,
count(a.datetime_bin) AS num_bins,
avg(a.spd_avg_all) AS avg_tt,
percentile_cont(0.95::double precision) WITHIN GROUP (ORDER BY a.spd_avg_all) AS pct_95,
(percentile_cont(0.95::double precision) WITHIN GROUP (ORDER BY a.spd_avg_all) - avg(a.spd_avg_all))/ avg(a.spd_avg_all) AS bi

FROM seg_tt a

WHERE a.spd_avg_all IS NOT NULL

GROUP BY a.segment_id, (a.datetime_bin::time without time zone), date_trunc('month', datetime_bin)
ORDER BY a.segment_id, date_trunc('month', datetime_bin), (a.datetime_bin::time without time zone)
),

/*
cor_bi: Produces estimates of the buffer index (BI) for each month for each hour by corridor (corridor_id)
*/
cor_bi AS (
SELECT corridor_id,
month,
time_bin,
sum(bi*seg.length)/cor.length AS bi,
sum(seg.length) AS seg_length,
cor.length AS cor_length

FROM seg_bi
JOIN congestion.segments_v5 seg using (segment_id)
JOIN congestion.corridors_v1_merged_lookup using (segment_id)
JOIN congestion.corridors_v1_merged cor using (corridor_id)

GROUP BY corridor_id, month, time_bin, cor.length
)

/*
Final Output: Inserts an estimate of the corridor buffer index (BI) into congestion.corridor_bi_hourly, where at least 80% of the corridor (by distance) has observations at the segment (segment_id) level
*/
INSERT INTO congestion.corridor_bi_monthly

SELECT corridor_id,
month,
time_bin,
CASE WHEN cor_length*0.8 < seg_length
THEN bi
ELSE
NULL
END AS bi

FROM cor_bi

WHERE time_bin <@ '[06:00:00, 23:00:00)'::timerange

GROUP BY corridor_id, month, time_bin, cor_length, seg_length, bi
ORDER BY corridor_id, month, time_bin

$BODY$;

ALTER FUNCTION congestion.generate_corridor_bi_monthly(date)
OWNER TO congestion_admins;
Loading

1 comment on commit a9fadf7

@chmnata
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#49

Please sign in to comment.