37
37
) # This is to filter out the polyfill warnings when rows failed to get indexed at a resolution, can be commented out to find missing rows
38
38
39
39
40
+ DEFAULT_PARENT_OFFSET = 6
41
+
42
+
43
class ParentResolutionException(Exception):
    """Raised when the requested parent H3 resolution is not strictly
    less than the target indexing resolution."""

    pass
45
+
46
+
47
def _get_parent_res(parent_res: Union[None, int], resolution: int):
    """
    Resolve the parent H3 resolution used for intermediate re-partitioning.

    If *parent_res* is supplied it wins and is coerced to ``int``;
    otherwise a recommended default is derived by stepping
    ``DEFAULT_PARENT_OFFSET`` levels coarser than *resolution*,
    floored at ``MIN_H3``.
    """
    if parent_res is None:
        return max(MIN_H3, resolution - DEFAULT_PARENT_OFFSET)
    return int(parent_res)
60
+
61
+
40
62
def polyfill (
41
- pq_in : Path , spatial_sort_col : str , resolution : int , output_directory : str
63
+ pq_in : Path ,
64
+ spatial_sort_col : str ,
65
+ resolution : int ,
66
+ parent_res : Union [None , int ],
67
+ output_directory : str ,
42
68
) -> None :
43
69
"""
44
70
Reads a geoparquet, performs H3 polyfilling,
@@ -52,6 +78,9 @@ def polyfill(
52
78
)
53
79
df = pd .DataFrame (df ).drop (columns = ["index" , "geometry" ])
54
80
df .index .rename (f"h3_{ resolution :02} " , inplace = True )
81
+ parent_res : int = _get_parent_res (parent_res , resolution )
82
+ # Secondary (parent) H3 index, used later for partitioning
83
+ df = df .h3 .h3_to_parent (parent_res ).reset_index ()
55
84
df .to_parquet (
56
85
PurePath (output_directory , pq_in .name ),
57
86
engine = "auto" ,
@@ -60,14 +89,59 @@ def polyfill(
60
89
return None
61
90
62
91
63
def polyfill_star(args) -> None:
    """Adapter for multiprocessing ``Pool.imap``, which passes a single
    tuple per call: unpack it into :func:`polyfill`'s positional args."""
    return polyfill(*args)
65
94
66
95
96
def _parent_partitioning(
    input_dir: Path,
    output_dir: Path,
    resolution: int,
    parent_res: Union[None, int],
    **kwargs,
) -> Path:
    """
    Repartition a directory of parquet files so that each output partition
    holds exactly the rows belonging to one parent H3 cell, then write the
    result to *output_dir* (one ``<parent-cell>.parquet`` file per parent).

    Parameters
    ----------
    input_dir : Path
        Directory of parquet files carrying an ``h3_{parent_res}`` column.
    output_dir : Path
        Destination directory for the repartitioned parquet dataset.
    resolution : int
        Target H3 resolution; its ``h3_{resolution}`` column becomes the
        final index.
    parent_res : Union[None, int]
        Parent resolution, or None to derive one via ``_get_parent_res``.
    **kwargs
        Optional ``overwrite``, ``engine`` and ``compression`` overrides
        forwarded to ``to_parquet``.

    Returns
    -------
    Path
        *output_dir*, for chaining.
    """
    # Use a fresh name rather than re-binding the parameter, to keep the
    # None-able input distinct from the resolved value.
    effective_parent_res: int = _get_parent_res(parent_res, resolution)
    parent_col = f"h3_{effective_parent_res:02}"

    with TqdmCallback(desc="Reading spatial partitions"):
        # Set index as parent cell
        ddf = dd.read_parquet(input_dir, engine="pyarrow").set_index(parent_col)

    with TqdmCallback(desc="Counting parents"):
        # Distinct parents determine the target number of partitions.
        # sorted() already returns a list; no extra list() needed.
        uniqueh3 = sorted(ddf.index.unique().compute())

    # n unique parents -> n+1 division boundaries -> n partitions.
    LOGGER.debug(
        "Repartitioning into %d partitions, based on parent cells",
        len(uniqueh3),
    )

    with TqdmCallback(desc="Repartitioning"):
        # See "notes" on why divisions expects repetition of the last item
        # https://docs.dask.org/en/stable/generated/dask.dataframe.DataFrame.repartition.html
        # NOTE(review): uniqueh3[-1] assumes at least one row survived
        # polyfilling — confirm empty input is rejected upstream.
        (
            ddf.repartition(divisions=(uniqueh3 + [uniqueh3[-1]]), force=True)
            .reset_index()
            .set_index(f"h3_{resolution:02}")
            .drop(columns=[parent_col])
            .to_parquet(
                output_dir,
                overwrite=kwargs.get("overwrite", False),
                engine=kwargs.get("engine", "pyarrow"),
                write_index=True,
                # Name each partition file after its parent cell.
                name_function=lambda i: f"{uniqueh3[i]}.parquet",
                compression=kwargs.get("compression", "ZSTD"),
            )
        )
    LOGGER.debug("Parent cell repartitioning complete")
    return output_dir
138
+
139
+
67
140
def _index (
68
141
input_file : Union [Path , str ],
69
142
output_directory : Union [Path , str ],
70
143
resolution : int ,
144
+ parent_res : Union [None , int ],
71
145
keep_attributes : bool ,
72
146
npartitions : int ,
73
147
spatial_sorting : str ,
@@ -78,6 +152,7 @@ def _index(
78
152
con : Union [sqlalchemy .engine .Connection , sqlalchemy .engine .Engine ] = None ,
79
153
table : str = None ,
80
154
geom_col : str = "geom" ,
155
+ overwrite : bool = False ,
81
156
) -> Path :
82
157
"""
83
158
Performs multi-threaded H3 polyfilling on (multi)polygons.
@@ -138,7 +213,7 @@ def _index(
138
213
else f"{ spatial_sorting } _distance"
139
214
)
140
215
141
- with tempfile .TemporaryDirectory () as tmpdir :
216
+ with tempfile .TemporaryDirectory (suffix = ".parquet" ) as tmpdir :
142
217
with TqdmCallback ():
143
218
ddf .to_parquet (tmpdir , overwrite = True )
144
219
@@ -149,12 +224,19 @@ def _index(
149
224
"H3 Indexing on spatial partitions by polyfill with H3 resolution: %d" ,
150
225
resolution ,
151
226
)
152
- with Pool (processes = processes ) as pool :
153
- args = [
154
- (filepath , spatial_sort_col , resolution , output_directory )
155
- for filepath in filepaths
156
- ]
157
- list (tqdm (pool .imap (polyfill_star , args ), total = len (args )))
227
+ with tempfile .TemporaryDirectory (suffix = ".parquet" ) as tmpdir2 :
228
+ with Pool (processes = processes ) as pool :
229
+ args = [
230
+ (filepath , spatial_sort_col , resolution , parent_res , tmpdir2 )
231
+ for filepath in filepaths
232
+ ]
233
+ list (tqdm (pool .imap (polyfill_star , args ), total = len (args )))
234
+
235
+ output_directory = _parent_partitioning (
236
+ tmpdir2 , output_directory , resolution , parent_res , overwrite = overwrite
237
+ )
238
+
239
+ return output_directory
158
240
159
241
160
242
@click .command (context_settings = {"show_default" : True })
@@ -169,6 +251,13 @@ def _index(
169
251
help = "H3 resolution to index" ,
170
252
nargs = 1 ,
171
253
)
254
+ @click .option (
255
+ "-pr" ,
256
+ "--parent_res" ,
257
+ required = False ,
258
+ type = click .Choice (list (map (str , range (MIN_H3 , MAX_H3 + 1 )))),
259
+ help = "H3 Parent resolution for the output partition. Defaults to resolution - 6" ,
260
+ )
172
261
@click .option (
173
262
"-id" ,
174
263
"--id_field" ,
@@ -253,6 +342,7 @@ def h3(
253
342
vector_input : Union [str , Path ],
254
343
output_directory : Union [str , Path ],
255
344
resolution : str ,
345
+ parent_res : str ,
256
346
id_field : str ,
257
347
keep_attributes : bool ,
258
348
partitions : int ,
@@ -270,6 +360,12 @@ def h3(
270
360
VECTOR_INPUT is the path to input vector geospatial data.
271
361
OUTPUT_DIRECTORY should be a directory, not a file or database table, as it will instead be the write location for an Apache Parquet data store.
272
362
"""
363
+ if parent_res is not None and not int (parent_res ) < int (resolution ):
364
+ raise ParentResolutionException (
365
+ "Parent resolution ({pr}) must be less than target resolution ({r})" .format (
366
+ pr = parent_res , r = resolution
367
+ )
368
+ )
273
369
con : sqlalchemy .engine .Connection = None
274
370
scheme : str = urlparse (vector_input ).scheme
275
371
if bool (scheme ) and scheme != "file" :
@@ -305,6 +401,7 @@ def h3(
305
401
vector_input ,
306
402
output_directory ,
307
403
int (resolution ),
404
+ parent_res ,
308
405
keep_attributes ,
309
406
partitions ,
310
407
spatial_sorting ,
@@ -315,4 +412,5 @@ def h3(
315
412
con = con ,
316
413
table = table ,
317
414
geom_col = geom_col ,
415
+ overwrite = overwrite ,
318
416
)
0 commit comments