This repository has been archived by the owner on Oct 6, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathdatabase.go
126 lines (102 loc) · 3.22 KB
/
database.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package storage
import (
"context"
"github.com/Worldcoin/hubble-commander/config"
"github.com/Worldcoin/hubble-commander/db"
bdg "github.com/dgraph-io/badger/v3"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
)
var databaseTracer = otel.Tracer("database")
type Database struct {
Badger *db.Database
}
func NewDatabase(cfg *config.Config) (*Database, error) {
badgerDB, err := db.NewDatabase(cfg.Badger)
if err != nil {
return nil, err
}
database := &Database{
Badger: badgerDB,
}
if cfg.Bootstrap.Prune {
err = database.Badger.Prune()
if err != nil {
return nil, err
}
log.Debug("Badger database was pruned")
}
return database, nil
}
func (d *Database) BeginTransaction(opts TxOptions) (*db.TxController, *Database) {
database := *d
badgerTx, badgerDB := d.Badger.BeginTransaction(!opts.ReadOnly)
database.Badger = badgerDB
return badgerTx, &database
}
func (d *Database) ExecuteInTransactionWithSpan(
ctx context.Context,
opts TxOptions,
fn func(txCtx context.Context, txDatabase *Database) error,
) error {
retries := 0
err := d.unsafeExecuteInTransactionWithSpan(ctx, retries, opts, fn)
for errors.Is(err, bdg.ErrConflict) {
// nb. if we were already inside a transaction when this function is
// called then we run inside the outer transaction, so the `Commit`
// call is a no-op and this retry logic will never get a chance to
// fire
log.WithError(err).Warn("Retrying transaction due to conflict")
err = d.unsafeExecuteInTransactionWithSpan(ctx, retries, opts, fn)
retries += 1
}
return err
}
// all errors are already wrapped w stack traces, except errors fn returns
func (d *Database) ExecuteInTransaction(opts TxOptions, fn func(txDatabase *Database) error) error {
err := d.unsafeExecuteInTransaction(opts, fn)
if errors.Is(err, bdg.ErrConflict) {
// nb. if we were already inside a transaction when this function is
// called then we run inside the outer transaction, so the `Commit`
// call is a no-op and this retry logic will never get a chance to
// fire
log.Debug("ExecuteInTransaction transaction conflicted, trying again")
return d.ExecuteInTransaction(opts, fn)
}
return err
}
func (d *Database) unsafeExecuteInTransaction(opts TxOptions, fn func(txDatabase *Database) error) (err error) {
txController, txDatabase := d.BeginTransaction(opts)
defer txController.Rollback(&err)
err = fn(txDatabase)
if err != nil {
return err
}
return txController.Commit()
}
func (d *Database) unsafeExecuteInTransactionWithSpan(
ctx context.Context,
retries int,
opts TxOptions,
fn func(txCtx context.Context, txDatabase *Database) error,
) (err error) {
spanCtx, span := databaseTracer.Start(ctx, "database.ExecuteInTransaction")
defer span.End()
span.SetAttributes(attribute.Int("hubble.database.tx_retries", retries))
txController, txDatabase := d.BeginTransaction(opts)
defer txController.Rollback(&err)
err = fn(spanCtx, txDatabase)
if err != nil {
return err
}
return func() error {
_, innerSpan := databaseTracer.Start(spanCtx, "database.Commit")
defer innerSpan.End()
return txController.Commit()
}()
}
func (d *Database) Close() error {
return d.Badger.Close()
}