Skip to content

Commit b80b7a8

Browse files
committed
[udp] make response message cache configurable
Modifies UDP conn implementation to allow for a custom response message cache to be provided. This change is implemented in such a way that API compatibility with previous v3 releases is preserved. Users may now provide an option to supply their own cache implementation. Structured messages, rather than their serialized representation, are now passed to the cache implementation to allow for caching decisions to be made in the cache implementation. For example, it may be desirable to skip caching blockwise message responses if the entire underlying data being transferred is also cached. The cache implementation is responsible for cloning messages or otherwise ensuring that it is not storing data that may subsequently be modified. Signed-off-by: Daniel Mangum <georgedanielmangum@gmail.com>
1 parent 91a04ea commit b80b7a8

File tree

1 file changed

+82
-27
lines changed

1 file changed

+82
-27
lines changed

udp/client/conn.go

Lines changed: 82 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"math"
88
"net"
9+
"strconv"
910
"sync"
1011
"time"
1112

@@ -127,6 +128,58 @@ func (m *midElement) GetMessage(cc *Conn) (*pool.Message, bool, error) {
127128
return msg, true, nil
128129
}
129130

131+
// MessageCache is a cache of CoAP messages.
132+
type MessageCache interface {
133+
Load(key string, msg *pool.Message) (bool, error)
134+
Store(key string, msg *pool.Message) error
135+
CheckExpirations(time.Time)
136+
}
137+
138+
// messageCache is a CoAP message cache backed by an in-memory cache.
139+
type messageCache struct {
140+
c *cache.Cache[string, []byte]
141+
}
142+
143+
// newMessageCache constructs a new CoAP message cache.
144+
func newMessageCache() *messageCache {
145+
return &messageCache{
146+
c: cache.NewCache[string, []byte](),
147+
}
148+
}
149+
150+
// Load loads a message from the cache if one exists with key.
151+
func (m *messageCache) Load(key string, msg *pool.Message) (bool, error) {
152+
cachedResp := m.c.Load(key)
153+
if cachedResp == nil {
154+
return false, nil
155+
}
156+
if rawMsg := cachedResp.Data(); len(rawMsg) > 0 {
157+
_, err := msg.UnmarshalWithDecoder(coder.DefaultCoder, rawMsg)
158+
if err != nil {
159+
return false, err
160+
}
161+
return true, nil
162+
}
163+
return false, nil
164+
}
165+
166+
// Store stores a message in the cache.
167+
func (m *messageCache) Store(key string, msg *pool.Message) error {
168+
marshaledResp, err := msg.MarshalWithEncoder(coder.DefaultCoder)
169+
if err != nil {
170+
return err
171+
}
172+
cacheMsg := make([]byte, len(marshaledResp))
173+
copy(cacheMsg, marshaledResp)
174+
m.c.LoadOrStore(key, cache.NewElement(cacheMsg, time.Now().Add(ExchangeLifetime), nil))
175+
return nil
176+
}
177+
178+
// CheckExpirations checks the cache for any expirations.
179+
func (m *messageCache) CheckExpirations(now time.Time) {
180+
m.c.CheckExpirations(now)
181+
}
182+
130183
// Conn represents a virtual connection to a conceptual endpoint, to perform COAPs commands.
131184
type Conn struct {
132185
// This field needs to be the first in the struct to ensure proper word alignment on 32-bit platforms.
@@ -145,7 +198,7 @@ type Conn struct {
145198

146199
processReceivedMessage config.ProcessReceivedMessageFunc[*Conn]
147200
errors ErrorFunc
148-
responseMsgCache *cache.Cache[string, []byte]
201+
responseMsgCache MessageCache
149202
msgIDMutex *MutexMap
150203

151204
tokenHandlerContainer *coapSync.Map[uint64, HandlerFunc]
@@ -192,6 +245,7 @@ type ConnOptions struct {
192245
createBlockWise func(cc *Conn) *blockwise.BlockWise[*Conn]
193246
inactivityMonitor InactivityMonitor
194247
requestMonitor RequestMonitorFunc
248+
responseMsgCache MessageCache
195249
}
196250

197251
type Option = func(opts *ConnOptions)
@@ -220,6 +274,23 @@ func WithRequestMonitor(requestMonitor RequestMonitorFunc) Option {
220274
}
221275
}
222276

277+
// WithResponseMessageCache sets the cache used for response messages. All
278+
// response messages are submitted to the cache, but it is up to the cache
279+
// implementation to determine which messages are stored and for how long.
280+
// Caching responses enables sending the same Acknowledgment for retransmitted
281+
// confirmable messages within an EXCHANGE_LIFETIME. It may be desirable to
282+
// relax this behavior in some scenarios.
283+
// https://datatracker.ietf.org/doc/html/rfc7252#section-4.5
284+
// The default response message cache stores all responses with an expiration of
285+
// 247 seconds, which is EXCHANGE_LIFETIME when using default CoAP transmission
286+
// parameters.
287+
// https://datatracker.ietf.org/doc/html/rfc7252#section-4.8.2
288+
func WithResponseMessageCache(cache MessageCache) Option {
289+
return func(opts *ConnOptions) {
290+
opts.responseMsgCache = cache
291+
}
292+
}
293+
223294
func NewConnWithOpts(session Session, cfg *Config, opts ...Option) *Conn {
224295
if cfg.Errors == nil {
225296
cfg.Errors = func(error) {
@@ -248,6 +319,10 @@ func NewConnWithOpts(session Session, cfg *Config, opts ...Option) *Conn {
248319
for _, o := range opts {
249320
o(&cfgOpts)
250321
}
322+
// Only construct cache if one was not set via options.
323+
if cfgOpts.responseMsgCache == nil {
324+
cfgOpts.responseMsgCache = newMessageCache()
325+
}
251326
cc := Conn{
252327
session: session,
253328
transmission: &Transmission{
@@ -262,7 +337,7 @@ func NewConnWithOpts(session Session, cfg *Config, opts ...Option) *Conn {
262337
processReceivedMessage: cfg.ProcessReceivedMessage,
263338
errors: cfg.Errors,
264339
msgIDMutex: NewMutexMap(),
265-
responseMsgCache: cache.NewCache[string, []byte](),
340+
responseMsgCache: cfgOpts.responseMsgCache,
266341
inactivityMonitor: cfgOpts.inactivityMonitor,
267342
requestMonitor: cfgOpts.requestMonitor,
268343
messagePool: cfg.MessagePool,
@@ -609,34 +684,14 @@ func (cc *Conn) Sequence() uint64 {
609684
return cc.sequence.Add(1)
610685
}
611686

612-
func (cc *Conn) responseMsgCacheID(msgID int32) string {
613-
return fmt.Sprintf("resp-%v-%d", cc.RemoteAddr(), msgID)
687+
// getResponseFromCache gets a message from the response message cache.
688+
func (cc *Conn) getResponseFromCache(mid int32, resp *pool.Message) (bool, error) {
689+
return cc.responseMsgCache.Load(strconv.Itoa(int(mid)), resp)
614690
}
615691

692+
// addResponseToCache adds a message to the response message cache.
616693
func (cc *Conn) addResponseToCache(resp *pool.Message) error {
617-
marshaledResp, err := resp.MarshalWithEncoder(coder.DefaultCoder)
618-
if err != nil {
619-
return err
620-
}
621-
cacheMsg := make([]byte, len(marshaledResp))
622-
copy(cacheMsg, marshaledResp)
623-
cc.responseMsgCache.LoadOrStore(cc.responseMsgCacheID(resp.MessageID()), cache.NewElement(cacheMsg, time.Now().Add(ExchangeLifetime), nil))
624-
return nil
625-
}
626-
627-
func (cc *Conn) getResponseFromCache(mid int32, resp *pool.Message) (bool, error) {
628-
cachedResp := cc.responseMsgCache.Load(cc.responseMsgCacheID(mid))
629-
if cachedResp == nil {
630-
return false, nil
631-
}
632-
if rawMsg := cachedResp.Data(); len(rawMsg) > 0 {
633-
_, err := resp.UnmarshalWithDecoder(coder.DefaultCoder, rawMsg)
634-
if err != nil {
635-
return false, err
636-
}
637-
return true, nil
638-
}
639-
return false, nil
694+
return cc.responseMsgCache.Store(strconv.Itoa(int(resp.MessageID())), resp)
640695
}
641696

642697
// checkMyMessageID compare client msgID against peer messageID and if it is near < 0xffff/4 then increase msgID.

0 commit comments

Comments
 (0)