Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,14 @@ public boolean needsFiltering(Index.Group indexGroup, IndexHints indexHints)

private boolean isSupportedBy(Index.Group indexGroup, IndexHints indexHints, ColumnMetadata column)
{
for (Index index : indexGroup.getNotExcludedIndexes(indexHints))
for (Index index : indexGroup.getIndexes())
{
if (indexHints.excludes(index))
continue;

if (isSupportedBy(index, column))
return true;
}

return false;
}
Expand Down Expand Up @@ -680,7 +685,7 @@ public final void addToRowFilter(RowFilter.Builder filter,
throw new UnsupportedOperationException("Secondary indexes do not support IS NOT NULL restrictions");
}
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,14 @@ public Index findSupportingIndex(IndexRegistry indexRegistry, IndexHints indexHi
@Override
public boolean needsFiltering(Index.Group indexGroup, IndexHints indexHints)
{
for (Index index : indexGroup.getNotExcludedIndexes(indexHints))
for (Index index : indexGroup.getIndexes())
{
if (indexHints.excludes(index))
continue;

if (isSupportedBy(index))
return false;
}

return true;
}
Expand Down
67 changes: 16 additions & 51 deletions src/java/org/apache/cassandra/db/filter/IndexHints.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QualifiedName;
import org.apache.cassandra.db.TypeSizes;
import org.apache.cassandra.exceptions.InvalidRequestException;
Expand All @@ -42,6 +43,7 @@
import org.apache.cassandra.schema.IndexMetadata;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.vint.VIntCoding;

import static java.lang.String.format;

Expand Down Expand Up @@ -71,12 +73,6 @@ public boolean includes(String indexName)
return false;
}

@Override
public Set<Index> includedIn(Collection<Index> indexes)
{
return Collections.emptySet();
}

@Override
public boolean includesAnyOf(Collection<Index> indexes)
{
Expand All @@ -95,12 +91,6 @@ public boolean excludes(String indexName)
return false;
}

@Override
public <T extends Index> Set<T> notExcluded(Iterable<T> indexes)
{
return Sets.newHashSet(indexes);
}

@Override
public void validate(@Nullable Index.QueryPlan queryPlan)
{
Expand Down Expand Up @@ -156,23 +146,6 @@ public boolean includes(String indexName)
return false;
}

/**
* Returns the indexes in the specified collection of indexes that are included by these hints.
*
* @param indexes a collection of indexes
* @return the indexes that are included by these hints
*/
public Set<Index> includedIn(Collection<Index> indexes)
{
Set<Index> result = new HashSet<>();
for (Index index : indexes)
{
if (includes(index))
result.add(index);
}
return result;
}

/**
* @param indexes a collection of indexes
* @return {@code true} if any of the indexes is included, {@code false} otherwise
Expand Down Expand Up @@ -210,21 +183,6 @@ public boolean excludes(String indexName)
return false;
}

/**
* @param indexes a set of indexes
* @return the indexes that are not excluded by these hints
*/
public <T extends Index> Set<T> notExcluded(Iterable<T> indexes)
{
Set<T> result = new HashSet<>();
for (T index : indexes)
{
if (!excludes(index))
result.add(index);
}
return result;
}

/**
* Returns the best of the specified indexes that satisfies the specified filter and is not excluded.
* The order of preference to determine whether an index is better than another is:
Expand Down Expand Up @@ -422,10 +380,10 @@ public static IndexHints fromCQLNames(Set<QualifiedName> included,
TableMetadata table,
IndexRegistry indexRegistry)
{
if (included != null && included.size() > Short.MAX_VALUE)
if (included != null && included.size() > maxIncludedOrExcludedIndexCount())
throw new InvalidRequestException(TOO_MANY_INDEXES_ERROR + included.size());

if (excluded != null && excluded.size() > Short.MAX_VALUE)
if (excluded != null && excluded.size() > maxIncludedOrExcludedIndexCount())
throw new InvalidRequestException(TOO_MANY_INDEXES_ERROR + excluded.size());

IndexHints hints = IndexHints.create(fetchIndexes(included, table, indexRegistry),
Expand All @@ -451,6 +409,14 @@ public static IndexHints fromCQLNames(Set<QualifiedName> included,
return hints;
}

private static int maxIncludedOrExcludedIndexCount()
{
int guardrail = DatabaseDescriptor.getGuardrailsConfig().getSecondaryIndexesPerTableFailThreshold();

// If no guardrail is configured, use a value that safely fits in a single byte for serialization:
return guardrail > 0 ? guardrail : 128;
}

private static Set<IndexMetadata> fetchIndexes(Set<QualifiedName> indexNames, TableMetadata table, IndexRegistry indexRegistry)
{
if (indexNames == null || indexNames.isEmpty())
Expand Down Expand Up @@ -626,16 +592,16 @@ private void serialize(Set<IndexMetadata> indexes, DataOutputPlus out, int versi
return;

int n = indexes.size();
assert n < Short.MAX_VALUE : TOO_MANY_INDEXES_ERROR + n;
assert n < maxIncludedOrExcludedIndexCount() : TOO_MANY_INDEXES_ERROR + n;

out.writeShort(n);
out.writeVInt32(n);
for (IndexMetadata index : indexes)
IndexMetadata.serializer.serialize(index, out, version);
}

private Set<IndexMetadata> deserialize(DataInputPlus in, int version, TableMetadata table) throws IOException
{
short n = in.readShort();
int n = (int) in.readVInt();
Set<IndexMetadata> indexes = new HashSet<>(n);
for (short i = 0; i < n; i++)
{
Expand All @@ -650,8 +616,7 @@ private long serializedSize(Set<IndexMetadata> indexes, int version)
if (indexes.isEmpty())
return 0;

long size = 0;
size += TypeSizes.SHORT_SIZE; // number of indexes
long size = VIntCoding.computeVIntSize(indexes.size());
for (IndexMetadata index : indexes)
size += IndexMetadata.serializer.serializedSize(index, version);
return size;
Expand Down
12 changes: 0 additions & 12 deletions src/java/org/apache/cassandra/index/Index.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.WriteContext;
import org.apache.cassandra.db.filter.IndexHints;
import org.apache.cassandra.db.filter.RowFilter;
import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
import org.apache.cassandra.db.marshal.AbstractType;
Expand Down Expand Up @@ -850,17 +849,6 @@ public int hashCode()
*/
Set<? extends Index> getIndexes();

/**
* Returns the indexes that are members of this group that are not excluded by the hints.
*
* @param hints the index hints with the indexes to exclude.
* @return the indexes that are members of this group that are not excluded by the hints.
*/
default Set<? extends Index> getNotExcludedIndexes(IndexHints hints)
{
return hints.notExcluded(getIndexes());
}

/**
* Adds the specified {@link Index} as a member of this group.
*
Expand Down
19 changes: 15 additions & 4 deletions src/java/org/apache/cassandra/index/IndexRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ public Collection<Index> listIndexes()
return Collections.emptyList();
}

@Override
public Collection<Index> listNotExcludedIndexes(IndexHints hints)
{
return Collections.emptyList();
}

@Override
public Collection<Index.Group> listIndexGroups()
{
Expand Down Expand Up @@ -306,6 +312,14 @@ public Collection<Index> listIndexes()
return Collections.singletonList(index);
}

@Override
public Collection<Index> listNotExcludedIndexes(IndexHints hints)
{
return hints.excludes(index)
? Collections.emptyList()
: Collections.singletonList(index);
}

@Override
public Collection<Index.Group> listIndexGroups()
{
Expand Down Expand Up @@ -349,10 +363,7 @@ default void registerIndex(Index index)
* @param hints the index hints with the indexes to exclude.
* @return the indexes in this registry that are not excluded by the hints.
*/
default Collection<Index> listNotExcludedIndexes(IndexHints hints)
{
return hints.notExcluded(listIndexes());
}
Collection<Index> listNotExcludedIndexes(IndexHints hints);

default Optional<Index.Analyzer> getAnalyzerFor(ColumnMetadata column, Operator operator, ByteBuffer value)
{
Expand Down
23 changes: 21 additions & 2 deletions src/java/org/apache/cassandra/index/SecondaryIndexManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ public class SecondaryIndexManager implements IndexRegistry, INotificationConsum
* The indexes that are available for querying.
*/
private final Set<String> queryableIndexes = Sets.newConcurrentHashSet();

/**
* The indexes that are available for writing.
*/
Expand Down Expand Up @@ -659,7 +659,7 @@ public void onSuccess(Object o)
* <p>
* If the index doesn't support ALL {@link Index.LoadType} it performs a recovery {@link Index#getRecoveryTaskSupport()}
* instead of a build {@link Index#getBuildTaskSupport()}
*
*
* @param sstables the SSTables to be (re)indexed
* @param indexes the indexes to be (re)built for the specifed SSTables
* @param isFullRebuild True if this method is invoked as a full index rebuild, false otherwise
Expand Down Expand Up @@ -1470,6 +1470,24 @@ public Collection<Index> listIndexes()
return ImmutableSet.copyOf(indexes.values());
}

@Override
public Collection<Index> listNotExcludedIndexes(IndexHints hints)
{
if (indexes.isEmpty())
return Collections.emptySet();

if (hints == IndexHints.NONE || hints.excluded.isEmpty())
return listIndexes();

ImmutableSet.Builder<Index> builder = ImmutableSet.builder();
for (Index index : indexes.values())
{
if (!hints.excludes(index))
builder.add(index);
}
return builder.build();
}

public Set<Index.Group> listIndexGroups()
{
return ImmutableSet.copyOf(indexGroups.values());
Expand Down Expand Up @@ -1973,4 +1991,5 @@ public void makeIndexQueryable(Index index, Index.Status status)
logger.info("Index [{}] became writable after successful build.", name);
}
}

}
25 changes: 19 additions & 6 deletions src/java/org/apache/cassandra/net/MessagingService.java
Original file line number Diff line number Diff line change
Expand Up @@ -798,13 +798,26 @@ public Set<InetAddressAndPort> endpointsWithConnectionsOnVersionBelow(String key
Set<InetAddressAndPort> nodes = new HashSet<>();
for (InetAddressAndPort node : StorageService.instance.getTokenMetadataForKeyspace(keyspace).getAllEndpoints())
{
ConnectionType.MESSAGING_TYPES.forEach(type -> {
OutboundConnections connections = getOutbound(node, false);
OutboundConnection connection = connections != null ? connections.connectionFor(type) : null;
if (connection != null && connection.messagingVersion() < version)
nodes.add(node);
});
if (hasConnectionWithVersionBelow(node, version))
nodes.add(node);
}
return nodes;
}

private boolean hasConnectionWithVersionBelow(InetAddressAndPort node, int version)
{
OutboundConnections connections = getOutbound(node, false);

if (connections == null)
return false;

for (ConnectionType type : ConnectionType.MESSAGING_TYPES)
{
OutboundConnection connection = connections.connectionFor(type);
if (connection != null && connection.messagingVersion() < version)
return true;
}

return false;
}
}