diff --git a/apps/bench.py b/apps/bench.py index 0c74f72..e2200c1 100644 --- a/apps/bench.py +++ b/apps/bench.py @@ -1256,6 +1256,7 @@ class Central(Connection.Listener): self.device.classic_enabled = self.classic # Set up a pairing config factory with minimal requirements. + self.device.config.keystore = "JsonKeyStore" self.device.pairing_config_factory = lambda _: PairingConfig( sc=False, mitm=False, bonding=False ) @@ -1408,6 +1409,7 @@ class Peripheral(Device.Listener, Connection.Listener): self.device.classic_enabled = self.classic # Set up a pairing config factory with minimal requirements. + self.device.config.keystore = "JsonKeyStore" self.device.pairing_config_factory = lambda _: PairingConfig( sc=False, mitm=False, bonding=False ) diff --git a/apps/pair.py b/apps/pair.py index 5ed7679..13dc06d 100644 --- a/apps/pair.py +++ b/apps/pair.py @@ -18,9 +18,12 @@ import asyncio import os import logging +import struct + import click from prompt_toolkit.shortcuts import PromptSession +from bumble.a2dp import make_audio_sink_service_sdp_records from bumble.colors import color from bumble.device import Device, Peer from bumble.transport import open_transport_or_link @@ -30,16 +33,20 @@ from bumble.smp import error_name as smp_error_name from bumble.keys import JsonKeyStore from bumble.core import ( AdvertisingData, + Appearance, ProtocolError, PhysicalTransport, + UUID, ) from bumble.gatt import ( GATT_DEVICE_NAME_CHARACTERISTIC, GATT_GENERIC_ACCESS_SERVICE, + GATT_HEART_RATE_SERVICE, + GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC, Service, Characteristic, - CharacteristicValue, ) +from bumble.hci import OwnAddressType from bumble.att import ( ATT_Error, ATT_INSUFFICIENT_AUTHENTICATION_ERROR, @@ -62,7 +69,7 @@ class Waiter: self.linger = linger def terminate(self): - if not self.linger: + if not self.linger and not self.done.done: self.done.set_result(None) async def wait_until_terminated(self): @@ -193,7 +200,7 @@ class Delegate(PairingDelegate): # ----------------------------------------------------------------------------- async def get_peer_name(peer, mode): - if mode == 'classic': + if peer.connection.transport == PhysicalTransport.BR_EDR: return await peer.request_name() # Try to get the peer name from GATT @@ -225,13 +232,14 @@ def read_with_error(connection): raise ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR) -def write_with_error(connection, _value): - if not connection.is_encrypted: - raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR) - - if not AUTHENTICATION_ERROR_RETURNED[1]: - AUTHENTICATION_ERROR_RETURNED[1] = True - raise ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR) +# ----------------------------------------------------------------------------- +def sdp_records(): + service_record_handle = 0x00010001 + return { + service_record_handle: make_audio_sink_service_sdp_records( + service_record_handle + ) + } # ----------------------------------------------------------------------------- @@ -239,15 +247,19 @@ def on_connection(connection, request): print(color(f'<<< Connection: {connection}', 'green')) # Listen for pairing events - connection.on('pairing_start', on_pairing_start) - connection.on('pairing', lambda keys: on_pairing(connection, keys)) + connection.on(connection.EVENT_PAIRING_START, on_pairing_start) + connection.on(connection.EVENT_PAIRING, lambda keys: on_pairing(connection, keys)) connection.on( - 'pairing_failure', lambda reason: on_pairing_failure(connection, reason) + connection.EVENT_CLASSIC_PAIRING, lambda: on_classic_pairing(connection) + ) + connection.on( + connection.EVENT_PAIRING_FAILURE, + lambda reason: on_pairing_failure(connection, reason), ) # Listen for encryption changes connection.on( - 'connection_encryption_change', + connection.EVENT_CONNECTION_ENCRYPTION_CHANGE, lambda: on_connection_encryption_change(connection), ) @@ -288,6 +300,20 @@ async def on_pairing(connection, keys): Waiter.instance.terminate() +# ----------------------------------------------------------------------------- +@AsyncRunner.run_in_task() +async def on_classic_pairing(connection): + print(color('***-----------------------------------', 'cyan')) + print( + color( + f'*** Paired [Classic]! (peer identity={connection.peer_address})', 'cyan' + ) + ) + print(color('***-----------------------------------', 'cyan')) + await asyncio.sleep(POST_PAIRING_DELAY) + Waiter.instance.terminate() + + # ----------------------------------------------------------------------------- @AsyncRunner.run_in_task() async def on_pairing_failure(connection, reason): @@ -305,6 +331,7 @@ async def pair( mitm, bond, ctkd, + advertising_address, identity_address, linger, io, @@ -313,6 +340,8 @@ async def pair( request, print_keys, keystore_file, + advertise_service_uuids, + advertise_appearance, device_config, hci_transport, address_or_name, @@ -328,29 +357,33 @@ async def pair( # Expose a GATT characteristic that can be used to trigger pairing by # responding with an authentication error when read - if mode == 'le': - device.le_enabled = True + if mode in ('le', 'dual'): device.add_service( Service( - '50DB505C-8AC4-4738-8448-3B1D9CC09CC5', + GATT_HEART_RATE_SERVICE, [ Characteristic( - '552957FB-CF1F-4A31-9535-E78847E1A714', - Characteristic.Properties.READ - | Characteristic.Properties.WRITE, - Characteristic.READABLE | Characteristic.WRITEABLE, - CharacteristicValue( - read=read_with_error, write=write_with_error - ), + GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC, + Characteristic.Properties.READ, + Characteristic.READ_REQUIRES_AUTHENTICATION, + bytes(1), ) ], ) ) - # Select LE or Classic - if mode == 'classic': + # LE and Classic support + if mode in ('classic', 'dual'): device.classic_enabled = True device.classic_smp_enabled = ctkd + if mode in ('le', 'dual'): + device.le_enabled = True + if mode == 'dual': + device.le_simultaneous_enabled = True + + # Setup SDP + if mode in ('classic', 'dual'): + device.sdp_service_records = sdp_records() # Get things going await device.power_on() @@ -436,13 +469,109 @@ async def pair( print(color(f'Pairing failed: {error}', 'red')) else: - if mode == 'le': - # Advertise so that peers can find us and connect - await device.start_advertising(auto_restart=True) - else: + if mode in ('le', 'dual'): + # Advertise so that peers can find us and connect. + # Include the heart rate service UUID in the advertisement data + # so that devices like iPhones can show this device in their + # Bluetooth selector. + service_uuids_16 = [] + service_uuids_32 = [] + service_uuids_128 = [] + if advertise_service_uuids: + for uuid in advertise_service_uuids: + uuid = uuid.replace("-", "") + if len(uuid) == 4: + service_uuids_16.append(UUID(uuid)) + elif len(uuid) == 8: + service_uuids_32.append(UUID(uuid)) + elif len(uuid) == 32: + service_uuids_128.append(UUID(uuid)) + else: + print(color('Invalid UUID format', 'red')) + return + else: + service_uuids_16.append(GATT_HEART_RATE_SERVICE) + + flags = AdvertisingData.Flags.LE_LIMITED_DISCOVERABLE_MODE + if mode == 'le': + flags |= AdvertisingData.Flags.BR_EDR_NOT_SUPPORTED + if mode == 'dual': + flags |= AdvertisingData.Flags.SIMULTANEOUS_LE_BR_EDR_CAPABLE + + ad_structs = [ + ( + AdvertisingData.FLAGS, + bytes([flags]), + ), + (AdvertisingData.COMPLETE_LOCAL_NAME, 'Bumble'.encode()), + ] + if service_uuids_16: + ad_structs.append( + ( + AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, + b"".join(bytes(uuid) for uuid in service_uuids_16), + ) + ) + if service_uuids_32: + ad_structs.append( + ( + AdvertisingData.INCOMPLETE_LIST_OF_32_BIT_SERVICE_CLASS_UUIDS, + b"".join(bytes(uuid) for uuid in service_uuids_32), + ) + ) + if service_uuids_128: + ad_structs.append( + ( + AdvertisingData.INCOMPLETE_LIST_OF_128_BIT_SERVICE_CLASS_UUIDS, + b"".join(bytes(uuid) for uuid in service_uuids_128), + ) + ) + + if advertise_appearance: + advertise_appearance = advertise_appearance.upper() + try: + advertise_appearance_int = int(advertise_appearance) + except ValueError: + category, subcategory = advertise_appearance.split('/') + try: + category_enum = Appearance.Category[category] + except ValueError: + print( + color(f'Invalid appearance category {category}', 'red') + ) + return + subcategory_class = Appearance.SUBCATEGORY_CLASSES[ + category_enum + ] + try: + subcategory_enum = subcategory_class[subcategory] + except ValueError: + print(color(f'Invalid subcategory {subcategory}', 'red')) + return + advertise_appearance_int = int( + Appearance(category_enum, subcategory_enum) + ) + ad_structs.append( + ( + AdvertisingData.APPEARANCE, + struct.pack(' None: try: - if await utils.cancel_on_event(connection, 'disconnection', method()): + if await utils.cancel_on_event( + connection, Connection.EVENT_DISCONNECTION, method() + ): await self.host.send_command( hci.HCI_User_Confirmation_Request_Reply_Command( bd_addr=connection.peer_address @@ -5741,7 +5760,9 @@ class Device(utils.CompositeEventEmitter): async def reply() -> None: try: number = await utils.cancel_on_event( - connection, 'disconnection', pairing_config.delegate.get_number() + connection, + Connection.EVENT_DISCONNECTION, + pairing_config.delegate.get_number(), ) if number is not None: await self.host.send_command( @@ -5775,7 +5796,9 @@ class Device(utils.CompositeEventEmitter): # Ask the user to enter a string async def get_pin_code(): pin_code = await utils.cancel_on_event( - connection, 'disconnection', pairing_config.delegate.get_string(16) + connection, + Connection.EVENT_DISCONNECTION, + pairing_config.delegate.get_string(16), ) if pin_code is not None: @@ -5814,7 +5837,9 @@ class Device(utils.CompositeEventEmitter): # Show the passkey to the user utils.cancel_on_event( - connection, 'disconnection', pairing_config.delegate.display_number(passkey) + connection, + Connection.EVENT_DISCONNECTION, + pairing_config.delegate.display_number(passkey, digits=6), ) # [Classic only] @@ -5950,13 +5975,17 @@ class Device(utils.CompositeEventEmitter): @host_event_handler @with_connection_from_handle - def on_connection_encryption_change(self, connection, encryption): + def on_connection_encryption_change( + self, connection, encryption, encryption_key_size + ): logger.debug( f'*** Connection Encryption Change: [0x{connection.handle:04X}] ' f'{connection.peer_address} as {connection.role_name}, ' - f'encryption={encryption}' + f'encryption={encryption}, ' + f'key_size={encryption_key_size}' ) connection.encryption = encryption + connection.encryption_key_size = encryption_key_size if ( not connection.authenticated and connection.transport == PhysicalTransport.BR_EDR diff --git a/bumble/hci.py b/bumble/hci.py index 2ab46e4..029c2dc 100644 --- a/bumble/hci.py +++ b/bumble/hci.py @@ -29,13 +29,12 @@ from typing_extensions import Self from bumble import crypto from bumble.colors import color from bumble.core import ( - PhysicalTransport, AdvertisingData, DeviceClass, InvalidArgumentError, InvalidPacketError, - ProtocolError, PhysicalTransport, + ProtocolError, bit_flags_to_strings, name_or_number, padded_bytes, @@ -225,6 +224,7 @@ HCI_CONNECTIONLESS_PERIPHERAL_BROADCAST_CHANNEL_MAP_CHANGE_EVENT = 0X55 HCI_INQUIRY_RESPONSE_NOTIFICATION_EVENT = 0X56 HCI_AUTHENTICATED_PAYLOAD_TIMEOUT_EXPIRED_EVENT = 0X57 HCI_SAM_STATUS_CHANGE_EVENT = 0X58 +HCI_ENCRYPTION_CHANGE_V2_EVENT = 0x59 HCI_VENDOR_EVENT = 0xFF @@ -3364,6 +3364,20 @@ class HCI_Set_Event_Mask_Page_2_Command(HCI_Command): See Bluetooth spec @ 7.3.69 Set Event Mask Page 2 Command ''' + @staticmethod + def mask(event_codes: Iterable[int]) -> bytes: + ''' + Compute the event mask value for a list of events. + ''' + # NOTE: this implementation takes advantage of the fact that as of version 6.0 + # of the core specification, the bit number for each event code is equal to 64 + # less than the event code. + # If future versions of the specification deviate from that, a different + # implementation would be needed. + return sum((1 << event_code - 64) for event_code in event_codes).to_bytes( + 8, 'little' + ) + # ----------------------------------------------------------------------------- @HCI_Command.command( @@ -6977,6 +6991,30 @@ class HCI_Encryption_Change_Event(HCI_Event): ) +# ----------------------------------------------------------------------------- +@HCI_Event.event( + [ + ('status', STATUS_SPEC), + ('connection_handle', 2), + ( + 'encryption_enabled', + { + 'size': 1, + # pylint: disable-next=unnecessary-lambda + 'mapper': lambda x: HCI_Encryption_Change_Event.encryption_enabled_name( + x + ), + }, + ), + ('encryption_key_size', 1), + ] +) +class HCI_Encryption_Change_V2_Event(HCI_Event): + ''' + See Bluetooth spec @ 7.7.8 Encryption Change Event + ''' + + # ----------------------------------------------------------------------------- @HCI_Event.event( [('status', STATUS_SPEC), ('connection_handle', 2), ('lmp_features', 8)] diff --git a/bumble/host.py b/bumble/host.py index 183c5a7..755732a 100644 --- a/bumble/host.py +++ b/bumble/host.py @@ -435,6 +435,14 @@ class Host(utils.EventEmitter): ) ) ) + if self.supports_command(hci.HCI_SET_EVENT_MASK_PAGE_2_COMMAND): + await self.send_command( + hci.HCI_Set_Event_Mask_Page_2_Command( + event_mask_page_2=hci.HCI_Set_Event_Mask_Page_2_Command.mask( + [hci.HCI_ENCRYPTION_CHANGE_V2_EVENT] + ) + ) + ) if ( self.local_version is not None @@ -1384,6 +1392,21 @@ class Host(utils.EventEmitter): 'connection_encryption_change', event.connection_handle, event.encryption_enabled, + 0, + ) + else: + self.emit( + 'connection_encryption_failure', event.connection_handle, event.status + ) + + def on_hci_encryption_change_v2_event(self, event): + # Notify the client + if event.status == hci.HCI_SUCCESS: + self.emit( + 'connection_encryption_change', + event.connection_handle, + event.encryption_enabled, + event.encryption_key_size, ) else: self.emit( diff --git a/examples/run_a2dp_sink.py b/examples/run_a2dp_sink.py index a3bcb29..f5d337c 100644 --- a/examples/run_a2dp_sink.py +++ b/examples/run_a2dp_sink.py @@ -33,12 +33,6 @@ from bumble.avdtp import ( from bumble.a2dp import ( make_audio_sink_service_sdp_records, A2DP_SBC_CODEC_TYPE, - SBC_MONO_CHANNEL_MODE, - SBC_DUAL_CHANNEL_MODE, - SBC_SNR_ALLOCATION_METHOD, - SBC_LOUDNESS_ALLOCATION_METHOD, - SBC_STEREO_CHANNEL_MODE, - SBC_JOINT_STEREO_CHANNEL_MODE, SbcMediaCodecInformation, )