diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index e0d00f4..8436bda 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -14,27 +14,16 @@ jobs: fail-fast: false matrix: python-version: - - "3.7" - - "3.8" - "3.9" - "3.10" - "3.11" django-version: - - "3.2" - - "4.1" - "4.2" + - "5.0" exclude: - # Django 3.2 is compatible with Python <= 3.10 - - python-version: "3.11" - django-version: "3.2" - - # Django 4.1 is compatible with Python >= 3.8 - - python-version: "3.7" - django-version: "4.1" - - # Django 4.2 is compatible with Python >= 3.8 - - python-version: "3.7" - django-version: "4.2" + # Django 5.0 is compatible with Python >= 3.10 + - python-version: "3.9" + django-version: "5.0" steps: - uses: actions/checkout@v3 diff --git a/docs/_templates/page.html b/docs/_templates/page.html index 8f6fb93..6447336 100644 --- a/docs/_templates/page.html +++ b/docs/_templates/page.html @@ -13,6 +13,12 @@
Hide table of contents sidebar
+ + {%- trans -%} + Skip to content + {%- endtrans -%} + + {% if theme_announcement -%}
{{ next.title }}
- + {%- endif %} {% if prev -%} - +
{{ _("Previous") }} @@ -153,6 +151,7 @@ {%- endif %}
+ {% if theme_footer_icons or READTHEDOCS -%}
{% if theme_footer_icons -%} {% for icon_dict in theme_footer_icons -%} @@ -179,6 +178,7 @@ {%- endif -%} {%- endif %}
+ {%- endif %}
{% endblock footer %} @@ -190,7 +190,7 @@
- {{ _("Contents") }} + {{ _("On this page") }}
diff --git a/docs/requirements.txt b/docs/requirements.txt index dfa1434..3e2d227 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,2 +1,2 @@ -furo==2022.04.07 -sphinx==4.5.0 +furo==2024.04.27 +sphinx==6.2.1 diff --git a/kagi/migrations/0002_remove_webauthnkey_ukey.py b/kagi/migrations/0002_remove_webauthnkey_ukey.py new file mode 100644 index 0000000..72ee037 --- /dev/null +++ b/kagi/migrations/0002_remove_webauthnkey_ukey.py @@ -0,0 +1,16 @@ +# Generated by Django 4.1.1 on 2022-09-23 10:10 + +from django.db import migrations + + +class Migration(migrations.Migration): + dependencies = [ + ("kagi", "0001_initial"), + ] + + operations = [ + migrations.RemoveField( + model_name="webauthnkey", + name="ukey", + ), + ] diff --git a/kagi/models.py b/kagi/models.py index 7d76a11..995dfe2 100644 --- a/kagi/models.py +++ b/kagi/models.py @@ -19,7 +19,6 @@ class WebAuthnKey(models.Model): key_name = models.CharField(max_length=64) public_key = models.TextField(unique=True) - ukey = models.TextField(unique=True) credential_id = models.TextField(unique=True) sign_count = models.IntegerField() diff --git a/kagi/settings.py b/kagi/settings.py index 0ca06f1..18096a5 100644 --- a/kagi/settings.py +++ b/kagi/settings.py @@ -7,7 +7,6 @@ RELYING_PARTY_ID = getattr(settings, "RELYING_PARTY_ID", "localhost") RELYING_PARTY_NAME = getattr(settings, "RELYING_PARTY_NAME", "Kagi Test Project") -WEBAUTHN_ICON_URL = getattr(settings, "WEBAUTHN_ICON_URL", None) WEBAUTHN_TRUSTED_CERTIFICATES = getattr( settings, "WEBAUTHN_TRUSTED_CERTIFICATES", diff --git a/kagi/static/kagi/webauthn.js b/kagi/static/kagi/webauthn.js index 514d2e8..a0003a1 100644 --- a/kagi/static/kagi/webauthn.js +++ b/kagi/static/kagi/webauthn.js @@ -1,329 +1,283 @@ -function b64enc(buf) { - return base64js.fromByteArray(buf) - .replace(/\+/g, "-") - .replace(/\//g, "_") - .replace(/=/g, ""); -} - -function b64RawEnc(buf) { - return base64js.fromByteArray(buf) - .replace(/\+/g, "-") - .replace(/\//g, "_"); -} - -function hexEncode(buf) { - return Array.from(buf) - .map(function(x) { - return ("0" + x.toString(16)).substr(-2); - }) - .join(""); -} - -async function fetch_json(url, options) { - const response = await fetch(url, options); - const body = await response.json(); - if (body.fail) - throw body.fail; - return body; -} - -if (typeof window.Kagi === 'undefined') { - let Kagi = window.Kagi || { - begin_activate: '/kagi/api/begin-activate/', - begin_assertion: '/kagi/api/begin-assertion/', - verify_credential_info: '/kagi/api/verify-credential-info/', - verify_assertion: '/kagi/api/verify-assertion/', - keys_list: '/kagi/keys/', - }; - console.error("window.Kagi is not defined, falling back to default URLs", Kagi); +/* Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Origin: https://github.com/pypi/warehouse + */ -} +const populateWebAuthnErrorList = (errors) => { + const errorList = document.getElementById("webauthn-errors"); + if (errorList === null) { + return; + } + /* NOTE: We only set the alert role once we actually have errors to present, + * to avoid hijacking screenreaders unnecessarily. + */ + errorList.setAttribute("role", "alert"); -/** - * REGISTRATION FUNCTIONS - */ + errors.forEach((error) => { + const errorItem = document.createElement("li"); + errorItem.appendChild(document.createTextNode(error)); + errorList.appendChild(errorItem); + }); +}; -/** - * Callback after the registration form is submitted. - * @param {Event} e - */ -const didClickRegister = async (e) => { - e.preventDefault(); - - // gather the data in the form - const form = document.querySelector('#register-form'); - const formData = new FormData(form); - - // post the data to the server to generate the PublicKeyCredentialCreateOptions - let credentialCreateOptionsFromServer; - try { - credentialCreateOptionsFromServer = await getCredentialCreateOptionsFromServer(formData); - } catch (err) { - return console.error("Failed to generate credential request options:", credentialCreateOptionsFromServer) - } +const doWebAuthn = (formId, func) => { + if (!window.PublicKeyCredential) { + return; + } - // convert certain members of the PublicKeyCredentialCreateOptions into - // byte arrays as expected by the spec. - const publicKeyCredentialCreateOptions = transformCredentialCreateOptions(credentialCreateOptionsFromServer); - - // request the authenticator(s) to create a new credential keypair. - let credential; - try { - credential = await navigator.credentials.create({ - publicKey: publicKeyCredentialCreateOptions - }); - } catch (err) { - return console.error("Error creating credential:", err); - } + const webAuthnForm = document.getElementById(formId); + if (webAuthnForm === null) { + return null; + } - // we now have a new credential! We now need to encode the byte arrays - // in the credential into strings, for posting to our server. - const newAssertionForServer = transformNewAssertionForServer(credential); - - // post the transformed credential data to the server for validation - // and storing the public key - let assertionValidationResponse; - try { - assertionValidationResponse = await postNewAssertionToServer(newAssertionForServer); - } catch (err) { - return console.error("Server validation of credential failed:", err); - } + const webAuthnButton = webAuthnForm.querySelector("button[type=submit]"); + webAuthnButton.disabled = false; - // reload the page after a successful result - window.location.href = Kagi.keys_list; -} + webAuthnForm.addEventListener("submit", async () => { + func(webAuthnButton.value); + event.preventDefault(); + }); +}; -/** - * Get PublicKeyCredentialRequestOptions for this user from the server - * formData of the registration form - * @param {FormData} formData - */ -const getCredentialRequestOptionsFromServer = async (formData) => { - return await fetch_json( - Kagi.begin_assertion, - { - method: "POST", - body: formData - } - ); -} +const webAuthnBtoA = (encoded) => { + return btoa(encoded) + .replace(/\+/g, "-") + .replace(/\//g, "_") + .replace(/=/g, ""); +}; + +const webAuthnBase64Normalize = (encoded) => { + return encoded.replace(/_/g, "/").replace(/-/g, "+"); +}; -const transformCredentialRequestOptions = (credentialRequestOptionsFromServer) => { - let {challenge, allowCredentials} = credentialRequestOptionsFromServer; - challenge = Uint8Array.from(atob(challenge), c => c.charCodeAt(0)); +const transformAssertionOptions = (assertionOptions) => { + let { challenge, allowCredentials } = assertionOptions; - allowCredentials = allowCredentials.map(credentialDescriptor => { - let {id} = credentialDescriptor; - id = id.replace(/\_/g, "/").replace(/\-/g, "+"); - id = Uint8Array.from(atob(id), c => c.charCodeAt(0)); - return Object.assign({}, credentialDescriptor, {id}); + challenge = Uint8Array.from(challenge, (c) => c.charCodeAt(0)); + allowCredentials = allowCredentials.map((credentialDescriptor) => { + let { id } = credentialDescriptor; + id = webAuthnBase64Normalize(id); + id = Uint8Array.from(atob(id), (c) => c.charCodeAt(0)); + return Object.assign({}, credentialDescriptor, { id }); }); - const transformedCredentialRequestOptions = Object.assign( - {}, - credentialRequestOptionsFromServer, - {challenge, allowCredentials}); + const transformedOptions = Object.assign({}, assertionOptions, { + challenge, + allowCredentials, + }); - return transformedCredentialRequestOptions; + return transformedOptions; }; +const transformAssertion = (assertion) => { + const authData = new Uint8Array(assertion.response.authenticatorData); + const clientDataJSON = new Uint8Array(assertion.response.clientDataJSON); + const rawId = new Uint8Array(assertion.rawId); + const sig = new Uint8Array(assertion.response.signature); + const assertionClientExtensions = assertion.getClientExtensionResults(); + + return { + id: assertion.id, + rawId: webAuthnBtoA(String.fromCharCode(...rawId)), + response: { + authenticatorData: webAuthnBtoA(String.fromCharCode(...authData)), + clientDataJSON: webAuthnBtoA(String.fromCharCode(...clientDataJSON)), + signature: webAuthnBtoA(String.fromCharCode(...sig)), + }, + type: assertion.type, + assertionClientExtensions: JSON.stringify(assertionClientExtensions), + }; +}; -/** - * Get PublicKeyCredentialRequestOptions for this user from the server - * formData of the registration form - * @param {FormData} formData - */ -const getCredentialCreateOptionsFromServer = async (formData) => { - return await fetch_json( - Kagi.begin_activate, - { - method: "POST", - body: formData - } - ); -} +const transformCredentialOptions = (credentialOptions) => { + let { challenge, user } = credentialOptions; + user.id = Uint8Array.from(credentialOptions.user.id, (c) => c.charCodeAt(0)); + challenge = Uint8Array.from(credentialOptions.challenge, (c) => + c.charCodeAt(0) + ); -/** - * Transforms items in the credentialCreateOptions generated on the server - * into byte arrays expected by the navigator.credentials.create() call - * @param {Object} credentialCreateOptionsFromServer - */ -const transformCredentialCreateOptions = (credentialCreateOptionsFromServer) => { - let {challenge, user} = credentialCreateOptionsFromServer; - user.id = Uint8Array.from( - atob(credentialCreateOptionsFromServer.user.id), c => c.charCodeAt(0)); + const transformedOptions = Object.assign({}, credentialOptions, { + challenge, + user, + }); - challenge = Uint8Array.from( - atob(credentialCreateOptionsFromServer.challenge), c => c.charCodeAt(0)); + return transformedOptions; +}; - const transformedCredentialCreateOptions = Object.assign( - {}, credentialCreateOptionsFromServer, - {challenge, user}); +const transformCredential = (credential) => { + const attObj = new Uint8Array(credential.response.attestationObject); + const clientDataJSON = new Uint8Array(credential.response.clientDataJSON); + const rawId = new Uint8Array(credential.rawId); + const registrationClientExtensions = credential.getClientExtensionResults(); + + return { + id: credential.id, + rawId: webAuthnBtoA(String.fromCharCode(...rawId)), + type: credential.type, + response: { + attestationObject: webAuthnBtoA(String.fromCharCode(...attObj)), + clientDataJSON: webAuthnBtoA(String.fromCharCode(...clientDataJSON)), + }, + registrationClientExtensions: JSON.stringify(registrationClientExtensions), + }; +}; - return transformedCredentialCreateOptions; -} +const postCredential = async (keyName, credential, token) => { + const formData = new FormData(); + formData.set("key_name", keyName); + formData.set("credentials", JSON.stringify(credential)); + formData.set("csrf_token", token); + + const resp = await fetch(Kagi.verify_credential_info, { + method: "POST", + cache: "no-cache", + body: formData, + credentials: "same-origin", + }); + return await resp.json(); +}; +const postAssertion = async (assertion, token) => { + const formData = new FormData(); + formData.set("credentials", JSON.stringify(assertion)); + formData.set("csrf_token", token); -/** - * AUTHENTICATION FUNCTIONS - */ + const resp = await fetch(Kagi.verify_assertion + window.location.search, { + method: "POST", + cache: "no-cache", + body: formData, + credentials: "same-origin", + }); + return await resp.json(); +}; -/** - * Callback executed after submitting login form - * @param {Event} e - */ -const didClickLogin = async (e) => { - console.log("Login clicked"); - document.getElementById("webauthn-error").innerHTML = ""; - e.preventDefault(); - // gather the data in the form - const form = document.querySelector('#login-form'); - const formData = new FormData(form); - - // post the login data to the server to retrieve the PublicKeyCredentialRequestOptions - let credentialCreateOptionsFromServer; - try { - credentialRequestOptionsFromServer = await getCredentialRequestOptionsFromServer(formData); - } catch (err) { - return console.error("Error when getting request options from server:", err); +const GuardWebAuthn = () => { + if (!window.PublicKeyCredential) { + let webauthn_button = document.getElementById("webauthn-button"); + if (webauthn_button) { + webauthn_button.className += " button--disabled"; } - // convert certain members of the PublicKeyCredentialRequestOptions into - // byte arrays as expected by the spec. - const transformedCredentialRequestOptions = transformCredentialRequestOptions( - credentialRequestOptionsFromServer); - - // request the authenticator to create an assertion signature using the - // credential private key - let assertion; - try { - assertion = await navigator.credentials.get({ - publicKey: transformedCredentialRequestOptions, - }); - } catch (err) { - document.getElementById("webauthn-error").innerHTML = "Connection failed during credential creation."; - return console.error("Error when creating credential:", err); - } - // we now have an authentication assertion! encode the byte arrays contained - // in the assertion data as strings for posting to the server - const transformedAssertionForServer = transformAssertionForServer(assertion); - - // post the assertion to the server for verification. - let response; - try { - response = await postAssertionToServer(transformedAssertionForServer); - } catch (err) { - document.getElementById("webauthn-error").innerHTML = "Error when validating assertion on server."; - return console.error("Error when validating assertion on server:", err); + let webauthn_error = document.getElementById("webauthn-browser-support"); + if (webauthn_error) { + webauthn_error.style.display = "block"; } - window.location.href = response["redirect_to"]; + let webauthn_label = document.getElementById("webauthn-provision-label"); + if (webauthn_label) { + webauthn_label.disabled = true; + } + } }; -/** - * Transforms the binary data in the credential into base64 strings - * for posting to the server. - * @param {PublicKeyCredential} newAssertion - */ -const transformNewAssertionForServer = (newAssertion) => { - const attObj = new Uint8Array( - newAssertion.response.attestationObject); - const clientDataJSON = new Uint8Array( - newAssertion.response.clientDataJSON); - const rawId = new Uint8Array( - newAssertion.rawId); - - const registrationClientExtensions = newAssertion.getClientExtensionResults(); - - return { - id: newAssertion.id, - rawId: b64enc(rawId), - type: newAssertion.type, - attObj: b64enc(attObj), - clientData: b64enc(clientDataJSON), - registrationClientExtensions: JSON.stringify(registrationClientExtensions) - }; -} - -/** - * Posts the new credential data to the server for validation and storage. - * @param {Object} credentialDataForServer - */ -const postNewAssertionToServer = async (credentialDataForServer) => { - const formData = new FormData(); - Object.entries(credentialDataForServer).forEach(([key, value]) => { - formData.set(key, value); - }); +const ProvisionWebAuthn = () => { + doWebAuthn("webauthn-provision-form", async (csrfToken) => { + const label = document.getElementById("id_key_name").value; - return await fetch_json( - Kagi.verify_credential_info, { - method: "POST", - body: formData + const resp = await fetch(Kagi.begin_activate, { + cache: "no-cache", + credentials: "same-origin", }); -} -/** - * Encodes the binary data in the assertion into strings for posting to the server. - * @param {PublicKeyCredential} newAssertion - */ -const transformAssertionForServer = (newAssertion) => { - const authData = new Uint8Array(newAssertion.response.authenticatorData); - const clientDataJSON = new Uint8Array(newAssertion.response.clientDataJSON); - const rawId = new Uint8Array(newAssertion.rawId); - const sig = new Uint8Array(newAssertion.response.signature); - const assertionClientExtensions = newAssertion.getClientExtensionResults(); - - return { - id: newAssertion.id, - rawId: b64enc(rawId), - type: newAssertion.type, - authData: b64RawEnc(authData), - clientData: b64RawEnc(clientDataJSON), - signature: hexEncode(sig), - assertionClientExtensions: JSON.stringify(assertionClientExtensions) - }; + const credentialOptions = await resp.json(); + const transformedOptions = transformCredentialOptions(credentialOptions); + await navigator.credentials + .create({ + publicKey: transformedOptions, + }) + .then(async (credential) => { + const transformedCredential = transformCredential(credential); + + const status = await postCredential( + label, + transformedCredential, + csrfToken + ); + if (status.fail) { + populateWebAuthnErrorList(status.fail.errors); + return; + } + + window.location.replace(Kagi.keys_list); + }) + .catch((error) => { + console.log(error); + populateWebAuthnErrorList([error.message]); + return; + }); + }); }; -/** - * Post the assertion to the server for validation and logging the user in. - * @param {Object} assertionDataForServer - */ -const postAssertionToServer = async (assertionDataForServer) => { - const form = document.querySelector('#login-form'); - const formData = new FormData(form); - Object.entries(assertionDataForServer).forEach(([key, value]) => { - formData.set(key, value); +const AuthenticateWebAuthn = () => { + doWebAuthn("webauthn-auth-form", async (csrfToken) => { + const resp = await fetch(Kagi.begin_assertion + window.location.search, { + cache: "no-cache", + credentials: "same-origin", }); - return await fetch_json( - Kagi.verify_assertion, { - method: "POST", - body: formData - }); -} + const assertionOptions = await resp.json(); + if (assertionOptions.fail) { + window.location.replace("/account/"); + return; + } + const transformedOptions = transformAssertionOptions(assertionOptions); + await navigator.credentials + .get({ + publicKey: transformedOptions, + }) + .then(async (assertion) => { + const transformedAssertion = transformAssertion(assertion); + + const status = await postAssertion(transformedAssertion, csrfToken); + if (status.fail) { + populateWebAuthnErrorList(status.fail.errors); + return; + } -document.addEventListener("DOMContentLoaded", e => { - const registerElement = document.querySelector('#register'); + window.location.replace(status.redirect_to); + }) + .catch((error) => { + populateWebAuthnErrorList([error.message]); + return; + }); + }); +}; + +document.addEventListener("DOMContentLoaded", (e) => { + const registerElement = document.querySelector("#webauthn-provision-form"); if (registerElement) { - registerElement.addEventListener('click', didClickRegister); + ProvisionWebAuthn(); } - const loginElement = document.querySelector('#login'); + const loginElement = document.querySelector("#webauthn-auth-form"); if (loginElement) { - loginElement.addEventListener('click', didClickLogin); + AuthenticateWebAuthn(); } // If browser doesn't support WebAuthn, hide related elements and show warning - if (typeof(PublicKeyCredential) == "undefined") { + if (typeof PublicKeyCredential == "undefined") { var webAuthnFeature = document.getElementById("webauthn-feature"); if (webAuthnFeature) { webAuthnFeature.style.display = "none"; } - var webAuthnUndefinedError = document.getElementById("webauthn-undefined-error"); + var webAuthnUndefinedError = document.getElementById( + "webauthn-undefined-error" + ); if (webAuthnUndefinedError) { - webAuthnUndefinedError.style.display = "block"; } + webAuthnUndefinedError.style.display = "block"; } + } }); diff --git a/kagi/templates/kagi/add_key.html b/kagi/templates/kagi/add_key.html index ac1900f..3e8cc89 100644 --- a/kagi/templates/kagi/add_key.html +++ b/kagi/templates/kagi/add_key.html @@ -6,10 +6,10 @@ {{ block.super }}

{% trans 'To add a security key to your account, insert it, tap the button below, and accept the browser prompt to add the key.' %}

-
+ {% csrf_token %} {{ form }} - +
diff --git a/kagi/templates/kagi/verify_second_factor.html b/kagi/templates/kagi/verify_second_factor.html index 38ea971..d9d5c36 100644 --- a/kagi/templates/kagi/verify_second_factor.html +++ b/kagi/templates/kagi/verify_second_factor.html @@ -11,11 +11,11 @@ {% if forms.webauthn %} -
+ {% csrf_token %}
-
diff --git a/kagi/tests/test_util.py b/kagi/tests/test_util.py index 25bbf12..b9bfcc6 100644 --- a/kagi/tests/test_util.py +++ b/kagi/tests/test_util.py @@ -1,4 +1,4 @@ -from ..util import get_origin +from ..utils import get_origin def test_get_origin(rf): diff --git a/kagi/tests/test_webauthn.py b/kagi/tests/test_webauthn.py new file mode 100644 index 0000000..739ef6a --- /dev/null +++ b/kagi/tests/test_webauthn.py @@ -0,0 +1,195 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Origin: https://github.com/pypi/warehouse + +import pretend +import pytest +import webauthn as pywebauthn +from webauthn.authentication.verify_authentication_response import ( + VerifiedAuthentication, +) +from webauthn.helpers import base64url_to_bytes, bytes_to_base64url +from webauthn.helpers.structs import ( + AttestationFormat, + AuthenticationCredential, + AuthenticatorAssertionResponse, + AuthenticatorAttestationResponse, + PublicKeyCredentialType, + RegistrationCredential, +) +from webauthn.registration.verify_registration_response import VerifiedRegistration + +import kagi.utils.webauthn as webauthn + + +def test_generate_webauthn_challenge(): + challenge = webauthn.generate_webauthn_challenge() + + assert isinstance(challenge, bytes) + assert challenge == base64url_to_bytes(bytes_to_base64url(challenge)) + + +def test_verify_registration_response(monkeypatch): + fake_verified_registration = VerifiedRegistration( + credential_id=b"foo", + credential_public_key=b"bar", + sign_count=0, + aaguid="wutang", + fmt=AttestationFormat.NONE, + credential_type=PublicKeyCredentialType.PUBLIC_KEY, + user_verified=False, + attestation_object=b"foobar", + credential_device_type="single_device", + credential_backed_up=False, + ) + mock_verify_registration_response = pretend.call_recorder( + lambda *a, **kw: fake_verified_registration + ) + monkeypatch.setattr( + pywebauthn, "verify_registration_response", mock_verify_registration_response + ) + + resp = webauthn.verify_registration_response( + ( + '{"id": "foo", "rawId": "foo", "response": ' + '{"attestationObject": "foo", "clientDataJSON": "bar"}}' + ), + b"not_a_real_challenge", + rp_id="fake_rp_id", + origin="fake_origin", + ) + + assert mock_verify_registration_response.calls == [ + pretend.call( + credential=RegistrationCredential( + id="foo", + raw_id=b"~\x8a", + response=AuthenticatorAttestationResponse( + client_data_json=b"m\xaa", attestation_object=b"~\x8a" + ), + transports=None, + type=PublicKeyCredentialType.PUBLIC_KEY, + ), + expected_challenge=bytes_to_base64url(b"not_a_real_challenge").encode(), + expected_rp_id="fake_rp_id", + expected_origin="fake_origin", + require_user_verification=False, + ) + ] + assert resp == fake_verified_registration + + +def test_verify_registration_response_failure(monkeypatch): + monkeypatch.setattr( + pywebauthn, + "verify_registration_response", + pretend.raiser(pywebauthn.helpers.exceptions.InvalidRegistrationResponse), + ) + + with pytest.raises(webauthn.RegistrationRejectedError): + webauthn.verify_registration_response( + ( + '{"id": "foo", "rawId": "foo", "response": ' + '{"attestationObject": "foo", "clientDataJSON": "bar"}}' + ), + b"not_a_real_challenge", + rp_id="fake_rp_id", + origin="fake_origin", + ) + + +def test_verify_assertion_response(monkeypatch): + fake_verified_authentication = VerifiedAuthentication( + credential_id=b"a credential id", + new_sign_count=69, + credential_device_type="single_device", + credential_backed_up=False, + ) + mock_verify_authentication_response = pretend.call_recorder( + lambda *a, **kw: fake_verified_authentication + ) + monkeypatch.setattr( + pywebauthn, + "verify_authentication_response", + mock_verify_authentication_response, + ) + + not_a_real_user = pretend.stub( + webauthn_keys=pretend.stub( + all=lambda: [ + pretend.stub( + public_key=bytes_to_base64url(b"fake public key"), sign_count=68 + ) + ] + ) + ) + resp = webauthn.verify_assertion_response( + ( + '{"id": "foo", "rawId": "foo", "response": ' + '{"authenticatorData": "foo", "clientDataJSON": "bar", ' + '"signature": "wutang"}}' + ), + challenge=b"not_a_real_challenge", + user=not_a_real_user, + origin="fake_origin", + rp_id="fake_rp_id", + ) + + assert mock_verify_authentication_response.calls == [ + pretend.call( + credential=AuthenticationCredential( + id="foo", + raw_id=b"~\x8a", + response=AuthenticatorAssertionResponse( + client_data_json=b"m\xaa", + authenticator_data=b"~\x8a", + signature=b"\xc2\xebZ\x9e", + user_handle=None, + ), + type=PublicKeyCredentialType.PUBLIC_KEY, + ), + expected_challenge=b"bm90X2FfcmVhbF9jaGFsbGVuZ2U", + expected_rp_id="fake_rp_id", + expected_origin="fake_origin", + credential_public_key=b"fake public key", + credential_current_sign_count=68, + require_user_verification=False, + ) + ] + assert resp == fake_verified_authentication + + +def test_verify_assertion_response_failure(monkeypatch): + monkeypatch.setattr( + pywebauthn, + "verify_authentication_response", + pretend.raiser(pywebauthn.helpers.exceptions.InvalidAuthenticationResponse), + ) + + get_webauthn_users = pretend.call_recorder( + lambda *a, **kw: [(b"not a public key", 0)] + ) + monkeypatch.setattr(webauthn, "_get_webauthn_user_public_keys", get_webauthn_users) + + with pytest.raises(webauthn.AuthenticationRejectedError): + webauthn.verify_assertion_response( + ( + '{"id": "foo", "rawId": "foo", "response": ' + '{"authenticatorData": "foo", "clientDataJSON": "bar", ' + '"signature": "wutang"}}' + ), + challenge=b"not_a_real_challenge", + user=pretend.stub(), + origin="fake_origin", + rp_id="fake_rp_id", + ) diff --git a/kagi/tests/test_webauthn_keys.py b/kagi/tests/test_webauthn_keys.py index fdb67a7..99e638f 100644 --- a/kagi/tests/test_webauthn_keys.py +++ b/kagi/tests/test_webauthn_keys.py @@ -1,9 +1,23 @@ +import json from unittest import mock from django.contrib.auth.models import User from django.urls import reverse import pytest +from webauthn.authentication.verify_authentication_response import ( + VerifiedAuthentication, +) +from webauthn.helpers import bytes_to_base64url +from webauthn.helpers.structs import ( + AttestationFormat, + AuthenticationCredential, + AuthenticatorAssertionResponse, + PublicKeyCredentialType, +) +from webauthn.registration.verify_registration_response import VerifiedRegistration + +from kagi.utils import webauthn from .. import settings from ..forms import KeyRegistrationForm @@ -45,11 +59,8 @@ def test_totp_device_deletion_works(admin_client): # Testing view begin activate def test_begin_activate_return_user_credential_options(admin_client): - ukey = "Q3sM6zbLYAssRO7g5BM7" - with mock.patch("kagi.views.api.util.generate_ukey", return_value=ukey): - response = admin_client.post( - reverse("kagi:begin-activate"), {"key_name": "SoloKey"} - ) + response = admin_client.get(reverse("kagi:begin-activate")) + assert response.status_code == 200 credential_options = response.json() assert "challenge" in credential_options @@ -58,62 +69,41 @@ def test_begin_activate_return_user_credential_options(admin_client): "id": settings.RELYING_PARTY_ID, } assert credential_options["user"] == { - "id": ukey, + "id": bytes_to_base64url(b"1"), "name": "admin", - "displayName": "", - "icon": settings.WEBAUTHN_ICON_URL, + "displayName": "admin", } assert "pubKeyCredParams" in credential_options - assert credential_options["extensions"] == {"webauthn.loc": True} - - -def test_begin_activate_fails_if_key_name_is_missing(admin_client): - response = admin_client.post(reverse("kagi:begin-activate"), {"key_name": ""}) - assert response.status_code == 400 - assert response.json() == {"errors": {"key_name": ["This field is required."]}} # Testing view verify credential info def test_webauthn_verify_credential_info(admin_client): # Setup the session - response = admin_client.post( - reverse("kagi:begin-activate"), {"key_name": "SoloKey"} - ) - credential_options = response.json() - challenge = credential_options["challenge"] + response = admin_client.get(reverse("kagi:begin-activate")) - trusted_attestation_cert_required = ( - settings.WEBAUTHN_TRUSTED_ATTESTATION_CERT_REQUIRED + fake_validated_credential = VerifiedRegistration( + credential_id=b"foo", + credential_public_key=b"bar", + sign_count=0, + aaguid="wutang", + fmt=AttestationFormat.NONE, + credential_type=PublicKeyCredentialType.PUBLIC_KEY, + user_verified=False, + attestation_object=b"foobar", + credential_device_type="single_device", + credential_backed_up=False, ) - self_attestation_permitted = settings.WEBAUTHN_SELF_ATTESTATION_PERMITTED - none_attestation_permitted = settings.WEBAUTHN_NONE_ATTESTATION_PERMITTED - - with mock.patch("kagi.views.api.webauthn") as mocked_webauthn: - webauthn_registration_response = ( - mocked_webauthn.WebAuthnRegistrationResponse.return_value - ) - verify = webauthn_registration_response.verify.return_value - verify.public_key.decode.return_value = "public-key" - verify.credential_id.decode.return_value = "credential-id" - verify.sign_count = 0 - + with mock.patch( + "kagi.views.api.webauthn.verify_registration_response", + return_value=fake_validated_credential, + ) as mocked_verify_registration_response: response = admin_client.post( - reverse("kagi:verify-credential-info"), {"registration": "payload"} + reverse("kagi:verify-credential-info"), + {"credentials": "fake_payload", "key_name": "SoloKey"}, ) - mocked_webauthn.WebAuthnRegistrationResponse.assert_called_with( - settings.RELYING_PARTY_ID, - "http://testserver", - {"registration": ["payload"]}, - challenge, - settings.WEBAUTHN_TRUSTED_CERTIFICATES, - trusted_attestation_cert_required, - self_attestation_permitted, - none_attestation_permitted, - uv_required=False, # User validation - ) - webauthn_registration_response.verify.assert_called_once() + assert mocked_verify_registration_response.called_once assert response.status_code == 200 assert response.json() == {"success": "User successfully registered."} @@ -121,19 +111,18 @@ def test_webauthn_verify_credential_info(admin_client): def test_webauthn_verify_credential_info_fails_if_registration_is_invalid(admin_client): # Setup the session - response = admin_client.post( - reverse("kagi:begin-activate"), {"key_name": "SoloKey"} - ) + response = admin_client.get(reverse("kagi:begin-activate")) - with mock.patch("kagi.views.api.webauthn") as mocked_webauthn: - webauthn_registration_response = ( - mocked_webauthn.WebAuthnRegistrationResponse.return_value + with mock.patch( + "kagi.views.api.webauthn.verify_registration_response" + ) as mocked_verify_registration_response: + mocked_verify_registration_response.side_effect = ( + webauthn.RegistrationRejectedError("An error occurred") ) - verify = webauthn_registration_response.verify - verify.side_effect = ValueError("An error occurred") response = admin_client.post( - reverse("kagi:verify-credential-info"), {"registration": "payload"} + reverse("kagi:verify-credential-info"), + {"credentials": "payload", "key_name": "SoloKey"}, ) assert response.status_code == 400 @@ -144,31 +133,53 @@ def test_webauthn_verify_credential_info_fails_if_credential_id_already_exists( admin_client, ): # Setup the session - response = admin_client.post( - reverse("kagi:begin-activate"), {"key_name": "SoloKey"} - ) + response = admin_client.get(reverse("kagi:begin-activate")) # Create the WebAuthnKey user = User.objects.get(pk=1) user.webauthn_keys.create( - key_name="SoloKey", sign_count=0, credential_id="credential-id" + key_name="SoloKey", sign_count=0, credential_id=bytes_to_base64url(b"foo") ) - with mock.patch("kagi.views.api.webauthn") as mocked_webauthn: - webauthn_registration_response = ( - mocked_webauthn.WebAuthnRegistrationResponse.return_value - ) - verify = webauthn_registration_response.verify.return_value - verify.credential_id.decode.return_value = "credential-id" - + fake_validated_credential = VerifiedRegistration( + credential_id=b"foo", + credential_public_key=b"bar", + sign_count=0, + aaguid="wutang", + fmt=AttestationFormat.NONE, + credential_type=PublicKeyCredentialType.PUBLIC_KEY, + user_verified=False, + attestation_object=b"foobar", + credential_device_type="single_device", + credential_backed_up=False, + ) + with mock.patch( + "kagi.views.api.webauthn.verify_registration_response", + return_value=fake_validated_credential, + ): response = admin_client.post( - reverse("kagi:verify-credential-info"), {"registration": "payload"} + reverse("kagi:verify-credential-info"), + {"credentials": "fake_payload", "key_name": "Solo key"}, ) assert response.status_code == 400 assert response.json() == {"fail": "Credential ID already exists."} +def test_webauthn_verify_credential_info_fails_if_key_name_is_missing( + admin_client, +): + # Setup the session + response = admin_client.get(reverse("kagi:begin-activate")) + + response = admin_client.post( + reverse("kagi:verify-credential-info"), {"credentials": "fake_payload"} + ) + + assert response.status_code == 400 + assert response.json() == {"errors": {"key_name": ["This field is required."]}} + + # Testing view begin assertion @pytest.mark.django_db def test_begin_assertion_return_user_credential_options(client): @@ -177,54 +188,48 @@ def test_begin_assertion_return_user_credential_options(client): user.webauthn_keys.create( key_name="SoloKey 1", sign_count=0, - credential_id="credential-id-1", - ukey="abcd", - public_key="pubkey1", + credential_id=bytes_to_base64url(b"credential-id-1"), + public_key=bytes_to_base64url(b"pubkey1"), ) user.webauthn_keys.create( key_name="SoloKey 2", sign_count=0, - credential_id="credential-id-2", - ukey="efgh", - public_key="pubkey2", + credential_id=bytes_to_base64url(b"credential-id-2"), + public_key=bytes_to_base64url(b"pubkey2"), ) - ukey = "Q3sM6zbLYAssRO7g5BM7" - challenge = "k31d65xGDFb0VUq4MEMXmWpuWkzPs889" - - with mock.patch("kagi.views.api.util.generate_ukey", return_value=ukey): - with mock.patch( - "kagi.views.api.util.generate_challenge", return_value=challenge - ): - # We authenticate with username/password - response = client.post( - reverse("kagi:login"), {"username": "admin", "password": "admin"} - ) + challenge = b"k31d65xGDFb0VUq4MEMXmWpuWkzPs889" + + assertion_dict = { + "challenge": bytes_to_base64url(challenge), + "timeout": 60000, + "rpId": "localhost", + "allowCredentials": [ + { + "id": bytes_to_base64url(b"credential-id-1"), + "type": "public-key", + "transports": ["usb", "nfc", "ble", "internal"], + }, + { + "id": bytes_to_base64url(b"credential-id-2"), + "type": "public-key", + "transports": ["usb", "nfc", "ble", "internal"], + }, + ], + "userVerification": "discouraged", + } + + # We authenticate with username/password + response = client.post( + reverse("kagi:login"), {"username": "admin", "password": "admin"} + ) assert response.status_code == 302 assert response.url == reverse("kagi:verify-second-factor") - with mock.patch("kagi.views.api.webauthn") as mocked_webauthn: - assertion_dict = { - "challenge": "tOOk7MPjGWlezrP6o6tGOXSH0ZesUREO", - "allowCredentials": [ - { - "type": "public-key", - "id": "ePqP9Mi...512GSYg", - "transports": ["usb", "nfc", "ble", "internal"], - }, - { - "type": "public-key", - "id": "qhibXokRKbPA...O1WW7nF", - "transports": ["usb", "nfc", "ble", "internal"], - }, - ], - "rpId": "localhost", - "timeout": 60000, - } - mocked_webauthn.WebAuthnAssertionOptions.return_value.assertion_dict = ( - assertion_dict - ) - response = client.post(reverse("kagi:begin-assertion")) + with mock.patch( + "kagi.views.api.webauthn.generate_webauthn_challenge", return_value=challenge + ): + response = client.get(reverse("kagi:begin-assertion")) assert response.status_code == 200 assert response.json() == assertion_dict @@ -238,9 +243,8 @@ def test_verify_assertion_validates_the_user_webauthn_key(client): user.webauthn_keys.create( key_name="SoloKey", sign_count=0, - credential_id="credential-id", - ukey="abcd", - public_key="pubkey", + credential_id=bytes_to_base64url(b"credential-id"), + public_key=bytes_to_base64url(b"pubkey"), ) response = client.post( reverse("kagi:login"), {"username": "admin", "password": "admin"} @@ -248,43 +252,31 @@ def test_verify_assertion_validates_the_user_webauthn_key(client): assert response.status_code == 302 assert response.url == reverse("kagi:verify-second-factor") - # We authenticate with username/password - challenge = "k31d65xGDFb0VUq4MEMXmWpuWkzPs889" - - with mock.patch("kagi.views.api.util.generate_challenge", return_value=challenge): - response = client.post(reverse("kagi:begin-assertion")) - - with mock.patch("kagi.views.api.webauthn") as mocked_webauthn: - webauthn_assertion_response = ( - mocked_webauthn.WebAuthnAssertionResponse.return_value - ) - verify = webauthn_assertion_response.verify - verify.return_value = 1 + response = client.get(reverse("kagi:verify-second-factor")) + assert response.status_code == 200 + # We authenticate with username/password + challenge = b"k31d65xGDFb0VUq4MEMXmWpuWkzPs889" + + with mock.patch( + "kagi.views.api.webauthn.generate_webauthn_challenge", return_value=challenge + ): + response = client.get(reverse("kagi:begin-assertion")) + + fake_verified_authentication = VerifiedAuthentication( + credential_id=b"credential-id", + new_sign_count=69, + credential_device_type="single_device", + credential_backed_up=False, + ) + with mock.patch( + "kagi.views.api.webauthn.verify_assertion_response", + return_value=fake_verified_authentication, + ): response = client.post( reverse("kagi:verify-assertion"), - {"id": "credential-id", "assertion": "payload"}, + {"credentials": json.dumps({"fake": "payload"})}, ) - mocked_webauthn.WebAuthnUser.assert_called_with( - "abcd", - "admin", - "", - settings.WEBAUTHN_ICON_URL, - "credential-id", - "pubkey", - 0, - settings.RELYING_PARTY_ID, - ) - - webauthn_user = mocked_webauthn.WebAuthnUser.return_value - webauthn_assertion_response = mocked_webauthn.WebAuthnAssertionResponse - webauthn_assertion_response.assert_called_with( - webauthn_user, - {"id": ["credential-id"], "assertion": ["payload"]}, - challenge, - "http://testserver", - uv_required=False, - ) assert response.status_code == 200 assert response.json() == { @@ -292,49 +284,6 @@ def test_verify_assertion_validates_the_user_webauthn_key(client): "redirect_to": reverse("kagi:two-factor-settings"), } - # Are we truly logged in? - response = client.get(reverse("kagi:two-factor-settings")) - assert response.status_code == 200 - - -# Testing view verify assertion -@pytest.mark.django_db -def test_verify_assertion_fails_if_missing_user_webauthn_key(client): - # We need to create a couple of WebAuthnKey for our user. - user = User.objects.create_user("admin", "john.doe@kagi.com", "admin") - user.webauthn_keys.create( - key_name="SoloKey", - sign_count=0, - credential_id="wrong-id", - ukey="abcd", - public_key="pubkey", - ) - response = client.post( - reverse("kagi:login"), {"username": "admin", "password": "admin"} - ) - assert response.status_code == 302 - assert response.url == reverse("kagi:verify-second-factor") - - # We authenticate with username/password - challenge = "k31d65xGDFb0VUq4MEMXmWpuWkzPs889" - - with mock.patch("kagi.views.api.util.generate_challenge", return_value=challenge): - response = client.post(reverse("kagi:begin-assertion")) - - with mock.patch("kagi.views.api.webauthn") as mocked_webauthn: - webauthn_assertion_response = ( - mocked_webauthn.WebAuthnAssertionResponse.return_value - ) - verify = webauthn_assertion_response.verify - verify.return_value = 1 - - response = client.post( - reverse("kagi:verify-assertion"), - {"id": "credential-id", "assertion": "payload"}, - ) - assert response.status_code == 400 - assert response.json() == {"fail": "Key does not exist."} - @pytest.mark.django_db def test_verify_assertion_validates_the_assertion(client): @@ -343,9 +292,8 @@ def test_verify_assertion_validates_the_assertion(client): user.webauthn_keys.create( key_name="SoloKey", sign_count=0, - credential_id="credential-id", - ukey="abcd", - public_key="pubkey", + credential_id=bytes_to_base64url(b"credential-id"), + public_key=bytes_to_base64url(b"pubkey"), ) response = client.post( reverse("kagi:login"), {"username": "admin", "password": "admin"} @@ -354,25 +302,33 @@ def test_verify_assertion_validates_the_assertion(client): assert response.url == reverse("kagi:verify-second-factor") # We authenticate with username/password - challenge = "k31d65xGDFb0VUq4MEMXmWpuWkzPs889" - - response = client.get(reverse("kagi:verify-second-factor")) - assert response.status_code == 200 - - with mock.patch("kagi.views.api.util.generate_challenge", return_value=challenge): - response = client.post(reverse("kagi:begin-assertion")) - - with mock.patch("kagi.views.api.webauthn") as mocked_webauthn: - webauthn_assertion_response = ( - mocked_webauthn.WebAuthnAssertionResponse.return_value - ) - verify = webauthn_assertion_response.verify - verify.side_effect = ValueError("An error occurred") - + challenge = b"k31d65xGDFb0VUq4MEMXmWpuWkzPs889" + + with mock.patch( + "kagi.views.api.webauthn.generate_webauthn_challenge", return_value=challenge + ): + response = client.get(reverse("kagi:begin-assertion")) + + with mock.patch( + "kagi.views.api.webauthn.AuthenticationCredential.parse_raw", + return_value=AuthenticationCredential( + id="foo", + raw_id=b"~\x8a", + response=AuthenticatorAssertionResponse( + client_data_json=b'{"type": "webauthn.get", "challenge": "", "origin": "localhost"}', + authenticator_data=b"~\x8a", + signature=b"\xc2\xebZ\x9e", + user_handle=None, + ), + type=PublicKeyCredentialType.PUBLIC_KEY, + ), + ): response = client.post( reverse("kagi:verify-assertion"), - {"id": "credential-id", "assertion": "payload"}, + {"credentials": json.dumps({"fake": "payload"})}, ) assert response.status_code == 400 - assert response.json() == {"fail": "Assertion failed. Error: An error occurred"} + assert response.json() == { + "fail": "Assertion failed. Error: Invalid WebAuthn credential" + } diff --git a/kagi/util.py b/kagi/util.py deleted file mode 100644 index b9da18d..0000000 --- a/kagi/util.py +++ /dev/null @@ -1,47 +0,0 @@ -import random -import string - -from django.conf import settings -from django.contrib.auth import load_backend - - -def generate_challenge(challenge_len): - return "".join( - [ - random.SystemRandom().choice(string.ascii_letters + string.digits) - for i in range(challenge_len) - ] - ) - - -def generate_ukey(): - """Its value's ID member is required, and contains an identifier - for the account, specified by the Relying Party. This is not meant - to be displayed to the user, but is used by the Relying Party to - control the number of credentials -- an authenticator will never - contain more than one credential for a given Relying Party under - the same ID. - - A unique identifier for the entity. For a relying party entity, - sets the RP ID. For a user account entity, this will be an - arbitrary string specified by the relying party. - """ - return generate_challenge(20) - - -def get_origin(request): - return f"{request.scheme}://{request.get_host()}" - - -def get_user(request): - try: - user_id = request.session["kagi_pre_verify_user_pk"] - backend_path = request.session["kagi_pre_verify_user_backend"] - assert backend_path in settings.AUTHENTICATION_BACKENDS - backend = load_backend(backend_path) - user = backend.get_user(user_id) - if user is not None: - user.backend = backend_path - return user - except (KeyError, AssertionError): # pragma: no cover - return None diff --git a/kagi/utils/__init__.py b/kagi/utils/__init__.py new file mode 100644 index 0000000..6874a1c --- /dev/null +++ b/kagi/utils/__init__.py @@ -0,0 +1,20 @@ +from django.conf import settings +from django.contrib.auth import load_backend + + +def get_origin(request): + return f"{request.scheme}://{request.get_host()}" + + +def get_user(request): + try: + user_id = request.session["kagi_pre_verify_user_pk"] + backend_path = request.session["kagi_pre_verify_user_backend"] + assert backend_path in settings.AUTHENTICATION_BACKENDS + backend = load_backend(backend_path) + user = backend.get_user(user_id) + if user is not None: + user.backend = backend_path + return user + except (KeyError, AssertionError): # pragma: no cover + return None diff --git a/kagi/utils/webauthn.py b/kagi/utils/webauthn.py new file mode 100644 index 0000000..ba2fee7 --- /dev/null +++ b/kagi/utils/webauthn.py @@ -0,0 +1,182 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Origin: https://github.com/pypi/warehouse + +import base64 +import json + +import webauthn as pywebauthn +from webauthn.helpers import base64url_to_bytes, generate_challenge +from webauthn.helpers.exceptions import ( + InvalidAuthenticationResponse, + InvalidRegistrationResponse, +) +from webauthn.helpers.options_to_json import options_to_json +from webauthn.helpers.structs import ( + AttestationConveyancePreference, + AuthenticationCredential, + AuthenticatorSelectionCriteria, + AuthenticatorTransport, + PublicKeyCredentialDescriptor, + RegistrationCredential, + UserVerificationRequirement, +) + + +class AuthenticationRejectedError(Exception): + pass + + +class RegistrationRejectedError(Exception): + pass + + +def _get_webauthn_user_public_key_credential_descriptors(user, *, rp_id): + """ + Returns a webauthn.WebAuthnUser instance corresponding + to the given user model, with properties suitable for + usage within the webauthn API. + """ + return [ + PublicKeyCredentialDescriptor( + id=base64url_to_bytes(credential.credential_id), + transports=[ + AuthenticatorTransport.USB, + AuthenticatorTransport.NFC, + AuthenticatorTransport.BLE, + AuthenticatorTransport.INTERNAL, + ], + ) + for credential in user.webauthn_keys.all() + ] + + +def _get_webauthn_user_public_keys(user, *, rp_id): + return [ + ( + base64url_to_bytes(credential.public_key), + credential.sign_count, + ) + for credential in user.webauthn_keys.all() + ] + + +def _webauthn_b64encode(source): + return base64.urlsafe_b64encode(source).rstrip(b"=") + + +def generate_webauthn_challenge(): + """ + Returns a random challenge suitable for use within + Webauthn's credential and configuration option objects. + + See: https://w3c.github.io/webauthn/#cryptographic-challenges + """ + return generate_challenge() + + +def get_credential_options(user, *, challenge, rp_name, rp_id): + """ + Returns a dictionary of options for credential creation + on the client side. + """ + _authenticator_selection = AuthenticatorSelectionCriteria() + _authenticator_selection.user_verification = UserVerificationRequirement.DISCOURAGED + options = pywebauthn.generate_registration_options( + rp_id=rp_id, + rp_name=rp_name, + user_id=str(user.id), + user_name=user.get_username(), + user_display_name=user.get_full_name(), + challenge=challenge, + attestation=AttestationConveyancePreference.NONE, + authenticator_selection=_authenticator_selection, + ) + return json.loads(options_to_json(options)) + + +def get_assertion_options(user, *, challenge, rp_id): + """ + Returns a dictionary of options for assertion retrieval + on the client side. + """ + options = pywebauthn.generate_authentication_options( + rp_id=rp_id, + challenge=challenge, + allow_credentials=_get_webauthn_user_public_key_credential_descriptors( + user, rp_id=rp_id + ), + user_verification=UserVerificationRequirement.DISCOURAGED, + ) + return json.loads(options_to_json(options)) + + +def verify_registration_response(response, challenge, *, rp_id, origin): + """ + Validates the challenge and attestation information + sent from the client during device registration. + + Returns a WebAuthnCredential on success. + Raises RegistrationRejectedError on failire. + """ + # NOTE: We re-encode the challenge below, because our + # response's clientData.challenge is encoded twice: + # first for the entire clientData payload, and then again + # for the individual challenge. + encoded_challenge = _webauthn_b64encode(challenge) + try: + _credential = RegistrationCredential.parse_raw(response) + return pywebauthn.verify_registration_response( + credential=_credential, + expected_challenge=encoded_challenge, + expected_rp_id=rp_id, + expected_origin=origin, + require_user_verification=False, + ) + except InvalidRegistrationResponse as e: + raise RegistrationRejectedError(str(e)) + + +def verify_assertion_response(assertion, *, challenge, user, origin, rp_id): + """ + Validates the challenge and assertion information + sent from the client during authentication. + + Returns an updated signage count on success. + Raises AuthenticationRejectedError on failure. + """ + # NOTE: We re-encode the challenge below, because our + # response's clientData.challenge is encoded twice: + # first for the entire clientData payload, and then again + # for the individual challenge. + encoded_challenge = _webauthn_b64encode(challenge) + webauthn_user_public_keys = _get_webauthn_user_public_keys(user, rp_id=rp_id) + + for public_key, current_sign_count in webauthn_user_public_keys: + try: + _credential = AuthenticationCredential.parse_raw(assertion) + return pywebauthn.verify_authentication_response( + credential=_credential, + expected_challenge=encoded_challenge, + expected_rp_id=rp_id, + expected_origin=origin, + credential_public_key=public_key, + credential_current_sign_count=current_sign_count, + require_user_verification=False, + ) + except InvalidAuthenticationResponse: + pass + + # If we exit the loop, then we've failed to verify the assertion against + # any of the user's WebAuthn credentials. Fail. + raise AuthenticationRejectedError("Invalid WebAuthn credential") diff --git a/kagi/views/api.py b/kagi/views/api.py index 4f5d67a..8c8de83 100644 --- a/kagi/views/api.py +++ b/kagi/views/api.py @@ -8,76 +8,53 @@ from django.views.decorators.csrf import csrf_exempt from django.views.decorators.http import require_http_methods -import webauthn +from webauthn.helpers import base64url_to_bytes, bytes_to_base64url -from .. import settings, util +from .. import settings, utils from ..forms import KeyRegistrationForm from ..models import WebAuthnKey +from ..utils import webauthn # Registration @login_required -@require_http_methods(["POST"]) +@require_http_methods(["GET"]) def webauthn_begin_activate(request): - form = KeyRegistrationForm(request.POST) - - if not form.is_valid(): - return JsonResponse({"errors": form.errors}, status=400) - - username = request.user.get_username() - display_name = request.user.get_full_name() + challenge = webauthn.generate_webauthn_challenge() - challenge = util.generate_challenge(32) - ukey = util.generate_ukey() + request.session["challenge"] = bytes_to_base64url(challenge) - request.session["key_name"] = form.cleaned_data["key_name"] - request.session["challenge"] = challenge - request.session["register_ukey"] = ukey - - make_credential_options = webauthn.WebAuthnMakeCredentialOptions( - challenge, - settings.RELYING_PARTY_NAME, - settings.RELYING_PARTY_ID, - ukey, - username, - display_name, - settings.WEBAUTHN_ICON_URL, + credential_options = webauthn.get_credential_options( + request.user, + challenge=challenge, + rp_name=settings.RELYING_PARTY_NAME, + rp_id=settings.RELYING_PARTY_ID, ) - return JsonResponse(make_credential_options.registration_dict) + return JsonResponse(credential_options) @login_required @csrf_exempt @require_http_methods(["POST"]) def webauthn_verify_credential_info(request): - challenge = request.session["challenge"] - ukey = request.session["register_ukey"] + challenge = base64url_to_bytes(request.session["challenge"]) + credentials = request.POST["credentials"] - registration_response = request.POST - trust_anchor_dir = settings.WEBAUTHN_TRUSTED_CERTIFICATES - trusted_attestation_cert_required = ( - settings.WEBAUTHN_TRUSTED_ATTESTATION_CERT_REQUIRED - ) - self_attestation_permitted = settings.WEBAUTHN_SELF_ATTESTATION_PERMITTED - none_attestation_permitted = settings.WEBAUTHN_NONE_ATTESTATION_PERMITTED - - webauthn_registration_response = webauthn.WebAuthnRegistrationResponse( - settings.RELYING_PARTY_ID, - util.get_origin(request), - registration_response, - challenge, - trust_anchor_dir, - trusted_attestation_cert_required, - self_attestation_permitted, - none_attestation_permitted, - uv_required=False, # User validation - ) + form = KeyRegistrationForm(request.POST) + + if not form.is_valid(): + return JsonResponse({"errors": form.errors}, status=400) try: - webauthn_credential = webauthn_registration_response.verify() - except Exception as e: + webauthn_registration_response = webauthn.verify_registration_response( + credentials, + rp_id=settings.RELYING_PARTY_ID, + origin=utils.get_origin(request), + challenge=challenge, + ) + except webauthn.RegistrationRejectedError as e: return JsonResponse({"fail": f"Registration failed. Error: {e}"}, status=400) # W3C spec. Step 17. @@ -88,23 +65,23 @@ def webauthn_verify_credential_info(request): # ceremony, or it MAY decide to accept the registration, e.g. while deleting # the older registration. credential_id_exists = WebAuthnKey.objects.filter( - credential_id=webauthn_credential.credential_id.decode("utf-8") + credential_id=bytes_to_base64url(webauthn_registration_response.credential_id) ).first() if credential_id_exists: return JsonResponse({"fail": "Credential ID already exists."}, status=400) WebAuthnKey.objects.create( user=request.user, - key_name=request.session.get("key_name", ""), - ukey=ukey, - public_key=webauthn_credential.public_key.decode("utf-8"), - credential_id=webauthn_credential.credential_id.decode("utf-8"), - sign_count=webauthn_credential.sign_count, + key_name=form.cleaned_data["key_name"], + public_key=bytes_to_base64url( + webauthn_registration_response.credential_public_key + ), + credential_id=bytes_to_base64url(webauthn_registration_response.credential_id), + sign_count=webauthn_registration_response.sign_count, ) try: del request.session["challenge"] - del request.session["register_ukey"] del request.session["key_name"] except KeyError: # pragma: no cover pass @@ -113,82 +90,43 @@ def webauthn_verify_credential_info(request): # Login -@require_http_methods(["POST"]) +@require_http_methods(["GET"]) def webauthn_begin_assertion(request): - challenge = util.generate_challenge(32) - request.session["challenge"] = challenge - - user = util.get_user(request) - - username = user.get_username() - display_name = user.get_full_name() - - keys = WebAuthnKey.objects.filter(user=user) - - webauthn_users = [] - for key in keys: - webauthn_users.append( - webauthn.WebAuthnUser( - key.ukey, - username, - display_name, - settings.WEBAUTHN_ICON_URL, - key.credential_id, - key.public_key, - key.sign_count, - settings.RELYING_PARTY_ID, - ) - ) + challenge = webauthn.generate_webauthn_challenge() + request.session["challenge"] = bytes_to_base64url(challenge) - webauthn_assertion_options = webauthn.WebAuthnAssertionOptions( - webauthn_users, challenge + user = utils.get_user(request) + + webauthn_assertion_options = webauthn.get_assertion_options( + user, challenge=challenge, rp_id=settings.RELYING_PARTY_ID ) - return JsonResponse(webauthn_assertion_options.assertion_dict) + return JsonResponse(webauthn_assertion_options) @csrf_exempt @require_http_methods(["POST"]) def webauthn_verify_assertion(request): - challenge = request.session.get("challenge") - assertion_response = request.POST - credential_id = assertion_response.get("id") - - user = util.get_user(request) - - username = user.get_username() - display_name = user.get_full_name() - - key = WebAuthnKey.objects.filter(credential_id=credential_id, user=user).first() - if not key: - return JsonResponse({"fail": "Key does not exist."}, status=400) - - webauthn_user = webauthn.WebAuthnUser( - key.ukey, - username, - display_name, - settings.WEBAUTHN_ICON_URL, - key.credential_id, - key.public_key, - key.sign_count, - settings.RELYING_PARTY_ID, - ) + challenge = base64url_to_bytes(request.session.get("challenge")) - webauthn_assertion_response = webauthn.WebAuthnAssertionResponse( - webauthn_user, - assertion_response, - challenge, - util.get_origin(request), - uv_required=False, # User Verification - ) + user = utils.get_user(request) try: - sign_count = webauthn_assertion_response.verify() - except Exception as e: + webauthn_assertion_response = webauthn.verify_assertion_response( + request.POST["credentials"], + challenge=challenge, + user=user, + origin=utils.get_origin(request), + rp_id=settings.RELYING_PARTY_ID, + ) + except webauthn.AuthenticationRejectedError as e: return JsonResponse({"fail": f"Assertion failed. Error: {e}"}, status=400) # Update counter. - key.sign_count = sign_count + key = user.webauthn_keys.get( + credential_id=bytes_to_base64url(webauthn_assertion_response.credential_id) + ) + key.sign_count = webauthn_assertion_response.new_sign_count key.last_used_at = now() key.save() @@ -211,7 +149,7 @@ def webauthn_verify_assertion(request): return JsonResponse( { - "success": f"Successfully authenticated as {username}", + "success": f"Successfully authenticated as {user.get_username()}", "redirect_to": redirect_to, } ) diff --git a/pyproject.toml b/pyproject.toml index b9dc921..41f35c2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,10 @@ name = "kagi" version = "0.4.0" description = "Django app for WebAuthn and TOTP-based multi-factor authentication" -authors = ["Justin Mayer ", "Rémy Hubscher "] +authors = [ + "Justin Mayer ", + "Rémy Hubscher ", +] license = "BSD-2-Clause" readme = "README.rst" keywords = ["Django", "WebAuthn", "authentication", "MFA", "2FA"] @@ -14,9 +17,8 @@ classifiers = [ "Development Status :: 4 - Beta", "Environment :: Web Environment", "Framework :: Django", - "Framework :: Django :: 3.2", - "Framework :: Django :: 4.1", "Framework :: Django :: 4.2", + "Framework :: Django :: 5.0", "Intended Audience :: Developers", "License :: OSI Approved :: BSD License", "Operating System :: OS Independent", @@ -31,26 +33,27 @@ classifiers = [ [tool.poetry.dependencies] Django = ">= 2.2" -python = ">= 3.7, < 4.0" +python = ">= 3.8, < 4.0" qrcode = ">= 6.1, < 8.0" -webauthn = "^0.4" +webauthn = "^1.6.0" -[tool.poetry.dev-dependencies] +[tool.poetry.group.dev.dependencies] black = "^23.3" flake8 = "^5.0" Flake8-pyproject = "^1.2.3" -furo = "2022.04.07" +furo = "2024.04.27" invoke = "^2.0" isort = "^5.11" livereload = "^2.6" -psutil = {version = "^5.7", optional = true} +pretend = "^1.0.9" +psutil = { version = "^5.7", optional = true } pyOpenSSL = "^23.0" pytest = "^7.1" pytest-cov = "^3.0" pytest-django = "^4.0" pytest-sugar = "^0.9" pytest-xdist = "^2.1" -sphinx = "^4.0" +sphinx = "^6.0" Werkzeug = "^2.0" [tool.autopub] @@ -80,7 +83,10 @@ requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" [tool.pytest.ini_options] -filterwarnings = ['ignore::DeprecationWarning:invoke.loader', 'ignore::DeprecationWarning:invoke.tasks'] +filterwarnings = [ + 'ignore::DeprecationWarning:invoke.loader', + 'ignore::DeprecationWarning:invoke.tasks', +] pythonpath = 'testproj' DJANGO_SETTINGS_MODULE = 'testproj.settings' diff --git a/testproj/testproj/settings.py b/testproj/testproj/settings.py index 3152b9a..3af3fe7 100644 --- a/testproj/testproj/settings.py +++ b/testproj/testproj/settings.py @@ -124,7 +124,6 @@ RELYING_PARTY_ID = "localhost" RELYING_PARTY_NAME = "Kagi Test Project" -WEBAUTHN_ICON_URL = "https://via.placeholder.com/150" # WEBAUTHN_TRUSTED_CERTIFICATES = os.path.join( # BASE_DIR, "..", "trusted_attestation_roots"