Migrate and Validate Tables between Origin and Target Cassandra Clusters.
Important
Please note this job has been tested with Spark version 3.5.6
- Get the latest image that includes all dependencies from DockerHub
- All migration tools (`cassandra-data-migrator` + `dsbulk` + `cqlsh`) would be available in the `/assets/` folder of the container
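For example, a quick way to pull the image and look around the `/assets/` folder might be the following (a sketch only; the image is assumed to be published on DockerHub as `datastax/cassandra-data-migrator`, so verify the exact repository and tag there):

```
# Pull the latest published CDM image (image name assumed; check DockerHub)
docker pull datastax/cassandra-data-migrator:latest

# Start an interactive shell in the container and list the bundled tools
docker run --rm -it datastax/cassandra-data-migrator:latest bash
ls /assets/
```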
- Download the latest jar file using one of the approaches below,
  - `curl -L0 https://github.com/datastax/cassandra-data-migrator/releases/download/x.y.z/cassandra-data-migrator-x.y.z.jar --output cassandra-data-migrator-x.y.z.jar` (OR)
  - Download from packages area here
- Java 11 (minimum), as Spark binaries are compiled with it.
- Spark `3.5.x` with Scala `2.13` and Hadoop `3.3`
  - Typically installed using this binary on a single VM (no cluster necessary) where you want to run this job. This simple setup is recommended for most one-time migrations.
  - However, we recommend using a Spark Cluster or a Spark Serverless platform like `Databricks` or `Google Dataproc` (that supports the above-mentioned versions) for large (e.g. several terabytes), complex migrations OR when CDM is used as a long-term data-transfer utility and not a one-time job.
Spark can be installed by running the following:
wget https://archive.apache.org/dist/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3-scala2.13.tgz
tar -xvzf spark-3.5.6-bin-hadoop3-scala2.13.tgz
Caution
If the above Spark and Scala versions do not match, you may see an exception like the one below when running the CDM jobs:
Exception in thread "main" java.lang.NoSuchMethodError: 'void scala.runtime.Statics.releaseFence()'
Note
If you only have Scala version 2.12.x available in your environment, you can update these two lines in `pom.xml` with the specific Scala 2.12.x version, build the jar locally as shown here, and use that CDM jar instead.
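As a sketch only (the property names below are assumptions for illustration; check the actual `pom.xml` for the two Scala-related lines), the change could look like this for Scala 2.12.19:

```
<!-- pom.xml: hypothetical property names shown for illustration only -->
<scala.version>2.12.19</scala.version>
<scala.main.version>2.12</scala.main.version>
```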
Note
When deploying CDM on a Spark cluster, replace the param `--master "local[*]"` with `--master "spark://master-host:port"` and remove any params (e.g. `--driver-memory`, `--executor-memory`, etc.) related to a single-VM run
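For example, the Migrate command shown later in this doc would then look roughly like this on a standalone Spark cluster (a sketch; master host and port are placeholders):

```
spark-submit --properties-file cdm.properties \
  --conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
  --master "spark://<master-host>:<port>" \
  --class com.datastax.cdm.job.Migrate cassandra-data-migrator-5.x.x.jar
```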
- `cdm.properties` file needs to be configured as applicable for the environment. The file can have any name, it does not need to be `cdm.properties`.
  - A sample properties file with default values can be found here as cdm.properties
  - A complete reference properties file with default values can be found here as cdm-detailed.properties
- Place the properties file where it can be accessed while running the job via `spark-submit`.
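As a rough illustration, a minimal properties file for a plain Cassandra-to-Cassandra run might contain entries like the following (a sketch only; the connection key names are assumptions, so verify them and the full set of options against the sample `cdm.properties` and `cdm-detailed.properties` referenced above):

```
# Origin cluster connection (assumed key names; see the sample cdm.properties)
spark.cdm.connect.origin.host        <origin-contact-point>
spark.cdm.connect.origin.port        9042
spark.cdm.connect.origin.username    <origin-username>
spark.cdm.connect.origin.password    <origin-password>

# Target cluster connection
spark.cdm.connect.target.host        <target-contact-point>
spark.cdm.connect.target.port        9042
spark.cdm.connect.target.username    <target-username>
spark.cdm.connect.target.password    <target-password>

# Table to migrate/validate (can also be passed via --conf at submit time)
spark.cdm.schema.origin.keyspaceTable    <keyspacename>.<tablename>
```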
- Run the job using the `spark-submit` command as shown below:
spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.Migrate cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
Note:
- The above command generates a log file `logfile_name_*.txt` to avoid log output on the console.
- Update the memory options (driver & executor memory) based on your use-case.
- To track details of a run (recorded on the `target` keyspace), pass param `--conf spark.cdm.trackRun=true`
- To filter records only for a specific token range, pass the below two additional params to the `Migration` OR `Validation` job
--conf spark.cdm.filter.cassandra.partition.min=<token-range-min>
--conf spark.cdm.filter.cassandra.partition.max=<token-range-max>
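Put together, a Migration run restricted to a single token range might look like this (a sketch; the min/max values are placeholders):

```
spark-submit --properties-file cdm.properties \
  --conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
  --conf spark.cdm.filter.cassandra.partition.min=<token-range-min> \
  --conf spark.cdm.filter.cassandra.partition.max=<token-range-max> \
  --master "local[*]" --driver-memory 25G --executor-memory 25G \
  --class com.datastax.cdm.job.Migrate cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```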
- To run the job in Data validation mode, use class option `--class com.datastax.cdm.job.DiffData` as shown below
spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.DiffData cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
- The Validation job will report differences as “ERRORS” in the log file as shown below.
23/04/06 08:43:06 ERROR DiffJobSession: Mismatch row found for key: [key3] Mismatch: Target Index: 1 Origin: valueC Target: value999)
23/04/06 08:43:06 ERROR DiffJobSession: Corrected mismatch row in target: [key3]
23/04/06 08:43:06 ERROR DiffJobSession: Missing target row found for key: [key2]
23/04/06 08:43:06 ERROR DiffJobSession: Inserted missing row in target: [key2]
- Please grep for all `ERROR` from the output log files to get the list of missing and mismatched records, e.g. as shown below.
  - Note that it lists differences by primary-key values.
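For example (assuming the log file name pattern used in the commands above):

```
# List all differences reported by the validation run
grep ERROR logfile_name_*.txt

# Narrow it down to the diff-reporting class only
grep "ERROR DiffJobSession" logfile_name_*.txt
```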
- If you would like to redirect such logs (rows with details of `missing` and `mismatched` rows) into a separate file, you could use the `log4j2.properties` file provided here as shown below
spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.executor.extraJavaOptions='-Dlog4j.configurationFile=log4j2.properties' \
--conf spark.driver.extraJavaOptions='-Dlog4j.configurationFile=log4j2.properties' \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.DiffData cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
- The Validation job can also be run in an AutoCorrect mode. This mode can
  - Add any missing records from `origin` to `target`
  - Update any mismatched records between `origin` and `target`
- Enable/disable this feature using one or both of the below params in the properties file
spark.cdm.autocorrect.missing false|true
spark.cdm.autocorrect.mismatch false|true
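If you prefer not to edit the properties file, the same settings can also be passed as `--conf` overrides on the `spark-submit` command line, e.g. (a sketch, reusing the validation command shown above):

```
spark-submit --properties-file cdm.properties \
  --conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
  --conf spark.cdm.autocorrect.missing=true \
  --conf spark.cdm.autocorrect.mismatch=true \
  --master "local[*]" --driver-memory 25G --executor-memory 25G \
  --class com.datastax.cdm.job.DiffData cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
```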
Important
The validation job will never delete records from `target`, i.e. it only adds or updates data on `target`
- You can rerun/resume the last Migration or Validation job that stopped (or completed with some errors) for any reason by setting `spark.cdm.trackRun.autoRerun` to `true` (default is `false`) as shown below. This will auto-discover the progress of the last job and will resume from that point, i.e. it will skip any token-ranges from the previous run that were migrated (or validated) successfully.
spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.cdm.trackRun.autoRerun=true \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
- You can also resume a specific earlier job (not the last one) that stopped (or completed with some errors) for any reason by passing a specific `spark.cdm.trackRun.previousRunId` as shown below
spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.cdm.trackRun.previousRunId=<prev_run_id> \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.<Migrate|DiffData> cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
- This mode can help identify large fields on an `origin` table that may break your cluster guardrails (e.g. AstraDB has a 10MB limit for a single large field). Use class option `--class com.datastax.cdm.job.GuardrailCheck` as shown below
spark-submit --properties-file cdm.properties \
--conf spark.cdm.schema.origin.keyspaceTable="<keyspacename>.<tablename>" \
--conf spark.cdm.feature.guardrail.colSizeInKB=10000 \
--master "local[*]" --driver-memory 25G --executor-memory 25G \
--class com.datastax.cdm.job.GuardrailCheck cassandra-data-migrator-5.x.x.jar &> logfile_name_$(date +%Y%m%d_%H_%M).txt
Note
This mode only operates on one database, i.e. `origin`; there is no `target` in this mode
- Auto-detects table schema (column names, types, keys, collections, UDTs, etc.)
  - Including Counter tables
- Rerun/Resume a previous job that may have stopped for any reason (killed, had exceptions, etc.)
  - If you rerun a `validation` job, it will include any token-ranges that had differences in the previous run
- Preserve writetimes and TTLs
- Supports migration/validation of advanced DataTypes (Sets, Lists, Maps, UDTs)
- Filter records from `Origin` using `writetime` and/or CQL conditions and/or a list of token-ranges
- Perform guardrail checks (identify large fields)
- Supports adding `constants` as new columns on `Target`
- Supports expanding `Map` columns on `Origin` into multiple records on `Target`
- Supports extracting a value from a JSON column in `Origin` and mapping it to a specific field on `Target`
- Can be deployed on a Spark Cluster or a single VM
- Fully containerized (Docker and K8s friendly)
- SSL Support (including custom cipher algorithms)
- Migrate from any Cassandra `Origin` (Apache Cassandra® / DataStax Enterprise™ / DataStax Astra DB™) to any Cassandra `Target` (Apache Cassandra® / DataStax Enterprise™ / DataStax Astra DB™)
- Automatic download of Secure Connect Bundles for Astra DB using the DevOps API
- Supports migration/validation from and to Azure Cosmos Cassandra
- Validate migration accuracy and performance using a smaller randomized data-set
- Supports adding custom fixed `writetime` and/or `ttl`
- Track run information (start-time, end-time, run-metrics, status, etc.) in tables (`cdm_run_info` and `cdm_run_details`) on the target keyspace
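When run-tracking is enabled, those tables can be inspected with plain CQL on the target cluster, for example (the keyspace name is a placeholder):

```
-- Summary of each tracked run
SELECT * FROM <target_keyspace>.cdm_run_info;

-- Per-token-range details of a run
SELECT * FROM <target_keyspace>.cdm_run_details LIMIT 20;
```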
- Each run (Migration or Validation) can be tracked (when enabled). You can find a summary and details of the same in tables `cdm_run_info` and `cdm_run_details` in the target keyspace.
- CDM does not migrate `ttl` & `writetime` at the field-level (for optimization reasons). It instead finds the field with the highest `ttl` & the field with the highest `writetime` within an `origin` row and uses those values on the entire `target` row.
- CDM ignores using collection and UDT fields for `ttl` & `writetime` calculations by default for performance reasons. If you want to include such fields, set the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true`.
- If a table has only collection and/or UDT non-key columns and no table-level `ttl` configuration, the target will have no `ttl`, which can lead to inconsistencies between `origin` and `target` as rows expire on `origin` due to `ttl` expiry. If you want to avoid this, we recommend setting the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
- If a table has only collection and/or UDT non-key columns, the `writetime` used on target will be the time the job was run. If you want to avoid this, we recommend setting the `spark.cdm.schema.ttlwritetime.calc.useCollections` param to `true` in such scenarios.
- CDM uses the `UNSET` value for null fields (including empty texts) to avoid creating (or carrying forward) tombstones during row creation.
- When CDM migration (or validation with autocorrect) is run multiple times on the same table (for whatever reason), it could lead to duplicate entries in `list` type columns. Note this is due to a Cassandra/DSE bug and not a CDM issue. This issue can be addressed by enabling and setting a positive value for the `spark.cdm.transform.custom.writetime.incrementBy` param; this param was specifically added to address this issue (see the property sketch after these notes).
- When you rerun a job to resume from a previous run, the run metrics (read, write, skipped, etc.) captured in table `cdm_run_info` will be only for the current run. If the previous run was killed for some reason, its run metrics may not have been saved. If the previous run did complete (not killed) but with errors, then you will have all run metrics from the previous run as well.
- When running on a Spark Cluster (and not a single VM), the rate-limit values (`spark.cdm.perfops.ratelimit.origin` & `spark.cdm.perfops.ratelimit.target`) apply to individual Spark worker nodes. Hence this value should be set to effective-rate-limit-you-need/number-of-spark-worker-nodes. E.g. if you need an effective rate-limit of 10000 and the number of Spark worker nodes is 4, then you should set the above rate-limit params to a value of 2500.
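For reference, the properties called out in the notes above would be set in the properties file along these lines (values are illustrative only):

```
# Include collection/UDT fields in ttl & writetime calculations
spark.cdm.schema.ttlwritetime.calc.useCollections    true

# Positive increment to avoid duplicate list entries when re-running migrations
spark.cdm.transform.custom.writetime.incrementBy     1000

# Per-worker rate limits (e.g. effective limit of 10000 across 4 workers)
spark.cdm.perfops.ratelimit.origin                   2500
spark.cdm.perfops.ratelimit.target                   2500
```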
The below recommendations may only be useful when migrating large tables where the default performance is not good enough.
- Performance bottlenecks are usually the result of
  - Low resource availability on the `Origin` OR `Target` cluster
  - Low resource availability on CDM VMs, see recommendations here
  - Bad schema design, which could be caused by an out-of-balance `Origin` cluster, large partitions (> 100 MB), large rows (> 10MB) and/or high column count.
- Incorrect configuration of the below properties may negatively impact performance (see the illustrative sketch below this list)
  - `numParts`: Default is 5K, but the ideal value is usually around table-size/10MB.
  - `batchSize`: Default is 5, but this should be set to 1 for tables where primary-key=partition-key OR where average row-size is > 20 KB. Similarly, this should be set to a value > 5 if row-size is small (< 1KB) and most partitions have several rows (100+).
  - `fetchSizeInRows`: Default is 1K and this usually works fine. However, you can reduce this as needed if your table has many large rows (over 100KB).
  - `ratelimit`: Default is `20000`, but this property should usually be updated (after updating other properties) to the highest possible value that your `origin` and `target` clusters can efficiently handle.
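As an illustration only, tuning for a large (~320 GB) table with small rows might look like the following. The short names above are assumed to live under the `spark.cdm.perfops.*` prefix used for the rate-limit properties, so verify the exact keys against `cdm-detailed.properties`:

```
# Illustrative values; key names assumed under spark.cdm.perfops.* - verify in cdm-detailed.properties
# ~320 GB table => roughly table-size / 10MB parts
spark.cdm.perfops.numParts             32000
# Small rows (< 1KB) with many rows per partition => batch more rows per write
spark.cdm.perfops.batchSize            10
# Default of 1K rows per fetch usually works; lower it only for very large rows
spark.cdm.perfops.fetchSizeInRows      1000
# Raise the rate limit to what both clusters can comfortably sustain
spark.cdm.perfops.ratelimit.origin     40000
spark.cdm.perfops.ratelimit.target     40000
```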
- Using schema manipulation features (like `constantColumns`, `explodeMap`, `extractJson`), transformation functions and/or where-filter-conditions (except partition min/max) may negatively impact performance
- We typically recommend this infrastructure for CDM VMs and this starter conf. You can then optimize the job further based on the CDM params info provided above and the observed load and throughput on the `Origin` and `Target` clusters
- We recommend using a Spark Cluster or a Spark Serverless platform like `Databricks` or `Google Dataproc` for large (e.g. several terabytes), complex migrations OR when CDM is used as a long-term data-transfer utility and not a one-time job.
Note
For additional performance tuning, refer to details mentioned in the cdm-detailed.properties file here
- Clone this repo
- Move to the repo folder: `cd cassandra-data-migrator`
- Run the build: `mvn clean package` (needs Maven 3.9.x)
- The fat jar (`cassandra-data-migrator-5.x.x.jar`) file should now be present in the `target` folder
Check out all our wonderful contributors here.