Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extend cipher suite list #21

Merged
merged 13 commits into from
May 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 45 additions & 2 deletions edhoc/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from typing import Callable, Any, TypeVar, NamedTuple

import cbor2
from cose.algorithms import AESCCM1664128, Sha256, EdDSA, AESCCM16128128, Es256
from cose.curves import X25519, Ed25519, P256
from cose.algorithms import AESCCM1664128, Sha256, EdDSA, AESCCM16128128, Es256, A128GCM, A256GCM, Sha384, Es384
from cose.curves import X25519, Ed25519, P256, P384

from edhoc.exceptions import EdhocException

Expand Down Expand Up @@ -109,6 +109,21 @@ def __ge__(self, other: 'CipherSuite'):
def __repr__(self):
return f'<{self.fullname}: {self.identifier}>'

@classmethod
def check_identifiers(cls):
"""Return the algorithm names for the suite in the sequence in which
they are printed in the EDHOC specification, for easy validation of the
classes."""
return (
cls.aead.identifier,
cls.hash.identifier,
cls.dh_curve.identifier,
cls.sign_alg.identifier,
cls.sign_curve.identifier,
cls.app_aead.identifier,
cls.app_hash.identifier,
)


@CipherSuite.register_ciphersuite()
class CipherSuite0(CipherSuite):
Expand Down Expand Up @@ -165,6 +180,34 @@ class CipherSuite3(CipherSuite):
app_aead = AESCCM1664128
app_hash = Sha256

@CipherSuite.register_ciphersuite()
class CipherSuite4(CipherSuite):
identifier = 4
fullname = "SUITE_4"

aead = A128GCM
hash = Sha256
dh_curve = X25519
sign_alg = Es256
sign_curve = P256
app_aead = A128GCM
app_hash = Sha256
assert CipherSuite4.check_identifiers() == (1, -16, 4, -7, 1, 1, -16)

@CipherSuite.register_ciphersuite()
class CipherSuite5(CipherSuite):
identifier = 5
fullname = "SUITE_5"

aead = A256GCM
hash = Sha384
dh_curve = P384
sign_alg = Es384
sign_curve = P384
app_aead = A256GCM
app_hash = Sha384
assert CipherSuite5.check_identifiers() == (3, -43, 2, -35, 2, 3, -43)


class EdhocKDFInfo(NamedTuple):
edhoc_aead_id: int
Expand Down
4 changes: 2 additions & 2 deletions edhoc/messages/message2.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ def encode(self, corr: Correlation) -> bytes:

def __repr__(self) -> str:
if self.conn_idi != b'':
output = f'<MessageOne: [{self.conn_idi}, {EdhocMessage._truncate(self.g_y)}, {self.conn_idr}, ' \
output = f'<MessageTwo: [{self.conn_idi}, {EdhocMessage._truncate(self.g_y)}, {self.conn_idr}, ' \
f'{self.ciphertext}>'
else:
output = f'<MessageOne: [{EdhocMessage._truncate(self.g_y)}, {self.conn_idr}, {self.ciphertext}>'
output = f'<MessageTwo: [{EdhocMessage._truncate(self.g_y)}, {self.conn_idr}, {self.ciphertext}>'

return output
109 changes: 58 additions & 51 deletions edhoc/roles/edhoc.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import functools
from abc import ABCMeta, abstractmethod
from binascii import hexlify
from typing import List, Dict, Optional, Callable, Union, Any, Type, TYPE_CHECKING
from typing import List, Dict, Optional, Callable, Union, Any, Type, TYPE_CHECKING, Tuple

import cbor2
from asn1crypto.x509 import Certificate
from cose import headers
from cose.curves import X25519, X448, P256
from cose.exceptions import CoseIllegalCurve
Expand All @@ -14,12 +13,13 @@
from cose.keys.keyparam import KpKeyOps, KpAlg
from cose.messages import Sign1Message, Enc0Message
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.primitives import hashes, hmac, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.ec import SECP256R1
from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey, X25519PublicKey
from cryptography.hazmat.primitives.asymmetric.x448 import X448PublicKey, X448PrivateKey
from cryptography.hazmat.primitives.kdf.hkdf import HKDFExpand
from cryptography.x509 import Certificate

from edhoc.definitions import CipherSuite, Method, EdhocKDFInfo, Correlation, EdhocState
from edhoc.exceptions import EdhocException
Expand All @@ -29,6 +29,7 @@
from edhoc.definitions import CS
from cose.keys.keyops import KEYOPS
from cose.keys.cosekey import CK
from cose.headers import CoseHeaderAttribute

RPK = Union[EC2Key, OKPKey]
CBOR = bytes
Expand All @@ -41,21 +42,21 @@ def __init__(self,
cred: Union[RPK, Certificate],
cred_id: CoseHeaderMap,
auth_key: RPK,
supported_ciphers: List['CS'],
supported_ciphers: List[Type['CS']],
conn_id: bytes,
peer_cred: Optional[Union[Callable[..., Union[RPK, Certificate]], RPK, Certificate]],
remote_cred_cb: Callable[[CoseHeaderMap], Union[Certificate, RPK]],
aad1_cb: Optional[Callable[..., bytes]],
aad2_cb: Optional[Callable[..., bytes]],
aad3_cb: Optional[Callable[..., bytes]],
ephemeral_key: Optional['CK'] = None):
"""
Abstract base class for the EDHOC Responder and Initiator roles.

:param cred: CBOR-encoded public authentication credentials.
:param cred_id: The credential identifier (a CBOR encoded COSE header map)
:param auth_key: The private authentication key (of type :class:`~cose.keys.ec2.EC2` or \
:class:`~cose.keys.okp.OKP`). Forms a key pair with `local_authkey`. # noqa: E501
:param supported_ciphers: A list of ciphers supported.
:param cred: An RPK (Raw Public Key) or certificate
:param cred_id: The credential identifier (a COSE header map)
:param auth_key: The private authentication key (of type :class:`~cose.keys.ec2.EC2Key` or \
:class:`~cose.keys.okp.OKPKey`). # noqa: E501
:param supported_ciphers: A list of supported ciphers of type :class:`edhoc.definitions.CipherSuite`.
:param conn_id: The connection identifier to be used.
:param aad1_cb: A callback to pass received additional data to the application protocol.
:param aad2_cb: A callback to pass additional data to the remote endpoint.
Expand All @@ -67,7 +68,9 @@ def __init__(self,
self.cred_id = cred_id
self.auth_key = auth_key
self.supported_ciphers = supported_ciphers
self._peer_cred, self._remote_authkey = self._parse_credentials(peer_cred)

self.remote_cred_cb = remote_cred_cb

self._conn_id = conn_id
self.aad1_cb = aad1_cb
self.aad2_cb = aad2_cb
Expand All @@ -90,16 +93,13 @@ def transcript(self, hash_func: Callable, hash_input: bytes) -> bytes:
return transcript.finalize()

def _signature_or_mac(self, mac: bytes, transcript: bytes, aad_cb: Callable[..., bytes]) -> bytes:

role = 'I' if type(self).__name__ == 'Initiator' else 'R'

if not self.is_static_dh(role):
if not self.is_static_dh(self.role):
cose_sign = Sign1Message(
phdr=self.cred_id,
uhdr={headers.Algorithm: self.cipher_suite.sign_alg},
payload=mac,
key=self.auth_key,
external_aad=self._external_aad(transcript, aad_cb))
external_aad=self._external_aad(self.cred, transcript, aad_cb))
return cose_sign.compute_signature()
else:
return mac
Expand Down Expand Up @@ -127,20 +127,23 @@ def shared_secret(private_key: 'CK', public_key: 'CK') -> bytes:
if public_key.crv == X25519:
d = X25519PrivateKey.from_private_bytes(private_key.d)
x = X25519PublicKey.from_public_bytes(public_key.x)
secret = d.exchange(x)
elif public_key.crv == X448:
d = X448PrivateKey.from_private_bytes(private_key.d)

x = X448PublicKey.from_public_bytes(public_key.x)
secret = d.exchange(x)
elif public_key.crv == P256:
d = ec.derive_private_key(int(hexlify(private_key.d), 16), SECP256R1(), default_backend())

x = ec.EllipticCurvePublicNumbers(int(hexlify(public_key.x), 16),
int(hexlify(public_key.y), 16),
SECP256R1())
x = x.public_key()
secret = d.exchange(ec.ECDH(), x)
else:
raise CoseIllegalCurve(f"{public_key.crv} is unsupported")

secret = d.exchange(x)
return secret

@property
Expand All @@ -150,13 +153,6 @@ def edhoc_state(self):
def exporter(self, label: str, length: int):
return self._hkdf_expand(length, label, self._prk4x3m, self._th4_input)

@property
@abstractmethod
def peer_cred(self):
""" Returns the peer's credentials, e.g. certificate. """

raise NotImplementedError()

@property
@abstractmethod
def corr(self) -> Correlation:
Expand Down Expand Up @@ -229,13 +225,6 @@ def local_pubkey(self) -> RPK:

raise NotImplementedError()

@property
@abstractmethod
def remote_authkey(self) -> RPK:
""" The remote public authentication key. """

raise NotImplementedError()

@property
@abstractmethod
def local_authkey(self) -> RPK:
Expand Down Expand Up @@ -324,6 +313,8 @@ def _prk4x3m_static_dh(self, prk: bytes):
raise NotImplementedError()

def _mac(self,
cred_id: CoseHeaderMap,
cred,
hkdf: Callable,
key_label: str,
key_len: int,
Expand All @@ -338,11 +329,11 @@ def _mac(self,

# calculate the mac using a COSE_Encrypt0 message
return Enc0Message(
phdr=self.cred_id,
phdr=cred_id,
uhdr={headers.IV: iv_bytes, headers.Algorithm: self.cipher_suite.aead},
payload=b'',
key=cose_key,
external_aad=self._external_aad(th_input, aad_cb)
external_aad=self._external_aad(cred, th_input, aad_cb)
).encrypt()

def _create_cose_key(self, hkdf, key_len: int, label: str, prk: bytes, ops: List[Type['KEYOPS']]) -> SymmetricKey:
Expand All @@ -351,9 +342,25 @@ def _create_cose_key(self, hkdf, key_len: int, label: str, prk: bytes, ops: List
optional_params={KpKeyOps: ops, KpAlg: self.cipher_suite.aead}
)

def _external_aad(self, transcript: bytes, aad_cb: Callable[..., bytes]) -> CBOR:
def _external_aad(self, cred: Union[Certificate, RPK], transcript: bytes, aad_cb: Callable[..., bytes]) -> CBOR:
"""Build an unserialized external AAD out of a transcript hash, a cred
and AAD data.

aad = [cbor2.dumps(self.transcript(self.cipher_suite.hash.hash_cls, transcript)), self.cred]
As its format is shared among messages, the cred needs to be picked
suitably for the message (CRED_R in message 2, CRED_I in message 3);
depending on whether this is used in a creating or a verifying
capacity, self.cred or self.remote_cred needs to be passed in.
"""
if isinstance(cred, OKPKey) or isinstance(cred, EC2Key):
encoded_credential = cred.encode()
elif isinstance(cred, Certificate):
encoded_credential = cbor2.dumps(cred.tbs_certificate_bytes)
else:
# TODO: this shouldn't be here, but since somes of the test vectors are not real certificates we need
# this hack
encoded_credential = cbor2.dumps(cred)

aad = [cbor2.dumps(self.transcript(self.cipher_suite.hash.hash_cls, transcript)), encoded_credential]

if aad_cb is not None:
ad = aad_cb()
Expand All @@ -364,15 +371,6 @@ def _external_aad(self, transcript: bytes, aad_cb: Callable[..., bytes]) -> CBOR
aad = b"".join(aad)
return aad

def _verify_signature(self, signature: bytes) -> bool:
_ = signature

if self.peer_cred is None:
return True
else:
# TODO: needs valid CBOR certificate decoding
return True

@abstractmethod
def _decrypt(self, ciphertext: bytes) -> bool:
raise NotImplementedError()
Expand Down Expand Up @@ -417,16 +415,25 @@ def _generate_ephemeral_key(self) -> None:
self.ephemeral_key = EC2Key.generate_key(crv=chosen_suite.dh_curve)

@staticmethod
def _parse_credentials(cred: Union[RPK, 'Certificate']):
def _parse_credentials(cred: Union[RPK, 'Certificate']) -> Tuple[Union[Certificate, RPK], RPK]:
"""
Internal helper function that parser credentials and extracts the public key.
"""
if isinstance(cred, EC2Key) or isinstance(cred, OKPKey):
cred, public_auth_key = cred, cred

cred, auth_key = cred, cred
elif isinstance(cred, Certificate):
cred, public_auth_key = cred, Certificate.public_key
cred, auth_key = cred, cred.public_key().public_bytes(serialization.Encoding.Raw,
serialization.PublicFormat.Raw)
elif isinstance(cred, tuple):
# TODO this will be removed later on, currently here because test vectors do not provide valid certificates
cred, public_auth_key = cred
cred, auth_key = cred
else:
raise EdhocException("Invalid credentials")

return cred, public_auth_key
return cred, auth_key

@classmethod
def _custom_cbor_encoder(cls, encoder, cose_attribute: 'CoseHeaderAttribute'):
encoder.encode(cose_attribute.identifier)

def _populate_remote_details(self, remote_cred_id):
self.remote_cred, self.remote_authkey = self._parse_credentials(self.remote_cred_cb(remote_cred_id))
Loading