Skip to content

Commit

Permalink
feat: adding import functionality (#13)
Browse files Browse the repository at this point in the history
Co-authored-by: El De-dog-lo <3859395+fubuloubu@users.noreply.github.com>
  • Loading branch information
johnson2427 and fubuloubu authored Jul 1, 2024
1 parent b367ec6 commit 66695cf
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 15 deletions.
28 changes: 26 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,37 @@ ape aws -h
To create a new key:

```bash
ape aws kms create 'KeyAlias' 'Description of new key'
ape aws kms create KeyAlias -d 'Description of new key'
```

To delete this key:

```bash
ape aws kms delete 'KeyAlias'
ape aws kms delete KeyAlias
```

To import an existing private key into KMS:

```bash
$ ape aws kms import KeyAlias
Enter your private key:
SUCCESS: Key imported successfully with ID: <key-id>
```

You can also import a private key from a file (from hex or bytes):

```bash
$ ape aws kms import KeyAlias --private-key <path-to-private-key>
INFO: Reading private key from <private-key-file>
SUCCESS: Key imported successfully with ID: <key-id>
```

You can import using a mnemonic phrase as well:

```bash
$ ape aws kms import KeyAlias --use-mnemonic
Enter your mnemonic phrase:
SUCCESS: Key imported successfully with ID: <key-id>
```

### IPython
Expand Down
7 changes: 4 additions & 3 deletions ape_aws/accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@


class AwsAccountContainer(AccountContainerAPI):

@property
def aliases(self) -> Iterator[str]:
return map(lambda x: x.alias, kms_client.raw_aliases)
return map(lambda x: x.alias.replace("alias/", ""), kms_client.raw_aliases)

def __len__(self) -> int:
return len(kms_client.raw_aliases)
Expand All @@ -25,7 +26,7 @@ def __len__(self) -> int:
def accounts(self) -> Iterator[AccountAPI]:
return map(
lambda x: KmsAccount(
key_alias=x.alias,
key_alias=x.alias.replace("alias/", ""),
key_id=x.key_id,
key_arn=x.arn,
),
Expand All @@ -40,7 +41,7 @@ class KmsAccount(AccountAPI):

@property
def alias(self) -> str:
return self.key_alias.replace("alias/", "")
return self.key_alias

@property
def public_key(self):
Expand Down
108 changes: 104 additions & 4 deletions ape_aws/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
from typing import ClassVar

import boto3 # type: ignore[import]
from pydantic import BaseModel, Field
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, padding
from eth_account import Account
from pydantic import BaseModel, ConfigDict, Field, field_validator


class AliasResponse(BaseModel):
Expand All @@ -15,10 +19,11 @@ class AliasResponse(BaseModel):

class KeyBaseModel(BaseModel):
alias: str
model_config = ConfigDict(populate_by_name=True)


class CreateKeyModel(KeyBaseModel):
description: str = Field(alias="Description")
description: str | None = Field(default=None, alias="Description")
policy: str | None = Field(default=None, alias="Policy")
key_usage: str = Field(default="SIGN_VERIFY", alias="KeyUsage")
key_spec: str = Field(default="ECC_SECG_P256K1", alias="KeySpec")
Expand Down Expand Up @@ -67,10 +72,89 @@ class CreateKey(CreateKeyModel):
origin: str = Field(default="AWS_KMS", alias="Origin")


class ImportKey(CreateKeyModel):
class ImportKeyRequest(CreateKeyModel):
origin: str = Field(default="EXTERNAL", alias="Origin")


class ImportKey(ImportKeyRequest):
key_id: str = Field(default=None, alias="KeyId")
public_key: bytes = Field(default=None, alias="PublicKey")
private_key: str | bytes = Field(default=None, alias="PrivateKey")
import_token: bytes = Field(default=None, alias="ImportToken")

@field_validator("private_key")
def validate_private_key(cls, value):
if value.startswith("0x"):
value = value[2:]
return value

@property
def get_account(self):
return Account.privateKeyToAccount(self.private_key)

@property
def ec_private_key(self):
loaded_key = self.private_key
if isinstance(loaded_key, bytes):
loaded_key = ec.derive_private_key(int(self.private_key, 16), ec.SECP256K1())
elif isinstance(loaded_key, str):
loaded_key = bytes.fromhex(loaded_key[2:])
loaded_key = ec.derive_private_key(int(self.private_key, 16), ec.SECP256K1())
return loaded_key

@property
def private_key_hex(self):
if isinstance(self.private_key, str):
return self.private_key
elif isinstance(self.private_key, bytes):
return self.private_key.hex()
return self.private_key.private_numbers().private_value.to_bytes(32, "big").hex()

@property
def private_key_bin(self):
"""
Returns the private key in binary format
This is required for the `boto3.client.import_key_material` method
"""
return self.ec_private_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption(),
)

@property
def private_key_pem(self):
"""
Returns the private key in PEM format for use in outside applications.
"""
return self.ec_private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption(),
)

@property
def public_key_der(self):
return serialization.load_der_public_key(
self.public_key,
backend=default_backend(),
)

@property
def encrypted_private_key(self):
if not self.public_key:
raise ValueError("Public key not found")

return self.public_key_der.encrypt(
self.private_key_bin,
padding.OAEP(
mgf=padding.MGF1(hashes.SHA256()),
algorithm=hashes.SHA256(),
label=None,
),
)


class DeleteKey(KeyBaseModel):
key_id: str
days: int = 30
Expand Down Expand Up @@ -103,8 +187,9 @@ def sign(self, key_id, msghash):
)
return response.get("Signature")

def create_key(self, key_spec: CreateKey):
def create_key(self, key_spec: CreateKey | ImportKeyRequest):
response = self.client.create_key(**key_spec.to_aws_dict())

key_id = response["KeyMetadata"]["KeyId"]
self.client.create_alias(
AliasName=f"alias/{key_spec.alias}",
Expand All @@ -131,6 +216,21 @@ def create_key(self, key_spec: CreateKey):
)
return key_id

def import_key(self, key_spec: ImportKey):
return self.client.import_key_material(
KeyId=key_spec.key_id,
ImportToken=key_spec.import_token,
EncryptedKeyMaterial=key_spec.encrypted_private_key,
ExpirationModel="KEY_MATERIAL_DOES_NOT_EXPIRE",
)

def get_parameters(self, key_id: str):
return self.client.get_parameters_for_import(
KeyId=key_id,
WrappingAlgorithm="RSAES_OAEP_SHA_256",
WrappingKeySpec="RSA_2048",
)

def delete_key(self, key_spec: DeleteKey):
self.client.delete_alias(AliasName=key_spec.alias)
self.client.schedule_key_deletion(KeyId=key_spec.key_id, PendingWindowInDays=key_spec.days)
Expand Down
112 changes: 106 additions & 6 deletions ape_aws/kms/_cli.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from pathlib import Path

import click
from ape.cli import ape_cli_context
from eth_account import Account as EthAccount
from eth_account.hdaccount import ETHEREUM_DEFAULT_PATH

from ape_aws.client import CreateKey, DeleteKey, kms_client
from ape_aws.client import CreateKey, DeleteKey, ImportKey, ImportKeyRequest, kms_client


@click.group("kms")
Expand All @@ -17,24 +21,27 @@ def kms():
"administrators",
multiple=True,
help="Apply key policy to a list of administrators if applicable, ex. -a ARN1, -a ARN2",
metavar="list[ARN]",
)
@click.option(
"-u",
"--user",
"users",
multiple=True,
help="Apply key policy to a list of users if applicable, ex. -u ARN1, -u ARN2",
metavar="list[ARN]",
)
@click.option(
"-d",
"--description",
"description",
help="The description of the key you intend to create.",
)
@click.argument("alias_name")
@click.argument("description")
def create_key(
cli_ctx,
alias_name: str,
description: str,
administrators: list[str],
users: list[str],
description: str,
):
"""
Create an Ethereum Private Key in AWS KmsAccount
Expand All @@ -54,7 +61,100 @@ def create_key(
cli_ctx.logger.success(f"Key created successfully with ID: {key_id}")


# TODO: Add `ape aws kms import`
@kms.command(name="import")
@ape_cli_context()
@click.option(
"-p",
"--private-key",
"private_key_path",
type=click.Path(),
help="The private key you intend to import",
)
@click.option(
"-a",
"--admin",
"administrators",
multiple=True,
help="Apply key policy to a list of administrators if applicable, ex. -a ARN1, -a ARN2",
)
@click.option(
"-u",
"--user",
"users",
multiple=True,
help="Apply key policy to a list of users if applicable, ex. -u ARN1, -u ARN2",
)
@click.option(
"-d",
"--description",
"description",
help="The description of the key you intend to create.",
)
@click.option(
"--use-mnemonic",
"import_from_mnemonic",
help="Import a key from a mnemonic phrase",
is_flag=True,
)
@click.option(
"--hd-path",
"hd_path",
help="The hierarchical deterministic path to derive the key from",
default=ETHEREUM_DEFAULT_PATH,
)
@click.argument("alias_name")
def import_key(
cli_ctx,
alias_name: str,
private_key_path: Path,
administrators: list[str],
users: list[str],
description: str,
import_from_mnemonic: bool,
hd_path: str,
):
if private_key_path:
if isinstance(private_key_path, str):
private_key_path = Path(private_key_path)
if private_key_path.exists() and private_key_path.is_file():
cli_ctx.logger.info(f"Reading private key from {private_key_path}")
private_key = private_key_path.read_text().strip()

elif import_from_mnemonic:
mnemonic = click.prompt("Enter your mnemonic phrase", hide_input=True)
EthAccount.enable_unaudited_hdwallet_features()
account = EthAccount.from_mnemonic(mnemonic, account_path=hd_path)
private_key = account.key.hex()

else:
private_key = click.prompt("Enter your private key", hide_input=True)

key_spec = ImportKeyRequest(
alias=alias_name,
description=description, # type: ignore
admins=administrators,
users=users,
)
key_id = kms_client.create_key(key_spec)
create_key_response = kms_client.get_parameters(key_id)
public_key = create_key_response["PublicKey"]
import_token = create_key_response["ImportToken"]
import_key_spec = ImportKey(
**key_spec.model_dump(),
key_id=key_id, # type: ignore
public_key=public_key, # type: ignore
private_key=private_key, # type: ignore
import_token=import_token, # type: ignore
)
try:
response = kms_client.import_key(import_key_spec)
if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
cli_ctx.abort(f"Key failed to import into KMS, {response['Error']}")
cli_ctx.logger.success(f"Key imported successfully with ID: {key_id}")
except Exception as e:
cli_ctx.logger.error(f"Key failed to import into KMS: {e}")


# TODO: Add `ape aws kms sign-message [message]`
# TODO: Add `ape aws kms verify-message [message] [hex-signature]`

Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
"boto3>=1.34.79,<2",
"eth-ape>=0.8.2,<0.9",
"ecdsa>=0.19.0,<1",
"cryptography>=37.0.4,<38",
], # NOTE: Add 3rd party libraries here
entry_points={"ape_cli_subcommands": ["ape_aws=ape_aws._cli:cli"]},
python_requires=">=3.7,<4",
Expand Down

0 comments on commit 66695cf

Please sign in to comment.