Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeFalse;

Expand All @@ -47,7 +48,6 @@
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
Expand Down Expand Up @@ -197,23 +197,17 @@ private void testDynamicDestinations(boolean customType) throws Exception {
p.run();

assertOutputFiles(
Iterables.toArray(Iterables.filter(elements, new StartsWith("a")), String.class),
null,
null,
Iterables.filter(elements, new StartsWith("a")),
0,
baseDir.resolve("file_a.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
assertOutputFiles(
Iterables.toArray(Iterables.filter(elements, new StartsWith("b")), String.class),
null,
null,
Iterables.filter(elements, new StartsWith("b")),
0,
baseDir.resolve("file_b.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
assertOutputFiles(
Iterables.toArray(Iterables.filter(elements, new StartsWith("c")), String.class),
null,
null,
Iterables.filter(elements, new StartsWith("c")),
0,
baseDir.resolve("file_c.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
Expand Down Expand Up @@ -323,95 +317,71 @@ public void testDynamicDefaultFilenamePolicy() throws Exception {
.withTempDirectory(FileSystems.matchNewResource(baseDir.toString(), true)));
p.run();

String[] aElements =
Iterables.toArray(
StreamSupport.stream(
elements.stream()
.filter(
Predicates.compose(new StartsWith("a"), new ExtractWriteDestination())
::apply)
.collect(Collectors.toList())
.spliterator(),
false)
.map(Functions.toStringFunction()::apply)
.collect(Collectors.toList()),
String.class);
String[] bElements =
Iterables.toArray(
StreamSupport.stream(
elements.stream()
.filter(
Predicates.compose(new StartsWith("b"), new ExtractWriteDestination())
::apply)
.collect(Collectors.toList())
.spliterator(),
false)
.map(Functions.toStringFunction()::apply)
.collect(Collectors.toList()),
String.class);
String[] cElements =
Iterables.toArray(
StreamSupport.stream(
elements.stream()
.filter(
Predicates.compose(new StartsWith("c"), new ExtractWriteDestination())
::apply)
.collect(Collectors.toList())
.spliterator(),
false)
.map(Functions.toStringFunction()::apply)
.collect(Collectors.toList()),
String.class);
Iterable<String> aElements =
elements.stream()
.filter(Predicates.compose(new StartsWith("a"), new ExtractWriteDestination())::apply)
.collect(Collectors.toList())
.stream()
.map(Functions.toStringFunction())
.collect(Collectors.toList());
Iterable<String> bElements =
elements.stream()
.filter(Predicates.compose(new StartsWith("b"), new ExtractWriteDestination())::apply)
.collect(Collectors.toList())
.stream()
.map(Functions.toStringFunction())
.collect(Collectors.toList());
Iterable<String> cElements =
elements.stream()
.filter(Predicates.compose(new StartsWith("c"), new ExtractWriteDestination())::apply)
.collect(Collectors.toList())
.stream()
.map(Functions.toStringFunction())
.collect(Collectors.toList());
assertOutputFiles(
aElements,
null,
null,
0,
baseDir.resolve("file_a.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
assertOutputFiles(
bElements,
null,
null,
0,
baseDir.resolve("file_b.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
assertOutputFiles(
cElements,
null,
null,
0,
baseDir.resolve("file_c.txt", ResolveOptions.StandardResolveOptions.RESOLVE_FILE),
DefaultFilenamePolicy.DEFAULT_UNWINDOWED_SHARD_TEMPLATE);
}

private void runTestWrite(String[] elems) throws Exception {
private void runTestWrite(Iterable<String> elems) throws Exception {
runTestWrite(elems, null, null, 1);
}

private void runTestWrite(String[] elems, int numShards) throws Exception {
private void runTestWrite(Iterable<String> elems, int numShards) throws Exception {
runTestWrite(elems, null, null, numShards);
}

private void runTestWrite(String[] elems, String header, String footer) throws Exception {
private void runTestWrite(Iterable<String> elems, String header, String footer) throws Exception {
runTestWrite(elems, header, footer, 1);
}

private void runTestWrite(String[] elems, String header, String footer, int numShards)
private void runTestWrite(Iterable<String> elems, String header, String footer, int numShards)
throws Exception {
runTestWrite(elems, header, footer, numShards, false);
}

private void runTestWrite(
String[] elems, String header, String footer, int numShards, boolean skipIfEmpty)
Iterable<String> elems, String header, String footer, int numShards, boolean skipIfEmpty)
throws Exception {
String outputName = "file.txt";
Path baseDir = Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite");
ResourceId baseFilename =
FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString());

PCollection<String> input =
p.apply("CreateInput", Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of()));
p.apply("CreateInput", Create.of(elems).withCoder(StringUtf8Coder.of()));

TextIO.TypedWrite<String, Void> write =
TextIO.write().to(baseFilename).withHeader(header).withFooter(footer).withOutputFilenames();
Expand Down Expand Up @@ -441,27 +411,22 @@ private void runTestWrite(
}

private static void assertOutputFiles(
String[] elems,
final String header,
final String footer,
int numShards,
ResourceId outputPrefix,
String shardNameTemplate)
Iterable<String> elems, int numShards, ResourceId outputPrefix, String shardNameTemplate)
throws Exception {
assertOutputFiles(elems, header, footer, numShards, outputPrefix, shardNameTemplate, false);
assertOutputFiles(elems, null, null, numShards, outputPrefix, shardNameTemplate, false);
}

private static void assertOutputFiles(
String[] elems,
final String header,
final String footer,
Iterable<String> elems,
final @Nullable String header,
final @Nullable String footer,
int numShards,
ResourceId outputPrefix,
String shardNameTemplate,
boolean skipIfEmpty)
throws Exception {
List<File> expectedFiles = new ArrayList<>();
if (skipIfEmpty && elems.length == 0) {
if (skipIfEmpty && Iterables.isEmpty(elems)) {
String pattern = outputPrefix.toString() + "*";
MatchResult matches =
Iterables.getOnlyElement(FileSystems.match(Collections.singletonList(pattern)));
Expand Down Expand Up @@ -489,7 +454,7 @@ private static void assertOutputFiles(
actual.add(currentFile);
}

List<String> expectedElements = new ArrayList<>(elems.length);
List<String> expectedElements = new ArrayList<>();
for (String elem : elems) {
byte[] encodedElem = CoderUtils.encodeToByteArray(StringUtf8Coder.of(), elem);
String line = new String(encodedElem, StandardCharsets.UTF_8);
Expand Down Expand Up @@ -551,49 +516,49 @@ private static Predicate<List<String>> haveProperHeaderAndFooter(
@Test
@Category(NeedsRunner.class)
public void testWriteStrings() throws Exception {
runTestWrite(LINES.toArray(new String[0]));
runTestWrite(LINES);
}

@Test
@Category(NeedsRunner.class)
public void testWriteEmptyStringsNoSharding() throws Exception {
runTestWrite(NO_LINES.toArray(new String[0]), 0);
runTestWrite(NO_LINES, 0);
}

@Test
@Category(NeedsRunner.class)
public void testWriteEmptyStrings() throws Exception {
runTestWrite(NO_LINES.toArray(new String[0]));
runTestWrite(NO_LINES);
}

@Test
@Category(NeedsRunner.class)
public void testWriteEmptyStringsSkipIfEmpty() throws Exception {
runTestWrite(NO_LINES.toArray(new String[0]), null, null, 0, true);
runTestWrite(NO_LINES, null, null, 0, true);
}

@Test
@Category(NeedsRunner.class)
public void testShardedWrite() throws Exception {
runTestWrite(LINES.toArray(new String[0]), 5);
runTestWrite(LINES, 5);
}

@Test
@Category(NeedsRunner.class)
public void testWriteWithHeader() throws Exception {
runTestWrite(LINES.toArray(new String[0]), MY_HEADER, null);
runTestWrite(LINES, MY_HEADER, null);
}

@Test
@Category(NeedsRunner.class)
public void testWriteWithFooter() throws Exception {
runTestWrite(LINES.toArray(new String[0]), null, MY_FOOTER);
runTestWrite(LINES, null, MY_FOOTER);
}

@Test
@Category(NeedsRunner.class)
public void testWriteWithHeaderAndFooter() throws Exception {
runTestWrite(LINES.toArray(new String[0]), MY_HEADER, MY_FOOTER);
runTestWrite(LINES, MY_HEADER, MY_FOOTER);
}

@Test
Expand All @@ -605,8 +570,7 @@ public void testWriteWithWritableByteChannelFactory() throws Exception {
FileSystems.matchNewResource(
Files.createTempDirectory(tempFolder.getRoot().toPath(), "testwrite").toString(), true);

PCollection<String> input =
p.apply(Create.of(Arrays.asList(LINES2.toArray(new String[0]))).withCoder(coder));
PCollection<String> input = p.apply(Create.of(LINES2).withCoder(coder));

final WritableByteChannelFactory writableByteChannelFactory =
new DrunkWritableByteChannelFactory();
Expand All @@ -625,15 +589,13 @@ public void testWriteWithWritableByteChannelFactory() throws Exception {

p.run();

final List<String> drunkElems = new ArrayList<>(LINES2.toArray(new String[0]).length * 2 + 2);
for (String elem : LINES2.toArray(new String[0])) {
drunkElems.add(elem);
drunkElems.add(elem);
List<String> expectedElems = new ArrayList<>(2 * LINES2.size());
for (String elem : LINES2) {
expectedElems.add(elem);
expectedElems.add(elem);
}
assertOutputFiles(
drunkElems.toArray(new String[0]),
null,
null,
expectedElems,
1,
baseDir.resolve(
outputName + writableByteChannelFactory.getSuggestedFilenameSuffix(),
Expand Down Expand Up @@ -716,7 +678,7 @@ public void testWriteUnboundedWithCustomBatchParameters() throws Exception {
FileBasedSink.convertToFileResourceIfPossible(baseDir.resolve(outputName).toString());

PCollection<String> input =
p.apply(Create.of(Arrays.asList(LINES2.toArray(new String[0]))).withCoder(coder))
p.apply(Create.of(LINES2).withCoder(coder))
.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED)
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))));

Expand All @@ -738,11 +700,13 @@ public void testWriteUnboundedWithCustomBatchParameters() throws Exception {
String pattern = baseFilename.toString() + "*";
List<MatchResult> matches = FileSystems.match(Collections.singletonList(pattern));
List<Metadata> found = new ArrayList<>(Iterables.getOnlyElement(matches).metadata());
assertEquals(3, found.size());
// As sharding is random, the elements may end up in the same shards.
assertFalse(found.isEmpty());
assertTrue(found.size() <= 3);

// Now assert file contents irrespective of exact shard indices.
assertOutputFiles(
LINES2.toArray(new String[0]),
LINES2,
null,
null,
0, // match all files by prefix
Expand Down
Loading