Skip to content

Commit

Permalink
Implement lock striping for cmap (#1053)
Browse files Browse the repository at this point in the history
This change introduces lock striping to improve concurrent map
performance by reducing lock contention. Instead of using a single lock
for the entire map, the implementation divides the map into shards,
each with its own lock, allowing for better parallelism and throughput
in concurrent operations.
  • Loading branch information
hackerwins authored Oct 28, 2024
1 parent 553e240 commit c776ae1
Showing 1 changed file with 102 additions and 35 deletions.
137 changes: 102 additions & 35 deletions pkg/cmap/cmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,50 +18,96 @@
package cmap

import (
"fmt"
"hash/fnv"
"sync"
)

// Map is a mutex-protected map.
type Map[K comparable, V any] struct {
// numShards is the number of shards.
const numShards = 32

type shard[K comparable, V any] struct {
sync.RWMutex
items map[K]V
}

// Map is a concurrent map that is safe for multiple routines. It is optimized
// to reduce lock contention and improve performance.
type Map[K comparable, V any] struct {
shards [numShards]shard[K, V]
}

// New creates a new Map.
func New[K comparable, V any]() *Map[K, V] {
return &Map[K, V]{
items: make(map[K]V),
m := &Map[K, V]{}
for i := 0; i < numShards; i++ {
m.shards[i].items = make(map[K]V)
}
return m
}

// shardForKey returns the shard for the given key.
func (m *Map[K, V]) shardForKey(key K) *shard[K, V] {
var idx uint32
switch k := any(key).(type) {
case string:
hash := fnv.New32a()
if _, err := hash.Write([]byte(k)); err != nil {
panic(fmt.Sprintf("shard for key: %s", err))
}
idx = hash.Sum32()
case int:
idx = uint32(k)
default:
hash := fnv.New32a()
if _, err := hash.Write([]byte(fmt.Sprintf("%v", key))); err != nil {
panic(fmt.Sprintf("shard for key: %s", err))
}
idx = hash.Sum32()
}

return &m.shards[idx%numShards]
}

// shardOf returns the shard for the given index.
func (m *Map[K, V]) shardOf(idx int) *shard[K, V] {
return &m.shards[idx%numShards]
}

// Set sets a key-value pair.
func (m *Map[K, V]) Set(key K, value V) {
m.Lock()
defer m.Unlock()
shard := m.shardForKey(key)

shard.Lock()
defer shard.Unlock()

m.items[key] = value
shard.items[key] = value
}

// UpsertFunc is a function to insert or update a key-value pair.
type UpsertFunc[K comparable, V any] func(valueInMap V, exists bool) V
type UpsertFunc[K comparable, V any] func(value V, exists bool) V

// Upsert inserts or updates a key-value pair.
func (m *Map[K, V]) Upsert(key K, upsertFunc UpsertFunc[K, V]) V {
m.Lock()
defer m.Unlock()
shard := m.shardForKey(key)

v, exists := m.items[key]
shard.Lock()
defer shard.Unlock()

v, exists := shard.items[key]
res := upsertFunc(v, exists)
m.items[key] = res
shard.items[key] = res
return res
}

// Get retrieves a value from the map.
func (m *Map[K, V]) Get(key K) (V, bool) {
m.RLock()
defer m.RUnlock()
shard := m.shardForKey(key)

shard.RLock()
defer shard.RUnlock()

value, exists := m.items[key]
value, exists := shard.items[key]
return value, exists
}

Expand All @@ -70,54 +116,75 @@ type DeleteFunc[K comparable, V any] func(value V, exists bool) bool

// Delete removes a value from the map.
func (m *Map[K, V]) Delete(key K, deleteFunc DeleteFunc[K, V]) bool {
m.Lock()
defer m.Unlock()
shard := m.shardForKey(key)

shard.Lock()
defer shard.Unlock()

value, exists := m.items[key]
value, exists := shard.items[key]
del := deleteFunc(value, exists)
if del && exists {
delete(m.items, key)
delete(shard.items, key)
}

return del
}

// Has checks if a key exists in the map
func (m *Map[K, V]) Has(key K) bool {
m.RLock()
defer m.RUnlock()
shard := m.shardForKey(key)

shard.RLock()
defer shard.RUnlock()

_, exists := m.items[key]
_, exists := shard.items[key]
return exists
}

// Len returns the number of items in the map
func (m *Map[K, V]) Len() int {
m.RLock()
defer m.RUnlock()
count := 0

for i := 0; i < numShards; i++ {
shard := &m.shards[i]

return len(m.items)
shard.RLock()
count += len(shard.items)
shard.RUnlock()
}

return count
}

// Keys returns a slice of all keys in the map
func (m *Map[K, V]) Keys() []K {
m.RLock()
defer m.RUnlock()
keys := make([]K, 0)

for i := 0; i < numShards; i++ {
shard := &m.shards[i]

keys := make([]K, 0, len(m.items))
for k := range m.items {
keys = append(keys, k)
shard.RLock()
for k := range shard.items {
keys = append(keys, k)
}
shard.RUnlock()
}
return keys
}

// Values returns a slice of all values in the map
func (m *Map[K, V]) Values() []V {
m.RLock()
defer m.RUnlock()
values := make([]V, 0)

values := make([]V, 0, len(m.items))
for _, v := range m.items {
values = append(values, v)
for i := 0; i < numShards; i++ {
shard := &m.shards[i]

shard.RLock()
for _, v := range shard.items {
values = append(values, v)
}
shard.RUnlock()
}

return values
}

0 comments on commit c776ae1

Please sign in to comment.