diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_topology_watcher_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_topology_watcher_test.go deleted file mode 100644 index 163c4c44d4d..00000000000 --- a/go/vt/vttablet/tabletserver/txthrottler/mock_topology_watcher_test.go +++ /dev/null @@ -1,58 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler (interfaces: TopologyWatcherInterface) - -// Package txthrottler is a generated GoMock package. -package txthrottler - -import ( - reflect "reflect" - - gomock "go.uber.org/mock/gomock" -) - -// MockTopologyWatcherInterface is a mock of TopologyWatcherInterface interface. -type MockTopologyWatcherInterface struct { - ctrl *gomock.Controller - recorder *MockTopologyWatcherInterfaceMockRecorder -} - -// MockTopologyWatcherInterfaceMockRecorder is the mock recorder for MockTopologyWatcherInterface. -type MockTopologyWatcherInterfaceMockRecorder struct { - mock *MockTopologyWatcherInterface -} - -// NewMockTopologyWatcherInterface creates a new mock instance. -func NewMockTopologyWatcherInterface(ctrl *gomock.Controller) *MockTopologyWatcherInterface { - mock := &MockTopologyWatcherInterface{ctrl: ctrl} - mock.recorder = &MockTopologyWatcherInterfaceMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockTopologyWatcherInterface) EXPECT() *MockTopologyWatcherInterfaceMockRecorder { - return m.recorder -} - -// Start mocks base method. -func (m *MockTopologyWatcherInterface) Start() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Start") -} - -// Start indicates an expected call of Start. -func (mr *MockTopologyWatcherInterfaceMockRecorder) Start() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockTopologyWatcherInterface)(nil).Start)) -} - -// Stop mocks base method. -func (m *MockTopologyWatcherInterface) Stop() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Stop") -} - -// Stop indicates an expected call of Stop. -func (mr *MockTopologyWatcherInterfaceMockRecorder) Stop() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockTopologyWatcherInterface)(nil).Stop)) -} diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 18dede5f30a..596870081cb 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -38,25 +38,20 @@ import ( ) // These vars store the functions used to create the topo server, healthcheck, -// topology watchers and go/vt/throttler. These are provided here so that they can be overridden +// and go/vt/throttler. These are provided here so that they can be overridden // in tests to generate mocks. type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck -type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) var ( - healthCheckFactory healthCheckFactoryFunc - topologyWatcherFactory topologyWatcherFactoryFunc - throttlerFactory throttlerFactoryFunc + healthCheckFactory healthCheckFactoryFunc + throttlerFactory throttlerFactoryFunc ) func resetTxThrottlerFactories() { healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ",")) } - topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { - return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency) - } throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now) } @@ -149,7 +144,8 @@ type txThrottler struct { topoServer *topo.Server // stats - throttlerRunning *stats.Gauge + throttlerRunning *stats.Gauge + // TODO(deepthi): deprecated, should be deleted in v20 topoWatchers *stats.GaugesWithSingleLabel healthChecksReadTotal *stats.CountersWithMultiLabels healthChecksRecordedTotal *stats.CountersWithMultiLabels @@ -170,10 +166,9 @@ type txThrottlerStateImpl struct { // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). // That method is required to be called in serial for each threadId. - throttleMu sync.Mutex - throttler ThrottlerInterface - stopHealthCheck context.CancelFunc - topologyWatchers map[string]TopologyWatcherInterface + throttleMu sync.Mutex + throttler ThrottlerInterface + stopHealthCheck context.CancelFunc healthCheck discovery.HealthCheck healthCheckChan chan *discovery.TabletHealth @@ -204,8 +199,7 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { config: config, topoServer: topoServer, throttlerRunning: env.Exporter().NewGauge(TxThrottlerName+"Running", "transaction throttler running state"), - topoWatchers: env.Exporter().NewGaugesWithSingleLabel(TxThrottlerName+"TopoWatchers", "transaction throttler topology watchers", "cell"), - healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRead", "transaction throttler healthchecks read", + topoWatchers: env.Exporter().NewGaugesWithSingleLabel(TxThrottlerName+"TopoWatchers", "DEPRECATED: transaction throttler topology watchers", "cell"), healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRead", "transaction throttler healthchecks read", []string{"cell", "DbType"}), healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRecorded", "transaction throttler healthchecks recorded", []string{"cell", "DbType"}), @@ -322,31 +316,12 @@ func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, t ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells) ts.healthCheckChan = ts.healthCheck.Subscribe() - ts.topologyWatchers = make( - map[string]TopologyWatcherInterface, len(ts.healthCheckCells)) - for _, cell := range ts.healthCheckCells { - ts.topologyWatchers[cell] = topologyWatcherFactory( - topoServer, - ts.healthCheck, - cell, - target.Keyspace, - target.Shard, - discovery.DefaultTopologyWatcherRefreshInterval, - discovery.DefaultTopoReadConcurrency, - ) - ts.txThrottler.topoWatchers.Add(cell, 1) - } } func (ts *txThrottlerStateImpl) closeHealthCheckStream() { if ts.healthCheck == nil { return } - for cell, watcher := range ts.topologyWatchers { - watcher.Stop() - ts.txThrottler.topoWatchers.Reset(cell) - } - ts.topologyWatchers = nil ts.stopHealthCheck() ts.healthCheck.Close() } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index ea57d37ad8e..268a37437d9 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -19,7 +19,6 @@ package txthrottler // Commands to generate the mocks for this test. //go:generate mockgen -destination mock_healthcheck_test.go -package txthrottler -mock_names "HealthCheck=MockHealthCheck" vitess.io/vitess/go/vt/discovery HealthCheck //go:generate mockgen -destination mock_throttler_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler ThrottlerInterface -//go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface import ( "context" @@ -74,16 +73,6 @@ func TestEnabledThrottler(t *testing.T) { return mockHealthCheck } - topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { - assert.Equal(t, ts, topoServer) - assert.Contains(t, []string{"cell1", "cell2"}, cell) - assert.Equal(t, "keyspace", keyspace) - assert.Equal(t, "shard", shard) - result := NewMockTopologyWatcherInterface(mockCtrl) - result.EXPECT().Stop() - return result - } - mockThrottler := NewMockThrottlerInterface(mockCtrl) throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { assert.Equal(t, 1, threadCount) @@ -131,7 +120,6 @@ func TestEnabledThrottler(t *testing.T) { throttlerStateImpl := throttlerImpl.state.(*txThrottlerStateImpl) assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes) assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) - assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts()) assert.False(t, throttlerImpl.Throttle(100, "some_workload")) assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some_workload"]) @@ -162,7 +150,6 @@ func TestEnabledThrottler(t *testing.T) { assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some_workload"]) throttlerImpl.Close() assert.Zero(t, throttlerImpl.throttlerRunning.Get()) - assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts()) } func TestFetchKnownCells(t *testing.T) {