diff --git a/Dockerfile b/Dockerfile index dd70cf7d..3a1d02c0 100644 --- a/Dockerfile +++ b/Dockerfile @@ -13,24 +13,13 @@ RUN go mod download # Copy the source code from the current directory to the Working Directory inside the container COPY . . -# Build the Go app -ENV CGO_ENABLED=1 - -# hadolint ignore=DL3018 -RUN apk add --no-cache gcc musl-dev \ - && go build -ldflags='-s -w -extldflags "-static"' -o main . - -# Run tests -RUN go test ./... +RUN go build -ldflags='-s -w' -o main . \ + && go test ./... #################### # Start a new stage from scratch -FROM alpine:3.19.1 - -# Install Redis -# hadolint ignore=DL3018 -RUN apk --no-cache add redis +FROM scratch # Copy the built Go binary from the previous stage COPY --from=build /app/main /gomain @@ -38,11 +27,5 @@ COPY --from=build /app/main /gomain # Expose ports for Go application and Redis EXPOSE 8080 -# Copy the entrypoint script -COPY entrypoint.sh / - -# Set execute permission on the entrypoint script -RUN chmod +x /entrypoint.sh - # Run the entrypoint script -CMD ["/entrypoint.sh"] +CMD ["/gomain"] diff --git a/database/database_test.go b/database/database_test.go index 40665b75..de3f785b 100644 --- a/database/database_test.go +++ b/database/database_test.go @@ -13,20 +13,24 @@ func TestSaveAndLoadFromSQLite(t *testing.T) { // Test LoadFromSQLite with existing data in the database expected := []StreamInfo{{ + DbId: 1, Title: "stream1", TvgID: "test1", LogoURL: "http://test.com/image.png", Group: "test", URLs: []StreamURL{{ + DbId: 1, Content: "testing", M3UIndex: 1, }}, }, { + DbId: 2, Title: "stream2", TvgID: "test2", LogoURL: "http://test2.com/image.png", Group: "test2", URLs: []StreamURL{{ + DbId: 2, Content: "testing2", M3UIndex: 2, }}, @@ -57,12 +61,18 @@ func TestSaveAndLoadFromSQLite(t *testing.T) { t.Errorf("DeleteStreamByTitle returned error: %v", err) } + err = DeleteStreamURL(db, expected[0].URLs[0].DbId) + if err != nil { + t.Errorf("DeleteStreamURL returned error: %v", err) + } + result, err = GetStreams(db) if err != nil { t.Errorf("GetStreams returned error: %v", err) } expected = expected[:1] + expected[0].URLs = make([]StreamURL, 0) if len(result) != len(expected) { t.Errorf("GetStreams returned %+v, expected %+v", result, expected) @@ -94,3 +104,40 @@ func streamInfoEqual(a, b StreamInfo) bool { return true } + +func TestConcurrency(t *testing.T) { + // Initialize the in-memory database + err := InitializeMemDB() + if err != nil { + t.Errorf("Error initializing in-memory database: %v", err) + } + + // Test IncrementConcurrency and GetConcurrency + m3uIndex := 1 + err = IncrementConcurrency(m3uIndex) + if err != nil { + t.Errorf("Error incrementing concurrency: %v", err) + } + + count, err := GetConcurrency(m3uIndex) + if err != nil { + t.Errorf("Error getting concurrency: %v", err) + } + if count != 1 { + t.Errorf("Expected concurrency count to be 1, got %d", count) + } + + // Test DecrementConcurrency + err = DecrementConcurrency(m3uIndex) + if err != nil { + t.Errorf("Error decrementing concurrency: %v", err) + } + + count, err = GetConcurrency(m3uIndex) + if err != nil { + t.Errorf("Error getting concurrency: %v", err) + } + if count != 0 { + t.Errorf("Expected concurrency count to be 0, got %d", count) + } +} diff --git a/database/db.go b/database/db.go index 999c29db..931a6d72 100644 --- a/database/db.go +++ b/database/db.go @@ -6,13 +6,75 @@ import ( "log" "os" "path/filepath" + "strings" "sync" - _ "github.com/mattn/go-sqlite3" + _ "modernc.org/sqlite" ) var mutex sync.Mutex +func checkAndUpdateTable(db *sql.DB, tableName string, expectedColumns map[string]string, foreignKeys map[string]string) error { + // Check table existence + var count int + err := db.QueryRow("SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?", tableName).Scan(&count) + if err != nil { + return fmt.Errorf("error checking %s table: %v\n", tableName, err) + } + + if count > 0 { + // Table exists, check structure + rows, err := db.Query("PRAGMA table_info(" + tableName + ")") + if err != nil { + return fmt.Errorf("error retrieving table info for %s: %v\n", tableName, err) + } + defer rows.Close() + + existingColumns := make(map[string]string) + for rows.Next() { + var cid int + var name, _type string + var notnull, pk int + var dflt_value interface{} + err = rows.Scan(&cid, &name, &_type, ¬null, &dflt_value, &pk) + if err != nil { + return fmt.Errorf("error scanning row: %v\n", err) + } + existingColumns[name] = _type + } + + // Check if column names and types match expected structure + for col, dataType := range expectedColumns { + if existingType, ok := existingColumns[col]; !ok || existingType != dataType { + // Table structure doesn't match, drop and recreate + _, err = db.Exec("DROP TABLE " + tableName) + if err != nil { + return fmt.Errorf("error dropping %s table: %v\n", tableName, err) + } + break + } + } + } + + // Create table if not exists or if dropped due to structure mismatch + query := "CREATE TABLE IF NOT EXISTS " + tableName + " (" + for col, dataType := range expectedColumns { + query += col + " " + dataType + "," + } + if len(foreignKeys) > 0 { + for fk := range foreignKeys { + query += " " + fk + "," + } + } + query = strings.TrimSuffix(query, ",") + ")" + _, err = db.Exec(query) + if err != nil { + return fmt.Errorf("error creating %s table: %v\n", tableName, err) + } + + return nil +} + func InitializeSQLite(name string) (db *sql.DB, err error) { mutex.Lock() defer mutex.Unlock() @@ -38,37 +100,32 @@ func InitializeSQLite(name string) (db *sql.DB, err error) { } file.Close() - db, err = sql.Open("sqlite3", filename) + db, err = sql.Open("sqlite", filename) if err != nil { return nil, fmt.Errorf("error opening SQLite database: %v\n", err) } - // Create table if not exists - _, err = db.Exec(` - CREATE TABLE IF NOT EXISTS streams ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - title TEXT UNIQUE, - tvg_id TEXT, - logo_url TEXT, - group_name TEXT - ) - `) - if err != nil { - return nil, fmt.Errorf("error creating table: %v\n", err) - } - - _, err = db.Exec(` - CREATE TABLE IF NOT EXISTS stream_urls ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - stream_id INTEGER, - content TEXT, - m3u_index INTEGER, - max_concurrency INTEGER DEFAULT 1, - FOREIGN KEY(stream_id) REFERENCES streams(id) - ) - `) - if err != nil { - return nil, fmt.Errorf("error creating table: %v\n", err) + // Check and update 'streams' table + if err := checkAndUpdateTable(db, "streams", map[string]string{ + "id": "INTEGER PRIMARY KEY AUTOINCREMENT", + "title": "TEXT UNIQUE", + "tvg_id": "TEXT", + "logo_url": "TEXT", + "group_name": "TEXT", + }, nil); err != nil { + return nil, err + } + + // Check and update 'stream_urls' table + if err := checkAndUpdateTable(db, "stream_urls", map[string]string{ + "id": "INTEGER PRIMARY KEY AUTOINCREMENT", + "stream_id": "INTEGER", + "content": "TEXT", + "m3u_index": "INTEGER", + }, map[string]string{ + "FOREIGN KEY(stream_id) REFERENCES streams(id)": "", + }); err != nil { + return nil, err } return @@ -207,13 +264,13 @@ func InsertStreamUrl(db *sql.DB, id int64, url StreamURL) (i int64, err error) { } }() - urlStmt, err := tx.Prepare("INSERT INTO stream_urls(stream_id, content, m3u_index, max_concurrency) VALUES(?, ?, ?, ?)") + urlStmt, err := tx.Prepare("INSERT INTO stream_urls(stream_id, content, m3u_index) VALUES(?, ?, ?)") if err != nil { return -1, fmt.Errorf("error preparing statement: %v", err) } defer urlStmt.Close() - res, err := urlStmt.Exec(id, url.Content, url.M3UIndex, url.MaxConcurrency) + res, err := urlStmt.Exec(id, url.Content, url.M3UIndex) if err != nil { return -1, fmt.Errorf("error inserting stream URL: %v", err) } @@ -313,7 +370,7 @@ func GetStreamByTitle(db *sql.DB, title string) (s StreamInfo, err error) { return s, fmt.Errorf("error scanning stream: %v", err) } - urlRows, err := db.Query("SELECT id, content, m3u_index, max_concurrency FROM stream_urls WHERE stream_id = ?", s.DbId) + urlRows, err := db.Query("SELECT id, content, m3u_index FROM stream_urls WHERE stream_id = ?", s.DbId) if err != nil { return s, fmt.Errorf("error querying stream URLs: %v", err) } @@ -322,7 +379,7 @@ func GetStreamByTitle(db *sql.DB, title string) (s StreamInfo, err error) { var urls []StreamURL for urlRows.Next() { var u StreamURL - err := urlRows.Scan(&u.DbId, &u.Content, &u.M3UIndex, &u.MaxConcurrency) + err := urlRows.Scan(&u.DbId, &u.Content, &u.M3UIndex) if err != nil { return s, fmt.Errorf("error scanning stream URL: %v", err) } @@ -349,14 +406,14 @@ func GetStreamUrlByUrlAndIndex(db *sql.DB, url string, m3u_index int) (s StreamU mutex.Lock() defer mutex.Unlock() - rows, err := db.Query("SELECT id, content, m3u_index, max_concurrency FROM stream_urls WHERE content = ? AND m3u_index = ?", url, m3u_index) + rows, err := db.Query("SELECT id, content, m3u_index FROM stream_urls WHERE content = ? AND m3u_index = ?", url, m3u_index) if err != nil { return s, fmt.Errorf("error querying streams: %v", err) } defer rows.Close() for rows.Next() { - err = rows.Scan(&s.DbId, &s.Content, &s.M3UIndex, &s.MaxConcurrency) + err = rows.Scan(&s.DbId, &s.Content, &s.M3UIndex) if err != nil { return s, fmt.Errorf("error scanning stream: %v", err) } @@ -387,7 +444,7 @@ func GetStreams(db *sql.DB) ([]StreamInfo, error) { return nil, fmt.Errorf("error scanning stream: %v", err) } - urlRows, err := db.Query("SELECT id, content, m3u_index, max_concurrency FROM stream_urls WHERE stream_id = ?", s.DbId) + urlRows, err := db.Query("SELECT id, content, m3u_index FROM stream_urls WHERE stream_id = ?", s.DbId) if err != nil { return nil, fmt.Errorf("error querying stream URLs: %v", err) } @@ -396,7 +453,7 @@ func GetStreams(db *sql.DB) ([]StreamInfo, error) { var urls []StreamURL for urlRows.Next() { var u StreamURL - err := urlRows.Scan(&u.DbId, &u.Content, &u.M3UIndex, &u.MaxConcurrency) + err := urlRows.Scan(&u.DbId, &u.Content, &u.M3UIndex) if err != nil { return nil, fmt.Errorf("error scanning stream URL: %v", err) } diff --git a/database/memdb.go b/database/memdb.go new file mode 100644 index 00000000..21f26420 --- /dev/null +++ b/database/memdb.go @@ -0,0 +1,108 @@ +package database + +import ( + "github.com/hashicorp/go-memdb" +) + +var memDB *memdb.MemDB + +// Concurrency represents the concurrency count for a specific m3uIndex +type Concurrency struct { + M3UIndex int + Count int +} + +// InitializeMemDB initializes the in-memory database +func InitializeMemDB() error { + // Create the DB schema + schema := &memdb.DBSchema{ + Tables: map[string]*memdb.TableSchema{ + "concurrency": { + Name: "concurrency", + Indexes: map[string]*memdb.IndexSchema{ + "id": { + Name: "id", + Unique: true, + Indexer: &memdb.IntFieldIndex{Field: "M3UIndex"}, + }, + }, + }, + }, + } + + // Create a new data base + db, err := memdb.NewMemDB(schema) + if err != nil { + return err + } + + memDB = db + return nil +} + +// GetConcurrency retrieves the concurrency count for the given m3uIndex +func GetConcurrency(m3uIndex int) (int, error) { + txn := memDB.Txn(false) + defer txn.Abort() + + raw, err := txn.First("concurrency", "id", m3uIndex) + if err != nil { + return 0, err + } + + if raw == nil { + return 0, nil // Key does not exist + } + + return raw.(*Concurrency).Count, nil +} + +// IncrementConcurrency increments the concurrency count for the given m3uIndex +func IncrementConcurrency(m3uIndex int) error { + txn := memDB.Txn(true) + defer txn.Commit() + + raw, err := txn.First("concurrency", "id", m3uIndex) + if err != nil { + return err + } + + var count int + if raw != nil { + count = raw.(*Concurrency).Count + } + + count++ + + err = txn.Insert("concurrency", &Concurrency{M3UIndex: m3uIndex, Count: count}) + if err != nil { + return err + } + + return nil +} + +// DecrementConcurrency decrements the concurrency count for the given m3uIndex +func DecrementConcurrency(m3uIndex int) error { + txn := memDB.Txn(true) + defer txn.Commit() + + raw, err := txn.First("concurrency", "id", m3uIndex) + if err != nil { + return err + } + + var count int + if raw != nil { + count = raw.(*Concurrency).Count + } + + count-- + + err = txn.Insert("concurrency", &Concurrency{M3UIndex: m3uIndex, Count: count}) + if err != nil { + return err + } + + return nil +} diff --git a/database/redis.go b/database/redis.go deleted file mode 100644 index abb76b95..00000000 --- a/database/redis.go +++ /dev/null @@ -1,23 +0,0 @@ -package database - -import ( - "sync" - - "github.com/redis/go-redis/v9" -) - -func InitializeRedis() *redis.Client { - var redisClient *redis.Client - var redisOnce sync.Once - - // Initialize Redis client - redisOnce.Do(func() { - redisClient = redis.NewClient(&redis.Options{ - Addr: "localhost:6379", // Change this to your Redis server address - Password: "", // No password set - DB: 0, // Use default DB - }) - }) - - return redisClient -} diff --git a/database/types.go b/database/types.go index 9587c4f6..2cd53535 100644 --- a/database/types.go +++ b/database/types.go @@ -10,8 +10,7 @@ type StreamInfo struct { } type StreamURL struct { - DbId int64 - Content string - M3UIndex int - MaxConcurrency int + DbId int64 + Content string + M3UIndex int } diff --git a/entrypoint.sh b/entrypoint.sh deleted file mode 100644 index d2c81156..00000000 --- a/entrypoint.sh +++ /dev/null @@ -1,29 +0,0 @@ -#!/bin/sh - -# Define function to gracefully stop services -stop_services() { - echo "Stopping services..." - # Stop the Go application gracefully - kill -TERM "$gomain_pid" 2>/dev/null - # Stop the Redis server gracefully - redis-cli shutdown - exit 0 -} - -# Trap SIGTERM signal to stop services gracefully -trap stop_services TERM - -# Start Redis server -redis-server --daemonize yes &> /dev/null & - -# Wait until Redis is ready -while ! redis-cli ping &>/dev/null; do - sleep 0.1 -done - -# Start Go application and redirect its stdout to the container's stdout -/gomain & -gomain_pid=$! - -# Wait for SIGTERM signal to stop services gracefully -wait "$gomain_pid" diff --git a/go.mod b/go.mod index 7bf34514..6c936f43 100644 --- a/go.mod +++ b/go.mod @@ -2,11 +2,26 @@ module m3u-stream-merger go 1.21.5 -require github.com/mattn/go-sqlite3 v1.14.22 +require ( + github.com/hashicorp/go-memdb v1.3.4 + modernc.org/sqlite v1.29.2 +) require ( - github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/go-redis/redis/v8 v8.11.5 // indirect - github.com/redis/go-redis/v9 v9.5.1 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/google/uuid v1.3.0 // indirect + github.com/hashicorp/go-immutable-radix v1.3.0 // indirect + github.com/hashicorp/golang-lru v0.5.4 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect + github.com/mattn/go-isatty v0.0.16 // indirect + github.com/mattn/go-sqlite3 v1.14.22 // indirect + github.com/ncruces/go-strftime v0.1.9 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + golang.org/x/sys v0.16.0 // indirect + modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 // indirect + modernc.org/libc v1.41.0 // indirect + modernc.org/mathutil v1.6.0 // indirect + modernc.org/memory v1.7.2 // indirect + modernc.org/strutil v1.2.0 // indirect + modernc.org/token v1.1.0 // indirect ) diff --git a/go.sum b/go.sum index 4ca790b0..b0390b99 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,48 @@ -github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= -github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= -github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= -github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26 h1:Xim43kblpZXfIBQsbuBVKCudVG457BR2GZFIz3uw3hQ= +github.com/google/pprof v0.0.0-20221118152302-e6195bd50e26/go.mod h1:dDKJzRmX4S37WGHujM7tX//fmj1uioxKzKxz3lo4HJo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/go-immutable-radix v1.3.0 h1:8exGP7ego3OmkfksihtSouGMZ+hQrhxx+FVELeXpVPE= +github.com/hashicorp/go-immutable-radix v1.3.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-memdb v1.3.4 h1:XSL3NR682X/cVk2IeV0d70N4DZ9ljI885xAEU8IoK3c= +github.com/hashicorp/go-memdb v1.3.4/go.mod h1:uBTr1oQbtuMgd1SSGoR8YV27eT3sBHbYiNm53bMpgSg= +github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= +github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= -github.com/redis/go-redis/v9 v9.5.1 h1:H1X4D3yHPaYrkL5X06Wh6xNVM/pX0Ft4RV0vMGvLBh8= -github.com/redis/go-redis/v9 v9.5.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0= +golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/tools v0.17.0 h1:FvmRgNOcs3kOa+T20R1uhfP9F6HgG2mfxDv1vrx1Htc= +golang.org/x/tools v0.17.0/go.mod h1:xsh6VxdV005rRVaS6SSAf9oiAqljS7UZUacMZ8Bnsps= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6 h1:5D53IMaUuA5InSeMu9eJtlQXS2NxAhyWQvkKEgXZhHI= +modernc.org/gc/v3 v3.0.0-20240107210532-573471604cb6/go.mod h1:Qz0X07sNOR1jWYCrJMEnbW/X55x206Q7Vt4mz6/wHp4= +modernc.org/libc v1.41.0 h1:g9YAc6BkKlgORsUWj+JwqoB1wU3o4DE3bM3yvA3k+Gk= +modernc.org/libc v1.41.0/go.mod h1:w0eszPsiXoOnoMJgrXjglgLuDy/bt5RR4y3QzUUeodY= +modernc.org/mathutil v1.6.0 h1:fRe9+AmYlaej+64JsEEhoWuAYBkOtQiMEU7n/XgfYi4= +modernc.org/mathutil v1.6.0/go.mod h1:Ui5Q9q1TR2gFm0AQRqQUaBWFLAhQpCwNcuhBOSedWPo= +modernc.org/memory v1.7.2 h1:Klh90S215mmH8c9gO98QxQFsY+W451E8AnzjoE2ee1E= +modernc.org/memory v1.7.2/go.mod h1:NO4NVCQy0N7ln+T9ngWqOQfi7ley4vpwvARR+Hjw95E= +modernc.org/sqlite v1.29.2 h1:xgBSyA3gemwgP31PWFfFjtBorQNYpeypGdoSDjXhrgI= +modernc.org/sqlite v1.29.2/go.mod h1:hG41jCYxOAOoO6BRK66AdRlmOcDzXf7qnwlwjUIOqa0= +modernc.org/strutil v1.2.0 h1:agBi9dp1I+eOnxXeiZawM8F4LawKv4NzGWSaLfyeNZA= +modernc.org/strutil v1.2.0/go.mod h1:/mdcBmfOibveCTBxUl5B5l6W+TTH1FXPLHZE6bTosX0= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/m3u/parser.go b/m3u/parser.go index ffe7cbf2..4476eedc 100644 --- a/m3u/parser.go +++ b/m3u/parser.go @@ -156,9 +156,8 @@ func ParseM3UFromURL(db *sql.DB, m3uURL string, m3uIndex int, maxConcurrency int if existingUrl.Content != line || existingUrl.M3UIndex != m3uIndex { _, err = database.InsertStreamUrl(db, dbId, database.StreamURL{ - Content: line, - M3UIndex: m3uIndex, - MaxConcurrency: maxConcurrency, + Content: line, + M3UIndex: m3uIndex, }) } diff --git a/main.go b/main.go index 5fc81ac1..5f9abc72 100644 --- a/main.go +++ b/main.go @@ -155,11 +155,6 @@ func main() { log.Fatalf("Error initializing current SQLite database: %v", err) } - redisClient := database.InitializeRedis() - if err := redisClient.Ping(context.Background()).Err(); err != nil { - log.Fatalf("Failed to connect to Redis: %s\n", err) - } - go updateSources(ctx) // HTTP handlers diff --git a/mp4_handler.go b/mp4_handler.go index 5323c9ce..696de01c 100644 --- a/mp4_handler.go +++ b/mp4_handler.go @@ -1,7 +1,6 @@ package main import ( - "context" "database/sql" "errors" "fmt" @@ -14,15 +13,13 @@ import ( "strconv" "strings" "syscall" - - "github.com/redis/go-redis/v9" ) -func loadBalancer(ctx context.Context, stream database.StreamInfo) (resp *http.Response, selectedUrl *database.StreamURL, err error) { +func loadBalancer(stream database.StreamInfo) (resp *http.Response, selectedUrl *database.StreamURL, err error) { // Concurrency check mode for _, url := range stream.URLs { - if checkConcurrency(ctx, url.M3UIndex) { - log.Printf("Concurrency limit reached (%d): %s", url.MaxConcurrency, url.Content) + if checkConcurrency(url.M3UIndex) { + log.Printf("Concurrency limit reached (%s): %s", os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%d", url.M3UIndex)), url.Content) continue // Skip this stream if concurrency limit reached } @@ -100,13 +97,13 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { // Iterate through the streams and select one based on concurrency and availability var selectedUrl *database.StreamURL - resp, selectedUrl, err = loadBalancer(ctx, stream) + resp, selectedUrl, err = loadBalancer(stream) if err != nil { http.Error(w, "Error fetching MP4 stream. Exhausted all streams.", http.StatusInternalServerError) return } log.Printf("Proxying %s to %s\n", r.RemoteAddr, selectedUrl.Content) - updateConcurrency(ctx, selectedUrl.M3UIndex, true) + updateConcurrency(selectedUrl.M3UIndex, true) // Log the successful response log.Printf("Sent MP4 stream to %s\n", r.RemoteAddr) @@ -116,7 +113,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { case <-ctx.Done(): // Connection closed, handle accordingly log.Println("Client disconnected after fetching MP4 stream") - updateConcurrency(ctx, selectedUrl.M3UIndex, false) + updateConcurrency(selectedUrl.M3UIndex, false) return default: // Connection still open, proceed with writing to the response @@ -125,7 +122,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { // Log the error if errors.Is(err, syscall.EPIPE) { log.Println("Client disconnected after fetching MP4 stream") - updateConcurrency(ctx, selectedUrl.M3UIndex, false) + updateConcurrency(selectedUrl.M3UIndex, false) } else { log.Printf("Error copying MP4 stream to response: %s\n", err.Error()) } @@ -134,7 +131,7 @@ func mp4Handler(w http.ResponseWriter, r *http.Request, db *sql.DB) { } } -func checkConcurrency(ctx context.Context, m3uIndex int) bool { +func checkConcurrency(m3uIndex int) bool { maxConcurrency := 1 var err error rawMaxConcurrency, maxConcurrencyExists := os.LookupEnv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%d", m3uIndex)) @@ -145,31 +142,22 @@ func checkConcurrency(ctx context.Context, m3uIndex int) bool { } } - redisClient := database.InitializeRedis() - val, err := redisClient.Get(ctx, fmt.Sprintf("m3u_%d", m3uIndex)).Result() - if err == redis.Nil { - return false // Key does not exist - } else if err != nil { + count, err := database.GetConcurrency(m3uIndex) + if err != nil { log.Printf("Error checking concurrency: %s\n", err.Error()) return false // Error occurred, treat as concurrency not reached } - count, err := strconv.Atoi(val) - if err != nil { - count = 0 - } - log.Printf("Current concurrent connections for M3U_%d: %d", m3uIndex, count) return count >= maxConcurrency } -func updateConcurrency(ctx context.Context, m3uIndex int, incr bool) { - redisClient := database.InitializeRedis() +func updateConcurrency(m3uIndex int, incr bool) { var err error if incr { - err = redisClient.Incr(ctx, fmt.Sprintf("m3u_%d", m3uIndex)).Err() + err = database.IncrementConcurrency(m3uIndex) } else { - err = redisClient.Decr(ctx, fmt.Sprintf("m3u_%d", m3uIndex)).Err() + err = database.DecrementConcurrency(m3uIndex) } if err != nil { log.Printf("Error updating concurrency: %s\n", err.Error())