Skip to content

Commit

Permalink
Fix checkstyle error. Reformat codes and Optimize imports
Browse files Browse the repository at this point in the history
  • Loading branch information
hiroyuki-sato committed May 8, 2024
1 parent e66796a commit ff46d92
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 88 deletions.
139 changes: 61 additions & 78 deletions src/main/java/org/embulk/input/CommandFileInputPlugin.java
Original file line number Diff line number Diff line change
@@ -1,36 +1,34 @@
package org.embulk.input;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.ArrayList;
import java.io.InputStream;
import java.io.IOException;
import java.io.FilterInputStream;
import org.slf4j.Logger;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigSource;
import org.embulk.config.TaskReport;
import org.embulk.config.TaskSource;
import org.embulk.spi.Exec;
import org.embulk.spi.FileInputPlugin;
import org.embulk.spi.TransactionalFileInput;
import org.embulk.util.config.Config;
import org.embulk.util.config.ConfigDefault;
import org.embulk.util.config.ConfigMapper;
import org.embulk.util.config.ConfigMapperFactory;
import org.embulk.util.config.Task;
import org.embulk.util.config.TaskMapper;
import org.embulk.config.ConfigDiff;
import org.embulk.config.ConfigSource;
import org.embulk.config.ConfigException;
import org.embulk.config.TaskSource;
import org.embulk.spi.Exec;
import org.embulk.spi.FileInputPlugin;
import org.embulk.spi.TransactionalFileInput;
import org.embulk.util.file.InputStreamFileInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommandFileInputPlugin
implements FileInputPlugin
{
implements FileInputPlugin {
public interface PluginTask
extends Task
{
extends Task {
@Config("command")
public String getCommand();

Expand All @@ -41,18 +39,17 @@ public interface PluginTask
}

@Override
public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control)
{
public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control control) {
final ConfigMapper configMapper = CONFIG_MAPPER_FACTORY.createConfigMapper();
final PluginTask task = configMapper.map(config, PluginTask.class);

switch (task.getPipe()) {
case "stdout":
break;
case "stderr":
break;
default:
throw new ConfigException(String.format(
case "stdout":
break;
case "stderr":
break;
default:
throw new ConfigException(String.format(
"Unknown 'pipe' option '%s'. It must be either 'stdout' or 'stderr'", task.getPipe()));
}

Expand All @@ -61,24 +58,22 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr

@Override
public ConfigDiff resume(TaskSource taskSource,
int taskCount,
FileInputPlugin.Control control)
{
int taskCount,
FileInputPlugin.Control control) {
control.run(taskSource, taskCount);

return CONFIG_MAPPER_FACTORY.newConfigDiff();
}

@Override
public void cleanup(TaskSource taskSource,
int taskCount,
List<TaskReport> successTaskReports)
{
int taskCount,
List<TaskReport> successTaskReports) {
}

@SuppressWarnings("MissingSwitchDefault")
@Override
public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
{
public TransactionalFileInput open(TaskSource taskSource, int taskIndex) {
final TaskMapper taskMapper = CONFIG_MAPPER_FACTORY.createTaskMapper();
final PluginTask task = taskMapper.map(taskSource, PluginTask.class);

Expand All @@ -90,12 +85,12 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex)

ProcessBuilder builder = new ProcessBuilder(cmdline.toArray(new String[cmdline.size()]));
switch (task.getPipe()) {
case "stdout":
builder.redirectError(ProcessBuilder.Redirect.INHERIT);
break;
case "stderr":
builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
break;
case "stdout":
builder.redirectError(ProcessBuilder.Redirect.INHERIT);
break;
case "stderr":
builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
break;
}

try {
Expand All @@ -104,12 +99,12 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
InputStream stream = null;
try {
switch (task.getPipe()) {
case "stdout":
stream = process.getInputStream();
break;
case "stderr":
stream = process.getErrorStream();
break;
case "stdout":
stream = process.getInputStream();
break;
case "stderr":
stream = process.getErrorStream();
break;
}

PluginFileInput input = new PluginFileInput(task, new ProcessWaitInputStream(stream, process));
Expand All @@ -126,30 +121,26 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
}
}

protected static List<String> buildShell()
{
protected static List<String> buildShell() {
String osName = System.getProperty("os.name");
if(osName.indexOf("Windows") >= 0) {
if (osName.indexOf("Windows") >= 0) {
return Collections.unmodifiableList(Arrays.asList("PowerShell.exe", "-Command"));
} else {
return Collections.unmodifiableList(Arrays.asList("sh", "-c"));
}
}

private static class ProcessWaitInputStream
extends FilterInputStream
{
extends FilterInputStream {
private Process process;

public ProcessWaitInputStream(InputStream in, Process process)
{
public ProcessWaitInputStream(InputStream in, Process process) {
super(in);
this.process = process;
}

@Override
public int read() throws IOException
{
public int read() throws IOException {
int c = super.read();
if (c < 0) {
waitFor();
Expand All @@ -158,8 +149,7 @@ public int read() throws IOException
}

@Override
public int read(byte[] b) throws IOException
{
public int read(byte[] b) throws IOException {
int c = super.read(b);
if (c < 0) {
waitFor();
Expand All @@ -168,8 +158,7 @@ public int read(byte[] b) throws IOException
}

@Override
public int read(byte[] b, int off, int len) throws IOException
{
public int read(byte[] b, int off, int len) throws IOException {
int c = super.read(b, off, len);
if (c < 0) {
waitFor();
Expand All @@ -178,14 +167,12 @@ public int read(byte[] b, int off, int len) throws IOException
}

@Override
public void close() throws IOException
{
public void close() throws IOException {
super.close();
waitFor();
}

private synchronized void waitFor() throws IOException
{
private synchronized void waitFor() throws IOException {
if (process != null) {
int code;
try {
Expand All @@ -196,7 +183,7 @@ private synchronized void waitFor() throws IOException
process = null;
if (code != 0) {
throw new IOException(String.format(
"Command finished with non-zero exit code. Exit code is %d.", code));
"Command finished with non-zero exit code. Exit code is %d.", code));
}
}
}
Expand All @@ -205,22 +192,18 @@ private synchronized void waitFor() throws IOException
// TODO almost copied from S3FileInputPlugin. include an InputStreamFileInput utility to embulk-core.
public static class PluginFileInput
extends InputStreamFileInput
implements TransactionalFileInput
{
implements TransactionalFileInput {
private static class SingleFileProvider
implements InputStreamFileInput.Provider
{
implements InputStreamFileInput.Provider {
private final InputStream stream;
private boolean opened = false;

public SingleFileProvider(InputStream stream)
{
public SingleFileProvider(InputStream stream) {
this.stream = stream;
}

@Override
public InputStream openNext() throws IOException
{
public InputStream openNext() throws IOException {
if (opened) {
return null;
}
Expand All @@ -229,29 +212,29 @@ public InputStream openNext() throws IOException
}

@Override
public void close() throws IOException
{
public void close() throws IOException {
if (!opened) {
stream.close();
}
}
}

public PluginFileInput(PluginTask task, InputStream stream)
{
public PluginFileInput(PluginTask task, InputStream stream) {
super(Exec.getBufferAllocator(), new SingleFileProvider(stream));
}

public void abort() { }
public void abort() {
}

public TaskReport commit()
{
public TaskReport commit() {
return CONFIG_MAPPER_FACTORY.newTaskReport();
}

@Override
public void close() { }
public void close() {
}
}

private static final Logger logger = LoggerFactory.getLogger(CommandFileInputPlugin.class);

private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory.builder().addDefaultModules().build();
Expand Down
16 changes: 6 additions & 10 deletions src/test/java/org/embulk/input/TestCommandFileInputPlugin.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,23 @@
package org.embulk.input;

import static org.embulk.input.CommandFileInputPlugin.buildShell;
import static org.junit.Assert.assertEquals;

import java.util.Arrays;
import java.util.Collections;
import org.embulk.test.EmbulkTestRuntime;
import org.junit.Rule;
import org.junit.Test;
import org.embulk.test.EmbulkTestRuntime;
import static org.embulk.input.CommandFileInputPlugin.buildShell;

import java.util.List;

import static org.junit.Assert.assertEquals;

public class TestCommandFileInputPlugin
{
public class TestCommandFileInputPlugin {
@Rule
public EmbulkTestRuntime runtime = new EmbulkTestRuntime();

@Test
public void testShell() {
if (System.getProperty("os.name").indexOf("Windows") >= 0) {
assertEquals(Collections.unmodifiableList(Arrays.asList("PowerShell.exe", "-Command")), buildShell());
}
else {
} else {
assertEquals(Collections.unmodifiableList(Arrays.asList("sh", "-c")), buildShell());
}
}
Expand Down

0 comments on commit ff46d92

Please sign in to comment.