from .core import Tlv, int2bytes, BadResponseError, Version
from .core.smartcard import (
AID,
SmartCardConnection,
SmartCardProtocol,
ApduError,
SW,
ScpProcessor,
)
from .core.smartcard.scp import (
INS_INITIALIZE_UPDATE,
INS_EXTERNAL_AUTHENTICATE,
INS_INTERNAL_AUTHENTICATE,
INS_PERFORM_SECURITY_OPERATION,
KeyRef,
ScpKid,
ScpKeyParams,
StaticKeys,
)
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.asymmetric import ec
from typing import Mapping, Sequence, Union, cast
from enum import IntEnum, unique
import logging
logger = logging.getLogger(__name__)
INS_GET_DATA = 0xCA
INS_PUT_KEY = 0xD8
INS_STORE_DATA = 0xE2
INS_DELETE = 0xE4
INS_GENERATE_KEY = 0xF1
TAG_KEY_INFORMATION = 0xE0
TAG_CARD_RECOGNITION_DATA = 0x66
TAG_CA_KLOC_IDENTIFIERS = 0xFF33
TAG_CA_KLCC_IDENTIFIERS = 0xFF34
TAG_CERTIFICATE_STORE = 0xBF21
[docs]
@unique
class KeyType(IntEnum):
AES = 0x88
ECC_PUBLIC_KEY = 0xB0
ECC_PRIVATE_KEY = 0xB1
ECC_KEY_PARAMS = 0xF0
_DEFAULT_KCV_IV = b"\1" * 16
[docs]
@unique
class Curve(IntEnum):
SECP256R1 = 0x00
SECP384R1 = 0x01
SECP521R1 = 0x02
BrainpoolP256R1 = 0x03
BrainpoolP384R1 = 0x05
BrainpoolP512R1 = 0x07
@classmethod
def _from_key(cls, private_key: ec.EllipticCurvePrivateKey) -> "Curve":
name = private_key.curve.name.lower()
for curve in cls:
if curve.name.lower() == name:
return curve
raise ValueError("Unsupported private key")
@property
def _curve(self) -> ec.EllipticCurve:
return getattr(ec, self.name)()
def _int2asn1(value: int) -> bytes:
bs = int2bytes(value)
if bs[0] & 0x80:
bs = b"\x00" + bs
return Tlv(0x93, bs)
def _encrypt_cbc(key: bytes, data: bytes, iv: bytes = b"\0" * 16) -> bytes:
encryptor = Cipher(
algorithms.AES(key),
modes.CBC(iv),
backend=default_backend(),
).encryptor()
return encryptor.update(data) + encryptor.finalize()
[docs]
class SecurityDomainSession:
"""A session for managing SCP keys"""
def __init__(self, connection: SmartCardConnection):
self.protocol = SmartCardProtocol(connection)
self.protocol.select(AID.SECURE_DOMAIN)
# We don't know the exact version, but this is the minimum that supports SCP
self.protocol.configure(Version(5, 3, 0))
logger.debug("SecurityDomain session initialized")
[docs]
def authenticate(self, key_params: ScpKeyParams) -> None:
"""Initialize SCP and authenticate the session.
SCP11b does not authenticate the OCE, and will not allow the usage of commands
which require authentication of the OCE.
"""
self.protocol.init_scp(key_params)
[docs]
def get_data(self, tag: int, data: bytes = b"") -> bytes:
"""Read data from the security domain."""
return self.protocol.send_apdu(0, INS_GET_DATA, tag >> 8, tag & 0xFF, data)
[docs]
def get_key_information(self) -> Mapping[KeyRef, Mapping[int, int]]:
"""Get information about the currently loaded keys."""
# 11.3.3.1.1 Key Information Template ('E0')
keys = {}
for d in Tlv.parse_list(self.get_data(TAG_KEY_INFORMATION)):
data = Tlv.unpack(0xC0, d)
keys[KeyRef(data[:2])] = dict(zip(data[2::2], data[3::2]))
return keys
[docs]
def get_card_recognition_data(self) -> bytes:
"""Get information about the card."""
# 7.4.1.3 Card Recognition Data
return Tlv.unpack(0x73, self.get_data(TAG_CARD_RECOGNITION_DATA))
[docs]
def get_supported_ca_identifiers(
self, kloc: bool = False, klcc: bool = False
) -> Mapping[KeyRef, bytes]:
"""Get a list of the CA issuer Subject Key Identifiers for keys.
Setting one of kloc or klcc to True will cause only those CAs to be returned.
By default, this will get both KLOC and KLCC CAs.
:param kloc: Get KLOC CAs.
:param klcc: Get KLCC CAs.
"""
if not kloc and not klcc:
kloc = klcc = True
logger.debug(f"Getting CA identifiers KLOC={kloc}, KLCC={klcc}")
data = b""
# Combine CA list for KLCC and KLOC
for fetch, tag in (
(kloc, TAG_CA_KLOC_IDENTIFIERS),
(klcc, TAG_CA_KLCC_IDENTIFIERS),
):
if fetch:
try:
data += self.get_data(tag)
except ApduError as e:
if e.sw != SW.REFERENCE_DATA_NOT_FOUND:
raise
tlvs = Tlv.parse_list(data)
return {
KeyRef(tlvs[i + 1].value): tlvs[i].value for i in range(0, len(tlvs), 2)
}
[docs]
def get_certificate_bundle(self, key: KeyRef) -> Sequence[x509.Certificate]:
"""Get the certificates associated with the given SCP11 private key.
Certificates are returned leaf-last.
"""
logger.debug(f"Getting certificate bundle for {key}")
try:
return [
x509.load_der_x509_certificate(cert)
for cert in Tlv.parse_list(
self.get_data(TAG_CERTIFICATE_STORE, Tlv(0xA6, Tlv(0x83, key)))
)
]
except ApduError as e:
if e.sw == SW.REFERENCE_DATA_NOT_FOUND:
return []
raise
[docs]
def reset(self) -> None:
"""Perform a factory reset of the Security Domain.
This will remove all keys and associated data, as well as restore the default
SCP03 static keys, and generate a new (attestable) SCP11b key.
"""
logger.debug("Resetting all SCP keys")
# Reset is done by blocking all available keys
data = b"\0" * 8
for key in self.get_key_information().keys():
if key.kid == 0x01:
# SCP03 uses KID=0, we use KVN=0 to allow deleting the default keys
# which have an invalid KVN (0xff).
key = KeyRef(0, 0)
ins = INS_INITIALIZE_UPDATE
elif key.kid in (0x02, 0x03):
continue # Skip these, will be deleted by 0x01
elif key.kid in (0x11, 0x15):
ins = INS_EXTERNAL_AUTHENTICATE
elif key.kid == 0x13:
ins = INS_INTERNAL_AUTHENTICATE
else: # 0x10, 0x20-0x2F
ins = INS_PERFORM_SECURITY_OPERATION
for _ in range(65):
try:
self.protocol.send_apdu(0x80, ins, key.kvn, key.kid, data)
except ApduError as e:
if e.sw in (
SW.AUTH_METHOD_BLOCKED,
SW.SECURITY_CONDITION_NOT_SATISFIED,
):
break
elif e.sw == SW.INCORRECT_PARAMETERS:
continue
raise
logger.info("SCP keys reset")
[docs]
def store_data(self, data: bytes) -> None:
"""Stores data in the security domain.
Requires OCE verification.
"""
self.protocol.send_apdu(0, INS_STORE_DATA, 0x90, 0, data)
[docs]
def store_certificate_bundle(
self, key: KeyRef, certificates: Sequence[x509.Certificate]
) -> None:
"""Store the certificate chain for the given key.
Requires OCE verification.
Certificates should be in order, with the leaf certificate last.
"""
logger.debug(f"Storing certificate bundle for {key}")
self.store_data(
Tlv(0xA6, Tlv(0x83, key))
+ Tlv(
TAG_CERTIFICATE_STORE,
b"".join(
c.public_bytes(serialization.Encoding.DER) for c in certificates
),
)
)
logger.info("Certificate bundle stored")
[docs]
def store_allowlist(self, key: KeyRef, serials: Sequence[int]) -> None:
"""Store which certificate serial numbers that can be used for a given key.
Requires OCE verification.
If no allowlist is stored, any certificate signed by the CA can be used.
"""
logger.debug(f"Storing serial allowlist for {key}")
self.store_data(
Tlv(0xA6, Tlv(0x83, key))
+ Tlv(0x70, b"".join(_int2asn1(s) for s in serials))
)
logger.info("Serial allowlist stored")
[docs]
def store_ca_issuer(self, key: KeyRef, ski: bytes) -> None:
"""Store the SKI (Subject Key Identifier) for the CA of a given key.
Requires OCE verification.
"""
logger.debug(f"Storing CA issuer SKI for {key}: {ski.hex()}")
klcc = key.kid in (ScpKid.SCP11a, ScpKid.SCP11b, ScpKid.SCP11c)
self.store_data(
Tlv(
0xA6,
Tlv(0x80, b"\1" if klcc else b"\0") + Tlv(0x42, ski) + Tlv(0x83, key),
)
)
logger.info("CA issuer SKI stored")
[docs]
def delete_key(self, kid: int = 0, kvn: int = 0, delete_last: bool = False) -> None:
"""Delete one (or more) keys.
Requires OCE verification.
All keys matching the given KID and/or KVN will be deleted.
To delete the final key you must set delete_last = True.
"""
if not kid and not kvn:
raise ValueError("Must specify at least one of kid, kvn.")
if kid in (1, 2, 3):
# SCP03 keys can only be deleted by KVN
if kvn:
kid = 0
else:
raise ValueError("SCP03 keys can only be deleted by KVN")
logger.debug(f"Deleting keys with KID={kid or 'ANY'}, KVN={kvn or 'ANY'}")
data = b""
if kid:
data += Tlv(0xD0, bytes([kid]))
if kvn:
data += Tlv(0xD2, bytes([kvn]))
self.protocol.send_apdu(0x80, INS_DELETE, 0, int(delete_last), data)
logger.info("Keys deleted")
[docs]
def generate_ec_key(
self, key: KeyRef, curve: Curve = Curve.SECP256R1, replace_kvn: int = 0
) -> ec.EllipticCurvePublicKey:
"""Generate a new SCP11 key.
Requires OCE verification.
Use replace_kvn to replace an existing key.
"""
logger.debug(
f"Generating new key for {key}"
+ (f", replacing KVN={replace_kvn}" if replace_kvn else "")
)
data = bytes([key.kvn]) + Tlv(KeyType.ECC_KEY_PARAMS, bytes([curve]))
resp = self.protocol.send_apdu(
0x80, INS_GENERATE_KEY, replace_kvn, key.kid, data
)
encoded_point = Tlv.unpack(KeyType.ECC_PUBLIC_KEY, resp)
logger.info("New key generated")
return ec.EllipticCurvePublicKey.from_encoded_point(curve._curve, encoded_point)
[docs]
def put_key(
self,
key: KeyRef,
sk: Union[StaticKeys, ec.EllipticCurvePrivateKey, ec.EllipticCurvePublicKey],
replace_kvn: int = 0,
) -> None:
"""Import an SCP key.
Requires OCE verification.
The value of the sk argument should match the SCP type as defined by the KID.
Use replace_kvn to replace an existing key.
"""
logger.debug(f"Importing key into {key} of type {type(sk)}")
processor = self.protocol._processor
if not isinstance(processor, ScpProcessor):
raise ValueError("Must be authenticated!")
data = bytes([key.kvn])
expected = data
dek = processor._state._keys.key_dek
p2 = key.kid
if isinstance(sk, StaticKeys):
if not dek:
raise ValueError("No session DEK key available")
if not sk.key_dek:
raise ValueError("New DEK must be set in static keys")
p2 |= 0x80
for k in cast(Sequence[bytes], sk):
kcv = _encrypt_cbc(k, _DEFAULT_KCV_IV)[:3]
data += Tlv(KeyType.AES, _encrypt_cbc(dek, k)) + bytes([len(kcv)]) + kcv
expected += kcv
else:
if isinstance(sk, ec.EllipticCurvePrivateKey):
if not dek:
raise ValueError("No session DEK key available")
n = (sk.key_size + 7) // 8
s = int2bytes(sk.private_numbers().private_value, n)
data += Tlv(KeyType.ECC_PRIVATE_KEY, _encrypt_cbc(dek, s))
elif isinstance(sk, ec.EllipticCurvePublicKey):
data += Tlv(
KeyType.ECC_PUBLIC_KEY,
sk.public_bytes(
serialization.Encoding.X962,
serialization.PublicFormat.UncompressedPoint,
),
)
else:
raise TypeError("Unsupported key type")
data += Tlv(KeyType.ECC_KEY_PARAMS, bytes([Curve._from_key(sk)])) + b"\0"
resp = self.protocol.send_apdu(0x80, INS_PUT_KEY, replace_kvn, p2, data)
if resp != expected:
raise BadResponseError("Incorrect key check value")
logger.info("Key imported")