forked from kubemq-io/kubemq-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
common.go
140 lines (127 loc) · 3.81 KB
/
common.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package kubemq
import (
"context"
"encoding/json"
"fmt"
"github.com/kubemq-io/kubemq-go/common"
"github.com/kubemq-io/kubemq-go/pkg/uuid"
"time"
)
const requestChannel = "kubemq.cluster.internal.requests"
func CreateChannel(ctx context.Context, client *Client, clientId string, channel string, channelType string) error {
request := NewQuery().
SetChannel(requestChannel).
SetId(uuid.New()).
SetClientId(clientId).
SetMetadata("create-channel").
SetTags(map[string]string{
"channel_type": channelType,
"channel": channel,
"client_id": clientId,
}).
SetTimeout(time.Second * 10)
resp, err := client.SetQuery(request).Send(ctx)
if err != nil {
return fmt.Errorf("error sending create channel request: %s", err.Error())
}
if resp.Error != "" {
return fmt.Errorf("error creating channel: %s", resp.Error)
}
return nil
}
func DeleteChannel(ctx context.Context, client *Client, clientId string, channel string, channelType string) error {
request := NewQuery().
SetChannel(requestChannel).
SetId(uuid.New()).
SetClientId(clientId).
SetMetadata("delete-channel").
SetTags(map[string]string{
"channel_type": channelType,
"channel": channel,
"client_id": clientId,
}).
SetTimeout(time.Second * 10)
resp, err := client.SetQuery(request).Send(ctx)
if err != nil {
return fmt.Errorf("error sending delete channel request: %s", err.Error())
}
if resp.Error != "" {
return fmt.Errorf("error deleting channel: %s", resp.Error)
}
return nil
}
func listChannels(ctx context.Context, client *Client, clientId string, channelType string, search string) ([]byte, error) {
request := NewQuery().
SetChannel(requestChannel).
SetId(uuid.New()).
SetClientId(clientId).
SetMetadata("list-channels").
SetTags(map[string]string{
"channel_type": channelType,
"client_id": clientId,
"search": search,
}).
SetTimeout(time.Second * 10)
resp, err := client.SetQuery(request).Send(ctx)
if err != nil {
return nil, fmt.Errorf("error sending list channels request: %s", err.Error())
}
if resp.Error != "" {
return nil, fmt.Errorf("error listing channels: %s", resp.Error)
}
return resp.Body, nil
}
func ListQueuesChannels(ctx context.Context, client *Client, clientId string, search string) ([]*common.QueuesChannel, error) {
data, err := listChannels(ctx, client, clientId, "queues", search)
if err != nil {
return nil, err
}
return DecodeQueuesChannelList(data)
}
func ListPubSubChannels(ctx context.Context, client *Client, clientId string, channelType string, search string) ([]*common.PubSubChannel, error) {
data, err := listChannels(ctx, client, clientId, channelType, search)
if err != nil {
return nil, err
}
return DecodePubSubChannelList(data)
}
func ListCQChannels(ctx context.Context, client *Client, clientId string, channelType string, search string) ([]*common.CQChannel, error) {
data, err := listChannels(ctx, client, clientId, channelType, search)
if err != nil {
return nil, err
}
return DecodeCQChannelList(data)
}
func DecodePubSubChannelList(dataBytes []byte) ([]*common.PubSubChannel, error) {
var channelsData []*common.PubSubChannel
if dataBytes == nil {
return nil, nil
}
err := json.Unmarshal(dataBytes, &channelsData)
if err != nil {
return nil, err
}
return channelsData, nil
}
func DecodeQueuesChannelList(dataBytes []byte) ([]*common.QueuesChannel, error) {
var channelsData []*common.QueuesChannel
if dataBytes == nil {
return nil, nil
}
err := json.Unmarshal(dataBytes, &channelsData)
if err != nil {
return nil, err
}
return channelsData, nil
}
func DecodeCQChannelList(dataBytes []byte) ([]*common.CQChannel, error) {
var channelsData []*common.CQChannel
if dataBytes == nil {
return nil, nil
}
err := json.Unmarshal(dataBytes, &channelsData)
if err != nil {
return nil, err
}
return channelsData, nil
}