Skip to content

Commit

Permalink
Make Subscription non nullable and suppress NullAway (#3647)
Browse files Browse the repository at this point in the history
Signed-off-by: Violeta Georgieva <violeta.georgieva@broadcom.com>
  • Loading branch information
violetagg authored Feb 20, 2025
1 parent a09b92e commit d044e89
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,11 @@ static final class DisposableAcquire
// Deliberately suppress "NullAway"
// This is a lazy initialization
PooledRef<PooledConnection> pooledRef;
@Nullable Subscription subscription;
// Never null when accessed - only via dispose()
// which is registered into sink.onCancel() callback.
// See onSubscribe(Subscription).
@SuppressWarnings("NullAway")
Subscription subscription;

DisposableAcquire(
ConnectionObserver obs,
Expand Down Expand Up @@ -164,6 +168,7 @@ public Context currentContext() {

@Override
public void dispose() {
// sink.onCancel() registration happens in onSubscribe()
subscription.cancel();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,11 @@ static final class DisposableConnect implements CoreSubscriber<Channel>, Disposa
final Context currentContext;
final @Nullable Supplier<? extends SocketAddress> bindAddress;

@Nullable Subscription subscription;
// Never null when accessed - only via dispose()
// which is registered into sink.onCancel() callback.
// See onSubscribe(Subscription).
@SuppressWarnings("NullAway")
Subscription subscription;

DisposableConnect(MonoSink<Connection> sink, @Nullable Supplier<? extends SocketAddress> bindAddress) {
this(sink, Context.of(sink.contextView()), bindAddress);
Expand All @@ -136,6 +140,7 @@ public Context currentContext() {

@Override
public void dispose() {
// sink.onCancel() registration happens in onSubscribe()
subscription.cancel();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,7 +498,11 @@ static class DisposableBind implements CoreSubscriber<Channel>, DisposableServer
final SocketAddress bindAddress;

@Nullable Channel channel;
@Nullable Subscription subscription;
// Never null when accessed - only via dispose()
// which is registered into sink.onCancel() callback.
// See onSubscribe(Subscription).
@SuppressWarnings("NullAway")
Subscription subscription;

DisposableBind(MonoSink<DisposableServer> sink, TransportConfig config, SocketAddress bindAddress) {
this.sink = sink;
Expand Down Expand Up @@ -535,6 +539,7 @@ public final void dispose() {
}
}
else {
// sink.onCancel() registration happens in onSubscribe()
subscription.cancel();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,11 @@ static final class DisposableAcquire
// This is a lazy initialization
PooledRef<Connection> pooledRef;
@Nullable SocketAddress remoteAddress;
@Nullable Subscription subscription;
// Never null when accessed - only via dispose()
// which is registered into sink.onCancel() callback.
// See onSubscribe(Subscription).
@SuppressWarnings("NullAway")
Subscription subscription;

DisposableAcquire(
ConnectionObserver obs,
Expand Down Expand Up @@ -306,6 +310,7 @@ public Context currentContext() {

@Override
public void dispose() {
// sink.onCancel() registration happens in onSubscribe()
subscription.cancel();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,11 @@ static final class DisposableAcquire
// This is a lazy initialization
PooledRef<Connection> pooledRef;
@Nullable SocketAddress remoteAddress;
@Nullable Subscription subscription;
// Never null when accessed - only via dispose()
// which is registered into sink.onCancel() callback.
// See onSubscribe(Subscription).
@SuppressWarnings("NullAway")
Subscription subscription;

DisposableAcquire(
boolean acceptGzip,
Expand Down Expand Up @@ -303,6 +307,7 @@ public Context currentContext() {

@Override
public void dispose() {
// sink.onCancel() registration happens in onSubscribe()
subscription.cancel();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ static final class DisposableConnect implements CoreSubscriber<Channel>, Disposa
final ConnectionObserver streamObserver;
final Map<ChannelOption<?>, ?> streamOptions;

@Nullable Subscription subscription;
// Never null when accessed - only via dispose()
// which is registered into sink.onCancel() callback.
// See onSubscribe(Subscription).
@SuppressWarnings("NullAway")
Subscription subscription;

DisposableConnect(QuicClientConfig config, SocketAddress bindAddress, MonoSink<QuicConnection> sink) {
this.attributes = config.attributes();
Expand All @@ -159,6 +163,7 @@ public Context currentContext() {

@Override
public void dispose() {
// sink.onCancel() registration happens in onSubscribe()
subscription.cancel();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.util.NetUtil;
import org.jspecify.annotations.Nullable;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
Expand Down Expand Up @@ -115,7 +114,11 @@ static final class DisposableBind implements CoreSubscriber<Channel>, Disposable
final Context currentContext;
final MonoSink<Connection> sink;

@Nullable Subscription subscription;
// Never null when accessed - only via dispose()
// which is registered into sink.onCancel() callback.
// See onSubscribe(Subscription).
@SuppressWarnings("NullAway")
Subscription subscription;

DisposableBind(SocketAddress bindAddress, MonoSink<Connection> sink) {
this.bindAddress = bindAddress;
Expand All @@ -130,6 +133,7 @@ public Context currentContext() {

@Override
public void dispose() {
// sink.onCancel() registration happens in onSubscribe()
subscription.cancel();
}

Expand Down

0 comments on commit d044e89

Please sign in to comment.