Skip to content

Commit

Permalink
icinga2: Add created/deleted events & cache event extra tags
Browse files Browse the repository at this point in the history
Host/Service groups are never supposed to change at runtime, thus we can
use these two new events to regularly refresh our cache store. This
eliminates the overhead of querying the groups with each ongoing event
and should relax the Icinga 2 API a litle bit.
  • Loading branch information
yhabteab committed Jun 4, 2024
1 parent 97ed9c8 commit 7cdb7f5
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 22 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/emersion/go-smtp v0.21.2
github.com/goccy/go-yaml v1.11.3
github.com/google/uuid v1.6.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/icinga/icinga-go-library v0.2.0
github.com/jhillyerd/enmime v1.2.0
github.com/jmoiron/sqlx v1.4.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/icinga/icinga-go-library v0.2.0 h1:1or5s3KMEJGdhFbMzlN8NPw1NCd/3ntsKLw5et4/9XI=
github.com/icinga/icinga-go-library v0.2.0/go.mod h1:YN7XJN3W0FodD+j4kirO89zk2tgvanXWt1RMV8UgOLo=
github.com/jaytaylor/html2text v0.0.0-20230321000545-74c2419ad056 h1:iCHtR9CQyktQ5+f3dMVZfwD2KWJUgm7M0gdL9NGr8KA=
Expand Down
36 changes: 36 additions & 0 deletions internal/icinga2/api_responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
)

Expand Down Expand Up @@ -157,6 +158,8 @@ const (
typeDowntimeStarted = "DowntimeStarted"
typeDowntimeTriggered = "DowntimeTriggered"
typeFlapping = "Flapping"
typeObjectCreated = "ObjectCreated"
typeObjectDeleted = "ObjectDeleted"
)

// StateChange represents the Icinga 2 API Event Stream StateChange response for host/service state changes.
Expand Down Expand Up @@ -273,6 +276,37 @@ type Flapping struct {
IsFlapping bool `json:"is_flapping"`
}

// CheckableCreatedDeleted represents the Icinga 2 API stream Checkable created/deleted response.
//
// NOTE:
// - The ObjectName field may already contain the composed name of the checkable if the ObjectType is `Service`.
// - The EventType field indicates which event type is currently being streamed and is either
// set to typeObjectCreated or typeObjectDeleted.
type CheckableCreatedDeleted struct {
ObjectName string `json:"object_name"`
ObjectType string `json:"object_type"`
EventType string `json:"type"`
}

// GetHostName returns the host name of this current response result.
func (c *CheckableCreatedDeleted) GetHostName() string {
if c.ObjectType == "Host" {
return c.ObjectName
}

return strings.Split(c.ObjectName, "!")[0]
}

// GetServiceName returns the service of this current response result.
// Returns empty string if the current result type is of type `Host`.
func (c *CheckableCreatedDeleted) GetServiceName() string {
if c.ObjectType == "Service" {
return strings.Split(c.ObjectName, "!")[1]
}

return ""
}

// UnmarshalEventStreamResponse unmarshal a JSON response line from the Icinga 2 API Event Stream.
//
// The function expects an Icinga 2 API Event Stream Response in its JSON form and tries to unmarshal it into one of the
Expand Down Expand Up @@ -323,6 +357,8 @@ func UnmarshalEventStreamResponse(bytes []byte) (any, error) {
resp = new(DowntimeTriggered)
case typeFlapping:
resp = new(Flapping)
case typeObjectCreated, typeObjectDeleted:
resp = new(CheckableCreatedDeleted)
default:
return nil, fmt.Errorf("unsupported type %q", responseType)
}
Expand Down
36 changes: 36 additions & 0 deletions internal/icinga2/api_responses_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,42 @@ func TestApiResponseUnmarshal(t *testing.T) {
},
},
},
{
name: "objectcreated-host",
jsonData: `{"object_name":"event-stream","object_type":"Host","timestamp":1716542256.769028,"type":"ObjectCreated"}`,
expected: &CheckableCreatedDeleted{
ObjectName: "event-stream",
ObjectType: "Host",
EventType: "ObjectCreated",
},
},
{
name: "objectcreated-service",
jsonData: `{"object_name":"event-stream!ssh","object_type":"Service","timestamp":1716542256.783502,"type":"ObjectCreated"}`,
expected: &CheckableCreatedDeleted{
ObjectName: "event-stream!ssh",
ObjectType: "Service",
EventType: "ObjectCreated",
},
},
{
name: "objectdeleted-host",
jsonData: `{"object_name":"event-stream","object_type":"Host","timestamp":1716542070.492318,"type":"ObjectDeleted"}`,
expected: &CheckableCreatedDeleted{
ObjectName: "event-stream",
ObjectType: "Host",
EventType: "ObjectDeleted",
},
},
{
name: "objectdeleted-service",
jsonData: `{"object_name":"event-stream!ssh","object_type":"Service","timestamp":1716542070.492095,"type":"ObjectDeleted"}`,
expected: &CheckableCreatedDeleted{
ObjectName: "event-stream!ssh",
ObjectType: "Service",
EventType: "ObjectDeleted",
},
},
}

for _, test := range tests {
Expand Down
88 changes: 69 additions & 19 deletions internal/icinga2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package icinga2
import (
"context"
"errors"
"fmt"
"github.com/google/uuid"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/icinga/icinga-notifications/internal/event"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -64,6 +66,14 @@ type Client struct {
eventDispatcherEventStream chan *eventMsg
// catchupPhaseRequest requests the main worker to switch to the catch-up-phase to query the API for missed events.
catchupPhaseRequest chan struct{}

// eventExtraTags is used to cache Checkable groups once they have been fetched from the Icinga 2 API so that they
// don't have to be fetched over again with each ongoing event. Host/Service groups are never supposed to change at
// runtime, so this cache is being refreshed once in a while when Icinga 2 dispatches an object created/deleted
// event and thus should not overload the Icinga 2 API in a large environment with numerous Checkables.
// The LRU cache size is defined as 2^17, and when the actual cached items reach this size, the least used values
// will simply be overwritten by the new ones.
eventExtraTagsCache *lru.Cache[string, map[string]string]
}

// buildCommonEvent creates an event.Event based on Host and (optional) Service attributes to be specified later.
Expand All @@ -78,10 +88,9 @@ type Client struct {
// - ID
func (client *Client) buildCommonEvent(ctx context.Context, host, service string) (*event.Event, error) {
var (
eventName string
eventUrl *url.URL
eventTags map[string]string
eventExtraTags = make(map[string]string)
eventName string
eventUrl *url.URL
eventTags map[string]string
)

eventUrl, err := url.Parse(client.IcingaWebRoot)
Expand All @@ -99,14 +108,6 @@ func (client *Client) buildCommonEvent(ctx context.Context, host, service string
"host": host,
"service": service,
}

serviceGroups, err := client.fetchHostServiceGroups(ctx, host, service)
if err != nil {
return nil, err
}
for _, serviceGroup := range serviceGroups {
eventExtraTags["servicegroup/"+serviceGroup] = ""
}
} else {
eventName = host

Expand All @@ -118,12 +119,17 @@ func (client *Client) buildCommonEvent(ctx context.Context, host, service string
}
}

hostGroups, err := client.fetchHostServiceGroups(ctx, host, "")
if err != nil {
return nil, err
}
for _, hostGroup := range hostGroups {
eventExtraTags["hostgroup/"+hostGroup] = ""
extraTags, ok := client.eventExtraTagsCache.Get(eventName)
if !ok {
objectType := "Host"
if service != "" {
objectType = "Service"
}

checkableResult := &CheckableCreatedDeleted{ObjectName: eventName, ObjectType: objectType, EventType: typeObjectCreated}
if extraTags, err = client.refreshExtraTagsCache(ctx, checkableResult); err != nil {
return nil, err
}
}

return &event.Event{
Expand All @@ -132,10 +138,46 @@ func (client *Client) buildCommonEvent(ctx context.Context, host, service string
Name: eventName,
URL: eventUrl.String(),
Tags: eventTags,
ExtraTags: eventExtraTags,
ExtraTags: extraTags,
}, nil
}

// refreshExtraTagsCache refreshes the client event extra tags cache store based on the given response result.
// If the specified response result is of typeObjectCreated, it will return the just extracted event extra tags
// (if available), otherwise always nil.
func (client *Client) refreshExtraTagsCache(ctx context.Context, result *CheckableCreatedDeleted) (map[string]string, error) {
switch result.EventType {
case typeObjectDeleted:
// The checkable has just been deleted, so delete all existing extra tags from our cache store as well.
client.eventExtraTagsCache.Remove(result.ObjectName)
case typeObjectCreated:
extraTags := make(map[string]string)
hostGroups, err := client.fetchHostServiceGroups(ctx, result.GetHostName(), "")
if err != nil {
return nil, err
}
for _, hostGroup := range hostGroups {
extraTags["hostgroup/"+hostGroup] = ""
}

if result.ObjectType == "Service" {
serviceGroups, err := client.fetchHostServiceGroups(ctx, result.GetHostName(), result.GetServiceName())
if err != nil {
return nil, err
}
for _, serviceGroup := range serviceGroups {
extraTags["servicegroup/"+serviceGroup] = ""
}
}

client.eventExtraTagsCache.Add(result.ObjectName, extraTags)
default:
return nil, fmt.Errorf("cannot refresh object extra tags for event-stream type %q", result.EventType)
}

return nil, nil
}

// buildHostServiceEvent constructs an event.Event based on a CheckResult, a Host or Service state, a Host name and an
// optional Service name if the Event should represent a Service object.
func (client *Client) buildHostServiceEvent(ctx context.Context, result CheckResult, state int, host, service string) (*event.Event, error) {
Expand Down Expand Up @@ -441,6 +483,14 @@ func (client *Client) Process() {
client.eventDispatcherEventStream = make(chan *eventMsg)
client.catchupPhaseRequest = make(chan struct{})

cache, err := lru.New[string, map[string]string](1 << 17)
if err != nil {
// Is unlikely to happen, as the only error being returned is triggered by
// specifying negative numbers as the cache size.
client.Logger.Fatalw("Failed to initialise event extra tags cache", zap.Error(err))
}
client.eventExtraTagsCache = cache

go client.worker()

for client.Ctx.Err() == nil {
Expand Down
23 changes: 20 additions & 3 deletions internal/icinga2/client_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,19 @@ func (client *Client) connectEventStream(esTypes []string) (io.ReadCloser, error
queueNameRndBuff := make([]byte, 16)
_, _ = rand.Read(queueNameRndBuff)

reqBody, err := json.Marshal(map[string]any{
reqBody := new(bytes.Buffer)
encoder := json.NewEncoder(reqBody)
// Since our encoded JSON isn't going to be used somewhere in a browser, we've to disable this feature
// so that our filter expressions such as ‘&’ don't get replaced by some UTF-8 code points.
encoder.SetEscapeHTML(false)

err := encoder.Encode(map[string]any{
"queue": fmt.Sprintf("icinga-notifications-%x", queueNameRndBuff),
"types": esTypes,
"filter": fmt.Sprintf(
"(event.type!=%q && event.type!=%q) || event.object_type==%q || event.object_type==%q",
typeObjectCreated, typeObjectDeleted, "Host", "Service",
),
})
if err != nil {
return nil, err
Expand All @@ -342,7 +352,7 @@ func (client *Client) connectEventStream(esTypes []string) (io.ReadCloser, error
// When leaving the function without an error, it is being called in connectEventStreamReadCloser.Close().
reqCtx, reqCancel := context.WithCancel(client.Ctx)

req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, apiUrl, bytes.NewReader(reqBody))
req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, apiUrl, reqBody)
if err != nil {
reqCancel()
return nil, err
Expand All @@ -357,7 +367,7 @@ func (client *Client) connectEventStream(esTypes []string) (io.ReadCloser, error
go func() {
defer close(resCh)

client.Logger.Debug("Try to establish an Event Stream API connection")
client.Logger.Debugw("Try to establish an Event Stream API connection", zap.String("request_body", reqBody.String()))
httpClient := &http.Client{Transport: client.ApiHttpTransport}
res, err := httpClient.Do(req)
if err != nil {
Expand Down Expand Up @@ -426,6 +436,8 @@ func (client *Client) listenEventStream() error {
typeDowntimeStarted,
typeDowntimeTriggered,
typeFlapping,
typeObjectCreated,
typeObjectDeleted,
})
if err != nil {
return err
Expand Down Expand Up @@ -499,6 +511,11 @@ func (client *Client) listenEventStream() error {
case *Flapping:
ev, err = client.buildFlappingEvent(client.Ctx, respT.Host, respT.Service, respT.IsFlapping)
evTime = respT.Timestamp.Time()
case *CheckableCreatedDeleted:
_, err = client.refreshExtraTagsCache(client.Ctx, respT)
if err == nil {
continue
}
default:
err = fmt.Errorf("unsupported type %T", resp)
}
Expand Down

0 comments on commit 7cdb7f5

Please sign in to comment.