diff --git a/target_bigquery/processhandler.py b/target_bigquery/processhandler.py
index 5ae555a..1d204ad 100644
--- a/target_bigquery/processhandler.py
+++ b/target_bigquery/processhandler.py
@@ -148,6 +148,7 @@ def __init__(self, logger, **kwargs):
             project=self.project_id,
             location=kwargs.get("location", "US")
         )
+        self.truncate_counts = {}  # per-stream count of temp-table copies to production

     def handle_schema_message(self, msg):
         for s in super(LoadJobProcessHandler, self).handle_schema_message(msg):
@@ -304,6 +305,10 @@ def _do_temp_table_based_load(self, rows):
                 if instance_truncate:
                     self.logger.info(f"Truncating dataset: {stream}")
                 instance_increment = self.incremental if not instance_truncate else False
+                # For larger jobs we don't want to keep truncating the same table on every copy of a
+                # temporary table to production, so after the first copy we fall back to incremental.
+                if self.truncate_counts.get(stream, 0) > 0:
+                    instance_truncate = False
                 if instance_increment:
                     self.logger.info(f"Copy {tmp_table_name} to {self.tables[stream]} by INCREMENTAL")
                     self.logger.warning(f"INCREMENTAL replication method (MERGE SQL statement) is not recommended. It might result in loss of production data, because historical records get updated during the sync operation. Instead, we recommend using the APPEND replication method, which will preserve historical data.")
@@ -350,6 +355,8 @@ def _do_temp_table_based_load(self, rows):
                     destination=self.dataset.table(self.tables[stream]),
                     job_config=copy_config
                 ).result()
+                # Record that this stream's production table has been written at least once.
+                self.truncate_counts[stream] = self.truncate_counts.get(stream, 0) + 1
                 self.partially_loaded_streams.add(stream)
             self.rows[stream].close()  # erase the file
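
For reviewers, here is a minimal, self-contained sketch of the behavior this patch is after, outside the tap itself: the first copy for a stream truncates its production table, and every subsequent copy in the same run appends. The names (`copy_to_production`, the in-memory `production` dict) are hypothetical stand-ins for the BigQuery copy job and its destination tables, not the connector's API.

```python
# Illustrative sketch only: copy_to_production and the in-memory "production" dict are
# hypothetical stand-ins for the BigQuery copy job and its destination tables.
truncate_counts: dict = {}   # per-stream count of completed copies, as in the patch
production: dict = {}        # stream name -> rows currently in the "production table"

def copy_to_production(stream, batch, truncate):
    """First copy for a stream truncates (like WRITE_TRUNCATE); later copies append (like WRITE_APPEND)."""
    if truncate and truncate_counts.get(stream, 0) == 0:
        production[stream] = list(batch)                 # analogous to WRITE_TRUNCATE
    else:
        production.setdefault(stream, []).extend(batch)  # analogous to WRITE_APPEND
    truncate_counts[stream] = truncate_counts.get(stream, 0) + 1

copy_to_production("users", ["row1", "row2"], truncate=True)  # first copy: truncates
copy_to_production("users", ["row3"], truncate=True)          # second copy: appends
assert production["users"] == ["row1", "row2", "row3"]
```

Without the per-stream count, a large sync that flushes the same stream several times would truncate the production table on each flush, keeping only the final batch; counting completed copies makes truncation a one-time event per run.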