Skip to content

Commit

Permalink
Merge pull request #11286 from vegaprotocol/11285
Browse files Browse the repository at this point in the history
feat: add support for trading transaction ordering
  • Loading branch information
ze97286 authored May 22, 2024
2 parents 89e3902 + 0c71984 commit e6b8ed1
Show file tree
Hide file tree
Showing 21 changed files with 3,040 additions and 2,519 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- [11196](https://github.com/vegaprotocol/vega/issues/11196) - Add an active field in the price monitoring bounds payload.
- [11211](https://github.com/vegaprotocol/vega/issues/11211) - Liquidation engine includes `vAMM` shapes as available volume.
- [11217](https://github.com/vegaprotocol/vega/issues/11217) - Allow market proposals to override risk factors.
- [11285](https://github.com/vegaprotocol/vega/issues/11285) - Add support for trading transaction ordering.
- [11282](https://github.com/vegaprotocol/vega/issues/11282) - Allow a party to withdraw rewards from an AMM vested account.

### 🐛 Fixes
Expand Down
2 changes: 2 additions & 0 deletions commands/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ func CheckInputData(rawInputData []byte) (*commandspb.InputData, Errors) {
errs.Merge(checkAmendAMM(cmd.AmendAmm))
case *commandspb.InputData_CancelAmm:
errs.Merge(checkCancelAMM(cmd.CancelAmm))
case *commandspb.InputData_DelayedTransactionsWrapper:
break
default:
errs.AddForProperty("tx.input_data.command", ErrIsNotSupported)
}
Expand Down
5 changes: 5 additions & 0 deletions core/blockchain/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (app *App) InitChain(_ context.Context, req *types.RequestInitChain) (*type
return &types.ResponseInitChain{}, nil
}

func (app *App) GetTx(tx []byte) (Tx, error) {
txx, _, err := app.getTx(tx)
return txx, err
}

// PrepareProposal will take the given transactions from the mempool and attempts to prepare a
// proposal from them when it's our turn to do so while keeping the size, gas, pow, and spam constraints.
func (app *App) PrepareProposal(_ context.Context, req *types.RequestPrepareProposal) (*types.ResponsePrepareProposal, error) {
Expand Down
2 changes: 2 additions & 0 deletions core/events/transaction_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ func NewTransactionResultEventFailure(

func (t *TransactionResult) setTx(tx interface{}) *TransactionResult {
switch tv := tx.(type) {
case *commandspb.DelayedTransactionsWrapper:
break
case *commandspb.OrderSubmission:
t.evt.Transaction = &eventspb.TransactionResult_OrderSubmission{
OrderSubmission: tv,
Expand Down
36 changes: 36 additions & 0 deletions core/nodewallets/commander.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,36 @@ func NewCommander(cfg Config, log *logging.Logger, bc Chain, w *vega.Wallet, bst
}, nil
}

func (c *Commander) NewTransaction(ctx context.Context, cmd txn.Command, payload proto.Message) ([]byte, error) {
chainID, err := c.bc.GetChainID(ctx)
if err != nil {
c.log.Error("couldn't retrieve chain ID",
logging.Error(err),
)
return nil, err
}
inputData := commands.NewInputData(c.bstats.Height())
wrapPayloadIntoInputData(inputData, cmd, payload)
marshalInputData, err := commands.MarshalInputData(inputData)
if err != nil {
// this should never be possible
c.log.Panic("could not marshal core transaction", logging.Error(err))
}

signature, err := c.sign(commands.BundleInputDataForSigning(marshalInputData, chainID))
if err != nil {
// this should never be possible too
c.log.Panic("could not sign command", logging.Error(err))
}

tx := commands.NewTransaction(c.wallet.PubKey().Hex(), marshalInputData, signature)
marshalledTx, err := proto.Marshal(tx)
if err != nil {
return nil, err
}
return marshalledTx, nil
}

// Command - send command to chain.
// Note: beware when passing in an exponential back off since the done function may be called many times.
func (c *Commander) Command(ctx context.Context, cmd txn.Command, payload proto.Message, done func(string, error), bo *backoff.ExponentialBackOff) {
Expand Down Expand Up @@ -165,6 +195,12 @@ func wrapPayloadIntoInputData(data *commandspb.InputData, cmd txn.Command, paylo
switch cmd {
case txn.SubmitOrderCommand, txn.CancelOrderCommand, txn.AmendOrderCommand, txn.VoteCommand, txn.WithdrawCommand, txn.LiquidityProvisionCommand, txn.ProposeCommand, txn.BatchProposeCommand, txn.SubmitOracleDataCommand, txn.StopOrdersCancellationCommand, txn.StopOrdersSubmissionCommand:
panic("command is not supported to be sent by a node.")
case txn.DelayedTransactionsWrapper:
if underlyingCmd, ok := payload.(*commandspb.DelayedTransactionsWrapper); ok {
data.Command = &commandspb.InputData_DelayedTransactionsWrapper{
DelayedTransactionsWrapper: underlyingCmd,
}
}
case txn.ProtocolUpgradeCommand:
if underlyingCmd, ok := payload.(*commandspb.ProtocolUpgradeProposal); ok {
data.Command = &commandspb.InputData_ProtocolUpgradeProposal{
Expand Down
148 changes: 142 additions & 6 deletions core/processor/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ type EthCallEngine interface {
Start()
}

type TxCache interface {
SetRawTxs(rtx [][]byte)
GetRawTxs() [][]byte
NewDelayedTransaction(ctx context.Context, delayed [][]byte) []byte
}

type App struct {
abci *abci.App
currentTimestamp time.Time
Expand Down Expand Up @@ -253,6 +259,9 @@ type App struct {
nilSpam bool

maxBatchSize atomic.Uint64
txCache TxCache

seenDelayedTxTransactions bool
}

func NewApp(log *logging.Logger,
Expand Down Expand Up @@ -297,6 +306,7 @@ func NewApp(log *logging.Logger,
ethCallEngine EthCallEngine,
balanceChecker BalanceChecker,
partiesEngine PartiesEngine,
txCache TxCache,
) *App {
log = log.Named(namedLogger)
log.SetLevel(config.Level.Get())
Expand Down Expand Up @@ -345,6 +355,7 @@ func NewApp(log *logging.Logger,
ethCallEngine: ethCallEngine,
balanceChecker: balanceChecker,
partiesEngine: partiesEngine,
txCache: txCache,
}

// setup handlers
Expand Down Expand Up @@ -527,7 +538,9 @@ func NewApp(log *logging.Logger,
).
HandleDeliverTx(txn.UpdatePartyProfileCommand,
app.SendTransactionResult(app.UpdatePartyProfile),
)
).
HandleDeliverTx(txn.DelayedTransactionsWrapper,
app.SendTransactionResult(app.handleDelayedTransactionWrapper))

app.time.NotifyOnTick(app.onTick)

Expand Down Expand Up @@ -863,19 +876,59 @@ func (app *App) OnInitChain(req *tmtypes.RequestInitChain) (*tmtypes.ResponseIni
// caused the party to get blocked.
func (app *App) prepareProposal(txs []abci.Tx, rawTxs [][]byte) [][]byte {
var totalBytes int64
validationResults := []pow.ValidationEntry{}

// internally we use this as max bytes, externally to consensus params we return max ints. This is done so that cometbft always returns to us the full mempool
// and we can first sort it by priority and then reap by size.
maxBytes := tmtypesint.DefaultBlockParams().MaxBytes * 4
app.log.Debug("prepareProposal called with", logging.Int("txs", len(rawTxs)), logging.Int64("max-bytes", maxBytes))

// as transactions that are wrapped for sending in the next block are not removed from the mempool
// to avoid adding them both from the mempool and from the cache we need to check
// they were not in the cache.
// we still need to check that the transactions from previous block are passing pow and spam requirements.
addedFromPreviousHash := map[string]struct{}{}
delayedTxs := [][]byte{}
for _, txx := range app.txCache.GetRawTxs() {
tx, err := app.abci.GetTx(txx)
if err != nil {
continue
}
if !app.nilPow {
vr, d := app.pow.CheckBlockTx(tx)
validationResults = append(validationResults, pow.ValidationEntry{Tx: tx, Difficulty: d, ValResult: vr})
if vr != pow.ValidationResultSuccess && vr != pow.ValidationResultValidatorCommand {
app.log.Debug("pow failure", logging.Int64("validation-result", int64(vr)))
continue
}
}
if !app.nilSpam {
err := app.spam.CheckBlockTx(tx)
if err != nil {
app.log.Debug("spam error", logging.Error(err))
continue
}
}
if err := app.canSubmitTx(tx); err != nil {
continue
}

addedFromPreviousHash[hex.EncodeToString(tx.Hash())] = struct{}{}
delayedTxs = append(delayedTxs, txx)
totalBytes += int64(len(txx))
}

// wrap the transaction with information about gas wanted and priority
wrappedTxs := make([]*TxWrapper, 0, len(txs))
for i, v := range txs {
wtx, error := app.wrapTx(v, rawTxs[i], i)
if error != nil {
continue
}
if _, ok := addedFromPreviousHash[hex.EncodeToString(wtx.tx.Hash())]; ok {
app.log.Debug("ignoring mempool transaction corresponding to a delayed transaction from previous block")
continue
}
wrappedTxs = append(wrappedTxs, wtx)
}

Expand All @@ -888,10 +941,12 @@ func (app *App) prepareProposal(txs []abci.Tx, rawTxs [][]byte) [][]byte {
})

// add transactions to the block as long as we can without breaking size and gas limits in order of priority
validationResults := []pow.ValidationEntry{}
maxGas := app.getMaxGas()
totalGasWanted := uint64(0)
blockTxs := [][]byte{}
cancellations := [][]byte{}
postOnly := [][]byte{}
anythingElseFromThisBlock := [][]byte{}
nextBlockRtx := [][]byte{}

for _, tx := range wrappedTxs {
totalBytes += int64(len(tx.raw))
Expand All @@ -903,6 +958,11 @@ func (app *App) prepareProposal(txs []abci.Tx, rawTxs [][]byte) [][]byte {
break
}

if tx.tx.Command() == txn.DelayedTransactionsWrapper {
app.log.Debug("delayed transaction wrapper should never be submitted into the mempool")
continue
}

if !app.nilPow {
vr, d := app.pow.CheckBlockTx(tx.tx)
validationResults = append(validationResults, pow.ValidationEntry{Tx: tx.tx, Difficulty: d, ValResult: vr})
Expand All @@ -923,10 +983,61 @@ func (app *App) prepareProposal(txs []abci.Tx, rawTxs [][]byte) [][]byte {
if err := app.canSubmitTx(tx.tx); err != nil {
continue
}
app.log.Debug("adding tx to blockProposal", logging.String("tx-hash", hex.EncodeToString(tx.tx.Hash())), logging.String("tid", tx.tx.GetPoWTID()))
blockTxs = append(blockTxs, tx.raw)

switch tx.tx.Command() {
case txn.CancelOrderCommand, txn.CancelAMMCommand, txn.StopOrdersCancellationCommand:
cancellations = append(cancellations, tx.raw)
case txn.SubmitOrderCommand:
s := &commandspb.OrderSubmission{}
if err := tx.tx.Unmarshal(s); err != nil {
continue
}
if s.PostOnly {
postOnly = append(postOnly, tx.raw)
} else {
nextBlockRtx = append(nextBlockRtx, tx.raw)
}
case txn.AmendOrderCommand, txn.AmendAMMCommand, txn.StopOrdersSubmissionCommand:
nextBlockRtx = append(nextBlockRtx, tx.raw)
case txn.BatchMarketInstructions:
batch := &commandspb.BatchMarketInstructions{}
if err := tx.tx.Unmarshal(batch); err != nil {
continue
}
// if there are no amends/submissions
if len(batch.Amendments) == 0 && len(batch.Submissions) == 0 && len(batch.StopOrdersSubmission) == 0 {
cancellations = append(cancellations, tx.raw)
} else if len(batch.Amendments) == 0 && len(batch.StopOrdersSubmission) == 0 {
allPostOnly := true
for _, sub := range batch.Submissions {
if !sub.PostOnly {
allPostOnly = false
break
}
}
if allPostOnly {
postOnly = append(postOnly, tx.raw)
} else {
nextBlockRtx = append(nextBlockRtx, tx.raw)
}
} else {
nextBlockRtx = append(nextBlockRtx, tx.raw)
}
default:
anythingElseFromThisBlock = append(anythingElseFromThisBlock, tx.raw)
}
}
blockTxs := [][]byte{}
blockTxs = append(blockTxs, cancellations...) // cancellations go first
blockTxs = append(blockTxs, postOnly...) // then post only orders
if delayedTxs != nil {
blockTxs = append(blockTxs, delayedTxs...) // then anything from previous block
}
blockTxs = append(blockTxs, anythingElseFromThisBlock...) // finally anything else from this block
if len(nextBlockRtx) > 0 {
wrapperTx := app.txCache.NewDelayedTransaction(app.blockCtx, nextBlockRtx)
blockTxs = append(blockTxs, wrapperTx)
}
app.log.Debug("prepareProposal returned with", logging.Int("blockTxs", len(blockTxs)))
if !app.nilPow {
app.pow.EndPrepareProposal(validationResults)
}
Expand All @@ -947,6 +1058,8 @@ func (app *App) processProposal(txs []abci.Tx) bool {
maxGas := app.gastimator.GetMaxGas()
maxBytes := tmtypesint.DefaultBlockParams().MaxBytes * 4
size := int64(0)
delayedTxCount := 0

for _, tx := range txs {
size += int64(tx.GetLength())
if size > maxBytes {
Expand All @@ -960,6 +1073,14 @@ func (app *App) processProposal(txs []abci.Tx) bool {
if totalGasWanted > int(maxGas) {
return false
}
// allow only one delayed transaction wrapper in one block and its transactions must match what we expect.
if tx.Command() == txn.DelayedTransactionsWrapper {
if delayedTxCount > 0 {
app.log.Debug("more than one DelayedTransactionsWrapper")
return false
}
delayedTxCount += 1
}
}

if !app.nilPow && !app.pow.ProcessProposal(txs) {
Expand All @@ -983,6 +1104,11 @@ func (app *App) OnEndBlock(blockHeight uint64) (tmtypes.ValidatorUpdates, types1
logging.String("previous-datetime", vegatime.Format(app.previousTimestamp)),
)

if !app.seenDelayedTxTransactions {
app.txCache.SetRawTxs(nil)
}
app.seenDelayedTxTransactions = false

app.epoch.OnBlockEnd(app.blockCtx)
app.stateVar.OnBlockEnd(app.blockCtx)
app.banking.OnBlockEnd(app.blockCtx, app.currentTimestamp)
Expand Down Expand Up @@ -2715,6 +2841,16 @@ func (app *App) JoinTeam(ctx context.Context, tx abci.Tx) error {
return nil
}

func (app *App) handleDelayedTransactionWrapper(ctx context.Context, tx abci.Tx) error {
txs := &commandspb.DelayedTransactionsWrapper{}
if err := tx.Unmarshal(txs); err != nil {
return fmt.Errorf("could not deserialize DelayedTransactionsWrapper command: %w", err)
}
app.txCache.SetRawTxs(txs.Transactions)
app.seenDelayedTxTransactions = true
return nil
}

func (app *App) UpdatePartyProfile(ctx context.Context, tx abci.Tx) error {
params := &commandspb.UpdatePartyProfile{}
if err := tx.Unmarshal(params); err != nil {
Expand Down
2 changes: 2 additions & 0 deletions core/processor/gastimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ func (g *Gastimator) GetPriority(tx abci.Tx) uint64 {

func (g *Gastimator) CalcGasWantedForTx(tx abci.Tx) (uint64, error) {
switch tx.Command() {
case txn.DelayedTransactionsWrapper:
return 0, nil
case txn.SubmitOrderCommand:
s := &commandspb.OrderSubmission{}
if err := tx.Unmarshal(s); err != nil {
Expand Down
10 changes: 10 additions & 0 deletions core/processor/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ func DecodeTx(payload []byte, chainID string) (*Tx, error) {

func (t Tx) Command() txn.Command {
switch cmd := t.inputData.Command.(type) {
case *commandspb.InputData_DelayedTransactionsWrapper:
return txn.DelayedTransactionsWrapper
case *commandspb.InputData_OrderSubmission:
return txn.SubmitOrderCommand
case *commandspb.InputData_OrderCancellation:
Expand Down Expand Up @@ -261,6 +263,8 @@ func (t Tx) GetCmd() interface{} {
return cmd.AmendAmm
case *commandspb.InputData_CancelAmm:
return cmd.CancelAmm
case *commandspb.InputData_DelayedTransactionsWrapper:
return cmd.DelayedTransactionsWrapper
default:
return fmt.Errorf("command %T is not supported", cmd)
}
Expand Down Expand Up @@ -490,6 +494,12 @@ func (t Tx) Unmarshal(i interface{}) error {
return errors.New("failed to unmarshall to CancelAMM")
}
*underlyingCmd = *cmd.CancelAmm
case *commandspb.InputData_DelayedTransactionsWrapper:
underlyingCmd, ok := i.(*commandspb.DelayedTransactionsWrapper)
if !ok {
return errors.New("failed to unmarshall to DelayedTransactionsWrapper")
}
*underlyingCmd = *cmd.DelayedTransactionsWrapper
default:
return fmt.Errorf("command %T is not supported", cmd)
}
Expand Down
Loading

0 comments on commit e6b8ed1

Please sign in to comment.