From e3a898c8cf5e2faa251d8f6ac85d199d4a8819ba Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sun, 22 Dec 2024 15:41:22 -0500 Subject: [PATCH 01/13] add subindex for inter-m3u duplicates --- handlers/stream_handler.go | 7 +-- proxy/load_balancer.go | 52 +++++++++++---------- proxy/proxy_stream.go | 6 +-- store/concurrency.go | 42 ++++++++--------- store/downloader.go | 4 +- store/parser.go | 93 +++++++++++++++++++++++++++++--------- store/sessions.go | 6 +-- store/streams.go | 35 ++++++++------ store/types.go | 12 ++--- tests/proxy_test.go | 4 +- updater/updater.go | 6 +-- utils/env.go | 12 ++--- utils/m3u_path.go | 4 +- 13 files changed, 170 insertions(+), 113 deletions(-) diff --git a/handlers/stream_handler.go b/handlers/stream_handler.go index 1e9b84bf..eea10f64 100644 --- a/handlers/stream_handler.go +++ b/handlers/stream_handler.go @@ -32,7 +32,8 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency return } - var selectedIndex int + var selectedIndex string + var selectedSubIndex string var selectedUrl string session := store.GetOrCreateSession(r) @@ -46,7 +47,7 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency }() for { - resp, selectedUrl, selectedIndex, err = stream.LoadBalancer(ctx, &session, r.Method) + resp, selectedUrl, selectedIndex, selectedSubIndex, err = stream.LoadBalancer(ctx, &session, r.Method) if err != nil { utils.SafeLogf("Error reloading stream for %s: %v\n", streamUrl, err) return @@ -77,7 +78,7 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency proxyCtx, proxyCtxCancel := context.WithCancel(ctx) defer proxyCtxCancel() - go stream.ProxyStream(proxyCtx, selectedIndex, resp, r, w, exitStatus) + go stream.ProxyStream(proxyCtx, selectedIndex, selectedSubIndex, resp, r, w, exitStatus) select { case <-ctx.Done(): diff --git a/proxy/load_balancer.go b/proxy/load_balancer.go index bca28b69..12bd9af0 100644 --- a/proxy/load_balancer.go +++ b/proxy/load_balancer.go @@ -31,13 +31,13 @@ func NewStreamInstance(streamUrl string, cm *store.ConcurrencyManager) (*StreamI }, nil } -func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store.Session, method string) (*http.Response, string, int, error) { +func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store.Session, method string) (*http.Response, string, string, string, error) { debug := os.Getenv("DEBUG") == "true" m3uIndexes := utils.GetM3UIndexes() sort.Slice(m3uIndexes, func(i, j int) bool { - return instance.Cm.ConcurrencyPriorityValue(i) > instance.Cm.ConcurrencyPriorityValue(j) + return instance.Cm.ConcurrencyPriorityValue(m3uIndexes[i], "0") > instance.Cm.ConcurrencyPriorityValue(m3uIndexes[j], "0") }) maxLapsString := os.Getenv("MAX_RETRIES") @@ -60,43 +60,45 @@ func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store select { case <-ctx.Done(): - return nil, "", -1, fmt.Errorf("Cancelling load balancer.") + return nil, "", "", "", fmt.Errorf("Cancelling load balancer.") default: for _, index := range m3uIndexes { - if slices.Contains(session.TestedIndexes, index) { - utils.SafeLogf("Skipping M3U_%d: marked as previous stream\n", index+1) - continue - } - - url, ok := instance.Info.URLs[index] + innerMap, ok := instance.Info.URLs[index] if !ok { - utils.SafeLogf("Channel not found from M3U_%d: %s\n", index+1, instance.Info.Title) + utils.SafeLogf("Channel not found from M3U_%s: %s\n", index, instance.Info.Title) continue } - if instance.Cm.CheckConcurrency(index) { - utils.SafeLogf("Concurrency limit reached for M3U_%d: %s\n", index+1, url) - continue - } + for subIndex, url := range innerMap { + if slices.Contains(session.TestedIndexes, index+"|"+subIndex) { + utils.SafeLogf("Skipping M3U_%s|%s: marked as previous stream\n", index, subIndex) + continue + } + + if instance.Cm.CheckConcurrency(index, subIndex) { + utils.SafeLogf("Concurrency limit reached for M3U_%s|%s: %s\n", index, subIndex, url) + continue + } - resp, err := utils.CustomHttpRequest(method, url) - if err == nil { + resp, err := utils.CustomHttpRequest(method, url) + if err == nil { + if debug { + utils.SafeLogf("[DEBUG] Successfully fetched stream from %s\n", url) + } + return resp, url, index, subIndex, nil + } + utils.SafeLogf("Error fetching stream: %s\n", err.Error()) if debug { - utils.SafeLogf("[DEBUG] Successfully fetched stream from %s\n", url) + utils.SafeLogf("[DEBUG] Error fetching stream from %s: %s\n", url, err.Error()) } - return resp, url, index, nil - } - utils.SafeLogf("Error fetching stream: %s\n", err.Error()) - if debug { - utils.SafeLogf("[DEBUG] Error fetching stream from %s: %s\n", url, err.Error()) + session.SetTestedIndexes(append(session.TestedIndexes, index+"|"+subIndex)) } - session.SetTestedIndexes(append(session.TestedIndexes, index)) } if debug { utils.SafeLogf("[DEBUG] All streams skipped in lap %d\n", lap) } - session.SetTestedIndexes([]int{}) + session.SetTestedIndexes([]string{}) } @@ -112,5 +114,5 @@ func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store lap++ } - return nil, "", -1, fmt.Errorf("Error fetching stream. Exhausted all streams.") + return nil, "", "", "", fmt.Errorf("Error fetching stream. Exhausted all streams.") } diff --git a/proxy/proxy_stream.go b/proxy/proxy_stream.go index 3a3590e5..abbd0087 100644 --- a/proxy/proxy_stream.go +++ b/proxy/proxy_stream.go @@ -13,7 +13,7 @@ import ( "time" ) -func (instance *StreamInstance) ProxyStream(ctx context.Context, m3uIndex int, resp *http.Response, r *http.Request, w http.ResponseWriter, statusChan chan int) { +func (instance *StreamInstance) ProxyStream(ctx context.Context, m3uIndex string, subIndex string, resp *http.Response, r *http.Request, w http.ResponseWriter, statusChan chan int) { debug := os.Getenv("DEBUG") == "true" bufferMbInt, err := strconv.Atoi(os.Getenv("BUFFER_MB")) @@ -73,12 +73,12 @@ func (instance *StreamInstance) ProxyStream(ctx context.Context, m3uIndex int, r return } - instance.Cm.UpdateConcurrency(m3uIndex, true) + instance.Cm.UpdateConcurrency(m3uIndex, subIndex, true) defer func() { if debug { utils.SafeLogf("[DEBUG] Defer executed for stream: %s\n", r.RemoteAddr) } - instance.Cm.UpdateConcurrency(m3uIndex, false) + instance.Cm.UpdateConcurrency(m3uIndex, subIndex, false) }() defer func() { diff --git a/store/concurrency.go b/store/concurrency.go index 3b49db26..1e9c341e 100644 --- a/store/concurrency.go +++ b/store/concurrency.go @@ -10,64 +10,64 @@ import ( type ConcurrencyManager struct { mu sync.Mutex - count map[int]int + count map[string]map[string]int } func NewConcurrencyManager() *ConcurrencyManager { - return &ConcurrencyManager{count: make(map[int]int)} + return &ConcurrencyManager{count: make(map[string]map[string]int)} } -func (cm *ConcurrencyManager) Increment(m3uIndex int) { +func (cm *ConcurrencyManager) Increment(m3uIndex string, subIndex string) { cm.mu.Lock() defer cm.mu.Unlock() - cm.count[m3uIndex]++ + cm.count[m3uIndex][subIndex]++ } -func (cm *ConcurrencyManager) Decrement(m3uIndex int) { +func (cm *ConcurrencyManager) Decrement(m3uIndex string, subIndex string) { cm.mu.Lock() defer cm.mu.Unlock() - if cm.count[m3uIndex] > 0 { - cm.count[m3uIndex]-- + if cm.count[m3uIndex][subIndex] > 0 { + cm.count[m3uIndex][subIndex]-- } } -func (cm *ConcurrencyManager) GetCount(m3uIndex int) int { +func (cm *ConcurrencyManager) GetCount(m3uIndex string, subIndex string) int { cm.mu.Lock() defer cm.mu.Unlock() - return cm.count[m3uIndex] + return cm.count[m3uIndex][subIndex] } -func (cm *ConcurrencyManager) ConcurrencyPriorityValue(m3uIndex int) int { - maxConcurrency, err := strconv.Atoi(os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%d", m3uIndex+1))) +func (cm *ConcurrencyManager) ConcurrencyPriorityValue(m3uIndex string, subIndex string) int { + maxConcurrency, err := strconv.Atoi(os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%s", m3uIndex))) if err != nil { maxConcurrency = 1 } - count := cm.GetCount(m3uIndex) + count := cm.GetCount(m3uIndex, subIndex) return maxConcurrency - count } -func (cm *ConcurrencyManager) CheckConcurrency(m3uIndex int) bool { - maxConcurrency, err := strconv.Atoi(os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%d", m3uIndex+1))) +func (cm *ConcurrencyManager) CheckConcurrency(m3uIndex string, subIndex string) bool { + maxConcurrency, err := strconv.Atoi(os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%s", m3uIndex))) if err != nil { maxConcurrency = 1 } - count := cm.GetCount(m3uIndex) + count := cm.GetCount(m3uIndex, subIndex) - utils.SafeLogf("Current number of connections for M3U_%d: %d", m3uIndex+1, count) + utils.SafeLogf("Current number of connections for M3U_%s|%s: %d", m3uIndex, subIndex, count) return count >= maxConcurrency } -func (cm *ConcurrencyManager) UpdateConcurrency(m3uIndex int, incr bool) { +func (cm *ConcurrencyManager) UpdateConcurrency(m3uIndex string, subIndex string, incr bool) { if incr { - cm.Increment(m3uIndex) + cm.Increment(m3uIndex, subIndex) } else { - cm.Decrement(m3uIndex) + cm.Decrement(m3uIndex, subIndex) } - count := cm.GetCount(m3uIndex) - utils.SafeLogf("Current number of connections for M3U_%d: %d", m3uIndex+1, count) + count := cm.GetCount(m3uIndex, subIndex) + utils.SafeLogf("Current number of connections for M3U_%s|%s: %d", m3uIndex, subIndex, count) } diff --git a/store/downloader.go b/store/downloader.go index 5888e0ed..8df02df3 100644 --- a/store/downloader.go +++ b/store/downloader.go @@ -10,9 +10,9 @@ import ( "m3u-stream-merger/utils" ) -func DownloadM3USource(m3uIndex int) (err error) { +func DownloadM3USource(m3uIndex string) (err error) { debug := os.Getenv("DEBUG") == "true" - m3uURL := os.Getenv(fmt.Sprintf("M3U_URL_%d", m3uIndex+1)) + m3uURL := os.Getenv(fmt.Sprintf("M3U_URL_%s", m3uIndex)) if debug { utils.SafeLogf("[DEBUG] Processing M3U from: %s\n", m3uURL) diff --git a/store/parser.go b/store/parser.go index 036756a7..83051f28 100644 --- a/store/parser.go +++ b/store/parser.go @@ -3,12 +3,17 @@ package store import ( "bufio" "bytes" + "crypto/md5" "encoding/base64" + "encoding/hex" + "errors" "fmt" "os" "path/filepath" "regexp" + "strconv" "strings" + "time" "m3u-stream-merger/utils" @@ -23,30 +28,43 @@ func ParseStreamInfoBySlug(slug string) (*StreamInfo, error) { return nil, err } - initInfo.URLs = make(map[int]string) + initInfo.URLs = make(map[string]map[string]string) indexes := utils.GetM3UIndexes() for _, m3uIndex := range indexes { - fileName := fmt.Sprintf("%s_%d", base64.StdEncoding.EncodeToString([]byte(initInfo.Title)), m3uIndex) - urlEncoded, err := os.ReadFile(filepath.Join(streamsDirPath, fileName)) - if err != nil { - continue - } + fileName := fmt.Sprintf("%s_%s", base64.StdEncoding.EncodeToString([]byte(initInfo.Title)), m3uIndex) - url, err := base64.StdEncoding.DecodeString(string(urlEncoded)) + fileMatches, err := filepath.Glob(filepath.Join(streamsDirPath, fileName, "___*")) if err != nil { continue } - initInfo.URLs[m3uIndex] = strings.TrimSpace(string(url)) + for _, fileMatch := range fileMatches { + fileNameSplit := strings.Split(filepath.Base(fileMatch), "___") + if len(fileNameSplit) != 2 { + continue + } + + urlEncoded, err := os.ReadFile(fileMatch) + if err != nil { + continue + } + + url, err := base64.StdEncoding.DecodeString(string(urlEncoded)) + if err != nil { + continue + } + + initInfo.URLs[m3uIndex][fileNameSplit[1]] = strings.TrimSpace(string(url)) + } } return initInfo, nil } -func M3UScanner(m3uIndex int, fn func(streamInfo StreamInfo)) error { - utils.SafeLogf("Parsing M3U #%d...\n", m3uIndex+1) +func M3UScanner(m3uIndex string, fn func(streamInfo StreamInfo)) error { + utils.SafeLogf("Parsing M3U #%s...\n", m3uIndex) filePath := utils.GetM3UFilePathByIndex(m3uIndex) file, err := os.Open(filePath) @@ -66,12 +84,15 @@ func M3UScanner(m3uIndex int, fn func(streamInfo StreamInfo)) error { scanner := bufio.NewScanner(bytes.NewReader(mappedFile)) var currentLine string + sessionIdHash := md5.Sum([]byte(time.Now().String())) + sessionId := hex.EncodeToString(sessionIdHash[:]) + for scanner.Scan() { line := strings.TrimSpace(scanner.Text()) if strings.HasPrefix(line, "#EXTINF:") { currentLine = line } else if currentLine != "" && !strings.HasPrefix(line, "#") { - streamInfo := parseLine(currentLine, line, m3uIndex) + streamInfo := parseLine(sessionId, currentLine, line, m3uIndex) currentLine = "" if checkFilter(streamInfo) { @@ -80,6 +101,19 @@ func M3UScanner(m3uIndex int, fn func(streamInfo StreamInfo)) error { } } + entries, err := os.ReadDir(streamsDirPath) + if err != nil { + return fmt.Errorf("error reading dir path: %w", err) + } + + for _, e := range entries { + if e.Name() == sessionId { + continue + } + + _ = os.RemoveAll(filepath.Join(streamsDirPath, sessionId)) + } + if err := scanner.Err(); err != nil { return fmt.Errorf("error reading M3U file: %w", err) } @@ -87,18 +121,17 @@ func M3UScanner(m3uIndex int, fn func(streamInfo StreamInfo)) error { return nil } -func parseLine(line string, nextLine string, m3uIndex int) StreamInfo { +func parseLine(sessionId string, line string, nextLine string, m3uIndex string) StreamInfo { debug := os.Getenv("DEBUG") == "true" if debug { utils.SafeLogf("[DEBUG] Parsing line: %s\n", line) utils.SafeLogf("[DEBUG] Next line: %s\n", nextLine) - utils.SafeLogf("[DEBUG] M3U index: %d\n", m3uIndex) + utils.SafeLogf("[DEBUG] M3U index: %s\n", m3uIndex) } cleanUrl := strings.TrimSpace(nextLine) currentStream := StreamInfo{} - currentStream.URLs = map[int]string{m3uIndex: cleanUrl} lineWithoutPairs := line @@ -145,16 +178,32 @@ func parseLine(line string, nextLine string, m3uIndex int) StreamInfo { currentStream.Title = utils.TvgNameParser(strings.TrimSpace(lineCommaSplit[1])) } - fileName := fmt.Sprintf("%s_%d", base64.StdEncoding.EncodeToString([]byte(currentStream.Title)), m3uIndex) encodedUrl := base64.StdEncoding.EncodeToString([]byte(cleanUrl)) - err := os.MkdirAll(streamsDirPath, os.ModePerm) - if err != nil { - utils.SafeLogf("[DEBUG] Error creating stream cache folder: %s -> %v\n", streamsDirPath, err) - } - err = os.WriteFile(filepath.Join(streamsDirPath, fileName), []byte(encodedUrl), 0644) - if err != nil { - utils.SafeLogf("[DEBUG] Error indexing stream: %s (#%d) -> %v\n", currentStream.Title, m3uIndex+1, err) + sessionDirPath := filepath.Join(streamsDirPath, sessionId) + + for i := 0; true; i++ { + fileName := fmt.Sprintf("%s_%s___%d", base64.StdEncoding.EncodeToString([]byte(currentStream.Title)), m3uIndex, i) + + err := os.MkdirAll(sessionDirPath, os.ModePerm) + if err != nil { + utils.SafeLogf("[DEBUG] Error creating stream cache folder: %s -> %v\n", sessionDirPath, err) + } + + if _, err := os.Stat(filepath.Join(sessionDirPath, fileName)); errors.Is(err, os.ErrNotExist) { + err = os.WriteFile(filepath.Join(sessionDirPath, fileName), []byte(encodedUrl), 0644) + if err != nil { + utils.SafeLogf("[DEBUG] Error indexing stream: %s (#%s) -> %v\n", currentStream.Title, m3uIndex, err) + } + + currentStream.URLs = map[string]map[string]string{ + m3uIndex: { + strconv.Itoa(i): cleanUrl, + }, + } + + break + } } return currentStream diff --git a/store/sessions.go b/store/sessions.go index 7c3b73ab..18c0576f 100644 --- a/store/sessions.go +++ b/store/sessions.go @@ -11,7 +11,7 @@ import ( type Session struct { ID string CreatedAt time.Time - TestedIndexes []int + TestedIndexes []string } var sessionStore = struct { @@ -36,7 +36,7 @@ func GetOrCreateSession(r *http.Request) Session { session = Session{ ID: fingerprint, CreatedAt: time.Now(), - TestedIndexes: []int{}, + TestedIndexes: []string{}, } sessionStore.Lock() @@ -58,7 +58,7 @@ func ClearSessionStore() { sessionStore.Unlock() } -func (s *Session) SetTestedIndexes(indexes []int) { +func (s *Session) SetTestedIndexes(indexes []string) { debug := os.Getenv("DEBUG") == "true" s.TestedIndexes = indexes diff --git a/store/streams.go b/store/streams.go index c5e00d1d..02018a58 100644 --- a/store/streams.go +++ b/store/streams.go @@ -27,14 +27,21 @@ func GetStreams() []StreamInfo { var wg sync.WaitGroup for _, m3uIndex := range utils.GetM3UIndexes() { wg.Add(1) - go func(m3uIndex int) { + go func(m3uIndex string) { defer wg.Done() err := M3UScanner(m3uIndex, func(streamInfo StreamInfo) { // Check uniqueness and update if necessary if existingStream, exists := streams.Load(streamInfo.Title); exists { - for idx, url := range streamInfo.URLs { - existingStream.(StreamInfo).URLs[idx] = url + for idx, innerMap := range streamInfo.URLs { + if _, ok := existingStream.(StreamInfo).URLs[idx]; !ok { + existingStream.(StreamInfo).URLs[idx] = innerMap + continue + } + + for subIdx, url := range innerMap { + existingStream.(StreamInfo).URLs[idx][subIdx] = url + } } streams.Store(streamInfo.Title, existingStream) } else { @@ -62,18 +69,20 @@ func GetStreams() []StreamInfo { func GenerateStreamURL(baseUrl string, stream StreamInfo) string { var subPath string var err error - for _, srcUrl := range stream.URLs { - subPath, err = utils.GetSubPathFromUrl(srcUrl) - if err != nil { - continue - } + for _, innerMap := range stream.URLs { + for _, srcUrl := range innerMap { + subPath, err = utils.GetSubPathFromUrl(srcUrl) + if err != nil { + continue + } - ext, err := utils.GetFileExtensionFromUrl(srcUrl) - if err != nil { - return fmt.Sprintf("%s/p/%s/%s", baseUrl, subPath, EncodeSlug(stream)) - } + ext, err := utils.GetFileExtensionFromUrl(srcUrl) + if err != nil { + return fmt.Sprintf("%s/p/%s/%s", baseUrl, subPath, EncodeSlug(stream)) + } - return fmt.Sprintf("%s/p/%s/%s%s", baseUrl, subPath, EncodeSlug(stream), ext) + return fmt.Sprintf("%s/p/%s/%s%s", baseUrl, subPath, EncodeSlug(stream), ext) + } } return fmt.Sprintf("%s/p/stream/%s", baseUrl, EncodeSlug(stream)) } diff --git a/store/types.go b/store/types.go index 560c0181..40b06ef3 100644 --- a/store/types.go +++ b/store/types.go @@ -1,10 +1,10 @@ package store type StreamInfo struct { - Title string `json:"title"` - TvgID string `json:"tvg_id"` - TvgChNo string `json:"tvg_ch"` - LogoURL string `json:"logo"` - Group string `json:"group"` - URLs map[int]string `json:"-"` + Title string `json:"title"` + TvgID string `json:"tvg_id"` + TvgChNo string `json:"tvg_ch"` + LogoURL string `json:"logo"` + Group string `json:"group"` + URLs map[string]map[string]string `json:"-"` } diff --git a/tests/proxy_test.go b/tests/proxy_test.go index fe254e76..068f81bb 100644 --- a/tests/proxy_test.go +++ b/tests/proxy_test.go @@ -17,7 +17,7 @@ func TestStreamHandler(t *testing.T) { os.Setenv("M3U_URL_1", "https://gist.githubusercontent.com/sonroyaalmerol/de1c90e8681af040924da5d15c7f530d/raw/06844df09e69ea278060252ca5aa8d767eb4543d/test-m3u.m3u") os.Setenv("INCLUDE_GROUPS_1", "movies") - err := store.DownloadM3USource(0) + err := store.DownloadM3USource("1") if err != nil { t.Errorf("Downloader returned error: %v", err) } @@ -54,7 +54,7 @@ func TestStreamHandler(t *testing.T) { t.Errorf("%s - Expected status code %d, got %d", stream.Title, http.StatusOK, resp.StatusCode) } - res, err := http.Get(stream.URLs[0]) + res, err := http.Get(stream.URLs["1"]["0"]) if err != nil { t.Errorf("HttpGet returned error: %v", err) } diff --git a/updater/updater.go b/updater/updater.go index 04e1943a..82f28749 100644 --- a/updater/updater.go +++ b/updater/updater.go @@ -80,14 +80,14 @@ func (instance *Updater) UpdateSources(ctx context.Context) { indexes := utils.GetM3UIndexes() for _, idx := range indexes { - utils.SafeLogf("Background process: Fetching M3U_URL_%d...\n", idx+1) + utils.SafeLogf("Background process: Fetching M3U_URL_%s...\n", idx) wg.Add(1) // Start the goroutine for periodic updates - go func(idx int) { + go func(idx string) { defer wg.Done() err := store.DownloadM3USource(idx) if err != nil && debug { - utils.SafeLogf("Background process: Error fetching M3U_URL_%d: %v\n", idx+1, err) + utils.SafeLogf("Background process: Error fetching M3U_URL_%s: %v\n", idx, err) } }(idx) } diff --git a/utils/env.go b/utils/env.go index 6e79a249..703209a7 100644 --- a/utils/env.go +++ b/utils/env.go @@ -22,23 +22,19 @@ func GetEnv(env string) string { } } -var m3uIndexes []int +var m3uIndexes []string var m3uIndexesInitialized bool -func GetM3UIndexes() []int { +func GetM3UIndexes() []string { if m3uIndexesInitialized { return m3uIndexes } - m3uIndexes = []int{} + m3uIndexes = []string{} for _, env := range os.Environ() { pair := strings.SplitN(env, "=", 2) if strings.HasPrefix(pair[0], "M3U_URL_") { indexString := strings.TrimPrefix(pair[0], "M3U_URL_") - index, err := strconv.Atoi(indexString) - if err != nil { - continue - } - m3uIndexes = append(m3uIndexes, index-1) + m3uIndexes = append(m3uIndexes, indexString) } } m3uIndexesInitialized = true diff --git a/utils/m3u_path.go b/utils/m3u_path.go index 6316a49c..43602b1b 100644 --- a/utils/m3u_path.go +++ b/utils/m3u_path.go @@ -2,8 +2,8 @@ package utils import "fmt" -func GetM3UFilePathByIndex(m3uIndex int) string { - m3uFile := fmt.Sprintf("/tmp/m3u-proxy/sources/%d.m3u", m3uIndex+1) +func GetM3UFilePathByIndex(m3uIndex string) string { + m3uFile := fmt.Sprintf("/tmp/m3u-proxy/sources/%s.m3u", m3uIndex) return m3uFile } From 168fd1da93ace04e2dea6f5de6a384710b6cc571 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sun, 22 Dec 2024 15:45:52 -0500 Subject: [PATCH 02/13] standardize subindex delimiter --- handlers/stream_handler.go | 2 +- store/parser.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/handlers/stream_handler.go b/handlers/stream_handler.go index eea10f64..8a74a697 100644 --- a/handlers/stream_handler.go +++ b/handlers/stream_handler.go @@ -92,7 +92,7 @@ func StreamHandler(w http.ResponseWriter, r *http.Request, cm *store.Concurrency return } else if streamExitCode == 1 || streamExitCode == 2 { // Retry on server-side connection errors - session.SetTestedIndexes(append(session.TestedIndexes, selectedIndex)) + session.SetTestedIndexes(append(session.TestedIndexes, selectedIndex+"|"+selectedSubIndex)) utils.SafeLogf("Retrying other servers...\n") proxyCtxCancel() } else if streamExitCode == 4 { diff --git a/store/parser.go b/store/parser.go index 83051f28..b33c0600 100644 --- a/store/parser.go +++ b/store/parser.go @@ -35,13 +35,13 @@ func ParseStreamInfoBySlug(slug string) (*StreamInfo, error) { for _, m3uIndex := range indexes { fileName := fmt.Sprintf("%s_%s", base64.StdEncoding.EncodeToString([]byte(initInfo.Title)), m3uIndex) - fileMatches, err := filepath.Glob(filepath.Join(streamsDirPath, fileName, "___*")) + fileMatches, err := filepath.Glob(filepath.Join(streamsDirPath, fileName, "|*")) if err != nil { continue } for _, fileMatch := range fileMatches { - fileNameSplit := strings.Split(filepath.Base(fileMatch), "___") + fileNameSplit := strings.Split(filepath.Base(fileMatch), "|") if len(fileNameSplit) != 2 { continue } @@ -183,7 +183,7 @@ func parseLine(sessionId string, line string, nextLine string, m3uIndex string) sessionDirPath := filepath.Join(streamsDirPath, sessionId) for i := 0; true; i++ { - fileName := fmt.Sprintf("%s_%s___%d", base64.StdEncoding.EncodeToString([]byte(currentStream.Title)), m3uIndex, i) + fileName := fmt.Sprintf("%s_%s|%d", base64.StdEncoding.EncodeToString([]byte(currentStream.Title)), m3uIndex, i) err := os.MkdirAll(sessionDirPath, os.ModePerm) if err != nil { From 60c102d027b69e91edaf4622584d13042c68aa2e Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sun, 22 Dec 2024 15:54:45 -0500 Subject: [PATCH 03/13] ensure maps are initialized --- store/parser.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/store/parser.go b/store/parser.go index b33c0600..ba60fda7 100644 --- a/store/parser.go +++ b/store/parser.go @@ -40,6 +40,8 @@ func ParseStreamInfoBySlug(slug string) (*StreamInfo, error) { continue } + initInfo.URLs[m3uIndex] = make(map[string]string) + for _, fileMatch := range fileMatches { fileNameSplit := strings.Split(filepath.Base(fileMatch), "|") if len(fileNameSplit) != 2 { @@ -196,11 +198,9 @@ func parseLine(sessionId string, line string, nextLine string, m3uIndex string) utils.SafeLogf("[DEBUG] Error indexing stream: %s (#%s) -> %v\n", currentStream.Title, m3uIndex, err) } - currentStream.URLs = map[string]map[string]string{ - m3uIndex: { - strconv.Itoa(i): cleanUrl, - }, - } + currentStream.URLs = make(map[string]map[string]string) + currentStream.URLs[m3uIndex] = make(map[string]string) + currentStream.URLs[m3uIndex][strconv.Itoa(i)] = cleanUrl break } From bcd462cdbb5b732c5a5ea3a1e536eb1c49f306a4 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sun, 22 Dec 2024 16:11:18 -0500 Subject: [PATCH 04/13] fix glob pattern and cleanup --- store/parser.go | 45 +++++++++++++++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 14 deletions(-) diff --git a/store/parser.go b/store/parser.go index ba60fda7..1cb77545 100644 --- a/store/parser.go +++ b/store/parser.go @@ -23,6 +23,8 @@ import ( const streamsDirPath = "/m3u-proxy/data/streams" func ParseStreamInfoBySlug(slug string) (*StreamInfo, error) { + debug := os.Getenv("DEBUG") == "true" + initInfo, err := DecodeSlug(slug) if err != nil { return nil, err @@ -33,14 +35,22 @@ func ParseStreamInfoBySlug(slug string) (*StreamInfo, error) { indexes := utils.GetM3UIndexes() for _, m3uIndex := range indexes { - fileName := fmt.Sprintf("%s_%s", base64.StdEncoding.EncodeToString([]byte(initInfo.Title)), m3uIndex) + safeTitle := base64.StdEncoding.EncodeToString([]byte(initInfo.Title)) - fileMatches, err := filepath.Glob(filepath.Join(streamsDirPath, fileName, "|*")) + fileName := fmt.Sprintf("%s_%s*", safeTitle, m3uIndex) + globPattern := filepath.Join(streamsDirPath, "*", fileName) + + fileMatches, err := filepath.Glob(globPattern) if err != nil { + if debug { + utils.SafeLogf("Error finding files for pattern %s: %v", globPattern, err) + } continue } - initInfo.URLs[m3uIndex] = make(map[string]string) + if _, exists := initInfo.URLs[m3uIndex]; !exists { + initInfo.URLs[m3uIndex] = make(map[string]string) + } for _, fileMatch := range fileMatches { fileNameSplit := strings.Split(filepath.Base(fileMatch), "|") @@ -113,7 +123,7 @@ func M3UScanner(m3uIndex string, fn func(streamInfo StreamInfo)) error { continue } - _ = os.RemoveAll(filepath.Join(streamsDirPath, sessionId)) + _ = os.RemoveAll(filepath.Join(streamsDirPath, e.Name())) } if err := scanner.Err(); err != nil { @@ -184,24 +194,31 @@ func parseLine(sessionId string, line string, nextLine string, m3uIndex string) sessionDirPath := filepath.Join(streamsDirPath, sessionId) + err := os.MkdirAll(sessionDirPath, os.ModePerm) + if err != nil { + utils.SafeLogf("[DEBUG] Error creating stream cache folder: %s -> %v\n", sessionDirPath, err) + } + for i := 0; true; i++ { fileName := fmt.Sprintf("%s_%s|%d", base64.StdEncoding.EncodeToString([]byte(currentStream.Title)), m3uIndex, i) + filePath := filepath.Join(sessionDirPath, fileName) - err := os.MkdirAll(sessionDirPath, os.ModePerm) - if err != nil { - utils.SafeLogf("[DEBUG] Error creating stream cache folder: %s -> %v\n", sessionDirPath, err) - } - - if _, err := os.Stat(filepath.Join(sessionDirPath, fileName)); errors.Is(err, os.ErrNotExist) { - err = os.WriteFile(filepath.Join(sessionDirPath, fileName), []byte(encodedUrl), 0644) + if _, err := os.Stat(filePath); errors.Is(err, os.ErrNotExist) { + err = os.WriteFile(filePath, []byte(encodedUrl), 0644) if err != nil { utils.SafeLogf("[DEBUG] Error indexing stream: %s (#%s) -> %v\n", currentStream.Title, m3uIndex, err) } - currentStream.URLs = make(map[string]map[string]string) - currentStream.URLs[m3uIndex] = make(map[string]string) - currentStream.URLs[m3uIndex][strconv.Itoa(i)] = cleanUrl + // Initialize maps if not already initialized + if currentStream.URLs == nil { + currentStream.URLs = make(map[string]map[string]string) + } + if currentStream.URLs[m3uIndex] == nil { + currentStream.URLs[m3uIndex] = make(map[string]string) + } + // Add the URL to the map + currentStream.URLs[m3uIndex][strconv.Itoa(i)] = cleanUrl break } } From c1cce96cbc1194e4394fc3b6449406f210f5a970 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sun, 22 Dec 2024 16:21:03 -0500 Subject: [PATCH 05/13] debug: remove cleanup --- store/parser.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/store/parser.go b/store/parser.go index 1cb77545..387ba8d0 100644 --- a/store/parser.go +++ b/store/parser.go @@ -113,18 +113,18 @@ func M3UScanner(m3uIndex string, fn func(streamInfo StreamInfo)) error { } } - entries, err := os.ReadDir(streamsDirPath) - if err != nil { - return fmt.Errorf("error reading dir path: %w", err) - } - - for _, e := range entries { - if e.Name() == sessionId { - continue - } - - _ = os.RemoveAll(filepath.Join(streamsDirPath, e.Name())) - } + // entries, err := os.ReadDir(streamsDirPath) + // if err != nil { + // return fmt.Errorf("error reading dir path: %w", err) + // } + + // for _, e := range entries { + // if e.Name() == sessionId { + // continue + // } + + // _ = os.RemoveAll(filepath.Join(streamsDirPath, e.Name())) + // } if err := scanner.Err(); err != nil { return fmt.Errorf("error reading M3U file: %w", err) From 4f8b67f110406c009ef81f459ac716ae258298d1 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sun, 22 Dec 2024 16:28:33 -0500 Subject: [PATCH 06/13] move session id to getstreams --- store/parser.go | 21 +-------------------- store/streams.go | 20 +++++++++++++++++++- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/store/parser.go b/store/parser.go index 387ba8d0..86f57dfb 100644 --- a/store/parser.go +++ b/store/parser.go @@ -3,9 +3,7 @@ package store import ( "bufio" "bytes" - "crypto/md5" "encoding/base64" - "encoding/hex" "errors" "fmt" "os" @@ -13,7 +11,6 @@ import ( "regexp" "strconv" "strings" - "time" "m3u-stream-merger/utils" @@ -75,7 +72,7 @@ func ParseStreamInfoBySlug(slug string) (*StreamInfo, error) { return initInfo, nil } -func M3UScanner(m3uIndex string, fn func(streamInfo StreamInfo)) error { +func M3UScanner(m3uIndex string, sessionId string, fn func(streamInfo StreamInfo)) error { utils.SafeLogf("Parsing M3U #%s...\n", m3uIndex) filePath := utils.GetM3UFilePathByIndex(m3uIndex) @@ -96,9 +93,6 @@ func M3UScanner(m3uIndex string, fn func(streamInfo StreamInfo)) error { scanner := bufio.NewScanner(bytes.NewReader(mappedFile)) var currentLine string - sessionIdHash := md5.Sum([]byte(time.Now().String())) - sessionId := hex.EncodeToString(sessionIdHash[:]) - for scanner.Scan() { line := strings.TrimSpace(scanner.Text()) if strings.HasPrefix(line, "#EXTINF:") { @@ -113,19 +107,6 @@ func M3UScanner(m3uIndex string, fn func(streamInfo StreamInfo)) error { } } - // entries, err := os.ReadDir(streamsDirPath) - // if err != nil { - // return fmt.Errorf("error reading dir path: %w", err) - // } - - // for _, e := range entries { - // if e.Name() == sessionId { - // continue - // } - - // _ = os.RemoveAll(filepath.Join(streamsDirPath, e.Name())) - // } - if err := scanner.Err(); err != nil { return fmt.Errorf("error reading M3U file: %w", err) } diff --git a/store/streams.go b/store/streams.go index 02018a58..015a55bc 100644 --- a/store/streams.go +++ b/store/streams.go @@ -1,11 +1,15 @@ package store import ( + "crypto/sha1" + "encoding/hex" "fmt" "m3u-stream-merger/utils" "os" + "path/filepath" "sort" "sync" + "time" ) func GetStreamBySlug(slug string) (StreamInfo, error) { @@ -24,13 +28,16 @@ func GetStreams() []StreamInfo { streams sync.Map ) + sessionIdHash := sha1.Sum([]byte(time.Now().String())) + sessionId := hex.EncodeToString(sessionIdHash[:]) + var wg sync.WaitGroup for _, m3uIndex := range utils.GetM3UIndexes() { wg.Add(1) go func(m3uIndex string) { defer wg.Done() - err := M3UScanner(m3uIndex, func(streamInfo StreamInfo) { + err := M3UScanner(m3uIndex, sessionId, func(streamInfo StreamInfo) { // Check uniqueness and update if necessary if existingStream, exists := streams.Load(streamInfo.Title); exists { for idx, innerMap := range streamInfo.URLs { @@ -55,6 +62,17 @@ func GetStreams() []StreamInfo { } wg.Wait() + entries, err := os.ReadDir(streamsDirPath) + if err == nil { + for _, e := range entries { + if e.Name() == sessionId { + continue + } + + _ = os.RemoveAll(filepath.Join(streamsDirPath, e.Name())) + } + } + streams.Range(func(key, value any) bool { stream := value.(StreamInfo) result = append(result, stream) From 7ac3f18f280c7e4cade59356c5ab542ae641d7af Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sun, 22 Dec 2024 16:33:35 -0500 Subject: [PATCH 07/13] fix concurrency initialization --- store/concurrency.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/store/concurrency.go b/store/concurrency.go index 1e9c341e..2922e2ff 100644 --- a/store/concurrency.go +++ b/store/concurrency.go @@ -20,12 +20,22 @@ func NewConcurrencyManager() *ConcurrencyManager { func (cm *ConcurrencyManager) Increment(m3uIndex string, subIndex string) { cm.mu.Lock() defer cm.mu.Unlock() + + if _, ok := cm.count[m3uIndex]; !ok { + cm.count[m3uIndex] = make(map[string]int) + } + cm.count[m3uIndex][subIndex]++ } func (cm *ConcurrencyManager) Decrement(m3uIndex string, subIndex string) { cm.mu.Lock() defer cm.mu.Unlock() + + if _, ok := cm.count[m3uIndex]; !ok { + cm.count[m3uIndex] = make(map[string]int) + } + if cm.count[m3uIndex][subIndex] > 0 { cm.count[m3uIndex][subIndex]-- } @@ -35,6 +45,10 @@ func (cm *ConcurrencyManager) GetCount(m3uIndex string, subIndex string) int { cm.mu.Lock() defer cm.mu.Unlock() + if _, ok := cm.count[m3uIndex]; !ok { + cm.count[m3uIndex] = make(map[string]int) + } + return cm.count[m3uIndex][subIndex] } From 82e054ea832f1ff2a49b16da4f7f2dcb887ff5e9 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sun, 22 Dec 2024 16:56:05 -0500 Subject: [PATCH 08/13] use total concurrency value for m3u --- proxy/load_balancer.go | 6 +++--- store/concurrency.go | 28 +++++++++++++++++++++------- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/proxy/load_balancer.go b/proxy/load_balancer.go index 12bd9af0..761533ac 100644 --- a/proxy/load_balancer.go +++ b/proxy/load_balancer.go @@ -37,7 +37,7 @@ func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store m3uIndexes := utils.GetM3UIndexes() sort.Slice(m3uIndexes, func(i, j int) bool { - return instance.Cm.ConcurrencyPriorityValue(m3uIndexes[i], "0") > instance.Cm.ConcurrencyPriorityValue(m3uIndexes[j], "0") + return instance.Cm.ConcurrencyPriorityValue(m3uIndexes[i]) > instance.Cm.ConcurrencyPriorityValue(m3uIndexes[j]) }) maxLapsString := os.Getenv("MAX_RETRIES") @@ -75,8 +75,8 @@ func (instance *StreamInstance) LoadBalancer(ctx context.Context, session *store continue } - if instance.Cm.CheckConcurrency(index, subIndex) { - utils.SafeLogf("Concurrency limit reached for M3U_%s|%s: %s\n", index, subIndex, url) + if instance.Cm.CheckConcurrency(index) { + utils.SafeLogf("Concurrency limit reached for M3U_%s: %s\n", index, url) continue } diff --git a/store/concurrency.go b/store/concurrency.go index 2922e2ff..e5f98a18 100644 --- a/store/concurrency.go +++ b/store/concurrency.go @@ -52,27 +52,41 @@ func (cm *ConcurrencyManager) GetCount(m3uIndex string, subIndex string) int { return cm.count[m3uIndex][subIndex] } -func (cm *ConcurrencyManager) ConcurrencyPriorityValue(m3uIndex string, subIndex string) int { +func (cm *ConcurrencyManager) ConcurrencyPriorityValue(m3uIndex string) int { + cm.mu.Lock() + defer cm.mu.Unlock() + maxConcurrency, err := strconv.Atoi(os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%s", m3uIndex))) if err != nil { maxConcurrency = 1 } - count := cm.GetCount(m3uIndex, subIndex) + totalCount := 0 + for subIndex := range cm.count[m3uIndex] { + count := cm.GetCount(m3uIndex, subIndex) + totalCount += count + } - return maxConcurrency - count + return maxConcurrency - totalCount } -func (cm *ConcurrencyManager) CheckConcurrency(m3uIndex string, subIndex string) bool { +func (cm *ConcurrencyManager) CheckConcurrency(m3uIndex string) bool { + cm.mu.Lock() + defer cm.mu.Unlock() + maxConcurrency, err := strconv.Atoi(os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%s", m3uIndex))) if err != nil { maxConcurrency = 1 } - count := cm.GetCount(m3uIndex, subIndex) + totalCount := 0 + for subIndex := range cm.count[m3uIndex] { + count := cm.GetCount(m3uIndex, subIndex) + totalCount += count + } - utils.SafeLogf("Current number of connections for M3U_%s|%s: %d", m3uIndex, subIndex, count) - return count >= maxConcurrency + utils.SafeLogf("Current number of connections for M3U_%s: %d", m3uIndex, totalCount) + return totalCount >= maxConcurrency } func (cm *ConcurrencyManager) UpdateConcurrency(m3uIndex string, subIndex string, incr bool) { From 39ee8d489a3e2e30297ba70e60ab1916c8e40984 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sun, 22 Dec 2024 16:58:19 -0500 Subject: [PATCH 09/13] use sha3 for id hash --- store/streams.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/streams.go b/store/streams.go index 015a55bc..d03fb098 100644 --- a/store/streams.go +++ b/store/streams.go @@ -1,7 +1,6 @@ package store import ( - "crypto/sha1" "encoding/hex" "fmt" "m3u-stream-merger/utils" @@ -10,6 +9,7 @@ import ( "sort" "sync" "time" + "vendor/golang.org/x/crypto/sha3" ) func GetStreamBySlug(slug string) (StreamInfo, error) { @@ -28,7 +28,7 @@ func GetStreams() []StreamInfo { streams sync.Map ) - sessionIdHash := sha1.Sum([]byte(time.Now().String())) + sessionIdHash := sha3.Sum224([]byte(time.Now().String())) sessionId := hex.EncodeToString(sessionIdHash[:]) var wg sync.WaitGroup From e658d72b2f6538b9a9bc70c4abcb7c909ca2db79 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sun, 22 Dec 2024 17:00:59 -0500 Subject: [PATCH 10/13] fix sha3 import --- go.mod | 5 ++++- go.sum | 4 ++++ store/streams.go | 3 ++- 3 files changed, 10 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index f41a6f64..b94396c9 100644 --- a/go.mod +++ b/go.mod @@ -9,4 +9,7 @@ require ( github.com/robfig/cron/v3 v3.0.1 ) -require golang.org/x/sys v0.1.0 // indirect +require ( + golang.org/x/crypto v0.31.0 // indirect + golang.org/x/sys v0.28.0 // indirect +) diff --git a/go.sum b/go.sum index 80a867d7..adc0afc3 100644 --- a/go.sum +++ b/go.sum @@ -6,5 +6,9 @@ github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IX github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= +golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U= +golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk= golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/store/streams.go b/store/streams.go index d03fb098..34cec09b 100644 --- a/store/streams.go +++ b/store/streams.go @@ -9,7 +9,8 @@ import ( "sort" "sync" "time" - "vendor/golang.org/x/crypto/sha3" + + "golang.org/x/crypto/sha3" ) func GetStreamBySlug(slug string) (StreamInfo, error) { From 19584b9b6b4252eba0c3564d7c35d6cbf875530e Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sun, 22 Dec 2024 17:14:44 -0500 Subject: [PATCH 11/13] use total count only on logs --- store/concurrency.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/store/concurrency.go b/store/concurrency.go index e5f98a18..8766f010 100644 --- a/store/concurrency.go +++ b/store/concurrency.go @@ -90,12 +90,20 @@ func (cm *ConcurrencyManager) CheckConcurrency(m3uIndex string) bool { } func (cm *ConcurrencyManager) UpdateConcurrency(m3uIndex string, subIndex string, incr bool) { + cm.mu.Lock() + defer cm.mu.Unlock() + if incr { cm.Increment(m3uIndex, subIndex) } else { cm.Decrement(m3uIndex, subIndex) } - count := cm.GetCount(m3uIndex, subIndex) - utils.SafeLogf("Current number of connections for M3U_%s|%s: %d", m3uIndex, subIndex, count) + totalCount := 0 + for subIndex := range cm.count[m3uIndex] { + count := cm.GetCount(m3uIndex, subIndex) + totalCount += count + } + + utils.SafeLogf("Current number of connections for M3U_%s: %d", m3uIndex, totalCount) } From 30a03da494e18b19b44e5a41de6f068edeba0583 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sun, 22 Dec 2024 17:20:34 -0500 Subject: [PATCH 12/13] revert concurrency system --- proxy/proxy_stream.go | 4 +-- store/concurrency.go | 62 +++++++++++++------------------------------ 2 files changed, 21 insertions(+), 45 deletions(-) diff --git a/proxy/proxy_stream.go b/proxy/proxy_stream.go index abbd0087..507ae40f 100644 --- a/proxy/proxy_stream.go +++ b/proxy/proxy_stream.go @@ -73,12 +73,12 @@ func (instance *StreamInstance) ProxyStream(ctx context.Context, m3uIndex string return } - instance.Cm.UpdateConcurrency(m3uIndex, subIndex, true) + instance.Cm.UpdateConcurrency(m3uIndex, true) defer func() { if debug { utils.SafeLogf("[DEBUG] Defer executed for stream: %s\n", r.RemoteAddr) } - instance.Cm.UpdateConcurrency(m3uIndex, subIndex, false) + instance.Cm.UpdateConcurrency(m3uIndex, false) }() defer func() { diff --git a/store/concurrency.go b/store/concurrency.go index 8766f010..71b48238 100644 --- a/store/concurrency.go +++ b/store/concurrency.go @@ -10,46 +10,34 @@ import ( type ConcurrencyManager struct { mu sync.Mutex - count map[string]map[string]int + count map[string]int } func NewConcurrencyManager() *ConcurrencyManager { - return &ConcurrencyManager{count: make(map[string]map[string]int)} + return &ConcurrencyManager{count: make(map[string]int)} } -func (cm *ConcurrencyManager) Increment(m3uIndex string, subIndex string) { +func (cm *ConcurrencyManager) Increment(m3uIndex string) { cm.mu.Lock() defer cm.mu.Unlock() - if _, ok := cm.count[m3uIndex]; !ok { - cm.count[m3uIndex] = make(map[string]int) - } - - cm.count[m3uIndex][subIndex]++ + cm.count[m3uIndex]++ } -func (cm *ConcurrencyManager) Decrement(m3uIndex string, subIndex string) { +func (cm *ConcurrencyManager) Decrement(m3uIndex string) { cm.mu.Lock() defer cm.mu.Unlock() - if _, ok := cm.count[m3uIndex]; !ok { - cm.count[m3uIndex] = make(map[string]int) - } - - if cm.count[m3uIndex][subIndex] > 0 { - cm.count[m3uIndex][subIndex]-- + if cm.count[m3uIndex] > 0 { + cm.count[m3uIndex]-- } } -func (cm *ConcurrencyManager) GetCount(m3uIndex string, subIndex string) int { +func (cm *ConcurrencyManager) GetCount(m3uIndex string) int { cm.mu.Lock() defer cm.mu.Unlock() - if _, ok := cm.count[m3uIndex]; !ok { - cm.count[m3uIndex] = make(map[string]int) - } - - return cm.count[m3uIndex][subIndex] + return cm.count[m3uIndex] } func (cm *ConcurrencyManager) ConcurrencyPriorityValue(m3uIndex string) int { @@ -61,13 +49,9 @@ func (cm *ConcurrencyManager) ConcurrencyPriorityValue(m3uIndex string) int { maxConcurrency = 1 } - totalCount := 0 - for subIndex := range cm.count[m3uIndex] { - count := cm.GetCount(m3uIndex, subIndex) - totalCount += count - } + count := cm.GetCount(m3uIndex) - return maxConcurrency - totalCount + return maxConcurrency - count } func (cm *ConcurrencyManager) CheckConcurrency(m3uIndex string) bool { @@ -79,31 +63,23 @@ func (cm *ConcurrencyManager) CheckConcurrency(m3uIndex string) bool { maxConcurrency = 1 } - totalCount := 0 - for subIndex := range cm.count[m3uIndex] { - count := cm.GetCount(m3uIndex, subIndex) - totalCount += count - } + count := cm.GetCount(m3uIndex) - utils.SafeLogf("Current number of connections for M3U_%s: %d", m3uIndex, totalCount) - return totalCount >= maxConcurrency + utils.SafeLogf("Current number of connections for M3U_%s: %d", m3uIndex, count) + return count >= maxConcurrency } -func (cm *ConcurrencyManager) UpdateConcurrency(m3uIndex string, subIndex string, incr bool) { +func (cm *ConcurrencyManager) UpdateConcurrency(m3uIndex string, incr bool) { cm.mu.Lock() defer cm.mu.Unlock() if incr { - cm.Increment(m3uIndex, subIndex) + cm.Increment(m3uIndex) } else { - cm.Decrement(m3uIndex, subIndex) + cm.Decrement(m3uIndex) } - totalCount := 0 - for subIndex := range cm.count[m3uIndex] { - count := cm.GetCount(m3uIndex, subIndex) - totalCount += count - } + count := cm.GetCount(m3uIndex) - utils.SafeLogf("Current number of connections for M3U_%s: %d", m3uIndex, totalCount) + utils.SafeLogf("Current number of connections for M3U_%s: %d", m3uIndex, count) } From 931cf2992e186c390f509c865aa26f97151521a6 Mon Sep 17 00:00:00 2001 From: Son Roy Almerol Date: Sun, 22 Dec 2024 17:26:31 -0500 Subject: [PATCH 13/13] remove unnecessary concurrency locks --- store/concurrency.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/store/concurrency.go b/store/concurrency.go index 71b48238..05204e0d 100644 --- a/store/concurrency.go +++ b/store/concurrency.go @@ -41,9 +41,6 @@ func (cm *ConcurrencyManager) GetCount(m3uIndex string) int { } func (cm *ConcurrencyManager) ConcurrencyPriorityValue(m3uIndex string) int { - cm.mu.Lock() - defer cm.mu.Unlock() - maxConcurrency, err := strconv.Atoi(os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%s", m3uIndex))) if err != nil { maxConcurrency = 1 @@ -55,9 +52,6 @@ func (cm *ConcurrencyManager) ConcurrencyPriorityValue(m3uIndex string) int { } func (cm *ConcurrencyManager) CheckConcurrency(m3uIndex string) bool { - cm.mu.Lock() - defer cm.mu.Unlock() - maxConcurrency, err := strconv.Atoi(os.Getenv(fmt.Sprintf("M3U_MAX_CONCURRENCY_%s", m3uIndex))) if err != nil { maxConcurrency = 1 @@ -70,9 +64,6 @@ func (cm *ConcurrencyManager) CheckConcurrency(m3uIndex string) bool { } func (cm *ConcurrencyManager) UpdateConcurrency(m3uIndex string, incr bool) { - cm.mu.Lock() - defer cm.mu.Unlock() - if incr { cm.Increment(m3uIndex) } else {