From 93d2b46be093518e8017ccf4fa67441474634ab6 Mon Sep 17 00:00:00 2001 From: Lucas Pedroza <40873230+pnwpedro@users.noreply.github.com> Date: Mon, 20 May 2024 18:46:18 +0200 Subject: [PATCH] Add streaming support (#153) Co-authored-by: Erick Pintor Co-authored-by: James Rodewig --- README.md | 156 ++++++++++++++++++++++++++++++++- client.go | 78 ++++++++++++++--- client_example_test.go | 151 ++++++++++++++++++++++++++++++++ config.go | 22 +++-- request.go | 160 +++++++++++++++++++++++----------- serializer.go | 25 +++++- serializer_test.go | 6 ++ stream.go | 192 +++++++++++++++++++++++++++++++++++++++++ stream_test.go | 155 +++++++++++++++++++++++++++++++++ 9 files changed, 875 insertions(+), 70 deletions(-) create mode 100644 stream.go create mode 100644 stream_test.go diff --git a/README.md b/README.md index 2e4c536..7a4544c 100644 --- a/README.md +++ b/README.md @@ -263,9 +263,9 @@ The maximum amount of time to wait before retrying a query. Retries will use an package main import ( - "time" + "time" - "github.com/fauna/fauna-go" + "github.com/fauna/fauna-go" ) func main() { @@ -273,6 +273,156 @@ func main() { } ``` + +## Event Streaming + +The driver supports [Event +Streaming](https://docs.fauna.com/fauna/current/learn/streaming). + + +### Start a stream + +To get a stream token, append +[`toStream()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/tostream) +or +[`changesOn()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/changeson) +to a set from a [supported +source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources). 
+ +To start and subscribe to the stream, pass a query that produces a stream token +to `Client.Stream()`: + +```go +type Product struct { + Name string `fauna:"name"` + Description string `fauna:"description"` + Price float64 `fauna:"price"` +} + +func main() { + client, clientErr := fauna.NewDefaultClient() + if clientErr != nil { + panic(clientErr) + } + + streamQuery, _ := fauna.FQL("Product.all().toStream()", nil) + events, err := client.Stream(streamQuery) + if err != nil { + panic(err) + } + defer events.Close() + + var event fauna.Event + for { + err := events.Next(&event) + if err != nil { + panic(err) + } + + switch event.Type { + case fauna.AddEvent, fauna.UpdateEvent, fauna.RemoveEvent: + var product Product + if err = event.Unmarshal(&product); err != nil { + panic(err) + } + fmt.Println(product) + } + } +} +``` + +In query results, the driver represents stream tokens as `fauna.Stream` +values. + +To start a stream from a query result, call `Client.Subscribe()` on a +`fauna.Stream` value. 
This lets you output a stream alongside normal query +results: + +```go +type Product struct { + Name string `fauna:"name"` + Description string `fauna:"description"` + Price float64 `fauna:"price"` +} + +func main() { + client, clientErr := fauna.NewDefaultClient() + if clientErr != nil { + panic(clientErr) + } + + dataLoad, _ := fauna.FQL(` + let products = Product.all() + { + Products: products.toArray(), + Stream: products.toStream() + } + `, nil) + + data, err := client.Query(dataLoad) + if err != nil { + panic(err) + } + + queryResult := struct { + Products []Product + Stream fauna.Stream + }{} + + if err := data.Unmarshal(&queryResult); err != nil { + panic(err) + } + + fmt.Println("Existing products:") + for _, product := range queryResult.Products { + fmt.Println(product) + } + + events, err := client.Subscribe(queryResult.Stream) + if err != nil { + panic(err) + } + defer events.Close() + + fmt.Println("Products from streaming:") + var event fauna.Event + for { + err := events.Next(&event) + if err != nil { + panic(err) + } + switch event.Type { + case fauna.AddEvent, fauna.UpdateEvent, fauna.RemoveEvent: + var product Product + if err = event.Unmarshal(&product); err != nil { + panic(err) + } + fmt.Println(product) + } + } +} +``` + + +### Stream options + +The [client configuration](#client-configuration) sets default query options for +`Client.Stream()`. To override these options, see [query +options](#query-options). + +The `Client.Subscribe()` method accepts a `fauna.StartTime` function. You can +use `fauna.StartTime` to restart a stream after disconnection. + +```go +streamQuery, _ := fauna.FQL(`Product.all().toStream()`, nil) +client.Subscribe(streamQuery, fauna.StartTime(1710968002310000)) +``` + +| Function | Description | +| -------- | ----------- | +| `fauna.StartTime` | Sets the stream start time. Accepts an `int64` representing the start time in microseconds since the Unix epoch.

The start time is typically the time at which the stream disconnected.

The start time must be later than the creation time of the stream token. The period between the stream restart and the start time argument can't exceed the `history_days` value for source set's collection. If a collection's `history_days` is `0` or unset, the period can't exceed 15 minutes. | + + ## Contributing GitHub pull requests are very welcome. @@ -291,4 +441,4 @@ Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing -permissions and limitations under the License. +permissions and limitations under the License. \ No newline at end of file diff --git a/client.go b/client.go index 84612e8..50e19b2 100644 --- a/client.go +++ b/client.go @@ -11,6 +11,7 @@ import ( "math/rand" "net" "net/http" + "net/url" "os" "strings" "time" @@ -67,6 +68,9 @@ type Client struct { maxAttempts int maxBackoff time.Duration + + // lazily cached URLs + queryURL, streamURL *url.URL } // NewDefaultClient initialize a [fauna.Client] with recommend default settings @@ -125,15 +129,21 @@ func NewClient(secret string, timeouts Timeouts, configFns ...ClientConfigFn) *C Timeout: timeouts.ConnectionTimeout, } + // NOTE: prefer a response header timeout instead of a client timeout so + // that the client don't stop reading a http body that was produced by + // Fauna. On the query interface, an HTTP body is sent as a single http + // message. On the streaming interface, HTTP chunks are sent on every event. + // Therefore, it's in the driver's best interest to continue reading the + // HTTP body once the headers appear. 
httpClient := &http.Client{ Transport: &http.Transport{ - Proxy: http.ProxyFromEnvironment, - DialContext: dialer.DialContext, - ForceAttemptHTTP2: true, - MaxIdleConns: 20, - IdleConnTimeout: timeouts.IdleConnectionTimeout, + Proxy: http.ProxyFromEnvironment, + DialContext: dialer.DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 20, + IdleConnTimeout: timeouts.IdleConnectionTimeout, + ResponseHeaderTimeout: timeouts.QueryTimeout + timeouts.ClientBufferTimeout, }, - Timeout: timeouts.QueryTimeout + timeouts.ClientBufferTimeout, } defaultHeaders := map[string]string{ @@ -173,6 +183,26 @@ func NewClient(secret string, timeouts Timeouts, configFns ...ClientConfigFn) *C return client } +func (c *Client) parseQueryURL() (url *url.URL, err error) { + if c.queryURL != nil { + url = c.queryURL + } else if url, err = url.Parse(c.url); err == nil { + url = url.JoinPath("query", "1") + c.queryURL = url + } + return +} + +func (c *Client) parseStreamURL() (url *url.URL, err error) { + if c.streamURL != nil { + url = c.streamURL + } else if url, err = url.Parse(c.url); err == nil { + url = url.JoinPath("stream", "1") + c.streamURL = url + } + return +} + func (c *Client) doWithRetry(req *http.Request) (attempts int, r *http.Response, err error) { req2 := req.Clone(req.Context()) body, rerr := io.ReadAll(req.Body) @@ -244,17 +274,19 @@ func (c *Client) backoff(attempt int) (sleep time.Duration) { // Query invoke fql optionally set multiple [QueryOptFn] func (c *Client) Query(fql *Query, opts ...QueryOptFn) (*QuerySuccess, error) { - req := &fqlRequest{ - Context: c.ctx, - Query: fql, - Headers: c.headers, + req := &queryRequest{ + apiRequest: apiRequest{ + Context: c.ctx, + Headers: c.headers, + }, + Query: fql, } for _, queryOptionFn := range opts { queryOptionFn(req) } - return c.do(req) + return req.do(c) } // Paginate invoke fql with pagination optionally set multiple [QueryOptFn] @@ -266,6 +298,30 @@ func (c *Client) Paginate(fql *Query, opts ...QueryOptFn) 
*QueryIterator { } } +// Stream initiates a stream subscription for the [fauna.Query]. +// +// This is a syntax sugar for [fauna.Client.Query] and [fauna.Client.Subscribe]. +// +// Note that the query provided MUST return [fauna.Stream] value. Otherwise, +// this method returns an error. +func (c *Client) Stream(fql *Query, opts ...QueryOptFn) (*Events, error) { + res, err := c.Query(fql, opts...) + if err != nil { + return nil, err + } + + if stream, ok := res.Data.(Stream); ok { + return c.Subscribe(stream) + } + + return nil, fmt.Errorf("expected query to return a fauna.Stream but got %T", res.Data) +} + +// Subscribe initiates a stream subscription for the given stream value. +func (c *Client) Subscribe(stream Stream, opts ...StreamOptFn) (*Events, error) { + return subscribe(c, stream, opts...) +} + // QueryIterator is a [fauna.Client] iterator for paginated queries type QueryIterator struct { client *Client diff --git a/client_example_test.go b/client_example_test.go index e4eb030..f203089 100644 --- a/client_example_test.go +++ b/client_example_test.go @@ -317,3 +317,154 @@ func ExampleClient_Paginate() { fmt.Printf("%d", len(items)) // Output: 20 } + +func ExampleClient_Stream() { + // IMPORTANT: just for the purpose of example, don't actually hardcode secret + _ = os.Setenv(fauna.EnvFaunaSecret, "secret") + _ = os.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) + + client, err := fauna.NewDefaultClient() + if err != nil { + log.Fatalf("client should have been initialized: %s", err) + } + + // setup a collection + setupQuery, _ := fauna.FQL(` + if (!Collection.byName('StreamingSandbox').exists()) { + Collection.create({ name: 'StreamingSandbox' }) + } else { + StreamingSandbox.all().forEach(.delete()) + } + `, nil) + if _, err := client.Query(setupQuery); err != nil { + log.Fatalf("failed to setup the collection: %s", err) + } + + // create a stream + streamQuery, _ := fauna.FQL(`StreamingSandbox.all().toStream()`, nil) + events, err := 
client.Stream(streamQuery) + if err != nil { + log.Fatalf("failed to subscribe to the stream value: %s", err) + } + defer events.Close() + + // produce some events while the subscription is open + createQuery, _ := fauna.FQL(`StreamingSandbox.create({ foo: 'bar' })`, nil) + updateQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.update({ foo: 'baz' }))`, nil) + deleteQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.delete())`, nil) + + queries := []*fauna.Query{createQuery, updateQuery, deleteQuery} + for _, query := range queries { + if _, err := client.Query(query); err != nil { + log.Fatalf("failed execute CRUD query: %s", err) + } + } + + // fetch the produced events + type Data struct { + Foo string `fauna:"foo"` + } + + var event fauna.Event + + expect := 3 + for expect > 0 { + err := events.Next(&event) + if err != nil { + log.Fatalf("failed to receive next event: %s", err) + } + switch event.Type { + case fauna.AddEvent, fauna.UpdateEvent, fauna.RemoveEvent: + var data Data + if err := event.Unmarshal(&data); err != nil { + log.Fatalf("failed to unmarshal event data: %s", err) + } + fmt.Printf("Event: %s Data: %+v\n", event.Type, data) + expect-- + } + } + // Output: Event: add Data: {Foo:bar} + // Event: update Data: {Foo:baz} + // Event: remove Data: {Foo:baz} +} + +func ExampleClient_Subscribe() { + // IMPORTANT: just for the purpose of example, don't actually hardcode secret + _ = os.Setenv(fauna.EnvFaunaSecret, "secret") + _ = os.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) + + client, err := fauna.NewDefaultClient() + if err != nil { + log.Fatalf("client should have been initialized: %s", err) + } + + // setup a collection + setupQuery, _ := fauna.FQL(` + if (!Collection.byName('StreamingSandbox').exists()) { + Collection.create({ name: 'StreamingSandbox' }) + } else { + StreamingSandbox.all().forEach(.delete()) + } + `, nil) + if _, err := client.Query(setupQuery); err != nil { + log.Fatalf("failed to setup the collection: %s", err) 
+ } + + // create a stream + streamQuery, _ := fauna.FQL(`StreamingSandbox.all().toStream()`, nil) + result, err := client.Query(streamQuery) + if err != nil { + log.Fatalf("failed to create a stream: %s", err) + } + + var stream fauna.Stream + if err := result.Unmarshal(&stream); err != nil { + log.Fatalf("failed to unmarshal the stream value: %s", err) + } + + // initiate the stream subscription + events, err := client.Subscribe(stream) + if err != nil { + log.Fatalf("failed to subscribe to the stream value: %s", err) + } + defer events.Close() + + // produce some events while the subscription is open + createQuery, _ := fauna.FQL(`StreamingSandbox.create({ foo: 'bar' })`, nil) + updateQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.update({ foo: 'baz' }))`, nil) + deleteQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.delete())`, nil) + + queries := []*fauna.Query{createQuery, updateQuery, deleteQuery} + for _, query := range queries { + if _, err := client.Query(query); err != nil { + log.Fatalf("failed execute CRUD query: %s", err) + } + } + + // fetch the produced events + type Data struct { + Foo string `fauna:"foo"` + } + + var event fauna.Event + + expect := 3 + for expect > 0 { + err := events.Next(&event) + if err != nil { + log.Fatalf("failed to receive next event: %s", err) + } + switch event.Type { + case fauna.AddEvent, fauna.UpdateEvent, fauna.RemoveEvent: + var data Data + if err := event.Unmarshal(&data); err != nil { + log.Fatalf("failed to unmarshal event data: %s", err) + } + fmt.Printf("Event: %s Data: %+v\n", event.Type, data) + expect-- + } + } + // Output: Event: add Data: {Foo:bar} + // Event: update Data: {Foo:baz} + // Event: remove Data: {Foo:baz} +} diff --git a/config.go b/config.go index 5a44c4f..aada176 100644 --- a/config.go +++ b/config.go @@ -93,18 +93,18 @@ func URL(url string) ClientConfigFn { } // QueryOptFn function to set options on the [Client.Query] -type QueryOptFn func(req *fqlRequest) +type QueryOptFn 
func(req *queryRequest) // QueryContext set the [context.Context] for a single [Client.Query] func QueryContext(ctx context.Context) QueryOptFn { - return func(req *fqlRequest) { + return func(req *queryRequest) { req.Context = ctx } } // Tags set the tags header on a single [Client.Query] func Tags(tags map[string]string) QueryOptFn { - return func(req *fqlRequest) { + return func(req *queryRequest) { if val, exists := req.Headers[HeaderTags]; exists { req.Headers[HeaderTags] = argsStringFromMap(tags, strings.Split(val, ",")...) } else { @@ -115,19 +115,29 @@ func Tags(tags map[string]string) QueryOptFn { // Traceparent sets the header on a single [Client.Query] func Traceparent(id string) QueryOptFn { - return func(req *fqlRequest) { req.Headers[HeaderTraceparent] = id } + return func(req *queryRequest) { req.Headers[HeaderTraceparent] = id } } // Timeout set the query timeout on a single [Client.Query] func Timeout(dur time.Duration) QueryOptFn { - return func(req *fqlRequest) { + return func(req *queryRequest) { req.Headers[HeaderQueryTimeoutMs] = fmt.Sprintf("%d", dur.Milliseconds()) } } // Typecheck sets the header on a single [Client.Query] func Typecheck(enabled bool) QueryOptFn { - return func(req *fqlRequest) { req.Headers[HeaderTypecheck] = fmt.Sprintf("%v", enabled) } + return func(req *queryRequest) { req.Headers[HeaderTypecheck] = fmt.Sprintf("%v", enabled) } } + +// StreamOptFn function to set options on the [Client.Subscribe] +type StreamOptFn func(req *streamRequest) + +// StartTime sets the stream's starting timestamp. +// +// Useful when resuming a stream after a failure. 
+func StartTime(ts int64) StreamOptFn { + return func(req *streamRequest) { req.StartTS = ts } } func argsStringFromMap(input map[string]string, currentArgs ...string) string { diff --git a/request.go b/request.go index a1d5655..ad2e8d4 100644 --- a/request.go +++ b/request.go @@ -11,11 +11,42 @@ import ( "strings" ) -type fqlRequest struct { - Context context.Context - Headers map[string]string - Query any `fauna:"query"` - Arguments map[string]any `fauna:"arguments"` +type apiRequest struct { + Context context.Context + Headers map[string]string +} + +func (apiReq *apiRequest) post(cli *Client, url *url.URL, bytesOut []byte) (attempts int, httpRes *http.Response, err error) { + var httpReq *http.Request + if httpReq, err = http.NewRequestWithContext( + apiReq.Context, + http.MethodPost, + url.String(), + bytes.NewReader(bytesOut), + ); err != nil { + err = fmt.Errorf("failed to init request: %w", err) + return + } + + httpReq.Header.Set(headerAuthorization, `Bearer `+cli.secret) + if lastTxnTs := cli.lastTxnTime.string(); lastTxnTs != "" { + httpReq.Header.Set(HeaderLastTxnTs, lastTxnTs) + } + + for k, v := range apiReq.Headers { + httpReq.Header.Set(k, v) + } + + if attempts, httpRes, err = cli.doWithRetry(httpReq); err != nil { + err = ErrNetwork(fmt.Errorf("network error: %w", err)) + } + return +} + +type queryRequest struct { + apiRequest + Query any + Arguments map[string]any } type queryResponse struct { @@ -31,6 +62,19 @@ type queryResponse struct { Tags string `json:"query_tags"` } +func parseQueryResponse(httpRes *http.Response) (qRes *queryResponse, err error) { + var bytesIn []byte + if bytesIn, err = io.ReadAll(httpRes.Body); err != nil { + err = fmt.Errorf("failed to read response body: %w", err) + return + } + + if err = json.Unmarshal(bytesIn, &qRes); err != nil { + err = fmt.Errorf("failed to umarmshal response: %w", err) + } + return +} + func (r *queryResponse) queryTags() map[string]string { ret := map[string]string{} @@ -44,71 +88,89 @@ func 
(r *queryResponse) queryTags() map[string]string { return ret } -func (c *Client) do(request *fqlRequest) (*QuerySuccess, error) { - bytesOut, bytesErr := marshal(request) - if bytesErr != nil { - return nil, fmt.Errorf("marshal request failed: %w", bytesErr) +func (qReq *queryRequest) do(cli *Client) (qSus *QuerySuccess, err error) { + var bytesOut []byte + if bytesOut, err = marshal(qReq); err != nil { + err = fmt.Errorf("marshal request failed: %w", err) + return } - reqURL, urlErr := url.Parse(c.url) - if urlErr != nil { - return nil, urlErr + var queryURL *url.URL + if queryURL, err = cli.parseQueryURL(); err != nil { + return } - if path, err := url.JoinPath(reqURL.Path, "query", "1"); err != nil { - return nil, err - } else { - reqURL.Path = path + var ( + attempts int + httpRes *http.Response + ) + if attempts, httpRes, err = qReq.post(cli, queryURL, bytesOut); err != nil { + return } - req, reqErr := http.NewRequestWithContext(request.Context, http.MethodPost, reqURL.String(), bytes.NewReader(bytesOut)) - if reqErr != nil { - return nil, fmt.Errorf("failed to init request: %w", reqErr) + var qRes *queryResponse + if qRes, err = parseQueryResponse(httpRes); err != nil { + return } - req.Header.Set(headerAuthorization, `Bearer `+c.secret) - if lastTxnTs := c.lastTxnTime.string(); lastTxnTs != "" { - req.Header.Set(HeaderLastTxnTs, lastTxnTs) - } + cli.lastTxnTime.sync(qRes.TxnTime) + qRes.Header = httpRes.Header - for k, v := range request.Headers { - req.Header.Set(k, v) + if err = getErrFauna(httpRes.StatusCode, qRes, attempts); err != nil { + return } - attempts, r, doErr := c.doWithRetry(req) - if doErr != nil { - return nil, ErrNetwork(fmt.Errorf("network error: %w", doErr)) + var data any + if data, err = decode(qRes.Data); err != nil { + err = fmt.Errorf("failed to decode data: %w", err) + return } - var res queryResponse - - bin, readErr := io.ReadAll(r.Body) - if readErr != nil { - return nil, fmt.Errorf("failed to read response body: %w", readErr) 
+ qSus = &QuerySuccess{ + QueryInfo: newQueryInfo(qRes), + Data: data, + StaticType: qRes.StaticType, } + qSus.Stats.Attempts = attempts + return +} - if unmarshalErr := json.Unmarshal(bin, &res); unmarshalErr != nil { - return nil, fmt.Errorf("failed to umarmshal response: %w", unmarshalErr) - } +type streamRequest struct { + apiRequest + Stream Stream + StartTS int64 +} - c.lastTxnTime.sync(res.TxnTime) - res.Header = r.Header +func (streamReq *streamRequest) do(cli *Client) (bytes io.ReadCloser, err error) { + var bytesOut []byte + if bytesOut, err = marshal(streamReq); err != nil { + err = fmt.Errorf("marshal request failed: %w", err) + return + } - if serviceErr := getErrFauna(r.StatusCode, &res, attempts); serviceErr != nil { - return nil, serviceErr + var streamURL *url.URL + if streamURL, err = cli.parseStreamURL(); err != nil { + return } - data, decodeErr := decode(res.Data) - if decodeErr != nil { - return nil, fmt.Errorf("failed to decode data: %w", decodeErr) + var ( + attempts int + httpRes *http.Response + ) + if attempts, httpRes, err = streamReq.post(cli, streamURL, bytesOut); err != nil { + return } - ret := &QuerySuccess{ - QueryInfo: newQueryInfo(&res), - Data: data, - StaticType: res.StaticType, + if httpRes.StatusCode != http.StatusOK { + var qRes *queryResponse + if qRes, err = parseQueryResponse(httpRes); err == nil { + if err = getErrFauna(httpRes.StatusCode, qRes, attempts); err == nil { + err = fmt.Errorf("unknown error for http status: %d", httpRes.StatusCode) + } + } + return } - ret.Stats.Attempts = attempts - return ret, nil + bytes = httpRes.Body + return } diff --git a/serializer.go b/serializer.go index a3b12e2..6ca5ca9 100644 --- a/serializer.go +++ b/serializer.go @@ -34,6 +34,7 @@ const ( typeTagDoc typeTag = "@doc" typeTagRef typeTag = "@ref" typeTagSet typeTag = "@set" + typeTagStream typeTag = "@stream" typeTagMod typeTag = "@mod" typeTagObject typeTag = "@object" ) @@ -96,6 +97,8 @@ func (p Page) Unmarshal(into any) error { 
return decodeInto(p.Data, into) } +type Stream string + func mapDecoder(into any) (*mapstructure.Decoder, error) { return mapstructure.NewDecoder(&mapstructure.DecoderConfig{ TagName: "fauna", @@ -232,6 +235,8 @@ func unboxType(body map[string]any) (any, error) { return unboxRef(v.(map[string]any)) case typeTagSet: return unboxSet(v) + case typeTagStream: + return unboxStream(v) case typeTagDoc: return unboxDoc(v.(map[string]any)) case typeTagObject: @@ -400,6 +405,14 @@ func unboxSet(v any) (any, error) { return nil, fmt.Errorf("invalid set %v", v) } +func unboxStream(v any) (any, error) { + if token, ok := v.(string); ok { + return Stream(token), nil + } else { + return nil, fmt.Errorf("invalid stream %v", v) + } +} + func unboxTime(v string) (*time.Time, error) { if t, err := time.Parse(timeFormat, v); err != nil { return nil, err @@ -466,10 +479,13 @@ func encode(v any, hint string) (any, error) { case Page: return encodeFaunaStruct(typeTagSet, vt) + case Stream: + return map[typeTag]any{typeTagStream: vt}, nil + case time.Time: return encodeTime(vt, hint) - case fqlRequest: + case queryRequest: query, err := encode(vt.Query, hint) if err != nil { return nil, err @@ -484,6 +500,13 @@ func encode(v any, hint string) (any, error) { } } return out, nil + + case streamRequest: + out := map[string]any{"token": string(vt.Stream)} + if vt.StartTS > 0 { + out["start_ts"] = vt.StartTS + } + return out, nil } switch value := reflect.ValueOf(v); value.Kind() { diff --git a/serializer_test.go b/serializer_test.go index b9041f5..fc9476e 100644 --- a/serializer_test.go +++ b/serializer_test.go @@ -49,6 +49,7 @@ type BusinessObj struct { RefField Ref `fauna:"ref_field"` NamedRefField NamedRef `fauna:"named_ref_field"` SetField Page `fauna:"set_field"` + StreamField Stream `fauna:"stream_field"` ObjField SubBusinessObj `fauna:"obj_field"` DocField DocBusinessObj `fauna:"doc_field"` NamedDocField NamedDocBusinessObj `fauna:"named_doc_field"` @@ -266,6 +267,11 @@ func 
TestEncodingFaunaStructs(t *testing.T) { roundTripCheck(t, obj, `{"@set":{"data":["0","1","2"],"after":"foobarbaz"}}`) }) + t.Run("encodes Stream", func(t *testing.T) { + stream := Stream("abcd==") + roundTripCheck(t, stream, `{"@stream":"abcd=="}`) + }) + t.Run("encode NullDoc", func(t *testing.T) { obj := NullDocument{Cause: "Foo", Ref: &Ref{ID: "1234", Coll: &Module{"Foo"}}} roundTripCheck(t, obj, `{"cause": "Foo", "ref": {"@ref":{"id":"1234","coll":{"@mod":"Foo"}}}}`) diff --git a/stream.go b/stream.go new file mode 100644 index 0000000..a081bcf --- /dev/null +++ b/stream.go @@ -0,0 +1,192 @@ +package fauna + +import ( + "encoding/json" + "io" + "net" +) + +// EventType represents a Fauna's event type. +type EventType string + +const ( + // AddEvent happens when a new value is added to the stream's watched set. + AddEvent EventType = "add" + // UpdateEvent happens when a value in the stream's watched set changes. + UpdateEvent EventType = "update" + // Remove event happens when a value in the stream's watched set is removed. + RemoveEvent EventType = "remove" + // StatusEvent happens periodically and comunicates the stream's latest + // transacion time as well as ops aquired during its idle period. + StatusEvent EventType = "status" +) + +// Event represents a streaming event. +// +// Events of type [fauna.StatusEvent] have its [fauna.Event.Data] field set to +// nil. Other event's [fauna.Data] can be unmarshalled via the +// [fauna.Event.Unmarshal] method. +type Event struct { + // Type is this event's type. + Type EventType + // TxnTime is the transaction time that produce this event. + TxnTime int64 + // Data is the event's data. + Data any + // Stats contains the ops acquired to process the event. + Stats Stats +} + +// Unmarshal will unmarshal the raw [fauna.Event.Data] (if present) into the +// known type provided as `into`. `into` must be a pointer to a map or struct. 
+func (e *Event) Unmarshal(into any) error { + return decodeInto(e.Data, into) +} + +// ErrEvent contains error information present in error events. +// +// Error events with "abort" code contain its aborting value present in the +// [fauan.ErrEvent.Abort]. The aborting values can be unmarshalled with the +// [fauna.ErrEvent.Unmarshal] method. +type ErrEvent struct { + // Code is the error's code. + Code string `json:"code"` + + // Message is the error's message. + Message string `json:"message"` + + // Abort is the error's abort data, present if [fauna.ErrEvent.Code] is + // equals to "abort". + Abort any `json:"abort,omitempty"` +} + +// Error provides the underlying error message. +func (e *ErrEvent) Error() string { + return e.Message +} + +// Unmarshal will unmarshal the raw [fauna.ErrEvent.Abort] (if present) into the +// known type provided as `into`. `into` must be a pointer to a map or struct. +func (e *ErrEvent) Unmarshal(into any) error { + return decodeInto(e.Abort, into) +} + +// Events is an iterator of Fauna events. +// +// The next available event can be obtained by calling the +// [fauna.Events.Next] method. Note this method blocks until the next +// event is available or the events iterator is closed via the +// [fauna.Events.Close] method. +// +// The events iterator wraps an [http.Response.Body] reader. As per Go's current +// [http.Response] implementation, environments using HTTP/1.x may not reuse its +// TCP connections for the duration of its "keep-alive" time if response body is +// not read to completion and closed. By default, Fauna's region groups use the +// HTTP/2.x protocol where this restriction don't apply. However, if connecting +// to Fauna via an HTTP/1.x proxy, be aware of the events iterator closing time. 
+type Events struct { + client *Client + stream Stream + byteStream io.ReadCloser + decoder *json.Decoder + lastTxnTime int64 +} + +func subscribe(client *Client, stream Stream, opts ...StreamOptFn) (*Events, error) { + events := &Events{client: client, stream: stream} + if err := events.reconnect(opts...); err != nil { + return nil, err + } + return events, nil +} + +func (es *Events) reconnect(opts ...StreamOptFn) error { + req := streamRequest{ + apiRequest: apiRequest{ + es.client.ctx, + es.client.headers, + }, + Stream: es.stream, + StartTS: es.lastTxnTime, + } + + for _, streamOptionFn := range opts { + streamOptionFn(&req) + } + + byteStream, err := req.do(es.client) + if err != nil { + return err + } + + es.byteStream = byteStream + es.decoder = json.NewDecoder(byteStream) + return nil +} + +// Close gracefully closes the events iterator. See [fauna.Events] for details. +func (es *Events) Close() (err error) { + return es.byteStream.Close() +} + +type rawEvent = struct { + Type EventType `json:"type"` + TxnTime int64 `json:"txn_ts"` + Data any `json:"data,omitempty"` + Error *ErrEvent `json:"error,omitempty"` + Stats Stats `json:"stats"` +} + +// Next blocks until the next event is available. +// +// Note that network errors of type [fauna.ErrEvent] are considered fatal and +// close the underlying stream. Calling next after an error event occurs will +// return an error. +func (es *Events) Next(event *Event) (err error) { + raw := rawEvent{} + if err = es.decoder.Decode(&raw); err == nil { + es.syncTxnTime(raw.TxnTime) + err = convertRawEvent(&raw, event) + if _, ok := err.(*ErrEvent); ok { + es.Close() // no more events are comming + } + } else { + // NOTE: This code tries to resume streams on network and IO errors. It + // presume that if the service is unavailable, the reconnect call will + // fail. Automatic retries and backoff mechanisms are impleneted at the + // Client level. 
+ if _, ok := err.(net.Error); ok || err == io.ErrUnexpectedEOF { + if err = es.reconnect(); err == nil { + err = es.Next(event) + } + } + } + return +} + +func (es *Events) syncTxnTime(txnTime int64) { + es.client.lastTxnTime.sync(txnTime) + es.lastTxnTime = txnTime +} + +func convertRawEvent(raw *rawEvent, event *Event) (err error) { + if raw.Error != nil { + if raw.Error.Abort != nil { + if raw.Error.Abort, err = convert(false, raw.Error.Abort); err != nil { + return + } + } + err = raw.Error + } else { + if raw.Data != nil { + if raw.Data, err = convert(false, raw.Data); err != nil { + return + } + } + event.Type = raw.Type + event.TxnTime = raw.TxnTime + event.Data = raw.Data + event.Stats = raw.Stats + } + return +} diff --git a/stream_test.go b/stream_test.go new file mode 100644 index 0000000..8f03571 --- /dev/null +++ b/stream_test.go @@ -0,0 +1,155 @@ +package fauna_test + +import ( + "testing" + + "github.com/fauna/fauna-go" + "github.com/stretchr/testify/require" +) + +func TestStreaming(t *testing.T) { + t.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) + t.Setenv(fauna.EnvFaunaSecret, "secret") + + client, clientErr := fauna.NewDefaultClient() + require.NoError(t, clientErr) + + setupQ, _ := fauna.FQL(` + Collection.byName('StreamingTest')?.delete() + Collection.create({ name: 'StreamingTest' }) + `, nil) + + _, err := client.Query(setupQ) + require.NoError(t, err) + + type TestDoc struct { + Foo string `fauna:"foo"` + } + + t.Run("single-step streaming", func(t *testing.T) { + t.Run("Stream events", func(t *testing.T) { + streamQ, _ := fauna.FQL(`StreamingTest.all().toStream()`, nil) + events, err := client.Stream(streamQ) + require.NoError(t, err) + defer events.Close() + + var event fauna.Event + err = events.Next(&event) + require.NoError(t, err) + require.Equal(t, fauna.StatusEvent, event.Type) + }) + + t.Run("Fails on non-streamable values", func(t *testing.T) { + streamQ, _ := fauna.FQL(`"I'm a string"`, nil) + events, err := 
client.Stream(streamQ) + require.ErrorContains(t, err, "expected query to return a fauna.Stream but got string") + require.Nil(t, events) + }) + }) + + t.Run("multi-step streaming", func(t *testing.T) { + t.Run("Stream events", func(t *testing.T) { + streamQ, _ := fauna.FQL(`StreamingTest.all().toStream()`, nil) + res, err := client.Query(streamQ) + require.NoError(t, err) + + var stream fauna.Stream + require.NoError(t, res.Unmarshal(&stream)) + + events, err := client.Subscribe(stream) + require.NoError(t, err) + defer events.Close() + + var event fauna.Event + err = events.Next(&event) + require.NoError(t, err) + require.Equal(t, fauna.StatusEvent, event.Type) + + createQ, _ := fauna.FQL(`StreamingTest.create({ foo: 'bar' })`, nil) + _, err = client.Query(createQ) + require.NoError(t, err) + + err = events.Next(&event) + require.NoError(t, err) + require.Equal(t, fauna.AddEvent, event.Type) + + var doc TestDoc + require.NoError(t, event.Unmarshal(&doc)) + require.Equal(t, "bar", doc.Foo) + require.NoError(t, events.Close()) + }) + + t.Run("Handle subscription errors", func(t *testing.T) { + events, err := client.Subscribe(fauna.Stream("abc1234==")) + require.IsType(t, err, &fauna.ErrInvalidRequest{}) + require.Nil(t, events) + }) + + t.Run("Handle error events", func(t *testing.T) { + streamQ, _ := fauna.FQL(`StreamingTest.all().map(doc => abort('oops')).toStream()`, nil) + res, err := client.Query(streamQ) + require.NoError(t, err) + + var stream fauna.Stream + require.NoError(t, res.Unmarshal(&stream)) + + events, err := client.Subscribe(stream) + require.NoError(t, err) + defer events.Close() + + var event fauna.Event + err = events.Next(&event) + require.NoError(t, err) + require.Equal(t, fauna.StatusEvent, event.Type) + + createQ, _ := fauna.FQL(`StreamingTest.create({ foo: 'bar' })`, nil) + _, err = client.Query(createQ) + require.NoError(t, err) + + err = events.Next(&event) + require.IsType(t, err, &fauna.ErrEvent{}) + + evErr := err.(*fauna.ErrEvent) + 
require.Equal(t, "abort", evErr.Code) + require.Equal(t, "Query aborted.", evErr.Message) + + var msg string + require.NoError(t, evErr.Unmarshal(&msg)) + require.Equal(t, "oops", msg) + require.NoError(t, events.Close()) + }) + + t.Run("Resume a stream at a given start time", func(t *testing.T) { + streamQ, _ := fauna.FQL(`StreamingTest.all().toStream()`, nil) + res, err := client.Query(streamQ) + require.NoError(t, err) + + var stream fauna.Stream + require.NoError(t, res.Unmarshal(&stream)) + + createFooQ, _ := fauna.FQL(`StreamingTest.create({ foo: 'foo' })`, nil) + createBarQ, _ := fauna.FQL(`StreamingTest.create({ foo: 'bar' })`, nil) + + foo, err := client.Query(createFooQ) + require.NoError(t, err) + + bar, err := client.Query(createBarQ) + require.NoError(t, err) + + events, err := client.Subscribe(stream, fauna.StartTime(foo.TxnTime)) + require.NoError(t, err) + defer events.Close() + + var event fauna.Event + err = events.Next(&event) + require.NoError(t, err) + require.Equal(t, fauna.StatusEvent, event.Type) + require.GreaterOrEqual(t, event.TxnTime, foo.TxnTime) + + err = events.Next(&event) + require.NoError(t, err) + require.Equal(t, fauna.AddEvent, event.Type) + require.Equal(t, bar.TxnTime, event.TxnTime) + }) + }) +}