diff --git a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go index 91777f51b9c..bd41fd76419 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go @@ -89,7 +89,7 @@ func (rp *ReplicatorPlan) buildExecutionPlan(fieldEvent *binlogdatapb.FieldEvent // select * construct was used. We need to use the field names. tplan, err := rp.buildFromFields(prelim.TargetName, prelim.Lastpk, fieldEvent.Fields) if err != nil { - return nil, err + return nil, vterrors.Wrapf(err, "failed to build replication plan for %s table", fieldEvent.TableName) } tplan.Fields = fieldEvent.Fields return tplan, nil diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 653cc713c8f..70bd8016b9d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -558,7 +558,7 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { if err := vp.applyEvent(ctx, event, mustSave); err != nil { if err != io.EOF { vp.vr.stats.ErrorCounts.Add([]string{"Apply"}, 1) - var table, tableLogMsg string + var table, tableLogMsg, gtidLogMsg string switch { case event.GetFieldEvent() != nil: table = event.GetFieldEvent().TableName @@ -568,7 +568,12 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error { if table != "" { tableLogMsg = fmt.Sprintf(" for table %s", table) } - log.Errorf("Error applying event%s: %s", tableLogMsg, err.Error()) + pos := getNextPosition(items, i, j+1) + if pos != "" { + gtidLogMsg = fmt.Sprintf(" while processing position %s", pos) + } + log.Errorf("Error applying event%s%s: %s", tableLogMsg, gtidLogMsg, err.Error()) + err = vterrors.Wrapf(err, "error applying event%s%s", tableLogMsg, gtidLogMsg) } return err } @@ -602,6 +607,34 @@ func hasAnotherCommit(items [][]*binlogdatapb.VEvent, i, j int) bool { return false } +// getNextPosition returns the GTID set/position we would be at if the current +// transaction was committed. This is useful for error handling as we can then +// determine which GTID we're failing to process from the source and examine the +// binlog events for that GTID directly on the source to debug the issue. +// This is needed as it's not as simple as the user incrementing the current +// position in the stream by 1 as we may be skipping N intermediate GTIDs in the +// stream due to filtering. For GTIDs that we filter out we still replicate the +// GTID event itself, just without any internal events and a COMMIT event (see +// the unsavedEvent handling). +func getNextPosition(items [][]*binlogdatapb.VEvent, i, j int) string { + for i < len(items) { + for j < len(items[i]) { + switch items[i][j].Type { + case binlogdatapb.VEventType_GTID: + pos, err := binlogplayer.DecodePosition(items[i][j].Gtid) + if err != nil { + return "" + } + return pos.String() + } + j++ + } + j = 0 + i++ + } + return "" +} + func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, mustSave bool) error { stats := NewVrLogStats(event.Type.String()) switch event.Type { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index b1925c3c64f..0641a111199 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -575,9 +575,20 @@ func TestPlayerForeignKeyCheck(t *testing.T) { cancel() } -func TestPlayerStatementModeWithFilter(t *testing.T) { +// TestPlayerStatementModeWithFilterAndErrorHandling confirms that we get the +// expected error when using a filter with statement mode. It also tests the +// general vplayer applyEvent error and log message handling. +func TestPlayerStatementModeWithFilterAndErrorHandling(t *testing.T) { defer deleteTablet(addTablet(100)) + // We want to check for the expected log message. + ole := log.Errorf + logger := logutil.NewMemoryLogger() + log.Errorf = logger.Errorf + defer func() { + log.Errorf = ole + }() + execStatements(t, []string{ "create table src1(id int, val varbinary(128), primary key(id))", }) @@ -600,21 +611,29 @@ func TestPlayerStatementModeWithFilter(t *testing.T) { cancel, _ := startVReplication(t, bls, "") defer cancel() + const gtid = "37f16b4c-5a74-11ef-87de-56bfd605e62e:100" input := []string{ "set @@session.binlog_format='STATEMENT'", + fmt.Sprintf("set @@session.gtid_next='%s'", gtid), "insert into src1 values(1, 'aaa')", + "set @@session.gtid_next='AUTOMATIC'", "set @@session.binlog_format='ROW'", } + expectedMsg := fmt.Sprintf("[Ee]rror applying event while processing position .*%s.* filter rules are not supported for SBR.*", gtid) + // It does not work when filter is enabled output := qh.Expect( "begin", "rollback", - "/update _vt.vreplication set message='filter rules are not supported for SBR", + fmt.Sprintf("/update _vt.vreplication set message='%s", expectedMsg), ) execStatements(t, input) expectDBClientQueries(t, output) + + logs := logger.String() + require.Regexp(t, expectedMsg, logs) } func TestPlayerStatementMode(t *testing.T) { @@ -1758,8 +1777,8 @@ func TestPlayerDDL(t *testing.T) { execStatements(t, []string{"alter table t1 add column val2 varchar(128)"}) expectDBClientQueries(t, qh.Expect( "alter table t1 add column val2 varchar(128)", - "/update _vt.vreplication set message='Duplicate", - "/update _vt.vreplication set state='Error', message='Duplicate", + "/update _vt.vreplication set message='error applying event: Duplicate", + "/update _vt.vreplication set state='Error', message='error applying event: Duplicate", )) cancel()