Skip to content

Commit

Permalink
fixed deletion logic for non-flusher-supporting K/V stores
Browse files Browse the repository at this point in the history
  • Loading branch information
lonelycode committed Aug 12, 2024
1 parent cb56832 commit aa8ee0f
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 32 deletions.
14 changes: 12 additions & 2 deletions temporal/internal/driver/local/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ import (
)

func (api *API) FlushAll(ctx context.Context) error {
// save the ops
_, ok := api.Store.Features()[FeatureFlushAll]
if ok {
err := api.Store.FlushAll()
if err != nil {
return err
}

api.initialiseKeyIndexes()
}

keyIndex, err := api.Store.Get(keyIndexKey)
if err != nil {
return err
Expand All @@ -18,7 +29,6 @@ func (api *API) FlushAll(ctx context.Context) error {
}
}

// If supported
//api.Store.FlushAll()
api.initialiseKeyIndexes()
return nil
}
48 changes: 20 additions & 28 deletions temporal/internal/driver/local/keyvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ func (api *API) Delete(ctx context.Context, key string) error {
return api.updateDeletedKeysIndex(key)
}

func NewCounter(value int64) *Object {
return &Object{
Value: value,
Type: TypeCounter,
NoExp: true,
}
}

func (api *API) Increment(ctx context.Context, key string) (int64, error) {
if key == "" {
return 0, temperr.KeyEmpty
Expand All @@ -134,35 +142,24 @@ func (api *API) Increment(ctx context.Context, key string) (int64, error) {
o, err := api.Store.Get(key)
if err != nil {
// create the object
o = &Object{
Value: int64(1),
Type: TypeCounter,
NoExp: true,
}
o = NewCounter(1)

api.Store.Set(key, o)
api.addToKeyIndex(key)
return 1, nil
}

if o == nil {
o = &Object{
Value: int64(1),
Type: TypeCounter,
NoExp: true,
}
o = NewCounter(1)

api.Store.Set(key, o)
api.addToKeyIndex(key)
return 1, nil
}

if o.Deleted || o.IsExpired() {
o = &Object{
Value: int64(0),
Type: TypeCounter,
NoExp: true,
}
o = NewCounter(0)
api.addToKeyIndex(key)
}

var v int64 = -1
Expand All @@ -181,7 +178,6 @@ func (api *API) Increment(ctx context.Context, key string) (int64, error) {
o.Type = TypeCounter
v = int64(o.Value.(int32))
case string:
fmt.Println("string")
// try to convert
conv, err := strconv.Atoi(o.Value.(string))
if err != nil {
Expand Down Expand Up @@ -247,6 +243,7 @@ func (api *API) Decrement(ctx context.Context, key string) (newValue int64, err
Type: TypeCounter,
NoExp: true,
}
api.addToKeyIndex(key)
}

var v int64
Expand Down Expand Up @@ -422,22 +419,17 @@ func (api *API) Keys(ctx context.Context, pattern string) ([]string, error) {
if err != nil {
return nil, err
}
var deletedKeys map[string]bool
if deletedKeyIndexObj != nil {
deletedKeysList := deletedKeyIndexObj.Value.(map[string]interface{})
deletedKeys = make(map[string]bool, len(deletedKeysList))
for key := range deletedKeysList {
deletedKeys[key] = true
}
} else {
deletedKeys = make(map[string]bool)
}

deletedKeys := deletedKeyIndexObj.Value.(map[string]interface{})

var retKeys []string
for key := range keyIndex {
// Check if the key matches the pattern and is not deleted
if (pattern == "" || strings.HasPrefix(key, pattern)) && !deletedKeys[key] {
retKeys = append(retKeys, key)
if pattern == "" || strings.HasPrefix(key, pattern) {
_, f := deletedKeys[key]
if !f {
retKeys = append(retKeys, key)
}
}
}

Expand Down
12 changes: 10 additions & 2 deletions temporal/internal/driver/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,17 @@ func NewLocalStore(connector model.Connector) *API {
Broker: connector.(*LocalConnector).Broker,
}

initAlready, _ := api.Store.Get(keyIndexKey)
if initAlready != nil {
return api
}

// init the key indexes
api.initialiseKeyIndexes()
return api
}

func (api *API) initialiseKeyIndexes() {
api.Store.Set(keyIndexKey, &Object{
Type: TypeList,
Value: map[string]interface{}{},
Expand All @@ -59,6 +69,4 @@ func NewLocalStore(connector model.Connector) *API {
Value: map[string]interface{}{},
NoExp: true,
})

return api
}
6 changes: 6 additions & 0 deletions temporal/internal/driver/local/mockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,9 @@ func (m *MockStore) FlushAll() error {
m.data = make(map[string]interface{})
return nil
}

func (m *MockStore) Features() map[ExtendedFeature]bool {
return map[ExtendedFeature]bool{
FeatureFlushAll: true,
}
}
6 changes: 6 additions & 0 deletions temporal/internal/driver/local/nonblocking_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,9 @@ func (m *LockFreeStore) FlushAll() error {

return nil
}

func (m *LockFreeStore) Features() map[ExtendedFeature]bool {
return map[ExtendedFeature]bool{
FeatureFlushAll: true,
}
}
7 changes: 7 additions & 0 deletions temporal/internal/driver/local/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,18 @@ func (o *Object) SetExpireAt(t time.Time) {
o.NoExp = false
}

type ExtendedFeature string

const (
FeatureFlushAll ExtendedFeature = "flushall"
)

type KVStore interface {
Get(key string) (*Object, error)
Set(key string, value interface{}) error
Delete(key string) error
FlushAll() error
Features() map[ExtendedFeature]bool
}

type Broker interface {
Expand Down

0 comments on commit aa8ee0f

Please sign in to comment.