TDL-24162 Log based inclusivity updates #90

Merged: 8 commits, Oct 16, 2023
12 changes: 12 additions & 0 deletions tests/base.py
@@ -210,6 +210,18 @@ def select_all_streams_and_fields(conn_id, catalogs, select_all_fields: bool = T
connections.select_catalog_and_fields_via_metadata(
conn_id, catalog, schema, additional_md, non_selected_properties)

def unique_pk_count_by_stream(self, recs_by_stream):
"""
Switch from upsert record count verification to unique pk count verification due to
tap-mssql inconsistency with log based inclusivity TDL-24162 (that will not be fixed)
"""
pk_count_by_stream = {}
for strm in recs_by_stream:
stream_pks = [m.get('data').get('pk') for m in recs_by_stream[strm]['messages']
if m['action'] == 'upsert']
pk_count_by_stream[strm] = len(set(stream_pks))
return pk_count_by_stream

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.start_date = self.get_properties().get("start_date")
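
A minimal sketch of how this helper behaves, assuming the singer-style message shape used throughout these tests (the stream name and values below are illustrative, not taken from the PR):

    # illustrative input: one stream with a duplicated upsert, as tap-mssql can
    # emit at the log-based boundary (TDL-24162)
    recs_by_stream = {
        'dbo_int_data': {
            'messages': [
                {'action': 'activate_version'},
                {'action': 'upsert', 'data': {'pk': 1}},
                {'action': 'upsert', 'data': {'pk': 2}},
                {'action': 'upsert', 'data': {'pk': 2}},
            ]
        }
    }
    # the raw upsert count is 3, but unique_pk_count_by_stream(recs_by_stream)
    # returns {'dbo_int_data': 2}, which stays stable whether or not the
    # boundary row is duplicated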
82 changes: 36 additions & 46 deletions tests/test_log_based_interruped_replication.py
@@ -216,11 +216,8 @@ def test_run(self):
# verify records match on the first sync
records_by_stream = runner.get_records_from_target_output()

- record_count_by_stream = runner.examine_target_output_file(
-     self, conn_id, self.expected_sync_streams(), self.expected_primary_keys_by_sync_stream_id())

- # BUG : TDL-19583 log_based_interruptible_dbo_int_and_bool_data is replicating the last row twice
- #self.assertEqual(record_count_by_stream, self.expected_count())
+ pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream)
+ self.assertEqual(pk_count_by_stream, self.expected_count())

table_version = dict()
initial_log_version = dict()
@@ -229,45 +226,36 @@
stream_expected_data = self.expected_metadata()[stream]
table_version[stream] = records_by_stream[stream]['table_version']

- # BUG: TDL-19583 - 3 activate_version messages
- # verify on the first sync you get
- # activate version message before and after all data for the full table
- # and before the logical replication part

- # self.assertEqual(
- #     records_by_stream[stream]['messages'][0]['action'],
- #     'activate_version')
- # self.assertEqual(
- #     records_by_stream[stream]['messages'][-1]['action'],
- #     'activate_version')
- # self.assertEqual(
- #     records_by_stream[stream]['messages'][-2]['action'],
- #     'activate_version')

+ # gather all actions then verify 3 activate versions, 1 at start, 2 in the last 3
+ actions = [rec['action'] for rec in records_by_stream[stream]['messages']]
+ self.assertEqual(actions[0], 'activate_version')
  self.assertEqual(
-     len([m for m in records_by_stream[stream]['messages'][1:] if m["action"] == "activate_version"]),
-     2,
-     msg="Expect 2 more activate version messages for end of full table and beginning of log based")
+     len([a for a in actions[-3:] if a == "activate_version"]), 2,
+     msg=("Expected 2 of the last 3 messages to be activate version messages. 1 for "
+          "end of full table and 1 for beginning of log based. Position can vary "
+          "due to TDL-24162")
+ )

# verify state and bookmarks
initial_state = menagerie.get_state(conn_id)
bookmark = initial_state['bookmarks'][stream]

- self.assertIsNone(initial_state.get('currently_syncing'), msg="expected state's currently_syncing to be None")
+ self.assertIsNone(initial_state.get('currently_syncing'),
+                   msg="expected state's currently_syncing to be None")
  self.assertIsNotNone(
      bookmark.get('current_log_version'),
-     msg="expected bookmark to have current_log_version because we are using log replication")
+     msg="expected bookmark to have current_log_version due to log replication")
- self.assertTrue(bookmark['initial_full_table_complete'], msg="expected full table to be complete")
+ self.assertTrue(bookmark['initial_full_table_complete'],
+                 msg="expected full table to be complete")
inital_log_version = bookmark['current_log_version']

self.assertEqual(bookmark['version'], table_version[stream],
msg="expected bookmark for stream to match version")

  expected_schemas = self.expected_metadata()[stream]['schema']
- self.assertEqual(records_by_stream[stream]['schema'],
-                  expected_schemas,
-                  msg="expected: {} != actual: {}".format(expected_schemas,
-                                                          records_by_stream[stream]['schema']))
+ self.assertEqual(records_by_stream[stream]['schema'], expected_schemas,
+                  msg="expected: {} != actual: {}".format(
+                      expected_schemas, records_by_stream[stream]['schema']))
initial_log_version[stream] = bookmark['current_log_version']

initial_log_version_value = set(initial_log_version.values()).pop()
@@ -281,14 +269,18 @@
# --> A table which is interrupted

  del interrupted_state['bookmarks']['log_based_interruptible_dbo_int_data']['version']
- interrupted_state['bookmarks']['log_based_interruptible_dbo_int_data']['initial_full_table_complete'] = False
+ interrupted_state['bookmarks']['log_based_interruptible_dbo_int_data'][
+     'initial_full_table_complete'] = False

  max_pk_values = {'max_pk_values': {'pk': 12}}
  last_pk_fetched = {'last_pk_fetched': {'pk': 10}}

- interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update(max_pk_values)
- interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update(last_pk_fetched)
- interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data']['initial_full_table_complete'] = False
+ interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update(
+     max_pk_values)
+ interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update(
+     last_pk_fetched)
+ interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'][
+     'initial_full_table_complete'] = False

menagerie.set_state(conn_id, interrupted_state)
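
For orientation, the manipulated bookmark for the interrupted table ends up shaped roughly as below; this is a sketch inferred from the assignments above and the usual singer full-table bookmark semantics, not an exact dump of tap-mssql state:

    # approximate shape of the simulated interrupted bookmark (illustrative values)
    interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'] = {
        'version': 1696000000000,              # table version kept from the first sync
        'current_log_version': 25,             # log version kept from the first sync
        'initial_full_table_complete': False,  # forces the full-table phase to resume
        'max_pk_values': {'pk': 12},           # recorded upper bound for the full-table scan
        'last_pk_fetched': {'pk': 10},         # resume point: rows after pk 10 are re-fetched
    }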

@@ -310,7 +302,6 @@
query_list.extend(insert(database_name, schema_name, table_name,
int_after_values))


mssql_cursor_context_manager(*query_list)

# invoke the sync job AGAIN following various manipulations to the data
@@ -319,11 +310,12 @@
expected_sync_streams = self.expected_sync_streams()
expected_sync_streams.add('log_based_interruptible_dbo_int_data_after')
  expected_primary_keys_by_sync_stream_id = self.expected_primary_keys_by_sync_stream_id()
- expected_primary_keys_by_sync_stream_id['log_based_interruptible_dbo_int_data_after'] = {'pk'}
+ expected_primary_keys_by_sync_stream_id[
+     'log_based_interruptible_dbo_int_data_after'] = {'pk'}
  expected_count = self.expected_count()
  expected_count['log_based_interruptible_dbo_int_data_after'] = 6
- expected_count['log_based_interruptible_dbo_int_and_bool_data'] = 3
- expected_count['log_based_interruptible_dbo_int_data'] = 0
+ expected_count['log_based_interruptible_dbo_int_and_bool_data'] = 2
+ expected_count['log_based_interruptible_dbo_int_data'] = 14
Comment on lines +319 to +320:

I don't see what changed to make this expectation change. What am I missing?


# run in check mode
check_job_name = runner.run_check_mode(self, conn_id)
@@ -345,15 +337,14 @@

records_by_stream = runner.get_records_from_target_output()

- record_count_by_stream = runner.examine_target_output_file(
-     self, conn_id, expected_sync_streams, expected_primary_keys_by_sync_stream_id)
+ pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream)

  second_state = menagerie.get_state(conn_id)
  bookmark_2 = second_state['bookmarks']

- # validate the record count for all the streams after interruption recovery
- # BUG: TDL-19583 Duplicate record within a sync with 3 activate_version messages
- #self.assertEqual(record_count_by_stream, expected_count)
+ # validate the record count for all the streams after interruption recovery, use unique
+ # pks instead of all upserts to de-dupe and avoid inconsistency from TDL-24162
+ self.assertEqual(pk_count_by_stream, expected_count)

second_log_version = dict()
for stream in expected_sync_streams:
@@ -409,14 +400,13 @@

records_by_stream = runner.get_records_from_target_output()

- record_count_by_stream = runner.examine_target_output_file(
-     self, conn_id, expected_sync_streams, expected_primary_keys_by_sync_stream_id)
+ pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream)

  expected_count['log_based_interruptible_dbo_int_data_after'] = 3
  expected_count['log_based_interruptible_dbo_int_and_bool_data'] = 0
  expected_count['log_based_interruptible_dbo_int_data'] = 0

- self.assertEqual(record_count_by_stream, expected_count)
+ self.assertEqual(pk_count_by_stream, expected_count)

final_state = menagerie.get_state(conn_id)
bookmark_3 = final_state['bookmarks']
9 changes: 5 additions & 4 deletions tests/test_sync_logical_datetime.py
@@ -215,13 +215,14 @@ def test_run(self):
# run a sync and verify exit codes
record_count_by_stream = self.run_sync(conn_id, clear_state=True)

- # verify record counts of streams
- expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()}
- # self.assertEqual(record_count_by_stream, expected_count)

  # verify records match on the first sync
  records_by_stream = runner.get_records_from_target_output()

+ # verify record counts of streams
+ expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()}
+ pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream)
+ self.assertEqual(pk_count_by_stream, expected_count)

table_version = dict()
for stream in self.expected_streams():
with self.subTest(stream=stream):
9 changes: 5 additions & 4 deletions tests/test_sync_logical_float.py
@@ -139,13 +139,14 @@ def test_run(self):
# run a sync and verify exit codes
record_count_by_stream = self.run_sync(conn_id, clear_state=True)

- # verify record counts of streams
- expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()}
- # self.assertEqual(record_count_by_stream, expected_count)

  # verify records match on the first sync
  records_by_stream = runner.get_records_from_target_output()

+ # verify record counts of streams
+ expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()}
+ pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream)
+ self.assertEqual(pk_count_by_stream, expected_count)

table_version = dict()
for stream in self.expected_streams():
with self.subTest(stream=stream):
9 changes: 5 additions & 4 deletions tests/test_sync_logical_multiple_dbs.py
@@ -318,13 +318,14 @@ def test_run(self):
# run a sync and verify exit codes
record_count_by_stream = self.run_sync(conn_id, clear_state=True)

- # verify record counts of streams
- expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()}
- # self.assertEqual(record_count_by_stream, expected_count)

  # verify records match on the first sync
  records_by_stream = runner.get_records_from_target_output()

+ # verify record counts of streams
+ expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()}
+ pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream)
+ self.assertEqual(pk_count_by_stream, expected_count)

table_version = dict()
for stream in self.expected_streams():
with self.subTest(stream=stream):
9 changes: 5 additions & 4 deletions tests/test_sync_logical_names.py
@@ -303,13 +303,14 @@ def test_run(self):
# run a sync and verify exit codes
record_count_by_stream = self.run_sync(conn_id, clear_state=True)

- # verify record counts of streams
- expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()}
- # self.assertEqual(record_count_by_stream, expected_count)

  # verify records match on the first sync
  records_by_stream = runner.get_records_from_target_output()

+ # verify record counts of streams
+ expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()}
+ pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream)
+ self.assertEqual(pk_count_by_stream, expected_count)

table_version = dict()
for stream in self.expected_streams():
with self.subTest(stream=stream):
9 changes: 5 additions & 4 deletions tests/test_sync_logical_others.py
@@ -275,13 +275,14 @@ def test_run(self):
# run a sync and verify exit codes
record_count_by_stream = self.run_sync(conn_id, clear_state=True)

- # verify record counts of streams
- expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()}
- # self.assertEqual(record_count_by_stream, expected_count)

  # verify records match on the first sync
  records_by_stream = runner.get_records_from_target_output()

+ # verify record counts of streams
+ expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()}
+ pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream)
+ self.assertEqual(pk_count_by_stream, expected_count)

table_version = dict()
for stream in self.expected_streams():
with self.subTest(stream=stream):
56 changes: 52 additions & 4 deletions tests/test_sync_logical_pks.py
@@ -497,6 +497,53 @@ def setUpClass(cls) -> None:

cls.expected_metadata = cls.discovery_expected_metadata

def expected_primary_keys_by_sync_stream_id(self):
return {'constraints_database_dbo_check_constraint': {'pk'},
'constraints_database_dbo_default_column': {'pk'},
'constraints_database_dbo_even_identity': {'pk'},
'constraints_database_dbo_multiple_column_pk': {'first_name', 'last_name'},
'constraints_database_dbo_pk_with_fk': {'pk'},
'constraints_database_dbo_pk_with_unique_not_null': {'pk'},
'constraints_database_dbo_single_column_pk': {'pk'}}

def unique_pk_count_by_stream(self, recs_by_stream):
"""
Switch from upsert record count verification to unique pk count verification due to
tap-mssql inconsistency with log based inclusivity TDL-24162 (that will not be fixed)
"""
pk_count_by_stream = {}
for strm in recs_by_stream:
primary_key = self.expected_primary_keys_by_stream_id()[strm]

if strm == 'constraints_database_dbo_multiple_column_pk':
pk1, pk2 = [pk for pk in primary_key]
stream_pks = [{m.get('data').get(pk1), m.get('data').get(pk2)}
for m in recs_by_stream[strm]['messages']
if m['action'] == 'upsert']

# find and ignore any duplicate pk sets in pk list. cannot simply convert the list
# to a set as a type error will occur (list of sets is unhashable)
dupe_count = 0
for key in stream_pks:
key_count = 0
for matching_key in stream_pks:
if key == matching_key:
key_count += 1
if key_count > 1:
dupe_count += (key_count - 1)

pk_count_by_stream[strm] = len(stream_pks) - dupe_count

else:
pk = primary_key[0]
stream_pks = [m.get('data').get(pk) for m in recs_by_stream[strm]['messages']
if m['action'] == 'upsert']

pk_count_by_stream[strm] = len(set(stream_pks))

return pk_count_by_stream
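
A side note on the composite-pk branch: the nested counting loop exists because a list of sets cannot be collapsed with set() (sets are unhashable). A hypothetical equivalent, not part of this PR, builds hashable frozensets instead, so the same order-insensitive de-dupe reduces to len(set(...)):

    # hypothetical alternative to the nested dupe-counting loop above
    stream_pks = [frozenset((m.get('data').get(pk1), m.get('data').get(pk2)))
                  for m in recs_by_stream[strm]['messages']
                  if m['action'] == 'upsert']
    pk_count_by_stream[strm] = len(set(stream_pks))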


def test_run(self):
"""
Verify that a full sync can capture all data and send it in the correct format
@@ -524,13 +571,14 @@ def test_run(self):
# run a sync and verify exit codes
record_count_by_stream = self.run_sync(conn_id, clear_state=True)

- # verify record counts of streams
- expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()}
- # self.assertEqual(record_count_by_stream, expected_count)

  # verify records match on the first sync
  records_by_stream = runner.get_records_from_target_output()

+ # verify record counts of streams
+ expected_count = {k: len(v['values']) for k, v in self.expected_metadata().items()}
+ pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream)
+ self.assertEqual(pk_count_by_stream, expected_count)

table_version = dict()
for stream in self.expected_streams():
with self.subTest(stream=stream):