Skip to content

Commit

Permalink
redis 7.0 (#279)
Browse files Browse the repository at this point in the history
- implement redis 7.0.7 (from 6.X). Main changes:
   - update error messages
   - support nx|xx|gt|lt options in [P]EXPIRE[AT]
   - update how deleted items are processed in pending queues in streams
  • Loading branch information
alicebob authored Jan 6, 2023
1 parent 83dbb3d commit 331f847
Show file tree
Hide file tree
Showing 26 changed files with 352 additions and 175 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ Commands which will probably not be implemented:

## &c.

Integration tests are run against Redis 6.2.6. The [./integration](./integration/) subdir
Integration tests are run against Redis 7.0.7. The [./integration](./integration/) subdir
compares miniredis against a real redis instance.

The Redis 6 RESP3 protocol is supported. If there are problems, please open
Expand Down
5 changes: 3 additions & 2 deletions cmd_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ func (m *Miniredis) cmdAuth(c *server.Peer, cmd string, args []string) {
if m.checkPubsub(c, cmd) {
return
}
if getCtx(c).nested {
c.WriteError(msgNotFromScripts)
ctx := getCtx(c)
if ctx.nested {
c.WriteError(msgNotFromScripts(ctx.nestedSHA))
return
}

Expand Down
66 changes: 63 additions & 3 deletions cmd_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package miniredis

import (
"fmt"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -44,7 +45,7 @@ func commandsGeneric(m *Miniredis) {
// converted to a duration.
func makeCmdExpire(m *Miniredis, unix bool, d time.Duration) func(*server.Peer, string, []string) {
return func(c *server.Peer, cmd string, args []string) {
if len(args) != 2 {
if len(args) < 2 {
setDirty(c)
c.WriteError(errWrongNumber(cmd))
return
Expand All @@ -59,11 +60,43 @@ func makeCmdExpire(m *Miniredis, unix bool, d time.Duration) func(*server.Peer,
var opts struct {
key string
value int
nx bool
xx bool
gt bool
lt bool
}
opts.key = args[0]
if ok := optInt(c, args[1], &opts.value); !ok {
return
}
args = args[2:]
for len(args) > 0 {
switch strings.ToLower(args[0]) {
case "nx":
opts.nx = true
case "xx":
opts.xx = true
case "gt":
opts.gt = true
case "lt":
opts.lt = true
default:
setDirty(c)
c.WriteError(fmt.Sprintf("ERR Unsupported option %s", args[0]))
return
}
args = args[1:]
}
if opts.gt && opts.lt {
setDirty(c)
c.WriteError("ERR GT and LT options at the same time are not compatible")
return
}
if opts.nx && (opts.xx || opts.gt || opts.lt) {
setDirty(c)
c.WriteError("ERR NX and XX, GT or LT options at the same time are not compatible")
return
}

withTx(m, c, func(c *server.Peer, ctx *connCtx) {
db := m.db(ctx.selectedDB)
Expand All @@ -73,11 +106,38 @@ func makeCmdExpire(m *Miniredis, unix bool, d time.Duration) func(*server.Peer,
c.WriteInt(0)
return
}

oldTTL, ok := db.ttl[opts.key]

var newTTL time.Duration
if unix {
db.ttl[opts.key] = m.at(opts.value, d)
newTTL = m.at(opts.value, d)
} else {
db.ttl[opts.key] = time.Duration(opts.value) * d
newTTL = time.Duration(opts.value) * d
}

// > NX -- Set expiry only when the key has no expiry
if opts.nx && ok {
c.WriteInt(0)
return
}
// > XX -- Set expiry only when the key has an existing expiry
if opts.xx && !ok {
c.WriteInt(0)
return
}
// > GT -- Set expiry only when the new expiry is greater than current one
// (no exp == infinity)
if opts.gt && (!ok || newTTL <= oldTTL) {
c.WriteInt(0)
return
}
// > LT -- Set expiry only when the new expiry is less than current one
if opts.lt && ok && newTTL > oldTTL {
c.WriteInt(0)
return
}
db.ttl[opts.key] = newTTL
db.keyVersion[opts.key]++
db.checkTTL(opts.key)
c.WriteInt(1)
Expand Down
11 changes: 6 additions & 5 deletions cmd_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,6 +462,11 @@ func (m *Miniredis) cmdXpop(c *server.Peer, cmd string, args []string, lr leftri

if !db.exists(opts.key) {
// non-existing key is fine
if opts.withCount && !c.Resp3 {
// zero-length list in this specific case. Looks like a redis bug to me.
c.WriteLen(-1)
return
}
c.WriteNull()
return
}
Expand All @@ -481,11 +486,7 @@ func (m *Miniredis) cmdXpop(c *server.Peer, cmd string, args []string, lr leftri
}
opts.count -= 1
}
if len(popped) == 0 {
c.WriteLen(-1)
} else {
c.WriteStrings(popped)
}
c.WriteStrings(popped)
return
}

Expand Down
4 changes: 2 additions & 2 deletions cmd_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,12 @@ func TestLpop(t *testing.T) {

mustDo(t, c,
"LPOP", "l2", "99",
proto.Nil,
proto.NilList,
)

mustDo(t, c,
"LPOP", "l2", "0",
proto.Nil,
proto.NilList,
)

// Last element has been popped. Key is gone.
Expand Down
24 changes: 15 additions & 9 deletions cmd_pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ func (m *Miniredis) cmdSubscribe(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if getCtx(c).nested {
c.WriteError(msgNotFromScripts)
ctx := getCtx(c)
if ctx.nested {
c.WriteError(msgNotFromScripts(ctx.nestedSHA))
return
}

Expand All @@ -53,8 +54,9 @@ func (m *Miniredis) cmdUnsubscribe(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if getCtx(c).nested {
c.WriteError(msgNotFromScripts)
ctx := getCtx(c)
if ctx.nested {
c.WriteError(msgNotFromScripts(ctx.nestedSHA))
return
}

Expand Down Expand Up @@ -103,8 +105,9 @@ func (m *Miniredis) cmdPsubscribe(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if getCtx(c).nested {
c.WriteError(msgNotFromScripts)
ctx := getCtx(c)
if ctx.nested {
c.WriteError(msgNotFromScripts(ctx.nestedSHA))
return
}

Expand All @@ -127,8 +130,9 @@ func (m *Miniredis) cmdPunsubscribe(c *server.Peer, cmd string, args []string) {
if !m.handleAuth(c) {
return
}
if getCtx(c).nested {
c.WriteError(msgNotFromScripts)
ctx := getCtx(c)
if ctx.nested {
c.WriteError(msgNotFromScripts(ctx.nestedSHA))
return
}

Expand Down Expand Up @@ -212,7 +216,9 @@ func (m *Miniredis) cmdPubSub(c *server.Peer, cmd string, args []string) {
case "NUMPAT":
argsOk = len(subargs) == 0
default:
argsOk = false
setDirty(c)
c.WriteError(fmt.Sprintf(msgFPubsubUsageSimple, subcommand))
return
}

if !argsOk {
Expand Down
4 changes: 2 additions & 2 deletions cmd_pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,10 +598,10 @@ func TestPubSubBadArgs(t *testing.T) {
)
mustDo(t, c,
"PUBSUB", "FOOBAR",
proto.Error("ERR Unknown subcommand or wrong number of arguments for 'FOOBAR'. Try PUBSUB HELP."),
proto.Error("ERR unknown subcommand 'FOOBAR'. Try PUBSUB HELP."),
)
mustDo(t, c,
"PUBSUB", "CHANNELS", "FOOBAR1", "FOOBAR2",
proto.Error("ERR Unknown subcommand or wrong number of arguments for 'CHANNELS'. Try PUBSUB HELP."),
proto.Error("ERR unknown subcommand or wrong number of arguments for 'CHANNELS'. Try PUBSUB HELP."),
)
}
95 changes: 58 additions & 37 deletions cmd_scripting.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func commandsScripting(m *Miniredis) {

// Execute lua. Needs to run m.Lock()ed, from within withTx().
// Returns true if the lua was OK (and hence should be cached).
func (m *Miniredis) runLuaScript(c *server.Peer, script string, args []string) bool {
func (m *Miniredis) runLuaScript(c *server.Peer, sha, script string, args []string) bool {
l := lua.NewState(lua.Options{SkipOpenLibs: true})
defer l.Close()

Expand Down Expand Up @@ -80,7 +80,7 @@ func (m *Miniredis) runLuaScript(c *server.Peer, script string, args []string) b
}
l.SetGlobal("ARGV", argvTable)

redisFuncs, redisConstants := mkLua(m.srv, c)
redisFuncs, redisConstants := mkLua(m.srv, c, sha)
// Register command handlers
l.Push(l.NewFunction(func(l *lua.LState) int {
mod := l.RegisterModule("redis", redisFuncs).(*lua.LTable)
Expand Down Expand Up @@ -117,18 +117,18 @@ func (m *Miniredis) cmdEval(c *server.Peer, cmd string, args []string) {
if m.checkPubsub(c, cmd) {
return
}

if getCtx(c).nested {
c.WriteError(msgNotFromScripts)
ctx := getCtx(c)
if ctx.nested {
c.WriteError(msgNotFromScripts(ctx.nestedSHA))
return
}

script, args := args[0], args[1:]

withTx(m, c, func(c *server.Peer, ctx *connCtx) {
ok := m.runLuaScript(c, script, args)
sha := sha1Hex(script)
ok := m.runLuaScript(c, sha, script, args)
if ok {
sha := sha1Hex(script)
m.scripts[sha] = script
}
})
Expand All @@ -146,8 +146,9 @@ func (m *Miniredis) cmdEvalsha(c *server.Peer, cmd string, args []string) {
if m.checkPubsub(c, cmd) {
return
}
if getCtx(c).nested {
c.WriteError(msgNotFromScripts)
ctx := getCtx(c)
if ctx.nested {
c.WriteError(msgNotFromScripts(ctx.nestedSHA))
return
}

Expand All @@ -160,7 +161,7 @@ func (m *Miniredis) cmdEvalsha(c *server.Peer, cmd string, args []string) {
return
}

m.runLuaScript(c, script, args)
m.runLuaScript(c, sha, script, args)
})
}

Expand All @@ -177,28 +178,62 @@ func (m *Miniredis) cmdScript(c *server.Peer, cmd string, args []string) {
return
}

if getCtx(c).nested {
c.WriteError(msgNotFromScripts)
ctx := getCtx(c)
if ctx.nested {
c.WriteError(msgNotFromScripts(ctx.nestedSHA))
return
}

subcmd, args := args[0], args[1:]
var opts struct {
subcmd string
script string
}

withTx(m, c, func(c *server.Peer, ctx *connCtx) {
switch strings.ToLower(subcmd) {
case "load":
if len(args) != 1 {
c.WriteError(fmt.Sprintf(msgFScriptUsage, "LOAD"))
return
opts.subcmd, args = args[0], args[1:]

switch strings.ToLower(opts.subcmd) {
case "load":
if len(args) != 1 {
setDirty(c)
c.WriteError(fmt.Sprintf(msgFScriptUsage, "LOAD"))
return
}
opts.script = args[0]
case "exists":
if len(args) == 0 {
setDirty(c)
c.WriteError(errWrongNumber("script|exists"))
return
}
case "flush":
if len(args) == 1 {
switch strings.ToUpper(args[0]) {
case "SYNC", "ASYNC":
args = args[1:]
default:
}
script := args[0]
}
if len(args) != 0 {
setDirty(c)
c.WriteError(msgScriptFlush)
return
}

if _, err := parse.Parse(strings.NewReader(script), "user_script"); err != nil {
default:
setDirty(c)
c.WriteError(fmt.Sprintf(msgFScriptUsageSimple, strings.ToUpper(opts.subcmd)))
return
}

withTx(m, c, func(c *server.Peer, ctx *connCtx) {
switch strings.ToLower(opts.subcmd) {
case "load":
if _, err := parse.Parse(strings.NewReader(opts.script), "user_script"); err != nil {
c.WriteError(errLuaParseError(err))
return
}
sha := sha1Hex(script)
m.scripts[sha] = script
sha := sha1Hex(opts.script)
m.scripts[sha] = opts.script
c.WriteBulk(sha)

case "exists":
Expand All @@ -212,23 +247,9 @@ func (m *Miniredis) cmdScript(c *server.Peer, cmd string, args []string) {
}

case "flush":
if len(args) == 1 {
switch strings.ToUpper(args[0]) {
case "SYNC", "ASYNC":
args = args[1:]
default:
}
}
if len(args) != 0 {
c.WriteError(msgScriptFlush)
return
}

m.scripts = map[string]string{}
c.WriteOK()

default:
c.WriteError(fmt.Sprintf(msgFScriptUsage, strings.ToUpper(subcmd)))
}
})
}
Expand Down
Loading

0 comments on commit 331f847

Please sign in to comment.