diff --git a/connectors/db/src/main/java/com/redislabs/riot/db/DataSourceOptions.java b/connectors/db/src/main/java/com/redislabs/riot/db/DataSourceOptions.java
index fa0d328b8..bae20b6d0 100644
--- a/connectors/db/src/main/java/com/redislabs/riot/db/DataSourceOptions.java
+++ b/connectors/db/src/main/java/com/redislabs/riot/db/DataSourceOptions.java
@@ -2,12 +2,14 @@
 
 import lombok.AllArgsConstructor;
 import lombok.Builder;
+import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
 import picocli.CommandLine.Option;
 
 import javax.sql.DataSource;
 
+@Data
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
diff --git a/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseExportCommand.java b/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseExportCommand.java
index dd6723835..9c5e771f1 100644
--- a/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseExportCommand.java
+++ b/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseExportCommand.java
@@ -10,6 +10,7 @@
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Mixin;
 
+import javax.sql.DataSource;
 import java.util.Map;
 
 @Slf4j
@@ -23,10 +24,13 @@ public class DatabaseExportCommand extends AbstractExportCommand<Map<String, Object>> {
 
     @Override
     protected Flow flow() throws Exception {
+        DataSource dataSource = dataSourceOptions.dataSource();
         JdbcBatchItemWriterBuilder<Map<String, Object>> builder = new JdbcBatchItemWriterBuilder<>();
         builder.itemSqlParameterSourceProvider(MapSqlParameterSource::new);
-        log.info("Creating data source {}", dataSourceOptions);
-        builder.dataSource(dataSourceOptions.dataSource());
+        builder.dataSource(dataSource);
         builder.sql(exportOptions.getSql());
         builder.assertUpdates(exportOptions.isAssertUpdates());
         JdbcBatchItemWriter<Map<String, Object>> writer = builder.build();
diff --git a/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseImportCommand.java b/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseImportCommand.java
index a9bf9e411..8dfbd614d 100644
--- a/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseImportCommand.java
+++ b/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseImportCommand.java
@@ -2,6 +2,7 @@
 
 import com.redislabs.riot.AbstractImportCommand;
 import com.redislabs.riot.KeyValueProcessingOptions;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.batch.core.job.flow.Flow;
 import org.springframework.batch.item.ItemProcessor;
 import org.springframework.batch.item.database.JdbcCursorItemReader;
@@ -13,11 +14,12 @@
 import javax.sql.DataSource;
 import java.util.Map;
 
+@Slf4j
 @Command(name = "import", description = "Import from a database")
 public class DatabaseImportCommand extends AbstractImportCommand<Map<String, Object>, Map<String, Object>> {
 
     @Mixin
-    private DataSourceOptions options = DataSourceOptions.builder().build();
+    private DataSourceOptions dataSourceOptions = DataSourceOptions.builder().build();
     @Mixin
     private DatabaseImportOptions importOptions = DatabaseImportOptions.builder().build();
     @Mixin
@@ -25,7 +27,10 @@ public class DatabaseImportCommand extends AbstractImportCommand<Map<String, Object>, Map<String, Object>> {
 
     @Override
     protected Flow flow() throws Exception {
-        DataSource dataSource = options.dataSource();
+        log.info("Creating data source {}", dataSourceOptions);
+        DataSource dataSource = dataSourceOptions.dataSource();
+        String name = dataSource.getConnection().getMetaData().getDatabaseProductName();
         JdbcCursorItemReaderBuilder<Map<String, Object>> builder = new JdbcCursorItemReaderBuilder<>();
         builder.saveState(false);
         builder.dataSource(dataSource);
@@ -35,7 +40,7 @@ protected Flow flow() throws Exception {
         if (importOptions.getMaxRows() != null) {
             builder.maxRows(importOptions.getMaxRows());
         }
-        builder.name("database-reader");
+        builder.name(name + "-database-reader");
         if (importOptions.getQueryTimeout() != null) {
             builder.queryTimeout(importOptions.getQueryTimeout());
         }
@@ -45,7 +50,6 @@ protected Flow flow() throws Exception {
         builder.verifyCursorPosition(importOptions.isVerifyCursorPosition());
         JdbcCursorItemReader<Map<String, Object>> reader = builder.build();
         reader.afterPropertiesSet();
-        String name = dataSource.getConnection().getMetaData().getDatabaseProductName();
        return flow(step(name + "-db-import-step", "Importing from " + name, reader).build());
     }
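The import command above now derives its step name from JDBC metadata instead of a hard-coded "database-reader". A minimal sketch of that lookup follows (hypothetical StepNaming helper; any JDBC DataSource works); note the sketch closes the connection with try-with-resources, whereas the diff calls dataSource.getConnection() without closing it:

    import java.sql.Connection;
    import java.sql.SQLException;
    import javax.sql.DataSource;

    class StepNaming {
        // The JDBC product name (e.g. "PostgreSQL", "MySQL") becomes part of
        // the Spring Batch step name, so logs identify the source database.
        static String importStepName(DataSource dataSource) throws SQLException {
            try (Connection connection = dataSource.getConnection()) {
                return connection.getMetaData().getDatabaseProductName() + "-db-import-step";
            }
        }
    }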
diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/AbstractFileImportCommand.java b/connectors/file/src/main/java/com/redislabs/riot/file/AbstractFileImportCommand.java
index 10851912a..1b271dfa4 100644
--- a/connectors/file/src/main/java/com/redislabs/riot/file/AbstractFileImportCommand.java
+++ b/connectors/file/src/main/java/com/redislabs/riot/file/AbstractFileImportCommand.java
@@ -6,26 +6,33 @@
 import org.springframework.batch.core.job.flow.Flow;
 import org.springframework.batch.item.support.AbstractItemStreamItemReader;
 import org.springframework.core.io.Resource;
+import org.springframework.util.ObjectUtils;
 import picocli.CommandLine;
 import picocli.CommandLine.Command;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.DirectoryStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.List;
 
 @Command
 public abstract class AbstractFileImportCommand<T> extends AbstractImportCommand<T, T> {
 
+    @SuppressWarnings("unused")
     @CommandLine.Parameters(arity = "1..*", description = "One ore more files or URLs", paramLabel = "FILE")
-    private String[] files = new String[0];
+    private String[] files;
     @Getter
     @CommandLine.Mixin
     private FileOptions fileOptions = FileOptions.builder().build();
 
     @Override
     protected Flow flow() throws Exception {
-        String[] expandedFiles = FileUtils.expand(files);
-        if (expandedFiles.length == 0) {
+        List<String> expandedFiles = FileUtils.expand(files);
+        if (ObjectUtils.isEmpty(expandedFiles)) {
             throw new FileNotFoundException("File not found: " + String.join(", ", files));
         }
         List<Step> steps = new ArrayList<>();
diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/DataStructureFileImportCommand.java b/connectors/file/src/main/java/com/redislabs/riot/file/DataStructureFileImportCommand.java
index 652075ec9..d1576c900 100644
--- a/connectors/file/src/main/java/com/redislabs/riot/file/DataStructureFileImportCommand.java
+++ b/connectors/file/src/main/java/com/redislabs/riot/file/DataStructureFileImportCommand.java
@@ -18,9 +18,11 @@
 import org.springframework.batch.item.support.AbstractItemStreamItemReader;
 import org.springframework.batch.item.xml.XmlItemReader;
 import org.springframework.core.io.Resource;
+import org.springframework.util.ObjectUtils;
 import picocli.CommandLine;
 import picocli.CommandLine.Command;
 
+import java.io.FileNotFoundException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
@@ -38,7 +40,10 @@ public class DataStructureFileImportCommand extends AbstractTransferCommand<DataStructure<String>, DataStructure<String>> {
 
     @Override
     protected Flow flow() throws Exception {
-        String[] expandedFiles = FileUtils.expand(files);
+        List<String> expandedFiles = FileUtils.expand(files);
+        if (ObjectUtils.isEmpty(expandedFiles)) {
+            throw new FileNotFoundException("File not found: " + String.join(", ", files));
+        }
         List<Step> steps = new ArrayList<>();
         DataStructureItemProcessor processor = new DataStructureItemProcessor();
         for (String file : expandedFiles) {
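Both file import commands now guard against a null or empty expansion result with a single ObjectUtils.isEmpty check. A standalone sketch of the guard (hypothetical ExpandGuard class; spring-core assumed on the classpath):

    import java.io.FileNotFoundException;
    import java.util.List;
    import org.springframework.util.ObjectUtils;

    class ExpandGuard {
        // FileUtils.expand may now return null (null input) or an empty list
        // (nothing matched); ObjectUtils.isEmpty covers both with one check.
        static List<String> requireFiles(List<String> expandedFiles, String... files) throws FileNotFoundException {
            if (ObjectUtils.isEmpty(expandedFiles)) {
                throw new FileNotFoundException("File not found: " + String.join(", ", files));
            }
            return expandedFiles;
        }
    }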
diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/FileImportOptions.java b/connectors/file/src/main/java/com/redislabs/riot/file/FileImportOptions.java
index 770d44134..60d9c26d6 100644
--- a/connectors/file/src/main/java/com/redislabs/riot/file/FileImportOptions.java
+++ b/connectors/file/src/main/java/com/redislabs/riot/file/FileImportOptions.java
@@ -1,42 +1,34 @@
 package com.redislabs.riot.file;
 
-import lombok.*;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
 import org.springframework.batch.item.file.transform.Range;
 import picocli.CommandLine.Option;
 
-import java.util.ArrayList;
-import java.util.List;
-
+@Data
 @Builder
 @AllArgsConstructor
 @NoArgsConstructor
 public class FileImportOptions {
 
-    @Getter
-    @Builder.Default
     @Option(names = "--fields", arity = "1..*", description = "Delimited/FW field names", paramLabel = "")
-    private List<String> names = new ArrayList<>();
+    private String[] names;
     @Option(names = {"-h", "--header"}, description = "Delimited/FW first line contains field names")
     private boolean header;
     @Option(names = "--delimiter", description = "Delimiter character", paramLabel = "")
     private String delimiter;
     @Option(names = "--skip", description = "Delimited/FW lines to skip at start", paramLabel = "")
     private Integer linesToSkip;
-    @Getter
-    @Builder.Default
     @Option(names = "--include", arity = "1..*", description = "Delimited/FW field indices to include (0-based)", paramLabel = "")
-    private List<Integer> includedFields = new ArrayList<>();
-    @Getter
-    @Builder.Default
+    private int[] includedFields;
     @Option(names = "--ranges", arity = "1..*", description = "Fixed-width column ranges", paramLabel = "")
-    private List<Range> columnRanges = new ArrayList<>();
-    @Getter
+    private Range[] columnRanges;
     @Builder.Default
     @Option(names = "--quote", description = "Escape character for delimited files (default: ${DEFAULT-VALUE})", paramLabel = "")
     private Character quoteCharacter = DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER;
-    @Getter
     @Builder.Default
     @Option(names = "--continuation", description = "Line continuation string (default: ${DEFAULT-VALUE})", paramLabel = "")
     private String continuationString = "\\";
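The pattern applied to FileImportOptions recurs throughout this change set: @Data replaces per-field @Getter, and option fields lose their empty-collection defaults, so an unset option is now null rather than empty. A minimal sketch of the style (hypothetical SampleOptions; Lombok and picocli assumed):

    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    import picocli.CommandLine;
    import picocli.CommandLine.Option;

    @Data
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    class SampleOptions {
        // No initializer: stays null unless --fields is passed, so consumers
        // must null-check (ObjectUtils.isEmpty) instead of assuming empty.
        @Option(names = "--fields", arity = "1..*")
        private String[] names;
    }

    class SampleOptionsDemo {
        public static void main(String[] args) {
            SampleOptions options = new SampleOptions();
            new CommandLine(options).parseArgs("--fields", "id", "name");
            System.out.println(String.join(",", options.getNames())); // id,name
        }
    }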
diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/FileUtils.java b/connectors/file/src/main/java/com/redislabs/riot/file/FileUtils.java
index 0c293f48d..bfdd10a04 100644
--- a/connectors/file/src/main/java/com/redislabs/riot/file/FileUtils.java
+++ b/connectors/file/src/main/java/com/redislabs/riot/file/FileUtils.java
@@ -233,8 +233,10 @@ public static <T> XmlItemReader<T> xmlReader(Resource resource, Class<T> clazz) {
         return xmlReaderBuilder.build();
     }
 
-
-    public static String[] expand(String... files) throws IOException {
+    public static List<String> expand(String... files) throws IOException {
+        if (files == null) {
+            return null;
+        }
         List<String> expandedFiles = new ArrayList<>();
         for (String file : files) {
             if (isFile(file)) {
@@ -254,7 +256,7 @@ public static String[] expand(String... files) throws IOException {
                 expandedFiles.add(file);
             }
         }
-        return expandedFiles.toArray(new String[0]);
+        return expandedFiles;
     }
 
 }
diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/KeyValueFileImportCommand.java b/connectors/file/src/main/java/com/redislabs/riot/file/KeyValueFileImportCommand.java
index cc2eb1b70..79be95da3 100644
--- a/connectors/file/src/main/java/com/redislabs/riot/file/KeyValueFileImportCommand.java
+++ b/connectors/file/src/main/java/com/redislabs/riot/file/KeyValueFileImportCommand.java
@@ -12,12 +12,12 @@
 import org.springframework.batch.item.file.transform.AbstractLineTokenizer;
 import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
 import org.springframework.batch.item.file.transform.FixedLengthTokenizer;
-import org.springframework.batch.item.file.transform.Range;
 import org.springframework.batch.item.json.JsonItemReader;
 import org.springframework.batch.item.support.AbstractItemStreamItemReader;
 import org.springframework.batch.item.xml.XmlItemReader;
 import org.springframework.core.io.Resource;
 import org.springframework.util.Assert;
+import org.springframework.util.ObjectUtils;
 import picocli.CommandLine;
 import picocli.CommandLine.Command;
@@ -40,15 +40,15 @@ protected AbstractItemStreamItemReader<Map<String, Object>> reader(String file, Resource resource) {
             case DELIMITED:
                 DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
                 tokenizer.setDelimiter(options.delimiter(file));
                 tokenizer.setQuoteCharacter(options.getQuoteCharacter());
-                if (!options.getIncludedFields().isEmpty()) {
-                    tokenizer.setIncludedFields(options.getIncludedFields().stream().mapToInt(i -> i).toArray());
+                if (!ObjectUtils.isEmpty(options.getIncludedFields())) {
+                    tokenizer.setIncludedFields(options.getIncludedFields());
                 }
                 log.info("Creating delimited reader with {} for file {}", options, file);
                 return flatFileReader(resource, tokenizer);
             case FIXED:
                 FixedLengthTokenizer fixedLengthTokenizer = new FixedLengthTokenizer();
                 Assert.notEmpty(options.getColumnRanges(), "Column ranges are required");
-                fixedLengthTokenizer.setColumns(options.getColumnRanges().toArray(new Range[0]));
+                fixedLengthTokenizer.setColumns(options.getColumnRanges());
                 log.info("Creating fixed-width reader with {} for file {}", options, file);
                 return flatFileReader(resource, fixedLengthTokenizer);
             case JSON:
@@ -67,7 +67,9 @@ protected ItemProcessor<Map<String, Object>, Map<String, Object>> processor() {
     }
 
     private FlatFileItemReader<Map<String, Object>> flatFileReader(Resource resource, AbstractLineTokenizer tokenizer) {
-        tokenizer.setNames(options.getNames().toArray(new String[0]));
+        if (!ObjectUtils.isEmpty(options.getNames())) {
+            tokenizer.setNames(options.getNames());
+        }
         FlatFileItemReaderBuilder<Map<String, Object>> builder = new FlatFileItemReaderBuilder<>();
         builder.resource(resource);
         builder.encoding(getFileOptions().getEncoding());
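With columnRanges now a Range[], the fixed-width branch passes the option value straight to setColumns. A runnable sketch of what that tokenizer does (spring-batch-infrastructure assumed on the classpath):

    import org.springframework.batch.item.file.transform.FieldSet;
    import org.springframework.batch.item.file.transform.FixedLengthTokenizer;
    import org.springframework.batch.item.file.transform.Range;

    class FixedWidthDemo {
        public static void main(String[] args) {
            FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();
            tokenizer.setNames("id", "name");
            // Ranges are 1-based and inclusive: columns 1-3 and 4-9.
            tokenizer.setColumns(new Range(1, 3), new Range(4, 9));
            FieldSet fields = tokenizer.tokenize("001ABCDEF");
            System.out.println(fields.readString("name")); // ABCDEF
        }
    }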
diff --git a/connectors/gen/src/main/java/com/redislabs/riot/gen/GenerateCommand.java b/connectors/gen/src/main/java/com/redislabs/riot/gen/GenerateCommand.java
index adeb2db3d..f1b5bf207 100644
--- a/connectors/gen/src/main/java/com/redislabs/riot/gen/GenerateCommand.java
+++ b/connectors/gen/src/main/java/com/redislabs/riot/gen/GenerateCommand.java
@@ -4,42 +4,28 @@
 import com.redislabs.riot.AbstractImportCommand;
 import com.redislabs.riot.KeyValueProcessingOptions;
 import io.lettuce.core.RedisURI;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.batch.core.job.flow.Flow;
 import org.springframework.batch.item.ItemProcessor;
 import picocli.CommandLine;
 import picocli.CommandLine.Command;
-import picocli.CommandLine.Option;
-import picocli.CommandLine.Parameters;
 
 import java.util.LinkedHashMap;
-import java.util.Locale;
 import java.util.Map;
 
+@Slf4j
 @Command(name = "import", description = "Import generated data")
 public class GenerateCommand extends AbstractImportCommand<Map<String, Object>, Map<String, Object>> {
 
-    @Parameters(description = "SpEL expressions", paramLabel = "SPEL")
-    private Map<String, String> fakerFields = new LinkedHashMap<>();
-    @SuppressWarnings("unused")
-    @Option(names = "--introspect", description = "Use given search index to introspect Faker fields", paramLabel = "")
-    private String fakerIndex;
-    @Option(names = "--locale", description = "Faker locale (default: ${DEFAULT-VALUE})", paramLabel = "")
-    private Locale locale = Locale.ENGLISH;
-    @SuppressWarnings("unused")
-    @Option(names = "--metadata", description = "Include metadata (index, partition)")
-    private boolean includeMetadata;
-    @Option(names = "--start", description = "Start index (default: ${DEFAULT-VALUE})", paramLabel = "")
-    private long start = 0;
-    @Option(names = "--end", description = "End index (default: ${DEFAULT-VALUE})", paramLabel = "")
-    private long end = 1000;
-    @Option(names = "--sleep", description = "Duration in ms to sleep before each item generation (default: ${DEFAULT-VALUE})", paramLabel = "")
-    private long sleep = 0;
+    @CommandLine.Mixin
+    private GenerateOptions options = GenerateOptions.builder().build();
     @CommandLine.Mixin
     private KeyValueProcessingOptions processingOptions = KeyValueProcessingOptions.builder().build();
 
     @Override
     protected Flow flow() {
-        FakerItemReader reader = FakerItemReader.builder().locale(locale).includeMetadata(includeMetadata).fields(fakerFields(getRedisURI())).start(start).end(end).sleep(sleep).build();
+        log.info("Creating Faker reader with {}", options);
+        FakerItemReader reader = FakerItemReader.builder().locale(options.getLocale()).includeMetadata(options.isIncludeMetadata()).fields(fakerFields(getRedisURI())).start(options.getStart()).end(options.getEnd()).sleep(options.getSleep()).build();
         return flow(step("generate-step", "Generating", reader).build());
     }
@@ -57,14 +43,14 @@ private String expression(Field field) {
     }
 
     private Map<String, String> fakerFields(RedisURI uri) {
-        Map<String, String> fields = new LinkedHashMap<>(fakerFields);
-        if (fakerIndex == null) {
+        Map<String, String> fields = options.getFakerFields() == null ? new LinkedHashMap<>() : new LinkedHashMap<>(options.getFakerFields());
+        if (options.getFakerIndex() == null) {
             return fields;
         }
         RediSearchClient client = RediSearchClient.create(uri);
         try (StatefulRediSearchConnection<String, String> connection = client.connect()) {
             RediSearchCommands<String, String> commands = connection.sync();
-            IndexInfo info = RediSearchUtils.getInfo(commands.ftInfo(fakerIndex));
+            IndexInfo info = RediSearchUtils.getInfo(commands.ftInfo(options.getFakerIndex()));
             for (Field field : info.getFields()) {
                 fields.put(field.getName(), expression(field));
             }
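GenerateCommand illustrates the mixin extraction driving most of these diffs: inline @Option/@Parameters fields move into a @Data/@Builder options class that the command pulls back in through @Mixin. A sketch of how such a mixin is consumed (hypothetical DemoCommand; GenerateOptions is the class added below):

    import picocli.CommandLine;
    import picocli.CommandLine.Command;
    import picocli.CommandLine.Mixin;

    @Command(name = "demo")
    class DemoCommand implements Runnable {

        // picocli injects the mixin's options as if they were declared inline.
        @Mixin
        private GenerateOptions options = GenerateOptions.builder().build();

        @Override
        public void run() {
            System.out.println(options.getStart() + ".." + options.getEnd());
        }
    }

Running new CommandLine(new DemoCommand()).execute("--end", "10") would print 0..10.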
diff --git a/connectors/gen/src/main/java/com/redislabs/riot/gen/GenerateOptions.java b/connectors/gen/src/main/java/com/redislabs/riot/gen/GenerateOptions.java
new file mode 100644
index 000000000..90eb92e63
--- /dev/null
+++ b/connectors/gen/src/main/java/com/redislabs/riot/gen/GenerateOptions.java
@@ -0,0 +1,38 @@
+package com.redislabs.riot.gen;
+
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import picocli.CommandLine;
+
+import java.util.Locale;
+import java.util.Map;
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class GenerateOptions {
+
+    @CommandLine.Parameters(description = "SpEL expressions", paramLabel = "SPEL")
+    private Map<String, String> fakerFields;
+    @SuppressWarnings("unused")
+    @CommandLine.Option(names = "--introspect", description = "Use given search index to introspect Faker fields", paramLabel = "")
+    private String fakerIndex;
+    @Builder.Default
+    @CommandLine.Option(names = "--locale", description = "Faker locale (default: ${DEFAULT-VALUE})", paramLabel = "")
+    private Locale locale = Locale.ENGLISH;
+    @SuppressWarnings("unused")
+    @CommandLine.Option(names = "--metadata", description = "Include metadata (index, partition)")
+    private boolean includeMetadata;
+    @Builder.Default
+    @CommandLine.Option(names = "--start", description = "Start index (default: ${DEFAULT-VALUE})", paramLabel = "")
+    private long start = 0;
+    @Builder.Default
+    @CommandLine.Option(names = "--end", description = "End index (default: ${DEFAULT-VALUE})", paramLabel = "")
+    private long end = 1000;
+    @Builder.Default
+    @CommandLine.Option(names = "--sleep", description = "Duration in ms to sleep before each item generation (default: ${DEFAULT-VALUE})", paramLabel = "")
+    private long sleep = 0;
+}
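Note the role of @Builder.Default in the new class: without it, Lombok's generated builder ignores field initializers, so GenerateOptions.builder().build() would silently zero the defaults. A quick check (assumes the class above on the classpath):

    class BuilderDefaultDemo {
        public static void main(String[] args) {
            GenerateOptions options = GenerateOptions.builder().build();
            System.out.println(options.getEnd());    // 1000, preserved by @Builder.Default
            System.out.println(options.getLocale()); // en
        }
    }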
diff --git a/connectors/redis/src/main/java/com/redislabs/riot/redis/ReplicateCommand.java b/connectors/redis/src/main/java/com/redislabs/riot/redis/ReplicateCommand.java
index 22f72762c..0825632e3 100644
--- a/connectors/redis/src/main/java/com/redislabs/riot/redis/ReplicateCommand.java
+++ b/connectors/redis/src/main/java/com/redislabs/riot/redis/ReplicateCommand.java
@@ -9,6 +9,7 @@
 import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
 import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
 import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.springframework.batch.core.job.builder.FlowBuilder;
 import org.springframework.batch.core.job.flow.Flow;
@@ -27,13 +28,14 @@
 
 import java.time.Duration;
 
+@Slf4j
 @Command(name = "replicate", description = "Replicate a source Redis database to a target Redis database")
 public class ReplicateCommand extends AbstractTransferCommand<KeyValue<byte[]>, KeyValue<byte[]>> {
 
     @CommandLine.ArgGroup(exclusive = false, heading = "Target Redis connection options%n")
-    private RedisOptions targetRedis = RedisOptions.builder().build();
+    private RedisOptions targetRedisOptions = RedisOptions.builder().build();
     @CommandLine.ArgGroup(exclusive = false, heading = "Source Redis reader options%n")
-    private RedisReaderOptions options = RedisReaderOptions.builder().build();
+    private RedisReaderOptions readerOptions = RedisReaderOptions.builder().build();
     @SuppressWarnings("unused")
     @Option(names = "--live", description = "Enable live replication.")
     private boolean live;
@@ -53,8 +55,8 @@ public class ReplicateCommand extends AbstractTransferCommand<KeyValue<byte[]>, KeyValue<byte[]>> {
 
     private StatefulRedisPubSubConnection<String, String> pubSubConnection(AbstractRedisClient client) {
         if (client instanceof RedisClusterClient) {
+            log.info("Establishing Redis cluster pub/sub connection");
             return ((RedisClusterClient) client).connectPubSub();
         }
+        log.info("Establishing Redis pub/sub connection");
         return ((RedisClient) client).connectPubSub();
     }
@@ -97,6 +101,7 @@ protected Flow flow() {
         StepBuilder<KeyValue<byte[]>, KeyValue<byte[]>> liveReplicationStep = stepBuilder("live-replication-step", liveReplicationName);
         KeyDumpItemReader<String> liveReader = liveReader();
         liveReader.setName("Live" + ClassUtils.getShortName(liveReader.getClass()));
+        log.info("Configuring live transfer with {}", flushingOptions);
         SimpleFlow liveFlow = flow(liveReplicationName).start(flushingOptions.configure(liveReplicationStep.reader(liveReader).writer(targetKeyDumpWriter()).build()).build()).build();
         flow = flow(liveReplicationName).split(new SimpleAsyncTaskExecutor()).add(liveFlow, flow.build());
     }
@@ -118,8 +123,9 @@ private String message(KeyComparisonItemWriter<String> writer) {
 
     @SuppressWarnings("unchecked")
     private KeyComparisonItemWriter<String> comparisonWriter() {
+        log.info("Creating key comparator with TTL tolerance of {} seconds", ttlTolerance);
         Duration ttlToleranceDuration = Duration.ofSeconds(ttlTolerance);
-        if (targetRedis.isCluster()) {
+        if (targetRedisOptions.isCluster()) {
             DataStructureItemReader<String, String> targetReader = configureScanReader(DataStructureItemReader.builder((GenericObjectPool<StatefulRedisClusterConnection<String, String>>) targetPool, (StatefulRedisClusterConnection<String, String>) targetConnection)).build();
             return new KeyComparisonItemWriter<>(targetReader, ttlToleranceDuration);
         }
@@ -152,29 +158,35 @@ private KeyDumpItemReader<String> liveReader() {
     }
 
     private <B extends ScanKeyValueItemReaderBuilder<?>> B configureScanReader(B builder) {
-        configureReader(builder.scanMatch(options.getScanMatch()).scanCount(options.getScanCount()).sampleSize(options.getSampleSize()));
+        log.info("Configuring scan reader with {}", readerOptions);
+        configureReader(builder.scanMatch(readerOptions.getScanMatch()).scanCount(readerOptions.getScanCount()).sampleSize(readerOptions.getSampleSize()));
         return builder;
     }
 
     private <B extends LiveKeyValueItemReaderBuilder<?>> B configureLiveReader(B builder) {
-        configureReader(builder.keyPattern(options.getScanMatch()).notificationQueueCapacity(notificationQueueCapacity).database(getRedisURI().getDatabase()).flushingInterval(flushingOptions.getFlushIntervalDuration()).idleTimeout(flushingOptions.getIdleTimeoutDuration()));
+        log.info("Configuring live reader with {}, {}, queueCapacity={}", readerOptions, flushingOptions, notificationQueueCapacity);
+        configureReader(builder.keyPattern(readerOptions.getScanMatch()).notificationQueueCapacity(notificationQueueCapacity).database(getRedisURI().getDatabase()).flushingInterval(flushingOptions.getFlushIntervalDuration()).idleTimeout(flushingOptions.getIdleTimeoutDuration()));
         return builder;
     }
 
     private <B extends AbstractKeyValueItemReaderBuilder<?>> void configureReader(B builder) {
-        configureCommandTimeoutBuilder(builder.threadCount(options.getThreads()).chunkSize(options.getBatchSize()).queueCapacity(options.getQueueCapacity()));
+        configureCommandTimeoutBuilder(builder.threadCount(readerOptions.getThreads()).chunkSize(readerOptions.getBatchSize()).queueCapacity(readerOptions.getQueueCapacity()));
     }
 
     @SuppressWarnings("unchecked")
     private ItemWriter<KeyValue<byte[]>> targetKeyDumpWriter() {
-        if (targetRedis.isCluster()) {
-            return KeyDumpItemWriter.clusterBuilder((GenericObjectPool<StatefulRedisClusterConnection<String, String>>) targetPool).replace(true).commandTimeout(getTargetCommandTimeout()).build();
+        if (targetRedisOptions.isCluster()) {
+            log.info("Creating Redis cluster key dump writer");
+            return configure(KeyDumpItemWriter.clusterBuilder((GenericObjectPool<StatefulRedisClusterConnection<String, String>>) targetPool)).build();
         }
-        return KeyDumpItemWriter.builder((GenericObjectPool<StatefulRedisConnection<String, String>>) targetPool).replace(true).commandTimeout(getTargetCommandTimeout()).build();
+        log.info("Creating Redis key dump writer");
+        return configure(KeyDumpItemWriter.builder((GenericObjectPool<StatefulRedisConnection<String, String>>) targetPool)).build();
     }
 
-    private Duration getTargetCommandTimeout() {
-        return targetRedis.uris().get(0).getTimeout();
+    private KeyDumpItemWriter.KeyDumpItemWriterBuilder configure(KeyDumpItemWriter.KeyDumpItemWriterBuilder builder) {
+        Duration commandTimeout = targetRedisOptions.uris().get(0).getTimeout();
+        log.info("Setting key dump writer command timeout to {}", commandTimeout);
+        return builder.replace(true).commandTimeout(commandTimeout);
     }
 }
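The comparison writer is built with a TTL tolerance, since source and target TTLs drift while the comparison runs. The diff does not show the rule itself; what follows is a hypothetical mirror of what such a tolerance check implies, not the project's implementation:

    import java.time.Duration;

    class TtlToleranceDemo {
        // Hypothetical: TTLs count as equal when they differ by no more than
        // the tolerance, absorbing scan latency between the two reads.
        static boolean ttlMatches(long sourceTtlSeconds, long targetTtlSeconds, Duration tolerance) {
            return Math.abs(sourceTtlSeconds - targetTtlSeconds) <= tolerance.getSeconds();
        }

        public static void main(String[] args) {
            System.out.println(ttlMatches(100, 99, Duration.ofSeconds(1))); // true
            System.out.println(ttlMatches(100, 97, Duration.ofSeconds(1))); // false
        }
    }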
diff --git a/connectors/stream/src/main/java/com/redislabs/riot/stream/KafkaOptions.java b/connectors/stream/src/main/java/com/redislabs/riot/stream/KafkaOptions.java
index 872b27dd9..9397eede1 100644
--- a/connectors/stream/src/main/java/com/redislabs/riot/stream/KafkaOptions.java
+++ b/connectors/stream/src/main/java/com/redislabs/riot/stream/KafkaOptions.java
@@ -13,6 +13,7 @@
 import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Producer;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.ObjectUtils;
 import org.springframework.util.unit.DataSize;
 import picocli.CommandLine.Option;
 
@@ -36,9 +37,8 @@ public enum SerDe {
     @Getter
     @Option(names = "--schema-registry-url", description = "Schema registry URL", paramLabel = "")
     private String schemaRegistryUrl;
-    @Builder.Default
     @Option(names = {"-p", "--property"}, arity = "1..*", description = "Additional consumer properties", paramLabel = "")
-    private Map<String, String> properties = new HashMap<>();
+    private Map<String, String> properties;
     @Builder.Default
     @Option(names = "--serde", description = "Serializer/Deserializer: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE})", paramLabel = "")
     private SerDe serde = SerDe.JSON;
@@ -63,7 +63,9 @@ private Map<String, Object> properties() {
         if (schemaRegistryUrl != null) {
             properties.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
         }
-        properties.putAll(this.properties);
+        if (!ObjectUtils.isEmpty(this.properties)) {
+            properties.putAll(this.properties);
+        }
         return properties;
     }
diff --git a/connectors/stream/src/main/java/com/redislabs/riot/stream/StreamExportCommand.java b/connectors/stream/src/main/java/com/redislabs/riot/stream/StreamExportCommand.java
index 7d555fdf2..d7f65aa4b 100644
--- a/connectors/stream/src/main/java/com/redislabs/riot/stream/StreamExportCommand.java
+++ b/connectors/stream/src/main/java/com/redislabs/riot/stream/StreamExportCommand.java
@@ -10,6 +10,7 @@
 import io.lettuce.core.XReadArgs.StreamOffset;
 import io.lettuce.core.api.StatefulRedisConnection;
 import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.springframework.batch.core.Step;
 import org.springframework.batch.core.job.flow.Flow;
@@ -18,6 +19,8 @@
 import org.springframework.core.convert.converter.Converter;
 import org.springframework.kafka.core.DefaultKafkaProducerFactory;
 import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.util.Assert;
+import org.springframework.util.ObjectUtils;
 import picocli.CommandLine;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Option;
@@ -25,19 +28,19 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
+@Slf4j
 @Command(name = "export", description = "Import Redis streams into Kafka topics")
 public class StreamExportCommand extends AbstractTransferCommand<StreamMessage<String, String>, ProducerRecord<String, Object>> {
 
     @SuppressWarnings("unused")
     @Parameters(arity = "1..*", description = "One ore more streams to read from", paramLabel = "STREAM")
-    private List<String> streams;
+    private String[] streams;
     @CommandLine.Mixin
     private KafkaOptions options = KafkaOptions.builder().build();
     @CommandLine.Mixin
     private FlushingTransferOptions flushingOptions = FlushingTransferOptions.builder().build();
-    @Option(names = "--block", description = "XREAD block time in millis (default: ${DEFAULT-VALUE})", hidden = true, paramLabel = "")
-    private long block = 100;
     @Option(names = "--offset", description = "XREAD offset (default: ${DEFAULT-VALUE})", paramLabel = "")
     private String offset = "0-0";
     @SuppressWarnings("unused")
@@ -46,6 +49,7 @@ public class StreamExportCommand extends AbstractTransferCommand<StreamMessage<String, String>, ProducerRecord<String, Object>> {
 
     @Override
     protected Flow flow() {
+        Assert.isTrue(!ObjectUtils.isEmpty(streams), "No stream specified");
         List<Step> steps = new ArrayList<>();
         for (String stream : streams) {
             StepBuilder<StreamMessage<String, String>, ProducerRecord<String, Object>> step = stepBuilder(stream + "-stream-export-step", "Exporting from " + stream);
@@ -56,13 +60,17 @@ protected Flow flow() {
 
     private StreamItemReader reader(StreamOffset<String> offset) {
         if (isCluster()) {
+            log.info("Creating cluster stream reader with offset {}", offset);
             return StreamItemReader.builder((StatefulRedisClusterConnection<String, String>) connection).offset(offset).build();
         }
+        log.info("Creating stream reader with offset {}", offset);
         return StreamItemReader.builder((StatefulRedisConnection<String, String>) connection).offset(offset).build();
     }
 
     private KafkaItemWriter writer() {
-        return KafkaItemWriter.builder().kafkaTemplate(new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(options.producerProperties()))).build();
+        Map<String, Object> producerProperties = options.producerProperties();
+        log.info("Creating Kafka writer with producer properties {}", producerProperties);
+        return KafkaItemWriter.builder().kafkaTemplate(new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProperties))).build();
     }
 
     private ItemProcessor<StreamMessage<String, String>, ProducerRecord<String, Object>> processor() {
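KafkaOptions now leaves properties null when the user passes no -p flags, so the merge guards the putAll. A standalone sketch of that null-safe merge (hypothetical PropertiesMerge helper; spring-core assumed):

    import java.util.HashMap;
    import java.util.Map;
    import org.springframework.util.ObjectUtils;

    class PropertiesMerge {
        static Map<String, Object> merge(Map<String, Object> base, Map<String, String> overrides) {
            Map<String, Object> merged = new HashMap<>(base);
            // overrides may be null now that the empty-map default is gone.
            if (!ObjectUtils.isEmpty(overrides)) {
                merged.putAll(overrides);
            }
            return merged;
        }
    }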
diff --git a/connectors/stream/src/main/java/com/redislabs/riot/stream/StreamImportCommand.java b/connectors/stream/src/main/java/com/redislabs/riot/stream/StreamImportCommand.java
index bdae09391..a0e13fa8c 100644
--- a/connectors/stream/src/main/java/com/redislabs/riot/stream/StreamImportCommand.java
+++ b/connectors/stream/src/main/java/com/redislabs/riot/stream/StreamImportCommand.java
@@ -10,6 +10,7 @@
 import io.lettuce.core.api.StatefulRedisConnection;
 import io.lettuce.core.api.async.RedisStreamAsyncCommands;
 import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.batch.core.Step;
@@ -18,6 +19,8 @@
 import org.springframework.batch.item.redis.support.CommandBuilder;
 import org.springframework.batch.item.redis.support.CommandItemWriter;
 import org.springframework.core.convert.converter.Converter;
+import org.springframework.util.Assert;
+import org.springframework.util.ObjectUtils;
 import picocli.CommandLine;
 import picocli.CommandLine.Command;
 import picocli.CommandLine.Option;
@@ -26,15 +29,17 @@
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.function.BiFunction;
 
+@Slf4j
 @Command(name = "import", description = "Import Kafka topics into Redis streams")
 public class StreamImportCommand extends AbstractTransferCommand<ConsumerRecord<String, Object>, ConsumerRecord<String, Object>> {
 
     @SuppressWarnings("unused")
     @Parameters(arity = "1..*", description = "One ore more topics to read from", paramLabel = "TOPIC")
-    private List<String> topics;
+    private String[] topics;
     @CommandLine.Mixin
     private KafkaOptions options = KafkaOptions.builder().build();
     @SuppressWarnings("unused")
@@ -51,9 +56,13 @@ public class StreamImportCommand extends AbstractTransferCommand<ConsumerRecord<String, Object>, ConsumerRecord<String, Object>> {
 
     @Override
     protected Flow flow() {
+        Assert.isTrue(!ObjectUtils.isEmpty(topics), "No topic specified");
         List<Step> steps = new ArrayList<>();
+        Properties consumerProperties = options.consumerProperties();
+        log.info("Using Kafka consumer properties: {}", consumerProperties);
         for (String topic : topics) {
-            KafkaItemReader reader = new KafkaItemReaderBuilder().partitions(0).consumerProperties(options.consumerProperties()).partitions(0).name(topic).saveState(false).topic(topic).build();
+            log.info("Creating Kafka reader for topic {}", topic);
+            KafkaItemReader reader = new KafkaItemReaderBuilder().partitions(0).consumerProperties(consumerProperties).partitions(0).name(topic).saveState(false).topic(topic).build();
             StepBuilder<ConsumerRecord<String, Object>, ConsumerRecord<String, Object>> step = stepBuilder(topic + "-stream-import-step", "Importing from " + topic);
             steps.add(step.reader(reader).writer(writer()).build().build());
         }
@@ -65,8 +74,10 @@ private ItemWriter<ConsumerRecord<String, Object>> writer() {
         XAddArgs xAddArgs = xAddArgs();
         BiFunction<RedisStreamAsyncCommands<String, String>, ConsumerRecord<String, Object>, RedisFuture<?>> command = CommandBuilder.<ConsumerRecord<String, Object>>xadd().keyConverter(keyConverter()).argsConverter(r -> xAddArgs).bodyConverter(bodyConverter()).build();
         if (isCluster()) {
+            log.info("Creating cluster stream writer");
             return CommandItemWriter.<ConsumerRecord<String, Object>>clusterBuilder((GenericObjectPool<StatefulRedisClusterConnection<String, String>>) pool, (BiFunction) command).build();
         }
+        log.info("Creating stream writer");
         return CommandItemWriter.<ConsumerRecord<String, Object>>builder((GenericObjectPool<StatefulRedisConnection<String, String>>) pool, (BiFunction) command).build();
     }
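The reader loop above hoists options.consumerProperties() out of the per-topic loop and logs it once. As a rough sketch, the properties the Kafka reader minimally needs look like this (hypothetical helper; the actual values come from KafkaOptions):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    class ConsumerProps {
        static Properties minimal(String bootstrapServers, String groupId) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            return properties;
        }
    }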
diff --git a/core/src/main/java/com/redislabs/riot/KeyValueProcessingOptions.java b/core/src/main/java/com/redislabs/riot/KeyValueProcessingOptions.java
index 2f42fe7b6..5cb83d22f 100644
--- a/core/src/main/java/com/redislabs/riot/KeyValueProcessingOptions.java
+++ b/core/src/main/java/com/redislabs/riot/KeyValueProcessingOptions.java
@@ -11,6 +11,7 @@
 import org.springframework.batch.item.ItemProcessor;
 import org.springframework.batch.item.support.CompositeItemProcessor;
 import org.springframework.core.convert.converter.Converter;
+import org.springframework.util.ObjectUtils;
 import picocli.CommandLine.Option;
 
 import java.text.SimpleDateFormat;
@@ -21,33 +22,29 @@
 @AllArgsConstructor
 public class KeyValueProcessingOptions {
 
-    @Builder.Default
     @Option(arity = "1..*", names = "--spel", description = "SpEL expression to produce a field", paramLabel = "")
-    private Map<String, String> spelFields = new HashMap<>();
-    @Builder.Default
+    private Map<String, String> spelFields;
     @Option(arity = "1..*", names = "--var", description = "Register a variable in the SpEL processor context", paramLabel = "")
-    private Map<String, Object> variables = new HashMap<>();
+    private Map<String, Object> variables;
     @Builder.Default
     @Option(names = "--date", description = "Processor date format (default: ${DEFAULT-VALUE})", paramLabel = "")
     private String dateFormat = new SimpleDateFormat().toPattern();
-    @Builder.Default
     @Option(arity = "1..*", names = "--regex", description = "Extract named values from source field using regex", paramLabel = "")
-    private Map<String, String> regexes = new HashMap<>();
-    @Builder.Default
+    private Map<String, String> regexes;
     @Option(arity = "1..*", names = "--filter", description = "SpEL expression to filter records", paramLabel = "")
-    private List<String> filters = new ArrayList<>();
+    private String[] filters;
 
     public ItemProcessor<Map<String, Object>, Map<String, Object>> processor(StatefulConnection<String, String> connection) {
         List<ItemProcessor<Map<String, Object>, Map<String, Object>>> processors = new ArrayList<>();
-        if (!spelFields.isEmpty()) {
+        if (!ObjectUtils.isEmpty(spelFields)) {
             processors.add(new SpelProcessor(connection, new SimpleDateFormat(dateFormat), variables, spelFields));
         }
-        if (!regexes.isEmpty()) {
+        if (!ObjectUtils.isEmpty(regexes)) {
             Map<String, Converter<String, Map<String, String>>> fields = new LinkedHashMap<>();
             regexes.forEach((f, r) -> fields.put(f, RegexNamedGroupsExtractor.builder().regex(r).build()));
             processors.add(new MapProcessor(fields));
         }
-        if (!filters.isEmpty()) {
+        if (!ObjectUtils.isEmpty(filters)) {
             processors.add(new FilteringProcessor(filters));
         }
         if (processors.isEmpty()) {
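The --filter expressions handed to FilteringProcessor are SpEL predicates evaluated against each record, with a MapAccessor so fields are addressable by name. A runnable sketch of that evaluation (spring-context and spring-expression assumed on the classpath):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.springframework.context.expression.MapAccessor;
    import org.springframework.expression.Expression;
    import org.springframework.expression.spel.standard.SpelExpressionParser;
    import org.springframework.expression.spel.support.StandardEvaluationContext;

    class SpelFilterDemo {
        public static void main(String[] args) {
            StandardEvaluationContext context = new StandardEvaluationContext();
            context.setPropertyAccessors(Collections.singletonList(new MapAccessor()));
            Expression expression = new SpelExpressionParser().parseExpression("value > 10");
            Map<String, Object> record = new HashMap<>();
            record.put("value", 42);
            // MapAccessor resolves "value" against the map's keys.
            System.out.println(expression.getValue(context, record, Boolean.class)); // true
        }
    }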
diff --git a/core/src/main/java/com/redislabs/riot/RedisOptions.java b/core/src/main/java/com/redislabs/riot/RedisOptions.java
index b42a8bd22..964b635b2 100644
--- a/core/src/main/java/com/redislabs/riot/RedisOptions.java
+++ b/core/src/main/java/com/redislabs/riot/RedisOptions.java
@@ -1,9 +1,6 @@
 package com.redislabs.riot;
 
-import io.lettuce.core.ClientOptions;
-import io.lettuce.core.RedisClient;
-import io.lettuce.core.RedisURI;
-import io.lettuce.core.SslOptions;
+import io.lettuce.core.*;
 import io.lettuce.core.api.StatefulConnection;
 import io.lettuce.core.cluster.ClusterClientOptions;
 import io.lettuce.core.cluster.RedisClusterClient;
@@ -14,7 +11,9 @@
 import io.lettuce.core.resource.ClientResources;
 import io.lettuce.core.resource.DefaultClientResources;
 import lombok.*;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
+import org.springframework.util.ObjectUtils;
 import picocli.CommandLine.Option;
 
 import java.io.File;
@@ -22,6 +21,7 @@
 import java.util.ArrayList;
 import java.util.List;
 
+@Slf4j
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
@@ -47,9 +47,8 @@ public class RedisOptions {
     private String username;
     @Option(names = {"-a", "--pass"}, arity = "0..1", interactive = true, description = "Password to use when connecting to the server.", paramLabel = "")
     private char[] password;
-    @Builder.Default
     @Option(names = {"-u", "--uri"}, arity = "0..*", description = "Server URI.", paramLabel = "")
-    private List<RedisURI> uris = new ArrayList<>();
+    private List<RedisURI> uris;
     @Builder.Default
     @Option(names = "--timeout", description = "Redis command timeout (default: ${DEFAULT-VALUE}).", paramLabel = "")
     private long timeout = DEFAULT_TIMEOUT;
@@ -86,16 +85,16 @@ public class RedisOptions {
     private String clientName;
 
     public List<RedisURI> uris() {
-        List<RedisURI> uris = new ArrayList<>(this.uris);
-        if (uris.isEmpty()) {
-            RedisURI uri = new RedisURI();
-            uri.setHost(host);
-            uri.setPort(port);
+        List<RedisURI> redisURIs = new ArrayList<>();
+        if (ObjectUtils.isEmpty(uris)) {
+            RedisURI uri = RedisURI.create(host, port);
             uri.setSocket(socket);
             uri.setSsl(tls);
-            uris.add(uri);
+            redisURIs.add(uri);
+        } else {
+            redisURIs.addAll(this.uris);
         }
-        for (RedisURI uri : uris) {
+        for (RedisURI uri : redisURIs) {
             uri.setVerifyPeer(verifyPeer);
             if (username != null) {
                 uri.setUsername(username);
@@ -113,7 +112,7 @@ public List<RedisURI> uris() {
                 uri.setClientName(clientName);
             }
         }
-        return uris;
+        return redisURIs;
     }
 
     private ClientResources clientResources() {
@@ -167,4 +166,12 @@ public <C extends StatefulConnection<String, String>> GenericObjectPoolConfig<C> poolConfig() {
         return config;
     }
 
+    public AbstractRedisClient client() {
+        if (cluster) {
+            log.info("Creating Redis cluster client: {}", this);
+            return redisClusterClient();
+        }
+        log.info("Creating Redis client: {}", this);
+        return redisClient();
+    }
 }
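uris() now builds the default URI with the RedisURI.create(host, port) factory and applies the shared flags afterwards. A small sketch of that construction (Lettuce assumed; output format may vary by version):

    import io.lettuce.core.RedisURI;

    class RedisUriDemo {
        public static void main(String[] args) {
            RedisURI uri = RedisURI.create("localhost", 6379);
            uri.setSsl(true);
            uri.setVerifyPeer(false);
            System.out.println(uri); // e.g. rediss://localhost:6379
        }
    }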
diff --git a/core/src/main/java/com/redislabs/riot/RiotCommand.java b/core/src/main/java/com/redislabs/riot/RiotCommand.java
index acabeb634..6aede3e5b 100644
--- a/core/src/main/java/com/redislabs/riot/RiotCommand.java
+++ b/core/src/main/java/com/redislabs/riot/RiotCommand.java
@@ -58,9 +58,8 @@ public Integer call() throws Exception {
 
     @Override
     public void afterPropertiesSet() throws Exception {
-        this.client = client(app.getRedisOptions());
+        this.client = app.getRedisOptions().client();
         this.pool = pool(app.getRedisOptions(), client);
-        log.info("Connecting to {}", app.getRedisOptions().uris());
         this.connection = connection(client);
     }
@@ -77,13 +76,6 @@ public void shutdown() {
         }
     }
 
-    protected AbstractRedisClient client(RedisOptions redisOptions) {
-        if (redisOptions.isCluster()) {
-            return redisOptions.redisClusterClient();
-        }
-        return redisOptions.redisClient();
-    }
-
     protected BaseRedisCommands<String, String> sync() {
         if (connection instanceof StatefulRedisClusterConnection) {
             return ((StatefulRedisClusterConnection<String, String>) connection).sync();
diff --git a/core/src/main/java/com/redislabs/riot/processor/FilteringProcessor.java b/core/src/main/java/com/redislabs/riot/processor/FilteringProcessor.java
index 9c2553acc..ff9f329cf 100644
--- a/core/src/main/java/com/redislabs/riot/processor/FilteringProcessor.java
+++ b/core/src/main/java/com/redislabs/riot/processor/FilteringProcessor.java
@@ -15,7 +15,7 @@ public class FilteringProcessor implements ItemProcessor<Map<String, Object>, Map<String, Object>> {
 
     private final StandardEvaluationContext context;
     private final List<Expression> expressions;
 
-    public FilteringProcessor(List<String> filters) {
+    public FilteringProcessor(String... filters) {
         this.context = new StandardEvaluationContext();
         context.setPropertyAccessors(Collections.singletonList(new MapAccessor()));
         SpelExpressionParser parser = new SpelExpressionParser();
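Switching the constructor to String... keeps both call styles compiling: the --filter option field is already an array, and ad-hoc callers can pass values inline. A trivial illustration:

    class VarargsDemo {
        static int count(String... filters) {
            return filters.length;
        }

        public static void main(String[] args) {
            String[] fromOptions = {"a", "b"};
            System.out.println(count(fromOptions));   // 2, array passes through
            System.out.println(count("a", "b", "c")); // 3, inline varargs
        }
    }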
diff --git a/core/src/main/java/com/redislabs/riot/redis/FilteringOptions.java b/core/src/main/java/com/redislabs/riot/redis/FilteringOptions.java
index 67a1911f3..25b04adf7 100644
--- a/core/src/main/java/com/redislabs/riot/redis/FilteringOptions.java
+++ b/core/src/main/java/com/redislabs/riot/redis/FilteringOptions.java
@@ -8,6 +8,7 @@
 import lombok.Builder;
 import lombok.NoArgsConstructor;
 import org.springframework.core.convert.converter.Converter;
+import org.springframework.util.ObjectUtils;
 import picocli.CommandLine;
 
 import java.util.Map;
@@ -17,21 +18,23 @@
 @NoArgsConstructor
 public class FilteringOptions {
 
-    @Builder.Default
     @CommandLine.Option(arity = "1..*", names = "--include", description = "Name(s) of fields to include", paramLabel = "")
-    private String[] includes = new String[0];
-    @Builder.Default
+    private String[] includes;
     @CommandLine.Option(arity = "1..*", names = "--exclude", description = "Name(s) of fields to exclude", paramLabel = "")
-    private String[] excludes = new String[0];
+    private String[] excludes;
 
     public Converter<Map<String, Object>, Map<String, String>> converter() {
         MapFlattener mapFlattener = new MapFlattener<>(new ObjectToStringConverter());
-        if (includes.length == 0 && excludes.length == 0) {
+        if (ObjectUtils.isEmpty(includes) && ObjectUtils.isEmpty(excludes)) {
             return mapFlattener;
         }
         MapFilteringConverter.MapFilteringConverterBuilder filtering = MapFilteringConverter.builder();
-        filtering.includes(includes);
-        filtering.excludes(excludes);
+        if (!ObjectUtils.isEmpty(includes)) {
+            filtering.includes(includes);
+        }
+        if (!ObjectUtils.isEmpty(excludes)) {
+            filtering.excludes(excludes);
+        }
         return new CompositeConverter(mapFlattener, filtering.build());
     }
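The single idiom running through this change set is ObjectUtils.isEmpty, which lets un-set picocli fields stay null instead of carrying placeholder defaults. Its semantics in brief (spring-core assumed):

    import org.springframework.util.ObjectUtils;

    class IsEmptyDemo {
        public static void main(String[] args) {
            Object nothing = null;
            System.out.println(ObjectUtils.isEmpty(nothing));           // true
            System.out.println(ObjectUtils.isEmpty(new String[0]));     // true
            System.out.println(ObjectUtils.isEmpty(new String[]{"x"})); // false
        }
    }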