Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: fetching jobs and collections from asset caches #1207

Merged
merged 16 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions cache/collectionsCache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package cache

import (
"razor/pkg/bindings"
"sync"
)

// CollectionsCache struct to hold collection cache and associated mutex
type CollectionsCache struct {
Collections map[uint16]bindings.StructsCollection
Mu sync.RWMutex
}

// NewCollectionsCache creates a new instance of CollectionsCache
func NewCollectionsCache() *CollectionsCache {
return &CollectionsCache{
Collections: make(map[uint16]bindings.StructsCollection),
Mu: sync.RWMutex{},
}
}

func (c *CollectionsCache) GetCollection(collectionId uint16) (bindings.StructsCollection, bool) {
c.Mu.RLock()
defer c.Mu.RUnlock()

collection, exists := c.Collections[collectionId]
return collection, exists
}

func (c *CollectionsCache) UpdateCollection(collectionId uint16, updatedCollection bindings.StructsCollection) {
c.Mu.Lock()
defer c.Mu.Unlock()

c.Collections[collectionId] = updatedCollection
}
35 changes: 35 additions & 0 deletions cache/jobsCache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package cache

import (
"razor/pkg/bindings"
"sync"
)

// JobsCache struct to hold job cache and associated mutex
type JobsCache struct {
Jobs map[uint16]bindings.StructsJob
Mu sync.RWMutex
}

// NewJobsCache creates a new instance of JobsCache
func NewJobsCache() *JobsCache {
return &JobsCache{
Jobs: make(map[uint16]bindings.StructsJob),
Mu: sync.RWMutex{},
}
}

func (j *JobsCache) GetJob(jobId uint16) (bindings.StructsJob, bool) {
j.Mu.RLock()
defer j.Mu.RUnlock()

job, exists := j.Jobs[jobId]
return job, exists
}

func (j *JobsCache) UpdateJob(jobId uint16, updatedJob bindings.StructsJob) {
j.Mu.Lock()
defer j.Mu.Unlock()

j.Jobs[jobId] = updatedJob
}
14 changes: 7 additions & 7 deletions cmd/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ package cmd
import (
"encoding/hex"
"errors"
Types "github.com/ethereum/go-ethereum/core/types"
"math/big"
"razor/cache"
"razor/client"
"razor/core"
"razor/core/types"
"razor/pkg/bindings"
"razor/utils"
"sync"
"time"

Types "github.com/ethereum/go-ethereum/core/types"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
solsha3 "github.com/miguelmota/go-solidity-sha3"
Expand Down Expand Up @@ -58,7 +58,7 @@ func (*UtilsStruct) GetSalt(client *ethclient.Client, epoch uint32) ([32]byte, e
HandleCommitState fetches the collections assigned to the staker and creates the leaves required for the merkle tree generation.
Values for only the collections assigned to the staker is fetched for others, 0 is added to the leaves of tree.
*/
func (*UtilsStruct) HandleCommitState(client *ethclient.Client, epoch uint32, seed []byte, httpClient *client.HttpClient, rogueData types.Rogue) (types.CommitData, error) {
func (*UtilsStruct) HandleCommitState(client *ethclient.Client, epoch uint32, seed []byte, commitParams types.CommitParams, rogueData types.Rogue) (types.CommitData, error) {
numActiveCollections, err := razorUtils.GetNumActiveCollections(client)
if err != nil {
return types.CommitData{}, err
Expand All @@ -80,7 +80,7 @@ func (*UtilsStruct) HandleCommitState(client *ethclient.Client, epoch uint32, se
var wg sync.WaitGroup

log.Debug("Creating a local cache which will store API result and expire at the end of commit state")
localCache := cache.NewLocalCache(time.Second * time.Duration(core.StateLength))
commitParams.LocalCache = cache.NewLocalCache(time.Second * time.Duration(core.StateLength))

log.Debug("Iterating over all the collections...")
for i := 0; i < int(numActiveCollections); i++ {
Expand All @@ -97,7 +97,7 @@ func (*UtilsStruct) HandleCommitState(client *ethclient.Client, epoch uint32, se
errChan <- err
return
}
collectionData, err := razorUtils.GetAggregatedDataOfCollection(client, collectionId, epoch, localCache, httpClient)
collectionData, err := razorUtils.GetAggregatedDataOfCollection(client, collectionId, epoch, commitParams)
if err != nil {
log.Error("Error in getting aggregated data of collection: ", err)
errChan <- err
Expand Down Expand Up @@ -129,7 +129,7 @@ func (*UtilsStruct) HandleCommitState(client *ethclient.Client, epoch uint32, se
if err != nil {
// Returning the first error from the error channel
log.Error("Error in getting collection data: ", err)
localCache.StopCleanup()
commitParams.LocalCache.StopCleanup()
return types.CommitData{}, err
}
}
Expand All @@ -139,7 +139,7 @@ func (*UtilsStruct) HandleCommitState(client *ethclient.Client, epoch uint32, se
log.Debug("HandleCommitState: SeqAllottedCollections: ", seqAllottedCollections)
log.Debug("HandleCommitState: Leaves: ", leavesOfTree)

localCache.StopCleanup()
commitParams.LocalCache.StopCleanup()
ashish10677 marked this conversation as resolved.
Show resolved Hide resolved

return types.CommitData{
AssignedCollections: assignedCollections,
Expand Down
19 changes: 10 additions & 9 deletions cmd/commit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/ethereum/go-ethereum/ethclient"
"github.com/stretchr/testify/mock"
"math/big"
clientPkg "razor/client"
"razor/core"
"razor/core/types"
"razor/pkg/bindings"
Expand Down Expand Up @@ -121,9 +120,10 @@ func TestCommit(t *testing.T) {

func TestHandleCommitState(t *testing.T) {
var (
client *ethclient.Client
epoch uint32
seed []byte
client *ethclient.Client
epoch uint32
seed []byte
commitParams types.CommitParams
)

rogueValue := big.NewInt(1111)
Expand Down Expand Up @@ -234,7 +234,7 @@ func TestHandleCommitState(t *testing.T) {
utilsMock.On("GetRogueRandomValue", mock.Anything).Return(rogueValue)

utils := &UtilsStruct{}
got, err := utils.HandleCommitState(client, epoch, seed, &clientPkg.HttpClient{}, tt.args.rogueData)
got, err := utils.HandleCommitState(client, epoch, seed, commitParams, tt.args.rogueData)
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("Data from HandleCommitState function, got = %v, want = %v", got, tt.want)
}
Expand Down Expand Up @@ -372,9 +372,10 @@ func TestGetSalt(t *testing.T) {

func BenchmarkHandleCommitState(b *testing.B) {
var (
client *ethclient.Client
epoch uint32
seed []byte
client *ethclient.Client
epoch uint32
seed []byte
commitParams types.CommitParams
)

rogueValue := big.NewInt(1111)
Expand All @@ -399,7 +400,7 @@ func BenchmarkHandleCommitState(b *testing.B) {
utilsMock.On("GetRogueRandomValue", mock.Anything).Return(rogueValue)

ut := &UtilsStruct{}
_, err := ut.HandleCommitState(client, epoch, seed, &clientPkg.HttpClient{}, types.Rogue{IsRogue: false})
_, err := ut.HandleCommitState(client, epoch, seed, commitParams, types.Rogue{IsRogue: false})
if err != nil {
log.Fatal(err)
}
Expand Down
175 changes: 175 additions & 0 deletions cmd/eventListeners.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package cmd

import (
"context"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
Types "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
"math/big"
"razor/cache"
"razor/core"
"razor/pkg/bindings"
"razor/utils"
"strings"
"time"
)

func (*UtilsStruct) InitAssetCache(client *ethclient.Client) (*cache.JobsCache, *cache.CollectionsCache, error) {
log.Info("INITIALIZING JOBS AND COLLECTIONS CACHE...")

// Create instances of cache
jobsCache := cache.NewJobsCache()
collectionsCache := cache.NewCollectionsCache()

// Initialize caches
if err := utils.InitJobsCache(client, jobsCache); err != nil {
log.Error("Error in initializing jobs cache: ", err)
return nil, nil, err
}
if err := utils.InitCollectionsCache(client, collectionsCache); err != nil {
log.Error("Error in initializing collections cache: ", err)
return nil, nil, err
}

go scheduleResetCache(client, jobsCache, collectionsCache)

latestHeader, err := clientUtils.GetLatestBlockWithRetry(client)
if err != nil {
log.Error("Error in fetching block: ", err)
return nil, nil, err
}
log.Debugf("initAssetCache: Latest header value: %d", latestHeader.Number)

fromBlock, err := razorUtils.EstimateBlockNumberAtEpochBeginning(client, latestHeader.Number)
if err != nil {
log.Error("Error in estimating block number at epoch beginning: ", err)
return nil, nil, err
}

// Start listeners for job and collection updates, passing the caches as arguments
go startListener(client, fromBlock, time.Second*time.Duration(core.AssetUpdateListenerInterval), jobsCache, collectionsCache)

return jobsCache, collectionsCache, nil
}

// startListener starts a generic listener for blockchain events that handles multiple event types.
func startListener(client *ethclient.Client, fromBlock *big.Int, interval time.Duration, jobsCache *cache.JobsCache, collectionsCache *cache.CollectionsCache) {
// Will start listening for asset update events from confirm state
_, err := cmdUtils.WaitForAppropriateState(client, "start event listener for asset update events", 4)
if err != nil {
log.Error("Error in waiting for appropriate state for starting event listener: ", err)
return
}

// Sleeping till half of the confirm state to start listening for events in interval of duration core.AssetUpdateListenerInterval
log.Debug("Will start listening for asset update events after half of the confirm state passes, sleeping till then...")
time.Sleep(time.Second * time.Duration(core.StateLength/2))

collectionManagerContractABI, err := abi.JSON(strings.NewReader(bindings.CollectionManagerMetaData.ABI))
if err != nil {
log.Errorf("Error in parsing contract ABI: %v", err)
return
}

ticker := time.NewTicker(interval)
defer ticker.Stop()

eventNames := []string{"JobUpdated", "CollectionUpdated", "CollectionActivityStatus", "JobCreated", "CollectionCreated"}

log.Debugf("Starting to listen for asset update events from now in interval of every %v ...", interval)
for range ticker.C {
log.Debug("Checking for asset update events...")
toBlock, err := clientUtils.GetLatestBlockWithRetry(client)
if err != nil {
log.Error("Error in getting latest block to start event listener: ", err)
continue
}

processEvents(client, collectionManagerContractABI, fromBlock, toBlock.Number, eventNames, jobsCache, collectionsCache)

// Update fromBlock for the next interval
fromBlock = new(big.Int).Add(toBlock.Number, big.NewInt(1))
}
}

// processEvents fetches and processes logs for multiple event types.
func processEvents(client *ethclient.Client, contractABI abi.ABI, fromBlock, toBlock *big.Int, eventNames []string, jobsCache *cache.JobsCache, collectionsCache *cache.CollectionsCache) {
logs, err := getEventLogs(client, fromBlock, toBlock)
if err != nil {
log.Errorf("Failed to fetch logs: %v", err)
return
}

for _, eventName := range eventNames {
eventID := contractABI.Events[eventName].ID.Hex()
for _, vLog := range logs {
if len(vLog.Topics) > 0 && vLog.Topics[0].Hex() == eventID {
switch eventName {
case "JobUpdated", "JobCreated":
jobId := utils.ConvertHashToUint16(vLog.Topics[1])
ashish10677 marked this conversation as resolved.
Show resolved Hide resolved
updatedJob, err := utils.UtilsInterface.GetActiveJob(client, jobId)
ashish10677 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Errorf("Error in getting job with job Id %v: %v", jobId, err)
continue
}
log.Debugf("RECEIVED ASSET UPDATE: Updating the job with Id %v with details %+v...", jobId, updatedJob)
jobsCache.UpdateJob(jobId, updatedJob)
case "CollectionUpdated", "CollectionCreated", "CollectionActivityStatus":
collectionId := utils.ConvertHashToUint16(vLog.Topics[1])
newCollection, err := utils.UtilsInterface.GetCollection(client, collectionId)
if err != nil {
log.Errorf("Error in getting collection with collection Id %v: %v", collectionId, err)
continue
}
log.Debugf("RECEIVED ASSET UPDATE: Updating the collection with ID %v with details %+v", collectionId, newCollection)
collectionsCache.UpdateCollection(collectionId, newCollection)
}
}
}
}
}

// getEventLogs is a utility function to fetch the event logs
func getEventLogs(client *ethclient.Client, fromBlock *big.Int, toBlock *big.Int) ([]Types.Log, error) {
// Set up the query for filtering logs
query := ethereum.FilterQuery{
FromBlock: fromBlock,
ToBlock: toBlock,
Addresses: []common.Address{
common.HexToAddress(core.CollectionManagerAddress),
},
}

// Retrieve the logs
logs, err := client.FilterLogs(context.Background(), query)
if err != nil {
log.Errorf("Error in filter logs: %v", err)
return []Types.Log{}, nil
ashish10677 marked this conversation as resolved.
Show resolved Hide resolved
}

return logs, nil
}

func scheduleResetCache(client *ethclient.Client, jobsCache *cache.JobsCache, collectionsCache *cache.CollectionsCache) {
ashish10677 marked this conversation as resolved.
Show resolved Hide resolved
// Will not allow to start scheduling reset cache in confirm and commit state
// As in confirm state, updating jobs/collections cache takes place
// As in commit state, fetching of jobs/collection from cache takes place
_, err := cmdUtils.WaitForAppropriateState(client, "schedule resetting cache", 1, 2, 3)
if err != nil {
log.Error("Error in waiting for appropriate state to schedule reset cache: ", err)
return
}

log.Debugf("Scheduling reset asset cache now in interval of every %v ...", core.AssetCacheExpiry)
assetCacheTicker := time.NewTicker(time.Second * time.Duration(core.AssetCacheExpiry))
defer assetCacheTicker.Stop()

for range assetCacheTicker.C {
log.Info("ASSET CACHE EXPIRED! INITIALIZING JOBS AND COLLECTIONS CACHE AGAIN...")
if err := razorUtils.ResetAssetCache(client, jobsCache, collectionsCache); err != nil {
log.Errorf("Error resetting asset cache: %v", err)
}
}
}
Loading
Loading