diff --git a/go.mod b/go.mod index 8dd9623..c6c3609 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ go 1.12 require ( github.com/Azure/azure-amqp-common-go/v3 v3.1.0 github.com/Azure/azure-sdk-for-go v51.1.0+incompatible - github.com/Azure/go-amqp v0.13.7 + github.com/Azure/go-amqp v0.13.8 github.com/Azure/go-autorest/autorest v0.11.18 github.com/Azure/go-autorest/autorest/adal v0.9.13 github.com/Azure/go-autorest/autorest/date v0.3.0 diff --git a/go.sum b/go.sum index 9306fe6..4c16f86 100644 --- a/go.sum +++ b/go.sum @@ -3,11 +3,10 @@ github.com/Azure/azure-amqp-common-go/v3 v3.1.0/go.mod h1:PBIGdzcO1teYoufTKMcGib github.com/Azure/azure-sdk-for-go v51.1.0+incompatible h1:7uk6GWtUqKg6weLv2dbKnzwb0ml1Qn70AdtRccZ543w= github.com/Azure/azure-sdk-for-go v51.1.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/go-amqp v0.13.0/go.mod h1:qj+o8xPCz9tMSbQ83Vp8boHahuRDl5mkNHyt1xlxUTs= -github.com/Azure/go-amqp v0.13.7 h1:ukcCtx138ZmOfHbdALuh9yoJhGtOY3+yaKApfzNvhSk= -github.com/Azure/go-amqp v0.13.7/go.mod h1:wbpCKA8tR5MLgRyIu+bb+S6ECdIDdYJ0NlpFE9xsBPI= +github.com/Azure/go-amqp v0.13.8 h1:EGDxD/Iyzs65DX0h5Tc8k9czVBjJ4lmZ16E8aQD/d7Y= +github.com/Azure/go-amqp v0.13.8/go.mod h1:wbpCKA8tR5MLgRyIu+bb+S6ECdIDdYJ0NlpFE9xsBPI= github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs= github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24= -github.com/Azure/go-autorest/autorest v0.11.3 h1:fyYnmYujkIXUgv88D9/Wo2ybE4Zwd/TmQd5sSI5u2Ws= github.com/Azure/go-autorest/autorest v0.11.3/go.mod h1:JFgpikqFJ/MleTTxwepExTKnFUKKszPS8UavbQYUMuw= github.com/Azure/go-autorest/autorest v0.11.18 h1:90Y4srNYrwOtAgVo3ndrQkTYn6kf1Eg/AjTFJ8Is2aM= github.com/Azure/go-autorest/autorest v0.11.18/go.mod h1:dSiJPy22c3u0OtOKDNttNgqpNFY/GeWa7GH/Pz56QRA= @@ -16,7 +15,6 @@ github.com/Azure/go-autorest/autorest/adal v0.9.13 h1:Mp5hbtOePIzM8pJVRa3YLrWWmZ github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M= github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw= github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74= -github.com/Azure/go-autorest/autorest/mocks v0.4.0 h1:z20OWOSG5aCye0HEkDp6TPmP17ZcfeMxPi6HnSALa8c= github.com/Azure/go-autorest/autorest/mocks v0.4.0/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= github.com/Azure/go-autorest/autorest/mocks v0.4.1 h1:K0laFcLE6VLTOwNgSxaGbUcLPuGXlNkbVvq4cW4nIHk= github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= @@ -24,7 +22,6 @@ github.com/Azure/go-autorest/autorest/to v0.4.0 h1:oXVqrxakqqV1UZdSazDOPOLvOIz+X github.com/Azure/go-autorest/autorest/to v0.4.0/go.mod h1:fE8iZBn7LQR7zH/9XU2NcPR4o9jEImooCeWJcYV/zLE= github.com/Azure/go-autorest/autorest/validation v0.3.1 h1:AgyqjAd94fwNAoTjl/WQXg4VvFeRFpO+UhNyRXqF1ac= github.com/Azure/go-autorest/autorest/validation v0.3.1/go.mod h1:yhLgjC0Wda5DYXl6JAsWyUe4KVNffhoDhG0zVzUMo3E= -github.com/Azure/go-autorest/logger v0.2.0 h1:e4RVHVZKC5p6UANLJHkM4OfR1UKZPj8Wt8Pcx+3oqrE= github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8= github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg= github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8= @@ -35,7 +32,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/devigned/tab v0.1.1 h1:3mD6Kb1mUOYeLpJvTVSDwSg5ZsfSxfvxGRTxRsJsITA= github.com/devigned/tab v0.1.1/go.mod h1:XG9mPq0dFghrYvoBF3xdRrJzSTX1b7IQrvaL9mzjeJY= -github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= @@ -100,7 +96,6 @@ github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de h1:ikNHVSjEfnvz6sxdSPCaPt572qowuyMDMJLLm3Db3ig= golang.org/x/crypto v0.0.0-20200728195943-123391ffb6de/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 h1:hb9wdF1z5waM+dSIICn1l0DkLVDT3hqhhQsDNUmHPRE= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= diff --git a/queue_test.go b/queue_test.go index b0d57d1..896ce49 100644 --- a/queue_test.go +++ b/queue_test.go @@ -508,7 +508,6 @@ func testMessageProperties(ctx context.Context, t *testing.T, q *Queue) { HandlerFunc(func(ctx context.Context, msg *Message) error { sp := msg.SystemProperties assert.NotNil(t, sp.LockedUntil, "LockedUntil") - assert.NotNil(t, sp.EnqueuedSequenceNumber, "EnqueuedSequenceNumber") assert.NotNil(t, sp.EnqueuedTime, "EnqueuedTime") assert.NotNil(t, sp.SequenceNumber, "SequenceNumber") assert.NotNil(t, sp.PartitionID, "PartitionID") diff --git a/receiver.go b/receiver.go index 4ba5487..d3503ef 100644 --- a/receiver.go +++ b/receiver.go @@ -24,6 +24,8 @@ package servicebus import ( "context" + "errors" + "fmt" "sync" "time" @@ -33,6 +35,8 @@ import ( "github.com/devigned/tab" ) +const sessionFilterName = "com.microsoft:session-filter" + type ( // Receiver provides connection, session and link handling for a receiving to an entity path Receiver struct { @@ -408,8 +412,9 @@ func (r *Receiver) newSessionAndLink(ctx context.Context) error { opts = append(opts, amqp.LinkSenderSettle(amqp.ModeSettled)) } - if opt, ok := r.getSessionFilterLinkOption(); ok { - opts = append(opts, opt) + sessionOpt, useSessionOpt := r.getSessionFilterLinkOption() + if useSessionOpt { + opts = append(opts, sessionOpt) } amqpReceiver, err := amqpSession.NewReceiver(opts...) @@ -417,13 +422,22 @@ func (r *Receiver) newSessionAndLink(ctx context.Context) error { tab.For(ctx).Error(err) return err } - r.receiver = amqpReceiver + if useSessionOpt { + rawsid := r.receiver.LinkSourceFilterValue(sessionFilterName) + if rawsid == nil && r.sessionID == nil { + return errors.New("failed to create a receiver. no unlocked sessions available") + } else if rawsid != nil && r.sessionID != nil && rawsid != *r.sessionID { + return fmt.Errorf("failed to create a receiver for session %s, it may be locked by another receiver", rawsid) + } else if r.sessionID == nil { + sid := rawsid.(string) + r.sessionID = &sid + } + } return nil } func (r *Receiver) getSessionFilterLinkOption() (amqp.LinkOption, bool) { - const name = "com.microsoft:session-filter" const code = uint64(0x00000137000000C) if !r.useSessions { @@ -431,10 +445,10 @@ func (r *Receiver) getSessionFilterLinkOption() (amqp.LinkOption, bool) { } if r.sessionID == nil { - return amqp.LinkSourceFilter(name, code, nil), true + return amqp.LinkSourceFilter(sessionFilterName, code, nil), true } - return amqp.LinkSourceFilter(name, code, r.sessionID), true + return amqp.LinkSourceFilter(sessionFilterName, code, r.sessionID), true } func messageID(msg *amqp.Message) interface{} { diff --git a/session.go b/session.go index e74dbfb..8f6f535 100644 --- a/session.go +++ b/session.go @@ -336,6 +336,10 @@ func (qs *QueueSession) ensureReceiver(ctx context.Context) error { } qs.receiver = r + if qs.sessionID == nil { + // propagate the acquired session ID from the receiver + qs.sessionID = qs.receiver.sessionID + } return nil } @@ -474,6 +478,10 @@ func (ss *SubscriptionSession) ensureReceiver(ctx context.Context) error { } ss.receiver = r + if ss.sessionID == nil { + // propagate the acquired session ID from the receiver + ss.sessionID = ss.receiver.sessionID + } return nil }