Skip to content

Commit

Permalink
Truncate fix for large jobs. (#16)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Key <keynarafaelp@gmail.com>
  • Loading branch information
xacadil and keyn4 authored Sep 24, 2024
1 parent 0bbbce2 commit 77dffef
Showing 1 changed file with 9 additions and 0 deletions.
9 changes: 9 additions & 0 deletions target_bigquery/processhandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def __init__(self, logger, **kwargs):
project=self.project_id,
location=kwargs.get("location", "US")
)
self.truncate_counts = {}

def handle_schema_message(self, msg):
for s in super(LoadJobProcessHandler, self).handle_schema_message(msg):
Expand Down Expand Up @@ -304,6 +305,10 @@ def _do_temp_table_based_load(self, rows):
if instance_truncate:
self.logger.info(f"Truncating dataset: {stream}")
instance_increment = self.incremental if not instance_truncate else False
# For larger jobs we don't want to keep truncating the same table when copying the temporary table to production.
# With this change, the truncate logic is switched off after the first copy, so subsequent copies are incremental.
if stream in self.truncate_counts and self.truncate_counts.get(stream, 0) > 0:
instance_truncate = False
if instance_increment:
self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by INCREMENTAL")
self.logger.warning(f"INCREMENTAL replication method (MERGE SQL statement) is not recommended. It might result in loss of production data, because historical records get updated during the sync operation. Instead, we recommend using the APPEND replication method, which will preserve historical data.")
Expand Down Expand Up @@ -350,6 +355,10 @@ def _do_temp_table_based_load(self, rows):
destination=self.dataset.table(self.tables[stream]),
job_config=copy_config
).result()
if stream not in self.truncate_counts:
self.truncate_counts[stream] = 1
else:
self.truncate_counts[stream] += 1

self.partially_loaded_streams.add(stream)
self.rows[stream].close() # erase the file
Expand Down

0 comments on commit 77dffef

Please sign in to comment.