cmd/livepeer/starter/flags.go (2 changes: 1 addition & 1 deletion)
@@ -75,8 +75,8 @@ func NewLivepeerConfig(fs *flag.FlagSet) LivepeerConfig {
cfg.LiveAIAuthWebhookURL = fs.String("liveAIAuthWebhookUrl", "", "Live AI RTMP authentication webhook URL")
cfg.LivePaymentInterval = fs.Duration("livePaymentInterval", *cfg.LivePaymentInterval, "Interval to pay process Gateway <> Orchestrator Payments for Live AI Video")
cfg.LiveOutSegmentTimeout = fs.Duration("liveOutSegmentTimeout", *cfg.LiveOutSegmentTimeout, "Timeout duration to wait the output segment to be available in the Live AI pipeline; defaults to no timeout")
cfg.LiveAICapRefreshModels = fs.String("liveAICapRefreshModels", "", "Comma separated list of models to periodically fetch capacity for. Leave unset to switch off periodic refresh.")
Contributor:
Just a note that we need to be careful with deploying this change: if we have this flag configured in infra, the gateway will fail to start because the flag no longer exists.

Contributor (Author):
Yeah, it's a pain. I guess I could leave it in but not use it, then come back after the prod deploy and remove it.
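
A minimal, standalone sketch of the approach floated above (keep the old flag registered but ignore its value, so deployments still passing it keep starting); the flag-set name and CLI arguments here are illustrative, not taken from the PR:

```go
package main

import (
	"flag"
	"fmt"
)

func main() {
	fs := flag.NewFlagSet("livepeer", flag.ContinueOnError)

	// Hypothetical shim: keep -liveAICapRefreshModels registered so configs that
	// still set it don't break startup; the value is intentionally unused and the
	// shim can be deleted once prod no longer passes the flag.
	_ = fs.String("liveAICapRefreshModels", "",
		"Deprecated and ignored; will be removed in a future release")

	// Without the shim, parsing an unknown flag returns an error, which is the
	// startup failure described in the comment above.
	if err := fs.Parse([]string{"-liveAICapRefreshModels", "model-a,model-b"}); err != nil {
		fmt.Println("parse error:", err)
	}
}
```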

cfg.LiveAISaveNSegments = fs.Int("liveAISaveNSegments", 10, "Set how many segments to save to disk for debugging (both input and output)")
cfg.LiveAICapReportInterval = fs.Duration("liveAICapReportInterval", *cfg.LiveAICapReportInterval, "Interval to report Live AI container capacity metrics")

// Onchain:
cfg.EthAcctAddr = fs.String("ethAcctAddr", *cfg.EthAcctAddr, "Existing Eth account address. For use when multiple ETH accounts exist in the keystore directory")
cmd/livepeer/starter/starter.go (9 changes: 4 additions & 5 deletions)
@@ -184,7 +184,7 @@ type LivepeerConfig struct {
LiveAIHeartbeatInterval *time.Duration
LivePaymentInterval *time.Duration
LiveOutSegmentTimeout *time.Duration
LiveAICapRefreshModels *string
LiveAICapReportInterval *time.Duration
LiveAISaveNSegments *int
}

@@ -241,6 +241,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
defaultLiveOutSegmentTimeout := 0 * time.Second
defaultGatewayHost := ""
defaultLiveAIHeartbeatInterval := 5 * time.Second
defaultLiveAICapReportInterval := 25 * time.Minute

// Onchain:
defaultEthAcctAddr := ""
@@ -359,6 +360,7 @@ func DefaultLivepeerConfig() LivepeerConfig {
LiveOutSegmentTimeout: &defaultLiveOutSegmentTimeout,
GatewayHost: &defaultGatewayHost,
LiveAIHeartbeatInterval: &defaultLiveAIHeartbeatInterval,
LiveAICapReportInterval: &defaultLiveAICapReportInterval,

// Onchain:
EthAcctAddr: &defaultEthAcctAddr,
@@ -1591,7 +1593,7 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if *cfg.Network != "offchain" {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
dbOrchPoolCache, err := discovery.NewDBOrchestratorPoolCache(ctx, n, timeWatcher, orchBlacklist, *cfg.DiscoveryTimeout)
dbOrchPoolCache, err := discovery.NewDBOrchestratorPoolCache(ctx, n, timeWatcher, orchBlacklist, *cfg.DiscoveryTimeout, *cfg.LiveAICapReportInterval)
if err != nil {
exit("Could not create orchestrator pool with DB cache: %v", err)
}
@@ -1756,9 +1758,6 @@ func StartLivepeer(ctx context.Context, cfg LivepeerConfig) {
if cfg.LiveAITrickleHostForRunner != nil {
n.LiveAITrickleHostForRunner = *cfg.LiveAITrickleHostForRunner
}
if cfg.LiveAICapRefreshModels != nil && *cfg.LiveAICapRefreshModels != "" {
n.LiveAICapRefreshModels = strings.Split(*cfg.LiveAICapRefreshModels, ",")
}
n.LiveAISaveNSegments = cfg.LiveAISaveNSegments

//Create Livepeer Node
core/livepeernode.go (1 change: 0 additions & 1 deletion)
@@ -168,7 +168,6 @@ type LivepeerNode struct {
LiveAIHeartbeatInterval time.Duration
LivePaymentInterval time.Duration
LiveOutSegmentTimeout time.Duration
LiveAICapRefreshModels []string
LiveAISaveNSegments *int

// Gateway
discovery/db_discovery.go (82 changes: 64 additions & 18 deletions)
@@ -14,34 +14,33 @@ import (
"github.com/livepeer/go-livepeer/core"
"github.com/livepeer/go-livepeer/eth"
lpTypes "github.com/livepeer/go-livepeer/eth/types"
"github.com/livepeer/go-livepeer/monitor"
"github.com/livepeer/go-livepeer/net"
"github.com/livepeer/go-livepeer/pm"
"github.com/livepeer/go-livepeer/server"

"github.com/golang/glog"
)

var cacheRefreshInterval = 25 * time.Minute
var getTicker = func() *time.Ticker {
return time.NewTicker(cacheRefreshInterval)
}
var networkCapabilitiesReportingInterval = 25 * time.Minute

type ticketParamsValidator interface {
ValidateTicketParams(ticketParams *pm.TicketParams) error
}

type DBOrchestratorPoolCache struct {
store common.OrchestratorStore
lpEth eth.LivepeerEthClient
ticketParamsValidator ticketParamsValidator
rm common.RoundsManager
bcast common.Broadcaster
orchBlacklist []string
discoveryTimeout time.Duration
node *core.LivepeerNode
store common.OrchestratorStore
lpEth eth.LivepeerEthClient
ticketParamsValidator ticketParamsValidator
rm common.RoundsManager
bcast common.Broadcaster
orchBlacklist []string
discoveryTimeout time.Duration
node *core.LivepeerNode
lastNetworkCapabilitiesReported time.Time
}

func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm common.RoundsManager, orchBlacklist []string, discoveryTimeout time.Duration) (*DBOrchestratorPoolCache, error) {
func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm common.RoundsManager, orchBlacklist []string, discoveryTimeout time.Duration, liveAICapReportInterval time.Duration) (*DBOrchestratorPoolCache, error) {
if node.Eth == nil {
return nil, fmt.Errorf("could not create DBOrchestratorPoolCache: LivepeerEthClient is nil")
}
@@ -66,7 +65,7 @@ func NewDBOrchestratorPoolCache(ctx context.Context, node *core.LivepeerNode, rm
return err
}

if err := dbo.pollOrchestratorInfo(ctx); err != nil {
if err := dbo.pollOrchestratorInfo(ctx, liveAICapReportInterval); err != nil {
return err
}
return nil
@@ -252,13 +251,13 @@ func (dbo *DBOrchestratorPoolCache) cacheOrchestratorStake() error {
return nil
}

func (dbo *DBOrchestratorPoolCache) pollOrchestratorInfo(ctx context.Context) error {
func (dbo *DBOrchestratorPoolCache) pollOrchestratorInfo(ctx context.Context, liveAICapReportInterval time.Duration) error {
if err := dbo.cacheOrchInfos(); err != nil {
glog.Errorf("unable to poll orchestrator info: %v", err)
return err
}

ticker := getTicker()
ticker := time.NewTicker(liveAICapReportInterval)
go func() {
for {
select {
@@ -393,12 +392,59 @@ func (dbo *DBOrchestratorPoolCache) cacheOrchInfos() error {
i = numOrchs //exit loop
}
}
//save network capabilities in LivepeerNode
dbo.node.UpdateNetworkCapabilities(orchNetworkCapabilities)

// Only update network capabilities every 25 minutes
if time.Since(dbo.lastNetworkCapabilitiesReported) >= networkCapabilitiesReportingInterval {
// Save network capabilities in LivepeerNode
dbo.node.UpdateNetworkCapabilities(orchNetworkCapabilities)

dbo.lastNetworkCapabilitiesReported = time.Now()
}
Comment on lines +396 to +402
Contributor:
Why don't we always send network capabilities?

Contributor (Author), @mjh1, Oct 17, 2025:
I wanted to leave the existing behaviour alone, so we're only sending to Kafka every 25 mins. wdyt @ad-astra-video? Going from 25 mins to 10 seconds seems wrong :)
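
A standalone sketch of the gating pattern this hunk adds to cacheOrchInfos: the surrounding poll loop may tick more often than 25 minutes, but capabilities are only pushed once per networkCapabilitiesReportingInterval. The reporter type and maybeReport method below are illustrative names, not part of the PR:

```go
package main

import (
	"fmt"
	"time"
)

// reporter mirrors the lastNetworkCapabilitiesReported check above: callers can
// invoke it on every cache refresh, but the send only happens once per interval.
type reporter struct {
	interval     time.Duration
	lastReported time.Time
}

func (r *reporter) maybeReport(send func()) {
	if time.Since(r.lastReported) < r.interval {
		return // still within the reporting window; skip this tick
	}
	send()
	r.lastReported = time.Now()
}

func main() {
	r := &reporter{interval: 25 * time.Minute}
	r.maybeReport(func() { fmt.Println("capabilities sent") }) // fires: lastReported is zero
	r.maybeReport(func() { fmt.Println("capabilities sent") }) // skipped: inside the 25-minute window
}
```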


// Report AI container capacity metrics
reportAICapacityFromNetworkCapabilities(orchNetworkCapabilities)

return nil
}

func reportAICapacityFromNetworkCapabilities(orchNetworkCapabilities []*common.OrchNetworkCapabilities) {
// Build structured capacity data
modelCapacities := make(map[string]*monitor.ModelAICapacities)

for _, orchCap := range orchNetworkCapabilities {
models := getModelCapsFromNetCapabilities(orchCap.Capabilities)

for modelID, model := range models {
if _, exists := modelCapacities[modelID]; !exists {
modelCapacities[modelID] = &monitor.ModelAICapacities{
ModelID: modelID,
Orchestrators: make(map[string]monitor.AIContainerCapacity),
}
}

capacity := monitor.AIContainerCapacity{
Idle: int(model.Capacity),
InUse: int(model.CapacityInUse),
}
modelCapacities[modelID].Orchestrators[orchCap.OrchURI] = capacity
}
}

monitor.ReportAIContainerCapacity(modelCapacities)
}

func getModelCapsFromNetCapabilities(caps *net.Capabilities) map[string]*net.Capabilities_CapabilityConstraints_ModelConstraint {
if caps == nil || caps.Constraints == nil || caps.Constraints.PerCapability == nil {
return nil
}
liveAI, ok := caps.Constraints.PerCapability[uint32(core.Capability_LiveVideoToVideo)]
if !ok {
return nil
}

return liveAI.Models
}

func (dbo *DBOrchestratorPoolCache) Broadcaster() common.Broadcaster {
return dbo.bcast
}
discovery/discovery.go (57 changes: 0 additions & 57 deletions)
@@ -246,7 +246,6 @@ func (o *orchestratorPool) GetOrchestrators(ctx context.Context, numOrchestrator
for _, i := range rand.Perm(numAvailableOrchs) {
go getOrchInfo(ctx, common.OrchestratorDescriptor{linfos[i], nil}, 0, odCh, errCh, allOrchDescrCh)
}
go reportLiveAICapacity(allOrchDescrCh, caps)

// use a timer to time out the entire get info loop below
cutoffTimer := time.NewTimer(maxGetOrchestratorCutoffTimeout)
@@ -326,62 +325,6 @@ func (o *orchestratorPool) GetOrchestrators(ctx context.Context, numOrchestrator
return ods, nil
}

func getModelCaps(caps *net.Capabilities) map[string]*net.Capabilities_CapabilityConstraints_ModelConstraint {
if caps == nil || caps.Constraints == nil || caps.Constraints.PerCapability == nil {
return nil
}
liveAI, ok := caps.Constraints.PerCapability[uint32(core.Capability_LiveVideoToVideo)]
if !ok {
return nil
}

return liveAI.Models
}

func reportLiveAICapacity(ch chan common.OrchestratorDescriptor, caps common.CapabilityComparator) {
if !monitor.Enabled {
return
}
modelsReq := getModelCaps(caps.ToNetCapabilities())

var allOrchInfo []common.OrchestratorDescriptor
var done bool
for {
select {
case od := <-ch:
allOrchInfo = append(allOrchInfo, od)
case <-time.After(maxGetOrchestratorCutoffTimeout):
done = true
}
if done {
break
}
}

idleContainersByModelAndOrchestrator := make(map[string]map[string]int)
for _, od := range allOrchInfo {
var models map[string]*net.Capabilities_CapabilityConstraints_ModelConstraint
if od.RemoteInfo != nil {
models = getModelCaps(od.RemoteInfo.Capabilities)
}

for modelID := range modelsReq {
idle := 0
if models != nil {
if model, ok := models[modelID]; ok {
idle = int(model.Capacity)
}
}

if _, exists := idleContainersByModelAndOrchestrator[modelID]; !exists {
idleContainersByModelAndOrchestrator[modelID] = make(map[string]int)
}
idleContainersByModelAndOrchestrator[modelID][od.LocalInfo.URL.String()] = idle
}
}
monitor.AIContainersIdleAfterGatewayDiscovery(idleContainersByModelAndOrchestrator)
}

func (o *orchestratorPool) Size() int {
return len(o.infos)
}