From aa8ee0fde472edb706dafb42984b55bdaec2965e Mon Sep 17 00:00:00 2001 From: Martin Buhr Date: Tue, 13 Aug 2024 09:22:11 +1200 Subject: [PATCH] fixed deletion logic for non-flusher-supporting K/V stores --- temporal/internal/driver/local/flusher.go | 14 +++++- temporal/internal/driver/local/keyvalue.go | 48 ++++++++----------- temporal/internal/driver/local/local.go | 12 ++++- temporal/internal/driver/local/mockstore.go | 6 +++ .../driver/local/nonblocking_store.go | 6 +++ temporal/internal/driver/local/types.go | 7 +++ 6 files changed, 61 insertions(+), 32 deletions(-) diff --git a/temporal/internal/driver/local/flusher.go b/temporal/internal/driver/local/flusher.go index 68cd659..b843fed 100644 --- a/temporal/internal/driver/local/flusher.go +++ b/temporal/internal/driver/local/flusher.go @@ -5,6 +5,17 @@ import ( ) func (api *API) FlushAll(ctx context.Context) error { + // save the ops + _, ok := api.Store.Features()[FeatureFlushAll] + if ok { + err := api.Store.FlushAll() + if err != nil { + return err + } + + api.initialiseKeyIndexes() + } + keyIndex, err := api.Store.Get(keyIndexKey) if err != nil { return err @@ -18,7 +29,6 @@ func (api *API) FlushAll(ctx context.Context) error { } } - // If supported - //api.Store.FlushAll() + api.initialiseKeyIndexes() return nil } diff --git a/temporal/internal/driver/local/keyvalue.go b/temporal/internal/driver/local/keyvalue.go index 56c7663..7782653 100644 --- a/temporal/internal/driver/local/keyvalue.go +++ b/temporal/internal/driver/local/keyvalue.go @@ -126,6 +126,14 @@ func (api *API) Delete(ctx context.Context, key string) error { return api.updateDeletedKeysIndex(key) } +func NewCounter(value int64) *Object { + return &Object{ + Value: value, + Type: TypeCounter, + NoExp: true, + } +} + func (api *API) Increment(ctx context.Context, key string) (int64, error) { if key == "" { return 0, temperr.KeyEmpty @@ -134,11 +142,7 @@ func (api *API) Increment(ctx context.Context, key string) (int64, error) { o, err := api.Store.Get(key) if err != nil { // create the object - o = &Object{ - Value: int64(1), - Type: TypeCounter, - NoExp: true, - } + o = NewCounter(1) api.Store.Set(key, o) api.addToKeyIndex(key) @@ -146,11 +150,7 @@ func (api *API) Increment(ctx context.Context, key string) (int64, error) { } if o == nil { - o = &Object{ - Value: int64(1), - Type: TypeCounter, - NoExp: true, - } + o = NewCounter(1) api.Store.Set(key, o) api.addToKeyIndex(key) @@ -158,11 +158,8 @@ func (api *API) Increment(ctx context.Context, key string) (int64, error) { } if o.Deleted || o.IsExpired() { - o = &Object{ - Value: int64(0), - Type: TypeCounter, - NoExp: true, - } + o = NewCounter(0) + api.addToKeyIndex(key) } var v int64 = -1 @@ -181,7 +178,6 @@ func (api *API) Increment(ctx context.Context, key string) (int64, error) { o.Type = TypeCounter v = int64(o.Value.(int32)) case string: - fmt.Println("string") // try to convert conv, err := strconv.Atoi(o.Value.(string)) if err != nil { @@ -247,6 +243,7 @@ func (api *API) Decrement(ctx context.Context, key string) (newValue int64, err Type: TypeCounter, NoExp: true, } + api.addToKeyIndex(key) } var v int64 @@ -422,22 +419,17 @@ func (api *API) Keys(ctx context.Context, pattern string) ([]string, error) { if err != nil { return nil, err } - var deletedKeys map[string]bool - if deletedKeyIndexObj != nil { - deletedKeysList := deletedKeyIndexObj.Value.(map[string]interface{}) - deletedKeys = make(map[string]bool, len(deletedKeysList)) - for key := range deletedKeysList { - deletedKeys[key] = true - } - } else { - deletedKeys = make(map[string]bool) - } + + deletedKeys := deletedKeyIndexObj.Value.(map[string]interface{}) var retKeys []string for key := range keyIndex { // Check if the key matches the pattern and is not deleted - if (pattern == "" || strings.HasPrefix(key, pattern)) && !deletedKeys[key] { - retKeys = append(retKeys, key) + if pattern == "" || strings.HasPrefix(key, pattern) { + _, f := deletedKeys[key] + if !f { + retKeys = append(retKeys, key) + } } } diff --git a/temporal/internal/driver/local/local.go b/temporal/internal/driver/local/local.go index 9cb2747..a160097 100644 --- a/temporal/internal/driver/local/local.go +++ b/temporal/internal/driver/local/local.go @@ -47,7 +47,17 @@ func NewLocalStore(connector model.Connector) *API { Broker: connector.(*LocalConnector).Broker, } + initAlready, _ := api.Store.Get(keyIndexKey) + if initAlready != nil { + return api + } + // init the key indexes + api.initialiseKeyIndexes() + return api +} + +func (api *API) initialiseKeyIndexes() { api.Store.Set(keyIndexKey, &Object{ Type: TypeList, Value: map[string]interface{}{}, @@ -59,6 +69,4 @@ func NewLocalStore(connector model.Connector) *API { Value: map[string]interface{}{}, NoExp: true, }) - - return api } diff --git a/temporal/internal/driver/local/mockstore.go b/temporal/internal/driver/local/mockstore.go index f4b668c..8fd60c6 100644 --- a/temporal/internal/driver/local/mockstore.go +++ b/temporal/internal/driver/local/mockstore.go @@ -49,3 +49,9 @@ func (m *MockStore) FlushAll() error { m.data = make(map[string]interface{}) return nil } + +func (m *MockStore) Features() map[ExtendedFeature]bool { + return map[ExtendedFeature]bool{ + FeatureFlushAll: true, + } +} diff --git a/temporal/internal/driver/local/nonblocking_store.go b/temporal/internal/driver/local/nonblocking_store.go index f984988..b800744 100644 --- a/temporal/internal/driver/local/nonblocking_store.go +++ b/temporal/internal/driver/local/nonblocking_store.go @@ -60,3 +60,9 @@ func (m *LockFreeStore) FlushAll() error { return nil } + +func (m *LockFreeStore) Features() map[ExtendedFeature]bool { + return map[ExtendedFeature]bool{ + FeatureFlushAll: true, + } +} diff --git a/temporal/internal/driver/local/types.go b/temporal/internal/driver/local/types.go index d5f9de8..5970d95 100644 --- a/temporal/internal/driver/local/types.go +++ b/temporal/internal/driver/local/types.go @@ -33,11 +33,18 @@ func (o *Object) SetExpireAt(t time.Time) { o.NoExp = false } +type ExtendedFeature string + +const ( + FeatureFlushAll ExtendedFeature = "flushall" +) + type KVStore interface { Get(key string) (*Object, error) Set(key string, value interface{}) error Delete(key string) error FlushAll() error + Features() map[ExtendedFeature]bool } type Broker interface {