Skip to content

Commit

Permalink
Merge pull request #111 from flow-hydraulics/latenssi/sequence-numbering
Browse files Browse the repository at this point in the history
Allow multiple transactions in one block per key, minor optimization regarding calls to Flow
  • Loading branch information
nanuuki authored Oct 13, 2021
2 parents c5b3a75 + dee15aa commit 122305a
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 23 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ Dev environment
# Needs docker-compose installed
make dev

### Deployment notes

**NOTE:** Currently the PDS backend only supports a single instance setup. This is because of sequence number bookkeeping in `service/flow_helpers/account.go` (see `getSequenceNumber`).

## Testing

cp env.example .env.test
Expand Down
6 changes: 5 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,11 @@ func runServer(cfg *config.Config) error {
}

// Application
app := app.New(cfg, logger, db, flowClient, true)
app, err := app.New(cfg, logger, db, flowClient, true)
if err != nil {
return err
}

defer app.Close()

// HTTP server
Expand Down
10 changes: 7 additions & 3 deletions service/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,24 @@ type App struct {
quit chan bool // Chan type does not matter as we only use this to 'close'
}

func New(cfg *config.Config, logger *log.Logger, db *gorm.DB, flowClient *client.Client, poll bool) *App {
func New(cfg *config.Config, logger *log.Logger, db *gorm.DB, flowClient *client.Client, poll bool) (*App, error) {
if logger == nil {
panic("no logger")
}

contract := NewContract(cfg, logger, flowClient)
contract, err := NewContract(cfg, logger, flowClient)
if err != nil {
return nil, err
}

quit := make(chan bool)
app := &App{cfg, logger, db, flowClient, contract, quit}

if poll {
go poller(app)
}

return app
return app, nil
}

// Closes allows the poller to close controllably
Expand Down
33 changes: 20 additions & 13 deletions service/app/contract_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,21 @@ func minInt(a int, b int) int {
return a
}

func NewContract(cfg *config.Config, logger *log.Logger, flowClient *client.Client) *Contract {
func NewContract(cfg *config.Config, logger *log.Logger, flowClient *client.Client) (*Contract, error) {
pdsAccount := flow_helpers.GetAccount(
flow.HexToAddress(cfg.AdminAddress),
cfg.AdminPrivateKey,
cfg.AdminPrivateKeyType,
cfg.AdminPrivateKeyIndexes,
)
return &Contract{cfg, logger, flowClient, pdsAccount}
flowAccount, err := flowClient.GetAccount(context.Background(), pdsAccount.Address)
if err != nil {
return nil, err
}
if len(flowAccount.Keys) < len(pdsAccount.KeyIndexes) {
return nil, fmt.Errorf("too many key indexes given for admin account")
}
return &Contract{cfg, logger, flowClient, pdsAccount}, nil
}

// StartSettlement sets the given distributions state to 'settling' and starts the settlement
Expand Down Expand Up @@ -96,7 +103,7 @@ func (c *Contract) StartSettlement(ctx context.Context, db *gorm.DB, dist *Distr
return err // rollback
}

latestBlock, err := c.flowClient.GetLatestBlock(ctx, true)
latestBlockHeader, err := c.flowClient.GetLatestBlockHeader(ctx, true)
if err != nil {
return err // rollback
}
Expand Down Expand Up @@ -125,7 +132,7 @@ func (c *Contract) StartSettlement(ctx context.Context, db *gorm.DB, dist *Distr
DistributionID: dist.ID,
CurrentCount: 0,
TotalCount: uint(len(collectibles)),
StartAtBlock: latestBlock.Height - 1,
StartAtBlock: latestBlockHeader.Height - 1,
EscrowAddress: common.FlowAddressFromString(c.cfg.AdminAddress),
Collectibles: settlementCollectibles,
}
Expand Down Expand Up @@ -212,7 +219,7 @@ func (c *Contract) StartMinting(ctx context.Context, db *gorm.DB, dist *Distribu
return err // rollback
}

latestBlock, err := c.flowClient.GetLatestBlock(ctx, true)
latestBlockHeader, err := c.flowClient.GetLatestBlockHeader(ctx, true)
if err != nil {
return err // rollback
}
Expand All @@ -221,7 +228,7 @@ func (c *Contract) StartMinting(ctx context.Context, db *gorm.DB, dist *Distribu
cpc := CirculatingPackContract{
Name: dist.PackTemplate.PackReference.Name,
Address: dist.PackTemplate.PackReference.Address,
StartAtBlock: latestBlock.Height - 1,
StartAtBlock: latestBlockHeader.Height - 1,
}

// Try to find an existing one (CirculatingPackContract)
Expand Down Expand Up @@ -265,7 +272,7 @@ func (c *Contract) StartMinting(ctx context.Context, db *gorm.DB, dist *Distribu
DistributionID: dist.ID,
CurrentCount: 0,
TotalCount: uint(len(packs)),
StartAtBlock: latestBlock.Height - 1,
StartAtBlock: latestBlockHeader.Height - 1,
}

if err := InsertMinting(db, &minting); err != nil {
Expand Down Expand Up @@ -383,13 +390,13 @@ func (c *Contract) UpdateSettlementStatus(ctx context.Context, db *gorm.DB, dist
return err // rollback
}

latestBlock, err := c.flowClient.GetLatestBlock(ctx, true)
latestBlockHeader, err := c.flowClient.GetLatestBlockHeader(ctx, true)
if err != nil {
return err // rollback
}

begin := settlement.StartAtBlock + 1
end := min(latestBlock.Height, begin+MAX_EVENTS_PER_CHECK)
end := min(latestBlockHeader.Height, begin+MAX_EVENTS_PER_CHECK)

logger = logger.WithFields(log.Fields{
"blockBegin": begin,
Expand Down Expand Up @@ -517,13 +524,13 @@ func (c *Contract) UpdateMintingStatus(ctx context.Context, db *gorm.DB, dist *D
return err // rollback
}

latestBlock, err := c.flowClient.GetLatestBlock(ctx, true)
latestBlockHeader, err := c.flowClient.GetLatestBlockHeader(ctx, true)
if err != nil {
return err // rollback
}

begin := minting.StartAtBlock + 1
end := min(latestBlock.Height, begin+MAX_EVENTS_PER_CHECK)
end := min(latestBlockHeader.Height, begin+MAX_EVENTS_PER_CHECK)

logger = logger.WithFields(log.Fields{
"blockBegin": begin,
Expand Down Expand Up @@ -674,13 +681,13 @@ func (c *Contract) UpdateCirculatingPack(ctx context.Context, db *gorm.DB, cpc *
OPENED,
}

latestBlock, err := c.flowClient.GetLatestBlock(ctx, true)
latestBlockHeader, err := c.flowClient.GetLatestBlockHeader(ctx, true)
if err != nil {
return err // rollback
}

begin := cpc.StartAtBlock + 1
end := min(latestBlock.Height, begin+MAX_EVENTS_PER_CHECK)
end := min(latestBlockHeader.Height, begin+MAX_EVENTS_PER_CHECK)

logger = logger.WithFields(log.Fields{
"blockBegin": begin,
Expand Down
58 changes: 56 additions & 2 deletions service/flow_helpers/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ var accounts map[flow.Address]*Account
var accountsLock = &sync.Mutex{} // Making sure our "accounts" var is a singleton
var keyIndexLock = &sync.Mutex{}

var seqNumLock = &sync.Mutex{}
var lastAccountKeySeqNumber map[flow.Address]map[int]uint64
var lastAccountKeyBlock map[flow.Address]map[int]flow.Identifier

const GOOGLE_KMS_KEY_TYPE = "google_kms"

type Account struct {
Expand Down Expand Up @@ -72,12 +76,16 @@ func (a *Account) KeyIndex() int {
return i
}

func (a *Account) GetProposalKey(ctx context.Context, flowClient *client.Client) (*flow.AccountKey, error) {
func (a *Account) GetProposalKey(ctx context.Context, flowClient *client.Client, referenceBlockID flow.Identifier) (*flow.AccountKey, error) {
account, err := flowClient.GetAccount(ctx, a.Address)
k := account.Keys[a.KeyIndex()]
if err != nil {
return nil, fmt.Errorf("error in flow_helpers.Account.GetProposalKey: %w", err)
}

k := account.Keys[a.KeyIndex()]

k.SequenceNumber = getSequenceNumber(a.Address, k, referenceBlockID)

return k, nil
}

Expand Down Expand Up @@ -120,3 +128,49 @@ func getGoogleKMSSigner(address flow.Address, resourceId string) (crypto.Signer,

return s, nil
}

// getSequenceNumber, is a hack around the fact that GetAccount on Flow Client returns
// the latest SequenceNumber on-chain but it might be outdated as we may be
// sending multiple transactions in the current block
// NOTE: This breaks if running in a multi-instance setup
func getSequenceNumber(address flow.Address, accountKey *flow.AccountKey, currentBlockID flow.Identifier) uint64 {
seqNumLock.Lock()
defer seqNumLock.Unlock()

// Init lastAccountKeySeqNumber
if lastAccountKeySeqNumber == nil {
lastAccountKeySeqNumber = make(map[flow.Address]map[int]uint64)
}

if lastAccountKeySeqNumber[address] == nil {
lastAccountKeySeqNumber[address] = make(map[int]uint64)
}

// Init lastAccountKeyBlock
if lastAccountKeyBlock == nil {
lastAccountKeyBlock = make(map[flow.Address]map[int]flow.Identifier)
}

if lastAccountKeyBlock[address] == nil {
lastAccountKeyBlock[address] = make(map[int]flow.Identifier)
}

useGiven := true

// Check if operating in the same block as before
if prevID, ok := lastAccountKeyBlock[address][accountKey.Index]; ok && prevID == currentBlockID {
// Check if we have a previous number stored and if it is larger or equal to new number
if prevNumber, ok := lastAccountKeySeqNumber[address][accountKey.Index]; ok && accountKey.SequenceNumber <= prevNumber {
lastAccountKeySeqNumber[address][accountKey.Index]++
useGiven = false
}
}

if useGiven {
lastAccountKeySeqNumber[address][accountKey.Index] = accountKey.SequenceNumber
}

lastAccountKeyBlock[address][accountKey.Index] = currentBlockID

return lastAccountKeySeqNumber[address][accountKey.Index]
}
2 changes: 1 addition & 1 deletion service/flow_helpers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

func SignProposeAndPayAs(ctx context.Context, flowClient *client.Client, account *Account, tx *flow.Transaction) error {
key, err := account.GetProposalKey(ctx, flowClient)
key, err := account.GetProposalKey(ctx, flowClient, tx.ReferenceBlockID)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions service/transactions/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,12 @@ func (t *StorableTransaction) Prepare(ctx context.Context, flowClient *client.Cl
}
}

latestBlock, err := flowClient.GetLatestBlock(ctx, true)
latestBlockHeader, err := flowClient.GetLatestBlockHeader(ctx, true)
if err != nil {
return nil, err
}

tx.SetReferenceBlockID(latestBlock.ID)
tx.SetReferenceBlockID(latestBlockHeader.ID)

if err := flow_helpers.SignProposeAndPayAs(ctx, flowClient, account, tx); err != nil {
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion test_lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ func getTestApp(cfg *config.Config, poll bool) (*app.App, func()) {
panic(err)
}

app := app.New(cfg, testLogger, db, flowClient, poll)
app, err := app.New(cfg, testLogger, db, flowClient, poll)
if err != nil {
panic(err)
}

clean := func() {
app.Close()
Expand Down

0 comments on commit 122305a

Please sign in to comment.