Skip to content

Commit

Permalink
VReplication: Improve replication plan builder and event application …
Browse files Browse the repository at this point in the history
…errors (#16596)

Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord authored Aug 21, 2024
1 parent 5458941 commit 4206c2a
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (rp *ReplicatorPlan) buildExecutionPlan(fieldEvent *binlogdatapb.FieldEvent
// select * construct was used. We need to use the field names.
tplan, err := rp.buildFromFields(prelim.TargetName, prelim.Lastpk, fieldEvent.Fields)
if err != nil {
return nil, err
return nil, vterrors.Wrapf(err, "failed to build replication plan for %s table", fieldEvent.TableName)
}
tplan.Fields = fieldEvent.Fields
return tplan, nil
Expand Down
37 changes: 35 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,7 +558,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
if err := vp.applyEvent(ctx, event, mustSave); err != nil {
if err != io.EOF {
vp.vr.stats.ErrorCounts.Add([]string{"Apply"}, 1)
var table, tableLogMsg string
var table, tableLogMsg, gtidLogMsg string
switch {
case event.GetFieldEvent() != nil:
table = event.GetFieldEvent().TableName
Expand All @@ -568,7 +568,12 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
if table != "" {
tableLogMsg = fmt.Sprintf(" for table %s", table)
}
log.Errorf("Error applying event%s: %s", tableLogMsg, err.Error())
pos := getNextPosition(items, i, j+1)
if pos != "" {
gtidLogMsg = fmt.Sprintf(" while processing position %s", pos)
}
log.Errorf("Error applying event%s%s: %s", tableLogMsg, gtidLogMsg, err.Error())
err = vterrors.Wrapf(err, "error applying event%s%s", tableLogMsg, gtidLogMsg)
}
return err
}
Expand Down Expand Up @@ -602,6 +607,34 @@ func hasAnotherCommit(items [][]*binlogdatapb.VEvent, i, j int) bool {
return false
}

// getNextPosition returns the GTID set/position we would be at if the current
// transaction was committed. This is useful for error handling as we can then
// determine which GTID we're failing to process from the source and examine the
// binlog events for that GTID directly on the source to debug the issue.
// This is needed as it's not as simple as the user incrementing the current
// position in the stream by 1 as we may be skipping N intermediate GTIDs in the
// stream due to filtering. For GTIDs that we filter out we still replicate the
// GTID event itself, just without any internal events and a COMMIT event (see
// the unsavedEvent handling).
func getNextPosition(items [][]*binlogdatapb.VEvent, i, j int) string {
for i < len(items) {
for j < len(items[i]) {
switch items[i][j].Type {
case binlogdatapb.VEventType_GTID:
pos, err := binlogplayer.DecodePosition(items[i][j].Gtid)
if err != nil {
return ""
}
return pos.String()
}
j++
}
j = 0
i++
}
return ""
}

func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, mustSave bool) error {
stats := NewVrLogStats(event.Type.String())
switch event.Type {
Expand Down
27 changes: 23 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,9 +575,20 @@ func TestPlayerForeignKeyCheck(t *testing.T) {
cancel()
}

func TestPlayerStatementModeWithFilter(t *testing.T) {
// TestPlayerStatementModeWithFilterAndErrorHandling confirms that we get the
// expected error when using a filter with statement mode. It also tests the
// general vplayer applyEvent error and log message handling.
func TestPlayerStatementModeWithFilterAndErrorHandling(t *testing.T) {
defer deleteTablet(addTablet(100))

// We want to check for the expected log message.
ole := log.Errorf
logger := logutil.NewMemoryLogger()
log.Errorf = logger.Errorf
defer func() {
log.Errorf = ole
}()

execStatements(t, []string{
"create table src1(id int, val varbinary(128), primary key(id))",
})
Expand All @@ -600,21 +611,29 @@ func TestPlayerStatementModeWithFilter(t *testing.T) {
cancel, _ := startVReplication(t, bls, "")
defer cancel()

const gtid = "37f16b4c-5a74-11ef-87de-56bfd605e62e:100"
input := []string{
"set @@session.binlog_format='STATEMENT'",
fmt.Sprintf("set @@session.gtid_next='%s'", gtid),
"insert into src1 values(1, 'aaa')",
"set @@session.gtid_next='AUTOMATIC'",
"set @@session.binlog_format='ROW'",
}

expectedMsg := fmt.Sprintf("[Ee]rror applying event while processing position .*%s.* filter rules are not supported for SBR.*", gtid)

// It does not work when filter is enabled
output := qh.Expect(
"begin",
"rollback",
"/update _vt.vreplication set message='filter rules are not supported for SBR",
fmt.Sprintf("/update _vt.vreplication set message='%s", expectedMsg),
)

execStatements(t, input)
expectDBClientQueries(t, output)

logs := logger.String()
require.Regexp(t, expectedMsg, logs)
}

func TestPlayerStatementMode(t *testing.T) {
Expand Down Expand Up @@ -1758,8 +1777,8 @@ func TestPlayerDDL(t *testing.T) {
execStatements(t, []string{"alter table t1 add column val2 varchar(128)"})
expectDBClientQueries(t, qh.Expect(
"alter table t1 add column val2 varchar(128)",
"/update _vt.vreplication set message='Duplicate",
"/update _vt.vreplication set state='Error', message='Duplicate",
"/update _vt.vreplication set message='error applying event: Duplicate",
"/update _vt.vreplication set state='Error', message='error applying event: Duplicate",
))
cancel()

Expand Down

0 comments on commit 4206c2a

Please sign in to comment.