Skip to content

Commit a3d81cf

Browse files
authored
feat: add Kotlin WriteApi (#222)
1 parent 13ab60b commit a3d81cf

File tree

26 files changed

+5065
-117
lines changed

26 files changed

+5065
-117
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,14 @@ You have to replace your dependency from: `influxdb-client-scala` to:
1414
1. [#218](https://github.com/influxdata/influxdb-client-java/pull/218): Supports enum types in mapping into POJO
1515
1. [#220](https://github.com/influxdata/influxdb-client-java/pull/220): Create client supporting OSGi environments
1616
1. [#221](https://github.com/influxdata/influxdb-client-java/pull/221): Add feature definition and documentation for Apache Karaf support
17+
1. [#222](https://github.com/influxdata/influxdb-client-java/pull/221): Add `Kotlin` WriteApi
1718

19+
### Dependencies
20+
1. [#222](https://github.com/influxdata/influxdb-client-csharp/pull/222): Update dependencies:
21+
- Kotlin to 1.4.32
22+
1. [#222](https://github.com/influxdata/influxdb-client-csharp/pull/222): Update plugins:
23+
- dokka-maven-plugin to 1.4.30
24+
1825
## 2.1.0 [2021-04-01]
1926

2027
### Bug Fixes

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ The Java, Reactive, OSGi, Kotlin and Scala clients are implemented for the Influ
4848
| --- | --- | --- | --- |
4949
| **[java](./client)** | The reference Java client that allows query, write and InfluxDB 2.0 management. | [javadoc](https://influxdata.github.io/influxdb-client-java/influxdb-client-java/apidocs/index.html), [readme](./client#influxdb-client-java/)| 2.0 |
5050
| **[reactive](./client-reactive)** | The reference RxJava client for the InfluxDB 2.0 that allows query and write in a reactive way.| [javadoc](https://influxdata.github.io/influxdb-client-java/influxdb-client-java/apidocs/index.html), [readme](./client#influxdb-client-java/) |2.0 |
51-
| **[kotlin](./client-kotlin)** | The reference Kotlin client that allows query and write for the InfluxDB 2.0 by [Kotlin Channel coroutines](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html). | [KDoc](https://influxdata.github.io/influxdb-client-java/influxdb-client-kotlin/dokka/influxdb-client-kotlin/com.influxdb.client.kotlin/index.html), [readme](./client-kotlin#influxdb-client-kotlin/) | 2.0|
51+
| **[kotlin](./client-kotlin)** | The reference Kotlin client that allows query and write for the InfluxDB 2.0 by Kotlin [Channel](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/index.html) and [Flow](https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-flow/index.html) coroutines. | [KDoc](https://influxdata.github.io/influxdb-client-java/influxdb-client-kotlin/dokka/influxdb-client-kotlin/com.influxdb.client.kotlin/index.html), [readme](./client-kotlin#influxdb-client-kotlin/) | 2.0|
5252
| **[scala](./client-scala)** | The reference Scala client that allows query and write for the InfluxDB 2.0 by [Akka Streams](https://doc.akka.io/docs/akka/2.6/stream/). | [Scaladoc](https://influxdata.github.io/influxdb-client-java/client-scala/cross/influxdb-client-scala_2.13/scaladocs/com/influxdb/client/scala/index.html), [readme](./client-scala#influxdb-client-scala/) | 2.0 |
5353
| **[osgi](./client-osgi)** | The reference OSGi (R6) client embedding Java and reactive clients and providing standard features (declarative services, configuration, event processing) for the InfluxDB 2.0. | [javadoc](https://influxdata.github.io/influxdb-client-java/influxdb-client-osgi/apidocs/index.html), [readme](./client-osgi) | 2.0 |
5454
| **[karaf](./karaf)** | The Apache Karaf feature definition for the InfluxDB 2.0. | [readme](./karaf) | 2.0 |

client-kotlin/README.md

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ The reference Kotlin client that allows query and write for the InfluxDB 2.0 by
77
## Features
88

99
- [Querying data using Flux language](#queries)
10+
- [Writing data](#writes)
1011
- [Advanced Usage](#advanced-usage)
1112

1213
## Queries
@@ -78,6 +79,89 @@ fun main(args: Array<String>) = runBlocking {
7879
}
7980
```
8081

82+
## Writes
83+
84+
The [WriteKotlinApi](https://influxdata.github.io/influxdb-client-java/influxdb-client-kotlin/dokka/influxdb-client-kotlin/com.influxdb.client.kotlin/-write-kotlin-api/index.html) supports ingest data by:
85+
- `DataPoint`
86+
- `LineProtocol`
87+
- `Data class`
88+
- List of above items
89+
90+
The following example shows how to use various type of data:
91+
92+
```kotlin
93+
package example
94+
95+
import com.influxdb.annotations.Column
96+
import com.influxdb.annotations.Measurement
97+
import com.influxdb.client.domain.WritePrecision
98+
import com.influxdb.client.kotlin.InfluxDBClientKotlinFactory
99+
import com.influxdb.client.write.Point
100+
import kotlinx.coroutines.flow.collect
101+
import kotlinx.coroutines.flow.consumeAsFlow
102+
import kotlinx.coroutines.runBlocking
103+
import java.time.Instant
104+
105+
fun main() = runBlocking {
106+
107+
val org = "my-org"
108+
val bucket = "my-bucket"
109+
110+
//
111+
// Initialize client
112+
//
113+
val client = InfluxDBClientKotlinFactory
114+
.create("http://localhost:8086", "my-token".toCharArray(), org, bucket)
115+
116+
val writeApi = client.getWriteKotlinApi()
117+
118+
//
119+
// Write by Data Point
120+
//
121+
val point = Point.measurement("temperature")
122+
.addTag("location", "west")
123+
.addField("value", 55.0)
124+
.time(Instant.now().toEpochMilli(), WritePrecision.MS)
125+
126+
writeApi.writePoint(point)
127+
128+
//
129+
// Write by LineProtocol
130+
//
131+
writeApi.writeRecord("temperature,location=north value=60.0", WritePrecision.NS)
132+
133+
//
134+
// Write by DataClass
135+
//
136+
val temperature = Temperature("south", 62.0, Instant.now())
137+
138+
writeApi.writeMeasurement(temperature, WritePrecision.NS)
139+
140+
//
141+
// Query results
142+
//
143+
val fluxQuery =
144+
"""from(bucket: "$bucket") |> range(start: 0) |> filter(fn: (r) => (r["_measurement"] == "temperature"))"""
145+
146+
client
147+
.getQueryKotlinApi()
148+
.query(fluxQuery)
149+
.consumeAsFlow()
150+
.collect { println("Measurement: ${it.measurement}, value: ${it.value}") }
151+
152+
client.close()
153+
}
154+
155+
@Measurement(name = "temperature")
156+
data class Temperature(
157+
@Column(tag = true) val location: String,
158+
@Column val value: Double,
159+
@Column(timestamp = true) val time: Instant
160+
)
161+
162+
```
163+
* sources - [KotlinWriteApi.kt](../examples/src/main/java/example/KotlinWriteApi.kt)
164+
81165
## Advanced Usage
82166

83167
### Client configuration file

client-kotlin/pom.xml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535

3636
<name>The Kotlin InfluxDB 2.0 Client</name>
3737
<description>
38-
The reference Kotlin client that allows query and write for the InfluxDB 2.0 by Kotlin Channel coroutines.
38+
The reference Kotlin client that allows query and write for the InfluxDB 2.0
39+
by Kotlin Channel and Flow coroutines.
3940
</description>
4041

4142
<url>https://github.com/influxdata/influxdb-client-java/tree/master/client-kotlin</url>
@@ -107,7 +108,7 @@
107108
<plugin>
108109
<groupId>org.jetbrains.dokka</groupId>
109110
<artifactId>dokka-maven-plugin</artifactId>
110-
<version>0.10.1</version>
111+
<version>1.4.30</version>
111112
<executions>
112113
<execution>
113114
<id>dokka-pre-site</id>

client-kotlin/src/main/kotlin/com/influxdb/client/kotlin/InfluxDBClientKotlin.kt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,13 @@ interface InfluxDBClientKotlin : Closeable {
3939
*/
4040
fun getQueryKotlinApi() : QueryKotlinApi
4141

42+
/**
43+
* Get the Write client.
44+
*
45+
* @return the new client instance for the Write API
46+
*/
47+
fun getWriteKotlinApi() : WriteKotlinApi
48+
4249
/**
4350
* Get the health of an instance.
4451
*

client-kotlin/src/main/kotlin/com/influxdb/client/kotlin/QueryKotlinApi.kt

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -35,47 +35,52 @@ import kotlinx.coroutines.channels.Channel
3535
interface QueryKotlinApi {
3636

3737
/**
38-
* Executes the Flux query against the InfluxDB and asynchronously stream [FluxRecord]s to [Channel].
38+
* Executes the Flux query against the InfluxDB and asynchronously stream
39+
* [com.influxdb.query.FluxRecord]s to [kotlinx.coroutines.channels.Channel].
3940
*
40-
* <p>The {@link InfluxDBClientOptions#getOrg()} will be used as source organization.</p>
41+
* The [com.influxdb.client.InfluxDBClientOptions.getOrg] will be used as source organization.
4142
*
4243
* @param query the flux query to execute
43-
* @return the stream of [FluxRecord]s
44+
* @return the stream of [com.influxdb.query.FluxRecord]s
4445
*/
4546
fun query(query: String): Channel<FluxRecord>
4647

4748
/**
48-
* Executes the Flux query against the InfluxDB and asynchronously stream [FluxRecord]s to [Channel].
49+
* Executes the Flux query against the InfluxDB and asynchronously stream
50+
* [com.influxdb.query.FluxRecord]s to [kotlinx.coroutines.channels.Channel].
4951
*
5052
* @param query the flux query to execute
5153
* @param org specifies the source organization
52-
* @return the stream of [FluxRecord]s
54+
* @return the stream of [com.influxdb.query.FluxRecord]s
5355
*/
5456
fun query(query: String, org: String): Channel<FluxRecord>
5557

5658
/**
57-
* Executes the Flux query against the InfluxDB and asynchronously stream [FluxRecord]s to [Channel].
59+
* Executes the Flux query against the InfluxDB and asynchronously stream
60+
* [com.influxdb.query.FluxRecord]s to [kotlinx.coroutines.channels.Channel].
5861
*
59-
* <p>The {@link InfluxDBClientOptions#getOrg()} will be used as source organization.</p>
62+
* The [com.influxdb.client.InfluxDBClientOptions.getOrg] will be used as source organization.
6063
*
6164
* @param query the flux query to execute
62-
* @return the stream of [FluxRecord]s
65+
* @return the stream of [com.influxdb.query.FluxRecord]s
6366
*/
6467
fun query(query: Query): Channel<FluxRecord>
6568

6669
/**
67-
* Executes the Flux query against the InfluxDB and asynchronously stream [FluxRecord]s to [Channel].
70+
* Executes the Flux query against the InfluxDB and asynchronously stream
71+
* [com.influxdb.query.FluxRecord]s to [kotlinx.coroutines.channels.Channel].
6872
*
6973
* @param query the flux query to execute
7074
* @param org specifies the source organization
71-
* @return the stream of [FluxRecord]s
75+
* @return the stream of [com.influxdb.query.FluxRecord]s
7276
*/
7377
fun query(query: Query, org: String): Channel<FluxRecord>
7478

7579
/**
76-
* Executes the Flux query against the InfluxDB and asynchronously stream measurements to [Channel].
80+
* Executes the Flux query against the InfluxDB and asynchronously stream measurements to
81+
* [kotlinx.coroutines.channels.Channel].
7782
*
78-
* <p>The {@link InfluxDBClientOptions#getOrg()} will be used as source organization.</p>
83+
* The [com.influxdb.client.InfluxDBClientOptions.getOrg] will be used as source organization.
7984
*
8085
* @param query the flux query to execute
8186
* @param <M> the type of the measurement (POJO)
@@ -84,7 +89,8 @@ interface QueryKotlinApi {
8489
fun <M> query(query: String, measurementType: Class<M>): Channel<M>
8590

8691
/**
87-
* Executes the Flux query against the InfluxDB and asynchronously stream measurements to [Channel].
92+
* Executes the Flux query against the InfluxDB and asynchronously stream measurements to
93+
* [kotlinx.coroutines.channels.Channel].
8894
*
8995
* @param query the flux query to execute
9096
* @param org specifies the source organization
@@ -94,9 +100,10 @@ interface QueryKotlinApi {
94100
fun <M> query(query: String, org: String, measurementType: Class<M>): Channel<M>
95101

96102
/**
97-
* Executes the Flux query against the InfluxDB and asynchronously stream measurements to [Channel].
103+
* Executes the Flux query against the InfluxDB and asynchronously stream measurements to
104+
* [kotlinx.coroutines.channels.Channel].
98105
*
99-
* <p>The {@link InfluxDBClientOptions#getOrg()} will be used as source organization.</p>
106+
* The [com.influxdb.client.InfluxDBClientOptions.getOrg] will be used as source organization.
100107
*
101108
* @param query the flux query to execute
102109
* @param <M> the type of the measurement (POJO)
@@ -105,7 +112,8 @@ interface QueryKotlinApi {
105112
fun <M> query(query: Query, measurementType: Class<M>): Channel<M>
106113

107114
/**
108-
* Executes the Flux query against the InfluxDB and asynchronously stream measurements to [Channel].
115+
* Executes the Flux query against the InfluxDB and asynchronously stream measurements to
116+
* [kotlinx.coroutines.channels.Channel].
109117
*
110118
* @param query the flux query to execute
111119
* @param org specifies the source organization
@@ -115,17 +123,19 @@ interface QueryKotlinApi {
115123
fun <M> query(query: Query, org: String, measurementType: Class<M>): Channel<M>
116124

117125
/**
118-
* Executes the Flux query against the InfluxDB and asynchronously stream response to [Channel].
126+
* Executes the Flux query against the InfluxDB and asynchronously stream response to
127+
* [kotlinx.coroutines.channels.Channel].
119128
*
120-
* <p>The {@link InfluxDBClientOptions#getOrg()} will be used as source organization.</p>
129+
* The [com.influxdb.client.InfluxDBClientOptions.getOrg] will be used as source organization.
121130
*
122131
* @param query the flux query to execute
123132
* @return the response stream
124133
*/
125134
fun queryRaw(query: String): Channel<String>
126135

127136
/**
128-
* Executes the Flux query against the InfluxDB and asynchronously stream response to [Channel].
137+
* Executes the Flux query against the InfluxDB and asynchronously stream response to
138+
* [kotlinx.coroutines.channels.Channel].
129139
*
130140
* @param query the flux query to execute
131141
* @param org specifies the source organization
@@ -134,9 +144,10 @@ interface QueryKotlinApi {
134144
fun queryRaw(query: String, org: String): Channel<String>
135145

136146
/**
137-
* Executes the Flux query against the InfluxDB and asynchronously stream response to [Channel].
147+
* Executes the Flux query against the InfluxDB and asynchronously stream response to
148+
* [kotlinx.coroutines.channels.Channel].
138149
*
139-
* <p>The {@link InfluxDBClientOptions#getOrg()} will be used as source organization.</p>
150+
* The [com.influxdb.client.InfluxDBClientOptions.getOrg] will be used as source organization.
140151
*
141152
* @param query the flux query to execute
142153
* @param dialect Dialect is an object defining the options to use when encoding the response.
@@ -146,7 +157,8 @@ interface QueryKotlinApi {
146157
fun queryRaw(query: String, dialect: Dialect): Channel<String>
147158

148159
/**
149-
* Executes the Flux query against the InfluxDB and asynchronously stream response to [Channel].
160+
* Executes the Flux query against the InfluxDB and asynchronously stream response to
161+
* [kotlinx.coroutines.channels.Channel].
150162
*
151163
* @param query the flux query to execute
152164
* @param org specifies the source organization
@@ -157,17 +169,19 @@ interface QueryKotlinApi {
157169
fun queryRaw(query: String, dialect: Dialect, org: String): Channel<String>
158170

159171
/**
160-
* Executes the Flux query against the InfluxDB and asynchronously stream response to [Channel].
172+
* Executes the Flux query against the InfluxDB and asynchronously stream response to
173+
* [kotlinx.coroutines.channels.Channel].
161174
*
162-
* <p>The {@link InfluxDBClientOptions#getOrg()} will be used as source organization.</p>
175+
* The [com.influxdb.client.InfluxDBClientOptions.getOrg] will be used as source organization.
163176
*
164177
* @param query the flux query to execute
165178
* @return the response stream
166179
*/
167180
fun queryRaw(query: Query): Channel<String>
168181

169182
/**
170-
* Executes the Flux query against the InfluxDB and asynchronously stream response to [Channel].
183+
* Executes the Flux query against the InfluxDB and asynchronously stream response to
184+
* [kotlinx.coroutines.channels.Channel].
171185
*
172186
* @param query the flux query to execute
173187
* @param org specifies the source organization

0 commit comments

Comments
 (0)