From 11662e99610b313398b3cabe354f7faec8e74931 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 10 Dec 2021 17:33:36 +0100 Subject: [PATCH 01/19] replace root cmd. --- cmd/root.go | 270 +++++++++++++++++++++++++++++--------- cmd/serve.go | 6 +- go.mod | 1 + go.sum | 3 + internal/config/config.go | 40 ++---- internal/main.go | 9 +- 6 files changed, 223 insertions(+), 106 deletions(-) diff --git a/cmd/root.go b/cmd/root.go index 1e4ff7c..5aa6181 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -1,100 +1,242 @@ package cmd import ( + "fmt" + "io" "os" + "os/signal" "runtime" "strings" + "syscall" "github.com/fsnotify/fsnotify" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/cobra" "github.com/spf13/viper" - - transcode "github.com/m1k1o/go-transcode/internal" + "gopkg.in/natefinch/lumberjack.v2" ) -func Execute() error { - return root.Execute() -} +// Default configuration path +const defCfgPath = "/etc/transcode/" -var root = &cobra.Command{ - Use: "transcode", - Short: "transcode server", - Long: `transcode server`, +// ENV prefix for configuration +const envPrefix = "TRANSCODE" + +var rootCmd = &cobra.Command{ + Use: "transcode", + Short: "Transcode server CLI.", + Long: `Transcode HTTP on-demand transcoding API.`, + Version: "1.0.0", } +var onConfigLoad []func() + func init() { + var cfgFile string + var logConfig logConfig + cobra.OnInitialize(func() { - ////// - // logs - ////// - zerolog.TimeFieldFormat = zerolog.TimeFormatUnix - log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stdout}) - - ////// - // configs - ////// - - // at this point we did not read any config data, so we need to tell - // explicitly how to get this value - cfgFile := viper.GetString("config") // Use config file from the flag - if cfgFile == "" { - cfgFile = os.Getenv("TRANSCODE_CONFIG") // Use config file from the env - } + initConfiguration(cfgFile, defCfgPath, envPrefix) + logConfig.Set() + initLogging(logConfig) + + // display used configuration file + file := viper.ConfigFileUsed() + if file != "" { + viper.OnConfigChange(func(e fsnotify.Event) { + log.Info().Msg("config file reloaded") + + // call load config + for _, loadConfig := range onConfigLoad { + loadConfig() + } + }) - if cfgFile != "" { - viper.SetConfigFile(cfgFile) // use config file from the flag + viper.WatchConfig() + + log.Info().Str("config", file).Msg("preflight complete with config file") } else { - if runtime.GOOS == "linux" { - viper.AddConfigPath("/etc/transcode/") - } + log.Warn().Msg("preflight complete without config file") + } - viper.AddConfigPath(".") - viper.SetConfigName("config") + // call load config + for _, loadConfig := range onConfigLoad { + loadConfig() } + }) + + // config file + rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "configuration file path") + _ = viper.BindPFlag("config", rootCmd.PersistentFlags().Lookup("config")) - viper.SetEnvPrefix("TRANSCODE") - viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_", ".", "_")) - viper.AutomaticEnv() // read in environment variables that match + // log configuration + _ = logConfig.Init(rootCmd) +} + +type Config interface { + Init(cmd *cobra.Command) error + Set() +} - err := viper.ReadInConfig() - if err != nil && cfgFile != "" { - log.Err(err).Msg("unable to read in config") +func Execute() error { + return rootCmd.Execute() +} + +// +// Configuration initialization +// + +func initConfiguration(cfgFile string, defCfgPath string, envPrefix string) { + // use configuration file if provided + if cfgFile != "" { + viper.SetConfigFile(cfgFile) + } else { + // confguratino file name + viper.SetConfigName("config") + + // search for configuration file + if runtime.GOOS == "linux" && defCfgPath != "" { + viper.AddConfigPath(defCfgPath) } - // all configs (from file, env and flags) are loaded now, - // we can set them - config := transcode.Service.RootConfig - config.Set() + // seatch for configuration file in ./ + viper.AddConfigPath(".") + } + + if envPrefix != "" { + // env prefix is uppercase progname + viper.SetEnvPrefix(envPrefix) - if config.Debug { - zerolog.SetGlobalLevel(zerolog.DebugLevel) - } else { - zerolog.SetGlobalLevel(zerolog.InfoLevel) + // replace . and - with _ + viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_")) + + // read in environment variables that match + viper.AutomaticEnv() + } + + // read config file + err := viper.ReadInConfig() + if err != nil && cfgFile != "" { + panic(fmt.Errorf("fatal error config file: %w", err)) + } +} + +// +// Logging initialization +// + +type logConfig struct { + // Set log level + Level string `yaml:"level"` + // Enable console logging + Console bool `yaml:"console"` + // Enable file logging and specify its path + File string `yaml:"file"` + // MaxAge the max age in days to keep a logfile + MaxAge int `yaml:"maxage"` + // MaxSize the max size in MB of the logfile before it's rolled + MaxSize int `yaml:"maxsize"` + // MaxBackups the max number of rolled files to keep + MaxBackups int `yaml:"maxbackups"` +} + +func (logConfig) Init(cmd *cobra.Command) error { + cmd.PersistentFlags().String("log.level", "", "Set log level") + if err := viper.BindPFlag("log.level", cmd.PersistentFlags().Lookup("log.level")); err != nil { + return err + } + + cmd.PersistentFlags().Bool("log.console", true, "Enable console logging") + if err := viper.BindPFlag("log.console", cmd.PersistentFlags().Lookup("log.console")); err != nil { + return err + } + + cmd.PersistentFlags().String("log.file", "", "Enable file logging and specify its path") + if err := viper.BindPFlag("log.file", cmd.PersistentFlags().Lookup("log.file")); err != nil { + return err + } + + cmd.PersistentFlags().Int("log.maxage", 0, "MaxAge the max age in days to keep a logfile") + if err := viper.BindPFlag("log.maxage", cmd.PersistentFlags().Lookup("log.maxage")); err != nil { + return err + } + + cmd.PersistentFlags().Int("log.maxsize", 100, "MaxSize the max size in MB of the logfile before it's rolled") + if err := viper.BindPFlag("log.maxsize", cmd.PersistentFlags().Lookup("log.maxsize")); err != nil { + return err + } + + cmd.PersistentFlags().Int("log.maxbackups", 0, "MaxBackups the max number of rolled files to keep") + if err := viper.BindPFlag("log.maxbackups", cmd.PersistentFlags().Lookup("log.maxbackups")); err != nil { + return err + } + + return nil +} + +func (c *logConfig) Set() { + c.Level = viper.GetString("log.level") + c.Console = viper.GetBool("log.console") + c.File = viper.GetString("log.file") + c.MaxAge = viper.GetInt("log.maxage") + c.MaxSize = viper.GetInt("log.maxsize") + c.MaxBackups = viper.GetInt("log.maxbackups") +} + +func initLogging(config logConfig) { + var writers []io.Writer + + if config.Console { + writers = append(writers, zerolog.ConsoleWriter{ + Out: os.Stderr, + }) + } + + if config.File != "" { + logger := &lumberjack.Logger{ + Filename: config.File, + MaxAge: config.MaxAge, // days + MaxSize: config.MaxSize, // megabytes + MaxBackups: config.MaxBackups, // files } - file := viper.ConfigFileUsed() - if file != "" { - viper.OnConfigChange(func(e fsnotify.Event) { - log.Info().Msg("config file reloaded") - transcode.Service.ConfigReload() - }) + // rotate in response to SIGHUP + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGHUP) - viper.WatchConfig() + go func() { + for { + <-c + logger.Rotate() + } + }() + + writers = append(writers, logger) + } + + zerolog.TimeFieldFormat = zerolog.TimeFormatUnix + log.Logger = log.Output(io.MultiWriter(writers...)) - log.Info(). - Bool("debug", config.Debug). - Str("config", file). - Msg("preflight complete with config file") + if config.Level == "" { + zerolog.SetGlobalLevel(zerolog.InfoLevel) + log.Info().Msg("using default log level") + } else { + // set custom log level + level, err := zerolog.ParseLevel(config.Level) + if err != nil { + zerolog.SetGlobalLevel(zerolog.InfoLevel) + log.Warn().Str("log-level", config.Level).Msg("unknown log level") } else { - log.Warn(). - Bool("debug", config.Debug). - Msg("preflight complete without config file") + zerolog.SetGlobalLevel(level) } - }) - - if err := transcode.Service.RootConfig.Init(root); err != nil { - log.Panic().Err(err).Msg("unable to run root command") } + + log.Info(). + Bool("console", config.Console). + Str("file", config.File). + Int("maxage", config.MaxAge). + Int("maxsize", config.MaxSize). + Int("maxbackups", config.MaxBackups). + Msg("logging configured") } diff --git a/cmd/serve.go b/cmd/serve.go index 75530af..71c8008 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -4,7 +4,7 @@ import ( "github.com/rs/zerolog/log" "github.com/spf13/cobra" - "github.com/m1k1o/go-transcode/internal" + transcode "github.com/m1k1o/go-transcode/internal" ) func init() { @@ -15,7 +15,7 @@ func init() { Run: transcode.Service.ServeCommand, } - cobra.OnInitialize(func() { + onConfigLoad = append(onConfigLoad, func() { transcode.Service.ServerConfig.Set() transcode.Service.Preflight() }) @@ -24,5 +24,5 @@ func init() { log.Panic().Err(err).Msg("unable to run serve command") } - root.AddCommand(command) + rootCmd.AddCommand(command) } diff --git a/go.mod b/go.mod index 6c08c69..e5b4915 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( golang.org/x/text v0.3.7 // indirect gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/ini.v1 v1.63.2 // indirect + gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) require ( diff --git a/go.sum b/go.sum index 09cf700..6e42e10 100644 --- a/go.sum +++ b/go.sum @@ -43,6 +43,7 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= @@ -686,6 +687,8 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/ini.v1 v1.62.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.63.2 h1:tGK/CyBg7SMzb60vP1M03vNZ3VDu3wGQJwn7Sxi9r3c= gopkg.in/ini.v1 v1.63.2/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/natefinch/lumberjack.v2 v2.0.0 h1:1Lc07Kr7qY4U2YPouBjpCLxpiyxIVoxqXgkXLknAOE8= +gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/config/config.go b/internal/config/config.go index eac056c..59f01a6 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -9,37 +9,6 @@ import ( "github.com/spf13/viper" ) -type Root struct { - Debug bool - PProf bool - CfgFile string -} - -func (Root) Init(cmd *cobra.Command) error { - cmd.PersistentFlags().BoolP("debug", "d", false, "enable debug mode") - if err := viper.BindPFlag("debug", cmd.PersistentFlags().Lookup("debug")); err != nil { - return err - } - - cmd.PersistentFlags().Bool("pprof", false, "enable pprof endpoint available at /debug/pprof") - if err := viper.BindPFlag("pprof", cmd.PersistentFlags().Lookup("pprof")); err != nil { - return err - } - - cmd.PersistentFlags().String("config", "", "configuration file path") - if err := viper.BindPFlag("config", cmd.PersistentFlags().Lookup("config")); err != nil { - return err - } - - return nil -} - -func (s *Root) Set() { - s.Debug = viper.GetBool("debug") - s.PProf = viper.GetBool("pprof") - s.CfgFile = viper.GetString("config") -} - type VideoProfile struct { Width int `mapstructure:"width"` Height int `mapstructure:"height"` @@ -63,6 +32,8 @@ type VOD struct { } type Server struct { + PProf bool + Cert string Key string Bind string @@ -78,6 +49,11 @@ type Server struct { } func (Server) Init(cmd *cobra.Command) error { + cmd.PersistentFlags().Bool("pprof", false, "enable pprof endpoint available at /debug/pprof") + if err := viper.BindPFlag("pprof", cmd.PersistentFlags().Lookup("pprof")); err != nil { + return err + } + cmd.PersistentFlags().String("bind", "127.0.0.1:8080", "address/port/socket to serve neko") if err := viper.BindPFlag("bind", cmd.PersistentFlags().Lookup("bind")); err != nil { return err @@ -117,6 +93,8 @@ func (Server) Init(cmd *cobra.Command) error { } func (s *Server) Set() { + s.PProf = viper.GetBool("pprof") + s.Cert = viper.GetString("cert") s.Key = viper.GetString("key") s.Bind = viper.GetString("bind") diff --git a/internal/main.go b/internal/main.go index 16a361a..3e5ff0c 100644 --- a/internal/main.go +++ b/internal/main.go @@ -17,13 +17,11 @@ var Service *Main func init() { Service = &Main{ - RootConfig: &config.Root{}, ServerConfig: &config.Server{}, } } type Main struct { - RootConfig *config.Root ServerConfig *config.Server logger zerolog.Logger @@ -45,7 +43,7 @@ func (main *Main) Start() { main.httpManager.Mount(main.apiManager.Mount) main.httpManager.Start() - if main.RootConfig.PProf { + if main.ServerConfig.PProf { pathPrefix := "/debug/pprof/" main.httpManager.WithDebugPProf(pathPrefix) main.logger.Info().Msgf("mounted debug pprof endpoint at %s", pathPrefix) @@ -77,8 +75,3 @@ func (main *Main) ServeCommand(cmd *cobra.Command, args []string) { main.Shutdown() main.logger.Info().Msg("shutdown complete") } - -func (main *Main) ConfigReload() { - main.RootConfig.Set() - main.ServerConfig.Set() -} From 2b2569f732d6cd718475d01ee04b50fa4e9344d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 10 Dec 2021 17:38:25 +0100 Subject: [PATCH 02/19] move modules to pkg. --- internal/api/hls.go | 2 +- internal/api/hlsproxy.go | 2 +- internal/api/hlsvod.go | 3 ++- {hls => pkg/hls}/manager.go | 0 {hls => pkg/hls}/types.go | 0 {hlsproxy => pkg/hlsproxy}/cache.go | 0 {hlsproxy => pkg/hlsproxy}/manager.go | 0 {hlsproxy => pkg/hlsproxy}/types.go | 0 {hlsvod => pkg/hlsvod}/cache.go | 0 {hlsvod => pkg/hlsvod}/manager.go | 0 {hlsvod => pkg/hlsvod}/probe.go | 0 {hlsvod => pkg/hlsvod}/transcode.go | 0 {hlsvod => pkg/hlsvod}/types.go | 0 {hlsvod => pkg/hlsvod}/utils.go | 0 14 files changed, 4 insertions(+), 3 deletions(-) rename {hls => pkg/hls}/manager.go (100%) rename {hls => pkg/hls}/types.go (100%) rename {hlsproxy => pkg/hlsproxy}/cache.go (100%) rename {hlsproxy => pkg/hlsproxy}/manager.go (100%) rename {hlsproxy => pkg/hlsproxy}/types.go (100%) rename {hlsvod => pkg/hlsvod}/cache.go (100%) rename {hlsvod => pkg/hlsvod}/manager.go (100%) rename {hlsvod => pkg/hlsvod}/probe.go (100%) rename {hlsvod => pkg/hlsvod}/transcode.go (100%) rename {hlsvod => pkg/hlsvod}/types.go (100%) rename {hlsvod => pkg/hlsvod}/utils.go (100%) diff --git a/internal/api/hls.go b/internal/api/hls.go index 6bd4074..e304132 100644 --- a/internal/api/hls.go +++ b/internal/api/hls.go @@ -9,7 +9,7 @@ import ( "github.com/go-chi/chi" "github.com/rs/zerolog/log" - "github.com/m1k1o/go-transcode/hls" + "github.com/m1k1o/go-transcode/pkg/hls" ) var hlsManagers map[string]hls.Manager = make(map[string]hls.Manager) diff --git a/internal/api/hlsproxy.go b/internal/api/hlsproxy.go index 9752624..c5eddff 100644 --- a/internal/api/hlsproxy.go +++ b/internal/api/hlsproxy.go @@ -6,7 +6,7 @@ import ( "github.com/go-chi/chi" - "github.com/m1k1o/go-transcode/hlsproxy" + "github.com/m1k1o/go-transcode/pkg/hlsproxy" ) const hlsProxyPerfix = "/hlsproxy/" diff --git a/internal/api/hlsvod.go b/internal/api/hlsvod.go index 7f86f32..e7ff648 100644 --- a/internal/api/hlsvod.go +++ b/internal/api/hlsvod.go @@ -10,8 +10,9 @@ import ( "strings" "github.com/go-chi/chi" - "github.com/m1k1o/go-transcode/hlsvod" "github.com/rs/zerolog/log" + + "github.com/m1k1o/go-transcode/pkg/hlsvod" ) var hlsVodManagers map[string]hlsvod.Manager = make(map[string]hlsvod.Manager) diff --git a/hls/manager.go b/pkg/hls/manager.go similarity index 100% rename from hls/manager.go rename to pkg/hls/manager.go diff --git a/hls/types.go b/pkg/hls/types.go similarity index 100% rename from hls/types.go rename to pkg/hls/types.go diff --git a/hlsproxy/cache.go b/pkg/hlsproxy/cache.go similarity index 100% rename from hlsproxy/cache.go rename to pkg/hlsproxy/cache.go diff --git a/hlsproxy/manager.go b/pkg/hlsproxy/manager.go similarity index 100% rename from hlsproxy/manager.go rename to pkg/hlsproxy/manager.go diff --git a/hlsproxy/types.go b/pkg/hlsproxy/types.go similarity index 100% rename from hlsproxy/types.go rename to pkg/hlsproxy/types.go diff --git a/hlsvod/cache.go b/pkg/hlsvod/cache.go similarity index 100% rename from hlsvod/cache.go rename to pkg/hlsvod/cache.go diff --git a/hlsvod/manager.go b/pkg/hlsvod/manager.go similarity index 100% rename from hlsvod/manager.go rename to pkg/hlsvod/manager.go diff --git a/hlsvod/probe.go b/pkg/hlsvod/probe.go similarity index 100% rename from hlsvod/probe.go rename to pkg/hlsvod/probe.go diff --git a/hlsvod/transcode.go b/pkg/hlsvod/transcode.go similarity index 100% rename from hlsvod/transcode.go rename to pkg/hlsvod/transcode.go diff --git a/hlsvod/types.go b/pkg/hlsvod/types.go similarity index 100% rename from hlsvod/types.go rename to pkg/hlsvod/types.go diff --git a/hlsvod/utils.go b/pkg/hlsvod/utils.go similarity index 100% rename from hlsvod/utils.go rename to pkg/hlsvod/utils.go From e6bcb93b56e02d94bcc676b2de254600848b31a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 10 Dec 2021 18:17:51 +0100 Subject: [PATCH 03/19] separate server as internal module. --- README.md | 11 ++---- internal/config/config.go | 49 ++++--------------------- internal/main.go | 22 ++++------- internal/server/config.go | 57 +++++++++++++++++++++++++++++ internal/{http => server}/debug.go | 8 ++-- internal/{http => server}/http.go | 50 +++++++++++++++---------- internal/{http => server}/logger.go | 2 +- 7 files changed, 113 insertions(+), 86 deletions(-) create mode 100644 internal/server/config.go rename internal/{http => server}/debug.go (77%) rename internal/{http => server}/http.go (59%) rename internal/{http => server}/logger.go (99%) diff --git a/README.md b/README.md index c7e51d5..cd1e9e8 100644 --- a/README.md +++ b/README.md @@ -46,21 +46,18 @@ streams: Full configuration example: ```yaml -# allow debug outputs -debug: true - -# mount debug pprof endpoint at /debug/pprof/ -pprof: true - # bind server to IP:PORT (use :8888 for all connections) bind: localhost:8888 # serve static files from this directory (optional) static: /var/www/html -# TODO: issue #4 +# trust reverse proxies proxy: true +# mount debug pprof endpoint at /debug/pprof/ +pprof: true + # For live streaming streams: cam: rtmp://localhost/live/cam diff --git a/internal/config/config.go b/internal/config/config.go index 59f01a6..4a5c571 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -5,6 +5,8 @@ import ( "os" "path" + "github.com/m1k1o/go-transcode/internal/server" + "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -32,13 +34,7 @@ type VOD struct { } type Server struct { - PProf bool - - Cert string - Key string - Bind string - Static string - Proxy bool + Server server.Config BaseDir string `yaml:"basedir,omitempty"` Streams map[string]string `yaml:"streams"` @@ -48,34 +44,9 @@ type Server struct { HlsProxy map[string]string } -func (Server) Init(cmd *cobra.Command) error { - cmd.PersistentFlags().Bool("pprof", false, "enable pprof endpoint available at /debug/pprof") - if err := viper.BindPFlag("pprof", cmd.PersistentFlags().Lookup("pprof")); err != nil { - return err - } - - cmd.PersistentFlags().String("bind", "127.0.0.1:8080", "address/port/socket to serve neko") - if err := viper.BindPFlag("bind", cmd.PersistentFlags().Lookup("bind")); err != nil { - return err - } - - cmd.PersistentFlags().String("cert", "", "path to the SSL cert used to secure the neko server") - if err := viper.BindPFlag("cert", cmd.PersistentFlags().Lookup("cert")); err != nil { - return err - } - - cmd.PersistentFlags().String("key", "", "path to the SSL key used to secure the neko server") - if err := viper.BindPFlag("key", cmd.PersistentFlags().Lookup("key")); err != nil { - return err - } - - cmd.PersistentFlags().String("static", "", "path to neko client files to serve") - if err := viper.BindPFlag("static", cmd.PersistentFlags().Lookup("static")); err != nil { - return err - } - - cmd.PersistentFlags().Bool("proxy", false, "allow reverse proxies") - if err := viper.BindPFlag("proxy", cmd.PersistentFlags().Lookup("proxy")); err != nil { +func (s *Server) Init(cmd *cobra.Command) error { + // TODO: Scope + if err := s.Server.Init(cmd); err != nil { return err } @@ -93,13 +64,7 @@ func (Server) Init(cmd *cobra.Command) error { } func (s *Server) Set() { - s.PProf = viper.GetBool("pprof") - - s.Cert = viper.GetString("cert") - s.Key = viper.GetString("key") - s.Bind = viper.GetString("bind") - s.Static = viper.GetString("static") - s.Proxy = viper.GetBool("proxy") + s.Server.Set() s.BaseDir = viper.GetString("basedir") if s.BaseDir == "" { diff --git a/internal/main.go b/internal/main.go index 3e5ff0c..a5f617f 100644 --- a/internal/main.go +++ b/internal/main.go @@ -10,7 +10,7 @@ import ( "github.com/m1k1o/go-transcode/internal/api" "github.com/m1k1o/go-transcode/internal/config" - "github.com/m1k1o/go-transcode/internal/http" + "github.com/m1k1o/go-transcode/internal/server" ) var Service *Main @@ -24,9 +24,9 @@ func init() { type Main struct { ServerConfig *config.Server - logger zerolog.Logger - apiManager *api.ApiManagerCtx - httpManager *http.HttpManagerCtx + logger zerolog.Logger + apiManager *api.ApiManagerCtx + serverManager *server.ServerManagerCtx } func (main *Main) Preflight() { @@ -39,15 +39,9 @@ func (main *Main) Start() { main.apiManager = api.New(config) main.apiManager.Start() - main.httpManager = http.New(config) - main.httpManager.Mount(main.apiManager.Mount) - main.httpManager.Start() - - if main.ServerConfig.PProf { - pathPrefix := "/debug/pprof/" - main.httpManager.WithDebugPProf(pathPrefix) - main.logger.Info().Msgf("mounted debug pprof endpoint at %s", pathPrefix) - } + main.serverManager = server.New(&config.Server) + main.serverManager.Mount(main.apiManager.Mount) + main.serverManager.Start() main.logger.Info().Msgf("serving streams from basedir %s: %s", config.BaseDir, config.Streams) } @@ -55,7 +49,7 @@ func (main *Main) Start() { func (main *Main) Shutdown() { var err error - err = main.httpManager.Shutdown() + err = main.serverManager.Shutdown() main.logger.Err(err).Msg("http manager shutdown") err = main.apiManager.Shutdown() diff --git a/internal/server/config.go b/internal/server/config.go new file mode 100644 index 0000000..ef34706 --- /dev/null +++ b/internal/server/config.go @@ -0,0 +1,57 @@ +package server + +import ( + "github.com/rs/zerolog/log" + + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +type Config struct { + Bind string `mapstructure:"bind"` + Static string `mapstructure:"static"` + SSLCert string `mapstructure:"sslcert"` + SSLKey string `mapstructure:"sslkey"` + Proxy bool `mapstructure:"proxy"` + PProf bool `mapstructure:"pprof"` +} + +func (Config) Init(cmd *cobra.Command) error { + cmd.PersistentFlags().String("bind", "127.0.0.1:8080", "address/port/socket to serve http") + if err := viper.BindPFlag("bind", cmd.PersistentFlags().Lookup("bind")); err != nil { + return err + } + + cmd.PersistentFlags().String("static", "", "path to client files to serve") + if err := viper.BindPFlag("static", cmd.PersistentFlags().Lookup("static")); err != nil { + return err + } + + cmd.PersistentFlags().String("sslcert", "", "path to the SSL cert") + if err := viper.BindPFlag("sslcert", cmd.PersistentFlags().Lookup("sslcert")); err != nil { + return err + } + + cmd.PersistentFlags().String("sslkey", "", "path to the SSL key") + if err := viper.BindPFlag("sslkey", cmd.PersistentFlags().Lookup("sslkey")); err != nil { + return err + } + + cmd.PersistentFlags().Bool("proxy", false, "allow reverse proxies") + if err := viper.BindPFlag("proxy", cmd.PersistentFlags().Lookup("proxy")); err != nil { + return err + } + + cmd.PersistentFlags().Bool("pprof", false, "enable pprof endpoint available at /debug/pprof") + if err := viper.BindPFlag("pprof", cmd.PersistentFlags().Lookup("pprof")); err != nil { + return err + } + + return nil +} + +func (c *Config) Set() { + if err := viper.Unmarshal(c); err != nil { + log.Panic().Msg("unable to unmarshal config structure") + } +} diff --git a/internal/http/debug.go b/internal/server/debug.go similarity index 77% rename from internal/http/debug.go rename to internal/server/debug.go index 32565fd..7e474c9 100644 --- a/internal/http/debug.go +++ b/internal/server/debug.go @@ -1,4 +1,4 @@ -package http +package server import ( "net/http" @@ -7,8 +7,10 @@ import ( "github.com/go-chi/chi" ) -func (s *HttpManagerCtx) WithDebugPProf(pathPrefix string) { - s.router.Route(pathPrefix, func(r chi.Router) { +const pprofPath = "/debug/pprof/" + +func withPProf(router *chi.Mux) { + router.Route(pprofPath, func(r chi.Router) { r.Get("/", pprof.Index) r.Get("/{action}", func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/http/http.go b/internal/server/http.go similarity index 59% rename from internal/http/http.go rename to internal/server/http.go index f58a8ef..d683653 100644 --- a/internal/http/http.go +++ b/internal/server/http.go @@ -1,4 +1,4 @@ -package http +package server import ( "context" @@ -10,22 +10,27 @@ import ( "github.com/go-chi/chi/middleware" "github.com/rs/zerolog" "github.com/rs/zerolog/log" - - "github.com/m1k1o/go-transcode/internal/config" ) -type HttpManagerCtx struct { +type ServerManagerCtx struct { logger zerolog.Logger - config *config.Server + config *Config router *chi.Mux - http *http.Server + server *http.Server } -func New(config *config.Server) *HttpManagerCtx { - logger := log.With().Str("module", "http").Logger() +func New(config *Config) *ServerManagerCtx { + logger := log.With().Str("module", "server").Logger() router := chi.NewRouter() router.Use(middleware.RequestID) // Create a request ID for each request + + // get real users ip + if config.Proxy { + router.Use(middleware.RealIP) + } + + // add http logger router.Use(middleware.RequestLogger(&logformatter{logger})) router.Use(middleware.Recoverer) // Recover from panics without crashing server @@ -41,48 +46,55 @@ func New(config *config.Server) *HttpManagerCtx { }) } + // mount pprof endpoint + if config.PProf { + withPProf(router) + logger.Info().Msgf("with pprof endpoint at %s", pprofPath) + } + + // use custom 404 router.NotFound(func(w http.ResponseWriter, r *http.Request) { //nolint _, _ = w.Write([]byte("404")) }) - return &HttpManagerCtx{ + return &ServerManagerCtx{ logger: logger, config: config, router: router, - http: &http.Server{ + server: &http.Server{ Addr: config.Bind, Handler: router, }, } } -func (s *HttpManagerCtx) Start() { - if s.config.Cert != "" && s.config.Key != "" { +func (s *ServerManagerCtx) Start() { + if s.config.SSLCert != "" && s.config.SSLKey != "" { s.logger.Warn().Msg("TLS support is provided for convenience, but you should never use it in production. Use a reverse proxy (apache nginx caddy) instead!") go func() { - if err := s.http.ListenAndServeTLS(s.config.Cert, s.config.Key); err != http.ErrServerClosed { + if err := s.server.ListenAndServeTLS(s.config.SSLCert, s.config.SSLKey); err != http.ErrServerClosed { s.logger.Panic().Err(err).Msg("unable to start https server") } }() - s.logger.Info().Msgf("https listening on %s", s.http.Addr) + s.logger.Info().Msgf("https listening on %s", s.server.Addr) } else { go func() { - if err := s.http.ListenAndServe(); err != http.ErrServerClosed { + if err := s.server.ListenAndServe(); err != http.ErrServerClosed { s.logger.Panic().Err(err).Msg("unable to start http server") } }() - s.logger.Info().Msgf("http listening on %s", s.http.Addr) + s.logger.Info().Msgf("http listening on %s", s.server.Addr) } } -func (s *HttpManagerCtx) Shutdown() error { +func (s *ServerManagerCtx) Shutdown() error { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - return s.http.Shutdown(ctx) + return s.server.Shutdown(ctx) } -func (s *HttpManagerCtx) Mount(fn func(r *chi.Mux)) { +func (s *ServerManagerCtx) Mount(fn func(r *chi.Mux)) { fn(s.router) } diff --git a/internal/http/logger.go b/internal/server/logger.go similarity index 99% rename from internal/http/logger.go rename to internal/server/logger.go index acc20ef..1b5d5ac 100644 --- a/internal/http/logger.go +++ b/internal/server/logger.go @@ -1,4 +1,4 @@ -package http +package server import ( "fmt" From f106dc7af0e6b703e5eb33af72779dca0f6bad5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 10 Dec 2021 18:37:41 +0100 Subject: [PATCH 04/19] move docker to own folder. --- Dockerfile => .docker/Dockerfile | 0 .../Dockerfile.nvidia | 0 .docker/README.md | 54 ++++++++++++++++++ .docker/build | 5 ++ .../docker-compose.yaml | 10 ++-- .gitlab-ci.yml | 4 +- README.md | 55 +------------------ dev/build | 4 -- dev/exec | 4 -- dev/go | 8 --- dev/start | 10 ---- 11 files changed, 67 insertions(+), 87 deletions(-) rename Dockerfile => .docker/Dockerfile (100%) rename Dockerfile.nvidia => .docker/Dockerfile.nvidia (100%) create mode 100644 .docker/README.md create mode 100755 .docker/build rename docker-compose.yaml => .docker/docker-compose.yaml (52%) delete mode 100755 dev/build delete mode 100755 dev/exec delete mode 100755 dev/go delete mode 100755 dev/start diff --git a/Dockerfile b/.docker/Dockerfile similarity index 100% rename from Dockerfile rename to .docker/Dockerfile diff --git a/Dockerfile.nvidia b/.docker/Dockerfile.nvidia similarity index 100% rename from Dockerfile.nvidia rename to .docker/Dockerfile.nvidia diff --git a/.docker/README.md b/.docker/README.md new file mode 100644 index 0000000..6c5cdba --- /dev/null +++ b/.docker/README.md @@ -0,0 +1,54 @@ +# Docker + +## Build + +```sh +./build go-transcode:latest +``` + +## Run + +```sh +docker run --rm -d \ + --name="go-transcode" \ + -p "8080:8080" \ + -v "${PWD}/config.yaml:/app/config.yaml" go-transcode:latest +``` + +# Nvidia GPU support + +You will need to have [nvidia-docker](https://github.com/NVIDIA/nvidia-docker) installed. + +## Build + +First, you need to build previous container. Then, build Nvidia container. + +```sh +docker build --build-arg "TRANSCODE_IMAGE=go-transcode:latest" -t go-transcode-nvidia:latest -f Dockerfile.nvidia .. +``` + +## Run + +```sh +docker run --rm -d \ + --gpus=all \ + --name="go-transcode-nvidia" \ + -p "8080:8080" \ + -v "${PWD}/config.yaml:/app/config.yaml" go-transcode-nvidia:latest +``` + +## Supported inputs + +Input codec will be automatically determined from given stream. Please check your graphic card's supported codec and maximum concurrent sessions [here](https://developer.nvidia.com/video-encode-decode-gpu-support-matrix). + +| Codec | CUVID | Codec Name | +| ---------- | ----------- | ----------------------------------------- | +| h264 | h264_cuvid | H.264 / AVC / MPEG-4 AVC / MPEG-4 part 10 | +| hevc | hevc_cuvid | H.265 / HEVC | +| mjpeg | mjpeg_cuvid | Motion JPEG | +| mpeg1video | mpeg1_cuvid | MPEG-1 video | +| mpeg2video | mpeg2_cuvid | MPEG-2 video | +| mpeg4 | mpeg4_cuvid | MPEG-4 part 2 | +| vc1 | vc1_cuvid | SMPTE VC-1 | +| vp8 | vp8_cuvid | On2 VP8 | +| vp9 | vp9_cuvid | Google VP9 | diff --git a/.docker/build b/.docker/build new file mode 100755 index 0000000..055c8f4 --- /dev/null +++ b/.docker/build @@ -0,0 +1,5 @@ +#!/bin/sh +cd "$(dirname "$0")" + +BUILD_TAG="${1:-transcode_server_img}" +docker build -t "${BUILD_TAG}" -f Dockerfile .. diff --git a/docker-compose.yaml b/.docker/docker-compose.yaml similarity index 52% rename from docker-compose.yaml rename to .docker/docker-compose.yaml index 68bc717..4aa78f4 100644 --- a/docker-compose.yaml +++ b/.docker/docker-compose.yaml @@ -2,13 +2,13 @@ version: "3" services: go-transcode: - build: . - container_name: go-transcode - restart: unless-stopped + build: "./" + container_name: "go-transcode" + restart: "unless-stopped" ports: - "8080:8080" volumes: - - ./config.yaml:/app/config.yaml + - "./config.yaml:/app/config.yaml" # Mount your VOD here, for hlsvod module # - ./media:/app/media - command: serve -d + command: "serve -d" diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 36a1378..ee26363 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -14,7 +14,7 @@ build: variables: BASE_IMAGE: $CI_REGISTRY_IMAGE:latest script: - - docker build -t $BASE_IMAGE . + - docker build -t $BASE_IMAGE -f ./docker/Dockerfile . - docker push $BASE_IMAGE only: - master @@ -25,7 +25,7 @@ build nivdia: GO_TRANSCODE_IMAGE: $CI_REGISTRY_IMAGE:latest IMAGE_TAG: $CI_REGISTRY_IMAGE/nivdia:latest script: - - docker build --build-arg "GO_TRANSCODE_IMAGE=${GO_TRANSCODE_IMAGE}" --build-arg "BASE_IMAGE=${NVIDIA_FFMPEG_IMAGE}" -t $IMAGE_TAG -f Dockerfile.nvidia . + - docker build --build-arg "GO_TRANSCODE_IMAGE=${GO_TRANSCODE_IMAGE}" --build-arg "BASE_IMAGE=${NVIDIA_FFMPEG_IMAGE}" -t $IMAGE_TAG -f ./docker/Dockerfile.nvidia . - docker push $IMAGE_TAG only: - master diff --git a/README.md b/README.md index cd1e9e8..6ff63ea 100644 --- a/README.md +++ b/README.md @@ -133,60 +133,7 @@ $ ./go-transcode serve First line is warning and "serving streams" line says empty list (`map[]`) because we don't have config.yaml so there no stream configured. Make your config.yaml and try again. -## Docker - -### Build - -```sh -docker build -t go-transcode:latest . -``` - -### Run - -```sh -docker run --rm -d \ - --name="go-transcode" \ - -p "8080:8080" \ - -v "${PWD}/config.yaml:/app/config.yaml" go-transcode:latest -``` - -## Nvidia GPU support (docker) - -You will need to have [nvidia-docker](https://github.com/NVIDIA/nvidia-docker) installed. - -### Build - -First, you need to build previous container. Then, build Nvidia container. - -```sh -docker build --build-arg "TRANSCODE_IMAGE=go-transcode:latest" -t go-transcode-nvidia:latest -f Dockerfile.nvidia . -``` - -### Run - -```sh -docker run --rm -d \ - --gpus=all \ - --name="go-transcode-nvidia" \ - -p "8080:8080" \ - -v "${PWD}/config.yaml:/app/config.yaml" go-transcode-nvidia:latest -``` - -### Supported inputs - -Input codec will be automatically determined from given stream. Please check your graphic card's supported codec and maximum concurrent sessions [here](https://developer.nvidia.com/video-encode-decode-gpu-support-matrix). - -| Codec | CUVID | Codec Name | -| ---------- | ----------- | ----------------------------------------- | -| h264 | h264_cuvid | H.264 / AVC / MPEG-4 AVC / MPEG-4 part 10 | -| hevc | hevc_cuvid | H.265 / HEVC | -| mjpeg | mjpeg_cuvid | Motion JPEG | -| mpeg1video | mpeg1_cuvid | MPEG-1 video | -| mpeg2video | mpeg2_cuvid | MPEG-2 video | -| mpeg4 | mpeg4_cuvid | MPEG-4 part 2 | -| vc1 | vc1_cuvid | SMPTE VC-1 | -| vp8 | vp8_cuvid | On2 VP8 | -| vp9 | vp9_cuvid | Google VP9 | +See [.docker](.docker) folder for docker support. ## Alternatives diff --git a/dev/build b/dev/build deleted file mode 100755 index a9a9aca..0000000 --- a/dev/build +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/sh -cd "$(dirname "$0")" - -docker build -t transcode_server_img .. diff --git a/dev/exec b/dev/exec deleted file mode 100755 index 29ef203..0000000 --- a/dev/exec +++ /dev/null @@ -1,4 +0,0 @@ -#!/bin/sh -cd "$(dirname "$0")" - -docker exec -it transcode_server_dev /bin/bash; diff --git a/dev/go b/dev/go deleted file mode 100755 index da31564..0000000 --- a/dev/go +++ /dev/null @@ -1,8 +0,0 @@ -#!/bin/sh -cd "$(dirname "$0")" - -docker run -it --rm \ - -v "${PWD}/../:/app" \ - --entrypoint="go" \ - --volume "${PWD}/../:/app" \ - transcode_server_img "$@"; diff --git a/dev/start b/dev/start deleted file mode 100755 index 65d4ac1..0000000 --- a/dev/start +++ /dev/null @@ -1,10 +0,0 @@ -#!/bin/sh -cd "$(dirname "$0")" - -docker run --rm -it \ - --name="transcode_server_dev" \ - -p "3005:8080" \ - -v "${PWD}/../:/app" \ - --entrypoint="/bin/bash" \ - --workdir="/app" \ - transcode_server_img -c 'go build && ./go-transcode serve --bind :8080'; From fcb5315867dd57f9d92dbcf7133800af6e135c61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 10 Dec 2021 18:39:06 +0100 Subject: [PATCH 05/19] update main.go. --- main.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/main.go b/main.go index b7ab534..43e6b46 100644 --- a/main.go +++ b/main.go @@ -1,13 +1,9 @@ package main import ( - "github.com/rs/zerolog/log" - "github.com/m1k1o/go-transcode/cmd" ) func main() { - if err := cmd.Execute(); err != nil { - log.Panic().Err(err).Msg("failed to execute command") - } + _ = cmd.Execute() } From 7e49056c2af432a0b772505efb971dff9d2aa01f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 10 Dec 2021 18:43:19 +0100 Subject: [PATCH 06/19] updade readme folders architecture. --- README.md | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index 6ff63ea..84cc234 100644 --- a/README.md +++ b/README.md @@ -150,19 +150,17 @@ Join us in the [Matrix space](https://matrix.to/#/#go-transcode:proxychat.net) ( The source code is in the following files/folders: -- `cmd/` and `main.go`: source for the command-line interface -- `hls/`: process runner for HLS transcoding -- `hlsvod/`: process runner for HLS VOD transcoding (for static files) -- `internal/`: actual source code logic +- `.docker`: for docker support. +- `cmd/` and `main.go`: source for the command-line interface. +- `internal/`: internal source code modules. +- `pkg/`: external modules as lib. *TODO: document different modules/packages and dependencies* Other files/folders in the repositories are: -- `data/`: files used/served by go-transcode -- `dev/`: some docker helper scripts -- `profiles/`: the ffmpeg profiles for transcoding -- `tests/`: some tests for the project -- `Dockerfile`, `Dockerfile.nvidia` and `docker-compose.yaml`: for the docker lovers -- `god.mod` and `go.sum`: golang dependencies/modules tracking -- `LICENSE`: licensing information (Apache 2.0) +- `docs/`: documentation and usage examples. +- `profiles/`: the ffmpeg profiles for transcoding. +- `tests/`: some tests for the project. +- `god.mod` and `go.sum`: golang dependencies/modules tracking. +- `LICENSE`: licensing information (Apache 2.0). From f8501d99bf7b7297df694c539a13518280cbb23d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 10 Dec 2021 19:14:46 +0100 Subject: [PATCH 07/19] rename hls to hlslive. --- internal/api/{hls.go => hlslive.go} | 6 +++--- pkg/{hls => hlslive}/manager.go | 2 +- pkg/{hls => hlslive}/types.go | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) rename internal/api/{hls.go => hlslive.go} (92%) rename pkg/{hls => hlslive}/manager.go (99%) rename pkg/{hls => hlslive}/types.go (94%) diff --git a/internal/api/hls.go b/internal/api/hlslive.go similarity index 92% rename from internal/api/hls.go rename to internal/api/hlslive.go index e304132..418bc62 100644 --- a/internal/api/hls.go +++ b/internal/api/hlslive.go @@ -9,10 +9,10 @@ import ( "github.com/go-chi/chi" "github.com/rs/zerolog/log" - "github.com/m1k1o/go-transcode/pkg/hls" + "github.com/m1k1o/go-transcode/pkg/hlslive" ) -var hlsManagers map[string]hls.Manager = make(map[string]hls.Manager) +var hlsManagers map[string]hlslive.Manager = make(map[string]hlslive.Manager) //go:embed play.html var playHTML string @@ -49,7 +49,7 @@ func (a *ApiManagerCtx) HLS(r chi.Router) { manager, ok := hlsManagers[ID] if !ok { // create new manager - manager = hls.New(func() *exec.Cmd { + manager = hlslive.New(func() *exec.Cmd { // get transcode cmd cmd, err := a.transcodeStart(profilePath, input) if err != nil { diff --git a/pkg/hls/manager.go b/pkg/hlslive/manager.go similarity index 99% rename from pkg/hls/manager.go rename to pkg/hlslive/manager.go index 4728624..c0fc2b1 100644 --- a/pkg/hls/manager.go +++ b/pkg/hlslive/manager.go @@ -1,4 +1,4 @@ -package hls +package hlslive import ( "errors" diff --git a/pkg/hls/types.go b/pkg/hlslive/types.go similarity index 94% rename from pkg/hls/types.go rename to pkg/hlslive/types.go index eb6c94f..b6f6a9c 100644 --- a/pkg/hls/types.go +++ b/pkg/hlslive/types.go @@ -1,4 +1,4 @@ -package hls +package hlslive import "net/http" From fa9e363d6e8116e2ffd05dec4d8710d5e469ff74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 10 Dec 2021 19:27:36 +0100 Subject: [PATCH 08/19] hlslive add config. --- internal/api/hlslive.go | 2 +- pkg/hlslive/manager.go | 38 ++++++++++++++++++-------------------- pkg/hlslive/types.go | 13 ++++++++++++- 3 files changed, 31 insertions(+), 22 deletions(-) diff --git a/internal/api/hlslive.go b/internal/api/hlslive.go index 418bc62..221ad23 100644 --- a/internal/api/hlslive.go +++ b/internal/api/hlslive.go @@ -57,7 +57,7 @@ func (a *ApiManagerCtx) HLS(r chi.Router) { } return cmd - }) + }, nil) hlsManagers[ID] = manager } diff --git a/pkg/hlslive/manager.go b/pkg/hlslive/manager.go index c0fc2b1..17a3901 100644 --- a/pkg/hlslive/manager.go +++ b/pkg/hlslive/manager.go @@ -17,26 +17,12 @@ import ( "github.com/m1k1o/go-transcode/internal/utils" ) -// how often should be cleanup called -const cleanupPeriod = 4 * time.Second - -// timeout for first playlist, when it waits for new data -const playlistTimeout = 60 * time.Second - -// minimum segments available to consider stream as active -const hlsMinimumSegments = 2 - -// how long must be active stream idle to be considered as dead -const activeIdleTimeout = 12 * time.Second - -// how long must be iactive stream idle to be considered as dead -const inactiveIdleTimeout = 24 * time.Second - type ManagerCtx struct { logger zerolog.Logger mu sync.Mutex cmdFactory func() *exec.Cmd active bool + config *Config events struct { onStart func() onCmdLog func(message string) @@ -54,10 +40,22 @@ type ManagerCtx struct { shutdown chan interface{} } -func New(cmdFactory func() *exec.Cmd) *ManagerCtx { +func New(cmdFactory func() *exec.Cmd, config *Config) *ManagerCtx { + // use default config values + if config == nil { + config = &Config{ + CleanupPeriod: 4 * time.Second, + PlaylistTimeout: 60 * time.Second, + HlsMinimumSegments: 2, + ActiveIdleTimeout: 12 * time.Second, + InactiveIdleTimeout: 24 * time.Second, + } + } + return &ManagerCtx{ logger: log.With().Str("module", "hls").Str("submodule", "manager").Logger(), cmdFactory: cmdFactory, + config: config, playlistLoad: make(chan string), shutdown: make(chan interface{}), @@ -119,7 +117,7 @@ func (m *ManagerCtx) Start() error { Str("playlist", m.playlist). Msg("received playlist") - if m.sequence == hlsMinimumSegments { + if m.sequence == m.config.HlsMinimumSegments { m.active = true m.playlistLoad <- m.playlist close(m.playlistLoad) @@ -136,7 +134,7 @@ func (m *ManagerCtx) Start() error { // periodic cleanup go func() { - ticker := time.NewTicker(cleanupPeriod) + ticker := time.NewTicker(m.config.CleanupPeriod) defer ticker.Stop() for { @@ -217,7 +215,7 @@ func (m *ManagerCtx) Stop() { func (m *ManagerCtx) Cleanup() { m.mu.Lock() diff := time.Since(m.lastRequest) - stop := m.active && diff > activeIdleTimeout || !m.active && diff > inactiveIdleTimeout + stop := m.active && diff > m.config.ActiveIdleTimeout || !m.active && diff > m.config.InactiveIdleTimeout m.mu.Unlock() m.logger.Debug(). @@ -256,7 +254,7 @@ func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) { m.logger.Warn().Msg("playlist load failed because of shutdown") http.Error(w, "500 playlist not available", http.StatusInternalServerError) return - case <-time.After(playlistTimeout): + case <-time.After(m.config.PlaylistTimeout): m.logger.Warn().Msg("playlist load channel timeouted") http.Error(w, "504 playlist timeout", http.StatusGatewayTimeout) return diff --git a/pkg/hlslive/types.go b/pkg/hlslive/types.go index b6f6a9c..c3dfae5 100644 --- a/pkg/hlslive/types.go +++ b/pkg/hlslive/types.go @@ -1,6 +1,17 @@ package hlslive -import "net/http" +import ( + "net/http" + "time" +) + +type Config struct { + CleanupPeriod time.Duration // how often should be cleanup called + PlaylistTimeout time.Duration // timeout for first playlist, when it waits for new data + HlsMinimumSegments int // minimum segments available to consider stream as active + ActiveIdleTimeout time.Duration // how long must be active stream idle to be considered as dead + InactiveIdleTimeout time.Duration // how long must be iactive stream idle to be considered as dead +} type Manager interface { Start() error From 931ee28bf305621b8fc0f0b391c2514c4ee5ef01 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 10 Dec 2021 19:29:32 +0100 Subject: [PATCH 09/19] fix content type for segments. --- pkg/hlslive/manager.go | 2 +- pkg/hlsvod/manager.go | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/hlslive/manager.go b/pkg/hlslive/manager.go index 17a3901..1d70edf 100644 --- a/pkg/hlslive/manager.go +++ b/pkg/hlslive/manager.go @@ -280,7 +280,7 @@ func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { m.lastRequest = time.Now() m.mu.Unlock() - w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") + w.Header().Set("Content-Type", "video/MP2T") w.Header().Set("Cache-Control", "no-cache") http.ServeFile(w, r, path) } diff --git a/pkg/hlsvod/manager.go b/pkg/hlsvod/manager.go index 2bb8913..b88f5a4 100644 --- a/pkg/hlsvod/manager.go +++ b/pkg/hlsvod/manager.go @@ -577,7 +577,6 @@ func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { } // return existing segment - w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") - w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Content-Type", "video/MP2T") http.ServeFile(w, r, segmentPath) } From f7e49400c1136192ecb7964e29f20cf7f73a726d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 10 Dec 2021 19:32:56 +0100 Subject: [PATCH 10/19] hlsproxy add config. --- internal/api/hlsproxy.go | 2 +- pkg/hlsproxy/cache.go | 2 +- pkg/hlsproxy/manager.go | 26 ++++++++++++++------------ pkg/hlsproxy/types.go | 11 ++++++++++- 4 files changed, 26 insertions(+), 15 deletions(-) diff --git a/internal/api/hlsproxy.go b/internal/api/hlsproxy.go index c5eddff..b2f07b8 100644 --- a/internal/api/hlsproxy.go +++ b/internal/api/hlsproxy.go @@ -27,7 +27,7 @@ func (a *ApiManagerCtx) HLSProxy(r chi.Router) { manager, ok := hlsProxyManagers[ID] if !ok { // create new manager - manager = hlsproxy.New(baseUrl, hlsProxyPerfix+ID+"/") + manager = hlsproxy.New(baseUrl, hlsProxyPerfix+ID+"/", nil) hlsProxyManagers[ID] = manager } diff --git a/pkg/hlsproxy/cache.go b/pkg/hlsproxy/cache.go index 4f8a60d..9ca21c8 100644 --- a/pkg/hlsproxy/cache.go +++ b/pkg/hlsproxy/cache.go @@ -90,7 +90,7 @@ func (m *ManagerCtx) cleanupStart() { go func() { m.logger.Debug().Msg("cleanup started") - ticker := time.NewTicker(cacheCleanupPeriod) + ticker := time.NewTicker(m.config.CacheCleanupPeriod) defer ticker.Stop() for { diff --git a/pkg/hlsproxy/manager.go b/pkg/hlsproxy/manager.go index a01b1e9..97beeeb 100644 --- a/pkg/hlsproxy/manager.go +++ b/pkg/hlsproxy/manager.go @@ -14,19 +14,11 @@ import ( "github.com/rs/zerolog/log" ) -// how often should be cache cleanup called -const cacheCleanupPeriod = 4 * time.Second - -// how long should be segment kept in memory -const segmentExpiration = 60 * time.Second - -// how long should be playlist kept in memory -const playlistExpiration = 1 * time.Second - type ManagerCtx struct { logger zerolog.Logger baseUrl string prefix string + config *Config cache map[string]*utils.Cache cacheMu sync.RWMutex @@ -36,7 +28,16 @@ type ManagerCtx struct { shutdown chan struct{} } -func New(baseUrl string, prefix string) *ManagerCtx { +func New(baseUrl string, prefix string, config *Config) *ManagerCtx { + // use default config values + if config == nil { + config = &Config{ + CacheCleanupPeriod: 4 * time.Second, + SegmentExpiration: 60 * time.Second, + PlaylistExpiration: 1 * time.Second, + } + } + // ensure it ends with slash baseUrl = strings.TrimSuffix(baseUrl, "/") baseUrl += "/" @@ -45,6 +46,7 @@ func New(baseUrl string, prefix string) *ManagerCtx { logger: log.With().Str("module", "hlsproxy").Str("submodule", "manager").Logger(), baseUrl: baseUrl, prefix: prefix, + config: config, cache: map[string]*utils.Cache{}, } } @@ -84,7 +86,7 @@ func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) { var re = regexp.MustCompile(`(?m:^(https?\:\/\/[^\/]+)?\/)`) text := re.ReplaceAllString(string(buf), m.prefix) - cache = m.saveToCache(url, strings.NewReader(text), playlistExpiration) + cache = m.saveToCache(url, strings.NewReader(text), m.config.PlaylistExpiration) } w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") @@ -113,7 +115,7 @@ func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { return } - cache = m.saveToCache(url, resp.Body, segmentExpiration) + cache = m.saveToCache(url, resp.Body, m.config.SegmentExpiration) } w.Header().Set("Content-Type", "video/MP2T") diff --git a/pkg/hlsproxy/types.go b/pkg/hlsproxy/types.go index 79bec46..3936bba 100644 --- a/pkg/hlsproxy/types.go +++ b/pkg/hlsproxy/types.go @@ -1,6 +1,15 @@ package hlsproxy -import "net/http" +import ( + "net/http" + "time" +) + +type Config struct { + CacheCleanupPeriod time.Duration // how often should be cache cleanup called + SegmentExpiration time.Duration // how long should be segment kept in memory + PlaylistExpiration time.Duration // how long should be playlist kept in memory +} type Manager interface { Shutdown() From fc3b6fb72e8c1447e0ad60d311eb29c7946d0723 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 10 Dec 2021 21:00:45 +0100 Subject: [PATCH 11/19] use ctx for transcode start. --- internal/api/hlslive.go | 5 +++-- internal/api/http.go | 12 +++++++----- internal/api/router.go | 11 ++++++----- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/internal/api/hlslive.go b/internal/api/hlslive.go index 221ad23..45159f1 100644 --- a/internal/api/hlslive.go +++ b/internal/api/hlslive.go @@ -1,6 +1,7 @@ package api import ( + "context" _ "embed" "fmt" "net/http" @@ -37,7 +38,7 @@ func (a *ApiManagerCtx) HLS(r chi.Router) { } // check if profile exists - profilePath, err := a.ProfilePath("hls", profile) + profilePath, err := a.profilePath("hls", profile) if err != nil { logger.Warn().Err(err).Msg("profile path could not be found") http.Error(w, "404 profile not found", http.StatusNotFound) @@ -51,7 +52,7 @@ func (a *ApiManagerCtx) HLS(r chi.Router) { // create new manager manager = hlslive.New(func() *exec.Cmd { // get transcode cmd - cmd, err := a.transcodeStart(profilePath, input) + cmd, err := a.transcodeStart(context.Background(), profilePath, input) if err != nil { logger.Error().Err(err).Msg("transcode could not be started") } diff --git a/internal/api/http.go b/internal/api/http.go index f190dab..31fb3be 100644 --- a/internal/api/http.go +++ b/internal/api/http.go @@ -21,7 +21,7 @@ func (a *ApiManagerCtx) Http(r chi.Router) { // dummy input for testing purposes file := a.config.AbsPath("profiles", "http-test.sh") - cmd := exec.Command(file) + cmd := exec.CommandContext(r.Context(), file) logger.Info().Msg("command startred") read, write := io.Pipe() @@ -38,6 +38,7 @@ func (a *ApiManagerCtx) Http(r chi.Router) { go func() { _ = cmd.Run() }() + _, _ = io.Copy(w, read) }) @@ -58,14 +59,14 @@ func (a *ApiManagerCtx) Http(r chi.Router) { } // check if profile exists - profilePath, err := a.ProfilePath("hls", profile) + profilePath, err := a.profilePath("hls", profile) if err != nil { logger.Warn().Err(err).Msg("profile path could not be found") http.Error(w, "404 profile not found", http.StatusNotFound) return } - cmd, err := a.transcodeStart(profilePath, input) + cmd, err := a.transcodeStart(r.Context(), profilePath, input) if err != nil { logger.Warn().Err(err).Msg("transcode could not be started") http.Error(w, "500 not available", http.StatusInternalServerError) @@ -89,6 +90,7 @@ func (a *ApiManagerCtx) Http(r chi.Router) { go func() { _ = cmd.Run() }() + _, _ = io.Copy(w, read) }) @@ -110,14 +112,14 @@ func (a *ApiManagerCtx) Http(r chi.Router) { } // check if profile exists - profilePath, err := a.ProfilePath("hls", profile) + profilePath, err := a.profilePath("hls", profile) if err != nil { logger.Warn().Err(err).Msg("profile path could not be found") http.Error(w, "404 profile not found", http.StatusNotFound) return } - cmd, err := a.transcodeStart(profilePath, input) + cmd, err := a.transcodeStart(r.Context(), profilePath, input) if err != nil { logger.Warn().Err(err).Msg("transcode could not be started") http.Error(w, "500 not available", http.StatusInternalServerError) diff --git a/internal/api/router.go b/internal/api/router.go index aad132f..bdff634 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -1,6 +1,7 @@ package api import ( + "context" "fmt" "net/http" "os" @@ -68,7 +69,7 @@ func (a *ApiManagerCtx) Mount(r *chi.Mux) { r.Group(a.Http) } -func (a *ApiManagerCtx) ProfilePath(folder string, profile string) (string, error) { +func (a *ApiManagerCtx) profilePath(folder string, profile string) (string, error) { // [profiles]/hls,http/[profile].sh // [profiles] defaults to [basedir]/profiles @@ -77,19 +78,19 @@ func (a *ApiManagerCtx) ProfilePath(folder string, profile string) (string, erro } profilePath := path.Join(a.config.Profiles, folder, fmt.Sprintf("%s.sh", profile)) - if _, err := os.Stat(profilePath); os.IsNotExist(err) { + if _, err := os.Stat(profilePath); err != nil { return "", err } + return profilePath, nil } -// Call ProfilePath before -func (a *ApiManagerCtx) transcodeStart(profilePath string, input string) (*exec.Cmd, error) { +func (a *ApiManagerCtx) transcodeStart(ctx context.Context, profilePath string, input string) (*exec.Cmd, error) { url, ok := a.config.Streams[input] if !ok { return nil, fmt.Errorf("stream not found") } log.Info().Str("profilePath", profilePath).Str("url", url).Msg("command startred") - return exec.Command(profilePath, url), nil + return exec.CommandContext(ctx, profilePath, url), nil } From ca9c060cffc9b4ae333133375eaa5c8ddf966cdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 24 Dec 2021 00:17:21 +0100 Subject: [PATCH 12/19] fix config in pkg. --- internal/api/hlsvod.go | 4 ++-- pkg/hlslive/manager.go | 15 ++------------ pkg/hlslive/types.go | 19 ++++++++++++++++++ pkg/hlsproxy/manager.go | 14 ++------------ pkg/hlsproxy/types.go | 13 +++++++++++++ pkg/hlsvod/manager.go | 36 ++++++++++------------------------ pkg/hlsvod/types.go | 43 ++++++++++++++++++++++++++++++++++++++--- 7 files changed, 88 insertions(+), 56 deletions(-) diff --git a/internal/api/hlsvod.go b/internal/api/hlsvod.go index e7ff648..9fa9eda 100644 --- a/internal/api/hlsvod.go +++ b/internal/api/hlsvod.go @@ -41,7 +41,7 @@ func (a *ApiManagerCtx) HlsVod(r chi.Router) { // serve master profile if hlsResource == "index.m3u8" { - data, err := hlsvod.New(hlsvod.Config{ + data, err := hlsvod.New(&hlsvod.Config{ MediaPath: vodMediaPath, VideoKeyframes: a.config.Vod.VideoKeyframes, @@ -120,7 +120,7 @@ func (a *ApiManagerCtx) HlsVod(r chi.Router) { } // create new manager - manager = hlsvod.New(hlsvod.Config{ + manager = hlsvod.New(&hlsvod.Config{ MediaPath: vodMediaPath, TranscodeDir: transcodeDir, SegmentPrefix: profileID, diff --git a/pkg/hlslive/manager.go b/pkg/hlslive/manager.go index 1d70edf..f244043 100644 --- a/pkg/hlslive/manager.go +++ b/pkg/hlslive/manager.go @@ -19,10 +19,10 @@ import ( type ManagerCtx struct { logger zerolog.Logger + config Config mu sync.Mutex cmdFactory func() *exec.Cmd active bool - config *Config events struct { onStart func() onCmdLog func(message string) @@ -41,21 +41,10 @@ type ManagerCtx struct { } func New(cmdFactory func() *exec.Cmd, config *Config) *ManagerCtx { - // use default config values - if config == nil { - config = &Config{ - CleanupPeriod: 4 * time.Second, - PlaylistTimeout: 60 * time.Second, - HlsMinimumSegments: 2, - ActiveIdleTimeout: 12 * time.Second, - InactiveIdleTimeout: 24 * time.Second, - } - } - return &ManagerCtx{ logger: log.With().Str("module", "hls").Str("submodule", "manager").Logger(), cmdFactory: cmdFactory, - config: config, + config: config.withDefaultValues(), playlistLoad: make(chan string), shutdown: make(chan interface{}), diff --git a/pkg/hlslive/types.go b/pkg/hlslive/types.go index c3dfae5..8002155 100644 --- a/pkg/hlslive/types.go +++ b/pkg/hlslive/types.go @@ -13,6 +13,25 @@ type Config struct { InactiveIdleTimeout time.Duration // how long must be iactive stream idle to be considered as dead } +func (c Config) withDefaultValues() Config { + if c.CleanupPeriod == 0 { + c.CleanupPeriod = 4 * time.Second + } + if c.PlaylistTimeout == 0 { + c.PlaylistTimeout = 60 * time.Second + } + if c.HlsMinimumSegments == 0 { + c.HlsMinimumSegments = 2 + } + if c.ActiveIdleTimeout == 0 { + c.ActiveIdleTimeout = 12 * time.Second + } + if c.InactiveIdleTimeout == 0 { + c.InactiveIdleTimeout = 24 * time.Second + } + return c +} + type Manager interface { Start() error Stop() diff --git a/pkg/hlsproxy/manager.go b/pkg/hlsproxy/manager.go index 97beeeb..9db7c56 100644 --- a/pkg/hlsproxy/manager.go +++ b/pkg/hlsproxy/manager.go @@ -6,7 +6,6 @@ import ( "regexp" "strings" "sync" - "time" "github.com/m1k1o/go-transcode/internal/utils" @@ -18,7 +17,7 @@ type ManagerCtx struct { logger zerolog.Logger baseUrl string prefix string - config *Config + config Config cache map[string]*utils.Cache cacheMu sync.RWMutex @@ -29,15 +28,6 @@ type ManagerCtx struct { } func New(baseUrl string, prefix string, config *Config) *ManagerCtx { - // use default config values - if config == nil { - config = &Config{ - CacheCleanupPeriod: 4 * time.Second, - SegmentExpiration: 60 * time.Second, - PlaylistExpiration: 1 * time.Second, - } - } - // ensure it ends with slash baseUrl = strings.TrimSuffix(baseUrl, "/") baseUrl += "/" @@ -46,7 +36,7 @@ func New(baseUrl string, prefix string, config *Config) *ManagerCtx { logger: log.With().Str("module", "hlsproxy").Str("submodule", "manager").Logger(), baseUrl: baseUrl, prefix: prefix, - config: config, + config: config.withDefaultValues(), cache: map[string]*utils.Cache{}, } } diff --git a/pkg/hlsproxy/types.go b/pkg/hlsproxy/types.go index 3936bba..c21a415 100644 --- a/pkg/hlsproxy/types.go +++ b/pkg/hlsproxy/types.go @@ -11,6 +11,19 @@ type Config struct { PlaylistExpiration time.Duration // how long should be playlist kept in memory } +func (c Config) withDefaultValues() Config { + if c.CacheCleanupPeriod == 0 { + c.CacheCleanupPeriod = 4 * time.Second + } + if c.SegmentExpiration == 0 { + c.SegmentExpiration = 60 * time.Second + } + if c.PlaylistExpiration == 0 { + c.PlaylistExpiration = 1 * time.Second + } + return c +} + type Manager interface { Shutdown() diff --git a/pkg/hlsvod/manager.go b/pkg/hlsvod/manager.go index b88f5a4..3e1b987 100644 --- a/pkg/hlsvod/manager.go +++ b/pkg/hlsvod/manager.go @@ -18,21 +18,10 @@ import ( "github.com/rs/zerolog/log" ) -// how long can it take for transcode to be ready -const readyTimeout = 80 * time.Second - -// how long can it take for transcode to return first data -const transcodeTimeout = 10 * time.Second - type ManagerCtx struct { logger zerolog.Logger config Config - segmentLength float64 - segmentOffset float64 - segmentBufferMin int // minimum segments available after playing head - segmentBufferMax int // maximum segments to be transcoded at once - ready bool readyMu sync.RWMutex readyChan chan struct{} @@ -51,17 +40,12 @@ type ManagerCtx struct { cancel context.CancelFunc } -func New(config Config) *ManagerCtx { +func New(config *Config) *ManagerCtx { ctx, cancel := context.WithCancel(context.Background()) + return &ManagerCtx{ logger: log.With().Str("module", "hlsvod").Str("submodule", "manager").Logger(), - config: config, - - segmentLength: 3.50, - segmentOffset: 1.25, - segmentBufferMin: 3, - segmentBufferMax: 5, - + config: config.withDefaultValues(), ctx: ctx, cancel: cancel, } @@ -121,7 +105,7 @@ func (m *ManagerCtx) httpEnsureReady(w http.ResponseWriter) bool { m.logger.Warn().Msg("manager load failed because of shutdown") http.Error(w, "500 manager not available", http.StatusInternalServerError) return false - case <-time.After(readyTimeout): + case <-time.After(m.config.ReadyTimeout): m.logger.Warn().Msg("manager load timeouted") http.Error(w, "504 manager timeout", http.StatusGatewayTimeout) return false @@ -228,7 +212,7 @@ func (m *ManagerCtx) getPlaylist() string { "#EXT-X-VERSION:4", "#EXT-X-PLAYLIST-TYPE:VOD", "#EXT-X-MEDIA-SEQUENCE:0", - fmt.Sprintf("#EXT-X-TARGETDURATION:%.2f", m.segmentLength+m.segmentOffset), + fmt.Sprintf("#EXT-X-TARGETDURATION:%.2f", m.config.SegmentLength+m.config.SegmentOffset), } // playlist segments @@ -255,7 +239,7 @@ func (m *ManagerCtx) initialize() { } // generate breakpoints from keyframes - m.breakpoints = convertToSegments(keyframes, m.metadata.Duration, m.segmentLength, m.segmentOffset) + m.breakpoints = convertToSegments(keyframes, m.metadata.Duration, m.config.SegmentLength, m.config.SegmentOffset) // generate playlist m.playlist = m.getPlaylist() @@ -420,8 +404,8 @@ func (m *ManagerCtx) transcodeSegments(offset, limit int) error { func (m *ManagerCtx) transcodeFromSegment(index int) error { segmentsTotal := len(m.segments) - if index+m.segmentBufferMax < segmentsTotal { - segmentsTotal = index + m.segmentBufferMax + if index+m.config.SegmentBufferMax < segmentsTotal { + segmentsTotal = index + m.config.SegmentBufferMax } offset, limit := 0, 0 @@ -444,7 +428,7 @@ func (m *ManagerCtx) transcodeFromSegment(index int) error { } // if offset is greater than our minimal offset, we have enough segments available - if offset > m.segmentBufferMin { + if offset > m.config.SegmentBufferMin { return nil } @@ -562,7 +546,7 @@ func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { m.logger.Warn().Msg("media transcode failed because of shutdown") http.Error(w, "500 media not available", http.StatusInternalServerError) return - case <-time.After(transcodeTimeout): + case <-time.After(m.config.TranscodeTimeout): m.logger.Warn().Msg("media transcode timeouted") http.Error(w, "504 media timeout", http.StatusGatewayTimeout) return diff --git a/pkg/hlsvod/types.go b/pkg/hlsvod/types.go index 7c4e651..ac1fd3f 100644 --- a/pkg/hlsvod/types.go +++ b/pkg/hlsvod/types.go @@ -3,11 +3,12 @@ package hlsvod import ( "context" "net/http" + "time" ) type Config struct { - MediaPath string // Transcoded video input. - TranscodeDir string // Temporary directory to store transcoded elements. + MediaPath string // transcoded video input. + TranscodeDir string // temporary directory to store transcoded elements. SegmentPrefix string VideoProfile *VideoProfile @@ -15,10 +16,46 @@ type Config struct { AudioProfile *AudioProfile Cache bool - CacheDir string // If not empty, cache will folder will be used instead of media path + CacheDir string // if not empty, cache will folder will be used instead of media path FFmpegBinary string FFprobeBinary string + + ReadyTimeout time.Duration // how long can it take for transcode to be ready + TranscodeTimeout time.Duration // how long can it take for transcode to be ready + + SegmentLength float64 + SegmentOffset float64 // maximim segment length deviation + SegmentBufferMin int // minimum segments available after playing head + SegmentBufferMax int // maximum segments to be transcoded at once +} + +func (c Config) withDefaultValues() Config { + if c.FFmpegBinary == "" { + c.FFmpegBinary = "ffmpeg" + } + if c.FFprobeBinary == "" { + c.FFprobeBinary = "ffprobe" + } + if c.ReadyTimeout == 0 { + c.ReadyTimeout = 80 * time.Second + } + if c.TranscodeTimeout == 0 { + c.TranscodeTimeout = 10 * time.Second + } + if c.SegmentLength == 0 { + c.SegmentLength = 3.50 + } + if c.SegmentOffset == 0 { + c.SegmentOffset = 1.25 + } + if c.SegmentBufferMin == 0 { + c.SegmentBufferMin = 3 + } + if c.SegmentBufferMax == 0 { + c.SegmentBufferMax = 5 + } + return c } type Manager interface { From fed18ec71c80f8e39a7b49691af63613c6e961e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 24 Dec 2021 00:48:29 +0100 Subject: [PATCH 13/19] hls proxy move to config. --- internal/api/hlsproxy.go | 7 +++++-- pkg/hlsproxy/manager.go | 33 +++++++++++++-------------------- pkg/hlsproxy/types.go | 20 +++++++++++++++++++- 3 files changed, 37 insertions(+), 23 deletions(-) diff --git a/internal/api/hlsproxy.go b/internal/api/hlsproxy.go index b2f07b8..2d4f3e3 100644 --- a/internal/api/hlsproxy.go +++ b/internal/api/hlsproxy.go @@ -27,7 +27,10 @@ func (a *ApiManagerCtx) HLSProxy(r chi.Router) { manager, ok := hlsProxyManagers[ID] if !ok { // create new manager - manager = hlsproxy.New(baseUrl, hlsProxyPerfix+ID+"/", nil) + manager = hlsproxy.New(&hlsproxy.Config{ + PlaylistBaseUrl: baseUrl, + PlaylistPrefix: hlsProxyPerfix + ID, + }) hlsProxyManagers[ID] = manager } @@ -35,7 +38,7 @@ func (a *ApiManagerCtx) HLSProxy(r chi.Router) { if strings.HasSuffix(r.URL.String(), ".m3u8") { manager.ServePlaylist(w, r) } else { - manager.ServeMedia(w, r) + manager.ServeSegment(w, r) } }) } diff --git a/pkg/hlsproxy/manager.go b/pkg/hlsproxy/manager.go index 9db7c56..6450414 100644 --- a/pkg/hlsproxy/manager.go +++ b/pkg/hlsproxy/manager.go @@ -14,10 +14,8 @@ import ( ) type ManagerCtx struct { - logger zerolog.Logger - baseUrl string - prefix string - config Config + logger zerolog.Logger + config Config cache map[string]*utils.Cache cacheMu sync.RWMutex @@ -27,17 +25,11 @@ type ManagerCtx struct { shutdown chan struct{} } -func New(baseUrl string, prefix string, config *Config) *ManagerCtx { - // ensure it ends with slash - baseUrl = strings.TrimSuffix(baseUrl, "/") - baseUrl += "/" - +func New(config *Config) *ManagerCtx { return &ManagerCtx{ - logger: log.With().Str("module", "hlsproxy").Str("submodule", "manager").Logger(), - baseUrl: baseUrl, - prefix: prefix, - config: config.withDefaultValues(), - cache: map[string]*utils.Cache{}, + logger: log.With().Str("module", "hlsproxy").Str("submodule", "manager").Logger(), + config: config.withDefaultValues(), + cache: map[string]*utils.Cache{}, } } @@ -46,7 +38,7 @@ func (m *ManagerCtx) Shutdown() { } func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) { - url := m.baseUrl + strings.TrimPrefix(r.URL.String(), m.prefix) + url := m.config.PlaylistBaseUrl + strings.TrimPrefix(r.URL.String(), m.config.PlaylistPrefix) cache, ok := m.getFromCache(url) if !ok { @@ -73,8 +65,9 @@ func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) { return } - var re = regexp.MustCompile(`(?m:^(https?\:\/\/[^\/]+)?\/)`) - text := re.ReplaceAllString(string(buf), m.prefix) + // TODO: Handle relative paths. + text := string(buf) + text = regexp.MustCompile(`(?m:^(https?\:\/\/[^\/]+)?\/)`).ReplaceAllString(text, m.config.SegmentPrefix) cache = m.saveToCache(url, strings.NewReader(text), m.config.PlaylistExpiration) } @@ -85,14 +78,14 @@ func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) { cache.ServeHTTP(w) } -func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { - url := m.baseUrl + strings.TrimPrefix(r.URL.String(), m.prefix) +func (m *ManagerCtx) ServeSegment(w http.ResponseWriter, r *http.Request) { + url := m.config.SegmentBaseUrl + strings.TrimPrefix(r.URL.String(), m.config.SegmentPrefix) cache, ok := m.getFromCache(url) if !ok { resp, err := http.Get(url) if err != nil { - m.logger.Err(err).Msg("unable to get HTTP") + m.logger.Err(err).Str("url", url).Msg("unable to get HTTP") http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return } diff --git a/pkg/hlsproxy/types.go b/pkg/hlsproxy/types.go index c21a415..a036196 100644 --- a/pkg/hlsproxy/types.go +++ b/pkg/hlsproxy/types.go @@ -2,16 +2,28 @@ package hlsproxy import ( "net/http" + "strings" "time" ) type Config struct { + PlaylistBaseUrl string + PlaylistPrefix string + SegmentBaseUrl string // optional: will be used playlist value if empty + SegmentPrefix string // optional: will be used playlist value if empty + CacheCleanupPeriod time.Duration // how often should be cache cleanup called SegmentExpiration time.Duration // how long should be segment kept in memory PlaylistExpiration time.Duration // how long should be playlist kept in memory } func (c Config) withDefaultValues() Config { + if c.SegmentBaseUrl == "" { + c.SegmentBaseUrl = c.PlaylistBaseUrl + } + if c.SegmentPrefix == "" { + c.SegmentPrefix = c.PlaylistPrefix + } if c.CacheCleanupPeriod == 0 { c.CacheCleanupPeriod = 4 * time.Second } @@ -21,6 +33,12 @@ func (c Config) withDefaultValues() Config { if c.PlaylistExpiration == 0 { c.PlaylistExpiration = 1 * time.Second } + // ensure it ends with single / + c.PlaylistBaseUrl = strings.TrimRight(c.PlaylistBaseUrl, "/") + "/" + c.SegmentBaseUrl = strings.TrimRight(c.SegmentBaseUrl, "/") + "/" + // ensure it starts and ends with single / + c.PlaylistPrefix = "/" + strings.Trim(c.PlaylistPrefix, "/") + "/" + c.SegmentPrefix = "/" + strings.Trim(c.SegmentPrefix, "/") + "/" return c } @@ -28,5 +46,5 @@ type Manager interface { Shutdown() ServePlaylist(w http.ResponseWriter, r *http.Request) - ServeMedia(w http.ResponseWriter, r *http.Request) + ServeSegment(w http.ResponseWriter, r *http.Request) } From fe7456be1733da53b4d0b5fdcec9ccbb562080a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 24 Dec 2021 00:48:51 +0100 Subject: [PATCH 14/19] rename to serve segment. --- internal/api/hlslive.go | 2 +- internal/api/hlsvod.go | 2 +- pkg/hlslive/manager.go | 6 +++--- pkg/hlslive/types.go | 2 +- pkg/hlsvod/manager.go | 24 ++++++++++++------------ pkg/hlsvod/types.go | 2 +- 6 files changed, 19 insertions(+), 19 deletions(-) diff --git a/internal/api/hlslive.go b/internal/api/hlslive.go index 45159f1..40bb494 100644 --- a/internal/api/hlslive.go +++ b/internal/api/hlslive.go @@ -84,7 +84,7 @@ func (a *ApiManagerCtx) HLS(r chi.Router) { return } - manager.ServeMedia(w, r) + manager.ServeSegment(w, r) }) r.Get("/{profile}/{input}/play.html", func(w http.ResponseWriter, r *http.Request) { diff --git a/internal/api/hlsvod.go b/internal/api/hlsvod.go index 9fa9eda..f178bf1 100644 --- a/internal/api/hlsvod.go +++ b/internal/api/hlsvod.go @@ -155,7 +155,7 @@ func (a *ApiManagerCtx) HlsVod(r chi.Router) { if hlsResource == profileID+".m3u8" { manager.ServePlaylist(w, r) } else { - manager.ServeMedia(w, r) + manager.ServeSegment(w, r) } }) } diff --git a/pkg/hlslive/manager.go b/pkg/hlslive/manager.go index f244043..4db349c 100644 --- a/pkg/hlslive/manager.go +++ b/pkg/hlslive/manager.go @@ -255,13 +255,13 @@ func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte(playlist)) } -func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { +func (m *ManagerCtx) ServeSegment(w http.ResponseWriter, r *http.Request) { fileName := path.Base(r.URL.RequestURI()) path := path.Join(m.tempdir, fileName) if _, err := os.Stat(path); os.IsNotExist(err) { - m.logger.Warn().Str("path", path).Msg("media file not found") - http.Error(w, "404 media not found", http.StatusNotFound) + m.logger.Warn().Str("path", path).Msg("segment not found") + http.Error(w, "404 segment not found", http.StatusNotFound) return } diff --git a/pkg/hlslive/types.go b/pkg/hlslive/types.go index 8002155..bc6c2e9 100644 --- a/pkg/hlslive/types.go +++ b/pkg/hlslive/types.go @@ -38,7 +38,7 @@ type Manager interface { Cleanup() ServePlaylist(w http.ResponseWriter, r *http.Request) - ServeMedia(w http.ResponseWriter, r *http.Request) + ServeSegment(w http.ResponseWriter, r *http.Request) OnStart(event func()) OnCmdLog(event func(message string)) diff --git a/pkg/hlsvod/manager.go b/pkg/hlsvod/manager.go index 3e1b987..e0dfedc 100644 --- a/pkg/hlsvod/manager.go +++ b/pkg/hlsvod/manager.go @@ -489,7 +489,7 @@ func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) { _, _ = w.Write([]byte(m.playlist)) } -func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { +func (m *ManagerCtx) ServeSegment(w http.ResponseWriter, r *http.Request) { // ensure that manager started if !m.httpEnsureReady(w) { return @@ -501,7 +501,7 @@ func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { // getting index from segment name index, ok := m.parseSegmentIndex(reqSegName) if !ok { - http.Error(w, "400 bad media path", http.StatusBadRequest) + http.Error(w, "400 bad segment path", http.StatusBadRequest) return } @@ -514,8 +514,8 @@ func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { // try to transcode from current segment if err := m.transcodeFromSegment(index); err != nil { - m.logger.Err(err).Int("index", index).Msg("unable to transcode media") - http.Error(w, "500 unable to transcode", http.StatusInternalServerError) + m.logger.Err(err).Int("index", index).Msg("unable to transcode segment") + http.Error(w, "500 unable to transcode segment", http.StatusInternalServerError) return } @@ -525,8 +525,8 @@ func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { segChan, ok := m.waitForSegment(index) if !ok { // this should never happen - m.logger.Error().Int("index", index).Msg("media not queued even after transcode") - http.Error(w, "409 media not queued even after transcode", http.StatusConflict) + m.logger.Error().Int("index", index).Msg("segment not queued even after transcode") + http.Error(w, "409 segment not queued even after transcode", http.StatusConflict) return } @@ -543,20 +543,20 @@ func (m *ManagerCtx) ServeMedia(w http.ResponseWriter, r *http.Request) { } // when transcode stops before getting ready case <-m.ctx.Done(): - m.logger.Warn().Msg("media transcode failed because of shutdown") - http.Error(w, "500 media not available", http.StatusInternalServerError) + m.logger.Warn().Msg("segment transcode failed because of shutdown") + http.Error(w, "500 segment not available", http.StatusInternalServerError) return case <-time.After(m.config.TranscodeTimeout): - m.logger.Warn().Msg("media transcode timeouted") - http.Error(w, "504 media timeout", http.StatusGatewayTimeout) + m.logger.Warn().Msg("segment transcode timeouted") + http.Error(w, "504 segment transcode timeout", http.StatusGatewayTimeout) return } } // check if segment is on the disk if _, err := os.Stat(segmentPath); os.IsNotExist(err) { - m.logger.Warn().Int("index", index).Str("path", segmentPath).Msg("media file not found") - http.Error(w, "404 media not found", http.StatusNotFound) + m.logger.Warn().Int("index", index).Str("path", segmentPath).Msg("segment file not found") + http.Error(w, "404 segment not found", http.StatusNotFound) return } diff --git a/pkg/hlsvod/types.go b/pkg/hlsvod/types.go index ac1fd3f..d662c5d 100644 --- a/pkg/hlsvod/types.go +++ b/pkg/hlsvod/types.go @@ -64,5 +64,5 @@ type Manager interface { Preload(ctx context.Context) (*ProbeMediaData, error) ServePlaylist(w http.ResponseWriter, r *http.Request) - ServeMedia(w http.ResponseWriter, r *http.Request) + ServeSegment(w http.ResponseWriter, r *http.Request) } From d2452bffd3afef1983bbfa9e687b3217dff4d464 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 24 Dec 2021 00:51:52 +0100 Subject: [PATCH 15/19] CacheFileExt to config. --- pkg/hlsvod/cache.go | 10 ++++------ pkg/hlsvod/types.go | 8 ++++++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/hlsvod/cache.go b/pkg/hlsvod/cache.go index c6dc232..6ca141f 100644 --- a/pkg/hlsvod/cache.go +++ b/pkg/hlsvod/cache.go @@ -7,11 +7,9 @@ import ( "path" ) -const cacheFileSuffix = ".go-transcode-cache" - func (m *ManagerCtx) getCacheData() ([]byte, error) { // check for local cache - localCachePath := m.config.MediaPath + cacheFileSuffix + localCachePath := m.config.MediaPath + m.config.CacheFileExt if _, err := os.Stat(localCachePath); err == nil { m.logger.Info().Str("path", localCachePath).Msg("media local cache hit") return os.ReadFile(localCachePath) @@ -22,7 +20,7 @@ func (m *ManagerCtx) getCacheData() ([]byte, error) { h.Write([]byte(m.config.MediaPath)) hash := h.Sum(nil) - fileName := fmt.Sprintf("%x%s", hash, cacheFileSuffix) + fileName := fmt.Sprintf("%x%s", hash, m.config.CacheFileExt) globalCachePath := path.Join(m.config.CacheDir, fileName) if _, err := os.Stat(globalCachePath); err == nil { m.logger.Info().Str("path", globalCachePath).Msg("media global cache hit") @@ -33,7 +31,7 @@ func (m *ManagerCtx) getCacheData() ([]byte, error) { } func (m *ManagerCtx) saveLocalCacheData(data []byte) error { - localCachePath := m.config.MediaPath + cacheFileSuffix + localCachePath := m.config.MediaPath + m.config.CacheFileExt return os.WriteFile(localCachePath, data, 0755) } @@ -42,7 +40,7 @@ func (m *ManagerCtx) saveGlobalCacheData(data []byte) error { h.Write([]byte(m.config.MediaPath)) hash := h.Sum(nil) - fileName := fmt.Sprintf("%x%s", hash, cacheFileSuffix) + fileName := fmt.Sprintf("%x%s", hash, m.config.CacheFileExt) globalCachePath := path.Join(m.config.CacheDir, fileName) return os.WriteFile(globalCachePath, data, 0755) } diff --git a/pkg/hlsvod/types.go b/pkg/hlsvod/types.go index d662c5d..aa609ae 100644 --- a/pkg/hlsvod/types.go +++ b/pkg/hlsvod/types.go @@ -15,8 +15,9 @@ type Config struct { VideoKeyframes bool AudioProfile *AudioProfile - Cache bool - CacheDir string // if not empty, cache will folder will be used instead of media path + Cache bool + CacheDir string // if not empty, cache will folder will be used instead of media path + CacheFileExt string // extension of cache files created by this module FFmpegBinary string FFprobeBinary string @@ -31,6 +32,9 @@ type Config struct { } func (c Config) withDefaultValues() Config { + if c.CacheFileExt == "" { + c.FFmpegBinary = ".go-transcode-cache" + } if c.FFmpegBinary == "" { c.FFmpegBinary = "ffmpeg" } From 3383f49ea4c0fbe7df3f846d4d03c570ae9527ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Fri, 24 Dec 2021 01:09:40 +0100 Subject: [PATCH 16/19] fix segment prefix naming. --- internal/api/hlsproxy.go | 4 ++-- internal/api/hlsvod.go | 6 +++--- pkg/hlsproxy/manager.go | 6 +++--- pkg/hlsproxy/types.go | 16 ++++++++-------- pkg/hlsvod/manager.go | 7 ++++--- pkg/hlsvod/types.go | 6 +++--- 6 files changed, 23 insertions(+), 22 deletions(-) diff --git a/internal/api/hlsproxy.go b/internal/api/hlsproxy.go index 2d4f3e3..107fdc2 100644 --- a/internal/api/hlsproxy.go +++ b/internal/api/hlsproxy.go @@ -28,8 +28,8 @@ func (a *ApiManagerCtx) HLSProxy(r chi.Router) { if !ok { // create new manager manager = hlsproxy.New(&hlsproxy.Config{ - PlaylistBaseUrl: baseUrl, - PlaylistPrefix: hlsProxyPerfix + ID, + PlaylistBaseUrl: baseUrl, + PlaylistPathPrefix: hlsProxyPerfix + ID, }) hlsProxyManagers[ID] = manager } diff --git a/internal/api/hlsvod.go b/internal/api/hlsvod.go index f178bf1..6c4d318 100644 --- a/internal/api/hlsvod.go +++ b/internal/api/hlsvod.go @@ -121,9 +121,9 @@ func (a *ApiManagerCtx) HlsVod(r chi.Router) { // create new manager manager = hlsvod.New(&hlsvod.Config{ - MediaPath: vodMediaPath, - TranscodeDir: transcodeDir, - SegmentPrefix: profileID, + MediaPath: vodMediaPath, + TranscodeDir: transcodeDir, + SegmentNamePrefix: profileID, VideoProfile: &hlsvod.VideoProfile{ Width: profile.Width, diff --git a/pkg/hlsproxy/manager.go b/pkg/hlsproxy/manager.go index 6450414..4be1cf1 100644 --- a/pkg/hlsproxy/manager.go +++ b/pkg/hlsproxy/manager.go @@ -38,7 +38,7 @@ func (m *ManagerCtx) Shutdown() { } func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) { - url := m.config.PlaylistBaseUrl + strings.TrimPrefix(r.URL.String(), m.config.PlaylistPrefix) + url := m.config.PlaylistBaseUrl + strings.TrimPrefix(r.URL.String(), m.config.PlaylistPathPrefix) cache, ok := m.getFromCache(url) if !ok { @@ -67,7 +67,7 @@ func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) { // TODO: Handle relative paths. text := string(buf) - text = regexp.MustCompile(`(?m:^(https?\:\/\/[^\/]+)?\/)`).ReplaceAllString(text, m.config.SegmentPrefix) + text = regexp.MustCompile(`(?m:^(https?\:\/\/[^\/]+)?\/)`).ReplaceAllString(text, m.config.SegmentPathPrefix) cache = m.saveToCache(url, strings.NewReader(text), m.config.PlaylistExpiration) } @@ -79,7 +79,7 @@ func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) { } func (m *ManagerCtx) ServeSegment(w http.ResponseWriter, r *http.Request) { - url := m.config.SegmentBaseUrl + strings.TrimPrefix(r.URL.String(), m.config.SegmentPrefix) + url := m.config.SegmentBaseUrl + strings.TrimPrefix(r.URL.String(), m.config.SegmentPathPrefix) cache, ok := m.getFromCache(url) if !ok { diff --git a/pkg/hlsproxy/types.go b/pkg/hlsproxy/types.go index a036196..5a8a010 100644 --- a/pkg/hlsproxy/types.go +++ b/pkg/hlsproxy/types.go @@ -7,10 +7,10 @@ import ( ) type Config struct { - PlaylistBaseUrl string - PlaylistPrefix string - SegmentBaseUrl string // optional: will be used playlist value if empty - SegmentPrefix string // optional: will be used playlist value if empty + PlaylistBaseUrl string + PlaylistPathPrefix string + SegmentBaseUrl string // optional: will be used playlist value if empty + SegmentPathPrefix string // optional: will be used playlist value if empty CacheCleanupPeriod time.Duration // how often should be cache cleanup called SegmentExpiration time.Duration // how long should be segment kept in memory @@ -21,8 +21,8 @@ func (c Config) withDefaultValues() Config { if c.SegmentBaseUrl == "" { c.SegmentBaseUrl = c.PlaylistBaseUrl } - if c.SegmentPrefix == "" { - c.SegmentPrefix = c.PlaylistPrefix + if c.SegmentPathPrefix == "" { + c.SegmentPathPrefix = c.PlaylistPathPrefix } if c.CacheCleanupPeriod == 0 { c.CacheCleanupPeriod = 4 * time.Second @@ -37,8 +37,8 @@ func (c Config) withDefaultValues() Config { c.PlaylistBaseUrl = strings.TrimRight(c.PlaylistBaseUrl, "/") + "/" c.SegmentBaseUrl = strings.TrimRight(c.SegmentBaseUrl, "/") + "/" // ensure it starts and ends with single / - c.PlaylistPrefix = "/" + strings.Trim(c.PlaylistPrefix, "/") + "/" - c.SegmentPrefix = "/" + strings.Trim(c.SegmentPrefix, "/") + "/" + c.PlaylistPathPrefix = "/" + strings.Trim(c.PlaylistPathPrefix, "/") + "/" + c.SegmentPathPrefix = "/" + strings.Trim(c.SegmentPathPrefix, "/") + "/" return c } diff --git a/pkg/hlsvod/manager.go b/pkg/hlsvod/manager.go index e0dfedc..6734fcf 100644 --- a/pkg/hlsvod/manager.go +++ b/pkg/hlsvod/manager.go @@ -185,14 +185,14 @@ func (m *ManagerCtx) loadMetadata(ctx context.Context) error { } func (m *ManagerCtx) getSegmentName(index int) string { - return fmt.Sprintf("%s-%05d.ts", m.config.SegmentPrefix, index) + return fmt.Sprintf("%s-%05d.ts", m.config.SegmentNamePrefix, index) } func (m *ManagerCtx) parseSegmentIndex(segmentName string) (int, bool) { regex := regexp.MustCompile(`^(.*)-([0-9]{5})\.ts$`) matches := regex.FindStringSubmatch(segmentName) - if len(matches) != 3 || matches[1] != m.config.SegmentPrefix { + if len(matches) != 3 || matches[1] != m.config.SegmentNamePrefix { return 0, false } @@ -355,7 +355,8 @@ func (m *ManagerCtx) transcodeSegments(offset, limit int) error { segments, err := TranscodeSegments(m.ctx, m.config.FFmpegBinary, TranscodeConfig{ InputFilePath: m.config.MediaPath, OutputDirPath: m.config.TranscodeDir, - SegmentPrefix: m.config.SegmentPrefix, // This does not need to match. + // This does not need to be the same as chosen prefix. + SegmentPrefix: m.config.SegmentNamePrefix, VideoProfile: m.config.VideoProfile, AudioProfile: m.config.AudioProfile, diff --git a/pkg/hlsvod/types.go b/pkg/hlsvod/types.go index aa609ae..938e6f4 100644 --- a/pkg/hlsvod/types.go +++ b/pkg/hlsvod/types.go @@ -7,9 +7,9 @@ import ( ) type Config struct { - MediaPath string // transcoded video input. - TranscodeDir string // temporary directory to store transcoded elements. - SegmentPrefix string + MediaPath string // transcoded video input + TranscodeDir string // temporary directory to store transcoded elements + SegmentNamePrefix string // e.g. prefix-000001.ts VideoProfile *VideoProfile VideoKeyframes bool From ad1f160b75d8a42775cbc87c7733aa98fc07a64f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Mon, 27 Dec 2021 20:09:40 +0100 Subject: [PATCH 17/19] modify cache to use io writer. --- internal/utils/cache.go | 27 +++++++++++++++++++-------- pkg/hlsproxy/cache.go | 4 ++-- pkg/hlsproxy/manager.go | 4 ++-- 3 files changed, 23 insertions(+), 12 deletions(-) diff --git a/internal/utils/cache.go b/internal/utils/cache.go index 141a94b..637af74 100644 --- a/internal/utils/cache.go +++ b/internal/utils/cache.go @@ -2,7 +2,6 @@ package utils import ( "io" - "net/http" "sync" "time" ) @@ -17,13 +16,13 @@ type Cache struct { listeners []func([]byte) (int, error) listenersMu sync.RWMutex - Expires time.Time + expires time.Time } func NewCache(expires time.Time) *Cache { return &Cache{ closeCh: make(chan struct{}), - Expires: expires, + expires: expires, } } @@ -64,9 +63,8 @@ func (c *Cache) Close() error { return nil } -func (c *Cache) ServeHTTP(w http.ResponseWriter) { - offset := 0 - index := 0 +func (c *Cache) CopyTo(w io.Writer) error { + offset, index := 0, 0 for { c.mu.RLock() @@ -79,7 +77,11 @@ func (c *Cache) ServeHTTP(w http.ResponseWriter) { chunk := c.chunks[index] c.mu.RUnlock() - i, _ := w.Write(chunk) + i, err := w.Write(chunk) + if err != nil { + return err + } + offset += i index++ continue @@ -87,7 +89,11 @@ func (c *Cache) ServeHTTP(w http.ResponseWriter) { // if stream is already closed if closed { - return + var err error + if closer, ok := w.(io.WriteCloser); ok { + err = closer.Close() + } + return err } // we don't have enough data but stream is not closed @@ -101,4 +107,9 @@ func (c *Cache) ServeHTTP(w http.ResponseWriter) { // wait until it finishes <-c.closeCh + return nil +} + +func (c *Cache) Expired() bool { + return time.Now().After(c.expires) } diff --git a/pkg/hlsproxy/cache.go b/pkg/hlsproxy/cache.go index 9ca21c8..cd5aeeb 100644 --- a/pkg/hlsproxy/cache.go +++ b/pkg/hlsproxy/cache.go @@ -19,7 +19,7 @@ func (m *ManagerCtx) getFromCache(key string) (*utils.Cache, bool) { } // if cache has expired - if time.Now().After(entry.Expires) { + if entry.Expired() { return nil, false } @@ -61,7 +61,7 @@ func (m *ManagerCtx) clearCache() { m.cacheMu.Lock() for key, entry := range m.cache { // remove expired entries - if time.Now().After(entry.Expires) { + if entry.Expired() { delete(m.cache, key) m.logger.Debug().Str("key", key).Msg("cache cleanup remove expired") } else { diff --git a/pkg/hlsproxy/manager.go b/pkg/hlsproxy/manager.go index 4be1cf1..5d02236 100644 --- a/pkg/hlsproxy/manager.go +++ b/pkg/hlsproxy/manager.go @@ -75,7 +75,7 @@ func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/vnd.apple.mpegurl") w.WriteHeader(200) - cache.ServeHTTP(w) + cache.CopyTo(w) } func (m *ManagerCtx) ServeSegment(w http.ResponseWriter, r *http.Request) { @@ -104,5 +104,5 @@ func (m *ManagerCtx) ServeSegment(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "video/MP2T") w.WriteHeader(200) - cache.ServeHTTP(w) + cache.CopyTo(w) } From 310c4dbe7d96a742a177e07810ca0f1e6d84090a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Mon, 27 Dec 2021 23:47:58 +0100 Subject: [PATCH 18/19] add modules. --- README.md | 5 +- modules/hlslive/module.go | 115 ++++++++++++++++++++++ modules/hlslive/types.go | 18 ++++ modules/hlsproxy/module.go | 102 ++++++++++++++++++++ modules/hlsproxy/types.go | 19 ++++ modules/hlsvod/module.go | 181 +++++++++++++++++++++++++++++++++++ modules/hlsvod/types.go | 26 +++++ modules/httpstream/module.go | 145 ++++++++++++++++++++++++++++ modules/httpstream/types.go | 11 +++ modules/player/module.go | 50 ++++++++++ modules/player/player.html | 31 ++++++ modules/player/types.go | 12 +++ modules/types.go | 13 +++ 13 files changed, 726 insertions(+), 2 deletions(-) create mode 100644 modules/hlslive/module.go create mode 100644 modules/hlslive/types.go create mode 100644 modules/hlsproxy/module.go create mode 100644 modules/hlsproxy/types.go create mode 100644 modules/hlsvod/module.go create mode 100644 modules/hlsvod/types.go create mode 100644 modules/httpstream/module.go create mode 100644 modules/httpstream/types.go create mode 100644 modules/player/module.go create mode 100644 modules/player/player.html create mode 100644 modules/player/types.go create mode 100644 modules/types.go diff --git a/README.md b/README.md index 84cc234..bd65b03 100644 --- a/README.md +++ b/README.md @@ -152,8 +152,9 @@ The source code is in the following files/folders: - `.docker`: for docker support. - `cmd/` and `main.go`: source for the command-line interface. -- `internal/`: internal source code modules. -- `pkg/`: external modules as lib. +- `internal/`: internal source code. +- `modules/`: standalone plug'n'play modules. +- `pkg/`: external packages ready to be reused. *TODO: document different modules/packages and dependencies* diff --git a/modules/hlslive/module.go b/modules/hlslive/module.go new file mode 100644 index 0000000..764cf55 --- /dev/null +++ b/modules/hlslive/module.go @@ -0,0 +1,115 @@ +package hlslive + +import ( + "fmt" + "net/http" + "os" + "os/exec" + "path" + "regexp" + "strings" + + "github.com/m1k1o/go-transcode/pkg/hlslive" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +var resourceRegex = regexp.MustCompile(`^[0-9A-Za-z_-]+$`) + +type ModuleCtx struct { + logger zerolog.Logger + pathPrefix string + config Config + + managers map[string]hlslive.Manager +} + +func New(pathPrefix string, config *Config) *ModuleCtx { + module := &ModuleCtx{ + logger: log.With().Str("module", "hlslive").Logger(), + pathPrefix: pathPrefix, + config: config.withDefaultValues(), + + managers: make(map[string]hlslive.Manager), + } + + return module +} + +func (m *ModuleCtx) Shutdown() { + for _, manager := range m.managers { + manager.Stop() + } +} + +// TODO: Reload config in all managers. +func (m *ModuleCtx) ConfigReload(config *Config) { + m.config = config.withDefaultValues() +} + +// TODO: Periodically call this to remove old managers. +func (m *ModuleCtx) Cleanup() { + +} + +func (m *ModuleCtx) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, m.pathPrefix) { + http.NotFound(w, r) + return + } + + p := r.URL.Path + // remove path prefix + p = strings.TrimPrefix(p, m.pathPrefix) + // remove leading and ending / + p = strings.Trim(p, "/") + // split path to parts + s := strings.Split(p, "/") + + // we need exactly three parts of the url + if len(s) != 3 { + http.NotFound(w, r) + return + } + + // {source}/{profile}/{resource} + sourceName, profileName, resource := s[0], s[1], s[2] + + // check if parameters match regex + if !resourceRegex.MatchString(sourceName) || + !resourceRegex.MatchString(profileName) { + http.Error(w, "400 invalid parameters", http.StatusBadRequest) + return + } + + ID := fmt.Sprintf("%s/%s", sourceName, profileName) + manager, ok := m.managers[ID] + if !ok { + // find relevant source + source, ok := m.config.Sources[sourceName] + if !ok { + http.Error(w, "404 source not found", http.StatusNotFound) + return + } + + // check if exists profile path + profilePath := path.Join(m.config.ProfilesPath, profileName) + if _, err := os.Stat(profilePath); err != nil { + http.Error(w, "404 profile not found", http.StatusNotFound) + return + } + + // create new manager + manager = hlslive.New(func() *exec.Cmd { + return exec.Command(profilePath, source) + }, &m.config.Config) + + m.managers[ID] = manager + } + + if resource == m.config.PlaylistName { + manager.ServePlaylist(w, r) + } else { + manager.ServeSegment(w, r) + } +} diff --git a/modules/hlslive/types.go b/modules/hlslive/types.go new file mode 100644 index 0000000..041872c --- /dev/null +++ b/modules/hlslive/types.go @@ -0,0 +1,18 @@ +package hlslive + +import "github.com/m1k1o/go-transcode/pkg/hlslive" + +type Config struct { + hlslive.Config + + Sources map[string]string + ProfilesPath string + PlaylistName string +} + +func (c Config) withDefaultValues() Config { + if c.PlaylistName == "" { + c.PlaylistName = "index.m3u8" + } + return c +} diff --git a/modules/hlsproxy/module.go b/modules/hlsproxy/module.go new file mode 100644 index 0000000..79e17bf --- /dev/null +++ b/modules/hlsproxy/module.go @@ -0,0 +1,102 @@ +package hlsproxy + +import ( + "net/http" + "regexp" + "strings" + + "github.com/m1k1o/go-transcode/pkg/hlsproxy" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +var resourceRegex = regexp.MustCompile(`^[0-9A-Za-z_-]+$`) + +type ModuleCtx struct { + logger zerolog.Logger + pathPrefix string + config Config + + managers map[string]hlsproxy.Manager +} + +func New(pathPrefix string, config *Config) *ModuleCtx { + module := &ModuleCtx{ + logger: log.With().Str("module", "hlsproxy").Logger(), + pathPrefix: pathPrefix, + config: config.withDefaultValues(), + + managers: make(map[string]hlsproxy.Manager), + } + + return module +} + +func (m *ModuleCtx) Shutdown() { + for _, manager := range m.managers { + manager.Shutdown() + } +} + +// TODO: Reload config in all managers. +func (m *ModuleCtx) ConfigReload(config *Config) { + m.config = config.withDefaultValues() +} + +// TODO: Periodically call this to remove old managers. +func (m *ModuleCtx) Cleanup() { + +} + +func (m *ModuleCtx) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, m.pathPrefix) { + http.NotFound(w, r) + return + } + + p := r.URL.Path + // remove path prefix + p = strings.TrimPrefix(p, m.pathPrefix) + // remove leading / + p = strings.TrimLeft(p, "/") + // split path to parts + s := strings.Split(p, "/") + + // we need at least first part of the url + if len(s) == 0 { + http.NotFound(w, r) + return + } + + sourceName := s[0] + + // check if parameters match regex + if !resourceRegex.MatchString(sourceName) { + http.Error(w, "400 invalid parameters", http.StatusBadRequest) + return + } + + manager, ok := m.managers[sourceName] + if !ok { + // find relevant source + source, ok := m.config.Sources[sourceName] + if !ok { + http.Error(w, "404 source not found", http.StatusNotFound) + return + } + + config := m.config.Config + config.PlaylistBaseUrl = source + config.PlaylistPathPrefix = strings.TrimRight(m.pathPrefix, "/") + sourceName + + // create new manager + manager = hlsproxy.New(&config) + m.managers[sourceName] = manager + } + + if strings.HasSuffix(r.URL.Path, ".m3u8") { + manager.ServePlaylist(w, r) + } else { + manager.ServeSegment(w, r) + } +} diff --git a/modules/hlsproxy/types.go b/modules/hlsproxy/types.go new file mode 100644 index 0000000..69cb1d0 --- /dev/null +++ b/modules/hlsproxy/types.go @@ -0,0 +1,19 @@ +package hlsproxy + +import "github.com/m1k1o/go-transcode/pkg/hlsproxy" + +type Config struct { + hlsproxy.Config + + // overwritten properties + PlaylistBaseUrl string `mapstructure:"-"` + PlaylistPathPrefix string `mapstructure:"-"` + SegmentBaseUrl string `mapstructure:"-"` + SegmentPathPrefix string `mapstructure:"-"` + + Sources map[string]string +} + +func (c Config) withDefaultValues() Config { + return c +} diff --git a/modules/hlsvod/module.go b/modules/hlsvod/module.go new file mode 100644 index 0000000..bc11f42 --- /dev/null +++ b/modules/hlsvod/module.go @@ -0,0 +1,181 @@ +package hlsvod + +import ( + "fmt" + "net/http" + "os" + "path" + "path/filepath" + "strings" + + "github.com/m1k1o/go-transcode/pkg/hlsvod" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +type ModuleCtx struct { + logger zerolog.Logger + pathPrefix string + config Config + + managers map[string]hlsvod.Manager +} + +func New(pathPrefix string, config *Config) *ModuleCtx { + module := &ModuleCtx{ + logger: log.With().Str("module", "hlsvod").Logger(), + pathPrefix: pathPrefix, + config: config.withDefaultValues(), + + managers: make(map[string]hlsvod.Manager), + } + + return module +} + +func (m *ModuleCtx) Shutdown() { + for _, manager := range m.managers { + manager.Stop() + } +} + +// TODO: Reload config in all managers. +func (m *ModuleCtx) ConfigReload(config *Config) { + m.config = config.withDefaultValues() +} + +// TODO: Periodically call this to remove old managers. +func (m *ModuleCtx) Cleanup() { + +} + +func (m *ModuleCtx) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, m.pathPrefix) { + http.NotFound(w, r) + return + } + + p := r.URL.Path + // remove path prefix + p = strings.TrimPrefix(p, m.pathPrefix) + // remove leading and ending / + p = strings.Trim(p, "/") + + // get index of last slash from path + lastSlashIndex := strings.LastIndex(p, "/") + if lastSlashIndex == -1 { + http.Error(w, "400 invalid parameters", http.StatusBadRequest) + return + } + + // everything after last slash is hls resource (playlist or segment) + hlsResource := p[lastSlashIndex+1:] + // everything before last slash is vod media path + vodMediaPath := p[:lastSlashIndex] + // use clean path + vodMediaPath = filepath.Clean(vodMediaPath) + vodMediaPath = path.Join(m.config.MediaBasePath, vodMediaPath) + + // serve master profile + if hlsResource == m.config.MasterPlaylistName { + // modify default config + config := m.config.Config + config.MediaPath = vodMediaPath + + data, err := hlsvod.New(&config).Preload(r.Context()) + if err != nil { + m.logger.Warn().Err(err).Msg("unable to preload metadata") + http.Error(w, "500 unable to preload metadata", http.StatusInternalServerError) + return + } + + width, height := 0, 0 + if data.Video != nil { + width, height = data.Video.Width, data.Video.Height + } + + profiles := map[string]hlsvod.VideoProfile{} + for name, profile := range m.config.VideoProfiles { + if width != 0 && width < profile.Width && + height != 0 && height < profile.Height { + continue + } + + profiles[name] = hlsvod.VideoProfile{ + Width: profile.Width, + Height: profile.Height, + Bitrate: (profile.Bitrate + m.config.AudioProfile.Bitrate) / 100 * 105000, + } + } + + playlist := hlsvod.StreamsPlaylist(profiles, "%s.m3u8") + _, _ = w.Write([]byte(playlist)) + return + } + + // get profile name (everythinb before . or -) + profileID := strings.FieldsFunc(hlsResource, func(r rune) bool { + return r == '.' || r == '-' + })[0] + + // check if exists profile and fetch + profile, ok := m.config.VideoProfiles[profileID] + if !ok { + http.Error(w, "404 profile not found", http.StatusNotFound) + return + } + + ID := fmt.Sprintf("%s/%s", profileID, vodMediaPath) + manager, ok := m.managers[ID] + + m.logger.Info(). + Str("path", p). + Str("hlsResource", hlsResource). + Str("vodMediaPath", vodMediaPath). + Msg("new hls vod request") + + // if manager was not found + if !ok { + // check if vod media path exists + if _, err := os.Stat(vodMediaPath); os.IsNotExist(err) { + http.Error(w, "404 vod not found", http.StatusNotFound) + return + } + + // create own transcoding directory + transcodeDir, err := os.MkdirTemp(m.config.TranscodeDir, fmt.Sprintf("vod-%s-*", profileID)) + if err != nil { + m.logger.Warn().Err(err).Msg("could not create temp dir") + http.Error(w, "500 could not create temp dir", http.StatusInternalServerError) + return + } + + // modify default config + config := m.config.Config + config.MediaPath = vodMediaPath + config.TranscodeDir = transcodeDir // with current medias subfolder + config.SegmentNamePrefix = profileID + config.VideoProfile = &hlsvod.VideoProfile{ + Width: profile.Width, + Height: profile.Height, + Bitrate: profile.Bitrate, + } + + // create new manager + manager = hlsvod.New(&config) + if err := manager.Start(); err != nil { + m.logger.Warn().Err(err).Msg("hls vod manager could not be started") + http.Error(w, "500 hls vod manager could not be started", http.StatusInternalServerError) + return + } + + m.managers[ID] = manager + } + + // server playlist or segment + if hlsResource == profileID+".m3u8" { + manager.ServePlaylist(w, r) + } else { + manager.ServeSegment(w, r) + } +} diff --git a/modules/hlsvod/types.go b/modules/hlsvod/types.go new file mode 100644 index 0000000..779e296 --- /dev/null +++ b/modules/hlsvod/types.go @@ -0,0 +1,26 @@ +package hlsvod + +import "github.com/m1k1o/go-transcode/pkg/hlsvod" + +type Config struct { + hlsvod.Config + + // overwritten properties + MediaPath string `mapstructure:"-"` + SegmentNamePrefix string `mapstructure:"-"` + VideoProfile *hlsvod.VideoProfile `mapstructure:"-"` + + // modified properties + MediaBasePath string + TranscodeDir string + + VideoProfiles map[string]hlsvod.VideoProfile + MasterPlaylistName string +} + +func (c Config) withDefaultValues() Config { + if c.MasterPlaylistName == "" { + c.MasterPlaylistName = "index.m3u8" + } + return c +} diff --git a/modules/httpstream/module.go b/modules/httpstream/module.go new file mode 100644 index 0000000..a68dac5 --- /dev/null +++ b/modules/httpstream/module.go @@ -0,0 +1,145 @@ +package httpstream + +import ( + "io" + "net/http" + "os" + "os/exec" + "path" + "regexp" + "strings" + + "github.com/m1k1o/go-transcode/internal/utils" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +var resourceRegex = regexp.MustCompile(`^[0-9A-Za-z_-]+$`) + +type ModuleCtx struct { + logger zerolog.Logger + pathPrefix string + config Config +} + +func New(pathPrefix string, config *Config) *ModuleCtx { + module := &ModuleCtx{ + logger: log.With().Str("module", "httpstream").Logger(), + pathPrefix: pathPrefix, + config: config.withDefaultValues(), + } + + return module +} + +func (m *ModuleCtx) Shutdown() { + +} + +// TODO: Reload config in all managers. +func (m *ModuleCtx) ConfigReload(config *Config) { + m.config = config.withDefaultValues() +} + +// TODO: Periodically call this to remove old managers. +func (m *ModuleCtx) Cleanup() { + +} + +func (m *ModuleCtx) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if !strings.HasPrefix(r.URL.Path, m.pathPrefix) { + http.NotFound(w, r) + return + } + + p := r.URL.Path + // remove path prefix + p = strings.TrimPrefix(p, m.pathPrefix) + // remove leading and ending / + p = strings.Trim(p, "/") + // split path to parts + s := strings.Split(p, "/") + + // we need exactly two parts of the url + if len(s) != 2 { + http.NotFound(w, r) + return + } + + // {source}/{profile}/{resource} + sourceName, profileName := s[0], s[1] + + // check if parameters match regex + if !resourceRegex.MatchString(sourceName) || + !resourceRegex.MatchString(profileName) { + http.Error(w, "400 invalid parameters", http.StatusBadRequest) + return + } + + // find relevant source + source, ok := m.config.Sources[sourceName] + if !ok { + http.Error(w, "404 source not found", http.StatusNotFound) + return + } + + // check if exists profile path + profilePath := path.Join(m.config.ProfilesPath, profileName) + if _, err := os.Stat(profilePath); err != nil { + http.Error(w, "404 profile not found", http.StatusNotFound) + return + } + + cmd := exec.CommandContext(r.Context(), profilePath, source) + cmd.Stderr = utils.LogWriter(m.logger) + + if m.config.UseBufCopy { + m.bufCopyCmdToHttp(cmd, w) + } else { + m.pipeCmdToHttp(cmd, w) + } +} + +func (m *ModuleCtx) bufCopyCmdToHttp(cmd *exec.Cmd, w http.ResponseWriter) { + w.Header().Set("Content-Type", "video/mp2t") + + read, write := io.Pipe() + cmd.Stdout = write + + go utils.IOPipeToHTTP(w, read) + m.logger.Info().Msg("starting command") + + err := cmd.Run() + if err != nil { + m.logger.Warn().Err(err).Msg("transcode could not be started") + http.Error(w, "500 not available", http.StatusInternalServerError) + return + } + + write.Close() + m.logger.Info().Msg("command finished") +} + +func (m *ModuleCtx) pipeCmdToHttp(cmd *exec.Cmd, w http.ResponseWriter) { + read, write := io.Pipe() + cmd.Stdout = write + + err := cmd.Start() + if err != nil { + m.logger.Warn().Err(err).Msg("transcode could not be started") + http.Error(w, "500 not available", http.StatusInternalServerError) + return + } + + m.logger.Info().Msg("command started") + + go func() { + err := cmd.Wait() + m.logger.Err(err).Msg("command finished") + read.Close() + write.Close() + }() + + w.Header().Set("Content-Type", "video/mp2t") + _, _ = io.Copy(w, read) +} diff --git a/modules/httpstream/types.go b/modules/httpstream/types.go new file mode 100644 index 0000000..9f8dc4a --- /dev/null +++ b/modules/httpstream/types.go @@ -0,0 +1,11 @@ +package httpstream + +type Config struct { + Sources map[string]string + ProfilesPath string + UseBufCopy bool +} + +func (c Config) withDefaultValues() Config { + return c +} diff --git a/modules/player/module.go b/modules/player/module.go new file mode 100644 index 0000000..261aea6 --- /dev/null +++ b/modules/player/module.go @@ -0,0 +1,50 @@ +package player + +import ( + _ "embed" + "net/http" + "strings" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +//go:embed player.html +var playHTML string + +type ModuleCtx struct { + logger zerolog.Logger + pathPrefix string + config Config +} + +func New(pathPrefix string, config *Config) *ModuleCtx { + module := &ModuleCtx{ + logger: log.With().Str("module", "player").Logger(), + pathPrefix: pathPrefix, + config: config.withDefaultValues(), + } + + return module +} + +func (m *ModuleCtx) Shutdown() { + +} + +// TODO: Reload config in all managers. +func (m *ModuleCtx) ConfigReload(config *Config) { + m.config = config.withDefaultValues() +} + +// TODO: Periodically call this to remove old managers. +func (m *ModuleCtx) Cleanup() { + +} + +func (m *ModuleCtx) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/html") + + html := strings.Replace(playHTML, "index.m3u8", m.config.Source, 1) + _, _ = w.Write([]byte(html)) +} diff --git a/modules/player/player.html b/modules/player/player.html new file mode 100644 index 0000000..71a7da3 --- /dev/null +++ b/modules/player/player.html @@ -0,0 +1,31 @@ + + + + + Player + + + + + + + + + + + + + + diff --git a/modules/player/types.go b/modules/player/types.go new file mode 100644 index 0000000..0490cc8 --- /dev/null +++ b/modules/player/types.go @@ -0,0 +1,12 @@ +package player + +type Config struct { + Source string +} + +func (c Config) withDefaultValues() Config { + if c.Source == "" { + c.Source = "index.m3u8" + } + return c +} diff --git a/modules/types.go b/modules/types.go new file mode 100644 index 0000000..f0129d1 --- /dev/null +++ b/modules/types.go @@ -0,0 +1,13 @@ +package modules + +import "net/http" + +type Config interface { +} + +type Module interface { + Shutdown() + ConfigReload(config Config) + Cleanup() + ServeHTTP(w http.ResponseWriter, r *http.Request) +} From b9cbb9815b9d436f5a99f472ed7be4dc7fb2e62d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= Date: Thu, 30 Dec 2021 15:55:45 +0100 Subject: [PATCH 19/19] move serve from api. --- cmd/serve.go | 12 +- internal/api/hlslive.go | 94 --------------- internal/api/hlsproxy.go | 44 ------- internal/api/hlsvod.go | 161 ------------------------- internal/api/http.go | 141 ---------------------- internal/api/play.html | 31 ----- internal/api/router.go | 96 --------------- internal/main.go | 71 ----------- internal/{config => serve}/config.go | 68 ++++++++--- internal/serve/serve.go | 173 +++++++++++++++++++++++++++ internal/server/http.go | 15 +-- 11 files changed, 242 insertions(+), 664 deletions(-) delete mode 100644 internal/api/hlslive.go delete mode 100644 internal/api/hlsproxy.go delete mode 100644 internal/api/hlsvod.go delete mode 100644 internal/api/http.go delete mode 100644 internal/api/play.html delete mode 100644 internal/api/router.go delete mode 100644 internal/main.go rename internal/{config => serve}/config.go (60%) create mode 100644 internal/serve/serve.go diff --git a/cmd/serve.go b/cmd/serve.go index 71c8008..1a4ad3e 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -4,23 +4,25 @@ import ( "github.com/rs/zerolog/log" "github.com/spf13/cobra" - transcode "github.com/m1k1o/go-transcode/internal" + "github.com/m1k1o/go-transcode/internal/serve" ) func init() { + serve := serve.NewCommand() + command := &cobra.Command{ Use: "serve", Short: "serve transcode server", Long: `serve transcode server`, - Run: transcode.Service.ServeCommand, + Run: serve.Run, } onConfigLoad = append(onConfigLoad, func() { - transcode.Service.ServerConfig.Set() - transcode.Service.Preflight() + serve.Config.Set() + serve.Preflight() }) - if err := transcode.Service.ServerConfig.Init(command); err != nil { + if err := serve.Config.Init(command); err != nil { log.Panic().Err(err).Msg("unable to run serve command") } diff --git a/internal/api/hlslive.go b/internal/api/hlslive.go deleted file mode 100644 index 40bb494..0000000 --- a/internal/api/hlslive.go +++ /dev/null @@ -1,94 +0,0 @@ -package api - -import ( - "context" - _ "embed" - "fmt" - "net/http" - "os/exec" - - "github.com/go-chi/chi" - "github.com/rs/zerolog/log" - - "github.com/m1k1o/go-transcode/pkg/hlslive" -) - -var hlsManagers map[string]hlslive.Manager = make(map[string]hlslive.Manager) - -//go:embed play.html -var playHTML string - -func (a *ApiManagerCtx) HLS(r chi.Router) { - r.Get("/{profile}/{input}/index.m3u8", func(w http.ResponseWriter, r *http.Request) { - logger := log.With().Str("module", "hls").Logger() - - profile := chi.URLParam(r, "profile") - input := chi.URLParam(r, "input") - - if !resourceRegex.MatchString(profile) || !resourceRegex.MatchString(input) { - http.Error(w, "400 invalid parameters", http.StatusBadRequest) - return - } - - // check if stream exists - _, ok := a.config.Streams[input] - if !ok { - http.Error(w, "404 stream not found", http.StatusNotFound) - return - } - - // check if profile exists - profilePath, err := a.profilePath("hls", profile) - if err != nil { - logger.Warn().Err(err).Msg("profile path could not be found") - http.Error(w, "404 profile not found", http.StatusNotFound) - return - } - - ID := fmt.Sprintf("%s/%s", profile, input) - - manager, ok := hlsManagers[ID] - if !ok { - // create new manager - manager = hlslive.New(func() *exec.Cmd { - // get transcode cmd - cmd, err := a.transcodeStart(context.Background(), profilePath, input) - if err != nil { - logger.Error().Err(err).Msg("transcode could not be started") - } - - return cmd - }, nil) - - hlsManagers[ID] = manager - } - - manager.ServePlaylist(w, r) - }) - - r.Get("/{profile}/{input}/{file}.ts", func(w http.ResponseWriter, r *http.Request) { - profile := chi.URLParam(r, "profile") - input := chi.URLParam(r, "input") - file := chi.URLParam(r, "file") - - if !resourceRegex.MatchString(profile) || !resourceRegex.MatchString(input) || !resourceRegex.MatchString(file) { - http.Error(w, "400 invalid parameters", http.StatusBadRequest) - return - } - - ID := fmt.Sprintf("%s/%s", profile, input) - - manager, ok := hlsManagers[ID] - if !ok { - http.Error(w, "404 transcode not found", http.StatusNotFound) - return - } - - manager.ServeSegment(w, r) - }) - - r.Get("/{profile}/{input}/play.html", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "text/html") - _, _ = w.Write([]byte(playHTML)) - }) -} diff --git a/internal/api/hlsproxy.go b/internal/api/hlsproxy.go deleted file mode 100644 index 107fdc2..0000000 --- a/internal/api/hlsproxy.go +++ /dev/null @@ -1,44 +0,0 @@ -package api - -import ( - "net/http" - "strings" - - "github.com/go-chi/chi" - - "github.com/m1k1o/go-transcode/pkg/hlsproxy" -) - -const hlsProxyPerfix = "/hlsproxy/" - -var hlsProxyManagers map[string]hlsproxy.Manager = make(map[string]hlsproxy.Manager) - -func (a *ApiManagerCtx) HLSProxy(r chi.Router) { - r.Get(hlsProxyPerfix+"{sourceId}/*", func(w http.ResponseWriter, r *http.Request) { - ID := chi.URLParam(r, "sourceId") - - // check if stream exists - baseUrl, ok := a.config.HlsProxy[ID] - if !ok { - http.Error(w, "404 hls proxy source not found", http.StatusNotFound) - return - } - - manager, ok := hlsProxyManagers[ID] - if !ok { - // create new manager - manager = hlsproxy.New(&hlsproxy.Config{ - PlaylistBaseUrl: baseUrl, - PlaylistPathPrefix: hlsProxyPerfix + ID, - }) - hlsProxyManagers[ID] = manager - } - - // if this is playlist request - if strings.HasSuffix(r.URL.String(), ".m3u8") { - manager.ServePlaylist(w, r) - } else { - manager.ServeSegment(w, r) - } - }) -} diff --git a/internal/api/hlsvod.go b/internal/api/hlsvod.go deleted file mode 100644 index 6c4d318..0000000 --- a/internal/api/hlsvod.go +++ /dev/null @@ -1,161 +0,0 @@ -package api - -import ( - _ "embed" - "fmt" - "net/http" - "os" - "path" - "path/filepath" - "strings" - - "github.com/go-chi/chi" - "github.com/rs/zerolog/log" - - "github.com/m1k1o/go-transcode/pkg/hlsvod" -) - -var hlsVodManagers map[string]hlsvod.Manager = make(map[string]hlsvod.Manager) - -func (a *ApiManagerCtx) HlsVod(r chi.Router) { - r.Get("/vod/*", func(w http.ResponseWriter, r *http.Request) { - logger := log.With().Str("module", "hlsvod").Logger() - - // remove /vod/ from path - urlPath := r.URL.Path[5:] - - // get index of last slash from path - lastSlashIndex := strings.LastIndex(urlPath, "/") - if lastSlashIndex == -1 { - http.Error(w, "400 invalid parameters", http.StatusBadRequest) - return - } - - // everything after last slash is hls resource (playlist or segment) - hlsResource := urlPath[lastSlashIndex+1:] - // everything before last slash is vod media path - vodMediaPath := urlPath[:lastSlashIndex] - // use clean path - vodMediaPath = filepath.Clean(vodMediaPath) - vodMediaPath = path.Join(a.config.Vod.MediaDir, vodMediaPath) - - // serve master profile - if hlsResource == "index.m3u8" { - data, err := hlsvod.New(&hlsvod.Config{ - MediaPath: vodMediaPath, - VideoKeyframes: a.config.Vod.VideoKeyframes, - - Cache: a.config.Vod.Cache, - CacheDir: a.config.Vod.CacheDir, - - FFmpegBinary: a.config.Vod.FFmpegBinary, - FFprobeBinary: a.config.Vod.FFprobeBinary, - }).Preload(r.Context()) - - if err != nil { - logger.Warn().Err(err).Msg("unable to preload metadata") - http.Error(w, "500 unable to preload metadata", http.StatusInternalServerError) - return - } - - width, height := 0, 0 - if data.Video != nil { - width, height = data.Video.Width, data.Video.Height - } - - profiles := map[string]hlsvod.VideoProfile{} - for name, profile := range a.config.Vod.VideoProfiles { - if width != 0 && width < profile.Width && - height != 0 && height < profile.Height { - continue - } - - profiles[name] = hlsvod.VideoProfile{ - Width: profile.Width, - Height: profile.Height, - Bitrate: (profile.Bitrate + a.config.Vod.AudioProfile.Bitrate) / 100 * 105000, - } - } - - playlist := hlsvod.StreamsPlaylist(profiles, "%s.m3u8") - _, _ = w.Write([]byte(playlist)) - return - } - - // get profile name (everythinb before . or -) - profileID := strings.FieldsFunc(hlsResource, func(r rune) bool { - return r == '.' || r == '-' - })[0] - - // check if exists profile and fetch - profile, ok := a.config.Vod.VideoProfiles[profileID] - if !ok { - http.Error(w, "404 profile not found", http.StatusNotFound) - return - } - - ID := fmt.Sprintf("%s/%s", profileID, vodMediaPath) - manager, ok := hlsVodManagers[ID] - - logger.Info(). - Str("path", urlPath). - Str("hlsResource", hlsResource). - Str("vodMediaPath", vodMediaPath). - Msg("new hls vod request") - - // if manager was not found - if !ok { - // check if vod media path exists - if _, err := os.Stat(vodMediaPath); os.IsNotExist(err) { - http.Error(w, "404 vod not found", http.StatusNotFound) - return - } - - // create own transcoding directory - transcodeDir, err := os.MkdirTemp(a.config.Vod.TranscodeDir, fmt.Sprintf("vod-%s-*", profileID)) - if err != nil { - logger.Warn().Err(err).Msg("could not create temp dir") - http.Error(w, "500 could not create temp dir", http.StatusInternalServerError) - return - } - - // create new manager - manager = hlsvod.New(&hlsvod.Config{ - MediaPath: vodMediaPath, - TranscodeDir: transcodeDir, - SegmentNamePrefix: profileID, - - VideoProfile: &hlsvod.VideoProfile{ - Width: profile.Width, - Height: profile.Height, - Bitrate: profile.Bitrate, - }, - VideoKeyframes: a.config.Vod.VideoKeyframes, - AudioProfile: &hlsvod.AudioProfile{ - Bitrate: a.config.Vod.AudioProfile.Bitrate, - }, - - Cache: a.config.Vod.Cache, - CacheDir: a.config.Vod.CacheDir, - - FFmpegBinary: a.config.Vod.FFmpegBinary, - FFprobeBinary: a.config.Vod.FFprobeBinary, - }) - - hlsVodManagers[ID] = manager - - if err := manager.Start(); err != nil { - logger.Warn().Err(err).Msg("hls vod manager could not be started") - http.Error(w, "500 hls vod manager could not be started", http.StatusInternalServerError) - return - } - } - - // server playlist or segment - if hlsResource == profileID+".m3u8" { - manager.ServePlaylist(w, r) - } else { - manager.ServeSegment(w, r) - } - }) -} diff --git a/internal/api/http.go b/internal/api/http.go deleted file mode 100644 index 31fb3be..0000000 --- a/internal/api/http.go +++ /dev/null @@ -1,141 +0,0 @@ -package api - -import ( - "io" - "net/http" - "os/exec" - - "github.com/go-chi/chi" - "github.com/rs/zerolog/log" - - "github.com/m1k1o/go-transcode/internal/utils" -) - -func (a *ApiManagerCtx) Http(r chi.Router) { - r.Get("/test", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "video/mp2t") - logger := log.With(). - Str("path", r.URL.Path). - Str("module", "ffmpeg"). - Logger() - - // dummy input for testing purposes - file := a.config.AbsPath("profiles", "http-test.sh") - cmd := exec.CommandContext(r.Context(), file) - logger.Info().Msg("command startred") - - read, write := io.Pipe() - cmd.Stdout = write - cmd.Stderr = utils.LogWriter(logger) - - defer func() { - logger.Info().Msg("command stopped") - - read.Close() - write.Close() - }() - - go func() { - _ = cmd.Run() - }() - - _, _ = io.Copy(w, read) - }) - - r.Get("/{profile}/{input}", func(w http.ResponseWriter, r *http.Request) { - logger := log.With(). - Str("path", r.URL.Path). - Str("module", "ffmpeg"). - Logger() - - profile := chi.URLParam(r, "profile") - input := chi.URLParam(r, "input") - - // check if stream exists - _, ok := a.config.Streams[input] - if !ok { - http.Error(w, "404 stream not found", http.StatusNotFound) - return - } - - // check if profile exists - profilePath, err := a.profilePath("hls", profile) - if err != nil { - logger.Warn().Err(err).Msg("profile path could not be found") - http.Error(w, "404 profile not found", http.StatusNotFound) - return - } - - cmd, err := a.transcodeStart(r.Context(), profilePath, input) - if err != nil { - logger.Warn().Err(err).Msg("transcode could not be started") - http.Error(w, "500 not available", http.StatusInternalServerError) - return - } - - logger.Info().Msg("command started") - w.Header().Set("Content-Type", "video/mp2t") - - read, write := io.Pipe() - cmd.Stdout = write - cmd.Stderr = utils.LogWriter(logger) - - defer func() { - logger.Info().Msg("command stopped") - - read.Close() - write.Close() - }() - - go func() { - _ = cmd.Run() - }() - - _, _ = io.Copy(w, read) - }) - - // buffered http streaming (alternative to prervious type) - r.Get("/{profile}/{input}/buf", func(w http.ResponseWriter, r *http.Request) { - logger := log.With(). - Str("path", r.URL.Path). - Str("module", "ffmpeg"). - Logger() - - profile := chi.URLParam(r, "profile") - input := chi.URLParam(r, "input") - - // check if stream exists - _, ok := a.config.Streams[input] - if !ok { - http.Error(w, "404 stream not found", http.StatusNotFound) - return - } - - // check if profile exists - profilePath, err := a.profilePath("hls", profile) - if err != nil { - logger.Warn().Err(err).Msg("profile path could not be found") - http.Error(w, "404 profile not found", http.StatusNotFound) - return - } - - cmd, err := a.transcodeStart(r.Context(), profilePath, input) - if err != nil { - logger.Warn().Err(err).Msg("transcode could not be started") - http.Error(w, "500 not available", http.StatusInternalServerError) - return - } - - logger.Info().Msg("command started") - w.Header().Set("Content-Type", "video/mp2t") - - read, write := io.Pipe() - cmd.Stdout = write - cmd.Stderr = utils.LogWriter(logger) - - go utils.IOPipeToHTTP(w, read) - _ = cmd.Run() - write.Close() - logger.Info().Msg("command stopped") - }) -} diff --git a/internal/api/play.html b/internal/api/play.html deleted file mode 100644 index 71a7da3..0000000 --- a/internal/api/play.html +++ /dev/null @@ -1,31 +0,0 @@ - - - - - Player - - - - - - - - - - - - - - diff --git a/internal/api/router.go b/internal/api/router.go deleted file mode 100644 index bdff634..0000000 --- a/internal/api/router.go +++ /dev/null @@ -1,96 +0,0 @@ -package api - -import ( - "context" - "fmt" - "net/http" - "os" - "os/exec" - "path" - "regexp" - - "github.com/go-chi/chi" - "github.com/rs/zerolog/log" - - "github.com/m1k1o/go-transcode/internal/config" -) - -var resourceRegex = regexp.MustCompile(`^[0-9A-Za-z_-]+$`) - -type ApiManagerCtx struct { - config *config.Server -} - -func New(config *config.Server) *ApiManagerCtx { - return &ApiManagerCtx{ - config: config, - } -} - -func (manager *ApiManagerCtx) Start() { -} - -func (manager *ApiManagerCtx) Shutdown() error { - // stop all hls managers - for _, hls := range hlsManagers { - hls.Stop() - } - - // stop all hls vod managers - for _, hls := range hlsVodManagers { - hls.Stop() - } - - // shutdown all hls proxy managers - for _, hls := range hlsProxyManagers { - hls.Shutdown() - } - - return nil -} - -func (a *ApiManagerCtx) Mount(r *chi.Mux) { - r.Get("/ping", func(w http.ResponseWriter, r *http.Request) { - //nolint - _, _ = w.Write([]byte("pong")) - }) - - if a.config.Vod.MediaDir != "" { - r.Group(a.HlsVod) - log.Info().Str("vod-dir", a.config.Vod.MediaDir).Msg("static file transcoding is active") - } - - if len(a.config.HlsProxy) > 0 { - r.Group(a.HLSProxy) - log.Info().Interface("hls-proxy", a.config.HlsProxy).Msg("hls proxy is active") - } - - r.Group(a.HLS) - r.Group(a.Http) -} - -func (a *ApiManagerCtx) profilePath(folder string, profile string) (string, error) { - // [profiles]/hls,http/[profile].sh - // [profiles] defaults to [basedir]/profiles - - if !resourceRegex.MatchString(profile) { - return "", fmt.Errorf("invalid profile path") - } - - profilePath := path.Join(a.config.Profiles, folder, fmt.Sprintf("%s.sh", profile)) - if _, err := os.Stat(profilePath); err != nil { - return "", err - } - - return profilePath, nil -} - -func (a *ApiManagerCtx) transcodeStart(ctx context.Context, profilePath string, input string) (*exec.Cmd, error) { - url, ok := a.config.Streams[input] - if !ok { - return nil, fmt.Errorf("stream not found") - } - - log.Info().Str("profilePath", profilePath).Str("url", url).Msg("command startred") - return exec.CommandContext(ctx, profilePath, url), nil -} diff --git a/internal/main.go b/internal/main.go deleted file mode 100644 index a5f617f..0000000 --- a/internal/main.go +++ /dev/null @@ -1,71 +0,0 @@ -package transcode - -import ( - "os" - "os/signal" - - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" - "github.com/spf13/cobra" - - "github.com/m1k1o/go-transcode/internal/api" - "github.com/m1k1o/go-transcode/internal/config" - "github.com/m1k1o/go-transcode/internal/server" -) - -var Service *Main - -func init() { - Service = &Main{ - ServerConfig: &config.Server{}, - } -} - -type Main struct { - ServerConfig *config.Server - - logger zerolog.Logger - apiManager *api.ApiManagerCtx - serverManager *server.ServerManagerCtx -} - -func (main *Main) Preflight() { - main.logger = log.With().Str("service", "main").Logger() -} - -func (main *Main) Start() { - config := main.ServerConfig - - main.apiManager = api.New(config) - main.apiManager.Start() - - main.serverManager = server.New(&config.Server) - main.serverManager.Mount(main.apiManager.Mount) - main.serverManager.Start() - - main.logger.Info().Msgf("serving streams from basedir %s: %s", config.BaseDir, config.Streams) -} - -func (main *Main) Shutdown() { - var err error - - err = main.serverManager.Shutdown() - main.logger.Err(err).Msg("http manager shutdown") - - err = main.apiManager.Shutdown() - main.logger.Err(err).Msg("api manager shutdown") -} - -func (main *Main) ServeCommand(cmd *cobra.Command, args []string) { - main.logger.Info().Msg("starting main server") - main.Start() - main.logger.Info().Msg("main ready") - - quit := make(chan os.Signal, 1) - signal.Notify(quit, os.Interrupt) - sig := <-quit - - main.logger.Warn().Msgf("received %s, attempting graceful shutdown", sig) - main.Shutdown() - main.logger.Info().Msg("shutdown complete") -} diff --git a/internal/config/config.go b/internal/serve/config.go similarity index 60% rename from internal/config/config.go rename to internal/serve/config.go index 4a5c571..fc58b01 100644 --- a/internal/config/config.go +++ b/internal/serve/config.go @@ -1,12 +1,9 @@ -package config +package serve import ( - "fmt" "os" "path" - "github.com/m1k1o/go-transcode/internal/server" - "github.com/spf13/cobra" "github.com/spf13/viper" ) @@ -33,8 +30,15 @@ type VOD struct { FFprobeBinary string `mapstructure:"ffprobe-binary"` } -type Server struct { - Server server.Config +type Config struct { + Debug bool + PProf bool + + Cert string + Key string + Bind string + Static string + Proxy bool BaseDir string `yaml:"basedir,omitempty"` Streams map[string]string `yaml:"streams"` @@ -44,9 +48,39 @@ type Server struct { HlsProxy map[string]string } -func (s *Server) Init(cmd *cobra.Command) error { - // TODO: Scope - if err := s.Server.Init(cmd); err != nil { +func (Config) Init(cmd *cobra.Command) error { + cmd.PersistentFlags().BoolP("debug", "d", false, "enable debug mode") + if err := viper.BindPFlag("debug", cmd.PersistentFlags().Lookup("debug")); err != nil { + return err + } + + cmd.PersistentFlags().Bool("pprof", false, "enable pprof endpoint available at /debug/pprof") + if err := viper.BindPFlag("pprof", cmd.PersistentFlags().Lookup("pprof")); err != nil { + return err + } + + cmd.PersistentFlags().String("bind", "127.0.0.1:8080", "address/port/socket to serve neko") + if err := viper.BindPFlag("bind", cmd.PersistentFlags().Lookup("bind")); err != nil { + return err + } + + cmd.PersistentFlags().String("cert", "", "path to the SSL cert used to secure the neko server") + if err := viper.BindPFlag("cert", cmd.PersistentFlags().Lookup("cert")); err != nil { + return err + } + + cmd.PersistentFlags().String("key", "", "path to the SSL key used to secure the neko server") + if err := viper.BindPFlag("key", cmd.PersistentFlags().Lookup("key")); err != nil { + return err + } + + cmd.PersistentFlags().String("static", "", "path to neko client files to serve") + if err := viper.BindPFlag("static", cmd.PersistentFlags().Lookup("static")); err != nil { + return err + } + + cmd.PersistentFlags().Bool("proxy", false, "allow reverse proxies") + if err := viper.BindPFlag("proxy", cmd.PersistentFlags().Lookup("proxy")); err != nil { return err } @@ -63,8 +97,15 @@ func (s *Server) Init(cmd *cobra.Command) error { return nil } -func (s *Server) Set() { - s.Server.Set() +func (s *Config) Set() { + s.Debug = viper.GetBool("debug") + s.PProf = viper.GetBool("pprof") + + s.Cert = viper.GetString("cert") + s.Key = viper.GetString("key") + s.Bind = viper.GetString("bind") + s.Static = viper.GetString("static") + s.Proxy = viper.GetBool("proxy") s.BaseDir = viper.GetString("basedir") if s.BaseDir == "" { @@ -78,8 +119,7 @@ func (s *Server) Set() { s.Profiles = viper.GetString("profiles") if s.Profiles == "" { - // TODO: issue #5 - s.Profiles = fmt.Sprintf("%s/profiles", s.BaseDir) + s.Profiles = path.Join(s.BaseDir, "profiles") } s.Streams = viper.GetStringMapString("streams") @@ -130,7 +170,7 @@ func (s *Server) Set() { s.HlsProxy = viper.GetStringMapString("hls-proxy") } -func (s *Server) AbsPath(elem ...string) string { +func (s *Config) AbsPath(elem ...string) string { // prepend base path elem = append([]string{s.BaseDir}, elem...) return path.Join(elem...) diff --git a/internal/serve/serve.go b/internal/serve/serve.go new file mode 100644 index 0000000..1662ab6 --- /dev/null +++ b/internal/serve/serve.go @@ -0,0 +1,173 @@ +package serve + +import ( + "os" + "os/signal" + "path" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "github.com/spf13/cobra" + + "github.com/m1k1o/go-transcode/internal/server" + "github.com/m1k1o/go-transcode/modules/hlslive" + "github.com/m1k1o/go-transcode/modules/hlsproxy" + "github.com/m1k1o/go-transcode/modules/hlsvod" + "github.com/m1k1o/go-transcode/modules/httpstream" + "github.com/m1k1o/go-transcode/modules/player" + hlsVodPkg "github.com/m1k1o/go-transcode/pkg/hlsvod" +) + +func NewCommand() *Main { + return &Main{ + Config: &Config{}, + } +} + +type Main struct { + Config *Config + + logger zerolog.Logger + server *server.ServerManagerCtx + hlsLive *hlslive.ModuleCtx + hlsProxy *hlsproxy.ModuleCtx + hlsVod *hlsvod.ModuleCtx + httpStream *httpstream.ModuleCtx + player *player.ModuleCtx +} + +func (main *Main) Preflight() { + main.logger = log.With().Str("service", "main").Logger() +} + +func (main *Main) start() { + config := main.Config + + main.server = server.New(&server.Config{ + Bind: config.Bind, + Static: config.Static, + SSLCert: config.Cert, + SSLKey: config.Key, + Proxy: config.Proxy, + PProf: config.PProf, + }) + + /* + r.Get("/ping", func(w http.ResponseWriter, r *http.Request) { + //nolint + _, _ = w.Write([]byte("pong")) + }) + */ + + if config.Vod.MediaDir != "" { + videoProfiles := map[string]hlsVodPkg.VideoProfile{} + for key, prof := range config.Vod.VideoProfiles { + videoProfiles[key] = hlsVodPkg.VideoProfile{ + Width: prof.Width, + Height: prof.Height, + Bitrate: prof.Bitrate, + } + } + + main.hlsVod = hlsvod.New("/vod/", &hlsvod.Config{ + MediaBasePath: config.Vod.MediaDir, + TranscodeDir: config.Vod.TranscodeDir, + VideoProfiles: videoProfiles, + MasterPlaylistName: "index.m3u8", + + Config: hlsVodPkg.Config{ + VideoKeyframes: config.Vod.VideoKeyframes, + AudioProfile: &hlsVodPkg.AudioProfile{ + Bitrate: config.Vod.AudioProfile.Bitrate, + }, + + Cache: config.Vod.Cache, + CacheDir: config.Vod.CacheDir, + + FFmpegBinary: config.Vod.FFmpegBinary, + FFprobeBinary: config.Vod.FFprobeBinary, + }, + }) + + main.server.Handle("/vod/", main.hlsVod) + main.logger.Info().Str("vod-dir", config.Vod.MediaDir).Msg("static file transcoding is active") + } + + if len(config.HlsProxy) > 0 { + main.hlsProxy = hlsproxy.New("/hlsproxy/", &hlsproxy.Config{ + Sources: config.HlsProxy, + }) + main.server.Handle("/hlsproxy/", main.hlsProxy) + log.Info().Interface("hls-proxy", config.HlsProxy).Msg("hls proxy is active") + } + + main.hlsLive = hlslive.New("/", &hlslive.Config{ + Sources: config.Streams, + ProfilesPath: path.Join(config.Profiles, "hls"), + PlaylistName: "index.m3u8", + // TOOD: Profile ends with .sh + }) + main.server.Handle("/", main.hlsLive) + main.logger.Info().Msg("hlsLive registered") + + // TODO: Match correct URLs. + main.httpStream = httpstream.New("/", &httpstream.Config{ + Sources: config.Streams, + ProfilesPath: path.Join(config.Profiles, "http"), + UseBufCopy: false, + }) + main.server.Handle("/", main.httpStream) + main.logger.Info().Msg("httpStream registered") + + // TODO: Match correct URLs. + main.player = player.New("/player/", &player.Config{}) + main.server.Handle("/player/", main.player) + main.logger.Info().Msg("player registered") + + main.server.Start() + main.logger.Info().Msgf("serving streams from basedir %s: %s", config.BaseDir, config.Streams) +} + +func (main *Main) shutdown() { + err := main.server.Shutdown() + main.logger.Err(err).Msg("http manager shutdown") + + if main.hlsVod != nil { + main.hlsVod.Shutdown() + main.logger.Info().Msg("hlsVod shutdown") + } + + if main.hlsProxy != nil { + main.hlsProxy.Shutdown() + main.logger.Info().Msg("hlsProxy shutdown") + } + + if main.hlsLive != nil { + main.hlsLive.Shutdown() + main.logger.Info().Msg("hlsLive shutdown") + } + + if main.httpStream != nil { + main.httpStream.Shutdown() + main.logger.Info().Msg("httpStream shutdown") + } + + if main.player != nil { + main.player.Shutdown() + main.logger.Info().Msg("player shutdown") + } +} + +func (main *Main) Run(cmd *cobra.Command, args []string) { + main.logger.Info().Msg("starting main server") + main.start() + main.logger.Info().Msg("main ready") + + quit := make(chan os.Signal, 1) + signal.Notify(quit, os.Interrupt) + sig := <-quit + + main.logger.Warn().Msgf("received %s, attempting graceful shutdown", sig) + main.shutdown() + main.logger.Info().Msg("shutdown complete") +} diff --git a/internal/server/http.go b/internal/server/http.go index d683653..4c80b0e 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -52,11 +52,8 @@ func New(config *Config) *ServerManagerCtx { logger.Info().Msgf("with pprof endpoint at %s", pprofPath) } - // use custom 404 - router.NotFound(func(w http.ResponseWriter, r *http.Request) { - //nolint - _, _ = w.Write([]byte("404")) - }) + // we could use custom 404 + router.NotFound(http.NotFound) return &ServerManagerCtx{ logger: logger, @@ -77,14 +74,14 @@ func (s *ServerManagerCtx) Start() { s.logger.Panic().Err(err).Msg("unable to start https server") } }() - s.logger.Info().Msgf("https listening on %s", s.server.Addr) + s.logger.Info().Msgf("https server listening on %s", s.server.Addr) } else { go func() { if err := s.server.ListenAndServe(); err != http.ErrServerClosed { s.logger.Panic().Err(err).Msg("unable to start http server") } }() - s.logger.Info().Msgf("http listening on %s", s.server.Addr) + s.logger.Info().Msgf("http server listening on %s", s.server.Addr) } } @@ -98,3 +95,7 @@ func (s *ServerManagerCtx) Shutdown() error { func (s *ServerManagerCtx) Mount(fn func(r *chi.Mux)) { fn(s.router) } + +func (s *ServerManagerCtx) Handle(pattern string, fn http.Handler) { + s.router.Handle(pattern, fn) +}