# Copyright (c) 2020 Yubico AB
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from .core import (
bytes2int,
int2bytes,
require_version,
Version,
Tlv,
TRANSPORT,
USB_INTERFACE,
NotSupportedError,
BadResponseError,
ApplicationNotAvailableError,
)
from .core.otp import (
check_crc,
OtpConnection,
OtpProtocol,
STATUS_OFFSET_PROG_SEQ,
CommandRejectedError,
)
from .core.fido import FidoConnection
from .core.smartcard import AID, SmartCardConnection, SmartCardProtocol, ScpKeyParams
from fido2.hid import CAPABILITY as CTAP_CAPABILITY
from enum import IntEnum, IntFlag, unique
from dataclasses import dataclass, field
from typing import Optional, Union, Mapping
import abc
import struct
import warnings
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@unique
class CAPABILITY(IntFlag):
    """YubiKey Application identifiers.

    Each named member is a single bit. Values may be OR-ed together to
    describe a set of applications.
    """

    OTP = 0x01
    U2F = 0x02
    FIDO2 = 0x200
    OATH = 0x20
    PIV = 0x10
    OPENPGP = 0x08
    HSMAUTH = 0x100

    def __str__(self):
        """Return the contained member names joined by ``|`` plus the hex value."""
        name = "|".join(c.name or str(c) for c in CAPABILITY if c in self)
        return f"{name}: {hex(self)}"

    @classmethod
    def _from_fips(cls, fips: int) -> "CAPABILITY":
        """Translate a FIPS status bitfield into a CAPABILITY value.

        :param fips: Bitfield where bit 0..4 map to FIDO2, PIV, OpenPGP,
            OATH and HSMAUTH respectively.
        :return: The combination of capabilities whose bit is set.
        """
        # Index in this tuple == bit position in the FIPS bitfield.
        fips_bit_order = (
            CAPABILITY.FIDO2,
            CAPABILITY.PIV,
            CAPABILITY.OPENPGP,
            CAPABILITY.OATH,
            CAPABILITY.HSMAUTH,
        )
        c = CAPABILITY(0)
        for bit, capability in enumerate(fips_bit_order):
            if fips & (1 << bit):
                c |= capability
        return c

    @classmethod
    def _from_aid(cls, aid: AID) -> "CAPABILITY":
        """Return the capability matching a smart card application identifier.

        :param aid: The AID to look up.
        :raises ValueError: If no capability corresponds to the AID.
        """
        # TODO: match on prefix?
        try:
            # Most AIDs share their name with the corresponding capability.
            return getattr(CAPABILITY, aid.name)
        except AttributeError:
            pass
        if aid == AID.FIDO:
            return CAPABILITY.FIDO2
        raise ValueError("Unhandled AID")

    @property
    def display_name(self) -> str:
        """Human readable name; multiple capabilities are comma separated."""
        # Work on the plain int. Formatting the enum member itself with an
        # int format code ("b"/"x") goes through Enum.__format__ on Python
        # versions before 3.11, which uses str(self) because __str__ is
        # overridden here, and then fails with ValueError.
        value = int(self)
        if value == 0:
            return "None"
        if bin(value).count("1") > 1:
            # More than one bit set: describe each bit, lowest first.
            names = []
            bit = 1
            while bit < value:
                if bit & value:
                    names.append(CAPABILITY(bit).display_name)
                bit <<= 1
            return ", ".join(names)
        if self == CAPABILITY.OTP:
            return "Yubico OTP"
        elif self == CAPABILITY.U2F:
            return "FIDO U2F"
        elif self == CAPABILITY.OPENPGP:
            return "OpenPGP"
        elif self == CAPABILITY.HSMAUTH:
            return "YubiHSM Auth"
        # Remaining named members use their own name; unnamed bits are shown
        # as hex.
        return self.name or f"Unknown(0x{value:x})"

    @property
    def usb_interfaces(self) -> USB_INTERFACE:
        """The USB interfaces needed to expose these capabilities."""
        ifaces = USB_INTERFACE(0)
        if self & CAPABILITY.OTP:
            ifaces |= USB_INTERFACE.OTP
        if self & (CAPABILITY.U2F | CAPABILITY.FIDO2):
            ifaces |= USB_INTERFACE.FIDO
        if self & (
            0x4  # General CCID bit
            | 0x400  # Management over CCID bit
            | CAPABILITY.OATH
            | CAPABILITY.PIV
            | CAPABILITY.OPENPGP
            | CAPABILITY.HSMAUTH
        ):
            ifaces |= USB_INTERFACE.CCID
        return ifaces
@unique
class DEVICE_FLAG(IntFlag):
    """Configuration flags.

    Bit flags that can be combined into a single device flags value.
    """

    REMOTE_WAKEUP = 1 << 6
    EJECT = 1 << 7
# TLV tags used in the device info / device config payloads that this module
# parses (DeviceInfo.parse_tlvs) and builds (DeviceConfig.get_bytes).
TAG_USB_SUPPORTED = 0x01  # Capabilities supported over USB
TAG_SERIAL = 0x02  # Serial number
TAG_USB_ENABLED = 0x03  # Capabilities enabled over USB
TAG_FORM_FACTOR = 0x04  # Form factor; bits 0x80 / 0x40 are read as FIPS / SKY
TAG_VERSION = 0x05  # Firmware version
TAG_AUTO_EJECT_TIMEOUT = 0x06
TAG_CHALRESP_TIMEOUT = 0x07
TAG_DEVICE_FLAGS = 0x08  # DEVICE_FLAG bits
TAG_APP_VERSIONS = 0x09
TAG_CONFIG_LOCK = 0x0A  # Read: b"\1" means locked. Write: the new lock code
TAG_UNLOCK = 0x0B  # Write: the current lock code
TAG_REBOOT = 0x0C  # Write: empty TLV that requests a reboot
TAG_NFC_SUPPORTED = 0x0D  # Capabilities supported over NFC
TAG_NFC_ENABLED = 0x0E  # Capabilities enabled over NFC
TAG_IAP_DETECTION = 0x0F
TAG_MORE_DATA = 0x10  # b"\1" means another page of device info follows
TAG_FREE_FORM = 0x11
TAG_HID_INIT_DELAY = 0x12
TAG_PART_NUMBER = 0x13  # Decoded as text
TAG_FIPS_CAPABLE = 0x14  # Bitfield decoded with CAPABILITY._from_fips
TAG_FIPS_APPROVED = 0x15  # Bitfield decoded with CAPABILITY._from_fips
TAG_PIN_COMPLEXITY = 0x16  # b"\1" means enabled
TAG_NFC_RESTRICTED = 0x17  # b"\1" means restricted
TAG_RESET_BLOCKED = 0x18  # CAPABILITY bits
TAG_FPS_VERSION = 0x20  # Parsed with Version.from_bytes
TAG_STM_VERSION = 0x21  # Parsed with Version.from_bytes
@dataclass
class DeviceConfig:
    """Management settings for YubiKey which can be configured by the user.

    Fields left as ``None`` (or transports missing from
    ``enabled_capabilities``) are omitted from the serialized configuration.
    """

    # Capabilities to enable, keyed by transport (USB / NFC).
    enabled_capabilities: Mapping[TRANSPORT, CAPABILITY] = field(default_factory=dict)
    auto_eject_timeout: Optional[int] = None
    challenge_response_timeout: Optional[int] = None
    device_flags: Optional[DEVICE_FLAG] = None
    nfc_restricted: Optional[bool] = None

    def get_bytes(
        self,
        reboot: bool,
        cur_lock_code: Optional[bytes] = None,
        new_lock_code: Optional[bytes] = None,
    ) -> bytes:
        """Serialize the configuration as a length-prefixed sequence of TLVs.

        :param reboot: If True, a reboot TLV is included.
        :param cur_lock_code: Current lock code, included as an unlock TLV.
        :param new_lock_code: New lock code, included as a config lock TLV.
        :return: One length byte followed by the concatenated TLVs.
        :raises NotSupportedError: If the TLV data exceeds 255 bytes.
        """
        tlvs = []

        if reboot:
            tlvs.append(Tlv(TAG_REBOOT))
        if cur_lock_code:
            tlvs.append(Tlv(TAG_UNLOCK, cur_lock_code))

        # Per-transport capabilities are encoded as 2 bytes each.
        for transport, tag in (
            (TRANSPORT.USB, TAG_USB_ENABLED),
            (TRANSPORT.NFC, TAG_NFC_ENABLED),
        ):
            capabilities = self.enabled_capabilities.get(transport)
            if capabilities is not None:
                tlvs.append(Tlv(tag, int2bytes(capabilities, 2)))

        if self.auto_eject_timeout is not None:
            tlvs.append(
                Tlv(TAG_AUTO_EJECT_TIMEOUT, int2bytes(self.auto_eject_timeout, 2))
            )
        if self.challenge_response_timeout is not None:
            tlvs.append(
                Tlv(TAG_CHALRESP_TIMEOUT, int2bytes(self.challenge_response_timeout))
            )
        if self.device_flags is not None:
            tlvs.append(Tlv(TAG_DEVICE_FLAGS, int2bytes(self.device_flags)))
        if new_lock_code:
            tlvs.append(Tlv(TAG_CONFIG_LOCK, new_lock_code))
        if self.nfc_restricted:
            tlvs.append(Tlv(TAG_NFC_RESTRICTED, b"\1"))

        payload = b"".join(tlvs)
        # The length prefix is written with int2bytes and capped at 0xFF.
        if len(payload) > 0xFF:
            raise NotSupportedError("DeviceConfiguration too large")
        return int2bytes(len(payload)) + payload
@dataclass
class DeviceInfo:
    """Information about a YubiKey readable using the ManagementSession."""

    config: DeviceConfig
    serial: Optional[int]
    version: Version
    form_factor: FORM_FACTOR
    # Supported capabilities, keyed by transport.
    supported_capabilities: Mapping[TRANSPORT, CAPABILITY]
    is_locked: bool
    is_fips: bool = False
    is_sky: bool = False
    part_number: Optional[str] = None
    fips_capable: CAPABILITY = CAPABILITY(0)
    fips_approved: CAPABILITY = CAPABILITY(0)
    pin_complexity: bool = False
    reset_blocked: CAPABILITY = CAPABILITY(0)
    fps_version: Optional[Version] = None
    stm_version: Optional[Version] = None

    @property
    def _is_bio(self) -> bool:
        """True if the form factor is one of the Bio form factors."""
        return self.form_factor in (FORM_FACTOR.USB_A_BIO, FORM_FACTOR.USB_C_BIO)

    def has_transport(self, transport: TRANSPORT) -> bool:
        """Return True if capabilities are listed for the given transport."""
        return transport in self.supported_capabilities

    @classmethod
    def parse(cls, encoded: bytes, default_version: Version) -> "DeviceInfo":
        """Parse a length-prefixed device info blob.

        :param encoded: One length byte followed by that many bytes of TLVs.
        :param default_version: Version to use if the data contains none.
        :raises BadResponseError: If the data is empty or the length byte
            does not match the amount of data.
        """
        # An empty blob has no length byte to read; report it the same way
        # as any other length mismatch instead of raising IndexError.
        if not encoded or len(encoded) - 1 != encoded[0]:
            raise BadResponseError("Invalid length")
        return cls.parse_tlvs(Tlv.parse_dict(encoded[1:]), default_version)

    @classmethod
    def parse_tlvs(
        cls, data: Mapping[int, bytes], default_version: Version
    ) -> "DeviceInfo":
        """Build a DeviceInfo from already decoded TLVs.

        :param data: Mapping of TLV tag to value.
        :param default_version: Version to use if the data contains none.
        """

        def optional_version(tag: int) -> Optional[Version]:
            # An absent tag always yields None. This does not depend on
            # whether an all-zero Version is falsy (Version is defined
            # outside this module).
            if tag not in data:
                return None
            return Version.from_bytes(data[tag]) or None

        locked = data.get(TAG_CONFIG_LOCK) == b"\1"
        serial = bytes2int(data.get(TAG_SERIAL, b"\0")) or None
        ff_value = bytes2int(data.get(TAG_FORM_FACTOR, b"\0"))
        form_factor = FORM_FACTOR.from_code(ff_value)
        # The two top bits of the form factor byte carry FIPS / SKY markers.
        fips = bool(ff_value & 0x80)
        sky = bool(ff_value & 0x40)
        if TAG_VERSION in data:
            version = Version.from_bytes(data[TAG_VERSION])
        else:
            version = default_version
        auto_eject_to = bytes2int(data.get(TAG_AUTO_EJECT_TIMEOUT, b"\0"))
        chal_resp_to = bytes2int(data.get(TAG_CHALRESP_TIMEOUT, b"\0"))
        flags = DEVICE_FLAG(bytes2int(data.get(TAG_DEVICE_FLAGS, b"\0")))

        supported = {}
        enabled = {}

        if version == (4, 2, 4):  # Doesn't report correctly
            supported[TRANSPORT.USB] = CAPABILITY(0x3F)
        else:
            supported[TRANSPORT.USB] = CAPABILITY(bytes2int(data[TAG_USB_SUPPORTED]))
        if TAG_USB_ENABLED in data:  # From YK 5.0.0
            if not ((4, 0, 0) <= version < (5, 0, 0)):  # Broken on YK4
                enabled[TRANSPORT.USB] = CAPABILITY(bytes2int(data[TAG_USB_ENABLED]))
        if TAG_NFC_SUPPORTED in data:  # YK with NFC
            supported[TRANSPORT.NFC] = CAPABILITY(bytes2int(data[TAG_NFC_SUPPORTED]))
            enabled[TRANSPORT.NFC] = CAPABILITY(bytes2int(data[TAG_NFC_ENABLED]))
        nfc_restricted = data.get(TAG_NFC_RESTRICTED, b"\0") == b"\1"
        try:
            part_number = data.get(TAG_PART_NUMBER, b"").decode() or None
        except UnicodeDecodeError:
            # A part number that cannot be decoded is treated as missing.
            part_number = None
        fips_capable = CAPABILITY._from_fips(
            bytes2int(data.get(TAG_FIPS_CAPABLE, b"\0"))
        )
        fips_approved = CAPABILITY._from_fips(
            bytes2int(data.get(TAG_FIPS_APPROVED, b"\0"))
        )
        pin_complexity = data.get(TAG_PIN_COMPLEXITY, b"\0") == b"\1"
        reset_blocked = CAPABILITY(bytes2int(data.get(TAG_RESET_BLOCKED, b"\0")))
        fps_version = optional_version(TAG_FPS_VERSION)
        stm_version = optional_version(TAG_STM_VERSION)

        return cls(
            DeviceConfig(enabled, auto_eject_to, chal_resp_to, flags, nfc_restricted),
            serial,
            version,
            form_factor,
            supported,
            locked,
            fips,
            sky,
            part_number,
            fips_capable,
            fips_approved,
            pin_complexity,
            reset_blocked,
            fps_version,
            stm_version,
        )
# USB interface combinations indexed by mode code: Mode.__init__ looks up the
# code with _MODES.index() and Mode.from_code() indexes this list directly.
_MODES = [
    USB_INTERFACE.OTP,  # 0x00
    USB_INTERFACE.CCID,  # 0x01
    USB_INTERFACE.OTP | USB_INTERFACE.CCID,  # 0x02
    USB_INTERFACE.FIDO,  # 0x03
    USB_INTERFACE.OTP | USB_INTERFACE.FIDO,  # 0x04
    USB_INTERFACE.FIDO | USB_INTERFACE.CCID,  # 0x05
    USB_INTERFACE.OTP | USB_INTERFACE.FIDO | USB_INTERFACE.CCID,  # 0x06
]
@dataclass(init=False, repr=False)
class Mode:
    """YubiKey USB Mode configuration for use with YubiKey NEO and 4.

    A mode pairs a combination of USB interfaces with its numeric code,
    which is the position of that combination in ``_MODES``.
    """

    code: int
    interfaces: USB_INTERFACE

    def __init__(self, interfaces: USB_INTERFACE):
        """Create a mode from a combination of USB interfaces.

        :param interfaces: The interfaces to enable.
        :raises ValueError: If the combination is not a valid mode.
        """
        try:
            mode_code = _MODES.index(interfaces)
            mode_interfaces = USB_INTERFACE(interfaces)
        except ValueError:
            raise ValueError("Invalid mode!")
        self.code = mode_code
        self.interfaces = mode_interfaces

    def __repr__(self):
        active = (iface for iface in USB_INTERFACE if iface in self.interfaces)
        return "+".join(iface.name or str(iface) for iface in active)

    @classmethod
    def from_code(cls, code: int) -> "Mode":
        """Create a mode from its numeric code.

        :param code: The mode code; only the lowest 3 bits are used.
        :raises ValueError: If those bits do not identify a known mode.
        """
        # Mode is determined from the lowest 3 bits
        index = code & 0b00000111
        try:
            return cls(_MODES[index])
        except IndexError:
            raise ValueError("Invalid mode code")
# Slot numbers passed to OtpProtocol.send_and_receive by the OTP backend.
# SLOT_DEVICE_CONFIG is also used as P1 by the smart card backend on NEO.
SLOT_DEVICE_CONFIG = 0x11  # set_mode
SLOT_YK4_CAPABILITIES = 0x13  # read_config
SLOT_YK4_SET_DEVICE_INFO = 0x15  # write_config
class _Backend(abc.ABC):
    """Interface for the transport-specific part of a ManagementSession."""

    # Set by each implementation when it is constructed.
    version: Version

    @abc.abstractmethod
    def close(self) -> None:
        """Close the underlying protocol or connection."""

    @abc.abstractmethod
    def set_mode(self, data: bytes) -> None:
        """Send already encoded mode data to the device."""

    @abc.abstractmethod
    def read_config(self, page: int = 0) -> bytes:
        """Read one page of encoded device info."""

    @abc.abstractmethod
    def write_config(self, config: bytes) -> None:
        """Write already encoded device configuration."""
class _ManagementOtpBackend(_Backend):
    """Management backend that talks to the device through OtpProtocol."""

    def __init__(self, otp_connection):
        protocol = OtpProtocol(otp_connection)
        self.protocol = protocol
        self.version = protocol.version
        # Versions from 1.0.0 up to (not including) 3.0.0 are rejected.
        unsupported = (1, 0, 0) <= self.version < (3, 0, 0)
        if unsupported:
            raise ApplicationNotAvailableError()

    def close(self):
        self.protocol.close()

    def set_mode(self, data):
        # Read the programming sequence before sending, since a rejection is
        # only acceptable when it was zero to begin with.
        prog_seq = self.protocol.read_status()[STATUS_OFFSET_PROG_SEQ]
        try:
            self.protocol.send_and_receive(SLOT_DEVICE_CONFIG, data)
        except CommandRejectedError:
            if prog_seq != 0:
                raise
            # ProgSeq isn't updated by set mode when empty

    def read_config(self, page: int = 0):
        response = self.protocol.send_and_receive(
            SLOT_YK4_CAPABILITIES, int2bytes(page)
        )
        # The first byte is the payload length; the returned data includes it.
        data_end = response[0] + 1
        # The CRC is checked over the data plus the two bytes following it.
        if not check_crc(response[: data_end + 2]):
            raise BadResponseError("Invalid checksum")
        return response[:data_end]

    def write_config(self, config):
        self.protocol.send_and_receive(SLOT_YK4_SET_DEVICE_INFO, config)
# APDU instruction bytes (and one P1 value) used by the smart card backend.
INS_SET_MODE = 0x16
INS_READ_CONFIG = 0x1D
INS_WRITE_CONFIG = 0x1C
INS_DEVICE_RESET = 0x1F
P1_DEVICE_CONFIG = 0x11  # P1 sent together with INS_SET_MODE
class _ManagementSmartCardBackend(_Backend):
    """Management backend that talks to the device through SmartCardProtocol."""

    def __init__(self, smartcard_connection, scp_key_params):
        """Select the management application and determine the version.

        If the management application is unavailable and the connection uses
        NFC, the OTP application is selected instead and the version is taken
        from the first three bytes of its response. Over any other transport
        the ApplicationNotAvailableError is re-raised.
        """
        self.protocol = SmartCardProtocol(smartcard_connection)
        try:
            select_bytes = self.protocol.select(AID.MANAGEMENT)
            if scp_key_params:
                self.protocol.init_scp(scp_key_params)
            elif select_bytes[-2:] == b"\x90\x00":
                # YubiKey Edge incorrectly appends SW twice.
                select_bytes = select_bytes[:-2]
            # The select response is the version as text.
            select_str = select_bytes.decode()
            self.version = Version.from_string(select_str)
            # For YubiKey NEO, we use the OTP application for further commands
            if self.version[0] == 3:
                # Workaround to "de-select" on NEO, otherwise it gets stuck.
                smartcard_connection.send_and_receive(b"\xa4\x04\x00\x08")
                self.protocol.select(AID.OTP)
        except ApplicationNotAvailableError:
            if smartcard_connection.transport == TRANSPORT.NFC:
                # Probably NEO over NFC
                status = self.protocol.select(AID.OTP)
                self.version = Version.from_bytes(status[:3])
            else:
                raise
        self.protocol.configure(self.version)

    def close(self):
        self.protocol.close()

    def set_mode(self, data):
        if self.version[0] == 3:  # Using the OTP application
            self.protocol.send_apdu(0, 0x01, SLOT_DEVICE_CONFIG, 0, data)
        else:
            self.protocol.send_apdu(0, INS_SET_MODE, P1_DEVICE_CONFIG, 0, data)

    def read_config(self, page: int = 0):
        # The page number is passed as the third send_apdu argument
        # (presumably P1; confirm against SmartCardProtocol.send_apdu).
        return self.protocol.send_apdu(0, INS_READ_CONFIG, page, 0)

    def write_config(self, config):
        self.protocol.send_apdu(0, INS_WRITE_CONFIG, 0, 0, config)

    def device_reset(self):
        """Send the device reset instruction; only this backend offers it."""
        self.protocol.send_apdu(0, INS_DEVICE_RESET, 0, 0)
# Command codes passed to the FIDO connection's call() by the CTAP backend,
# all expressed as offsets from CTAP_VENDOR_FIRST.
CTAP_VENDOR_FIRST = 0x40
CTAP_YUBIKEY_DEVICE_CONFIG = CTAP_VENDOR_FIRST  # set_mode
CTAP_READ_CONFIG = CTAP_VENDOR_FIRST + 2  # read_config
CTAP_WRITE_CONFIG = CTAP_VENDOR_FIRST + 3  # write_config
class _ManagementCtapBackend(_Backend):
    """Management backend that talks to the device over a FIDO connection."""

    def __init__(self, fido_connection):
        self.ctap = fido_connection
        reported = fido_connection.device_version
        # Prior to YK4 this was not firmware version. Unless the device
        # reports major version 0 together with the CBOR capability, fall
        # back to a guess.
        if reported[0] < 4 and not (
            reported[0] == 0 and fido_connection.capabilities & CTAP_CAPABILITY.CBOR
        ):
            reported = (3, 0, 0)  # Guess that it's a NEO
        self.version = Version(*reported)

    def close(self):
        self.ctap.close()

    def set_mode(self, data):
        self.ctap.call(CTAP_YUBIKEY_DEVICE_CONFIG, data)

    def read_config(self, page: int = 0):
        page_bytes = int2bytes(page)
        return self.ctap.call(CTAP_READ_CONFIG, page_bytes)

    def write_config(self, config):
        self.ctap.call(CTAP_WRITE_CONFIG, config)
class ManagementSession:
    """Session for reading and writing YubiKey management settings.

    The transport-specific backend is chosen from the type of the connection
    passed to the constructor.
    """

    def __init__(
        self,
        connection: Union[OtpConnection, SmartCardConnection, FidoConnection],
        scp_key_params: Optional[ScpKeyParams] = None,
    ):
        """Create a session on top of an open connection.

        :param connection: An OTP, smart card or FIDO connection.
        :param scp_key_params: SCP key parameters, smart card only.
        :raises ValueError: If SCP parameters are given for a connection
            that is not a SmartCardConnection.
        :raises TypeError: If the connection type is not supported.
        """
        if isinstance(connection, OtpConnection):
            if scp_key_params:
                raise ValueError("SCP can only be used with SmartCardConnection")
            self.backend: _Backend = _ManagementOtpBackend(connection)
        elif isinstance(connection, SmartCardConnection):
            self.backend = _ManagementSmartCardBackend(connection, scp_key_params)
        elif isinstance(connection, FidoConnection):
            if scp_key_params:
                raise ValueError("SCP can only be used with SmartCardConnection")
            self.backend = _ManagementCtapBackend(connection)
        else:
            raise TypeError("Unsupported connection type")
        logger.debug(
            "Management session initialized for "
            f"connection={type(connection).__name__}, version={self.version}"
        )

    def close(self) -> None:
        """Close the underlying connection.

        :deprecated: call .close() on the underlying connection instead.
        """
        warnings.warn(
            "Deprecated: call .close() on the underlying connection instead.",
            DeprecationWarning,
            # Attribute the warning to the caller of close(), which is the
            # code that needs to change, not to this module.
            stacklevel=2,
        )
        self.backend.close()

    @property
    def version(self) -> Version:
        """The firmware version of the YubiKey"""
        return self.backend.version

    def read_device_info(self) -> DeviceInfo:
        """Get detailed information about the YubiKey.

        Pages are read until one no longer signals that more data follows,
        and the TLVs of all pages are merged.

        :raises BadResponseError: If a page is empty or its length byte does
            not match the amount of data.
        """
        require_version(self.version, (4, 1, 0))
        more_data = True
        tlvs = {}
        page = 0
        while more_data:
            logger.debug(f"Reading DeviceInfo page: {page}")
            encoded = self.backend.read_config(page)
            # An empty page has no length byte to read; report it the same
            # way as any other length mismatch instead of raising IndexError.
            if not encoded or len(encoded) - 1 != encoded[0]:
                raise BadResponseError("Invalid length")
            data = Tlv.parse_dict(encoded[1:])
            # The "more data" marker is consumed here, not stored.
            more_data = data.pop(TAG_MORE_DATA, 0) == b"\1"
            tlvs.update(data)
            page += 1
        return DeviceInfo.parse_tlvs(tlvs, self.version)

    def write_device_config(
        self,
        config: Optional[DeviceConfig] = None,
        reboot: bool = False,
        cur_lock_code: Optional[bytes] = None,
        new_lock_code: Optional[bytes] = None,
    ) -> None:
        """Write configuration settings for YubiKey.

        :param config: The device configuration.
        :param reboot: If True the YubiKey will reboot.
        :param cur_lock_code: Current lock code.
        :param new_lock_code: New lock code.
        :raises ValueError: If a given lock code is not 16 bytes long.
        """
        require_version(self.version, (5, 0, 0))
        if cur_lock_code is not None and len(cur_lock_code) != 16:
            raise ValueError("Lock code must be 16 bytes")
        if new_lock_code is not None and len(new_lock_code) != 16:
            raise ValueError("Lock code must be 16 bytes")
        config = config or DeviceConfig()
        # Only log whether lock codes were given, never their values.
        logger.debug(
            f"Writing device config: {config}, reboot: {reboot}, "
            f"current lock code: {cur_lock_code is not None}, "
            f"new lock code: {new_lock_code is not None}"
        )
        self.backend.write_config(
            config.get_bytes(reboot, cur_lock_code, new_lock_code)
        )
        logger.info("Device config written")

    def set_mode(
        self,
        mode: Mode,
        chalresp_timeout: int = 0,
        auto_eject_timeout: Optional[int] = None,
    ) -> None:
        """Write connection modes (USB interfaces) for YubiKey.

        :param mode: The connection modes (USB interfaces).
        :param chalresp_timeout: The timeout when waiting for touch
            for challenge response.
        :param auto_eject_timeout: When set, the smartcard will
            automatically eject after the given time.
        :raises ValueError: If auto_eject_timeout is set for a mode other
            than CCID only (versions before 5.0.0).
        """
        logger.debug(
            f"Set mode: {mode}, chalresp_timeout: {chalresp_timeout}, "
            f"auto_eject_timeout: {auto_eject_timeout}"
        )
        if self.version >= (5, 0, 0):
            # Translate into DeviceConfig
            usb_enabled = CAPABILITY(0)
            if USB_INTERFACE.OTP in mode.interfaces:
                usb_enabled |= CAPABILITY.OTP
            if USB_INTERFACE.CCID in mode.interfaces:
                usb_enabled |= (
                    CAPABILITY.OATH
                    | CAPABILITY.PIV
                    | CAPABILITY.OPENPGP
                    | CAPABILITY.HSMAUTH
                    | 0x400  # Management over CCID bit
                )
            if USB_INTERFACE.FIDO in mode.interfaces:
                usb_enabled |= CAPABILITY.U2F | CAPABILITY.FIDO2
            # Overlay with supported capabilities
            supported = self.read_device_info().supported_capabilities.get(
                TRANSPORT.USB, 0
            )
            usb_enabled = usb_enabled & supported
            logger.debug(f"Delegating to DeviceConfig with usb_enabled: {usb_enabled}")
            # N.B: reboot=False, since we're using the older set_mode command
            self.write_device_config(
                DeviceConfig(
                    {TRANSPORT.USB: usb_enabled},
                    auto_eject_timeout,
                    chalresp_timeout,
                )
            )
        else:
            code = mode.code
            if auto_eject_timeout is not None:
                if mode.interfaces == USB_INTERFACE.CCID:
                    code |= DEVICE_FLAG.EJECT
                else:
                    raise ValueError("Touch-eject only applicable for mode: CCID")
            self.backend.set_mode(
                # N.B. This is little endian!
                struct.pack("<BBH", code, chalresp_timeout, auto_eject_timeout or 0)
            )
        logger.info("Mode configuration written")

    def device_reset(self) -> None:
        """Global factory reset.

        This is only available for YubiKey Bio, which has a PIN that is shared between
        applications. This will factory reset the global PIN as well as the associated
        applications.

        :raises NotSupportedError: If the session does not use the smart
            card backend.
        """
        if not isinstance(self.backend, _ManagementSmartCardBackend):
            raise NotSupportedError("Device reset can only be performed over CCID")
        logger.debug("Performing device reset")
        self.backend.device_reset()
        logger.info("Device reset performed")