forked from rabbitmq/amqp091-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdelivery.go
173 lines (141 loc) · 5.84 KB
/
delivery.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// Copyright (c) 2021 VMware, Inc. or its affiliates. All Rights Reserved.
// Copyright (c) 2012-2021, Sean Treadway, SoundCloud Ltd.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package amqp091
import (
"errors"
"time"
)
// errDeliveryNotInitialized is returned by Delivery.Ack, Delivery.Reject and
// Delivery.Nack when the Delivery's Acknowledger field is nil.
var errDeliveryNotInitialized = errors.New("delivery not initialized")
// Acknowledger is the hook through which the outcome of consuming a delivery,
// successful or failed, is reported to the server. Every method addresses a
// delivery by the identifier found in the Delivery.DeliveryTag field.
//
// Tests of Delivery handlers may substitute a mock implementation.
type Acknowledger interface {
	// Ack acknowledges the delivery identified by tag.
	Ack(tag uint64, multiple bool) error
	// Nack negatively acknowledges the delivery identified by tag.
	Nack(tag uint64, multiple, requeue bool) error
	// Reject negatively acknowledges the delivery identified by tag.
	Reject(tag uint64, requeue bool) error
}
// Delivery captures the fields of a message that was resident in a queue and
// has been delivered by the server to a consumer, through either
// Channel.Consume or Channel.Get.
//
// The Ack, Reject and Nack methods forward to the Acknowledger field using
// DeliveryTag; they return an error when Acknowledger is nil.
type Delivery struct {
Acknowledger Acknowledger // the channel from which this delivery arrived
Headers Table // Application or header exchange table
// Properties
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2)
Priority uint8 // queue implementation use - 0 to 9
CorrelationId string // application use - correlation identifier
ReplyTo string // application use - address to reply to (ex: RPC)
Expiration string // implementation use - message expiration spec
MessageId string // application use - message identifier
Timestamp time.Time // application use - message timestamp
Type string // application use - message type name
UserId string // application use - creating user - should be authenticated user
AppId string // application use - creating application id
// Valid only with Channel.Consume
ConsumerTag string
// Valid only with Channel.Get
MessageCount uint32
// The remaining fields are populated for both delivery kinds.
DeliveryTag uint64 // identifier handed to the Acknowledger by Ack, Reject and Nack
Redelivered bool // NOTE(review): presumably true when the server delivered this message before - confirm
Exchange string // basic.publish exchange
RoutingKey string // basic.publish routing key
Body []byte // message body
}
// newDelivery assembles a Delivery from a content-bearing message received on
// channel.
//
// The properties and body returned by msg.getContent are copied into the
// result and channel is installed as its Acknowledger. The remaining fields
// depend on the concrete type of msg: *basicDeliver supplies ConsumerTag,
// *basicGetOk supplies MessageCount, and both supply DeliveryTag, Redelivered,
// Exchange and RoutingKey. For any other message type those fields keep their
// zero values.
func newDelivery(channel *Channel, msg messageWithContent) *Delivery {
	attrs, payload := msg.getContent()

	d := &Delivery{Acknowledger: channel, Body: payload}

	// Content properties, present regardless of how the message arrived.
	d.Headers = attrs.Headers
	d.ContentType = attrs.ContentType
	d.ContentEncoding = attrs.ContentEncoding
	d.DeliveryMode = attrs.DeliveryMode
	d.Priority = attrs.Priority
	d.CorrelationId = attrs.CorrelationId
	d.ReplyTo = attrs.ReplyTo
	d.Expiration = attrs.Expiration
	d.MessageId = attrs.MessageId
	d.Timestamp = attrs.Timestamp
	d.Type = attrs.Type
	d.UserId = attrs.UserId
	d.AppId = attrs.AppId

	// Fields carried by the method that introduced the content.
	switch method := msg.(type) {
	case *basicGetOk:
		d.MessageCount = method.MessageCount
		d.DeliveryTag = method.DeliveryTag
		d.Redelivered = method.Redelivered
		d.Exchange = method.Exchange
		d.RoutingKey = method.RoutingKey
	case *basicDeliver:
		d.ConsumerTag = method.ConsumerTag
		d.DeliveryTag = method.DeliveryTag
		d.Redelivered = method.Redelivered
		d.Exchange = method.Exchange
		d.RoutingKey = method.RoutingKey
	}

	return d
}
/*
Ack forwards an acknowledgement for this delivery to its Acknowledger,
signalling that the client or server has finished work on it.

Every delivery in AMQP must be acknowledged. If Channel.Consume was called with
autoAck set to true, the server acknowledges each message automatically and
this method should not be called. Otherwise, call Delivery.Ack once the
delivery has been processed successfully.

With multiple set to true, this delivery and all prior unacknowledged
deliveries on the same channel are acknowledged, which is useful when
processing deliveries in batches.

A returned error indicates that the acknowledgement could not be delivered to
the channel it was sent from. When the Delivery has no Acknowledger, a
"delivery not initialized" error is returned instead.

One of Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (d Delivery) Ack(multiple bool) error {
	target := d.Acknowledger
	if target == nil {
		return errDeliveryNotInitialized
	}

	return target.Ack(d.DeliveryTag, multiple)
}
/*
Reject forwards a negative acknowledgement for this delivery to its
Acknowledger.

With requeue set to true, the message is queued again to be delivered to a
consumer on a different channel. With requeue set to false, or when the server
is unable to queue the message, it is dropped.

When processing deliveries in batches, and the server supports it, prefer
Delivery.Nack.

When the Delivery has no Acknowledger, a "delivery not initialized" error is
returned; otherwise the Acknowledger's result is returned unchanged.

One of Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (d Delivery) Reject(requeue bool) error {
	target := d.Acknowledger
	if target == nil {
		return errDeliveryNotInitialized
	}

	return target.Reject(d.DeliveryTag, requeue)
}
/*
Nack negatively acknowledges the delivery of the message(s) identified by this
delivery's tag, from either the client or server, by forwarding the request to
the Acknowledger.

With multiple set to true, all messages delivered on the same channel up to and
including this delivery tag are negatively acknowledged.

With requeue set to true, the server is asked to deliver this message to a
different consumer. If that is not possible, or requeue is false, the message
is dropped or delivered to a server configured dead-letter queue.

This method must not be used to select or requeue messages the client wishes
not to handle; it exists to inform the server that the client is incapable of
handling this message at this time.

When the Delivery has no Acknowledger, a "delivery not initialized" error is
returned; otherwise the Acknowledger's result is returned unchanged.

One of Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (d Delivery) Nack(multiple, requeue bool) error {
	target := d.Acknowledger
	if target == nil {
		return errDeliveryNotInitialized
	}

	return target.Nack(d.DeliveryTag, multiple, requeue)
}