Skip to content

Commit

Permalink
fix #912 rework windowTimeout internal to ensure consistency (#913)
Browse files Browse the repository at this point in the history
- The previous windowTimeout implementation inherited from older design
and wasn't aligned with last year work from Reactive Streams Commons.
With drainLoop design, we can now ensure proper serialization without
requiring an internal Serialized Subscriber.
  • Loading branch information
smaldini authored Oct 20, 2017
1 parent 76f22f6 commit 4d215d9
Show file tree
Hide file tree
Showing 10 changed files with 486 additions and 418 deletions.
2 changes: 1 addition & 1 deletion reactor-core/src/main/java/reactor/core/Disposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ interface Swap extends Disposable, Supplier<Disposable> {
* @return true if the operation succeeded, false if the container has been disposed
* @see #replace(Disposable)
*/
boolean update(Disposable next);
boolean update(@Nullable Disposable next);

/**
* Atomically push the next {@link Disposable} on this container but don't dispose the previous
Expand Down
2 changes: 1 addition & 1 deletion reactor-core/src/main/java/reactor/core/Disposables.java
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ static final class SwapDisposable implements Disposable.Swap {
AtomicReferenceFieldUpdater.newUpdater(SwapDisposable.class, Disposable.class, "inner");

@Override
public boolean update(Disposable next) {
public boolean update(@Nullable Disposable next) {
return Disposables.set(INNER, this, next);
}

Expand Down
4 changes: 2 additions & 2 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -2289,7 +2289,7 @@ public final Flux<List<T>> bufferTimeout(int maxSize, Duration timespan, Schedul
*/
public final <C extends Collection<? super T>> Flux<C> bufferTimeout(int maxSize, Duration timespan,
Scheduler timer, Supplier<C> bufferSupplier) {
return onAssembly(new FluxBufferTimeOrSize<>(this, maxSize, timespan.toMillis(), timer, bufferSupplier));
return onAssembly(new FluxBufferTimeout<>(this, maxSize, timespan.toMillis(), timer, bufferSupplier));
}

/**
Expand Down Expand Up @@ -7448,7 +7448,7 @@ public final Flux<Flux<T>> windowTimeout(int maxSize, Duration timespan) {
* @return a {@link Flux} of {@link Flux} windows based on element count and duration
*/
public final Flux<Flux<T>> windowTimeout(int maxSize, Duration timespan, Scheduler timer) {
return onAssembly(new FluxWindowTimeOrSize<>(this, maxSize, timespan.toMillis(), timer));
return onAssembly(new FluxWindowTimeout<>(this, maxSize, timespan.toMillis(), timer));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@
/**
* @author Stephane Maldini
*/
final class FluxBufferTimeOrSize<T, C extends Collection<? super T>> extends FluxOperator<T,
final class FluxBufferTimeout<T, C extends Collection<? super T>> extends FluxOperator<T,
C> {

final int batchSize;
final Supplier<C> bufferSupplier;
final Scheduler timer;
final long timespan;

FluxBufferTimeOrSize(Flux<T> source,
FluxBufferTimeout(Flux<T> source,
int maxSize,
long timespan,
Scheduler timer,
Expand Down
Loading

0 comments on commit 4d215d9

Please sign in to comment.