Skip to content

Commit b1f7350

Browse files
committed
[udp] make response message cache configurable
Modifies UDP conn implementation to allow for a custom response message cache to be provided. This change is implemented in such a way that API compatibility with previous v3 releases is preserved. Users may now provide an option to supply their own cache implementation. Structured messages, rather than their serialized representation, are now passed to the cache implementation to allow for caching decisions to be made in the cache implementation. For example, it may be desirable to skip caching blockwise message responses if the entire underlying data being transferred is also cached. The cache implementation is responsible for cloning messages or otherwise ensuring that it is not storing data that may subsequently be modified. Signed-off-by: Daniel Mangum <georgedanielmangum@gmail.com>
1 parent 91a04ea commit b1f7350

File tree

1 file changed

+86
-27
lines changed

1 file changed

+86
-27
lines changed

udp/client/conn.go

Lines changed: 86 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"math"
88
"net"
9+
"strconv"
910
"sync"
1011
"time"
1112

@@ -127,6 +128,62 @@ func (m *midElement) GetMessage(cc *Conn) (*pool.Message, bool, error) {
127128
return msg, true, nil
128129
}
129130

131+
// MessageCache is a cache of CoAP messages.
132+
type MessageCache interface {
133+
Load(key string, msg *pool.Message) (bool, error)
134+
Store(key string, msg *pool.Message) error
135+
CheckExpirations(time.Time)
136+
}
137+
138+
// messageCache is a CoAP message cache backed by an in-memory cache.
139+
type messageCache struct {
140+
c *cache.Cache[string, []byte]
141+
}
142+
143+
// newMessageCache constructs a new CoAP message cache.
144+
func newMessageCache() *messageCache {
145+
return &messageCache{
146+
c: cache.NewCache[string, []byte](),
147+
}
148+
}
149+
150+
// Load loads a message from the cache if one exists with key.
151+
func (m *messageCache) Load(key string, msg *pool.Message) (bool, error) {
152+
cachedResp := m.c.Load(key)
153+
if cachedResp == nil {
154+
return false, nil
155+
}
156+
if rawMsg := cachedResp.Data(); len(rawMsg) > 0 {
157+
_, err := msg.UnmarshalWithDecoder(coder.DefaultCoder, rawMsg)
158+
if err != nil {
159+
return false, err
160+
}
161+
return true, nil
162+
}
163+
return false, nil
164+
}
165+
166+
// Store stores a message in the cache.
167+
func (m *messageCache) Store(key string, msg *pool.Message) error {
168+
if _, err := msg.GetOptionUint32(message.Block2); err == nil {
169+
// Skip caching blockwise response.
170+
return nil
171+
}
172+
marshaledResp, err := msg.MarshalWithEncoder(coder.DefaultCoder)
173+
if err != nil {
174+
return err
175+
}
176+
cacheMsg := make([]byte, len(marshaledResp))
177+
copy(cacheMsg, marshaledResp)
178+
m.c.LoadOrStore(key, cache.NewElement(cacheMsg, time.Now().Add(ExchangeLifetime), nil))
179+
return nil
180+
}
181+
182+
// CheckExpirations checks the cache for any expirations.
183+
func (m *messageCache) CheckExpirations(now time.Time) {
184+
m.c.CheckExpirations(now)
185+
}
186+
130187
// Conn represents a virtual connection to a conceptual endpoint, to perform COAPs commands.
131188
type Conn struct {
132189
// This field needs to be the first in the struct to ensure proper word alignment on 32-bit platforms.
@@ -145,7 +202,7 @@ type Conn struct {
145202

146203
processReceivedMessage config.ProcessReceivedMessageFunc[*Conn]
147204
errors ErrorFunc
148-
responseMsgCache *cache.Cache[string, []byte]
205+
responseMsgCache MessageCache
149206
msgIDMutex *MutexMap
150207

151208
tokenHandlerContainer *coapSync.Map[uint64, HandlerFunc]
@@ -192,6 +249,7 @@ type ConnOptions struct {
192249
createBlockWise func(cc *Conn) *blockwise.BlockWise[*Conn]
193250
inactivityMonitor InactivityMonitor
194251
requestMonitor RequestMonitorFunc
252+
responseMsgCache MessageCache
195253
}
196254

197255
type Option = func(opts *ConnOptions)
@@ -220,6 +278,23 @@ func WithRequestMonitor(requestMonitor RequestMonitorFunc) Option {
220278
}
221279
}
222280

281+
// WithResponseMessageCache sets the cache used for response messages. All
282+
// response messages are submitted to the cache, but it is up to the cache
283+
// implementation to determine which messages are stored and for how long.
284+
// Caching responses enables sending the same Acknowledgment for retransmitted
285+
// confirmable messages within an EXCHANGE_LIFETIME. It may be desirable to
286+
// relax this behavior in some scenarios.
287+
// https://datatracker.ietf.org/doc/html/rfc7252#section-4.5
288+
// The default response message cache stores all responses with an expiration of
289+
// 247 seconds, which is EXCHANGE_LIFETIME when using default CoAP transmission
290+
// parameters.
291+
// https://datatracker.ietf.org/doc/html/rfc7252#section-4.8.2
292+
func WithResponseMessageCache(cache MessageCache) Option {
293+
return func(opts *ConnOptions) {
294+
opts.responseMsgCache = cache
295+
}
296+
}
297+
223298
func NewConnWithOpts(session Session, cfg *Config, opts ...Option) *Conn {
224299
if cfg.Errors == nil {
225300
cfg.Errors = func(error) {
@@ -248,6 +323,10 @@ func NewConnWithOpts(session Session, cfg *Config, opts ...Option) *Conn {
248323
for _, o := range opts {
249324
o(&cfgOpts)
250325
}
326+
// Only construct cache if one was not set via options.
327+
if cfgOpts.responseMsgCache == nil {
328+
cfgOpts.responseMsgCache = newMessageCache()
329+
}
251330
cc := Conn{
252331
session: session,
253332
transmission: &Transmission{
@@ -262,7 +341,7 @@ func NewConnWithOpts(session Session, cfg *Config, opts ...Option) *Conn {
262341
processReceivedMessage: cfg.ProcessReceivedMessage,
263342
errors: cfg.Errors,
264343
msgIDMutex: NewMutexMap(),
265-
responseMsgCache: cache.NewCache[string, []byte](),
344+
responseMsgCache: cfgOpts.responseMsgCache,
266345
inactivityMonitor: cfgOpts.inactivityMonitor,
267346
requestMonitor: cfgOpts.requestMonitor,
268347
messagePool: cfg.MessagePool,
@@ -609,34 +688,14 @@ func (cc *Conn) Sequence() uint64 {
609688
return cc.sequence.Add(1)
610689
}
611690

612-
func (cc *Conn) responseMsgCacheID(msgID int32) string {
613-
return fmt.Sprintf("resp-%v-%d", cc.RemoteAddr(), msgID)
691+
// getResponseFromCache gets a message from the response message cache.
692+
func (cc *Conn) getResponseFromCache(mid int32, resp *pool.Message) (bool, error) {
693+
return cc.responseMsgCache.Load(strconv.Itoa(int(mid)), resp)
614694
}
615695

696+
// addResponseToCache adds a message to the response message cache.
616697
func (cc *Conn) addResponseToCache(resp *pool.Message) error {
617-
marshaledResp, err := resp.MarshalWithEncoder(coder.DefaultCoder)
618-
if err != nil {
619-
return err
620-
}
621-
cacheMsg := make([]byte, len(marshaledResp))
622-
copy(cacheMsg, marshaledResp)
623-
cc.responseMsgCache.LoadOrStore(cc.responseMsgCacheID(resp.MessageID()), cache.NewElement(cacheMsg, time.Now().Add(ExchangeLifetime), nil))
624-
return nil
625-
}
626-
627-
func (cc *Conn) getResponseFromCache(mid int32, resp *pool.Message) (bool, error) {
628-
cachedResp := cc.responseMsgCache.Load(cc.responseMsgCacheID(mid))
629-
if cachedResp == nil {
630-
return false, nil
631-
}
632-
if rawMsg := cachedResp.Data(); len(rawMsg) > 0 {
633-
_, err := resp.UnmarshalWithDecoder(coder.DefaultCoder, rawMsg)
634-
if err != nil {
635-
return false, err
636-
}
637-
return true, nil
638-
}
639-
return false, nil
698+
return cc.responseMsgCache.Store(strconv.Itoa(int(resp.MessageID())), resp)
640699
}
641700

642701
// checkMyMessageID compare client msgID against peer messageID and if it is near < 0xffff/4 then increase msgID.

0 commit comments

Comments
 (0)