From 6ca98aa328d9a1bd1fa4d57027d930608ddb61bf Mon Sep 17 00:00:00 2001 From: jruaux Date: Thu, 4 Mar 2021 13:28:01 -0800 Subject: [PATCH] improved logging --- build.gradle | 1 + ...aseOptions.java => DataSourceOptions.java} | 10 +- .../riot/db/DatabaseExportCommand.java | 17 +- .../riot/db/DatabaseExportOptions.java | 3 +- .../riot/db/DatabaseImportCommand.java | 26 +- .../riot/db/DatabaseImportOptions.java | 3 +- .../riot/file/AbstractFileImportCommand.java | 8 +- .../file/DataStructureFileImportCommand.java | 16 +- .../riot/file/FileExportCommand.java | 25 +- .../riot/file/FileExportOptions.java | 9 + .../riot/file/FileImportOptions.java | 15 +- .../com/redislabs/riot/file/FileOptions.java | 13 +- .../riot/file/GZIPInputStreamResource.java | 4 - .../riot/file/GZIPOutputStreamResource.java | 4 - .../com/redislabs/riot/file/GcsOptions.java | 6 + .../riot/file/KeyValueFileImportCommand.java | 11 +- .../com/redislabs/riot/file/S3Options.java | 6 + .../riot/file/UncustomizedUrlResource.java | 22 +- .../redislabs/riot/gen/FakerItemReader.java | 2 +- .../redislabs/riot/gen/GenerateCommand.java | 6 +- .../riot/gen/ReflectivePropertyAccessor.java | 1140 ++++++++--------- .../riot/redis/ReplicateCommand.java | 18 +- .../com/redislabs/riot/redis/RiotRedis.java | 1 + .../redislabs/riot/stream/KafkaOptions.java | 9 +- .../riot/stream/StreamExportCommand.java | 6 +- .../riot/stream/StreamImportCommand.java | 9 +- .../riot/stream/kafka/KafkaItemReader.java | 290 ++--- .../riot/stream/kafka/KafkaItemWriter.java | 4 +- .../com/redislabs/riot/stream/TestKafka.java | 4 +- .../redislabs/riot/AbstractExportCommand.java | 11 +- .../redislabs/riot/AbstractImportCommand.java | 14 +- .../redislabs/riot/AbstractTaskCommand.java | 1 + .../riot/AbstractTransferCommand.java | 5 +- .../java/com/redislabs/riot/HelpCommand.java | 13 +- .../riot/HiddenGenerateCompletion.java | 9 - .../riot/KeyValueProcessingOptions.java | 12 +- .../riot/ManifestVersionProvider.java | 43 - .../com/redislabs/riot/OneLineLogFormat.java | 36 - .../java/com/redislabs/riot/RedisOptions.java | 19 +- .../com/redislabs/riot/RedisURIConverter.java | 17 - .../main/java/com/redislabs/riot/RiotApp.java | 257 ++-- .../java/com/redislabs/riot/RiotCommand.java | 15 +- .../java/com/redislabs/riot/StepBuilder.java | 7 +- .../riot/convert/ObjectToNumberConverter.java | 39 +- .../riot/convert/ObjectToStringConverter.java | 2 +- .../riot/processor/SpelProcessor.java | 106 +- .../riot/redis/AbstractCollectionCommand.java | 6 +- .../riot/redis/AbstractKeyCommand.java | 5 +- .../riot/redis/AbstractRedisCommand.java | 1 + .../com/redislabs/riot/redis/EvalCommand.java | 1 + .../redislabs/riot/redis/ExpireCommand.java | 3 +- .../redislabs/riot/redis/GeoaddCommand.java | 6 +- .../com/redislabs/riot/redis/HsetCommand.java | 2 +- .../redislabs/riot/redis/LpushCommand.java | 4 +- .../redislabs/riot/redis/RpushCommand.java | 4 +- .../com/redislabs/riot/redis/SaddCommand.java | 4 +- .../com/redislabs/riot/redis/SetCommand.java | 6 +- .../com/redislabs/riot/redis/XaddCommand.java | 7 +- .../com/redislabs/riot/redis/ZaddCommand.java | 5 +- .../redislabs/riot/test/AbstractRiotTest.java | 2 +- .../test/AbstractStandaloneRedisTest.java | 2 +- 61 files changed, 1162 insertions(+), 1190 deletions(-) rename connectors/db/src/main/java/com/redislabs/riot/db/{DatabaseOptions.java => DataSourceOptions.java} (85%) delete mode 100644 core/src/main/java/com/redislabs/riot/HiddenGenerateCompletion.java delete mode 100644 core/src/main/java/com/redislabs/riot/ManifestVersionProvider.java delete mode 100644 core/src/main/java/com/redislabs/riot/OneLineLogFormat.java delete mode 100644 core/src/main/java/com/redislabs/riot/RedisURIConverter.java diff --git a/build.gradle b/build.gradle index 0725b66a6..d6ae2f4e2 100644 --- a/build.gradle +++ b/build.gradle @@ -68,6 +68,7 @@ subprojects { implementation project(':core') testImplementation project(':test') implementation 'org.slf4j:slf4j-jdk14' + implementation 'org.slf4j:log4j-over-slf4j' } } } diff --git a/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseOptions.java b/connectors/db/src/main/java/com/redislabs/riot/db/DataSourceOptions.java similarity index 85% rename from connectors/db/src/main/java/com/redislabs/riot/db/DatabaseOptions.java rename to connectors/db/src/main/java/com/redislabs/riot/db/DataSourceOptions.java index 69eeb4330..fa0d328b8 100644 --- a/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseOptions.java +++ b/connectors/db/src/main/java/com/redislabs/riot/db/DataSourceOptions.java @@ -1,13 +1,17 @@ package com.redislabs.riot.db; -import lombok.Data; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.NoArgsConstructor; import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties; import picocli.CommandLine.Option; import javax.sql.DataSource; -@Data -public class DatabaseOptions { +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class DataSourceOptions { @Option(names = "--driver", description = "Fully qualified name of the JDBC driver", paramLabel = "") private String driver; diff --git a/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseExportCommand.java b/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseExportCommand.java index a766a006c..dd6723835 100644 --- a/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseExportCommand.java +++ b/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseExportCommand.java @@ -2,6 +2,7 @@ import com.redislabs.riot.AbstractExportCommand; import com.redislabs.riot.processor.DataStructureMapItemProcessor; +import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.job.flow.Flow; import org.springframework.batch.item.database.JdbcBatchItemWriter; import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder; @@ -9,26 +10,28 @@ import picocli.CommandLine.Command; import picocli.CommandLine.Mixin; -import javax.sql.DataSource; import java.util.Map; +@Slf4j @Command(name = "export", description = "Export to a database") public class DatabaseExportCommand extends AbstractExportCommand> { @Mixin - private DatabaseExportOptions options = new DatabaseExportOptions(); + private DataSourceOptions dataSourceOptions = DataSourceOptions.builder().build(); + @Mixin + private DatabaseExportOptions exportOptions = DatabaseExportOptions.builder().build(); @Override protected Flow flow() throws Exception { - DataSource dataSource = options.dataSource(); JdbcBatchItemWriterBuilder> builder = new JdbcBatchItemWriterBuilder<>(); builder.itemSqlParameterSourceProvider(MapSqlParameterSource::new); - builder.dataSource(dataSource); - builder.sql(options.getSql()); - builder.assertUpdates(options.isAssertUpdates()); + log.info("Creating data source {}", dataSourceOptions); + builder.dataSource(dataSourceOptions.dataSource()); + builder.sql(exportOptions.getSql()); + builder.assertUpdates(exportOptions.isAssertUpdates()); JdbcBatchItemWriter> writer = builder.build(); writer.afterPropertiesSet(); - DataStructureMapItemProcessor processor = DataStructureMapItemProcessor.builder().keyRegex(options.getKeyRegex()).build(); + DataStructureMapItemProcessor processor = DataStructureMapItemProcessor.builder().keyRegex(exportOptions.getKeyRegex()).build(); return flow(step(processor, writer).build()); } diff --git a/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseExportOptions.java b/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseExportOptions.java index 3a039f8f4..c8ce3bb1d 100644 --- a/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseExportOptions.java +++ b/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseExportOptions.java @@ -4,11 +4,10 @@ import picocli.CommandLine; @Data -@EqualsAndHashCode(callSuper = true) @Builder @NoArgsConstructor @AllArgsConstructor -public class DatabaseExportOptions extends DatabaseOptions { +public class DatabaseExportOptions { @CommandLine.Parameters(arity = "1", description = "SQL INSERT statement.", paramLabel = "SQL") private String sql; diff --git a/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseImportCommand.java b/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseImportCommand.java index c3784873c..a9bf9e411 100644 --- a/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseImportCommand.java +++ b/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseImportCommand.java @@ -17,9 +17,11 @@ public class DatabaseImportCommand extends AbstractImportCommand, Map> { @Mixin - private DatabaseImportOptions options = new DatabaseImportOptions(); + private DataSourceOptions options = DataSourceOptions.builder().build(); @Mixin - private KeyValueProcessingOptions processingOptions = new KeyValueProcessingOptions(); + private DatabaseImportOptions importOptions = DatabaseImportOptions.builder().build(); + @Mixin + private KeyValueProcessingOptions processingOptions = KeyValueProcessingOptions.builder().build(); @Override protected Flow flow() throws Exception { @@ -27,24 +29,24 @@ protected Flow flow() throws Exception { JdbcCursorItemReaderBuilder> builder = new JdbcCursorItemReaderBuilder<>(); builder.saveState(false); builder.dataSource(dataSource); - if (options.getFetchSize() != null) { - builder.fetchSize(options.getFetchSize()); + if (importOptions.getFetchSize() != null) { + builder.fetchSize(importOptions.getFetchSize()); } - if (options.getMaxRows() != null) { - builder.maxRows(options.getMaxRows()); + if (importOptions.getMaxRows() != null) { + builder.maxRows(importOptions.getMaxRows()); } builder.name("database-reader"); - if (options.getQueryTimeout() != null) { - builder.queryTimeout(options.getQueryTimeout()); + if (importOptions.getQueryTimeout() != null) { + builder.queryTimeout(importOptions.getQueryTimeout()); } builder.rowMapper(new ColumnMapRowMapper()); - builder.sql(options.getSql()); - builder.useSharedExtendedConnection(options.isUseSharedExtendedConnection()); - builder.verifyCursorPosition(options.isVerifyCursorPosition()); + builder.sql(importOptions.getSql()); + builder.useSharedExtendedConnection(importOptions.isUseSharedExtendedConnection()); + builder.verifyCursorPosition(importOptions.isVerifyCursorPosition()); JdbcCursorItemReader> reader = builder.build(); reader.afterPropertiesSet(); String name = dataSource.getConnection().getMetaData().getDatabaseProductName(); - return flow(step("Importing from " + name, reader).build()); + return flow(step(name + "-db-import-step", "Importing from " + name, reader).build()); } @Override diff --git a/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseImportOptions.java b/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseImportOptions.java index a2a2c1aca..5fc0ba837 100644 --- a/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseImportOptions.java +++ b/connectors/db/src/main/java/com/redislabs/riot/db/DatabaseImportOptions.java @@ -4,11 +4,10 @@ import picocli.CommandLine; @Data -@EqualsAndHashCode(callSuper = true) @Builder @NoArgsConstructor @AllArgsConstructor -public class DatabaseImportOptions extends DatabaseOptions { +public class DatabaseImportOptions { @CommandLine.Parameters(arity = "1", description = "SQL SELECT statement", paramLabel = "SQL") private String sql; diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/AbstractFileImportCommand.java b/connectors/file/src/main/java/com/redislabs/riot/file/AbstractFileImportCommand.java index 357a5b461..10851912a 100644 --- a/connectors/file/src/main/java/com/redislabs/riot/file/AbstractFileImportCommand.java +++ b/connectors/file/src/main/java/com/redislabs/riot/file/AbstractFileImportCommand.java @@ -2,8 +2,6 @@ import com.redislabs.riot.AbstractImportCommand; import lombok.Getter; -import lombok.Setter; -import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.Step; import org.springframework.batch.core.job.flow.Flow; import org.springframework.batch.item.support.AbstractItemStreamItemReader; @@ -15,8 +13,6 @@ import java.util.ArrayList; import java.util.List; -@Slf4j -@Setter @Command public abstract class AbstractFileImportCommand extends AbstractImportCommand { @@ -24,7 +20,7 @@ public abstract class AbstractFileImportCommand extends AbstractImportCommand private String[] files = new String[0]; @Getter @CommandLine.Mixin - private FileOptions fileOptions = new FileOptions(); + private FileOptions fileOptions = FileOptions.builder().build(); @Override protected Flow flow() throws Exception { @@ -39,7 +35,7 @@ protected Flow flow() throws Exception { AbstractItemStreamItemReader reader = reader(file, fileType, resource); String name = FileUtils.filename(resource); reader.setName(name); - steps.add(step("Importing file " + name, reader).build()); + steps.add(step(name + "-file-import-step", "Importing " + name, reader).build()); } return flow(steps.toArray(new Step[0])); } diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/DataStructureFileImportCommand.java b/connectors/file/src/main/java/com/redislabs/riot/file/DataStructureFileImportCommand.java index 6c9f6f061..652075ec9 100644 --- a/connectors/file/src/main/java/com/redislabs/riot/file/DataStructureFileImportCommand.java +++ b/connectors/file/src/main/java/com/redislabs/riot/file/DataStructureFileImportCommand.java @@ -13,6 +13,7 @@ import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.json.JsonItemReader; import org.springframework.batch.item.redis.DataStructureItemWriter; +import org.springframework.batch.item.redis.support.CommandTimeoutBuilder; import org.springframework.batch.item.redis.support.DataStructure; import org.springframework.batch.item.support.AbstractItemStreamItemReader; import org.springframework.batch.item.xml.XmlItemReader; @@ -20,6 +21,7 @@ import picocli.CommandLine; import picocli.CommandLine.Command; +import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -32,7 +34,7 @@ public class DataStructureFileImportCommand extends AbstractTransferCommand, DataStructure> step = stepBuilder("Importing file " + name); - AbstractItemStreamItemReader> reader = reader(file, fileType, resource); + AbstractItemStreamItemReader> reader = reader(fileType, resource); reader.setName(name); + StepBuilder, DataStructure> step = stepBuilder(name + "-datastructure-file-import-step", "Importing " + name); steps.add(step.reader(reader).processor(processor).writer(writer()).build().build()); } return flow(steps.toArray(new Step[0])); @@ -53,17 +55,19 @@ protected Flow flow() throws Exception { private ItemWriter> writer() { if (isCluster()) { - return DataStructureItemWriter.clusterBuilder((GenericObjectPool>) pool).commandTimeout(getCommandTimeout()).build(); + return configureCommandTimeoutBuilder(DataStructureItemWriter.clusterBuilder((GenericObjectPool>) pool)).build(); } - return DataStructureItemWriter.builder((GenericObjectPool>) pool).commandTimeout(getCommandTimeout()).build(); + return configureCommandTimeoutBuilder(DataStructureItemWriter.builder((GenericObjectPool>) pool)).build(); } @SuppressWarnings({"unchecked", "rawtypes"}) - protected AbstractItemStreamItemReader> reader(String file, FileType fileType, Resource resource) { + protected AbstractItemStreamItemReader> reader(FileType fileType, Resource resource) { switch (fileType) { case JSON: + log.info("Creating JSON data structure reader for file {}", resource); return (JsonItemReader) FileUtils.jsonReader(resource, DataStructure.class); case XML: + log.info("Creating XML data structure reader for file {}", resource); return (XmlItemReader) FileUtils.xmlReader(resource, DataStructure.class); } throw new IllegalArgumentException("Unsupported file type: " + fileType); diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/FileExportCommand.java b/connectors/file/src/main/java/com/redislabs/riot/file/FileExportCommand.java index a6adc3f1a..1cef75fde 100644 --- a/connectors/file/src/main/java/com/redislabs/riot/file/FileExportCommand.java +++ b/connectors/file/src/main/java/com/redislabs/riot/file/FileExportCommand.java @@ -2,7 +2,7 @@ import com.fasterxml.jackson.dataformat.xml.XmlMapper; import com.redislabs.riot.AbstractExportCommand; -import lombok.Setter; +import lombok.extern.slf4j.Slf4j; import org.springframework.batch.core.job.flow.Flow; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.item.json.JacksonJsonObjectMarshaller; @@ -17,16 +17,17 @@ import java.io.IOException; -@Setter +@Slf4j @Command(name = "export", description = "Export Redis data to a JSON or XML file") public class FileExportCommand extends AbstractExportCommand> { + @SuppressWarnings("unused") @CommandLine.Parameters(arity = "1", description = "File path or URL", paramLabel = "FILE") - protected String file; + private String file; @Mixin - private FileExportOptions options = new FileExportOptions(); + private FileExportOptions exportOptions = FileExportOptions.builder().build(); @Mixin - private FileOptions fileOptions = new FileOptions(); + private FileOptions fileOptions = FileOptions.builder().build(); @Override protected Flow flow() throws Exception { @@ -40,23 +41,25 @@ private ItemWriter> writer() throws IOException { case JSON: JsonResourceItemWriterBuilder> jsonWriterBuilder = new JsonResourceItemWriterBuilder<>(); jsonWriterBuilder.name("json-resource-item-writer"); - jsonWriterBuilder.append(options.isAppend()); + jsonWriterBuilder.append(exportOptions.isAppend()); jsonWriterBuilder.encoding(fileOptions.getEncoding()); jsonWriterBuilder.jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>()); - jsonWriterBuilder.lineSeparator(options.getLineSeparator()); + jsonWriterBuilder.lineSeparator(exportOptions.getLineSeparator()); jsonWriterBuilder.resource(resource); jsonWriterBuilder.saveState(false); + log.info("Creating JSON writer with {} for file {}", exportOptions, file); return jsonWriterBuilder.build(); case XML: XmlResourceItemWriterBuilder> xmlWriterBuilder = new XmlResourceItemWriterBuilder<>(); xmlWriterBuilder.name("xml-resource-item-writer"); - xmlWriterBuilder.append(options.isAppend()); + xmlWriterBuilder.append(exportOptions.isAppend()); xmlWriterBuilder.encoding(fileOptions.getEncoding()); xmlWriterBuilder.xmlObjectMarshaller(xmlMarshaller()); - xmlWriterBuilder.lineSeparator(options.getLineSeparator()); - xmlWriterBuilder.rootName(options.getRootName()); + xmlWriterBuilder.lineSeparator(exportOptions.getLineSeparator()); + xmlWriterBuilder.rootName(exportOptions.getRootName()); xmlWriterBuilder.resource(resource); xmlWriterBuilder.saveState(false); + log.info("Creating XML writer with {} for file {}", exportOptions, file); return xmlWriterBuilder.build(); default: throw new IllegalArgumentException("Unsupported file type: " + fileType); @@ -65,7 +68,7 @@ private ItemWriter> writer() throws IOException { private JsonObjectMarshaller> xmlMarshaller() { XmlMapper mapper = new XmlMapper(); - mapper.setConfig(mapper.getSerializationConfig().withRootName(options.getElementName())); + mapper.setConfig(mapper.getSerializationConfig().withRootName(exportOptions.getElementName())); JacksonJsonObjectMarshaller> marshaller = new JacksonJsonObjectMarshaller<>(); marshaller.setObjectMapper(mapper); return marshaller; diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/FileExportOptions.java b/connectors/file/src/main/java/com/redislabs/riot/file/FileExportOptions.java index 0f2ed9a69..014340c56 100644 --- a/connectors/file/src/main/java/com/redislabs/riot/file/FileExportOptions.java +++ b/connectors/file/src/main/java/com/redislabs/riot/file/FileExportOptions.java @@ -1,18 +1,27 @@ package com.redislabs.riot.file; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import org.springframework.batch.item.file.FlatFileItemWriter; import picocli.CommandLine; @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class FileExportOptions { @CommandLine.Option(names = "--append", description = "Append to file if it exists") private boolean append; + @Builder.Default @CommandLine.Option(names = "--root", description = "XML root element tag name (default: ${DEFAULT-VALUE})", paramLabel = "") private String rootName = "root"; + @Builder.Default @CommandLine.Option(names = "--element", description = "XML element tag name (default: ${DEFAULT-VALUE})", paramLabel = "") private String elementName = "record"; + @Builder.Default @CommandLine.Option(names = "--line-sep", description = "String to separate lines (default: system default)", paramLabel = "") private String lineSeparator = FlatFileItemWriter.DEFAULT_LINE_SEPARATOR; diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/FileImportOptions.java b/connectors/file/src/main/java/com/redislabs/riot/file/FileImportOptions.java index 33ede2e75..770d44134 100644 --- a/connectors/file/src/main/java/com/redislabs/riot/file/FileImportOptions.java +++ b/connectors/file/src/main/java/com/redislabs/riot/file/FileImportOptions.java @@ -1,6 +1,6 @@ package com.redislabs.riot.file; -import lombok.Data; +import lombok.*; import org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import org.springframework.batch.item.file.transform.Range; import picocli.CommandLine.Option; @@ -9,8 +9,13 @@ import java.util.List; @Data +@Builder +@AllArgsConstructor +@NoArgsConstructor public class FileImportOptions { + @Getter + @Builder.Default @Option(names = "--fields", arity = "1..*", description = "Delimited/FW field names", paramLabel = "") private List names = new ArrayList<>(); @Option(names = {"-h", "--header"}, description = "Delimited/FW first line contains field names") @@ -19,12 +24,20 @@ public class FileImportOptions { private String delimiter; @Option(names = "--skip", description = "Delimited/FW lines to skip at start", paramLabel = "") private Integer linesToSkip; + @Getter + @Builder.Default @Option(names = "--include", arity = "1..*", description = "Delimited/FW field indices to include (0-based)", paramLabel = "") private List includedFields = new ArrayList<>(); + @Getter + @Builder.Default @Option(names = "--ranges", arity = "1..*", description = "Fixed-width column ranges", paramLabel = "") private List columnRanges = new ArrayList<>(); + @Getter + @Builder.Default @Option(names = "--quote", description = "Escape character for delimited files (default: ${DEFAULT-VALUE})", paramLabel = "") private Character quoteCharacter = DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER; + @Getter + @Builder.Default @Option(names = "--continuation", description = "Line continuation string (default: ${DEFAULT-VALUE})", paramLabel = "") private String continuationString = "\\"; diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/FileOptions.java b/connectors/file/src/main/java/com/redislabs/riot/file/FileOptions.java index ab79be6d7..c0f049b54 100644 --- a/connectors/file/src/main/java/com/redislabs/riot/file/FileOptions.java +++ b/connectors/file/src/main/java/com/redislabs/riot/file/FileOptions.java @@ -1,24 +1,33 @@ package com.redislabs.riot.file; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import picocli.CommandLine.ArgGroup; import picocli.CommandLine.Option; import java.nio.charset.Charset; @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class FileOptions { + @Builder.Default @Option(names = "--encoding", description = "File encoding (default: ${DEFAULT-VALUE})", paramLabel = "") private String encoding = Charset.defaultCharset().name(); @Option(names = {"-t", "--filetype"}, description = "File type: ${COMPLETION-CANDIDATES}", paramLabel = "") private FileType type; @Option(names = {"-z", "--gzip"}, description = "File is gzip compressed") private boolean gzip; + @Builder.Default @ArgGroup(exclusive = false, heading = "Amazon Simple Storage Service options%n") - private S3Options s3 = new S3Options(); + private S3Options s3 = S3Options.builder().build(); + @Builder.Default @ArgGroup(exclusive = false, heading = "Google Cloud Storage options%n") - private GcsOptions gcs = new GcsOptions(); + private GcsOptions gcs = GcsOptions.builder().build(); } diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/GZIPInputStreamResource.java b/connectors/file/src/main/java/com/redislabs/riot/file/GZIPInputStreamResource.java index 2f15de8db..82b3ddde0 100644 --- a/connectors/file/src/main/java/com/redislabs/riot/file/GZIPInputStreamResource.java +++ b/connectors/file/src/main/java/com/redislabs/riot/file/GZIPInputStreamResource.java @@ -8,10 +8,6 @@ public class GZIPInputStreamResource extends InputStreamResource { - public GZIPInputStreamResource(InputStream inputStream) throws IOException { - super(new GZIPInputStream(inputStream)); - } - public GZIPInputStreamResource(InputStream inputStream, String description) throws IOException { super(new GZIPInputStream(inputStream), description); } diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/GZIPOutputStreamResource.java b/connectors/file/src/main/java/com/redislabs/riot/file/GZIPOutputStreamResource.java index 1183015f2..baf81d3c9 100644 --- a/connectors/file/src/main/java/com/redislabs/riot/file/GZIPOutputStreamResource.java +++ b/connectors/file/src/main/java/com/redislabs/riot/file/GZIPOutputStreamResource.java @@ -8,10 +8,6 @@ public class GZIPOutputStreamResource extends OutputStreamResource { - public GZIPOutputStreamResource(OutputStream outStream) throws IOException { - super(new GZIPOutputStream(outStream)); - } - public GZIPOutputStreamResource(OutputStream outStream, String desc) throws IOException { super(new GZIPOutputStream(outStream), desc); } diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/GcsOptions.java b/connectors/file/src/main/java/com/redislabs/riot/file/GcsOptions.java index f4959ef0b..7940e1c2b 100644 --- a/connectors/file/src/main/java/com/redislabs/riot/file/GcsOptions.java +++ b/connectors/file/src/main/java/com/redislabs/riot/file/GcsOptions.java @@ -1,11 +1,17 @@ package com.redislabs.riot.file; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import picocli.CommandLine.Option; import java.io.File; @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class GcsOptions { @Option(names = "--gcs-key-file", description = "GCS private key (e.g. /usr/local/key.json)", paramLabel = "") diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/KeyValueFileImportCommand.java b/connectors/file/src/main/java/com/redislabs/riot/file/KeyValueFileImportCommand.java index e8c6d4c7e..cc2eb1b70 100644 --- a/connectors/file/src/main/java/com/redislabs/riot/file/KeyValueFileImportCommand.java +++ b/connectors/file/src/main/java/com/redislabs/riot/file/KeyValueFileImportCommand.java @@ -29,10 +29,11 @@ public class KeyValueFileImportCommand extends AbstractFileImportCommand> { @CommandLine.Mixin - protected FileImportOptions options = new FileImportOptions(); + protected FileImportOptions options = FileImportOptions.builder().build(); @CommandLine.Mixin - private KeyValueProcessingOptions processingOptions = new KeyValueProcessingOptions(); + private KeyValueProcessingOptions processingOptions = KeyValueProcessingOptions.builder().build(); + @SuppressWarnings({"unchecked", "rawtypes"}) protected AbstractItemStreamItemReader> reader(String file, FileType fileType, Resource resource) { switch (fileType) { case DELIMITED: @@ -42,15 +43,19 @@ protected AbstractItemStreamItemReader> reader(String file, if (!options.getIncludedFields().isEmpty()) { tokenizer.setIncludedFields(options.getIncludedFields().stream().mapToInt(i -> i).toArray()); } + log.info("Creating delimited reader with {} for file {}", options, file); return flatFileReader(resource, tokenizer); case FIXED: FixedLengthTokenizer fixedLengthTokenizer = new FixedLengthTokenizer(); Assert.notEmpty(options.getColumnRanges(), "Column ranges are required"); fixedLengthTokenizer.setColumns(options.getColumnRanges().toArray(new Range[0])); + log.info("Creating fixed-width reader with {} for file {}", options, file); return flatFileReader(resource, fixedLengthTokenizer); case JSON: + log.info("Creating JSON reader for file {}", file); return (JsonItemReader) FileUtils.jsonReader(resource, Map.class); case XML: + log.info("Creating XML reader for file {}", file); return (XmlItemReader) FileUtils.xmlReader(resource, Map.class); } throw new IllegalArgumentException("Unsupported file type: " + fileType); @@ -90,7 +95,7 @@ public HeaderCallbackHandler(AbstractLineTokenizer tokenizer) { @Override public void handleLine(String line) { - log.debug("Found header {}", line); + log.info("Found header {}", line); tokenizer.setNames(tokenizer.tokenize(line).getValues()); } } diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/S3Options.java b/connectors/file/src/main/java/com/redislabs/riot/file/S3Options.java index 8a71f9275..d36e1e8a7 100644 --- a/connectors/file/src/main/java/com/redislabs/riot/file/S3Options.java +++ b/connectors/file/src/main/java/com/redislabs/riot/file/S3Options.java @@ -1,9 +1,15 @@ package com.redislabs.riot.file; +import lombok.AllArgsConstructor; +import lombok.Builder; import lombok.Data; +import lombok.NoArgsConstructor; import picocli.CommandLine.Option; @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class S3Options { @Option(names = "--s3-access", description = "Access key", paramLabel = "") diff --git a/connectors/file/src/main/java/com/redislabs/riot/file/UncustomizedUrlResource.java b/connectors/file/src/main/java/com/redislabs/riot/file/UncustomizedUrlResource.java index f9fa09c31..a9173fead 100644 --- a/connectors/file/src/main/java/com/redislabs/riot/file/UncustomizedUrlResource.java +++ b/connectors/file/src/main/java/com/redislabs/riot/file/UncustomizedUrlResource.java @@ -1,12 +1,10 @@ package com.redislabs.riot.file; +import org.springframework.core.io.UrlResource; + import java.io.IOException; import java.net.HttpURLConnection; import java.net.MalformedURLException; -import java.net.URI; -import java.net.URL; - -import org.springframework.core.io.UrlResource; public class UncustomizedUrlResource extends UrlResource { @@ -14,22 +12,6 @@ public UncustomizedUrlResource(String path) throws MalformedURLException { super(path); } - public UncustomizedUrlResource(String protocol, String location, String fragment) throws MalformedURLException { - super(protocol, location, fragment); - } - - public UncustomizedUrlResource(String protocol, String location) throws MalformedURLException { - super(protocol, location); - } - - public UncustomizedUrlResource(URI uri) throws MalformedURLException { - super(uri); - } - - public UncustomizedUrlResource(URL url) { - super(url); - } - @Override protected void customizeConnection(HttpURLConnection con) throws IOException { // do nothing diff --git a/connectors/gen/src/main/java/com/redislabs/riot/gen/FakerItemReader.java b/connectors/gen/src/main/java/com/redislabs/riot/gen/FakerItemReader.java index 48c66882e..3d47912f3 100644 --- a/connectors/gen/src/main/java/com/redislabs/riot/gen/FakerItemReader.java +++ b/connectors/gen/src/main/java/com/redislabs/riot/gen/FakerItemReader.java @@ -37,7 +37,7 @@ public class FakerItemReader extends AbstractItemCountingItemStreamItemReader expressions; @Builder - public FakerItemReader(Locale locale, boolean includeMetadata, Map fields, long start, long end, long sleep) { + private FakerItemReader(Locale locale, boolean includeMetadata, Map fields, long start, long end, long sleep) { Assert.notNull(fields, "Fields are required."); Assert.isTrue(end > start, "End index must be strictly greater than start index"); setName(ClassUtils.getShortName(getClass())); diff --git a/connectors/gen/src/main/java/com/redislabs/riot/gen/GenerateCommand.java b/connectors/gen/src/main/java/com/redislabs/riot/gen/GenerateCommand.java index 265526b8e..adeb2db3d 100644 --- a/connectors/gen/src/main/java/com/redislabs/riot/gen/GenerateCommand.java +++ b/connectors/gen/src/main/java/com/redislabs/riot/gen/GenerateCommand.java @@ -20,10 +20,12 @@ public class GenerateCommand extends AbstractImportCommand, @Parameters(description = "SpEL expressions", paramLabel = "SPEL") private Map fakerFields = new LinkedHashMap<>(); + @SuppressWarnings("unused") @Option(names = "--introspect", description = "Use given search index to introspect Faker fields", paramLabel = "") private String fakerIndex; @Option(names = "--locale", description = "Faker locale (default: ${DEFAULT-VALUE})", paramLabel = "") private Locale locale = Locale.ENGLISH; + @SuppressWarnings("unused") @Option(names = "--metadata", description = "Include metadata (index, partition)") private boolean includeMetadata; @Option(names = "--start", description = "Start index (default: ${DEFAULT-VALUE})", paramLabel = "") @@ -33,12 +35,12 @@ public class GenerateCommand extends AbstractImportCommand, @Option(names = "--sleep", description = "Duration in ms to sleep before each item generation (default: ${DEFAULT-VALUE})", paramLabel = "") private long sleep = 0; @CommandLine.Mixin - private KeyValueProcessingOptions processingOptions = new KeyValueProcessingOptions(); + private KeyValueProcessingOptions processingOptions = KeyValueProcessingOptions.builder().build(); @Override protected Flow flow() { FakerItemReader reader = FakerItemReader.builder().locale(locale).includeMetadata(includeMetadata).fields(fakerFields(getRedisURI())).start(start).end(end).sleep(sleep).build(); - return flow(step("Generating", reader).build()); + return flow(step("generate-step", "Generating", reader).build()); } private String expression(Field field) { diff --git a/connectors/gen/src/main/java/com/redislabs/riot/gen/ReflectivePropertyAccessor.java b/connectors/gen/src/main/java/com/redislabs/riot/gen/ReflectivePropertyAccessor.java index 6ffed7649..290c0c13f 100644 --- a/connectors/gen/src/main/java/com/redislabs/riot/gen/ReflectivePropertyAccessor.java +++ b/connectors/gen/src/main/java/com/redislabs/riot/gen/ReflectivePropertyAccessor.java @@ -26,580 +26,570 @@ public class ReflectivePropertyAccessor implements PropertyAccessor { - static { - Set> booleanTypes = new HashSet<>(4); - booleanTypes.add(Boolean.class); - booleanTypes.add(Boolean.TYPE); - } - - private final boolean allowWrite; - private final Map readerCache = new ConcurrentHashMap<>(64); - private final Map writerCache = new ConcurrentHashMap<>(64); - private final Map typeDescriptorCache = new ConcurrentHashMap<>(64); - @Nullable - private volatile InvokerPair lastReadInvokerPair; - - /** - * Create a new property accessor for reading as well writing. - * - * @see #ReflectivePropertyAccessor(boolean) - */ - public ReflectivePropertyAccessor() { - this.allowWrite = true; - } - - /** - * Create a new property accessor for reading and possibly writing. - * - * @param allowWrite whether to also allow for write operations - * @since 4.3.15 - * @see #canWrite - */ - public ReflectivePropertyAccessor(boolean allowWrite) { - this.allowWrite = allowWrite; - } - - /** - * Returns {@code null} which means this is a general purpose accessor. - */ - @Override - @Nullable - public Class[] getSpecificTargetClasses() { - return null; - } - - @Override - public boolean canRead(EvaluationContext context, @Nullable Object target, String name) throws AccessException { - if (target == null) { - return false; - } - - Class type = (target instanceof Class ? (Class) target : target.getClass()); - if (type.isArray() && name.equals("length")) { - return true; - } - - PropertyCacheKey cacheKey = new PropertyCacheKey(type, name, target instanceof Class); - if (this.readerCache.containsKey(cacheKey)) { - return true; - } - Method method = getMethod(name, type); - if (method != null) { - TypeDescriptor typeDescriptor = new TypeDescriptor(new MethodParameter(method, -1)); - this.readerCache.put(cacheKey, new InvokerPair(method, typeDescriptor)); - this.typeDescriptorCache.put(cacheKey, typeDescriptor); - return true; - } else { - Field field = findField(name, type, target); - if (field != null) { - TypeDescriptor typeDescriptor = new TypeDescriptor(field); - this.readerCache.put(cacheKey, new InvokerPair(field, typeDescriptor)); - this.typeDescriptorCache.put(cacheKey, typeDescriptor); - return true; - } - } - - return false; - } - - private Method getMethod(String name, Class type) { - try { - return type.getMethod(name); - } catch (NoSuchMethodException | SecurityException e) { - return null; - } - } - - @Override - public TypedValue read(EvaluationContext context, @Nullable Object target, String name) throws AccessException { - Assert.state(target != null, "Target must not be null"); - Class type = (target instanceof Class ? (Class) target : target.getClass()); - - if (type.isArray() && name.equals("length")) { - if (target instanceof Class) { - throw new AccessException("Cannot access length on array class itself"); - } - return new TypedValue(Array.getLength(target)); - } - - PropertyCacheKey cacheKey = new PropertyCacheKey(type, name, target instanceof Class); - InvokerPair invoker = this.readerCache.get(cacheKey); - this.lastReadInvokerPair = invoker; - - if (invoker == null || invoker.member instanceof Method) { - Method method = (Method) (invoker != null ? invoker.member : null); - if (method == null) { - method = getMethod(name, type); - if (method != null) { - // Treat it like a property... - // The readerCache will only contain gettable properties (let's not worry about - // setters for now). - TypeDescriptor typeDescriptor = new TypeDescriptor(new MethodParameter(method, -1)); - invoker = new InvokerPair(method, typeDescriptor); - this.lastReadInvokerPair = invoker; - this.readerCache.put(cacheKey, invoker); - } - } - if (method != null) { - try { - ReflectionUtils.makeAccessible(method); - Object value = method.invoke(target); - return new TypedValue(value, invoker.typeDescriptor.narrow(value)); - } catch (Exception ex) { - throw new AccessException("Unable to access property '" + name + "' through getter method", ex); - } - } - } - - if (invoker == null || invoker.member instanceof Field) { - Field field = (Field) (invoker == null ? null : invoker.member); - if (field == null) { - field = findField(name, type, target); - if (field != null) { - invoker = new InvokerPair(field, new TypeDescriptor(field)); - this.lastReadInvokerPair = invoker; - this.readerCache.put(cacheKey, invoker); - } - } - if (field != null) { - try { - ReflectionUtils.makeAccessible(field); - Object value = field.get(target); - return new TypedValue(value, invoker.typeDescriptor.narrow(value)); - } catch (Exception ex) { - throw new AccessException("Unable to access field '" + name + "'", ex); - } - } - } - - throw new AccessException("Neither getter method nor field found for property '" + name + "'"); - } - - @Override - public boolean canWrite(EvaluationContext context, @Nullable Object target, String name) throws AccessException { - if (!this.allowWrite || target == null) { - return false; - } - - Class type = (target instanceof Class ? (Class) target : target.getClass()); - PropertyCacheKey cacheKey = new PropertyCacheKey(type, name, target instanceof Class); - if (this.writerCache.containsKey(cacheKey)) { - return true; - } - - Method method = getMethod(name, type); - if (method != null) { - TypeDescriptor typeDescriptor = new TypeDescriptor(new MethodParameter(method, -1)); - this.writerCache.put(cacheKey, method); - this.typeDescriptorCache.put(cacheKey, typeDescriptor); - return true; - } else { - Field field = findField(name, type, target); - if (field != null) { - this.writerCache.put(cacheKey, field); - this.typeDescriptorCache.put(cacheKey, new TypeDescriptor(field)); - return true; - } - } - - return false; - } - - @Override - public void write(EvaluationContext context, @Nullable Object target, String name, @Nullable Object newValue) - throws AccessException { - - if (!this.allowWrite) { - throw new AccessException("PropertyAccessor for property '" + name + "' on target [" + target - + "] does not allow write operations"); - } - - Assert.state(target != null, "Target must not be null"); - Class type = (target instanceof Class ? (Class) target : target.getClass()); - - Object possiblyConvertedNewValue = newValue; - TypeDescriptor typeDescriptor = getTypeDescriptor(context, target, name); - if (typeDescriptor != null) { - try { - possiblyConvertedNewValue = context.getTypeConverter().convertValue(newValue, - TypeDescriptor.forObject(newValue), typeDescriptor); - } catch (EvaluationException evaluationException) { - throw new AccessException("Type conversion failure", evaluationException); - } - } - - PropertyCacheKey cacheKey = new PropertyCacheKey(type, name, target instanceof Class); - Member cachedMember = this.writerCache.get(cacheKey); - - if (cachedMember == null || cachedMember instanceof Method) { - Method method = (Method) cachedMember; - if (method != null) { - try { - ReflectionUtils.makeAccessible(method); - method.invoke(target, possiblyConvertedNewValue); - return; - } catch (Exception ex) { - throw new AccessException("Unable to access property '" + name + "' through setter method", ex); - } - } - } - - if (cachedMember == null || cachedMember instanceof Field) { - Field field = (Field) cachedMember; - if (field == null) { - field = findField(name, type, target); - if (field != null) { - cachedMember = field; - this.writerCache.put(cacheKey, cachedMember); - } - } - if (field != null) { - try { - ReflectionUtils.makeAccessible(field); - field.set(target, possiblyConvertedNewValue); - return; - } catch (Exception ex) { - throw new AccessException("Unable to access field '" + name + "'", ex); - } - } - } - - throw new AccessException("Neither setter method nor field found for property '" + name + "'"); - } - - /** - * Get the last read invoker pair. - * - * @deprecated as of 4.3.15 since it is not used within the framework anymore - */ - @Deprecated - @Nullable - public Member getLastReadInvokerPair() { - InvokerPair lastReadInvoker = this.lastReadInvokerPair; - return (lastReadInvoker != null ? lastReadInvoker.member : null); - } - - @Nullable - private TypeDescriptor getTypeDescriptor(EvaluationContext context, Object target, String name) { - Class type = (target instanceof Class ? (Class) target : target.getClass()); - - if (type.isArray() && name.equals("length")) { - return TypeDescriptor.valueOf(Integer.TYPE); - } - PropertyCacheKey cacheKey = new PropertyCacheKey(type, name, target instanceof Class); - TypeDescriptor typeDescriptor = this.typeDescriptorCache.get(cacheKey); - if (typeDescriptor == null) { - // Attempt to populate the cache entry - try { - if (canRead(context, target, name) || canWrite(context, target, name)) { - typeDescriptor = this.typeDescriptorCache.get(cacheKey); - } - } catch (AccessException ex) { - // Continue with null type descriptor - } - } - return typeDescriptor; - } - - /** - * Determine whether the given {@code Method} is a candidate for property access - * on an instance of the given target class. - *

- * The default implementation considers any method as a candidate, even for - * non-user-declared properties on the {@link Object} base class. - * - * @param method the Method to evaluate - * @param targetClass the concrete target class that is being introspected - * @since 4.3.15 - */ - protected boolean isCandidateForProperty(Method method, Class targetClass) { - return true; - } - - @Nullable - private Field findField(String name, Class clazz, Object target) { - Field field = findField(name, clazz, target instanceof Class); - if (field == null && target instanceof Class) { - field = findField(name, target.getClass(), false); - } - return field; - } - - /** - * Find a field of a certain name on a specified class. - */ - @Nullable - protected Field findField(String name, Class clazz, boolean mustBeStatic) { - Field[] fields = clazz.getFields(); - for (Field field : fields) { - if (field.getName().equals(name) && (!mustBeStatic || Modifier.isStatic(field.getModifiers()))) { - return field; - } - } - // We'll search superclasses and implemented interfaces explicitly, - // although it shouldn't be necessary - however, see SPR-10125. - if (clazz.getSuperclass() != null) { - Field field = findField(name, clazz.getSuperclass(), mustBeStatic); - if (field != null) { - return field; - } - } - for (Class implementedInterface : clazz.getInterfaces()) { - Field field = findField(name, implementedInterface, mustBeStatic); - if (field != null) { - return field; - } - } - return null; - } - - /** - * Attempt to create an optimized property accessor tailored for a property of a - * particular name on a particular class. The general ReflectivePropertyAccessor - * will always work but is not optimal due to the need to lookup which - * reflective member (method/field) to use each time read() is called. This - * method will just return the ReflectivePropertyAccessor instance if it is - * unable to build a more optimal accessor. - *

- * Note: An optimal accessor is currently only usable for read attempts. Do not - * call this method if you need a read-write accessor. - * - * @see OptimalPropertyAccessor - */ - public PropertyAccessor createOptimalAccessor(@Nullable Object target, String name) { - // Don't be clever for arrays or a null target... - if (target == null) { - return this; - } - Class clazz = (target instanceof Class ? (Class) target : target.getClass()); - if (clazz.isArray()) { - return this; - } - - PropertyCacheKey cacheKey = new PropertyCacheKey(clazz, name, target instanceof Class); - InvokerPair invocationTarget = this.readerCache.get(cacheKey); - - if (invocationTarget == null || invocationTarget.member instanceof Method) { - Method method = (Method) (invocationTarget != null ? invocationTarget.member : null); - if (method == null) { - method = getMethod(name, clazz); - if (method != null) { - invocationTarget = new InvokerPair(method, new TypeDescriptor(new MethodParameter(method, -1))); - ReflectionUtils.makeAccessible(method); - this.readerCache.put(cacheKey, invocationTarget); - } - } - if (method != null) { - return new OptimalPropertyAccessor(invocationTarget); - } - } - - if (invocationTarget == null || invocationTarget.member instanceof Field) { - Field field = (invocationTarget != null ? (Field) invocationTarget.member : null); - if (field == null) { - field = findField(name, clazz, target instanceof Class); - if (field != null) { - invocationTarget = new InvokerPair(field, new TypeDescriptor(field)); - ReflectionUtils.makeAccessible(field); - this.readerCache.put(cacheKey, invocationTarget); - } - } - if (field != null) { - return new OptimalPropertyAccessor(invocationTarget); - } - } - - return this; - } - - /** - * Captures the member (method/field) to call reflectively to access a property - * value and the type descriptor for the value returned by the reflective call. - */ - private static class InvokerPair { - - final Member member; - - final TypeDescriptor typeDescriptor; - - public InvokerPair(Member member, TypeDescriptor typeDescriptor) { - this.member = member; - this.typeDescriptor = typeDescriptor; - } - } - - private static final class PropertyCacheKey implements Comparable { - - private final Class clazz; - - private final String property; - - private boolean targetIsClass; - - public PropertyCacheKey(Class clazz, String name, boolean targetIsClass) { - this.clazz = clazz; - this.property = name; - this.targetIsClass = targetIsClass; - } - - @Override - public boolean equals(Object other) { - if (this == other) { - return true; - } - if (!(other instanceof PropertyCacheKey)) { - return false; - } - PropertyCacheKey otherKey = (PropertyCacheKey) other; - return (this.clazz == otherKey.clazz && this.property.equals(otherKey.property) - && this.targetIsClass == otherKey.targetIsClass); - } - - @Override - public int hashCode() { - return (this.clazz.hashCode() * 29 + this.property.hashCode()); - } - - @Override - public String toString() { - return "CacheKey [clazz=" + this.clazz.getName() + ", property=" + this.property + ", " + this.property - + ", targetIsClass=" + this.targetIsClass + "]"; - } - - @Override - public int compareTo(PropertyCacheKey other) { - int result = this.clazz.getName().compareTo(other.clazz.getName()); - if (result == 0) { - result = this.property.compareTo(other.property); - } - return result; - } - } - - /** - * An optimized form of a PropertyAccessor that will use reflection but only - * knows how to access a particular property on a particular class. This is - * unlike the general ReflectivePropertyResolver which manages a cache of - * methods/fields that may be invoked to access different properties on - * different classes. This optimal accessor exists because looking up the - * appropriate reflective object by class/name on each read is not cheap. - */ - public static class OptimalPropertyAccessor implements CompilablePropertyAccessor { - - /** - * The member being accessed. - */ - public final Member member; - - private final TypeDescriptor typeDescriptor; - - OptimalPropertyAccessor(InvokerPair target) { - this.member = target.member; - this.typeDescriptor = target.typeDescriptor; - } - - @Override - @Nullable - public Class[] getSpecificTargetClasses() { - throw new UnsupportedOperationException("Should not be called on an OptimalPropertyAccessor"); - } - - @Override - public boolean canRead(EvaluationContext context, @Nullable Object target, String name) throws AccessException { - if (target == null) { - return false; - } - Class type = (target instanceof Class ? (Class) target : target.getClass()); - if (type.isArray()) { - return false; - } - - if (this.member instanceof Method) { - Method method = (Method) this.member; - return name.equals(method.getName()); - } else { - Field field = (Field) this.member; - return field.getName().equals(name); - } - } - - @Override - public TypedValue read(EvaluationContext context, @Nullable Object target, String name) throws AccessException { - if (this.member instanceof Method) { - Method method = (Method) this.member; - try { - ReflectionUtils.makeAccessible(method); - Object value = method.invoke(target); - return new TypedValue(value, this.typeDescriptor.narrow(value)); - } catch (Exception ex) { - throw new AccessException("Unable to access property '" + name + "' through getter method", ex); - } - } else { - Field field = (Field) this.member; - try { - ReflectionUtils.makeAccessible(field); - Object value = field.get(target); - return new TypedValue(value, this.typeDescriptor.narrow(value)); - } catch (Exception ex) { - throw new AccessException("Unable to access field '" + name + "'", ex); - } - } - } - - @Override - public boolean canWrite(EvaluationContext context, @Nullable Object target, String name) { - throw new UnsupportedOperationException("Should not be called on an OptimalPropertyAccessor"); - } - - @Override - public void write(EvaluationContext context, @Nullable Object target, String name, @Nullable Object newValue) { - throw new UnsupportedOperationException("Should not be called on an OptimalPropertyAccessor"); - } - - @Override - public boolean isCompilable() { - return (Modifier.isPublic(this.member.getModifiers()) - && Modifier.isPublic(this.member.getDeclaringClass().getModifiers())); - } - - @Override - public Class getPropertyType() { - if (this.member instanceof Method) { - return ((Method) this.member).getReturnType(); - } else { - return ((Field) this.member).getType(); - } - } - - @Override - public void generateCode(String propertyName, MethodVisitor mv, CodeFlow cf) { - boolean isStatic = Modifier.isStatic(this.member.getModifiers()); - String descriptor = cf.lastDescriptor(); - String classDesc = this.member.getDeclaringClass().getName().replace('.', '/'); - - if (!isStatic) { - if (descriptor == null) { - cf.loadTarget(mv); - } - if (descriptor == null || !classDesc.equals(descriptor.substring(1))) { - mv.visitTypeInsn(CHECKCAST, classDesc); - } - } else { - if (descriptor != null) { - // A static field/method call will not consume what is on the stack, - // it needs to be popped off. - mv.visitInsn(POP); - } - } - - if (this.member instanceof Method) { - mv.visitMethodInsn((isStatic ? INVOKESTATIC : INVOKEVIRTUAL), classDesc, this.member.getName(), - CodeFlow.createSignatureDescriptor((Method) this.member), false); - } else { - mv.visitFieldInsn((isStatic ? GETSTATIC : GETFIELD), classDesc, this.member.getName(), - CodeFlow.toJvmDescriptor(((Field) this.member).getType())); - } - } - } + static { + Set> booleanTypes = new HashSet<>(4); + booleanTypes.add(Boolean.class); + booleanTypes.add(Boolean.TYPE); + } + + private final boolean allowWrite; + private final Map readerCache = new ConcurrentHashMap<>(64); + private final Map writerCache = new ConcurrentHashMap<>(64); + private final Map typeDescriptorCache = new ConcurrentHashMap<>(64); + @Nullable + private volatile InvokerPair lastReadInvokerPair; + + /** + * Create a new property accessor for reading as well writing. + * + * @see #ReflectivePropertyAccessor(boolean) + */ + public ReflectivePropertyAccessor() { + this.allowWrite = true; + } + + /** + * Create a new property accessor for reading and possibly writing. + * + * @param allowWrite whether to also allow for write operations + * @see #canWrite + * @since 4.3.15 + */ + public ReflectivePropertyAccessor(boolean allowWrite) { + this.allowWrite = allowWrite; + } + + /** + * Returns {@code null} which means this is a general purpose accessor. + */ + @Override + @Nullable + public Class[] getSpecificTargetClasses() { + return null; + } + + @Override + public boolean canRead(EvaluationContext context, @Nullable Object target, String name) throws AccessException { + if (target == null) { + return false; + } + + Class type = (target instanceof Class ? (Class) target : target.getClass()); + if (type.isArray() && name.equals("length")) { + return true; + } + + PropertyCacheKey cacheKey = new PropertyCacheKey(type, name, target instanceof Class); + if (this.readerCache.containsKey(cacheKey)) { + return true; + } + Method method = getMethod(name, type); + if (method != null) { + TypeDescriptor typeDescriptor = new TypeDescriptor(new MethodParameter(method, -1)); + this.readerCache.put(cacheKey, new InvokerPair(method, typeDescriptor)); + this.typeDescriptorCache.put(cacheKey, typeDescriptor); + return true; + } else { + Field field = findField(name, type, target); + if (field != null) { + TypeDescriptor typeDescriptor = new TypeDescriptor(field); + this.readerCache.put(cacheKey, new InvokerPair(field, typeDescriptor)); + this.typeDescriptorCache.put(cacheKey, typeDescriptor); + return true; + } + } + + return false; + } + + private Method getMethod(String name, Class type) { + try { + return type.getMethod(name); + } catch (NoSuchMethodException | SecurityException e) { + return null; + } + } + + @Override + public TypedValue read(EvaluationContext context, @Nullable Object target, String name) throws AccessException { + Assert.state(target != null, "Target must not be null"); + Class type = (target instanceof Class ? (Class) target : target.getClass()); + + if (type.isArray() && name.equals("length")) { + if (target instanceof Class) { + throw new AccessException("Cannot access length on array class itself"); + } + return new TypedValue(Array.getLength(target)); + } + + PropertyCacheKey cacheKey = new PropertyCacheKey(type, name, target instanceof Class); + InvokerPair invoker = this.readerCache.get(cacheKey); + this.lastReadInvokerPair = invoker; + + if (invoker == null || invoker.member instanceof Method) { + Method method = (Method) (invoker != null ? invoker.member : null); + if (method == null) { + method = getMethod(name, type); + if (method != null) { + // Treat it like a property... + // The readerCache will only contain gettable properties (let's not worry about + // setters for now). + TypeDescriptor typeDescriptor = new TypeDescriptor(new MethodParameter(method, -1)); + invoker = new InvokerPair(method, typeDescriptor); + this.lastReadInvokerPair = invoker; + this.readerCache.put(cacheKey, invoker); + } + } + if (method != null) { + try { + ReflectionUtils.makeAccessible(method); + Object value = method.invoke(target); + return new TypedValue(value, invoker.typeDescriptor.narrow(value)); + } catch (Exception ex) { + throw new AccessException("Unable to access property '" + name + "' through getter method", ex); + } + } + } + + if (invoker == null || invoker.member instanceof Field) { + Field field = (Field) (invoker == null ? null : invoker.member); + if (field == null) { + field = findField(name, type, target); + if (field != null) { + invoker = new InvokerPair(field, new TypeDescriptor(field)); + this.lastReadInvokerPair = invoker; + this.readerCache.put(cacheKey, invoker); + } + } + if (field != null) { + try { + ReflectionUtils.makeAccessible(field); + Object value = field.get(target); + return new TypedValue(value, invoker.typeDescriptor.narrow(value)); + } catch (Exception ex) { + throw new AccessException("Unable to access field '" + name + "'", ex); + } + } + } + + throw new AccessException("Neither getter method nor field found for property '" + name + "'"); + } + + @Override + public boolean canWrite(EvaluationContext context, @Nullable Object target, String name) throws AccessException { + if (!this.allowWrite || target == null) { + return false; + } + + Class type = (target instanceof Class ? (Class) target : target.getClass()); + PropertyCacheKey cacheKey = new PropertyCacheKey(type, name, target instanceof Class); + if (this.writerCache.containsKey(cacheKey)) { + return true; + } + + Method method = getMethod(name, type); + if (method != null) { + TypeDescriptor typeDescriptor = new TypeDescriptor(new MethodParameter(method, -1)); + this.writerCache.put(cacheKey, method); + this.typeDescriptorCache.put(cacheKey, typeDescriptor); + return true; + } else { + Field field = findField(name, type, target); + if (field != null) { + this.writerCache.put(cacheKey, field); + this.typeDescriptorCache.put(cacheKey, new TypeDescriptor(field)); + return true; + } + } + + return false; + } + + @Override + public void write(EvaluationContext context, @Nullable Object target, String name, @Nullable Object newValue) throws AccessException { + + if (!this.allowWrite) { + throw new AccessException("PropertyAccessor for property '" + name + "' on target [" + target + "] does not allow write operations"); + } + + Assert.state(target != null, "Target must not be null"); + Class type = (target instanceof Class ? (Class) target : target.getClass()); + + Object possiblyConvertedNewValue = newValue; + TypeDescriptor typeDescriptor = getTypeDescriptor(context, target, name); + if (typeDescriptor != null) { + try { + possiblyConvertedNewValue = context.getTypeConverter().convertValue(newValue, TypeDescriptor.forObject(newValue), typeDescriptor); + } catch (EvaluationException evaluationException) { + throw new AccessException("Type conversion failure", evaluationException); + } + } + + PropertyCacheKey cacheKey = new PropertyCacheKey(type, name, target instanceof Class); + Member cachedMember = this.writerCache.get(cacheKey); + + if (cachedMember == null || cachedMember instanceof Method) { + Method method = (Method) cachedMember; + if (method != null) { + try { + ReflectionUtils.makeAccessible(method); + method.invoke(target, possiblyConvertedNewValue); + return; + } catch (Exception ex) { + throw new AccessException("Unable to access property '" + name + "' through setter method", ex); + } + } + } + + if (cachedMember == null || cachedMember instanceof Field) { + Field field = (Field) cachedMember; + if (field == null) { + field = findField(name, type, target); + if (field != null) { + cachedMember = field; + this.writerCache.put(cacheKey, cachedMember); + } + } + if (field != null) { + try { + ReflectionUtils.makeAccessible(field); + field.set(target, possiblyConvertedNewValue); + return; + } catch (Exception ex) { + throw new AccessException("Unable to access field '" + name + "'", ex); + } + } + } + + throw new AccessException("Neither setter method nor field found for property '" + name + "'"); + } + + /** + * Get the last read invoker pair. + * + * @deprecated as of 4.3.15 since it is not used within the framework anymore + */ + @Deprecated + @Nullable + public Member getLastReadInvokerPair() { + InvokerPair lastReadInvoker = this.lastReadInvokerPair; + return (lastReadInvoker != null ? lastReadInvoker.member : null); + } + + @Nullable + private TypeDescriptor getTypeDescriptor(EvaluationContext context, Object target, String name) { + Class type = (target instanceof Class ? (Class) target : target.getClass()); + + if (type.isArray() && name.equals("length")) { + return TypeDescriptor.valueOf(Integer.TYPE); + } + PropertyCacheKey cacheKey = new PropertyCacheKey(type, name, target instanceof Class); + TypeDescriptor typeDescriptor = this.typeDescriptorCache.get(cacheKey); + if (typeDescriptor == null) { + // Attempt to populate the cache entry + try { + if (canRead(context, target, name) || canWrite(context, target, name)) { + typeDescriptor = this.typeDescriptorCache.get(cacheKey); + } + } catch (AccessException ex) { + // Continue with null type descriptor + } + } + return typeDescriptor; + } + + /** + * Determine whether the given {@code Method} is a candidate for property access + * on an instance of the given target class. + *

+ * The default implementation considers any method as a candidate, even for + * non-user-declared properties on the {@link Object} base class. + * + * @param method the Method to evaluate + * @param targetClass the concrete target class that is being introspected + * @since 4.3.15 + */ + protected boolean isCandidateForProperty(Method method, Class targetClass) { + return true; + } + + @Nullable + private Field findField(String name, Class clazz, Object target) { + Field field = findField(name, clazz, target instanceof Class); + if (field == null && target instanceof Class) { + field = findField(name, target.getClass(), false); + } + return field; + } + + /** + * Find a field of a certain name on a specified class. + */ + @Nullable + protected Field findField(String name, Class clazz, boolean mustBeStatic) { + Field[] fields = clazz.getFields(); + for (Field field : fields) { + if (field.getName().equals(name) && (!mustBeStatic || Modifier.isStatic(field.getModifiers()))) { + return field; + } + } + // We'll search superclasses and implemented interfaces explicitly, + // although it shouldn't be necessary - however, see SPR-10125. + if (clazz.getSuperclass() != null) { + Field field = findField(name, clazz.getSuperclass(), mustBeStatic); + if (field != null) { + return field; + } + } + for (Class implementedInterface : clazz.getInterfaces()) { + Field field = findField(name, implementedInterface, mustBeStatic); + if (field != null) { + return field; + } + } + return null; + } + + /** + * Attempt to create an optimized property accessor tailored for a property of a + * particular name on a particular class. The general ReflectivePropertyAccessor + * will always work but is not optimal due to the need to lookup which + * reflective member (method/field) to use each time read() is called. This + * method will just return the ReflectivePropertyAccessor instance if it is + * unable to build a more optimal accessor. + *

+ * Note: An optimal accessor is currently only usable for read attempts. Do not + * call this method if you need a read-write accessor. + * + * @see OptimalPropertyAccessor + */ + public PropertyAccessor createOptimalAccessor(@Nullable Object target, String name) { + // Don't be clever for arrays or a null target... + if (target == null) { + return this; + } + Class clazz = (target instanceof Class ? (Class) target : target.getClass()); + if (clazz.isArray()) { + return this; + } + + PropertyCacheKey cacheKey = new PropertyCacheKey(clazz, name, target instanceof Class); + InvokerPair invocationTarget = this.readerCache.get(cacheKey); + + if (invocationTarget == null || invocationTarget.member instanceof Method) { + Method method = (Method) (invocationTarget != null ? invocationTarget.member : null); + if (method == null) { + method = getMethod(name, clazz); + if (method != null) { + invocationTarget = new InvokerPair(method, new TypeDescriptor(new MethodParameter(method, -1))); + ReflectionUtils.makeAccessible(method); + this.readerCache.put(cacheKey, invocationTarget); + } + } + if (method != null) { + return new OptimalPropertyAccessor(invocationTarget); + } + } + + if (invocationTarget == null || invocationTarget.member instanceof Field) { + Field field = (invocationTarget != null ? (Field) invocationTarget.member : null); + if (field == null) { + field = findField(name, clazz, target instanceof Class); + if (field != null) { + invocationTarget = new InvokerPair(field, new TypeDescriptor(field)); + ReflectionUtils.makeAccessible(field); + this.readerCache.put(cacheKey, invocationTarget); + } + } + if (field != null) { + return new OptimalPropertyAccessor(invocationTarget); + } + } + + return this; + } + + /** + * Captures the member (method/field) to call reflectively to access a property + * value and the type descriptor for the value returned by the reflective call. + */ + private static class InvokerPair { + + final Member member; + + final TypeDescriptor typeDescriptor; + + public InvokerPair(Member member, TypeDescriptor typeDescriptor) { + this.member = member; + this.typeDescriptor = typeDescriptor; + } + } + + private static final class PropertyCacheKey implements Comparable { + + private final Class clazz; + private final String property; + private final boolean targetIsClass; + + public PropertyCacheKey(Class clazz, String name, boolean targetIsClass) { + this.clazz = clazz; + this.property = name; + this.targetIsClass = targetIsClass; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (!(other instanceof PropertyCacheKey)) { + return false; + } + PropertyCacheKey otherKey = (PropertyCacheKey) other; + return (this.clazz == otherKey.clazz && this.property.equals(otherKey.property) && this.targetIsClass == otherKey.targetIsClass); + } + + @Override + public int hashCode() { + return (this.clazz.hashCode() * 29 + this.property.hashCode()); + } + + @Override + public String toString() { + return "CacheKey [clazz=" + this.clazz.getName() + ", property=" + this.property + ", " + this.property + ", targetIsClass=" + this.targetIsClass + "]"; + } + + @Override + public int compareTo(PropertyCacheKey other) { + int result = this.clazz.getName().compareTo(other.clazz.getName()); + if (result == 0) { + result = this.property.compareTo(other.property); + } + return result; + } + } + + /** + * An optimized form of a PropertyAccessor that will use reflection but only + * knows how to access a particular property on a particular class. This is + * unlike the general ReflectivePropertyResolver which manages a cache of + * methods/fields that may be invoked to access different properties on + * different classes. This optimal accessor exists because looking up the + * appropriate reflective object by class/name on each read is not cheap. + */ + public static class OptimalPropertyAccessor implements CompilablePropertyAccessor { + + /** + * The member being accessed. + */ + public final Member member; + + private final TypeDescriptor typeDescriptor; + + OptimalPropertyAccessor(InvokerPair target) { + this.member = target.member; + this.typeDescriptor = target.typeDescriptor; + } + + @Override + @Nullable + public Class[] getSpecificTargetClasses() { + throw new UnsupportedOperationException("Should not be called on an OptimalPropertyAccessor"); + } + + @Override + public boolean canRead(EvaluationContext context, @Nullable Object target, String name) throws AccessException { + if (target == null) { + return false; + } + Class type = (target instanceof Class ? (Class) target : target.getClass()); + if (type.isArray()) { + return false; + } + + if (this.member instanceof Method) { + Method method = (Method) this.member; + return name.equals(method.getName()); + } else { + Field field = (Field) this.member; + return field.getName().equals(name); + } + } + + @Override + public TypedValue read(EvaluationContext context, @Nullable Object target, String name) throws AccessException { + if (this.member instanceof Method) { + Method method = (Method) this.member; + try { + ReflectionUtils.makeAccessible(method); + Object value = method.invoke(target); + return new TypedValue(value, this.typeDescriptor.narrow(value)); + } catch (Exception ex) { + throw new AccessException("Unable to access property '" + name + "' through getter method", ex); + } + } else { + Field field = (Field) this.member; + try { + ReflectionUtils.makeAccessible(field); + Object value = field.get(target); + return new TypedValue(value, this.typeDescriptor.narrow(value)); + } catch (Exception ex) { + throw new AccessException("Unable to access field '" + name + "'", ex); + } + } + } + + @Override + public boolean canWrite(EvaluationContext context, @Nullable Object target, String name) { + throw new UnsupportedOperationException("Should not be called on an OptimalPropertyAccessor"); + } + + @Override + public void write(EvaluationContext context, @Nullable Object target, String name, @Nullable Object newValue) { + throw new UnsupportedOperationException("Should not be called on an OptimalPropertyAccessor"); + } + + @Override + public boolean isCompilable() { + return (Modifier.isPublic(this.member.getModifiers()) && Modifier.isPublic(this.member.getDeclaringClass().getModifiers())); + } + + @Override + public Class getPropertyType() { + if (this.member instanceof Method) { + return ((Method) this.member).getReturnType(); + } else { + return ((Field) this.member).getType(); + } + } + + @Override + public void generateCode(String propertyName, MethodVisitor mv, CodeFlow cf) { + boolean isStatic = Modifier.isStatic(this.member.getModifiers()); + String descriptor = cf.lastDescriptor(); + String classDesc = this.member.getDeclaringClass().getName().replace('.', '/'); + + if (!isStatic) { + if (descriptor == null) { + cf.loadTarget(mv); + } + if (descriptor == null || !classDesc.equals(descriptor.substring(1))) { + mv.visitTypeInsn(CHECKCAST, classDesc); + } + } else { + if (descriptor != null) { + // A static field/method call will not consume what is on the stack, + // it needs to be popped off. + mv.visitInsn(POP); + } + } + + if (this.member instanceof Method) { + mv.visitMethodInsn((isStatic ? INVOKESTATIC : INVOKEVIRTUAL), classDesc, this.member.getName(), CodeFlow.createSignatureDescriptor((Method) this.member), false); + } else { + mv.visitFieldInsn((isStatic ? GETSTATIC : GETFIELD), classDesc, this.member.getName(), CodeFlow.toJvmDescriptor(((Field) this.member).getType())); + } + } + } } diff --git a/connectors/redis/src/main/java/com/redislabs/riot/redis/ReplicateCommand.java b/connectors/redis/src/main/java/com/redislabs/riot/redis/ReplicateCommand.java index 3d03ffaca..22f72762c 100644 --- a/connectors/redis/src/main/java/com/redislabs/riot/redis/ReplicateCommand.java +++ b/connectors/redis/src/main/java/com/redislabs/riot/redis/ReplicateCommand.java @@ -31,9 +31,10 @@ public class ReplicateCommand extends AbstractTransferCommand, KeyValue> { @CommandLine.ArgGroup(exclusive = false, heading = "Target Redis connection options%n") - private RedisOptions targetRedis = new RedisOptions(); + private RedisOptions targetRedis = RedisOptions.builder().build(); @CommandLine.ArgGroup(exclusive = false, heading = "Source Redis reader options%n") private RedisReaderOptions options = RedisReaderOptions.builder().build(); + @SuppressWarnings("unused") @Option(names = "--live", description = "Enable live replication.") private boolean live; @CommandLine.Mixin @@ -89,11 +90,11 @@ public void shutdown() { @Override protected Flow flow() { String name = "Scanning"; - StepBuilder, KeyValue> replicationStep = stepBuilder(name); + StepBuilder, KeyValue> replicationStep = stepBuilder("scan-replication-step", name); FlowBuilder flow = flow(name).start(replicationStep.reader(sourceKeyDumpReader()).writer(targetKeyDumpWriter()).build().build()); if (live) { String liveReplicationName = "Listening"; - StepBuilder, KeyValue> liveReplicationStep = stepBuilder(liveReplicationName); + StepBuilder, KeyValue> liveReplicationStep = stepBuilder("live-replication-step", liveReplicationName); KeyDumpItemReader liveReader = liveReader(); liveReader.setName("Live" + ClassUtils.getShortName(liveReader.getClass())); SimpleFlow liveFlow = flow(liveReplicationName).start(flushingOptions.configure(liveReplicationStep.reader(liveReader).writer(targetKeyDumpWriter()).build()).build()).build(); @@ -101,7 +102,7 @@ protected Flow flow() { } if (verify) { KeyComparisonItemWriter writer = comparisonWriter(); - StepBuilder,DataStructure> verifyStep = stepBuilder("Verifying"); + StepBuilder, DataStructure> verifyStep = stepBuilder("verification-step", "Verifying"); flow = flow("Replication+Verification").start(flow.build()).next(verifyStep.reader(sourceDataStructureReader()).writer(writer).extraMessage(() -> message(writer)).build().build()); } return flow.build(); @@ -115,6 +116,7 @@ private String message(KeyComparisonItemWriter writer) { return String.format(" OK:%s V:%s >:%s <:%s T:%s", writer.getOkCount(), v, l, r, t); } + @SuppressWarnings("unchecked") private KeyComparisonItemWriter comparisonWriter() { Duration ttlToleranceDuration = Duration.ofSeconds(ttlTolerance); if (targetRedis.isCluster()) { @@ -125,6 +127,7 @@ private KeyComparisonItemWriter comparisonWriter() { return new KeyComparisonItemWriter<>(targetReader, ttlToleranceDuration); } + @SuppressWarnings("unchecked") private ItemReader> sourceKeyDumpReader() { if (isCluster()) { return configureScanReader(KeyDumpItemReader.builder((GenericObjectPool>) pool, (StatefulRedisClusterConnection) connection)).build(); @@ -132,6 +135,7 @@ private ItemReader> sourceKeyDumpReader() { return configureScanReader(KeyDumpItemReader.builder((GenericObjectPool>) pool, (StatefulRedisConnection) connection)).build(); } + @SuppressWarnings("unchecked") private ItemReader> sourceDataStructureReader() { if (isCluster()) { return configureScanReader(DataStructureItemReader.builder((GenericObjectPool>) pool, (StatefulRedisClusterConnection) connection)).build(); @@ -139,6 +143,7 @@ private ItemReader> sourceDataStructureReader() { return configureScanReader(DataStructureItemReader.builder((GenericObjectPool>) pool, (StatefulRedisConnection) connection)).build(); } + @SuppressWarnings("unchecked") private KeyDumpItemReader liveReader() { if (isCluster()) { return configureLiveReader(KeyDumpItemReader.builder((GenericObjectPool>) pool, (StatefulRedisClusterPubSubConnection) pubSubConnection)).build(); @@ -156,10 +161,11 @@ private > B configureLiveReader(B bui return builder; } - private void configureReader(AbstractKeyValueItemReader.AbstractKeyValueItemReaderBuilder builder) { - builder.threadCount(options.getThreads()).chunkSize(options.getBatchSize()).commandTimeout(getCommandTimeout()).queueCapacity(options.getQueueCapacity()); + private > void configureReader(B builder) { + configureCommandTimeoutBuilder(builder.threadCount(options.getThreads()).chunkSize(options.getBatchSize()).queueCapacity(options.getQueueCapacity())); } + @SuppressWarnings("unchecked") private ItemWriter> targetKeyDumpWriter() { if (targetRedis.isCluster()) { return KeyDumpItemWriter.clusterBuilder((GenericObjectPool>) targetPool).replace(true).commandTimeout(getTargetCommandTimeout()).build(); diff --git a/connectors/redis/src/main/java/com/redislabs/riot/redis/RiotRedis.java b/connectors/redis/src/main/java/com/redislabs/riot/redis/RiotRedis.java index 8a5823754..cff4f3b9d 100644 --- a/connectors/redis/src/main/java/com/redislabs/riot/redis/RiotRedis.java +++ b/connectors/redis/src/main/java/com/redislabs/riot/redis/RiotRedis.java @@ -11,4 +11,5 @@ public class RiotRedis extends RiotApp { public static void main(String[] args) { System.exit(new RiotRedis().execute(args)); } + } diff --git a/connectors/stream/src/main/java/com/redislabs/riot/stream/KafkaOptions.java b/connectors/stream/src/main/java/com/redislabs/riot/stream/KafkaOptions.java index 3819acc9c..872b27dd9 100644 --- a/connectors/stream/src/main/java/com/redislabs/riot/stream/KafkaOptions.java +++ b/connectors/stream/src/main/java/com/redislabs/riot/stream/KafkaOptions.java @@ -3,8 +3,7 @@ import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig; import io.confluent.kafka.serializers.KafkaAvroDeserializer; import io.confluent.kafka.serializers.KafkaAvroSerializer; -import lombok.Data; -import lombok.Getter; +import lombok.*; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; @@ -20,6 +19,9 @@ import java.util.*; @Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class KafkaOptions { public enum SerDe { @@ -28,13 +30,16 @@ public enum SerDe { @Option(names = "--broker", arity = "1..*", description = "One or more brokers in the form host:port", paramLabel = "") private List brokers; + @Builder.Default @Option(names = "--group", description = "Consumer group id", paramLabel = "") private String groupId = "$Default"; @Getter @Option(names = "--schema-registry-url", description = "Schema registry URL", paramLabel = "") private String schemaRegistryUrl; + @Builder.Default @Option(names = {"-p", "--property"}, arity = "1..*", description = "Additional consumer properties", paramLabel = "") private Map properties = new HashMap<>(); + @Builder.Default @Option(names = "--serde", description = "Serializer/Deserializer: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE})", paramLabel = "") private SerDe serde = SerDe.JSON; diff --git a/connectors/stream/src/main/java/com/redislabs/riot/stream/StreamExportCommand.java b/connectors/stream/src/main/java/com/redislabs/riot/stream/StreamExportCommand.java index 973c52bce..7d555fdf2 100644 --- a/connectors/stream/src/main/java/com/redislabs/riot/stream/StreamExportCommand.java +++ b/connectors/stream/src/main/java/com/redislabs/riot/stream/StreamExportCommand.java @@ -29,16 +29,18 @@ @Command(name = "export", description = "Import Redis streams into Kafka topics") public class StreamExportCommand extends AbstractTransferCommand, ProducerRecord> { + @SuppressWarnings("unused") @Parameters(arity = "1..*", description = "One ore more streams to read from", paramLabel = "STREAM") private List streams; @CommandLine.Mixin - private KafkaOptions options = new KafkaOptions(); + private KafkaOptions options = KafkaOptions.builder().build(); @CommandLine.Mixin private FlushingTransferOptions flushingOptions = FlushingTransferOptions.builder().build(); @Option(names = "--block", description = "XREAD block time in millis (default: ${DEFAULT-VALUE})", hidden = true, paramLabel = "") private long block = 100; @Option(names = "--offset", description = "XREAD offset (default: ${DEFAULT-VALUE})", paramLabel = "") private String offset = "0-0"; + @SuppressWarnings("unused") @Option(names = "--topic", description = "Target topic key (default: same as stream)", paramLabel = "") private String topic; @@ -46,7 +48,7 @@ public class StreamExportCommand extends AbstractTransferCommand steps = new ArrayList<>(); for (String stream : streams) { - StepBuilder, ProducerRecord> step = stepBuilder("Exporting stream " + stream); + StepBuilder, ProducerRecord> step = stepBuilder(stream + "-stream-export-step", "Exporting from " + stream); steps.add(flushingOptions.configure(step.reader(reader(StreamOffset.from(stream, offset))).processor(processor()).writer(writer()).build()).build()); } return flow(steps.toArray(new Step[0])); diff --git a/connectors/stream/src/main/java/com/redislabs/riot/stream/StreamImportCommand.java b/connectors/stream/src/main/java/com/redislabs/riot/stream/StreamImportCommand.java index 27a32e296..bdae09391 100644 --- a/connectors/stream/src/main/java/com/redislabs/riot/stream/StreamImportCommand.java +++ b/connectors/stream/src/main/java/com/redislabs/riot/stream/StreamImportCommand.java @@ -32,14 +32,18 @@ public class StreamImportCommand extends AbstractTransferCommand, ConsumerRecord> { + @SuppressWarnings("unused") @Parameters(arity = "1..*", description = "One ore more topics to read from", paramLabel = "TOPIC") private List topics; @CommandLine.Mixin - private KafkaOptions options = new KafkaOptions(); + private KafkaOptions options = KafkaOptions.builder().build(); + @SuppressWarnings("unused") @Option(names = "--key", description = "Target stream key (default: same as topic)", paramLabel = "") private String key; + @SuppressWarnings("unused") @Option(names = "--maxlen", description = "Stream maxlen", paramLabel = "") private Long maxlen; + @SuppressWarnings("unused") @Option(names = "--trim", description = "Stream efficient trimming ('~' flag)") private boolean approximateTrimming; @CommandLine.Mixin @@ -50,12 +54,13 @@ protected Flow flow() { List steps = new ArrayList<>(); for (String topic : topics) { KafkaItemReader reader = new KafkaItemReaderBuilder().partitions(0).consumerProperties(options.consumerProperties()).partitions(0).name(topic).saveState(false).topic(topic).build(); - StepBuilder, ConsumerRecord> step = stepBuilder("Importing topic " + topic); + StepBuilder, ConsumerRecord> step = stepBuilder(topic + "-stream-import-step", "Importing from " + topic); steps.add(step.reader(reader).writer(writer()).build().build()); } return flow(steps.toArray(new Step[0])); } + @SuppressWarnings({"unchecked", "rawtypes"}) private ItemWriter> writer() { XAddArgs xAddArgs = xAddArgs(); BiFunction, ConsumerRecord, RedisFuture> command = CommandBuilder.>xadd().keyConverter(keyConverter()).argsConverter(r -> xAddArgs).bodyConverter(bodyConverter()).build(); diff --git a/connectors/stream/src/main/java/com/redislabs/riot/stream/kafka/KafkaItemReader.java b/connectors/stream/src/main/java/com/redislabs/riot/stream/kafka/KafkaItemReader.java index 02670b3f4..13dda9ebf 100644 --- a/connectors/stream/src/main/java/com/redislabs/riot/stream/kafka/KafkaItemReader.java +++ b/connectors/stream/src/main/java/com/redislabs/riot/stream/kafka/KafkaItemReader.java @@ -16,15 +16,6 @@ package com.redislabs.riot.stream.kafka; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Properties; - import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; @@ -34,13 +25,15 @@ import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader; import org.springframework.util.Assert; +import java.time.Duration; +import java.util.*; + /** *

* An {@link org.springframework.batch.item.ItemReader} implementation for * Apache Kafka. Uses a {@link KafkaConsumer} to read data from a given topic. * Multiple partitions within the same topic can be assigned to this reader. *

- * *

* Since {@link KafkaConsumer} is not thread-safe, this reader is not * thead-safe. @@ -52,158 +45,129 @@ */ public class KafkaItemReader extends AbstractItemCountingItemStreamItemReader> { - private static final String TOPIC_PARTITION_OFFSETS = "topic.partition.offsets"; - - private static final long DEFAULT_POLL_TIMEOUT = 30L; - - private List topicPartitions; - - private Map partitionOffsets; - - private KafkaConsumer kafkaConsumer; - - private Properties consumerProperties; - - private Iterator> consumerRecords; - - private Duration pollTimeout = Duration.ofSeconds(DEFAULT_POLL_TIMEOUT); - - private boolean saveState = true; - - /** - * Create a new {@link KafkaItemReader}. - *

- * {@code consumerProperties} must contain the following keys: - * 'bootstrap.servers', 'group.id', 'key.deserializer' and 'value.deserializer' - * - *

- * . - * - * @param consumerProperties properties of the consumer - * @param topicName name of the topic to read data from - * @param partitions list of partitions to read data from - */ - public KafkaItemReader(Properties consumerProperties, String topicName, Integer... partitions) { - this(consumerProperties, topicName, Arrays.asList(partitions)); - } - - /** - * Create a new {@link KafkaItemReader}. - *

- * {@code consumerProperties} must contain the following keys: - * 'bootstrap.servers', 'group.id', 'key.deserializer' and 'value.deserializer' - * - *

- * . - * - * @param consumerProperties properties of the consumer - * @param topicName name of the topic to read data from - * @param partitions list of partitions to read data from - */ - public KafkaItemReader(Properties consumerProperties, String topicName, List partitions) { - Assert.notNull(consumerProperties, "Consumer properties must not be null"); - Assert.isTrue(consumerProperties.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), - ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " property must be provided"); - Assert.isTrue(consumerProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG), - ConsumerConfig.GROUP_ID_CONFIG + " property must be provided"); - Assert.isTrue(consumerProperties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG + " property must be provided"); - Assert.isTrue(consumerProperties.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + " property must be provided"); - this.consumerProperties = consumerProperties; - Assert.hasLength(topicName, "Topic name must not be null or empty"); - Assert.isTrue(!partitions.isEmpty(), "At least one partition must be provided"); - this.topicPartitions = new ArrayList<>(); - for (Integer partition : partitions) { - this.topicPartitions.add(new TopicPartition(topicName, partition)); - } - } - - /** - * Set a timeout for the consumer topic polling duration. Default to 30 seconds. - * - * @param pollTimeout for the consumer poll operation - */ - public void setPollTimeout(Duration pollTimeout) { - Assert.notNull(pollTimeout, "pollTimeout must not be null"); - Assert.isTrue(!pollTimeout.isZero(), "pollTimeout must not be zero"); - Assert.isTrue(!pollTimeout.isNegative(), "pollTimeout must not be negative"); - this.pollTimeout = pollTimeout; - } - - /** - * Set the flag that determines whether to save internal data for - * {@link ExecutionContext}. Only switch this to false if you don't want to save - * any state from this stream, and you don't need it to be restartable. Always - * set it to false if the reader is being used in a concurrent environment. - * - * @param saveState flag value (default true). - */ - public void setSaveState(boolean saveState) { - this.saveState = saveState; - } - - /** - * The flag that determines whether to save internal state for restarts. - * - * @return true if the flag was set - */ - public boolean isSaveState() { - return this.saveState; - } - - @SuppressWarnings("unchecked") - @Override - public void open(ExecutionContext executionContext) throws ItemStreamException { - this.kafkaConsumer = new KafkaConsumer<>(this.consumerProperties); - this.partitionOffsets = new HashMap<>(); - for (TopicPartition topicPartition : this.topicPartitions) { - this.partitionOffsets.put(topicPartition, 0L); - } - if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) { - Map offsets = (Map) executionContext - .get(TOPIC_PARTITION_OFFSETS); - for (Map.Entry entry : offsets.entrySet()) { - this.partitionOffsets.put(entry.getKey(), entry.getValue() == 0 ? 0 : entry.getValue() + 1); - } - } - this.kafkaConsumer.assign(this.topicPartitions); - this.partitionOffsets.forEach(this.kafkaConsumer::seek); - super.open(executionContext); - } - - @Override - protected void doOpen() throws Exception { - // already done in open() - } - - @Override - protected ConsumerRecord doRead() throws Exception { - if (this.consumerRecords == null || !this.consumerRecords.hasNext()) { - this.consumerRecords = this.kafkaConsumer.poll(this.pollTimeout).iterator(); - } - if (this.consumerRecords.hasNext()) { - ConsumerRecord record = this.consumerRecords.next(); - this.partitionOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset()); - return record; - } else { - return null; - } - } - - @Override - public void update(ExecutionContext executionContext) { - if (this.saveState) { - executionContext.put(TOPIC_PARTITION_OFFSETS, new HashMap<>(this.partitionOffsets)); - } - this.kafkaConsumer.commitSync(); - } - - @Override - protected void doClose() throws Exception { - if (this.kafkaConsumer != null) { - this.kafkaConsumer.close(); - } - } + private static final String TOPIC_PARTITION_OFFSETS = "topic.partition.offsets"; + private static final long DEFAULT_POLL_TIMEOUT = 30L; + + private final List topicPartitions; + private final Properties consumerProperties; + private Map partitionOffsets; + private KafkaConsumer kafkaConsumer; + private Iterator> consumerRecords; + private Duration pollTimeout = Duration.ofSeconds(DEFAULT_POLL_TIMEOUT); + private boolean saveState = true; + + /** + * Create a new {@link KafkaItemReader}. + *

+ * {@code consumerProperties} must contain the following keys: + * 'bootstrap.servers', 'group.id', 'key.deserializer' and 'value.deserializer' + * + *

+ * . + * + * @param consumerProperties properties of the consumer + * @param topicName name of the topic to read data from + * @param partitions list of partitions to read data from + */ + public KafkaItemReader(Properties consumerProperties, String topicName, List partitions) { + Assert.notNull(consumerProperties, "Consumer properties must not be null"); + Assert.isTrue(consumerProperties.containsKey(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG + " property must be provided"); + Assert.isTrue(consumerProperties.containsKey(ConsumerConfig.GROUP_ID_CONFIG), ConsumerConfig.GROUP_ID_CONFIG + " property must be provided"); + Assert.isTrue(consumerProperties.containsKey(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG), ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG + " property must be provided"); + Assert.isTrue(consumerProperties.containsKey(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG), ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + " property must be provided"); + this.consumerProperties = consumerProperties; + Assert.hasLength(topicName, "Topic name must not be null or empty"); + Assert.isTrue(!partitions.isEmpty(), "At least one partition must be provided"); + this.topicPartitions = new ArrayList<>(); + for (Integer partition : partitions) { + this.topicPartitions.add(new TopicPartition(topicName, partition)); + } + } + + /** + * Set a timeout for the consumer topic polling duration. Default to 30 seconds. + * + * @param pollTimeout for the consumer poll operation + */ + public void setPollTimeout(Duration pollTimeout) { + Assert.notNull(pollTimeout, "pollTimeout must not be null"); + Assert.isTrue(!pollTimeout.isZero(), "pollTimeout must not be zero"); + Assert.isTrue(!pollTimeout.isNegative(), "pollTimeout must not be negative"); + this.pollTimeout = pollTimeout; + } + + /** + * Set the flag that determines whether to save internal data for + * {@link ExecutionContext}. Only switch this to false if you don't want to save + * any state from this stream, and you don't need it to be restartable. Always + * set it to false if the reader is being used in a concurrent environment. + * + * @param saveState flag value (default true). + */ + public void setSaveState(boolean saveState) { + this.saveState = saveState; + } + + /** + * The flag that determines whether to save internal state for restarts. + * + * @return true if the flag was set + */ + public boolean isSaveState() { + return this.saveState; + } + + @SuppressWarnings("unchecked") + @Override + public void open(ExecutionContext executionContext) throws ItemStreamException { + this.kafkaConsumer = new KafkaConsumer<>(this.consumerProperties); + this.partitionOffsets = new HashMap<>(); + for (TopicPartition topicPartition : this.topicPartitions) { + this.partitionOffsets.put(topicPartition, 0L); + } + if (this.saveState && executionContext.containsKey(TOPIC_PARTITION_OFFSETS)) { + Map offsets = (Map) executionContext.get(TOPIC_PARTITION_OFFSETS); + for (Map.Entry entry : offsets.entrySet()) { + this.partitionOffsets.put(entry.getKey(), entry.getValue() == 0 ? 0 : entry.getValue() + 1); + } + } + this.kafkaConsumer.assign(this.topicPartitions); + this.partitionOffsets.forEach(this.kafkaConsumer::seek); + super.open(executionContext); + } + + @Override + protected void doOpen() { + // already done in open() + } + + @Override + protected ConsumerRecord doRead() { + if (this.consumerRecords == null || !this.consumerRecords.hasNext()) { + this.consumerRecords = this.kafkaConsumer.poll(this.pollTimeout).iterator(); + } + if (this.consumerRecords.hasNext()) { + ConsumerRecord record = this.consumerRecords.next(); + this.partitionOffsets.put(new TopicPartition(record.topic(), record.partition()), record.offset()); + return record; + } else { + return null; + } + } + + @Override + public void update(ExecutionContext executionContext) { + if (this.saveState) { + executionContext.put(TOPIC_PARTITION_OFFSETS, new HashMap<>(this.partitionOffsets)); + } + this.kafkaConsumer.commitSync(); + } + + @Override + protected void doClose() { + if (this.kafkaConsumer != null) { + this.kafkaConsumer.close(); + } + } } diff --git a/connectors/stream/src/main/java/com/redislabs/riot/stream/kafka/KafkaItemWriter.java b/connectors/stream/src/main/java/com/redislabs/riot/stream/kafka/KafkaItemWriter.java index 61917baf7..2e14e7f79 100644 --- a/connectors/stream/src/main/java/com/redislabs/riot/stream/kafka/KafkaItemWriter.java +++ b/connectors/stream/src/main/java/com/redislabs/riot/stream/kafka/KafkaItemWriter.java @@ -18,12 +18,12 @@ public class KafkaItemWriter extends AbstractItemStreamItemWriter kafkaTemplate; @Builder - public KafkaItemWriter(@NonNull KafkaTemplate kafkaTemplate) { + private KafkaItemWriter(@NonNull KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } @Override - public void write(List> items) throws Exception { + public void write(List> items) { for (ProducerRecord item : items) { this.kafkaTemplate.send(item); } diff --git a/connectors/stream/src/test/java/com/redislabs/riot/stream/TestKafka.java b/connectors/stream/src/test/java/com/redislabs/riot/stream/TestKafka.java index 67ba3edc0..5083697c1 100644 --- a/connectors/stream/src/test/java/com/redislabs/riot/stream/TestKafka.java +++ b/connectors/stream/src/test/java/com/redislabs/riot/stream/TestKafka.java @@ -57,10 +57,10 @@ public void testImport() throws Exception { StreamImportCommand command = (StreamImportCommand) command("/import.txt"); JobExecution execution = command.executeAsync(); Thread.sleep(500); - KafkaProducer> producer = new KafkaProducer<>(ImmutableMap.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()), new StringSerializer(), new JsonSerializer>()); + KafkaProducer> producer = new KafkaProducer<>(ImmutableMap.of(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(), ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString()), new StringSerializer(), new JsonSerializer<>()); int count = 100; for (int index = 0; index < count; index++) { - ProducerRecord> record = new ProducerRecord>("topic1", map()); + ProducerRecord> record = new ProducerRecord<>("topic1", map()); Future future = producer.send(record); future.get(); } diff --git a/core/src/main/java/com/redislabs/riot/AbstractExportCommand.java b/core/src/main/java/com/redislabs/riot/AbstractExportCommand.java index afd35f7a1..59d06a1f0 100644 --- a/core/src/main/java/com/redislabs/riot/AbstractExportCommand.java +++ b/core/src/main/java/com/redislabs/riot/AbstractExportCommand.java @@ -19,19 +19,20 @@ public abstract class AbstractExportCommand extends AbstractTransferCommand, O>> step(ItemProcessor, O> processor, ItemWriter writer) throws Exception { - StepBuilder, O> step = stepBuilder("Exporting from " + name(getRedisURI())); + String name = name(getRedisURI()); + StepBuilder, O> step = stepBuilder(name + "-export-step", "Exporting from " + name); return step.reader(reader()).processor(processor).writer(writer).build(); } protected final ItemReader> reader() { if (isCluster()) { - return configure(DataStructureItemReader.builder((GenericObjectPool>) pool, (StatefulRedisClusterConnection) connection)).build(); + return configureScanKeyValueReaderBuilder(DataStructureItemReader.builder((GenericObjectPool>) pool, (StatefulRedisClusterConnection) connection)).build(); } - return configure(DataStructureItemReader.builder((GenericObjectPool>) pool, (StatefulRedisConnection) connection)).build(); + return configureScanKeyValueReaderBuilder(DataStructureItemReader.builder((GenericObjectPool>) pool, (StatefulRedisConnection) connection)).build(); } - private ScanKeyValueItemReaderBuilder> configure(ScanKeyValueItemReaderBuilder> builder) { - return builder.commandTimeout(getCommandTimeout()).chunkSize(options.getBatchSize()).queueCapacity(options.getQueueCapacity()).threadCount(options.getThreads()).scanMatch(options.getScanMatch()).scanCount(options.getScanCount()); + private ScanKeyValueItemReaderBuilder> configureScanKeyValueReaderBuilder(ScanKeyValueItemReaderBuilder> builder) { + return builder.chunkSize(options.getBatchSize()).queueCapacity(options.getQueueCapacity()).threadCount(options.getThreads()).scanMatch(options.getScanMatch()).scanCount(options.getScanCount()); } } diff --git a/core/src/main/java/com/redislabs/riot/AbstractImportCommand.java b/core/src/main/java/com/redislabs/riot/AbstractImportCommand.java index cea9d3c79..d583d8208 100644 --- a/core/src/main/java/com/redislabs/riot/AbstractImportCommand.java +++ b/core/src/main/java/com/redislabs/riot/AbstractImportCommand.java @@ -30,8 +30,8 @@ public abstract class AbstractImportCommand extends AbstractTransferComman @Getter private final List> redisCommands = new ArrayList<>(); - protected AbstractTaskletStepBuilder> step(String name, ItemReader reader) { - StepBuilder step = stepBuilder(name); + protected AbstractTaskletStepBuilder> step(String name, String taskName, ItemReader reader) { + StepBuilder step = stepBuilder(name, taskName); return step.reader(reader).processor(processor()).writer(writer()).build(); } @@ -51,15 +51,9 @@ protected ItemWriter writer() { private Function, ItemWriter> writerProvider() { if (isCluster()) { - return c -> { - CommandItemWriter.CommandItemWriterBuilder builder = CommandItemWriter.clusterBuilder((GenericObjectPool>) pool, (BiFunction) c.command()); - return builder.commandTimeout(getCommandTimeout()).build(); - }; + return c -> configureCommandTimeoutBuilder(CommandItemWriter.clusterBuilder((GenericObjectPool>) pool, (BiFunction) c.command())).build(); } - return c -> { - CommandItemWriter.CommandItemWriterBuilder builder = CommandItemWriter.builder((GenericObjectPool>) pool, (BiFunction) c.command()); - return builder.commandTimeout(getCommandTimeout()).build(); - }; + return c -> configureCommandTimeoutBuilder(CommandItemWriter.builder((GenericObjectPool>) pool, (BiFunction) c.command())).build(); } diff --git a/core/src/main/java/com/redislabs/riot/AbstractTaskCommand.java b/core/src/main/java/com/redislabs/riot/AbstractTaskCommand.java index 131c4ea7c..3529b01cf 100644 --- a/core/src/main/java/com/redislabs/riot/AbstractTaskCommand.java +++ b/core/src/main/java/com/redislabs/riot/AbstractTaskCommand.java @@ -17,6 +17,7 @@ @CommandLine.Command public abstract class AbstractTaskCommand extends RiotCommand { + @SuppressWarnings("unused") @CommandLine.Spec private CommandLine.Model.CommandSpec spec; diff --git a/core/src/main/java/com/redislabs/riot/AbstractTransferCommand.java b/core/src/main/java/com/redislabs/riot/AbstractTransferCommand.java index 420ef6df3..2718b8d34 100644 --- a/core/src/main/java/com/redislabs/riot/AbstractTransferCommand.java +++ b/core/src/main/java/com/redislabs/riot/AbstractTransferCommand.java @@ -7,9 +7,8 @@ public abstract class AbstractTransferCommand extends AbstractTaskCommand @CommandLine.Mixin private TransferOptions transferOptions = TransferOptions.builder().build(); - protected StepBuilder stepBuilder(String name) { - return new StepBuilder(jobFactory, transferOptions).name(name); + protected StepBuilder stepBuilder(String name, String taskName) { + return new StepBuilder(jobFactory, transferOptions).name(name).taskName(taskName); } - } diff --git a/core/src/main/java/com/redislabs/riot/HelpCommand.java b/core/src/main/java/com/redislabs/riot/HelpCommand.java index 73829e3cd..d32d6fba1 100644 --- a/core/src/main/java/com/redislabs/riot/HelpCommand.java +++ b/core/src/main/java/com/redislabs/riot/HelpCommand.java @@ -5,15 +5,10 @@ import java.util.concurrent.Callable; @CommandLine.Command(usageHelpAutoWidth = true) -public class HelpCommand implements Callable { +public class HelpCommand { - @CommandLine.Option(names = "--help", usageHelp = true, description = "Show this help message and exit") - private boolean helpRequested; - - @Override - public Integer call() throws Exception { - CommandLine.usage(this, System.out); - return 0; - } + @SuppressWarnings("unused") + @CommandLine.Option(names = {"-H", "--help"}, usageHelp = true, description = "Show this help message and exit.") + private boolean helpRequested; } diff --git a/core/src/main/java/com/redislabs/riot/HiddenGenerateCompletion.java b/core/src/main/java/com/redislabs/riot/HiddenGenerateCompletion.java deleted file mode 100644 index 86d0b52ff..000000000 --- a/core/src/main/java/com/redislabs/riot/HiddenGenerateCompletion.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.redislabs.riot; - -import picocli.AutoComplete.GenerateCompletion; -import picocli.CommandLine.Command; - -@Command(hidden = true, name = "generate-completion", usageHelpAutoWidth = true) -public class HiddenGenerateCompletion extends GenerateCompletion { - -} diff --git a/core/src/main/java/com/redislabs/riot/KeyValueProcessingOptions.java b/core/src/main/java/com/redislabs/riot/KeyValueProcessingOptions.java index d9d6d9b35..2f42fe7b6 100644 --- a/core/src/main/java/com/redislabs/riot/KeyValueProcessingOptions.java +++ b/core/src/main/java/com/redislabs/riot/KeyValueProcessingOptions.java @@ -5,7 +5,9 @@ import com.redislabs.riot.processor.MapProcessor; import com.redislabs.riot.processor.SpelProcessor; import io.lettuce.core.api.StatefulConnection; -import lombok.Data; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.NoArgsConstructor; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.support.CompositeItemProcessor; import org.springframework.core.convert.converter.Converter; @@ -14,16 +16,24 @@ import java.text.SimpleDateFormat; import java.util.*; +@Builder +@NoArgsConstructor +@AllArgsConstructor public class KeyValueProcessingOptions { + @Builder.Default @Option(arity = "1..*", names = "--spel", description = "SpEL expression to produce a field", paramLabel = "") private Map spelFields = new HashMap<>(); + @Builder.Default @Option(arity = "1..*", names = "--var", description = "Register a variable in the SpEL processor context", paramLabel = "") private Map variables = new HashMap<>(); + @Builder.Default @Option(names = "--date", description = "Processor date format (default: ${DEFAULT-VALUE})", paramLabel = "") private String dateFormat = new SimpleDateFormat().toPattern(); + @Builder.Default @Option(arity = "1..*", names = "--regex", description = "Extract named values from source field using regex", paramLabel = "") private Map regexes = new HashMap<>(); + @Builder.Default @Option(arity = "1..*", names = "--filter", description = "SpEL expression to filter records", paramLabel = "") private List filters = new ArrayList<>(); diff --git a/core/src/main/java/com/redislabs/riot/ManifestVersionProvider.java b/core/src/main/java/com/redislabs/riot/ManifestVersionProvider.java deleted file mode 100644 index e97ec2222..000000000 --- a/core/src/main/java/com/redislabs/riot/ManifestVersionProvider.java +++ /dev/null @@ -1,43 +0,0 @@ -package com.redislabs.riot; - -import java.io.IOException; -import java.net.URL; -import java.util.Enumeration; -import java.util.jar.Attributes; -import java.util.jar.Manifest; - -import picocli.CommandLine.IVersionProvider; - -/** - * {@link IVersionProvider} implementation that returns version information from - * the jar file's {@code /META-INF/MANIFEST.MF} file. - */ -public class ManifestVersionProvider implements IVersionProvider { - - public String[] getVersion() throws Exception { - Enumeration resources = getClass().getClassLoader().getResources("META-INF/MANIFEST.MF"); - while (resources.hasMoreElements()) { - URL url = resources.nextElement(); - try { - Manifest manifest = new Manifest(url.openStream()); - if (isApplicableManifest(manifest)) { - Attributes attr = manifest.getMainAttributes(); - return new String[] { get(attr, "Implementation-Title") + " version \"" - + get(attr, "Implementation-Version") + "\"" }; - } - } catch (IOException ex) { - return new String[] { "Unable to read from " + url + ": " + ex }; - } - } - return new String[0]; - } - - private boolean isApplicableManifest(Manifest manifest) { - Attributes attributes = manifest.getMainAttributes(); - return "RIOT".equals(get(attributes, "Implementation-Title")); - } - - private static Object get(Attributes attributes, String key) { - return attributes.get(new Attributes.Name(key)); - } -} \ No newline at end of file diff --git a/core/src/main/java/com/redislabs/riot/OneLineLogFormat.java b/core/src/main/java/com/redislabs/riot/OneLineLogFormat.java deleted file mode 100644 index 3c9cb30cd..000000000 --- a/core/src/main/java/com/redislabs/riot/OneLineLogFormat.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.redislabs.riot; - -import java.io.PrintWriter; -import java.io.StringWriter; -import java.time.Instant; -import java.time.ZoneId; -import java.time.ZoneOffset; -import java.time.ZonedDateTime; -import java.time.format.DateTimeFormatter; -import java.time.format.DateTimeFormatterBuilder; -import java.time.temporal.ChronoField; -import java.util.logging.Formatter; -import java.util.logging.LogRecord; - -public class OneLineLogFormat extends Formatter { - - private final DateTimeFormatter d = new DateTimeFormatterBuilder().appendValue(ChronoField.HOUR_OF_DAY, 2).appendLiteral(':').appendValue(ChronoField.MINUTE_OF_HOUR, 2).optionalStart().appendLiteral(':').appendValue(ChronoField.SECOND_OF_MINUTE, 2).optionalStart().appendFraction(ChronoField.NANO_OF_SECOND, 3, 3, true).toFormatter(); - private final ZoneId offset = ZoneOffset.systemDefault(); - - @Override - public String format(LogRecord record) { - String message = formatMessage(record); - ZonedDateTime time = Instant.ofEpochMilli(record.getMillis()).atZone(offset); - if (record.getThrown() == null) { - return String.format("%s %s %s\t: %s%n", time.format(d), record.getLevel().getLocalizedName(), record.getLoggerName(), message); - } - return String.format("%s %s %s\t: %s%n%s%n", time.format(d), record.getLevel().getLocalizedName(), record.getLoggerName(), message, stackTrace(record)); - } - - private String stackTrace(LogRecord record) { - StringWriter sw = new StringWriter(4096); - PrintWriter pw = new PrintWriter(sw); - record.getThrown().printStackTrace(pw); - return sw.toString(); - } -} diff --git a/core/src/main/java/com/redislabs/riot/RedisOptions.java b/core/src/main/java/com/redislabs/riot/RedisOptions.java index 328b93372..b42a8bd22 100644 --- a/core/src/main/java/com/redislabs/riot/RedisOptions.java +++ b/core/src/main/java/com/redislabs/riot/RedisOptions.java @@ -13,7 +13,7 @@ import io.lettuce.core.metrics.DefaultCommandLatencyCollectorOptions; import io.lettuce.core.resource.ClientResources; import io.lettuce.core.resource.DefaultClientResources; -import lombok.Data; +import lombok.*; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import picocli.CommandLine.Option; @@ -22,17 +22,23 @@ import java.util.ArrayList; import java.util.List; -@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor public class RedisOptions { - public static final String DEFAULT_HOST = "127.0.0.1"; + public static final String DEFAULT_HOST = "localhost"; public static final int DEFAULT_PORT = 6379; public static final int DEFAULT_DATABASE = 0; public static final int DEFAULT_TIMEOUT = 60; public static final int DEFAULT_POOL_MAX_TOTAL = 8; + @Setter + @Builder.Default @Option(names = {"-h", "--hostname"}, description = "Server hostname (default: ${DEFAULT-VALUE}).", paramLabel = "") private String host = DEFAULT_HOST; + @Setter + @Builder.Default @Option(names = {"-p", "--port"}, description = "Server port (default: ${DEFAULT-VALUE}).", paramLabel = "") private int port = DEFAULT_PORT; @Option(names = {"-s", "--socket"}, description = "Server socket (overrides hostname and port).", paramLabel = "") @@ -41,16 +47,21 @@ public class RedisOptions { private String username; @Option(names = {"-a", "--pass"}, arity = "0..1", interactive = true, description = "Password to use when connecting to the server.", paramLabel = "") private char[] password; + @Builder.Default @Option(names = {"-u", "--uri"}, arity = "0..*", description = "Server URI.", paramLabel = "") private List uris = new ArrayList<>(); + @Builder.Default @Option(names = "--timeout", description = "Redis command timeout (default: ${DEFAULT-VALUE}).", paramLabel = "") private long timeout = DEFAULT_TIMEOUT; + @Builder.Default @Option(names = {"-n", "--db"}, description = "Database number (default: ${DEFAULT-VALUE}).", paramLabel = "") private int database = DEFAULT_DATABASE; + @Getter @Option(names = {"-c", "--cluster"}, description = "Enable cluster mode.") private boolean cluster; @Option(names = "--tls", description = "Establish a secure TLS connection.") private boolean tls; + @Builder.Default @Option(names = "--no-verify-peer", description = "Verify peers when using TLS. True by default.", negatable = true) private boolean verifyPeer = true; @Option(names = "--ks", description = "Path to keystore.", paramLabel = "", hidden = true) @@ -65,8 +76,10 @@ public class RedisOptions { private File cert; @Option(names = "--latency", description = "Show latency metrics.") private boolean showMetrics; + @Builder.Default @Option(names = "--pool-max", description = "Max pool connections (default: ${DEFAULT-VALUE}).", paramLabel = "") private int poolMaxTotal = DEFAULT_POOL_MAX_TOTAL; + @Builder.Default @Option(names = "--no-auto-reconnect", description = "Auto reconnect on connection loss. True by default.", negatable = true, hidden = true) private boolean autoReconnect = true; @Option(names = "--client", description = "Client name used to connect to Redis.") diff --git a/core/src/main/java/com/redislabs/riot/RedisURIConverter.java b/core/src/main/java/com/redislabs/riot/RedisURIConverter.java deleted file mode 100644 index 16c591d63..000000000 --- a/core/src/main/java/com/redislabs/riot/RedisURIConverter.java +++ /dev/null @@ -1,17 +0,0 @@ -package com.redislabs.riot; - -import io.lettuce.core.RedisURI; -import picocli.CommandLine; - -public class RedisURIConverter implements CommandLine.ITypeConverter { - - @Override - public RedisURI convert(String value) { - try { - return RedisURI.create(value); - } catch (Exception e) { - throw new IllegalArgumentException("Invalid Redis connection string", e); - } - } - -} diff --git a/core/src/main/java/com/redislabs/riot/RiotApp.java b/core/src/main/java/com/redislabs/riot/RiotApp.java index e844c56cd..ad78a6247 100644 --- a/core/src/main/java/com/redislabs/riot/RiotApp.java +++ b/core/src/main/java/com/redislabs/riot/RiotApp.java @@ -1,124 +1,130 @@ package com.redislabs.riot; import com.redislabs.riot.redis.AbstractRedisCommand; +import io.lettuce.core.RedisURI; import io.netty.util.internal.logging.InternalLoggerFactory; import io.netty.util.internal.logging.JdkLoggerFactory; import lombok.Getter; +import picocli.AutoComplete; import picocli.CommandLine; import picocli.CommandLine.*; +import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; +import java.net.URL; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.time.ZonedDateTime; +import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeFormatterBuilder; +import java.time.temporal.ChronoField; +import java.util.Enumeration; import java.util.List; -import java.util.logging.ConsoleHandler; -import java.util.logging.Level; -import java.util.logging.LogManager; -import java.util.logging.Logger; +import java.util.jar.Attributes; +import java.util.jar.Manifest; +import java.util.logging.*; -@Command(usageHelpAutoWidth = true, sortOptions = false, versionProvider = ManifestVersionProvider.class, subcommands = HiddenGenerateCompletion.class, abbreviateSynopsis = true) -public class RiotApp implements Runnable { +@Command(sortOptions = false, versionProvider = RiotApp.ManifestVersionProvider.class, subcommands = RiotApp.HiddenGenerateCompletion.class, abbreviateSynopsis = true) +public class RiotApp extends HelpCommand { + + @Command(hidden = true, name = "generate-completion", usageHelpAutoWidth = true) + static class HiddenGenerateCompletion extends AutoComplete.GenerateCompletion { + } private static final String ROOT_LOGGER = ""; - @Option(names = {"-H", "--help"}, usageHelp = true, description = "Show this help message and exit.") - private boolean helpRequested; + @SuppressWarnings("unused") @Option(names = {"-V", "--version"}, versionHelp = true, description = "Print version information and exit.") private boolean versionRequested; + @Getter + @ArgGroup(heading = "Redis connection options%n", exclusive = false) + private RedisOptions redisOptions = RedisOptions.builder().build(); @Option(names = {"-q", "--quiet"}, description = "Log errors only.") private boolean quiet; @Option(names = {"-w", "--warn"}, description = "Set log level to warn.") - private boolean warn; + private boolean warning; @Option(names = {"-i", "--info"}, description = "Set log level to info.") private boolean info; @Option(names = {"-d", "--debug"}, description = "Log in debug mode (includes normal stacktrace).") private boolean debug; - @Getter - @ArgGroup(heading = "Redis connection options%n", exclusive = false) - private RedisOptions redisOptions = new RedisOptions(); - - public int execute(String... args) { - CommandLine commandLine = commandLine(); - ParseResult[] parseResult = new ParseResult[1]; - try { - parseResult[0] = parse(commandLine, args); - initializeLogging(); - return commandLine.getExecutionStrategy().execute(parseResult[0]); - } catch (ParameterException ex) { - try { - return commandLine.getParameterExceptionHandler().handleParseException(ex, args); - } catch (Exception ex2) { - return handleUnhandled(ex2, ex.getCommandLine(), ex.getCommandLine().getCommandSpec().exitCodeOnInvalidInput()); - } - } catch (ExecutionException ex) { - try { - Exception cause = ex.getCause() instanceof Exception ? (Exception) ex.getCause() : ex; - return commandLine.getExecutionExceptionHandler().handleExecutionException(cause, ex.getCommandLine(), parseResult[0]); - } catch (Exception ex2) { - return handleUnhandled(ex2, ex.getCommandLine(), ex.getCommandLine().getCommandSpec().exitCodeOnExecutionException()); - } - } catch (Exception ex) { - return handleUnhandled(ex, commandLine, commandLine.getCommandSpec().exitCodeOnExecutionException()); - } - } - - private static String throwableToColorString(Throwable t, Help.ColorScheme existingColorScheme) { - Help.ColorScheme colorScheme = new Help.ColorScheme.Builder(existingColorScheme).applySystemProperties().build(); - StringWriter stringWriter = new ColoredStackTraceWriter(colorScheme); - t.printStackTrace(new PrintWriter(stringWriter)); - return stringWriter.toString(); - } - - static class ColoredStackTraceWriter extends StringWriter { - Help.ColorScheme colorScheme; - - public ColoredStackTraceWriter(Help.ColorScheme colorScheme) { this.colorScheme = colorScheme; } - - @Override - public void write(String str, int off, int len) { - List styles = str.startsWith("\t") ? colorScheme.stackTraceStyles() : colorScheme.errorStyles(); - super.write(colorScheme.apply(str.substring(off, len), styles).toString()); - } - } - private static int handleUnhandled(Exception ex, CommandLine cmd, int defaultExitCode) { - cmd.getErr().print(throwableToColorString(ex, cmd.getColorScheme())); - cmd.getErr().flush(); - return mappedExitCode(ex, cmd.getExitCodeExceptionMapper(), defaultExitCode); + private int executionStrategy(ParseResult parseResult) { + configureLogging(); + return new CommandLine.RunLast().execute(parseResult); // default execution strategy } - private static int mappedExitCode(Throwable t, IExitCodeExceptionMapper mapper, int defaultExitCode) { - try { - return (mapper != null) ? mapper.getExitCode(t) : defaultExitCode; - } catch (Exception ex) { - ex.printStackTrace(); - return defaultExitCode; - } + private int executionStragegyRunFirst(ParseResult parseResult) { + configureLogging(); + return new CommandLine.RunFirst().execute(parseResult); } - private void initializeLogging() { + private void configureLogging() { + Level level = logLevel(); InternalLoggerFactory.setDefaultFactory(JdkLoggerFactory.INSTANCE); LogManager.getLogManager().reset(); Logger activeLogger = Logger.getLogger(ROOT_LOGGER); ConsoleHandler handler = new ConsoleHandler(); - handler.setLevel(java.util.logging.Level.ALL); + handler.setLevel(Level.ALL); handler.setFormatter(new OneLineLogFormat()); activeLogger.addHandler(handler); - Logger.getLogger(ROOT_LOGGER).setLevel(loggingLevel()); - if (debug) { - Logger.getLogger("io.lettuce").setLevel(java.util.logging.Level.INFO); - Logger.getLogger("io.netty").setLevel(java.util.logging.Level.INFO); - } + Logger.getLogger(ROOT_LOGGER).setLevel(level); } - public CommandLine commandLine() { - CommandLine commandLine = new CommandLine(this); + public int execute(String... args) { + return commandLine().execute(args); + } + + public RiotCommandLine commandLine() { + RiotCommandLine commandLine = new RiotCommandLine(this); + commandLine.setExecutionStrategy(this::executionStrategy); commandLine.setExecutionExceptionHandler(new PrintExceptionMessageHandler()); registerConverters(commandLine); commandLine.setCaseInsensitiveEnumValuesAllowed(true); return commandLine; } - class PrintExceptionMessageHandler implements IExecutionExceptionHandler { + private java.util.logging.Level logLevel() { + if (debug) { + return java.util.logging.Level.FINE; + } + if (info) { + return java.util.logging.Level.INFO; + } + if (warning) { + return java.util.logging.Level.WARNING; + } + if (quiet) { + return java.util.logging.Level.OFF; + } + return Level.SEVERE; + } + + static class OneLineLogFormat extends Formatter { + + private final DateTimeFormatter d = new DateTimeFormatterBuilder().appendValue(ChronoField.HOUR_OF_DAY, 2).appendLiteral(':').appendValue(ChronoField.MINUTE_OF_HOUR, 2).optionalStart().appendLiteral(':').appendValue(ChronoField.SECOND_OF_MINUTE, 2).optionalStart().appendFraction(ChronoField.NANO_OF_SECOND, 3, 3, true).toFormatter(); + private final ZoneId offset = ZoneOffset.systemDefault(); + + @Override + public String format(LogRecord record) { + String message = formatMessage(record); + ZonedDateTime time = Instant.ofEpochMilli(record.getMillis()).atZone(offset); + if (record.getThrown() == null) { + return String.format("%s %s %s\t: %s%n", time.format(d), record.getLevel().getLocalizedName(), record.getLoggerName(), message); + } + return String.format("%s %s %s\t: %s%n%s%n", time.format(d), record.getLevel().getLocalizedName(), record.getLoggerName(), message, stackTrace(record)); + } + + private String stackTrace(LogRecord record) { + StringWriter sw = new StringWriter(4096); + PrintWriter pw = new PrintWriter(sw); + record.getThrown().printStackTrace(pw); + return sw.toString(); + } + } + + private static class PrintExceptionMessageHandler implements IExecutionExceptionHandler { @Override public int handleExecutionException(Exception ex, CommandLine cmd, ParseResult parseResult) { @@ -129,51 +135,84 @@ public int handleExecutionException(Exception ex, CommandLine cmd, ParseResult p } - @SuppressWarnings({"rawtypes", "unchecked"}) - public ParseResult parse(CommandLine commandLine, String[] args) { - ParseResult parseResult = commandLine.parseArgs(args); - ParseResult subcommand = parseResult.subcommand(); - if (subcommand != null) { - Object command = subcommand.commandSpec().userObject(); - if (AbstractImportCommand.class.isAssignableFrom(command.getClass())) { - AbstractImportCommand importCommand = (AbstractImportCommand) command; - List parsedRedisCommands = subcommand.subcommands(); - for (ParseResult parsedRedisCommand : parsedRedisCommands) { - if (parsedRedisCommand.isUsageHelpRequested()) { - return parsedRedisCommand; + private class RiotCommandLine extends CommandLine { + + public RiotCommandLine(Object command) { + super(command); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Override + public ParseResult parseArgs(String... args) { + ParseResult parseResult = super.parseArgs(args); + ParseResult subcommand = parseResult.subcommand(); + if (subcommand != null) { + Object command = subcommand.commandSpec().userObject(); + if (AbstractImportCommand.class.isAssignableFrom(command.getClass())) { + AbstractImportCommand importCommand = (AbstractImportCommand) command; + List parsedRedisCommands = subcommand.subcommands(); + for (ParseResult parsedRedisCommand : parsedRedisCommands) { + if (parsedRedisCommand.isUsageHelpRequested()) { + return parsedRedisCommand; + } + importCommand.getRedisCommands().add((AbstractRedisCommand) parsedRedisCommand.commandSpec().userObject()); } - importCommand.getRedisCommands().add((AbstractRedisCommand) parsedRedisCommand.commandSpec().userObject()); + setExecutionStrategy(RiotApp.this::executionStragegyRunFirst); + return subcommand; } - commandLine.setExecutionStrategy(new RunFirst()); - return subcommand; } + return parseResult; } - return parseResult; } - protected void registerConverters(CommandLine commandLine) { - commandLine.registerConverter(io.lettuce.core.RedisURI.class, new RedisURIConverter()); + static class RedisURIConverter implements CommandLine.ITypeConverter { + + @Override + public RedisURI convert(String value) { + try { + return RedisURI.create(value); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid Redis connection string", e); + } + } + } - @Override - public void run() { - CommandLine.usage(this, System.out); + protected void registerConverters(CommandLine commandLine) { + commandLine.registerConverter(io.lettuce.core.RedisURI.class, new RedisURIConverter()); } - private java.util.logging.Level loggingLevel() { - if (debug) { - return java.util.logging.Level.FINE; - } - if (info) { - return java.util.logging.Level.INFO; + /** + * {@link IVersionProvider} implementation that returns version information from + * the jar file's {@code /META-INF/MANIFEST.MF} file. + */ + static class ManifestVersionProvider implements IVersionProvider { + + public String[] getVersion() throws Exception { + Enumeration resources = getClass().getClassLoader().getResources("META-INF/MANIFEST.MF"); + while (resources.hasMoreElements()) { + URL url = resources.nextElement(); + try { + Manifest manifest = new Manifest(url.openStream()); + if (isApplicableManifest(manifest)) { + Attributes attr = manifest.getMainAttributes(); + return new String[]{get(attr, "Implementation-Title") + " version \"" + get(attr, "Implementation-Version") + "\""}; + } + } catch (IOException ex) { + return new String[]{"Unable to read from " + url + ": " + ex}; + } + } + return new String[0]; } - if (warn) { - return java.util.logging.Level.WARNING; + + private boolean isApplicableManifest(Manifest manifest) { + Attributes attributes = manifest.getMainAttributes(); + return "RIOT".equals(get(attributes, "Implementation-Title")); } - if (quiet) { - return java.util.logging.Level.OFF; + + private static Object get(Attributes attributes, String key) { + return attributes.get(new Attributes.Name(key)); } - return Level.SEVERE; } } diff --git a/core/src/main/java/com/redislabs/riot/RiotCommand.java b/core/src/main/java/com/redislabs/riot/RiotCommand.java index f3ea9500f..acabeb634 100644 --- a/core/src/main/java/com/redislabs/riot/RiotCommand.java +++ b/core/src/main/java/com/redislabs/riot/RiotCommand.java @@ -9,16 +9,22 @@ import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import io.lettuce.core.support.ConnectionPoolSupport; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.pool2.impl.GenericObjectPool; +import org.springframework.batch.item.redis.support.CommandTimeoutBuilder; import org.springframework.beans.factory.InitializingBean; +import org.springframework.util.ClassUtils; import picocli.CommandLine.Command; import picocli.CommandLine.ParentCommand; import java.time.Duration; +import java.util.concurrent.Callable; +@Slf4j @Command(abbreviateSynopsis = true, sortOptions = false) -public abstract class RiotCommand extends HelpCommand implements InitializingBean { +public abstract class RiotCommand extends HelpCommand implements InitializingBean, Callable { + @SuppressWarnings("unused") @ParentCommand private RiotApp app; protected AbstractRedisClient client; @@ -33,8 +39,10 @@ protected RedisURI getRedisURI() { return app.getRedisOptions().uris().get(0); } - protected Duration getCommandTimeout() { - return getRedisURI().getTimeout(); + protected > B configureCommandTimeoutBuilder(B builder) { + Duration commandTimeout = getRedisURI().getTimeout(); + log.info("Configuring {} with command timeout {}", ClassUtils.getShortName(builder.getClass()), commandTimeout); + return builder.commandTimeout(commandTimeout); } @Override @@ -52,6 +60,7 @@ public Integer call() throws Exception { public void afterPropertiesSet() throws Exception { this.client = client(app.getRedisOptions()); this.pool = pool(app.getRedisOptions(), client); + log.info("Connecting to {}", app.getRedisOptions().uris()); this.connection = connection(client); } diff --git a/core/src/main/java/com/redislabs/riot/StepBuilder.java b/core/src/main/java/com/redislabs/riot/StepBuilder.java index 6ee010c36..6415bac47 100644 --- a/core/src/main/java/com/redislabs/riot/StepBuilder.java +++ b/core/src/main/java/com/redislabs/riot/StepBuilder.java @@ -28,6 +28,7 @@ public class StepBuilder { private final TransferOptions options; private String name; + private String taskName; private ItemReader reader; private ItemProcessor processor; private ItemWriter writer; @@ -41,13 +42,13 @@ public StepBuilder(JobFactory jobFactory, TransferOptions options) { public SimpleStepBuilder build() { if (options.getMaxItemCount() != null) { if (reader instanceof AbstractItemCountingItemStreamItemReader) { - log.debug("Configuring reader with maxItemCount={}", options.getMaxItemCount()); + log.info("Configuring reader with maxItemCount={}", options.getMaxItemCount()); ((AbstractItemCountingItemStreamItemReader) reader).setMaxItemCount(Math.toIntExact(options.getMaxItemCount())); } } - SimpleStepBuilder step = jobFactory.step(name + "-step").chunk(options.getChunkSize()).reader(reader).processor(processor).writer(writer); + SimpleStepBuilder step = jobFactory.step(name).chunk(options.getChunkSize()).reader(reader).processor(processor).writer(writer); if (options.isShowProgress()) { - ProgressMonitor.ProgressMonitorBuilder monitorBuilder = ProgressMonitor.builder().taskName(name).max(max(reader)); + ProgressMonitor.ProgressMonitorBuilder monitorBuilder = ProgressMonitor.builder().taskName(taskName).max(max(reader)); if (extraMessage != null) { monitorBuilder.extraMessageSupplier(extraMessage); } diff --git a/core/src/main/java/com/redislabs/riot/convert/ObjectToNumberConverter.java b/core/src/main/java/com/redislabs/riot/convert/ObjectToNumberConverter.java index 077e2ef6d..2de9794d5 100644 --- a/core/src/main/java/com/redislabs/riot/convert/ObjectToNumberConverter.java +++ b/core/src/main/java/com/redislabs/riot/convert/ObjectToNumberConverter.java @@ -5,25 +5,28 @@ public class ObjectToNumberConverter implements Converter { - private final Class targetType; + private final Class targetType; - public ObjectToNumberConverter(Class targetType) { - this.targetType = targetType; - } + public ObjectToNumberConverter(Class targetType) { + this.targetType = targetType; + } - @Override - public T convert(Object source) { - if (source instanceof Number) { - return NumberUtils.convertNumberToTargetClass((Number) source, targetType); - } - if (source instanceof String) { - String string = (String) source; - if (string.isEmpty()) { - return null; - } - return NumberUtils.parseNumber(string, targetType); - } - return null; - } + @Override + public T convert(Object source) { + if (source == null) { + return null; + } + if (source instanceof Number) { + return NumberUtils.convertNumberToTargetClass((Number) source, targetType); + } + if (source instanceof String) { + String string = (String) source; + if (string.isEmpty()) { + return null; + } + return NumberUtils.parseNumber(string, targetType); + } + return null; + } } diff --git a/core/src/main/java/com/redislabs/riot/convert/ObjectToStringConverter.java b/core/src/main/java/com/redislabs/riot/convert/ObjectToStringConverter.java index 3c3550de5..a79f9c5e2 100644 --- a/core/src/main/java/com/redislabs/riot/convert/ObjectToStringConverter.java +++ b/core/src/main/java/com/redislabs/riot/convert/ObjectToStringConverter.java @@ -6,7 +6,7 @@ public class ObjectToStringConverter implements Converter { @Override public String convert(Object source) { - return source.toString(); + return String.valueOf(source); } } diff --git a/core/src/main/java/com/redislabs/riot/processor/SpelProcessor.java b/core/src/main/java/com/redislabs/riot/processor/SpelProcessor.java index df41e5cd8..b569293d6 100644 --- a/core/src/main/java/com/redislabs/riot/processor/SpelProcessor.java +++ b/core/src/main/java/com/redislabs/riot/processor/SpelProcessor.java @@ -23,63 +23,59 @@ @Slf4j public class SpelProcessor implements ItemProcessor, Map> { - private final StandardEvaluationContext context; - private final Map expressions = new LinkedHashMap<>(); - private final AtomicLong index = new AtomicLong(); + private final StandardEvaluationContext context; + private final Map expressions = new LinkedHashMap<>(); + private final AtomicLong index = new AtomicLong(); - public SpelProcessor(StatefulConnection connection, DateFormat dateFormat, - Map variables, Map fields) { - Assert.notNull(connection, "A Redis connection is required."); - Assert.notNull(dateFormat, "A DateFormat instance is required."); - Assert.isTrue(fields != null && !fields.isEmpty(), "At least one field is required."); - // keep a reference to the connection object to avoid GC reclaiming it - this.context = new StandardEvaluationContext(); - context.setVariable("date", dateFormat); - context.setVariable("index", index); - context.setVariable("redis", - connection instanceof StatefulRedisClusterConnection - ? ((StatefulRedisClusterConnection) connection).sync() - : ((StatefulRedisConnection) connection).sync()); - SpelExpressionParser parser = new SpelExpressionParser(); - if (variables != null) { - variables.forEach((k, v) -> context.setVariable(k, parser.parseExpression(v).getValue(context))); - } - Method geoMethod; - try { - geoMethod = getClass().getDeclaredMethod("geo", new Class[] { String.class, String.class }); - context.registerFunction("geo", geoMethod); - } catch (NoSuchMethodException | SecurityException e) { - log.error("Could not register geo function", e); - } - context.setPropertyAccessors(Collections.singletonList(new MapAccessor())); - fields.forEach((k, v) -> expressions.put(k, parser.parseExpression(v))); - } + public SpelProcessor(StatefulConnection connection, DateFormat dateFormat, Map variables, Map fields) { + Assert.notNull(connection, "A Redis connection is required."); + Assert.notNull(dateFormat, "A DateFormat instance is required."); + Assert.isTrue(fields != null && !fields.isEmpty(), "At least one field is required."); + // keep a reference to the connection object to avoid GC reclaiming it + this.context = new StandardEvaluationContext(); + context.setVariable("date", dateFormat); + context.setVariable("index", index); + context.setVariable("redis", connection instanceof StatefulRedisClusterConnection ? ((StatefulRedisClusterConnection) connection).sync() : ((StatefulRedisConnection) connection).sync()); + SpelExpressionParser parser = new SpelExpressionParser(); + if (variables != null) { + variables.forEach((k, v) -> context.setVariable(k, parser.parseExpression(v).getValue(context))); + } + Method geoMethod; + try { + geoMethod = getClass().getDeclaredMethod("geo", String.class, String.class); + context.registerFunction("geo", geoMethod); + } catch (NoSuchMethodException | SecurityException e) { + log.error("Could not register geo function", e); + } + context.setPropertyAccessors(Collections.singletonList(new MapAccessor())); + fields.forEach((k, v) -> expressions.put(k, parser.parseExpression(v))); + } - @Override - public Map process(Map item) { - Map map = new HashMap<>(item); - synchronized (context) { - for (String field : expressions.keySet()) { - try { - Object value = expressions.get(field).getValue(context, map); - if (value != null) { - map.put(field, value); - } - } catch (ExpressionInvocationTargetException e) { - log.error("Error while evaluating field {}", field, e); - throw e; - } - } - index.incrementAndGet(); - } - return map; - } + @Override + public Map process(Map item) { + Map map = new HashMap<>(item); + synchronized (context) { + for (String field : expressions.keySet()) { + try { + Object value = expressions.get(field).getValue(context, map); + if (value != null) { + map.put(field, value); + } + } catch (ExpressionInvocationTargetException e) { + log.error("Error while evaluating field {}", field, e); + throw e; + } + } + index.incrementAndGet(); + } + return map; + } - public static String geo(String longitude, String latitude) { - if (longitude == null || latitude == null) { - return null; - } - return longitude + "," + latitude; - } + public static String geo(String longitude, String latitude) { + if (longitude == null || latitude == null) { + return null; + } + return longitude + "," + latitude; + } } diff --git a/core/src/main/java/com/redislabs/riot/redis/AbstractCollectionCommand.java b/core/src/main/java/com/redislabs/riot/redis/AbstractCollectionCommand.java index a5334c925..1764e955b 100644 --- a/core/src/main/java/com/redislabs/riot/redis/AbstractCollectionCommand.java +++ b/core/src/main/java/com/redislabs/riot/redis/AbstractCollectionCommand.java @@ -14,10 +14,8 @@ public abstract class AbstractCollectionCommand extends AbstractKeyCommand { @Option(names = {"-m", "--members"}, arity = "1..*", description = "Member field names for collections", paramLabel = "") private String[] memberFields = new String[0]; - protected , B>> B configure(B builder) { - super.configure(builder); - builder.memberIdConverter(idMaker(memberSpace, memberFields)); - return builder; + protected , B>> B configureCollectionCommandBuilder(B builder) { + return configureKeyCommandBuilder(builder).memberIdConverter(idMaker(memberSpace, memberFields)); } } diff --git a/core/src/main/java/com/redislabs/riot/redis/AbstractKeyCommand.java b/core/src/main/java/com/redislabs/riot/redis/AbstractKeyCommand.java index 71eb0cc8b..88e3aec0e 100644 --- a/core/src/main/java/com/redislabs/riot/redis/AbstractKeyCommand.java +++ b/core/src/main/java/com/redislabs/riot/redis/AbstractKeyCommand.java @@ -14,9 +14,8 @@ public abstract class AbstractKeyCommand extends AbstractRedisCommand, B>> B configure(B builder) { - builder.keyConverter(idMaker(keyspace, keys)); - return builder; + protected , B>> B configureKeyCommandBuilder(B builder) { + return builder.keyConverter(idMaker(keyspace, keys)); } } diff --git a/core/src/main/java/com/redislabs/riot/redis/AbstractRedisCommand.java b/core/src/main/java/com/redislabs/riot/redis/AbstractRedisCommand.java index 5177f188e..59a036b34 100644 --- a/core/src/main/java/com/redislabs/riot/redis/AbstractRedisCommand.java +++ b/core/src/main/java/com/redislabs/riot/redis/AbstractRedisCommand.java @@ -17,6 +17,7 @@ public abstract class AbstractRedisCommand extends HelpCommand implements Red @CommandLine.Option(names = {"-s", "--separator"}, description = "Key separator (default: ${DEFAULT-VALUE})", paramLabel = "") private String keySeparator = ":"; + @SuppressWarnings("unused") @CommandLine.Option(names = {"-r", "--remove"}, description = "Remove key or member fields the first time they are used") private boolean removeFields; diff --git a/core/src/main/java/com/redislabs/riot/redis/EvalCommand.java b/core/src/main/java/com/redislabs/riot/redis/EvalCommand.java index 17e76ed96..fe87fb053 100644 --- a/core/src/main/java/com/redislabs/riot/redis/EvalCommand.java +++ b/core/src/main/java/com/redislabs/riot/redis/EvalCommand.java @@ -14,6 +14,7 @@ @Command(name = "eval", description = "Evaluate a Lua script with keys and arguments from input") public class EvalCommand extends AbstractRedisCommand> { + @SuppressWarnings("unused") @Option(names = "--sha", description = "Digest", paramLabel = "") private String sha; @Option(names = "--keys", arity = "1..*", description = "Key fields", paramLabel = "") diff --git a/core/src/main/java/com/redislabs/riot/redis/ExpireCommand.java b/core/src/main/java/com/redislabs/riot/redis/ExpireCommand.java index 79173c68e..ebb08c2b7 100644 --- a/core/src/main/java/com/redislabs/riot/redis/ExpireCommand.java +++ b/core/src/main/java/com/redislabs/riot/redis/ExpireCommand.java @@ -11,6 +11,7 @@ @Command(name = "expire", description = "Set timeouts on keys") public class ExpireCommand extends AbstractKeyCommand { + @SuppressWarnings("unused") @Option(names = "--ttl", description = "EXPIRE timeout field", paramLabel = "") private String timeoutField; @Option(names = "--ttl-default", description = "EXPIRE default timeout (default: ${DEFAULT-VALUE})", paramLabel = "") @@ -18,7 +19,7 @@ public class ExpireCommand extends AbstractKeyCommand { @Override public BiFunction, RedisFuture> command() { - return configure(CommandBuilder.expire()).timeoutConverter(numberFieldExtractor(Long.class, timeoutField, timeoutDefault)).build(); + return configureKeyCommandBuilder(CommandBuilder.expire()).timeoutConverter(numberFieldExtractor(Long.class, timeoutField, timeoutDefault)).build(); } } diff --git a/core/src/main/java/com/redislabs/riot/redis/GeoaddCommand.java b/core/src/main/java/com/redislabs/riot/redis/GeoaddCommand.java index 9136c6c18..f551c361e 100644 --- a/core/src/main/java/com/redislabs/riot/redis/GeoaddCommand.java +++ b/core/src/main/java/com/redislabs/riot/redis/GeoaddCommand.java @@ -8,17 +8,19 @@ import java.util.Map; import java.util.function.BiFunction; -@Command(name = "geoadd", description = "Add geospatial items") +@Command(name = "geoadd", description = "Add members to a geo set") public class GeoaddCommand extends AbstractCollectionCommand { + @SuppressWarnings("unused") @Option(names = "--lon", description = "Longitude field", paramLabel = "") private String longitudeField; + @SuppressWarnings("unused") @Option(names = "--lat", description = "Latitude field", paramLabel = "") private String latitudeField; @Override public BiFunction, RedisFuture> command() { - return configure(CommandBuilder.geoadd()).longitudeConverter(doubleFieldExtractor(longitudeField)).latitudeConverter(doubleFieldExtractor(latitudeField)).build(); + return configureCollectionCommandBuilder(CommandBuilder.geoadd()).longitudeConverter(doubleFieldExtractor(longitudeField)).latitudeConverter(doubleFieldExtractor(latitudeField)).build(); } } diff --git a/core/src/main/java/com/redislabs/riot/redis/HsetCommand.java b/core/src/main/java/com/redislabs/riot/redis/HsetCommand.java index 9841ad1af..3f21f2b84 100644 --- a/core/src/main/java/com/redislabs/riot/redis/HsetCommand.java +++ b/core/src/main/java/com/redislabs/riot/redis/HsetCommand.java @@ -16,7 +16,7 @@ public class HsetCommand extends AbstractKeyCommand { @Override public BiFunction, RedisFuture> command() { - return configure(CommandBuilder.hset()).mapConverter(filtering.converter()).build(); + return configureKeyCommandBuilder(CommandBuilder.hset()).mapConverter(filtering.converter()).build(); } } diff --git a/core/src/main/java/com/redislabs/riot/redis/LpushCommand.java b/core/src/main/java/com/redislabs/riot/redis/LpushCommand.java index bf04c30c5..55db0400a 100644 --- a/core/src/main/java/com/redislabs/riot/redis/LpushCommand.java +++ b/core/src/main/java/com/redislabs/riot/redis/LpushCommand.java @@ -7,12 +7,12 @@ import java.util.Map; import java.util.function.BiFunction; -@Command(name = "lpush", description = "Insert values at the head of lists") +@Command(name = "lpush", description = "Insert values at the head of a list") public class LpushCommand extends AbstractCollectionCommand { @Override public BiFunction, RedisFuture> command() { - return configure(CommandBuilder.lpush()).build(); + return configureCollectionCommandBuilder(CommandBuilder.lpush()).build(); } } diff --git a/core/src/main/java/com/redislabs/riot/redis/RpushCommand.java b/core/src/main/java/com/redislabs/riot/redis/RpushCommand.java index a9d0c3684..c177d6363 100644 --- a/core/src/main/java/com/redislabs/riot/redis/RpushCommand.java +++ b/core/src/main/java/com/redislabs/riot/redis/RpushCommand.java @@ -7,12 +7,12 @@ import java.util.Map; import java.util.function.BiFunction; -@Command(name = "rpush", description = "Insert values at the tail of lists") +@Command(name = "rpush", description = "Insert values at the tail of a list") public class RpushCommand extends AbstractCollectionCommand { @Override public BiFunction, RedisFuture> command() { - return configure(CommandBuilder.rpush()).build(); + return configureCollectionCommandBuilder(CommandBuilder.rpush()).build(); } } diff --git a/core/src/main/java/com/redislabs/riot/redis/SaddCommand.java b/core/src/main/java/com/redislabs/riot/redis/SaddCommand.java index 5839b34de..8100530ed 100644 --- a/core/src/main/java/com/redislabs/riot/redis/SaddCommand.java +++ b/core/src/main/java/com/redislabs/riot/redis/SaddCommand.java @@ -7,12 +7,12 @@ import java.util.Map; import java.util.function.BiFunction; -@Command(name = "sadd", description = "Add members to sets") +@Command(name = "sadd", description = "Add members to a set") public class SaddCommand extends AbstractCollectionCommand { @Override public BiFunction, RedisFuture> command() { - return configure(CommandBuilder.sadd()).build(); + return configureCollectionCommandBuilder(CommandBuilder.sadd()).build(); } } diff --git a/core/src/main/java/com/redislabs/riot/redis/SetCommand.java b/core/src/main/java/com/redislabs/riot/redis/SetCommand.java index 9bfa47c19..b3e7974b6 100644 --- a/core/src/main/java/com/redislabs/riot/redis/SetCommand.java +++ b/core/src/main/java/com/redislabs/riot/redis/SetCommand.java @@ -14,7 +14,7 @@ import java.util.Map; import java.util.function.BiFunction; -@Command(name = "set", description = "Set keys to hold values from input") +@Command(name = "set", description = "Set strings from input") public class SetCommand extends AbstractKeyCommand { private enum StringFormat { @@ -23,14 +23,16 @@ private enum StringFormat { @Option(names = "--format", description = "Serialization: ${COMPLETION-CANDIDATES} (default: ${DEFAULT-VALUE})", paramLabel = "") private StringFormat format = StringFormat.JSON; + @SuppressWarnings("unused") @Option(names = "--field", description = "Raw value field", paramLabel = "") private String field; + @SuppressWarnings("unused") @Option(names = "--root", description = "XML root element name", paramLabel = "") private String root; @Override public BiFunction, RedisFuture> command() { - return configure(CommandBuilder.set()).valueConverter(stringValueConverter()).build(); + return configureKeyCommandBuilder(CommandBuilder.set()).valueConverter(stringValueConverter()).build(); } private Converter, String> stringValueConverter() { diff --git a/core/src/main/java/com/redislabs/riot/redis/XaddCommand.java b/core/src/main/java/com/redislabs/riot/redis/XaddCommand.java index ccb8a913c..326c93669 100644 --- a/core/src/main/java/com/redislabs/riot/redis/XaddCommand.java +++ b/core/src/main/java/com/redislabs/riot/redis/XaddCommand.java @@ -11,21 +11,24 @@ import java.util.Map; import java.util.function.BiFunction; -@Command(name = "xadd", description = "Append entries to streams") +@Command(name = "xadd", description = "Append entries to a stream") public class XaddCommand extends AbstractKeyCommand { @CommandLine.Mixin private FilteringOptions filteringOptions = FilteringOptions.builder().build(); + @SuppressWarnings("unused") @Option(names = "--id", description = "Stream entry ID field", paramLabel = "") private String idField; + @SuppressWarnings("unused") @Option(names = "--maxlen", description = "Stream maxlen", paramLabel = "") private Long maxlen; + @SuppressWarnings("unused") @Option(names = "--trim", description = "Stream efficient trimming ('~' flag)") private boolean approximateTrimming; @Override public BiFunction, RedisFuture> command() { - return configure(CommandBuilder.xadd()).argsConverter(argsConverter()).bodyConverter(filteringOptions.converter()).build(); + return configureKeyCommandBuilder(CommandBuilder.xadd()).argsConverter(argsConverter()).bodyConverter(filteringOptions.converter()).build(); } private Converter, XAddArgs> argsConverter() { diff --git a/core/src/main/java/com/redislabs/riot/redis/ZaddCommand.java b/core/src/main/java/com/redislabs/riot/redis/ZaddCommand.java index c75bdeb88..05c5675d2 100644 --- a/core/src/main/java/com/redislabs/riot/redis/ZaddCommand.java +++ b/core/src/main/java/com/redislabs/riot/redis/ZaddCommand.java @@ -8,9 +8,10 @@ import java.util.Map; import java.util.function.BiFunction; -@Command(name = "zadd", description = "Add members with scores sorted sets") +@Command(name = "zadd", description = "Add members with scores to a sorted set") public class ZaddCommand extends AbstractCollectionCommand { + @SuppressWarnings("unused") @Option(names = "--score", description = "Name of the field to use for scores", paramLabel = "") private String scoreField; @Option(names = "--score-default", description = "Score when field not present (default: ${DEFAULT-VALUE})", paramLabel = "") @@ -18,7 +19,7 @@ public class ZaddCommand extends AbstractCollectionCommand { @Override public BiFunction, RedisFuture> command() { - return configure(CommandBuilder.zadd()).scoreConverter(numberFieldExtractor(Double.class, scoreField, scoreDefault)).build(); + return configureCollectionCommandBuilder(CommandBuilder.zadd()).scoreConverter(numberFieldExtractor(Double.class, scoreField, scoreDefault)).build(); } } diff --git a/test/src/main/java/com/redislabs/riot/test/AbstractRiotTest.java b/test/src/main/java/com/redislabs/riot/test/AbstractRiotTest.java index 35a50f407..388ce86a7 100644 --- a/test/src/main/java/com/redislabs/riot/test/AbstractRiotTest.java +++ b/test/src/main/java/com/redislabs/riot/test/AbstractRiotTest.java @@ -37,7 +37,7 @@ protected Object command(String file) throws Exception { protected Object command(RiotApp app, String file) throws Exception { CommandLine commandLine = app.commandLine(); - CommandLine.ParseResult parseResult = app.parse(commandLine, args(file)); + CommandLine.ParseResult parseResult = commandLine.parseArgs(args(file)); return parseResult.subcommand().commandSpec().commandLine().getCommand(); } diff --git a/test/src/main/java/com/redislabs/riot/test/AbstractStandaloneRedisTest.java b/test/src/main/java/com/redislabs/riot/test/AbstractStandaloneRedisTest.java index b187fe29b..43b5a7270 100644 --- a/test/src/main/java/com/redislabs/riot/test/AbstractStandaloneRedisTest.java +++ b/test/src/main/java/com/redislabs/riot/test/AbstractStandaloneRedisTest.java @@ -64,7 +64,7 @@ public void teardown() { } protected RedisOptions redisOptions() { - RedisOptions redisOptions = new RedisOptions(); + RedisOptions redisOptions = RedisOptions.builder().build(); redisOptions.setHost(redisURI.getHost()); redisOptions.setPort(redisURI.getPort()); return redisOptions;