Skip to content

Commit 20424c4

Browse files
authored
refactor: fetching jobs and collections from asset caches (#1207)
* feat: added jobs/collections cache structs * feat: added support for using jobs/collections caches * refactor: added time constants required for job/collection cache updates * fix: fixed resetting of cache every expiry interval * refactor: fixed error log * refactor: fix utils tests * refactor: added httpClient instance in commitParams struct * refactor: removed resetting of cache as its not required * feat: checked for job/collection events at the start of commit state in main go routine * refactor: used FilterLogswithRetry and fixed other tests * refactor: fixed benchmark * refactor: moved all the event names to constants * refactor: requested changes * refactor: removed custom http client struct and directly used inbuilt http.Client
1 parent 2ded12a commit 20424c4

21 files changed

+1310
-905
lines changed

cache/collectionsCache.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package cache
2+
3+
import (
4+
"razor/pkg/bindings"
5+
"sync"
6+
)
7+
8+
// CollectionsCache struct to hold collection cache and associated mutex
9+
type CollectionsCache struct {
10+
Collections map[uint16]bindings.StructsCollection
11+
Mu sync.RWMutex
12+
}
13+
14+
// NewCollectionsCache creates a new instance of CollectionsCache
15+
func NewCollectionsCache() *CollectionsCache {
16+
return &CollectionsCache{
17+
Collections: make(map[uint16]bindings.StructsCollection),
18+
Mu: sync.RWMutex{},
19+
}
20+
}
21+
22+
func (c *CollectionsCache) GetCollection(collectionId uint16) (bindings.StructsCollection, bool) {
23+
c.Mu.RLock()
24+
defer c.Mu.RUnlock()
25+
26+
collection, exists := c.Collections[collectionId]
27+
return collection, exists
28+
}
29+
30+
func (c *CollectionsCache) UpdateCollection(collectionId uint16, updatedCollection bindings.StructsCollection) {
31+
c.Mu.Lock()
32+
defer c.Mu.Unlock()
33+
34+
c.Collections[collectionId] = updatedCollection
35+
}

cache/jobsCache.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package cache
2+
3+
import (
4+
"razor/pkg/bindings"
5+
"sync"
6+
)
7+
8+
// JobsCache struct to hold job cache and associated mutex
9+
type JobsCache struct {
10+
Jobs map[uint16]bindings.StructsJob
11+
Mu sync.RWMutex
12+
}
13+
14+
// NewJobsCache creates a new instance of JobsCache
15+
func NewJobsCache() *JobsCache {
16+
return &JobsCache{
17+
Jobs: make(map[uint16]bindings.StructsJob),
18+
Mu: sync.RWMutex{},
19+
}
20+
}
21+
22+
func (j *JobsCache) GetJob(jobId uint16) (bindings.StructsJob, bool) {
23+
j.Mu.RLock()
24+
defer j.Mu.RUnlock()
25+
26+
job, exists := j.Jobs[jobId]
27+
return job, exists
28+
}
29+
30+
func (j *JobsCache) UpdateJob(jobId uint16, updatedJob bindings.StructsJob) {
31+
j.Mu.Lock()
32+
defer j.Mu.Unlock()
33+
34+
j.Jobs[jobId] = updatedJob
35+
}

client/httpClient.go

Lines changed: 0 additions & 26 deletions
This file was deleted.

cmd/commit.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,17 @@ package cmd
44
import (
55
"encoding/hex"
66
"errors"
7-
Types "github.com/ethereum/go-ethereum/core/types"
87
"math/big"
98
"razor/cache"
10-
"razor/client"
119
"razor/core"
1210
"razor/core/types"
1311
"razor/pkg/bindings"
1412
"razor/utils"
1513
"sync"
1614
"time"
1715

16+
Types "github.com/ethereum/go-ethereum/core/types"
17+
1818
"github.com/ethereum/go-ethereum/common"
1919
"github.com/ethereum/go-ethereum/ethclient"
2020
solsha3 "github.com/miguelmota/go-solidity-sha3"
@@ -58,7 +58,7 @@ func (*UtilsStruct) GetSalt(client *ethclient.Client, epoch uint32) ([32]byte, e
5858
HandleCommitState fetches the collections assigned to the staker and creates the leaves required for the merkle tree generation.
5959
Values for only the collections assigned to the staker is fetched for others, 0 is added to the leaves of tree.
6060
*/
61-
func (*UtilsStruct) HandleCommitState(client *ethclient.Client, epoch uint32, seed []byte, httpClient *client.HttpClient, rogueData types.Rogue) (types.CommitData, error) {
61+
func (*UtilsStruct) HandleCommitState(client *ethclient.Client, epoch uint32, seed []byte, commitParams *types.CommitParams, rogueData types.Rogue) (types.CommitData, error) {
6262
numActiveCollections, err := razorUtils.GetNumActiveCollections(client)
6363
if err != nil {
6464
return types.CommitData{}, err
@@ -80,7 +80,7 @@ func (*UtilsStruct) HandleCommitState(client *ethclient.Client, epoch uint32, se
8080
var wg sync.WaitGroup
8181

8282
log.Debug("Creating a local cache which will store API result and expire at the end of commit state")
83-
localCache := cache.NewLocalCache(time.Second * time.Duration(core.StateLength))
83+
commitParams.LocalCache = cache.NewLocalCache(time.Second * time.Duration(core.StateLength))
8484

8585
log.Debug("Iterating over all the collections...")
8686
for i := 0; i < int(numActiveCollections); i++ {
@@ -97,7 +97,7 @@ func (*UtilsStruct) HandleCommitState(client *ethclient.Client, epoch uint32, se
9797
errChan <- err
9898
return
9999
}
100-
collectionData, err := razorUtils.GetAggregatedDataOfCollection(client, collectionId, epoch, localCache, httpClient)
100+
collectionData, err := razorUtils.GetAggregatedDataOfCollection(client, collectionId, epoch, commitParams)
101101
if err != nil {
102102
log.Error("Error in getting aggregated data of collection: ", err)
103103
errChan <- err
@@ -129,7 +129,7 @@ func (*UtilsStruct) HandleCommitState(client *ethclient.Client, epoch uint32, se
129129
if err != nil {
130130
// Returning the first error from the error channel
131131
log.Error("Error in getting collection data: ", err)
132-
localCache.StopCleanup()
132+
commitParams.LocalCache.StopCleanup()
133133
return types.CommitData{}, err
134134
}
135135
}
@@ -139,7 +139,7 @@ func (*UtilsStruct) HandleCommitState(client *ethclient.Client, epoch uint32, se
139139
log.Debug("HandleCommitState: SeqAllottedCollections: ", seqAllottedCollections)
140140
log.Debug("HandleCommitState: Leaves: ", leavesOfTree)
141141

142-
localCache.StopCleanup()
142+
commitParams.LocalCache.StopCleanup()
143143

144144
return types.CommitData{
145145
AssignedCollections: assignedCollections,

cmd/commit_test.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,14 @@ import (
99
"github.com/ethereum/go-ethereum/ethclient"
1010
"github.com/stretchr/testify/mock"
1111
"math/big"
12-
clientPkg "razor/client"
12+
"razor/cache"
1313
"razor/core"
1414
"razor/core/types"
1515
"razor/pkg/bindings"
1616
"razor/utils"
1717
"reflect"
1818
"testing"
19+
"time"
1920
)
2021

2122
func TestCommit(t *testing.T) {
@@ -225,6 +226,11 @@ func TestHandleCommitState(t *testing.T) {
225226
}
226227
for _, tt := range tests {
227228
t.Run(tt.name, func(t *testing.T) {
229+
localCache := cache.NewLocalCache(time.Second * 10)
230+
commitParams := &types.CommitParams{
231+
LocalCache: localCache,
232+
}
233+
228234
SetUpMockInterfaces()
229235

230236
utilsMock.On("GetNumActiveCollections", mock.AnythingOfType("*ethclient.Client")).Return(tt.args.numActiveCollections, tt.args.numActiveCollectionsErr)
@@ -234,7 +240,7 @@ func TestHandleCommitState(t *testing.T) {
234240
utilsMock.On("GetRogueRandomValue", mock.Anything).Return(rogueValue)
235241

236242
utils := &UtilsStruct{}
237-
got, err := utils.HandleCommitState(client, epoch, seed, &clientPkg.HttpClient{}, tt.args.rogueData)
243+
got, err := utils.HandleCommitState(client, epoch, seed, commitParams, tt.args.rogueData)
238244
if !reflect.DeepEqual(got, tt.want) {
239245
t.Errorf("Data from HandleCommitState function, got = %v, want = %v", got, tt.want)
240246
}
@@ -390,6 +396,11 @@ func BenchmarkHandleCommitState(b *testing.B) {
390396
for _, v := range table {
391397
b.Run(fmt.Sprintf("Number_Of_Active_Collections%d", v.numActiveCollections), func(b *testing.B) {
392398
for i := 0; i < b.N; i++ {
399+
localCache := cache.NewLocalCache(time.Second * 10)
400+
commitParams := &types.CommitParams{
401+
LocalCache: localCache,
402+
}
403+
393404
SetUpMockInterfaces()
394405

395406
utilsMock.On("GetNumActiveCollections", mock.AnythingOfType("*ethclient.Client")).Return(v.numActiveCollections, nil)
@@ -399,7 +410,7 @@ func BenchmarkHandleCommitState(b *testing.B) {
399410
utilsMock.On("GetRogueRandomValue", mock.Anything).Return(rogueValue)
400411

401412
ut := &UtilsStruct{}
402-
_, err := ut.HandleCommitState(client, epoch, seed, &clientPkg.HttpClient{}, types.Rogue{IsRogue: false})
413+
_, err := ut.HandleCommitState(client, epoch, seed, commitParams, types.Rogue{IsRogue: false})
403414
if err != nil {
404415
log.Fatal(err)
405416
}

cmd/eventListeners.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package cmd
2+
3+
import (
4+
"github.com/ethereum/go-ethereum"
5+
"github.com/ethereum/go-ethereum/accounts/abi"
6+
"github.com/ethereum/go-ethereum/common"
7+
Types "github.com/ethereum/go-ethereum/core/types"
8+
"github.com/ethereum/go-ethereum/ethclient"
9+
"math/big"
10+
"razor/cache"
11+
"razor/core"
12+
"razor/core/types"
13+
"razor/pkg/bindings"
14+
"razor/utils"
15+
"strings"
16+
)
17+
18+
func (*UtilsStruct) InitJobAndCollectionCache(client *ethclient.Client) (*cache.JobsCache, *cache.CollectionsCache, *big.Int, error) {
19+
initAssetCacheBlock, err := clientUtils.GetLatestBlockWithRetry(client)
20+
if err != nil {
21+
log.Error("Error in fetching block: ", err)
22+
return nil, nil, nil, err
23+
}
24+
log.Debugf("InitJobAndCollectionCache: Latest header value when initializing jobs and collections cache: %d", initAssetCacheBlock.Number)
25+
26+
log.Info("INITIALIZING JOBS AND COLLECTIONS CACHE...")
27+
28+
// Create instances of cache
29+
jobsCache := cache.NewJobsCache()
30+
collectionsCache := cache.NewCollectionsCache()
31+
32+
// Initialize caches
33+
if err := utils.InitJobsCache(client, jobsCache); err != nil {
34+
log.Error("Error in initializing jobs cache: ", err)
35+
return nil, nil, nil, err
36+
}
37+
if err := utils.InitCollectionsCache(client, collectionsCache); err != nil {
38+
log.Error("Error in initializing collections cache: ", err)
39+
return nil, nil, nil, err
40+
}
41+
42+
return jobsCache, collectionsCache, initAssetCacheBlock.Number, nil
43+
}
44+
45+
// CheckForJobAndCollectionEvents checks for specific job and collections event that were emitted.
46+
func CheckForJobAndCollectionEvents(client *ethclient.Client, commitParams *types.CommitParams) error {
47+
collectionManagerContractABI, err := abi.JSON(strings.NewReader(bindings.CollectionManagerMetaData.ABI))
48+
if err != nil {
49+
log.Errorf("Error in parsing collection manager contract ABI: %v", err)
50+
return err
51+
}
52+
53+
eventNames := []string{core.JobUpdatedEvent, core.CollectionUpdatedEvent, core.CollectionActivityStatusEvent, core.JobCreatedEvent, core.CollectionCreatedEvent}
54+
55+
log.Debug("Checking for Job/Collection update events...")
56+
toBlock, err := clientUtils.GetLatestBlockWithRetry(client)
57+
if err != nil {
58+
log.Error("Error in getting latest block to start event listener: ", err)
59+
return err
60+
}
61+
62+
// Process events and update the fromBlock for the next iteration
63+
newFromBlock, err := processEvents(client, collectionManagerContractABI, commitParams.FromBlockToCheckForEvents, toBlock.Number, eventNames, commitParams.JobsCache, commitParams.CollectionsCache)
64+
if err != nil {
65+
return err
66+
}
67+
68+
// Update the commitParams with the new fromBlock
69+
commitParams.FromBlockToCheckForEvents = new(big.Int).Add(newFromBlock, big.NewInt(1))
70+
71+
return nil
72+
}
73+
74+
// processEvents fetches and processes logs for multiple event types.
75+
func processEvents(client *ethclient.Client, contractABI abi.ABI, fromBlock, toBlock *big.Int, eventNames []string, jobsCache *cache.JobsCache, collectionsCache *cache.CollectionsCache) (*big.Int, error) {
76+
logs, err := getEventLogs(client, fromBlock, toBlock)
77+
if err != nil {
78+
log.Errorf("Failed to fetch logs: %v", err)
79+
return nil, err
80+
}
81+
82+
for _, eventName := range eventNames {
83+
eventID := contractABI.Events[eventName].ID.Hex()
84+
for _, vLog := range logs {
85+
if len(vLog.Topics) > 0 && vLog.Topics[0].Hex() == eventID {
86+
switch eventName {
87+
case core.JobUpdatedEvent, core.JobCreatedEvent:
88+
jobId := utils.ConvertHashToUint16(vLog.Topics[1])
89+
updatedJob, err := utils.UtilsInterface.GetActiveJob(client, jobId)
90+
if err != nil {
91+
log.Errorf("Error in getting job with job Id %v: %v", jobId, err)
92+
continue
93+
}
94+
log.Debugf("RECEIVED JOB EVENT: Updating the job with Id %v with details %+v...", jobId, updatedJob)
95+
jobsCache.UpdateJob(jobId, updatedJob)
96+
case core.CollectionUpdatedEvent, core.CollectionCreatedEvent, core.CollectionActivityStatusEvent:
97+
collectionId := utils.ConvertHashToUint16(vLog.Topics[1])
98+
newCollection, err := utils.UtilsInterface.GetCollection(client, collectionId)
99+
if err != nil {
100+
log.Errorf("Error in getting collection with collection Id %v: %v", collectionId, err)
101+
continue
102+
}
103+
log.Debugf("RECEIVED COLLECTION EVENT: Updating the collection with ID %v with details %+v", collectionId, newCollection)
104+
collectionsCache.UpdateCollection(collectionId, newCollection)
105+
}
106+
}
107+
}
108+
}
109+
110+
// Return the new toBlock for the next iteration
111+
return toBlock, nil
112+
}
113+
114+
// getEventLogs is a utility function to fetch the event logs
115+
func getEventLogs(client *ethclient.Client, fromBlock *big.Int, toBlock *big.Int) ([]Types.Log, error) {
116+
log.Debugf("Checking for events from block %v to block %v...", fromBlock, toBlock)
117+
118+
// Set up the query for filtering logs
119+
query := ethereum.FilterQuery{
120+
FromBlock: fromBlock,
121+
ToBlock: toBlock,
122+
Addresses: []common.Address{
123+
common.HexToAddress(core.CollectionManagerAddress),
124+
},
125+
}
126+
127+
// Retrieve the logs
128+
logs, err := clientUtils.FilterLogsWithRetry(client, query)
129+
if err != nil {
130+
log.Errorf("Error in filter logs: %v", err)
131+
return []Types.Log{}, err
132+
}
133+
134+
return logs, nil
135+
}

0 commit comments

Comments
 (0)