Skip to content

Commit

Permalink
Remove map of map
Browse files Browse the repository at this point in the history
  • Loading branch information
vadimalekseev committed Aug 28, 2023
1 parent e34e03c commit d3a48e1
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 41 deletions.
12 changes: 10 additions & 2 deletions pipeline/pipeline_whitebox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,25 @@ func TestPipeline_streamEvent(t *testing.T) {
event.SourceID = SourceID(streamID)
event.streamName = DefaultStreamName
event.SeqID = 123456789
key := streamKey{
ID: streamID,
Name: DefaultStreamName,
}

p.streamEvent(event)

assert.Equal(t, event, p.streamer.getStream(streamID, DefaultStreamName).first)
assert.Equal(t, event, p.streamer.getStream(key).first)

p.UseSpread()
p.streamEvent(event)

expectedStreamID := StreamID(event.SeqID % uint64(procs))

assert.Equal(t, event, p.streamer.getStream(expectedStreamID, DefaultStreamName).first)
key = streamKey{
ID: expectedStreamID,
Name: DefaultStreamName,
}
assert.Equal(t, event, p.streamer.getStream(key).first)
}

// Can't use fake plugin here dye cycle import
Expand Down
74 changes: 35 additions & 39 deletions pipeline/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pipeline

import (
"fmt"
"strings"
"sync"
"time"

Expand All @@ -11,34 +12,34 @@ import (

type StreamID uint64

type streamKey struct {
ID StreamID
Name StreamName
}

type streamer struct {
streams map[StreamID]map[StreamName]*stream
mu *sync.RWMutex
streams map[streamKey]*stream
mu sync.RWMutex

shouldStop atomic.Bool

charged []*stream
chargedMu *sync.Mutex
chargedMu sync.Mutex
chargedCond *sync.Cond

blocked []*stream
blockedMu *sync.Mutex
blockedMu sync.Mutex

eventTimeout time.Duration
}

func newStreamer(eventTimeout time.Duration) *streamer {
streamer := &streamer{
streams: make(map[StreamID]map[StreamName]*stream),
mu: &sync.RWMutex{},
charged: make([]*stream, 0),

chargedMu: &sync.Mutex{},
blockedMu: &sync.Mutex{},

streams: make(map[streamKey]*stream),
charged: make([]*stream, 0),
eventTimeout: eventTimeout,
}
streamer.chargedCond = sync.NewCond(streamer.chargedMu)
streamer.chargedCond = sync.NewCond(&streamer.chargedMu)

return streamer
}
Expand All @@ -51,22 +52,24 @@ func (s *streamer) stop() {
s.shouldStop.Store(true)

s.mu.Lock()
for _, source := range s.streams {
for _, stream := range source {
stream.put(unlockEvent(stream))
}
for _, stream := range s.streams {
stream.put(unlockEvent(stream))
}
s.mu.Unlock()
}

func (s *streamer) putEvent(streamID StreamID, streamName StreamName, event *Event) uint64 {
return s.getStream(streamID, streamName).put(event)
key := streamKey{
ID: streamID,
Name: streamName,
}
return s.getStream(key).put(event)
}

func (s *streamer) getStream(streamID StreamID, streamName StreamName) *stream {
func (s *streamer) getStream(key streamKey) *stream {
// fast path, stream has been already created
s.mu.RLock()
st, has := s.streams[streamID][streamName]
st, has := s.streams[key]
s.mu.RUnlock()
if has {
return st
Expand All @@ -75,20 +78,15 @@ func (s *streamer) getStream(streamID StreamID, streamName StreamName) *stream {
// slow path, create new stream
s.mu.Lock()
defer s.mu.Unlock()
st, has = s.streams[streamID][streamName]
st, has = s.streams[key]
if has {
return st
}

_, has = s.streams[streamID]
if !has {
s.streams[streamID] = make(map[StreamName]*stream)
}

// copy streamName because it's unsafe []byte instead of regular string
streamNameCopy := StreamName([]byte(streamName))
st = newStream(streamNameCopy, streamID, s)
s.streams[streamID][streamNameCopy] = st
key.Name = StreamName(strings.Clone(string(key.Name)))
st = newStream(key.Name, key.ID, s)
s.streams[key] = st

return st
}
Expand Down Expand Up @@ -170,18 +168,16 @@ func (s *streamer) dump() string {

out := logger.Cond(len(s.streams) == 0, logger.Header("no streams"), func() string {
o := logger.Header("streams")
for _, s := range s.streams {
for _, stream := range s {
state := "| UNATTACHED |"
if stream.isAttached {
state = "| ATTACHED |"
}
if stream.isDetaching {
state = "| DETACHING |"
}

o += fmt.Sprintf("%d(%s) state=%s, away event id=%d, commit event id=%d, len=%d\n", stream.streamID, stream.name, state, stream.awaySeq, stream.commitSeq.Load(), stream.len)
for _, stream := range s.streams {
state := "| UNATTACHED |"
if stream.isAttached {
state = "| ATTACHED |"
}
if stream.isDetaching {
state = "| DETACHING |"
}

o += fmt.Sprintf("%d(%s) state=%s, away event id=%d, commit event id=%d, len=%d\n", stream.streamID, stream.name, state, stream.awaySeq, stream.commitSeq.Load(), stream.len)
}

return o
Expand Down

0 comments on commit d3a48e1

Please sign in to comment.