diff --git a/examples/chatroom-go/Makefile b/examples/chatroom-go/Makefile deleted file mode 100644 index 2acb89d39..000000000 --- a/examples/chatroom-go/Makefile +++ /dev/null @@ -1,2 +0,0 @@ -run: - go run main.go diff --git a/examples/chatroom-go/README.md b/examples/chatroom-go/README.md index 29c2b79db..20cf3dde9 100644 --- a/examples/chatroom-go/README.md +++ b/examples/chatroom-go/README.md @@ -1,3 +1,6 @@ -## Potential Risks +Chatroom +=== -1. may have update when multiple updates happen at same time (granualrity)? +```sh +$ go run main.go +``` diff --git a/examples/chatroom-go/db/main.go b/examples/chatroom-go/db/main.go index 7fb8d6324..fecc61078 100644 --- a/examples/chatroom-go/db/main.go +++ b/examples/chatroom-go/db/main.go @@ -11,7 +11,7 @@ var ( func init() { var err error - Client, err = dicedb.NewClient("localhost", 7379, dicedb.WithWatch()) + Client, err = dicedb.NewClient("localhost", 7379) if err != nil { panic(err) } diff --git a/examples/chatroom-go/go.mod b/examples/chatroom-go/go.mod index 590400755..64ef12ffd 100644 --- a/examples/chatroom-go/go.mod +++ b/examples/chatroom-go/go.mod @@ -6,6 +6,7 @@ require ( github.com/charmbracelet/bubbles v0.20.0 github.com/charmbracelet/bubbletea v1.2.4 github.com/charmbracelet/lipgloss v1.0.0 + github.com/dicedb/dicedb-go v1.0.1 ) require ( @@ -13,8 +14,8 @@ require ( github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect github.com/charmbracelet/x/ansi v0.4.5 // indirect github.com/charmbracelet/x/term v0.2.1 // indirect - github.com/dicedb/dicedb-go v1.0.1 // indirect github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f // indirect + github.com/google/uuid v1.6.0 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-localereader v0.0.1 // indirect diff --git a/examples/chatroom-go/go.sum b/examples/chatroom-go/go.sum index d53310c87..d7f45b237 100644 --- a/examples/chatroom-go/go.sum +++ b/examples/chatroom-go/go.sum @@ -16,6 +16,10 @@ github.com/charmbracelet/x/term v0.2.1 h1:AQeHeLZ1OqSXhrAWpYUtZyX1T3zVxfpZuEQMIQ github.com/charmbracelet/x/term v0.2.1/go.mod h1:oQ4enTYFV7QN4m0i9mzHrViD7TQKvNEEkHUMCmsxdUg= github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f h1:Y/CXytFA4m6baUTXGLOoWe4PQhGxaX0KpnayAqC48p4= github.com/erikgeiser/coninput v0.0.0-20211004153227-1c3628e74d0f/go.mod h1:vw97MGsxSvLiUE2X8qFplwetxpGLQrlU1Q9AUEIzCaM= +github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= @@ -41,5 +45,7 @@ golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.36.4 h1:6A3ZDJHn/eNqc1i+IdefRzy/9PokBTPvcqMySR7NNIM= google.golang.org/protobuf v1.36.4/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= diff --git a/examples/chatroom-go/main.go b/examples/chatroom-go/main.go index 0f4dd4e2d..6feef5151 100644 --- a/examples/chatroom-go/main.go +++ b/examples/chatroom-go/main.go @@ -3,9 +3,6 @@ package main -// A simple program demonstrating the text area component from the Bubbles -// component library. - import ( "chatroom-go/db" "fmt" @@ -33,7 +30,11 @@ func init() { } func loop() { - for resp := range db.Client.WatchCh() { + ch, err := db.Client.WatchCh() + if err != nil { + panic(err) + } + for resp := range ch { fmt.Println(resp) } } diff --git a/internal/cmd/cmd_client_id.go b/internal/cmd/cmd_client_id.go deleted file mode 100644 index 3236a19f4..000000000 --- a/internal/cmd/cmd_client_id.go +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright (c) 2022-present, DiceDB contributors -// All rights reserved. Licensed under the BSD 3-Clause License. See LICENSE file in the project root for full license information. - -package cmd - -import ( - dstore "github.com/dicedb/dice/internal/store" - "github.com/dicedb/dicedb-go/wire" -) - -var cCLIENTID = &DiceDBCommand{ - Name: "CLIENT.ID", - HelpShort: "CLIENT.ID gets and sets the client ID", - Eval: evalCLIENTID, -} - -func init() { - commandRegistry.AddCommand(cCLIENTID) -} - -func evalCLIENTID(c *Cmd, s *dstore.Store) (*CmdRes, error) { - if len(c.C.Args) == 0 { - return &CmdRes{R: &wire.Response{ - Value: &wire.Response_VStr{VStr: c.ClientID}, - }}, nil - } - c.ClientID = c.C.Args[0] - return cmdResOK, nil -} diff --git a/internal/cmd/cmd_handshake.go b/internal/cmd/cmd_handshake.go new file mode 100644 index 000000000..f85c16757 --- /dev/null +++ b/internal/cmd/cmd_handshake.go @@ -0,0 +1,27 @@ +// Copyright (c) 2022-present, DiceDB contributors +// All rights reserved. Licensed under the BSD 3-Clause License. See LICENSE file in the project root for full license information. + +package cmd + +import ( + dstore "github.com/dicedb/dice/internal/store" +) + +var cHANDSHAKE = &DiceDBCommand{ + Name: "HANDSHAKE", + HelpShort: "HANDSHAKE is used to handshake with the database; sends client_id and execution mode", + Eval: evalHANDSHAKE, +} + +func init() { + commandRegistry.AddCommand(cHANDSHAKE) +} + +func evalHANDSHAKE(c *Cmd, s *dstore.Store) (*CmdRes, error) { + if len(c.C.Args) != 2 { + return cmdResNil, errWrongArgumentCount("HANDSHAKE") + } + c.ClientID = c.C.Args[0] + c.Mode = c.C.Args[1] + return cmdResOK, nil +} diff --git a/internal/cmd/cmd_mode.go b/internal/cmd/cmd_mode.go new file mode 100644 index 000000000..9fcee1ca7 --- /dev/null +++ b/internal/cmd/cmd_mode.go @@ -0,0 +1,22 @@ +// Copyright (c) 2022-present, DiceDB contributors +// All rights reserved. Licensed under the BSD 3-Clause License. See LICENSE file in the project root for full license information. + +package cmd + +import ( + dstore "github.com/dicedb/dice/internal/store" +) + +var cMODE = &DiceDBCommand{ + Name: "MODE", + HelpShort: "MODE sets the mode of the client", + Eval: evalMODE, +} + +func init() { + commandRegistry.AddCommand(cMODE) +} + +func evalMODE(c *Cmd, s *dstore.Store) (*CmdRes, error) { + return cmdResOK, nil +} diff --git a/internal/cmd/cmds.go b/internal/cmd/cmds.go index 9abcebc01..7942b697d 100644 --- a/internal/cmd/cmds.go +++ b/internal/cmd/cmds.go @@ -23,6 +23,7 @@ type Cmd struct { C *wire.Command IsReplay bool ClientID string + Mode string } func (c *Cmd) String() string { @@ -83,6 +84,7 @@ func Execute(c *Cmd, s *store.Store) (*CmdRes, error) { slog.Debug("command executed", slog.Any("cmd", c.String()), slog.String("client_id", c.ClientID), + slog.String("mode", c.Mode), slog.Int("shard_id", s.ShardID), slog.Any("took_ns", time.Since(start).Nanoseconds())) return resp, err diff --git a/internal/server/ironhawk/iothread.go b/internal/server/ironhawk/iothread.go index 960263827..2b23991b7 100644 --- a/internal/server/ironhawk/iothread.go +++ b/internal/server/ironhawk/iothread.go @@ -14,6 +14,7 @@ import ( type IOThread struct { ClientID string + Mode string IoHandler *IOHandler Session *auth.Session } @@ -39,6 +40,7 @@ func (t *IOThread) StartSync(ctx context.Context, shardManager *ShardManager, wa _c := &cmd.Cmd{C: c} _c.ClientID = t.ClientID + _c.Mode = t.Mode res, err := shardManager.Execute(_c) if err != nil { @@ -49,6 +51,11 @@ func (t *IOThread) StartSync(ctx context.Context, shardManager *ShardManager, wa // Also, CLientID is duplicated in command and io-thread. t.ClientID = _c.ClientID + if c.Cmd == "HANDSHAKE" { + t.ClientID = _c.C.Args[0] + t.Mode = _c.C.Args[1] + } + if strings.HasSuffix(c.Cmd, ".WATCH") { watchManager.HandleWatch(_c, t) } diff --git a/internal/server/ironhawk/main.go b/internal/server/ironhawk/main.go index 546eec3f7..38c2f4cd6 100644 --- a/internal/server/ironhawk/main.go +++ b/internal/server/ironhawk/main.go @@ -158,9 +158,15 @@ func (s *Server) startIOThread(ctx context.Context, wg *sync.WaitGroup, thread * if err != nil { if err == io.EOF { s.watchManager.CleanupThreadWatchSubscriptions(thread) - slog.Debug("client disconnected. io-thread stopped", slog.String("client_id", thread.ClientID)) + slog.Debug("client disconnected. io-thread stopped", + slog.String("client_id", thread.ClientID), + slog.String("mode", thread.Mode), + ) } else { - slog.Debug("io-thread errored out", slog.String("client_id", thread.ClientID), slog.Any("error", err)) + slog.Debug("io-thread errored out", + slog.String("client_id", thread.ClientID), + slog.String("mode", thread.Mode), + slog.Any("error", err)) } } } diff --git a/internal/server/ironhawk/watch_manager.go b/internal/server/ironhawk/watch_manager.go index 567b66814..315a87789 100644 --- a/internal/server/ironhawk/watch_manager.go +++ b/internal/server/ironhawk/watch_manager.go @@ -12,37 +12,54 @@ import ( ) type WatchManager struct { + clientWatchThreadMap map[string]*IOThread + keyFPMap map[string]map[uint32]bool - fpThreadMap map[uint32]map[*IOThread]bool + fpClientMap map[uint32]map[string]bool fpCmdMap map[uint32]*cmd.Cmd } func NewWatchManager() *WatchManager { return &WatchManager{ + clientWatchThreadMap: map[string]*IOThread{}, + keyFPMap: map[string]map[uint32]bool{}, - fpThreadMap: map[uint32]map[*IOThread]bool{}, + fpClientMap: map[uint32]map[string]bool{}, fpCmdMap: map[uint32]*cmd.Cmd{}, } } func (w *WatchManager) HandleWatch(c *cmd.Cmd, t *IOThread) { - fp := c.GetFingerprint() - key := c.Key() + fp, key := c.GetFingerprint(), c.Key() slog.Debug("creating a new subscription", slog.String("key", key), slog.String("cmd", c.String()), slog.Any("fingerprint", fp)) + // For the key that will be watched through any .WATCH command + // Create an entry in the map that holds, key <--> [command fingerprint] as map if _, ok := w.keyFPMap[key]; !ok { w.keyFPMap[key] = make(map[uint32]bool) } w.keyFPMap[key][fp] = true - if _, ok := w.fpThreadMap[fp]; !ok { - w.fpThreadMap[fp] = make(map[*IOThread]bool) + // For the fingerprint + // Create an entry in the map that holds, fingerprint <--> [client id] as map + // This tells us which clients are subscribed to a particular fingerprint + if _, ok := w.fpClientMap[fp]; !ok { + w.fpClientMap[fp] = make(map[string]bool) } - w.fpThreadMap[fp][t] = true + w.fpClientMap[fp][t.ClientID] = true + + // Store the fingerprint <--> command mapping + // so that we understand what should we execute when the data changes w.fpCmdMap[fp] = c + + // If the thread is a WATCH thread, store it in the map + // so that we can notify the clients when the data changes + if t.Mode == "WATCH" { + w.clientWatchThreadMap[t.ClientID] = t + } } func (w *WatchManager) HandleUnwatch(c *cmd.Cmd, t *IOThread) { @@ -50,50 +67,56 @@ func (w *WatchManager) HandleUnwatch(c *cmd.Cmd, t *IOThread) { return } + // Parse the fingerprint from the command _fp, err := strconv.ParseUint(c.C.Args[0], 10, 32) if err != nil { return } fp := uint32(_fp) - delete(w.fpThreadMap[fp], t) - if len(w.fpThreadMap[fp]) == 0 { - delete(w.fpThreadMap, fp) - } + // Multiple clients can unsubscribe from the same fingerprint + // So, we need to delete the one that is unsubscribing + delete(w.fpClientMap[fp], t.ClientID) - for key, fpMap := range w.keyFPMap { - if _, ok := fpMap[fp]; ok { - delete(w.keyFPMap[key], fp) - } - if len(w.keyFPMap[key]) == 0 { - delete(w.keyFPMap, key) - } + // If a fingerprint has no clients subscribed to it, delete the fingerprint from the map. + if len(w.fpClientMap[fp]) == 0 { + delete(w.fpClientMap, fp) + + // If we have deleted the fingerprint, delete the command from the map + delete(w.fpCmdMap, fp) } - // TODO: Maintain ref count for gp -> cmd mapping - // delete it from delete(fpCmdMap, fp) only when ref count is 0 - // check if any easier way to do this + // Delete the mapping where we have the key <--> [command fingerprint] + // This seems to be an O(n) operation. + // Downside of keeping this entry laying around is that + // If key k changes, then we may be iterating through the fingerprint that does not have any active watcher + // Hence we should do a lazy deletion. + + // TODO: If the key gets deleted from the database + // delete the subscriptions against that key from all the places. } func (w *WatchManager) CleanupThreadWatchSubscriptions(t *IOThread) { - for fp, threadMap := range w.fpThreadMap { - if _, ok := threadMap[t]; ok { - delete(w.fpThreadMap[fp], t) - } - if len(w.fpThreadMap[fp]) == 0 { - delete(w.fpThreadMap, fp) + // Delete the mapping of Watch thread to client id + delete(w.clientWatchThreadMap, t.ClientID) + + // Delete all the subscriptions of the client from the fingerprint maps + // Note: this is an O(n) operation and hence if there are large number of clients, this might be expensive. + // We can do a lazy deletion of the fingerprint map if this becomes a problem. + for fp := range w.fpClientMap { + delete(w.fpClientMap[fp], t.ClientID) + if len(w.fpClientMap[fp]) == 0 { + delete(w.fpClientMap, fp) } } } func (w *WatchManager) NotifyWatchers(c *cmd.Cmd, shardManager *ShardManager, t *IOThread) { - // TODO: During first WATCH call, we are getting the response multiple times on the Client - // Check if this is happening because of the way we are notifying the watchers key := c.Key() for fp := range w.keyFPMap[key] { _c := w.fpCmdMap[fp] if _c == nil { - // TODO: We might want to remove the key from keyFPMap if we don't have a command for it. + // TODO: Not having a command for a fingerprint is a bug. continue } @@ -105,13 +128,23 @@ func (w *WatchManager) NotifyWatchers(c *cmd.Cmd, shardManager *ShardManager, t continue } - for thread := range w.fpThreadMap[fp] { + for clientID := range w.fpClientMap[fp] { + thread := w.clientWatchThreadMap[clientID] + if thread == nil { + // if there is no thread against the client, delete the client from the map + delete(w.clientWatchThreadMap, clientID) + continue + } + err := thread.IoHandler.WriteSync(context.Background(), r.R) if err != nil { - slog.Error("failed to write response to thread", slog.Any("client_id", thread.ClientID), slog.Any("error", err)) + slog.Error("failed to write response to thread", + slog.Any("client_id", thread.ClientID), + slog.String("mode", thread.Mode), + slog.Any("error", err)) } } - slog.Debug("notifying watchers for key", slog.String("key", key), slog.Int("watchers", len(w.fpThreadMap[fp]))) + slog.Debug("notifying watchers for key", slog.String("key", key), slog.Int("watchers", len(w.fpClientMap[fp]))) } }