-
-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathclient.go
129 lines (102 loc) · 2.64 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package goriak
import (
riak "github.com/basho/riak-go-client"
"crypto/tls"
"crypto/x509"
"errors"
"io/ioutil"
"strconv"
"strings"
)
// Session holds the connection to Riak
type Session struct {
riak *riak.Cluster
opts ConnectOpts
}
// ConnectOpts are the available options for connecting to your Riak instance
type ConnectOpts struct {
// Both Address and Addresses should be on the form HOST|IP[:PORT]
Address string // Address to a single Riak host. Will be used in case Addresses is empty
Addresses []string // Addresses to all Riak hosts.
// Username and password for connection to servers with secirity enabled
User string
Password string
// Path to root CA certificate. Required if security is used
CARootCert string
// Option to override port. Is set to 8087 by default
Port uint32
}
// Connect creates a new Riak connection. See ConnectOpts for the available options.
func Connect(opts ConnectOpts) (*Session, error) {
client := Session{
opts: opts,
}
err := client.connect()
if err != nil {
return nil, err
}
return &client, nil
}
func (c *Session) connect() error {
if len(c.opts.Addresses) == 0 {
c.opts.Addresses = []string{c.opts.Address}
}
var authOptions *riak.AuthOptions
// Build auth options
if c.opts.User != "" {
rootCertPemData, err := ioutil.ReadFile(c.opts.CARootCert)
if err != nil {
return errors.New("Opening CARootCert: " + err.Error())
}
rootCertPool := x509.NewCertPool()
if !rootCertPool.AppendCertsFromPEM(rootCertPemData) {
return errors.New("Invalid PEM certificate file")
}
tlsConf := &tls.Config{
InsecureSkipVerify: true,
RootCAs: rootCertPool,
}
authOptions = &riak.AuthOptions{
User: c.opts.User,
Password: c.opts.Password,
TlsConfig: tlsConf,
}
}
var nodes []*riak.Node
// Set to default port if not provided
port := c.opts.Port
if port == 0 {
port = 8087
}
for _, address := range c.opts.Addresses {
if !strings.Contains(address, ":") {
// Add port if not set in the user config
address = address + ":" + strconv.FormatUint(uint64(port), 10)
}
// Set ServerName based on the address we're connecting to
if authOptions != nil {
addressWithoutPort := address[0:strings.Index(address, ":")]
authOptions.TlsConfig.ServerName = addressWithoutPort
}
node, err := riak.NewNode(&riak.NodeOptions{
RemoteAddress: address,
AuthOptions: authOptions,
})
if err != nil {
return err
}
nodes = append(nodes, node)
}
con, err := riak.NewCluster(&riak.ClusterOptions{
Nodes: nodes,
})
if err != nil {
return err
}
err = con.Start()
if err != nil {
return err
}
c.riak = con
return nil
}