diff --git a/go.mod b/go.mod index 797d0efd..7c295acd 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/acobaugh/osrelease v0.1.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dlclark/regexp2 v1.10.0 // indirect + github.com/emirpasic/gods v1.18.1 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-sourcemap/sourcemap v2.1.3+incompatible // indirect github.com/golang/snappy v0.0.4 // indirect diff --git a/go.sum b/go.sum index 513d956a..d19b3408 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,8 @@ github.com/dop251/goja v0.0.0-20231027120936-b396bb4c349d h1:wi6jN5LVt/ljaBG4ue7 github.com/dop251/goja v0.0.0-20231027120936-b396bb4c349d/go.mod h1:QMWlm50DNe14hD7t24KEqZuUdC9sOTy8W6XbCU1mlw4= github.com/dop251/goja_nodejs v0.0.0-20210225215109-d91c329300e7/go.mod h1:hn7BA7c8pLvoGndExHudxTDKZ84Pyvv+90pbBjbTz0Y= github.com/dop251/goja_nodejs v0.0.0-20211022123610-8dd9abb0616d/go.mod h1:DngW8aVqWbuLRMHItjPUyqdj+HWPvnQe8V8y1nDpIbM= +github.com/emirpasic/gods v1.18.1 h1:FXtiHYKDGKCW2KzwZKx0iC0PQmdlorYgdFG9jPXJ1Bc= +github.com/emirpasic/gods v1.18.1/go.mod h1:8tpGGwCnJ5H4r6BWwaV6OrWmMoPhUl5jm/FMNAnJvWQ= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= diff --git a/internal/util/hash.go b/internal/util/hash.go deleted file mode 100644 index 546ffb63..00000000 --- a/internal/util/hash.go +++ /dev/null @@ -1,7 +0,0 @@ -package util - -import "github.com/mitchellh/hashstructure/v2" - -func Hash(val any) (uint64, error) { - return hashstructure.Hash(val, hashstructure.FormatV2, nil) -} diff --git a/pkg/database/memdb/collection.go b/pkg/database/memdb/collection.go index 6c646b03..1e82b5d9 100644 --- a/pkg/database/memdb/collection.go +++ b/pkg/database/memdb/collection.go @@ -5,9 +5,10 @@ import ( "sort" "sync" + "github.com/emirpasic/gods/maps" "github.com/emirpasic/gods/maps/treemap" + "github.com/emirpasic/gods/utils" "github.com/pkg/errors" - "github.com/siyul-park/uniflow/internal/pool" "github.com/siyul-park/uniflow/internal/util" "github.com/siyul-park/uniflow/pkg/database" "github.com/siyul-park/uniflow/pkg/primitive" @@ -43,7 +44,7 @@ var ( func NewCollection(name string) *Collection { return &Collection{ name: name, - data: pool.GetMap(), + data: treemap.NewWith(comparator), indexView: NewIndexView(), dataLock: sync.RWMutex{}, streamLock: sync.RWMutex{}, @@ -302,12 +303,12 @@ func (coll *Collection) FindMany(ctx context.Context, filter *database.Filter, o } func (coll *Collection) Drop(ctx context.Context) error { - data, err := func() (*sync.Map, error) { + data, err := func() (maps.Map, error) { coll.dataLock.Lock() defer coll.dataLock.Unlock() data := coll.data - coll.data = pool.GetMap() + coll.data = treemap.NewWith(comparator) if err := coll.indexView.deleteAll(ctx); err != nil { return nil, err @@ -319,7 +320,7 @@ func (coll *Collection) Drop(ctx context.Context) error { return err } - data.Range(func(_, val any) bool { + for _, val := range data.Values() { doc := val.(*primitive.Map) if id, ok := doc.Get(keyID); ok { coll.emit(fullEvent{ @@ -330,8 +331,7 @@ func (coll *Collection) Drop(ctx context.Context) error { Document: doc, }) } - return true - }) + } coll.streamLock.Lock() defer coll.streamLock.Unlock() @@ -362,9 +362,7 @@ func (coll *Collection) insertMany(ctx context.Context, docs []*primitive.Map) ( for i, doc := range docs { if id, ok := doc.Get(keyID); !ok { return nil, errors.Wrap(errors.WithStack(ErrPKNotFound), database.ErrCodeWrite) - } else if hash, err := util.Hash(id); err != nil { - return nil, errors.Wrap(err, database.ErrCodeWrite) - } else if _, ok := coll.data.Load(hash); ok { + } else if _, ok := coll.data.Get(id); ok { return nil, errors.Wrap(errors.WithStack(ErrPKDuplicated), database.ErrCodeWrite) } else { ids[i] = id @@ -375,11 +373,7 @@ func (coll *Collection) insertMany(ctx context.Context, docs []*primitive.Map) ( return nil, errors.Wrap(err, database.ErrCodeWrite) } for i, doc := range docs { - if hash, err := util.Hash(ids[i].Interface()); err != nil { - return nil, errors.Wrap(err, database.ErrCodeWrite) - } else { - coll.data.Store(hash, doc) - } + coll.data.Put(ids[i], doc) } return ids, nil @@ -423,40 +417,39 @@ func (coll *Collection) findMany(ctx context.Context, filter *database.Filter, o scanSize = -1 } - scan := map[uint64]*primitive.Map{} + scan := treemap.NewWith(utils.Comparator(func(a, b any) int { + return primitive.Compare(a.(primitive.Object), b.(primitive.Object)) + })) if examples, ok := FilterToExample(filter); ok { if ids, err := coll.indexView.findMany(ctx, examples); err == nil { for _, id := range ids { - if scanSize == len(scan) { + if scanSize == scan.Size() { break - } else if hash, err := util.Hash(id.Interface()); err != nil { - return nil, errors.Wrap(err, database.ErrCodeWrite) - } else if doc, ok := coll.data.Load(hash); ok && match(doc.(*primitive.Map)) { - scan[hash] = doc.(*primitive.Map) + } else if doc, ok := coll.data.Get(id); ok && match(doc.(*primitive.Map)) { + scan.Put(id, doc) } } } } - if scanSize != len(scan) { - coll.data.Range(func(key, value any) bool { - if scanSize == len(scan) { - return false + if scanSize != scan.Size() { + for _, key := range coll.data.Keys() { + value, _ := coll.data.Get(key) + if scanSize == scan.Size() { + continue } - if match(value.(*primitive.Map)) { - scan[key.(uint64)] = value.(*primitive.Map) + scan.Put(key, value) } - return true - }) + } } - if skip >= len(scan) { + if skip >= scan.Size() { return nil, nil } var docs []*primitive.Map - for _, doc := range scan { - docs = append(docs, doc) + for _, doc := range scan.Values() { + docs = append(docs, doc.(*primitive.Map)) } if len(sorts) > 0 { @@ -508,11 +501,7 @@ func (coll *Collection) deleteMany(ctx context.Context, docs []*primitive.Map) ( } for _, id := range ids { - if hash, err := util.Hash(id.Interface()); err != nil { - return nil, errors.Wrap(err, database.ErrCodeWrite) - } else { - coll.data.Delete(hash) - } + coll.data.Remove(id) } return deletes, nil diff --git a/pkg/database/memdb/compare.go b/pkg/database/memdb/compare.go new file mode 100644 index 00000000..7d68e0f2 --- /dev/null +++ b/pkg/database/memdb/compare.go @@ -0,0 +1,12 @@ +package memdb + +import ( + "github.com/emirpasic/gods/utils" + "github.com/siyul-park/uniflow/pkg/primitive" +) + +var ( + comparator = utils.Comparator(func(a, b any) int { + return primitive.Compare(a.(primitive.Object), b.(primitive.Object)) + }) +) diff --git a/pkg/database/memdb/index.go b/pkg/database/memdb/index.go index abdb1a9d..5b518e08 100644 --- a/pkg/database/memdb/index.go +++ b/pkg/database/memdb/index.go @@ -4,9 +4,12 @@ import ( "context" "sync" + "github.com/emirpasic/gods/containers" + "github.com/emirpasic/gods/maps" + "github.com/emirpasic/gods/maps/treemap" + "github.com/emirpasic/gods/sets" + "github.com/emirpasic/gods/sets/treeset" "github.com/pkg/errors" - "github.com/siyul-park/uniflow/internal/pool" - "github.com/siyul-park/uniflow/internal/util" "github.com/siyul-park/uniflow/pkg/database" "github.com/siyul-park/uniflow/pkg/primitive" ) @@ -15,7 +18,7 @@ type ( IndexView struct { names []string models []database.IndexModel - data []*sync.Map + data []maps.Map lock sync.RWMutex } ) @@ -72,7 +75,7 @@ func (iv *IndexView) Create(_ context.Context, index database.IndexModel) error iv.names = append(iv.names, name) iv.models = append(iv.models, index) - iv.data = append(iv.data, pool.GetMap()) + iv.data = append(iv.data, treemap.NewWith(comparator)) return nil } @@ -135,8 +138,7 @@ func (iv *IndexView) findMany(_ context.Context, examples []*primitive.Map) ([]p iv.lock.RLock() defer iv.lock.RUnlock() - ids := pool.GetMap() - defer pool.PutMap(ids) + ids := treeset.NewWith(comparator) for _, example := range examples { if err := func() error { @@ -159,27 +161,15 @@ func (iv *IndexView) findMany(_ context.Context, examples []*primitive.Map) ([]p if obj, ok := primitive.Pick[primitive.Object](example, k); ok { v := primitive.Interface(obj) - hash, err := util.Hash(v) - if err != nil { - return err - } visits[k] = true - if sub, ok := curr.Load(hash); ok { + if sub, ok := curr.Get(v); ok { if i < len(model.Keys)-1 { - curr = sub.(*sync.Map) + curr = sub.(maps.Map) } else { if model.Unique { - if hsub, err := util.Hash(sub); err != nil { - return err - } else { - ids.Store(hsub, sub) - return nil - } + ids.Add(sub) } else { - sub.(*sync.Map).Range(func(key, val any) bool { - ids.Store(key, val) - return true - }) + ids.Add(sub.(sets.Set).Values()...) return nil } } @@ -201,7 +191,7 @@ func (iv *IndexView) findMany(_ context.Context, examples []*primitive.Map) ([]p continue } - var parent []*sync.Map + var parent []maps.Map parent = append(parent, curr) depth := len(model.Keys) - 1 @@ -210,21 +200,17 @@ func (iv *IndexView) findMany(_ context.Context, examples []*primitive.Map) ([]p } for ; i < depth; i++ { - var children []*sync.Map + var children []maps.Map for _, curr := range parent { - curr.Range(func(_, value any) bool { - children = append(children, value.(*sync.Map)) - return true - }) + for _, v := range curr.Values() { + children = append(children, v.(maps.Map)) + } } parent = children } for _, curr := range parent { - curr.Range(func(k, v any) bool { - ids.Store(k, v) - return true - }) + ids.Add(curr.Values()...) } return nil @@ -237,14 +223,16 @@ func (iv *IndexView) findMany(_ context.Context, examples []*primitive.Map) ([]p } var uniqueIds []primitive.Object - ids.Range(func(_, val any) bool { - uniqueIds = append(uniqueIds, val.(primitive.Object)) - return true - }) + for _, v := range ids.Values() { + uniqueIds = append(uniqueIds, v.(primitive.Object)) + } return uniqueIds, nil } func (iv *IndexView) insertOne(ctx context.Context, doc *primitive.Map) error { + iv.lock.Lock() + defer iv.lock.Unlock() + id, ok := doc.Get(keyID) if !ok { return ErrIndexConflict @@ -260,30 +248,27 @@ func (iv *IndexView) insertOne(ctx context.Context, doc *primitive.Map) error { for i, k := range model.Keys { obj, _ := primitive.Pick[primitive.Object](doc, k) - v := primitive.Interface(obj) - hash, err := util.Hash(v) - if err != nil { - return err - } if i < len(model.Keys)-1 { - cm := pool.GetMap() - sub, load := curr.LoadOrStore(hash, cm) - if load { - pool.PutMap(cm) + sub, ok := curr.Get(obj) + if !ok { + sub = treemap.NewWith(comparator) + curr.Put(obj, sub) } - curr = sub.(*sync.Map) + curr = sub.(maps.Map) } else if model.Unique { - if r, loaded := curr.LoadOrStore(hash, id); loaded && r != id { + if r, ok := curr.Get(obj); !ok { + curr.Put(obj, id) + } else if r != id { return ErrIndexConflict } } else { - cm := pool.GetMap() - r, load := curr.LoadOrStore(hash, cm) - if load { - pool.PutMap(cm) + r, ok := curr.Get(obj) + if !ok { + r = treeset.NewWith(comparator) + curr.Put(obj, r) } - r.(*sync.Map).Store(hash, id) + r.(sets.Set).Add(id) } } @@ -303,11 +288,6 @@ func (iv *IndexView) deleteOne(_ context.Context, doc *primitive.Map) error { return nil } - hid, err := util.Hash(id) - if err != nil { - return err - } - for i, model := range iv.models { if err := func() error { curr := iv.data[i] @@ -316,38 +296,32 @@ func (iv *IndexView) deleteOne(_ context.Context, doc *primitive.Map) error { return nil } - var nodes []*sync.Map + var nodes []containers.Container nodes = append(nodes, curr) - var keys []any + var keys []primitive.Object keys = append(keys, nil) for i, k := range model.Keys { obj, _ := primitive.Pick[primitive.Object](doc, k) - v := primitive.Interface(obj) - - hash, err := util.Hash(v) - if err != nil { - return err - } if i < len(model.Keys)-1 { - if sub, ok := curr.Load(hash); ok { - curr = sub.(*sync.Map) + if sub, ok := curr.Get(obj); ok { + curr = sub.(maps.Map) nodes = append(nodes, curr) - keys = append(keys, hash) + keys = append(keys, obj) } else { return nil } } else if model.Unique { - if r, loaded := curr.Load(hash); loaded && primitive.Equal(id, r.(primitive.Object)) { - curr.Delete(hash) + if r, ok := curr.Get(obj); ok && primitive.Equal(id, r.(primitive.Object)) { + curr.Remove(obj) } } else { - if r, loaded := curr.Load(hash); loaded { - nodes = append(nodes, r.(*sync.Map)) - keys = append(keys, hash) - r.(*sync.Map).Delete(hid) + if r, ok := curr.Get(obj); ok { + nodes = append(nodes, r.(sets.Set)) + keys = append(keys, obj) + r.(sets.Set).Remove(id) } } } @@ -355,18 +329,13 @@ func (iv *IndexView) deleteOne(_ context.Context, doc *primitive.Map) error { for i := len(nodes) - 1; i >= 0; i-- { node := nodes[i] - empty := true - node.Range(func(_, _ any) bool { - empty = false - return false - }) - - if empty && i > 0 { + if node.Empty() && i > 0 { parent := nodes[i-1] key := keys[i] - parent.Delete(key) - pool.PutMap(node) + if p, ok := parent.(maps.Map); ok { + p.Remove(key) + } } } diff --git a/pkg/primitive/map.go b/pkg/primitive/map.go index 32e0ef2b..27157f48 100644 --- a/pkg/primitive/map.go +++ b/pkg/primitive/map.go @@ -252,29 +252,7 @@ func (o *Map) Interface() any { } func (*comparer) Compare(a Object, b Object) int { - if a == nil { - return -1 - } else if b == nil { - return 1 - } else if a.Kind() > b.Kind() { - return 1 - } else if a.Kind() < b.Kind() { - return -1 - } - - hashA := a.Hash() - hashB := b.Hash() - - if hashA > hashB { - return 1 - } else if hashA < hashB { - return -1 - } - - if !a.Equal(b) { - return 1 - } - return 0 + return Compare(a, b) } // NewMapEncoder is encode map or struct to Map.