Skip to content

Commit a554719

Browse files
committed
implement toPipeline()
1 parent 45b2ab2 commit a554719

File tree

4 files changed

+93
-1
lines changed

4 files changed

+93
-1
lines changed

README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,3 +135,38 @@ val id = conn.select("SELECT * FROM USER WHERE ID = 1").blockingFirst { it.getIn
135135

136136
println(id)
137137
```
138+
139+
## Allowing Choice of Sequence, Observable, or Flowable
140+
141+
This library supports emitting `T` items built off a `ResultSet` in the form of:
142+
143+
* `Sequence<T>`
144+
* `Observable<T>`
145+
* `Flowable<T>`
146+
* `Single<T>`
147+
* `Maybe<T>`
148+
149+
If you are building an API, it may be handy to allow the user of the API to choose the means in which to receive the results.
150+
151+
The `toPipeline()` function will allow mapping the `ResultSet` to `T` items, but defer to the API user how to receive the results.
152+
153+
154+
```kotlin
155+
156+
data class User(val userName: String, val password: String)
157+
158+
fun getUsers() = conn.select("SELECT * FROM USER")
159+
.toPipeline {
160+
User(it.getString("USERNAME"), it.getString("PASSWORD"))
161+
}
162+
163+
fun main(args: Array<String>) {
164+
165+
getUsers().asFlowable().subscribe { println("Receiving $it via Flowable") }
166+
167+
getUsers().asObservable().subscribe { println("Receiving $it via Observable") }
168+
169+
getUsers().asSequence().forEach { println("Receiving $it via Sequence") }
170+
}
171+
172+
```

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ signing {
4646

4747
group = "org.nield"
4848
archivesBaseName = "rxkotlin-jdbc"
49-
version = "0.2.5"
49+
version = "0.2.6"
5050

5151
uploadArchives {
5252
repositories {

src/main/kotlin/org/nield/rxkotlinjdbc/JdbcExtensions.kt

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ class SelectOperation(
171171

172172
fun toCompletable() = toFlowable { Unit }.ignoreElements()
173173

174+
fun <T: Any> toPipeline(mapper: (ResultSet) -> T) = Pipeline(this, mapper = mapper)
175+
174176
fun <T : Any> toSequence(mapper: (ResultSet) -> T): ResultSetSequence<T> {
175177
val cps = builder.toPreparedStatement()
176178
return ResultSetState({ cps.ps.executeQuery() }, cps.ps, cps.conn, autoClose).toSequence(mapper)
@@ -241,6 +243,8 @@ class InsertOperation(
241243

242244
fun toCompletable() = toFlowable { Unit }.ignoreElements()
243245

246+
fun <T: Any> toPipeline(mapper: (ResultSet) -> T) = Pipeline(insertOperation = this, mapper = mapper)
247+
244248
fun <T : Any> toSequence(mapper: (ResultSet) -> T): ResultSetSequence<T> {
245249
val cps = builder.toPreparedStatement()
246250
return ResultSetState({
@@ -390,6 +394,20 @@ class QueryIterator<out T>(val qs: ResultSetState,
390394
}
391395
}
392396

397+
398+
class Pipeline<T: Any>(val selectOperation: SelectOperation? = null,
399+
val insertOperation: InsertOperation? = null,
400+
val mapper: (ResultSet) -> T
401+
) {
402+
fun toObservable(): Observable<T> = selectOperation?.toObservable(mapper) ?: insertOperation?.toObservable(mapper) ?: throw Exception("Operation must be provided")
403+
fun toFlowable(): Flowable<T> = selectOperation?.toFlowable(mapper) ?: insertOperation?.toFlowable(mapper) ?: throw Exception("Operation must be provided")
404+
fun toSingle(): Single<T> = selectOperation?.toSingle(mapper) ?: insertOperation?.toSingle(mapper) ?: throw Exception("Operation must be provided")
405+
fun toMaybe(): Maybe<T> = selectOperation?.toMaybe(mapper) ?: insertOperation?.toMaybe(mapper) ?: throw Exception("Operation must be provided")
406+
fun toSequence(): ResultSetSequence<T> = selectOperation?.toSequence(mapper) ?: insertOperation?.toSequence(mapper) ?: throw Exception("Operation must be provided")
407+
fun blockingFirst() = selectOperation?.blockingFirst(mapper) ?: insertOperation?.blockingFirst(mapper) ?: throw Exception("Operation must be provided")
408+
fun blockingFirstOrNull() = selectOperation?.blockingFirstOrNull(mapper) ?: insertOperation?.blockingFirstOrNull(mapper) ?: throw Exception("Operation must be provided")
409+
}
410+
393411
fun PreparedStatement.processParameters(v: Array<out Any?>) = v.forEachIndexed { i,v2 -> processParameter(i,v2)}
394412

395413
fun PreparedStatement.processParameter(pos: Int, argVal: Any?) {

src/test/kotlin/JdbcExtensionsTest.kt

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import io.reactivex.observers.TestObserver
44
import io.reactivex.subscribers.TestSubscriber
55
import junit.framework.Assert.assertFalse
66
import junit.framework.Assert.assertTrue
7+
import org.junit.Assert
78
import org.junit.Test
89
import org.nield.rxkotlinjdbc.*
910
import java.sql.DriverManager
@@ -365,4 +366,42 @@ class DatabaseTest {
365366

366367
testObserver.assertValues(mapOf("ID" to 1, "USERNAME" to "thomasnield", "PASSWORD" to "password123"))
367368
}
369+
370+
@Test
371+
fun pipelineTest() {
372+
val conn = connectionFactory()
373+
374+
data class User(val userName: String, val password: String)
375+
376+
val testObserver = TestObserver<User>()
377+
val testSubscriber = TestSubscriber<User>()
378+
379+
val pipeline = conn.select("SELECT * FROM USER")
380+
.toPipeline {
381+
User(it.getString("USERNAME"), it.getString("PASSWORD"))
382+
}
383+
384+
385+
pipeline.toObservable().subscribe(testObserver)
386+
387+
testObserver.assertValues(
388+
User("thomasnield","password123"),
389+
User("bobmarshal","batman43")
390+
)
391+
392+
393+
pipeline.toFlowable().subscribe(testSubscriber)
394+
395+
testSubscriber.assertValues(
396+
User("thomasnield","password123"),
397+
User("bobmarshal","batman43")
398+
)
399+
400+
Assert.assertTrue(
401+
pipeline.toSequence().toSet() == setOf(
402+
User("thomasnield","password123"),
403+
User("bobmarshal","batman43")
404+
)
405+
)
406+
}
368407
}

0 commit comments

Comments
 (0)