Skip to content

Commit

Permalink
chore: API support for redemptions
Browse files Browse the repository at this point in the history
  • Loading branch information
ze97286 committed Nov 5, 2024
1 parent d0ba85e commit 78e0172
Show file tree
Hide file tree
Showing 42 changed files with 5,139 additions and 2,089 deletions.
1 change: 1 addition & 0 deletions cmd/data-node/commands/start/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ func (l *NodeCommand) createGRPCServer(config api.Config) *api.GRPCServer {
l.volumeRebateStatsService,
l.volumeRebateProgramService,
l.vaultService,
l.vaultRedemptionService,
)
return grpcServer
}
7 changes: 7 additions & 0 deletions cmd/data-node/commands/start/sqlsubscribers.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type SQLSubscribers struct {
volumeRebateStatsStore *sqlstore.VolumeRebateStats
volumeRebateProgramsStore *sqlstore.VolumeRebatePrograms
vaultStore *sqlstore.Vault
vaultRedemptionStore *sqlstore.VaultRedemptions

// Services
candleService *candlesv2.Svc
Expand Down Expand Up @@ -148,6 +149,7 @@ type SQLSubscribers struct {
volumeRebateStatsService *service.VolumeRebateStats
volumeRebateProgramService *service.VolumeRebatePrograms
vaultService *service.Vault
vaultRedemptionService *service.VaultRedemptions

// Subscribers
accountSub *sqlsubscribers.Account
Expand Down Expand Up @@ -205,6 +207,7 @@ type SQLSubscribers struct {
volumeRebateStatsSub *sqlsubscribers.VolumeRebateStatsUpdated
volumeRebateProgramSub *sqlsubscribers.VolumeRebateProgram
vaultSub *sqlsubscribers.Vault
vaultRedemptionSub *sqlsubscribers.VaultRedemptions
}

func (s *SQLSubscribers) GetSQLSubscribers() []broker.SQLBrokerSubscriber {
Expand Down Expand Up @@ -266,6 +269,7 @@ func (s *SQLSubscribers) GetSQLSubscribers() []broker.SQLBrokerSubscriber {
s.volumeRebateProgramSub,
s.volumeRebateStatsSub,
s.vaultSub,
s.vaultRedemptionSub,
}
}

Expand Down Expand Up @@ -332,6 +336,7 @@ func (s *SQLSubscribers) CreateAllStores(ctx context.Context, Log *logging.Logge
s.volumeRebateStatsStore = sqlstore.NewVolumeRebateStats(transactionalConnectionSource)
s.volumeRebateProgramsStore = sqlstore.NewVolumeRebatePrograms(transactionalConnectionSource)
s.vaultStore = sqlstore.NewVault(transactionalConnectionSource)
s.vaultRedemptionStore = sqlstore.NewVaultRedemptions(transactionalConnectionSource)
}

func (s *SQLSubscribers) SetupServices(ctx context.Context, log *logging.Logger, cfg service.Config, candlesConfig candlesv2.Config) error {
Expand Down Expand Up @@ -392,6 +397,7 @@ func (s *SQLSubscribers) SetupServices(ctx context.Context, log *logging.Logger,
s.volumeRebateStatsService = service.NewVolumeRebateStats(s.volumeRebateStatsStore)
s.volumeRebateProgramService = service.NewVolumeRebatePrograms(s.volumeRebateProgramsStore)
s.vaultService = service.NewVault(s.vaultStore, log)
s.vaultRedemptionService = service.NewVaultRedemptions(s.vaultRedemptionStore, log)
s.marketDepthService = service.NewMarketDepth(
cfg.MarketDepth,
s.orderStore,
Expand Down Expand Up @@ -476,4 +482,5 @@ func (s *SQLSubscribers) SetupSQLSubscribers() {
s.volumeRebateProgramSub = sqlsubscribers.NewVolumeRebateProgram(s.volumeRebateProgramService)
s.ammPoolsSub = sqlsubscribers.NewAMMPools(s.ammPoolsService, s.marketDepthService)
s.vaultSub = sqlsubscribers.NewVault(s.vaultStore)
s.vaultRedemptionSub = sqlsubscribers.NewVaultRedemptions(s.vaultRedemptionStore)
}
5 changes: 3 additions & 2 deletions core/integration/steps/party_vault_interaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"code.vegaprotocol.io/vega/core/integration/stubs"
"code.vegaprotocol.io/vega/core/vault"
"code.vegaprotocol.io/vega/libs/crypto"
"code.vegaprotocol.io/vega/libs/num"

"github.com/cucumber/godog"
Expand All @@ -46,7 +47,7 @@ func PartiesWithdrawFromVault(vs *vault.VaultService, vault string, table *godog
for _, r := range parseVaultTxTable(table) {
row := vaultTxRow{row: r}
amt := num.MustUintFromString(row.Amount(), 10)
err := vs.WithdrawFromVault(context.Background(), row.Party(), vault, amt)
err := vs.WithdrawFromVault(context.Background(), crypto.RandomHash(), row.Party(), vault, amt)
if err == nil && row.row.HasColumn("error") && len(row.row.MustStr("error")) > 0 {
return fmt.Errorf("expected error (%s) in withdraw from vault (%s) by party (%s) but no error found", row.row.MustStr("error"), vault, row.Party())
} else if err != nil && (!row.row.HasColumn("error") || len(row.row.MustStr("error")) == 0) {
Expand All @@ -65,7 +66,7 @@ func RedemptionRequestsHasTheFollowingState(broker *stubs.BrokerStub, vs *vault.
found := false
for _, rre := range rrEvents {
rr := rre.StreamMessage().GetRedemptionRequest()
if rr.Party == row.Party() && rr.RequestedAmount == row.RequestedAmount() && rr.RemainingAmount == row.RemainingAmount() &&
if rr.PartyId == row.Party() && rr.RequestedAmount == row.RequestedAmount() && rr.RemainingAmount == row.RemainingAmount() &&
rr.Status.String() == row.Status() {
if r.HasColumn("eligibility date") && row.row.MustI64("eligibility date") != rr.Date {
continue
Expand Down
2 changes: 1 addition & 1 deletion core/processor/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -3539,7 +3539,7 @@ func (app *App) WithdrawFromVault(ctx context.Context, tx abci.Tx) error {
} else {
amt, _ = num.UintFromString(params.Amount, 10)
}
return app.vaultService.WithdrawFromVault(ctx, tx.Party(), params.VaultId, amt)
return app.vaultService.WithdrawFromVault(ctx, hex.EncodeToString(tx.Hash()), tx.Party(), params.VaultId, amt)
}

func (app *App) OnBlockchainPrimaryEthereumConfigUpdate(_ context.Context, conf any) error {
Expand Down
8 changes: 4 additions & 4 deletions core/processor/mocks/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type VaultService interface {
UpdateVault(ctx context.Context, vault *types.Vault) error
ChangeVaultOwnership(ctx context.Context, vaultID, owner, newOwner string) error
DepositToVault(ctx context.Context, party, vaultKey string, amount *num.Uint) error
WithdrawFromVault(ctx context.Context, party, vaultKey string, amount *num.Uint) error
WithdrawFromVault(ctx context.Context, requestID, party, vaultKey string, amount *num.Uint) error
}

type GovernanceEngine interface {
Expand Down
4 changes: 4 additions & 0 deletions core/vault/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (vs *VaultService) GetState(k string) ([]byte, []types.StateProvider, error
redemptionQueue := make([]*snapshotpb.RedeemRequest, 0, len(vault.redeemQueue))
for _, rr := range vault.redeemQueue {
redemptionQueue = append(redemptionQueue, &snapshotpb.RedeemRequest{
RequestId: rr.RequestID,
Party: rr.Party,
Date: rr.Date.UnixNano(),
Amount: rr.Amount.String(),
Expand All @@ -64,6 +65,7 @@ func (vs *VaultService) GetState(k string) ([]byte, []types.StateProvider, error
lateRedemptions := make([]*snapshotpb.RedeemRequest, 0, len(vault.lateRedemptions))
for _, rr := range vault.lateRedemptions {
lateRedemptions = append(lateRedemptions, &snapshotpb.RedeemRequest{
RequestId: rr.RequestID,
Party: rr.Party,
Date: rr.Date.UnixNano(),
Amount: rr.Amount.String(),
Expand Down Expand Up @@ -121,6 +123,7 @@ func (vs *VaultService) LoadState(_ context.Context, p *types.Payload) ([]types.
redeemQueue := make([]*RedeemRequest, 0, len(v.RedeemQueue))
for _, rr := range v.RedeemQueue {
redeemQueue = append(redeemQueue, &RedeemRequest{
RequestID: rr.RequestId,
Party: rr.Party,
Date: time.Unix(0, rr.Date),
Amount: num.MustUintFromString(rr.Amount, 10),
Expand All @@ -132,6 +135,7 @@ func (vs *VaultService) LoadState(_ context.Context, p *types.Payload) ([]types.
lateRedemptions := make([]*RedeemRequest, 0, len(v.LateRedemptions))
for _, rr := range v.LateRedemptions {
lateRedemptions = append(lateRedemptions, &RedeemRequest{
RequestID: rr.RequestId,
Party: rr.Party,
Date: time.Unix(0, rr.Date),
Amount: num.MustUintFromString(rr.Amount, 10),
Expand Down
7 changes: 5 additions & 2 deletions core/vault/vault.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
)

type RedeemRequest struct {
RequestID string
Party string
Date time.Time
Amount *num.Uint
Expand Down Expand Up @@ -209,7 +210,7 @@ func (vs *VaultState) ProcessLateRedemptions(ctx context.Context, now time.Time)
}

// WithdrawFromVault generate a new redeem request in the redeem queue with the time corresponding to now + the cutoff period (in days).
func (vs *VaultState) WithdrawFromVault(ctx context.Context, party string, amount *num.Uint, now time.Time) error {
func (vs *VaultState) WithdrawFromVault(ctx context.Context, requestID, party string, amount *num.Uint, now time.Time) error {
if vs.status != types.VaultStatusActive {
return fmt.Errorf("vault is not active")
}
Expand All @@ -226,6 +227,7 @@ func (vs *VaultState) WithdrawFromVault(ctx context.Context, party string, amoun
}
}
rr := &RedeemRequest{
RequestID: requestID,
Party: party,
Amount: amount.Clone(),
Remaining: amount.Clone(),
Expand Down Expand Up @@ -568,9 +570,10 @@ func (vs *VaultState) GetVaultStatus() types.VaultStatus {

func (vs *VaultState) NewRedmptionRequest(r *RedeemRequest) *eventspb.RedemptionRequest {
return &eventspb.RedemptionRequest{
RequestId: r.RequestID,
VaultId: vs.vault.ID,
Asset: vs.vault.Asset,
Party: r.Party,
PartyId: r.Party,
Date: r.Date.UnixNano(),
LastUpdate: r.LastUpdated.UnixNano(),
Status: r.Status,
Expand Down
4 changes: 2 additions & 2 deletions core/vault/vault_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,12 @@ func (vs *VaultService) DepositToVault(ctx context.Context, party, vaultKey stri
}

// WithdrawFromVault generates a pending redeem request and adds it to the queue.
func (vs *VaultService) WithdrawFromVault(ctx context.Context, party, vaultKey string, amount *num.Uint) error {
func (vs *VaultService) WithdrawFromVault(ctx context.Context, requestID, party, vaultKey string, amount *num.Uint) error {
if _, ok := vs.vaultIdToVault[vaultKey]; !ok {
return fmt.Errorf("vault does not exist")
}
vault := vs.vaultIdToVault[vaultKey]
return vault.WithdrawFromVault(ctx, party, amount, vs.timeService.GetTimeNow())
return vault.WithdrawFromVault(ctx, requestID, party, amount, vs.timeService.GetTimeNow())
}

// OnTick is called for every new block. We do the following for each vault:
Expand Down
9 changes: 5 additions & 4 deletions core/vault/vault_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"code.vegaprotocol.io/vega/core/types"
"code.vegaprotocol.io/vega/core/vault"
"code.vegaprotocol.io/vega/core/vault/mocks"
"code.vegaprotocol.io/vega/libs/crypto"
"code.vegaprotocol.io/vega/libs/num"
"code.vegaprotocol.io/vega/logging"
"code.vegaprotocol.io/vega/protos/vega"
Expand Down Expand Up @@ -621,10 +622,10 @@ func TestGetRedemptionRequestForADate(t *testing.T) {

// p1 is making a request 3 days before the next redemption date
col.EXPECT().GetVaultBalance(gomock.Any(), gomock.Any()).Return(num.NewUint(200), nil).Times(1)
vault.WithdrawFromVault(ctx, "p1", num.NewUint(25), now)
vault.WithdrawFromVault(ctx, crypto.RandomHash(), "p1", num.NewUint(25), now)
// p2 is making a request 2 days before the next redemption date
col.EXPECT().GetVaultBalance(gomock.Any(), gomock.Any()).Return(num.NewUint(200), nil).Times(1)
vault.WithdrawFromVault(ctx, "p2", num.NewUint(25), now.Add(24*time.Hour))
vault.WithdrawFromVault(ctx, crypto.RandomHash(), "p2", num.NewUint(25), now.Add(24*time.Hour))

// with a cutoff of 3 days we expect only p1's request to be included in the first redemption date
redemptionRequests := vault.GetRedemptionRequestForDate(now.Add(3 * 24 * time.Hour))
Expand Down Expand Up @@ -825,8 +826,8 @@ func TestProcessWithdrawals(t *testing.T) {
// party1 has 75% of the vault (balance is 200)
// time now is 3 days before the first withdraw date so we don't expect anything to happen when we process withdrawals
vault.col.EXPECT().GetVaultBalance(gomock.Any(), gomock.Any()).Return(num.NewUint(200), nil).Times(2)
vault.WithdrawFromVault(ctx, "p1", num.NewUint(40), vault.now)
vault.WithdrawFromVault(ctx, "p1", num.NewUint(30), vault.now.Add(24*time.Hour))
vault.WithdrawFromVault(ctx, crypto.RandomHash(), "p1", num.NewUint(40), vault.now)
vault.WithdrawFromVault(ctx, crypto.RandomHash(), "p1", num.NewUint(30), vault.now.Add(24*time.Hour))

// we're ahead of the first redemption date so nothing should happen
vault.ProcessWithdrawals(ctx, vault.now)
Expand Down
4 changes: 4 additions & 0 deletions datanode/api/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ type GRPCServer struct {
volumeRebateStatsService *service.VolumeRebateStats
volumeRebateProgramService *service.VolumeRebatePrograms
vaultService *service.Vault
vaultRedemptionService *service.VaultRedemptions

eventObserver *eventObserver

Expand Down Expand Up @@ -300,6 +301,7 @@ func NewGRPCServer(
volumeRebateStatsService *service.VolumeRebateStats,
volumeRebateProgramsService *service.VolumeRebatePrograms,
vaultService *service.Vault,
vaultRedemptionService *service.VaultRedemptions,
) *GRPCServer {
// setup logger
log = log.Named(namedLogger)
Expand Down Expand Up @@ -374,6 +376,7 @@ func NewGRPCServer(
volumeRebateStatsService: volumeRebateStatsService,
volumeRebateProgramService: volumeRebateProgramsService,
vaultService: vaultService,
vaultRedemptionService: vaultRedemptionService,
eventObserver: &eventObserver{
log: log,
eventService: eventService,
Expand Down Expand Up @@ -627,6 +630,7 @@ func (g *GRPCServer) Start(ctx context.Context, lis net.Listener) error {
volumeRebateProgramService: g.volumeRebateProgramService,
partyDiscountStats: partyDiscountStats,
vaultService: g.vaultService,
vaultRedemptionService: g.vaultRedemptionService,
}

protoapi.RegisterTradingDataServiceServer(g.srv, tradingDataSvcV2)
Expand Down
36 changes: 36 additions & 0 deletions datanode/api/trading_data_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type TradingDataServiceV2 struct {
AMMPoolService AMMService
partyDiscountStats PartyStatsSvc
vaultService *service.Vault
vaultRedemptionService *service.VaultRedemptions
}

func (t *TradingDataServiceV2) SetLogger(l *logging.Logger) {
Expand Down Expand Up @@ -217,6 +218,41 @@ func (t *TradingDataServiceV2) ListVaults(ctx context.Context, req *v2.ListVault
}, nil
}

func (t *TradingDataServiceV2) ListVaultRedemptionRequests(ctx context.Context, req *v2.ListVaultsRedemptionRequestsRequest) (*v2.ListVaultRedemptionRequestsResponse, error) {
pagination, err := entities.CursorPaginationFromProto(req.Pagination)
if err != nil {
return nil, formatE(ErrInvalidPagination, err)
}
redemptionRequests, pageInfo, err := t.vaultRedemptionService.ListRedemptionRequestsWithCursor(ctx, req.VaultIds, req.PartyIds, req.AssetIds, req.Statuses, pagination)
if err != nil {
return nil, err
}
edges := make([]*v2.RedemptionRequestEdge, 0, len(redemptionRequests))
for _, rr := range redemptionRequests {
event := &v1.RedemptionRequest{
RequestId: rr.RequestID.String(),
VaultId: rr.VaultID.String(),
PartyId: rr.PartyID.String(),
Asset: rr.Asset.String(),
Date: rr.EligibilityDate.UnixNano(),
LastUpdate: rr.LastUpdated.UnixNano(),
RequestedAmount: rr.RequestedAmount.String(),
RemainingAmount: rr.RemainingAmount.String(),
Status: vega.RedeemStatus(rr.Status),
}
edges = append(edges, &v2.RedemptionRequestEdge{
Node: event,
Cursor: rr.Cursor().Encode(),
})
}
return &v2.ListVaultRedemptionRequestsResponse{
VaultRedemptionRequests: &v2.RedemptionRequestConnection{
Edges: edges,
PageInfo: pageInfo.ToProto(),
},
}, nil
}

func (t *TradingDataServiceV2) GetVestingBalancesSummary(
ctx context.Context,
req *v2.GetVestingBalancesSummaryRequest,
Expand Down
3 changes: 2 additions & 1 deletion datanode/api/trading_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func getTestGRPCServer(t *testing.T, ctx context.Context) (tidy func(), conn *gr
volumeRebateStatsService := service.NewVolumeRebateStats(sqlstore.NewVolumeRebateStats(sqlConn))
volumeRebateProgramssService := service.NewVolumeRebatePrograms(sqlstore.NewVolumeRebatePrograms(sqlConn))
vaultService := service.NewVault(sqlstore.NewVault(sqlConn), logger)

vaultRedemptionService := service.NewVaultRedemptions(sqlstore.NewVaultRedemptions(sqlConn), logger)
g := api.NewGRPCServer(
logger,
conf.API,
Expand Down Expand Up @@ -231,6 +231,7 @@ func getTestGRPCServer(t *testing.T, ctx context.Context) (tidy func(), conn *gr
volumeRebateStatsService,
volumeRebateProgramssService,
vaultService,
vaultRedemptionService,
)
if g == nil {
err = fmt.Errorf("failed to create gRPC server")
Expand Down
2 changes: 1 addition & 1 deletion datanode/entities/entities.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Entities interface {
LiquidityProvider | FundingPeriod | FundingPeriodDataPoint | ReferralSet | ReferralSetRefereeStats |
FlattenReferralSetStats | Team | TeamMember | TeamMemberHistory | FundingPayment | FlattenVolumeDiscountStats |
PaidLiquidityFeesStats | CurrentAndPreviousLiquidityProvisions | TransferDetails | Game | TeamsStatistics | TeamMembersStatistics |
PartyMarginMode | PartyProfile | GamePartyScore | GameTeamScore | AMMPool | FlattenVolumeRebateStats | VaultState
PartyMarginMode | PartyProfile | GamePartyScore | GameTeamScore | AMMPool | FlattenVolumeRebateStats | VaultState | RedemptionRequest
}

type PagedEntity[T proto.Message] interface {
Expand Down
33 changes: 30 additions & 3 deletions datanode/entities/enums.go
Original file line number Diff line number Diff line change
Expand Up @@ -1143,9 +1143,9 @@ type VaultStatus vega.VaultStatus

const (
VaultStatusUnspecified = VaultStatus(vega.VaultStatus_VAULT_STATUS_UNSPECIFIED)
VaultStatusActive = AssetStatus(vega.VaultStatus_VAULT_STATUS_ACTIVE)
VaultStatusStopping = AssetStatus(vega.VaultStatus_VAULT_STATUS_STOPPING)
VaultStatusStopped = AssetStatus(vega.VaultStatus_VAULT_STATUS_STOPPED)
VaultStatusActive = VaultStatus(vega.VaultStatus_VAULT_STATUS_ACTIVE)
VaultStatusStopping = VaultStatus(vega.VaultStatus_VAULT_STATUS_STOPPING)
VaultStatusStopped = VaultStatus(vega.VaultStatus_VAULT_STATUS_STOPPED)
)

func (m VaultStatus) EncodeText(_ *pgtype.ConnInfo, buf []byte) ([]byte, error) {
Expand All @@ -1165,3 +1165,30 @@ func (m *VaultStatus) DecodeText(_ *pgtype.ConnInfo, src []byte) error {
*m = VaultStatus(val)
return nil
}

type RedeemStatus vega.RedeemStatus

const (
RedeemStatusUnspecified = RedeemStatus(vega.RedeemStatus_REDEEM_STATUS_UNSPECIFIED)
RedeemStatusPending = RedeemStatus(vega.RedeemStatus_REDEEM_STATUS_PENDING)
RedeemStatusLate = RedeemStatus(vega.RedeemStatus_REDEEM_STATUS_LATE)
RedeemStatusCompleted = RedeemStatus(vega.RedeemStatus_REDEEM_STATUS_COMPLETED)
)

func (m RedeemStatus) EncodeText(_ *pgtype.ConnInfo, buf []byte) ([]byte, error) {
mode, ok := vega.RedeemStatus_name[int32(m)]
if !ok {
return buf, fmt.Errorf("unknown redeem status: %s", mode)
}
return append(buf, []byte(mode)...), nil
}

func (m *RedeemStatus) DecodeText(_ *pgtype.ConnInfo, src []byte) error {
val, ok := vega.RedeemStatus_value[string(src)]
if !ok {
return fmt.Errorf("unknown redeem status: %s", src)
}

*m = RedeemStatus(val)
return nil
}
Loading

0 comments on commit 78e0172

Please sign in to comment.