Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/cryptography/hazmat/asn1/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# for complete details.

from cryptography.hazmat.asn1.asn1 import (
TLV,
BitString,
Default,
Explicit,
Expand All @@ -12,14 +13,15 @@
Null,
PrintableString,
Size,
UtcTime,
UTCTime,
Variant,
decode_der,
encode_der,
sequence,
)

__all__ = [
"TLV",
"BitString",
"Default",
"Explicit",
Expand All @@ -29,7 +31,7 @@
"Null",
"PrintableString",
"Size",
"UtcTime",
"UTCTime",
"Variant",
"decode_der",
"encode_der",
Expand Down
27 changes: 23 additions & 4 deletions src/cryptography/hazmat/asn1/asn1.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,24 @@ def _normalize_field_type(
)
):
raise TypeError(
f"field {field_name} has a SIZE annotation, but SIZE annotations "
f"are only supported for fields of types: [SEQUENCE OF, "
"BIT STRING, OCTET STRING, UTF8String, PrintableString, IA5String]"
f"field '{field_name}' has a SIZE annotation, but SIZE "
"annotations are only supported for fields of types: "
"[SEQUENCE OF, BIT STRING, OCTET STRING, UTF8String, "
"PrintableString, IA5String]"
)

if field_type is TLV:
if isinstance(annotation.encoding, Implicit):
raise TypeError(
f"field '{field_name}' has an IMPLICIT annotation, but "
"IMPLICIT annotations are not supported for TLV types."
)
elif annotation.default is not None:
raise TypeError(
f"field '{field_name}' has a DEFAULT annotation, but "
"DEFAULT annotations are not supported for TLV types."
)

if hasattr(field_type, "__asn1_root__"):
annotated_root = field_type.__asn1_root__
if not isinstance(annotated_root, declarative_asn1.AnnotatedType):
Expand All @@ -161,6 +174,11 @@ def _normalize_field_type(
optional_type = (
union_args[0] if union_args[1] is type(None) else union_args[1]
)
if optional_type is TLV:
raise TypeError(
"optional TLV types (`TLV | None`) are not "
"currently supported"
)
annotated_type = _normalize_field_type(optional_type, field_name)

if not annotated_type.annotation.is_empty():
Expand Down Expand Up @@ -362,7 +380,8 @@ class Default(typing.Generic[U]):

PrintableString = declarative_asn1.PrintableString
IA5String = declarative_asn1.IA5String
UtcTime = declarative_asn1.UtcTime
UTCTime = declarative_asn1.UtcTime
GeneralizedTime = declarative_asn1.GeneralizedTime
BitString = declarative_asn1.BitString
TLV = declarative_asn1.Tlv
Null = declarative_asn1.Null
7 changes: 7 additions & 0 deletions src/cryptography/hazmat/bindings/_rust/declarative_asn1.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ class BitString:
def as_bytes(self) -> bytes: ...
def padding_bits(self) -> int: ...

class Tlv:
@property
def tag(self) -> int: ...
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should think for a second about if int makes sense here, or if we should have a Tag type.

What value does an integer tag have for a >1-byte tag with (say) a constructed flag set?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixing this is blocked on alex/rust-asn1#596

@property
def data(self) -> memoryview: ...
def parse(self, cls: type): ...

class Null:
def __new__(cls) -> Null: ...
def __repr__(self) -> str: ...
Expand Down
52 changes: 47 additions & 5 deletions src/rust/src/declarative_asn1/decode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use pyo3::types::{PyAnyMethods, PyListMethods};
use crate::asn1::big_byte_slice_to_py_int;
use crate::declarative_asn1::types::{
check_size_constraint, is_tag_valid_for_type, is_tag_valid_for_variant, AnnotatedType,
Annotation, BitString, Encoding, GeneralizedTime, IA5String, Null, PrintableString, Type,
Annotation, BitString, Encoding, GeneralizedTime, IA5String, Null, PrintableString, Tlv, Type,
UtcTime, Variant,
};
use crate::error::CryptographyError;
Expand Down Expand Up @@ -128,7 +128,7 @@ fn decode_generalized_time<'a>(
Some(_) => {
return Err(CryptographyError::Py(
pyo3::exceptions::PyValueError::new_err(
"decoded GeneralizedTime data has higher precision than supported".to_string(),
"decoded GeneralizedTime data has higher precision than supported",
),
))
}
Expand Down Expand Up @@ -161,6 +161,33 @@ fn decode_bitstring<'a>(
)?)
}

fn decode_tlv<'a>(
py: pyo3::Python<'a>,
parser: &mut Parser<'a>,
encoding: &Option<pyo3::Py<Encoding>>,
) -> ParseResult<pyo3::Bound<'a, Tlv>> {
let tlv = match encoding {
Some(e) => match e.get() {
Encoding::Implicit(_) => Err(CryptographyError::Py(
// We don't support IMPLICIT TLV
pyo3::exceptions::PyValueError::new_err(
"invalid type definition: TLV/ANY fields cannot be implicitly encoded",
),
))?,
Encoding::Explicit(n) => parser.read_explicit_element::<asn1::Tlv<'_>>(*n),
},
None => parser.read_element::<asn1::Tlv<'_>>(),
}?;
Ok(pyo3::Bound::new(
py,
Tlv {
tag: tlv.tag().value(),
data_index: tlv.full_data().len() - tlv.data().len(),
full_data: pyo3::types::PyBytes::new(py, tlv.full_data()).unbind(),
},
)?)
}

fn decode_null<'a>(
py: pyo3::Python<'a>,
parser: &mut Parser<'a>,
Expand All @@ -182,7 +209,7 @@ fn decode_choice_with_encoding<'a>(
Encoding::Implicit(_) => Err(CryptographyError::Py(
// CHOICEs cannot be IMPLICIT. See X.680 section 31.2.9.
pyo3::exceptions::PyValueError::new_err(
"invalid type definition: CHOICE fields cannot be implicitly encoded".to_string(),
"invalid type definition: CHOICE fields cannot be implicitly encoded",
),
))?,
Encoding::Explicit(n) => {
Expand Down Expand Up @@ -296,7 +323,7 @@ pub(crate) fn decode_annotated_type<'a>(
}
Err(CryptographyError::Py(
pyo3::exceptions::PyValueError::new_err(
"could not find matching variant when parsing CHOICE field".to_string(),
"could not find matching variant when parsing CHOICE field",
),
))?
}
Expand All @@ -311,13 +338,14 @@ pub(crate) fn decode_annotated_type<'a>(
Type::UtcTime() => decode_utc_time(py, parser, encoding)?.into_any(),
Type::GeneralizedTime() => decode_generalized_time(py, parser, encoding)?.into_any(),
Type::BitString() => decode_bitstring(py, parser, annotation)?.into_any(),
Type::Tlv() => decode_tlv(py, parser, encoding)?.into_any(),
Type::Null() => decode_null(py, parser, encoding)?.into_any(),
};

match &ann_type.annotation.get().default {
Some(default) if decoded.eq(default.bind(py))? => Err(CryptographyError::Py(
pyo3::exceptions::PyValueError::new_err(
"invalid DER: DEFAULT value was explicitly encoded".to_string(),
"invalid DER: DEFAULT value was explicitly encoded",
),
)),
_ => Ok(decoded),
Expand Down Expand Up @@ -352,4 +380,18 @@ mod tests {
.contains("invalid type definition: CHOICE fields cannot be implicitly encoded"));
});
}
#[test]
fn test_decode_implicit_tlv() {
pyo3::Python::initialize();
pyo3::Python::attach(|py| {
let result = asn1::parse(&[], |parser| {
let encoding = pyo3::Py::new(py, Encoding::Implicit(0)).ok();
super::decode_tlv(py, parser, &encoding)
});
assert!(result.is_err());
let error = result.unwrap_err();
assert!(format!("{error}")
.contains("invalid type definition: TLV/ANY fields cannot be implicitly encoded"));
});
}
}
1 change: 1 addition & 0 deletions src/rust/src/declarative_asn1/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ impl asn1::Asn1Writable for AnnotatedTypeObject<'_> {
.map_err(|_| asn1::WriteError::AllocationError)?;
write_value(writer, &bitstring, encoding)
}
Type::Tlv() => Err(asn1::WriteError::AllocationError),
Type::Null() => write_value(writer, &(), encoding),
}
}
Expand Down
90 changes: 85 additions & 5 deletions src/rust/src/declarative_asn1/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use asn1::{
IA5String as Asn1IA5String, PrintableString as Asn1PrintableString, SimpleAsn1Readable,
UtcTime as Asn1UtcTime,
};
use pyo3::types::{PyAnyMethods, PyTzInfoAccess};
use pyo3::types::{PyAnyMethods, PySequenceMethods, PyTzInfoAccess};
use pyo3::{IntoPyObject, PyTypeInfo};

use crate::error::CryptographyError;
Expand Down Expand Up @@ -51,6 +51,8 @@ pub enum Type {
GeneralizedTime(),
/// BIT STRING (`bytes`)
BitString(),
/// ANY (parsed as a TLV)
Tlv(),
/// NULL
Null(),
}
Expand Down Expand Up @@ -167,6 +169,39 @@ impl Variant {
}
}

#[pyo3::pyclass(frozen, module = "cryptography.hazmat.bindings._rust.asn1")]
pub struct Tlv {
#[pyo3(get)]
pub tag: u32,

// We store the bytes of the entire TLV, and to access the Value part
// we store the index where it starts.
pub data_index: usize,
pub full_data: pyo3::Py<pyo3::types::PyBytes>,
}

#[pyo3::pymethods]
impl Tlv {
pub fn parse<'p>(
&'p self,
py: pyo3::Python<'p>,
class: &pyo3::Bound<'p, pyo3::types::PyType>,
) -> pyo3::PyResult<pyo3::Bound<'p, pyo3::PyAny>> {
crate::declarative_asn1::asn1::decode_der(py, class, self.full_data.as_bytes(py))
}

#[getter]
pub fn data<'p>(
&self,
py: pyo3::Python<'p>,
) -> pyo3::PyResult<pyo3::Bound<'p, pyo3::types::PyMemoryView>> {
let mem_view = pyo3::types::PyMemoryView::from(self.full_data.bind(py))?;
let seq = mem_view.cast::<pyo3::types::PySequence>().unwrap();
let slice = seq.get_slice(self.data_index, seq.len()?)?;
pyo3::types::PyMemoryView::from(&slice)
}
Comment on lines +193 to +202
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems unfortunate that every time you access data, you get an allocation and copy. Should we be returning a memory view or something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way of doing that if we're slicing the bytes before we return them? The pyo3 API for memory view only supports initializing from an existing Python object: https://docs.rs/pyo3/latest/pyo3/types/struct.PyMemoryView.html#method.from

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just pass the bytes object and then slice the memoryview?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ofc, fixed!

}

// TODO: Once the minimum Python version is >= 3.10, use a `self_cell`
// to store the owned PyString along with the dependent Asn1PrintableString
// in order to avoid verifying the string twice (once during construction,
Expand Down Expand Up @@ -274,19 +309,19 @@ impl UtcTime {
fn new(py: pyo3::Python<'_>, inner: pyo3::Py<pyo3::types::PyDateTime>) -> pyo3::PyResult<Self> {
if inner.bind(py).get_tzinfo().is_none() {
return Err(pyo3::exceptions::PyValueError::new_err(
"invalid UtcTime: cannot initialize with naive datetime object",
"invalid UTCTime: cannot initialize with naive datetime object",
));
}
let (datetime, microseconds) =
crate::x509::py_to_datetime_with_microseconds(py, inner.bind(py).clone())?;

if microseconds.is_some() {
return Err(pyo3::exceptions::PyValueError::new_err(
"invalid UtcTime: fractional seconds are not supported",
"invalid UTCTime: fractional seconds are not supported",
));
}
Asn1UtcTime::new(datetime).map_err(|e| {
pyo3::exceptions::PyValueError::new_err(format!("invalid UtcTime: {e}"))
pyo3::exceptions::PyValueError::new_err(format!("invalid UTCTime: {e}"))
})?;
Ok(UtcTime { inner })
}
Expand All @@ -308,7 +343,7 @@ impl UtcTime {
) -> pyo3::PyResult<pyo3::Bound<'py, pyo3::types::PyString>> {
pyo3::types::PyString::from_fmt(
py,
format_args!("UtcTime({})", self.inner.bind(py).repr()?),
format_args!("UTCTime({})", self.inner.bind(py).repr()?),
)
}
}
Expand Down Expand Up @@ -452,6 +487,8 @@ pub fn non_root_python_to_rust<'p>(
Type::GeneralizedTime().into_pyobject(py)
} else if class.is(BitString::type_object(py)) {
Type::BitString().into_pyobject(py)
} else if class.is(Tlv::type_object(py)) {
Type::Tlv().into_pyobject(py)
} else if class.is(Null::type_object(py)) {
Type::Null().into_pyobject(py)
} else {
Expand Down Expand Up @@ -572,6 +609,18 @@ pub(crate) fn is_tag_valid_for_type(
check_tag_with_encoding(asn1::GeneralizedTime::TAG, encoding, tag)
}
Type::BitString() => check_tag_with_encoding(asn1::BitString::TAG, encoding, tag),
Type::Tlv() => {
match encoding {
Some(e) => match e.get() {
// TLVs with implicit annotations are not supported
// (they are caught first at the Python level)
Encoding::Implicit(_) => false,
Encoding::Explicit(n) => tag == asn1::explicit_tag(*n),
},
// When reading TLVs we accept any tag
None => true,
}
}
Type::Null() => check_tag_with_encoding(asn1::Null::TAG, encoding, tag),
}
}
Expand Down Expand Up @@ -689,4 +738,35 @@ mod tests {
));
})
}
#[test]
// Needed for coverage of `is_tag_valid_for_type(Type::Tlv())`, since
// `is_tag_valid_for_type` is never called with a TLV type.
fn test_tlv_is_tag_valid_for_type() {
pyo3::Python::initialize();

pyo3::Python::attach(|py| {
assert!(is_tag_valid_for_type(
py,
asn1::BigInt::TAG,
&Type::Tlv(),
&None
));

let implicit_encoding = pyo3::Py::new(py, Encoding::Implicit(3)).ok();
assert!(!is_tag_valid_for_type(
py,
asn1::BigInt::TAG,
&Type::Tlv(),
&implicit_encoding
));

let explicit_encoding = pyo3::Py::new(py, Encoding::Explicit(3)).ok();
assert!(is_tag_valid_for_type(
py,
asn1::explicit_tag(3),
&Type::Tlv(),
&explicit_encoding
));
})
}
}
2 changes: 1 addition & 1 deletion src/rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ mod _rust {
#[pymodule_export]
use crate::declarative_asn1::types::{
non_root_python_to_rust, AnnotatedType, Annotation, BitString, Encoding,
GeneralizedTime, IA5String, Null, PrintableString, Size, Type, UtcTime, Variant,
GeneralizedTime, IA5String, Null, PrintableString, Size, Tlv, Type, UtcTime, Variant,
};
}

Expand Down
Loading