Running Apache Beam Python jobs on Flink on Kubernetes requires several components, depending on the version of Beam:
- a Flink session cluster as the actual workload runner.
- Beam Python SDK harness workers which execute the Python UDFs dispatched from the Flink TaskManagers.
- a Beam JobServer which accepts job submissions from the client (not needed when running without JobServer, see below).
When running with the operator, the Beam Python SDK harness workers run as sidecar containers alongside the Flink TaskManager containers.
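To confirm the sidecar arrangement, you can list the containers in each TaskManager pod. This is a sketch: the label selector (`component=taskmanager`) and container names are assumptions, so adjust them to the labels your cluster resources actually carry.

```bash
# Sketch: print each TaskManager pod with its container names, so the Beam
# WorkerPool sidecar can be spotted next to the TaskManager container.
# The label selector "component=taskmanager" is an assumption.
list_taskmanager_containers() {
  kubectl get pods -l component=taskmanager \
    -o jsonpath='{range .items[*]}{.metadata.name}{": "}{.spec.containers[*].name}{"\n"}{end}'
}

# Usage (requires a running cluster):
# list_taskmanager_containers
```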
As a prerequisite, you need to deploy the Flink Operator to your Kubernetes cluster by following the user guide.
Then, depending on whether you use a JobServer or not, take the following three (or two) steps to run a Beam WordCount Python example job with the Flink Operator. You can write a script to automate the process.
With JobServer:

- Create a Flink session cluster with the Beam WorkerPool as sidecar containers of the Flink TaskManager containers with:

  ```bash
  kubectl apply -f examples/beam/with_job_server/beam_flink_cluster.yaml
  ```
- Replace `ARTIFACTS_DIR` with a directory (e.g., `gs://my-bucket/artifacts`) accessible from the cluster in examples/beam/with_job_server/beam_job_server.yaml, then start the Beam JobServer with:

  ```bash
  kubectl apply -f examples/beam/with_job_server/beam_job_server.yaml
  ```
- After both the Flink cluster and the Beam JobServer are up and running, submit the example job with:

  ```bash
  kubectl apply -f examples/beam/with_job_server/beam_wordcount_py.yaml
  ```
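The three steps above can be sketched as a single script. This is only a sketch under stated assumptions: the JobServer pod label (`app=beam-job-server`) and the literal `ARTIFACTS_DIR` placeholder in the manifest are assumptions, so adapt them to the resources the operator actually creates in your cluster.

```bash
# Sketch: automate the with-JobServer flow end to end.
run_wordcount_with_job_server() {
  # Step 1: create the Flink session cluster with Beam WorkerPool sidecars.
  kubectl apply -f examples/beam/with_job_server/beam_flink_cluster.yaml

  # Step 2: substitute the artifact directory (assumed literal placeholder
  # "ARTIFACTS_DIR") and start the Beam JobServer.
  sed "s|ARTIFACTS_DIR|gs://my-bucket/artifacts|" \
    examples/beam/with_job_server/beam_job_server.yaml | kubectl apply -f -

  # Wait until the JobServer pod reports Ready (assumed label) before step 3.
  kubectl wait --for=condition=Ready pod -l app=beam-job-server --timeout=300s

  # Step 3: submit the WordCount example job.
  kubectl apply -f examples/beam/with_job_server/beam_wordcount_py.yaml
}
```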
Without JobServer:

- Create a Flink session cluster with the Beam WorkerPool as sidecar containers of the Flink TaskManager containers with:

  ```bash
  kubectl apply -f examples/beam/without_job_server/beam_flink_cluster.yaml
  ```
- After the Flink cluster is up and running, submit the example job with:

  ```bash
  kubectl apply -f examples/beam/without_job_server/beam_wordcount_py.yaml
  ```
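After submitting, you can follow the job's progress from its pod. A minimal sketch, assuming the job pod carries an `app=beam-wordcount-py` label; match the selector to the labels actually set in beam_wordcount_py.yaml.

```bash
# Sketch: locate the WordCount job pod and stream its logs.
# The label selector "app=beam-wordcount-py" is an assumption.
watch_wordcount_job() {
  kubectl get pods -l app=beam-wordcount-py   # find the job pod
  kubectl logs -f -l app=beam-wordcount-py    # stream its logs until completion
}
```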
As for the Java SDK, you can submit the example job with:

```bash
kubectl apply -f examples/beam/without_job_server/beam_wordcount_java.yaml
```
Currently there are two known issues with running Beam jobs without JobServer:

- BEAM-9214: sometimes the job first fails with `TypeError: GetJobMetrics() missing 1 required positional argument: 'context'`, but it succeeds after a retry.
- BEAM-9225: the job process doesn't exit as expected after it has changed state to DONE.
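Since BEAM-9214 typically succeeds on a retry, a small generic retry wrapper around the submission command can paper over it. This is a sketch, not part of the operator; the example invocation at the bottom is hypothetical.

```bash
# Sketch: retry a command up to N times, returning success on the first
# attempt that exits 0 (useful for flaky first submissions, cf. BEAM-9214).
retry() {
  local attempts=$1; shift
  local i
  for i in $(seq 1 "$attempts"); do
    if "$@"; then
      return 0
    fi
    echo "attempt $i/$attempts failed, retrying..." >&2
  done
  return 1
}

# Hypothetical usage:
# retry 3 kubectl apply -f examples/beam/without_job_server/beam_wordcount_py.yaml
```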
In the future, we plan to support Beam Python jobs as a first-class job type in this operator. After that, you will no longer need to manage the lifecycle of the Flink session cluster yourself; it will work the same way as a Flink job cluster.