From 5fef7e261fd97834f82c29d9f5caf493bc773b83 Mon Sep 17 00:00:00 2001
From: Hassan Syyid
Date: Tue, 1 Mar 2022 09:44:48 -0500
Subject: [PATCH] Fix issue with missing timestamp cols

---
 .gitignore                        |  3 +++
 setup.py                          |  2 +-
 target_bigquery/processhandler.py | 38 +++++++++++++++++++++++++++++++
 3 files changed, 42 insertions(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index d728b9c..fa3f6b1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
+build/
+.secrets/
+.DS_Store
 .idea/*
 .pytest_cache/*
 .spyproject/*
diff --git a/setup.py b/setup.py
index fbc2223..0ae6cf0 100644
--- a/setup.py
+++ b/setup.py
@@ -11,7 +11,7 @@
 setup(
     name="target-bigquery",
-    version="0.11.0",
+    version="0.11.1",
     description="Singer.io target for writing data to Google BigQuery",
     author="Adswerve",
     url="https://github.com/adswerve/target-bigquery",
diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py
index 16f2e96..4278933 100644
--- a/target_bigquery/processhandler.py
+++ b/target_bigquery/processhandler.py
@@ -295,6 +295,44 @@ def _do_temp_table_based_load(self, rows):
                     except NotFound:
                         self.logger.info(f"Table {table_id} is not found, proceeding to upload with TRUNCATE")
                         self.truncate = True
+
+                    except Exception as e:
+                        if "Unrecognized name: _time_" not in str(e):
+                            raise
+                        else:
+                            # Add the missing fields
+                            query = """ALTER TABLE `{table}`
+                            ADD COLUMN IF NOT EXISTS _time_extracted TIMESTAMP,
+                            ADD COLUMN IF NOT EXISTS _time_loaded TIMESTAMP;
+                            """.format(table=table_id)
+                            job_config = QueryJobConfig()
+                            query_job = self.client.query(query, job_config=job_config)
+                            query_job.result()
+
+                            # Run merge again
+                            self.client.get_table(table_id)
+                            column_names = [x.name for x in self.bq_schemas[stream]]
+
+                            query = """MERGE `{table}` t
+                            USING `{temp_table}` s
+                            ON {primary_key_condition}
+                            WHEN MATCHED THEN
+                                UPDATE SET {set_values}
+                            WHEN NOT MATCHED THEN
+                                INSERT ({new_cols}) VALUES ({cols})
+                            """.format(table=table_id,
+                                       temp_table=f"{self.project_id}.{self.dataset.dataset_id}.{tmp_table_name}",
+                                       primary_key_condition=self.primary_key_condition(stream),
+                                       set_values=', '.join(f'{c}=s.{c}' for c in column_names),
+                                       new_cols=', '.join(column_names),
+                                       cols=', '.join(f's.{c}' for c in column_names))
+
+                            job_config = QueryJobConfig()
+                            query_job = self.client.query(query, job_config=job_config)
+                            query_job.result()
+                            self.logger.info(f"LOADED {query_job.num_dml_affected_rows} rows")
+                            incremental_success = True
+
                 if not incremental_success:
                     truncate = self.truncate if stream not in self.partially_loaded_streams else False
                     copy_config = CopyJobConfig()
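
The processhandler.py hunk implements a catch-and-repair retry: when the incremental MERGE fails because the Singer metadata columns `_time_extracted` and `_time_loaded` are missing from the destination table, it adds them with `ALTER TABLE ... ADD COLUMN IF NOT EXISTS` and re-runs the MERGE. Below is a minimal standalone sketch of that pattern, assuming the `google-cloud-bigquery` client library; the function name and its parameters are hypothetical illustrations, not part of the patch.

```python
# Minimal sketch of the patch's retry pattern, lifted out of the target's
# classes. Assumptions: google-cloud-bigquery is installed and credentials
# are configured; merge_with_column_repair and its arguments are hypothetical.
from google.cloud import bigquery


def merge_with_column_repair(client: bigquery.Client,
                             table_id: str,
                             merge_sql: str) -> int:
    """Run a MERGE; if it fails because the Singer timestamp columns are
    missing on the destination table, add them and retry once."""
    try:
        job = client.query(merge_sql)
        job.result()  # block until the MERGE finishes (raises on error)
    except Exception as e:
        # Handle only the specific schema-drift error the patch targets;
        # anything else is re-raised unchanged.
        if "Unrecognized name: _time_" not in str(e):
            raise
        # ADD COLUMN IF NOT EXISTS is idempotent, so retrying or racing
        # another loader adding the same columns is harmless.
        client.query(
            f"""ALTER TABLE `{table_id}`
                ADD COLUMN IF NOT EXISTS _time_extracted TIMESTAMP,
                ADD COLUMN IF NOT EXISTS _time_loaded TIMESTAMP"""
        ).result()
        # The destination schema now has the columns; rerun the MERGE.
        job = client.query(merge_sql)
        job.result()
    return job.num_dml_affected_rows or 0
```

Matching on the exception's message string mirrors what the patch does; in practice BigQuery surfaces an unknown column as a `google.api_core.exceptions.BadRequest`, so catching that type instead of bare `Exception` would be a narrower alternative.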