From a44860afc1d63f94a47d7c98264eb13610590dc5 Mon Sep 17 00:00:00 2001 From: Bruno Luiz Silva Date: Fri, 5 Mar 2021 18:25:00 +0000 Subject: [PATCH] feat: database automatic clean-ups based on storage-max-age --- .air.conf | 2 +- README.md | 2 +- cmd/jornada/main.go | 27 ++++++++++-- go.mod | 1 + go.sum | 2 + internal/cleaner/cleaner.go | 74 ++++++++++++++++++++++++++++++++ internal/repo/events_badger.go | 50 +++++++++++++++++++++ internal/repo/sessions_sql.go | 26 ++++++++++- internal/search/v1/search.go | 2 +- internal/server/http.go | 6 ++- internal/server/sessions_http.go | 3 +- 11 files changed, 186 insertions(+), 9 deletions(-) create mode 100644 internal/cleaner/cleaner.go diff --git a/.air.conf b/.air.conf index c63d43e..5234050 100644 --- a/.air.conf +++ b/.air.conf @@ -5,7 +5,7 @@ tmp_dir = ".tmp" [build] cmd = "go build -o ./.tmp/jornada --tags 'json1' ./cmd/jornada" bin = ".tmp/jornada" -full_bin = "./.tmp/jornada --address 127.0.0.1 --log-level debug" +full_bin = "./.tmp/jornada --address 127.0.0.1 --log-level debug --storage-max-age 48h" include_ext = ["go", "tpl", "tmpl", "html", ".env", "js"] exclude_dir = ["assets", ".tmp", "vendor", "frontend/node_modules"] include_dir = [] diff --git a/README.md b/README.md index 75f32ad..799891c 100644 --- a/README.md +++ b/README.md @@ -104,7 +104,7 @@ If you want to contribute with Jornada, you might need to run from the source. T - [x] Support filter and search (based on meta or client data) - [x] Support for metrics - [x] Paginate results -- [ ] Support database automatic clean-ups, based on configurations +- [x] Support database automatic clean-ups, based on configurations - [ ] Nice error pages - [ ] SQL hooks for query logging and timing (or tracing of some sort) - [ ] Tweak SQLite diff --git a/cmd/jornada/main.go b/cmd/jornada/main.go index bf28815..2e8c485 100644 --- a/cmd/jornada/main.go +++ b/cmd/jornada/main.go @@ -1,8 +1,12 @@ package main import ( + "context" "os" + "os/signal" + "time" + "github.com/brunoluiz/jornada/internal/cleaner" "github.com/brunoluiz/jornada/internal/op/logger" "github.com/brunoluiz/jornada/internal/repo" "github.com/brunoluiz/jornada/internal/server" @@ -10,6 +14,7 @@ import ( "github.com/brunoluiz/jornada/internal/storage/sqldb" _ "github.com/joho/godotenv/autoload" "github.com/urfave/cli/v2" + "golang.org/x/sync/errgroup" ) func main() { @@ -22,6 +27,7 @@ func main() { &cli.StringSliceFlag{Name: "allowed-origins", Value: cli.NewStringSlice("*"), EnvVars: []string{"ALLOWED_ORIGINS"}, Usage: "CORS allowed origins"}, &cli.StringFlag{Name: "db-dsn", Value: "sqlite:///tmp/jornada.db?cache=shared&mode=rwc&_journal_mode=WAL", EnvVars: []string{"DB_DSN"}, Usage: "DSN for SQL database (see github.com/mattn/go-sqlite3 for more options)"}, &cli.StringFlag{Name: "events-dsn", Value: "badger:///tmp/jornada.events", EnvVars: []string{"EVENTS_DSN"}, Usage: "Events storage path (BadgerDB)"}, + &cli.DurationFlag{Name: "storage-max-age", Value: time.Hour * 24 * 14, EnvVars: []string{"STORAGE_MAX_AGE"}, Usage: "How long should Jornada keep sessions stored in database (14 days by default)"}, &cli.StringFlag{Name: "log-level", Value: "info", EnvVars: []string{"LOG_LEVEL"}, Usage: "Log level"}, }, Action: run, @@ -33,7 +39,7 @@ func main() { } func run(c *cli.Context) error { - ctx := c.Context + ctx, _ := signal.NotifyContext(c.Context, os.Interrupt) log := logger.New(c.String("log-level")) b, err := badgerdb.New(c.String("events-dsn"), log) @@ -54,7 +60,9 @@ func run(c *cli.Context) error { return err } - server, err := server.New( + clean := cleaner.New(c.Duration("storage-max-age"), recordings, events) + + svc, err := server.New( log, recordings, events, @@ -69,5 +77,18 @@ func run(c *cli.Context) error { return err } - return server.Run() + return waiter(ctx, clean.Run, svc.Run) +} + +func waiter(ctx context.Context, runners ...func(context.Context) error) error { + eg, ctx := errgroup.WithContext(ctx) + + for _, runner := range runners { + r := runner + eg.Go(func() error { + return r(ctx) + }) + } + + return eg.Wait() } diff --git a/go.mod b/go.mod index e18b421..510a6f6 100644 --- a/go.mod +++ b/go.mod @@ -19,4 +19,5 @@ require ( github.com/stretchr/testify v1.5.1 github.com/ua-parser/uap-go v0.0.0-20210121150957-347a3497cc39 github.com/urfave/cli/v2 v2.3.0 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c ) diff --git a/go.sum b/go.sum index 2dc21ad..547af5a 100644 --- a/go.sum +++ b/go.sum @@ -403,6 +403,8 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/internal/cleaner/cleaner.go b/internal/cleaner/cleaner.go new file mode 100644 index 0000000..dbaddbe --- /dev/null +++ b/internal/cleaner/cleaner.go @@ -0,0 +1,74 @@ +package cleaner + +import ( + "context" + "log" + "time" + + "github.com/brunoluiz/jornada/internal/repo" +) + +// BulkDeleter whatever requires deleting +type BulkDeleter interface { + Delete(ctx context.Context, id ...string) error +} + +// SessionRepository interfaces with session storage +type SessionRepository interface { + BulkDeleter + Get(ctx context.Context, opts ...repo.GetOpt) ([]repo.Session, error) +} + +// Cleaner finds old records using session repository and then deletes items older than StorageMaxAge +type Cleaner struct { + StorageMaxAge time.Duration + Sessions SessionRepository + Events BulkDeleter +} + +// New return Cleaner instance +func New(t time.Duration, session SessionRepository, events BulkDeleter) *Cleaner { + return &Cleaner{t, session, events} +} + +// Run run ticker which cleans-up old registers +func (c *Cleaner) Run(ctx context.Context) error { + ticker := time.NewTicker(time.Hour) + defer ticker.Stop() + + if err := c.run(ctx); err != nil { + return err + } + + select { + case <-ticker.C: + if err := c.run(ctx); err != nil { + return err + } + case <-ctx.Done(): + return nil + } + + return nil +} + +func (c *Cleaner) run(ctx context.Context) error { + t := time.Now().Add(-c.StorageMaxAge) + + sessions, err := c.Sessions.Get(ctx, repo.WithUpdatedAtUntil(t)) + if err != nil { + return err + } + + ids := make([]string, 0, len(sessions)) + for _, session := range sessions { + ids = append(ids, session.ID) + } + + if err := c.Events.Delete(ctx, ids...); err != nil { + log.Println(err) + return err + } + + return c.Sessions.Delete(ctx, ids...) +} diff --git a/internal/repo/events_badger.go b/internal/repo/events_badger.go index 0ffa9a7..1927b8b 100644 --- a/internal/repo/events_badger.go +++ b/internal/repo/events_badger.go @@ -72,6 +72,56 @@ func (store *EventBadgerV2) Get(ctx context.Context, sessionID string, cb func(b }) } +// Delete delete a specified set of IDs +func (store *EventBadgerV2) Delete(ctx context.Context, ids ...string) error { + collectSize := 100000 + return store.db.View(func(txn *badger.Txn) error { + opts := badger.DefaultIteratorOptions + opts.AllVersions = false + opts.PrefetchValues = false + it := txn.NewIterator(opts) + defer it.Close() + + keysForDelete := make([][]byte, 0, collectSize) + keysCollected := 0 + for _, id := range ids { + sessionID := id + + for it.Seek(store.messageKey(sessionID, 1)); it.ValidForPrefix([]byte(store.id(sessionID))); it.Next() { + key := it.Item().KeyCopy(nil) + keysForDelete = append(keysForDelete, key) + keysCollected++ + if keysCollected == collectSize { + if err := store.deleteKeys(keysForDelete); err != nil { + return err + } + keysForDelete = make([][]byte, 0, collectSize) + keysCollected = 0 + } + } + } + + if keysCollected > 0 { + if err := store.deleteKeys(keysForDelete); err != nil { + return err + } + } + + return nil + }) +} + +func (store *EventBadgerV2) deleteKeys(keysForDelete [][]byte) error { + return store.db.Update(func(txn *badger.Txn) error { + for _, key := range keysForDelete { + if err := txn.Delete(key); err != nil { + return err + } + } + return nil + }) +} + // lastSequence gets last ID saved in DB func (store *EventBadgerV2) lastSequence(tx *badger.Txn, id string) (uint64, error) { it := tx.NewIterator(badger.IteratorOptions{ diff --git a/internal/repo/sessions_sql.go b/internal/repo/sessions_sql.go index 05645a8..352fed9 100644 --- a/internal/repo/sessions_sql.go +++ b/internal/repo/sessions_sql.go @@ -27,11 +27,13 @@ type ( Name string `json:"name"` } + // OS details about session's OS OS struct { Name string `json:"name"` Version string `json:"version"` } + // Browser details about session's browser Browser struct { Name string `json:"name"` Version string `json:"version"` @@ -51,6 +53,7 @@ type ( UpdatedAt time.Time `json:"updatedAt"` } + // GetOpt configure Get query builder GetOpt func(b *sq.SelectBuilder) ) @@ -169,18 +172,27 @@ func (store *SessionSQL) GetByID(ctx context.Context, id string) (out Session, e return res[0], nil } +// WithSearchFilter filter query using search/v1 query output func WithSearchFilter(cond string, params []interface{}) func(b *sq.SelectBuilder) { return func(b *sq.SelectBuilder) { *b = b.Where(cond, params...) } } +// WithPagination filter query with offset and limit func WithPagination(offset uint64, limit uint64) func(b *sq.SelectBuilder) { return func(b *sq.SelectBuilder) { *b = b.Offset(offset).Limit(limit) } } +// WithUpdatedAtUntil filter query with updated_at <= time.Time +func WithUpdatedAtUntil(updatedAt time.Time) func(b *sq.SelectBuilder) { + return func(b *sq.SelectBuilder) { + *b = b.Where("updated_at <= ?", updatedAt) + } +} + // Get get all available resources func (store *SessionSQL) Get(ctx context.Context, opts ...GetOpt) (out []Session, err error) { q := sq.Select(`s.id, s.client_id, s.user_agent, device, os.name, os.version, browser.name, browser.version, s.updated_at, s.meta, user.id, user.name, user.email`). @@ -220,7 +232,19 @@ func (store *SessionSQL) Get(ctx context.Context, opts ...GetOpt) (out []Session return out, nil } -func scanSession(rs *sql.Rows) (Session, error) { +// Delete delete a specified set of IDs +func (store *SessionSQL) Delete(ctx context.Context, ids ...string) error { + q := sq.Delete("sessions").Where(sq.Eq{"id": ids}) + sql, params, err := q.ToSql() + if err != nil { + return err + } + + _, err = store.db.ExecContext(ctx, sql, params...) + return err +} + +func scanSession(rs sq.RowScanner) (Session, error) { var meta []byte var session Session err := rs.Scan( diff --git a/internal/search/v1/search.go b/internal/search/v1/search.go index 1bd25fa..fd2fc09 100644 --- a/internal/search/v1/search.go +++ b/internal/search/v1/search.go @@ -22,7 +22,7 @@ func ToSQL(in string) (out string, params []interface{}, err error) { // Replace all " with ' out = strings.ReplaceAll(in, "\"", "'") - // Replace all possibily dangerous expressions + // Replace all possible dangerous expressions out = dangerousRegex.ReplaceAllString(in, "") // Transform all `meta.foo = 'bar'` into `meta.key = 'foo' and meta.value = 'bar'` diff --git a/internal/server/http.go b/internal/server/http.go index 570b54a..bfd0095 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -91,7 +91,7 @@ func New( } // Run start serving requests through configurations done in *Server -func (s *Server) Run() error { +func (s *Server) Run(_ context.Context) error { s.log.Infof("Running ⚡️ %s", s.config.Addr) return s.server.ListenAndServe() } @@ -106,6 +106,10 @@ func (s *Server) Close() error { // Error handle http errors func (s *Server) Error(w http.ResponseWriter, r *http.Request, err error, code int) { + if err == nil { + return + } + s.log.Error(err) http.Error(w, err.Error(), code) } diff --git a/internal/server/sessions_http.go b/internal/server/sessions_http.go index 770366a..fb48d8d 100644 --- a/internal/server/sessions_http.go +++ b/internal/server/sessions_http.go @@ -71,7 +71,8 @@ func (s *Server) registerSessionRoutes(r *chi.Mux) error { data, err := s.sessions.Get(r.Context(), opts...) if err != nil { - t.ExecuteTemplate(w, templatePathSessionList, sessionListParams{Sessions: data, URL: s.config.PublicURL, Query: query, Error: err, NextPage: -1, PrevPage: -1}) + err = t.ExecuteTemplate(w, templatePathSessionList, sessionListParams{Sessions: data, URL: s.config.PublicURL, Query: query, Error: err, NextPage: -1, PrevPage: -1}) + s.Error(w, r, err, http.StatusInternalServerError) return }