Skip to content

Commit

Permalink
added connection error handling to client; tests for parquet and rast…
Browse files Browse the repository at this point in the history
…er; support for raster sampling
  • Loading branch information
jpswinski committed Dec 19, 2022
1 parent a74d292 commit 838caf8
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 41 deletions.
12 changes: 10 additions & 2 deletions sliderule/icesat2.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ def atl06p(parm, asset=DEFAULT_ASSET, version=DEFAULT_ICESAT2_SDP_VERSION, callb
columns = {}
elevation_records = []
num_elevations = 0
field_dictionary = {} # ['field_name'] = {"extent_id": [], field_name: []}
field_dictionary = {} # [<field_name>] = {"extent_id": [], <field_name>: []}
if len(rsps) > 0:
# Sort Records
for rsp in rsps:
Expand All @@ -806,7 +806,7 @@ def atl06p(parm, asset=DEFAULT_ASSET, version=DEFAULT_ICESAT2_SDP_VERSION, callb
elif 'extrec' == rsp['__rectype']:
field_name = parm['atl03_geo_fields'][rsp['field_index']]
if field_name not in field_dictionary:
field_dictionary[field_name] = {"extent_id": [], field_name: []}
field_dictionary[field_name] = {'extent_id': [], field_name: []}
# Parse Ancillary Data
data = __get_values(rsp['data'], rsp['datatype'], len(rsp['data']))
# Add Left Pair Track Entry
Expand All @@ -815,6 +815,14 @@ def atl06p(parm, asset=DEFAULT_ASSET, version=DEFAULT_ICESAT2_SDP_VERSION, callb
# Add Right Pair Track Entry
field_dictionary[field_name]['extent_id'] += rsp['extent_id'] | 0x3,
field_dictionary[field_name][field_name] += data[RIGHT_PAIR],
elif 'rsrec' == rsp['__rectype']:
for sample in rsp["samples"]:
time_str = sliderule.gps2utc(sample["time"])
field_name = parm['samples'][rsp['raster_index']] + "-" + time_str.split(" ")[0].strip()
if field_name not in field_dictionary:
field_dictionary[field_name] = {'extent_id': [], field_name: []}
field_dictionary[field_name]['extent_id'] += rsp['extent_id'],
field_dictionary[field_name][field_name] += sample['value'],
# Build Elevation Columns
if num_elevations > 0:
# Initialize Columns
Expand Down
85 changes: 47 additions & 38 deletions sliderule/sliderule.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,44 +455,53 @@ def source (api, parm={}, stream=False, callbacks={}, path="/source"):
if c not in callbacks:
callbacks[c] = __callbacks[c]
# Attempt Request
try:
# Construct Request URL and Authorization
if service_org:
url = 'https://%s.%s%s/%s' % (service_org, service_url, path, api)
headers = __build_auth_header()
else:
url = 'http://%s%s/%s' % (service_url, path, api)
# Perform Request
if not stream:
data = session.get(url, data=rqst, headers=headers, timeout=request_timeout)
else:
data = session.post(url, data=rqst, headers=headers, timeout=request_timeout, stream=True)
data.raise_for_status()
# Parse Response
format = data.headers['Content-Type']
if format == 'text/plain':
rsps = __parse_json(data)
elif format == 'application/json':
rsps = __parse_json(data)
elif format == 'application/octet-stream':
rsps = __parse_native(data, callbacks)
else:
raise FatalError('unsupported content type: %s' % (format))
except requests.exceptions.SSLError as e:
raise FatalError("Unable to verify SSL certificate: {}".format(e))
except requests.ConnectionError as e:
raise FatalError("Connection error to endpoint {}".format(url))
except requests.Timeout as e:
raise TransientError("Timed-out waiting for response from endpoint {}".format(url))
except requests.exceptions.ChunkedEncodingError as e:
raise RuntimeError("Unexpected termination of response from endpoint {}".format(url))
except requests.HTTPError as e:
if e.response.status_code == 503:
raise TransientError("Server experiencing heavy load, stalling on request to {}".format(url))
else:
raise FatalError("HTTP error {} from endpoint {}".format(e.response.status_code, url))
except:
raise
complete = False
attempts = 3
while not complete and attempts > 0:
attempts -= 1
try:
# Construct Request URL and Authorization
if service_org:
url = 'https://%s.%s%s/%s' % (service_org, service_url, path, api)
headers = __build_auth_header()
else:
url = 'http://%s%s/%s' % (service_url, path, api)
# Perform Request
if not stream:
data = session.get(url, data=rqst, headers=headers, timeout=request_timeout)
else:
data = session.post(url, data=rqst, headers=headers, timeout=request_timeout, stream=True)
data.raise_for_status()
# Parse Response
format = data.headers['Content-Type']
if format == 'text/plain':
rsps = __parse_json(data)
elif format == 'application/json':
rsps = __parse_json(data)
elif format == 'application/octet-stream':
rsps = __parse_native(data, callbacks)
else:
raise FatalError('unsupported content type: %s' % (format))
# Success
complete = True
except requests.exceptions.SSLError as e:
logger.error("Unable to verify SSL certificate: {} ...retrying request".format(e))
except requests.ConnectionError as e:
logger.error("Connection error to endpoint {} ...retrying request".format(url))
except requests.Timeout as e:
logger.error("Timed-out waiting for response from endpoint {} ...retrying request".format(url))
except requests.exceptions.ChunkedEncodingError as e:
logger.error("Unexpected termination of response from endpoint {} ...retrying request".format(url))
except requests.HTTPError as e:
if e.response.status_code == 503:
raise TransientError("Server experiencing heavy load, stalling on request to {}".format(url))
else:
raise FatalError("HTTP error {} from endpoint {}".format(e.response.status_code, url))
except:
raise
# Check Success
if not complete:
raise FatalError("Unable to complete request due to errors")
# Return Response
return rsps

Expand Down
7 changes: 7 additions & 0 deletions tests/test_arcticdem.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,10 @@ def test_sampler(self, domain, asset, organization):
"maxi": 1,
"samples": ["arcticdem-mosaic"] }
gdf = icesat2.atl06p(parms, asset=asset, resources=[resource])
assert len(gdf) == 964
assert len(gdf.keys()) == 17
assert gdf["rgt"][0] == 1160
assert gdf["cycle"][0] == 2
assert gdf['segment_id'].describe()["min"] == 405240
assert gdf['segment_id'].describe()["max"] == 405915
assert abs(gdf["arcticdem-mosaic-1980-01-06"].describe()["min"] - 655.14990234375) < 0.0001
16 changes: 16 additions & 0 deletions tests/test_init.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
"""Tests for sliderule-python connection errors when requests get sent back to back."""

import pytest
import sliderule
from sliderule import icesat2

@pytest.mark.network
class TestInit:
def test_loop_init(self, domain, organization):
for _ in range(10):
icesat2.init(domain, organization=organization)

def test_loop_versions(self, domain, organization):
icesat2.init(domain, organization=organization)
for _ in range(10):
sliderule.source("version", {})
2 changes: 1 addition & 1 deletion tests/test_parquet.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Tests for sliderule-python arcticdem raster support."""
"""Tests for sliderule-python parquet support."""

import pytest
from pathlib import Path
Expand Down

0 comments on commit 838caf8

Please sign in to comment.