Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move MySQL56 to MySQL for replication related values #14977

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/local/vstream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func main() {
Shard: "-80",
// Gtid "" is to stream from the start, "current" is to stream from the current gtid
// you can also specify a gtid to start with.
Gtid: "", //"current" // "MySQL56/36a89abd-978f-11eb-b312-04ed332e05c2:1-265"
Gtid: "", //"current" // "MySQL/36a89abd-978f-11eb-b312-04ed332e05c2:1-265"
Copy link
Member

@deepthi deepthi Apr 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My suggestion is as follows:

  • Don't change GTID format. That will break contracts with clients
  • Feel free to change function names etc.

Anyhow this isn't super important. Just confusing for anyone new to the code base.

}, {
Keyspace: "customer",
Shard: "80-",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@ import (
const (
// vdiffStopPosition is the default stop position for the target vreplication.
// It can be overridden with the positons argument to newTestVDiffEnv.
vdiffStopPosition = "MySQL56/d834e6b8-7cbf-11ed-a1eb-0242ac120002:1-892"
vdiffStopPosition = "MySQL/d834e6b8-7cbf-11ed-a1eb-0242ac120002:1-892"
// vdiffSourceGtid should be the position reported by the source side VStreamResults.
// It's expected to be higher the vdiffStopPosition.
vdiffSourceGtid = "MySQL56/d834e6b8-7cbf-11ed-a1eb-0242ac120002:1-893"
vdiffSourceGtid = "MySQL/d834e6b8-7cbf-11ed-a1eb-0242ac120002:1-893"
// vdiffTargetPrimaryPosition is the primary position of the target after
// vreplication has been synchronized.
vdiffTargetPrimaryPosition = "MySQL56/e34d6fb6-7cbf-11ed-a1eb-0242ac120002:1-892"
vdiffTargetPrimaryPosition = "MySQL/e34d6fb6-7cbf-11ed-a1eb-0242ac120002:1-892"
)

type testVDiffEnv struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,8 +572,8 @@ func TestVDiffSharded(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
env := newTestVDiffEnv(t, ctx, []string{"-40", "40-"}, []string{"-80", "80-"}, "", map[string]string{
"-80": "MySQL56/0e45e704-7cb9-11ed-a1eb-0242ac120002:1-890",
"80-": "MySQL56/1497ddb0-7cb9-11ed-a1eb-0242ac120002:1-891",
"-80": "MySQL/0e45e704-7cb9-11ed-a1eb-0242ac120002:1-890",
"80-": "MySQL/1497ddb0-7cb9-11ed-a1eb-0242ac120002:1-891",
})
defer env.close()

Expand Down
2 changes: 1 addition & 1 deletion go/mysql/binlog_event_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func (tp *TransactionPayload) decode() error {
eventLen, pos, decompressedPayloadLen))
}
eventData := decompressedPayload[pos : pos+eventLen]
ble := NewMysql56BinlogEvent(eventData)
ble := NewMysqlBinlogEvent(eventData)
tp.Events = append(tp.Events, ble)

pos += eventLen
Expand Down
30 changes: 15 additions & 15 deletions go/mysql/binlog_event_make.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@
// This file contains utility methods to create binlog replication
// packets. They are mostly used for testing.

// NewMySQL56BinlogFormat returns a typical BinlogFormat for MySQL 5.6.
func NewMySQL56BinlogFormat() BinlogFormat {
// NewMySQLBinlogFormat returns a typical BinlogFormat for MySQL 5.6.
func NewMySQLBinlogFormat() BinlogFormat {
return BinlogFormat{
FormatVersion: 4,
ServerVersion: "5.6.33-0ubuntu0.14.04.1-log",
Expand Down Expand Up @@ -133,11 +133,11 @@

// NewInvalidEvent returns an invalid event (its size is <19).
func NewInvalidEvent() BinlogEvent {
return NewMysql56BinlogEvent([]byte{0})
return NewMysqlBinlogEvent([]byte{0})
}

// NewFormatDescriptionEvent creates a new FormatDescriptionEvent
// based on the provided BinlogFormat. It uses a mysql56BinlogEvent
// based on the provided BinlogFormat. It uses a mysqlBinlogEvent
// but could use a MariaDB one.
func NewFormatDescriptionEvent(f BinlogFormat, s *FakeBinlogStream) BinlogEvent {
length := 2 + // binlog-version
Expand All @@ -155,7 +155,7 @@
data[57+len(f.HeaderSizes)] = f.ChecksumAlgorithm

ev := s.Packetize(f, eFormatDescriptionEvent, 0, data)
return NewMysql56BinlogEvent(ev)
return NewMysqlBinlogEvent(ev)
}

// NewInvalidFormatDescriptionEvent returns an invalid FormatDescriptionEvent.
Expand All @@ -166,7 +166,7 @@
data[0] = 3

ev := s.Packetize(f, eFormatDescriptionEvent, 0, data)
return NewMysql56BinlogEvent(ev)
return NewMysqlBinlogEvent(ev)
}

// NewRotateEvent returns a RotateEvent.
Expand All @@ -179,7 +179,7 @@
copy(data[8:], filename)

ev := s.Packetize(f, eRotateEvent, 0, data)
return NewMysql56BinlogEvent(ev)
return NewMysqlBinlogEvent(ev)
}

func NewFakeRotateEvent(f BinlogFormat, s *FakeBinlogStream, filename string) BinlogEvent {
Expand All @@ -190,14 +190,14 @@
copy(data[8:], filename)

ev := s.Packetize(f, eRotateEvent, FlagLogEventArtificial, data)
return NewMysql56BinlogEvent(ev)
return NewMysqlBinlogEvent(ev)
}

// NewHeartbeatEvent returns a HeartbeatEvent.
// see https://dev.mysql.com/doc/internals/en/heartbeat-event.html
func NewHeartbeatEvent(f BinlogFormat, s *FakeBinlogStream) BinlogEvent {
ev := s.Packetize(f, eHeartbeatEvent, 0, []byte{})
return NewMysql56BinlogEvent(ev)
return NewMysqlBinlogEvent(ev)
}

// NewHeartbeatEvent returns a HeartbeatEvent.
Expand All @@ -208,7 +208,7 @@
copy(data, filename)

ev := s.Packetize(f, eHeartbeatEvent, 0, data)
return NewMysql56BinlogEvent(ev)
return NewMysqlBinlogEvent(ev)

Check warning on line 211 in go/mysql/binlog_event_make.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_make.go#L211

Added line #L211 was not covered by tests
}

// NewQueryEvent makes up a QueryEvent based on the Query structure.
Expand Down Expand Up @@ -250,7 +250,7 @@
copy(data[pos:], q.SQL)

ev := s.Packetize(f, eQueryEvent, 0, data)
return NewMysql56BinlogEvent(ev)
return NewMysqlBinlogEvent(ev)
}

// NewInvalidQueryEvent returns an invalid QueryEvent. IsValid is however true.
Expand All @@ -261,7 +261,7 @@
data[4+4] = 200 // > 100

ev := s.Packetize(f, eQueryEvent, 0, data)
return NewMysql56BinlogEvent(ev)
return NewMysqlBinlogEvent(ev)
}

// NewXIDEvent returns a XID event. We do not use the data, so keep it 0.
Expand All @@ -270,7 +270,7 @@
data := make([]byte, length)

ev := s.Packetize(f, eXIDEvent, 0, data)
return NewMysql56BinlogEvent(ev)
return NewMysqlBinlogEvent(ev)
}

// NewIntVarEvent returns an IntVar event.
Expand All @@ -289,7 +289,7 @@
data[8] = byte(value >> 56)

ev := s.Packetize(f, eIntVarEvent, 0, data)
return NewMysql56BinlogEvent(ev)
return NewMysqlBinlogEvent(ev)
}

// NewMariaDBGTIDEvent returns a MariaDB specific GTID event.
Expand Down Expand Up @@ -462,5 +462,5 @@
}

ev := s.Packetize(f, typ, 0, data)
return NewMysql56BinlogEvent(ev)
return NewMysqlBinlogEvent(ev)
}
26 changes: 13 additions & 13 deletions go/mysql/binlog_event_make_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
// FormatDescriptionEvent is working properly.
func TestFormatDescriptionEvent(t *testing.T) {
// MySQL 5.6
f := NewMySQL56BinlogFormat()
f := NewMySQLBinlogFormat()
s := NewFakeBinlogStream()

event := NewFormatDescriptionEvent(f, s)
Expand All @@ -59,7 +59,7 @@ func TestFormatDescriptionEvent(t *testing.T) {
}

func TestQueryEvent(t *testing.T) {
f := NewMySQL56BinlogFormat()
f := NewMySQLBinlogFormat()
s := NewFakeBinlogStream()

q := Query{
Expand All @@ -85,7 +85,7 @@ func TestQueryEvent(t *testing.T) {
}

func TestXIDEvent(t *testing.T) {
f := NewMySQL56BinlogFormat()
f := NewMySQLBinlogFormat()
s := NewFakeBinlogStream()

event := NewXIDEvent(f, s)
Expand All @@ -95,7 +95,7 @@ func TestXIDEvent(t *testing.T) {
}

func TestIntVarEvent(t *testing.T) {
f := NewMySQL56BinlogFormat()
f := NewMySQLBinlogFormat()
s := NewFakeBinlogStream()

event := NewIntVarEvent(f, s, IntVarLastInsertID, 0x123456789abcdef0)
Expand All @@ -117,7 +117,7 @@ func TestIntVarEvent(t *testing.T) {
}

func TestInvalidEvents(t *testing.T) {
f := NewMySQL56BinlogFormat()
f := NewMySQLBinlogFormat()
s := NewFakeBinlogStream()

// InvalidEvent
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestInvalidEvents(t *testing.T) {
}

func TestMariadDBGTIDEVent(t *testing.T) {
f := NewMySQL56BinlogFormat()
f := NewMySQLBinlogFormat()
s := NewFakeBinlogStream()
s.ServerID = 0x87654321

Expand Down Expand Up @@ -190,7 +190,7 @@ func TestMariadDBGTIDEVent(t *testing.T) {
}

func TestTableMapEvent(t *testing.T) {
f := NewMySQL56BinlogFormat()
f := NewMySQLBinlogFormat()
s := NewFakeBinlogStream()

tm := &TableMap{
Expand Down Expand Up @@ -245,7 +245,7 @@ func TestTableMapEvent(t *testing.T) {
}

func TestLargeTableMapEvent(t *testing.T) {
f := NewMySQL56BinlogFormat()
f := NewMySQLBinlogFormat()
s := NewFakeBinlogStream()

colLen := 256
Expand Down Expand Up @@ -287,7 +287,7 @@ func TestLargeTableMapEvent(t *testing.T) {
}

func TestRowsEvent(t *testing.T) {
f := NewMySQL56BinlogFormat()
f := NewMySQLBinlogFormat()
s := NewFakeBinlogStream()

/*
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestRowsEvent(t *testing.T) {

func TestHeartbeatEvent(t *testing.T) {
// MySQL 5.6
f := NewMySQL56BinlogFormat()
f := NewMySQLBinlogFormat()
s := NewFakeBinlogStream()
event := NewHeartbeatEvent(f, s)
require.NotNil(t, event)
Expand All @@ -385,7 +385,7 @@ func TestHeartbeatEvent(t *testing.T) {

func TestRotateRotateEvent(t *testing.T) {
// MySQL 5.6
f := NewMySQL56BinlogFormat()
f := NewMySQLBinlogFormat()
s := NewFakeBinlogStream()
event := NewRotateEvent(f, s, 456, "mysql-bin.000123")
require.NotNil(t, event)
Expand All @@ -398,7 +398,7 @@ func TestRotateRotateEvent(t *testing.T) {

func TestFakeRotateEvent(t *testing.T) {
// MySQL 5.6
f := NewMySQL56BinlogFormat()
f := NewMySQLBinlogFormat()
s := NewFakeBinlogStream()
event := NewFakeRotateEvent(f, s, "mysql-bin.000123")
require.NotNil(t, event)
Expand All @@ -409,7 +409,7 @@ func TestFakeRotateEvent(t *testing.T) {
assert.Equal(t, "mysql-bin.000123", nextFile)
}
func TestLargeRowsEvent(t *testing.T) {
f := NewMySQL56BinlogFormat()
f := NewMySQLBinlogFormat()
s := NewFakeBinlogStream()

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,31 @@
"vitess.io/vitess/go/vt/vterrors"
)

// mysql56BinlogEvent wraps a raw packet buffer and provides methods to examine
// mysqlBinlogEvent wraps a raw packet buffer and provides methods to examine
// it by implementing BinlogEvent. Some methods are pulled in from
// binlogEvent.
type mysql56BinlogEvent struct {
type mysqlBinlogEvent struct {
binlogEvent
semiSyncAckRequested bool
}

// NewMysql56BinlogEventWithSemiSyncInfo creates a BinlogEvent from given byte array
func NewMysql56BinlogEventWithSemiSyncInfo(buf []byte, semiSyncAckRequested bool) BinlogEvent {
return mysql56BinlogEvent{binlogEvent: binlogEvent(buf), semiSyncAckRequested: semiSyncAckRequested}
// NewMysqlBinlogEventWithSemiSyncInfo creates a BinlogEvent from given byte array
func NewMysqlBinlogEventWithSemiSyncInfo(buf []byte, semiSyncAckRequested bool) BinlogEvent {
return mysqlBinlogEvent{binlogEvent: binlogEvent(buf), semiSyncAckRequested: semiSyncAckRequested}
}

// NewMysql56BinlogEvent creates a BinlogEvent from given byte array
func NewMysql56BinlogEvent(buf []byte) BinlogEvent {
return mysql56BinlogEvent{binlogEvent: binlogEvent(buf)}
// NewMysqlBinlogEvent creates a BinlogEvent from given byte array
func NewMysqlBinlogEvent(buf []byte) BinlogEvent {
return mysqlBinlogEvent{binlogEvent: binlogEvent(buf)}
}

// IsSemiSyncAckRequested implements BinlogEvent.IsSemiSyncAckRequested().
func (ev mysql56BinlogEvent) IsSemiSyncAckRequested() bool {
func (ev mysqlBinlogEvent) IsSemiSyncAckRequested() bool {
return ev.semiSyncAckRequested
}

// IsGTID implements BinlogEvent.IsGTID().
func (ev mysql56BinlogEvent) IsGTID() bool {
func (ev mysqlBinlogEvent) IsGTID() bool {
return ev.Type() == eGTIDEvent
}

Expand All @@ -60,18 +60,18 @@
// 1 flags
// 16 SID (server UUID)
// 8 GNO (sequence number, signed int)
func (ev mysql56BinlogEvent) GTID(f BinlogFormat) (replication.GTID, bool, error) {
func (ev mysqlBinlogEvent) GTID(f BinlogFormat) (replication.GTID, bool, error) {
data := ev.Bytes()[f.HeaderLength:]
var sid replication.SID
copy(sid[:], data[1:1+16])
gno := int64(binary.LittleEndian.Uint64(data[1+16 : 1+16+8]))
return replication.Mysql56GTID{Server: sid, Sequence: gno}, false /* hasBegin */, nil
return replication.MysqlGTID{Server: sid, Sequence: gno}, false /* hasBegin */, nil
}

// PreviousGTIDs implements BinlogEvent.PreviousGTIDs().
func (ev mysql56BinlogEvent) PreviousGTIDs(f BinlogFormat) (replication.Position, error) {
func (ev mysqlBinlogEvent) PreviousGTIDs(f BinlogFormat) (replication.Position, error) {

Check warning on line 72 in go/mysql/binlog_event_mysql.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_mysql.go#L72

Added line #L72 was not covered by tests
data := ev.Bytes()[f.HeaderLength:]
set, err := replication.NewMysql56GTIDSetFromSIDBlock(data)
set, err := replication.NewMysqlGTIDSetFromSIDBlock(data)

Check warning on line 74 in go/mysql/binlog_event_mysql.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/binlog_event_mysql.go#L74

Added line #L74 was not covered by tests
if err != nil {
return replication.Position{}, err
}
Expand All @@ -81,7 +81,7 @@
}

// StripChecksum implements BinlogEvent.StripChecksum().
func (ev mysql56BinlogEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte, error) {
func (ev mysqlBinlogEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte, error) {
switch f.ChecksumAlgorithm {
case BinlogChecksumAlgOff, BinlogChecksumAlgUndef:
// There is no checksum.
Expand All @@ -92,7 +92,7 @@
length := len(data)
checksum := data[length-BinlogCRC32ChecksumLen:]
data = data[:length-BinlogCRC32ChecksumLen]
return mysql56BinlogEvent{binlogEvent: binlogEvent(data)}, checksum, nil
return mysqlBinlogEvent{binlogEvent: binlogEvent(data)}, checksum, nil
default:
// MySQL 5.6 does not guarantee that future checksum algorithms will be
// 4 bytes, so we can't support them a priori.
Expand Down
Loading
Loading