Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions clirr-ignores.xml
Original file line number Diff line number Diff line change
Expand Up @@ -447,4 +447,10 @@
<method>*one*</method>
<justification>False positive. Method is still present in parent interface (and was only introduced in ResultSet as a workaround for another clirr false positive)</justification>
</difference>

<difference>
<differenceType>8001</differenceType> <!-- class removed -->
<className>com/datastax/driver/core/GuavaCompatibility</className>
<justification>Compatibility shim dropped now that we depend on modern Guava APIs directly.</justification>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
package com.datastax.driver.core;

import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.util.concurrent.EventExecutor;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -114,7 +116,7 @@ public ListenableFuture<PreparedStatement> prepareAsync(final RegularStatement s
final CodecRegistry codecRegistry = getCluster().getConfiguration().getCodecRegistry();
ListenableFuture<PreparedStatement> prepared =
prepareAsync(statement.getQueryString(codecRegistry), statement.getOutgoingPayload());
return GuavaCompatibility.INSTANCE.transform(
return Futures.transform(
prepared,
new Function<PreparedStatement, PreparedStatement>() {
@Override
Expand All @@ -134,7 +136,8 @@ public PreparedStatement apply(PreparedStatement prepared) {

return prepared;
}
});
},
MoreExecutors.directExecutor());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -30,7 +32,7 @@ class ChainedResultSetFuture extends AbstractFuture<ResultSet> implements Result
void setSource(ResultSetFuture source) {
if (this.isCancelled()) source.cancel(false);
this.source = source;
GuavaCompatibility.INSTANCE.addCallback(
Futures.addCallback(
source,
new FutureCallback<ResultSet>() {
@Override
Expand All @@ -42,7 +44,8 @@ public void onSuccess(ResultSet result) {
public void onFailure(Throwable t) {
ChainedResultSetFuture.this.setException(t);
}
});
},
MoreExecutors.directExecutor());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.List;

/**
Expand Down Expand Up @@ -68,7 +69,7 @@ static class Forwarding extends CloseFuture {
Forwarding(List<CloseFuture> futures) {
this.futures = futures;

GuavaCompatibility.INSTANCE.addCallback(
Futures.addCallback(
Futures.allAsList(futures),
new FutureCallback<List<Void>>() {
@Override
Expand All @@ -80,7 +81,8 @@ public void onFailure(Throwable t) {
public void onSuccess(List<Void> v) {
Forwarding.this.onFuturesDone();
}
});
},
MoreExecutors.directExecutor());
}

@Override
Expand Down
23 changes: 13 additions & 10 deletions driver-core/src/main/java/com/datastax/driver/core/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ public class Cluster implements Closeable {

static {
logDriverVersion();
// Force initialization to fail fast if there is an issue detecting the version
GuavaCompatibility.init();
}

@VisibleForTesting
Expand Down Expand Up @@ -399,17 +397,19 @@ public ListenableFuture<Session> connectAsync(final String keyspace) {
} else {
final String useQuery = "USE " + keyspace;
ListenableFuture<ResultSet> keyspaceSet =
GuavaCompatibility.INSTANCE.transformAsync(
Futures.transformAsync(
sessionInitialized,
new AsyncFunction<Session, ResultSet>() {
@Override
public ListenableFuture<ResultSet> apply(Session session) throws Exception {
return session.executeAsync(useQuery);
}
});
},
MoreExecutors.directExecutor());
ListenableFuture<ResultSet> withErrorHandling =
GuavaCompatibility.INSTANCE.withFallback(
Futures.catchingAsync(
keyspaceSet,
Throwable.class,
new AsyncFunction<Throwable, ResultSet>() {
@Override
public ListenableFuture<ResultSet> apply(Throwable t) throws Exception {
Expand All @@ -427,8 +427,10 @@ public ListenableFuture<ResultSet> apply(Throwable t) throws Exception {
}
throw Throwables.propagate(t);
}
});
return GuavaCompatibility.INSTANCE.transform(withErrorHandling, Functions.constant(session));
},
MoreExecutors.directExecutor());
return Futures.transform(
withErrorHandling, Functions.constant(session), MoreExecutors.directExecutor());
}
}

Expand Down Expand Up @@ -2696,7 +2698,7 @@ public void run() {
future.setResult(rs);
}
},
GuavaCompatibility.INSTANCE.sameThreadExecutor());
MoreExecutors.directExecutor());

} catch (Exception e) {
logger.warn("Error while waiting for schema agreement", e);
Expand Down Expand Up @@ -3136,7 +3138,7 @@ private ListenableFuture<?> schedule(final ExceptionCatchingRunnable task) {
@Override
public void runMayThrow() throws Exception {
ListenableFuture<?> f = execute(task);
GuavaCompatibility.INSTANCE.addCallback(
Futures.addCallback(
f,
new FutureCallback<Object>() {
@Override
Expand All @@ -3148,7 +3150,8 @@ public void onSuccess(Object result) {
public void onFailure(Throwable t) {
future.setException(t);
}
});
},
MoreExecutors.directExecutor());
}
},
NEW_NODE_DELAY_SECONDS,
Expand Down
27 changes: 15 additions & 12 deletions driver-core/src/main/java/com/datastax/driver/core/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import io.netty.bootstrap.Bootstrap;
Expand Down Expand Up @@ -348,17 +349,18 @@ public void operationComplete(ChannelFuture future) throws Exception {
factory.manager.configuration.getPoolingOptions().getInitializationExecutor();

ListenableFuture<Void> queryOptionsFuture =
GuavaCompatibility.INSTANCE.transformAsync(
Futures.transformAsync(
channelReadyFuture, onChannelReady(protocolVersion, initExecutor), initExecutor);

ListenableFuture<Void> initializeTransportFuture =
GuavaCompatibility.INSTANCE.transformAsync(
Futures.transformAsync(
queryOptionsFuture, onOptionsReady(protocolVersion, initExecutor), initExecutor);

// Fallback on initializeTransportFuture so we can properly propagate specific exceptions.
ListenableFuture<Void> initFuture =
GuavaCompatibility.INSTANCE.withFallback(
Futures.catchingAsync(
initializeTransportFuture,
Throwable.class,
new AsyncFunction<Throwable, Void>() {
@Override
public ListenableFuture<Void> apply(Throwable t) throws Exception {
Expand Down Expand Up @@ -390,7 +392,7 @@ public ListenableFuture<Void> apply(Throwable t) throws Exception {
initExecutor);

// Ensure the connection gets closed if the caller cancels the returned future.
GuavaCompatibility.INSTANCE.addCallback(
Futures.addCallback(
initFuture,
new MoreFutures.FailureCallback<Void>() {
@Override
Expand Down Expand Up @@ -431,7 +433,8 @@ private static String extractMessage(Throwable t) {
public ListenableFuture<String> optionsQuery() {
Future startupOptionsFuture = write(new Requests.Options());

return GuavaCompatibility.INSTANCE.transformAsync(startupOptionsFuture, onSupportedResponse());
return Futures.transformAsync(
startupOptionsFuture, onSupportedResponse(), MoreExecutors.directExecutor());
}

private AsyncFunction<Void, Void> onChannelReady(
Expand All @@ -440,7 +443,7 @@ private AsyncFunction<Void, Void> onChannelReady(
@Override
public ListenableFuture<Void> apply(Void input) throws Exception {
Future startupOptionsFuture = write(new Requests.Options());
return GuavaCompatibility.INSTANCE.transformAsync(
return Futures.transformAsync(
startupOptionsFuture, onOptionsResponse(protocolVersion, initExecutor), initExecutor);
}
};
Expand Down Expand Up @@ -516,7 +519,7 @@ public ListenableFuture<Void> apply(Void input) throws Exception {
write(
new Requests.Startup(
protocolOptions.getCompression(), protocolOptions.isNoCompact(), extraOptions));
return GuavaCompatibility.INSTANCE.transformAsync(
return Futures.transformAsync(
startupResponseFuture, onStartupResponse(protocolVersion, initExecutor), initExecutor);
}
};
Expand Down Expand Up @@ -624,7 +627,7 @@ private ListenableFuture<Void> checkClusterName(
new Requests.Query("select cluster_name from system.local where key = 'local'"));
try {
write(clusterNameFuture);
return GuavaCompatibility.INSTANCE.transformAsync(
return Futures.transformAsync(
clusterNameFuture,
new AsyncFunction<ResultSet, Void>() {
@Override
Expand Down Expand Up @@ -663,7 +666,7 @@ private ListenableFuture<Void> authenticateV1(
new Requests.Credentials(((ProtocolV1Authenticator) authenticator).getCredentials());
try {
Future authResponseFuture = write(creds);
return GuavaCompatibility.INSTANCE.transformAsync(
return Futures.transformAsync(
authResponseFuture,
new AsyncFunction<Message.Response, Void>() {
@Override
Expand Down Expand Up @@ -699,7 +702,7 @@ private ListenableFuture<Void> authenticateV2(

try {
Future authResponseFuture = write(new Requests.AuthResponse(initialResponse));
return GuavaCompatibility.INSTANCE.transformAsync(
return Futures.transformAsync(
authResponseFuture, onV2AuthResponse(authenticator, protocolVersion, executor), executor);
} catch (Exception e) {
return Futures.immediateFailedFuture(e);
Expand Down Expand Up @@ -730,7 +733,7 @@ public ListenableFuture<Void> apply(Message.Response authResponse) throws Except
// Otherwise, send the challenge response back to the server
logger.trace("{} Sending Auth response to challenge", this);
Future nextResponseFuture = write(new Requests.AuthResponse(responseToServer));
return GuavaCompatibility.INSTANCE.transformAsync(
return Futures.transformAsync(
nextResponseFuture,
onV2AuthResponse(authenticator, protocolVersion, executor),
executor);
Expand Down Expand Up @@ -898,7 +901,7 @@ ListenableFuture<Connection> setKeyspaceAsync(final String keyspace)
// Note: we quote the keyspace below, because the name is the one coming from Cassandra, so
// it's in the right case already
Future future = write(new Requests.Query("USE \"" + keyspace + '"'));
GuavaCompatibility.INSTANCE.addCallback(
Futures.addCallback(
future,
new FutureCallback<Message.Response>() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.net.InetAddress;
import java.net.InetSocketAddress;
Expand Down Expand Up @@ -721,7 +723,7 @@ private ListenableFuture<ResultSet> selectPeersFuture(final Connection connectio
connection.write(peersV2Future);
final SettableFuture<ResultSet> peersFuture = SettableFuture.create();
// if peers v2 query fails, query peers table instead.
GuavaCompatibility.INSTANCE.addCallback(
Futures.addCallback(
peersV2Future,
new FutureCallback<ResultSet>() {

Expand All @@ -745,7 +747,8 @@ public void onFailure(Throwable t) {
peersFuture.setException(t);
}
}
});
},
MoreExecutors.directExecutor());
return peersFuture;
} else {
DefaultResultSetFuture peersFuture =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import com.datastax.driver.core.utils.MoreFutures;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.util.List;
import java.util.Queue;
Expand Down Expand Up @@ -248,7 +250,7 @@ private void deliverEvents() {
} else {
logger.trace("{} debouncer: delivering {} events", name, toDeliver.size());
ListenableFuture<?> delivered = callback.deliver(toDeliver);
GuavaCompatibility.INSTANCE.addCallback(
Futures.addCallback(
delivered,
new FutureCallback<Object>() {
@Override
Expand All @@ -260,7 +262,8 @@ public void onSuccess(Object result) {
public void onFailure(Throwable t) {
for (SettableFuture<Void> future : futures) future.setException(t);
}
});
},
MoreExecutors.directExecutor());
}

// If we didn't dequeue all events (or new ones arrived since we did), make sure we eventually
Expand Down
Loading
Loading