Skip to content

Commit

Permalink
Terminate session when canceling calculation execution
Browse files Browse the repository at this point in the history
  • Loading branch information
laughingman7743 committed Jan 8, 2024
1 parent a90e7b5 commit 772949a
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 3 deletions.
5 changes: 2 additions & 3 deletions pyathena/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,9 +304,8 @@ def __init__(self, response: Dict[str, Any]) -> None:
self._description: Optional[str] = response.get("Description")
self._working_directory: Optional[str] = response.get("WorkingDirectory")

result = response.get("Result")
if not result:
raise DataError("KeyError `Result`")
# If cancelled, the result does not exist.
result = response.get("Result", {})
self._std_out_s3_uri: Optional[str] = result.get("StdOutS3Uri")
self._std_error_s3_uri: Optional[str] = result.get("StdErrorS3Uri")
self._result_s3_uri: Optional[str] = result.get("ResultS3Uri")
Expand Down
4 changes: 4 additions & 0 deletions tests/pyathena/spark/test_async_spark_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,9 @@ def test_cancel(self, async_spark_cursor):
)
time.sleep(randint(5, 10))
async_spark_cursor.cancel(query_id).result()

# TODO: Calculation execution is not canceled unless session is terminated
async_spark_cursor.close()

calculation_execution = future.result()
assert calculation_execution.state == AthenaCalculationExecution.STATE_CANCELED
3 changes: 3 additions & 0 deletions tests/pyathena/spark/test_spark_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ def cancel(c):
time.sleep(randint(5, 10))
c.cancel()

# TODO: Calculation execution is not canceled unless session is terminated
c.close()

with ThreadPoolExecutor(max_workers=1) as executor:
executor.submit(cancel, spark_cursor)

Expand Down

0 comments on commit 772949a

Please sign in to comment.