diff --git a/arbiter/server.go b/arbiter/server.go index 7035f1406..fb8d1b294 100644 --- a/arbiter/server.go +++ b/arbiter/server.go @@ -287,8 +287,16 @@ func (s *Server) loadStatus() (int, error) { func syncBinlogs(ctx context.Context, source <-chan *reader.Message, ld loader.Loader) (err error) { dest := ld.Input() defer ld.Close() + var receivedTs int64 for msg := range source { log.Debug("recv msg from kafka reader", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset)) + + if msg.Binlog.CommitTs <= receivedTs { + log.Info("skip repeated binlog", zap.Int64("ts", msg.Binlog.CommitTs), zap.Int64("offset", msg.Offset)) + continue + } + receivedTs = msg.Binlog.CommitTs + txn, err := loader.SlaveBinlogToTxn(msg.Binlog) if err != nil { log.Error("transfer binlog failed, program will stop handling data from loader", zap.Error(err)) diff --git a/arbiter/server_test.go b/arbiter/server_test.go index 97cb0cfa4..8b68a5550 100644 --- a/arbiter/server_test.go +++ b/arbiter/server_test.go @@ -380,7 +380,7 @@ type syncBinlogsSuite struct{} var _ = Suite(&syncBinlogsSuite{}) -func (s *syncBinlogsSuite) createMsg(schema, table, sql string) *reader.Message { +func (s *syncBinlogsSuite) createMsg(schema, table, sql string, commitTs int64) *reader.Message { return &reader.Message{ Binlog: &pb.Binlog{ DdlData: &pb.DDLData{ @@ -388,6 +388,7 @@ func (s *syncBinlogsSuite) createMsg(schema, table, sql string) *reader.Message TableName: &table, DdlQuery: []byte(sql), }, + CommitTs: commitTs, }, } } @@ -395,8 +396,15 @@ func (s *syncBinlogsSuite) createMsg(schema, table, sql string) *reader.Message func (s *syncBinlogsSuite) TestShouldSendBinlogToLoader(c *C) { source := make(chan *reader.Message, 1) msgs := []*reader.Message{ - s.createMsg("test42", "users", "alter table users add column gender smallint"), - s.createMsg("test42", "operations", "alter table operations drop column seq"), + s.createMsg("test42", "users", "alter table users add column gender smallint", 1), + s.createMsg("test42", "users", "alter table users add column gender smallint", 1), + s.createMsg("test42", "operations", "alter table operations drop column seq", 2), + s.createMsg("test42", "users", "alter table users add column gender smallint", 1), + s.createMsg("test42", "operations", "alter table operations drop column seq", 2), + } + expectMsgs := []*reader.Message{ + s.createMsg("test42", "users", "alter table users add column gender smallint", 1), + s.createMsg("test42", "operations", "alter table operations drop column seq", 2), } dest := make(chan *loader.Txn, len(msgs)) go func() { @@ -410,8 +418,8 @@ func (s *syncBinlogsSuite) TestShouldSendBinlogToLoader(c *C) { err := syncBinlogs(context.Background(), source, &ld) c.Assert(err, IsNil) - c.Assert(len(dest), Equals, 2) - for _, m := range msgs { + c.Assert(len(dest), Equals, len(expectMsgs)) + for _, m := range expectMsgs { txn := <-dest c.Assert(txn.Metadata.(*reader.Message), DeepEquals, m) } @@ -426,7 +434,7 @@ func (s *syncBinlogsSuite) TestShouldQuitWhenSomeErrorOccurs(c *C) { // input is set small to trigger blocking easily input: make(chan *loader.Txn, 1), } - msg := s.createMsg("test42", "users", "alter table users add column gender smallint") + msg := s.createMsg("test42", "users", "alter table users add column gender smallint", 1) ctx, cancel := context.WithCancel(context.Background()) defer cancel() // start a routine keep sending msgs to kafka reader