|
36 | 36 | ProofMethod,
|
37 | 37 | StartInteractionMethod,
|
38 | 38 | )
|
39 |
| -from auth_server.models.jose import ECJWK, SupportedAlgorithms, SupportedHTTPMethods, SupportedJWSType |
| 39 | +from auth_server.models.jose import ECJWK, SupportedAlgorithms, SupportedHTTPMethods, SupportedJWSType, SupportedJWSTypeLegacy |
40 | 40 | from auth_server.models.status import Status
|
41 | 41 | from auth_server.saml2 import AuthnInfo, NameID, SAMLAttributes, SessionInfo
|
42 | 42 | from auth_server.testing import MongoTemporaryInstance
|
@@ -321,6 +321,41 @@ def test_transaction_jws(self):
|
321 | 321 | claims = self._get_access_token_claims(access_token=access_token, client=self.client)
|
322 | 322 | assert claims["auth_source"] == AuthSource.TEST
|
323 | 323 |
|
| 324 | + def test_transaction_jws_legacy_typ(self): |
| 325 | + client_key_dict = self.client_jwk.export_public(as_dict=True) |
| 326 | + client_jwk = ECJWK(**client_key_dict) |
| 327 | + req = GrantRequest( |
| 328 | + client=Client(key=Key(proof=Proof(method=ProofMethod.JWS), jwk=client_jwk)), |
| 329 | + access_token=[AccessTokenRequest(flags=[AccessTokenFlags.BEARER])], |
| 330 | + ) |
| 331 | + jws_header = { |
| 332 | + "typ": SupportedJWSTypeLegacy.JWS, |
| 333 | + "alg": SupportedAlgorithms.ES256.value, |
| 334 | + "kid": self.client_jwk.key_id, |
| 335 | + "htm": SupportedHTTPMethods.POST.value, |
| 336 | + "uri": "http://testserver/transaction", |
| 337 | + "created": int(utc_now().timestamp()), |
| 338 | + } |
| 339 | + _jws = jws.JWS(payload=req.json(exclude_unset=True)) |
| 340 | + _jws.add_signature( |
| 341 | + key=self.client_jwk, |
| 342 | + protected=json.dumps(jws_header), |
| 343 | + ) |
| 344 | + data = _jws.serialize(compact=True) |
| 345 | + |
| 346 | + client_header = {"Content-Type": "application/jose"} |
| 347 | + response = self.client.post("/transaction", content=data, headers=client_header) |
| 348 | + |
| 349 | + assert response.status_code == 200 |
| 350 | + assert "access_token" in response.json() |
| 351 | + access_token = response.json()["access_token"] |
| 352 | + assert AccessTokenFlags.BEARER.value in access_token["flags"] |
| 353 | + assert access_token["value"] is not None |
| 354 | + # Verify token and check claims |
| 355 | + claims = self._get_access_token_claims(access_token=access_token, client=self.client) |
| 356 | + assert claims["auth_source"] == AuthSource.TEST |
| 357 | + |
| 358 | + |
324 | 359 | def test_deserialize_bad_jws(self):
|
325 | 360 | client_header = {"Content-Type": "application/jose"}
|
326 | 361 | response = self.client.post("/transaction", content=b"bogus_jws", headers=client_header)
|
@@ -374,6 +409,53 @@ def test_transaction_jwsd(self):
|
374 | 409 | claims = self._get_access_token_claims(access_token=access_token, client=self.client)
|
375 | 410 | assert claims["auth_source"] == AuthSource.TEST
|
376 | 411 |
|
| 412 | + def test_transaction_jwsd_legacy_typ(self): |
| 413 | + client_key_dict = self.client_jwk.export_public(as_dict=True) |
| 414 | + client_jwk = ECJWK(**client_key_dict) |
| 415 | + req = GrantRequest( |
| 416 | + client=Client(key=Key(proof=Proof(method=ProofMethod.JWSD), jwk=client_jwk)), |
| 417 | + access_token=[AccessTokenRequest(flags=[AccessTokenFlags.BEARER])], |
| 418 | + ) |
| 419 | + jws_header = { |
| 420 | + "typ": SupportedJWSTypeLegacy.JWSD, |
| 421 | + "alg": SupportedAlgorithms.ES256.value, |
| 422 | + "kid": self.client_jwk.key_id, |
| 423 | + "htm": SupportedHTTPMethods.POST.value, |
| 424 | + "uri": "http://testserver/transaction", |
| 425 | + "created": int(utc_now().timestamp()), |
| 426 | + } |
| 427 | + |
| 428 | + payload = req.model_dump_json(exclude_unset=True) |
| 429 | + |
| 430 | + # create a hash of payload to send in payload place |
| 431 | + payload_digest = hash_with(SHA256(), payload.encode()) |
| 432 | + payload_hash = base64url_encode(payload_digest) |
| 433 | + |
| 434 | + # create detached jws |
| 435 | + _jws = jws.JWS(payload=payload) |
| 436 | + _jws.add_signature( |
| 437 | + key=self.client_jwk, |
| 438 | + protected=json.dumps(jws_header), |
| 439 | + ) |
| 440 | + data = _jws.serialize(compact=True) |
| 441 | + |
| 442 | + # Remove payload from serialized jws |
| 443 | + header, _, signature = data.split(".") |
| 444 | + client_header = {"Detached-JWS": f"{header}.{payload_hash}.{signature}"} |
| 445 | + |
| 446 | + response = self.client.post( |
| 447 | + "/transaction", content=req.model_dump_json(exclude_unset=True), headers=client_header |
| 448 | + ) |
| 449 | + |
| 450 | + assert response.status_code == 200 |
| 451 | + assert "access_token" in response.json() |
| 452 | + access_token = response.json()["access_token"] |
| 453 | + assert AccessTokenFlags.BEARER.value in access_token["flags"] |
| 454 | + assert access_token["value"] is not None |
| 455 | + # Verify token and check claims |
| 456 | + claims = self._get_access_token_claims(access_token=access_token, client=self.client) |
| 457 | + assert claims["auth_source"] == AuthSource.TEST |
| 458 | + |
377 | 459 | @mock.patch("aiohttp.ClientSession.get", new_callable=AsyncMock)
|
378 | 460 | def test_mdq_flow(self, mock_mdq):
|
379 | 461 | self.config["auth_flows"] = json.dumps(["TestFlow", "MDQFlow"])
|
|
0 commit comments