Skip to content

Commit

Permalink
extra pulse logging (#833)
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 authored Dec 24, 2024
1 parent 9ccd7c9 commit e85379c
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 0 deletions.
127 changes: 127 additions & 0 deletions pkg/pipeline/source/pactl/pactl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package pactl

import (
"bytes"
"encoding/json"
"os/exec"
)

func List() (*PactlInfo, error) {
cmd := exec.Command("pactl", "--format", "json", "list")
var b bytes.Buffer
cmd.Stdout = &b
if err := cmd.Run(); err != nil {
return nil, err
}

info := &PactlInfo{}
return info, json.Unmarshal(b.Bytes(), info)
}

type PactlInfo struct {
Modules []Module `json:"modules"`
Sinks []Device `json:"sinks"`
Sources []Device `json:"sources"`
SinkInputs []SinkInput `json:"sink_inputs"`
SourceOutputs []SourceOutput `json:"source_outputs"`
Clients []Client `json:"clients"`
Samples []interface{} `json:"samples"`
Cards []interface{} `json:"cards"`
}

type Module struct {
Name string `json:"name"`
Argument string `json:"argument"`
UsageCounter string `json:"usage_counter"`
Properties map[string]interface{} `json:"properties"`
}

type Device struct {
Index int `json:"index"`
State string `json:"state"`
Name string `json:"name"`
Description string `json:"description"`
Driver string `json:"driver"`
SampleSpecification string `json:"sample_specification"`
ChannelMap string `json:"channel_map"`
OwnerModule int `json:"owner_module"`
Mute bool `json:"mute"`
Volume map[string]Volume `json:"volume"`
Balance float64 `json:"balance"`
BaseVolume Volume `json:"base_volume"`
MonitorSource string `json:"monitor_source"`
Latency Latency `json:"latency"`
Flags []string `json:"flags"`
Properties map[string]interface{} `json:"properties"`
Ports []interface{} `json:"ports"`
ActivePort interface{} `json:"active_port"`
Formats []string `json:"formats"`
}

type IOBase struct {
Index int `json:"index"`
Driver string `json:"driver"`
OwnerModule string `json:"owner_module"`
Client string `json:"client"`
SampleSpecification string `json:"sample_specification"`
ChannelMap string `json:"channel_map"`
Format string `json:"format"`
Corked bool `json:"corked"`
Mute bool `json:"mute"`
Volume map[string]Volume `json:"volume"`
Balance float64 `json:"balance"`
BufferLatencyUSec float64 `json:"buffer_latency_usec"`
SinkLatencyUSec float64 `json:"sink_latency_usec"`
ResampleMethod string `json:"resample_method"`
Properties map[string]interface{} `json:"properties"`
}

type SinkInput struct {
IOBase `json:",inline"`
Sink int `json:"sink"`
}

type SourceOutput struct {
IOBase `json:",inline"`
Source int `json:"source"`
}

type Client struct {
Index int `json:"index"`
Driver string `json:"driver"`
OwnerModule string `json:"owner_module"`
Properties map[string]interface{} `json:"properties"`
}

type Volume struct {
Value int `json:"value"`
ValuePercent string `json:"value_percent"`
Db string `json:"db"`
}

type Latency struct {
Actual float64 `json:"actual"`
Configured float64 `json:"configured"`
}

type EgressInfo struct {
EgressID string
SinkInputs int
SourceOutputs int
}

func (info *PactlInfo) GetEgressInfo() map[int]*EgressInfo {
egressMap := make(map[int]*EgressInfo)
for _, sink := range info.Sinks {
egressMap[sink.Index] = &EgressInfo{
EgressID: sink.Name,
}
}
for _, sinkInput := range info.SinkInputs {
egressMap[sinkInput.Sink].SinkInputs++
}
for _, sourceOutput := range info.SourceOutputs {
egressMap[sourceOutput.Source].SourceOutputs++
}
return egressMap
}
18 changes: 18 additions & 0 deletions pkg/pipeline/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package pipeline

import (
"context"
"encoding/json"
"fmt"
"regexp"
"strings"
Expand All @@ -27,6 +28,7 @@ import (
"github.com/livekit/egress/pkg/pipeline/builder"
"github.com/livekit/egress/pkg/pipeline/sink"
"github.com/livekit/egress/pkg/pipeline/source"
"github.com/livekit/egress/pkg/pipeline/source/pactl"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/logger"
)
Expand Down Expand Up @@ -108,6 +110,22 @@ func (c *Controller) gstLog(
msg = fmt.Sprintf("[%s %s] %s", category, lvl, message)
}
c.gstLogger.Debugw(msg, "caller", fmt.Sprintf("%s:%d", file, line))

if category == "pulse" {
info, err := pactl.List()
if err != nil {
logger.Errorw("failed to get pulse info", err)
} else {
b, _ := json.Marshal(info.GetEgressInfo())
logger.Infow("pactl status",
"modules", len(info.Modules),
"sinks", len(info.Sinks),
"sources", len(info.Sources),
"clients", len(info.Clients),
"egresses", string(b),
)
}
}
}

func (c *Controller) messageWatch(msg *gst.Message) bool {
Expand Down

0 comments on commit e85379c

Please sign in to comment.