Skip to content

Commit 03f7c9b

Browse files
authored
Merge pull request #476 from xmidt-org/feat/sink-interface
Feat/sink interface
2 parents d8c4a56 + a5e29da commit 03f7c9b

File tree

6 files changed

+284
-266
lines changed

6 files changed

+284
-266
lines changed

caduceus_type.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,6 @@ import (
1212

1313
// Below is the struct we're using to contain the data from a provided config file
1414
// TODO: Try to figure out how to make bucket ranges configurable
15-
type CaduceusConfig struct {
16-
AuthHeader []string
17-
NumWorkerThreads int
18-
JobQueueSize int
19-
Sink SinkConfig
20-
JWTValidators []JWTValidator
21-
AllowInsecureTLS bool
22-
}
23-
2415
type SinkConfig struct {
2516
// The number of workers to assign to each SinkSender created.
2617
NumWorkersPerSender int

http.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -58,19 +58,19 @@ type HandlerTelemetry struct {
5858

5959
func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
6060
eventType := unknownEventType
61-
log := sallust.Get(request.Context())
61+
logger := sallust.Get(request.Context())
6262
// find time difference, add to metric after function finishes
6363
defer func(s time.Time) {
6464
sh.recordQueueLatencyToHistogram(s, eventType)
6565
}(sh.now())
6666

67-
log.Info("Receiving incoming request...")
67+
logger.Info("Receiving incoming request...")
6868

6969
if len(request.Header["Content-Type"]) != 1 || request.Header["Content-Type"][0] != "application/msgpack" {
7070
//return a 415
7171
response.WriteHeader(http.StatusUnsupportedMediaType)
7272
response.Write([]byte("Invalid Content-Type header(s). Expected application/msgpack. \n"))
73-
log.Debug("Invalid Content-Type header(s). Expected application/msgpack. \n")
73+
logger.Debug("Invalid Content-Type header(s). Expected application/msgpack. \n")
7474
return
7575
}
7676

@@ -81,7 +81,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
8181
// return a 503
8282
response.WriteHeader(http.StatusServiceUnavailable)
8383
response.Write([]byte("Incoming queue is full.\n"))
84-
log.Debug("Incoming queue is full.\n")
84+
logger.Debug("Incoming queue is full.\n")
8585
return
8686
}
8787

@@ -91,14 +91,14 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
9191
payload, err := io.ReadAll(request.Body)
9292
if err != nil {
9393
sh.telemetry.errorRequests.Add(1.0)
94-
log.Error("Unable to retrieve the request body.", zap.Error(err))
94+
logger.Error("Unable to retrieve the request body.", zap.Error(err))
9595
response.WriteHeader(http.StatusBadRequest)
9696
return
9797
}
9898

9999
if len(payload) == 0 {
100100
sh.telemetry.emptyRequests.Add(1.0)
101-
log.Error("Empty payload.")
101+
logger.Error("Empty payload.")
102102
response.WriteHeader(http.StatusBadRequest)
103103
response.Write([]byte("Empty payload.\n"))
104104
return
@@ -114,10 +114,10 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
114114
response.WriteHeader(http.StatusBadRequest)
115115
if err != nil {
116116
response.Write([]byte("Invalid payload format.\n"))
117-
log.Debug("Invalid payload format.")
117+
logger.Debug("Invalid payload format.")
118118
} else {
119119
response.Write([]byte("Invalid MessageType.\n"))
120-
log.Debug("Invalid MessageType.")
120+
logger.Debug("Invalid MessageType.")
121121
}
122122
return
123123
}
@@ -128,7 +128,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
128128
sh.telemetry.invalidCount.Add(1.0)
129129
response.WriteHeader(http.StatusBadRequest)
130130
response.Write([]byte("Strings must be UTF-8.\n"))
131-
log.Debug("Strings must be UTF-8.")
131+
logger.Debug("Strings must be UTF-8.")
132132
return
133133
}
134134
eventType = msg.FindEventStringSubMatch()
@@ -139,7 +139,7 @@ func (sh *ServerHandler) ServeHTTP(response http.ResponseWriter, request *http.R
139139
response.WriteHeader(http.StatusAccepted)
140140
response.Write([]byte("Request placed on to queue.\n"))
141141

142-
log.Debug("event passed to senders.", zap.Any("event", msg))
142+
logger.Debug("event passed to senders.", zap.Any("event", msg))
143143
}
144144

145145
func (sh *ServerHandler) recordQueueLatencyToHistogram(startTime time.Time, eventType string) {

listenerStub.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
// This is a stub for the webhook and kafka listeners. This will be removed once the webhook-schema configuration is approved
1212
type Listener interface {
1313
GetId() string
14-
GetAddress() string
1514
GetPartnerIds() []string
1615
GetUntil() time.Time
1716
}
@@ -208,21 +207,13 @@ func (v1 *ListenerV1) GetPartnerIds() []string {
208207
return v1.PartnerIds
209208
}
210209

211-
func (v1 *ListenerV1) GetAddress() string {
212-
return v1.Registration.Address
213-
}
214-
215210
func (v1 *ListenerV1) GetUntil() time.Time {
216211
return v1.Registration.Until
217212
}
218213
func (v2 *ListenerV2) GetId() string {
219214
return v2.Registration.CanonicalName
220215
}
221216

222-
func (v2 *ListenerV2) GetAddress() string {
223-
return v2.Registration.Address
224-
}
225-
226217
func (v2 *ListenerV2) GetPartnerIds() []string {
227218
return v2.PartnerIds
228219
}

webhooks.go renamed to matcher.go

Lines changed: 34 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,16 @@ func (c *ClientMock) Do(req *http.Request) (*http.Response, error) {
2323
return &http.Response{}, nil
2424
}
2525

26-
type WebhookI interface {
27-
CheckMsg(*wrp.Message) error
26+
// move to subpackage and change to Interface
27+
type Matcher interface {
28+
IsMatch(*wrp.Message) bool
29+
30+
//TODO: not sure if this will be functionality of all webhooks or just v1
31+
//leaving for now - will make changes if running into roadblock with this
2832
getUrls() *ring.Ring
2933
}
30-
type WebhookV1 struct {
34+
35+
type MatcherV1 struct {
3136
events []*regexp.Regexp
3237
matcher []*regexp.Regexp
3338
urls *ring.Ring
@@ -41,10 +46,10 @@ type CommonWebhook struct {
4146

4247
// Update applies user configurable values for the outbound sender when a
4348
// webhook is registered
44-
func (w1 *WebhookV1) Update(l ListenerV1) error {
49+
func (m1 *MatcherV1) Update(l ListenerV1) error {
4550

4651
//TODO: don't believe the logger for webhook is being set anywhere just yet
47-
w1.logger = w1.logger.With(zap.String("webhook.address", l.Registration.Address))
52+
m1.logger = m1.logger.With(zap.String("webhook.address", l.Registration.Address))
4853

4954
if l.Registration.FailureURL != "" {
5055
_, err := url.ParseRequestURI(l.Registration.FailureURL)
@@ -86,54 +91,54 @@ func (w1 *WebhookV1) Update(l ListenerV1) error {
8691
for i := 0; i < urlCount; i++ {
8792
_, err := url.Parse(l.Registration.Config.AlternativeURLs[i])
8893
if err != nil {
89-
w1.logger.Error("failed to update url", zap.Any("url", l.Registration.Config.AlternativeURLs[i]), zap.Error(err))
94+
m1.logger.Error("failed to update url", zap.Any("url", l.Registration.Config.AlternativeURLs[i]), zap.Error(err))
9095
return err
9196
}
9297
}
9398

9499
// write/update sink sender
95-
w1.mutex.Lock()
96-
defer w1.mutex.Unlock()
100+
m1.mutex.Lock()
101+
defer m1.mutex.Unlock()
97102

98-
w1.events = events
103+
m1.events = events
99104

100105
//TODO: need to figure out how to set this
101106

102107
// if matcher list is empty set it nil for Queue() logic
103-
w1.matcher = nil
108+
m1.matcher = nil
104109
if 0 < len(matcher) {
105-
w1.matcher = matcher
110+
m1.matcher = matcher
106111
}
107112

108113
if urlCount == 0 {
109-
w1.urls = ring.New(1)
110-
w1.urls.Value = l.Registration.Config.ReceiverURL
114+
m1.urls = ring.New(1)
115+
m1.urls.Value = l.Registration.Config.ReceiverURL
111116
} else {
112117
ring := ring.New(urlCount)
113118
for i := 0; i < urlCount; i++ {
114119
ring.Value = l.Registration.Config.AlternativeURLs[i]
115120
ring = ring.Next()
116121
}
117-
w1.urls = ring
122+
m1.urls = ring
118123
}
119124

120125
// Randomize where we start so all the instances don't synchronize
121126
rand := rand.New(rand.NewSource(time.Now().UnixNano()))
122-
offset := rand.Intn(w1.urls.Len())
127+
offset := rand.Intn(m1.urls.Len())
123128
for 0 < offset {
124-
w1.urls = w1.urls.Next()
129+
m1.urls = m1.urls.Next()
125130
offset--
126131
}
127132

128133
return nil
129134

130135
}
131136

132-
func (w1 *WebhookV1) CheckMsg(msg *wrp.Message) (err error) {
133-
w1.mutex.RLock()
134-
events := w1.events
135-
matcher := w1.matcher
136-
w1.mutex.RUnlock()
137+
func (m1 *MatcherV1) IsMatch(msg *wrp.Message) bool {
138+
m1.mutex.RLock()
139+
events := m1.events
140+
matcher := m1.matcher
141+
m1.mutex.RUnlock()
137142

138143
var (
139144
matchEvent bool
@@ -146,9 +151,8 @@ func (w1 *WebhookV1) CheckMsg(msg *wrp.Message) (err error) {
146151
}
147152
}
148153
if !matchEvent {
149-
w1.logger.Debug("destination regex doesn't match", zap.String("event.dest", msg.Destination))
150-
//TODO: return an error here?
151-
return
154+
m1.logger.Debug("destination regex doesn't match", zap.String("event.dest", msg.Destination))
155+
return false
152156
}
153157

154158
if matcher != nil {
@@ -162,37 +166,17 @@ func (w1 *WebhookV1) CheckMsg(msg *wrp.Message) (err error) {
162166
}
163167

164168
if !matchDevice {
165-
w1.logger.Debug("device regex doesn't match", zap.String("event.source", msg.Source))
166-
//TODO: return an error here?
167-
return
169+
m1.logger.Debug("device regex doesn't match", zap.String("event.source", msg.Source))
170+
return false
168171
}
169-
return
172+
return true
170173
}
171174

172-
func (w1 *WebhookV1) getUrls() (urls *ring.Ring) {
173-
urls = w1.urls
175+
func (m1 *MatcherV1) getUrls() (urls *ring.Ring) {
176+
urls = m1.urls
174177
// Move to the next URL to try 1st the next time.
175178
// This is okay because we run a single dispatcher and it's the
176179
// only one updating this field.
177-
w1.urls = w1.urls.Next()
180+
m1.urls = m1.urls.Next()
178181
return
179182
}
180-
181-
type WebhookV2 struct {
182-
//nolint:staticcheck
183-
placeholder string
184-
CommonWebhook
185-
}
186-
187-
func (w2 *WebhookV2) Update(l ListenerV2) error {
188-
189-
return nil
190-
}
191-
192-
func (w2 *WebhookV2) CheckMsg(msg *wrp.Message) error {
193-
return nil
194-
}
195-
196-
func (w2 *WebhookV2) getUrls() *ring.Ring {
197-
return nil
198-
}

0 commit comments

Comments
 (0)