Skip to content

Commit 91f29f7

Browse files
committed
Make ZstdInMemoryDecompressorMaxSize configurable
Signed-off-by: Matt Lord <mattalord@gmail.com>
1 parent f6067e0 commit 91f29f7

File tree

3 files changed

+11
-8
lines changed

3 files changed

+11
-8
lines changed

go/mysql/binlog_event_compression.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,6 @@ const (
5858
// Length of the binlog event header for internal events within
5959
// the transaction payload.
6060
headerLen = binlogEventLenOffset + eventLenBytes
61-
62-
// At what size should we switch from the in-memory buffer
63-
// decoding to streaming mode which is much slower, but does
64-
// not require everything be done in memory.
65-
zstdInMemoryDecompressorMaxSize = 128 << (10 * 2) // 128MiB
6661
)
6762

6863
var (
@@ -75,6 +70,11 @@ var (
7570
compressedTrxPayloadsInMem = stats.NewCounter("CompressedTransactionPayloadsInMemory", "The number of compressed binlog transaction payloads that were processed in memory")
7671
compressedTrxPayloadsUsingStream = stats.NewCounter("CompressedTransactionPayloadsViaStream", "The number of compressed binlog transaction payloads that were processed using a stream")
7772

73+
// At what size should we switch from the in-memory buffer
74+
// decoding to streaming mode which is much slower, but does
75+
// not require everything be done in memory all at once.
76+
ZstdInMemoryDecompressorMaxSize = uint64(128 << (10 * 2)) // 128MiB
77+
7878
// A concurrent stateless decoder that caches decompressors. This is
7979
// used for smaller payloads that we want to handle entirely using
8080
// in-memory buffers via DecodeAll.
@@ -284,7 +284,7 @@ func (tp *TransactionPayload) decompress() error {
284284

285285
// Switch to slower but less memory intensive stream mode for
286286
// larger payloads.
287-
if tp.uncompressedSize > zstdInMemoryDecompressorMaxSize {
287+
if tp.uncompressedSize > ZstdInMemoryDecompressorMaxSize {
288288
in := bytes.NewReader(tp.payload)
289289
streamDecoder, err := statefulDecoderPool.Get(in)
290290
if err != nil {
@@ -366,7 +366,7 @@ func (dp *decoderPool) Get(reader io.Reader) (*zstd.Decoder, error) {
366366
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] expected *zstd.Decoder but got %T", pooled)
367367
}
368368
} else {
369-
d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize))
369+
d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(ZstdInMemoryDecompressorMaxSize))
370370
if err != nil { // Should only happen e.g. due to ENOMEM
371371
return nil, vterrors.Wrap(err, "failed to create stateful stream decoder")
372372
}

go/mysql/binlog_event_mysql56_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) {
191191
totalSize += len(eventStr)
192192
require.True(t, strings.HasPrefix(eventStr, want))
193193
}
194-
require.Greater(t, totalSize, zstdInMemoryDecompressorMaxSize)
194+
require.Greater(t, uint64(totalSize), ZstdInMemoryDecompressorMaxSize)
195195
}
196196
}
197197
}

go/vt/vttablet/common/flags.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"github.com/spf13/pflag"
2323

24+
"vitess.io/vitess/go/mysql"
2425
"vitess.io/vitess/go/vt/servenv"
2526
)
2627

@@ -94,4 +95,6 @@ func registerFlags(fs *pflag.FlagSet) {
9495
fs.BoolVar(&vreplicationStoreCompressedGTID, "vreplication_store_compressed_gtid", vreplicationStoreCompressedGTID, "Store compressed gtids in the pos column of the sidecar database's vreplication table")
9596

9697
fs.IntVar(&vreplicationParallelInsertWorkers, "vreplication-parallel-insert-workers", vreplicationParallelInsertWorkers, "Number of parallel insertion workers to use during copy phase. Set <= 1 to disable parallelism, or > 1 to enable concurrent insertion during copy phase.")
98+
99+
fs.Uint64Var(&mysql.ZstdInMemoryDecompressorMaxSize, "binlog-in-memory-decompressor-max-size", mysql.ZstdInMemoryDecompressorMaxSize, "This value sets the sizea at which the faster all in-memory binlog compressed transaction handling will switch to the slower streaming mode. It also controls the maximum memory to be used when in streaming mode.")
97100
}

0 commit comments

Comments
 (0)