Skip to content

Commit f0a2d66

Browse files
committed
init commit
1 parent 1fb57e1 commit f0a2d66

File tree

4 files changed

+692
-12
lines changed

4 files changed

+692
-12
lines changed

.gitignore

+4-12
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,4 @@
1-
# Binaries for programs and plugins
2-
*.exe
3-
*.exe~
4-
*.dll
5-
*.so
6-
*.dylib
7-
8-
# Test binary, build with `go test -c`
9-
*.test
10-
11-
# Output of the go coverage tool, specifically when used with LiteIDE
12-
*.out
1+
vendor/
2+
lepus
3+
.DS_Store
4+
glide.*

README.md

+68
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
# lepus
2+
3+
[![GoDoc](https://godoc.org/github.com/edadeal/lepus?status.svg)](https://godoc.org/github.com/edadeal/lepus)
4+
[![Go Report Card](https://goreportcard.com/badge/github.com/edadeal/lepus)](https://goreportcard.com/report/github.com/edadeal/lepus)
5+
6+
Simple wrapper around [streadway/amqp](github.com/streadway/amqp) with syncronous functions.
7+
8+
## Installation
9+
10+
Install:
11+
12+
```shell
13+
go get -u github.com/edadeal/lepus
14+
```
15+
16+
Import:
17+
18+
```go
19+
import "github.com/edadeal/lepus"
20+
```
21+
22+
## Quickstart
23+
24+
```go
25+
func main() {
26+
conn, err := amqp.Dial("amqp://lepus:lepus@127.0.0.1:5672/lepus")
27+
if err != nil {
28+
log.Fatal(err)
29+
}
30+
31+
defer conn.Close()
32+
33+
ch, err := lepus.SyncChannel(conn.Channel())
34+
if err != nil {
35+
t.Fatal(err)
36+
}
37+
38+
_, err = ch.QueueDeclare(
39+
"test", // name
40+
true, // durable
41+
false, // delete when unused
42+
false, // exclusive
43+
false, // no-wait
44+
nil, // arguments
45+
)
46+
if err != nil {
47+
t.Fatal(err)
48+
}
49+
50+
state, err := ch.PublishAndWait(
51+
"", // exchange
52+
"test", // routing key
53+
true, // mandatory
54+
false,
55+
amqp.Publishing{
56+
DeliveryMode: amqp.Persistent,
57+
ContentType: "text/plain",
58+
Body: []byte("Hello, lepus!"),
59+
},
60+
)
61+
62+
if err != nil {
63+
t.Fatal(err)
64+
}
65+
66+
log.Printf("Published: %t", state == lepus.StatePublished)
67+
}
68+
```

lepus.go

+215
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
package lepus
2+
3+
import (
4+
"errors"
5+
"strconv"
6+
"sync"
7+
"sync/atomic"
8+
"time"
9+
10+
"github.com/streadway/amqp"
11+
)
12+
13+
// State indicates publishing state of message
14+
type State int32
15+
16+
// states
17+
const (
18+
StateUnknown State = iota
19+
StatePublished
20+
StateReturned
21+
StateTimeout
22+
StateClosed
23+
)
24+
25+
type info struct {
26+
mu sync.Mutex
27+
state int32
28+
err error
29+
}
30+
31+
// Channel is a wrapper around async AMQP channel
32+
type Channel struct {
33+
*amqp.Channel
34+
35+
messageCount uint64
36+
midPrefix string
37+
38+
pubc chan amqp.Confirmation
39+
retc chan amqp.Return
40+
closc chan *amqp.Error
41+
42+
sm map[string]*info
43+
44+
timeout time.Duration
45+
}
46+
47+
// SyncChannel returns channel wrapper
48+
func SyncChannel(ch *amqp.Channel, err error) (*Channel, error) {
49+
if err != nil {
50+
return nil, err
51+
}
52+
53+
if err := ch.Confirm(false); err != nil {
54+
return nil, err
55+
}
56+
57+
c := &Channel{
58+
Channel: ch,
59+
midPrefix: "lepus-" + strconv.Itoa(int(time.Now().Unix())),
60+
sm: make(map[string]*info),
61+
timeout: 2 * time.Second,
62+
}
63+
64+
c.pubc = ch.NotifyPublish(make(chan amqp.Confirmation))
65+
go func() {
66+
for pub := range c.pubc {
67+
smkey := "DeliveryTag-" + strconv.Itoa(int(pub.DeliveryTag))
68+
if inf, ok := c.sm[smkey]; ok {
69+
swapped := atomic.CompareAndSwapInt32(&inf.state, int32(StateUnknown), int32(StatePublished))
70+
if swapped {
71+
inf.mu.Unlock()
72+
}
73+
}
74+
}
75+
}()
76+
77+
c.retc = ch.NotifyReturn(make(chan amqp.Return))
78+
go func() {
79+
for ret := range c.retc {
80+
smkey := ret.MessageId + ret.CorrelationId
81+
if inf, ok := c.sm[smkey]; ok {
82+
swapped := atomic.CompareAndSwapInt32(&inf.state, int32(StateUnknown), int32(StateReturned))
83+
if swapped {
84+
inf.mu.Unlock()
85+
}
86+
}
87+
}
88+
}()
89+
90+
c.closc = ch.NotifyClose(make(chan *amqp.Error))
91+
go func() {
92+
err := <-c.closc
93+
for _, inf := range c.sm {
94+
swapped := atomic.CompareAndSwapInt32(&inf.state, int32(StateUnknown), int32(StateClosed))
95+
if swapped {
96+
inf.err = err
97+
inf.mu.Unlock()
98+
}
99+
}
100+
}()
101+
102+
return c, nil
103+
}
104+
105+
// WithTimeout sets publish wait timeout
106+
func (c *Channel) WithTimeout(d time.Duration) {
107+
c.timeout = d
108+
}
109+
110+
// PublishAndWait sends message to queue and waits for response
111+
func (c *Channel) PublishAndWait(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) (State, error) {
112+
mid := atomic.AddUint64(&c.messageCount, 1)
113+
if msg.MessageId == "" {
114+
msg.MessageId = c.midPrefix + "-" + strconv.Itoa(int(mid))
115+
}
116+
117+
if msg.Timestamp.IsZero() {
118+
msg.Timestamp = time.Now()
119+
}
120+
121+
if msg.CorrelationId == "" {
122+
msg.CorrelationId = strconv.Itoa(int(mid))
123+
}
124+
125+
inf := &info{
126+
state: int32(StateUnknown),
127+
mu: sync.Mutex{},
128+
}
129+
inf.mu.Lock()
130+
131+
mkey := msg.MessageId + msg.CorrelationId
132+
tkey := "DeliveryTag-" + strconv.Itoa(int(mid))
133+
134+
c.sm[mkey] = inf
135+
c.sm[tkey] = inf
136+
137+
defer func() {
138+
delete(c.sm, mkey)
139+
delete(c.sm, tkey)
140+
}()
141+
142+
err := c.Publish(exchange, key, mandatory, immediate, msg)
143+
if err != nil {
144+
return StateUnknown, err
145+
}
146+
147+
go func() {
148+
timer := time.NewTimer(c.timeout)
149+
defer timer.Stop()
150+
151+
<-timer.C
152+
swapped := atomic.CompareAndSwapInt32(&inf.state, int32(StateUnknown), int32(StateTimeout))
153+
if swapped {
154+
inf.err = errors.New("message publishing timeout reached")
155+
inf.mu.Unlock()
156+
}
157+
}()
158+
159+
inf.mu.Lock()
160+
return State(inf.state), inf.err
161+
}
162+
163+
// ConsumeMessages returns chan of wrapped messages from queue
164+
func (c *Channel) ConsumeMessages(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan Delivery, error) {
165+
d, err := c.Consume(queue, consumer, autoAck, exclusive, noLocal, noWait, args)
166+
if err != nil {
167+
return nil, err
168+
}
169+
170+
wd := make(chan Delivery)
171+
go func() {
172+
for msg := range d {
173+
msg.Acknowledger = c
174+
wd <- Delivery{msg}
175+
}
176+
}()
177+
178+
return wd, nil
179+
}
180+
181+
// Delivery is a superset of amqp.Delivery
182+
type Delivery struct {
183+
amqp.Delivery
184+
}
185+
186+
// NackDelayed nacks message without requeue and publishes it again
187+
// without modification back to tail of queue
188+
func (d *Delivery) NackDelayed(multiple, mandatory, immediate bool) (State, error) {
189+
ch, ok := d.Acknowledger.(*Channel)
190+
if !ok {
191+
return StateUnknown, errors.New("Acknowledger is not of type *lepus.Channel")
192+
}
193+
194+
err := d.Nack(multiple, false)
195+
if err != nil {
196+
return StateUnknown, err
197+
}
198+
199+
return ch.PublishAndWait(d.Exchange, d.RoutingKey, mandatory, immediate, amqp.Publishing{
200+
Headers: d.Headers,
201+
ContentType: d.ContentType,
202+
ContentEncoding: d.ContentEncoding,
203+
DeliveryMode: d.DeliveryMode,
204+
Priority: d.Priority,
205+
CorrelationId: d.CorrelationId,
206+
ReplyTo: d.ReplyTo,
207+
Expiration: d.Expiration,
208+
MessageId: d.MessageId,
209+
Timestamp: d.Timestamp,
210+
Type: d.Type,
211+
UserId: d.UserId,
212+
AppId: d.AppId,
213+
Body: d.Body,
214+
})
215+
}

0 commit comments

Comments
 (0)