Skip to content

Commit

Permalink
Create separate handles for encoding and decoding msgpack
Browse files Browse the repository at this point in the history
  • Loading branch information
Christopher Swenson committed Nov 21, 2023
1 parent d1c58bb commit d7c087e
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 16 deletions.
19 changes: 11 additions & 8 deletions client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ func NewRPCClient(addr string) (*RPCClient, error) {
return ClientFromConfig(&conf)
}

func (c *Config) newMsgpackHandle() *codec.MsgpackHandle {
return &codec.MsgpackHandle{
WriteExt: true,
BasicHandle: codec.BasicHandle{
TimeNotBuiltin: !c.MsgpackUseNewTimeFormat,
},
}
}

// ClientFromConfig is used to create a new RPC client given the
// configuration object. This will return a client, or an error if
// the connection could not be established.
Expand All @@ -150,14 +159,8 @@ func ClientFromConfig(c *Config) (*RPCClient, error) {
dispatch: make(map[uint64]seqHandler),
shutdownCh: make(chan struct{}),
}
handle := &codec.MsgpackHandle{
WriteExt: true,
BasicHandle: codec.BasicHandle{
TimeNotBuiltin: !c.MsgpackUseNewTimeFormat,
},
}
client.dec = codec.NewDecoder(client.reader, handle)
client.enc = codec.NewEncoder(client.writer, handle)
client.dec = codec.NewDecoder(client.reader, c.newMsgpackHandle())
client.enc = codec.NewEncoder(client.writer, c.newMsgpackHandle())
go client.listen()

// Do the initial handshake
Expand Down
19 changes: 11 additions & 8 deletions cmd/serf/command/agent/ipc.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,15 @@ func (i *AgentIPC) isStopped() bool {
return atomic.LoadUint32(&i.stop) == 1
}

func (i *AgentIPC) newMsgpackHandle() *codec.MsgpackHandle {
return &codec.MsgpackHandle{
WriteExt: true,
BasicHandle: codec.BasicHandle{
TimeNotBuiltin: !i.msgpackUseNewTimeFormat,
},
}
}

// listen is a long running routine that listens for new clients
func (i *AgentIPC) listen() {
for {
Expand All @@ -395,14 +404,8 @@ func (i *AgentIPC) listen() {
eventStreams: make(map[uint64]*eventStream),
pendingQueries: make(map[uint64]*serf.Query),
}
handle := &codec.MsgpackHandle{
WriteExt: true,
BasicHandle: codec.BasicHandle{
TimeNotBuiltin: !i.msgpackUseNewTimeFormat,
},
}
client.dec = codec.NewDecoder(client.reader, handle)
client.enc = codec.NewEncoder(client.writer, handle)
client.dec = codec.NewDecoder(client.reader, i.newMsgpackHandle())
client.enc = codec.NewEncoder(client.writer, i.newMsgpackHandle())

// Register the client
i.Lock()
Expand Down

0 comments on commit d7c087e

Please sign in to comment.