-
-
Notifications
You must be signed in to change notification settings - Fork 116
/
Copy pathdatasource.go
226 lines (200 loc) · 7.08 KB
/
datasource.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
package timeliner
import (
"bytes"
"context"
"database/sql"
"encoding/gob"
"fmt"
"log"
"time"
)
func init() {
tdBuf := new(bytes.Buffer)
err := gob.NewEncoder(tdBuf).Encode(Metadata{})
if err != nil {
log.Fatalf("[FATAL] Unable to gob-encode metadata struct: %v", err)
}
metadataGobPrefix = tdBuf.Bytes()
}
// RegisterDataSource registers ds as a data source.
func RegisterDataSource(ds DataSource) error {
if ds.ID == "" {
return fmt.Errorf("missing ID")
}
if ds.Name == "" {
return fmt.Errorf("missing Name")
}
if ds.OAuth2.ProviderID != "" && ds.Authenticate != nil {
return fmt.Errorf("conflicting ways of obtaining authorization")
}
// register the data source
if _, ok := dataSources[ds.ID]; ok {
return fmt.Errorf("data source already registered: %s", ds.ID)
}
dataSources[ds.ID] = ds
return nil
}
func saveAllDataSources(db *sql.DB) error {
if len(dataSources) == 0 {
return nil
}
query := `INSERT INTO "data_sources" ("id", "name") VALUES`
var vals []interface{}
var count int
for _, ds := range dataSources {
if count > 0 {
query += ","
}
query += " (?, ?)"
vals = append(vals, ds.ID, ds.Name)
count++
}
query += " ON CONFLICT DO NOTHING"
_, err := db.Exec(query, vals...)
if err != nil {
return fmt.Errorf("writing data sources to DB: %v", err)
}
return nil
}
// DataSource has information about a
// data source that can be registered.
type DataSource struct {
// A snake_cased name of the service
// that uniquely identifies it from
// all others.
ID string
// The human-readable or brand name of
// the service.
Name string
// If the service authenticates with
// OAuth2, fill out this field.
OAuth2 OAuth2
// Otherwise, if the service uses some
// other form of authentication,
// Authenticate is a function which
// returns the credentials needed to
// access an account on the service.
Authenticate AuthenticateFn
// If the service enforces a rate limit,
// specify it here. You can abide it by
// getting an http.Client from the
// Account passed into NewClient.
RateLimit RateLimit
// NewClient is a function which takes
// information about the account and
// returns a type which can facilitate
// transactions with the service.
NewClient NewClientFn
}
// authFunc gets the authentication function for this
// service. If s.Authenticate is set, it returns that;
// if s.OAuth2 is set, it uses a standard OAuth2 func.
func (ds DataSource) authFunc() AuthenticateFn {
if ds.Authenticate != nil {
return ds.Authenticate
} else if ds.OAuth2.ProviderID != "" {
return func(userID string) ([]byte, error) {
return authorizeWithOAuth2(ds.OAuth2)
}
}
return nil
}
// OAuth2 defines which OAuth2 provider a service
// uses and which scopes it requires.
type OAuth2 struct {
// The ID of the service must be recognized
// by the OAuth2 app configuration.
ProviderID string
// The list of scopes to ask for during auth.
Scopes []string
}
// AuthenticateFn is a function that authenticates userID with a service.
// It returns the authorization or credentials needed to operate. The return
// value should be byte-encoded so it can be stored in the DB to be reused.
// To store arbitrary types, encode the value as a gob, for example.
type AuthenticateFn func(userID string) ([]byte, error)
// NewClientFn is a function that returns a client which, given
// the account passed in, can interact with a service provider.
type NewClientFn func(acc Account) (Client, error)
// Client is a type that can interact with a data source.
type Client interface {
// ListItems lists the items on the account. Items should be
// sent on itemChan as they are discovered, but related items
// should be combined onto a single ItemGraph so that their
// relationships can be stored. If the relationships are not
// discovered until later, that's OK: item processing is
// idempotent, so repeating an item from earlier will have no
// adverse effects (this is possible because a unique ID is
// required for each item).
//
// Implementations must honor the context's cancellation. If
// ctx.Done() is closed, the function should return. Typically,
// this is done by having an outer loop select over ctx.Done()
// and default, where the next page or set of items is handled
// in the default case.
//
// ListItems MUST close itemChan when returning. A
// `defer close(itemChan)` will usually suffice. Closing
// this channel signals to the processing goroutine that
// no more items are coming.
//
// Further options for listing items may be passed in opt.
//
// If opt.Filename is specified, the implementation is expected
// to open and list items from that file. If this is not
// supported, an error should be returned. Conversely, if a
// filename is not specified but required, an error should be
// returned.
//
// opt.Timeframe consists of two optional timestamp and/or item
// ID values. If set, item listings should be bounded in the
// respective direction by that timestamp / item ID. (Items
// are assumed to be part of a chronology; both timestamp and
// item ID *may be* provided, when possible, to accommodate
// data sources which do not constrain by timestamp but which
// do by item ID instead.) The respective time and item ID
// fields, if set, will not be in conflict, so either may be
// used if both are present. While it should be documented if
// timeframes are not supported, an error need not be returned
// if they cannot be honored.
//
// opt.Checkpoint consists of the last checkpoint for this
// account if the last call to ListItems did not finish and
// if a checkpoint was saved. If not nil, the checkpoint
// should be used to resume the listing instead of starting
// over from the beginning. Checkpoint values usually consist
// of page tokens or whatever state is required to resume. Call
// timeliner.Checkpoint to set a checkpoint. Checkpoints are not
// required, but if the implementation sets checkpoints, it
// should be able to resume from one, too.
ListItems(ctx context.Context, itemChan chan<- *ItemGraph, opt ListingOptions) error
}
// Timeframe represents a start and end time and/or
// a start and end item, where either value could be
// nil which means unbounded in that direction.
// When items are used as the timeframe boundaries,
// the ItemID fields will be populated. It is not
// guaranteed that any particular field will be set
// or unset just because other fields are set or unset.
// However, if both Since or both Until fields are
// set, that means the timestamp and items are
// correlated; i.e. the Since timestamp is (approx.)
// that of the item ID. Or, put another way: there
// will never be conflicts among the fields which
// are non-nil.
type Timeframe struct {
Since, Until *time.Time
SinceItemID, UntilItemID *string
}
func (tf Timeframe) String() string {
var sinceItemID, untilItemID string
if tf.SinceItemID != nil {
sinceItemID = *tf.SinceItemID
}
if tf.UntilItemID != nil {
untilItemID = *tf.UntilItemID
}
return fmt.Sprintf("{Since:%s Until:%s SinceItemID:%s UntilItemID:%s}",
tf.Since, tf.Until, sinceItemID, untilItemID)
}
var dataSources = make(map[string]DataSource)