Skip to content

Commit

Permalink
Merge pull request #86 from alicebob/psub
Browse files Browse the repository at this point in the history
reply with "pmessage" for psubscriptions

and assorted cleanups.
  • Loading branch information
alicebob authored Jul 24, 2019
2 parents 3819308 + 150346d commit 383fb2e
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 614 deletions.
141 changes: 20 additions & 121 deletions cmd_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package miniredis

import (
"fmt"
"regexp"
"strings"

"github.com/alicebob/miniredis/v2/server"
Expand Down Expand Up @@ -35,11 +34,11 @@ func (m *Miniredis) cmdSubscribe(c *server.Peer, cmd string, args []string) {
sub := m.subscribedState(c)
for _, channel := range args {
n := sub.Subscribe(channel)
c.Block(func(c *server.Writer) {
c.WriteLen(3)
c.WriteBulk("subscribe")
c.WriteBulk(channel)
c.WriteInt(n)
c.Block(func(w *server.Writer) {
w.WriteLen(3)
w.WriteBulk("subscribe")
w.WriteBulk(channel)
w.WriteInt(n)
})
}
})
Expand All @@ -63,11 +62,11 @@ func (m *Miniredis) cmdUnsubscribe(c *server.Peer, cmd string, args []string) {
// there is no de-duplication
for _, channel := range channels {
n := sub.Unsubscribe(channel)
c.Block(func(c *server.Writer) {
c.WriteLen(3)
c.WriteBulk("unsubscribe")
c.WriteBulk(channel)
c.WriteInt(n)
c.Block(func(w *server.Writer) {
w.WriteLen(3)
w.WriteBulk("unsubscribe")
w.WriteBulk(channel)
w.WriteInt(n)
})
}

Expand All @@ -92,116 +91,16 @@ func (m *Miniredis) cmdPsubscribe(c *server.Peer, cmd string, args []string) {
sub := m.subscribedState(c)
for _, pat := range args {
n := sub.Psubscribe(pat)
c.Block(func(c *server.Writer) {
c.WriteLen(3)
c.WriteBulk("psubscribe")
c.WriteBulk(pat)
c.WriteInt(n)
c.Block(func(w *server.Writer) {
w.WriteLen(3)
w.WriteBulk("psubscribe")
w.WriteBulk(pat)
w.WriteInt(n)
})
}
})
}

func compileChannelPattern(pattern string) *regexp.Regexp {
const readingLiteral uint8 = 0
const afterEscape uint8 = 1
const inClass uint8 = 2

rgx := []rune{'\\', 'A'}
state := readingLiteral
literals := []rune{}
klass := map[rune]struct{}{}

for _, c := range pattern {
switch state {
case readingLiteral:
switch c {
case '\\':
state = afterEscape
case '?':
rgx = append(rgx, append([]rune(regexp.QuoteMeta(string(literals))), '.')...)
literals = []rune{}
case '*':
rgx = append(rgx, append([]rune(regexp.QuoteMeta(string(literals))), '.', '*')...)
literals = []rune{}
case '[':
rgx = append(rgx, []rune(regexp.QuoteMeta(string(literals)))...)
literals = []rune{}
state = inClass
default:
literals = append(literals, c)
}
case afterEscape:
literals = append(literals, c)
state = readingLiteral
case inClass:
if c == ']' {
expr := []rune{'['}

if _, hasDash := klass['-']; hasDash {
delete(klass, '-')
expr = append(expr, '-')
}

flatClass := make([]rune, len(klass))
i := 0

for c := range klass {
flatClass[i] = c
i++
}

klass = map[rune]struct{}{}
expr = append(append(expr, []rune(regexp.QuoteMeta(string(flatClass)))...), ']')

if len(expr) < 3 {
rgx = append(rgx, 'x', '\\', 'b', 'y')
} else {
rgx = append(rgx, expr...)
}

state = readingLiteral
} else {
klass[c] = struct{}{}
}
}
}

switch state {
case afterEscape:
rgx = append(rgx, '\\', '\\')
case inClass:
if len(klass) < 0 {
rgx = append(rgx, '\\', '[')
} else {
expr := []rune{'['}

if _, hasDash := klass['-']; hasDash {
delete(klass, '-')
expr = append(expr, '-')
}

flatClass := make([]rune, len(klass))
i := 0

for c := range klass {
flatClass[i] = c
i++
}

expr = append(append(expr, []rune(regexp.QuoteMeta(string(flatClass)))...), ']')

if len(expr) < 3 {
rgx = append(rgx, 'x', '\\', 'b', 'y')
} else {
rgx = append(rgx, expr...)
}
}
}

return regexp.MustCompile(string(append(rgx, '\\', 'z')))
}

// PUNSUBSCRIBE
func (m *Miniredis) cmdPunsubscribe(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
Expand All @@ -220,11 +119,11 @@ func (m *Miniredis) cmdPunsubscribe(c *server.Peer, cmd string, args []string) {
// there is no de-duplication
for _, pat := range patterns {
n := sub.Punsubscribe(pat)
c.Block(func(c *server.Writer) {
c.WriteLen(3)
c.WriteBulk("punsubscribe")
c.WriteBulk(pat)
c.WriteInt(n)
c.Block(func(w *server.Writer) {
w.WriteLen(3)
w.WriteBulk("punsubscribe")
w.WriteBulk(pat)
w.WriteInt(n)
})
}

Expand Down
Loading

0 comments on commit 383fb2e

Please sign in to comment.