Description
I have a spark application that I want to stream messages into BigQuery. Instead of manually having to a merge operation on the BQ side, I'd like to use the native CDC functionality that allows for UPSERTS and DELETES. I created a table in BQ w/ a PK as well as clustered so the pre-requisites are good there.
I added the field _CHANGE_TYPE to my data I wanted to load and omitted that field in the BQ table because its a pseudo column. When I try to use the storage write API I get errors related to it trying to write that pseudo column and their being a schema mismatch between the DF and the BQ schema.
It seems like this CDC feature isn't supported as part of the connector. Is this something that can be done? If not, is this in the planning stages?
simple example:
spark.conf.set('spark.datasource.bigquery.enableModeCheckForSchemaFields',False)
spark.conf.set('spark.datasource.bigquery.writeAtLeastOnce',True)
word_count = spark.sql(
'SELECT word, SUM(word_count) AS word_count, 'UPSERT' as _CHANGE_TYPE FROM words GROUP BY word')
Save the data to BigQuery
word_count.write.format('bigquery')
.option('writeMethod','direct')
.mode('append')
.save('demo_data.cdc_wordcount')