-
Notifications
You must be signed in to change notification settings - Fork 15
/
standalone_stream_server.go
248 lines (221 loc) · 7.11 KB
/
standalone_stream_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
package gostream
import (
"context"
"errors"
"fmt"
"html/template"
"net/http"
"path/filepath"
"runtime"
"sync"
"github.com/edaniels/golog"
"github.com/pion/webrtc/v3"
"go.uber.org/multierr"
"go.viam.com/utils"
"go.viam.com/utils/rpc"
"goji.io"
"goji.io/pat"
streampb "github.com/edaniels/gostream/proto/stream/v1"
)
// A StandaloneStreamServer is a convenience helper for solely streaming a series
// streams. Streams can be added over time for future new connections.
type StandaloneStreamServer interface {
// AddStream adds the given stream for new connections to see.
AddStream(stream Stream) error
// Start starts the server and waits for new connections.
Start(ctx context.Context) error
// Stop stops the server and stops the underlying streams.
Stop(ctx context.Context) error
}
type standaloneStreamServer struct {
port int
streamServer StreamServer
rpcServer rpc.Server
httpServer *http.Server
started bool
opts StandaloneStreamServerOptions
logger golog.Logger
activeBackgroundWorkers sync.WaitGroup
}
// NewStandaloneStreamServer returns a server that will run on the given port and initially starts
// with the given streams.
func NewStandaloneStreamServer(
port int,
logger golog.Logger,
opts []StandaloneStreamServerOption,
streams ...Stream,
) (StandaloneStreamServer, error) {
var sOpts StandaloneStreamServerOptions
for _, opt := range opts {
opt.apply(&sOpts)
}
streamServer, err := NewStreamServer(streams...)
if err != nil {
return nil, err
}
return &standaloneStreamServer{
port: port,
streamServer: streamServer,
opts: sOpts,
logger: logger,
}, nil
}
// ErrServerAlreadyStarted happens when the server has already been started.
var ErrServerAlreadyStarted = errors.New("already started")
func (ss *standaloneStreamServer) AddStream(stream Stream) error {
return ss.streamServer.AddStream(stream)
}
func (ss *standaloneStreamServer) Start(ctx context.Context) error {
if ss.started {
return ErrServerAlreadyStarted
}
ss.started = true
humanAddress := fmt.Sprintf("localhost:%d", ss.port)
listener, secure, err := utils.NewPossiblySecureTCPListenerFromFile(humanAddress, "", "")
if err != nil {
return err
}
var serverOpts []rpc.ServerOption
webrtcOpts := rpc.WebRTCServerOptions{
Enable: true,
}
if ss.opts.onPeerAdded != nil {
webrtcOpts.OnPeerAdded = ss.opts.onPeerAdded
}
if ss.opts.onPeerRemoved != nil {
webrtcOpts.OnPeerRemoved = ss.opts.onPeerRemoved
}
serverOpts = append(serverOpts, rpc.WithWebRTCServerOptions(webrtcOpts))
serverOpts = append(serverOpts, rpc.WithUnauthenticated(), rpc.WithInstanceNames("local"))
rpcServer, err := rpc.NewServer(ss.logger, serverOpts...)
if err != nil {
return err
}
ss.rpcServer = rpcServer
if err := rpcServer.RegisterServiceServer(
ctx,
&streampb.StreamService_ServiceDesc,
ss.streamServer.ServiceServer(),
streampb.RegisterStreamServiceHandlerFromEndpoint,
); err != nil {
return err
}
//nolint:dogsled
_, thisFilePath, _, _ := runtime.Caller(0)
thisDirPath, err := filepath.Abs(filepath.Dir(thisFilePath))
if err != nil {
return fmt.Errorf("error locating current file: %w", err)
}
t, err := template.New("foo").Funcs(template.FuncMap{
//nolint:gosec
"jsSafe": func(js string) template.JS {
return template.JS(js)
},
//nolint:gosec
"htmlSafe": func(html string) template.HTML {
return template.HTML(html)
},
}).ParseGlob(fmt.Sprintf("%s/*.html", filepath.Join(thisDirPath, "templates")))
if err != nil {
return fmt.Errorf("error parsing templates: %w", err)
}
indexT := t.Lookup("index.html")
mux := goji.NewMux()
mux.HandleFunc(pat.Get("/"), func(w http.ResponseWriter, r *http.Request) {
if err := indexT.Execute(w, struct {
AllowSendAudio bool
}{
ss.opts.allowReceive,
}); err != nil {
panic(err)
}
})
mux.Handle(pat.Get("/static/*"), http.StripPrefix("/static", http.FileServer(http.Dir(filepath.Join(thisDirPath, "frontend/dist")))))
mux.Handle(pat.New("/*"), rpcServer.GRPCHandler())
httpServer, err := utils.NewPlainTextHTTP2Server(mux)
if err != nil {
return err
}
httpServer.Addr = listener.Addr().String()
ss.httpServer = httpServer
var scheme string
if secure {
scheme = "https"
} else {
scheme = "http"
}
ss.activeBackgroundWorkers.Add(2)
utils.PanicCapturingGo(func() {
defer ss.activeBackgroundWorkers.Done()
if err := rpcServer.Start(); err != nil {
ss.logger.Errorw("error starting rpc server", "error", err)
}
})
utils.PanicCapturingGo(func() {
defer ss.activeBackgroundWorkers.Done()
ss.logger.Infow("serving", "url", fmt.Sprintf("%s://%s", scheme, humanAddress))
if err := httpServer.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {
ss.logger.Errorw("error serving", "error", err)
}
})
return nil
}
func (ss *standaloneStreamServer) Stop(ctx context.Context) (err error) {
defer ss.activeBackgroundWorkers.Wait()
defer func() {
err = multierr.Combine(err, ss.rpcServer.Stop())
}()
defer func() {
err = multierr.Combine(err, ss.httpServer.Shutdown(ctx))
}()
return ss.streamServer.Close()
}
// StandaloneStreamServerOptions configures a StandaloneStreamServer.
type StandaloneStreamServerOptions struct {
// onPeerAdded is called when a new peer connection is added.
onPeerAdded func(pc *webrtc.PeerConnection)
// onPeerRemoved is called when an existing peer connection is removed.
onPeerRemoved func(pc *webrtc.PeerConnection)
// allowReceive sets whether or not this stream server wants to receive
// media.
allowReceive bool
}
// StandaloneStreamServerOption configures how we set up the server.
// Cribbed from https://github.com/grpc/grpc-go/blob/aff571cc86e6e7e740130dbbb32a9741558db805/dialoptions.go#L41
type StandaloneStreamServerOption interface {
apply(*StandaloneStreamServerOptions)
}
// funcOption wraps a function that modifies StandaloneStreamServerOptions into an
// implementation of the Option interface.
type funcOption struct {
f func(*StandaloneStreamServerOptions)
}
func (fdo *funcOption) apply(do *StandaloneStreamServerOptions) {
fdo.f(do)
}
func newFuncOption(f func(*StandaloneStreamServerOptions)) *funcOption {
return &funcOption{
f: f,
}
}
// WithStandaloneOnPeerAdded returns an Option which sets a function to call
// when a new peer connection is added.
func WithStandaloneOnPeerAdded(f func(pc *webrtc.PeerConnection)) StandaloneStreamServerOption {
return newFuncOption(func(o *StandaloneStreamServerOptions) {
o.onPeerAdded = f
})
}
// WithStandaloneOnPeerRemoved returns an Option which sets a function to call
// when an existing peer connection is remvoed.
func WithStandaloneOnPeerRemoved(f func(pc *webrtc.PeerConnection)) StandaloneStreamServerOption {
return newFuncOption(func(o *StandaloneStreamServerOptions) {
o.onPeerRemoved = f
})
}
// WithStandaloneAllowReceive returns an Option which sets whether or not this
// stream server wants to receive media.
func WithStandaloneAllowReceive(allowReceive bool) StandaloneStreamServerOption {
return newFuncOption(func(o *StandaloneStreamServerOptions) {
o.allowReceive = allowReceive
})
}