
BadRequest when performing merge after write in the same Spark session #964

Closed
khaledh opened this issue May 5, 2023 · 8 comments
@khaledh

khaledh commented May 5, 2023

I have a use case where we need to use MERGE INTO, but because the connector doesn't support it natively (there's an issue for it: #575), we work around this by writing the delta dataframe to a temp table and then using the python-bigquery library to execute a MERGE SQL query:

import logging

from google.cloud import bigquery
from pyspark.sql import SparkSession

# Spark session and logger setup (assumed; not shown in the original snippet)
spark = SparkSession.builder.getOrCreate()
logger = logging.getLogger(__name__)


def write_then_merge():
    full_df = spark.createDataFrame([
        (1, 'a'),
        (2, 'b'),
        (3, 'c'),
    ], ['id', 'letter'])

    logger.info('writing full_df')
    full_df.write.format('bigquery').option('writeMethod', 'direct').save(
        'myproject.scratch.overwrite_then_merge'
    )

    delta_df = spark.createDataFrame([
        (1, 'aa'),
        (2, 'bb'),
    ], ['id', 'letter'])

    logger.info('writing delta_df')
    delta_df.write.format('bigquery').option('writeMethod', 'direct').save(
        'myproject.scratch.overwrite_then_merge_tmp'
    )

    logger.info('merging delta_df into full_df...')

    query = """
        MERGE INTO myproject.scratch.overwrite_then_merge AS target
        USING myproject.scratch.overwrite_then_merge_tmp AS source
        ON target.id = source.id
        WHEN MATCHED THEN UPDATE SET
          target.letter = source.letter
        WHEN NOT MATCHED THEN INSERT ROW"""

    client = bigquery.Client(project='myproject')
    job = client.query(query)
    job.result()

    logger.info('done')

However, this results in the following error:

BadRequest: 400 UPDATE or DELETE statement over table myproject.scratch.overwrite_then_merge would affect rows in the streaming buffer, which is not supported

Relevant part of the stack trace:

    job.result()
  File "/tmp/tmp3lb1gxhh/installed_wheels/.../google_cloud_bigquery-3.10.0-py2.py3-none-any.whl/google/cloud/bigquery/job/query.py", line 1520, in result
    do_get_result()
  File "/tmp/tmp3lb1gxhh/installed_wheels/.../google_api_core-2.11.0-py3-none-any.whl/google/api_core/retry.py", line 349, in retry_wrapped_func
    return retry_target(
  File "/tmp/tmp3lb1gxhh/installed_wheels/.../google_api_core-2.11.0-py3-none-any.whl/google/api_core/retry.py", line 191, in retry_target
    return target()
  File "/tmp/tmp3lb1gxhh/installed_wheels/.../google_cloud_bigquery-3.10.0-py2.py3-none-any.whl/google/cloud/bigquery/job/query.py", line 1510, in do_get_result
    super(QueryJob, self).result(retry=retry, timeout=timeout)
  File "/tmp/tmp3lb1gxhh/installed_wheels/.../google_cloud_bigquery-3.10.0-py2.py3-none-any.whl/google/cloud/bigquery/job/base.py", line 911, in result
    return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
  File "/tmp/tmp3lb1gxhh/installed_wheels/.../google_api_core-2.11.0-py3-none-any.whl/google/api_core/future/polling.py", line 261, in result
    raise self._exception
google.api_core.exceptions.BadRequest: 400 UPDATE or DELETE statement over table myproject.scratch.overwrite_then_merge would affect rows in the streaming buffer, which is not supported

I found some relevant docs about the availability of data after streaming into BigQuery, which state that it may take up to 90 minutes for the data to become available. So I put the above code in a retry loop that retried for 2 hours, and it still hit the same error. Also, if I run the first write and the second merge step in two separate runs back to back, it works fine. So I don't think that documentation page is relevant to this issue.

@khaledh
Author

khaledh commented May 5, 2023

More context: If I switch the writing of full_df to the indirect write method (using a temporary GCS bucket), this works flawlessly.
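
For reference, a minimal sketch of the indirect write that avoids the streaming buffer, assuming an existing staging bucket (the bucket name below is a placeholder):

# Indirect write: the connector stages the data as files in GCS and loads them
# into BigQuery with a load job, so the rows never sit in the streaming buffer.
# 'my-temp-bucket' is a placeholder for an existing GCS bucket.
full_df.write.format('bigquery') \
    .option('writeMethod', 'indirect') \
    .option('temporaryGcsBucket', 'my-temp-bucket') \
    .save('myproject.scratch.overwrite_then_merge')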

@suryasoma
Contributor

Which Spark version and connector version are you using?

@suryasoma suryasoma self-assigned this May 10, 2023
@davidrabinowitz
Member

When using the DIRECT write method, it may take a few seconds for the data to appear in the table. Have you tried the INDIRECT write method?

@khaledh
Author

khaledh commented Jun 9, 2023

When using the DIRECT write method, it may take a few seconds for the data to appear in the table.

The data does appear if I query it, but I still get the error above if I try to MERGE into the table after the first write.

Have you tried the INDIRECT write method?

Yes, that's what I mentioned in my second comment above. If I create/write the table using the indirect method, then I don't get the error with the subsequent MERGE.

@khaledh
Author

khaledh commented Jun 9, 2023

Which Spark version and connector version are you using?

Spark 3.3 and connector 0.30.0.
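(For anyone trying to reproduce this: the connector is usually attached with --packages; the coordinate below assumes the Scala 2.12 build, and repro.py is a placeholder script name.)

spark-submit \
    --packages com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.30.0 \
    repro.py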

@davidrabinowitz
Member

@yirutang can you please have a look?

@davidrabinowitz davidrabinowitz assigned yirutang and unassigned suryasoma Jun 9, 2023
@yirutang
Collaborator

yirutang commented Jun 9, 2023

We are working on having committed data land in final storage right after commit, but that is still a work in progress. In the meantime, commit triggers a conversion, and the conversion time is shorter than the publicly documented streaming delay: our measurements show 99% of conversions finish within 2 minutes, and the longest tail we have seen is 25 minutes.

@davidrabinowitz
Member

Given that, I suggest switching to the INDIRECT mode for the time being, or adding retry logic.
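
A minimal sketch of such retry logic, assuming we only want to retry the specific streaming-buffer error (the helper name merge_with_retry is illustrative, not part of any API):

import time

from google.api_core.exceptions import BadRequest
from google.cloud import bigquery


def merge_with_retry(client: bigquery.Client, query: str,
                     max_wait_secs: int = 1800, interval_secs: int = 60) -> None:
    """Run the MERGE, retrying while rows are still in the streaming buffer."""
    deadline = time.monotonic() + max_wait_secs
    while True:
        try:
            client.query(query).result()
            return
        except BadRequest as exc:
            # Retry only the streaming-buffer error; re-raise anything else,
            # or give up once the deadline has passed.
            if 'streaming buffer' not in str(exc) or time.monotonic() > deadline:
                raise
            time.sleep(interval_secs)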
