Skip to content

Commit

Permalink
Fixed unintentionally finished of the page output after the first fil…
Browse files Browse the repository at this point in the history
…e was processed.
  • Loading branch information
t3t5u committed Sep 6, 2024
1 parent e247db1 commit aa91d36
Showing 1 changed file with 28 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.embulk.parser.poi_excel.visitor.PoiExcelVisitorValue;
import org.embulk.spi.Exec;
import org.embulk.spi.FileInput;
import org.embulk.spi.Page;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.PageOutput;
import org.embulk.spi.ParserPlugin;
Expand Down Expand Up @@ -233,6 +234,7 @@ public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutpu
}

try (FileInputInputStream is = new FileInputInputStream(input)) {
final PageOutput neverFinishOutput = new OnlyAddPageOutput(output); // never finish while iterating files
while (is.nextFile()) {
Workbook workbook;
try {
Expand All @@ -245,8 +247,9 @@ public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutpu
if (log.isDebugEnabled()) {
log.debug("resolved sheet names={}", list);
}
run(task, schema, input, workbook, list, output);
run(task, schema, input, workbook, list, neverFinishOutput);
}
output.finish(); // explicitly finish at the end
}
}

Expand Down Expand Up @@ -297,6 +300,30 @@ private List<String> resolveSheetName(Workbook workbook, List<String> sheetNames
return new ArrayList<>(set);
}

// Wrapper for output that only add page
private static class OnlyAddPageOutput implements PageOutput {
private final PageOutput output;

OnlyAddPageOutput(PageOutput output) {
this.output = output;
}

@Override
public void add(Page page) {
output.add(page);
}

@Override
public void finish() {
// do nothing
}

@Override
public void close() {
// do nothing
}
}

protected void run(PluginTask task, Schema schema, FileInput input, Workbook workbook, List<String> sheetNames, PageOutput output) {
final int flushCount = task.getFlushCount();

Expand Down

0 comments on commit aa91d36

Please sign in to comment.