Skip to content

Commit

Permalink
Add support for downsampled buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
lr101 committed Sep 5, 2024
1 parent 548b296 commit d88e3f7
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 7 deletions.
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ This service is run via docker:
INFLUXDB_URL=http://indluxdb:8086
INFLUXDB_TOKEN=<influx_token>
INFLUXDB_BUCKET=tempserver
INFLUXDB_DOWNSAMPLED_BUCKET=tempserver_sampled
INFLUXDB_ORG=<org>
INFLUXDB_RETENTION_PERIOD=7 # in days
# optional
MAX_THREADS=5
Expand All @@ -29,6 +31,27 @@ This service is run via docker:
```shell
docker compose up -d
```

### Influxdb

- Set up a bucket (ex. tempserver) and a bucket for down-sampling (ex. tempserver_sampled)
- Create an access token to allow access to read and write to these buckets
- Create an influx task to down-sample and run it every 24h to aggregate the last day:
```
option task = {name: "Aggregate Tempserver 24h", every: 24h}
data =
from(bucket: "tempserver")
|> range(start: -1d, stop: now())
|> filter(fn: (r) => r._measurement == "entry")
data
|> aggregateWindow(fn: mean, every: 1h)
|> to(bucket: "tempserver_sampled", org: "lr-projects")
```
- Setup the tempserver retention policy to delete values after a specific time period (to keep data size small)
- Make sure the `INFLUX_RETENTION_PERIOD` value in your .env is the same (or smaller) to have available data

## Arduino

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,18 @@ class InfluxDbConfiguration(private val influxProperties: InfluxProperties) {
null
}
}

@Bean
fun influxDBSampledClient(): InfluxDBClient? {
return if (influxProperties.enabled) {
InfluxDBClientFactory.create(
influxProperties.url,
influxProperties.token.toCharArray(),
influxProperties.org,
influxProperties.bucketSampled
)
} else {
null
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class InfluxProperties {
lateinit var url: String
lateinit var token: String
lateinit var bucket: String
lateinit var bucketSampled: String
lateinit var org: String
var enabled: Boolean = false
var retentionPeriod: Int = 7
}
Original file line number Diff line number Diff line change
@@ -1,38 +1,46 @@
package de.lrprojects.tempserver.service.impl

import com.influxdb.client.InfluxDBClient
import com.influxdb.client.QueryApi
import com.influxdb.client.domain.WritePrecision
import com.influxdb.client.write.Point
import com.influxdb.exceptions.BadRequestException
import de.lrprojects.tempserver.config.InfluxProperties
import de.lrprojects.tempserver.entity.Entry
import de.lrprojects.tempserver.service.api.EntryService
import org.hibernate.query.sqm.tree.SqmNode.log
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.stereotype.Service
import java.time.OffsetDateTime
import java.time.ZoneOffset

@Service
class EntryServiceImpl(
private val influxDBClient: InfluxDBClient,
@Qualifier("influxDBClient") private val influxDBClient: InfluxDBClient,
@Qualifier("influxDBSampledClient") private val influxDBSampledClient: InfluxDBClient,
private val influxProperties: InfluxProperties
): EntryService {

override fun getEntries(sensorId: String, date1: OffsetDateTime?, date2: OffsetDateTime?, limit: Int?, interval: Int?): List<Entry> {
val toDate = date1?.toString() ?: "now()"
val fromDate = date2?.toString() ?: "0"

val selectedBucket = selectBucket(date2)
val bucketName = if (selectedBucket) influxProperties.bucket else influxProperties.bucketSampled
val bucketQueryApi = if (selectedBucket) influxDBClient.queryApi else influxDBSampledClient.queryApi

val query = if (limit != null) {
getLimit(influxProperties.bucket, sensorId, toDate, fromDate, limit)
getLimit(bucketName, sensorId, toDate, fromDate, limit)
} else if (interval != null) {
aggregatedMean(influxProperties.bucket, sensorId, toDate, fromDate, interval)
aggregatedMean(bucketName, sensorId, toDate, fromDate, interval)
} else {
getEntries(influxProperties.bucket, sensorId, toDate, fromDate)
getEntries(bucketName, sensorId, toDate, fromDate)
}

val queryApi = influxDBClient.queryApi
try {
log.info("Run getEntries query: $query")
val tables = queryApi.query(query)
val tables = bucketQueryApi.query(query)
return tables.flatMap { table ->
table.records.map {
Entry(
Expand Down Expand Up @@ -69,6 +77,11 @@ class EntryServiceImpl(
influxDBClient.queryApi.query(deleteQuery)
}


private fun selectBucket(date2: OffsetDateTime?): Boolean {
return date2 != null && OffsetDateTime.now().minusDays(influxProperties.retentionPeriod.toLong()).isBefore(date2)
}

companion object {
fun aggregatedMean(bucket: String, sensorId: String, toDate: String, fromDate: String, interval: Int) = """
from(bucket: "$bucket")
Expand Down
4 changes: 3 additions & 1 deletion src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ influxdb:
token: ${INFLUXDB_TOKEN}
bucket: ${INFLUXDB_BUCKET}
org: ${INFLUXDB_ORG}
enabled: true
enabled: true
bucket-sampled: ${INFLUXDB_DOWNSAMPLED_BUCKET}
retention-period: ${INFLUXDB_RETENTION_PERIOD}

0 comments on commit d88e3f7

Please sign in to comment.