Commit
fixed
sunxiaojian committed Apr 19, 2024
1 parent 03d468c commit 28b8864
Showing 5 changed files with 40 additions and 153 deletions.
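The hunks below all make the same change: the explicit branch argument is dropped from the test helpers (createReadWithKey, writeFiles, createMergeTreeWriter) and from the SchemaManager constructor, so the tests implicitly target the default branch. A minimal before/after sketch of the call pattern, assuming only the helper signatures visible in the hunks (TestChangelogDataReadWrite and the surrounding test scaffolding are Paimon test utilities, not shown here):

    // Sketch only; helper signatures taken from the hunks below, scaffolding assumed.
    TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());

    // Before this commit the helpers took an explicit branch argument:
    //     rw.createReadWithKey(branch);
    //     rw.writeFiles(row(1), 0, input, branch);

    // After, the default branch is implied:
    TableRead read = rw.createReadWithKey();
    List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);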

This file was deleted.

FileStoreSourceReaderTest.java:
@@ -28,7 +28,6 @@
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.BranchManager;
 
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
@@ -58,8 +57,6 @@ public class FileStoreSourceReaderTest {
 
     @TempDir protected java.nio.file.Path tempDir;
 
-    protected String branch = BranchManager.DEFAULT_MAIN_BRANCH;
-
     @BeforeEach
     public void beforeEach() throws Exception {
         SchemaManager schemaManager =
@@ -141,7 +138,7 @@ public void testReaderOnSplitFinished() throws Exception {
     protected FileStoreSourceReader createReader(TestingReaderContext context) {
         return new FileStoreSourceReader(
                 context,
-                new TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(branch),
+                new TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
                 new FileStoreSourceReaderMetrics(new DummyMetricGroup()),
                 IOManager.create(tempDir.toString()),
                 null);

This file was deleted.

FileStoreSourceSplitReaderTest.java:
@@ -28,13 +28,13 @@
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.source.TableRead;
-import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.RecordWriter;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator;
 import org.apache.flink.connector.file.src.util.RecordAndPosition;
 import org.apache.flink.table.data.RowData;
@@ -67,12 +67,10 @@ public class FileStoreSourceSplitReaderTest {
 
     @TempDir java.nio.file.Path tempDir;
 
-    protected String branch = BranchManager.DEFAULT_MAIN_BRANCH;
-
     @BeforeEach
     public void beforeEach() throws Exception {
         SchemaManager schemaManager =
-                new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri()), branch);
+                new SchemaManager(LocalFileIO.create(), new Path(tempDir.toUri()));
         schemaManager.createTable(
                 new Schema(
                         toDataType(
@@ -108,10 +106,10 @@ private FileStoreSourceSplitReader createReader(TableRead tableRead, @Nullable L
 
     private void innerTestOnce(int skip) throws Exception {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
-        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null);
+        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null);
 
         List<Tuple2<Long, Long>> input = kvs();
-        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input, branch);
+        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
 
         assignSplit(reader, newSourceSplit("id1", row(1), 0, files, skip));
 
@@ -134,10 +132,10 @@ private void innerTestOnce(int skip) throws Exception {
     @Test
     public void testPrimaryKeyWithDelete() throws Exception {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
-        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null);
+        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null);
 
         List<Tuple2<Long, Long>> input = kvs();
-        RecordWriter<KeyValue> writer = rw.createMergeTreeWriter(row(1), 0, branch);
+        RecordWriter<KeyValue> writer = rw.createMergeTreeWriter(row(1), 0);
         for (Tuple2<Long, Long> tuple2 : input) {
             writer.write(
                     new KeyValue()
@@ -156,7 +154,7 @@ public void testPrimaryKeyWithDelete() throws Exception {
         writer.close();
 
         assignSplit(reader, newSourceSplit("id1", row(1), 0, files, true));
-        RecordsWithSplitIds<RecordIterator<RowData>> records = reader.fetch();
+        RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records = reader.fetch();
 
         List<Tuple2<RowKind, Long>> expected =
                 input.stream()
@@ -176,18 +174,18 @@ public void testPrimaryKeyWithDelete() throws Exception {
     @Test
     public void testMultipleBatchInSplit() throws Exception {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
-        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null);
+        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null);
 
         List<Tuple2<Long, Long>> input1 = kvs();
-        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input1, branch);
+        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input1);
 
         List<Tuple2<Long, Long>> input2 = kvs(6);
-        List<DataFileMeta> files2 = rw.writeFiles(row(1), 0, input2, branch);
+        List<DataFileMeta> files2 = rw.writeFiles(row(1), 0, input2);
         files.addAll(files2);
 
         assignSplit(reader, newSourceSplit("id1", row(1), 0, files));
 
-        RecordsWithSplitIds<RecordIterator<RowData>> records = reader.fetch();
+        RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records = reader.fetch();
         assertRecords(
                 records,
                 null,
@@ -212,14 +210,14 @@ public void testMultipleBatchInSplit() throws Exception {
     @Test
     public void testRestore() throws Exception {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
-        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null);
+        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null);
 
         List<Tuple2<Long, Long>> input = kvs();
-        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input, branch);
+        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
 
         assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 3));
 
-        RecordsWithSplitIds<RecordIterator<RowData>> records = reader.fetch();
+        RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records = reader.fetch();
         assertRecords(
                 records,
                 null,
@@ -238,18 +236,18 @@ public void testRestore() throws Exception {
     @Test
     public void testRestoreMultipleBatchInSplit() throws Exception {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
-        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null);
+        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null);
 
         List<Tuple2<Long, Long>> input1 = kvs();
-        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input1, branch);
+        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input1);
 
         List<Tuple2<Long, Long>> input2 = kvs(6);
-        List<DataFileMeta> files2 = rw.writeFiles(row(1), 0, input2, branch);
+        List<DataFileMeta> files2 = rw.writeFiles(row(1), 0, input2);
         files.addAll(files2);
 
         assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 7));
 
-        RecordsWithSplitIds<RecordIterator<RowData>> records = reader.fetch();
+        RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records = reader.fetch();
         assertRecords(
                 records,
                 null,
@@ -269,17 +267,17 @@ public void testRestoreMultipleBatchInSplit() throws Exception {
     @Test
     public void testMultipleSplits() throws Exception {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
-        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null);
+        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null);
 
         List<Tuple2<Long, Long>> input1 = kvs();
-        List<DataFileMeta> files1 = rw.writeFiles(row(1), 0, input1, branch);
+        List<DataFileMeta> files1 = rw.writeFiles(row(1), 0, input1);
         assignSplit(reader, newSourceSplit("id1", row(1), 0, files1));
 
         List<Tuple2<Long, Long>> input2 = kvs();
-        List<DataFileMeta> files2 = rw.writeFiles(row(2), 1, input2, branch);
+        List<DataFileMeta> files2 = rw.writeFiles(row(2), 1, input2);
         assignSplit(reader, newSourceSplit("id2", row(2), 1, files2));
 
-        RecordsWithSplitIds<RecordIterator<RowData>> records = reader.fetch();
+        RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records = reader.fetch();
         assertRecords(
                 records,
                 null,
@@ -307,22 +305,22 @@ public void testMultipleSplits() throws Exception {
     @Test
     public void testNoSplit() throws Exception {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
-        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null);
+        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null);
         assertThatThrownBy(reader::fetch).hasMessageContaining("no split remaining");
         reader.close();
     }
 
     @Test
     public void testLimit() throws Exception {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
-        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), 2L);
+        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), 2L);
 
         List<Tuple2<Long, Long>> input = kvs();
-        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input, branch);
+        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input);
 
         assignSplit(reader, newSourceSplit("id1", row(1), 0, files, 0));
 
-        RecordsWithSplitIds<RecordIterator<RowData>> records = reader.fetch();
+        RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records = reader.fetch();
 
         List<Tuple2<RowKind, Long>> expected =
                 input.stream()
@@ -348,19 +346,19 @@ public void testLimit() throws Exception {
     @Test
     public void testPauseOrResumeSplits() throws Exception {
         TestChangelogDataReadWrite rw = new TestChangelogDataReadWrite(tempDir.toString());
-        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(branch), null);
+        FileStoreSourceSplitReader reader = createReader(rw.createReadWithKey(), null);
 
         List<Tuple2<Long, Long>> input1 = kvs();
-        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input1, branch);
+        List<DataFileMeta> files = rw.writeFiles(row(1), 0, input1);
 
         List<Tuple2<Long, Long>> input2 = kvs(6);
-        List<DataFileMeta> files2 = rw.writeFiles(row(1), 0, input2, branch);
+        List<DataFileMeta> files2 = rw.writeFiles(row(1), 0, input2);
         files.addAll(files2);
 
         FileStoreSourceSplit split1 = newSourceSplit("id1", row(1), 0, files);
         assignSplit(reader, split1);
 
-        RecordsWithSplitIds<RecordIterator<RowData>> records = reader.fetch();
+        RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records = reader.fetch();
         assertRecords(
                 records,
                 null,
@@ -375,7 +373,7 @@ public void testPauseOrResumeSplits() throws Exception {
 
         // assign next split
         List<Tuple2<Long, Long>> input3 = kvs(12);
-        List<DataFileMeta> files3 = rw.writeFiles(row(1), 0, input3, branch);
+        List<DataFileMeta> files3 = rw.writeFiles(row(1), 0, input3);
         FileStoreSourceSplit split2 = newSourceSplit("id2", row(1), 0, files3);
         assignSplit(reader, split2);
 
@@ -410,7 +408,7 @@ public void testPauseOrResumeSplits() throws Exception {
     }
 
     private void assertRecords(
-            RecordsWithSplitIds<RecordIterator<RowData>> records,
+            RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>> records,
             String finishedSplit,
             String nextSplit,
             long startRecordSkipCount,