Skip to content

Commit 1c08cc2

Browse files
[release-19.0] Use uint64 for binary log file position (#17472) (#17505)
Signed-off-by: Matt Lord <mattalord@gmail.com> Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com> Co-authored-by: Matt Lord <mattalord@gmail.com>
1 parent 1b615f5 commit 1c08cc2

File tree

15 files changed

+58
-41
lines changed

15 files changed

+58
-41
lines changed

go/mysql/binlog_event.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type BinlogEvent interface {
4646
IsValid() bool
4747

4848
// General protocol events.
49-
NextPosition() uint32
49+
NextPosition() uint64
5050

5151
// IsFormatDescription returns true if this is a
5252
// FORMAT_DESCRIPTION_EVENT. Do not call StripChecksum before

go/mysql/binlog_event_common.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ import (
4747
// +----------------------------+
4848
// | extra_headers 19 : x-19 |
4949
// +============================+
50-
// http://dev.mysql.com/doc/internals/en/event-header-fields.html
50+
// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_binlog_event_header
5151
type binlogEvent []byte
5252

5353
const (
@@ -121,8 +121,9 @@ func (ev binlogEvent) Length() uint32 {
121121
}
122122

123123
// NextPosition returns the nextPosition field from the header
124-
func (ev binlogEvent) NextPosition() uint32 {
125-
return binary.LittleEndian.Uint32(ev.Bytes()[13 : 13+4])
124+
func (ev binlogEvent) NextPosition() uint64 {
125+
// Only 4 bytes are used for the next_position field in the header.
126+
return uint64(binary.LittleEndian.Uint32(ev.Bytes()[13:17]))
126127
}
127128

128129
// IsFormatDescription implements BinlogEvent.IsFormatDescription().

go/mysql/binlog_event_filepos.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,13 @@ func (ev *filePosBinlogEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte
7575

7676
// nextPosition returns the next file position of the binlog.
7777
// If no information is available, it returns 0.
78-
func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint32 {
78+
func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint64 {
7979
if f.HeaderLength <= 13 {
8080
// Dead code. This is just a failsafe.
8181
return 0
8282
}
83-
return binary.LittleEndian.Uint32(ev.Bytes()[13:17])
83+
// The header only uses 4 bytes for the next_position.
84+
return uint64(binary.LittleEndian.Uint32(ev.Bytes()[13:17]))
8485
}
8586

8687
// rotate implements BinlogEvent.Rotate().
@@ -139,7 +140,7 @@ type filePosFakeEvent struct {
139140
timestamp uint32
140141
}
141142

142-
func (ev filePosFakeEvent) NextPosition() uint32 {
143+
func (ev filePosFakeEvent) NextPosition() uint64 {
143144
return 0
144145
}
145146

@@ -275,7 +276,7 @@ type filePosGTIDEvent struct {
275276
gtid replication.FilePosGTID
276277
}
277278

278-
func newFilePosGTIDEvent(file string, pos uint32, timestamp uint32) filePosGTIDEvent {
279+
func newFilePosGTIDEvent(file string, pos uint64, timestamp uint32) filePosGTIDEvent {
279280
return filePosGTIDEvent{
280281
filePosFakeEvent: filePosFakeEvent{
281282
timestamp: timestamp,

go/mysql/endtoend/replication_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func connectForReplication(t *testing.T, rbr bool) (*mysql.Conn, mysql.BinlogFor
8181
}
8282

8383
// Write ComBinlogDump packet with to start streaming events from here.
84-
if err := conn.WriteComBinlogDump(1, file, uint32(position), 0); err != nil {
84+
if err := conn.WriteComBinlogDump(1, file, position, 0); err != nil {
8585
t.Fatalf("WriteComBinlogDump failed: %v", err)
8686
}
8787

go/mysql/replication.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ limitations under the License.
1717
package mysql
1818

1919
import (
20+
"math"
21+
2022
"vitess.io/vitess/go/mysql/sqlerror"
2123
"vitess.io/vitess/go/vt/proto/vtrpc"
2224
"vitess.io/vitess/go/vt/vterrors"
@@ -30,9 +32,14 @@ const (
3032
// This file contains the methods related to replication.
3133

3234
// WriteComBinlogDump writes a ComBinlogDump command.
33-
// See http://dev.mysql.com/doc/internals/en/com-binlog-dump.html for syntax.
3435
// Returns a SQLError.
35-
func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint32, flags uint16) error {
36+
// See: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_binlog_dump.html
37+
func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16) error {
38+
// The binary log file position is a uint64, but the protocol command
39+
// only uses 4 bytes for the file position.
40+
if binlogPos > math.MaxUint32 {
41+
return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "binlog position %d is too large, it must fit into 32 bits", binlogPos)
42+
}
3643
c.sequence = 0
3744
length := 1 + // ComBinlogDump
3845
4 + // binlog-pos
@@ -41,7 +48,7 @@ func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlog
4148
len(binlogFilename) // binlog-filename
4249
data, pos := c.startEphemeralPacketWithHeader(length)
4350
pos = writeByte(data, pos, ComBinlogDump)
44-
pos = writeUint32(data, pos, binlogPos)
51+
pos = writeUint32(data, pos, uint32(binlogPos))
4552
pos = writeUint16(data, pos, flags)
4653
pos = writeUint32(data, pos, serverID)
4754
_ = writeEOFString(data, pos, binlogFilename)

go/mysql/replication/filepos_gtid.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@ func parseFilePosGTID(s string) (GTID, error) {
3333
return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting file:pos", s)
3434
}
3535

36-
pos, err := strconv.ParseUint(parts[1], 0, 32)
36+
pos, err := strconv.ParseUint(parts[1], 0, 64)
3737
if err != nil {
3838
return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting pos to be an integer", s)
3939
}
4040

4141
return FilePosGTID{
4242
File: parts[0],
43-
Pos: uint32(pos),
43+
Pos: pos,
4444
}, nil
4545
}
4646

@@ -56,7 +56,7 @@ func ParseFilePosGTIDSet(s string) (GTIDSet, error) {
5656
// FilePosGTID implements GTID.
5757
type FilePosGTID struct {
5858
File string
59-
Pos uint32
59+
Pos uint64
6060
}
6161

6262
// String implements GTID.String().

go/mysql/replication/filepos_gtid_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
func Test_filePosGTID_String(t *testing.T) {
2424
type fields struct {
2525
file string
26-
pos uint32
26+
pos uint64
2727
}
2828
tests := []struct {
2929
name string
@@ -35,6 +35,11 @@ func Test_filePosGTID_String(t *testing.T) {
3535
fields{file: "mysql-bin.166031", pos: 192394},
3636
"mysql-bin.166031:192394",
3737
},
38+
{
39+
"handles large position correctly",
40+
fields{file: "vt-1448040107-bin.003222", pos: 4663881395},
41+
"vt-1448040107-bin.003222:4663881395",
42+
},
3843
}
3944
for _, tt := range tests {
4045
t.Run(tt.name, func(t *testing.T) {
@@ -52,7 +57,7 @@ func Test_filePosGTID_String(t *testing.T) {
5257
func Test_filePosGTID_ContainsGTID(t *testing.T) {
5358
type fields struct {
5459
file string
55-
pos uint32
60+
pos uint64
5661
}
5762
type args struct {
5863
other GTID

go/mysql/replication_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package mysql
1818

1919
import (
20+
"math"
2021
"reflect"
2122
"testing"
2223

@@ -34,6 +35,10 @@ func TestComBinlogDump(t *testing.T) {
3435
cConn.Close()
3536
}()
3637

38+
// Try to write a ComBinlogDump packet with a position greater than 4 bytes.
39+
err := cConn.WriteComBinlogDump(1, "moofarm", math.MaxInt64, 0x0d0e)
40+
require.Error(t, err)
41+
3742
// Write ComBinlogDump packet, read it, compare.
3843
if err := cConn.WriteComBinlogDump(0x01020304, "moofarm", 0x05060708, 0x090a); err != nil {
3944
t.Fatalf("WriteComBinlogDump failed: %v", err)

go/test/endtoend/migration/migration_test.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,6 @@ three streams although only two are required. This is to show that there can exi
132132
streams from the same source. The main difference between an external source vs a vitess
133133
source is that the source proto contains an "external_mysql" field instead of keyspace and shard.
134134
That field is the key into the externalConnections section of the input yaml.
135-
136-
VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('product', 'vt_commerce', 'filter:<rules:<match:\"product\" > > external_mysql:\"product\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
137-
VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('customer', 'vt_commerce', 'filter:<rules:<match:\"customer\" > > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
138-
VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('orders', 'vt_commerce', 'filter:<rules:<match:\"orders\" > > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
139135
*/
140136
func TestMigration(t *testing.T) {
141137
yamlFile := startCluster(t)
@@ -155,7 +151,7 @@ func TestMigration(t *testing.T) {
155151
migrate(t, "customer", "commerce", []string{"customer"})
156152
migrate(t, "customer", "commerce", []string{"orders"})
157153
vttablet := keyspaces["commerce"].Shards[0].Vttablets[0].VttabletProcess
158-
waitForVReplicationToCatchup(t, vttablet, 1*time.Second)
154+
waitForVReplicationToCatchup(t, vttablet, 30*time.Second)
159155

160156
testcases := []struct {
161157
query string

go/test/endtoend/vtorc/readtopologyinstance/main_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
8989
assert.Equal(t, "ON", primaryInstance.GTIDMode)
9090
assert.Equal(t, "FULL", primaryInstance.BinlogRowImage)
9191
assert.Contains(t, primaryInstance.SelfBinlogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-bin", primary.TabletUID))
92-
assert.Greater(t, primaryInstance.SelfBinlogCoordinates.LogPos, uint32(0))
92+
assert.Greater(t, primaryInstance.SelfBinlogCoordinates.LogPos, uint64(0))
9393
assert.True(t, primaryInstance.SemiSyncPrimaryEnabled)
9494
assert.True(t, primaryInstance.SemiSyncReplicaEnabled)
9595
assert.True(t, primaryInstance.SemiSyncPrimaryStatus)
@@ -139,7 +139,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
139139
assert.Equal(t, utils.Hostname, replicaInstance.SourceHost)
140140
assert.Equal(t, primary.MySQLPort, replicaInstance.SourcePort)
141141
assert.Contains(t, replicaInstance.SelfBinlogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-bin", replica.TabletUID))
142-
assert.Greater(t, replicaInstance.SelfBinlogCoordinates.LogPos, uint32(0))
142+
assert.Greater(t, replicaInstance.SelfBinlogCoordinates.LogPos, uint64(0))
143143
assert.False(t, replicaInstance.SemiSyncPrimaryEnabled)
144144
assert.True(t, replicaInstance.SemiSyncReplicaEnabled)
145145
assert.False(t, replicaInstance.SemiSyncPrimaryStatus)
@@ -157,11 +157,11 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
157157
assert.True(t, replicaInstance.ReplicationIOThreadRuning)
158158
assert.True(t, replicaInstance.ReplicationSQLThreadRuning)
159159
assert.Equal(t, replicaInstance.ReadBinlogCoordinates.LogFile, primaryInstance.SelfBinlogCoordinates.LogFile)
160-
assert.Greater(t, replicaInstance.ReadBinlogCoordinates.LogPos, uint32(0))
160+
assert.Greater(t, replicaInstance.ReadBinlogCoordinates.LogPos, uint64(0))
161161
assert.Equal(t, replicaInstance.ExecBinlogCoordinates.LogFile, primaryInstance.SelfBinlogCoordinates.LogFile)
162-
assert.Greater(t, replicaInstance.ExecBinlogCoordinates.LogPos, uint32(0))
162+
assert.Greater(t, replicaInstance.ExecBinlogCoordinates.LogPos, uint64(0))
163163
assert.Contains(t, replicaInstance.RelaylogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-relay", replica.TabletUID))
164-
assert.Greater(t, replicaInstance.RelaylogCoordinates.LogPos, uint32(0))
164+
assert.Greater(t, replicaInstance.RelaylogCoordinates.LogPos, uint64(0))
165165
assert.Empty(t, replicaInstance.LastIOError)
166166
assert.Empty(t, replicaInstance.LastSQLError)
167167
assert.EqualValues(t, 0, replicaInstance.SQLDelay)

go/vt/vtorc/inst/analysis_dao.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
345345
a.AnalyzedInstancePhysicalEnvironment = m.GetString("physical_environment")
346346
a.AnalyzedInstanceBinlogCoordinates = BinlogCoordinates{
347347
LogFile: m.GetString("binary_log_file"),
348-
LogPos: m.GetUint32("binary_log_pos"),
348+
LogPos: m.GetUint64("binary_log_pos"),
349349
Type: BinaryLog,
350350
}
351351
isStaleBinlogCoordinates := m.GetBool("is_stale_binlog_coordinates")

go/vt/vtorc/inst/binlog.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,23 @@ const (
4040
// BinlogCoordinates described binary log coordinates in the form of log file & log position.
4141
type BinlogCoordinates struct {
4242
LogFile string
43-
LogPos uint32
43+
LogPos uint64
4444
Type BinlogType
4545
}
4646

47-
// ParseInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306
47+
// ParseBinlogCoordinates will parse a string representation such as "mysql-bin.000001:12345"
48+
// into a BinlogCoordinates struct.
4849
func ParseBinlogCoordinates(logFileLogPos string) (*BinlogCoordinates, error) {
4950
tokens := strings.SplitN(logFileLogPos, ":", 2)
5051
if len(tokens) != 2 {
5152
return nil, fmt.Errorf("ParseBinlogCoordinates: Cannot parse BinlogCoordinates from %s. Expected format is file:pos", logFileLogPos)
5253
}
5354

54-
logPos, err := strconv.ParseUint(tokens[1], 10, 32)
55+
logPos, err := strconv.ParseUint(tokens[1], 10, 64)
5556
if err != nil {
5657
return nil, fmt.Errorf("ParseBinlogCoordinates: invalid pos: %s", tokens[1])
5758
}
58-
return &BinlogCoordinates{LogFile: tokens[0], LogPos: uint32(logPos)}, nil
59+
return &BinlogCoordinates{LogFile: tokens[0], LogPos: logPos}, nil
5960
}
6061

6162
// DisplayString returns a user-friendly string representation of these coordinates
@@ -177,6 +178,6 @@ func (binlogCoordinates *BinlogCoordinates) ExtractDetachedCoordinates() (isDeta
177178
}
178179
detachedCoordinates.LogFile = detachedCoordinatesSubmatch[1]
179180
logPos, _ := strconv.ParseUint(detachedCoordinatesSubmatch[2], 10, 32)
180-
detachedCoordinates.LogPos = uint32(logPos)
181+
detachedCoordinates.LogPos = logPos
181182
return true, detachedCoordinates
182183
}

go/vt/vtorc/inst/binlog_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@ func TestPreviousFileCoordinates(t *testing.T) {
4141

4242
require.NoError(t, err)
4343
require.Equal(t, previous.LogFile, "mysql-bin.000009")
44-
require.Equal(t, previous.LogPos, uint32(0))
44+
require.Equal(t, previous.LogPos, uint64(0))
4545
}
4646

4747
func TestNextFileCoordinates(t *testing.T) {
4848
next, err := testCoordinates.NextFileCoordinates()
4949

5050
require.NoError(t, err)
5151
require.Equal(t, next.LogFile, "mysql-bin.000011")
52-
require.Equal(t, next.LogPos, uint32(0))
52+
require.Equal(t, next.LogPos, uint64(0))
5353
}
5454

5555
func TestBinlogCoordinates(t *testing.T) {

go/vt/vtorc/inst/instance_dao.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -514,14 +514,14 @@ func readInstanceRow(m sqlutils.RowMap) *Instance {
514514
instance.GtidErrant = m.GetString("gtid_errant")
515515
instance.UsingMariaDBGTID = m.GetBool("mariadb_gtid")
516516
instance.SelfBinlogCoordinates.LogFile = m.GetString("binary_log_file")
517-
instance.SelfBinlogCoordinates.LogPos = m.GetUint32("binary_log_pos")
517+
instance.SelfBinlogCoordinates.LogPos = m.GetUint64("binary_log_pos")
518518
instance.ReadBinlogCoordinates.LogFile = m.GetString("source_log_file")
519-
instance.ReadBinlogCoordinates.LogPos = m.GetUint32("read_source_log_pos")
519+
instance.ReadBinlogCoordinates.LogPos = m.GetUint64("read_source_log_pos")
520520
instance.ExecBinlogCoordinates.LogFile = m.GetString("relay_source_log_file")
521-
instance.ExecBinlogCoordinates.LogPos = m.GetUint32("exec_source_log_pos")
521+
instance.ExecBinlogCoordinates.LogPos = m.GetUint64("exec_source_log_pos")
522522
instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates()
523523
instance.RelaylogCoordinates.LogFile = m.GetString("relay_log_file")
524-
instance.RelaylogCoordinates.LogPos = m.GetUint32("relay_log_pos")
524+
instance.RelaylogCoordinates.LogPos = m.GetUint64("relay_log_pos")
525525
instance.RelaylogCoordinates.Type = RelayLog
526526
instance.LastSQLError = m.GetString("last_sql_error")
527527
instance.LastIOError = m.GetString("last_io_error")

go/vt/wrangler/testlib/emergency_reparent_shard_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ func TestEmergencyReparentShard(t *testing.T) {
132132
},
133133
},
134134
})
135-
goodReplica1RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:455")
135+
goodReplica1RelayLogPos, err := replication.ParseFilePosGTIDSet("relay-bin.003222:18321744073709551612") // Requires all 64 bits or uint64
136+
require.NoError(t, err)
136137
goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{
137138
GTIDSet: goodReplica1RelayLogPos,
138139
}
@@ -181,7 +182,7 @@ func TestEmergencyReparentShard(t *testing.T) {
181182

182183
// run EmergencyReparentShard
183184
waitReplicaTimeout := time.Second * 2
184-
err := vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard,
185+
err = vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard,
185186
topoproto.TabletAliasString(newPrimary.Tablet.Alias)})
186187
require.NoError(t, err)
187188
// check what was run

0 commit comments

Comments
 (0)