diff --git a/src/cryptography/hazmat/bindings/_rust/ocsp.pyi b/src/cryptography/hazmat/bindings/_rust/ocsp.pyi index 103e96c1f117..630eaa004bbc 100644 --- a/src/cryptography/hazmat/bindings/_rust/ocsp.pyi +++ b/src/cryptography/hazmat/bindings/_rust/ocsp.pyi @@ -103,6 +103,8 @@ class OCSPSingleResponse: def hash_algorithm(self) -> hashes.HashAlgorithm: ... @property def serial_number(self) -> int: ... + @property + def extensions(self) -> x509.Extensions: ... def load_der_ocsp_request(data: bytes) -> ocsp.OCSPRequest: ... def load_der_ocsp_response(data: bytes) -> ocsp.OCSPResponse: ... diff --git a/src/rust/src/pkcs7.rs b/src/rust/src/pkcs7.rs index 06814f777d50..a4c05621baad 100644 --- a/src/rust/src/pkcs7.rs +++ b/src/rust/src/pkcs7.rs @@ -768,6 +768,11 @@ where x509::certificate::Certificate { raw: raw_cert, cached_extensions: pyo3::sync::PyOnceLock::new(), + cached_issuer: pyo3::sync::PyOnceLock::new(), + cached_subject: pyo3::sync::PyOnceLock::new(), + cached_public_key: pyo3::sync::PyOnceLock::new(), + cached_signature_algorithm_oid: pyo3::sync::PyOnceLock::new(), + cached_signature_hash_algorithm: pyo3::sync::PyOnceLock::new(), }, )?)?; diff --git a/src/rust/src/x509/certificate.rs b/src/rust/src/x509/certificate.rs index d0bb651f053e..724a9a2b9f20 100644 --- a/src/rust/src/x509/certificate.rs +++ b/src/rust/src/x509/certificate.rs @@ -41,6 +41,11 @@ self_cell::self_cell!( pub(crate) struct Certificate { pub(crate) raw: OwnedCertificate, pub(crate) cached_extensions: pyo3::sync::PyOnceLock>, + pub(crate) cached_issuer: pyo3::sync::PyOnceLock>, + pub(crate) cached_subject: pyo3::sync::PyOnceLock>, + pub(crate) cached_public_key: pyo3::sync::PyOnceLock>, + pub(crate) cached_signature_algorithm_oid: pyo3::sync::PyOnceLock>, + pub(crate) cached_signature_hash_algorithm: pyo3::sync::PyOnceLock>, } #[pyo3::pymethods] @@ -78,10 +83,17 @@ impl Certificate { &self, py: pyo3::Python<'p>, ) -> CryptographyResult> { - keys::load_der_public_key_bytes( - py, - self.raw.borrow_dependent().tbs_cert.spki.tlv().full_data(), - ) + Ok(self + .cached_public_key + .get_or_try_init(py, || { + keys::load_der_public_key_bytes( + py, + self.raw.borrow_dependent().tbs_cert.spki.tlv().full_data(), + ) + .map(|v| v.unbind()) + })? + .bind(py) + .clone()) } #[getter] @@ -138,14 +150,28 @@ impl Certificate { #[getter] fn issuer<'p>(&self, py: pyo3::Python<'p>) -> pyo3::PyResult> { - Ok(x509::parse_name(py, self.raw.borrow_dependent().issuer()) - .map_err(|e| e.add_location(asn1::ParseLocation::Field("issuer")))?) + Ok(self + .cached_issuer + .get_or_try_init(py, || { + x509::parse_name(py, self.raw.borrow_dependent().issuer()) + .map_err(|e| e.add_location(asn1::ParseLocation::Field("issuer"))) + .map(|v| v.unbind()) + })? + .bind(py) + .clone()) } #[getter] fn subject<'p>(&self, py: pyo3::Python<'p>) -> pyo3::PyResult> { - Ok(x509::parse_name(py, self.raw.borrow_dependent().subject()) - .map_err(|e| e.add_location(asn1::ParseLocation::Field("subject")))?) + Ok(self + .cached_subject + .get_or_try_init(py, || { + x509::parse_name(py, self.raw.borrow_dependent().subject()) + .map_err(|e| e.add_location(asn1::ParseLocation::Field("subject"))) + .map(|v| v.unbind()) + })? + .bind(py) + .clone()) } #[getter] @@ -277,7 +303,17 @@ impl Certificate { &self, py: pyo3::Python<'p>, ) -> Result, CryptographyError> { - sign::identify_signature_hash_algorithm(py, &self.raw.borrow_dependent().signature_alg) + Ok(self + .cached_signature_hash_algorithm + .get_or_try_init(py, || { + sign::identify_signature_hash_algorithm( + py, + &self.raw.borrow_dependent().signature_alg, + ) + .map(|v| v.unbind()) + })? + .bind(py) + .clone()) } #[getter] @@ -285,7 +321,14 @@ impl Certificate { &self, py: pyo3::Python<'p>, ) -> pyo3::PyResult> { - oid_to_py_oid(py, self.raw.borrow_dependent().signature_alg.oid()) + Ok(self + .cached_signature_algorithm_oid + .get_or_try_init(py, || { + oid_to_py_oid(py, self.raw.borrow_dependent().signature_alg.oid()) + .map(|v| v.unbind()) + })? + .bind(py) + .clone()) } #[getter] @@ -441,6 +484,11 @@ pub(crate) fn load_der_x509_certificate( Ok(Certificate { raw, cached_extensions: pyo3::sync::PyOnceLock::new(), + cached_issuer: pyo3::sync::PyOnceLock::new(), + cached_subject: pyo3::sync::PyOnceLock::new(), + cached_public_key: pyo3::sync::PyOnceLock::new(), + cached_signature_algorithm_oid: pyo3::sync::PyOnceLock::new(), + cached_signature_hash_algorithm: pyo3::sync::PyOnceLock::new(), }) } diff --git a/src/rust/src/x509/crl.rs b/src/rust/src/x509/crl.rs index a67e0a891de4..668ddae897d3 100644 --- a/src/rust/src/x509/crl.rs +++ b/src/rust/src/x509/crl.rs @@ -2,6 +2,8 @@ // 2.0, and the BSD License. See the LICENSE file in the root of this repository // for complete details. +use std::collections::HashMap; + use cryptography_x509::certificate::SerialNumber; use cryptography_x509::common::{self, Asn1Read}; use cryptography_x509::crl::{ @@ -46,7 +48,11 @@ pub(crate) fn load_der_x509_crl( Ok(CertificateRevocationList { owned, revoked_certs: pyo3::sync::PyOnceLock::new(), + serial_number_map: pyo3::sync::PyOnceLock::new(), cached_extensions: pyo3::sync::PyOnceLock::new(), + cached_issuer: pyo3::sync::PyOnceLock::new(), + cached_signature_algorithm_oid: pyo3::sync::PyOnceLock::new(), + cached_signature_hash_algorithm: pyo3::sync::PyOnceLock::new(), }) } @@ -84,7 +90,11 @@ pub(crate) struct CertificateRevocationList { owned: OwnedCertificateRevocationList, revoked_certs: pyo3::sync::PyOnceLock>, + serial_number_map: pyo3::sync::PyOnceLock, OwnedRevokedCertificate>>, cached_extensions: pyo3::sync::PyOnceLock>, + cached_issuer: pyo3::sync::PyOnceLock>, + cached_signature_algorithm_oid: pyo3::sync::PyOnceLock>, + cached_signature_hash_algorithm: pyo3::sync::PyOnceLock>, } impl CertificateRevocationList { @@ -199,7 +209,14 @@ impl CertificateRevocationList { &self, py: pyo3::Python<'p>, ) -> pyo3::PyResult> { - oid_to_py_oid(py, self.owned.borrow_dependent().signature_algorithm.oid()) + Ok(self + .cached_signature_algorithm_oid + .get_or_try_init(py, || { + oid_to_py_oid(py, self.owned.borrow_dependent().signature_algorithm.oid()) + .map(|v| v.unbind()) + })? + .bind(py) + .clone()) } #[getter] @@ -207,10 +224,17 @@ impl CertificateRevocationList { &self, py: pyo3::Python<'p>, ) -> Result, CryptographyError> { - sign::identify_signature_hash_algorithm( - py, - &self.owned.borrow_dependent().signature_algorithm, - ) + Ok(self + .cached_signature_hash_algorithm + .get_or_try_init(py, || { + sign::identify_signature_hash_algorithm( + py, + &self.owned.borrow_dependent().signature_algorithm, + ) + .map(|v| v.unbind()) + })? + .bind(py) + .clone()) } #[getter] @@ -250,14 +274,21 @@ impl CertificateRevocationList { #[getter] fn issuer<'p>(&self, py: pyo3::Python<'p>) -> pyo3::PyResult> { - Ok(x509::parse_name( - py, - self.owned - .borrow_dependent() - .tbs_cert_list - .issuer - .unwrap_read(), - )?) + Ok(self + .cached_issuer + .get_or_try_init(py, || { + x509::parse_name( + py, + self.owned + .borrow_dependent() + .tbs_cert_list + .issuer + .unwrap_read(), + ) + .map(|v| v.unbind()) + })? + .bind(py) + .clone()) } #[getter] @@ -390,21 +421,43 @@ impl CertificateRevocationList { ) -> pyo3::PyResult> { let serial_bytes = py_uint_to_big_endian_bytes(py, serial)?; - // Use try_map_crl_to_revoked_cert to soundly extract the certificate - let owned = try_map_crl_to_revoked_cert(&self.owned, py, |crl| { - let certs = crl.tbs_cert_list.revoked_certificates.as_ref()?; - - // TODO: linear scan. Make a hash or bisect! - certs - .unwrap_read() - .clone() - .find(|cert| serial_bytes == cert.user_certificate.as_bytes()) + let map = self.serial_number_map.get_or_init(py, || { + let mut map = HashMap::new(); + let mut it_data = map_crl_to_iterator_data(&self.owned, py, |crl| { + crl.tbs_cert_list + .revoked_certificates + .as_ref() + .map(|v| v.unwrap_read().clone()) + }); + loop { + let revoked = try_map_arc_data_mut_crl_iterator(py, &mut it_data, |v| match v { + Some(v) => match v.next() { + Some(revoked) => Ok(revoked), + None => Err(()), + }, + None => Err(()), + }); + match revoked { + Ok(owned) => { + let key = owned + .borrow_dependent() + .user_certificate + .as_bytes() + .to_vec(); + map.insert(key, owned); + } + Err(()) => break, + } + } + map }); - Ok(owned.map(|o| RevokedCertificate { - owned: o, - cached_extensions: pyo3::sync::PyOnceLock::new(), - })) + Ok(map + .get(serial_bytes.as_ref()) + .map(|owned| RevokedCertificate { + owned: owned.clone_with_py(py), + cached_extensions: pyo3::sync::PyOnceLock::new(), + })) } fn is_signature_valid<'p>( @@ -473,33 +526,6 @@ where }) } -// Open-coded implementation of the API discussed in -// https://github.com/joshua-maros/ouroboros/issues/38 -fn try_map_crl_to_revoked_cert( - source: &OwnedCertificateRevocationList, - py: pyo3::Python<'_>, - f: F, -) -> Option -where - F: for<'a> FnOnce(&'a RawCertificateRevocationList<'a>) -> Option>, -{ - OwnedRevokedCertificate::try_new(source.borrow_owner().clone_ref(py), |_| { - // SAFETY: This is safe because cloning the PyBytes Py<> ensures the data is - // alive, but Rust doesn't understand the lifetime relationship it - // produces. - match f(unsafe { - std::mem::transmute::< - &RawCertificateRevocationList<'_>, - &RawCertificateRevocationList<'_>, - >(source.borrow_dependent()) - }) { - Some(cert) => Ok(cert), - None => Err(()), - } - }) - .ok() -} - // Open-coded implementation of the API discussed in // https://github.com/joshua-maros/ouroboros/issues/38 fn map_revoked_cert( diff --git a/src/rust/src/x509/csr.rs b/src/rust/src/x509/csr.rs index 8f7f17ff1a71..c6cf5fe04f91 100644 --- a/src/rust/src/x509/csr.rs +++ b/src/rust/src/x509/csr.rs @@ -29,6 +29,7 @@ self_cell::self_cell!( pub(crate) struct CertificateSigningRequest { raw: OwnedCsr, cached_extensions: pyo3::sync::PyOnceLock>, + cached_attributes: pyo3::sync::PyOnceLock>, } #[pyo3::pymethods] @@ -129,32 +130,40 @@ impl CertificateSigningRequest { #[getter] fn attributes<'p>(&self, py: pyo3::Python<'p>) -> pyo3::PyResult> { - let pyattrs = pyo3::types::PyList::empty(py); - for attribute in self - .raw - .borrow_dependent() - .csr_info - .attributes - .unwrap_read() - .clone() - { - check_attribute_length(attribute.values.unwrap_read().clone()).map_err(|_| { - pyo3::exceptions::PyValueError::new_err( - "Only single-valued attributes are supported", - ) - })?; - let oid = oid_to_py_oid(py, &attribute.type_id)?; - let val = attribute.values.unwrap_read().clone().next().unwrap(); - let serialized = pyo3::types::PyBytes::new(py, val.data()); - let tag = val.tag().as_u8().ok_or_else(|| { - CryptographyError::from(pyo3::exceptions::PyValueError::new_err( - "Long-form tags are not supported in CSR attribute values", - )) - })?; - let pyattr = types::ATTRIBUTE.get(py)?.call1((oid, serialized, tag))?; - pyattrs.append(pyattr)?; - } - types::ATTRIBUTES.get(py)?.call1((pyattrs,)) + Ok(self + .cached_attributes + .get_or_try_init(py, || -> pyo3::PyResult> { + let pyattrs = pyo3::types::PyList::empty(py); + for attribute in self + .raw + .borrow_dependent() + .csr_info + .attributes + .unwrap_read() + .clone() + { + check_attribute_length(attribute.values.unwrap_read().clone()).map_err( + |_| { + pyo3::exceptions::PyValueError::new_err( + "Only single-valued attributes are supported", + ) + }, + )?; + let oid = oid_to_py_oid(py, &attribute.type_id)?; + let val = attribute.values.unwrap_read().clone().next().unwrap(); + let serialized = pyo3::types::PyBytes::new(py, val.data()); + let tag = val.tag().as_u8().ok_or_else(|| { + CryptographyError::from(pyo3::exceptions::PyValueError::new_err( + "Long-form tags are not supported in CSR attribute values", + )) + })?; + let pyattr = types::ATTRIBUTE.get(py)?.call1((oid, serialized, tag))?; + pyattrs.append(pyattr)?; + } + Ok(types::ATTRIBUTES.get(py)?.call1((pyattrs,))?.unbind()) + })? + .bind(py) + .clone()) } #[getter] @@ -236,6 +245,7 @@ pub(crate) fn load_der_x509_csr( Ok(CertificateSigningRequest { raw, cached_extensions: pyo3::sync::PyOnceLock::new(), + cached_attributes: pyo3::sync::PyOnceLock::new(), }) } diff --git a/src/rust/src/x509/ocsp_req.rs b/src/rust/src/x509/ocsp_req.rs index c972c0cc98de..d4a410749ef1 100644 --- a/src/rust/src/x509/ocsp_req.rs +++ b/src/rust/src/x509/ocsp_req.rs @@ -44,6 +44,10 @@ pub(crate) fn load_der_ocsp_request( Ok(OCSPRequest { raw, cached_extensions: pyo3::sync::PyOnceLock::new(), + cached_issuer_name_hash: pyo3::sync::PyOnceLock::new(), + cached_issuer_key_hash: pyo3::sync::PyOnceLock::new(), + cached_hash_algorithm: pyo3::sync::PyOnceLock::new(), + cached_serial_number: pyo3::sync::PyOnceLock::new(), }) } @@ -52,6 +56,10 @@ pub(crate) struct OCSPRequest { raw: OwnedOCSPRequest, cached_extensions: pyo3::sync::PyOnceLock>, + cached_issuer_name_hash: pyo3::sync::PyOnceLock>, + cached_issuer_key_hash: pyo3::sync::PyOnceLock>, + cached_hash_algorithm: pyo3::sync::PyOnceLock>, + cached_serial_number: pyo3::sync::PyOnceLock>, } impl OCSPRequest { @@ -71,13 +79,39 @@ impl OCSPRequest { #[pyo3::pymethods] impl OCSPRequest { #[getter] - fn issuer_name_hash(&self) -> &[u8] { - self.cert_id().issuer_name_hash + fn issuer_name_hash<'p>( + &self, + py: pyo3::Python<'p>, + ) -> pyo3::PyResult> { + Ok(self + .cached_issuer_name_hash + .get_or_try_init(py, || -> pyo3::PyResult> { + Ok( + pyo3::types::PyBytes::new(py, self.cert_id().issuer_name_hash) + .into_any() + .unbind(), + ) + })? + .bind(py) + .clone()) } #[getter] - fn issuer_key_hash(&self) -> &[u8] { - self.cert_id().issuer_key_hash + fn issuer_key_hash<'p>( + &self, + py: pyo3::Python<'p>, + ) -> pyo3::PyResult> { + Ok(self + .cached_issuer_key_hash + .get_or_try_init(py, || -> pyo3::PyResult> { + Ok( + pyo3::types::PyBytes::new(py, self.cert_id().issuer_key_hash) + .into_any() + .unbind(), + ) + })? + .bind(py) + .clone()) } #[getter] @@ -85,17 +119,26 @@ impl OCSPRequest { &self, py: pyo3::Python<'p>, ) -> Result, CryptographyError> { - let cert_id = self.cert_id(); - - match ocsp::ALGORITHM_PARAMETERS_TO_HASH.get(&cert_id.hash_algorithm.params) { - Some(alg_name) => Ok(types::HASHES_MODULE.get(py)?.getattr(*alg_name)?.call0()?), - None => Err(CryptographyError::from( - exceptions::UnsupportedAlgorithm::new_err(format!( - "Signature algorithm OID: {} not recognized", - cert_id.hash_algorithm.oid() - )), - )), - } + Ok(self + .cached_hash_algorithm + .get_or_try_init(py, || { + let cert_id = self.cert_id(); + match ocsp::ALGORITHM_PARAMETERS_TO_HASH.get(&cert_id.hash_algorithm.params) { + Some(alg_name) => Ok(types::HASHES_MODULE + .get(py)? + .getattr(*alg_name)? + .call0()? + .unbind()), + None => Err(CryptographyError::from( + exceptions::UnsupportedAlgorithm::new_err(format!( + "Signature algorithm OID: {} not recognized", + cert_id.hash_algorithm.oid() + )), + )), + } + })? + .bind(py) + .clone()) } #[getter] @@ -103,8 +146,19 @@ impl OCSPRequest { &self, py: pyo3::Python<'p>, ) -> Result, CryptographyError> { - let bytes = self.cert_id().serial_number.as_bytes(); - Ok(big_byte_slice_to_py_int(py, bytes)?) + Ok(self + .cached_serial_number + .get_or_try_init( + py, + || -> Result, CryptographyError> { + Ok( + big_byte_slice_to_py_int(py, self.cert_id().serial_number.as_bytes())? + .unbind(), + ) + }, + )? + .bind(py) + .clone()) } #[getter] diff --git a/src/rust/src/x509/ocsp_resp.rs b/src/rust/src/x509/ocsp_resp.rs index cab6320fc922..5bf06ea65495 100644 --- a/src/rust/src/x509/ocsp_resp.rs +++ b/src/rust/src/x509/ocsp_resp.rs @@ -47,6 +47,7 @@ pub(crate) fn load_der_ocsp_response( raw: Arc::new(raw), cached_extensions: pyo3::sync::PyOnceLock::new(), cached_single_extensions: pyo3::sync::PyOnceLock::new(), + cached_certs: pyo3::sync::PyOnceLock::new(), }) } @@ -64,6 +65,7 @@ pub(crate) struct OCSPResponse { cached_extensions: pyo3::sync::PyOnceLock>, cached_single_extensions: pyo3::sync::PyOnceLock>, + cached_certs: pyo3::sync::PyOnceLock>, } impl OCSPResponse { @@ -239,36 +241,41 @@ impl OCSPResponse { &self, py: pyo3::Python<'p>, ) -> CryptographyResult> { - let resp = self.requires_successful_response()?; - let py_certs = pyo3::types::PyList::empty(py); - let certs = match &resp.certs { - Some(certs) => certs.unwrap_read(), - None => return Ok(py_certs), - }; - for i in 0..certs.len() { - // TODO: O(n^2), don't have too many certificates! - let raw_cert = map_arc_data_ocsp_response(py, &self.raw, |_data, resp| { - match &resp.response_bytes.as_ref().unwrap().response { - Response::Basic(b) => b, + self.requires_successful_response()?; + Ok(self + .cached_certs + .get_or_try_init(py, || -> CryptographyResult> { + let resp = self.requires_successful_response()?; + let py_certs = pyo3::types::PyList::empty(py); + let certs = match &resp.certs { + Some(certs) => certs.unwrap_read().clone(), + None => return Ok(py_certs.into_any().unbind()), + }; + for cert in certs { + let cert_der = asn1::write_single(&cert)?; + let raw_cert = certificate::OwnedCertificate::try_new( + pyo3::types::PyBytes::new(py, &cert_der).unbind(), + |data| asn1::parse_single(data.as_bytes(py)), + )?; + py_certs.append(pyo3::Bound::new( + py, + x509::certificate::Certificate { + raw: raw_cert, + cached_extensions: pyo3::sync::PyOnceLock::new(), + cached_issuer: pyo3::sync::PyOnceLock::new(), + cached_subject: pyo3::sync::PyOnceLock::new(), + cached_public_key: pyo3::sync::PyOnceLock::new(), + cached_signature_algorithm_oid: pyo3::sync::PyOnceLock::new(), + cached_signature_hash_algorithm: pyo3::sync::PyOnceLock::new(), + }, + )?)?; } - .get() - .certs - .as_ref() - .unwrap() - .unwrap_read() - .clone() - .nth(i) - .unwrap() - }); - py_certs.append(pyo3::Bound::new( - py, - x509::certificate::Certificate { - raw: raw_cert, - cached_extensions: pyo3::sync::PyOnceLock::new(), - }, - )?)?; - } - Ok(py_certs) + Ok(py_certs.into_any().unbind()) + })? + .bind(py) + .cast::() + .unwrap() + .clone()) } #[getter] @@ -482,30 +489,6 @@ impl OCSPResponse { } } -// Open-coded implementation of the API discussed in -// https://github.com/joshua-maros/ouroboros/issues/38 -fn map_arc_data_ocsp_response( - py: pyo3::Python<'_>, - it: &OwnedOCSPResponse, - f: impl for<'this> FnOnce( - &'this [u8], - &ocsp_resp::OCSPResponse<'this>, - ) -> cryptography_x509::certificate::Certificate<'this>, -) -> certificate::OwnedCertificate { - certificate::OwnedCertificate::new(it.borrow_owner().clone_ref(py), |inner_it| { - it.with_dependent(|_, value| { - // SAFETY: This is safe because `Arc::clone` ensures the data is - // alive, but Rust doesn't understand the lifetime relationship it - // produces. Open-coded implementation of the API discussed in - // https://github.com/joshua-maros/ouroboros/issues/38 - f(inner_it.as_bytes(py), unsafe { - std::mem::transmute::<&ocsp_resp::OCSPResponse<'_>, &ocsp_resp::OCSPResponse<'_>>( - value, - ) - }) - }) - }) -} fn try_map_arc_data_mut_ocsp_response_iterator( it: &mut OwnedOCSPResponseIteratorData, f: impl for<'this> FnOnce( @@ -905,7 +888,10 @@ impl OCSPResponseIterator { } }) .ok()?; - Some(OCSPSingleResponse { raw: single_resp }) + Some(OCSPSingleResponse { + raw: single_resp, + cached_extensions: pyo3::sync::PyOnceLock::new(), + }) } } @@ -920,6 +906,7 @@ self_cell::self_cell!( #[pyo3::pyclass(frozen, module = "cryptography.hazmat.bindings._rust.ocsp")] pub(crate) struct OCSPSingleResponse { raw: OwnedSingleResponse, + cached_extensions: pyo3::sync::PyOnceLock>, } impl OCSPSingleResponse { @@ -1039,4 +1026,25 @@ impl OCSPSingleResponse { let single_resp = self.single_response(); singleresp_py_next_update_utc(single_resp, py) } + + #[getter] + fn extensions(&self, py: pyo3::Python<'_>) -> pyo3::PyResult> { + x509::parse_and_cache_extensions( + py, + &self.cached_extensions, + &self.single_response().raw_single_extensions, + |ext| match &ext.extn_id { + &oid::SIGNED_CERTIFICATE_TIMESTAMPS_OID => { + let contents = ext.value::<&[u8]>()?; + let scts = sct::parse_scts(py, contents, sct::LogEntryType::Certificate)?; + Ok(Some( + types::SIGNED_CERTIFICATE_TIMESTAMPS + .get(py)? + .call1((scts,))?, + )) + } + _ => crl::parse_crl_entry_ext(py, ext), + }, + ) + } } diff --git a/tests/bench/test_x509.py b/tests/bench/test_x509.py index abfbbf92a199..9ed4ccf72948 100644 --- a/tests/bench/test_x509.py +++ b/tests/bench/test_x509.py @@ -9,6 +9,7 @@ import certifi from cryptography import x509 +from cryptography.x509 import ocsp from ..utils import load_vectors_from_file @@ -46,6 +47,128 @@ def test_load_pem_certificate(benchmark): benchmark(x509.load_pem_x509_certificate, cert_bytes) +# --------------------------------------------------------------------------- +# Repeated property access — these measure the cost of the cached fast path. +# Each benchmark constructs the object once, then calls the getter repeatedly. +# --------------------------------------------------------------------------- + + +def test_certificate_subject(benchmark): + cert_bytes = load_vectors_from_file( + os.path.join("x509", "cryptography.io.pem"), + loader=lambda f: f.read(), + mode="rb", + ) + cert = x509.load_pem_x509_certificate(cert_bytes) + benchmark(lambda: cert.subject) + + +def test_certificate_issuer(benchmark): + cert_bytes = load_vectors_from_file( + os.path.join("x509", "cryptography.io.pem"), + loader=lambda f: f.read(), + mode="rb", + ) + cert = x509.load_pem_x509_certificate(cert_bytes) + benchmark(lambda: cert.issuer) + + +def test_certificate_public_key(benchmark): + cert_bytes = load_vectors_from_file( + os.path.join("x509", "cryptography.io.pem"), + loader=lambda f: f.read(), + mode="rb", + ) + cert = x509.load_pem_x509_certificate(cert_bytes) + benchmark(lambda: cert.public_key()) + + +def test_certificate_signature_hash_algorithm(benchmark): + cert_bytes = load_vectors_from_file( + os.path.join("x509", "cryptography.io.pem"), + loader=lambda f: f.read(), + mode="rb", + ) + cert = x509.load_pem_x509_certificate(cert_bytes) + benchmark(lambda: cert.signature_hash_algorithm) + + +def test_certificate_signature_algorithm_oid(benchmark): + cert_bytes = load_vectors_from_file( + os.path.join("x509", "cryptography.io.pem"), + loader=lambda f: f.read(), + mode="rb", + ) + cert = x509.load_pem_x509_certificate(cert_bytes) + benchmark(lambda: cert.signature_algorithm_oid) + + +def test_crl_issuer(benchmark): + crl_bytes = load_vectors_from_file( + os.path.join("x509", "PKITS_data", "crls", "indirectCRLCA5CRL.crl"), + loader=lambda f: f.read(), + mode="rb", + ) + crl = x509.load_der_x509_crl(crl_bytes) + benchmark(lambda: crl.issuer) + + +def test_crl_serial_number_lookup_hit(benchmark): + """Repeated lookup for a serial number present in the CRL.""" + crl_bytes = load_vectors_from_file( + os.path.join("x509", "PKITS_data", "crls", "indirectCRLCA5CRL.crl"), + loader=lambda f: f.read(), + mode="rb", + ) + crl = x509.load_der_x509_crl(crl_bytes) + # Serial 1 is always present in this CRL. + benchmark(lambda: crl.get_revoked_certificate_by_serial_number(1)) + + +def test_crl_serial_number_lookup_miss(benchmark): + """Repeated lookup for a serial number absent from the CRL.""" + crl_bytes = load_vectors_from_file( + os.path.join("x509", "PKITS_data", "crls", "indirectCRLCA5CRL.crl"), + loader=lambda f: f.read(), + mode="rb", + ) + crl = x509.load_der_x509_crl(crl_bytes) + benchmark(lambda: crl.get_revoked_certificate_by_serial_number(99999)) + + +def test_ocsp_request_properties(benchmark): + req_bytes = load_vectors_from_file( + os.path.join("x509", "ocsp", "req-sha1.der"), + loader=lambda f: f.read(), + mode="rb", + ) + req = ocsp.load_der_ocsp_request(req_bytes) + + def bench(): + req.issuer_name_hash + req.issuer_key_hash + req.hash_algorithm + req.serial_number + + benchmark(bench) + + +def test_ocsp_response_properties(benchmark): + resp_bytes = load_vectors_from_file( + os.path.join("x509", "ocsp", "resp-sha256.der"), + loader=lambda f: f.read(), + mode="rb", + ) + resp = ocsp.load_der_ocsp_response(resp_bytes) + + def bench(): + resp.issuer_key_hash + resp.serial_number + resp.signature_hash_algorithm + + benchmark(bench) + + def test_verify_docs_python_org(benchmark, pytestconfig): limbo_root = pytestconfig.getoption("--x509-limbo-root", skip=True) with open(os.path.join(limbo_root, "limbo.json"), "rb") as f: diff --git a/tests/x509/test_ocsp.py b/tests/x509/test_ocsp.py index 9de9bfd81bd8..bc5ebe8a2e37 100644 --- a/tests/x509/test_ocsp.py +++ b/tests/x509/test_ocsp.py @@ -1609,6 +1609,59 @@ def test_single_extensions(self, backend): assert ext.oid == x509.CRLReason.oid assert ext.value == x509.CRLReason(x509.ReasonFlags.unspecified) + def test_single_response_extensions_empty(self): + # A typical single-response has no per-response extensions. + resp = _load_data( + os.path.join("x509", "ocsp", "resp-sha256.der"), + ocsp.load_der_ocsp_response, + ) + single = next(resp.responses) + assert isinstance(single, ocsp.OCSPSingleResponse) + assert len(single.extensions) == 0 + # Idempotent: second access returns the same cached object. + assert single.extensions is single.extensions + + def test_single_response_extensions_sct(self, backend): + # resp-sct-extension.der carries an SCT list in the per-response + # extensions (raw_single_extensions). Verify the new + # OCSPSingleResponse.extensions getter exposes the same data that + # OCSPResponse.single_extensions already reported for this file. + resp = _load_data( + os.path.join("x509", "ocsp", "resp-sct-extension.der"), + ocsp.load_der_ocsp_response, + ) + single = next(resp.responses) + assert len(single.extensions) == 1 + ext = single.extensions[0] + assert ext.oid == x509.ObjectIdentifier("1.3.6.1.4.1.11129.2.4.5") + assert len(ext.value) == 4 + + def test_single_response_extensions_reason(self, backend): + # resp-single-extension-reason.der carries a CRLReason in the + # per-response extensions. + resp = _load_data( + os.path.join("x509", "ocsp", "resp-single-extension-reason.der"), + ocsp.load_der_ocsp_response, + ) + single = next(resp.responses) + assert len(single.extensions) == 1 + ext = single.extensions[0] + assert ext.oid == x509.CRLReason.oid + assert ext.value == x509.CRLReason(x509.ReasonFlags.unspecified) + + def test_certificates_cached(self): + # OCSPResponse.certificates is cached: repeated access must return + # the identical list object, not rebuild it each time. + resp = _load_data( + os.path.join("x509", "ocsp", "resp-delegate-unknown-cert.der"), + ocsp.load_der_ocsp_response, + ) + first = resp.certificates + second = resp.certificates + assert first is second + assert len(first) == 1 + assert isinstance(first[0], x509.Certificate) + def test_unknown_response_type(self): with pytest.raises(ValueError): _load_data( diff --git a/tests/x509/test_x509.py b/tests/x509/test_x509.py index 3be4a3162171..5d69e5358eae 100644 --- a/tests/x509/test_x509.py +++ b/tests/x509/test_x509.py @@ -1028,6 +1028,10 @@ def test_country_jurisdiction_country_too_long(self, backend): os.path.join("x509", "custom", "bad_country.pem"), x509.load_pem_x509_certificate, ) + # Both warnings are emitted during the first parse_name call (when + # cert.subject is first accessed); subsequent accesses return the + # cached Name object without re-parsing, so both checks must live + # inside a single pytest.warns block. with pytest.warns(UserWarning): assert ( cert.subject.get_attributes_for_oid(x509.NameOID.COUNTRY_NAME)[ @@ -1035,8 +1039,6 @@ def test_country_jurisdiction_country_too_long(self, backend): ].value == "too long" ) - - with pytest.warns(UserWarning): assert ( cert.subject.get_attributes_for_oid( x509.NameOID.JURISDICTION_COUNTRY_NAME