From ca9e72300691a6b0a72b0a21ab2f9c5e925304f0 Mon Sep 17 00:00:00 2001 From: Yann Bertrand <5855339+yannbertrand@users.noreply.github.com> Date: Tue, 22 Oct 2024 18:45:19 +0200 Subject: [PATCH] =?UTF-8?q?=E2=AC=86=EF=B8=8F=20=20Bump=20InfluxDB=20code?= =?UTF-8?q?=20to=20v2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Gudsfile --- .env | 6 +- docker-compose.yml | 3 + .../provisioning/datasources/datasource.yml | 5 +- lib/controller.js | 9 +- lib/handler.js | 2 +- lib/influx.js | 117 +++++++++++------- package-lock.json | 17 +++ package.json | 2 + 8 files changed, 114 insertions(+), 47 deletions(-) diff --git a/.env b/.env index 3fa6e90..7a7e288 100644 --- a/.env +++ b/.env @@ -1,2 +1,6 @@ -APPLE_WATCH_INFLUX_DATABASE="apple_watch" +APPLE_WATCH_INFLUX_BUCKET="apple_watch" +APPLE_WATCH_INFLUX_ORG="my_org" +APPLE_WATCH_INFLUX_USERNAME="my_user" +APPLE_WATCH_INFLUX_PASSWORD="my_password" +APPLE_WATCH_INFLUX_TOKEN="my_token" APPLE_WATCH_INFLUX_MEASUREMENT="workouts" \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 1b1958e..b198da1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,6 +14,9 @@ services: container_name: apple_watch_grafana ports: - "3000:3000" + environment: + - APPLE_WATCH_INFLUX_BUCKET=$APPLE_WATCH_INFLUX_BUCKET + - APPLE_WATCH_INFLUX_TOKEN=$APPLE_WATCH_INFLUX_TOKEN links: - influxdb volumes: diff --git a/grafana/provisioning/datasources/datasource.yml b/grafana/provisioning/datasources/datasource.yml index 3234081..7bf1d87 100644 --- a/grafana/provisioning/datasources/datasource.yml +++ b/grafana/provisioning/datasources/datasource.yml @@ -10,9 +10,12 @@ datasources: access: proxy orgId: 1 url: http://apple_watch_influxdb:8086 - database: apple_watch + database: $APPLE_WATCH_INFLUX_BUCKET jsonData: + httpHeaderName1: 'Authorization' timeInterval: 1d + secureJsonData: + httpHeaderValue1: 'Token $APPLE_WATCH_INFLUX_TOKEN' isDefault: true editable: true options: diff --git a/lib/controller.js b/lib/controller.js index 354a802..feda709 100644 --- a/lib/controller.js +++ b/lib/controller.js @@ -6,9 +6,16 @@ const appleWatchWorkoutParser = require('./parser') const writeInfluxPoints = require('./influx') module.exports.pushArchiveDataToInfluxDB = (archivePath) => { + console.log('Exploring your data. It will take some time.') + fs.createReadStream(archivePath) .pipe(unzipper.ParseOne('export.xml')) - .on('entry', () => console.log('Exploring your data. It will take some time, please wait...')) + .on('entry', () => console.log('Please wait...\n')) .pipe(appleWatchWorkoutParser(new Handler())) .pipe(writeInfluxPoints()) + .on('finish', () => + console.log( + "All done! You're ready to open http://localhost:3000/d/apple-watch-workouts/year-dashboard?orgId=1\n" + ) + ) } diff --git a/lib/handler.js b/lib/handler.js index c749104..4b0d6e7 100644 --- a/lib/handler.js +++ b/lib/handler.js @@ -26,7 +26,7 @@ module.exports = class Handler { onEnd() { console.log( - `Found ${this._workoutsIds.size} workouts and ${this._nbOfDuplicateWorkouts} duplicates` + `\nFound ${this._workoutsIds.size} workouts and ${this._nbOfDuplicateWorkouts} duplicates` ) } diff --git a/lib/influx.js b/lib/influx.js index 69f3daa..595d225 100644 --- a/lib/influx.js +++ b/lib/influx.js @@ -1,67 +1,98 @@ -const Influx = require('influx') +const { InfluxDB, Point } = require('@influxdata/influxdb-client') +const { SetupAPI } = require('@influxdata/influxdb-client-apis') +const { hostname } = require('node:os') const { Writable } = require('node:stream') -let influx +let influxWriteApi +;(async () => { + influxWriteApi = await loadInfluxInstance() +})() + const stream = new Writable({ async write(chunk, encoding, callback) { try { - await loadInfluxInstance() - const currentWorkout = JSON.parse(chunk.toString()) - const influxPoints = [currentWorkout].map((workout) => ({ - measurement: process.env.APPLE_WATCH_INFLUX_MEASUREMENT, - tags: { type: workout.type, date: workout.startDate.substring(0, 10) }, - fields: { - duration: workout.duration, - totalDistance: workout.totalDistance, - totalEnergyBurned: workout.totalEnergyBurned, - sourceName: workout.sourceName, - sourceVersion: workout.sourceVersion, - }, - timestamp: new Date(workout.startDate), - })) + const influxPoints = [currentWorkout].map((workout) => + new Point(process.env.APPLE_WATCH_INFLUX_MEASUREMENT) + .tag('type', workout.type) + .tag('date', workout.startDate.substring(0, 10)) + .tag('sourceName', workout.sourceName) + .tag('sourceVersion', workout.sourceVersion) + .floatField('duration', workout.duration) + .floatField('totalDistance', workout.totalDistance) + .floatField('totalEnergyBurned', workout.totalEnergyBurned) + .timestamp(new Date(workout.startDate)) + ) - await influx.writePoints(influxPoints) + influxWriteApi.writePoints(influxPoints) } catch (error) { - console.error(error) + console.log(`InfluxDB writing error: ${error.message}`) } callback() }, }) +stream.on('close', () => { + closeInfluxInstance(influxWriteApi) +}) + module.exports = () => { return stream } -const loadInfluxInstance = async () => { - if (influx !== undefined) { - return - } +async function loadInfluxInstance() { + const url = 'http://localhost:8086' + const org = process.env.APPLE_WATCH_INFLUX_ORG + const bucket = process.env.APPLE_WATCH_INFLUX_BUCKET + const token = process.env.APPLE_WATCH_INFLUX_TOKEN + const username = process.env.APPLE_WATCH_INFLUX_USERNAME + const password = process.env.APPLE_WATCH_INFLUX_PASSWORD - const influxInstance = new Influx.InfluxDB({ - host: 'localhost', - database: process.env.APPLE_WATCH_INFLUX_DATABASE, - schema: [ - { - measurement: process.env.APPLE_WATCH_INFLUX_MEASUREMENT, - tags: ['type', 'date'], - fields: { - duration: Influx.FieldType.FLOAT, - totalDistance: Influx.FieldType.FLOAT, - totalEnergyBurned: Influx.FieldType.FLOAT, - sourceName: Influx.FieldType.STRING, - sourceVersion: Influx.FieldType.STRING, - }, - }, - ], - }) + const influx = new InfluxDB({ url, token }) + const setupApi = new SetupAPI(influx) - const databasesNames = await influxInstance.getDatabaseNames() - if (!databasesNames.includes(process.env.APPLE_WATCH_INFLUX_DATABASE)) { - await influxInstance.createDatabase(process.env.APPLE_WATCH_INFLUX_DATABASE) + try { + const { allowed } = await setupApi.getSetup() + if (allowed) { + await setupApi.postSetup({ + body: { + org, + bucket, + username, + password, + token, + }, + }) + console.log(`InfluxDB '${url}' is now onboarded.`) + } else { + console.debug(`InfluxDB '${url}' is ready.`) + } + } catch (error) { + console.error(error) + console.log('\nInfluxDB setup ERROR') } - influx = influxInstance + const influxWriteApi = influx.getWriteApi(org, bucket, 's') + influxWriteApi.useDefaultTags({ location: hostname() }) + + return influxWriteApi +} + +async function closeInfluxInstance(influxWriteApi) { + // WriteApi always buffer data into batches to optimize data transfer to InfluxDB server. + // writeApi.flush() can be called to flush the buffered data. The data is always written + // asynchronously, Moreover, a failed write (caused by a temporary networking or server failure) + // is retried automatically. + // + // close() flushes the remaining buffered data and then cancels pending retries. + + try { + await influxWriteApi.close() + console.debug('InfluxDB connection closed successfully.') + } catch (error) { + console.error(error) + console.log('\nInfluxDB connection closing ERROR') + } } diff --git a/package-lock.json b/package-lock.json index e95d405..6a425b3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,6 +9,8 @@ "version": "0.1.1", "license": "MIT", "dependencies": { + "@influxdata/influxdb-client": "^1.35.0", + "@influxdata/influxdb-client-apis": "^1.35.0", "influx": "5.9.3", "sax": "1.4.1", "unzipper": "0.12.3", @@ -713,6 +715,21 @@ "node": ">=14.21.3" } }, + "node_modules/@influxdata/influxdb-client": { + "version": "1.35.0", + "resolved": "https://registry.npmjs.org/@influxdata/influxdb-client/-/influxdb-client-1.35.0.tgz", + "integrity": "sha512-woWMi8PDpPQpvTsRaUw4Ig+nOGS/CWwAwS66Fa1Vr/EkW+NEwxI8YfPBsdBMn33jK2Y86/qMiiuX/ROHIkJLTw==", + "license": "MIT" + }, + "node_modules/@influxdata/influxdb-client-apis": { + "version": "1.35.0", + "resolved": "https://registry.npmjs.org/@influxdata/influxdb-client-apis/-/influxdb-client-apis-1.35.0.tgz", + "integrity": "sha512-+7h6smVPHYBge2rNKgYh/5k+SriYvPMsoJDfbUiQt1vJtpWnElwgBDLDl7Cr6d9XPC+FCI9GP4GQEMv7y8WxdA==", + "license": "MIT", + "peerDependencies": { + "@influxdata/influxdb-client": "*" + } + }, "node_modules/@istanbuljs/load-nyc-config": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@istanbuljs/load-nyc-config/-/load-nyc-config-1.1.0.tgz", diff --git a/package.json b/package.json index 5b845b4..57ff8ee 100644 --- a/package.json +++ b/package.json @@ -13,6 +13,8 @@ "test": "jest" }, "dependencies": { + "@influxdata/influxdb-client": "^1.35.0", + "@influxdata/influxdb-client-apis": "^1.35.0", "influx": "5.9.3", "sax": "1.4.1", "unzipper": "0.12.3",