Skip to content

Commit

Permalink
Introduce support for stream event cursors
Browse files Browse the repository at this point in the history
  • Loading branch information
erickpintor committed Aug 21, 2024
1 parent 125d69f commit f1413a9
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 16 deletions.
14 changes: 11 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -416,17 +416,25 @@ The [client configuration](#client-configuration) sets default query options for
`Stream()`. To override these options, see [query
options](#query-options).

The `Subscribe()` method accepts a `fauna.StartTime` function. You can
use `fauna.StartTime` to restart a stream after disconnection.
The `Subscribe()` method accepts the `fauna.StartTime` and `fauna.EventCursor`
function. Use `fauna.StartTime` to restart a stream at a specific timestamp.

```go
streamQuery, _ := fauna.FQL(`Product.all().toStream()`, nil)
client.Subscribe(streamQuery, fauna.StartTime(1710968002310000))
```

Use `fauna.EventCursor` to resume a stream after a disconnect:

```go
streamQuery, _ := fauna.FQL(`Product.all().toStream()`, nil)
client.Subscribe(streamQuery, fauna.EventCursor("abc2345=="))
```

| Function | Description |
| -------- | ----------- |
| `fauna.StartTime` | Sets the stream start time. Accepts an `int64` representing the start time in microseconds since the Unix epoch.<br><br>The start time is typically the time the stream disconnected.<br><br>The start time must be later than the creation time of the stream token. The period between the stream restart and the start time argument can't exceed the `history_days` value for source set's collection. If a collection's `history_days` is `0` or unset, the period can't exceed 15 minutes. |
| `fauna.StartTime` | Sets the stream start time. Accepts an `int64` representing the start time in microseconds since the Unix epoch.<br><br>The start time must be later than the creation time of the stream token. The period between the stream restart and the start time argument can't exceed the `history_days` value for source set's collection. If a collection's `history_days` is `0` or unset, the period can't exceed 15 minutes. |
| `fauna.EventCursor` | Resumes the stream after the given event cursor. Accepts a `string` with the cursor retrieved from a `fauna.Event`. |


## Contributing
Expand Down
10 changes: 9 additions & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,19 @@ type StreamOptFn func(req *streamRequest)

// StartTime set the streams starting timestamp.
//
// Usefull when resuming a stream after a failure.
// Usefull when resuming a stream at a given point in time.
func StartTime(ts int64) StreamOptFn {
return func(req *streamRequest) { req.StartTS = ts }
}

// EventCursor set the stream starting point based on a previously received
// event cursor.
//
// Usefull when resuming a stream after a failure.
func EventCursor(cursor string) StreamOptFn {
return func(req *streamRequest) { req.Cursor = cursor }
}

func argsStringFromMap(input map[string]string, currentArgs ...string) string {
params := url.Values{}

Expand Down
1 change: 1 addition & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ type streamRequest struct {
apiRequest
Stream Stream
StartTS int64
Cursor string
}

func (streamReq *streamRequest) do(cli *Client) (bytes io.ReadCloser, err error) {
Expand Down
3 changes: 3 additions & 0 deletions serializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,9 @@ func encode(v any, hint string) (any, error) {
if vt.StartTS > 0 {
out["start_ts"] = vt.StartTS
}
if len(vt.Cursor) > 0 {
out["cursor"] = vt.Cursor
}
return out, nil

case []byte:
Expand Down
28 changes: 16 additions & 12 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ type Event struct {
Type EventType
// TxnTime is the transaction time that produce this event.
TxnTime int64
// Cursor is the event's cursor, used for resuming streams after crashes.
Cursor string
// Data is the event's data.
Data any
// Stats contains the ops acquired to process the event.
Expand Down Expand Up @@ -85,12 +87,12 @@ func (e *ErrEvent) Unmarshal(into any) error {
// HTTP/2.x protocol where this restriction don't apply. However, if connecting
// to Fauna via an HTTP/1.x proxy, be aware of the events iterator closing time.
type Events struct {
client *Client
stream Stream
byteStream io.ReadCloser
decoder *json.Decoder
lastTxnTime int64
closed bool
client *Client
stream Stream
byteStream io.ReadCloser
decoder *json.Decoder
lastCursor string
closed bool
}

func subscribe(client *Client, stream Stream, opts ...StreamOptFn) (*Events, error) {
Expand All @@ -107,8 +109,8 @@ func (es *Events) reconnect(opts ...StreamOptFn) error {
es.client.ctx,
es.client.headers,
},
Stream: es.stream,
StartTS: es.lastTxnTime,
Stream: es.stream,
Cursor: es.lastCursor,
}

for _, streamOptionFn := range opts {
Expand Down Expand Up @@ -137,6 +139,7 @@ func (es *Events) Close() (err error) {
type rawEvent = struct {
Type EventType `json:"type"`
TxnTime int64 `json:"txn_ts"`
Cursor string `json:"cursor"`
Data any `json:"data,omitempty"`
Error *ErrEvent `json:"error,omitempty"`
Stats Stats `json:"stats"`
Expand All @@ -150,7 +153,7 @@ type rawEvent = struct {
func (es *Events) Next(event *Event) (err error) {
raw := rawEvent{}
if err = es.decoder.Decode(&raw); err == nil {
es.syncTxnTime(raw.TxnTime)
es.onNextEvent(&raw)
err = convertRawEvent(&raw, event)
if _, ok := err.(*ErrEvent); ok {
es.Close() // no more events are coming
Expand All @@ -169,9 +172,9 @@ func (es *Events) Next(event *Event) (err error) {
return
}

func (es *Events) syncTxnTime(txnTime int64) {
es.client.lastTxnTime.sync(txnTime)
es.lastTxnTime = txnTime
func (es *Events) onNextEvent(event *rawEvent) {
es.client.lastTxnTime.sync(event.TxnTime)
es.lastCursor = event.Cursor
}

func convertRawEvent(raw *rawEvent, event *Event) (err error) {
Expand All @@ -190,6 +193,7 @@ func convertRawEvent(raw *rawEvent, event *Event) (err error) {
}
event.Type = raw.Type
event.TxnTime = raw.TxnTime
event.Cursor = raw.Cursor
event.Data = raw.Data
event.Stats = raw.Stats
}
Expand Down
47 changes: 47 additions & 0 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,5 +151,52 @@ func TestStreaming(t *testing.T) {
require.Equal(t, fauna.AddEvent, event.Type)
require.Equal(t, bar.TxnTime, event.TxnTime)
})

t.Run("Resume a stream at a given event cursor", func(t *testing.T) {
streamQ, _ := fauna.FQL(`StreamingTest.all().toStream()`, nil)
res, err := client.Query(streamQ)
require.NoError(t, err)

var stream fauna.Stream
require.NoError(t, res.Unmarshal(&stream))

events, err := client.Subscribe(stream)
require.NoError(t, err)
defer events.Close()

createFooQ, _ := fauna.FQL(`StreamingTest.create({ foo: 'foo' })`, nil)
createBarQ, _ := fauna.FQL(`StreamingTest.create({ foo: 'bar' })`, nil)

foo, err := client.Query(createFooQ)
require.NoError(t, err)

bar, err := client.Query(createBarQ)
require.NoError(t, err)

var event fauna.Event
err = events.Next(&event)
require.NoError(t, err)
require.Equal(t, fauna.StatusEvent, event.Type)

err = events.Next(&event)
require.NoError(t, err)
require.Equal(t, fauna.AddEvent, event.Type)
require.Equal(t, foo.TxnTime, event.TxnTime)
events.Close()

events, err = client.Subscribe(stream, fauna.EventCursor(event.Cursor))
require.NoError(t, err)
defer events.Close()

err = events.Next(&event)
require.NoError(t, err)
require.Equal(t, fauna.StatusEvent, event.Type)
require.GreaterOrEqual(t, foo.TxnTime, event.TxnTime)

err = events.Next(&event)
require.NoError(t, err)
require.Equal(t, fauna.AddEvent, event.Type)
require.Equal(t, bar.TxnTime, event.TxnTime)
})
})
}

0 comments on commit f1413a9

Please sign in to comment.