Skip to content

Commit 130f090

Browse files
committed
feat: mark a tablet not serving in broadcast if the disk is stalled
Signed-off-by: Manan Gupta <manan@planetscale.com>
1 parent 00d359b commit 130f090

File tree

9 files changed

+102
-15
lines changed

9 files changed

+102
-15
lines changed

go/vt/vttablet/tabletmanager/rpc_replication.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful
6464
// Return if the disk is stalled or rejecting writes.
6565
// If the disk is stalled, we can't be sure if reads will go through
6666
// or not, so we should not run any reads either.
67-
if tm.dhMonitor.IsDiskStalled() {
67+
if tm.QueryServiceControl.IsDiskStalled() {
6868
return &replicationdatapb.FullStatus{
6969
DiskStalled: true,
7070
}, nil

go/vt/vttablet/tabletmanager/tm_init.go

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,8 @@ var (
9595
skipBuildInfoTags = "/.*/"
9696
initTags flagutil.StringMapValue
9797

98-
initTimeout = 1 * time.Minute
99-
mysqlShutdownTimeout = mysqlctl.DefaultShutdownTimeout
100-
stalledDiskWriteDir = ""
101-
stalledDiskWriteTimeout = 30 * time.Second
102-
stalledDiskWriteInterval = 5 * time.Second
98+
initTimeout = 1 * time.Minute
99+
mysqlShutdownTimeout = mysqlctl.DefaultShutdownTimeout
103100
)
104101

105102
func registerInitFlags(fs *pflag.FlagSet) {
@@ -112,9 +109,6 @@ func registerInitFlags(fs *pflag.FlagSet) {
112109
fs.Var(&initTags, "init_tags", "(init parameter) comma separated list of key:value pairs used to tag the tablet")
113110
fs.DurationVar(&initTimeout, "init_timeout", initTimeout, "(init parameter) timeout to use for the init phase.")
114111
fs.DurationVar(&mysqlShutdownTimeout, "mysql-shutdown-timeout", mysqlShutdownTimeout, "timeout to use when MySQL is being shut down.")
115-
fs.StringVar(&stalledDiskWriteDir, "disk-write-dir", stalledDiskWriteDir, "if provided, tablet will attempt to write a file to this directory to check if the disk is stalled")
116-
fs.DurationVar(&stalledDiskWriteTimeout, "disk-write-timeout", stalledDiskWriteTimeout, "if writes exceed this duration, the disk is considered stalled")
117-
fs.DurationVar(&stalledDiskWriteInterval, "disk-write-interval", stalledDiskWriteInterval, "how often to write to the disk to check whether it is stalled")
118112
}
119113

120114
var (
@@ -170,7 +164,6 @@ type TabletManager struct {
170164
VREngine *vreplication.Engine
171165
VDiffEngine *vdiff.Engine
172166
Env *vtenv.Environment
173-
dhMonitor DiskHealthMonitor
174167

175168
// tmc is used to run an RPC against other vttablets.
176169
tmc tmclient.TabletManagerClient
@@ -379,7 +372,6 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, config *tabletenv.Tabl
379372
tm.tmc = tmclient.NewTabletManagerClient()
380373
tm.tmState = newTMState(tm, tablet)
381374
tm.actionSema = semaphore.NewWeighted(1)
382-
tm.dhMonitor = newDiskHealthMonitor(tm.BatchCtx)
383375
tm._waitForGrantsComplete = make(chan struct{})
384376

385377
tm.baseTabletType = tablet.Type

go/vt/vttablet/tabletserver/controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,9 @@ type Controller interface {
122122

123123
// SetDemotePrimaryStalled marks that demote primary is stalled in the state manager.
124124
SetDemotePrimaryStalled()
125+
126+
// IsDiskStalled returns if the disk is stalled.
127+
IsDiskStalled() bool
125128
}
126129

127130
// Ensure TabletServer satisfies Controller interface.

go/vt/vttablet/tabletmanager/disk_health_monitor.go renamed to go/vt/vttablet/tabletserver/disk_health_monitor.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,20 @@
1-
package tabletmanager
1+
/*
2+
Copyright 2024 The Vitess Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package tabletserver
218

319
import (
420
"context"
@@ -7,8 +23,29 @@ import (
723
"strconv"
824
"sync"
925
"time"
26+
27+
"github.com/spf13/pflag"
28+
29+
"vitess.io/vitess/go/vt/servenv"
1030
)
1131

32+
var (
33+
stalledDiskWriteDir = ""
34+
stalledDiskWriteTimeout = 30 * time.Second
35+
stalledDiskWriteInterval = 5 * time.Second
36+
)
37+
38+
func init() {
39+
servenv.OnParseFor("vtcombo", registerInitFlags)
40+
servenv.OnParseFor("vttablet", registerInitFlags)
41+
}
42+
43+
func registerInitFlags(fs *pflag.FlagSet) {
44+
fs.StringVar(&stalledDiskWriteDir, "disk-write-dir", stalledDiskWriteDir, "if provided, tablet will attempt to write a file to this directory to check if the disk is stalled")
45+
fs.DurationVar(&stalledDiskWriteTimeout, "disk-write-timeout", stalledDiskWriteTimeout, "if writes exceed this duration, the disk is considered stalled")
46+
fs.DurationVar(&stalledDiskWriteInterval, "disk-write-interval", stalledDiskWriteInterval, "how often to write to the disk to check whether it is stalled")
47+
}
48+
1249
type DiskHealthMonitor interface {
1350
// IsDiskStalled returns true if the disk is stalled or rejecting writes.
1451
IsDiskStalled() bool

go/vt/vttablet/tabletmanager/disk_health_monitor_test.go renamed to go/vt/vttablet/tabletserver/disk_health_monitor_test.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,20 @@
1-
package tabletmanager
1+
/*
2+
Copyright 2024 The Vitess Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package tabletserver
218

319
import (
420
"context"

go/vt/vttablet/tabletserver/state_manager.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ type stateManager struct {
9797
replHealthy bool
9898
demotePrimaryStalled bool
9999
lameduck bool
100+
dhMonitor DiskHealthMonitor
100101
alsoAllow []topodatapb.TabletType
101102
reason string
102103
transitionErr error
@@ -777,7 +778,7 @@ func (sm *stateManager) IsServing() bool {
777778
}
778779

779780
func (sm *stateManager) isServingLocked() bool {
780-
return sm.state == StateServing && sm.wantState == StateServing && sm.replHealthy && !sm.demotePrimaryStalled && !sm.lameduck
781+
return sm.state == StateServing && sm.wantState == StateServing && sm.replHealthy && !sm.demotePrimaryStalled && !sm.lameduck && !sm.dhMonitor.IsDiskStalled()
781782
}
782783

783784
func (sm *stateManager) AppendDetails(details []*kv) []*kv {

go/vt/vttablet/tabletserver/state_manager_test.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ import (
4141
var testNow = time.Now()
4242

4343
func TestStateManagerStateByName(t *testing.T) {
44-
sm := &stateManager{}
44+
sm := &stateManager{
45+
dhMonitor: newNoopDiskHealthMonitor(),
46+
}
4547

4648
sm.replHealthy = true
4749
sm.wantState = StateServing
@@ -147,6 +149,29 @@ func TestStateManagerUnservePrimary(t *testing.T) {
147149
assert.Equal(t, StateNotServing, sm.state)
148150
}
149151

152+
type testDiskMonitor struct {
153+
isDiskStalled bool
154+
}
155+
156+
func (t *testDiskMonitor) IsDiskStalled() bool {
157+
return t.isDiskStalled
158+
}
159+
160+
// TestIsServingLocked tests isServingLocked() functionality.
161+
func TestIsServingLocked(t *testing.T) {
162+
sm := newTestStateManager()
163+
defer sm.StopService()
164+
tdm := &testDiskMonitor{isDiskStalled: false}
165+
sm.dhMonitor = tdm
166+
167+
err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "")
168+
require.NoError(t, err)
169+
require.True(t, sm.isServingLocked())
170+
171+
tdm.isDiskStalled = true
172+
require.False(t, sm.isServingLocked())
173+
}
174+
150175
func TestStateManagerUnserveNonPrimary(t *testing.T) {
151176
sm := newTestStateManager()
152177
defer sm.StopService()
@@ -792,6 +817,7 @@ func newTestStateManager() *stateManager {
792817
te: &testTxEngine{},
793818
messager: &testSubcomponent{},
794819
ddle: &testOnlineDDLExecutor{},
820+
dhMonitor: newNoopDiskHealthMonitor(),
795821
throttler: &testLagThrottler{},
796822
tableGC: &testTableGC{},
797823
rw: newRequestsWaiter(),

go/vt/vttablet/tabletserver/tabletserver.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ func NewTabletServer(ctx context.Context, env *vtenv.Environment, name string, c
207207
throttler: tsv.lagThrottler,
208208
tableGC: tsv.tableGC,
209209
rw: newRequestsWaiter(),
210+
dhMonitor: newDiskHealthMonitor(ctx),
210211
}
211212

212213
tsv.exporter.NewGaugeFunc("TabletState", "Tablet server state", func() int64 { return int64(tsv.sm.State()) })
@@ -767,6 +768,11 @@ func (tsv *TabletServer) SetDemotePrimaryStalled() {
767768
tsv.BroadcastHealth()
768769
}
769770

771+
// IsDiskStalled returns if the disk is stalled or not.
772+
func (tsv *TabletServer) IsDiskStalled() bool {
773+
return tsv.sm.dhMonitor.IsDiskStalled()
774+
}
775+
770776
// CreateTransaction creates the metadata for a 2PC transaction.
771777
func (tsv *TabletServer) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, participants []*querypb.Target) (err error) {
772778
return tsv.execRequest(

go/vt/vttablet/tabletservermock/controller.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,12 @@ func (tqsc *Controller) SetDemotePrimaryStalled() {
279279
tqsc.MethodCalled["SetDemotePrimaryStalled"] = true
280280
}
281281

282+
// IsDiskStalled is part of the tabletserver.Controller interface
283+
func (tqsc *Controller) IsDiskStalled() bool {
284+
tqsc.MethodCalled["IsDiskStalled"] = true
285+
return false
286+
}
287+
282288
// EnterLameduck implements tabletserver.Controller.
283289
func (tqsc *Controller) EnterLameduck() {
284290
tqsc.mu.Lock()

0 commit comments

Comments
 (0)