From 7c2e461247c520157dbf2297a0717baa3d6acbfe Mon Sep 17 00:00:00 2001 From: David Mihalcik Date: Fri, 24 Jan 2025 15:27:46 -0500 Subject: [PATCH 1/2] chore(xtest): pyright suggestions --- xtest/abac.py | 8 +++ xtest/assertions.py | 2 +- xtest/conftest.py | 47 ++++++++++------- xtest/nano.py | 10 ++-- xtest/test_abac.py | 119 ++++++++++++++++++++++--------------------- xtest/test_legacy.py | 6 +-- xtest/test_tdfs.py | 4 +- 7 files changed, 107 insertions(+), 89 deletions(-) diff --git a/xtest/abac.py b/xtest/abac.py index f152a699..cb4edddf 100644 --- a/xtest/abac.py +++ b/xtest/abac.py @@ -58,6 +58,14 @@ class Attribute(BaseModel): active: BoolValue | None = None metadata: Metadata | None = None + @property + def value_fqns(self) -> list[str]: + if not self.values: + return [] + v = [v.fqn for v in self.values if v.fqn] + assert len(v) == len(self.values) + return v + class SubjectMappingOperatorEnum(enum.IntEnum): IN = 1 diff --git a/xtest/assertions.py b/xtest/assertions.py index 00faca5d..ee0a6e16 100644 --- a/xtest/assertions.py +++ b/xtest/assertions.py @@ -11,7 +11,7 @@ class Statement(BaseModel): format: str schema: str - value: str | dict + value: str | dict[str, str] class Binding(BaseModel): diff --git a/xtest/conftest.py b/xtest/conftest.py index 3396566d..6bdcda88 100644 --- a/xtest/conftest.py +++ b/xtest/conftest.py @@ -15,7 +15,7 @@ import abac -def pytest_addoption(parser): +def pytest_addoption(parser: pytest.Parser): parser.addoption( "--large", action="store_true", @@ -29,39 +29,48 @@ def pytest_addoption(parser): parser.addoption("--containers", help="which container formats to test") -def pytest_generate_tests(metafunc): +def pytest_generate_tests(metafunc: pytest.Metafunc): if "size" in metafunc.fixturenames: metafunc.parametrize( "size", ["large" if metafunc.config.getoption("large") else "small"], scope="session", ) + + def list_opt(name: str) -> list[str]: + v = metafunc.config.getoption(name) + if not v: + return [] + if type(v) is not str: + raise ValueError(f"Invalid value for {name}: {v}") + return v.split() + if "encrypt_sdk" in metafunc.fixturenames: if metafunc.config.getoption("--sdks-encrypt"): - encrypt_sdks = metafunc.config.getoption("--sdks-encrypt").split() + encrypt_sdks = list_opt("--sdks-encrypt") elif metafunc.config.getoption("--sdks"): - encrypt_sdks = metafunc.config.getoption("--sdks").split() + encrypt_sdks = list_opt("--sdks") else: encrypt_sdks = ["js", "go", "java"] metafunc.parametrize("encrypt_sdk", encrypt_sdks) if "decrypt_sdk" in metafunc.fixturenames: if metafunc.config.getoption("--sdks-decrypt"): - decrypt_sdks = metafunc.config.getoption("--sdks-decrypt").split() + decrypt_sdks = list_opt("--sdks-decrypt") elif metafunc.config.getoption("--sdks"): - decrypt_sdks = metafunc.config.getoption("--sdks").split() + decrypt_sdks = list_opt("--sdks") else: decrypt_sdks = ["js", "go", "java"] metafunc.parametrize("decrypt_sdk", decrypt_sdks) if "container" in metafunc.fixturenames: if metafunc.config.getoption("--containers"): - containers = metafunc.config.getoption("--containers").split() + containers = list_opt("--containers") else: containers = ["nano", "ztdf", "nano-with-ecdsa"] metafunc.parametrize("container", containers) @pytest.fixture(scope="module") -def pt_file(tmp_dir, size): +def pt_file(tmp_dir: str, size: str): pt_file = f"{tmp_dir}test-plain-{size}.txt" length = (5 * 2**30) if size == "large" else 128 with open(pt_file, "w") as f: @@ -305,7 +314,7 @@ def one_attribute_attr_kas_grant( otdfctl: abac.OpentdfCommandLineTool, kas_url_attr: str, temporary_namespace: abac.Namespace, -): +) -> abac.Attribute: anyof = otdfctl.attribute_create( temporary_namespace, "attrgrant", abac.AttributeRule.ANY_OF, ["alpha"] ) @@ -350,7 +359,7 @@ def attr_and_value_kas_grants_or( kas_url_attr: str, kas_url_value1: str, temporary_namespace: abac.Namespace, -): +) -> abac.Attribute: anyof = otdfctl.attribute_create( temporary_namespace, "attrorvalgrant", @@ -404,7 +413,7 @@ def attr_and_value_kas_grants_and( kas_url_attr: str, kas_url_value1: str, temporary_namespace: abac.Namespace, -): +) -> abac.Attribute: allof = otdfctl.attribute_create( temporary_namespace, "attrandvalgrant", @@ -459,7 +468,7 @@ def one_attribute_ns_kas_grant( otdfctl: abac.OpentdfCommandLineTool, kas_url_ns: str, temporary_namespace: abac.Namespace, -): +) -> abac.Attribute: anyof = otdfctl.attribute_create( temporary_namespace, "nsgrant", abac.AttributeRule.ANY_OF, ["alpha"] ) @@ -503,7 +512,7 @@ def ns_and_value_kas_grants_or( otdfctl: abac.OpentdfCommandLineTool, kas_url_value1: str, kas_url_ns: str, -): +) -> abac.Attribute: temp_namespace = create_temp_namesapce(otdfctl) anyof = otdfctl.attribute_create( temp_namespace, @@ -557,7 +566,7 @@ def ns_and_value_kas_grants_and( otdfctl: abac.OpentdfCommandLineTool, kas_url_value1: str, kas_url_ns: str, -): +) -> abac.Attribute: temp_namespace = create_temp_namesapce(otdfctl) allof = otdfctl.attribute_create( temp_namespace, @@ -609,12 +618,12 @@ def ns_and_value_kas_grants_and( @pytest.fixture(scope="module") -def hs256_key(): +def hs256_key() -> str: return base64.b64encode(secrets.token_bytes(32)).decode("ascii") @pytest.fixture(scope="module") -def rs256_keys(): +def rs256_keys() -> tuple[str, str]: # Generate an RSA private key private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) @@ -670,7 +679,7 @@ def assertion_file_no_keys(): @pytest.fixture(scope="module") -def assertion_file_rs_and_hs_keys(hs256_key, rs256_keys): +def assertion_file_rs_and_hs_keys(hs256_key: str, rs256_keys: tuple[str, str]): rs256_private, _ = rs256_keys assertion_list = [ assertions.Assertion( @@ -709,11 +718,11 @@ def assertion_file_rs_and_hs_keys(hs256_key, rs256_keys): def write_assertion_verification_keys_to_file( file_name: str, - assertion_verificaiton_keys: assertions.AssertionVerificationKeys = None, + assertion_verification_keys: assertions.AssertionVerificationKeys, ): as_file = f"{tmp_dir}test-assertion-verification-{file_name}.json" assertion_verification_json = json.dumps( - to_jsonable_python(assertion_verificaiton_keys, exclude_none=True) + to_jsonable_python(assertion_verification_keys, exclude_none=True) ) with open(as_file, "w") as f: f.write(assertion_verification_json) diff --git a/xtest/nano.py b/xtest/nano.py index ad270f18..8521b41e 100644 --- a/xtest/nano.py +++ b/xtest/nano.py @@ -53,7 +53,7 @@ class EccMode(ct.EnumBase): secp256k1 = 3 @property - def bit_length(self) -> int: + def mode_bit_length(self) -> int: if self in (EccMode.secp256r1, EccMode.secp256k1): return 256 if self == EccMode.secp384r1: @@ -64,7 +64,7 @@ def bit_length(self) -> int: @property def byte_length(self) -> int: - return (self.bit_length + 7) >> 3 + return (self.mode_bit_length + 7) >> 3 @property def signature_length(self) -> int: @@ -74,7 +74,7 @@ def signature_length(self) -> int: def public_key_length(self) -> int: return self.byte_length + 1 - def __getitem__(self, key): + def __getitem__(self, key: str): return getattr(self, key) @@ -103,7 +103,7 @@ def mac_length(self) -> int: return 16 raise ValueError("invalid EC cipher mode") - def __getitem__(self, key): + def __getitem__(self, key: str): return getattr(self, key) @@ -142,7 +142,7 @@ def size(self) -> int: return 32 return 0 - def __getitem__(self, key): + def __getitem__(self, key: str): return getattr(self, key) diff --git a/xtest/test_abac.py b/xtest/test_abac.py index 019de338..65385cde 100644 --- a/xtest/test_abac.py +++ b/xtest/test_abac.py @@ -2,17 +2,18 @@ import pytest import tdfs +from xtest.abac import Attribute -cipherTexts = {} +cipherTexts: dict[str, str] = {} def test_autoconfigure_one_attribute( - attribute_single_kas_grant, + attribute_single_kas_grant: Attribute, encrypt_sdk: tdfs.sdk_type, decrypt_sdk: tdfs.sdk_type, - tmp_dir, - pt_file, + tmp_dir: str, + pt_file: str, kas_url_value1: str, ): global counter @@ -33,7 +34,7 @@ def test_autoconfigure_one_attribute( ct_file, mime_type="text/plain", fmt="ztdf", - attr_values=[attribute_single_kas_grant.values[0].fqn], + attr_values=attribute_single_kas_grant.value_fqns, ) cipherTexts[sample_name] = ct_file manifest = tdfs.manifest(ct_file) @@ -46,11 +47,11 @@ def test_autoconfigure_one_attribute( def test_autoconfigure_two_kas_or( - attribute_two_kas_grant_or, - encrypt_sdk, - decrypt_sdk, - tmp_dir, - pt_file, + attribute_two_kas_grant_or: Attribute, + encrypt_sdk: tdfs.sdk_type, + decrypt_sdk: tdfs.sdk_type, + tmp_dir: str, + pt_file: str, kas_url_value1: str, kas_url_value2: str, ): @@ -69,8 +70,8 @@ def test_autoconfigure_two_kas_or( mime_type="text/plain", fmt="ztdf", attr_values=[ - attribute_two_kas_grant_or.values[0].fqn, - attribute_two_kas_grant_or.values[1].fqn, + attribute_two_kas_grant_or.value_fqns[0], + attribute_two_kas_grant_or.value_fqns[1], ], ) cipherTexts[sample_name] = ct_file @@ -105,11 +106,11 @@ def skip_hexless_skew(encrypt_sdk: tdfs.sdk_type, decrypt_sdk: tdfs.sdk_type): def test_autoconfigure_double_kas_and( - attribute_two_kas_grant_and, - encrypt_sdk, - decrypt_sdk, - tmp_dir, - pt_file, + attribute_two_kas_grant_and: Attribute, + encrypt_sdk: tdfs.sdk_type, + decrypt_sdk: tdfs.sdk_type, + tmp_dir: str, + pt_file: str, kas_url_value1: str, kas_url_value2: str, ): @@ -128,8 +129,8 @@ def test_autoconfigure_double_kas_and( mime_type="text/plain", fmt="ztdf", attr_values=[ - attribute_two_kas_grant_and.values[0].fqn, - attribute_two_kas_grant_and.values[1].fqn, + attribute_two_kas_grant_and.value_fqns[0], + attribute_two_kas_grant_and.value_fqns[1], ], ) cipherTexts[sample_name] = ct_file @@ -149,11 +150,11 @@ def test_autoconfigure_double_kas_and( def test_autoconfigure_one_attribute_attr_grant( - one_attribute_attr_kas_grant, - encrypt_sdk, - decrypt_sdk, - tmp_dir, - pt_file, + one_attribute_attr_kas_grant: Attribute, + encrypt_sdk: tdfs.sdk_type, + decrypt_sdk: tdfs.sdk_type, + tmp_dir: str, + pt_file: str, kas_url_attr: str, ): skip_if_unsupported(encrypt_sdk, "autoconfigure") @@ -171,7 +172,7 @@ def test_autoconfigure_one_attribute_attr_grant( mime_type="text/plain", fmt="ztdf", attr_values=[ - one_attribute_attr_kas_grant.values[0].fqn, + one_attribute_attr_kas_grant.value_fqns[0], ], ) cipherTexts[sample_name] = ct_file @@ -185,11 +186,11 @@ def test_autoconfigure_one_attribute_attr_grant( def test_autoconfigure_two_kas_or_attr_and_value_grant( - attr_and_value_kas_grants_or, - encrypt_sdk, - decrypt_sdk, - tmp_dir, - pt_file, + attr_and_value_kas_grants_or: Attribute, + encrypt_sdk: tdfs.sdk_type, + decrypt_sdk: tdfs.sdk_type, + tmp_dir: str, + pt_file: str, kas_url_attr: str, kas_url_value1: str, ): @@ -208,8 +209,8 @@ def test_autoconfigure_two_kas_or_attr_and_value_grant( mime_type="text/plain", fmt="ztdf", attr_values=[ - attr_and_value_kas_grants_or.values[0].fqn, - attr_and_value_kas_grants_or.values[1].fqn, + attr_and_value_kas_grants_or.value_fqns[0], + attr_and_value_kas_grants_or.value_fqns[1], ], ) cipherTexts[sample_name] = ct_file @@ -229,11 +230,11 @@ def test_autoconfigure_two_kas_or_attr_and_value_grant( def test_autoconfigure_two_kas_and_attr_and_value_grant( - attr_and_value_kas_grants_and, - encrypt_sdk, - decrypt_sdk, - tmp_dir, - pt_file, + attr_and_value_kas_grants_and: Attribute, + encrypt_sdk: tdfs.sdk_type, + decrypt_sdk: tdfs.sdk_type, + tmp_dir: str, + pt_file: str, kas_url_attr: str, kas_url_value1: str, ): @@ -252,8 +253,8 @@ def test_autoconfigure_two_kas_and_attr_and_value_grant( mime_type="text/plain", fmt="ztdf", attr_values=[ - attr_and_value_kas_grants_and.values[0].fqn, - attr_and_value_kas_grants_and.values[1].fqn, + attr_and_value_kas_grants_and.value_fqns[0], + attr_and_value_kas_grants_and.value_fqns[1], ], ) cipherTexts[sample_name] = ct_file @@ -273,11 +274,11 @@ def test_autoconfigure_two_kas_and_attr_and_value_grant( def test_autoconfigure_one_attribute_ns_grant( - one_attribute_ns_kas_grant, - encrypt_sdk, - decrypt_sdk, - tmp_dir, - pt_file, + one_attribute_ns_kas_grant: Attribute, + encrypt_sdk: tdfs.sdk_type, + decrypt_sdk: tdfs.sdk_type, + tmp_dir: str, + pt_file: str, kas_url_ns: str, ): skip_if_unsupported(encrypt_sdk, "autoconfigure", "ns_grants") @@ -295,7 +296,7 @@ def test_autoconfigure_one_attribute_ns_grant( mime_type="text/plain", fmt="ztdf", attr_values=[ - one_attribute_ns_kas_grant.values[0].fqn, + one_attribute_ns_kas_grant.value_fqns[0], ], ) cipherTexts[sample_name] = ct_file @@ -309,11 +310,11 @@ def test_autoconfigure_one_attribute_ns_grant( def test_autoconfigure_two_kas_or_ns_and_value_grant( - ns_and_value_kas_grants_or, - encrypt_sdk, - decrypt_sdk, - tmp_dir, - pt_file, + ns_and_value_kas_grants_or: Attribute, + encrypt_sdk: tdfs.sdk_type, + decrypt_sdk: tdfs.sdk_type, + tmp_dir: str, + pt_file: str, kas_url_ns: str, kas_url_value1: str, ): @@ -332,8 +333,8 @@ def test_autoconfigure_two_kas_or_ns_and_value_grant( mime_type="text/plain", fmt="ztdf", attr_values=[ - ns_and_value_kas_grants_or.values[0].fqn, - ns_and_value_kas_grants_or.values[1].fqn, + ns_and_value_kas_grants_or.value_fqns[0], + ns_and_value_kas_grants_or.value_fqns[1], ], ) cipherTexts[sample_name] = ct_file @@ -353,11 +354,11 @@ def test_autoconfigure_two_kas_or_ns_and_value_grant( def test_autoconfigure_two_kas_and_ns_and_value_grant( - ns_and_value_kas_grants_and, - encrypt_sdk, - decrypt_sdk, - tmp_dir, - pt_file, + ns_and_value_kas_grants_and: Attribute, + encrypt_sdk: tdfs.sdk_type, + decrypt_sdk: tdfs.sdk_type, + tmp_dir: str, + pt_file: str, kas_url_ns: str, kas_url_value1: str, ): @@ -376,8 +377,8 @@ def test_autoconfigure_two_kas_and_ns_and_value_grant( mime_type="text/plain", fmt="ztdf", attr_values=[ - ns_and_value_kas_grants_and.values[0].fqn, - ns_and_value_kas_grants_and.values[1].fqn, + ns_and_value_kas_grants_and.value_fqns[0], + ns_and_value_kas_grants_and.value_fqns[1], ], ) cipherTexts[sample_name] = ct_file diff --git a/xtest/test_legacy.py b/xtest/test_legacy.py index f68e6bb3..546a81c6 100644 --- a/xtest/test_legacy.py +++ b/xtest/test_legacy.py @@ -13,7 +13,7 @@ def get_golden_file(golden_file_name: str) -> str: def test_decrypt_small( decrypt_sdk: tdfs.sdk_type, - tmp_dir, + tmp_dir: str, ): ct_file = get_golden_file("small-java-4.3.0-e0f8caf.tdf") rt_file = os.path.join(tmp_dir, "small-java.untdf") @@ -28,7 +28,7 @@ def test_decrypt_small( def test_decrypt_no_splitid( decrypt_sdk: tdfs.sdk_type, - tmp_dir, + tmp_dir: str, ): ct_file = get_golden_file("no-splitids-java.tdf") rt_file = os.path.join(tmp_dir, "no-splitids-java.untdf") @@ -43,7 +43,7 @@ def test_decrypt_no_splitid( def test_decrypt_big( decrypt_sdk: tdfs.sdk_type, - tmp_dir, + tmp_dir: str, ): ct_file = get_golden_file("big-java-4.3.0-e0f8caf.tdf") rt_file = os.path.join(tmp_dir, "big-java.untdf") diff --git a/xtest/test_tdfs.py b/xtest/test_tdfs.py index 9641063a..b2c00838 100644 --- a/xtest/test_tdfs.py +++ b/xtest/test_tdfs.py @@ -218,11 +218,11 @@ def change_policy_binding(manifest: tdfs.Manifest) -> tdfs.Manifest: if isinstance(pb, tdfs.PolicyBinding): hash = pb.hash altered_hash = base64.b64encode(change_last_three(base64.b64decode(hash))) - pb.hash = altered_hash + pb.hash = str(altered_hash) manifest.encryptionInformation.keyAccess[0].policyBinding = pb else: altered_hash = base64.b64encode(change_last_three(base64.b64decode(pb))) - manifest.encryptionInformation.keyAccess[0].policyBinding = altered_hash + manifest.encryptionInformation.keyAccess[0].policyBinding = str(altered_hash) return manifest From c6fe0cc01e0474cfdc605378a19be90f4eb7e8e8 Mon Sep 17 00:00:00 2001 From: David Mihalcik Date: Mon, 27 Jan 2025 09:39:30 -0500 Subject: [PATCH 2/2] Update test_abac.py --- xtest/test_abac.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xtest/test_abac.py b/xtest/test_abac.py index 65385cde..1737475d 100644 --- a/xtest/test_abac.py +++ b/xtest/test_abac.py @@ -2,7 +2,7 @@ import pytest import tdfs -from xtest.abac import Attribute +from abac import Attribute cipherTexts: dict[str, str] = {}