diff --git a/pkg/pipeline/source/pactl/pactl.go b/pkg/pipeline/source/pactl/pactl.go new file mode 100644 index 000000000..a221eddf5 --- /dev/null +++ b/pkg/pipeline/source/pactl/pactl.go @@ -0,0 +1,127 @@ +package pactl + +import ( + "bytes" + "encoding/json" + "os/exec" +) + +func List() (*PactlInfo, error) { + cmd := exec.Command("pactl", "--format", "json", "list") + var b bytes.Buffer + cmd.Stdout = &b + if err := cmd.Run(); err != nil { + return nil, err + } + + info := &PactlInfo{} + return info, json.Unmarshal(b.Bytes(), info) +} + +type PactlInfo struct { + Modules []Module `json:"modules"` + Sinks []Device `json:"sinks"` + Sources []Device `json:"sources"` + SinkInputs []SinkInput `json:"sink_inputs"` + SourceOutputs []SourceOutput `json:"source_outputs"` + Clients []Client `json:"clients"` + Samples []interface{} `json:"samples"` + Cards []interface{} `json:"cards"` +} + +type Module struct { + Name string `json:"name"` + Argument string `json:"argument"` + UsageCounter string `json:"usage_counter"` + Properties map[string]interface{} `json:"properties"` +} + +type Device struct { + Index int `json:"index"` + State string `json:"state"` + Name string `json:"name"` + Description string `json:"description"` + Driver string `json:"driver"` + SampleSpecification string `json:"sample_specification"` + ChannelMap string `json:"channel_map"` + OwnerModule int `json:"owner_module"` + Mute bool `json:"mute"` + Volume map[string]Volume `json:"volume"` + Balance float64 `json:"balance"` + BaseVolume Volume `json:"base_volume"` + MonitorSource string `json:"monitor_source"` + Latency Latency `json:"latency"` + Flags []string `json:"flags"` + Properties map[string]interface{} `json:"properties"` + Ports []interface{} `json:"ports"` + ActivePort interface{} `json:"active_port"` + Formats []string `json:"formats"` +} + +type IOBase struct { + Index int `json:"index"` + Driver string `json:"driver"` + OwnerModule string `json:"owner_module"` + Client string `json:"client"` + SampleSpecification string `json:"sample_specification"` + ChannelMap string `json:"channel_map"` + Format string `json:"format"` + Corked bool `json:"corked"` + Mute bool `json:"mute"` + Volume map[string]Volume `json:"volume"` + Balance float64 `json:"balance"` + BufferLatencyUSec float64 `json:"buffer_latency_usec"` + SinkLatencyUSec float64 `json:"sink_latency_usec"` + ResampleMethod string `json:"resample_method"` + Properties map[string]interface{} `json:"properties"` +} + +type SinkInput struct { + IOBase `json:",inline"` + Sink int `json:"sink"` +} + +type SourceOutput struct { + IOBase `json:",inline"` + Source int `json:"source"` +} + +type Client struct { + Index int `json:"index"` + Driver string `json:"driver"` + OwnerModule string `json:"owner_module"` + Properties map[string]interface{} `json:"properties"` +} + +type Volume struct { + Value int `json:"value"` + ValuePercent string `json:"value_percent"` + Db string `json:"db"` +} + +type Latency struct { + Actual float64 `json:"actual"` + Configured float64 `json:"configured"` +} + +type EgressInfo struct { + EgressID string + SinkInputs int + SourceOutputs int +} + +func (info *PactlInfo) GetEgressInfo() map[int]*EgressInfo { + egressMap := make(map[int]*EgressInfo) + for _, sink := range info.Sinks { + egressMap[sink.Index] = &EgressInfo{ + EgressID: sink.Name, + } + } + for _, sinkInput := range info.SinkInputs { + egressMap[sinkInput.Sink].SinkInputs++ + } + for _, sourceOutput := range info.SourceOutputs { + egressMap[sourceOutput.Source].SourceOutputs++ + } + return egressMap +} diff --git a/pkg/pipeline/watch.go b/pkg/pipeline/watch.go index 1c57769e0..1c5f19e42 100644 --- a/pkg/pipeline/watch.go +++ b/pkg/pipeline/watch.go @@ -16,6 +16,7 @@ package pipeline import ( "context" + "encoding/json" "fmt" "regexp" "strings" @@ -27,6 +28,7 @@ import ( "github.com/livekit/egress/pkg/pipeline/builder" "github.com/livekit/egress/pkg/pipeline/sink" "github.com/livekit/egress/pkg/pipeline/source" + "github.com/livekit/egress/pkg/pipeline/source/pactl" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/logger" ) @@ -108,6 +110,22 @@ func (c *Controller) gstLog( msg = fmt.Sprintf("[%s %s] %s", category, lvl, message) } c.gstLogger.Debugw(msg, "caller", fmt.Sprintf("%s:%d", file, line)) + + if category == "pulse" { + info, err := pactl.List() + if err != nil { + logger.Errorw("failed to get pulse info", err) + } else { + b, _ := json.Marshal(info.GetEgressInfo()) + logger.Infow("pactl status", + "modules", len(info.Modules), + "sinks", len(info.Sinks), + "sources", len(info.Sources), + "clients", len(info.Clients), + "egresses", string(b), + ) + } + } } func (c *Controller) messageWatch(msg *gst.Message) bool {