Skip to content
This repository was archived by the owner on Jun 2, 2025. It is now read-only.

Commit ee1396f

Browse files
authored
Merge pull request #320 from openclimatefix/pvnet_concurrent_datapipe
PVNet concurrent datapipe
2 parents 8e61130 + e79560b commit ee1396f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+1511
-661
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,7 @@
11
"""Conversion from Xarray to NumpyBatch"""
2+
from .gsp import convert_gsp_to_numpy_batch
3+
from .nwp import convert_nwp_to_numpy_batch
4+
from .pv import convert_pv_to_numpy_batch
5+
from .satellite import convert_satellite_to_numpy_batch
6+
from .sensor import convert_sensor_to_numpy_batch
7+
from .wind import convert_wind_to_numpy_batch

ocf_datapipes/convert/numpy_batch/gsp.py

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,31 @@
99
logger = logging.getLogger(__name__)
1010

1111

12+
def convert_gsp_to_numpy_batch(xr_data):
13+
"""Convert from Xarray to NumpyBatch"""
14+
15+
example: NumpyBatch = {
16+
BatchKey.gsp: xr_data.values,
17+
BatchKey.gsp_t0_idx: xr_data.attrs["t0_idx"],
18+
BatchKey.gsp_id: xr_data.gsp_id.values,
19+
BatchKey.gsp_nominal_capacity_mwp: xr_data.isel(time_utc=0)["nominal_capacity_mwp"].values,
20+
BatchKey.gsp_effective_capacity_mwp: (
21+
xr_data.isel(time_utc=0)["effective_capacity_mwp"].values
22+
),
23+
BatchKey.gsp_time_utc: datetime64_to_float(xr_data["time_utc"].values),
24+
}
25+
26+
# Coordinates
27+
for batch_key, dataset_key in (
28+
(BatchKey.gsp_y_osgb, "y_osgb"),
29+
(BatchKey.gsp_x_osgb, "x_osgb"),
30+
):
31+
if dataset_key in xr_data.coords.keys():
32+
example[batch_key] = xr_data[dataset_key].values
33+
34+
return example
35+
36+
1237
@functional_datapipe("convert_gsp_to_numpy_batch")
1338
class ConvertGSPToNumpyBatchIterDataPipe(IterDataPipe):
1439
"""Convert GSP Xarray to NumpyBatch"""
@@ -25,29 +50,5 @@ def __init__(self, source_datapipe: IterDataPipe):
2550

2651
def __iter__(self) -> NumpyBatch:
2752
"""Convert from Xarray to NumpyBatch"""
28-
logger.debug("Converting GSP to numpy to batch")
2953
for xr_data in self.source_datapipe:
30-
example: NumpyBatch = {
31-
BatchKey.gsp: xr_data.values,
32-
BatchKey.gsp_t0_idx: xr_data.attrs["t0_idx"],
33-
BatchKey.gsp_id: xr_data.gsp_id.values,
34-
BatchKey.gsp_nominal_capacity_mwp: xr_data.isel(time_utc=0)[
35-
"nominal_capacity_mwp"
36-
].values,
37-
BatchKey.gsp_effective_capacity_mwp: (
38-
xr_data.isel(time_utc=0)["effective_capacity_mwp"].values
39-
),
40-
BatchKey.gsp_time_utc: datetime64_to_float(xr_data["time_utc"].values),
41-
}
42-
43-
# Coordinates
44-
for batch_key, dataset_key in (
45-
(BatchKey.gsp_y_osgb, "y_osgb"),
46-
(BatchKey.gsp_x_osgb, "x_osgb"),
47-
):
48-
if dataset_key in xr_data.coords.keys():
49-
values = xr_data[dataset_key].values
50-
# Expand dims so AddFourierSpaceTime works!
51-
example[batch_key] = values # np.expand_dims(values, axis=1)
52-
53-
yield example
54+
yield convert_gsp_to_numpy_batch(xr_data)

ocf_datapipes/convert/numpy_batch/nwp.py

Lines changed: 26 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,31 @@
66
from ocf_datapipes.utils.utils import datetime64_to_float
77

88

9+
def convert_nwp_to_numpy_batch(xr_data):
10+
"""Convert from Xarray to NWPNumpyBatch"""
11+
12+
example: NWPNumpyBatch = {
13+
NWPBatchKey.nwp: xr_data.values,
14+
NWPBatchKey.nwp_t0_idx: xr_data.attrs["t0_idx"],
15+
NWPBatchKey.nwp_channel_names: xr_data.channel.values,
16+
NWPBatchKey.nwp_init_time_utc: datetime64_to_float(xr_data.init_time_utc.values),
17+
NWPBatchKey.nwp_step: (xr_data.step.values / np.timedelta64(1, "h")).astype(np.int64),
18+
}
19+
20+
if "target_time_utc" in xr_data.coords:
21+
target_time = xr_data.target_time_utc.values
22+
example[NWPBatchKey.nwp_target_time_utc] = datetime64_to_float(target_time)
23+
24+
for batch_key, dataset_key in (
25+
(NWPBatchKey.nwp_y_osgb, "y_osgb"),
26+
(NWPBatchKey.nwp_x_osgb, "x_osgb"),
27+
):
28+
if dataset_key in xr_data.coords:
29+
example[batch_key] = xr_data[dataset_key].values
30+
31+
return example
32+
33+
934
@functional_datapipe("convert_nwp_to_numpy_batch")
1035
class ConvertNWPToNumpyBatchIterDataPipe(IterDataPipe):
1136
"""Convert NWP Xarray objects to NWPNumpyBatch"""
@@ -23,26 +48,4 @@ def __init__(self, source_datapipe: IterDataPipe):
2348
def __iter__(self) -> NWPNumpyBatch:
2449
"""Convert from Xarray to NWPNumpyBatch"""
2550
for xr_data in self.source_datapipe:
26-
example: NWPNumpyBatch = {
27-
NWPBatchKey.nwp: xr_data.values,
28-
NWPBatchKey.nwp_t0_idx: xr_data.attrs["t0_idx"],
29-
}
30-
if "target_time_utc" in xr_data.coords:
31-
target_time = xr_data.target_time_utc.values
32-
example[NWPBatchKey.nwp_target_time_utc] = datetime64_to_float(target_time)
33-
example[NWPBatchKey.nwp_channel_names] = xr_data.channel.values
34-
example[NWPBatchKey.nwp_step] = (xr_data.step.values / np.timedelta64(1, "h")).astype(
35-
np.int64
36-
)
37-
example[NWPBatchKey.nwp_init_time_utc] = datetime64_to_float(
38-
xr_data.init_time_utc.values
39-
)
40-
41-
for batch_key, dataset_key in (
42-
(NWPBatchKey.nwp_y_osgb, "y_osgb"),
43-
(NWPBatchKey.nwp_x_osgb, "x_osgb"),
44-
):
45-
if dataset_key in xr_data.coords.keys():
46-
example[batch_key] = xr_data[dataset_key].values
47-
48-
yield example
51+
yield convert_nwp_to_numpy_batch(xr_data)

ocf_datapipes/convert/numpy_batch/pv.py

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,23 @@
1010
logger = logging.getLogger(__name__)
1111

1212

13+
def convert_pv_to_numpy_batch(xr_data):
14+
"""Convert PV Xarray to NumpyBatch"""
15+
example: NumpyBatch = {
16+
BatchKey.pv: xr_data.values,
17+
BatchKey.pv_t0_idx: xr_data.attrs["t0_idx"],
18+
BatchKey.pv_ml_id: xr_data["ml_id"].values,
19+
BatchKey.pv_id: xr_data["pv_system_id"].values.astype(np.float32),
20+
BatchKey.pv_observed_capacity_wp: (xr_data["observed_capacity_wp"].values),
21+
BatchKey.pv_nominal_capacity_wp: (xr_data["nominal_capacity_wp"].values),
22+
BatchKey.pv_time_utc: datetime64_to_float(xr_data["time_utc"].values),
23+
BatchKey.pv_latitude: xr_data["latitude"].values,
24+
BatchKey.pv_longitude: xr_data["longitude"].values,
25+
}
26+
27+
return example
28+
29+
1330
@functional_datapipe("convert_pv_to_numpy_batch")
1431
class ConvertPVToNumpyBatchIterDataPipe(IterDataPipe):
1532
"""Convert PV Xarray to NumpyBatch"""
@@ -25,20 +42,6 @@ def __init__(self, source_datapipe: IterDataPipe):
2542
self.source_datapipe = source_datapipe
2643

2744
def __iter__(self) -> NumpyBatch:
28-
"""Iterate and convert PV Xarray to NumpyBatch"""
45+
"""Convert PV Xarray to NumpyBatch"""
2946
for xr_data in self.source_datapipe:
30-
logger.debug("Converting PV xarray to numpy example")
31-
32-
example: NumpyBatch = {
33-
BatchKey.pv: xr_data.values,
34-
BatchKey.pv_t0_idx: xr_data.attrs["t0_idx"],
35-
BatchKey.pv_ml_id: xr_data["ml_id"].values,
36-
BatchKey.pv_id: xr_data["pv_system_id"].values.astype(np.float32),
37-
BatchKey.pv_observed_capacity_wp: (xr_data["observed_capacity_wp"].values),
38-
BatchKey.pv_nominal_capacity_wp: (xr_data["nominal_capacity_wp"].values),
39-
BatchKey.pv_time_utc: datetime64_to_float(xr_data["time_utc"].values),
40-
BatchKey.pv_latitude: xr_data["latitude"].values,
41-
BatchKey.pv_longitude: xr_data["longitude"].values,
42-
}
43-
44-
yield example
47+
yield convert_pv_to_numpy_batch(xr_data)

ocf_datapipes/convert/numpy_batch/satellite.py

Lines changed: 44 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,49 @@
55
from ocf_datapipes.utils.utils import datetime64_to_float
66

77

8+
def _convert_satellite_to_numpy_batch(xr_data):
9+
example: NumpyBatch = {
10+
BatchKey.satellite_actual: xr_data.values,
11+
BatchKey.satellite_t0_idx: xr_data.attrs["t0_idx"],
12+
BatchKey.satellite_time_utc: datetime64_to_float(xr_data["time_utc"].values),
13+
}
14+
15+
for batch_key, dataset_key in (
16+
(BatchKey.satellite_y_geostationary, "y_geostationary"),
17+
(BatchKey.satellite_x_geostationary, "x_geostationary"),
18+
):
19+
# Satellite coords are already float32.
20+
example[batch_key] = xr_data[dataset_key].values
21+
22+
return example
23+
24+
25+
def _convert_hrvsatellite_to_numpy_batch(xr_data):
26+
example: NumpyBatch = {
27+
BatchKey.hrvsatellite_actual: xr_data.values,
28+
BatchKey.hrvsatellite_t0_idx: xr_data.attrs["t0_idx"],
29+
BatchKey.hrvsatellite_time_utc: datetime64_to_float(xr_data["time_utc"].values),
30+
}
31+
32+
for batch_key, dataset_key in (
33+
(BatchKey.hrvsatellite_y_geostationary, "y_geostationary"),
34+
(BatchKey.hrvsatellite_x_geostationary, "x_geostationary"),
35+
):
36+
# HRVSatellite coords are already float32.
37+
example[batch_key] = xr_data[dataset_key].values
38+
39+
return example
40+
41+
42+
def convert_satellite_to_numpy_batch(xr_data, is_hrv=False):
43+
"""Converts Xarray Satellite to NumpyBatch object"""
44+
if is_hrv:
45+
example = _convert_hrvsatellite_to_numpy_batch(xr_data)
46+
else:
47+
example = _convert_satellite_to_numpy_batch(xr_data)
48+
return example
49+
50+
851
@functional_datapipe("convert_satellite_to_numpy_batch")
952
class ConvertSatelliteToNumpyBatchIterDataPipe(IterDataPipe):
1053
"""Converts Xarray Satellite to NumpyBatch object"""
@@ -24,31 +67,4 @@ def __init__(self, source_datapipe: IterDataPipe, is_hrv: bool = False):
2467
def __iter__(self) -> NumpyBatch:
2568
"""Convert each example to a NumpyBatch object"""
2669
for xr_data in self.source_datapipe:
27-
if self.is_hrv:
28-
example: NumpyBatch = {
29-
BatchKey.hrvsatellite_actual: xr_data.values,
30-
BatchKey.hrvsatellite_t0_idx: xr_data.attrs["t0_idx"],
31-
BatchKey.hrvsatellite_time_utc: datetime64_to_float(xr_data["time_utc"].values),
32-
}
33-
34-
for batch_key, dataset_key in (
35-
(BatchKey.hrvsatellite_y_geostationary, "y_geostationary"),
36-
(BatchKey.hrvsatellite_x_geostationary, "x_geostationary"),
37-
):
38-
# HRVSatellite coords are already float32.
39-
example[batch_key] = xr_data[dataset_key].values
40-
else:
41-
example: NumpyBatch = {
42-
BatchKey.satellite_actual: xr_data.values,
43-
BatchKey.satellite_t0_idx: xr_data.attrs["t0_idx"],
44-
BatchKey.satellite_time_utc: datetime64_to_float(xr_data["time_utc"].values),
45-
}
46-
47-
for batch_key, dataset_key in (
48-
(BatchKey.satellite_y_geostationary, "y_geostationary"),
49-
(BatchKey.satellite_x_geostationary, "x_geostationary"),
50-
):
51-
# HRVSatellite coords are already float32.
52-
example[batch_key] = xr_data[dataset_key].values
53-
54-
yield example
70+
yield convert_satellite_to_numpy_batch(xr_data, self.is_hrv)

ocf_datapipes/convert/numpy_batch/sensor.py

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,34 +10,37 @@
1010
logger = logging.getLogger(__name__)
1111

1212

13+
def convert_sensor_to_numpy_batch(xr_data):
14+
"""Convert Sensor Xarray to NumpyBatch"""
15+
16+
example: NumpyBatch = {
17+
BatchKey.sensor: xr_data.values,
18+
BatchKey.sensor_t0_idx: xr_data.attrs["t0_idx"],
19+
BatchKey.sensor_id: xr_data["station_id"].values.astype(np.float32),
20+
# BatchKey.sensor_observed_capacity_wp: (xr_data["observed_capacity_wp"].values),
21+
# BatchKey.sensor_nominal_capacity_wp: (xr_data["nominal_capacity_wp"].values),
22+
BatchKey.sensor_time_utc: datetime64_to_float(xr_data["time_utc"].values),
23+
BatchKey.sensor_latitude: xr_data["latitude"].values,
24+
BatchKey.sensor_longitude: xr_data["longitude"].values,
25+
}
26+
return example
27+
28+
1329
@functional_datapipe("convert_sensor_to_numpy_batch")
1430
class ConvertSensorToNumpyBatchIterDataPipe(IterDataPipe):
1531
"""Convert Sensor Xarray to NumpyBatch"""
1632

1733
def __init__(self, source_datapipe: IterDataPipe):
1834
"""
19-
Convert PV Xarray objects to NumpyBatch objects
35+
Convert sensor Xarray objects to NumpyBatch objects
2036
2137
Args:
22-
source_datapipe: Datapipe emitting PV Xarray objects
38+
source_datapipe: Datapipe emitting sensor Xarray objects
2339
"""
2440
super().__init__()
2541
self.source_datapipe = source_datapipe
2642

2743
def __iter__(self) -> NumpyBatch:
28-
"""Iterate and convert PV Xarray to NumpyBatch"""
44+
"""Iterate and convert sensor Xarray to NumpyBatch"""
2945
for xr_data in self.source_datapipe:
30-
logger.debug("Converting Sensor xarray to numpy example")
31-
32-
example: NumpyBatch = {
33-
BatchKey.sensor: xr_data.values,
34-
BatchKey.sensor_t0_idx: xr_data.attrs["t0_idx"],
35-
BatchKey.sensor_id: xr_data["station_id"].values.astype(np.float32),
36-
# BatchKey.sensor_observed_capacity_wp: (xr_data["observed_capacity_wp"].values),
37-
# BatchKey.sensor_nominal_capacity_wp: (xr_data["nominal_capacity_wp"].values),
38-
BatchKey.sensor_time_utc: datetime64_to_float(xr_data["time_utc"].values),
39-
BatchKey.sensor_latitude: xr_data["latitude"].values,
40-
BatchKey.sensor_longitude: xr_data["longitude"].values,
41-
}
42-
43-
yield example
46+
yield convert_sensor_to_numpy_batch(xr_data)

ocf_datapipes/convert/numpy_batch/wind.py

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,24 @@
1010
logger = logging.getLogger(__name__)
1111

1212

13+
def convert_wind_to_numpy_batch(xr_data):
14+
"""Convert Wind Xarray to NumpyBatch"""
15+
16+
example: NumpyBatch = {
17+
BatchKey.wind: xr_data.values,
18+
BatchKey.wind_t0_idx: xr_data.attrs["t0_idx"],
19+
BatchKey.wind_ml_id: xr_data["ml_id"].values,
20+
BatchKey.wind_id: xr_data["wind_system_id"].values.astype(np.float32),
21+
BatchKey.wind_observed_capacity_mwp: (xr_data["observed_capacity_mwp"].values),
22+
BatchKey.wind_nominal_capacity_mwp: (xr_data["nominal_capacity_mwp"].values),
23+
BatchKey.wind_time_utc: datetime64_to_float(xr_data["time_utc"].values),
24+
BatchKey.wind_latitude: xr_data["latitude"].values,
25+
BatchKey.wind_longitude: xr_data["longitude"].values,
26+
}
27+
28+
return example
29+
30+
1331
@functional_datapipe("convert_wind_to_numpy_batch")
1432
class ConvertWindToNumpyBatchIterDataPipe(IterDataPipe):
1533
"""Convert Wind Xarray to NumpyBatch"""
@@ -27,18 +45,4 @@ def __init__(self, source_datapipe: IterDataPipe):
2745
def __iter__(self) -> NumpyBatch:
2846
"""Iterate and convert Wind Xarray to NumpyBatch"""
2947
for xr_data in self.source_datapipe:
30-
logger.debug("Converting Wind xarray to numpy example")
31-
32-
example: NumpyBatch = {
33-
BatchKey.wind: xr_data.values,
34-
BatchKey.wind_t0_idx: xr_data.attrs["t0_idx"],
35-
BatchKey.wind_ml_id: xr_data["ml_id"].values,
36-
BatchKey.wind_id: xr_data["wind_system_id"].values.astype(np.float32),
37-
BatchKey.wind_observed_capacity_mwp: (xr_data["observed_capacity_mwp"].values),
38-
BatchKey.wind_nominal_capacity_mwp: (xr_data["nominal_capacity_mwp"].values),
39-
BatchKey.wind_time_utc: datetime64_to_float(xr_data["time_utc"].values),
40-
BatchKey.wind_latitude: xr_data["latitude"].values,
41-
BatchKey.wind_longitude: xr_data["longitude"].values,
42-
}
43-
44-
yield example
48+
yield convert_wind_to_numpy_batch(xr_data)

0 commit comments

Comments
 (0)