Skip to content

Commit

Permalink
Merge branch 'main' into gmpify/mysql_bind_host
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriel Parreiras <gab.parreiras@gmail.com>
  • Loading branch information
gmpify authored Nov 20, 2024
2 parents 7adb0a6 + c5d0ecc commit eb09d53
Show file tree
Hide file tree
Showing 77 changed files with 10,950 additions and 9,646 deletions.
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Flags:
--backup_storage_compress if set, the backup files will be compressed. (default true)
--backup_storage_number_blocks int if backup_storage_compress is true, backup_storage_number_blocks sets the number of blocks that can be processed, in parallel, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression. (default 2)
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--binlog-in-memory-decompressor-max-size uint This value sets the uncompressed transaction payload size at which we switch from in-memory buffer based decompression to the slower streaming mode. (default 134217728)
--binlog_host string PITR restore parameter: hostname/IP of binlog server.
--binlog_password string PITR restore parameter: password of binlog server.
--binlog_player_protocol string the protocol to download binlogs from a vttablet (default "grpc")
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Flags:
--backup_storage_implementation string Which backup storage implementation to use for creating and restoring backups.
--backup_storage_number_blocks int if backup_storage_compress is true, backup_storage_number_blocks sets the number of blocks that can be processed, in parallel, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression. (default 2)
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--binlog-in-memory-decompressor-max-size uint This value sets the uncompressed transaction payload size at which we switch from in-memory buffer based decompression to the slower streaming mode. (default 134217728)
--binlog_host string PITR restore parameter: hostname/IP of binlog server.
--binlog_password string PITR restore parameter: password of binlog server.
--binlog_player_grpc_ca string the server ca to use to validate servers when connecting
Expand Down
21 changes: 12 additions & 9 deletions go/mysql/binlog_event_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ const (
// Length of the binlog event header for internal events within
// the transaction payload.
headerLen = binlogEventLenOffset + eventLenBytes

// At what size should we switch from the in-memory buffer
// decoding to streaming mode which is much slower, but does
// not require everything be done in memory.
zstdInMemoryDecompressorMaxSize = 128 << (10 * 2) // 128MiB
)

var (
Expand All @@ -75,13 +70,18 @@ var (
compressedTrxPayloadsInMem = stats.NewCounter("CompressedTransactionPayloadsInMemory", "The number of compressed binlog transaction payloads that were processed in memory")
compressedTrxPayloadsUsingStream = stats.NewCounter("CompressedTransactionPayloadsViaStream", "The number of compressed binlog transaction payloads that were processed using a stream")

// At what size should we switch from the in-memory buffer
// decoding to streaming mode which is much slower, but does
// not require everything be done in memory all at once.
ZstdInMemoryDecompressorMaxSize = uint64(128 << (10 * 2)) // 128MiB

// A concurrent stateless decoder that caches decompressors. This is
// used for smaller payloads that we want to handle entirely using
// in-memory buffers via DecodeAll.
statelessDecoder *zstd.Decoder

// A pool of stateful decoders for larger payloads that we want to
// stream. The number of large (> zstdInMemoryDecompressorMaxSize)
// stream. The number of large (> ZstdInMemoryDecompressorMaxSize)
// payloads should typically be relatively low, but there may be times
// where there are many of them -- and users like vstreamer may have
// N concurrent streams per tablet which could lead to a lot of
Expand Down Expand Up @@ -271,7 +271,7 @@ func (tp *TransactionPayload) decode() error {
}

// decompress decompresses the payload. If the payload is larger than
// zstdInMemoryDecompressorMaxSize then we stream the decompression via
// ZstdInMemoryDecompressorMaxSize then we stream the decompression via
// the package's pool of zstd.Decoders, otherwise we use in-memory
// buffers with the package's concurrent statelessDecoder.
// In either case, we setup the reader that can be used within the
Expand All @@ -284,7 +284,7 @@ func (tp *TransactionPayload) decompress() error {

// Switch to slower but less memory intensive stream mode for
// larger payloads.
if tp.uncompressedSize > zstdInMemoryDecompressorMaxSize {
if tp.uncompressedSize > ZstdInMemoryDecompressorMaxSize {
in := bytes.NewReader(tp.payload)
streamDecoder, err := statefulDecoderPool.Get(in)
if err != nil {
Expand Down Expand Up @@ -366,7 +366,10 @@ func (dp *decoderPool) Get(reader io.Reader) (*zstd.Decoder, error) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] expected *zstd.Decoder but got %T", pooled)
}
} else {
d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize))
// Use the minimum amount of memory we can in processing the transaction by
// setting lowMem to true and limiting the decoder concurrency to 1 so that
// there's no async decoding of multiple windows or blocks.
d, err := zstd.NewReader(nil, zstd.WithDecoderLowmem(true), zstd.WithDecoderConcurrency(1))
if err != nil { // Should only happen e.g. due to ENOMEM
return nil, vterrors.Wrap(err, "failed to create stateful stream decoder")
}
Expand Down
8 changes: 7 additions & 1 deletion go/mysql/binlog_event_mysql56_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) {
},
}

// Ensure that we can process events where the *uncompressed* size is
// larger than ZstdInMemoryDecompressorMaxSize. The *compressed* size
// of the payload in large_compressed_trx_payload.bin is 16KiB so we
// set the max to 2KiB to test this.
ZstdInMemoryDecompressorMaxSize = 2048

for _, tc := range testCases {
memDecodingCnt := compressedTrxPayloadsInMem.Get()
streamDecodingCnt := compressedTrxPayloadsUsingStream.Get()
Expand Down Expand Up @@ -191,7 +197,7 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) {
totalSize += len(eventStr)
require.True(t, strings.HasPrefix(eventStr, want))
}
require.Greater(t, totalSize, zstdInMemoryDecompressorMaxSize)
require.Greater(t, uint64(totalSize), ZstdInMemoryDecompressorMaxSize)
}
}
}
Expand Down
190 changes: 190 additions & 0 deletions go/mysql/sqlerror/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ const (
ERBlobKeyWithoutLength = ErrorCode(1170)
ERPrimaryCantHaveNull = ErrorCode(1171)
ERTooManyRows = ErrorCode(1172)
ERErrorDuringCommit = ErrorCode(1180)
ERLockOrActiveTransaction = ErrorCode(1192)
ERUnknownSystemVariable = ErrorCode(1193)
ERSetConstantsOnly = ErrorCode(1204)
Expand Down Expand Up @@ -301,6 +302,195 @@ const (
ERServerIsntAvailable = ErrorCode(3168)
)

// HandlerErrorCode is for errors thrown by the handler, and which are then embedded in other errors.
// See https://github.com/mysql/mysql-server/blob/trunk/include/my_base.h
type HandlerErrorCode uint16

func (e HandlerErrorCode) ToString() string {
return strconv.FormatUint(uint64(e), 10)
}

const (
// Didn't find key on read or update
HaErrKeyNotFound = HandlerErrorCode(120)
// Duplicate key on write
HaErrFoundDuppKey = HandlerErrorCode(121)
// Internal error
HaErrInternalError = HandlerErrorCode(122)
// Uppdate with is recoverable
HaErrRecordChanged = HandlerErrorCode(123)
// Wrong index given to function
HaErrWrongIndex = HandlerErrorCode(124)
// Transaction has been rolled back
HaErrRolledBack = HandlerErrorCode(125)
// Indexfile is crashed
HaErrCrashed = HandlerErrorCode(126)
// Record-file is crashed
HaErrWrongInRecord = HandlerErrorCode(127)
// Record-file is crashed
HaErrOutOfMem = HandlerErrorCode(128)
// not a MYI file - no signature
HaErrNotATable = HandlerErrorCode(130)
// Command not supported
HaErrWrongCommand = HandlerErrorCode(131)
// old database file
HaErrOldFile = HandlerErrorCode(132)
// No record read in update()
HaErrNoActiveRecord = HandlerErrorCode(133)
// A record is not there
HaErrRecordDeleted = HandlerErrorCode(134)
// No more room in file
HaErrRecordFileFull = HandlerErrorCode(135)
// No more room in file
HaErrIndexFileFull = HandlerErrorCode(136)
// end in next/prev/first/last
HaErrEndOfFile = HandlerErrorCode(137)
// unsupported extension used
HaErrUnsupported = HandlerErrorCode(138)
// Too big row
HaErrTooBigRow = HandlerErrorCode(139)
// Wrong create option
HaWrongCreateOption = HandlerErrorCode(140)
// Duplicate unique on write
HaErrFoundDuppUnique = HandlerErrorCode(141)
// Can't open charset
HaErrUnknownCharset = HandlerErrorCode(142)
// conflicting tables in MERGE
HaErrWrongMrgTableDef = HandlerErrorCode(143)
// Last (automatic?) repair failed
HaErrCrashedOnRepair = HandlerErrorCode(144)
// Table must be repaired
HaErrCrashedOnUsage = HandlerErrorCode(145)
// Lock wait timeout
HaErrLockWaitTimeout = HandlerErrorCode(146)
// Lock table is full
HaErrLockTableFull = HandlerErrorCode(147)
// Updates not allowed
HaErrReadOnlyTransaction = HandlerErrorCode(148)
// Deadlock found when trying to get lock
HaErrLockDeadlock = HandlerErrorCode(149)
// Cannot add a foreign key constr.
HaErrCannotAddForeign = HandlerErrorCode(150)
// Cannot add a child row
HaErrNoReferencedRow = HandlerErrorCode(151)
// Cannot delete a parent row
HaErrRowIsReferenced = HandlerErrorCode(152)
// No savepoint with that name
HaErrNoSavepoint = HandlerErrorCode(153)
// Non unique key block size
HaErrNonUniqueBlockSize = HandlerErrorCode(154)
// The table does not exist in engine
HaErrNoSuchTable = HandlerErrorCode(155)
// The table existed in storage engine
HaErrTableExist = HandlerErrorCode(156)
// Could not connect to storage engine
HaErrNoConnection = HandlerErrorCode(157)
// NULLs are not supported in spatial index
HaErrNullInSpatial = HandlerErrorCode(158)
// The table changed in storage engine
HaErrTableDefChanged = HandlerErrorCode(159)
// There's no partition in table for given value
HaErrNoPartitionFound = HandlerErrorCode(160)
// Row-based binlogging of row failed
HaErrRbrLoggingFailed = HandlerErrorCode(161)
// Index needed in foreign key constraint
HaErrDropIndexFk = HandlerErrorCode(162)
// Upholding foreign key constraints would lead to a duplicate key error in some other table.
HaErrForeignDuplicateKey = HandlerErrorCode(163)
// The table changed in storage engine
HaErrTableNeedsUpgrade = HandlerErrorCode(164)
// The table is not writable
HaErrTableReadonly = HandlerErrorCode(165)
// Failed to get next autoinc value
HaErrAutoincReadFailed = HandlerErrorCode(166)
// Failed to set row autoinc value
HaErrAutoincErange = HandlerErrorCode(167)
// Generic error
HaErrGeneric = HandlerErrorCode(168)
// row not actually updated: new values same as the old values
HaErrRecordIsTheSame = HandlerErrorCode(169)
// It is not possible to log this statement
HaErrLoggingImpossible = HandlerErrorCode(170)
// The event was corrupt, leading to illegal data being read
HaErrCorruptEvent = HandlerErrorCode(171)
// New file format
HaErrNewFile = HandlerErrorCode(172)
// The event could not be processed no other handler error happened
HaErrRowsEventApply = HandlerErrorCode(173)
// Error during initialization
HaErrInitialization = HandlerErrorCode(174)
// File too short
HaErrFileTooShort = HandlerErrorCode(175)
// Wrong CRC on page
HaErrWrongCrc = HandlerErrorCode(176)
// Too many active concurrent transactions
HaErrTooManyConcurrentTrxs = HandlerErrorCode(177)
// There's no explicitly listed partition in table for the given value
HaErrNotInLockPartitions = HandlerErrorCode(178)
// Index column length exceeds limit
HaErrIndexColTooLong = HandlerErrorCode(179)
// InnoDB index corrupted
HaErrIndexCorrupt = HandlerErrorCode(180)
// Undo log record too big
HaErrUndoRecTooBig = HandlerErrorCode(181)
// Invalid InnoDB Doc ID
HaFtsInvalidDocid = HandlerErrorCode(182)
// Table being used in foreign key check
HaErrTableInFkCheck = HandlerErrorCode(183)
// The tablespace existed in storage engine
HaErrTablespaceExists = HandlerErrorCode(184)
// Table has too many columns
HaErrTooManyFields = HandlerErrorCode(185)
// Row in wrong partition
HaErrRowInWrongPartition = HandlerErrorCode(186)
// InnoDB is in read only mode.
HaErrInnodbReadOnly = HandlerErrorCode(187)
// FTS query exceeds result cache limit
HaErrFtsExceedResultCacheLimit = HandlerErrorCode(188)
// Temporary file write failure
HaErrTempFileWriteFailure = HandlerErrorCode(189)
// Innodb is in force recovery mode
HaErrInnodbForcedRecovery = HandlerErrorCode(190)
// Too many words in a phrase
HaErrFtsTooManyWordsInPhrase = HandlerErrorCode(191)
// FK cascade depth exceeded
HaErrFkDepthExceeded = HandlerErrorCode(192)
// Option Missing during Create
HaMissingCreateOption = HandlerErrorCode(193)
// Out of memory in storage engine
HaErrSeOutOfMemory = HandlerErrorCode(194)
// Table/Clustered index is corrupted.
HaErrTableCorrupt = HandlerErrorCode(195)
// The query was interrupted
HaErrQueryInterrupted = HandlerErrorCode(196)
// Missing Tablespace
HaErrTablespaceMissing = HandlerErrorCode(197)
// Tablespace is not empty
HaErrTablespaceIsNotEmpty = HandlerErrorCode(198)
// Invalid Filename
HaErrWrongFileName = HandlerErrorCode(199)
// Operation is not allowed
HaErrNotAllowedCommand = HandlerErrorCode(200)
// Compute generated column value failed
HaErrComputeFailed = HandlerErrorCode(201)
// Table's row format has changed in the storage engine. Information in the data-dictionary needs to be updated.
HaErrRowFormatChanged = HandlerErrorCode(202)
// Don't wait for record lock
HaErrNoWaitLock = HandlerErrorCode(203)
// No more room in disk
HaErrDiskFullNowait = HandlerErrorCode(204)
// No session temporary space available
HaErrNoSessionTemp = HandlerErrorCode(205)
// Wrong or Invalid table name
HaErrWrongTableName = HandlerErrorCode(206)
// Path is too long for the OS
HaErrTooLongPath = HandlerErrorCode(207)
// Histogram sampling initialization failed
HaErrSamplingInitFailed = HandlerErrorCode(208)
// Too many sub-expression in search string
HaErrFtsTooManyNestedExp = HandlerErrorCode(209)
)

// Sql states for errors.
// Originally found in include/mysql/sql_state.h
const (
Expand Down
12 changes: 12 additions & 0 deletions go/mysql/sqlerror/sql_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,18 @@ func NewSQLError(number ErrorCode, sqlState string, msg string) *SQLError {
}
}

var handlerErrExtract = regexp.MustCompile(`Got error ([0-9]*) [-] .* (from storage engine|during COMMIT|during ROLLBACK)`)

func (se *SQLError) HaErrorCode() HandlerErrorCode {
match := handlerErrExtract.FindStringSubmatch(se.Message)
if len(match) >= 1 {
if code, err := strconv.ParseUint(match[1], 10, 16); err == nil {
return HandlerErrorCode(code)
}
}
return 0
}

// Error implements the error interface
func (se *SQLError) Error() string {
var buf strings.Builder
Expand Down
21 changes: 21 additions & 0 deletions go/mysql/sqlerror/sql_error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func TestNewSQLErrorFromError(t *testing.T) {
var tCases = []struct {
err error
num ErrorCode
ha HandlerErrorCode
ss string
}{
{
Expand Down Expand Up @@ -179,6 +180,24 @@ func TestNewSQLErrorFromError(t *testing.T) {
num: ERDupEntry,
ss: SSConstraintViolation,
},
{
err: fmt.Errorf("ERROR HY000: Got error 204 - 'No more room in disk' during COMMIT"),
num: ERUnknownError,
ss: SSUnknownSQLState,
ha: HaErrDiskFullNowait,
},
{
err: fmt.Errorf("COMMIT failed w/ error: Got error 204 - 'No more room in disk' during COMMIT (errno 1180) (sqlstate HY000) during query: commit"),
num: ERErrorDuringCommit,
ss: SSUnknownSQLState,
ha: HaErrDiskFullNowait,
},
{
err: fmt.Errorf("COMMIT failed w/ error: Got error 149 - 'Lock deadlock; Retry transaction' during COMMIT (errno 1180) (sqlstate HY000) during query: commit"),
num: ERErrorDuringCommit,
ss: SSUnknownSQLState,
ha: HaErrLockDeadlock,
},
}

for _, tc := range tCases {
Expand All @@ -187,6 +206,8 @@ func TestNewSQLErrorFromError(t *testing.T) {
require.ErrorAs(t, NewSQLErrorFromError(tc.err), &err)
assert.Equal(t, tc.num, err.Number())
assert.Equal(t, tc.ss, err.SQLState())
ha := err.HaErrorCode()
assert.Equal(t, tc.ha, ha)
})
}
}
33 changes: 33 additions & 0 deletions go/osutil/loadavg.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package osutil

import (
"fmt"
"strconv"
"strings"
)

// parseLoadAvg parses the load average from the content of /proc/loadavg or sysctl output.
// Input such as "1.00 0.99 0.98 1/1 1", "2.83 3.01 3.36"
func parseLoadAvg(content string) (float64, error) {
fields := strings.Fields(content)
if len(fields) == 0 {
return 0, fmt.Errorf("unexpected loadavg content: %s", content)
}
return strconv.ParseFloat(fields[0], 64)
}
Loading

0 comments on commit eb09d53

Please sign in to comment.