-
Notifications
You must be signed in to change notification settings - Fork 14
TDL-24162 Log based inclusivity updates #90
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 5 commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
af0b0fb
Uncomment all record count assertions, fix with pk count where needed…
bhtowles fb2bcbc
First round review comments (set comprehension, get() fallback) and s…
bhtowles 4cb991f
Delete commented out test_sync_full.py
bhtowles ab98f5d
Review comments 2, make pk count method generic and update to use tup…
bhtowles 844329b
Update log based int test to add new table pk to expected metadata
bhtowles 6100490
Review comments 3, fail test if upsert format or value is incorrect
bhtowles 7b71ef5
Fix typo / bug for pk tuple iteration
bhtowles 0674d7b
update comment
bhtowles File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -216,11 +216,8 @@ def test_run(self): | |
# verify records match on the first sync | ||
records_by_stream = runner.get_records_from_target_output() | ||
|
||
record_count_by_stream = runner.examine_target_output_file( | ||
self, conn_id, self.expected_sync_streams(), self.expected_primary_keys_by_sync_stream_id()) | ||
|
||
# BUG : TDL-19583 log_based_interruptible_dbo_int_and_bool_data is replicating the last row twice | ||
#self.assertEqual(record_count_by_stream, self.expected_count()) | ||
pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream) | ||
self.assertEqual(pk_count_by_stream, self.expected_count()) | ||
|
||
table_version = dict() | ||
initial_log_version = dict() | ||
|
@@ -229,45 +226,34 @@ def test_run(self): | |
stream_expected_data = self.expected_metadata()[stream] | ||
table_version[stream] = records_by_stream[stream]['table_version'] | ||
|
||
# BUG: TDL-19583 - 3 activate_version messages | ||
# verify on the first sync you get | ||
# activate version message before and after all data for the full table | ||
# and before the logical replication part | ||
|
||
# self.assertEqual( | ||
# records_by_stream[stream]['messages'][0]['action'], | ||
# 'activate_version') | ||
# self.assertEqual( | ||
# records_by_stream[stream]['messages'][-1]['action'], | ||
# 'activate_version') | ||
# self.assertEqual( | ||
# records_by_stream[stream]['messages'][-2]['action'], | ||
# 'activate_version') | ||
|
||
self.assertEqual( | ||
len([m for m in records_by_stream[stream]['messages'][1:] if m["action"] == "activate_version"]), | ||
2, | ||
msg="Expect 2 more activate version messages for end of full table and beginning of log based") | ||
# gather all actions then verify 3 activate versions, 1 at start, 2 in the last 3 | ||
actions = [rec['action'] for rec in records_by_stream[stream]['messages']] | ||
self.assertEqual(actions[0], 'activate_version') | ||
self.assertEqual(len([a for a in actions[-3:] if a == "activate_version"]), 2, | ||
msg=("Expected 2 of the last 3 messages to be activate version messages. 1 for " | ||
"end of full table and 1 for beginning of log based. Position can vary " | ||
"due to TDL-24162") | ||
) | ||
|
||
# verify state and bookmarks | ||
initial_state = menagerie.get_state(conn_id) | ||
bookmark = initial_state['bookmarks'][stream] | ||
|
||
self.assertIsNone(initial_state.get('currently_syncing'), msg="expected state's currently_syncing to be None") | ||
self.assertIsNotNone( | ||
bookmark.get('current_log_version'), | ||
msg="expected bookmark to have current_log_version because we are using log replication") | ||
self.assertTrue(bookmark['initial_full_table_complete'], msg="expected full table to be complete") | ||
self.assertIsNone(initial_state.get('currently_syncing'), | ||
msg="expected state's currently_syncing to be None") | ||
self.assertIsNotNone(bookmark.get('current_log_version'), | ||
msg="expected bookmark to have current_log_version due to log replication") | ||
self.assertTrue(bookmark['initial_full_table_complete'], | ||
msg="expected full table to be complete") | ||
inital_log_version = bookmark['current_log_version'] | ||
|
||
self.assertEqual(bookmark['version'], table_version[stream], | ||
msg="expected bookmark for stream to match version") | ||
|
||
expected_schemas = self.expected_metadata()[stream]['schema'] | ||
self.assertEqual(records_by_stream[stream]['schema'], | ||
expected_schemas, | ||
msg="expected: {} != actual: {}".format(expected_schemas, | ||
records_by_stream[stream]['schema'])) | ||
self.assertEqual(records_by_stream[stream]['schema'], expected_schemas, | ||
msg="expected: {} != actual: {}".format( | ||
expected_schemas, records_by_stream[stream]['schema'])) | ||
initial_log_version[stream] = bookmark['current_log_version'] | ||
|
||
initial_log_version_value = set(initial_log_version.values()).pop() | ||
|
@@ -281,14 +267,18 @@ def test_run(self): | |
# --> A table which is interrupted | ||
|
||
del interrupted_state['bookmarks']['log_based_interruptible_dbo_int_data']['version'] | ||
interrupted_state['bookmarks']['log_based_interruptible_dbo_int_data']['initial_full_table_complete'] = False | ||
interrupted_state['bookmarks']['log_based_interruptible_dbo_int_data'][ | ||
'initial_full_table_complete'] = False | ||
|
||
max_pk_values = {'max_pk_values': {'pk': 12}} | ||
last_pk_fetched = {'last_pk_fetched': {'pk': 10}} | ||
|
||
interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update(max_pk_values) | ||
interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update(last_pk_fetched) | ||
interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data']['initial_full_table_complete'] = False | ||
interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update( | ||
max_pk_values) | ||
interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'].update( | ||
last_pk_fetched) | ||
interrupted_state['bookmarks']['log_based_interruptible_dbo_int_and_bool_data'][ | ||
'initial_full_table_complete'] = False | ||
|
||
menagerie.set_state(conn_id, interrupted_state) | ||
|
||
|
@@ -310,20 +300,24 @@ def test_run(self): | |
query_list.extend(insert(database_name, schema_name, table_name, | ||
int_after_values)) | ||
|
||
|
||
mssql_cursor_context_manager(*query_list) | ||
|
||
# add new table's pk to expected_metadata | ||
self.EXPECTED_METADATA['log_based_interruptible_dbo_int_data_after'] = { | ||
self.PRIMARY_KEYS: {'pk'}} | ||
|
||
# invoke the sync job AGAIN following various manipulations to the data | ||
|
||
# add the newly created stream in the expectations | ||
expected_sync_streams = self.expected_sync_streams() | ||
expected_sync_streams.add('log_based_interruptible_dbo_int_data_after') | ||
expected_primary_keys_by_sync_stream_id = self.expected_primary_keys_by_sync_stream_id() | ||
expected_primary_keys_by_sync_stream_id['log_based_interruptible_dbo_int_data_after'] = {'pk'} | ||
expected_primary_keys_by_sync_stream_id[ | ||
'log_based_interruptible_dbo_int_data_after'] = {'pk'} | ||
expected_count = self.expected_count() | ||
expected_count['log_based_interruptible_dbo_int_data_after'] = 6 | ||
expected_count['log_based_interruptible_dbo_int_and_bool_data'] = 3 | ||
expected_count['log_based_interruptible_dbo_int_data'] = 0 | ||
expected_count['log_based_interruptible_dbo_int_and_bool_data'] = 2 | ||
expected_count['log_based_interruptible_dbo_int_data'] = 14 | ||
Comment on lines
+319
to
+320
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't see what changed to make this expectation change. What am I missing? |
||
|
||
# run in check mode | ||
check_job_name = runner.run_check_mode(self, conn_id) | ||
|
@@ -345,15 +339,14 @@ def test_run(self): | |
|
||
records_by_stream = runner.get_records_from_target_output() | ||
|
||
record_count_by_stream = runner.examine_target_output_file( | ||
self, conn_id, expected_sync_streams, expected_primary_keys_by_sync_stream_id) | ||
pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream) | ||
|
||
second_state = menagerie.get_state(conn_id) | ||
bookmark_2 = second_state['bookmarks'] | ||
|
||
# validate the record count for all the streams after interruption recovery | ||
# BUG: TDL-19583 Duplicate record within a sync with 3 activate_version messages | ||
#self.assertEqual(record_count_by_stream, expected_count) | ||
# validate the record count for all the streams after interruption recovery, use unique | ||
# pks instead of all upserts to de-dupe and avoid inconsistency from TDL-24162 | ||
self.assertEqual(pk_count_by_stream, expected_count) | ||
|
||
second_log_version = dict() | ||
for stream in expected_sync_streams: | ||
|
@@ -409,14 +402,13 @@ def test_run(self): | |
|
||
records_by_stream = runner.get_records_from_target_output() | ||
|
||
record_count_by_stream = runner.examine_target_output_file( | ||
self, conn_id, expected_sync_streams, expected_primary_keys_by_sync_stream_id) | ||
pk_count_by_stream = self.unique_pk_count_by_stream(records_by_stream) | ||
|
||
expected_count['log_based_interruptible_dbo_int_data_after'] = 3 | ||
expected_count['log_based_interruptible_dbo_int_and_bool_data'] = 0 | ||
expected_count['log_based_interruptible_dbo_int_data'] = 0 | ||
|
||
self.assertEqual(record_count_by_stream, expected_count) | ||
self.assertEqual(pk_count_by_stream, expected_count) | ||
|
||
final_state = menagerie.get_state(conn_id) | ||
bookmark_3 = final_state['bookmarks'] | ||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar test code to before
Results:
`set().difference()` doesn't modify the set it's called on, so the `# remove any failed get() entries from the set to correct pk count` comment is inaccurate anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea was to filter out any bad upserts; we carded out an upstream verification, but I do like just failing the test if we find a bad upsert here. Commit incoming.