Skip to content

Commit

Permalink
🚧 Bump InfluxDB code
Browse files Browse the repository at this point in the history
  • Loading branch information
yannbertrand committed Oct 22, 2024
1 parent 545bec4 commit b6de6d3
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 45 deletions.
6 changes: 5 additions & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
APPLE_WATCH_INFLUX_DATABASE="apple_watch"
APPLE_WATCH_INFLUX_BUCKET="apple_watch"
APPLE_WATCH_INFLUX_ORG="my_org"
APPLE_WATCH_INFLUX_USERNAME="my_user"
APPLE_WATCH_INFLUX_PASSWORD="my_password"
APPLE_WATCH_INFLUX_TOKEN="my_token"
APPLE_WATCH_INFLUX_MEASUREMENT="workouts"
6 changes: 4 additions & 2 deletions grafana/provisioning/datasources/datasource.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ datasources:
access: proxy
orgId: 1
url: http://apple_watch_influxdb:8086
database: apple_watch
jsonData:
dbName: apple_watch
httpHeaderName1: 'Authorization'
timeInterval: 1d
isDefault: true
secureJsonData:
httpHeaderValue1: 'Token my_token'
editable: true
options:
path: /etc/grafana/provisioning/dashboards
119 changes: 77 additions & 42 deletions lib/influx.js
Original file line number Diff line number Diff line change
@@ -1,28 +1,31 @@
const Influx = require('influx')
const { InfluxDB, Point, HttpError } = require('@influxdata/influxdb-client')
const { SetupAPI } = require('@influxdata/influxdb-client-apis')
const { hostname } = require('node:os')
const { Writable } = require('node:stream')

let influx
let influxWriteApi
;(async () => {
influxWriteApi = await loadInfluxInstance()
})()

const stream = new Writable({
async write(chunk, encoding, callback) {
try {
await loadInfluxInstance()

const currentWorkout = JSON.parse(chunk.toString())

const influxPoints = [currentWorkout].map((workout) => ({
measurement: process.env.APPLE_WATCH_INFLUX_MEASUREMENT,
tags: { type: workout.type, date: workout.startDate.substring(0, 10) },
fields: {
duration: workout.duration,
totalDistance: workout.totalDistance,
totalEnergyBurned: workout.totalEnergyBurned,
sourceName: workout.sourceName,
sourceVersion: workout.sourceVersion,
},
timestamp: new Date(workout.startDate),
}))
const influxPoints = [currentWorkout].map((workout) =>
new Point(process.env.APPLE_WATCH_INFLUX_MEASUREMENT)
.tag('type', 'date')
.floatField('duration', workout.duration)
.floatField('totalDistance', workout.totalDistance)
.floatField('totalEnergyBurned', workout.totalEnergyBurned)
.stringField('sourceName', workout.sourceName)
.stringField('sourceVersion', workout.sourceVersion)
.timestamp(new Date(workout.startDate))
)

await influx.writePoints(influxPoints)
console.log('*** WRITE POINTS ***')
influxWriteApi.writePoints(influxPoints)
} catch (error) {
console.error(error)
}
Expand All @@ -31,37 +34,69 @@ const stream = new Writable({
},
})

stream.on('close', () => {
closeInfluxInstance(influxWriteApi)
})

module.exports = () => {
return stream
}

const loadInfluxInstance = async () => {
if (influx !== undefined) {
return
}
async function loadInfluxInstance() {
const url = 'http://localhost:8086'
const org = process.env.APPLE_WATCH_INFLUX_ORG
const bucket = process.env.APPLE_WATCH_INFLUX_BUCKET
const token = process.env.APPLE_WATCH_INFLUX_TOKEN
const username = process.env.APPLE_WATCH_INFLUX_USERNAME
const password = process.env.APPLE_WATCH_INFLUX_PASSWORD

const influxInstance = new Influx.InfluxDB({
host: 'localhost',
database: process.env.APPLE_WATCH_INFLUX_DATABASE,
schema: [
{
measurement: process.env.APPLE_WATCH_INFLUX_MEASUREMENT,
tags: ['type', 'date'],
fields: {
duration: Influx.FieldType.FLOAT,
totalDistance: Influx.FieldType.FLOAT,
totalEnergyBurned: Influx.FieldType.FLOAT,
sourceName: Influx.FieldType.STRING,
sourceVersion: Influx.FieldType.STRING,
console.log('*** ONBOARDING ***')
const setupApi = new SetupAPI(new InfluxDB({ url }))
try {
const { allowed } = await setupApi.getSetup()
if (allowed) {
await setupApi.postSetup({
body: {
org,
bucket,
username,
password,
token,
},
},
],
})

const databasesNames = await influxInstance.getDatabaseNames()
if (!databasesNames.includes(process.env.APPLE_WATCH_INFLUX_DATABASE)) {
await influxInstance.createDatabase(process.env.APPLE_WATCH_INFLUX_DATABASE)
})
console.log(`InfluxDB '${url}' is now onboarded.`)
} else {
console.log(`InfluxDB '${url}' has been already onboarded.`)
}
console.log('\nFinished SUCCESS')
} catch (e) {
console.error(e)
console.log('\nFinished ERROR')
}
// create a write API, expecting point timestamps in nanoseconds (can be also 's', 'ms', 'us')
const influxWriteApi = new InfluxDB({ url, token }).getWriteApi(org, bucket, 'ns')

// setup default tags for all writes through this API
influxWriteApi.useDefaultTags({ location: hostname() })

return influxWriteApi
}

influx = influxInstance
async function closeInfluxInstance(influxWriteApi) {
// WriteApi always buffer data into batches to optimize data transfer to InfluxDB server.
// writeApi.flush() can be called to flush the buffered data. The data is always written
// asynchronously, Moreover, a failed write (caused by a temporary networking or server failure)
// is retried automatically. Read `writeAdvanced.js` for better explanation and details.
//
// close() flushes the remaining buffered data and then cancels pending retries.
try {
await influxWriteApi.close()
console.log('FINISHED ...')
} catch (e) {
console.error(e)
if (e instanceof HttpError && e.statusCode === 401) {
console.log('Run ./onboarding.js to setup a new InfluxDB database.')
}
console.log('\nFinished ERROR')
}
}
17 changes: 17 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
"test": "jest"
},
"dependencies": {
"@influxdata/influxdb-client": "^1.35.0",
"@influxdata/influxdb-client-apis": "^1.35.0",
"influx": "5.9.3",
"sax": "1.4.1",
"unzipper": "0.12.3",
Expand Down

0 comments on commit b6de6d3

Please sign in to comment.