Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V1 #30

Draft
wants to merge 20 commits into
base: master
Choose a base branch
from
Draft

V1 #30

Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
modify cache to use io writer.
  • Loading branch information
m1k1o committed Dec 27, 2021
commit ad1f160b75d8a42775cbc87c7733aa98fc07a64f
27 changes: 19 additions & 8 deletions internal/utils/cache.go
Original file line number Diff line number Diff line change
@@ -2,7 +2,6 @@ package utils

import (
"io"
"net/http"
"sync"
"time"
)
@@ -17,13 +16,13 @@ type Cache struct {
listeners []func([]byte) (int, error)
listenersMu sync.RWMutex

Expires time.Time
expires time.Time
}

func NewCache(expires time.Time) *Cache {
return &Cache{
closeCh: make(chan struct{}),
Expires: expires,
expires: expires,
}
}

@@ -64,9 +63,8 @@ func (c *Cache) Close() error {
return nil
}

func (c *Cache) ServeHTTP(w http.ResponseWriter) {
offset := 0
index := 0
func (c *Cache) CopyTo(w io.Writer) error {
offset, index := 0, 0

for {
c.mu.RLock()
@@ -79,15 +77,23 @@ func (c *Cache) ServeHTTP(w http.ResponseWriter) {
chunk := c.chunks[index]
c.mu.RUnlock()

i, _ := w.Write(chunk)
i, err := w.Write(chunk)
if err != nil {
return err
}

offset += i
index++
continue
}

// if stream is already closed
if closed {
return
var err error
if closer, ok := w.(io.WriteCloser); ok {
err = closer.Close()
}
return err
}

// we don't have enough data but stream is not closed
@@ -101,4 +107,9 @@ func (c *Cache) ServeHTTP(w http.ResponseWriter) {

// wait until it finishes
<-c.closeCh
return nil
}

func (c *Cache) Expired() bool {
return time.Now().After(c.expires)
}
4 changes: 2 additions & 2 deletions pkg/hlsproxy/cache.go
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@ func (m *ManagerCtx) getFromCache(key string) (*utils.Cache, bool) {
}

// if cache has expired
if time.Now().After(entry.Expires) {
if entry.Expired() {
return nil, false
}

@@ -61,7 +61,7 @@ func (m *ManagerCtx) clearCache() {
m.cacheMu.Lock()
for key, entry := range m.cache {
// remove expired entries
if time.Now().After(entry.Expires) {
if entry.Expired() {
delete(m.cache, key)
m.logger.Debug().Str("key", key).Msg("cache cleanup remove expired")
} else {
4 changes: 2 additions & 2 deletions pkg/hlsproxy/manager.go
Original file line number Diff line number Diff line change
@@ -75,7 +75,7 @@ func (m *ManagerCtx) ServePlaylist(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/vnd.apple.mpegurl")
w.WriteHeader(200)

cache.ServeHTTP(w)
cache.CopyTo(w)
}

func (m *ManagerCtx) ServeSegment(w http.ResponseWriter, r *http.Request) {
@@ -104,5 +104,5 @@ func (m *ManagerCtx) ServeSegment(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "video/MP2T")
w.WriteHeader(200)

cache.ServeHTTP(w)
cache.CopyTo(w)
}