Skip to content

Commit a03758d

Browse files
committedNov 27, 2023
feat(mq): add rabbitmq support
1 parent 9b760b5 commit a03758d

File tree

7 files changed

+483
-0
lines changed

7 files changed

+483
-0
lines changed
 

‎go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ require (
1515
github.com/parnurzeal/gorequest v0.2.16
1616
github.com/patrickmn/go-cache v2.1.0+incompatible
1717
github.com/pkg/errors v0.9.1
18+
github.com/rabbitmq/amqp091-go v1.9.0
1819
github.com/samber/lo v1.38.1
1920
github.com/satori/go.uuid v1.2.0
2021
github.com/sethvargo/go-envconfig v0.9.0
22+
github.com/silenceper/wechat/v2 v2.1.6
2123
github.com/soheilhy/cmux v0.1.5
2224
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.42.0
2325
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0

‎go.sum

Lines changed: 36 additions & 0 deletions
Large diffs are not rendered by default.

‎mq/consumer.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package mq
2+
3+
import (
4+
"fmt"
5+
amqp "github.com/rabbitmq/amqp091-go"
6+
)
7+
8+
type Consumer struct {
9+
Conn *amqp.Connection
10+
Channel *amqp.Channel
11+
Tag string
12+
Done chan error
13+
Deliveries <-chan amqp.Delivery
14+
}
15+
16+
func NewConsumer(amqpURI, key string) (*Consumer, error) {
17+
c := &Consumer{
18+
Conn: nil,
19+
Channel: nil,
20+
Tag: key,
21+
Done: make(chan error),
22+
}
23+
24+
var err error
25+
26+
c.Conn, c.Channel, err = initConnection(amqpURI)
27+
if err != nil {
28+
return nil, err
29+
}
30+
31+
err = declareDirect(c.Channel, exchangeName, key)
32+
if err != nil {
33+
return nil, err
34+
}
35+
36+
c.Deliveries, err = c.Channel.Consume(
37+
key, // name
38+
key, // consumerTag,
39+
false, // autoAck
40+
false, // exclusive
41+
false, // noLocal
42+
false, // noWait
43+
nil, // arguments
44+
)
45+
if err != nil {
46+
return nil, fmt.Errorf("queue Consume: %s", err)
47+
}
48+
49+
return c, nil
50+
}
51+
52+
func (c *Consumer) Shutdown() error {
53+
// will close() the Deliveries Channel
54+
if err := c.Channel.Cancel(c.Tag, true); err != nil {
55+
return fmt.Errorf("consumer cancel failed: %s", err)
56+
}
57+
58+
if err := c.Conn.Close(); err != nil {
59+
return fmt.Errorf("AMQP connection close error: %s", err)
60+
}
61+
62+
defer fmt.Printf("AMQP shutdown OK")
63+
64+
// wait for exit
65+
return <-c.Done
66+
}

‎mq/init.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package mq
2+
3+
import (
4+
"fmt"
5+
amqp "github.com/rabbitmq/amqp091-go"
6+
)
7+
8+
const (
9+
exchangeName = "notify"
10+
11+
WechatKey = "notify.wechat"
12+
DingTalkKey = "notify.dingtalk"
13+
14+
UserIdKey = "userId"
15+
)
16+
17+
func initConnection(amqpURI string) (conn *amqp.Connection, channel *amqp.Channel, err error) {
18+
config := amqp.Config{Properties: amqp.NewConnectionProperties()}
19+
config.Properties.SetClientConnectionName("sample-consumer")
20+
fmt.Printf("dialing %q", amqpURI)
21+
conn, err = amqp.DialConfig(amqpURI, config)
22+
if err != nil {
23+
return nil, nil, fmt.Errorf("dial: %s", err)
24+
}
25+
26+
go func() {
27+
fmt.Printf("closing: %s", <-conn.NotifyClose(make(chan *amqp.Error)))
28+
}()
29+
30+
fmt.Printf("got Connection, getting Channel")
31+
channel, err = conn.Channel()
32+
if err != nil {
33+
return nil, nil, fmt.Errorf("channel: %s", err)
34+
}
35+
36+
return conn, channel, nil
37+
}
38+
39+
func declareDirect(channel *amqp.Channel, exchange, key string) error {
40+
err := channel.ExchangeDeclare(
41+
exchange,
42+
"direct",
43+
true,
44+
false,
45+
false,
46+
false,
47+
amqp.Table{},
48+
)
49+
if err != nil {
50+
return err
51+
}
52+
53+
_, err = channel.QueueDeclare(key,
54+
true, false, false, false, nil)
55+
if err != nil {
56+
return err
57+
}
58+
59+
err = channel.QueueBind(key, key, exchange, false, nil)
60+
if err != nil {
61+
return err
62+
}
63+
64+
return nil
65+
}

‎mq/notice.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package mq
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"github.com/silenceper/wechat/v2/officialaccount/message"
7+
)
8+
9+
type NoticeTemplate struct {
10+
ctx context.Context
11+
StaffID []string `json:"StaffID"`
12+
WeChat *WeChatNoticeBody `json:"template,omitempty"`
13+
DingTalk *DingTalkNoticeBody `json:"dingtalk,omitempty"`
14+
}
15+
16+
func New() *NoticeTemplate {
17+
return &NoticeTemplate{}
18+
}
19+
20+
func (t *NoticeTemplate) WithContext(ctx context.Context) *NoticeTemplate {
21+
t.ctx = ctx
22+
return t
23+
}
24+
25+
func (t *NoticeTemplate) Receiver(staffId ...string) *NoticeTemplate {
26+
t.StaffID = append(t.StaffID, staffId...)
27+
return t
28+
}
29+
30+
func (t *NoticeTemplate) NewWeChat() *WeChatNoticeBody {
31+
t.WeChat = &WeChatNoticeBody{}
32+
return t.WeChat
33+
}
34+
35+
func (t *NoticeTemplate) NewDingTalk() *DingTalkNoticeBody {
36+
t.DingTalk = &DingTalkNoticeBody{}
37+
return t.DingTalk
38+
}
39+
40+
type WeChatNoticeBody struct {
41+
Data map[string]*message.TemplateDataItem `json:"data"`
42+
TemplateID string `json:"template_id"`
43+
ToUser string `json:"touser"`
44+
URL string `json:"url"`
45+
}
46+
47+
func (t *WeChatNoticeBody) SetTmpl(templateId string) *WeChatNoticeBody {
48+
t.TemplateID = templateId
49+
return t
50+
}
51+
52+
func (t *WeChatNoticeBody) SetData(key, value, color string) *WeChatNoticeBody {
53+
if t.Data == nil {
54+
t.Data = make(map[string]*message.TemplateDataItem)
55+
}
56+
t.Data[key] = &message.TemplateDataItem{Value: value, Color: color}
57+
return t
58+
}
59+
60+
func (t *WeChatNoticeBody) Url(url string) *WeChatNoticeBody {
61+
t.URL = url
62+
return t
63+
}
64+
65+
type DingTalkNoticeBody struct {
66+
MsgType string `json:"msgtype"`
67+
Text *DingTalkMsgPlainTextInput `json:"text,omitempty"`
68+
Link *DingTalkMsgLinkInput `json:"link,omitempty"`
69+
Markdown *DingTalkMsgMarkdownInput `json:"markdown,omitempty"`
70+
}
71+
72+
func (t *DingTalkNoticeBody) PlainText(content string) {
73+
t.MsgType = "text"
74+
t.Text = &DingTalkMsgPlainTextInput{
75+
Content: content,
76+
}
77+
}
78+
79+
func (t *DingTalkNoticeBody) LinkMsg(messageUrl, picUrl, title, text string) {
80+
t.MsgType = "link"
81+
t.Link = &DingTalkMsgLinkInput{
82+
MessageUrl: messageUrl,
83+
PicUrl: picUrl,
84+
Title: title,
85+
Text: text,
86+
}
87+
}
88+
89+
func (t *DingTalkNoticeBody) MarkdownMsg(title, text string) {
90+
t.MsgType = "markdown"
91+
t.Markdown = &DingTalkMsgMarkdownInput{
92+
Title: title,
93+
Text: text,
94+
}
95+
}
96+
97+
type DingTalkMsgPlainTextInput struct {
98+
Content string `json:"content"`
99+
}
100+
101+
type DingTalkMsgLinkInput struct {
102+
MessageUrl string `json:"messageUrl"`
103+
PicUrl string `json:"picUrl"`
104+
Title string `json:"title"`
105+
Text string `json:"text"`
106+
}
107+
108+
type DingTalkMsgMarkdownInput struct {
109+
Title string `json:"title"` // 这个是点进推送列表前显示的简短的消息(进入后不可见)
110+
Text string `json:"text"` // 这个是进入后显示的详细的消息
111+
}
112+
113+
type UniNotifyInput struct {
114+
Title string
115+
SubTitle string
116+
Content string
117+
Extra string
118+
}
119+
120+
type WechatBody struct {
121+
StaffId string `json:"staffId"`
122+
*WeChatNoticeBody
123+
MessageId string `json:"messageId"`
124+
}
125+
126+
func MarshalWechatBody(staffId string, msg *WeChatNoticeBody) (data []byte, err error) {
127+
body := &WechatBody{
128+
StaffId: staffId,
129+
WeChatNoticeBody: msg,
130+
}
131+
if data, err = json.Marshal(body); err != nil {
132+
return nil, err
133+
}
134+
return data, nil
135+
}
136+
137+
func UnmarshalWechatBody(data []byte) (*WechatBody, error) {
138+
body := &WechatBody{}
139+
if err := json.Unmarshal(data, body); err != nil {
140+
return nil, err
141+
}
142+
return body, nil
143+
}
144+
145+
type DingTalkBody struct {
146+
StaffId string `json:"staffId"`
147+
*DingTalkNoticeBody
148+
MessageId string `json:"messageId"`
149+
}
150+
151+
func MarshalDingTalkBody(staffId string, msg *DingTalkNoticeBody) (data []byte, err error) {
152+
body := &DingTalkBody{
153+
StaffId: staffId,
154+
DingTalkNoticeBody: msg,
155+
}
156+
if data, err = json.Marshal(body); err != nil {
157+
return nil, err
158+
}
159+
return data, nil
160+
}
161+
162+
func UnmarshalDingTalkBody(data []byte) (*DingTalkNoticeBody, error) {
163+
body := &DingTalkNoticeBody{}
164+
if err := json.Unmarshal(data, body); err != nil {
165+
return nil, err
166+
}
167+
return body, nil
168+
}

‎mq/producer.go

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package mq
2+
3+
import (
4+
"context"
5+
"crypto/md5"
6+
"fmt"
7+
amqp "github.com/rabbitmq/amqp091-go"
8+
)
9+
10+
type Producer struct {
11+
Conn *amqp.Connection
12+
Channel *amqp.Channel
13+
14+
appId string
15+
}
16+
17+
func NewProducer(appId string, amqpURI string) (*Producer, error) {
18+
if appId == "" {
19+
return nil, fmt.Errorf("app id is empty")
20+
}
21+
22+
p := &Producer{
23+
appId: appId,
24+
}
25+
26+
var err error
27+
28+
p.Conn, p.Channel, err = initConnection(amqpURI)
29+
if err != nil {
30+
return nil, err
31+
}
32+
33+
return p, nil
34+
}
35+
36+
func (p *Producer) PublishNotice(ctx context.Context, data *NoticeTemplate, options ...string) error {
37+
38+
if data == nil {
39+
return fmt.Errorf("data is empty")
40+
}
41+
42+
opts := parseOptions(options)
43+
44+
if data.WeChat != nil {
45+
for _, staffId := range data.StaffID {
46+
if staffId == "" {
47+
continue
48+
}
49+
body, err := MarshalWechatBody(staffId, data.WeChat)
50+
if err != nil {
51+
return err
52+
}
53+
if err := p.publish(ctx, WechatKey, body, opts); err != nil {
54+
return err
55+
}
56+
}
57+
}
58+
59+
if data.DingTalk != nil {
60+
for _, staffId := range data.StaffID {
61+
if staffId == "" {
62+
continue
63+
}
64+
body, err := MarshalWechatBody(staffId, data.WeChat)
65+
if err != nil {
66+
return err
67+
}
68+
if err := p.publish(ctx, DingTalkKey, body, opts); err != nil {
69+
return err
70+
}
71+
}
72+
}
73+
74+
return nil
75+
}
76+
77+
func parseOptions(options []string) map[string]string {
78+
opts := make(map[string]string)
79+
for i := 0; i < len(options); i += 2 {
80+
if i+1 < len(options) {
81+
opts[options[i]] = options[i+1]
82+
}
83+
}
84+
return opts
85+
}
86+
87+
func (p *Producer) publish(ctx context.Context, key string, msg []byte, opts map[string]string) error {
88+
89+
err := p.Channel.PublishWithContext(
90+
ctx,
91+
exchangeName,
92+
key,
93+
false,
94+
false,
95+
amqp.Publishing{
96+
ContentType: "application/json",
97+
DeliveryMode: amqp.Persistent,
98+
Body: msg,
99+
AppId: p.appId,
100+
UserId: opts[UserIdKey],
101+
MessageId: fmt.Sprintf("%x", md5.Sum(msg)),
102+
})
103+
104+
return err
105+
106+
}
107+
108+
func (p *Producer) Shutdown() error {
109+
110+
if err := p.Conn.Close(); err != nil {
111+
return fmt.Errorf("AMQP connection close error: %s", err)
112+
}
113+
114+
defer fmt.Printf("AMQP shutdown OK")
115+
116+
return nil
117+
}

‎mq/producer_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package mq
2+
3+
import (
4+
"context"
5+
"testing"
6+
"time"
7+
)
8+
9+
func TestSendNotify(t *testing.T) {
10+
notice := New()
11+
notice.NewWeChat().
12+
SetTmpl("WZsoRBsLvbwRSk_2qr3oTkproxKbwbNjx4iwE7swJ6Y"). // 学校通知
13+
SetData("first", "通知标题", ""). // 标题
14+
SetData("keyword1", "通知大学", ""). // 学校
15+
SetData("keyword2", "通知人", ""). // 通知人
16+
SetData("keyword3", time.Now().Format("2006-01-02"), ""). // 通知时间
17+
SetData("keyword4", "通知内容", "") // 通知内容
18+
notice.Receiver("12345678")
19+
20+
p, err := NewProducer("sdk_test", "amqpURL")
21+
if err != nil {
22+
t.Error(err)
23+
}
24+
25+
err = p.PublishNotice(context.Background(), notice)
26+
if err != nil {
27+
t.Error(err)
28+
}
29+
}

0 commit comments

Comments
 (0)
Please sign in to comment.