Skip to content

Commit

Permalink
Throttler/vreplication: fix app name used by VPlayer (#16578)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach authored Aug 11, 2024
1 parent bf0c5f8 commit f68e62d
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 1 deletion.
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
timeLastSaved: time.Now(),
tablePlans: make(map[string]*TablePlan),
phase: phase,
throttlerAppName: throttlerapp.VCopierName.ConcatenateString(vr.throttlerAppName()),
throttlerAppName: throttlerapp.VPlayerName.ConcatenateString(vr.throttlerAppName()),
query: queryFunc,
commit: commitFunc,
batchMode: batchMode,
Expand Down
57 changes: 57 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vreplicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/mysqlctl"
Expand Down Expand Up @@ -815,3 +816,59 @@ func waitForQueryResult(t *testing.T, dbc binlogplayer.DBClient, query, val stri
}
}
}

func TestThrottlerAppNames(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

tablet := addTablet(100)
defer deleteTablet(tablet)
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
}},
}
bls := &binlogdatapb.BinlogSource{
Keyspace: env.KeyspaceName,
Shard: env.ShardName,
Filter: filter,
}
id := int32(1)
vsclient := newTabletConnector(tablet)
stats := binlogplayer.NewStats()
defer stats.Stop()
dbClient := playerEngine.dbClientFactoryFiltered()
err := dbClient.Connect()
require.NoError(t, err)
defer dbClient.Close()
dbName := dbClient.DBName()
// Ensure there's a dummy vreplication workflow record
_, err = dbClient.ExecuteFetch(fmt.Sprintf("insert into _vt.vreplication (id, workflow, source, pos, max_tps, max_replication_lag, time_updated, transaction_timestamp, state, db_name, options) values (%d, 'test_workflow', '', '', 99999, 99999, 0, 0, 'Running', '%s', '{}') on duplicate key update workflow='test', source='', pos='', max_tps=99999, max_replication_lag=99999, time_updated=0, transaction_timestamp=0, state='Running', db_name='%s'",
id, dbName, dbName), 1)
require.NoError(t, err)
defer func() {
_, err = dbClient.ExecuteFetch(fmt.Sprintf("delete from _vt.vreplication where id = %d", id), 1)
require.NoError(t, err)
}()
vr := newVReplicator(id, bls, vsclient, stats, dbClient, env.Mysqld, playerEngine)
settings, _, err := vr.loadSettings(ctx, newVDBClient(dbClient, stats))
require.NoError(t, err)

throttlerAppName := vr.throttlerAppName()
assert.Contains(t, throttlerAppName, "test_workflow")
assert.Contains(t, throttlerAppName, "vreplication")
assert.NotContains(t, throttlerAppName, "vcopier")
assert.NotContains(t, throttlerAppName, "vplayer")

vp := newVPlayer(vr, settings, nil, replication.Position{}, "")
assert.Contains(t, vp.throttlerAppName, "test_workflow")
assert.Contains(t, vp.throttlerAppName, "vreplication")
assert.Contains(t, vp.throttlerAppName, "vplayer")
assert.NotContains(t, vp.throttlerAppName, "vcopier")

vc := newVCopier(vr)
assert.Contains(t, vc.throttlerAppName, "test_workflow")
assert.Contains(t, vc.throttlerAppName, "vreplication")
assert.Contains(t, vc.throttlerAppName, "vcopier")
assert.NotContains(t, vc.throttlerAppName, "vplayer")
}

0 comments on commit f68e62d

Please sign in to comment.