Skip to content

Commit

Permalink
Small improvements.
Browse files Browse the repository at this point in the history
  • Loading branch information
Gerrit91 committed Aug 23, 2024
1 parent 7855e39 commit e0481fa
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 62 deletions.
33 changes: 17 additions & 16 deletions auditing/auditing.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package auditing

import (
"context"
"log/slog"
"os"
"path/filepath"
Expand Down Expand Up @@ -49,32 +50,32 @@ const (
const EntryFilterDefaultLimit int64 = 100

type Entry struct {
Id string `db:"-"` // filled by the auditing driver
Id string // filled by the auditing driver

Component string `db:"component"`
RequestId string `db:"rqid" json:"rqid"`
Type EntryType `db:"type"`
Timestamp time.Time `db:"timestamp"`
Component string
RequestId string `json:"rqid"`
Type EntryType
Timestamp time.Time

User string `db:"userid"`
Tenant string `db:"tenant"`
User string
Tenant string

// For `EntryDetailHTTP` the HTTP method get, post, put, delete, ...
// For `EntryDetailGRPC` unary, stream
Detail EntryDetail `db:"detail"`
Detail EntryDetail
// e.g. Request, Response, Error, Opened, Close
Phase EntryPhase `db:"phase"`
Phase EntryPhase
// For `EntryDetailHTTP` /api/v1/...
// For `EntryDetailGRPC` /api.v1/... (the method name)
Path string `db:"path"`
ForwardedFor string `db:"forwardedfor"`
RemoteAddr string `db:"remoteaddr"`
Path string
ForwardedFor string
RemoteAddr string

Body any `db:"body"` // JSON, string or numbers
StatusCode int `db:"statuscode"` // for `EntryDetailHTTP` the HTTP status code, for EntryDetailGRPC` the grpc status code
Body any // JSON, string or numbers
StatusCode int // for `EntryDetailHTTP` the HTTP status code, for EntryDetailGRPC` the grpc status code

// Internal errors
Error string `db:"error"`
Error string
}

func (e *Entry) prepareForNextPhase() {
Expand Down Expand Up @@ -133,7 +134,7 @@ type Auditing interface {
// Searches for entries matching the given filter.
// By default only recent entries will be returned.
// The returned entries will be sorted by timestamp in descending order.
Search(EntryFilter) ([]Entry, error)
Search(context.Context, EntryFilter) ([]Entry, error)
}

func defaultComponent() (string, error) {
Expand Down
2 changes: 1 addition & 1 deletion auditing/meilisearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (a *meiliAuditing) Index(entry Entry) error {
return nil
}

func (a *meiliAuditing) Search(filter EntryFilter) ([]Entry, error) {
func (a *meiliAuditing) Search(_ context.Context, filter EntryFilter) ([]Entry, error) {
predicates := make([]string, 0)
if filter.Component != "" {
predicates = append(predicates, fmt.Sprintf("component = %q", filter.Component))
Expand Down
15 changes: 8 additions & 7 deletions auditing/meilisearch_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func StartMeilisearch(t testing.TB) (container testcontainers.Container, c *conn
}

func TestAuditing_Meilisearch(t *testing.T) {
ctx := context.Background()
container, c, err := StartMeilisearch(t)
require.NoError(t, err)
defer func() {
Expand Down Expand Up @@ -143,7 +144,7 @@ func TestAuditing_Meilisearch(t *testing.T) {
{
name: "no entries, no search results",
t: func(t *testing.T, a Auditing) {
entries, err := a.Search(EntryFilter{})
entries, err := a.Search(ctx, EntryFilter{})
require.NoError(t, err)
assert.Empty(t, entries)
},
Expand All @@ -158,7 +159,7 @@ func TestAuditing_Meilisearch(t *testing.T) {
err = a.Flush()
require.NoError(t, err)

entries, err := a.Search(EntryFilter{
entries, err := a.Search(ctx, EntryFilter{
Body: "test",
})
require.NoError(t, err)
Expand All @@ -177,7 +178,7 @@ func TestAuditing_Meilisearch(t *testing.T) {
err = a.Flush()
require.NoError(t, err)

entries, err := a.Search(EntryFilter{})
entries, err := a.Search(ctx, EntryFilter{})
require.NoError(t, err)
assert.Len(t, entries, len(es))

Expand All @@ -187,7 +188,7 @@ func TestAuditing_Meilisearch(t *testing.T) {
t.Errorf("diff (+got -want):\n %s", diff)
}

entries, err = a.Search(EntryFilter{
entries, err = a.Search(ctx, EntryFilter{
Body: "This",
})
require.NoError(t, err)
Expand All @@ -206,7 +207,7 @@ func TestAuditing_Meilisearch(t *testing.T) {
err = a.Flush()
require.NoError(t, err)

entries, err := a.Search(EntryFilter{
entries, err := a.Search(ctx, EntryFilter{
RequestId: es[0].RequestId,
})
require.NoError(t, err)
Expand Down Expand Up @@ -234,7 +235,7 @@ func TestAuditing_Meilisearch(t *testing.T) {
err = a.Flush()
require.NoError(t, err)

entries, err := a.Search(EntryFilter{
entries, err := a.Search(ctx, EntryFilter{
Phase: EntryPhaseResponse,
})
require.NoError(t, err)
Expand All @@ -259,7 +260,7 @@ func TestAuditing_Meilisearch(t *testing.T) {
err = a.Flush()
require.NoError(t, err)

entries, err := a.Search(EntryFilter{
entries, err := a.Search(ctx, EntryFilter{
// we want to run a phrase search as otherwise we return the other entries as well
// https://www.meilisearch.com/docs/reference/api/search#phrase-search-2
Body: fmt.Sprintf("%q", es[0].Body.(string)),
Expand Down
122 changes: 91 additions & 31 deletions auditing/timescaledb.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package auditing
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log/slog"
"reflect"
Expand All @@ -17,22 +18,50 @@ import (
_ "github.com/lib/pq"
)

type TimescaleDbConfig struct {
Host string
Port string
DB string
User string
Password string
}

type timescaleAuditing struct {
component string
db *sqlx.DB
log *slog.Logger
type (
TimescaleDbConfig struct {
Host string
Port string
DB string
User string
Password string
}

timescaleAuditing struct {
component string
db *sqlx.DB
log *slog.Logger

cols []string
vals []any
}

// to keep the public interface free from field tags like "db" and "json" (as these might differ for different dbs)
// we introduce an internal type. unfortunately, this requires a conversion, which takes effort to maintain
timescaleEntry struct {
Component string `db:"component"`
RequestId string `db:"rqid" json:"rqid"`
Type EntryType `db:"type"`
Timestamp time.Time `db:"timestamp"`
User string `db:"userid"`
Tenant string `db:"tenant"`
Detail EntryDetail `db:"detail"`
Phase EntryPhase `db:"phase"`
Path string `db:"path"`
ForwardedFor string `db:"forwardedfor"`
RemoteAddr string `db:"remoteaddr"`
Body any `db:"body"`
StatusCode int `db:"statuscode"`
Error string `db:"error"`
}

sqlCompOp string
)

cols []string
vals []any
}
const (
equals sqlCompOp = "equals"
like sqlCompOp = "like"
)

func NewTimescaleDB(c Config, tc TimescaleDbConfig) (Auditing, error) {
if c.Component == "" {
Expand Down Expand Up @@ -187,31 +216,27 @@ func (a *timescaleAuditing) Index(entry Entry) error {
return err
}

internalEntry, err := a.toInternal(entry)
if err != nil {
return fmt.Errorf("unable to convert audit trace to database entry: %w", err)
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

entry.Id = entry.RequestId

_, err = a.db.NamedExecContext(ctx, q, entry)
_, err = a.db.NamedExecContext(ctx, q, internalEntry)
if err != nil {
return fmt.Errorf("unable to index audit trace: %w", err)
}

return nil
}

type compOp string

const (
equals compOp = "equals"
like compOp = "like"
)

func (a *timescaleAuditing) Search(filter EntryFilter) ([]Entry, error) {
func (a *timescaleAuditing) Search(ctx context.Context, filter EntryFilter) ([]Entry, error) {
var (
where []string
values = map[string]interface{}{}
addFilter = func(field string, value any, op compOp) error {
addFilter = func(field string, value any, op sqlCompOp) error {
if reflect.ValueOf(value).IsZero() {
return nil
}
Expand Down Expand Up @@ -303,7 +328,7 @@ func (a *timescaleAuditing) Search(filter EntryFilter) ([]Entry, error) {
return nil, err
}

rows, err := a.db.NamedQueryContext(context.TODO(), q, values) // TODO: search needs a ctx!
rows, err := a.db.NamedQueryContext(ctx, q, values)
if err != nil {
return nil, err
}
Expand All @@ -312,17 +337,52 @@ func (a *timescaleAuditing) Search(filter EntryFilter) ([]Entry, error) {
var entries []Entry

for rows.Next() {
var e Entry
var e timescaleEntry

err = rows.StructScan(&e)
if err != nil {
return nil, err
}

e.Id = e.RequestId
entry, err := a.toExternal(e)
if err != nil {
return nil, fmt.Errorf("unable to convert entry: %w", err)
}

entries = append(entries, e)
entries = append(entries, entry)
}

return entries, nil
}

func (_ *timescaleAuditing) toInternal(e Entry) (*timescaleEntry, error) {
intermediate, err := json.Marshal(e) // nolint
if err != nil {
return nil, err
}
var internalEntry timescaleEntry
err = json.Unmarshal(intermediate, &internalEntry) // nolint
if err != nil {
return nil, err
}

internalEntry.RequestId = e.RequestId

return &internalEntry, nil
}

func (_ *timescaleAuditing) toExternal(e timescaleEntry) (Entry, error) {
intermediate, err := json.Marshal(e) // nolint
if err != nil {
return Entry{}, err
}
var externalEntry Entry
err = json.Unmarshal(intermediate, &externalEntry) // nolint
if err != nil {
return Entry{}, err
}

externalEntry.Id = e.RequestId

return externalEntry, nil
}
15 changes: 8 additions & 7 deletions auditing/timescaledb_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
)

func TestAuditing_TimescaleDB(t *testing.T) {
ctx := context.Background()
container, auditing := StartTimescaleDB(t, Config{
Log: slog.Default(),
})
Expand Down Expand Up @@ -94,7 +95,7 @@ func TestAuditing_TimescaleDB(t *testing.T) {
{
name: "no entries, no search results",
t: func(t *testing.T, a Auditing) {
entries, err := a.Search(EntryFilter{})
entries, err := a.Search(ctx, EntryFilter{})
require.NoError(t, err)
assert.Empty(t, entries)
},
Expand All @@ -109,7 +110,7 @@ func TestAuditing_TimescaleDB(t *testing.T) {
err = a.Flush()
require.NoError(t, err)

entries, err := a.Search(EntryFilter{
entries, err := a.Search(ctx, EntryFilter{
Body: "test",
})
require.NoError(t, err)
Expand All @@ -128,7 +129,7 @@ func TestAuditing_TimescaleDB(t *testing.T) {
err := a.Flush()
require.NoError(t, err)

entries, err := a.Search(EntryFilter{})
entries, err := a.Search(ctx, EntryFilter{})
require.NoError(t, err)
assert.Len(t, entries, len(es))

Expand All @@ -138,7 +139,7 @@ func TestAuditing_TimescaleDB(t *testing.T) {
t.Errorf("diff (+got -want):\n %s", diff)
}

entries, err = a.Search(EntryFilter{
entries, err = a.Search(ctx, EntryFilter{
Body: "This",
})
require.NoError(t, err)
Expand All @@ -157,7 +158,7 @@ func TestAuditing_TimescaleDB(t *testing.T) {
err := a.Flush()
require.NoError(t, err)

entries, err := a.Search(EntryFilter{
entries, err := a.Search(ctx, EntryFilter{
RequestId: es[0].RequestId,
})
require.NoError(t, err)
Expand Down Expand Up @@ -185,7 +186,7 @@ func TestAuditing_TimescaleDB(t *testing.T) {
err := a.Flush()
require.NoError(t, err)

entries, err := a.Search(EntryFilter{
entries, err := a.Search(ctx, EntryFilter{
Phase: EntryPhaseResponse,
})
require.NoError(t, err)
Expand All @@ -210,7 +211,7 @@ func TestAuditing_TimescaleDB(t *testing.T) {
// err := a.Flush()
// require.NoError(t, err)

// entries, err := a.Search(EntryFilter{
// entries, err := a.Search(ctx, EntryFilter{
// Body: fmt.Sprintf("%q", es[0].Body.(string)),
// })
// require.NoError(t, err)
Expand Down

0 comments on commit e0481fa

Please sign in to comment.