Tdl 24590 fix contacts to company #250

Merged
merged 15 commits on Jan 23, 2024
70 changes: 50 additions & 20 deletions tap_hubspot/__init__.py
@@ -6,7 +6,7 @@
import re
import sys
import json
# pylint: disable=import-error
# pylint: disable=import-error,too-many-statements
import attr
import backoff
import requests
@@ -77,6 +77,7 @@ class StateFields:
"companies_recent": "/companies/v2/companies/recent/modified",
"companies_detail": "/companies/v2/companies/{company_id}",
"contacts_by_company": "/companies/v2/companies/{company_id}/vids",
Contributor: Is this still used?

"contacts_by_company_v3": "/crm/v3/associations/company/contact/batch/read",

"deals_properties": "/properties/v1/deals/properties",
"deals_all": "/deals/v1/deal/paged",
@@ -568,26 +569,24 @@ def use_recent_companies_endpoint(response):
default_contacts_by_company_params = {'count' : 100}

# NB> to do: support stream aliasing and field selection
def _sync_contacts_by_company(STATE, ctx, company_id):
def _sync_contacts_by_company_batch_read(STATE, ctx, company_ids):
schema = load_schema(CONTACTS_BY_COMPANY)
catalog = ctx.get_catalog_from_id(singer.get_currently_syncing(STATE))
mdata = metadata.to_map(catalog.get('metadata'))
url = get_url("contacts_by_company", company_id=company_id)
path = 'vids'
url = get_url("contacts_by_company_v3")
with Transformer(UNIX_MILLISECONDS_INTEGER_DATETIME_PARSING) as bumble_bee:
with metrics.record_counter(CONTACTS_BY_COMPANY) as counter:
data = request(url, default_contacts_by_company_params).json()

if data.get(path) is None:
raise RuntimeError("Unexpected API response: {} not in {}".format(path, data.keys()))

for row in data[path]:
counter.increment()
record = {'company-id' : company_id,
'contact-id' : row}
record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata)
singer.write_record("contacts_by_company", record, time_extracted=utils.now())

body = {'inputs': [{'id': company_id} for company_id in company_ids]}
contacts_to_company_rows = post_search_endpoint(url, body).json()
for row in contacts_to_company_rows['results']:
for contact in row['to']:
counter.increment()
record = {'company-id' : row['from']['id'],
'contact-id' : contact['id']}
record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata)
singer.write_record("contacts_by_company", record, time_extracted=utils.now())
STATE = singer.set_offset(STATE, "contacts_by_company", 'offset', company_ids[-1])
singer.write_state(STATE)
return STATE

default_company_params = {
@@ -620,8 +619,26 @@ def sync_companies(STATE, ctx):
max_bk_value = start
if CONTACTS_BY_COMPANY in ctx.selected_stream_ids:
contacts_by_company_schema = load_schema(CONTACTS_BY_COMPANY)
singer.write_schema("contacts_by_company", contacts_by_company_schema, ["company-id", "contact-id"])
singer.write_schema('contacts_by_company', contacts_by_company_schema, ["company-id", "contact-id"])

# This code handles an interrupted sync. When a sync is interrupted,
# the last batch of the `contacts_by_company` extraction may be cut short.
# So before resuming, we check whether the `companies` offset or the
# `contacts_by_company` offset is lagging behind and set that as the
# offset value for `companies`. Note that a few records may get duplicated.
if singer.get_offset(STATE, 'contacts_by_company', {}).get('offset'):
companies_offset = singer.get_offset(STATE, 'companies', {}).get('offset')
contacts_by_company_offset = singer.get_offset(STATE, 'contacts_by_company').get('offset')
if companies_offset:
offset = min(companies_offset, contacts_by_company_offset)
else:
offset = contacts_by_company_offset

STATE = singer.set_offset(STATE, 'companies', 'offset', offset)
singer.write_state(STATE)
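# For example (illustrative ids, not part of this change): if an interrupted run
# left a companies offset of 7005 and a contacts_by_company offset of 7001, the
# block above resumes companies paging from 7001, so the companies between the
# two offsets are re-read and their contact associations are not skipped.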

# This list collects the recently modified company ids so that `contacts_by_company` records can be extracted in batches
company_ids = []
with bumble_bee:
for row in gen_request(STATE, 'companies', url, default_company_params, 'companies', 'has-more', ['offset'], ['offset']):
row_properties = row['properties']
@@ -642,8 +659,21 @@
record = request(get_url("companies_detail", company_id=row['companyId'])).json()
record = bumble_bee.transform(lift_properties_and_versions(record), schema, mdata)
singer.write_record("companies", record, catalog.get('stream_alias'), time_extracted=utils.now())
if CONTACTS_BY_COMPANY in ctx.selected_stream_ids:
STATE = _sync_contacts_by_company(STATE, ctx, record['companyId'])

if CONTACTS_BY_COMPANY in ctx.selected_stream_ids:
# Collect the recently modified company id
if not modified_time or modified_time >= start:
company_ids.append(row['companyId'])

# Once the batch size reaches the set limit, extract the `contacts_by_company` records for the collected company ids
if len(company_ids) >= default_company_params['limit']:
STATE = _sync_contacts_by_company_batch_read(STATE, ctx, company_ids)
company_ids = [] # reset the list

# Extract the records for the remaining company ids
if CONTACTS_BY_COMPANY in ctx.selected_stream_ids:
STATE = _sync_contacts_by_company_batch_read(STATE, ctx, company_ids)
STATE = singer.clear_offset(STATE, "contacts_by_company")

# Don't bookmark past the start of this sync to account for updated records during the sync.
new_bookmark = min(max_bk_value, current_sync_start)
@@ -1417,7 +1447,7 @@ def discover_schemas():

# Load the contacts_by_company schema
LOGGER.info('Loading schema for contacts_by_company')
contacts_by_company = Stream('contacts_by_company', _sync_contacts_by_company, ['company-id', 'contact-id'], None, 'FULL_TABLE')
contacts_by_company = Stream('contacts_by_company', _sync_contacts_by_company_batch_read, ['company-id', 'contact-id'], None, 'FULL_TABLE')
schema, mdata = load_discovered_schema(contacts_by_company)

result['streams'].append({'stream': CONTACTS_BY_COMPANY,
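For context on the main change in this file: instead of paging `/companies/v2/companies/{company_id}/vids` once per company, the tap now posts all collected company ids to HubSpot's v3 associations batch-read endpoint and fans the response back out into `contacts_by_company` records. The sketch below is a minimal, self-contained illustration of that request/response flow; it assumes the standard `https://api.hubapi.com` base URL and a bearer token, whereas the tap itself routes the call through its `post_search_endpoint` helper, and HubSpot may return more keys than shown.

import requests

def batch_read_company_contacts(company_ids, access_token):
    """Yield company/contact association pairs for a batch of company ids."""
    url = "https://api.hubapi.com/crm/v3/associations/company/contact/batch/read"
    body = {"inputs": [{"id": company_id} for company_id in company_ids]}
    headers = {"Authorization": f"Bearer {access_token}"}
    response = requests.post(url, json=body, headers=headers)
    response.raise_for_status()
    # Simplified response shape:
    # {"results": [{"from": {"id": "901"},
    #               "to": [{"id": "1501", "type": "company_to_contact"}]}]}
    for row in response.json().get("results", []):
        for contact in row.get("to", []):
            yield {"company-id": row["from"]["id"], "contact-id": contact["id"]}

Each yielded pair maps onto one `contacts_by_company` record that the tap transforms and writes, with the stream offset advanced to the last company id in the batch.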
79 changes: 64 additions & 15 deletions tests/test_hubspot_interrupted_sync_offset.py
@@ -12,6 +12,8 @@

class TestHubspotInterruptedSyncOffsetContactLists(HubspotBaseTest):
"""Testing interrupted syncs for streams that implement unique bookmarking logic."""
synced_records = None

@staticmethod
def name():
return "tt_hubspot_interrupt_contact_lists"
@@ -20,7 +22,6 @@ def streams_to_test(self):
"""expected streams minus the streams not under test"""
untested = {
# Streams tested elsewhere
'companies', # covered in TestHubspotInterruptedSync1
'engagements', # covered in TestHubspotInterruptedSync1
# Feature Request | TDL-16095: [tap-hubspot] All incremental
# streams should implement the interruptible sync feature
@@ -31,7 +32,6 @@ def streams_to_test(self):
'deal_pipelines', # interruptible does not apply, child of deals
'campaigns', # unable to manually find a partial state with our test data
'email_events', # unable to manually find a partial state with our test data
'contacts_by_company', # interruptible does not apply, child of 'companies'
'subscription_changes', # BUG_TDL-14938
'tickets' # covered in TestHubspotInterruptedSync1
}
@@ -41,8 +41,9 @@
def stream_to_interrupt(self):
return 'contact_lists'

def state_to_inject(self):
return {'offset': {'offset': 250}}
def state_to_inject(self, new_state):
new_state['bookmarks']['contact_lists'] = {'offset': {'offset': 250}}
return new_state

def get_properties(self):
return {
@@ -79,14 +80,15 @@ def test_run(self):

# Run sync 1
first_record_count_by_stream = self.run_and_verify_sync(conn_id)
synced_records = runner.get_records_from_target_output()
self.synced_records = runner.get_records_from_target_output()
state_1 = menagerie.get_state(conn_id)

# Update state to simulate a bookmark
stream = self.stream_to_interrupt()
new_state = copy.deepcopy(state_1)
new_state['bookmarks'][stream] = self.state_to_inject()
new_state = self.state_to_inject(new_state)
new_state['currently_syncing'] = stream

menagerie.set_state(conn_id, new_state)

# run second sync
@@ -98,10 +100,21 @@
# since newly created test records may get updated while stream is syncing
replication_keys = self.expected_replication_keys()
for stream in state_1.get('bookmarks'):
replication_key = list(replication_keys[stream])[0]
self.assertLessEqual(state_1["bookmarks"][stream].get(replication_key),
state_2["bookmarks"][stream].get(replication_key),
msg="First sync bookmark should not be greater than the second bookmark.")

if self.stream_to_interrupt() == 'companies' and stream == 'companies':
replication_key = list(replication_keys[stream])[0]
self.assertLessEqual(new_state.get('bookmarks')[stream].get('current_sync_start'),
state_2["bookmarks"][stream].get(replication_key),
msg="First sync bookmark should not be greater than the second bookmark.")
elif stream == 'contacts_by_company':
self.assertEqual(state_1["bookmarks"][stream], {"offset": {}})
self.assertEqual(state_2["bookmarks"][stream], {"offset": {}})

else:
replication_key = list(replication_keys[stream])[0]
self.assertLessEqual(state_1["bookmarks"][stream].get(replication_key),
state_2["bookmarks"][stream].get(replication_key),
msg="First sync bookmark should not be greater than the second bookmark.")


class TestHubspotInterruptedSyncOffsetContacts(TestHubspotInterruptedSyncOffsetContactLists):
@@ -119,8 +132,9 @@ def get_properties(self):
def stream_to_interrupt(self):
return 'contacts'

def state_to_inject(self):
return {'offset': {'vidOffset': 3502}}
def state_to_inject(self, new_state):
new_state['bookmarks']['contacts'] = {'offset': {'vidOffset': 3502}}
return new_state

class TestHubspotInterruptedSyncOffsetDeals(TestHubspotInterruptedSyncOffsetContactLists):
"""Testing interrupted syncs for streams that implement unique bookmarking logic."""
@@ -136,6 +150,41 @@ def get_properties(self):
def stream_to_interrupt(self):
return 'deals'

def state_to_inject(self):
return {'property_hs_lastmodifieddate': '2021-10-13T08:32:08.383000Z',
'offset': {'offset': 3442973342}}
def state_to_inject(self, new_state):
new_state['bookmarks']['deals'] = {'property_hs_lastmodifieddate': '2021-10-13T08:32:08.383000Z',
'offset': {'offset': 3442973342}}
return new_state


class TestHubspotInterruptedSyncOffsetCompanies(TestHubspotInterruptedSyncOffsetContactLists):
"""Testing interrupted syncs for streams that implement unique bookmarking logic."""
@staticmethod
def name():
return "tt_hubspot_interrupt_companies"

def get_properties(self):
return {
'start_date' : '2023-12-31T00:00:00Z'
}

def stream_to_interrupt(self):
return 'companies'

def state_to_inject(self, new_state):
companies_records = self.synced_records['companies']['messages']
contacts_by_company_records = self.synced_records['contacts_by_company']['messages']

company_record_index = int(len(companies_records)/2)
contact_record_index = int(3*len(contacts_by_company_records)/4)

last_modified_value = companies_records[-1]['data'][list(self.expected_replication_keys()['companies'])[0]]['value']
current_sync_start = companies_records[company_record_index]['data'][list(self.expected_replication_keys()['companies'])[0]]['value']
offset_1 = companies_records[company_record_index]['data']['companyId']
offset_2 = contacts_by_company_records[contact_record_index]['data']['company-id']

new_state['bookmarks']['companies'] = {'property_hs_lastmodifieddate': last_modified_value,
'current_sync_start': current_sync_start,
'offset': {'offset': offset_1}}
new_state['bookmarks']['contacts_by_company'] = {'offset': {'offset': offset_2}}

return new_state
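
To make the simulated interruption concrete, the state injected before the second companies-test sync ends up shaped like the sketch below. The ids and timestamps are placeholders rather than values from a real account; in an actual test run they are taken from the first sync's records as shown above. On resume, the offset-reconciliation block in `sync_companies` takes the smaller of the two offsets, so the batch of associations that was cut off is re-read instead of skipped, and the test later checks that the `contacts_by_company` offset is left cleared (an empty `{'offset': {}}`) once each sync completes.

# Illustrative shape of the injected state; every value below is a placeholder.
injected_state = {
    "currently_syncing": "companies",
    "bookmarks": {
        "companies": {
            "property_hs_lastmodifieddate": "2024-01-10T12:00:00.000000Z",
            "current_sync_start": "2024-01-05T00:00:00.000000Z",
            "offset": {"offset": 9000000001},  # companyId midway through sync 1
        },
        # Deliberately lags behind the companies offset, mimicking a batch read
        # of contact associations that was cut off partway through.
        "contacts_by_company": {"offset": {"offset": 8999999999}},
    },
}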