Skip to content

Commit

Permalink
implement token based streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
erickpintor committed Apr 23, 2024
1 parent 155ed97 commit 7c69a09
Show file tree
Hide file tree
Showing 6 changed files with 408 additions and 15 deletions.
31 changes: 30 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type Client struct {
maxBackoff time.Duration

// lazily cached URLs
queryURL *url.URL
queryURL, streamURL *url.URL
}

// NewDefaultClient initialize a [fauna.Client] with recommend default settings
Expand Down Expand Up @@ -196,6 +196,16 @@ func (c *Client) parseQueryURL() (url *url.URL, err error) {
return
}

func (c *Client) parseStreamURL() (url *url.URL, err error) {
if c.streamURL != nil {
url = c.streamURL
} else if url, err = url.Parse(c.url); err == nil {
url = url.JoinPath("stream", "1")
c.streamURL = url
}
return
}

func (c *Client) doWithRetry(req *http.Request) (attempts int, r *http.Response, err error) {
req2 := req.Clone(req.Context())
body, rerr := io.ReadAll(req.Body)
Expand Down Expand Up @@ -291,6 +301,25 @@ func (c *Client) Paginate(fql *Query, opts ...QueryOptFn) *QueryIterator {
}
}

// Subscribe initiates a stream subscription for the given stream value.
func (c *Client) Subscribe(stream Stream) (*Subscription, error) {
streamReq := streamRequest{
apiRequest: apiRequest{c.ctx, c.headers},
Stream: stream,
}

if byteStream, err := streamReq.do(c); err == nil {
sub := &Subscription{
events: make(chan *Event),
byteStream: byteStream,
}
go sub.consume()
return sub, nil
} else {
return nil, err
}
}

// QueryIterator is a [fauna.Client] iterator for paginated queries
type QueryIterator struct {
client *Client
Expand Down
78 changes: 78 additions & 0 deletions client_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,3 +317,81 @@ func ExampleClient_Paginate() {
fmt.Printf("%d", len(items))
// Output: 20
}

func ExampleClient_Subscribe() {
// IMPORTANT: just for the purpose of example, don't actually hardcode secret
_ = os.Setenv(fauna.EnvFaunaSecret, "secret")
_ = os.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal)

client, err := fauna.NewDefaultClient()
if err != nil {
log.Fatalf("client should have been initialized: %s", err)
}

// setup a collection
setupQuery, _ := fauna.FQL(`
Collection.byName('StreamingSandbox')?.delete()
Collection.create({ name: 'StreamingSandbox' })
`, nil)
if _, err := client.Query(setupQuery); err != nil {
log.Fatalf("failed to setup the collection: %s", err)
}

// create a stream
streamQuery, _ := fauna.FQL(`StreamingSandbox.all().toStream()`, nil)
result, err := client.Query(streamQuery)
if err != nil {
log.Fatalf("failed to create a stream: %s", err)
}

var stream fauna.Stream
if err := result.Unmarshal(&stream); err != nil {
log.Fatalf("failed to unmarshal the stream value: %s", err)
}

// initiate the stream subscription
subscription, err := client.Subscribe(stream)
if err != nil {
log.Fatalf("failed to subscribe to the stream value: %s", err)
}
defer subscription.Close()

// produce some events while the subscription is open
createQuery, _ := fauna.FQL(`StreamingSandbox.create({ foo: 'bar' })`, nil)
updateQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.update({ foo: 'baz' }))`, nil)
deleteQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.delete())`, nil)

queries := []*fauna.Query{createQuery, updateQuery, deleteQuery}
for _, query := range queries {
if _, err := client.Query(query); err != nil {
log.Fatalf("failed execute CRUD query: %s", err)
}
}

// fetch the produced events
type Data struct {
Foo string `fauna:"foo"`
}

events := subscription.Events()
expect := 3

for expect > 0 {
event := <-events
if event == nil {
break
}
switch event.Type {
case "add", "update", "remove":
var data Data
if err := event.Unmarshal(&data); err != nil {
log.Fatalf("failed to unmarshal event data: %s", err)
}
fmt.Printf("Event: %s Data: %+v\n", event.Type, data)
expect--
}
}
// Output: Event: add Data: {Foo:bar}
// Event: update Data: {Foo:baz}
// Event: remove Data: {Foo:baz}
}
72 changes: 58 additions & 14 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,19 @@ type queryResponse struct {
Tags string `json:"query_tags"`
}

func parseQueryResponse(httpRes *http.Response, attempts int) (qRes *queryResponse, err error) {
var bytesIn []byte
if bytesIn, err = io.ReadAll(httpRes.Body); err != nil {
err = fmt.Errorf("failed to read response body: %w", err)
return
}

if err = json.Unmarshal(bytesIn, &qRes); err != nil {
err = fmt.Errorf("failed to umarmshal response: %w", err)
}
return
}

func (r *queryResponse) queryTags() map[string]string {
ret := map[string]string{}

Expand All @@ -74,6 +87,7 @@ func (r *queryResponse) queryTags() map[string]string {

return ret
}

func (qReq *queryRequest) do(cli *Client) (qSus *QuerySuccess, err error) {
var bytesOut []byte
if bytesOut, err = marshal(qReq); err != nil {
Expand All @@ -94,25 +108,15 @@ func (qReq *queryRequest) do(cli *Client) (qSus *QuerySuccess, err error) {
return
}

var (
qRes queryResponse
bytesIn []byte
)

if bytesIn, err = io.ReadAll(httpRes.Body); err != nil {
err = fmt.Errorf("failed to read response body: %w", err)
return
}

if err = json.Unmarshal(bytesIn, &qRes); err != nil {
err = fmt.Errorf("failed to umarmshal response: %w", err)
var qRes *queryResponse
if qRes, err = parseQueryResponse(httpRes, attempts); err != nil {
return
}

cli.lastTxnTime.sync(qRes.TxnTime)
qRes.Header = httpRes.Header

if err = getErrFauna(httpRes.StatusCode, &qRes, attempts); err != nil {
if err = getErrFauna(httpRes.StatusCode, qRes, attempts); err != nil {
return
}

Expand All @@ -123,10 +127,50 @@ func (qReq *queryRequest) do(cli *Client) (qSus *QuerySuccess, err error) {
}

qSus = &QuerySuccess{
QueryInfo: newQueryInfo(&qRes),
QueryInfo: newQueryInfo(qRes),
Data: data,
StaticType: qRes.StaticType,
}
qSus.Stats.Attempts = attempts
return
}

type streamRequest struct {
apiRequest
Stream Stream
StartTS int64
}

func (streamReq *streamRequest) do(cli *Client) (bytes io.ReadCloser, err error) {
var bytesOut []byte
if bytesOut, err = marshal(streamReq); err != nil {
err = fmt.Errorf("marshal request failed: %w", err)
return
}

var streamURL *url.URL
if streamURL, err = cli.parseStreamURL(); err != nil {
return
}

var (
attempts int
httpRes *http.Response
)
if attempts, httpRes, err = streamReq.post(cli, streamURL, bytesOut); err != nil {
return
}

if httpRes.StatusCode != http.StatusOK {
var qRes *queryResponse
if qRes, err = parseQueryResponse(httpRes, attempts); err == nil {
if err = getErrFauna(httpRes.StatusCode, qRes, attempts); err == nil {
err = fmt.Errorf("unknown error for http status: %d", httpRes.StatusCode)
}
}
return
}

bytes = httpRes.Body
return
}
7 changes: 7 additions & 0 deletions serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,13 @@ func encode(v any, hint string) (any, error) {
}
}
return out, nil

case streamRequest:
out := map[string]any{"token": string(vt.Stream)}
if vt.StartTS > 0 {
out["start_ts"] = vt.StartTS
}
return out, nil
}

switch value := reflect.ValueOf(v); value.Kind() {
Expand Down
137 changes: 137 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package fauna

import (
"encoding/json"
"io"
)

// Event represents a streaming event.
//
// All events contain the [fauna.Event.Type] and [fauna.Event.Stats] fields.
//
// Events of type "add", "update", and "remove" will contain the
// [fauna.Event.Data] field with the event's data in it. Data events have their
// [fauna.Event.Error] field set to nil. Data events can be umarmshalled into a
// user-defined struct via the [fauna.Event.Unmarshal] method.
//
// Events of type "status" and "error" will have their [fauna.Event.Data] field
// set to nil. Error events contain the [fauna.Event.Error] field present with
// the underlying error information.
type Event struct {
// Type is this event's type.
Type string `json:"type"`

// TxnTime is the transaction time that produce this event.
TxnTime int64 `json:"txn_ts,omitempty"`

// Data is the event's data. Data is set to nil if the Type field is set to
// "status" or "error".
Data any `json:"data,omitempty"`

// Error contains error information when the event Type is set to "error".
Error *ErrEvent `json:"error,omitempty"`

// Stats contains the ops acquired to process the event.
Stats Stats `json:"stats"`
}

// Unmarshal will unmarshal the raw [fauna.Event.Data] (if present) into the
// known type provided as `into`. `into` must be a pointer to a map or struct.
func (e *Event) Unmarshal(into any) error {
return decodeInto(e.Data, into)
}

// ErrEvent contains error information present in error events.
//
// Error events with "abort" code contain its aborting value present in the
// [fauan.ErrEvent.Abort]. The aborting values can be unmarshalled with the
// [fauna.ErrEvent.Unmarshal] method.
type ErrEvent struct {
// Code is the error's code.
Code string `json:"code"`

// Message is the error's message.
Message string `json:"message"`

// Abort is the error's abort data, present if Code == "abort".
Abort any `json:"abort"`
}

// Unmarshal will unmarshal the raw [fauna.ErrEvent.Abort] (if present) into the
// known type provided as `into`. `into` must be a pointer to a map or struct.
func (e *ErrEvent) Unmarshal(into any) error {
return decodeInto(e.Abort, into)
}

// Subscription is a Fauna stream subscription.
//
// Events can be obtained by reading from the [fauna.Subscription.Events]
// channel. Note that the events channel emits a nil event on closing.
//
// If the subscription gets closed unexpectedly, its closing error can be
// retrieved via the [fauna.Subscription.Error] method.
//
// A stream subscription can be gracefully closed via the
// [fauna.Subscription.Close] method.
type Subscription struct {
client *Client

Check failure on line 77 in stream.go

View workflow job for this annotation

GitHub Actions / validate (1.22)

field `client` is unused (unused)

Check failure on line 77 in stream.go

View workflow job for this annotation

GitHub Actions / validate (1.21)

field `client` is unused (unused)

Check failure on line 77 in stream.go

View workflow job for this annotation

GitHub Actions / validate (1.20)

field `client` is unused (unused)

Check failure on line 77 in stream.go

View workflow job for this annotation

GitHub Actions / validate (1.19)

field `client` is unused (unused)
stream Stream

Check failure on line 78 in stream.go

View workflow job for this annotation

GitHub Actions / validate (1.22)

field `stream` is unused (unused)

Check failure on line 78 in stream.go

View workflow job for this annotation

GitHub Actions / validate (1.21)

field `stream` is unused (unused)

Check failure on line 78 in stream.go

View workflow job for this annotation

GitHub Actions / validate (1.20)

field `stream` is unused (unused)

Check failure on line 78 in stream.go

View workflow job for this annotation

GitHub Actions / validate (1.19)

field `stream` is unused (unused)

byteStream io.ReadCloser
events chan *Event
error error
closed bool
}

// Events return the subscription's events channel.
func (s *Subscription) Events() <-chan *Event { return s.events }

// Error returns the subscription's closing error, if any.
func (s *Subscription) Error() error { return s.error }

// Close gracefully closes the stream subscription.
func (s *Subscription) Close() (err error) {
if !s.closed {
s.closed = true
err = s.byteStream.Close()
}
return
}

func (s *Subscription) consume() {
defer close(s.events)
decoder := json.NewDecoder(s.byteStream)

for {
event := &Event{}
if err := decoder.Decode(event); err != nil {
// NOTE: When closing the stream, a network error may occur as due
// to its socket closing while the json decoder is blocked reading
// it. Errors to close the socket are already emitted by the Close()
// method, therefore, we don't want to propagate them here again.
if !s.closed {
s.error = err
}
break
}
if err := convertEvent(event); err != nil {
s.error = err
break
}
s.events <- event
}
}

func convertEvent(event *Event) (err error) {
if event.Data != nil {
if event.Data, err = convert(false, event.Data); err != nil {
return
}
}
if event.Error != nil && event.Error.Abort != nil {
if event.Error.Abort, err = convert(false, event.Error.Abort); err != nil {
return
}
}
return
}
Loading

0 comments on commit 7c69a09

Please sign in to comment.