forked from xmidt-org/kratos
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdecodeWorkers.go
114 lines (102 loc) · 2.9 KB
/
decodeWorkers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package kratos
import (
"sync"
"sync/atomic"
"github.com/go-kit/kit/log"
"github.com/goph/emperror"
"github.com/xmidt-org/webpa-common/logging"
"github.com/xmidt-org/webpa-common/semaphore"
"github.com/xmidt-org/wrp-go/v3"
)
const (
// minWorkers is the smallest worker count NewDecoderSender will use; a
// smaller maxWorkers argument is raised to this value.
minWorkers = 1
// minQueueSize is the smallest queue capacity NewDecoderSender will use; a
// smaller queueSize argument is raised to this value.
minQueueSize = 1
)
// decoderSender is anything that decodes a message from bytes and then sends
// it downstream.
type decoderSender interface {
// DecodeAndSend takes the raw bytes of a message to be decoded and sent.
DecodeAndSend([]byte)
// Close shuts the decoderSender down.
Close()
}
// decoderQueue implements an asynchronous decoderSender: raw messages are
// placed on a channel and then decoded and sent by goroutines launched from
// startParsing.
type decoderQueue struct {
// incoming is the buffered channel of raw messages waiting to be decoded.
incoming chan []byte
// sender is handed every successfully decoded message and is closed by Close.
sender registryHandler
// workers is acquired before each parse goroutine starts and released when
// it finishes, limiting how many run at once.
workers semaphore.Interface
// wg counts the startParsing goroutine plus every in-flight parse
// goroutine; Close waits on it.
wg sync.WaitGroup
logger log.Logger
// once makes the shutdown sequence in Close run only a single time.
once sync.Once
// closed holds true once Close has been called; DecodeAndSend reads it to
// reject new messages.
closed atomic.Value
}
// NewDecoderSender builds a decoderQueue that hands decoded messages to
// sender, and starts the background goroutine that drains the queue.
//
// maxWorkers bounds how many messages are decoded concurrently and queueSize
// is the capacity of the incoming buffer. Values below minWorkers and
// minQueueSize respectively are raised to those minimums.
func NewDecoderSender(sender registryHandler, maxWorkers int, queueSize int, logger log.Logger) *decoderQueue {
	// Clamp both tunables so the channel and semaphore are always usable.
	if queueSize < minQueueSize {
		queueSize = minQueueSize
	}
	if maxWorkers < minWorkers {
		maxWorkers = minWorkers
	}

	queue := &decoderQueue{
		incoming: make(chan []byte, queueSize),
		sender:   sender,
		workers:  semaphore.New(maxWorkers),
		logger:   logger,
	}

	// Account for the dispatcher goroutine before launching it so that Close
	// can wait for it to finish.
	queue.wg.Add(1)
	go queue.startParsing()

	return queue
}
// DecodeAndSend enqueues msg for decoding. It blocks while the queue is full.
//
// Once Close has marked the queue as closed the message is dropped and an
// error is logged instead. Callers must not invoke this after (or
// concurrently with) Close: Close closes the underlying channel, and a send
// that slips past the closed check would panic.
func (d *decoderQueue) DecodeAndSend(msg []byte) {
	// closed is only ever stored as true, so a failed assertion (nothing
	// stored yet) simply means the queue is still open.
	if isClosed, _ := d.closed.Load().(bool); isClosed {
		logging.Error(d.logger).Log(logging.MessageKey(),
			"Failed to queue message. DecoderQueue is no longer accepting messages.")
		return
	}

	d.incoming <- msg
}
// Close shuts the queue down. It flags the queue as closed so that no new
// messages are accepted, closes the incoming channel, waits for every queued
// message to be decoded and sent, and finally closes the sender. Only the
// first call performs the shutdown sequence.
func (d *decoderQueue) Close() {
	shutdown := func() {
		// Set the flag before closing the channel so DecodeAndSend sees it
		// first and does not attempt a send on a closed channel.
		d.closed.Store(true)
		close(d.incoming)

		// Drain: the dispatcher and all in-flight workers must finish before
		// the downstream sender goes away.
		d.wg.Wait()
		d.sender.Close()
	}

	d.once.Do(shutdown)
}
// startParsing is the dispatcher goroutine launched by NewDecoderSender. It
// receives raw messages from the queue and spawns one parse goroutine per
// message, waiting on the worker semaphore before each spawn. It returns once
// the incoming channel has been closed and drained.
func (d *decoderQueue) startParsing() {
	defer d.wg.Done()

	for {
		raw, open := <-d.incoming
		if !open {
			return
		}

		// Wait for a free worker slot; the matching Release and Done calls
		// are made by parse.
		d.workers.Acquire()
		d.wg.Add(1)
		go d.parse(raw)
	}
}
// parse decodes one Msgpack-encoded payload into a wrp.Message and passes it
// to the registryHandler. A payload that fails to decode is logged and
// dropped. On return the worker slot is released and the wait group is
// decremented.
func (d *decoderQueue) parse(incoming []byte) {
	// Deferred calls run in reverse order: the worker slot is released first,
	// then the wait group is decremented.
	defer d.wg.Done()
	defer d.workers.Release()

	logging.Debug(d.logger).Log(logging.MessageKey(), "Decoding message...")

	var msg wrp.Message
	decoder := wrp.NewDecoderBytes(incoming, wrp.Msgpack)
	if err := decoder.Decode(&msg); err != nil {
		errLogger := logging.Error(d.logger, emperror.Context(err)...)
		errLogger.Log(logging.MessageKey(), "Failed to decode message into wrp", logging.ErrorKey(), err.Error())
		return
	}
	logging.Debug(d.logger).Log(logging.MessageKey(), "Message Decoded")

	d.sender.GetHandlerThenSend(&msg)
	logging.Debug(d.logger).Log(logging.MessageKey(), "Message Sent")
}