diff --git a/client.go b/client.go index 4ac9e40..e4e614f 100644 --- a/client.go +++ b/client.go @@ -73,7 +73,7 @@ type Client struct { maxBackoff time.Duration // lazily cached URLs - queryURL *url.URL + queryURL, streamURL *url.URL } // NewDefaultClient initialize a [fauna.Client] with recommend default settings @@ -196,6 +196,16 @@ func (c *Client) parseQueryURL() (url *url.URL, err error) { return } +func (c *Client) parseStreamURL() (url *url.URL, err error) { + if c.streamURL != nil { + url = c.streamURL + } else if url, err = url.Parse(c.url); err == nil { + url = url.JoinPath("stream", "1") + c.streamURL = url + } + return +} + func (c *Client) doWithRetry(req *http.Request) (attempts int, r *http.Response, err error) { req2 := req.Clone(req.Context()) body, rerr := io.ReadAll(req.Body) @@ -291,6 +301,25 @@ func (c *Client) Paginate(fql *Query, opts ...QueryOptFn) *QueryIterator { } } +// Subscribe initiates a stream subscription for the given stream value. +func (c *Client) Subscribe(stream Stream) (*Subscription, error) { + streamReq := streamRequest{ + apiRequest: apiRequest{c.ctx, c.headers}, + Stream: stream, + } + + if byteStream, err := streamReq.do(c); err == nil { + sub := &Subscription{ + events: make(chan *Event), + byteStream: byteStream, + } + go sub.consume() + return sub, nil + } else { + return nil, err + } +} + // QueryIterator is a [fauna.Client] iterator for paginated queries type QueryIterator struct { client *Client diff --git a/client_example_test.go b/client_example_test.go index e4eb030..5b3f0a4 100644 --- a/client_example_test.go +++ b/client_example_test.go @@ -317,3 +317,81 @@ func ExampleClient_Paginate() { fmt.Printf("%d", len(items)) // Output: 20 } + +func ExampleClient_Subscribe() { + // IMPORTANT: just for the purpose of example, don't actually hardcode secret + _ = os.Setenv(fauna.EnvFaunaSecret, "secret") + _ = os.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) + + client, err := fauna.NewDefaultClient() + if err != nil { + log.Fatalf("client should have been initialized: %s", err) + } + + // setup a collection + setupQuery, _ := fauna.FQL(` + Collection.byName('StreamingSandbox')?.delete() + Collection.create({ name: 'StreamingSandbox' }) + `, nil) + if _, err := client.Query(setupQuery); err != nil { + log.Fatalf("failed to setup the collection: %s", err) + } + + // create a stream + streamQuery, _ := fauna.FQL(`StreamingSandbox.all().toStream()`, nil) + result, err := client.Query(streamQuery) + if err != nil { + log.Fatalf("failed to create a stream: %s", err) + } + + var stream fauna.Stream + if err := result.Unmarshal(&stream); err != nil { + log.Fatalf("failed to unmarshal the stream value: %s", err) + } + + // initiate the stream subscription + subscription, err := client.Subscribe(stream) + if err != nil { + log.Fatalf("failed to subscribe to the stream value: %s", err) + } + defer subscription.Close() + + // produce some events while the subscription is open + createQuery, _ := fauna.FQL(`StreamingSandbox.create({ foo: 'bar' })`, nil) + updateQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.update({ foo: 'baz' }))`, nil) + deleteQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.delete())`, nil) + + queries := []*fauna.Query{createQuery, updateQuery, deleteQuery} + for _, query := range queries { + if _, err := client.Query(query); err != nil { + log.Fatalf("failed execute CRUD query: %s", err) + } + } + + // fetch the produced events + type Data struct { + Foo string `fauna:"foo"` + } + + events := subscription.Events() + expect := 3 + + for expect > 0 { + event := <-events + if event == nil { + break + } + switch event.Type { + case "add", "update", "remove": + var data Data + if err := event.Unmarshal(&data); err != nil { + log.Fatalf("failed to unmarshal event data: %s", err) + } + fmt.Printf("Event: %s Data: %+v\n", event.Type, data) + expect-- + } + } + // Output: Event: add Data: {Foo:bar} + // Event: update Data: {Foo:baz} + // Event: remove Data: {Foo:baz} +} diff --git a/request.go b/request.go index 9438b79..d03a08e 100644 --- a/request.go +++ b/request.go @@ -62,6 +62,19 @@ type queryResponse struct { Tags string `json:"query_tags"` } +func parseQueryResponse(httpRes *http.Response, attempts int) (qRes *queryResponse, err error) { + var bytesIn []byte + if bytesIn, err = io.ReadAll(httpRes.Body); err != nil { + err = fmt.Errorf("failed to read response body: %w", err) + return + } + + if err = json.Unmarshal(bytesIn, &qRes); err != nil { + err = fmt.Errorf("failed to umarmshal response: %w", err) + } + return +} + func (r *queryResponse) queryTags() map[string]string { ret := map[string]string{} @@ -74,6 +87,7 @@ func (r *queryResponse) queryTags() map[string]string { return ret } + func (qReq *queryRequest) do(cli *Client) (qSus *QuerySuccess, err error) { var bytesOut []byte if bytesOut, err = marshal(qReq); err != nil { @@ -94,25 +108,15 @@ func (qReq *queryRequest) do(cli *Client) (qSus *QuerySuccess, err error) { return } - var ( - qRes queryResponse - bytesIn []byte - ) - - if bytesIn, err = io.ReadAll(httpRes.Body); err != nil { - err = fmt.Errorf("failed to read response body: %w", err) - return - } - - if err = json.Unmarshal(bytesIn, &qRes); err != nil { - err = fmt.Errorf("failed to umarmshal response: %w", err) + var qRes *queryResponse + if qRes, err = parseQueryResponse(httpRes, attempts); err != nil { return } cli.lastTxnTime.sync(qRes.TxnTime) qRes.Header = httpRes.Header - if err = getErrFauna(httpRes.StatusCode, &qRes, attempts); err != nil { + if err = getErrFauna(httpRes.StatusCode, qRes, attempts); err != nil { return } @@ -123,10 +127,50 @@ func (qReq *queryRequest) do(cli *Client) (qSus *QuerySuccess, err error) { } qSus = &QuerySuccess{ - QueryInfo: newQueryInfo(&qRes), + QueryInfo: newQueryInfo(qRes), Data: data, StaticType: qRes.StaticType, } qSus.Stats.Attempts = attempts return } + +type streamRequest struct { + apiRequest + Stream Stream + StartTS int64 +} + +func (streamReq *streamRequest) do(cli *Client) (bytes io.ReadCloser, err error) { + var bytesOut []byte + if bytesOut, err = marshal(streamReq); err != nil { + err = fmt.Errorf("marshal request failed: %w", err) + return + } + + var streamURL *url.URL + if streamURL, err = cli.parseStreamURL(); err != nil { + return + } + + var ( + attempts int + httpRes *http.Response + ) + if attempts, httpRes, err = streamReq.post(cli, streamURL, bytesOut); err != nil { + return + } + + if httpRes.StatusCode != http.StatusOK { + var qRes *queryResponse + if qRes, err = parseQueryResponse(httpRes, attempts); err == nil { + if err = getErrFauna(httpRes.StatusCode, qRes, attempts); err == nil { + err = fmt.Errorf("unknown error for http status: %d", httpRes.StatusCode) + } + } + return + } + + bytes = httpRes.Body + return +} diff --git a/serializer.go b/serializer.go index 8eef3f8..6ca5ca9 100644 --- a/serializer.go +++ b/serializer.go @@ -500,6 +500,13 @@ func encode(v any, hint string) (any, error) { } } return out, nil + + case streamRequest: + out := map[string]any{"token": string(vt.Stream)} + if vt.StartTS > 0 { + out["start_ts"] = vt.StartTS + } + return out, nil } switch value := reflect.ValueOf(v); value.Kind() { diff --git a/stream.go b/stream.go new file mode 100644 index 0000000..f0ffebd --- /dev/null +++ b/stream.go @@ -0,0 +1,137 @@ +package fauna + +import ( + "encoding/json" + "io" +) + +// Event represents a streaming event. +// +// All events contain the [fauna.Event.Type] and [fauna.Event.Stats] fields. +// +// Events of type "add", "update", and "remove" will contain the +// [fauna.Event.Data] field with the event's data in it. Data events have their +// [fauna.Event.Error] field set to nil. Data events can be umarmshalled into a +// user-defined struct via the [fauna.Event.Unmarshal] method. +// +// Events of type "status" and "error" will have their [fauna.Event.Data] field +// set to nil. Error events contain the [fauna.Event.Error] field present with +// the underlying error information. +type Event struct { + // Type is this event's type. + Type string `json:"type"` + + // TxnTime is the transaction time that produce this event. + TxnTime int64 `json:"txn_ts,omitempty"` + + // Data is the event's data. Data is set to nil if the Type field is set to + // "status" or "error". + Data any `json:"data,omitempty"` + + // Error contains error information when the event Type is set to "error". + Error *ErrEvent `json:"error,omitempty"` + + // Stats contains the ops acquired to process the event. + Stats Stats `json:"stats"` +} + +// Unmarshal will unmarshal the raw [fauna.Event.Data] (if present) into the +// known type provided as `into`. `into` must be a pointer to a map or struct. +func (e *Event) Unmarshal(into any) error { + return decodeInto(e.Data, into) +} + +// ErrEvent contains error information present in error events. +// +// Error events with "abort" code contain its aborting value present in the +// [fauan.ErrEvent.Abort]. The aborting values can be unmarshalled with the +// [fauna.ErrEvent.Unmarshal] method. +type ErrEvent struct { + // Code is the error's code. + Code string `json:"code"` + + // Message is the error's message. + Message string `json:"message"` + + // Abort is the error's abort data, present if Code == "abort". + Abort any `json:"abort"` +} + +// Unmarshal will unmarshal the raw [fauna.ErrEvent.Abort] (if present) into the +// known type provided as `into`. `into` must be a pointer to a map or struct. +func (e *ErrEvent) Unmarshal(into any) error { + return decodeInto(e.Abort, into) +} + +// Subscription is a Fauna stream subscription. +// +// Events can be obtained by reading from the [fauna.Subscription.Events] +// channel. Note that the events channel emits a nil event on closing. +// +// If the subscription gets closed unexpectedly, its closing error can be +// retrieved via the [fauna.Subscription.Error] method. +// +// A stream subscription can be gracefully closed via the +// [fauna.Subscription.Close] method. +type Subscription struct { + client *Client + stream Stream + + byteStream io.ReadCloser + events chan *Event + error error + closed bool +} + +// Events return the subscription's events channel. +func (s *Subscription) Events() <-chan *Event { return s.events } + +// Error returns the subscription's closing error, if any. +func (s *Subscription) Error() error { return s.error } + +// Close gracefully closes the stream subscription. +func (s *Subscription) Close() (err error) { + if !s.closed { + s.closed = true + err = s.byteStream.Close() + } + return +} + +func (s *Subscription) consume() { + defer close(s.events) + decoder := json.NewDecoder(s.byteStream) + + for { + event := &Event{} + if err := decoder.Decode(event); err != nil { + // NOTE: When closing the stream, a network error may occur as due + // to its socket closing while the json decoder is blocked reading + // it. Errors to close the socket are already emitted by the Close() + // method, therefore, we don't want to propagate them here again. + if !s.closed { + s.error = err + } + break + } + if err := convertEvent(event); err != nil { + s.error = err + break + } + s.events <- event + } +} + +func convertEvent(event *Event) (err error) { + if event.Data != nil { + if event.Data, err = convert(false, event.Data); err != nil { + return + } + } + if event.Error != nil && event.Error.Abort != nil { + if event.Error.Abort, err = convert(false, event.Error.Abort); err != nil { + return + } + } + return +} diff --git a/stream_test.go b/stream_test.go new file mode 100644 index 0000000..82a0626 --- /dev/null +++ b/stream_test.go @@ -0,0 +1,98 @@ +package fauna_test + +import ( + "testing" + + "github.com/fauna/fauna-go" + "github.com/stretchr/testify/require" +) + +func TestStreaming(t *testing.T) { + t.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) + t.Setenv(fauna.EnvFaunaSecret, "secret") + + client, clientErr := fauna.NewDefaultClient() + require.NoError(t, clientErr) + + setupQ, _ := fauna.FQL(` + Collection.byName('StreamingTest')?.delete() + Collection.create({ name: 'StreamingTest' }) + `, nil) + + _, err := client.Query(setupQ) + require.NoError(t, err) + + type TestDoc struct { + Foo string `fauna:"foo"` + } + + t.Run("multi-step streaming", func(t *testing.T) { + t.Run("Stream events", func(t *testing.T) { + streamQ, _ := fauna.FQL(`StreamingTest.all().toStream()`, nil) + res, err := client.Query(streamQ) + require.NoError(t, err) + + var stream fauna.Stream + require.NoError(t, res.Unmarshal(&stream)) + + sub, err := client.Subscribe(stream) + require.NoError(t, err) + defer sub.Close() + + event := <-sub.Events() + require.NotNil(t, event) + require.Equal(t, event.Type, "status") + + createQ, _ := fauna.FQL(`StreamingTest.create({ foo: 'bar' })`, nil) + _, err = client.Query(createQ) + require.NoError(t, err) + + event = <-sub.Events() + require.NotNil(t, event) + require.Equal(t, event.Type, "add") + + var doc TestDoc + require.NoError(t, event.Unmarshal(&doc)) + require.Equal(t, doc.Foo, "bar") + + require.NoError(t, sub.Close()) + require.NoError(t, sub.Error()) + }) + + t.Run("Handle subscription errors", func(t *testing.T) { + _, err := client.Subscribe(fauna.Stream("abc1234==")) + require.IsType(t, err, &fauna.ErrInvalidRequest{}) + }) + + t.Run("Handle error events", func(t *testing.T) { + streamQ, _ := fauna.FQL(`StreamingTest.all().map(doc => abort('oops')).toStream()`, nil) + res, err := client.Query(streamQ) + require.NoError(t, err) + + var stream fauna.Stream + require.NoError(t, res.Unmarshal(&stream)) + + sub, err := client.Subscribe(stream) + require.NoError(t, err) + defer sub.Close() + + event := <-sub.Events() + require.NotNil(t, event) + require.Equal(t, event.Type, "status") + + createQ, _ := fauna.FQL(`StreamingTest.create({ foo: 'bar' })`, nil) + _, err = client.Query(createQ) + require.NoError(t, err) + + event = <-sub.Events() + require.NotNil(t, event) + require.Equal(t, event.Type, "error") + require.Equal(t, event.Error.Code, "abort") + require.Equal(t, event.Error.Message, "Query aborted.") + + var msg string + require.NoError(t, event.Error.Unmarshal(&msg)) + require.Equal(t, msg, "oops") + }) + }) +}