Skip to content
This repository was archived by the owner on Nov 5, 2022. It is now read-only.

Commit 7d90444

Browse files
authored
Merge pull request #18 from Fallenstedt/0.3.1
0.3.1
2 parents 79b021c + 6c28655 commit 7d90444

File tree

5 files changed

+111
-27
lines changed

5 files changed

+111
-27
lines changed

README.md

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,33 @@ Create a twitterstream instance with your access token from above.
4141
api := twitterstream.NewTwitterStream(tok.AccessToken)
4242
```
4343

44+
##### Set your unmarshal hook
45+
It is encouraged you set an unmarshal hook for thread-safety. Go's `bytes.Buffer` is not thread safe. Sharing a `bytes.Buffer`
46+
across multiple goroutines introduces risk of panics when decoding json [source](https://github.com/Fallenstedt/twitter-stream/issues/13).
47+
To avoid panics, it's encouraged to unmarshal json in the same goroutine where the `bytes.Buffer` exists. Use `SetUnmarshalHook` to set a function that unmarshals json.
48+
49+
By default, twitterstream's unmarshal hook will return `[]byte` if you want to live dangerously.
50+
51+
```go
52+
api.Stream.SetUnmarshalHook(func(bytes []byte) (interface{}, error) {
53+
// StreemData is a struct that represents your returned json
54+
// This is a quick resource to generate a struct from your json
55+
// https://mholt.github.io/json-to-go/
56+
data := StreamData{}
57+
if err := json.Unmarshal(bytes, &data); err != nil {
58+
log.Printf("Failed to unmarshal bytes: %v", err)
59+
}
60+
return data, err
61+
})
62+
```
63+
64+
65+
66+
4467
##### Start Stream
4568
Start your stream. This is a long-running HTTP GET request.
4669
You can get specific data you want by adding [query params](https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream).
47-
Additionally, [view an example of query params here](https://developer.twitter.com/en/docs/twitter-api/expansions).
70+
Additionally, [view an example of query params here](https://developer.twitter.com/en/docs/twitter-api/expansions), or in the [examples](https://github.com/fallenstedt/twitter-stream/tree/master/example)
4871

4972
```go
5073
err := api.Stream.StartStream("")
@@ -54,7 +77,7 @@ Additionally, [view an example of query params here](https://developer.twitter.c
5477
}
5578
```
5679

57-
4. Consume Messages from the Stream
80+
Consume Messages from the Stream
5881
Handle any `io.EOF` and other errors that arise first, then unmarshal your bytes into your favorite struct. Below is an example with strings
5982
```go
6083
go func() {

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.3.0
1+
0.3.1

example/main.go

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package main
22

33
import (
4+
"encoding/json"
45
"fmt"
56
twitterstream "github.com/fallenstedt/twitter-stream"
7+
"log"
68
"time"
79
)
810

@@ -13,12 +15,33 @@ const secret = "YOUR_SECRET"
1315
func main() {
1416
// Use your favorite function from below here
1517
startStream()
16-
//addRules()
17-
//getRules()
18-
//deleteRules()
18+
addRules()
19+
getRules()
20+
deleteRules()
21+
}
22+
23+
type StreamData struct {
24+
Data struct {
25+
Text string `json:"text"`
26+
ID string `json:"id"`
27+
CreatedAt time.Time `json:"created_at"`
28+
AuthorID string `json:"author_id"`
29+
} `json:"data"`
30+
Includes struct {
31+
Users []struct {
32+
ID string `json:"id"`
33+
Name string `json:"name"`
34+
Username string `json:"username"`
35+
} `json:"users"`
36+
} `json:"includes"`
37+
MatchingRules []struct {
38+
ID int64 `json:"id"`
39+
Tag string `json:"tag"`
40+
} `json:"matching_rules"`
1941
}
2042

2143

44+
2245
func startStream() {
2346
tok, err := twitterstream.NewTokenGenerator().SetApiKeyAndSecret(key, secret).RequestBearerToken()
2447

@@ -27,8 +50,18 @@ func startStream() {
2750
}
2851

2952
api := twitterstream.NewTwitterStream(tok.AccessToken)
53+
// It is encouraged you unmarashal json with twitterstream's unmarshal hook. This is a thread-safe
54+
// way to unmarshal json
55+
api.Stream.SetUnmarshalHook(func(bytes []byte) (interface{}, error) {
56+
data := StreamData{}
57+
if err := json.Unmarshal(bytes, &data); err != nil {
58+
log.Printf("Failed to unmarshal bytes: %v", err)
59+
}
60+
return data, err
61+
})
62+
defer api.Stream.StopStream()
3063

31-
err = api.Stream.StartStream("")
64+
err = api.Stream.StartStream("?expansions=author_id&tweet.fields=created_at")
3265

3366
if err != nil {
3467
panic(err)
@@ -39,7 +72,14 @@ func startStream() {
3972
if message.Err != nil {
4073
panic(message.Err)
4174
}
42-
fmt.Println(string(message.Data))
75+
76+
// Type assertion
77+
tweet, ok := message.Data.(StreamData)
78+
if !ok {
79+
continue
80+
}
81+
82+
fmt.Println(tweet.Data.Text)
4383
}
4484
}()
4585

stream.go

Lines changed: 34 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,78 +2,92 @@ package twitterstream
22

33
import (
44
"net/http"
5-
"sync"
65
)
76

7+
88
type (
9+
// UnmarshalHook is a function that will unmarshal json.
10+
UnmarshalHook func([]byte) (interface{}, error)
11+
912
// IStream is the interface that the stream struct implements.
1013
IStream interface {
1114
StartStream(queryParams string) error
1215
StopStream()
1316
GetMessages() <-chan StreamMessage
17+
SetUnmarshalHook(hook UnmarshalHook)
1418
}
1519

1620
// StreamMessage is the message that is sent from the messages channel.
1721
StreamMessage struct {
18-
Data []byte
22+
Data interface{}
1923
Err error
2024
}
2125

22-
23-
stream struct {
26+
// Stream is the struct that manages a long running TCP connection with Twitter.
27+
// It accepts an 'unamarshalHook' which allows you to unmarshal json in a thread-safe manner.
28+
// It is highly encouraged to set a unmarshal hook before starting a stream. Unmarshaling json
29+
// in a separate goroutine is not recommended because the Go bytes.Buffer is not thread safe.
30+
Stream struct {
31+
unmarshalHook UnmarshalHook
2432
messages chan StreamMessage
2533
httpClient IHttpClient
2634
done chan struct{}
27-
group *sync.WaitGroup
2835
reader IStreamResponseBodyReader
2936
}
3037
)
3138

32-
func newStream(httpClient IHttpClient, reader IStreamResponseBodyReader) *stream {
33-
return &stream{
39+
func newStream(httpClient IHttpClient, reader IStreamResponseBodyReader) *Stream {
40+
return &Stream{
41+
unmarshalHook: func(bytes []byte) (interface{}, error) {
42+
return bytes, nil
43+
},
3444
messages: make(chan StreamMessage),
3545
done: make(chan struct{}),
36-
group: new(sync.WaitGroup),
3746
reader: reader,
3847
httpClient: httpClient,
3948
}
4049
}
4150

51+
// SetUnmarshalHook sets the function that unmarshals json. It is highly encouraged
52+
// that you unmarshal json with this hook to promote thread-safety. Go's bytes.Buffer is not
53+
// thread safe and can result in panics when a bytes.Buffer is shared across goroutines.
54+
func (s *Stream) SetUnmarshalHook(hook UnmarshalHook) {
55+
s.unmarshalHook = hook
56+
}
57+
4258
// GetMessages returns the read-only messages channel
43-
func (s *stream) GetMessages() <-chan StreamMessage {
59+
func (s *Stream) GetMessages() <-chan StreamMessage {
4460
return s.messages
4561
}
4662

4763
// StopStream sends a close signal to stop the stream of tweets.
48-
func (s *stream) StopStream() {
64+
func (s *Stream) StopStream() {
4965
close(s.done)
5066
}
5167

52-
// StartStream makes an HTTP GET request to twitter and starts streaming tweets to the Messages channel using Server Sent Events.
68+
// StartStream makes an HTTP GET request to twitter and starts streaming tweets to the Messages channel.
5369
// Accepts query params described in GET /2/tweets/search/stream to expand the payload that is returned. Query params string must begin with a ?.
5470
// See available query params here https://developer.twitter.com/en/docs/twitter-api/tweets/filtered-stream/api-reference/get-tweets-search-stream.
5571
// See an example here: https://developer.twitter.com/en/docs/twitter-api/expansions.
56-
func (s *stream) StartStream(optionalQueryParams string) error {
72+
func (s *Stream) StartStream(optionalQueryParams string) error {
5773
res, err := s.httpClient.getSearchStream(optionalQueryParams)
5874

5975
if err != nil {
6076
return err
6177
}
6278

6379
s.reader.setStreamResponseBody(res.Body)
64-
s.group.Add(1)
6580

6681
go s.streamMessages(res)
6782

6883
return nil
6984
}
7085

71-
func (s *stream) streamMessages(res *http.Response) {
86+
func (s *Stream) streamMessages(res *http.Response) {
7287
defer res.Body.Close()
73-
defer s.group.Done()
7488

7589
for !stopped(s.done) {
76-
data, err := s.reader.readNext()
90+
b, err := s.reader.readNext()
7791
if err != nil {
7892
s.messages <- StreamMessage{
7993
Data: nil,
@@ -82,14 +96,16 @@ func (s *stream) streamMessages(res *http.Response) {
8296
s.StopStream()
8397
break
8498
}
85-
if len(data) == 0 {
99+
if len(b) == 0 {
86100
// empty keep-alive
87101
continue
88102
}
89103

104+
data, err := s.unmarshalHook(b)
105+
90106
s.messages <- StreamMessage{
91107
Data: data,
92-
Err: nil,
108+
Err: err,
93109
}
94110
}
95111
}

stream_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,12 @@ func TestStartStream(t *testing.T) {
8787
}
8888
}()
8989
r := <-result
90-
if string(tt.result.Data) != string(r.Data) {
90+
91+
expected, _ := tt.result.Data.([]byte)
92+
res, _ := r.Data.([]byte)
93+
94+
95+
if string(expected) != string(res) {
9196
t.Errorf("got %v, want %s", result, tt.result)
9297
}
9398
})

0 commit comments

Comments
 (0)