From 7855e390adaf004b6089f176144e0fc2ce3f9e05 Mon Sep 17 00:00:00 2001 From: Gerrit Date: Fri, 23 Aug 2024 12:48:59 +0200 Subject: [PATCH 01/13] Implement audit backend for TimescaleDB. --- auditing/auditing-interceptor.go | 10 +- auditing/auditing.go | 53 ++-- auditing/meilisearch.go | 38 +-- auditing/meilisearch_integration_test.go | 11 +- auditing/timescaledb.go | 328 +++++++++++++++++++++++ auditing/timescaledb_integration_test.go | 275 +++++++++++++++++++ go.mod | 9 +- go.sum | 33 +++ 8 files changed, 707 insertions(+), 50 deletions(-) create mode 100644 auditing/timescaledb.go create mode 100644 auditing/timescaledb_integration_test.go diff --git a/auditing/auditing-interceptor.go b/auditing/auditing-interceptor.go index 1e2d979..94749da 100644 --- a/auditing/auditing-interceptor.go +++ b/auditing/auditing-interceptor.go @@ -71,7 +71,7 @@ func UnaryServerInterceptor(a Auditing, logger *slog.Logger, shouldAudit func(fu auditReqContext.StatusCode = statusCodeFromGrpc(err) if err != nil { - auditReqContext.Error = err + auditReqContext.Error = err.Error() err2 := a.Index(auditReqContext) if err2 != nil { logger.Error("unable to index", "error", err2) @@ -129,7 +129,7 @@ func StreamServerInterceptor(a Auditing, logger *slog.Logger, shouldAudit func(f auditReqContext.StatusCode = statusCodeFromGrpc(err) if err != nil { - auditReqContext.Error = err + auditReqContext.Error = err.Error() err2 := a.Index(auditReqContext) if err2 != nil { logger.Error("unable to index", "error", err2) @@ -244,7 +244,7 @@ func (a auditingConnectInterceptor) WrapStreamingHandler(next connect.StreamingH auditReqContext.StatusCode = statusCodeFromGrpc(err) if err != nil { - auditReqContext.Error = err + auditReqContext.Error = err.Error() err2 := a.auditing.Index(auditReqContext) if err2 != nil { a.logger.Error("unable to index", "error", err2) @@ -311,7 +311,7 @@ func (i auditingConnectInterceptor) WrapUnary(next connect.UnaryFunc) connect.Un auditReqContext.StatusCode = statusCodeFromGrpc(err) if err != nil { - auditReqContext.Error = err + auditReqContext.Error = err.Error() err2 := i.auditing.Index(auditReqContext) if err2 != nil { i.logger.Error("unable to index", "error", err2) @@ -432,7 +432,7 @@ func HttpFilter(a Auditing, logger *slog.Logger) (restful.FilterFunction, error) err = json.Unmarshal(body, &auditReqContext.Body) if err != nil { auditReqContext.Body = strBody - auditReqContext.Error = err + auditReqContext.Error = err.Error() } err = a.Index(auditReqContext) diff --git a/auditing/auditing.go b/auditing/auditing.go index bfdae5f..424470a 100644 --- a/auditing/auditing.go +++ b/auditing/auditing.go @@ -2,17 +2,14 @@ package auditing import ( "log/slog" + "os" + "path/filepath" "time" ) type Config struct { - Component string - URL string - APIKey string - IndexPrefix string - RotationInterval Interval - Keep int64 - Log *slog.Logger + Component string + Log *slog.Logger } type Interval string @@ -52,38 +49,39 @@ const ( const EntryFilterDefaultLimit int64 = 100 type Entry struct { - Id string // filled by the auditing driver - Component string - RequestId string `json:"rqid"` - Type EntryType - Timestamp time.Time + Id string `db:"-"` // filled by the auditing driver - User string - Tenant string + Component string `db:"component"` + RequestId string `db:"rqid" json:"rqid"` + Type EntryType `db:"type"` + Timestamp time.Time `db:"timestamp"` + + User string `db:"userid"` + Tenant string `db:"tenant"` // For `EntryDetailHTTP` the HTTP method get, post, put, delete, ... // For `EntryDetailGRPC` unary, stream - Detail EntryDetail + Detail EntryDetail `db:"detail"` // e.g. Request, Response, Error, Opened, Close - Phase EntryPhase + Phase EntryPhase `db:"phase"` // For `EntryDetailHTTP` /api/v1/... // For `EntryDetailGRPC` /api.v1/... (the method name) - Path string - ForwardedFor string - RemoteAddr string + Path string `db:"path"` + ForwardedFor string `db:"forwardedfor"` + RemoteAddr string `db:"remoteaddr"` - Body any // JSON, string or numbers - StatusCode int // for `EntryDetailHTTP` the HTTP status code, for EntryDetailGRPC` the grpc status code + Body any `db:"body"` // JSON, string or numbers + StatusCode int `db:"statuscode"` // for `EntryDetailHTTP` the HTTP status code, for EntryDetailGRPC` the grpc status code // Internal errors - Error error + Error string `db:"error"` } func (e *Entry) prepareForNextPhase() { e.Id = "" e.Timestamp = time.Now() e.Body = nil - e.Error = nil + e.Error = "" switch e.Phase { case EntryPhaseRequest: @@ -137,3 +135,12 @@ type Auditing interface { // The returned entries will be sorted by timestamp in descending order. Search(EntryFilter) ([]Entry, error) } + +func defaultComponent() (string, error) { + ex, err := os.Executable() + if err != nil { + return "", err + } + + return filepath.Base(ex), nil +} diff --git a/auditing/meilisearch.go b/auditing/meilisearch.go index ef121f5..df29b78 100644 --- a/auditing/meilisearch.go +++ b/auditing/meilisearch.go @@ -5,8 +5,6 @@ import ( "errors" "fmt" "log/slog" - "os" - "path/filepath" "regexp" "slices" "strings" @@ -17,6 +15,15 @@ import ( "github.com/meilisearch/meilisearch-go" ) +type MeilisearchConfig struct { + URL string + APIKey string + + IndexPrefix string + RotationInterval Interval + Keep int64 +} + type meiliAuditing struct { component string client *meilisearch.Client @@ -39,32 +46,33 @@ const ( meiliIndexCreationWaitInterval = 100 * time.Millisecond ) -func New(c Config) (Auditing, error) { +func NewMeilisearch(c Config, mc MeilisearchConfig) (Auditing, error) { if c.Component == "" { - ex, err := os.Executable() + component, err := defaultComponent() if err != nil { return nil, err } - c.Component = filepath.Base(ex) + + c.Component = component } client := meilisearch.NewClient(meilisearch.ClientConfig{ - Host: c.URL, - APIKey: c.APIKey, + Host: mc.URL, + APIKey: mc.APIKey, }) v, err := client.GetVersion() if err != nil { - return nil, fmt.Errorf("unable to connect to meilisearch at:%s %w", c.URL, err) + return nil, fmt.Errorf("unable to connect to meilisearch at:%s %w", mc.URL, err) } - c.Log.Info("meilisearch", "connected to", v, "index rotated", c.RotationInterval, "index keep", c.Keep) + c.Log.Info("meilisearch", "connected to", v, "index rotated", mc.RotationInterval, "index keep", mc.Keep) a := &meiliAuditing{ component: c.Component, client: client, log: c.Log.WithGroup("auditing"), - indexPrefix: c.IndexPrefix, - rotationInterval: c.RotationInterval, - keep: c.Keep, + indexPrefix: mc.IndexPrefix, + rotationInterval: mc.RotationInterval, + keep: mc.Keep, } return a, nil } @@ -266,8 +274,8 @@ func (a *meiliAuditing) encodeEntry(entry Entry) map[string]any { if entry.StatusCode != 0 { doc["status-code"] = entry.StatusCode } - if entry.Error != nil { - doc["error"] = entry.Error.Error() + if entry.Error != "" { + doc["error"] = entry.Error } if entry.Body != nil { doc["body"] = entry.Body @@ -339,7 +347,7 @@ func (a *meiliAuditing) decodeEntry(doc map[string]any) Entry { entry.StatusCode = int(statusCode) } if err, ok := doc["error"].(string); ok { - entry.Error = errors.New(err) + entry.Error = err } if body, ok := doc["body"]; ok { entry.Body = body diff --git a/auditing/meilisearch_integration_test.go b/auditing/meilisearch_integration_test.go index 60e19d1..0d2f9d7 100644 --- a/auditing/meilisearch_integration_test.go +++ b/auditing/meilisearch_integration_test.go @@ -99,7 +99,7 @@ func TestAuditing_Meilisearch(t *testing.T) { RemoteAddr: "10.0.0.0", Body: "This is the body of 00000000-0000-0000-0000-000000000000", StatusCode: 200, - Error: nil, + Error: "", }, { Component: "auditing.test", @@ -115,7 +115,7 @@ func TestAuditing_Meilisearch(t *testing.T) { RemoteAddr: "10.0.0.1", Body: "This is the body of 00000000-0000-0000-0000-000000000001", StatusCode: 201, - Error: nil, + Error: "", }, { Component: "auditing.test", @@ -131,7 +131,7 @@ func TestAuditing_Meilisearch(t *testing.T) { RemoteAddr: "10.0.0.2", Body: "This is the body of 00000000-0000-0000-0000-000000000002", StatusCode: 0, - Error: nil, + Error: "", }, } } @@ -277,10 +277,11 @@ func TestAuditing_Meilisearch(t *testing.T) { tt := tt t.Run(fmt.Sprintf("%d %s", i, tt.name), func(t *testing.T) { - a, err := New(Config{ + a, err := NewMeilisearch(Config{ + Log: slog.Default(), + }, MeilisearchConfig{ URL: c.Endpoint, APIKey: c.Password, - Log: slog.Default(), IndexPrefix: fmt.Sprintf("test-%d", i), }) require.NoError(t, err) diff --git a/auditing/timescaledb.go b/auditing/timescaledb.go new file mode 100644 index 0000000..6b19833 --- /dev/null +++ b/auditing/timescaledb.go @@ -0,0 +1,328 @@ +package auditing + +import ( + "context" + "database/sql" + "fmt" + "log/slog" + "reflect" + "slices" + "strings" + "time" + + sq "github.com/Masterminds/squirrel" + "github.com/jmoiron/sqlx" + "github.com/lopezator/migrator" + + _ "github.com/lib/pq" +) + +type TimescaleDbConfig struct { + Host string + Port string + DB string + User string + Password string +} + +type timescaleAuditing struct { + component string + db *sqlx.DB + log *slog.Logger + + cols []string + vals []any +} + +func NewTimescaleDB(c Config, tc TimescaleDbConfig) (Auditing, error) { + if c.Component == "" { + component, err := defaultComponent() + if err != nil { + return nil, err + } + + c.Component = component + } + + if tc.Port == "" { + tc.Port = "5432" + } + if tc.DB == "" { + tc.DB = "postgres" + } + if tc.User == "" { + tc.User = "postgres" + } + + source := fmt.Sprintf("host=%s port=%s user=%s dbname=%s password=%s sslmode=disable", tc.Host, tc.Port, tc.User, tc.DB, tc.Password) + + db, err := sqlx.Connect("postgres", source) + if err != nil { + return nil, fmt.Errorf("could not connect to datastore: %w", err) + } + + a := ×caleAuditing{ + component: c.Component, + log: c.Log.WithGroup("auditing"), + db: db, + } + + err = a.initialize() + if err != nil { + return nil, fmt.Errorf("unable to initialize timescaledb backend: %w", err) + } + + a.log.Info("connected to timescaledb backend") + + return a, nil +} + +func (a *timescaleAuditing) initialize() error { + initialSchema := &migrator.Migration{ + Name: "Initial database schema", + Func: func(tx *sql.Tx) error { + schema := ` + CREATE EXTENSION IF NOT EXISTS timescaledb; + CREATE EXTENSION IF NOT EXISTS pg_stat_statements; + + CREATE TABLE IF NOT EXISTS traces ( + timestamp timestamp NOT NULL, + rqid text NOT NULL, + component text NOT NULL, + type text NOT NULL, + body text NOT NULL, + error text NOT NULL, + statuscode int NOT NULL, + remoteaddr text NOT NULL, + forwardedfor text NOT NULL, + path text NOT NULL, + phase text NOT NULL, + detail text NOT NULL, + tenant text NOT NULL, + userid text NOT NULL + ); + + SELECT create_hypertable('traces', 'timestamp', chunk_time_interval => INTERVAL '1 days', if_not_exists => TRUE); + ALTER TABLE traces SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'rqid', + timescaledb.compress_orderby = 'timestamp', + timescaledb.compress_chunk_time_interval = '7 days' + ); + ` + // TODO: evaluate what is needed + // CREATE INDEX IF NOT EXISTS traces_idx ON traces(); + + if _, err := tx.Exec(schema); err != nil { + return err + } + return nil + }, + } + + a.db.SetMaxIdleConns(5) + a.db.SetConnMaxLifetime(2 * time.Minute) + a.db.SetMaxOpenConns(95) + + m, err := migrator.New( + migrator.WithLogger(migrator.LoggerFunc(func(msg string, args ...interface{}) { + a.log.Info(fmt.Sprintf(msg, args...)) + })), + migrator.Migrations( + initialSchema, + ), + ) + if err != nil { + return err + } + + if err := m.Migrate(a.db.DB); err != nil { + return err + } + + q, _, err := sq. + Select("column_name"). + From("information_schema.columns"). + Where("table_name='traces'"). + ToSql() + if err != nil { + return err + } + + rows, err := a.db.Query(q) + if err != nil { + return err + } + defer rows.Close() + if rows.Err() != nil { + return rows.Err() + } + + for rows.Next() { + var col string + + err = rows.Scan(&col) + if err != nil { + return err + } + + a.cols = append(a.cols, col) + a.vals = append(a.vals, sq.Expr(":"+col)) + } + + return nil +} + +func (a *timescaleAuditing) Flush() error { + return nil +} + +func (a *timescaleAuditing) Index(entry Entry) error { + q, _, err := sq. + Insert("traces"). + Columns(a.cols...). + Values(a.vals...). + ToSql() + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + entry.Id = entry.RequestId + + _, err = a.db.NamedExecContext(ctx, q, entry) + if err != nil { + return fmt.Errorf("unable to index audit trace: %w", err) + } + + return nil +} + +type compOp string + +const ( + equals compOp = "equals" + like compOp = "like" +) + +func (a *timescaleAuditing) Search(filter EntryFilter) ([]Entry, error) { + var ( + where []string + values = map[string]interface{}{} + addFilter = func(field string, value any, op compOp) error { + if reflect.ValueOf(value).IsZero() { + return nil + } + + if !slices.Contains(a.cols, field) { + return fmt.Errorf("unable to filter for %q, no such table column", field) + } + + values[field] = value + + switch op { + case equals: + where = append(where, fmt.Sprintf("%s=:%s", field, field)) + case like: + where = append(where, fmt.Sprintf("%s like '%%' || %s || '%%'", field, field)) + default: + return fmt.Errorf("comp op not known") + } + + return nil + } + ) + + if err := addFilter("body", filter.Body, like); err != nil { + return nil, err + } + if err := addFilter("component", filter.Component, equals); err != nil { + return nil, err + } + if err := addFilter("detail", filter.Detail, equals); err != nil { + return nil, err + } + if err := addFilter("error", filter.Error, equals); err != nil { + return nil, err + } + if err := addFilter("forwardedfor", filter.ForwardedFor, equals); err != nil { + return nil, err + } + if err := addFilter("path", filter.Path, equals); err != nil { + return nil, err + } + if err := addFilter("phase", filter.Phase, equals); err != nil { + return nil, err + } + if err := addFilter("remoteaddr", filter.RemoteAddr, equals); err != nil { + return nil, err + } + if err := addFilter("rqid", filter.RequestId, equals); err != nil { + return nil, err + } + if err := addFilter("statuscode", filter.StatusCode, equals); err != nil { + return nil, err + } + if err := addFilter("tenant", filter.Tenant, equals); err != nil { + return nil, err + } + if err := addFilter("type", filter.Type, equals); err != nil { + return nil, err + } + if err := addFilter("userid", filter.User, equals); err != nil { + return nil, err + } + + query := sq. + Select(a.cols...). + From("traces"). + Columns(a.cols...). + Where(strings.Join(where, " AND ")). + OrderBy("timestamp ASC") + + // to make queries more efficient for timescaledb, we always provide from + if filter.From.IsZero() { + filter.From = time.Now().Add(-24 * time.Hour) + } + + values["from"] = filter.From + where = append(where, "timestamp >= :from") + + if !filter.To.IsZero() { + values["to"] = filter.To + where = append(where, "timestamp <= :to") + } + if filter.Limit != 0 { + query.Limit(uint64(filter.Limit)) + } + + q, _, err := query.ToSql() + if err != nil { + return nil, err + } + + rows, err := a.db.NamedQueryContext(context.TODO(), q, values) // TODO: search needs a ctx! + if err != nil { + return nil, err + } + defer rows.Close() + + var entries []Entry + + for rows.Next() { + var e Entry + + err = rows.StructScan(&e) + if err != nil { + return nil, err + } + + e.Id = e.RequestId + + entries = append(entries, e) + } + + return entries, nil +} diff --git a/auditing/timescaledb_integration_test.go b/auditing/timescaledb_integration_test.go new file mode 100644 index 0000000..12e6635 --- /dev/null +++ b/auditing/timescaledb_integration_test.go @@ -0,0 +1,275 @@ +//go:build integration +// +build integration + +package auditing + +import ( + "context" + "fmt" + "log/slog" + "sort" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" +) + +func TestAuditing_TimescaleDB(t *testing.T) { + container, auditing := StartTimescaleDB(t, Config{ + Log: slog.Default(), + }) + defer func() { + err := container.Terminate(context.Background()) + require.NoError(t, err) + }() + + now := time.Now().UTC() + // postgres does not store the nano seconds, so we neglect them for comparison: + timeComparer := cmp.Comparer(func(x, y time.Time) bool { + return x.Unix() == y.Unix() + }) + + testEntries := func() []Entry { + return []Entry{ + { + Component: "auditing.test", + RequestId: "00000000-0000-0000-0000-000000000000", + Type: EntryTypeHTTP, + Timestamp: now, + User: "admin", + Tenant: "global", + Detail: "POST", + Phase: EntryPhaseResponse, + Path: "/v1/test/0", + ForwardedFor: "127.0.0.1", + RemoteAddr: "10.0.0.0", + Body: "This is the body of 00000000-0000-0000-0000-000000000000", + StatusCode: 200, + Error: "", + }, + { + Component: "auditing.test", + RequestId: "00000000-0000-0000-0000-000000000001", + Type: EntryTypeHTTP, + Timestamp: now.Add(1 * time.Second), + User: "admin", + Tenant: "global", + Detail: "POST", + Phase: EntryPhaseResponse, + Path: "/v1/test/1", + ForwardedFor: "127.0.0.1", + RemoteAddr: "10.0.0.1", + Body: "This is the body of 00000000-0000-0000-0000-000000000001", + StatusCode: 201, + Error: "", + }, + { + Component: "auditing.test", + RequestId: "00000000-0000-0000-0000-000000000002", + Type: EntryTypeHTTP, + Timestamp: now.Add(2 * time.Second), + User: "admin", + Tenant: "global", + Detail: "POST", + Phase: EntryPhaseRequest, + Path: "/v1/test/2", + ForwardedFor: "127.0.0.1", + RemoteAddr: "10.0.0.2", + Body: "This is the body of 00000000-0000-0000-0000-000000000002", + StatusCode: 0, + Error: "", + }, + } + } + + tests := []struct { + name string + t func(t *testing.T, a Auditing) + }{ + { + name: "no entries, no search results", + t: func(t *testing.T, a Auditing) { + entries, err := a.Search(EntryFilter{}) + require.NoError(t, err) + assert.Empty(t, entries) + }, + }, + { + name: "insert one entry", + t: func(t *testing.T, a Auditing) { + err := a.Index(Entry{ + Body: "test", + }) + require.NoError(t, err) + err = a.Flush() + require.NoError(t, err) + + entries, err := a.Search(EntryFilter{ + Body: "test", + }) + require.NoError(t, err) + assert.Len(t, entries, 1) + }, + }, + { + name: "insert a couple of entries", + t: func(t *testing.T, a Auditing) { + es := testEntries() + for _, e := range es { + err := a.Index(e) + require.NoError(t, err) + } + + err := a.Flush() + require.NoError(t, err) + + entries, err := a.Search(EntryFilter{}) + require.NoError(t, err) + assert.Len(t, entries, len(es)) + + sort.Slice(entries, func(i, j int) bool { return entries[i].RequestId < entries[j].RequestId }) + + if diff := cmp.Diff(entries, es, cmpopts.IgnoreFields(Entry{}, "Id"), timeComparer); diff != "" { + t.Errorf("diff (+got -want):\n %s", diff) + } + + entries, err = a.Search(EntryFilter{ + Body: "This", + }) + require.NoError(t, err) + assert.Len(t, entries, len(es)) + }, + }, + { + name: "filter search on rqid", + t: func(t *testing.T, a Auditing) { + es := testEntries() + for _, e := range es { + err := a.Index(e) + require.NoError(t, err) + } + + err := a.Flush() + require.NoError(t, err) + + entries, err := a.Search(EntryFilter{ + RequestId: es[0].RequestId, + }) + require.NoError(t, err) + require.Len(t, entries, 1) + + if diff := cmp.Diff(entries[0], es[0], cmpopts.IgnoreFields(Entry{}, "Id"), timeComparer); diff != "" { + t.Errorf("diff (+got -want):\n %s", diff) + } + }, + }, + { + name: "filter search on phase", + t: func(t *testing.T, a Auditing) { + es := testEntries() + var wantEntries []Entry + for _, e := range es { + err := a.Index(e) + require.NoError(t, err) + + if e.Phase == EntryPhaseResponse { + wantEntries = append(wantEntries, e) + } + } + + err := a.Flush() + require.NoError(t, err) + + entries, err := a.Search(EntryFilter{ + Phase: EntryPhaseResponse, + }) + require.NoError(t, err) + require.Len(t, entries, 2) + + sort.Slice(entries, func(i, j int) bool { return entries[i].RequestId < entries[j].RequestId }) + + if diff := cmp.Diff(entries, wantEntries, cmpopts.IgnoreFields(Entry{}, "Id"), timeComparer); diff != "" { + t.Errorf("diff (+got -want):\n %s", diff) + } + }, + }, + // { + // name: "filter on body", + // t: func(t *testing.T, a Auditing) { + // es := testEntries() + // for _, e := range es { + // err := a.Index(e) + // require.NoError(t, err) + // } + + // err := a.Flush() + // require.NoError(t, err) + + // entries, err := a.Search(EntryFilter{ + // Body: fmt.Sprintf("%q", es[0].Body.(string)), + // }) + // require.NoError(t, err) + // require.Len(t, entries, 1) + + // if diff := cmp.Diff(entries[0], es[0]); diff != "" { + // t.Errorf("diff (+got -want):\n %s", diff) + // } + // }, + // }, + } + for i, tt := range tests { + tt := tt + + t.Run(fmt.Sprintf("%d %s", i, tt.name), func(t *testing.T) { + defer func() { + a := auditing.(*timescaleAuditing) + a.db.MustExec("DELETE FROM traces;") + }() + + tt.t(t, auditing) + }) + } +} + +func StartTimescaleDB(t testing.TB, config Config) (testcontainers.Container, Auditing) { + req := testcontainers.ContainerRequest{ + Image: "timescale/timescaledb:2.16.1-pg16", + ExposedPorts: []string{"5432/tcp"}, + Env: map[string]string{"POSTGRES_PASSWORD": "password"}, + WaitingFor: wait.ForAll( + wait.ForLog("database system is ready to accept connections"), + wait.ForListeningPort("5432/tcp"), + ), + Cmd: []string{"postgres", "-c", "max_connections=200"}, + } + + ctx := context.Background() + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + require.NoError(t, err) + + ip, err := container.Host(ctx) + require.NoError(t, err) + + port, err := container.MappedPort(ctx, "5432") + require.NoError(t, err) + + auditing, err := NewTimescaleDB(config, TimescaleDbConfig{ + Host: ip, + Port: port.Port(), + DB: "postgres", + User: "postgres", + Password: "password", + }) + require.NoError(t, err) + + return container, auditing +} diff --git a/go.mod b/go.mod index 828a309..bfb3d78 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.22 require ( connectrpc.com/connect v1.16.2 github.com/Masterminds/semver/v3 v3.2.1 + github.com/Masterminds/squirrel v1.5.4 github.com/avast/retry-go/v4 v4.6.0 github.com/coreos/go-oidc/v3 v3.11.0 github.com/emicklei/go-restful-openapi/v2 v2.10.2 @@ -18,6 +19,9 @@ require ( github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/icza/dyno v0.0.0-20230330125955-09f820a8d9c0 + github.com/jmoiron/sqlx v1.4.0 + github.com/lib/pq v1.10.9 + github.com/lopezator/migrator v0.3.1 github.com/mattn/go-isatty v0.0.20 github.com/meilisearch/meilisearch-go v0.27.2 github.com/metal-stack/security v0.8.1 @@ -41,8 +45,6 @@ require ( tailscale.com v1.54.0 ) -require github.com/containerd/platforms v0.2.1 // indirect - require ( dario.cat/mergo v1.0.0 // indirect filippo.io/edwards25519 v1.1.0 // indirect @@ -71,6 +73,7 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/containerd/containerd v1.7.20 // indirect github.com/containerd/log v0.1.0 // indirect + github.com/containerd/platforms v0.2.1 // indirect github.com/coreos/go-iptables v0.7.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cpuguy83/dockercfg v0.3.1 // indirect @@ -116,6 +119,8 @@ require ( github.com/julienschmidt/httprouter v1.3.0 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/kortschak/wol v0.0.0-20200729010619-da482cc4850a // indirect + github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect + github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/lestrrat-go/blackmagic v1.0.2 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect github.com/lestrrat-go/httprc v1.0.5 // indirect diff --git a/go.sum b/go.sum index fe80197..dbb38a2 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,8 @@ github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8 github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0= github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ= +github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM= +github.com/Masterminds/squirrel v1.5.4/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/Microsoft/hcsshim v0.11.7 h1:vl/nj3Bar/CvJSYo7gIQPyRWc9f3c6IeSNavBTSZNZQ= @@ -154,6 +156,8 @@ github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+Gr github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= github.com/go-openapi/validate v0.24.0 h1:LdfDKwNbpB6Vn40xhTdNZAnfLECL81w+VX3BumrGD58= github.com/go-openapi/validate v0.24.0/go.mod h1:iyeX1sEufmv3nPbBdX3ieNviWnOZaJ1+zquzJEf2BAQ= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= @@ -206,10 +210,28 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/insomniacslk/dhcp v0.0.0-20240204152450-ca2dc33955c1 h1:L3pm9Kf2G6gJVYawz2SrI5QnV1wzHYbqmKnSHHXJAb8= github.com/insomniacslk/dhcp v0.0.0-20240204152450-ca2dc33955c1/go.mod h1:izxuNQZeFrbx2nK2fAyN5iNUB34Fe9j0nK4PwLzAkKw= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= +github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/pgconn v1.11.0 h1:HiHArx4yFbwl91X3qqIHtUFoiIfLNJXCQRsnzkiwwaQ= +github.com/jackc/pgconn v1.11.0/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3/v2 v2.2.0 h1:r7JypeP2D3onoQTCxWdTpCtJ4D+qpKr0TxvoyMhZ5ns= +github.com/jackc/pgproto3/v2 v2.2.0/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgtype v1.10.0 h1:ILnBWrRMSXGczYvmkYD6PsYyVFUNLTnIUJHHDLmqk38= +github.com/jackc/pgtype v1.10.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= +github.com/jackc/pgx/v4 v4.15.0 h1:B7dTkXsdILD3MF987WGGCcg+tvLW6bZJdEcqVFeU//w= +github.com/jackc/pgx/v4 v4.15.0/go.mod h1:D/zyOyXiaM1TmVWnOM18p0xdDtdakRBa0RsVGI3U3bw= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= +github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/josharian/native v1.0.1-0.20221213033349-c1e37c09b531/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= @@ -237,6 +259,10 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw= +github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o= +github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk= +github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw= github.com/lestrrat-go/blackmagic v1.0.2 h1:Cg2gVSc9h7sz9NOByczrbUvLopQmXrfFx//N+AkAr5k= github.com/lestrrat-go/blackmagic v1.0.2/go.mod h1:UrEqBzIR2U6CnzVyUtfM6oZNMt/7O7Vohk2J0OGSAtU= github.com/lestrrat-go/httpcc v1.0.1 h1:ydWCStUeJLkpYyjLDHihupbn2tYmZ7m22BGkcvZZrIE= @@ -249,6 +275,10 @@ github.com/lestrrat-go/jwx/v2 v2.1.0 h1:0zs7Ya6+39qoit7gwAf+cYm1zzgS3fceIdo7RmQ5 github.com/lestrrat-go/jwx/v2 v2.1.0/go.mod h1:Xpw9QIaUGiIUD1Wx0NcY1sIHwFf8lDuZn/cmxtXYRys= github.com/lestrrat-go/option v1.0.1 h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNBEYU= github.com/lestrrat-go/option v1.0.1/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lopezator/migrator v0.3.1 h1:ZFPT6aC7+nGWkqhleynABZ6ftycSf6hmHHLOaryq1Og= +github.com/lopezator/migrator v0.3.1/go.mod h1:X+lHDMZ9Ci3/KdbypJcQYFFwipVrJsX4fRCQ4QLauYk= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed h1:036IscGBfJsFIgJQzlui7nK1Ncm0tp2ktmPj8xO4N/0= github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k= @@ -267,6 +297,8 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mdlayher/genetlink v1.3.2 h1:KdrNKe+CTu+IbZnm/GVUMXSqBBLqcGpRDa0xkQy56gw= github.com/mdlayher/genetlink v1.3.2/go.mod h1:tcC3pkCrPUGIKKsCsp0B3AdaaKuHtaxoJRz3cc+528o= github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/g= @@ -372,6 +404,7 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= From 5c3a30c3c7f370939fb16bc050f818b647ee5b1f Mon Sep 17 00:00:00 2001 From: Gerrit Date: Fri, 23 Aug 2024 13:21:43 +0200 Subject: [PATCH 02/13] Small improvements. --- auditing/auditing-interceptor.go | 10 +- auditing/auditing.go | 36 +++---- auditing/meilisearch.go | 8 +- auditing/meilisearch_integration_test.go | 21 ++-- auditing/timescaledb.go | 129 +++++++++++++++++------ auditing/timescaledb_integration_test.go | 21 ++-- 6 files changed, 147 insertions(+), 78 deletions(-) diff --git a/auditing/auditing-interceptor.go b/auditing/auditing-interceptor.go index 94749da..1e2d979 100644 --- a/auditing/auditing-interceptor.go +++ b/auditing/auditing-interceptor.go @@ -71,7 +71,7 @@ func UnaryServerInterceptor(a Auditing, logger *slog.Logger, shouldAudit func(fu auditReqContext.StatusCode = statusCodeFromGrpc(err) if err != nil { - auditReqContext.Error = err.Error() + auditReqContext.Error = err err2 := a.Index(auditReqContext) if err2 != nil { logger.Error("unable to index", "error", err2) @@ -129,7 +129,7 @@ func StreamServerInterceptor(a Auditing, logger *slog.Logger, shouldAudit func(f auditReqContext.StatusCode = statusCodeFromGrpc(err) if err != nil { - auditReqContext.Error = err.Error() + auditReqContext.Error = err err2 := a.Index(auditReqContext) if err2 != nil { logger.Error("unable to index", "error", err2) @@ -244,7 +244,7 @@ func (a auditingConnectInterceptor) WrapStreamingHandler(next connect.StreamingH auditReqContext.StatusCode = statusCodeFromGrpc(err) if err != nil { - auditReqContext.Error = err.Error() + auditReqContext.Error = err err2 := a.auditing.Index(auditReqContext) if err2 != nil { a.logger.Error("unable to index", "error", err2) @@ -311,7 +311,7 @@ func (i auditingConnectInterceptor) WrapUnary(next connect.UnaryFunc) connect.Un auditReqContext.StatusCode = statusCodeFromGrpc(err) if err != nil { - auditReqContext.Error = err.Error() + auditReqContext.Error = err err2 := i.auditing.Index(auditReqContext) if err2 != nil { i.logger.Error("unable to index", "error", err2) @@ -432,7 +432,7 @@ func HttpFilter(a Auditing, logger *slog.Logger) (restful.FilterFunction, error) err = json.Unmarshal(body, &auditReqContext.Body) if err != nil { auditReqContext.Body = strBody - auditReqContext.Error = err.Error() + auditReqContext.Error = err } err = a.Index(auditReqContext) diff --git a/auditing/auditing.go b/auditing/auditing.go index 424470a..b7238ec 100644 --- a/auditing/auditing.go +++ b/auditing/auditing.go @@ -1,6 +1,7 @@ package auditing import ( + "context" "log/slog" "os" "path/filepath" @@ -49,39 +50,38 @@ const ( const EntryFilterDefaultLimit int64 = 100 type Entry struct { - Id string `db:"-"` // filled by the auditing driver - - Component string `db:"component"` - RequestId string `db:"rqid" json:"rqid"` - Type EntryType `db:"type"` - Timestamp time.Time `db:"timestamp"` + Id string // filled by the auditing driver + Component string + RequestId string `json:"rqid"` + Type EntryType + Timestamp time.Time - User string `db:"userid"` - Tenant string `db:"tenant"` + User string + Tenant string // For `EntryDetailHTTP` the HTTP method get, post, put, delete, ... // For `EntryDetailGRPC` unary, stream - Detail EntryDetail `db:"detail"` + Detail EntryDetail // e.g. Request, Response, Error, Opened, Close - Phase EntryPhase `db:"phase"` + Phase EntryPhase // For `EntryDetailHTTP` /api/v1/... // For `EntryDetailGRPC` /api.v1/... (the method name) - Path string `db:"path"` - ForwardedFor string `db:"forwardedfor"` - RemoteAddr string `db:"remoteaddr"` + Path string + ForwardedFor string + RemoteAddr string - Body any `db:"body"` // JSON, string or numbers - StatusCode int `db:"statuscode"` // for `EntryDetailHTTP` the HTTP status code, for EntryDetailGRPC` the grpc status code + Body any // JSON, string or numbers + StatusCode int // for `EntryDetailHTTP` the HTTP status code, for EntryDetailGRPC` the grpc status code // Internal errors - Error string `db:"error"` + Error error } func (e *Entry) prepareForNextPhase() { e.Id = "" e.Timestamp = time.Now() e.Body = nil - e.Error = "" + e.Error = nil switch e.Phase { case EntryPhaseRequest: @@ -133,7 +133,7 @@ type Auditing interface { // Searches for entries matching the given filter. // By default only recent entries will be returned. // The returned entries will be sorted by timestamp in descending order. - Search(EntryFilter) ([]Entry, error) + Search(context.Context, EntryFilter) ([]Entry, error) } func defaultComponent() (string, error) { diff --git a/auditing/meilisearch.go b/auditing/meilisearch.go index df29b78..aa2fb3b 100644 --- a/auditing/meilisearch.go +++ b/auditing/meilisearch.go @@ -129,7 +129,7 @@ func (a *meiliAuditing) Index(entry Entry) error { return nil } -func (a *meiliAuditing) Search(filter EntryFilter) ([]Entry, error) { +func (a *meiliAuditing) Search(_ context.Context, filter EntryFilter) ([]Entry, error) { predicates := make([]string, 0) if filter.Component != "" { predicates = append(predicates, fmt.Sprintf("component = %q", filter.Component)) @@ -274,8 +274,8 @@ func (a *meiliAuditing) encodeEntry(entry Entry) map[string]any { if entry.StatusCode != 0 { doc["status-code"] = entry.StatusCode } - if entry.Error != "" { - doc["error"] = entry.Error + if entry.Error != nil { + doc["error"] = entry.Error.Error() } if entry.Body != nil { doc["body"] = entry.Body @@ -347,7 +347,7 @@ func (a *meiliAuditing) decodeEntry(doc map[string]any) Entry { entry.StatusCode = int(statusCode) } if err, ok := doc["error"].(string); ok { - entry.Error = err + entry.Error = errors.New(err) } if body, ok := doc["body"]; ok { entry.Body = body diff --git a/auditing/meilisearch_integration_test.go b/auditing/meilisearch_integration_test.go index 0d2f9d7..7220880 100644 --- a/auditing/meilisearch_integration_test.go +++ b/auditing/meilisearch_integration_test.go @@ -70,6 +70,7 @@ func StartMeilisearch(t testing.TB) (container testcontainers.Container, c *conn } func TestAuditing_Meilisearch(t *testing.T) { + ctx := context.Background() container, c, err := StartMeilisearch(t) require.NoError(t, err) defer func() { @@ -99,7 +100,7 @@ func TestAuditing_Meilisearch(t *testing.T) { RemoteAddr: "10.0.0.0", Body: "This is the body of 00000000-0000-0000-0000-000000000000", StatusCode: 200, - Error: "", + Error: nil, }, { Component: "auditing.test", @@ -115,7 +116,7 @@ func TestAuditing_Meilisearch(t *testing.T) { RemoteAddr: "10.0.0.1", Body: "This is the body of 00000000-0000-0000-0000-000000000001", StatusCode: 201, - Error: "", + Error: nil, }, { Component: "auditing.test", @@ -131,7 +132,7 @@ func TestAuditing_Meilisearch(t *testing.T) { RemoteAddr: "10.0.0.2", Body: "This is the body of 00000000-0000-0000-0000-000000000002", StatusCode: 0, - Error: "", + Error: nil, }, } } @@ -143,7 +144,7 @@ func TestAuditing_Meilisearch(t *testing.T) { { name: "no entries, no search results", t: func(t *testing.T, a Auditing) { - entries, err := a.Search(EntryFilter{}) + entries, err := a.Search(ctx, EntryFilter{}) require.NoError(t, err) assert.Empty(t, entries) }, @@ -158,7 +159,7 @@ func TestAuditing_Meilisearch(t *testing.T) { err = a.Flush() require.NoError(t, err) - entries, err := a.Search(EntryFilter{ + entries, err := a.Search(ctx, EntryFilter{ Body: "test", }) require.NoError(t, err) @@ -177,7 +178,7 @@ func TestAuditing_Meilisearch(t *testing.T) { err = a.Flush() require.NoError(t, err) - entries, err := a.Search(EntryFilter{}) + entries, err := a.Search(ctx, EntryFilter{}) require.NoError(t, err) assert.Len(t, entries, len(es)) @@ -187,7 +188,7 @@ func TestAuditing_Meilisearch(t *testing.T) { t.Errorf("diff (+got -want):\n %s", diff) } - entries, err = a.Search(EntryFilter{ + entries, err = a.Search(ctx, EntryFilter{ Body: "This", }) require.NoError(t, err) @@ -206,7 +207,7 @@ func TestAuditing_Meilisearch(t *testing.T) { err = a.Flush() require.NoError(t, err) - entries, err := a.Search(EntryFilter{ + entries, err := a.Search(ctx, EntryFilter{ RequestId: es[0].RequestId, }) require.NoError(t, err) @@ -234,7 +235,7 @@ func TestAuditing_Meilisearch(t *testing.T) { err = a.Flush() require.NoError(t, err) - entries, err := a.Search(EntryFilter{ + entries, err := a.Search(ctx, EntryFilter{ Phase: EntryPhaseResponse, }) require.NoError(t, err) @@ -259,7 +260,7 @@ func TestAuditing_Meilisearch(t *testing.T) { err = a.Flush() require.NoError(t, err) - entries, err := a.Search(EntryFilter{ + entries, err := a.Search(ctx, EntryFilter{ // we want to run a phrase search as otherwise we return the other entries as well // https://www.meilisearch.com/docs/reference/api/search#phrase-search-2 Body: fmt.Sprintf("%q", es[0].Body.(string)), diff --git a/auditing/timescaledb.go b/auditing/timescaledb.go index 6b19833..821273a 100644 --- a/auditing/timescaledb.go +++ b/auditing/timescaledb.go @@ -3,6 +3,8 @@ package auditing import ( "context" "database/sql" + "encoding/json" + "errors" "fmt" "log/slog" "reflect" @@ -17,22 +19,50 @@ import ( _ "github.com/lib/pq" ) -type TimescaleDbConfig struct { - Host string - Port string - DB string - User string - Password string -} - -type timescaleAuditing struct { - component string - db *sqlx.DB - log *slog.Logger +type ( + TimescaleDbConfig struct { + Host string + Port string + DB string + User string + Password string + } + + timescaleAuditing struct { + component string + db *sqlx.DB + log *slog.Logger + + cols []string + vals []any + } + + // to keep the public interface free from field tags like "db" and "json" (as these might differ for different dbs) + // we introduce an internal type. unfortunately, this requires a conversion, which takes effort to maintain + timescaleEntry struct { + Component string `db:"component"` + RequestId string `db:"rqid" json:"rqid"` + Type EntryType `db:"type"` + Timestamp time.Time `db:"timestamp"` + User string `db:"userid"` + Tenant string `db:"tenant"` + Detail EntryDetail `db:"detail"` + Phase EntryPhase `db:"phase"` + Path string `db:"path"` + ForwardedFor string `db:"forwardedfor"` + RemoteAddr string `db:"remoteaddr"` + Body any `db:"body"` + StatusCode int `db:"statuscode"` + Error string `db:"error" json:"-"` + } + + sqlCompOp string +) - cols []string - vals []any -} +const ( + equals sqlCompOp = "equals" + like sqlCompOp = "like" +) func NewTimescaleDB(c Config, tc TimescaleDbConfig) (Auditing, error) { if c.Component == "" { @@ -187,12 +217,15 @@ func (a *timescaleAuditing) Index(entry Entry) error { return err } + internalEntry, err := a.toInternal(entry) + if err != nil { + return fmt.Errorf("unable to convert audit trace to database entry: %w", err) + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - entry.Id = entry.RequestId - - _, err = a.db.NamedExecContext(ctx, q, entry) + _, err = a.db.NamedExecContext(ctx, q, internalEntry) if err != nil { return fmt.Errorf("unable to index audit trace: %w", err) } @@ -200,18 +233,11 @@ func (a *timescaleAuditing) Index(entry Entry) error { return nil } -type compOp string - -const ( - equals compOp = "equals" - like compOp = "like" -) - -func (a *timescaleAuditing) Search(filter EntryFilter) ([]Entry, error) { +func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]Entry, error) { var ( where []string values = map[string]interface{}{} - addFilter = func(field string, value any, op compOp) error { + addFilter = func(field string, value any, op sqlCompOp) error { if reflect.ValueOf(value).IsZero() { return nil } @@ -303,7 +329,7 @@ func (a *timescaleAuditing) Search(filter EntryFilter) ([]Entry, error) { return nil, err } - rows, err := a.db.NamedQueryContext(context.TODO(), q, values) // TODO: search needs a ctx! + rows, err := a.db.NamedQueryContext(ctx, q, values) if err != nil { return nil, err } @@ -312,17 +338,58 @@ func (a *timescaleAuditing) Search(filter EntryFilter) ([]Entry, error) { var entries []Entry for rows.Next() { - var e Entry + var e timescaleEntry err = rows.StructScan(&e) if err != nil { return nil, err } - e.Id = e.RequestId + entry, err := a.toExternal(e) + if err != nil { + return nil, fmt.Errorf("unable to convert entry: %w", err) + } - entries = append(entries, e) + entries = append(entries, entry) } return entries, nil } + +func (_ *timescaleAuditing) toInternal(e Entry) (*timescaleEntry, error) { + intermediate, err := json.Marshal(e) // nolint + if err != nil { + return nil, err + } + var internalEntry timescaleEntry + err = json.Unmarshal(intermediate, &internalEntry) // nolint + if err != nil { + return nil, err + } + + internalEntry.RequestId = e.RequestId + if e.Error != nil { + internalEntry.Error = e.Error.Error() + } + + return &internalEntry, nil +} + +func (_ *timescaleAuditing) toExternal(e timescaleEntry) (Entry, error) { + intermediate, err := json.Marshal(e) // nolint + if err != nil { + return Entry{}, err + } + var externalEntry Entry + err = json.Unmarshal(intermediate, &externalEntry) // nolint + if err != nil { + return Entry{}, err + } + + externalEntry.Id = e.RequestId + if e.Error != "" { + externalEntry.Error = errors.New(e.Error) + } + + return externalEntry, nil +} diff --git a/auditing/timescaledb_integration_test.go b/auditing/timescaledb_integration_test.go index 12e6635..c6a188d 100644 --- a/auditing/timescaledb_integration_test.go +++ b/auditing/timescaledb_integration_test.go @@ -20,6 +20,7 @@ import ( ) func TestAuditing_TimescaleDB(t *testing.T) { + ctx := context.Background() container, auditing := StartTimescaleDB(t, Config{ Log: slog.Default(), }) @@ -50,7 +51,7 @@ func TestAuditing_TimescaleDB(t *testing.T) { RemoteAddr: "10.0.0.0", Body: "This is the body of 00000000-0000-0000-0000-000000000000", StatusCode: 200, - Error: "", + Error: nil, }, { Component: "auditing.test", @@ -66,7 +67,7 @@ func TestAuditing_TimescaleDB(t *testing.T) { RemoteAddr: "10.0.0.1", Body: "This is the body of 00000000-0000-0000-0000-000000000001", StatusCode: 201, - Error: "", + Error: nil, }, { Component: "auditing.test", @@ -82,7 +83,7 @@ func TestAuditing_TimescaleDB(t *testing.T) { RemoteAddr: "10.0.0.2", Body: "This is the body of 00000000-0000-0000-0000-000000000002", StatusCode: 0, - Error: "", + Error: nil, }, } } @@ -94,7 +95,7 @@ func TestAuditing_TimescaleDB(t *testing.T) { { name: "no entries, no search results", t: func(t *testing.T, a Auditing) { - entries, err := a.Search(EntryFilter{}) + entries, err := a.Search(ctx, EntryFilter{}) require.NoError(t, err) assert.Empty(t, entries) }, @@ -109,7 +110,7 @@ func TestAuditing_TimescaleDB(t *testing.T) { err = a.Flush() require.NoError(t, err) - entries, err := a.Search(EntryFilter{ + entries, err := a.Search(ctx, EntryFilter{ Body: "test", }) require.NoError(t, err) @@ -128,7 +129,7 @@ func TestAuditing_TimescaleDB(t *testing.T) { err := a.Flush() require.NoError(t, err) - entries, err := a.Search(EntryFilter{}) + entries, err := a.Search(ctx, EntryFilter{}) require.NoError(t, err) assert.Len(t, entries, len(es)) @@ -138,7 +139,7 @@ func TestAuditing_TimescaleDB(t *testing.T) { t.Errorf("diff (+got -want):\n %s", diff) } - entries, err = a.Search(EntryFilter{ + entries, err = a.Search(ctx, EntryFilter{ Body: "This", }) require.NoError(t, err) @@ -157,7 +158,7 @@ func TestAuditing_TimescaleDB(t *testing.T) { err := a.Flush() require.NoError(t, err) - entries, err := a.Search(EntryFilter{ + entries, err := a.Search(ctx, EntryFilter{ RequestId: es[0].RequestId, }) require.NoError(t, err) @@ -185,7 +186,7 @@ func TestAuditing_TimescaleDB(t *testing.T) { err := a.Flush() require.NoError(t, err) - entries, err := a.Search(EntryFilter{ + entries, err := a.Search(ctx, EntryFilter{ Phase: EntryPhaseResponse, }) require.NoError(t, err) @@ -210,7 +211,7 @@ func TestAuditing_TimescaleDB(t *testing.T) { // err := a.Flush() // require.NoError(t, err) - // entries, err := a.Search(EntryFilter{ + // entries, err := a.Search(ctx, EntryFilter{ // Body: fmt.Sprintf("%q", es[0].Body.(string)), // }) // require.NoError(t, err) From b0c1e61329518aaa93f6ef0fd9affb2e729c41ec Mon Sep 17 00:00:00 2001 From: Gerrit Date: Fri, 23 Aug 2024 13:52:08 +0200 Subject: [PATCH 03/13] Linter. --- pkg/ssh/ssh.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ssh/ssh.go b/pkg/ssh/ssh.go index 235f790..04017bf 100644 --- a/pkg/ssh/ssh.go +++ b/pkg/ssh/ssh.go @@ -125,7 +125,7 @@ func (c *Client) Connect(env *Env) error { ssh.TTY_OP_OSPEED: 115200, // output speed = 14.4kbaud } - fileDescriptor := int(os.Stdin.Fd()) + fileDescriptor := int(os.Stdin.Fd()) //nolint if term.IsTerminal(fileDescriptor) { originalState, err := term.MakeRaw(fileDescriptor) From 402c22c489da1594a3f9d8218ccede3a88408467 Mon Sep 17 00:00:00 2001 From: Gerrit Date: Fri, 23 Aug 2024 14:44:03 +0200 Subject: [PATCH 04/13] Add retention policy. --- auditing/timescaledb.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/auditing/timescaledb.go b/auditing/timescaledb.go index 821273a..88e6884 100644 --- a/auditing/timescaledb.go +++ b/auditing/timescaledb.go @@ -26,12 +26,17 @@ type ( DB string User string Password string + + // Retention defines when audit traces will be thrown away, only settable on initial database usage + // If this needs to be changed over time, you need to do this manually. Defaults to '14 days'. + Retention string } timescaleAuditing struct { component string db *sqlx.DB log *slog.Logger + retention string cols []string vals []any @@ -83,6 +88,9 @@ func NewTimescaleDB(c Config, tc TimescaleDbConfig) (Auditing, error) { if tc.User == "" { tc.User = "postgres" } + if tc.Retention == "" { + tc.Retention = "14 days" + } source := fmt.Sprintf("host=%s port=%s user=%s dbname=%s password=%s sslmode=disable", tc.Host, tc.Port, tc.User, tc.DB, tc.Password) @@ -95,6 +103,7 @@ func NewTimescaleDB(c Config, tc TimescaleDbConfig) (Auditing, error) { component: c.Component, log: c.Log.WithGroup("auditing"), db: db, + retention: tc.Retention, } err = a.initialize() @@ -133,6 +142,7 @@ func (a *timescaleAuditing) initialize() error { ); SELECT create_hypertable('traces', 'timestamp', chunk_time_interval => INTERVAL '1 days', if_not_exists => TRUE); + ALTER TABLE traces SET ( timescaledb.compress, timescaledb.compress_segmentby = 'rqid', @@ -140,12 +150,20 @@ func (a *timescaleAuditing) initialize() error { timescaledb.compress_chunk_time_interval = '7 days' ); ` + // TODO: evaluate what is needed // CREATE INDEX IF NOT EXISTS traces_idx ON traces(); if _, err := tx.Exec(schema); err != nil { return err } + + retention := `SELECT add_retention_policy('traces', $1::interval);` + + if _, err := tx.Exec(retention, a.retention); err != nil { + return err + } + return nil }, } From 3e1110e36514db051b8679b2dc0995ebf1432d55 Mon Sep 17 00:00:00 2001 From: Gerrit Date: Fri, 23 Aug 2024 15:07:20 +0200 Subject: [PATCH 05/13] Make other more settings configurable. --- auditing/timescaledb.go | 116 +++++++++++++++++++++++++--------------- 1 file changed, 72 insertions(+), 44 deletions(-) diff --git a/auditing/timescaledb.go b/auditing/timescaledb.go index 88e6884..93660b4 100644 --- a/auditing/timescaledb.go +++ b/auditing/timescaledb.go @@ -30,13 +30,20 @@ type ( // Retention defines when audit traces will be thrown away, only settable on initial database usage // If this needs to be changed over time, you need to do this manually. Defaults to '14 days'. Retention string + // CompressionInterval defines after which period audit traces will be compressed, only settable on initial database usage. + // If this needs to be changed over time, you need to do this manually. Defaults to '7 days'. + CompressionInterval string + // ChunkInterval defines after which period audit traces will be stored in a new chunk table, only settable on initial database usage. + // If this needs to be changed over time, you need to do this manually. Defaults to '1 days'. + ChunkInterval string } timescaleAuditing struct { component string db *sqlx.DB log *slog.Logger - retention string + + config *TimescaleDbConfig cols []string vals []any @@ -91,6 +98,12 @@ func NewTimescaleDB(c Config, tc TimescaleDbConfig) (Auditing, error) { if tc.Retention == "" { tc.Retention = "14 days" } + if tc.ChunkInterval == "" { + tc.ChunkInterval = "1 days" + } + if tc.CompressionInterval == "" { + tc.CompressionInterval = "7 days" + } source := fmt.Sprintf("host=%s port=%s user=%s dbname=%s password=%s sslmode=disable", tc.Host, tc.Port, tc.User, tc.DB, tc.Password) @@ -103,7 +116,7 @@ func NewTimescaleDB(c Config, tc TimescaleDbConfig) (Auditing, error) { component: c.Component, log: c.Log.WithGroup("auditing"), db: db, - retention: tc.Retention, + config: &tc, } err = a.initialize() @@ -117,51 +130,66 @@ func NewTimescaleDB(c Config, tc TimescaleDbConfig) (Auditing, error) { } func (a *timescaleAuditing) initialize() error { + type txStatement struct { + query string + args []any + } + initialSchema := &migrator.Migration{ Name: "Initial database schema", Func: func(tx *sql.Tx) error { - schema := ` - CREATE EXTENSION IF NOT EXISTS timescaledb; - CREATE EXTENSION IF NOT EXISTS pg_stat_statements; - - CREATE TABLE IF NOT EXISTS traces ( - timestamp timestamp NOT NULL, - rqid text NOT NULL, - component text NOT NULL, - type text NOT NULL, - body text NOT NULL, - error text NOT NULL, - statuscode int NOT NULL, - remoteaddr text NOT NULL, - forwardedfor text NOT NULL, - path text NOT NULL, - phase text NOT NULL, - detail text NOT NULL, - tenant text NOT NULL, - userid text NOT NULL - ); - - SELECT create_hypertable('traces', 'timestamp', chunk_time_interval => INTERVAL '1 days', if_not_exists => TRUE); - - ALTER TABLE traces SET ( - timescaledb.compress, - timescaledb.compress_segmentby = 'rqid', - timescaledb.compress_orderby = 'timestamp', - timescaledb.compress_chunk_time_interval = '7 days' - ); - ` - - // TODO: evaluate what is needed - // CREATE INDEX IF NOT EXISTS traces_idx ON traces(); - - if _, err := tx.Exec(schema); err != nil { - return err - } - - retention := `SELECT add_retention_policy('traces', $1::interval);` - - if _, err := tx.Exec(retention, a.retention); err != nil { - return err + for _, stmt := range []txStatement{ + { + query: `CREATE EXTENSION IF NOT EXISTS timescaledb`, + }, + { + query: `CREATE EXTENSION IF NOT EXISTS pg_stat_statements`, + }, + { + query: `CREATE TABLE IF NOT EXISTS traces ( + timestamp timestamp NOT NULL, + rqid text NOT NULL, + component text NOT NULL, + type text NOT NULL, + body text NOT NULL, + error text NOT NULL, + statuscode int NOT NULL, + remoteaddr text NOT NULL, + forwardedfor text NOT NULL, + path text NOT NULL, + phase text NOT NULL, + detail text NOT NULL, + tenant text NOT NULL, + userid text NOT NULL + )`, + }, + { + query: `SELECT create_hypertable('traces', 'timestamp', chunk_time_interval => $1::interval, if_not_exists => TRUE)`, + args: []any{a.config.ChunkInterval}, + }, + { + query: `ALTER TABLE traces SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'tenant', + timescaledb.compress_orderby = 'timestamp' + )`, + }, + { + query: `SELECT add_compression_policy('traces', $1::interval)`, + args: []any{a.config.CompressionInterval}, + }, + // TODO: evaluate what is needed + // { + // query: `CREATE INDEX IF NOT EXISTS traces_idx ON traces()`, + // }, + { + query: `SELECT add_retention_policy('traces', $1::interval)`, + args: []any{a.config.Retention}, + }, + } { + if _, err := tx.Exec(stmt.query, stmt.args...); err != nil { + return err + } } return nil From 14976a0544095a6c81d6033c4bc9ed9550794a09 Mon Sep 17 00:00:00 2001 From: Gerrit Date: Mon, 26 Aug 2024 13:23:12 +0200 Subject: [PATCH 06/13] Use JSONB. --- auditing/auditing.go | 30 ++--- auditing/timescaledb.go | 161 +++++------------------ auditing/timescaledb_integration_test.go | 4 +- 3 files changed, 49 insertions(+), 146 deletions(-) diff --git a/auditing/auditing.go b/auditing/auditing.go index b7238ec..3898bf2 100644 --- a/auditing/auditing.go +++ b/auditing/auditing.go @@ -50,31 +50,31 @@ const ( const EntryFilterDefaultLimit int64 = 100 type Entry struct { - Id string // filled by the auditing driver - Component string - RequestId string `json:"rqid"` - Type EntryType - Timestamp time.Time + Id string `json:"-"` // filled by the auditing driver + Component string `json:"component"` + RequestId string `json:"rqid"` + Type EntryType `json:"type"` + Timestamp time.Time `json:"timestamp"` - User string - Tenant string + User string `json:"user"` + Tenant string `json:"tenant"` // For `EntryDetailHTTP` the HTTP method get, post, put, delete, ... // For `EntryDetailGRPC` unary, stream - Detail EntryDetail + Detail EntryDetail `json:"detail"` // e.g. Request, Response, Error, Opened, Close - Phase EntryPhase + Phase EntryPhase `json:"phase"` // For `EntryDetailHTTP` /api/v1/... // For `EntryDetailGRPC` /api.v1/... (the method name) - Path string - ForwardedFor string - RemoteAddr string + Path string `json:"path"` + ForwardedFor string `json:"forwardedfor"` + RemoteAddr string `json:"remoteaddr"` - Body any // JSON, string or numbers - StatusCode int // for `EntryDetailHTTP` the HTTP status code, for EntryDetailGRPC` the grpc status code + Body any `json:"body"` // JSON, string or numbers + StatusCode int `json:"statuscode"` // for `EntryDetailHTTP` the HTTP status code, for EntryDetailGRPC` the grpc status code // Internal errors - Error error + Error error `json:"error"` } func (e *Entry) prepareForNextPhase() { diff --git a/auditing/timescaledb.go b/auditing/timescaledb.go index 93660b4..58fba8a 100644 --- a/auditing/timescaledb.go +++ b/auditing/timescaledb.go @@ -4,11 +4,9 @@ import ( "context" "database/sql" "encoding/json" - "errors" "fmt" "log/slog" "reflect" - "slices" "strings" "time" @@ -44,28 +42,11 @@ type ( log *slog.Logger config *TimescaleDbConfig + } - cols []string - vals []any - } - - // to keep the public interface free from field tags like "db" and "json" (as these might differ for different dbs) - // we introduce an internal type. unfortunately, this requires a conversion, which takes effort to maintain - timescaleEntry struct { - Component string `db:"component"` - RequestId string `db:"rqid" json:"rqid"` - Type EntryType `db:"type"` - Timestamp time.Time `db:"timestamp"` - User string `db:"userid"` - Tenant string `db:"tenant"` - Detail EntryDetail `db:"detail"` - Phase EntryPhase `db:"phase"` - Path string `db:"path"` - ForwardedFor string `db:"forwardedfor"` - RemoteAddr string `db:"remoteaddr"` - Body any `db:"body"` - StatusCode int `db:"statuscode"` - Error string `db:"error" json:"-"` + timescaledbRow struct { + Timestamp time.Time `db:"timestamp"` + Entry []byte `db:"entry"` } sqlCompOp string @@ -148,19 +129,7 @@ func (a *timescaleAuditing) initialize() error { { query: `CREATE TABLE IF NOT EXISTS traces ( timestamp timestamp NOT NULL, - rqid text NOT NULL, - component text NOT NULL, - type text NOT NULL, - body text NOT NULL, - error text NOT NULL, - statuscode int NOT NULL, - remoteaddr text NOT NULL, - forwardedfor text NOT NULL, - path text NOT NULL, - phase text NOT NULL, - detail text NOT NULL, - tenant text NOT NULL, - userid text NOT NULL + entry jsonb NOT NULL )`, }, { @@ -170,7 +139,6 @@ func (a *timescaleAuditing) initialize() error { { query: `ALTER TABLE traces SET ( timescaledb.compress, - timescaledb.compress_segmentby = 'tenant', timescaledb.compress_orderby = 'timestamp' )`, }, @@ -178,10 +146,9 @@ func (a *timescaleAuditing) initialize() error { query: `SELECT add_compression_policy('traces', $1::interval)`, args: []any{a.config.CompressionInterval}, }, - // TODO: evaluate what is needed - // { - // query: `CREATE INDEX IF NOT EXISTS traces_idx ON traces()`, - // }, + { + query: `CREATE INDEX IF NOT EXISTS traces_gin_idx ON traces USING GIN (entry)`, + }, { query: `SELECT add_retention_policy('traces', $1::interval)`, args: []any{a.config.Retention}, @@ -216,36 +183,6 @@ func (a *timescaleAuditing) initialize() error { return err } - q, _, err := sq. - Select("column_name"). - From("information_schema.columns"). - Where("table_name='traces'"). - ToSql() - if err != nil { - return err - } - - rows, err := a.db.Query(q) - if err != nil { - return err - } - defer rows.Close() - if rows.Err() != nil { - return rows.Err() - } - - for rows.Next() { - var col string - - err = rows.Scan(&col) - if err != nil { - return err - } - - a.cols = append(a.cols, col) - a.vals = append(a.vals, sq.Expr(":"+col)) - } - return nil } @@ -256,22 +193,27 @@ func (a *timescaleAuditing) Flush() error { func (a *timescaleAuditing) Index(entry Entry) error { q, _, err := sq. Insert("traces"). - Columns(a.cols...). - Values(a.vals...). + Columns("timestamp", "entry"). + Values(sq.Expr(":timestamp"), sq.Expr(":entry")). ToSql() if err != nil { return err } - internalEntry, err := a.toInternal(entry) + e, err := json.Marshal(entry) if err != nil { - return fmt.Errorf("unable to convert audit trace to database entry: %w", err) + return fmt.Errorf("error marshaling entry: %w", err) + } + + row := timescaledbRow{ + Timestamp: entry.Timestamp, + Entry: e, } ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - _, err = a.db.NamedExecContext(ctx, q, internalEntry) + _, err = a.db.NamedExecContext(ctx, q, row) if err != nil { return fmt.Errorf("unable to index audit trace: %w", err) } @@ -288,17 +230,13 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E return nil } - if !slices.Contains(a.cols, field) { - return fmt.Errorf("unable to filter for %q, no such table column", field) - } - values[field] = value switch op { case equals: - where = append(where, fmt.Sprintf("%s=:%s", field, field)) + where = append(where, fmt.Sprintf("entry ->> '%s'=:%s", field, field)) case like: - where = append(where, fmt.Sprintf("%s like '%%' || %s || '%%'", field, field)) + where = append(where, fmt.Sprintf("entry ->> '%s' like '%%' || :%s || '%%'", field, field)) default: return fmt.Errorf("comp op not known") } @@ -347,13 +285,6 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E return nil, err } - query := sq. - Select(a.cols...). - From("traces"). - Columns(a.cols...). - Where(strings.Join(where, " AND ")). - OrderBy("timestamp ASC") - // to make queries more efficient for timescaledb, we always provide from if filter.From.IsZero() { filter.From = time.Now().Add(-24 * time.Hour) @@ -366,6 +297,13 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E values["to"] = filter.To where = append(where, "timestamp <= :to") } + + query := sq. + Select("timestamp", "entry"). + From("traces"). + Where(strings.Join(where, " AND ")). + OrderBy("timestamp ASC") + if filter.Limit != 0 { query.Limit(uint64(filter.Limit)) } @@ -384,16 +322,17 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E var entries []Entry for rows.Next() { - var e timescaleEntry + var e timescaledbRow err = rows.StructScan(&e) if err != nil { return nil, err } - entry, err := a.toExternal(e) + var entry Entry + err = json.Unmarshal(e.Entry, &entry) if err != nil { - return nil, fmt.Errorf("unable to convert entry: %w", err) + return nil, fmt.Errorf("error unmarshaling entry: %w", err) } entries = append(entries, entry) @@ -401,41 +340,3 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E return entries, nil } - -func (_ *timescaleAuditing) toInternal(e Entry) (*timescaleEntry, error) { - intermediate, err := json.Marshal(e) // nolint - if err != nil { - return nil, err - } - var internalEntry timescaleEntry - err = json.Unmarshal(intermediate, &internalEntry) // nolint - if err != nil { - return nil, err - } - - internalEntry.RequestId = e.RequestId - if e.Error != nil { - internalEntry.Error = e.Error.Error() - } - - return &internalEntry, nil -} - -func (_ *timescaleAuditing) toExternal(e timescaleEntry) (Entry, error) { - intermediate, err := json.Marshal(e) // nolint - if err != nil { - return Entry{}, err - } - var externalEntry Entry - err = json.Unmarshal(intermediate, &externalEntry) // nolint - if err != nil { - return Entry{}, err - } - - externalEntry.Id = e.RequestId - if e.Error != "" { - externalEntry.Error = errors.New(e.Error) - } - - return externalEntry, nil -} diff --git a/auditing/timescaledb_integration_test.go b/auditing/timescaledb_integration_test.go index c6a188d..bdf6c39 100644 --- a/auditing/timescaledb_integration_test.go +++ b/auditing/timescaledb_integration_test.go @@ -104,7 +104,8 @@ func TestAuditing_TimescaleDB(t *testing.T) { name: "insert one entry", t: func(t *testing.T, a Auditing) { err := a.Index(Entry{ - Body: "test", + Timestamp: now, + Body: "test", }) require.NoError(t, err) err = a.Flush() @@ -142,6 +143,7 @@ func TestAuditing_TimescaleDB(t *testing.T) { entries, err = a.Search(ctx, EntryFilter{ Body: "This", }) + require.NoError(t, err) assert.Len(t, entries, len(es)) }, From 87fddec3439a97b51b5d99c7ccf1b4f0461e5d96 Mon Sep 17 00:00:00 2001 From: Gerrit Date: Mon, 26 Aug 2024 14:06:03 +0200 Subject: [PATCH 07/13] Set timestamp. --- auditing/auditing-interceptor.go | 7 +++++++ auditing/timescaledb.go | 5 +++++ 2 files changed, 12 insertions(+) diff --git a/auditing/auditing-interceptor.go b/auditing/auditing-interceptor.go index 1e2d979..5212e0c 100644 --- a/auditing/auditing-interceptor.go +++ b/auditing/auditing-interceptor.go @@ -8,6 +8,7 @@ import ( "io" "log/slog" "net/http" + "time" "connectrpc.com/connect" "github.com/emicklei/go-restful/v3" @@ -45,6 +46,7 @@ func UnaryServerInterceptor(a Auditing, logger *slog.Logger, shouldAudit func(fu childCtx := context.WithValue(ctx, rest.RequestIDKey, requestID) auditReqContext := Entry{ + Timestamp: time.Now(), RequestId: requestID, Type: EntryTypeGRPC, Detail: EntryDetailGRPCUnary, @@ -106,6 +108,7 @@ func StreamServerInterceptor(a Auditing, logger *slog.Logger, shouldAudit func(f } auditReqContext := Entry{ + Timestamp: time.Now(), RequestId: requestID, Detail: EntryDetailGRPCStream, Path: info.FullMethod, @@ -166,6 +169,7 @@ func (a auditingConnectInterceptor) WrapStreamingClient(next connect.StreamingCl childCtx := context.WithValue(ctx, rest.RequestIDKey, requestID) auditReqContext := Entry{ + Timestamp: time.Now(), RequestId: requestID, Detail: EntryDetailGRPCStream, Path: s.Procedure, @@ -215,6 +219,7 @@ func (a auditingConnectInterceptor) WrapStreamingHandler(next connect.StreamingH childCtx := context.WithValue(ctx, rest.RequestIDKey, requestID) auditReqContext := Entry{ + Timestamp: time.Now(), RequestId: requestID, Detail: EntryDetailGRPCStream, Path: shc.Spec().Procedure, @@ -278,6 +283,7 @@ func (i auditingConnectInterceptor) WrapUnary(next connect.UnaryFunc) connect.Un childCtx := context.WithValue(ctx, rest.RequestIDKey, requestID) auditReqContext := Entry{ + Timestamp: time.Now(), RequestId: requestID, Detail: EntryDetailGRPCUnary, Path: ar.Spec().Procedure, @@ -381,6 +387,7 @@ func HttpFilter(a Auditing, logger *slog.Logger) (restful.FilterFunction, error) requestID = uuid.NewString() } auditReqContext := Entry{ + Timestamp: time.Now(), RequestId: requestID, Type: EntryTypeHTTP, Detail: EntryDetail(r.Method), diff --git a/auditing/timescaledb.go b/auditing/timescaledb.go index 58fba8a..2a9da55 100644 --- a/auditing/timescaledb.go +++ b/auditing/timescaledb.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "encoding/json" + "errors" "fmt" "log/slog" "reflect" @@ -191,6 +192,10 @@ func (a *timescaleAuditing) Flush() error { } func (a *timescaleAuditing) Index(entry Entry) error { + if entry.Timestamp.IsZero() { + return errors.New("timestamp is not set") + } + q, _, err := sq. Insert("traces"). Columns("timestamp", "entry"). From ee991380a3d1712036361581f09566364c5227c9 Mon Sep 17 00:00:00 2001 From: Gerrit Date: Mon, 26 Aug 2024 14:33:10 +0200 Subject: [PATCH 08/13] Squirrel is dead. --- auditing/timescaledb.go | 27 +++++++-------------------- go.mod | 3 --- go.sum | 7 ------- 3 files changed, 7 insertions(+), 30 deletions(-) diff --git a/auditing/timescaledb.go b/auditing/timescaledb.go index 2a9da55..d539f41 100644 --- a/auditing/timescaledb.go +++ b/auditing/timescaledb.go @@ -11,7 +11,6 @@ import ( "strings" "time" - sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" "github.com/lopezator/migrator" @@ -196,14 +195,7 @@ func (a *timescaleAuditing) Index(entry Entry) error { return errors.New("timestamp is not set") } - q, _, err := sq. - Insert("traces"). - Columns("timestamp", "entry"). - Values(sq.Expr(":timestamp"), sq.Expr(":entry")). - ToSql() - if err != nil { - return err - } + q := "INSERT INTO traces (timestamp, entry) VALUES (:timestamp, :entry)" e, err := json.Marshal(entry) if err != nil { @@ -303,19 +295,14 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E where = append(where, "timestamp <= :to") } - query := sq. - Select("timestamp", "entry"). - From("traces"). - Where(strings.Join(where, " AND ")). - OrderBy("timestamp ASC") - - if filter.Limit != 0 { - query.Limit(uint64(filter.Limit)) + q := "SELECT timestamp,entry FROM traces" + if len(where) > 0 { + q += " WHERE " + strings.Join(where, " AND ") } + q += " ORDER BY timestamp ASC" - q, _, err := query.ToSql() - if err != nil { - return nil, err + if filter.Limit != 0 { + q += fmt.Sprintf(" LIMIT %d", filter.Limit) } rows, err := a.db.NamedQueryContext(ctx, q, values) diff --git a/go.mod b/go.mod index bfb3d78..4d71019 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.22 require ( connectrpc.com/connect v1.16.2 github.com/Masterminds/semver/v3 v3.2.1 - github.com/Masterminds/squirrel v1.5.4 github.com/avast/retry-go/v4 v4.6.0 github.com/coreos/go-oidc/v3 v3.11.0 github.com/emicklei/go-restful-openapi/v2 v2.10.2 @@ -119,8 +118,6 @@ require ( github.com/julienschmidt/httprouter v1.3.0 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/kortschak/wol v0.0.0-20200729010619-da482cc4850a // indirect - github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect - github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/lestrrat-go/blackmagic v1.0.2 // indirect github.com/lestrrat-go/httpcc v1.0.1 // indirect github.com/lestrrat-go/httprc v1.0.5 // indirect diff --git a/go.sum b/go.sum index dbb38a2..df39df8 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,6 @@ github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8 github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0= github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ= -github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM= -github.com/Masterminds/squirrel v1.5.4/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10= github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/Microsoft/hcsshim v0.11.7 h1:vl/nj3Bar/CvJSYo7gIQPyRWc9f3c6IeSNavBTSZNZQ= @@ -259,10 +257,6 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw= -github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o= -github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk= -github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw= github.com/lestrrat-go/blackmagic v1.0.2 h1:Cg2gVSc9h7sz9NOByczrbUvLopQmXrfFx//N+AkAr5k= github.com/lestrrat-go/blackmagic v1.0.2/go.mod h1:UrEqBzIR2U6CnzVyUtfM6oZNMt/7O7Vohk2J0OGSAtU= github.com/lestrrat-go/httpcc v1.0.1 h1:ydWCStUeJLkpYyjLDHihupbn2tYmZ7m22BGkcvZZrIE= @@ -404,7 +398,6 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= -github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= From 6443e77f9f1a0b4a9d4d8679e4226455627f9e4b Mon Sep 17 00:00:00 2001 From: Gerrit Date: Mon, 26 Aug 2024 14:42:31 +0200 Subject: [PATCH 09/13] Use UUID V7. --- auditing/auditing-interceptor.go | 39 +++++++++++++++++++++++++++----- 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/auditing/auditing-interceptor.go b/auditing/auditing-interceptor.go index 5212e0c..1756e76 100644 --- a/auditing/auditing-interceptor.go +++ b/auditing/auditing-interceptor.go @@ -40,7 +40,11 @@ func UnaryServerInterceptor(a Auditing, logger *slog.Logger, shouldAudit func(fu requestID = str } if requestID == "" { - requestID = uuid.NewString() + uuid, err := uuid.NewV7() + if err != nil { + return nil, err + } + requestID = uuid.String() } childCtx := context.WithValue(ctx, rest.RequestIDKey, requestID) @@ -99,7 +103,11 @@ func StreamServerInterceptor(a Auditing, logger *slog.Logger, shouldAudit func(f requestID = str } if requestID == "" { - requestID = uuid.NewString() + uuid, err := uuid.NewV7() + if err != nil { + return err + } + requestID = uuid.String() } childCtx := context.WithValue(ss.Context(), rest.RequestIDKey, requestID) childSS := grpcServerStreamWithContext{ @@ -164,7 +172,11 @@ func (a auditingConnectInterceptor) WrapStreamingClient(next connect.StreamingCl requestID = str } if requestID == "" { - requestID = uuid.NewString() + uuid, err := uuid.NewV7() + if err != nil { + a.logger.Error("unable to generate uuid", "error", err) + } + requestID = uuid.String() } childCtx := context.WithValue(ctx, rest.RequestIDKey, requestID) @@ -214,7 +226,11 @@ func (a auditingConnectInterceptor) WrapStreamingHandler(next connect.StreamingH requestID = str } if requestID == "" { - requestID = uuid.NewString() + uuid, err := uuid.NewV7() + if err != nil { + return err + } + requestID = uuid.String() } childCtx := context.WithValue(ctx, rest.RequestIDKey, requestID) @@ -278,7 +294,11 @@ func (i auditingConnectInterceptor) WrapUnary(next connect.UnaryFunc) connect.Un requestID = str } if requestID == "" { - requestID = uuid.NewString() + uuid, err := uuid.NewV7() + if err != nil { + return nil, err + } + requestID = uuid.String() } childCtx := context.WithValue(ctx, rest.RequestIDKey, requestID) @@ -384,7 +404,14 @@ func HttpFilter(a Auditing, logger *slog.Logger) (restful.FilterFunction, error) requestID = str } if requestID == "" { - requestID = uuid.NewString() + uuid, err := uuid.NewV7() + if err != nil { + logger.Error("unable to generate uuid", "error", err) + _, _ = response.Write([]byte("unable to generate request uuid " + err.Error())) + response.WriteHeader(http.StatusInternalServerError) + return + } + requestID = uuid.String() } auditReqContext := Entry{ Timestamp: time.Now(), From bdb6b502c519b16047d02f42b2dd5fbeed6f36c3 Mon Sep 17 00:00:00 2001 From: Gerrit Date: Mon, 26 Aug 2024 15:36:50 +0200 Subject: [PATCH 10/13] More. --- auditing/timescaledb_integration_test.go | 46 ++++++++++++------------ rest/middleware.go | 8 ++++- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/auditing/timescaledb_integration_test.go b/auditing/timescaledb_integration_test.go index bdf6c39..896df5b 100644 --- a/auditing/timescaledb_integration_test.go +++ b/auditing/timescaledb_integration_test.go @@ -201,29 +201,29 @@ func TestAuditing_TimescaleDB(t *testing.T) { } }, }, - // { - // name: "filter on body", - // t: func(t *testing.T, a Auditing) { - // es := testEntries() - // for _, e := range es { - // err := a.Index(e) - // require.NoError(t, err) - // } - - // err := a.Flush() - // require.NoError(t, err) - - // entries, err := a.Search(ctx, EntryFilter{ - // Body: fmt.Sprintf("%q", es[0].Body.(string)), - // }) - // require.NoError(t, err) - // require.Len(t, entries, 1) - - // if diff := cmp.Diff(entries[0], es[0]); diff != "" { - // t.Errorf("diff (+got -want):\n %s", diff) - // } - // }, - // }, + { + name: "filter on body", + t: func(t *testing.T, a Auditing) { + es := testEntries() + for _, e := range es { + err := a.Index(e) + require.NoError(t, err) + } + + err := a.Flush() + require.NoError(t, err) + + entries, err := a.Search(ctx, EntryFilter{ + Body: fmt.Sprintf("%s", es[0].Body.(string)), + }) + require.NoError(t, err) + require.Len(t, entries, 1) + + if diff := cmp.Diff(entries[0], es[0]); diff != "" { + t.Errorf("diff (+got -want):\n %s", diff) + } + }, + }, } for i, tt := range tests { tt := tt diff --git a/rest/middleware.go b/rest/middleware.go index 54478ad..fcdcbcc 100644 --- a/rest/middleware.go +++ b/rest/middleware.go @@ -56,7 +56,13 @@ func RequestLoggerFilter(logger *slog.Logger) restful.FilterFunction { requestID := req.HeaderParameter("X-Request-Id") if requestID == "" { - requestID = uuid.NewString() + uuid, err := uuid.NewV7() + if err != nil { + _, _ = resp.Write([]byte("unable to generate request uuid " + err.Error())) + resp.WriteHeader(http.StatusInternalServerError) + return + } + requestID = uuid.String() } fields := []any{ From 1f5a17fe4b0a56424414f76bf81b1b0d21dc06a5 Mon Sep 17 00:00:00 2001 From: Gerrit Date: Mon, 26 Aug 2024 15:42:11 +0200 Subject: [PATCH 11/13] Fix user filter. --- auditing/timescaledb.go | 2 +- auditing/timescaledb_integration_test.go | 38 ++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/auditing/timescaledb.go b/auditing/timescaledb.go index d539f41..b005821 100644 --- a/auditing/timescaledb.go +++ b/auditing/timescaledb.go @@ -278,7 +278,7 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E if err := addFilter("type", filter.Type, equals); err != nil { return nil, err } - if err := addFilter("userid", filter.User, equals); err != nil { + if err := addFilter("user", filter.User, equals); err != nil { return nil, err } diff --git a/auditing/timescaledb_integration_test.go b/auditing/timescaledb_integration_test.go index 896df5b..3ee7c6a 100644 --- a/auditing/timescaledb_integration_test.go +++ b/auditing/timescaledb_integration_test.go @@ -219,6 +219,44 @@ func TestAuditing_TimescaleDB(t *testing.T) { require.NoError(t, err) require.Len(t, entries, 1) + if diff := cmp.Diff(entries[0], es[0]); diff != "" { + t.Errorf("diff (+got -want):\n %s", diff) + } + }, + }, + { + name: "filter on everything", + t: func(t *testing.T, a Auditing) { + es := testEntries() + for _, e := range es { + err := a.Index(e) + require.NoError(t, err) + } + + err := a.Flush() + require.NoError(t, err) + + entries, err := a.Search(ctx, EntryFilter{ + Limit: 1, + From: now.Add(-1 * time.Minute), + To: now.Add(1 * time.Minute), + Component: "auditing.test", + RequestId: "00000000-0000-0000-0000-000000000000", + Type: "http", + User: "admin", + Tenant: "global", + Detail: "POST", + Phase: "response", + Path: "/v1/test/0", + ForwardedFor: "127.0.0.1", + RemoteAddr: "10.0.0.0", + Body: fmt.Sprintf("%s", es[0].Body.(string)), + StatusCode: 200, + Error: "", + }) + require.NoError(t, err) + require.Len(t, entries, 1) + if diff := cmp.Diff(entries[0], es[0]); diff != "" { t.Errorf("diff (+got -want):\n %s", diff) } From af14336160f6321bf786b3d74e6ebd1c89650696 Mon Sep 17 00:00:00 2001 From: Gerrit Date: Mon, 26 Aug 2024 15:43:19 +0200 Subject: [PATCH 12/13] Filter on nothing test. --- auditing/timescaledb_integration_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/auditing/timescaledb_integration_test.go b/auditing/timescaledb_integration_test.go index 3ee7c6a..63af752 100644 --- a/auditing/timescaledb_integration_test.go +++ b/auditing/timescaledb_integration_test.go @@ -262,6 +262,23 @@ func TestAuditing_TimescaleDB(t *testing.T) { } }, }, + { + name: "filter on nothing", + t: func(t *testing.T, a Auditing) { + es := testEntries() + for _, e := range es { + err := a.Index(e) + require.NoError(t, err) + } + + err := a.Flush() + require.NoError(t, err) + + entries, err := a.Search(ctx, EntryFilter{}) + require.NoError(t, err) + require.Len(t, entries, len(testEntries())) + }, + }, } for i, tt := range tests { tt := tt From 504fccc294643d9a394d2a518a87a2f2bf62229f Mon Sep 17 00:00:00 2001 From: Gerrit Date: Mon, 26 Aug 2024 15:57:12 +0200 Subject: [PATCH 13/13] Only store and query UTC timestamps. --- auditing/timescaledb.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/auditing/timescaledb.go b/auditing/timescaledb.go index b005821..0e70f9d 100644 --- a/auditing/timescaledb.go +++ b/auditing/timescaledb.go @@ -203,7 +203,7 @@ func (a *timescaleAuditing) Index(entry Entry) error { } row := timescaledbRow{ - Timestamp: entry.Timestamp, + Timestamp: entry.Timestamp.UTC(), Entry: e, } @@ -284,14 +284,14 @@ func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]E // to make queries more efficient for timescaledb, we always provide from if filter.From.IsZero() { - filter.From = time.Now().Add(-24 * time.Hour) + filter.From = time.Now().Add(-24 * time.Hour).UTC() } - values["from"] = filter.From + values["from"] = filter.From.UTC() where = append(where, "timestamp >= :from") if !filter.To.IsZero() { - values["to"] = filter.To + values["to"] = filter.To.UTC() where = append(where, "timestamp <= :to") }