Skip to content

Commit

Permalink
vtgate: increment vtgate_warnings counter for non atomic commits (#15010
Browse files Browse the repository at this point in the history
)

Signed-off-by: Max Englander <max@planetscale.com>
  • Loading branch information
maxenglander authored Jan 25, 2024
1 parent 66e8e57 commit 2162883
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 1 deletion.
1 change: 1 addition & 0 deletions go/vt/vtgate/tx_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *SafeSession) error
Code: uint32(sqlerror.ERNonAtomicCommit),
Message: fmt.Sprintf("multi-db commit failed after committing to %d shards: %s", i, strings.Join(sNames, ", ")),
})
warnings.Add("NonAtomicCommit", 1)
}
_ = txc.Release(ctx, session)
return err
Expand Down
9 changes: 9 additions & 0 deletions go/vt/vtgate/tx_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func TestTxConnCommitFailure(t *testing.T) {

sc, sbcs, rssm, rssa := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 3)
sc.txConn.mode = vtgatepb.TransactionMode_MULTI
nonAtomicCommitCount := warnings.Counts()["NonAtomicCommit"]

// Sequence the executes to ensure commit order

Expand Down Expand Up @@ -163,13 +164,16 @@ func TestTxConnCommitFailure(t *testing.T) {
}
utils.MustMatch(t, &wantSession, session.Session, "Session")
assert.EqualValues(t, 1, sbcs[0].CommitCount.Load(), "sbc0.CommitCount")

require.Equal(t, nonAtomicCommitCount+1, warnings.Counts()["NonAtomicCommit"])
}

func TestTxConnCommitFailureAfterNonAtomicCommitMaxShards(t *testing.T) {
ctx := utils.LeakCheckContext(t)

sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 18)
sc.txConn.mode = vtgatepb.TransactionMode_MULTI
nonAtomicCommitCount := warnings.Counts()["NonAtomicCommit"]

// Sequence the executes to ensure commit order

Expand Down Expand Up @@ -214,13 +218,16 @@ func TestTxConnCommitFailureAfterNonAtomicCommitMaxShards(t *testing.T) {
for i := 0; i < 17; i++ {
assert.EqualValues(t, 1, sbcs[i].CommitCount.Load(), fmt.Sprintf("sbc%d.CommitCount", i))
}

require.Equal(t, nonAtomicCommitCount+1, warnings.Counts()["NonAtomicCommit"])
}

func TestTxConnCommitFailureBeforeNonAtomicCommitMaxShards(t *testing.T) {
ctx := utils.LeakCheckContext(t)

sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConn", 17)
sc.txConn.mode = vtgatepb.TransactionMode_MULTI
nonAtomicCommitCount := warnings.Counts()["NonAtomicCommit"]

// Sequence the executes to ensure commit order

Expand Down Expand Up @@ -265,6 +272,8 @@ func TestTxConnCommitFailureBeforeNonAtomicCommitMaxShards(t *testing.T) {
for i := 0; i < 16; i++ {
assert.EqualValues(t, 1, sbcs[i].CommitCount.Load(), fmt.Sprintf("sbc%d.CommitCount", i))
}

require.Equal(t, nonAtomicCommitCount+1, warnings.Counts()["NonAtomicCommit"])
}

func TestTxConnCommitSuccess(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ var (
// Error counters should be global so they can be set from anywhere
errorCounts = stats.NewCountersWithMultiLabels("VtgateApiErrorCounts", "Vtgate API error counts per error type", []string{"Operation", "Keyspace", "DbType", "Code"})

warnings = stats.NewCountersWithSingleLabel("VtGateWarnings", "Vtgate warnings", "type", "IgnoredSet", "ResultsExceeded", "WarnPayloadSizeExceeded")
warnings = stats.NewCountersWithSingleLabel("VtGateWarnings", "Vtgate warnings", "type", "IgnoredSet", "ResultsExceeded", "WarnPayloadSizeExceeded", "NonAtomicCommit")

vstreamSkewDelayCount = stats.NewCounter("VStreamEventsDelayedBySkewAlignment",
"Number of events that had to wait because the skew across shards was too high")
Expand Down

0 comments on commit 2162883

Please sign in to comment.