diff --git a/README.md b/README.md index 2b6a4f3..260799b 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,8 @@ -Deployment +# Flow Pack Distribution Service + +## Running the PDS backend service + +Docker cp env.example .env # edit according to your setup docker run -p 3000:3000 --env-file .env ghcr.io/flow-hydraulics/flow-pds:latest @@ -6,15 +10,60 @@ Deployment Dev environment cp env.example .env - cp env.example .env.test - # If docker-compose installed + # Needs docker-compose installed make dev -Test +## Testing + + cp env.example .env.test # Standalone (can NOT have emulator running in docker) ./tests-with-emulator.sh # With docker-compose environment ("make dev" above) go test -v + + +## Project layout + +Backend service code: `./service` + +Contract code (test, deploy): `./go-contracts` + +API spec: +- `./models` +- `./reference` + +Simple API tests: `./api-scripts` + +Cadence source code: +- `./cadence-contracts` +- `./cadence-scripts` +- `./cadence-transactions` + +## Configuration + +### Database + +| Config variable | Environment variable | Description | Default | Examples | +| --------------- | :-------------------------- | ------------------------------------------------------------------------------------------------ | ----------- | ------------------------- | +| DatabaseType | `FLOW_PDS_DATABASE_DSN` | Type of database driver | `sqlite` | `sqlite`, `psql`, `mysql` | +| DatabaseDSN | `FLOW_PDS_DATABASE_TYPE` | Data source name ([DSN](https://en.wikipedia.org/wiki/Data_source_name)) for database connection | `pds.db` | See below | + +Examples of Database DSN + + mysql://john:pass@localhost:3306/my_db + + postgresql://postgres:postgres@localhost:5432/postgres + + user:pass@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local + + host=localhost user=gorm password=gorm dbname=gorm port=9920 sslmode=disable TimeZone=Asia/Shanghai + +For more: https://gorm.io/docs/connecting_to_the_database.html + + +### All possible configuration variables + +Refer to [service/config/config.go](service/config/config.go) for details and documentation. diff --git a/api/misc.http b/api-scipts/distributions.http similarity index 100% rename from api/misc.http rename to api-scipts/distributions.http diff --git a/main.go b/main.go index f4cb479..1c2d80b 100644 --- a/main.go +++ b/main.go @@ -3,16 +3,16 @@ package main import ( "flag" "fmt" - "log" + "os" "github.com/flow-hydraulics/flow-pds/service/app" "github.com/flow-hydraulics/flow-pds/service/common" "github.com/flow-hydraulics/flow-pds/service/config" - "github.com/flow-hydraulics/flow-pds/service/errors" "github.com/flow-hydraulics/flow-pds/service/http" "github.com/flow-hydraulics/flow-pds/service/transactions" "github.com/onflow/flow-go-sdk/client" + log "github.com/sirupsen/logrus" "google.golang.org/grpc" ) @@ -23,6 +23,10 @@ var ( buildTime string // when the executable was built ) +func init() { + log.SetLevel(log.InfoLevel) +} + func main() { var ( printVersion bool @@ -58,13 +62,12 @@ func main() { func runServer(cfg *config.Config) error { if cfg == nil { - return &errors.NilConfigError{} + return fmt.Errorf("config not provided") } - // Application wide loggers - logServer := log.New(os.Stdout, "[SERVER] ", log.LstdFlags|log.Lshortfile) + logger := log.New() - logServer.Printf("Starting server (v%s)...\n", version) + logger.Printf("Starting server (v%s)...\n", version) // Flow client // TODO: WithInsecure()? @@ -74,7 +77,7 @@ func runServer(cfg *config.Config) error { } defer func() { if err := flowClient.Close(); err != nil { - logServer.Println(err) + logger.Println(err) } }() @@ -94,11 +97,11 @@ func runServer(cfg *config.Config) error { } // Application - app := app.New(cfg, db, flowClient, true) + app := app.New(cfg, logger, db, flowClient, true) defer app.Close() // HTTP server - server := http.NewServer(cfg, logServer, app) + server := http.NewServer(cfg, logger, app) server.ListenAndServe() diff --git a/service/app/app.go b/service/app/app.go index f578eea..4964a4c 100644 --- a/service/app/app.go +++ b/service/app/app.go @@ -6,21 +6,28 @@ import ( "github.com/flow-hydraulics/flow-pds/service/config" "github.com/google/uuid" "github.com/onflow/flow-go-sdk/client" + log "github.com/sirupsen/logrus" "gorm.io/gorm" ) +// App handles all the application logic and interfaces directly with the database type App struct { cfg *config.Config + logger *log.Logger db *gorm.DB flowClient *client.Client contract *Contract quit chan bool // Chan type does not matter as we only use this to 'close' } -func New(cfg *config.Config, db *gorm.DB, flowClient *client.Client, poll bool) *App { - contract := NewContract(cfg, flowClient) +func New(cfg *config.Config, logger *log.Logger, db *gorm.DB, flowClient *client.Client, poll bool) *App { + if logger == nil { + panic("no logger") + } + + contract := NewContract(cfg, logger, flowClient) quit := make(chan bool) - app := &App{cfg, db, flowClient, contract, quit} + app := &App{cfg, logger, db, flowClient, contract, quit} if poll { go poller(app) @@ -29,10 +36,12 @@ func New(cfg *config.Config, db *gorm.DB, flowClient *client.Client, poll bool) return app } +// Closes allows the poller to close controllably func (app *App) Close() { close(app.quit) } +// CreateDistribution validates a distribution, resolves it and stores it in database func (app *App) CreateDistribution(ctx context.Context, distribution *Distribution) error { if err := distribution.Validate(); err != nil { return err @@ -49,12 +58,15 @@ func (app *App) CreateDistribution(ctx context.Context, distribution *Distributi return nil } +// ListDistributions lists all distributions in the database. Uses 'limit' and 'offset' to +// limit the fetched slice size. func (app *App) ListDistributions(ctx context.Context, limit, offset int) ([]Distribution, error) { opt := ParseListOptions(limit, offset) return ListDistributions(app.db, opt) } +// GetDistribution returns a distribution from database based on its offchain ID (uuid). func (app *App) GetDistribution(ctx context.Context, id uuid.UUID) (*Distribution, error) { distribution, err := GetDistribution(app.db, id) if err != nil { @@ -64,6 +76,8 @@ func (app *App) GetDistribution(ctx context.Context, id uuid.UUID) (*Distributio return distribution, nil } +// CancelDistribution cancels a distribution. Spec and implementation for +// distribution cancelling is not finished yet. func (app *App) CancelDistribution(ctx context.Context, id uuid.UUID) error { return app.db.Transaction(func(tx *gorm.DB) error { distribution, err := GetDistribution(tx, id) @@ -79,6 +93,7 @@ func (app *App) CancelDistribution(ctx context.Context, id uuid.UUID) error { }) } +// GetPack returns a pack from database based on its offchain ID (uuid). func (app *App) GetPack(ctx context.Context, id uuid.UUID) (*Pack, error) { pack, err := GetPack(app.db, id) if err != nil { diff --git a/service/app/circulating_pack.go b/service/app/circulating_pack.go index e178c92..e5f29ed 100644 --- a/service/app/circulating_pack.go +++ b/service/app/circulating_pack.go @@ -8,7 +8,8 @@ import ( "gorm.io/gorm" ) -// CirculatingPackContract represents the contract of a pack NFT that has been put into circulation. +// CirculatingPackContract represents the contract of a pack NFT that has been +// put into circulation. // We need to monitor each circulating packs events. type CirculatingPackContract struct { gorm.Model diff --git a/service/app/collectible.go b/service/app/collectible.go index bce99b1..1b8ba08 100644 --- a/service/app/collectible.go +++ b/service/app/collectible.go @@ -9,13 +9,17 @@ import ( "github.com/onflow/flow-go-sdk" ) +// Collectible is a reference to an NFT which can be included in a pack. type Collectible struct { - FlowID common.FlowID // ID of the collectible NFT + FlowID common.FlowID // Flow ID of the collectible NFT ContractReference AddressLocation // Reference to the collectible NFT contract } +// Collectibles slice type. Allows storing collectibles of a pack +// embedded (as a text column of 'distribution_packs' table) in database. type Collectibles []Collectible +// CollectibleFromString returns a collectible from the string representation. func CollectibleFromString(s string) (Collectible, error) { split := strings.Split(string(s), ".") address := common.FlowAddress(flow.HexToAddress(split[1])) @@ -37,6 +41,8 @@ func (c Collectible) String() string { return fmt.Sprintf("A.%s.%s.%d", c.ContractReference.Address, c.ContractReference.Name, c.FlowID.Int64) } +// HashString returns the string representation of a collectible used to +// construct a packs commitmentHash. func (c Collectible) HashString() string { return c.String() } @@ -50,6 +56,7 @@ func (Collectibles) GormDataType() string { return "text" } +// Scan a collectibles slice from database. func (cc *Collectibles) Scan(value interface{}) error { str, ok := value.(string) if !ok { @@ -68,6 +75,7 @@ func (cc *Collectibles) Scan(value interface{}) error { return nil } +// Convert a collectibles slice to database storable format. func (cc Collectibles) Value() (driver.Value, error) { return strings.Trim(strings.Join(strings.Fields(fmt.Sprint(cc)), ","), "[]"), nil } diff --git a/service/app/contract_interface.go b/service/app/contract_interface.go index ae0323f..1f60aca 100644 --- a/service/app/contract_interface.go +++ b/service/app/contract_interface.go @@ -44,11 +44,12 @@ const ( // - Timeout for settling and minting? // - Cancel? +// Contract handles all the onchain logic and functions type Contract struct { cfg *config.Config + logger *log.Logger flowClient *client.Client account *flow_helpers.Account - logger *log.Logger } func minInt(a int, b int) int { @@ -58,17 +59,22 @@ func minInt(a int, b int) int { return a } -func NewContract(cfg *config.Config, flowClient *client.Client) *Contract { +func NewContract(cfg *config.Config, logger *log.Logger, flowClient *client.Client) *Contract { pdsAccount := flow_helpers.GetAccount( flow.HexToAddress(cfg.AdminAddress), cfg.AdminPrivateKey, []int{0}, // TODO (latenssi): more key indexes ) - logger := log.New() - logger.SetLevel(log.InfoLevel) // TODO - return &Contract{cfg, flowClient, pdsAccount, logger} + return &Contract{cfg, logger, flowClient, pdsAccount} } +// StartSettlement sets the given distributions state to 'settling' and starts the settlement +// phase onchain. +// It lists all collectible NFTs in the distribution and creates batches +// of 'SETTLE_BATCH_SIZE' from them. +// It then creates and stores the settlement Flow transactions (PDS account withdraw from issuer to escrow) in +// database to be later processed by a poller. +// Batching needs to be done to control the transaction size. func (c *Contract) StartSettlement(ctx context.Context, db *gorm.DB, dist *Distribution) error { c.logger.WithFields(log.Fields{ "method": "StartSettlement", @@ -174,6 +180,15 @@ func (c *Contract) StartSettlement(ctx context.Context, db *gorm.DB, dist *Distr return nil } +// StartMinting sets the given distributions state to 'minting' and starts the settlement +// phase onchain. +// It creates a CirculatingPackContract to allow onchain monitoring +// (listening for events) of any pack that has been put to circulation. +// It then lists all Pack NFTs in the distribution and creates batches +// of 'MINT_BATCH_SIZE' from them. +// It then creates and stores the minting Flow transactions in database to be +// later processed by a poller. +// Batching needs to be done to control the transaction size. func (c *Contract) StartMinting(ctx context.Context, db *gorm.DB, dist *Distribution) error { c.logger.WithFields(log.Fields{ "method": "StartMinting", @@ -288,25 +303,31 @@ func (c *Contract) StartMinting(ctx context.Context, db *gorm.DB, dist *Distribu return nil } +// Cancel needs to be specced and implemented. func (c *Contract) Cancel(ctx context.Context, db *gorm.DB, dist *Distribution) error { + // TODO (latenssi) + c.logger.WithFields(log.Fields{ "method": "Cancel", "ID": dist.ID, }).Info("Cancel") - if err := dist.SetCancelled(); err != nil { - return err - } + return fmt.Errorf("cancel is not yet implemented") - if err := UpdateDistribution(db, dist); err != nil { - return err - } + // if err := dist.SetCancelled(); err != nil { + // return err + // } - // TODO (latenssi) + // if err := UpdateDistribution(db, dist); err != nil { + // return err + // } - return nil + // return nil } +// UpdateSettlementStatus polls for 'Deposit' events regarding the given distributions +// collectible NFTs. +// It updates the settelement status in database accordingly. func (c *Contract) UpdateSettlementStatus(ctx context.Context, db *gorm.DB, dist *Distribution) error { c.logger.WithFields(log.Fields{ "method": "UpdateSettlementStatus", @@ -396,6 +417,7 @@ func (c *Contract) UpdateSettlementStatus(ctx context.Context, db *gorm.DB, dist } if settlement.IsComplete() { + // TODO: consider updating the distribution separately if err := dist.SetSettled(); err != nil { return err } @@ -420,6 +442,9 @@ func (c *Contract) UpdateSettlementStatus(ctx context.Context, db *gorm.DB, dist return nil } +// UpdateMintingStatus polls for 'Mint' events regarding the given distributions +// Pack NFTs. +// It updates the minting status in database accordingly. func (c *Contract) UpdateMintingStatus(ctx context.Context, db *gorm.DB, dist *Distribution) error { c.logger.WithFields(log.Fields{ "method": "UpdateMintingStatus", @@ -517,6 +542,7 @@ func (c *Contract) UpdateMintingStatus(ctx context.Context, db *gorm.DB, dist *D } if minting.IsComplete() { + // TODO: consider updating the distribution separately if err := dist.SetComplete(); err != nil { return err } @@ -540,6 +566,10 @@ func (c *Contract) UpdateMintingStatus(ctx context.Context, db *gorm.DB, dist *D return nil } +// UpdateCirculatingPack polls for 'REVEAL_REQUEST' and 'OPEN_REQUEST' events +// regarding the given CirculatingPackContract. +// It handles each event by creating and storing an appropriate Flow transaction +// in database to be later processed by a poller. func (c *Contract) UpdateCirculatingPack(ctx context.Context, db *gorm.DB, cpc *CirculatingPackContract) error { c.logger.WithFields(log.Fields{ "method": "UpdateCirculatingPack", @@ -604,7 +634,7 @@ func (c *Contract) UpdateCirculatingPack(ctx context.Context, db *gorm.DB, cpc * } switch eventName { - case REVEAL_REQUEST: + case REVEAL_REQUEST: // Reveal a pack if err := pack.Reveal(); err != nil { return err } @@ -656,7 +686,7 @@ func (c *Contract) UpdateCirculatingPack(ctx context.Context, db *gorm.DB, cpc * "packFlowID": flowID, }).Info("Pack reveal transaction created") - case OPEN_REQUEST: + case OPEN_REQUEST: // Open a pack if err := pack.Open(); err != nil { return err } diff --git a/service/app/distribution.go b/service/app/distribution.go index e96a331..36436ee 100644 --- a/service/app/distribution.go +++ b/service/app/distribution.go @@ -1,12 +1,9 @@ package app import ( - "crypto/sha256" - "encoding/hex" "fmt" "math/rand" "sort" - "strings" "time" "github.com/flow-hydraulics/flow-pds/service/common" @@ -243,85 +240,3 @@ func (dist Distribution) PackSlotCount() int { func (dist Distribution) SlotCount() int { return dist.PackCount() * dist.PackSlotCount() } - -// SetCommitmentHash should -// - validate the pack -// - decide on a random salt value -// - calculate the commitment hash for the pack -func (p *Pack) SetCommitmentHash() error { - if err := p.Validate(); err != nil { - return fmt.Errorf("pack validation error: %w", err) - } - - if !p.Salt.IsEmpty() { - return fmt.Errorf("salt is already set") - } - - if !p.CommitmentHash.IsEmpty() { - return fmt.Errorf("commitmentHash is already set") - } - - salt, err := common.GenerateRandomBytes(SALT_LENGTH) - if err != nil { - return err - } - - p.Salt = salt - p.CommitmentHash = p.Hash() - - return nil -} - -// Hash outputs the 'commitmentHash' of a pack. -// It is converting inputs to string and joining them with a delim to make the input more readable. -// This will allow anyone to easily copy paste strings and verify the hash. -// We also use the full reference (address and name) of a collectible to make -// it more difficult to fiddle with the types of collectibles inside a pack. -func (p *Pack) Hash() []byte { - inputs := make([]string, 1+len(p.Collectibles)) - inputs[0] = hex.EncodeToString(p.Salt) - for i, c := range p.Collectibles { - inputs[i+1] = c.HashString() - } - input := strings.Join(inputs, HASH_DELIM) - hash := sha256.Sum256([]byte(input)) - return hash[:] -} - -// Seal should set the FlowID of the pack and set it as sealed -func (p *Pack) Seal(id common.FlowID) error { - if p.State != common.PackStateInit { - return fmt.Errorf("pack in unexpected state: %d", p.State) - } - - if p.FlowID.Valid { - return fmt.Errorf("pack FlowID already set: %v", id) - } - - p.FlowID = id - p.State = common.PackStateSealed - - return nil -} - -// Reveal should set the pack as revealed -func (p *Pack) Reveal() error { - if p.State != common.PackStateSealed { - return fmt.Errorf("pack in unexpected state: %d", p.State) - } - - p.State = common.PackStateRevealed - - return nil -} - -// Open should set the pack as opened -func (p *Pack) Open() error { - if p.State != common.PackStateRevealed { - return fmt.Errorf("pack in unexpected state: %d", p.State) - } - - p.State = common.PackStateOpened - - return nil -} diff --git a/service/app/minting.go b/service/app/minting.go index 9fbf567..6b33487 100644 --- a/service/app/minting.go +++ b/service/app/minting.go @@ -5,6 +5,7 @@ import ( "gorm.io/gorm" ) +// Minting represents the minting status of a distribution. type Minting struct { gorm.Model ID uuid.UUID `gorm:"column:id;primary_key;type:uuid;"` diff --git a/service/app/pack.go b/service/app/pack.go new file mode 100644 index 0000000..0d39567 --- /dev/null +++ b/service/app/pack.go @@ -0,0 +1,92 @@ +package app + +import ( + "crypto/sha256" + "encoding/hex" + "fmt" + "strings" + + "github.com/flow-hydraulics/flow-pds/service/common" +) + +// SetCommitmentHash should +// - validate the pack +// - decide on a random salt value +// - calculate the commitment hash for the pack +func (p *Pack) SetCommitmentHash() error { + if err := p.Validate(); err != nil { + return fmt.Errorf("pack validation error: %w", err) + } + + if !p.Salt.IsEmpty() { + return fmt.Errorf("salt is already set") + } + + if !p.CommitmentHash.IsEmpty() { + return fmt.Errorf("commitmentHash is already set") + } + + salt, err := common.GenerateRandomBytes(SALT_LENGTH) + if err != nil { + return err + } + + p.Salt = salt + p.CommitmentHash = p.Hash() + + return nil +} + +// Hash outputs the 'commitmentHash' of a pack. +// It is converting inputs to string and joining them with a delim to make the input more readable. +// This will allow anyone to easily copy paste strings and verify the hash. +// We also use the full reference (address and name) of a collectible to make +// it more difficult to fiddle with the types of collectibles inside a pack. +func (p *Pack) Hash() []byte { + inputs := make([]string, 1+len(p.Collectibles)) + inputs[0] = hex.EncodeToString(p.Salt) + for i, c := range p.Collectibles { + inputs[i+1] = c.HashString() + } + input := strings.Join(inputs, HASH_DELIM) + hash := sha256.Sum256([]byte(input)) + return hash[:] +} + +// Seal should set the FlowID of the pack and set it as sealed +func (p *Pack) Seal(id common.FlowID) error { + if p.State != common.PackStateInit { + return fmt.Errorf("pack in unexpected state: %d", p.State) + } + + if p.FlowID.Valid { + return fmt.Errorf("pack FlowID already set: %v", id) + } + + p.FlowID = id + p.State = common.PackStateSealed + + return nil +} + +// Reveal should set the pack as revealed +func (p *Pack) Reveal() error { + if p.State != common.PackStateSealed { + return fmt.Errorf("pack in unexpected state: %d", p.State) + } + + p.State = common.PackStateRevealed + + return nil +} + +// Open should set the pack as opened +func (p *Pack) Open() error { + if p.State != common.PackStateRevealed { + return fmt.Errorf("pack in unexpected state: %d", p.State) + } + + p.State = common.PackStateOpened + + return nil +} diff --git a/service/app/poller.go b/service/app/poller.go index cfb7d2f..17602c2 100644 --- a/service/app/poller.go +++ b/service/app/poller.go @@ -11,6 +11,9 @@ import ( "gorm.io/gorm" ) +// TODO: instead of running everything in one transaction, separate them by the +// parent object or something suitable + func poller(app *App) { ctx := context.Background() ctx, cancel := context.WithCancel(ctx) @@ -91,6 +94,7 @@ func handleSettling(ctx context.Context, db *gorm.DB, contract *Contract) error } for _, dist := range settling { + // TODO: separate db transaction? if err := contract.UpdateSettlementStatus(ctx, tx, &dist); err != nil { return err } @@ -124,6 +128,7 @@ func handleMinting(ctx context.Context, db *gorm.DB, contract *Contract) error { } for _, dist := range minting { + // TODO: separate db transaction? if err := contract.UpdateMintingStatus(ctx, tx, &dist); err != nil { return err } diff --git a/service/app/settlement.go b/service/app/settlement.go index 9ec3163..e9ebc02 100644 --- a/service/app/settlement.go +++ b/service/app/settlement.go @@ -8,6 +8,7 @@ import ( "gorm.io/gorm" ) +// Settlement represents the settlement status of a distribution. type Settlement struct { gorm.Model ID uuid.UUID `gorm:"column:id;primary_key;type:uuid;"` diff --git a/service/common/flowID.go b/service/common/flowID.go index c9f5fc2..b012ebe 100644 --- a/service/common/flowID.go +++ b/service/common/flowID.go @@ -16,7 +16,7 @@ import ( // For reference: // https://www.reddit.com/r/golang/comments/7eycli/why_is_there_no_sqlnulluint64/ -type FlowID sql.NullInt64 +type FlowID sql.NullInt64 // Nullable int 64 type FlowIDList []FlowID func (i FlowID) LessThan(j FlowID) bool { diff --git a/service/common/states.go b/service/common/states.go index 2f7059c..deaf903 100644 --- a/service/common/states.go +++ b/service/common/states.go @@ -4,6 +4,10 @@ type DistributionState uint type PackState uint type SettlementState uint type MintingState uint +type TransactionState int + +// TODO (latenssi): represent states as strings (instead of integers) to allow +// flexibility in database structure? const ( DistributionStateInit DistributionState = iota @@ -34,3 +38,11 @@ const ( MintingStateStopped MintingStateDone ) + +const ( + TransactionStateInit = iota + TransactionStateRetry + TransactionStateSent + TransactionStateFailed + TransactionStateOk +) diff --git a/service/config/config.go b/service/config/config.go index 848d151..c9ae054 100644 --- a/service/config/config.go +++ b/service/config/config.go @@ -1,10 +1,9 @@ package config import ( - "log" - "github.com/caarlos0/env/v6" "github.com/joho/godotenv" + log "github.com/sirupsen/logrus" ) type Config struct { @@ -12,6 +11,7 @@ type Config struct { AdminAddress string `env:"FLOW_PDS_ADMIN_ADDRESS,notEmpty"` AdminPrivateKey string `env:"FLOW_PDS_ADMIN_PRIVATE_KEY,notEmpty"` + // TODO AdminPrivateKeyIndexes // -- Database -- diff --git a/service/errors/errors.go b/service/errors/errors.go deleted file mode 100644 index fedf794..0000000 --- a/service/errors/errors.go +++ /dev/null @@ -1,7 +0,0 @@ -package errors - -type NilConfigError struct{} - -func (e *NilConfigError) Error() string { - return "MainConfig can not be nil" -} diff --git a/service/flow_helpers/account.go b/service/flow_helpers/account.go index 05a3d1d..579421c 100644 --- a/service/flow_helpers/account.go +++ b/service/flow_helpers/account.go @@ -53,7 +53,7 @@ func GetAccount(address flow.Address, privateKeyInHex string, keyIndexes []int) } // KeyIndex rotates the given indexes ('KeyIndexes') and returns the next index -// TODO (latenssi): sync over database as this currently only works in a single instance situation +// TODO (latenssi): sync over database as this currently only works in a single instance situation? func (a *Account) KeyIndex() int { // NOTE: This won't help if having multiple instances of the PDS service running keyIndexLock.Lock() diff --git a/service/flow_helpers/transaction.go b/service/flow_helpers/transaction.go index 96822e6..f667959 100644 --- a/service/flow_helpers/transaction.go +++ b/service/flow_helpers/transaction.go @@ -2,11 +2,7 @@ package flow_helpers import ( "context" - "fmt" - "time" - "github.com/flow-hydraulics/flow-pds/go-contracts/util" - "github.com/onflow/cadence" "github.com/onflow/flow-go-sdk" "github.com/onflow/flow-go-sdk/client" ) @@ -30,63 +26,3 @@ func SignProposeAndPayAs(ctx context.Context, flowClient *client.Client, account return nil } - -func PrepareTransaction(arguments []cadence.Value, txScriptPath string) (*flow.Transaction, error) { - txScript := util.ParseCadenceTemplate(txScriptPath) - - tx := flow.NewTransaction(). - SetScript(txScript). - SetGasLimit(9999) - - for _, arg := range arguments { - if err := tx.AddArgument(arg); err != nil { - return nil, err - } - } - - return tx, nil -} - -func SendAndWait(ctx context.Context, flowClient *client.Client, timeout time.Duration, tx flow.Transaction) (*flow.TransactionResult, error) { - if err := flowClient.SendTransaction(ctx, tx); err != nil { - return nil, err - } - return WaitForSeal(ctx, flowClient, timeout, tx.ID()) -} - -func WaitForSeal(ctx context.Context, flowClient *client.Client, timeout time.Duration, id flow.Identifier) (*flow.TransactionResult, error) { - var ( - result *flow.TransactionResult - err error - ) - - if timeout > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, timeout) - defer cancel() - } - - for { - result, err = flowClient.GetTransactionResult(ctx, id) - if err != nil { - return nil, err - } - - if result.Error != nil { - return result, result.Error - } - - switch result.Status { - default: - // Not an interesting state, exit switch and continue loop - case flow.TransactionStatusExpired: - // Expired, handle as an error - return result, fmt.Errorf("transaction expired") - case flow.TransactionStatusSealed: - // Sealed, all good - return result, nil - } - - time.Sleep(time.Second) - } -} diff --git a/service/http/handlers.go b/service/http/handlers.go index 6d7cbc5..a593473 100644 --- a/service/http/handlers.go +++ b/service/http/handlers.go @@ -2,13 +2,13 @@ package http import ( "encoding/json" - "log" "net/http" "strconv" "github.com/flow-hydraulics/flow-pds/service/app" "github.com/google/uuid" "github.com/gorilla/mux" + log "github.com/sirupsen/logrus" ) // Create a distribution diff --git a/service/http/middleware.go b/service/http/middleware.go index f1114a6..779844b 100644 --- a/service/http/middleware.go +++ b/service/http/middleware.go @@ -4,11 +4,11 @@ import ( "encoding/json" "fmt" "io" - "log" "net/http" "strings" gorilla "github.com/gorilla/handlers" + log "github.com/sirupsen/logrus" ) func UseCors(h http.Handler) http.Handler { diff --git a/service/http/router.go b/service/http/router.go index afea29b..fc79ce4 100644 --- a/service/http/router.go +++ b/service/http/router.go @@ -1,11 +1,11 @@ package http import ( - "log" "net/http" "github.com/flow-hydraulics/flow-pds/service/app" "github.com/gorilla/mux" + log "github.com/sirupsen/logrus" ) func NewRouter(logger *log.Logger, app *app.App) http.Handler { diff --git a/service/http/server.go b/service/http/server.go index 0bb1cc0..db0c76d 100644 --- a/service/http/server.go +++ b/service/http/server.go @@ -3,8 +3,6 @@ package http import ( "context" "fmt" - "io" - "log" "net/http" "os" "os/signal" @@ -12,6 +10,7 @@ import ( "github.com/flow-hydraulics/flow-pds/service/app" "github.com/flow-hydraulics/flow-pds/service/config" + log "github.com/sirupsen/logrus" ) type Server struct { @@ -22,8 +21,7 @@ type Server struct { func NewServer(cfg *config.Config, logger *log.Logger, app *app.App) *Server { if logger == nil { - // A discarding logger - logger = log.New(io.Discard, "", log.LstdFlags|log.Lshortfile) + panic("no logger") } r := NewRouter(logger, app) diff --git a/service/http/types.go b/service/http/types.go index 93a173b..74fbc32 100644 --- a/service/http/types.go +++ b/service/http/types.go @@ -8,8 +8,6 @@ import ( "github.com/google/uuid" ) -// TODO (latenssi): represent states as strings - type ReqCreateDistribution struct { FlowID common.FlowID `json:"distFlowID"` Issuer common.FlowAddress `json:"issuer"` diff --git a/service/transactions/database.go b/service/transactions/database.go index 7c45e62..314526d 100644 --- a/service/transactions/database.go +++ b/service/transactions/database.go @@ -1,6 +1,7 @@ package transactions import ( + "github.com/flow-hydraulics/flow-pds/service/common" "github.com/google/uuid" "gorm.io/gorm" "gorm.io/gorm/clause" @@ -24,16 +25,19 @@ func (t *StorableTransaction) Save(db *gorm.DB) error { return db.Omit(clause.Associations).Save(t).Error } +// GetTransaction returns a StorableTransaction from database. func GetTransaction(db *gorm.DB, id uuid.UUID) (*StorableTransaction, error) { t := StorableTransaction{} return &t, db.First(&t, id).Error } +// SendableIDs returns the offchain IDs of StorableTransactions that are +// currently sendable. func SendableIDs(db *gorm.DB) ([]uuid.UUID, error) { list := []StorableTransaction{} err := db.Select("id").Order("created_at desc"). - Where(map[string]interface{}{"state": TransactionStateInit}). - Or(map[string]interface{}{"state": TransactionStateRetry}). + Where(map[string]interface{}{"state": common.TransactionStateInit}). + Or(map[string]interface{}{"state": common.TransactionStateRetry}). Find(&list).Error if err != nil { return nil, err @@ -45,10 +49,12 @@ func SendableIDs(db *gorm.DB) ([]uuid.UUID, error) { return res, nil } +// SentIDs returns the offchain IDs of StorableTransactions that are +// currently sent. func SentIDs(db *gorm.DB) ([]uuid.UUID, error) { list := []StorableTransaction{} err := db.Select("id").Order("created_at desc"). - Where(map[string]interface{}{"state": TransactionStateSent}). + Where(map[string]interface{}{"state": common.TransactionStateSent}). Find(&list).Error if err != nil { return nil, err diff --git a/service/transactions/transactions.go b/service/transactions/transactions.go index ee9984c..3c037bb 100644 --- a/service/transactions/transactions.go +++ b/service/transactions/transactions.go @@ -5,6 +5,7 @@ import ( "encoding/json" "strings" + "github.com/flow-hydraulics/flow-pds/service/common" "github.com/flow-hydraulics/flow-pds/service/flow_helpers" "github.com/google/uuid" "github.com/onflow/cadence" @@ -16,29 +17,16 @@ import ( "gorm.io/gorm" ) -// TODO (latenssi): move this to main and use an application wide logger -func init() { - log.SetLevel(log.InfoLevel) -} - -type TransactionState int - -const ( - TransactionStateInit = iota - TransactionStateRetry - TransactionStateSent - TransactionStateError - TransactionStateOk -) - +// StorableTransaction represents a Flow transaction. +// It stores the script and arguments of a transaction. type StorableTransaction struct { gorm.Model ID uuid.UUID `gorm:"column:id;primary_key;type:uuid;"` - State TransactionState `gorm:"column:state"` - Error string `gorm:"column:error"` - RetryCount uint `gorm:"column:retry_count"` - TransactionID string `gorm:"column:transaction_id"` + State common.TransactionState `gorm:"column:state"` + Error string `gorm:"column:error"` + RetryCount uint `gorm:"column:retry_count"` + TransactionID string `gorm:"column:transaction_id"` Script string `gorm:"column:script"` Arguments datatypes.JSON `gorm:"column:arguments"` @@ -67,6 +55,7 @@ func NewTransaction(script []byte, arguments []cadence.Value) (*StorableTransact return &transaction, nil } +// Prepare parses the transaction into a sendable state. func (t *StorableTransaction) Prepare() (*flow.Transaction, error) { argsBytes := [][]byte{} if err := json.Unmarshal(t.Arguments, &argsBytes); err != nil { @@ -95,8 +84,10 @@ func (t *StorableTransaction) Prepare() (*flow.Transaction, error) { return tx, nil } +// Send prepares a Flow transaction, signs it and then sends it. +// Updates the TransactionID each time. func (t *StorableTransaction) Send(ctx context.Context, flowClient *client.Client, account *flow_helpers.Account) error { - if t.State == TransactionStateRetry { + if t.State == common.TransactionStateRetry { t.RetryCount++ } @@ -120,12 +111,17 @@ func (t *StorableTransaction) Send(ctx context.Context, flowClient *client.Clien return err } + // Update TransactionID t.TransactionID = tx.ID().Hex() - t.State = TransactionStateSent + + // Update state + t.State = common.TransactionStateSent return nil } +// HandleResult checks the results of a transaction onchain and updates the +// StorableTransaction accordingly. func (t *StorableTransaction) HandleResult(ctx context.Context, flowClient *client.Client) error { result, err := flowClient.GetTransactionResult(ctx, flow.HexToID(t.TransactionID)) if err != nil { @@ -137,13 +133,13 @@ func (t *StorableTransaction) HandleResult(ctx context.Context, flowClient *clie if result.Error != nil { t.Error = result.Error.Error() if strings.Contains(result.Error.Error(), "invalid proposal key") { - t.State = TransactionStateRetry + t.State = common.TransactionStateRetry log.WithFields(log.Fields{ "transactionID": t.TransactionID, }).Warn("Invalid proposal key in transaction, retrying later") } else { - t.State = TransactionStateError + t.State = common.TransactionStateFailed log.WithFields(log.Fields{ "transactionID": t.TransactionID, @@ -155,9 +151,9 @@ func (t *StorableTransaction) HandleResult(ctx context.Context, flowClient *clie switch result.Status { case flow.TransactionStatusExpired: - t.State = TransactionStateRetry + t.State = common.TransactionStateRetry case flow.TransactionStatusSealed: - t.State = TransactionStateOk + t.State = common.TransactionStateOk } return nil diff --git a/test_lib.go b/test_lib.go index f0b7670..5cf455a 100644 --- a/test_lib.go +++ b/test_lib.go @@ -11,10 +11,13 @@ import ( "github.com/flow-hydraulics/flow-pds/service/http" "github.com/flow-hydraulics/flow-pds/service/transactions" "github.com/onflow/flow-go-sdk/client" + log "github.com/sirupsen/logrus" "google.golang.org/grpc" "gorm.io/gorm" ) +var testLogger *log.Logger + func cleanTestDatabase(cfg *config.Config, db *gorm.DB) { // Only run this if database DSN contains "test" if strings.Contains(strings.ToLower(cfg.DatabaseDSN), "test") { @@ -47,6 +50,10 @@ func getTestCfg() *config.Config { } func getTestApp(cfg *config.Config, poll bool) (*app.App, func()) { + if testLogger == nil { + testLogger = log.New() + } + flowClient, err := client.New(cfg.AccessAPIHost, grpc.WithInsecure()) if err != nil { panic(err) @@ -67,7 +74,7 @@ func getTestApp(cfg *config.Config, poll bool) (*app.App, func()) { panic(err) } - app := app.New(cfg, db, flowClient, poll) + app := app.New(cfg, testLogger, db, flowClient, poll) clean := func() { app.Close() @@ -79,11 +86,16 @@ func getTestApp(cfg *config.Config, poll bool) (*app.App, func()) { } func getTestServer(cfg *config.Config, poll bool) (*http.Server, func()) { + if testLogger == nil { + testLogger = log.New() + } + app, cleanupApp := getTestApp(cfg, poll) clean := func() { cleanupApp() } - return http.NewServer(cfg, nil, app), clean + + return http.NewServer(cfg, testLogger, app), clean } func makeTestCollection(size int) []common.FlowID {