Skip to content

Commit 9e40015

Browse files
Prevent adding to query details after unserve common has started (vitessio#15684)
Signed-off-by: Manan Gupta <manan@planetscale.com>
1 parent f118ba2 commit 9e40015

File tree

7 files changed

+99
-19
lines changed

7 files changed

+99
-19
lines changed

go/test/endtoend/vtgate/transaction/restart/main_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"os"
2424
"testing"
2525

26-
"github.com/stretchr/testify/assert"
2726
"github.com/stretchr/testify/require"
2827

2928
"vitess.io/vitess/go/mysql"
@@ -113,5 +112,4 @@ func TestStreamTxRestart(t *testing.T) {
113112
// query should return connection error
114113
_, err = utils.ExecAllowError(t, conn, "select connection_id()")
115114
require.Error(t, err)
116-
assert.Contains(t, err.Error(), "broken pipe (errno 2006) (sqlstate HY000)")
117115
}

go/vt/vttablet/tabletserver/livequeryz_test.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ import (
2222
"net/http/httptest"
2323
"testing"
2424

25+
"github.com/stretchr/testify/require"
26+
2527
"vitess.io/vitess/go/vt/sqlparser"
2628
)
2729

@@ -30,8 +32,10 @@ func TestLiveQueryzHandlerJSON(t *testing.T) {
3032
req, _ := http.NewRequest("GET", "/livequeryz/?format=json", nil)
3133

3234
queryList := NewQueryList("test", sqlparser.NewTestParser())
33-
queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 1}))
34-
queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 2}))
35+
err := queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 1}))
36+
require.NoError(t, err)
37+
err = queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 2}))
38+
require.NoError(t, err)
3539

3640
livequeryzHandler([]*QueryList{queryList}, resp, req)
3741
}
@@ -41,8 +45,10 @@ func TestLiveQueryzHandlerHTTP(t *testing.T) {
4145
req, _ := http.NewRequest("GET", "/livequeryz/", nil)
4246

4347
queryList := NewQueryList("test", sqlparser.NewTestParser())
44-
queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 1}))
45-
queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 2}))
48+
err := queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 1}))
49+
require.NoError(t, err)
50+
err = queryList.Add(NewQueryDetail(context.Background(), &testConn{id: 2}))
51+
require.NoError(t, err)
4652

4753
livequeryzHandler([]*QueryList{queryList}, resp, req)
4854
}
@@ -64,7 +70,8 @@ func TestLiveQueryzHandlerTerminateConn(t *testing.T) {
6470

6571
queryList := NewQueryList("test", sqlparser.NewTestParser())
6672
testConn := &testConn{id: 1}
67-
queryList.Add(NewQueryDetail(context.Background(), testConn))
73+
err := queryList.Add(NewQueryDetail(context.Background(), testConn))
74+
require.NoError(t, err)
6875
if testConn.IsKilled() {
6976
t.Fatalf("conn should still be alive")
7077
}

go/vt/vttablet/tabletserver/query_executor.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1085,7 +1085,10 @@ func (qre *QueryExecutor) execDBConn(conn *connpool.Conn, sql string, wantfields
10851085
defer qre.logStats.AddRewrittenSQL(sql, time.Now())
10861086

10871087
qd := NewQueryDetail(qre.logStats.Ctx, conn)
1088-
qre.tsv.statelessql.Add(qd)
1088+
err := qre.tsv.statelessql.Add(qd)
1089+
if err != nil {
1090+
return nil, err
1091+
}
10891092
defer qre.tsv.statelessql.Remove(qd)
10901093

10911094
return conn.Exec(ctx, sql, int(qre.tsv.qe.maxResultSize.Load()), wantfields)
@@ -1098,7 +1101,10 @@ func (qre *QueryExecutor) execStatefulConn(conn *StatefulConnection, sql string,
10981101
defer qre.logStats.AddRewrittenSQL(sql, time.Now())
10991102

11001103
qd := NewQueryDetail(qre.logStats.Ctx, conn)
1101-
qre.tsv.statefulql.Add(qd)
1104+
err := qre.tsv.statefulql.Add(qd)
1105+
if err != nil {
1106+
return nil, err
1107+
}
11021108
defer qre.tsv.statefulql.Remove(qd)
11031109

11041110
return conn.Exec(ctx, sql, int(qre.tsv.qe.maxResultSize.Load()), wantfields)
@@ -1122,11 +1128,17 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction
11221128
// once their grace period is over.
11231129
qd := NewQueryDetail(qre.logStats.Ctx, conn.Conn)
11241130
if isTransaction {
1125-
qre.tsv.statefulql.Add(qd)
1131+
err := qre.tsv.statefulql.Add(qd)
1132+
if err != nil {
1133+
return err
1134+
}
11261135
defer qre.tsv.statefulql.Remove(qd)
11271136
return conn.Conn.StreamOnce(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Load()), sqltypes.IncludeFieldsOrDefault(qre.options))
11281137
}
1129-
qre.tsv.olapql.Add(qd)
1138+
err := qre.tsv.olapql.Add(qd)
1139+
if err != nil {
1140+
return err
1141+
}
11301142
defer qre.tsv.olapql.Remove(qd)
11311143
return conn.Conn.Stream(ctx, sql, callBackClosingSpan, allocStreamResult, int(qre.tsv.qe.streamBufferSize.Load()), sqltypes.IncludeFieldsOrDefault(qre.options))
11321144
}

go/vt/vttablet/tabletserver/query_list.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@ import (
2626

2727
"vitess.io/vitess/go/streamlog"
2828
"vitess.io/vitess/go/vt/callinfo"
29+
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
2930
"vitess.io/vitess/go/vt/sqlparser"
31+
"vitess.io/vitess/go/vt/vterrors"
3032
)
3133

3234
// QueryDetail is a simple wrapper for Query, Context and a killable conn.
@@ -59,27 +61,52 @@ type QueryList struct {
5961
queryDetails map[int64][]*QueryDetail
6062

6163
parser *sqlparser.Parser
64+
ca ClusterActionState
6265
}
6366

67+
type ClusterActionState int
68+
69+
const (
70+
ClusterActionNotInProgress ClusterActionState = iota
71+
ClusterActionInProgress ClusterActionState = iota
72+
ClusterActionNoQueries ClusterActionState = iota
73+
)
74+
6475
// NewQueryList creates a new QueryList
6576
func NewQueryList(name string, parser *sqlparser.Parser) *QueryList {
6677
return &QueryList{
6778
name: name,
6879
queryDetails: make(map[int64][]*QueryDetail),
6980
parser: parser,
81+
ca: ClusterActionNotInProgress,
82+
}
83+
}
84+
85+
// SetClusterAction updates the cluster action state (the ca field) of the QueryList.
86+
func (ql *QueryList) SetClusterAction(ca ClusterActionState) {
87+
ql.mu.Lock()
88+
defer ql.mu.Unlock()
89+
// If the current state is ClusterActionNotInProgress, then we want to ignore setting ClusterActionNoQueries.
90+
if ca == ClusterActionNoQueries && ql.ca == ClusterActionNotInProgress {
91+
return
7092
}
93+
ql.ca = ca
7194
}
7295

7396
// Add adds a QueryDetail to QueryList
74-
func (ql *QueryList) Add(qd *QueryDetail) {
97+
func (ql *QueryList) Add(qd *QueryDetail) error {
7598
ql.mu.Lock()
7699
defer ql.mu.Unlock()
100+
if ql.ca == ClusterActionNoQueries {
101+
return vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, vterrors.ShuttingDown)
102+
}
77103
qds, exists := ql.queryDetails[qd.connID]
78104
if exists {
79105
ql.queryDetails[qd.connID] = append(qds, qd)
80106
} else {
81107
ql.queryDetails[qd.connID] = []*QueryDetail{qd}
82108
}
109+
return nil
83110
}
84111

85112
// Remove removes a QueryDetail from QueryList

go/vt/vttablet/tabletserver/query_list_test.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,15 +49,17 @@ func TestQueryList(t *testing.T) {
4949
ql := NewQueryList("test", sqlparser.NewTestParser())
5050
connID := int64(1)
5151
qd := NewQueryDetail(context.Background(), &testConn{id: connID})
52-
ql.Add(qd)
52+
err := ql.Add(qd)
53+
require.NoError(t, err)
5354

5455
if qd1, ok := ql.queryDetails[connID]; !ok || qd1[0].connID != connID {
5556
t.Errorf("failed to add to QueryList")
5657
}
5758

5859
conn2ID := int64(2)
5960
qd2 := NewQueryDetail(context.Background(), &testConn{id: conn2ID})
60-
ql.Add(qd2)
61+
err = ql.Add(qd2)
62+
require.NoError(t, err)
6163

6264
rows := ql.AppendQueryzRows(nil)
6365
if len(rows) != 2 || rows[0].ConnID != 1 || rows[1].ConnID != 2 {
@@ -74,11 +76,13 @@ func TestQueryListChangeConnIDInMiddle(t *testing.T) {
7476
ql := NewQueryList("test", sqlparser.NewTestParser())
7577
connID := int64(1)
7678
qd1 := NewQueryDetail(context.Background(), &testConn{id: connID})
77-
ql.Add(qd1)
79+
err := ql.Add(qd1)
80+
require.NoError(t, err)
7881

7982
conn := &testConn{id: connID}
8083
qd2 := NewQueryDetail(context.Background(), conn)
81-
ql.Add(qd2)
84+
err = ql.Add(qd2)
85+
require.NoError(t, err)
8286

8387
require.Len(t, ql.queryDetails[1], 2)
8488

@@ -92,3 +96,22 @@ func TestQueryListChangeConnIDInMiddle(t *testing.T) {
9296
require.Equal(t, qd1, ql.queryDetails[1][0])
9397
require.NotEqual(t, qd2, ql.queryDetails[1][0])
9498
}
99+
100+
func TestClusterAction(t *testing.T) {
101+
ql := NewQueryList("test", sqlparser.NewTestParser())
102+
connID := int64(1)
103+
qd1 := NewQueryDetail(context.Background(), &testConn{id: connID})
104+
105+
ql.SetClusterAction(ClusterActionInProgress)
106+
ql.SetClusterAction(ClusterActionNoQueries)
107+
err := ql.Add(qd1)
108+
require.ErrorContains(t, err, "operation not allowed in state SHUTTING_DOWN")
109+
110+
ql.SetClusterAction(ClusterActionNotInProgress)
111+
err = ql.Add(qd1)
112+
require.NoError(t, err)
113+
// If no cluster action is in progress, setting ClusterActionNoQueries is ignored, so Add should still succeed.
114+
ql.SetClusterAction(ClusterActionNoQueries)
115+
err = ql.Add(qd1)
116+
require.NoError(t, err)
117+
}

go/vt/vttablet/tabletserver/state_manager.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -542,6 +542,8 @@ func (sm *stateManager) connect(tabletType topodatapb.TabletType) error {
542542
}
543543

544544
func (sm *stateManager) unserveCommon() {
545+
sm.markClusterAction(ClusterActionInProgress)
546+
defer sm.markClusterAction(ClusterActionNotInProgress)
545547
// We create a wait group that tracks whether all the queries have been terminated or not.
546548
wg := sync.WaitGroup{}
547549
wg.Add(1)
@@ -601,6 +603,8 @@ func (sm *stateManager) terminateAllQueries(wg *sync.WaitGroup) (cancel func())
601603
if err := timer.SleepContext(ctx, sm.shutdownGracePeriod); err != nil {
602604
return
603605
}
606+
// Prevent any new queries from being added before we kill all the queries in the list.
607+
sm.markClusterAction(ClusterActionNoQueries)
604608
log.Infof("Grace Period %v exceeded. Killing all OLTP queries.", sm.shutdownGracePeriod)
605609
sm.statelessql.TerminateAll()
606610
log.Infof("Killed all stateless OLTP queries.")
@@ -850,3 +854,10 @@ func (sm *stateManager) IsServingString() string {
850854
func (sm *stateManager) SetUnhealthyThreshold(v time.Duration) {
851855
sm.unhealthyThreshold.Store(v.Nanoseconds())
852856
}
857+
858+
// markClusterAction sets the given cluster action state on the stateful, stateless and OLAP query lists.
859+
func (sm *stateManager) markClusterAction(ca ClusterActionState) {
860+
sm.statefulql.SetClusterAction(ca)
861+
sm.statelessql.SetClusterAction(ca)
862+
sm.olapql.SetClusterAction(ca)
863+
}

go/vt/vttablet/tabletserver/state_manager_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -409,18 +409,20 @@ func TestStateManagerShutdownGracePeriod(t *testing.T) {
409409

410410
sm.te = &delayedTxEngine{}
411411
kconn1 := &killableConn{id: 1}
412-
sm.statelessql.Add(&QueryDetail{
412+
err := sm.statelessql.Add(&QueryDetail{
413413
conn: kconn1,
414414
connID: kconn1.id,
415415
})
416+
require.NoError(t, err)
416417
kconn2 := &killableConn{id: 2}
417-
sm.statefulql.Add(&QueryDetail{
418+
err = sm.statefulql.Add(&QueryDetail{
418419
conn: kconn2,
419420
connID: kconn2.id,
420421
})
422+
require.NoError(t, err)
421423

422424
// Transition to primary with no shutdown grace period should kill kconn2 but not kconn1.
423-
err := sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateServing, "")
425+
err = sm.SetServingType(topodatapb.TabletType_PRIMARY, testNow, StateServing, "")
424426
require.NoError(t, err)
425427
assert.False(t, kconn1.killed.Load())
426428
assert.True(t, kconn2.killed.Load())

0 commit comments

Comments
 (0)