
[ARCTIC-272][Flink] Sync Flink temporal join Arctic table to Flink 1.15 (#277)

SteNicholas committed Sep 2, 2022
1 parent 709de34 commit 31b5ce3
Showing 37 changed files with 1,208 additions and 66 deletions.
15 changes: 15 additions & 0 deletions flink/v1.15/flink/pom.xml
@@ -334,6 +334,21 @@
<scope>test</scope>
</dependency>

<!-- for values test connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- conflicts with the version in spring-cloud, so assign it explicitly here -->
<dependency>
<groupId>org.apache.curator</groupId>
@@ -19,11 +19,15 @@
package com.netease.arctic.flink;

import org.apache.commons.collections.CollectionUtils;
import org.apache.flink.table.api.TableColumn;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;

import java.util.List;
import java.util.function.Function;

/**
* A utility that converts Flink table schemas.
@@ -47,4 +51,75 @@ public static TableSchema toSchema(RowType rowType, List<String> primaryKeys) {
return builder.build();
}

/**
* Adds watermark info to help {@link com.netease.arctic.flink.table.FlinkSource}
* and {@link com.netease.arctic.flink.table.ArcticDynamicSource} distinguish the watermark field.
* For now, it is only used in the case of Arctic as a dim-table.
*/
public static TableSchema getPhysicalSchema(TableSchema tableSchema, boolean addWatermark) {
if (!addWatermark) {
return tableSchema;
}
TableSchema.Builder builder = filter(tableSchema, TableColumn::isPhysical);
tableSchema.getWatermarkSpecs().forEach(builder::watermark);
return builder.build();
}

/**
* Filters out the watermark field, since the watermark is a virtual field for now and is not part of the Arctic physical table.
*/
public static TableSchema filterWatermark(TableSchema tableSchema) {
List<WatermarkSpec> watermarkSpecs = tableSchema.getWatermarkSpecs();
if (watermarkSpecs.isEmpty()) {
return tableSchema;
}

Function<TableColumn, Boolean> filter = tableColumn ->
    watermarkSpecs.stream()
        .noneMatch(spec -> spec.getRowtimeAttribute().equals(tableColumn.getName()));
return filter(tableSchema, filter).build();
}

/**
* Keeps a column if the filter returns true; otherwise the column is removed.
*/
public static TableSchema.Builder filter(TableSchema tableSchema, Function<TableColumn, Boolean> filter) {
TableSchema.Builder builder = TableSchema.builder();

tableSchema
.getTableColumns()
.forEach(
tableColumn -> {
if (!filter.apply(tableColumn)) {
return;
}
builder.field(tableColumn.getName(), tableColumn.getType());
});
tableSchema
.getPrimaryKey()
.ifPresent(
uniqueConstraint ->
builder.primaryKey(
uniqueConstraint.getName(),
uniqueConstraint.getColumns().toArray(new String[0])));
return builder;
}

public static RowType toRowType(TableSchema tableSchema) {
LogicalType[] fields = new LogicalType[tableSchema.getFieldCount()];

for (int i = 0; i < fields.length; i++) {
TableColumn tableColumn = tableSchema.getTableColumn(i).get();
fields[i] = tableColumn.getType().getLogicalType();
}
return RowType.of(fields);
}

}
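
A minimal usage sketch of the helpers above (hypothetical: the enclosing class name is elided in this view and is assumed here to be FlinkSchemaUtil; the column names are invented):

// Hypothetical usage of the schema helpers added in this file.
TableSchema schema = TableSchema.builder()
    .field("id", DataTypes.BIGINT())
    .field("op_time", DataTypes.TIMESTAMP(3))
    .watermark("op_time", "op_time - INTERVAL '5' SECOND", DataTypes.TIMESTAMP(3))
    .build();

// Strip the virtual watermark column before deriving the Arctic physical schema.
TableSchema physicalSchema = FlinkSchemaUtil.filterWatermark(schema);

// Convert the remaining physical columns to the RowType used at runtime.
RowType rowType = FlinkSchemaUtil.toRowType(physicalSchema);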
@@ -39,6 +39,14 @@ private com.netease.arctic.catalog.ArcticCatalog createBaseArcticCatalog() {
return CatalogLoader.load(metastoreUrl, properties);
}

public String getMetastoreUrl() {
return metastoreUrl;
}

public Map<String, String> getProperties() {
return properties;
}

public InternalCatalogBuilder() {
}

@@ -18,16 +18,20 @@

package com.netease.arctic.flink.catalog;

import com.google.common.base.Objects;
import com.netease.arctic.NoSuchDatabaseException;
import com.netease.arctic.flink.InternalCatalogBuilder;
import com.netease.arctic.flink.catalog.factories.ArcticCatalogFactoryOptions;
import com.netease.arctic.flink.table.DynamicTableFactory;
import com.netease.arctic.flink.table.descriptors.ArcticValidator;
import com.netease.arctic.flink.util.ArcticUtils;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.PrimaryKeySpec;
import com.netease.arctic.table.TableBuilder;
import com.netease.arctic.table.TableIdentifier;
import com.netease.arctic.table.TableProperties;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.WatermarkSpec;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
@@ -68,13 +72,20 @@

import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.PROPS_BOOTSTRAP_SERVERS;
import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;

/**
* Catalog for the Arctic data lake.
*/
public class ArcticCatalog extends AbstractCatalog {
public static final String DEFAULT_DB = "default";

/**
* Used to detect 'CREATE TABLE LIKE' by checking the call stack for
* {@link org.apache.flink.table.planner.operations.SqlCreateTableConverter#lookupLikeSourceTable}.
*/
public static final String SQL_LIKE_METHOD = "lookupLikeSourceTable";

private final InternalCatalogBuilder catalogBuilder;

private com.netease.arctic.catalog.ArcticCatalog internalCatalog;
@@ -169,6 +180,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
RowType rowType = FlinkSchemaUtil.convert(arcticSchema);
Map<String, String> arcticProperties = Maps.newHashMap(table.properties());
fillTableProperties(arcticProperties);
fillTableMetaPropertiesIfLookupLike(arcticProperties, tableIdentifier);

List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
return new CatalogTableImpl(
@@ -178,6 +190,35 @@ public CatalogBaseTable getTable(ObjectPath tablePath) throws TableNotExistExcep
null);
}

/**
* For now, 'CREATE TABLE LIKE' is treated as the case in which users want to add a watermark for a temporal join,
* as an alternative to a lookup join, with the Arctic table used as the build table, i.e. the right table.
* The properties required by the temporal join are therefore filled in automatically.
* <p>
* If you don't want these properties, 'EXCLUDING ALL' is what you need.
* More details @see <a href="https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#like">LIKE</a>
*/
private void fillTableMetaPropertiesIfLookupLike(Map<String, String> properties, TableIdentifier tableIdentifier) {
StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace();
boolean isLookupLike = false;
for (StackTraceElement stackTraceElement : stackTraceElements) {
if (Objects.equal(SQL_LIKE_METHOD, stackTraceElement.getMethodName())) {
isLookupLike = true;
break;
}
}

if (!isLookupLike) {
return;
}

properties.put(CONNECTOR.key(), DynamicTableFactory.IDENTIFIER);
properties.put(ArcticValidator.ARCTIC_CATALOG.key(), tableIdentifier.getCatalog());
properties.put(ArcticValidator.ARCTIC_TABLE.key(), tableIdentifier.getTableName());
properties.put(ArcticValidator.ARCTIC_DATABASE.key(), tableIdentifier.getDatabase());
properties.put(ArcticCatalogFactoryOptions.METASTORE_URL.key(), catalogBuilder.getMetastoreUrl());
}
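
As a usage illustration (a hedged sketch: the catalog, database, and table names are invented), a watermarked build table for a temporal join can be derived via 'CREATE TABLE LIKE'; the method above then injects the connector, catalog, and metastore properties automatically:

// Hypothetical Flink SQL usage; identifiers are illustrative only.
tableEnv.executeSql(
    "CREATE TABLE dim_users (" +
    "  WATERMARK FOR op_time AS op_time" +
    ") LIKE arctic_catalog.db.users");

To opt out of the injected properties, derive the table with "LIKE arctic_catalog.db.users (EXCLUDING ALL)" instead, as the javadoc above notes.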

private static List<String> toPartitionKeys(PartitionSpec spec, Schema icebergSchema) {
List<String> partitionKeys = Lists.newArrayList();
for (PartitionField field : spec.fields()) {
@@ -253,7 +294,20 @@ public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ig
validateFlinkTable(table);

TableSchema tableSchema = table.getSchema();
Schema icebergSchema = FlinkSchemaUtil.convert(tableSchema);
TableSchema.Builder builder = TableSchema.builder();

// The watermark column is virtual for now, so exclude it from the Arctic physical schema.
tableSchema.getTableColumns().forEach(column -> {
List<WatermarkSpec> watermarkSpecs = tableSchema.getWatermarkSpecs();
for (WatermarkSpec spec : watermarkSpecs) {
if (spec.getRowtimeAttribute().equals(column.getName())) {
return;
}
}
builder.field(column.getName(), column.getType());
});
TableSchema tableSchemaWithoutWatermark = builder.build();

Schema icebergSchema = FlinkSchemaUtil.convert(tableSchemaWithoutWatermark);

TableBuilder tableBuilder = internalCatalog.newTableBuilder(
getTableIdentifier(tablePath), icebergSchema);
@@ -295,9 +349,6 @@ private static void validateFlinkTable(CatalogBaseTable table) {
}
});

if (!schema.getWatermarkSpecs().isEmpty()) {
throw new UnsupportedOperationException("Creating table with watermark specs is not supported yet.");
}
}

@Override
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.flink.metric;

/**
* Metric constants.
*/
public class MetricConstant {

/**
* The start time of the Arctic table's initialization when it is used as the build table in a temporal join.
*/
public static final String TEMPORAL_TABLE_INITIALIZATION_START_TIMESTAMP =
"temporalTableInitializationStartTimestamp";
/**
* The end time of the Arctic table's initialization when it is used as the build table in a temporal join.
*/
public static final String TEMPORAL_TABLE_INITIALIZATION_END_TIMESTAMP = "temporalTableInitializationEndTimestamp";

}
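
A sketch of how these constants might be reported (hypothetical wiring, not part of this commit; metricGroup is assumed to come from the reader's runtime context):

// Hypothetical: expose both timestamps as gauges on a Flink MetricGroup.
long initStart = System.currentTimeMillis();
metricGroup.gauge(
    MetricConstant.TEMPORAL_TABLE_INITIALIZATION_START_TIMESTAMP,
    (Gauge<Long>) () -> initStart);

// ... load the initial snapshot of the Arctic build table ...

long initEnd = System.currentTimeMillis();
metricGroup.gauge(
    MetricConstant.TEMPORAL_TABLE_INITIALIZATION_END_TIMESTAMP,
    (Gauge<Long>) () -> initEnd);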
@@ -16,7 +16,7 @@
* limitations under the License.
*/

package com.netease.arctic.flink.write;
package com.netease.arctic.flink.metric;

import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
@@ -385,8 +385,8 @@ private static class MicrosToTimestampTzReader extends ParquetValueReaders.Unbox
public TimestampData read(TimestampData ignored) {
long value = readLong();
return TimestampData.fromLocalDateTime(Instant.ofEpochSecond(
Math.floorDiv(value, 1000_000),
Math.floorMod(value, 1000_000) * 1000)
Math.floorDiv(value, 1000_000L),
Math.floorMod(value, 1000_000L) * 1000)
.atOffset(ZoneOffset.UTC)
.toLocalDateTime());
}
@@ -406,8 +406,8 @@ private static class MicrosToTimestampReader extends ParquetValueReaders.Unboxed
public TimestampData read(TimestampData ignored) {
long value = readLong();
return TimestampData.fromInstant(Instant.ofEpochSecond(
Math.floorDiv(value, 1000_000),
Math.floorMod(value, 1000_000) * 1000));
Math.floorDiv(value, 1000_000L),
Math.floorMod(value, 1000_000L) * 1000));
}

@Override
@@ -478,7 +478,7 @@ private static class LossyMicrosToMillisTimeReader extends ParquetValueReaders.P
@Override
public Integer read(Integer reuse) {
// Discard microseconds since Flink uses millisecond unit for TIME type.
return (int) Math.floorDiv(column.nextLong(), 1000);
return (int) Math.floorDiv(column.nextLong(), 1000L);
}
}
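
The long literals keep the floorDiv/floorMod arithmetic in 64 bits; the pair splits a microsecond epoch value into whole seconds plus a non-negative sub-second remainder, which also holds for negative timestamps. A worked check (illustrative, not part of this commit):

// import java.time.Instant;
long micros = -1L;                                      // 1 µs before the epoch
long seconds = Math.floorDiv(micros, 1_000_000L);       // -1
long nanos = Math.floorMod(micros, 1_000_000L) * 1000;  // 999_999 µs -> 999_999_000 ns
Instant instant = Instant.ofEpochSecond(seconds, nanos);
// instant == 1969-12-31T23:59:59.999999Z, i.e. exactly 1 µs before the epoch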

@@ -44,6 +44,10 @@

/**
* Arctic source based on FLIP-27.
* <p>
* If ArcticSource is used as the build table in a lookup join, the join is implemented as a temporal join.
* Both sources should use processing time as the watermark.
* ArcticSource generates its watermark after the first splits planned by ArcticSourceEnumerator have finished.
*/
public class ArcticSource<T> implements Source<T, ArcticSplit, ArcticSourceEnumState>, ResultTypeQueryable<T> {
private static final long serialVersionUID = 1L;
@@ -54,14 +58,20 @@ public class ArcticSource<T> implements Source<T, ArcticSplit, ArcticSourceEnumS
private final TypeInformation<T> typeInformation;
private final ArcticTableLoader loader;
private final String tableName;
/**
* Whether to generate an Arctic watermark. This is only used when an Arctic table is lookup-joined
* as the build table, i.e. the right table.
*/
private final boolean dimTable;

public ArcticSource(ArcticTableLoader loader, ArcticScanContext scanContext, ReaderFunction<T> readerFunction,
TypeInformation<T> typeInformation, String tableName) {
TypeInformation<T> typeInformation, String tableName, boolean dimTable) {
this.loader = loader;
this.scanContext = scanContext;
this.readerFunction = readerFunction;
this.typeInformation = typeInformation;
this.tableName = tableName;
this.dimTable = dimTable;
}

@Override
@@ -71,7 +81,7 @@ public Boundedness getBoundedness() {

@Override
public SourceReader<T, ArcticSplit> createReader(SourceReaderContext readerContext) throws Exception {
return new ArcticSourceReader<>(readerFunction, readerContext.getConfiguration(), readerContext);
return new ArcticSourceReader<>(readerFunction, readerContext.getConfiguration(), readerContext, dimTable);
}

@Override
@@ -93,7 +103,7 @@ private SplitEnumerator<ArcticSplit, ArcticSourceEnumState> createEnumerator(
}

if (scanContext.isStreaming()) {
return new ArcticSourceEnumerator(enumContext, splitAssigner, loader, scanContext, enumState);
return new ArcticSourceEnumerator(enumContext, splitAssigner, loader, scanContext, enumState, dimTable);
} else {
return new StaticArcticSourceEnumerator(enumContext, splitAssigner, loader, scanContext, null);
}

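A construction sketch for the extended constructor (hypothetical: loader, scanContext, readerFunction, and typeInfo are assumed to be built elsewhere; only the new dimTable flag is of interest):

// Hypothetical wiring of an ArcticSource used as the build table of a temporal join.
ArcticSource<RowData> source = new ArcticSource<>(
    loader,          // ArcticTableLoader
    scanContext,     // ArcticScanContext, streaming mode
    readerFunction,  // ReaderFunction<RowData>
    typeInfo,        // TypeInformation<RowData>
    "users",         // table name, illustrative
    true);           // dimTable: generate the temporal-join watermark

DataStream<RowData> stream = env.fromSource(
    source, WatermarkStrategy.noWatermarks(), "ArcticSource-users");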