Skip to content

Commit

Permalink
apply patch 11325 (#137)
Browse files Browse the repository at this point in the history
  • Loading branch information
pbibra authored Oct 20, 2023
1 parent 51c1507 commit c14d752
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 35 deletions.
156 changes: 156 additions & 0 deletions go/test/endtoend/vreplication/vschema_load_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
Copyright 2022 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vreplication

import (
"context"
"fmt"
"net"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

// TestVSchemaChangesUnderLoad tests vstreamer under a load of high binlog events and simultaneous multiple vschema changes
// see https://github.com/vitessio/vitess/issues/11169
func TestVSchemaChangesUnderLoad(t *testing.T) {

extendedTimeout := defaultTimeout * 4

defaultCellName := "zone1"
allCells := []string{"zone1"}
allCellNames = "zone1"
vc = NewVitessCluster(t, "TestVSchemaChanges", allCells, mainClusterConfig)

require.NotNil(t, vc)

defer vc.TearDown(t)

defaultCell = vc.Cells[defaultCellName]
vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, 1, 0, 100, sourceKsOpts)
vtgate = defaultCell.Vtgates[0]
require.NotNil(t, vtgate)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.primary", "product", "0"), 1)
vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.replica", "product", "0"), 1)
vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()

// ch is used to signal that there is significant data inserted into the tables and when a lot of vschema changes have been applied
ch := make(chan bool, 1)

ctx := context.Background()
initialDataInserted := false
startCid := 100
warmupRowCount := startCid + 2000
insertData := func() {
timer := time.NewTimer(extendedTimeout)
defer timer.Stop()
log.Infof("Inserting data into customer")
cid := startCid
for {
if !initialDataInserted && cid > warmupRowCount {
log.Infof("Done inserting initial data into customer")
initialDataInserted = true
ch <- true
}
query := fmt.Sprintf("insert into customer(cid, name) values (%d, 'a')", cid)
_, _ = vtgateConn.ExecuteFetch(query, 1, false)
cid++
query = "update customer set name = concat(name, 'a')"
_, _ = vtgateConn.ExecuteFetch(query, 10000, false)
select {
case <-timer.C:
log.Infof("Done inserting data into customer")
return
default:
}
}
}
go func() {
log.Infof("Starting to vstream from replica")
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "product",
Shard: "0",
Gtid: "",
}}}

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "customer",
Filter: "select * from customer",
}},
}
conn, err := vtgateconn.Dial(ctx, net.JoinHostPort("localhost", strconv.Itoa(vc.ClusterConfig.vtgateGrpcPort)))
require.NoError(t, err)
defer conn.Close()

flags := &vtgatepb.VStreamFlags{}

ctx2, cancel := context.WithTimeout(ctx, extendedTimeout/2)
defer cancel()
reader, err := conn.VStream(ctx2, topodatapb.TabletType_REPLICA, vgtid, filter, flags)
require.NoError(t, err)
_, err = reader.Recv()
require.NoError(t, err)
log.Infof("About to sleep in vstreaming to block the vstream Recv() channel")
time.Sleep(extendedTimeout)
log.Infof("Done vstreaming")
}()

go insertData()
<-ch // wait for enough data to be inserted before ApplyVSchema
const maxApplyVSchemas = 20
go func() {
numApplyVSchema := 0
timer := time.NewTimer(extendedTimeout)
defer timer.Stop()
log.Infof("Started ApplyVSchema")
for {
if err := vc.VtctlClient.ExecuteCommand("ApplyVSchema", "--", "--vschema={}", "product"); err != nil {
log.Errorf("ApplyVSchema command failed with %+v\n", err)
return
}
numApplyVSchema++
if numApplyVSchema > maxApplyVSchemas {
ch <- true
}
select {
case <-timer.C:
log.Infof("Done ApplyVSchema")
ch <- true
return
default:
time.Sleep(defaultTick)
}
}
}()

<-ch // wait for enough ApplyVSchema calls before doing a PRS
if err := vc.VtctlClient.ExecuteCommand("PlannedReparentShard", "--", "--keyspace_shard", "product/0",
"--new_primary", "zone1-101", "--wait_replicas_timeout", defaultTimeout.String()); err != nil {
require.NoError(t, err, "PlannedReparentShard command failed")
}
}
44 changes: 20 additions & 24 deletions go/vt/vttablet/tabletserver/vstreamer/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/http"
"strings"
"sync"
"sync/atomic"
"time"

"vitess.io/vitess/go/vt/dbconfigs"
Expand All @@ -45,6 +46,7 @@ import (

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const (
Expand All @@ -67,7 +69,7 @@ type Engine struct {
wg sync.WaitGroup

mu sync.Mutex
isOpen bool
isOpen int32 // 0 or 1 in place of atomic.Bool added in go 1.19
streamIdx int
streamers map[int]*uvstreamer
rowStreamers map[int]*rowStreamer
Expand Down Expand Up @@ -150,30 +152,24 @@ func (vse *Engine) InitDBConfig(keyspace, shard string) {

// Open starts the Engine service.
func (vse *Engine) Open() {
vse.mu.Lock()
defer vse.mu.Unlock()
if vse.isOpen {
return
}
log.Info("VStreamer: opening")
vse.isOpen = true
// If it's not already open, then open it now.
atomic.CompareAndSwapInt32(&vse.isOpen, 0, 1)
}

// IsOpen checks if the engine is opened
func (vse *Engine) IsOpen() bool {
vse.mu.Lock()
defer vse.mu.Unlock()
return vse.isOpen
return atomic.LoadInt32(&vse.isOpen) == 1
}

// Close closes the Engine service.
func (vse *Engine) Close() {
func() {
vse.mu.Lock()
defer vse.mu.Unlock()
if !vse.isOpen {
if atomic.LoadInt32(&vse.isOpen) == 0 {
return
}
vse.mu.Lock()
defer vse.mu.Unlock()
// cancels are non-blocking.
for _, s := range vse.streamers {
s.Cancel()
Expand All @@ -184,7 +180,7 @@ func (vse *Engine) Close() {
for _, s := range vse.resultStreamers {
s.Cancel()
}
vse.isOpen = false
atomic.StoreInt32(&vse.isOpen, 0)
}()

// Wait only after releasing the lock because the end of every
Expand All @@ -209,11 +205,11 @@ func (vse *Engine) Stream(ctx context.Context, startPos string, tablePKs []*binl

// Create stream and add it to the map.
streamer, idx, err := func() (*uvstreamer, int, error) {
vse.mu.Lock()
defer vse.mu.Unlock()
if !vse.isOpen {
if atomic.LoadInt32(&vse.isOpen) == 0 {
return nil, 0, errors.New("VStreamer is not open")
}
vse.mu.Lock()
defer vse.mu.Unlock()
streamer := newUVStreamer(ctx, vse, vse.env.Config().DB.FilteredWithDB(), vse.se, startPos, tablePKs, filter, vse.lvschema, send)
idx := vse.streamIdx
vse.streamers[idx] = streamer
Expand Down Expand Up @@ -250,11 +246,11 @@ func (vse *Engine) StreamRows(ctx context.Context, query string, lastpk []sqltyp

// Create stream and add it to the map.
rowStreamer, idx, err := func() (*rowStreamer, int, error) {
vse.mu.Lock()
defer vse.mu.Unlock()
if !vse.isOpen {
if atomic.LoadInt32(&vse.isOpen) == 0 {
return nil, 0, errors.New("VStreamer is not open")
}
vse.mu.Lock()
defer vse.mu.Unlock()

rowStreamer := newRowStreamer(ctx, vse.env.Config().DB.FilteredWithDB(), vse.se, query, lastpk, vse.lvschema, send, vse)
idx := vse.streamIdx
Expand Down Expand Up @@ -285,11 +281,11 @@ func (vse *Engine) StreamRows(ctx context.Context, query string, lastpk []sqltyp
func (vse *Engine) StreamResults(ctx context.Context, query string, send func(*binlogdatapb.VStreamResultsResponse) error) error {
// Create stream and add it to the map.
resultStreamer, idx, err := func() (*resultStreamer, int, error) {
vse.mu.Lock()
defer vse.mu.Unlock()
if !vse.isOpen {
if atomic.LoadInt32(&vse.isOpen) == 0 {
return nil, 0, errors.New("VStreamer is not open")
}
vse.mu.Lock()
defer vse.mu.Unlock()
resultStreamer := newResultStreamer(ctx, vse.env.Config().DB.FilteredWithDB(), query, send, vse)
idx := vse.streamIdx
vse.resultStreamers[idx] = resultStreamer
Expand Down Expand Up @@ -443,7 +439,7 @@ func (vse *Engine) waitForMySQL(ctx context.Context, db dbconfigs.Connector, tab
}
select {
case <-ctx.Done():
return ctx.Err()
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
case <-time.After(backoff):
// Exponential backoff with 1.5 as a factor
if backoff != backoffLimit {
Expand Down
40 changes: 29 additions & 11 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/timer"
vtschema "vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
Expand All @@ -40,6 +41,7 @@ import (

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const (
Expand Down Expand Up @@ -139,6 +141,12 @@ func (vs *vstreamer) SetVSchema(vschema *localVSchema) {
select {
case vs.vevents <- vschema:
case <-vs.ctx.Done():
default: // if there is a pending vschema in the channel, drain it and update it with the latest one
select {
case <-vs.vevents:
vs.vevents <- vschema
default:
}
}
}

Expand Down Expand Up @@ -278,13 +286,18 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog

injectHeartbeat := func(throttled bool) error {
now := time.Now().UnixNano()
err := bufferAndTransmit(&binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_HEARTBEAT,
Timestamp: now / 1e9,
CurrentTime: now,
Throttled: throttled,
})
return err
select {
case <-ctx.Done():
return vterrors.Errorf(vtrpcpb.Code_CANCELED, "context has expired")
default:
err := bufferAndTransmit(&binlogdatapb.VEvent{
Type: binlogdatapb.VEventType_HEARTBEAT,
Timestamp: now / 1e9,
CurrentTime: now,
Throttled: throttled,
})
return err
}
}

throttleEvents := func(throttledEvents chan mysql.BinlogEvent) {
Expand Down Expand Up @@ -361,11 +374,16 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog
}
}
case vs.vschema = <-vs.vevents:
if err := vs.rebuildPlans(); err != nil {
return err
select {
case <-ctx.Done():
return nil
default:
if err := vs.rebuildPlans(); err != nil {
return err
}
// Increment this counter for testing.
vschemaUpdateCount.Add(1)
}
// Increment this counter for testing.
vschemaUpdateCount.Add(1)
case <-ctx.Done():
return nil
case <-hbTimer.C:
Expand Down
9 changes: 9 additions & 0 deletions test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -961,6 +961,15 @@
"RetryMax": 2,
"Tags": []
},
"vreplication_vschema_load": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestVSchemaChangesUnderLoad"],
"Command": [],
"Manual": false,
"Shard": "vreplication_cellalias",
"RetryMax": 2,
"Tags": []
},
"vreplication_basic": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestBasicVreplicationWorkflow"],
Expand Down

0 comments on commit c14d752

Please sign in to comment.