-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdownsample.py
85 lines (66 loc) · 3.11 KB
/
downsample.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import argparse
import numpy as np
import rasterio
from rasterio.windows import Window
from rasterio.transform import Affine
from tqdm import tqdm
import enum
class PixelClass(enum.Enum):
FOREST_COVER = "forest_cover"
DEFORESTATION = "deforestation"
def aggregate_raster(raster_file, output_file, window_size, pixel_class: PixelClass):
"""
Aggregate pixel class data into grids in a raster image.
Args:
raster_file (str): Path to the raster image file (.tif).
output_file (str): Path to the output raster image file (.tif) to store the aggregated data.
window_size (int): Size of the window in pixels used for aggregation.
pixel_class (PixelClass): The pixel class to aggregate.
Returns:
None
Raises:
FileNotFoundError: If the raster file does not exist.
"""
mask_factories = {
PixelClass.FOREST_COVER: lambda data: (200 <= data) & (data < 400),
PixelClass.DEFORESTATION: lambda data: ((400 <= data) & (data < 500)) | ((600 <= data) & (data < 700)),
}
generate_mask = mask_factories[pixel_class]
# Open the raster file
with rasterio.open(raster_file) as src:
# Calculate the number of windows in each dimension
n_windows_x = src.width // window_size
n_windows_y = src.height // window_size
aggregated_data = np.zeros((n_windows_y, n_windows_x), dtype='uint32')
# Loop over the windows in the raster
for wx in tqdm(range(n_windows_x), desc=f"Processing file: {raster_file}"):
for wy in range(n_windows_y):
# Read a window from the raster
window = Window(wx * window_size, wy * window_size, window_size, window_size)
data = src.read(1, window=window)
# Generate mask according to pixel class
mask = generate_mask(data)
# Sum up the pixel class
aggregated_data[wy, wx] = np.sum(mask)
# Update the transformation
transform = src.transform * Affine.scale(window_size)
# Write the aggregated data to a new raster file
with rasterio.open(output_file, 'w', driver='GTiff', height=aggregated_data.shape[0],
width=aggregated_data.shape[1], count=1, dtype=aggregated_data.dtype,
crs=src.crs, transform=transform) as dst:
dst.write(aggregated_data, 1)
def main():
parser = argparse.ArgumentParser(description='Aggregate pixel class data into grids.')
parser.add_argument('input_file', type=str, help='Path to the input .tif file.')
parser.add_argument('output_file', type=str, help='Path to the output .tif file.')
parser.add_argument('--grid_size', type=int, default=200, help='Grid size in pixels. Default is 200 (for a 6km grid with 30m pixels).')
parser.add_argument(
'--pixel-class', required=True, type=str, choices=[x.value for x in PixelClass],
help='Pixel class to aggregate.'
)
args = parser.parse_args()
aggregate_raster(
args.input_file, args.output_file, args.grid_size, PixelClass(args.pixel_class)
)
if __name__ == '__main__':
main()