diff --git a/src/nacl/public.py b/src/nacl/public.py index ef2c2517c..3e6179db5 100644 --- a/src/nacl/public.py +++ b/src/nacl/public.py @@ -11,7 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import ClassVar, Generic, Optional, Type, TypeVar +from abc import ABCMeta, abstractmethod +from typing import ClassVar, Generic, Optional, Tuple, Type, TypeVar import nacl.bindings from nacl import encoding @@ -40,7 +41,9 @@ def __init__( ): self._public_key = encoder.decode(public_key) if not isinstance(self._public_key, bytes): - raise exc.TypeError("PublicKey must be created from 32 bytes") + raise exc.TypeError( + "PublicKey must be created from {} bytes".format(self.SIZE) + ) if len(self._public_key) != self.SIZE: raise exc.ValueError( @@ -421,3 +424,454 @@ def decrypt( ) return plaintext + + +class PublicKx(encoding.Encodable, StringFixer): + """ + The public key counterpart to an Curve25519 :class:`nacl.public.PrivateKx` + for encrypting messages. + + :param public_key: [:class:`bytes`] Encoded Curve25519 public key + :param encoder: A class that is able to decode the `public_key` + + :cvar SIZE: The size that the public key is required to be + """ + + SIZE: ClassVar[int] = nacl.bindings.crypto_kx_PUBLIC_KEY_BYTES + + def __init__( + self, + public_key: bytes, + encoder: encoding.Encoder = encoding.RawEncoder, + ): + self._public_key = encoder.decode(public_key) + if not isinstance(self._public_key, bytes): + raise exc.TypeError( + "PublicKx must be created from {} bytes".format(self.SIZE) + ) + + if len(self._public_key) != self.SIZE: + raise exc.ValueError( + "The public key must be exactly {} bytes long".format( + self.SIZE + ) + ) + + def __bytes__(self) -> bytes: + return self._public_key + + def __hash__(self) -> int: + return hash(bytes(self)) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, self.__class__): + return False + return nacl.bindings.sodium_memcmp(bytes(self), bytes(other)) + + def __ne__(self, other: object) -> bool: + return not (self == other) + + +class PrivateKx(encoding.Encodable, StringFixer): + """ + Private key for decrypting messages using the Curve25519 algorithm. + + .. warning:: This **must** be protected and remain secret. Anyone who + knows the value of your :class:`~nacl.public.PrivateKx` can decrypt + any message encrypted by the corresponding + :class:`~nacl.public.PublicKx` + + :param private_key: The private key used to decrypt messages + :param encoder: The encoder class used to decode the given keys + + :cvar SIZE: The size that the private key is required to be + :cvar SEED_SIZE: The size that the seed used to generate the + private key is required to be + """ + + SIZE: ClassVar[int] = nacl.bindings.crypto_kx_SECRET_KEY_BYTES + SEED_SIZE: ClassVar[int] = nacl.bindings.crypto_kx_SEED_BYTES + + def __init__( + self, + private_key: bytes, + encoder: encoding.Encoder = encoding.RawEncoder, + ): + # Decode the secret_key + private_key = encoder.decode(private_key) + # verify the given secret key type and size are correct + if not ( + isinstance(private_key, bytes) and len(private_key) == self.SIZE + ): + raise exc.TypeError( + ( + "PrivateKx must be created from a {} " + "bytes long raw secret key" + ).format(self.SIZE) + ) + + raw_public_key = nacl.bindings.crypto_scalarmult_base(private_key) + + self._private_key = private_key + self.public_key = PublicKx(raw_public_key) + + @classmethod + def from_seed( + cls, + seed: bytes, + encoder: encoding.Encoder = encoding.RawEncoder, + ) -> "PrivateKx": + """ + Generate a PrivateKx using a deterministic construction + starting from a caller-provided seed + + .. warning:: The seed **must** be high-entropy; therefore, + its generator **must** be a cryptographic quality + random function like, for example, :func:`~nacl.utils.random`. + + .. warning:: The seed **must** be protected and remain secret. + Anyone who knows the seed is really in possession of + the corresponding PrivateKx. + + :param seed: The seed used to generate the private key + :rtype: :class:`~nacl.public.PrivateKx` + """ + # decode the seed + seed = encoder.decode(seed) + # Verify the given seed type and size are correct + if not (isinstance(seed, bytes) and len(seed) == cls.SEED_SIZE): + raise exc.TypeError( + ( + "PrivateKx seed must be a {} bytes long " "binary sequence" + ).format(cls.SEED_SIZE) + ) + # generate a raw key pair from the given seed + raw_pk, raw_sk = nacl.bindings.crypto_kx_seed_keypair(seed) + # construct a instance from the raw secret key + return cls(raw_sk) + + def __bytes__(self) -> bytes: + return self._private_key + + def __hash__(self) -> int: + return hash((type(self), bytes(self.public_key))) + + def __eq__(self, other: object) -> bool: + if not isinstance(other, self.__class__): + return False + return self.public_key == other.public_key + + def __ne__(self, other: object) -> bool: + return not (self == other) + + @classmethod + def generate(cls) -> "PrivateKx": + """ + Generates a random :class:`~nacl.public.PrivateKx` object + + :rtype: :class:`~nacl.public.PrivateKx` + """ + return cls(random(PrivateKx.SIZE), encoder=encoding.RawEncoder) + + +AeadKx = TypeVar("AeadKx", bound="_AeadKx") + + +class _AeadKx(metaclass=ABCMeta): + """ + The _AeadKx class serves as the base class for + :class:`~nacl.public.AeadClient` and + :class:`~nacl.public.AeadServer. + + :param private_key: :class:`~nacl.public.PrivateKx` used to encrypt and + decrypt messages + :param public_key: :class:`~nacl.public.PublicKx` used to encrypt and + decrypt messages + + :cvar NONCE_SIZE: The size that the nonce is required to be. + """ + + NONCE_SIZE: ClassVar[int] = ( + nacl.bindings.crypto_aead_xchacha20poly1305_ietf_NPUBBYTES + ) + _rx_key: bytes + _tx_key: bytes + + def __init__(self, private_key: PrivateKx, public_key: PublicKx): + if not isinstance(private_key, PrivateKx) or not isinstance( + public_key, PublicKx + ): + raise exc.TypeError( + "{} must be created from a PrivateKx and a PublicKx".format( + self.__class__.__name__ + ) + ) + self._rx_key, self._tx_key = self._kx_session_keys( + private_key, public_key + ) + + @abstractmethod + def _kx_session_keys( + self, private_key: PrivateKx, public_key: PublicKx + ) -> Tuple[bytes, bytes]: + """Computes rx and tx keys""" + + @classmethod + @abstractmethod + def decode( + cls: Type[AeadKx], + encoded: bytes, + encoder: Encoder = encoding.RawEncoder, + ) -> AeadKx: + """Decodes from encoded bytes""" + + def __bytes__(self) -> bytes: + return self._rx_key + self._tx_key + + @classmethod + def _decode( + cls: Type[AeadKx], + encoded: bytes, + encoder: Encoder = encoding.RawEncoder, + ) -> AeadKx: + """ + Alternative constructor. Creates from rx key + tx key. + """ + # Create an empty class + aeadKx = cls.__new__(cls) + + # Assign our decoded value to both keys + rx_key = encoded[: nacl.bindings.crypto_kx_SESSION_KEY_BYTES] + tx_key = encoded[nacl.bindings.crypto_kx_SESSION_KEY_BYTES :] + aeadKx._rx_key = encoder.decode(rx_key) + aeadKx._tx_key = encoder.decode(tx_key) + + return aeadKx + + def encrypt( + self, + plaintext: bytes, + aad: bytes = b"", + nonce: Optional[bytes] = None, + encoder: encoding.Encoder = encoding.RawEncoder, + ) -> EncryptedMessage: + """ + Encrypts the plaintext message using the given `nonce` (or generates + one randomly if omitted) and returns the ciphertext encoded with the + encoder. + + .. warning:: It is vitally important for :param nonce: to be unique. + By default, it is generated randomly; [:class:`_AeadKx`] uses XChacha20 + for extended (192b) nonce size, so the risk of reusing random nonces + is negligible. It is *strongly recommended* to keep this behaviour, + as nonce reuse will compromise the privacy of encrypted messages. + Should implicit nonces be inadequate for your application, the + second best option is using split counters; e.g. if sending messages + encrypted under a shared key between 2 users, each user can use the + number of messages it sent so far, prefixed or suffixed with a 1bit + user id. Note that the counter must **never** be rolled back (due + to overflow, on-disk state being rolled back to an earlier backup, + ...) + + :param plaintext: [:class:`bytes`] The plaintext message to encrypt + :param aad: [:class:`bytes`] additional authenticated data + :param nonce: [:class:`bytes`] The nonce to use in the encryption + :param encoder: The encoder to use to encode the ciphertext + :rtype: [:class:`nacl.utils.EncryptedMessage`] + """ + if nonce is None: + nonce = random(self.NONCE_SIZE) + + if len(nonce) != self.NONCE_SIZE: + raise exc.ValueError( + "The nonce must be exactly %s bytes long" % self.NONCE_SIZE + ) + + ciphertext = nacl.bindings.crypto_aead_xchacha20poly1305_ietf_encrypt( + plaintext, aad, nonce, self._tx_key + ) + + encoded_nonce = encoder.encode(nonce) + encoded_ciphertext = encoder.encode(ciphertext) + + return EncryptedMessage._from_parts( + encoded_nonce, + encoded_ciphertext, + encoder.encode(nonce + ciphertext), + ) + + def _decrypt( + self, + ciphertext: bytes, + key: bytes, + aad: bytes = b"", + nonce: Optional[bytes] = None, + encoder: encoding.Encoder = encoding.RawEncoder, + ) -> bytes: + ciphertext = encoder.decode(ciphertext) + + if nonce is None: + # If we were given the nonce and ciphertext combined, split them. + nonce = ciphertext[: self.NONCE_SIZE] + ciphertext = ciphertext[self.NONCE_SIZE :] + + if len(nonce) != self.NONCE_SIZE: + raise exc.ValueError( + "The nonce must be exactly %s bytes long" % self.NONCE_SIZE + ) + + plaintext = nacl.bindings.crypto_aead_xchacha20poly1305_ietf_decrypt( + ciphertext, aad, nonce, key + ) + + return plaintext + + def decrypt( + self, + ciphertext: bytes, + aad: bytes = b"", + nonce: Optional[bytes] = None, + encoder: encoding.Encoder = encoding.RawEncoder, + ) -> bytes: + """ + Decrypts the ciphertext using the `nonce` (explicitly, when passed as a + parameter or implicitly, when omitted, as part of the ciphertext) and + returns the plaintext message. + + :param ciphertext: [:class:`bytes`] The encrypted message to decrypt + :param aad: [:class:`bytes`] additional authenticated data + :param nonce: [:class:`bytes`] The nonce used when encrypting the + ciphertext + :param encoder: The encoder used to decode the ciphertext. + :rtype: [:class:`bytes`] + """ + # Decode our ciphertext + return self._decrypt(ciphertext, self._rx_key, aad, nonce, encoder) + + def decrypt_beforetx( + self, + ciphertext: bytes, + aad: bytes = b"", + nonce: Optional[bytes] = None, + encoder: encoding.Encoder = encoding.RawEncoder, + ) -> bytes: + """ + Decrypts the ciphertext using the `nonce` (explicitly, when passed as a + parameter or implicitly, when omitted, as part of the ciphertext) and + returns the plaintext message. + + :param ciphertext: [:class:`bytes`] The encrypted message to decrypt + :param aad: [:class:`bytes`] additional authenticated data + :param nonce: [:class:`bytes`] The nonce used when encrypting the + ciphertext + :param encoder: The encoder used to decode the ciphertext. + :rtype: [:class:`bytes`] + """ + # Decode our ciphertext + return self._decrypt(ciphertext, self._tx_key, aad, nonce, encoder) + + def rx_key(self) -> bytes: + """ + Returns the Curve25519 rx secret + + .. warning:: It is **VITALLY** important that you use a nonce with your + symmetric cipher. If you fail to do this, you compromise the + privacy of the messages encrypted. Ensure that the key length of + your cipher is 32 bytes. + :rtype: [:class:`bytes`] + """ + return self._rx_key + + def tx_key(self) -> bytes: + """ + Returns the Curve25519 tx secret + + .. warning:: It is **VITALLY** important that you use a nonce with your + symmetric cipher. If you fail to do this, you compromise the + privacy of the messages encrypted. Ensure that the key length of + your cipher is 32 bytes. + :rtype: [:class:`bytes`] + """ + return self._tx_key + + +_AeadClient = TypeVar("_AeadClient", bound="AeadClient") + + +class AeadClient(_AeadKx, encoding.Encodable, StringFixer): + """ + The AeadClient class boxes and unboxes messages between a pair of keys + + The ciphertexts generated by :class:`~nacl.public.AeadClient` include a 16 + byte authenticator which is checked as part of the decryption. An invalid + authenticator will cause the decrypt function to raise an exception. The + authenticator is not a signature. Once you've decrypted the message you've + demonstrated the ability to create arbitrary valid message, so messages you + send are repudiable. For non-repudiable messages, sign them after + encryption. + + :param private_key: :class:`~nacl.public.PrivateKx` used to encrypt and + decrypt messages + :param public_key: :class:`~nacl.public.PublicKx` used to encrypt and + decrypt messages + + :cvar NONCE_SIZE: The size that the nonce is required to be. + """ + + def _kx_session_keys( + self, private_key: PrivateKx, public_key: PublicKx + ) -> Tuple[bytes, bytes]: + return nacl.bindings.crypto_kx_client_session_keys( + private_key.public_key.encode(encoder=encoding.RawEncoder), + private_key.encode(encoder=encoding.RawEncoder), + public_key.encode(encoder=encoding.RawEncoder), + ) + + @classmethod + def decode( + cls: Type[_AeadClient], + encoded: bytes, + encoder: Encoder = encoding.RawEncoder, + ) -> _AeadClient: + return cls._decode(encoded, encoder) + + +_AeadServer = TypeVar("_AeadServer", bound="AeadServer") + + +class AeadServer(_AeadKx, encoding.Encodable, StringFixer): + """ + The AeadServer class boxes and unboxes messages between a pair of keys + + The ciphertexts generated by :class:`~nacl.public.AeadServer` include a 16 + byte authenticator which is checked as part of the decryption. An invalid + authenticator will cause the decrypt function to raise an exception. The + authenticator is not a signature. Once you've decrypted the message you've + demonstrated the ability to create arbitrary valid message, so messages you + send are repudiable. For non-repudiable messages, sign them after + encryption. + + :param private_key: :class:`~nacl.public.PrivateKx` used to encrypt and + decrypt messages + :param public_key: :class:`~nacl.public.PublicKx` used to encrypt and + decrypt messages + + :cvar NONCE_SIZE: The size that the nonce is required to be. + """ + + def _kx_session_keys( + self, private_key: PrivateKx, public_key: PublicKx + ) -> Tuple[bytes, bytes]: + return nacl.bindings.crypto_kx_server_session_keys( + private_key.public_key.encode(encoder=encoding.RawEncoder), + private_key.encode(encoder=encoding.RawEncoder), + public_key.encode(encoder=encoding.RawEncoder), + ) + + @classmethod + def decode( + cls: Type[_AeadServer], + encoded: bytes, + encoder: Encoder = encoding.RawEncoder, + ) -> _AeadServer: + return cls._decode(encoded, encoder) diff --git a/tests/data/kx_from_seed.txt b/tests/data/kx_from_seed.txt new file mode 100644 index 000000000..d3d27743a --- /dev/null +++ b/tests/data/kx_from_seed.txt @@ -0,0 +1,2 @@ +# Fmt: || +7adf0494dc2a0f9abfa45a7d93de7a7176c178c242aa082732cebbbf60416282 d3afc9ad5b5a0305dcc45cced25d2039d006039a765074f3f155a093e8388038e8ec24037f3f3890d01a00e00402ac479081d84b3338563f141fba3462db8415 diff --git a/tests/test_aeadkx.py b/tests/test_aeadkx.py new file mode 100644 index 000000000..7b5857666 --- /dev/null +++ b/tests/test_aeadkx.py @@ -0,0 +1,518 @@ +# Copyright 2013 Donald Stufft and individual contributors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import binascii + +import pytest + +from nacl.encoding import HexEncoder +from nacl.exceptions import CryptoError +from nacl.public import AeadClient, AeadServer, PrivateKx, PublicKx +from nacl.utils import random + +from .test_bindings import _kx_from_seed_vectors +from .utils import check_type_error + +VECTORS = [ + # privalice, pubalice, privbob, pubbob, nonce, plaintext, + # ciphertext_c, ciphertext_s + ( + b"77076d0a7318a57d3c16c17251b26645df4c2f87ebc0992ab177fba51db92c2a", + b"8520f0098930a754748b7ddcb43ef75a0dbf3a0d26381af4eba4a98eaa9b4e6a", + b"5dab087e624a8a4b79e17f8b83800ee66f3bb1292618b6fd1c2f8b27ff88e0eb", + b"de9edb7d7b7dc1b4d35b61c2ece435373f8343c85b78674dadfc7e146f882b4f", + b"69696ee955b62b73cd62bda875fc73d68219e0036b7a0b37", + ( + b"be075fc53c81f2d5cf141316ebeb0c7b5228c52a4c62cbd44b66849b64244ffce5e" + b"cbaaf33bd751a1ac728d45e6c61296cdc3c01233561f41db66cce314adb310e3be8" + b"250c46f06dceea3a7fa1348057e2f6556ad6b1318a024a838f21af1fde048977eb4" + b"8f59ffd4924ca1c60902e52f0a089bc76897040e082f937763848645e0705" + ), + ( + b"353a672c2752c5b3f0ebbfd9f22ead181e3cd51e46f64cd1d48f6ddeb85c0f3eb1c" + b"d339a9da09078d1cef5c723ae37f83027aae107e182507a43573b491655afc0376e" + b"dcbf8a75d586b9691b32f9c5966f136b35135ca274247696ad0294a1e4afe229b72" + b"47419150189627d13b265b9bc16dbd40a1d4a2633ae97aeb240ce57eb1727515d09" + b"96bdabc2205c03e547feceb276" + ), + ( + b"8f9bb2a56563446b050324a9c4c7089920d68ff0097e1e679b9a8a2f0c7aa2f5620" + b"afcf7b7c487883f819eb3aec5a119a045c7ab0240dbe992b9c5706ead15ca289294" + b"9fc1ec97f1221802d85d47eb05ebd5d9871eed00eeef7ee2aae41536f029ce8aaaf" + b"1664a1244236e3602abc96ceb59247f6c2ac41a993c939ffa6db04418d4e7ee065c" + b"f13d95069231c3d949ed6c2c45" + ), + ), +] + + +def test_generate_private_key(): + PrivateKx.generate() + + +def test_generate_private_key_from_random_seed(): + PrivateKx.from_seed(random(PrivateKx.SEED_SIZE)) + + +@pytest.mark.parametrize( + ("seed", "public_key", "secret_key"), _kx_from_seed_vectors() +) +def test_generate_private_key_from_seed( + seed: bytes, public_key: bytes, secret_key: bytes +): + prvt = PrivateKx.from_seed(seed, encoder=HexEncoder) + sk = binascii.unhexlify(secret_key) + pk = binascii.unhexlify(public_key) + assert bytes(prvt) == sk + assert bytes(prvt.public_key) == pk + + +def test_aeadkx_creation(): + pub = PublicKx( + b"ec2bee2d5be613ca82e377c96a0bf2220d823ce980cdff6279473edc52862798", + encoder=HexEncoder, + ) + priv = PrivateKx( + b"5c2bee2d5be613ca82e377c96a0bf2220d823ce980cdff6279473edc52862798", + encoder=HexEncoder, + ) + AeadClient(priv, pub) + AeadServer(priv, pub) + + +def test_aeadkx_decode(): + pub = PublicKx( + b"ec2bee2d5be613ca82e377c96a0bf2220d823ce980cdff6279473edc52862798", + encoder=HexEncoder, + ) + priv = PrivateKx( + b"5c2bee2d5be613ca82e377c96a0bf2220d823ce980cdff6279473edc52862798", + encoder=HexEncoder, + ) + + c1 = AeadClient(priv, pub) + c2 = AeadClient.decode(c1.encode()) + assert c1._tx_key == c2._tx_key + assert c1._rx_key == c2._rx_key + + s1 = AeadServer(priv, pub) + s2 = AeadServer.decode(s1.encode()) + assert s1._tx_key == s2._tx_key + assert s1._rx_key == s2._rx_key + + +def test_aeadkx_bytes(): + pub = PublicKx( + b"ec2bee2d5be613ca82e377c96a0bf2220d823ce980cdff6279473edc52862798", + encoder=HexEncoder, + ) + priv = PrivateKx( + b"5c2bee2d5be613ca82e377c96a0bf2220d823ce980cdff6279473edc52862798", + encoder=HexEncoder, + ) + + c = AeadClient(priv, pub) + assert bytes(c) == c.encode() + + s = AeadServer(priv, pub) + assert bytes(s) == s.encode() + + +@pytest.mark.parametrize( + ( + "privalice", + "pubalice", + "privbob", + "pubbob", + "nonce", + "plaintext", + "ciphertext_c", + "ciphertext_s", + ), + VECTORS, +) +def test_aeadkx_encryption( + privalice: bytes, + pubalice: bytes, + privbob: bytes, + pubbob: bytes, + nonce: bytes, + plaintext: bytes, + ciphertext_c: bytes, + ciphertext_s: bytes, +): + pubalice_decoded = PublicKx(pubalice, encoder=HexEncoder) + privbob_decoded = PrivateKx(privbob, encoder=HexEncoder) + pubbob_decoded = PublicKx(pubbob, encoder=HexEncoder) + privalice_decoded = PrivateKx(privalice, encoder=HexEncoder) + + c = AeadClient(privbob_decoded, pubalice_decoded) + s = AeadServer(privalice_decoded, pubbob_decoded) + + encrypted_c = c.encrypt( + binascii.unhexlify(plaintext), + b"", + binascii.unhexlify(nonce), + encoder=HexEncoder, + ) + + encrypted_s = s.encrypt( + binascii.unhexlify(plaintext), + b"", + binascii.unhexlify(nonce), + encoder=HexEncoder, + ) + + expected_c = binascii.hexlify( + binascii.unhexlify(nonce) + binascii.unhexlify(ciphertext_c), + ) + + expected_s = binascii.hexlify( + binascii.unhexlify(nonce) + binascii.unhexlify(ciphertext_s), + ) + + assert encrypted_c == expected_c + assert encrypted_s == expected_s + assert encrypted_c.nonce == encrypted_s.nonce == nonce + assert encrypted_c.ciphertext == ciphertext_c + assert encrypted_s.ciphertext == ciphertext_s + + +@pytest.mark.parametrize( + ( + "privalice", + "pubalice", + "privbob", + "pubbob", + "nonce", + "plaintext", + "ciphertext_c", + "ciphertext_s", + ), + VECTORS, +) +def test_aeadkx_decryption( + privalice: bytes, + pubalice: bytes, + privbob: bytes, + pubbob: bytes, + nonce: bytes, + plaintext: bytes, + ciphertext_c: bytes, + ciphertext_s: bytes, +): + pubalice_decoded = PublicKx(pubalice, encoder=HexEncoder) + privbob_decoded = PrivateKx(privbob, encoder=HexEncoder) + pubbob_decoded = PublicKx(pubbob, encoder=HexEncoder) + privalice_decoded = PrivateKx(privalice, encoder=HexEncoder) + + c = AeadClient(privbob_decoded, pubalice_decoded) + s = AeadServer(privalice_decoded, pubbob_decoded) + + nonce = binascii.unhexlify(nonce) + decrypted_c1 = binascii.hexlify( + c.decrypt(ciphertext_s, b"", nonce, encoder=HexEncoder), + ) + + decrypted_c2 = binascii.hexlify( + c.decrypt_beforetx(ciphertext_c, b"", nonce, encoder=HexEncoder), + ) + + decrypted_s1 = binascii.hexlify( + s.decrypt(ciphertext_c, b"", nonce, encoder=HexEncoder), + ) + + decrypted_s2 = binascii.hexlify( + s.decrypt_beforetx(ciphertext_s, b"", nonce, encoder=HexEncoder), + ) + + assert ( + decrypted_c1 + == decrypted_c2 + == decrypted_s1 + == decrypted_s2 + == plaintext + ) + + +@pytest.mark.parametrize( + ( + "privalice", + "pubalice", + "privbob", + "pubbob", + "nonce", + "plaintext", + "ciphertext_c", + "ciphertext_s", + ), + VECTORS, +) +def test_aeadkx_decryption_combined( + privalice: bytes, + pubalice: bytes, + privbob: bytes, + pubbob: bytes, + nonce: bytes, + plaintext: bytes, + ciphertext_c: bytes, + ciphertext_s: bytes, +): + pubalice_decoded = PublicKx(pubalice, encoder=HexEncoder) + privbob_decoded = PrivateKx(privbob, encoder=HexEncoder) + pubbob_decoded = PublicKx(pubbob, encoder=HexEncoder) + privalice_decoded = PrivateKx(privalice, encoder=HexEncoder) + + c = AeadClient(privbob_decoded, pubalice_decoded) + s = AeadServer(privalice_decoded, pubbob_decoded) + + combined_c = binascii.hexlify( + binascii.unhexlify(nonce) + binascii.unhexlify(ciphertext_c), + ) + combined_s = binascii.hexlify( + binascii.unhexlify(nonce) + binascii.unhexlify(ciphertext_s), + ) + + decrypted_c1 = binascii.hexlify(c.decrypt(combined_s, encoder=HexEncoder)) + decrypted_s1 = binascii.hexlify(s.decrypt(combined_c, encoder=HexEncoder)) + decrypted_c2 = binascii.hexlify( + c.decrypt_beforetx(combined_c, encoder=HexEncoder) + ) + decrypted_s2 = binascii.hexlify( + s.decrypt_beforetx(combined_s, encoder=HexEncoder) + ) + + assert ( + decrypted_c1 + == decrypted_c2 + == decrypted_s1 + == decrypted_s2 + == plaintext + ) + + +@pytest.mark.parametrize( + ( + "privalice", + "pubalice", + "privbob", + "pubbob", + "nonce", + "plaintext", + "ciphertext_c", + "ciphertext_s", + ), + VECTORS, +) +def test_aeadkx_optional_nonce( + privalice: bytes, + pubalice: bytes, + privbob: bytes, + pubbob: bytes, + nonce: bytes, + plaintext: bytes, + ciphertext_c: bytes, + ciphertext_s: bytes, +): + pubalice_decoded = PublicKx(pubalice, encoder=HexEncoder) + privbob_decoded = PrivateKx(privbob, encoder=HexEncoder) + pubbob_decoded = PublicKx(pubbob, encoder=HexEncoder) + privalice_decoded = PrivateKx(privalice, encoder=HexEncoder) + + c = AeadClient(privbob_decoded, pubalice_decoded) + s = AeadServer(privalice_decoded, pubbob_decoded) + + encrypted = c.encrypt(binascii.unhexlify(plaintext), encoder=HexEncoder) + + decrypted = binascii.hexlify(s.decrypt(encrypted, encoder=HexEncoder)) + + assert decrypted == plaintext + + +@pytest.mark.parametrize( + ( + "privalice", + "pubalice", + "privbob", + "pubbob", + "nonce", + "plaintext", + "ciphertext_c", + "ciphertext_s", + ), + VECTORS, +) +def test_aeadkx_encryption_generates_different_nonces( + privalice: bytes, + pubalice: bytes, + privbob: bytes, + pubbob: bytes, + nonce: bytes, + plaintext: bytes, + ciphertext_c: bytes, + ciphertext_s: bytes, +): + pubalice_decoded = PublicKx(pubalice, encoder=HexEncoder) + privbob_decoded = PrivateKx(privbob, encoder=HexEncoder) + + c = AeadClient(privbob_decoded, pubalice_decoded) + + nonce_0 = c.encrypt( + binascii.unhexlify(plaintext), encoder=HexEncoder + ).nonce + + nonce_1 = c.encrypt( + binascii.unhexlify(plaintext), encoder=HexEncoder + ).nonce + + assert nonce_0 != nonce_1 + + +@pytest.mark.parametrize( + ( + "privalice", + "pubalice", + "privbob", + "pubbob", + "nonce", + "plaintext", + "ciphertext_c", + "ciphertext_s", + ), + VECTORS, +) +def test_box_failed_decryption( + privalice: bytes, + pubalice: bytes, + privbob: bytes, + pubbob: bytes, + nonce: bytes, + plaintext: bytes, + ciphertext_c: bytes, + ciphertext_s: bytes, +): + privbob_decoded = PrivateKx(privbob, encoder=HexEncoder) + pubbob_decoded = PublicKx(pubbob, encoder=HexEncoder) + + # this cannot decrypt the ciphertext! the ciphertext must be decrypted by + # (privalice, pubbob) or (privbob, pubalice) + c = AeadClient(privbob_decoded, pubbob_decoded) + + with pytest.raises(CryptoError): + c.decrypt_beforetx( + ciphertext_c, b"", binascii.unhexlify(nonce), encoder=HexEncoder + ) + + +def test_aeadkx_wrong_length(): + with pytest.raises(ValueError): + PublicKx(b"") + + with pytest.raises(TypeError): + PrivateKx(b"") + + pub = PublicKx( + b"ec2bee2d5be613ca82e377c96a0bf2220d823ce980cdff6279473edc52862798", + encoder=HexEncoder, + ) + priv = PrivateKx( + b"5c2bee2d5be613ca82e377c96a0bf2220d823ce980cdff6279473edc52862798", + encoder=HexEncoder, + ) + + c = AeadClient(priv, pub) + + with pytest.raises(ValueError): + c.encrypt(b"", b"", b"") + + with pytest.raises(ValueError): + c.decrypt(b"", b"", b"") + + +def test_wrong_types(): + priv = PrivateKx.generate() + + check_type_error( + ("PrivateKx must be created from a 32 bytes long raw secret key"), + PrivateKx, + 12, + ) + check_type_error( + ("PrivateKx must be created from a 32 bytes long raw secret key"), + PrivateKx, + priv, + ) + check_type_error( + ("PrivateKx must be created from a 32 bytes long raw secret key"), + PrivateKx, + priv.public_key, + ) + + check_type_error("PublicKx must be created from 32 bytes", PublicKx, 13) + check_type_error("PublicKx must be created from 32 bytes", PublicKx, priv) + check_type_error( + "PublicKx must be created from 32 bytes", PublicKx, priv.public_key + ) + + check_type_error( + "AeadClient must be created from a PrivateKx and a PublicKx", + AeadClient, + priv, + "not a public key", + ) + check_type_error( + "AeadClient must be created from a PrivateKx and a PublicKx", + AeadClient, + priv.encode(), + priv.public_key.encode(), + ) + check_type_error( + "AeadClient must be created from a PrivateKx and a PublicKx", + AeadClient, + priv, + priv.public_key.encode(), + ) + check_type_error( + "AeadClient must be created from a PrivateKx and a PublicKx", + AeadClient, + priv.encode(), + priv.public_key, + ) + check_type_error( + "AeadServer must be created from a PrivateKx and a PublicKx", + AeadServer, + priv, + "not a public key", + ) + check_type_error( + "AeadServer must be created from a PrivateKx and a PublicKx", + AeadServer, + priv.encode(), + priv.public_key.encode(), + ) + check_type_error( + "AeadServer must be created from a PrivateKx and a PublicKx", + AeadServer, + priv, + priv.public_key.encode(), + ) + check_type_error( + "AeadServer must be created from a PrivateKx and a PublicKx", + AeadServer, + priv.encode(), + priv.public_key, + ) + + check_type_error("seed must be a 32 bytes long", PrivateKx.from_seed, b"1") diff --git a/tests/test_bindings.py b/tests/test_bindings.py index a89c361c8..f10c233e4 100644 --- a/tests/test_bindings.py +++ b/tests/test_bindings.py @@ -405,6 +405,20 @@ def _box_from_seed_vectors() -> List[Tuple[bytes, bytes, bytes]]: ] +def _kx_from_seed_vectors() -> List[Tuple[bytes, bytes, bytes]]: + # Fmt: || + DATA = "kx_from_seed.txt" + lines = read_crypto_test_vectors(DATA, maxels=2, delimiter=b"\t") + return [ + ( + x[0], # seed + x[1][:64], # derived public key + x[1][64:], # derived secret key + ) + for x in lines + ] + + @pytest.mark.parametrize( ("seed", "public_key", "secret_key"), _box_from_seed_vectors() ) diff --git a/tests/test_public.py b/tests/test_public.py index eb4201f62..27f2b6fc8 100644 --- a/tests/test_public.py +++ b/tests/test_public.py @@ -16,8 +16,21 @@ import pytest -from nacl.bindings import crypto_box_PUBLICKEYBYTES, crypto_box_SECRETKEYBYTES -from nacl.public import Box, PrivateKey, PublicKey +from nacl.bindings import ( + crypto_box_PUBLICKEYBYTES, + crypto_box_SECRETKEYBYTES, + crypto_kx_PUBLIC_KEY_BYTES, + crypto_kx_SECRET_KEY_BYTES, +) +from nacl.public import ( + AeadClient, + AeadServer, + Box, + PrivateKey, + PrivateKx, + PublicKey, + PublicKx, +) from nacl.utils import random from .utils import assert_equal, assert_not_equal @@ -168,3 +181,159 @@ def test_equivalent_keys_shared_key_getter(self): assert box_AB.shared_key() == box_BA.shared_key() assert box_BprimeA.shared_key() == box_BA.shared_key() + + +class TestPublicKx: + def test_equal_keys_have_equal_hashes(self): + kx1 = PublicKx(b"\x00" * crypto_kx_PUBLIC_KEY_BYTES) + kx2 = PublicKx(b"\x00" * crypto_kx_PUBLIC_KEY_BYTES) + assert hash(kx1) == hash(kx2) + assert id(kx1) != id(kx2) + + def test_equal_keys_are_equal(self): + kx1 = PublicKx(b"\x00" * crypto_kx_PUBLIC_KEY_BYTES) + kx2 = PublicKx(b"\x00" * crypto_kx_PUBLIC_KEY_BYTES) + assert_equal(kx1, kx1) + assert_equal(kx1, kx2) + + @pytest.mark.parametrize( + "kx2", + [ + b"\x00" * crypto_kx_PUBLIC_KEY_BYTES, + PublicKx(b"\x01" * crypto_kx_PUBLIC_KEY_BYTES), + PublicKx(b"\x00" * (crypto_kx_PUBLIC_KEY_BYTES - 1) + b"\x01"), + ], + ) + def test_different_keys_are_not_equal_kx( + self, kx2: Union[bytes, PublicKx] + ): + kx1 = PublicKx(b"\x00" * crypto_kx_PUBLIC_KEY_BYTES) + assert_not_equal(kx1, kx2) + + +class TestPrivateKx: + def test_equal_keys_have_equal_hashes(self): + kx1 = PrivateKx(b"\x00" * crypto_kx_SECRET_KEY_BYTES) + kx2 = PrivateKx(b"\x00" * crypto_kx_SECRET_KEY_BYTES) + assert hash(kx1) == hash(kx2) + assert id(kx1) != id(kx2) + + def test_equal_keys_are_equal(self): + kx1 = PrivateKx(b"\x00" * crypto_kx_SECRET_KEY_BYTES) + kx2 = PrivateKx(b"\x00" * crypto_kx_SECRET_KEY_BYTES) + assert_equal(kx1, kx1) + assert_equal(kx1, kx2) + + def _gen_equivalent_raw_keys_couple_kx( + self, + ) -> Tuple[PrivateKx, PrivateKx]: + rwk1 = bytearray(random(crypto_box_SECRETKEYBYTES)) + rwk2 = bytearray(rwk1) + # mask rwk1 bits + rwk1[0] &= 248 + rwk1[31] &= 127 + rwk1[31] |= 64 + # set rwk2 bits + rwk2[0] |= 7 + rwk2[31] |= 128 + rwk2[31] &= 191 + skx1 = PrivateKx(bytes(rwk1)) + skx2 = PrivateKx(bytes(rwk2)) + return skx1, skx2 + + def test_equivalent_keys_have_equal_hashes(self): + kx1, kx2 = self._gen_equivalent_raw_keys_couple_kx() + assert bytes(kx1) != bytes(kx2) + assert hash(kx1) == hash(kx2) + + def test_equivalent_keys_compare_as_equal(self): + kx1, kx2 = self._gen_equivalent_raw_keys_couple_kx() + assert bytes(kx1) != bytes(kx2) + assert kx1 == kx2 + + def test_sk_and_pk_hashes_are_different(self): + skx = PrivateKx(random(crypto_kx_SECRET_KEY_BYTES)) + assert hash(skx) != hash(skx.public_key) + + @pytest.mark.parametrize( + "kx2", + [ + b"\x00" * crypto_kx_SECRET_KEY_BYTES, + PrivateKx(b"\x01" * crypto_kx_SECRET_KEY_BYTES), + PrivateKx(b"\x00" * (crypto_kx_SECRET_KEY_BYTES - 1) + b"\x01"), + ], + ) + def test_different_keys_are_not_equal_kx( + self, kx2: Union[bytes, PrivateKx] + ): + kx1 = PrivateKx(b"\x00" * crypto_kx_SECRET_KEY_BYTES) + assert_not_equal(kx1, kx2) + + def test_shared_key_getter(self): + """ + RFC 7748 "Elliptic Curves for Security" gives a set of test + parameters for the Diffie-Hellman key exchange on Curve25519: + + 6.1. [Diffie-Hellman on] Curve25519 + [ . . . ] + Alice's private key, a: + 77076d0a7318a57d3c16c17251b26645df4c2f87ebc0992ab177fba51db92c2a + Alice's public key, X25519(a, 9): + 8520f0098930a754748b7ddcb43ef75a0dbf3a0d26381af4eba4a98eaa9b4e6a + Bob's private key, b: + 5dab087e624a8a4b79e17f8b83800ee66f3bb1292618b6fd1c2f8b27ff88e0eb + Bob's public key, X25519(b, 9): + de9edb7d7b7dc1b4d35b61c2ece435373f8343c85b78674dadfc7e146f882b4f + + Since libNaCl/libsodium shared key generation adds an HSalsa20 + key derivation pass on the raw shared Diffie-Hellman key, which + is not exposed by itself, we just check the shared key for equality. + """ + prv_A = ( + b"77076d0a7318a57d3c16c17251b26645" + b"df4c2f87ebc0992ab177fba51db92c2a" + ) + pub_A = ( + b"8520f0098930a754748b7ddcb43ef75a" + b"0dbf3a0d26381af4eba4a98eaa9b4e6a" + ) + prv_B = ( + b"5dab087e624a8a4b79e17f8b83800ee6" + b"6f3bb1292618b6fd1c2f8b27ff88e0eb" + ) + pub_B = ( + b"de9edb7d7b7dc1b4d35b61c2ece43537" + b"3f8343c85b78674dadfc7e146f882b4f" + ) + + alices_kx = PrivateKx(binascii.unhexlify(prv_A)) + bobs_kx = PrivateKx(binascii.unhexlify(prv_B)) + alicesP_kx = alices_kx.public_key + bobsP_kx = bobs_kx.public_key + + assert binascii.unhexlify(pub_A) == bytes(alicesP_kx) + assert binascii.unhexlify(pub_B) == bytes(bobsP_kx) + + aead_AB = AeadClient(alices_kx, bobsP_kx) + aead_BA = AeadServer(bobs_kx, alicesP_kx) + + assert aead_AB.tx_key() == aead_BA.rx_key() + assert aead_AB.rx_key() == aead_BA.tx_key() + + def test_equivalent_keys_shared_key_getter(self): + alices_kx = PrivateKx.generate() + alicesP_kx = alices_kx.public_key + bobs_kx, bobsprime_kx = self._gen_equivalent_raw_keys_couple_kx() + bobsP_kx, bobsprimeP_kx = bobs_kx.public_key, bobsprime_kx.public_key + + assert bobsP_kx == bobsprimeP_kx + + aead_AB = AeadClient(alices_kx, bobsP_kx) + + aead_BA = AeadServer(bobs_kx, alicesP_kx) + aead_BprimeA = AeadServer(bobsprime_kx, alicesP_kx) + + assert aead_AB.tx_key() == aead_BA.rx_key() + assert aead_AB.rx_key() == aead_BA.tx_key() + assert aead_BprimeA.tx_key() == aead_BA.tx_key() + assert aead_BprimeA.rx_key() == aead_BA.rx_key()