Skip to content

Commit

Permalink
refactor: ReactiveQuery增加copy方法.
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Nov 5, 2024
1 parent f951154 commit 423062e
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 54 deletions.
4 changes: 2 additions & 2 deletions hsweb-easy-orm-rdb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@
</dependency>

<dependency>
<groupId>dev.miku</groupId>
<groupId>io.asyncer</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>0.8.2.RELEASE</version>
<version>0.9.7</version>
<optional>true</optional>
</dependency>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,14 @@ public interface ReactiveQuery<T> extends DSLQuery<ReactiveQuery<T>> {
* @return 如果未查询到结果将返回{@link Mono#empty()}
*/
Mono<T> fetchOne();

/**
* 复制当前查询对象,在需要多次设置查询条件时,可以使用此方法复制一个新的查询对象
*
* @return 新的查询对象
* @since 4.1.3
*/
default ReactiveQuery<T> copy() {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,78 +41,86 @@ public DefaultReactiveQuery(TableOrViewMetadata tableMetadata,
this.context = context;
}

@Override
public ReactiveQuery<T> copy() {
DefaultReactiveQuery<T> copy = new DefaultReactiveQuery<>(
tableMetadata, columnMapping, operator, wrapper, context);
copy.param = param.clone();
return copy;
}

@Override
public Flux<T> fetch() {
return this
.doFetch(operator.query(tableMetadata),
"fetch",
(_queryOperator) -> _queryOperator
.context(param.getContext())
.select(getSelectColumn())
.where(param.getTerms())
.orderBy(getSortOrder())
.when(param.isPaging(), query -> query.paging(param.getPageIndex(), param.getPageSize()))
.when(param.isForUpdate(), QueryOperator::forUpdate)
.fetch(eventWrapper(tableMetadata, wrapper, executorType("reactive"), type("fetch")))
.reactive())
.contextWrite(context);
.doFetch(operator.query(tableMetadata),
"fetch",
(_queryOperator) -> _queryOperator
.context(param.getContext())
.select(getSelectColumn())
.where(param.getTerms())
.orderBy(getSortOrder())
.when(param.isPaging(), query -> query.paging(param.getPageIndex(), param.getPageSize()))
.when(param.isForUpdate(), QueryOperator::forUpdate)
.fetch(eventWrapper(tableMetadata, wrapper, executorType("reactive"), type("fetch")))
.reactive())
.contextWrite(context);
}

@Override
public Mono<T> fetchOne() {
return this
.doFetch(operator.query(tableMetadata),
"fetchOne",
(_queryOperator) -> _queryOperator
.context(param.getContext())
.select(getSelectColumn())
.where(param.getTerms())
.orderBy(getSortOrder())
//.paging(0, 1)
.when(param.isForUpdate(), QueryOperator::forUpdate)
.fetch(eventWrapper(tableMetadata, wrapper, executorType("reactive"), type("fetchOne")))
.reactive()
.take(1))
.contextWrite(context)
.singleOrEmpty();
.doFetch(operator.query(tableMetadata),
"fetchOne",
(_queryOperator) -> _queryOperator
.context(param.getContext())
.select(getSelectColumn())
.where(param.getTerms())
.orderBy(getSortOrder())
//.paging(0, 1)
.when(param.isForUpdate(), QueryOperator::forUpdate)
.fetch(eventWrapper(tableMetadata, wrapper, executorType("reactive"), type("fetchOne")))
.reactive()
.take(1))
.contextWrite(context)
.singleOrEmpty();
}

private <O> Flux<O> doFetch(QueryOperator queryOperator, String type, Function<QueryOperator, Publisher<O>> executor) {
DefaultReactiveResultHolder holder = new DefaultReactiveResultHolder();
tableMetadata
.fireEvent(MappingEventTypes.select_before, eventContext ->
eventContext.set(
source(DefaultReactiveQuery.this),
query(queryOperator),
dml(operator),
tableMetadata(tableMetadata),
columnMapping(columnMapping),
reactiveResultHolder.value(holder),
queryOaram.value(param),
executorType("reactive"),
type(type)
));
.fireEvent(MappingEventTypes.select_before, eventContext ->
eventContext.set(
source(DefaultReactiveQuery.this),
query(queryOperator),
dml(operator),
tableMetadata(tableMetadata),
columnMapping(columnMapping),
reactiveResultHolder.value(holder),
queryOaram.value(param),
executorType("reactive"),
type(type)
));
return holder
.doBefore()
.thenMany(Flux.defer(() -> executor.apply(queryOperator.clone())));
.doBefore()
.thenMany(Flux.defer(() -> executor.apply(queryOperator.clone())));
}

@Override
public Mono<Integer> count() {
QueryOperator queryOperator = operator
.query(tableMetadata)
.select(count1().as("_total"));
.query(tableMetadata)
.select(count1().as("_total"));
return this
.doFetch(queryOperator, "count", _opt -> _opt
.context(param.getContext())
.where(param.getTerms())
.fetch(column("_total", Number.class::cast))
.reactive()
.map(Number::intValue)
.reduce(Math::addExact)
.switchIfEmpty(Mono.just(0)))
.contextWrite(context)
.singleOrEmpty();
.doFetch(queryOperator, "count", _opt -> _opt
.context(param.getContext())
.where(param.getTerms())
.fetch(column("_total", Number.class::cast))
.reactive()
.map(Number::intValue)
.reduce(Math::addExact)
.switchIfEmpty(Mono.just(0)))
.contextWrite(context)
.singleOrEmpty();
}

}

0 comments on commit 423062e

Please sign in to comment.