Skip to content

Commit

Permalink
fix(ledger): add unique index on ik (#1614)
Browse files Browse the repository at this point in the history
  • Loading branch information
gfyrag authored and flemzord committed Jul 19, 2024
1 parent 409459a commit 141dedb
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 26 deletions.
3 changes: 3 additions & 0 deletions components/ledger/internal/engine/command/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ func (commander *Commander) exec(ctx context.Context, parameters Parameters, scr

return unlock, nil
}()
if err != nil {
return nil, err
}
defer unlock(ctx)

err = func() error {
Expand Down
10 changes: 10 additions & 0 deletions components/ledger/internal/storage/ledgerstore/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ var addIndexOnIdempotencyKey string
//go:embed migrations/6-add-reference-index.sql
var addIndexOnReference string

//go:embed migrations/7-add-ik-unique-index.sql
var addIKUniqueIndex string

type Bucket struct {
name string
db *bun.DB
Expand Down Expand Up @@ -190,6 +193,13 @@ func registerMigrations(migrator *migrations.Migrator, name string) {
return err
},
},
migrations.Migration{
Name: "Add unique index on IK",
UpWithContext: func(ctx context.Context, tx bun.Tx) error {
_, err := tx.ExecContext(ctx, addIKUniqueIndex)
return err
},
},
)
}

Expand Down
48 changes: 22 additions & 26 deletions components/ledger/internal/storage/ledgerstore/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@ import (
"github.com/uptrace/bun"
)

const (
LogTableName = "logs"
)

type Logs struct {
bun.BaseModel `bun:"logs,alias:logs"`

Expand All @@ -33,7 +29,7 @@ type Logs struct {
Hash []byte `bun:"hash,type:bytea"`
Date time.Time `bun:"date,type:timestamptz"`
Data RawMessage `bun:"data,type:jsonb"`
IdempotencyKey string `bun:"idempotency_key,type:varchar(256),unique"`
IdempotencyKey *string `bun:"idempotency_key,type:varchar(256),unique"`
}

func (log *Logs) ToCore() *ledger.ChainedLog {
Expand All @@ -45,10 +41,15 @@ func (log *Logs) ToCore() *ledger.ChainedLog {

return &ledger.ChainedLog{
Log: ledger.Log{
Type: ledger.LogTypeFromString(log.Type),
Data: payload,
Date: log.Date.UTC(),
IdempotencyKey: log.IdempotencyKey,
Type: ledger.LogTypeFromString(log.Type),
Data: payload,
Date: log.Date.UTC(),
IdempotencyKey: func() string {
if log.IdempotencyKey != nil {
return *log.IdempotencyKey
}
return ""
}(),
},
ID: (*big.Int)(log.ID),
Hash: log.Hash,
Expand Down Expand Up @@ -88,16 +89,6 @@ func (store *Store) logsQueryBuilder(q PaginatedQueryOptions[any]) func(*bun.Sel
}

func (store *Store) InsertLogs(ctx context.Context, activeLogs ...*ledger.ChainedLog) error {
//links := make([]trace.Link, 0)
//for _, log := range activeLogs {
// links = append(links, trace.LinkFromContext(log.Context))
//}
//
//ctx, span := tracer.Start(context.Background(), "InsertLogBatch", trace.WithLinks(links...))
//defer span.End()
//
//span.SetAttributes(attribute.Int("count", len(activeLogs)))

_, err := store.bucket.db.
NewInsert().
Model(pointer.For(collectionutils.Map(activeLogs, func(from *ledger.ChainedLog) Logs {
Expand All @@ -107,13 +98,18 @@ func (store *Store) InsertLogs(ctx context.Context, activeLogs ...*ledger.Chaine
}

return Logs{
Ledger: store.name,
ID: (*bunpaginate.BigInt)(from.ID),
Type: from.Type.String(),
Hash: from.Hash,
Date: from.Date,
Data: data,
IdempotencyKey: from.IdempotencyKey,
Ledger: store.name,
ID: (*bunpaginate.BigInt)(from.ID),
Type: from.Type.String(),
Hash: from.Hash,
Date: from.Date,
Data: data,
IdempotencyKey: func() *string {
if from.IdempotencyKey != "" {
return &from.IdempotencyKey
}
return nil
}(),
}
}))).
Exec(ctx)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
update logs
set idempotency_key = null
where idempotency_key = '';

update logs
set idempotency_key = null
where id in (
select unnest(duplicateLogIds.ids[2:]) as id
from (
select array_agg(id order by id) as ids
from logs l
where idempotency_key is not null
group by idempotency_key
having count(*) > 1
) duplicateLogIds
);

drop index logs_idempotency_key;

create unique index logs_idempotency_key on logs (idempotency_key);

0 comments on commit 141dedb

Please sign in to comment.