Skip to content

Commit ae62a58

Browse files
author
Naman Jain
committed
add dockerfile; some other minor tweaks to entrypoint
1 parent 6c13ce2 commit ae62a58

File tree

10 files changed

+152
-12
lines changed

10 files changed

+152
-12
lines changed

.dockerignore

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# Ignore Python cache files
2+
__pycache__/
3+
*.pyc
4+
*.pyo
5+
6+
# Ignore virtual environments
7+
venv/
8+
.env/
9+
10+
# Ignore data directories
11+
data/
12+
output/
13+
14+
# Ignore other unnecessary files
15+
*.log
16+
*.tmp
17+
*.bak

.github/workflows/ci.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
# .github/workflows/ci.yml
21
name: CI
32

43
on:

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,4 +17,4 @@ repos:
1717
name: flake8
1818
entry: poetry run flake8
1919
language: system
20-
types: [python]
20+
types: [python]

Dockerfile

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
FROM python:3.12-slim

WORKDIR /app

# Copy only the dependency manifests first so the (slow) dependency-install
# layer is cached and reused when only source code changes.
COPY pyproject.toml poetry.lock ./

RUN pip install --no-cache-dir poetry

# --only main replaces the removed --no-dev flag (dropped in Poetry 1.5+,
# and `pip install poetry` above installs the latest version).
# --no-root: the project source is not copied yet, so install dependencies only.
RUN poetry install --only main --no-root

COPY . .

ENV PYTHONPATH=/app

ENTRYPOINT ["poetry", "run", "python", "src/__main__.py"]

README.md

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,21 @@
1-
# Jua Data Engineer Assignment
1+
# Data Transformation pipeline: NetCDF to Parquet
22

33

44
### Task
5-
Create a pipeline for transforming 2022 total precipitation NetCDF files into Parquet format. <br/>
5+
This pipeline transforms total precipitation NetCDF files into Parquet format. <br/>
66
The source data is publicly available and is hosted at `gs://gcp-public-data-arco-era5/raw/date-variable-single_level`.
77

8-
### Requirements
9-
- The transformed data should support regular queries with filtering with timestamp.
10-
- The transformed data should support filtering by H3 geospatial index
8+
The transformed data supports:
9+
- regular queries with filtering by timestamp.
10+
- filtering by H3 geospatial index
1111

12-
<br/><br/>
13-
# Solution
14-
The solution has been implemented in python.
12+
## How to run
13+
### TODO: update instructions
14+
Build the Docker image
15+
`docker build -t data_transformation .`
16+
17+
Run the Docker container
18+
`docker run -v $(pwd)/output:/app/output data_transformation 01-01-2022 02-01-2022 out_dir`
1519

1620
### Features
1721
- Poetry for dependency management - Poetry makes life easier for managing dependencies and creating environments. To setup a virtual environment, simply run `poetry install` from the root directory. It will create a virtual environment for you. To spawn a shell, run `poetry shell`. Note that you need to have Poetry pre-installed in your system. To install Poetry, follow the steps listed [here](https://python-poetry.org/docs/#installation).
@@ -45,3 +49,9 @@ Note that some of the tests are live tests that actually download a file from GC
4549
- OOP?
4650
- Package it into a library
4751
- setup GitHub actions
52+
- add coverage report?
53+
- add doc strings
54+
- add exception handling and logs
55+
- add tqdm or some other progress bar
56+
57+
## Improvements

poetry.lock

Lines changed: 21 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ gcsfs = "^2024.6.1"
1515
scipy = "^1.14.1"
1616
netcdf4 = "^1.7.1.post2"
1717
h3 = "^3.7.7"
18+
tqdm = "^4.66.5"
1819

1920
[tool.poetry.group.dev.dependencies]
2021
black = "^24.8.0"

src/__main__.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
"""Entry point: transform daily total-precipitation NetCDF files into Parquet.

Usage: python src/__main__.py <start_date> <end_date> <out_dir>
Dates are DD-MM-YYYY; the range is inclusive on both ends.
"""

import argparse
import datetime
import logging
import pathlib

import gcsfs
import tqdm

from src import constants, process_data, utils

logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)


def netcdf_to_parquet(
    file_path: str, output_path: str, file_system: gcsfs.GCSFileSystem
) -> None:
    """Convert one NetCDF file to a Parquet file with an added H3 index."""
    dataframe = process_data.netcdf_to_dataframe(file_path, file_system)
    dataframe = process_data.add_h3_index(dataframe)
    process_data.save_dataframe_as_parquet(dataframe, output_path)


def _parse_cli_date(value: str) -> datetime.date:
    """Parse a DD-MM-YYYY command-line argument into a date."""
    return datetime.datetime.strptime(value, "%d-%m-%Y").date()


def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser (kept separate for testability)."""
    parser = argparse.ArgumentParser(
        description="Process netCDF files and save as Parquet."
    )
    parser.add_argument(
        "start_date",
        type=_parse_cli_date,
        help="Start date in DD-MM-YYYY format",
    )
    parser.add_argument(
        "end_date",
        type=_parse_cli_date,
        help="End date in DD-MM-YYYY format",
    )
    parser.add_argument(
        "out_dir", type=str, help="Output directory for Parquet files"
    )
    return parser


def main() -> None:
    """Parse arguments and process every date in the inclusive range."""
    parser = _build_parser()
    args = parser.parse_args()
    if args.end_date < args.start_date:
        parser.error("end_date must not be before start_date")

    pathlib.Path(args.out_dir).mkdir(parents=True, exist_ok=True)

    # Created here (not at import time) so importing this module has no
    # network side effects.
    file_system = utils.initialize_gcsfs()

    current_date = args.start_date
    total_days = (args.end_date - current_date).days + 1  # inclusive range

    for _ in tqdm.tqdm(range(total_days), desc="Processing dates"):
        try:
            date_str = current_date.strftime("%Y/%m/%d")
            file_path = (
                f"{constants.GCS_BASE_URL}/{date_str}/total_precipitation/surface.nc"
            )
            output_path = (
                f"{args.out_dir}/precipitation_{current_date.strftime('%d_%m_%Y')}.parquet"
            )
            netcdf_to_parquet(file_path, output_path, file_system)
        except Exception:
            # Best-effort batch: one missing/corrupt day must not abort the
            # whole run. logging.exception keeps the full traceback.
            logging.exception(
                "Failed to process date %s", current_date.strftime("%d_%m_%Y")
            )
        finally:
            current_date += datetime.timedelta(days=1)


if __name__ == "__main__":
    main()

src/process_data.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
1+
import logging
2+
13
import gcsfs
24
import h3
35
import pandas as pd
46
import xarray as xr
57

68
from . import utils
79

10+
logging.basicConfig(
11+
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
12+
)
13+
814

915
def netcdf_to_dataframe(
    file_path: str, file_system: gcsfs.GCSFileSystem
) -> pd.DataFrame:
    """Stream a NetCDF file from GCS and flatten it into a DataFrame.

    The dataset's coordinate index is reset so that timestamp/lat/lon become
    ordinary columns of the returned frame.
    """
    logging.info("Streaming data from GCS bucket for %s", file_path)
    with utils.open_file(file_path, file_system, mode="rb") as handle:
        dataset = xr.open_dataset(handle)
        return dataset.to_dataframe().reset_index()
1421

src/utils.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
1+
import logging
2+
13
import gcsfs
24

5+
logging.basicConfig(
6+
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
7+
)
8+
39

410
def initialize_gcsfs():
    """Return a GCS filesystem client for anonymous (unauthenticated) access.

    token="anon" skips credential lookup; the source bucket is public, so no
    authentication is required.
    """
    logging.info("Initializing GCS file system")
    file_system = gcsfs.GCSFileSystem(token="anon")
    return file_system
613

714

815
def open_file(

0 commit comments

Comments
 (0)