Commit f0260e4

Merge pull request #2 from hotgluexyz/feature/incremental_sync_and_schema_logging

Fix issue with missing timestamp cols

hsyyid authored Mar 1, 2022
2 parents 9a67a81 + 5fef7e2

Showing 3 changed files with 42 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .gitignore
@@ -1,3 +1,6 @@
build/
.secrets/
.DS_Store
.idea/*
.pytest_cache/*
.spyproject/*
2 changes: 1 addition & 1 deletion setup.py
@@ -11,7 +11,7 @@

  setup(
      name="target-bigquery",
-     version="0.11.0",
+     version="0.11.1",
      description="Singer.io target for writing data to Google BigQuery",
      author="Adswerve",
      url="https://github.com/adswerve/target-bigquery",
38 changes: 38 additions & 0 deletions target_bigquery/processhandler.py
@@ -295,6 +295,44 @@ def _do_temp_table_based_load(self, rows):
        except NotFound:
            self.logger.info(f"Table {table_id} is not found, proceeding to upload with TRUNCATE")
            self.truncate = True

        except Exception as e:
            if "Unrecognized name: _time_" not in str(e):
                raise
            else:
                # The destination table predates incremental sync and lacks the
                # Singer timestamp columns; add them before retrying the merge.
                query = """ALTER TABLE `{table}`
                    ADD COLUMN IF NOT EXISTS _time_extracted TIMESTAMP,
                    ADD COLUMN IF NOT EXISTS _time_loaded TIMESTAMP;
                    """.format(table=table_id)
                job_config = QueryJobConfig()
                query_job = self.client.query(query, job_config=job_config)
                query_job.result()

                # Re-run the merge now that the columns exist
                self.client.get_table(table_id)
                column_names = [x.name for x in self.bq_schemas[stream]]

                query = """MERGE `{table}` t
                    USING `{temp_table}` s
                    ON {primary_key_condition}
                    WHEN MATCHED THEN
                        UPDATE SET {set_values}
                    WHEN NOT MATCHED THEN
                        INSERT ({new_cols}) VALUES ({cols})
                    """.format(table=table_id,
                               temp_table=f"{self.project_id}.{self.dataset.dataset_id}.{tmp_table_name}",
                               primary_key_condition=self.primary_key_condition(stream),
                               set_values=', '.join(f'{c}=s.{c}' for c in column_names),
                               new_cols=', '.join(column_names),
                               cols=', '.join(f's.{c}' for c in column_names))

                job_config = QueryJobConfig()
                query_job = self.client.query(query, job_config=job_config)
                query_job.result()
                self.logger.info(f'LOADED {query_job.num_dml_affected_rows} rows')
                incremental_success = True

        if not incremental_success:
            truncate = self.truncate if stream not in self.partially_loaded_streams else False
            copy_config = CopyJobConfig()
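The pattern this commit introduces — catch BigQuery's "Unrecognized name: _time_" error, backfill the Singer timestamp columns with ALTER TABLE, then retry the MERGE — can be sketched in isolation as below. This is a minimal sketch against the google-cloud-bigquery client, not the target's actual code path: the project, dataset, table, and key column are hypothetical placeholders, and it narrows the broad `except Exception` above to `BadRequest`, which is what the client raises for unrecognized-column errors.

    # Minimal sketch of the backfill-and-retry pattern; all names are hypothetical.
    from google.cloud import bigquery
    from google.api_core.exceptions import BadRequest

    client = bigquery.Client()
    table_id = "my-project.my_dataset.orders"           # hypothetical destination table
    temp_table_id = "my-project.my_dataset.orders_tmp"  # hypothetical temp table

    merge_sql = f"""
    MERGE `{table_id}` t
    USING `{temp_table_id}` s
    ON t.id = s.id
    WHEN MATCHED THEN
        UPDATE SET _time_extracted = s._time_extracted, _time_loaded = s._time_loaded
    WHEN NOT MATCHED THEN
        INSERT (id, _time_extracted, _time_loaded)
        VALUES (s.id, s._time_extracted, s._time_loaded)
    """

    try:
        client.query(merge_sql).result()
    except BadRequest as e:
        if "Unrecognized name: _time_" not in str(e):
            raise
        # Older tables predate the bookkeeping columns; add them, then retry once.
        client.query(f"""
            ALTER TABLE `{table_id}`
            ADD COLUMN IF NOT EXISTS _time_extracted TIMESTAMP,
            ADD COLUMN IF NOT EXISTS _time_loaded TIMESTAMP
        """).result()
        client.query(merge_sql).result()

ADD COLUMN IF NOT EXISTS makes the backfill idempotent, so a concurrent or repeated retry is harmless; the MERGE is retried exactly once, matching the commit's behavior.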
