diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java index 2f965c8e5b9d..9ef72ee7f27e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java @@ -26,6 +26,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeFalse; @@ -47,7 +48,6 @@ import java.util.Collections; import java.util.List; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; @@ -197,23 +197,17 @@ private void testDynamicDestinations(boolean customType) throws Exception { p.run(); assertOutputFiles( - Iterables.toArray(Iterables.filter(elements, new StartsWith("a")), String.class), - null, - null, + Iterables.filter(elements, new StartsWith("a")), 0, baseDir.resolve("file_a.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); assertOutputFiles( - Iterables.toArray(Iterables.filter(elements, new StartsWith("b")), String.class), - null, - null, + Iterables.filter(elements, new StartsWith("b")), 0, baseDir.resolve("file_b.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); assertOutputFiles( - Iterables.toArray(Iterables.filter(elements, new StartsWith("c")), String.class), - null, - null, + Iterables.filter(elements, new StartsWith("c")), 0, baseDir.resolve("file_c.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); @@ -323,87 +317,63 @@ public void testDynamicDefaultFilenamePolicy() throws Exception { .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true))); p.run(); - String[] aElements = - Iterables.toArray( - StreamSupport.stream( - elements.stream() - .filter( - Predicates.compose(new StartsWith("a"), new ExtractWriteDestination()) - ::apply) - .collect(Collectors.toList()) - .spliterator(), - false) - .map(Functions.toStringFunction()::apply) - .collect(Collectors.toList()), - String.class); - String[] bElements = - Iterables.toArray( - StreamSupport.stream( - elements.stream() - .filter( - Predicates.compose(new StartsWith("b"), new ExtractWriteDestination()) - ::apply) - .collect(Collectors.toList()) - .spliterator(), - false) - .map(Functions.toStringFunction()::apply) - .collect(Collectors.toList()), - String.class); - String[] cElements = - Iterables.toArray( - StreamSupport.stream( - elements.stream() - .filter( - Predicates.compose(new StartsWith("c"), new ExtractWriteDestination()) - ::apply) - .collect(Collectors.toList()) - .spliterator(), - false) - .map(Functions.toStringFunction()::apply) - .collect(Collectors.toList()), - String.class); + Iterable aElements = + elements.stream() + .filter(Predicates.compose(new StartsWith("a"), new ExtractWriteDestination())::apply) + .collect(Collectors.toList()) + .stream() + .map(Functions.toStringFunction()) + .collect(Collectors.toList()); + Iterable bElements = + elements.stream() + .filter(Predicates.compose(new StartsWith("b"), new ExtractWriteDestination())::apply) + .collect(Collectors.toList()) + .stream() + .map(Functions.toStringFunction()) + .collect(Collectors.toList()); + Iterable cElements = + elements.stream() + .filter(Predicates.compose(new StartsWith("c"), new ExtractWriteDestination())::apply) + .collect(Collectors.toList()) + .stream() + .map(Functions.toStringFunction()) + .collect(Collectors.toList()); assertOutputFiles( aElements, - null, - null, 0, baseDir.resolve("file_a.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); assertOutputFiles( bElements, - null, - null, 0, baseDir.resolve("file_b.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); assertOutputFiles( cElements, - null, - null, 0, baseDir.resolve("file_c.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); } - private void runTestWrite(String[] elems) throws Exception { + private void runTestWrite(Iterable elems) throws Exception { runTestWrite(elems, null, null, 1); } - private void runTestWrite(String[] elems, int numShards) throws Exception { + private void runTestWrite(Iterable elems, int numShards) throws Exception { runTestWrite(elems, null, null, numShards); } - private void runTestWrite(String[] elems, String header, String footer) throws Exception { + private void runTestWrite(Iterable elems, String header, String footer) throws Exception { runTestWrite(elems, header, footer, 1); } - private void runTestWrite(String[] elems, String header, String footer, int numShards) + private void runTestWrite(Iterable elems, String header, String footer, int numShards) throws Exception { runTestWrite(elems, header, footer, numShards, false); } private void runTestWrite( - String[] elems, String header, String footer, int numShards, boolean skipIfEmpty) + Iterable elems, String header, String footer, int numShards, boolean skipIfEmpty) throws Exception { String outputName = "file.txt"; Path baseDir = Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite"); @@ -411,7 +381,7 @@ private void runTestWrite( FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString()); PCollection input = - p.apply("CreateInput", Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of())); + p.apply("CreateInput", Create.of(elems).withCoder(StringUtf8Coder.of())); TextIO.TypedWrite write = TextIO.write().to(baseFilename).withHeader(header).withFooter(footer).withOutputFilenames(); @@ -441,27 +411,22 @@ private void runTestWrite( } private static void assertOutputFiles( - String[] elems, - final String header, - final String footer, - int numShards, - ResourceId outputPrefix, - String shardNameTemplate) + Iterable elems, int numShards, ResourceId outputPrefix, String shardNameTemplate) throws Exception { - assertOutputFiles(elems, header, footer, numShards, outputPrefix, shardNameTemplate, false); + assertOutputFiles(elems, null, null, numShards, outputPrefix, shardNameTemplate, false); } private static void assertOutputFiles( - String[] elems, - final String header, - final String footer, + Iterable elems, + final @Nullable String header, + final @Nullable String footer, int numShards, ResourceId outputPrefix, String shardNameTemplate, boolean skipIfEmpty) throws Exception { List expectedFiles = new ArrayList<>(); - if (skipIfEmpty && elems.length == 0) { + if (skipIfEmpty && Iterables.isEmpty(elems)) { String pattern = outputPrefix.toString() + "*"; MatchResult matches = Iterables.getOnlyElement(FileSystems.match(Collections.singletonList(pattern))); @@ -489,7 +454,7 @@ private static void assertOutputFiles( actual.add(currentFile); } - List expectedElements = new ArrayList<>(elems.length); + List expectedElements = new ArrayList<>(); for (String elem : elems) { byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem); String line = new String(encodedElem, StandardCharsets.UTF_8); @@ -551,49 +516,49 @@ private static Predicate> haveProperHeaderAndFooter( @Test @Category(NeedsRunner.class) public void testWriteStrings() throws Exception { - runTestWrite(LINES.toArray(new String[0])); + runTestWrite(LINES); } @Test @Category(NeedsRunner.class) public void testWriteEmptyStringsNoSharding() throws Exception { - runTestWrite(NO_LINES.toArray(new String[0]), 0); + runTestWrite(NO_LINES, 0); } @Test @Category(NeedsRunner.class) public void testWriteEmptyStrings() throws Exception { - runTestWrite(NO_LINES.toArray(new String[0])); + runTestWrite(NO_LINES); } @Test @Category(NeedsRunner.class) public void testWriteEmptyStringsSkipIfEmpty() throws Exception { - runTestWrite(NO_LINES.toArray(new String[0]), null, null, 0, true); + runTestWrite(NO_LINES, null, null, 0, true); } @Test @Category(NeedsRunner.class) public void testShardedWrite() throws Exception { - runTestWrite(LINES.toArray(new String[0]), 5); + runTestWrite(LINES, 5); } @Test @Category(NeedsRunner.class) public void testWriteWithHeader() throws Exception { - runTestWrite(LINES.toArray(new String[0]), MY_HEADER, null); + runTestWrite(LINES, MY_HEADER, null); } @Test @Category(NeedsRunner.class) public void testWriteWithFooter() throws Exception { - runTestWrite(LINES.toArray(new String[0]), null, MY_FOOTER); + runTestWrite(LINES, null, MY_FOOTER); } @Test @Category(NeedsRunner.class) public void testWriteWithHeaderAndFooter() throws Exception { - runTestWrite(LINES.toArray(new String[0]), MY_HEADER, MY_FOOTER); + runTestWrite(LINES, MY_HEADER, MY_FOOTER); } @Test @@ -605,8 +570,7 @@ public void testWriteWithWritableByteChannelFactory() throws Exception { FileSystems.matchNewResource( Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite").toString(), true); - PCollection input = - p.apply(Create.of(Arrays.asList(LINES2.toArray(new String[0]))).withCoder(coder)); + PCollection input = p.apply(Create.of(LINES2).withCoder(coder)); final WritableByteChannelFactory writableByteChannelFactory = new DrunkWritableByteChannelFactory(); @@ -625,15 +589,13 @@ public void testWriteWithWritableByteChannelFactory() throws Exception { p.run(); - final List drunkElems = new ArrayList<>(LINES2.toArray(new String[0]).length * 2 + 2); - for (String elem : LINES2.toArray(new String[0])) { - drunkElems.add(elem); - drunkElems.add(elem); + List expectedElems = new ArrayList<>(2 * LINES2.size()); + for (String elem : LINES2) { + expectedElems.add(elem); + expectedElems.add(elem); } assertOutputFiles( - drunkElems.toArray(new String[0]), - null, - null, + expectedElems, 1, baseDir.resolve( outputName + writableByteChannelFactory.getSuggestedFilenameSuffix(), @@ -716,7 +678,7 @@ public void testWriteUnboundedWithCustomBatchParameters() throws Exception { FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString()); PCollection input = - p.apply(Create.of(Arrays.asList(LINES2.toArray(new String[0]))).withCoder(coder)) + p.apply(Create.of(LINES2).withCoder(coder)) .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10)))); @@ -738,11 +700,13 @@ public void testWriteUnboundedWithCustomBatchParameters() throws Exception { String pattern = baseFilename.toString() + "*"; List matches = FileSystems.match(Collections.singletonList(pattern)); List found = new ArrayList<>(Iterables.getOnlyElement(matches).metadata()); - assertEquals(3, found.size()); + // As sharding is random, the elements may end up in the same shards. + assertFalse(found.isEmpty()); + assertTrue(found.size() <= 3); // Now assert file contents irrespective of exact shard indices. assertOutputFiles( - LINES2.toArray(new String[0]), + LINES2, null, null, 0, // match all files by prefix