diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index dfcbb4fc209..9217b65841d 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.composer.flink; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.configuration.Configuration; import org.apache.flink.cdc.common.event.Event; @@ -25,7 +26,6 @@ import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.sink.DataSink; -import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor; import org.apache.flink.cdc.composer.PipelineComposer; import org.apache.flink.cdc.composer.PipelineExecution; import org.apache.flink.cdc.composer.definition.PipelineDef; @@ -98,12 +98,9 @@ public PipelineExecution compose(PipelineDef pipelineDef) { int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM); env.getConfig().setParallelism(parallelism); - List udfFunctions = + List> udfFunctions = pipelineDef.getUdfs().stream() - .map( - udf -> - new UserDefinedFunctionDescriptor( - udf.getName(), udf.getClassPath())) + .map(udf -> Tuple2.of(udf.getName(), udf.getClassPath())) .collect(Collectors.toList()); SchemaChangeBehavior schemaChangeBehavior = diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java index f4a65e9a932..c834cb70e2d 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java @@ -17,8 +17,8 @@ package org.apache.flink.cdc.composer.flink.translator; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cdc.common.event.Event; -import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor; import org.apache.flink.cdc.composer.definition.TransformDef; import org.apache.flink.cdc.runtime.operators.transform.TransformDataOperator; import org.apache.flink.cdc.runtime.operators.transform.TransformSchemaOperator; @@ -37,7 +37,7 @@ public class TransformTranslator { public DataStream translateSchema( DataStream input, List transforms, - List udfFunctions) { + List> udfFunctions) { if (transforms.isEmpty()) { return input; } @@ -64,7 +64,7 @@ public DataStream translateData( List transforms, OperatorID schemaOperatorID, String timezone, - List udfFunctions) { + List> udfFunctions) { if (transforms.isEmpty()) { return input; } diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/FormatFunctionClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/FormatFunctionClass.scala index 49cbffff9df..f44c4727366 100644 --- a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/FormatFunctionClass.scala +++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/FormatFunctionClass.scala @@ -23,5 +23,5 @@ import scala.annotation.varargs /** This is an example UDF class for testing purposes only. */ class FormatFunctionClass extends UserDefinedFunction { - @varargs def eval(format: String, args: Object*): String = String.format(format, args:_*) + @varargs def eval(format: String, args: Object*): String = String.format(format, args: _*) } diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/LifecycleFunctionClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/LifecycleFunctionClass.scala index 7cd9c2ee653..60b5a695666 100644 --- a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/LifecycleFunctionClass.scala +++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/LifecycleFunctionClass.scala @@ -21,7 +21,7 @@ import org.apache.flink.cdc.common.udf.UserDefinedFunction /** This is an example UDF class for testing purposes only. */ class LifecycleFunctionClass extends UserDefinedFunction { - private var counter: Integer = null + private var counter: Integer = 0 def eval: String = { "#" + { diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/TypeHintFunctionClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/TypeHintFunctionClass.scala index 8c039e545db..ce3049393dc 100644 --- a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/TypeHintFunctionClass.scala +++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/cdc/udf/examples/scala/TypeHintFunctionClass.scala @@ -24,7 +24,7 @@ import org.apache.flink.cdc.common.udf.UserDefinedFunction /** This is an example UDF class for testing purposes only. */ class TypeHintFunctionClass extends UserDefinedFunction { - override def getReturnType: DataType = DataTypes.STRING() + override def getReturnType: DataType = DataTypes.STRING def eval: Object = { // Return type could not be inferred from function signature diff --git a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/udf/examples/scala/FormatFunctionClass.scala b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/udf/examples/scala/FormatFunctionClass.scala index 7c01f972623..34ddff91d3f 100644 --- a/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/udf/examples/scala/FormatFunctionClass.scala +++ b/flink-cdc-pipeline-udf-examples/src/main/scala/org/apache/flink/udf/examples/scala/FormatFunctionClass.scala @@ -23,5 +23,5 @@ import scala.annotation.varargs /** This is an example UDF class for testing purposes only. */ class FormatFunctionClass extends ScalarFunction { - @varargs def eval(format: String, args: Object*): String = String.format(format, args:_*) + @varargs def eval(format: String, args: Object*): String = String.format(format, args: _*) } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java index 87098df2e10..98ac265e63e 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java @@ -20,7 +20,6 @@ import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.schema.Column; -import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor; import org.apache.flink.cdc.runtime.parser.JaninoCompiler; import org.apache.flink.cdc.runtime.parser.TransformParser; import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.java index fb7885bf6c4..a3c8b0ab61a 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformDataOperator.java @@ -29,7 +29,6 @@ import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.schema.Selectors; -import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor; import org.apache.flink.cdc.common.utils.SchemaUtils; import org.apache.flink.cdc.common.utils.StringUtils; import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient; @@ -73,7 +72,8 @@ public class TransformDataOperator extends AbstractStreamOperator /** keep the relationship of TableId and table information. */ private final Map tableInfoMap; - private final List udfFunctions; + private final List> udfFunctions; + private List udfDescriptors; private transient Map udfFunctionInstances; private transient Map, TransformProjectionProcessor> @@ -90,7 +90,7 @@ public static class Builder { private final List> transformRules = new ArrayList<>(); private OperatorID schemaOperatorID; private String timezone; - private List udfFunctions = new ArrayList<>(); + private List> udfFunctions = new ArrayList<>(); public TransformDataOperator.Builder addTransform( String tableInclusions, @Nullable String projection, @Nullable String filter) { @@ -113,7 +113,7 @@ public TransformDataOperator.Builder addTimezone(String timezone) { } public TransformDataOperator.Builder addUdfFunctions( - List udfFunctions) { + List> udfFunctions) { this.udfFunctions.addAll(udfFunctions); return this; } @@ -128,7 +128,7 @@ private TransformDataOperator( List> transformRules, OperatorID schemaOperatorID, String timezone, - List udfFunctions) { + List> udfFunctions) { this.transformRules = transformRules; this.schemaOperatorID = schemaOperatorID; this.timezone = timezone; @@ -149,6 +149,13 @@ public void setup( new SchemaEvolutionClient( containingTask.getEnvironment().getOperatorCoordinatorEventGateway(), schemaOperatorID); + udfDescriptors = + udfFunctions.stream() + .map( + udf -> { + return new UserDefinedFunctionDescriptor(udf.f0, udf.f1); + }) + .collect(Collectors.toList()); } @Override @@ -169,7 +176,7 @@ public void open() throws Exception { return new Tuple4<>( selectors, TransformProjection.of(projection), - TransformFilter.of(filterExpression, udfFunctions), + TransformFilter.of(filterExpression, udfDescriptors), containFilteredComputedColumn( projection, filterExpression)); }) @@ -177,7 +184,7 @@ public void open() throws Exception { this.transformFilterProcessorMap = new ConcurrentHashMap<>(); this.transformProjectionProcessorMap = new ConcurrentHashMap<>(); this.udfFunctionInstances = new ConcurrentHashMap<>(); - udfFunctions.forEach( + udfDescriptors.forEach( udf -> { try { Class clazz = Class.forName(udf.getClassPath()); @@ -270,7 +277,7 @@ private void transformSchema(TableId tableId, Schema schema) throws Exception { Tuple2.of(tableId, transformProjection), TransformProjectionProcessor.of( transformProjection, - udfFunctions, + udfDescriptors, getUdfFunctionInstances())); } TransformProjectionProcessor transformProjectionProcessor = @@ -284,7 +291,7 @@ private void transformSchema(TableId tableId, Schema schema) throws Exception { } private List getUdfFunctionInstances() { - return udfFunctions.stream() + return udfDescriptors.stream() .map(e -> udfFunctionInstances.get(e.getName())) .collect(Collectors.toList()); } @@ -316,7 +323,7 @@ private Optional processDataChangeEvent(DataChangeEvent dataCha getTableInfoFromSchemaEvolutionClient(tableId), transformProjection, timezone, - udfFunctions, + udfDescriptors, getUdfFunctionInstances())); } TransformProjectionProcessor transformProjectionProcessor = @@ -340,7 +347,7 @@ private Optional processDataChangeEvent(DataChangeEvent dataCha getTableInfoFromSchemaEvolutionClient(tableId), transformFilter, timezone, - udfFunctions, + udfDescriptors, getUdfFunctionInstances())); } TransformFilterProcessor transformFilterProcessor = @@ -367,7 +374,7 @@ private Optional processDataChangeEvent(DataChangeEvent dataCha getTableInfoFromSchemaEvolutionClient(tableId), transformProjection, timezone, - udfFunctions, + udfDescriptors, getUdfFunctionInstances())); } TransformProjectionProcessor transformProjectionProcessor = @@ -463,7 +470,7 @@ private void clearOperator() { } private void initializeUdf() { - udfFunctions.forEach( + udfDescriptors.forEach( udf -> { try { if (udf.isCdcPipelineUdf()) { @@ -484,7 +491,7 @@ private void initializeUdf() { } private void destroyUdf() { - udfFunctions.forEach( + udfDescriptors.forEach( udf -> { try { if (udf.isCdcPipelineUdf()) { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java index c549935d32b..2446ce9845d 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformExpressionCompiler.java @@ -18,7 +18,6 @@ package org.apache.flink.cdc.runtime.operators.transform; import org.apache.flink.api.common.InvalidProgramException; -import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.shaded.guava31.com.google.common.cache.Cache; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java index bbfe99a56c5..3bb3cee99d1 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilter.java @@ -17,7 +17,6 @@ package org.apache.flink.cdc.runtime.operators.transform; -import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor; import org.apache.flink.cdc.common.utils.StringUtils; import org.apache.flink.cdc.runtime.parser.TransformParser; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java index 0681d467833..dd4fd562239 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java @@ -20,7 +20,6 @@ import org.apache.flink.cdc.common.data.RecordData; import org.apache.flink.cdc.common.data.binary.BinaryRecordData; import org.apache.flink.cdc.common.schema.Column; -import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor; import org.apache.flink.cdc.runtime.parser.JaninoCompiler; import org.apache.flink.cdc.runtime.parser.TransformParser; import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java index 418fc0df994..036257e8b08 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java @@ -22,7 +22,6 @@ import org.apache.flink.cdc.common.event.CreateTableEvent; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; -import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor; import org.apache.flink.cdc.runtime.parser.TransformParser; import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java index c0e681fcd1b..31ea500d895 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformSchemaOperator.java @@ -30,14 +30,16 @@ import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.schema.Selectors; -import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor; import org.apache.flink.cdc.common.utils.SchemaUtils; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.StreamTask; import javax.annotation.Nullable; @@ -60,7 +62,8 @@ public class TransformSchemaOperator extends AbstractStreamOperator private transient Map processorMap; private final List> schemaMetadataTransformers; private transient ListState state; - private final List udfFunctions; + private final List> udfFunctions; + private List udfDescriptors; public static TransformSchemaOperator.Builder newBuilder() { return new TransformSchemaOperator.Builder(); @@ -71,7 +74,7 @@ public static class Builder { private final List> transformRules = new ArrayList<>(); - private final List udfFunctions = new ArrayList<>(); + private final List> udfFunctions = new ArrayList<>(); public TransformSchemaOperator.Builder addTransform( String tableInclusions, @@ -85,7 +88,7 @@ public TransformSchemaOperator.Builder addTransform( } public TransformSchemaOperator.Builder addUdfFunctions( - List udfFunctions) { + List> udfFunctions) { this.udfFunctions.addAll(udfFunctions); return this; } @@ -97,7 +100,7 @@ public TransformSchemaOperator build() { private TransformSchemaOperator( List> transformRules, - List udfFunctions) { + List> udfFunctions) { this.transformRules = transformRules; this.tableChangeInfoMap = new ConcurrentHashMap<>(); this.processorMap = new ConcurrentHashMap<>(); @@ -106,6 +109,18 @@ private TransformSchemaOperator( this.udfFunctions = udfFunctions; } + @Override + public void setup( + StreamTask containingTask, + StreamConfig config, + Output> output) { + super.setup(containingTask, config, output); + this.udfDescriptors = + this.udfFunctions.stream() + .map(udf -> new UserDefinedFunctionDescriptor(udf.f0, udf.f1)) + .collect(Collectors.toList()); + } + @Override public void open() throws Exception { super.open(); @@ -233,7 +248,7 @@ private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableE tableId, TransformProjectionProcessor.of( transformProjection, - udfFunctions, + udfDescriptors, Collections.emptyList())); } TransformProjectionProcessor transformProjectionProcessor = @@ -293,7 +308,7 @@ private DataChangeEvent processProjection( TransformProjectionProcessor.of( tableChangeInfo, transformProjection, - udfFunctions, + udfDescriptors, Collections.emptyList())); } TransformProjectionProcessor transformProjectionProcessor = processorMap.get(tableId); diff --git a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/udf/UserDefinedFunctionDescriptor.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/UserDefinedFunctionDescriptor.java similarity index 97% rename from flink-cdc-common/src/main/java/org/apache/flink/cdc/common/udf/UserDefinedFunctionDescriptor.java rename to flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/UserDefinedFunctionDescriptor.java index 4e85b350b49..b168795ed91 100644 --- a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/udf/UserDefinedFunctionDescriptor.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/UserDefinedFunctionDescriptor.java @@ -15,10 +15,11 @@ * limitations under the License. */ -package org.apache.flink.cdc.common.udf; +package org.apache.flink.cdc.runtime.operators.transform; import org.apache.flink.cdc.common.annotation.Internal; import org.apache.flink.cdc.common.types.DataType; +import org.apache.flink.cdc.common.udf.UserDefinedFunction; import java.io.Serializable; import java.lang.reflect.InvocationTargetException; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java index 980c8e29260..be70e8a0f49 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java @@ -19,8 +19,8 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.io.ParseException; -import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor; import org.apache.flink.cdc.common.utils.StringUtils; +import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor; import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; import org.apache.calcite.sql.SqlBasicCall; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index 79ff53418e0..e4d4ecd13e2 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -21,9 +21,9 @@ import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; -import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor; import org.apache.flink.cdc.common.utils.StringUtils; import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn; +import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor; import org.apache.flink.cdc.runtime.parser.metadata.TransformSchemaFactory; import org.apache.flink.cdc.runtime.parser.metadata.TransformSqlOperatorTable; import org.apache.flink.cdc.runtime.typeutils.DataTypeConverter; diff --git a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/udf/UserDefinedFunctionDescriptorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UserDefinedFunctionDescriptorTest.java similarity index 87% rename from flink-cdc-common/src/test/java/org/apache/flink/cdc/common/udf/UserDefinedFunctionDescriptorTest.java rename to flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UserDefinedFunctionDescriptorTest.java index dcda5dc86cf..c139ff5486b 100644 --- a/flink-cdc-common/src/test/java/org/apache/flink/cdc/common/udf/UserDefinedFunctionDescriptorTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/UserDefinedFunctionDescriptorTest.java @@ -15,10 +15,11 @@ * limitations under the License. */ -package org.apache.flink.cdc.common.udf; +package org.apache.flink.cdc.runtime.operators.transform; import org.apache.flink.cdc.common.types.DataType; import org.apache.flink.cdc.common.types.DataTypes; +import org.apache.flink.cdc.common.udf.UserDefinedFunction; import org.apache.flink.table.functions.ScalarFunction; import org.junit.jupiter.api.Test; @@ -54,7 +55,7 @@ void testUserDefinedFunctionDescriptor() { .containsExactly( "cdc_udf", "UserDefinedFunctionDescriptorTest$CdcUdf", - "org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptorTest$CdcUdf", + "org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptorTest$CdcUdf", null, true); @@ -65,7 +66,7 @@ void testUserDefinedFunctionDescriptor() { .containsExactly( "cdc_udf_with_type_hint", "UserDefinedFunctionDescriptorTest$CdcUdfWithTypeHint", - "org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptorTest$CdcUdfWithTypeHint", + "org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptorTest$CdcUdfWithTypeHint", DataTypes.TIMESTAMP_LTZ(9), true); @@ -74,7 +75,7 @@ void testUserDefinedFunctionDescriptor() { .containsExactly( "flink_udf", "UserDefinedFunctionDescriptorTest$FlinkUdf", - "org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptorTest$FlinkUdf", + "org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptorTest$FlinkUdf", null, false); @@ -82,7 +83,7 @@ void testUserDefinedFunctionDescriptor() { () -> new UserDefinedFunctionDescriptor("not_udf", NotUDF.class.getName())) .isExactlyInstanceOf(IllegalArgumentException.class) .hasMessage( - "Failed to detect UDF class class org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptorTest$NotUDF " + "Failed to detect UDF class class org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptorTest$NotUDF " + "since it never implements interface org.apache.flink.cdc.common.udf.UserDefinedFunction or " + "extends Flink class org.apache.flink.table.functions.ScalarFunction."); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java index 254ba1aaa79..50500655491 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java @@ -19,7 +19,7 @@ import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.cdc.common.types.DataTypes; -import org.apache.flink.cdc.common.udf.UserDefinedFunctionDescriptor; +import org.apache.flink.cdc.runtime.operators.transform.UserDefinedFunctionDescriptor; import org.apache.flink.cdc.runtime.parser.metadata.TransformSchemaFactory; import org.apache.flink.cdc.runtime.parser.metadata.TransformSqlOperatorTable;