Skip to content

Commit

Permalink
Add streaming support (#153)
Browse files Browse the repository at this point in the history
Co-authored-by: Erick Pintor <erickpintor@gmail.com>
Co-authored-by: James Rodewig <james.rodewig@fauna.com>
  • Loading branch information
3 people committed May 20, 2024
1 parent 3aec1cd commit 93d2b46
Show file tree
Hide file tree
Showing 9 changed files with 875 additions and 70 deletions.
156 changes: 153 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -263,16 +263,166 @@ The maximum amount of time to wait before retrying a query. Retries will use an
package main

import (
"time"
"time"

"github.com/fauna/fauna-go"
"github.com/fauna/fauna-go"
)

func main() {
client := fauna.NewClient("mysecret", fauna.DefaultTimeouts(), fauna.MaxBackoff(10 * time.Second))
}
```


## Event Streaming

The driver supports [Event
Streaming](https://docs.fauna.com/fauna/current/learn/streaming).


### Start a stream

To get a stream token, append
[`toStream()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/tostream)
or
[`changesOn()`](https://docs.fauna.com/fauna/current/reference/reference/schema_entities/set/changeson)
to a set from a [supported
source](https://docs.fauna.com/fauna/current/reference/streaming_reference/#supported-sources).

To start and subscribe to the stream, pass a query that produces a stream token
to `Client.Stream()`:

```go
type Product struct {
Name string `fauna:"name"`
Description string `fauna:"description"`
Price float64 `fauna:"price"`
}

func main() {
client, clientErr := fauna.NewDefaultClient()
if clientErr != nil {
panic(clientErr)
}

streamQuery, _ := fauna.FQL("Product.all().toStream()", nil)
events, err := client.Stream(streamQuery)
if err != nil {
panic(err)
}
defer events.Close()

var event fauna.Event
for {
err := events.Next(&event)
if err != nil {
panic(err)
}

switch event.Type {
case fauna.AddEvent, fauna.UpdateEvent, fauna.RemoveEvent:
var product Product
if err = event.Unmarshal(&product); err != nil {
panic(err)
}
fmt.Println(product)
}
}
}
```

In query results, the driver represents stream tokens as `fauna.Stream`
values.

To start a stream from a query result, call `Client.Subscribe()` on a
`fauna.Stream` value. This lets you output a stream alongside normal query
results:

```go
type Product struct {
Name string `fauna:"name"`
Description string `fauna:"description"`
Price float64 `fauna:"price"`
}

func main() {
client, clientErr := fauna.NewDefaultClient()
if clientErr != nil {
panic(clientErr)
}

dataLoad, _ := fauna.FQL(`
let products = Product.all()
{
Products: products.toArray(),
Stream: products.toStream()
}
`, nil)

data, err := client.Query(dataLoad)
if err != nil {
panic(err)
}

queryResult := struct {
Products []Product
Stream fauna.Stream
}{}

if err := data.Unmarshal(&queryResult); err != nil {
panic(err)
}

fmt.Println("Existing products:")
for _, product := range queryResult.Products {
fmt.Println(product)
}

events, err := client.Subscribe(queryResult.Stream)
if err != nil {
panic(err)
}
defer events.Close()

fmt.Println("Products from streaming:")
var event fauna.Event
for {
err := events.Next(&event)
if err != nil {
panic(err)
}
switch event.Type {
case fauna.AddEvent, fauna.UpdateEvent, fauna.RemoveEvent:
var product Product
if err = event.Unmarshal(&product); err != nil {
panic(err)
}
fmt.Println(product)
}
}
}
```


### Stream options

The [client configuration](#client-configuration) sets default query options for
`Client.Stream()`. To override these options, see [query
options](#query-options).

The `Client.Subscribe()` method accepts a `fauna.StartTime` function. You can
use `fauna.StartTime` to restart a stream after disconnection.

```go
streamQuery, _ := fauna.FQL(`Product.all().toStream()`, nil)
client.Subscribe(streamQuery, fauna.StartTime(1710968002310000))
```

| Function | Description |
| -------- | ----------- |
| `fauna.StartTime` | Sets the stream start time. Accepts an `int64` representing the start time in microseconds since the Unix epoch.<br><br>The start time is typically the time the stream disconnected.<br><br>The start time must be later than the creation time of the stream token. The period between the stream restart and the start time argument can't exceed the `history_days` value for source set's collection. If a collection's `history_days` is `0` or unset, the period can't exceed 15 minutes. |


## Contributing

GitHub pull requests are very welcome.
Expand All @@ -291,4 +441,4 @@ Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied. See the License for the specific language governing
permissions and limitations under the License.
permissions and limitations under the License.
78 changes: 67 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math/rand"
"net"
"net/http"
"net/url"
"os"
"strings"
"time"
Expand Down Expand Up @@ -67,6 +68,9 @@ type Client struct {

maxAttempts int
maxBackoff time.Duration

// lazily cached URLs
queryURL, streamURL *url.URL
}

// NewDefaultClient initialize a [fauna.Client] with recommend default settings
Expand Down Expand Up @@ -125,15 +129,21 @@ func NewClient(secret string, timeouts Timeouts, configFns ...ClientConfigFn) *C
Timeout: timeouts.ConnectionTimeout,
}

// NOTE: prefer a response header timeout instead of a client timeout so
// that the client don't stop reading a http body that was produced by
// Fauna. On the query interface, an HTTP body is sent as a single http
// message. On the streaming interface, HTTP chunks are sent on every event.
// Therefore, it's in the driver's best interest to continue reading the
// HTTP body once the headers appear.
httpClient := &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: dialer.DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 20,
IdleConnTimeout: timeouts.IdleConnectionTimeout,
Proxy: http.ProxyFromEnvironment,
DialContext: dialer.DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 20,
IdleConnTimeout: timeouts.IdleConnectionTimeout,
ResponseHeaderTimeout: timeouts.QueryTimeout + timeouts.ClientBufferTimeout,
},
Timeout: timeouts.QueryTimeout + timeouts.ClientBufferTimeout,
}

defaultHeaders := map[string]string{
Expand Down Expand Up @@ -173,6 +183,26 @@ func NewClient(secret string, timeouts Timeouts, configFns ...ClientConfigFn) *C
return client
}

func (c *Client) parseQueryURL() (url *url.URL, err error) {
if c.queryURL != nil {
url = c.queryURL
} else if url, err = url.Parse(c.url); err == nil {
url = url.JoinPath("query", "1")
c.queryURL = url
}
return
}

func (c *Client) parseStreamURL() (url *url.URL, err error) {
if c.streamURL != nil {
url = c.streamURL
} else if url, err = url.Parse(c.url); err == nil {
url = url.JoinPath("stream", "1")
c.streamURL = url
}
return
}

func (c *Client) doWithRetry(req *http.Request) (attempts int, r *http.Response, err error) {
req2 := req.Clone(req.Context())
body, rerr := io.ReadAll(req.Body)
Expand Down Expand Up @@ -244,17 +274,19 @@ func (c *Client) backoff(attempt int) (sleep time.Duration) {

// Query invoke fql optionally set multiple [QueryOptFn]
func (c *Client) Query(fql *Query, opts ...QueryOptFn) (*QuerySuccess, error) {
req := &fqlRequest{
Context: c.ctx,
Query: fql,
Headers: c.headers,
req := &queryRequest{
apiRequest: apiRequest{
Context: c.ctx,
Headers: c.headers,
},
Query: fql,
}

for _, queryOptionFn := range opts {
queryOptionFn(req)
}

return c.do(req)
return req.do(c)
}

// Paginate invoke fql with pagination optionally set multiple [QueryOptFn]
Expand All @@ -266,6 +298,30 @@ func (c *Client) Paginate(fql *Query, opts ...QueryOptFn) *QueryIterator {
}
}

// Stream initiates a stream subscription for the [fauna.Query].
//
// This is a syntax sugar for [fauna.Client.Query] and [fauna.Client.Subscribe].
//
// Note that the query provided MUST return [fauna.Stream] value. Otherwise,
// this method returns an error.
func (c *Client) Stream(fql *Query, opts ...QueryOptFn) (*Events, error) {
res, err := c.Query(fql, opts...)
if err != nil {
return nil, err
}

if stream, ok := res.Data.(Stream); ok {
return c.Subscribe(stream)
}

return nil, fmt.Errorf("expected query to return a fauna.Stream but got %T", res.Data)
}

// Subscribe initiates a stream subscription for the given stream value.
func (c *Client) Subscribe(stream Stream, opts ...StreamOptFn) (*Events, error) {
return subscribe(c, stream, opts...)
}

// QueryIterator is a [fauna.Client] iterator for paginated queries
type QueryIterator struct {
client *Client
Expand Down
Loading

0 comments on commit 93d2b46

Please sign in to comment.