Skip to content

Commit 9451be5

Browse files
[release-20.0] Use uint64 for binary log file position (#17472) (#17506)
Signed-off-by: Matt Lord <mattalord@gmail.com> Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
1 parent 8f1dad7 commit 9451be5

File tree

14 files changed

+61
-45
lines changed

14 files changed

+61
-45
lines changed

go/mysql/binlog_event.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ type BinlogEvent interface {
4848
IsValid() bool
4949

5050
// General protocol events.
51-
NextPosition() uint32
51+
NextPosition() uint64
5252

5353
// IsFormatDescription returns true if this is a
5454
// FORMAT_DESCRIPTION_EVENT. Do not call StripChecksum before

go/mysql/binlog_event_common.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ import (
4747
// +----------------------------+
4848
// | extra_headers 19 : x-19 |
4949
// +============================+
50-
// http://dev.mysql.com/doc/internals/en/event-header-fields.html
50+
// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_binlog_event_header
5151
type binlogEvent []byte
5252

5353
const (
@@ -121,8 +121,9 @@ func (ev binlogEvent) Length() uint32 {
121121
}
122122

123123
// NextPosition returns the nextPosition field from the header
124-
func (ev binlogEvent) NextPosition() uint32 {
125-
return binary.LittleEndian.Uint32(ev.Bytes()[13 : 13+4])
124+
func (ev binlogEvent) NextPosition() uint64 {
125+
// Only 4 bytes are used for the next_position field in the header.
126+
return uint64(binary.LittleEndian.Uint32(ev.Bytes()[13:17]))
126127
}
127128

128129
// IsFormatDescription implements BinlogEvent.IsFormatDescription().

go/mysql/binlog_event_filepos.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,13 @@ func (ev *filePosBinlogEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte
7575

7676
// nextPosition returns the next file position of the binlog.
7777
// If no information is available, it returns 0.
78-
func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint32 {
78+
func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint64 {
7979
if f.HeaderLength <= 13 {
8080
// Dead code. This is just a failsafe.
8181
return 0
8282
}
83-
return binary.LittleEndian.Uint32(ev.Bytes()[13:17])
83+
// The header only uses 4 bytes for the next_position.
84+
return uint64(binary.LittleEndian.Uint32(ev.Bytes()[13:17]))
8485
}
8586

8687
// rotate implements BinlogEvent.Rotate().
@@ -139,7 +140,7 @@ type filePosFakeEvent struct {
139140
timestamp uint32
140141
}
141142

142-
func (ev filePosFakeEvent) NextPosition() uint32 {
143+
func (ev filePosFakeEvent) NextPosition() uint64 {
143144
return 0
144145
}
145146

@@ -275,7 +276,7 @@ type filePosGTIDEvent struct {
275276
gtid replication.FilePosGTID
276277
}
277278

278-
func newFilePosGTIDEvent(file string, pos uint32, timestamp uint32) filePosGTIDEvent {
279+
func newFilePosGTIDEvent(file string, pos uint64, timestamp uint32) filePosGTIDEvent {
279280
return filePosGTIDEvent{
280281
filePosFakeEvent: filePosFakeEvent{
281282
timestamp: timestamp,

go/mysql/replication.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package mysql
1818

1919
import (
2020
"fmt"
21+
"math"
2122

2223
"vitess.io/vitess/go/mysql/sqlerror"
2324
"vitess.io/vitess/go/vt/proto/vtrpc"
@@ -32,9 +33,14 @@ const (
3233
// This file contains the methods related to replication.
3334

3435
// WriteComBinlogDump writes a ComBinlogDump command.
35-
// See http://dev.mysql.com/doc/internals/en/com-binlog-dump.html for syntax.
3636
// Returns a SQLError.
37-
func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint32, flags uint16) error {
37+
// See: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_binlog_dump.html
38+
func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16) error {
39+
// The binary log file position is a uint64, but the protocol command
40+
// only uses 4 bytes for the file position.
41+
if binlogPos > math.MaxUint32 {
42+
return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "binlog position %d is too large, it must fit into 32 bits", binlogPos)
43+
}
3844
c.sequence = 0
3945
length := 1 + // ComBinlogDump
4046
4 + // binlog-pos
@@ -43,7 +49,7 @@ func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlog
4349
len(binlogFilename) // binlog-filename
4450
data, pos := c.startEphemeralPacketWithHeader(length)
4551
pos = writeByte(data, pos, ComBinlogDump)
46-
pos = writeUint32(data, pos, binlogPos)
52+
pos = writeUint32(data, pos, uint32(binlogPos))
4753
pos = writeUint16(data, pos, flags)
4854
pos = writeUint32(data, pos, serverID)
4955
_ = writeEOFString(data, pos, binlogFilename)

go/mysql/replication/filepos_gtid.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,14 @@ func parseFilePosGTID(s string) (GTID, error) {
3333
return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting file:pos", s)
3434
}
3535

36-
pos, err := strconv.ParseUint(parts[1], 0, 32)
36+
pos, err := strconv.ParseUint(parts[1], 0, 64)
3737
if err != nil {
3838
return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting pos to be an integer", s)
3939
}
4040

4141
return FilePosGTID{
4242
File: parts[0],
43-
Pos: uint32(pos),
43+
Pos: pos,
4444
}, nil
4545
}
4646

@@ -56,7 +56,7 @@ func ParseFilePosGTIDSet(s string) (GTIDSet, error) {
5656
// FilePosGTID implements GTID.
5757
type FilePosGTID struct {
5858
File string
59-
Pos uint32
59+
Pos uint64
6060
}
6161

6262
// String implements GTID.String().

go/mysql/replication/filepos_gtid_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323
func Test_filePosGTID_String(t *testing.T) {
2424
type fields struct {
2525
file string
26-
pos uint32
26+
pos uint64
2727
}
2828
tests := []struct {
2929
name string
@@ -35,6 +35,11 @@ func Test_filePosGTID_String(t *testing.T) {
3535
fields{file: "mysql-bin.166031", pos: 192394},
3636
"mysql-bin.166031:192394",
3737
},
38+
{
39+
"handles large position correctly",
40+
fields{file: "vt-1448040107-bin.003222", pos: 4663881395},
41+
"vt-1448040107-bin.003222:4663881395",
42+
},
3843
}
3944
for _, tt := range tests {
4045
t.Run(tt.name, func(t *testing.T) {
@@ -52,7 +57,7 @@ func Test_filePosGTID_String(t *testing.T) {
5257
func Test_filePosGTID_ContainsGTID(t *testing.T) {
5358
type fields struct {
5459
file string
55-
pos uint32
60+
pos uint64
5661
}
5762
type args struct {
5863
other GTID

go/mysql/replication_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package mysql
1818

1919
import (
20+
"math"
2021
"reflect"
2122
"testing"
2223

@@ -34,6 +35,10 @@ func TestComBinlogDump(t *testing.T) {
3435
cConn.Close()
3536
}()
3637

38+
// Try to write a ComBinlogDump packet with a position greater than 4 bytes.
39+
err := cConn.WriteComBinlogDump(1, "moofarm", math.MaxInt64, 0x0d0e)
40+
require.Error(t, err)
41+
3742
// Write ComBinlogDump packet, read it, compare.
3843
if err := cConn.WriteComBinlogDump(0x01020304, "moofarm", 0x05060708, 0x090a); err != nil {
3944
t.Fatalf("WriteComBinlogDump failed: %v", err)

go/test/endtoend/migration/migration_test.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,6 @@ three streams although only two are required. This is to show that there can exi
132132
streams from the same source. The main difference between an external source vs a vitess
133133
source is that the source proto contains an "external_mysql" field instead of keyspace and shard.
134134
That field is the key into the externalConnections section of the input yaml.
135-
136-
VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('product', 'vt_commerce', 'filter:<rules:<match:\"product\" > > external_mysql:\"product\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
137-
VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('customer', 'vt_commerce', 'filter:<rules:<match:\"customer\" > > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
138-
VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('orders', 'vt_commerce', 'filter:<rules:<match:\"orders\" > > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
139135
*/
140136
func TestMigration(t *testing.T) {
141137
yamlFile := startCluster(t)
@@ -155,7 +151,7 @@ func TestMigration(t *testing.T) {
155151
migrate(t, "customer", "commerce", []string{"customer"})
156152
migrate(t, "customer", "commerce", []string{"orders"})
157153
vttablet := keyspaces["commerce"].Shards[0].Vttablets[0].VttabletProcess
158-
waitForVReplicationToCatchup(t, vttablet, 1*time.Second)
154+
waitForVReplicationToCatchup(t, vttablet, 30*time.Second)
159155

160156
testcases := []struct {
161157
query string
@@ -217,11 +213,11 @@ func migrate(t *testing.T, fromdb, toks string, tables []string) {
217213
var sqlEscaped bytes.Buffer
218214
val.EncodeSQL(&sqlEscaped)
219215
query := fmt.Sprintf("insert into _vt.vreplication "+
220-
"(workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values"+
221-
"('%s', '%s', %s, '', 9999, 9999, 'primary', 0, 0, 'Running')", tables[0], "vt_"+toks, sqlEscaped.String())
222-
fmt.Printf("VReplicationExec: %s\n", query)
223-
vttablet := keyspaces[toks].Shards[0].Vttablets[0].VttabletProcess
224-
err := clusterInstance.VtctldClientProcess.ExecuteCommand("VReplicationExec", vttablet.TabletPath, query)
216+
"(workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state, options) values"+
217+
"('%s', '%s', %s, '', 9999, 9999, 'primary', 0, 0, 'Running', '{}')", tables[0], "vt_"+toks, sqlEscaped.String())
218+
fmt.Printf("VReplication insert: %s\n", query)
219+
vttablet := keyspaces[toks].Shards[0].Vttablets[0].Alias
220+
err := clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", vttablet, query)
225221
require.NoError(t, err)
226222
}
227223

go/test/endtoend/vtorc/readtopologyinstance/main_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
8787
assert.Equal(t, "ON", primaryInstance.GTIDMode)
8888
assert.Equal(t, "FULL", primaryInstance.BinlogRowImage)
8989
assert.Contains(t, primaryInstance.SelfBinlogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-bin", primary.TabletUID))
90-
assert.Greater(t, primaryInstance.SelfBinlogCoordinates.LogPos, uint32(0))
90+
assert.Greater(t, primaryInstance.SelfBinlogCoordinates.LogPos, uint64(0))
9191
assert.True(t, primaryInstance.SemiSyncPrimaryEnabled)
9292
assert.True(t, primaryInstance.SemiSyncReplicaEnabled)
9393
assert.True(t, primaryInstance.SemiSyncPrimaryStatus)
@@ -139,7 +139,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
139139
assert.Equal(t, utils.Hostname, replicaInstance.SourceHost)
140140
assert.Equal(t, primary.MySQLPort, replicaInstance.SourcePort)
141141
assert.Contains(t, replicaInstance.SelfBinlogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-bin", replica.TabletUID))
142-
assert.Greater(t, replicaInstance.SelfBinlogCoordinates.LogPos, uint32(0))
142+
assert.Greater(t, replicaInstance.SelfBinlogCoordinates.LogPos, uint64(0))
143143
assert.False(t, replicaInstance.SemiSyncPrimaryEnabled)
144144
assert.True(t, replicaInstance.SemiSyncReplicaEnabled)
145145
assert.False(t, replicaInstance.SemiSyncPrimaryStatus)
@@ -157,11 +157,11 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
157157
assert.True(t, replicaInstance.ReplicationIOThreadRuning)
158158
assert.True(t, replicaInstance.ReplicationSQLThreadRuning)
159159
assert.Equal(t, replicaInstance.ReadBinlogCoordinates.LogFile, primaryInstance.SelfBinlogCoordinates.LogFile)
160-
assert.Greater(t, replicaInstance.ReadBinlogCoordinates.LogPos, uint32(0))
160+
assert.Greater(t, replicaInstance.ReadBinlogCoordinates.LogPos, uint64(0))
161161
assert.Equal(t, replicaInstance.ExecBinlogCoordinates.LogFile, primaryInstance.SelfBinlogCoordinates.LogFile)
162-
assert.Greater(t, replicaInstance.ExecBinlogCoordinates.LogPos, uint32(0))
162+
assert.Greater(t, replicaInstance.ExecBinlogCoordinates.LogPos, uint64(0))
163163
assert.Contains(t, replicaInstance.RelaylogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-relay", replica.TabletUID))
164-
assert.Greater(t, replicaInstance.RelaylogCoordinates.LogPos, uint32(0))
164+
assert.Greater(t, replicaInstance.RelaylogCoordinates.LogPos, uint64(0))
165165
assert.Empty(t, replicaInstance.LastIOError)
166166
assert.Empty(t, replicaInstance.LastSQLError)
167167
assert.EqualValues(t, 0, replicaInstance.SQLDelay)

go/vt/vtorc/inst/analysis_dao.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
319319
a.AnalyzedInstancePrimaryAlias = topoproto.TabletAliasString(primaryTablet.Alias)
320320
a.AnalyzedInstanceBinlogCoordinates = BinlogCoordinates{
321321
LogFile: m.GetString("binary_log_file"),
322-
LogPos: m.GetUint32("binary_log_pos"),
322+
LogPos: m.GetUint64("binary_log_pos"),
323323
Type: BinaryLog,
324324
}
325325
isStaleBinlogCoordinates := m.GetBool("is_stale_binlog_coordinates")

go/vt/vtorc/inst/binlog.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,22 +40,23 @@ const (
4040
// BinlogCoordinates described binary log coordinates in the form of log file & log position.
4141
type BinlogCoordinates struct {
4242
LogFile string
43-
LogPos uint32
43+
LogPos uint64
4444
Type BinlogType
4545
}
4646

47-
// ParseInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306
47+
// ParseBinlogCoordinates will parse a string representation such as "mysql-bin.000001:12345"
48+
// into a BinlogCoordinates struct.
4849
func ParseBinlogCoordinates(logFileLogPos string) (*BinlogCoordinates, error) {
4950
tokens := strings.SplitN(logFileLogPos, ":", 2)
5051
if len(tokens) != 2 {
5152
return nil, fmt.Errorf("ParseBinlogCoordinates: Cannot parse BinlogCoordinates from %s. Expected format is file:pos", logFileLogPos)
5253
}
5354

54-
logPos, err := strconv.ParseUint(tokens[1], 10, 32)
55+
logPos, err := strconv.ParseUint(tokens[1], 10, 64)
5556
if err != nil {
5657
return nil, fmt.Errorf("ParseBinlogCoordinates: invalid pos: %s", tokens[1])
5758
}
58-
return &BinlogCoordinates{LogFile: tokens[0], LogPos: uint32(logPos)}, nil
59+
return &BinlogCoordinates{LogFile: tokens[0], LogPos: logPos}, nil
5960
}
6061

6162
// DisplayString returns a user-friendly string representation of these coordinates
@@ -177,6 +178,6 @@ func (binlogCoordinates *BinlogCoordinates) ExtractDetachedCoordinates() (isDeta
177178
}
178179
detachedCoordinates.LogFile = detachedCoordinatesSubmatch[1]
179180
logPos, _ := strconv.ParseUint(detachedCoordinatesSubmatch[2], 10, 32)
180-
detachedCoordinates.LogPos = uint32(logPos)
181+
detachedCoordinates.LogPos = logPos
181182
return true, detachedCoordinates
182183
}

go/vt/vtorc/inst/binlog_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,15 @@ func TestPreviousFileCoordinates(t *testing.T) {
4141

4242
require.NoError(t, err)
4343
require.Equal(t, previous.LogFile, "mysql-bin.000009")
44-
require.Equal(t, previous.LogPos, uint32(0))
44+
require.Equal(t, previous.LogPos, uint64(0))
4545
}
4646

4747
func TestNextFileCoordinates(t *testing.T) {
4848
next, err := testCoordinates.NextFileCoordinates()
4949

5050
require.NoError(t, err)
5151
require.Equal(t, next.LogFile, "mysql-bin.000011")
52-
require.Equal(t, next.LogPos, uint32(0))
52+
require.Equal(t, next.LogPos, uint64(0))
5353
}
5454

5555
func TestBinlogCoordinates(t *testing.T) {

go/vt/vtorc/inst/instance_dao.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -511,14 +511,14 @@ func readInstanceRow(m sqlutils.RowMap) *Instance {
511511
instance.GtidErrant = m.GetString("gtid_errant")
512512
instance.UsingMariaDBGTID = m.GetBool("mariadb_gtid")
513513
instance.SelfBinlogCoordinates.LogFile = m.GetString("binary_log_file")
514-
instance.SelfBinlogCoordinates.LogPos = m.GetUint32("binary_log_pos")
514+
instance.SelfBinlogCoordinates.LogPos = m.GetUint64("binary_log_pos")
515515
instance.ReadBinlogCoordinates.LogFile = m.GetString("source_log_file")
516-
instance.ReadBinlogCoordinates.LogPos = m.GetUint32("read_source_log_pos")
516+
instance.ReadBinlogCoordinates.LogPos = m.GetUint64("read_source_log_pos")
517517
instance.ExecBinlogCoordinates.LogFile = m.GetString("relay_source_log_file")
518-
instance.ExecBinlogCoordinates.LogPos = m.GetUint32("exec_source_log_pos")
518+
instance.ExecBinlogCoordinates.LogPos = m.GetUint64("exec_source_log_pos")
519519
instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates()
520520
instance.RelaylogCoordinates.LogFile = m.GetString("relay_log_file")
521-
instance.RelaylogCoordinates.LogPos = m.GetUint32("relay_log_pos")
521+
instance.RelaylogCoordinates.LogPos = m.GetUint64("relay_log_pos")
522522
instance.RelaylogCoordinates.Type = RelayLog
523523
instance.LastSQLError = m.GetString("last_sql_error")
524524
instance.LastIOError = m.GetString("last_io_error")

go/vt/wrangler/testlib/emergency_reparent_shard_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,8 @@ func TestEmergencyReparentShard(t *testing.T) {
132132
},
133133
},
134134
})
135-
goodReplica1RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:455")
135+
goodReplica1RelayLogPos, err := replication.ParseFilePosGTIDSet("relay-bin.003222:18321744073709551612") // Requires all 64 bits or uint64
136+
require.NoError(t, err)
136137
goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{
137138
GTIDSet: goodReplica1RelayLogPos,
138139
}
@@ -181,7 +182,7 @@ func TestEmergencyReparentShard(t *testing.T) {
181182

182183
// run EmergencyReparentShard
183184
waitReplicaTimeout := time.Second * 2
184-
err := vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard,
185+
err = vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard,
185186
topoproto.TabletAliasString(newPrimary.Tablet.Alias)})
186187
require.NoError(t, err)
187188
// check what was run

0 commit comments

Comments
 (0)