From d337467714ca9737d614432b2c0dd7ab1373a3f7 Mon Sep 17 00:00:00 2001 From: Tobias Oberstein Date: Thu, 11 Aug 2022 18:01:19 +0200 Subject: [PATCH] expand EIP712AuthorityCertificate; more tests (#1589) expand EIP712AuthorityCertificate; more tests --- autobahn/_version.py | 2 +- autobahn/xbr/__init__.py | 8 +- autobahn/xbr/_eip712_authority_certificate.py | 210 +++++++++++++++--- autobahn/xbr/_eip712_base.py | 17 ++ autobahn/xbr/_eip712_certificate_chain.py | 136 ++++++------ autobahn/xbr/_eip712_delegate_certificate.py | 131 +++++++++-- autobahn/xbr/test/test_xbr_eip712.py | 76 ++++++- 7 files changed, 453 insertions(+), 127 deletions(-) diff --git a/autobahn/_version.py b/autobahn/_version.py index 3b4e125ef..a742bd558 100644 --- a/autobahn/_version.py +++ b/autobahn/_version.py @@ -24,6 +24,6 @@ # ############################################################################### -__version__ = '22.7.1' +__version__ = '22.8.1.dev1' __build__ = '00000000-0000000' diff --git a/autobahn/xbr/__init__.py b/autobahn/xbr/__init__.py index 0ecb2ae48..fe392b79b 100644 --- a/autobahn/xbr/__init__.py +++ b/autobahn/xbr/__init__.py @@ -76,13 +76,12 @@ from autobahn.xbr._schema import FbsSchema, FbsObject, FbsType, FbsRPCCall, FbsEnum, FbsService, FbsEnumValue, \ FbsAttribute, FbsField, FbsRepository # noqa from autobahn.xbr._wallet import stretch_argon2_secret, expand_argon2_secret, pkm_from_argon2_secret # noqa - - HAS_XBR = True - from autobahn.xbr._frealm import FederatedRealm, Seeder # noqa - from autobahn.xbr._secmod import EthereumKey # noqa + from autobahn.xbr._secmod import EthereumKey, SecurityModuleMemory # noqa from autobahn.xbr._userkey import UserKey # noqa + HAS_XBR = True + if not hasattr(abi, 'collapse_type'): def collapse_type(base, sub, arrlist): @@ -414,6 +413,7 @@ def account_from_ethkey(ethkey: bytes) -> eth_account.account.Account: 'FederatedRealm', 'Seeder', 'EthereumKey', + 'SecurityModuleMemory', ) except (ImportError, FileNotFoundError) as e: diff --git a/autobahn/xbr/_eip712_authority_certificate.py b/autobahn/xbr/_eip712_authority_certificate.py index 63bc41b5b..fc64cfee6 100644 --- a/autobahn/xbr/_eip712_authority_certificate.py +++ b/autobahn/xbr/_eip712_authority_certificate.py @@ -23,10 +23,18 @@ # THE SOFTWARE. # ############################################################################### - +import os.path +import pprint from binascii import a2b_hex +from typing import Dict, Any, Optional, List + +import web3 +import cbor2 + +from py_eth_sig_utils.eip712 import encode_typed_data from autobahn.wamp.message import _URI_PAT_REALM_NAME_ETH +from autobahn.xbr._secmod import EthereumKey from ._eip712_base import sign, recover, is_chain_id, is_address, is_block_number, is_signature, is_eth_privkey from ._eip712_certificate import EIP712Certificate @@ -198,6 +206,7 @@ class EIP712AuthorityCertificate(EIP712Certificate): CAPABILITY_CONSUMER = 32 __slots__ = ( + # EIP712 attributes 'chainId', 'verifyingContract', 'validFrom', @@ -206,16 +215,75 @@ class EIP712AuthorityCertificate(EIP712Certificate): 'realm', 'capabilities', 'meta', + + # additional attributes + 'signatures', + 'hash', ) def __init__(self, chainId: int, verifyingContract: bytes, validFrom: int, issuer: bytes, subject: bytes, - realm: bytes, capabilities: int, meta: str): + realm: bytes, capabilities: int, meta: str, + signatures: Optional[List[bytes]] = None): super().__init__(chainId, verifyingContract, validFrom) self.issuer = issuer self.subject = subject self.realm = realm self.capabilities = capabilities self.meta = meta + self.signatures = signatures + eip712 = create_eip712_authority_certificate(chainId, + verifyingContract, + validFrom, + issuer, + subject, + realm, + capabilities, + meta) + self.hash = encode_typed_data(eip712) + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, self.__class__): + return False + if not EIP712AuthorityCertificate.__eq__(self, other): + return False + if other.chainId != self.chainId: + return False + if other.verifyingContract != self.verifyingContract: + return False + if other.validFrom != self.validFrom: + return False + if other.issuer != self.issuer: + return False + if other.subject != self.subject: + return False + if other.realm != self.realm: + return False + if other.capabilities != self.capabilities: + return False + if other.meta != self.meta: + return False + if other.signatures != self.signatures: + return False + if other.hash != self.hash: + return False + return True + + def __ne__(self, other: Any) -> bool: + return not self.__eq__(other) + + def __str__(self) -> str: + return pprint.pformat(self.marshal()) + + def sign(self, key: EthereumKey, binary: bool = False) -> bytes: + eip712 = create_eip712_authority_certificate(self.chainId, + self.verifyingContract, + self.validFrom, + self.issuer, + self.subject, + self.realm, + self.capabilities, + self.meta) + return key.sign_typed_data(eip712, binary=binary) def recover(self, signature: bytes) -> bytes: return recover_eip712_authority_certificate(self.chainId, @@ -228,15 +296,45 @@ def recover(self, signature: bytes) -> bytes: self.meta, signature) + def marshal(self, binary: bool = False) -> Dict[str, Any]: + obj = create_eip712_authority_certificate(chainId=self.chainId, + verifyingContract=self.verifyingContract, + validFrom=self.validFrom, + issuer=self.issuer, + subject=self.subject, + realm=self.realm, + capabilities=self.capabilities, + meta=self.meta) + if not binary: + obj['message']['verifyingContract'] = web3.Web3.toChecksumAddress(obj['message']['verifyingContract']) if obj['message']['verifyingContract'] else None + obj['message']['issuer'] = web3.Web3.toChecksumAddress(obj['message']['issuer']) if obj['message']['issuer'] else None + obj['message']['subject'] = web3.Web3.toChecksumAddress(obj['message']['subject']) if obj['message']['subject'] else None + obj['message']['realm'] = web3.Web3.toChecksumAddress(obj['message']['realm']) if obj['message']['realm'] else None + return obj + @staticmethod - def parse(data) -> 'EIP712AuthorityCertificate': + def parse(obj, binary: bool = False) -> 'EIP712AuthorityCertificate': + if type(obj) != dict: + raise ValueError('invalid type {} for object in EIP712AuthorityCertificate.parse'.format(type(obj))) + + primaryType = obj.get('primaryType', None) + if primaryType != 'EIP712AuthorityCertificate': + raise ValueError('invalid primaryType "{}" - expected "EIP712AuthorityCertificate"'.format(primaryType)) + + # FIXME: check EIP712 types, domain + + data = obj.get('message', None) if type(data) != dict: raise ValueError('invalid type {} for EIP712AuthorityCertificate'.format(type(data))) for k in data: - if k not in ['chainId', 'verifyingContract', 'validFrom', 'issuer', 'subject', + if k not in ['type', 'chainId', 'verifyingContract', 'validFrom', 'issuer', 'subject', 'realm', 'capabilities', 'meta']: raise ValueError('invalid attribute "{}" in EIP712AuthorityCertificate'.format(k)) + _type = data.get('type', None) + if _type and _type != 'EIP712AuthorityCertificate': + raise ValueError('unexpected type "{}" in EIP712AuthorityCertificate'.format(_type)) + chainId = data.get('chainId', None) if chainId is None: raise ValueError('missing chainId in EIP712AuthorityCertificate') @@ -246,13 +344,20 @@ def parse(data) -> 'EIP712AuthorityCertificate': verifyingContract = data.get('verifyingContract', None) if verifyingContract is None: raise ValueError('missing verifyingContract in EIP712AuthorityCertificate') - if type(verifyingContract) != str: - raise ValueError( - 'invalid type {} for verifyingContract in EIP712AuthorityCertificate'.format(type(verifyingContract))) - if not _URI_PAT_REALM_NAME_ETH.match(verifyingContract): - raise ValueError( - 'invalid value "{}" for verifyingContract in EIP712AuthorityCertificate'.format(verifyingContract)) - verifyingContract = a2b_hex(verifyingContract[2:]) + if binary: + if type(verifyingContract) != bytes: + raise ValueError( + 'invalid type {} for verifyingContract in EIP712AuthorityCertificate'.format(type(verifyingContract))) + if len(verifyingContract) != 20: + raise ValueError('invalid value length {} of verifyingContract'.format(len(verifyingContract))) + else: + if type(verifyingContract) != str: + raise ValueError( + 'invalid type {} for verifyingContract in EIP712AuthorityCertificate'.format(type(verifyingContract))) + if not _URI_PAT_REALM_NAME_ETH.match(verifyingContract): + raise ValueError( + 'invalid value "{}" for verifyingContract in EIP712AuthorityCertificate'.format(verifyingContract)) + verifyingContract = a2b_hex(verifyingContract[2:]) validFrom = data.get('validFrom', None) if validFrom is None: @@ -263,29 +368,50 @@ def parse(data) -> 'EIP712AuthorityCertificate': issuer = data.get('issuer', None) if issuer is None: raise ValueError('missing issuer in EIP712AuthorityCertificate') - if type(issuer) != str: - raise ValueError('invalid type {} for issuer in EIP712AuthorityCertificate'.format(type(issuer))) - if not _URI_PAT_REALM_NAME_ETH.match(issuer): - raise ValueError('invalid value "{}" for issuer in EIP712AuthorityCertificate'.format(issuer)) - issuer = a2b_hex(issuer[2:]) + if binary: + if type(issuer) != bytes: + raise ValueError( + 'invalid type {} for issuer in EIP712AuthorityCertificate'.format(type(issuer))) + if len(issuer) != 20: + raise ValueError('invalid value length {} of issuer'.format(len(issuer))) + else: + if type(issuer) != str: + raise ValueError('invalid type {} for issuer in EIP712AuthorityCertificate'.format(type(issuer))) + if not _URI_PAT_REALM_NAME_ETH.match(issuer): + raise ValueError('invalid value "{}" for issuer in EIP712AuthorityCertificate'.format(issuer)) + issuer = a2b_hex(issuer[2:]) subject = data.get('subject', None) if subject is None: raise ValueError('missing subject in EIP712AuthorityCertificate') - if type(subject) != str: - raise ValueError('invalid type {} for subject in EIP712AuthorityCertificate'.format(type(subject))) - if not _URI_PAT_REALM_NAME_ETH.match(subject): - raise ValueError('invalid value "{}" for subject in EIP712AuthorityCertificate'.format(subject)) - subject = a2b_hex(subject[2:]) + if binary: + if type(subject) != bytes: + raise ValueError( + 'invalid type {} for subject in EIP712AuthorityCertificate'.format(type(subject))) + if len(subject) != 20: + raise ValueError('invalid value length {} of verifyingContract'.format(len(subject))) + else: + if type(subject) != str: + raise ValueError('invalid type {} for subject in EIP712AuthorityCertificate'.format(type(subject))) + if not _URI_PAT_REALM_NAME_ETH.match(subject): + raise ValueError('invalid value "{}" for subject in EIP712AuthorityCertificate'.format(subject)) + subject = a2b_hex(subject[2:]) realm = data.get('realm', None) if realm is None: raise ValueError('missing realm in EIP712AuthorityCertificate') - if type(realm) != str: - raise ValueError('invalid type {} for realm in EIP712AuthorityCertificate'.format(type(realm))) - if not _URI_PAT_REALM_NAME_ETH.match(realm): - raise ValueError('invalid value "{}" for realm in EIP712AuthorityCertificate'.format(realm)) - realm = a2b_hex(realm[2:]) + if binary: + if type(realm) != bytes: + raise ValueError( + 'invalid type {} for realm in EIP712AuthorityCertificate'.format(type(realm))) + if len(realm) != 20: + raise ValueError('invalid value length {} of realm'.format(len(realm))) + else: + if type(realm) != str: + raise ValueError('invalid type {} for realm in EIP712AuthorityCertificate'.format(type(realm))) + if not _URI_PAT_REALM_NAME_ETH.match(realm): + raise ValueError('invalid value "{}" for realm in EIP712AuthorityCertificate'.format(realm)) + realm = a2b_hex(realm[2:]) capabilities = data.get('capabilities', None) if capabilities is None: @@ -302,3 +428,35 @@ def parse(data) -> 'EIP712AuthorityCertificate': obj = EIP712AuthorityCertificate(chainId=chainId, verifyingContract=verifyingContract, validFrom=validFrom, issuer=issuer, subject=subject, realm=realm, capabilities=capabilities, meta=meta) return obj + + def save(self, filename: str) -> int: + """ + Save certificate to file. File format (serialized as CBOR): + + [cert_hash: bytes, cert_eip712: Dict[str, Any], cert_signatures: List[bytes]] + + :param filename: + :return: + """ + cert_obj = [self.hash, self.marshal(binary=True), self.signatures or []] + with open(filename, 'wb') as f: + data = cbor2.dumps(cert_obj) + f.write(data) + return len(data) + + @staticmethod + def load(filename) -> 'EIP712AuthorityCertificate': + """ + Load certificate from file. + + :param filename: + :return: + """ + if not os.path.isfile(filename): + raise RuntimeError('cannot create EIP712AuthorityCertificate from filename "{}": not a file'.format(filename)) + with open(filename, 'rb') as f: + cert_hash, cert_eip712, cert_signatures = cbor2.loads(f.read()) + cert = EIP712AuthorityCertificate.parse(cert_eip712, binary=True) + assert cert_hash == cert.hash + cert.signatures = cert_signatures + return cert diff --git a/autobahn/xbr/_eip712_base.py b/autobahn/xbr/_eip712_base.py index 0a3c41558..a3bbd79e0 100644 --- a/autobahn/xbr/_eip712_base.py +++ b/autobahn/xbr/_eip712_base.py @@ -31,6 +31,23 @@ _EIP712_SIG_LEN = 32 + 32 + 1 +def _hash(data) -> bytes: + """ + keccak256(abi.encode( + EIP712_MEMBER_REGISTER_TYPEHASH, + obj.chainId, + obj.verifyingContract, + obj.member, + obj.registered, + keccak256(bytes(obj.eula)), + keccak256(bytes(obj.profile)) + )); + + :param data: + :return: + """ + + def sign(eth_privkey: bytes, data: Dict[str, Any]) -> bytes: """ Sign the given data using the given Ethereum private key. diff --git a/autobahn/xbr/_eip712_certificate_chain.py b/autobahn/xbr/_eip712_certificate_chain.py index 0d341cc76..760298735 100644 --- a/autobahn/xbr/_eip712_certificate_chain.py +++ b/autobahn/xbr/_eip712_certificate_chain.py @@ -24,7 +24,6 @@ # ############################################################################### -from binascii import a2b_hex from typing import List, Tuple, Dict, Any, Union from autobahn.xbr._eip712_delegate_certificate import EIP712DelegateCertificate @@ -40,80 +39,79 @@ def parse_certificate_chain(certificates: List[Tuple[Dict[str, Any], str]]) \ """ # parse the whole certificate chain cert_chain = [] - cert_sigs = [] - for cert_data, cert_sig in certificates: + for cert_hash, cert_data, cert_sig in certificates: if cert_data['primaryType'] == 'EIP712DelegateCertificate': - cert = EIP712DelegateCertificate.parse(cert_data['message']) + cert = EIP712DelegateCertificate.parse(cert_data) elif cert_data['primaryType'] == 'EIP712AuthorityCertificate': - cert = EIP712AuthorityCertificate.parse(cert_data['message']) + cert = EIP712AuthorityCertificate.parse(cert_data) else: assert False, 'should not arrive here' cert_chain.append(cert) - cert_sigs.append(cert_sig) - # FIXME: allow length 2 and length > 3 - assert len(cert_chain) == 3 - - # Certificate Chain Rules (CCR): + # FIXME: proper adaptive implementation of certificate chain rules checking + # if False: + # assert len(cert_chain) == 3 # - # 1. **CCR-1**: The `chainId` and `verifyingContract` must match for all certificates to what we expect, and `validFrom` before current block number on the respective chain. - # 2. **CCR-2**: The `realm` must match for all certificates to the respective realm. - # 3. **CCR-3**: The type of the first certificate in the chain must be a `EIP712DelegateCertificate`, and all subsequent certificates must be of type `EIP712AuthorityCertificate`. - # 4. **CCR-4**: The last certificate must be self-signed (`issuer` equals `subject`), it is a root CA certificate. - # 5. **CCR-5**: The intermediate certificate's `issuer` must be equal to the `subject` of the previous certificate. - # 6. **CCR-6**: The root certificate must be `validFrom` before the intermediate certificate - # 7. **CCR-7**: The `capabilities` of intermediate certificate must be a subset of the root cert - # 8. **CCR-8**: The intermediate certificate's `subject` must be the delegate certificate `delegate` - # 9. **CCR-9**: The intermediate certificate must be `validFrom` before the delegate certificate - # 10. **CCR-10**: The root certificate's signature must be valid and signed by the root certificate's `issuer`. - # 11. **CCR-11**: The intermediate certificate's signature must be valid and signed by the intermediate certificate's `issuer`. - # 12. **CCR-12**: The delegate certificate's signature must be valid and signed by the `delegate`. - - # CCR-1 - chainId = 1 - verifyingContract = a2b_hex('0xf766Dc789CF04CD18aE75af2c5fAf2DA6650Ff57'[2:]) - for cert in cert_chain: - assert cert.chainId == chainId - assert cert.verifyingContract == verifyingContract - - # CCR-2 - realm = a2b_hex('0xA6e693CC4A2b4F1400391a728D26369D9b82ef96'[2:]) - for cert in cert_chain[1:]: - assert cert.realm == realm - - # CCR-3 - assert isinstance(cert_chain[0], EIP712DelegateCertificate) - for i in [1, len(cert_chain) - 1]: - assert isinstance(cert_chain[i], EIP712AuthorityCertificate) - - # CCR-4 - assert cert_chain[2].subject == cert_chain[2].issuer - - # CCR-5 - assert cert_chain[1].issuer == cert_chain[2].subject - - # CCR-6 - assert cert_chain[2].validFrom <= cert_chain[1].validFrom - - # CCR-7 - assert cert_chain[2].capabilities == cert_chain[2].capabilities | cert_chain[1].capabilities - - # CCR-8 - assert cert_chain[1].subject == cert_chain[0].delegate - - # CCR-9 - assert cert_chain[1].validFrom <= cert_chain[0].validFrom - - # CCR-10 - _issuer = cert_chain[2].recover(a2b_hex(cert_sigs[2])) - assert _issuer == cert_chain[2].issuer - - # CCR-11 - _issuer = cert_chain[1].recover(a2b_hex(cert_sigs[1])) - assert _issuer == cert_chain[1].issuer - - # CCR-12 - _issuer = cert_chain[0].recover(a2b_hex(cert_sigs[0])) - assert _issuer == cert_chain[0].delegate + # # Certificate Chain Rules (CCR): + # # + # # 1. **CCR-1**: The `chainId` and `verifyingContract` must match for all certificates to what we expect, and `validFrom` before current block number on the respective chain. + # # 2. **CCR-2**: The `realm` must match for all certificates to the respective realm. + # # 3. **CCR-3**: The type of the first certificate in the chain must be a `EIP712DelegateCertificate`, and all subsequent certificates must be of type `EIP712AuthorityCertificate`. + # # 4. **CCR-4**: The last certificate must be self-signed (`issuer` equals `subject`), it is a root CA certificate. + # # 5. **CCR-5**: The intermediate certificate's `issuer` must be equal to the `subject` of the previous certificate. + # # 6. **CCR-6**: The root certificate must be `validFrom` before the intermediate certificate + # # 7. **CCR-7**: The `capabilities` of intermediate certificate must be a subset of the root cert + # # 8. **CCR-8**: The intermediate certificate's `subject` must be the delegate certificate `delegate` + # # 9. **CCR-9**: The intermediate certificate must be `validFrom` before the delegate certificate + # # 10. **CCR-10**: The root certificate's signature must be valid and signed by the root certificate's `issuer`. + # # 11. **CCR-11**: The intermediate certificate's signature must be valid and signed by the intermediate certificate's `issuer`. + # # 12. **CCR-12**: The delegate certificate's signature must be valid and signed by the `delegate`. + # + # # CCR-1 + # chainId = 1 + # verifyingContract = a2b_hex('0xf766Dc789CF04CD18aE75af2c5fAf2DA6650Ff57'[2:]) + # for cert in cert_chain: + # assert cert.chainId == chainId + # assert cert.verifyingContract == verifyingContract + # + # # CCR-2 + # realm = a2b_hex('0xA6e693CC4A2b4F1400391a728D26369D9b82ef96'[2:]) + # for cert in cert_chain[1:]: + # assert cert.realm == realm + # + # # CCR-3 + # assert isinstance(cert_chain[0], EIP712DelegateCertificate) + # for i in [1, len(cert_chain) - 1]: + # assert isinstance(cert_chain[i], EIP712AuthorityCertificate) + # + # # CCR-4 + # assert cert_chain[2].subject == cert_chain[2].issuer + # + # # CCR-5 + # assert cert_chain[1].issuer == cert_chain[2].subject + # + # # CCR-6 + # assert cert_chain[2].validFrom <= cert_chain[1].validFrom + # + # # CCR-7 + # assert cert_chain[2].capabilities == cert_chain[2].capabilities | cert_chain[1].capabilities + # + # # CCR-8 + # assert cert_chain[1].subject == cert_chain[0].delegate + # + # # CCR-9 + # assert cert_chain[1].validFrom <= cert_chain[0].validFrom + # + # # CCR-10 + # _issuer = cert_chain[2].recover(a2b_hex(cert_sigs[2])) + # assert _issuer == cert_chain[2].issuer + # + # # CCR-11 + # _issuer = cert_chain[1].recover(a2b_hex(cert_sigs[1])) + # assert _issuer == cert_chain[1].issuer + # + # # CCR-12 + # _issuer = cert_chain[0].recover(a2b_hex(cert_sigs[0])) + # assert _issuer == cert_chain[0].delegate return cert_chain diff --git a/autobahn/xbr/_eip712_delegate_certificate.py b/autobahn/xbr/_eip712_delegate_certificate.py index 3cb0c6bb2..1cd5949e4 100644 --- a/autobahn/xbr/_eip712_delegate_certificate.py +++ b/autobahn/xbr/_eip712_delegate_certificate.py @@ -24,10 +24,15 @@ # ############################################################################### -from binascii import a2b_hex -from typing import Dict, Any +import os +import pprint +from typing import Dict, Any, Optional, List +from binascii import a2b_hex, b2a_hex -import json +import web3 +import cbor2 + +from py_eth_sig_utils.eip712 import encode_typed_data from autobahn.wamp.message import _URI_PAT_REALM_NAME_ETH from autobahn.xbr._secmod import EthereumKey @@ -184,6 +189,7 @@ def recover_eip712_delegate_certificate(chainId: int, class EIP712DelegateCertificate(EIP712Certificate): __slots__ = ( + # EIP712 attributes 'chainId', 'verifyingContract', 'validFrom', @@ -191,17 +197,61 @@ class EIP712DelegateCertificate(EIP712Certificate): 'csPubKey', 'bootedAt', 'meta', + + # additional attributes + 'signatures', + 'hash', ) def __init__(self, chainId: int, verifyingContract: bytes, validFrom: int, delegate: bytes, csPubKey: bytes, - bootedAt: int, meta: str): + bootedAt: int, meta: str, signatures: Optional[List[bytes]] = None): super().__init__(chainId, verifyingContract, validFrom) self.delegate = delegate self.csPubKey = csPubKey self.bootedAt = bootedAt self.meta = meta - - def sign(self, key: EthereumKey) -> bytes: + self.signatures = signatures + eip712 = create_eip712_delegate_certificate(chainId, + verifyingContract, + validFrom, + delegate, + csPubKey, + bootedAt, + meta) + self.hash = encode_typed_data(eip712) + + def __eq__(self, other: Any) -> bool: + if not isinstance(other, self.__class__): + return False + if not EIP712DelegateCertificate.__eq__(self, other): + return False + if other.chainId != self.chainId: + return False + if other.verifyingContract != self.verifyingContract: + return False + if other.validFrom != self.validFrom: + return False + if other.delegate != self.delegate: + return False + if other.csPubKey != self.csPubKey: + return False + if other.bootedAt != self.bootedAt: + return False + if other.meta != self.meta: + return False + if other.signatures != self.signatures: + return False + if other.hash != self.hash: + return False + return True + + def __ne__(self, other: Any) -> bool: + return not self.__eq__(other) + + def __str__(self) -> str: + return pprint.pformat(self.marshal()) + + def sign(self, key: EthereumKey, binary: bool = False) -> bytes: eip712 = create_eip712_delegate_certificate(self.chainId, self.verifyingContract, self.validFrom, @@ -209,9 +259,7 @@ def sign(self, key: EthereumKey) -> bytes: self.csPubKey, self.bootedAt, self.meta) - # FIXME - data = json.dumps(eip712).encode() - return key.sign(data) + return key.sign_typed_data(eip712, binary=binary) def recover(self, signature: bytes) -> bytes: return recover_eip712_delegate_certificate(self.chainId, @@ -223,23 +271,42 @@ def recover(self, signature: bytes) -> bytes: self.meta, signature) - def marshal(self) -> Dict[str, Any]: - return create_eip712_delegate_certificate(self.chainId, - self.verifyingContract, - self.validFrom, - self.delegate, - self.csPubKey, - self.bootedAt, - self.meta) + def marshal(self, binary: bool = False) -> Dict[str, Any]: + obj = create_eip712_delegate_certificate(chainId=self.chainId, + verifyingContract=self.verifyingContract, + validFrom=self.validFrom, + delegate=self.delegate, + csPubKey=self.csPubKey, + bootedAt=self.bootedAt, + meta=self.meta) + if not binary: + obj['message']['verifyingContract'] = web3.Web3.toChecksumAddress(obj['message']['verifyingContract']) if obj['message']['verifyingContract'] else None + obj['message']['delegate'] = web3.Web3.toChecksumAddress(obj['message']['delegate']) if obj['message']['delegate'] else None + obj['message']['csPubKey'] = b2a_hex(obj['message']['csPubKey']).decode() if obj['message']['csPubKey'] else None + return obj @staticmethod - def parse(data) -> 'EIP712DelegateCertificate': + def parse(obj) -> 'EIP712DelegateCertificate': + if type(obj) != dict: + raise ValueError('invalid type {} for object in EIP712DelegateCertificate.parse'.format(type(obj))) + + primaryType = obj.get('primaryType', None) + if primaryType != 'EIP712DelegateCertificate': + raise ValueError('invalid primaryType "{}" - expected "EIP712DelegateCertificate"'.format(primaryType)) + + # FIXME: check EIP712 types, domain + + data = obj.get('message', None) if type(data) != dict: raise ValueError('invalid type {} for EIP712DelegateCertificate'.format(type(data))) for k in data: - if k not in ['chainId', 'verifyingContract', 'delegate', 'validFrom', 'csPubKey', 'bootedAt', 'meta']: + if k not in ['type', 'chainId', 'verifyingContract', 'delegate', 'validFrom', 'csPubKey', 'bootedAt', 'meta']: raise ValueError('invalid attribute "{}" in EIP712DelegateCertificate'.format(k)) + _type = data.get('type', None) + if _type and _type != 'EIP712DelegateCertificate': + raise ValueError('unexpected type "{}" in EIP712DelegateCertificate'.format(_type)) + chainId = data.get('chainId', None) if chainId is None: raise ValueError('missing chainId in EIP712DelegateCertificate') @@ -296,3 +363,29 @@ def parse(data) -> 'EIP712DelegateCertificate': obj = EIP712DelegateCertificate(chainId=chainId, verifyingContract=verifyingContract, validFrom=validFrom, delegate=delegate, csPubKey=csPubKey, bootedAt=bootedAt, meta=meta) return obj + + def save(self, filename: str) -> int: + """ + Save certificate to file. File format (serialized as CBOR): + + [cert_hash: bytes, cert_eip712: Dict[str, Any], cert_signatures: List[bytes]] + + :param filename: + :return: + """ + cert_obj = [self.hash, self.marshal(binary=True), self.signatures or []] + with open(filename, 'wb') as f: + data = cbor2.dumps(cert_obj) + f.write(data) + return len(data) + + @staticmethod + def load(filename) -> 'EIP712DelegateCertificate': + if not os.path.isfile(filename): + raise RuntimeError('cannot create EIP712DelegateCertificate from filename "{}": not a file'.format(filename)) + with open(filename, 'rb') as f: + cert_hash, cert_eip712, cert_signatures = cbor2.loads(f.read()) + cert = EIP712DelegateCertificate.parse(cert_eip712, binary=True) + assert cert_hash == cert.hash + cert.signatures = cert_signatures + return cert diff --git a/autobahn/xbr/test/test_xbr_eip712.py b/autobahn/xbr/test/test_xbr_eip712.py index ef9c7a2a0..a2f131772 100644 --- a/autobahn/xbr/test/test_xbr_eip712.py +++ b/autobahn/xbr/test/test_xbr_eip712.py @@ -26,6 +26,7 @@ import os import sys +import tempfile from binascii import a2b_hex, b2a_hex from unittest import skipIf @@ -145,7 +146,8 @@ def setUp(self): # HELLO.Details.authextra.certificates # - self._certs_expected1 = [({'domain': {'name': 'WMP', 'version': '1'}, + self._certs_expected1 = [(None, + {'domain': {'name': 'WMP', 'version': '1'}, 'message': {'bootedAt': 1657781999086394759, 'chainId': 1, 'csPubKey': '12ae0184b180e9a9c5e45be4a1afbce3c6491320063701cd9c4011a777d04089', @@ -170,7 +172,8 @@ def setUp(self): 'EIP712Domain': [{'name': 'name', 'type': 'string'}, {'name': 'version', 'type': 'string'}]}}, '70726dda677cac8f21366f8023d17203b2f4f9099e954f9bebb2134086e2ac291d80ce038a1342a7748d4b0750f06b8de491561d581c90c99f1c09c91cfa7e191c'), - ({'domain': {'name': 'WMP', 'version': '1'}, + (None, + {'domain': {'name': 'WMP', 'version': '1'}, 'message': {'capabilities': 12, 'chainId': 1, 'issuer': '0xf766Dc789CF04CD18aE75af2c5fAf2DA6650Ff57', @@ -198,7 +201,8 @@ def setUp(self): 'EIP712Domain': [{'name': 'name', 'type': 'string'}, {'name': 'version', 'type': 'string'}]}}, 'f031b2625ae7e32e7eec3a8fa09f4db3a43217f282b7695e5b09dd2e13c25dc679c1f3ce27b94a3074786f7f12183a2a275a00aea5a66b83c431281f1069bd841c'), - ({'domain': {'name': 'WMP', 'version': '1'}, + (None, + {'domain': {'name': 'WMP', 'version': '1'}, 'message': {'capabilities': 63, 'chainId': 1, 'issuer': '0xf766Dc789CF04CD18aE75af2c5fAf2DA6650Ff57', @@ -310,7 +314,7 @@ def test_eip712_create_certificate_chain_manual(self): # create certificates chain # - certificates = [(cert1_data, cert1_sig), (cert2_data, cert2_sig), (cert3_data, cert3_sig)] + certificates = [(None, cert1_data, cert1_sig), (None, cert2_data, cert2_sig), (None, cert3_data, cert3_sig)] if False: from pprint import pprint @@ -327,7 +331,63 @@ def test_eip712_create_certificate_chain_manual(self): @inlineCallbacks def test_eip712_create_certificate_chain_highlevel(self): yield self._sm.open() - # FIXME + + # keys needed to create all certificates in certificate chain + ca_key: EthereumKey = self._sm[0] + + # data needed for root authority certificate: cert3 + ca_cert_chainId = 1 + ca_cert_verifyingContract = a2b_hex('0xf766Dc789CF04CD18aE75af2c5fAf2DA6650Ff57'[2:]) + ca_cert_validFrom = 666666 + ca_cert_issuer = ca_key.address(binary=True) + ca_cert_subject = ca_cert_issuer + ca_cert_realm = a2b_hex('0xA6e693CC4A2b4F1400391a728D26369D9b82ef96'[2:]) + ca_cert_capabilities = EIP712AuthorityCertificate.CAPABILITY_ROOT_CA | EIP712AuthorityCertificate.CAPABILITY_INTERMEDIATE_CA | EIP712AuthorityCertificate.CAPABILITY_PUBLIC_RELAY | EIP712AuthorityCertificate.CAPABILITY_PRIVATE_RELAY | EIP712AuthorityCertificate.CAPABILITY_PROVIDER | EIP712AuthorityCertificate.CAPABILITY_CONSUMER + ca_cert_meta = '' + + # create root authority certificate signature: directly from provided data attributes + ca_cert_data = create_eip712_authority_certificate(chainId=ca_cert_chainId, + verifyingContract=ca_cert_verifyingContract, + validFrom=ca_cert_validFrom, issuer=ca_cert_issuer, + subject=ca_cert_subject, realm=ca_cert_realm, + capabilities=ca_cert_capabilities, meta=ca_cert_meta) + ca_cert_sig = yield ca_key.sign_typed_data(ca_cert_data, binary=False) + + # create root authority certificate signature: from certificate object + ca_cert = EIP712AuthorityCertificate(chainId=ca_cert_chainId, + verifyingContract=ca_cert_verifyingContract, + validFrom=ca_cert_validFrom, + issuer=ca_cert_issuer, + subject=ca_cert_subject, + realm=ca_cert_realm, + capabilities=ca_cert_capabilities, + meta=ca_cert_meta) + ca_cert_sig2 = yield ca_cert.sign(ca_key) + + # re-create root authority certificate from round-tripping (marshal-parse) + ca_cert2 = EIP712AuthorityCertificate.parse(ca_cert.marshal()) + ca_cert_sig3 = yield ca_cert2.sign(ca_key) + + # all different ways to compute signature must result in same signature value + self.assertEqual(ca_cert_sig, ca_cert_sig2) + self.assertEqual(ca_cert_sig, ca_cert_sig3) + + # and match this signature value + self.assertEqual(ca_cert_sig, 'd9e679753e1120a8ba8edea4895d2e056ba98eaa1acbe11bf6210f3a48a56de830aa6a566cc4920' + 'c74a284ffcd9f7d1af5fe229268a44030522db19d5a75f4131c') + + # test save/load instance to/from file + with tempfile.NamedTemporaryFile() as fd: + # save certificate to file + ca_cert.save(fd.name) + + # load certificate from file + ca_cert3 = EIP712AuthorityCertificate.load(fd.name) + + # ensure it produces the same signature + ca_cert_sig4 = yield ca_cert3.sign(ca_key) + self.assertEqual(ca_cert_sig, ca_cert_sig4) + yield self._sm.close() @inlineCallbacks @@ -342,7 +402,7 @@ def test_eip712_verify_certificate_chain_manual(self): # parse the whole certificate chain cert_chain = [] cert_sigs = [] - for cert_data, cert_sig in self._certs_expected1: + for cert_hash, cert_data, cert_sig in self._certs_expected1: self.assertIn('domain', cert_data) self.assertIn('message', cert_data) self.assertIn('primaryType', cert_data) @@ -350,9 +410,9 @@ def test_eip712_verify_certificate_chain_manual(self): self.assertIn(cert_data['primaryType'], cert_data['types']) self.assertIn(cert_data['primaryType'], ['EIP712DelegateCertificate', 'EIP712AuthorityCertificate']) if cert_data['primaryType'] == 'EIP712DelegateCertificate': - cert = EIP712DelegateCertificate.parse(cert_data['message']) + cert = EIP712DelegateCertificate.parse(cert_data) elif cert_data['primaryType'] == 'EIP712AuthorityCertificate': - cert = EIP712AuthorityCertificate.parse(cert_data['message']) + cert = EIP712AuthorityCertificate.parse(cert_data) else: assert False, 'should not arrive here' cert_chain.append(cert)