From cc484381e33fe75c8743e2737be5b27aa9b807a3 Mon Sep 17 00:00:00 2001
From: Hassan Syyid
Date: Mon, 3 May 2021 15:29:58 -0400
Subject: [PATCH] Do full sync of P&L when state.json is not passed

---
 setup.py                              |  2 +-
 tap_quickbooks/__init__.py            |  7 +-
 tap_quickbooks/quickbooks/__init__.py |  6 +-
 .../ProfitAndLossDetailReport.py      | 65 ++++++++++++++++---
 tap_quickbooks/sync.py                |  8 +--
 5 files changed, 68 insertions(+), 20 deletions(-)

diff --git a/setup.py b/setup.py
index 1cc4c2a..e4810d9 100644
--- a/setup.py
+++ b/setup.py
@@ -13,7 +13,7 @@ def get_version():
     readme = f.read()
 
 setup(name='tap-quickbooks',
-      version='1.4.32',
+      version='1.4.33',
       description='Singer.io tap for extracting data from the Quickbooks API',
       author='hotglue',
       url='http://hotglue.xyz/',
diff --git a/tap_quickbooks/__init__.py b/tap_quickbooks/__init__.py
index 136c1a6..507ea9a 100644
--- a/tap_quickbooks/__init__.py
+++ b/tap_quickbooks/__init__.py
@@ -162,7 +162,7 @@ def do_discover(qb):
     result = {'streams': entries}
     json.dump(result, sys.stdout, indent=4)
 
-def do_sync(qb, catalog, state):
+def do_sync(qb, catalog, state, state_passed):
     starting_stream = state.get("current_stream")
 
     if starting_stream:
@@ -239,7 +239,7 @@ def do_sync(qb, catalog, state):
                                           catalog_entry['tap_stream_id'],
                                           'version',
                                           stream_version)
-            counter = sync_stream(qb, catalog_entry, state)
+            counter = sync_stream(qb, catalog_entry, state, state_passed)
             LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value)
 
     state["current_stream"] = None
@@ -271,8 +271,9 @@ def main_impl():
             do_discover(qb)
         elif args.properties:
             catalog = args.properties
+            state_passed = bool(args.state)
             state = build_state(args.state, catalog)
-            do_sync(qb, catalog, state)
+            do_sync(qb, catalog, state, state_passed)
     finally:
         if qb:
             if qb.rest_requests_attempted > 0:
diff --git a/tap_quickbooks/quickbooks/__init__.py b/tap_quickbooks/quickbooks/__init__.py
index ab7232a..0bf6b04 100644
--- a/tap_quickbooks/quickbooks/__init__.py
+++ b/tap_quickbooks/quickbooks/__init__.py
@@ -411,7 +411,7 @@ def query(self, catalog_entry, state):
                 "api_type should be REST was: {}".format(
                     self.api_type))
 
-    def query_report(self, catalog_entry, state):
-        start_date = self.get_start_date(state, catalog_entry)
-        reader = ProfitAndLossDetailReport(self, start_date)
+    def query_report(self, catalog_entry, state, state_passed):
+        start_date = singer_utils.strptime_with_tz(self.get_start_date(state, catalog_entry))
+        reader = ProfitAndLossDetailReport(self, start_date, state_passed)
         return reader.sync(catalog_entry)
diff --git a/tap_quickbooks/quickbooks/reportstreams/ProfitAndLossDetailReport.py b/tap_quickbooks/quickbooks/reportstreams/ProfitAndLossDetailReport.py
index 6ce9238..efe0aa6 100644
--- a/tap_quickbooks/quickbooks/reportstreams/ProfitAndLossDetailReport.py
+++ b/tap_quickbooks/quickbooks/reportstreams/ProfitAndLossDetailReport.py
@@ -15,9 +15,10 @@ class ProfitAndLossDetailReport(QuickbooksStream):
     key_properties: ClassVar[List[str]] = []
     replication_method: ClassVar[str] = 'FULL_TABLE'
 
-    def __init__(self, qb, start_date):
+    def __init__(self, qb, start_date, state_passed):
         self.qb = qb
         self.start_date = start_date
+        self.state_passed = state_passed
 
     def _get_column_metadata(self, resp):
         columns = []
@@ -52,11 +53,12 @@ def _recursive_row_search(self, row, output, categories):
                 categories.pop()
 
     def sync(self, catalog_entry):
-        LOGGER.info(f"ignoring config start date {self.start_date}")
-        end_date = datetime.date.today()
+        full_sync = not self.state_passed
 
-        for i in range(NUMBER_OF_PERIODS):
-            start_date = end_date.replace(day=1)
+        if full_sync:
+            LOGGER.info(f"Starting full sync of P&L")
+            end_date = datetime.date.today()
+            start_date = self.start_date
             params = {
                 "start_date": start_date.strftime("%Y-%m-%d"),
                 "end_date": end_date.strftime("%Y-%m-%d"),
@@ -74,9 +76,7 @@ def sync(self, catalog_entry):
             row_array = row_group.get("Row")
 
             if row_array is None:
-                # Update end date
-                end_date = start_date - datetime.timedelta(days=1)
-                continue
+                return
 
             output = []
             categories = []
@@ -98,5 +98,52 @@ def sync(self, catalog_entry):
                 cleansed_row["SyncTimestampUtc"] = singer.utils.strftime(singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ")
 
                 yield cleansed_row
+        else:
+            LOGGER.info(f"Syncing P&L of last {NUMBER_OF_PERIODS} periods")
+            end_date = datetime.date.today()
+
+            for i in range(NUMBER_OF_PERIODS):
+                start_date = end_date.replace(day=1)
+                params = {
+                    "start_date": start_date.strftime("%Y-%m-%d"),
+                    "end_date": end_date.strftime("%Y-%m-%d"),
+                    "accounting_method": "Accrual"
+                }
+
+                LOGGER.info(f"Fetch Journal Report for period {params['start_date']} to {params['end_date']}")
+                resp = self._get(report_entity='ProfitAndLossDetail', params=params)
+
+                # Get column metadata.
+                columns = self._get_column_metadata(resp)
+
+                # Recursively get row data.
+                row_group = resp.get("Rows")
+                row_array = row_group.get("Row")
+
+                if row_array is None:
+                    # Update end date
+                    end_date = start_date - datetime.timedelta(days=1)
+                    continue
+
+                output = []
+                categories = []
+                for row in row_array:
+                    self._recursive_row_search(row, output, categories)
+
+                # Zip columns and row data.
+                for raw_row in output:
+                    row = dict(zip(columns, raw_row))
+                    cleansed_row = {}
+                    for k, v in row.items():
+                        if v == "":
+                            continue
+                        else:
+                            cleansed_row.update({k: v})
+
+                    cleansed_row["Amount"] = float(row.get("Amount"))
+                    cleansed_row["Balance"] = float(row.get("Balance"))
+                    cleansed_row["SyncTimestampUtc"] = singer.utils.strftime(singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ")
+
+                    yield cleansed_row
 
-            end_date = start_date - datetime.timedelta(days=1)
+                end_date = start_date - datetime.timedelta(days=1)
diff --git a/tap_quickbooks/sync.py b/tap_quickbooks/sync.py
index 194545d..f7f874f 100644
--- a/tap_quickbooks/sync.py
+++ b/tap_quickbooks/sync.py
@@ -54,12 +54,12 @@ def get_stream_version(catalog_entry, state):
     return int(time.time() * 1000)
 
 
-def sync_stream(qb, catalog_entry, state):
+def sync_stream(qb, catalog_entry, state, state_passed):
     stream = catalog_entry['stream']
 
     with metrics.record_counter(stream) as counter:
         try:
-            sync_records(qb, catalog_entry, state, counter)
+            sync_records(qb, catalog_entry, state, counter, state_passed)
             singer.write_state(state)
         except RequestException as ex:
             raise Exception("Error syncing {}: {} Response: {}".format(
@@ -71,7 +71,7 @@ def sync_stream(qb, catalog_entry, state):
     return counter
 
 
-def sync_records(qb, catalog_entry, state, counter):
+def sync_records(qb, catalog_entry, state, counter, state_passed):
     chunked_bookmark = singer_utils.strptime_with_tz(qb.get_start_date(state, catalog_entry))
     stream = catalog_entry['stream']
     schema = catalog_entry['schema']
@@ -92,7 +92,7 @@ def sync_records(qb, catalog_entry, state, counter):
     if stream.endswith("Report"):
        query_func = qb.query_report
 
-    for rec in query_func(catalog_entry, state):
+    for rec in query_func(catalog_entry, state, state_passed):
        counter.increment()
 
        with Transformer(pre_hook=transform_data_hook) as transformer:
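
Reviewer note (not part of the patch): the changes in tap_quickbooks/__init__.py and
tap_quickbooks/sync.py only derive a boolean from the CLI arguments and thread it,
unchanged, through do_sync -> sync_stream -> sync_records -> query_report. A minimal
standalone sketch of that flag derivation, using a hypothetical argparse setup rather
than the tap's real singer argument parsing:

    import argparse

    # Hypothetical, simplified CLI parsing; the tap itself uses singer-python's
    # argument utilities, but the flag is derived the same way.
    parser = argparse.ArgumentParser()
    parser.add_argument("--state", default=None, help="path to state.json")
    args = parser.parse_args([])  # no --state supplied on this run

    # True only when a state file path was given; the report stream uses the
    # inverse (full_sync = not state_passed) to pick full vs. periodic sync.
    state_passed = bool(args.state)
    print("state_passed:", state_passed)  # prints: state_passed: False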
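
Reviewer note (not part of the patch): the else branch keeps the pre-existing
month-by-month walk, where each iteration covers the first of a month up to the
running end date and then steps end_date back to the last day of the previous
month. A small self-contained sketch of just that date arithmetic, with
NUMBER_OF_PERIODS as a stand-in for the tap's constant and the report fetching
and row parsing omitted:

    import datetime

    NUMBER_OF_PERIODS = 3  # stand-in for the tap's constant

    def monthly_periods(today=None):
        """Yield (start_date, end_date) pairs, newest period first."""
        end_date = today or datetime.date.today()
        for _ in range(NUMBER_OF_PERIODS):
            start_date = end_date.replace(day=1)                 # first day of the period
            yield start_date, end_date
            end_date = start_date - datetime.timedelta(days=1)   # last day of prior month

    # With today = 2021-05-03 this prints:
    #   2021-05-01 -> 2021-05-03
    #   2021-04-01 -> 2021-04-30
    #   2021-03-01 -> 2021-03-31
    for start, end in monthly_periods(datetime.date(2021, 5, 3)):
        print(start.isoformat(), "->", end.isoformat())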