Skip to content

Commit

Permalink
Merge pull request #32 from Roco-scientist/train_async
Browse files Browse the repository at this point in the history
Train async
  • Loading branch information
Roco-scientist authored Sep 1, 2021
2 parents bde342b + 5e1b606 commit 88200d1
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 24 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mbta_countdown"
version = "0.4.0"
version = "0.5.0"
authors = ["Rory Coffey <coffeyrt@gmail.com>"]
edition = "2018"

Expand All @@ -10,7 +10,7 @@ edition = "2018"
chrono = "0.4"
ht16k33 = "0.4"
lazy_static = "1.4"
reqwest = {version = "0.10.0-alpha.2", features = ["blocking", "json"]}
reqwest = {version = "0.11", features = ["blocking", "json"]}
rppal = {version = "0.11", features = ["hal-unproven"]}
serde_json = "1.0"
ssd1306 = "0.4"
Expand All @@ -21,3 +21,4 @@ scraper = "0.12.0"
rayon = "1.5.1"
termion = "1.5.6"
tokio = { version = "1", features = ["full"] }
futures = {version = "0.3.15", features = ["executor"]}
10 changes: 6 additions & 4 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std;
use std::{
cmp,
collections::HashMap,
error,
io::{stdout, Read, Write},
process::{exit, Command},
sync::{Arc, Mutex},
Expand Down Expand Up @@ -98,6 +99,7 @@ async fn main() {
// Get the scheduled and predicted train times to display and countdown from
let train_times = Arc::new(Mutex::new(
mbta_countdown::train_time::train_times(&dir_code, &station, &vehicle_code)
.await
.unwrap_or_else(|err| panic!("ERROR - train_times - {}", err)),
));

Expand All @@ -116,7 +118,7 @@ async fn main() {
// get the first and last train for the day to know when to pause the displays and not
// continually update when there are no trains arriving
let last_first =
mbta_countdown::train_time::max_min_times(&dir_code, &station, &vehicle_code)
mbta_countdown::train_time::max_min_times(&dir_code, &station, &vehicle_code).await
.unwrap_or_else(|err| panic!("Error - max min times - {}", err));
let mut last_time;
let mut first_time;
Expand Down Expand Up @@ -150,7 +152,7 @@ async fn main() {

// after 3 am get the first and last vehicle times
let last_first_thread =
mbta_countdown::train_time::max_min_times(&dir_code, &station, &vehicle_code)
mbta_countdown::train_time::max_min_times(&dir_code, &station, &vehicle_code).await
.unwrap_or_else(|err| panic!("Error - max min times - {}", err));
if let Some([last, first]) = last_first_thread {
last_time = last;
Expand Down Expand Up @@ -231,7 +233,7 @@ async fn main() {
// If there is no error on retrieving the train times from the website, update the
// train_times variable, otherwise allow up to 5 errors
if let Ok(new_train_times) =
mbta_countdown::train_time::train_times(&dir_code, &station, &vehicle_code)
mbta_countdown::train_time::train_times(&dir_code, &station, &vehicle_code).await
{
*train_times_clone.lock().unwrap() = new_train_times;
train_time_errors = 0;
Expand Down Expand Up @@ -320,7 +322,7 @@ async fn main() {
}

/// Gets the command line arguments
pub fn arguments() -> Result<(String, String, u8, String, String), Box<dyn std::error::Error>> {
pub fn arguments() -> Result<(String, String, u8, String, String), Box<dyn error::Error>> {
// get station and vehicle conversions for the MBTA API
let (vehicle_info, station_info) = mbta_countdown::mbta_info::all_mbta_info(false)?;
// get a list of stations to limit the station argument input
Expand Down
40 changes: 22 additions & 18 deletions src/train_time.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
use chrono;
use reqwest;
use std;
use chrono::prelude::*;
use chrono::{DateTime, Local, TimeZone};
use reqwest;
use serde_json::Value;
use std::{collections::HashMap, error::Error};

/// Main function to retrieve train times from Forest Hills Station for inbound commuter rail
pub fn train_times(
// Main function to retrieve train times from Forest Hills Station for inbound commuter rail
pub async fn train_times(
dir_code: &str,
station: &str,
route_code: &str,
) -> Result<Option<Vec<DateTime<Local>>>, Box<dyn Error>> {
// get prediction times
let prediction_times = get_prediction_times(station, dir_code, route_code)?;
let prediction_times_task = get_prediction_times(station, dir_code, route_code);
// get schuduled times, if None, create empty hashmap
let mut scheduled_times =
get_scheduled_times(station, dir_code, route_code, true)?.unwrap_or(HashMap::new());
let scheduled_times_task =
get_scheduled_times(station, dir_code, route_code, true);
let prediction_times = prediction_times_task.await?;
let mut scheduled_times = scheduled_times_task.await?.unwrap_or(HashMap::new());
// let (prediction_times, scheduled_times_start) = try_join!(prediction_times_task, scheduled_times_task)?;
// let mut scheduled_times = scheduled_times_start.unwrap_or(HashMap::new());
// if there are predicted times, replace the scheduled times with the more accurate predicted
// tiem
if let Some(pred_times) = prediction_times {
Expand Down Expand Up @@ -47,12 +51,12 @@ pub fn train_times(
return Ok(Some(all_times));
}

pub fn max_min_times(
pub async fn max_min_times(
dir_code: &str,
station: &str,
route_code: &str,
) -> Result<Option<[DateTime<Local>; 2]>, Box<dyn Error>> {
if let Some(scheduled_times) = get_scheduled_times(station, dir_code, route_code, false)? {
if let Some(scheduled_times) = get_scheduled_times(station, dir_code, route_code, false).await? {
let mut all_times = scheduled_times
.values()
.map(|date| date.clone())
Expand All @@ -69,48 +73,48 @@ pub fn max_min_times(
}

/// Retreived MBTA predicted times with their API
fn get_prediction_times(
async fn get_prediction_times(
station: &str,
dir_code: &str,
route_code: &str,
) -> Result<Option<HashMap<String, DateTime<Local>>>, Box<dyn Error>> {
// MBTA API for predicted times
let address = format!("https://api-v3.mbta.com/predictions?filter[stop]={}&filter[direction_id]={}&include=stop&filter[route]={}", station, dir_code, route_code);
return get_route_times(address);
return get_route_times(address).await;
}

/// Retreived MBTA scheduled times with their API
fn get_scheduled_times(
async fn get_scheduled_times(
station: &str,
dir_code: &str,
route_code: &str,
filter_time: bool,
) -> Result<Option<HashMap<String, DateTime<Local>>>, Box<dyn Error>> {
let address;
) -> Result<Option<HashMap<String, DateTime<Local>>>, Box<dyn std::error::Error>> {
let address;
if filter_time {
let now = chrono::Local::now();
// MBTA API for scheduled times
address = format!("https://api-v3.mbta.com/schedules?include=route,trip,stop&filter[min_time]={}%3A{}&filter[stop]={}&filter[route]={}&filter[direction_id]={}",now.hour(), now.minute(), station, route_code, dir_code);
} else {
address = format!("https://api-v3.mbta.com/schedules?include=route,trip,stop&filter[stop]={}&filter[route]={}&filter[direction_id]={}", station, route_code, dir_code);
}
return get_route_times(address);
return get_route_times(address).await;
}

/// Retreives the JSON from MBTA API and parses it into a hasmap
fn get_route_times(
async fn get_route_times(
address: String,
) -> Result<Option<HashMap<String, DateTime<Local>>>, Box<dyn Error>> {
// retrieve the routes with the MBTA API returning a converted JSON format
let routes_json: Value = reqwest::blocking::get(&address)?.json()?;
let routes_json: Value = reqwest::get(&address).await?.json().await?;
// only interested in the "data" field
let data_option = routes_json.get("data");
// if there is a "data" field, proceed
if let Some(data) = data_option {
// if the "data" field is an array, proceed
if let Some(data_array) = data.as_array() {
// create a new HashMap to put int trip_id and departure time
let mut commuter_rail_dep_time: HashMap<String, chrono::DateTime<Local>> =
let mut commuter_rail_dep_time: HashMap<String, DateTime<Local>> =
HashMap::new();
// for each train in the data array, insert the trip_id and departure time
for train in data_array {
Expand Down

0 comments on commit 88200d1

Please sign in to comment.