# Copyright (c) 2023 Yubico AB
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from .core import (
int2bytes,
bytes2int,
require_version,
Version,
Tlv,
InvalidPinError,
)
from .core.smartcard import (
AID,
SmartCardConnection,
SmartCardProtocol,
ApduError,
SW,
ScpKeyParams,
)
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.asymmetric import ec
from functools import total_ordering
from enum import IntEnum, unique
from dataclasses import dataclass
from typing import Optional, List, Union, Tuple, NamedTuple
import struct
import logging
logger = logging.getLogger(__name__)
# TLV tags for credential data
TAG_LABEL = 0x71
TAG_LABEL_LIST = 0x72
TAG_CREDENTIAL_PASSWORD = 0x73
TAG_ALGORITHM = 0x74
TAG_KEY_ENC = 0x75
TAG_KEY_MAC = 0x76
TAG_CONTEXT = 0x77
TAG_RESPONSE = 0x78
TAG_VERSION = 0x79
TAG_TOUCH = 0x7A
TAG_MANAGEMENT_KEY = 0x7B
TAG_PUBLIC_KEY = 0x7C
TAG_PRIVATE_KEY = 0x7D
# Instruction bytes for commands
INS_PUT = 0x01
INS_DELETE = 0x02
INS_CALCULATE = 0x03
INS_GET_CHALLENGE = 0x04
INS_LIST = 0x05
INS_RESET = 0x06
INS_GET_VERSION = 0x07
INS_PUT_MANAGEMENT_KEY = 0x08
INS_GET_MANAGEMENT_KEY_RETRIES = 0x09
INS_GET_PUBLIC_KEY = 0x0A
# Lengths for parameters
MANAGEMENT_KEY_LEN = 16
CREDENTIAL_PASSWORD_LEN = 16
MIN_LABEL_LEN = 1
MAX_LABEL_LEN = 64
DEFAULT_MANAGEMENT_KEY = (
b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
)
INITIAL_RETRY_COUNTER = 8
[docs]
@unique
class ALGORITHM(IntEnum):
"""Algorithms for YubiHSM Auth credentials."""
AES128_YUBICO_AUTHENTICATION = 38
EC_P256_YUBICO_AUTHENTICATION = 39
@property
def key_len(self):
if self.name.startswith("AES128"):
return 16
elif self.name.startswith("EC_P256"):
return 32
@property
def pubkey_len(self):
if self.name.startswith("EC_P256"):
return 64
def _parse_credential_password(credential_password: Union[bytes, str]) -> bytes:
if isinstance(credential_password, str):
pw = credential_password.encode().ljust(CREDENTIAL_PASSWORD_LEN, b"\0")
else:
pw = bytes(credential_password)
if len(pw) != CREDENTIAL_PASSWORD_LEN:
raise ValueError(
"Credential password must be %d bytes long" % CREDENTIAL_PASSWORD_LEN
)
return pw
def _parse_label(label: str) -> bytes:
try:
parsed_label = label.encode()
except Exception:
raise ValueError(label)
if len(parsed_label) < MIN_LABEL_LEN or len(parsed_label) > MAX_LABEL_LEN:
raise ValueError(
"Label must be between %d and %d bytes long"
% (MIN_LABEL_LEN, MAX_LABEL_LEN)
)
return parsed_label
def _parse_select(response):
data = Tlv.unpack(TAG_VERSION, response)
return Version.from_bytes(data)
def _password_to_key(password: str) -> Tuple[bytes, bytes]:
"""Derive encryption and MAC key from a password.
:return: A tuple containing the encryption key, and MAC key.
"""
pw_bytes = password.encode()
key = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=b"Yubico",
iterations=10000,
backend=default_backend(),
).derive(pw_bytes)
key_enc, key_mac = key[:16], key[16:]
return key_enc, key_mac
def _retries_from_sw(sw):
if sw & 0xFFF0 == SW.VERIFY_FAIL_NO_RETRY:
return sw & ~0xFFF0
return None
[docs]
@total_ordering
@dataclass(order=False, frozen=True)
class Credential:
"""A YubiHSM Auth credential object."""
label: str
algorithm: ALGORITHM
counter: int
touch_required: Optional[bool]
def __lt__(self, other):
a = self.label.lower()
b = other.label.lower()
return a < b
def __eq__(self, other):
return self.label == other.label
def __hash__(self) -> int:
return hash(self.label)
[docs]
class SessionKeys(NamedTuple):
"""YubiHSM Session Keys."""
key_senc: bytes
key_smac: bytes
key_srmac: bytes
[docs]
@classmethod
def parse(cls, response: bytes) -> "SessionKeys":
key_senc = response[:16]
key_smac = response[16:32]
key_srmac = response[32:48]
return cls(
key_senc=key_senc,
key_smac=key_smac,
key_srmac=key_srmac,
)
[docs]
class HsmAuthSession:
"""A session with the YubiHSM Auth application."""
def __init__(
self,
connection: SmartCardConnection,
scp_key_params: Optional[ScpKeyParams] = None,
) -> None:
self.protocol = SmartCardProtocol(connection)
self._version = _parse_select(self.protocol.select(AID.HSMAUTH))
if scp_key_params:
self.protocol.init_scp(scp_key_params)
self.protocol.configure(self._version)
@property
def version(self) -> Version:
"""The YubiHSM Auth application version."""
return self._version
[docs]
def reset(self) -> None:
"""Perform a factory reset on the YubiHSM Auth application."""
self.protocol.send_apdu(0, INS_RESET, 0xDE, 0xAD)
logger.info("YubiHSM Auth application data reset performed")
[docs]
def list_credentials(self) -> List[Credential]:
"""List YubiHSM Auth credentials on YubiKey"""
creds = []
for tlv in Tlv.parse_list(self.protocol.send_apdu(0, INS_LIST, 0, 0)):
data = Tlv.unpack(TAG_LABEL_LIST, tlv)
algorithm = ALGORITHM(data[0])
touch_required = bool(data[1])
label_length = tlv.length - 3
label = data[2 : 2 + label_length].decode()
counter = data[-1]
creds.append(Credential(label, algorithm, counter, touch_required))
return creds
def _put_credential(
self,
management_key: bytes,
label: str,
key: bytes,
algorithm: ALGORITHM,
credential_password: Union[bytes, str],
touch_required: bool = False,
) -> Credential:
if len(management_key) != MANAGEMENT_KEY_LEN:
raise ValueError(
"Management key must be %d bytes long" % MANAGEMENT_KEY_LEN
)
data = (
Tlv(TAG_MANAGEMENT_KEY, management_key)
+ Tlv(TAG_LABEL, _parse_label(label))
+ Tlv(TAG_ALGORITHM, int2bytes(algorithm))
)
if algorithm == ALGORITHM.AES128_YUBICO_AUTHENTICATION:
data += Tlv(TAG_KEY_ENC, key[:16]) + Tlv(TAG_KEY_MAC, key[16:])
elif algorithm == ALGORITHM.EC_P256_YUBICO_AUTHENTICATION:
data += Tlv(TAG_PRIVATE_KEY, key)
data += Tlv(
TAG_CREDENTIAL_PASSWORD, _parse_credential_password(credential_password)
)
if touch_required:
data += Tlv(TAG_TOUCH, int2bytes(1))
else:
data += Tlv(TAG_TOUCH, int2bytes(0))
logger.debug(
f"Importing YubiHSM Auth credential (label={label}, algo={algorithm}, "
f"touch_required={touch_required})"
)
try:
self.protocol.send_apdu(0, INS_PUT, 0, 0, data)
logger.info("Credential imported")
except ApduError as e:
retries = _retries_from_sw(e.sw)
if retries is None:
raise
raise InvalidPinError(
attempts_remaining=retries,
message=f"Invalid management key, {retries} attempts remaining",
)
return Credential(label, algorithm, INITIAL_RETRY_COUNTER, touch_required)
[docs]
def put_credential_symmetric(
self,
management_key: bytes,
label: str,
key_enc: bytes,
key_mac: bytes,
credential_password: Union[bytes, str],
touch_required: bool = False,
) -> Credential:
"""Import a symmetric YubiHSM Auth credential.
:param management_key: The management key.
:param label: The label of the credential.
:param key_enc: The static K-ENC.
:param key_mac: The static K-MAC.
:param credential_password: The password used to protect
access to the credential.
:param touch_required: The touch requirement policy.
"""
aes128_key_len = ALGORITHM.AES128_YUBICO_AUTHENTICATION.key_len
if len(key_enc) != aes128_key_len or len(key_mac) != aes128_key_len:
raise ValueError(
"Encryption and MAC key must be %d bytes long", aes128_key_len
)
return self._put_credential(
management_key,
label,
key_enc + key_mac,
ALGORITHM.AES128_YUBICO_AUTHENTICATION,
credential_password,
touch_required,
)
[docs]
def put_credential_derived(
self,
management_key: bytes,
label: str,
derivation_password: str,
credential_password: Union[bytes, str],
touch_required: bool = False,
) -> Credential:
"""Import a symmetric YubiHSM Auth credential derived from password.
:param management_key: The management key.
:param label: The label of the credential.
:param derivation_password: The password used to derive the keys from.
:param credential_password: The password used to protect
access to the credential.
:param touch_required: The touch requirement policy.
"""
key_enc, key_mac = _password_to_key(derivation_password)
return self.put_credential_symmetric(
management_key, label, key_enc, key_mac, credential_password, touch_required
)
[docs]
def put_credential_asymmetric(
self,
management_key: bytes,
label: str,
private_key: ec.EllipticCurvePrivateKeyWithSerialization,
credential_password: Union[bytes, str],
touch_required: bool = False,
) -> Credential:
"""Import an asymmetric YubiHSM Auth credential.
:param management_key: The management key.
:param label: The label of the credential.
:param private_key: Private key corresponding to the public
authentication key object on the YubiHSM.
:param credential_password: The password used to protect
access to the credential.
:param touch_required: The touch requirement policy.
"""
require_version(self.version, (5, 6, 0))
if not isinstance(private_key.curve, ec.SECP256R1):
raise ValueError("Unsupported curve")
ln = ALGORITHM.EC_P256_YUBICO_AUTHENTICATION.key_len
numbers = private_key.private_numbers()
return self._put_credential(
management_key,
label,
int2bytes(numbers.private_value, ln),
ALGORITHM.EC_P256_YUBICO_AUTHENTICATION,
credential_password,
touch_required,
)
[docs]
def generate_credential_asymmetric(
self,
management_key: bytes,
label: str,
credential_password: Union[bytes, str],
touch_required: bool = False,
) -> Credential:
"""Generate an asymmetric YubiHSM Auth credential.
Generates a private key on the YubiKey, whose corresponding
public key can be retrieved using `get_public_key`.
:param management_key: The management key.
:param label: The label of the credential.
:param credential_password: The password used to protect
access to the credential.
:param touch_required: The touch requirement policy.
"""
require_version(self.version, (5, 6, 0))
return self._put_credential(
management_key,
label,
b"", # Empty byte will generate key
ALGORITHM.EC_P256_YUBICO_AUTHENTICATION,
credential_password,
touch_required,
)
[docs]
def get_public_key(self, label: str) -> ec.EllipticCurvePublicKey:
"""Get the public key for an asymmetric credential.
This will return the long-term public key "PK-OCE" for an
asymmetric credential.
:param label: The label of the credential.
"""
require_version(self.version, (5, 6, 0))
data = Tlv(TAG_LABEL, _parse_label(label))
res = self.protocol.send_apdu(0, INS_GET_PUBLIC_KEY, 0, 0, data)
return ec.EllipticCurvePublicKey.from_encoded_point(ec.SECP256R1(), res)
[docs]
def delete_credential(self, management_key: bytes, label: str) -> None:
"""Delete a YubiHSM Auth credential.
:param management_key: The management key.
:param label: The label of the credential.
"""
if len(management_key) != MANAGEMENT_KEY_LEN:
raise ValueError(
"Management key must be %d bytes long" % MANAGEMENT_KEY_LEN
)
data = Tlv(TAG_MANAGEMENT_KEY, management_key) + Tlv(
TAG_LABEL, _parse_label(label)
)
try:
self.protocol.send_apdu(0, INS_DELETE, 0, 0, data)
logger.info("Credential deleted")
except ApduError as e:
retries = _retries_from_sw(e.sw)
if retries is None:
raise
raise InvalidPinError(
attempts_remaining=retries,
message=f"Invalid management key, {retries} attempts remaining",
)
[docs]
def put_management_key(
self,
management_key: bytes,
new_management_key: bytes,
) -> None:
"""Change YubiHSM Auth management key
:param management_key: The current management key.
:param new_management_key: The new management key.
"""
if (
len(management_key) != MANAGEMENT_KEY_LEN
or len(new_management_key) != MANAGEMENT_KEY_LEN
):
raise ValueError(
"Management key must be %d bytes long" % MANAGEMENT_KEY_LEN
)
data = Tlv(TAG_MANAGEMENT_KEY, management_key) + Tlv(
TAG_MANAGEMENT_KEY, new_management_key
)
try:
self.protocol.send_apdu(0, INS_PUT_MANAGEMENT_KEY, 0, 0, data)
logger.info("New management key set")
except ApduError as e:
retries = _retries_from_sw(e.sw)
if retries is None:
raise
raise InvalidPinError(
attempts_remaining=retries,
message=f"Invalid management key, {retries} attempts remaining",
)
[docs]
def get_management_key_retries(self) -> int:
"""Get retries remaining for Management key"""
res = self.protocol.send_apdu(0, INS_GET_MANAGEMENT_KEY_RETRIES, 0, 0)
return bytes2int(res)
def _calculate_session_keys(
self,
label: str,
context: bytes,
credential_password: Union[bytes, str],
card_crypto: Optional[bytes] = None,
public_key: Optional[bytes] = None,
) -> bytes:
data = Tlv(TAG_LABEL, _parse_label(label)) + Tlv(TAG_CONTEXT, context)
if public_key:
data += Tlv(TAG_PUBLIC_KEY, public_key)
if card_crypto:
data += Tlv(TAG_RESPONSE, card_crypto)
data += Tlv(
TAG_CREDENTIAL_PASSWORD, _parse_credential_password(credential_password)
)
try:
res = self.protocol.send_apdu(0, INS_CALCULATE, 0, 0, data)
logger.info("Session keys calculated")
except ApduError as e:
retries = _retries_from_sw(e.sw)
if retries is None:
raise
raise InvalidPinError(
attempts_remaining=retries,
message=f"Invalid credential password, {retries} attempts remaining",
)
return res
[docs]
def calculate_session_keys_symmetric(
self,
label: str,
context: bytes,
credential_password: Union[bytes, str],
card_crypto: Optional[bytes] = None,
) -> SessionKeys:
"""Calculate session keys from a symmetric YubiHSM Auth credential.
:param label: The label of the credential.
:param context: The context (host challenge + hsm challenge).
:param credential_password: The password used to protect
access to the credential.
:param card_crypto: The card cryptogram.
"""
return SessionKeys.parse(
self._calculate_session_keys(
label=label,
context=context,
credential_password=credential_password,
card_crypto=card_crypto,
)
)
[docs]
def calculate_session_keys_asymmetric(
self,
label: str,
context: bytes,
public_key: ec.EllipticCurvePublicKey,
credential_password: Union[bytes, str],
card_crypto: bytes,
) -> SessionKeys:
"""Calculate session keys from an asymmetric YubiHSM Auth credential.
:param label: The label of the credential.
:param context: The context (EPK.OCE + EPK.SD).
:param public_key: The YubiHSM device's public key.
:param credential_password: The password used to protect
access to the credential.
:param card_crypto: The card cryptogram.
"""
require_version(self.version, (5, 6, 0))
if not isinstance(public_key.curve, ec.SECP256R1):
raise ValueError("Unsupported curve")
numbers = public_key.public_numbers()
public_key_data = (
struct.pack("!B", 4)
+ int.to_bytes(numbers.x, public_key.key_size // 8, "big")
+ int.to_bytes(numbers.y, public_key.key_size // 8, "big")
)
return SessionKeys.parse(
self._calculate_session_keys(
label=label,
context=context,
credential_password=credential_password,
card_crypto=card_crypto,
public_key=public_key_data,
)
)
[docs]
def get_challenge(
self, label: str, credential_password: Union[bytes, str, None] = None
) -> bytes:
"""Get the Host Challenge.
For symmetric credentials this is Host Challenge, a random 8 byte value.
For asymmetric credentials this is EPK-OCE.
:param label: The label of the credential.
:param credential_password: The password used to protect access to the
credential, needed for asymmetric credentials.
"""
require_version(self.version, (5, 6, 0))
data: bytes = Tlv(TAG_LABEL, _parse_label(label))
if credential_password is not None and self.version >= (5, 7, 1):
data += Tlv(
TAG_CREDENTIAL_PASSWORD, _parse_credential_password(credential_password)
)
return self.protocol.send_apdu(0, INS_GET_CHALLENGE, 0, 0, data)