Skip to content

Commit

Permalink
SNOW-1562604: enhance error handling when polling query result (#2027)
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-aling authored Aug 19, 2024
1 parent 5832190 commit d3efb17
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 3 deletions.
1 change: 1 addition & 0 deletions DESCRIPTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Source code is also available at: https://github.com/snowflakedb/snowflake-conne
- Use `pathlib` instead of `os` for default config file location resolution.
- Removed upper `cryptogaphy` version pin.
- Removed reference to script `snowflake-export-certs` (its backing module was already removed long ago)
- Enhanced retry mechanism for handling transient network failures during query result polling when no server response is received.

- v3.12.0(July 24,2024)
- Set default connection timeout of 10 seconds and socket read timeout of 10 minutes for HTTP calls in file transfer.
Expand Down
22 changes: 20 additions & 2 deletions src/snowflake/connector/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,7 @@ def _get_request(
headers: dict[str, str],
token: str = None,
timeout: int | None = None,
is_fetch_query_status: bool = False,
) -> dict[str, Any]:
if "Content-Encoding" in headers:
del headers["Content-Encoding"]
Expand All @@ -692,6 +693,7 @@ def _get_request(
headers,
timeout=timeout,
token=token,
is_fetch_query_status=is_fetch_query_status,
)
if ret.get("code") == SESSION_EXPIRED_GS_CODE:
try:
Expand All @@ -706,7 +708,12 @@ def _get_request(
)
)
if ret.get("success"):
return self._get_request(url, headers, token=self.token)
return self._get_request(
url,
headers,
token=self.token,
is_fetch_query_status=is_fetch_query_status,
)

return ret

Expand Down Expand Up @@ -779,7 +786,13 @@ def _post_request(
result_url = ret["data"]["getResultUrl"]
logger.debug("ping pong starting...")
ret = self._get_request(
result_url, headers, token=self.token, timeout=timeout
result_url,
headers,
token=self.token,
timeout=timeout,
is_fetch_query_status=bool(
re.match(r"^/queries/.+/result$", result_url)
),
)
logger.debug("ret[code] = %s", ret.get("code", "N/A"))
logger.debug("ping pong done")
Expand Down Expand Up @@ -878,6 +891,7 @@ def _request_exec_wrapper(

full_url = retry_ctx.add_retry_params(full_url)
full_url = SnowflakeRestful.add_request_guid(full_url)
is_fetch_query_status = kwargs.pop("is_fetch_query_status", False)
try:
return_object = self._request_exec(
session=session,
Expand All @@ -890,6 +904,10 @@ def _request_exec_wrapper(
)
if return_object is not None:
return return_object
if is_fetch_query_status:
err_msg = "fetch query status failed and http request returned None, this is usually caused by transient network failures, retrying..."
logger.info(err_msg)
raise RetryRequest(err_msg)
self._handle_unknown_error(method, full_url, headers, data, conn)
return {}
except RetryRequest as e:
Expand Down
64 changes: 63 additions & 1 deletion test/integ/test_network.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,19 @@

from __future__ import annotations

import logging
import unittest.mock
from logging import getLogger

import pytest

import snowflake.connector
from snowflake.connector import errorcode, errors
from snowflake.connector.network import SnowflakeRestful
from snowflake.connector.network import (
QUERY_IN_PROGRESS_ASYNC_CODE,
QUERY_IN_PROGRESS_CODE,
SnowflakeRestful,
)

logger = getLogger(__name__)

Expand Down Expand Up @@ -36,3 +45,56 @@ def test_no_auth(db_parameters):
assert e.errno == errorcode.ER_CONNECTION_IS_CLOSED
finally:
rest.close()


@pytest.mark.skipolddriver
@pytest.mark.parametrize(
"query_return_code", [QUERY_IN_PROGRESS_CODE, QUERY_IN_PROGRESS_ASYNC_CODE]
)
def test_none_object_when_querying_result(db_parameters, caplog, query_return_code):
# this test simulate the case where the response from the server is None
# the following events happen in sequence:
# 1. we send a simple query to the server which is a post request
# 2. we record the query result in a global variable
# 3. we mock return a query in progress code and an url to fetch the query result
# 4. we return None for the fetching query result request for the first time
# 5. for the second time, we return the code for the query result
# 6. in the end, we assert the result, and retry has taken place when result is None by checking logging

original_request_exec = SnowflakeRestful._request_exec
expected_ret = None
get_executed_time = 0

def side_effect_request_exec(self, *args, **kwargs):
nonlocal expected_ret, get_executed_time
# 1. we send a simple query to the server which is a post request
if "queries/v1/query-request" in kwargs["full_url"]:
ret = original_request_exec(self, *args, **kwargs)
expected_ret = ret # 2. we record the query result in a global variable
# 3. we mock return a query in progress code and an url to fetch the query result
return {
"code": query_return_code,
"data": {"getResultUrl": "/queries/123/result"},
}

if "/queries/123/result" in kwargs["full_url"]:
if get_executed_time == 0:
# 4. we return None for the 1st time fetching query result request, this should trigger retry
get_executed_time += 1
return None
else:
# 5. for the second time, we return the code for the query result, this indicates retry success
return expected_ret

with snowflake.connector.connect(
**db_parameters
) as conn, conn.cursor() as cursor, caplog.at_level(logging.INFO):
with unittest.mock.patch.object(
SnowflakeRestful, "_request_exec", new=side_effect_request_exec
):
# 6. in the end, we assert the result, and retry has taken place when result is None by checking logging
assert cursor.execute("select 1").fetchone() == (1,)
assert (
"fetch query status failed and http request returned None, this is usually caused by transient network failures, retrying"
in caplog.text
)

0 comments on commit d3efb17

Please sign in to comment.