Skip to content

Commit 8bbcc8e

Browse files
authored
Merge pull request #16 from florian-tirard/bugfix/stream-fetch-size
fix: Use param fetchSize in stream query method
2 parents a4cb86a + a8072f5 commit 8bbcc8e

File tree

2 files changed

+2
-2
lines changed

2 files changed

+2
-2
lines changed

jooq-async-jdbc/src/main/java/fr/maif/jooq/jdbc/JdbcPgAsyncTransaction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public CompletionStage<Tuple0> rollback() {
6969
@Override
7070
public <Q extends Record> Publisher<QueryResult> stream(Integer fetchSize, Function<DSLContext, ? extends ResultQuery<Q>> queryFunction) {
7171
return Flux
72-
.fromIterable(() -> queryFunction.apply(client).stream().iterator())
72+
.fromIterable(() -> queryFunction.apply(client).fetchSize(fetchSize).stream().iterator())
7373
.publishOn(Schedulers.parallel())
7474
.map(JooqQueryResult::new);
7575
}

jooq-async-reactive/src/main/java/fr/maif/jooq/reactive/ReactivePgAsyncTransaction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ public <Q extends Record> Flux<QueryResult> stream(Integer fetchSize, Function<D
6060
cursor -> Mono.just(List.<QueryResult>empty())
6161
.expand(results -> {
6262
if (first.getAndSet(false) || cursor.hasMore()) {
63-
return Mono.fromCompletionStage(cursor.read(500).map(rs ->
63+
return Mono.fromCompletionStage(cursor.read(fetchSize).map(rs ->
6464
List.ofAll(rs)
6565
.map(ReactiveRowQueryResult::new)
6666
.map(r -> (QueryResult)r)

0 commit comments

Comments
 (0)