Skip to content

Commit

Permalink
PUBSUB
Browse files Browse the repository at this point in the history
  • Loading branch information
Harmen authored Mar 29, 2019
2 parents a860672 + 8e2ff94 commit 7ae8e75
Show file tree
Hide file tree
Showing 27 changed files with 2,540 additions and 43 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ script: make test testrace int
sudo: false

go:
- 1.11
- 1.12
38 changes: 37 additions & 1 deletion cmd_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,33 @@ func (m *Miniredis) cmdPing(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
c.WriteInline("PONG")

if len(args) > 1 {
setDirty(c)
c.WriteError(errWrongNumber(cmd))
return
}

payload := ""
if len(args) > 0 {
payload = args[0]
}

// PING is allowed in subscribed state
if sub := getCtx(c).subscriber; sub != nil {
c.Block(func(c *server.Writer) {
c.WriteLen(2)
c.WriteBulk("pong")
c.WriteBulk(payload)
})
return
}

if payload == "" {
c.WriteInline("PONG")
return
}
c.WriteBulk(payload)
}

// AUTH
Expand All @@ -31,6 +57,10 @@ func (m *Miniredis) cmdAuth(c *server.Peer, cmd string, args []string) {
c.WriteError(errWrongNumber(cmd))
return
}
if m.checkPubsub(c) {
return
}

pw := args[0]

m.Lock()
Expand Down Expand Up @@ -58,6 +88,9 @@ func (m *Miniredis) cmdEcho(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

msg := args[0]
c.WriteBulk(msg)
Expand All @@ -73,6 +106,9 @@ func (m *Miniredis) cmdSelect(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

id, err := strconv.Atoi(args[0])
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions cmd_connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,26 @@ func TestAuth(t *testing.T) {
ok(t, err)
}

func TestPing(t *testing.T) {
s, err := Run()
ok(t, err)
defer s.Close()
c, err := redis.Dial("tcp", s.Addr())
ok(t, err)

r, err := redis.String(c.Do("PING"))
ok(t, err)
equals(t, "PONG", r)

r, err = redis.String(c.Do("PING", "hi"))
ok(t, err)
equals(t, "hi", r)

_, err = c.Do("PING", "foo", "bar")
mustFail(t, err, errWrongNumber("ping"))

}

func TestEcho(t *testing.T) {
s, err := Run()
ok(t, err)
Expand Down
42 changes: 42 additions & 0 deletions cmd_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func makeCmdExpire(m *Miniredis, unix bool, d time.Duration) func(*server.Peer,
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key := args[0]
value := args[1]
Expand Down Expand Up @@ -102,6 +105,10 @@ func (m *Miniredis) cmdTTL(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key := args[0]

withTx(m, c, func(c *server.Peer, ctx *connCtx) {
Expand Down Expand Up @@ -133,6 +140,10 @@ func (m *Miniredis) cmdPTTL(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key := args[0]

withTx(m, c, func(c *server.Peer, ctx *connCtx) {
Expand Down Expand Up @@ -164,6 +175,10 @@ func (m *Miniredis) cmdPersist(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key := args[0]

withTx(m, c, func(c *server.Peer, ctx *connCtx) {
Expand Down Expand Up @@ -191,6 +206,9 @@ func (m *Miniredis) cmdDel(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

withTx(m, c, func(c *server.Peer, ctx *connCtx) {
db := m.db(ctx.selectedDB)
Expand All @@ -216,6 +234,9 @@ func (m *Miniredis) cmdType(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key := args[0]

Expand All @@ -242,6 +263,9 @@ func (m *Miniredis) cmdExists(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

withTx(m, c, func(c *server.Peer, ctx *connCtx) {
db := m.db(ctx.selectedDB)
Expand All @@ -266,6 +290,9 @@ func (m *Miniredis) cmdMove(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key := args[0]
targetDB, err := strconv.Atoi(args[1])
Expand Down Expand Up @@ -299,6 +326,9 @@ func (m *Miniredis) cmdKeys(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key := args[0]

Expand All @@ -323,6 +353,9 @@ func (m *Miniredis) cmdRandomkey(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

withTx(m, c, func(c *server.Peer, ctx *connCtx) {
db := m.db(ctx.selectedDB)
Expand Down Expand Up @@ -352,6 +385,9 @@ func (m *Miniredis) cmdRename(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

from, to := args[0], args[1]

Expand All @@ -378,6 +414,9 @@ func (m *Miniredis) cmdRenamenx(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

from, to := args[0], args[1]

Expand Down Expand Up @@ -409,6 +448,9 @@ func (m *Miniredis) cmdScan(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

cursor, err := strconv.Atoi(args[0])
if err != nil {
Expand Down
42 changes: 42 additions & 0 deletions cmd_hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func (m *Miniredis) cmdHset(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key, field, value := args[0], args[1], args[2]

Expand Down Expand Up @@ -66,6 +69,9 @@ func (m *Miniredis) cmdHsetnx(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key, field, value := args[0], args[1], args[2]

Expand Down Expand Up @@ -102,6 +108,9 @@ func (m *Miniredis) cmdHmset(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key, args := args[0], args[1:]
if len(args)%2 != 0 {
Expand Down Expand Up @@ -138,6 +147,9 @@ func (m *Miniredis) cmdHget(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key, field := args[0], args[1]

Expand Down Expand Up @@ -172,6 +184,9 @@ func (m *Miniredis) cmdHdel(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key, fields := args[0], args[1:]

Expand Down Expand Up @@ -217,6 +232,9 @@ func (m *Miniredis) cmdHexists(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key, field := args[0], args[1]

Expand Down Expand Up @@ -251,6 +269,9 @@ func (m *Miniredis) cmdHgetall(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key := args[0]

Expand Down Expand Up @@ -285,6 +306,9 @@ func (m *Miniredis) cmdHkeys(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key := args[0]

Expand Down Expand Up @@ -318,6 +342,9 @@ func (m *Miniredis) cmdHvals(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key := args[0]

Expand Down Expand Up @@ -351,6 +378,9 @@ func (m *Miniredis) cmdHlen(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key := args[0]

Expand Down Expand Up @@ -381,6 +411,9 @@ func (m *Miniredis) cmdHmget(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key := args[0]

Expand Down Expand Up @@ -419,6 +452,9 @@ func (m *Miniredis) cmdHincrby(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key, field, deltas := args[0], args[1], args[2]

Expand Down Expand Up @@ -456,6 +492,9 @@ func (m *Miniredis) cmdHincrbyfloat(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key, field, deltas := args[0], args[1], args[2]

Expand Down Expand Up @@ -493,6 +532,9 @@ func (m *Miniredis) cmdHscan(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if m.checkPubsub(c) {
return
}

key := args[0]
cursor, err := strconv.Atoi(args[1])
Expand Down
Loading

0 comments on commit 7ae8e75

Please sign in to comment.