Skip to content

Commit 2926cb0

Browse files
author
Jun Wang
committed
update solution
Signed-off-by: Jun Wang <jun.wang@demonware.net>
1 parent 45192d2 commit 2926cb0

File tree

8 files changed

+68
-5
lines changed

8 files changed

+68
-5
lines changed

changelog/22.0/22.0.0/summary.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
- **[Minor Changes](#minor-changes)**
1212
- **[VTTablet Flags](#flags-vttablet)**
1313

14+
- **[Minor Changes](#minor-changes)**
15+
- **[VTTablet](#vttablet)**
16+
- [VTTablet: Query Consolidation Waiter Cap](#vttablet-consolidator-query-waiter-cap)
1417

1518
## <a id="major-changes"/>Major Changes</a>
1619

@@ -67,3 +70,11 @@ To upgrade to the newer version of the configuration file, first switch to using
6770
- `twopc_abandon_age` flag now supports values in the time.Duration format (e.g., 1s, 2m, 1h).
6871
While the flag will continue to accept float values (interpreted as seconds) for backward compatibility,
6972
**float inputs are deprecated** and will be removed in a future release.
73+
74+
## <a id="minor-changes"/>Minor Changes
75+
76+
### <a id="vttablet"/>VTTablet
77+
78+
#### <a id="vttablet-consolidator-query-waiter-cap"/>--consolidator-query-waiter-cap flag
79+
80+
A new CLI flag `--consolidator-query-waiter-cap` to set the maximum number of clients allowed to wait on the consolidator. The default value is set to 0 for unlimited wait. Users can adjust this value based on the performance of VTTablet to avoid excessive memory usage and the risk of being OOMKilled, particularly in Kubernetes deployments.

go/flags/endtoend/vtcombo.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ Flags:
5252
--config-path strings Paths to search for config files in. (default [{{ .Workdir }}])
5353
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
5454
--config-type string Config file type (omit to infer config type from file extension).
55+
--consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
5556
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
5657
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)
5758
--consul_auth_static_file string JSON File to read the topos/tokens from.

go/flags/endtoend/vttablet.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ Flags:
8686
--config-path strings Paths to search for config files in. (default [{{ .Workdir }}])
8787
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
8888
--config-type string Config file type (omit to infer config type from file extension).
89+
--consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
8990
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
9091
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)
9192
--consul_auth_static_file string JSON File to read the topos/tokens from.

go/sync2/consolidator.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type PendingResult interface {
4040
SetResult(*sqltypes.Result)
4141
Result() *sqltypes.Result
4242
Wait()
43+
AddWaiterCounter(int64) *int64
4344
}
4445

4546
type consolidator struct {
@@ -77,6 +78,7 @@ func (co *consolidator) Create(query string) (PendingResult, bool) {
7778
defer co.mu.Unlock()
7879
var r *pendingResult
7980
if r, ok := co.queries[query]; ok {
81+
r.AddWaiterCounter(int64(1))
8082
return r, false
8183
}
8284
r = &pendingResult{consolidator: co, query: query}
@@ -122,17 +124,23 @@ func (rs *pendingResult) Wait() {
122124
rs.executing.RLock()
123125
}
124126

127+
func (rs *pendingResult) AddWaiterCounter(c int64) *int64 {
128+
atomic.AddInt64(rs.consolidator.totalWaiterCount, c)
129+
return rs.consolidator.totalWaiterCount
130+
}
131+
125132
// ConsolidatorCache is a thread-safe object used for counting how often recent
126133
// queries have been consolidated.
127134
// It is also used by the txserializer package to count how often transactions
128135
// have been queued and had to wait because they targeted the same row (range).
129136
type ConsolidatorCache struct {
130137
*cache.LRUCache[*ccount]
138+
totalWaiterCount *int64
131139
}
132140

133141
// NewConsolidatorCache creates a new cache with the given capacity.
134142
func NewConsolidatorCache(capacity int64) *ConsolidatorCache {
135-
return &ConsolidatorCache{cache.NewLRUCache[*ccount](capacity)}
143+
return &ConsolidatorCache{cache.NewLRUCache[*ccount](capacity), new(int64)}
136144
}
137145

138146
// Record increments the count for "query" by 1.

go/sync2/consolidator_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,42 @@ package sync2
1818

1919
import (
2020
"reflect"
21+
"sync"
2122
"testing"
2223

2324
"vitess.io/vitess/go/sqltypes"
2425
)
2526

27+
func TestAddWaiterCount(t *testing.T) {
28+
con := NewConsolidator()
29+
sql := "select * from SomeTable"
30+
pr, _ := con.Create(sql)
31+
var wgAdd sync.WaitGroup
32+
var wgSub sync.WaitGroup
33+
34+
var concurrent = 1000
35+
36+
for i := 0; i < concurrent; i++ {
37+
wgAdd.Add(1)
38+
wgSub.Add(1)
39+
go func() {
40+
defer wgAdd.Done()
41+
pr.AddWaiterCounter(1)
42+
}()
43+
go func() {
44+
defer wgSub.Done()
45+
pr.AddWaiterCounter(-1)
46+
}()
47+
}
48+
49+
wgAdd.Wait()
50+
wgSub.Wait()
51+
52+
if *pr.AddWaiterCounter(0) != 0 {
53+
t.Fatalf("Expect 0 totalWaiterCount but got: %v", *pr.AddWaiterCounter(0))
54+
}
55+
}
56+
2657
func TestConsolidator(t *testing.T) {
2758
con := NewConsolidator()
2859
sql := "select * from SomeTable"

go/sync2/fake_consolidator.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,8 @@ func (fr *FakePendingResult) SetResult(result *sqltypes.Result) {
112112
func (fr *FakePendingResult) Wait() {
113113
fr.WaitCalls++
114114
}
115+
116+
// AddWaiterCounter is currently a no-op.
117+
func (fr *FakePendingResult) AddWaiterCounter(int64) *int64 {
118+
return new(int64)
119+
}

go/vt/vttablet/tabletserver/query_executor.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -716,10 +716,14 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
716716
q.SetErr(err)
717717
}
718718
} else {
719-
qre.logStats.QuerySources |= tabletenv.QuerySourceConsolidator
720-
startTime := time.Now()
721-
q.Wait()
722-
qre.tsv.stats.WaitTimings.Record("Consolidations", startTime)
719+
waiterCap := qre.tsv.config.ConsolidatorQueryWaiterCap
720+
if waiterCap == 0 || *q.AddWaiterCounter(0) <= waiterCap {
721+
qre.logStats.QuerySources |= tabletenv.QuerySourceConsolidator
722+
startTime := time.Now()
723+
q.Wait()
724+
qre.tsv.stats.WaitTimings.Record("Consolidations", startTime)
725+
}
726+
q.AddWaiterCounter(-1)
723727
}
724728
if q.Err() != nil {
725729
return nil, q.Err()

go/vt/vttablet/tabletserver/tabletenv/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
195195
fs.Int64Var(&currentConfig.ConsolidatorStreamQuerySize, "consolidator-stream-query-size", defaultConfig.ConsolidatorStreamQuerySize, "Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator.")
196196
fs.Int64Var(&currentConfig.ConsolidatorStreamTotalSize, "consolidator-stream-total-size", defaultConfig.ConsolidatorStreamTotalSize, "Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator.")
197197

198+
fs.Int64Var(&currentConfig.ConsolidatorQueryWaiterCap, "consolidator-query-waiter-cap", 0, "Configure the maximum number of clients allowed to wait on the consolidator.")
198199
fs.DurationVar(&healthCheckInterval, "health_check_interval", defaultConfig.Healthcheck.Interval, "Interval between health checks")
199200
fs.DurationVar(&degradedThreshold, "degraded_threshold", defaultConfig.Healthcheck.DegradedThreshold, "replication lag after which a replica is considered degraded")
200201
fs.DurationVar(&unhealthyThreshold, "unhealthy_threshold", defaultConfig.Healthcheck.UnhealthyThreshold, "replication lag after which a replica is considered unhealthy")
@@ -320,6 +321,7 @@ type TabletConfig struct {
320321
StreamBufferSize int `json:"streamBufferSize,omitempty"`
321322
ConsolidatorStreamTotalSize int64 `json:"consolidatorStreamTotalSize,omitempty"`
322323
ConsolidatorStreamQuerySize int64 `json:"consolidatorStreamQuerySize,omitempty"`
324+
ConsolidatorQueryWaiterCap int64 `json:"consolidatorMaxQueryWait,omitempty"`
323325
QueryCacheMemory int64 `json:"queryCacheMemory,omitempty"`
324326
QueryCacheDoorkeeper bool `json:"queryCacheDoorkeeper,omitempty"`
325327
SchemaReloadInterval time.Duration `json:"schemaReloadIntervalSeconds,omitempty"`

0 commit comments

Comments
 (0)