forked from jasonbot/geojson-madness
-
Notifications
You must be signed in to change notification settings - Fork 4
/
geojson_out.py
159 lines (144 loc) · 6.38 KB
/
geojson_out.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
# coding: utf-8
import json
import os
import sys
from collections import OrderedDict
# Pick the urllib flavour by major version only. Comparing the full
# ``sys.version_info`` tuple against ``(3, 9)`` is False on 3.9.x and on
# every later release (a longer tuple with an equal prefix compares
# greater), which left ``urlopen``/``Request`` undefined there.
if sys.version_info[0] >= 3:
    from urllib.request import urlopen, Request
else:
    from urllib2 import urlopen, Request
import arcpy
__all__ = ['write_geojson_file', 'post_gist', 'write_geojson_gist']
def part_split_at_nones(part_items):
    """Split a sequence of points into runs separated by ``None`` entries.

    Each non-``None`` item must expose ``X`` and ``Y`` attributes; it is
    converted to an ``(x, y)`` tuple rounded to 6 decimal places. Every
    non-empty run of consecutive points is yielded as a list. ``None``
    entries act purely as separators, so leading, trailing, or repeated
    ``None`` values never produce empty lists.
    """
    ring = []
    for vertex in part_items:
        if vertex is not None:
            ring.append((round(vertex.X, 6), round(vertex.Y, 6)))
            continue
        # Separator reached: flush whatever has been collected so far.
        if ring:
            yield ring
            ring = []
    # Flush the final run, which has no trailing separator.
    if ring:
        yield ring
def geometry_to_struct(in_geometry):
    """Build a GeoJSON-style geometry mapping from an arcpy geometry.

    Returns ``None`` when ``in_geometry`` is ``None``. Otherwise returns a
    dict with ``'type'`` and ``'coordinates'`` keys:

    * ``arcpy.PointGeometry`` -> ``Point`` (first part only).
    * ``arcpy.Polyline`` -> ``LineString`` when it has exactly one part,
      else ``MultiLineString``.
    * ``arcpy.Polygon`` -> ``Polygon`` when it has exactly one part, else
      ``MultiPolygon``; each part is split into rings at ``None`` entries.

    All coordinates are rounded to 6 decimal places.

    Raises:
        ValueError: for any other geometry object.
    """
    if in_geometry is None:
        return None

    if isinstance(in_geometry, arcpy.PointGeometry):
        location = in_geometry.getPart(0)
        return {
            'type': "Point",
            'coordinates': (round(location.X, 6), round(location.Y, 6)),
        }

    if isinstance(in_geometry, arcpy.Polyline):
        paths = []
        for part_index in range(in_geometry.partCount):
            vertices = in_geometry.getPart(part_index)
            paths.append([(round(vertex.X, 6), round(vertex.Y, 6))
                          for vertex in vertices])
        if len(paths) == 1:
            return {'type': "LineString", 'coordinates': paths[0]}
        return {'type': "MultiLineString", 'coordinates': paths}

    if isinstance(in_geometry, arcpy.Polygon):
        ring_groups = []
        for part_index in range(in_geometry.partCount):
            rings = part_split_at_nones(in_geometry.getPart(part_index))
            ring_groups.append(list(rings))
        if len(ring_groups) == 1:
            return {'type': "Polygon", 'coordinates': ring_groups[0]}
        return {'type': "MultiPolygon", 'coordinates': ring_groups}

    raise ValueError(in_geometry)
def utf8ify(fn_):
    """Decorate a generator function so the strings it yields are UTF-8 safe.

    For every value produced by ``fn_``:

    * Python 2 ``unicode`` values are encoded to UTF-8 byte strings.
    * Byte strings that are valid UTF-8 are passed through unchanged; byte
      strings that are not have every non-ASCII byte replaced with ``?``,
      because their real encoding is unknown.
    * Everything else is passed through unchanged. On Python 3 this
      includes ``str``, which is already Unicode text.

    Args:
        fn_: a generator function (or any callable returning an iterable).

    Returns:
        A generator function with the same call signature as ``fn_``.
    """
    from functools import wraps

    try:
        text_type = unicode  # noqa: F821 -- only defined on Python 2
    except NameError:
        text_type = None  # Python 3: native str needs no conversion

    @wraps(fn_)
    def fn(*a, **k):
        for output in fn_(*a, **k):
            if text_type is not None and isinstance(output, text_type):
                yield output.encode("utf-8")
            elif isinstance(output, bytes):
                # Only the validation sits inside the try: a handler that
                # also wraps the yield would swallow GeneratorExit when
                # the consumer closes the generator.
                try:
                    output.decode("utf-8")
                except UnicodeDecodeError:
                    # Unknown encoding: fall back to stripping non-ASCII.
                    output = (output.decode("ascii", "replace")
                              .encode("ascii", "replace"))
                yield output
            else:
                yield output
    return fn
@utf8ify
def geojson_lines_for_feature_class(in_feature_class):
    """Yield the lines of a GeoJSON FeatureCollection for a feature class.

    The first line opens the collection, then comes one line per feature
    (comma-terminated except for the last), then a closing line. Rows are
    read through an ``arcpy.da.SearchCursor`` that is given the
    ``'WGS 1984'`` spatial reference.

    Feature properties are keyed by field alias (falling back to the field
    name). The shape column and the derived ``shape_area`` /
    ``shape_length`` / ``geom_area`` / ``geom_length`` columns are left
    out. Line and polygon features additionally receive default
    stroke/fill display properties. Features with a null geometry are
    written with ``"geometry": null`` and no display properties.

    Args:
        in_feature_class: feature class (or layer) understood by arcpy.

    Yields:
        One string per output line, without a trailing newline.
    """
    # Derived measurement columns that are not exported as properties.
    skipped_fields = frozenset(['shape_area', 'shape_length',
                                'geom_area', 'geom_length'])

    def json_default(value):
        # json cannot serialize date/time objects on its own; anything
        # exposing isoformat() is written as its ISO 8601 string.
        if hasattr(value, 'isoformat'):
            return value.isoformat()
        raise TypeError("{!r} is not JSON serializable".format(value))

    shape_field = arcpy.Describe(in_feature_class).shapeFieldName
    spatial_reference = arcpy.SpatialReference('WGS 1984')
    aliased_fields = {
        field.name: (field.aliasName or field.name)
        for field in arcpy.ListFields(in_feature_class)
    }
    record_count = int(arcpy.management.GetCount(in_feature_class)[0])
    arcpy.SetProgressor("step", "Writing records", 0, record_count)
    with arcpy.da.SearchCursor(in_feature_class, ['SHAPE@', '*'],
                               spatial_reference=spatial_reference) as in_cur:
        # Keep each exported column's row index next to its output name.
        # Filtering the names alone and zipping them against row[1:]
        # would shift values onto the wrong keys whenever a skipped column
        # is not the last one. The shape column is matched by field name,
        # not alias, so it is dropped even when its alias differs.
        columns = [
            (col_idx, aliased_fields.get(name, name))
            for col_idx, name in enumerate(in_cur.fields)
            if col_idx > 0  # index 0 is the SHAPE@ geometry object
            and name != shape_field
            and name.lower() not in skipped_fields
        ]
        yield '{"type": "FeatureCollection", "features": ['
        # Hold each feature line back until the next row is known, so the
        # separating comma never depends on GetCount agreeing with the
        # number of rows the cursor actually returns.
        pending_line = None
        for row_idx, row in enumerate(in_cur):
            if row_idx % 100 == 1:
                arcpy.SetProgressorPosition(row_idx)
            geometry_dict = geometry_to_struct(row[0])
            property_dict = OrderedDict(
                (alias, row[col_idx]) for col_idx, alias in columns)
            # geometry_to_struct returns None for a null shape.
            geometry_type = geometry_dict['type'] if geometry_dict else None
            # Add some default display properties
            if geometry_type in ('LineString', 'MultiLineString'):
                property_dict.update(OrderedDict([
                    ('stroke', '#0000ff'),
                    ('stroke-opacity', 0.8),
                    ('stroke-width', 2),
                ]))
            elif geometry_type in ('Polygon', 'MultiPolygon'):
                property_dict.update(OrderedDict([
                    ('fill', '#0000ff'),
                    ('fill-opacity', 0.3),
                    ('stroke', '#0000ff'),
                    ('stroke-opacity', 0.8),
                    ('stroke-width', 1),
                ]))
            row_struct = OrderedDict([
                ("type", "Feature"),
                ("properties", property_dict),
                ("geometry", geometry_dict),
            ])
            if pending_line is not None:
                yield pending_line + ','
            pending_line = ' ' + json.dumps(row_struct, default=json_default)
        if pending_line is not None:
            yield pending_line  # No comma after final feature
    yield ']}'
def get_geojson_string(in_feature_class):
    """Return the GeoJSON document for ``in_feature_class`` as one string.

    The lines produced by ``geojson_lines_for_feature_class`` are
    concatenated directly, with no separator between them.
    """
    geojson_lines = geojson_lines_for_feature_class(in_feature_class)
    return ''.join(geojson_lines)
def write_geojson_file(in_feature_class, out_json_file):
    """Write the GeoJSON for ``in_feature_class`` to ``out_json_file``.

    The file is opened in binary mode and receives one generated line per
    output line, each terminated with ``\\n``.

    Args:
        in_feature_class: feature class (or layer) to export.
        out_json_file: path of the file to create or overwrite.
    """
    arcpy.AddMessage("Writing features from {} to {}".format(in_feature_class,
                                                             out_json_file))
    with open(out_json_file, 'wb') as out_json:
        for line in geojson_lines_for_feature_class(in_feature_class):
            # A binary file only accepts bytes. Encode text lines and use a
            # bytes newline so this works on Python 3 as well as Python 2,
            # where bytes is str and the behaviour is unchanged.
            if not isinstance(line, bytes):
                line = line.encode("utf-8")
            out_json.write(line + b"\n")
def post_gist(in_feature_class, feature_geojson):
    """POST the GeoJSON to the GitHub Gist API and return the gist's URL.

    The gist is public and contains a single file named after the base
    name of ``in_feature_class`` with a ``.json`` suffix.

    Args:
        in_feature_class: feature class path or name; used for the gist
            description and file name.
        feature_geojson: GeoJSON document text to store in the gist.

    Returns:
        The ``"url"`` field of the JSON reply.

    Raises:
        Whatever ``urlopen`` raises for network or HTTP errors.
    """
    filename = os.path.basename(in_feature_class) + ".json"
    gist_payload = {
        'description':
            u"Feature Layer {}".format(in_feature_class),
        'public': True,
        'files': {
            filename: {"content": feature_geojson}
        }
    }
    request_body = json.dumps(gist_payload)
    # urlopen on Python 3 rejects a text POST body; json.dumps output is
    # ASCII by default, so encoding it is lossless on both major versions.
    if not isinstance(request_body, bytes):
        request_body = request_body.encode("utf-8")
    # NOTE(review): the request carries no credentials -- confirm the Gist
    # API still accepts unauthenticated gist creation.
    req = Request("https://api.github.com/gists",
                  request_body,
                  headers={'Content-Type': 'application/json'})
    response = urlopen(req)
    try:
        raw_reply = response.read()
    finally:
        # Release the connection even if reading the reply fails.
        response.close()
    return json.loads(raw_reply.decode("utf-8"))["url"]
def write_geojson_gist(in_feature_class):
    """Export ``in_feature_class`` to GeoJSON and publish it as a gist.

    Progress is reported through ``arcpy.AddMessage`` before building the
    GeoJSON, before posting, and after posting.

    Returns:
        The URL returned by ``post_gist``.
    """
    arcpy.AddMessage("Getting GeoJSON from features")
    feature_geojson = get_geojson_string(in_feature_class)

    arcpy.AddMessage("Posting Gist")
    posted_url = post_gist(in_feature_class, feature_geojson)

    arcpy.AddMessage("Posted Gist to {}".format(posted_url))
    return posted_url