Skip to content

Commit

Permalink
refactor: added httpClient instance in commitParams struct
Browse files Browse the repository at this point in the history
  • Loading branch information
Yashk767 committed Apr 30, 2024
1 parent 255f8c9 commit 5dda2e9
Show file tree
Hide file tree
Showing 9 changed files with 443 additions and 431 deletions.
8 changes: 4 additions & 4 deletions cmd/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (*UtilsStruct) HandleCommitState(client *ethclient.Client, epoch uint32, se
var wg sync.WaitGroup

log.Debug("Creating a local cache which will store API result and expire at the end of commit state")
localCache := cache.NewLocalCache(time.Second * time.Duration(core.StateLength))
commitParams.LocalCache = cache.NewLocalCache(time.Second * time.Duration(core.StateLength))

log.Debug("Iterating over all the collections...")
for i := 0; i < int(numActiveCollections); i++ {
Expand All @@ -97,7 +97,7 @@ func (*UtilsStruct) HandleCommitState(client *ethclient.Client, epoch uint32, se
errChan <- err
return
}
collectionData, err := razorUtils.GetAggregatedDataOfCollection(client, collectionId, epoch, localCache, commitParams)
collectionData, err := razorUtils.GetAggregatedDataOfCollection(client, collectionId, epoch, commitParams)
if err != nil {
log.Error("Error in getting aggregated data of collection: ", err)
errChan <- err
Expand Down Expand Up @@ -129,7 +129,7 @@ func (*UtilsStruct) HandleCommitState(client *ethclient.Client, epoch uint32, se
if err != nil {
// Returning the first error from the error channel
log.Error("Error in getting collection data: ", err)
localCache.StopCleanup()
commitParams.LocalCache.StopCleanup()
return types.CommitData{}, err
}
}
Expand All @@ -139,7 +139,7 @@ func (*UtilsStruct) HandleCommitState(client *ethclient.Client, epoch uint32, se
log.Debug("HandleCommitState: SeqAllottedCollections: ", seqAllottedCollections)
log.Debug("HandleCommitState: Leaves: ", leavesOfTree)

localCache.StopCleanup()
commitParams.LocalCache.StopCleanup()

return types.CommitData{
AssignedCollections: assignedCollections,
Expand Down
6 changes: 6 additions & 0 deletions core/types/http.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
package types

import "net/http"

type HttpClientConfig struct {
Timeout int64
MaxIdleConnections int
MaxIdleConnectionsPerHost int
}

type HttpClientInterface interface {
Do(request *http.Request) (*http.Response, error)
}
4 changes: 2 additions & 2 deletions core/types/vote.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package types
import (
"math/big"
"razor/cache"
"razor/client"
)

type ElectedProposer struct {
Expand Down Expand Up @@ -70,5 +69,6 @@ type ProposeFileData struct {
type CommitParams struct {
JobsCache *cache.JobsCache
CollectionsCache *cache.CollectionsCache
HttpClient *client.HttpClient
LocalCache *cache.LocalCache
HttpClient HttpClientInterface
}
14 changes: 6 additions & 8 deletions utils/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"fmt"
"io"
"net/http"
"razor/cache"
"razor/client"
"razor/core"
"regexp"
"time"
Expand All @@ -20,30 +18,30 @@ import (
"github.com/gocolly/colly"
)

func GetDataFromAPI(httpClient *client.HttpClient, dataSourceURLStruct types.DataSourceURL, localCache *cache.LocalCache) ([]byte, error) {
func GetDataFromAPI(commitParams types.CommitParams, dataSourceURLStruct types.DataSourceURL) ([]byte, error) {
cacheKey, err := generateCacheKey(dataSourceURLStruct.URL, dataSourceURLStruct.Body)
if err != nil {
log.Errorf("Error in generating cache key for API %s: %v", dataSourceURLStruct.URL, err)
return nil, err
}

cachedData, found := localCache.Read(cacheKey)
cachedData, found := commitParams.LocalCache.Read(cacheKey)
if found {
log.Debugf("Getting Data for URL %s from local cache...", dataSourceURLStruct.URL)
return cachedData, nil
}

response, err := makeAPIRequest(httpClient, dataSourceURLStruct)
response, err := makeAPIRequest(commitParams.HttpClient, dataSourceURLStruct)
if err != nil {
return nil, err
}

// Storing the data into cache
localCache.Update(response, cacheKey, time.Now().Add(time.Second*time.Duration(core.StateLength)).Unix())
commitParams.LocalCache.Update(response, cacheKey, time.Now().Add(time.Second*time.Duration(core.StateLength)).Unix())
return response, nil
}

func makeAPIRequest(httpClient *client.HttpClient, dataSourceURLStruct types.DataSourceURL) ([]byte, error) {
func makeAPIRequest(httpClient types.HttpClientInterface, dataSourceURLStruct types.DataSourceURL) ([]byte, error) {
var requestBody io.Reader // Using the broader io.Reader interface here

switch dataSourceURLStruct.Type {
Expand Down Expand Up @@ -121,7 +119,7 @@ func addHeaderToRequest(request *http.Request, headerMap map[string]string) *htt
return request
}

func ProcessRequest(httpClient *client.HttpClient, dataSourceURLStruct types.DataSourceURL, requestBody io.Reader) ([]byte, error) {
func ProcessRequest(httpClient types.HttpClientInterface, dataSourceURLStruct types.DataSourceURL, requestBody io.Reader) ([]byte, error) {
request, err := http.NewRequest(dataSourceURLStruct.Type, dataSourceURLStruct.URL, requestBody)
if err != nil {
return nil, err
Expand Down
6 changes: 5 additions & 1 deletion utils/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,11 @@ func TestGetDataFromAPI(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
localCache := cache.NewLocalCache(time.Second * 10)
got, err := GetDataFromAPI(httpClient, tt.args.urlStruct, localCache)
commitParams := types.CommitParams{
LocalCache: localCache,
HttpClient: httpClient,
}
got, err := GetDataFromAPI(commitParams, tt.args.urlStruct)
if (err != nil) != tt.wantErr {
t.Errorf("GetDataFromAPI() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
21 changes: 10 additions & 11 deletions utils/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"math/big"
"os"
"razor/cache"
"razor/client"
"razor/core"
"razor/core/types"
"razor/path"
Expand Down Expand Up @@ -142,21 +141,21 @@ func (*UtilsStruct) GetActiveCollectionIds(client *ethclient.Client) ([]uint16,
return activeCollectionIds, nil
}

func (*UtilsStruct) GetAggregatedDataOfCollection(client *ethclient.Client, collectionId uint16, epoch uint32, localCache *cache.LocalCache, commitParams types.CommitParams) (*big.Int, error) {
func (*UtilsStruct) GetAggregatedDataOfCollection(client *ethclient.Client, collectionId uint16, epoch uint32, commitParams types.CommitParams) (*big.Int, error) {
activeCollection, err := UtilsInterface.GetActiveCollection(commitParams.CollectionsCache, collectionId)
if err != nil {
log.Error(err)
return nil, err
}
//Supply previous epoch to Aggregate in case if last reported value is required.
collectionData, aggregationError := UtilsInterface.Aggregate(client, epoch-1, activeCollection, localCache, commitParams)
collectionData, aggregationError := UtilsInterface.Aggregate(client, epoch-1, activeCollection, commitParams)
if aggregationError != nil {
return nil, aggregationError
}
return collectionData, nil
}

func (*UtilsStruct) Aggregate(client *ethclient.Client, previousEpoch uint32, collection bindings.StructsCollection, localCache *cache.LocalCache, commitParams types.CommitParams) (*big.Int, error) {
func (*UtilsStruct) Aggregate(client *ethclient.Client, previousEpoch uint32, collection bindings.StructsCollection, commitParams types.CommitParams) (*big.Int, error) {
var jobs []bindings.StructsJob
var overriddenJobIds []uint16

Expand Down Expand Up @@ -212,7 +211,7 @@ func (*UtilsStruct) Aggregate(client *ethclient.Client, previousEpoch uint32, co
if len(jobs) == 0 {
return nil, errors.New("no jobs present in the collection")
}
dataToCommit, weight := UtilsInterface.GetDataToCommitFromJobs(jobs, localCache, commitParams.HttpClient)
dataToCommit, weight := UtilsInterface.GetDataToCommitFromJobs(jobs, commitParams)
if len(dataToCommit) == 0 {
prevCommitmentData, err := UtilsInterface.FetchPreviousValue(client, previousEpoch, collection.Id)
if err != nil {
Expand Down Expand Up @@ -254,7 +253,7 @@ func (*UtilsStruct) GetActiveCollection(collectionsCache *cache.CollectionsCache
return collection, nil
}

func (*UtilsStruct) GetDataToCommitFromJobs(jobs []bindings.StructsJob, localCache *cache.LocalCache, httpClient *client.HttpClient) ([]*big.Int, []uint8) {
func (*UtilsStruct) GetDataToCommitFromJobs(jobs []bindings.StructsJob, commitParams types.CommitParams) ([]*big.Int, []uint8) {
var (
wg sync.WaitGroup
mu sync.Mutex
Expand All @@ -264,18 +263,18 @@ func (*UtilsStruct) GetDataToCommitFromJobs(jobs []bindings.StructsJob, localCac

for _, job := range jobs {
wg.Add(1)
go processJobConcurrently(&wg, &mu, &data, &weight, job, localCache, httpClient)
go processJobConcurrently(&wg, &mu, &data, &weight, job, commitParams)
}

wg.Wait()

return data, weight
}

func processJobConcurrently(wg *sync.WaitGroup, mu *sync.Mutex, data *[]*big.Int, weight *[]uint8, job bindings.StructsJob, localCache *cache.LocalCache, httpClient *client.HttpClient) {
func processJobConcurrently(wg *sync.WaitGroup, mu *sync.Mutex, data *[]*big.Int, weight *[]uint8, job bindings.StructsJob, commitParams types.CommitParams) {
defer wg.Done()

dataToAppend, err := UtilsInterface.GetDataToCommitFromJob(job, localCache, httpClient)
dataToAppend, err := UtilsInterface.GetDataToCommitFromJob(job, commitParams)
if err != nil {
return
}
Expand All @@ -287,7 +286,7 @@ func processJobConcurrently(wg *sync.WaitGroup, mu *sync.Mutex, data *[]*big.Int
*weight = append(*weight, job.Weight)
}

func (*UtilsStruct) GetDataToCommitFromJob(job bindings.StructsJob, localCache *cache.LocalCache, httpClient *client.HttpClient) (*big.Int, error) {
func (*UtilsStruct) GetDataToCommitFromJob(job bindings.StructsJob, commitParams types.CommitParams) (*big.Int, error) {
var parsedJSON map[string]interface{}
var (
response []byte
Expand Down Expand Up @@ -323,7 +322,7 @@ func (*UtilsStruct) GetDataToCommitFromJob(job bindings.StructsJob, localCache *
var parsedData interface{}
if job.SelectorType == 0 {
start := time.Now()
response, apiErr = GetDataFromAPI(httpClient, dataSourceURLStruct, localCache)
response, apiErr = GetDataFromAPI(commitParams, dataSourceURLStruct)
if apiErr != nil {
log.Errorf("Job ID: %d, Error in fetching data from API %s: %v", job.Id, job.Url, apiErr)
return nil, apiErr
Expand Down
20 changes: 14 additions & 6 deletions utils/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ func TestAggregate(t *testing.T) {
ioMock.On("ReadAll", mock.Anything).Return(tt.args.fileData, tt.args.fileDataErr)
utilsMock.On("HandleOfficialJobsFromJSONFile", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tt.args.overrrideJobs, tt.args.overrideJobIds)

got, err := utils.Aggregate(client, previousEpoch, tt.args.collection, &cache.LocalCache{}, types.CommitParams{HttpClient: &clientPkg.HttpClient{}})
got, err := utils.Aggregate(client, previousEpoch, tt.args.collection, commitParams)

if (err != nil) != tt.wantErr {
t.Errorf("Aggregate() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down Expand Up @@ -640,9 +641,12 @@ func TestGetDataToCommitFromJobs(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
UtilsInterface = &UtilsStruct{}
lc := cache.NewLocalCache(time.Second * 10)
commitParams := types.CommitParams{
LocalCache: cache.NewLocalCache(time.Second * 10),
HttpClient: httpClient,
}

gotDataArray, gotWeightArray := UtilsInterface.GetDataToCommitFromJobs(tt.args.jobs, lc, httpClient)
gotDataArray, gotWeightArray := UtilsInterface.GetDataToCommitFromJobs(tt.args.jobs, commitParams)
if len(gotDataArray) != tt.wantArrayLength || len(gotWeightArray) != tt.wantArrayLength {
t.Errorf("GetDataToCommitFromJobs() got = %v, want %v", gotDataArray, tt.wantArrayLength)
}
Expand Down Expand Up @@ -754,8 +758,12 @@ func TestGetDataToCommitFromJob(t *testing.T) {
}
utils := StartRazor(optionsPackageStruct)

lc := cache.NewLocalCache(time.Second * 10)
data, err := utils.GetDataToCommitFromJob(tt.args.job, lc, httpClient)
commitParams := types.CommitParams{
LocalCache: cache.NewLocalCache(time.Second * 10),
HttpClient: httpClient,
}

data, err := utils.GetDataToCommitFromJob(tt.args.job, commitParams)
fmt.Println("JOB returns data: ", data)
if (err != nil) != tt.wantErr {
t.Errorf("GetDataToCommitFromJob() error = %v, wantErr %v", err, tt.wantErr)
Expand Down Expand Up @@ -1294,7 +1302,7 @@ func TestGetAggregatedDataOfCollection(t *testing.T) {
utilsMock.On("GetActiveCollection", mock.Anything, mock.Anything).Return(tt.args.activeCollection, tt.args.activeCollectionErr)
utilsMock.On("Aggregate", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(tt.args.collectionData, tt.args.aggregationErr)

got, err := utils.GetAggregatedDataOfCollection(client, collectionId, epoch, &cache.LocalCache{}, types.CommitParams{HttpClient: &clientPkg.HttpClient{}})
got, err := utils.GetAggregatedDataOfCollection(client, collectionId, epoch, types.CommitParams{HttpClient: &clientPkg.HttpClient{}})
if (err != nil) != tt.wantErr {
t.Errorf("GetAggregatedDataOfCollection() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down
9 changes: 4 additions & 5 deletions utils/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"math/big"
"os"
"razor/cache"
"razor/client"
"razor/core/types"
"razor/pkg/bindings"
"time"
Expand Down Expand Up @@ -111,15 +110,15 @@ type Utils interface {
GetActiveJob(client *ethclient.Client, jobId uint16) (bindings.StructsJob, error)
GetCollection(client *ethclient.Client, collectionId uint16) (bindings.StructsCollection, error)
GetActiveCollection(collectionsCache *cache.CollectionsCache, collectionId uint16) (bindings.StructsCollection, error)
Aggregate(client *ethclient.Client, previousEpoch uint32, collection bindings.StructsCollection, localCache *cache.LocalCache, commitParams types.CommitParams) (*big.Int, error)
GetDataToCommitFromJobs(jobs []bindings.StructsJob, localCache *cache.LocalCache, httpClient *client.HttpClient) ([]*big.Int, []uint8)
GetDataToCommitFromJob(job bindings.StructsJob, localCache *cache.LocalCache, httpClient *client.HttpClient) (*big.Int, error)
Aggregate(client *ethclient.Client, previousEpoch uint32, collection bindings.StructsCollection, commitParams types.CommitParams) (*big.Int, error)
GetDataToCommitFromJobs(jobs []bindings.StructsJob, commitParams types.CommitParams) ([]*big.Int, []uint8)
GetDataToCommitFromJob(job bindings.StructsJob, commitParams types.CommitParams) (*big.Int, error)
GetAssignedCollections(client *ethclient.Client, numActiveCollections uint16, seed []byte) (map[int]bool, []*big.Int, error)
GetLeafIdOfACollection(client *ethclient.Client, collectionId uint16) (uint16, error)
GetCollectionIdFromIndex(client *ethclient.Client, medianIndex uint16) (uint16, error)
GetCollectionIdFromLeafId(client *ethclient.Client, leafId uint16) (uint16, error)
GetNumActiveCollections(client *ethclient.Client) (uint16, error)
GetAggregatedDataOfCollection(client *ethclient.Client, collectionId uint16, epoch uint32, localCache *cache.LocalCache, commitParams types.CommitParams) (*big.Int, error)
GetAggregatedDataOfCollection(client *ethclient.Client, collectionId uint16, epoch uint32, commitParams types.CommitParams) (*big.Int, error)
GetJobs(client *ethclient.Client) ([]bindings.StructsJob, error)
GetAllCollections(client *ethclient.Client) ([]bindings.StructsCollection, error)
GetActiveCollectionIds(client *ethclient.Client) ([]uint16, error)
Expand Down
Loading

0 comments on commit 5dda2e9

Please sign in to comment.