-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmysqldb.go
171 lines (134 loc) · 3.89 KB
/
mysqldb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package main
import (
"database/sql"
"fmt"
"strconv"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/op/go-logging"
)
type IMysqlDb interface {
Open() error
Close()
StoreMeasurement(thing *Thing, value string)
StoreSwitchState(thing *Thing, value string)
}
type MysqlDb struct {
log *logging.Logger
orgs *Orgs
Host string
Username string
Password string
Name string
Db *sql.DB
}
func NewMysqlDb(log *logging.Logger, orgs *Orgs, host, username, password, name string) IMysqlDb {
db := &MysqlDb{log: log, orgs: orgs}
db.Host = host
db.Username = username
db.Password = password
db.Name = name
db.Db = nil
return db
}
func (db *MysqlDb) Open() error {
db.log.Infof("Connecting to mysql database %s", db.Host)
// open database if host is specified
if db.Host == "" || db.Name == "" {
db.log.Warningf("Refusing to open mysql database, host or db name not specified")
return nil
}
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s", db.Username, db.Password, db.Host, db.Name)
db.log.Debugf("Mysql DSN: %s", dsn)
d, err := sql.Open("mysql", dsn)
if err != nil {
return err
}
// Open doesn't open a connection. Validate DSN data:
err = d.Ping()
if err != nil {
return err
}
db.Db = d
db.log.Infof("Connected to mysql database")
return nil
}
func (db *MysqlDb) Close() {
if db.Db != nil {
db.Db.Close()
}
}
func (db *MysqlDb) verifyOrg(thing *Thing) *Org {
if db.Db == nil {
db.log.Warningf("Mysql database is not initialized")
return nil
}
// get thing org
org, err := db.orgs.Get(thing.OrgId)
if err != nil {
db.log.Warningf("Store to database rejected for thing (%s) that is not assigned to org", thing.Id.Hex())
return nil
}
// mysql name needs to be configured
if org.MysqlDb == "" {
db.log.Warningf("Store to database rejected for thing (%s), where org (%s) has no mysql configuration", thing.Id.Hex(), org.Name)
return nil
}
return org
}
func (db *MysqlDb) getTimestamp(thing *Thing) int32 {
// generate unix timestamp
ts := int32(time.Now().Unix())
// alter timestamp to match low boundary of configured interval
if thing.StoreMysqlDbInterval > 0 {
ts = ts - (ts % thing.StoreMysqlDbInterval)
}
return ts
}
func (db *MysqlDb) StoreMeasurement(thing *Thing, value string) {
db.log.Debugf("Storing measurement to mysql db, thing: %s, val: %s", thing.Name, value)
// verify if all preconditions are met
org := db.verifyOrg(thing)
if org == nil {
return
}
// convert value to float
valueFloat, err := strconv.ParseFloat(value, 32)
if err != nil {
db.log.Errorf("Mysql database storage - float conversion error for value %s", value)
return
}
ts := db.getTimestamp(thing)
query := "INSERT IGNORE INTO piot_sensors (`id`, `org`, `class`, `value`, `time`) VALUES (?, ?, ?, ?, ?)"
r, err := db.Db.Query(query, thing.Id.Hex(), org.MysqlDb, thing.Sensor.Class, valueFloat, ts)
// Failure when trying to store data
if err != nil {
db.log.Errorf("Mysql database operation failed: %s", err.Error())
}
r.Close() // Always do this or you will leak connections
}
func (db *MysqlDb) StoreSwitchState(thing *Thing, value string) {
db.log.Debugf("Storing switch state to MysqlDb, thing: %s, val: %s", thing.Name, value)
// verify if all preconditions are met
org := db.verifyOrg(thing)
if org == nil {
return
}
if thing.Type != THING_TYPE_SWITCH {
// ignore things which don't represent switch
return
}
// convert value to int
valueInt, err := strconv.Atoi(value)
if err != nil {
db.log.Errorf("Mysql database storage - int conversion error for value %s", value)
return
}
ts := db.getTimestamp(thing)
query := "INSERT IGNORE INTO piot_switches (`id`, `org`, `value`, `time`) VALUES (?, ?, ?, ?)"
r, err := db.Db.Query(query, thing.Id.Hex(), org.MysqlDb, valueInt, ts)
if err != nil {
db.log.Errorf("Mysql database operation failed: %s", err.Error())
}
r.Close() // Always do this or you will leak connections
}