-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #75 from mutablelogic/v4
Added text stream implementation
- Loading branch information
Showing
7 changed files
with
254 additions
and
84 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,166 @@ | ||
package httpresponse | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"errors" | ||
"io" | ||
"net/http" | ||
"sync" | ||
"time" | ||
) | ||
|
||
/////////////////////////////////////////////////////////////////////////////// | ||
// TYPES | ||
|
||
// TextStream implements a stream of text events | ||
type TextStream struct { | ||
wg sync.WaitGroup | ||
w io.Writer | ||
ch chan *textevent | ||
err error | ||
} | ||
|
||
type textevent struct { | ||
name string | ||
data []any | ||
} | ||
|
||
/////////////////////////////////////////////////////////////////////////////// | ||
// GLOBALS | ||
|
||
const ( | ||
defaultKeepAlive = 10 * time.Second | ||
) | ||
|
||
var ( | ||
strPing = "ping" | ||
strEvent = []byte("event: ") | ||
strData = []byte("data: ") | ||
strNewline = []byte("\n") | ||
) | ||
|
||
/////////////////////////////////////////////////////////////////////////////// | ||
// LIFECYCLE | ||
|
||
// Create a new text stream with mimetype text/event-stream | ||
// Additional header tuples can be provided as a series of key-value pairs | ||
func NewTextStream(w http.ResponseWriter, tuples ...string) *TextStream { | ||
// Check parameters | ||
if w == nil { | ||
return nil | ||
} | ||
if len(tuples)%2 != 0 { | ||
return nil | ||
} | ||
|
||
// Create a text stream | ||
self := new(TextStream) | ||
self.w = w | ||
self.ch = make(chan *textevent) | ||
|
||
// Set the default content type | ||
w.Header().Set(ContentTypeKey, ContentTypeTextStream) | ||
|
||
// Set additional headers | ||
for i := 0; i < len(tuples); i += 2 { | ||
w.Header().Set(tuples[i], tuples[i+1]) | ||
} | ||
|
||
// Write the response, don't know is this is the right one | ||
w.WriteHeader(http.StatusContinue) | ||
|
||
// goroutine will write to the response writer until the channel is closed | ||
self.wg.Add(1) | ||
go func() { | ||
defer self.wg.Done() | ||
|
||
// Create a ticker for ping messages | ||
ticker := time.NewTimer(100 * time.Millisecond) | ||
defer ticker.Stop() | ||
|
||
// Run until the channel is closed | ||
for { | ||
select { | ||
case evt := <-self.ch: | ||
if evt == nil { | ||
return | ||
} | ||
self.emit(evt) | ||
case <-ticker.C: | ||
self.err = errors.Join(self.err, self.emit(&textevent{strPing, nil})) | ||
ticker.Reset(defaultKeepAlive) | ||
} | ||
} | ||
}() | ||
|
||
// Return the textstream object | ||
return self | ||
} | ||
|
||
// Close the text stream to stop sending ping messages | ||
func (s *TextStream) Close() error { | ||
// Close the channel | ||
close(s.ch) | ||
|
||
// Wait for the goroutine to finish | ||
s.wg.Wait() | ||
|
||
// Return any errors | ||
return s.err | ||
} | ||
|
||
/////////////////////////////////////////////////////////////////////////////// | ||
// PUBLIC METHODS | ||
|
||
// Write a text event to the stream, and one or more optional data objects | ||
// which are encoded as JSON | ||
func (s *TextStream) Write(name string, data ...any) { | ||
s.ch <- &textevent{name, data} | ||
} | ||
|
||
/////////////////////////////////////////////////////////////////////////////// | ||
// PRIVATE METHODS | ||
|
||
// emit an event to the stream | ||
func (s *TextStream) emit(e *textevent) error { | ||
var result error | ||
|
||
// Write the event to the stream | ||
if e.name != "" { | ||
if err := s.write(strEvent, []byte(e.name), strNewline); err != nil { | ||
return err | ||
} | ||
} | ||
|
||
// Write the data to the stream | ||
for _, v := range e.data { | ||
if v == nil { | ||
continue | ||
} else if data, err := json.Marshal(v); err != nil { | ||
result = errors.Join(result, err) | ||
} else if err := s.write(strData, data, strNewline); err != nil { | ||
result = errors.Join(result, err) | ||
} | ||
} | ||
|
||
// Flush the event | ||
if result == nil { | ||
if err := s.write(strNewline); err != nil { | ||
result = errors.Join(result, err) | ||
} | ||
if w, ok := s.w.(http.Flusher); ok { | ||
w.Flush() | ||
} | ||
} | ||
|
||
// Return any errors | ||
return result | ||
} | ||
|
||
func (s *TextStream) write(v ...[]byte) error { | ||
if _, err := s.w.Write(bytes.Join(v, nil)); err != nil { | ||
return err | ||
} | ||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
package httpresponse_test | ||
|
||
import ( | ||
"net/http/httptest" | ||
"testing" | ||
"time" | ||
|
||
// Packages | ||
"github.com/mutablelogic/go-server/pkg/httpresponse" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func Test_textstream_001(t *testing.T) { | ||
assert := assert.New(t) | ||
|
||
t.Run("New", func(t *testing.T) { | ||
resp := httptest.NewRecorder() | ||
ts := httpresponse.NewTextStream(resp) | ||
assert.NotNil(ts) | ||
t.Log(ts) | ||
assert.NoError(ts.Close()) | ||
}) | ||
|
||
t.Run("Ping", func(t *testing.T) { | ||
resp := httptest.NewRecorder() | ||
ts := httpresponse.NewTextStream(resp) | ||
assert.NotNil(ts) | ||
|
||
time.Sleep(1 * time.Second) | ||
assert.NoError(ts.Close()) | ||
assert.Equal(100, resp.Code) | ||
assert.Equal("text/event-stream", resp.Header().Get("Content-Type")) | ||
assert.Equal("event: ping\n\n", resp.Body.String()) | ||
}) | ||
|
||
t.Run("EventNoData", func(t *testing.T) { | ||
resp := httptest.NewRecorder() | ||
ts := httpresponse.NewTextStream(resp) | ||
assert.NotNil(ts) | ||
|
||
ts.Write("foo") | ||
|
||
time.Sleep(1 * time.Second) | ||
assert.NoError(ts.Close()) | ||
assert.Equal(100, resp.Code) | ||
assert.Equal("text/event-stream", resp.Header().Get("Content-Type")) | ||
assert.Equal("event: foo\n\n"+"event: ping\n\n", resp.Body.String()) | ||
}) | ||
|
||
t.Run("EventData", func(t *testing.T) { | ||
resp := httptest.NewRecorder() | ||
ts := httpresponse.NewTextStream(resp) | ||
assert.NotNil(ts) | ||
|
||
ts.Write("foo", "bar") | ||
|
||
time.Sleep(1 * time.Second) | ||
assert.NoError(ts.Close()) | ||
assert.Equal(100, resp.Code) | ||
assert.Equal("text/event-stream", resp.Header().Get("Content-Type")) | ||
assert.Equal("event: foo\n"+"data: \"bar\"\n\n"+"event: ping\n\n", resp.Body.String()) | ||
}) | ||
|
||
t.Run("EventDataData", func(t *testing.T) { | ||
resp := httptest.NewRecorder() | ||
ts := httpresponse.NewTextStream(resp) | ||
assert.NotNil(ts) | ||
|
||
ts.Write("foo", "bar1", "bar2") | ||
|
||
time.Sleep(1 * time.Second) | ||
assert.NoError(ts.Close()) | ||
assert.Equal(100, resp.Code) | ||
assert.Equal("text/event-stream", resp.Header().Get("Content-Type")) | ||
assert.Equal("event: foo\n"+"data: \"bar1\"\n"+"data: \"bar2\"\n\n"+"event: ping\n\n", resp.Body.String()) | ||
}) | ||
|
||
} |
This file was deleted.
Oops, something went wrong.