Skip to content

Commit

Permalink
Use uint64 for binary log file position (#17472)
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
vitess-bot[bot] authored and vitess-bot committed Jan 10, 2025
1 parent 261cc67 commit 5882501
Show file tree
Hide file tree
Showing 14 changed files with 61 additions and 45 deletions.
2 changes: 1 addition & 1 deletion go/mysql/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type BinlogEvent interface {
IsValid() bool

// General protocol events.
NextPosition() uint32
NextPosition() uint64

// IsFormatDescription returns true if this is a
// FORMAT_DESCRIPTION_EVENT. Do not call StripChecksum before
Expand Down
7 changes: 4 additions & 3 deletions go/mysql/binlog_event_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import (
// +----------------------------+
// | extra_headers 19 : x-19 |
// +============================+
// http://dev.mysql.com/doc/internals/en/event-header-fields.html
// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_binlog_event_header
type binlogEvent []byte

const (
Expand Down Expand Up @@ -119,8 +119,9 @@ func (ev binlogEvent) Length() uint32 {
}

// NextPosition returns the nextPosition field from the header
func (ev binlogEvent) NextPosition() uint32 {
return binary.LittleEndian.Uint32(ev.Bytes()[13 : 13+4])
func (ev binlogEvent) NextPosition() uint64 {
// Only 4 bytes are used for the next_position field in the header.
return uint64(binary.LittleEndian.Uint32(ev.Bytes()[13:17]))
}

// IsFormatDescription implements BinlogEvent.IsFormatDescription().
Expand Down
9 changes: 5 additions & 4 deletions go/mysql/binlog_event_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ func (ev *filePosBinlogEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte

// nextPosition returns the next file position of the binlog.
// If no information is available, it returns 0.
func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint32 {
func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint64 {
if f.HeaderLength <= 13 {
// Dead code. This is just a failsafe.
return 0
}
return binary.LittleEndian.Uint32(ev.Bytes()[13:17])
// The header only uses 4 bytes for the next_position.
return uint64(binary.LittleEndian.Uint32(ev.Bytes()[13:17]))
}

// rotate implements BinlogEvent.Rotate().
Expand Down Expand Up @@ -139,7 +140,7 @@ type filePosFakeEvent struct {
timestamp uint32
}

func (ev filePosFakeEvent) NextPosition() uint32 {
func (ev filePosFakeEvent) NextPosition() uint64 {
return 0
}

Expand Down Expand Up @@ -279,7 +280,7 @@ type filePosGTIDEvent struct {
gtid replication.FilePosGTID
}

func newFilePosGTIDEvent(file string, pos uint32, timestamp uint32) filePosGTIDEvent {
func newFilePosGTIDEvent(file string, pos uint64, timestamp uint32) filePosGTIDEvent {
return filePosGTIDEvent{
filePosFakeEvent: filePosFakeEvent{
timestamp: timestamp,
Expand Down
12 changes: 9 additions & 3 deletions go/mysql/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package mysql

import (
"fmt"
"math"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/proto/vtrpc"
Expand All @@ -32,9 +33,14 @@ const (
// This file contains the methods related to replication.

// WriteComBinlogDump writes a ComBinlogDump command.
// See http://dev.mysql.com/doc/internals/en/com-binlog-dump.html for syntax.
// Returns a SQLError.
func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint32, flags uint16) error {
// See: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_binlog_dump.html
func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16) error {
// The binary log file position is a uint64, but the protocol command
// only uses 4 bytes for the file position.
if binlogPos > math.MaxUint32 {
return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "binlog position %d is too large, it must fit into 32 bits", binlogPos)
}
c.sequence = 0
length := 1 + // ComBinlogDump
4 + // binlog-pos
Expand All @@ -43,7 +49,7 @@ func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlog
len(binlogFilename) // binlog-filename
data, pos := c.startEphemeralPacketWithHeader(length)
pos = writeByte(data, pos, ComBinlogDump)
pos = writeUint32(data, pos, binlogPos)
pos = writeUint32(data, pos, uint32(binlogPos))
pos = writeUint16(data, pos, flags)
pos = writeUint32(data, pos, serverID)
_ = writeEOFString(data, pos, binlogFilename)
Expand Down
6 changes: 3 additions & 3 deletions go/mysql/replication/filepos_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ func parseFilePosGTID(s string) (GTID, error) {
return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting file:pos", s)
}

pos, err := strconv.ParseUint(parts[1], 0, 32)
pos, err := strconv.ParseUint(parts[1], 0, 64)
if err != nil {
return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting pos to be an integer", s)
}

return FilePosGTID{
File: parts[0],
Pos: uint32(pos),
Pos: pos,
}, nil
}

Expand All @@ -56,7 +56,7 @@ func ParseFilePosGTIDSet(s string) (GTIDSet, error) {
// FilePosGTID implements GTID.
type FilePosGTID struct {
File string
Pos uint32
Pos uint64
}

// String implements GTID.String().
Expand Down
9 changes: 7 additions & 2 deletions go/mysql/replication/filepos_gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
func Test_filePosGTID_String(t *testing.T) {
type fields struct {
file string
pos uint32
pos uint64
}
tests := []struct {
name string
Expand All @@ -35,6 +35,11 @@ func Test_filePosGTID_String(t *testing.T) {
fields{file: "mysql-bin.166031", pos: 192394},
"mysql-bin.166031:192394",
},
{
"handles large position correctly",
fields{file: "vt-1448040107-bin.003222", pos: 4663881395},
"vt-1448040107-bin.003222:4663881395",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -52,7 +57,7 @@ func Test_filePosGTID_String(t *testing.T) {
func Test_filePosGTID_ContainsGTID(t *testing.T) {
type fields struct {
file string
pos uint32
pos uint64
}
type args struct {
other GTID
Expand Down
5 changes: 5 additions & 0 deletions go/mysql/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package mysql

import (
"math"
"reflect"
"testing"

Expand All @@ -34,6 +35,10 @@ func TestComBinlogDump(t *testing.T) {
cConn.Close()
}()

// Try to write a ComBinlogDump packet with a position greater than 4 bytes.
err := cConn.WriteComBinlogDump(1, "moofarm", math.MaxInt64, 0x0d0e)
require.Error(t, err)

// Write ComBinlogDump packet, read it, compare.
if err := cConn.WriteComBinlogDump(0x01020304, "moofarm", 0x05060708, 0x090a); err != nil {
t.Fatalf("WriteComBinlogDump failed: %v", err)
Expand Down
16 changes: 6 additions & 10 deletions go/test/endtoend/migration/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,6 @@ three streams although only two are required. This is to show that there can exi
streams from the same source. The main difference between an external source vs a vitess
source is that the source proto contains an "external_mysql" field instead of keyspace and shard.
That field is the key into the externalConnections section of the input yaml.
VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('product', 'vt_commerce', 'filter:<rules:<match:\"product\" > > external_mysql:\"product\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('customer', 'vt_commerce', 'filter:<rules:<match:\"customer\" > > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('orders', 'vt_commerce', 'filter:<rules:<match:\"orders\" > > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
*/
func TestMigration(t *testing.T) {
yamlFile := startCluster(t)
Expand All @@ -155,7 +151,7 @@ func TestMigration(t *testing.T) {
migrate(t, "customer", "commerce", []string{"customer"})
migrate(t, "customer", "commerce", []string{"orders"})
vttablet := keyspaces["commerce"].Shards[0].Vttablets[0].VttabletProcess
waitForVReplicationToCatchup(t, vttablet, 1*time.Second)
waitForVReplicationToCatchup(t, vttablet, 30*time.Second)

testcases := []struct {
query string
Expand Down Expand Up @@ -217,11 +213,11 @@ func migrate(t *testing.T, fromdb, toks string, tables []string) {
var sqlEscaped bytes.Buffer
val.EncodeSQL(&sqlEscaped)
query := fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values"+
"('%s', '%s', %s, '', 9999, 9999, 'primary', 0, 0, 'Running')", tables[0], "vt_"+toks, sqlEscaped.String())
fmt.Printf("VReplicationExec: %s\n", query)
vttablet := keyspaces[toks].Shards[0].Vttablets[0].VttabletProcess
err := clusterInstance.VtctldClientProcess.ExecuteCommand("VReplicationExec", vttablet.TabletPath, query)
"(workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state, options) values"+
"('%s', '%s', %s, '', 9999, 9999, 'primary', 0, 0, 'Running', '{}')", tables[0], "vt_"+toks, sqlEscaped.String())
fmt.Printf("VReplication insert: %s\n", query)
vttablet := keyspaces[toks].Shards[0].Vttablets[0].Alias
err := clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", vttablet, query)
require.NoError(t, err)
}

Expand Down
10 changes: 5 additions & 5 deletions go/test/endtoend/vtorc/readtopologyinstance/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.Equal(t, "ON", primaryInstance.GTIDMode)
assert.Equal(t, "FULL", primaryInstance.BinlogRowImage)
assert.Contains(t, primaryInstance.SelfBinlogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-bin", primary.TabletUID))
assert.Greater(t, primaryInstance.SelfBinlogCoordinates.LogPos, uint32(0))
assert.Greater(t, primaryInstance.SelfBinlogCoordinates.LogPos, uint64(0))
assert.True(t, primaryInstance.SemiSyncPrimaryEnabled)
assert.True(t, primaryInstance.SemiSyncReplicaEnabled)
assert.True(t, primaryInstance.SemiSyncPrimaryStatus)
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.Equal(t, utils.Hostname, replicaInstance.SourceHost)
assert.Equal(t, primary.MySQLPort, replicaInstance.SourcePort)
assert.Contains(t, replicaInstance.SelfBinlogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-bin", replica.TabletUID))
assert.Greater(t, replicaInstance.SelfBinlogCoordinates.LogPos, uint32(0))
assert.Greater(t, replicaInstance.SelfBinlogCoordinates.LogPos, uint64(0))
assert.False(t, replicaInstance.SemiSyncPrimaryEnabled)
assert.True(t, replicaInstance.SemiSyncReplicaEnabled)
assert.False(t, replicaInstance.SemiSyncPrimaryStatus)
Expand All @@ -157,11 +157,11 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.True(t, replicaInstance.ReplicationIOThreadRuning)
assert.True(t, replicaInstance.ReplicationSQLThreadRuning)
assert.Equal(t, replicaInstance.ReadBinlogCoordinates.LogFile, primaryInstance.SelfBinlogCoordinates.LogFile)
assert.Greater(t, replicaInstance.ReadBinlogCoordinates.LogPos, uint32(0))
assert.Greater(t, replicaInstance.ReadBinlogCoordinates.LogPos, uint64(0))
assert.Equal(t, replicaInstance.ExecBinlogCoordinates.LogFile, primaryInstance.SelfBinlogCoordinates.LogFile)
assert.Greater(t, replicaInstance.ExecBinlogCoordinates.LogPos, uint32(0))
assert.Greater(t, replicaInstance.ExecBinlogCoordinates.LogPos, uint64(0))
assert.Contains(t, replicaInstance.RelaylogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-relay", replica.TabletUID))
assert.Greater(t, replicaInstance.RelaylogCoordinates.LogPos, uint32(0))
assert.Greater(t, replicaInstance.RelaylogCoordinates.LogPos, uint64(0))
assert.Empty(t, replicaInstance.LastIOError)
assert.Empty(t, replicaInstance.LastSQLError)
assert.EqualValues(t, 0, replicaInstance.SQLDelay)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtorc/inst/analysis_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
a.AnalyzedInstancePrimaryAlias = topoproto.TabletAliasString(primaryTablet.Alias)
a.AnalyzedInstanceBinlogCoordinates = BinlogCoordinates{
LogFile: m.GetString("binary_log_file"),
LogPos: m.GetUint32("binary_log_pos"),
LogPos: m.GetUint64("binary_log_pos"),
Type: BinaryLog,
}
isStaleBinlogCoordinates := m.GetBool("is_stale_binlog_coordinates")
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vtorc/inst/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,23 @@ const (
// BinlogCoordinates described binary log coordinates in the form of log file & log position.
type BinlogCoordinates struct {
LogFile string
LogPos uint32
LogPos uint64
Type BinlogType
}

// ParseInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306
// ParseBinlogCoordinates will parse a string representation such as "mysql-bin.000001:12345"
// into a BinlogCoordinates struct.
func ParseBinlogCoordinates(logFileLogPos string) (*BinlogCoordinates, error) {
tokens := strings.SplitN(logFileLogPos, ":", 2)
if len(tokens) != 2 {
return nil, fmt.Errorf("ParseBinlogCoordinates: Cannot parse BinlogCoordinates from %s. Expected format is file:pos", logFileLogPos)
}

logPos, err := strconv.ParseUint(tokens[1], 10, 32)
logPos, err := strconv.ParseUint(tokens[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("ParseBinlogCoordinates: invalid pos: %s", tokens[1])
}
return &BinlogCoordinates{LogFile: tokens[0], LogPos: uint32(logPos)}, nil
return &BinlogCoordinates{LogFile: tokens[0], LogPos: logPos}, nil
}

// DisplayString returns a user-friendly string representation of these coordinates
Expand Down Expand Up @@ -177,6 +178,6 @@ func (binlogCoordinates *BinlogCoordinates) ExtractDetachedCoordinates() (isDeta
}
detachedCoordinates.LogFile = detachedCoordinatesSubmatch[1]
logPos, _ := strconv.ParseUint(detachedCoordinatesSubmatch[2], 10, 32)
detachedCoordinates.LogPos = uint32(logPos)
detachedCoordinates.LogPos = logPos
return true, detachedCoordinates
}
4 changes: 2 additions & 2 deletions go/vt/vtorc/inst/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ func TestPreviousFileCoordinates(t *testing.T) {

require.NoError(t, err)
require.Equal(t, previous.LogFile, "mysql-bin.000009")
require.Equal(t, previous.LogPos, uint32(0))
require.Equal(t, previous.LogPos, uint64(0))
}

func TestNextFileCoordinates(t *testing.T) {
next, err := testCoordinates.NextFileCoordinates()

require.NoError(t, err)
require.Equal(t, next.LogFile, "mysql-bin.000011")
require.Equal(t, next.LogPos, uint32(0))
require.Equal(t, next.LogPos, uint64(0))
}

func TestBinlogCoordinates(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,14 +516,14 @@ func readInstanceRow(m sqlutils.RowMap) *Instance {
instance.GtidErrant = m.GetString("gtid_errant")
instance.UsingMariaDBGTID = m.GetBool("mariadb_gtid")
instance.SelfBinlogCoordinates.LogFile = m.GetString("binary_log_file")
instance.SelfBinlogCoordinates.LogPos = m.GetUint32("binary_log_pos")
instance.SelfBinlogCoordinates.LogPos = m.GetUint64("binary_log_pos")
instance.ReadBinlogCoordinates.LogFile = m.GetString("source_log_file")
instance.ReadBinlogCoordinates.LogPos = m.GetUint32("read_source_log_pos")
instance.ReadBinlogCoordinates.LogPos = m.GetUint64("read_source_log_pos")
instance.ExecBinlogCoordinates.LogFile = m.GetString("relay_source_log_file")
instance.ExecBinlogCoordinates.LogPos = m.GetUint32("exec_source_log_pos")
instance.ExecBinlogCoordinates.LogPos = m.GetUint64("exec_source_log_pos")
instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates()
instance.RelaylogCoordinates.LogFile = m.GetString("relay_log_file")
instance.RelaylogCoordinates.LogPos = m.GetUint32("relay_log_pos")
instance.RelaylogCoordinates.LogPos = m.GetUint64("relay_log_pos")
instance.RelaylogCoordinates.Type = RelayLog
instance.LastSQLError = m.GetString("last_sql_error")
instance.LastIOError = m.GetString("last_io_error")
Expand Down
5 changes: 3 additions & 2 deletions go/vt/wrangler/testlib/emergency_reparent_shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ func TestEmergencyReparentShard(t *testing.T) {
},
},
})
goodReplica1RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:455")
goodReplica1RelayLogPos, err := replication.ParseFilePosGTIDSet("relay-bin.003222:18321744073709551612") // Requires all 64 bits or uint64
require.NoError(t, err)
goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{
GTIDSet: goodReplica1RelayLogPos,
}
Expand Down Expand Up @@ -181,7 +182,7 @@ func TestEmergencyReparentShard(t *testing.T) {

// run EmergencyReparentShard
waitReplicaTimeout := time.Second * 2
err := vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard,
err = vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard,
topoproto.TabletAliasString(newPrimary.Tablet.Alias)})
require.NoError(t, err)
// check what was run
Expand Down

0 comments on commit 5882501

Please sign in to comment.