Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import (

// File is a backing data-source for a Stream.
type File interface {
Name() string // The name used to Create/Open the File
io.Reader // Reader must continue reading after EOF on subsequent calls after more Writes.
io.ReaderAt // Similarly to Reader
io.Writer // Concurrent reading/writing must be supported.
io.Closer // Close should do any cleanup when done with the File.
io.Reader // Reader must continue reading after EOF on subsequent calls after more Writes.
io.ReaderAt // Similarly to Reader
io.Writer // Concurrent reading/writing must be supported.
io.Closer // Close should do any cleanup when done with the File.
}

// FileSystem is used to manage Files
Expand Down
7 changes: 3 additions & 4 deletions memfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,14 @@ func NewMemFS() FileSystem {
func (fs *memfs) Create(key string) (File, error) {
fs.mu.Lock()
defer fs.mu.Unlock()
file := newMemFile(key)
file := newMemFile()
fs.files[key] = file
return file, nil
}

func newMemFile(name string) *memFile {
func newMemFile() *memFile {
file := &memFile{
name: name,
r: bytes.NewBuffer(nil),
r: bytes.NewBuffer(nil),
}
file.buf.Store([]byte(nil))
file.memReader.memFile = file
Expand Down
2 changes: 1 addition & 1 deletion reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Reader struct {
}

// Name returns the name of the underlying File in the FileSystem.
func (r *Reader) Name() string { return r.file.Name() }
func (r *Reader) Name() string { return r.s.name }

// ReadAt lets you Read from specific offsets in the Stream.
// ReadAt blocks while waiting for the requested section of the Stream to be written,
Expand Down
16 changes: 9 additions & 7 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Stream struct {
file File
fs FileSystem
closeOnce onceWithErr
name string
}

// New creates a new Stream from the StdFileSystem with Name "name".
Expand All @@ -26,19 +27,20 @@ func New(name string) (*Stream, error) {
// NewStream creates a new Stream with Name "name" in FileSystem fs.
func NewStream(name string, fs FileSystem) (*Stream, error) {
f, err := fs.Create(name)
return newStream(f, fs), err
return newStream(name, f, fs), err
}

// NewMemStream creates an in-memory stream with no name, and no underlying fs.
// This should replace uses of NewStream("name", NewMemFs()).
// Remove() is unsupported as there is no fs to remove it from.
func NewMemStream() *Stream {
f := newMemFile("")
return newStream(f, singletonFs{f})
f := newMemFile()
return newStream("", f, singletonFs{f})
}

func newStream(file File, fs FileSystem) *Stream {
func newStream(name string, file File, fs FileSystem) *Stream {
return &Stream{
name: name,
file: file,
fs: fs,
b: newBroadcaster(),
Expand All @@ -56,7 +58,7 @@ func (fs singletonFs) Open(key string) (File, error) { return &memReader{memFile
func (fs singletonFs) Remove(key string) error { return ErrUnsupported }

// Name returns the name of the underlying File in the FileSystem.
func (s *Stream) Name() string { return s.file.Name() }
func (s *Stream) Name() string { return s.name }

// Write writes p to the Stream. It's concurrent safe to be called with Stream's other methods.
func (s *Stream) Write(p []byte) (int, error) {
Expand Down Expand Up @@ -84,7 +86,7 @@ func (s *Stream) Close() error {
// ErrRemoving if called after Remove.
func (s *Stream) Remove() error {
s.ShutdownWithErr(ErrRemoving)
return s.fs.Remove(s.file.Name())
return s.fs.Remove(s.name)
}

// ShutdownWithErr causes NextReader to stop creating new Readers and instead return err, this
Expand All @@ -109,7 +111,7 @@ func (s *Stream) Cancel() error {
// is written to.
func (s *Stream) NextReader() (*Reader, error) {
return s.b.NewReader(func() (*Reader, error) {
file, err := s.fs.Open(s.file.Name())
file, err := s.fs.Open(s.name)
if err != nil {
return nil, err
}
Expand Down
1 change: 0 additions & 1 deletion stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ type slowFile struct {
file File
}

func (r slowFile) Name() string { return r.file.Name() }
func (r slowFile) Read(p []byte) (int, error) {
time.Sleep(5 * time.Millisecond)
return r.file.Read(p)
Expand Down