Skip to content

Commit

Permalink
replace dict with dataclass schema in aws kms_key_arns function
Browse files Browse the repository at this point in the history
  • Loading branch information
joneszc committed Oct 31, 2024
1 parent b70adaa commit 8569dbe
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 13 deletions.
22 changes: 15 additions & 7 deletions src/_nebari/provider/cloud/amazon_web_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import re
import time
from typing import Dict, List, Optional
from dataclasses import dataclass

import boto3
from botocore.exceptions import ClientError, EndpointConnectionError
Expand Down Expand Up @@ -121,8 +122,15 @@ def instances(region: str) -> Dict[str, str]:
return {t: t for t in instance_types}


@dataclass
class Kms_Key_Info:
Arn: str
KeyUsage: str
KeySpec: str
KeyManager: str

@functools.lru_cache()
def kms_key_arns(region: str) -> Dict[str, dict]:
def kms_key_arns(region: str) -> Dict[str, Kms_Key_Info]:
"""Return dict of available/enabled KMS key IDs and associated KeyMetadata for the AWS region."""
session = aws_session(region=region)
client = session.client("kms")
Expand All @@ -133,12 +141,12 @@ def kms_key_arns(region: str) -> Dict[str, dict]:
# https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kms/client/describe_key.html#:~:text=Response%20Structure
key_data = client.describe_key(KeyId=key_id).get("KeyMetadata")
if key_data.get("Enabled"):
kms_keys[key_id] = {
"Arn": key_data.get("Arn"),
"KeyUsage": key_data.get("KeyUsage"),
"KeySpec": key_data.get("KeySpec"),
"KeyManager": key_data.get("KeyManager"),
}
kms_keys[key_id] = Kms_Key_Info(
Arn=key_data.get("Arn"),
KeyUsage=key_data.get("KeyUsage"),
KeySpec=key_data.get("KeySpec"),
KeyManager=key_data.get("KeyManager"),
)
return kms_keys


Expand Down
12 changes: 6 additions & 6 deletions src/_nebari/stages/infrastructure/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,25 +557,25 @@ def _check_input(cls, data: Any) -> Any:
# Raise error if key_id is not found in available_kms_keys
if (
len(key_id) != 1
or available_kms_keys[key_id[0]]["Arn"] != data["eks_kms_arn"]
or available_kms_keys[key_id[0]].Arn != data["eks_kms_arn"]
):
raise ValueError(
f"Amazon Web Services KMS Key with ARN {data['eks_kms_arn']} not one of available/enabled keys={[v['Arn'] for v in available_kms_keys.values() if v['KeyManager']=='CUSTOMER']}"
f"Amazon Web Services KMS Key with ARN {data['eks_kms_arn']} not one of available/enabled keys={[v.Arn for v in available_kms_keys.values() if v.KeyManager=='CUSTOMER']}"
)
key_id = key_id[0]
# Raise error if key is not a customer managed key
if available_kms_keys[key_id]["KeyManager"] != "CUSTOMER":
if available_kms_keys[key_id].KeyManager != "CUSTOMER":
raise ValueError(
f"Amazon Web Services KMS Key with ID {key_id} is not a customer managed key"
)
# Symmetric KMS keys with Encrypt and decrypt key-usage have the SYMMETRIC_DEFAULT key-spec
# EKS cluster encryption requires a Symmetric key that is set to encrypt and decrypt data
if available_kms_keys[key_id]["KeySpec"] != "SYMMETRIC_DEFAULT":
if available_kms_keys[key_id]["KeyUsage"] == "GENERATE_VERIFY_MAC":
if available_kms_keys[key_id].KeySpec != "SYMMETRIC_DEFAULT":
if available_kms_keys[key_id].KeyUsage == "GENERATE_VERIFY_MAC":
raise ValueError(
f"Amazon Web Services KMS Key with ID {key_id} does not have KeyUsage set to 'Encrypt and decrypt' data"
)
elif available_kms_keys[key_id]["KeyUsage"] != "ENCRYPT_DECRYPT":
elif available_kms_keys[key_id].KeyUsage != "ENCRYPT_DECRYPT":
raise ValueError(
f"Amazon Web Services KMS Key with ID {key_id} is not of type Symmetric, and KeyUsage not set to 'Encrypt and decrypt' data"
)
Expand Down

0 comments on commit 8569dbe

Please sign in to comment.