[FLINK-35007] Add support for Flink 1.19 (#90)
* [FLINK-35007] Add support for Flink 1.19

This also includes dropping the weekly tests for the `v3.0` branch, since `v3.1` has been released and that's our main version going forward.

* [FLINK-35007] Remove unused test class that relied on removed Internal class

* [FLINK-35007][ci] Copy old `flink-conf.yaml` to make sure that all Python tests work for Flink 1.x releases

(cherry picked from commit 897001d)
MartijnVisser committed Apr 11, 2024
1 parent 906b6c0 commit 11c427d
Showing 5 changed files with 344 additions and 35 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/push_pr.yml
@@ -30,7 +30,7 @@ jobs:
        include:
          - flink: 1.18.1
            jdk: '8, 11, 17'
-         - flink: 1.19-SNAPSHOT
+         - flink: 1.19.0
            jdk: '8, 11, 17, 21'
    uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
    with:
@@ -39,7 +39,7 @@ jobs:
  python_test:
    strategy:
      matrix:
-       flink: [ 1.17.2, 1.18.1 ]
+       flink: [ 1.17.2, 1.18.1, 1.19.0 ]
    uses: apache/flink-connector-shared-utils/.github/workflows/python_ci.yml@ci_utils
    with:
      flink_version: ${{ matrix.flink }}
17 changes: 13 additions & 4 deletions .github/workflows/weekly.yml
@@ -38,11 +38,20 @@ jobs:
          jdk: '8, 11, 17, 21',
          branch: main
        }, {
-         flink: 1.17.1,
-         branch: v3.0
+         flink: 1.20-SNAPSHOT,
+         jdk: '8, 11, 17, 21',
+         branch: main
        }, {
          flink: 1.17.2,
          branch: v3.1
        }, {
-         flink: 1.18.0,
-         branch: v3.0
+         flink: 1.18.1,
+         jdk: '8, 11, 17',
+         branch: v3.1
+       }, {
+         flink: 1.19.0,
+         branch: v3.1,
+         jdk: '8, 11, 17',
        }]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
@@ -18,16 +18,13 @@

package org.apache.flink.streaming.connectors.kafka.testutils;

-import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
-import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
@@ -36,8 +33,6 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;

-import java.util.Collections;
-import java.util.List;
import java.util.Properties;
import java.util.Random;

@@ -210,29 +205,5 @@ public void shutdown() {
public Throwable getError() {
return this.error;
}

-    private static class MockTransformation extends Transformation<String> {
-        public MockTransformation() {
-            super("MockTransform", BasicTypeInfo.STRING_TYPE_INFO, 1);
-        }
-
-        @Override
-        public List<Transformation<?>> getTransitivePredecessors() {
-            return null;
-        }
-
-        @Override
-        public List<Transformation<?>> getInputs() {
-            return Collections.emptyList();
-        }
-    }
-
-    private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
-
-        @Override
-        public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
-            return null;
-        }
-    }
}
}
18 changes: 18 additions & 0 deletions flink-python/dev/integration_test.sh
@@ -50,5 +50,23 @@ echo "Checking ${FLINK_SOURCE_DIR} for 'pyflink_gateway_server.py'"
find "${FLINK_SOURCE_DIR}/flink-python" -name pyflink_gateway_server.py
find "${FLINK_SOURCE_DIR}/flink-python/.tox" -name pyflink_gateway_server.py -exec cp "${FLINK_SOURCE_DIR}/flink-python/pyflink/pyflink_gateway_server.py" {} \;

+# Copy an empty flink-conf.yaml to the conf/ folder, so that all Python tests on Flink 1.x can succeed.
+# This needs to be changed when adding support for Flink 2.0
+echo "Checking ${FLINK_SOURCE_DIR} for 'config.yaml'"
+find "${FLINK_SOURCE_DIR}/flink-python" -name config.yaml
+
+# For every occurrence of config.yaml (the new YAML file since Flink 1.19), copy in the old flink-conf.yaml so that
+# it is used over the new config.yaml file.
+#
+# Because our intention is to copy `flink-conf.yaml` into the same directory as `config.yaml` and not replace it,
+# we need to extract the directory from `{}` and then specify the target filename (`flink-conf.yaml`) explicitly.
+# Unfortunately, `find`'s `-exec` doesn't directly support manipulating `{}`, so we use a slightly modified shell command.
+#
+# `"${1}"` and `"${2}"` correspond to the first and second arguments after the shell command.
+# In this case, `"${1}"` is the path to `flink-conf.yaml` and `"${2}"` is the path to each `config.yaml` found by `find`.
+# `$(dirname "${2}")` extracts the directory part of the path to `config.yaml`, and then `/flink-conf.yaml`
+# specifies the target filename within that directory.
+find "${FLINK_SOURCE_DIR}/flink-python/.tox" -name config.yaml -exec sh -c 'cp "${1}" "$(dirname "${2}")/flink-conf.yaml"' _ "${FLINK_SOURCE_DIR}/flink-python/pyflink/flink-conf.yaml" {} \;

# python test
test_all_modules
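
The `sh -c` idiom in the added script generalizes: every argument after the quoted script becomes a positional parameter, with the first (here the `_` placeholder) filling `$0` and the rest arriving as `$1`, `$2`, and so on. A minimal, self-contained sketch of the same pattern, using hypothetical /tmp paths in place of the repository layout:

#!/bin/sh
# Sketch of the find / -exec / sh -c pattern above (hypothetical paths).
# Inside the quoted script, "_" is $0, the source file is $1, and each
# path that find substitutes for {} arrives as $2.

SRC="/tmp/demo/flink-conf.yaml"   # stand-in for pyflink/flink-conf.yaml
ROOT="/tmp/demo/tox-envs"         # stand-in for flink-python/.tox

# Build a tiny tree so the find below has something to match.
mkdir -p "${ROOT}/py38/conf" "${ROOT}/py39/conf" "$(dirname "${SRC}")"
touch "${ROOT}/py38/conf/config.yaml" "${ROOT}/py39/conf/config.yaml"
: > "${SRC}"   # empty source file, mirroring the empty flink-conf.yaml

# Place a flink-conf.yaml next to every config.yaml without replacing it.
find "${ROOT}" -name config.yaml \
    -exec sh -c 'cp "${1}" "$(dirname "${2}")/flink-conf.yaml"' _ "${SRC}" {} \;

# Both conf/ directories should now contain flink-conf.yaml.
find "${ROOT}" -name flink-conf.yaml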