SNOW-869536: Iterators from to_local_iterator stop returning results after another query occurs #945

Closed
orrdermer1 opened this issue Jul 16, 2023 · 2 comments · Fixed by #1226
Labels: bug (Something isn't working)

orrdermer1 commented Jul 16, 2023

Please answer these questions before submitting your issue. Thanks!

  1. What version of Python are you using?

Python 3.10.8 (main, Oct 13 2022, 09:48:40) [Clang 14.0.0 (clang-1400.0.29.102)]

  2. What operating system and processor architecture are you using?

macOS-13.3.1-arm64-arm-64bit

  3. What are the component versions in the environment (pip freeze)?

... (Snowpark 1.5.1)

  4. What did you do?

We've been iterating over a DataFrame with to_local_iterator() while also using the table's schema (df.schema) to parse each row.

# Note: df.schema must not be accessed before this point, or the bug does not reproduce
df = session.table("some_table")
my_iter = df.to_local_iterator()
counter = 0
for row in my_iter:
    len(df.schema.fields)  # Stand-in for our parsing logic, which used df.schema; enough to reproduce the bug
    counter += 1
print(counter)  # Prints 1: only the first row was iterated

  5. What did you expect to see?

We expected to iterate over all the rows, but only the first one was returned.
Accessing df.schema probably made the Snowflake Python connector execute another query on the same cursor, so the cursor no longer pointed at our query's results and the iterator stopped early.
If so, this probably means that, in general, no other query can run while iterating.
There is an easy workaround using AsyncJob, which fetches the results by our specific query ID and therefore stays stable even while other queries are running:

my_iter = df.to_local_iterator(block=False).result()
counter = 0
for row in my_iter:
    len(df.schema.fields)
    counter += 1
print(counter)  # Prints the length of the table

  6. Can you set logging to DEBUG and collect the logs?

Hard to do with our current environment :(

orrdermer1 added the "bug" (Something isn't working) and "needs triage" (Initial RCA is required) labels on Jul 16, 2023.
github-actions bot changed the title from "Iterators from to_local_iterator stop returning results after another query occurs" to "SNOW-869536: Iterators from to_local_iterator stop returning results after another query occurs" on Jul 16, 2023.

orrdermer1 (Author) commented

It turns out the workaround I suggested isn't good either: to_local_iterator(block=False).result() actually invokes fetchall, so all the data is loaded into the process at once, which defeats the purpose of an iterator.
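
The difference between fetchall and incremental fetching can be sketched with any DB-API cursor. The example below is only an illustration using the standard-library sqlite3 module, not the Snowflake connector; `iter_rows`, `make_cursor` and the table `t` are hypothetical names introduced for this sketch.

```python
import sqlite3


def make_cursor(n_rows=5):
    """Build an in-memory table with n_rows rows and return a cursor over it."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t (x INTEGER)")
    conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(n_rows)])
    return conn.execute("SELECT x FROM t ORDER BY x")


def iter_rows(cursor, batch_size=2):
    """Yield rows lazily, pulling at most batch_size rows from the cursor at a time."""
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            return
        yield from batch


if __name__ == "__main__":
    # fetchall materializes every row in memory at once.
    print(len(make_cursor(5).fetchall()))

    # The generator only fetches a batch when the consumer asks for more rows.
    rows = iter_rows(make_cursor(5))
    print(next(rows))
```

With a large table, the first approach holds the whole result in memory, while the second holds at most one batch.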

I wound up writing a function to work around it. For some reason _cursor.description didn't return the right metadata either, so the function calls describe() instead. This should work:

from snowflake.snowpark._internal.utils import result_set_to_iter
from snowflake.snowpark.dataframe import DataFrame
from snowflake.snowpark.async_job import AsyncJob


def get_iterator_from_df(df: DataFrame, case_sensitive=True):
    """
    Workaround for a Snowpark bug: returns a row iterator backed by its own cursor,
    which allows iterating over multiple DataFrames simultaneously.
    """
    # Async jobs create a new cursor, which avoids the shared-cursor problem
    async_job: AsyncJob = df.to_local_iterator(block=False)  # type: ignore

    # Not using "async_job.result()" because it uses fetchall, effectively collecting everything
    result_meta = async_job._cursor.describe(async_job._query)
    assert result_meta is not None, "Failed to get result metadata"
    # Point the cursor at the results of our specific query ID
    async_job._cursor.get_results_from_sfqid(async_job.query_id)
    
    return result_set_to_iter(
        iter(async_job._cursor),
        result_meta,
        case_sensitive=case_sensitive,
    )

The following code shows that it works; profiling shows that fetchone was called only 4 times rather than once per result row.

import cProfile

df_1 = session.table("<>").limit(500000)
df_2 = session.table("<>").limit(400000)

with cProfile.Profile() as pr:
    iterator_1 = get_iterator_from_df(df_1)
    iterator_2 = get_iterator_from_df(df_2, case_sensitive=False)
    print(f"First iterator: {next(iterator_1)}")
    print(f"Second iterator: {next(iterator_2)}")
    print()
    print(f"First iterator: {next(iterator_1)}")
    print(f"Second iterator: {next(iterator_2)}")
    pr.print_stats()
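
Rather than reading the call count off the printed profile, it can also be extracted programmatically with pstats. The sketch below is a self-contained illustration: `fetch_one` is a hypothetical pure-Python stand-in for the cursor's fetchone, and `count_calls` and `profile_partial_iteration` are likewise names invented for this example.

```python
import cProfile
import pstats


def fetch_one(rows):
    """Hypothetical stand-in for a cursor's fetchone(): pull a single row."""
    return next(rows)


def count_calls(profile, func_name):
    """Return how many times functions named func_name were called under the profile."""
    stats = pstats.Stats(profile)
    total = 0
    # Keys are (filename, line, function name); values start with (primitive calls, total calls, ...).
    for (_file, _line, name), (_cc, ncalls, _tt, _ct, _callers) in stats.stats.items():
        if name == func_name:
            total += ncalls
    return total


def profile_partial_iteration(total_rows=1000, rows_to_read=4):
    """Read only a few rows from a large source and report how often fetch_one ran."""
    rows = iter(range(total_rows))
    with cProfile.Profile() as pr:
        for _ in range(rows_to_read):
            fetch_one(rows)
    return count_calls(pr, "fetch_one")


if __name__ == "__main__":
    print(profile_partial_iteration())
```

If iteration is truly lazy, the reported count matches the number of rows consumed, not the size of the source.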

sfc-gh-stan (Collaborator) commented

I can repro. This happens because the same cursor is reused for both queries:

results_cursor = self._cursor.execute(query, params=params, **kwargs)

Assigned to myself for fixing.
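
The effect of reusing one cursor for two queries can be reproduced outside Snowpark. The following is a minimal sketch using the standard-library sqlite3 module as an analogy, not the Snowflake connector's actual code path; the table `t` and all function names are hypothetical.

```python
import sqlite3


def make_connection(n_rows=5):
    """Create an in-memory database with a table t holding n_rows rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t (x INTEGER)")
    conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(n_rows)])
    conn.commit()
    return conn


def count_with_shared_cursor(conn):
    """Iterate over a cursor while running a second query on the SAME cursor."""
    cur = conn.cursor()
    cur.execute("SELECT x FROM t ORDER BY x")
    seen = 0
    for _row in cur:
        # The second execute replaces the cursor's pending result set
        # (here with an empty one), so the loop ends after this row.
        cur.execute("SELECT x FROM t WHERE x < 0")
        seen += 1
    return seen


def count_with_separate_cursor(conn):
    """Iterate over a cursor while running the second query on ANOTHER cursor."""
    cur = conn.cursor()
    cur.execute("SELECT x FROM t ORDER BY x")
    seen = 0
    for _row in cur:
        # A fresh cursor leaves the iterating cursor's result set untouched.
        conn.cursor().execute("SELECT x FROM t WHERE x < 0")
        seen += 1
    return seen


if __name__ == "__main__":
    conn = make_connection(5)
    print(count_with_shared_cursor(conn))    # 1
    print(count_with_separate_cursor(conn))  # 5
```

The shared-cursor loop stops after one row, mirroring the symptom in this issue, while giving each query its own cursor lets the iteration run to completion.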
