Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Matcher refactor #543

Merged
merged 2 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 8 additions & 66 deletions internal/sink/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,15 @@
package sink

import (
"container/ring"
"errors"
"fmt"
"math/rand"
"net/http"
"net/url"
"regexp"
"strings"
"sync"
"time"

"github.com/xmidt-org/ancla"
"github.com/xmidt-org/caduceus/internal/metrics"
"github.com/xmidt-org/wrp-go/v3"
"go.uber.org/zap"
)
Expand All @@ -30,26 +26,22 @@ func (c *ClientMock) Do(req *http.Request) (*http.Response, error) {
// move to subpackage and change to Interface
type Matcher interface {
IsMatch(*wrp.Message) bool

Copy link
Contributor Author

@maurafortino maurafortino Sep 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed getUrls from Mathcer interface since the urls field has been moved to sink.go

//TODO: not sure if this will be functionality of all webhooks or just v1
//leaving for now - will make changes if running into roadblock with this
getUrls() *ring.Ring
}

type MatcherV1 struct {
events []*regexp.Regexp
matcher []*regexp.Regexp
urls *ring.Ring
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved urls to sink.go (webhookV1)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any functionality related to urls that was in matcher.go has also been moved to sink.go

CommonWebhook
CommonMatcher
}

type MatcherV2 struct {
matcher map[string]*regexp.Regexp
CommonWebhook
CommonMatcher
}
type CommonWebhook struct {
mutex sync.RWMutex

type CommonMatcher struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a common matcher struct - not sure if it's all that necessary though

logger *zap.Logger
mutex sync.RWMutex
}

// TODO: need to add matching logic for RegistryV2 & MatcherV2
Expand Down Expand Up @@ -78,14 +70,8 @@ func NewMatcher(l ancla.Register, logger *zap.Logger) (Matcher, error) {
// webhook is registered
func (m1 *MatcherV1) update(l ancla.RegistryV1) error {

//TODO: don't believe the logger for webhook is being set anywhere just yet
m1.logger = m1.logger.With(zap.String("webhook.address", l.Registration.Address))

if l.Registration.FailureURL != "" {
_, err := url.ParseRequestURI(l.Registration.FailureURL)
return err
}

var events []*regexp.Regexp
for _, event := range l.Registration.Events {
var re *regexp.Regexp
Expand Down Expand Up @@ -117,14 +103,6 @@ func (m1 *MatcherV1) update(l ancla.RegistryV1) error {
}

// Validate the various urls
urlCount := len(l.Registration.Config.AlternativeURLs)
for i := 0; i < urlCount; i++ {
_, err := url.Parse(l.Registration.Config.AlternativeURLs[i])
if err != nil {
m1.logger.Error("failed to update url", zap.Any(metrics.UrlLabel, l.Registration.Config.AlternativeURLs[i]), zap.Error(err))
return err
}
}

// write/update sink sender
m1.mutex.Lock()
Expand All @@ -134,30 +112,10 @@ func (m1 *MatcherV1) update(l ancla.RegistryV1) error {

// if matcher list is empty set it nil for Queue() logic
m1.matcher = nil
if 0 < len(matcher) {
if len(matcher) > 0 {
m1.matcher = matcher
}

if urlCount == 0 {
m1.urls = ring.New(1)
m1.urls.Value = l.Registration.Config.ReceiverURL
} else {
ring := ring.New(urlCount)
for i := 0; i < urlCount; i++ {
ring.Value = l.Registration.Config.AlternativeURLs[i]
ring = ring.Next()
}
m1.urls = ring
}

// Randomize where we start so all the instances don't synchronize
rand := rand.New(rand.NewSource(time.Now().UnixNano()))
offset := rand.Intn(m1.urls.Len())
for 0 < offset {
m1.urls = m1.urls.Next()
offset--
}

return nil

}
Expand Down Expand Up @@ -197,20 +155,10 @@ func (m1 *MatcherV1) IsMatch(msg *wrp.Message) bool {
return true
}

func (m1 *MatcherV1) getUrls() (urls *ring.Ring) {
urls = m1.urls
// Move to the next URL to try 1st the next time.
// This is okay because we run a single dispatcher and it's the
// only one updating this field.
m1.urls = m1.urls.Next()
return
}

// Update applies user configurable values for the outbound sender when a
// webhook is registered
func (m2 *MatcherV2) update(l ancla.RegistryV2) error {

//TODO: don't believe the logger for webhook is being set anywhere just yet
m2.logger = m2.logger.With(zap.String("webhook.address", l.Registration.Address))

if l.Registration.FailureURL != "" {
Expand Down Expand Up @@ -242,9 +190,8 @@ func (m2 *MatcherV2) update(l ancla.RegistryV2) error {
defer m2.mutex.Unlock()

// if matcher list is empty set it nil for Queue() logic
if len(matcher) == 0 {
m2.matcher = nil
} else {
m2.matcher = nil
if len(matcher) > 0 {
m2.matcher = matcher
}

Expand Down Expand Up @@ -273,8 +220,3 @@ func (m2 *MatcherV2) IsMatch(msg *wrp.Message) bool {
}
return true
}

// TODO: this is a big reason why I want to refactor the Matcher logic
func (m2 *MatcherV2) getUrls() (urls *ring.Ring) {
return
}
3 changes: 0 additions & 3 deletions internal/sink/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,6 @@ var (
matcher = &MatcherV1{
events: []*regexp.Regexp{regexp.MustCompile("iot")},
matcher: []*regexp.Regexp{regexp.MustCompile("mac:112233445566")},
CommonWebhook: CommonWebhook{
logger: logger,
},
}
)

Expand Down
Loading
Loading