Skip to content

Commit

Permalink
release hotfix: Adapt to MonoSubscriber not fusing by default (#288)
Browse files Browse the repository at this point in the history
In reactor-core, `Operators.MonoSubscriber` has stopped implementing
ASYNC fusion as a base. It continues to be compatible with Fuseable
publishers but now by default only negotiates `Fuseable.NONE`.

Some RxJava adapter classes don't really have a way of propagating the
fusion up to RxJava and used to rely on the default ASYNC capability
of MonoSubscriber, testing that `requestFusion` would indeed negotiate
that. Now that it negotiates NONE, said tests fail.

This commit removes the tests and adds a FIXME as a more in depth follow
up to this issue (where we can evaluate if it makes sense to keep the
publishers Fuseable).

Also update to latest 3.4.x core snapshot.

See reactor/reactor-core#3245.
  • Loading branch information
simonbasle authored Nov 8, 2022
1 parent 51075b0 commit 2b13a2d
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 45 deletions.
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[versions]
reactorCore = "3.4.18-SNAPSHOT"
reactorCore = "3.4.25-SNAPSHOT"
# Other shared versions
kotlin = "1.5.32"
reactiveStreams = "1.0.3"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,7 @@ public void subscribe(CoreSubscriber<? super T> s) {
source.subscribe(new MaybeAsMonoObserver<>(s));
}

//FIXME this and all other MonoSubscriber-extending classes are fake-fuseable: requestFusion will always reply NONE unless overridden
static final class MaybeAsMonoObserver<T> extends MonoSubscriber<T, T> implements MaybeObserver<T> {

io.reactivex.rxjava3.disposables.Disposable d;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,8 @@ public void singleToMono() {
.expectComplete()
.verify();
}
//NB: MonoSubscribers like SingleToMonoSubscriber are not fuseable anymore

@Test
public void singleToMonoFused() {
Mono<Integer> m = Single.just(1)
.to(RxJava2Adapter::singleToMono);

StepVerifier.create(m)
.expectFusion(Fuseable.ANY, Fuseable.ASYNC)
.expectNext(1)
.expectComplete()
.verify();
}

@Test
public void monoToSingle() {
Mono.just(1)
Expand Down Expand Up @@ -233,14 +222,6 @@ public void maybeToMonoError() {
.expectErrorMessage("Forced failure")
.verify();
}
//NB: MonoSubscribers like MaybeToMonoSubscriber are not fuseable anymore

@Test
public void maybeToMonoEmptyFused() {
Mono<Void> m = Maybe.<Void>empty().to(RxJava2Adapter::maybeToMono);

StepVerifier.create(m)
.expectFusion(Fuseable.ANY, Fuseable.ASYNC)
.expectComplete()
.verify();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,7 @@ public void maybeToMonoEmpty() {
.expectComplete()
.verify();
}

@Test
public void maybeToMonoEmptyFused() {
Mono<Void> m = Maybe.<Void>empty().to(RxJava3Adapter::maybeToMono);

StepVerifier.create(m)
.expectFusion(Fuseable.ANY, Fuseable.ASYNC)
.expectComplete()
.verify();
}
//NB: MonoSubscribers like MaybeToMonoSubscriber are not fuseable anymore

@Test
public void maybeToMonoError() {
Expand Down Expand Up @@ -312,19 +303,7 @@ public void singleToMono() {
.expectComplete()
.verify();
}

@Test
public void singleToMonoFused() {
Mono<Integer> m = Single.just(1)
.to(RxJava3Adapter::singleToMono);

StepVerifier.create(m)
.expectFusion(Fuseable.ANY, Fuseable.ASYNC)
.expectNext(1)
.expectComplete()
.verify();
}

//NB: MonoSubscribers like SingleToMonoSubscriber are not fuseable anymore

@Test
public void scheduler() {
Expand Down

0 comments on commit 2b13a2d

Please sign in to comment.