diff --git a/cmd/broadcast/cmd.go b/cmd/broadcast/cmd.go index 35d7a7c..4e8641d 100644 --- a/cmd/broadcast/cmd.go +++ b/cmd/broadcast/cmd.go @@ -168,6 +168,13 @@ func mqttClientFlags(options *cfg.MQTTClientConfigOptions) []cli.Flag { DefaultText: "/edge/livestream/signal/candidate/send", Destination: &options.CandidateRecvTopicPrefix, }), + altsrc.NewStringFlag(&cli.StringFlag{ + Name: "mqtt_client.topic_hook_stream_prefix", + Usage: "MQTT topic prefix for hooking of seeding stream", + Value: "/edge/livestream/hook", + DefaultText: "/edge/livestream/hook", + Destination: &options.HookStreamTopicPrefix, + }), altsrc.NewUintFlag(&cli.UintFlag{ Name: "mqtt_client.qos", Usage: "MQTT client qos for WebRTC SDP signaling", @@ -177,7 +184,7 @@ func mqttClientFlags(options *cfg.MQTTClientConfigOptions) []cli.Flag { }), altsrc.NewBoolFlag(&cli.BoolFlag{ Name: "mqtt_client.retained", - Usage: "MQTT client setting retainsion for WebRTC SDP signaling", + Usage: "MQTT client setting retention for WebRTC SDP signaling", Value: false, DefaultText: "false", Destination: &options.Retained, diff --git a/config/config.example.toml b/config/config.example.toml index cfb6e08..fd183e9 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -13,6 +13,8 @@ topic_candidate_send_prefix = "/edge/livestream/signal/candidate/recv" # topic_candidate_recv_prefix is opposite to edge's topic_candidate_send_prefix topic topic_candidate_recv_prefix = "/edge/livestream/signal/candidate/send" +topic_hook_stream_prefix = "/edge/livestream/hook" + qos = 0 retained = false diff --git a/internal/broadcast/broadcast.go b/internal/broadcast/broadcast.go index af3a72e..bc1cfd5 100644 --- a/internal/broadcast/broadcast.go +++ b/internal/broadcast/broadcast.go @@ -40,7 +40,10 @@ func (s *Service) Broadcast() error { }) pub.Signal() - sub := subscriber.New(&s.sessions, &s.logger, s.config.WebRTCConfigOptions) + sub := subscriber.New(s.client, &s.sessions, &s.logger, &cfg.SubscriberConfigOptions{ + MQTTClientConfigOptions: s.config.MQTTClientConfigOptions, + WebRTCConfigOptions: s.config.WebRTCConfigOptions, + }) handler := sub.Signal() server := s.newServer(handler) diff --git a/internal/broadcast/cfg/cfg.go b/internal/broadcast/cfg/cfg.go index fa0294d..32fd0a6 100644 --- a/internal/broadcast/cfg/cfg.go +++ b/internal/broadcast/cfg/cfg.go @@ -11,6 +11,11 @@ type PublisherConfigOptions struct { WebRTCConfigOptions } +type SubscriberConfigOptions struct { + MQTTClientConfigOptions + WebRTCConfigOptions +} + type WebRTCConfigOptions struct { ICEServer string Username string @@ -23,6 +28,7 @@ type MQTTClientConfigOptions struct { AnswerTopicPrefix string CandidateSendTopicPrefix string // Opposite to edge's CandidateRecvTopicPrefix topic CandidateRecvTopicPrefix string // Opposite to edge's CandidateSendTopicPrefix topic. + HookStreamTopicPrefix string Qos uint Retained bool } diff --git a/internal/broadcast/publisher/publisher.go b/internal/broadcast/publisher/publisher.go index b38710c..ce4c43d 100644 --- a/internal/broadcast/publisher/publisher.go +++ b/internal/broadcast/publisher/publisher.go @@ -75,7 +75,7 @@ func (p *Publisher) sendCandidate(meta *pb.Meta) webrtcx.SendCandidateFunc { go func() { <-t.Done() if t.Error() != nil { - p.logger.Err(t.Error()).Msgf("could not publish to %s", p.config.CandidateSendTopicPrefix) + p.logger.Err(t.Error()).Msgf("could not publish to %s", topic) } }() return nil @@ -177,6 +177,7 @@ func (p *Publisher) signalPeerConnection(offer *pb.SessionDescription, logger *z p.sendCandidate(offer.Meta), p.recvCandidate(offer.Meta), p.registerSession(offer.Meta, videoTrack), + webrtcx.NoopHookStreamFunc, ) // TODO: handle blocking case with timeout for channels. diff --git a/internal/broadcast/subscriber/subscriber.go b/internal/broadcast/subscriber/subscriber.go index 6a95f1f..c8cf997 100644 --- a/internal/broadcast/subscriber/subscriber.go +++ b/internal/broadcast/subscriber/subscriber.go @@ -8,6 +8,7 @@ import ( "sync" pb "github.com/SB-IM/pb/signal" + mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gorilla/mux" "github.com/pion/webrtc/v3" "github.com/rs/zerolog" @@ -21,7 +22,8 @@ import ( // Subscriber stands for a subscriber webRTC peer. type Subscriber struct { - config cfg.WebRTCConfigOptions + client mqtt.Client + config *cfg.SubscriberConfigOptions logger zerolog.Logger // sessions must be created before used by publisher and is shared between publishers ans subscribers. @@ -44,9 +46,15 @@ type outgoingMessage struct { } // New returns a new Subscriber. -func New(sessions *sync.Map, logger *zerolog.Logger, config cfg.WebRTCConfigOptions) *Subscriber { +func New( + client mqtt.Client, + sessions *sync.Map, + logger *zerolog.Logger, + config *cfg.SubscriberConfigOptions, +) *Subscriber { l := logger.With().Str("component", "Subscriber").Logger() return &Subscriber{ + client: client, sessions: sessions, config: config, logger: l, @@ -134,11 +142,12 @@ func (s *Subscriber) processMessage(ctx context.Context, c *websocket.Conn) { } wcx := webrtcx.New( - s.config, + s.config.WebRTCConfigOptions, &logger, sendCandidate(ctx, c, offer.Meta), recvCandidate(candidateChan[offer.Meta.TrackSource]), webrtcx.NoopRegisterSessionFunc, + s.hookStream(offer.Meta), ) var sdp webrtc.SessionDescription @@ -212,6 +221,26 @@ func (s *Subscriber) processMessage(ctx context.Context, c *websocket.Conn) { } } +// hookStream only sends signal to drone track source. +func (s *Subscriber) hookStream(meta *pb.Meta) webrtcx.HookStreamFunc { + if meta.TrackSource != pb.TrackSource_DRONE { + return webrtcx.NoopHookStreamFunc + } + return func() { + topic := s.config.HookStreamTopicPrefix + "/" + meta.Id + "/" + strconv.Itoa(int(meta.TrackSource)) + t := s.client.Publish(topic, byte(s.config.Qos), s.config.Retained, []byte("")) + // Handle the token in a go routine so this loop keeps sending messages regardless of delivery status + go func() { + <-t.Done() + if t.Error() != nil { + s.logger.Err(t.Error()).Msgf("could not publish to %s", topic) + } else { + s.logger.Info().Str("topic", topic).Msg("Sent hook signal") + } + }() + } +} + // sendCandidate sends an ice candidate through webSocket. // It can be called multiple time to send multiple ice candidates. func sendCandidate(ctx context.Context, c *websocket.Conn, meta *pb.Meta) webrtcx.SendCandidateFunc { diff --git a/internal/broadcast/webrtc/webrtc.go b/internal/broadcast/webrtc/webrtc.go index 97ed34c..29d72c1 100644 --- a/internal/broadcast/webrtc/webrtc.go +++ b/internal/broadcast/webrtc/webrtc.go @@ -25,6 +25,9 @@ type RecvCandidateFunc func() <-chan string // For subscriber, it should use NoopRegisterSessionFunc instead. type RegisterSessionFunc func() +// HookStreamFunc hooks the stream seeding source on peer connection established. +type HookStreamFunc func() + const ( rtcpPLIInterval = time.Second * 3 ) @@ -43,6 +46,8 @@ type WebRTC struct { recvCandidate RecvCandidateFunc registerSession RegisterSessionFunc + + hookStream HookStreamFunc } // New returns a new WebRTC. @@ -52,6 +57,7 @@ func New( sendCandidate SendCandidateFunc, recvCandidate RecvCandidateFunc, registerSession RegisterSessionFunc, + hookStream HookStreamFunc, ) *WebRTC { return &WebRTC{ logger: *logger, @@ -60,6 +66,7 @@ func New( sendCandidate: sendCandidate, recvCandidate: recvCandidate, registerSession: registerSession, + hookStream: hookStream, } } @@ -165,6 +172,8 @@ func (w *WebRTC) signalPeerConnection(peerConnection *webrtc.PeerConnection) err } w.logger.Info().Msg("peer connection has been closed") case webrtc.ICEConnectionStateConnected: + // Hook video seeding source here. + w.hookStream() // Register session after ICE state is connected. w.registerSession() default: @@ -295,3 +304,6 @@ func NoopRecvCandidateFunc() <-chan string { // NoopRegisterSessionFunc does nothing. func NoopRegisterSessionFunc() {} + +// NoopHookStreamFunc does nothing. +func NoopHookStreamFunc() {}