Skip to content
This repository has been archived by the owner on Oct 9, 2021. It is now read-only.

Commit

Permalink
broadcast: Add hookstream support (always enabled)
Browse files Browse the repository at this point in the history
  • Loading branch information
williamlsh committed May 19, 2021
1 parent 7ede951 commit 2a4ffd9
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 6 deletions.
9 changes: 8 additions & 1 deletion cmd/broadcast/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,13 @@ func mqttClientFlags(options *cfg.MQTTClientConfigOptions) []cli.Flag {
DefaultText: "/edge/livestream/signal/candidate/send",
Destination: &options.CandidateRecvTopicPrefix,
}),
altsrc.NewStringFlag(&cli.StringFlag{
Name: "mqtt_client.topic_hook_stream_prefix",
Usage: "MQTT topic prefix for hooking of seeding stream",
Value: "/edge/livestream/hook",
DefaultText: "/edge/livestream/hook",
Destination: &options.HookStreamTopicPrefix,
}),
altsrc.NewUintFlag(&cli.UintFlag{
Name: "mqtt_client.qos",
Usage: "MQTT client qos for WebRTC SDP signaling",
Expand All @@ -177,7 +184,7 @@ func mqttClientFlags(options *cfg.MQTTClientConfigOptions) []cli.Flag {
}),
altsrc.NewBoolFlag(&cli.BoolFlag{
Name: "mqtt_client.retained",
Usage: "MQTT client setting retainsion for WebRTC SDP signaling",
Usage: "MQTT client setting retention for WebRTC SDP signaling",
Value: false,
DefaultText: "false",
Destination: &options.Retained,
Expand Down
2 changes: 2 additions & 0 deletions config/config.example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ topic_candidate_send_prefix = "/edge/livestream/signal/candidate/recv"
# topic_candidate_recv_prefix is opposite to edge's topic_candidate_send_prefix topic
topic_candidate_recv_prefix = "/edge/livestream/signal/candidate/send"

topic_hook_stream_prefix = "/edge/livestream/hook"

qos = 0
retained = false

Expand Down
5 changes: 4 additions & 1 deletion internal/broadcast/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ func (s *Service) Broadcast() error {
})
pub.Signal()

sub := subscriber.New(&s.sessions, &s.logger, s.config.WebRTCConfigOptions)
sub := subscriber.New(s.client, &s.sessions, &s.logger, &cfg.SubscriberConfigOptions{
MQTTClientConfigOptions: s.config.MQTTClientConfigOptions,
WebRTCConfigOptions: s.config.WebRTCConfigOptions,
})
handler := sub.Signal()

server := s.newServer(handler)
Expand Down
6 changes: 6 additions & 0 deletions internal/broadcast/cfg/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ type PublisherConfigOptions struct {
WebRTCConfigOptions
}

type SubscriberConfigOptions struct {
MQTTClientConfigOptions
WebRTCConfigOptions
}

type WebRTCConfigOptions struct {
ICEServer string
Username string
Expand All @@ -23,6 +28,7 @@ type MQTTClientConfigOptions struct {
AnswerTopicPrefix string
CandidateSendTopicPrefix string // Opposite to edge's CandidateRecvTopicPrefix topic
CandidateRecvTopicPrefix string // Opposite to edge's CandidateSendTopicPrefix topic.
HookStreamTopicPrefix string
Qos uint
Retained bool
}
Expand Down
3 changes: 2 additions & 1 deletion internal/broadcast/publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (p *Publisher) sendCandidate(meta *pb.Meta) webrtcx.SendCandidateFunc {
go func() {
<-t.Done()
if t.Error() != nil {
p.logger.Err(t.Error()).Msgf("could not publish to %s", p.config.CandidateSendTopicPrefix)
p.logger.Err(t.Error()).Msgf("could not publish to %s", topic)
}
}()
return nil
Expand Down Expand Up @@ -177,6 +177,7 @@ func (p *Publisher) signalPeerConnection(offer *pb.SessionDescription, logger *z
p.sendCandidate(offer.Meta),
p.recvCandidate(offer.Meta),
p.registerSession(offer.Meta, videoTrack),
webrtcx.NoopHookStreamFunc,
)

// TODO: handle blocking case with timeout for channels.
Expand Down
35 changes: 32 additions & 3 deletions internal/broadcast/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"sync"

pb "github.com/SB-IM/pb/signal"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/gorilla/mux"
"github.com/pion/webrtc/v3"
"github.com/rs/zerolog"
Expand All @@ -21,7 +22,8 @@ import (

// Subscriber stands for a subscriber webRTC peer.
type Subscriber struct {
config cfg.WebRTCConfigOptions
client mqtt.Client
config *cfg.SubscriberConfigOptions
logger zerolog.Logger

// sessions must be created before used by publisher and is shared between publishers ans subscribers.
Expand All @@ -44,9 +46,15 @@ type outgoingMessage struct {
}

// New returns a new Subscriber.
func New(sessions *sync.Map, logger *zerolog.Logger, config cfg.WebRTCConfigOptions) *Subscriber {
func New(
client mqtt.Client,
sessions *sync.Map,
logger *zerolog.Logger,
config *cfg.SubscriberConfigOptions,
) *Subscriber {
l := logger.With().Str("component", "Subscriber").Logger()
return &Subscriber{
client: client,
sessions: sessions,
config: config,
logger: l,
Expand Down Expand Up @@ -134,11 +142,12 @@ func (s *Subscriber) processMessage(ctx context.Context, c *websocket.Conn) {
}

wcx := webrtcx.New(
s.config,
s.config.WebRTCConfigOptions,
&logger,
sendCandidate(ctx, c, offer.Meta),
recvCandidate(candidateChan[offer.Meta.TrackSource]),
webrtcx.NoopRegisterSessionFunc,
s.hookStream(offer.Meta),
)

var sdp webrtc.SessionDescription
Expand Down Expand Up @@ -212,6 +221,26 @@ func (s *Subscriber) processMessage(ctx context.Context, c *websocket.Conn) {
}
}

// hookStream only sends signal to drone track source.
func (s *Subscriber) hookStream(meta *pb.Meta) webrtcx.HookStreamFunc {
if meta.TrackSource != pb.TrackSource_DRONE {
return webrtcx.NoopHookStreamFunc
}
return func() {
topic := s.config.HookStreamTopicPrefix + "/" + meta.Id + "/" + strconv.Itoa(int(meta.TrackSource))
t := s.client.Publish(topic, byte(s.config.Qos), s.config.Retained, []byte(""))
// Handle the token in a go routine so this loop keeps sending messages regardless of delivery status
go func() {
<-t.Done()
if t.Error() != nil {
s.logger.Err(t.Error()).Msgf("could not publish to %s", topic)
} else {
s.logger.Info().Str("topic", topic).Msg("Sent hook signal")
}
}()
}
}

// sendCandidate sends an ice candidate through webSocket.
// It can be called multiple time to send multiple ice candidates.
func sendCandidate(ctx context.Context, c *websocket.Conn, meta *pb.Meta) webrtcx.SendCandidateFunc {
Expand Down
12 changes: 12 additions & 0 deletions internal/broadcast/webrtc/webrtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type RecvCandidateFunc func() <-chan string
// For subscriber, it should use NoopRegisterSessionFunc instead.
type RegisterSessionFunc func()

// HookStreamFunc hooks the stream seeding source on peer connection established.
type HookStreamFunc func()

const (
rtcpPLIInterval = time.Second * 3
)
Expand All @@ -43,6 +46,8 @@ type WebRTC struct {
recvCandidate RecvCandidateFunc

registerSession RegisterSessionFunc

hookStream HookStreamFunc
}

// New returns a new WebRTC.
Expand All @@ -52,6 +57,7 @@ func New(
sendCandidate SendCandidateFunc,
recvCandidate RecvCandidateFunc,
registerSession RegisterSessionFunc,
hookStream HookStreamFunc,
) *WebRTC {
return &WebRTC{
logger: *logger,
Expand All @@ -60,6 +66,7 @@ func New(
sendCandidate: sendCandidate,
recvCandidate: recvCandidate,
registerSession: registerSession,
hookStream: hookStream,
}
}

Expand Down Expand Up @@ -165,6 +172,8 @@ func (w *WebRTC) signalPeerConnection(peerConnection *webrtc.PeerConnection) err
}
w.logger.Info().Msg("peer connection has been closed")
case webrtc.ICEConnectionStateConnected:
// Hook video seeding source here.
w.hookStream()
// Register session after ICE state is connected.
w.registerSession()
default:
Expand Down Expand Up @@ -295,3 +304,6 @@ func NoopRecvCandidateFunc() <-chan string {

// NoopRegisterSessionFunc does nothing.
func NoopRegisterSessionFunc() {}

// NoopHookStreamFunc does nothing.
func NoopHookStreamFunc() {}

0 comments on commit 2a4ffd9

Please sign in to comment.