# Copyright (c) 2015-2020 Yubico AB
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from yubikit.core import Connection, PID, TRANSPORT, YUBIKEY
from yubikit.core.otp import OtpConnection
from yubikit.core.fido import FidoConnection
from yubikit.core.smartcard import SmartCardConnection
from yubikit.management import (
DeviceInfo,
USB_INTERFACE,
)
from yubikit.support import read_info
from .base import YkmanDevice
from .hid import (
list_otp_devices as _list_otp_devices,
list_ctap_devices as _list_ctap_devices,
)
from .pcsc import list_devices as _list_ccid_devices
from smartcard.pcsc.PCSCExceptions import EstablishContextException
from smartcard.Exceptions import NoCardException
from time import sleep, time
from collections import Counter
from typing import (
Dict,
Mapping,
List,
Tuple,
Iterable,
Type,
Hashable,
Set,
)
import sys
import ctypes
import logging
logger = logging.getLogger(__name__)
def _warn_once(message, e_type=Exception):
warned: List[bool] = []
def outer(f):
def inner():
try:
return f()
except e_type:
if not warned:
logger.warning(message)
warned.append(True)
raise
return inner
return outer
[docs]
@_warn_once(
"PC/SC not available. Smart card (CCID) protocols will not function.",
EstablishContextException,
)
def list_ccid_devices():
"""List CCID devices."""
return _list_ccid_devices()
[docs]
@_warn_once("No CTAP HID backend available. FIDO protocols will not function.")
def list_ctap_devices():
"""List CTAP devices."""
return _list_ctap_devices()
[docs]
@_warn_once("No OTP HID backend available. OTP protocols will not function.")
def list_otp_devices():
"""List OTP devices."""
return _list_otp_devices()
_CONNECTION_LIST_MAPPING = {
SmartCardConnection: list_ccid_devices,
OtpConnection: list_otp_devices,
FidoConnection: list_ctap_devices,
}
[docs]
def scan_devices() -> Tuple[Mapping[PID, int], int]:
"""Scan USB for attached YubiKeys, without opening any connections.
:return: A dict mapping PID to device count, and a state object which can be used to
detect changes in attached devices.
"""
fingerprints = set()
merged: Dict[PID, int] = {}
for list_devs in _CONNECTION_LIST_MAPPING.values():
try:
devs = list_devs()
except Exception:
logger.debug("Device listing error", exc_info=True)
devs = []
merged.update(Counter(d.pid for d in devs if d.pid is not None))
fingerprints.update({d.fingerprint for d in devs})
if sys.platform == "win32" and not bool(ctypes.windll.shell32.IsUserAnAdmin()):
from .hid.windows import list_paths
counter: Counter[PID] = Counter()
for pid, path in list_paths():
if pid not in merged:
try:
counter[PID(pid)] += 1
fingerprints.add(path)
except ValueError: # Unsupported PID
logger.debug(f"Unsupported Yubico device with PID: {pid:02x}")
merged.update(counter)
return merged, hash(tuple(fingerprints))
class _PidGroup:
def __init__(self, pid):
self._pid = pid
self._infos: Dict[Hashable, DeviceInfo] = {}
self._resolved: Dict[Hashable, Dict[USB_INTERFACE, YkmanDevice]] = {}
self._unresolved: Dict[USB_INTERFACE, List[YkmanDevice]] = {}
self._devcount: Dict[USB_INTERFACE, int] = Counter()
self._fingerprints: Set[Hashable] = set()
self._ctime = time()
def _key(self, info):
return (
info.serial,
info.version,
info.form_factor,
str(info.supported_capabilities),
info.config.get_bytes(False),
info.is_locked,
info.is_fips,
info.is_sky,
)
def add(self, conn_type, dev, force_resolve=False):
logger.debug(f"Add device for {conn_type}: {dev}")
iface = conn_type.usb_interface
self._fingerprints.add(dev.fingerprint)
self._devcount[iface] += 1
if force_resolve or len(self._resolved) < max(self._devcount.values()):
try:
with dev.open_connection(conn_type) as conn:
info = read_info(conn, dev.pid)
key = self._key(info)
self._infos[key] = info
self._resolved.setdefault(key, {})[iface] = dev
logger.debug(f"Resolved device {info.serial}")
return
except Exception:
logger.warning("Failed opening device", exc_info=True)
self._unresolved.setdefault(iface, []).append(dev)
def supports_connection(self, conn_type):
return conn_type.usb_interface in self._devcount
def connect(self, key, conn_type):
iface = conn_type.usb_interface
resolved = self._resolved[key].get(iface)
if resolved:
return resolved.open_connection(conn_type)
devs = self._unresolved.get(iface, [])
failed = []
try:
while devs:
dev = devs.pop()
try:
conn = dev.open_connection(conn_type)
info = read_info(conn, dev.pid)
dev_key = self._key(info)
if dev_key in self._infos:
self._resolved.setdefault(dev_key, {})[iface] = dev
logger.debug(f"Resolved device {info.serial}")
if dev_key == key:
return conn
elif self._pid.yubikey_type == YUBIKEY.NEO and not devs:
self._resolved.setdefault(key, {})[iface] = dev
logger.debug("Resolved last NEO device without serial")
return conn
conn.close()
except Exception:
logger.warning("Failed opening device", exc_info=True)
failed.append(dev)
finally:
devs.extend(failed)
if self._devcount[iface] < len(self._infos):
logger.debug(f"Checking for more devices over {iface!s}")
for dev in _CONNECTION_LIST_MAPPING[conn_type]():
if self._pid == dev.pid and dev.fingerprint not in self._fingerprints:
self.add(conn_type, dev, True)
resolved = self._resolved[key].get(iface)
if resolved:
return resolved.open_connection(conn_type)
# Retry if we are within a 5 second period after creation,
# as not all USB interface become usable at the exact same time.
if time() < self._ctime + 5:
logger.debug("Device not found, retry in 1s")
sleep(1.0)
return self.connect(key, conn_type)
raise ValueError("Failed to connect to the device")
def get_devices(self):
results = []
for key, info in self._infos.items():
dev = next(iter(self._resolved[key].values()))
results.append(
(_UsbCompositeDevice(self, key, dev.fingerprint, dev.pid), info)
)
return results
class _UsbCompositeDevice(YkmanDevice):
def __init__(self, group, key, fingerprint, pid):
super().__init__(TRANSPORT.USB, fingerprint, pid)
self._group = group
self._key = key
def supports_connection(self, connection_type):
return self._group.supports_connection(connection_type)
def open_connection(self, connection_type):
if not self.supports_connection(connection_type):
raise ValueError("Unsupported Connection type")
# Allow for ~3s reclaim time on NEO for CCID
assert self.pid # nosec
if self.pid.yubikey_type == YUBIKEY.NEO and issubclass(
connection_type, SmartCardConnection
):
for _ in range(6):
try:
return self._group.connect(self._key, connection_type)
except (NoCardException, ValueError):
sleep(0.5)
return self._group.connect(self._key, connection_type)
[docs]
def list_all_devices(
connection_types: Iterable[Type[Connection]] = _CONNECTION_LIST_MAPPING.keys(),
) -> List[Tuple[YkmanDevice, DeviceInfo]]:
"""Connect to all attached YubiKeys and read device info from them.
:param connection_types: An iterable of YubiKey connection types.
:return: A list of (device, info) tuples for each connected device.
"""
groups: Dict[PID, _PidGroup] = {}
for connection_type in connection_types:
for base_type in _CONNECTION_LIST_MAPPING:
if issubclass(connection_type, base_type):
connection_type = base_type
break
else:
raise ValueError("Invalid connection type")
try:
for dev in _CONNECTION_LIST_MAPPING[connection_type]():
group = groups.setdefault(dev.pid, _PidGroup(dev.pid))
group.add(connection_type, dev)
except Exception:
logger.exception("Unable to list devices for connection")
devices = []
for group in groups.values():
devices.extend(group.get_devices())
return devices