From c42831e57fc16a7e39ffbf1ae80deb46d0a0a8eb Mon Sep 17 00:00:00 2001
From: Son Roy Almerol
Date: Sun, 2 Feb 2025 00:09:56 -0500
Subject: [PATCH] use bytebufferpool library

---
 go.mod                         |  1 +
 go.sum                         |  2 ++
 proxy/stream/shared_buffer.go  | 64 ++++++++++++++++++----------------
 proxy/stream/stream_handler.go | 23 +++++++-----
 4 files changed, 51 insertions(+), 39 deletions(-)

diff --git a/go.mod b/go.mod
index b94396c9..4c9773c9 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,7 @@ require (
 )
 
 require (
+	github.com/valyala/bytebufferpool v1.0.0 // indirect
 	golang.org/x/crypto v0.31.0 // indirect
 	golang.org/x/sys v0.28.0 // indirect
 )
diff --git a/go.sum b/go.sum
index adc0afc3..8f444cf8 100644
--- a/go.sum
+++ b/go.sum
@@ -6,6 +6,8 @@ github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IX
 github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
 github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
 github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
+github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
+github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
 golang.org/x/crypto v0.31.0 h1:ihbySMvVjLAeSH1IbfcRTkD/iNscyz8rGzjF/E5hV6U=
 golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ssk=
 golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
diff --git a/proxy/stream/shared_buffer.go b/proxy/stream/shared_buffer.go
index a145b753..63a6f370 100644
--- a/proxy/stream/shared_buffer.go
+++ b/proxy/stream/shared_buffer.go
@@ -11,10 +11,12 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+
+	"github.com/valyala/bytebufferpool"
 )
 
 type ChunkData struct {
-	Data      []byte
+	Buffer    *bytebufferpool.ByteBuffer
 	Error     error
 	Status    int
 	Timestamp time.Time
@@ -23,7 +25,7 @@
 var chunkPool = sync.Pool{
 	New: func() interface{} {
 		return &ChunkData{
-			Data: make([]byte, 0, 4096), // Initial capacity
+			Buffer: bytebufferpool.Get(),
 		}
 	},
 }
@@ -53,7 +55,7 @@ func NewStreamCoordinator(streamID string, config *StreamConfig, cm *store.Concu
 
 	coord := &StreamCoordinator{
 		buffer:     r,
-		writerChan: make(chan struct{}, 1), // Buffered channel
+		writerChan: make(chan struct{}, 1),
 		logger:     logger,
 		config:     config,
 		cm:         cm,
@@ -120,7 +122,6 @@ func (c *StreamCoordinator) StartWriter(ctx context.Context, m3uIndex string, re
 	lastErr := start
 	zeroReads := 0
 
-	// Use buffered channel for graceful shutdown
 	done := make(chan struct{}, 1)
 	defer close(done)
 
@@ -204,32 +205,29 @@ func (c *StreamCoordinator) Write(chunk *ChunkData) bool {
 	newChunk.Timestamp = chunk.Timestamp
 
 	// Only copy data if there is any
-	if len(chunk.Data) > 0 {
-		if cap(newChunk.Data) < len(chunk.Data) {
-			newChunk.Data = make([]byte, len(chunk.Data))
-		} else {
-			newChunk.Data = newChunk.Data[:len(chunk.Data)]
+	if chunk.Buffer != nil && chunk.Buffer.Len() > 0 {
+		newChunk.Buffer.Reset()
+		_, err := newChunk.Buffer.Write(chunk.Buffer.Bytes())
+		if err != nil {
+			c.logger.Errorf("Error copying buffer: %v", err)
 		}
-		copy(newChunk.Data, chunk.Data)
-	} else {
-		newChunk.Data = newChunk.Data[:0]
 	}
 
 	// Return current chunk to pool if it exists
 	if current := c.buffer.Value.(*ChunkData); current != nil {
+		current.Buffer.Reset()
+		bytebufferpool.Put(current.Buffer)
 		chunkPool.Put(current)
 	}
 
 	c.buffer.Value = newChunk
 	c.buffer = c.buffer.Next()
 
-	// Update error state if necessary
 	if chunk.Error != nil || chunk.Status != 0 {
 		c.lastError.Store(newChunk)
 		c.active.Store(false)
 	}
 
-	// Notify waiting readers
 	c.dataNotify.Broadcast()
 	return true
 }
@@ -238,12 +236,10 @@ func (c *StreamCoordinator) ReadChunks(fromPosition *ring.Ring) ([]*ChunkData, *
 	c.dataNotify.L.Lock()
 	defer c.dataNotify.L.Unlock()
 
-	// Wait for new data if buffer is empty
 	for fromPosition == c.buffer && c.active.Load() {
 		c.dataNotify.Wait()
 	}
 
-	// Check for errors first
 	if err := c.lastError.Load().(*ChunkData); err != nil {
 		return nil, err, fromPosition
 	}
@@ -254,20 +250,20 @@ func (c *StreamCoordinator) ReadChunks(fromPosition *ring.Ring) ([]*ChunkData, *
 	chunks := make([]*ChunkData, 0, 32)
 	current := fromPosition
 
-	// Only process new chunks up to the current buffer position
 	for current != c.buffer {
 		if chunk, ok := current.Value.(*ChunkData); ok {
-			// Only include chunks with actual data or status
-			if len(chunk.Data) > 0 || chunk.Error != nil || chunk.Status != 0 {
-				// Create a single copy of the chunk data
+			if (chunk.Buffer != nil && chunk.Buffer.Len() > 0) || chunk.Error != nil || chunk.Status != 0 {
 				newChunk := &ChunkData{
 					Status:    chunk.Status,
 					Error:     chunk.Error,
 					Timestamp: chunk.Timestamp,
+					Buffer:    bytebufferpool.Get(),
 				}
-				if len(chunk.Data) > 0 {
-					newChunk.Data = make([]byte, len(chunk.Data))
-					copy(newChunk.Data, chunk.Data)
+				if chunk.Buffer != nil && chunk.Buffer.Len() > 0 {
+					_, err := newChunk.Buffer.Write(chunk.Buffer.Bytes())
+					if err != nil {
+						c.logger.Errorf("Error copying buffer in ReadChunks: %v", err)
+					}
 				}
 				chunks = append(chunks, newChunk)
 			}
@@ -285,6 +281,10 @@ func (c *StreamCoordinator) clearBuffer() {
 	current := c.buffer
 	for i := 0; i < c.config.SharedBufferSize; i++ {
 		if chunk, ok := current.Value.(*ChunkData); ok {
+			if chunk.Buffer != nil {
+				chunk.Buffer.Reset()
+				bytebufferpool.Put(chunk.Buffer)
+			}
 			chunkPool.Put(chunk)
 		}
 		current.Value = chunkPool.Get()
@@ -300,18 +300,20 @@ func (c *StreamCoordinator) getTimeoutDuration() time.Duration {
 }
 
 func (c *StreamCoordinator) writeData(data []byte) {
-	chunk := &ChunkData{
-		Data:      data,
-		Timestamp: time.Now(),
+	chunk := chunkPool.Get().(*ChunkData)
+	chunk.Buffer.Reset()
+	_, err := chunk.Buffer.Write(data)
+	if err != nil {
+		c.logger.Errorf("Error writing to buffer: %v", err)
 	}
+	chunk.Timestamp = time.Now()
 	c.Write(chunk)
 }
 
 func (c *StreamCoordinator) writeError(err error, status int) {
-	chunk := &ChunkData{
-		Error:     err,
-		Status:    status,
-		Timestamp: time.Now(),
-	}
+	chunk := chunkPool.Get().(*ChunkData)
+	chunk.Error = err
+	chunk.Status = status
+	chunk.Timestamp = time.Now()
 	c.Write(chunk)
 }
diff --git a/proxy/stream/stream_handler.go b/proxy/stream/stream_handler.go
index 9602e317..734288f1 100644
--- a/proxy/stream/stream_handler.go
+++ b/proxy/stream/stream_handler.go
@@ -137,10 +137,12 @@ func (h *StreamHandler) handleBufferedStream(
 ) StreamResult {
 	h.coordinator.RegisterClient()
 	defer h.coordinator.UnregisterClient()
+
 	if atomic.LoadInt32(&h.coordinator.clientCount) == 1 {
 		h.logger.Debugf("Starting writer goroutine for m3uIndex: %s", m3uIndex)
 		go h.coordinator.StartWriter(ctx, m3uIndex, resp)
 	}
+
 	var bytesWritten int64
 	var lastPosition *ring.Ring
 
@@ -162,11 +164,14 @@ func (h *StreamHandler) handleBufferedStream(
 
 			currentPos := lastPosition
 			h.logger.Debugf("Processing remaining chunks before error, starting from: %p", currentPos)
+
 			for currentPos != h.coordinator.buffer {
 				if chunk, ok := currentPos.Value.(*ChunkData); ok {
-					h.logger.Debugf("Found chunk: len(Data)=%d, Error=%v, Status=%d", len(chunk.Data), chunk.Error, chunk.Status)
-					if len(chunk.Data) > 0 {
-						written, err := writer.Write(chunk.Data)
+					h.logger.Debugf("Found chunk: len(Buffer)=%d, Error=%v, Status=%d",
+						chunk.Buffer.Len(), chunk.Error, chunk.Status)
+
+					if chunk.Buffer != nil && chunk.Buffer.Len() > 0 {
+						written, err := writer.Write(chunk.Buffer.Bytes())
 						if err != nil {
 							h.logger.Errorf("Error writing remaining chunks: %s", err.Error())
 							return StreamResult{bytesWritten, err, 0}
@@ -186,10 +191,12 @@ func (h *StreamHandler) handleBufferedStream(
 		hasNewData := false
 		for lastPosition != h.coordinator.buffer {
 			if chunk, ok := lastPosition.Value.(*ChunkData); ok {
-				h.logger.Debugf("Processing chunk: len(Data)=%d, Error=%v, Status=%d", len(chunk.Data), chunk.Error, chunk.Status)
-				if len(chunk.Data) > 0 {
+				h.logger.Debugf("Processing chunk: len(Buffer)=%d, Error=%v, Status=%d",
+					chunk.Buffer.Len(), chunk.Error, chunk.Status)
+
+				if chunk.Buffer != nil && chunk.Buffer.Len() > 0 {
 					hasNewData = true
-					written, err := writer.Write(chunk.Data)
+					written, err := writer.Write(chunk.Buffer.Bytes())
 					if err != nil {
 						h.coordinator.mu.RUnlock()
 						h.logger.Errorf("Error writing to client: %s", err.Error())
@@ -205,10 +212,10 @@ func (h *StreamHandler) handleBufferedStream(
 			lastPosition = lastPosition.Next()
 			h.logger.Debugf("Moving to next position: %p", lastPosition)
 		}
-		h.coordinator.mu.RUnlock()
 
+		h.coordinator.mu.RUnlock()
 		if !hasNewData {
-			time.Sleep(100 * time.Millisecond)
+			time.Sleep(10 * time.Millisecond)
 		}
 	}
 }
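
Note for reviewers: the hunks above assume the usual valyala/bytebufferpool
acquire/copy/release cycle. The sketch below is illustrative only and is not part
of the patch; it uses just the library calls the patch relies on (Get, Reset,
Write, Len, Bytes, Put), with a made-up payload for demonstration.

	package main

	import (
		"fmt"

		"github.com/valyala/bytebufferpool"
	)

	func main() {
		payload := []byte("chunk payload") // hypothetical data, stands in for a stream chunk

		// Acquire a pooled buffer. Put resets buffers before pooling them, so Get
		// returns an empty one; the patch still calls Reset defensively before reuse.
		bb := bytebufferpool.Get()
		bb.Reset()

		// ByteBuffer implements io.Writer; Write appends the payload to the pooled buffer.
		_, _ = bb.Write(payload)

		fmt.Printf("copied %d bytes: %q\n", bb.Len(), bb.Bytes())

		// Return the buffer only once nothing still references the slice from Bytes().
		bytebufferpool.Put(bb)
	}

The same ordering (Get, then Reset and Write, then Put when the buffer is retired)
is what Write, clearBuffer, and writeData follow above; the key constraint with any
pooled buffer is that it must not be Put back while a reader still holds the slice
returned by Bytes().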