diff --git a/auditing/auditing-interceptor.go b/auditing/auditing-interceptor.go index 1e2d979..1756e76 100644 --- a/auditing/auditing-interceptor.go +++ b/auditing/auditing-interceptor.go @@ -8,6 +8,7 @@ import ( "io" "log/slog" "net/http" + "time" "connectrpc.com/connect" "github.com/emicklei/go-restful/v3" @@ -39,12 +40,17 @@ func UnaryServerInterceptor(a Auditing, logger *slog.Logger, shouldAudit func(fu requestID = str } if requestID == "" { - requestID = uuid.NewString() + uuid, err := uuid.NewV7() + if err != nil { + return nil, err + } + requestID = uuid.String() } childCtx := context.WithValue(ctx, rest.RequestIDKey, requestID) auditReqContext := Entry{ + Timestamp: time.Now(), RequestId: requestID, Type: EntryTypeGRPC, Detail: EntryDetailGRPCUnary, @@ -97,7 +103,11 @@ func StreamServerInterceptor(a Auditing, logger *slog.Logger, shouldAudit func(f requestID = str } if requestID == "" { - requestID = uuid.NewString() + uuid, err := uuid.NewV7() + if err != nil { + return err + } + requestID = uuid.String() } childCtx := context.WithValue(ss.Context(), rest.RequestIDKey, requestID) childSS := grpcServerStreamWithContext{ @@ -106,6 +116,7 @@ func StreamServerInterceptor(a Auditing, logger *slog.Logger, shouldAudit func(f } auditReqContext := Entry{ + Timestamp: time.Now(), RequestId: requestID, Detail: EntryDetailGRPCStream, Path: info.FullMethod, @@ -161,11 +172,16 @@ func (a auditingConnectInterceptor) WrapStreamingClient(next connect.StreamingCl requestID = str } if requestID == "" { - requestID = uuid.NewString() + uuid, err := uuid.NewV7() + if err != nil { + a.logger.Error("unable to generate uuid", "error", err) + } + requestID = uuid.String() } childCtx := context.WithValue(ctx, rest.RequestIDKey, requestID) auditReqContext := Entry{ + Timestamp: time.Now(), RequestId: requestID, Detail: EntryDetailGRPCStream, Path: s.Procedure, @@ -210,11 +226,16 @@ func (a auditingConnectInterceptor) WrapStreamingHandler(next connect.StreamingH requestID = str } if requestID == "" { - requestID = uuid.NewString() + uuid, err := uuid.NewV7() + if err != nil { + return err + } + requestID = uuid.String() } childCtx := context.WithValue(ctx, rest.RequestIDKey, requestID) auditReqContext := Entry{ + Timestamp: time.Now(), RequestId: requestID, Detail: EntryDetailGRPCStream, Path: shc.Spec().Procedure, @@ -273,11 +294,16 @@ func (i auditingConnectInterceptor) WrapUnary(next connect.UnaryFunc) connect.Un requestID = str } if requestID == "" { - requestID = uuid.NewString() + uuid, err := uuid.NewV7() + if err != nil { + return nil, err + } + requestID = uuid.String() } childCtx := context.WithValue(ctx, rest.RequestIDKey, requestID) auditReqContext := Entry{ + Timestamp: time.Now(), RequestId: requestID, Detail: EntryDetailGRPCUnary, Path: ar.Spec().Procedure, @@ -378,9 +404,17 @@ func HttpFilter(a Auditing, logger *slog.Logger) (restful.FilterFunction, error) requestID = str } if requestID == "" { - requestID = uuid.NewString() + uuid, err := uuid.NewV7() + if err != nil { + logger.Error("unable to generate uuid", "error", err) + _, _ = response.Write([]byte("unable to generate request uuid " + err.Error())) + response.WriteHeader(http.StatusInternalServerError) + return + } + requestID = uuid.String() } auditReqContext := Entry{ + Timestamp: time.Now(), RequestId: requestID, Type: EntryTypeHTTP, Detail: EntryDetail(r.Method), diff --git a/auditing/auditing.go b/auditing/auditing.go index bfdae5f..3898bf2 100644 --- a/auditing/auditing.go +++ b/auditing/auditing.go @@ -1,18 +1,16 @@ package auditing import ( + "context" "log/slog" + "os" + "path/filepath" "time" ) type Config struct { - Component string - URL string - APIKey string - IndexPrefix string - RotationInterval Interval - Keep int64 - Log *slog.Logger + Component string + Log *slog.Logger } type Interval string @@ -52,31 +50,31 @@ const ( const EntryFilterDefaultLimit int64 = 100 type Entry struct { - Id string // filled by the auditing driver - Component string - RequestId string `json:"rqid"` - Type EntryType - Timestamp time.Time + Id string `json:"-"` // filled by the auditing driver + Component string `json:"component"` + RequestId string `json:"rqid"` + Type EntryType `json:"type"` + Timestamp time.Time `json:"timestamp"` - User string - Tenant string + User string `json:"user"` + Tenant string `json:"tenant"` // For `EntryDetailHTTP` the HTTP method get, post, put, delete, ... // For `EntryDetailGRPC` unary, stream - Detail EntryDetail + Detail EntryDetail `json:"detail"` // e.g. Request, Response, Error, Opened, Close - Phase EntryPhase + Phase EntryPhase `json:"phase"` // For `EntryDetailHTTP` /api/v1/... // For `EntryDetailGRPC` /api.v1/... (the method name) - Path string - ForwardedFor string - RemoteAddr string + Path string `json:"path"` + ForwardedFor string `json:"forwardedfor"` + RemoteAddr string `json:"remoteaddr"` - Body any // JSON, string or numbers - StatusCode int // for `EntryDetailHTTP` the HTTP status code, for EntryDetailGRPC` the grpc status code + Body any `json:"body"` // JSON, string or numbers + StatusCode int `json:"statuscode"` // for `EntryDetailHTTP` the HTTP status code, for EntryDetailGRPC` the grpc status code // Internal errors - Error error + Error error `json:"error"` } func (e *Entry) prepareForNextPhase() { @@ -135,5 +133,14 @@ type Auditing interface { // Searches for entries matching the given filter. // By default only recent entries will be returned. // The returned entries will be sorted by timestamp in descending order. - Search(EntryFilter) ([]Entry, error) + Search(context.Context, EntryFilter) ([]Entry, error) +} + +func defaultComponent() (string, error) { + ex, err := os.Executable() + if err != nil { + return "", err + } + + return filepath.Base(ex), nil } diff --git a/auditing/meilisearch.go b/auditing/meilisearch.go index ef121f5..aa2fb3b 100644 --- a/auditing/meilisearch.go +++ b/auditing/meilisearch.go @@ -5,8 +5,6 @@ import ( "errors" "fmt" "log/slog" - "os" - "path/filepath" "regexp" "slices" "strings" @@ -17,6 +15,15 @@ import ( "github.com/meilisearch/meilisearch-go" ) +type MeilisearchConfig struct { + URL string + APIKey string + + IndexPrefix string + RotationInterval Interval + Keep int64 +} + type meiliAuditing struct { component string client *meilisearch.Client @@ -39,32 +46,33 @@ const ( meiliIndexCreationWaitInterval = 100 * time.Millisecond ) -func New(c Config) (Auditing, error) { +func NewMeilisearch(c Config, mc MeilisearchConfig) (Auditing, error) { if c.Component == "" { - ex, err := os.Executable() + component, err := defaultComponent() if err != nil { return nil, err } - c.Component = filepath.Base(ex) + + c.Component = component } client := meilisearch.NewClient(meilisearch.ClientConfig{ - Host: c.URL, - APIKey: c.APIKey, + Host: mc.URL, + APIKey: mc.APIKey, }) v, err := client.GetVersion() if err != nil { - return nil, fmt.Errorf("unable to connect to meilisearch at:%s %w", c.URL, err) + return nil, fmt.Errorf("unable to connect to meilisearch at:%s %w", mc.URL, err) } - c.Log.Info("meilisearch", "connected to", v, "index rotated", c.RotationInterval, "index keep", c.Keep) + c.Log.Info("meilisearch", "connected to", v, "index rotated", mc.RotationInterval, "index keep", mc.Keep) a := &meiliAuditing{ component: c.Component, client: client, log: c.Log.WithGroup("auditing"), - indexPrefix: c.IndexPrefix, - rotationInterval: c.RotationInterval, - keep: c.Keep, + indexPrefix: mc.IndexPrefix, + rotationInterval: mc.RotationInterval, + keep: mc.Keep, } return a, nil } @@ -121,7 +129,7 @@ func (a *meiliAuditing) Index(entry Entry) error { return nil } -func (a *meiliAuditing) Search(filter EntryFilter) ([]Entry, error) { +func (a *meiliAuditing) Search(_ context.Context, filter EntryFilter) ([]Entry, error) { predicates := make([]string, 0) if filter.Component != "" { predicates = append(predicates, fmt.Sprintf("component = %q", filter.Component)) diff --git a/auditing/meilisearch_integration_test.go b/auditing/meilisearch_integration_test.go index 60e19d1..7220880 100644 --- a/auditing/meilisearch_integration_test.go +++ b/auditing/meilisearch_integration_test.go @@ -70,6 +70,7 @@ func StartMeilisearch(t testing.TB) (container testcontainers.Container, c *conn } func TestAuditing_Meilisearch(t *testing.T) { + ctx := context.Background() container, c, err := StartMeilisearch(t) require.NoError(t, err) defer func() { @@ -143,7 +144,7 @@ func TestAuditing_Meilisearch(t *testing.T) { { name: "no entries, no search results", t: func(t *testing.T, a Auditing) { - entries, err := a.Search(EntryFilter{}) + entries, err := a.Search(ctx, EntryFilter{}) require.NoError(t, err) assert.Empty(t, entries) }, @@ -158,7 +159,7 @@ func TestAuditing_Meilisearch(t *testing.T) { err = a.Flush() require.NoError(t, err) - entries, err := a.Search(EntryFilter{ + entries, err := a.Search(ctx, EntryFilter{ Body: "test", }) require.NoError(t, err) @@ -177,7 +178,7 @@ func TestAuditing_Meilisearch(t *testing.T) { err = a.Flush() require.NoError(t, err) - entries, err := a.Search(EntryFilter{}) + entries, err := a.Search(ctx, EntryFilter{}) require.NoError(t, err) assert.Len(t, entries, len(es)) @@ -187,7 +188,7 @@ func TestAuditing_Meilisearch(t *testing.T) { t.Errorf("diff (+got -want):\n %s", diff) } - entries, err = a.Search(EntryFilter{ + entries, err = a.Search(ctx, EntryFilter{ Body: "This", }) require.NoError(t, err) @@ -206,7 +207,7 @@ func TestAuditing_Meilisearch(t *testing.T) { err = a.Flush() require.NoError(t, err) - entries, err := a.Search(EntryFilter{ + entries, err := a.Search(ctx, EntryFilter{ RequestId: es[0].RequestId, }) require.NoError(t, err) @@ -234,7 +235,7 @@ func TestAuditing_Meilisearch(t *testing.T) { err = a.Flush() require.NoError(t, err) - entries, err := a.Search(EntryFilter{ + entries, err := a.Search(ctx, EntryFilter{ Phase: EntryPhaseResponse, }) require.NoError(t, err) @@ -259,7 +260,7 @@ func TestAuditing_Meilisearch(t *testing.T) { err = a.Flush() require.NoError(t, err) - entries, err := a.Search(EntryFilter{ + entries, err := a.Search(ctx, EntryFilter{ // we want to run a phrase search as otherwise we return the other entries as well // https://www.meilisearch.com/docs/reference/api/search#phrase-search-2 Body: fmt.Sprintf("%q", es[0].Body.(string)), @@ -277,10 +278,11 @@ func TestAuditing_Meilisearch(t *testing.T) { tt := tt t.Run(fmt.Sprintf("%d %s", i, tt.name), func(t *testing.T) { - a, err := New(Config{ + a, err := NewMeilisearch(Config{ + Log: slog.Default(), + }, MeilisearchConfig{ URL: c.Endpoint, APIKey: c.Password, - Log: slog.Default(), IndexPrefix: fmt.Sprintf("test-%d", i), }) require.NoError(t, err) diff --git a/auditing/timescaledb.go b/auditing/timescaledb.go new file mode 100644 index 0000000..0e70f9d --- /dev/null +++ b/auditing/timescaledb.go @@ -0,0 +1,334 @@ +package auditing + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "log/slog" + "reflect" + "strings" + "time" + + "github.com/jmoiron/sqlx" + "github.com/lopezator/migrator" + + _ "github.com/lib/pq" +) + +type ( + TimescaleDbConfig struct { + Host string + Port string + DB string + User string + Password string + + // Retention defines when audit traces will be thrown away, only settable on initial database usage + // If this needs to be changed over time, you need to do this manually. Defaults to '14 days'. + Retention string + // CompressionInterval defines after which period audit traces will be compressed, only settable on initial database usage. + // If this needs to be changed over time, you need to do this manually. Defaults to '7 days'. + CompressionInterval string + // ChunkInterval defines after which period audit traces will be stored in a new chunk table, only settable on initial database usage. + // If this needs to be changed over time, you need to do this manually. Defaults to '1 days'. + ChunkInterval string + } + + timescaleAuditing struct { + component string + db *sqlx.DB + log *slog.Logger + + config *TimescaleDbConfig + } + + timescaledbRow struct { + Timestamp time.Time `db:"timestamp"` + Entry []byte `db:"entry"` + } + + sqlCompOp string +) + +const ( + equals sqlCompOp = "equals" + like sqlCompOp = "like" +) + +func NewTimescaleDB(c Config, tc TimescaleDbConfig) (Auditing, error) { + if c.Component == "" { + component, err := defaultComponent() + if err != nil { + return nil, err + } + + c.Component = component + } + + if tc.Port == "" { + tc.Port = "5432" + } + if tc.DB == "" { + tc.DB = "postgres" + } + if tc.User == "" { + tc.User = "postgres" + } + if tc.Retention == "" { + tc.Retention = "14 days" + } + if tc.ChunkInterval == "" { + tc.ChunkInterval = "1 days" + } + if tc.CompressionInterval == "" { + tc.CompressionInterval = "7 days" + } + + source := fmt.Sprintf("host=%s port=%s user=%s dbname=%s password=%s sslmode=disable", tc.Host, tc.Port, tc.User, tc.DB, tc.Password) + + db, err := sqlx.Connect("postgres", source) + if err != nil { + return nil, fmt.Errorf("could not connect to datastore: %w", err) + } + + a := ×caleAuditing{ + component: c.Component, + log: c.Log.WithGroup("auditing"), + db: db, + config: &tc, + } + + err = a.initialize() + if err != nil { + return nil, fmt.Errorf("unable to initialize timescaledb backend: %w", err) + } + + a.log.Info("connected to timescaledb backend") + + return a, nil +} + +func (a *timescaleAuditing) initialize() error { + type txStatement struct { + query string + args []any + } + + initialSchema := &migrator.Migration{ + Name: "Initial database schema", + Func: func(tx *sql.Tx) error { + for _, stmt := range []txStatement{ + { + query: `CREATE EXTENSION IF NOT EXISTS timescaledb`, + }, + { + query: `CREATE EXTENSION IF NOT EXISTS pg_stat_statements`, + }, + { + query: `CREATE TABLE IF NOT EXISTS traces ( + timestamp timestamp NOT NULL, + entry jsonb NOT NULL + )`, + }, + { + query: `SELECT create_hypertable('traces', 'timestamp', chunk_time_interval => $1::interval, if_not_exists => TRUE)`, + args: []any{a.config.ChunkInterval}, + }, + { + query: `ALTER TABLE traces SET ( + timescaledb.compress, + timescaledb.compress_orderby = 'timestamp' + )`, + }, + { + query: `SELECT add_compression_policy('traces', $1::interval)`, + args: []any{a.config.CompressionInterval}, + }, + { + query: `CREATE INDEX IF NOT EXISTS traces_gin_idx ON traces USING GIN (entry)`, + }, + { + query: `SELECT add_retention_policy('traces', $1::interval)`, + args: []any{a.config.Retention}, + }, + } { + if _, err := tx.Exec(stmt.query, stmt.args...); err != nil { + return err + } + } + + return nil + }, + } + + a.db.SetMaxIdleConns(5) + a.db.SetConnMaxLifetime(2 * time.Minute) + a.db.SetMaxOpenConns(95) + + m, err := migrator.New( + migrator.WithLogger(migrator.LoggerFunc(func(msg string, args ...interface{}) { + a.log.Info(fmt.Sprintf(msg, args...)) + })), + migrator.Migrations( + initialSchema, + ), + ) + if err != nil { + return err + } + + if err := m.Migrate(a.db.DB); err != nil { + return err + } + + return nil +} + +func (a *timescaleAuditing) Flush() error { + return nil +} + +func (a *timescaleAuditing) Index(entry Entry) error { + if entry.Timestamp.IsZero() { + return errors.New("timestamp is not set") + } + + q := "INSERT INTO traces (timestamp, entry) VALUES (:timestamp, :entry)" + + e, err := json.Marshal(entry) + if err != nil { + return fmt.Errorf("error marshaling entry: %w", err) + } + + row := timescaledbRow{ + Timestamp: entry.Timestamp.UTC(), + Entry: e, + } + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + _, err = a.db.NamedExecContext(ctx, q, row) + if err != nil { + return fmt.Errorf("unable to index audit trace: %w", err) + } + + return nil +} + +func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]Entry, error) { + var ( + where []string + values = map[string]interface{}{} + addFilter = func(field string, value any, op sqlCompOp) error { + if reflect.ValueOf(value).IsZero() { + return nil + } + + values[field] = value + + switch op { + case equals: + where = append(where, fmt.Sprintf("entry ->> '%s'=:%s", field, field)) + case like: + where = append(where, fmt.Sprintf("entry ->> '%s' like '%%' || :%s || '%%'", field, field)) + default: + return fmt.Errorf("comp op not known") + } + + return nil + } + ) + + if err := addFilter("body", filter.Body, like); err != nil { + return nil, err + } + if err := addFilter("component", filter.Component, equals); err != nil { + return nil, err + } + if err := addFilter("detail", filter.Detail, equals); err != nil { + return nil, err + } + if err := addFilter("error", filter.Error, equals); err != nil { + return nil, err + } + if err := addFilter("forwardedfor", filter.ForwardedFor, equals); err != nil { + return nil, err + } + if err := addFilter("path", filter.Path, equals); err != nil { + return nil, err + } + if err := addFilter("phase", filter.Phase, equals); err != nil { + return nil, err + } + if err := addFilter("remoteaddr", filter.RemoteAddr, equals); err != nil { + return nil, err + } + if err := addFilter("rqid", filter.RequestId, equals); err != nil { + return nil, err + } + if err := addFilter("statuscode", filter.StatusCode, equals); err != nil { + return nil, err + } + if err := addFilter("tenant", filter.Tenant, equals); err != nil { + return nil, err + } + if err := addFilter("type", filter.Type, equals); err != nil { + return nil, err + } + if err := addFilter("user", filter.User, equals); err != nil { + return nil, err + } + + // to make queries more efficient for timescaledb, we always provide from + if filter.From.IsZero() { + filter.From = time.Now().Add(-24 * time.Hour).UTC() + } + + values["from"] = filter.From.UTC() + where = append(where, "timestamp >= :from") + + if !filter.To.IsZero() { + values["to"] = filter.To.UTC() + where = append(where, "timestamp <= :to") + } + + q := "SELECT timestamp,entry FROM traces" + if len(where) > 0 { + q += " WHERE " + strings.Join(where, " AND ") + } + q += " ORDER BY timestamp ASC" + + if filter.Limit != 0 { + q += fmt.Sprintf(" LIMIT %d", filter.Limit) + } + + rows, err := a.db.NamedQueryContext(ctx, q, values) + if err != nil { + return nil, err + } + defer rows.Close() + + var entries []Entry + + for rows.Next() { + var e timescaledbRow + + err = rows.StructScan(&e) + if err != nil { + return nil, err + } + + var entry Entry + err = json.Unmarshal(e.Entry, &entry) + if err != nil { + return nil, fmt.Errorf("error unmarshaling entry: %w", err) + } + + entries = append(entries, entry) + } + + return entries, nil +} diff --git a/auditing/timescaledb_integration_test.go b/auditing/timescaledb_integration_test.go new file mode 100644 index 0000000..63af752 --- /dev/null +++ b/auditing/timescaledb_integration_test.go @@ -0,0 +1,333 @@ +//go:build integration +// +build integration + +package auditing + +import ( + "context" + "fmt" + "log/slog" + "sort" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" +) + +func TestAuditing_TimescaleDB(t *testing.T) { + ctx := context.Background() + container, auditing := StartTimescaleDB(t, Config{ + Log: slog.Default(), + }) + defer func() { + err := container.Terminate(context.Background()) + require.NoError(t, err) + }() + + now := time.Now().UTC() + // postgres does not store the nano seconds, so we neglect them for comparison: + timeComparer := cmp.Comparer(func(x, y time.Time) bool { + return x.Unix() == y.Unix() + }) + + testEntries := func() []Entry { + return []Entry{ + { + Component: "auditing.test", + RequestId: "00000000-0000-0000-0000-000000000000", + Type: EntryTypeHTTP, + Timestamp: now, + User: "admin", + Tenant: "global", + Detail: "POST", + Phase: EntryPhaseResponse, + Path: "/v1/test/0", + ForwardedFor: "127.0.0.1", + RemoteAddr: "10.0.0.0", + Body: "This is the body of 00000000-0000-0000-0000-000000000000", + StatusCode: 200, + Error: nil, + }, + { + Component: "auditing.test", + RequestId: "00000000-0000-0000-0000-000000000001", + Type: EntryTypeHTTP, + Timestamp: now.Add(1 * time.Second), + User: "admin", + Tenant: "global", + Detail: "POST", + Phase: EntryPhaseResponse, + Path: "/v1/test/1", + ForwardedFor: "127.0.0.1", + RemoteAddr: "10.0.0.1", + Body: "This is the body of 00000000-0000-0000-0000-000000000001", + StatusCode: 201, + Error: nil, + }, + { + Component: "auditing.test", + RequestId: "00000000-0000-0000-0000-000000000002", + Type: EntryTypeHTTP, + Timestamp: now.Add(2 * time.Second), + User: "admin", + Tenant: "global", + Detail: "POST", + Phase: EntryPhaseRequest, + Path: "/v1/test/2", + ForwardedFor: "127.0.0.1", + RemoteAddr: "10.0.0.2", + Body: "This is the body of 00000000-0000-0000-0000-000000000002", + StatusCode: 0, + Error: nil, + }, + } + } + + tests := []struct { + name string + t func(t *testing.T, a Auditing) + }{ + { + name: "no entries, no search results", + t: func(t *testing.T, a Auditing) { + entries, err := a.Search(ctx, EntryFilter{}) + require.NoError(t, err) + assert.Empty(t, entries) + }, + }, + { + name: "insert one entry", + t: func(t *testing.T, a Auditing) { + err := a.Index(Entry{ + Timestamp: now, + Body: "test", + }) + require.NoError(t, err) + err = a.Flush() + require.NoError(t, err) + + entries, err := a.Search(ctx, EntryFilter{ + Body: "test", + }) + require.NoError(t, err) + assert.Len(t, entries, 1) + }, + }, + { + name: "insert a couple of entries", + t: func(t *testing.T, a Auditing) { + es := testEntries() + for _, e := range es { + err := a.Index(e) + require.NoError(t, err) + } + + err := a.Flush() + require.NoError(t, err) + + entries, err := a.Search(ctx, EntryFilter{}) + require.NoError(t, err) + assert.Len(t, entries, len(es)) + + sort.Slice(entries, func(i, j int) bool { return entries[i].RequestId < entries[j].RequestId }) + + if diff := cmp.Diff(entries, es, cmpopts.IgnoreFields(Entry{}, "Id"), timeComparer); diff != "" { + t.Errorf("diff (+got -want):\n %s", diff) + } + + entries, err = a.Search(ctx, EntryFilter{ + Body: "This", + }) + + require.NoError(t, err) + assert.Len(t, entries, len(es)) + }, + }, + { + name: "filter search on rqid", + t: func(t *testing.T, a Auditing) { + es := testEntries() + for _, e := range es { + err := a.Index(e) + require.NoError(t, err) + } + + err := a.Flush() + require.NoError(t, err) + + entries, err := a.Search(ctx, EntryFilter{ + RequestId: es[0].RequestId, + }) + require.NoError(t, err) + require.Len(t, entries, 1) + + if diff := cmp.Diff(entries[0], es[0], cmpopts.IgnoreFields(Entry{}, "Id"), timeComparer); diff != "" { + t.Errorf("diff (+got -want):\n %s", diff) + } + }, + }, + { + name: "filter search on phase", + t: func(t *testing.T, a Auditing) { + es := testEntries() + var wantEntries []Entry + for _, e := range es { + err := a.Index(e) + require.NoError(t, err) + + if e.Phase == EntryPhaseResponse { + wantEntries = append(wantEntries, e) + } + } + + err := a.Flush() + require.NoError(t, err) + + entries, err := a.Search(ctx, EntryFilter{ + Phase: EntryPhaseResponse, + }) + require.NoError(t, err) + require.Len(t, entries, 2) + + sort.Slice(entries, func(i, j int) bool { return entries[i].RequestId < entries[j].RequestId }) + + if diff := cmp.Diff(entries, wantEntries, cmpopts.IgnoreFields(Entry{}, "Id"), timeComparer); diff != "" { + t.Errorf("diff (+got -want):\n %s", diff) + } + }, + }, + { + name: "filter on body", + t: func(t *testing.T, a Auditing) { + es := testEntries() + for _, e := range es { + err := a.Index(e) + require.NoError(t, err) + } + + err := a.Flush() + require.NoError(t, err) + + entries, err := a.Search(ctx, EntryFilter{ + Body: fmt.Sprintf("%s", es[0].Body.(string)), + }) + require.NoError(t, err) + require.Len(t, entries, 1) + + if diff := cmp.Diff(entries[0], es[0]); diff != "" { + t.Errorf("diff (+got -want):\n %s", diff) + } + }, + }, + { + name: "filter on everything", + t: func(t *testing.T, a Auditing) { + es := testEntries() + for _, e := range es { + err := a.Index(e) + require.NoError(t, err) + } + + err := a.Flush() + require.NoError(t, err) + + entries, err := a.Search(ctx, EntryFilter{ + Limit: 1, + From: now.Add(-1 * time.Minute), + To: now.Add(1 * time.Minute), + Component: "auditing.test", + RequestId: "00000000-0000-0000-0000-000000000000", + Type: "http", + User: "admin", + Tenant: "global", + Detail: "POST", + Phase: "response", + Path: "/v1/test/0", + ForwardedFor: "127.0.0.1", + RemoteAddr: "10.0.0.0", + Body: fmt.Sprintf("%s", es[0].Body.(string)), + StatusCode: 200, + Error: "", + }) + require.NoError(t, err) + require.Len(t, entries, 1) + + if diff := cmp.Diff(entries[0], es[0]); diff != "" { + t.Errorf("diff (+got -want):\n %s", diff) + } + }, + }, + { + name: "filter on nothing", + t: func(t *testing.T, a Auditing) { + es := testEntries() + for _, e := range es { + err := a.Index(e) + require.NoError(t, err) + } + + err := a.Flush() + require.NoError(t, err) + + entries, err := a.Search(ctx, EntryFilter{}) + require.NoError(t, err) + require.Len(t, entries, len(testEntries())) + }, + }, + } + for i, tt := range tests { + tt := tt + + t.Run(fmt.Sprintf("%d %s", i, tt.name), func(t *testing.T) { + defer func() { + a := auditing.(*timescaleAuditing) + a.db.MustExec("DELETE FROM traces;") + }() + + tt.t(t, auditing) + }) + } +} + +func StartTimescaleDB(t testing.TB, config Config) (testcontainers.Container, Auditing) { + req := testcontainers.ContainerRequest{ + Image: "timescale/timescaledb:2.16.1-pg16", + ExposedPorts: []string{"5432/tcp"}, + Env: map[string]string{"POSTGRES_PASSWORD": "password"}, + WaitingFor: wait.ForAll( + wait.ForLog("database system is ready to accept connections"), + wait.ForListeningPort("5432/tcp"), + ), + Cmd: []string{"postgres", "-c", "max_connections=200"}, + } + + ctx := context.Background() + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + require.NoError(t, err) + + ip, err := container.Host(ctx) + require.NoError(t, err) + + port, err := container.MappedPort(ctx, "5432") + require.NoError(t, err) + + auditing, err := NewTimescaleDB(config, TimescaleDbConfig{ + Host: ip, + Port: port.Port(), + DB: "postgres", + User: "postgres", + Password: "password", + }) + require.NoError(t, err) + + return container, auditing +} diff --git a/go.mod b/go.mod index 828a309..4d71019 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,9 @@ require ( github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/icza/dyno v0.0.0-20230330125955-09f820a8d9c0 + github.com/jmoiron/sqlx v1.4.0 + github.com/lib/pq v1.10.9 + github.com/lopezator/migrator v0.3.1 github.com/mattn/go-isatty v0.0.20 github.com/meilisearch/meilisearch-go v0.27.2 github.com/metal-stack/security v0.8.1 @@ -41,8 +44,6 @@ require ( tailscale.com v1.54.0 ) -require github.com/containerd/platforms v0.2.1 // indirect - require ( dario.cat/mergo v1.0.0 // indirect filippo.io/edwards25519 v1.1.0 // indirect @@ -71,6 +72,7 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/containerd/containerd v1.7.20 // indirect github.com/containerd/log v0.1.0 // indirect + github.com/containerd/platforms v0.2.1 // indirect github.com/coreos/go-iptables v0.7.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cpuguy83/dockercfg v0.3.1 // indirect diff --git a/go.sum b/go.sum index fe80197..df39df8 100644 --- a/go.sum +++ b/go.sum @@ -154,6 +154,8 @@ github.com/go-openapi/swag v0.23.0 h1:vsEVJDUo2hPJ2tu0/Xc+4noaxyEffXNIs3cOULZ+Gr github.com/go-openapi/swag v0.23.0/go.mod h1:esZ8ITTYEsH1V2trKHjAN8Ai7xHb8RV+YSZ577vPjgQ= github.com/go-openapi/validate v0.24.0 h1:LdfDKwNbpB6Vn40xhTdNZAnfLECL81w+VX3BumrGD58= github.com/go-openapi/validate v0.24.0/go.mod h1:iyeX1sEufmv3nPbBdX3ieNviWnOZaJ1+zquzJEf2BAQ= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI= github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8= github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= @@ -206,10 +208,28 @@ github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2 github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/insomniacslk/dhcp v0.0.0-20240204152450-ca2dc33955c1 h1:L3pm9Kf2G6gJVYawz2SrI5QnV1wzHYbqmKnSHHXJAb8= github.com/insomniacslk/dhcp v0.0.0-20240204152450-ca2dc33955c1/go.mod h1:izxuNQZeFrbx2nK2fAyN5iNUB34Fe9j0nK4PwLzAkKw= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= +github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/pgconn v1.11.0 h1:HiHArx4yFbwl91X3qqIHtUFoiIfLNJXCQRsnzkiwwaQ= +github.com/jackc/pgconn v1.11.0/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3/v2 v2.2.0 h1:r7JypeP2D3onoQTCxWdTpCtJ4D+qpKr0TxvoyMhZ5ns= +github.com/jackc/pgproto3/v2 v2.2.0/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= +github.com/jackc/pgtype v1.10.0 h1:ILnBWrRMSXGczYvmkYD6PsYyVFUNLTnIUJHHDLmqk38= +github.com/jackc/pgtype v1.10.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= +github.com/jackc/pgx/v4 v4.15.0 h1:B7dTkXsdILD3MF987WGGCcg+tvLW6bZJdEcqVFeU//w= +github.com/jackc/pgx/v4 v4.15.0/go.mod h1:D/zyOyXiaM1TmVWnOM18p0xdDtdakRBa0RsVGI3U3bw= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/jmoiron/sqlx v1.4.0 h1:1PLqN7S1UYp5t4SrVVnt4nUVNemrDAtxlulVe+Qgm3o= +github.com/jmoiron/sqlx v1.4.0/go.mod h1:ZrZ7UsYB/weZdl2Bxg6jCRO9c3YHl8r3ahlKmRT4JLY= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/josharian/native v1.0.1-0.20221213033349-c1e37c09b531/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= @@ -249,6 +269,10 @@ github.com/lestrrat-go/jwx/v2 v2.1.0 h1:0zs7Ya6+39qoit7gwAf+cYm1zzgS3fceIdo7RmQ5 github.com/lestrrat-go/jwx/v2 v2.1.0/go.mod h1:Xpw9QIaUGiIUD1Wx0NcY1sIHwFf8lDuZn/cmxtXYRys= github.com/lestrrat-go/option v1.0.1 h1:oAzP2fvZGQKWkvHa1/SAcFolBEca1oN+mQ7eooNBEYU= github.com/lestrrat-go/option v1.0.1/go.mod h1:5ZHFbivi4xwXxhxY9XHDe2FHo6/Z7WWmtT7T5nBBp3I= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lopezator/migrator v0.3.1 h1:ZFPT6aC7+nGWkqhleynABZ6ftycSf6hmHHLOaryq1Og= +github.com/lopezator/migrator v0.3.1/go.mod h1:X+lHDMZ9Ci3/KdbypJcQYFFwipVrJsX4fRCQ4QLauYk= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed h1:036IscGBfJsFIgJQzlui7nK1Ncm0tp2ktmPj8xO4N/0= github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k= @@ -267,6 +291,8 @@ github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= +github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= github.com/mdlayher/genetlink v1.3.2 h1:KdrNKe+CTu+IbZnm/GVUMXSqBBLqcGpRDa0xkQy56gw= github.com/mdlayher/genetlink v1.3.2/go.mod h1:tcC3pkCrPUGIKKsCsp0B3AdaaKuHtaxoJRz3cc+528o= github.com/mdlayher/netlink v1.7.2 h1:/UtM3ofJap7Vl4QWCPDGXY8d3GIY2UGSDbK+QWmY8/g= diff --git a/pkg/ssh/ssh.go b/pkg/ssh/ssh.go index 235f790..04017bf 100644 --- a/pkg/ssh/ssh.go +++ b/pkg/ssh/ssh.go @@ -125,7 +125,7 @@ func (c *Client) Connect(env *Env) error { ssh.TTY_OP_OSPEED: 115200, // output speed = 14.4kbaud } - fileDescriptor := int(os.Stdin.Fd()) + fileDescriptor := int(os.Stdin.Fd()) //nolint if term.IsTerminal(fileDescriptor) { originalState, err := term.MakeRaw(fileDescriptor) diff --git a/rest/middleware.go b/rest/middleware.go index 54478ad..fcdcbcc 100644 --- a/rest/middleware.go +++ b/rest/middleware.go @@ -56,7 +56,13 @@ func RequestLoggerFilter(logger *slog.Logger) restful.FilterFunction { requestID := req.HeaderParameter("X-Request-Id") if requestID == "" { - requestID = uuid.NewString() + uuid, err := uuid.NewV7() + if err != nil { + _, _ = resp.Write([]byte("unable to generate request uuid " + err.Error())) + resp.WriteHeader(http.StatusInternalServerError) + return + } + requestID = uuid.String() } fields := []any{