Scale observation building with a massive amount of results (#317)
* Option 4: change the random seed every x rounds

* extract sorter and add unit tests
amirylm authored Apr 8, 2024
1 parent 2c683c4 commit 13d6bb7
Showing 3 changed files with 177 additions and 16 deletions.
70 changes: 55 additions & 15 deletions pkg/v3/plugin/hooks/add_from_staging.go
@@ -1,30 +1,38 @@
 package hooks

 import (
+    "bytes"
     "fmt"
     "log"
     "sort"
+    "sync"

+    "github.com/smartcontractkit/chainlink-common/pkg/types/automation"
+
     ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3"
     "github.com/smartcontractkit/chainlink-automation/pkg/v3/random"
     "github.com/smartcontractkit/chainlink-automation/pkg/v3/telemetry"
     "github.com/smartcontractkit/chainlink-automation/pkg/v3/types"
 )

+type AddFromStagingHook struct {
+    store  types.ResultStore
+    logger *log.Logger
+    coord  types.Coordinator
+    sorter stagedResultSorter
+}
+
 func NewAddFromStagingHook(store types.ResultStore, coord types.Coordinator, logger *log.Logger) AddFromStagingHook {
     return AddFromStagingHook{
         store:  store,
         coord:  coord,
         logger: log.New(logger.Writer(), fmt.Sprintf("[%s | build hook:add-from-staging]", telemetry.ServiceName), telemetry.LogPkgStdFlags),
+        sorter: stagedResultSorter{
+            shuffledIDs: make(map[string]string),
+        },
     }
 }

-type AddFromStagingHook struct {
-    store  types.ResultStore
-    logger *log.Logger
-    coord  types.Coordinator
-}
-
 // RunHook adds results from the store to the observation.
 // It sorts by a shuffled workID. workID for all items is shuffled using a pseudorandom source
 // that is the same across all nodes for a given round. This ensures that all nodes try to
@@ -39,20 +47,52 @@ func (hook *AddFromStagingHook) RunHook(obs *ocr2keepersv3.AutomationObservation
     if err != nil {
         return err
     }
-    // creating a map to hold the shuffled workIDs
-    shuffledIDs := make(map[string]string, len(results))
-    for _, result := range results {
-        shuffledIDs[result.WorkID] = random.ShuffleString(result.WorkID, rSrc)
+
+    results = hook.sorter.orderResults(results, rSrc)
+    if n := len(results); n > limit {
+        results = results[:limit]
+        hook.logger.Printf("skipped %d available results in staging", n-limit)
     }
+    hook.logger.Printf("adding %d results to observation", len(results))
+    obs.Performable = append(obs.Performable, results...)
+
+    return nil
+}
+
+type stagedResultSorter struct {
+    lastRandSrc [16]byte
+    shuffledIDs map[string]string
+    lock        sync.Mutex
+}
+
+// orderResults orders the results by the shuffled workID
+func (sorter *stagedResultSorter) orderResults(results []automation.CheckResult, rSrc [16]byte) []automation.CheckResult {
+    sorter.lock.Lock()
+    defer sorter.lock.Unlock()
+
+    shuffledIDs := sorter.updateShuffledIDs(results, rSrc)
     // sort by the shuffled workID
     sort.Slice(results, func(i, j int) bool {
         return shuffledIDs[results[i].WorkID] < shuffledIDs[results[j].WorkID]
     })
-    if len(results) > limit {
-        results = results[:limit]
+
+    return results
+}
+
+// updateShuffledIDs updates the shuffledIDs cache with the new random source or items.
+// NOTE: This function is not thread-safe and should be called with a lock
+func (sorter *stagedResultSorter) updateShuffledIDs(results []automation.CheckResult, rSrc [16]byte) map[string]string {
+    // once the random source changes, the workIDs need to be shuffled again with the new source
+    if !bytes.Equal(sorter.lastRandSrc[:], rSrc[:]) {
+        sorter.lastRandSrc = rSrc
+        sorter.shuffledIDs = make(map[string]string)
     }
-    hook.logger.Printf("adding %d results to observation", len(results))
-    obs.Performable = append(obs.Performable, results...)

-    return nil
+    for _, result := range results {
+        if _, ok := sorter.shuffledIDs[result.WorkID]; !ok {
+            sorter.shuffledIDs[result.WorkID] = random.ShuffleString(result.WorkID, rSrc)
+        }
+    }
+
+    return sorter.shuffledIDs
 }
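The cache in stagedResultSorter is what lets observation building scale: each workID is shuffled at most once per random source, and the sort only compares the cached strings. Below is a minimal, self-contained sketch of the same idea, not part of the commit: toyShuffle stands in for random.ShuffleString and cachedSorter mirrors the shape of stagedResultSorter, so the names and the shuffle itself are illustrative assumptions.

package main

import (
    "bytes"
    "fmt"
    "math/rand"
    "sort"
    "sync"
)

// toyShuffle stands in for random.ShuffleString: it permutes the characters of s
// deterministically from the 16-byte source, so every caller using the same source
// produces the same shuffled ID.
func toyShuffle(s string, src [16]byte) string {
    var seed int64
    for _, b := range src {
        seed = seed<<8 | int64(b)
    }
    r := rand.New(rand.NewSource(seed))
    runes := []rune(s)
    r.Shuffle(len(runes), func(i, j int) { runes[i], runes[j] = runes[j], runes[i] })
    return string(runes)
}

// cachedSorter mirrors stagedResultSorter: shuffled IDs are computed once per
// workID and reused until the random source rotates.
type cachedSorter struct {
    lastSrc     [16]byte
    shuffledIDs map[string]string
    lock        sync.Mutex
}

func (c *cachedSorter) order(workIDs []string, src [16]byte) []string {
    c.lock.Lock()
    defer c.lock.Unlock()

    if !bytes.Equal(c.lastSrc[:], src[:]) { // source rotated: invalidate the cache
        c.lastSrc = src
        c.shuffledIDs = make(map[string]string)
    }
    for _, id := range workIDs {
        if _, ok := c.shuffledIDs[id]; !ok { // shuffle each workID at most once per source
            c.shuffledIDs[id] = toyShuffle(id, src)
        }
    }
    sort.Slice(workIDs, func(i, j int) bool {
        return c.shuffledIDs[workIDs[i]] < c.shuffledIDs[workIDs[j]]
    })
    return workIDs
}

func main() {
    s := &cachedSorter{shuffledIDs: make(map[string]string)}
    ids := []string{"30a", "10c", "20b"}
    fmt.Println(s.order(ids, [16]byte{1})) // first round: shuffles and caches all three IDs
    fmt.Println(s.order(ids, [16]byte{1})) // same source: cache hits, identical order
    fmt.Println(s.order(ids, [16]byte{2})) // new source: cache is rebuilt, order may change
}

The real hook additionally truncates to the observation limit after ordering, so nodes that share a random source propose the same prefix of the same global order.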
117 changes: 117 additions & 0 deletions pkg/v3/plugin/hooks/add_from_staging_test.go
@@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/mock"

ocr2keepersv3 "github.com/smartcontractkit/chainlink-automation/pkg/v3"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/random"
"github.com/smartcontractkit/chainlink-automation/pkg/v3/types/mocks"
types "github.com/smartcontractkit/chainlink-common/pkg/types/automation"
)
@@ -235,6 +236,122 @@ func TestAddFromStagingHook_RunHook_Limits(t *testing.T) {
    }
}

func TestAddFromStagingHook_stagedResultSorter(t *testing.T) {
    tests := []struct {
        name                string
        cached              []types.CheckResult
        lastRandSrc         [16]byte
        input               []types.CheckResult
        rSrc                [16]byte
        expected            []types.CheckResult
        expectedCache       map[string]string
        expectedLastRandSrc [16]byte
    }{
        {
            name:                "empty results",
            cached:              []types.CheckResult{},
            input:               []types.CheckResult{},
            rSrc:                [16]byte{1},
            expected:            []types.CheckResult{},
            expectedLastRandSrc: [16]byte{1},
        },
        {
            name: "happy path",
            input: []types.CheckResult{
                {UpkeepID: [32]byte{3}, WorkID: "30a"},
                {UpkeepID: [32]byte{1}, WorkID: "10c"},
                {UpkeepID: [32]byte{2}, WorkID: "20b"},
            },
            rSrc: [16]byte{1},
            expected: []types.CheckResult{
                {UpkeepID: [32]byte{1}, WorkID: "10c"},
                {UpkeepID: [32]byte{2}, WorkID: "20b"},
                {UpkeepID: [32]byte{3}, WorkID: "30a"},
            },
            expectedCache: map[string]string{
                "10c": "1c0",
                "20b": "2b0",
                "30a": "3a0",
            },
            expectedLastRandSrc: [16]byte{1},
        },
        {
            name: "with cached results",
            cached: []types.CheckResult{
                {UpkeepID: [32]byte{1}, WorkID: "10c"},
                {UpkeepID: [32]byte{2}, WorkID: "20b"},
            },
            lastRandSrc: [16]byte{1},
            input: []types.CheckResult{
                {UpkeepID: [32]byte{3}, WorkID: "30a"},
                {UpkeepID: [32]byte{2}, WorkID: "20b"},
            },
            rSrc: [16]byte{1},
            expected: []types.CheckResult{
                {UpkeepID: [32]byte{2}, WorkID: "20b"},
                {UpkeepID: [32]byte{3}, WorkID: "30a"},
            },
            expectedCache: map[string]string{
                "10c": "1c0",
                "20b": "2b0",
                "30a": "3a0",
            },
            expectedLastRandSrc: [16]byte{1},
        },
        {
            name: "with cached results of different rand src",
            cached: []types.CheckResult{
                {UpkeepID: [32]byte{1}, WorkID: "10c"},
                {UpkeepID: [32]byte{2}, WorkID: "20b"},
            },
            lastRandSrc: [16]byte{1},
            input: []types.CheckResult{
                {UpkeepID: [32]byte{3}, WorkID: "30a"},
                {UpkeepID: [32]byte{2}, WorkID: "20b"},
            },
            rSrc: [16]byte{2},
            expected: []types.CheckResult{
                {UpkeepID: [32]byte{2}, WorkID: "20b"},
                {UpkeepID: [32]byte{3}, WorkID: "30a"},
            },
            expectedCache: map[string]string{
                "20b": "02b",
                "30a": "03a",
            },
            expectedLastRandSrc: [16]byte{2},
        },
    }

    for _, tc := range tests {
        t.Run(tc.name, func(t *testing.T) {
            sorter := stagedResultSorter{
                shuffledIDs: make(map[string]string),
            }

            if len(tc.cached) > 0 {
                sorter.shuffledIDs = make(map[string]string)
                for _, r := range tc.cached {
                    sorter.shuffledIDs[r.WorkID] = random.ShuffleString(r.WorkID, tc.lastRandSrc)
                }
                sorter.lastRandSrc = tc.lastRandSrc
            }

            results := sorter.orderResults(tc.input, tc.rSrc)
            assert.Equal(t, len(tc.expected), len(results))
            for i := range results {
                assert.Equal(t, tc.expected[i].WorkID, results[i].WorkID)
            }
            sorter.lock.Lock()
            defer sorter.lock.Unlock()
            assert.Equal(t, tc.expectedLastRandSrc, sorter.lastRandSrc)
            assert.Equal(t, len(tc.expectedCache), len(sorter.shuffledIDs))
            for k, v := range tc.expectedCache {
                assert.Equal(t, v, sorter.shuffledIDs[k])
            }
        })
    }
}

func getMocks(n int) (*mocks.MockResultStore, *mocks.MockCoordinator) {
    mockResults := make([]types.CheckResult, n)
    for i := 0; i < n; i++ {
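Assuming a standard checkout of the module, the new table test can be run on its own with Go's test filter:

go test ./pkg/v3/plugin/hooks/ -run TestAddFromStagingHook_stagedResultSorter -v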
6 changes: 5 additions & 1 deletion pkg/v3/plugin/ocr3.go
@@ -65,7 +65,11 @@ func (plugin *ocr3Plugin) Observation(ctx context.Context, outctx ocr3types.Outc

     plugin.AddBlockHistoryHook.RunHook(&observation, ocr2keepersv3.ObservationBlockHistoryLimit)

-    if err := plugin.AddFromStagingHook.RunHook(&observation, ocr2keepersv3.ObservationPerformablesLimit, getRandomKeySource(plugin.ConfigDigest, outctx.SeqNr)); err != nil {
+    // using the OCR seq number for randomness of the performables ordering.
+    // high randomness results in expensive ordering, therefore we reduce
+    // the range of the randomness by dividing the seq number by 10
+    randSrcSeq := outctx.SeqNr / 10
+    if err := plugin.AddFromStagingHook.RunHook(&observation, ocr2keepersv3.ObservationPerformablesLimit, getRandomKeySource(plugin.ConfigDigest, randSrcSeq)); err != nil {
         return nil, err
     }
     prommetrics.AutomationPluginPerformables.WithLabelValues(prommetrics.PluginStepResultStore).Set(float64(len(observation.Performable)))
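Dividing the sequence number by 10 is what ties the two parts of this change together: the key source passed to AddFromStagingHook only changes every 10 OCR rounds, so the sorter's shuffledIDs cache stays valid for those rounds instead of being rebuilt on every observation. The sketch below illustrates the rotation; keySource is a hypothetical stand-in for getRandomKeySource, whose actual derivation is not shown in this diff.

package main

import (
    "crypto/sha256"
    "encoding/binary"
    "fmt"
)

// keySource is a stand-in for getRandomKeySource: it derives a 16-byte source from
// the config digest and a sequence value. The real derivation may differ; what
// matters here is that equal inputs yield equal sources on every node.
func keySource(configDigest [32]byte, seq uint64) [16]byte {
    var buf [8]byte
    binary.BigEndian.PutUint64(buf[:], seq)
    sum := sha256.Sum256(append(configDigest[:], buf[:]...))
    var out [16]byte
    copy(out[:], sum[:16])
    return out
}

func main() {
    var digest [32]byte // zero digest, for illustration only
    // Rounds 40..49 share randSrcSeq=4 and therefore one random source, so the
    // shuffledIDs cache in the sorter is reused for 10 consecutive rounds; at
    // SeqNr=50 the source rotates and the cache is rebuilt.
    for _, seqNr := range []uint64{40, 45, 49, 50} {
        randSrcSeq := seqNr / 10
        fmt.Printf("SeqNr=%d randSrcSeq=%d source=%x\n", seqNr, randSrcSeq, keySource(digest, randSrcSeq))
    }
}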
