Skip to content
29 changes: 25 additions & 4 deletions core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
APP_NAME = "Coldwire"
APP_VERSION = "0.1"

# hard-coded filepaths
ACCOUNT_FILE_PATH = "account.coldwire"

# network defaults (seconds)
LONGPOLL_MIN = 5
LONGPOLL_MAX = 30
Expand All @@ -17,27 +20,45 @@
ML_KEM_1024_NAME = "Kyber1024"
ML_KEM_1024_SK_LEN = 3168
ML_KEM_1024_PK_LEN = 1568
ML_KEM_1024_CT_LEN = 1568


ML_DSA_87_NAME = "Dilithium5"
ML_DSA_87_SK_LEN = 4864
ML_DSA_87_PK_LEN = 2592
ML_DSA_87_SIGN_LEN = 4595

ML_BUFFER_LIMITS = {

CLASSIC_MCELIECE_8_F_NAME = "Classic-McEliece-8192128f"
CLASSIC_MCELIECE_8_F_SK_LEN = 14120
CLASSIC_MCELIECE_8_F_PK_LEN = 1357824
CLASSIC_MCELIECE_8_F_CT_LEN = 208


CLASSIC_MCELIECE_8_F_ROTATE_AT = 3 # Default OTP batches needed to be sent for a key rotation to occur



ALGOS_BUFFER_LIMITS = {
ML_KEM_1024_NAME: {
"SK_LEN": ML_KEM_1024_SK_LEN,
"PK_LEN": ML_KEM_1024_PK_LEN
"PK_LEN": ML_KEM_1024_PK_LEN,
"CT_LEN": ML_KEM_1024_CT_LEN
},
ML_DSA_87_NAME: {
"SK_LEN" : ML_DSA_87_SK_LEN,
"PK_LEN" : ML_DSA_87_PK_LEN,
"SIGN_LEN": ML_DSA_87_SIGN_LEN
}
},
CLASSIC_MCELIECE_8_F_NAME: {
"SK_LEN": CLASSIC_MCELIECE_8_F_SK_LEN,
"PK_LEN": CLASSIC_MCELIECE_8_F_PK_LEN,
"CT_LEN": CLASSIC_MCELIECE_8_F_CT_LEN
},
}

# hash parameters
ARGON2_MEMORY = 256 * 1024 # KB
ARGON2_MEMORY = 256 * 1024 # MB
ARGON2_ITERS = 3
ARGON2_OUTPUT_LEN = 32 # bytes
ARGON2_SALT_LEN = 32 # bytes
Expand Down
35 changes: 19 additions & 16 deletions core/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
ML_DSA_87_SK_LEN,
ML_DSA_87_PK_LEN,
ML_DSA_87_SIGN_LEN,
ML_BUFFER_LIMITS
ALGOS_BUFFER_LIMITS
)


Expand All @@ -44,7 +44,7 @@ def create_signature(algorithm: str, message: bytes, private_key: bytes) -> byte
Returns:
Signature bytes of fixed size defined by the algorithm.
"""
with oqs.Signature(algorithm, secret_key = private_key[:ML_BUFFER_LIMITS[algorithm]["SK_LEN"]]) as signer:
with oqs.Signature(algorithm, secret_key = private_key[:ALGOS_BUFFER_LIMITS[algorithm]["SK_LEN"]]) as signer:
return signer.sign(message)

def verify_signature(algorithm: str, message: bytes, signature: bytes, public_key: bytes) -> bool:
Expand All @@ -61,7 +61,7 @@ def verify_signature(algorithm: str, message: bytes, signature: bytes, public_ke
True if valid, False if invalid.
"""
with oqs.Signature(algorithm) as verifier:
return verifier.verify(message, signature[:ML_BUFFER_LIMITS[algorithm]["SIGN_LEN"]], public_key[:ML_BUFFER_LIMITS[algorithm]["PK_LEN"]])
return verifier.verify(message, signature[:ALGOS_BUFFER_LIMITS[algorithm]["SIGN_LEN"]], public_key[:ALGOS_BUFFER_LIMITS[algorithm]["PK_LEN"]])

def generate_sign_keys(algorithm: str = ML_DSA_87_NAME):
"""
Expand Down Expand Up @@ -137,9 +137,9 @@ def one_time_pad(plaintext: bytes, key: bytes) -> bytes:
otpd_plaintext += bytes([plain_byte ^ key_byte])
return otpd_plaintext

def generate_kem_keys(algorithm: str = ML_KEM_1024_NAME):
def generate_kem_keys(algorithm: str):
"""
Generates ML-KEM-1024 keypair (Kyber).
Generates a KEM keypair.

Args:
algorithm: PQ KEM algorithm (default Kyber1024).
Expand All @@ -152,39 +152,42 @@ def generate_kem_keys(algorithm: str = ML_KEM_1024_NAME):
private_key = kem.export_secret_key()
return private_key, public_key

def decrypt_kyber_shared_secrets(ciphertext_blob: bytes, private_key: bytes, otp_pad_size: int = OTP_PAD_SIZE):
def decrypt_shared_secrets(ciphertext_blob: bytes, private_key: bytes, algorithm: str = None, otp_pad_size: int = OTP_PAD_SIZE):
"""
Decrypts concatenated Kyber ciphertexts to derive shared one-time pad.
Decrypts concatenated KEM ciphertexts to derive shared one-time pad.

Args:
ciphertext_blob: Concatenated Kyber ciphertexts.
private_key: ML-KEM-1024 private key.
private_key: KEM private key.
algorithm: KEM algorithm NIST name.
otp_pad_size: Desired OTP pad size in bytes.

Returns:
Shared secret OTP pad bytes.
"""
cipher_size = 1568 # Kyber1024 ciphertext size
cipher_size = ALGOS_BUFFER_LIMITS[algorithm]["CT_LEN"] # KEM ciphertext size
shared_secrets = b''
cursor = 0

with oqs.KeyEncapsulation(ML_KEM_1024_NAME, secret_key=private_key[:ML_BUFFER_LIMITS[ML_KEM_1024_NAME]["SK_LEN"]]) as kem:
with oqs.KeyEncapsulation(algorithm, secret_key=private_key[:ALGOS_BUFFER_LIMITS[algorithm]["SK_LEN"]]) as kem:
while len(shared_secrets) < otp_pad_size:
ciphertext = ciphertext_blob[cursor:cursor + cipher_size]
if len(ciphertext) != cipher_size:
raise ValueError("Ciphertext blob is malformed or incomplete")
raise ValueError(f"Ciphertext of {algorithm} blob is malformed or incomplete ({len(ciphertext)})")

shared_secret = kem.decap_secret(ciphertext)
shared_secrets += shared_secret
cursor += cipher_size

return shared_secrets[:otp_pad_size]

def generate_kyber_shared_secrets(public_key: bytes, otp_pad_size: int = OTP_PAD_SIZE):
def generate_shared_secrets(public_key: bytes, algorithm: str = None, otp_pad_size: int = OTP_PAD_SIZE):
"""
Generates a one-time pad via Kyber encapsulation.
Generates a one-time pad via `algorithm` encapsulation.

Args:
public_key: Recipient's ML-KEM-1024 public key.
public_key: Recipient's public key.
algorithm: KEM algorithm NIST name.
otp_pad_size: Desired OTP pad size in bytes.

Returns:
Expand All @@ -193,9 +196,9 @@ def generate_kyber_shared_secrets(public_key: bytes, otp_pad_size: int = OTP_PAD
shared_secrets = b''
ciphertexts_blob = b''

with oqs.KeyEncapsulation(ML_KEM_1024_NAME) as kem:
with oqs.KeyEncapsulation(algorithm) as kem:
while len(shared_secrets) < otp_pad_size:
ciphertext, shared_secret = kem.encap_secret(public_key[:ML_BUFFER_LIMITS[ML_KEM_1024_NAME]["PK_LEN"]])
ciphertext, shared_secret = kem.encap_secret(public_key[:ALGOS_BUFFER_LIMITS[algorithm]["PK_LEN"]])
ciphertexts_blob += ciphertext
shared_secrets += shared_secret

Expand Down
9 changes: 3 additions & 6 deletions core/trad_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.argon2 import Argon2id
from core.constants import (
OTP_PAD_SIZE,
AES_GCM_NONCE_LEN,
ARGON2_ITERS,
ARGON2_MEMORY,
Expand All @@ -22,6 +23,7 @@
import secrets



def sha3_512(data: bytes) -> bytes:
"""
Compute a SHA3-512 hash of the given data.
Expand All @@ -37,12 +39,7 @@ def sha3_512(data: bytes) -> bytes:
return h.digest()


def derive_key_argon2id(
password: bytes,
salt: bytes = None,
salt_length: int = ARGON2_SALT_LEN,
output_length: int = ARGON2_OUTPUT_LEN
) -> tuple[bytes, bytes]:
def derive_key_argon2id(password: bytes, salt: bytes = None, salt_length: int = ARGON2_SALT_LEN, output_length: int = ARGON2_OUTPUT_LEN) -> tuple[bytes, bytes]:
"""
Derive a symmetric key from a password using Argon2id.

Expand Down
15 changes: 12 additions & 3 deletions logic/background_worker.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from core.requests import http_request
from logic.smp import smp_unanswered_questions, smp_data_handler
from logic.pfs import pfs_data_handler
from logic.pfs import pfs_data_handler, update_ephemeral_keys
from logic.message import messages_data_handler
from core.constants import (
LONGPOLL_MIN,
Expand Down Expand Up @@ -29,9 +29,11 @@ def background_worker(user_data, user_data_lock, ui_queue, stop_flag):
logger.debug("Data longpoll request has timed out, retrying...")
continue

logger.debug("SMP messages: %s", json.dumps(response, indent = 2))
# logger.debug("Data received: %s", json.dumps(response, indent = 2)[:2000])

for message in response["messages"]:
logger.debug("Received data message: %s", json.dumps(message, indent = 2)[:5000])

# Sanity check universal message fields
if (not "sender" in message) or (not message["sender"].isdigit()) or (len(message["sender"]) != 16):
logger.error("Impossible condition, either you have discovered a bug in Coldwire, or the server is attempting to denial-of-service you. Skipping data message with no (or malformed) sender...")
Expand All @@ -52,9 +54,16 @@ def background_worker(user_data, user_data_lock, ui_queue, stop_flag):

elif message["data_type"] == "message":
messages_data_handler(user_data, user_data_lock, user_data_copied, ui_queue, message)

else:
logger.error(
"Impossible condition, either you have discovered a bug in Coldwire, or the server is attempting to denial-of-service you. Skipping data message with unknown data type (%s)...",
message["data_type"]
)

# *Sigh* I had to put this here because if we rotate before finishing reading all of the messages
# we would literally overwrite our own key.
# TODO: We need to keep the last used key and use it when decapsulation with new key gives invalid output
# because it might actually take some time for our keys to be uploaded to server + other servers and to the contact.
#
update_ephemeral_keys(user_data, user_data_lock)

24 changes: 19 additions & 5 deletions logic/contacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
import json
import math

from core.constants import (
ML_KEM_1024_NAME,
CLASSIC_MCELIECE_8_F_NAME,
CLASSIC_MCELIECE_8_F_ROTATE_AT
)

def generate_nickname_id(length: int = 4) -> str:
# Calculate nickname ID: digits get >= letters
Expand Down Expand Up @@ -56,14 +61,23 @@ def save_contact(user_data: dict, user_data_lock, contact_id: str) -> None:
"smp_step": None,
},
"ephemeral_keys": {
"contact_public_key": None,
"contact_public_keys": {
CLASSIC_MCELIECE_8_F_NAME: None,
ML_KEM_1024_NAME: None
},
"our_keys": {
"public_key": None,
"private_key": None,
CLASSIC_MCELIECE_8_F_NAME: {
"public_key": None,
"private_key": None,
"rotation_counter": 0,
"rotate_at": CLASSIC_MCELIECE_8_F_ROTATE_AT,
},
ML_KEM_1024_NAME: {
"public_key": None,
"private_key": None,
},
"rotation_counter": None,
"rotate_at": None,

}
},
"our_pads": {
"hash_chain": None,
Expand Down
Loading