Skip to content

Commit

Permalink
use bytebufferpool library
Browse files Browse the repository at this point in the history
  • Loading branch information
sonroyaalmerol committed Feb 2, 2025
1 parent f8cf7f1 commit c42831e
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 39 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
)

require (
github.com/valyala/bytebufferpool v1.0.0 // indirect
golang.org/x/crypto v0.31.0 // indirect
golang.org/x/sys v0.28.0 // indirect
)
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IX
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
Expand Down
64 changes: 33 additions & 31 deletions proxy/stream/shared_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import (
"sync"
"sync/atomic"
"time"

"github.com/valyala/bytebufferpool"
)

type ChunkData struct {
Data []byte
Buffer *bytebufferpool.ByteBuffer
Error error
Status int
Timestamp time.Time
Expand All @@ -23,7 +25,7 @@ type ChunkData struct {
var chunkPool = sync.Pool{
New: func() interface{} {
return &ChunkData{
Data: make([]byte, 0, 4096), // Initial capacity
Buffer: bytebufferpool.Get(),
}
},
}
Expand Down Expand Up @@ -53,7 +55,7 @@ func NewStreamCoordinator(streamID string, config *StreamConfig, cm *store.Concu

coord := &StreamCoordinator{
buffer: r,
writerChan: make(chan struct{}, 1), // Buffered channel
writerChan: make(chan struct{}, 1),
logger: logger,
config: config,
cm: cm,
Expand Down Expand Up @@ -120,7 +122,6 @@ func (c *StreamCoordinator) StartWriter(ctx context.Context, m3uIndex string, re
lastErr := start
zeroReads := 0

// Use buffered channel for graceful shutdown
done := make(chan struct{}, 1)
defer close(done)

Expand Down Expand Up @@ -204,32 +205,29 @@ func (c *StreamCoordinator) Write(chunk *ChunkData) bool {
newChunk.Timestamp = chunk.Timestamp

// Only copy data if there is any
if len(chunk.Data) > 0 {
if cap(newChunk.Data) < len(chunk.Data) {
newChunk.Data = make([]byte, len(chunk.Data))
} else {
newChunk.Data = newChunk.Data[:len(chunk.Data)]
if chunk.Buffer != nil && chunk.Buffer.Len() > 0 {
newChunk.Buffer.Reset()
_, err := newChunk.Buffer.Write(chunk.Buffer.Bytes())
if err != nil {
c.logger.Errorf("Error copying buffer: %v", err)
}
copy(newChunk.Data, chunk.Data)
} else {
newChunk.Data = newChunk.Data[:0]
}

// Return current chunk to pool if it exists
if current := c.buffer.Value.(*ChunkData); current != nil {
current.Buffer.Reset()
bytebufferpool.Put(current.Buffer)
chunkPool.Put(current)
}

c.buffer.Value = newChunk
c.buffer = c.buffer.Next()

// Update error state if necessary
if chunk.Error != nil || chunk.Status != 0 {
c.lastError.Store(newChunk)
c.active.Store(false)
}

// Notify waiting readers
c.dataNotify.Broadcast()
return true
}
Expand All @@ -238,12 +236,10 @@ func (c *StreamCoordinator) ReadChunks(fromPosition *ring.Ring) ([]*ChunkData, *
c.dataNotify.L.Lock()
defer c.dataNotify.L.Unlock()

// Wait for new data if buffer is empty
for fromPosition == c.buffer && c.active.Load() {
c.dataNotify.Wait()
}

// Check for errors first
if err := c.lastError.Load().(*ChunkData); err != nil {
return nil, err, fromPosition
}
Expand All @@ -254,20 +250,20 @@ func (c *StreamCoordinator) ReadChunks(fromPosition *ring.Ring) ([]*ChunkData, *
chunks := make([]*ChunkData, 0, 32)
current := fromPosition

// Only process new chunks up to the current buffer position
for current != c.buffer {
if chunk, ok := current.Value.(*ChunkData); ok {
// Only include chunks with actual data or status
if len(chunk.Data) > 0 || chunk.Error != nil || chunk.Status != 0 {
// Create a single copy of the chunk data
if (chunk.Buffer != nil && chunk.Buffer.Len() > 0) || chunk.Error != nil || chunk.Status != 0 {
newChunk := &ChunkData{
Status: chunk.Status,
Error: chunk.Error,
Timestamp: chunk.Timestamp,
Buffer: bytebufferpool.Get(),
}
if len(chunk.Data) > 0 {
newChunk.Data = make([]byte, len(chunk.Data))
copy(newChunk.Data, chunk.Data)
if chunk.Buffer != nil && chunk.Buffer.Len() > 0 {
_, err := newChunk.Buffer.Write(chunk.Buffer.Bytes())
if err != nil {
c.logger.Errorf("Error copying buffer in ReadChunks: %v", err)
}
}
chunks = append(chunks, newChunk)
}
Expand All @@ -285,6 +281,10 @@ func (c *StreamCoordinator) clearBuffer() {
current := c.buffer
for i := 0; i < c.config.SharedBufferSize; i++ {
if chunk, ok := current.Value.(*ChunkData); ok {
if chunk.Buffer != nil {
chunk.Buffer.Reset()
bytebufferpool.Put(chunk.Buffer)
}
chunkPool.Put(chunk)
}
current.Value = chunkPool.Get()
Expand All @@ -300,18 +300,20 @@ func (c *StreamCoordinator) getTimeoutDuration() time.Duration {
}

func (c *StreamCoordinator) writeData(data []byte) {
chunk := &ChunkData{
Data: data,
Timestamp: time.Now(),
chunk := chunkPool.Get().(*ChunkData)
chunk.Buffer.Reset()
_, err := chunk.Buffer.Write(data)
if err != nil {
c.logger.Errorf("Error writing to buffer: %v", err)
}
chunk.Timestamp = time.Now()
c.Write(chunk)
}

func (c *StreamCoordinator) writeError(err error, status int) {
chunk := &ChunkData{
Error: err,
Status: status,
Timestamp: time.Now(),
}
chunk := chunkPool.Get().(*ChunkData)
chunk.Error = err
chunk.Status = status
chunk.Timestamp = time.Now()
c.Write(chunk)
}
23 changes: 15 additions & 8 deletions proxy/stream/stream_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,12 @@ func (h *StreamHandler) handleBufferedStream(
) StreamResult {
h.coordinator.RegisterClient()
defer h.coordinator.UnregisterClient()

if atomic.LoadInt32(&h.coordinator.clientCount) == 1 {
h.logger.Debugf("Starting writer goroutine for m3uIndex: %s", m3uIndex)
go h.coordinator.StartWriter(ctx, m3uIndex, resp)
}

var bytesWritten int64
var lastPosition *ring.Ring

Expand All @@ -162,11 +164,14 @@ func (h *StreamHandler) handleBufferedStream(

currentPos := lastPosition
h.logger.Debugf("Processing remaining chunks before error, starting from: %p", currentPos)

for currentPos != h.coordinator.buffer {
if chunk, ok := currentPos.Value.(*ChunkData); ok {
h.logger.Debugf("Found chunk: len(Data)=%d, Error=%v, Status=%d", len(chunk.Data), chunk.Error, chunk.Status)
if len(chunk.Data) > 0 {
written, err := writer.Write(chunk.Data)
h.logger.Debugf("Found chunk: len(Buffer)=%d, Error=%v, Status=%d",
chunk.Buffer.Len(), chunk.Error, chunk.Status)

if chunk.Buffer != nil && chunk.Buffer.Len() > 0 {
written, err := writer.Write(chunk.Buffer.Bytes())
if err != nil {
h.logger.Errorf("Error writing remaining chunks: %s", err.Error())
return StreamResult{bytesWritten, err, 0}
Expand All @@ -186,10 +191,12 @@ func (h *StreamHandler) handleBufferedStream(
hasNewData := false
for lastPosition != h.coordinator.buffer {
if chunk, ok := lastPosition.Value.(*ChunkData); ok {
h.logger.Debugf("Processing chunk: len(Data)=%d, Error=%v, Status=%d", len(chunk.Data), chunk.Error, chunk.Status)
if len(chunk.Data) > 0 {
h.logger.Debugf("Processing chunk: len(Buffer)=%d, Error=%v, Status=%d",
chunk.Buffer.Len(), chunk.Error, chunk.Status)

if chunk.Buffer != nil && chunk.Buffer.Len() > 0 {
hasNewData = true
written, err := writer.Write(chunk.Data)
written, err := writer.Write(chunk.Buffer.Bytes())
if err != nil {
h.coordinator.mu.RUnlock()
h.logger.Errorf("Error writing to client: %s", err.Error())
Expand All @@ -205,10 +212,10 @@ func (h *StreamHandler) handleBufferedStream(
lastPosition = lastPosition.Next()
h.logger.Debugf("Moving to next position: %p", lastPosition)
}

h.coordinator.mu.RUnlock()

if !hasNewData {
time.Sleep(100 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
}
}
}
Expand Down

0 comments on commit c42831e

Please sign in to comment.