# Copyright (c) 2020 Yubico AB
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from .core import (
TRANSPORT,
Version,
bytes2int,
require_version,
NotSupportedError,
BadResponseError,
)
from .core import ApplicationNotAvailableError
from .core.otp import (
check_crc,
calculate_crc,
OtpConnection,
OtpProtocol,
CommandRejectedError,
)
from .core.smartcard import AID, SmartCardConnection, SmartCardProtocol, ScpKeyParams
import abc
import struct
import warnings
from hashlib import sha1
from threading import Event
from enum import unique, IntEnum, IntFlag
from typing import TypeVar, Optional, Union, Callable
import logging
# Module-level logger, named after this module, used for all log output below.
logger = logging.getLogger(__name__)
# Generic type of the two alternatives handed to SLOT.map().
T = TypeVar("T")


@unique
class SLOT(IntEnum):
    """Identifies one of the two configurable slots."""

    ONE = 1
    TWO = 2

    @staticmethod
    def map(slot: "SLOT", one: T, two: T) -> T:
        """Return ``one`` or ``two`` depending on the slot given.

        :param slot: The slot (or an integer equal to 1 or 2) to select by.
        :param one: Value returned when ``slot`` equals 1.
        :param two: Value returned when ``slot`` equals 2.
        :raises ValueError: If ``slot`` equals neither 1 nor 2.
        """
        for number, choice in ((1, one), (2, two)):
            if slot == number:
                return choice
        raise ValueError("Invalid slot (must be 1 or 2)")
[docs]
@unique
class CONFIG_SLOT(IntEnum):
    """Slot codes passed as the ``slot`` argument to the backend commands."""

    # Slot programming
    CONFIG_1 = 0x01  # First (default / V1) configuration
    NAV = 0x02  # V1 only
    CONFIG_2 = 0x03  # Second (V2) configuration
    UPDATE_1 = 0x04  # Update slot 1
    UPDATE_2 = 0x05  # Update slot 2
    SWAP = 0x06  # Swap slot 1 and 2
    NDEF_1 = 0x08  # Write NDEF record
    NDEF_2 = 0x09  # Write NDEF record for slot 2

    # Device level
    DEVICE_SERIAL = 0x10  # Device serial number
    DEVICE_CONFIG = 0x11  # Write device configuration record
    SCAN_MAP = 0x12  # Write scancode map
    YK4_CAPABILITIES = 0x13  # Read YK4 capabilities (device info) list
    YK4_SET_DEVICE_INFO = 0x15  # Write device info

    # Challenge-response
    CHAL_OTP_1 = 0x20  # Write 6 byte challenge to slot 1, get Yubico OTP response
    CHAL_OTP_2 = 0x28  # Write 6 byte challenge to slot 2, get Yubico OTP response
    CHAL_HMAC_1 = 0x30  # Write 64 byte challenge to slot 1, get HMAC-SHA1 response
    CHAL_HMAC_2 = 0x38  # Write 64 byte challenge to slot 2, get HMAC-SHA1 response
[docs]
class TKTFLAG(IntFlag):
    """Ticket flags: the ``tkt`` byte of a slot configuration.

    Bit 6 is shared by ``OATH_HOTP`` and ``CHAL_RESP``; the two names are
    aliases of the same value.
    """

    # Yubikey 1 and above
    TAB_FIRST = 1 << 0  # Send TAB before first part
    APPEND_TAB1 = 1 << 1  # Send TAB after first part
    APPEND_TAB2 = 1 << 2  # Send TAB after second part
    APPEND_DELAY1 = 1 << 3  # Add 0.5s delay after first part
    APPEND_DELAY2 = 1 << 4  # Add 0.5s delay after second part
    APPEND_CR = 1 << 5  # Append CR as final character

    # Yubikey 2 and above
    # Block update of config 2 unless config 2 is configured and has this bit set
    PROTECT_CFG2 = 1 << 7

    # Yubikey 2.1 and above
    OATH_HOTP = 1 << 6  # OATH HOTP mode

    # Yubikey 2.2 and above
    CHAL_RESP = 1 << 6  # Challenge-response enabled (both must be set)
[docs]
class CFGFLAG(IntFlag):
    """Configuration flags: the ``cfg`` byte of a slot configuration.

    The same bits are reused under different names for different firmware
    generations and slot modes, so many members are aliases of one another.
    """

    # Yubikey 1 and above
    SEND_REF = 1 << 0  # Send reference string (0..F) before data
    PACING_10MS = 1 << 2  # Add 10ms intra-key pacing
    PACING_20MS = 1 << 3  # Add 20ms intra-key pacing
    STATIC_TICKET = 1 << 5  # Static ticket generation

    # Yubikey 1 only
    TICKET_FIRST = 1 << 1  # Send ticket first (default is fixed part)
    ALLOW_HIDTRIG = 1 << 4  # Allow trigger through HID/keyboard

    # Yubikey 2 and above
    SHORT_TICKET = 1 << 1  # Send truncated ticket (half length)
    STRONG_PW1 = 1 << 4  # Strong password policy flag #1 (mixed case)
    STRONG_PW2 = 1 << 6  # Strong password policy flag #2 (substitute 0..7 to digits)
    MAN_UPDATE = 1 << 7  # Allow manual (local) update of static OTP

    # Yubikey 2.1 and above
    OATH_HOTP8 = 1 << 1  # Generate 8 digits HOTP rather than 6 digits
    OATH_FIXED_MODHEX1 = 1 << 4  # First byte in fixed part sent as modhex
    OATH_FIXED_MODHEX2 = 1 << 6  # First two bytes in fixed part sent as modhex
    OATH_FIXED_MODHEX = (1 << 4) | (1 << 6)  # Fixed part sent as modhex
    OATH_FIXED_MASK = (1 << 4) | (1 << 6)  # Mask to get out fixed flags

    # Yubikey 2.2 and above
    CHAL_YUBICO = 1 << 5  # Challenge-response enabled - Yubico OTP mode
    CHAL_HMAC = (1 << 5) | (1 << 1)  # Challenge-response enabled - HMAC-SHA1
    HMAC_LT64 = 1 << 2  # Set when HMAC message is less than 64 bytes
    CHAL_BTN_TRIG = 1 << 3  # Challenge-response operation requires button press
[docs]
class EXTFLAG(IntFlag):
    """Extended flags: the ``ext`` byte of a slot configuration."""

    SERIAL_BTN_VISIBLE = 1 << 0  # Serial number visible at startup (button press)
    SERIAL_USB_VISIBLE = 1 << 1  # Serial number visible in USB iSerial field
    SERIAL_API_VISIBLE = 1 << 2  # Serial number visible via API call

    # V2.3 flags only
    USE_NUMERIC_KEYPAD = 1 << 3  # Use numeric keypad for digits
    FAST_TRIG = 1 << 4  # Use fast trig if only cfg1 set
    # Allow update of existing configuration (selected flags + access code)
    ALLOW_UPDATE = 1 << 5
    DORMANT = 1 << 6  # Dormant config (woken up, flag removed, requires update flag)

    # V2.4/3.1 flags only
    LED_INV = 1 << 7  # LED idle state is off rather than on
# Flags valid for update
# Any flag outside these masks is rejected with ValueError by _build_update()
# (and, for TKT/CFG flags, by UpdateConfiguration._update_flags()).
TKTFLAG_UPDATE_MASK = (
    TKTFLAG.TAB_FIRST
    | TKTFLAG.APPEND_TAB1
    | TKTFLAG.APPEND_TAB2
    | TKTFLAG.APPEND_DELAY1
    | TKTFLAG.APPEND_DELAY2
    | TKTFLAG.APPEND_CR
)
# Only the two pacing flags may be changed by an update.
CFGFLAG_UPDATE_MASK = CFGFLAG.PACING_10MS | CFGFLAG.PACING_20MS
# Every EXTFLAG member defined above is included in this mask.
EXTFLAG_UPDATE_MASK = (
    EXTFLAG.SERIAL_BTN_VISIBLE
    | EXTFLAG.SERIAL_USB_VISIBLE
    | EXTFLAG.SERIAL_API_VISIBLE
    | EXTFLAG.USE_NUMERIC_KEYPAD
    | EXTFLAG.FAST_TRIG
    | EXTFLAG.ALLOW_UPDATE
    | EXTFLAG.DORMANT
    | EXTFLAG.LED_INV
)
# Data sizes (lengths of the byte strings handled below)
FIXED_SIZE = 16  # Maximum length of the "fixed" field; shorter values are zero-padded
UID_SIZE = 6  # Length of the "uid" field
KEY_SIZE = 16  # Length of the "key" field
ACC_CODE_SIZE = 6  # Length of an access code
# Length of a full configuration as produced by _build_config():
# fixed + uid + key + access code + 4 size/flag bytes + 2 RFU bytes + 2 CRC bytes
CONFIG_SIZE = 52
NDEF_DATA_SIZE = 54  # Maximum length of the NDEF payload (zero-padded to this size)
HMAC_KEY_SIZE = 20  # Longest HMAC key that is stored without hashing
HMAC_CHALLENGE_SIZE = 64  # Challenges are padded to this length before sending
HMAC_RESPONSE_SIZE = 20  # Expected length of an HMAC-SHA1 response
# A static password's scan codes are spread over the fixed, uid and key fields
SCAN_CODES_SIZE = FIXED_SIZE + UID_SIZE + KEY_SIZE
SHA1_BLOCK_SIZE = 64  # HMAC keys longer than this are replaced by their SHA-1 digest
[docs]
@unique
class NDEF_TYPE(IntEnum):
    """Type byte of an NDEF configuration; each value is an ASCII letter code."""

    TEXT = 0x54  # ASCII "T"
    URI = 0x55  # ASCII "U"
# URI used by _build_ndef_config() when no value is given for a URI record.
DEFAULT_NDEF_URI = "https://my.yubico.com/yk/#"
# URI prefixes that _build_ndef_config() replaces with a one-byte identifier code.
# The code is the entry's position in this tuple plus one (0 means "no prefix"),
# so the ORDER OF ENTRIES MUST NOT CHANGE. Presumably the positions follow the
# NFC Forum URI record identifier codes -- confirm before editing.
# NOTE(review): matching is first-hit, so the generic "urn:" entry always wins over
# the later, more specific "urn:epc:..." / "urn:nfc:" entries.
NDEF_URL_PREFIXES = (
    "http://www.",
    "https://www.",
    "http://",
    "https://",
    "tel:",
    "mailto:",
    "ftp://anonymous:anonymous@",
    "ftp://ftp.",
    "ftps://",
    "sftp://",
    "smb://",
    "nfs://",
    "ftp://",
    "dav://",
    "news:",
    "telnet://",
    "imap:",
    "rtsp://",
    "urn:",
    "pop:",
    "sip:",
    "sips:",
    "tftp:",
    "btspp://",
    "btl2cap://",
    "btgoep://",
    "tcpobex://",
    "irdaobex://",
    "file://",
    "urn:epc:id:",
    "urn:epc:tag:",
    "urn:epc:pat:",
    "urn:epc:raw:",
    "urn:epc:",
    "urn:nfc:",
)
def _build_config(fixed, uid, key, ext, tkt, cfg, acc_code=None):
    """Serialize a slot configuration, including its trailing checksum.

    Layout: ``fixed`` zero-padded to FIXED_SIZE, ``uid``, ``key``, the access
    code (all zeros when none is given), one byte each for ``len(fixed)``,
    ``ext``, ``tkt`` and ``cfg``, two reserved zero bytes, and finally the
    inverted CRC of everything before it as a little-endian 16-bit value.

    :param fixed: The fixed field (padded here, not validated for length).
    :param uid: The uid field, used as given.
    :param key: The key field, used as given.
    :param ext: Extended flags byte.
    :param tkt: Ticket flags byte.
    :param cfg: Configuration flags byte.
    :param acc_code: Access code to embed, or None/empty for all zeros.
    :return: The serialized configuration.
    """
    padded_fixed = fixed.ljust(FIXED_SIZE, b"\0")
    access_code = acc_code or b"\0" * ACC_CODE_SIZE
    sizes_and_flags = struct.pack(">BBBB", len(fixed), ext, tkt, cfg)
    reserved = b"\0\0"  # RFU

    body = padded_fixed + uid + key + access_code + sizes_and_flags + reserved
    checksum = 0xFFFF & ~calculate_crc(body)
    return body + struct.pack("<H", checksum)
def _build_update(ext, tkt, cfg, acc_code=None):
    """Serialize an update-only configuration after validating its flags.

    The fixed, uid and key fields are left empty/zeroed; only flags and the
    access code are carried.

    :param ext: Extended flags; must lie within EXTFLAG_UPDATE_MASK.
    :param tkt: Ticket flags; must lie within TKTFLAG_UPDATE_MASK.
    :param cfg: Configuration flags; must lie within CFGFLAG_UPDATE_MASK.
    :param acc_code: Access code to embed, or None.
    :raises ValueError: If any flag falls outside its update mask.
    :return: The serialized configuration.
    """
    checks = (
        (ext, EXTFLAG_UPDATE_MASK, "Unsupported ext flags for update"),
        (tkt, TKTFLAG_UPDATE_MASK, "Unsupported tkt flags for update"),
        (cfg, CFGFLAG_UPDATE_MASK, "Unsupported cfg flags for update"),
    )
    for flags, allowed, message in checks:
        if flags & ~allowed != 0:
            raise ValueError(message)

    empty_uid = b"\0" * UID_SIZE
    empty_key = b"\0" * KEY_SIZE
    return _build_config(b"", empty_uid, empty_key, ext, tkt, cfg, acc_code)
def _build_ndef_config(value, ndef_type=NDEF_TYPE.URI):
    """Serialize an NDEF configuration record.

    For a URI, the first entry of NDEF_URL_PREFIXES that the value starts with
    is stripped and replaced by a one-byte identifier code (its position plus
    one, or 0 when nothing matches). For text, the payload is prefixed with
    ``b"\\x02en"``.

    :param value: The URI or text; None selects DEFAULT_NDEF_URI for URIs and
        an empty string for text.
    :param ndef_type: The NDEF type (text or URI).
    :raises ValueError: If the encoded payload exceeds NDEF_DATA_SIZE.
    :return: Length byte, type byte, and the payload zero-padded to
        NDEF_DATA_SIZE.
    """
    if ndef_type != NDEF_TYPE.URI:
        text = "" if value is None else value
        data = b"\x02en" + text.encode()
    else:
        uri = DEFAULT_NDEF_URI if value is None else value
        # First matching prefix wins; (0, 0) leaves the URI untouched.
        id_code, prefix_len = next(
            (
                (position, len(prefix))
                for position, prefix in enumerate(NDEF_URL_PREFIXES, start=1)
                if uri.startswith(prefix)
            ),
            (0, 0),
        )
        data = bytes([id_code]) + uri[prefix_len:].encode()

    if len(data) > NDEF_DATA_SIZE:
        raise ValueError("URI payload too large")
    header = bytes([len(data), ndef_type])
    return header + data.ljust(NDEF_DATA_SIZE, b"\0")
[docs]
@unique
class CFGSTATE(IntFlag):
    """State bits reported in the ``touch_level`` value of the status."""

    # Bits in touch_level
    SLOT1_VALID = 1 << 0  # configuration 1 is valid (from firmware 2.1)
    SLOT2_VALID = 1 << 1  # configuration 2 is valid (from firmware 2.1)
    SLOT1_TOUCH = 1 << 2  # configuration 1 requires touch (from firmware 3.0)
    SLOT2_TOUCH = 1 << 3  # configuration 2 requires touch (from firmware 3.0)
    LED_INV = 1 << 4  # LED behavior is inverted (EXTFLAG_LED_INV mirror)
def _shorten_hmac_key(key: bytes) -> bytes:
    """Reduce an HMAC key to at most HMAC_KEY_SIZE bytes, if possible.

    Keys longer than SHA1_BLOCK_SIZE are replaced by their SHA-1 digest. Keys
    longer than HMAC_KEY_SIZE but not longer than a SHA-1 block are rejected.
    Shorter keys are returned unchanged.

    :param key: The HMAC key.
    :raises NotSupportedError: If the key is too long to store and too short
        to be hashed.
    :return: The key to store.
    """
    key_length = len(key)
    if key_length > SHA1_BLOCK_SIZE:
        return sha1(key).digest()  # nosec
    if key_length > HMAC_KEY_SIZE:
        raise NotSupportedError(f"Key lengths > {HMAC_KEY_SIZE} bytes not supported")
    return key
# Type of "self" in the chainable SlotConfiguration setters, so that each setter
# is typed as returning the same (sub)class it was called on.
Cfg = TypeVar("Cfg", bound="SlotConfiguration")
[docs]
class SlotConfiguration:
    """Base class holding the fields and flags of a slot configuration.

    Flags are stored per flag type (EXTFLAG, TKTFLAG, CFGFLAG). A new instance
    starts with SERIAL_API_VISIBLE and ALLOW_UPDATE set. Setter methods return
    ``self`` so that calls can be chained.
    """

    def __init__(self):
        self._fixed = b""
        self._uid = bytes(UID_SIZE)
        self._key = bytes(KEY_SIZE)
        self._flags = {}

        for default_flag in (EXTFLAG.SERIAL_API_VISIBLE, EXTFLAG.ALLOW_UPDATE):
            self._update_flags(default_flag, True)

    def _update_flags(self, flag: IntFlag, value: bool) -> None:
        """Set (``value`` true) or clear (``value`` false) a single flag.

        The flag is stored under its own enum type.
        """
        group = type(flag)
        current = self._flags.get(group, 0)
        if value:
            self._flags[group] = current | flag
        else:
            self._flags[group] = current & ~flag

    def is_supported_by(self, version: Version) -> bool:
        """Tell whether the configuration can be used with the given version.

        The base implementation accepts every version.
        """
        return True

    def get_config(self, acc_code: Optional[bytes] = None) -> bytes:
        """Serialize the configuration.

        :param acc_code: The access code to embed in the configuration.
        :return: The serialized configuration.
        """
        stored = self._flags
        ext = stored.get(EXTFLAG, 0)
        tkt = stored.get(TKTFLAG, 0)
        cfg = stored.get(CFGFLAG, 0)
        return _build_config(
            self._fixed, self._uid, self._key, ext, tkt, cfg, acc_code
        )

    def serial_api_visible(self: Cfg, value: bool) -> Cfg:
        """Set or clear EXTFLAG.SERIAL_API_VISIBLE."""
        self._update_flags(EXTFLAG.SERIAL_API_VISIBLE, value)
        return self

    def serial_usb_visible(self: Cfg, value: bool) -> Cfg:
        """Set or clear EXTFLAG.SERIAL_USB_VISIBLE."""
        self._update_flags(EXTFLAG.SERIAL_USB_VISIBLE, value)
        return self

    def allow_update(self: Cfg, value: bool) -> Cfg:
        """Set or clear EXTFLAG.ALLOW_UPDATE."""
        self._update_flags(EXTFLAG.ALLOW_UPDATE, value)
        return self

    def dormant(self: Cfg, value: bool) -> Cfg:
        """Set or clear EXTFLAG.DORMANT."""
        self._update_flags(EXTFLAG.DORMANT, value)
        return self

    def invert_led(self: Cfg, value: bool) -> Cfg:
        """Set or clear EXTFLAG.LED_INV."""
        self._update_flags(EXTFLAG.LED_INV, value)
        return self

    def protect_slot2(self: Cfg, value: bool) -> Cfg:
        """Set or clear TKTFLAG.PROTECT_CFG2."""
        self._update_flags(TKTFLAG.PROTECT_CFG2, value)
        return self
[docs]
class HmacSha1SlotConfiguration(SlotConfiguration):
    """Slot configuration for HMAC-SHA1 challenge-response.

    :param key: The HMAC key; shortened with _shorten_hmac_key() if needed.
    """

    def __init__(self, key: bytes):
        super().__init__()

        secret = _shorten_hmac_key(key)

        # Key is packed into key and uid
        self._key = secret[:KEY_SIZE].ljust(KEY_SIZE, b"\0")
        self._uid = secret[KEY_SIZE:].ljust(UID_SIZE, b"\0")

        for flag in (TKTFLAG.CHAL_RESP, CFGFLAG.CHAL_HMAC, CFGFLAG.HMAC_LT64):
            self._update_flags(flag, True)

    def is_supported_by(self, version):
        """Accept version 2.2.0 and later, as well as any major version 0."""
        meets_minimum = version >= (2, 2, 0)
        return meets_minimum or version[0] == 0

    def require_touch(self: Cfg, value: bool) -> Cfg:
        """Set or clear CFGFLAG.CHAL_BTN_TRIG."""
        self._update_flags(CFGFLAG.CHAL_BTN_TRIG, value)
        return self

    def lt64(self: Cfg, value: bool) -> Cfg:
        """Set or clear CFGFLAG.HMAC_LT64 (set by default)."""
        self._update_flags(CFGFLAG.HMAC_LT64, value)
        return self
[docs]
class KeyboardSlotConfiguration(SlotConfiguration):
    """Base class for configurations with keyboard-output related flags.

    In addition to the base defaults, APPEND_CR and FAST_TRIG start out set.
    """

    def __init__(self):
        super().__init__()
        for default_flag in (TKTFLAG.APPEND_CR, EXTFLAG.FAST_TRIG):
            self._update_flags(default_flag, True)

    def append_cr(self: Cfg, value: bool) -> Cfg:
        """Set or clear TKTFLAG.APPEND_CR."""
        self._update_flags(TKTFLAG.APPEND_CR, value)
        return self

    def fast_trigger(self: Cfg, value: bool) -> Cfg:
        """Set or clear EXTFLAG.FAST_TRIG."""
        self._update_flags(EXTFLAG.FAST_TRIG, value)
        return self

    def pacing(self: Cfg, pacing_10ms: bool = False, pacing_20ms: bool = False) -> Cfg:
        """Set or clear the two pacing flags.

        :param pacing_10ms: Whether CFGFLAG.PACING_10MS is set.
        :param pacing_20ms: Whether CFGFLAG.PACING_20MS is set.
        """
        settings = (
            (CFGFLAG.PACING_10MS, pacing_10ms),
            (CFGFLAG.PACING_20MS, pacing_20ms),
        )
        for flag, enabled in settings:
            self._update_flags(flag, enabled)
        return self

    def use_numeric(self: Cfg, value: bool) -> Cfg:
        """Set or clear EXTFLAG.USE_NUMERIC_KEYPAD."""
        self._update_flags(EXTFLAG.USE_NUMERIC_KEYPAD, value)
        return self
[docs]
class HotpSlotConfiguration(KeyboardSlotConfiguration):
    """Slot configuration for OATH-HOTP.

    :param key: The HMAC key; shortened with _shorten_hmac_key() if needed.
    """

    def __init__(self, key: bytes):
        super(HotpSlotConfiguration, self).__init__()

        key = _shorten_hmac_key(key)

        # Key is packed into key and uid
        self._key = key[:KEY_SIZE].ljust(KEY_SIZE, b"\0")
        self._uid = key[KEY_SIZE:].ljust(UID_SIZE, b"\0")

        self._update_flags(TKTFLAG.OATH_HOTP, True)
        self._update_flags(CFGFLAG.OATH_FIXED_MODHEX2, True)

    def is_supported_by(self, version):
        """Accept version 2.1.0 and later, as well as any major version 0.

        Every flag this configuration can set (OATH_HOTP, OATH_HOTP8 and the
        OATH_FIXED_MODHEX* flags) is available from "Yubikey 2.1 and above",
        so 2.1.x must not be rejected.
        """
        return version >= (2, 1, 0) or version[0] == 0

    def digits8(self: Cfg, value: bool) -> Cfg:
        """Set or clear CFGFLAG.OATH_HOTP8 (8 digits rather than 6)."""
        self._update_flags(CFGFLAG.OATH_HOTP8, value)
        return self

    def token_id(
        self: Cfg,
        token_id: bytes,
        fixed_modhex1: bool = False,
        fixed_modhex2: bool = True,
    ) -> Cfg:
        """Set the token identifier, stored in the fixed field.

        :param token_id: The token identifier, at most FIXED_SIZE bytes.
        :param fixed_modhex1: Whether CFGFLAG.OATH_FIXED_MODHEX1 is set.
        :param fixed_modhex2: Whether CFGFLAG.OATH_FIXED_MODHEX2 is set.
        :raises ValueError: If ``token_id`` is longer than FIXED_SIZE.
        """
        if len(token_id) > FIXED_SIZE:
            raise ValueError(f"token_id must be <= {FIXED_SIZE} bytes")

        self._fixed = token_id
        self._update_flags(CFGFLAG.OATH_FIXED_MODHEX1, fixed_modhex1)
        self._update_flags(CFGFLAG.OATH_FIXED_MODHEX2, fixed_modhex2)
        return self

    def imf(self: Cfg, imf: int) -> Cfg:
        """Set the initial moving factor.

        :param imf: A multiple of 16 in the range 0..0xFFFF0.
        :raises ValueError: If ``imf`` is out of range or not a multiple of 16.
        """
        if not (imf % 16 == 0 and 0 <= imf <= 0xFFFF0):
            raise ValueError(
                f"imf should be between {0} and {1048560}, evenly dividable by 16"
            )
        # The value divided by 16 is stored big-endian in the last two uid bytes.
        self._uid = self._uid[:4] + struct.pack(">H", imf >> 4)
        return self
[docs]
class StaticPasswordSlotConfiguration(KeyboardSlotConfiguration):
    """Slot configuration for a static password given as scan codes.

    :param scan_codes: The scan codes, at most SCAN_CODES_SIZE bytes.
    :raises NotSupportedError: If ``scan_codes`` is too long.
    """

    def __init__(self, scan_codes: bytes):
        super().__init__()

        if len(scan_codes) > SCAN_CODES_SIZE:
            raise NotSupportedError("Password is too long")

        # Scan codes are packed into fixed, uid, and key
        padded = scan_codes.ljust(SCAN_CODES_SIZE, b"\0")
        uid_end = FIXED_SIZE + UID_SIZE
        self._fixed = padded[:FIXED_SIZE]
        self._uid = padded[FIXED_SIZE:uid_end]
        self._key = padded[uid_end:]

        self._update_flags(CFGFLAG.SHORT_TICKET, True)

    def is_supported_by(self, version):
        """Accept version 2.2.0 and later, as well as any major version 0."""
        meets_minimum = version >= (2, 2, 0)
        return meets_minimum or version[0] == 0
[docs]
class YubiOtpSlotConfiguration(KeyboardSlotConfiguration):
    """Slot configuration for Yubico OTP.

    :param fixed: The fixed field, at most FIXED_SIZE bytes.
    :param uid: The uid field, exactly UID_SIZE bytes.
    :param key: The key field, exactly KEY_SIZE bytes.
    :raises ValueError: If any field has an unsupported length.
    """

    def __init__(self, fixed: bytes, uid: bytes, key: bytes):
        super().__init__()

        if len(fixed) > FIXED_SIZE:
            raise ValueError(f"fixed must be <= {FIXED_SIZE} bytes")
        if len(uid) != UID_SIZE:
            raise ValueError(f"uid must be {UID_SIZE} bytes")
        if len(key) != KEY_SIZE:
            raise ValueError(f"key must be {KEY_SIZE} bytes")

        self._fixed, self._uid, self._key = fixed, uid, key

    def tabs(
        self: Cfg,
        before: bool = False,
        after_first: bool = False,
        after_second: bool = False,
    ) -> Cfg:
        """Set or clear the three TAB flags.

        :param before: Whether TKTFLAG.TAB_FIRST is set.
        :param after_first: Whether TKTFLAG.APPEND_TAB1 is set.
        :param after_second: Whether TKTFLAG.APPEND_TAB2 is set.
        """
        settings = (
            (TKTFLAG.TAB_FIRST, before),
            (TKTFLAG.APPEND_TAB1, after_first),
            (TKTFLAG.APPEND_TAB2, after_second),
        )
        for flag, enabled in settings:
            self._update_flags(flag, enabled)
        return self

    def delay(self: Cfg, after_first: bool = False, after_second: bool = False) -> Cfg:
        """Set or clear the two delay flags.

        :param after_first: Whether TKTFLAG.APPEND_DELAY1 is set.
        :param after_second: Whether TKTFLAG.APPEND_DELAY2 is set.
        """
        settings = (
            (TKTFLAG.APPEND_DELAY1, after_first),
            (TKTFLAG.APPEND_DELAY2, after_second),
        )
        for flag, enabled in settings:
            self._update_flags(flag, enabled)
        return self

    def send_reference(self: Cfg, value: bool) -> Cfg:
        """Set or clear CFGFLAG.SEND_REF."""
        self._update_flags(CFGFLAG.SEND_REF, value)
        return self
[docs]
class StaticTicketSlotConfiguration(KeyboardSlotConfiguration):
    """Slot configuration for a static ticket (CFGFLAG.STATIC_TICKET set).

    :param fixed: The fixed field, at most FIXED_SIZE bytes.
    :param uid: The uid field, exactly UID_SIZE bytes.
    :param key: The key field, exactly KEY_SIZE bytes.
    :raises ValueError: If any field has an unsupported length.
    """

    def __init__(self, fixed: bytes, uid: bytes, key: bytes):
        super().__init__()

        if len(fixed) > FIXED_SIZE:
            raise ValueError(f"fixed must be <= {FIXED_SIZE} bytes")
        if len(uid) != UID_SIZE:
            raise ValueError(f"uid must be {UID_SIZE} bytes")
        if len(key) != KEY_SIZE:
            raise ValueError(f"key must be {KEY_SIZE} bytes")

        self._fixed, self._uid, self._key = fixed, uid, key
        self._update_flags(CFGFLAG.STATIC_TICKET, True)

    def short_ticket(self: Cfg, value: bool) -> Cfg:
        """Set or clear CFGFLAG.SHORT_TICKET."""
        self._update_flags(CFGFLAG.SHORT_TICKET, value)
        return self

    def strong_password(
        self: Cfg, upper_case: bool = False, digit: bool = False, special: bool = False
    ) -> Cfg:
        """Set or clear the strong-password related flags.

        :param upper_case: Whether CFGFLAG.STRONG_PW1 is set.
        :param digit: Sets CFGFLAG.STRONG_PW2 (also set when ``special`` is true).
        :param special: Whether CFGFLAG.SEND_REF is set; also implies STRONG_PW2.
        """
        settings = (
            (CFGFLAG.STRONG_PW1, upper_case),
            (CFGFLAG.STRONG_PW2, digit or special),
            (CFGFLAG.SEND_REF, special),
        )
        for flag, enabled in settings:
            self._update_flags(flag, enabled)
        return self

    def manual_update(self: Cfg, value: bool) -> Cfg:
        """Set or clear CFGFLAG.MAN_UPDATE."""
        self._update_flags(CFGFLAG.MAN_UPDATE, value)
        return self
[docs]
class UpdateConfiguration(KeyboardSlotConfiguration):
    """Configuration carrying only the flags that an update may change.

    The fixed, uid and key fields are all zeros. TKT and CFG flags outside the
    update masks are rejected.
    """

    def __init__(self):
        super().__init__()

        self._fixed = bytes(FIXED_SIZE)
        self._uid = bytes(UID_SIZE)
        self._key = bytes(KEY_SIZE)

    def is_supported_by(self, version):
        """Accept version 2.2.0 and later, as well as any major version 0."""
        meets_minimum = version >= (2, 2, 0)
        return meets_minimum or version[0] == 0

    def _update_flags(self, flag, value):
        """Set or clear a flag, refusing TKT/CFG flags outside the update masks.

        :raises ValueError: If the flag is not allowed in an update, regardless
            of whether it is being set or cleared.
        """
        # NB: All EXT flags are allowed
        if isinstance(flag, TKTFLAG) and not TKTFLAG_UPDATE_MASK & flag:
            raise ValueError("Unsupported TKT flag for update")
        if isinstance(flag, CFGFLAG) and not CFGFLAG_UPDATE_MASK & flag:
            raise ValueError("Unsupported CFG flag for update")
        super()._update_flags(flag, value)

    def protect_slot2(self: Cfg, value):
        """Not available for updates; always raises ValueError."""
        raise ValueError("protect_slot2 cannot be applied to UpdateConfiguration")

    def tabs(
        self: Cfg,
        before: bool = False,
        after_first: bool = False,
        after_second: bool = False,
    ) -> Cfg:
        """Set or clear the three TAB flags.

        :param before: Whether TKTFLAG.TAB_FIRST is set.
        :param after_first: Whether TKTFLAG.APPEND_TAB1 is set.
        :param after_second: Whether TKTFLAG.APPEND_TAB2 is set.
        """
        settings = (
            (TKTFLAG.TAB_FIRST, before),
            (TKTFLAG.APPEND_TAB1, after_first),
            (TKTFLAG.APPEND_TAB2, after_second),
        )
        for flag, enabled in settings:
            self._update_flags(flag, enabled)
        return self

    def delay(self: Cfg, after_first: bool = False, after_second: bool = False) -> Cfg:
        """Set or clear the two delay flags.

        :param after_first: Whether TKTFLAG.APPEND_DELAY1 is set.
        :param after_second: Whether TKTFLAG.APPEND_DELAY2 is set.
        """
        settings = (
            (TKTFLAG.APPEND_DELAY1, after_first),
            (TKTFLAG.APPEND_DELAY2, after_second),
        )
        for flag, enabled in settings:
            self._update_flags(flag, enabled)
        return self
[docs]
class ConfigState:
    """The configuration state of the YubiOTP application."""

    def __init__(self, version: Version, touch_level: int):
        """
        :param version: The version used for the feature checks below.
        :param touch_level: The raw state value; only bits that correspond to
            a CFGSTATE flag are kept.
        """
        self.version = version
        self.flags = sum(CFGSTATE) & touch_level

    def is_configured(self, slot: SLOT) -> bool:
        """Checks if a slot is programmed, or empty.

        Requires YubiKey 2.1 or later.
        """
        # This method is relied upon by __repr__; without it, formatting a
        # ConfigState raises AttributeError.
        require_version(self.version, (2, 1, 0))
        return self.flags & (CFGSTATE.SLOT1_VALID, CFGSTATE.SLOT2_VALID)[slot - 1] != 0

    def is_touch_triggered(self, slot: SLOT) -> bool:
        """Checks if a (programmed) state is triggered by touch (not challenge-response)

        Requires YubiKey 3 or later.
        """
        require_version(self.version, (3, 0, 0))
        return self.flags & (CFGSTATE.SLOT1_TOUCH, CFGSTATE.SLOT2_TOUCH)[slot - 1] != 0

    def is_led_inverted(self) -> bool:
        """Checks if the LED behavior is inverted."""
        return self.flags & CFGSTATE.LED_INV != 0

    def __repr__(self):
        # Items are added in order of increasing version requirement; the first
        # check that is not supported ends the list, keeping what was gathered.
        items = []
        try:
            items.append(
                "configured: (%s, %s)"
                % (self.is_configured(SLOT.ONE), self.is_configured(SLOT.TWO))
            )
            items.append(
                "touch_triggered: (%s, %s)"
                % (self.is_touch_triggered(SLOT.ONE), self.is_touch_triggered(SLOT.TWO))
            )
            items.append("led_inverted: %s" % self.is_led_inverted())
        except NotSupportedError:
            pass
        return f"ConfigState({', '.join(items)})"
class _Backend(abc.ABC):
    """Interface of the transport-specific backends used by YubiOtpSession."""

    @abc.abstractmethod
    def close(self) -> None:
        """Close the backend."""

    @abc.abstractmethod
    def write_update(self, slot: CONFIG_SLOT, data: bytes) -> bytes:
        """Write ``data`` to ``slot`` and return the resulting status bytes."""

    @abc.abstractmethod
    def send_and_receive(
        self,
        slot: CONFIG_SLOT,
        data: bytes,
        expected_len: int,
        event: Optional[Event] = None,
        on_keepalive: Optional[Callable[[int], None]] = None,
    ) -> bytes:
        """Send ``data`` to ``slot`` and return a response of ``expected_len`` bytes."""
class _YubiOtpOtpBackend(_Backend):
    """Backend that delegates to an OTP protocol object."""

    def __init__(self, protocol):
        self.protocol = protocol

    def close(self):
        """Close the underlying protocol."""
        self.protocol.close()

    def write_update(self, slot, data):
        """Send ``data`` to ``slot`` and return whatever the protocol returns."""
        return self.protocol.send_and_receive(slot, data)

    def send_and_receive(self, slot, data, expected_len, event=None, on_keepalive=None):
        """Send ``data`` to ``slot`` and return the CRC-checked response.

        The CRC is verified over the first ``expected_len + 2`` bytes of the
        response; only the first ``expected_len`` bytes are returned.

        :raises BadResponseError: If the CRC check fails.
        """
        raw = self.protocol.send_and_receive(slot, data, event, on_keepalive)
        checked_part = raw[: expected_len + 2]
        if not check_crc(checked_part):
            raise BadResponseError("Invalid CRC")
        return raw[:expected_len]
# APDU instruction bytes used by _YubiOtpSmartCardBackend.
INS_CONFIG = 0x01  # Sent with the slot as P1 and the payload as data
INS_YK2_STATUS = 0x03  # Used to fetch the status when a write returns no data
class _YubiOtpSmartCardBackend(_Backend):
    """Backend that talks to the OTP application using APDUs."""

    def __init__(self, protocol, version, prog_seq):
        self.protocol = protocol
        self._version = version
        # Last programming sequence number seen; write_update() compares it to
        # the value in the next status to decide whether a write took effect.
        self._prog_seq = prog_seq

    def close(self):
        """Close the underlying protocol."""
        self.protocol.close()

    def write_update(self, slot, data):
        """Write ``data`` to ``slot`` and return the resulting status bytes.

        The write counts as accepted when the programming sequence number
        (status byte 3) increased by one, or when it went from a non-zero
        value to 0 and one of the two checks below holds.

        :raises CommandRejectedError: If neither condition is met.
        """
        status = self.protocol.send_apdu(0, INS_CONFIG, slot, 0, data)
        if not status:  # Some commands don't return status on some YubiKeys
            status = self.protocol.send_apdu(0, INS_YK2_STATUS, 0, 0)
        # Remember the new sequence number even if the write ends up rejected.
        prev_prog_seq, self._prog_seq = self._prog_seq, status[3]
        if self._prog_seq == prev_prog_seq + 1:
            return status
        if self._prog_seq == 0 and prev_prog_seq > 0:
            # NOTE(review): the version is parsed from the status bytes here,
            # not taken from self._version.
            version = Version.from_bytes(status[:3])
            # None of the low five state bits (the CFGSTATE flags) are set.
            if status[4] & 0x1F == 0:
                return status
            if (5, 0, 0) <= version < (5, 4, 3):  # Programming state does not update
                return status
        raise CommandRejectedError("Not updated")

    def send_and_receive(self, slot, data, expected_len, event=None, on_keepalive=None):
        """Send ``data`` to ``slot`` and return the response.

        ``event`` and ``on_keepalive`` are accepted for interface compatibility
        but are not used by this backend.

        :raises BadResponseError: If the response is not ``expected_len`` bytes.
        """
        response = self.protocol.send_apdu(0, INS_CONFIG, slot, 0, data)
        if expected_len == len(response):
            return response
        raise BadResponseError("Unexpected response length")
[docs]
class YubiOtpSession:
    """A session with the YubiOTP application."""

    def __init__(
        self,
        connection: Union[OtpConnection, SmartCardConnection],
        scp_key_params: Optional[ScpKeyParams] = None,
    ):
        """Open a session over the given connection.

        :param connection: An OtpConnection or SmartCardConnection.
        :param scp_key_params: SCP key parameters; only allowed together with
            a SmartCardConnection.
        :raises ValueError: If SCP parameters are given for an OtpConnection.
        :raises TypeError: If the connection is of neither supported type.
        """
        uses_otp = isinstance(connection, OtpConnection)
        if not uses_otp and not isinstance(connection, SmartCardConnection):
            raise TypeError("Unsupported connection type")

        if uses_otp:
            if scp_key_params:
                raise ValueError("SCP can only be used with SmartCardConnection")
            otp_protocol = OtpProtocol(connection)
            self._status = otp_protocol.read_status()
            self._version = otp_protocol.version
            self.backend: _Backend = _YubiOtpOtpBackend(otp_protocol)
        else:
            card_protocol = SmartCardProtocol(connection)

            mgmt_version = None
            if connection.transport == TRANSPORT.NFC:
                # This version is more reliable over NFC
                try:
                    # NOTE(review): the management application is selected
                    # twice and the first response is discarded -- confirm
                    # whether this is intentional before removing either call.
                    card_protocol.select(AID.MANAGEMENT)
                    select_str = card_protocol.select(AID.MANAGEMENT).decode()
                    mgmt_version = Version.from_string(select_str)
                except ApplicationNotAvailableError:
                    pass  # Not available (probably NEO), get version from status

            self._status = card_protocol.select(AID.OTP)
            otp_version = Version.from_bytes(self._status[:3])
            if not mgmt_version:
                self._version = otp_version
            elif mgmt_version[0] == 3:
                # NEO reports the highest of these two
                self._version = max(mgmt_version, otp_version)
            else:
                self._version = mgmt_version

            card_protocol.configure(self._version)
            if scp_key_params:
                card_protocol.init_scp(scp_key_params)

            initial_prog_seq = self._status[3]
            self.backend = _YubiOtpSmartCardBackend(
                card_protocol, self._version, initial_prog_seq
            )

        logger.debug(
            "YubiOTP session initialized for "
            f"connection={type(connection).__name__}, version={self.version}, "
            f"state={self.get_config_state()}"
        )

    def close(self) -> None:
        """Close the underlying connection.

        :deprecated: call .close() on the underlying connection instead.
        """
        warnings.warn(
            "Deprecated: call .close() on the underlying connection instead.",
            DeprecationWarning,
        )
        self.backend.close()

    @property
    def version(self) -> Version:
        """The version of the Yubico OTP application,
        typically the same as the YubiKey firmware."""
        return self._version

    def get_serial(self) -> int:
        """Get serial number."""
        raw_serial = self.backend.send_and_receive(CONFIG_SLOT.DEVICE_SERIAL, b"", 4)
        return bytes2int(raw_serial)

    def get_config_state(self) -> ConfigState:
        """Get configuration state of the YubiOTP application."""
        # Status bytes 4-5 hold the state value as a little-endian 16-bit integer.
        (touch_level,) = struct.unpack("<H", self._status[4:6])
        return ConfigState(self.version, touch_level)

    def _write_config(self, slot, config, cur_acc_code):
        """Write ``config`` followed by the current access code to ``slot``.

        An all-zero access code is appended when none is given. The status
        returned by the backend replaces the cached status.
        """
        has_acc = bool(cur_acc_code)
        logger.debug(f"Writing configuration to slot {slot}, access code: {has_acc}")
        payload = config + (cur_acc_code or b"\0" * ACC_CODE_SIZE)
        self._status = self.backend.write_update(slot, payload)
        logger.info("Configuration written")

    def put_configuration(
        self,
        slot: SLOT,
        configuration: SlotConfiguration,
        acc_code: Optional[bytes] = None,
        cur_acc_code: Optional[bytes] = None,
    ) -> None:
        """Write configuration to slot.

        :param slot: The slot to configure.
        :param configuration: The slot configuration.
        :param acc_code: The new access code.
        :param cur_acc_code: The current access code.
        :raises NotSupportedError: If the configuration does not support this
            session's version.
        """
        if not configuration.is_supported_by(self.version):
            raise NotSupportedError(
                "This configuration is not supported on this YubiKey version"
            )

        slot = SLOT(slot)
        logger.debug(
            f"Writing configuration of type {type(configuration).__name__} to "
            f"slot {slot}"
        )
        target = SLOT.map(slot, CONFIG_SLOT.CONFIG_1, CONFIG_SLOT.CONFIG_2)
        serialized = configuration.get_config(acc_code)
        self._write_config(target, serialized, cur_acc_code)

    def update_configuration(
        self,
        slot: SLOT,
        configuration: SlotConfiguration,
        acc_code: Optional[bytes] = None,
        cur_acc_code: Optional[bytes] = None,
    ) -> None:
        """Update configuration in slot.

        :param slot: The slot to update the configuration in.
        :param configuration: The slot configuration.
        :param acc_code: The new access code.
        :param cur_acc_code: The current access code.
        :raises NotSupportedError: If the configuration does not support this
            session's version, or if the access code would change on a
            version from 4.3.2 up to (not including) 4.3.6.
        """
        if not configuration.is_supported_by(self.version):
            raise NotSupportedError(
                "This configuration is not supported on this YubiKey version"
            )

        changes_acc_code = acc_code != cur_acc_code
        if changes_acc_code and (4, 3, 2) <= self.version < (4, 3, 6):
            raise NotSupportedError(
                "The access code cannot be updated on this YubiKey. "
                "Instead, delete the slot and configure it anew."
            )

        slot = SLOT(slot)
        logger.debug(f"Writing configuration update to slot {slot}")
        target = SLOT.map(slot, CONFIG_SLOT.UPDATE_1, CONFIG_SLOT.UPDATE_2)
        serialized = configuration.get_config(acc_code)
        self._write_config(target, serialized, cur_acc_code)

    def swap_slots(self) -> None:
        """Swap the two slot configurations."""
        logger.debug("Swapping touch slots")
        self._write_config(CONFIG_SLOT.SWAP, b"", None)

    def delete_slot(self, slot: SLOT, cur_acc_code: Optional[bytes] = None) -> None:
        """Delete configuration stored in slot.

        :param slot: The slot to delete the configuration in.
        :param cur_acc_code: The current access code.
        """
        slot = SLOT(slot)
        logger.debug(f"Deleting slot {slot}")
        target = SLOT.map(slot, CONFIG_SLOT.CONFIG_1, CONFIG_SLOT.CONFIG_2)
        # Deleting is done by writing an all-zero configuration to the slot.
        blank_config = b"\0" * CONFIG_SIZE
        self._write_config(target, blank_config, cur_acc_code)

    def set_scan_map(
        self, scan_map: bytes, cur_acc_code: Optional[bytes] = None
    ) -> None:
        """Update scan-codes on YubiKey.

        This updates the scan-codes (or keyboard presses) that the YubiKey
        will use when typing out OTPs.

        :param scan_map: The scan map to write.
        :param cur_acc_code: The current access code.
        """
        logger.debug("Writing scan map")
        self._write_config(CONFIG_SLOT.SCAN_MAP, scan_map, cur_acc_code)

    def set_ndef_configuration(
        self,
        slot: SLOT,
        uri: Optional[str] = None,
        cur_acc_code: Optional[bytes] = None,
        ndef_type: NDEF_TYPE = NDEF_TYPE.URI,
    ) -> None:
        """Configure a slot to be used over NDEF (NFC).

        :param slot: The slot to configure.
        :param uri: URI or static text.
        :param cur_acc_code: The current access code.
        :param ndef_type: The NDEF type (text or URI).
        """
        slot = SLOT(slot)
        logger.debug(f"Writing NDEF configuration for slot {slot} of type {ndef_type}")
        target = SLOT.map(slot, CONFIG_SLOT.NDEF_1, CONFIG_SLOT.NDEF_2)
        ndef_config = _build_ndef_config(uri, ndef_type)
        self._write_config(target, ndef_config, cur_acc_code)

    def calculate_hmac_sha1(
        self,
        slot: SLOT,
        challenge: bytes,
        event: Optional[Event] = None,
        on_keepalive: Optional[Callable[[int], None]] = None,
    ) -> bytes:
        """Perform a challenge-response operation using HMAC-SHA1.

        :param slot: The slot to perform the operation against.
        :param challenge: The challenge.
        :param event: An event, handed to the backend unchanged.
        :param on_keepalive: Optional callback taking an int, handed to the
            backend unchanged.
        :return: The response, HMAC_RESPONSE_SIZE bytes long.
        """
        require_version(self.version, (2, 2, 0))

        slot = SLOT(slot)
        logger.debug(f"Calculating response for slot {slot}")

        # Pad challenge with byte different from last
        pad_byte = b"\1" if challenge.endswith(b"\0") else b"\0"
        padded_challenge = challenge.ljust(HMAC_CHALLENGE_SIZE, pad_byte)

        target = SLOT.map(slot, CONFIG_SLOT.CHAL_HMAC_1, CONFIG_SLOT.CHAL_HMAC_2)
        return self.backend.send_and_receive(
            target,
            padded_challenge,
            HMAC_RESPONSE_SIZE,
            event,
            on_keepalive,
        )