From 4d9e7fc77f7b2d4221724243045e0de44a5fa649 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Mon, 9 Mar 2026 13:21:31 +0100 Subject: [PATCH] Fix flaky TextIOWriteTest by loosening the shard count. Records may end up in the same shard as other records as it is random. Simplify the test to use Iterables instead of arrays. --- .../apache/beam/sdk/io/TextIOWriteTest.java | 150 +++++++----------- 1 file changed, 57 insertions(+), 93 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java index 2f965c8e5b9d..9ef72ee7f27e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOWriteTest.java @@ -26,6 +26,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assume.assumeFalse; @@ -47,7 +48,6 @@ import java.util.Collections; import java.util.List; import java.util.stream.Collectors; -import java.util.stream.StreamSupport; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.CustomCoder; @@ -197,23 +197,17 @@ private void testDynamicDestinations(boolean customType) throws Exception { p.run(); assertOutputFiles( - Iterables.toArray(Iterables.filter(elements, new StartsWith("a")), String.class), - null, - null, + Iterables.filter(elements, new StartsWith("a")), 0, baseDir.resolve("file_a.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); assertOutputFiles( - Iterables.toArray(Iterables.filter(elements, new StartsWith("b")), String.class), - null, - null, + Iterables.filter(elements, new StartsWith("b")), 0, baseDir.resolve("file_b.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); assertOutputFiles( - Iterables.toArray(Iterables.filter(elements, new StartsWith("c")), String.class), - null, - null, + Iterables.filter(elements, new StartsWith("c")), 0, baseDir.resolve("file_c.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); @@ -323,87 +317,63 @@ public void testDynamicDefaultFilenamePolicy() throws Exception { .withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true))); p.run(); - String[] aElements = - Iterables.toArray( - StreamSupport.stream( - elements.stream() - .filter( - Predicates.compose(new StartsWith("a"), new ExtractWriteDestination()) - ::apply) - .collect(Collectors.toList()) - .spliterator(), - false) - .map(Functions.toStringFunction()::apply) - .collect(Collectors.toList()), - String.class); - String[] bElements = - Iterables.toArray( - StreamSupport.stream( - elements.stream() - .filter( - Predicates.compose(new StartsWith("b"), new ExtractWriteDestination()) - ::apply) - .collect(Collectors.toList()) - .spliterator(), - false) - .map(Functions.toStringFunction()::apply) - .collect(Collectors.toList()), - String.class); - String[] cElements = - Iterables.toArray( - StreamSupport.stream( - elements.stream() - .filter( - Predicates.compose(new StartsWith("c"), new ExtractWriteDestination()) - ::apply) - .collect(Collectors.toList()) - .spliterator(), - false) - .map(Functions.toStringFunction()::apply) - .collect(Collectors.toList()), - String.class); + Iterable aElements = + elements.stream() + .filter(Predicates.compose(new StartsWith("a"), new ExtractWriteDestination())::apply) + .collect(Collectors.toList()) + .stream() + .map(Functions.toStringFunction()) + .collect(Collectors.toList()); + Iterable bElements = + elements.stream() + .filter(Predicates.compose(new StartsWith("b"), new ExtractWriteDestination())::apply) + .collect(Collectors.toList()) + .stream() + .map(Functions.toStringFunction()) + .collect(Collectors.toList()); + Iterable cElements = + elements.stream() + .filter(Predicates.compose(new StartsWith("c"), new ExtractWriteDestination())::apply) + .collect(Collectors.toList()) + .stream() + .map(Functions.toStringFunction()) + .collect(Collectors.toList()); assertOutputFiles( aElements, - null, - null, 0, baseDir.resolve("file_a.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); assertOutputFiles( bElements, - null, - null, 0, baseDir.resolve("file_b.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); assertOutputFiles( cElements, - null, - null, 0, baseDir.resolve("file_c.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE), DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE); } - private void runTestWrite(String[] elems) throws Exception { + private void runTestWrite(Iterable elems) throws Exception { runTestWrite(elems, null, null, 1); } - private void runTestWrite(String[] elems, int numShards) throws Exception { + private void runTestWrite(Iterable elems, int numShards) throws Exception { runTestWrite(elems, null, null, numShards); } - private void runTestWrite(String[] elems, String header, String footer) throws Exception { + private void runTestWrite(Iterable elems, String header, String footer) throws Exception { runTestWrite(elems, header, footer, 1); } - private void runTestWrite(String[] elems, String header, String footer, int numShards) + private void runTestWrite(Iterable elems, String header, String footer, int numShards) throws Exception { runTestWrite(elems, header, footer, numShards, false); } private void runTestWrite( - String[] elems, String header, String footer, int numShards, boolean skipIfEmpty) + Iterable elems, String header, String footer, int numShards, boolean skipIfEmpty) throws Exception { String outputName = "file.txt"; Path baseDir = Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite"); @@ -411,7 +381,7 @@ private void runTestWrite( FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString()); PCollection input = - p.apply("CreateInput", Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of())); + p.apply("CreateInput", Create.of(elems).withCoder(StringUtf8Coder.of())); TextIO.TypedWrite write = TextIO.write().to(baseFilename).withHeader(header).withFooter(footer).withOutputFilenames(); @@ -441,27 +411,22 @@ private void runTestWrite( } private static void assertOutputFiles( - String[] elems, - final String header, - final String footer, - int numShards, - ResourceId outputPrefix, - String shardNameTemplate) + Iterable elems, int numShards, ResourceId outputPrefix, String shardNameTemplate) throws Exception { - assertOutputFiles(elems, header, footer, numShards, outputPrefix, shardNameTemplate, false); + assertOutputFiles(elems, null, null, numShards, outputPrefix, shardNameTemplate, false); } private static void assertOutputFiles( - String[] elems, - final String header, - final String footer, + Iterable elems, + final @Nullable String header, + final @Nullable String footer, int numShards, ResourceId outputPrefix, String shardNameTemplate, boolean skipIfEmpty) throws Exception { List expectedFiles = new ArrayList<>(); - if (skipIfEmpty && elems.length == 0) { + if (skipIfEmpty && Iterables.isEmpty(elems)) { String pattern = outputPrefix.toString() + "*"; MatchResult matches = Iterables.getOnlyElement(FileSystems.match(Collections.singletonList(pattern))); @@ -489,7 +454,7 @@ private static void assertOutputFiles( actual.add(currentFile); } - List expectedElements = new ArrayList<>(elems.length); + List expectedElements = new ArrayList<>(); for (String elem : elems) { byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem); String line = new String(encodedElem, StandardCharsets.UTF_8); @@ -551,49 +516,49 @@ private static Predicate> haveProperHeaderAndFooter( @Test @Category(NeedsRunner.class) public void testWriteStrings() throws Exception { - runTestWrite(LINES.toArray(new String[0])); + runTestWrite(LINES); } @Test @Category(NeedsRunner.class) public void testWriteEmptyStringsNoSharding() throws Exception { - runTestWrite(NO_LINES.toArray(new String[0]), 0); + runTestWrite(NO_LINES, 0); } @Test @Category(NeedsRunner.class) public void testWriteEmptyStrings() throws Exception { - runTestWrite(NO_LINES.toArray(new String[0])); + runTestWrite(NO_LINES); } @Test @Category(NeedsRunner.class) public void testWriteEmptyStringsSkipIfEmpty() throws Exception { - runTestWrite(NO_LINES.toArray(new String[0]), null, null, 0, true); + runTestWrite(NO_LINES, null, null, 0, true); } @Test @Category(NeedsRunner.class) public void testShardedWrite() throws Exception { - runTestWrite(LINES.toArray(new String[0]), 5); + runTestWrite(LINES, 5); } @Test @Category(NeedsRunner.class) public void testWriteWithHeader() throws Exception { - runTestWrite(LINES.toArray(new String[0]), MY_HEADER, null); + runTestWrite(LINES, MY_HEADER, null); } @Test @Category(NeedsRunner.class) public void testWriteWithFooter() throws Exception { - runTestWrite(LINES.toArray(new String[0]), null, MY_FOOTER); + runTestWrite(LINES, null, MY_FOOTER); } @Test @Category(NeedsRunner.class) public void testWriteWithHeaderAndFooter() throws Exception { - runTestWrite(LINES.toArray(new String[0]), MY_HEADER, MY_FOOTER); + runTestWrite(LINES, MY_HEADER, MY_FOOTER); } @Test @@ -605,8 +570,7 @@ public void testWriteWithWritableByteChannelFactory() throws Exception { FileSystems.matchNewResource( Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite").toString(), true); - PCollection input = - p.apply(Create.of(Arrays.asList(LINES2.toArray(new String[0]))).withCoder(coder)); + PCollection input = p.apply(Create.of(LINES2).withCoder(coder)); final WritableByteChannelFactory writableByteChannelFactory = new DrunkWritableByteChannelFactory(); @@ -625,15 +589,13 @@ public void testWriteWithWritableByteChannelFactory() throws Exception { p.run(); - final List drunkElems = new ArrayList<>(LINES2.toArray(new String[0]).length * 2 + 2); - for (String elem : LINES2.toArray(new String[0])) { - drunkElems.add(elem); - drunkElems.add(elem); + List expectedElems = new ArrayList<>(2 * LINES2.size()); + for (String elem : LINES2) { + expectedElems.add(elem); + expectedElems.add(elem); } assertOutputFiles( - drunkElems.toArray(new String[0]), - null, - null, + expectedElems, 1, baseDir.resolve( outputName + writableByteChannelFactory.getSuggestedFilenameSuffix(), @@ -716,7 +678,7 @@ public void testWriteUnboundedWithCustomBatchParameters() throws Exception { FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString()); PCollection input = - p.apply(Create.of(Arrays.asList(LINES2.toArray(new String[0]))).withCoder(coder)) + p.apply(Create.of(LINES2).withCoder(coder)) .setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED) .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10)))); @@ -738,11 +700,13 @@ public void testWriteUnboundedWithCustomBatchParameters() throws Exception { String pattern = baseFilename.toString() + "*"; List matches = FileSystems.match(Collections.singletonList(pattern)); List found = new ArrayList<>(Iterables.getOnlyElement(matches).metadata()); - assertEquals(3, found.size()); + // As sharding is random, the elements may end up in the same shards. + assertFalse(found.isEmpty()); + assertTrue(found.size() <= 3); // Now assert file contents irrespective of exact shard indices. assertOutputFiles( - LINES2.toArray(new String[0]), + LINES2, null, null, 0, // match all files by prefix