diff --git a/dinky-client/dinky-client-1.14/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/dinky-client/dinky-client-1.14/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java new file mode 100644 index 0000000000..14d9bfc4cb --- /dev/null +++ b/dinky-client/dinky-client-1.14/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java @@ -0,0 +1,284 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.kubernetes; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +import org.apache.flink.client.deployment.ClusterDeploymentException; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal; +import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; +import org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint; +import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint; +import org.apache.flink.kubernetes.kubeclient.Endpoint; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification; +import org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import 
org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.rpc.AddressResolution; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import java.io.File; +import java.util.List; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Kubernetes specific {@link ClusterDescriptor} implementation. */ +public class KubernetesClusterDescriptor implements ClusterDescriptor { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesClusterDescriptor.class); + + private static final String CLUSTER_DESCRIPTION = "Kubernetes cluster"; + + private final Configuration flinkConfig; + + private final FlinkKubeClient client; + + private final String clusterId; + + public KubernetesClusterDescriptor(Configuration flinkConfig, FlinkKubeClient client) { + this.flinkConfig = flinkConfig; + this.client = client; + this.clusterId = + checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID), "ClusterId must be specified!"); + } + + @Override + public String getClusterDescription() { + return CLUSTER_DESCRIPTION; + } + + private ClusterClientProvider createClusterClientProvider(String clusterId) { + return () -> { + final Configuration configuration = new Configuration(flinkConfig); + + final Optional restEndpoint = client.getRestEndpoint(clusterId); + + if (restEndpoint.isPresent()) { + configuration.setString(RestOptions.ADDRESS, restEndpoint.get().getAddress()); + configuration.setInteger(RestOptions.PORT, restEndpoint.get().getPort()); + } else { + throw new RuntimeException( + new ClusterRetrieveException("Could not get the rest endpoint of " + clusterId)); + } + + try { + // Flink client will always use Kubernetes service to contact with jobmanager. So we + // have a pre-configured web monitor address. Using StandaloneClientHAServices to + // create RestClusterClient is reasonable. + return new RestClusterClient<>( + configuration, + clusterId, + (effectiveConfiguration, fatalErrorHandler) -> + new StandaloneClientHAServices(getWebMonitorAddress(effectiveConfiguration))); + } catch (Exception e) { + throw new RuntimeException(new ClusterRetrieveException("Could not create the RestClusterClient.", e)); + } + }; + } + + private String getWebMonitorAddress(Configuration configuration) throws Exception { + AddressResolution resolution = AddressResolution.TRY_ADDRESS_RESOLUTION; + final KubernetesConfigOptions.ServiceExposedType serviceType = + configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE); + if (serviceType.isClusterIP()) { + resolution = AddressResolution.NO_ADDRESS_RESOLUTION; + LOG.warn( + "Please note that Flink client operations(e.g. cancel, list, stop," + + " savepoint, etc.) 
won't work from outside the Kubernetes cluster" + + " since '{}' has been set to {}.", + KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(), + serviceType); + } + return HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution); + } + + @Override + public ClusterClientProvider retrieve(String clusterId) { + final ClusterClientProvider clusterClientProvider = createClusterClientProvider(clusterId); + + try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Retrieve flink cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + @Override + public ClusterClientProvider deploySessionCluster(ClusterSpecification clusterSpecification) + throws ClusterDeploymentException { + final ClusterClientProvider clusterClientProvider = + deployClusterInternal(KubernetesSessionClusterEntrypoint.class.getName(), clusterSpecification, false); + + try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Create flink session cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + @Override + public ClusterClientProvider deployApplicationCluster( + final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration) + throws ClusterDeploymentException { + if (client.getRestService(clusterId).isPresent()) { + client.stopAndCleanupCluster(clusterId); + LOG.warn("The Flink cluster {} already exists, automatically stopAndCleanupCluster.", clusterId); + } + + checkNotNull(clusterSpecification); + checkNotNull(applicationConfiguration); + + final KubernetesDeploymentTarget deploymentTarget = KubernetesDeploymentTarget.fromConfig(flinkConfig); + if (KubernetesDeploymentTarget.APPLICATION != deploymentTarget) { + throw new ClusterDeploymentException("Couldn't deploy Kubernetes Application Cluster." + + " Expected deployment.target=" + + KubernetesDeploymentTarget.APPLICATION.getName() + + " but actual one was \"" + + deploymentTarget + + "\""); + } + + applicationConfiguration.applyToConfiguration(flinkConfig); + + // No need to do pipelineJars validation if it is a PyFlink job. 
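+ // Application mode ships exactly one user jar, resolved from 'pipeline.jars' (e.g. a single "local:///opt/flink/usrlib/job.jar" entry; illustrative value).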
+ if (!(PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName()) + || PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments()))) { + final List pipelineJars = KubernetesUtils.checkJarFileForApplicationMode(flinkConfig); + Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar"); + } + + final ClusterClientProvider clusterClientProvider = deployClusterInternal( + KubernetesApplicationClusterEntrypoint.class.getName(), clusterSpecification, false); + + try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Create flink application cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + @Override + public ClusterClientProvider deployJobCluster( + ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) + throws ClusterDeploymentException { + throw new ClusterDeploymentException("Per-Job Mode not supported by Active Kubernetes deployments."); + } + + private ClusterClientProvider deployClusterInternal( + String entryPoint, ClusterSpecification clusterSpecification, boolean detached) + throws ClusterDeploymentException { + final ClusterEntrypoint.ExecutionMode executionMode = + detached ? ClusterEntrypoint.ExecutionMode.DETACHED : ClusterEntrypoint.ExecutionMode.NORMAL; + flinkConfig.setString(ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString()); + + flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint); + + // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values. + KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT); + KubernetesUtils.checkAndUpdatePortConfigOption( + flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT); + KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT); + + if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) { + flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId); + KubernetesUtils.checkAndUpdatePortConfigOption( + flinkConfig, + HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE, + flinkConfig.get(JobManagerOptions.PORT)); + } + + try { + final KubernetesJobManagerParameters kubernetesJobManagerParameters = + new KubernetesJobManagerParameters(flinkConfig, clusterSpecification); + + final FlinkPod podTemplate = kubernetesJobManagerParameters + .getPodTemplateFilePath() + .map(file -> KubernetesUtils.loadPodFromTemplateFile(client, file, Constants.MAIN_CONTAINER_NAME)) + .orElse(new FlinkPod.Builder().build()); + final KubernetesJobManagerSpecification kubernetesJobManagerSpec = + KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( + podTemplate, kubernetesJobManagerParameters); + + client.createJobManagerComponent(kubernetesJobManagerSpec); + + return createClusterClientProvider(clusterId); + } catch (Exception e) { + try { + LOG.warn( + "Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.", + clusterId); + client.stopAndCleanupCluster(clusterId); + } catch (Exception e1) { + LOG.info("Failed to stop and clean up the Kubernetes cluster \"{}\".", clusterId, e1); + } + throw new ClusterDeploymentException("Could not create Kubernetes cluster \"" + clusterId + "\".", e); + } + } + + @Override + public void killCluster(String clusterId) 
throws FlinkException {
+        try {
+            client.stopAndCleanupCluster(clusterId);
+        } catch (Exception e) {
+            throw new FlinkException("Could not kill Kubernetes cluster " + clusterId, e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            client.close();
+        } catch (Exception e) {
+            LOG.error("Failed to close the Flink Kubernetes client.", e);
+        }
+    }
+}
diff --git a/dinky-client/dinky-client-1.14/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/dinky-client/dinky-client-1.14/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
new file mode 100644
index 0000000000..a1abc50d80
--- /dev/null
+++ b/dinky-client/dinky-client-1.14/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -0,0 +1,186 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.kubernetes.kubeclient.decorators;
+
+import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
+import static org.apache.flink.kubernetes.utils.Constants.CONFIG_MAP_PREFIX;
+import static org.apache.flink.kubernetes.utils.Constants.FLINK_CONF_VOLUME;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.dinky.constant.FlinkParamConstant.CONFIG_FILE_NAME_LIST;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.shaded.guava30.com.google.common.io.Files;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KeyToPath;
+import io.fabric8.kubernetes.api.model.KeyToPathBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import io.fabric8.kubernetes.api.model.PodBuilder;
+import io.fabric8.kubernetes.api.model.Volume;
+import io.fabric8.kubernetes.api.model.VolumeBuilder;
+
+/**
+ * Mounts the 
log4j.properties, logback.xml, and flink-conf.yaml configuration on the JobManager or + * TaskManager pod. + */ +public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator { + + private final AbstractKubernetesParameters kubernetesComponentConf; + + public FlinkConfMountDecorator(AbstractKubernetesParameters kubernetesComponentConf) { + this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf); + } + + @Override + public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { + final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer()); + + final Container mountedMainContainer = new ContainerBuilder(flinkPod.getMainContainer()) + .addNewVolumeMount() + .withName(FLINK_CONF_VOLUME) + .withMountPath(kubernetesComponentConf.getFlinkConfDirInPod()) + .endVolumeMount() + .build(); + + return new FlinkPod.Builder(flinkPod) + .withPod(mountedPod) + .withMainContainer(mountedMainContainer) + .build(); + } + + private Pod decoratePod(Pod pod) { + final List keyToPaths = getLocalLogConfFiles().stream() + .map(file -> new KeyToPathBuilder() + .withKey(file.getName()) + .withPath(file.getName()) + .build()) + .collect(Collectors.toList()); + keyToPaths.add(new KeyToPathBuilder() + .withKey(FLINK_CONF_FILENAME) + .withPath(FLINK_CONF_FILENAME) + .build()); + + final Volume flinkConfVolume = new VolumeBuilder() + .withName(FLINK_CONF_VOLUME) + .withNewConfigMap() + .withName(getFlinkConfConfigMapName(kubernetesComponentConf.getClusterId())) + .withItems(keyToPaths) + .endConfigMap() + .build(); + + return new PodBuilder(pod) + .editSpec() + .addNewVolumeLike(flinkConfVolume) + .endVolume() + .endSpec() + .build(); + } + + @Override + public List buildAccompanyingKubernetesResources() throws IOException { + final String clusterId = kubernetesComponentConf.getClusterId(); + + final Map data = new HashMap<>(); + + final List localLogFiles = getLocalLogConfFiles(); + for (File file : localLogFiles) { + data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8)); + } + + final Map propertiesMap = + getClusterSidePropertiesMap(kubernetesComponentConf.getFlinkConfiguration()); + data.put(FLINK_CONF_FILENAME, getFlinkConfData(propertiesMap)); + + final ConfigMap flinkConfConfigMap = new ConfigMapBuilder() + .withApiVersion(Constants.API_VERSION) + .withNewMetadata() + .withName(getFlinkConfConfigMapName(clusterId)) + .withLabels(kubernetesComponentConf.getCommonLabels()) + .endMetadata() + .addToData(data) + .build(); + + return Collections.singletonList(flinkConfConfigMap); + } + + /** Get properties map for the cluster-side after removal of some keys. */ + private Map getClusterSidePropertiesMap(Configuration flinkConfig) { + final Configuration clusterSideConfig = flinkConfig.clone(); + // Remove some configuration options that should not be taken to cluster side. 
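+ // The kubeconfig path and the client-local conf directory only exist on the submitting machine and must not be shipped into the pods.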
+ clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE); + clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR); + return clusterSideConfig.toMap(); + } + + @VisibleForTesting + String getFlinkConfData(Map propertiesMap) throws IOException { + try (StringWriter sw = new StringWriter(); + PrintWriter out = new PrintWriter(sw)) { + propertiesMap.forEach((k, v) -> { + out.print(k); + out.print(": "); + out.println(v); + }); + + return sw.toString(); + } + } + + private List getLocalLogConfFiles() { + final String confDir = kubernetesComponentConf.getConfigDirectory(); + + List localLogConfFiles = new ArrayList<>(); + for (String fileName : CONFIG_FILE_NAME_LIST) { + final File file = new File(confDir, fileName); + if (file.exists()) { + localLogConfFiles.add(file); + } + } + + return localLogConfFiles; + } + + @VisibleForTesting + public static String getFlinkConfConfigMapName(String clusterId) { + return CONFIG_MAP_PREFIX + clusterId; + } +} diff --git a/dinky-client/dinky-client-1.15/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/dinky-client/dinky-client-1.15/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java new file mode 100644 index 0000000000..81fe9e9984 --- /dev/null +++ b/dinky-client/dinky-client-1.15/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java @@ -0,0 +1,286 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.flink.kubernetes; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +import org.apache.flink.client.deployment.ClusterDeploymentException; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal; +import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; +import org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint; +import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint; +import org.apache.flink.kubernetes.kubeclient.Endpoint; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification; +import org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; +import org.apache.flink.kubernetes.kubeclient.resources.KubernetesService; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.rpc.AddressResolution; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import java.io.File; +import java.util.List; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Kubernetes specific {@link ClusterDescriptor} implementation. 
*/ +public class KubernetesClusterDescriptor implements ClusterDescriptor { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesClusterDescriptor.class); + + private static final String CLUSTER_DESCRIPTION = "Kubernetes cluster"; + + private final Configuration flinkConfig; + + private final FlinkKubeClient client; + + private final String clusterId; + + public KubernetesClusterDescriptor(Configuration flinkConfig, FlinkKubeClient client) { + this.flinkConfig = flinkConfig; + this.client = client; + this.clusterId = + checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID), "ClusterId must be specified!"); + } + + @Override + public String getClusterDescription() { + return CLUSTER_DESCRIPTION; + } + + private ClusterClientProvider createClusterClientProvider(String clusterId) { + return () -> { + final Configuration configuration = new Configuration(flinkConfig); + + final Optional restEndpoint = client.getRestEndpoint(clusterId); + + if (restEndpoint.isPresent()) { + configuration.setString(RestOptions.ADDRESS, restEndpoint.get().getAddress()); + configuration.setInteger(RestOptions.PORT, restEndpoint.get().getPort()); + } else { + throw new RuntimeException( + new ClusterRetrieveException("Could not get the rest endpoint of " + clusterId)); + } + + try { + // Flink client will always use Kubernetes service to contact with jobmanager. So we + // have a pre-configured web monitor address. Using StandaloneClientHAServices to + // create RestClusterClient is reasonable. + return new RestClusterClient<>( + configuration, + clusterId, + (effectiveConfiguration, fatalErrorHandler) -> + new StandaloneClientHAServices(getWebMonitorAddress(effectiveConfiguration))); + } catch (Exception e) { + throw new RuntimeException(new ClusterRetrieveException("Could not create the RestClusterClient.", e)); + } + }; + } + + private String getWebMonitorAddress(Configuration configuration) throws Exception { + AddressResolution resolution = AddressResolution.TRY_ADDRESS_RESOLUTION; + final KubernetesConfigOptions.ServiceExposedType serviceType = + configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE); + if (serviceType.isClusterIP()) { + resolution = AddressResolution.NO_ADDRESS_RESOLUTION; + LOG.warn( + "Please note that Flink client operations(e.g. cancel, list, stop," + + " savepoint, etc.) 
won't work from outside the Kubernetes cluster" + + " since '{}' has been set to {}.", + KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(), + serviceType); + } + return HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution); + } + + @Override + public ClusterClientProvider retrieve(String clusterId) { + final ClusterClientProvider clusterClientProvider = createClusterClientProvider(clusterId); + + try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Retrieve flink cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + @Override + public ClusterClientProvider deploySessionCluster(ClusterSpecification clusterSpecification) + throws ClusterDeploymentException { + final ClusterClientProvider clusterClientProvider = + deployClusterInternal(KubernetesSessionClusterEntrypoint.class.getName(), clusterSpecification, false); + + try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Create flink session cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + @Override + public ClusterClientProvider deployApplicationCluster( + final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration) + throws ClusterDeploymentException { + if (client.getService(KubernetesService.ServiceType.REST_SERVICE, clusterId) + .isPresent()) { + client.stopAndCleanupCluster(clusterId); + LOG.warn("The Flink cluster {} already exists, automatically stopAndCleanupCluster.", clusterId); + } + + checkNotNull(clusterSpecification); + checkNotNull(applicationConfiguration); + + final KubernetesDeploymentTarget deploymentTarget = KubernetesDeploymentTarget.fromConfig(flinkConfig); + if (KubernetesDeploymentTarget.APPLICATION != deploymentTarget) { + throw new ClusterDeploymentException("Couldn't deploy Kubernetes Application Cluster." + + " Expected deployment.target=" + + KubernetesDeploymentTarget.APPLICATION.getName() + + " but actual one was \"" + + deploymentTarget + + "\""); + } + + applicationConfiguration.applyToConfiguration(flinkConfig); + + // No need to do pipelineJars validation if it is a PyFlink job. 
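+ // Application mode ships exactly one user jar, resolved from 'pipeline.jars' (e.g. a single "local:///opt/flink/usrlib/job.jar" entry; illustrative value).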
+ if (!(PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName()) + || PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments()))) { + final List pipelineJars = KubernetesUtils.checkJarFileForApplicationMode(flinkConfig); + Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar"); + } + + final ClusterClientProvider clusterClientProvider = deployClusterInternal( + KubernetesApplicationClusterEntrypoint.class.getName(), clusterSpecification, false); + + try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Create flink application cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + @Override + public ClusterClientProvider deployJobCluster( + ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) + throws ClusterDeploymentException { + throw new ClusterDeploymentException("Per-Job Mode not supported by Active Kubernetes deployments."); + } + + private ClusterClientProvider deployClusterInternal( + String entryPoint, ClusterSpecification clusterSpecification, boolean detached) + throws ClusterDeploymentException { + final ClusterEntrypoint.ExecutionMode executionMode = + detached ? ClusterEntrypoint.ExecutionMode.DETACHED : ClusterEntrypoint.ExecutionMode.NORMAL; + flinkConfig.setString(ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString()); + + flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint); + + // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values. + KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT); + KubernetesUtils.checkAndUpdatePortConfigOption( + flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT); + KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT); + + if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) { + flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId); + KubernetesUtils.checkAndUpdatePortConfigOption( + flinkConfig, + HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE, + flinkConfig.get(JobManagerOptions.PORT)); + } + + try { + final KubernetesJobManagerParameters kubernetesJobManagerParameters = + new KubernetesJobManagerParameters(flinkConfig, clusterSpecification); + + final FlinkPod podTemplate = kubernetesJobManagerParameters + .getPodTemplateFilePath() + .map(file -> KubernetesUtils.loadPodFromTemplateFile(client, file, Constants.MAIN_CONTAINER_NAME)) + .orElse(new FlinkPod.Builder().build()); + final KubernetesJobManagerSpecification kubernetesJobManagerSpec = + KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( + podTemplate, kubernetesJobManagerParameters); + + client.createJobManagerComponent(kubernetesJobManagerSpec); + + return createClusterClientProvider(clusterId); + } catch (Exception e) { + try { + LOG.warn( + "Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.", + clusterId); + client.stopAndCleanupCluster(clusterId); + } catch (Exception e1) { + LOG.info("Failed to stop and clean up the Kubernetes cluster \"{}\".", clusterId, e1); + } + throw new ClusterDeploymentException("Could not create Kubernetes cluster \"" + clusterId + "\".", e); + } + } + + @Override + public void killCluster(String clusterId) 
throws FlinkException {
+        try {
+            client.stopAndCleanupCluster(clusterId);
+        } catch (Exception e) {
+            throw new FlinkException("Could not kill Kubernetes cluster " + clusterId, e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            client.close();
+        } catch (Exception e) {
+            LOG.error("Failed to close the Flink Kubernetes client.", e);
+        }
+    }
+}
diff --git a/dinky-client/dinky-client-1.15/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/dinky-client/dinky-client-1.15/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
new file mode 100644
index 0000000000..40aca7158e
--- /dev/null
+++ b/dinky-client/dinky-client-1.15/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -0,0 +1,193 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.kubernetes.kubeclient.decorators;
+
+import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
+import static org.apache.flink.kubernetes.utils.Constants.CONFIG_MAP_PREFIX;
+import static org.apache.flink.kubernetes.utils.Constants.FLINK_CONF_VOLUME;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.dinky.constant.FlinkParamConstant.CONFIG_FILE_NAME_LIST;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.shaded.guava30.com.google.common.io.Files;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KeyToPath;
+import io.fabric8.kubernetes.api.model.KeyToPathBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import 
io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeBuilder; + +/** + * Mounts the log4j.properties, logback.xml, and flink-conf.yaml configuration on the JobManager or + * TaskManager pod. + */ +public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator { + + private final AbstractKubernetesParameters kubernetesComponentConf; + + public FlinkConfMountDecorator(AbstractKubernetesParameters kubernetesComponentConf) { + this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf); + } + + @Override + public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { + final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer()); + + final Container mountedMainContainer = new ContainerBuilder(flinkPod.getMainContainer()) + .addNewVolumeMount() + .withName(FLINK_CONF_VOLUME) + .withMountPath(kubernetesComponentConf.getFlinkConfDirInPod()) + .endVolumeMount() + .build(); + + return new FlinkPod.Builder(flinkPod) + .withPod(mountedPod) + .withMainContainer(mountedMainContainer) + .build(); + } + + private Pod decoratePod(Pod pod) { + final List keyToPaths = getLocalLogConfFiles().stream() + .map(file -> new KeyToPathBuilder() + .withKey(file.getName()) + .withPath(file.getName()) + .build()) + .collect(Collectors.toList()); + keyToPaths.add(new KeyToPathBuilder() + .withKey(FLINK_CONF_FILENAME) + .withPath(FLINK_CONF_FILENAME) + .build()); + + final Volume flinkConfVolume = new VolumeBuilder() + .withName(FLINK_CONF_VOLUME) + .withNewConfigMap() + .withName(getFlinkConfConfigMapName(kubernetesComponentConf.getClusterId())) + .withItems(keyToPaths) + .endConfigMap() + .build(); + + return new PodBuilder(pod) + .editSpec() + .addNewVolumeLike(flinkConfVolume) + .endVolume() + .endSpec() + .build(); + } + + @Override + public List buildAccompanyingKubernetesResources() throws IOException { + final String clusterId = kubernetesComponentConf.getClusterId(); + + final Map data = new HashMap<>(); + + final List localLogFiles = getLocalLogConfFiles(); + for (File file : localLogFiles) { + data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8)); + } + + final Map propertiesMap = + getClusterSidePropertiesMap(kubernetesComponentConf.getFlinkConfiguration()); + data.put(FLINK_CONF_FILENAME, getFlinkConfData(propertiesMap)); + + final ConfigMap flinkConfConfigMap = new ConfigMapBuilder() + .withApiVersion(Constants.API_VERSION) + .withNewMetadata() + .withName(getFlinkConfConfigMapName(clusterId)) + .withLabels(kubernetesComponentConf.getCommonLabels()) + .endMetadata() + .addToData(data) + .build(); + + return Collections.singletonList(flinkConfConfigMap); + } + + /** Get properties map for the cluster-side after removal of some keys. */ + private Map getClusterSidePropertiesMap(Configuration flinkConfig) { + final Configuration clusterSideConfig = flinkConfig.clone(); + // Remove some configuration options that should not be taken to cluster side. 
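+ // Besides the client-local paths, host and bind-address overrides are dropped too: values valid on the submitting machine would be wrong inside the pods.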
+ clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE); + clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR); + clusterSideConfig.removeConfig(RestOptions.BIND_ADDRESS); + clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST); + clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST); + clusterSideConfig.removeConfig(TaskManagerOptions.HOST); + return clusterSideConfig.toMap(); + } + + @VisibleForTesting + String getFlinkConfData(Map propertiesMap) throws IOException { + try (StringWriter sw = new StringWriter(); + PrintWriter out = new PrintWriter(sw)) { + propertiesMap.forEach((k, v) -> { + out.print(k); + out.print(": "); + out.println(v); + }); + + return sw.toString(); + } + } + + private List getLocalLogConfFiles() { + final String confDir = kubernetesComponentConf.getConfigDirectory(); + + List localLogConfFiles = new ArrayList<>(); + for (String fileName : CONFIG_FILE_NAME_LIST) { + final File file = new File(confDir, fileName); + if (file.exists()) { + localLogConfFiles.add(file); + } + } + + return localLogConfFiles; + } + + @VisibleForTesting + public static String getFlinkConfConfigMapName(String clusterId) { + return CONFIG_MAP_PREFIX + clusterId; + } +} diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java new file mode 100644 index 0000000000..a13e27722a --- /dev/null +++ b/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java @@ -0,0 +1,286 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.flink.kubernetes; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +import org.apache.flink.client.deployment.ClusterDeploymentException; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal; +import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; +import org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint; +import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint; +import org.apache.flink.kubernetes.kubeclient.Endpoint; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification; +import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; +import org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.rpc.AddressResolution; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import java.io.File; +import java.util.List; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Kubernetes specific {@link ClusterDescriptor} implementation. 
*/ +public class KubernetesClusterDescriptor implements ClusterDescriptor { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesClusterDescriptor.class); + + private static final String CLUSTER_DESCRIPTION = "Kubernetes cluster"; + + private final Configuration flinkConfig; + + private final FlinkKubeClient client; + + private final String clusterId; + + public KubernetesClusterDescriptor(Configuration flinkConfig, FlinkKubeClient client) { + this.flinkConfig = flinkConfig; + this.client = client; + this.clusterId = + checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID), "ClusterId must be specified!"); + } + + @Override + public String getClusterDescription() { + return CLUSTER_DESCRIPTION; + } + + private ClusterClientProvider createClusterClientProvider(String clusterId) { + return () -> { + final Configuration configuration = new Configuration(flinkConfig); + + final Optional restEndpoint = client.getRestEndpoint(clusterId); + + if (restEndpoint.isPresent()) { + configuration.setString(RestOptions.ADDRESS, restEndpoint.get().getAddress()); + configuration.setInteger(RestOptions.PORT, restEndpoint.get().getPort()); + } else { + throw new RuntimeException( + new ClusterRetrieveException("Could not get the rest endpoint of " + clusterId)); + } + + try { + // Flink client will always use Kubernetes service to contact with jobmanager. So we + // have a pre-configured web monitor address. Using StandaloneClientHAServices to + // create RestClusterClient is reasonable. + return new RestClusterClient<>( + configuration, + clusterId, + (effectiveConfiguration, fatalErrorHandler) -> + new StandaloneClientHAServices(getWebMonitorAddress(effectiveConfiguration))); + } catch (Exception e) { + throw new RuntimeException(new ClusterRetrieveException("Could not create the RestClusterClient.", e)); + } + }; + } + + private String getWebMonitorAddress(Configuration configuration) throws Exception { + AddressResolution resolution = AddressResolution.TRY_ADDRESS_RESOLUTION; + final KubernetesConfigOptions.ServiceExposedType serviceType = + configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE); + if (serviceType.isClusterIP()) { + resolution = AddressResolution.NO_ADDRESS_RESOLUTION; + LOG.warn( + "Please note that Flink client operations(e.g. cancel, list, stop," + + " savepoint, etc.) 
won't work from outside the Kubernetes cluster" + + " since '{}' has been set to {}.", + KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(), + serviceType); + } + return HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution); + } + + @Override + public ClusterClientProvider retrieve(String clusterId) { + final ClusterClientProvider clusterClientProvider = createClusterClientProvider(clusterId); + + try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Retrieve flink cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + @Override + public ClusterClientProvider deploySessionCluster(ClusterSpecification clusterSpecification) + throws ClusterDeploymentException { + final ClusterClientProvider clusterClientProvider = + deployClusterInternal(KubernetesSessionClusterEntrypoint.class.getName(), clusterSpecification, false); + + try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Create flink session cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + @Override + public ClusterClientProvider deployApplicationCluster( + final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration) + throws ClusterDeploymentException { + if (client.getService(ExternalServiceDecorator.getExternalServiceName(clusterId)) + .isPresent()) { + client.stopAndCleanupCluster(clusterId); + LOG.warn("The Flink cluster {} already exists, automatically stopAndCleanupCluster.", clusterId); + } + + checkNotNull(clusterSpecification); + checkNotNull(applicationConfiguration); + + final KubernetesDeploymentTarget deploymentTarget = KubernetesDeploymentTarget.fromConfig(flinkConfig); + if (KubernetesDeploymentTarget.APPLICATION != deploymentTarget) { + throw new ClusterDeploymentException("Couldn't deploy Kubernetes Application Cluster." + + " Expected deployment.target=" + + KubernetesDeploymentTarget.APPLICATION.getName() + + " but actual one was \"" + + deploymentTarget + + "\""); + } + + applicationConfiguration.applyToConfiguration(flinkConfig); + + // No need to do pipelineJars validation if it is a PyFlink job. 
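+ // Application mode ships exactly one user jar, resolved from 'pipeline.jars' (e.g. a single "local:///opt/flink/usrlib/job.jar" entry; illustrative value).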
+ if (!(PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName()) + || PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments()))) { + final List pipelineJars = KubernetesUtils.checkJarFileForApplicationMode(flinkConfig); + Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar"); + } + + final ClusterClientProvider clusterClientProvider = deployClusterInternal( + KubernetesApplicationClusterEntrypoint.class.getName(), clusterSpecification, false); + + try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Create flink application cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + @Override + public ClusterClientProvider deployJobCluster( + ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) + throws ClusterDeploymentException { + throw new ClusterDeploymentException("Per-Job Mode not supported by Active Kubernetes deployments."); + } + + private ClusterClientProvider deployClusterInternal( + String entryPoint, ClusterSpecification clusterSpecification, boolean detached) + throws ClusterDeploymentException { + final ClusterEntrypoint.ExecutionMode executionMode = + detached ? ClusterEntrypoint.ExecutionMode.DETACHED : ClusterEntrypoint.ExecutionMode.NORMAL; + flinkConfig.setString(ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString()); + + flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint); + + // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values. + KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT); + KubernetesUtils.checkAndUpdatePortConfigOption( + flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT); + KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT); + + if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) { + flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId); + KubernetesUtils.checkAndUpdatePortConfigOption( + flinkConfig, + HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE, + flinkConfig.get(JobManagerOptions.PORT)); + } + + try { + final KubernetesJobManagerParameters kubernetesJobManagerParameters = + new KubernetesJobManagerParameters(flinkConfig, clusterSpecification); + + final FlinkPod podTemplate = kubernetesJobManagerParameters + .getPodTemplateFilePath() + .map(file -> KubernetesUtils.loadPodFromTemplateFile(client, file, Constants.MAIN_CONTAINER_NAME)) + .orElse(new FlinkPod.Builder().build()); + final KubernetesJobManagerSpecification kubernetesJobManagerSpec = + KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( + podTemplate, kubernetesJobManagerParameters); + + client.createJobManagerComponent(kubernetesJobManagerSpec); + + return createClusterClientProvider(clusterId); + } catch (Exception e) { + try { + LOG.warn( + "Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.", + clusterId); + client.stopAndCleanupCluster(clusterId); + } catch (Exception e1) { + LOG.info("Failed to stop and clean up the Kubernetes cluster \"{}\".", clusterId, e1); + } + throw new ClusterDeploymentException("Could not create Kubernetes cluster \"" + clusterId + "\".", e); + } + } + + @Override + public void killCluster(String clusterId) 
throws FlinkException {
+        try {
+            client.stopAndCleanupCluster(clusterId);
+        } catch (Exception e) {
+            throw new FlinkException("Could not kill Kubernetes cluster " + clusterId, e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            client.close();
+        } catch (Exception e) {
+            LOG.error("Failed to close the Flink Kubernetes client.", e);
+        }
+    }
+}
diff --git a/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
new file mode 100644
index 0000000000..40aca7158e
--- /dev/null
+++ b/dinky-client/dinky-client-1.16/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -0,0 +1,193 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.kubernetes.kubeclient.decorators;
+
+import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME;
+import static org.apache.flink.kubernetes.utils.Constants.CONFIG_MAP_PREFIX;
+import static org.apache.flink.kubernetes.utils.Constants.FLINK_CONF_VOLUME;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.dinky.constant.FlinkParamConstant.CONFIG_FILE_NAME_LIST;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.shaded.guava30.com.google.common.io.Files;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import io.fabric8.kubernetes.api.model.ConfigMap;
+import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import io.fabric8.kubernetes.api.model.Container;
+import io.fabric8.kubernetes.api.model.ContainerBuilder;
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.api.model.KeyToPath;
+import io.fabric8.kubernetes.api.model.KeyToPathBuilder;
+import io.fabric8.kubernetes.api.model.Pod;
+import 
io.fabric8.kubernetes.api.model.PodBuilder; +import io.fabric8.kubernetes.api.model.Volume; +import io.fabric8.kubernetes.api.model.VolumeBuilder; + +/** + * Mounts the log4j.properties, logback.xml, and flink-conf.yaml configuration on the JobManager or + * TaskManager pod. + */ +public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator { + + private final AbstractKubernetesParameters kubernetesComponentConf; + + public FlinkConfMountDecorator(AbstractKubernetesParameters kubernetesComponentConf) { + this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf); + } + + @Override + public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { + final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer()); + + final Container mountedMainContainer = new ContainerBuilder(flinkPod.getMainContainer()) + .addNewVolumeMount() + .withName(FLINK_CONF_VOLUME) + .withMountPath(kubernetesComponentConf.getFlinkConfDirInPod()) + .endVolumeMount() + .build(); + + return new FlinkPod.Builder(flinkPod) + .withPod(mountedPod) + .withMainContainer(mountedMainContainer) + .build(); + } + + private Pod decoratePod(Pod pod) { + final List keyToPaths = getLocalLogConfFiles().stream() + .map(file -> new KeyToPathBuilder() + .withKey(file.getName()) + .withPath(file.getName()) + .build()) + .collect(Collectors.toList()); + keyToPaths.add(new KeyToPathBuilder() + .withKey(FLINK_CONF_FILENAME) + .withPath(FLINK_CONF_FILENAME) + .build()); + + final Volume flinkConfVolume = new VolumeBuilder() + .withName(FLINK_CONF_VOLUME) + .withNewConfigMap() + .withName(getFlinkConfConfigMapName(kubernetesComponentConf.getClusterId())) + .withItems(keyToPaths) + .endConfigMap() + .build(); + + return new PodBuilder(pod) + .editSpec() + .addNewVolumeLike(flinkConfVolume) + .endVolume() + .endSpec() + .build(); + } + + @Override + public List buildAccompanyingKubernetesResources() throws IOException { + final String clusterId = kubernetesComponentConf.getClusterId(); + + final Map data = new HashMap<>(); + + final List localLogFiles = getLocalLogConfFiles(); + for (File file : localLogFiles) { + data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8)); + } + + final Map propertiesMap = + getClusterSidePropertiesMap(kubernetesComponentConf.getFlinkConfiguration()); + data.put(FLINK_CONF_FILENAME, getFlinkConfData(propertiesMap)); + + final ConfigMap flinkConfConfigMap = new ConfigMapBuilder() + .withApiVersion(Constants.API_VERSION) + .withNewMetadata() + .withName(getFlinkConfConfigMapName(clusterId)) + .withLabels(kubernetesComponentConf.getCommonLabels()) + .endMetadata() + .addToData(data) + .build(); + + return Collections.singletonList(flinkConfConfigMap); + } + + /** Get properties map for the cluster-side after removal of some keys. */ + private Map getClusterSidePropertiesMap(Configuration flinkConfig) { + final Configuration clusterSideConfig = flinkConfig.clone(); + // Remove some configuration options that should not be taken to cluster side. 
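+ // Besides the client-local paths, host and bind-address overrides are dropped too: values valid on the submitting machine would be wrong inside the pods.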
+ clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE); + clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR); + clusterSideConfig.removeConfig(RestOptions.BIND_ADDRESS); + clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST); + clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST); + clusterSideConfig.removeConfig(TaskManagerOptions.HOST); + return clusterSideConfig.toMap(); + } + + @VisibleForTesting + String getFlinkConfData(Map<String, String> propertiesMap) throws IOException { + try (StringWriter sw = new StringWriter(); + PrintWriter out = new PrintWriter(sw)) { + propertiesMap.forEach((k, v) -> { + out.print(k); + out.print(": "); + out.println(v); + }); + + return sw.toString(); + } + } + + private List<File> getLocalLogConfFiles() { + final String confDir = kubernetesComponentConf.getConfigDirectory(); + + List<File> localLogConfFiles = new ArrayList<>(); + for (String fileName : CONFIG_FILE_NAME_LIST) { + final File file = new File(confDir, fileName); + if (file.exists()) { + localLogConfFiles.add(file); + } + } + + return localLogConfFiles; + } + + @VisibleForTesting + public static String getFlinkConfConfigMapName(String clusterId) { + return CONFIG_MAP_PREFIX + clusterId; + } +} diff --git a/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java new file mode 100644 index 0000000000..a13e27722a --- /dev/null +++ b/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java @@ -0,0 +1,286 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * + */ + +package org.apache.flink.kubernetes; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +import org.apache.flink.client.deployment.ClusterDeploymentException; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal; +import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; +import org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint; +import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint; +import org.apache.flink.kubernetes.kubeclient.Endpoint; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification; +import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; +import org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.rpc.AddressResolution; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import java.io.File; +import java.util.List; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Kubernetes specific {@link ClusterDescriptor} implementation. 
*/ +public class KubernetesClusterDescriptor implements ClusterDescriptor { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesClusterDescriptor.class); + + private static final String CLUSTER_DESCRIPTION = "Kubernetes cluster"; + + private final Configuration flinkConfig; + + private final FlinkKubeClient client; + + private final String clusterId; + + public KubernetesClusterDescriptor(Configuration flinkConfig, FlinkKubeClient client) { + this.flinkConfig = flinkConfig; + this.client = client; + this.clusterId = + checkNotNull(flinkConfig.getString(KubernetesConfigOptions.CLUSTER_ID), "ClusterId must be specified!"); + } + + @Override + public String getClusterDescription() { + return CLUSTER_DESCRIPTION; + } + + private ClusterClientProvider createClusterClientProvider(String clusterId) { + return () -> { + final Configuration configuration = new Configuration(flinkConfig); + + final Optional restEndpoint = client.getRestEndpoint(clusterId); + + if (restEndpoint.isPresent()) { + configuration.setString(RestOptions.ADDRESS, restEndpoint.get().getAddress()); + configuration.setInteger(RestOptions.PORT, restEndpoint.get().getPort()); + } else { + throw new RuntimeException( + new ClusterRetrieveException("Could not get the rest endpoint of " + clusterId)); + } + + try { + // Flink client will always use Kubernetes service to contact with jobmanager. So we + // have a pre-configured web monitor address. Using StandaloneClientHAServices to + // create RestClusterClient is reasonable. + return new RestClusterClient<>( + configuration, + clusterId, + (effectiveConfiguration, fatalErrorHandler) -> + new StandaloneClientHAServices(getWebMonitorAddress(effectiveConfiguration))); + } catch (Exception e) { + throw new RuntimeException(new ClusterRetrieveException("Could not create the RestClusterClient.", e)); + } + }; + } + + private String getWebMonitorAddress(Configuration configuration) throws Exception { + AddressResolution resolution = AddressResolution.TRY_ADDRESS_RESOLUTION; + final KubernetesConfigOptions.ServiceExposedType serviceType = + configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE); + if (serviceType.isClusterIP()) { + resolution = AddressResolution.NO_ADDRESS_RESOLUTION; + LOG.warn( + "Please note that Flink client operations(e.g. cancel, list, stop," + + " savepoint, etc.) 
won't work from outside the Kubernetes cluster" + + " since '{}' has been set to {}.", + KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(), + serviceType); + } + return HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution); + } + + @Override + public ClusterClientProvider retrieve(String clusterId) { + final ClusterClientProvider clusterClientProvider = createClusterClientProvider(clusterId); + + try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Retrieve flink cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + @Override + public ClusterClientProvider deploySessionCluster(ClusterSpecification clusterSpecification) + throws ClusterDeploymentException { + final ClusterClientProvider clusterClientProvider = + deployClusterInternal(KubernetesSessionClusterEntrypoint.class.getName(), clusterSpecification, false); + + try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Create flink session cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + @Override + public ClusterClientProvider deployApplicationCluster( + final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration) + throws ClusterDeploymentException { + if (client.getService(ExternalServiceDecorator.getExternalServiceName(clusterId)) + .isPresent()) { + client.stopAndCleanupCluster(clusterId); + LOG.warn("The Flink cluster {} already exists, automatically stopAndCleanupCluster.", clusterId); + } + + checkNotNull(clusterSpecification); + checkNotNull(applicationConfiguration); + + final KubernetesDeploymentTarget deploymentTarget = KubernetesDeploymentTarget.fromConfig(flinkConfig); + if (KubernetesDeploymentTarget.APPLICATION != deploymentTarget) { + throw new ClusterDeploymentException("Couldn't deploy Kubernetes Application Cluster." + + " Expected deployment.target=" + + KubernetesDeploymentTarget.APPLICATION.getName() + + " but actual one was \"" + + deploymentTarget + + "\""); + } + + applicationConfiguration.applyToConfiguration(flinkConfig); + + // No need to do pipelineJars validation if it is a PyFlink job. 
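+        // Application mode expects exactly one user jar in 'pipeline.jars' (for example, a
+        // hypothetical "pipeline.jars: local:///opt/flink/usrlib/app.jar" baked into the
+        // image), which the check below enforces; PyFlink jobs ship no user jar, so the
+        // validation is skipped for them.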
+ if (!(PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName()) + || PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments()))) { + final List pipelineJars = KubernetesUtils.checkJarFileForApplicationMode(flinkConfig); + Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar"); + } + + final ClusterClientProvider clusterClientProvider = deployClusterInternal( + KubernetesApplicationClusterEntrypoint.class.getName(), clusterSpecification, false); + + try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Create flink application cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + @Override + public ClusterClientProvider deployJobCluster( + ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) + throws ClusterDeploymentException { + throw new ClusterDeploymentException("Per-Job Mode not supported by Active Kubernetes deployments."); + } + + private ClusterClientProvider deployClusterInternal( + String entryPoint, ClusterSpecification clusterSpecification, boolean detached) + throws ClusterDeploymentException { + final ClusterEntrypoint.ExecutionMode executionMode = + detached ? ClusterEntrypoint.ExecutionMode.DETACHED : ClusterEntrypoint.ExecutionMode.NORMAL; + flinkConfig.setString(ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString()); + + flinkConfig.setString(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint); + + // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values. + KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT); + KubernetesUtils.checkAndUpdatePortConfigOption( + flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT); + KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT); + + if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) { + flinkConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId); + KubernetesUtils.checkAndUpdatePortConfigOption( + flinkConfig, + HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE, + flinkConfig.get(JobManagerOptions.PORT)); + } + + try { + final KubernetesJobManagerParameters kubernetesJobManagerParameters = + new KubernetesJobManagerParameters(flinkConfig, clusterSpecification); + + final FlinkPod podTemplate = kubernetesJobManagerParameters + .getPodTemplateFilePath() + .map(file -> KubernetesUtils.loadPodFromTemplateFile(client, file, Constants.MAIN_CONTAINER_NAME)) + .orElse(new FlinkPod.Builder().build()); + final KubernetesJobManagerSpecification kubernetesJobManagerSpec = + KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( + podTemplate, kubernetesJobManagerParameters); + + client.createJobManagerComponent(kubernetesJobManagerSpec); + + return createClusterClientProvider(clusterId); + } catch (Exception e) { + try { + LOG.warn( + "Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.", + clusterId); + client.stopAndCleanupCluster(clusterId); + } catch (Exception e1) { + LOG.info("Failed to stop and clean up the Kubernetes cluster \"{}\".", clusterId, e1); + } + throw new ClusterDeploymentException("Could not create Kubernetes cluster \"" + clusterId + "\".", e); + } + } + + @Override + public void killCluster(String clusterId) 
throws FlinkException { + try { + client.stopAndCleanupCluster(clusterId); + } catch (Exception e) { + throw new FlinkException("Could not kill Kubernetes cluster " + clusterId); + } + } + + @Override + public void close() { + try { + client.close(); + } catch (Exception e) { + LOG.error("failed to close client, exception {}", e.toString()); + } + } +} diff --git a/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java new file mode 100644 index 0000000000..f851445272 --- /dev/null +++ b/dinky-client/dinky-client-1.17/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java @@ -0,0 +1,192 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.kubernetes.kubeclient.decorators; + +import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME; +import static org.apache.flink.kubernetes.utils.Constants.CONFIG_MAP_PREFIX; +import static org.apache.flink.kubernetes.utils.Constants.FLINK_CONF_VOLUME; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.dinky.constant.FlinkParamConstant.CONFIG_FILE_NAME_LIST; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptionsInternal; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ConfigMap; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Container; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerBuilder; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.HasMetadata; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KeyToPath; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KeyToPathBuilder; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Pod; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.PodBuilder; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Volume; +import 
org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.VolumeBuilder; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.shaded.guava30.com.google.common.io.Files; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Mounts the log4j.properties, logback.xml, and flink-conf.yaml configuration on the JobManager or + * TaskManager pod. + */ +public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator { + + private final AbstractKubernetesParameters kubernetesComponentConf; + + public FlinkConfMountDecorator(AbstractKubernetesParameters kubernetesComponentConf) { + this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf); + } + + @Override + public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { + final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer()); + + final Container mountedMainContainer = new ContainerBuilder(flinkPod.getMainContainer()) + .addNewVolumeMount() + .withName(FLINK_CONF_VOLUME) + .withMountPath(kubernetesComponentConf.getFlinkConfDirInPod()) + .endVolumeMount() + .build(); + + return new FlinkPod.Builder(flinkPod) + .withPod(mountedPod) + .withMainContainer(mountedMainContainer) + .build(); + } + + private Pod decoratePod(Pod pod) { + final List keyToPaths = getLocalLogConfFiles().stream() + .map(file -> new KeyToPathBuilder() + .withKey(file.getName()) + .withPath(file.getName()) + .build()) + .collect(Collectors.toList()); + keyToPaths.add(new KeyToPathBuilder() + .withKey(FLINK_CONF_FILENAME) + .withPath(FLINK_CONF_FILENAME) + .build()); + + final Volume flinkConfVolume = new VolumeBuilder() + .withName(FLINK_CONF_VOLUME) + .withNewConfigMap() + .withName(getFlinkConfConfigMapName(kubernetesComponentConf.getClusterId())) + .withItems(keyToPaths) + .endConfigMap() + .build(); + + return new PodBuilder(pod) + .editSpec() + .addNewVolumeLike(flinkConfVolume) + .endVolume() + .endSpec() + .build(); + } + + @Override + public List buildAccompanyingKubernetesResources() throws IOException { + final String clusterId = kubernetesComponentConf.getClusterId(); + + final Map data = new HashMap<>(); + + final List localLogFiles = getLocalLogConfFiles(); + for (File file : localLogFiles) { + data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8)); + } + + final Map propertiesMap = + getClusterSidePropertiesMap(kubernetesComponentConf.getFlinkConfiguration()); + data.put(FLINK_CONF_FILENAME, getFlinkConfData(propertiesMap)); + + final ConfigMap flinkConfConfigMap = new ConfigMapBuilder() + .withApiVersion(Constants.API_VERSION) + .withNewMetadata() + .withName(getFlinkConfConfigMapName(clusterId)) + .withLabels(kubernetesComponentConf.getCommonLabels()) + .endMetadata() + .addToData(data) + .build(); + + return Collections.singletonList(flinkConfConfigMap); + } + + /** Get properties map for the cluster-side after removal of some keys. */ + private Map getClusterSidePropertiesMap(Configuration flinkConfig) { + final Configuration clusterSideConfig = flinkConfig.clone(); + // Remove some configuration options that should not be taken to cluster side. 
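+            // The surviving entries are then rendered by getFlinkConfData below as one
+            // "key: value" line per option (a hypothetical entry rest.port=8081 becomes
+            // the line "rest.port: 8081"), i.e. the legacy flink-conf.yaml format.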
+ clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE); + clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR); + clusterSideConfig.removeConfig(RestOptions.BIND_ADDRESS); + clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST); + clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST); + clusterSideConfig.removeConfig(TaskManagerOptions.HOST); + return clusterSideConfig.toMap(); + } + + @VisibleForTesting + String getFlinkConfData(Map<String, String> propertiesMap) throws IOException { + try (StringWriter sw = new StringWriter(); + PrintWriter out = new PrintWriter(sw)) { + propertiesMap.forEach((k, v) -> { + out.print(k); + out.print(": "); + out.println(v); + }); + + return sw.toString(); + } + } + + private List<File> getLocalLogConfFiles() { + final String confDir = kubernetesComponentConf.getConfigDirectory(); + + List<File> localLogConfFiles = new ArrayList<>(); + for (String fileName : CONFIG_FILE_NAME_LIST) { + final File file = new File(confDir, fileName); + if (file.exists()) { + localLogConfFiles.add(file); + } + } + + return localLogConfFiles; + } + + @VisibleForTesting + public static String getFlinkConfConfigMapName(String clusterId) { + return CONFIG_MAP_PREFIX + clusterId; + } +} diff --git a/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java index 27ed7870ae..4054957436 100644 --- a/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java +++ b/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java @@ -21,6 +21,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; +import org.apache.commons.lang3.StringUtils; import org.apache.flink.client.deployment.ClusterDeploymentException; import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.client.deployment.ClusterRetrieveException; @@ -32,6 +33,7 @@ import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptionsInternal; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; @@ -48,6 +50,11 @@ import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; import org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory; import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ConfigMap; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.HasMetadata; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KeyToPath; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KeyToPathBuilder; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Volume; import org.apache.flink.kubernetes.utils.Constants; import org.apache.flink.kubernetes.utils.KubernetesUtils; import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; @@ -56,11 +63,16 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.rpc.AddressResolution;
+import org.apache.flink.shaded.guava31.com.google.common.io.Files; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Optional; import org.slf4j.Logger; @@ -250,6 +262,7 @@ private ClusterClientProvider deployClusterInternal( final KubernetesJobManagerSpecification kubernetesJobManagerSpec = KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( podTemplate, kubernetesJobManagerParameters); + mountLogConfFile(kubernetesJobManagerSpec, flinkConfig); client.createJobManagerComponent(kubernetesJobManagerSpec); @@ -267,6 +280,56 @@ private ClusterClientProvider deployClusterInternal( } } + // Mount log4j.properties, logback.xml and other log configs to the JobManager pod. + private void mountLogConfFile(KubernetesJobManagerSpecification kubernetesJobManagerSpec, Configuration flinkConfig) + throws IOException { + List accompanyingResources = kubernetesJobManagerSpec.getAccompanyingResources(); + Optional flinkConfigMapOptional = accompanyingResources.stream() + .filter(hasMetadata -> "ConfigMap".equals(hasMetadata.getKind()) + && StringUtils.startsWith(hasMetadata.getMetadata().getName(), Constants.CONFIG_MAP_PREFIX)) + .findFirst(); + + List volumes = kubernetesJobManagerSpec + .getDeployment() + .getSpec() + .getTemplate() + .getSpec() + .getVolumes(); + Optional flinkConfVolumeOptional = volumes.stream() + .filter(volume -> Constants.FLINK_CONF_VOLUME.equals(volume.getName())) + .findFirst(); + + if (!flinkConfigMapOptional.isPresent() || !flinkConfVolumeOptional.isPresent()) { + return; + } + + final String configDir = flinkConfig + .getOptional(DeploymentOptionsInternal.CONF_DIR) + .orElse(flinkConfig.get(KubernetesConfigOptions.FLINK_CONF_DIR)); + + List flinkLogConfFileNameList = Arrays.asList( + "log4j.properties", + "log4j-session.properties", + "logback.xml", + "logback-session.xml", + "log4j-cli.properties"); + Map flinkConfigMapData = ((ConfigMap) flinkConfigMapOptional.get()).getData(); + List flinkConfVolumeItems = + flinkConfVolumeOptional.get().getConfigMap().getItems(); + + for (String flinkLogConfFileName : flinkLogConfFileNameList) { + File localLogConfFile = new File(configDir, flinkLogConfFileName); + if (localLogConfFile.exists()) { + flinkConfVolumeItems.add(new KeyToPathBuilder() + .withKey(flinkLogConfFileName) + .withPath(flinkLogConfFileName) + .build()); + flinkConfigMapData.put( + localLogConfFile.getName(), Files.toString(localLogConfFile, StandardCharsets.UTF_8)); + } + } + } + @Override public void killCluster(String clusterId) throws FlinkException { try { diff --git a/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java new file mode 100644 index 0000000000..06f2acb0cf --- /dev/null +++ b/dinky-client/dinky-client-1.18/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java @@ -0,0 +1,192 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.kubernetes.kubeclient.decorators; + +import static org.apache.flink.configuration.GlobalConfiguration.FLINK_CONF_FILENAME; +import static org.apache.flink.kubernetes.utils.Constants.CONFIG_MAP_PREFIX; +import static org.apache.flink.kubernetes.utils.Constants.FLINK_CONF_VOLUME; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.dinky.constant.FlinkParamConstant.CONFIG_FILE_NAME_LIST; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DeploymentOptionsInternal; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ConfigMap; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Container; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerBuilder; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.HasMetadata; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KeyToPath; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KeyToPathBuilder; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Pod; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.PodBuilder; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Volume; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.VolumeBuilder; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.shaded.guava31.com.google.common.io.Files; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Mounts the log4j.properties, logback.xml, and flink-conf.yaml configuration on the JobManager or + * TaskManager pod. 
+ */ +public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator { + + private final AbstractKubernetesParameters kubernetesComponentConf; + + public FlinkConfMountDecorator(AbstractKubernetesParameters kubernetesComponentConf) { + this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf); + } + + @Override + public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { + final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer()); + + final Container mountedMainContainer = new ContainerBuilder(flinkPod.getMainContainer()) + .addNewVolumeMount() + .withName(FLINK_CONF_VOLUME) + .withMountPath(kubernetesComponentConf.getFlinkConfDirInPod()) + .endVolumeMount() + .build(); + + return new FlinkPod.Builder(flinkPod) + .withPod(mountedPod) + .withMainContainer(mountedMainContainer) + .build(); + } + + private Pod decoratePod(Pod pod) { + final List keyToPaths = getLocalLogConfFiles().stream() + .map(file -> new KeyToPathBuilder() + .withKey(file.getName()) + .withPath(file.getName()) + .build()) + .collect(Collectors.toList()); + keyToPaths.add(new KeyToPathBuilder() + .withKey(FLINK_CONF_FILENAME) + .withPath(FLINK_CONF_FILENAME) + .build()); + + final Volume flinkConfVolume = new VolumeBuilder() + .withName(FLINK_CONF_VOLUME) + .withNewConfigMap() + .withName(getFlinkConfConfigMapName(kubernetesComponentConf.getClusterId())) + .withItems(keyToPaths) + .endConfigMap() + .build(); + + return new PodBuilder(pod) + .editSpec() + .addNewVolumeLike(flinkConfVolume) + .endVolume() + .endSpec() + .build(); + } + + @Override + public List buildAccompanyingKubernetesResources() throws IOException { + final String clusterId = kubernetesComponentConf.getClusterId(); + + final Map data = new HashMap<>(); + + final List localLogFiles = getLocalLogConfFiles(); + for (File file : localLogFiles) { + data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8)); + } + + final Map propertiesMap = + getClusterSidePropertiesMap(kubernetesComponentConf.getFlinkConfiguration()); + data.put(FLINK_CONF_FILENAME, getFlinkConfData(propertiesMap)); + + final ConfigMap flinkConfConfigMap = new ConfigMapBuilder() + .withApiVersion(Constants.API_VERSION) + .withNewMetadata() + .withName(getFlinkConfConfigMapName(clusterId)) + .withLabels(kubernetesComponentConf.getCommonLabels()) + .endMetadata() + .addToData(data) + .build(); + + return Collections.singletonList(flinkConfConfigMap); + } + + /** Get properties map for the cluster-side after removal of some keys. */ + private Map getClusterSidePropertiesMap(Configuration flinkConfig) { + final Configuration clusterSideConfig = flinkConfig.clone(); + // Remove some configuration options that should not be taken to cluster side. 
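+            // Note: this 1.18 copy builds against the fabric8 model classes relocated to
+            // org.apache.flink.kubernetes.shaded (see the imports above); the patched 1.18
+            // KubernetesClusterDescriptor#mountLogConfFile in this same diff later appends
+            // local log configs to the ConfigMap assembled by buildAccompanyingKubernetesResources.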
+ clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE); + clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR); + clusterSideConfig.removeConfig(RestOptions.BIND_ADDRESS); + clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST); + clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST); + clusterSideConfig.removeConfig(TaskManagerOptions.HOST); + return clusterSideConfig.toMap(); + } + + @VisibleForTesting + String getFlinkConfData(Map<String, String> propertiesMap) throws IOException { + try (StringWriter sw = new StringWriter(); + PrintWriter out = new PrintWriter(sw)) { + propertiesMap.forEach((k, v) -> { + out.print(k); + out.print(": "); + out.println(v); + }); + + return sw.toString(); + } + } + + private List<File> getLocalLogConfFiles() { + final String confDir = kubernetesComponentConf.getConfigDirectory(); + + List<File> localLogConfFiles = new ArrayList<>(); + for (String fileName : CONFIG_FILE_NAME_LIST) { + final File file = new File(confDir, fileName); + if (file.exists()) { + localLogConfFiles.add(file); + } + } + + return localLogConfFiles; + } + + @VisibleForTesting + public static String getFlinkConfConfigMapName(String clusterId) { + return CONFIG_MAP_PREFIX + clusterId; + } +} diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/dinky-client/dinky-client-1.19/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java new file mode 100644 index 0000000000..c8a34ca4d3 --- /dev/null +++ b/dinky-client/dinky-client-1.19/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java @@ -0,0 +1,293 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * + */ + +package org.apache.flink.kubernetes; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +import org.apache.flink.client.deployment.ClusterDeploymentException; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal; +import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; +import org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint; +import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint; +import org.apache.flink.kubernetes.kubeclient.Endpoint; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification; +import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; +import org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.rpc.AddressResolution; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import java.net.URI; +import java.util.List; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Kubernetes specific {@link ClusterDescriptor} implementation. 
*/ +public class KubernetesClusterDescriptor implements ClusterDescriptor { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesClusterDescriptor.class); + + private static final String CLUSTER_DESCRIPTION = "Kubernetes cluster"; + + private final Configuration flinkConfig; + + private final FlinkKubeClientFactory clientFactory; + + private final FlinkKubeClient client; + + private final String clusterId; + + public KubernetesClusterDescriptor(Configuration flinkConfig, FlinkKubeClientFactory clientFactory) { + this.flinkConfig = flinkConfig; + this.clientFactory = clientFactory; + this.client = clientFactory.fromConfiguration(flinkConfig, "client"); + this.clusterId = + checkNotNull(flinkConfig.get(KubernetesConfigOptions.CLUSTER_ID), "ClusterId must be specified!"); + } + + @Override + public String getClusterDescription() { + return CLUSTER_DESCRIPTION; + } + + private ClusterClientProvider createClusterClientProvider(String clusterId) { + return () -> { + final Configuration configuration = new Configuration(flinkConfig); + + final Optional restEndpoint; + try (FlinkKubeClient client = clientFactory.fromConfiguration(configuration, "client")) { + restEndpoint = client.getRestEndpoint(clusterId); + } + + if (restEndpoint.isPresent()) { + configuration.set(RestOptions.ADDRESS, restEndpoint.get().getAddress()); + configuration.set(RestOptions.PORT, restEndpoint.get().getPort()); + } else { + throw new RuntimeException( + new ClusterRetrieveException("Could not get the rest endpoint of " + clusterId)); + } + + try { + // Flink client will always use Kubernetes service to contact with jobmanager. So we + // have a pre-configured web monitor address. Using StandaloneClientHAServices to + // create RestClusterClient is reasonable. + return new RestClusterClient<>( + configuration, + clusterId, + (effectiveConfiguration, fatalErrorHandler) -> + new StandaloneClientHAServices(getWebMonitorAddress(effectiveConfiguration))); + } catch (Exception e) { + throw new RuntimeException(new ClusterRetrieveException("Could not create the RestClusterClient.", e)); + } + }; + } + + private String getWebMonitorAddress(Configuration configuration) throws Exception { + AddressResolution resolution = AddressResolution.TRY_ADDRESS_RESOLUTION; + final KubernetesConfigOptions.ServiceExposedType serviceType = + configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE); + if (serviceType.isClusterIP()) { + resolution = AddressResolution.NO_ADDRESS_RESOLUTION; + LOG.warn( + "Please note that Flink client operations(e.g. cancel, list, stop," + + " savepoint, etc.) 
won't work from outside the Kubernetes cluster" + + " since '{}' has been set to {}.", + KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(), + serviceType); + } + return HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution); + } + + @Override + public ClusterClientProvider retrieve(String clusterId) { + final ClusterClientProvider clusterClientProvider = createClusterClientProvider(clusterId); + + try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Retrieve flink cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + @Override + public ClusterClientProvider deploySessionCluster(ClusterSpecification clusterSpecification) + throws ClusterDeploymentException { + final ClusterClientProvider clusterClientProvider = + deployClusterInternal(KubernetesSessionClusterEntrypoint.class.getName(), clusterSpecification, false); + + try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Create flink session cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + @Override + public ClusterClientProvider deployApplicationCluster( + final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration) + throws ClusterDeploymentException { + if (client.getService(ExternalServiceDecorator.getExternalServiceName(clusterId)) + .isPresent()) { + client.stopAndCleanupCluster(clusterId); + LOG.warn("The Flink cluster {} already exists, automatically stopAndCleanupCluster.", clusterId); + } + + checkNotNull(clusterSpecification); + checkNotNull(applicationConfiguration); + + final KubernetesDeploymentTarget deploymentTarget = KubernetesDeploymentTarget.fromConfig(flinkConfig); + if (KubernetesDeploymentTarget.APPLICATION != deploymentTarget) { + throw new ClusterDeploymentException("Couldn't deploy Kubernetes Application Cluster." + + " Expected deployment.target=" + + KubernetesDeploymentTarget.APPLICATION.getName() + + " but actual one was \"" + + deploymentTarget + + "\""); + } + + applicationConfiguration.applyToConfiguration(flinkConfig); + + // No need to do pipelineJars validation if it is a PyFlink job. 
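+        // PackagedProgramUtils.isPython is consulted twice on purpose: once for the PyFlink
+        // driver entry class and once for "-py"/"-pym" style program arguments, so either
+        // signal marks the job as Python.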
+ if (!(PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName()) + || PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments()))) { + final List pipelineJars = KubernetesUtils.checkJarFileForApplicationMode(flinkConfig); + Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar"); + } + + final ClusterClientProvider clusterClientProvider = deployClusterInternal( + KubernetesApplicationClusterEntrypoint.class.getName(), clusterSpecification, false); + + try (ClusterClient clusterClient = clusterClientProvider.getClusterClient()) { + LOG.info( + "Create flink application cluster {} successfully, JobManager Web Interface: {}", + clusterId, + clusterClient.getWebInterfaceURL()); + } + return clusterClientProvider; + } + + @Override + public ClusterClientProvider deployJobCluster( + ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) + throws ClusterDeploymentException { + throw new ClusterDeploymentException("Per-Job Mode not supported by Active Kubernetes deployments."); + } + + private ClusterClientProvider deployClusterInternal( + String entryPoint, ClusterSpecification clusterSpecification, boolean detached) + throws ClusterDeploymentException { + final ClusterEntrypoint.ExecutionMode executionMode = + detached ? ClusterEntrypoint.ExecutionMode.DETACHED : ClusterEntrypoint.ExecutionMode.NORMAL; + flinkConfig.set(ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString()); + + flinkConfig.set(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint); + + // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values. + KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT); + KubernetesUtils.checkAndUpdatePortConfigOption( + flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT); + KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT); + + if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) { + flinkConfig.set(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId); + KubernetesUtils.checkAndUpdatePortConfigOption( + flinkConfig, + HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE, + flinkConfig.get(JobManagerOptions.PORT)); + } + + try { + final KubernetesJobManagerParameters kubernetesJobManagerParameters = + new KubernetesJobManagerParameters(flinkConfig, clusterSpecification); + + final FlinkPod podTemplate = kubernetesJobManagerParameters + .getPodTemplateFilePath() + .map(file -> KubernetesUtils.loadPodFromTemplateFile(client, file, Constants.MAIN_CONTAINER_NAME)) + .orElse(new FlinkPod.Builder().build()); + final KubernetesJobManagerSpecification kubernetesJobManagerSpec = + KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification( + podTemplate, kubernetesJobManagerParameters); + + client.createJobManagerComponent(kubernetesJobManagerSpec); + + return createClusterClientProvider(clusterId); + } catch (Exception e) { + try { + LOG.warn( + "Failed to create the Kubernetes cluster \"{}\", try to clean up the residual resources.", + clusterId); + client.stopAndCleanupCluster(clusterId); + } catch (Exception e1) { + LOG.info("Failed to stop and clean up the Kubernetes cluster \"{}\".", clusterId, e1); + } + throw new ClusterDeploymentException("Could not create Kubernetes cluster \"" + clusterId + "\".", e); + } + } + + @Override + public void killCluster(String clusterId) throws FlinkException 
{ + try { + client.stopAndCleanupCluster(clusterId); + } catch (Exception e) { + throw new FlinkException("Could not kill Kubernetes cluster " + clusterId); + } + } + + @Override + public void close() { + try { + client.close(); + } catch (Exception e) { + LOG.error("failed to close client, exception {}", e.toString()); + } + } +} diff --git a/dinky-client/dinky-client-1.19/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/dinky-client/dinky-client-1.19/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java new file mode 100644 index 0000000000..15c80c8569 --- /dev/null +++ b/dinky-client/dinky-client-1.19/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java @@ -0,0 +1,188 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.kubernetes.kubeclient.decorators; + +import static org.apache.flink.kubernetes.utils.Constants.CONFIG_MAP_PREFIX; +import static org.apache.flink.kubernetes.utils.Constants.FLINK_CONF_VOLUME; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.dinky.constant.FlinkParamConstant.CONFIG_FILE_NAME_LIST; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.configuration.DeploymentOptionsInternal; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ConfigMap; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Container; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerBuilder; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.HasMetadata; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KeyToPath; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KeyToPathBuilder; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Pod; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.PodBuilder; +import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Volume; 
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.VolumeBuilder; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.shaded.guava31.com.google.common.io.Files; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * Mounts the log4j.properties, logback.xml, and config.yaml configuration on the JobManager or + * TaskManager pod. + */ +public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator { + + private final AbstractKubernetesParameters kubernetesComponentConf; + + public FlinkConfMountDecorator(AbstractKubernetesParameters kubernetesComponentConf) { + this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf); + } + + @Override + public FlinkPod decorateFlinkPod(FlinkPod flinkPod) { + final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer()); + + final Container mountedMainContainer = new ContainerBuilder(flinkPod.getMainContainer()) + .addNewVolumeMount() + .withName(FLINK_CONF_VOLUME) + .withMountPath(kubernetesComponentConf.getFlinkConfDirInPod()) + .endVolumeMount() + .build(); + + return new FlinkPod.Builder(flinkPod) + .withPod(mountedPod) + .withMainContainer(mountedMainContainer) + .build(); + } + + private Pod decoratePod(Pod pod) { + final List keyToPaths = getLocalLogConfFiles().stream() + .map(file -> new KeyToPathBuilder() + .withKey(file.getName()) + .withPath(file.getName()) + .build()) + .collect(Collectors.toList()); + keyToPaths.add(new KeyToPathBuilder() + .withKey(GlobalConfiguration.getFlinkConfFilename()) + .withPath(GlobalConfiguration.getFlinkConfFilename()) + .build()); + + final Volume flinkConfVolume = new VolumeBuilder() + .withName(FLINK_CONF_VOLUME) + .withNewConfigMap() + .withName(getFlinkConfConfigMapName(kubernetesComponentConf.getClusterId())) + .withItems(keyToPaths) + .endConfigMap() + .build(); + + return new PodBuilder(pod) + .editSpec() + .addNewVolumeLike(flinkConfVolume) + .endVolume() + .endSpec() + .build(); + } + + @Override + public List buildAccompanyingKubernetesResources() throws IOException { + final String clusterId = kubernetesComponentConf.getClusterId(); + + final Map data = new HashMap<>(); + + final List localLogFiles = getLocalLogConfFiles(); + for (File file : localLogFiles) { + data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8)); + } + + final List confData = getClusterSideConfData(kubernetesComponentConf.getFlinkConfiguration()); + data.put(GlobalConfiguration.getFlinkConfFilename(), getFlinkConfData(confData)); + + final ConfigMap flinkConfConfigMap = new ConfigMapBuilder() + .withApiVersion(Constants.API_VERSION) + .withNewMetadata() + .withName(getFlinkConfConfigMapName(clusterId)) + .withLabels(kubernetesComponentConf.getCommonLabels()) + .endMetadata() + .addToData(data) + .build(); + + return Collections.singletonList(flinkConfConfigMap); + } + + /** Get properties map for the cluster-side after removal of some keys. */ + private List getClusterSideConfData(Configuration flinkConfig) { + final Configuration clusterSideConfig = flinkConfig.clone(); + // Remove some configuration options that should not be taken to cluster side. 
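+            // Since Flink 1.19 the cluster-side file is the standard-YAML config named by
+            // GlobalConfiguration.getFlinkConfFilename() (config.yaml) rather than a
+            // hard-coded flink-conf.yaml, so the surviving options are serialized via
+            // ConfigurationUtils.convertConfigToWritableLines below instead of manual
+            // "key: value" printing.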
+ clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE); + clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR); + clusterSideConfig.removeConfig(RestOptions.BIND_ADDRESS); + clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST); + clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST); + clusterSideConfig.removeConfig(TaskManagerOptions.HOST); + return ConfigurationUtils.convertConfigToWritableLines(clusterSideConfig, false); + } + + @VisibleForTesting + String getFlinkConfData(List<String> confData) throws IOException { + try (StringWriter sw = new StringWriter(); + PrintWriter out = new PrintWriter(sw)) { + confData.forEach(out::println); + + return sw.toString(); + } + } + + private List<File> getLocalLogConfFiles() { + final String confDir = kubernetesComponentConf.getConfigDirectory(); + + List<File> localLogConfFiles = new ArrayList<>(); + for (String fileName : CONFIG_FILE_NAME_LIST) { + final File file = new File(confDir, fileName); + if (file.exists()) { + localLogConfFiles.add(file); + } + } + + return localLogConfFiles; + } + + @VisibleForTesting + public static String getFlinkConfConfigMapName(String clusterId) { + return CONFIG_MAP_PREFIX + clusterId; + } +} diff --git a/dinky-client/dinky-client-1.20/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java b/dinky-client/dinky-client-1.20/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java new file mode 100644 index 0000000000..ccb5d01f53 --- /dev/null +++ b/dinky-client/dinky-client-1.20/src/main/java/org/apache/flink/kubernetes/KubernetesClusterDescriptor.java @@ -0,0 +1,306 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ * + */ + +package org.apache.flink.kubernetes; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +import org.apache.flink.client.deployment.ClusterDeploymentException; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.deployment.application.ApplicationConfiguration; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.ClusterClientProvider; +import org.apache.flink.client.program.PackagedProgramUtils; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.kubernetes.artifact.KubernetesArtifactUploader; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; +import org.apache.flink.kubernetes.configuration.KubernetesConfigOptionsInternal; +import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget; +import org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint; +import org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint; +import org.apache.flink.kubernetes.kubeclient.Endpoint; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClient; +import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory; +import org.apache.flink.kubernetes.kubeclient.FlinkPod; +import org.apache.flink.kubernetes.kubeclient.KubernetesJobManagerSpecification; +import org.apache.flink.kubernetes.kubeclient.decorators.ExternalServiceDecorator; +import org.apache.flink.kubernetes.kubeclient.factory.KubernetesJobManagerFactory; +import org.apache.flink.kubernetes.kubeclient.parameters.KubernetesJobManagerParameters; +import org.apache.flink.kubernetes.utils.Constants; +import org.apache.flink.kubernetes.utils.KubernetesUtils; +import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; +import org.apache.flink.runtime.rpc.AddressResolution; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; + +import java.net.URI; +import java.util.List; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Kubernetes specific {@link ClusterDescriptor} implementation. 
+public class KubernetesClusterDescriptor implements ClusterDescriptor<String> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KubernetesClusterDescriptor.class);
+
+    private static final String CLUSTER_DESCRIPTION = "Kubernetes cluster";
+
+    private final Configuration flinkConfig;
+
+    private final FlinkKubeClientFactory clientFactory;
+
+    private final FlinkKubeClient client;
+
+    private final KubernetesArtifactUploader artifactUploader;
+
+    private final String clusterId;
+
+    public KubernetesClusterDescriptor(
+            Configuration flinkConfig,
+            FlinkKubeClientFactory clientFactory,
+            KubernetesArtifactUploader artifactUploader) {
+        this.flinkConfig = flinkConfig;
+        this.clientFactory = clientFactory;
+        this.artifactUploader = artifactUploader;
+        this.client = clientFactory.fromConfiguration(flinkConfig, "client");
+        this.clusterId =
+                checkNotNull(flinkConfig.get(KubernetesConfigOptions.CLUSTER_ID), "ClusterId must be specified!");
+    }
+
+    @Override
+    public String getClusterDescription() {
+        return CLUSTER_DESCRIPTION;
+    }
+
+    private ClusterClientProvider<String> createClusterClientProvider(String clusterId) {
+        return () -> {
+            final Configuration configuration = new Configuration(flinkConfig);
+
+            final Optional<Endpoint> restEndpoint;
+            try (FlinkKubeClient client = clientFactory.fromConfiguration(configuration, "client")) {
+                restEndpoint = client.getRestEndpoint(clusterId);
+            }
+
+            if (restEndpoint.isPresent()) {
+                configuration.set(RestOptions.ADDRESS, restEndpoint.get().getAddress());
+                configuration.set(RestOptions.PORT, restEndpoint.get().getPort());
+            } else {
+                throw new RuntimeException(
+                        new ClusterRetrieveException("Could not get the rest endpoint of " + clusterId));
+            }
+
+            try {
+                // The Flink client will always use the Kubernetes service to contact the
+                // JobManager, so we have a pre-configured web monitor address. Using
+                // StandaloneClientHAServices to create the RestClusterClient is reasonable.
+                return new RestClusterClient<>(
+                        configuration,
+                        clusterId,
+                        (effectiveConfiguration, fatalErrorHandler) ->
+                                new StandaloneClientHAServices(getWebMonitorAddress(effectiveConfiguration)));
+            } catch (Exception e) {
+                throw new RuntimeException(new ClusterRetrieveException("Could not create the RestClusterClient.", e));
+            }
+        };
+    }
+
+    private String getWebMonitorAddress(Configuration configuration) throws Exception {
+        AddressResolution resolution = AddressResolution.TRY_ADDRESS_RESOLUTION;
+        final KubernetesConfigOptions.ServiceExposedType serviceType =
+                configuration.get(KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE);
+        if (serviceType.isClusterIP()) {
+            resolution = AddressResolution.NO_ADDRESS_RESOLUTION;
+            LOG.warn(
+                    "Please note that Flink client operations(e.g. cancel, list, stop,"
+                            + " savepoint, etc.) won't work from outside the Kubernetes cluster"
+                            + " since '{}' has been set to {}.",
+                    KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE.key(),
+                    serviceType);
+        }
+        return HighAvailabilityServicesUtils.getWebMonitorAddress(configuration, resolution);
+    }
+
+    @Override
+    public ClusterClientProvider<String> retrieve(String clusterId) {
+        final ClusterClientProvider<String> clusterClientProvider = createClusterClientProvider(clusterId);
+
+        try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
+            LOG.info(
+                    "Retrieved Flink cluster {} successfully, JobManager Web Interface: {}",
+                    clusterId,
+                    clusterClient.getWebInterfaceURL());
+        }
+        return clusterClientProvider;
+    }
+
+    @Override
+    public ClusterClientProvider<String> deploySessionCluster(ClusterSpecification clusterSpecification)
+            throws ClusterDeploymentException {
+        final ClusterClientProvider<String> clusterClientProvider =
+                deployClusterInternal(KubernetesSessionClusterEntrypoint.class.getName(), clusterSpecification, false);
+
+        try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
+            LOG.info(
+                    "Created Flink session cluster {} successfully, JobManager Web Interface: {}",
+                    clusterId,
+                    clusterClient.getWebInterfaceURL());
+        }
+        return clusterClientProvider;
+    }
+
+    @Override
+    public ClusterClientProvider<String> deployApplicationCluster(
+            final ClusterSpecification clusterSpecification, final ApplicationConfiguration applicationConfiguration)
+            throws ClusterDeploymentException {
+        if (client.getService(ExternalServiceDecorator.getExternalServiceName(clusterId))
+                .isPresent()) {
+            client.stopAndCleanupCluster(clusterId);
+            LOG.warn("The Flink cluster {} already exists; stopping and cleaning it up automatically.", clusterId);
+        }
+
+        checkNotNull(clusterSpecification);
+        checkNotNull(applicationConfiguration);
+
+        final KubernetesDeploymentTarget deploymentTarget = KubernetesDeploymentTarget.fromConfig(flinkConfig);
+        if (KubernetesDeploymentTarget.APPLICATION != deploymentTarget) {
+            throw new ClusterDeploymentException("Couldn't deploy Kubernetes Application Cluster."
+                    + " Expected deployment.target="
+                    + KubernetesDeploymentTarget.APPLICATION.getName()
+                    + " but actual one was \""
+                    + deploymentTarget
+                    + "\"");
+        }
+
+        applicationConfiguration.applyToConfiguration(flinkConfig);
+
+        // No need to do pipelineJars validation if it is a PyFlink job.
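+        // (A PyFlink job is detected either from the special Python driver class name or from
+        // Python-specific program arguments; in that case pipeline.jars may legitimately hold
+        // no user jar.)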
+        if (!(PackagedProgramUtils.isPython(applicationConfiguration.getApplicationClassName())
+                || PackagedProgramUtils.isPython(applicationConfiguration.getProgramArguments()))) {
+            final List<URI> pipelineJars = KubernetesUtils.checkJarFileForApplicationMode(flinkConfig);
+            Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar");
+        }
+
+        try {
+            artifactUploader.uploadAll(flinkConfig);
+        } catch (Exception ex) {
+            throw new ClusterDeploymentException(ex);
+        }
+
+        final ClusterClientProvider<String> clusterClientProvider = deployClusterInternal(
+                KubernetesApplicationClusterEntrypoint.class.getName(), clusterSpecification, false);
+
+        try (ClusterClient<String> clusterClient = clusterClientProvider.getClusterClient()) {
+            LOG.info(
+                    "Created Flink application cluster {} successfully, JobManager Web Interface: {}",
+                    clusterId,
+                    clusterClient.getWebInterfaceURL());
+        }
+        return clusterClientProvider;
+    }
+
+    @Override
+    public ClusterClientProvider<String> deployJobCluster(
+            ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached)
+            throws ClusterDeploymentException {
+        throw new ClusterDeploymentException("Per-Job Mode not supported by Active Kubernetes deployments.");
+    }
+
+    private ClusterClientProvider<String> deployClusterInternal(
+            String entryPoint, ClusterSpecification clusterSpecification, boolean detached)
+            throws ClusterDeploymentException {
+        final ClusterEntrypoint.ExecutionMode executionMode =
+                detached ? ClusterEntrypoint.ExecutionMode.DETACHED : ClusterEntrypoint.ExecutionMode.NORMAL;
+        flinkConfig.set(ClusterEntrypoint.INTERNAL_CLUSTER_EXECUTION_MODE, executionMode.toString());
+
+        flinkConfig.set(KubernetesConfigOptionsInternal.ENTRY_POINT_CLASS, entryPoint);
+
+        // Rpc, blob, rest, taskManagerRpc ports need to be exposed, so update them to fixed values.
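+        // (checkAndUpdatePortConfigOption pins an option to the given fallback when it is unset
+        // or a range, since Kubernetes service and container specs need deterministic ports.)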
+        KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig, BlobServerOptions.PORT, Constants.BLOB_SERVER_PORT);
+        KubernetesUtils.checkAndUpdatePortConfigOption(
+                flinkConfig, TaskManagerOptions.RPC_PORT, Constants.TASK_MANAGER_RPC_PORT);
+        KubernetesUtils.checkAndUpdatePortConfigOption(flinkConfig, RestOptions.BIND_PORT, Constants.REST_PORT);
+
+        if (HighAvailabilityMode.isHighAvailabilityModeActivated(flinkConfig)) {
+            flinkConfig.set(HighAvailabilityOptions.HA_CLUSTER_ID, clusterId);
+            KubernetesUtils.checkAndUpdatePortConfigOption(
+                    flinkConfig,
+                    HighAvailabilityOptions.HA_JOB_MANAGER_PORT_RANGE,
+                    flinkConfig.get(JobManagerOptions.PORT));
+        }
+
+        try {
+            final KubernetesJobManagerParameters kubernetesJobManagerParameters =
+                    new KubernetesJobManagerParameters(flinkConfig, clusterSpecification);
+
+            final FlinkPod podTemplate = kubernetesJobManagerParameters
+                    .getPodTemplateFilePath()
+                    .map(file -> KubernetesUtils.loadPodFromTemplateFile(client, file, Constants.MAIN_CONTAINER_NAME))
+                    .orElse(new FlinkPod.Builder().build());
+            final KubernetesJobManagerSpecification kubernetesJobManagerSpec =
+                    KubernetesJobManagerFactory.buildKubernetesJobManagerSpecification(
+                            podTemplate, kubernetesJobManagerParameters);
+
+            client.createJobManagerComponent(kubernetesJobManagerSpec);
+
+            return createClusterClientProvider(clusterId);
+        } catch (Exception e) {
+            try {
+                LOG.warn(
+                        "Failed to create the Kubernetes cluster \"{}\", trying to clean up the residual resources.",
+                        clusterId);
+                client.stopAndCleanupCluster(clusterId);
+            } catch (Exception e1) {
+                LOG.info("Failed to stop and clean up the Kubernetes cluster \"{}\".", clusterId, e1);
+            }
+            throw new ClusterDeploymentException("Could not create Kubernetes cluster \"" + clusterId + "\".", e);
+        }
+    }
+
+    @Override
+    public void killCluster(String clusterId) throws FlinkException {
+        try {
+            client.stopAndCleanupCluster(clusterId);
+        } catch (Exception e) {
+            throw new FlinkException("Could not kill Kubernetes cluster " + clusterId, e);
+        }
+    }
+
+    @Override
+    public void close() {
+        try {
+            client.close();
+        } catch (Exception e) {
+            LOG.error("Failed to close the Flink kube client.", e);
+        }
+    }
+}
diff --git a/dinky-client/dinky-client-1.20/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java b/dinky-client/dinky-client-1.20/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
new file mode 100644
index 0000000000..15c80c8569
--- /dev/null
+++ b/dinky-client/dinky-client-1.20/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/FlinkConfMountDecorator.java
@@ -0,0 +1,188 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.kubernetes.kubeclient.decorators;
+
+import static org.apache.flink.kubernetes.utils.Constants.CONFIG_MAP_PREFIX;
+import static org.apache.flink.kubernetes.utils.Constants.FLINK_CONF_VOLUME;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.dinky.constant.FlinkParamConstant.CONFIG_FILE_NAME_LIST;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.DeploymentOptionsInternal;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
+import org.apache.flink.kubernetes.kubeclient.FlinkPod;
+import org.apache.flink.kubernetes.kubeclient.parameters.AbstractKubernetesParameters;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ConfigMap;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ConfigMapBuilder;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Container;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.ContainerBuilder;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.HasMetadata;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KeyToPath;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.KeyToPathBuilder;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Pod;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.PodBuilder;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.Volume;
+import org.apache.flink.kubernetes.shaded.io.fabric8.kubernetes.api.model.VolumeBuilder;
+import org.apache.flink.kubernetes.utils.Constants;
+import org.apache.flink.shaded.guava31.com.google.common.io.Files;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Mounts the log4j.properties, logback.xml, and config.yaml configuration on the JobManager or
+ * TaskManager pod.
+ */
+public class FlinkConfMountDecorator extends AbstractKubernetesStepDecorator {
+
+    private final AbstractKubernetesParameters kubernetesComponentConf;
+
+    public FlinkConfMountDecorator(AbstractKubernetesParameters kubernetesComponentConf) {
+        this.kubernetesComponentConf = checkNotNull(kubernetesComponentConf);
+    }
+
+    @Override
+    public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
+        final Pod mountedPod = decoratePod(flinkPod.getPodWithoutMainContainer());
+
+        final Container mountedMainContainer = new ContainerBuilder(flinkPod.getMainContainer())
+                .addNewVolumeMount()
+                .withName(FLINK_CONF_VOLUME)
+                .withMountPath(kubernetesComponentConf.getFlinkConfDirInPod())
+                .endVolumeMount()
+                .build();
+
+        return new FlinkPod.Builder(flinkPod)
+                .withPod(mountedPod)
+                .withMainContainer(mountedMainContainer)
+                .build();
+    }
+
+    private Pod decoratePod(Pod pod) {
+        final List<KeyToPath> keyToPaths = getLocalLogConfFiles().stream()
+                .map(file -> new KeyToPathBuilder()
+                        .withKey(file.getName())
+                        .withPath(file.getName())
+                        .build())
+                .collect(Collectors.toList());
+        keyToPaths.add(new KeyToPathBuilder()
+                .withKey(GlobalConfiguration.getFlinkConfFilename())
+                .withPath(GlobalConfiguration.getFlinkConfFilename())
+                .build());
+
+        final Volume flinkConfVolume = new VolumeBuilder()
+                .withName(FLINK_CONF_VOLUME)
+                .withNewConfigMap()
+                .withName(getFlinkConfConfigMapName(kubernetesComponentConf.getClusterId()))
+                .withItems(keyToPaths)
+                .endConfigMap()
+                .build();
+
+        return new PodBuilder(pod)
+                .editSpec()
+                .addNewVolumeLike(flinkConfVolume)
+                .endVolume()
+                .endSpec()
+                .build();
+    }
+
+    @Override
+    public List<HasMetadata> buildAccompanyingKubernetesResources() throws IOException {
+        final String clusterId = kubernetesComponentConf.getClusterId();
+
+        final Map<String, String> data = new HashMap<>();
+
+        final List<File> localLogFiles = getLocalLogConfFiles();
+        for (File file : localLogFiles) {
+            data.put(file.getName(), Files.toString(file, StandardCharsets.UTF_8));
+        }
+
+        final List<String> confData = getClusterSideConfData(kubernetesComponentConf.getFlinkConfiguration());
+        data.put(GlobalConfiguration.getFlinkConfFilename(), getFlinkConfData(confData));
+
+        final ConfigMap flinkConfConfigMap = new ConfigMapBuilder()
+                .withApiVersion(Constants.API_VERSION)
+                .withNewMetadata()
+                .withName(getFlinkConfConfigMapName(clusterId))
+                .withLabels(kubernetesComponentConf.getCommonLabels())
+                .endMetadata()
+                .addToData(data)
+                .build();
+
+        return Collections.singletonList(flinkConfConfigMap);
+    }
+
+    /** Get the cluster-side configuration lines after removing client-only keys. */
+    private List<String> getClusterSideConfData(Configuration flinkConfig) {
+        final Configuration clusterSideConfig = flinkConfig.clone();
+        // Remove some configuration options that should not be taken to cluster side.
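+        // (These keys are only meaningful on the submitting client: the kubeconfig path and conf
+        // dir are local file locations, and bind addresses/hosts are derived per pod in-cluster.)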
+        clusterSideConfig.removeConfig(KubernetesConfigOptions.KUBE_CONFIG_FILE);
+        clusterSideConfig.removeConfig(DeploymentOptionsInternal.CONF_DIR);
+        clusterSideConfig.removeConfig(RestOptions.BIND_ADDRESS);
+        clusterSideConfig.removeConfig(JobManagerOptions.BIND_HOST);
+        clusterSideConfig.removeConfig(TaskManagerOptions.BIND_HOST);
+        clusterSideConfig.removeConfig(TaskManagerOptions.HOST);
+        return ConfigurationUtils.convertConfigToWritableLines(clusterSideConfig, false);
+    }
+
+    @VisibleForTesting
+    String getFlinkConfData(List<String> confData) throws IOException {
+        try (StringWriter sw = new StringWriter();
+                PrintWriter out = new PrintWriter(sw)) {
+            confData.forEach(out::println);
+
+            return sw.toString();
+        }
+    }
+
+    private List<File> getLocalLogConfFiles() {
+        final String confDir = kubernetesComponentConf.getConfigDirectory();
+
+        List<File> localLogConfFiles = new ArrayList<>();
+        for (String fileName : CONFIG_FILE_NAME_LIST) {
+            final File file = new File(confDir, fileName);
+            if (file.exists()) {
+                localLogConfFiles.add(file);
+            }
+        }
+
+        return localLogConfFiles;
+    }
+
+    @VisibleForTesting
+    public static String getFlinkConfConfigMapName(String clusterId) {
+        return CONFIG_MAP_PREFIX + clusterId;
+    }
+}
diff --git a/dinky-client/dinky-client-base/src/main/java/org/dinky/constant/FlinkParamConstant.java b/dinky-client/dinky-client-base/src/main/java/org/dinky/constant/FlinkParamConstant.java
index 3afa0c1915..fea7a1f1f7 100644
--- a/dinky-client/dinky-client-base/src/main/java/org/dinky/constant/FlinkParamConstant.java
+++ b/dinky-client/dinky-client-base/src/main/java/org/dinky/constant/FlinkParamConstant.java
@@ -19,6 +19,9 @@
 
 package org.dinky.constant;
 
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * FlinkParam
  *
@@ -26,4 +29,13 @@
  */
 public final class FlinkParamConstant {
     public static final String SPLIT = ",";
+
+    public static final List<String> CONFIG_FILE_NAME_LIST = Arrays.asList(
+            "logback.xml",
+            "log4j.properties",
+            "logback-console.xml",
+            "log4j-console.properties",
+            "logback-session.xml",
+            "log4j-session.properties",
+            "log4j-cli.properties");
 }
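
Note (not part of the patch): a minimal sketch of how the patched 1.20 descriptor could be driven from caller code. The cluster id "my-flink-app", the jar path, and "org.example.MyJob" are illustrative assumptions; FlinkKubeClientFactory.getInstance() and DefaultKubernetesArtifactUploader are assumed to be the stock Flink implementations matching the constructor above.

import java.util.Collections;

import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.application.ApplicationConfiguration;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.KubernetesClusterDescriptor;
import org.apache.flink.kubernetes.artifact.DefaultKubernetesArtifactUploader;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
import org.apache.flink.kubernetes.kubeclient.FlinkKubeClientFactory;

public class DeployApplicationClusterExample {

    public static void main(String[] args) throws Exception {
        Configuration flinkConfig = new Configuration();
        // Illustrative values; replace with a real cluster id and user-jar location.
        flinkConfig.set(KubernetesConfigOptions.CLUSTER_ID, "my-flink-app");
        flinkConfig.set(DeploymentOptions.TARGET, KubernetesDeploymentTarget.APPLICATION.getName());
        flinkConfig.set(PipelineOptions.JARS, Collections.singletonList("local:///opt/flink/usrlib/job.jar"));

        KubernetesClusterDescriptor descriptor = new KubernetesClusterDescriptor(
                flinkConfig,
                FlinkKubeClientFactory.getInstance(),
                new DefaultKubernetesArtifactUploader());

        ClusterSpecification spec = new ClusterSpecification.ClusterSpecificationBuilder()
                .setMasterMemoryMB(1024)
                .setTaskManagerMemoryMB(1024)
                .setSlotsPerTaskManager(1)
                .createClusterSpecification();

        // "org.example.MyJob" is a hypothetical main class packaged in the jar above.
        ApplicationConfiguration appConfig = new ApplicationConfiguration(new String[0], "org.example.MyJob");

        try (ClusterClient<String> client =
                descriptor.deployApplicationCluster(spec, appConfig).getClusterClient()) {
            System.out.println("JobManager Web Interface: " + client.getWebInterfaceURL());
        } finally {
            descriptor.close();
        }
    }
}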