
Commit

Do full sync of P&L when state.json is not passed
hsyyid committed May 3, 2021
1 parent 7fb9f67 commit cc48438
Showing 5 changed files with 68 additions and 20 deletions.
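In short: main_impl now records whether a state file was supplied (state_passed = bool(args.state)) and threads that flag through do_sync, sync_stream, sync_records and query_report into ProfitAndLossDetailReport.sync, which issues a single report query from the configured start date through today when no state is passed, and keeps the existing month-by-month walk otherwise. A minimal, self-contained Python sketch of the resulting date-window selection; compute_report_windows and the NUMBER_OF_PERIODS value are illustrative stand-ins, not code from the tap:

import datetime

NUMBER_OF_PERIODS = 3  # assumed value for illustration; the tap defines its own constant


def compute_report_windows(state_passed, config_start_date):
    # Hypothetical helper: returns the (start, end) ranges the P&L report is queried with.
    if not state_passed:
        # No state.json: one full-range report, config start date through today.
        return [(config_start_date, datetime.date.today())]

    # State present: keep the previous behaviour of walking back month by month.
    windows = []
    end_date = datetime.date.today()
    for _ in range(NUMBER_OF_PERIODS):
        start_date = end_date.replace(day=1)
        windows.append((start_date, end_date))
        end_date = start_date - datetime.timedelta(days=1)
    return windows


for start, end in compute_report_windows(False, datetime.date(2020, 1, 1)):
    print(start.strftime("%Y-%m-%d"), "to", end.strftime("%Y-%m-%d"))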
setup.py: 2 changes (1 addition & 1 deletion)
@@ -13,7 +13,7 @@ def get_version():
     readme = f.read()
 
 setup(name='tap-quickbooks',
-      version='1.4.32',
+      version='1.4.33',
       description='Singer.io tap for extracting data from the Quickbooks API',
       author='hotglue',
       url='http://hotglue.xyz/',
tap_quickbooks/__init__.py: 7 changes (4 additions & 3 deletions)
@@ -162,7 +162,7 @@ def do_discover(qb):
     result = {'streams': entries}
     json.dump(result, sys.stdout, indent=4)
 
-def do_sync(qb, catalog, state):
+def do_sync(qb, catalog, state, state_passed):
     starting_stream = state.get("current_stream")
 
     if starting_stream:
@@ -239,7 +239,7 @@ def do_sync(qb, catalog, state):
                 catalog_entry['tap_stream_id'],
                 'version',
                 stream_version)
-            counter = sync_stream(qb, catalog_entry, state)
+            counter = sync_stream(qb, catalog_entry, state, state_passed)
             LOGGER.info("%s: Completed sync (%s rows)", stream_name, counter.value)
 
     state["current_stream"] = None
@@ -271,8 +271,9 @@ def main_impl():
             do_discover(qb)
         elif args.properties:
             catalog = args.properties
+            state_passed = bool(args.state)
             state = build_state(args.state, catalog)
-            do_sync(qb, catalog, state)
+            do_sync(qb, catalog, state, state_passed)
     finally:
         if qb:
             if qb.rest_requests_attempted > 0:
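A small aside on the flag itself: assuming the tap parses arguments with singer-python's singer.utils.parse_args, args.state is a dict that defaults to {} when --state is omitted, so bool(args.state) is also False for an explicitly passed but empty state file; either case takes the new full-sync path. Illustration only:

# state_passed derivation, assuming the state file is parsed into a dict
# ({} when --state is not given).
for state in ({}, {"current_stream": None}):
    state_passed = bool(state)
    mode = "last-N-periods sync" if state_passed else "full P&L sync"
    print(state, "->", mode)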
tap_quickbooks/quickbooks/__init__.py: 6 changes (3 additions & 3 deletions)
@@ -411,7 +411,7 @@ def query(self, catalog_entry, state):
                 "api_type should be REST was: {}".format(
                     self.api_type))
 
-    def query_report(self, catalog_entry, state):
-        start_date = self.get_start_date(state, catalog_entry)
-        reader = ProfitAndLossDetailReport(self, start_date)
+    def query_report(self, catalog_entry, state, state_passed):
+        start_date = singer_utils.strptime_with_tz(self.get_start_date(state, catalog_entry))
+        reader = ProfitAndLossDetailReport(self, start_date, state_passed)
         return reader.sync(catalog_entry)
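The second change above matters for the new code path: get_start_date presumably returns the start date as a string taken from config or state, while the full-sync branch added below calls start_date.strftime(...), so the value is first parsed into a timezone-aware datetime with singer-python's strptime_with_tz. Roughly, with an example date literal:

# singer.utils.strptime_with_tz parses an ISO-8601 string into a tz-aware
# datetime, which then supports strftime in the report reader.
import singer.utils as singer_utils

start_date = singer_utils.strptime_with_tz("2021-01-01T00:00:00Z")
print(start_date.strftime("%Y-%m-%d"))  # 2021-01-01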
ProfitAndLossDetailReport stream: 65 changes (56 additions & 9 deletions)
@@ -15,9 +15,10 @@ class ProfitAndLossDetailReport(QuickbooksStream):
     key_properties: ClassVar[List[str]] = []
     replication_method: ClassVar[str] = 'FULL_TABLE'
 
-    def __init__(self, qb, start_date):
+    def __init__(self, qb, start_date, state_passed):
         self.qb = qb
         self.start_date = start_date
+        self.state_passed = state_passed
 
     def _get_column_metadata(self, resp):
         columns = []
@@ -52,11 +53,12 @@ def _recursive_row_search(self, row, output, categories):
             categories.pop()
 
     def sync(self, catalog_entry):
-        LOGGER.info(f"ignoring config start date {self.start_date}")
-        end_date = datetime.date.today()
+        full_sync = not self.state_passed
 
-        for i in range(NUMBER_OF_PERIODS):
-            start_date = end_date.replace(day=1)
+        if full_sync:
+            LOGGER.info(f"Starting full sync of P&L")
+            end_date = datetime.date.today()
+            start_date = self.start_date
             params = {
                 "start_date": start_date.strftime("%Y-%m-%d"),
                 "end_date": end_date.strftime("%Y-%m-%d"),
@@ -74,9 +76,7 @@
             row_array = row_group.get("Row")
 
             if row_array is None:
-                # Update end date
-                end_date = start_date - datetime.timedelta(days=1)
-                continue
+                return
 
             output = []
             categories = []
@@ -98,5 +98,52 @@
                 cleansed_row["SyncTimestampUtc"] = singer.utils.strftime(singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ")
 
                 yield cleansed_row
+        else:
+            LOGGER.info(f"Syncing P&L of last {NUMBER_OF_PERIODS} periods")
+            end_date = datetime.date.today()
+
+            for i in range(NUMBER_OF_PERIODS):
+                start_date = end_date.replace(day=1)
+                params = {
+                    "start_date": start_date.strftime("%Y-%m-%d"),
+                    "end_date": end_date.strftime("%Y-%m-%d"),
+                    "accounting_method": "Accrual"
+                }
+
+                LOGGER.info(f"Fetch Journal Report for period {params['start_date']} to {params['end_date']}")
+                resp = self._get(report_entity='ProfitAndLossDetail', params=params)
+
+                # Get column metadata.
+                columns = self._get_column_metadata(resp)
+
+                # Recursively get row data.
+                row_group = resp.get("Rows")
+                row_array = row_group.get("Row")
+
+                if row_array is None:
+                    # Update end date
+                    end_date = start_date - datetime.timedelta(days=1)
+                    continue
+
+                output = []
+                categories = []
+                for row in row_array:
+                    self._recursive_row_search(row, output, categories)
+
+                # Zip columns and row data.
+                for raw_row in output:
+                    row = dict(zip(columns, raw_row))
+                    cleansed_row = {}
+                    for k, v in row.items():
+                        if v == "":
+                            continue
+                        else:
+                            cleansed_row.update({k: v})
+
+                    cleansed_row["Amount"] = float(row.get("Amount"))
+                    cleansed_row["Balance"] = float(row.get("Balance"))
+                    cleansed_row["SyncTimestampUtc"] = singer.utils.strftime(singer.utils.now(), "%Y-%m-%dT%H:%M:%SZ")
+
+                    yield cleansed_row
 
-            end_date = start_date - datetime.timedelta(days=1)
+                end_date = start_date - datetime.timedelta(days=1)
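Both branches share the same flatten-and-cleanse step; the full-sync branch differs only in its single date window and in returning outright when the report has no rows, whereas the rolling branch moves end_date back to the previous month and continues. For reference, the zip/cleanse step works roughly as below; the column names and values are made up, not from a real QuickBooks response:

# Standalone illustration of the cleansing loop shown above: zip the report's
# column metadata with a flattened row, drop empty strings, coerce numerics.
columns = ["Date", "TransactionType", "Amount", "Balance", "Memo"]
raw_row = ["2021-04-15", "Invoice", "125.00", "125.00", ""]

row = dict(zip(columns, raw_row))
cleansed_row = {k: v for k, v in row.items() if v != ""}
cleansed_row["Amount"] = float(row.get("Amount"))
cleansed_row["Balance"] = float(row.get("Balance"))
print(cleansed_row)
# {'Date': '2021-04-15', 'TransactionType': 'Invoice', 'Amount': 125.0, 'Balance': 125.0}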
tap_quickbooks/sync.py: 8 changes (4 additions & 4 deletions)
@@ -54,12 +54,12 @@ def get_stream_version(catalog_entry, state):
     return int(time.time() * 1000)
 
 
-def sync_stream(qb, catalog_entry, state):
+def sync_stream(qb, catalog_entry, state, state_passed):
     stream = catalog_entry['stream']
 
     with metrics.record_counter(stream) as counter:
         try:
-            sync_records(qb, catalog_entry, state, counter)
+            sync_records(qb, catalog_entry, state, counter, state_passed)
             singer.write_state(state)
         except RequestException as ex:
             raise Exception("Error syncing {}: {} Response: {}".format(
@@ -71,7 +71,7 @@ def sync_stream(qb, catalog_entry, state):
     return counter
 
 
-def sync_records(qb, catalog_entry, state, counter):
+def sync_records(qb, catalog_entry, state, counter, state_passed):
     chunked_bookmark = singer_utils.strptime_with_tz(qb.get_start_date(state, catalog_entry))
     stream = catalog_entry['stream']
     schema = catalog_entry['schema']
@@ -92,7 +92,7 @@ def sync_records(qb, catalog_entry, state, counter):
     if stream.endswith("Report"):
         query_func = qb.query_report
 
-    for rec in query_func(catalog_entry, state):
+    for rec in query_func(catalog_entry, state, state_passed):
 
         counter.increment()
         with Transformer(pre_hook=transform_data_hook) as transformer:
