-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample.go
186 lines (152 loc) · 4.15 KB
/
example.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package main
import (
"EnchanceSimulator/config"
"bytes"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/streadway/amqp"
"io"
"log"
"net/http"
)
// main connects to RabbitMQ, declares the queues, fetches a payload over
// HTTP, publishes it hex-encoded to the input queue, and then consumes the
// output queue until the delivery channel closes.
func main() {
	// Establish a connection to RabbitMQ.
	conn, ch := establishRabbitMQConnection()
	// Deferred calls run last-in-first-out, so the channel is closed before
	// the connection. Close errors are logged rather than silently dropped.
	defer func() {
		if err := conn.Close(); err != nil {
			log.Printf("Failed to close RabbitMQ connection: %s", err)
		}
	}()
	defer func() {
		if err := ch.Close(); err != nil {
			log.Printf("Failed to close RabbitMQ channel: %s", err)
		}
	}()

	// Load global config values and declare the configured queues.
	appConfig := config.GetConfig()
	inputQueueName := appConfig.InputQueu
	outputQueueName := appConfig.OutputQueu
	declareQueues(ch, inputQueueName, outputQueueName)

	// NOTE(review): the configured names are overwritten here, so publishing
	// and consuming always use the hard-coded queues below. Confirm whether
	// the config values are meant to take precedence.
	inputQueueName = "input_data_queue"
	outputQueueName = "output_data_queue"
	declareQueues(ch, inputQueueName, outputQueueName)

	// Make an HTTP POST request and get the response bytes.
	responseBytes, err := makeHTTPPostRequest()
	if err != nil {
		log.Println("Error:", err)
		return
	}

	// Hex-encode the response so it travels as plain text.
	hexData := hex.EncodeToString(responseBytes)

	// Publish the hex data to the input queue.
	publishToInputQueue(ch, inputQueueName, hexData)
	log.Println("Data sent successfully.")

	// Consume the output queue.
	consumeOutputQueue(ch, outputQueueName)
}
// establishRabbitMQConnection dials the local RabbitMQ broker and opens a
// channel on the resulting connection.
//
// It returns the connection and the channel; the caller is responsible for
// closing both. If dialing or opening the channel fails, the process exits
// via log.Fatalf.
func establishRabbitMQConnection() (*amqp.Connection, *amqp.Channel) {
	// The URI needs the "//" authority separator; without it the string is
	// parsed as an opaque URL and the user, password, host and port written
	// here are not read from it.
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatalf("Failed to connect to RabbitMQ: %s", err)
	}

	ch, err := conn.Channel()
	if err != nil {
		log.Fatalf("Failed to open a channel: %s", err)
	}

	return conn, ch
}
// declareQueues declares the input queue and then the output queue on ch,
// delegating each declaration to declareQueue.
func declareQueues(ch *amqp.Channel, inputQueueName, outputQueueName string) {
	for _, name := range [...]string{inputQueueName, outputQueueName} {
		declareQueue(ch, name)
	}
}
// declareQueue declares queueName on ch with every option switched off and
// no extra arguments. The process exits via log.Fatalf if the declaration
// fails.
func declareQueue(ch *amqp.Channel, queueName string) {
	// Named flags document the positional booleans QueueDeclare expects.
	const (
		durable    = false
		autoDelete = false // delete when unused
		exclusive  = false
		noWait     = false
	)

	if _, err := ch.QueueDeclare(queueName, durable, autoDelete, exclusive, noWait, nil); err != nil {
		log.Fatalf("Failed to declare the queue '%s': %s", queueName, err)
	}
}
// makeHTTPPostRequest POSTs a fixed JSON payload (keyType, mainCategory,
// subCategory) to the GetWorldMarketHotList path and returns the raw
// response body.
//
// It returns a wrapped error if the payload cannot be encoded, the request
// cannot be built or sent, the server answers with a non-2xx status, or the
// body cannot be read. On error the returned slice is nil.
func makeHTTPPostRequest() ([]byte, error) {
	payload := map[string]interface{}{
		"keyType":      0,
		"mainCategory": 10,
		"subCategory":  3,
	}
	payloadJSON, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("encoding JSON payload: %w", err)
	}

	// TODO: load all strings from HashiCorp Vault.
	// NOTE(review): this URL has no scheme or host, so the request cannot be
	// sent as written; presumably a redacted placeholder - confirm.
	url := "secret/Trademarket/GetWorldMarketHotList"

	req, err := http.NewRequest(http.MethodPost, url, bytes.NewReader(payloadJSON))
	if err != nil {
		return nil, fmt.Errorf("creating request: %w", err)
	}
	req.Header.Set("User-Agent", "XXX")
	req.Header.Set("Content-Type", "application/json")

	// NOTE(review): the client has no timeout, so a stalled server blocks
	// this call indefinitely; consider setting http.Client.Timeout.
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("sending request: %w", err)
	}
	defer func() {
		if cerr := resp.Body.Close(); cerr != nil {
			log.Printf("closing response body: %v", cerr)
		}
	}()

	// Reject non-2xx answers so an error page is never treated as data.
	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
		return nil, fmt.Errorf("unexpected response status: %s", resp.Status)
	}

	responseBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading response body: %w", err)
	}

	return responseBytes, nil
}
// publishToInputQueue publishes data as a text/plain message using an empty
// exchange name and queueName as the routing key. The process exits via
// log.Fatalf if publishing fails.
func publishToInputQueue(ch *amqp.Channel, queueName, data string) {
	msg := amqp.Publishing{
		ContentType: "text/plain",
		Body:        []byte(data),
	}

	// Arguments: exchange, routing key, mandatory, immediate, message.
	if err := ch.Publish("", queueName, false, false, msg); err != nil {
		log.Fatalf("Failed to publish message to '%s' queue: %s", queueName, err)
	}
}
// consumeOutputQueue consumes messages from queueName with automatic
// acknowledgement disabled, logging each message body and acknowledging it
// individually.
//
// The process exits via log.Fatalf if the consumer cannot be started. The
// function returns when the delivery channel is closed or when an
// acknowledgement fails; the failure is logged before returning.
func consumeOutputQueue(ch *amqp.Channel, queueName string) {
	messages, err := ch.Consume(
		queueName, // Queue name
		"",        // Consumer tag
		false,     // Auto-acknowledgement set to false
		false,     // Exclusive
		false,     // No-local
		false,     // No-wait
		nil,       // Arguments
	)
	if err != nil {
		log.Fatalf("Failed to consume messages from '%s' queue: %s", queueName, err)
	}

	for msg := range messages {
		log.Println("Received output:", string(msg.Body))
		// Acknowledge only this delivery (multiple=false).
		if err := msg.Ack(false); err != nil {
			log.Printf("Failed to acknowledge message from '%s' queue: %s", queueName, err)
			return
		}
	}
}