Skip to content

Commit bf3023c

Browse files
committed
CBG-1196 make message.flags an atomic
1 parent 13a798c commit bf3023c

File tree

5 files changed

+41
-29
lines changed

5 files changed

+41
-29
lines changed

example/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/couchbase/go-blip/example
22

3-
go 1.18
3+
go 1.23
44

55
require (
66
github.com/couchbase/go-blip v0.0.0-00010101000000-000000000000

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/couchbase/go-blip
22

3-
go 1.17
3+
go 1.23
44

55
require (
66
github.com/klauspost/compress v1.15.11

message.go

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,19 @@ import (
1919
"log"
2020
"runtime/debug"
2121
"sync"
22+
"sync/atomic"
2223
)
2324

2425
type MessageNumber uint32
2526

2627
// A BLIP message. It could be a request or response or error, and it could be from me or the peer.
2728
type Message struct {
28-
Outgoing bool // Is this a message created locally?
29-
Sender *Sender // The connection that sent this message.
30-
Properties Properties // The message's metadata, similar to HTTP headers.
31-
body []byte // The message body. MIME type is defined by "Content-Type" property
32-
number MessageNumber // The sequence number of the message in the connection.
33-
flags frameFlags // Message flags as seen on the first frame.
29+
Outgoing bool // Is this a message created locally?
30+
Sender *Sender // The connection that sent this message.
31+
Properties Properties // The message's metadata, similar to HTTP headers.
32+
body []byte // The message body. MIME type is defined by "Content-Type" property
33+
number MessageNumber // The sequence number of the message in the connection.
34+
flags atomic.Pointer[frameFlags] // Message flags as seen on the first frame.
3435
bytesSent uint64
3536
bytesAcked uint64
3637

@@ -56,7 +57,7 @@ func (message *Message) Close() (err error) {
5657

5758
// Returns a string describing the message for debugging purposes
5859
func (message *Message) String() string {
59-
return frameString(message.number, message.flags)
60+
return frameString(message.number, *message.flags.Load())
6061
}
6162

6263
func frameString(number MessageNumber, flags frameFlags) string {
@@ -72,12 +73,13 @@ func frameString(number MessageNumber, flags frameFlags) string {
7273

7374
// Creates a new outgoing request.
7475
func NewRequest() *Message {
75-
return &Message{
76-
flags: frameFlags(RequestType),
76+
m := &Message{
7777
Outgoing: true,
7878
Properties: Properties{},
7979
cond: sync.NewCond(&sync.Mutex{}),
8080
}
81+
m.flags.Store(ptr(frameFlags(RequestType)))
82+
return m
8183
}
8284

8385
// The order in which a request message was sent.
@@ -90,17 +92,17 @@ func (message *Message) SerialNumber() MessageNumber {
9092
}
9193

9294
// The type of message: request, response or error
93-
func (message *Message) Type() MessageType { return MessageType(message.flags.messageType()) }
95+
func (message *Message) Type() MessageType { return MessageType(message.flags.Load().messageType()) }
9496

9597
// True if the message has Urgent priority.
96-
func (message *Message) Urgent() bool { return message.flags&kUrgent != 0 }
98+
func (message *Message) Urgent() bool { return *message.flags.Load()&kUrgent != 0 }
9799

98100
// True if the message doesn't want a reply.
99-
func (message *Message) NoReply() bool { return message.flags&kNoReply != 0 }
101+
func (message *Message) NoReply() bool { return *message.flags.Load()&kNoReply != 0 }
100102

101103
// True if the message's body was GZIP-compressed in transit.
102104
// (This is for informative purposes only; you don't need to unzip it yourself!)
103-
func (message *Message) Compressed() bool { return message.flags&kCompressed != 0 }
105+
func (message *Message) Compressed() bool { return *message.flags.Load()&kCompressed != 0 }
104106

105107
// Marks an outgoing message as having high priority. Urgent messages get a higher amount of
106108
// bandwidth. This is useful for streaming media.
@@ -125,11 +127,13 @@ func (request *Message) SetNoReply(noReply bool) {
125127

126128
func (message *Message) setFlag(flag frameFlags, value bool) {
127129
message.assertMutable()
130+
flags := *message.flags.Load()
128131
if value {
129-
message.flags |= flag
132+
flags |= flag
130133
} else {
131-
message.flags &^= flag
134+
flags &^= flag
132135
}
136+
message.flags.Store(&flags)
133137
}
134138

135139
func (message *Message) assertMutable() {
@@ -228,7 +232,7 @@ func (m *Message) SetJSONBodyAsBytes(jsonBytes []byte) {
228232
// Multiple calls return the same object.
229233
// If called on a NoReply request, this returns nil.
230234
func (request *Message) Response() *Message {
231-
if request.flags&kNoReply != 0 {
235+
if *request.flags.Load()&kNoReply != 0 {
232236
return nil
233237
}
234238
if request.Type() != RequestType {
@@ -257,7 +261,8 @@ func (request *Message) Response() *Message {
257261
return request.response
258262
}
259263
response := request.createResponse()
260-
response.flags |= request.flags & kUrgent
264+
newFlags := *response.flags.Load() | *request.flags.Load()&kUrgent
265+
response.flags.Store(&newFlags)
261266
response.Properties = Properties{}
262267
request.response = response
263268
return response
@@ -271,7 +276,8 @@ func (response *Message) SetError(errDomain string, errCode int, message string)
271276
if response.Type() == RequestType {
272277
panic("Can't call SetError on a request")
273278
}
274-
response.flags = (response.flags &^ kTypeMask) | frameFlags(ErrorType)
279+
newFlags := *response.flags.Load()&^kTypeMask | frameFlags(ErrorType)
280+
response.flags.Store(&newFlags)
275281
response.Properties = Properties{
276282
"Error-Domain": errDomain,
277283
"Error-Code": fmt.Sprintf("%d", errCode),
@@ -285,13 +291,14 @@ func (response *Message) SetError(errDomain string, errCode int, message string)
285291
//////// INTERNALS:
286292

287293
func newIncomingMessage(sender *Sender, number MessageNumber, flags frameFlags, reader io.ReadCloser) *Message {
288-
return &Message{
294+
m := &Message{
289295
Sender: sender,
290-
flags: flags | kMoreComing,
291296
number: number,
292297
reader: reader,
293298
cond: sync.NewCond(&sync.Mutex{}),
294299
}
300+
m.flags.Store(ptr(flags | kMoreComing))
301+
return m
295302
}
296303

297304
// Creates an incoming message given properties and body; exposed only for testing.
@@ -309,16 +316,17 @@ func NewParsedIncomingMessage(sender *Sender, msgType MessageType, properties Pr
309316
}
310317

311318
func (request *Message) createResponse() *Message {
319+
flags := frameFlags(ResponseType) | (*request.flags.Load() & kUrgent)
312320
response := &Message{
313-
flags: frameFlags(ResponseType) | (request.flags & kUrgent),
314321
number: request.number,
315322
Outgoing: !request.Outgoing,
316323
inResponseTo: request,
317324
cond: sync.NewCond(&sync.Mutex{}),
318325
}
319326
if !response.Outgoing {
320-
response.flags |= kMoreComing
327+
flags |= kMoreComing
321328
}
329+
response.flags.Store(&flags)
322330
return response
323331
}
324332

@@ -409,7 +417,7 @@ func (m *Message) nextFrameToSend(maxSize int) ([]byte, frameFlags) {
409417
}
410418

411419
frame := make([]byte, maxSize)
412-
flags := m.flags
420+
flags := *m.flags.Load()
413421
size, err := io.ReadFull(m.encoder, frame)
414422
if err == nil {
415423
flags |= kMoreComing
@@ -421,3 +429,7 @@ func (m *Message) nextFrameToSend(maxSize int) ([]byte, frameFlags) {
421429

422430
// A callback function that takes a message and returns nothing
423431
type MessageCallback func(*Message)
432+
433+
func ptr[T any](v T) *T {
434+
return &v
435+
}

message_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func TestMessageEncoding(t *testing.T) {
4040
assert.Equal(t, "\x25Content-Type\x00ham/rye\x00X-Weather\x00rainy\x00The white knight is sliding down the poker. He balances very badly.", string(serialized))
4141
t.Logf("Encoded as %d bytes", len(serialized))
4242

43-
m2 := newIncomingMessage(nil, 1, m.flags, nil)
43+
m2 := newIncomingMessage(nil, 1, *m.flags.Load(), nil)
4444
reader := bytes.NewReader(serialized)
4545
err = m2.ReadFrom(reader)
4646
assert.Equal(t, nil, err)
@@ -63,7 +63,7 @@ func TestMessageEncodingCompressed(t *testing.T) {
6363
// goassert.Equals(t, string(serialized), "\x1a\x04\x00ham/rye\x00X-Weather\x00rainy\x00\x1f\x8b\b\x00\x00\tn\x88\x00\xff\f\xca\xd1\t\xc5 \f\x05\xd0U\xee\x04\xce\xf1\x06x\v\xd8z\xd1`\x88ń\x8a\xdb\xd7\xcf\x03\xe7߈\xd5$\x88nR[@\x1c\xaeR\xc4*\xcaX\x868\xe1\x19\x9d3\xe1G\\Y\xb3\xddt\xbc\x9c\xfb\xa8\xe8N_\x00\x00\x00\xff\xffs*\xa1\xa6C\x00\x00\x00")
6464
// log.Printf("Encoded compressed as %d bytes", len(serialized))
6565

66-
m2 := newIncomingMessage(nil, 1, m.flags, nil)
66+
m2 := newIncomingMessage(nil, 1, *m.flags.Load(), nil)
6767
reader := bytes.NewReader(serialized)
6868
err = m2.ReadFrom(reader)
6969
assert.Equal(t, nil, err)
@@ -103,7 +103,7 @@ func TestMessageDecoding(t *testing.T) {
103103
writer.Close()
104104
}()
105105

106-
incoming := newIncomingMessage(nil, original.number, original.flags, reader)
106+
incoming := newIncomingMessage(nil, original.number, *original.flags.Load(), reader)
107107
err := incoming.readProperties()
108108
assert.Equal(t, nil, err)
109109
assert.Equal(t, original.Properties, incoming.Properties)

receiver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,7 @@ func (r *receiver) getPendingResponse(requestNumber MessageNumber, flags frameFl
291291
msgStream = r.pendingResponses[requestNumber]
292292
if msgStream != nil {
293293
if msgStream.bytesWritten == 0 {
294-
msgStream.message.flags = flags // set flags based on 1st frame of response
294+
msgStream.message.flags.Store(&flags) // set flags based on 1st frame of response
295295
}
296296
if complete {
297297
delete(r.pendingResponses, requestNumber)

0 commit comments

Comments
 (0)