added PSV and TSV file types and fixed unused --type option
jruaux committed Mar 13, 2021
1 parent 3ef5c89 commit 3a7c2e2
Showing 137 changed files with 8,397 additions and 639 deletions.
10 changes: 5 additions & 5 deletions README.md
@@ -8,8 +8,8 @@

Redis Input/Output Tools (RIOT) is a set of import/export command line utilities for Redis:

* [RIOT DB](https://redis-developer.github.io/riot/riot-db.html): migrate from an RDBMS to Redis, RediSearch, RedisJSON, ...
* [RIOT File](https://redis-developer.github.io/riot/riot-file.html): bulk import/export data from/to files.
* [RIOT Gen](https://redis-developer.github.io/riot/riot-gen.html): generate sample Redis datasets for new feature development and proof of concept.
* [RIOT Redis](https://redis-developer.github.io/riot/riot-redis.html): live replication from any Redis database (including AWS Elasticache) to another Redis database.
* [RIOT Stream](https://redis-developer.github.io/riot/riot-stream.html): import/export messages from/to Kafka topics.
* [RIOT DB](http://developer.redislabs.com/riot/db.html): migrate from an RDBMS to Redis, RediSearch, RedisJSON, ...
* [RIOT File](http://developer.redislabs.com/riot/file.html): bulk import/export data from/to files.
* [RIOT Gen](http://developer.redislabs.com/riot/gen.html): generate sample Redis datasets for new feature development and proof of concept.
* [RIOT Redis](http://developer.redislabs.com/riot/redis.html): live replication from any Redis database (including AWS Elasticache) to another Redis database.
* [RIOT Stream](http://developer.redislabs.com/riot/stream.html): import/export messages from/to Kafka topics.
14 changes: 2 additions & 12 deletions build.gradle
@@ -68,7 +68,6 @@ subprojects {
implementation project(':core')
testImplementation project(':test')
implementation 'org.slf4j:slf4j-jdk14'
implementation 'org.slf4j:log4j-over-slf4j'
}
}
}
@@ -90,20 +89,11 @@ asciidoctor {
exclude '*.excalidraw'
into 'images'
}
from("connectors/file/src/test/resources/csv") {
include '*.csv'
into '.'
}
from("connectors/file/src/test/resources/json") {
include '*.json'
into '.'
}
from("connectors/file/src/test/resources/xml") {
include '*.xml'
from("connectors/file/src/test/resources/files") {
include '*.*'
into '.'
}
}

outputOptions {
separateOutputDirs = false
}
5 changes: 0 additions & 5 deletions connectors/db/riot-db

This file was deleted.

@@ -8,7 +8,7 @@
import com.redislabs.riot.RiotApp;
import com.redislabs.riot.test.AbstractStandaloneRedisTest;

public class DbTest extends AbstractStandaloneRedisTest {
public abstract class AbstractDatabaseTest extends AbstractStandaloneRedisTest {

@Override
protected RiotApp app() {
@@ -23,7 +23,7 @@ public static void main(String[] args) throws IOException, SQLException {
Connection connection = dataSource.getConnection();
ScriptRunner scriptRunner = ScriptRunner.builder().connection(connection).autoCommit(false).stopOnError(true)
.build();
InputStream inputStream = TestOracle.class.getClassLoader().getResourceAsStream("oracle/hr.sql");
InputStream inputStream = TestOracle.class.getClassLoader().getResourceAsStream("oracle.sql");
scriptRunner.runScript(new InputStreamReader(inputStream));
Statement statement = connection.createStatement();
statement.execute("SELECT COUNT(*) AS count FROM employees");
@@ -28,7 +28,7 @@

@Testcontainers
@SuppressWarnings("rawtypes")
public class TestPostgreSQL extends DbTest {
public class TestPostgreSQL extends AbstractDatabaseTest {

@Container
private static final PostgreSQLContainer postgreSQL = new PostgreSQLContainer(
@@ -42,7 +42,7 @@ public void testExport() throws Exception {
statement.execute("CREATE TABLE mytable (id smallint NOT NULL, field1 bpchar, field2 bpchar)");
statement.execute("ALTER TABLE ONLY mytable ADD CONSTRAINT pk_mytable PRIMARY KEY (id)");
DataGenerator.builder().commands(async).dataTypes(Collections.singletonList(DataType.HASH)).build().run();
executeFile("/postgresql/export.txt");
executeFile("export-postgresql");
statement.execute("SELECT COUNT(*) AS count FROM mytable");
ResultSet countResultSet = statement.getResultSet();
countResultSet.next();
@@ -62,9 +62,9 @@ public void testImport() throws Exception {
Connection connection = dataSource.getConnection();
ScriptRunner scriptRunner = ScriptRunner.builder().connection(connection).autoCommit(false).stopOnError(true)
.build();
InputStream inputStream = getClass().getClassLoader().getResourceAsStream("postgresql/northwind.sql");
InputStream inputStream = getClass().getClassLoader().getResourceAsStream("northwind.sql");
scriptRunner.runScript(new InputStreamReader(inputStream));
executeFile("/postgresql/import.txt");
executeFile("import-postgresql");
Statement statement = connection.createStatement();
statement.execute("SELECT COUNT(*) AS count FROM orders");
List<String> keys = sync.keys("order:*");
@@ -78,14 +78,14 @@ public void testImport() throws Exception {
}

@Test
public void testImportToJsonStrings() throws Exception {
public void testImportSet() throws Exception {
DataSource dataSource = dataSource(postgreSQL);
Connection connection = dataSource.getConnection();
ScriptRunner scriptRunner = ScriptRunner.builder().connection(connection).autoCommit(false).stopOnError(true)
.build();
InputStream inputStream = getClass().getClassLoader().getResourceAsStream("postgresql/northwind.sql");
InputStream inputStream = getClass().getClassLoader().getResourceAsStream("northwind.sql");
scriptRunner.runScript(new InputStreamReader(inputStream));
executeFile("/postgresql/import-to-json-strings.txt");
executeFile("import-postgresql-set");
Statement statement = connection.createStatement();
statement.execute("SELECT * FROM orders");
ResultSet resultSet = statement.getResultSet();
1 change: 1 addition & 0 deletions connectors/db/src/test/resources/export-postgresql
@@ -0,0 +1 @@
riot-db export "INSERT INTO mytable (id, field1, field2) VALUES (CAST(:id AS SMALLINT), :field1, :field2)" --url jdbc:postgresql://host:port/database --username appuser --password passwd --scan-match "hash:*" --key-regex "hash:(?<id>.*)"
1 change: 1 addition & 0 deletions connectors/db/src/test/resources/import-postgresql
@@ -0,0 +1 @@
riot-db import "SELECT * FROM orders" --url jdbc:postgresql://host:port/database --username appuser --password passwd hset --keyspace order --keys order_id
1 change: 1 addition & 0 deletions connectors/db/src/test/resources/import-postgresql-set
@@ -0,0 +1 @@
riot-db import "SELECT * FROM orders" --url jdbc:postgresql://host:port/database --username appuser --password passwd set --keyspace order --keys order_id
File renamed without changes.
1 change: 0 additions & 1 deletion connectors/db/src/test/resources/postgresql/export.txt

This file was deleted.

This file was deleted.

1 change: 0 additions & 1 deletion connectors/db/src/test/resources/postgresql/import.txt

This file was deleted.

5 changes: 0 additions & 5 deletions connectors/file/riot-file

This file was deleted.

@@ -11,11 +11,6 @@
import picocli.CommandLine.Command;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

@@ -37,7 +32,7 @@ protected Flow flow() throws Exception {
}
List<Step> steps = new ArrayList<>();
for (String file : expandedFiles) {
FileType fileType = FileUtils.fileType(file);
FileType fileType = fileOptions.type(file);
Resource resource = FileUtils.inputResource(file, fileOptions);
AbstractItemStreamItemReader<T> reader = reader(file, fileType, resource);
String name = FileUtils.filename(resource);
@@ -47,7 +47,7 @@ protected Flow flow() throws Exception {
List<Step> steps = new ArrayList<>();
DataStructureItemProcessor processor = new DataStructureItemProcessor();
for (String file : expandedFiles) {
FileType fileType = FileUtils.fileType(file);
FileType fileType = fileOptions.type(file);
Resource resource = FileUtils.inputResource(file, fileOptions);
String name = FileUtils.filename(resource);
AbstractItemStreamItemReader<DataStructure<String>> reader = reader(fileType, resource);
@@ -35,7 +35,7 @@ protected Flow flow() throws Exception {
}

private ItemWriter<DataStructure<String>> writer() throws IOException {
FileType fileType = FileUtils.fileType(file);
FileType fileType = fileOptions.type(file);
WritableResource resource = FileUtils.outputResource(file, fileOptions);
switch (fileType) {
case JSON:
@@ -33,22 +33,6 @@ public class FileImportOptions {
@Option(names = "--continuation", description = "Line continuation string (default: ${DEFAULT-VALUE})", paramLabel = "<string>")
private String continuationString = "\\";

public String delimiter(String file) {
if (delimiter == null) {
String extension = FileUtils.extension(file);
if (extension != null) {
switch (extension) {
case FileUtils.EXT_TSV:
return DelimitedLineTokenizer.DELIMITER_TAB;
case FileUtils.EXT_CSV:
return DelimitedLineTokenizer.DELIMITER_COMMA;
}
}
return DelimitedLineTokenizer.DELIMITER_COMMA;
}
return delimiter;
}

public int linesToSkip() {
if (linesToSkip == null) {
if (header) {
@@ -18,7 +18,7 @@ public class FileOptions {
@Builder.Default
@Option(names = "--encoding", description = "File encoding (default: ${DEFAULT-VALUE})", paramLabel = "<charset>")
private String encoding = Charset.defaultCharset().name();
@Option(names = {"-t", "--filetype"}, description = "File type: ${COMPLETION-CANDIDATES}", paramLabel = "<type>")
@Option(names = {"-t", "--type"}, description = "File type: ${COMPLETION-CANDIDATES}", paramLabel = "<type>")
private FileType type;
@Option(names = {"-z", "--gzip"}, description = "File is gzip compressed")
private boolean gzip;
@@ -29,5 +29,18 @@ public class FileOptions {
@ArgGroup(exclusive = false, heading = "Google Cloud Storage options%n")
private GcsOptions gcs = GcsOptions.builder().build();

public FileType type(String file) {
if (type == null) {
String fileExtension = FileUtils.extension(file);
for (FileType type : FileType.values()) {
if (type.getExtension().equals(fileExtension)) {
return type;
}
}
return FileType.CSV;
}
return type;
}

}
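
For reference, a minimal sketch (not part of the commit) of how the new FileOptions#type(String) shown above resolves a file type. It assumes FileOptions carries Lombok's @Builder (the @Builder.Default in the hunk suggests it does) and would sit next to com.redislabs.riot.file.FileOptions/FileType or import them; the file names are illustrative.

// Hypothetical usage of the new extension-based type resolution.
public class FileTypeResolutionSketch {
    public static void main(String[] args) {
        FileOptions options = FileOptions.builder().build();      // no -t/--type supplied
        System.out.println(options.type("data.psv"));             // PSV - matched by extension
        System.out.println(options.type("data.tsv"));             // TSV
        System.out.println(options.type("data.unknown"));         // CSV - fallback when no extension matches

        FileOptions forced = FileOptions.builder().type(FileType.JSON).build();
        System.out.println(forced.type("data.psv"));              // JSON - the explicit --type now wins (the fix)
    }
}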

@@ -1,16 +1,16 @@
package com.redislabs.riot.file;

import lombok.Getter;

public enum FileType {

DELIMITED(FileUtils.EXT_CSV, FileUtils.EXT_TSV), FIXED(FileUtils.EXT_FW), JSON(FileUtils.EXT_JSON), XML(FileUtils.EXT_XML);
CSV("csv"), PSV("psv"), TSV("tsv"), FW("fw"), JSON("json"), XML("xml");

private final String[] extensions;
@Getter
private final String extension;

FileType(String... extensions) {
this.extensions = extensions;
FileType(String extension) {
this.extension = extension;
}

public String[] getExtensions() {
return extensions;
}
}
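
As a quick illustration (not part of the commit), the reworked enum now pairs each constant with a single extension, exposed through the Lombok-generated getExtension():

// Illustration only: print the extension now associated with each constant.
public class FileTypeExtensionsSketch {
    public static void main(String[] args) {
        for (FileType type : FileType.values()) {
            System.out.println(type + " -> ." + type.getExtension());
        }
        // CSV -> .csv, PSV -> .psv, TSV -> .tsv, FW -> .fw, JSON -> .json, XML -> .xml
    }
}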
@@ -49,12 +49,6 @@ public class FileUtils {
public final static String GS_URI_PREFIX = "gs://";
public final static String S3_URI_PREFIX = "s3://";


public final static String EXT_CSV = "csv";
public final static String EXT_TSV = "tsv";
public final static String EXT_FW = "fw";
public final static String EXT_JSON = "json";
public final static String EXT_XML = "xml";
private final static Pattern EXTENSION_PATTERN = Pattern.compile("(?i)\\.(?<extension>\\w+)(?<gz>\\.gz)?$");

public static String filename(Resource resource) throws IOException {
@@ -75,18 +69,6 @@ public static String filename(Resource resource) throws IOException {
return path.substring(cut + 1);
}

public static FileType fileType(String file) {
String extension = extension(file);
for (FileType type : FileType.values()) {
for (String typeExtension : type.getExtensions()) {
if (typeExtension.equals(extension)) {
return type;
}
}
}
return FileType.DELIMITED;
}

public static boolean isGzip(String file) {
return extensionGroup(file, "gz") != null;
}
@@ -36,16 +36,18 @@ public class KeyValueFileImportCommand extends AbstractFileImportCommand<Map<Str
@SuppressWarnings({"unchecked", "rawtypes"})
protected AbstractItemStreamItemReader<Map<String, Object>> reader(String file, FileType fileType, Resource resource) {
switch (fileType) {
case DELIMITED:
case CSV:
case PSV:
case TSV:
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setDelimiter(options.delimiter(file));
tokenizer.setDelimiter(delimiter(fileType));
tokenizer.setQuoteCharacter(options.getQuoteCharacter());
if (!ObjectUtils.isEmpty(options.getIncludedFields())) {
tokenizer.setIncludedFields(options.getIncludedFields());
}
log.info("Creating delimited reader with {} for file {}", options, file);
return flatFileReader(resource, tokenizer);
case FIXED:
case FW:
FixedLengthTokenizer fixedLengthTokenizer = new FixedLengthTokenizer();
Assert.notEmpty(options.getColumnRanges(), "Column ranges are required");
fixedLengthTokenizer.setColumns(options.getColumnRanges());
@@ -61,6 +63,20 @@ protected AbstractItemStreamItemReader<Map<String, Object>> reader(String file,
throw new IllegalArgumentException("Unsupported file type: " + fileType);
}

private String delimiter(FileType fileType) {
if (options.getDelimiter() != null) {
return options.getDelimiter();
}
switch (fileType) {
case TSV:
return DelimitedLineTokenizer.DELIMITER_TAB;
case PSV:
return "|";
default:
return DelimitedLineTokenizer.DELIMITER_COMMA;
}
}

@Override
protected ItemProcessor<Map<String, Object>, Map<String, Object>> processor() {
return processingOptions.processor(connection);
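
To tie the pieces together, a minimal standalone sketch (not the project's code) that mirrors the new private delimiter(FileType) helper above and feeds the result into a Spring Batch DelimitedLineTokenizer, roughly as reader(...) does for CSV/PSV/TSV input; the file and column names are illustrative.

// Standalone illustration of the delimiter choice introduced in this commit:
// tab for TSV, pipe for PSV, comma otherwise (matching the hunk above).
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;

public class DelimiterSelectionSketch {

    static String delimiterFor(FileType fileType) {
        switch (fileType) {
            case TSV:
                return DelimitedLineTokenizer.DELIMITER_TAB;
            case PSV:
                return "|";
            default:
                return DelimitedLineTokenizer.DELIMITER_COMMA;
        }
    }

    public static void main(String[] args) {
        FileType fileType = FileType.PSV; // e.g. resolved from "beers.psv" via FileOptions#type
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setDelimiter(delimiterFor(fileType));                       // "|"
        tokenizer.setNames("id", "name", "style");                            // illustrative column names
        System.out.println(tokenizer.tokenize("1|Pale Ale|IPA").getValues().length); // prints 3
    }
}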
