Skip to content

Commit

Permalink
fix: save history of seen ethereum verifier events
Browse files Browse the repository at this point in the history
  • Loading branch information
wwestgarth authored and jeremyletang committed Mar 4, 2024
1 parent d2fc8bd commit b8d740f
Show file tree
Hide file tree
Showing 11 changed files with 3,296 additions and 2,659 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

- [10722](https://github.com/vegaprotocol/vega/issues/10722) - Team API aggregation does not aggregate from the latest epoch.
- [10743](https://github.com/vegaprotocol/vega/issues/10722) - When SLA hysteresis epochs are updated resize the ring using new value.
- [10811](https://github.com/vegaprotocol/vega/issues/10811) - Ethereum call events are now properly `deduplicated` when restoring from a snapshot.
- [10725](https://github.com/vegaprotocol/vega/issues/10725) - Batch proposal votes to contain `ELS` per market.
- [10744](https://github.com/vegaprotocol/vega/issues/10744) - Prevent governance suspension of a market already governance suspended.

Expand Down
3 changes: 3 additions & 0 deletions core/datasource/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ var (
// ErrDuplicatedEthereumCallEvent is returned when there is a duplicated Ethereum event.
ErrDuplicatedEthereumCallEvent = errors.New("duplicated call event")

// ErrEthereumCallEventTooOld is returned when an old Ethereum event is received by the network.
ErrEthereumCallEventTooOld = errors.New("call event is too old")

// ErrDuplicatedEthereumCallEvent is returned when no deterministic time is given to set the next time trigger
// repetition.
ErrMissingTimeForSettingTriggerRepetition = errors.New("missing time for setting trigger repetition")
Expand Down
98 changes: 98 additions & 0 deletions core/datasource/external/ethverifier/acked_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (C) 2023 Gobalsky Labs Limited
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package ethverifier

import (
"github.com/emirpasic/gods/sets/treeset"
"github.com/emirpasic/gods/utils"
)

type ackedEvtBucket struct {
ts int64
hashes []string
}

func ackedEvtBucketComparator(a, b interface{}) int {
bucket1 := a.(*ackedEvtBucket)
bucket2 := b.(*ackedEvtBucket)
return utils.Int64Comparator(bucket1.ts, bucket2.ts)
}

type ackedEvents struct {
timeService TimeService
events *treeset.Set // we only care about the key
}

func (a *ackedEvents) AddAt(ts int64, hashes ...string) {
_, value := a.events.Find(func(i int, value interface{}) bool {
bucket := value.(*ackedEvtBucket)
return bucket.ts == ts
})

if value != nil {
bucket := value.(*ackedEvtBucket)
for _, newHash := range hashes {
found := false
for _, v := range bucket.hashes {
// hash already exists
if v == newHash {
found = true
break
}
}

if !found {
bucket.hashes = append(bucket.hashes, newHash)
}
}

return
}

a.events.Add(&ackedEvtBucket{ts: ts, hashes: append([]string{}, hashes...)})
}

func (a *ackedEvents) Add(hash string) {
a.AddAt(a.timeService.GetTimeNow().Unix(), hash)
}

func (a *ackedEvents) Contains(hash string) bool {
_, value := a.events.Find(func(index int, value interface{}) bool {
bucket := value.(*ackedEvtBucket)
for _, v := range bucket.hashes {
if hash == v {
return true
}
}

return false
})

return value != nil
}

func (a *ackedEvents) RemoveBefore(ts int64) {
set := a.events.Select(func(index int, value interface{}) bool {
bucket := value.(*ackedEvtBucket)
return bucket.ts <= ts
})

a.events.Remove(set.Values()...)
}

func (a *ackedEvents) Size() int {
return a.events.Size()
}
30 changes: 28 additions & 2 deletions core/datasource/external/ethverifier/l2_verifier_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,27 @@ func (s *L2Verifiers) GetState(k string) ([]byte, []types.StateProvider, error)

for k, v := range s.verifiers {
s.log.Debug("serialising state for evm verifier", logging.String("source-chain-id", k))

slice := make([]*snapshotpb.EthVerifierBucket, 0, v.ackedEvts.Size())
iter := v.ackedEvts.events.Iterator()
for iter.Next() {
v := (iter.Value().(*ackedEvtBucket))
slice = append(slice, &snapshotpb.EthVerifierBucket{
Ts: v.ts,
Hashes: v.hashes,
})
}

ethOracles.L2EthOracles.ChainIdEthOracles = append(
ethOracles.L2EthOracles.ChainIdEthOracles,
&snapshotpb.ChainIdEthOracles{
SourceChainId: k,
LastBlock: v.lastEthBlockPayloadData().IntoProto().EthOracleVerifierLastBlock,
LastBlock: v.ethBlockPayloadData(v.lastBlock).IntoProto().EthOracleVerifierLastBlock,
CallResults: v.pendingContractCallEventsPayloadData().IntoProto().EthContractCallResults,
Misc: &snapshotpb.EthOracleVerifierMisc{
Buckets: slice,
PatchBlock: v.ethBlockPayloadData(v.patchBlock).IntoProto().EthOracleVerifierLastBlock,
},
},
)
}
Expand Down Expand Up @@ -111,7 +126,18 @@ func (s *L2Verifiers) restoreState(ctx context.Context, l2EthOracles *snapshotpb
Time: v.LastBlock.BlockTime,
}
}
verifier.restoreLastEthBlock(lastBlock)

var patchBlock *types.EthBlock
if v.Misc.PatchBlock != nil {
lastBlock = &types.EthBlock{
Height: v.Misc.PatchBlock.BlockHeight,
Time: v.Misc.PatchBlock.BlockTime,
}
}

verifier.restoreLastEthBlock(ctx, lastBlock)
verifier.restorePatchBlock(ctx, patchBlock)
verifier.restoreSeen(v.Misc.Buckets)

pending := []*ethcall.ContractCallEvent{}

Expand Down
48 changes: 40 additions & 8 deletions core/datasource/external/ethverifier/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,12 @@ import (
"code.vegaprotocol.io/vega/logging"
vegapb "code.vegaprotocol.io/vega/protos/vega"
datapb "code.vegaprotocol.io/vega/protos/vega/data/v1"

"github.com/emirpasic/gods/sets/treeset"
)

const keepHashesDuration = 24 * 2 * time.Hour

//go:generate go run github.com/golang/mock/mockgen -destination mocks/witness_mock.go -package mocks code.vegaprotocol.io/vega/core/datasource/external/ethverifier Witness
type Witness interface {
StartCheckWithDelay(validators.Resource, func(interface{}, bool), time.Time, int64) error
Expand Down Expand Up @@ -102,10 +106,13 @@ type Verifier struct {
pendingCallEvents []*pendingCallEvent
finalizedCallResults []*ethcall.ContractCallEvent

// the eth block height of the last seen ethereum TX
lastBlock *types.EthBlock
// the eth block height when we did the patch upgrade to fix the missing seen map
patchBlock *types.EthBlock

mu sync.Mutex
hashes map[string]struct{}
mu sync.Mutex
ackedEvts *ackedEvents
}

type pendingCallEvent struct {
Expand Down Expand Up @@ -139,29 +146,51 @@ func New(
oracleEngine: oracleBroadcaster,
ethEngine: ethCallEngine,
ethConfirmations: ethConfirmations,
hashes: map[string]struct{}{},
ackedEvts: &ackedEvents{
timeService: ts,
events: treeset.NewWith(ackedEvtBucketComparator),
},
}
return s
}

func (s *Verifier) ensureNotDuplicate(hash string) bool {
func (s *Verifier) ensureNotTooOld(callEvent ethcall.ContractCallEvent) bool {
s.mu.Lock()
defer s.mu.Unlock()

if _, ok := s.hashes[hash]; ok {
if s.patchBlock != nil && callEvent.BlockHeight < s.patchBlock.Height {
return false
}

s.hashes[hash] = struct{}{}
tt := time.Unix(int64(callEvent.BlockTime), 0)
removeBefore := s.timeService.GetTimeNow().Add(-keepHashesDuration)

return !tt.Before(removeBefore)
}

func (s *Verifier) ensureNotDuplicate(callEvent ethcall.ContractCallEvent) bool {
s.mu.Lock()
defer s.mu.Unlock()

if s.ackedEvts.Contains(callEvent.Hash()) {
return false
}

s.ackedEvts.AddAt(int64(callEvent.BlockTime), callEvent.Hash())
return true
}

// TODO: non finalized events could cause a memory leak, this needs to be addressed in a way that will prevent processing
// duplicates but not result in a memory leak, agreed to postpone for now (other verifiers have the same issue)

func (s *Verifier) ProcessEthereumContractCallResult(callEvent ethcall.ContractCallEvent) error {
if ok := s.ensureNotDuplicate(callEvent.Hash()); !ok {
if !s.ensureNotTooOld(callEvent) {
s.log.Error("historic ethereum event received",
logging.String("event", fmt.Sprintf("%+v", callEvent)))
return errors.ErrEthereumCallEventTooOld
}

if ok := s.ensureNotDuplicate(callEvent); !ok {
s.log.Error("ethereum call event already exists",
logging.String("event", fmt.Sprintf("%+v", callEvent)))
return errors.ErrDuplicatedEthereumCallEvent
Expand Down Expand Up @@ -328,6 +357,9 @@ func (s *Verifier) OnTick(ctx context.Context, t time.Time) {
s.broker.Send(events.NewOracleDataEvent(ctx, vegapb.OracleData{ExternalData: dataProto.ExternalData}))
}
}

s.finalizedCallResults = nil

// keep hashes for 2 days
removeBefore := t.Add(-keepHashesDuration)
s.ackedEvts.RemoveBefore(removeBefore.Unix())
}
Loading

0 comments on commit b8d740f

Please sign in to comment.