Skip to content

Commit 3510a41

Browse files
committed
refactor state change process, update tests
1 parent 218cee7 commit 3510a41

File tree

2 files changed

+59
-68
lines changed

2 files changed

+59
-68
lines changed

lepus.go

+29-48
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ const (
2323
)
2424

2525
type info struct {
26-
mu sync.Mutex
2726
state int32
2827
err error
2928
}
@@ -35,10 +34,6 @@ type Channel struct {
3534
messageCount uint64
3635
midPrefix string
3736

38-
pubc chan amqp.Confirmation
39-
retc chan amqp.Return
40-
closc chan *amqp.Error
41-
4237
sm sync.Map
4338

4439
timeout time.Duration
@@ -60,43 +55,36 @@ func SyncChannel(ch *amqp.Channel, err error) (*Channel, error) {
6055
timeout: 2 * time.Second,
6156
}
6257

63-
c.pubc = ch.NotifyPublish(make(chan amqp.Confirmation))
6458
go func() {
65-
for pub := range c.pubc {
59+
pubc := ch.NotifyPublish(make(chan amqp.Confirmation))
60+
for pub := range pubc {
6661
smkey := "DeliveryTag-" + strconv.Itoa(int(pub.DeliveryTag))
67-
if iinf, ok := c.sm.Load(smkey); ok {
68-
inf := iinf.(*info)
69-
swapped := atomic.CompareAndSwapInt32(&inf.state, int32(StateUnknown), int32(StatePublished))
70-
if swapped {
71-
inf.mu.Unlock()
62+
if v, ok := c.sm.Load(smkey); ok {
63+
v.(chan info) <- info{
64+
state: int32(StatePublished),
7265
}
7366
}
7467
}
7568
}()
7669

77-
c.retc = ch.NotifyReturn(make(chan amqp.Return))
7870
go func() {
79-
for ret := range c.retc {
71+
retc := ch.NotifyReturn(make(chan amqp.Return))
72+
for ret := range retc {
8073
smkey := ret.MessageId + ret.CorrelationId
81-
if iinf, ok := c.sm.Load(smkey); ok {
82-
inf := iinf.(*info)
83-
swapped := atomic.CompareAndSwapInt32(&inf.state, int32(StateUnknown), int32(StateReturned))
84-
if swapped {
85-
inf.mu.Unlock()
74+
if v, ok := c.sm.Load(smkey); ok {
75+
v.(chan info) <- info{
76+
state: int32(StateReturned),
8677
}
8778
}
8879
}
8980
}()
9081

91-
c.closc = ch.NotifyClose(make(chan *amqp.Error))
9282
go func() {
93-
err := <-c.closc
94-
c.sm.Range(func(key, value interface{}) bool {
95-
inf := value.(*info)
96-
swapped := atomic.CompareAndSwapInt32(&inf.state, int32(StateUnknown), int32(StateClosed))
97-
if swapped {
98-
inf.err = err
99-
inf.mu.Unlock()
83+
err := <-ch.NotifyClose(make(chan *amqp.Error))
84+
c.sm.Range(func(k, v interface{}) bool {
85+
v.(chan info) <- info{
86+
state: int32(StateClosed),
87+
err: err,
10088
}
10189
return true
10290
})
@@ -125,17 +113,13 @@ func (c *Channel) PublishAndWait(exchange, key string, mandatory, immediate bool
125113
msg.CorrelationId = strconv.Itoa(int(mid))
126114
}
127115

128-
inf := &info{
129-
state: int32(StateUnknown),
130-
mu: sync.Mutex{},
131-
}
132-
inf.mu.Lock()
133-
134116
mkey := msg.MessageId + msg.CorrelationId
135117
tkey := "DeliveryTag-" + strconv.Itoa(int(mid))
136118

137-
c.sm.Store(mkey, inf)
138-
c.sm.Store(tkey, inf)
119+
sch := make(chan info)
120+
121+
c.sm.Store(mkey, sch)
122+
c.sm.Store(tkey, sch)
139123

140124
defer func() {
141125
c.sm.Delete(mkey)
@@ -147,20 +131,17 @@ func (c *Channel) PublishAndWait(exchange, key string, mandatory, immediate bool
147131
return StateUnknown, err
148132
}
149133

150-
go func() {
151-
timer := time.NewTimer(c.timeout)
152-
defer timer.Stop()
153-
154-
<-timer.C
155-
swapped := atomic.CompareAndSwapInt32(&inf.state, int32(StateUnknown), int32(StateTimeout))
156-
if swapped {
157-
inf.err = errors.New("message publishing timeout reached")
158-
inf.mu.Unlock()
159-
}
160-
}()
134+
timer := time.NewTimer(c.timeout)
135+
defer timer.Stop()
161136

162-
inf.mu.Lock()
163-
return State(inf.state), inf.err
137+
for {
138+
select {
139+
case <-timer.C:
140+
return StateTimeout, errors.New("message publishing timeout reached")
141+
case inf := <-sch:
142+
return State(inf.state), inf.err
143+
}
144+
}
164145
}
165146

166147
// ConsumeMessages returns chan of wrapped messages from queue

lepus_test.go

+30-20
Original file line numberDiff line numberDiff line change
@@ -112,28 +112,38 @@ func TestPublishAndWaitMultiple(t *testing.T) {
112112
t.Fatal(err)
113113
}
114114

115-
for i := 0; i < 10; i++ {
116-
state, err := ch.PublishAndWait(
117-
"", // exchange
118-
"test", // routing key
119-
true, // mandatory
120-
false,
121-
amqp.Publishing{
122-
DeliveryMode: amqp.Persistent,
123-
ContentType: "text/plain",
124-
Body: []byte("Hello, lepus!"),
125-
},
126-
)
115+
wg := &sync.WaitGroup{}
127116

128-
if err != nil {
129-
t.Fatal(err)
130-
}
117+
for i := 0; i < 10; i++ {
118+
wg.Add(1)
119+
120+
go func() {
121+
defer wg.Done()
122+
123+
state, err := ch.PublishAndWait(
124+
"", // exchange
125+
"test", // routing key
126+
true, // mandatory
127+
false,
128+
amqp.Publishing{
129+
DeliveryMode: amqp.Persistent,
130+
ContentType: "text/plain",
131+
Body: []byte("Hello, lepus!"),
132+
},
133+
)
134+
135+
if err != nil {
136+
t.Fatal(err)
137+
}
131138

132-
expected := StatePublished
133-
if state != expected {
134-
t.Fatalf("Expecting state to be %v got %v", expected, state)
135-
}
139+
expected := StatePublished
140+
if state != expected {
141+
t.Fatalf("Expecting state to be %v got %v", expected, state)
142+
}
143+
}()
136144
}
145+
146+
wg.Wait()
137147
}
138148

139149
func TestConsumeMessages(t *testing.T) {
@@ -167,7 +177,7 @@ func TestConsumeMessages(t *testing.T) {
167177
}
168178

169179
messagesCount := 10
170-
wg := sync.WaitGroup{}
180+
wg := &sync.WaitGroup{}
171181
wg.Add(1)
172182

173183
go func() {

0 commit comments

Comments
 (0)