1
1
package org .embulk .input ;
2
2
3
+ import java .io .FilterInputStream ;
4
+ import java .io .IOException ;
5
+ import java .io .InputStream ;
6
+ import java .util .ArrayList ;
3
7
import java .util .Arrays ;
4
8
import java .util .Collections ;
5
9
import java .util .List ;
6
- import java .util .ArrayList ;
7
- import java .io .InputStream ;
8
- import java .io .IOException ;
9
- import java .io .FilterInputStream ;
10
- import org .slf4j .Logger ;
10
+ import org .embulk .config .ConfigDiff ;
11
+ import org .embulk .config .ConfigException ;
12
+ import org .embulk .config .ConfigSource ;
11
13
import org .embulk .config .TaskReport ;
14
+ import org .embulk .config .TaskSource ;
15
+ import org .embulk .spi .Exec ;
16
+ import org .embulk .spi .FileInputPlugin ;
17
+ import org .embulk .spi .TransactionalFileInput ;
12
18
import org .embulk .util .config .Config ;
13
19
import org .embulk .util .config .ConfigDefault ;
14
20
import org .embulk .util .config .ConfigMapper ;
15
21
import org .embulk .util .config .ConfigMapperFactory ;
16
22
import org .embulk .util .config .Task ;
17
23
import org .embulk .util .config .TaskMapper ;
18
- import org .embulk .config .ConfigDiff ;
19
- import org .embulk .config .ConfigSource ;
20
- import org .embulk .config .ConfigException ;
21
- import org .embulk .config .TaskSource ;
22
- import org .embulk .spi .Exec ;
23
- import org .embulk .spi .FileInputPlugin ;
24
- import org .embulk .spi .TransactionalFileInput ;
25
24
import org .embulk .util .file .InputStreamFileInput ;
25
+ import org .slf4j .Logger ;
26
26
import org .slf4j .LoggerFactory ;
27
27
28
28
public class CommandFileInputPlugin
29
- implements FileInputPlugin
30
- {
29
+ implements FileInputPlugin {
31
30
public interface PluginTask
32
- extends Task
33
- {
31
+ extends Task {
34
32
@ Config ("command" )
35
33
public String getCommand ();
36
34
@@ -41,18 +39,17 @@ public interface PluginTask
41
39
}
42
40
43
41
@ Override
44
- public ConfigDiff transaction (ConfigSource config , FileInputPlugin .Control control )
45
- {
42
+ public ConfigDiff transaction (ConfigSource config , FileInputPlugin .Control control ) {
46
43
final ConfigMapper configMapper = CONFIG_MAPPER_FACTORY .createConfigMapper ();
47
44
final PluginTask task = configMapper .map (config , PluginTask .class );
48
45
49
46
switch (task .getPipe ()) {
50
- case "stdout" :
51
- break ;
52
- case "stderr" :
53
- break ;
54
- default :
55
- throw new ConfigException (String .format (
47
+ case "stdout" :
48
+ break ;
49
+ case "stderr" :
50
+ break ;
51
+ default :
52
+ throw new ConfigException (String .format (
56
53
"Unknown 'pipe' option '%s'. It must be either 'stdout' or 'stderr'" , task .getPipe ()));
57
54
}
58
55
@@ -61,24 +58,22 @@ public ConfigDiff transaction(ConfigSource config, FileInputPlugin.Control contr
61
58
62
59
@ Override
63
60
public ConfigDiff resume (TaskSource taskSource ,
64
- int taskCount ,
65
- FileInputPlugin .Control control )
66
- {
61
+ int taskCount ,
62
+ FileInputPlugin .Control control ) {
67
63
control .run (taskSource , taskCount );
68
64
69
65
return CONFIG_MAPPER_FACTORY .newConfigDiff ();
70
66
}
71
67
72
68
@ Override
73
69
public void cleanup (TaskSource taskSource ,
74
- int taskCount ,
75
- List <TaskReport > successTaskReports )
76
- {
70
+ int taskCount ,
71
+ List <TaskReport > successTaskReports ) {
77
72
}
78
73
74
+ @ SuppressWarnings ("MissingSwitchDefault" )
79
75
@ Override
80
- public TransactionalFileInput open (TaskSource taskSource , int taskIndex )
81
- {
76
+ public TransactionalFileInput open (TaskSource taskSource , int taskIndex ) {
82
77
final TaskMapper taskMapper = CONFIG_MAPPER_FACTORY .createTaskMapper ();
83
78
final PluginTask task = taskMapper .map (taskSource , PluginTask .class );
84
79
@@ -90,12 +85,12 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
90
85
91
86
ProcessBuilder builder = new ProcessBuilder (cmdline .toArray (new String [cmdline .size ()]));
92
87
switch (task .getPipe ()) {
93
- case "stdout" :
94
- builder .redirectError (ProcessBuilder .Redirect .INHERIT );
95
- break ;
96
- case "stderr" :
97
- builder .redirectOutput (ProcessBuilder .Redirect .INHERIT );
98
- break ;
88
+ case "stdout" :
89
+ builder .redirectError (ProcessBuilder .Redirect .INHERIT );
90
+ break ;
91
+ case "stderr" :
92
+ builder .redirectOutput (ProcessBuilder .Redirect .INHERIT );
93
+ break ;
99
94
}
100
95
101
96
try {
@@ -104,12 +99,12 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
104
99
InputStream stream = null ;
105
100
try {
106
101
switch (task .getPipe ()) {
107
- case "stdout" :
108
- stream = process .getInputStream ();
109
- break ;
110
- case "stderr" :
111
- stream = process .getErrorStream ();
112
- break ;
102
+ case "stdout" :
103
+ stream = process .getInputStream ();
104
+ break ;
105
+ case "stderr" :
106
+ stream = process .getErrorStream ();
107
+ break ;
113
108
}
114
109
115
110
PluginFileInput input = new PluginFileInput (task , new ProcessWaitInputStream (stream , process ));
@@ -126,30 +121,26 @@ public TransactionalFileInput open(TaskSource taskSource, int taskIndex)
126
121
}
127
122
}
128
123
129
- static List <String > buildShell ()
130
- {
124
+ static List <String > buildShell () {
131
125
String osName = System .getProperty ("os.name" );
132
- if (osName .indexOf ("Windows" ) >= 0 ) {
126
+ if (osName .indexOf ("Windows" ) >= 0 ) {
133
127
return Collections .unmodifiableList (Arrays .asList ("PowerShell.exe" , "-Command" ));
134
128
} else {
135
129
return Collections .unmodifiableList (Arrays .asList ("sh" , "-c" ));
136
130
}
137
131
}
138
132
139
133
private static class ProcessWaitInputStream
140
- extends FilterInputStream
141
- {
134
+ extends FilterInputStream {
142
135
private Process process ;
143
136
144
- public ProcessWaitInputStream (InputStream in , Process process )
145
- {
137
+ public ProcessWaitInputStream (InputStream in , Process process ) {
146
138
super (in );
147
139
this .process = process ;
148
140
}
149
141
150
142
@ Override
151
- public int read () throws IOException
152
- {
143
+ public int read () throws IOException {
153
144
int c = super .read ();
154
145
if (c < 0 ) {
155
146
waitFor ();
@@ -158,8 +149,7 @@ public int read() throws IOException
158
149
}
159
150
160
151
@ Override
161
- public int read (byte [] b ) throws IOException
162
- {
152
+ public int read (byte [] b ) throws IOException {
163
153
int c = super .read (b );
164
154
if (c < 0 ) {
165
155
waitFor ();
@@ -168,8 +158,7 @@ public int read(byte[] b) throws IOException
168
158
}
169
159
170
160
@ Override
171
- public int read (byte [] b , int off , int len ) throws IOException
172
- {
161
+ public int read (byte [] b , int off , int len ) throws IOException {
173
162
int c = super .read (b , off , len );
174
163
if (c < 0 ) {
175
164
waitFor ();
@@ -178,14 +167,12 @@ public int read(byte[] b, int off, int len) throws IOException
178
167
}
179
168
180
169
@ Override
181
- public void close () throws IOException
182
- {
170
+ public void close () throws IOException {
183
171
super .close ();
184
172
waitFor ();
185
173
}
186
174
187
- private synchronized void waitFor () throws IOException
188
- {
175
+ private synchronized void waitFor () throws IOException {
189
176
if (process != null ) {
190
177
int code ;
191
178
try {
@@ -196,7 +183,7 @@ private synchronized void waitFor() throws IOException
196
183
process = null ;
197
184
if (code != 0 ) {
198
185
throw new IOException (String .format (
199
- "Command finished with non-zero exit code. Exit code is %d." , code ));
186
+ "Command finished with non-zero exit code. Exit code is %d." , code ));
200
187
}
201
188
}
202
189
}
@@ -205,22 +192,18 @@ private synchronized void waitFor() throws IOException
205
192
// TODO almost copied from S3FileInputPlugin. include an InputStreamFileInput utility to embulk-core.
206
193
public static class PluginFileInput
207
194
extends InputStreamFileInput
208
- implements TransactionalFileInput
209
- {
195
+ implements TransactionalFileInput {
210
196
private static class SingleFileProvider
211
- implements InputStreamFileInput .Provider
212
- {
197
+ implements InputStreamFileInput .Provider {
213
198
private final InputStream stream ;
214
199
private boolean opened = false ;
215
200
216
- public SingleFileProvider (InputStream stream )
217
- {
201
+ public SingleFileProvider (InputStream stream ) {
218
202
this .stream = stream ;
219
203
}
220
204
221
205
@ Override
222
- public InputStream openNext () throws IOException
223
- {
206
+ public InputStream openNext () throws IOException {
224
207
if (opened ) {
225
208
return null ;
226
209
}
@@ -229,29 +212,29 @@ public InputStream openNext() throws IOException
229
212
}
230
213
231
214
@ Override
232
- public void close () throws IOException
233
- {
215
+ public void close () throws IOException {
234
216
if (!opened ) {
235
217
stream .close ();
236
218
}
237
219
}
238
220
}
239
221
240
- public PluginFileInput (PluginTask task , InputStream stream )
241
- {
222
+ public PluginFileInput (PluginTask task , InputStream stream ) {
242
223
super (Exec .getBufferAllocator (), new SingleFileProvider (stream ));
243
224
}
244
225
245
- public void abort () { }
226
+ public void abort () {
227
+ }
246
228
247
- public TaskReport commit ()
248
- {
229
+ public TaskReport commit () {
249
230
return CONFIG_MAPPER_FACTORY .newTaskReport ();
250
231
}
251
232
252
233
@ Override
253
- public void close () { }
234
+ public void close () {
235
+ }
254
236
}
237
+
255
238
private static final Logger logger = LoggerFactory .getLogger (CommandFileInputPlugin .class );
256
239
257
240
private static final ConfigMapperFactory CONFIG_MAPPER_FACTORY = ConfigMapperFactory .builder ().addDefaultModules ().build ();
0 commit comments