Skip to content

Commit

Permalink
expose etcd's transaction as LowLevelTxn interface to kv.Base, simula…
Browse files Browse the repository at this point in the history
…te the behavior in memkv and leveldb

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
  • Loading branch information
MyonKeminta committed Jan 20, 2025
1 parent 973234d commit 7f34146
Show file tree
Hide file tree
Showing 7 changed files with 746 additions and 6 deletions.
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,11 @@ error = '''
etcd transaction failed, conflicted and rolled back
'''

["PD:etcd:ErrEtcdTxnResponse"]
error = '''
etcd transaction returned invalid response: %v
'''

["PD:etcd:ErrEtcdTxnInternal"]
error = '''
internal etcd transaction error occurred
Expand Down
1 change: 1 addition & 0 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ var (
ErrEtcdGrantLease = errors.Normalize("etcd lease failed", errors.RFCCodeText("PD:etcd:ErrEtcdGrantLease"))
ErrEtcdTxnInternal = errors.Normalize("internal etcd transaction error occurred", errors.RFCCodeText("PD:etcd:ErrEtcdTxnInternal"))
ErrEtcdTxnConflict = errors.Normalize("etcd transaction failed, conflicted and rolled back", errors.RFCCodeText("PD:etcd:ErrEtcdTxnConflict"))
ErrEtcdTxnResponse = errors.Normalize("etcd transaction returned invalid response: %v", errors.RFCCodeText("PD:etcd:ErrEtcdTxnResponse"))
ErrEtcdKVPut = errors.Normalize("etcd KV put failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVPut"))
ErrEtcdKVDelete = errors.Normalize("etcd KV delete failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVDelete"))
ErrEtcdKVGet = errors.Normalize("etcd KV get failed", errors.RFCCodeText("PD:etcd:ErrEtcdKVGet"))
Expand Down
123 changes: 123 additions & 0 deletions pkg/storage/kv/etcd_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kv

import (
"context"
"fmt"
"path"
"strings"
"time"
Expand Down Expand Up @@ -139,6 +140,13 @@ func (kv *etcdKVBase) Remove(key string) error {
return nil
}

// CreateLowLevelTxn returns a LowLevelTxn that forwards to the transaction
// obtained from NewSlowLogTxn(kv.client). The wrapper carries kv.rootPath so
// that keys given to the transaction are resolved under that root.
func (kv *etcdKVBase) CreateLowLevelTxn() LowLevelTxn {
	wrapper := new(lowLevelTxnWrapper)
	wrapper.inner = NewSlowLogTxn(kv.client)
	wrapper.rootPath = kv.rootPath
	return wrapper
}

// SlowLogTxn wraps etcd transaction and log slow one.
type SlowLogTxn struct {
clientv3.Txn
Expand Down Expand Up @@ -296,3 +304,118 @@ func (txn *etcdTxn) commit() error {
}
return nil
}

// lowLevelTxnWrapper provides the If/Then/Else/Commit methods of LowLevelTxn on top of
// an etcd clientv3.Txn.
type lowLevelTxnWrapper struct {
// inner is the etcd transaction that conditions and operations are forwarded to.
inner clientv3.Txn
// rootPath is joined (with "/") in front of every key before it is handed to etcd,
// and is trimmed from the keys returned by Commit.
rootPath string
}

// If converts the given conditions into etcd comparisons and installs them on
// the wrapped transaction.
//
// Each condition key is prefixed with l.rootPath. Existence checks compare the
// key's create revision against 0; all other comparison types compare the
// key's value against the condition's Value. An unrecognized CmpType panics.
// The receiver itself is returned so that calls can be chained.
func (l *lowLevelTxnWrapper) If(conditions ...LowLevelTxnCondition) LowLevelTxn {
	etcdCmps := make([]clientv3.Cmp, 0, len(conditions))
	for _, cond := range conditions {
		fullKey := strings.Join([]string{l.rootPath, cond.Key}, "/")

		var etcdCmp clientv3.Cmp
		switch cond.CmpType {
		case LowLevelCmpExists:
			etcdCmp = clientv3.Compare(clientv3.CreateRevision(fullKey), ">", 0)
		case LowLevelCmpNotExists:
			etcdCmp = clientv3.Compare(clientv3.CreateRevision(fullKey), "=", 0)
		case LowLevelCmpEqual:
			etcdCmp = clientv3.Compare(clientv3.Value(fullKey), "=", cond.Value)
		case LowLevelCmpNotEqual:
			etcdCmp = clientv3.Compare(clientv3.Value(fullKey), "!=", cond.Value)
		case LowLevelCmpGreater:
			etcdCmp = clientv3.Compare(clientv3.Value(fullKey), ">", cond.Value)
		case LowLevelCmpLess:
			etcdCmp = clientv3.Compare(clientv3.Value(fullKey), "<", cond.Value)
		default:
			panic(fmt.Sprintf("unknown cmp type %v", cond.CmpType))
		}
		etcdCmps = append(etcdCmps, etcdCmp)
	}
	l.inner = l.inner.If(etcdCmps...)
	return l
}

// convertOps translates LowLevelTxnOp values into etcd operations, in order.
//
// Every key is prefixed with l.rootPath. For LowLevelOpGetRange, an EndKey of
// "\x00" is turned into a prefix read on the start key; any other EndKey is
// prefixed with l.rootPath and used as the range end. In both cases op.Limit
// is applied as the etcd limit. An unrecognized OpType panics.
func (l *lowLevelTxnWrapper) convertOps(ops []LowLevelTxnOp) []clientv3.Op {
	converted := make([]clientv3.Op, 0, len(ops))
	for i := range ops {
		op := ops[i]
		fullKey := strings.Join([]string{l.rootPath, op.Key}, "/")

		var etcdOp clientv3.Op
		switch op.OpType {
		case LowLevelOpPut:
			etcdOp = clientv3.OpPut(fullKey, op.Value)
		case LowLevelOpDelete:
			etcdOp = clientv3.OpDelete(fullKey)
		case LowLevelOpGet:
			etcdOp = clientv3.OpGet(fullKey)
		case LowLevelOpGetRange:
			limitOpt := clientv3.WithLimit(int64(op.Limit))
			if op.EndKey == "\x00" {
				etcdOp = clientv3.OpGet(fullKey, clientv3.WithPrefix(), limitOpt)
			} else {
				fullEndKey := strings.Join([]string{l.rootPath, op.EndKey}, "/")
				etcdOp = clientv3.OpGet(fullKey, clientv3.WithRange(fullEndKey), limitOpt)
			}
		default:
			panic(fmt.Sprintf("unknown op type %v", op.OpType))
		}
		converted = append(converted, etcdOp)
	}
	return converted
}

// Then sets the operations of the wrapped transaction's Then branch after
// converting them with convertOps. It returns the receiver for chaining.
func (l *lowLevelTxnWrapper) Then(ops ...LowLevelTxnOp) LowLevelTxn {
	etcdOps := l.convertOps(ops)
	l.inner = l.inner.Then(etcdOps...)
	return l
}

// Else sets the operations of the wrapped transaction's Else branch after
// converting them with convertOps. It returns the receiver for chaining.
func (l *lowLevelTxnWrapper) Else(ops ...LowLevelTxnOp) LowLevelTxn {
	etcdOps := l.convertOps(ops)
	l.inner = l.inner.Else(etcdOps...)
	return l
}

// Commit executes the wrapped etcd transaction and converts the response into
// a LowLevelTxnResult.
//
// The context argument is currently not used: the wrapped transaction's
// Commit method takes no context.
//
// One result item is produced per etcd response, in response order:
//   - put: the item is empty unless the response carries a previous key-value
//     (PrevKv), in which case that pair is returned;
//   - delete: the item is always empty;
//   - range (get / get-range): the item holds the returned key-value pairs,
//     and More is copied from the etcd response so that callers can tell when
//     a limited read was truncated.
//
// Returned keys have l.rootPath and the following "/" trimmed. An error from
// the underlying commit is returned unchanged; a response of any other kind
// yields ErrEtcdTxnResponse.
func (l *lowLevelTxnWrapper) Commit(_ctx context.Context) (LowLevelTxnResult, error) {
	resp, err := l.inner.Commit()
	if err != nil {
		return LowLevelTxnResult{}, err
	}

	// trimRoot converts a raw etcd key back to the root-relative key that callers use.
	trimRoot := func(rawKey []byte) string {
		return strings.TrimPrefix(strings.TrimPrefix(string(rawKey), l.rootPath), "/")
	}

	items := make([]LowLevelTxnResultItem, 0, len(resp.Responses))
	for i, respItem := range resp.Responses {
		var resultItem LowLevelTxnResultItem
		if put := respItem.GetResponsePut(); put != nil {
			// A put response only carries previous data when it was requested
			// on the operation; otherwise the item stays empty.
			if put.PrevKv != nil {
				resultItem.KeyValuePairs = []KeyValuePair{{
					Key:   trimRoot(put.PrevKv.Key),
					Value: string(put.PrevKv.Value),
				}}
			}
		} else if del := respItem.GetResponseDeleteRange(); del != nil {
			// Nothing is reported for delete operations; the item stays empty.
		} else if rangeResp := respItem.GetResponseRange(); rangeResp != nil {
			kvs := make([]KeyValuePair, 0, len(rangeResp.Kvs))
			for _, kv := range rangeResp.Kvs {
				kvs = append(kvs, KeyValuePair{
					Key:   trimRoot(kv.Key),
					Value: string(kv.Value),
				})
			}
			resultItem = LowLevelTxnResultItem{
				KeyValuePairs: kvs,
				// Propagate etcd's truncation flag instead of always reporting false.
				More: rangeResp.More,
			}
		} else {
			return LowLevelTxnResult{}, errs.ErrEtcdTxnResponse.GenWithStackByArgs(
				fmt.Sprintf("succeeded: %v, index: %v, response: %v", resp.Succeeded, i, respItem),
			)
		}
		items = append(items, resultItem)
	}
	return LowLevelTxnResult{
		Succeeded: resp.Succeeded,
		Items:     items,
	}, nil
}
133 changes: 129 additions & 4 deletions pkg/storage/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,142 @@ package kv

import "context"

// Txn bundles multiple operations into a single executable unit.
// It enables kv to atomically apply a set of updates.
type Txn interface {
// BaseReadWrite is the set of basic key-value read and write operations. It is
// embedded by both Txn and Base, so the same operations are available inside
// and outside of a transaction.
type BaseReadWrite interface {
	// Save writes value under key.
	Save(key, value string) error
	// Remove deletes key.
	Remove(key string) error
	// Load reads the value stored under key.
	Load(key string) (string, error)
	// LoadRange reads entries starting at key and bounded by endKey, returning
	// the keys and their values. limit bounds the number of entries requested.
	// NOTE(review): the exact range-end and limit semantics are defined by the
	// implementations, which are not visible here; confirm before relying on them.
	LoadRange(key, endKey string, limit int) (keys []string, values []string, err error)
}

// Txn bundles multiple operations into a single executable unit.
// It enables kv to atomically apply a set of updates.
// Its method set is exactly that of the embedded BaseReadWrite.
type Txn interface {
BaseReadWrite
}

// LowLevelTxnCmpType identifies the kind of comparison performed by a LowLevelTxnCondition.
type LowLevelTxnCmpType int

Check failure on line 32 in pkg/storage/kv/kv.go

View workflow job for this annotation

GitHub Actions / statics

exported: exported type LowLevelTxnCmpType should have comment or be unexported (revive)
// LowLevelTxnOpType identifies the kind of operation described by a LowLevelTxnOp.
type LowLevelTxnOpType int

// Comparison kinds for LowLevelTxnCondition. As evaluated by
// LowLevelTxnCondition.CheckOnValue, the value comparisons only hold when the key
// exists, and values are compared as strings.
const (
// LowLevelCmpEqual holds when the key exists and its value equals the condition's Value.
LowLevelCmpEqual LowLevelTxnCmpType = iota
// LowLevelCmpNotEqual holds when the key exists and its value differs from the condition's Value.
LowLevelCmpNotEqual
// LowLevelCmpLess holds when the key exists and its value is less than the condition's Value.
LowLevelCmpLess
// LowLevelCmpGreater holds when the key exists and its value is greater than the condition's Value.
LowLevelCmpGreater
// LowLevelCmpExists holds when the key exists; the condition's Value is not consulted.
LowLevelCmpExists
// LowLevelCmpNotExists holds when the key does not exist; the condition's Value is not consulted.
LowLevelCmpNotExists
)

// Operation kinds for LowLevelTxnOp.
const (
// LowLevelOpPut writes the op's Value under its Key.
LowLevelOpPut LowLevelTxnOpType = iota
// LowLevelOpDelete deletes the op's Key.
LowLevelOpDelete
// LowLevelOpGet reads the single key given by the op's Key.
LowLevelOpGet
// LowLevelOpGetRange reads a range starting at the op's Key, using its EndKey and Limit.
LowLevelOpGetRange
)

// LowLevelTxnCondition is a single condition passed to LowLevelTxn.If. It checks either
// the value stored under Key or whether Key exists, depending on CmpType.
type LowLevelTxnCondition struct {
// Key is the key the condition is evaluated on.
Key string
// CmpType selects the comparison to perform.
CmpType LowLevelTxnCmpType
// Value is the value to compare against. It is not consulted for
// LowLevelCmpExists and LowLevelCmpNotExists.
Value string
}

// CheckOnValue reports whether the condition is satisfied by a key whose
// current state is described by value and exists.
//
// value is the key's current value and is only meaningful when exists is true.
// The value comparisons (equal, not-equal, less, greater) are string
// comparisons and never hold for a key that does not exist. An unrecognized
// CmpType panics.
func (c *LowLevelTxnCondition) CheckOnValue(value string, exists bool) bool {
	switch c.CmpType {
	case LowLevelCmpExists:
		return exists
	case LowLevelCmpNotExists:
		return !exists
	case LowLevelCmpEqual:
		return exists && value == c.Value
	case LowLevelCmpNotEqual:
		return exists && value != c.Value
	case LowLevelCmpLess:
		return exists && value < c.Value
	case LowLevelCmpGreater:
		return exists && value > c.Value
	}
	panic("unreachable")
}

// LowLevelTxnOp describes a single operation placed in the Then or Else branch of a
// LowLevelTxn.
type LowLevelTxnOp struct {
// Key is the key to operate on, or the start key when the OpType is LowLevelOpGetRange.
Key string
// OpType selects the operation to perform.
OpType LowLevelTxnOpType
// Value is the value to write when the OpType is LowLevelOpPut.
Value string
// The end key when the OpType is LowLevelOpGetRange.
// In the etcd-backed implementation, an EndKey of "\x00" results in a prefix read on Key.
EndKey string
// The limit of the keys to get when the OpType is LowLevelOpGetRange.
Limit int
}

// KeyValuePair is a key together with its value, as returned in a LowLevelTxnResultItem.
type KeyValuePair struct {
// Key is the key of the pair.
Key string
// Value is the value stored under Key.
Value string
}

// LowLevelTxnResultItem represents a single result of a read operation in a LowLevelTxn.
type LowLevelTxnResultItem struct {
// KeyValuePairs holds the key-value pairs returned by the operation, if any.
KeyValuePairs []KeyValuePair
// NOTE(review): More presumably indicates that a LowLevelOpGetRange had more
// matching keys than its Limit allowed to be returned — confirm against the
// backend implementations.
More bool
}

// LowLevelTxnResult represents the result of a LowLevelTxn. The results of operations in `Then` or `Else` branches
// will be listed in `Items` in the same order as the operations are added.
// For Put or Delete operations, its corresponding result is the previous value before writing.
// NOTE(review): the etcd-backed Commit in etcd_kv.go returns an empty item for Delete, and for Put
// it returns the previous key-value only when the etcd response carries one — confirm which
// behavior is intended.
type LowLevelTxnResult struct {
// Succeeded is copied from the etcd transaction response when the backend is etcd.
// NOTE(review): presumably true when the `If` conditions held and the `Then` branch ran — confirm.
Succeeded bool
// Items holds one result per executed operation.
Items []LowLevelTxnResultItem
}

// LowLevelTxn is a low-level transaction interface. It follows the same pattern of etcd's transaction
// API. When the backend is etcd, it simply calls etcd's equivalent APIs internally. Otherwise, the
// behavior is simulated.
// Considering that in different backends, the kv pairs may not have properties equivalent to etcd's
// version, create-time, etc., the abstracted LowLevelTxn interface does not support comparing on them.
// It only supports checking the value or whether the key exists.
// Avoid reading/writing the same key multiple times in a single transaction, otherwise the behavior
// would be undefined.
type LowLevelTxn interface {
// If sets the conditions of the transaction and returns the transaction for chaining.
If(conditions ...LowLevelTxnCondition) LowLevelTxn
// Then sets the operations of the `Then` branch and returns the transaction for chaining.
Then(ops ...LowLevelTxnOp) LowLevelTxn
// Else sets the operations of the `Else` branch and returns the transaction for chaining.
Else(ops ...LowLevelTxnOp) LowLevelTxn
// Commit executes the transaction and returns the results of the executed operations.
Commit(ctx context.Context) (LowLevelTxnResult, error)
}

// Base is an abstract interface for load/save pd cluster data.
type Base interface {
Txn
BaseReadWrite
// RunInTxn runs the user provided function in a Transaction.
// If user provided function f returns a non-nil error, then
// transaction will not be committed, the same error will be
// returned by RunInTxn.
// Otherwise, it returns the error occurred during the
// transaction.
//
// This is a highly-simplified transaction interface. As
// etcd's transaction API is quite limited, it's hard to use it
// to provide a complete transaction model as how a normal database
// does. So when this API is running on etcd backend, each read on
// `txn` implicitly constructs a condition.
// (ref: https://etcd.io/docs/v3.5/learning/api/#transaction)
// When reading a range using `LoadRange`, for each key found in the
// range there will be a condition constructed. Be aware of the
// possibility of causing phantom read.
// RunInTxn may not suit all use cases. When RunInTxn is found
// improper to use, consider using CreateLowLevelTxn instead.
//
// Note that transaction are not committed until RunInTxn returns nil.
// Note:
// 1. Load and LoadRange operations provides only stale read.
Expand All @@ -42,4 +160,11 @@ type Base interface {
// 2. Only when storage is etcd, does RunInTxn checks that
// values loaded during transaction has not been modified before commit.
RunInTxn(ctx context.Context, f func(txn Txn) error) error

// CreateLowLevelTxn creates a transaction that provides the same
// if-then-else API pattern as etcd does, making it possible to
// precisely control how etcd's transaction API is used when the
// backend is etcd. For other backend types, the behavior is
// simulated.
CreateLowLevelTxn() LowLevelTxn
}
Loading

0 comments on commit 7f34146

Please sign in to comment.