diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/manager/KubernetesOptimizerContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/manager/KubernetesOptimizerContainer.java index 1bd6134081..3ccb0db3d2 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/manager/KubernetesOptimizerContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/manager/KubernetesOptimizerContainer.java @@ -18,12 +18,18 @@ package org.apache.amoro.server.manager; +import io.fabric8.kubernetes.api.model.Container; +import io.fabric8.kubernetes.api.model.LabelSelector; import io.fabric8.kubernetes.api.model.LocalObjectReference; import io.fabric8.kubernetes.api.model.LocalObjectReferenceBuilder; +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.api.model.PodTemplate; import io.fabric8.kubernetes.api.model.Quantity; +import io.fabric8.kubernetes.api.model.ResourceRequirements; import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; +import io.fabric8.kubernetes.api.model.apps.DeploymentSpec; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientBuilder; @@ -34,12 +40,15 @@ import org.apache.commons.io.IOUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.Yaml; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -54,7 +63,7 @@ public class KubernetesOptimizerContainer extends AbstractResourceContainer { public static final String NAMESPACE = "namespace"; public static final String IMAGE = "image"; public static final String PULL_POLICY = "pullPolicy"; - + public static final String PODTEMPLATE = "podTemplate"; public static final String PULL_SECRETS = "imagePullSecrets"; public static final String KUBE_CONFIG_PATH = "kube-config-path"; @@ -79,9 +88,67 @@ protected Map doScaleOut(Resource resource) { groupProperties.putAll(getContainerProperties()); groupProperties.putAll(resource.getProperties()); - // generate pod start args - long memoryPerThread = Long.parseLong(checkAndGetProperty(groupProperties, MEMORY_PROPERTY)); - long memory = memoryPerThread * resource.getThreadCount(); + Map argsList = generatePodStartArgs(resource, groupProperties); + String image = argsList.get(IMAGE).toString(); + String namespace = argsList.get(NAMESPACE).toString(); + String pullPolicy = argsList.get(PULL_POLICY).toString(); + List imagePullSecretsList = + (List) argsList.get(PULL_SECRETS); + int cpuLimit = (int) argsList.get("cpuLimit"); + long memory = (long) argsList.get(MEMORY_PROPERTY); + String groupName = argsList.get("groupName").toString(); + String resourceId = argsList.get("resourceId").toString(); + String startUpArgs = argsList.get("startUpArgs").toString(); + + String kubernetesName = NAME_PREFIX + resourceId; + Deployment deployment; + + if (null != groupProperties.get(PODTEMPLATE)) { + // configure the podTemplate read from config + PodTemplate podTemplate = initPodTemplateFromLocal(groupProperties); + + deployment = + initPodTemplateFromFrontEnd( + podTemplate, + image, + pullPolicy, + cpuLimit, + groupName, + resourceId, + startUpArgs, + memory, + imagePullSecretsList); + } else { + deployment = + initPodTemplateWithoutConfig( + image, + pullPolicy, + cpuLimit, + groupName, + resourceId, + startUpArgs, + memory, + imagePullSecretsList); + } + + client.apps().deployments().inNamespace(namespace).resource(deployment).create(); + Map startupProperties = Maps.newHashMap(); + startupProperties.put(NAMESPACE, namespace); + startupProperties.put(KUBERNETES_NAME_PROPERTIES, kubernetesName); + return startupProperties; + } + + public Map generatePodStartArgs( + Resource resource, Map groupProperties) { + long memoryPerThread; + long memory; + + if (resource.getMemoryMb() > 0) { + memory = resource.getMemoryMb(); + } else { + memoryPerThread = Long.parseLong(checkAndGetProperty(groupProperties, MEMORY_PROPERTY)); + memory = memoryPerThread * resource.getThreadCount(); + } // point at amoro home in docker image String startUpArgs = String.format( @@ -104,7 +171,31 @@ protected Map doScaleOut(Resource resource) { String resourceId = resource.getResourceId(); String groupName = resource.getGroupName(); - String kubernetesName = NAME_PREFIX + resourceId; + + Map argsList = Maps.newHashMap(); + argsList.put(NAMESPACE, namespace); + argsList.put(IMAGE, image); + argsList.put(PULL_POLICY, pullPolicy); + argsList.put(PULL_SECRETS, imagePullSecretsList); + argsList.put(MEMORY_PROPERTY, memory); + argsList.put("cpuLimit", cpuLimit); + argsList.put("resourceId", resourceId); + argsList.put("groupName", groupName); + argsList.put("startUpArgs", startUpArgs); + + return argsList; + } + + public Deployment initPodTemplateWithoutConfig( + String image, + String pullPolicy, + int cpuLimit, + String groupName, + String resourceId, + String startUpArgs, + long memory, + List imagePullSecretsList) { + DeploymentBuilder deploymentBuilder = new DeploymentBuilder() .withNewMetadata() @@ -156,13 +247,80 @@ protected Map doScaleOut(Resource resource) { .endTemplate() .endSpec(); } - Deployment deployment = deploymentBuilder.build(); - client.apps().deployments().inNamespace(namespace).resource(deployment).create(); - Map startupProperties = Maps.newHashMap(); - startupProperties.put(NAMESPACE, namespace); - startupProperties.put(KUBERNETES_NAME_PROPERTIES, kubernetesName); - return startupProperties; + return deploymentBuilder.build(); + } + + public PodTemplate initPodTemplateFromLocal(Map groupProperties) { + return new Yaml().loadAs(groupProperties.get(PODTEMPLATE), PodTemplate.class); + } + + public Deployment initPodTemplateFromFrontEnd( + PodTemplate podTemplate, + String image, + String pullPolicy, + int cpuLimit, + String groupName, + String resourceId, + String startUpArgs, + long memory, + List imagePullSecretsList) { + podTemplate + .getTemplate() + .getMetadata() + .setLabels( + new HashMap() { + { + put("app", NAME_PREFIX + resourceId); + put("AmoroOptimizerGroup", groupName); + put("AmoroResourceId", resourceId); + } + }); + + Container container = new Container(); + container.setName("optimizer"); + container.setImage(image); + container.setImagePullPolicy(pullPolicy); + container.setCommand(new ArrayList<>(Arrays.asList("sh", "-c", startUpArgs))); + + ResourceRequirements resourceRequirements = new ResourceRequirements(); + resourceRequirements.setLimits( + ImmutableMap.of( + "memory", new Quantity(memory + "Mi"), + "cpu", new Quantity(cpuLimit + ""))); + resourceRequirements.setRequests( + ImmutableMap.of( + "memory", new Quantity(memory + "Mi"), + "cpu", new Quantity(cpuLimit + ""))); + container.setResources(resourceRequirements); + + podTemplate.getTemplate().getSpec().getContainers().set(0, container); + + if (!imagePullSecretsList.isEmpty()) { + podTemplate.getTemplate().getSpec().setImagePullSecrets(imagePullSecretsList); + } + + DeploymentSpec deploymentSpec = new DeploymentSpec(); + deploymentSpec.setTemplate(podTemplate.getTemplate()); + + LabelSelector labelSelector = new LabelSelector(); + labelSelector.setMatchLabels( + new HashMap() { + { + put("app", NAME_PREFIX + resourceId); + } + }); + + deploymentSpec.setSelector(labelSelector); + deploymentSpec.setReplicas(1); + + Deployment deployment = new Deployment(); + deployment.setSpec(deploymentSpec); + ObjectMeta deploymentMetadata = new ObjectMeta(); + deploymentMetadata.setName(NAME_PREFIX + resourceId); + deployment.setMetadata(deploymentMetadata); + + return deployment; } @Override diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestKubernetesOptimizerContainer.java b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestKubernetesOptimizerContainer.java new file mode 100644 index 0000000000..fa44530b49 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/manager/TestKubernetesOptimizerContainer.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.manager; + +import io.fabric8.kubernetes.api.model.LocalObjectReference; +import io.fabric8.kubernetes.api.model.LocalObjectReferenceBuilder; +import io.fabric8.kubernetes.api.model.PodTemplate; +import io.fabric8.kubernetes.api.model.apps.Deployment; +import org.apache.amoro.OptimizerProperties; +import org.apache.amoro.resource.Resource; +import org.apache.amoro.resource.ResourceType; +import org.apache.amoro.server.AmoroManagementConf; +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.amoro.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.amoro.utils.JacksonUtil; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.yaml.snakeyaml.Yaml; + +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class TestKubernetesOptimizerContainer { + private KubernetesOptimizerContainer kubernetesOptimizerContainer; + private Map containerProperties; + private Map groupProperties; + public static final String MEMORY_PROPERTY = "memory"; + public static final String CPU_FACTOR_PROPERTY = "cpu.factor"; + public static final String IMAGE = "image"; + public static final String PULL_POLICY = "pullPolicy"; + public static final String PULL_SECRETS = "imagePullSecrets"; + + @Before + public void setup() throws IOException { + // generating configuration files + kubernetesOptimizerContainer = new KubernetesOptimizerContainer(); + groupProperties = Maps.newHashMap(); + containerProperties = Maps.newHashMap(); + + containerProperties.put(OptimizerProperties.AMS_HOME, "/home/ams"); + containerProperties.put(OptimizerProperties.AMS_OPTIMIZER_URI, "thrift://127.0.0.1:1261"); + + URL resource = getClass().getClassLoader().getResource("config.yaml"); + + JsonNode yamlConfig = + JacksonUtil.fromObjects( + new Yaml().loadAs(Files.newInputStream(Paths.get(resource.getPath())), Map.class)); + JsonNode containers = yamlConfig.get(AmoroManagementConf.CONTAINER_LIST); + for (JsonNode container : containers) { + if (container.get("name").asText().equals("KubernetesContainer")) { + ObjectMapper mapper = new ObjectMapper(); + containerProperties.putAll(mapper.convertValue(container.get("properties"), Map.class)); + } + } + groupProperties.putAll(this.containerProperties); + } + + private static String checkAndGetProperty(Map properties, String key) { + Preconditions.checkState( + properties != null && properties.containsKey(key), "Cannot find %s in properties", key); + return properties.get(key); + } + + @Test + public void testBuildPodTemplateFromLocal() { + PodTemplate podTemplate = + kubernetesOptimizerContainer.initPodTemplateFromLocal(groupProperties); + + Assert.assertEquals(1, podTemplate.getTemplate().getSpec().getContainers().size()); + // read the image version from the podTemplate config and assert it + Assert.assertEquals( + "apache/amoro:0.6", podTemplate.getTemplate().getSpec().getContainers().get(0).getImage()); + } + + @Test + public void testBuildPodTemplateWithResourceSetMemoryMb() { + PodTemplate podTemplate = + kubernetesOptimizerContainer.initPodTemplateFromLocal(groupProperties); + + ResourceType resourceType = ResourceType.OPTIMIZER; + Map properties = Maps.newHashMap(); + properties.put("memory", "1024"); + Resource resource = + new Resource.Builder("KubernetesContainer", "k8s", resourceType) + .setMemoryMb(1025) // It's not 0 here + .setThreadCount(1) + .setProperties(properties) + .build(); + groupProperties.putAll(resource.getProperties()); + + Map argsList = + kubernetesOptimizerContainer.generatePodStartArgs(resource, groupProperties); + String image = argsList.get(IMAGE).toString(); + String pullPolicy = argsList.get(PULL_POLICY).toString(); + List imagePullSecretsList = + (List) argsList.get(PULL_SECRETS); + int cpuLimit = (int) argsList.get("cpuLimit"); + long memory = (long) argsList.get(MEMORY_PROPERTY); + String groupName = argsList.get("groupName").toString(); + String resourceId = argsList.get("resourceId").toString(); + String startUpArgs = argsList.get("startUpArgs").toString(); + + Deployment deployment = + kubernetesOptimizerContainer.initPodTemplateFromFrontEnd( + podTemplate, + image, + pullPolicy, + cpuLimit, + groupName, + resourceId, + startUpArgs, + memory, + imagePullSecretsList); + + Assert.assertEquals(1, deployment.getSpec().getReplicas().intValue()); + Assert.assertNotEquals( + "1024Mi", + deployment + .getSpec() + .getTemplate() + .getSpec() + .getContainers() + .get(0) + .getResources() + .getLimits() + .get("memory") + .toString()); + Assert.assertEquals( + "1025Mi", + deployment + .getSpec() + .getTemplate() + .getSpec() + .getContainers() + .get(0) + .getResources() + .getLimits() + .get("memory") + .toString()); + } + + @Test + public void testBuildPodTemplateConfig() { + // before parameter merging + PodTemplate podTemplate = + kubernetesOptimizerContainer.initPodTemplateFromLocal(groupProperties); + + // after parameter merging + ResourceType resourceType = ResourceType.OPTIMIZER; + Map properties = Maps.newHashMap(); + properties.put("memory", "1024"); + Resource resource = + new Resource.Builder("KubernetesContainer", "k8s", resourceType) + .setMemoryMb(0) + .setThreadCount(1) + .setProperties(properties) + .build(); + groupProperties.putAll(resource.getProperties()); + + // generate pod start args + long memoryPerThread; + long memory; + + if (resource.getMemoryMb() > 0) { + memory = resource.getMemoryMb(); + } else { + memoryPerThread = Long.parseLong(checkAndGetProperty(groupProperties, MEMORY_PROPERTY)); + memory = memoryPerThread * resource.getThreadCount(); + } + String startUpArgs = + String.format( + "/entrypoint.sh optimizer %s %s", + memory, kubernetesOptimizerContainer.buildOptimizerStartupArgsString(resource)); + // read the image version from config and assert it , but not from podTemplate + String image = checkAndGetProperty(groupProperties, IMAGE); + String pullPolicy = checkAndGetProperty(groupProperties, PULL_POLICY); + String pullSecrets = groupProperties.getOrDefault(PULL_SECRETS, ""); + String cpuLimitFactorString = groupProperties.getOrDefault(CPU_FACTOR_PROPERTY, "1.0"); + double cpuLimitFactor = Double.parseDouble(cpuLimitFactorString); + int cpuLimit = (int) (Math.ceil(cpuLimitFactor * resource.getThreadCount())); + + List imagePullSecretsList = + Arrays.stream(pullSecrets.split(";")) + .map(secret -> new LocalObjectReferenceBuilder().withName(secret).build()) + .collect(Collectors.toList()); + + String resourceId = resource.getResourceId(); + String groupName = resource.getGroupName(); + + Assert.assertEquals(1, podTemplate.getTemplate().getSpec().getContainers().size()); + + // read the image version from the podTemplate config and assert it + Assert.assertEquals( + "apache/amoro:0.6", podTemplate.getTemplate().getSpec().getContainers().get(0).getImage()); + + Deployment deployment = + kubernetesOptimizerContainer.initPodTemplateFromFrontEnd( + podTemplate, + image, + pullPolicy, + cpuLimit, + groupName, + resourceId, + startUpArgs, + memory, + imagePullSecretsList); + + Assert.assertEquals("amoro-optimizer-" + resourceId, deployment.getMetadata().getName()); + Assert.assertEquals( + "k8s", + deployment.getSpec().getTemplate().getMetadata().getLabels().get("AmoroOptimizerGroup")); + Assert.assertEquals(1, deployment.getSpec().getReplicas().intValue()); + Assert.assertEquals( + "IfNotPresent", + deployment.getSpec().getTemplate().getSpec().getContainers().get(0).getImagePullPolicy()); + + Assert.assertEquals(1, deployment.getSpec().getTemplate().getSpec().getContainers().size()); + + // read the image version from the podTemplate config and assert it + // the final version is still apache/amoro:0.7-SNAPSHOT + Assert.assertEquals( + "apache/amoro:0.7-SNAPSHOT", + deployment.getSpec().getTemplate().getSpec().getContainers().get(0).getImage()); + Assert.assertEquals( + String.valueOf(cpuLimit), + deployment + .getSpec() + .getTemplate() + .getSpec() + .getContainers() + .get(0) + .getResources() + .getLimits() + .get("cpu") + .toString()); + Assert.assertEquals( + memory + "Mi", + deployment + .getSpec() + .getTemplate() + .getSpec() + .getContainers() + .get(0) + .getResources() + .getLimits() + .get("memory") + .toString()); + } +} diff --git a/amoro-ams/src/test/resources/config.yaml b/amoro-ams/src/test/resources/config.yaml index 168046fda6..4f1efa9c9f 100644 --- a/amoro-ams/src/test/resources/config.yaml +++ b/amoro-ams/src/test/resources/config.yaml @@ -36,4 +36,37 @@ taskmanager: size: 1728m parallelism: - default: 1 \ No newline at end of file + default: 1 + +containers: + - name: KubernetesContainer + container-impl: org.apache.amoro.server.manager.KubernetesOptimizerContainer + properties: + kube-config-path: "~/.kube/config" + image: apache/amoro:0.7-SNAPSHOT + namespace: default + pullPolicy: "IfNotPresent" + podTemplate: | + apiVersion: apps/v1 + kind: PodTemplate + metadata: + name: + template: + metadata: + labels: + app: + AmoroOptimizerGroup: + AmoroResourceId: + spec: + containers: + - name: optimizer + image: apache/amoro:0.6 + imagePullPolicy: IfNotPresent + command: [ "sh", "-c", "echo 'Hello, World!'" ] + resources: + limits: + memory: 2048Mi + cpu: 2 + requests: + memory: 2048Mi + cpu: 2 \ No newline at end of file diff --git a/docs/admin-guides/deployment-on-kubernetes.md b/docs/admin-guides/deployment-on-kubernetes.md index 22ca2fe0d7..b5d75ef083 100644 --- a/docs/admin-guides/deployment-on-kubernetes.md +++ b/docs/admin-guides/deployment-on-kubernetes.md @@ -209,6 +209,56 @@ optimizer: "flink-conf.taskmanager.memory.network.min": "32mb" } ``` +### Configure the Kubernetes Optimizer Container + +By default, the Kubernetes Optimizer Container is enabled. +You can modify the container configuration by changing the `optimizer.Kubernetes` section. + +```yaml +optimizer: + kubernetes: + # enable the kubernetes optimizer container + enabled: true + properties: + namespace: "default" + kube-config-path: "~/.kube/config" + image: "apache/amoro:latest" + pullPolicy: "IfNotPresent" +``` + +To use PodTemplate, you need to copy and paste the following into the `kubernetes.properties`. + +This is the default podTemplate, and when the user doesn't specify any additional parameters, the default is to use the template's parameters + +Therefore, there will be a priority issue that needs to be elaborated: _Resource(WebUi) > Independent User Profile Configuration > PodTemplate_ + +```yaml +podTemplate: | + apiVersion: apps/v1 + kind: PodTemplate + metadata: + name: + template: + metadata: + labels: + app: + AmoroOptimizerGroup: + AmoroResourceId: + spec: + containers: + - name: optimizer + image: apache/amoro:0.6 + imagePullPolicy: IfNotPresent + command: [ "sh", "-c", "echo 'Hello, World!'" ] + resources: + limits: + memory: 2048Mi + cpu: 2 + requests: + memory: 2048Mi + cpu: 2 + +``` ### Configure the RBAC diff --git a/pom.xml b/pom.xml index 8ed0a923d3..15a3471adf 100644 --- a/pom.xml +++ b/pom.xml @@ -140,7 +140,7 @@ 1.0.1 0.16.0 1.20.0 - 6.12.1 + 6.13.3 0.7.0-incubating 32.1.1-jre 2.14.2