Source code for fido2.server

# Copyright (c) 2018 Yubico AB
# All rights reserved.
#
#   Redistribution and use in source and binary forms, with or
#   without modification, are permitted provided that the following
#   conditions are met:
#
#    1. Redistributions of source code must retain the above copyright
#       notice, this list of conditions and the following disclaimer.
#    2. Redistributions in binary form must reproduce the above
#       copyright notice, this list of conditions and the following
#       disclaimer in the documentation and/or other materials provided
#       with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.

from __future__ import annotations

import logging
import os
from typing import Any, Callable, Mapping, Sequence

from cryptography.exceptions import InvalidSignature as _InvalidSignature
from cryptography.hazmat.primitives import constant_time

from .cose import CoseKey
from .rpid import verify_rp_id
from .utils import websafe_decode, websafe_encode
from .webauthn import (
    AttestationConveyancePreference,
    AttestationObject,
    AttestedCredentialData,
    AuthenticationResponse,
    AuthenticatorAttachment,
    AuthenticatorData,
    AuthenticatorSelectionCriteria,
    CollectedClientData,
    CredentialCreationOptions,
    CredentialRequestOptions,
    PublicKeyCredentialCreationOptions,
    PublicKeyCredentialDescriptor,
    PublicKeyCredentialParameters,
    PublicKeyCredentialRequestOptions,
    PublicKeyCredentialRpEntity,
    PublicKeyCredentialType,
    PublicKeyCredentialUserEntity,
    RegistrationResponse,
    ResidentKeyRequirement,
    UserVerificationRequirement,
)

logger = logging.getLogger(__name__)


VerifyAttestation = Callable[[AttestationObject, bytes], None]
VerifyOrigin = Callable[[str], bool]


def _verify_origin_for_rp(rp_id: str) -> VerifyOrigin:
    return lambda o: verify_rp_id(rp_id, o)


def _validata_challenge(challenge: bytes | None) -> bytes:
    if challenge is None:
        challenge = os.urandom(32)
    else:
        if not isinstance(challenge, bytes):
            raise TypeError("Custom challenge must be of type 'bytes'.")
        if len(challenge) < 16:
            raise ValueError("Custom challenge length must be >= 16.")
    return challenge


[docs] def to_descriptor( credential: AttestedCredentialData, transports=None ) -> PublicKeyCredentialDescriptor: """Converts an AttestedCredentialData to a PublicKeyCredentialDescriptor. :param credential: AttestedCredentialData containing the credential ID to use. :param transports: Optional list of AuthenticatorTransport strings to add to the descriptor. :return: A descriptor of the credential, for use with register_begin or authenticate_begin. :rtype: PublicKeyCredentialDescriptor """ return PublicKeyCredentialDescriptor( type=PublicKeyCredentialType.PUBLIC_KEY, id=credential.credential_id, transports=transports, )
def _wrap_credentials( creds: Sequence[AttestedCredentialData | PublicKeyCredentialDescriptor] | None, ) -> Sequence[PublicKeyCredentialDescriptor] | None: if creds is None: return None return [ ( to_descriptor(c) if isinstance(c, AttestedCredentialData) else PublicKeyCredentialDescriptor.from_dict(c) ) for c in creds ] def _ignore_attestation( attestation_object: AttestationObject, client_data_hash: bytes ) -> None: """Ignore attestation."""
[docs] class Fido2Server: """FIDO2 server. :param rp: Relying party data as `PublicKeyCredentialRpEntity` instance. :param attestation: (optional) Requirement on authenticator attestation. :param verify_origin: (optional) Alternative function to validate an origin. :param verify_attestation: (optional) function to validate attestation, which is invoked with attestation_object and client_data_hash. It should return nothing and raise an exception on failure. By default, attestation is ignored. Attestation is also ignored if `attestation` is set to `none`. """ def __init__( self, rp: PublicKeyCredentialRpEntity, attestation: AttestationConveyancePreference | None = None, verify_origin: VerifyOrigin | None = None, verify_attestation: VerifyAttestation | None = None, ): self.rp = PublicKeyCredentialRpEntity.from_dict(rp) assert self.rp.id is not None # nosec self._verify = verify_origin or _verify_origin_for_rp(self.rp.id) self.timeout = None self.attestation = AttestationConveyancePreference(attestation) self.allowed_algorithms = [ PublicKeyCredentialParameters( type=PublicKeyCredentialType.PUBLIC_KEY, alg=alg ) for alg in CoseKey.supported_algorithms() ] self._verify_attestation = verify_attestation or _ignore_attestation logger.debug(f"Fido2Server initialized for RP: {self.rp}")
[docs] def register_begin( self, user: PublicKeyCredentialUserEntity, credentials: ( Sequence[AttestedCredentialData | PublicKeyCredentialDescriptor] | None ) = None, resident_key_requirement: ResidentKeyRequirement | None = None, user_verification: UserVerificationRequirement | None = None, authenticator_attachment: AuthenticatorAttachment | None = None, challenge: bytes | None = None, extensions=None, ) -> tuple[CredentialCreationOptions, Any]: """Return a PublicKeyCredentialCreationOptions registration object and the internal state dictionary that needs to be passed as is to the corresponding `register_complete` call. :param user: The dict containing the user data. :param credentials: The list of previously registered credentials, these can be of type AttestedCredentialData, or PublicKeyCredentialDescriptor. :param resident_key_requirement: The desired RESIDENT_KEY_REQUIREMENT level. :param user_verification: The desired USER_VERIFICATION level. :param authenticator_attachment: The desired AUTHENTICATOR_ATTACHMENT or None to not provide a preference (and get both types). :param challenge: A custom challenge to sign and verify or None to use OS-specific random bytes. :return: Registration data, internal state.""" if not self.allowed_algorithms: raise ValueError("Server has no allowed algorithms.") challenge = _validata_challenge(challenge) descriptors = _wrap_credentials(credentials) state = self._make_internal_state(challenge, user_verification) logger.debug( "Starting new registration, existing credentials: " + ", ".join(d.id.hex() for d in descriptors or []) ) return ( CredentialCreationOptions( public_key=PublicKeyCredentialCreationOptions( rp=self.rp, user=PublicKeyCredentialUserEntity.from_dict(user), challenge=challenge, pub_key_cred_params=self.allowed_algorithms, timeout=self.timeout, exclude_credentials=descriptors, authenticator_selection=( AuthenticatorSelectionCriteria( authenticator_attachment=authenticator_attachment, resident_key=resident_key_requirement, user_verification=user_verification, ) if any( ( authenticator_attachment, resident_key_requirement, user_verification, ) ) else None ), attestation=self.attestation, extensions=extensions, ) ), state, )
[docs] def register_complete( self, state, response: RegistrationResponse | Mapping[str, Any], ) -> AuthenticatorData: """Verify the correctness of the registration data received from the client. :param state: The state data returned by the corresponding `register_begin`. :param response: The registration response from the client. :return: The authenticator data """ registration = RegistrationResponse.from_dict(response) client_data = registration.response.client_data attestation_object = registration.response.attestation_object if client_data.type != CollectedClientData.TYPE.CREATE: raise ValueError("Incorrect type in CollectedClientData.") if not self._verify(client_data.origin): raise ValueError("Invalid origin in CollectedClientData.") if not constant_time.bytes_eq( websafe_decode(state["challenge"]), client_data.challenge ): raise ValueError("Wrong challenge in response.") if not constant_time.bytes_eq( self.rp.id_hash or b"", attestation_object.auth_data.rp_id_hash ): raise ValueError("Wrong RP ID hash in response.") if not attestation_object.auth_data.is_user_present(): raise ValueError("User Present flag not set.") if ( state["user_verification"] == UserVerificationRequirement.REQUIRED and not attestation_object.auth_data.is_user_verified() ): raise ValueError( "User verification required, but User Verified flag not set." ) if self.attestation not in (None, AttestationConveyancePreference.NONE): logger.debug(f"Verifying attestation of type {attestation_object.fmt}") self._verify_attestation(attestation_object, client_data.hash) # We simply ignore attestation if self.attestation == 'none', as not all # clients strip the attestation. auth_data = attestation_object.auth_data assert auth_data.credential_data is not None # nosec logger.info( "New credential registered: " + auth_data.credential_data.credential_id.hex() ) return auth_data
[docs] def authenticate_begin( self, credentials: ( Sequence[AttestedCredentialData | PublicKeyCredentialDescriptor] | None ) = None, user_verification: UserVerificationRequirement | None = None, challenge: bytes | None = None, extensions=None, ) -> tuple[CredentialRequestOptions, Any]: """Return a PublicKeyCredentialRequestOptions assertion object and the internal state dictionary that needs to be passed as is to the corresponding `authenticate_complete` call. :param credentials: The list of previously registered credentials, these can be of type AttestedCredentialData, or PublicKeyCredentialDescriptor. :param user_verification: The desired USER_VERIFICATION level. :param challenge: A custom challenge to sign and verify or None to use OS-specific random bytes. :return: Assertion data, internal state.""" challenge = _validata_challenge(challenge) descriptors = _wrap_credentials(credentials) state = self._make_internal_state(challenge, user_verification) if descriptors is None: logger.debug("Starting new authentication without credentials") else: logger.debug( "Starting new authentication, for credentials: " + ", ".join(d.id.hex() for d in descriptors) ) return ( CredentialRequestOptions( public_key=PublicKeyCredentialRequestOptions( challenge=challenge, timeout=self.timeout, rp_id=self.rp.id, allow_credentials=descriptors, user_verification=user_verification, extensions=extensions, ) ), state, )
[docs] def authenticate_complete( self, state, credentials: Sequence[AttestedCredentialData], response: AuthenticationResponse | Mapping[str, Any], ) -> AttestedCredentialData: """Verify the correctness of the assertion data received from the client. :param state: The state data returned by the corresponding `register_begin`. :param credentials: The list of previously registered credentials. :param credential_id: The credential id from the client response. :param client_data: The client data. :param auth_data: The authenticator data. :param signature: The signature provided by the client.""" authentication = AuthenticationResponse.from_dict(response) credential_id = authentication.raw_id client_data = authentication.response.client_data auth_data = authentication.response.authenticator_data signature = authentication.response.signature if client_data.type != CollectedClientData.TYPE.GET: raise ValueError("Incorrect type in CollectedClientData.") if not self._verify(client_data.origin): raise ValueError("Invalid origin in CollectedClientData.") if websafe_decode(state["challenge"]) != client_data.challenge: raise ValueError("Wrong challenge in response.") if not constant_time.bytes_eq(self.rp.id_hash or b"", auth_data.rp_id_hash): raise ValueError("Wrong RP ID hash in response.") if not auth_data.is_user_present(): raise ValueError("User Present flag not set.") if ( state["user_verification"] == UserVerificationRequirement.REQUIRED and not auth_data.is_user_verified() ): raise ValueError( "User verification required, but user verified flag not set." ) for cred in credentials: if cred.credential_id == credential_id: try: cred.public_key.verify(auth_data + client_data.hash, signature) except _InvalidSignature: raise ValueError("Invalid signature.") logger.info(f"Credential authenticated: {credential_id.hex()}") return cred raise ValueError("Unknown credential ID.")
@staticmethod def _make_internal_state( challenge: bytes, user_verification: UserVerificationRequirement | None ): return { "challenge": websafe_encode(challenge), "user_verification": user_verification, }