diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/CassandraSharedResourceManager.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/CassandraResourceManager.java similarity index 91% rename from v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/CassandraSharedResourceManager.java rename to v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/CassandraResourceManager.java index 5f166a2317..ab1dbf39c8 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/CassandraSharedResourceManager.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/CassandraResourceManager.java @@ -33,7 +33,6 @@ import org.apache.beam.it.common.utils.ResourceManagerUtils; import org.apache.beam.it.testcontainers.TestContainerResourceManager; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,10 +51,10 @@ * *

The class is thread-safe. */ -public class CassandraSharedResourceManager - extends TestContainerResourceManager> implements ResourceManager { +public class CassandraResourceManager extends TestContainerResourceManager> + implements ResourceManager { - private static final Logger LOG = LoggerFactory.getLogger(CassandraSharedResourceManager.class); + private static final Logger LOG = LoggerFactory.getLogger(CassandraResourceManager.class); private static final String DEFAULT_CASSANDRA_CONTAINER_NAME = "cassandra"; @@ -70,7 +69,7 @@ public class CassandraSharedResourceManager private final String keyspaceName; private final boolean usingStaticDatabase; - private CassandraSharedResourceManager(Builder builder) { + private CassandraResourceManager(Builder builder) { this( null, new CassandraContainer<>( @@ -80,7 +79,7 @@ private CassandraSharedResourceManager(Builder builder) { @VisibleForTesting @SuppressWarnings("nullness") - CassandraSharedResourceManager( + CassandraResourceManager( @Nullable CqlSession cassandraClient, CassandraContainer container, Builder builder) { super(container, builder); // we are trying to handle userDefined KeyspaceName name without usingStatic Container @@ -176,19 +175,6 @@ public synchronized void execute(String statement) { } } - /** - * Inserts the given Document into a table. - * - *

A database will be created here, if one does not already exist. - * - * @param tableName The name of the table to insert the document into. - * @param document The document to insert into the table. - * @return A boolean indicating whether the Document was inserted successfully. - */ - public synchronized boolean insertDocument(String tableName, Map document) { - return insertDocuments(tableName, ImmutableList.of(document)); - } - /** * Inserts the given Documents into a collection. * @@ -312,9 +298,9 @@ private static RetryPolicy buildRetryPolicy() { .build(); } - /** Builder for {@link CassandraSharedResourceManager}. */ + /** Builder for {@link CassandraResourceManager}. */ public static final class Builder - extends TestContainerResourceManager.Builder { + extends TestContainerResourceManager.Builder { private @Nullable String keyspaceName; @@ -359,8 +345,8 @@ public Builder sePreGeneratedKeyspaceName(boolean preGeneratedKeyspaceName) { } @Override - public CassandraSharedResourceManager build() { - return new CassandraSharedResourceManager(this); + public CassandraResourceManager build() { + return new CassandraResourceManager(this); } } } diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraDbITBase.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraDbITBase.java deleted file mode 100644 index 64405b98b4..0000000000 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraDbITBase.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Copyright (C) 2025 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. 
You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.teleport.v2.templates; - -import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; - -import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation; -import com.google.common.io.Resources; -import com.google.pubsub.v1.SubscriptionName; -import com.google.pubsub.v1.TopicName; -import java.io.BufferedReader; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.nio.charset.StandardCharsets; -import java.time.format.DateTimeFormatter; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.regex.Pattern; -import java.util.stream.Collectors; -import org.apache.beam.it.common.PipelineLauncher; -import org.apache.beam.it.common.utils.PipelineUtils; -import org.apache.beam.it.common.utils.ResourceManagerUtils; -import org.apache.beam.it.gcp.TemplateTestBase; -import org.apache.beam.it.gcp.artifacts.utils.ArtifactUtils; -import org.apache.beam.it.gcp.pubsub.PubsubResourceManager; -import org.apache.beam.it.gcp.spanner.SpannerResourceManager; -import org.apache.beam.it.gcp.storage.GcsResourceManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public abstract class SpannerToCassandraDbITBase extends TemplateTestBase { - private static final Logger LOG = LoggerFactory.getLogger(SpannerToCassandraDbITBase.class); - - protected SpannerResourceManager createSpannerDatabase(String spannerSchemaFile) - throws IOException { - 
SpannerResourceManager spannerResourceManager = - SpannerResourceManager.builder("rr-main-" + testName, PROJECT, REGION) - .maybeUseStaticInstance() - .build(); - - String ddl; - try (InputStream inputStream = - Thread.currentThread().getContextClassLoader().getResourceAsStream(spannerSchemaFile)) { - if (inputStream == null) { - throw new FileNotFoundException("Resource file not found: " + spannerSchemaFile); - } - try (BufferedReader reader = - new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { - ddl = reader.lines().collect(Collectors.joining("\n")); - } - } - - if (ddl.isBlank()) { - throw new IllegalStateException("DDL file is empty: " + spannerSchemaFile); - } - - String[] ddls = ddl.trim().split(";"); - for (String d : ddls) { - d = d.trim(); - if (!d.isEmpty()) { - spannerResourceManager.executeDdlStatement(d); - } - } - return spannerResourceManager; - } - - protected SpannerResourceManager createSpannerMetadataDatabase() throws IOException { - SpannerResourceManager spannerMetadataResourceManager = - SpannerResourceManager.builder("rr-meta-" + testName, PROJECT, REGION) - .maybeUseStaticInstance() - .build(); - String dummy = "create table t1(id INT64 ) primary key(id)"; - spannerMetadataResourceManager.executeDdlStatement(dummy); - return spannerMetadataResourceManager; - } - - public PubsubResourceManager setUpPubSubResourceManager() throws IOException { - return PubsubResourceManager.builder(testName, PROJECT, credentialsProvider).build(); - } - - public CassandraSharedResourceManager generateKeyspaceAndBuildCassandraResource() { - String keyspaceName = - ResourceManagerUtils.generateResourceId( - testName, - Pattern.compile("[/\\\\. 
\"\u0000$]"), - "-", - 27, - DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss-SSSSSS")) - .replace('-', '_'); - if (keyspaceName.length() > 48) { - keyspaceName = keyspaceName.substring(0, 48); - } - return CassandraSharedResourceManager.builder(testName) - .setKeyspaceName(keyspaceName) - .sePreGeneratedKeyspaceName(true) - .build(); - } - - public SubscriptionName createPubsubResources( - String identifierSuffix, PubsubResourceManager pubsubResourceManager, String gcsPrefix) { - String topicNameSuffix = "rr-it" + identifierSuffix; - String subscriptionNameSuffix = "rr-it-sub" + identifierSuffix; - TopicName topic = pubsubResourceManager.createTopic(topicNameSuffix); - SubscriptionName subscription = - pubsubResourceManager.createSubscription(topic, subscriptionNameSuffix); - String prefix = gcsPrefix; - if (prefix.startsWith("/")) { - prefix = prefix.substring(1); - } - prefix += "/retry/"; - gcsClient.createNotification(topic.toString(), prefix); - return subscription; - } - - public void createAndUploadCassandraConfigToGcs( - GcsResourceManager gcsResourceManager, - CassandraSharedResourceManager cassandraResourceManagers, - String cassandraConfigFile) - throws IOException { - - String host = cassandraResourceManagers.getHost(); - int port = cassandraResourceManagers.getPort(); - String keyspaceName = cassandraResourceManagers.getKeyspaceName(); - String cassandraConfigContents; - try (InputStream inputStream = - Thread.currentThread().getContextClassLoader().getResourceAsStream(cassandraConfigFile)) { - if (inputStream == null) { - throw new FileNotFoundException("Resource file not found: " + cassandraConfigFile); - } - cassandraConfigContents = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); - } - - cassandraConfigContents = - cassandraConfigContents - .replace("##host##", host) - .replace("##port##", Integer.toString(port)) - .replace("##keyspace##", keyspaceName); - - gcsResourceManager.createArtifact("input/cassandra-config.conf", 
cassandraConfigContents); - } - - public PipelineLauncher.LaunchInfo launchDataflowJob( - GcsResourceManager gcsResourceManager, - SpannerResourceManager spannerResourceManager, - SpannerResourceManager spannerMetadataResourceManager, - String subscriptionName, - String identifierSuffix, - String shardingCustomJarPath, - String shardingCustomClassName, - String sourceDbTimezoneOffset, - CustomTransformation customTransformation) - throws IOException { - - Map params = - new HashMap<>() { - { - put("instanceId", spannerResourceManager.getInstanceId()); - put("databaseId", spannerResourceManager.getDatabaseId()); - put("spannerProjectId", PROJECT); - put("metadataDatabase", spannerMetadataResourceManager.getDatabaseId()); - put("metadataInstance", spannerMetadataResourceManager.getInstanceId()); - put( - "sourceShardsFilePath", - getGcsPath("input/cassandra-config.conf", gcsResourceManager)); - put("changeStreamName", "allstream"); - put("dlqGcsPubSubSubscription", subscriptionName); - put("deadLetterQueueDirectory", getGcsPath("dlq", gcsResourceManager)); - put("maxShardConnections", "5"); - put("maxNumWorkers", "1"); - put("numWorkers", "1"); - put("sourceType", "cassandra"); - } - }; - - if (shardingCustomJarPath != null) { - params.put( - "shardingCustomJarPath", - getGcsFullPath(gcsResourceManager, shardingCustomJarPath, identifierSuffix)); - } - if (shardingCustomClassName != null) { - params.put("shardingCustomClassName", shardingCustomClassName); - } - - if (sourceDbTimezoneOffset != null) { - params.put("sourceDbTimezoneOffset", sourceDbTimezoneOffset); - } - - if (customTransformation != null) { - params.put( - "transformationJarPath", getGcsPath(customTransformation.jarPath(), gcsResourceManager)); - params.put("transformationClassName", customTransformation.classPath()); - } - - // Construct template - String jobName = PipelineUtils.createJobName("rrev-it" + testName); - // /-DunifiedWorker=true when using runner v2 - PipelineLauncher.LaunchConfig.Builder 
options = - PipelineLauncher.LaunchConfig.builder(jobName, specPath); - options.setParameters(params); - options.addEnvironment("additionalExperiments", Collections.singletonList("use_runner_v2")); - // Run - PipelineLauncher.LaunchInfo jobInfo = launchTemplate(options, false); - assertThatPipeline(jobInfo).isRunning(); - return jobInfo; - } - - protected void createCassandraSchema( - CassandraSharedResourceManager cassandraResourceManager, String cassandraSchemaFile) - throws IOException { - String ddl = - String.join( - " ", - Resources.readLines( - Resources.getResource(cassandraSchemaFile), StandardCharsets.UTF_8)); - ddl = ddl.trim(); - String[] ddls = ddl.split(";"); - for (String d : ddls) { - if (!d.isBlank()) { - cassandraResourceManager.execute(d); - } - } - } - - public String getGcsFullPath( - GcsResourceManager gcsResourceManager, String artifactId, String identifierSuffix) { - return ArtifactUtils.getFullGcsPath( - artifactBucketName, identifierSuffix, gcsResourceManager.runId(), artifactId); - } -} diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbDatatypeIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbDatatypeIT.java deleted file mode 100644 index 70cbe22ff5..0000000000 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbDatatypeIT.java +++ /dev/null @@ -1,422 +0,0 @@ -/* - * Copyright (C) 2025 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.teleport.v2.templates; - -import static com.google.common.truth.Truth.assertThat; -import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; -import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; - -import com.datastax.oss.driver.api.core.cql.ResultSet; -import com.datastax.oss.driver.api.core.cql.Row; -import com.google.cloud.ByteArray; -import com.google.cloud.Date; -import com.google.cloud.Timestamp; -import com.google.cloud.spanner.Mutation; -import com.google.cloud.spanner.Value; -import com.google.cloud.teleport.metadata.SkipDirectRunnerTest; -import com.google.cloud.teleport.metadata.TemplateIntegrationTest; -import com.google.pubsub.v1.SubscriptionName; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import org.apache.beam.it.common.PipelineLauncher; -import org.apache.beam.it.common.PipelineOperator; -import org.apache.beam.it.common.utils.ResourceManagerUtils; -import org.apache.beam.it.gcp.pubsub.PubsubResourceManager; -import org.apache.beam.it.gcp.spanner.SpannerResourceManager; -import org.apache.beam.it.gcp.storage.GcsResourceManager; -import org.apache.commons.codec.DecoderException; -import org.apache.commons.codec.binary.Hex; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.junit.runners.model.MultipleFailureException; - -@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) -@TemplateIntegrationTest(SpannerToSourceDb.class) -@RunWith(JUnit4.class) -public class 
SpannerToCassandraSourceDbDatatypeIT extends SpannerToCassandraDbITBase { - - private static final String SPANNER_DDL_RESOURCE = - "SpannerToCassandraSourceDbDatatypeIT/spanner-schema.sql"; - private static final String CASSANDRA_SCHEMA_FILE_RESOURCE = - "SpannerToCassandraSourceDbDatatypeIT/cassandra-schema.sql"; - private static final String CASSANDRA_CONFIG_FILE_RESOURCE = - "SpannerToCassandraSourceDbDatatypeIT/cassandra-config-template.conf"; - - private static final String TABLE = "AllDatatypeColumns"; - private static final HashSet testInstances = - new HashSet<>(); - private static PipelineLauncher.LaunchInfo jobInfo; - public static SpannerResourceManager spannerResourceManager; - private static SpannerResourceManager spannerMetadataResourceManager; - public static CassandraSharedResourceManager cassandraResourceManager; - private static GcsResourceManager gcsResourceManager; - private static PubsubResourceManager pubsubResourceManager; - private SubscriptionName subscriptionName; - private final List assertionErrors = new ArrayList<>(); - - /** - * Setup resource managers and Launch dataflow job once during the execution of this test class. 
- * - * @throws IOException - */ - @Before - public void setUp() throws IOException { - skipBaseCleanup = true; - synchronized (SpannerToCassandraSourceDbDatatypeIT.class) { - testInstances.add(this); - if (jobInfo == null) { - spannerResourceManager = createSpannerDatabase(SPANNER_DDL_RESOURCE); - spannerMetadataResourceManager = createSpannerMetadataDatabase(); - - cassandraResourceManager = generateKeyspaceAndBuildCassandraResource(); - gcsResourceManager = - GcsResourceManager.builder(artifactBucketName, getClass().getSimpleName(), credentials) - .build(); - createCassandraSchema(cassandraResourceManager, CASSANDRA_SCHEMA_FILE_RESOURCE); - createAndUploadCassandraConfigToGcs( - gcsResourceManager, cassandraResourceManager, CASSANDRA_CONFIG_FILE_RESOURCE); - pubsubResourceManager = setUpPubSubResourceManager(); - subscriptionName = - createPubsubResources( - getClass().getSimpleName(), - pubsubResourceManager, - getGcsPath("dlq", gcsResourceManager).replace("gs://" + artifactBucketName, "")); - jobInfo = - launchDataflowJob( - gcsResourceManager, - spannerResourceManager, - spannerMetadataResourceManager, - subscriptionName.toString(), - null, - null, - null, - null, - null); - } - } - } - - /** - * Cleanup dataflow job and all the resources and resource managers. 
- * - * @throws IOException - */ - @AfterClass - public static void cleanUp() throws IOException { - for (SpannerToCassandraSourceDbDatatypeIT instance : testInstances) { - instance.tearDownBase(); - } - ResourceManagerUtils.cleanResources( - spannerResourceManager, - cassandraResourceManager, - spannerMetadataResourceManager, - gcsResourceManager, - pubsubResourceManager); - } - - @Test - public void spannerToCassandraSourceDataTypeConversionTest() - throws InterruptedException, IOException, MultipleFailureException { - assertThatPipeline(jobInfo).isRunning(); - writeRowInSpanner(); - assertRowInCassandraDB(); - } - - private long getRowCount() { - String query = String.format("SELECT COUNT(*) FROM %s", TABLE); - ResultSet resultSet = cassandraResourceManager.executeStatement(query); - Row row = resultSet.one(); - if (row != null) { - return row.getLong(0); - } else { - throw new RuntimeException("Query did not return a result for table: " + TABLE); - } - } - - private void writeRowInSpanner() { - Mutation mutation = - Mutation.newInsertOrUpdateBuilder(TABLE) - .set("varchar_column") - .to("SampleVarchar") - .set("tinyint_column") - .to(127) - .set("text_column") - .to("This is some sample text data for the text column.") - .set("date_column") - .to(Value.date(Date.fromJavaUtilDate(java.sql.Date.valueOf("2025-01-27")))) - .set("smallint_column") - .to(32767) - .set("mediumint_column") - .to(8388607) - .set("int_column") - .to(2147483647) - .set("bigint_column") - .to(9223372036854775807L) - .set("float_column") - .to(3.14159) - .set("double_column") - .to(2.718281828459045) - .set("decimal_column") - .to(new BigDecimal("12345.6789")) - .set("datetime_column") - .to(Value.timestamp(Timestamp.parseTimestamp("2025-01-27T10:30:00Z"))) - .set("timestamp_column") - .to(Value.timestamp(Timestamp.parseTimestamp("2025-01-27T10:30:00Z"))) - .set("time_column") - .to("12:30:00") - .set("year_column") - .to("2025") - .set("char_column") - .to("CHAR_DATA") - 
.set("tinytext_column") - .to("Short text for tinytext.") - .set("mediumtext_column") - .to("Longer text data for mediumtext column.") - .set("longtext_column") - .to("Very long text data that exceeds the medium text column length for long text.") - .set("enum_column") - .to("OptionA") - .set("bool_column") - .to(Value.bool(Boolean.TRUE)) - .set("other_bool_column") - .to(Value.bool(Boolean.FALSE)) - .set("bytes_column") - .to(Value.bytes(ByteArray.copyFrom("SGVsbG8gd29ybGQ=".getBytes()))) - .set("list_text_column") - .to(Value.json("[\"apple\", \"banana\", \"cherry\"]")) - .set("list_int_column") - .to(Value.json("[1, 2, 3, 4, 5]")) - .set("frozen_list_bigint_column") - .to(Value.json("[123456789012345, 987654321012345]")) - .set("set_text_column") - .to(Value.json("[\"apple\", \"orange\", \"banana\"]")) - .set("set_date_column") - .to(Value.json("[\"2025-01-27\", \"2025-02-01\"]")) - .set("frozen_set_bool_column") - .to(Value.json("[true, false]")) - .set("map_text_to_int_column") - .to(Value.json("{\"key1\": 10, \"key2\": 20}")) - .set("map_date_to_text_column") - .to(Value.json("{\"2025-01-27\": \"event1\", \"2025-02-01\": \"event2\"}")) - .set("frozen_map_int_to_bool_column") - .to(Value.json("{\"1\": true, \"2\": false}")) - .set("map_text_to_list_column") - .to(Value.json("{\"fruit\": [\"apple\", \"banana\"], \"color\": [\"red\", \"green\"]}")) - .set("map_text_to_set_column") - .to( - Value.json( - "{\"fruit\": [\"apple\", \"banana\"], \"vegetables\": [\"carrot\", \"spinach\"]}")) - .set("set_of_maps_column") - .to(Value.json("[{\"key1\": 10, \"key2\": 20}, {\"keyA\": 5, \"keyB\": 10}]")) - .set("list_of_sets_column") - .to(Value.json("[[\"apple\", \"banana\"], [\"carrot\", \"spinach\"]]")) - .set("frozen_map_text_to_list_column") - .to(Value.json("{\"fruits\": [\"apple\", \"banana\"]}")) - .set("frozen_map_text_to_set_column") - .to(Value.json("{\"vegetables\": [\"carrot\", \"spinach\"]}")) - .set("frozen_set_of_maps_column") - .to(Value.json("[{\"key1\": 
10, \"key2\": 20}, {\"keyA\": 5, \"keyB\": 10}]")) - .set("frozen_list_of_sets_column") - .to(Value.json("[[\"apple\", \"banana\"], [\"carrot\", \"spinach\"]]")) - .set("varint_column") - .to(Value.bytes(ByteArray.copyFrom("b3f5ed4f".getBytes()))) - .build(); - - spannerResourceManager.write(mutation); - } - - private void assertAll(Runnable... assertions) throws MultipleFailureException { - for (Runnable assertion : assertions) { - try { - assertion.run(); - } catch (AssertionError e) { - assertionErrors.add(e); - } - } - if (!assertionErrors.isEmpty()) { - throw new MultipleFailureException(assertionErrors); - } - } - - private void assertRowInCassandraDB() throws InterruptedException, MultipleFailureException { - PipelineOperator.Result result = - pipelineOperator() - .waitForCondition( - createConfig(jobInfo, Duration.ofMinutes(10)), () -> getRowCount() == 1); - assertThatResult(result).meetsConditions(); - Iterable rows; - try { - rows = cassandraResourceManager.readTable(TABLE); - } catch (Exception e) { - throw new RuntimeException("Failed to read from Cassandra table: " + TABLE, e); - } - - assertThat(rows).hasSize(1); - - Row row = rows.iterator().next(); - - assertThat(rows).hasSize(1); - assertAll( - // Basic Data Types - () -> assertThat(row.getString("varchar_column")).isEqualTo("SampleVarchar"), - () -> assertThat(row.getLong("bigint_column")).isEqualTo(9223372036854775807L), - () -> assertThat(row.getBoolean("bool_column")).isTrue(), - () -> { - String hexString = "5347567362473867643239796247513d"; - byte[] byteArray; - try { - byteArray = Hex.decodeHex(hexString); - } catch (DecoderException e) { - byteArray = new byte[0]; - } - ByteBuffer expectedBuffer = ByteBuffer.wrap(byteArray); - assertThat(row.getByteBuffer("bytes_column")).isEqualTo(expectedBuffer); - }, - () -> assertThat(row.getString("char_column")).isEqualTo("CHAR_DATA"), - () -> - assertThat(row.getLocalDate("date_column")) - .isEqualTo(java.time.LocalDate.of(2025, 1, 27)), - () -> - 
assertThat(row.getInstant("datetime_column")) - .isEqualTo(java.time.Instant.parse("2025-01-27T10:30:00.000Z")), - () -> - assertThat(row.getBigDecimal("decimal_column")).isEqualTo(new BigDecimal("12345.6789")), - () -> assertThat(row.getDouble("double_column")).isEqualTo(2.718281828459045), - () -> assertThat(row.getFloat("float_column")).isEqualTo(3.14159f), - - // Collections (frozen, list, set, map) - () -> - assertThat(row.getList("frozen_list_bigint_column", Long.class)) - .isEqualTo(Arrays.asList(123456789012345L, 987654321012345L)), - () -> - assertThat(row.getSet("frozen_set_bool_column", Boolean.class)) - .isEqualTo(new HashSet<>(Arrays.asList(false, true))), - () -> - assertThat(row.getMap("frozen_map_int_to_bool_column", Integer.class, Boolean.class)) - .isEqualTo(Map.of(1, true, 2, false)), - () -> - assertThat(row.getMap("frozen_map_text_to_list_column", String.class, List.class)) - .isEqualTo(Map.of("fruits", Arrays.asList("apple", "banana"))), - () -> - assertThat(row.getMap("frozen_map_text_to_set_column", String.class, Set.class)) - .isEqualTo(Map.of("vegetables", new HashSet<>(Arrays.asList("carrot", "spinach")))), - () -> - assertThat(row.getSet("frozen_set_of_maps_column", Map.class)) - .isEqualTo( - new HashSet<>( - Arrays.asList( - Map.of("key1", 10, "key2", 20), Map.of("keyA", 5, "keyB", 10)))), - - // Lists and Sets - () -> - assertThat(row.getList("list_int_column", Integer.class)) - .isEqualTo(Arrays.asList(1, 2, 3, 4, 5)), - () -> - assertThat(row.getList("list_text_column", String.class)) - .isEqualTo(Arrays.asList("apple", "banana", "cherry")), - () -> - assertThat(row.getList("list_of_sets_column", Set.class)) - .isEqualTo( - Arrays.asList( - new HashSet<>(Arrays.asList("apple", "banana")), - new HashSet<>(Arrays.asList("carrot", "spinach")))), - - // Maps - () -> - assertThat( - row.getMap("map_date_to_text_column", java.time.LocalDate.class, String.class)) - .isEqualTo( - Map.of( - java.time.LocalDate.parse("2025-01-27"), "event1", 
- java.time.LocalDate.parse("2025-02-01"), "event2")), - () -> - assertThat(row.getMap("map_text_to_int_column", String.class, Integer.class)) - .isEqualTo(Map.of("key1", 10, "key2", 20)), - () -> - assertThat(row.getMap("map_text_to_list_column", String.class, List.class)) - .isEqualTo( - Map.of( - "color", - Arrays.asList("red", "green"), - "fruit", - Arrays.asList("apple", "banana"))), - () -> - assertThat(row.getMap("map_text_to_set_column", String.class, Set.class)) - .isEqualTo( - Map.of( - "fruit", - new HashSet<>(Arrays.asList("apple", "banana")), - "vegetables", - new HashSet<>(Arrays.asList("carrot", "spinach")))), - - // Sets - () -> - assertThat(row.getSet("set_date_column", java.time.LocalDate.class)) - .isEqualTo( - new HashSet<>( - Arrays.asList( - java.time.LocalDate.parse("2025-01-27"), - java.time.LocalDate.parse("2025-02-01")))), - () -> - assertThat(row.getSet("set_text_column", String.class)) - .isEqualTo(new HashSet<>(Arrays.asList("apple", "orange", "banana"))), - () -> - assertThat(row.getSet("set_of_maps_column", Map.class)) - .isEqualTo( - new HashSet<>( - Arrays.asList( - Map.of("key1", 10, "key2", 20), Map.of("keyA", 5, "keyB", 10)))), - - // Other Basic Types - () -> assertThat(row.getShort("smallint_column")).isEqualTo((short) 32767), - () -> assertThat(row.getInt("mediumint_column")).isEqualTo(8388607), - () -> assertThat(row.getInt("int_column")).isEqualTo(2147483647), - () -> assertThat(row.getString("enum_column")).isEqualTo("OptionA"), - () -> assertThat(row.getString("year_column")).isEqualTo("2025"), - () -> - assertThat(row.getString("longtext_column")) - .isEqualTo( - "Very long text data that exceeds the medium text column length for long text."), - () -> assertThat(row.getString("tinytext_column")).isEqualTo("Short text for tinytext."), - () -> - assertThat(row.getString("mediumtext_column")) - .isEqualTo("Longer text data for mediumtext column."), - () -> - assertThat(row.getString("text_column")) - .isEqualTo("This is some 
sample text data for the text column."), - () -> - assertThat(row.getLocalTime("time_column")) - .isEqualTo(java.time.LocalTime.parse("12:30:00.000000000")), - () -> - assertThat(row.getInstant("timestamp_column")) - .isEqualTo(java.time.Instant.parse("2025-01-27T10:30:00.000Z")), - () -> - assertThat(row.getBigInteger("varint_column")) - .isEqualTo(java.math.BigInteger.valueOf(7076111819049546854L))); - } -} diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java new file mode 100644 index 0000000000..f16c24c0c0 --- /dev/null +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToCassandraSourceDbIT.java @@ -0,0 +1,928 @@ +/* + * Copyright (C) 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.teleport.v2.templates; + +import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; + +import com.datastax.oss.driver.api.core.cql.ResultSet; +import com.datastax.oss.driver.api.core.cql.Row; +import com.google.cloud.ByteArray; +import com.google.cloud.Date; +import com.google.cloud.Timestamp; +import com.google.cloud.spanner.Mutation; +import com.google.cloud.spanner.Options; +import com.google.cloud.spanner.TransactionRunner; +import com.google.cloud.spanner.Value; +import com.google.cloud.teleport.metadata.SkipDirectRunnerTest; +import com.google.cloud.teleport.metadata.TemplateIntegrationTest; +import com.google.pubsub.v1.SubscriptionName; +import java.io.IOException; +import java.math.BigDecimal; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.it.common.PipelineLauncher; +import org.apache.beam.it.common.PipelineOperator; +import org.apache.beam.it.common.utils.ResourceManagerUtils; +import org.apache.beam.it.gcp.pubsub.PubsubResourceManager; +import org.apache.beam.it.gcp.spanner.SpannerResourceManager; +import org.apache.beam.it.gcp.storage.GcsResourceManager; +import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor; +import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.junit.runners.model.MultipleFailureException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) +@TemplateIntegrationTest(SpannerToSourceDb.class) 
+@RunWith(JUnit4.class) +public class SpannerToCassandraSourceDbIT extends SpannerToSourceDbITBase { + + private static final Logger LOG = LoggerFactory.getLogger(SpannerToCassandraSourceDbIT.class); + + private static final String SPANNER_DDL_RESOURCE = + "SpannerToCassandraSourceIT/spanner-schema.sql"; + private static final String CASSANDRA_SCHEMA_FILE_RESOURCE = + "SpannerToCassandraSourceIT/cassandra-schema.sql"; + private static final String CASSANDRA_CONFIG_FILE_RESOURCE = + "SpannerToCassandraSourceIT/cassandra-config-template.conf"; + + private static final String USER_TABLE = "Users"; + private static final String ALL_DATA_TYPES_TABLE = "AllDatatypeColumns"; + private static final String ALL_DATA_TYPES_CUSTOM_CONVERSION_TABLE = "AllDatatypeTransformation"; + private static final HashSet testInstances = new HashSet<>(); + private static PipelineLauncher.LaunchInfo jobInfo; + public static SpannerResourceManager spannerResourceManager; + private static SpannerResourceManager spannerMetadataResourceManager; + public static CassandraResourceManager cassandraResourceManager; + private static GcsResourceManager gcsResourceManager; + private static PubsubResourceManager pubsubResourceManager; + private SubscriptionName subscriptionName; + private final List assertionErrors = new ArrayList<>(); + + /** + * Setup resource managers and Launch dataflow job once during the execution of this test class. 
+ * + * @throws IOException + */ + @Before + public void setUp() throws IOException { + skipBaseCleanup = true; + synchronized (SpannerToCassandraSourceDbIT.class) { + testInstances.add(this); + if (jobInfo == null) { + spannerResourceManager = createSpannerDatabase(SPANNER_DDL_RESOURCE); + spannerMetadataResourceManager = createSpannerMetadataDatabase(); + + cassandraResourceManager = generateKeyspaceAndBuildCassandraResource(); + gcsResourceManager = + GcsResourceManager.builder(artifactBucketName, getClass().getSimpleName(), credentials) + .build(); + createAndUploadCassandraConfigToGcs( + gcsResourceManager, cassandraResourceManager, CASSANDRA_CONFIG_FILE_RESOURCE); + createCassandraSchema(cassandraResourceManager, CASSANDRA_SCHEMA_FILE_RESOURCE); + pubsubResourceManager = setUpPubSubResourceManager(); + subscriptionName = + createPubsubResources( + getClass().getSimpleName(), + pubsubResourceManager, + getGcsPath("dlq", gcsResourceManager).replace("gs://" + artifactBucketName, "")); + jobInfo = + launchDataflowJob( + gcsResourceManager, + spannerResourceManager, + spannerMetadataResourceManager, + subscriptionName.toString(), + null, + null, + null, + null, + null, + "cassandra"); + } + } + } + + /** + * Cleanup dataflow job and all the resources and resource managers. + * + * @throws IOException + */ + @AfterClass + public static void cleanUp() throws IOException { + for (SpannerToCassandraSourceDbIT instance : testInstances) { + instance.tearDownBase(); + } + ResourceManagerUtils.cleanResources( + spannerResourceManager, + cassandraResourceManager, + spannerMetadataResourceManager, + gcsResourceManager, + pubsubResourceManager); + } + + /** + * Tests the data flow from Spanner to Cassandra. + * + *

This test ensures that a basic row is successfully written to Spanner and subsequently
+   * appears in Cassandra, validating end-to-end data consistency.
+   *
+   * <p>The test first checks that the shared job is reported as running, then delegates the write
+   * to {@link #writeBasicRowInSpanner()} and the verification to {@link
+   * #assertBasicRowInCassandraDB()}.
+   *
+   * @throws InterruptedException if the thread is interrupted during execution.
+   * @throws IOException if an I/O error occurs during the test execution.
+   */
+  @Test
+  public void spannerToCasandraSourceDbBasic() throws InterruptedException, IOException {
+    assertThatPipeline(jobInfo).isRunning();
+    writeBasicRowInSpanner();
+    assertBasicRowInCassandraDB();
+  }
+
+  /**
+   * Tests the data type conversion from Spanner to Cassandra.
+   *
+   *

This test ensures that all supported data types are correctly written to Spanner and
+   * subsequently retrieved from Cassandra, verifying data integrity and type conversions.
+   *
+   * <p>The test first checks that the shared job is reported as running, then delegates the write
+   * to {@link #writeAllDataTypeRowsInSpanner()} and the verification to {@link
+   * #assertAllDataTypeRowsInCassandraDB()}.
+   *
+   * @throws InterruptedException if the thread is interrupted during execution.
+   * @throws IOException if an I/O error occurs during the test execution.
+   * @throws MultipleFailureException if multiple assertions fail during validation.
+   */
+  @Test
+  public void spannerToCassandraSourceAllDataTypeConversionTest()
+      throws InterruptedException, IOException, MultipleFailureException {
+    assertThatPipeline(jobInfo).isRunning();
+    writeAllDataTypeRowsInSpanner();
+    assertAllDataTypeRowsInCassandraDB();
+  }
+
+  /**
+   * Tests the conversion of string data types from Spanner to actual data type in Cassandra.
+   *
+   *

This test ensures that string-based data types are correctly written to Spanner and
+   * subsequently retrieved from Cassandra, verifying data integrity and conversion accuracy.
+   *
+   * <p>The test first checks that the shared job is reported as running, then delegates the write
+   * to {@link #writeAllRowsAsStringInSpanner()} and the verification to {@link
+   * #assertStringToActualRowsInCassandraDB()}.
+   *
+   * @throws InterruptedException if the thread is interrupted during execution.
+   * @throws IOException if an I/O error occurs during the test execution.
+   * @throws MultipleFailureException if multiple assertions fail during validation.
+   */
+  @Test
+  public void spannerToCassandraSourceDataTypeStringConversionTest()
+      throws InterruptedException, IOException, MultipleFailureException {
+    assertThatPipeline(jobInfo).isRunning();
+    writeAllRowsAsStringInSpanner();
+    assertStringToActualRowsInCassandraDB();
+  }
+
+  /**
+   * Retrieves the total row count of a specified table in Cassandra.
+   *
+   *

This method executes a `SELECT COUNT(*)` query on the given table and returns the number of + * rows present in it. + * + * @param tableName the name of the table whose row count is to be retrieved. + * @return the total number of rows in the specified table. + * @throws RuntimeException if the query does not return a result. + */ + private long getRowCount(String tableName) { + String query = String.format("SELECT COUNT(*) FROM %s", tableName); + ResultSet resultSet = cassandraResourceManager.executeStatement(query); + Row row = resultSet.one(); + if (row != null) { + return row.getLong(0); + } else { + throw new RuntimeException("Query did not return a result for table: " + tableName); + } + } + + /** + * Writes basic rows to multiple tables in Google Cloud Spanner. + * + *

This method performs the following operations: + * + *

    + *
  • Inserts or updates a row in the "users" table with an ID of 1. + *
  • Inserts or updates a row in the "users2" table with an ID of 2. + *
  • Executes a transactionally buffered insert/update operation in the "users" table with an + * ID of 3, using a transaction tag for tracking. + *
+ * + * The transaction uses a Spanner client with a specific transaction tag + * ("txBy=forwardMigration"). + */ + private void writeBasicRowInSpanner() { + Mutation m1 = + Mutation.newInsertOrUpdateBuilder("users") + .set("id") + .to(1) + .set("full_name") + .to("A") + .set("from") + .to("B") + .build(); + spannerResourceManager.write(m1); + + Mutation m2 = + Mutation.newInsertOrUpdateBuilder("users2") + .set("id") + .to(2) + .set("full_name") + .to("BB") + .build(); + spannerResourceManager.write(m2); + + // Write a single record to Spanner for the given logical shard + // Add the record with the transaction tag as txBy= + SpannerConfig spannerConfig = + SpannerConfig.create() + .withProjectId(PROJECT) + .withInstanceId(spannerResourceManager.getInstanceId()) + .withDatabaseId(spannerResourceManager.getDatabaseId()); + SpannerAccessor spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig); + spannerAccessor + .getDatabaseClient() + .readWriteTransaction( + Options.tag("txBy=forwardMigration"), + Options.priority(spannerConfig.getRpcPriority().get())) + .run( + (TransactionRunner.TransactionCallable) + transaction -> { + Mutation m3 = + Mutation.newInsertOrUpdateBuilder("users") + .set("id") + .to(3) + .set("full_name") + .to("GG") + .set("from") + .to("BB") + .build(); + transaction.buffer(m3); + return null; + }); + } + + /** + * Asserts that a basic row exists in the Cassandra database. + * + *

This method performs the following steps: + * + *

    + *
  • Waits for the condition that ensures one row exists in the Cassandra table {@code + * USER_TABLE}. + *
  • Retrieves and logs rows from the Cassandra table. + *
  • Checks if exactly one row is present in the table. + *
  • Verifies that the row contains expected values for columns: {@code id}, {@code + * full_name}, and {@code from}. + *
+ * + * @throws InterruptedException if the thread is interrupted while waiting for the row count + * condition. + * @throws RuntimeException if reading from the Cassandra table fails. + */ + private void assertBasicRowInCassandraDB() throws InterruptedException { + PipelineOperator.Result result = + pipelineOperator() + .waitForCondition( + createConfig(jobInfo, Duration.ofMinutes(10)), () -> getRowCount(USER_TABLE) == 1); + assertThatResult(result).meetsConditions(); + Iterable rows; + try { + LOG.info("Reading from Cassandra table: {}", USER_TABLE); + rows = cassandraResourceManager.readTable(USER_TABLE); + LOG.info("Cassandra Rows: {}", rows.toString()); + } catch (Exception e) { + throw new RuntimeException("Failed to read from Cassandra table: " + USER_TABLE, e); + } + + assertThat(rows).hasSize(1); + + Row row = rows.iterator().next(); + LOG.info("Cassandra Row to Assert: {}", row.toString()); + assertThat(row.getInt("id")).isEqualTo(1); + assertThat(row.getString("full_name")).isEqualTo("A"); + assertThat(row.getString("from")).isEqualTo("B"); + } + + /** + * Writes a row containing all supported data types into the Spanner database. + * + *

This method creates and inserts a row into the {@code ALL_DATA_TYPES_TABLE} with various + * data types, including text, numerical, date/time, boolean, byte arrays, lists, sets, and maps. + * The values are set explicitly to ensure compatibility with Spanner's schema. + * + *

The following data types are included: + * + *

    + *
  • {@code varchar_column} - String + *
  • {@code tinyint_column} - Integer + *
  • {@code text_column} - String + *
  • {@code date_column} - Date + *
  • {@code smallint_column} - Integer + *
  • {@code mediumint_column} - Integer + *
  • {@code int_column} - Integer + *
  • {@code bigint_column} - Long + *
  • {@code float_column} - Float + *
  • {@code double_column} - Double + *
  • {@code decimal_column} - BigDecimal + *
  • {@code datetime_column} - Timestamp + *
  • {@code timestamp_column} - Timestamp + *
  • {@code time_column} - String + *
  • {@code year_column} - String + *
  • {@code char_column} - String + *
  • {@code tinytext_column} - String + *
  • {@code mediumtext_column} - String + *
  • {@code longtext_column} - String + *
  • {@code enum_column} - String + *
  • {@code bool_column} - Boolean + *
  • {@code other_bool_column} - Boolean + *
  • {@code bytes_column} - ByteArray + *
  • {@code list_text_column} - JSON List of Strings + *
  • {@code list_int_column} - JSON List of Integers + *
  • {@code frozen_list_bigint_column} - JSON List of Big Integers + *
  • {@code set_text_column} - JSON Set of Strings + *
  • {@code set_date_column} - JSON Set of Dates + *
  • {@code frozen_set_bool_column} - JSON Set of Booleans + *
  • {@code map_text_to_int_column} - JSON Map of Strings to Integers + *
  • {@code map_date_to_text_column} - JSON Map of Dates to Strings + *
  • {@code frozen_map_int_to_bool_column} - JSON Map of Integers to Booleans + *
  • {@code map_text_to_list_column} - JSON Map of Strings to Lists + *
  • {@code map_text_to_set_column} - JSON Map of Strings to Sets + *
  • {@code set_of_maps_column} - JSON Set of Maps + *
  • {@code list_of_sets_column} - JSON List of Sets + *
  • {@code frozen_map_text_to_list_column} - JSON Map of Strings to Lists + *
  • {@code frozen_map_text_to_set_column} - JSON Map of Strings to Sets + *
  • {@code frozen_set_of_maps_column} - JSON Set of Maps + *
  • {@code frozen_list_of_sets_column} - JSON List of Sets + *
  • {@code varint_column} - String (Varint Representation) + *
  • {@code inet_column} - String (IP Address Representation) + *
+ * + * @throws RuntimeException if writing to Spanner fails. + */ + private void writeAllDataTypeRowsInSpanner() { + Mutation mutation = + Mutation.newInsertOrUpdateBuilder(ALL_DATA_TYPES_TABLE) + .set("varchar_column") + .to("SampleVarchar") + .set("tinyint_column") + .to(127) + .set("text_column") + .to("This is some sample text data for the text column.") + .set("date_column") + .to(Value.date(Date.fromJavaUtilDate(java.sql.Date.valueOf("2025-01-27")))) + .set("smallint_column") + .to(32767) + .set("mediumint_column") + .to(8388607) + .set("int_column") + .to(2147483647) + .set("bigint_column") + .to(9223372036854775807L) + .set("float_column") + .to(3.14159) + .set("double_column") + .to(2.718281828459045) + .set("decimal_column") + .to(new BigDecimal("12345.6789")) + .set("datetime_column") + .to(Value.timestamp(Timestamp.parseTimestamp("2025-01-27T10:30:00Z"))) + .set("timestamp_column") + .to(Value.timestamp(Timestamp.parseTimestamp("2025-01-27T10:30:00Z"))) + .set("time_column") + .to("12:30:00") + .set("year_column") + .to("2025") + .set("char_column") + .to("CHAR_DATA") + .set("tinytext_column") + .to("Short text for tinytext.") + .set("mediumtext_column") + .to("Longer text data for mediumtext column.") + .set("longtext_column") + .to("Very long text data that exceeds the medium text column length for long text.") + .set("enum_column") + .to("OptionA") + .set("bool_column") + .to(Value.bool(Boolean.TRUE)) + .set("other_bool_column") + .to(Value.bool(Boolean.FALSE)) + .set("bytes_column") + .to(Value.bytes(ByteArray.copyFrom("Hello world"))) + .set("list_text_column") + .to(Value.json("[\"apple\", \"banana\", \"cherry\"]")) + .set("list_int_column") + .to(Value.json("[1, 2, 3, 4, 5]")) + .set("frozen_list_bigint_column") + .to(Value.json("[123456789012345, 987654321012345]")) + .set("set_text_column") + .to(Value.json("[\"apple\", \"orange\", \"banana\"]")) + .set("set_date_column") + .to(Value.json("[\"2025-01-27\", \"2025-02-01\"]")) + 
.set("frozen_set_bool_column") + .to(Value.json("[true, false]")) + .set("map_text_to_int_column") + .to(Value.json("{\"key1\": 10, \"key2\": 20}")) + .set("map_date_to_text_column") + .to(Value.json("{\"2025-01-27\": \"event1\", \"2025-02-01\": \"event2\"}")) + .set("frozen_map_int_to_bool_column") + .to(Value.json("{\"1\": true, \"2\": false}")) + .set("map_text_to_list_column") + .to(Value.json("{\"fruit\": [\"apple\", \"banana\"], \"color\": [\"red\", \"green\"]}")) + .set("map_text_to_set_column") + .to( + Value.json( + "{\"fruit\": [\"apple\", \"banana\"], \"vegetables\": [\"carrot\", \"spinach\"]}")) + .set("set_of_maps_column") + .to(Value.json("[{\"key1\": 10, \"key2\": 20}, {\"keyA\": 5, \"keyB\": 10}]")) + .set("list_of_sets_column") + .to(Value.json("[[\"apple\", \"banana\"], [\"carrot\", \"spinach\"]]")) + .set("frozen_map_text_to_list_column") + .to(Value.json("{\"fruits\": [\"apple\", \"banana\"]}")) + .set("frozen_map_text_to_set_column") + .to(Value.json("{\"vegetables\": [\"carrot\", \"spinach\"]}")) + .set("frozen_set_of_maps_column") + .to(Value.json("[{\"key1\": 10, \"key2\": 20}, {\"keyA\": 5, \"keyB\": 10}]")) + .set("frozen_list_of_sets_column") + .to(Value.json("[[\"apple\", \"banana\"], [\"carrot\", \"spinach\"]]")) + .set("varint_column") + .to("123456789") + .set("inet_column") + .to("192.168.1.10") + .build(); + + spannerResourceManager.write(mutation); + } + + /** + * Executes multiple assertions and collects all assertion failures. + * + *

This method takes a variable number of {@link Runnable} assertions and executes them + * sequentially. If any assertions fail, their errors are collected, and a {@link + * MultipleFailureException} is thrown containing all assertion errors. + * + *

Usage Example: + * + *

+   * assertAll(
+   *     () -> assertEquals(expectedValue, actualValue),
+   *     () -> assertNotNull(someObject),
+   *     () -> assertTrue(condition)
+   * );
+   * 
+   *
+   * <p>Failures are gathered in a list local to each call, so errors recorded by an earlier
+   * invocation are never reported again by a later one.
+   *
+   * @param assertions One or more assertions provided as {@link Runnable} lambdas.
+   * @throws MultipleFailureException if one or more assertions fail.
+   */
+  private void assertAll(Runnable... assertions) throws MultipleFailureException {
+    List<Throwable> failures = new ArrayList<>();
+    for (Runnable assertion : assertions) {
+      try {
+        assertion.run();
+      } catch (AssertionError e) {
+        // Keep going so every failing assertion is reported, not just the first one.
+        failures.add(e);
+      }
+    }
+    if (!failures.isEmpty()) {
+      throw new MultipleFailureException(failures);
+    }
+  }
+
+  /**
+   * Validates that all data type rows inserted in Spanner have been correctly migrated and stored
+   * in Cassandra.
+   *
+   *

This method ensures that the data in the Cassandra table {@code ALL_DATA_TYPES_TABLE} + * matches the expected values after migration. It waits for the pipeline to process the data, + * reads the data from Cassandra, and asserts all column values. + * + *

Assertions: + * + *

    + *
  • Basic Data Types - Ensures correct values for varchar, bigint, bool, char, date, + * datetime, decimal, double, float. + *
  • Collections - Validates frozen lists, sets, and maps including nested structures. + *
  • Lists and Sets - Ensures list and set columns contain expected elements. + *
  • Maps - Validates various map column structures including text-to-int, date-to-text, and + * list/set mappings. + *
+ * + *

Example Usage: + * + *

+   * assertAllDataTypeRowsInCassandraDB();
+   * 
+   *
+   * @throws InterruptedException if the thread is interrupted while waiting for pipeline execution.
+   * @throws MultipleFailureException if multiple assertion failures occur.
+   */
+  private void assertAllDataTypeRowsInCassandraDB()
+      throws InterruptedException, MultipleFailureException {
+    // Wait, within the 10 minute budget configured here, until exactly one row is visible.
+    PipelineOperator.Result result =
+        pipelineOperator()
+            .waitForCondition(
+                createConfig(jobInfo, Duration.ofMinutes(10)),
+                () -> getRowCount(ALL_DATA_TYPES_TABLE) == 1);
+    assertThatResult(result).meetsConditions();
+
+    Iterable<Row> rows;
+    try {
+      rows = cassandraResourceManager.readTable(ALL_DATA_TYPES_TABLE);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to read from Cassandra table: " + ALL_DATA_TYPES_TABLE, e);
+    }
+
+    assertThat(rows).hasSize(1);
+
+    Row row = rows.iterator().next();
+
+    assertAll(
+        // Basic Data Types
+        () -> assertThat(row.getString("varchar_column")).isEqualTo("SampleVarchar"),
+        () -> assertThat(row.getLong("bigint_column")).isEqualTo(9223372036854775807L),
+        () -> assertThat(row.getBoolean("bool_column")).isTrue(),
+        () -> assertThat(row.getString("char_column")).isEqualTo("CHAR_DATA"),
+        () ->
+            assertThat(row.getLocalDate("date_column"))
+                .isEqualTo(java.time.LocalDate.of(2025, 1, 27)),
+        () ->
+            assertThat(row.getInstant("datetime_column"))
+                .isEqualTo(java.time.Instant.parse("2025-01-27T10:30:00.000Z")),
+        () ->
+            assertThat(row.getBigDecimal("decimal_column")).isEqualTo(new BigDecimal("12345.6789")),
+        () -> assertThat(row.getDouble("double_column")).isEqualTo(2.718281828459045),
+        () -> assertThat(row.getFloat("float_column")).isEqualTo(3.14159f),
+
+        // Collections (frozen, list, set, map)
+        () ->
+            assertThat(row.getList("frozen_list_bigint_column", Long.class))
+                .isEqualTo(Arrays.asList(123456789012345L, 987654321012345L)),
+        () ->
+            assertThat(row.getSet("frozen_set_bool_column", Boolean.class))
+                .isEqualTo(new HashSet<>(Arrays.asList(false, true))),
+        () ->
+            assertThat(row.getMap("frozen_map_int_to_bool_column", Integer.class, Boolean.class))
+                .isEqualTo(Map.of(1, true, 2, false)),
+        () ->
+            assertThat(row.getMap("frozen_map_text_to_list_column", String.class, List.class))
+                .isEqualTo(Map.of("fruits", Arrays.asList("apple", "banana"))),
+        () ->
+            assertThat(row.getMap("frozen_map_text_to_set_column", String.class, Set.class))
+                .isEqualTo(Map.of("vegetables", new HashSet<>(Arrays.asList("carrot", "spinach")))),
+        () ->
+            assertThat(row.getSet("frozen_set_of_maps_column", Map.class))
+                .isEqualTo(
+                    new HashSet<>(
+                        Arrays.asList(
+                            Map.of("key1", 10, "key2", 20), Map.of("keyA", 5, "keyB", 10)))),
+
+        // Lists and Sets
+        () ->
+            assertThat(row.getList("list_int_column", Integer.class))
+                .isEqualTo(Arrays.asList(1, 2, 3, 4, 5)),
+        () ->
+            assertThat(row.getList("list_text_column", String.class))
+                .isEqualTo(Arrays.asList("apple", "banana", "cherry")),
+        () ->
+            assertThat(row.getList("list_of_sets_column", Set.class))
+                .isEqualTo(
+                    Arrays.asList(
+                        new HashSet<>(Arrays.asList("apple", "banana")),
+                        new HashSet<>(Arrays.asList("carrot", "spinach")))),
+
+        // Maps
+        () ->
+            assertThat(
+                    row.getMap("map_date_to_text_column", java.time.LocalDate.class, String.class))
+                .isEqualTo(
+                    Map.of(
+                        java.time.LocalDate.parse("2025-01-27"), "event1",
+                        java.time.LocalDate.parse("2025-02-01"), "event2")),
+        () ->
+            assertThat(row.getMap("map_text_to_int_column", String.class, Integer.class))
+                .isEqualTo(Map.of("key1", 10, "key2", 20)),
+        () ->
+            assertThat(row.getMap("map_text_to_list_column", String.class, List.class))
+                .isEqualTo(
+                    Map.of(
+                        "color",
+                        Arrays.asList("red", "green"),
+                        "fruit",
+                        Arrays.asList("apple", "banana"))),
+        () ->
+            assertThat(row.getMap("map_text_to_set_column", String.class, Set.class))
+                .isEqualTo(
+                    Map.of(
+                        "fruit",
+                        new HashSet<>(Arrays.asList("apple", "banana")),
+                        "vegetables",
+                        new HashSet<>(Arrays.asList("carrot", "spinach")))),
+
+        // Sets
+        () ->
+            assertThat(row.getSet("set_date_column", java.time.LocalDate.class))
+                .isEqualTo(
+                    new HashSet<>(
+                        Arrays.asList(
+                            java.time.LocalDate.parse("2025-01-27"),
+                            java.time.LocalDate.parse("2025-02-01")))),
+        () ->
+            assertThat(row.getSet("set_text_column", String.class))
+                .isEqualTo(new HashSet<>(Arrays.asList("apple", "orange", "banana"))),
+        () ->
+            assertThat(row.getSet("set_of_maps_column", Map.class))
+                .isEqualTo(
+                    new HashSet<>(
+                        Arrays.asList(
+                            Map.of("key1", 10, "key2", 20), Map.of("keyA", 5, "keyB", 10)))),
+
+        // Other Basic Types
+        () -> assertThat(row.getShort("smallint_column")).isEqualTo((short) 32767),
+        () -> assertThat(row.getInt("mediumint_column")).isEqualTo(8388607),
+        () -> assertThat(row.getInt("int_column")).isEqualTo(2147483647),
+        () -> assertThat(row.getString("enum_column")).isEqualTo("OptionA"),
+        () -> assertThat(row.getString("year_column")).isEqualTo("2025"),
+        () ->
+            assertThat(row.getString("longtext_column"))
+                .isEqualTo(
+                    "Very long text data that exceeds the medium text column length for long text."),
+        () -> assertThat(row.getString("tinytext_column")).isEqualTo("Short text for tinytext."),
+        () ->
+            assertThat(row.getString("mediumtext_column"))
+                .isEqualTo("Longer text data for mediumtext column."),
+        () ->
+            assertThat(row.getString("text_column"))
+                .isEqualTo("This is some sample text data for the text column."),
+        () ->
+            assertThat(row.getLocalTime("time_column"))
+                .isEqualTo(java.time.LocalTime.parse("12:30:00.000000000")),
+        () ->
+            assertThat(row.getInstant("timestamp_column"))
+                .isEqualTo(java.time.Instant.parse("2025-01-27T10:30:00.000Z")),
+        () ->
+            assertThat(row.getBigInteger("varint_column"))
+                .isEqualTo(java.math.BigInteger.valueOf(123456789L)));
+  }
+
+  /**
+   * Inserts multiple rows into the Spanner table {@code ALL_DATA_TYPES_CUSTOM_CONVERSION_TABLE},
+   * ensuring that all values are stored as strings, regardless of their original data type.
+   *
+   *

This method writes sample data to the Spanner table, converting all numerical, boolean, and + * date/time values to their string representations. This ensures compatibility for scenarios + * requiring string-based storage. + * + *

Columns and Data Mapping: + * + *

    + *
  • Basic Types: Strings, numbers (converted to strings), booleans. + *
  • Complex Types: JSON representations for lists, sets, and maps. + *
  • Temporal Types: Date, datetime, timestamp values stored as strings. + *
+ */ + private void writeAllRowsAsStringInSpanner() { + Mutation m; + m = + Mutation.newInsertOrUpdateBuilder(ALL_DATA_TYPES_CUSTOM_CONVERSION_TABLE) + .set("varchar_column") + .to("SampleVarchar") + .set("tinyint_column") + .to(String.valueOf(127)) + .set("text_column") + .to("This is some sample text data for the text column.") + .set("date_column") + .to(String.valueOf(Date.fromJavaUtilDate(java.sql.Date.valueOf("2025-01-27")))) + .set("smallint_column") + .to(String.valueOf(32767)) + .set("mediumint_column") + .to(String.valueOf(8388607)) + .set("int_column") + .to(String.valueOf(2147483647)) + .set("bigint_column") + .to(String.valueOf(9223372036854775807L)) + .set("float_column") + .to(String.valueOf(3.14159f)) + .set("double_column") + .to(String.valueOf(2.718281828459045)) + .set("decimal_column") + .to(new BigDecimal("12345.6789").toPlainString()) + .set("datetime_column") + .to(String.valueOf(Timestamp.parseTimestamp("2025-01-27T10:30:00Z"))) + .set("timestamp_column") + .to(String.valueOf(Timestamp.parseTimestamp("2025-01-27T10:30:00Z"))) + .set("time_column") + .to("12:30:00") + .set("year_column") + .to("2025") + .set("char_column") + .to("CHAR_DATA") + .set("tinytext_column") + .to("Short text for tinytext.") + .set("mediumtext_column") + .to("Longer text data for mediumtext column.") + .set("longtext_column") + .to("Very long text data that exceeds the medium text column length for long text.") + .set("enum_column") + .to("OptionA") + .set("bool_column") + .to(String.valueOf(Boolean.TRUE)) + .set("other_bool_column") + .to(String.valueOf(Boolean.FALSE)) + .set("list_text_column") + .to(Value.json("[\"apple\", \"banana\", \"cherry\"]")) + .set("list_int_column") + .to(Value.json("[1, 2, 3, 4, 5]")) + .set("frozen_list_bigint_column") + .to(Value.json("[123456789012345, 987654321012345]")) + .set("set_text_column") + .to(Value.json("[\"apple\", \"orange\", \"banana\"]")) + .set("set_date_column") + .to(Value.json("[\"2025-01-27\", \"2025-02-01\"]")) + 
.set("frozen_set_bool_column") + .to(Value.json("[true, false]")) + .set("map_text_to_int_column") + .to(Value.json("{\"key1\": 10, \"key2\": 20}")) + .set("map_date_to_text_column") + .to(Value.json("{\"2025-01-27\": \"event1\", \"2025-02-01\": \"event2\"}")) + .set("frozen_map_int_to_bool_column") + .to(Value.json("{\"1\": true, \"2\": false}")) + .set("map_text_to_list_column") + .to(Value.json("{\"fruit\": [\"apple\", \"banana\"], \"color\": [\"red\", \"green\"]}")) + .set("map_text_to_set_column") + .to( + Value.json( + "{\"fruit\": [\"apple\", \"banana\"], \"vegetables\": [\"carrot\", \"spinach\"]}")) + .set("set_of_maps_column") + .to(Value.json("[{\"key1\": 10, \"key2\": 20}, {\"keyA\": 5, \"keyB\": 10}]")) + .set("list_of_sets_column") + .to(Value.json("[[\"apple\", \"banana\"], [\"carrot\", \"spinach\"]]")) + .set("frozen_map_text_to_list_column") + .to(Value.json("{\"fruits\": [\"apple\", \"banana\"]}")) + .set("frozen_map_text_to_set_column") + .to(Value.json("{\"vegetables\": [\"carrot\", \"spinach\"]}")) + .set("frozen_set_of_maps_column") + .to(Value.json("[{\"key1\": 10, \"key2\": 20}, {\"keyA\": 5, \"keyB\": 10}]")) + .set("frozen_list_of_sets_column") + .to(Value.json("[[\"apple\", \"banana\"], [\"carrot\", \"spinach\"]]")) + .set("varint_column") + .to("123456789") + .build(); + + spannerResourceManager.write(m); + + m = + Mutation.newInsertOrUpdateBuilder(ALL_DATA_TYPES_CUSTOM_CONVERSION_TABLE) + .set("varchar_column") + .to("SampleVarchar2") + .set("tinyint_column") + .to(String.valueOf(127)) + .set("text_column") + .to("This is some sample text data for the text column.") + .set("date_column") + .to(String.valueOf(Date.fromJavaUtilDate(java.sql.Date.valueOf("2025-01-27")))) + .set("smallint_column") + .to(String.valueOf(32767)) + .set("mediumint_column") + .to(String.valueOf(8388607)) + .set("int_column") + .to(String.valueOf(2147483647)) + .set("bigint_column") + .to(String.valueOf(9223372036854775807L)) + .set("float_column") + 
.to(String.valueOf(3.14159f)) + .set("double_column") + .to(String.valueOf(2.718281828459045)) + .set("decimal_column") + .to(new BigDecimal("12345.6789").toPlainString()) + .set("datetime_column") + .to(String.valueOf(Timestamp.parseTimestamp("2025-01-27T10:30:00Z"))) + .set("timestamp_column") + .to(String.valueOf(Timestamp.parseTimestamp("2025-01-27T10:30:00Z"))) + .set("time_column") + .to("12:30:00") + .set("year_column") + .to("2025") + .set("char_column") + .to("CHAR_DATA") + .set("tinytext_column") + .to("Short text for tinytext.") + .set("mediumtext_column") + .to("Longer text data for mediumtext column.") + .set("longtext_column") + .to("Very long text data that exceeds the medium text column length for long text.") + .set("enum_column") + .to("OptionA") + .set("bool_column") + .to(String.valueOf(Boolean.TRUE)) + .set("other_bool_column") + .to(String.valueOf(Boolean.FALSE)) + .set("list_text_column") + .to(Value.json("[\"apple\", \"banana\", \"cherry\"]")) + .set("list_int_column") + .to(Value.json("[1, 2, 3, 4, 5]")) + .set("frozen_list_bigint_column") + .to(Value.json("[123456789012345, 987654321012345]")) + .set("set_text_column") + .to(Value.json("[\"apple\", \"orange\", \"banana\"]")) + .set("set_date_column") + .to(Value.json("[\"2025-01-27\", \"2025-02-01\"]")) + .set("frozen_set_bool_column") + .to(Value.json("[true, false]")) + .set("map_text_to_int_column") + .to(Value.json("{\"key1\": 10, \"key2\": 20}")) + .set("map_date_to_text_column") + .to(Value.json("{\"2025-01-27\": \"event1\", \"2025-02-01\": \"event2\"}")) + .set("frozen_map_int_to_bool_column") + .to(Value.json("{\"1\": true, \"2\": false}")) + .set("map_text_to_list_column") + .to(Value.json("{\"fruit\": [\"apple\", \"banana\"], \"color\": [\"red\", \"green\"]}")) + .set("map_text_to_set_column") + .to( + Value.json( + "{\"fruit\": [\"apple\", \"banana\"], \"vegetables\": [\"carrot\", \"spinach\"]}")) + .set("set_of_maps_column") + .to(Value.json("[{\"key1\": 10, \"key2\": 20}, 
{\"keyA\": 5, \"keyB\": 10}]")) + .set("list_of_sets_column") + .to(Value.json("[[\"apple\", \"banana\"], [\"carrot\", \"spinach\"]]")) + .set("frozen_map_text_to_list_column") + .to(Value.json("{\"fruits\": [\"apple\", \"banana\"]}")) + .set("frozen_map_text_to_set_column") + .to(Value.json("{\"vegetables\": [\"carrot\", \"spinach\"]}")) + .set("frozen_set_of_maps_column") + .to(Value.json("[{\"key1\": 10, \"key2\": 20}, {\"keyA\": 5, \"keyB\": 10}]")) + .set("frozen_list_of_sets_column") + .to(Value.json("[[\"apple\", \"banana\"], [\"carrot\", \"spinach\"]]")) + .set("varint_column") + .to("123456789") + .build(); + + spannerResourceManager.write(m); + } + + /** + * Validates that string-based data stored in Spanner is correctly converted to its actual data + * types when retrieved from Cassandra. + * + *

This method ensures that values stored as strings in Spanner are properly transformed into + * their expected data types in Cassandra. It performs the following: + * + *

    + *
  • Waits for the migration process to complete. + *
  • Reads and verifies that two rows are present in Cassandra. + *
  • Checks specific column values to confirm correct data type conversion. + *
+ * + *

Assertions Performed: + * + *

    + *
  • Verifies that {@code varchar_column} retains its expected string value. + *
  • Confirms that {@code tinyint_column} is correctly converted to a {@code byte}. + *
+ * + * @throws MultipleFailureException if multiple assertions fail during validation. + */ + private void assertStringToActualRowsInCassandraDB() throws MultipleFailureException { + PipelineOperator.Result result = + pipelineOperator() + .waitForCondition( + createConfig(jobInfo, Duration.ofMinutes(15)), + () -> getRowCount(ALL_DATA_TYPES_CUSTOM_CONVERSION_TABLE) == 2); + assertThatResult(result).meetsConditions(); + + Iterable rows; + try { + rows = cassandraResourceManager.readTable(ALL_DATA_TYPES_CUSTOM_CONVERSION_TABLE); + } catch (Exception e) { + throw new RuntimeException( + "Failed to read from Cassandra table: " + ALL_DATA_TYPES_CUSTOM_CONVERSION_TABLE, e); + } + + assertThat(rows).hasSize(2); + Row row = rows.iterator().next(); + assertAll( + () -> assertThat(row.getString("varchar_column")).isEqualTo("SampleVarchar2"), + () -> assertThat(row.getByte("tinyint_column")).isEqualTo((byte) 127)); + } +} diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCassandraIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCassandraIT.java deleted file mode 100644 index eb93ef5020..0000000000 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCassandraIT.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Copyright (C) 2025 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ -package com.google.cloud.teleport.v2.templates; - -import static com.google.common.truth.Truth.assertThat; -import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; -import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; - -import com.datastax.oss.driver.api.core.cql.ResultSet; -import com.datastax.oss.driver.api.core.cql.Row; -import com.google.cloud.spanner.Mutation; -import com.google.cloud.spanner.Options; -import com.google.cloud.spanner.TransactionRunner; -import com.google.cloud.teleport.metadata.SkipDirectRunnerTest; -import com.google.cloud.teleport.metadata.TemplateIntegrationTest; -import com.google.pubsub.v1.SubscriptionName; -import java.io.IOException; -import java.time.Duration; -import java.util.HashSet; -import org.apache.beam.it.common.PipelineLauncher; -import org.apache.beam.it.common.PipelineOperator; -import org.apache.beam.it.common.utils.ResourceManagerUtils; -import org.apache.beam.it.gcp.pubsub.PubsubResourceManager; -import org.apache.beam.it.gcp.spanner.SpannerResourceManager; -import org.apache.beam.it.gcp.storage.GcsResourceManager; -import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor; -import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) -@TemplateIntegrationTest(SpannerToSourceDb.class) -@RunWith(JUnit4.class) -public class SpannerToSourceDbCassandraIT extends SpannerToCassandraDbITBase { - - private static final Logger LOG = LoggerFactory.getLogger(SpannerToSourceDbCassandraIT.class); - - private static final String SPANNER_DDL_RESOURCE = - "SpannerToCassandraSourceIT/spanner-schema.sql"; - private static final String 
CASSANDRA_SCHEMA_FILE_RESOURCE = - "SpannerToCassandraSourceIT/cassandra-schema.sql"; - private static final String CASSANDRA_CONFIG_FILE_RESOURCE = - "SpannerToCassandraSourceIT/cassandra-config-template.conf"; - - private static final String TABLE = "Users"; - private static final HashSet testInstances = new HashSet<>(); - private static PipelineLauncher.LaunchInfo jobInfo; - public static SpannerResourceManager spannerResourceManager; - private static SpannerResourceManager spannerMetadataResourceManager; - public static CassandraSharedResourceManager cassandraResourceManager; - private static GcsResourceManager gcsResourceManager; - private static PubsubResourceManager pubsubResourceManager; - private SubscriptionName subscriptionName; - - /** - * Setup resource managers and Launch dataflow job once during the execution of this test class. - * - * @throws IOException - */ - @Before - public void setUp() throws IOException { - skipBaseCleanup = true; - synchronized (SpannerToSourceDbCassandraIT.class) { - testInstances.add(this); - if (jobInfo == null) { - spannerResourceManager = createSpannerDatabase(SPANNER_DDL_RESOURCE); - spannerMetadataResourceManager = createSpannerMetadataDatabase(); - - cassandraResourceManager = generateKeyspaceAndBuildCassandraResource(); - gcsResourceManager = - GcsResourceManager.builder(artifactBucketName, getClass().getSimpleName(), credentials) - .build(); - createAndUploadCassandraConfigToGcs( - gcsResourceManager, cassandraResourceManager, CASSANDRA_CONFIG_FILE_RESOURCE); - createCassandraSchema(cassandraResourceManager, CASSANDRA_SCHEMA_FILE_RESOURCE); - pubsubResourceManager = setUpPubSubResourceManager(); - subscriptionName = - createPubsubResources( - getClass().getSimpleName(), - pubsubResourceManager, - getGcsPath("dlq", gcsResourceManager).replace("gs://" + artifactBucketName, "")); - jobInfo = - launchDataflowJob( - gcsResourceManager, - spannerResourceManager, - spannerMetadataResourceManager, - 
subscriptionName.toString(), - null, - null, - null, - null, - null); - } - } - } - - /** - * Cleanup dataflow job and all the resources and resource managers. - * - * @throws IOException - */ - @AfterClass - public static void cleanUp() throws IOException { - for (SpannerToSourceDbCassandraIT instance : testInstances) { - instance.tearDownBase(); - } - ResourceManagerUtils.cleanResources( - spannerResourceManager, - cassandraResourceManager, - spannerMetadataResourceManager, - gcsResourceManager, - pubsubResourceManager); - } - - @Test - public void spannerToCasandraSourceDbBasic() throws InterruptedException, IOException { - assertThatPipeline(jobInfo).isRunning(); - writeRowInSpanner(); - assertRowInCassandraDB(); - } - - private long getRowCount() { - String query = String.format("SELECT COUNT(*) FROM %s", TABLE); - ResultSet resultSet = cassandraResourceManager.executeStatement(query); - Row row = resultSet.one(); - if (row != null) { - return row.getLong(0); - } else { - throw new RuntimeException("Query did not return a result for table: " + TABLE); - } - } - - private void writeRowInSpanner() { - Mutation m1 = - Mutation.newInsertOrUpdateBuilder("users") - .set("id") - .to(1) - .set("full_name") - .to("A") - .set("from") - .to("B") - .build(); - spannerResourceManager.write(m1); - - Mutation m2 = - Mutation.newInsertOrUpdateBuilder("users2") - .set("id") - .to(2) - .set("full_name") - .to("BB") - .build(); - spannerResourceManager.write(m2); - - // Write a single record to Spanner for the given logical shard - // Add the record with the transaction tag as txBy= - SpannerConfig spannerConfig = - SpannerConfig.create() - .withProjectId(PROJECT) - .withInstanceId(spannerResourceManager.getInstanceId()) - .withDatabaseId(spannerResourceManager.getDatabaseId()); - SpannerAccessor spannerAccessor = SpannerAccessor.getOrCreate(spannerConfig); - spannerAccessor - .getDatabaseClient() - .readWriteTransaction( - Options.tag("txBy=forwardMigration"), - 
Options.priority(spannerConfig.getRpcPriority().get())) - .run( - (TransactionRunner.TransactionCallable) - transaction -> { - Mutation m3 = - Mutation.newInsertOrUpdateBuilder("users") - .set("id") - .to(3) - .set("full_name") - .to("GG") - .set("from") - .to("BB") - .build(); - transaction.buffer(m3); - return null; - }); - } - - private void assertRowInCassandraDB() throws InterruptedException { - PipelineOperator.Result result = - pipelineOperator() - .waitForCondition( - createConfig(jobInfo, Duration.ofMinutes(10)), () -> getRowCount() == 1); - assertThatResult(result).meetsConditions(); - Iterable rows; - try { - LOG.info("Reading from Cassandra table: {}", TABLE); - rows = cassandraResourceManager.readTable(TABLE); - LOG.info("Cassandra Rows: {}", rows.toString()); - } catch (Exception e) { - throw new RuntimeException("Failed to read from Cassandra table: " + TABLE, e); - } - - assertThat(rows).hasSize(1); - - Row row = rows.iterator().next(); - LOG.info("Cassandra Row to Assert: {}", row.toString()); - assertThat(row.getInt("id")).isEqualTo(1); - assertThat(row.getString("full_name")).isEqualTo("A"); - assertThat(row.getString("from")).isEqualTo("B"); - } -} diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomShardIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomShardIT.java index e58488283d..6a9b87291f 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomShardIT.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomShardIT.java @@ -123,6 +123,7 @@ public void setUp() throws IOException, InterruptedException { "input/customShard.jar", "com.custom.CustomShardIdFetcherForIT", null, + null, null); } } diff --git 
a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomTransformationIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomTransformationIT.java index dab4dc27d8..b8b7bbec4e 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomTransformationIT.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbCustomTransformationIT.java @@ -132,7 +132,8 @@ public void setUp() throws IOException, InterruptedException { null, null, null, - customTransformation); + customTransformation, + null); } } } diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbDatatypeIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbDatatypeIT.java index dbd023cdef..2a701cda02 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbDatatypeIT.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbDatatypeIT.java @@ -118,6 +118,7 @@ public void setUp() throws IOException { null, null, null, + null, null); } } diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbIT.java index 7c3ad39760..d1b960e816 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbIT.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbIT.java @@ -114,6 +114,7 @@ public void setUp() throws IOException { null, null, null, + null, null); } } diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbITBase.java 
b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbITBase.java index 64d15895cd..ee582179de 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbITBase.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbITBase.java @@ -25,14 +25,22 @@ import com.google.gson.JsonObject; import com.google.pubsub.v1.SubscriptionName; import com.google.pubsub.v1.TopicName; +import java.io.BufferedReader; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.nio.charset.StandardCharsets; +import java.time.format.DateTimeFormatter; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.regex.Pattern; +import java.util.stream.Collectors; import org.apache.beam.it.common.PipelineLauncher; import org.apache.beam.it.common.utils.IORedirectUtil; import org.apache.beam.it.common.utils.PipelineUtils; +import org.apache.beam.it.common.utils.ResourceManagerUtils; import org.apache.beam.it.gcp.TemplateTestBase; import org.apache.beam.it.gcp.artifacts.utils.ArtifactUtils; import org.apache.beam.it.gcp.pubsub.PubsubResourceManager; @@ -53,14 +61,27 @@ protected SpannerResourceManager createSpannerDatabase(String spannerSchemaFile) SpannerResourceManager.builder("rr-main-" + testName, PROJECT, REGION) .maybeUseStaticInstance() .build(); - String ddl = - String.join( - " ", - Resources.readLines(Resources.getResource(spannerSchemaFile), StandardCharsets.UTF_8)); - ddl = ddl.trim(); - String[] ddls = ddl.split(";"); + + String ddl; + try (InputStream inputStream = + Thread.currentThread().getContextClassLoader().getResourceAsStream(spannerSchemaFile)) { + if (inputStream == null) { + throw new FileNotFoundException("Resource file not found: " + spannerSchemaFile); + } + try (BufferedReader reader = + new BufferedReader(new 
InputStreamReader(inputStream, StandardCharsets.UTF_8))) { + ddl = reader.lines().collect(Collectors.joining("\n")); + } + } + + if (ddl.isBlank()) { + throw new IllegalStateException("DDL file is empty: " + spannerSchemaFile); + } + + String[] ddls = ddl.trim().split(";"); for (String d : ddls) { - if (!d.isBlank()) { + d = d.trim(); + if (!d.isEmpty()) { spannerResourceManager.executeDdlStatement(d); } } @@ -116,6 +137,68 @@ protected void createAndUploadShardConfigToGcs( gcsResourceManager.createArtifact("input/shard.json", shardFileContents); } + protected CassandraResourceManager generateKeyspaceAndBuildCassandraResource() { + String keyspaceName = + ResourceManagerUtils.generateResourceId( + testName, + Pattern.compile("[/\\\\. \"\u0000$]"), + "-", + 27, + DateTimeFormatter.ofPattern("yyyyMMdd-HHmmss-SSSSSS")) + .replace('-', '_'); + if (keyspaceName.length() > 48) { + keyspaceName = keyspaceName.substring(0, 48); + } + return CassandraResourceManager.builder(testName) + .setKeyspaceName(keyspaceName) + .sePreGeneratedKeyspaceName(true) + .build(); + } + + protected void createCassandraSchema( + CassandraResourceManager cassandraResourceManager, String cassandraSchemaFile) + throws IOException { + String ddl = + String.join( + " ", + Resources.readLines( + Resources.getResource(cassandraSchemaFile), StandardCharsets.UTF_8)); + ddl = ddl.trim(); + String[] ddls = ddl.split(";"); + for (String d : ddls) { + if (!d.isBlank()) { + cassandraResourceManager.execute(d); + } + } + } + + public void createAndUploadCassandraConfigToGcs( + GcsResourceManager gcsResourceManager, + CassandraResourceManager cassandraResourceManagers, + String cassandraConfigFile) + throws IOException { + + String host = cassandraResourceManagers.getHost(); + int port = cassandraResourceManagers.getPort(); + String keyspaceName = cassandraResourceManagers.getKeyspaceName(); + String cassandraConfigContents; + try (InputStream inputStream = + 
Thread.currentThread().getContextClassLoader().getResourceAsStream(cassandraConfigFile)) { + if (inputStream == null) { + throw new FileNotFoundException("Resource file not found: " + cassandraConfigFile); + } + cassandraConfigContents = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8); + } + + cassandraConfigContents = + cassandraConfigContents + .replace("##host##", host) + .replace("##port##", Integer.toString(port)) + .replace("##keyspace##", keyspaceName); + + gcsResourceManager.createArtifact("input/cassandra-config.conf", cassandraConfigContents); + } + public PipelineLauncher.LaunchInfo launchDataflowJob( GcsResourceManager gcsResourceManager, SpannerResourceManager spannerResourceManager, @@ -125,9 +208,9 @@ public PipelineLauncher.LaunchInfo launchDataflowJob( String shardingCustomJarPath, String shardingCustomClassName, String sourceDbTimezoneOffset, - CustomTransformation customTransformation) + CustomTransformation customTransformation, + String sourceType) throws IOException { - // default parameters Map params = new HashMap<>() { @@ -138,13 +221,18 @@ public PipelineLauncher.LaunchInfo launchDataflowJob( put("spannerProjectId", PROJECT); put("metadataDatabase", spannerMetadataResourceManager.getDatabaseId()); put("metadataInstance", spannerMetadataResourceManager.getInstanceId()); - put("sourceShardsFilePath", getGcsPath("input/shard.json", gcsResourceManager)); + put( + "sourceShardsFilePath", + getGcsPath( + sourceType != null ? 
"input/cassandra-config.conf" : "input/shard.json", + gcsResourceManager)); put("changeStreamName", "allstream"); put("dlqGcsPubSubSubscription", subscriptionName); put("deadLetterQueueDirectory", getGcsPath("dlq", gcsResourceManager)); put("maxShardConnections", "5"); put("maxNumWorkers", "1"); put("numWorkers", "1"); + put("sourceType", sourceType); } }; diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbInterleaveMultiShardIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbInterleaveMultiShardIT.java index 1f5acdc952..cf71cd2c3e 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbInterleaveMultiShardIT.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbInterleaveMultiShardIT.java @@ -124,6 +124,7 @@ public void setUp() throws IOException { null, null, null, + null, null); } } diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbTimezoneIT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbTimezoneIT.java index 1ab2d78b49..43cee80730 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbTimezoneIT.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbTimezoneIT.java @@ -113,6 +113,7 @@ public void setUp() throws IOException { null, null, "+10:00", + null, null); } } diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/cassandra-config-template.conf b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/cassandra-config-template.conf deleted file mode 100644 index 97486a8de0..0000000000 --- 
a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/cassandra-config-template.conf +++ /dev/null @@ -1,12 +0,0 @@ - # Configuration for the DataStax Java driver for Apache Cassandra®. - # This file is in HOCON format, see https://github.com/typesafehub/config/blob/master/HOCON.md. - # This file is meant to be used only in unit tests to test loading configuration from file. - # DO NOT USE FOR PRODUCTION. - - datastax-java-driver { - basic.contact-points = ["##host##:##port##"] - basic.session-keyspace = "##keyspace##" - basic.load-balancing-policy { - local-datacenter = "datacenter1" - } - } diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/cassandra-schema.sql b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/cassandra-schema.sql deleted file mode 100644 index 1a9d6a37ca..0000000000 --- a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/cassandra-schema.sql +++ /dev/null @@ -1,43 +0,0 @@ -CREATE TABLE AllDatatypeColumns ( - varchar_column text PRIMARY KEY, - tinyint_column tinyint, - text_column text, - date_column date, - smallint_column smallint, - mediumint_column int, - int_column int, - bigint_column bigint, - float_column float, - double_column double, - decimal_column decimal, - datetime_column timestamp, - timestamp_column timestamp, - time_column time, - year_column text, - char_column text, - tinytext_column text, - mediumtext_column text, - longtext_column text, - enum_column text, - bool_column boolean, - other_bool_column boolean, - bytes_column BLOB, - list_text_column list, - list_int_column list, - frozen_list_bigint_column frozen>, - set_text_column set, - set_date_column set, - frozen_set_bool_column frozen>, - map_text_to_int_column map, - map_date_to_text_column map, - frozen_map_int_to_bool_column frozen>, - map_text_to_list_column map>>, - map_text_to_set_column map>>, - set_of_maps_column set>>, - 
list_of_sets_column list>>, - frozen_map_text_to_list_column map>>, - frozen_map_text_to_set_column map>>, - frozen_set_of_maps_column set>>, - frozen_list_of_sets_column list>>, - varint_column varint -); diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/spanner-schema.sql b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/spanner-schema.sql deleted file mode 100644 index 4cb32f2778..0000000000 --- a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceDbDatatypeIT/spanner-schema.sql +++ /dev/null @@ -1,49 +0,0 @@ -CREATE TABLE IF NOT EXISTS alldatatypecolumns ( - varchar_column STRING(20) NOT NULL, - tinyint_column INT64, - text_column STRING(MAX), - date_column DATE, - smallint_column INT64, - mediumint_column INT64, - int_column INT64, - bigint_column INT64, - float_column FLOAT64, - double_column FLOAT64, - decimal_column NUMERIC, - datetime_column TIMESTAMP, - timestamp_column TIMESTAMP, - time_column STRING(MAX), - year_column STRING(MAX), - char_column STRING(10), - tinytext_column STRING(MAX), - mediumtext_column STRING(MAX), - longtext_column STRING(MAX), - enum_column STRING(MAX), - bool_column BOOL, - other_bool_column BOOL, - bytes_column BYTES(MAX), - list_text_column JSON, - list_int_column JSON, - frozen_list_bigint_column JSON, - set_text_column JSON, - set_date_column JSON, - frozen_set_bool_column JSON, - map_text_to_int_column JSON, - map_date_to_text_column JSON, - frozen_map_int_to_bool_column JSON, - map_text_to_list_column JSON, - map_text_to_set_column JSON, - set_of_maps_column JSON, - list_of_sets_column JSON, - frozen_map_text_to_list_column JSON, - frozen_map_text_to_set_column JSON, - frozen_set_of_maps_column JSON, - frozen_list_of_sets_column JSON, - varint_column BYTES(MAX) -) PRIMARY KEY(varchar_column); - -CREATE CHANGE STREAM allstream - FOR ALL OPTIONS ( - value_capture_type = 'NEW_ROW', - retention_period = '7d' - ); \ No newline at end of file 
diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/cassandra-schema.sql b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/cassandra-schema.sql index e220267733..3fcc26b3bf 100644 --- a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/cassandra-schema.sql +++ b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/cassandra-schema.sql @@ -2,4 +2,92 @@ CREATE TABLE users ( id int PRIMARY KEY, full_name text, "from" text +); + +CREATE TABLE AllDatatypeTransformation ( + varchar_column text PRIMARY KEY, + tinyint_column tinyint, + text_column text, + date_column date, + smallint_column smallint, + mediumint_column int, + int_column int, + bigint_column bigint, + float_column float, + double_column double, + decimal_column decimal, + datetime_column timestamp, + timestamp_column timestamp, + time_column time, + year_column text, + char_column text, + tinytext_column text, + mediumtext_column text, + longtext_column text, + enum_column text, + bool_column boolean, + other_bool_column boolean, + list_text_column list, + list_int_column list, + frozen_list_bigint_column frozen>, + set_text_column set, + set_date_column set, + frozen_set_bool_column frozen>, + map_text_to_int_column map, + map_date_to_text_column map, + frozen_map_int_to_bool_column frozen>, + map_text_to_list_column map>>, + map_text_to_set_column map>>, + set_of_maps_column set>>, + list_of_sets_column list>>, + frozen_map_text_to_list_column map>>, + frozen_map_text_to_set_column map>>, + frozen_set_of_maps_column set>>, + frozen_list_of_sets_column list>>, + varint_column varint +); + +CREATE TABLE AllDatatypeColumns ( + varchar_column text PRIMARY KEY, + tinyint_column tinyint, + text_column text, + date_column date, + smallint_column smallint, + mediumint_column int, + int_column int, + bigint_column bigint, + float_column float, + double_column double, + decimal_column decimal, + datetime_column timestamp, + 
timestamp_column timestamp, + time_column time, + year_column text, + char_column text, + tinytext_column text, + mediumtext_column text, + longtext_column text, + enum_column text, + bool_column boolean, + other_bool_column boolean, + bytes_column BLOB, + list_text_column list, + list_int_column list, + frozen_list_bigint_column frozen>, + set_text_column set, + set_date_column set, + frozen_set_bool_column frozen>, + map_text_to_int_column map, + map_date_to_text_column map, + frozen_map_int_to_bool_column frozen>, + map_text_to_list_column map>>, + map_text_to_set_column map>>, + set_of_maps_column set>>, + list_of_sets_column list>>, + frozen_map_text_to_list_column map>>, + frozen_map_text_to_set_column map>>, + frozen_set_of_maps_column set>>, + frozen_list_of_sets_column list>>, + varint_column varint, + inet_column INET ); \ No newline at end of file diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/spanner-schema.sql b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/spanner-schema.sql index 5cedd597e1..0b6ae0b7a6 100644 --- a/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/spanner-schema.sql +++ b/v2/spanner-to-sourcedb/src/test/resources/SpannerToCassandraSourceIT/spanner-schema.sql @@ -9,6 +9,94 @@ CREATE TABLE IF NOT EXISTS users2 ( full_name STRING(25), ) PRIMARY KEY(id); +CREATE TABLE IF NOT EXISTS alldatatypetransformation ( + varchar_column STRING(20) NOT NULL, + tinyint_column STRING(MAX), + text_column STRING(MAX), + date_column STRING(MAX), + smallint_column STRING(MAX), + mediumint_column STRING(MAX), + int_column STRING(MAX), + bigint_column STRING(MAX), + float_column STRING(MAX), + double_column STRING(MAX), + decimal_column STRING(MAX), + datetime_column STRING(MAX), + timestamp_column STRING(MAX), + time_column STRING(MAX), + year_column STRING(MAX), + char_column STRING(10), + tinytext_column STRING(MAX), + mediumtext_column STRING(MAX), + longtext_column 
STRING(MAX), + enum_column STRING(MAX), + bool_column STRING(MAX), + other_bool_column STRING(MAX), + list_text_column JSON, + list_int_column JSON, + frozen_list_bigint_column JSON, + set_text_column JSON, + set_date_column JSON, + frozen_set_bool_column JSON, + map_text_to_int_column JSON, + map_date_to_text_column JSON, + frozen_map_int_to_bool_column JSON, + map_text_to_list_column JSON, + map_text_to_set_column JSON, + set_of_maps_column JSON, + list_of_sets_column JSON, + frozen_map_text_to_list_column JSON, + frozen_map_text_to_set_column JSON, + frozen_set_of_maps_column JSON, + frozen_list_of_sets_column JSON, + varint_column STRING(MAX) +) PRIMARY KEY(varchar_column); + +CREATE TABLE IF NOT EXISTS alldatatypecolumns ( + varchar_column STRING(20) NOT NULL, + tinyint_column INT64, + text_column STRING(MAX), + date_column DATE, + smallint_column INT64, + mediumint_column INT64, + int_column INT64, + bigint_column INT64, + float_column FLOAT64, + double_column FLOAT64, + decimal_column NUMERIC, + datetime_column TIMESTAMP, + timestamp_column TIMESTAMP, + time_column STRING(MAX), + year_column STRING(MAX), + char_column STRING(10), + tinytext_column STRING(MAX), + mediumtext_column STRING(MAX), + longtext_column STRING(MAX), + enum_column STRING(MAX), + bool_column BOOL, + other_bool_column BOOL, + bytes_column BYTES(MAX), + list_text_column JSON, + list_int_column JSON, + frozen_list_bigint_column JSON, + set_text_column JSON, + set_date_column JSON, + frozen_set_bool_column JSON, + map_text_to_int_column JSON, + map_date_to_text_column JSON, + frozen_map_int_to_bool_column JSON, + map_text_to_list_column JSON, + map_text_to_set_column JSON, + set_of_maps_column JSON, + list_of_sets_column JSON, + frozen_map_text_to_list_column JSON, + frozen_map_text_to_set_column JSON, + frozen_set_of_maps_column JSON, + frozen_list_of_sets_column JSON, + varint_column STRING(MAX), + inet_column STRING(MAX) +) PRIMARY KEY(varchar_column); + CREATE CHANGE STREAM allstream 
FOR ALL OPTIONS ( value_capture_type = 'NEW_ROW',