diff --git a/.env b/.env index 3fa6e90..7a7e288 100644 --- a/.env +++ b/.env @@ -1,2 +1,6 @@ -APPLE_WATCH_INFLUX_DATABASE="apple_watch" +APPLE_WATCH_INFLUX_BUCKET="apple_watch" +APPLE_WATCH_INFLUX_ORG="my_org" +APPLE_WATCH_INFLUX_USERNAME="my_user" +APPLE_WATCH_INFLUX_PASSWORD="my_password" +APPLE_WATCH_INFLUX_TOKEN="my_token" APPLE_WATCH_INFLUX_MEASUREMENT="workouts" \ No newline at end of file diff --git a/grafana/provisioning/datasources/datasource.yml b/grafana/provisioning/datasources/datasource.yml index 3234081..1c2ae38 100644 --- a/grafana/provisioning/datasources/datasource.yml +++ b/grafana/provisioning/datasources/datasource.yml @@ -10,9 +10,12 @@ datasources: access: proxy orgId: 1 url: http://apple_watch_influxdb:8086 - database: apple_watch jsonData: + dbName: apple_watch + httpHeaderName1: 'Authorization' timeInterval: 1d + secureJsonData: + httpHeaderValue1: 'Token $APPLE_WATCH_INFLUX_TOKEN' isDefault: true editable: true options: diff --git a/lib/influx.js b/lib/influx.js index 69f3daa..223594d 100644 --- a/lib/influx.js +++ b/lib/influx.js @@ -1,28 +1,32 @@ -const Influx = require('influx') +const { InfluxDB, Point, HttpError } = require('@influxdata/influxdb-client') +const { SetupAPI } = require('@influxdata/influxdb-client-apis') +const { hostname } = require('node:os') const { Writable } = require('node:stream') -let influx +let influxWriteApi +;(async () => { + influxWriteApi = await loadInfluxInstance() +})() + const stream = new Writable({ async write(chunk, encoding, callback) { try { - await loadInfluxInstance() - const currentWorkout = JSON.parse(chunk.toString()) - const influxPoints = [currentWorkout].map((workout) => ({ - measurement: process.env.APPLE_WATCH_INFLUX_MEASUREMENT, - tags: { type: workout.type, date: workout.startDate.substring(0, 10) }, - fields: { - duration: workout.duration, - totalDistance: workout.totalDistance, - totalEnergyBurned: workout.totalEnergyBurned, - sourceName: workout.sourceName, - sourceVersion: workout.sourceVersion, - }, - timestamp: new Date(workout.startDate), - })) + const influxPoints = [currentWorkout].map((workout) => + new Point(process.env.APPLE_WATCH_INFLUX_MEASUREMENT) + .tag('type', workout.type) + .tag('date', workout.startDate.substring(0, 10)) + .tag('sourceName', workout.sourceName) + .tag('sourceVersion', workout.sourceVersion) + .floatField('duration', workout.duration) + .floatField('totalDistance', workout.totalDistance) + .floatField('totalEnergyBurned', workout.totalEnergyBurned) + .timestamp(new Date(workout.startDate)) + ) - await influx.writePoints(influxPoints) + console.log('*** WRITE POINTS ***') + influxWriteApi.writePoints(influxPoints) } catch (error) { console.error(error) } @@ -31,37 +35,69 @@ const stream = new Writable({ }, }) +stream.on('close', () => { + closeInfluxInstance(influxWriteApi) +}) + module.exports = () => { return stream } -const loadInfluxInstance = async () => { - if (influx !== undefined) { - return - } +async function loadInfluxInstance() { + const url = 'http://localhost:8086' + const org = process.env.APPLE_WATCH_INFLUX_ORG + const bucket = process.env.APPLE_WATCH_INFLUX_BUCKET + const token = process.env.APPLE_WATCH_INFLUX_TOKEN + const username = process.env.APPLE_WATCH_INFLUX_USERNAME + const password = process.env.APPLE_WATCH_INFLUX_PASSWORD - const influxInstance = new Influx.InfluxDB({ - host: 'localhost', - database: process.env.APPLE_WATCH_INFLUX_DATABASE, - schema: [ - { - measurement: process.env.APPLE_WATCH_INFLUX_MEASUREMENT, - tags: ['type', 'date'], - fields: { - duration: Influx.FieldType.FLOAT, - totalDistance: Influx.FieldType.FLOAT, - totalEnergyBurned: Influx.FieldType.FLOAT, - sourceName: Influx.FieldType.STRING, - sourceVersion: Influx.FieldType.STRING, + console.log('*** ONBOARDING ***') + const setupApi = new SetupAPI(new InfluxDB({ url })) + try { + const { allowed } = await setupApi.getSetup() + if (allowed) { + await setupApi.postSetup({ + body: { + org, + bucket, + username, + password, + token, }, - }, - ], - }) - - const databasesNames = await influxInstance.getDatabaseNames() - if (!databasesNames.includes(process.env.APPLE_WATCH_INFLUX_DATABASE)) { - await influxInstance.createDatabase(process.env.APPLE_WATCH_INFLUX_DATABASE) + }) + console.log(`InfluxDB '${url}' is now onboarded.`) + } else { + console.log(`InfluxDB '${url}' has been already onboarded.`) + } + console.log('\nFinished SUCCESS') + } catch (e) { + console.error(e) + console.log('\nFinished ERROR') } + // create a write API, expecting point timestamps in nanoseconds (can be also 's', 'ms', 'us') + const influxWriteApi = new InfluxDB({ url, token }).getWriteApi(org, bucket, 'ns') + + // setup default tags for all writes through this API + influxWriteApi.useDefaultTags({ location: hostname() }) + + return influxWriteApi +} - influx = influxInstance +async function closeInfluxInstance(influxWriteApi) { + // WriteApi always buffer data into batches to optimize data transfer to InfluxDB server. + // writeApi.flush() can be called to flush the buffered data. The data is always written + // asynchronously, Moreover, a failed write (caused by a temporary networking or server failure) + // is retried automatically. Read `writeAdvanced.js` for better explanation and details. + // + // close() flushes the remaining buffered data and then cancels pending retries. + try { + await influxWriteApi.close() + console.log('FINISHED ...') + } catch (e) { + console.error(e) + if (e instanceof HttpError && e.statusCode === 401) { + console.log('Run ./onboarding.js to setup a new InfluxDB database.') + } + console.log('\nFinished ERROR') + } } diff --git a/package-lock.json b/package-lock.json index e95d405..6a425b3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,8 @@ "version": "0.1.1", "license": "MIT", "dependencies": { + "@influxdata/influxdb-client": "^1.35.0", + "@influxdata/influxdb-client-apis": "^1.35.0", "influx": "5.9.3", "sax": "1.4.1", "unzipper": "0.12.3", @@ -713,6 +715,21 @@ "node": ">=14.21.3" } }, + "node_modules/@influxdata/influxdb-client": { + "version": "1.35.0", + "resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.35.0.tgz", + "integrity": "sha512-woWMi8PDpPQpvTsRaUw4Ig+nOGS/CWwAwS66Fa1Vr/EkW+NEwxI8YfPBsdBMn33jK2Y86/qMiiuX/ROHIkJLTw==", + "license": "MIT" + }, + "node_modules/@influxdata/influxdb-client-apis": { + "version": "1.35.0", + "resolved": "https://registry.npmjs.org/@influxdata/influxdb-client-apis/-/influxdb-client-apis-1.35.0.tgz", + "integrity": "sha512-+7h6smVPHYBge2rNKgYh/5k+SriYvPMsoJDfbUiQt1vJtpWnElwgBDLDl7Cr6d9XPC+FCI9GP4GQEMv7y8WxdA==", + "license": "MIT", + "peerDependencies": { + "@influxdata/influxdb-client": "*" + } + }, "node_modules/@istanbuljs/load-nyc-config": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@istanbuljs/load-nyc-config/-/load-nyc-config-1.1.0.tgz", diff --git a/package.json b/package.json index 5b845b4..57ff8ee 100644 --- a/package.json +++ b/package.json @@ -13,6 +13,8 @@ "test": "jest" }, "dependencies": { + "@influxdata/influxdb-client": "^1.35.0", + "@influxdata/influxdb-client-apis": "^1.35.0", "influx": "5.9.3", "sax": "1.4.1", "unzipper": "0.12.3",