Compare commits

..

3 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 0295e78113 add doc 2022-08-26 12:37:01 -07:00
Gilles Boccon-Gibod 16217cf758 fix logic test 2022-08-25 16:26:59 -07:00
Gilles Boccon-Gibod 0568cead47 add usb_probe tool and improve compatibility with older/non-compliant devices 2022-08-25 16:04:52 -07:00
6 changed files with 42 additions and 144 deletions
-2
View File
@@ -497,8 +497,6 @@ class Device(CompositeEventEmitter):
self.smp_manager = smp.Manager(self, self.random_address)
self.l2cap_channel_manager.register_fixed_channel(
smp.SMP_CID, self.on_smp_pdu)
self.l2cap_channel_manager.register_fixed_channel(
smp.SMP_BR_CID, self.on_smp_pdu)
# Register the SDP server with the L2CAP Channel Manager
self.sdp_server.register(self.l2cap_channel_manager)
-5
View File
@@ -18,8 +18,6 @@
import logging
from colors import color
from bumble.smp import SMP_CID, SMP_Command
from .core import name_or_number
from .gatt import ATT_PDU, ATT_CID
from .l2cap import (
@@ -75,9 +73,6 @@ class PacketTracer:
if l2cap_pdu.cid == ATT_CID:
att_pdu = ATT_PDU.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(att_pdu)
elif l2cap_pdu.cid == SMP_CID:
smp_command = SMP_Command.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(smp_command)
elif l2cap_pdu.cid == L2CAP_SIGNALING_CID or l2cap_pdu.cid == L2CAP_LE_SIGNALING_CID:
control_frame = L2CAP_Control_Frame.from_bytes(l2cap_pdu.payload)
self.analyzer.emit(control_frame)
+3 -4
View File
@@ -44,13 +44,12 @@ HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS = 1
# -----------------------------------------------------------------------------
class Connection:
def __init__(self, host, handle, role, peer_address, transport):
def __init__(self, host, handle, role, peer_address):
self.host = host
self.handle = handle
self.role = role
self.peer_address = peer_address
self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.transport = transport
def on_hci_acl_data_packet(self, packet):
self.assembler.feed_packet(packet)
@@ -365,7 +364,7 @@ class Host(EventEmitter):
connection = self.connections.get(event.connection_handle)
if connection is None:
connection = Connection(self, event.connection_handle, event.role, event.peer_address, BT_LE_TRANSPORT)
connection = Connection(self, event.connection_handle, event.role, event.peer_address)
self.connections[event.connection_handle] = connection
# Notify the client
@@ -400,7 +399,7 @@ class Host(EventEmitter):
connection = self.connections.get(event.connection_handle)
if connection is None:
connection = Connection(self, event.connection_handle, BT_CENTRAL_ROLE, event.bd_addr, BT_BR_EDR_TRANSPORT)
connection = Connection(self, event.connection_handle, BT_CENTRAL_ROLE, event.bd_addr)
self.connections[event.connection_handle] = connection
# Notify the client
+15 -50
View File
@@ -44,7 +44,6 @@ logger = logging.getLogger(__name__)
# Constants
# -----------------------------------------------------------------------------
SMP_CID = 0x06
SMP_BR_CID = 0x07
SMP_PAIRING_REQUEST_COMMAND = 0x01
SMP_PAIRING_RESPONSE_COMMAND = 0x02
@@ -153,8 +152,6 @@ SMP_CT2_AUTHREQ = 0b00100000
# Crypto salt
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031')
SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('00000000000000000000000000000000746D7032')
# -----------------------------------------------------------------------------
# Utils
@@ -601,7 +598,6 @@ class Session:
self.pairing_config = pairing_config
self.wait_before_continuing = None
self.completed = False
self.ctkd_task = None
# Decide if we're the initiator or the responder
self.is_initiator = (connection.role == BT_CENTRAL_ROLE)
@@ -881,21 +877,10 @@ class Session:
)
)
async def derive_ltk(self):
link_key = await self.manager.device.get_link_key(self.connection.peer_address)
assert link_key is not None
ilk = crypto.h7(
salt=SMP_CTKD_H7_BRLE_SALT,
w=link_key) if self.ct2 else crypto.h6(link_key, b'tmp2')
self.ltk = crypto.h6(ilk, b'brle')
def distribute_keys(self):
# Distribute the keys as required
if self.is_initiator:
# CTKD: Derive LTK from LinkKey
if self.connection.transport == BT_BR_EDR_TRANSPORT and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
self.ctkd_task = asyncio.create_task(self.derive_ltk())
elif not self.sc:
if not self.sc:
# Distribute the LTK, EDIV and RAND
if self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk))
@@ -915,7 +900,7 @@ class Session:
csrk = bytes(16) # FIXME: testing
if self.initiator_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
self.send_command(SMP_Signing_Information_Command(signature_key=csrk))
# CTKD, calculate BR/EDR link key
if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
ilk = crypto.h7(
@@ -924,11 +909,8 @@ class Session:
self.link_key = crypto.h6(ilk, b'lebr')
else:
# CTKD: Derive LTK from LinkKey
if self.connection.transport == BT_BR_EDR_TRANSPORT and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
self.ctkd_task = asyncio.create_task(self.derive_ltk())
# Distribute the LTK, EDIV and RAND
elif not self.sc:
if not self.sc:
if self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk))
self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand))
@@ -947,7 +929,7 @@ class Session:
csrk = bytes(16) # FIXME: testing
if self.responder_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
self.send_command(SMP_Signing_Information_Command(signature_key=csrk))
# CTKD, calculate BR/EDR link key
if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
ilk = crypto.h7(
@@ -958,7 +940,7 @@ class Session:
def compute_peer_expected_distributions(self, key_distribution_flags):
# Set our expectations for what to wait for in the key distribution phase
self.peer_expected_distributions = []
if not self.sc and self.connection.transport == BT_LE_TRANSPORT:
if not self.sc:
if (key_distribution_flags & SMP_ENC_KEY_DISTRIBUTION_FLAG != 0):
self.peer_expected_distributions.append(SMP_Encryption_Information_Command)
self.peer_expected_distributions.append(SMP_Master_Identification_Command)
@@ -981,7 +963,12 @@ class Session:
self.peer_expected_distributions.remove(command_class)
logger.debug(f'remaining distributions: {[c.__name__ for c in self.peer_expected_distributions]}')
if not self.peer_expected_distributions:
self.on_peer_key_distribution_complete()
# The initiator can now send its keys
if self.is_initiator:
self.distribute_keys()
# Nothing left to expect, we're done
self.on_pairing()
else:
logger.warn(color(f'!!! unexpected key distribution command: {command_class.__name__}', 'red'))
self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR)
@@ -1002,28 +989,17 @@ class Session:
self.connection.remove_listener('connection_encryption_key_refresh', self.on_connection_encryption_key_refresh)
self.manager.on_session_end(self)
def on_peer_key_distribution_complete(self):
# The initiator can now send its keys
if self.is_initiator:
self.distribute_keys()
asyncio.create_task(self.on_pairing())
def on_connection_encryption_change(self):
if self.connection.is_encrypted:
if self.is_responder:
# The responder distributes its keys first, the initiator later
self.distribute_keys()
# If we're not expecting key distributions from the peer, we're done
if not self.peer_expected_distributions:
self.on_peer_key_distribution_complete()
def on_connection_encryption_key_refresh(self):
# Do as if the connection had just been encrypted
self.on_connection_encryption_change()
async def on_pairing(self):
def on_pairing(self):
logger.debug('pairing complete')
if self.completed:
@@ -1040,16 +1016,11 @@ class Session:
else:
peer_address = self.connection.peer_address
# Wait for link key fetch and key derivation
if self.ctkd_task is not None:
await self.ctkd_task
self.ctkd_task = None
# Create an object to hold the keys
keys = PairingKeys()
keys.address_type = peer_address.address_type
authenticated = self.pairing_method != self.JUST_WORKS
if self.sc or self.connection.transport == BT_BR_EDR_TRANSPORT:
if self.sc:
keys.ltk = PairingKeys.Key(
value = self.ltk,
authenticated = authenticated
@@ -1088,6 +1059,7 @@ class Session:
value = self.link_key,
authenticated = authenticated
)
self.manager.on_pairing(self, peer_address, keys)
def on_pairing_failure(self, reason):
@@ -1165,12 +1137,6 @@ class Session:
# Respond
self.send_pairing_response_command()
# Vol 3, Part C, 5.2.2.1.3
# CTKD over BR/EDR should happen after the connection has been encrypted,
# so when receiving pairing requests, responder should start distributing keys
if self.connection.transport == BT_BR_EDR_TRANSPORT and self.connection.is_encrypted and self.is_responder and accepted:
self.distribute_keys()
def on_smp_pairing_response_command(self, command):
if self.is_responder:
logger.warn(color('received pairing response as a responder', 'red'))
@@ -1496,8 +1462,7 @@ class Manager(EventEmitter):
def send_command(self, connection, command):
logger.debug(f'>>> Sending SMP Command on connection [0x{connection.handle:04X}] {connection.peer_address}: {command}')
cid = SMP_BR_CID if connection.transport == BT_BR_EDR_TRANSPORT else SMP_CID
connection.send_l2cap_pdu(cid, command.to_bytes())
connection.send_l2cap_pdu(SMP_CID, command.to_bytes())
def on_smp_pdu(self, connection, pdu):
# Look for a session with this connection, and create one if none exists
+22 -82
View File
@@ -56,19 +56,18 @@ async def open_usb_transport(spec):
USB_RECIPIENT_DEVICE = 0x00
USB_REQUEST_TYPE_CLASS = 0x01 << 5
USB_ENDPOINT_EVENTS_IN = 0x81
USB_ENDPOINT_ACL_IN = 0x82
USB_ENDPOINT_ACL_OUT = 0x02
USB_DEVICE_CLASS_WIRELESS_CONTROLLER = 0xE0
USB_DEVICE_SUBCLASS_RF_CONTROLLER = 0x01
USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER = 0x01
USB_ENDPOINT_TRANSFER_TYPE_BULK = 0x02
USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT = 0x03
USB_ENDPOINT_IN = 0x80
READ_SIZE = 1024
class UsbPacketSink:
def __init__(self, device, acl_out):
def __init__(self, device):
self.device = device
self.acl_out = acl_out
self.transfer = device.getTransfer()
self.packets = collections.deque() # Queue of packets waiting to be sent
self.loop = asyncio.get_running_loop()
@@ -117,7 +116,7 @@ async def open_usb_transport(spec):
packet_type = packet[0]
if packet_type == hci.HCI_ACL_DATA_PACKET:
self.transfer.setBulk(
self.acl_out,
USB_ENDPOINT_ACL_OUT,
packet[1:],
callback=self.on_packet_sent
)
@@ -153,12 +152,10 @@ async def open_usb_transport(spec):
logger.debug('OUT transfer likely already completed')
class UsbPacketSource(asyncio.Protocol, ParserSource):
def __init__(self, context, device, acl_in, events_in):
def __init__(self, context, device):
super().__init__()
self.context = context
self.device = device
self.acl_in = acl_in
self.events_in = events_in
self.loop = asyncio.get_running_loop()
self.queue = asyncio.Queue()
self.closed = False
@@ -175,7 +172,7 @@ async def open_usb_transport(spec):
# Set up transfer objects for input
self.events_in_transfer = device.getTransfer()
self.events_in_transfer.setInterrupt(
self.events_in,
USB_ENDPOINT_EVENTS_IN,
READ_SIZE,
callback=self.on_packet_received,
user_data=hci.HCI_EVENT_PACKET
@@ -184,7 +181,7 @@ async def open_usb_transport(spec):
self.acl_in_transfer = device.getTransfer()
self.acl_in_transfer.setBulk(
self.acl_in,
USB_ENDPOINT_ACL_IN,
READ_SIZE,
callback=self.on_packet_received,
user_data=hci.HCI_ACL_DATA_PACKET
@@ -251,7 +248,7 @@ async def open_usb_transport(spec):
await self.event_loop_done
class UsbTransport(Transport):
def __init__(self, context, device, interface, setting, source, sink):
def __init__(self, context, device, interface, source, sink):
super().__init__(source, sink)
self.context = context
self.device = device
@@ -260,10 +257,6 @@ async def open_usb_transport(spec):
# Get exclusive access
device.claimInterface(interface)
# Set the alternate setting if not the default
if setting != 0:
device.setInterfaceAltSetting(interface, setting)
# The source and sink can now start
source.start()
sink.start()
@@ -320,64 +313,11 @@ async def open_usb_transport(spec):
raise ValueError('device not found')
logger.debug(f'USB Device: {found}')
# Look for the first interface with the right class and endpoints
def find_endpoints(device):
for (configuration_index, configuration) in enumerate(device):
interface = None
for interface in configuration:
setting = None
for setting in interface:
if (
setting.getClass() != USB_DEVICE_CLASS_WIRELESS_CONTROLLER or
setting.getSubClass() != USB_DEVICE_SUBCLASS_RF_CONTROLLER or
setting.getProtocol() != USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
):
continue
events_in = None
acl_in = None
acl_out = None
for endpoint in setting:
attributes = endpoint.getAttributes()
address = endpoint.getAddress()
if attributes & 0x03 == USB_ENDPOINT_TRANSFER_TYPE_BULK:
if address & USB_ENDPOINT_IN and acl_in is None:
acl_in = address
elif acl_out is None:
acl_out = address
elif attributes & 0x03 == USB_ENDPOINT_TRANSFER_TYPE_INTERRUPT:
if address & USB_ENDPOINT_IN and events_in is None:
events_in = address
# Return if we found all 3 endpoints
if acl_in is not None and acl_out is not None and events_in is not None:
return (
configuration_index + 1,
setting.getNumber(),
setting.getAlternateSetting(),
acl_in,
acl_out,
events_in
)
else:
logger.debug(f'skipping configuration {configuration_index + 1} / interface {setting.getNumber()}')
endpoints = find_endpoints(found)
if endpoints is None:
raise ValueError('no compatible interface found for device')
(configuration, interface, setting, acl_in, acl_out, events_in) = endpoints
logger.debug(
f'selected endpoints: configuration={configuration}, '
f'interface={interface}, '
f'setting={setting}, '
f'acl_in=0x{acl_in:02X}, '
f'acl_out=0x{acl_out:02X}, '
f'events_in=0x{events_in:02X}, '
)
device = found.open()
# Use the first interface
interface = 0
# Detach the kernel driver if supported and needed
if usb1.hasCapability(usb1.CAP_SUPPORTS_DETACH_KERNEL_DRIVER):
try:
@@ -389,21 +329,21 @@ async def open_usb_transport(spec):
# Set the configuration if needed
try:
current_configuration = device.getConfiguration()
logger.debug(f'current configuration = {current_configuration}')
configuration = device.getConfiguration()
logger.debug(f'current configuration = {configuration}')
except usb1.USBError:
current_configuration = 0
configuration = 0
if current_configuration != configuration:
if configuration != 1:
try:
logger.debug(f'setting configuration {configuration}')
device.setConfiguration(configuration)
logger.debug('setting configuration 1')
device.setConfiguration(1)
except usb1.USBError:
logger.warning('failed to set configuration')
logger.warning('failed to set configuration 1')
source = UsbPacketSource(context, device, acl_in, events_in)
sink = UsbPacketSink(device, acl_out)
return UsbTransport(context, device, interface, setting, source, sink)
source = UsbPacketSource(context, device)
sink = UsbPacketSink(device)
return UsbTransport(context, device, interface, source, sink)
except usb1.USBError as error:
logger.warning(color(f'!!! failed to open USB device: {error}', 'red'))
context.close()
+2 -1
View File
@@ -246,7 +246,8 @@ IO_CAP = [
SC = [False, True]
MITM = [False, True]
# Key distribution is a 4-bit bitmask
KEY_DIST = range(16)
# IdKey is necessary for current SMP structure
KEY_DIST = [i for i in range(16) if (i & SMP_ID_KEY_DISTRIBUTION_FLAG)]
@pytest.mark.asyncio
@pytest.mark.parametrize('io_cap, sc, mitm, key_dist',