Skip to content

Commit

Permalink
Improve context handling in polochonfs
Browse files Browse the repository at this point in the history
* Handle fs stop from signal
* Handle stop from program reading the data
* Handle HTTP timeouts
  • Loading branch information
gregdel authored and PouuleT committed Aug 27, 2024
1 parent 93e1d8f commit 6ac965d
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 26 deletions.
22 changes: 15 additions & 7 deletions cmd/polochonfs/async_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,21 @@ func (r *asyncReader) Close() error {
// asyncRead reads data until the cache reaches `cacheSize`.
func (r *asyncReader) asyncRead() {
defer r.wg.Done()

var err error
for {
select {
case <-r.ctx.Done():
return
err = r.ctx.Err()
if err == context.Canceled {
return
}
break
default:
}

if r.sourcePos == r.end {
// All good
return
}

Expand All @@ -136,14 +143,15 @@ func (r *asyncReader) asyncRead() {
continue
}

if err := r.readBlockFromSource(); err != nil {
log.WithFields(log.Fields{
"error": err,
"name": r.name,
}).Error("Async read error")
return
if err = r.readBlockFromSource(); err != nil {
break
}
}

log.WithFields(log.Fields{
"error": err,
"name": r.name,
}).Error("Async read error")
}

// newAsyncReader creates a new asyncReader and starts a goroutine to read the
Expand Down
14 changes: 7 additions & 7 deletions cmd/polochonfs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ func (pfs *polochonfs) init() error {
return nil
}

func (pfs *polochonfs) buildFS(ctx context.Context) {
func (pfs *polochonfs) buildFS(_ context.Context) {
log.Debug("Adding persistent nodes")
pfs.root.addChild(newPersistentNodeDir(movieDirName))
pfs.root.addChild(newPersistentNodeDir(showDirName))

pfs.updateMovies(ctx)
pfs.updateShows(ctx)
pfs.updateMovies()
pfs.updateShows()
}

func (pfs *polochonfs) handleUpdates() {
Expand All @@ -84,15 +84,15 @@ func (pfs *polochonfs) handleUpdates() {
switch s {
case syscall.SIGUSR1:
log.Info("Updating movies from signal")
pfs.updateMovies(pfs.ctx)
pfs.updateMovies()
case syscall.SIGUSR2:
log.Info("Updating shows from signal")
pfs.updateShows(pfs.ctx)
pfs.updateShows()
}
case <-ticker.C:
log.Debug("Handle updates from ticker")
pfs.updateMovies(pfs.ctx)
pfs.updateShows(pfs.ctx)
pfs.updateMovies()
pfs.updateShows()
case <-pfs.ctx.Done():
log.Info("Handle updates done")
return
Expand Down
6 changes: 4 additions & 2 deletions cmd/polochonfs/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var (
defaultTimeout = 3 * time.Second
libraryRefresh = 1 * time.Minute
umountLogTimeout = 1 * time.Minute
globalCtx context.Context

// Polochon URL / Token configs
polochonURL string
Expand All @@ -43,8 +44,9 @@ func main() {
}

func run() error {
ctx, cancel := context.WithCancel(context.Background())
pfs, err := newPolochonFs(ctx)
var cancel context.CancelFunc
globalCtx, cancel = context.WithCancel(context.Background())
pfs, err := newPolochonFs(globalCtx)
if err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/polochonfs/movies.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"fmt"
"strings"

Expand All @@ -21,7 +20,7 @@ func movieDirTitle(m *papi.Movie) string {
return strings.ReplaceAll(dirTitle, "/", "-")
}

func (pfs *polochonfs) updateMovies(ctx context.Context) {
func (pfs *polochonfs) updateMovies() {
dir := pfs.root.getChild(movieDirName)

log.Debug("Fecthing movies")
Expand Down
51 changes: 45 additions & 6 deletions cmd/polochonfs/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ func (n *node) Open(ctx context.Context, flags uint32) (fh fs.FileHandle, fuseFl
type fileHandle struct {
name, url string
size, lastOffset int64
cancel context.CancelFunc
buffer io.ReadCloser
}

Expand All @@ -246,6 +247,7 @@ func (fh *fileHandle) close() {
if fh.buffer == nil {
return
}
fh.cancel()
fh.buffer.Close()
fh.buffer = nil
}
Expand All @@ -256,8 +258,11 @@ func (fh *fileHandle) setup(ctx context.Context, offset int64) error {
"offset": offset,
}).Trace("Setting up filehandle")

cancelCtx, cancelFunc := context.WithCancel(globalCtx)
fh.cancel = cancelFunc

client := http.DefaultClient
req, err := http.NewRequestWithContext(ctx, "GET", fh.url, nil)
req, err := http.NewRequestWithContext(cancelCtx, "GET", fh.url, nil)
if err != nil {
return err
}
Expand All @@ -268,12 +273,19 @@ func (fh *fileHandle) setup(ctx context.Context, offset int64) error {
req.Header.Add("Range", fmt.Sprintf("bytes=%d-", offset))
}

timeout := time.AfterFunc(defaultTimeout, func() {
log.WithField("name", fh.name).Error("Request timeout")
fh.cancel()
})

resp, err := client.Do(req)
timeout.Stop()
if err != nil {
return err
}

if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent {
if resp.StatusCode != http.StatusOK &&
resp.StatusCode != http.StatusPartialContent {
return fmt.Errorf("Invalid HTTP response code")
}

Expand All @@ -284,38 +296,65 @@ func (fh *fileHandle) setup(ctx context.Context, offset int64) error {
}

fh.lastOffset = offset
fh.buffer = newAsyncReader(ctx, fh.name, resp.Body, offset, fh.size)
fh.buffer = newAsyncReader(cancelCtx, fh.name, resp.Body, offset, fh.size)
return nil
}

func (fh *fileHandle) Read(ctx context.Context, dest []byte, offset int64) (fuse.ReadResult, syscall.Errno) {
defaultErr := syscall.ENETUNREACH // Network unreachable
readSize := int64(len(dest))

l := log.WithFields(log.Fields{
"name": fh.name,
"requested_offset": offset,
})

// Handle context cancelled from the given fuse.Context
err := ctx.Err()
if err != nil {
l.WithField("error", err).Error("Read failed")
return fuse.ReadResultData(dest), defaultErr
if err == context.Canceled {
// go-fuse advises to return EINTR on canceled context.
return fuse.ReadResultData(dest), syscall.EINTR
}

l.WithField("error", err).Error("Read failed from fuse context")
return fuse.ReadResultData(dest), syscall.EIO
}

// Handle the global context
err = globalCtx.Err()
if err != nil {
if err != context.Canceled {
l.WithField("error", err).Error("Read failed from global context")
}
return fuse.ReadResultData(dest), syscall.EIO
}

defaultErr := syscall.ENETUNREACH // Network unreachable
if fh.buffer == nil || offset != fh.lastOffset {
if err := fh.setup(ctx, offset); err != nil {
l.WithField("error", err).Error("Failed to setup file handle")
return fuse.ReadResultData(dest), defaultErr
}
}

timedOut := false
timeout := time.AfterFunc(defaultTimeout, func() {
timedOut = true
fh.cancel()
})
read, err := fh.buffer.Read(dest)
timeout.Stop()

fh.lastOffset += int64(read)
l = l.WithFields(log.Fields{
"read": humanize.SI(float64(readSize), "B"),
"last_offset": fh.lastOffset,
})

if timedOut {
err = fmt.Errorf("timeout after " + defaultTimeout.String())
}

switch err {
case nil:
l.Trace("Read from async reader")
Expand Down
3 changes: 1 addition & 2 deletions cmd/polochonfs/shows.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package main

import (
"context"
"fmt"

polochon "github.com/odwrtw/polochon/lib"
"github.com/odwrtw/polochon/lib/papi"
log "github.com/sirupsen/logrus"
)

func (pfs *polochonfs) updateShows(ctx context.Context) {
func (pfs *polochonfs) updateShows() {
dir := pfs.root.getChild(showDirName)

log.Debug("Fecthing shows")
Expand Down

0 comments on commit 6ac965d

Please sign in to comment.