diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java index 019bc6730dbf..d5c0f7102c1d 100644 --- a/src/java/org/apache/cassandra/service/MigrationManager.java +++ b/src/java/org/apache/cassandra/service/MigrationManager.java @@ -76,18 +76,24 @@ public void unregister(MigrationListener listener) listeners.remove(listener); } + public static void scheduleSchemaPullNoDelay(InetAddress endpoint, EndpointState state) + { + UUID schemaVersion = state.getSchemaVersion(); + if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && schemaVersion != null) + maybeScheduleSchemaPull(schemaVersion, endpoint, state.getApplicationState(ApplicationState.RELEASE_VERSION).value, true); + } public static void scheduleSchemaPull(InetAddress endpoint, EndpointState state) { UUID schemaVersion = state.getSchemaVersion(); if (!endpoint.equals(FBUtilities.getBroadcastAddress()) && schemaVersion != null) - maybeScheduleSchemaPull(schemaVersion, endpoint, state.getApplicationState(ApplicationState.RELEASE_VERSION).value); + maybeScheduleSchemaPull(schemaVersion, endpoint, state.getApplicationState(ApplicationState.RELEASE_VERSION).value, false); } /** * If versions differ this node sends request with local migration list to the endpoint * and expecting to receive a list of migrations to apply locally. */ - private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint, String releaseVersion) + private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetAddress endpoint, String releaseVersion, boolean noDelay) { String ourMajorVersion = FBUtilities.getReleaseVersionMajor(); if (!releaseVersion.startsWith(ourMajorVersion)) @@ -113,11 +119,11 @@ private static void maybeScheduleSchemaPull(final UUID theirVersion, final InetA } if (!shouldPullSchemaFrom(endpoint)) { - logger.debug("Not pulling schema because versions match or shouldPullSchemaFrom returned false"); + logger.debug("Not pulling schema because shouldPullSchemaFrom returned false"); return; } - if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS) + if (Schema.instance.isEmpty() || runtimeMXBean.getUptime() < MIGRATION_DELAY_IN_MS || noDelay) { // If we think we may be bootstrapping or have recently started, submit MigrationTask immediately logger.debug("Immediately submitting migration task for {}, " + @@ -184,28 +190,6 @@ private static boolean is30Compatible(int version) return version == MessagingService.current_version || version == MessagingService.VERSION_3014; } - public static boolean isReadyForBootstrap() - { - return MigrationTask.getInflightTasks().isEmpty(); - } - - public static void waitUntilReadyForBootstrap() - { - CountDownLatch completionLatch; - while ((completionLatch = MigrationTask.getInflightTasks().poll()) != null) - { - try - { - if (!completionLatch.await(MIGRATION_TASK_WAIT_IN_SECONDS, TimeUnit.SECONDS)) - logger.error("Migration task failed to complete"); - } - catch (InterruptedException e) - { - Thread.currentThread().interrupt(); - logger.error("Migration task was interrupted"); - } - } - } public void notifyCreateKeyspace(KeyspaceMetadata ksm) { @@ -651,6 +635,11 @@ public static void resetLocalSchema() logger.info("Local schema reset is complete."); } + public int getMigrationTaskWaitInSeconds() + { + return MIGRATION_TASK_WAIT_IN_SECONDS; + } + /** * We have a set of non-local, distributed system keyspaces, e.g. system_traces, system_auth, etc. * (see {@link Schema#REPLICATED_SYSTEM_KEYSPACE_NAMES}), that need to be created on cluster initialisation, diff --git a/src/java/org/apache/cassandra/service/MigrationTask.java b/src/java/org/apache/cassandra/service/MigrationTask.java index db65c20769d2..c36094f5e990 100644 --- a/src/java/org/apache/cassandra/service/MigrationTask.java +++ b/src/java/org/apache/cassandra/service/MigrationTask.java @@ -19,20 +19,22 @@ import java.net.InetAddress; import java.util.Collection; +import java.util.Comparator; import java.util.EnumSet; import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ConcurrentSkipListSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.SystemKeyspace.BootstrapState; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.gms.FailureDetector; -import org.apache.cassandra.net.IAsyncCallback; +import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; @@ -44,10 +46,18 @@ class MigrationTask extends WrappedRunnable { private static final Logger logger = LoggerFactory.getLogger(MigrationTask.class); - private static final ConcurrentLinkedQueue inflightTasks = new ConcurrentLinkedQueue<>(); - private static final Set monitoringBootstrapStates = EnumSet.of(BootstrapState.NEEDS_BOOTSTRAP, BootstrapState.IN_PROGRESS); + private static final Comparator inetcomparator = new Comparator() + { + public int compare(InetAddress addr1, InetAddress addr2) + { + return addr1.getHostAddress().compareTo(addr2.getHostAddress()); + } + }; + + private static final Set inFlightRequests = new ConcurrentSkipListSet(inetcomparator); + private final InetAddress endpoint; MigrationTask(InetAddress endpoint) @@ -55,9 +65,19 @@ class MigrationTask extends WrappedRunnable this.endpoint = endpoint; } - public static ConcurrentLinkedQueue getInflightTasks() + public static boolean addInFlightSchemaRequest(InetAddress ep) { - return inflightTasks; + return inFlightRequests.add(ep); + } + + public static boolean completedInFlightSchemaRequest(InetAddress ep) + { + return inFlightRequests.remove(ep); + } + + public static boolean hasInFlighSchemaRequest(InetAddress ep) + { + return inFlightRequests.contains(ep); } public void runMayThrow() throws Exception @@ -77,17 +97,23 @@ public void runMayThrow() throws Exception return; } + if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState()) && !addInFlightSchemaRequest(endpoint)) + { + logger.debug("Skipped sending a migration request: node {} already has a request in flight", endpoint); + return; + } + MessageOut message = new MessageOut<>(MessagingService.Verb.MIGRATION_REQUEST, null, MigrationManager.MigrationsSerializer.instance); - final CountDownLatch completionLatch = new CountDownLatch(1); - IAsyncCallback> cb = new IAsyncCallback>() + IAsyncCallbackWithFailure>cb = new IAsyncCallbackWithFailure>() { @Override public void response(MessageIn> message) { try { + logger.trace("Received response to schema request from {} at {}", message.from, System.currentTimeMillis()); SchemaKeyspace.mergeSchemaAndAnnounceVersion(message.payload); } catch (ConfigurationException e) @@ -96,7 +122,7 @@ public void response(MessageIn> message) } finally { - completionLatch.countDown(); + completedInFlightSchemaRequest(endpoint); } } @@ -104,12 +130,14 @@ public boolean isLatencyForSnitch() { return false; } - }; - - // Only save the latches if we need bootstrap or are bootstrapping - if (monitoringBootstrapStates.contains(SystemKeyspace.getBootstrapState())) - inflightTasks.offer(completionLatch); - MessagingService.instance().sendRR(message, endpoint, cb); + public void onFailure(InetAddress from, RequestFailureReason failureReason) + { + logger.warn("Timed out waiting for schema response from {} at {}", endpoint, System.currentTimeMillis()); + completedInFlightSchemaRequest(endpoint); + } + }; + logger.trace("Sending schema pull request to {} at {} with timeout {}", endpoint, System.currentTimeMillis(), message.getTimeout()); + MessagingService.instance().sendRR(message, endpoint, cb, message.getTimeout(), true); } } diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 8466eb332e22..7430595b072e 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -895,14 +895,53 @@ public void waitForSchema(int delay) } Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS); } - // if our schema hasn't matched yet, wait until it has - // we do this by waiting for all in-flight migration requests and responses to complete - // (post CASSANDRA-1391 we don't expect this to be necessary very often, but it doesn't hurt to be careful) - if (!MigrationManager.isReadyForBootstrap()) + + int retryCount = 0; + + //if we don't have the same schema as the rest of the live nodes, send new schema pull requests + while (!checkForSchemaAgreement()) + { + setMode(Mode.JOINING, "schema not yet in agreement, sending new schema pull requests", true); + for (InetAddress remote :Gossiper.instance.getLiveTokenOwners()) + { + if (!MigrationTask.hasInFlighSchemaRequest(remote)) + { + logger.debug("Resending schema request to: {}", remote.toString()); + MigrationManager.scheduleSchemaPullNoDelay(remote, Gossiper.instance.getEndpointStateForEndpoint(remote)); + Uninterruptibles.sleepUninterruptibly(DatabaseDescriptor.getMinRpcTimeout() + + ((MigrationManager.instance.getMigrationTaskWaitInSeconds() * 1000)), TimeUnit.MILLISECONDS); + } + else + { + logger.debug("Schema request already in progress with: {}", remote.toString()); + Uninterruptibles.sleepUninterruptibly(MigrationManager.instance.getMigrationTaskWaitInSeconds(), TimeUnit.SECONDS); + } + + if (checkForSchemaAgreement()) + return; + } + + retryCount++; + + // Abort startup if we still cant find an agreement after resending a request to each node + if (retryCount > 1) + throw new RuntimeException("Couldn't achieve schema agreement after waiting for, or resending schema requests to each node"); + + } + } + + public boolean checkForSchemaAgreement() + { + UUID localVersion = Schema.instance.getVersion(); + logger.debug("Local schema version: {}", Schema.schemaVersionToString(localVersion)); + for (InetAddress remote :Gossiper.instance.getLiveTokenOwners()) { - setMode(Mode.JOINING, "waiting for schema information to complete", true); - MigrationManager.waitUntilReadyForBootstrap(); + UUID remoteVersion = Gossiper.instance.getSchemaVersion(remote); + logger.debug("Remote node {}, schema version: {}", remote.toString(), Schema.schemaVersionToString(remoteVersion)); + if (!localVersion.equals(remoteVersion)) + return false; } + return true; } @VisibleForTesting