1
1
package nats
2
2
3
3
import (
4
+ "encoding/json"
4
5
"log"
5
6
"strings"
6
7
"sync"
@@ -25,8 +26,8 @@ type Options struct {
25
26
26
27
// Nats will implement Nats subscribe and publish functionality
27
28
type Nats struct {
28
- ec * nats.EncodedConn
29
- wg * sync.WaitGroup
29
+ conn * nats.Conn
30
+ wg * sync.WaitGroup
30
31
}
31
32
32
33
// New - constructor
@@ -57,55 +58,66 @@ func New(opts Options) (broker.Handler, error) {
57
58
return nil , ErrConnect (err )
58
59
}
59
60
60
- ec , err := nats .NewEncodedConn (nc , nats .JSON_ENCODER )
61
- if err != nil {
62
- return nil , ErrEncodedConn (err )
63
- }
64
-
65
- return & Nats {ec : ec }, nil
61
+ return & Nats {conn : nc , wg : & sync.WaitGroup {}}, nil
66
62
}
63
+
67
64
func (n * Nats ) ConnectedEndpoints () (endpoints []string ) {
68
- for _ , server := range n .ec . Conn .Servers () {
65
+ for _ , server := range n .conn .Servers () {
69
66
endpoints = append (endpoints , strings .TrimPrefix (server , "nats://" ))
70
67
}
71
68
return
72
69
}
73
70
74
71
func (n * Nats ) Info () string {
75
- if n .ec == nil || n . ec . Conn == nil {
72
+ if n .conn == nil {
76
73
return broker .NotConnected
77
74
}
78
- return n .ec . Conn .Opts .Name
75
+ return n .conn .Opts .Name
79
76
}
80
77
81
78
func (n * Nats ) CloseConnection () {
82
- n .ec .Close ()
79
+ if n .conn != nil {
80
+ if err := n .conn .Drain (); err != nil {
81
+ log .Printf ("nats: drain error: %v" , err )
82
+ }
83
+ n .conn .Close ()
84
+ }
83
85
}
84
86
87
+
85
88
// Publish - to publish messages
86
89
func (n * Nats ) Publish (subject string , message * broker.Message ) error {
87
- err := n . ec . Publish ( subject , message )
90
+ b , err := json . Marshal ( message )
88
91
if err != nil {
89
92
return ErrPublish (err )
90
93
}
91
- return nil
94
+ return n . conn . Publish ( subject , b )
92
95
}
93
96
94
97
// PublishWithChannel - to publish messages with channel
95
98
func (n * Nats ) PublishWithChannel (subject string , msgch chan * broker.Message ) error {
96
- err := n .ec .BindSendChan (subject , msgch )
97
- if err != nil {
98
- return ErrPublish (err )
99
- }
100
- return nil
99
+ go func () {
100
+ for msg := range msgch {
101
+ b , err := json .Marshal (msg )
102
+ if err != nil {
103
+ log .Printf ("nats: JSON marshal error: %v" , err )
104
+ continue
105
+ }
106
+ if err := n .conn .Publish (subject , b ); err != nil {
107
+ log .Printf ("nats: publish error for subject %s: %v" , subject , err )
108
+ }
109
+ }
110
+ }()
111
+ return nil
101
112
}
102
113
114
+
103
115
// Subscribe - for subscribing messages
104
116
// TODO Ques: Do we want to unsubscribe
105
117
// TODO will the method-user just subsribe, how will it handle the received messages?
106
118
func (n * Nats ) Subscribe (subject , queue string , message []byte ) error {
107
119
n .wg .Add (1 )
108
- _ , err := n .ec .QueueSubscribe (subject , queue , func (msg * nats.Msg ) {
120
+ _ , err := n .conn .QueueSubscribe (subject , queue , func (msg * nats.Msg ) {
109
121
message = msg .Data
110
122
n .wg .Done ()
111
123
})
@@ -119,7 +131,12 @@ func (n *Nats) Subscribe(subject, queue string, message []byte) error {
119
131
120
132
// SubscribeWithChannel will publish all the messages received to the given channel
121
133
func (n * Nats ) SubscribeWithChannel (subject , queue string , msgch chan * broker.Message ) error {
122
- _ , err := n .ec .BindRecvQueueChan (subject , queue , msgch )
134
+ _ , err := n .conn .QueueSubscribe (subject , queue , func (m * nats.Msg ) {
135
+ var msg broker.Message
136
+ if err := json .Unmarshal (m .Data , & msg ); err == nil {
137
+ msgch <- & msg
138
+ }
139
+ })
123
140
if err != nil {
124
141
return ErrQueueSubscribe (err )
125
142
}
@@ -153,8 +170,5 @@ func (in *Nats) DeepCopyObject() broker.Handler {
153
170
// Check if the connection object is empty
154
171
func (in * Nats ) IsEmpty () bool {
155
172
empty := & Nats {}
156
- if in == nil || * in == * empty {
157
- return true
158
- }
159
- return false
173
+ return in == nil || * in == * empty
160
174
}
0 commit comments