Skip to content

Commit 58b63c3

Browse files
committed
Bump go-amqp to v0.17.0 so this lib can be compatible with eventhub lib
1 parent a14e023 commit 58b63c3

File tree

6 files changed

+50
-32
lines changed

6 files changed

+50
-32
lines changed

batch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (mb *MessageBatch) Add(m *Message) (bool, error) {
118118
}
119119

120120
if mb.SessionID != nil {
121-
msg.Properties.GroupID = *mb.SessionID
121+
msg.Properties.GroupID = mb.SessionID
122122
}
123123

124124
bin, err := msg.MarshalBinary()

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ module github.com/Azure/azure-service-bus-go
44
go 1.12
55

66
require (
7-
github.com/Azure/azure-amqp-common-go/v3 v3.2.1
7+
github.com/Azure/azure-amqp-common-go/v3 v3.2.3
88
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible
9-
github.com/Azure/go-amqp v0.16.4
9+
github.com/Azure/go-amqp v0.17.0
1010
github.com/Azure/go-autorest/autorest v0.11.18
1111
github.com/Azure/go-autorest/autorest/adal v0.9.13
1212
github.com/Azure/go-autorest/autorest/date v0.3.0

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
github.com/Azure/azure-amqp-common-go/v3 v3.2.1 h1:uQyDk81yn5hTP1pW4Za+zHzy97/f4vDz9o1d/exI4j4=
22
github.com/Azure/azure-amqp-common-go/v3 v3.2.1/go.mod h1:O6X1iYHP7s2x7NjUKsXVhkwWrQhxrd+d8/3rRadj4CI=
3+
github.com/Azure/azure-amqp-common-go/v3 v3.2.3 h1:uDF62mbd9bypXWi19V1bN5NZEO84JqgmI5G73ibAmrk=
4+
github.com/Azure/azure-amqp-common-go/v3 v3.2.3/go.mod h1:7rPmbSfszeovxGfc5fSAXE4ehlXQZHpMja2OtxC2Tas=
35
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnzwb0ml1Qn70AdtRccZ543w=
46
github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
57
github.com/Azure/go-amqp v0.16.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
68
github.com/Azure/go-amqp v0.16.4 h1:/1oIXrq5zwXLHaoYDliJyiFjJSpJZMWGgtMX9e0/Z30=
79
github.com/Azure/go-amqp v0.16.4/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
10+
github.com/Azure/go-amqp v0.17.0 h1:HHXa3149nKrI0IZwyM7DRcRy5810t9ZICDutn4BYzj4=
11+
github.com/Azure/go-amqp v0.17.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg=
812
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
913
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
1014
github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM=

message.go

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -233,14 +233,14 @@ func (m *Message) Defer(ctx context.Context) error {
233233
}
234234

235235
// Release will notify Azure Service Bus the message should be re-queued without failure.
236-
//func (m *Message) Release() DispositionAction {
236+
// func (m *Message) Release() DispositionAction {
237237
// return func(ctx context.Context) {
238238
// span, _ := m.startSpanFromContext(ctx, "sb.Message.Release")
239239
// defer span.Finish()
240240
//
241241
// m.message.Release()
242242
// }
243-
//}
243+
// }
244244

245245
// DeadLetter will notify Azure Service Bus the message failed and should not re-queued
246246
func (m *Message) DeadLetter(ctx context.Context, err error) error {
@@ -318,6 +318,20 @@ func (m *Message) GetKeyValues() map[string]interface{} {
318318
return m.UserProperties
319319
}
320320

321+
func strDeref(s *string) string {
322+
if s != nil {
323+
return *s
324+
}
325+
return ""
326+
}
327+
328+
func strPtr(s string) *string {
329+
if s == "" {
330+
return nil
331+
}
332+
return &s
333+
}
334+
321335
func sendMgmtDisposition(ctx context.Context, m *Message, state disposition) error {
322336
ctx, span := startConsumerSpanFromContext(ctx, "sb.sendMgmtDisposition")
323337
defer span.End()
@@ -349,19 +363,19 @@ func (m *Message) toMsg() (*amqp.Message, error) {
349363
}
350364

351365
if m.SessionID != nil {
352-
amqpMsg.Properties.GroupID = *m.SessionID
366+
amqpMsg.Properties.GroupID = m.SessionID
353367
}
354368

355369
if m.GroupSequence != nil {
356-
amqpMsg.Properties.GroupSequence = *m.GroupSequence
370+
amqpMsg.Properties.GroupSequence = m.GroupSequence
357371
}
358372

359373
amqpMsg.Properties.CorrelationID = m.CorrelationID
360-
amqpMsg.Properties.ContentType = m.ContentType
361-
amqpMsg.Properties.Subject = m.Label
362-
amqpMsg.Properties.To = m.To
363-
amqpMsg.Properties.ReplyTo = m.ReplyTo
364-
amqpMsg.Properties.ReplyToGroupID = m.ReplyToGroupID
374+
amqpMsg.Properties.ContentType = strPtr(m.ContentType)
375+
amqpMsg.Properties.Subject = strPtr(m.Label)
376+
amqpMsg.Properties.To = strPtr(m.To)
377+
amqpMsg.Properties.ReplyTo = strPtr(m.ReplyTo)
378+
amqpMsg.Properties.ReplyToGroupID = strPtr(m.ReplyToGroupID)
365379

366380
if len(m.UserProperties) > 0 {
367381
amqpMsg.ApplicationProperties = make(map[string]interface{})
@@ -422,16 +436,16 @@ func newMessage(data []byte, amqpMsg *amqp.Message, r *amqp.Receiver) (*Message,
422436
if id, ok := amqpMsg.Properties.MessageID.(string); ok {
423437
msg.ID = id
424438
}
425-
msg.SessionID = &amqpMsg.Properties.GroupID
426-
msg.GroupSequence = &amqpMsg.Properties.GroupSequence
439+
msg.SessionID = amqpMsg.Properties.GroupID
440+
msg.GroupSequence = amqpMsg.Properties.GroupSequence
427441
if id, ok := amqpMsg.Properties.CorrelationID.(string); ok {
428442
msg.CorrelationID = id
429443
}
430-
msg.ContentType = amqpMsg.Properties.ContentType
431-
msg.Label = amqpMsg.Properties.Subject
432-
msg.To = amqpMsg.Properties.To
433-
msg.ReplyTo = amqpMsg.Properties.ReplyTo
434-
msg.ReplyToGroupID = amqpMsg.Properties.ReplyToGroupID
444+
msg.ContentType = strDeref(amqpMsg.Properties.ContentType)
445+
msg.Label = strDeref(amqpMsg.Properties.Subject)
446+
msg.To = strDeref(amqpMsg.Properties.To)
447+
msg.ReplyTo = strDeref(amqpMsg.Properties.ReplyTo)
448+
msg.ReplyToGroupID = strDeref(amqpMsg.Properties.ReplyToGroupID)
435449
if amqpMsg.Header != nil {
436450
msg.DeliveryCount = amqpMsg.Header.DeliveryCount + 1
437451
msg.TTL = &amqpMsg.Header.TTL

message_session.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (ms *MessageSession) SetState(ctx context.Context, state []byte) error {
143143
"operation": "com.microsoft:set-session-state",
144144
},
145145
Properties: &amqp.MessageProperties{
146-
GroupID: *ms.SessionID(),
146+
GroupID: ms.SessionID(),
147147
},
148148
Value: map[string]interface{}{
149149
"session-id": ms.SessionID(),

message_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -135,22 +135,22 @@ func (suite *serviceBusSuite) TestAMQPMessageToMessage() {
135135
d := 30 * time.Second
136136
until := time.Now().Add(d)
137137
pID := int16(12)
138-
138+
n := uint32(1)
139139
aMsg := &amqp.Message{
140140
DeliveryTag: dotNetEncodedLockTokenGUID,
141141
Properties: &amqp.MessageProperties{
142142
MessageID: "messageID",
143-
To: "to",
144-
Subject: "subject",
145-
ReplyTo: "replyTo",
146-
ReplyToGroupID: "replyToGroupID",
147-
CorrelationID: "correlationID",
148-
ContentType: "contentType",
149-
ContentEncoding: "contentEncoding",
150-
AbsoluteExpiryTime: until,
151-
CreationTime: until,
152-
GroupID: "groupID",
153-
GroupSequence: uint32(1),
143+
To: strPtr("to"),
144+
Subject: strPtr("subject"),
145+
ReplyTo: strPtr("replyTo"),
146+
ReplyToGroupID: strPtr("replyToGroupID"),
147+
CorrelationID: strPtr("correlationID"),
148+
ContentType: strPtr("contentType"),
149+
ContentEncoding: strPtr("contentEncoding"),
150+
AbsoluteExpiryTime: &until,
151+
CreationTime: &until,
152+
GroupID: strPtr("groupID"),
153+
GroupSequence: &n,
154154
},
155155
Annotations: amqp.Annotations{
156156
"x-opt-locked-until": until,

0 commit comments

Comments
 (0)