Skip to content

Commit 8a37546

Browse files
committed
Make configurable
1 parent 0156a6b commit 8a37546

File tree

4 files changed

+9
-15
lines changed

4 files changed

+9
-15
lines changed

cmd/livepeer/starter/flags.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package starter
22

33
import (
44
"flag"
5+
"time"
56
)
67

78
func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig {
@@ -76,6 +77,7 @@ func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig {
7677
cfg.LivePaymentInterval = fs.Duration("livePaymentInterval", *cfg.LivePaymentInterval, "Interval to pay process Gateway <> Orchestrator Payments for Live AI Video")
7778
cfg.LiveOutSegmentTimeout = fs.Duration("liveOutSegmentTimeout", *cfg.LiveOutSegmentTimeout, "Timeout duration to wait the output segment to be available in the Live AI pipeline; defaults to no timeout")
7879
cfg.LiveAISaveNSegments = fs.Int("liveAISaveNSegments", 10, "Set how many segments to save to disk for debugging (both input and output)")
80+
cfg.LiveAICapReportInterval = fs.Duration("liveAICapReportInterval", 25*time.Minute, "Interval to report Live AI container capacity metrics")
7981

8082
// Onchain:
8183
cfg.EthAcctAddr = fs.String("ethAcctAddr", *cfg.EthAcctAddr, "Existing Eth account address. For use when multiple ETH accounts exist in the keystore directory")

cmd/livepeer/starter/starter.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ type LivepeerConfig struct {
184184
LiveAIHeartbeatInterval *time.Duration
185185
LivePaymentInterval *time.Duration
186186
LiveOutSegmentTimeout *time.Duration
187-
LiveAICapRefreshModels *string
187+
LiveAICapReportInterval *time.Duration
188188
LiveAISaveNSegments *int
189189
}
190190

@@ -1591,7 +1591,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
15911591
if *cfg.Network != "offchain" {
15921592
ctx, cancel := context.WithCancel(ctx)
15931593
defer cancel()
1594-
dbOrchPoolCache, err := discovery.NewDBOrchestratorPoolCache(ctx, n, timeWatcher, orchBlacklist, *cfg.DiscoveryTimeout)
1594+
dbOrchPoolCache, err := discovery.NewDBOrchestratorPoolCache(ctx, n, timeWatcher, orchBlacklist, *cfg.DiscoveryTimeout, cfg.LiveAICapReportInterval)
15951595
if err != nil {
15961596
exit("Could not create orchestrator pool with DB cache: %v", err)
15971597
}

discovery/db_discovery.go

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,8 @@ import (
2222
"github.com/golang/glog"
2323
)
2424

25-
var cacheRefreshInterval = 10 * time.Second
2625
var networkCapabilitiesReportingInterval = 25 * time.Minute
2726

28-
var getTicker = func() *time.Ticker {
29-
return time.NewTicker(cacheRefreshInterval)
30-
}
31-
3227
type ticketParamsValidator interface {
3328
ValidateTicketParams(ticketParams *pm.TicketParams) error
3429
}
@@ -45,7 +40,7 @@ type DBOrchestratorPoolCache struct {
4540
lastNetworkCapabilitiesReported time.Time
4641
}
4742

48-
func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm common.RoundsManager, orchBlacklist []string, discoveryTimeout time.Duration) (*DBOrchestratorPoolCache, error) {
43+
func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm common.RoundsManager, orchBlacklist []string, discoveryTimeout time.Duration, liveAICapReportInterval time.Duration) (*DBOrchestratorPoolCache, error) {
4944
if node.Eth == nil {
5045
return nil, fmt.Errorf("could not create DBOrchestratorPoolCache: LivepeerEthClient is nil")
5146
}
@@ -70,7 +65,7 @@ func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm
7065
return err
7166
}
7267

73-
if err := dbo.pollOrchestratorInfo(ctx); err != nil {
68+
if err := dbo.pollOrchestratorInfo(ctx, liveAICapReportInterval); err != nil {
7469
return err
7570
}
7671
return nil
@@ -256,13 +251,13 @@ func (dbo *DBOrchestratorPoolCache) cacheOrchestratorStake() error {
256251
return nil
257252
}
258253

259-
func (dbo *DBOrchestratorPoolCache) pollOrchestratorInfo(ctx context.Context) error {
254+
func (dbo *DBOrchestratorPoolCache) pollOrchestratorInfo(ctx context.Context, liveAICapReportInterval time.Duration) error {
260255
if err := dbo.cacheOrchInfos(); err != nil {
261256
glog.Errorf("unable to poll orchestrator info: %v", err)
262257
return err
263258
}
264259

265-
ticker := getTicker()
260+
ticker := time.NewTicker(liveAICapReportInterval)
266261
go func() {
267262
for {
268263
select {
@@ -432,9 +427,6 @@ func reportAICapacityFromNetworkCapabilities(orchNetworkCapabilities []*common.O
432427
InUse: int(model.CapacityInUse),
433428
}
434429
modelCapacities[modelID].Orchestrators[orchCap.OrchURI] = capacity
435-
436-
glog.Infof("HELLOO AI container %s %s inUse:%d idle:%d",
437-
modelID, orchCap.OrchURI, capacity.InUse, capacity.Idle)
438430
}
439431
}
440432

discovery/discovery.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ func (o *orchestratorPool) GetOrchestrators(ctx context.Context, numOrchestrator
246246
for _, i := range rand.Perm(numAvailableOrchs) {
247247
go getOrchInfo(ctx, common.OrchestratorDescriptor{linfos[i], nil}, 0, odCh, errCh, allOrchDescrCh)
248248
}
249-
// TODO revert the changes from https://github.com/livepeer/go-livepeer/commit/905c3144707f1b682667828f350006c5a2e0a3a8#diff-f032101b11930379f2dd15d9a82631fa0a38d87f16760d7bc5423d25c56d3b4f
249+
// TODO revert the changes from https://github.com/livepeer/go-livepeer/commit/905c3144707f1b682667828f350006c5a2e0a3a8#diff-f032101b11930379f2dd15d9a82631fa0a38d87f16760d7bc5423d25c56d3b4f ?
250250
// go reportLiveAICapacity(allOrchDescrCh, caps)
251251

252252
// use a timer to time out the entire get info loop below

0 commit comments

Comments
 (0)