SNOW-1625324: execute_async() + get_results_from_sfqid() has a naive error handler #2026

Closed
Kache opened this issue Aug 14, 2024 · 7 comments
Assignees
Labels
enhancement The issue is a request for improvement or a new feature status-triage_done Initial triage done, will be further handled by the driver team

Comments

@Kache
Contributor

Kache commented Aug 14, 2024

Python version

Python 3.11.4 (main, Jan 10 2024, 15:34:31) [Clang 15.0.0 (clang-1500.1.0.2.5)]

Operating system and processor architecture

macOS-14.6-arm64-arm-64bit

Installed packages

snowflake-connector-python 3.2.0

but issue present in `master` as well

What did you do?

  • Executed an async query in order to log the query ID before waiting for the query to complete
  • Lost query error status handling, e.g. for SQL compilation errors, runtime type-casting errors, etc.
  • Only got naive "something went wrong" exceptions

import logging

import snowflake.connector

logging.basicConfig(level=logging.INFO)

available_options = ['use execute()', 'use execute_async()', 'use workaround']
option = available_options[1]


def query(sql: str, configs: dict):
    with snowflake.connector.connect(**configs) as conn:
        with conn.cursor() as cur:

            # intent: log the query id before results are ready
            yield from execute_while_logging_sfqid(cur, sql)


def execute_while_logging_sfqid(cur: snowflake.connector.cursor.SnowflakeCursor, sql: str):
    if option == 'use execute()':
        # logging.info(f"Executed query: {cur.sfqid}")  # can't do this
        cur.execute(sql)
        logging.info(f"Executed query: {cur.sfqid}")  # unfortunately, after query is complete

        yield from cur

    elif option == 'use execute_async()':
        cur.execute_async(sql)
        logging.info(f"Executing query: {cur.sfqid}")  # good: logs before results are ready

        cur.get_results_from_sfqid(cur.sfqid)

        # unfortunately, wait_until_ready() has a naive error handler
        # that only says "Status of query '{}' is {}, results are unavailable":
        # https://github.com/snowflakedb/snowflake-connector-python/blob/416ff578932cfec00d60fdaac9091df58f294de8/src/snowflake/connector/cursor.py#L1657-L1662
        yield from cur

    elif option == 'use workaround':
        cur.execute_async(sql)
        logging.info(f"Executing query: {cur.sfqid}")  # good: logs before results are ready

        cur.get_results_from_sfqid(cur.sfqid)

        if cur._prefetch_hook:
            try:
                cur._prefetch_hook()
            except snowflake.connector.DatabaseError:  # catch the naive error
                # use better error handler that raises more detailed errors:
                # https://github.com/snowflakedb/snowflake-connector-python/blob/main/src/snowflake/connector/connection.py#L1849-L1872
                cur.connection.get_query_status_throw_if_error(cur.sfqid)
                raise

        yield from cur

    else:
        yield from []

What did you expect to see?

I expected the 'use execute_async()' path above to behave like the 'use workaround' path:

  • the query ID is logged right away, unlike 'use execute()'
  • queries that eventually fail still produce meaningful error messages, as in 'use execute()'

This can be done by replacing the naive query status/error handling in wait_until_ready():

if status != QueryStatus.SUCCESS:
    raise DatabaseError(
        "Status of query '{}' is {}, results are unavailable".format(
            sfqid, status.name
        )
    )

with the better query status/error handling of get_query_status_throw_if_error():

if self.is_an_error(status):
    if sf_qid in self._async_sfqids:
        self._async_sfqids.pop(sf_qid, None)
    message = status_resp.get("message")
    if message is None:
        message = ""
    code = queries[0].get("errorCode", -1)
    sql_state = None
    if "data" in status_resp:
        message += (
            queries[0].get("errorMessage", "") if len(queries) > 0 else ""
        )
        sql_state = status_resp["data"].get("sqlState")
    Error.errorhandler_wrapper(
        self,
        None,
        ProgrammingError,
        {
            "msg": message,
            "errno": int(code),
            "sqlstate": sql_state,
            "sfqid": sf_qid,
        },
    )
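
To make the difference concrete, here is a minimal sketch that runs both checks against the same failed status. Everything in it is a stand-in: `QueryStatus`, `DatabaseError`, `ProgrammingError`, and `StatusResponse` are simplified local definitions rather than the connector's own classes, and `naive_check` / `detailed_check` are hypothetical names for the two code paths quoted above. The error code, SQL state, and message are the ones reported later in this thread.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class QueryStatus(Enum):
    """Stand-in for the connector's enum, reduced to three states."""
    RUNNING = 1
    SUCCESS = 2
    FAILED_WITH_ERROR = 3


class DatabaseError(Exception):
    """Stand-in for snowflake.connector.DatabaseError."""


class ProgrammingError(DatabaseError):
    """Stand-in that keeps the server-side error details as attributes."""

    def __init__(self, msg: str, errno: int, sqlstate: Optional[str], sfqid: str):
        super().__init__(f"{errno:06d} ({sqlstate}): {sfqid}: {msg}")
        self.msg = msg
        self.errno = errno
        self.sqlstate = sqlstate
        self.sfqid = sfqid


@dataclass
class StatusResponse:
    """Hypothetical, flattened view of one status poll."""
    status: QueryStatus
    error_code: int = -1
    error_message: str = ""
    sql_state: Optional[str] = None


def naive_check(sfqid: str, resp: StatusResponse) -> None:
    # Mirrors the wait_until_ready() snippet: only the status name survives.
    if resp.status != QueryStatus.SUCCESS:
        raise DatabaseError(
            "Status of query '{}' is {}, results are unavailable".format(
                sfqid, resp.status.name
            )
        )


def detailed_check(sfqid: str, resp: StatusResponse) -> None:
    # Mirrors the idea behind get_query_status_throw_if_error(): forward the
    # server's error code, message, and SQL state to the caller.
    if resp.status == QueryStatus.FAILED_WITH_ERROR:
        raise ProgrammingError(
            resp.error_message, resp.error_code, resp.sql_state, sfqid
        )


failed = StatusResponse(
    QueryStatus.FAILED_WITH_ERROR,
    error_code=100038,
    error_message="Numeric value 'fail' is not recognized",
    sql_state="22018",
)

for check in (naive_check, detailed_check):
    try:
        check("example-query-id", failed)
    except DatabaseError as exc:
        print(f"{check.__name__}: {exc}")
```

Because the stand-in `ProgrammingError` subclasses `DatabaseError`, callers that already catch `DatabaseError` would keep working in this model while gaining access to `errno` and `sqlstate`.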

Can you set logging to DEBUG and collect the logs?

import logging
import os

for logger_name in ('snowflake.connector',):
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    ch = logging.StreamHandler()
    ch.setLevel(logging.DEBUG)
    ch.setFormatter(logging.Formatter('%(asctime)s - %(threadName)s %(filename)s:%(lineno)d - %(funcName)s() - %(levelname)s - %(message)s'))
    logger.addHandler(ch)

@github-actions github-actions bot changed the title from "execute_async() + get_results_from_sfqid() has a naive error handler" to "SNOW-1625324: execute_async() + get_results_from_sfqid() has a naive error handler" Aug 14, 2024
@sfc-gh-sghosh sfc-gh-sghosh self-assigned this Aug 19, 2024
@sfc-gh-sghosh sfc-gh-sghosh added the status-triage Issue is under initial triage label Aug 19, 2024
@sfc-gh-sghosh

Hello @Kache ,

Thanks for raising the issue. We are investigating and will update you.

Regards,
Sujan

@sfc-gh-sghosh

sfc-gh-sghosh commented Aug 22, 2024

Hello @Kache ,

I just ran the code snippet in asynchronous mode with an incorrect SQL statement, and I get both the query ID and the proper error message. It is not losing the SQL compilation error, runtime error information, etc.

    elif option == 'use execute_async()':
        cur.execute_async(sql)
        logging.info(f"Executing execute_async query: {cur.sfqid}")  # good: logs before results are ready
        cur.get_results_from_sfqid(cur.sfqid)
        yield from cur

sql_query = "copy into sujancsv select * from mycsvtable1"


for result in query(sql_query, configs):
    print(result)

Output:
INFO:root:Executing execute_async query: 01b6862c-080b-17a3-0000-164926f2564a
INFO:snowflake.connector.connection:closed
INFO:snowflake.connector.connection:No async queries seem to be running, deleting session
ProgrammingError                          Traceback (most recent call last)
Cell In[20], line 72
     69 sql_query = "copy into sujancsv select * from mycsvtable1"
     71 # Execute and print results
---> 72 for result in query(sql_query, configs):
     73     print(result)

Cell In[20], line 26, in query(sql, configs)
     22 with snowflake.connector.connect(**configs) as conn:
     23     with conn.cursor() as cur:
     24 
     25         # intent: log the query id before results are ready
---> 26         yield from execute_while_logging_sfqid(cur, sql)

Cell In[20], line 41, in execute_while_logging_sfqid(cur, sql)
     38 cur.execute_async(sql)
     39 logging.info(f"Executing execute_async query: {cur.sfqid}")  # good: logs before results are ready
---> 41 cur.get_results_from_sfqid(cur.sfqid)
     43 # unfortunately, wait_until_ready() has a naive error handler
     44 # that only says "Status of query '{}' is {}, results are unavailable":
     45 # https://github.com/snowflakedb/snowflake-connector-python/blob/416ff578932cfec00d60fdaac9091df58f294de8/src/snowflake/connector/cursor.py#L1657-L1662
     46 yield from cur

File /usr/local/lib/python3.8/site-packages/snowflake/connector/cursor.py:1679, in SnowflakeCursor.get_results_from_sfqid(self, sfqid)
   1676         if "data" in ret and "resultIds" in ret["data"]:
   1677             self._init_multi_statement_results(ret["data"])
-> 1679 self.connection.get_query_status_throw_if_error(
   1680     sfqid
   1681 )  # Trigger an exception if query failed
   1682 klass = self.__class__
   1683 self._inner_cursor = klass(self.connection)

File /usr/local/lib/python3.8/site-packages/snowflake/connector/connection.py:1824, in SnowflakeConnection.get_query_status_throw_if_error(self, sf_qid)
   1820         message += (
   1821             queries[0].get("errorMessage", "") if len(queries) > 0 else ""
   1822         )
   1823         sql_state = status_resp["data"].get("sqlState")
-> 1824     Error.errorhandler_wrapper(
   1825         self,
   1826         None,
   1827         ProgrammingError,
   1828         {
   1829             "msg": message,
   1830             "errno": int(code),
   1831             "sqlstate": sql_state,
   1832             "sfqid": sf_qid,
   1833         },
   1834     )
   1835 return status

File /usr/local/lib/python3.8/site-packages/snowflake/connector/errors.py:290, in Error.errorhandler_wrapper(connection, cursor, error_class, error_value)
    267 @staticmethod
    268 def errorhandler_wrapper(
    269     connection: SnowflakeConnection | None,
   (...)
    272     error_value: dict[str, Any],
    273 ) -> None:
    274     """Error handler wrapper that calls the errorhandler method.
    275 
    276     Args:
   (...)
    287         exception to the first handler in that order.
    288     """
--> 290     handed_over = Error.hand_to_other_handler(
    291         connection,
    292         cursor,
    293         error_class,
    294         error_value,
    295     )
    296     if not handed_over:
    297         raise Error.errorhandler_make_exception(
    298             error_class,
    299             error_value,
    300         )

File /usr/local/lib/python3.8/site-packages/snowflake/connector/errors.py:348, in Error.hand_to_other_handler(connection, cursor, error_class, error_value)
    346     return True
    347 elif connection is not None:
--> 348     connection.errorhandler(connection, cursor, error_class, error_value)
    349     return True
    350 return False

File /usr/local/lib/python3.8/site-packages/snowflake/connector/errors.py:221, in Error.default_errorhandler(connection, cursor, error_class, error_value)
    219 errno = error_value.get("errno")
    220 done_format_msg = error_value.get("done_format_msg")
--> 221 raise error_class(
    222     msg=error_value.get("msg"),
    223     errno=None if errno is None else int(errno),
    224     sqlstate=error_value.get("sqlstate"),
    225     sfqid=error_value.get("sfqid"),
    226     query=error_value.get("query"),
    227     done_format_msg=(
    228         None if done_format_msg is None else bool(done_format_msg)
    229     ),
    230     connection=connection,
    231     cursor=cursor,
    232 )

ProgrammingError: 001003: 1003: SQL compilation error:
syntax error line 1 at position 19 unexpected 'select'.

@Kache
Contributor Author

Kache commented Aug 24, 2024

To reproduce, you can't use a query that "fails too quickly", i.e. one whose failure is immediately available and immediately returned. Here's an example of a query that takes some execution time before failing:

        SELECT *
        FROM (
            SELECT SEQ4()::variant AS col
            FROM TABLE(GENERATOR(ROWCOUNT => 1000000))  -- increase as needed
            UNION
            SELECT 'fail'::variant AS col  -- fails at runtime when execution reaches this row
        ) a
        JOIN (VALUES (1)) b(col) ON a.col::varchar::int = b.col

Perhaps if the following get_query_status(sfqid) were instead get_query_status_throw_if_error(sfqid)?

status = self.connection.get_query_status(sfqid)

Let me put together a complete script.
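
Why the two reports above differ can be sketched with a simplified model. It assumes, based on the traceback and the linked source, that `get_results_from_sfqid()` first calls `get_query_status_throw_if_error()` once and then waits with the non-throwing `get_query_status()`. The real connector defers that wait to a prefetch hook; here it runs inline to keep the example short. `ScriptedConnection`, `DetailedError`, `NaiveError`, `get_results`, and `outcome` are hypothetical names, not connector APIs.

```python
from enum import Enum
from typing import Sequence


class QueryStatus(Enum):
    RUNNING = 1
    SUCCESS = 2
    FAILED_WITH_ERROR = 3


class DetailedError(Exception):
    """Stand-in for the ProgrammingError raised with the server's message."""


class NaiveError(Exception):
    """Stand-in for the generic "results are unavailable" DatabaseError."""


class ScriptedConnection:
    """Hypothetical connection that replays one scripted status per poll."""

    def __init__(self, statuses: Sequence[QueryStatus]):
        self._statuses = list(statuses)
        self._polls = 0

    def get_query_status(self, sfqid: str) -> QueryStatus:
        # Once the script runs out, keep returning the final status.
        index = min(self._polls, len(self._statuses) - 1)
        self._polls += 1
        return self._statuses[index]

    def get_query_status_throw_if_error(self, sfqid: str) -> QueryStatus:
        status = self.get_query_status(sfqid)
        if status is QueryStatus.FAILED_WITH_ERROR:
            raise DetailedError("Numeric value 'fail' is not recognized")
        return status


def get_results(conn: ScriptedConnection, sfqid: str) -> str:
    # Step 1: up-front check, as seen in the traceback above.
    conn.get_query_status_throw_if_error(sfqid)
    # Step 2: wait loop that polls with the non-throwing call.
    while True:
        status = conn.get_query_status(sfqid)
        if status is not QueryStatus.RUNNING:
            break
    if status is not QueryStatus.SUCCESS:
        raise NaiveError(
            f"Status of query '{sfqid}' is {status.name}, results are unavailable"
        )
    return "rows"


def outcome(statuses: Sequence[QueryStatus]) -> str:
    try:
        return get_results(ScriptedConnection(statuses), "example-query-id")
    except (DetailedError, NaiveError) as exc:
        return type(exc).__name__


R, S, F = QueryStatus.RUNNING, QueryStatus.SUCCESS, QueryStatus.FAILED_WITH_ERROR
print(outcome([F]))        # DetailedError: already failed at the first check
print(outcome([R, R, F]))  # NaiveError: fails while the wait loop is polling
print(outcome([R, S]))     # rows
```

In this model, a syntax error is caught by step 1 and surfaces with full details, while a runtime failure that arrives during step 2 only produces the generic message.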

@sfc-gh-sghosh
Copy link

Hello @Kache ,

Thanks for the update; we are working on it and will update you.

Regards,
Sujan

@sfc-gh-sghosh sfc-gh-sghosh added status-triage_done Initial triage done, will be further handled by the driver team enhancement The issue is a request for improvement or a new feature and removed status-triage Issue is under initial triage bug labels Aug 26, 2024
@Kache
Contributor Author

Kache commented Aug 28, 2024

Here is a cleaner repro than in the original post:

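import itertools as it
import logging
import time

import snowflake.connector
from snowflake.connector.cursor import ASYNC_RETRY_PATTERN  # assumed to live in cursor.py, next to wait_until_ready()

# `creds` is assumed to be a dict of connection parameters (account, user, password, ...) defined elsewhere

runtime_fail_sql = """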
    SELECT *
    FROM (
        SELECT SEQ4()::variant AS col
        FROM TABLE(GENERATOR(ROWCOUNT => 1000000))  -- increase as needed
        UNION
        SELECT 'fail'::variant AS col  -- fails at runtime when execution reaches this row
    ) a
    JOIN (VALUES (1)) b(col) ON a.col::varchar::int = b.col
"""


def full_error():
    with snowflake.connector.connect(**creds) as conn:
        with conn.cursor(cursor_class=snowflake.connector.cursor.DictCursor) as cur:
            cur.execute(runtime_fail_sql)
            # snowflake.connector.errors.ProgrammingError: 100038 (22018): 01b6a58e-0308-12f7-76fd-87015479a46b: Numeric value 'fail' is not

            logging.info(cur.fetchall())


def async_naive_error():
    with snowflake.connector.connect(**creds) as conn:
        with conn.cursor(cursor_class=snowflake.connector.cursor.DictCursor) as cur:
            cur.execute_async(runtime_fail_sql)
            assert cur.sfqid
            cur.get_results_from_sfqid(cur.sfqid)

            results = cur.fetchall()
            # snowflake.connector.errors.DatabaseError: Status of query '01b6a58e-0308-12f9-76fd-8701547996af' is FAILED_WITH_ERROR, results are unavailable

            logging.info(results)


def async_full_error():
    retry_pattern = it.chain(ASYNC_RETRY_PATTERN, it.repeat(ASYNC_RETRY_PATTERN[-1]))

    with snowflake.connector.connect(**creds) as conn:
        with conn.cursor(cursor_class=snowflake.connector.cursor.DictCursor) as cur:
            cur.execute_async(runtime_fail_sql)
            assert cur.sfqid

            # custom wait loop, based on wait_until_ready() from cursor.get_results_from_sfqid()
            while True:
                status = cur.connection.get_query_status_throw_if_error(cur.sfqid)
                # snowflake.connector.errors.ProgrammingError: 100038: 100038: Numeric value 'fail' is not recognized

                if not cur.connection.is_still_running(status):
                    break

                time.sleep(0.5 * next(retry_pattern))

            cur.get_results_from_sfqid(cur.sfqid)
            logging.info(cur.fetchall())

In other words, it seems like this line should instead call get_query_status_throw_if_error():

status = self.connection.get_query_status(sfqid)
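
Until that change lands, the custom wait loop from `async_full_error()` can be pulled out into a small helper. This is only a sketch: `wait_for_async_query` and `EXAMPLE_RETRY_PATTERN` are hypothetical names, the backoff values are illustrative, and `conn` is duck-typed so that it only needs the two connection methods already used in the repro (`get_query_status_throw_if_error()` and `is_still_running()`). The `sleep` parameter is injectable so the loop can be exercised without real delays.

```python
import itertools
import time
from typing import Any, Callable, Sequence

# Illustrative backoff steps; the connector's own ASYNC_RETRY_PATTERN may differ.
EXAMPLE_RETRY_PATTERN = (1, 1, 2, 3, 4, 8, 10)


def wait_for_async_query(
    conn: Any,
    sfqid: str,
    retry_pattern: Sequence[float] = EXAMPLE_RETRY_PATTERN,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Poll until the query stops running and return its final status.

    A failed query raises from get_query_status_throw_if_error(), so the
    caller sees the detailed error instead of a generic one.
    """
    # Walk the pattern once, then keep repeating its last (longest) step.
    delays = itertools.chain(retry_pattern, itertools.repeat(retry_pattern[-1]))
    while True:
        status = conn.get_query_status_throw_if_error(sfqid)
        if not conn.is_still_running(status):
            return status
        sleep(base_delay * next(delays))
```

With a real cursor this would presumably be called as `wait_for_async_query(cur.connection, cur.sfqid)` between `cur.execute_async(sql)` and `cur.get_results_from_sfqid(cur.sfqid)`.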

@sfc-gh-aling
Collaborator

hey @Kache , thanks for your feedback.
I have a PR out to improve the error experience for async query: #2035

@sfc-gh-aling
Collaborator

hey @Kache , we have released v3.12.2 with the fix, could you try out the latest version?
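
For code that has to run against both older and newer connector versions, one option is to check the installed version before deciding whether the workaround is still needed. The sketch below assumes only what the comment above states, namely that the fix shipped in v3.12.2. `parse_version`, `has_async_error_fix`, and `installed_connector_has_fix` are hypothetical helper names. Versions are compared as integer tuples because a plain string comparison would rank "3.2.0" above "3.12.2".

```python
import itertools
from importlib import metadata
from typing import Tuple

FIXED_IN = (3, 12, 2)  # release named in the comment above


def parse_version(text: str) -> Tuple[int, ...]:
    """Turn '3.12.2' (or '4.0.0rc1') into a comparable (major, minor, patch)."""
    parts = []
    for piece in text.split(".")[:3]:
        # Keep only the leading digits so suffixes such as 'rc1' are ignored.
        digits = "".join(itertools.takewhile(str.isdigit, piece))
        parts.append(int(digits) if digits else 0)
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def has_async_error_fix(version_text: str) -> bool:
    return parse_version(version_text) >= FIXED_IN


def installed_connector_has_fix() -> bool:
    """Check the installed snowflake-connector-python, if there is one."""
    try:
        return has_async_error_fix(metadata.version("snowflake-connector-python"))
    except metadata.PackageNotFoundError:
        return False


print(has_async_error_fix("3.2.0"))   # False: the version from the original report
print(has_async_error_fix("3.12.2"))  # True
```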
