Skip to content

Commit

Permalink
refactor(topics): refactor updating topic events.
Browse files Browse the repository at this point in the history
  • Loading branch information
docmerlin committed Oct 24, 2022
1 parent 225b804 commit a95a391
Show file tree
Hide file tree
Showing 15 changed files with 345 additions and 2,120 deletions.
13 changes: 8 additions & 5 deletions alert/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ func (s *Topics) RestoreTopics() {
}
}

func (s *Topics) UpdateEvent(id string, event EventState) {
func (s *Topics) UpdateEvent(topicID string, event EventState) {

s.mu.Lock()
defer s.mu.Unlock()
t, ok := s.topics[id]
t, ok := s.topics[topicID]
if !ok {
t = s.newTopic(id)
s.topics[id] = t
s.topics[topicID] = s.newTopic(topicID)
}
t.updateEvent(event)
}
Expand Down Expand Up @@ -124,7 +124,6 @@ func (s *Topics) Collect(event Event) error {
}
s.mu.Unlock()
}

return topic.collect(event)
}

Expand Down Expand Up @@ -347,16 +346,19 @@ func (t *Topic) close() {
}

func (t *Topic) collect(event Event) error {

prev, ok := t.updateEvent(event.State)
if ok {
event.previousState = prev
}

t.collected.Add(1)

return t.handleEvent(event)
}

func (t *Topic) handleEvent(event Event) error {

t.mu.RLock()
defer t.mu.RUnlock()

Expand Down Expand Up @@ -406,6 +408,7 @@ func (t *Topic) updateEvent(state EventState) (EventState, bool) {

type sortedStates []*EventState

// TODO(docmerlin): replaced sortedStates with a heap or something similar
func (e sortedStates) Len() int { return len(e) }
func (e sortedStates) Swap(i int, j int) { e[i], e[j] = e[j], e[i] }
func (e sortedStates) Less(i int, j int) bool {
Expand Down
7 changes: 1 addition & 6 deletions build.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,8 @@ def run_tests(race, parallel, timeout, verbose):
test_command += " -timeout {}".format(timeout)
test_command += " ./..."
logging.info("Running tests...")

packages = run("go list ./...").split("\n")
print(packages)
logging.info("Test command: " + test_command)
for package in packages:
run(test_command + " " + package, printOutput=logging.getLogger().getEffectiveLevel() == logging.DEBUG)

output = run(test_command, printOutput=logging.getLogger().getEffectiveLevel() == logging.DEBUG)
return True

def package_udfs(version, dist_dir):
Expand Down
1 change: 0 additions & 1 deletion server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1135,7 +1135,6 @@ func (s *Server) startServices() error {
return fmt.Errorf("open service %T: %s", service, err)
}
s.Diag.Debug("opened service", keyvalue.KV("service", fmt.Sprintf("%T", service)))

// Apply config overrides after the config override service has been opened and before any dynamic services.
if service == s.ConfigOverrideService && !s.config.SkipConfigOverrides && s.config.ConfigOverride.Enabled {
// Apply initial config updates
Expand Down
117 changes: 59 additions & 58 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2857,7 +2857,6 @@ test value=1 0000000011
func TestServer_UpdateTaskID(t *testing.T) {
s, cli := OpenDefaultServer(t)
defer s.Close()
println("here")
id := "testTaskID"
ttype := client.StreamTask
dbrps := []client.DBRP{
Expand Down Expand Up @@ -9990,70 +9989,70 @@ func TestServer_AlertHandlers_CRUD(t *testing.T) {
},
}
for _, tc := range testCases {
// Create default config
c := NewConfig(t)
s := OpenServer(c)
cli := Client(s)
defer s.Close()

h, err := cli.CreateTopicHandler(cli.TopicHandlersLink(tc.topic), tc.create)
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(h, tc.expCreate) {
t.Errorf("unexpected handler created:\ngot\n%#v\nexp\n%#v\n", h, tc.expCreate)
}

h, err = cli.PatchTopicHandler(h.Link, tc.patch)
if err != nil {
t.Fatal(err)
}
t.Run(tc.topic, func(t *testing.T) {
// Create default config
c := NewConfig(t)
s := OpenServer(c)
cli := Client(s)
defer s.Close()
h, err := cli.CreateTopicHandler(cli.TopicHandlersLink(tc.topic), tc.create)
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(h, tc.expPatch) {
t.Errorf("unexpected handler patched:\ngot\n%#v\nexp\n%#v\n", h, tc.expPatch)
}
if !reflect.DeepEqual(h, tc.expCreate) {
t.Errorf("unexpected handler created:\ngot\n%#v\nexp\n%#v\n", h, tc.expCreate)
}

h, err = cli.ReplaceTopicHandler(h.Link, tc.put)
if err != nil {
t.Fatal(err)
}
h, err = cli.PatchTopicHandler(h.Link, tc.patch)
if err != nil {
t.Fatal(err)
}

if !reflect.DeepEqual(h, tc.expPut) {
t.Errorf("unexpected handler put:\ngot\n%#v\nexp\n%#v\n", h, tc.expPut)
}
if !reflect.DeepEqual(h, tc.expPatch) {
t.Errorf("unexpected handler patched:\ngot\n%#v\nexp\n%#v\n", h, tc.expPatch)
}

// Restart server
s.Restart()
h, err = cli.ReplaceTopicHandler(h.Link, tc.put)
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(h, tc.expPut) {
t.Errorf("unexpected handler put:\ngot\n%#v\nexp\n%#v\n", h, tc.expPut)
}

rh, err := cli.TopicHandler(h.Link)
if err != nil {
t.Fatalf("could not find handler after restart: %v", err)
}
if got, exp := rh, h; !reflect.DeepEqual(got, exp) {
t.Errorf("unexpected handler after restart:\ngot\n%#v\nexp\n%#v\n", got, exp)
}
// Restart server
s.Restart()

err = cli.DeleteTopicHandler(h.Link)
if err != nil {
t.Fatal(err)
}
rh, err := cli.TopicHandler(h.Link)
if err != nil {
t.Fatalf("could not find handler after restart: %v", err)
}
if got, exp := rh, h; !reflect.DeepEqual(got, exp) {
t.Errorf("unexpected handler after restart:\ngot\n%#v\nexp\n%#v\n", got, exp)
}

_, err = cli.TopicHandler(h.Link)
if err == nil {
t.Errorf("expected handler to be deleted")
}
err = cli.DeleteTopicHandler(h.Link)
if err != nil {
t.Fatal(err)
}

handlers, err := cli.ListTopicHandlers(cli.TopicHandlersLink(tc.topic), nil)
if err != nil {
t.Fatal(err)
}
for _, h := range handlers.Handlers {
if h.ID == tc.expPut.ID {
_, err = cli.TopicHandler(h.Link)
if err == nil {
t.Errorf("expected handler to be deleted")
break
}
}

handlers, err := cli.ListTopicHandlers(cli.TopicHandlersLink(tc.topic), nil)
if err != nil {
t.Fatal(err)
}
for _, h := range handlers.Handlers {
if h.ID == tc.expPut.ID {
t.Errorf("expected handler to be deleted")
break
}
}
})
}
}

Expand Down Expand Up @@ -12001,7 +12000,10 @@ stream
v := url.Values{}
v.Add("precision", "s")
s.MustWrite("mydb", "myrp", point, v)
time.Sleep(15 * time.Second)

q, _ := s.AlertService.EventStates("tcp", -1)
_ = q
s.Restart()

// Check TCP handler got event
Expand Down Expand Up @@ -12029,9 +12031,8 @@ stream
exp := []alert.Data{alertData}
got := ts.Data()
if !reflect.DeepEqual(exp, got) {
t.Errorf("unexpected tcp request:\nexp\n%+v\ngot\n%+v\n", exp, got)
t.Fatalf("unexpected tcp request:\nexp\n%+v\ngot\n%+v\n", exp, got)
}

// Check event on topic
l := cli.TopicEventsLink(tcpTopic)
expTopicEvents := client.TopicEvents{
Expand All @@ -12055,7 +12056,7 @@ stream
t.Fatal(err)
}
if !reflect.DeepEqual(te, expTopicEvents) {
t.Errorf("unexpected topic events for publish topic:\ngot\n%+v\nexp\n%+v\n", te, expTopicEvents)
t.Fatalf("unexpected topic events for publish topic:\ngot\n%+v\nexp\n%+v\n", te, expTopicEvents)
}
}

Expand Down
1 change: 1 addition & 0 deletions services/alert/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ func (s *apiServer) handleListEvents(topic string, w http.ResponseWriter, r *htt
httpd.HttpError(w, fmt.Sprintf("failed to get topic events: %s", err.Error()), true, http.StatusInternalServerError)
return
}

res := client.TopicEvents{
Link: s.topicEventsLink(topic, client.Self),
Topic: topic,
Expand Down
26 changes: 26 additions & 0 deletions services/alert/easyjson-bootstrap2172438621.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// +build ignore

// TEMPORARY AUTOGENERATED FILE: easyjson bootstapping code to launch
// the actual generator.

package main

import (
"fmt"
"os"

"github.com/mailru/easyjson/gen"

pkg "github.com/influxdata/kapacitor/services/alert"
)

func main() {
g := gen.NewGenerator("dao_easyjson.go")
g.SetPkg("alert", "github.com/influxdata/kapacitor/services/alert")
g.Add(pkg.EasyJSON_exporter_EventState(nil))
g.Add(pkg.EasyJSON_exporter_TopicState(nil))
if err := g.Run(os.Stdout); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
}
Loading

0 comments on commit a95a391

Please sign in to comment.