diff --git a/inline_verifier.go b/inline_verifier.go
index 2f47f293..be2c8422 100644
--- a/inline_verifier.go
+++ b/inline_verifier.go
@@ -3,6 +3,8 @@ package ghostferry
 import (
 	"bytes"
 	"context"
+	"crypto/md5"
+	"encoding/hex"
 	"errors"
 	"fmt"
 	"sort"
@@ -233,11 +235,23 @@ func (s *BinlogVerifyStore) Serialize() BinlogVerifySerializedStore {
 	return s.store.Copy()
 }
 
+type mismatchType string
+
+const (
+	MismatchColumnMissingOnSource mismatchType = "column missing on source"
+	MismatchColumnMissingOnTarget mismatchType = "column missing on target"
+	MismatchRowMissingOnSource mismatchType = "row missing on source"
+	MismatchRowMissingOnTarget mismatchType = "row missing on target"
+	MismatchContentDifference mismatchType = "content difference"
+	MismatchChecksumDifference mismatchType = "rows checksum difference"
+)
+
 type InlineVerifierMismatches struct {
 	Pk uint64
 	SourceChecksum string
 	TargetChecksum string
-	Mismatch mismatch
+	MismatchColumn string
+	MismatchType mismatchType
 }
 
 type InlineVerifier struct {
@@ -452,18 +466,23 @@ func formatMismatches(mismatches map[string]map[string][]InlineVerifierMismatche
 			incorrectTables = append(incorrectTables, tableNameWithSchema)
 
 			messageBuf.WriteString(tableNameWithSchema)
-			messageBuf.WriteString(" [paginationKeys: ")
+			messageBuf.WriteString(" [PKs: ")
 
 			for _, mismatch := range mismatches[schemaName][tableName] {
 				messageBuf.WriteString(strconv.FormatUint(mismatch.Pk, 10))
-				messageBuf.WriteString(" (source: ")
-				messageBuf.WriteString(mismatch.SourceChecksum)
-				messageBuf.WriteString(", target: ")
-				messageBuf.WriteString(mismatch.TargetChecksum)
-				messageBuf.WriteString(", type: ")
-				messageBuf.WriteString(string(mismatch.Mismatch.mismatchType))
-				if mismatch.Mismatch.column != "" {
+				messageBuf.WriteString(" (type: ")
+				messageBuf.WriteString(string(mismatch.MismatchType))
+				if mismatch.SourceChecksum != "" {
+					messageBuf.WriteString(", source: ")
+					messageBuf.WriteString(mismatch.SourceChecksum)
+				}
+				if mismatch.TargetChecksum != "" {
+					messageBuf.WriteString(", target: ")
+					messageBuf.WriteString(mismatch.TargetChecksum)
+				}
+
+				if mismatch.MismatchColumn != "" {
 					messageBuf.WriteString(", column: ")
-					messageBuf.WriteString(mismatch.Mismatch.column)
+					messageBuf.WriteString(mismatch.MismatchColumn)
 				}
 				messageBuf.WriteString(") ")
@@ -592,22 +611,27 @@ func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uin
 
 	for paginationKey, targetHash := range target {
 		sourceHash, exists := source[paginationKey]
-		if !bytes.Equal(sourceHash, targetHash) || !exists {
+		if !exists {
 			mismatchSet[paginationKey] = InlineVerifierMismatches{
 				Pk: paginationKey,
+				MismatchType: MismatchRowMissingOnSource,
+			}
+		} else if !bytes.Equal(sourceHash, targetHash) {
+			mismatchSet[paginationKey] = InlineVerifierMismatches{
+				Pk: paginationKey,
+				MismatchType: MismatchChecksumDifference,
 				SourceChecksum: string(sourceHash),
 				TargetChecksum: string(targetHash),
 			}
 		}
 	}
 
-	for paginationKey, sourceHash := range source {
-		targetHash, exists := target[paginationKey]
-		if !bytes.Equal(sourceHash, targetHash) || !exists {
+	for paginationKey, _ := range source {
+		_, exists := target[paginationKey]
+		if !exists {
 			mismatchSet[paginationKey] = InlineVerifierMismatches{
 				Pk: paginationKey,
-				SourceChecksum: string(sourceHash),
-				TargetChecksum: string(targetHash),
+				MismatchType: MismatchRowMissingOnTarget,
 			}
 		}
 	}
@@ -615,30 +639,17 @@ func (v *InlineVerifier) compareHashes(source, target map[uint64][]byte) map[uin
 	return mismatchSet
 }
 
-type mismatchType string
-
-const (
-	MismatchColumnMissingOnSource mismatchType = "column missing on source"
-	MismatchColumnMissingOnTarget mismatchType = "column missing on target"
-	MismatchRowMissingOnSource mismatchType = "row missing on source"
-	MismatchRowMissingOnTarget mismatchType = "row missing on target"
-	MismatchContentDifference mismatchType = "content difference"
-)
-
-type mismatch struct {
-	column string
-	mismatchType mismatchType
-}
-
-func compareDecompressedData(source, target map[uint64]map[string][]byte) map[uint64]mismatch {
-	mismatchSet := map[uint64]mismatch{}
+func compareDecompressedData(source, target map[uint64]map[string][]byte) map[uint64]InlineVerifierMismatches {
+	mismatchSet := map[uint64]InlineVerifierMismatches{}
 
 	for paginationKey, targetDecompressedColumns := range target {
 		sourceDecompressedColumns, exists := source[paginationKey]
 		if !exists { // row missing on source
-			mismatchSet[paginationKey] = mismatch{
-				mismatchType: MismatchRowMissingOnSource,
+			mismatchSet[paginationKey] = InlineVerifierMismatches{
+				Pk: paginationKey,
+				MismatchType: MismatchRowMissingOnSource,
 			}
 			continue
 		}
@@ -646,15 +657,22 @@ func compareDecompressedData(source, target map[uint64]map[string][]byte) map[ui
 		for colName, targetData := range targetDecompressedColumns {
 			sourceData, exists := sourceDecompressedColumns[colName]
 			if !exists {
-				mismatchSet[paginationKey] = mismatch{
-					column: colName,
-					mismatchType: MismatchColumnMissingOnSource,
+				mismatchSet[paginationKey] = InlineVerifierMismatches{
+					Pk: paginationKey,
+					MismatchType: MismatchColumnMissingOnSource,
+					MismatchColumn: colName,
 				}
 				break // no need to compare other columns
 			} else if !bytes.Equal(sourceData, targetData) {
-				mismatchSet[paginationKey] = mismatch{
-					column: colName,
-					mismatchType: MismatchContentDifference,
+				sourceChecksum := md5.Sum(sourceData)
+				targetChecksum := md5.Sum(targetData)
+
+				mismatchSet[paginationKey] = InlineVerifierMismatches{
+					Pk: paginationKey,
+					MismatchType: MismatchContentDifference,
+					MismatchColumn: colName,
+					SourceChecksum: hex.EncodeToString(sourceChecksum[:]),
+					TargetChecksum: hex.EncodeToString(targetChecksum[:]),
 				}
 				break // no need to compare other columns
 			}
@@ -665,8 +683,9 @@ func compareDecompressedData(source, target map[uint64]map[string][]byte) map[ui
 		targetDecompressedColumns, exists := target[paginationKey]
 		if !exists { // row missing on target
-			mismatchSet[paginationKey] = mismatch{
-				mismatchType: MismatchRowMissingOnTarget,
+			mismatchSet[paginationKey] = InlineVerifierMismatches{
+				Pk: paginationKey,
+				MismatchType: MismatchRowMissingOnTarget,
 			}
 			continue
 		}
@@ -674,9 +693,10 @@ func compareDecompressedData(source, target map[uint64]map[string][]byte) map[ui
 		for colName := range sourceDecompressedColumns {
 			_, exists := targetDecompressedColumns[colName]
 			if !exists {
-				mismatchSet[paginationKey] = mismatch{
-					column: colName,
-					mismatchType: MismatchColumnMissingOnTarget,
+				mismatchSet[paginationKey] = InlineVerifierMismatches{
+					Pk: paginationKey,
+					MismatchColumn: colName,
+					MismatchType: MismatchColumnMissingOnTarget,
 				}
 			}
 		}
@@ -689,12 +709,7 @@ func (v *InlineVerifier) compareHashesAndData(sourceHashes, targetHashes map[uin
 	mismatches := v.compareHashes(sourceHashes, targetHashes)
 	compressedMismatch := compareDecompressedData(sourceData, targetData)
 	for paginationKey, mismatch := range compressedMismatch {
-		mismatches[paginationKey] = InlineVerifierMismatches{
-			Pk: paginationKey,
-			SourceChecksum: "compressed-data-mismatch", // TODO: compute the hash of the compressed data and put it here
-			TargetChecksum: "compressed-data-mismatch",
-			Mismatch: mismatch,
-		}
+		mismatches[paginationKey] = mismatch
 	}
 
 	mismatchList := make([]InlineVerifierMismatches, 0, len(mismatches))
diff --git a/inline_verifier_test.go b/inline_verifier_test.go
index 44c2b930..61c922f2 100644
--- a/inline_verifier_test.go
+++ b/inline_verifier_test.go
@@ -16,7 +16,7 @@ func TestCompareDecompressedDataNoDifference(t *testing.T) {
 
 	result := compareDecompressedData(source, target)
 
-	assert.Equal(t, map[uint64]mismatch{}, result)
+	assert.Equal(t, map[uint64]InlineVerifierMismatches{}, result)
 }
 
 func TestCompareDecompressedDataContentDifference(t *testing.T) {
@@ -29,7 +29,15 @@ func TestCompareDecompressedDataContentDifference(t *testing.T) {
 
 	result := compareDecompressedData(source, target)
 
-	assert.Equal(t, map[uint64]mismatch{1: {mismatchType: MismatchContentDifference, column: "name"}}, result)
+	assert.Equal(t, map[uint64]InlineVerifierMismatches{
+		1: {
+			Pk: 1,
+			MismatchType: MismatchContentDifference,
+			MismatchColumn: "name",
+			SourceChecksum: "e356a972989f87a1531252cfa2152797",
+			TargetChecksum: "81b8a1b77068d06e1c8190825253066f",
+		},
+	}, result)
 }
 
 func TestCompareDecompressedDataMissingTarget(t *testing.T) {
@@ -40,7 +48,7 @@ func TestCompareDecompressedDataMissingTarget(t *testing.T) {
 
 	result := compareDecompressedData(source, target)
 
-	assert.Equal(t, map[uint64]mismatch{1: {mismatchType: MismatchRowMissingOnTarget}}, result)
+	assert.Equal(t, map[uint64]InlineVerifierMismatches{1: {Pk: 1, MismatchType: MismatchRowMissingOnTarget}}, result)
 }
 
 func TestCompareDecompressedDataMissingSource(t *testing.T) {
@@ -51,7 +59,7 @@ func TestCompareDecompressedDataMissingSource(t *testing.T) {
 
 	result := compareDecompressedData(source, target)
 
-	assert.Equal(t, map[uint64]mismatch{3: {mismatchType: MismatchRowMissingOnSource}}, result)
+	assert.Equal(t, map[uint64]InlineVerifierMismatches{3: {Pk: 3, MismatchType: MismatchRowMissingOnSource}}, result)
 }
 
 func TestFormatMismatch(t *testing.T) {
@@ -60,18 +68,14 @@ func TestFormatMismatch(t *testing.T) {
 			"users": {
 				InlineVerifierMismatches{
 					Pk: 1,
-					SourceChecksum: "",
-					TargetChecksum: "bar",
-					Mismatch: mismatch{
-						mismatchType: MismatchRowMissingOnSource,
-					},
+					MismatchType: MismatchRowMissingOnSource,
 				},
 			},
 		},
 	}
 
 	message, tables := formatMismatches(mismatches)
 
-	assert.Equal(t, string("cutover verification failed for: default.users [paginationKeys: 1 (source: , target: bar, type: row missing on source) ] "), message)
+	assert.Equal(t, string("cutover verification failed for: default.users [PKs: 1 (type: row missing on source) ] "), message)
 	assert.Equal(t, []string{string("default.users")}, tables)
 }
 
@@ -81,47 +85,35 @@ func TestFormatMismatches(t *testing.T) {
 			"users": {
 				InlineVerifierMismatches{
 					Pk: 1,
-					SourceChecksum: "",
-					TargetChecksum: "bar",
-					Mismatch: mismatch{
-						mismatchType: MismatchRowMissingOnSource,
-					},
+					MismatchType: MismatchRowMissingOnSource,
 				},
 				InlineVerifierMismatches{
 					Pk: 5,
-					SourceChecksum: "baz",
-					TargetChecksum: "",
-					Mismatch: mismatch{
-						mismatchType: MismatchRowMissingOnTarget,
-					},
+					MismatchType: MismatchRowMissingOnTarget,
 				},
 			},
 			"posts": {
 				InlineVerifierMismatches{
 					Pk: 9,
+					MismatchType: MismatchContentDifference,
+					MismatchColumn: string("title"),
 					SourceChecksum: "boo",
 					TargetChecksum: "aaa",
-					Mismatch: mismatch{
-						mismatchType: MismatchContentDifference,
-						column: string("title"),
-					},
 				},
 			},
 			"attachments": {
 				InlineVerifierMismatches{
 					Pk: 7,
+					MismatchType: MismatchContentDifference,
+					MismatchColumn: string("name"),
 					SourceChecksum: "boo",
 					TargetChecksum: "aaa",
-					Mismatch: mismatch{
-						mismatchType: MismatchContentDifference,
-						column: string("name"),
-					},
 				},
 			},
 		},
 	}
 	message, tables := formatMismatches(mismatches)
 
-	assert.Equal(t, string("cutover verification failed for: default.attachments [paginationKeys: 7 (source: boo, target: aaa, type: content difference, column: name) ] default.posts [paginationKeys: 9 (source: boo, target: aaa, type: content difference, column: title) ] default.users [paginationKeys: 1 (source: , target: bar, type: row missing on source) 5 (source: baz, target: , type: row missing on target) ] "), message)
+	assert.Equal(t, string("cutover verification failed for: default.attachments [PKs: 7 (type: content difference, source: boo, target: aaa, column: name) ] default.posts [PKs: 9 (type: content difference, source: boo, target: aaa, column: title) ] default.users [PKs: 1 (type: row missing on source) 5 (type: row missing on target) ] "), message)
 	assert.Equal(t, []string{string("default.attachments"), string("default.posts"), string("default.users")}, tables)
 }
diff --git a/test/integration/inline_verifier_test.rb b/test/integration/inline_verifier_test.rb
index 26c3a34d..89cc1029 100644
--- a/test/integration/inline_verifier_test.rb
+++ b/test/integration/inline_verifier_test.rb
@@ -40,7 +40,10 @@ def test_corrupted_insert_is_detected_inline_with_batch_writer
 
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{corrupting_id}")
+
+    expected_message = "cutover verification failed for: gftest.test_table_1 "\
+      "[PKs: #{corrupting_id} (type: rows checksum difference, source: "
+    assert ghostferry.error_lines.last["msg"].start_with?(expected_message)
   end
 
   def test_different_compressed_data_is_detected_inline_with_batch_writer
@@ -68,7 +71,11 @@ def test_different_compressed_data_is_detected_inline_with_batch_writer
 
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
+
+    expected_message = "cutover verification failed for: gftest.test_table_1 "\
+      "[PKs: 1 (type: content difference, source: 389101948d1694a3bbfb904f57ae845c, target: 4594bb26f2f93c5c60328df6c86a0846, column: data) ] "
+
+    assert_equal expected_message, ghostferry.error_lines.last["msg"]
   end
 
   def test_same_decompressed_data_different_compressed_test_passes_inline_verification
@@ -163,7 +170,11 @@ def test_catches_binlog_streamer_corruption
     ghostferry.run
 
     assert verification_ran
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: #{corrupting_id}")
+
+    expected_message = "cutover verification failed for: gftest.test_table_1 "\
+      "[PKs: #{corrupting_id} (type: rows checksum difference, source: ced197ee28c2e73cc737242eb0e8c49c, target: ff030f09c559a197ed440b0eee7950a0) ] "
+
+    assert_equal expected_message, ghostferry.error_lines.last["msg"]
   end
 
   def test_target_corruption_is_ignored_if_skip_target_verification
@@ -399,7 +410,11 @@ def test_catches_binlog_streamer_corruption_with_composite_pk
     ghostferry.run
     assert verification_ran
     assert incorrect_tables_found, "verification did not catch corrupted table"
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: #{corrupting_id}")
+
+    expected_message = "cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} "\
+      "[PKs: #{corrupting_id} (type: rows checksum difference, source: 0cc788986133d5289aba8cd87705d106, target: f4c00525c4daf1388254f1b1024ed35d) ] "
+
+    assert_equal expected_message, ghostferry.error_lines.last["msg"]
   end
 
   def test_positive_negative_zero
@@ -430,7 +445,10 @@ def test_positive_negative_zero
 
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} [paginationKeys: 1")
+
+    expected_message = "cutover verification failed for: #{DEFAULT_DB}.#{DEFAULT_TABLE} "\
+      "[PKs: 1 (type: rows checksum difference, source: 2888f4944da0fba0d5a5c7a7de2346f3, target: 2fa7e7e5e76005ffd8bfa5082da9f2f9) ] "
+    assert_equal expected_message, ghostferry.error_lines.last["msg"]
 
     # Now we run the real test case.
     target_db.query("UPDATE #{DEFAULT_FULL_TABLE_NAME} SET data = -0.0 WHERE id = 1")
@@ -484,7 +502,10 @@ def test_null_vs_empty_string
 
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
+
+    expected_message = "cutover verification failed for: gftest.test_table_1 [PKs: 1 (type: "
+
+    assert ghostferry.error_lines.last["msg"].start_with?(expected_message)
   end
 
   def test_null_vs_null_string
@@ -507,7 +528,11 @@ def test_null_vs_null_string
 
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
+
+    expected_message = "cutover verification failed for: gftest.test_table_1 " \
+      "[PKs: 1 (type: rows checksum difference, source: 7dfce9db8fc0f2475d2ff8ac3a5382e9, target: dc4cca2441c365c72466c75076782022) ] "
+
+    assert_equal expected_message, ghostferry.error_lines.last["msg"]
   end
 
   def test_null_in_different_order
@@ -533,7 +558,11 @@ def test_null_in_different_order
 
     assert verification_ran
     assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
+
+    expected_message = "cutover verification failed for: gftest.test_table_1 "\
+      "[PKs: 1 (type: rows checksum difference, source: 8e8e0931b9b2e5cb422a76d63160bbf3, target: 503b2de936a8da9e8d67b0d4594117d9) ] "
+
+    assert_equal expected_message, ghostferry.error_lines.last["msg"]
   end
 
   ###################
@@ -605,7 +634,9 @@ def run_collation_test(data, source_charset, target_charset, identical:)
 
     assert verify_during_cutover_ran
    assert_equal ["#{DEFAULT_DB}.#{DEFAULT_TABLE}"], incorrect_tables
-    assert ghostferry.error_lines.last["msg"].start_with?("cutover verification failed for: gftest.test_table_1 [paginationKeys: 1")
+
+    expected_message = "cutover verification failed for: gftest.test_table_1 [PKs: 1 ("
+    assert ghostferry.error_lines.last["msg"].start_with?(expected_message)
   end
 end
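Note: the following is a standalone sketch, not part of the patch above. It shows how the content-difference checksums in the new mismatch output are derived — an MD5 digest of the decompressed column bytes, hex-encoded, mirroring the md5.Sum and hex.EncodeToString calls added to compareDecompressedData. The input byte slices are hypothetical.

// sketch_checksum.go — illustrative only; not part of the diff above.
package main

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

// columnChecksum mirrors the hash-then-encode steps used in the patch:
// md5.Sum over the raw column bytes, then hex encoding of the 16-byte digest.
func columnChecksum(data []byte) string {
	sum := md5.Sum(data) // [16]byte digest
	return hex.EncodeToString(sum[:])
}

func main() {
	// Hypothetical decompressed column payloads; in the verifier these come
	// from the source and target rows being compared.
	source := []byte("hello world")
	target := []byte("hello w0rld")
	fmt.Printf("source: %s, target: %s\n", columnChecksum(source), columnChecksum(target))
}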