diff --git a/.github/workflows/backend.yaml b/.github/workflows/backend.yaml index 4929e13651..c55cab11b5 100644 --- a/.github/workflows/backend.yaml +++ b/.github/workflows/backend.yaml @@ -23,7 +23,6 @@ on: pull_request: branches: - dev - - 1.2 paths-ignore: - 'docs/**' - '**/*.md' @@ -156,7 +155,7 @@ jobs: strategy: fail-fast: true matrix: - flink: [ '1.14', '1.15', '1.16', '1.17', '1.18', '1.19', '1.20' ] + flink: [ '1.15', '1.16', '1.17', '1.18', '1.19', '1.20' ] runs-on: ubuntu-latest services: registry: @@ -166,6 +165,9 @@ jobs: steps: - name: Checkout uses: actions/checkout@v4 + - name: Init Docker Network + run: | + docker network create -d bridge --subnet 172.28.0.0/16 --gateway 172.28.0.1 dinky_net - name: Download artifact uses: actions/download-artifact@v4 with: @@ -190,18 +192,41 @@ jobs: FLINK_VERSION=${{ matrix.flink }} tags: | localhost:5000/dinky/dinky-test:flink + - name: Build Flink Image + uses: docker/build-push-action@v5 + with: + context: . + file: ./e2e_test/docker-compose-env/FlinkDockerfile + # whether to docker push + push: true + build-args: | + FLINK_VERSION=${{ matrix.flink }} + tags: | + localhost:5000/dinky/flink:flink - name: Init Env Jar run: | + mkdir -p e2e_test/docker-compose-env/dinky/jars wget -O e2e_test/docker-compose-env/dinky/mysql-connector-java-8.0.30.jar https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.30/mysql-connector-java-8.0.30.jar && wget -O e2e_test/docker-compose-env/flink/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar https://repo1.maven.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.8.3-10.0/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar && wget -O e2e_test/docker-compose-env/dinky/javax.ws.rs-api-2.1.1.jar https://repo1.maven.org/maven2/javax/ws/rs/javax.ws.rs-api/2.1.1/javax.ws.rs-api-2.1.1.jar - - name: Init Docker Network - run: | - docker network create -d bridge dinky_net + wget -O e2e_test/docker-compose-env/dinky/jars/flink-doris-connector.jar https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-${{ matrix.flink }}/24.0.1/flink-doris-connector-${{ matrix.flink }}-24.0.1.jar + wget -O e2e_test/docker-compose-env/dinky/jars/flink-sql-connector-mysql-cdc.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.2.0/flink-sql-connector-mysql-cdc-3.2.0.jar + cp e2e_test/docker-compose-env/dinky/mysql-connector-java-8.0.30.jar e2e_test/docker-compose-env/dinky/jars/mysql-connector-java.jar - name: Init Run Docker MySQL uses: hoverkraft-tech/compose-action@v2.0.2 with: compose-file: ./e2e_test/docker-compose-env/mysql/docker-compose.yml + # - name: Init System Env And Clear Docker Build Cache + # run: | + # echo y | docker builder prune + # df -h + # ulimit -a + # sudo swapoff -a + # sudo sysctl -w vm.max_map_count=2000000 + # - name: Init Run Docker Doris + # uses: hoverkraft-tech/compose-action@v2.0.2 + # with: + # compose-file: ./e2e_test/docker-compose-env/doris/docker-compose.yml - name: Init Run Docker Dinky uses: hoverkraft-tech/compose-action@v2.0.2 with: @@ -210,16 +235,53 @@ jobs: uses: hoverkraft-tech/compose-action@v2.0.2 with: compose-file: ./e2e_test/docker-compose-env/hadoop/docker-compose.yml - - name: Replace Flink docker-compose yml - run: | - export FLINK_VERSION=${{ matrix.flink }} && envsubst < ./e2e_test/docker-compose-env/flink/docker-compose.yml > ./e2e_test/docker-compose-env/flink/docker-compose-${{ matrix.flink }}.yml - name: Init Run Docker Flink uses: hoverkraft-tech/compose-action@v2.0.2 with: - compose-file:
./e2e_test/docker-compose-env/flink/docker-compose-${{ matrix.flink }}.yml - + compose-file: ./e2e_test/docker-compose-env/flink/docker-compose.yml + # k8s env + - name: Init k3s + uses: nolar/setup-k3d-k3s@v1 + with: + version: latest + k3d-args: -s 1 --network dinky_net --api-port 172.28.0.1:6550 + - name: Get k3s kube config + run: k3d kubeconfig get --all && mkdir ./kube && k3d kubeconfig get --all > ./kube/k3s.yaml && sed -i 's/0.0.0.0/172.28.0.1/g' ./kube/k3s.yaml + - name: Init k8s RBAC and namespace + run: | + kubectl create namespace dinky + kubectl create serviceaccount dinky -n dinky + kubectl create clusterrolebinding flink-role-binding-dinky --clusterrole=edit --serviceaccount=dinky:dinky + - name: Init k3s main images + run: | + docker exec k3d-k3s-default-server-0 crictl pull library/busybox:latest + docker exec k3d-k3s-default-server-0 crictl pull flink:${{ matrix.flink }}-scala_2.12-java8 + docker pull localhost:5000/dinky/flink:flink + docker tag localhost:5000/dinky/flink:flink dinky/flink:flink + docker save -o flink.tar dinky/flink:flink + k3d images import ./flink.tar + rm -rf ./flink.tar + - name: Test k3s host + run: | + curl -k https://172.28.0.1:6550 - name: Cp Flink Jar Deps - run: docker cp dinky:/opt/dinky/ ./dinky-release + run: | + docker cp dinky:/opt/dinky/ ./dinky-release + mv ./dinky-release/jar/dinky-app*.jar e2e_test/docker-compose-env/dinky/dinky-app.jar + - name: Run python http server + run: | + mkdir -p logs + ls e2e_test/docker-compose-env/dinky/ + nohup python -m http.server -d e2e_test/docker-compose-env/dinky/ 9001 > ./logs/python_http.log & - name: Run Docker Python Script run: | - docker run -v ./dinky-release/extends/flink${{ matrix.flink }}:/flink/lib -v ./e2e_test/docker-compose-env/dinky/mysql-connector-java-8.0.30.jar:/flink/lib/mysql-connector-java-8.0.30.jar -v./e2e_test/docker-compose-env/flink/conf:/flink/conf -v ./dinky-release/jar:/dinky/jar -v./e2e_test/tools:/app -w /app --net dinky_net --rm --entrypoint /bin/bash python:3.9 -c 'pip install -r requirements.txt && python main.py dinky:8888' + docker run -v ./e2e_test/tools:/app -w /app -v ./kube:/kube -v ./e2e_test/docker-compose-env/dinky:/dinky/jar -v ./dinky-release/extends/flink${{ matrix.flink }}:/opt/flink/lib -v ./e2e_test/docker-compose-env/dinky/mysql-connector-java-8.0.30.jar:/opt/flink/lib/mysql-connector-java-8.0.30.jar --net dinky_net --rm --entrypoint /bin/bash python:3.9 -c 'pip install -r requirements.txt && python main.py dinky:8888 ${{ matrix.flink }}' + - name: Get k8s pods info and logs + if: ${{ always() }} + run: | + chmod -R 755 ./e2e_test/view_k8s_all_pod_logs.sh + ./e2e_test/view_k8s_all_pod_logs.sh dinky + - name: Get Python HttpServer log + if: ${{ always() }} + run: | + cat ./logs/python_http.log diff --git a/e2e_test/docker-compose-env/Dockerfile b/e2e_test/docker-compose-env/Dockerfile old mode 100755 new mode 100644 diff --git a/e2e_test/docker-compose-env/FlinkDockerfile b/e2e_test/docker-compose-env/FlinkDockerfile new file mode 100644 index 0000000000..658b71054e --- /dev/null +++ b/e2e_test/docker-compose-env/FlinkDockerfile @@ -0,0 +1,5 @@ + +ARG FLINK_VERSION + +FROM flink:${FLINK_VERSION}-scala_2.12-java8 as flink-base +RUN rm -f /opt/flink/lib/flink-table-planner-loader*.jar && cp /opt/flink/opt/flink-python*.jar /opt/flink/lib/ && cp /opt/flink/opt/flink-table-planner*.jar /opt/flink/lib/ 2>/dev/null || : diff --git a/e2e_test/docker-compose-env/dinky/docker-compose.yml b/e2e_test/docker-compose-env/dinky/docker-compose.yml index 
5de79aa65a..7884e1aec1 100644 --- a/e2e_test/docker-compose-env/dinky/docker-compose.yml +++ b/e2e_test/docker-compose-env/dinky/docker-compose.yml @@ -17,7 +17,8 @@ services: - ./mysql-connector-java-8.0.30.jar:/opt/dinky/lib/mysql-connector-java-8.0.30.jar - ./javax.ws.rs-api-2.1.1.jar:/opt/dinky/lib/javax.ws.rs-api-2.1.1.jar - ../flink/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/dinky/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar - - ../flink/conf/log4j-console.properties:/flink/conf/log4j-console.properties - - ../hadoop:/flink/conf + - ../flink/conf/log4j-console.properties:/opt/flink/conf/log4j-console.properties + - ../hadoop:/opt/flink/conf + - ./jars:/dinky networks: - dinky_net diff --git a/e2e_test/docker-compose-env/doris/docker-compose.yml b/e2e_test/docker-compose-env/doris/docker-compose.yml new file mode 100644 index 0000000000..9ca362236f --- /dev/null +++ b/e2e_test/docker-compose-env/doris/docker-compose.yml @@ -0,0 +1,11 @@ +version: "3" +networks: + dinky_net: + external: true +services: + doris: + image: yagagagaga/doris-standalone:2.1.7 + hostname: doris + container_name: doris + networks: + - dinky_net diff --git a/e2e_test/docker-compose-env/flink/docker-compose.yml b/e2e_test/docker-compose-env/flink/docker-compose.yml index cd300ec22a..c04c7b66e6 100644 --- a/e2e_test/docker-compose-env/flink/docker-compose.yml +++ b/e2e_test/docker-compose-env/flink/docker-compose.yml @@ -7,10 +7,14 @@ services: hostname: jobmanager container_name: jobmanager restart: always - image: flink:${FLINK_VERSION}-scala_2.12-java8 + image: localhost:5000/dinky/flink:flink command: jobmanager environment: - HADOOP_CONF_DIR=/opt/flink/conf + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: jobmanager + parallelism.default: 1 volumes: - ./conf:/opt/flink/conf - ./flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar @@ -19,10 +23,15 @@ services: taskmanager: hostname: taskmanager container_name: taskmanager - image: flink:${FLINK_VERSION}-scala_2.12-java8 + image: localhost:5000/dinky/flink:flink command: taskmanager environment: - HADOOP_CONF_DIR=/opt/flink/conf + - | + FLINK_PROPERTIES= + jobmanager.rpc.address: jobmanager + taskmanager.numberOfTaskSlots: 20 + parallelism.default: 1 volumes: - ./conf:/opt/flink/conf - ./flink-shaded-hadoop-2-uber-2.8.3-10.0.jar:/opt/flink/lib/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar diff --git a/e2e_test/docker-compose-env/hadoop/hadoop.env b/e2e_test/docker-compose-env/hadoop/hadoop.env index fe34f877a4..c78d39fc48 100644 --- a/e2e_test/docker-compose-env/hadoop/hadoop.env +++ b/e2e_test/docker-compose-env/hadoop/hadoop.env @@ -1,43 +1,43 @@ -CORE_CONF_fs_defaultFS=hdfs://namenode:9000 -CORE_CONF_hadoop_http_staticuser_user=root -CORE_CONF_hadoop_proxyuser_hue_hosts=* -CORE_CONF_hadoop_proxyuser_hue_groups=* -CORE_CONF_io_compression_codecs=org.apache.hadoop.io.compress.SnappyCodec - -HDFS_CONF_dfs_webhdfs_enabled=true -HDFS_CONF_dfs_permissions_enabled=false -HDFS_CONF_dfs_namenode_datanode_registration_ip___hostname___check=false - -YARN_CONF_yarn_log___aggregation___enable=true -YARN_CONF_yarn_log_server_url=http://historyserver:8188/applicationhistory/logs/ -YARN_CONF_yarn_resourcemanager_recovery_enabled=true -YARN_CONF_yarn_resourcemanager_store_class=org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore -YARN_CONF_yarn_resourcemanager_scheduler_class=org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler 
-YARN_CONF_yarn_scheduler_capacity_root_default_maximum___allocation___mb=8192 -YARN_CONF_yarn_scheduler_capacity_root_default_maximum___allocation___vcores=4 -YARN_CONF_yarn_resourcemanager_fs_state___store_uri=/rmstate -YARN_CONF_yarn_resourcemanager_system___metrics___publisher_enabled=true -YARN_CONF_yarn_resourcemanager_hostname=resourcemanager -YARN_CONF_yarn_resourcemanager_address=resourcemanager:8032 -YARN_CONF_yarn_resourcemanager_scheduler_address=resourcemanager:8030 -YARN_CONF_yarn_resourcemanager_resource__tracker_address=resourcemanager:8031 -YARN_CONF_yarn_timeline___service_enabled=true -YARN_CONF_yarn_timeline___service_generic___application___history_enabled=true -YARN_CONF_yarn_timeline___service_hostname=historyserver -YARN_CONF_mapreduce_map_output_compress=true -YARN_CONF_mapred_map_output_compress_codec=org.apache.hadoop.io.compress.SnappyCodec -YARN_CONF_yarn_nodemanager_resource_memory___mb=16384 -YARN_CONF_yarn_nodemanager_resource_cpu___vcores=8 -YARN_CONF_yarn_nodemanager_disk___health___checker_max___disk___utilization___per___disk___percentage=98.5 -YARN_CONF_yarn_nodemanager_remote___app___log___dir=/app-logs -YARN_CONF_yarn_nodemanager_aux___services=mapreduce_shuffle - -MAPRED_CONF_mapreduce_framework_name=yarn -MAPRED_CONF_mapred_child_java_opts=-Xmx4096m -MAPRED_CONF_mapreduce_map_memory_mb=4096 -MAPRED_CONF_mapreduce_reduce_memory_mb=8192 -MAPRED_CONF_mapreduce_map_java_opts=-Xmx3072m -MAPRED_CONF_mapreduce_reduce_java_opts=-Xmx6144m -MAPRED_CONF_yarn_app_mapreduce_am_env=HADOOP_MAPRED_HOME=/data/docker-compose/hadoop-3.2.1/ -MAPRED_CONF_mapreduce_map_env=HADOOP_MAPRED_HOME=/data/docker-compose/hadoop-3.2.1/ -MAPRED_CONF_mapreduce_reduce_env=HADOOP_MAPRED_HOME=/data/docker-compose/hadoop-3.2.1/ \ No newline at end of file +CORE_CONF_fs_defaultFS=hdfs://namenode:9000 +CORE_CONF_hadoop_http_staticuser_user=root +CORE_CONF_hadoop_proxyuser_hue_hosts=* +CORE_CONF_hadoop_proxyuser_hue_groups=* +CORE_CONF_io_compression_codecs=org.apache.hadoop.io.compress.SnappyCodec + +HDFS_CONF_dfs_webhdfs_enabled=true +HDFS_CONF_dfs_permissions_enabled=false +HDFS_CONF_dfs_namenode_datanode_registration_ip___hostname___check=false + +YARN_CONF_yarn_log___aggregation___enable=true +YARN_CONF_yarn_log_server_url=http://historyserver:8188/applicationhistory/logs/ +YARN_CONF_yarn_resourcemanager_recovery_enabled=true +YARN_CONF_yarn_resourcemanager_store_class=org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore +YARN_CONF_yarn_resourcemanager_scheduler_class=org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler +YARN_CONF_yarn_scheduler_capacity_root_default_maximum___allocation___mb=8192 +YARN_CONF_yarn_scheduler_capacity_root_default_maximum___allocation___vcores=4 +YARN_CONF_yarn_resourcemanager_fs_state___store_uri=/rmstate +YARN_CONF_yarn_resourcemanager_system___metrics___publisher_enabled=true +YARN_CONF_yarn_resourcemanager_hostname=resourcemanager +YARN_CONF_yarn_resourcemanager_address=resourcemanager:8032 +YARN_CONF_yarn_resourcemanager_scheduler_address=resourcemanager:8030 +YARN_CONF_yarn_resourcemanager_resource__tracker_address=resourcemanager:8031 +YARN_CONF_yarn_timeline___service_enabled=true +YARN_CONF_yarn_timeline___service_generic___application___history_enabled=true +YARN_CONF_yarn_timeline___service_hostname=historyserver +YARN_CONF_mapreduce_map_output_compress=true +YARN_CONF_mapred_map_output_compress_codec=org.apache.hadoop.io.compress.SnappyCodec 
+YARN_CONF_yarn_nodemanager_resource_memory___mb=163840 +YARN_CONF_yarn_nodemanager_resource_cpu___vcores=80 +YARN_CONF_yarn_nodemanager_disk___health___checker_max___disk___utilization___per___disk___percentage=98.5 +YARN_CONF_yarn_nodemanager_remote___app___log___dir=/app-logs +YARN_CONF_yarn_nodemanager_aux___services=mapreduce_shuffle + +MAPRED_CONF_mapreduce_framework_name=yarn +MAPRED_CONF_mapred_child_java_opts=-Xmx4096m +MAPRED_CONF_mapreduce_map_memory_mb=4096 +MAPRED_CONF_mapreduce_reduce_memory_mb=8192 +MAPRED_CONF_mapreduce_map_java_opts=-Xmx3072m +MAPRED_CONF_mapreduce_reduce_java_opts=-Xmx6144m +MAPRED_CONF_yarn_app_mapreduce_am_env=HADOOP_MAPRED_HOME=/data/docker-compose/hadoop-3.2.1/ +MAPRED_CONF_mapreduce_map_env=HADOOP_MAPRED_HOME=/data/docker-compose/hadoop-3.2.1/ +MAPRED_CONF_mapreduce_reduce_env=HADOOP_MAPRED_HOME=/data/docker-compose/hadoop-3.2.1/ diff --git a/e2e_test/tools/config.py b/e2e_test/tools/config.py index c006cd4d9c..45ca9935b1 100644 --- a/e2e_test/tools/config.py +++ b/e2e_test/tools/config.py @@ -1,9 +1,63 @@ +import sys +from logger import log + +dinky_addr = sys.argv[1] +flink_version = sys.argv[2] +dinky_app_jar = 'dinky-app.jar' + # standalone standalone_address = "jobmanager:8282" # yarn -yarn_flink_lib="/flink/lib" -yarn_flink_conf="/flink/conf" -yarn_hadoop_conf="/flink/conf" -yarn_dinky_app_jar="/dinky/jar" +yarn_flink_lib = "/opt/flink/lib" +yarn_flink_conf = "/opt/flink/conf" +yarn_hadoop_conf = "/opt/flink/conf" +yarn_dinky_app_jar = "/dinky/jar" + +podTemplate=""" +apiVersion: v1 +kind: Pod +metadata: + name: jobmanager-pod-template +spec: + initContainers: + - name: artifacts-fetcher-dinky + image: library/busybox:latest + imagePullPolicy: Never + # Use wget or other tools to get user jars from remote storage + command: [ 'wget', 'http://172.28.0.1:9001/dinky-app.jar', '-O', '/flink-usrlib/dinky-app.jar' ] + volumeMounts: + - mountPath: /flink-usrlib + name: flink-usrlib + - name: artifacts-fetcher-mysql + image: library/busybox:latest + imagePullPolicy: Never + # Use wget or other tools to get user jars from remote storage + command: [ 'wget', 'http://172.28.0.1:9001/mysql-connector-java-8.0.30.jar', '-O', '/flink-usrlib/mysql-connector-java-8.0.30.jar' ] + volumeMounts: + - mountPath: /flink-usrlib + name: flink-usrlib + + containers: + # Do not change the main container name + - name: flink-main-container + resources: + requests: + ephemeral-storage: 2048Mi + limits: + ephemeral-storage: 2048Mi + volumeMounts: + - mountPath: /opt/flink/usrlib + name: flink-usrlib + volumes: + - name: flink-usrlib + emptyDir: { } + +""" +log.info(f""" +==================================================== + all config dinky address: {dinky_addr} + flink version: {flink_version} +==================================================== +""") diff --git a/e2e_test/tools/dinky_task/flink_jar_sql/mysql2doris.sql b/e2e_test/tools/dinky_task/flink_jar_sql/mysql2doris.sql new file mode 100644 index 0000000000..e2e109cb60 --- /dev/null +++ b/e2e_test/tools/dinky_task/flink_jar_sql/mysql2doris.sql @@ -0,0 +1,9 @@ +set 'execution.checkpointing.interval'='5 s'; +ADD CUSTOMJAR 'rs:/flink-sql-connector-mysql-cdc.jar'; +ADD CUSTOMJAR 'rs:/mysql-connector-java.jar'; +EXECUTE JAR WITH ( +'uri'='rs:/flink-doris-connector.jar', +'main-class'='org.apache.doris.flink.tools.cdc.CdcTools', 
+'args'='base64@bXlzcWwtc3luYy1kYXRhYmFzZSAgICAgLS1kYXRhYmFzZSBkaW5reSAgICAgLS1teXNxbC1jb25mIGhvc3RuYW1lPW15c3FsICAgICAtLW15c3FsLWNvbmYgcG9ydD0zMzA2ICAgICAtLW15c3FsLWNvbmYgdXNlcm5hbWU9cm9vdCAgICAgLS1teXNxbC1jb25mIHBhc3N3b3JkPWRpbmt5ICAgICAtLW15c3FsLWNvbmYgZGF0YWJhc2UtbmFtZT1kaW5reSAgICAgLS1teXNxbC1jb25mIHNlcnZlci10aW1lLXpvbmU9QXNpYS9TaGFuZ2hhaSAgICAgLS1pbmNsdWRpbmctdGFibGVzICJkaW5reV90YXNrIiAgICAgLS1zaW5rLWNvbmYgZmVub2Rlcz1kb3Jpcy1mZTo4MDMwICAgICAtLXNpbmstY29uZiB1c2VybmFtZT1yb290ICAgICAtLXNpbmstY29uZiBqZGJjLXVybD1qZGJjOm15c3FsOi8vZG9yaXMtZmU6OTAzMCAgICAgLS1zaW5rLWNvbmYgc2luay5sYWJlbC1wcmVmaXg9bGFiZWwtMSAgICAgLS10YWJsZS1jb25mIHJlcGxpY2F0aW9uX251bT0xIA==', +'allowNonRestoredState'='false' +); diff --git a/e2e_test/tools/dinky_task/flink_sql/flink-sql-datagen-test.sql b/e2e_test/tools/dinky_task/flink_sql/flink-sql-datagen-test.sql new file mode 100644 index 0000000000..3c288a15df --- /dev/null +++ b/e2e_test/tools/dinky_task/flink_sql/flink-sql-datagen-test.sql @@ -0,0 +1,59 @@ +DROP TABLE IF EXISTS source_table3; + +CREATE TABLE IF NOT EXISTS source_table3 ( + -- 订单id + `order_id` BIGINT, + --产品 + `product` BIGINT, + --金额 + `amount` BIGINT, + -- 支付时间 + `order_time` as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)), -- `在这里插入代码片` + --WATERMARK + WATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND +) +WITH + ( + 'connector' = 'datagen', + 'rows-per-second' = '1', + 'fields.order_id.min' = '1', + 'fields.order_id.max' = '2', + 'fields.amount.min' = '1', + 'fields.amount.max' = '10', + 'fields.product.min' = '1', + 'fields.product.max' = '2' + ); + +-- SELECT * FROM source_table3 LIMIT 10; +DROP TABLE IF EXISTS sink_table5; + +CREATE TABLE IF NOT EXISTS sink_table5 ( + --产品 + `product` BIGINT, + --金额 + `amount` BIGINT, + --支付时间 + `order_time` TIMESTAMP(3), + -- 1分钟时间聚合总数 + `one_minute_sum` BIGINT +) +WITH + ('connector' = 'print'); + +INSERT INTO + sink_table5 +SELECT + product, + amount, + order_time, + SUM(amount) OVER ( + PARTITION BY + product + ORDER BY + order_time + -- 标识统计范围是1个 product 的最近 1 分钟的数据 + RANGE BETWEEN INTERVAL '1' MINUTE PRECEDING + AND CURRENT ROW + ) as one_minute_sum +FROM + source_table3; diff --git a/e2e_test/tools/dinky_task/flink_sql/flink-sql-lineage-test.sql b/e2e_test/tools/dinky_task/flink_sql/flink-sql-lineage-test.sql new file mode 100644 index 0000000000..fb32f14a33 --- /dev/null +++ b/e2e_test/tools/dinky_task/flink_sql/flink-sql-lineage-test.sql @@ -0,0 +1,313 @@ + +create table ods_order_sale_goods_snapshot_r +( + store_code string, + --import_code string, + good_code string, + create_time string, + category_code string, + purchase_kind_code string, + modify_time string, + is_unfree_goods char, + is_unmember_goods char, + member_price decimal(22,4), + retail_price decimal(22,4), + purchase_kind_des string, + sale_order_code string, + id string, + category_des string, + proc_time as proctime(), + sale_time TIMESTAMP(3) , + origin_table STRING , + PRIMARY KEY (id) NOT ENFORCED +) with ( + 'connector'='datagen' + ); + +create table ods_order_sale_order_details_r +( + --import_code string, + member_coupon_type string, + lot_num string, + item_type string, + modify_time string, + perform_price_type string, + row_num int, + account_price decimal(22,4), + perform_price decimal(22,4), + sale_order_code string, + refer_bill_code string, + id string, + cost_price decimal(22,4), + store_code string, + account_money decimal(22,4), + good_code string, + quantity decimal(22,4), + apportion_price decimal(22,4), + create_time string, + --is_preorder string, + 
perform_profit_margin decimal(22,4), + --promotionbill_plan_code string, + service_charge decimal(22,4), + promotionbill_plan_type string, + account_price_gross_money decimal(22,4), + refer_bill_type string, + refer_bill_row_num int, + output_tax string, + --useful int, + proc_time as proctime(), + sale_time TIMESTAMP(3) , + PRIMARY KEY (sale_order_code,row_num,lot_num,good_code) NOT ENFORCED +) with ( + 'connector'='datagen' + ); + +create table ods_order_sales_ordersalesman_r +( + store_code string, + create_time string, + edit_time string, + sale_order_code string, + sales_job_number string, + id string, + row_num int, + sale_details_id string, + sale_time TIMESTAMP(3) , + PRIMARY KEY (id) NOT ENFORCED +) with ( + 'connector'='datagen' + ); + +create table ods_order_sale_order_r +( + --import_code string, + modify_time string, + --around_money decimal(22,4), + --odd_change_money decimal, + --customer_pay_money decimal, + sale_order_code string, + other_free_money decimal, + gift_free_money decimal, + bill_type string, + refer_bill_code string, + pair_count decimal, + store_code string, + member_id string, + cashdesk_id string, + point_number decimal, + coupon_code string, + create_time string, + casher_code string, + bill_kind string, + order_code string, + order_from string, + refer_bill_type string, + coupon_plan_code string, + cash_free_money decimal, + coupon_type string, + proc_time as proctime(), + sale_time TIMESTAMP(3) , + PRIMARY KEY (sale_order_code) NOT ENFORCED +) with ( + 'connector'='datagen' + ); + + +CREATE TABLE IF NOT EXISTS ods_zt_ord_order_r ( + id string, + parent_order_code string, + order_code string, + customer_code string, + top_channel_code string, + goods_channel_code string, + pos_code string, + channel_code string, + warehouse_code string, + state string, + need_receive_amount decimal(22,4), + customer_need_pay_amount decimal(22,4), + goods_total_amount decimal(22,4), + expense_total_amount decimal(22,4), + preferential_total_amount decimal(22,4), + subsidy_total_amount decimal(22,4), + decucted_total_amount decimal(22,4), + exercise_total_amount decimal(22,4), + ordonnance_id string, + splited string, + delivery_provider string, + payee string, + referrer_id string, + ticket_num string, + referrer_name string, + out_trade_code string, + remark string, + creater_id string, + create_time string, + modify_time string, + del_flag string, + from_warehouse_code string, + order_type int, + PRIMARY KEY (`id`) NOT ENFORCED + ) WITH ( + 'connector'='datagen' + ); + +CREATE TABLE IF NOT EXISTS dwd_sd_so_ordr_detl_r ( + stsc_date string comment '统计日期-天', + sale_ordr_doc string comment '销售订单编号', + --store_code string comment '门店编码', + ordr_sour_code string comment '订单来源编码', + sale_order_proc_time timestamp comment '', + ordr_sale_time string comment '', + prmn_prog_type_code string comment '促销方案类型编码', + ordr_type_code string comment '订单类型编码(销售单/退货单)', + ordr_cate_code string comment '订单类别编码(标准/团购单/赠品记账(积分兑奖)/财务记账(订金))', + prmn_prog_code string comment '促销方案编码', + coup_type string comment '', + coup_code string comment '', + line_item INT comment '行项目', + refn_ordr_type_code string comment '参考单据类型编码(销售单/团购单/处方单/订金退货申请单)', + refn_ordr_doc string comment '参考单据编号', + refn_ordr_item INT comment '参考单据行项目号', + memb_disc_mode string comment '会员优惠方式(会员折扣/会员价/VIP会员价)', + --proj_cate string comment '', + goods_code string comment '商品编码', + sale_order_details_proc_time timestamp comment '', + cate_clas_code string comment '', + cate_clas_name string comment '', + purc_clas_code_new 
string comment '', + purc_clas_name_new string comment '', + lotn string comment '批号', + sale_tax string comment '', + memb_id string comment '会员ID', + memb_point decimal comment '', + chk_out_id string comment '', + casr_id string comment '', + is_memb_goods string comment '非会员商品标识', + retail_pric decimal(22,4) comment '零售单价', + memb_pric decimal(22,4) comment '会员单价', + exec_pric_type string comment '执行价类型编码', + --befo_divid_pric decimal(22,4) comment '', + divid_pric decimal(22,4) comment '医保单价', + acnt_pric decimal(22,4) comment '记账单价', + exec_pric decimal(22,4) comment '执行单价', + dct_amt decimal comment '', + gift_dct_amt decimal comment '', + other_free_amt decimal comment '', + amt1 decimal(22,4) comment '', + amt2 decimal(22,4) comment '', + sale_amt decimal(22,4) comment '销售金额', + sale_qty decimal(22,4) comment '销售数量', + china_med_qty decimal comment '', + etl_time timestamp comment '', + is_n_surd_prof string comment '不让利商品标识', + cost_pric decimal(22,4) comment '成本单价', + cost_amt decimal(22,4) comment '', + --sale_cost_amt decimal(22,4) comment '销售成本额', + is_effe_ordr string comment '是否有效订单', + order_code string comment '', + is_ecp_self_dstn_ordr string comment '是否ECP自配送订单', + stat_date string comment '', + proj_cate_code string comment '项目类别编码', + purchase_kind_code string comment '项目类别编码', + sale_goods_snapshot_proc_time timestamp comment '', + casr_code string comment '收银员编码', + service_charge decimal(22,4) comment '', + sale_pers_id string comment '营销员ID', + phmc_code string comment '门店编码', + out_phmc_code string comment '出货门店', + is_ydch_flag string comment '是否异店出货', + coup_prog_code string comment '券方案编码', + PRIMARY KEY (stsc_date,sale_ordr_doc,goods_code,line_item,lotn) NOT ENFORCED + ) WITH ( + 'connector'='print' + ); + + +insert into dwd_sd_so_ordr_detl_r +select ifnull(date_format(cast(t1.sale_time as timestamp), 'yyyyMMdd'), 'NA') as stsc_date + , t1.sale_order_code as sale_ordr_doc + --, t1.store_code as store_code + , (case when coalesce(t.order_from, '') = '' then '9999' else t.order_from end) as ordr_sour_code + , t.proc_time as sale_order_proc_time + , date_format(cast(t1.sale_time as timestamp),'yyyy-MM-dd HH:mm:ss') as ordr_sale_time + , t1.promotionbill_plan_type as prmn_prog_type_code + , t.bill_type as ordr_type_code + , t.bill_kind as ordr_cate_code + , t1.promotionbill_plan_type as prmn_prog_code + , t.coupon_type as coup_type + , t.coupon_code as coup_code + , t1.row_num as line_item + , t1.refer_bill_type as refn_ordr_type_code + , t1.refer_bill_code as refn_ordr_doc + , t1.refer_bill_row_num as refn_ordr_item + , t1.member_coupon_type as memb_disc_mode + --, t1.item_type as proj_cate + , t1.good_code as goods_code + , t1.proc_time as sale_order_details_proc_time + , t2.category_code as cate_clas_code + , t2.category_des as cate_clas_name + , t2.purchase_kind_code as purc_clas_code_new + , t2.purchase_kind_des as purc_clas_name_new + , t1.lot_num as lotn + , t1.output_tax as sale_tax + , t.member_id as memb_id + , t.point_number as memb_point + , t.cashdesk_id as chk_out_id + ,case when coalesce(t.casher_code , '') = '' then 'NA' else LPAD(CAST(t.casher_code as string), 8, '0') end as casr_id + , t2.is_unmember_goods as is_memb_goods + , t2.retail_price as retail_pric + , t2.member_price as memb_pric + , t1.perform_price_type as exec_pric_type + --, t1.perform_price as befo_divid_pric + , t1.apportion_price as divid_pric + , t1.account_price as acnt_pric + , t1.perform_price as exec_pric + , t.cash_free_money as dct_amt + , t.gift_free_money as 
gift_dct_amt + , t.other_free_money as other_free_amt + , t1.perform_profit_margin as amt1 + , t1.account_price_gross_money as amt2 + , t1.account_money as sale_amt + , t1.quantity as sale_qty + , t.pair_count as china_med_qty + , current_timestamp as etl_time + , t2.is_unfree_goods as is_n_surd_prof + , t1.cost_price as cost_pric + , t1.cost_price * t1.quantity as cost_amt + --, t1.cost_price * t1.quantity as sale_cost_amt + , case when t.bill_kind in ('3', '4', '15') then 'N' else 'Y' end as is_effe_ordr + , coalesce(t.order_code, t.sale_order_code) as order_code + , 'N' as is_ecp_self_dstn_ordr + , date_format(cast(t1.sale_time as timestamp), 'yyyyMMdd') as stat_date + , t1.item_type as proj_cate_code + , t2.purchase_kind_code as purchase_kind_code + , t2.proc_time as sale_goods_snapshot_proc_time + , t.casher_code as casr_code + , t1.service_charge as service_charge + ,case when coalesce(t3.sales_job_number , '') = '' then 'NA' else LPAD(CAST(t3.sales_job_number as string), 8, '0') end as sale_pers_id + ,case when t4.from_warehouse_code is not null then t4.from_warehouse_code + when coalesce(trim(t1.store_code),'')='' then 'NA' + else trim(t1.store_code) end as phmc_code + ,case when t4.warehouse_code is not null then t4.warehouse_code + when coalesce(trim(t1.store_code),'')='' then 'NA' + else trim(t1.store_code) end as out_phmc_code + ,if(t4.pos_code is not null, 'Y', 'N') as is_ydch_flag + ,t.coupon_plan_code as coup_prog_code +from ods_order_sale_order_r t + inner join ods_order_sale_order_details_r t1 + on t.sale_order_code = t1.sale_order_code + and date_format(cast(t1.sale_time as timestamp), 'yyyyMMdd')=date_format(localtimestamp,'yyyyMMdd') + left join ods_order_sale_goods_snapshot_r t2 + on t1.sale_order_code = t2.sale_order_code + and t1.good_code = t2.good_code + and t2.origin_table='sale_goods_snapshot' + and date_format(cast(t2.sale_time as timestamp), 'yyyyMMdd')=date_format(localtimestamp,'yyyyMMdd') + left join ods_order_sales_ordersalesman_r t3 + on t1.sale_order_code = t3.sale_order_code + and t1.row_num = t3.row_num + and date_format(cast(t3.sale_time as timestamp), 'yyyyMMdd')=date_format(localtimestamp,'yyyyMMdd') + left join ods_zt_ord_order_r t4 + on t.sale_order_code = t4.pos_code + and t4.order_type =1 +where date_format(cast(t.sale_time as timestamp), 'yyyyMMdd')=date_format(localtimestamp,'yyyyMMdd'); diff --git a/e2e_test/tools/dinky_task/udf/JAVA_UDF.java b/e2e_test/tools/dinky_task/udf/JAVA_UDF.java new file mode 100644 index 0000000000..b9253588c3 --- /dev/null +++ b/e2e_test/tools/dinky_task/udf/JAVA_UDF.java @@ -0,0 +1,9 @@ +package com.javaudf; + +import org.apache.flink.table.functions.ScalarFunction; + +public class demo extends ScalarFunction { + public String eval(String s) { + return "this is java udf "+s; + } +} diff --git a/e2e_test/tools/dinky_task/udf/PYTHON_UDF.py b/e2e_test/tools/dinky_task/udf/PYTHON_UDF.py new file mode 100644 index 0000000000..b8db9ea0f5 --- /dev/null +++ b/e2e_test/tools/dinky_task/udf/PYTHON_UDF.py @@ -0,0 +1,13 @@ +from pyflink.table import ScalarFunction, DataTypes +from pyflink.table.udf import udf + + +class com(ScalarFunction): + def __init__(self): + pass + + def eval(self, variable): + return str(variable) + + +python = udf(com(), result_type=DataTypes.STRING()) diff --git a/e2e_test/tools/dinky_task/udf/SCALA_UDF.scala b/e2e_test/tools/dinky_task/udf/SCALA_UDF.scala new file mode 100644 index 0000000000..48e047da63 --- /dev/null +++ b/e2e_test/tools/dinky_task/udf/SCALA_UDF.scala @@ -0,0 +1,11 @@ +package 
com.scala.udf; + +import org.apache.flink.table.api._ +import org.apache.flink.table.functions.ScalarFunction + +// 定义可参数化的函数逻辑 +class demo extends ScalarFunction { + def eval(s: String, begin: Integer, end: Integer): String = { + "this is scala"+s + } +} diff --git a/e2e_test/tools/env.py b/e2e_test/tools/env.py index d68e663a8a..0ce58b4f6c 100644 --- a/e2e_test/tools/env.py +++ b/e2e_test/tools/env.py @@ -1,3 +1,5 @@ +from typing import Optional + from requests import Session import urllib.parse as urlparse from hdfs.client import Client @@ -29,7 +31,20 @@ def addStandaloneCluster(session: Session) -> int: raise Exception(f"Cluster {name} not found") -def addYarnCluster(session: Session) -> int: +def addApplicationCluster(session: Session, params: dict) -> Optional[int]: + name = params['name'] + test_connection_yarn_resp = session.post(url("api/clusterConfiguration/testConnect"), json=params) + assertRespOk(test_connection_yarn_resp, "Test yarn connectivity") + test_connection_yarn_resp = session.put(url("api/clusterConfiguration/saveOrUpdate"), json=params) + assertRespOk(test_connection_yarn_resp, "Add Yarn Application Cluster") + get_app_list = session.get(url(f"api/clusterConfiguration/list?keyword={name}"), json=params) + assertRespOk(get_app_list, "Get Yarn Application Cluster") + for data in get_app_list.json()["data"]: + if data["name"] == name: + return data['id'] + + +def addYarnCluster(session: Session) -> Optional[int]: client = Client("http://namenode:9870") flink_lib_path = yarn_flink_lib client.makedirs(flink_lib_path) @@ -38,14 +53,9 @@ def addYarnCluster(session: Session) -> int: for file in files: filepath = os.path.join(root, file) client.upload(flink_lib_path + "/" + file, filepath) - jar_path = yarn_dinky_app_jar - client.makedirs(jar_path) - for root, dirs, files in os.walk(jar_path): - for file in files: - if file.endswith(".jar") and file.__contains__("dinky-app"): - filepath = os.path.join(root, file) - jar_path = filepath - client.upload(jar_path, filepath) + client.makedirs(yarn_dinky_app_jar) + dinky_app_hdfs_jar_path = yarn_dinky_app_jar + "/" + dinky_app_jar + client.upload(dinky_app_hdfs_jar_path, dinky_app_hdfs_jar_path) name = "yarn-test" params = { "type": "yarn-application", @@ -67,17 +77,63 @@ def addYarnCluster(session: Session) -> int: } }, "appConfig": { - "userJarPath": "hdfs://" + jar_path + "userJarPath": "hdfs://" + dinky_app_hdfs_jar_path } } } log.info(f"Adding yarn application cluster, parameters:{params}") - test_connection_yarn_resp = session.post(url("api/clusterConfiguration/testConnect"), json=params) - assertRespOk(test_connection_yarn_resp, "Test yarn connectivity") - test_connection_yarn_resp = session.put(url("api/clusterConfiguration/saveOrUpdate"), json=params) - assertRespOk(test_connection_yarn_resp, "Add Yarn Application Cluster") - get_app_list = session.get(url(f"api/clusterConfiguration/list?keyword={name}"), json=params) - assertRespOk(get_app_list, "Get Yarn Application Cluster") - for data in get_app_list.json()["data"]: - if data["name"] == name: - return data['id'] + return addApplicationCluster(session, params) + + +def addK8sNativeCluster(session: Session) -> Optional[int]: + with open('/kube/k3s.yaml', 'r') as f: + kube_config_content = f.read() + name = "k8s-native-test" + params = { + "type": "kubernetes-application", + "name": name, + "enabled": True, + "config": { + "kubernetesConfig": { + "configuration": { + "kubernetes.rest-service.exposed.type": "NodePort", + "kubernetes.namespace": "dinky", + 
"kubernetes.service-account": "dinky", + "kubernetes.container.image": f"dinky/flink:flink" + }, + "ingressConfig": { + "kubernetes.ingress.enabled": False + }, + "kubeConfig": kube_config_content, + "podTemplate": podTemplate, + }, + "clusterConfig": { + "flinkConfigPath": "/opt/flink/conf" + }, + "flinkConfig": { + "flinkConfigList": [ + { + "name": "user.artifacts.raw-http-enabled", + "value": "true" + }, + { + "name": "kubernetes.flink.conf.dir", + "value": "/opt/flink/conf" + }, + { + "name": "kubernetes.container.image.pull-policy", + "value": "Never" + } + ], + "configuration": { + "jobmanager.memory.process.size": "1024mb", + "taskmanager.memory.process.size": "1024mb" + } + }, + "appConfig": { + "userJarPath": "local:/opt/flink/usrlib/" + dinky_app_jar, + } + } + } + log.info(f"Adding k8s native application cluster, parameters:{params}") + return addApplicationCluster(session, params) diff --git a/e2e_test/tools/httpUtil.py b/e2e_test/tools/httpUtil.py index d491a3a462..a4c1f60203 100644 --- a/e2e_test/tools/httpUtil.py +++ b/e2e_test/tools/httpUtil.py @@ -1,10 +1,8 @@ -import sys -from logger import log -from requests import Response from json import JSONDecodeError -dinky_addr = sys.argv[1] -log.info(f"The address of the current request:{dinky_addr}") +from requests import Response + +from config import dinky_addr def url(path: str): diff --git a/e2e_test/tools/login.py b/e2e_test/tools/login.py index 676cf8c4c6..f5b829cc95 100644 --- a/e2e_test/tools/login.py +++ b/e2e_test/tools/login.py @@ -18,3 +18,18 @@ def login(session: requests.Session): choose_tenant_resp = session.post(url("api/chooseTenant?tenantId=1")) assertRespOk(choose_tenant_resp, "Choose Tenant") session.cookies.set("tenantId", '1') + +def changeFlinkJobWaitTime(session: requests.Session): + log.info("Change Flink Job Waiting Time: 120 s") + resp: Response = session.post(url("api/sysConfig/modifyConfig"), + json={ + "key": "sys.flink.settings.jobIdWait", + "name": "Job 提交等待时间", + "frontType": "number", + "example": [], + "note": "提交 Application 或 PerJob 任务时获取 Job ID 的最大等待时间(秒)", + "defaultValue": 30, + "value": "120", + "index": 1 + }) + assertRespOk(resp, "ChangeFlinkJobWaitTime") diff --git a/e2e_test/tools/main.py b/e2e_test/tools/main.py index 5f18e656d7..d82644957a 100644 --- a/e2e_test/tools/main.py +++ b/e2e_test/tools/main.py @@ -1,15 +1,35 @@ +import os + import requests -from env import addStandaloneCluster, addYarnCluster -from login import login +from env import addStandaloneCluster, addYarnCluster, addK8sNativeCluster +from login import login, changeFlinkJobWaitTime from task import addCatalogue, Task + +def traverse_files(directory) -> list[str]: + for root, dirs, files in os.walk(directory): + return files + + if __name__ == '__main__': session = requests.session() login(session) + changeFlinkJobWaitTime(session) clusterId = addStandaloneCluster(session) yarn_cluster_id = addYarnCluster(session) + k8s_native_cluster_id = addK8sNativeCluster(session) catalogue = addCatalogue(session, "flink-sql-task") - sql = "DROP TABLE IF EXISTS source_table3;\r\nCREATE TABLE IF NOT EXISTS source_table3(\r\n--订单id\r\n`order_id` BIGINT,\r\n--产品\r\n\r\n`product` BIGINT,\r\n--金额\r\n`amount` BIGINT,\r\n\r\n--支付时间\r\n`order_time` as CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)), -- `在这里插入代码片`\r\n--WATERMARK\r\nWATERMARK FOR order_time AS order_time - INTERVAL '2' SECOND\r\n) WITH(\r\n'connector' = 'datagen',\r\n 'rows-per-second' = '1',\r\n 'fields.order_id.min' = '1',\r\n 'fields.order_id.max' = '2',\r\n 
'fields.amount.min' = '1',\r\n 'fields.amount.max' = '10',\r\n 'fields.product.min' = '1',\r\n 'fields.product.max' = '2'\r\n);\r\n\r\n-- SELECT * FROM source_table3 LIMIT 10;\r\n\r\nDROP TABLE IF EXISTS sink_table5;\r\nCREATE TABLE IF NOT EXISTS sink_table5(\r\n--产品\r\n`product` BIGINT,\r\n--金额\r\n`amount` BIGINT,\r\n--支付时间\r\n`order_time` TIMESTAMP(3),\r\n--1分钟时间聚合总数\r\n`one_minute_sum` BIGINT\r\n) WITH(\r\n'connector'='print'\r\n);\r\n\r\nINSERT INTO sink_table5\r\nSELECT\r\nproduct,\r\namount,\r\norder_time,\r\nSUM(amount) OVER(\r\nPARTITION BY product\r\nORDER BY order_time\r\n-- 标识统计范围是1个 product 的最近 1 分钟的数据\r\nRANGE BETWEEN INTERVAL '1' MINUTE PRECEDING AND CURRENT ROW\r\n) as one_minute_sum\r\nFROM source_table3;" - flink_sql_datagen_test = Task(session, clusterId, yarn_cluster_id, catalogue.id, "flink-sql-datagen-test", sql) - flink_sql_datagen_test.runFlinkTask(wait_time=10, is_async=True) + + flink_sql_datagen_test = Task(session, clusterId, yarn_cluster_id, k8s_native_cluster_id, catalogue.id) + flink_sql_task_path = "dinky_task/flink_sql" + for file_name in traverse_files(flink_sql_task_path): + sql = open(os.path.join(flink_sql_task_path, file_name)).read() + task_name = file_name.split(".")[0] + flink_sql_datagen_test.runFlinkTask(sql, task_name, is_async=True) + + # flink_jar_sql_task_path = "dinky_task/flink_jar_sql" + # for file_name in traverse_files(flink_jar_sql_task_path): + # sql = open(os.path.join(flink_jar_sql_task_path, file_name)).read() + # task_name = file_name.split(".")[0] + # flink_sql_datagen_test.runFlinkTask(sql, task_name,"FlinkJar", is_async=True) diff --git a/e2e_test/tools/task.py b/e2e_test/tools/task.py index 07b52db7b3..9fef4d9c59 100644 --- a/e2e_test/tools/task.py +++ b/e2e_test/tools/task.py @@ -3,6 +3,9 @@ import requests import concurrent.futures + +from requests import Response + from login import assertRespOk, url from logger import log @@ -19,21 +22,33 @@ class FlinkRunMode(Enum): LOCAL = "local" STANDALONE = "standalone" YARN_APPLICATION = "yarn-application" + KUBERNETES_APPLICATION = "kubernetes-application" @staticmethod def getAllMode(): - return [FlinkRunMode.LOCAL, FlinkRunMode.STANDALONE, FlinkRunMode.YARN_APPLICATION] + # todo 这里暂时剔除 local,因为并发场景下,会出现接口卡住问题 + return [FlinkRunMode.STANDALONE, FlinkRunMode.YARN_APPLICATION, FlinkRunMode.KUBERNETES_APPLICATION] class Task: - def __init__(self, session: requests.Session, cluster_id: int, yarn_cluster_id: int, parent_id: int, name: str, - statement): + def __init__(self, session: requests.Session, cluster_id: int, yarn_cluster_id: int, k8s_native_cluster_id: int, + parent_id: int): self.session = session self.cluster_id = cluster_id self.yarn_cluster_id = yarn_cluster_id + self.k8s_native_cluster_id = k8s_native_cluster_id self.parent_id = parent_id - self.name = name - self.statement = statement + def addBaseTask(self, params:dict): + session = self.session + add_parent_dir_resp = session.put(url("api/catalogue/saveOrUpdateCatalogueAndTask"), json=params) + assertRespOk(add_parent_dir_resp, "Create a task") + get_all_tasks_resp = session.post(url("api/catalogue/getCatalogueTreeData"), json={ + "sortValue": "", + "sortType": "" + }) + assertRespOk(get_all_tasks_resp, "Get job details") + data_list: list[dict] = get_all_tasks_resp.json()['data'] + return getTask(data_list, params['name']) def addTask(self, name: str, parent_id: int = 0, dialect: str = "FlinkSql", statement: str = "", run_model: FlinkRunMode = FlinkRunMode.LOCAL) -> CatalogueTree: @@ -46,7 +61,6 @@ def addTask(self, name: 
str, parent_id: int = 0, dialect: str = "FlinkSql", :return CatalogueTree """ model_str = run_model.value - session = self.session params = { "name": name, "type": dialect, @@ -69,15 +83,9 @@ def addTask(self, name: str, parent_id: int = 0, dialect: str = "FlinkSql", params["task"]["clusterId"] = self.cluster_id elif run_model == FlinkRunMode.YARN_APPLICATION: params["task"]["clusterConfigurationId"] = self.yarn_cluster_id - add_parent_dir_resp = session.put(url("api/catalogue/saveOrUpdateCatalogueAndTask"), json=params) - assertRespOk(add_parent_dir_resp, "Create a task") - get_all_tasks_resp = session.post(url("api/catalogue/getCatalogueTreeData"), json={ - "sortValue": "", - "sortType": "" - }) - assertRespOk(get_all_tasks_resp, "Get job details") - data_list: list[dict] = get_all_tasks_resp.json()['data'] - return getTask(data_list, name) + elif run_model == FlinkRunMode.KUBERNETES_APPLICATION: + params["task"]["clusterConfigurationId"] = self.k8s_native_cluster_id + return self.addBaseTask(params) def getFlinkTaskStatus(self, jobInstanceId: int) -> str: """ @@ -102,32 +110,36 @@ def runTask(self, taskId: int) -> int: assertRespOk(run_task_resp, "Run Task") return run_task_resp.json()['data']['jobInstanceId'] - def runFlinkTask(self, modes: list[FlinkRunMode] = FlinkRunMode.getAllMode(), wait_time: int = 10, - is_async: bool = False): - name = self.name - statement = self.statement + def runFlinkTask(self,statement: str, name: str,dialect:str="FlinkSql", modes: list[FlinkRunMode] = FlinkRunMode.getAllMode(), wait_time: int = 20, + is_async: bool = False ) -> CatalogueTree: parent_id = self.parent_id log.info( f"======================\nA Flink task is currently executed,name: {name}, statement: \n{statement}\n ======================") def taskFunc(mode: FlinkRunMode): - flink_task_name = name + "-" + mode.name - task = self.addTask(flink_task_name, parent_id, "FlinkSql", statement, mode) + flink_task_name = name + "-" + mode.value + task = self.addTask(flink_task_name, parent_id, dialect, statement, mode) job_instance_id = self.runTask(task.task_id) sleep(wait_time) log.info(f"Checking the status of task: {flink_task_name}") status = self.getFlinkTaskStatus(job_instance_id) assertFlinkTaskIsRunning(status, flink_task_name) + self.stopTask(task.task_id) if is_async: with concurrent.futures.ThreadPoolExecutor() as executor: - results = [executor.submit(taskFunc, model ) for model in modes] + results = [executor.submit(taskFunc, model) for model in modes] for result in results: result.result() else: for mode in modes: taskFunc(mode) + def stopTask(self, taskId: int) -> None: + resp: Response = self.session.get(url(f"api/task/cancel?id={taskId}&withSavePoint=false&forceCancel=true")) + assertRespOk(resp, "StopTask") + + def assertFlinkTaskIsRunning(status: str, name: str): # todo: should also check whether Flink threw an exception, not just the job status diff --git a/e2e_test/view_k8s_all_pod_logs.sh b/e2e_test/view_k8s_all_pod_logs.sh new file mode 100644 index 0000000000..6b134a0108 --- /dev/null +++ b/e2e_test/view_k8s_all_pod_logs.sh @@ -0,0 +1,32 @@ +# Check that a namespace argument was provided +if [ -z "$1" ]; then + echo "Usage: $0 <namespace>" + exit 1 +fi + +NAMESPACE=$1 + +# Get all Pods in the given namespace +PODS=$(kubectl get pods -n $NAMESPACE -o jsonpath='{.items[*].metadata.name}') + +for POD in $PODS; do + echo "================= Pod: $POD info =================" + kubectl describe pod $POD -n $NAMESPACE + echo "================= Init Container logs =================" + INIT_CONTAINERS=$(kubectl get pod $POD -n $NAMESPACE -o jsonpath='{.spec.initContainers[*].name}') + for INIT_CONTAINER in
$INIT_CONTAINERS; do + echo "Init Container: $INIT_CONTAINER logs" + kubectl logs -n $NAMESPACE $POD -c $INIT_CONTAINER --previous 2>&1 + kubectl logs -n $NAMESPACE $POD -c $INIT_CONTAINER 2>&1 + echo "----------------------------------------------------" + done + echo "================= Regular Container logs =================" + CONTAINERS=$(kubectl get pod $POD -n $NAMESPACE -o jsonpath='{.spec.containers[*].name}') + for CONTAINER in $CONTAINERS; do + echo "Container: $CONTAINER logs" + kubectl logs -n $NAMESPACE $POD -c $CONTAINER --previous 2>&1 + kubectl logs -n $NAMESPACE $POD -c $CONTAINER 2>&1 + echo "----------------------------------------------------" + done + echo "=====================================================" +done