diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/AsyncCassandraTemplate.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/AsyncCassandraTemplate.java index e550fb4c4..2d7eb3137 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/AsyncCassandraTemplate.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/AsyncCassandraTemplate.java @@ -69,6 +69,7 @@ import com.datastax.oss.driver.api.core.DriverException; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.cql.AsyncResultSet; +import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.core.cql.Statement; @@ -94,6 +95,7 @@ * * @author Mark Paluch * @author John Blum + * @author Tomasz Lelek * @see org.springframework.data.cassandra.core.AsyncCassandraOperations * @since 2.0 */ @@ -122,7 +124,7 @@ public class AsyncCassandraTemplate * * @param session {@link CqlSession} used to interact with Cassandra; must not be {@literal null}. * @see CassandraConverter - * @see Session + * @see CqlSession */ public AsyncCassandraTemplate(CqlSession session) { this(session, newConverter()); @@ -136,7 +138,7 @@ public AsyncCassandraTemplate(CqlSession session) { * @param converter {@link CassandraConverter} used to convert between Java and Cassandra types; must not be * {@literal null}. * @see CassandraConverter - * @see Session + * @see CqlSession */ public AsyncCassandraTemplate(CqlSession session, CassandraConverter converter) { this(new DefaultSessionFactory(session), converter); @@ -150,7 +152,7 @@ public AsyncCassandraTemplate(CqlSession session, CassandraConverter converter) * @param converter {@link CassandraConverter} used to convert between Java and Cassandra types; must not be * {@literal null}. * @see CassandraConverter - * @see Session + * @see CqlSession */ public AsyncCassandraTemplate(SessionFactory sessionFactory, CassandraConverter converter) { this(new AsyncCqlTemplate(sessionFactory), converter); @@ -164,7 +166,7 @@ public AsyncCassandraTemplate(SessionFactory sessionFactory, CassandraConverter * @param converter {@link CassandraConverter} used to convert between Java and Cassandra types; must not be * {@literal null}. * @see CassandraConverter - * @see Session + * @see CqlSession */ public AsyncCassandraTemplate(AsyncCqlTemplate asyncCqlTemplate, CassandraConverter converter) { @@ -265,10 +267,14 @@ protected StatementFactory getStatementFactory() { return this.statementFactory; } - private CqlIdentifier getTableName(Class entityClass) { - return getEntityOperations().getTableName(entityClass); + private TableCoordinates constructTableCoordinates(Class entityClass) { + EntityOperations entityOperations = getEntityOperations(); + return TableCoordinates.of(entityOperations.getKeyspaceName(entityClass), + entityOperations.getTableName(entityClass)); } + + // ------------------------------------------------------------------------- // Methods dealing with static CQL // ------------------------------------------------------------------------- @@ -452,20 +458,22 @@ public ListenableFuture delete(Query query, Class entityClass) throw Assert.notNull(query, "Query must not be null"); Assert.notNull(entityClass, "Entity type must not be null"); - return doDelete(query, entityClass, getTableName(entityClass)); + return doDelete(query, entityClass, constructTableCoordinates(entityClass)); } - private ListenableFuture doDelete(Query query, Class entityClass, CqlIdentifier tableName) { + private ListenableFuture doDelete(Query query, Class entityClass, TableCoordinates tableCoordinates) { StatementBuilder builder = getStatementFactory().delete(query, getRequiredPersistentEntity(entityClass), - tableName); + tableCoordinates); SimpleStatement delete = builder.build(); - maybeEmitEvent(new BeforeDeleteEvent<>(delete, entityClass, tableName)); + maybeEmitEvent(new BeforeDeleteEvent<>(delete, entityClass, tableCoordinates.getTableName())); ListenableFuture future = getAsyncCqlOperations().execute(delete); - future.addCallback(success -> maybeEmitEvent(new AfterDeleteEvent<>(delete, entityClass, tableName)), e -> {}); + future.addCallback( + success -> maybeEmitEvent(new AfterDeleteEvent<>(delete, entityClass, tableCoordinates.getTableName())), + e -> {}); return future; } @@ -482,7 +490,7 @@ public ListenableFuture count(Class entityClass) { Assert.notNull(entityClass, "Entity type must not be null"); - return doCount(Query.empty(), entityClass, getTableName(entityClass)); + return doCount(Query.empty(), entityClass, constructTableCoordinates(entityClass)); } /* (non-Javadoc) @@ -494,13 +502,13 @@ public ListenableFuture count(Query query, Class entityClass) throws Da Assert.notNull(query, "Query must not be null"); Assert.notNull(entityClass, "Entity type must not be null"); - return doCount(query, entityClass, getTableName(entityClass)); + return doCount(query, entityClass, constructTableCoordinates(entityClass)); } - ListenableFuture doCount(Query query, Class entityClass, CqlIdentifier tableName) { + ListenableFuture doCount(Query query, Class entityClass, TableCoordinates tableCoordinates) { StatementBuilder countStatement = getStatementFactory() - .count(query, getRequiredPersistentEntity(entityClass), tableName); + .count(query, getRequiredPersistentEntity(entityClass), tableCoordinates); SimpleStatement statement = countStatement.build(); @@ -521,7 +529,7 @@ public ListenableFuture exists(Object id, Class entityClass) { CassandraPersistentEntity entity = getRequiredPersistentEntity(entityClass); StatementBuilder select = getStatementFactory() - .selectOneById(id, entity, entity.getTableName()); + .selectOneById(id, entity, TableCoordinates.of(entity)); return new MappingListenableFutureAdapter<>(getAsyncCqlOperations().queryForResultSet(select.build()), resultSet -> resultSet.one() != null); @@ -537,7 +545,7 @@ public ListenableFuture exists(Query query, Class entityClass) throw Assert.notNull(entityClass, "Entity type must not be null"); StatementBuilder select = getStatementFactory() - .select(query.limit(1), getRequiredPersistentEntity(entityClass), getTableName(entityClass)); + .select(query.limit(1), getRequiredPersistentEntity(entityClass), constructTableCoordinates(entityClass)); return new MappingListenableFutureAdapter<>(getAsyncCqlOperations().queryForResultSet(select.build()), resultSet -> resultSet.one() != null); @@ -554,7 +562,8 @@ public ListenableFuture selectOneById(Object id, Class entityClass) { CassandraPersistentEntity entity = getRequiredPersistentEntity(entityClass); CqlIdentifier tableName = entity.getTableName(); - StatementBuilder select = getStatementFactory().selectOneById(id, entity, + TableCoordinates.of(entity)); Function mapper = getMapper(entityClass, entityClass, tableName); return new MappingListenableFutureAdapter<>( @@ -579,38 +588,40 @@ public ListenableFuture> insert(T entity, InsertOptions Assert.notNull(entity, "Entity must not be null"); Assert.notNull(options, "InsertOptions must not be null"); - return doInsert(entity, options, getTableName(entity.getClass())); + return doInsert(entity, options, constructTableCoordinates(entity.getClass())); } - private ListenableFuture> doInsert(T entity, WriteOptions options, CqlIdentifier tableName) { + private ListenableFuture> doInsert(T entity, WriteOptions options, + TableCoordinates tableCoordinates) { - AdaptibleEntity source = getEntityOperations().forEntity(maybeCallBeforeConvert(entity, tableName), + AdaptibleEntity source = getEntityOperations().forEntity( + maybeCallBeforeConvert(entity, tableCoordinates.getTableName()), getConverter().getConversionService()); CassandraPersistentEntity persistentEntity = getRequiredPersistentEntity(entity.getClass()); T entityToUse = source.isVersionedEntity() ? source.initializeVersionProperty() : entity; StatementBuilder builder = getStatementFactory().insert(entityToUse, options, persistentEntity, - tableName); + tableCoordinates); if (source.isVersionedEntity()) { builder.apply(Insert::ifNotExists); - return doInsertVersioned(builder.build(), entityToUse, source, tableName); + return doInsertVersioned(builder.build(), entityToUse, source, tableCoordinates); } - return doInsert(builder.build(), entityToUse, source, tableName); + return doInsert(builder.build(), entityToUse, source, tableCoordinates); } private ListenableFuture> doInsertVersioned(SimpleStatement insert, T entity, - AdaptibleEntity source, CqlIdentifier tableName) { + AdaptibleEntity source, TableCoordinates tableCoordinates) { - return executeSave(entity, tableName, insert, result -> { + return executeSave(entity, tableCoordinates, insert, result -> { if (!result.wasApplied()) { throw new OptimisticLockingFailureException( String.format("Cannot insert entity %s with version %s into table %s as it already exists", entity, - source.getVersion(), tableName)); + source.getVersion(), tableCoordinates.getTableName())); } }); } @@ -618,9 +629,9 @@ private ListenableFuture> doInsertVersioned(SimpleState @SuppressWarnings("unused") private ListenableFuture> doInsert(SimpleStatement insert, T entity, AdaptibleEntity source, - CqlIdentifier tableName) { + TableCoordinates tableCoordinates) { - return executeSave(entity, tableName, insert); + return executeSave(entity, tableCoordinates, insert); } /* (non-Javadoc) @@ -642,40 +653,41 @@ public ListenableFuture> update(T entity, UpdateOptions AdaptibleEntity source = getEntityOperations().forEntity(entity, getConverter().getConversionService()); CassandraPersistentEntity persistentEntity = getRequiredPersistentEntity(entity.getClass()); - CqlIdentifier tableName = persistentEntity.getTableName(); + TableCoordinates tableCoordinates = TableCoordinates.of(persistentEntity); - T entityToUpdate = maybeCallBeforeConvert(entity, tableName); + T entityToUpdate = maybeCallBeforeConvert(entity, tableCoordinates.getTableName()); - return source.isVersionedEntity() ? doUpdateVersioned(entityToUpdate, options, tableName, persistentEntity) - : doUpdate(entityToUpdate, options, tableName, persistentEntity); + return source.isVersionedEntity() ? doUpdateVersioned(entityToUpdate, options, tableCoordinates, persistentEntity) + : doUpdate(entityToUpdate, options, tableCoordinates, persistentEntity); } private ListenableFuture> doUpdateVersioned(T entity, UpdateOptions options, - CqlIdentifier tableName, CassandraPersistentEntity persistentEntity) { + TableCoordinates tableCoordinates, CassandraPersistentEntity persistentEntity) { AdaptibleEntity source = getEntityOperations().forEntity(entity, getConverter().getConversionService()); Number previousVersion = source.getVersion(); T toSave = source.incrementVersion(); - StatementBuilder update = getStatementFactory().update(toSave, options, persistentEntity, tableName); + StatementBuilder update = getStatementFactory().update(toSave, options, persistentEntity, tableCoordinates); source.appendVersionCondition(update, previousVersion); - return executeSave(toSave, tableName, update.build(), result -> { + return executeSave(toSave, tableCoordinates, update.build(), result -> { if (!result.wasApplied()) { throw new OptimisticLockingFailureException( String.format("Cannot save entity %s with version %s to table %s. Has it been modified meanwhile?", toSave, - source.getVersion(), tableName)); + source.getVersion(), tableCoordinates)); } }); } - private ListenableFuture> doUpdate(T entity, UpdateOptions options, CqlIdentifier tableName, + private ListenableFuture> doUpdate(T entity, UpdateOptions options, + TableCoordinates tableCoordinates, CassandraPersistentEntity persistentEntity) { - StatementBuilder update = getStatementFactory().update(entity, options, persistentEntity, tableName); + StatementBuilder update = getStatementFactory().update(entity, options, persistentEntity, tableCoordinates); - return executeSave(entity, tableName, update.build()); + return executeSave(entity, tableCoordinates, update.build()); } /* (non-Javadoc) @@ -697,33 +709,34 @@ public ListenableFuture delete(Object entity, QueryOptions options) AdaptibleEntity source = getEntityOperations().forEntity(entity, getConverter().getConversionService()); CassandraPersistentEntity persistentEntity = getRequiredPersistentEntity(entity.getClass()); - CqlIdentifier tableName = persistentEntity.getTableName(); + TableCoordinates tableCoordinates = TableCoordinates.of(persistentEntity); - return source.isVersionedEntity() ? doDeleteVersioned(entity, options, source, tableName) - : doDelete(entity, options, tableName); + return source.isVersionedEntity() ? doDeleteVersioned(entity, options, source, tableCoordinates) + : doDelete(entity, options, tableCoordinates); } private ListenableFuture doDeleteVersioned(Object entity, QueryOptions options, - AdaptibleEntity source, CqlIdentifier tableName) { + AdaptibleEntity source, TableCoordinates tableCoordinates) { - StatementBuilder delete = getStatementFactory().delete(entity, options, getConverter(), tableName); + StatementBuilder delete = getStatementFactory().delete(entity, options, getConverter(), tableCoordinates); ; - return executeDelete(entity, tableName, source.appendVersionCondition(delete).build(), result -> { + return executeDelete(entity, tableCoordinates, source.appendVersionCondition(delete).build(), result -> { if (!result.wasApplied()) { throw new OptimisticLockingFailureException( String.format("Cannot delete entity %s with version %s in table %s. Has it been modified meanwhile?", - entity, source.getVersion(), tableName)); + entity, source.getVersion(), tableCoordinates)); } }); } - private ListenableFuture doDelete(Object entity, QueryOptions options, CqlIdentifier tableName) { + private ListenableFuture doDelete(Object entity, QueryOptions options, + TableCoordinates tableCoordinates) { - StatementBuilder delete = getStatementFactory().delete(entity, options, getConverter(), tableName); + StatementBuilder delete = getStatementFactory().delete(entity, options, getConverter(), tableCoordinates); - return executeDelete(entity, tableName, delete.build(), result -> {}); + return executeDelete(entity, tableCoordinates, delete.build(), result -> {}); } /* (non-Javadoc) @@ -757,14 +770,17 @@ public ListenableFuture truncate(Class entityClass) { Assert.notNull(entityClass, "Entity type must not be null"); - CqlIdentifier tableName = getTableName(entityClass); - Truncate truncate = QueryBuilder.truncate(tableName); + TableCoordinates tableCoordinates = constructTableCoordinates(entityClass); + Truncate truncate = QueryBuilder.truncate(tableCoordinates.getKeyspaceName().orElse(null), + tableCoordinates.getTableName()); SimpleStatement statement = truncate.build(); - maybeEmitEvent(new BeforeDeleteEvent<>(statement, entityClass, tableName)); + maybeEmitEvent(new BeforeDeleteEvent<>(statement, entityClass, tableCoordinates.getTableName())); ListenableFuture future = getAsyncCqlOperations().execute(statement); - future.addCallback(success -> maybeEmitEvent(new AfterDeleteEvent<>(statement, entityClass, tableName)), e -> {}); + future.addCallback( + success -> maybeEmitEvent(new AfterDeleteEvent<>(statement, entityClass, tableCoordinates.getTableName())), + e -> {}); return new MappingListenableFutureAdapter<>(future, aBoolean -> null); } @@ -773,17 +789,17 @@ public ListenableFuture truncate(Class entityClass) { // Implementation hooks and utility methods // ------------------------------------------------------------------------- - private ListenableFuture> executeSave(T entity, CqlIdentifier tableName, + private ListenableFuture> executeSave(T entity, TableCoordinates tableCoordinates, SimpleStatement statement) { - return executeSave(entity, tableName, statement, ignore -> {}); + return executeSave(entity, tableCoordinates, statement, ignore -> {}); } - private ListenableFuture> executeSave(T entity, CqlIdentifier tableName, + private ListenableFuture> executeSave(T entity, TableCoordinates tableCoordinates, SimpleStatement statement, Consumer beforeAfterSaveEvent) { - - maybeEmitEvent(new BeforeSaveEvent<>(entity, tableName, statement)); - T entityToSave = maybeCallBeforeSave(entity, tableName, statement); + // todo leverage getKeyspaceName() + maybeEmitEvent(new BeforeSaveEvent<>(entity, tableCoordinates.getTableName(), statement)); + T entityToSave = maybeCallBeforeSave(entity, tableCoordinates.getTableName(), statement); ListenableFuture result = getAsyncCqlOperations().execute(new AsyncStatementCallback(statement)); @@ -795,16 +811,17 @@ private ListenableFuture> executeSave(T entity, CqlIden beforeAfterSaveEvent.accept(writeResult); - maybeEmitEvent(new AfterSaveEvent<>(entityToSave, tableName)); + maybeEmitEvent(new AfterSaveEvent<>(entityToSave, tableCoordinates.getTableName())); return writeResult; }); } - private ListenableFuture executeDelete(Object entity, CqlIdentifier tableName, SimpleStatement statement, + private ListenableFuture executeDelete(Object entity, TableCoordinates tableCoordinates, + SimpleStatement statement, Consumer resultConsumer) { - maybeEmitEvent(new BeforeDeleteEvent<>(statement, entity.getClass(), tableName)); + maybeEmitEvent(new BeforeDeleteEvent<>(statement, entity.getClass(), tableCoordinates.getTableName())); ListenableFuture result = getAsyncCqlOperations().execute(new AsyncStatementCallback(statement)); @@ -815,7 +832,7 @@ private ListenableFuture executeDelete(Object entity, CqlIdentifier resultConsumer.accept(writeResult); - maybeEmitEvent(new AfterDeleteEvent<>(statement, entity.getClass(), tableName)); + maybeEmitEvent(new AfterDeleteEvent<>(statement, entity.getClass(), tableCoordinates.getTableName())); return writeResult; }); diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchTemplate.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchTemplate.java index fceafd546..759bc052a 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchTemplate.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraBatchTemplate.java @@ -31,6 +31,7 @@ import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder; import com.datastax.oss.driver.api.core.cql.BatchType; import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.datastax.oss.driver.api.core.cql.Statement; /** * Default implementation for {@link CassandraBatchOperations}. @@ -38,6 +39,7 @@ * @author Mark Paluch * @author John Blum * @author Anup Sabbi + * @author Tomasz Lelek * @since 1.5 */ class CassandraBatchTemplate implements CassandraBatchOperations { @@ -168,7 +170,7 @@ public CassandraBatchOperations insert(Iterable entities, WriteOptions option .getRequiredPersistentEntity(entity.getClass()); SimpleStatement insertQuery = getStatementFactory() - .insert(entity, options, persistentEntity, persistentEntity.getTableName()).build(); + .insert(entity, options, persistentEntity, TableCoordinates.of(persistentEntity)).build(); this.batch.addStatement(insertQuery); } @@ -213,7 +215,7 @@ public CassandraBatchOperations update(Iterable entities, WriteOptions option CassandraPersistentEntity persistentEntity = getRequiredPersistentEntity(entity.getClass()); SimpleStatement update = getStatementFactory() - .update(entity, options, persistentEntity, persistentEntity.getTableName()).build(); + .update(entity, options, persistentEntity, TableCoordinates.of(persistentEntity)).build(); this.batch.addStatement(update); } @@ -258,7 +260,7 @@ public CassandraBatchOperations delete(Iterable entities, WriteOptions option CassandraPersistentEntity persistentEntity = getRequiredPersistentEntity(entity.getClass()); SimpleStatement delete = getStatementFactory() - .delete(entity, options, this.getConverter(), persistentEntity.getTableName()).build(); + .delete(entity, options, this.getConverter(), TableCoordinates.of(persistentEntity)).build(); this.batch.addStatement(delete); } diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraOperations.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraOperations.java index e457fad6f..5a08956c0 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraOperations.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraOperations.java @@ -17,6 +17,7 @@ import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.stream.Stream; import org.springframework.dao.DataAccessException; @@ -41,6 +42,7 @@ * @author David Webb * @author Matthew Adams * @author Mark Paluch + * @author Tomasz Lelek * @see CassandraTemplate * @see CqlOperations * @see Statement @@ -80,6 +82,16 @@ public interface CassandraOperations extends FluentCassandraOperations { */ CqlIdentifier getTableName(Class entityClass); + + /** + * The keyspace used for the specified class by this template. + * + * @param entityClass entity class, may be {@literal null}. + * @return the {@link Optional } the keyspace to which the entity shall be persisted. If null, then + * default session-level keyspace will be used. + */ + Optional getKeyspaceName(Class entityClass); + // ------------------------------------------------------------------------- // Methods dealing with static CQL // ------------------------------------------------------------------------- diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraTemplate.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraTemplate.java index a8a046c65..23aca5c11 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraTemplate.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/CassandraTemplate.java @@ -16,11 +16,13 @@ package org.springframework.data.cassandra.core; import java.util.List; +import java.util.Optional; import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import org.jetbrains.annotations.NotNull; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; @@ -95,6 +97,7 @@ * @author Mark Paluch * @author John Blum * @author Lukasz Antoniak + * @author Tomasz Lelek * @see org.springframework.data.cassandra.core.CassandraOperations * @since 2.0 */ @@ -122,7 +125,7 @@ public class CassandraTemplate implements CassandraOperations, ApplicationEventP * * @param session {@link CqlSession} used to interact with Cassandra; must not be {@literal null}. * @see CassandraConverter - * @see Session + * @see CqlSession */ public CassandraTemplate(CqlSession session) { this(session, newConverter()); @@ -136,7 +139,7 @@ public CassandraTemplate(CqlSession session) { * @param converter {@link CassandraConverter} used to convert between Java and Cassandra types; must not be * {@literal null}. * @see CassandraConverter - * @see Session + * @see CqlSession */ public CassandraTemplate(CqlSession session, CassandraConverter converter) { this(new DefaultSessionFactory(session), converter); @@ -164,7 +167,7 @@ public CassandraTemplate(SessionFactory sessionFactory, CassandraConverter conve * @param converter {@link CassandraConverter} used to convert between Java and Cassandra types; must not be * {@literal null}. * @see CassandraConverter - * @see Session + * @see CqlSession */ public CassandraTemplate(CqlOperations cqlOperations, CassandraConverter converter) { @@ -281,6 +284,18 @@ public CqlIdentifier getTableName(Class entityClass) { return getEntityOperations().getTableName(entityClass); } + /* (non-Javadoc) + * @see org.springframework.data.cassandra.core.CassandraOperations#getKeyspaceName(java.lang.Class) + */ + @Override + public Optional getKeyspaceName(Class entityClass) { + return getEntityOperations().getKeyspaceName(entityClass); + } + + private TableCoordinates getTableCoordinates(Class entityClass){ + return TableCoordinates.of(getKeyspaceName(entityClass), getTableName(entityClass)); + } + // ------------------------------------------------------------------------- // Methods dealing with static CQL // ------------------------------------------------------------------------- @@ -393,10 +408,11 @@ public List select(Query query, Class entityClass) throws DataAccessEx Assert.notNull(query, "Query must not be null"); Assert.notNull(entityClass, "Entity type must not be null"); - return doSelect(query, entityClass, getTableName(entityClass), entityClass); + return doSelect(query, entityClass, getTableCoordinates(entityClass), entityClass); } - List doSelect(Query query, Class entityClass, CqlIdentifier tableName, Class returnType) { + List doSelect(Query query, Class entityClass, TableCoordinates tableCoordinates, + Class returnType) { CassandraPersistentEntity entity = getRequiredPersistentEntity(entityClass); @@ -404,9 +420,9 @@ List doSelect(Query query, Class entityClass, CqlIdentifier tableName, Query queryToUse = query.columns(columns); - StatementBuilder select = getStatementFactory().select(queryToUse, entity, tableCoordinates); - Function mapper = getMapper(entityClass, returnType, tableName); + Function mapper = getMapper(entityClass, returnType, tableCoordinates.getTableName()); return getCqlOperations().query(select.build(), (row, rowNum) -> mapper.apply(row)); } @@ -445,17 +461,18 @@ public Stream stream(Query query, Class entityClass) throws DataAccess Assert.notNull(query, "Query must not be null"); Assert.notNull(entityClass, "Entity type must not be null"); - return doStream(query, entityClass, getTableName(entityClass), entityClass); + return doStream(query, entityClass, getTableCoordinates(entityClass), entityClass); } - Stream doStream(Query query, Class entityClass, CqlIdentifier tableName, Class returnType) { + Stream doStream(Query query, Class entityClass, TableCoordinates tableCoordinates, + Class returnType) { StatementBuilder countStatement = getStatementFactory().count(query, - getRequiredPersistentEntity(entityClass), tableName); + getRequiredPersistentEntity(entityClass), tableCoordinates); SimpleStatement statement = countStatement.build(); Long count = getCqlOperations().queryForObject(statement, Long.class); @@ -564,11 +581,12 @@ public boolean exists(Object id, Class entityClass) { Assert.notNull(entityClass, "Entity type must not be null"); CassandraPersistentEntity entity = getRequiredPersistentEntity(entityClass); - StatementBuilder select = getStatementFactory().selectOneById(id, entity, TableCoordinates.of(entity)); return getCqlOperations().queryForResultSet(select.build()).one() != null; } + /* (non-Javadoc) * @see org.springframework.data.cassandra.core.CassandraOperations#exists(org.springframework.data.cassandra.core.query.Query, java.lang.Class) */ @@ -578,13 +596,13 @@ public boolean exists(Query query, Class entityClass) throws DataAccessExcept Assert.notNull(query, "Query must not be null"); Assert.notNull(entityClass, "Entity type must not be null"); - return doExists(query, entityClass, getTableName(entityClass)); + return doExists(query, entityClass, getTableCoordinates(entityClass)); } - boolean doExists(Query query, Class entityClass, CqlIdentifier tableName) { + boolean doExists(Query query, Class entityClass, TableCoordinates tableCoordinates) { StatementBuilder select = getStatementFactory().selectOneById(id, entity, tableName); - Function mapper = getMapper(entityClass, entityClass, tableName); + TableCoordinates tableCoordinates = TableCoordinates.of(entity); + StatementBuilder select = getStatementFactory().select(queryToUse, persistentEntity, tableName); + StatementBuilder count = getStatementFactory().count(query, getRequiredPersistentEntity(entityClass), - tableName); + tableCoordinates); return getReactiveCqlOperations().queryForObject(count.build(), Long.class).switchIfEmpty(Mono.just(0L)); } @@ -504,7 +515,7 @@ public Mono exists(Object id, Class entityClass) { Assert.notNull(entityClass, "Entity type must not be null"); CassandraPersistentEntity entity = getRequiredPersistentEntity(entityClass); - StatementBuilder builder = getStatementFactory().selectOneById(id, entity, TableCoordinates.of(entity)); return getReactiveCqlOperations().queryForRows(builder.build()).hasElements(); } @@ -518,13 +529,13 @@ public Mono exists(Query query, Class entityClass) throws DataAccess Assert.notNull(query, "Query must not be null"); Assert.notNull(entityClass, "Entity type must not be null"); - return doExists(query, entityClass, getTableName(entityClass)); + return doExists(query, entityClass, getTableCoordinates(entityClass)); } - Mono doExists(Query query, Class entityClass, CqlIdentifier tableName) { + Mono doExists(Query query, Class entityClass, TableCoordinates tableCoordinates) { StatementBuilder builder = getStatementFactory().selectOneById(id, getRequiredPersistentEntity(entityClass), - getTableName(entityClass)); + getTableCoordinates(entityClass)); return selectOne(builder.build(), entityClass); } @@ -561,12 +572,12 @@ public Mono> insert(T entity, InsertOptions options) { Assert.notNull(entity, "Entity must not be null"); Assert.notNull(options, "InsertOptions must not be null"); - return doInsert(entity, options, getTableName(entity.getClass())); + return doInsert(entity, options, getTableCoordinates(entity.getClass())); } - Mono> doInsert(T entity, WriteOptions options, CqlIdentifier tableName) { + Mono> doInsert(T entity, WriteOptions options, TableCoordinates tableCoordinates) { - return maybeCallBeforeConvert(entity, tableName).flatMap(entityToInsert -> { + return maybeCallBeforeConvert(entity, tableCoordinates.getTableName()).flatMap(entityToInsert -> { AdaptibleEntity source = this.entityOperations.forEntity(entityToInsert, getConverter().getConversionService()); @@ -575,27 +586,27 @@ Mono> doInsert(T entity, WriteOptions options, CqlIdent T entityToUse = source.isVersionedEntity() ? source.initializeVersionProperty() : entityToInsert; StatementBuilder builder = getStatementFactory().insert(entityToUse, options, persistentEntity, - tableName); + tableCoordinates); if (source.isVersionedEntity()) { builder.apply(Insert::ifNotExists); - return doInsertVersioned(builder.build(), entityToUse, source, tableName); + return doInsertVersioned(builder.build(), entityToUse, source, tableCoordinates); } - return doInsert(builder.build(), entityToUse, tableName); + return doInsert(builder.build(), entityToUse, tableCoordinates); }); } private Mono> doInsertVersioned(SimpleStatement insert, T entity, AdaptibleEntity source, - CqlIdentifier tableName) { + TableCoordinates tableCoordinates) { - return executeSave(entity, tableName, insert, (result, sink) -> { + return executeSave(entity, tableCoordinates, insert, (result, sink) -> { if (!result.wasApplied()) { sink.error(new OptimisticLockingFailureException( String.format("Cannot insert entity %s with version %s into table %s as it already exists", entity, - source.getVersion(), tableName))); + source.getVersion(), tableCoordinates.getTableName()))); return; } @@ -604,8 +615,8 @@ private Mono> doInsertVersioned(SimpleStatement insert, }); } - private Mono> doInsert(SimpleStatement insert, T entity, CqlIdentifier tableName) { - return executeSave(entity, tableName, insert); + private Mono> doInsert(SimpleStatement insert, T entity, TableCoordinates tableCoordinates) { + return executeSave(entity, tableCoordinates, insert); } /* (non-Javadoc) @@ -627,15 +638,15 @@ public Mono> update(T entity, UpdateOptions options) { AdaptibleEntity source = this.entityOperations.forEntity(entity, getConverter().getConversionService()); CassandraPersistentEntity persistentEntity = getRequiredPersistentEntity(entity.getClass()); - CqlIdentifier tableName = persistentEntity.getTableName(); + TableCoordinates tableCoordinates = TableCoordinates.of(persistentEntity); - return maybeCallBeforeConvert(entity, tableName).flatMap(entityToUpdate -> { - return source.isVersionedEntity() ? doUpdateVersioned(entity, options, tableName, persistentEntity) - : doUpdate(entity, options, tableName, persistentEntity); + return maybeCallBeforeConvert(entity, tableCoordinates.getTableName()).flatMap(entityToUpdate -> { + return source.isVersionedEntity() ? doUpdateVersioned(entity, options, tableCoordinates, persistentEntity) + : doUpdate(entity, options, tableCoordinates, persistentEntity); }); } - private Mono> doUpdateVersioned(T entity, UpdateOptions options, CqlIdentifier tableName, + private Mono> doUpdateVersioned(T entity, UpdateOptions options, TableCoordinates tableCoordinates, CassandraPersistentEntity persistentEntity) { AdaptibleEntity source = getEntityOperations().forEntity(entity, getConverter().getConversionService()); @@ -643,16 +654,16 @@ private Mono> doUpdateVersioned(T entity, UpdateOptions Number previousVersion = source.getVersion(); T toSave = source.incrementVersion(); - StatementBuilder builder = getStatementFactory().update(toSave, options, persistentEntity, tableName); + StatementBuilder builder = getStatementFactory().update(toSave, options, persistentEntity, tableCoordinates); SimpleStatement update = source.appendVersionCondition(builder, previousVersion).build(); - return executeSave(toSave, tableName, update, (result, sink) -> { + return executeSave(toSave, tableCoordinates, update, (result, sink) -> { if (!result.wasApplied()) { sink.error(new OptimisticLockingFailureException( String.format("Cannot save entity %s with version %s to table %s. Has it been modified meanwhile?", toSave, - source.getVersion(), tableName))); + source.getVersion(), tableCoordinates.getTableName()))); return; } @@ -661,12 +672,12 @@ private Mono> doUpdateVersioned(T entity, UpdateOptions }); } - private Mono> doUpdate(T entity, UpdateOptions options, CqlIdentifier tableName, + private Mono> doUpdate(T entity, UpdateOptions options, TableCoordinates tableCoordinates, CassandraPersistentEntity persistentEntity) { - StatementBuilder builder = getStatementFactory().update(entity, options, persistentEntity, tableName); + StatementBuilder builder = getStatementFactory().update(entity, options, persistentEntity, tableCoordinates); - return executeSave(entity, tableName, builder.build()); + return executeSave(entity, tableCoordinates, builder.build()); } /* (non-Javadoc) @@ -688,25 +699,25 @@ public Mono delete(Object entity, QueryOptions options) { AdaptibleEntity source = this.entityOperations.forEntity(entity, getConverter().getConversionService()); CassandraPersistentEntity persistentEntity = getRequiredPersistentEntity(entity.getClass()); - CqlIdentifier tableName = persistentEntity.getTableName(); + TableCoordinates tableCoordinates = TableCoordinates.of(persistentEntity); - StatementBuilder builder = getStatementFactory().delete(entity, options, getConverter(), tableName); + StatementBuilder builder = getStatementFactory().delete(entity, options, getConverter(), tableCoordinates); return source.isVersionedEntity() - ? doDeleteVersioned(source.appendVersionCondition(builder).build(), entity, source, tableName) - : doDelete(builder.build(), entity, tableName); + ? doDeleteVersioned(source.appendVersionCondition(builder).build(), entity, source, tableCoordinates) + : doDelete(builder.build(), entity, tableCoordinates); } private Mono doDeleteVersioned(SimpleStatement delete, Object entity, AdaptibleEntity source, - CqlIdentifier tableName) { + TableCoordinates tableCoordinates) { - return executeDelete(entity, tableName, delete, (result, sink) -> { + return executeDelete(entity, tableCoordinates, delete, (result, sink) -> { if (!result.wasApplied()) { sink.error(new OptimisticLockingFailureException( String.format("Cannot delete entity %s with version %s in table %s. Has it been modified meanwhile?", - entity, source.getVersion(), tableName))); + entity, source.getVersion(), tableCoordinates.getTableName()))); return; } @@ -715,8 +726,8 @@ private Mono doDeleteVersioned(SimpleStatement delete, Object entit }); } - private Mono doDelete(SimpleStatement delete, Object entity, CqlIdentifier tableName) { - return executeDelete(entity, tableName, delete, (result, sink) -> sink.next(result)); + private Mono doDelete(SimpleStatement delete, Object entity, TableCoordinates tableCoordinates) { + return executeDelete(entity, tableCoordinates, delete, (result, sink) -> sink.next(result)); } /* (non-Javadoc) @@ -748,14 +759,14 @@ public Mono truncate(Class entityClass) { Assert.notNull(entityClass, "Entity type must not be null"); - CqlIdentifier tableName = getTableName(entityClass); - Truncate truncate = QueryBuilder.truncate(tableName); + TableCoordinates tableCoordinates = getTableCoordinates(entityClass); + Truncate truncate = QueryBuilder.truncate(tableCoordinates.getKeyspaceName().orElse(null), tableCoordinates.getTableName()); SimpleStatement statement = truncate.build(); Mono result = getReactiveCqlOperations().execute(statement) - .doOnSubscribe(it -> maybeEmitEvent(new BeforeDeleteEvent<>(statement, entityClass, tableName))); + .doOnSubscribe(it -> maybeEmitEvent(new BeforeDeleteEvent<>(statement, entityClass, tableCoordinates.getTableName()))); - return result.doOnNext(it -> maybeEmitEvent(new AfterDeleteEvent<>(statement, entityClass, tableName))).then(); + return result.doOnNext(it -> maybeEmitEvent(new AfterDeleteEvent<>(statement, entityClass, tableCoordinates.getTableName()))).then(); } // ------------------------------------------------------------------------- @@ -798,37 +809,37 @@ public ReactiveUpdate update(Class domainType) { // Implementation hooks and utility methods // ------------------------------------------------------------------------- - private Mono> executeSave(T entity, CqlIdentifier tableName, SimpleStatement statement) { - return executeSave(entity, tableName, statement, (writeResult, sink) -> sink.next(writeResult)); + private Mono> executeSave(T entity, TableCoordinates tableCoordinates, SimpleStatement statement) { + return executeSave(entity, tableCoordinates, statement, (writeResult, sink) -> sink.next(writeResult)); } - private Mono> executeSave(T entity, CqlIdentifier tableName, SimpleStatement statement, + private Mono> executeSave(T entity, TableCoordinates tableCoordinates, SimpleStatement statement, BiConsumer, SynchronousSink>> handler) { return Mono.defer(() -> { - maybeEmitEvent(new BeforeSaveEvent<>(entity, tableName, statement)); + maybeEmitEvent(new BeforeSaveEvent<>(entity, tableCoordinates.getTableName(), statement)); - return maybeCallBeforeSave(entity, tableName, statement).flatMapMany(entityToSave -> { + return maybeCallBeforeSave(entity, tableCoordinates.getTableName(), statement).flatMapMany(entityToSave -> { Flux execute = getReactiveCqlOperations().execute(new StatementCallback(statement)); return execute.map(it -> EntityWriteResult.of(it, entityToSave)).handle(handler) // - .doOnNext(it -> maybeEmitEvent(new AfterSaveEvent<>(entityToSave, tableName))); + .doOnNext(it -> maybeEmitEvent(new AfterSaveEvent<>(entityToSave, tableCoordinates.getTableName()))); }).next(); }); } - private Mono executeDelete(Object entity, CqlIdentifier tableName, SimpleStatement statement, + private Mono executeDelete(Object entity, TableCoordinates tableCoordinates, SimpleStatement statement, BiConsumer> handler) { - maybeEmitEvent(new BeforeDeleteEvent<>(statement, entity.getClass(), tableName)); + maybeEmitEvent(new BeforeDeleteEvent<>(statement, entity.getClass(), tableCoordinates.getTableName())); Flux execute = getReactiveCqlOperations().execute(new StatementCallback(statement)); return execute.map(it -> EntityWriteResult.of(it, entity)).handle(handler) // - .doOnSubscribe(it -> maybeEmitEvent(new BeforeSaveEvent<>(entity, tableName, statement))) // - .doOnNext(it -> maybeEmitEvent(new AfterDeleteEvent<>(statement, entity.getClass(), tableName))) // + .doOnSubscribe(it -> maybeEmitEvent(new BeforeSaveEvent<>(entity, tableCoordinates.getTableName(), statement))) // + .doOnNext(it -> maybeEmitEvent(new AfterDeleteEvent<>(statement, entity.getClass(), tableCoordinates.getTableName()))) // .next(); } diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveDeleteOperation.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveDeleteOperation.java index e2ffcaeaf..40154cdd6 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveDeleteOperation.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveDeleteOperation.java @@ -42,6 +42,7 @@ * * @author Mark Paluch * @author John Blum + * @author Tomasz Lelek * @see org.springframework.data.cassandra.core.query.Query * @since 2.1 */ @@ -92,6 +93,23 @@ default DeleteWithQuery inTable(String table) { * @see DeleteWithQuery */ DeleteWithQuery inTable(CqlIdentifier table); + + /** + * Explicitly set the {@link CqlIdentifier keyspace} and the {@link CqlIdentifier name} of the table on which to + * perform the delete. + *

+ * Skip this step to use the default table derived from the {@link Class domain type}. Skip this step to use the + * default session-level keyspace. + * + * @param table {@link CqlIdentifier name} of the table; must not be {@literal null}. + * @param keyspace {@link CqlIdentifier keyspace} of the table; if set to {@literal null}, the default session-level + * keyspace will be used. + * @return new instance of {@link DeleteWithQuery}. + * @throws IllegalArgumentException if {@link CqlIdentifier table} is {@literal null}. + * @see com.datastax.oss.driver.api.core.CqlIdentifier + * @see DeleteWithQuery + */ + DeleteWithQuery inTable(CqlIdentifier keyspace, CqlIdentifier table); } /** diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveDeleteOperationSupport.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveDeleteOperationSupport.java index 487fed8bf..1e136fd45 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveDeleteOperationSupport.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveDeleteOperationSupport.java @@ -17,6 +17,8 @@ import reactor.core.publisher.Mono; +import java.util.Optional; + import org.springframework.data.cassandra.core.query.Query; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -47,7 +49,7 @@ public ReactiveDelete delete(Class domainType) { Assert.notNull(domainType, "DomainType must not be null"); - return new ReactiveDeleteSupport(this.template, domainType, Query.empty(), null); + return new ReactiveDeleteSupport(this.template, domainType, Query.empty(), null, null); } static class ReactiveDeleteSupport implements ReactiveDelete, TerminatingDelete { @@ -58,25 +60,37 @@ static class ReactiveDeleteSupport implements ReactiveDelete, TerminatingDelete private final Query query; + private final @Nullable CqlIdentifier keyspaceName; private final @Nullable CqlIdentifier tableName; public ReactiveDeleteSupport(ReactiveCassandraTemplate template, Class domainType, Query query, - CqlIdentifier tableName) { + @Nullable CqlIdentifier keyspaceName, @Nullable CqlIdentifier tableName) { this.template = template; this.domainType = domainType; this.query = query; + this.keyspaceName = keyspaceName; this.tableName = tableName; } /* (non-Javadoc) - * @see org.springframework.data.cassandra.core.ReactiveDeleteOperation.DeleteWithTable#inTable(org.springframework.data.cassandra.core.cql.CqlIdentifier) + * @see org.springframework.data.cassandra.core.ReactiveDeleteOperation.DeleteWithTable#inTable( com.datastax.oss.driver.api.core.CqlIdentifier) */ @Override public DeleteWithQuery inTable(CqlIdentifier tableName) { Assert.notNull(tableName, "Table name must not be null"); - return new ReactiveDeleteSupport(this.template, this.domainType, this.query, tableName); + return new ReactiveDeleteSupport(this.template, this.domainType, this.query, null, tableName); + } + + /* (non-Javadoc) + * @see org.springframework.data.cassandra.core.ReactiveDeleteOperation.DeleteWithTable#inTable(com.datastax.oss.driver.api.core.CqlIdentifier,com.datastax.oss.driver.api.core.CqlIdentifier) + */ + @Override + public DeleteWithQuery inTable(CqlIdentifier keyspaceName, CqlIdentifier tableName) { + Assert.notNull(tableName, "Table name must not be null"); + + return new ReactiveDeleteSupport(this.template, this.domainType, this.query, keyspaceName, tableName); } /* (non-Javadoc) @@ -87,18 +101,28 @@ public TerminatingDelete matching(Query query) { Assert.notNull(query, "Query must not be null"); - return new ReactiveDeleteSupport(this.template, this.domainType, query, this.tableName); + return new ReactiveDeleteSupport(this.template, this.domainType, query, this.keyspaceName, this.tableName); } /* (non-Javadoc) * @see org.springframework.data.cassandra.core.ReactiveDeleteOperation.TerminatingDelete#all() */ public Mono all() { - return this.template.doDelete(this.query, this.domainType, getTableName()); + return this.template.doDelete(this.query, this.domainType, getTableCoordinates()); } private CqlIdentifier getTableName() { return this.tableName != null ? this.tableName : this.template.getTableName(this.domainType); } + + private Optional getKeyspaceName() { + return this.keyspaceName != null ? Optional.of(this.keyspaceName) + : this.template.getKeyspaceName(this.domainType); + } + + private TableCoordinates getTableCoordinates() { + return TableCoordinates.of(getKeyspaceName(), getTableName()); + } + } } diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveInsertOperation.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveInsertOperation.java index 567756ce8..41a4a7f4a 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveInsertOperation.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveInsertOperation.java @@ -39,6 +39,7 @@ * * @author Mark Paluch * @author John Blum + * @author Tomasz Lelek * @since 2.1 */ public interface ReactiveInsertOperation { @@ -90,6 +91,24 @@ default InsertWithOptions inTable(String table) { */ InsertWithOptions inTable(CqlIdentifier table); + + /** + * Explicitly set the {@link CqlIdentifier keyspace} and the {@link CqlIdentifier name} of the table on which to + * perform the insert. + *

+ * Skip this step to use the default table derived from the {@link Class domain type}. Skip this step to use the + * default session-level keyspace. + * + * @param table {@link CqlIdentifier name} of the table; must not be {@literal null}. + * @param keyspace {@link CqlIdentifier keyspace} of the table; if set to {@literal null}, the default session-level + * keyspace will be used. + * @return new instance of {@link InsertWithOptions}. + * @throws IllegalArgumentException if {@link CqlIdentifier table} is {@literal null}. + * @see com.datastax.oss.driver.api.core.CqlIdentifier + * @see InsertWithOptions + */ + InsertWithOptions inTable(CqlIdentifier keyspace, CqlIdentifier table); + } /** diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveInsertOperationSupport.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveInsertOperationSupport.java index 1a134f1e6..3dd57c66a 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveInsertOperationSupport.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveInsertOperationSupport.java @@ -17,6 +17,8 @@ import reactor.core.publisher.Mono; +import java.util.Optional; + import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -26,6 +28,7 @@ * Implementation of {@link ReactiveInsertOperation}. * * @author Mark Paluch + * @author Tomasz Lelek * @since 2.1 */ class ReactiveInsertOperationSupport implements ReactiveInsertOperation { @@ -44,7 +47,7 @@ public ReactiveInsert insert(Class domainType) { Assert.notNull(domainType, "DomainType must not be null"); - return new ReactiveInsertSupport<>(this.template, domainType, InsertOptions.empty(), null); + return new ReactiveInsertSupport<>(this.template, domainType, InsertOptions.empty(), null, null); } static class ReactiveInsertSupport implements ReactiveInsert { @@ -55,25 +58,37 @@ static class ReactiveInsertSupport implements ReactiveInsert { private final InsertOptions insertOptions; + private final @Nullable CqlIdentifier keyspaceName; private final @Nullable CqlIdentifier tableName; public ReactiveInsertSupport(ReactiveCassandraTemplate template, Class domainType, InsertOptions insertOptions, - CqlIdentifier tableName) { + @Nullable CqlIdentifier keyspaceName, @Nullable CqlIdentifier tableName) { this.template = template; this.domainType = domainType; this.insertOptions = insertOptions; + this.keyspaceName = keyspaceName; this.tableName = tableName; } /* (non-Javadoc) - * @see org.springframework.data.cassandra.core.ReactiveInsertOperation.InsertWithTable#inTable(org.springframework.data.cassandra.core.cql.CqlIdentifier) + * @see org.springframework.data.cassandra.core.ReactiveInsertOperation.InsertWithTable#inTable(com.datastax.oss.driver.api.core.CqlIdentifier) */ @Override public InsertWithOptions inTable(CqlIdentifier tableName) { Assert.notNull(tableName, "Table name must not be null"); - return new ReactiveInsertSupport<>(this.template, this.domainType, this.insertOptions, tableName); + return new ReactiveInsertSupport<>(this.template, this.domainType, this.insertOptions, null, tableName); + } + + /* (non-Javadoc) + * @see org.springframework.data.cassandra.core.ReactiveInsertOperation.InsertWithTable#inTable(com.datastax.oss.driver.api.core.CqlIdentifier. com.datastax.oss.driver.api.core.CqlIdentifier) + */ + @Override + public InsertWithOptions inTable(CqlIdentifier keyspaceName, CqlIdentifier tableName) { + Assert.notNull(tableName, "Table name must not be null"); + + return new ReactiveInsertSupport<>(this.template, this.domainType, this.insertOptions, keyspaceName, tableName); } /* (non-Javadoc) @@ -84,7 +99,8 @@ public TerminatingInsert withOptions(InsertOptions insertOptions) { Assert.notNull(insertOptions, "InsertOptions must not be null"); - return new ReactiveInsertSupport<>(this.template, this.domainType, insertOptions, this.tableName); + return new ReactiveInsertSupport<>(this.template, this.domainType, insertOptions, this.keyspaceName, + this.tableName); } /* (non-Javadoc) @@ -95,11 +111,20 @@ public Mono> one(T object) { Assert.notNull(object, "Object must not be null"); - return this.template.doInsert(object, this.insertOptions, getTableName()); + return this.template.doInsert(object, this.insertOptions, getTableCoordinates()); } private CqlIdentifier getTableName() { return this.tableName != null ? this.tableName : this.template.getTableName(this.domainType); } + + private Optional getKeyspaceName() { + return this.keyspaceName != null ? Optional.of(this.keyspaceName) + : this.template.getKeyspaceName(this.domainType); + } + + private TableCoordinates getTableCoordinates() { + return TableCoordinates.of(getKeyspaceName(), getTableName()); + } } } diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveSelectOperation.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveSelectOperation.java index f9ce706fe..5d6dd677a 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveSelectOperation.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveSelectOperation.java @@ -15,6 +15,8 @@ */ package org.springframework.data.cassandra.core; +import org.springframework.data.cassandra.core.mapping.Embedded; +import org.springframework.lang.Nullable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -48,6 +50,7 @@ * * @author Mark Paluch * @author John Blum + * @author Tomasz Lelek * @see org.springframework.data.cassandra.core.query.Query * @since 2.1 */ @@ -100,6 +103,23 @@ default SelectWithProjection inTable(String table) { */ SelectWithProjection inTable(CqlIdentifier table); + /** + * Explicitly set the {@link CqlIdentifier keyspace} and the {@link CqlIdentifier name} of the table on which to + * perform the query. + *

+ * Skip this step to use the default table derived from the {@link Class domain type}. Skip this step to use the + * default session-level keyspace. + * + * @param table {@link CqlIdentifier name} of the table; must not be {@literal null}. + * @param keyspace {@link CqlIdentifier keyspace} of the table; if set to {@literal null}, the default session-level + * keyspace will be used. + * @return new instance of {@link SelectWithProjection}. + * @throws IllegalArgumentException if {@link CqlIdentifier table} is {@literal null}. + * @see com.datastax.oss.driver.api.core.CqlIdentifier + * @see SelectWithProjection + */ + SelectWithProjection inTable(@Nullable CqlIdentifier keyspace, CqlIdentifier table); + } /** diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveSelectOperationSupport.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveSelectOperationSupport.java index 26d502a4d..8ceee2719 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveSelectOperationSupport.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveSelectOperationSupport.java @@ -18,6 +18,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import java.util.Optional; + import org.springframework.dao.IncorrectResultSizeDataAccessException; import org.springframework.data.cassandra.core.query.Query; import org.springframework.lang.Nullable; @@ -29,6 +31,7 @@ * Implementation of {@link ReactiveSelectOperation}. * * @author Mark Paluch + * @author Tomasz Lelek * @see org.springframework.data.cassandra.core.ReactiveSelectOperation * @see org.springframework.data.cassandra.core.query.Query * @since 2.1 @@ -49,7 +52,7 @@ public ReactiveSelect query(Class domainType) { Assert.notNull(domainType, "DomainType must not be null"); - return new ReactiveSelectSupport<>(this.template, domainType, domainType, Query.empty(), null); + return new ReactiveSelectSupport<>(this.template, domainType, domainType, Query.empty(), null, null); } static class ReactiveSelectSupport implements ReactiveSelect { @@ -62,26 +65,40 @@ static class ReactiveSelectSupport implements ReactiveSelect { private final Query query; + private final @Nullable CqlIdentifier keyspaceName; private final @Nullable CqlIdentifier tableName; public ReactiveSelectSupport(ReactiveCassandraTemplate template, Class domainType, Class returnType, - Query query, CqlIdentifier tableName) { + Query query, @Nullable CqlIdentifier keyspaceName, @Nullable CqlIdentifier tableName) { this.template = template; this.domainType = domainType; this.returnType = returnType; this.query = query; this.tableName = tableName; + this.keyspaceName = keyspaceName; } /* (non-Javadoc) - * @see org.springframework.data.cassandra.core.ReactiveSelectOperation.SelectWithTable#inTable(org.springframework.data.cassandra.core.cql.CqlIdentifier) + * @see org.springframework.data.cassandra.core.ReactiveSelectOperation.SelectWithTable#inTable(com.datastax.oss.driver.api.core.CqlIdentifier) */ @Override public SelectWithProjection inTable(CqlIdentifier tableName) { Assert.notNull(tableName, "Table name must not be null"); - return new ReactiveSelectSupport<>(this.template, this.domainType, this.returnType, this.query, tableName); + return new ReactiveSelectSupport<>(this.template, this.domainType, this.returnType, this.query, null, tableName); + } + + /* (non-Javadoc) + * @see org.springframework.data.cassandra.core.ReactiveSelectOperation.SelectWithTable#inTable(com.datastax.oss.driver.api.core.CqlIdentifier, com.datastax.oss.driver.api.core.CqlIdentifier) + */ + @Override + public SelectWithProjection inTable(@Nullable CqlIdentifier keyspaceName, CqlIdentifier tableName) { + + Assert.notNull(tableName, "Table name must not be null"); + + return new ReactiveSelectSupport<>(this.template, this.domainType, this.returnType, this.query, keyspaceName, + tableName); } /* (non-Javadoc) @@ -92,7 +109,8 @@ public SelectWithQuery as(Class returnType) { Assert.notNull(returnType, "ReturnType must not be null"); - return new ReactiveSelectSupport<>(this.template, this.domainType, returnType, this.query, this.tableName); + return new ReactiveSelectSupport<>(this.template, this.domainType, returnType, this.query, this.keyspaceName, + this.tableName); } /* (non-Javadoc) @@ -103,7 +121,8 @@ public TerminatingSelect matching(Query query) { Assert.notNull(query, "Query must not be null"); - return new ReactiveSelectSupport<>(this.template, this.domainType, this.returnType, query, this.tableName); + return new ReactiveSelectSupport<>(this.template, this.domainType, this.returnType, query, this.keyspaceName, + this.tableName); } /* (non-Javadoc) @@ -111,7 +130,7 @@ public TerminatingSelect matching(Query query) { */ @Override public Mono count() { - return this.template.doCount(this.query, this.domainType, getTableName()); + return this.template.doCount(this.query, this.domainType, getTableCoordinates()); } /* (non-Javadoc) @@ -119,7 +138,7 @@ public Mono count() { */ @Override public Mono exists() { - return this.template.doExists(this.query, this.domainType, getTableName()); + return this.template.doExists(this.query, this.domainType, getTableCoordinates()); } /* (non-Javadoc) @@ -127,7 +146,8 @@ public Mono exists() { */ @Override public Mono first() { - return this.template.doSelect(this.query.limit(1), this.domainType, getTableName(), this.returnType).next(); + return this.template + .doSelect(this.query.limit(1), this.domainType, getTableCoordinates(), this.returnType).next(); } /* (non-Javadoc) @@ -136,7 +156,8 @@ public Mono first() { @Override public Mono one() { - Flux result = this.template.doSelect(this.query.limit(2), this.domainType, getTableName(), this.returnType); + Flux result = this.template.doSelect(this.query.limit(2), this.domainType, getTableCoordinates(), + this.returnType); return result.collectList() // .flatMap(it -> { @@ -159,11 +180,20 @@ public Mono one() { */ @Override public Flux all() { - return this.template.doSelect(this.query, this.domainType, getTableName(), this.returnType); + return this.template.doSelect(this.query, this.domainType, getTableCoordinates(), this.returnType); + } + + private Optional getKeyspaceName() { + return this.keyspaceName != null ? Optional.of(this.keyspaceName) + : this.template.getKeyspaceName(this.domainType); } private CqlIdentifier getTableName() { return this.tableName != null ? this.tableName : this.template.getTableName(this.domainType); } + + private TableCoordinates getTableCoordinates() { + return TableCoordinates.of(getKeyspaceName(), getTableName()); + } } } diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveUpdateOperation.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveUpdateOperation.java index 464e31045..7afa7d0bf 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveUpdateOperation.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveUpdateOperation.java @@ -19,6 +19,7 @@ import org.springframework.data.cassandra.core.query.Query; import org.springframework.data.cassandra.core.query.Update; +import org.springframework.lang.Nullable; import org.springframework.util.Assert; import com.datastax.oss.driver.api.core.CqlIdentifier; @@ -46,6 +47,7 @@ * * @author Mark Paluch * @author John Blum + * @author Tomasz Lelek * @see org.springframework.data.cassandra.core.query.Query * @see org.springframework.data.cassandra.core.query.Update * @since 2.1 @@ -93,11 +95,28 @@ default UpdateWithQuery inTable(String table) { * @param table {@link CqlIdentifier name} of the table; must not be {@literal null}. * @return new instance of {@link UpdateWithQuery}. * @throws IllegalArgumentException if {@link CqlIdentifier table} is {@literal null}. - * @see com.datastax.oss.driver.api.core.CqlIdentifier + * @see CqlIdentifier * @see UpdateWithQuery */ UpdateWithQuery inTable(CqlIdentifier table); + /** + * Explicitly set the {@link CqlIdentifier keyspace} and the {@link CqlIdentifier name} of the table on which to + * perform the update. + *

+ * Skip this step to use the default table derived from the {@link Class domain type}. Skip this step to use the + * default session-level keyspace. + * + * @param table {@link CqlIdentifier name} of the table; must not be {@literal null}. + * @param keyspace {@link CqlIdentifier keyspace} of the table; if set to {@literal null}, the default session-level + * keyspace will be used. + * @return new instance of {@link UpdateWithQuery}. + * @throws IllegalArgumentException if {@link CqlIdentifier table} is {@literal null}. + * @see CqlIdentifier + * @see UpdateWithQuery + */ + UpdateWithQuery inTable(@Nullable CqlIdentifier keyspace, CqlIdentifier table); + } /** diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveUpdateOperationSupport.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveUpdateOperationSupport.java index 0ff22afed..28c529252 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveUpdateOperationSupport.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/ReactiveUpdateOperationSupport.java @@ -17,6 +17,8 @@ import reactor.core.publisher.Mono; +import java.util.Optional; + import org.springframework.data.cassandra.core.query.Query; import org.springframework.data.cassandra.core.query.Update; import org.springframework.lang.Nullable; @@ -28,6 +30,7 @@ * Implementation of {@link ReactiveUpdateOperation}. * * @author Mark Paluch + * @author Tomasz Lelek * @see org.springframework.data.cassandra.core.ReactiveUpdateOperation * @see org.springframework.data.cassandra.core.query.Query * @see org.springframework.data.cassandra.core.query.Update @@ -49,7 +52,7 @@ public ReactiveUpdate update(Class domainType) { Assert.notNull(domainType, "DomainType must not be null"); - return new ReactiveUpdateSupport(this.template, domainType, Query.empty(), null); + return new ReactiveUpdateSupport(this.template, domainType, Query.empty(), null, null); } static class ReactiveUpdateSupport implements ReactiveUpdate, TerminatingUpdate { @@ -60,25 +63,37 @@ static class ReactiveUpdateSupport implements ReactiveUpdate, TerminatingUpdate private final Query query; + private final @Nullable CqlIdentifier keyspaceName; private final @Nullable CqlIdentifier tableName; public ReactiveUpdateSupport(ReactiveCassandraTemplate template, Class domainType, Query query, - CqlIdentifier tableName) { + @Nullable CqlIdentifier keyspaceName, @Nullable CqlIdentifier tableName) { this.template = template; this.domainType = domainType; this.query = query; + this.keyspaceName = keyspaceName; this.tableName = tableName; } /* (non-Javadoc) - * @see org.springframework.data.cassandra.core.ReactiveUpdateOperation.UpdateWithTable#inTable(org.springframework.data.cassandra.core.cql.CqlIdentifier) + * @see org.springframework.data.cassandra.core.ReactiveUpdateOperation.UpdateWithTable#inTable(CqlIdentifier) */ @Override public UpdateWithQuery inTable(CqlIdentifier tableName) { Assert.notNull(tableName, "Table name must not be null"); - return new ReactiveUpdateSupport(this.template, this.domainType, this.query, tableName); + return new ReactiveUpdateSupport(this.template, this.domainType, this.query, null, tableName); + } + + /* (non-Javadoc) + * @see org.springframework.data.cassandra.core.ReactiveUpdateOperation.UpdateWithTable#inTable(com.datastax.oss.driver.api.core.CqlIdentifier, com.datastax.oss.driver.api.core.CqlIdentifier) + */ + @Override + public UpdateWithQuery inTable(@Nullable CqlIdentifier keyspaceName, CqlIdentifier tableName) { + Assert.notNull(tableName, "Table name must not be null"); + + return new ReactiveUpdateSupport(this.template, this.domainType, this.query, keyspaceName, tableName); } /* (non-Javadoc) @@ -89,7 +104,7 @@ public TerminatingUpdate matching(Query query) { Assert.notNull(query, "Query must not be null"); - return new ReactiveUpdateSupport(this.template, this.domainType, query, this.tableName); + return new ReactiveUpdateSupport(this.template, this.domainType, query, this.keyspaceName, this.tableName); } /* (non-Javadoc) @@ -100,11 +115,20 @@ public Mono apply(Update update) { Assert.notNull(update, "Update must not be null"); - return this.template.doUpdate(this.query, update, this.domainType, getTableName()); + return this.template.doUpdate(this.query, update, this.domainType, getTableCoordinates()); } private CqlIdentifier getTableName() { return this.tableName != null ? this.tableName : this.template.getTableName(this.domainType); } + + private Optional getKeyspaceName() { + return this.keyspaceName != null ? Optional.of(this.keyspaceName) + : this.template.getKeyspaceName(this.domainType); + } + + private TableCoordinates getTableCoordinates() { + return TableCoordinates.of(getKeyspaceName(), getTableName()); + } } } diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/StatementFactory.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/StatementFactory.java index e7b1744d9..e6369bcaf 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/StatementFactory.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/StatementFactory.java @@ -91,6 +91,7 @@ * * @author Mark Paluch * @author John Blum + * @author Tomasz Lelek * @see com.datastax.oss.driver.api.core.cql.Statement * @see org.springframework.data.cassandra.core.query.Query * @see org.springframework.data.cassandra.core.query.Update @@ -180,7 +181,8 @@ public StatementBuilder count(Query query, CassandraPersistentEntity * * @param query user-defined count {@link Query} to execute; must not be {@literal null}. * @param entity {@link CassandraPersistentEntity entity} to count; must not be {@literal null}. - * @param tableName must not be {@literal null}. + * @param tableCoordinates must not be {@literal null}. * @return the select builder. * @since 2.1 */ - public StatementBuilder count(Query query, CassandraPersistentEntity entity, + TableCoordinates tableCoordinates) { Filter filter = getQueryMapper().getMappedObject(query, entity); List selectors = Collections.singletonList(FunctionCall.from("COUNT", 1L)); - return createSelect(query, entity, filter, selectors, tableName); + return createSelect(query, entity, filter, selectors, tableCoordinates); } /** @@ -207,17 +210,18 @@ public StatementBuilder selectOneById(Object id, CassandraPersistentEntity persistentEntity, - CqlIdentifier tableName) { + TableCoordinates tableCoordinates) { Where where = new Where(); cassandraConverter.write(id, where, persistentEntity); - return StatementBuilder.of(QueryBuilder.selectFrom(tableName).all().limit(1)) + return StatementBuilder.of(QueryBuilder + .selectFrom(tableCoordinates.getKeyspaceName().orElse(null), tableCoordinates.getTableName()).all().limit(1)) .bind((statement, factory) -> statement.where(toRelations(where, factory))); } @@ -233,7 +237,8 @@ public StatementBuilder select(Query query, CassandraPersistentEntity * * @param query must not be {@literal null}. * @param persistentEntity must not be {@literal null}. - * @param tableName must not be {@literal null}. + * @param tableCoordinates must not be {@literal null}. * @return the select builder. * @since 2.1 */ public StatementBuilder select(Query query, CassandraPersistentEntity List selectors = getQueryMapper().getMappedSelectors(query.getColumns(), persistentEntity); - return createSelect(query, persistentEntity, filter, selectors, tableName); + return createSelect(query, persistentEntity, filter, selectors, tableCoordinates); } /** @@ -274,25 +279,25 @@ public StatementBuilder insert(Object objectToInsert, WriteOption CassandraPersistentEntity persistentEntity = cassandraConverter.getMappingContext() .getRequiredPersistentEntity(objectToInsert.getClass()); - return insert(objectToInsert, options, persistentEntity, persistentEntity.getTableName()); + return insert(objectToInsert, options, persistentEntity, + TableCoordinates.of(persistentEntity)); } /** * Creates a Query Object for an insert. * - * @param tableName the table name, must not be empty and not {@literal null}. + * @param tableCoordinates the table coordinates, must not be {@literal null}. * @param objectToInsert the object to save, must not be {@literal null}. * @param options optional {@link WriteOptions} to apply to the {@link Insert} statement, may be {@literal null}. * @param persistentEntity the {@link CassandraPersistentEntity} to write insert values. * @return the select builder. */ StatementBuilder insert(Object objectToInsert, WriteOptions options, - CassandraPersistentEntity persistentEntity, CqlIdentifier tableName) { + CassandraPersistentEntity persistentEntity, TableCoordinates tableCoordinates) { - Assert.notNull(tableName, "TableName must not be null"); + Assert.notNull(tableCoordinates.getTableName(), "TableName must not be null"); Assert.notNull(objectToInsert, "Object to insert must not be null"); Assert.notNull(persistentEntity, "CassandraPersistentEntity must not be null"); - Assert.notNull(tableName, "Table name must not be null"); boolean insertNulls; if (options instanceof InsertOptions) { @@ -307,7 +312,9 @@ StatementBuilder insert(Object objectToInsert, WriteOptions optio cassandraConverter.write(objectToInsert, object, persistentEntity); StatementBuilder builder = StatementBuilder - .of(QueryBuilder.insertInto(tableName).valuesByIds(Collections.emptyMap())).bind((statement, factory) -> { + .of(QueryBuilder.insertInto(tableCoordinates.getKeyspaceName().orElse(null), tableCoordinates.getTableName()) + .valuesByIds(Collections.emptyMap())) + .bind((statement, factory) -> { Map values = createTerms(insertNulls, object, factory); @@ -348,7 +355,8 @@ public StatementBuilder Assert.notNull(update, "Update must not be null"); Assert.notNull(persistentEntity, "CassandraPersistentEntity must not be null"); - return update(query, update, persistentEntity, persistentEntity.getTableName()); + return update(query, update, persistentEntity, + TableCoordinates.of(persistentEntity)); } /** @@ -357,23 +365,25 @@ public StatementBuilder * @param query must not be {@literal null}. * @param update must not be {@literal null}. * @param persistentEntity must not be {@literal null}. - * @param tableName must not be {@literal null}. + * @param tableCoordinates must not be {@literal null}. * @return the update builder. * @since 2.1 */ StatementBuilder update(Query query, Update update, - CassandraPersistentEntity persistentEntity, CqlIdentifier tableName) { + CassandraPersistentEntity persistentEntity, TableCoordinates tableCoordinates) { Assert.notNull(query, "Query must not be null"); Assert.notNull(update, "Update must not be null"); Assert.notNull(persistentEntity, "CassandraPersistentEntity must not be null"); - Assert.notNull(tableName, "Table name must not be null"); + Assert.notNull(tableCoordinates, "TableCoordinates must not be null"); + Assert.notNull(tableCoordinates.getTableName(), "Table name must not be null"); Filter filter = getQueryMapper().getMappedObject(query, persistentEntity); Update mappedUpdate = getUpdateMapper().getMappedObject(update, persistentEntity); - StatementBuilder builder = update(tableName, mappedUpdate, + StatementBuilder builder = update(tableCoordinates, + mappedUpdate, filter); query.getQueryOptions().filter(UpdateOptions.class::isInstance).map(UpdateOptions.class::cast) @@ -407,7 +417,8 @@ public StatementBuilder CassandraPersistentEntity persistentEntity = cassandraConverter.getMappingContext() .getRequiredPersistentEntity(objectToUpdate.getClass()); - return update(objectToUpdate, options, persistentEntity, persistentEntity.getTableName()); + return update(objectToUpdate, options, persistentEntity, + TableCoordinates.of(persistentEntity)); } /** @@ -417,13 +428,14 @@ public StatementBuilder * @param objectToUpdate must not be {@literal null}. * @param options must not be {@literal null}. * @param entity must not be {@literal null}. - * @param tableName must not be {@literal null}. + * @param tableCoordinates must not be {@literal null}. * @return the update builder. */ StatementBuilder update(Object objectToUpdate, - WriteOptions options, CassandraPersistentEntity entity, CqlIdentifier tableName) { + WriteOptions options, CassandraPersistentEntity entity, TableCoordinates tableCoordinates) { - Assert.notNull(tableName, "TableName must not be null"); + Assert.notNull(tableCoordinates, "TableCoordinates must not be null"); + Assert.notNull(tableCoordinates.getTableName(), "TableName must not be null"); Assert.notNull(objectToUpdate, "Object to builder must not be null"); Assert.notNull(options, "WriteOptions must not be null"); Assert.notNull(entity, "CassandraPersistentEntity must not be null"); @@ -436,7 +448,8 @@ StatementBuilder update( where.forEach((cqlIdentifier, o) -> object.remove(cqlIdentifier)); StatementBuilder builder = StatementBuilder - .of(QueryBuilder.update(tableName).set().where()) + .of(QueryBuilder.update(tableCoordinates.getKeyspaceName().orElse(null), tableCoordinates.getTableName()).set() + .where()) .bind((statement, factory) -> ((UpdateWithAssignments) statement).set(toAssignments(object, factory)) .where(toRelations(where, factory))) .apply(update -> addWriteOptions(update, options)); @@ -482,7 +495,7 @@ public StatementBuilder delete(Query query, CassandraPersistentEntity Assert.notNull(query, "Query must not be null"); Assert.notNull(persistentEntity, "CassandraPersistentEntity must not be null"); - return delete(query, persistentEntity, persistentEntity.getTableName()); + return delete(query, persistentEntity, TableCoordinates.of(persistentEntity)); } /** @@ -490,21 +503,22 @@ public StatementBuilder delete(Query query, CassandraPersistentEntity * * @param query must not be {@literal null}. * @param persistentEntity must not be {@literal null}. - * @param tableName must not be {@literal null}. + * @param tableCoordinates must not be {@literal null}. * @return the delete builder. * @see 2.1 */ public StatementBuilder delete(Query query, CassandraPersistentEntity persistentEntity, - CqlIdentifier tableName) { + TableCoordinates tableCoordinates) { Assert.notNull(query, "Query must not be null"); Assert.notNull(persistentEntity, "CassandraPersistentEntity must not be null"); - Assert.notNull(tableName, "Table name must not be null"); + Assert.notNull(tableCoordinates, "TableCoordinates name must not be null"); + Assert.notNull(tableCoordinates.getTableName(), "Table name must not be null"); Filter filter = getQueryMapper().getMappedObject(query, persistentEntity); List columnNames = getQueryMapper().getMappedColumnNames(query.getColumns(), persistentEntity); - StatementBuilder builder = delete(columnNames, tableName, filter); + StatementBuilder builder = delete(columnNames, tableCoordinates, filter); query.getQueryOptions().filter(DeleteOptions.class::isInstance).map(DeleteOptions.class::cast) .map(DeleteOptions::getIfCondition) @@ -526,20 +540,21 @@ public StatementBuilder delete(Query query, CassandraPersistentEntity * @param entity must not be {@literal null}. * @param options must not be {@literal null}. * @param entityWriter must not be {@literal null}. - * @param tableName must not be {@literal null}. + * @param tableCoordinates must not be {@literal null}. * @return the delete builder. */ StatementBuilder delete(Object entity, QueryOptions options, EntityWriter entityWriter, - CqlIdentifier tableName) { - - Assert.notNull(tableName, "TableName must not be null"); + TableCoordinates tableCoordinates) { + Assert.notNull(tableCoordinates, "TableCoordinates must not be null"); + Assert.notNull(tableCoordinates.getTableName(), "TableName must not be null"); Assert.notNull(entity, "Object to builder must not be null"); Assert.notNull(entityWriter, "EntityWriter must not be null"); Where where = new Where(); entityWriter.write(entity, where); - StatementBuilder builder = StatementBuilder.of(QueryBuilder.deleteFrom(tableName).where()) + StatementBuilder builder = StatementBuilder.of(QueryBuilder + .deleteFrom(tableCoordinates.getKeyspaceName().orElse(null), tableCoordinates.getTableName()).where()) .bind((statement, factory) -> statement.where(toRelations(where, factory))); Optional.of(options).filter(WriteOptions.class::isInstance).map(WriteOptions.class::cast) @@ -592,12 +607,12 @@ Columns computeColumnsForProjection(Columns columns, PersistentEntity pers } private StatementBuilder select = createSelectAndOrder(selectors, tableName, filter, sort); + StatementBuilder createSelect(Query query, CassandraPersistentEn return select; } - private static StatementBuilder createSelectAndOrder(List selectors, + TableCoordinates from, Filter filter, Sort sort) { Select select; if (selectors.isEmpty()) { - select = QueryBuilder.selectFrom(from).all(); + select = QueryBuilder.selectFrom(from.getKeyspaceName().orElse(null), from.getTableName()).all(); } else { List mappedSelectors = selectors.stream() @@ -629,7 +645,8 @@ private static StatementBuilder builder = StatementBuilder.of(select); @@ -674,10 +691,12 @@ private static com.datastax.oss.driver.api.querybuilder.select.Selector getSelec .column(CqlIdentifier.fromInternal(selector.getExpression())); } - private static StatementBuilder update(CqlIdentifier table, + private static StatementBuilder update( + TableCoordinates tableCoordinates, Update mappedUpdate, Filter filter) { - UpdateStart updateStart = QueryBuilder.update(table); + UpdateStart updateStart = QueryBuilder.update(tableCoordinates.getKeyspaceName().orElse(null), + tableCoordinates.getTableName()); return StatementBuilder.of((com.datastax.oss.driver.api.querybuilder.update.Update) updateStart) .bind((statement, factory) -> { @@ -827,9 +846,9 @@ private static Assignment getAssignment(AddToMapOp updateOp, TermFactory termFac return Assignment.append(updateOp.toCqlIdentifier(), termFactory.create(updateOp.getValue())); } - private StatementBuilder delete(List columnNames, CqlIdentifier from, Filter filter) { + private StatementBuilder delete(List columnNames, TableCoordinates from, Filter filter) { - DeleteSelection select = QueryBuilder.deleteFrom(from); + DeleteSelection select = QueryBuilder.deleteFrom(from.getKeyspaceName().orElse(null), from.getTableName()); for (CqlIdentifier columnName : columnNames) { select = select.column(columnName); diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/TableCoordinates.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/TableCoordinates.java new file mode 100644 index 000000000..ff5c70697 --- /dev/null +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/TableCoordinates.java @@ -0,0 +1,61 @@ +/* + * Copyright 2017-2020 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.cassandra.core; + +import java.util.Optional; + +import com.datastax.oss.driver.api.core.CqlIdentifier; +import org.springframework.data.cassandra.core.mapping.BasicCassandraPersistentEntity; +import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity; + +/** + * The coordinates that are defining optional keyspace and non-optional table. They will be used when constructing and + * executing CQL queries. + * + * @author Tomasz Lelek + */ +public class TableCoordinates { + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") private final Optional keyspaceName; + private final CqlIdentifier tableName; + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + private TableCoordinates(Optional keyspaceName, CqlIdentifier tableName) { + this.keyspaceName = keyspaceName; + this.tableName = tableName; + } + + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + public static TableCoordinates of(Optional keyspaceName, CqlIdentifier tableName) { + return new TableCoordinates(keyspaceName, tableName); + } + + public static TableCoordinates of(BasicCassandraPersistentEntity persistentEntity) { + return new TableCoordinates(persistentEntity.getKeyspaceName(), persistentEntity.getTableName()); + } + + public static TableCoordinates of(CassandraPersistentEntity persistentEntity) { + return new TableCoordinates(persistentEntity.getKeyspaceName(), persistentEntity.getTableName()); + } + + public Optional getKeyspaceName() { + return keyspaceName; + } + + public CqlIdentifier getTableName() { + return tableName; + } + +} diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/BasicCassandraPersistentEntity.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/BasicCassandraPersistentEntity.java index 7287237bb..3cb5b6058 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/BasicCassandraPersistentEntity.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/BasicCassandraPersistentEntity.java @@ -43,6 +43,7 @@ * @author Matthew T. Adams * @author John Blum * @author Mark Paluch + * @author Tomasz Lelek */ public class BasicCassandraPersistentEntity extends BasicPersistentEntity implements CassandraPersistentEntity, ApplicationContextAware { @@ -59,6 +60,8 @@ public class BasicCassandraPersistentEntity extends BasicPersistentEntity keyspaceName = Optional.empty(); + /** * Create a new {@link BasicCassandraPersistentEntity} given {@link TypeInformation}. * @@ -212,6 +215,7 @@ public void setNamingStrategy(NamingStrategy namingStrategy) { Assert.notNull(namingStrategy, "NamingStrategy must not be null"); this.namingStrategy = namingStrategy; + setKeyspaceName(namingStrategy.getKeyspaceName(this)); } NamingStrategy getNamingStrategy() { @@ -226,6 +230,16 @@ public CqlIdentifier getTableName() { return Optional.ofNullable(this.tableName).orElseGet(this::determineTableName); } + @Override + public Optional getKeyspaceName() { + return keyspaceName; + } + + @Override + public void setKeyspaceName(Optional keyspaceName) { + this.keyspaceName = keyspaceName; + } + /** * @param verifier The verifier to set. */ diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/CassandraMappingContext.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/CassandraMappingContext.java index ff7a7d8b6..dcd7bd388 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/CassandraMappingContext.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/CassandraMappingContext.java @@ -56,6 +56,7 @@ * @author John Blum * @author Jens Schauder * @author Vagif Zeynalov + * @author Toamsz Lelek */ public class CassandraMappingContext extends AbstractMappingContext, CassandraPersistentProperty> @@ -72,7 +73,7 @@ public class CassandraMappingContext private Mapping mapping = new Mapping(); private NamingStrategy namingStrategy = NamingStrategy.INSTANCE; - + private @Deprecated @Nullable UserTypeResolver userTypeResolver; private @Deprecated CodecRegistry codecRegistry = CodecRegistry.DEFAULT; diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/CassandraPersistentEntity.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/CassandraPersistentEntity.java index ada924be6..de3dc3d08 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/CassandraPersistentEntity.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/CassandraPersistentEntity.java @@ -15,6 +15,8 @@ */ package org.springframework.data.cassandra.core.mapping; +import java.util.Optional; + import org.springframework.data.mapping.PersistentEntity; import org.springframework.util.Assert; @@ -26,6 +28,7 @@ * @author Alex Shvid * @author Matthew T. Adams * @author Mark Paluch + * @author Tomasz Lelek */ public interface CassandraPersistentEntity extends PersistentEntity { @@ -50,6 +53,21 @@ public interface CassandraPersistentEntity extends PersistentEntity getKeyspaceName(); + + /** + * Sets the CQL table name. + * + * @param keyspaceName if is {@code Optional.empty()}, then session level keyspace will be used. + */ + @SuppressWarnings("OptionalUsedAsFieldOrParameterType") + void setKeyspaceName(Optional keyspaceName); + /** * Sets the CQL table name. * @@ -70,6 +88,7 @@ default void setTableName(org.springframework.data.cassandra.core.cql.CqlIdentif */ void setTableName(CqlIdentifier tableName); + /** * @return {@literal true} if the type is a mapped tuple type. * @since 2.1 diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/EmbeddedEntityOperations.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/EmbeddedEntityOperations.java index 7233ea27c..9577a0669 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/EmbeddedEntityOperations.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/EmbeddedEntityOperations.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +import java.util.Optional; import java.util.Spliterator; import java.util.function.Consumer; @@ -41,6 +42,7 @@ * Support methods to obtain {@link PersistentProperty} and {@link PersistentEntity} for embedded properties. * * @author Christoph Strobl + * @author Tomasz Lelek * @since 3.0 * @see Embedded */ @@ -101,6 +103,16 @@ public CqlIdentifier getTableName() { return delegate.getTableName(); } + @Override + public Optional getKeyspaceName() { + return delegate.getKeyspaceName(); + } + + @Override + public void setKeyspaceName(Optional keyspaceName) { + delegate.setKeyspaceName(keyspaceName); + } + @Override @Deprecated public void setTableName(org.springframework.data.cassandra.core.cql.CqlIdentifier tableName) { diff --git a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/NamingStrategy.java b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/NamingStrategy.java index 50f2417b3..4ba453c47 100644 --- a/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/NamingStrategy.java +++ b/spring-data-cassandra/src/main/java/org/springframework/data/cassandra/core/mapping/NamingStrategy.java @@ -15,8 +15,10 @@ */ package org.springframework.data.cassandra.core.mapping; +import java.util.Optional; import java.util.function.UnaryOperator; +import com.datastax.oss.driver.api.core.CqlIdentifier; import org.springframework.util.Assert; /** @@ -53,6 +55,11 @@ default String getTableName(CassandraPersistentEntity entity) { return entity.getType().getSimpleName(); } + default Optional getKeyspaceName(CassandraPersistentEntity entity) { + Assert.notNull(entity, "CassandraPersistentEntity must not be null"); + return Optional.empty(); + } + /** * Create a user-defined type name from the given {@link CassandraPersistentEntity}. */ diff --git a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/StatementFactoryUnitTests.java b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/StatementFactoryUnitTests.java index f7a950aec..5f70217c5 100644 --- a/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/StatementFactoryUnitTests.java +++ b/spring-data-cassandra/src/test/java/org/springframework/data/cassandra/core/StatementFactoryUnitTests.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.junit.jupiter.api.Test; @@ -33,8 +34,10 @@ import org.springframework.data.cassandra.core.cql.WriteOptions; import org.springframework.data.cassandra.core.cql.util.StatementBuilder; import org.springframework.data.cassandra.core.cql.util.StatementBuilder.ParameterHandling; +import org.springframework.data.cassandra.core.mapping.CassandraMappingContext; import org.springframework.data.cassandra.core.mapping.CassandraPersistentEntity; import org.springframework.data.cassandra.core.mapping.Column; +import org.springframework.data.cassandra.core.mapping.NamingStrategy; import org.springframework.data.cassandra.core.query.Columns; import org.springframework.data.cassandra.core.query.Criteria; import org.springframework.data.cassandra.core.query.Query; @@ -42,6 +45,7 @@ import org.springframework.data.cassandra.domain.Group; import org.springframework.data.domain.Sort; +import com.datastax.oss.driver.api.core.CqlIdentifier; import com.datastax.oss.driver.api.core.DefaultConsistencyLevel; import com.datastax.oss.driver.api.core.cql.SimpleStatement; import com.datastax.oss.driver.api.querybuilder.delete.Delete; @@ -52,6 +56,7 @@ * Unit tests for {@link StatementFactory}. * * @author Mark Paluch + * @author Tomasz Lelek */ class StatementFactoryUnitTests { @@ -610,6 +615,27 @@ void shouldCreateCountQuery() { .isEqualTo("SELECT count(1) FROM group WHERE foo='bar'"); } + @Test // DATACASS-751 + void shouldConstructQueryWithKeyspace() { + CassandraMappingContext cassandraMappingContext = new CassandraMappingContext(); + cassandraMappingContext.setNamingStrategy(new NamingStrategy() { + @Override + public Optional getKeyspaceName(CassandraPersistentEntity entity) { + return Optional.of(CqlIdentifier.fromCql("ks1")); + } + }); + + CassandraConverter converter = new MappingCassandraConverter(cassandraMappingContext); + UpdateMapper updateMapper = new UpdateMapper(converter); + StatementFactory statementFactory = new StatementFactory(updateMapper, updateMapper); + + + StatementBuilder