Skip to content

Commit

Permalink
feat: wire in internal time triggers
Browse files Browse the repository at this point in the history
  • Loading branch information
wwestgarth committed Aug 3, 2023
1 parent b13e4eb commit b0295de
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
- [8765](https://github.com/vegaprotocol/vega/issues/8765) - Implement snapshots state for `PERPS`.
- [8918](https://github.com/vegaprotocol/vega/issues/8918) - Implement commands for team management.
- [8960](https://github.com/vegaprotocol/vega/issues/8960) - Improve wiring perpetual markets through governance.
- [8968](https://github.com/vegaprotocol/vega/issues/8968) - Improve wiring of internal time triggers for perpetual markets.
- [8756](https://github.com/vegaprotocol/vega/issues/8756) - Settlement and margin implementation for `PERPS`.
- [8887](https://github.com/vegaprotocol/vega/pull/8887) - Remove differences for snapshot loading when the `nullchain` is used instead of `tendermint`
- [8957](https://github.com/vegaprotocol/vega/issues/8957) - Oracle bindings for `PERPS`.
Expand Down
4 changes: 4 additions & 0 deletions core/datasource/common/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,4 +168,8 @@ func TestInternalTimeTriggersIsTriggered(t *testing.T) {
// Given time is after the next trigger
triggered = ttl.IsTriggered(nt.Add(time.Second * 15))
assert.Equal(t, true, triggered)

// check trigger time is progressed
triggered = ttl.IsTriggered(nt.Add(time.Second * 15))
assert.Equal(t, false, triggered)
}
29 changes: 29 additions & 0 deletions core/datasource/external/signedoracle/signedoracle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,32 @@ func TestSpecConfigurationString(t *testing.T) {
}

func TestToDataSourceDefinitionProto(t *testing.T) {}

func TestSpecConfigurationGetTimeTriggers(t *testing.T) {
ds := datasource.NewDefinitionWith(
signedoracle.SpecConfiguration{
Signers: []*common.Signer{
{},
},
Filters: []*common.SpecFilter{
{
Key: &common.SpecPropertyKey{
Name: "test-name",
Type: common.SpecPropertyKeyType(0),
},
Conditions: []*common.SpecCondition{
{
Operator: 8,
Value: "12",
},
},
},
},
})

triggers := ds.GetTimeTriggers()
assert.NotNil(t, triggers)
assert.Equal(t, 1, len(triggers))
assert.IsType(t, &common.InternalTimeTrigger{}, triggers[0])
assert.Nil(t, triggers[0])
}
3 changes: 2 additions & 1 deletion core/datasource/spec/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ func (b *Builtin) OnTick(ctx context.Context, _ time.Time) {
data := common.Data{
Signers: nil,
Data: map[string]string{
BuiltinTimestamp: fmt.Sprintf("%d", b.engine.timeService.GetTimeNow().Unix()),
BuiltinTimestamp: fmt.Sprintf("%d", b.engine.timeService.GetTimeNow().Unix()),
BuiltinTimeTrigger: fmt.Sprintf("%d", b.engine.timeService.GetTimeNow().Unix()),
},
}

Expand Down
73 changes: 73 additions & 0 deletions core/datasource/spec/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@ package spec_test
import (
"context"
"fmt"
"strconv"
"testing"
"time"

bmok "code.vegaprotocol.io/vega/core/broker/mocks"
"code.vegaprotocol.io/vega/core/datasource"
"code.vegaprotocol.io/vega/core/datasource/common"
"code.vegaprotocol.io/vega/core/datasource/definition"
"code.vegaprotocol.io/vega/core/datasource/internal/timetrigger"
dsspec "code.vegaprotocol.io/vega/core/datasource/spec"
"code.vegaprotocol.io/vega/core/datasource/spec/mocks"
"code.vegaprotocol.io/vega/core/events"
Expand All @@ -42,6 +45,7 @@ func TestOracleEngine(t *testing.T) {
t.Run("Unsubscribing unknown ID from oracle engine panics", testOracleEngineUnsubscribingUnknownIDPanics)
t.Run("Updating current time succeeds", testOracleEngineUpdatingCurrentTimeSucceeds)
t.Run("Subscribing to oracle spec activation succeeds", testOracleEngineSubscribingToSpecActivationSucceeds)
t.Run("Builtin time trigger succeeds", testBuiltinTimeTriggerSucceeds)
}

func testOracleEngineListensToSignersSucceeds(t *testing.T) {
Expand Down Expand Up @@ -339,6 +343,36 @@ func testOracleEngineUpdatingCurrentTimeSucceeds(t *testing.T) {
assert.Equal(t, time60, engine2.ts.GetTimeNow())
}

func testBuiltinTimeTriggerSucceeds(t *testing.T) {
// given
trigger := triggerSpec(t, time.Now(), 5)

// setup
ctx := context.Background()
currentTime := time.Now()
engine := newEngine(ctx, t, currentTime)

engine.broker.EXPECT().Send(gomock.Any()).Times(1)
engine.Subscribe(ctx, trigger.spec, trigger.subscriber.Cb)

// broadcast a time that will not trigger
data := common.Data{
Data: map[string]string{
dsspec.BuiltinTimeTrigger: strconv.FormatInt(currentTime.Add(-time.Minute).Unix(), 10),
},
}
engine.BroadcastData(ctx, data)

// now broadcast one that will
engine.broker.EXPECT().Send(gomock.Any()).Times(1)
data = common.Data{
Data: map[string]string{
dsspec.BuiltinTimeTrigger: strconv.FormatInt(currentTime.Add(time.Minute).Unix(), 10),
},
}
engine.BroadcastData(ctx, data)
}

type testEngine struct {
*dsspec.Engine
ts *testTimeService
Expand Down Expand Up @@ -410,6 +444,45 @@ type specBundle struct {
subscriber dummySubscriber
}

func triggerSpec(t *testing.T, initial time.Time, every int64) specBundle {
t.Helper()

cfg := &timetrigger.SpecConfiguration{
Conditions: []*common.SpecCondition{
{
Operator: common.SpecConditionOperator(2),
Value: "12",
},
{
Operator: common.SpecConditionOperator(2),
Value: "17",
},
},
Triggers: common.InternalTimeTriggers{
{
Initial: &initial,
Every: every,
},
},
}

testSpec := vegapb.NewDataSourceSpec(definition.NewWith(cfg).IntoProto())
typedOracleSpec := datasource.SpecFromProto(testSpec)

// Initialise trigger
balh := typedOracleSpec.Data.Content().(timetrigger.SpecConfiguration)
balh.SetNextTrigger(initial)

spec, err := dsspec.New(*typedOracleSpec)
if err != nil {
t.Fatalf("Couldn't create oracle spec: %v", err)
}
return specBundle{
spec: *spec,
subscriber: dummySubscriber{},
}
}

func spec(t *testing.T, currency string, op datapb.Condition_Operator, price string, keys ...string) specBundle {
t.Helper()
var signers []*datapb.Signer
Expand Down
30 changes: 30 additions & 0 deletions core/datasource/spec/spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
package spec

import (
"strconv"
"strings"
"time"

"code.vegaprotocol.io/vega/core/datasource"
"code.vegaprotocol.io/vega/core/datasource/common"
Expand All @@ -30,6 +32,9 @@ type Spec struct {
// come from.
signers map[string]struct{}

// any time triggers on the spec
triggers common.InternalTimeTriggers

// filters holds all the expected property keys with the conditions they
// should match.
filters common.Filters
Expand All @@ -49,6 +54,7 @@ type Spec struct {
func New(originalSpec datasource.Spec) (*Spec, error) {
filtersFromSpec := []*common.SpecFilter{}
signersFromSpec := []*common.Signer{}
var triggersFromSpec common.InternalTimeTriggers

isExtType := false
var err error
Expand Down Expand Up @@ -103,10 +109,15 @@ func New(originalSpec datasource.Spec) (*Spec, error) {
}
}

if builtInTrigger {
triggersFromSpec = originalSpec.Data.GetTimeTriggers()
}

os := &Spec{
id: SpecID(originalSpec.ID),
signers: signers,
filters: typedFilters,
triggers: triggersFromSpec,
OriginalSpec: &originalSpec,
}

Expand All @@ -127,6 +138,18 @@ func isInternalData(data common.Data) bool {
return true
}

func isInternalTimeTrigger(data common.Data) (bool, time.Time) {
for k, v := range data.Data {
if k == BuiltinTimeTrigger {
// convert string to time
if t, err := strconv.ParseInt(v, 10, 0); err == nil {
return true, time.Unix(t, 0)
}
}
}
return false, time.Time{}
}

// MatchSigners tries to match the public keys from the provided Data object with the ones
// present in the Spec.
func (s *Spec) MatchSigners(data common.Data) bool {
Expand All @@ -142,6 +165,13 @@ func (s *Spec) MatchData(data common.Data) (bool, error) {
return false, nil
}

// if it is internal time data and we have a time-trigger check that we're past it
if ok, tt := isInternalTimeTrigger(data); ok && s.triggers[0] != nil {
if !s.triggers.IsTriggered(tt) {
return false, nil
}
}

return s.filters.Match(data.Data)
}

Expand Down
13 changes: 11 additions & 2 deletions core/products/perpetual.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package products
import (
"context"
"strconv"
"time"

"code.vegaprotocol.io/vega/core/datasource"
dscommon "code.vegaprotocol.io/vega/core/datasource/common"
Expand Down Expand Up @@ -223,16 +224,20 @@ func (p *Perpetual) receiveDataPoint(ctx context.Context, data dscommon.Data) er
return err
}
// add price point with "eth-block-time" as time
pTime, err := strconv.ParseUint(data.MetaData["eth-block-time"], 10, 64)
pTime, err := strconv.ParseInt(data.MetaData["eth-block-time"], 10, 64)
if err != nil {
p.log.Error("Could not parse the eth block time",
logging.String("eth-block-time", data.MetaData["eth-block-time"]),
logging.Error(err),
)
return err
}

// eth block time is seconds, make it nanoseconds
pTime = time.Unix(pTime, 0).UnixNano()

// now add the price
p.addExternalDataPoint(ctx, assetPrice, int64(pTime))
p.addExternalDataPoint(ctx, assetPrice, pTime)
if p.log.GetLevel() == logging.DebugLevel {
p.log.Debug(
"perp settlement data updated",
Expand Down Expand Up @@ -268,6 +273,10 @@ func (p *Perpetual) receiveSettlementCue(ctx context.Context, data dscommon.Data
p.log.Error("schedule data not valid", data.Debug()...)
return err
}

// the internal cue gives us the time in seconds, so convert to nanoseconds
t = time.Unix(t, 0).UnixNano()

p.handleSettlementCue(ctx, t)
if p.log.GetLevel() == logging.DebugLevel {
p.log.Debug("perp schedule trigger processed")
Expand Down
21 changes: 13 additions & 8 deletions core/products/perpetual_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ func testRegisteredCallbacks(t *testing.T) {
// register the callback
perpetual.NotifyOnSettlementData(marketSettle)

perpetual.OnLeaveOpeningAuction(ctx, 1000)
perpetual.OnLeaveOpeningAuction(ctx, scaleToNano(t, 1000))

require.NoError(t, perpetual.SubmitDataPoint(ctx, num.UintOne(), 890))
require.NoError(t, perpetual.SubmitDataPoint(ctx, num.UintOne(), scaleToNano(t, 890)))
// callback to receive settlement data
settle(ctx, dscommon.Data{
Data: map[string]string{
Expand All @@ -278,7 +278,7 @@ func testRegisteredCallbacks(t *testing.T) {

for _, p := range points {
// send in an external and a matching internal
require.NoError(t, perpetual.SubmitDataPoint(ctx, p.price, p.t))
require.NoError(t, perpetual.SubmitDataPoint(ctx, p.price, scaleToNano(t, p.t)))
settle(ctx, dscommon.Data{
Data: map[string]string{
perp.DataSourceSpecBinding.SettlementDataProperty: p.price.String(),
Expand All @@ -291,7 +291,7 @@ func testRegisteredCallbacks(t *testing.T) {

// add some data-points in the future from when we will cue the end of the funding period
// they should not affect the funding payment of this period
require.NoError(t, perpetual.SubmitDataPoint(ctx, num.UintOne(), 2000))
require.NoError(t, perpetual.SubmitDataPoint(ctx, num.UintOne(), scaleToNano(t, 2000)))
settle(ctx, dscommon.Data{
Data: map[string]string{
perp.DataSourceSpecBinding.SettlementDataProperty: "1",
Expand Down Expand Up @@ -354,9 +354,9 @@ func testRegisteredCallbacksWithDifferentData(t *testing.T) {
// register the callback
perpetual.NotifyOnSettlementData(marketSettle)

perpetual.OnLeaveOpeningAuction(ctx, 1000)
perpetual.OnLeaveOpeningAuction(ctx, scaleToNano(t, 1000))

require.NoError(t, perpetual.SubmitDataPoint(ctx, num.UintOne(), 890))
require.NoError(t, perpetual.SubmitDataPoint(ctx, num.UintOne(), scaleToNano(t, 890)))
// callback to receive settlement data
settle(ctx, dscommon.Data{
Data: map[string]string{
Expand All @@ -371,7 +371,7 @@ func testRegisteredCallbacksWithDifferentData(t *testing.T) {
for i, p := range points {
if i%2 == 0 {
ip := num.UintZero().Sub(p.price, num.UintOne())
require.NoError(t, perpetual.SubmitDataPoint(ctx, ip, p.t))
require.NoError(t, perpetual.SubmitDataPoint(ctx, ip, scaleToNano(t, p.t)))
}
settle(ctx, dscommon.Data{
Data: map[string]string{
Expand All @@ -385,7 +385,7 @@ func testRegisteredCallbacksWithDifferentData(t *testing.T) {

// add some data-points in the future from when we will cue the end of the funding period
// they should not affect the funding payment of this period
require.NoError(t, perpetual.SubmitDataPoint(ctx, num.UintOne(), 2000))
require.NoError(t, perpetual.SubmitDataPoint(ctx, num.UintOne(), scaleToNano(t, 2000)))
settle(ctx, dscommon.Data{
Data: map[string]string{
perp.DataSourceSpecBinding.SettlementDataProperty: "1",
Expand Down Expand Up @@ -455,6 +455,11 @@ func getTestDataPoints(t *testing.T) []*testDataPoint {
}
}

func scaleToNano(t *testing.T, secs int64) int64 {
t.Helper()
return secs * 1000000000
}

type tstPerp struct {
oe *mocks.MockOracleEngine
broker *mocks.MockBroker
Expand Down

0 comments on commit b0295de

Please sign in to comment.