Skip to content

Commit

Permalink
introduce syntax sugar for simple streams
Browse files Browse the repository at this point in the history
  • Loading branch information
erickpintor committed Apr 24, 2024
1 parent 9c836a9 commit fca3d67
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 0 deletions.
19 changes: 19 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,25 @@ func (c *Client) Paginate(fql *Query, opts ...QueryOptFn) *QueryIterator {
}
}

// Stream initiates a stream subscription for the [fauna.Query].
//
// This is a syntax sugar for [fauna.Client.Query] and [fauna.Client.Subscribe].
//
// Note that the query provided MUST return [fauna.Stream] value. Otherwise,
// this method returns an error.
func (c *Client) Stream(fql *Query, opts ...QueryOptFn) (*Events, error) {
res, err := c.Query(fql, opts...)
if err != nil {
return nil, err
}

if stream, ok := res.Data.(Stream); ok {
return c.Subscribe(stream)
}

return nil, fmt.Errorf("expected query to return a fauna.Stream but got %T", res.Data)
}

// Subscribe initiates a stream subscription for the given stream value.
func (c *Client) Subscribe(stream Stream, opts ...StreamOptFn) (*Events, error) {
req := streamRequest{
Expand Down
65 changes: 65 additions & 0 deletions client_example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,71 @@ func ExampleClient_Paginate() {
// Output: 20
}

func ExampleClient_Stream() {
// IMPORTANT: just for the purpose of example, don't actually hardcode secret
_ = os.Setenv(fauna.EnvFaunaSecret, "secret")
_ = os.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal)

client, err := fauna.NewDefaultClient()
if err != nil {
log.Fatalf("client should have been initialized: %s", err)
}

// setup a collection
setupQuery, _ := fauna.FQL(`
Collection.byName('StreamingSandbox')?.delete()
Collection.create({ name: 'StreamingSandbox' })
`, nil)
if _, err := client.Query(setupQuery); err != nil {
log.Fatalf("failed to setup the collection: %s", err)
}

// create a stream
streamQuery, _ := fauna.FQL(`StreamingSandbox.all().toStream()`, nil)
events, err := client.Stream(streamQuery)
if err != nil {
log.Fatalf("failed to subscribe to the stream value: %s", err)
}
defer events.Close()

// produce some events while the subscription is open
createQuery, _ := fauna.FQL(`StreamingSandbox.create({ foo: 'bar' })`, nil)
updateQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.update({ foo: 'baz' }))`, nil)
deleteQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.delete())`, nil)

queries := []*fauna.Query{createQuery, updateQuery, deleteQuery}
for _, query := range queries {
if _, err := client.Query(query); err != nil {
log.Fatalf("failed execute CRUD query: %s", err)
}
}

// fetch the produced events
type Data struct {
Foo string `fauna:"foo"`
}

expect := 3
for expect > 0 {
event, err := events.Next()
if err != nil {
log.Fatalf("failed to receive next event: %s", err)
}
switch event.Type {
case fauna.AddEvent, fauna.UpdateEvent, fauna.RemoveEvent:
var data Data
if err := event.Unmarshal(&data); err != nil {
log.Fatalf("failed to unmarshal event data: %s", err)
}
fmt.Printf("Event: %s Data: %+v\n", event.Type, data)
expect--
}
}
// Output: Event: add Data: {Foo:bar}
// Event: update Data: {Foo:baz}
// Event: remove Data: {Foo:baz}
}

func ExampleClient_Subscribe() {
// IMPORTANT: just for the purpose of example, don't actually hardcode secret
_ = os.Setenv(fauna.EnvFaunaSecret, "secret")
Expand Down
20 changes: 20 additions & 0 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,26 @@ func TestStreaming(t *testing.T) {
Foo string `fauna:"foo"`
}

t.Run("single-step streaming", func(t *testing.T) {
t.Run("Stream events", func(t *testing.T) {
streamQ, _ := fauna.FQL(`StreamingTest.all().toStream()`, nil)
events, err := client.Stream(streamQ)
require.NoError(t, err)
defer events.Close()

event, err := events.Next()
require.NoError(t, err)
require.Equal(t, event.Type, fauna.StatusEvent)
})

t.Run("Fails on non-streamable values", func(t *testing.T) {
streamQ, _ := fauna.FQL(`"I'm a string"`, nil)
events, err := client.Stream(streamQ)
require.ErrorContains(t, err, "expected query to return a fauna.Stream but got string")
require.Nil(t, events)
})
})

t.Run("multi-step streaming", func(t *testing.T) {
t.Run("Stream events", func(t *testing.T) {
streamQ, _ := fauna.FQL(`StreamingTest.all().toStream()`, nil)
Expand Down

0 comments on commit fca3d67

Please sign in to comment.