Skip to content

Commit

Permalink
Add s3 retry config and refactor s3_client creation as util
Browse files Browse the repository at this point in the history
  • Loading branch information
tim1234ltp committed Oct 11, 2024
1 parent fc93626 commit 2b9e0ad
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 16 deletions.
6 changes: 5 additions & 1 deletion config/global_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
"s3_bucket": {
"uri": "dataclinic-gtfs-rt",
"public_key": "",
"secret_key": ""
"secret_key": "",
"retries": {
"max_attempts": 3,
"mode": "standard"
}
},
"mobilitydatabase": {
"url": "https://api.mobilitydatabase.org/v1",
Expand Down
13 changes: 8 additions & 5 deletions src/normalize/normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
import pyarrow as pa
import s3fs
import structlog
from botocore.config import Config
from dateutil import parser

from gtfs_realtime_pb2 import FeedMessage
from src.normalize.parquet_utils import write_data, add_time_columns
from src.normalize.protobuf_utils import protobuf_objects_to_pyarrow_table
from src.util.s3_client import create_s3_client

structlog.configure(
processors=[
Expand All @@ -33,6 +35,10 @@ def check_config(config: dict):
assert config["s3_bucket"]["uri"]
assert config["s3_bucket"]["public_key"]
assert config["s3_bucket"]["secret_key"]
retries_config = config["s3_bucket"].get("retries")
if retries_config:
assert retries_config["mode"], ("mode must be specified for enabling "
"retry")


def load_config(path: str):
Expand Down Expand Up @@ -282,11 +288,8 @@ def main(
for key, val in config.get("normalize_argv_override", {}).items():
LOGGER.info(f"Overriding argv {key}={val}")
exec(key + f"={val}")
s3 = boto3.client(
"s3",
aws_access_key_id=config["s3_bucket"]["public_key"],
aws_secret_access_key=config["s3_bucket"]["secret_key"],
)

s3 = create_s3_client(config["s3_bucket"])
s3_fs = s3fs.S3FileSystem(
key=config["s3_bucket"]["public_key"],
secret=config["s3_bucket"]["secret_key"]
Expand Down
17 changes: 7 additions & 10 deletions src/scraper/scrape.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
from datetime import datetime

import boto3
from botocore.config import Config
import click
import structlog

from src.scraper.mobilitydatabase import get_access_token, get_feed_json
from src.util.s3_client import create_s3_client

structlog.configure(
processors=[
Expand All @@ -29,6 +31,10 @@ def check_config(config: dict):
assert config["s3_bucket"]["uri"]
assert config["s3_bucket"]["public_key"]
assert config["s3_bucket"]["secret_key"]
retries_config = config["s3_bucket"].get("retries")
if retries_config:
assert retries_config["mode"], ("mode must be specified for enabling "
"retry")

assert config["mobilitydatabase"]["url"]
assert config["mobilitydatabase"]["token"]
Expand All @@ -41,15 +47,6 @@ def load_config(path: str):
return config


def create_s3_client(s3_config: dict):
session = boto3.Session(
aws_access_key_id=s3_config["public_key"],
aws_secret_access_key=s3_config["secret_key"],
)
s3 = session.resource("s3").Bucket(s3_config["uri"])
return s3


def scrape_loop(s3_client, feed_id: str):
module = importlib.import_module(f"src.scraper.feeds.{feed_id}")
feed_class = getattr(module, feed_id.upper().replace("-", "_"))
Expand Down Expand Up @@ -107,7 +104,7 @@ def main(feed_id, config_path):
config["mobilitydatabase"]["url"],
config["mobilitydatabase"]["token"],
)
s3 = create_s3_client(config["s3_bucket"])
s3 = create_s3_client(config["s3_bucket"]).resource("s3").Bucket(config["s3_bucket"]["uri"])
scrape_loop(s3, feed_id)


Expand Down
Empty file added src/util/__init__.py
Empty file.
22 changes: 22 additions & 0 deletions src/util/s3_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import boto3
from botocore.config import Config


def create_s3_client(s3_config: dict):
retries_config = s3_config.get("retries")
s3 = boto3.client(
"s3",
aws_access_key_id=s3_config["public_key"],
aws_secret_access_key=s3_config["secret_key"],
)
if retries_config:
config = Config(
retries=retries_config
)
s3 = boto3.client(
"s3",
aws_access_key_id=s3_config["public_key"],
aws_secret_access_key=s3_config["secret_key"],
config=config
)
return s3

0 comments on commit 2b9e0ad

Please sign in to comment.