Skip to content

Commit

Permalink
Implement audit backend for TimescaleDB.
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrit91 committed Aug 23, 2024
1 parent 045c507 commit 7855e39
Show file tree
Hide file tree
Showing 8 changed files with 707 additions and 50 deletions.
10 changes: 5 additions & 5 deletions auditing/auditing-interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func UnaryServerInterceptor(a Auditing, logger *slog.Logger, shouldAudit func(fu
auditReqContext.StatusCode = statusCodeFromGrpc(err)

if err != nil {
auditReqContext.Error = err
auditReqContext.Error = err.Error()
err2 := a.Index(auditReqContext)
if err2 != nil {
logger.Error("unable to index", "error", err2)
Expand Down Expand Up @@ -129,7 +129,7 @@ func StreamServerInterceptor(a Auditing, logger *slog.Logger, shouldAudit func(f
auditReqContext.StatusCode = statusCodeFromGrpc(err)

if err != nil {
auditReqContext.Error = err
auditReqContext.Error = err.Error()
err2 := a.Index(auditReqContext)
if err2 != nil {
logger.Error("unable to index", "error", err2)
Expand Down Expand Up @@ -244,7 +244,7 @@ func (a auditingConnectInterceptor) WrapStreamingHandler(next connect.StreamingH
auditReqContext.StatusCode = statusCodeFromGrpc(err)

if err != nil {
auditReqContext.Error = err
auditReqContext.Error = err.Error()
err2 := a.auditing.Index(auditReqContext)
if err2 != nil {
a.logger.Error("unable to index", "error", err2)
Expand Down Expand Up @@ -311,7 +311,7 @@ func (i auditingConnectInterceptor) WrapUnary(next connect.UnaryFunc) connect.Un
auditReqContext.StatusCode = statusCodeFromGrpc(err)

if err != nil {
auditReqContext.Error = err
auditReqContext.Error = err.Error()
err2 := i.auditing.Index(auditReqContext)
if err2 != nil {
i.logger.Error("unable to index", "error", err2)
Expand Down Expand Up @@ -432,7 +432,7 @@ func HttpFilter(a Auditing, logger *slog.Logger) (restful.FilterFunction, error)
err = json.Unmarshal(body, &auditReqContext.Body)
if err != nil {
auditReqContext.Body = strBody
auditReqContext.Error = err
auditReqContext.Error = err.Error()
}

err = a.Index(auditReqContext)
Expand Down
53 changes: 30 additions & 23 deletions auditing/auditing.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@ package auditing

import (
"log/slog"
"os"
"path/filepath"
"time"
)

type Config struct {
Component string
URL string
APIKey string
IndexPrefix string
RotationInterval Interval
Keep int64
Log *slog.Logger
Component string
Log *slog.Logger
}

type Interval string
Expand Down Expand Up @@ -52,38 +49,39 @@ const (
const EntryFilterDefaultLimit int64 = 100

type Entry struct {
Id string // filled by the auditing driver
Component string
RequestId string `json:"rqid"`
Type EntryType
Timestamp time.Time
Id string `db:"-"` // filled by the auditing driver

User string
Tenant string
Component string `db:"component"`
RequestId string `db:"rqid" json:"rqid"`
Type EntryType `db:"type"`
Timestamp time.Time `db:"timestamp"`

User string `db:"userid"`
Tenant string `db:"tenant"`

// For `EntryDetailHTTP` the HTTP method get, post, put, delete, ...
// For `EntryDetailGRPC` unary, stream
Detail EntryDetail
Detail EntryDetail `db:"detail"`
// e.g. Request, Response, Error, Opened, Close
Phase EntryPhase
Phase EntryPhase `db:"phase"`
// For `EntryDetailHTTP` /api/v1/...
// For `EntryDetailGRPC` /api.v1/... (the method name)
Path string
ForwardedFor string
RemoteAddr string
Path string `db:"path"`
ForwardedFor string `db:"forwardedfor"`
RemoteAddr string `db:"remoteaddr"`

Body any // JSON, string or numbers
StatusCode int // for `EntryDetailHTTP` the HTTP status code, for EntryDetailGRPC` the grpc status code
Body any `db:"body"` // JSON, string or numbers
StatusCode int `db:"statuscode"` // for `EntryDetailHTTP` the HTTP status code, for EntryDetailGRPC` the grpc status code

// Internal errors
Error error
Error string `db:"error"`
}

func (e *Entry) prepareForNextPhase() {
e.Id = ""
e.Timestamp = time.Now()
e.Body = nil
e.Error = nil
e.Error = ""

switch e.Phase {
case EntryPhaseRequest:
Expand Down Expand Up @@ -137,3 +135,12 @@ type Auditing interface {
// The returned entries will be sorted by timestamp in descending order.
Search(EntryFilter) ([]Entry, error)
}

func defaultComponent() (string, error) {
ex, err := os.Executable()
if err != nil {
return "", err
}

return filepath.Base(ex), nil
}
38 changes: 23 additions & 15 deletions auditing/meilisearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"errors"
"fmt"
"log/slog"
"os"
"path/filepath"
"regexp"
"slices"
"strings"
Expand All @@ -17,6 +15,15 @@ import (
"github.com/meilisearch/meilisearch-go"
)

type MeilisearchConfig struct {
URL string
APIKey string

IndexPrefix string
RotationInterval Interval
Keep int64
}

type meiliAuditing struct {
component string
client *meilisearch.Client
Expand All @@ -39,32 +46,33 @@ const (
meiliIndexCreationWaitInterval = 100 * time.Millisecond
)

func New(c Config) (Auditing, error) {
func NewMeilisearch(c Config, mc MeilisearchConfig) (Auditing, error) {
if c.Component == "" {
ex, err := os.Executable()
component, err := defaultComponent()
if err != nil {
return nil, err
}
c.Component = filepath.Base(ex)

c.Component = component
}

client := meilisearch.NewClient(meilisearch.ClientConfig{
Host: c.URL,
APIKey: c.APIKey,
Host: mc.URL,
APIKey: mc.APIKey,
})
v, err := client.GetVersion()
if err != nil {
return nil, fmt.Errorf("unable to connect to meilisearch at:%s %w", c.URL, err)
return nil, fmt.Errorf("unable to connect to meilisearch at:%s %w", mc.URL, err)
}
c.Log.Info("meilisearch", "connected to", v, "index rotated", c.RotationInterval, "index keep", c.Keep)
c.Log.Info("meilisearch", "connected to", v, "index rotated", mc.RotationInterval, "index keep", mc.Keep)

a := &meiliAuditing{
component: c.Component,
client: client,
log: c.Log.WithGroup("auditing"),
indexPrefix: c.IndexPrefix,
rotationInterval: c.RotationInterval,
keep: c.Keep,
indexPrefix: mc.IndexPrefix,
rotationInterval: mc.RotationInterval,
keep: mc.Keep,
}
return a, nil
}
Expand Down Expand Up @@ -266,8 +274,8 @@ func (a *meiliAuditing) encodeEntry(entry Entry) map[string]any {
if entry.StatusCode != 0 {
doc["status-code"] = entry.StatusCode
}
if entry.Error != nil {
doc["error"] = entry.Error.Error()
if entry.Error != "" {
doc["error"] = entry.Error
}
if entry.Body != nil {
doc["body"] = entry.Body
Expand Down Expand Up @@ -339,7 +347,7 @@ func (a *meiliAuditing) decodeEntry(doc map[string]any) Entry {
entry.StatusCode = int(statusCode)
}
if err, ok := doc["error"].(string); ok {
entry.Error = errors.New(err)
entry.Error = err
}
if body, ok := doc["body"]; ok {
entry.Body = body
Expand Down
11 changes: 6 additions & 5 deletions auditing/meilisearch_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestAuditing_Meilisearch(t *testing.T) {
RemoteAddr: "10.0.0.0",
Body: "This is the body of 00000000-0000-0000-0000-000000000000",
StatusCode: 200,
Error: nil,
Error: "",
},
{
Component: "auditing.test",
Expand All @@ -115,7 +115,7 @@ func TestAuditing_Meilisearch(t *testing.T) {
RemoteAddr: "10.0.0.1",
Body: "This is the body of 00000000-0000-0000-0000-000000000001",
StatusCode: 201,
Error: nil,
Error: "",
},
{
Component: "auditing.test",
Expand All @@ -131,7 +131,7 @@ func TestAuditing_Meilisearch(t *testing.T) {
RemoteAddr: "10.0.0.2",
Body: "This is the body of 00000000-0000-0000-0000-000000000002",
StatusCode: 0,
Error: nil,
Error: "",
},
}
}
Expand Down Expand Up @@ -277,10 +277,11 @@ func TestAuditing_Meilisearch(t *testing.T) {
tt := tt

t.Run(fmt.Sprintf("%d %s", i, tt.name), func(t *testing.T) {
a, err := New(Config{
a, err := NewMeilisearch(Config{
Log: slog.Default(),
}, MeilisearchConfig{
URL: c.Endpoint,
APIKey: c.Password,
Log: slog.Default(),
IndexPrefix: fmt.Sprintf("test-%d", i),
})
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 7855e39

Please sign in to comment.