Skip to content

Commit

Permalink
Merge pull request #2 from trocco-io/fixed_unintentionally_finished
Browse files Browse the repository at this point in the history
Fixed unintentionally finished of the page output after the first file was processed.
  • Loading branch information
NamedPython authored Sep 13, 2024
2 parents 97da014 + aa91d36 commit 5208941
Showing 1 changed file with 28 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.embulk.parser.poi_excel.visitor.PoiExcelVisitorValue;
import org.embulk.spi.Exec;
import org.embulk.spi.FileInput;
import org.embulk.spi.Page;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.PageOutput;
import org.embulk.spi.ParserPlugin;
Expand Down Expand Up @@ -233,6 +234,7 @@ public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutpu
}

try (FileInputInputStream is = new FileInputInputStream(input)) {
final PageOutput neverFinishOutput = new OnlyAddPageOutput(output); // never finish while iterating files
while (is.nextFile()) {
Workbook workbook;
try {
Expand All @@ -245,8 +247,9 @@ public void run(TaskSource taskSource, Schema schema, FileInput input, PageOutpu
if (log.isDebugEnabled()) {
log.debug("resolved sheet names={}", list);
}
run(task, schema, input, workbook, list, output);
run(task, schema, input, workbook, list, neverFinishOutput);
}
output.finish(); // explicitly finish at the end
}
}

Expand Down Expand Up @@ -297,6 +300,30 @@ private List<String> resolveSheetName(Workbook workbook, List<String> sheetNames
return new ArrayList<>(set);
}

// Wrapper for output that only add page
private static class OnlyAddPageOutput implements PageOutput {
private final PageOutput output;

OnlyAddPageOutput(PageOutput output) {
this.output = output;
}

@Override
public void add(Page page) {
output.add(page);
}

@Override
public void finish() {
// do nothing
}

@Override
public void close() {
// do nothing
}
}

protected void run(PluginTask task, Schema schema, FileInput input, Workbook workbook, List<String> sheetNames, PageOutput output) {
final int flushCount = task.getFlushCount();

Expand Down

0 comments on commit 5208941

Please sign in to comment.