From c776ae1b4841fb7d0c7c4e3025e61cf921326087 Mon Sep 17 00:00:00 2001 From: Youngteac Hong Date: Mon, 28 Oct 2024 09:54:22 +0900 Subject: [PATCH] Implement lock striping for cmap (#1053) This change introduces lock striping to improve concurrent map performance by reducing lock contention. Instead of using a single lock for the entire map, the implementation divides the map into shards, each with its own lock, allowing for better parallelism and throughput in concurrent operations. --- pkg/cmap/cmap.go | 137 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 102 insertions(+), 35 deletions(-) diff --git a/pkg/cmap/cmap.go b/pkg/cmap/cmap.go index 039ccaa11..420485600 100644 --- a/pkg/cmap/cmap.go +++ b/pkg/cmap/cmap.go @@ -18,50 +18,96 @@ package cmap import ( + "fmt" + "hash/fnv" "sync" ) -// Map is a mutex-protected map. -type Map[K comparable, V any] struct { +// numShards is the number of shards. +const numShards = 32 + +type shard[K comparable, V any] struct { sync.RWMutex items map[K]V } +// Map is a concurrent map that is safe for multiple routines. It is optimized +// to reduce lock contention and improve performance. +type Map[K comparable, V any] struct { + shards [numShards]shard[K, V] +} + // New creates a new Map. func New[K comparable, V any]() *Map[K, V] { - return &Map[K, V]{ - items: make(map[K]V), + m := &Map[K, V]{} + for i := 0; i < numShards; i++ { + m.shards[i].items = make(map[K]V) + } + return m +} + +// shardForKey returns the shard for the given key. +func (m *Map[K, V]) shardForKey(key K) *shard[K, V] { + var idx uint32 + switch k := any(key).(type) { + case string: + hash := fnv.New32a() + if _, err := hash.Write([]byte(k)); err != nil { + panic(fmt.Sprintf("shard for key: %s", err)) + } + idx = hash.Sum32() + case int: + idx = uint32(k) + default: + hash := fnv.New32a() + if _, err := hash.Write([]byte(fmt.Sprintf("%v", key))); err != nil { + panic(fmt.Sprintf("shard for key: %s", err)) + } + idx = hash.Sum32() } + + return &m.shards[idx%numShards] +} + +// shardOf returns the shard for the given index. +func (m *Map[K, V]) shardOf(idx int) *shard[K, V] { + return &m.shards[idx%numShards] } // Set sets a key-value pair. func (m *Map[K, V]) Set(key K, value V) { - m.Lock() - defer m.Unlock() + shard := m.shardForKey(key) + + shard.Lock() + defer shard.Unlock() - m.items[key] = value + shard.items[key] = value } // UpsertFunc is a function to insert or update a key-value pair. -type UpsertFunc[K comparable, V any] func(valueInMap V, exists bool) V +type UpsertFunc[K comparable, V any] func(value V, exists bool) V // Upsert inserts or updates a key-value pair. func (m *Map[K, V]) Upsert(key K, upsertFunc UpsertFunc[K, V]) V { - m.Lock() - defer m.Unlock() + shard := m.shardForKey(key) - v, exists := m.items[key] + shard.Lock() + defer shard.Unlock() + + v, exists := shard.items[key] res := upsertFunc(v, exists) - m.items[key] = res + shard.items[key] = res return res } // Get retrieves a value from the map. func (m *Map[K, V]) Get(key K) (V, bool) { - m.RLock() - defer m.RUnlock() + shard := m.shardForKey(key) + + shard.RLock() + defer shard.RUnlock() - value, exists := m.items[key] + value, exists := shard.items[key] return value, exists } @@ -70,54 +116,75 @@ type DeleteFunc[K comparable, V any] func(value V, exists bool) bool // Delete removes a value from the map. func (m *Map[K, V]) Delete(key K, deleteFunc DeleteFunc[K, V]) bool { - m.Lock() - defer m.Unlock() + shard := m.shardForKey(key) + + shard.Lock() + defer shard.Unlock() - value, exists := m.items[key] + value, exists := shard.items[key] del := deleteFunc(value, exists) if del && exists { - delete(m.items, key) + delete(shard.items, key) } + return del } // Has checks if a key exists in the map func (m *Map[K, V]) Has(key K) bool { - m.RLock() - defer m.RUnlock() + shard := m.shardForKey(key) + + shard.RLock() + defer shard.RUnlock() - _, exists := m.items[key] + _, exists := shard.items[key] return exists } // Len returns the number of items in the map func (m *Map[K, V]) Len() int { - m.RLock() - defer m.RUnlock() + count := 0 + + for i := 0; i < numShards; i++ { + shard := &m.shards[i] - return len(m.items) + shard.RLock() + count += len(shard.items) + shard.RUnlock() + } + + return count } // Keys returns a slice of all keys in the map func (m *Map[K, V]) Keys() []K { - m.RLock() - defer m.RUnlock() + keys := make([]K, 0) + + for i := 0; i < numShards; i++ { + shard := &m.shards[i] - keys := make([]K, 0, len(m.items)) - for k := range m.items { - keys = append(keys, k) + shard.RLock() + for k := range shard.items { + keys = append(keys, k) + } + shard.RUnlock() } return keys } // Values returns a slice of all values in the map func (m *Map[K, V]) Values() []V { - m.RLock() - defer m.RUnlock() + values := make([]V, 0) - values := make([]V, 0, len(m.items)) - for _, v := range m.items { - values = append(values, v) + for i := 0; i < numShards; i++ { + shard := &m.shards[i] + + shard.RLock() + for _, v := range shard.items { + values = append(values, v) + } + shard.RUnlock() } + return values }