[FLINK-34876][transform] Support UDF functions in transform #3465
I'm having some difficulty finding Calcite-related materials; maybe @aiwenmo could provide some expert advice?
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/JaninoCompilerTest.java
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
598f406 to 2ca3e97
Adjusted based on previous discussions, cleaned up changes, and added UDF-related docs.
f9368db to cb6fa92
Rebased to latest. Minor concern: with the added E2E test cases, the E2E suite now takes nearly 1h 30min to finish, which is worrying since the current timeout limit is configured to 90 minutes. Maybe something like #3514 is necessary?
flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/definition/UdfDef.java
flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
...ain/java/org/apache/flink/cdc/runtime/operators/transform/UserDefinedFunctionDescriptor.java
default DataType getReturnType() {
    return null;
}
Suggested change:
- default DataType getReturnType() {
-     return null;
- }
+ default Optional<DataType> getReturnType() {
+     return Optional.empty();
+ }
I have some doubts about this. This is meant to be an interface protocol, and any reasonable override of this method should not return an empty value.
It seems CI was failing due to an expired link in the Doris docs. Rebased & fixed.
# Conflicts:
#   flink-cdc-composer/pom.xml
#   flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
#   flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java
#   flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperator.java
#   flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/PreTransformOperator.java
#   flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/ProjectionColumnProcessor.java
#   flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformFilterProcessor.java
#   flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/transform/TransformProjectionProcessor.java
#   flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
#   flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
Rebased with
LGTM
@yuxiqian Hi, I have been trying to run an example with this new feature, but I always get an error. I tried loading the UDF using the path in the PR description, as well as the package name. I imported the function by downloading the pre-packaged jar. Please advise on what I am missing. Thanks.
Hi @mamineturki, have you tried putting
Thanks for the speedy response, @yuxiqian. Passing the jar using
Hi @yuxiqian, I have another question, please.
Hi @mamineturki, unfortunately it's not possible now, as YAML UDF is basically equivalent to Flink's (For
This closes FLINK-34876. It aims to introduce customized logic into transform expressions, with some key features:

- `open` & `close` lifecycle hooks
- Flink `ScalarFunction` compatibility

One may add a `UDF` block to an existing pipeline definition and call the functions in transform expressions, where the `classpath` value either implements CDC's `org.apache.flink.cdc.common.udf.UserDefinedFunction` interface or extends Flink's `org.apache.flink.table.functions.ScalarFunction` abstract class.

`UserDefinedFunction` is an interface whose `getReturnType`, `open`, and `close` methods are optional to override.

A valid `UserDefinedFunction` implementation must:

- provide an `eval` method that doesn't return void

A `UserDefinedFunction` implementation may:

- override the `getReturnType` method to declare its return type; the type is derived from the `eval` signature if not provided.
- override the `open` and `close` lifecycle hooks to initialize / clean up necessary resources. `props` defined in YAML will be passed in `UserDefinedFunctionContext` as an argument.

The hooks are invoked in `TransformDataOperator` (or `PostTransformOperator` after FLINK-35272). A `UserDefinedFunction` might be constructed temporarily elsewhere, so make sure resources are handled in `open` and `close` instead of constructors.

`eval` method overloading is supported, and runtime invocations will be dispatched to the correct one.

Flink `ScalarFunction` is supported with the following limitations:

- Type information (`TypeInformation`) will be ignored.
- `ScalarFunction`s with arguments are not supported.
- Declared `open` and `close` methods will be ignored.