
Commit b2f1cf2

create_test_data
1 parent 704340f commit b2f1cf2

2 files changed: +223 −0 lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -11,6 +11,7 @@ dependencies = [
     "mypy_boto3_s3",
     "moto",
     "polars",
+    "s3fs",
     "tenacity",
 ]
 name = "dri-utils"
Lines changed: 222 additions & 0 deletions
@@ -0,0 +1,222 @@
"""Script to generate test cosmos data.

Currently used for benchmarking duckdb queries.

Data is created per minute for user-defined sites and a date range.

Can be exported into three different s3 bucket structures:

1) Original format (no partitioning): /YYYY-MM/YYYY-MM-DD.parquet
2) Current format (partitioned by date): /date=YYYY-MM-DD/data.parquet
3) Proposed format (partitioned by date and site): /site=site/date=YYYY-MM-DD/data.parquet

As discussed, the use case for loading from multiple dataset types
(precip, soilmet) is unlikely due to their different resolutions,
so we assume only one dataset is queried at a time.

Notes:

You need to have an aws-vault session running to connect to s3.
You (might) need extended permissions to write the test data to s3.
"""

import random
from datetime import date, datetime, timedelta
from typing import Tuple, Union

import duckdb
import polars as pl
import s3fs


def steralize_dates(
    start_date: Union[date, datetime], end_date: Union[date, datetime]
) -> Tuple[datetime, datetime]:
    """
    Configures and validates start and end dates.

    Args:
        start_date: The start date.
        end_date: The end date.

    Returns:
        A tuple containing the start date and the end date, both as datetimes.

    Raises:
        UserWarning: If the start date is after the end date.
    """
    # Ensure the start_date is not after the end_date
    if start_date > end_date:
        raise UserWarning(f"Start date must come before end date: {start_date} > {end_date}")

    # If start_date is a bare date, convert it to a datetime at the start of the day
    if not isinstance(start_date, datetime):
        start_date = datetime.combine(start_date, datetime.min.time())

    # If end_date is a bare date, convert it to a datetime that includes the entire day
    if not isinstance(end_date, datetime):
        end_date = datetime.combine(end_date, datetime.max.time())

    return start_date, end_date


def write_parquet_s3(bucket: str, key: str, data: pl.DataFrame) -> None:
    # Write parquet to s3.
    # NOTE: the actual upload below is left commented out; uncomment to write to s3.
    fs = s3fs.S3FileSystem()
    destination = f"s3://{bucket}/{key}"
    # with fs.open(destination, mode="wb") as f:
    #     data.write_parquet(f)


def build_test_precip_data(
    start_date: date, end_date: date, interval: timedelta, sites: list, schema: pl.Schema
) -> pl.DataFrame:
    """
    Builds test cosmos data.

    For each site, and for each datetime at the specified interval between
    the start and end date, random data is generated. The dataframe is initialised with
    the supplied schema, which is taken from the dataset for which you want to create
    test data.

    Args:
        start_date: The start date.
        end_date: The end date.
        interval: Interval separating the datetimes between the start and end date
        sites: cosmos sites
        schema: required schema

    Returns:
        A dataframe of random test data.
    """
    # Create empty dataframe with the required schema
    test_data = pl.DataFrame(schema=schema)

    # Format dates
    start_date, end_date = steralize_dates(start_date, end_date)

    # Build datetime range series
    datetime_range = pl.datetime_range(start_date, end_date, interval, eager=True).alias("time")

    # Attach each datetime to each site
    array = {"time": [], "SITE_ID": []}

    for site in sites:
        array["time"].append(datetime_range)
        array["SITE_ID"].append(site)

    date_site_data = pl.DataFrame(array).explode("time")

    test_data = pl.concat([test_data, date_site_data], how="diagonal")

    # Number of required rows
    required_rows = test_data.select(pl.len()).item()

    # Fill the remaining columns with random values.
    # Remove the columns already generated.
    schema.pop("time")
    schema.pop("SITE_ID")

    for column, dtype in schema.items():
        if isinstance(dtype, pl.Float64):
            col_values = pl.Series(column, [random.uniform(1, 50) for _ in range(required_rows)])

        if isinstance(dtype, pl.Int64):
            col_values = pl.Series(column, [random.randrange(1, 255, 1) for _ in range(required_rows)])

        test_data.replace_column(test_data.get_column_index(column), col_values)

    return test_data


def export_test_data(bucket: str, data: pl.DataFrame, structure: str = "partitioned_date") -> None:
    """Export the test data.

    Data can be exported to various s3 structures:

    'date': cosmos/dataset_type/YYYY-MM/YYYY-MM-DD.parquet (original format)
    'partitioned_date': cosmos/dataset_type/date=YYYY-MM-DD/data.parquet (current format)
    'partitioned_date_site': cosmos/dataset_type/site=site/date=YYYY-MM-DD/data.parquet (proposed format)

    Args:
        bucket: Name of the s3 bucket
        data: Test data to be exported
        structure: s3 structure. Defaults to partitioned_date (current structure)

    Raises:
        ValueError: If an invalid structure string is provided.
    """
    # Validate user input
    valid_structures = ["date", "partitioned_date", "partitioned_date_site"]
    if structure not in valid_structures:
        raise ValueError(f"Incorrect structure argument entered; should be one of {valid_structures}")

    # Save out in required structure, one file per day
    groups = [(group[0][0], group[1]) for group in data.group_by(pl.col("time").dt.date())]

    for date_obj, df in groups:
        if structure == "date":
            day = date_obj.strftime("%Y-%m-%d")
            month = date_obj.strftime("%Y-%m")
            key = f"cosmos/dataset=PRECIP_1MIN_2024_LOOPED/{month}/{day}.parquet"

            print(df)

            write_parquet_s3(bucket, key, df)

        if structure == "partitioned_date":
            day = date_obj.strftime("%Y-%m-%d")
            key = f"cosmos/dataset=PRECIP_1MIN_2024_LOOPED/date={day}/data.parquet"

            print(df)

            write_parquet_s3(bucket, key, df)

        if structure == "partitioned_date_site":
            site_groups = [(group[0][0], group[1]) for group in df.group_by(pl.col("SITE_ID"))]

            for site, site_df in site_groups:
                day = date_obj.strftime("%Y-%m-%d")
                key = f"cosmos/dataset=PRECIP_1MIN_2024_LOOPED/site={site}/date={day}/data.parquet"

                print(site_df)

                write_parquet_s3(bucket, key, site_df)


if __name__ == "__main__":
    # Setup basic duckdb connection
    conn = duckdb.connect()

    conn.execute("""
        INSTALL httpfs;
        LOAD httpfs;
        SET force_download = true;
        SET enable_profiling = query_tree;
    """)

    # Add s3 connection details
    conn.execute("""
        CREATE SECRET aws_secret (
            TYPE S3,
            PROVIDER CREDENTIAL_CHAIN,
            CHAIN 'sts'
        );
    """)

    # Load a single file to get the list of unique sites and the dataset schema
    bucket = "ukceh-fdri-staging-timeseries-level-0"
    key = "cosmos/dataset=PRECIP_1MIN_2024_LOOPED/date=2024-01-01/*.parquet"

    query = f"""SELECT * FROM read_parquet('s3://{bucket}/{key}', hive_partitioning=false)"""
    df = conn.execute(query).pl()

    sites = set(df.get_column("SITE_ID"))
    schema = df.schema

    # Build test data
    test_data = build_test_precip_data(date(2024, 3, 28), date(2024, 3, 29), timedelta(minutes=1), sites, schema)

    # Export test data based on required s3 structure
    export_test_data(bucket, test_data, "partitioned_date_site")
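
For context, the module docstring frames this data as input for benchmarking duckdb queries across the three bucket layouts. Below is a minimal sketch of the kind of query the proposed site/date-partitioned layout is meant to speed up; it reuses the connection and secret setup from the script's __main__ block, and 'EXAMPLE_SITE' is a placeholder site id rather than a value taken from the source.

import duckdb

conn = duckdb.connect()
conn.execute("""
    INSTALL httpfs;
    LOAD httpfs;
    SET enable_profiling = 'query_tree';
""")
conn.execute("""
    CREATE SECRET aws_secret (
        TYPE S3,
        PROVIDER CREDENTIAL_CHAIN,
        CHAIN 'sts'
    );
""")

bucket = "ukceh-fdri-staging-timeseries-level-0"

# Proposed layout: the site=... and date=... path segments become queryable
# columns when hive_partitioning is enabled, so the WHERE clause can prune
# whole objects before any parquet data is read.
glob = f"s3://{bucket}/cosmos/dataset=PRECIP_1MIN_2024_LOOPED/site=*/date=*/data.parquet"
query = f"""
    SELECT SITE_ID, count(*) AS n_rows
    FROM read_parquet('{glob}', hive_partitioning=true)
    WHERE site = 'EXAMPLE_SITE' AND "date" = '2024-03-28'
    GROUP BY SITE_ID
"""
print(conn.execute(query).pl())

Running the same filter against the 'date' and 'partitioned_date' layouts, where pruning is only possible by date or not at all, is presumably the comparison the benchmark is after.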
