Skip to content

Commit

Permalink
add more logging and explicit parameter passing for cache flag
Browse files Browse the repository at this point in the history
  • Loading branch information
conbrad committed Sep 25, 2024
1 parent 30f53df commit 771ee9c
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 106 deletions.
4 changes: 3 additions & 1 deletion api/app/morecast_v2/forecasts.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ async def construct_wf1_forecasts(session: ClientSession, forecast_records: List
start_time = vancouver_tz.localize(datetime.combine(min_forecast_date, time.min))
end_time = vancouver_tz.localize(datetime.combine(max_forecast_date, time.max))
unique_station_codes = list(set([f.station_code for f in forecast_records]))
dailies = await get_forecasts_for_stations_by_date_range(session, header, start_time, end_time, unique_station_codes, False)
dailies = await get_forecasts_for_stations_by_date_range(
session=session, header=header, start_time_of_interest=start_time, end_time_of_interest=end_time, unique_station_codes=unique_station_codes, check_cache=False
)

# Shape the WF1 dailies into a dictionary keyed by station codes for easier consumption
grouped_dailies = defaultdict(list[StationDailyFromWF1])
Expand Down
7 changes: 5 additions & 2 deletions api/app/wildfire_one/wfwx_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,8 @@ async def get_dailies_generator(
# for production, it's more tricky - we don't want to put too much load on the wf1 api, but we don't
# want stale values either. We default to 5 minutes, or 300 seconds.
cache_expiry_seconds: Final = int(config.get("REDIS_DAILIES_BY_STATION_CODE_CACHE_EXPIRY", 300))
use_cache = check_cache is True and cache_expiry_seconds is not None and config.get("REDIS_USE") == "True"
use_cache = check_cache is True and config.get("REDIS_USE") == "True"
logger.info(f"Using cache: {use_cache}")

dailies_iterator = fetch_paged_response_generator(
session,
Expand Down Expand Up @@ -346,7 +347,9 @@ async def get_forecasts_for_stations_by_date_range(
# get station information from the wfwx api
wfwx_stations = await get_wfwx_stations_from_station_codes(session, header, unique_station_codes)
# get the daily forecasts for all the stations in the date range
raw_dailies = await get_dailies_generator(session, header, wfwx_stations, start_time_of_interest, end_time_of_interest, check_cache)
raw_dailies = await get_dailies_generator(

Check warning on line 350 in api/app/wildfire_one/wfwx_api.py

View check run for this annotation

Codecov / codecov/patch

api/app/wildfire_one/wfwx_api.py#L350

Added line #L350 was not covered by tests
session=session, header=header, wfwx_stations=wfwx_stations, time_of_interest=start_time_of_interest, end_time_of_interest=end_time_of_interest, check_cache=check_cache
)

forecast_dailies = await mapper(raw_dailies, WF1RecordTypeEnum.FORECAST)

Expand Down
175 changes: 72 additions & 103 deletions api/app/wildfire_one/wildfire_fetchers.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
""" Functions that request and marshall WFWX API responses into our schemas"""
"""Functions that request and marshall WFWX API responses into our schemas"""

import math
import logging
from datetime import datetime
Expand All @@ -9,9 +10,7 @@
from app.data.ecodivision_seasons import EcodivisionSeasons
from app.rocketchat_notifications import send_rocketchat_notification
from app.schemas.observations import WeatherStationHourlyReadings
from app.schemas.stations import (DetailedWeatherStationProperties,
GeoJsonDetailedWeatherStation,
WeatherStationGeometry)
from app.schemas.stations import DetailedWeatherStationProperties, GeoJsonDetailedWeatherStation, WeatherStationGeometry
from app.db.crud.stations import _get_noon_date
from app.wildfire_one.query_builders import BuildQuery
from app import config
Expand All @@ -22,28 +21,27 @@
logger = logging.getLogger(__name__)


async def _fetch_cached_response(session: ClientSession, headers: dict, url: str, params: dict,
cache_expiry_seconds: int):
async def _fetch_cached_response(session: ClientSession, headers: dict, url: str, params: dict, cache_expiry_seconds: int):
cache = create_redis()
key = f'{url}?{urlencode(params)}'
key = f"{url}?{urlencode(params)}"

Check warning on line 26 in api/app/wildfire_one/wildfire_fetchers.py

View check run for this annotation

Codecov / codecov/patch

api/app/wildfire_one/wildfire_fetchers.py#L26

Added line #L26 was not covered by tests
try:
cached_json = cache.get(key)
except Exception as error:
cached_json = None
logger.error(error, exc_info=error)
if cached_json:
logger.info('redis cache hit %s', key)
logger.info("redis cache hit %s", key)

Check warning on line 33 in api/app/wildfire_one/wildfire_fetchers.py

View check run for this annotation

Codecov / codecov/patch

api/app/wildfire_one/wildfire_fetchers.py#L33

Added line #L33 was not covered by tests
response_json = json.loads(cached_json.decode())
else:
logger.info('redis cache miss %s', key)
logger.info("redis cache miss %s", key)

Check warning on line 36 in api/app/wildfire_one/wildfire_fetchers.py

View check run for this annotation

Codecov / codecov/patch

api/app/wildfire_one/wildfire_fetchers.py#L36

Added line #L36 was not covered by tests
async with session.get(url, headers=headers, params=params) as response:
try:
response_json = await response.json()
except json.decoder.JSONDecodeError as error:
logger.error(error, exc_info=error)
text = await response.text()
logger.error('response.text() = %s', text)
send_rocketchat_notification(f'JSONDecodeError, response.text() = {text}', error)
logger.error("response.text() = %s", text)
send_rocketchat_notification(f"JSONDecodeError, response.text() = {text}", error)

Check warning on line 44 in api/app/wildfire_one/wildfire_fetchers.py

View check run for this annotation

Codecov / codecov/patch

api/app/wildfire_one/wildfire_fetchers.py#L43-L44

Added lines #L43 - L44 were not covered by tests
raise
try:
if response.status == 200:
Expand All @@ -54,14 +52,9 @@ async def _fetch_cached_response(session: ClientSession, headers: dict, url: str


async def fetch_paged_response_generator(
session: ClientSession,
headers: dict,
query_builder: BuildQuery,
content_key: str,
use_cache: bool = False,
cache_expiry_seconds: int = 86400
session: ClientSession, headers: dict, query_builder: BuildQuery, content_key: str, use_cache: bool = False, cache_expiry_seconds: int = 86400
) -> AsyncGenerator[dict, None]:
""" Asynchronous generator for iterating through responses from the API.
"""Asynchronous generator for iterating through responses from the API.
The response is a paged response, but this generator abstracts that away.
"""
# We don't know how many pages until our first call - so we assume one page to start with.
Expand All @@ -70,14 +63,16 @@ async def fetch_paged_response_generator(
while page_count < total_pages:
# Build up the request URL.
url, params = query_builder.query(page_count)
logger.debug('loading page %d...', page_count)
if use_cache and config.get('REDIS_USE') == 'True':
logger.debug("loading page %d...", page_count)
if use_cache and config.get("REDIS_USE") == "True":
logger.info("Using cache")

Check warning on line 68 in api/app/wildfire_one/wildfire_fetchers.py

View check run for this annotation

Codecov / codecov/patch

api/app/wildfire_one/wildfire_fetchers.py#L68

Added line #L68 was not covered by tests
# We've been told and configured to use the redis cache.
response_json = await _fetch_cached_response(session, headers, url, params, cache_expiry_seconds)
else:
logger.info("Not using cache")
async with session.get(url, headers=headers, params=params) as response:
response_json = await response.json()
logger.debug('done loading page %d.', page_count)
logger.debug("done loading page %d.", page_count)

# keep this code around for dumping responses to a json file - useful for when you're writing
# tests to grab actual responses to use in fixtures.
Expand All @@ -88,51 +83,40 @@ async def fetch_paged_response_generator(
# json.dump(response_json, f)

# Update the total page count.
total_pages = response_json['page']['totalPages'] if 'page' in response_json else 1
for response_object in response_json['_embedded'][content_key]:
total_pages = response_json["page"]["totalPages"] if "page" in response_json else 1
for response_object in response_json["_embedded"][content_key]:
yield response_object
# Keep track of our page count.
page_count = page_count + 1


async def fetch_detailed_geojson_stations(
session: ClientSession,
headers: dict,
query_builder: BuildQuery) -> Tuple[Dict[int, GeoJsonDetailedWeatherStation], Dict[str, int]]:
""" Fetch and marshall geojson station data"""
async def fetch_detailed_geojson_stations(session: ClientSession, headers: dict, query_builder: BuildQuery) -> Tuple[Dict[int, GeoJsonDetailedWeatherStation], Dict[str, int]]:
"""Fetch and marshall geojson station data"""
stations = {}
id_to_code_map = {}
# 1 week seems a reasonable period to cache stations for.
redis_station_cache_expiry: Final = int(config.get('REDIS_STATION_CACHE_EXPIRY', 604800))
redis_station_cache_expiry: Final = int(config.get("REDIS_STATION_CACHE_EXPIRY", 604800))
# Put the stations in a nice dictionary.
async for raw_station in fetch_paged_response_generator(session,
headers,
query_builder,
'stations',
True,
redis_station_cache_expiry):
station_code = raw_station.get('stationCode')
station_status = raw_station.get('stationStatus', {}).get('id')
async for raw_station in fetch_paged_response_generator(session, headers, query_builder, "stations", True, redis_station_cache_expiry):
station_code = raw_station.get("stationCode")
station_status = raw_station.get("stationStatus", {}).get("id")
# Because we can't filter on status in the RSQL, we have to manually exclude stations that are
# not active.
if is_station_valid(raw_station):
id_to_code_map[raw_station.get('id')] = station_code
geojson_station = GeoJsonDetailedWeatherStation(properties=DetailedWeatherStationProperties(
code=station_code,
name=raw_station.get('displayLabel')),
geometry=WeatherStationGeometry(
coordinates=[raw_station.get('longitude'), raw_station.get('latitude')]))
id_to_code_map[raw_station.get("id")] = station_code
geojson_station = GeoJsonDetailedWeatherStation(
properties=DetailedWeatherStationProperties(code=station_code, name=raw_station.get("displayLabel")),
geometry=WeatherStationGeometry(coordinates=[raw_station.get("longitude"), raw_station.get("latitude")]),
)
stations[station_code] = geojson_station
else:
logger.debug('station %s, status %s', station_code, station_status)
logger.debug("station %s, status %s", station_code, station_status)

return stations, id_to_code_map


async def fetch_raw_dailies_for_all_stations(
session: ClientSession, headers: dict, time_of_interest: datetime) -> list:
""" Fetch the noon values(observations and forecasts) for a given time, for all weather stations.
"""
async def fetch_raw_dailies_for_all_stations(session: ClientSession, headers: dict, time_of_interest: datetime) -> list:
"""Fetch the noon values(observations and forecasts) for a given time, for all weather stations."""
# We don't know how many pages until our first call - so we assume one page to start with.
total_pages = 1
page_count = 0
Expand All @@ -143,126 +127,111 @@ async def fetch_raw_dailies_for_all_stations(
# Get dailies
async with session.get(url, params=params, headers=headers) as response:
dailies_json = await response.json()
total_pages = dailies_json['page']['totalPages']
hourlies.extend(dailies_json['_embedded']['dailies'])
total_pages = dailies_json["page"]["totalPages"]
hourlies.extend(dailies_json["_embedded"]["dailies"])
page_count = page_count + 1
return hourlies


def prepare_fetch_hourlies_query(raw_station: dict, start_timestamp: datetime, end_timestamp: datetime):
""" Prepare url and params to fetch hourly readings from the WFWX Fireweather API.
"""
base_url = config.get('WFWX_BASE_URL')
"""Prepare url and params to fetch hourly readings from the WFWX Fireweather API."""
base_url = config.get("WFWX_BASE_URL")

logger.debug('requesting historic data from %s to %s', start_timestamp, end_timestamp)
logger.debug("requesting historic data from %s to %s", start_timestamp, end_timestamp)

# Prepare query params and query:
query_start_timestamp = math.floor(start_timestamp.timestamp() * 1000)
query_end_timestamp = math.floor(end_timestamp.timestamp() * 1000)

station_id = raw_station['id']
params = {'startTimestamp': query_start_timestamp,
'endTimestamp': query_end_timestamp, 'stationId': station_id}
endpoint = ('/v1/hourlies/search/'
'findHourliesByWeatherTimestampBetweenAndStationIdEqualsOrderByWeatherTimestampAsc')
url = f'{base_url}{endpoint}'
station_id = raw_station["id"]
params = {"startTimestamp": query_start_timestamp, "endTimestamp": query_end_timestamp, "stationId": station_id}
endpoint = "/v1/hourlies/search/" "findHourliesByWeatherTimestampBetweenAndStationIdEqualsOrderByWeatherTimestampAsc"
url = f"{base_url}{endpoint}"

return url, params


def prepare_fetch_dailies_for_all_stations_query(time_of_interest: datetime, page_count: int):
""" Prepare url and params for fetching dailies(that's forecast and observations for noon) for all.
stations. """
base_url = config.get('WFWX_BASE_URL')
"""Prepare url and params for fetching dailies(that's forecast and observations for noon) for all.
stations."""
base_url = config.get("WFWX_BASE_URL")
noon_date = _get_noon_date(time_of_interest)
timestamp = int(noon_date.timestamp() * 1000)
# one could filter on recordType.id==FORECAST or recordType.id==ACTUAL but we want it all.
params = {'query': f'weatherTimestamp=={timestamp}',
'page': page_count,
'size': config.get('WFWX_MAX_PAGE_SIZE', 1000)}
endpoint = ('/v1/dailies/rsql')
url = f'{base_url}{endpoint}'
logger.info('%s %s', url, params)
params = {"query": f"weatherTimestamp=={timestamp}", "page": page_count, "size": config.get("WFWX_MAX_PAGE_SIZE", 1000)}
endpoint = "/v1/dailies/rsql"
url = f"{base_url}{endpoint}"
logger.info("%s %s", url, params)
return url, params


async def fetch_hourlies(
session: ClientSession,
raw_station: dict,
headers: dict,
start_timestamp: datetime,
end_timestamp: datetime,
use_cache: bool,
eco_division: EcodivisionSeasons) -> WeatherStationHourlyReadings:
""" Fetch hourly weather readings for the specified time range for a give station """
logger.debug('fetching hourlies for %s(%s)',
raw_station['displayLabel'], raw_station['stationCode'])
session: ClientSession, raw_station: dict, headers: dict, start_timestamp: datetime, end_timestamp: datetime, use_cache: bool, eco_division: EcodivisionSeasons
) -> WeatherStationHourlyReadings:
"""Fetch hourly weather readings for the specified time range for a give station"""
logger.debug("fetching hourlies for %s(%s)", raw_station["displayLabel"], raw_station["stationCode"])

url, params = prepare_fetch_hourlies_query(raw_station, start_timestamp, end_timestamp)

cache_expiry_seconds: Final = int(config.get('REDIS_HOURLIES_BY_STATION_CODE_CACHE_EXPIRY', 300))
cache_expiry_seconds: Final = int(config.get("REDIS_HOURLIES_BY_STATION_CODE_CACHE_EXPIRY", 300))

# Get hourlies
if use_cache and cache_expiry_seconds is not None and config.get('REDIS_USE') == 'True':
if use_cache and config.get("REDIS_USE") == "True":
hourlies_json = await _fetch_cached_response(session, headers, url, params, cache_expiry_seconds)
else:
async with session.get(url, params=params, headers=headers) as response:
hourlies_json = await response.json()

hourlies = []
for hourly in hourlies_json['_embedded']['hourlies']:
for hourly in hourlies_json["_embedded"]["hourlies"]:
# We only accept "ACTUAL" values
if hourly.get('hourlyMeasurementTypeCode', '').get('id') == 'ACTUAL':
if hourly.get("hourlyMeasurementTypeCode", "").get("id") == "ACTUAL":
hourlies.append(parse_hourly(hourly))

logger.debug('fetched %d hourlies for %s(%s)', len(
hourlies), raw_station['displayLabel'], raw_station['stationCode'])
logger.debug("fetched %d hourlies for %s(%s)", len(hourlies), raw_station["displayLabel"], raw_station["stationCode"])

return WeatherStationHourlyReadings(values=hourlies,
station=parse_station(
raw_station, eco_division))
return WeatherStationHourlyReadings(values=hourlies, station=parse_station(raw_station, eco_division))


async def fetch_access_token(session: ClientSession) -> dict:
""" Fetch an access token for WFWX Fireweather API
"""
logger.debug('fetching access token...')
password = config.get('WFWX_SECRET')
user = config.get('WFWX_USER')
auth_url = config.get('WFWX_AUTH_URL')
"""Fetch an access token for WFWX Fireweather API"""
logger.debug("fetching access token...")
password = config.get("WFWX_SECRET")
user = config.get("WFWX_USER")
auth_url = config.get("WFWX_AUTH_URL")
cache = create_redis()
# NOTE: Consider using a hashed version of the password as part of the key.
params = {'user': user}
key = f'{auth_url}?{urlencode(params)}'
params = {"user": user}
key = f"{auth_url}?{urlencode(params)}"
try:
cached_json = cache.get(key)
except Exception as error:
cached_json = None
logger.error(error, exc_info=error)
if cached_json:
logger.info('redis cache hit %s', auth_url)
logger.info("redis cache hit %s", auth_url)

Check warning on line 212 in api/app/wildfire_one/wildfire_fetchers.py

View check run for this annotation

Codecov / codecov/patch

api/app/wildfire_one/wildfire_fetchers.py#L212

Added line #L212 was not covered by tests
response_json = json.loads(cached_json.decode())
else:
logger.info('redis cache miss %s', auth_url)
logger.info("redis cache miss %s", auth_url)
async with session.get(auth_url, auth=BasicAuth(login=user, password=password)) as response:
response_json = await response.json()
try:
if response.status == 200:
# We expire when the token expires, or 10 minutes, whichever is less.
# NOTE: only caching for 10 minutes right now, since we aren't handling cases
# where the token is invalidated.
redis_auth_cache_expiry: Final = int(config.get('REDIS_AUTH_CACHE_EXPIRY', 600))
expires = min(response_json['expires_in'], redis_auth_cache_expiry)
redis_auth_cache_expiry: Final = int(config.get("REDIS_AUTH_CACHE_EXPIRY", 600))
expires = min(response_json["expires_in"], redis_auth_cache_expiry)
cache.set(key, json.dumps(response_json).encode(), ex=expires)
except Exception as error:
logger.error(error, exc_info=error)
return response_json


async def fetch_stations_by_group_id(session: ClientSession, headers: dict, group_id: str):
logger.debug(f'Fetching stations for group {group_id}')
base_url = config.get('WFWX_BASE_URL')
url = f'{base_url}/v1/stationGroups/{group_id}/members'
logger.debug(f"Fetching stations for group {group_id}")
base_url = config.get("WFWX_BASE_URL")
url = f"{base_url}/v1/stationGroups/{group_id}/members"

Check warning on line 234 in api/app/wildfire_one/wildfire_fetchers.py

View check run for this annotation

Codecov / codecov/patch

api/app/wildfire_one/wildfire_fetchers.py#L232-L234

Added lines #L232 - L234 were not covered by tests

async with session.get(url, headers=headers) as response:
raw_stations = await response.json()
Expand Down

0 comments on commit 771ee9c

Please sign in to comment.