diff --git a/flink/v1.15/flink/pom.xml b/flink/v1.15/flink/pom.xml index 21f6871059..c040027dd9 100644 --- a/flink/v1.15/flink/pom.xml +++ b/flink/v1.15/flink/pom.xml @@ -334,6 +334,21 @@ test + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test-jar + test + + + org.slf4j + slf4j-api + + + + org.apache.curator diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/FlinkSchemaUtil.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/FlinkSchemaUtil.java index 9b3227de65..8205033344 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/FlinkSchemaUtil.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/FlinkSchemaUtil.java @@ -19,11 +19,15 @@ package com.netease.arctic.flink; import org.apache.commons.collections.CollectionUtils; +import org.apache.flink.table.api.TableColumn; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.WatermarkSpec; +import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.types.utils.TypeConversions; import java.util.List; +import java.util.function.Function; /** * An util that converts flink table schema. @@ -47,4 +51,75 @@ public static TableSchema toSchema(RowType rowType, List primaryKeys) { return builder.build(); } + /** + * Add watermark info to help {@link com.netease.arctic.flink.table.FlinkSource} + * and {@link com.netease.arctic.flink.table.ArcticDynamicSource} distinguish the watermark field. + * For now, it only be used in the case of Arctic as dim-table. + */ + public static TableSchema getPhysicalSchema(TableSchema tableSchema, boolean addWatermark) { + if (!addWatermark) { + return tableSchema; + } + TableSchema.Builder builder = filter(tableSchema, TableColumn::isPhysical); + tableSchema.getWatermarkSpecs().forEach(builder::watermark); + return builder.build(); + } + + /** + * filter watermark due to watermark is a virtual field for now, not in arctic physical table. + */ + public static TableSchema filterWatermark(TableSchema tableSchema) { + List watermarkSpecs = tableSchema.getWatermarkSpecs(); + if (watermarkSpecs.isEmpty()) { + return tableSchema; + } + + Function filter = (tableColumn) -> { + boolean isWatermark = false; + for (WatermarkSpec spec : watermarkSpecs) { + if (spec.getRowtimeAttribute().equals(tableColumn.getName())) { + isWatermark = true; + break; + } + } + return !isWatermark; + }; + return filter(tableSchema, filter).build(); + } + + /** + * If filter result is true, keep the column; otherwise, remove the column. 
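+   * For example, {@code filter(tableSchema, TableColumn::isPhysical)} keeps only the physical columns, as used by
+   * {@link #getPhysicalSchema(TableSchema, boolean)}. Note that the builder copies the primary key constraint but
+   * not the watermark specs.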
+ */ + public static TableSchema.Builder filter(TableSchema tableSchema, Function filter) { + TableSchema.Builder builder = TableSchema.builder(); + + tableSchema + .getTableColumns() + .forEach( + tableColumn -> { + if (!filter.apply(tableColumn)) { + return; + } + builder.field(tableColumn.getName(), tableColumn.getType()); + }); + tableSchema + .getPrimaryKey() + .ifPresent( + uniqueConstraint -> + builder.primaryKey( + uniqueConstraint.getName(), + uniqueConstraint.getColumns().toArray(new String[0]))); + return builder; + } + + public static RowType toRowType(TableSchema tableSchema) { + LogicalType[] fields = new LogicalType[tableSchema.getFieldCount()]; + + for (int i = 0; i < fields.length; i++) { + TableColumn tableColumn = tableSchema.getTableColumn(i).get(); + fields[i] = tableColumn.getType().getLogicalType(); + } + return RowType.of(fields); + } + } diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/InternalCatalogBuilder.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/InternalCatalogBuilder.java index 89343d6074..2beea1fa51 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/InternalCatalogBuilder.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/InternalCatalogBuilder.java @@ -39,6 +39,14 @@ private com.netease.arctic.catalog.ArcticCatalog createBaseArcticCatalog() { return CatalogLoader.load(metastoreUrl, properties); } + public String getMetastoreUrl() { + return metastoreUrl; + } + + public Map getProperties() { + return properties; + } + public InternalCatalogBuilder() { } diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/catalog/ArcticCatalog.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/catalog/ArcticCatalog.java index a67e2bf9d4..32b6f275e5 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/catalog/ArcticCatalog.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/catalog/ArcticCatalog.java @@ -18,9 +18,12 @@ package com.netease.arctic.flink.catalog; +import com.google.common.base.Objects; import com.netease.arctic.NoSuchDatabaseException; import com.netease.arctic.flink.InternalCatalogBuilder; +import com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions; import com.netease.arctic.flink.table.DynamicTableFactory; +import com.netease.arctic.flink.table.descriptors.ArcticValidator; import com.netease.arctic.flink.util.ArcticUtils; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.PrimaryKeySpec; @@ -28,6 +31,7 @@ import com.netease.arctic.table.TableIdentifier; import com.netease.arctic.table.TableProperties; import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.api.WatermarkSpec; import org.apache.flink.table.catalog.AbstractCatalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; @@ -68,6 +72,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS; import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC; +import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR; /** * Catalogs for arctic data lake. 
@@ -75,6 +80,12 @@ public class ArcticCatalog extends AbstractCatalog { public static final String DEFAULT_DB = "default"; + /** + * Identifies a 'CREATE TABLE LIKE' statement by checking the call stack for + * {@link org.apache.flink.table.planner.operations.SqlCreateTableConverter#lookupLikeSourceTable} + */ + public static final String SQL_LIKE_METHOD = "lookupLikeSourceTable"; + private final InternalCatalogBuilder catalogBuilder; private com.netease.arctic.catalog.ArcticCatalog internalCatalog; @@ -169,6 +180,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep RowType rowType = FlinkSchemaUtil.convert(arcticSchema); Map arcticProperties = Maps.newHashMap(table.properties()); fillTableProperties(arcticProperties); + fillTableMetaPropertiesIfLookupLike(arcticProperties, tableIdentifier); List partitionKeys = toPartitionKeys(table.spec(), table.schema()); return new CatalogTableImpl( @@ -178,6 +190,35 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep null); } + /** + * For now, 'CREATE TABLE LIKE' is treated as the case in which users want to add a watermark for a temporal join, + * as an alternative to a lookup join, with the Arctic table used as the build table, i.e. the right table. + * So the properties required by the temporal join will be added automatically. + *
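+   * An illustrative sketch (the catalog, database and table names are hypothetical):
+   * <pre>
+   *   CREATE TABLE dim_orders WITH ('dim-table.enable' = 'true') LIKE arctic_catalog.db.orders;
+   * </pre>
+   * The derived table automatically carries the 'connector', 'arctic.catalog', 'arctic.database', 'arctic.table'
+   * and metastore url options that are filled in below.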

+ * If you don't want the properties, 'EXCLUDING ALL' is what you need. + * More details @see LIKE + */ + private void fillTableMetaPropertiesIfLookupLike(Map properties, TableIdentifier tableIdentifier) { + StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace(); + boolean isLookupLike = false; + for (StackTraceElement stackTraceElement : stackTraceElements) { + if (Objects.equal(SQL_LIKE_METHOD, stackTraceElement.getMethodName())) { + isLookupLike = true; + break; + } + } + + if (!isLookupLike) { + return; + } + + properties.put(CONNECTOR.key(), DynamicTableFactory.IDENTIFIER); + properties.put(ArcticValidator.ARCTIC_CATALOG.key(), tableIdentifier.getCatalog()); + properties.put(ArcticValidator.ARCTIC_TABLE.key(), tableIdentifier.getTableName()); + properties.put(ArcticValidator.ARCTIC_DATABASE.key(), tableIdentifier.getDatabase()); + properties.put(ArcticCatalogFactoryOptions.METASTORE_URL.key(), catalogBuilder.getMetastoreUrl()); + } + private static List toPartitionKeys(PartitionSpec spec, Schema icebergSchema) { List partitionKeys = Lists.newArrayList(); for (PartitionField field : spec.fields()) { @@ -253,7 +294,20 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig validateFlinkTable(table); TableSchema tableSchema = table.getSchema(); - Schema icebergSchema = FlinkSchemaUtil.convert(tableSchema); + TableSchema.Builder b = TableSchema.builder(); + + tableSchema.getTableColumns().forEach(c -> { + List ws = tableSchema.getWatermarkSpecs(); + for (WatermarkSpec w : ws) { + if (w.getRowtimeAttribute().equals(c.getName())) { + return; + } + } + b.field(c.getName(), c.getType()); + }); + TableSchema tableSchemaWithoutWatermark = b.build(); + + Schema icebergSchema = FlinkSchemaUtil.convert(tableSchemaWithoutWatermark); TableBuilder tableBuilder = internalCatalog.newTableBuilder( getTableIdentifier(tablePath), icebergSchema); @@ -295,9 +349,6 @@ private static void validateFlinkTable(CatalogBaseTable table) { } }); - if (!schema.getWatermarkSpecs().isEmpty()) { - throw new UnsupportedOperationException("Creating table with watermark specs is not supported yet."); - } } @Override diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/metric/MetricConstant.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/metric/MetricConstant.java new file mode 100644 index 0000000000..bda7cb3379 --- /dev/null +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/metric/MetricConstant.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.netease.arctic.flink.metric; + +/** + * metric constant + */ +public class MetricConstant { + + /** + * The start time of arctic table's initialization when it used as build table in temporal join. + */ + public static final String TEMPORAL_TABLE_INITIALIZATION_START_TIMESTAMP = + "temporalTableInitializationStartTimestamp"; + /** + * The end time of arctic table's initialization when it used as build table in temporal join. + */ + public static final String TEMPORAL_TABLE_INITIALIZATION_END_TIMESTAMP = "temporalTableInitializationEndTimestamp"; + +} diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/write/MetricsGenerator.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/metric/MetricsGenerator.java similarity index 99% rename from flink/v1.15/flink/src/main/java/com/netease/arctic/flink/write/MetricsGenerator.java rename to flink/v1.15/flink/src/main/java/com/netease/arctic/flink/metric/MetricsGenerator.java index f120d8c761..f7a16ba6a0 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/write/MetricsGenerator.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/metric/MetricsGenerator.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package com.netease.arctic.flink.write; +package com.netease.arctic.flink.metric; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.table.data.RowData; diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/AdaptHiveFlinkParquetReaders.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/AdaptHiveFlinkParquetReaders.java index 7e30e8e167..09b4e6782e 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/AdaptHiveFlinkParquetReaders.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/AdaptHiveFlinkParquetReaders.java @@ -385,8 +385,8 @@ private static class MicrosToTimestampTzReader extends ParquetValueReaders.Unbox public TimestampData read(TimestampData ignored) { long value = readLong(); return TimestampData.fromLocalDateTime(Instant.ofEpochSecond( - Math.floorDiv(value, 1000_000), - Math.floorMod(value, 1000_000) * 1000) + Math.floorDiv(value, 1000_000L), + Math.floorMod(value, 1000_000L) * 1000) .atOffset(ZoneOffset.UTC) .toLocalDateTime()); } @@ -406,8 +406,8 @@ private static class MicrosToTimestampReader extends ParquetValueReaders.Unboxed public TimestampData read(TimestampData ignored) { long value = readLong(); return TimestampData.fromInstant(Instant.ofEpochSecond( - Math.floorDiv(value, 1000_000), - Math.floorMod(value, 1000_000) * 1000)); + Math.floorDiv(value, 1000_000L), + Math.floorMod(value, 1000_000L) * 1000)); } @Override @@ -478,7 +478,7 @@ private static class LossyMicrosToMillisTimeReader extends ParquetValueReaders.P @Override public Integer read(Integer reuse) { // Discard microseconds since Flink uses millisecond unit for TIME type. - return (int) Math.floorDiv(column.nextLong(), 1000); + return (int) Math.floorDiv(column.nextLong(), 1000L); } } diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/ArcticSource.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/ArcticSource.java index 31baf4c699..cce1658d69 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/ArcticSource.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/ArcticSource.java @@ -44,6 +44,10 @@ /** * Arctic Source based of Flip27. + *

+ * If ArcticSource is used as a build table in lookup join, it will be implemented by temporal join. + * Two source should use processing time as watermark. + * ArcticSource will generate watermark after first splits planned by ArcticSourceEnumerator having been finished. */ public class ArcticSource implements Source, ResultTypeQueryable { private static final long serialVersionUID = 1L; @@ -54,14 +58,20 @@ public class ArcticSource implements Source typeInformation; private final ArcticTableLoader loader; private final String tableName; + /** + * generate arctic watermark. This is only for lookup join arctic table, and arctic table is used as build table, + * i.e. right table. + */ + private final boolean dimTable; public ArcticSource(ArcticTableLoader loader, ArcticScanContext scanContext, ReaderFunction readerFunction, - TypeInformation typeInformation, String tableName) { + TypeInformation typeInformation, String tableName, boolean dimTable) { this.loader = loader; this.scanContext = scanContext; this.readerFunction = readerFunction; this.typeInformation = typeInformation; this.tableName = tableName; + this.dimTable = dimTable; } @Override @@ -71,7 +81,7 @@ public Boundedness getBoundedness() { @Override public SourceReader createReader(SourceReaderContext readerContext) throws Exception { - return new ArcticSourceReader<>(readerFunction, readerContext.getConfiguration(), readerContext); + return new ArcticSourceReader<>(readerFunction, readerContext.getConfiguration(), readerContext, dimTable); } @Override @@ -93,7 +103,7 @@ private SplitEnumerator createEnumerator( } if (scanContext.isStreaming()) { - return new ArcticSourceEnumerator(enumContext, splitAssigner, loader, scanContext, enumState); + return new ArcticSourceEnumerator(enumContext, splitAssigner, loader, scanContext, enumState, dimTable); } else { return new StaticArcticSourceEnumerator(enumContext, splitAssigner, loader, scanContext, null); } diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumState.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumState.java index 20327c5d15..edf102cfb6 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumState.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumState.java @@ -19,6 +19,7 @@ package com.netease.arctic.flink.read.hybrid.enumerator; import com.netease.arctic.flink.read.hybrid.split.ArcticSplitState; +import com.netease.arctic.flink.read.hybrid.split.TemporalJoinSplits; import javax.annotation.Nullable; import java.util.Collection; @@ -33,14 +34,18 @@ public class ArcticSourceEnumState { private final Collection pendingSplits; @Nullable private final long[] shuffleSplitRelation; + @Nullable + private final TemporalJoinSplits temporalJoinSplits; public ArcticSourceEnumState( Collection pendingSplits, @Nullable ArcticEnumeratorOffset lastEnumeratedOffset, - @Nullable long[] shuffleSplitRelation) { + @Nullable long[] shuffleSplitRelation, + @Nullable TemporalJoinSplits temporalJoinSplits) { this.pendingSplits = pendingSplits; this.lastEnumeratedOffset = lastEnumeratedOffset; this.shuffleSplitRelation = shuffleSplitRelation; + this.temporalJoinSplits = temporalJoinSplits; } @Nullable @@ -56,4 +61,9 @@ public Collection pendingSplits() { public long[] shuffleSplitRelation() { return shuffleSplitRelation; } + + @Nullable + public TemporalJoinSplits 
temporalJoinSplits() { + return temporalJoinSplits; + } } diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumStateSerializer.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumStateSerializer.java index 6cddad338d..ecc3ddf811 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumStateSerializer.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumStateSerializer.java @@ -21,10 +21,14 @@ import com.netease.arctic.flink.read.hybrid.split.ArcticSplit; import com.netease.arctic.flink.read.hybrid.split.ArcticSplitSerializer; import com.netease.arctic.flink.read.hybrid.split.ArcticSplitState; +import com.netease.arctic.flink.read.hybrid.split.TemporalJoinSplits; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.core.memory.DataInputDeserializer; import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.util.InstantiationUtil; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collection; @@ -34,6 +38,8 @@ * Serializer that serializes and deserializes arctic enumerator {@link ArcticSourceEnumState}. */ public class ArcticSourceEnumStateSerializer implements SimpleVersionedSerializer { + + private static final Logger LOGGER = LoggerFactory.getLogger(ArcticSourceEnumStateSerializer.class); private static final int VERSION = 1; private final ArcticSplitSerializer splitSerializer = ArcticSplitSerializer.INSTANCE; private final ArcticEnumeratorOffsetSerializer offsetSerializer = ArcticEnumeratorOffsetSerializer.INSTANCE; @@ -79,6 +85,13 @@ private byte[] serializeV1(ArcticSourceEnumState enumState) throws IOException { } } + out.writeBoolean(enumState.temporalJoinSplits() != null); + if (enumState.temporalJoinSplits() != null) { + byte[] temporalJoinSplits = InstantiationUtil.serializeObject(enumState.temporalJoinSplits()); + out.writeInt(temporalJoinSplits.length); + out.write(temporalJoinSplits); + } + byte[] result = out.getCopyOfBuffer(); out.clear(); return result; @@ -123,7 +136,18 @@ private ArcticSourceEnumState deserializeV1(byte[] serialized) throws IOExceptio shuffleSplitRelation[i] = in.readLong(); } } - return new ArcticSourceEnumState(pendingSplits, enumeratorOffset, shuffleSplitRelation); + TemporalJoinSplits temporalJoinSplits = null; + if (in.readBoolean()) { + byte[] bytes = new byte[in.readInt()]; + in.read(bytes); + try { + temporalJoinSplits = InstantiationUtil.deserializeObject(bytes, TemporalJoinSplits.class.getClassLoader()); + } catch (ClassNotFoundException e) { + throw new RuntimeException("deserialize FirstSplit error", e); + } + } + + return new ArcticSourceEnumState(pendingSplits, enumeratorOffset, shuffleSplitRelation, temporalJoinSplits); } } diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumerator.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumerator.java index f6e21327ed..bcfc709d9a 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumerator.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumerator.java @@ -22,9 +22,12 @@ import 
com.netease.arctic.flink.read.hybrid.assigner.SplitAssigner; import com.netease.arctic.flink.read.hybrid.reader.HybridSplitReader; import com.netease.arctic.flink.read.hybrid.split.ArcticSplit; +import com.netease.arctic.flink.read.hybrid.split.SplitRequestEvent; +import com.netease.arctic.flink.read.hybrid.split.TemporalJoinSplits; import com.netease.arctic.flink.read.source.ArcticScanContext; import com.netease.arctic.flink.table.ArcticTableLoader; import com.netease.arctic.table.KeyedTable; +import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.util.FlinkRuntimeException; import org.apache.iceberg.Snapshot; @@ -33,8 +36,10 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Collection; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.IntStream; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.FILE_SCAN_STARTUP_MODE; import static com.netease.arctic.flink.table.descriptors.ArcticValidator.FILE_SCAN_STARTUP_MODE_LATEST; @@ -46,12 +51,28 @@ public class ArcticSourceEnumerator extends AbstractArcticEnumerator { private static final Logger LOG = LoggerFactory.getLogger(ArcticSourceEnumerator.class); private transient KeyedTable keyedTable; + /** + * To record the snapshotId at the first planSplits. + *
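+   * More precisely, it records the splits returned by the first planning, wrapped in a {@link TemporalJoinSplits}.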

+ * If its value is null, it means that we don't need to generate watermark. Won't check. + */ + private transient volatile TemporalJoinSplits temporalJoinSplits = null; private final ArcticTableLoader loader; private final SplitEnumeratorContext context; private final ContinuousSplitPlanner continuousSplitPlanner; private final SplitAssigner splitAssigner; private final ArcticScanContext scanContext; private final long snapshotDiscoveryIntervalMs; + /** + * If true, using arctic table as build table. + * {@link ArcticSourceEnumerator} will notify {@link com.netease.arctic.flink.read.hybrid.reader.ArcticSourceReader} + * after ArcticReaders have finished reading all {@link TemporalJoinSplits}. + * Then {@link com.netease.arctic.flink.read.hybrid.reader.ArcticSourceReader} will emit a Watermark values + * Long.MAX_VALUE. Advancing TemporalJoinOperator's watermark can trigger the join operation and push the results to + * downstream. The watermark of Long.MAX_VALUE avoids affecting the watermark defined by user arbitrary probe side + */ + private final boolean dimTable; + private volatile boolean sourceEventBeforeFirstPlan = false; /** * snapshotId for the last enumerated snapshot. next incremental enumeration * should be based off this as the starting position. @@ -65,7 +86,8 @@ public ArcticSourceEnumerator( SplitAssigner splitAssigner, ArcticTableLoader loader, ArcticScanContext scanContext, - @Nullable ArcticSourceEnumState enumState) { + @Nullable ArcticSourceEnumState enumState, + boolean dimTable) { super(enumContext, splitAssigner); this.loader = loader; this.context = enumContext; @@ -76,7 +98,9 @@ public ArcticSourceEnumerator( this.enumeratorPosition = new AtomicReference<>(); if (enumState != null) { this.enumeratorPosition.set(enumState.lastEnumeratedOffset()); + this.temporalJoinSplits = enumState.temporalJoinSplits(); } + this.dimTable = dimTable; } @Override @@ -119,6 +143,18 @@ public void start() { } private ContinuousEnumerationResult planSplits() { + ContinuousEnumerationResult result = doPlanSplits(); + if (dimTable && temporalJoinSplits == null) { + temporalJoinSplits = new TemporalJoinSplits(result.splits(), context.metricGroup()); + // the first SourceEvent may be faster than plan splits + if (result.isEmpty() && sourceEventBeforeFirstPlan) { + notifyReaders(); + } + } + return result; + } + + private ContinuousEnumerationResult doPlanSplits() { if (lock.get()) { LOG.info("prefix plan splits thread haven't finished."); return ContinuousEnumerationResult.EMPTY; @@ -143,13 +179,57 @@ private void handleResultOfSplits(ContinuousEnumerationResult enumerationResult, lock.set(false); } + @Override + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + super.handleSourceEvent(subtaskId, sourceEvent); + if (sourceEvent instanceof SplitRequestEvent) { + SplitRequestEvent splitRequestEvent = (SplitRequestEvent) sourceEvent; + Collection finishedSplitIds = splitRequestEvent.finishedSplitIds(); + if (dimTable) { + checkAndNotifyReader(finishedSplitIds); + } + } else { + throw new IllegalArgumentException(String.format("Received unknown event from subtask %d: %s", + subtaskId, sourceEvent.getClass().getCanonicalName())); + } + } + + /** + * Check whether all first splits have been finished or not. + * After all finished, enumerator will send a {@link InitializationFinishedEvent} to notify all + * {@link com.netease.arctic.flink.read.hybrid.reader.ArcticSourceReader}. 
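+   * If readers report finished splits before the first planning has happened (i.e. temporalJoinSplits is still
+   * null), a flag is recorded so that, if the first plan turns out to be empty, the readers are notified
+   * immediately afterwards.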
+ * + * @param finishedSplitIds + */ + public void checkAndNotifyReader(Collection finishedSplitIds) { + if (temporalJoinSplits == null) { + sourceEventBeforeFirstPlan = true; + return; + } + + if (temporalJoinSplits.hasNotifiedReader() || + !temporalJoinSplits.removeAndReturnIfAllFinished(finishedSplitIds)) { + return; + } + notifyReaders(); + } + + private void notifyReaders() { + LOG.info("all splits finished, send events to readers"); + IntStream.range(0, context.currentParallelism()) + .forEach(i -> context.sendEventToSourceReader(i, InitializationFinishedEvent.INSTANCE)); + temporalJoinSplits.clear(); + temporalJoinSplits.notifyReader(); + } + @Override public ArcticSourceEnumState snapshotState(long checkpointId) throws Exception { long[] shuffleSplitRelation = null; if (splitAssigner instanceof ShuffleSplitAssigner) { shuffleSplitRelation = ((ShuffleSplitAssigner) splitAssigner).serializePartitionIndex(); } - return new ArcticSourceEnumState(splitAssigner.state(), enumeratorPosition.get(), shuffleSplitRelation); + return new ArcticSourceEnumState(splitAssigner.state(), enumeratorPosition.get(), shuffleSplitRelation, + temporalJoinSplits); } @Override diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/InitializationFinishedEvent.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/InitializationFinishedEvent.java new file mode 100644 index 0000000000..48655efa62 --- /dev/null +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/InitializationFinishedEvent.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.flink.read.hybrid.enumerator; + +import com.netease.arctic.flink.read.hybrid.reader.ArcticSourceReader; +import org.apache.flink.api.connector.source.SourceEvent; + +/** + * {@link ArcticSourceReader} won't set timestamp to RowData until receiving this Event. 
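+ * Upon receiving this event, the reader emits a watermark of Long.MAX_VALUE so that the temporal join operator
+ * can trigger the join of the stock data and push the results downstream.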
+ */ +public class InitializationFinishedEvent implements SourceEvent { + private static final long serialVersionUID = 1L; + + public static final InitializationFinishedEvent INSTANCE = new InitializationFinishedEvent(); +} diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/StaticArcticSourceEnumerator.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/StaticArcticSourceEnumerator.java index 951f682120..649729114f 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/StaticArcticSourceEnumerator.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/enumerator/StaticArcticSourceEnumerator.java @@ -84,6 +84,6 @@ protected boolean shouldWaitForMoreSplits() { @Override public ArcticSourceEnumState snapshotState(long checkpointId) throws Exception { - return new ArcticSourceEnumState(assigner.state(), null, null); + return new ArcticSourceEnumState(assigner.state(), null, null, null); } } diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/reader/ArcticRecordEmitter.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/reader/ArcticRecordEmitter.java index cbd995db8e..f1293ae6b0 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/reader/ArcticRecordEmitter.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/reader/ArcticRecordEmitter.java @@ -21,19 +21,49 @@ import com.netease.arctic.flink.read.hybrid.split.ArcticSplitState; import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.TimestampData; +import org.apache.flink.table.data.utils.JoinedRowData; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Emitter that emit {@link T} to the next flink operator and update the record offset of {@link T} into split state. */ public class ArcticRecordEmitter implements RecordEmitter, T, ArcticSplitState> { + public static final Logger LOGGER = LoggerFactory.getLogger(ArcticRecordEmitter.class); + + /** + * It signifies whether the Long.MIN_VALUE need to be set into RowData. 
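+   * When it is true, the emitter appends a row-time field valued Long.MIN_VALUE to every record; the reader emits
+   * the Long.MAX_VALUE watermark only after the initialization of the temporal table has finished.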
+ */ + public boolean populateRowTime; + + public ArcticRecordEmitter(boolean populateRowTime) { + this.populateRowTime = populateRowTime; + } + @Override public void emitRecord( ArcticRecordWithOffset element, SourceOutput sourceOutput, ArcticSplitState split) throws Exception { - sourceOutput.collect(element.record()); + T record = element.record(); + if (!populateRowTime) { + sourceOutput.collect(record); + } else { + Preconditions.checkArgument(record instanceof RowData, + "Custom watermark strategy doesn't support %s, except RowData for now.", + record.getClass()); + RowData rowData = new JoinedRowData((RowData) record, + GenericRowData.of(TimestampData.fromEpochMillis(Long.MIN_VALUE))); + rowData.setRowKind(((RowData) record).getRowKind()); + sourceOutput.collect((T) rowData); + } split.updateOffset(new Object[]{element.insertFileOffset(), element.insertRecordOffset(), element.deleteFileOffset(), element.deleteRecordOffset()}); } + } diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/reader/ArcticSourceReader.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/reader/ArcticSourceReader.java index cdb486c5e1..e86d464d65 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/reader/ArcticSourceReader.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/reader/ArcticSourceReader.java @@ -19,13 +19,20 @@ package com.netease.arctic.flink.read.hybrid.reader; import com.netease.arctic.flink.read.ArcticSource; +import com.netease.arctic.flink.read.hybrid.enumerator.InitializationFinishedEvent; import com.netease.arctic.flink.read.hybrid.split.ArcticSplit; import com.netease.arctic.flink.read.hybrid.split.ArcticSplitState; import com.netease.arctic.flink.read.hybrid.split.SplitRequestEvent; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase; +import org.apache.flink.core.io.InputStatus; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Collection; import java.util.Collections; @@ -36,16 +43,27 @@ */ public class ArcticSourceReader extends SingleThreadMultiplexSourceReaderBase, T, ArcticSplit, ArcticSplitState> { + + public static final Logger LOGGER = LoggerFactory.getLogger(ArcticSourceReader.class); + + public ReaderOutput output; + /** + * SourceEvents may be received before this#pollNext. 
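+   * In that case the reader output is not available yet, so the event is remembered here and the Long.MAX_VALUE
+   * watermark is emitted lazily on the next pollNext call.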
+ */ + private volatile boolean maxWatermarkToBeEmitted = false; + + public ArcticSourceReader( ReaderFunction readerFunction, Configuration config, - SourceReaderContext context) { + SourceReaderContext context, + boolean populateRowTime) { super( () -> new HybridSplitReader<>( readerFunction, context ), - new ArcticRecordEmitter<>(), + new ArcticRecordEmitter(populateRowTime), config, context); } @@ -77,4 +95,31 @@ protected ArcticSplit toSplitType(String splitId, ArcticSplitState splitState) { private void requestSplit(Collection finishedSplitIds) { context.sendSourceEventToCoordinator(new SplitRequestEvent(finishedSplitIds)); } + + @Override + public void handleSourceEvents(SourceEvent sourceEvent) { + if (!(sourceEvent instanceof InitializationFinishedEvent)) { + return; + } + LOGGER.info("receive InitializationFinishedEvent"); + maxWatermarkToBeEmitted = true; + emitWatermarkIfNeeded(); + } + + private void emitWatermarkIfNeeded() { + if (this.output == null || !maxWatermarkToBeEmitted) { + return; + } + LOGGER.info("emit watermark"); + output.emitWatermark(new Watermark(Long.MAX_VALUE)); + maxWatermarkToBeEmitted = false; + } + + @Override + public InputStatus pollNext(ReaderOutput output) throws Exception { + this.output = output; + emitWatermarkIfNeeded(); + return super.pollNext(output); + } + } diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/split/TemporalJoinSplits.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/split/TemporalJoinSplits.java new file mode 100644 index 0000000000..0ed6beca36 --- /dev/null +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/read/hybrid/split/TemporalJoinSplits.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.flink.read.hybrid.split; + +import org.apache.flink.api.connector.source.SourceSplit; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.util.CollectionUtil; +import org.apache.flink.util.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.time.LocalDateTime; +import java.util.Collection; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +import static com.netease.arctic.flink.metric.MetricConstant.TEMPORAL_TABLE_INITIALIZATION_END_TIMESTAMP; +import static com.netease.arctic.flink.metric.MetricConstant.TEMPORAL_TABLE_INITIALIZATION_START_TIMESTAMP; + +/** + * If using arctic table as build-table, TemporalJoinSplits can record the first splits planned by Enumerator. 
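+ * The enumerator marks these splits as finished when readers report them, and once all of them are finished it
+ * notifies the readers so that they can emit a watermark of Long.MAX_VALUE.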
+ */ +public class TemporalJoinSplits implements Serializable { + + public static final long serialVersionUID = 1L; + public static final Logger LOGGER = LoggerFactory.getLogger(TemporalJoinSplits.class); + + private final MetricGroup metricGroup; + private final long startTimeMs = System.currentTimeMillis(); + private Map splits; + private long unfinishedCount; + /** + * transient because it is necessary to notify reader again after failover. + */ + private transient boolean hasNotifiedReader = false; + + public TemporalJoinSplits(Collection splits, MetricGroup metricGroup) { + Preconditions.checkNotNull(splits, "plan splits should not be null"); + this.splits = splits.stream().map(SourceSplit::splitId).collect(Collectors.toMap((k) -> k, (i) -> false)); + + unfinishedCount = this.splits.size(); + LOGGER.info("init splits at {}, size:{}", LocalDateTime.now(), unfinishedCount); + this.metricGroup = metricGroup; + if (metricGroup != null) { + metricGroup.gauge(TEMPORAL_TABLE_INITIALIZATION_START_TIMESTAMP, () -> startTimeMs); + } + } + + public Map getSplits() { + return splits; + } + + public synchronized void addSplitsBack(Collection splits) { + if (this.splits == null || CollectionUtil.isNullOrEmpty(splits)) { + return; + } + splits.forEach((p) -> { + Boolean finished = this.splits.get(p.splitId()); + if (finished == null || !finished) { + return; + } + unfinishedCount++; + LOGGER.debug("add back split:{} at {}", p, LocalDateTime.now()); + this.splits.put(p.splitId(), false); + }); + } + + /** + * Remove finished splits. + * + * @return True if all splits are finished, otherwise false. + */ + public synchronized boolean removeAndReturnIfAllFinished(Collection finishedSplitIds) { + if (splits == null) { + return true; + } + if (CollectionUtil.isNullOrEmpty(finishedSplitIds)) { + return unfinishedCount == 0; + } + + finishedSplitIds.forEach((p) -> { + Boolean finished = this.splits.get(p); + if (finished == null || finished) { + return; + } + unfinishedCount--; + this.splits.put(p, true); + LOGGER.debug("finish split:{} at {}", p, LocalDateTime.now()); + }); + if (unfinishedCount == 0) { + LOGGER.info("finish all splits at {}", LocalDateTime.now()); + if (metricGroup != null) { + metricGroup.gauge(TEMPORAL_TABLE_INITIALIZATION_END_TIMESTAMP, System::currentTimeMillis); + } + return true; + } + return false; + } + + public synchronized void clear() { + if (unfinishedCount == 0) { + this.splits = null; + } + } + + public boolean hasNotifiedReader() { + return hasNotifiedReader; + } + + public void notifyReader() { + this.hasNotifiedReader = true; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TemporalJoinSplits that = (TemporalJoinSplits) o; + return startTimeMs == that.startTimeMs && + unfinishedCount == that.unfinishedCount && + hasNotifiedReader == that.hasNotifiedReader && + Objects.equals(metricGroup, that.metricGroup) && + Objects.equals(splits, that.splits); + } + + @Override + public int hashCode() { + return Objects.hash(metricGroup, startTimeMs, splits, unfinishedCount, hasNotifiedReader); + } + +} diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/shuffle/ShuffleHelper.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/shuffle/ShuffleHelper.java index 33310e4537..d48310f6d2 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/shuffle/ShuffleHelper.java +++ 
b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/shuffle/ShuffleHelper.java @@ -52,6 +52,7 @@ public static ShuffleHelper build(ArcticTable table, Schema schema, RowType rowT if (table.spec() != null && !CollectionUtil.isNullOrEmpty(table.spec().fields())) { partitionKey = new PartitionKey(table.spec(), schema); } + schema = addFieldsNotInArctic(schema, rowType); if (table.isUnkeyedTable()) { return new ShuffleHelper(rowType, schema.asStruct(), partitionKey); @@ -63,6 +64,29 @@ public static ShuffleHelper build(ArcticTable table, Schema schema, RowType rowT primaryKeyData, partitionKey, rowType, schema.asStruct()); } + /** + * If using arctic table as build table, there will be an additional implicit field, valuing process time. + * + * @param schema The physical schema in Arctic table + * @param rowType Flink RowData type. + * @return the Arctic Schema with additional implicit field. + */ + public static Schema addFieldsNotInArctic(Schema schema, RowType rowType) { + Types.NestedField[] nestedFields = new Types.NestedField[rowType.getFieldCount()]; + + for (int i = 0; i < nestedFields.length; i++) { + RowType.RowField field = rowType.getFields().get(i); + Types.NestedField nestedField; + if ((nestedField = schema.findField(field.getName())) != null) { + nestedFields[i] = nestedField; + } else { + // for now, there is only one case that virtual watermark exist in RowData, but not in Arctic table schema. + nestedFields[i] = Types.NestedField.optional(-1, field.getName(), Types.TimestampType.withoutZone()); + } + } + return new Schema(nestedFields); + } + /** * Should open firstly to initial RowDataWrapper, because it cannot be serialized. */ diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/ArcticDynamicSource.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/ArcticDynamicSource.java index 1060491ce6..f81d4fa0d2 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/ArcticDynamicSource.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/ArcticDynamicSource.java @@ -53,6 +53,7 @@ import java.util.List; import java.util.Map; +import static com.netease.arctic.flink.FlinkSchemaUtil.filterWatermark; import static com.netease.arctic.table.TableProperties.READ_DISTRIBUTION_HASH_MODE; import static com.netease.arctic.table.TableProperties.READ_DISTRIBUTION_HASH_MODE_DEFAULT; import static com.netease.arctic.table.TableProperties.READ_DISTRIBUTION_MODE; @@ -113,7 +114,8 @@ public ArcticDynamicSource(String tableName, flinkSchemaRowType = FlinkSchemaUtil.convert(readSchema); } else { flinkSchemaRowType = (RowType) projectedSchema.toRowDataType().getLogicalType(); - readSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(projectedSchema), arcticTable.schema()); + readSchema = TypeUtil.reassignIds( + FlinkSchemaUtil.convert(filterWatermark(projectedSchema)), arcticTable.schema()); } } diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/ArcticFileSource.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/ArcticFileSource.java index 624d7ab122..0a6e2ed271 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/ArcticFileSource.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/ArcticFileSource.java @@ -20,6 +20,7 @@ import com.netease.arctic.table.ArcticTable; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.configuration.Configuration; import 
org.apache.flink.configuration.ReadableConfig; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; @@ -49,6 +50,8 @@ import java.util.List; import java.util.Optional; +import static com.netease.arctic.flink.table.descriptors.ArcticValidator.DIM_TABLE_ENABLE; + /** * Flink table api that generates arctic base/change file source operators. */ @@ -203,6 +206,10 @@ public String asSummaryString() { @Override public void applyWatermark(WatermarkStrategy watermarkStrategy) { - this.watermarkStrategy = watermarkStrategy; + Configuration conf = Configuration.fromMap(table.properties()); + boolean dimTable = conf.get(DIM_TABLE_ENABLE); + if (!dimTable) { + this.watermarkStrategy = watermarkStrategy; + } } } \ No newline at end of file diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/DynamicTableFactory.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/DynamicTableFactory.java index 193fb88146..6eab60810b 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/DynamicTableFactory.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/DynamicTableFactory.java @@ -45,6 +45,7 @@ import org.apache.flink.table.types.DataType; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.types.RowKind; +import org.apache.flink.util.Preconditions; import org.apache.iceberg.Schema; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.util.PropertyUtil; @@ -58,6 +59,7 @@ import java.util.Properties; import java.util.Set; +import static com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions.METASTORE_URL; import static com.netease.arctic.flink.table.KafkaConnectorOptionsUtil.createKeyFormatProjection; import static com.netease.arctic.flink.table.KafkaConnectorOptionsUtil.createValueFormatProjection; import static com.netease.arctic.flink.table.KafkaConnectorOptionsUtil.getKafkaProperties; @@ -82,10 +84,10 @@ */ public class DynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory { private static final Logger LOG = LoggerFactory.getLogger(DynamicTableFactory.class); - private static final String IDENTIFIER = "arctic"; - private final ArcticCatalog arcticCatalog; - private final InternalCatalogBuilder internalCatalogBuilder; - private final String internalCatalogName; + public static final String IDENTIFIER = "arctic"; + private ArcticCatalog arcticCatalog; + private InternalCatalogBuilder internalCatalogBuilder; + private String internalCatalogName; public DynamicTableFactory( ArcticCatalog arcticCatalog, @@ -96,15 +98,39 @@ public DynamicTableFactory( this.internalCatalogName = internalCatalogName; } + public DynamicTableFactory() { + } + @Override public DynamicTableSource createDynamicTableSource(Context context) { CatalogTable catalogTable = context.getCatalogTable(); ObjectIdentifier identifier = context.getObjectIdentifier(); + ObjectPath objectPath; Map options = catalogTable.getOptions(); - ArcticTableLoader tableLoader = createTableLoader( - new ObjectPath(identifier.getDatabaseName(), identifier.getObjectName()), - internalCatalogName, internalCatalogBuilder, options); + InternalCatalogBuilder actualBuilder = internalCatalogBuilder; + String actualCatalogName = internalCatalogName; + + // It denotes create table by ddl 'connector' option, not through arcticCatalog.db.tableName + if (actualBuilder == null || actualCatalogName == null) { + String metastoreUrl = 
options.get(METASTORE_URL.key()); + Preconditions.checkNotNull(metastoreUrl, String.format("%s should be set", METASTORE_URL)); + actualBuilder = InternalCatalogBuilder.builder().metastoreUrl(metastoreUrl); + + actualCatalogName = options.get(ArcticValidator.ARCTIC_CATALOG.key()); + Preconditions.checkNotNull(actualCatalogName, String.format("%s should be set", + ArcticValidator.ARCTIC_CATALOG.key())); + } + + if (options.containsKey(ArcticValidator.ARCTIC_DATABASE.key()) && + options.containsKey(ArcticValidator.ARCTIC_TABLE.key())) { + objectPath = new ObjectPath(options.get(ArcticValidator.ARCTIC_DATABASE.key()), + options.get(ArcticValidator.ARCTIC_TABLE.key())); + } else { + objectPath = new ObjectPath(identifier.getDatabaseName(), identifier.getObjectName()); + } + + ArcticTableLoader tableLoader = createTableLoader(objectPath, actualCatalogName, actualBuilder, options); ArcticTable arcticTable = ArcticUtils.loadArcticTable(tableLoader); ScanTableSource arcticDynamicSource; @@ -112,7 +138,10 @@ public DynamicTableSource createDynamicTableSource(Context context) { String readMode = PropertyUtil.propertyAsString(arcticTable.properties(), ArcticValidator.ARCTIC_READ_MODE, ArcticValidator.ARCTIC_READ_MODE_DEFAULT); - TableSchema tableSchema = TableSchemaUtils.getPhysicalSchema(catalogTable.getSchema()); + boolean dimTable = PropertyUtil.propertyAsBoolean(arcticTable.properties(), + ArcticValidator.DIM_TABLE_ENABLE.key(), ArcticValidator.DIM_TABLE_ENABLE.defaultValue()); + TableSchema tableSchema = com.netease.arctic.flink.FlinkSchemaUtil.getPhysicalSchema(catalogTable.getSchema(), + dimTable); switch (readMode) { case ArcticValidator.ARCTIC_READ_FILE: LOG.info("build file reader"); @@ -133,6 +162,7 @@ public ArcticDynamicSink createDynamicTableSink(Context context) { ObjectIdentifier identifier = context.getObjectIdentifier(); Map options = catalogTable.getOptions(); + final String topic = options.get(TableProperties.LOG_STORE_MESSAGE_TOPIC); ArcticTableLoader tableLoader = createTableLoader( @@ -182,6 +212,11 @@ public Set> optionalOptions() { options.add(SCAN_STARTUP_SPECIFIC_OFFSETS); options.add(SCAN_STARTUP_TIMESTAMP_MILLIS); options.add(SINK_PARTITIONER); + options.add(ArcticValidator.ARCTIC_CATALOG); + options.add(ArcticValidator.ARCTIC_TABLE); + options.add(ArcticValidator.ARCTIC_DATABASE); + options.add(ArcticValidator.DIM_TABLE_ENABLE); + options.add(METASTORE_URL); return options; } diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/FlinkSource.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/FlinkSource.java index 4bf93673e0..55533a9c30 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/FlinkSource.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/FlinkSource.java @@ -44,15 +44,21 @@ import org.apache.flink.table.connector.ProviderContext; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.source.FlinkInputFormat; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.util.PropertyUtil; import java.util.HashMap; import java.util.List; import java.util.Map; +import static com.netease.arctic.flink.FlinkSchemaUtil.filterWatermark; +import static com.netease.arctic.flink.FlinkSchemaUtil.toRowType; 
+import static com.netease.arctic.flink.table.descriptors.ArcticValidator.DIM_TABLE_ENABLE; + /** * An util class create arctic source data stream. */ @@ -148,7 +154,7 @@ public DataStream build() { if (projectedSchema == null) { contextBuilder.project(arcticTable.schema()); } else { - contextBuilder.project(FlinkSchemaUtil.convert(arcticTable.schema(), projectedSchema)); + contextBuilder.project(FlinkSchemaUtil.convert(arcticTable.schema(), filterWatermark(projectedSchema))); } contextBuilder.fromProperties(properties); ArcticScanContext scanContext = contextBuilder.build(); @@ -162,9 +168,24 @@ public DataStream build() { scanContext.caseSensitive(), arcticTable.io() ); + + boolean dimTable = PropertyUtil.propertyAsBoolean(properties, DIM_TABLE_ENABLE.key(), + DIM_TABLE_ENABLE.defaultValue()); + RowType rowType; + if (projectedSchema != null) { + // If dim table is enabled, we reserve a RowTime field in Emitter. + if (dimTable) { + rowType = toRowType(projectedSchema); + } else { + rowType = toRowType(filterWatermark(projectedSchema)); + } + } else { + rowType = FlinkSchemaUtil.convert(scanContext.project()); + } + DataStreamSource sourceStream = env.fromSource( new ArcticSource<>(tableLoader, scanContext, rowDataReaderFunction, - InternalTypeInfo.of(FlinkSchemaUtil.convert(scanContext.project())), arcticTable.name()), + InternalTypeInfo.of(rowType), arcticTable.name(), dimTable), watermarkStrategy, ArcticSource.class.getName()); context.generateUid(ARCTIC_FILE_TRANSFORMATION).ifPresent(sourceStream::uid); return sourceStream; diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java index 8591592d58..cb62d11741 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/table/descriptors/ArcticValidator.java @@ -114,6 +114,30 @@ public class ArcticValidator extends ConnectorDescriptorValidator { " means this job will submit empty snapshots to the table, it is suitable with some valid reasons, e.g." 
+ " advance watermark metadata stored in the table(https://github.com/apache/iceberg/pull/5561)."); + public static final ConfigOption ARCTIC_CATALOG = + ConfigOptions.key("arctic.catalog") + .stringType() + .noDefaultValue() + .withDescription("underlying arctic catalog name."); + + public static final ConfigOption ARCTIC_DATABASE = + ConfigOptions.key("arctic.database") + .stringType() + .noDefaultValue() + .withDescription("underlying arctic database name."); + + public static final ConfigOption ARCTIC_TABLE = + ConfigOptions.key("arctic.table") + .stringType() + .noDefaultValue() + .withDescription("underlying arctic table name."); + + public static final ConfigOption DIM_TABLE_ENABLE = + ConfigOptions.key("dim-table.enable") + .booleanType() + .defaultValue(false) + .withDescription("If it is true, Arctic source will generate watermark after stock data being read"); + @Override public void validate(DescriptorProperties properties) { String emitMode = properties.getString(ARCTIC_EMIT_MODE); diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/util/ArcticUtils.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/util/ArcticUtils.java index 3a3e6f5109..3dd33178dd 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/util/ArcticUtils.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/util/ArcticUtils.java @@ -18,21 +18,21 @@ package com.netease.arctic.flink.util; +import com.netease.arctic.flink.metric.MetricsGenerator; import com.netease.arctic.flink.shuffle.LogRecordV1; import com.netease.arctic.flink.shuffle.ShuffleHelper; import com.netease.arctic.flink.table.ArcticTableLoader; import com.netease.arctic.flink.table.descriptors.ArcticValidator; import com.netease.arctic.flink.write.ArcticLogWriter; -import com.netease.arctic.flink.write.MetricsGenerator; import com.netease.arctic.flink.write.hidden.HiddenLogWriter; import com.netease.arctic.flink.write.hidden.kafka.HiddenKafkaFactory; import com.netease.arctic.table.ArcticTable; -import com.netease.arctic.table.KeyedTable; import com.netease.arctic.table.PrimaryKeySpec; import com.netease.arctic.table.TableProperties; import com.netease.arctic.utils.IdGenerator; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.RowType; import org.apache.iceberg.Schema; import org.apache.iceberg.flink.FlinkSchemaUtil; @@ -46,6 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.TimeZone; import java.util.stream.Collectors; import static com.netease.arctic.table.TableProperties.LOG_STORE_DATA_VERSION; @@ -149,4 +150,9 @@ public static boolean isToBase(boolean overwrite) { return toBase; } + public static TimestampData getCurrentTimestampData(TimeZone timeZone) { + long ts = System.currentTimeMillis(); + return TimestampData.fromEpochMillis(ts + timeZone.getOffset(ts)); + } + } diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/write/ArcticWriter.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/write/ArcticWriter.java index 5e8418140c..d7115c3bf6 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/write/ArcticWriter.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/write/ArcticWriter.java @@ -18,6 +18,7 @@ package com.netease.arctic.flink.write; +import com.netease.arctic.flink.metric.MetricsGenerator; import 
org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.metrics.Meter; import org.apache.flink.metrics.MeterView; diff --git a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/write/FlinkSink.java b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/write/FlinkSink.java index 51f62e24fc..15327750d1 100644 --- a/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/write/FlinkSink.java +++ b/flink/v1.15/flink/src/main/java/com/netease/arctic/flink/write/FlinkSink.java @@ -19,6 +19,7 @@ package com.netease.arctic.flink.write; import com.google.common.collect.Lists; +import com.netease.arctic.flink.metric.MetricsGenerator; import com.netease.arctic.flink.shuffle.RoundRobinShuffleRulePolicy; import com.netease.arctic.flink.shuffle.ShuffleHelper; import com.netease.arctic.flink.shuffle.ShuffleKey; diff --git a/flink/v1.15/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink/v1.15/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 67433e98f0..93b485b78e 100644 --- a/flink/v1.15/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink/v1.15/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -17,3 +17,4 @@ # com.netease.arctic.flink.catalog.factories.ArcticCatalogFactory +com.netease.arctic.flink.table.DynamicTableFactory diff --git a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/DynamicTableSourceTestBase.java b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/DynamicTableSourceTestBase.java new file mode 100644 index 0000000000..bc626a89f8 --- /dev/null +++ b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/DynamicTableSourceTestBase.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.netease.arctic.flink; + +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.planner.factories.TableFactoryHarness; + +import java.io.Serializable; + +public abstract class DynamicTableSourceTestBase extends TableFactoryHarness.ScanSourceBase implements + SupportsWatermarkPushDown, Serializable { + + public static final long serialVersionUID = 1L; + private WatermarkStrategy watermarkStrategy; + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.all(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { + init(); + return SourceFunctionProvider.of( + new SourceFunction() { + @Override + public void run(SourceContext ctx) { + WatermarkGenerator generator = + watermarkStrategy.createWatermarkGenerator(() -> null); + WatermarkOutput output = new TestWatermarkOutput(ctx); + doRun(generator, output, ctx); + } + + @Override + public void cancel() { + } + }, + false); + } + public void init() {}; + + public abstract void doRun(WatermarkGenerator generator, WatermarkOutput output, + SourceFunction.SourceContext ctx); + + @Override + public void applyWatermark(WatermarkStrategy watermarkStrategy) { + this.watermarkStrategy = watermarkStrategy; + } + + public class TestWatermarkOutput implements WatermarkOutput, Serializable { + public static final long serialVersionUID = 1L; + public SourceFunction.SourceContext ctx; + + public TestWatermarkOutput(SourceFunction.SourceContext ctx) { + this.ctx = ctx; + } + + @Override + public void emitWatermark(Watermark watermark) { + ctx.emitWatermark( + new org.apache.flink.streaming.api.watermark.Watermark( + watermark.getTimestamp())); + } + + @Override + public void markIdle() { + } + + @Override + public void markActive() { + } + } +} diff --git a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/FlinkTestBase.java b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/FlinkTestBase.java index ba9459e8fe..0751596aaa 100644 --- a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/FlinkTestBase.java +++ b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/FlinkTestBase.java @@ -20,6 +20,7 @@ import com.netease.arctic.TableTestBase; import com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions; +import com.netease.arctic.flink.write.KeyedRowDataTaskWriterFactory; import com.netease.arctic.io.reader.GenericArcticDataReader; import com.netease.arctic.scan.CombinedScanTask; import com.netease.arctic.scan.KeyedTableScanTask; @@ -49,12 +50,15 @@ import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; +import org.apache.iceberg.AppendFiles; import org.apache.iceberg.Schema; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.IdentityPartitionConverters; import org.apache.iceberg.data.Record; import org.apache.iceberg.flink.MiniClusterResource; import 
org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; @@ -66,6 +70,7 @@ import org.slf4j.LoggerFactory; import java.time.LocalDateTime; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; @@ -271,4 +276,26 @@ protected static RowData createRowData(Integer id, String name, String dateTime, protected static RowData createRowData(Integer id, String name, String dateTime) { return createRowData(id, name, dateTime, RowKind.INSERT); } + + protected static void commit(KeyedTable keyedTable, WriteResult result, boolean base) { + if (base) { + AppendFiles baseAppend = keyedTable.baseTable().newAppend(); + Arrays.stream(result.dataFiles()).forEach(baseAppend::appendFile); + baseAppend.commit(); + } else { + AppendFiles changeAppend = keyedTable.changeTable().newAppend(); + Arrays.stream(result.dataFiles()).forEach(changeAppend::appendFile); + changeAppend.commit(); + } + } + + protected static TaskWriter createKeyedTaskWriter(KeyedTable keyedTable, RowType rowType, long transactionId, + boolean base) { + KeyedRowDataTaskWriterFactory taskWriterFactory = + new KeyedRowDataTaskWriterFactory(keyedTable, rowType, base); + taskWriterFactory.setTransactionId(transactionId); + taskWriterFactory.setMask(3); + taskWriterFactory.initialize(0, 0); + return taskWriterFactory.create(); + } } diff --git a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/ArcticSourceTest.java b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/ArcticSourceTest.java index fe78555ac7..a82d630569 100644 --- a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/ArcticSourceTest.java +++ b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/ArcticSourceTest.java @@ -479,7 +479,8 @@ private ArcticSource initArcticSource(boolean isStreaming, String scanS arcticScanContext, rowDataReaderFunction, typeInformation, - testKeyedTable.name()); + testKeyedTable.name(), + false); } private RowDataReaderFunction initRowDataReadFunction() { diff --git a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/FlinkSourceTest.java b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/FlinkSourceTest.java index 0d326f787f..2e4daa3400 100644 --- a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/FlinkSourceTest.java +++ b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/FlinkSourceTest.java @@ -55,7 +55,7 @@ import java.util.Set; import static com.netease.arctic.flink.write.ArcticFileWriterTest.TARGET_FILE_SIZE; -import static com.netease.arctic.flink.write.ArcticFileWriterTest.createTaskWriter; +import static com.netease.arctic.flink.write.ArcticFileWriterTest.createUnkeyedTaskWriter; public class FlinkSourceTest extends FlinkTestBase { @@ -68,7 +68,7 @@ protected static void commit(WriteResult result, Table table) { } protected static void write(Collection data, Table table, RowType rowType) throws IOException { - try (TaskWriter taskWriter = createTaskWriter(table, TARGET_FILE_SIZE, fileFormat, rowType)) { + try (TaskWriter taskWriter = createUnkeyedTaskWriter(table, TARGET_FILE_SIZE, fileFormat, rowType)) { data.forEach(d -> { try { taskWriter.write(DataUtil.toRowData(d)); diff --git 
a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumStateSerializerTest.java b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumStateSerializerTest.java index 67bb00fadd..6ee0878375 100644 --- a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumStateSerializerTest.java +++ b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/hybrid/enumerator/ArcticSourceEnumStateSerializerTest.java @@ -22,6 +22,7 @@ import com.netease.arctic.flink.read.hybrid.assigner.ShuffleSplitAssigner; import com.netease.arctic.flink.read.hybrid.assigner.ShuffleSplitAssignerTest; import com.netease.arctic.flink.read.hybrid.split.ArcticSplit; +import com.netease.arctic.flink.read.hybrid.split.TemporalJoinSplits; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.junit.Assert; import org.junit.Test; @@ -44,11 +45,14 @@ public void testArcticEnumState() throws IOException { List splitList = FlinkSplitPlanner.planFullTable(testKeyedTable, new AtomicInteger()); shuffleSplitAssigner.onDiscoveredSplits(splitList); + TemporalJoinSplits splits = new TemporalJoinSplits(splitList, null); ArcticSourceEnumState expect = new ArcticSourceEnumState( shuffleSplitAssigner.state(), null, - shuffleSplitAssigner.serializePartitionIndex()); + shuffleSplitAssigner.serializePartitionIndex(), + splits + ); ArcticSourceEnumStateSerializer arcticSourceEnumStateSerializer = new ArcticSourceEnumStateSerializer(); byte[] ser = arcticSourceEnumStateSerializer.serialize(expect); @@ -80,5 +84,8 @@ public void testArcticEnumState() throws IOException { } Assert.assertEquals(splitList.size(), actualSplits.size()); + + TemporalJoinSplits temporalJoinSplits = actual.temporalJoinSplits(); + Assert.assertEquals(expect.temporalJoinSplits(), temporalJoinSplits); } } \ No newline at end of file diff --git a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/hybrid/enumerator/ContinuousSplitPlannerImplTest.java b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/hybrid/enumerator/ContinuousSplitPlannerImplTest.java index 68eafe8c2f..666931a3af 100644 --- a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/hybrid/enumerator/ContinuousSplitPlannerImplTest.java +++ b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/hybrid/enumerator/ContinuousSplitPlannerImplTest.java @@ -19,17 +19,14 @@ package com.netease.arctic.flink.read.hybrid.enumerator; import com.netease.arctic.flink.FlinkTestBase; -import com.netease.arctic.flink.write.KeyedRowDataTaskWriterFactory; import org.apache.flink.table.data.GenericRowData; import org.apache.flink.table.data.RowData; import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.types.RowKind; -import org.apache.iceberg.AppendFiles; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.io.TaskWriter; -import org.apache.iceberg.io.WriteResult; import org.junit.Before; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +36,6 @@ import java.time.LocalDateTime; import java.time.LocalTime; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicLong; @@ -71,7 +67,7 @@ public void init() throws IOException { for (RowData record : baseData) { taskWriter.write(record); } - commit(taskWriter.complete(), true); 
+ commit(testKeyedTable, taskWriter.complete(), true); } //write change insert @@ -86,7 +82,7 @@ public void init() throws IOException { for (RowData record : insert) { taskWriter.write(record); } - commit(taskWriter.complete(), true); + commit(testKeyedTable, taskWriter.complete(), true); } //write change delete @@ -102,28 +98,11 @@ public void init() throws IOException { for (RowData record : update) { taskWriter.write(record); } - commit(taskWriter.complete(), false); - } - } - - protected void commit(WriteResult result, boolean base) { - if (base) { - AppendFiles baseAppend = testKeyedTable.baseTable().newAppend(); - Arrays.stream(result.dataFiles()).forEach(baseAppend::appendFile); - baseAppend.commit(); - } else { - AppendFiles changeAppend = testKeyedTable.changeTable().newAppend(); - Arrays.stream(result.dataFiles()).forEach(changeAppend::appendFile); - changeAppend.commit(); + commit(testKeyedTable, taskWriter.complete(), false); } } protected TaskWriter createTaskWriter(boolean base) { - KeyedRowDataTaskWriterFactory taskWriterFactory = - new KeyedRowDataTaskWriterFactory(testKeyedTable, ROW_TYPE, base); - taskWriterFactory.setTransactionId(TRANSACTION_ID.getAndIncrement()); - taskWriterFactory.setMask(3); - taskWriterFactory.initialize(0, 0); - return taskWriterFactory.create(); + return createKeyedTaskWriter(testKeyedTable, ROW_TYPE, TRANSACTION_ID.getAndIncrement(), base); } } \ No newline at end of file diff --git a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/hybrid/enumerator/TemporalJoinSplitsThreadSafeTest.java b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/hybrid/enumerator/TemporalJoinSplitsThreadSafeTest.java new file mode 100644 index 0000000000..766eb6d49d --- /dev/null +++ b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/hybrid/enumerator/TemporalJoinSplitsThreadSafeTest.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.netease.arctic.flink.read.hybrid.enumerator; + +import com.netease.arctic.flink.read.hybrid.split.ArcticSplit; +import com.netease.arctic.flink.read.hybrid.split.TemporalJoinSplits; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +public class TemporalJoinSplitsThreadSafeTest { + + @Test + public void testTemporalJoinSplits() { + List allSplit = new LinkedList<>(); + for (int i = 0; i < 100; i++) { + allSplit.add(UUID.randomUUID().toString()); + } + + Collection arcticSplits = allSplit.stream().map(TestArcticSplit::of).collect(Collectors.toList()); + + for (int i = 0; i < 2; i++) { + round(allSplit, arcticSplits); + } + } + + public void round(List allSplit, Collection arcticSplits) { + TemporalJoinSplits temporalJoinSplits = new TemporalJoinSplits(arcticSplits, null); + int n = allSplit.size(); + + List s1 = new ArrayList<>(allSplit.subList(0, (int) (2.0 / 3 * n))), + s2 = new ArrayList<>(allSplit.subList((int) (1.0 / 3 * n), n)); + Collections.shuffle(s1); + Collections.shuffle(s2); + + List as = new ArrayList<>(arcticSplits); + Collections.shuffle(as); + int an = as.size(); + List as1 = new ArrayList<>(as.subList(0, (int) (2.0 / 3 * an))); + List as2 = new ArrayList<>(as.subList((int) (1.0 / 3 * an), an)); + CompletableFuture f1 = CompletableFuture.runAsync(() -> + temporalJoinSplits.removeAndReturnIfAllFinished(s1) + ); + CompletableFuture f2 = CompletableFuture.runAsync(() -> + temporalJoinSplits.addSplitsBack(as1) + ); + CompletableFuture f3 = CompletableFuture.runAsync(() -> temporalJoinSplits.removeAndReturnIfAllFinished(s2)); + CompletableFuture f4 = CompletableFuture.runAsync(() -> temporalJoinSplits.addSplitsBack(as2)); + CompletableFuture.allOf(f1, f2, f3, f4).join(); + Assert.assertTrue(temporalJoinSplits.removeAndReturnIfAllFinished(allSplit)); + } + + static class TestArcticSplit extends ArcticSplit { + private final String splitId; + + public TestArcticSplit(String splitId) { + this.splitId = splitId; + } + + public static TestArcticSplit of(String splitId) { + return new TestArcticSplit(splitId); + } + + @Override + public Integer taskIndex() { + return null; + } + + @Override + public void updateOffset(Object[] recordOffsets) { + } + + @Override + public String splitId() { + return splitId; + } + } +} \ No newline at end of file diff --git a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/hybrid/reader/RowDataReaderFunctionTest.java b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/hybrid/reader/RowDataReaderFunctionTest.java index 197ef52eaf..1c9e404eaa 100644 --- a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/hybrid/reader/RowDataReaderFunctionTest.java +++ b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/read/hybrid/reader/RowDataReaderFunctionTest.java @@ -146,7 +146,7 @@ protected void writeUpdate(List input) throws IOException { for (RowData record : input) { taskWriter.write(record); } - commit(taskWriter.complete(), false); + commit(testKeyedTable, taskWriter.complete(), false); } } diff --git a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/table/TestJoin.java b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/table/TestJoin.java new file mode 100644 index 0000000000..0d4baec17a --- /dev/null +++ 
b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/table/TestJoin.java @@ -0,0 +1,242 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.flink.table; + +import com.netease.arctic.flink.FlinkTestBase; +import com.netease.arctic.flink.util.ArcticUtils; +import com.netease.arctic.flink.util.DataUtil; +import com.netease.arctic.table.KeyedTable; +import com.netease.arctic.table.TableIdentifier; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.shaded.guava30.com.google.common.collect.Lists; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.TableSchema; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; +import org.apache.flink.util.CloseableIterator; +import org.apache.iceberg.io.TaskWriter; +import org.junit.After; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static com.netease.arctic.ams.api.MockArcticMetastoreServer.TEST_CATALOG_NAME; +import static com.netease.arctic.table.TableProperties.LOCATION; +import static org.apache.flink.table.planner.factories.TestValuesTableFactory.registerData; + +public class TestJoin extends FlinkTestBase { + + public static final Logger LOG = LoggerFactory.getLogger(TestJoin.class); + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + private static final String DB = PK_TABLE_ID.getDatabase(); + private static final String TABLE = "test_keyed"; + + public void before() { + super.before(); + super.config(); + } + + @After + public void after() { + sql("DROP TABLE IF EXISTS arcticCatalog." + DB + "." 
+ TABLE); + } + + @Test(timeout = 180000) + public void testRightEmptyLookupJoin() throws Exception { + getEnv().getCheckpointConfig().disableCheckpointing(); + List data = new LinkedList<>(); + data.add(new Object[]{RowKind.INSERT, 1000004L, "a", LocalDateTime.now()}); + data.add(new Object[]{RowKind.INSERT, 1000015L, "b", LocalDateTime.now()}); + data.add(new Object[]{RowKind.INSERT, 1000011L, "c", LocalDateTime.now()}); + data.add(new Object[]{RowKind.INSERT, 1000022L, "d", LocalDateTime.now()}); + data.add(new Object[]{RowKind.INSERT, 1000021L, "e", LocalDateTime.now()}); + data.add(new Object[]{RowKind.INSERT, 1000016L, "e", LocalDateTime.now()}); + String id = registerData(DataUtil.toRowList(data)); + sql("CREATE TABLE `user` (id bigint, name string, op_time timestamp(3), watermark for op_time as op_time) " + + "with (" + + " 'connector' = 'values'," + + " 'bounded' = 'false'," + + " 'data-id' = '" + id + "' " + + " )"); + + sql(String.format("CREATE CATALOG arcticCatalog WITH %s", toWithClause(props))); + Map tableProperties = new HashMap<>(); + tableProperties.put(LOCATION, tableDir.getAbsolutePath()); + String table = String.format("arcticCatalog.%s.%s", DB, TABLE); + + String sql = String.format("CREATE TABLE IF NOT EXISTS %s (" + + " info int, id bigint, name STRING" + + ", PRIMARY KEY (id) NOT ENFORCED) WITH %s", table, toWithClause(tableProperties)); + sql(sql); + + sql("create table d (op_time timestamp(3), watermark for op_time as op_time) like %s", table); + + TableResult result = exec("select u.name, u.id, dim.info, dim.name dname from `user` as u left join d " + + "/*+OPTIONS('streaming'='true', 'dim-table.enable'='true')*/ for system_time as of u.op_time as dim" + + " on u.id = dim.id"); + + CommonTestUtils.waitForJobStatus(result.getJobClient().get(), Lists.newArrayList(JobStatus.RUNNING)); + Set actual = new HashSet<>(); + try (CloseableIterator iterator = result.collect()) { + for (Object[] datum : data) { + Row row = iterator.next(); + actual.add(row); + } + } + result.getJobClient().ifPresent(JobClient::cancel); + + List expected = new LinkedList<>(); + expected.add(new Object[]{"a", 1000004L, null, null}); + expected.add(new Object[]{"b", 1000015L, null, null}); + expected.add(new Object[]{"c", 1000011L, null, null}); + expected.add(new Object[]{"d", 1000022L, null, null}); + expected.add(new Object[]{"e", 1000021L, null, null}); + expected.add(new Object[]{"e", 1000016L, null, null}); + Assert.assertEquals(DataUtil.toRowSet(expected), actual); + } + + @Test(timeout = 180000) + public void testLookupJoin() throws Exception { + getEnv().getCheckpointConfig().disableCheckpointing(); + List data = new LinkedList<>(); + data.add(new Object[]{RowKind.INSERT, 1L, "a", LocalDateTime.now().minusDays(3)}); + data.add(new Object[]{RowKind.INSERT, 2L, "b", LocalDateTime.now()}); + data.add(new Object[]{RowKind.INSERT, 3L, "c", LocalDateTime.now()}); + data.add(new Object[]{RowKind.INSERT, 4L, "d", LocalDateTime.now().plusDays(3)}); + data.add(new Object[]{RowKind.INSERT, 5L, "e", LocalDateTime.now().plusDays(3)}); + data.add(new Object[]{RowKind.INSERT, 3L, "e", LocalDateTime.now()}); + data.add(new Object[]{RowKind.INSERT, 6L, "f", LocalDateTime.now()}); + data.add(new Object[]{RowKind.INSERT, 8L, "g", LocalDateTime.now()}); + data.add(new Object[]{RowKind.INSERT, 9L, "h", LocalDateTime.now()}); + String id = registerData(DataUtil.toRowList(data)); + sql("CREATE TABLE `user` (id bigint, name string, op_time timestamp(3), watermark for op_time as op_time) " + + "with (" + + " 
'connector' = 'values'," + + " 'bounded' = 'false'," + + " 'data-id' = '" + id + "' " + + " )"); + + sql(String.format("CREATE CATALOG arcticCatalog WITH %s", toWithClause(props))); + Map tableProperties = new HashMap<>(); + tableProperties.put(LOCATION, tableDir.getAbsolutePath()); + String table = String.format("arcticCatalog.%s.%s", DB, TABLE); + + String sql = String.format("CREATE TABLE IF NOT EXISTS %s (" + + " info int, id bigint, name STRING" + + ", PRIMARY KEY (id) NOT ENFORCED) WITH %s", table, toWithClause(tableProperties)); + sql(sql); + + TableSchema flinkSchema = TableSchema.builder() + .field("info", DataTypes.INT()) + .field("id", DataTypes.BIGINT()) + .field("name", DataTypes.STRING()) + .build(); + RowType rowType = (RowType) flinkSchema.toRowDataType().getLogicalType(); + KeyedTable keyedTable = (KeyedTable) ArcticUtils.loadArcticTable( + ArcticTableLoader.of(TableIdentifier.of(TEST_CATALOG_NAME, DB, TABLE), catalogBuilder)); + TaskWriter taskWriter = createKeyedTaskWriter(keyedTable, rowType, 1, true); + List baseData = new ArrayList() {{ + add(GenericRowData.ofKind( + RowKind.INSERT, 123, 1L, StringData.fromString("a"))); + add(GenericRowData.ofKind( + RowKind.INSERT, 324, 2L, StringData.fromString("b"))); + add(GenericRowData.ofKind( + RowKind.INSERT, 456, 3L, StringData.fromString("c"))); + add(GenericRowData.ofKind( + RowKind.INSERT, 463, 4L, StringData.fromString("d"))); + }}; + for (RowData record : baseData) { + taskWriter.write(record); + } + commit(keyedTable, taskWriter.complete(), true); + + writeChange(keyedTable, rowType, 1); + + sql("create table d (op_time timestamp(3), watermark for op_time as op_time) like %s", table); + + TableResult result = exec("select u.name, u.id, dim.info, dim.name dname from `user` as u left join d " + + "/*+OPTIONS('streaming'='true', 'dim-table.enable'='true')*/ for system_time as of u.op_time as dim" + + " on u.id = dim.id"); + + CommonTestUtils.waitForJobStatus(result.getJobClient().get(), Lists.newArrayList(JobStatus.RUNNING)); + Set actual = new HashSet<>(); + try (CloseableIterator iterator = result.collect()) { + for (Object[] datum : data) { + Row row = iterator.next(); + actual.add(row); + } + } + result.getJobClient().ifPresent(JobClient::cancel); + + List expected = new LinkedList<>(); + expected.add(new Object[]{"a", 1L, 123, "a"}); + expected.add(new Object[]{"b", 2L, 324, "b"}); + expected.add(new Object[]{"c", 3L, null, null}); + expected.add(new Object[]{"d", 4L, 463, "d"}); + expected.add(new Object[]{"e", 5L, 324, "john"}); + expected.add(new Object[]{"e", 3L, null, null}); + expected.add(new Object[]{"f", 6L, 324, "lily"}); + expected.add(new Object[]{"g", 8L, null, null}); + expected.add(new Object[]{"h", 9L, null, null}); + Assert.assertEquals(DataUtil.toRowSet(expected), actual); + } + + private void writeChange(KeyedTable keyedTable, RowType rowType, long transactionId) { + TaskWriter taskWriter = createKeyedTaskWriter(keyedTable, rowType, transactionId, false); + List data = new ArrayList() {{ + add(GenericRowData.ofKind( + RowKind.INSERT, 324, 5L, StringData.fromString("john"))); + add(GenericRowData.ofKind( + RowKind.INSERT, 324, 6L, StringData.fromString("lily"))); + add(GenericRowData.ofKind( + RowKind.DELETE, 324, 3L, StringData.fromString("jake1"))); + }}; + try { + for (RowData record : data) { + taskWriter.write(record); + } + commit(keyedTable, taskWriter.complete(), false); + } catch (IOException e) { + throw new RuntimeException(e); + } + } +} diff --git 
a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/write/ArcticFileWriterTest.java b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/write/ArcticFileWriterTest.java index 1c3f98b4aa..1ed82366bb 100644 --- a/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/write/ArcticFileWriterTest.java +++ b/flink/v1.15/flink/src/test/java/com/netease/arctic/flink/write/ArcticFileWriterTest.java @@ -87,7 +87,7 @@ public static OneInputStreamOperatorTestHarness createArct return harness; } - public static TaskWriter createTaskWriter(Table table, long targetFileSize, FileFormat format, + public static TaskWriter createUnkeyedTaskWriter(Table table, long targetFileSize, FileFormat format, RowType rowType) { TaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory( SerializableTable.copyOf(table), rowType, targetFileSize, format, null); diff --git a/flink/v1.15/flink/src/test/resources/log4j.properties b/flink/v1.15/flink/src/test/resources/log4j.properties index 4e51092b63..f4a416762e 100644 --- a/flink/v1.15/flink/src/test/resources/log4j.properties +++ b/flink/v1.15/flink/src/test/resources/log4j.properties @@ -18,6 +18,7 @@ log4j.rootLogger=error, console log4j.logger.com.netease.arctic=info +log4j.logger.org.apache.flink=info log4j.logger.state.change.logger=fatal log4j.appender.console=org.apache.log4j.ConsoleAppender
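For reference, a minimal Flink SQL sketch of how the new dim-table.enable option is exercised, mirroring the temporal join in the added TestJoin test; the table names orders and dim and the rowtime column op_time below are illustrative placeholders, not part of this patch:

    -- orders is any probe-side stream with a rowtime attribute op_time;
    -- dim is an Arctic primary-key table registered through the Arctic catalog.
    SELECT o.id, o.name, d.info
    FROM orders AS o
    LEFT JOIN dim /*+OPTIONS('streaming'='true', 'dim-table.enable'='true')*/
      FOR SYSTEM_TIME AS OF o.op_time AS d
      ON o.id = d.id;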