Skip to content
This repository has been archived by the owner on Apr 26, 2022. It is now read-only.

Commit

Permalink
feat: database automatic clean-ups based on storage-max-age
Browse files Browse the repository at this point in the history
  • Loading branch information
brunoluiz committed Mar 5, 2021
1 parent 7358781 commit a44860a
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 9 deletions.
2 changes: 1 addition & 1 deletion .air.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ tmp_dir = ".tmp"
[build]
cmd = "go build -o ./.tmp/jornada --tags 'json1' ./cmd/jornada"
bin = ".tmp/jornada"
full_bin = "./.tmp/jornada --address 127.0.0.1 --log-level debug"
full_bin = "./.tmp/jornada --address 127.0.0.1 --log-level debug --storage-max-age 48h"
include_ext = ["go", "tpl", "tmpl", "html", ".env", "js"]
exclude_dir = ["assets", ".tmp", "vendor", "frontend/node_modules"]
include_dir = []
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ If you want to contribute with Jornada, you might need to run from the source. T
- [x] Support filter and search (based on meta or client data)
- [x] Support for metrics
- [x] Paginate results
- [ ] Support database automatic clean-ups, based on configurations
- [x] Support database automatic clean-ups, based on configurations
- [ ] Nice error pages
- [ ] SQL hooks for query logging and timing (or tracing of some sort)
- [ ] Tweak SQLite
Expand Down
27 changes: 24 additions & 3 deletions cmd/jornada/main.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package main

import (
"context"
"os"
"os/signal"
"time"

"github.com/brunoluiz/jornada/internal/cleaner"
"github.com/brunoluiz/jornada/internal/op/logger"
"github.com/brunoluiz/jornada/internal/repo"
"github.com/brunoluiz/jornada/internal/server"
"github.com/brunoluiz/jornada/internal/storage/badgerdb"
"github.com/brunoluiz/jornada/internal/storage/sqldb"
_ "github.com/joho/godotenv/autoload"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"
)

func main() {
Expand All @@ -22,6 +27,7 @@ func main() {
&cli.StringSliceFlag{Name: "allowed-origins", Value: cli.NewStringSlice("*"), EnvVars: []string{"ALLOWED_ORIGINS"}, Usage: "CORS allowed origins"},
&cli.StringFlag{Name: "db-dsn", Value: "sqlite:///tmp/jornada.db?cache=shared&mode=rwc&_journal_mode=WAL", EnvVars: []string{"DB_DSN"}, Usage: "DSN for SQL database (see github.com/mattn/go-sqlite3 for more options)"},
&cli.StringFlag{Name: "events-dsn", Value: "badger:///tmp/jornada.events", EnvVars: []string{"EVENTS_DSN"}, Usage: "Events storage path (BadgerDB)"},
&cli.DurationFlag{Name: "storage-max-age", Value: time.Hour * 24 * 14, EnvVars: []string{"STORAGE_MAX_AGE"}, Usage: "How long should Jornada keep sessions stored in database (14 days by default)"},
&cli.StringFlag{Name: "log-level", Value: "info", EnvVars: []string{"LOG_LEVEL"}, Usage: "Log level"},
},
Action: run,
Expand All @@ -33,7 +39,7 @@ func main() {
}

func run(c *cli.Context) error {
ctx := c.Context
ctx, _ := signal.NotifyContext(c.Context, os.Interrupt)
log := logger.New(c.String("log-level"))

b, err := badgerdb.New(c.String("events-dsn"), log)
Expand All @@ -54,7 +60,9 @@ func run(c *cli.Context) error {
return err
}

server, err := server.New(
clean := cleaner.New(c.Duration("storage-max-age"), recordings, events)

svc, err := server.New(
log,
recordings,
events,
Expand All @@ -69,5 +77,18 @@ func run(c *cli.Context) error {
return err
}

return server.Run()
return waiter(ctx, clean.Run, svc.Run)
}

func waiter(ctx context.Context, runners ...func(context.Context) error) error {
eg, ctx := errgroup.WithContext(ctx)

for _, runner := range runners {
r := runner
eg.Go(func() error {
return r(ctx)
})
}

return eg.Wait()
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ require (
github.com/stretchr/testify v1.5.1
github.com/ua-parser/uap-go v0.0.0-20210121150957-347a3497cc39
github.com/urfave/cli/v2 v2.3.0
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,8 @@ golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
74 changes: 74 additions & 0 deletions internal/cleaner/cleaner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package cleaner

import (
"context"
"log"
"time"

"github.com/brunoluiz/jornada/internal/repo"
)

// BulkDeleter whatever requires deleting
type BulkDeleter interface {
Delete(ctx context.Context, id ...string) error
}

// SessionRepository interfaces with session storage
type SessionRepository interface {
BulkDeleter
Get(ctx context.Context, opts ...repo.GetOpt) ([]repo.Session, error)
}

// Cleaner finds old records using session repository and then deletes items older than StorageMaxAge
type Cleaner struct {
StorageMaxAge time.Duration
Sessions SessionRepository
Events BulkDeleter
}

// New return Cleaner instance
func New(t time.Duration, session SessionRepository, events BulkDeleter) *Cleaner {
return &Cleaner{t, session, events}
}

// Run run ticker which cleans-up old registers
func (c *Cleaner) Run(ctx context.Context) error {
ticker := time.NewTicker(time.Hour)
defer ticker.Stop()

if err := c.run(ctx); err != nil {
return err
}

select {
case <-ticker.C:
if err := c.run(ctx); err != nil {
return err
}
case <-ctx.Done():
return nil
}

return nil
}

func (c *Cleaner) run(ctx context.Context) error {
t := time.Now().Add(-c.StorageMaxAge)

sessions, err := c.Sessions.Get(ctx, repo.WithUpdatedAtUntil(t))
if err != nil {
return err
}

ids := make([]string, 0, len(sessions))
for _, session := range sessions {
ids = append(ids, session.ID)
}

if err := c.Events.Delete(ctx, ids...); err != nil {
log.Println(err)
return err
}

return c.Sessions.Delete(ctx, ids...)
}
50 changes: 50 additions & 0 deletions internal/repo/events_badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,56 @@ func (store *EventBadgerV2) Get(ctx context.Context, sessionID string, cb func(b
})
}

// Delete delete a specified set of IDs
func (store *EventBadgerV2) Delete(ctx context.Context, ids ...string) error {
collectSize := 100000
return store.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.AllVersions = false
opts.PrefetchValues = false
it := txn.NewIterator(opts)
defer it.Close()

keysForDelete := make([][]byte, 0, collectSize)
keysCollected := 0
for _, id := range ids {
sessionID := id

for it.Seek(store.messageKey(sessionID, 1)); it.ValidForPrefix([]byte(store.id(sessionID))); it.Next() {
key := it.Item().KeyCopy(nil)
keysForDelete = append(keysForDelete, key)
keysCollected++
if keysCollected == collectSize {
if err := store.deleteKeys(keysForDelete); err != nil {
return err
}
keysForDelete = make([][]byte, 0, collectSize)
keysCollected = 0
}
}
}

if keysCollected > 0 {
if err := store.deleteKeys(keysForDelete); err != nil {
return err
}
}

return nil
})
}

func (store *EventBadgerV2) deleteKeys(keysForDelete [][]byte) error {
return store.db.Update(func(txn *badger.Txn) error {
for _, key := range keysForDelete {
if err := txn.Delete(key); err != nil {
return err
}
}
return nil
})
}

// lastSequence gets last ID saved in DB
func (store *EventBadgerV2) lastSequence(tx *badger.Txn, id string) (uint64, error) {
it := tx.NewIterator(badger.IteratorOptions{
Expand Down
26 changes: 25 additions & 1 deletion internal/repo/sessions_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ type (
Name string `json:"name"`
}

// OS details about session's OS
OS struct {
Name string `json:"name"`
Version string `json:"version"`
}

// Browser details about session's browser
Browser struct {
Name string `json:"name"`
Version string `json:"version"`
Expand All @@ -51,6 +53,7 @@ type (
UpdatedAt time.Time `json:"updatedAt"`
}

// GetOpt configure Get query builder
GetOpt func(b *sq.SelectBuilder)
)

Expand Down Expand Up @@ -169,18 +172,27 @@ func (store *SessionSQL) GetByID(ctx context.Context, id string) (out Session, e
return res[0], nil
}

// WithSearchFilter filter query using search/v1 query output
func WithSearchFilter(cond string, params []interface{}) func(b *sq.SelectBuilder) {
return func(b *sq.SelectBuilder) {
*b = b.Where(cond, params...)
}
}

// WithPagination filter query with offset and limit
func WithPagination(offset uint64, limit uint64) func(b *sq.SelectBuilder) {
return func(b *sq.SelectBuilder) {
*b = b.Offset(offset).Limit(limit)
}
}

// WithUpdatedAtUntil filter query with updated_at <= time.Time
func WithUpdatedAtUntil(updatedAt time.Time) func(b *sq.SelectBuilder) {
return func(b *sq.SelectBuilder) {
*b = b.Where("updated_at <= ?", updatedAt)
}
}

// Get get all available resources
func (store *SessionSQL) Get(ctx context.Context, opts ...GetOpt) (out []Session, err error) {
q := sq.Select(`s.id, s.client_id, s.user_agent, device, os.name, os.version, browser.name, browser.version, s.updated_at, s.meta, user.id, user.name, user.email`).
Expand Down Expand Up @@ -220,7 +232,19 @@ func (store *SessionSQL) Get(ctx context.Context, opts ...GetOpt) (out []Session
return out, nil
}

func scanSession(rs *sql.Rows) (Session, error) {
// Delete delete a specified set of IDs
func (store *SessionSQL) Delete(ctx context.Context, ids ...string) error {
q := sq.Delete("sessions").Where(sq.Eq{"id": ids})
sql, params, err := q.ToSql()
if err != nil {
return err
}

_, err = store.db.ExecContext(ctx, sql, params...)
return err
}

func scanSession(rs sq.RowScanner) (Session, error) {
var meta []byte
var session Session
err := rs.Scan(
Expand Down
2 changes: 1 addition & 1 deletion internal/search/v1/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func ToSQL(in string) (out string, params []interface{}, err error) {
// Replace all " with '
out = strings.ReplaceAll(in, "\"", "'")

// Replace all possibily dangerous expressions
// Replace all possible dangerous expressions
out = dangerousRegex.ReplaceAllString(in, "")

// Transform all `meta.foo = 'bar'` into `meta.key = 'foo' and meta.value = 'bar'`
Expand Down
6 changes: 5 additions & 1 deletion internal/server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func New(
}

// Run start serving requests through configurations done in *Server
func (s *Server) Run() error {
func (s *Server) Run(_ context.Context) error {
s.log.Infof("Running ⚡️ %s", s.config.Addr)
return s.server.ListenAndServe()
}
Expand All @@ -106,6 +106,10 @@ func (s *Server) Close() error {

// Error handle http errors
func (s *Server) Error(w http.ResponseWriter, r *http.Request, err error, code int) {
if err == nil {
return
}

s.log.Error(err)
http.Error(w, err.Error(), code)
}
3 changes: 2 additions & 1 deletion internal/server/sessions_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ func (s *Server) registerSessionRoutes(r *chi.Mux) error {

data, err := s.sessions.Get(r.Context(), opts...)
if err != nil {
t.ExecuteTemplate(w, templatePathSessionList, sessionListParams{Sessions: data, URL: s.config.PublicURL, Query: query, Error: err, NextPage: -1, PrevPage: -1})
err = t.ExecuteTemplate(w, templatePathSessionList, sessionListParams{Sessions: data, URL: s.config.PublicURL, Query: query, Error: err, NextPage: -1, PrevPage: -1})
s.Error(w, r, err, http.StatusInternalServerError)
return
}

Expand Down

0 comments on commit a44860a

Please sign in to comment.