Skip to content

Commit

Permalink
Wait for schema agreement rather than in flight schema requests when …
Browse files Browse the repository at this point in the history
…bootstrapping (CASSANDRA-15158)
  • Loading branch information
Cameron Zemek committed Oct 23, 2020
1 parent b0b6f12 commit db7ed09
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 48 deletions.
41 changes: 15 additions & 26 deletions src/java/org/apache/cassandra/service/MigrationManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,24 @@ public void unregister(MigrationListener listener)
listeners.remove(listener);
}

public static void scheduleSchemaPullNoDelay(InetAddress endpoint, EndpointState state)
{
UUID schemaVersion = state.getSchemaVersion();
if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && schemaVersion != null)
maybeScheduleSchemaPull(schemaVersion, endpoint, state.getApplicationState(ApplicationState.RELEASE_VERSION).value, true);
}
public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state)
{
UUID schemaVersion = state.getSchemaVersion();
if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && schemaVersion != null)
maybeScheduleSchemaPull(schemaVersion, endpoint, state.getApplicationState(ApplicationState.RELEASE_VERSION).value);
maybeScheduleSchemaPull(schemaVersion, endpoint, state.getApplicationState(ApplicationState.RELEASE_VERSION).value, false);
}

/**
* If versions differ this node sends request with local migration list to the endpoint
* and expecting to receive a list of migrations to apply locally.
*/
private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint, String releaseVersion)
private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint, String releaseVersion, boolean noDelay)
{
String ourMajorVersion = FBUtilities.getReleaseVersionMajor();
if (!releaseVersion.startsWith(ourMajorVersion))
Expand All @@ -113,11 +119,11 @@ private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetA
}
if (!shouldPullSchemaFrom(endpoint))
{
logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false");
logger.debug("Not pulling schema because shouldPullSchemaFrom returned false");
return;
}

if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS)
if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS || noDelay)
{
// If we think we may be bootstrapping or have recently started, submit MigrationTask immediately
logger.debug("Immediately submitting migration task for {}, " +
Expand Down Expand Up @@ -184,28 +190,6 @@ private static boolean is30Compatible(int version)
return version == MessagingService.current_version || version == MessagingService.VERSION_3014;
}

public static boolean isReadyForBootstrap()
{
return MigrationTask.getInflightTasks().isEmpty();
}

public static void waitUntilReadyForBootstrap()
{
CountDownLatch completionLatch;
while ((completionLatch = MigrationTask.getInflightTasks().poll()) != null)
{
try
{
if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS))
logger.error("Migration task failed to complete");
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
logger.error("Migration task was interrupted");
}
}
}

public void notifyCreateKeyspace(KeyspaceMetadata ksm)
{
Expand Down Expand Up @@ -651,6 +635,11 @@ public static void resetLocalSchema()
logger.info("Local schema reset is complete.");
}

public int getMigrationTaskWaitInSeconds()
{
return MIGRATION_TASK_WAIT_IN_SECONDS;
}

/**
* We have a set of non-local, distributed system keyspaces, e.g. system_traces, system_auth, etc.
* (see {@link Schema#REPLICATED_SYSTEM_KEYSPACE_NAMES}), that need to be created on cluster initialisation,
Expand Down
60 changes: 44 additions & 16 deletions src/java/org/apache/cassandra/service/MigrationTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@

import java.net.InetAddress;
import java.util.Collection;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ConcurrentSkipListSet;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.SystemKeyspace.BootstrapState;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.RequestFailureReason;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.IAsyncCallback;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
Expand All @@ -44,20 +46,38 @@ class MigrationTask extends WrappedRunnable
{
private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class);

private static final ConcurrentLinkedQueue<CountDownLatch> inflightTasks = new ConcurrentLinkedQueue<>();

private static final Set<BootstrapState> monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS);

private static final Comparator<InetAddress> inetcomparator = new Comparator<InetAddress>()
{
public int compare(InetAddress addr1, InetAddress addr2)
{
return addr1.getHostAddress().compareTo(addr2.getHostAddress());
}
};

private static final Set<InetAddress> inFlightRequests = new ConcurrentSkipListSet<InetAddress>(inetcomparator);

private final InetAddress endpoint;

MigrationTask(InetAddress endpoint)
{
this.endpoint = endpoint;
}

public static ConcurrentLinkedQueue<CountDownLatch> getInflightTasks()
public static boolean addInFlightSchemaRequest(InetAddress ep)
{
return inflightTasks;
return inFlightRequests.add(ep);
}

public static boolean completedInFlightSchemaRequest(InetAddress ep)
{
return inFlightRequests.remove(ep);
}

public static boolean hasInFlighSchemaRequest(InetAddress ep)
{
return inFlightRequests.contains(ep);
}

public void runMayThrow() throws Exception
Expand All @@ -77,17 +97,23 @@ public void runMayThrow() throws Exception
return;
}

if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState()) && !addInFlightSchemaRequest(endpoint))
{
logger.debug("Skipped sending a migration request: node {} already has a request in flight", endpoint);
return;
}

MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance);

final CountDownLatch completionLatch = new CountDownLatch(1);

IAsyncCallback<Collection<Mutation>> cb = new IAsyncCallback<Collection<Mutation>>()
IAsyncCallbackWithFailure<Collection<Mutation>>cb = new IAsyncCallbackWithFailure<Collection<Mutation>>()
{
@Override
public void response(MessageIn<Collection<Mutation>> message)
{
try
{
logger.trace("Received response to schema request from {} at {}", message.from, System.currentTimeMillis());
SchemaKeyspace.mergeSchemaAndAnnounceVersion(message.payload);
}
catch (ConfigurationException e)
Expand All @@ -96,20 +122,22 @@ public void response(MessageIn<Collection<Mutation>> message)
}
finally
{
completionLatch.countDown();
completedInFlightSchemaRequest(endpoint);
}
}

public boolean isLatencyForSnitch()
{
return false;
}
};

// Only save the latches if we need bootstrap or are bootstrapping
if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState()))
inflightTasks.offer(completionLatch);

MessagingService.instance().sendRR(message, endpoint, cb);
public void onFailure(InetAddress from, RequestFailureReason failureReason)
{
logger.warn("Timed out waiting for schema response from {} at {}", endpoint, System.currentTimeMillis());
completedInFlightSchemaRequest(endpoint);
}
};
logger.trace("Sending schema pull request to {} at {} with timeout {}", endpoint, System.currentTimeMillis(), message.getTimeout());
MessagingService.instance().sendRR(message, endpoint, cb, message.getTimeout(), true);
}
}
51 changes: 45 additions & 6 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -901,14 +901,53 @@ public void waitForSchema(int delay)
}
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
}
// if our schema hasn't matched yet, wait until it has
// we do this by waiting for all in-flight migration requests and responses to complete
// (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful)
if (!MigrationManager.isReadyForBootstrap())

int retryCount = 0;

//if we don't have the same schema as the rest of the live nodes, send new schema pull requests
while (!checkForSchemaAgreement())
{
setMode(Mode.JOINING, "schema not yet in agreement, sending new schema pull requests", true);
for (InetAddress remote :Gossiper.instance.getLiveTokenOwners())
{
if (!MigrationTask.hasInFlighSchemaRequest(remote))
{
logger.debug("Resending schema request to: {}", remote.toString());
MigrationManager.scheduleSchemaPullNoDelay(remote, Gossiper.instance.getEndpointStateForEndpoint(remote));
Uninterruptibles.sleepUninterruptibly(DatabaseDescriptor.getMinRpcTimeout() +
((MigrationManager.instance.getMigrationTaskWaitInSeconds() * 1000)), TimeUnit.MILLISECONDS);
}
else
{
logger.debug("Schema request already in progress with: {}", remote.toString());
Uninterruptibles.sleepUninterruptibly(MigrationManager.instance.getMigrationTaskWaitInSeconds(), TimeUnit.SECONDS);
}

if (checkForSchemaAgreement())
return;
}

retryCount++;

// Abort startup if we still cant find an agreement after resending a request to each node
if (retryCount > 1)
throw new RuntimeException("Couldn't achieve schema agreement after waiting for, or resending schema requests to each node");

}
}

public boolean checkForSchemaAgreement()
{
UUID localVersion = Schema.instance.getVersion();
logger.debug("Local schema version: {}", Schema.schemaVersionToString(localVersion));
for (InetAddress remote :Gossiper.instance.getLiveTokenOwners())
{
setMode(Mode.JOINING, "waiting for schema information to complete", true);
MigrationManager.waitUntilReadyForBootstrap();
UUID remoteVersion = Gossiper.instance.getSchemaVersion(remote);
logger.debug("Remote node {}, schema version: {}", remote.toString(), Schema.schemaVersionToString(remoteVersion));
if (!localVersion.equals(remoteVersion))
return false;
}
return true;
}

@VisibleForTesting
Expand Down

0 comments on commit db7ed09

Please sign in to comment.