TDL-24162 Log based inclusivity updates #90

Merged: 8 commits, Oct 16, 2023
23 changes: 23 additions & 0 deletions tests/base.py
@@ -210,6 +210,29 @@ def select_all_streams_and_fields(conn_id, catalogs, select_all_fields: bool = T
connections.select_catalog_and_fields_via_metadata(
conn_id, catalog, schema, additional_md, non_selected_properties)

def unique_pk_count_by_stream(self, recs_by_stream):
"""
Switch from upsert record count verification to unique pk count verification due to a
tap-mssql inconsistency with log based inclusivity, TDL-24162 (which will not be fixed)
"""
pk_count_by_stream = {}
for strm, recs in recs_by_stream.items():
primary_keys = self.expected_primary_keys_by_stream_id()[strm]

# use a tuple generator to handle an arbitrary number of pks in the set comprehension
stream_pks = {tuple(m.get('data', {}).get(pk) for pk in primary_keys)
for m in recs['messages']
if m['action'] == 'upsert'}

# fail the test if any upserts fail to return 'data' or a pk value
for pk in stream_pks:
for i in range(len(pk)):
self.assertIsNotNone(pk[i])

pk_count_by_stream[strm] = len(stream_pks)

return pk_count_by_stream

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.start_date = self.get_properties().get("start_date")
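
A minimal sketch of how the new helper behaves, using a made-up stream name and pk values (none of these values come from the test suite): a record emitted twice around the full-table/log-based boundary is counted only once, because only distinct primary-key tuples are kept.

# hypothetical input: one stream whose pk=1 row was emitted twice (TDL-24162 style duplicate)
recs_by_stream = {
    'dbo_example_stream': {
        'messages': [
            {'action': 'activate_version'},
            {'action': 'upsert', 'data': {'pk': 1, 'value': 'a'}},
            {'action': 'upsert', 'data': {'pk': 1, 'value': 'a'}},  # duplicate upsert
            {'action': 'upsert', 'data': {'pk': 2, 'value': 'b'}},
        ]
    }
}
# assuming expected_primary_keys_by_stream_id() maps 'dbo_example_stream' to {'pk'},
# self.unique_pk_count_by_stream(recs_by_stream) returns {'dbo_example_stream': 2}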
90 changes: 41 additions & 49 deletions tests/test_log_based_interruped_replication.py
@@ -216,11 +216,8 @@ def test_run(self):
# verify records match on the first sync
records_by_stream = runner.get_records_from_target_output()

record_count_by_stream = runner.examine_target_output_file(
self, conn_id, self.expected_sync_streams(), self.expected_primary_keys_by_sync_stream_id())

# BUG : TDL-19583 log_based_interruptible_dbo_int_and_bool_data is replicating the last row twice
#self.assertEqual(record_count_by_stream, self.expected_count())
pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream)
self.assertEqual(pk_count_by_stream, self.expected_count())

table_version = dict()
initial_log_version = dict()
@@ -229,45 +226,34 @@
stream_expected_data = self.expected_metadata()[stream]
table_version[stream] = records_by_stream[stream]['table_version']

# BUG: TDL-19583 - 3 activate_version messages
# verify on the first sync you get
# activate version message before and after all data for the full table
# and before the logical replication part

# self.assertEqual(
# records_by_stream[stream]['messages'][0]['action'],
# 'activate_version')
# self.assertEqual(
# records_by_stream[stream]['messages'][-1]['action'],
# 'activate_version')
# self.assertEqual(
# records_by_stream[stream]['messages'][-2]['action'],
# 'activate_version')

self.assertEqual(
len([m for m in records_by_stream[stream]['messages'][1:] if m["action"] == "activate_version"]),
2,
msg="Expect 2 more activate version messages for end of full table and beginning of log based")
# gather all actions, then verify 3 activate_version messages: 1 at the start, 2 within the last 3
actions = [rec['action'] for rec in records_by_stream[stream]['messages']]
self.assertEqual(actions[0], 'activate_version')
self.assertEqual(len([a for a in actions[-3:] if a == "activate_version"]), 2,
msg=("Expected 2 of the last 3 messages to be activate version messages. 1 for "
"end of full table and 1 for beginning of log based. Position can vary "
"due to TDL-24162")
)
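
For illustration, a hypothetical action sequence that would satisfy these assertions (the upsert counts are made up) could look like the following, with the last upsert allowed to land between the final two activate_version messages:

actions = ['activate_version',   # start of the full table sync
           'upsert', 'upsert',
           'activate_version',   # end of the full table sync
           'upsert',             # may precede or follow the last activate_version (TDL-24162)
           'activate_version']   # beginning of log based replication
assert actions[0] == 'activate_version'
assert len([a for a in actions[-3:] if a == 'activate_version']) == 2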

# verify state and bookmarks
initial_state = menagerie.get_state(conn_id)
bookmark = initial_state['bookmarks'][stream]

self.assertIsNone(initial_state.get('currently_syncing'), msg="expected state's currently_syncing to be None")
self.assertIsNotNone(
bookmark.get('current_log_version'),
msg="expected bookmark to have current_log_version because we are using log replication")
self.assertTrue(bookmark['initial_full_table_complete'], msg="expected full table to be complete")
self.assertIsNone(initial_state.get('currently_syncing'),
msg="expected state's currently_syncing to be None")
self.assertIsNotNone(bookmark.get('current_log_version'),
msg="expected bookmark to have current_log_version due to log replication")
self.assertTrue(bookmark['initial_full_table_complete'],
msg="expected full table to be complete")
inital_log_version = bookmark['current_log_version']

self.assertEqual(bookmark['version'], table_version[stream],
msg="expected bookmark for stream to match version")

expected_schemas = self.expected_metadata()[stream]['schema']
self.assertEqual(records_by_stream[stream]['schema'],
expected_schemas,
msg="expected: {} != actual: {}".format(expected_schemas,
records_by_stream[stream]['schema']))
self.assertEqual(records_by_stream[stream]['schema'], expected_schemas,
msg="expected: {} != actual: {}".format(
expected_schemas, records_by_stream[stream]['schema']))
initial_log_version[stream] = bookmark['current_log_version']

initial_log_version_value = set(initial_log_version.values()).pop()
@@ -281,14 +267,18 @@
# --> A table which is interrupted

del interrupted_state['bookmarks']['log_based_interruptible_dbo_int_data']['version']
interrupted_state['bookmarks']['log_based_interruptible_dbo_int_data']['initial_full_table_complete'] = False
interrupted_state['bookmarks']['log_based_interruptible_dbo_int_data'][
'initial_full_table_complete'] = False

max_pk_values = {'max_pk_values': {'pk': 12}}
last_pk_fetched = {'last_pk_fetched': {'pk': 10}}

interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update(max_pk_values)
interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update(last_pk_fetched)
interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data']['initial_full_table_complete'] = False
interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update(
max_pk_values)
interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update(
last_pk_fetched)
interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'][
'initial_full_table_complete'] = False
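
With these updates, the bookmark handed to menagerie.set_state below simulates a full table sync that stopped partway through. A rough sketch of the relevant keys for log_based_interruptible_dbo_int_and_bool_data (the pk values come from the code above, the log version is a placeholder):

# hypothetical shape of the simulated interrupted bookmark
simulated_bookmark = {
    'current_log_version': 25000000,       # placeholder, carried over from the first sync
    'initial_full_table_complete': False,  # forces the resumed sync to finish the full table
    'max_pk_values': {'pk': 12},           # upper bound captured before the "interruption"
    'last_pk_fetched': {'pk': 10},         # resume point within the full table sync
}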

menagerie.set_state(conn_id, interrupted_state)

@@ -310,20 +300,24 @@
query_list.extend(insert(database_name, schema_name, table_name,
int_after_values))


mssql_cursor_context_manager(*query_list)

# add new table's pk to expected_metadata
self.EXPECTED_METADATA['log_based_interruptible_dbo_int_data_after'] = {
self.PRIMARY_KEYS: {'pk'}}

# invoke the sync job AGAIN following various manipulations to the data

# add the newly created stream to the expectations
expected_sync_streams = self.expected_sync_streams()
expected_sync_streams.add('log_based_interruptible_dbo_int_data_after')
expected_primary_keys_by_sync_stream_id = self.expected_primary_keys_by_sync_stream_id()
expected_primary_keys_by_sync_stream_id['log_based_interruptible_dbo_int_data_after'] = {'pk'}
expected_primary_keys_by_sync_stream_id[
'log_based_interruptible_dbo_int_data_after'] = {'pk'}
expected_count = self.expected_count()
expected_count['log_based_interruptible_dbo_int_data_after'] = 6
expected_count['log_based_interruptible_dbo_int_and_bool_data'] = 3
expected_count['log_based_interruptible_dbo_int_data'] = 0
expected_count['log_based_interruptible_dbo_int_and_bool_data'] = 2
expected_count['log_based_interruptible_dbo_int_data'] = 14
Comment on lines +319 to +320

I don't see what changed to make this expectation change. What am I missing?


# run in check mode
check_job_name = runner.run_check_mode(self, conn_id)
@@ -345,15 +339,14 @@

records_by_stream = runner.get_records_from_target_output()

record_count_by_stream = runner.examine_target_output_file(
self, conn_id, expected_sync_streams, expected_primary_keys_by_sync_stream_id)
pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream)

second_state = menagerie.get_state(conn_id)
bookmark_2 = second_state['bookmarks']

# validate the record count for all the streams after interruption recovery
# BUG: TDL-19583 Duplicate record within a sync with 3 activate_version messages
#self.assertEqual(record_count_by_stream, expected_count)
# validate the record count for all the streams after interruption recovery, using unique
# pks instead of all upserts to de-dupe and avoid the inconsistency from TDL-24162
self.assertEqual(pk_count_by_stream, expected_count)

second_log_version = dict()
for stream in expected_sync_streams:
@@ -409,14 +402,13 @@ def test_run(self):

records_by_stream = runner.get_records_from_target_output()

record_count_by_stream = runner.examine_target_output_file(
self, conn_id, expected_sync_streams, expected_primary_keys_by_sync_stream_id)
pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream)

expected_count['log_based_interruptible_dbo_int_data_after'] = 3
expected_count['log_based_interruptible_dbo_int_and_bool_data'] = 0
expected_count['log_based_interruptible_dbo_int_data'] = 0

self.assertEqual(record_count_by_stream, expected_count)
self.assertEqual(pk_count_by_stream, expected_count)

final_state = menagerie.get_state(conn_id)
bookmark_3 = final_state['bookmarks']