Skip to content

Commit b8184d4

Browse files
authored
Merge pull request #1 from violetpay-org/refactor/structure
Refactor/structure
2 parents 106abe9 + 5e1829b commit b8184d4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+976
-818
lines changed

config/config.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package queuemanagerconfig
2+
3+
import (
4+
"github.com/IBM/sarama"
5+
"time"
6+
)
7+
8+
type RedisOpts func(*RedisConfig)
9+
type RedisConfig struct {
10+
QueueName string
11+
Retry int
12+
TTL int
13+
Addrs []string
14+
}
15+
16+
func SetRedisQueueName(queueName string) RedisOpts {
17+
return func(c *RedisConfig) {
18+
c.QueueName = queueName
19+
}
20+
}
21+
22+
func AddRedisBroker(broker string) RedisOpts {
23+
return func(c *RedisConfig) {
24+
c.Addrs = append(c.Addrs, broker)
25+
}
26+
}
27+
28+
func AddRedisBrokers(brokers []string) RedisOpts {
29+
return func(c *RedisConfig) {
30+
c.Addrs = append(c.Addrs, brokers...)
31+
}
32+
}
33+
34+
func AddRedisTTL(ttl int) RedisOpts {
35+
return func(c *RedisConfig) {
36+
c.TTL = ttl
37+
}
38+
}
39+
40+
func AddRedisRetry(retry int) RedisOpts {
41+
return func(c *RedisConfig) {
42+
c.Retry = retry
43+
}
44+
}
45+
46+
func NewRedisConfig(opts ...RedisOpts) *RedisConfig {
47+
conf := RedisConfig{
48+
QueueName: "",
49+
Retry: 0,
50+
TTL: 0,
51+
Addrs: []string{},
52+
}
53+
54+
for _, opt := range opts {
55+
opt(&conf)
56+
}
57+
58+
return &conf
59+
}
60+
61+
var (
62+
TestKafkaBrokers = []string{}
63+
TestKafkaQueueName = "test_queue_2"
64+
)
65+
66+
type KafkaOpts func(*KafkaConfig)
67+
type KafkaConfig struct {
68+
Conf *sarama.Config
69+
Brokers []string
70+
Topic string
71+
}
72+
73+
func AddKafkaBroker(broker string) KafkaOpts {
74+
return func(c *KafkaConfig) {
75+
c.Brokers = append(c.Brokers, broker)
76+
}
77+
}
78+
79+
func AddKafkaBrokers(brokers []string) KafkaOpts {
80+
return func(c *KafkaConfig) {
81+
c.Brokers = append(c.Brokers, brokers...)
82+
}
83+
}
84+
85+
func SetKafkaTopic(topic string) KafkaOpts {
86+
return func(c *KafkaConfig) {
87+
c.Topic = topic
88+
}
89+
}
90+
91+
func NewKafkaConfig(opts ...KafkaOpts) *KafkaConfig {
92+
conf := KafkaConfig{
93+
Conf: sarama.NewConfig(),
94+
Brokers: []string{"kafka.vp-datacenter-1.violetpay.net:9092", "kafka.vp-datacenter-1.violetpay.net:9093"},
95+
}
96+
97+
for _, opt := range opts {
98+
opt(&conf)
99+
}
100+
101+
conf.Conf.Producer.Partitioner = sarama.NewRoundRobinPartitioner
102+
conf.Conf.Producer.Return.Successes = true
103+
conf.Conf.Producer.Flush.Frequency = 500 * time.Millisecond
104+
conf.Conf.Producer.RequiredAcks = sarama.WaitForAll
105+
106+
// conf.Conf.Producer.Flush.Bytes = 1024 * 1024 * 10 // 10 MB
107+
return &conf
108+
}
Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
package qmanservices
1+
package queuemanagerconfig
22

33
import (
4-
qmanErr "github.com/violetpay-org/point3-quman/errors"
4+
"github.com/violetpay-org/queuemanager/internal/queueerror"
55
)
66

77
// QueueName is an enumeration of the different queues that the service can use.
@@ -11,11 +11,11 @@ type QueueName int
1111

1212
var (
1313
// map of int to string
14-
queue_names map[int]string = make(map[int]string)
14+
queueNames map[int]string = make(map[int]string)
1515
)
1616

1717
func (q QueueName) String() string {
18-
name, _ := queue_names[int(q)]
18+
name, _ := queueNames[int(q)]
1919
return name
2020
}
2121

@@ -24,10 +24,10 @@ func (q QueueName) GetQueueName() string {
2424
}
2525

2626
func RegisterQueueName(name string, index int) (QueueName, error) {
27-
if _, ok := queue_names[index]; !ok {
28-
queue_names[index] = name
27+
if _, ok := queueNames[index]; !ok {
28+
queueNames[index] = name
2929
return QueueName(index), nil
3030
}
3131

32-
return QueueName(0), qmanErr.ErrDuplicateQueueName()
32+
return QueueName(0), queueerror.ErrDuplicateQueueName()
3333
}

point3-quman.go renamed to factory.go

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
package quman
1+
package queuemanager
22

33
import (
44
"context"
5+
"github.com/violetpay-org/queuemanager/config"
6+
"github.com/violetpay-org/queuemanager/internal/queueerror"
7+
innerqueue "github.com/violetpay-org/queuemanager/queue"
58
"sync"
6-
7-
qmanErr "github.com/violetpay-org/point3-quman/errors"
8-
qmanservices "github.com/violetpay-org/point3-quman/services"
99
)
1010

1111
// Main package for Quman (Queue Manager) service
@@ -23,44 +23,44 @@ func NewQueueFactory(wg *sync.WaitGroup, ctx *context.Context) *MainQueueFactory
2323
}
2424
}
2525

26-
func (f *MainQueueFactory) RegisterQueueName(name string, index int) (qmanservices.QueueName, error) {
27-
return qmanservices.RegisterQueueName(name, index)
26+
func (f *MainQueueFactory) RegisterQueueName(name string, index int) (queuemanagerconfig.QueueName, error) {
27+
return queuemanagerconfig.RegisterQueueName(name, index)
2828
}
2929

3030
func (f *MainQueueFactory) GetWaitGroup() (*sync.WaitGroup, error) {
3131
if f.wg == nil {
32-
return nil, qmanErr.ErrQueueFactoryWaitGroupNil()
32+
return nil, queueerror.ErrQueueFactoryWaitGroupNil()
3333
}
3434

3535
return f.wg, nil
3636
}
3737

38-
func (f *MainQueueFactory) GetQueueService(queueName qmanservices.QueueName) (qmanservices.IQueueService, error) {
38+
func (f *MainQueueFactory) GetQueueService(queueName queuemanagerconfig.QueueName) (innerqueue.Service, error) {
3939
queue := f.queueSet[queueName]
4040

4141
if queue == nil {
42-
return nil, qmanErr.ErrQueueNotFound(queueName.GetQueueName())
42+
return nil, queueerror.ErrQueueNotFound(queueName.GetQueueName())
4343
}
4444

4545
return queue.QueueService, nil
4646
}
4747

48-
func (f *MainQueueFactory) RunQueue(queueName qmanservices.QueueName) (*sync.WaitGroup, error) {
48+
func (f *MainQueueFactory) RunQueue(queueName queuemanagerconfig.QueueName) (*sync.WaitGroup, error) {
4949
if f.wg == nil {
50-
return nil, qmanErr.ErrQueueFactoryWaitGroupNil()
50+
return nil, queueerror.ErrQueueFactoryWaitGroupNil()
5151
}
5252

5353
queue := f.queueSet[queueName]
5454

5555
if queue == nil {
56-
return f.wg, qmanErr.ErrQueueNotFound(queueName.GetQueueName())
56+
return f.wg, queueerror.ErrQueueNotFound(queueName.GetQueueName())
5757
}
5858

5959
queueOperator := queue.LowLevelQueueOperator
6060
queueCallbacks := queue.ConsumeCallback
6161

6262
if (queueOperator == nil) || (queueCallbacks == nil) {
63-
return f.wg, qmanErr.ErrQueueNotPrepared(queueName.GetQueueName())
63+
return f.wg, queueerror.ErrQueueNotPrepared(queueName.GetQueueName())
6464
}
6565

6666
err := queueOperator.StartQueue(
@@ -72,32 +72,32 @@ func (f *MainQueueFactory) RunQueue(queueName qmanservices.QueueName) (*sync.Wai
7272
return f.wg, err
7373
}
7474

75-
func (f *MainQueueFactory) StopQueue(queueName qmanservices.QueueName) (*sync.WaitGroup, error) {
75+
func (f *MainQueueFactory) StopQueue(queueName queuemanagerconfig.QueueName) (*sync.WaitGroup, error) {
7676
if f.wg == nil {
77-
return nil, qmanErr.ErrQueueFactoryWaitGroupNil()
77+
return nil, queueerror.ErrQueueFactoryWaitGroupNil()
7878
}
7979

8080
queue := f.queueSet[queueName]
8181

8282
if queue == nil {
83-
return f.wg, qmanErr.ErrQueueNotFound(queueName.GetQueueName())
83+
return f.wg, queueerror.ErrQueueNotFound(queueName.GetQueueName())
8484
}
8585

8686
queueOperator := queue.LowLevelQueueOperator
8787
queueCallbacks := queue.StopCallback
8888

8989
if queueOperator == nil || queueCallbacks == nil {
90-
return f.wg, qmanErr.ErrQueueNotPrepared(queueName.GetQueueName())
90+
return f.wg, queueerror.ErrQueueNotPrepared(queueName.GetQueueName())
9191
}
9292

9393
err := queueOperator.StopQueue(queueCallbacks)
9494

9595
return f.wg, err
9696
}
9797

98-
func (f *MainQueueFactory) AddQueue(queueName qmanservices.QueueName, queue *Queue) error {
98+
func (f *MainQueueFactory) AddQueue(queueName queuemanagerconfig.QueueName, queue *Queue) error {
9999
if f.queueSet[queueName] != nil {
100-
return qmanErr.ErrDuplicateQueue()
100+
return queueerror.ErrDuplicateQueue()
101101
}
102102

103103
f.queueSet[queueName] = queue
@@ -108,7 +108,7 @@ func (f *MainQueueFactory) AddAllQueues(queueSet QueueSet) error {
108108
// Check for duplicates
109109
for queueName := range queueSet {
110110
if f.queueSet[queueName] != nil {
111-
return qmanErr.ErrDuplicateQueue()
111+
return queueerror.ErrDuplicateQueue()
112112
}
113113
}
114114

@@ -120,7 +120,7 @@ func (f *MainQueueFactory) AddAllQueues(queueSet QueueSet) error {
120120
return nil
121121
}
122122

123-
func (f *MainQueueFactory) UpsertQueue(queueName qmanservices.QueueName, queue *Queue) error {
123+
func (f *MainQueueFactory) UpsertQueue(queueName queuemanagerconfig.QueueName, queue *Queue) error {
124124

125125
_, err := f.GetQueueService(queueName)
126126

@@ -145,7 +145,7 @@ func (f *MainQueueFactory) UpsertQueue(queueName qmanservices.QueueName, queue *
145145

146146
func (f *MainQueueFactory) RunAllQueues() (*sync.WaitGroup, error) {
147147
var erroredQueue interface{}
148-
queuesExecuted := []qmanservices.QueueName{}
148+
queuesExecuted := []queuemanagerconfig.QueueName{}
149149

150150
for queueName := range f.queueSet {
151151
_, err := f.RunQueue(queueName)
@@ -163,7 +163,7 @@ func (f *MainQueueFactory) RunAllQueues() (*sync.WaitGroup, error) {
163163
_, _ = f.StopQueue(queueName)
164164
}
165165

166-
return f.wg, qmanErr.ErrQueueNotPrepared(erroredQueue.(qmanservices.QueueName).GetQueueName())
166+
return f.wg, queueerror.ErrQueueNotPrepared(erroredQueue.(queuemanagerconfig.QueueName).GetQueueName())
167167
}
168168

169169
return f.wg, nil

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
module github.com/violetpay-org/point3-quman
1+
module github.com/violetpay-org/queuemanager
22

33
go 1.19
44

hub.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package queuemanager
2+
3+
import (
4+
"github.com/violetpay-org/queuemanager/config"
5+
"github.com/violetpay-org/queuemanager/internal/queue/kafka"
6+
"github.com/violetpay-org/queuemanager/internal/queue/redis"
7+
"github.com/violetpay-org/queuemanager/item"
8+
)
9+
10+
// NewRedisHub is a function that returns a new Hub.
11+
func NewRedisHub(
12+
messageSerializer queueitem.RedisSerializer,
13+
config *queuemanagerconfig.RedisConfig,
14+
logger func(string),
15+
) *redis.Hub {
16+
return redis.NewHub(
17+
messageSerializer,
18+
config,
19+
logger,
20+
)
21+
}
22+
23+
// NewKafkaHub is a function that returns a new Hub.
24+
// maxConsumerCount is the maximum number of consumers that can be created.
25+
func NewKafkaHub(
26+
maxConsumerCount int,
27+
messageSerializer queueitem.KafkaSerializer,
28+
publishOnly bool,
29+
config *queuemanagerconfig.KafkaConfig,
30+
logger func(string),
31+
) *kafka.Hub {
32+
return kafka.NewHub(
33+
maxConsumerCount,
34+
messageSerializer,
35+
publishOnly,
36+
config,
37+
logger,
38+
)
39+
}

0 commit comments

Comments
 (0)