# Copyright (c) 2020 Yubico AB
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or
# without modification, are permitted provided that the following
# conditions are met:
#
# 1. Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# 2. Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided
# with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
from . import Connection, CommandError, TimeoutError, Version, USB_INTERFACE
from yubikit.logging import LOG_LEVEL
from time import sleep
from threading import Event
from typing import Optional, Callable
import abc
import struct
import logging
logger = logging.getLogger(__name__)
MODHEX_ALPHABET = "cbdefghijklnrtuv"
[docs]
class CommandRejectedError(CommandError):
"""The issues command was rejected by the YubiKey"""
[docs]
class OtpConnection(Connection, metaclass=abc.ABCMeta):
usb_interface = USB_INTERFACE.OTP
[docs]
@abc.abstractmethod
def receive(self) -> bytes:
"""Reads an 8 byte feature report"""
[docs]
@abc.abstractmethod
def send(self, data: bytes) -> None:
"""Writes an 8 byte feature report"""
CRC_OK_RESIDUAL = 0xF0B8
[docs]
def calculate_crc(data: bytes) -> int:
crc = 0xFFFF
for index in range(len(data)):
crc ^= data[index]
for i in range(8):
j = crc & 1
crc >>= 1
if j == 1:
crc ^= 0x8408
return crc & 0xFFFF
[docs]
def check_crc(data: bytes) -> bool:
return calculate_crc(data) == CRC_OK_RESIDUAL
[docs]
def modhex_encode(data: bytes) -> str:
"""Encode a bytes-like object using Modhex (modified hexadecimal) encoding."""
return "".join(MODHEX_ALPHABET[b >> 4] + MODHEX_ALPHABET[b & 0xF] for b in data)
[docs]
def modhex_decode(string: str) -> bytes:
"""Decode the Modhex (modified hexadecimal) string."""
if len(string) % 2:
raise ValueError("Length must be a multiple of 2")
return bytes(
MODHEX_ALPHABET.index(string[i]) << 4 | MODHEX_ALPHABET.index(string[i + 1])
for i in range(0, len(string), 2)
)
FEATURE_RPT_SIZE = 8
FEATURE_RPT_DATA_SIZE = FEATURE_RPT_SIZE - 1
SLOT_DATA_SIZE = 64
FRAME_SIZE = SLOT_DATA_SIZE + 6
RESP_PENDING_FLAG = 0x40 # Response pending flag
SLOT_WRITE_FLAG = 0x80 # Write flag - set by app - cleared by device
RESP_TIMEOUT_WAIT_FLAG = 0x20 # Waiting for timeout operation
DUMMY_REPORT_WRITE = 0x8F # Write a dummy report to force update or abort
SEQUENCE_MASK = 0x1F
STATUS_OFFSET_PROG_SEQ = 0x4
STATUS_OFFSET_TOUCH_LOW = 0x5
CONFIG_STATUS_MASK = 0x1F
STATUS_PROCESSING = 1
STATUS_UPNEEDED = 2
def _should_send(packet, seq):
"""All-zero packets are skipped, except for the very first and last packets"""
return seq in (0, 9) or any(packet)
def _format_frame(slot, payload):
return payload + struct.pack("<BH", slot, calculate_crc(payload)) + b"\0\0\0"
[docs]
class OtpProtocol:
"""An implementation of the OTP protocol."""
def __init__(self, otp_connection: OtpConnection):
self.connection = otp_connection
report = self._receive()
self.version = Version.from_bytes(report[1:4])
if self.version[0] == 3: # NEO, may have cached pgmSeq in arbitrator
try: # Force communication with applet to refresh pgmSeq
# Write an invalid scan map, does nothing
self.send_and_receive(0x12, b"c" * 51)
except CommandRejectedError:
pass # This is expected
[docs]
def close(self) -> None:
self.connection.close()
[docs]
def send_and_receive(
self,
slot: int,
data: Optional[bytes] = None,
event: Optional[Event] = None,
on_keepalive: Optional[Callable[[int], None]] = None,
) -> bytes:
"""Sends a command to the YubiKey, and reads the response.
If the command results in a configuration update, the programming sequence
number is verified and the updated status bytes are returned.
:param slot: The slot to send to.
:param data: The data payload to send.
:param state: Optional CommandState for listening for user presence requirement
and for cancelling a command.
:return: Response data (including CRC) in the case of data, or an updated status
struct.
"""
payload = (data or b"").ljust(SLOT_DATA_SIZE, b"\0")
if len(payload) > SLOT_DATA_SIZE:
raise ValueError("Payload too large for HID frame")
if not on_keepalive:
on_keepalive = lambda x: None # noqa
frame = _format_frame(slot, payload)
logger.log(LOG_LEVEL.TRAFFIC, "SEND: %s", frame.hex())
response = self._read_frame(
self._send_frame(frame), event or Event(), on_keepalive
)
logger.log(LOG_LEVEL.TRAFFIC, "RECV: %s", response.hex())
return response
def _receive(self):
report = self.connection.receive()
if len(report) != FEATURE_RPT_SIZE:
raise Exception(
f"Incorrect feature report size (was {len(report)}, "
f"expected {FEATURE_RPT_SIZE})"
)
return report
[docs]
def read_status(self) -> bytes:
"""Receive status bytes from YubiKey.
:return: Status bytes (first 3 bytes are the firmware version).
:raises IOException: in case of communication error.
"""
return self._receive()[1:-1]
def _await_ready_to_write(self):
"""Sleep for up to ~1s waiting for the WRITE flag to be unset"""
for _ in range(20):
if (self._receive()[FEATURE_RPT_DATA_SIZE] & SLOT_WRITE_FLAG) == 0:
return
sleep(0.05)
raise Exception("Timeout waiting for YubiKey to become ready to receive")
def _send_frame(self, buf):
"""Sends a 70 byte frame"""
prog_seq = self._receive()[STATUS_OFFSET_PROG_SEQ]
seq = 0
while buf:
report, buf = buf[:FEATURE_RPT_DATA_SIZE], buf[FEATURE_RPT_DATA_SIZE:]
if _should_send(report, seq):
report += struct.pack(">B", 0x80 | seq)
self._await_ready_to_write()
self.connection.send(report)
seq += 1
return prog_seq
def _read_frame(self, prog_seq, event, on_keepalive):
"""Reads one frame"""
response = b""
seq = 0
needs_touch = False
try:
while True:
report = self._receive()
status_byte = report[FEATURE_RPT_DATA_SIZE]
if (status_byte & RESP_PENDING_FLAG) != 0: # Response packet
if seq == (status_byte & SEQUENCE_MASK):
# Correct sequence
response += report[:FEATURE_RPT_DATA_SIZE]
seq += 1
elif 0 == (status_byte & SEQUENCE_MASK):
# Transmission complete
self._reset_state()
return response
elif status_byte == 0: # Status response
next_prog_seq = report[STATUS_OFFSET_PROG_SEQ]
if response:
raise Exception("Incomplete transfer")
elif next_prog_seq == prog_seq + 1 or (
prog_seq > 0
and next_prog_seq == 0
and report[STATUS_OFFSET_TOUCH_LOW] & CONFIG_STATUS_MASK == 0
): # Note: If no valid configurations exist, prog_seq resets to 0.
# Sequence updated, return status.
return report[1:-1]
elif needs_touch:
raise TimeoutError("Timed out waiting for touch")
else:
raise CommandRejectedError("No data")
else: # Need to wait
if (status_byte & RESP_TIMEOUT_WAIT_FLAG) != 0:
on_keepalive(STATUS_UPNEEDED)
needs_touch = True
timeout = 0.1
else:
on_keepalive(STATUS_PROCESSING)
timeout = 0.02
sleep(timeout)
if event.wait(timeout):
self._reset_state()
raise TimeoutError("Command cancelled by Event")
except KeyboardInterrupt:
logger.debug("Keyboard interrupt, reset state...")
self._reset_state()
raise
def _reset_state(self):
"""Reset the state of YubiKey from reading"""
self.connection.send(b"\xff".rjust(FEATURE_RPT_SIZE, b"\0"))