Skip to content

Commit

Permalink
ASG AllocationStrategy support, subnet discovery (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
prateekgogia authored Nov 2, 2021
1 parent bfac310 commit bc46950
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 25 deletions.
25 changes: 25 additions & 0 deletions operator/config/data-plane-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,35 @@ spec:
type: object
spec:
properties:
allocationStrategy:
description: AllocationStrategy helps user define the strategy to
provision worker nodes in EC2, defaults to "lowest-price"
type: string
clusterName:
description: ClusterName is used to connect the worker nodes to a
control plane clusterName.
type: string
instanceTypes:
description: InstanceTypes is an optional field thats lets user specify
the instance types for worker nodes, defaults to instance types
"t2.xlarge", "t3.xlarge" or "t3a.xlarge"
items:
type: string
type: array
nodeCount:
description: NodeCount is the desired number of worker nodes for this
dataplane.
type: integer
subnetSelector:
additionalProperties:
type: string
description: SubnetSelector lets user define label key and values
for kit to select the subnets for worker nodes. It can contain key:value
to select subnets with particular label, or a specific key:"*" to
select all subnets with a specific key. If no selector is provided,
worker nodes are provisioned in the same subnet as control plane
nodes.
type: object
type: object
status:
properties:
Expand Down
9 changes: 8 additions & 1 deletion operator/docs/examples/dataplane.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,11 @@ metadata:
name: example-nodes
spec:
clusterName: example # Desired Cluster Name
nodeCount: 1
nodeCount: 1
subnetSelector:
kubernetes.io/cluster/kit-management-cluster: "*"
instanceTypes:
- c4.xlarge
- c5.xlarge
- c4.4xlarge
- c5.4xlarge
19 changes: 18 additions & 1 deletion operator/pkg/apis/dataplane/v1alpha1/dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,23 @@ type DataPlaneList struct {
}

type DataPlaneSpec struct {
// ClusterName is used to connect the worker nodes to a control plane clusterName.
ClusterName string `json:"clusterName,omitempty"`
NodeCount int `json:"nodeCount,omitempty"`
// NodeCount is the desired number of worker nodes for this dataplane.
NodeCount int `json:"nodeCount,omitempty"`
// SubnetSelector lets user define label key and values for kit to select
// the subnets for worker nodes. It can contain key:value to select subnets
// with particular label, or a specific key:"*" to select all subnets with a
// specific key. If no selector is provided, worker nodes are
// provisioned in the same subnet as control plane nodes.
// +optional
SubnetSelector map[string]string `json:"subnetSelector,omitempty"`
// InstanceTypes is an optional field thats lets user specify the instance
// types for worker nodes, defaults to instance types "t2.xlarge", "t3.xlarge" or "t3a.xlarge"
// +optional
InstanceTypes []string `json:"instanceTypes,omitempty"`
// AllocationStrategy helps user define the strategy to provision worker nodes in EC2,
// defaults to "lowest-price"
// +optional
AllocationStrategy string `json:"allocationStrategy,omitempty"`
}
6 changes: 6 additions & 0 deletions operator/pkg/apis/dataplane/v1alpha1/dataplane_defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,10 @@ func (c *DataPlane) SetDefaults(ctx context.Context) {

// SetDefaults for the DataPlaneSpec, cascading to all subspecs
func (s *DataPlaneSpec) SetDefaults(ctx context.Context) {
if s.AllocationStrategy == "" {
s.AllocationStrategy = "lowest-price"
}
if len(s.InstanceTypes) == 0 {
s.InstanceTypes = []string{"t2.xlarge", "t3.xlarge", "t3a.xlarge"}
}
}
14 changes: 13 additions & 1 deletion operator/pkg/apis/dataplane/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

123 changes: 101 additions & 22 deletions operator/pkg/awsprovider/instances/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,22 @@ func NewController(ec2api *awsprovider.EC2, autoscaling *awsprovider.AutoScaling
}

func (c *Controller) Reconcile(ctx context.Context, dataplane *v1alpha1.DataPlane) error {
privateSubnets, err := c.getPrivateSubnetsFor(ctx, dataplane.Spec.ClusterName)
if err != nil {
return fmt.Errorf("getting private subnet for %s, %w", dataplane.Spec.ClusterName, err)
}
if len(privateSubnets) == 0 {
return fmt.Errorf("failed to find private subnets for dataplane")
}
asg, err := c.getAutoScalingGroup(ctx, AutoScalingGroupNameFor(dataplane.Spec.ClusterName))
if err != nil {
return fmt.Errorf("getting auto scaling group for %v, %w", dataplane.Spec.ClusterName, err)
}
if asg == nil {
if err := c.createAutoScalingGroup(ctx, dataplane, privateSubnets); err != nil {
if err := c.createAutoScalingGroup(ctx, dataplane); err != nil {
return fmt.Errorf("creating auto scaling group for %v, %w", dataplane.Spec.ClusterName, err)
}
zap.S().Infof("[%s] Created autoscaling group", dataplane.Spec.ClusterName)
return nil
}
if err := c.updateAutoScalingGroup(ctx, dataplane, asg, privateSubnets); err != nil {
if asg.Status != nil && *asg.Status == "Delete in progress" {
// there are scenarios if you delete ASG and recreate quickly ASG might still be getting deleted
return fmt.Errorf("ASG %v deletion in progress", asg.AutoScalingGroupName)
}
if err := c.updateAutoScalingGroup(ctx, dataplane, asg); err != nil {
return fmt.Errorf("updating auto scaling group %v, %w", AutoScalingGroupNameFor(dataplane.Spec.ClusterName), err)
}
return nil
Expand All @@ -79,31 +76,65 @@ func (c *Controller) Finalize(ctx context.Context, dataplane *v1alpha1.DataPlane
return nil
}

func (c *Controller) updateAutoScalingGroup(ctx context.Context, dataplane *v1alpha1.DataPlane, asg *autoscaling.Group, subnets []string) error {
func (c *Controller) updateAutoScalingGroup(ctx context.Context, dataplane *v1alpha1.DataPlane, asg *autoscaling.Group) error {
subnets, err := c.subnetsFor(ctx, dataplane)
if err != nil {
return fmt.Errorf("getting private subnet for %s, %w", dataplane.Spec.ClusterName, err)
}
if len(subnets) == 0 {
return fmt.Errorf("failed to find private subnets for dataplane")
}
if functional.ValidateAll(
func() bool { return asg != nil },
func() bool {
return functional.StringsMatch(strings.Split(ptr.StringValue(asg.VPCZoneIdentifier), ","), subnets)
},
func() bool { return ptr.Int64Value(asg.DesiredCapacity) == int64(dataplane.Spec.NodeCount) }) {
func() bool { return ptr.Int64Value(asg.DesiredCapacity) == int64(dataplane.Spec.NodeCount) },
func() bool {
return functional.StringsMatch(
parseOverridesFromASG(asg.MixedInstancesPolicy.LaunchTemplate.Overrides),
parseOverridesFromASG(instanceTypes(dataplane.Spec.InstanceTypes)),
)
}) {
return nil
}
_, err := c.autoscaling.UpdateAutoScalingGroupWithContext(ctx, &autoscaling.UpdateAutoScalingGroupInput{
zap.S().Infof("[%v] updating ASG %v", dataplane.Spec.ClusterName, *asg.AutoScalingGroupName)
_, err = c.autoscaling.UpdateAutoScalingGroupWithContext(ctx, &autoscaling.UpdateAutoScalingGroupInput{
AutoScalingGroupName: ptr.String(AutoScalingGroupNameFor(dataplane.Spec.ClusterName)),
DesiredCapacity: ptr.Int64(int64(dataplane.Spec.NodeCount)),
VPCZoneIdentifier: ptr.String(strings.Join(subnets, ",")),
MixedInstancesPolicy: &autoscaling.MixedInstancesPolicy{
LaunchTemplate: &autoscaling.LaunchTemplate{
Overrides: instanceTypes(dataplane.Spec.InstanceTypes),
},
},
})
return err
}

func (c *Controller) createAutoScalingGroup(ctx context.Context, dataplane *v1alpha1.DataPlane, subnets []string) error {
_, err := c.autoscaling.CreateAutoScalingGroupWithContext(ctx, &autoscaling.CreateAutoScalingGroupInput{
func (c *Controller) createAutoScalingGroup(ctx context.Context, dataplane *v1alpha1.DataPlane) error {
subnets, err := c.subnetsFor(ctx, dataplane)
if err != nil {
return fmt.Errorf("getting private subnet for %s, %w", dataplane.Spec.ClusterName, err)
}
if len(subnets) == 0 {
return fmt.Errorf("failed to find private subnets for dataplane")
}
_, err = c.autoscaling.CreateAutoScalingGroupWithContext(ctx, &autoscaling.CreateAutoScalingGroupInput{
AutoScalingGroupName: ptr.String(AutoScalingGroupNameFor(dataplane.Spec.ClusterName)),
DesiredCapacity: ptr.Int64(int64(dataplane.Spec.NodeCount)),
MaxSize: ptr.Int64(int64(1000)),
MinSize: ptr.Int64(int64(0)),
LaunchTemplate: &autoscaling.LaunchTemplateSpecification{
LaunchTemplateName: ptr.String(launchtemplate.TemplateName(dataplane.Spec.ClusterName)),
MixedInstancesPolicy: &autoscaling.MixedInstancesPolicy{
InstancesDistribution: &autoscaling.InstancesDistribution{
OnDemandAllocationStrategy: ptr.String(dataplane.Spec.AllocationStrategy),
},
LaunchTemplate: &autoscaling.LaunchTemplate{
LaunchTemplateSpecification: &autoscaling.LaunchTemplateSpecification{
LaunchTemplateName: ptr.String(launchtemplate.TemplateName(dataplane.Spec.ClusterName)),
},
Overrides: instanceTypes(dataplane.Spec.InstanceTypes),
},
},
VPCZoneIdentifier: ptr.String(strings.Join(subnets, ",")),
Tags: generateAutoScalingTags(dataplane.Spec.ClusterName),
Expand All @@ -127,14 +158,19 @@ func (c *Controller) getAutoScalingGroup(ctx context.Context, groupName string)
return output.AutoScalingGroups[0], nil
}

func (c *Controller) getPrivateSubnetsFor(ctx context.Context, clusterName string) ([]string, error) {
instanceIDs, err := c.instances.ControlPlaneInstancesFor(ctx, clusterName)
func (c *Controller) subnetsFor(ctx context.Context, dataplane *v1alpha1.DataPlane) ([]string, error) {
// Discover subnets provided as part of the subnetSelector in DP spec.
if len(dataplane.Spec.SubnetSelector) != 0 {
return c.subnetsForSelector(ctx, dataplane.Spec.SubnetSelector)
}
// If subnetSelector is not provided fallback on control plane instance subnets
instanceIDs, err := c.instances.ControlPlaneInstancesFor(ctx, dataplane.Spec.ClusterName)
if err != nil {
return nil, err
}
subnetIDs, err := c.getSubnetIDsFor(ctx, instanceIDs)
subnetIDs, err := c.subnetsForInstances(ctx, instanceIDs)
if err != nil {
return nil, fmt.Errorf("getting subnet for %s, %w", clusterName, err)
return nil, fmt.Errorf("getting subnet for %s, %w", dataplane.Spec.ClusterName, err)
}
return c.filterPrivateSubnets(ctx, subnetIDs)
}
Expand All @@ -156,7 +192,34 @@ func (c *Controller) filterPrivateSubnets(ctx context.Context, ids []*string) ([
return result, nil
}

func (c *Controller) getSubnetIDsFor(ctx context.Context, instanceIDs []string) ([]*string, error) {
func (c *Controller) subnetsForSelector(ctx context.Context, selector map[string]string) ([]string, error) {
filters := []*ec2.Filter{}
// Filter by selector
for key, value := range selector {
if value == "*" {
filters = append(filters, &ec2.Filter{
Name: aws.String("tag-key"),
Values: []*string{aws.String(key)},
})
} else {
filters = append(filters, &ec2.Filter{
Name: aws.String(fmt.Sprintf("tag:%s", key)),
Values: []*string{aws.String(value)},
})
}
}
output, err := c.ec2api.DescribeSubnetsWithContext(ctx, &ec2.DescribeSubnetsInput{Filters: filters})
if err != nil {
return nil, fmt.Errorf("describing subnets %+v, %w", filters, err)
}
result := []string{}
for _, o := range output.Subnets {
result = append(result, *o.SubnetId)
}
return result, nil
}

func (c *Controller) subnetsForInstances(ctx context.Context, instanceIDs []string) ([]*string, error) {
requestIds := []*string{}
for _, instanceID := range instanceIDs {
requestIds = append(requestIds, ptr.String(instanceID))
Expand Down Expand Up @@ -191,7 +254,23 @@ func generateAutoScalingTags(clusterName string) []*autoscaling.Tag {
PropagateAtLaunch: aws.Bool(true),
}, {
Key: aws.String("Name"),
Value: aws.String("auto-scaling-group"),
Value: aws.String(fmt.Sprintf("%s-dataplane-nodes", clusterName)),
PropagateAtLaunch: aws.Bool(true),
}}
}

func instanceTypes(overrides []string) []*autoscaling.LaunchTemplateOverrides {
result := []*autoscaling.LaunchTemplateOverrides{}
for _, override := range overrides {
result = append(result, &autoscaling.LaunchTemplateOverrides{InstanceType: ptr.String(override)})
}
return result
}

func parseOverridesFromASG(overrides []*autoscaling.LaunchTemplateOverrides) []string {
result := []string{}
for _, override := range overrides {
result = append(result, ptr.StringValue(override.InstanceType))
}
return result
}

0 comments on commit bc46950

Please sign in to comment.