From 11c427df7262bccd37b99e16b6d48205ee14cbae Mon Sep 17 00:00:00 2001
From: Martijn Visser <2989614+MartijnVisser@users.noreply.github.com>
Date: Thu, 11 Apr 2024 10:15:31 +0200
Subject: [PATCH] [FLINK-35007] Add support for Flink 1.19 (#90)

* [FLINK-35007] Add support for Flink 1.19

This also includes dropping the weekly tests for the `v3.0` branch, since
`v3.1` has been released and that's our main version going forward.

* [FLINK-35007] Remove unused test class that relied on removed Internal class

* [FLINK-35007][ci] Copy old `flink-conf.yaml` to make sure that all Python
  tests work for Flink 1.x releases

(cherry picked from commit 897001d5682a0708042d59be81a10485ffd0dde7)
---
 .github/workflows/push_pr.yml            |   4 +-
 .github/workflows/weekly.yml             |  17 +-
 .../kafka/testutils/DataGenerators.java  |  29 --
 flink-python/dev/integration_test.sh     |  18 +
 flink-python/pyflink/flink-conf.yaml     | 311 ++++++++++++++++++
 5 files changed, 344 insertions(+), 35 deletions(-)
 create mode 100644 flink-python/pyflink/flink-conf.yaml

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index 00e2f788d..7f30c691e 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -30,7 +30,7 @@ jobs:
       include:
         - flink: 1.18.1
           jdk: '8, 11, 17'
-        - flink: 1.19-SNAPSHOT
+        - flink: 1.19.0
           jdk: '8, 11, 17, 21'
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
@@ -39,7 +39,7 @@ jobs:
   python_test:
     strategy:
       matrix:
-        flink: [ 1.17.2, 1.18.1 ]
+        flink: [ 1.17.2, 1.18.1, 1.19.0 ]
     uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
     with:
      flink_version: ${{ matrix.flink }}
\ No newline at end of file
diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml
index e6bf27dda..f24769ae9 100644
--- a/.github/workflows/weekly.yml
+++ b/.github/workflows/weekly.yml
@@ -38,11 +38,20 @@ jobs:
           jdk: '8, 11, 17, 21',
           branch: main
         }, {
-          flink: 1.17.1,
-          branch: v3.0
+          flink: 1.20-SNAPSHOT,
+          jdk: '8, 11, 17, 21',
+          branch: main
+        }, {
+          flink: 1.17.2,
+          branch: v3.1
         }, {
-          flink: 1.18.0,
-          branch: v3.0
+          flink: 1.18.1,
+          jdk: '8, 11, 17',
+          branch: v3.1
+        }, {
+          flink: 1.19.0,
+          branch: v3.1,
+          jdk: '8, 11, 17',
         }]
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 83ee3fb1c..d660bd2f1 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -18,16 +18,13 @@

 package org.apache.flink.streaming.connectors.kafka.testutils;

-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
@@ -36,8 +33,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

-import java.util.Collections;
-import java.util.List;
 import java.util.Properties;
 import java.util.Random;

@@ -210,29 +205,5 @@ public void shutdown() {
         public Throwable getError() {
             return this.error;
         }
-
-        private static class MockTransformation extends Transformation<String> {
-            public MockTransformation() {
-                super("MockTransform", BasicTypeInfo.STRING_TYPE_INFO, 1);
-            }
-
-            @Override
-            public List<Transformation<?>> getTransitivePredecessors() {
-                return null;
-            }
-
-            @Override
-            public List<Transformation<?>> getInputs() {
-                return Collections.emptyList();
-            }
-        }
-
-        private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
-
-            @Override
-            public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
-                return null;
-            }
-        }
     }
 }
diff --git a/flink-python/dev/integration_test.sh b/flink-python/dev/integration_test.sh
index 19816725a..45f66482a 100755
--- a/flink-python/dev/integration_test.sh
+++ b/flink-python/dev/integration_test.sh
@@ -50,5 +50,23 @@ echo "Checking ${FLINK_SOURCE_DIR} for 'pyflink_gateway_server.py'"
 find "${FLINK_SOURCE_DIR}/flink-python" -name pyflink_gateway_server.py
 find "${FLINK_SOURCE_DIR}/flink-python/.tox" -name pyflink_gateway_server.py -exec cp "${FLINK_SOURCE_DIR}/flink-python/pyflink/pyflink_gateway_server.py" {} \;

+# Copy a default flink-conf.yaml to the conf/ folder, so that all Python tests on Flink 1.x can succeed.
+# This needs to be changed when adding support for Flink 2.0
+echo "Checking ${FLINK_SOURCE_DIR} for 'config.yaml'"
+find "${FLINK_SOURCE_DIR}/flink-python" -name config.yaml
+
+# For every occurrence of config.yaml (the new configuration file introduced in Flink 1.19), copy in the
+# old flink-conf.yaml so that it is used instead of the new config.yaml file.
+#
+# Because our intention is to copy `flink-conf.yaml` into the same directory as `config.yaml` and not replace it,
+# we need to extract the directory from `{}` and then specify the target filename (`flink-conf.yaml`) explicitly.
+# Unfortunately, `find`'s `-exec` doesn't directly support manipulating `{}`, so we use a small inline shell command.
+#
+# `"${1}"` and `"${2}"` correspond to the first and second arguments after the shell command.
+# In this case, `"${1}"` is the path to `flink-conf.yaml` and `"${2}"` is the path to each `config.yaml` found by `find`.
+# `$(dirname "${2}")` extracts the directory part of the path to `config.yaml`, and `/flink-conf.yaml`
+# specifies the target filename within that directory.
+find "${FLINK_SOURCE_DIR}/flink-python/.tox" -name config.yaml -exec sh -c 'cp "${1}" "$(dirname "${2}")/flink-conf.yaml"' _ "${FLINK_SOURCE_DIR}/flink-python/pyflink/flink-conf.yaml" {} \;
+
 # python test
 test_all_modules
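
The `find ... -exec sh -c '...' _ src {}` construct above is easy to misread, so here is a
minimal standalone sketch of the same argument-passing pattern; the paths are illustrative
and not part of the patch. `sh -c` runs the quoted script, the `_` placeholder becomes `$0`
inside that script, and the remaining arguments become `${1}` and `${2}`:

    # sketch only: copy a source flink-conf.yaml next to every config.yaml under a root dir
    src="/tmp/demo/flink-conf.yaml"    # hypothetical source file
    find /tmp/demo/.tox -name config.yaml \
        -exec sh -c 'cp "${1}" "$(dirname "${2}")/flink-conf.yaml"' _ "${src}" {} \;

    # For a match at /tmp/demo/.tox/py38/conf/config.yaml, the inline script runs with
    # ${1}=/tmp/demo/flink-conf.yaml and ${2}=/tmp/demo/.tox/py38/conf/config.yaml,
    # so the copy expands to:
    #     cp /tmp/demo/flink-conf.yaml /tmp/demo/.tox/py38/conf/flink-conf.yaml
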
diff --git a/flink-python/pyflink/flink-conf.yaml b/flink-python/pyflink/flink-conf.yaml
new file mode 100644
index 000000000..b5aa2794d
--- /dev/null
+++ b/flink-python/pyflink/flink-conf.yaml
@@ -0,0 +1,311 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# These parameters are required for Java 17 support.
+# They can be safely removed when using Java 8/11.
+env.java.opts.all: --add-exports=java.base/sun.net.util=ALL-UNNAMED --add-exports=java.rmi/sun.rmi.registry=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED --add-exports=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.text=ALL-UNNAMED --add-opens=java.base/java.time=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.locks=ALL-UNNAMED
+
+#==============================================================================
+# Common
+#==============================================================================
+
+# The external address of the host on which the JobManager runs and can be
+# reached by the TaskManagers and any clients which want to connect. This setting
+# is only used in Standalone mode and may be overwritten on the JobManager side
+# by specifying the --host parameter of the bin/jobmanager.sh executable.
+# In high availability mode, if you use the bin/start-cluster.sh script and setup
+# the conf/masters file, this will be taken care of automatically. Yarn
+# automatically configures the host name based on the hostname of the node where the
+# JobManager runs.
+
+jobmanager.rpc.address: localhost
+
+# The RPC port where the JobManager is reachable.
+
+jobmanager.rpc.port: 6123
+
+# The host interface the JobManager will bind to. By default, this is localhost, and will prevent
+# the JobManager from communicating outside the machine/container it is running on.
+# On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0.
+# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
+#
+# To enable this, set the bind-host address to one that has access to an outside facing network
+# interface, such as 0.0.0.0.
+
+jobmanager.bind-host: localhost
+
+
+# The total process memory size for the JobManager.
+#
+# Note this accounts for all memory usage within the JobManager process, including JVM metaspace and other overhead.
+
+jobmanager.memory.process.size: 1600m
+
+# The host interface the TaskManager will bind to. By default, this is localhost, and will prevent
+# the TaskManager from communicating outside the machine/container it is running on.
+# On YARN this setting will be ignored if it is set to 'localhost', defaulting to 0.0.0.0.
+# On Kubernetes this setting will be ignored, defaulting to 0.0.0.0.
+#
+# To enable this, set the bind-host address to one that has access to an outside facing network
+# interface, such as 0.0.0.0.
+
+taskmanager.bind-host: localhost
+
+# The address of the host on which the TaskManager runs and can be reached by the JobManager and
+# other TaskManagers. If not specified, the TaskManager will try different strategies to identify
+# the address.
+#
+# Note this address needs to be reachable by the JobManager and forward traffic to one of
+# the interfaces the TaskManager is bound to (see 'taskmanager.bind-host').
+#
+# Note also that unless all TaskManagers are running on the same machine, this address needs to be
+# configured separately for each TaskManager.
+
+taskmanager.host: localhost
+
+# The total process memory size for the TaskManager.
+#
+# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.
+
+taskmanager.memory.process.size: 1728m
+
+# To exclude JVM metaspace and overhead, please use the total Flink memory size instead of 'taskmanager.memory.process.size'.
+# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
+#
+# taskmanager.memory.flink.size: 1280m
+
+# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
+
+taskmanager.numberOfTaskSlots: 1
+
+# The parallelism used for programs that did not specify any other parallelism.
+
+parallelism.default: 1
+
+# The default file system scheme and authority.
+#
+# By default file paths without scheme are interpreted relative to the local
+# root file system 'file:///'. Use this to override the default and interpret
+# relative paths relative to a different file system,
+# for example 'hdfs://mynamenode:12345'
+#
+# fs.default-scheme
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
+#
+# high-availability.type: zookeeper
+
+# The path where metadata for master recovery is persisted. While ZooKeeper stores
+# the small ground truth for checkpoint and leader election, this location stores
+# the larger objects, like persisted dataflow graphs.
+#
+# Must be a durable file system that is accessible from all nodes
+# (like HDFS, S3, Ceph, nfs, ...)
+#
+# high-availability.storageDir: hdfs:///flink/ha/
+
+# The list of ZooKeeper quorum peers that coordinate the high-availability
+# setup. This must be a list of the form:
+# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
+#
+# high-availability.zookeeper.quorum: localhost:2181
+
+
+# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
+# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
+# The default value is "open" and it can be changed to "creator" if ZK security is enabled
+#
+# high-availability.zookeeper.client.acl: open
+
+#==============================================================================
+# Fault tolerance and checkpointing
+#==============================================================================
+
+# The backend that will be used to store operator state checkpoints if
+# checkpointing is enabled. Checkpointing is enabled when execution.checkpointing.interval > 0.
+#
+# Execution checkpointing related parameters. Please refer to CheckpointConfig and ExecutionCheckpointingOptions for more details.
+#
+# execution.checkpointing.interval: 3min
+# execution.checkpointing.externalized-checkpoint-retention: [DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION]
+# execution.checkpointing.max-concurrent-checkpoints: 1
+# execution.checkpointing.min-pause: 0
+# execution.checkpointing.mode: [EXACTLY_ONCE, AT_LEAST_ONCE]
+# execution.checkpointing.timeout: 10min
+# execution.checkpointing.tolerable-failed-checkpoints: 0
+# execution.checkpointing.unaligned: false
+#
+# Supported backends are 'hashmap', 'rocksdb', or the
+# <class-name-of-factory>.
+#
+# state.backend.type: hashmap
+
+# Directory for checkpoints filesystem, when using any of the default bundled
+# state backends.
+#
+# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
+
+# Default target directory for savepoints, optional.
+#
+# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
+
+# Flag to enable/disable incremental checkpoints for backends that
+# support incremental checkpoints (like the RocksDB state backend).
+#
+# state.backend.incremental: false
+
+# The failover strategy, i.e., how the job computation recovers from task failures.
+# Only restart tasks that may have been affected by the task failure, which typically includes
+# downstream tasks and potentially upstream tasks if their produced data is no longer available for consumption.
+
+jobmanager.execution.failover-strategy: region
+
+#==============================================================================
+# Rest & web frontend
+#==============================================================================
+
+# The port to which the REST client connects. If rest.bind-port has
+# not been specified, then the server will bind to this port as well.
+#
+#rest.port: 8081
+
+# The address to which the REST client will connect
+#
+rest.address: localhost
+
+# Port range for the REST and web server to bind to.
+#
+#rest.bind-port: 8080-8090
+
+# The address that the REST & web server binds to
+# By default, this is localhost, which prevents the REST & web server from
+# being able to communicate outside of the machine/container it is running on.
+#
+# To enable this, set the bind address to one that has access to an outside-facing
+# network interface, such as 0.0.0.0.
+#
+rest.bind-address: localhost
+
+# Flag to specify whether job submission is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#web.submit.enable: false
+
+# Flag to specify whether job cancellation is enabled from the web-based
+# runtime monitor. Uncomment to disable.
+
+#web.cancel.enable: false
+
+#==============================================================================
+# Advanced
+#==============================================================================
+
+# Override the directories for temporary files. If not specified, the
+# system-specific Java temporary directory (java.io.tmpdir property) is taken.
+#
+# For framework setups on Yarn, Flink will automatically pick up the
+# containers' temp directories without any need for configuration.
+#
+# Add a delimited list for multiple directories, using the system directory
+# delimiter (colon ':' on unix) or a comma, e.g.:
+#     /data1/tmp:/data2/tmp:/data3/tmp
+#
+# Note: Each directory entry is read from and written to by a different I/O
+# thread. You can include the same directory multiple times in order to create
+# multiple I/O threads against that directory. This is for example relevant for
+# high-throughput RAIDs.
+#
+# io.tmp.dirs: /tmp
+
+# The classloading resolve order. Possible values are 'child-first' (Flink's default)
+# and 'parent-first' (Java's default).
+#
+# Child first classloading allows users to use different dependency/library
+# versions in their application than those in the classpath. Switching back
+# to 'parent-first' may help with debugging dependency issues.
+#
+# classloader.resolve-order: child-first
+
+# The amount of memory going to the network stack. These numbers usually need
+# no tuning. Adjusting them may be necessary in case of an "Insufficient number
+# of network buffers" error. The default min is 64MB, the default max is 1GB.
+#
+# taskmanager.memory.network.fraction: 0.1
+# taskmanager.memory.network.min: 64mb
+# taskmanager.memory.network.max: 1gb
+
+#==============================================================================
+# Flink Cluster Security Configuration
+#==============================================================================
+
+# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
+# may be enabled in four steps:
+# 1. configure the local krb5.conf file
+# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
+# 3. make the credentials available to various JAAS login contexts
+# 4. configure the connector to use JAAS/SASL
+
+# The options below configure how Kerberos credentials are provided. A keytab will be used instead of
+# a ticket cache if the keytab path and principal are set.
+
+# security.kerberos.login.use-ticket-cache: true
+# security.kerberos.login.keytab: /path/to/kerberos/keytab
+# security.kerberos.login.principal: flink-user
+
+# The configuration below defines which JAAS login contexts are used.
+
+# security.kerberos.login.contexts: Client,KafkaClient
+
+#==============================================================================
+# ZK Security Configuration
+#==============================================================================
+
+# The configurations below are applicable if the ZK ensemble is configured for security.
+
+# Override the configuration below to provide a custom ZK service name if configured.
+# zookeeper.sasl.service-name: zookeeper
+
+# The configuration below must match one of the values set in "security.kerberos.login.contexts".
+# zookeeper.sasl.login-context-name: Client
+
+#==============================================================================
+# HistoryServer
+#==============================================================================
+
+# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop).
+
+# Directory to upload completed jobs to. Add this directory to the list of
+# monitored directories of the HistoryServer as well (see below).
+#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
+
+# The address under which the web-based HistoryServer listens.
+#historyserver.web.address: 0.0.0.0
+
+# The port under which the web-based HistoryServer listens.
+#historyserver.web.port: 8082
+
+# Comma separated list of directories to monitor for completed jobs.
+#historyserver.archive.fs.dir: hdfs:///completed-jobs/
+
+# Interval in milliseconds for refreshing the monitored directories.
+#historyserver.archive.fs.refresh-interval: 10000
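
Why copying the legacy file is enough: as the comments in integration_test.sh above state,
Flink 1.x reads conf/flink-conf.yaml whenever it is present, and the newer conf/config.yaml
(introduced in Flink 1.19) is only consulted when it is not. A minimal sketch of a guard that
mirrors this precedence, with an illustrative conf path that is not part of the patch:

    CONF_DIR="/path/to/flink/conf"    # hypothetical conf directory
    if [ -f "${CONF_DIR}/flink-conf.yaml" ]; then
        # legacy file present: this is the configuration Flink 1.x loads,
        # which keeps the 1.17/1.18-era Python tests working unchanged
        echo "using legacy flink-conf.yaml"
    else
        # only the new-style file exists, so Flink 1.19+ reads config.yaml
        echo "falling back to config.yaml"
    fi
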