Skip to content

Commit

Permalink
fixed some critical issues in the k/v logic
Browse files Browse the repository at this point in the history
  • Loading branch information
lonelycode committed Aug 12, 2024
1 parent ce94325 commit 8ac1e07
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 8 deletions.
10 changes: 7 additions & 3 deletions temporal/internal/driver/local/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,15 @@ func (api *API) FlushAll(ctx context.Context) error {
return err
}

keys := keyIndex.Value.(map[string]bool)
keys := keyIndex.Value.(map[string]interface{})
for key := range keys {
api.Delete(ctx, key)
err := api.Delete(ctx, key)
if err != nil {
return err
}
}

api.Store.FlushAll()
// If supported
//api.Store.FlushAll()
return nil
}
26 changes: 21 additions & 5 deletions temporal/internal/driver/local/keyvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ func (api *API) Increment(ctx context.Context, key string) (int64, error) {
return 1, nil
}

if o.Deleted || o.IsExpired() {
o = &Object{
Value: int64(0),
Type: TypeCounter,
NoExp: true,
}
}

var v int64 = -1
if o.Type != TypeCounter {
switch o.Value.(type) {
Expand Down Expand Up @@ -233,6 +241,14 @@ func (api *API) Decrement(ctx context.Context, key string) (newValue int64, err
return -1, nil
}

if o.Deleted || o.IsExpired() {
o = &Object{
Value: int64(0),
Type: TypeCounter,
NoExp: true,
}
}

var v int64
if o.Type != TypeCounter {
switch o.Value.(type) {
Expand Down Expand Up @@ -280,12 +296,12 @@ func (api *API) Exists(ctx context.Context, key string) (exists bool, err error)
return false, temperr.KeyEmpty
}

o, err := api.Store.Get(key)
o, err := api.Get(ctx, key)
if err != nil {
return false, err
return false, nil
}

if o == nil {
if o == "" {
return false, nil
}

Expand Down Expand Up @@ -399,7 +415,7 @@ func (api *API) Keys(ctx context.Context, pattern string) ([]string, error) {
if keyIndexObj == nil {
return nil, nil
}
keyIndex := keyIndexObj.Value.(map[string]bool)
keyIndex := keyIndexObj.Value.(map[string]interface{})

// Get the deleted key index
deletedKeyIndexObj, err := api.Store.Get(deletedKeyIndexKey)
Expand All @@ -408,7 +424,7 @@ func (api *API) Keys(ctx context.Context, pattern string) ([]string, error) {
}
var deletedKeys map[string]bool
if deletedKeyIndexObj != nil {
deletedKeysList := deletedKeyIndexObj.Value.(map[string]bool)
deletedKeysList := deletedKeyIndexObj.Value.(map[string]interface{})
deletedKeys = make(map[string]bool, len(deletedKeysList))
for key := range deletedKeysList {
deletedKeys[key] = true
Expand Down
63 changes: 63 additions & 0 deletions temporal/internal/driver/local/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package local

func (api *API) addToKeyIndex(key string) error {
o, err := api.Store.Get(keyIndexKey)
if err != nil {
// not found, create new
o = &Object{
Type: TypeSet,
Value: map[string]interface{}{key: true},
NoExp: true,
}

return api.Store.Set(keyIndexKey, o)
}

if o == nil {
o = &Object{
Type: TypeSet,
Value: map[string]interface{}{key: true},
NoExp: true,
}
}

list := o.Value.(map[string]interface{})
list[key] = true

o.Value = list

err = api.Store.Set(keyIndexKey, o)
if err != nil {
return err
}

return nil
}

func (api *API) updateDeletedKeysIndex(key string) error {
o, err := api.Store.Get(deletedKeyIndexKey)
if err != nil {
// not found, create new
o = &Object{
Type: TypeSet,
Value: map[string]interface{}{key: true},
NoExp: true,
}

return api.Store.Set(deletedKeyIndexKey, o)
}

if o == nil {
o = &Object{
Type: TypeSet,
Value: map[string]interface{}{key: true},
NoExp: true,
}
}

list := o.Value.(map[string]interface{})
list[key] = true
o.Value = list

return api.Store.Set(deletedKeyIndexKey, o)
}

0 comments on commit 8ac1e07

Please sign in to comment.