From 6b4e0d52a90062a5290f27b53566bef6c6d7aa5d Mon Sep 17 00:00:00 2001 From: Ratko Rudic Date: Mon, 16 Sep 2024 20:26:19 +0200 Subject: [PATCH] Adds support for MySQL publisher. MySQL publisher stores measurements into `readings` table. Each value received is stored as a new record. Value description and unit name is also stored with each record. This means that table is denormalised but it's also faster and easier to use. Publisher waits for all readings and inserts all values with one multiple INSERTs statement. This is way faster than saving each value separately. --- cmd/run.go | 35 +++++++++++++++++ docs/mbmd_mysql.md | 34 +++++++++++++++++ go.mod | 2 + go.sum | 4 ++ server/mysql.go | 95 ++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 170 insertions(+) create mode 100644 docs/mbmd_mysql.md create mode 100644 server/mysql.go diff --git a/cmd/run.go b/cmd/run.go index 9acceb94..9a33c824 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -156,6 +156,26 @@ any type is considered valid. "", "InfluxDB password (optional)", ) + runCmd.PersistentFlags().String( + "mysql-host", + "", + "MySQL host", + ) + runCmd.PersistentFlags().String( + "mysql-user", + "", + "MySQL user", + ) + runCmd.PersistentFlags().String( + "mysql-password", + "", + "MySQL password", + ) + runCmd.PersistentFlags().String( + "mysql-database", + "", + "MySQL database", + ) pflags := runCmd.PersistentFlags() @@ -167,6 +187,9 @@ any type is considered valid. // influx bindPFlagsWithPrefix(pflags, "influx", "url", "database", "measurement", "organization", "token", "user", "password") + + // mysql + bindPFlagsWithPrefix(pflags, "mysql", "host", "user", "password", "database") } // checkVersion validates if updates are available @@ -343,6 +366,18 @@ func run(cmd *cobra.Command, args []string) { tee.AttachRunner(server.NewSnipRunner(influx.Run)) } + // MySQL client + if viper.GetString("mysql.host") != "" { + mysql := server.NewMySQLClient( + viper.GetString("mysql.host"), + viper.GetString("mysql.user"), + viper.GetString("mysql.password"), + viper.GetString("mysql.database"), + ) + + tee.AttachRunner(server.NewSnipRunner(mysql.Run)) + } + ctx, cancel := context.WithCancel(context.Background()) go qe.Run(ctx, viper.GetDuration("rate"), cc, rc) diff --git a/docs/mbmd_mysql.md b/docs/mbmd_mysql.md new file mode 100644 index 00000000..893743d4 --- /dev/null +++ b/docs/mbmd_mysql.md @@ -0,0 +1,34 @@ +## mbmd and mysql + +Let mbmd write sensor data to mysql + +### Table + +Create this table in your database: + +``` +create table readings ( + device varchar(100), + measurement varchar(60), + value float, + tstamp int, + description varchar(100), + unit varchar(10), + primary key (device, measurement, tstamp), + index idx_tstamp (tstamp) +) +``` + +### MySQL parameters + +Run mbmd with: + +``` +mbmd run -a ... -d ... --mysql-host={your-db-host:3306} --mysql-user={your-mysql-user} --mysql-password={mysql-user-password} --mysql-database={mysql-database-name} +``` + +for example: + +``` +mbmd run -a192.168.1.123:1502 -dSOLAREDGE:1.0 -dSOLAREDGE:1.1 -r2s --mysql-database=solaredge --mysql-host=127.0.0.1:3306 --mysql-user=mbmd --mysql-password=secret +``` diff --git a/go.mod b/go.mod index 3db899eb..9b135a99 100644 --- a/go.mod +++ b/go.mod @@ -22,11 +22,13 @@ require ( ) require ( + filippo.io/edwards25519 v1.1.0 // indirect github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fsnotify/fsnotify v1.7.0 // indirect + github.com/go-sql-driver/mysql v1.8.1 // indirect github.com/google/go-github v17.0.0+incompatible // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/uuid v1.6.0 // indirect diff --git a/go.sum b/go.sum index a4a9b3cb..943523f7 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/andig/gosunspec v0.0.0-20231205122018-1daccfa17912 h1:pphS+WUa9zK0ZH/Rt2JMEaOx0qz8dDzOzbr/W7BMiXQ= github.com/andig/gosunspec v0.0.0-20231205122018-1daccfa17912/go.mod h1:c6P6szcR+ROkqZruOR4f6qbDKFjZX6OitPpj+yJ/r8k= @@ -24,6 +26,8 @@ github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHk github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= diff --git a/server/mysql.go b/server/mysql.go new file mode 100644 index 00000000..bce42047 --- /dev/null +++ b/server/mysql.go @@ -0,0 +1,95 @@ +package server + +import ( + "database/sql" + "fmt" + "strings" + "sync" + "time" + + _ "github.com/go-sql-driver/mysql" +) + +// MySQL publisher +type MySQL struct { + client *sql.DB +} + +// NewMySQLClient creates new publisher for MySQL +func NewMySQLClient( + host string, + user string, + password string, + database string, +) *MySQL { + connString := user + ":" + password + "@tcp(" + host + ")/" + database + db, err := sql.Open("mysql", connString) + if err != nil { + panic(err.Error()) + } + + err = db.Ping() + if err != nil { + panic(err.Error()) + } + + return &MySQL{ + client: db, + } +} + +// Run MySQL publisher +func (m *MySQL) Run(in <-chan QuerySnip) { + + var items []string + var vals []interface{} + var sql string + var mu sync.Mutex + + ticker := time.NewTicker(time.Second) + + for { + select { + case snip, ok := <-in: + if !ok { + return + } + + description, unit := snip.Measurement.DescriptionAndUnit() + + mu.Lock() + items = append(items, "(?, ?, ?, ?, ?, ?)") + vals = append(vals, snip.Device, snip.Measurement.String(), snip.Value, snip.Timestamp.Unix(), description, unit) + mu.Unlock() + + case <-ticker.C: + if len(items) == 0 { + fmt.Println("Nothing to do ...", time.Now().Unix()) + continue + } + + mu.Lock() + + sql = "INSERT INTO readings (device, measurement, value, tstamp, description, unit) " + + " VALUES " + strings.Join(items, ",") + + stmt, err := m.client.Prepare(sql) + if err != nil { + fmt.Println("Error preparing statement:", err) + mu.Unlock() + continue + } + if _, err := stmt.Exec(vals...); err != nil { + fmt.Println("Error executing statement:", err) + } + + fmt.Println("Added: ", time.Now().Unix(), len(items)) + + items = nil + vals = nil + mu.Unlock() + + stmt.Close() + } + } +}