Skip to content

Commit

Permalink
improved logging
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Mar 5, 2021
1 parent 6ca98aa commit abc3040
Show file tree
Hide file tree
Showing 19 changed files with 194 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.boot.autoconfigure.jdbc.DataSourceProperties;
import picocli.CommandLine.Option;

import javax.sql.DataSource;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import picocli.CommandLine.Command;
import picocli.CommandLine.Mixin;

import javax.sql.DataSource;
import java.util.Map;

@Slf4j
Expand All @@ -23,10 +24,13 @@ public class DatabaseExportCommand extends AbstractExportCommand<Map<String, Obj

@Override
protected Flow flow() throws Exception {
log.info("Creating data source: {}", dataSourceOptions);
DataSource dataSource = dataSourceOptions.dataSource();
String name = dataSource.getConnection().getMetaData().getDatabaseProductName();
log.info("Creating {} database writer: {}", name, exportOptions);
JdbcBatchItemWriterBuilder<Map<String, Object>> builder = new JdbcBatchItemWriterBuilder<>();
builder.itemSqlParameterSourceProvider(MapSqlParameterSource::new);
log.info("Creating data source {}", dataSourceOptions);
builder.dataSource(dataSourceOptions.dataSource());
builder.dataSource(dataSource);
builder.sql(exportOptions.getSql());
builder.assertUpdates(exportOptions.isAssertUpdates());
JdbcBatchItemWriter<Map<String, Object>> writer = builder.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.redislabs.riot.AbstractImportCommand;
import com.redislabs.riot.KeyValueProcessingOptions;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcCursorItemReader;
Expand All @@ -13,19 +14,23 @@
import javax.sql.DataSource;
import java.util.Map;

@Slf4j
@Command(name = "import", description = "Import from a database")
public class DatabaseImportCommand extends AbstractImportCommand<Map<String, Object>, Map<String, Object>> {

@Mixin
private DataSourceOptions options = DataSourceOptions.builder().build();
private DataSourceOptions dataSourceOptions = DataSourceOptions.builder().build();
@Mixin
private DatabaseImportOptions importOptions = DatabaseImportOptions.builder().build();
@Mixin
private KeyValueProcessingOptions processingOptions = KeyValueProcessingOptions.builder().build();

@Override
protected Flow flow() throws Exception {
DataSource dataSource = options.dataSource();
log.info("Creating data source: {}", dataSourceOptions);
DataSource dataSource = dataSourceOptions.dataSource();
String name = dataSource.getConnection().getMetaData().getDatabaseProductName();
log.info("Creating {} database reader: {}", name, importOptions);
JdbcCursorItemReaderBuilder<Map<String, Object>> builder = new JdbcCursorItemReaderBuilder<>();
builder.saveState(false);
builder.dataSource(dataSource);
Expand All @@ -35,7 +40,7 @@ protected Flow flow() throws Exception {
if (importOptions.getMaxRows() != null) {
builder.maxRows(importOptions.getMaxRows());
}
builder.name("database-reader");
builder.name(name + "-database-reader");
if (importOptions.getQueryTimeout() != null) {
builder.queryTimeout(importOptions.getQueryTimeout());
}
Expand All @@ -45,7 +50,6 @@ protected Flow flow() throws Exception {
builder.verifyCursorPosition(importOptions.isVerifyCursorPosition());
JdbcCursorItemReader<Map<String, Object>> reader = builder.build();
reader.afterPropertiesSet();
String name = dataSource.getConnection().getMetaData().getDatabaseProductName();
return flow(step(name + "-db-import-step", "Importing from " + name, reader).build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,33 @@
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.core.io.Resource;
import org.springframework.util.ObjectUtils;
import picocli.CommandLine;
import picocli.CommandLine.Command;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

@Command
public abstract class AbstractFileImportCommand<T> extends AbstractImportCommand<T, T> {

@SuppressWarnings("unused")
@CommandLine.Parameters(arity = "1..*", description = "One or more files or URLs", paramLabel = "FILE")
private String[] files = new String[0];
private String[] files;
@Getter
@CommandLine.Mixin
private FileOptions fileOptions = FileOptions.builder().build();

@Override
protected Flow flow() throws Exception {
String[] expandedFiles = FileUtils.expand(files);
if (expandedFiles.length == 0) {
List<String> expandedFiles = FileUtils.expand(files);
if (ObjectUtils.isEmpty(expandedFiles)) {
throw new FileNotFoundException("File not found: " + String.join(", ", files));
}
List<Step> steps = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.batch.item.xml.XmlItemReader;
import org.springframework.core.io.Resource;
import org.springframework.util.ObjectUtils;
import picocli.CommandLine;
import picocli.CommandLine.Command;

import java.io.FileNotFoundException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -38,7 +40,10 @@ public class DataStructureFileImportCommand extends AbstractTransferCommand<Data

@Override
protected Flow flow() throws Exception {
String[] expandedFiles = FileUtils.expand(files);
List<String> expandedFiles = FileUtils.expand(files);
if (ObjectUtils.isEmpty(expandedFiles)) {
throw new FileNotFoundException("File not found: " + String.join(", ", files));
}
List<Step> steps = new ArrayList<>();
DataStructureItemProcessor processor = new DataStructureItemProcessor();
for (String file : expandedFiles) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,42 +1,34 @@
package com.redislabs.riot.file;

import lombok.*;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.Range;
import picocli.CommandLine.Option;

import java.util.ArrayList;
import java.util.List;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class FileImportOptions {

@Getter
@Builder.Default
@Option(names = "--fields", arity = "1..*", description = "Delimited/FW field names", paramLabel = "<names>")
private List<String> names = new ArrayList<>();
private String[] names;
@Option(names = {"-h", "--header"}, description = "Delimited/FW first line contains field names")
private boolean header;
@Option(names = "--delimiter", description = "Delimiter character", paramLabel = "<string>")
private String delimiter;
@Option(names = "--skip", description = "Delimited/FW lines to skip at start", paramLabel = "<count>")
private Integer linesToSkip;
@Getter
@Builder.Default
@Option(names = "--include", arity = "1..*", description = "Delimited/FW field indices to include (0-based)", paramLabel = "<index>")
private List<Integer> includedFields = new ArrayList<>();
@Getter
@Builder.Default
private int[] includedFields;
@Option(names = "--ranges", arity = "1..*", description = "Fixed-width column ranges", paramLabel = "<int>")
private List<Range> columnRanges = new ArrayList<>();
@Getter
private Range[] columnRanges;
@Builder.Default
@Option(names = "--quote", description = "Escape character for delimited files (default: ${DEFAULT-VALUE})", paramLabel = "<char>")
private Character quoteCharacter = DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER;
@Getter
@Builder.Default
@Option(names = "--continuation", description = "Line continuation string (default: ${DEFAULT-VALUE})", paramLabel = "<string>")
private String continuationString = "\\";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,10 @@ public static <T> XmlItemReader<T> xmlReader(Resource resource, Class<T> clazz)
return xmlReaderBuilder.build();
}


public static String[] expand(String... files) throws IOException {
public static List<String> expand(String... files) throws IOException {
if (files == null) {
return null;
}
List<String> expandedFiles = new ArrayList<>();
for (String file : files) {
if (isFile(file)) {
Expand All @@ -254,7 +256,7 @@ public static String[] expand(String... files) throws IOException {
expandedFiles.add(file);
}
}
return expandedFiles.toArray(new String[0]);
return expandedFiles;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
import org.springframework.batch.item.file.transform.AbstractLineTokenizer;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FixedLengthTokenizer;
import org.springframework.batch.item.file.transform.Range;
import org.springframework.batch.item.json.JsonItemReader;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.batch.item.xml.XmlItemReader;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import picocli.CommandLine;
import picocli.CommandLine.Command;

Expand All @@ -40,15 +40,15 @@ protected AbstractItemStreamItemReader<Map<String, Object>> reader(String file,
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setDelimiter(options.delimiter(file));
tokenizer.setQuoteCharacter(options.getQuoteCharacter());
if (!options.getIncludedFields().isEmpty()) {
tokenizer.setIncludedFields(options.getIncludedFields().stream().mapToInt(i -> i).toArray());
if (!ObjectUtils.isEmpty(options.getIncludedFields())) {
tokenizer.setIncludedFields(options.getIncludedFields());
}
log.info("Creating delimited reader with {} for file {}", options, file);
return flatFileReader(resource, tokenizer);
case FIXED:
FixedLengthTokenizer fixedLengthTokenizer = new FixedLengthTokenizer();
Assert.notEmpty(options.getColumnRanges(), "Column ranges are required");
fixedLengthTokenizer.setColumns(options.getColumnRanges().toArray(new Range[0]));
fixedLengthTokenizer.setColumns(options.getColumnRanges());
log.info("Creating fixed-width reader with {} for file {}", options, file);
return flatFileReader(resource, fixedLengthTokenizer);
case JSON:
Expand All @@ -67,7 +67,9 @@ protected ItemProcessor<Map<String, Object>, Map<String, Object>> processor() {
}

private FlatFileItemReader<Map<String, Object>> flatFileReader(Resource resource, AbstractLineTokenizer tokenizer) {
tokenizer.setNames(options.getNames().toArray(new String[0]));
if (!ObjectUtils.isEmpty(options.getNames())) {
tokenizer.setNames(options.getNames());
}
FlatFileItemReaderBuilder<Map<String, Object>> builder = new FlatFileItemReaderBuilder<>();
builder.resource(resource);
builder.encoding(getFileOptions().getEncoding());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,28 @@
import com.redislabs.riot.AbstractImportCommand;
import com.redislabs.riot.KeyValueProcessingOptions;
import io.lettuce.core.RedisURI;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.item.ItemProcessor;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import picocli.CommandLine.Parameters;

import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

@Slf4j
@Command(name = "import", description = "Import generated data")
public class GenerateCommand extends AbstractImportCommand<Map<String, Object>, Map<String, Object>> {

@Parameters(description = "SpEL expressions", paramLabel = "SPEL")
private Map<String, String> fakerFields = new LinkedHashMap<>();
@SuppressWarnings("unused")
@Option(names = "--introspect", description = "Use given search index to introspect Faker fields", paramLabel = "<index>")
private String fakerIndex;
@Option(names = "--locale", description = "Faker locale (default: ${DEFAULT-VALUE})", paramLabel = "<tag>")
private Locale locale = Locale.ENGLISH;
@SuppressWarnings("unused")
@Option(names = "--metadata", description = "Include metadata (index, partition)")
private boolean includeMetadata;
@Option(names = "--start", description = "Start index (default: ${DEFAULT-VALUE})", paramLabel = "<int>")
private long start = 0;
@Option(names = "--end", description = "End index (default: ${DEFAULT-VALUE})", paramLabel = "<int>")
private long end = 1000;
@Option(names = "--sleep", description = "Duration in ms to sleep before each item generation (default: ${DEFAULT-VALUE})", paramLabel = "<ms>")
private long sleep = 0;
@CommandLine.Mixin
private GenerateOptions options = GenerateOptions.builder().build();
@CommandLine.Mixin
private KeyValueProcessingOptions processingOptions = KeyValueProcessingOptions.builder().build();

@Override
protected Flow flow() {
FakerItemReader reader = FakerItemReader.builder().locale(locale).includeMetadata(includeMetadata).fields(fakerFields(getRedisURI())).start(start).end(end).sleep(sleep).build();
log.info("Creating Faker reader with {}", options);
FakerItemReader reader = FakerItemReader.builder().locale(options.getLocale()).includeMetadata(options.isIncludeMetadata()).fields(fakerFields(getRedisURI())).start(options.getStart()).end(options.getEnd()).sleep(options.getSleep()).build();
return flow(step("generate-step", "Generating", reader).build());
}

Expand All @@ -57,14 +43,14 @@ private String expression(Field<String> field) {
}

private Map<String, String> fakerFields(RedisURI uri) {
Map<String, String> fields = new LinkedHashMap<>(fakerFields);
if (fakerIndex == null) {
Map<String, String> fields = options.getFakerFields() == null ? new LinkedHashMap<>() : new LinkedHashMap<>(options.getFakerFields());
if (options.getFakerIndex() == null) {
return fields;
}
RediSearchClient client = RediSearchClient.create(uri);
try (StatefulRediSearchConnection<String, String> connection = client.connect()) {
RediSearchCommands<String, String> commands = connection.sync();
IndexInfo<String> info = RediSearchUtils.getInfo(commands.ftInfo(fakerIndex));
IndexInfo<String> info = RediSearchUtils.getInfo(commands.ftInfo(options.getFakerIndex()));
for (Field<String> field : info.getFields()) {
fields.put(field.getName(), expression(field));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.redislabs.riot.gen;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import picocli.CommandLine;

import java.util.Locale;
import java.util.Map;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class GenerateOptions {

// Options controlling Faker-based data generation, extracted from GenerateCommand
// so they can be shared via @CommandLine.Mixin. Lombok generates getters/setters
// (@Data), a builder (@Builder), and both constructors; picocli binds the fields
// from command-line input. NOTE(review): field order defines the @AllArgsConstructor
// parameter order — do not reorder fields.

// Map of field name -> SpEL expression used to generate each value; may be null
// when no positional parameters are given (callers must handle null).
@CommandLine.Parameters(description = "SpEL expressions", paramLabel = "SPEL")
private Map<String, String> fakerFields;
// Optional RediSearch index name; when set, its schema is introspected to derive
// Faker fields automatically (see GenerateCommand.fakerFields).
@SuppressWarnings("unused")
@CommandLine.Option(names = "--introspect", description = "Use given search index to introspect Faker fields", paramLabel = "<index>")
private String fakerIndex;
// Locale passed to the Faker instance; @Builder.Default keeps ENGLISH when the
// builder is used without setting it (otherwise Lombok's builder would leave null).
@Builder.Default
@CommandLine.Option(names = "--locale", description = "Faker locale (default: ${DEFAULT-VALUE})", paramLabel = "<tag>")
private Locale locale = Locale.ENGLISH;
// When true, generated items include metadata such as index and partition.
@SuppressWarnings("unused")
@CommandLine.Option(names = "--metadata", description = "Include metadata (index, partition)")
private boolean includeMetadata;
// Inclusive start index of the generated item sequence.
@Builder.Default
@CommandLine.Option(names = "--start", description = "Start index (default: ${DEFAULT-VALUE})", paramLabel = "<int>")
private long start = 0;
// End index of the generated item sequence — presumably exclusive; confirm
// against FakerItemReader before relying on it.
@Builder.Default
@CommandLine.Option(names = "--end", description = "End index (default: ${DEFAULT-VALUE})", paramLabel = "<int>")
private long end = 1000;
// Milliseconds to sleep before generating each item; 0 disables throttling.
@Builder.Default
@CommandLine.Option(names = "--sleep", description = "Duration in ms to sleep before each item generation (default: ${DEFAULT-VALUE})", paramLabel = "<ms>")
private long sleep = 0;
}
Loading

0 comments on commit abc3040

Please sign in to comment.