-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclient.go
54 lines (45 loc) · 1.67 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package gq
import (
"context"
"database/sql"
"fmt"
"github.com/jmoiron/sqlx"
"github.com/mattbonnell/gq/internal"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
func init() {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
}
// Client represents a client of the message queue. It can be used to spawn any number of consumers or producers.
type Client struct {
db *sqlx.DB
}
// NewClient creates a new Client, generating the database schema if it doesn't exist
func NewClient(db *sql.DB, driverName string) (*Client, error) {
log.Debug().Msg("creating new client")
c := Client{db: sqlx.NewDb(db, driverName)}
if err := internal.CreateSchema(c.db); err != nil {
err = fmt.Errorf("error creating schema: %s", err)
log.Debug().Msg(err.Error())
return nil, err
}
log.Debug().Msg("client created")
return &c, nil
}
// NewConsumer creates a new gq Consumer. It begins pulling messages immediately, and passes each one to the supplied process function
func (c Client) NewConsumer(ctx context.Context, p ProcessFunc) (*Consumer, error) {
return newConsumer(ctx, c.db, p, nil)
}
// NewConsumerWithOptions creates a new gq Consumer with the supplied options.
func (c Client) NewConsumerWithOptions(ctx context.Context, p ProcessFunc, opts ConsumerOptions) (*Consumer, error) {
return newConsumer(ctx, c.db, p, &opts)
}
// NewProducer creates a new gq Producer
func (c Client) NewProducer(ctx context.Context) (*Producer, error) {
return newProducer(ctx, c.db, nil)
}
// NewProducerWithOptions creates a new gq Producer with the supplied options
func (c Client) NewProducerWithOptions(ctx context.Context, opts ProducerOptions) (*Producer, error) {
return newProducer(ctx, c.db, &opts)
}