@@ -14,13 +14,17 @@ import (
14
14
"net/http"
15
15
"net/url"
16
16
"os"
17
+ "os/signal"
18
+ "path"
17
19
"strings"
20
+ "syscall"
18
21
"time"
19
22
20
23
"github.com/bluesky-social/jetstream/pkg/models"
21
24
"github.com/gorilla/websocket"
22
- "github.com/karalabe/go-bluesky"
25
+ bluesky "github.com/karalabe/go-bluesky"
23
26
"github.com/klauspost/compress/zstd"
27
+ "github.com/tailscale/setec/client/setec"
24
28
)
25
29
26
30
var (
33
37
webhookURL = flag .String ("slack-webhook-url" , envOr ("SLACK_WEBHOOK_URL" , "" ),
34
38
"slack webhook URL (required)" )
35
39
bskyServerURL = flag .String ("bsky-server-url" , envOr ("BSKY_SERVER_URL" ,
36
- "https://bsky.network " ), "bluesky PDS server URL" )
40
+ "https://bsky.social " ), "bluesky PDS server URL" )
37
41
watchWord = flag .String ("watch-word" , envOr ("WATCH_WORD" , "tailscale" ),
38
42
"the word to watch out for. may be multiple words in future (required)" )
43
+
44
+ secretsURL = flag .String ("secrets-url" , envOr ("SECRETS_URL" , "" ),
45
+ "the URL of a secrets server (if empty, no server is used)" )
46
+ secretsPrefix = flag .String ("secrets-prefix" , envOr ("SECRETS_PREFIX" , "" ),
47
+ "the prefix to prepend to secret names fetched from --secrets-url" )
39
48
)
40
49
41
50
// Public addresses of jetstream websocket services.
@@ -51,6 +60,10 @@ var jetstreams = []string{
51
60
// Only the DecodeAll method may be used.
52
61
var zstdDecoder * zstd.Decoder
53
62
63
+ // httpClient must be used for all HTTP requests. It is a variable so that it
64
+ // can be replaced with a proxy.
65
+ var httpClient = http .DefaultClient
66
+
54
67
func init () {
55
68
// Jetstream uses a custom zstd dictionary, so make sure we do the same.
56
69
var err error
@@ -65,20 +78,38 @@ func main() {
65
78
// TODO(creachadair): Usage text.
66
79
67
80
switch {
68
- case * webhookURL == "" :
81
+ case * webhookURL == "" && * secretsURL == "" :
69
82
log .Fatal ("missing slack webhook URL (SLACK_WEBHOOK_URL)" )
70
83
case * bskyServerURL == "" :
71
84
log .Fatal ("missing Bluesky server URL (BSKY_SERVER_URL)" )
72
85
case * bskyHandle == "" :
73
86
log .Fatal ("Missing Bluesky account handle (BSKY_HANDLE)" )
74
- case * bskyAppKey == "" :
87
+ case * bskyAppKey == "" && * secretsURL == "" :
75
88
log .Fatal ("missing Bluesky app secret (BSKY_APP_PASSWORD)" )
76
89
case * watchWord == "" :
77
90
log .Fatal ("missing watchword" )
78
91
}
79
92
93
+ ctx , cancel := signal .NotifyContext (context .Background (), syscall .SIGINT , syscall .SIGTERM )
94
+ defer cancel ()
95
+
96
+ if * secretsURL != "" {
97
+ webhookSecret := path .Join (* secretsPrefix , "slack-webhook-url" )
98
+ appKeySecret := path .Join (* secretsPrefix , "bluesky-app-key" )
99
+ st , err := setec .NewStore (ctx , setec.StoreConfig {
100
+ Client : setec.Client {Server : * secretsURL , DoHTTP : httpClient .Do },
101
+ Secrets : []string {webhookSecret , appKeySecret },
102
+ })
103
+ if err != nil {
104
+ log .Fatalf ("initialize secrets store: %v" , err )
105
+ }
106
+ * webhookURL = st .Secret (webhookSecret ).GetString ()
107
+ * bskyAppKey = st .Secret (appKeySecret ).GetString ()
108
+ log .Printf ("Fetched client secrets from %q" , * secretsURL )
109
+ }
110
+
80
111
nextAddr := nextWSAddress ()
81
- for {
112
+ for ctx . Err () == nil {
82
113
wsURL := url.URL {
83
114
Scheme : "wss" ,
84
115
Host : nextAddr (),
@@ -87,7 +118,7 @@ func main() {
87
118
}
88
119
slog .Info ("ws connecting" , "url" , wsURL .String ())
89
120
90
- err := websocketConnection (wsURL )
121
+ err := websocketConnection (ctx , wsURL )
91
122
slog .Error ("ws connection" , "url" , wsURL , "err" , err )
92
123
93
124
// TODO(erisa): exponential backoff
@@ -119,13 +150,12 @@ func nextWSAddress() func() string {
119
150
}
120
151
}
121
152
122
- func websocketConnection (wsUrl url.URL ) error {
153
+ func websocketConnection (ctx context. Context , wsUrl url.URL ) error {
123
154
// add compression headers
124
155
headers := http.Header {}
125
156
headers .Add ("Socket-Encoding" , "zstd" )
126
157
127
158
c , _ , err := websocket .DefaultDialer .Dial (wsUrl .String (), headers )
128
-
129
159
if err != nil {
130
160
return fmt .Errorf ("dial jetstream: %v" , err )
131
161
}
@@ -135,9 +165,7 @@ func websocketConnection(wsUrl url.URL) error {
135
165
return nil
136
166
})
137
167
138
- ctx := context .Background ()
139
-
140
- bsky , err := bluesky .Dial (ctx , * bskyServerURL )
168
+ bsky , err := bluesky .DialWithClient (ctx , * bskyServerURL , httpClient )
141
169
if err != nil {
142
170
log .Fatal ("dial bsky: " , err )
143
171
}
@@ -148,7 +176,7 @@ func websocketConnection(wsUrl url.URL) error {
148
176
log .Fatal ("login bsky: " , err )
149
177
}
150
178
151
- for {
179
+ for ctx . Err () == nil {
152
180
// bail if we take too long for a read
153
181
c .SetReadDeadline (time .Now ().Add (time .Second * 5 ))
154
182
@@ -157,15 +185,16 @@ func websocketConnection(wsUrl url.URL) error {
157
185
return err
158
186
}
159
187
160
- err = readJetstreamMessage (jetstreamMessage , bsky )
188
+ err = readJetstreamMessage (ctx , jetstreamMessage , bsky )
161
189
if err != nil {
162
190
log .Println ("error reading jetstream message: " , jetstreamMessage , err )
163
191
continue
164
192
}
165
193
}
194
+ return ctx .Err ()
166
195
}
167
196
168
- func readJetstreamMessage (jetstreamMessageEncoded []byte , bsky * bluesky.Client ) error {
197
+ func readJetstreamMessage (ctx context. Context , jetstreamMessageEncoded []byte , bsky * bluesky.Client ) error {
169
198
// Decompress the message
170
199
m , err := zstdDecoder .DecodeAll (jetstreamMessageEncoded , nil )
171
200
if err != nil {
@@ -189,7 +218,7 @@ func readJetstreamMessage(jetstreamMessageEncoded []byte, bsky *bluesky.Client)
189
218
jetstreamMessageStr := string (jetstreamMessage )
190
219
191
220
go func () {
192
- profile , err := getBskyProfile (bskyMessage , bsky )
221
+ profile , err := getBskyProfile (ctx , bskyMessage , bsky )
193
222
if err != nil {
194
223
slog .Error ("fetch profile" , "err" , err , "msg" , jetstreamMessageStr )
195
224
return
@@ -201,7 +230,7 @@ func readJetstreamMessage(jetstreamMessageEncoded []byte, bsky *bluesky.Client)
201
230
imageURL = fmt .Sprintf ("https://cdn.bsky.app/img/feed_fullsize/plain/%s/%s" , bskyMessage .Did , bskyMessage .Commit .Record .Embed .Images [0 ].Image .Ref .Link )
202
231
}
203
232
204
- err = sendToSlack (jetstreamMessageStr , bskyMessage , imageURL , * profile )
233
+ err = sendToSlack (ctx , jetstreamMessageStr , bskyMessage , imageURL , * profile )
205
234
if err != nil {
206
235
slog .Error ("slack error" , "err" , err )
207
236
}
@@ -211,8 +240,8 @@ func readJetstreamMessage(jetstreamMessageEncoded []byte, bsky *bluesky.Client)
211
240
return nil
212
241
}
213
242
214
- func getBskyProfile (bskyMessage BskyMessage , bsky * bluesky.Client ) (* bluesky.Profile , error ) {
215
- profile , err := bsky .FetchProfile (context . Background () , bskyMessage .Did )
243
+ func getBskyProfile (ctx context. Context , bskyMessage BskyMessage , bsky * bluesky.Client ) (* bluesky.Profile , error ) {
244
+ profile , err := bsky .FetchProfile (ctx , bskyMessage .Did )
216
245
if err != nil {
217
246
return nil , err
218
247
}
@@ -225,7 +254,7 @@ func getBskyProfile(bskyMessage BskyMessage, bsky *bluesky.Client) (*bluesky.Pro
225
254
return profile , nil
226
255
}
227
256
228
- func sendToSlack (jetstreamMessageStr string , bskyMessage BskyMessage , imageURL string , profile bluesky.Profile ) error {
257
+ func sendToSlack (ctx context. Context , jetstreamMessageStr string , bskyMessage BskyMessage , imageURL string , profile bluesky.Profile ) error {
229
258
attachments := []SlackAttachment {
230
259
{
231
260
AuthorName : fmt .Sprintf ("%s (@%s)" , profile .Name , profile .Handle ),
@@ -246,20 +275,24 @@ func sendToSlack(jetstreamMessageStr string, bskyMessage BskyMessage, imageURL s
246
275
log .Printf ("failed to marshal text: %v" , err )
247
276
248
277
}
249
- res , err := http .Post (* webhookURL , "application/json" , bytes .NewBuffer (body ))
278
+ req , err := http .NewRequestWithContext (ctx , "POST" , * webhookURL , bytes .NewReader (body ))
279
+ if err != nil {
280
+ return err
281
+ }
282
+ req .Header .Set ("Content-Type" , "application/json" )
283
+ res , err := httpClient .Do (req )
250
284
if err != nil {
251
285
slog .Error ("failed to post to slack" , "msg" , jetstreamMessageStr )
252
286
return err
253
287
}
288
+ defer res .Body .Close ()
254
289
255
290
if res .StatusCode != http .StatusOK {
256
291
body , err := io .ReadAll (res .Body )
257
292
if err != nil {
258
293
slog .Error ("bad error code from slack and fail to read body" , "statusCode" , res .StatusCode , "msg" , jetstreamMessageStr )
259
294
return err
260
295
}
261
- defer res .Body .Close ()
262
-
263
296
slog .Error ("error code response from slack" , "statusCode" , res .StatusCode , "responseBody" , string (body ), "msg" , jetstreamMessageStr )
264
297
return fmt .Errorf ("slack: %s %s" , res .Status , string (body ))
265
298
}
0 commit comments