Compare commits

..

24 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
3040df3179 add support for the heart rate service 2022-07-23 09:38:44 -07:00
Gilles Boccon-Gibod
c66b357de6 Merge pull request #13 from google/gbg/standard-profiles
support for type adapters and framework for standard GATT profiles
2022-07-22 10:21:39 -07:00
Gilles Boccon-Gibod
e156ed3758 add in-context uuids and service proxy factories 2022-07-20 19:56:40 -07:00
Gilles Boccon-Gibod
0ffed3deff Merge pull request #11 from zxzxwu/main
Implement CTKD over LE and key distribution delegation
2022-07-20 15:35:26 -07:00
Josh Wu
2f949a1182 Delegate SMP key distribution
* Delegate SMP key distribution
* Align LE pairing key expectation
* Parametrize SMP self test, and add key distribution coverage
2022-07-21 01:19:36 +08:00
Josh Wu
4e2fae5145 Implement CTKD over LE 2022-07-21 01:19:25 +08:00
Gilles Boccon-Gibod
2b58364c51 Merge pull request #14 from zxzxwu/conn-lookup
Refactor find_connection_by_bd_addr
2022-07-20 08:26:04 -07:00
Josh Wu
e3bf7c4b53 Refactor find_connection_by_bd_addr
* Compare only address bytes because Address.__eq__ also compares types.
* Add a transport field to find connection to a device on specific
  transport. (It's possible to connect a device on both BR/EDR and LE)
2022-07-20 21:32:20 +08:00
Gilles Boccon-Gibod
bd28892734 add support for type adapters and framework for adding standard GATT profiles 2022-07-19 19:42:21 -07:00
Gilles Boccon-Gibod
b64fa65921 Merge pull request #10 from zxzxwu/main
Make pairing and link mode configurable
2022-07-18 12:48:07 -07:00
Josh Wu
7d87c3cc3a Make pairing and link mode configurable 2022-07-18 14:28:21 +08:00
Gilles Boccon-Gibod
94fc81c183 Merge pull request #7 from DeltaEvo/david/config-load
Make DeviceConfiguration loadable from a dict
2022-06-25 05:08:44 -07:00
Gilles Boccon-Gibod
b65b395fc4 Merge pull request #8 from Arrowbox/threadsafe_usb_close
Use threadsafe call when setting event_loop_done
2022-06-25 05:07:42 -07:00
David Duarte
0f157d55f7 Make DeviceConfiguration loadable from a dict 2022-06-24 09:57:16 +00:00
Jayson Messenger
925d79491f Use threadsafe call when setting event_loop_done
Previously, the close method would hang waiting on the future to be
done.
2022-06-23 15:19:05 -04:00
Gilles Boccon-Gibod
3d14df909c Merge pull request #5 from google/gbg/disconnection-event-routing
fix the routing of disconnection events
2022-06-15 10:50:14 -07:00
Gilles Boccon-Gibod
153788afe3 fix the routing of disconnection events 2022-06-14 14:38:40 -07:00
Gilles Boccon-Gibod
99ca31c063 Merge pull request #4 from google/gbg/classic-pairing-io
classic pairing io
2022-06-14 11:32:37 -07:00
Gilles Boccon-Gibod
9629e677f2 improve readability as per PR suggestion 2022-06-14 10:57:54 -07:00
Gilles Boccon-Gibod
250c1e3395 address PR comments 2022-06-13 16:44:57 -07:00
Gilles Boccon-Gibod
70dca1d7c9 cosmetic fix 2022-06-11 15:50:53 -07:00
Gilles Boccon-Gibod
a5015c1305 add pytest async options 2022-06-11 12:28:00 -07:00
Gilles Boccon-Gibod
6e22df4838 add doc structure 2022-06-11 12:19:54 -07:00
Gilles Boccon-Gibod
b4e2f21d2a add classic pairing io delegation 2022-06-11 01:33:51 -07:00
36 changed files with 1883 additions and 432 deletions

1
.gitignore vendored
View File

@@ -8,3 +8,4 @@ docs/mkdocs/site
tests/__pycache__
test-results.xml
bumble/transport/__pycache__
bumble/profiles/__pycache__

View File

@@ -32,10 +32,10 @@ async def dump_gatt_db(peer, done):
# Discover all services
print(color('### Discovering Services and Characteristics', 'magenta'))
await peer.discover_services()
await peer.discover_characteristics()
for service in peer.services:
await service.discover_characteristics()
for characteristic in service.characteristics:
await peer.discover_descriptors(characteristic)
await characteristic.discover_descriptors()
print(color('=== Services ===', 'yellow'))
show_services(peer.services)
@@ -47,7 +47,7 @@ async def dump_gatt_db(peer, done):
for attribute in attributes:
print(attribute)
try:
value = await peer.read_value(attribute)
value = await attribute.read_value()
print(color(f'{value.hex()}', 'green'))
except ProtocolError as error:
print(color(error, 'red'))

View File

@@ -73,7 +73,7 @@ class GattlinkHubBridge(Device.Listener):
gattlink_service = services[0]
# Discover all the characteristics for the service
characteristics = await self.peer.discover_characteristics(service = gattlink_service)
characteristics = await gattlink_service.discover_characteristics()
print(color('=== Characteristics discovered', 'yellow'))
for characteristic in characteristics:
if characteristic.uuid == GG_GATTLINK_RX_CHARACTERISTIC_UUID:

View File

@@ -44,7 +44,7 @@ from bumble.att import (
# -----------------------------------------------------------------------------
class Delegate(PairingDelegate):
def __init__(self, connection, capability_string, prompt):
def __init__(self, mode, connection, capability_string, prompt):
super().__init__({
'keyboard': PairingDelegate.KEYBOARD_INPUT_ONLY,
'display': PairingDelegate.DISPLAY_OUTPUT_ONLY,
@@ -53,6 +53,7 @@ class Delegate(PairingDelegate):
'none': PairingDelegate.NO_OUTPUT_NO_INPUT
}[capability_string.lower()])
self.mode = mode
self.peer = Peer(connection)
self.peer_name = None
self.prompt = prompt
@@ -64,7 +65,7 @@ class Delegate(PairingDelegate):
# Try to get the peer's name
if self.peer:
peer_name = await get_peer_name(self.peer)
peer_name = await get_peer_name(self.peer, self.mode)
self.peer_name = f'{peer_name or ""} [{self.peer.connection.peer_address}]'
else:
self.peer_name = '[?]'
@@ -91,7 +92,7 @@ class Delegate(PairingDelegate):
# Accept silently
return True
async def compare_numbers(self, number):
async def compare_numbers(self, number, digits):
await self.update_peer_name()
# Wait a bit to allow some of the log lines to print before we prompt
@@ -102,7 +103,7 @@ class Delegate(PairingDelegate):
print(color(f'### Pairing with {self.peer_name}', 'yellow'))
print(color('###-----------------------------------', 'yellow'))
while True:
response = await aioconsole.ainput(color(f'>>> Does the other device display {number:06}? ', 'yellow'))
response = await aioconsole.ainput(color(f'>>> Does the other device display {number:0{digits}}? ', 'yellow'))
response = response.lower().strip()
if response == 'yes':
return True
@@ -125,7 +126,7 @@ class Delegate(PairingDelegate):
except ValueError:
pass
async def display_number(self, number):
async def display_number(self, number, digits):
await self.update_peer_name()
# Wait a bit to allow some of the log lines to print before we prompt
@@ -134,19 +135,23 @@ class Delegate(PairingDelegate):
# Display a PIN code
print(color('###-----------------------------------', 'yellow'))
print(color(f'### Pairing with {self.peer_name}', 'yellow'))
print(color(f'### PIN: {number:06}', 'yellow'))
print(color(f'### PIN: {number:0{digits}}', 'yellow'))
print(color('###-----------------------------------', 'yellow'))
# -----------------------------------------------------------------------------
async def get_peer_name(peer):
services = await peer.discover_service(GATT_GENERIC_ACCESS_SERVICE)
if not services:
return None
async def get_peer_name(peer, mode):
if mode == 'classic':
return await peer.request_name()
else:
# Try to get the peer name from GATT
services = await peer.discover_service(GATT_GENERIC_ACCESS_SERVICE)
if not services:
return None
values = await peer.read_characteristics_by_uuid(GATT_DEVICE_NAME_CHARACTERISTIC, services[0])
if values:
return values[0].decode('utf-8')
values = await peer.read_characteristics_by_uuid(GATT_DEVICE_NAME_CHARACTERISTIC, services[0])
if values:
return values[0].decode('utf-8')
# -----------------------------------------------------------------------------
@@ -224,9 +229,22 @@ def on_pairing_failure(reason):
# -----------------------------------------------------------------------------
async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, transport, address_or_name):
async def pair(
mode,
sc,
mitm,
bond,
io,
prompt,
request,
print_keys,
keystore_file,
device_config,
hci_transport,
address_or_name
):
print('<<< connecting to HCI...')
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
print('<<< connected')
# Create a device to manage the host
@@ -245,19 +263,25 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
# Expose a GATT characteristic that can be used to trigger pairing by
# responding with an authentication error when read
device.add_service(
Service(
'50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
[
Characteristic(
'552957FB-CF1F-4A31-9535-E78847E1A714',
Characteristic.READ | Characteristic.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(read=read_with_error, write=write_with_error)
)
]
if mode == 'le':
device.add_service(
Service(
'50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
[
Characteristic(
'552957FB-CF1F-4A31-9535-E78847E1A714',
Characteristic.READ | Characteristic.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(read=read_with_error, write=write_with_error)
)
]
)
)
)
# Select LE or Classic
if mode == 'classic':
device.classic_enabled = True
device.le_enabled = False
# Get things going
await device.power_on()
@@ -267,7 +291,7 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
sc,
mitm,
bond,
Delegate(connection, io, prompt)
Delegate(mode, connection, io, prompt)
)
# Connect to a peer or wait for a connection
@@ -278,10 +302,14 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
if not request:
try:
await connection.pair()
if mode == 'le':
await connection.pair()
else:
await connection.authenticate()
return
except ProtocolError as error:
print(color(f'Pairing failed: {error}', 'red'))
return
except ProtocolError:
pass
else:
# Advertise so that peers can find us and connect
await device.start_advertising(auto_restart=True)
@@ -291,6 +319,7 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
# -----------------------------------------------------------------------------
@click.command()
@click.option('--mode', type=click.Choice(['le', 'classic']), default='le', show_default=True)
@click.option('--sc', type=bool, default=True, help='Use the Secure Connections protocol', show_default=True)
@click.option('--mitm', type=bool, default=True, help='Request MITM protection', show_default=True)
@click.option('--bond', type=bool, default=True, help='Enable bonding', show_default=True)
@@ -300,11 +329,11 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
@click.option('--print-keys', is_flag=True, help='Print the bond keys before pairing')
@click.option('--keystore-file', help='File in which to store the pairing keys')
@click.argument('device-config')
@click.argument('transport')
@click.argument('hci_transport')
@click.argument('address-or-name', required=False)
def main(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, transport, address_or_name):
def main(mode, sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, hci_transport, address_or_name):
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, transport, address_or_name))
asyncio.run(pair(mode, sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, hci_transport, address_or_name))
# -----------------------------------------------------------------------------

View File

@@ -682,11 +682,14 @@ class Attribute(EventEmitter):
def __init__(self, attribute_type, permissions, value = b''):
EventEmitter.__init__(self)
self.handle = 0
self.permissions = permissions
self.handle = 0
self.end_group_handle = 0
self.permissions = permissions
# Convert the type to a UUID
if type(attribute_type) is bytes:
# Convert the type to a UUID object if it isn't already
if type(attribute_type) is str:
self.type = UUID(attribute_type)
elif type(attribute_type) is bytes:
self.type = UUID.from_bytes(attribute_type)
else:
self.type = attribute_type
@@ -698,16 +701,13 @@ class Attribute(EventEmitter):
self.value = value
def read_value(self, connection):
if type(self.value) is bytes:
return self.value
if read := getattr(self.value, 'read', None):
try:
return read(connection)
except ATT_Error as error:
raise ATT_Error(error_code=error.error_code, att_handle=self.handle)
else:
if read := getattr(self.value, 'read', None):
try:
return read(connection)
except ATT_Error as error:
raise ATT_Error(error_code=error.error_code, att_handle=self.handle)
else:
return bytes(self.value)
return self.value
def write_value(self, connection, value):
if write := getattr(self.value, 'write', None):

View File

@@ -227,3 +227,17 @@ def g2(u, v, x, y):
aes_cmac(bytes(reversed(u)) + bytes(reversed(v)) + bytes(reversed(y)), bytes(reversed(x)))[-4:],
byteorder='big'
)
# -----------------------------------------------------------------------------
def h6(w, key_id):
'''
See Bluetooth spec, Vol 3, Part H - 2.2.10 Link key conversion function h6
'''
return aes_cmac(key_id, w)
# -----------------------------------------------------------------------------
def h7(salt, w):
'''
See Bluetooth spec, Vol 3, Part H - 2.2.11 Link key conversion function h7
'''
return aes_cmac(w, salt)

View File

@@ -137,6 +137,21 @@ class Peer:
def get_characteristics_by_uuid(self, uuid, service = None):
return self.gatt_client.get_characteristics_by_uuid(uuid, service)
def create_service_proxy(self, proxy_class):
return proxy_class.from_client(self.gatt_client)
async def discover_service_and_create_proxy(self, proxy_class):
# Discover the first matching service and its characteristics
services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID)
if services:
service = services[0]
await service.discover_characteristics()
return self.create_service_proxy(proxy_class)
# [Classic only]
async def request_name(self):
return await self.connection.request_remote_name()
def __str__(self):
return f'{self.connection.peer_address} as {self.connection.role_name}'
@@ -176,6 +191,7 @@ class Connection(CompositeEventEmitter):
self.transport = transport
self.peer_address = peer_address
self.peer_resolvable_address = peer_resolvable_address
self.peer_name = None # Classic only
self.role = role
self.parameters = parameters
self.encryption = 0
@@ -231,6 +247,10 @@ class Connection(CompositeEventEmitter):
supervision_timeout
)
# [Classic only]
async def request_remote_name(self):
return await self.device.request_remote_name(self)
def __str__(self):
return f'Connection(handle=0x{self.handle:04X}, role={self.role_name}, address={self.peer_address})'
@@ -245,39 +265,48 @@ class DeviceConfiguration:
self.scan_response_data = DEVICE_DEFAULT_SCAN_RESPONSE_DATA
self.advertising_interval_min = DEVICE_DEFAULT_ADVERTISING_INTERVAL
self.advertising_interval_max = DEVICE_DEFAULT_ADVERTISING_INTERVAL
self.le_enabled = True
# LE host enable 2nd parameter
self.le_simultaneous_enabled = True
self.classic_sc_enabled = True
self.classic_ssp_enabled = True
self.advertising_data = bytes(
AdvertisingData([(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))])
)
self.irk = bytes(16) # This really must be changed for any level of security
self.keystore = None
def load_from_dict(self, config):
# Load simple properties
self.name = config.get('name', self.name)
self.address = Address(config.get('address', self.address))
self.class_of_device = config.get('class_of_device', self.class_of_device)
self.advertising_interval_min = config.get('advertising_interval', self.advertising_interval_min)
self.advertising_interval_max = self.advertising_interval_min
self.keystore = config.get('keystore')
self.le_enabled = config.get('le_enabled', self.le_enabled)
self.le_simultaneous_enabled = config.get('le_simultaneous_enabled', self.le_simultaneous_enabled)
self.classic_sc_enabled = config.get('classic_sc_enabled', self.classic_sc_enabled)
self.classic_ssp_enabled = config.get('classic_ssp_enabled', self.classic_ssp_enabled)
# Load or synthesize an IRK
irk = config.get('irk')
if irk:
self.irk = bytes.fromhex(irk)
else:
# Construct an IRK from the address bytes
# NOTE: this is not secure, but will always give the same IRK for the same address
address_bytes = bytes(self.address)
self.irk = (address_bytes * 3)[:16]
# Load advertising data
advertising_data = config.get('advertising_data')
if advertising_data:
self.advertising_data = bytes.fromhex(advertising_data)
def load_from_file(self, filename):
with open(filename, 'r') as file:
config = json.load(file)
# Load simple properties
self.name = config.get('name', self.name)
self.address = Address(config.get('address', self.address))
self.class_of_device = config.get('class_of_device', self.class_of_device)
self.advertising_interval_min = config.get('advertising_interval', self.advertising_interval_min)
self.advertising_interval_max = self.advertising_interval_min
self.keystore = config.get('keystore')
# Load or synthesize an IRK
irk = config.get('irk')
if irk:
self.irk = bytes.fromhex(irk)
else:
# Construct an IRK from the address bytes
# NOTE: this is not secure, but will always give the same IRK for the same address
address_bytes = bytes(self.address)
self.irk = (address_bytes * 3)[:16]
# Load advertising data
advertising_data = config.get('advertising_data')
if advertising_data:
self.advertising_data = bytes.fromhex(advertising_data)
self.load_from_dict(json.load(file))
# -----------------------------------------------------------------------------
# Decorators used with the following Device class
@@ -290,9 +319,19 @@ def with_connection_from_handle(function):
@functools.wraps(function)
def wrapper(self, connection_handle, *args, **kwargs):
if (connection := self.lookup_connection(connection_handle)) is None:
logger.warn(f'no connection found for handle 0x{connection_handle:04X}')
return
function(self, connection, *args, **kwargs)
raise ValueError('no connection for handle')
return function(self, connection, *args, **kwargs)
return wrapper
# Decorator that converts the first argument from a bluetooth address to a connection
def with_connection_from_address(function):
@functools.wraps(function)
def wrapper(self, address, *args, **kwargs):
for connection in self.connections.values():
if connection.peer_address == address:
return function(self, connection, *args, **kwargs)
raise ValueError('no connection for address')
return wrapper
@@ -368,7 +407,6 @@ class Device(CompositeEventEmitter):
self.connecting = False
self.disconnecting = False
self.connections = {} # Connections, by connection handle
self.le_enabled = True
self.classic_enabled = False
self.discoverable = False
self.connectable = False
@@ -388,6 +426,10 @@ class Device(CompositeEventEmitter):
self.advertising_interval_max = config.advertising_interval_max
self.keystore = keys.KeyStore.create_for_device(config)
self.irk = config.irk
self.le_enabled = config.le_enabled
self.le_simultaneous_enabled = config.le_simultaneous_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled
self.classic_sc_enabled = config.classic_sc_enabled
# If a name is passed, override the name from the config
if name:
@@ -453,6 +495,12 @@ class Device(CompositeEventEmitter):
if connection := self.connections.get(connection_handle):
return connection
def find_connection_by_bd_addr(self, bd_addr, transport=None):
for connection in self.connections.values():
if connection.peer_address.get_bytes() == bd_addr.get_bytes():
if transport is None or connection.transport == transport:
return connection
def register_l2cap_server(self, psm, server):
self.l2cap_channel_manager.register_server(psm, server)
@@ -480,6 +528,11 @@ class Device(CompositeEventEmitter):
logger.debug(color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow'))
self.public_address = response.return_parameters.bd_addr
await self.send_command(HCI_Write_LE_Host_Support_Command(
le_supported_host = int(self.le_enabled),
simultaneous_le_host = int(self.le_simultaneous_enabled),
))
if self.le_enabled:
# Set the controller address
await self.send_command(HCI_LE_Set_Random_Address_Command(
@@ -517,7 +570,12 @@ class Device(CompositeEventEmitter):
HCI_Write_Class_Of_Device_Command(class_of_device = self.class_of_device)
)
await self.send_command(
HCI_Write_Simple_Pairing_Mode_Command(simple_pairing_mode = 0x01)
HCI_Write_Simple_Pairing_Mode_Command(
simple_pairing_mode=int(self.classic_ssp_enabled))
)
await self.send_command(
HCI_Write_Secure_Connections_Host_Support_Command(
secure_connections_host_support=int(self.classic_sc_enabled))
)
# Let the SMP manager know about the address
@@ -792,8 +850,8 @@ class Device(CompositeEventEmitter):
async def disconnect(self, connection, reason):
# Create a future so that we can wait for the disconnection's result
pending_disconnection = asyncio.get_running_loop().create_future()
self.on('disconnection', pending_disconnection.set_result)
self.on('disconnection_failure', pending_disconnection.set_exception)
connection.on('disconnection', pending_disconnection.set_result)
connection.on('disconnection_failure', pending_disconnection.set_exception)
# Request a disconnection
result = await self.send_command(HCI_Disconnect_Command(connection_handle = connection.handle, reason = reason))
@@ -806,8 +864,8 @@ class Device(CompositeEventEmitter):
self.disconnecting = True
return await pending_disconnection
finally:
self.remove_listener('disconnection', pending_disconnection.set_result)
self.remove_listener('disconnection_failure', pending_disconnection.set_exception)
connection.remove_listener('disconnection', pending_disconnection.set_result)
connection.remove_listener('disconnection_failure', pending_disconnection.set_exception)
self.disconnecting = False
async def update_connection_parameters(
@@ -936,13 +994,13 @@ class Device(CompositeEventEmitter):
# Set up event handlers
pending_authentication = asyncio.get_running_loop().create_future()
def on_authentication_complete():
def on_authentication():
pending_authentication.set_result(None)
def on_authentication_failure(error):
pending_authentication.set_exception(error)
def on_authentication_failure(error_code):
pending_authentication.set_exception(HCI_Error(error_code))
connection.on('connection_authentication_complete', on_authentication_complete)
connection.on('connection_authentication', on_authentication)
connection.on('connection_authentication_failure', on_authentication_failure)
# Request the authentication
@@ -957,7 +1015,7 @@ class Device(CompositeEventEmitter):
# Wait for the authentication to complete
await pending_authentication
finally:
connection.remove_listener('connection_authentication_complete', on_authentication_complete)
connection.remove_listener('connection_authentication', on_authentication)
connection.remove_listener('connection_authentication_failure', on_authentication_failure)
async def encrypt(self, connection):
@@ -1028,6 +1086,40 @@ class Device(CompositeEventEmitter):
connection.remove_listener('connection_encryption_change', on_encryption_change)
connection.remove_listener('connection_encryption_failure', on_encryption_failure)
# [Classic only]
async def request_remote_name(self, connection):
# Set up event handlers
pending_name = asyncio.get_running_loop().create_future()
def on_remote_name():
pending_name.set_result(connection.peer_name)
def on_remote_name_failure(error_code):
pending_name.set_exception(HCI_Error(error_code))
connection.on('remote_name', on_remote_name)
connection.on('remote_name_failure', on_remote_name_failure)
try:
result = await self.send_command(
HCI_Remote_Name_Request_Command(
bd_addr = connection.peer_address,
page_scan_repetition_mode = HCI_Remote_Name_Request_Command.R0, # TODO investigate other options
reserved = 0,
clock_offset = 0 # TODO investigate non-0 values
)
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warn(f'HCI_Set_Connection_Encryption_Command failed: {HCI_Constant.error_name(result.status)}')
raise HCI_Error(result.status)
# Wait for the result
return await pending_name
finally:
connection.remove_listener('remote_name', on_remote_name)
connection.remove_listener('remote_name_failure', on_remote_name_failure)
# [Classic only]
@host_event_handler
def on_link_key(self, bd_addr, link_key, key_type):
@@ -1123,14 +1215,15 @@ class Device(CompositeEventEmitter):
asyncio.create_task(self.start_advertising(auto_restart=self.auto_restart_advertising))
@host_event_handler
def on_disconnection_failure(self, error_code):
@with_connection_from_handle
def on_disconnection_failure(self, connection, error_code):
logger.debug(f'*** Disconnection failed: {error_code}')
error = ConnectionError(
error_code,
'hci',
HCI_Constant.error_name(error_code)
)
self.emit('disconnection_failure', error)
connection.emit('disconnection_failure', error)
@host_event_handler
@AsyncRunner.run_in_task()
@@ -1141,10 +1234,10 @@ class Device(CompositeEventEmitter):
@host_event_handler
@with_connection_from_handle
def on_connection_authentication_complete(self, connection):
logger.debug(f'*** Connection Authentication Complete: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}')
def on_connection_authentication(self, connection):
logger.debug(f'*** Connection Authentication: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}')
connection.authenticated = True
connection.emit('connection_authentication_complete')
connection.emit('connection_authentication')
@host_event_handler
@with_connection_from_handle
@@ -1152,6 +1245,132 @@ class Device(CompositeEventEmitter):
logger.debug(f'*** Connection Authentication Failure: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}, error={error}')
connection.emit('connection_authentication_failure', error)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_io_capability_request(self, connection):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
# Map the SMP IO capability to a Classic IO capability
io_capability = {
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
smp.SMP_DISPLAY_YES_NO_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY
}.get(pairing_config.delegate.io_capability)
if io_capability is None:
logger.warning(f'cannot map IO capability ({pairing_config.delegate.io_capability}')
io_capability = HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
# Compute the authentication requirements
authentication_requirements = (
# No Bonding
(
HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS
),
# General Bonding
(
HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS
)
)[1 if pairing_config.bonding else 0][1 if pairing_config.mitm else 0]
# Respond
self.host.send_command_sync(
HCI_IO_Capability_Request_Reply_Command(
bd_addr = connection.peer_address,
io_capability = io_capability,
oob_data_present = 0x00, # Not present
authentication_requirements = authentication_requirements
)
)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_user_confirmation_request(self, connection, code):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
can_confirm = pairing_config.delegate.io_capability not in {
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY
}
# Respond
if can_confirm and pairing_config.delegate:
async def compare_numbers():
numbers_match = await pairing_config.delegate.compare_numbers(code, digits=6)
if numbers_match:
self.host.send_command_sync(
HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address)
)
else:
self.host.send_command_sync(
HCI_User_Confirmation_Request_Negative_Reply_Command(bd_addr=connection.peer_address)
)
asyncio.create_task(compare_numbers())
else:
self.host.send_command_sync(
HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address)
)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_user_passkey_request(self, connection):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
can_input = pairing_config.delegate.io_capability in {
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
}
# Respond
if can_input and pairing_config.delegate:
async def get_number():
number = await pairing_config.delegate.get_number()
if number is not None:
self.host.send_command_sync(
HCI_User_Passkey_Request_Reply_Command(
bd_addr = connection.peer_address,
numeric_value = number)
)
else:
self.host.send_command_sync(
HCI_User_Passkey_Request_Negative_Reply_Command(bd_addr=connection.peer_address)
)
asyncio.create_task(get_number())
else:
self.host.send_command_sync(
HCI_User_Passkey_Request_Negative_Reply_Command(bd_addr=connection.peer_address)
)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_remote_name(self, connection, remote_name):
# Try to decode the name
try:
connection.peer_name = remote_name.decode('utf-8')
connection.emit('remote_name')
except UnicodeDecodeError as error:
logger.warning('peer name is not valid UTF-8')
connection.emit('remote_name_failure', error)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_remote_name_failure(self, connection, error):
connection.emit('remote_name_failure', error)
@host_event_handler
@with_connection_from_handle
def on_connection_encryption_change(self, connection, encryption):

View File

@@ -22,6 +22,8 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import types
import logging
from colors import color
@@ -53,13 +55,13 @@ GATT_NEXT_DST_CHANGE_SERVICE = UUID.from_16_bits(0x1807, 'Next DS
GATT_GLUCOSE_SERVICE = UUID.from_16_bits(0x1808, 'Glucose')
GATT_HEALTH_THERMOMETER_SERVICE = UUID.from_16_bits(0x1809, 'Health Thermometer')
GATT_DEVICE_INFORMATION_SERVICE = UUID.from_16_bits(0x180A, 'Device Information')
GATT_DEVICE_HEART_RATE_SERVICE = UUID.from_16_bits(0x180D, 'Heart Rate')
GATT_PHONE_ALTERT_STATUS_SERVICE = UUID.from_16_bits(0x180E, 'Phone Alert Status')
GATT_DEVICE_BATTERY_SERVICE = UUID.from_16_bits(0x180F, 'Battery')
GATT_HEART_RATE_SERVICE = UUID.from_16_bits(0x180D, 'Heart Rate')
GATT_PHONE_ALERT_STATUS_SERVICE = UUID.from_16_bits(0x180E, 'Phone Alert Status')
GATT_BATTERY_SERVICE = UUID.from_16_bits(0x180F, 'Battery')
GATT_BLOOD_PRESSURE_SERVICE = UUID.from_16_bits(0x1810, 'Blood Pressure')
GATT_ALTERT_NOTIFICATION_SERVICE = UUID.from_16_bits(0x1811, 'Alert Notification')
GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE = UUID.from_16_bits(0x1812, 'Human Interface Device')
GATT_DEVICE_SCAN_PARAMETERS_SERVICE = UUID.from_16_bits(0x1813, 'Scan Parameters')
GATT_ALERT_NOTIFICATION_SERVICE = UUID.from_16_bits(0x1811, 'Alert Notification')
GATT_HUMAN_INTERFACE_DEVICE_SERVICE = UUID.from_16_bits(0x1812, 'Human Interface Device')
GATT_SCAN_PARAMETERS_SERVICE = UUID.from_16_bits(0x1813, 'Scan Parameters')
GATT_RUNNING_SPEED_AND_CADENCE_SERVICE = UUID.from_16_bits(0x1814, 'Running Speed and Cadence')
GATT_AUTOMATION_IO_SERVICE = UUID.from_16_bits(0x1815, 'Automation IO')
GATT_CYCLING_SPEED_AND_CADENCE_SERVICE = UUID.from_16_bits(0x1816, 'Cycling Speed and Cadence')
@@ -119,7 +121,7 @@ GATT_ENVIRONMENTAL_SENSING_CONFIGURATION_DESCRIPTOR = UUID.from_16_bits(0x290B,
GATT_ENVIRONMENTAL_SENSING_MEASUREMENT_DESCRIPTOR = UUID.from_16_bits(0x290C, 'Environmental Sensing Measurement')
GATT_ENVIRONMENTAL_SENSING_TRIGGER_DESCRIPTOR = UUID.from_16_bits(0x290D, 'Environmental Sensing Trigger Setting')
GATT_TIME_TRIGGER_DESCRIPTOR = UUID.from_16_bits(0x290E, 'Time Trigger Setting')
GATT_COMPLETE_BE_EDR_TRANSPORT_BLOCK_DATA_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Complete BR-EDR Transport Block Data')
GATT_COMPLETE_BR_EDR_TRANSPORT_BLOCK_DATA_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Complete BR-EDR Transport Block Data')
# Device Information Service
GATT_SYSTEM_ID_CHARACTERISTIC = UUID.from_16_bits(0x2A23, 'System ID')
@@ -132,27 +134,34 @@ GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC = UUID.from_16_bits(0x2A2
GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC = UUID.from_16_bits(0x2A2A, 'IEEE 11073-20601 Regulatory Certification Data List')
GATT_PNP_ID_CHARACTERISTIC = UUID.from_16_bits(0x2A50, 'PnP ID')
# Human Interface Device
# Human Interface Device Service
GATT_HID_INFORMATION_CHARACTERISTIC = UUID.from_16_bits(0x2A4A, 'HID Information')
GATT_REPORT_MAP_CHARACTERISTIC = UUID.from_16_bits(0x2A4B, 'Report Map')
GATT_HID_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2A4C, 'HID Control Point')
GATT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A4D, 'Report')
GATT_PROTOCOL_MODE_CHARACTERISTIC = UUID.from_16_bits(0x2A4E, 'Protocol Mode')
# Heart Rate Service
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC = UUID.from_16_bits(0x2A37, 'Heart Rate Measurement')
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2A38, 'Body Sensor Location')
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2A39, 'Heart Rate Control Point')
# Battery Service
GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level')
# Misc
GATT_DEVICE_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2A00, 'Device Name')
GATT_APPEARANCE_CHARACTERISTIC = UUID.from_16_bits(0x2A01, 'Appearance')
GATT_PERIPHERAL_PRIVACY_FLAG_CHARACTERISTIC = UUID.from_16_bits(0x2A02, 'Peripheral Privacy Flag')
GATT_RECONNECTION_ADDRESS_CHARACTERISTIC = UUID.from_16_bits(0x2A03, 'Reconnection Address')
GATT_PERIPHERAL_PREFERRREED_CONNECTION_PARAMETERS_CHARACTERISTIC = UUID.from_16_bits(0x2A04, 'Peripheral Preferred Connection Parameters')
GATT_SERVICE_CHANGED_CHARACTERISTIC = UUID.from_16_bits(0x2A05, 'Service Changed')
GATT_ALERT_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A06, 'Alert Level')
GATT_TX_POWER_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A07, 'Tx Power Level')
GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level')
GATT_BOOT_KEYBOARD_INPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A22, 'Boot Keyboard Input Report')
GATT_CURRENT_TIME_CHARACTERISTIC = UUID.from_16_bits(0x2A2B, 'Current Time')
GATT_BOOT_KEYBOARD_OUTPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A32, 'Boot Keyboard Output Report')
GATT_CENTRAL_ADDRESS_RESOLUTION__CHARACTERISTIC = UUID.from_16_bits(0x2AA6, 'Central Address Resolution')
GATT_DEVICE_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2A00, 'Device Name')
GATT_APPEARANCE_CHARACTERISTIC = UUID.from_16_bits(0x2A01, 'Appearance')
GATT_PERIPHERAL_PRIVACY_FLAG_CHARACTERISTIC = UUID.from_16_bits(0x2A02, 'Peripheral Privacy Flag')
GATT_RECONNECTION_ADDRESS_CHARACTERISTIC = UUID.from_16_bits(0x2A03, 'Reconnection Address')
GATT_PERIPHERAL_PREFERRED_CONNECTION_PARAMETERS_CHARACTERISTIC = UUID.from_16_bits(0x2A04, 'Peripheral Preferred Connection Parameters')
GATT_SERVICE_CHANGED_CHARACTERISTIC = UUID.from_16_bits(0x2A05, 'Service Changed')
GATT_ALERT_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A06, 'Alert Level')
GATT_TX_POWER_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A07, 'Tx Power Level')
GATT_BOOT_KEYBOARD_INPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A22, 'Boot Keyboard Input Report')
GATT_CURRENT_TIME_CHARACTERISTIC = UUID.from_16_bits(0x2A2B, 'Current Time')
GATT_BOOT_KEYBOARD_OUTPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A32, 'Boot Keyboard Output Report')
GATT_CENTRAL_ADDRESS_RESOLUTION__CHARACTERISTIC = UUID.from_16_bits(0x2AA6, 'Central Address Resolution')
# -----------------------------------------------------------------------------
@@ -189,13 +198,24 @@ class Service(Attribute):
self.uuid = uuid
self.included_services = []
self.characteristics = characteristics[:]
self.end_group_handle = 0
self.primary = primary
def __str__(self):
return f'Service(handle=0x{self.handle:04X}, end=0x{self.end_group_handle:04X}, uuid={self.uuid}){"" if self.primary else "*"}'
# -----------------------------------------------------------------------------
class TemplateService(Service):
'''
Convenience abstract class that can be used by profile-specific subclasses that want
to expose their UUID as a class property
'''
UUID = None
def __init__(self, characteristics, primary=True):
super().__init__(self.UUID, characteristics, primary)
# -----------------------------------------------------------------------------
class Characteristic(Attribute):
'''
@@ -227,56 +247,34 @@ class Characteristic(Attribute):
def property_name(property):
return Characteristic.PROPERTY_NAMES.get(property, '')
@staticmethod
def properties_as_string(properties):
return ','.join([
Characteristic.property_name(p) for p in Characteristic.PROPERTY_NAMES.keys()
if properties & p
])
def __init__(self, uuid, properties, permissions, value = b'', descriptors = []):
# Convert the uuid to a UUID object if it isn't already
if type(uuid) is str:
uuid = UUID(uuid)
super().__init__(uuid, permissions, value)
self.uuid = uuid
self.properties = properties
self._descriptors = descriptors
self._descriptors_discovered = False
self.end_group_handle = 0
self.attach_descriptors()
def attach_descriptors(self):
""" Let all the descriptors know they are attached to this characteristic """
for descriptor in self._descriptors:
descriptor.characteristic = self
def add_descriptor(self, descriptor):
descriptor.characteristic = self
self.descriptors.append(descriptor)
self.uuid = self.type
self.properties = properties
self.descriptors = descriptors
def get_descriptor(self, descriptor_type):
for descriptor in self.descriptors:
if descriptor.uuid == descriptor_type:
return descriptor
@property
def descriptors(self):
return self._descriptors
@descriptors.setter
def descriptors(self, value):
self._descriptors = value
self._descriptors_discovered = True
self.attach_descriptors()
@property
def descriptors_discovered(self):
return self._descriptors_discovered
def get_properties_as_string(self):
return ','.join([self.property_name(p) for p in self.PROPERTY_NAMES.keys() if self.properties & p])
def __str__(self):
return f'Characteristic(handle=0x{self.handle:04X}, end=0x{self.end_group_handle:04X}, uuid={self.uuid}, properties={self.get_properties_as_string()})'
return f'Characteristic(handle=0x{self.handle:04X}, end=0x{self.end_group_handle:04X}, uuid={self.uuid}, properties={Characteristic.properties_as_string(self.properties)})'
# -----------------------------------------------------------------------------
class CharacteristicValue:
'''
Characteristic value where reading and/or writing is delegated to functions
passed as arguments to the constructor.
'''
def __init__(self, read=None, write=None):
self._read = read
self._write = write
@@ -289,20 +287,148 @@ class CharacteristicValue:
self._write(connection, value)
# -----------------------------------------------------------------------------
class CharacteristicAdapter:
'''
An adapter that can adapt any object with `read_value` and `write_value`
methods (like Characteristic and CharacteristicProxy objects) by wrapping
those methods with ones that return/accept encoded/decoded values.
Objects with async methods are considered proxies, so the adaptation is one
where the return value of `read_value` is decoded and the value passed to
`write_value` is encoded. Other objects are considered local characteristics
so the adaptation is one where the return value of `read_value` is encoded
and the value passed to `write_value` is decoded.
If the characteristic has a `subscribe` method, it is wrapped with one where
the values are decoded before being passed to the subscriber.
'''
def __init__(self, characteristic):
self.wrapped_characteristic = characteristic
if (
asyncio.iscoroutinefunction(characteristic.read_value) and
asyncio.iscoroutinefunction(characteristic.write_value)
):
self.read_value = self.read_decoded_value
self.write_value = self.write_decoded_value
else:
self.read_value = self.read_encoded_value
self.write_value = self.write_encoded_value
if hasattr(self.wrapped_characteristic, 'subscribe'):
self.subscribe = self.wrapped_subscribe
def __getattr__(self, name):
return getattr(self.wrapped_characteristic, name)
def read_encoded_value(self, connection):
return self.encode_value(self.wrapped_characteristic.read_value(connection))
def write_encoded_value(self, connection, value):
return self.wrapped_characteristic.write_value(connection, self.decode_value(value))
async def read_decoded_value(self):
return self.decode_value(await self.wrapped_characteristic.read_value())
async def write_decoded_value(self, value):
return await self.wrapped_characteristic.write_value(self.encode_value(value))
def encode_value(self, value):
return value
def decode_value(self, value):
return value
def wrapped_subscribe(self, subscriber=None):
return self.wrapped_characteristic.subscribe(
None if subscriber is None else lambda value: subscriber(self.decode_value(value))
)
# -----------------------------------------------------------------------------
class DelegatedCharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that converts bytes values using an encode and a decode function.
'''
def __init__(self, characteristic, encode=None, decode=None):
super().__init__(characteristic)
self.encode = encode
self.decode = decode
def encode_value(self, value):
return self.encode(value) if self.encode else value
def decode_value(self, value):
return self.decode(value) if self.decode else value
# -----------------------------------------------------------------------------
class PackedCharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
For formats with a single value, the adapted `read_value` and `write_value`
methods return/accept single values. For formats with multiple values,
they return/accept a tuple with the same number of elements as is required for
the format.
'''
def __init__(self, characteristic, format):
super().__init__(characteristic)
self.struct = struct.Struct(format)
def pack(self, *values):
return self.struct.pack(*values)
def unpack(self, buffer):
return self.struct.unpack(buffer)
def encode_value(self, value):
return self.pack(*value if type(value) is tuple else (value,))
def decode_value(self, value):
unpacked = self.unpack(value)
return unpacked[0] if len(unpacked) == 1 else unpacked
# -----------------------------------------------------------------------------
class MappedCharacteristicAdapter(PackedCharacteristicAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
The adapted `read_value` and `write_value` methods return/accept aa dictionary which
is packed/unpacked according to format, with the arguments extracted from the dictionary
by key, in the same order as they occur in the `keys` parameter.
'''
def __init__(self, characteristic, format, keys):
super().__init__(characteristic, format)
self.keys = keys
def pack(self, values):
return super().pack(*(values[key] for key in self.keys))
def unpack(self, buffer):
return dict(zip(self.keys, super().unpack(buffer)))
# -----------------------------------------------------------------------------
class UTF8CharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that converts strings to/from bytes using UTF-8 encoding
'''
def encode_value(self, value):
return value.encode('utf-8')
def decode_value(self, value):
return value.decode('utf-8')
# -----------------------------------------------------------------------------
class Descriptor(Attribute):
'''
See Vol 3, Part G - 3.3.3 Characteristic Descriptor Declarations
'''
def __init__(self, uuid, permissions, value = b''):
# Convert the uuid to a UUID object if it isn't already
if type(uuid) is str:
uuid = UUID(uuid)
super().__init__(uuid, permissions, value)
self.uuid = uuid
self.characteristic = None
def __init__(self, descriptor_type, permissions, value = b''):
super().__init__(descriptor_type, permissions, value)
def __str__(self):
return f'Descriptor(handle=0x{self.handle:04X}, uuid={self.uuid}, value={self.read_value(None).hex()})'
return f'Descriptor(handle=0x{self.handle:04X}, type={self.type}, value={self.read_value(None).hex()})'

View File

@@ -35,10 +35,9 @@ from .gatt import (
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
GATT_REQUEST_TIMEOUT,
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
Service,
Characteristic,
Descriptor
Characteristic
)
# -----------------------------------------------------------------------------
@@ -47,6 +46,91 @@ from .gatt import (
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Proxies
# -----------------------------------------------------------------------------
class AttributeProxy(EventEmitter):
def __init__(self, client, handle, end_group_handle, attribute_type):
EventEmitter.__init__(self)
self.client = client
self.handle = handle
self.end_group_handle = end_group_handle
self.type = attribute_type
async def read_value(self, no_long_read=False):
return await self.client.read_value(self.handle, no_long_read)
async def write_value(self, value, with_response=False):
return await self.client.write_value(self.handle, value, with_response)
def __str__(self):
return f'Attribute(handle=0x{self.handle:04X}, type={self.uuid})'
class ServiceProxy(AttributeProxy):
@staticmethod
def from_client(cls, client, service_uuid):
# The service and its characteristics are considered to have already been discovered
services = client.get_services_by_uuid(service_uuid)
service = services[0] if services else None
return cls(service) if service else None
def __init__(self, client, handle, end_group_handle, uuid, primary=True):
attribute_type = GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE if primary else GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE
super().__init__(client, handle, end_group_handle, attribute_type)
self.uuid = uuid
self.characteristics = []
async def discover_characteristics(self, uuids=[]):
return await self.client.discover_characteristics(uuids, self)
def get_characteristics_by_uuid(self, uuid):
return self.client.get_characteristics_by_uuid(uuid, self)
def __str__(self):
return f'Service(handle=0x{self.handle:04X}, uuid={self.uuid})'
class CharacteristicProxy(AttributeProxy):
def __init__(self, client, handle, end_group_handle, uuid, properties):
super().__init__(client, handle, end_group_handle, uuid)
self.uuid = uuid
self.properties = properties
self.descriptors = []
self.descriptors_discovered = False
def get_descriptor(self, descriptor_type):
for descriptor in self.descriptors:
if descriptor.type == descriptor_type:
return descriptor
async def discover_descriptors(self):
return await self.client.discover_descriptors(self)
async def subscribe(self, subscriber=None):
return await self.client.subscribe(self, subscriber)
def __str__(self):
return f'Characteristic(handle=0x{self.handle:04X}, uuid={self.uuid}, properties={Characteristic.properties_as_string(self.properties)})'
class DescriptorProxy(AttributeProxy):
def __init__(self, client, handle, descriptor_type):
super().__init__(client, handle, 0, descriptor_type)
def __str__(self):
return f'Descriptor(handle=0x{self.handle:04X}, type={self.type})'
class ProfileServiceProxy:
'''
Base class for profile-specific service proxies
'''
@classmethod
def from_client(cls, client):
return ServiceProxy.from_client(cls, client, cls.SERVICE_CLASS.UUID)
# -----------------------------------------------------------------------------
# GATT Client
# -----------------------------------------------------------------------------
@@ -173,10 +257,14 @@ class Client:
logger.warning(f'bogus handle values: {attribute_handle} {end_group_handle}')
return
# Create a primary service object
service = Service(UUID.from_bytes(attribute_value), [], True)
service.handle = attribute_handle
service.end_group_handle = end_group_handle
# Create a service proxy for this service
service = ServiceProxy(
self,
attribute_handle,
end_group_handle,
UUID.from_bytes(attribute_value),
True
)
# Filter out returned services based on the given uuids list
if (not uuids) or (service.uuid in uuids):
@@ -233,10 +321,8 @@ class Client:
logger.warning(f'bogus handle values: {attribute_handle} {end_group_handle}')
return
# Create a primary service object
service = Service(uuid, [], True)
service.handle = attribute_handle
service.end_group_handle = end_group_handle
# Create a service proxy for this service
service = ServiceProxy(self, attribute_handle, end_group_handle, uuid, True)
# Add the service to the peer's service list
services.append(service)
@@ -314,8 +400,7 @@ class Client:
properties, handle = struct.unpack_from('<BH', attribute_value)
characteristic_uuid = UUID.from_bytes(attribute_value[3:])
characteristic = Characteristic(characteristic_uuid, properties, 0)
characteristic.handle = handle
characteristic = CharacteristicProxy(self, handle, 0, characteristic_uuid, properties)
# Set the previous characteristic's end handle
if characteristics:
@@ -382,8 +467,7 @@ class Client:
logger.warning(f'bogus handle value: {attribute_handle}')
return []
descriptor = Descriptor(UUID.from_bytes(attribute_uuid), 0)
descriptor.handle = attribute_handle
descriptor = DescriptorProxy(self, attribute_handle, UUID.from_bytes(attribute_uuid))
descriptors.append(descriptor)
# TODO: read descriptor value
@@ -427,8 +511,7 @@ class Client:
logger.warning(f'bogus handle value: {attribute_handle}')
return []
attribute = Attribute(attribute_uuid, 0)
attribute.handle = attribute_handle
attribute = AttributeProxy(self, attribute_handle, 0, UUID.from_bytes(attribute_uuid))
attributes.append(attribute)
# Move on to the next attributes

View File

@@ -79,6 +79,7 @@ HCI_VERSION_BLUETOOTH_CORE_4_2 = 8
HCI_VERSION_BLUETOOTH_CORE_5_0 = 9
HCI_VERSION_BLUETOOTH_CORE_5_1 = 10
HCI_VERSION_BLUETOOTH_CORE_5_2 = 11
HCI_VERSION_BLUETOOTH_CORE_5_3 = 12
# HCI Packet types
HCI_COMMAND_PACKET = 0x01
@@ -258,6 +259,9 @@ HCI_READ_REMOTE_VERSION_INFORMATION_COMMAND = hci_command_
HCI_READ_CLOCK_OFFSET_COMMAND = hci_command_op_code(0x01, 0x001F)
HCI_IO_CAPABILITY_REQUEST_REPLY_COMMAND = hci_command_op_code(0x01, 0x002B)
HCI_USER_CONFIRMATION_REQUEST_REPLY_COMMAND = hci_command_op_code(0x01, 0x002C)
HCI_USER_CONFIRMATION_REQUEST_NEGATIVE_REPLY_COMMAND = hci_command_op_code(0x01, 0x002D)
HCI_USER_PASSKEY_REQUEST_REPLY_COMMAND = hci_command_op_code(0x01, 0x002E)
HCI_USER_PASSKEY_REQUEST_NEGATIVE_REPLY_COMMAND = hci_command_op_code(0x01, 0x002F)
HCI_ENHANCED_SETUP_SYNCHRONOUS_CONNECTION_COMMAND = hci_command_op_code(0x01, 0x003D)
HCI_SNIFF_MODE_COMMAND = hci_command_op_code(0x02, 0x0003)
HCI_EXIT_SNIFF_MODE_COMMAND = hci_command_op_code(0x02, 0x0004)
@@ -407,6 +411,9 @@ HCI_COMMAND_NAMES = {
HCI_READ_CLOCK_OFFSET_COMMAND: 'HCI_READ_CLOCK_OFFSET_COMMAND',
HCI_IO_CAPABILITY_REQUEST_REPLY_COMMAND: 'HCI_IO_CAPABILITY_REQUEST_REPLY_COMMAND',
HCI_USER_CONFIRMATION_REQUEST_REPLY_COMMAND: 'HCI_USER_CONFIRMATION_REQUEST_REPLY_COMMAND',
HCI_USER_CONFIRMATION_REQUEST_NEGATIVE_REPLY_COMMAND: 'HCI_USER_CONFIRMATION_REQUEST_NEGATIVE_REPLY_COMMAND',
HCI_USER_PASSKEY_REQUEST_REPLY_COMMAND: 'HCI_USER_PASSKEY_REQUEST_REPLY_COMMAND',
HCI_USER_PASSKEY_REQUEST_NEGATIVE_REPLY_COMMAND: 'HCI_USER_PASSKEY_REQUEST_NEGATIVE_REPLY_COMMAND',
HCI_ENHANCED_SETUP_SYNCHRONOUS_CONNECTION_COMMAND: 'HCI_ENHANCED_SETUP_SYNCHRONOUS_CONNECTION_COMMAND',
HCI_SNIFF_MODE_COMMAND: 'HCI_SNIFF_MODE_COMMAND',
HCI_EXIT_SNIFF_MODE_COMMAND: 'HCI_EXIT_SNIFF_MODE_COMMAND',
@@ -683,20 +690,20 @@ HCI_IO_CAPABILITY_NAMES = {
}
# Authentication Requirements
HCI_MITM_NOT_REQUIRED_NO_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS = 0x00
HCI_MITM_REQUIRED_NO_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS = 0x01
HCI_MITM_NOT_REQUIRED_DEDICATED_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS = 0x02
HCI_MITM_REQUIRED_DEDICATED_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS = 0x03
HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS = 0x04
HCI_MITM_REQUIRED_GENERAL_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS = 0x05
HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS = 0x00
HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS = 0x01
HCI_MITM_NOT_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS = 0x02
HCI_MITM_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS = 0x03
HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS = 0x04
HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS = 0x05
HCI_AUTHENTICATION_REQUIREMENTS_NAMES = {
HCI_MITM_NOT_REQUIRED_NO_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_NOT_REQUIRED_NO_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_REQUIRED_NO_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_REQUIRED_NO_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_NOT_REQUIRED_DEDICATED_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_NOT_REQUIRED_DEDICATED_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_REQUIRED_DEDICATED_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_REQUIRED_DEDICATED_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_REQUIRED_GENERAL_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_REQUIRED_GENERAL_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS'
HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_NOT_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_NOT_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS'
}
# Link Key Types
@@ -723,7 +730,7 @@ HCI_LINK_TYPE_NAMES = {
}
# Address types
HCI_PUBLIC_DEVICE_ADDRESS_TYPE = 0x0
HCI_PUBLIC_DEVICE_ADDRESS_TYPE = 0x00
HCI_RANDOM_DEVICE_ADDRESS_TYPE = 0x01
HCI_PUBLIC_IDENTITY_ADDRESS_TYPE = 0x02
HCI_RANDOM_IDENTITY_ADDRESS_TYPE = 0x03
@@ -1371,6 +1378,9 @@ class HCI_Remote_Name_Request_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.19 Remote Name Request Command
'''
R0 = 0x00
R1 = 0x01
R2 = 0x02
# -----------------------------------------------------------------------------
@@ -1449,6 +1459,55 @@ class HCI_User_Confirmation_Request_Reply_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('bd_addr', Address.parse_address)
],
return_parameters_fields=[
('status', STATUS_SPEC),
('bd_addr', Address.parse_address)
]
)
class HCI_User_Confirmation_Request_Negative_Reply_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.31 User Confirmation Request Negative Reply Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('bd_addr', Address.parse_address),
('numeric_value', 4)
],
return_parameters_fields=[
('status', STATUS_SPEC),
('bd_addr', Address.parse_address)
]
)
class HCI_User_Passkey_Request_Reply_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.32 User Passkey Request Reply Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('bd_addr', Address.parse_address)
],
return_parameters_fields=[
('status', STATUS_SPEC),
('bd_addr', Address.parse_address)
]
)
class HCI_User_Passkey_Request_Negative_Reply_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.33 User Passkey Request Negative Reply Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([
('connection_handle', 2),
@@ -3367,6 +3426,16 @@ class HCI_User_Confirmation_Request_Event(HCI_Event):
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([
('bd_addr', Address.parse_address)
])
class HCI_User_Passkey_Request_Event(HCI_Event):
'''
See Bluetooth spec @ 7.7.43 User Passkey Request Event
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([
('status', STATUS_SPEC),

View File

@@ -85,6 +85,7 @@ class Host(EventEmitter):
self.command_semaphore = asyncio.Semaphore(1)
self.long_term_key_provider = None
self.link_key_provider = None
self.pairing_io_capability_provider = None # Classic only
# Connect to the source and sink if specified
if controller_source:
@@ -363,7 +364,7 @@ class Host(EventEmitter):
logger.debug(f'### BR/EDR CONNECTION FAILED: {event.status}')
# Notify the client
self.emit('connection_failure', event.status)
self.emit('connection_failure', event.connection_handle, event.status)
def on_hci_disconnection_complete_event(self, event):
# Find the connection
@@ -495,7 +496,7 @@ class Host(EventEmitter):
def on_hci_authentication_complete_event(self, event):
# Notify the client
if event.status == HCI_SUCCESS:
self.emit('connection_authentication_complete', event.connection_handle)
self.emit('connection_authentication', event.connection_handle)
else:
self.emit('connection_authentication_failure', event.connection_handle, event.status)
@@ -560,26 +561,16 @@ class Host(EventEmitter):
asyncio.create_task(send_link_key())
def on_hci_io_capability_request_event(self, event):
# For now, just return NoInputNoOutput and no MITM
# TODO: delegate the decision
self.send_command_sync(
HCI_IO_Capability_Request_Reply_Command(
bd_addr = event.bd_addr,
io_capability = HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
oob_data_present = 0x00,
authentication_requirements = 0x00 # 0x02 # FIXME: testing only
)
)
self.emit('authentication_io_capability_request', event.bd_addr)
def on_hci_io_capability_response_event(self, event):
pass
def on_hci_user_confirmation_request_event(self, event):
# For now, just confirm everything
# TODO: delegate the decision
self.send_command_sync(
HCI_User_Confirmation_Request_Reply_Command(bd_addr = event.bd_addr)
)
self.emit('authentication_user_confirmation_request', event.bd_addr, event.numeric_value)
def on_hci_user_passkey_request_event(self, event):
self.emit('authentication_user_passkey_request', event.bd_addr)
def on_hci_inquiry_complete_event(self, event):
self.emit('inquiry_complete')
@@ -602,3 +593,9 @@ class Host(EventEmitter):
event.extended_inquiry_response,
event.rssi
)
def on_hci_remote_name_request_complete_event(self, event):
if event.status != HCI_SUCCESS:
self.emit('remote_name_failure', event.bd_addr, event.status)
else:
self.emit('remote_name', event.bd_addr, event.remote_name)

View File

@@ -0,0 +1,13 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -0,0 +1,61 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from ..gatt_client import ProfileServiceProxy
from ..gatt import (
GATT_BATTERY_SERVICE,
GATT_BATTERY_LEVEL_CHARACTERISTIC,
TemplateService,
Characteristic,
CharacteristicValue,
PackedCharacteristicAdapter
)
# -----------------------------------------------------------------------------
class BatteryService(TemplateService):
UUID = GATT_BATTERY_SERVICE
BATTERY_LEVEL_FORMAT = 'B'
def __init__(self, read_battery_level):
self.battery_level_characteristic = PackedCharacteristicAdapter(
Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.READABLE,
CharacteristicValue(read=read_battery_level)
),
format=BatteryService.BATTERY_LEVEL_FORMAT
)
super().__init__([self.battery_level_characteristic])
# -----------------------------------------------------------------------------
class BatteryServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = BatteryService
def __init__(self, service_proxy):
self.service_proxy = service_proxy
if characteristics := service_proxy.get_characteristics_by_uuid(GATT_BATTERY_LEVEL_CHARACTERISTIC):
self.battery_level = PackedCharacteristicAdapter(
characteristics[0],
format=BatteryService.BATTERY_LEVEL_FORMAT
)
else:
self.battery_level = None

View File

@@ -0,0 +1,135 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct
from typing import Tuple
from ..gatt_client import ProfileServiceProxy
from ..gatt import (
GATT_DEVICE_INFORMATION_SERVICE,
GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC,
GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC,
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
GATT_MODEL_NUMBER_STRING_CHARACTERISTIC,
GATT_SERIAL_NUMBER_STRING_CHARACTERISTIC,
GATT_SOFTWARE_REVISION_STRING_CHARACTERISTIC,
GATT_SYSTEM_ID_CHARACTERISTIC,
GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC,
TemplateService,
Characteristic,
DelegatedCharacteristicAdapter,
UTF8CharacteristicAdapter
)
# -----------------------------------------------------------------------------
class DeviceInformationService(TemplateService):
UUID = GATT_DEVICE_INFORMATION_SERVICE
@staticmethod
def pack_system_id(oui, manufacturer_id):
return struct.pack('<Q', oui << 40 | manufacturer_id)
@staticmethod
def unpack_system_id(buffer):
system_id = struct.unpack('<Q', buffer)[0]
return (system_id >> 40, system_id & 0xFFFFFFFFFF)
def __init__(
self,
manufacturer_name: str = None,
model_number: str = None,
serial_number: str = None,
hardware_revision: str = None,
firmware_revision: str = None,
software_revision: str = None,
system_id: Tuple[int, int] = None, # (OUI, Manufacturer ID)
ieee_regulatory_certification_data_list: bytes = None
# TODO: pnp_id
):
characteristics = [
Characteristic(
uuid,
Characteristic.READ,
Characteristic.READABLE,
field
)
for (field, uuid) in (
(manufacturer_name, GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC),
(model_number, GATT_MODEL_NUMBER_STRING_CHARACTERISTIC),
(serial_number, GATT_SERIAL_NUMBER_STRING_CHARACTERISTIC),
(hardware_revision, GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC),
(firmware_revision, GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC),
(software_revision, GATT_SOFTWARE_REVISION_STRING_CHARACTERISTIC)
)
if field is not None
]
if system_id is not None:
characteristics.append(Characteristic(
GATT_SYSTEM_ID_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
self.pack_system_id(*system_id)
))
if ieee_regulatory_certification_data_list is not None:
characteristics.append(Characteristic(
GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
ieee_regulatory_certification_data_list
))
super().__init__(characteristics)
# -----------------------------------------------------------------------------
class DeviceInformationServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = DeviceInformationService
def __init__(self, service_proxy):
self.service_proxy = service_proxy
for (field, uuid) in (
('manufacturer_name', GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC),
('model_number', GATT_MODEL_NUMBER_STRING_CHARACTERISTIC),
('serial_number', GATT_SERIAL_NUMBER_STRING_CHARACTERISTIC),
('hardware_revision', GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC),
('firmware_revision', GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC),
('software_revision', GATT_SOFTWARE_REVISION_STRING_CHARACTERISTIC)
):
if characteristics := service_proxy.get_characteristics_by_uuid(uuid):
characteristic = UTF8CharacteristicAdapter(characteristics[0])
else:
characteristic = None
self.__setattr__(field, characteristic)
if characteristics := service_proxy.get_characteristics_by_uuid(GATT_SYSTEM_ID_CHARACTERISTIC):
self.system_id = DelegatedCharacteristicAdapter(
characteristics[0],
encode=lambda v: DeviceInformationService.pack_system_id(*v),
decode=DeviceInformationService.unpack_system_id
)
else:
self.system_id = None
if characteristics := service_proxy.get_characteristics_by_uuid(GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC):
self.ieee_regulatory_certification_data_list = characteristics[0]
else:
self.ieee_regulatory_certification_data_list = None

View File

@@ -0,0 +1,221 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from enum import IntEnum
import struct
from ..gatt_client import ProfileServiceProxy
from ..att import ATT_Error
from ..gatt import (
GATT_HEART_RATE_SERVICE,
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
TemplateService,
Characteristic,
CharacteristicValue,
DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter
)
# -----------------------------------------------------------------------------
class HeartRateService(TemplateService):
UUID = GATT_HEART_RATE_SERVICE
HEART_RATE_CONTROL_POINT_FORMAT = 'B'
CONTROL_POINT_NOT_SUPPORTED = 0x80
RESET_ENERGY_EXPENDED = 0x01
class BodySensorLocation(IntEnum):
OTHER = 0,
CHEST = 1,
WRIST = 2,
FINGER = 3,
HAND = 4,
EAR_LOBE = 5,
FOOT = 6
class HeartRateMeasurement:
def __init__(
self,
heart_rate,
sensor_contact_detected=None,
energy_expended=None,
rr_intervals=None
):
if heart_rate < 0 or heart_rate > 0xFFFF:
raise ValueError('heart_rate out of range')
if energy_expended is not None and (energy_expended < 0 or energy_expended > 0xFFFF):
raise ValueError('energy_expended out of range')
if rr_intervals:
for rr_interval in rr_intervals:
if rr_interval < 0 or rr_interval * 1024 > 0xFFFF:
raise ValueError('rr_intervals out of range')
self.heart_rate = heart_rate
self.sensor_contact_detected = sensor_contact_detected
self.energy_expended = energy_expended
self.rr_intervals = rr_intervals
@classmethod
def from_bytes(cls, data):
flags = data[0]
offset = 1
if flags & 1:
hr = struct.unpack_from('<H', data, offset)[0]
offset += 2
else:
hr = struct.unpack_from('B', data, offset)[0]
offset += 1
if flags & (1 << 2):
sensor_contact_detected = (flags & (1 << 1) != 0)
else:
sensor_contact_detected = None
if flags & (1 << 3):
energy_expended = struct.unpack_from('<H', data, offset)[0]
offset += 2
else:
energy_expended = None
if flags & (1 << 4):
rr_intervals = tuple(
struct.unpack_from('<H', data, offset + i * 2)[0] / 1024
for i in range((len(data) - offset) // 2)
)
else:
rr_intervals = ()
return cls(hr, sensor_contact_detected, energy_expended, rr_intervals)
def __bytes__(self):
if self.heart_rate < 256:
flags = 0
data = struct.pack('B', self.heart_rate)
else:
flags = 1
data = struct.pack('<H', self.heart_rate)
if self.sensor_contact_detected is not None:
flags |= ((1 if self.sensor_contact_detected else 0) << 1) | (1 << 2)
if self.energy_expended is not None:
flags |= (1 << 3)
data += struct.pack('<H', self.energy_expended)
if self.rr_intervals:
flags |= (1 << 4)
data += b''.join([
struct.pack('<H', int(rr_interval * 1024))
for rr_interval in self.rr_intervals
])
return bytes([flags]) + data
def __str__(self):
return f'HeartRateMeasurement(heart_rate={self.heart_rate},'\
f' sensor_contact_detected={self.sensor_contact_detected},'\
f' energy_expended={self.energy_expended},'\
f' rr_intervals={self.rr_intervals})'
def __init__(
self,
read_heart_rate_measurement,
body_sensor_location=None,
reset_energy_expended=None
):
self.heart_rate_measurement_characteristic = DelegatedCharacteristicAdapter(
Characteristic(
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
Characteristic.NOTIFY,
0,
CharacteristicValue(read=read_heart_rate_measurement)
),
encode=lambda value: bytes(value)
)
characteristics = [self.heart_rate_measurement_characteristic]
if body_sensor_location is not None:
self.body_sensor_location_characteristic = Characteristic(
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
bytes([int(body_sensor_location)])
)
characteristics.append(self.body_sensor_location_characteristic)
if reset_energy_expended:
def write_heart_rate_control_point_value(connection, value):
if value == self.RESET_ENERGY_EXPENDED:
if reset_energy_expended is not None:
reset_energy_expended(connection)
else:
raise ATT_Error(self.CONTROL_POINT_NOT_SUPPORTED)
self.heart_rate_control_point_characteristic = PackedCharacteristicAdapter(
Characteristic(
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE,
Characteristic.WRITEABLE,
CharacteristicValue(write=write_heart_rate_control_point_value)
),
format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT
)
characteristics.append(self.heart_rate_control_point_characteristic)
super().__init__(characteristics)
# -----------------------------------------------------------------------------
class HeartRateServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = HeartRateService
def __init__(self, service_proxy):
self.service_proxy = service_proxy
if characteristics := service_proxy.get_characteristics_by_uuid(GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC):
self.heart_rate_measurement = DelegatedCharacteristicAdapter(
characteristics[0],
decode=HeartRateService.HeartRateMeasurement.from_bytes
)
else:
self.heart_rate_measurement = None
if characteristics := service_proxy.get_characteristics_by_uuid(GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC):
self.body_sensor_location = DelegatedCharacteristicAdapter(
characteristics[0],
decode=lambda value: HeartRateService.BodySensorLocation(value[0])
)
else:
self.body_sensor_location = None
if characteristics := service_proxy.get_characteristics_by_uuid(GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC):
self.heart_rate_control_point = PackedCharacteristicAdapter(
characteristics[0],
format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT
)
else:
self.heart_rate_control_point = None
async def reset_energy_expended(self):
if self.heart_rate_control_point is not None:
return await self.heart_rate_control_point.write_value(HeartRateService.RESET_ENERGY_EXPENDED)

View File

@@ -150,6 +150,8 @@ SMP_SC_AUTHREQ = 0b00001000
SMP_KEYPRESS_AUTHREQ = 0b00010000
SMP_CT2_AUTHREQ = 0b00100000
# Crypto salt
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031')
# -----------------------------------------------------------------------------
# Utils
@@ -457,22 +459,38 @@ class PairingDelegate:
DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY
DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
DEFAULT_KEY_DISTRIBUTION = (SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG)
def __init__(self, io_capability = NO_OUTPUT_NO_INPUT):
def __init__(
self,
io_capability=NO_OUTPUT_NO_INPUT,
local_initiator_key_distribution=DEFAULT_KEY_DISTRIBUTION,
local_responder_key_distribution=DEFAULT_KEY_DISTRIBUTION
):
self.io_capability = io_capability
self.local_initiator_key_distribution = local_initiator_key_distribution
self.local_responder_key_distribution = local_responder_key_distribution
async def accept(self):
return True
async def compare_numbers(self, number):
async def compare_numbers(self, number, digits=6):
return True
async def get_number(self):
return 0
async def display_number(self, number):
async def display_number(self, number, digits=6):
pass
async def key_distribution_response(self, peer_initiator_key_distribution, peer_responder_key_distribution):
return (
(peer_initiator_key_distribution &
self.local_initiator_key_distribution),
(peer_responder_key_distribution &
self.local_responder_key_distribution)
)
# -----------------------------------------------------------------------------
class PairingConfig:
@@ -559,6 +577,7 @@ class Session:
self.ltk = None
self.ltk_ediv = 0
self.ltk_rand = bytes(8)
self.link_key = None
self.initiator_key_distribution = 0
self.responder_key_distribution = 0
self.peer_random_value = None
@@ -596,11 +615,8 @@ class Session:
self.pairing_result = None
# Key Distribution (default values before negotiation)
self.initiator_key_distribution = (
SMP_ENC_KEY_DISTRIBUTION_FLAG |
SMP_ID_KEY_DISTRIBUTION_FLAG # |SMP_SIGN_KEY_DISTRIBUTION_FLAG
)
self.responder_key_distribution = self.initiator_key_distribution
self.initiator_key_distribution = pairing_config.delegate.local_initiator_key_distribution
self.responder_key_distribution = pairing_config.delegate.local_responder_key_distribution
# Authentication Requirements Flags - Vol 3, Part H, Figure 3.3
self.bonding = pairing_config.bonding
@@ -699,7 +715,7 @@ class Session:
async def prompt():
logger.debug(f'verification code: {code}')
try:
response = await self.pairing_config.delegate.compare_numbers(code)
response = await self.pairing_config.delegate.compare_numbers(code, digits=6)
if response:
next_steps()
return
@@ -733,7 +749,7 @@ class Session:
self.tk = self.passkey.to_bytes(16, byteorder='little')
logger.debug(f'TK from passkey = {self.tk.hex()}')
asyncio.create_task(self.pairing_config.delegate.display_number(self.passkey))
asyncio.create_task(self.pairing_config.delegate.display_number(self.passkey, digits=6))
def input_passkey(self, next_steps=None):
# Prompt the user for the passkey displayed on the peer
@@ -870,47 +886,56 @@ class Session:
self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk))
self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand))
# Distribute IRK
# Distribute IRK & BD ADDR
if self.initiator_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG:
self.send_command(
SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk)
)
# Distribute BD ADDR
self.send_command(SMP_Identity_Address_Information_Command(
addr_type = self.manager.address.address_type,
bd_addr = self.manager.address
))
self.send_command(SMP_Identity_Address_Information_Command(
addr_type = self.manager.address.address_type,
bd_addr = self.manager.address
))
# Distribute CSRK
csrk = bytes(16) # FIXME: testing
if self.initiator_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
self.send_command(SMP_Signing_Information_Command(signature_key=csrk))
# CTKD, calculate BR/EDR link key
if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
ilk = crypto.h7(
salt=SMP_CTKD_H7_LEBR_SALT,
w=self.ltk) if self.ct2 else crypto.h6(self.ltk, b'tmp1')
self.link_key = crypto.h6(ilk, b'lebr')
else:
# Distribute the LTK
# Distribute the LTK, EDIV and RAND
if not self.sc:
if self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk))
self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand))
# Distribute EDIV and RAND
self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand))
# Distribute IRK
# Distribute IRK & BD ADDR
if self.responder_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG:
self.send_command(
SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk)
)
# Distribute BD ADDR
self.send_command(SMP_Identity_Address_Information_Command(
addr_type = self.manager.address.address_type,
bd_addr = self.manager.address
))
self.send_command(SMP_Identity_Address_Information_Command(
addr_type = self.manager.address.address_type,
bd_addr = self.manager.address
))
# Distribute CSRK
csrk = bytes(16) # FIXME: testing
if self.responder_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
self.send_command(SMP_Signing_Information_Command(signature_key=csrk))
# CTKD, calculate BR/EDR link key
if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
ilk = crypto.h7(
salt=SMP_CTKD_H7_LEBR_SALT,
w=self.ltk) if self.ct2 else crypto.h6(self.ltk, b'tmp1')
self.link_key = crypto.h6(ilk, b'lebr')
def compute_peer_expected_distributions(self, key_distribution_flags):
# Set our expectations for what to wait for in the key distribution phase
@@ -945,7 +970,7 @@ class Session:
# Nothing left to expect, we're done
self.on_pairing()
else:
logger.warn(color('!!! unexpected key distribution command', 'red'))
logger.warn(color(f'!!! unexpected key distribution command: {command_class.__name__}', 'red'))
self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR)
async def pair(self):
@@ -1029,6 +1054,11 @@ class Session:
value = self.peer_signature_key,
authenticated = authenticated
)
if self.link_key is not None:
keys.link_key = PairingKeys.Key(
value = self.link_key,
authenticated = authenticated
)
self.manager.on_pairing(self, peer_address, keys)
@@ -1076,6 +1106,7 @@ class Session:
# Bonding and SC require both sides to request/support it
self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0)
self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0)
# Check for OOB
if command.oob_data_flag != 0:
@@ -1091,8 +1122,8 @@ class Session:
logger.debug(f'pairing method: {self.PAIRING_METHOD_NAMES[self.pairing_method]}')
# Key distribution
self.initiator_key_distribution &= command.initiator_key_distribution
self.responder_key_distribution &= command.responder_key_distribution
self.initiator_key_distribution, self.responder_key_distribution = await self.pairing_config.delegate.key_distribution_response(
command.initiator_key_distribution, command.responder_key_distribution)
self.compute_peer_expected_distributions(self.initiator_key_distribution)
# The pairing is now starting

View File

@@ -221,7 +221,7 @@ async def open_usb_transport(spec):
pass
logger.debug('USB event loop done')
self.event_loop_done.set_result(None)
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
async def close(self):
self.closed = True

View File

@@ -0,0 +1,2 @@
GATT DUMP TOOL
==============

View File

@@ -5,6 +5,8 @@ Included in the project are a few apps and tools, built on top of the core libra
These include:
* [Console](console.md) - an interactive text-based console
* [Pair](pair.md) - Pair/bond two devices (LE and Classic)
* [Unbond](unbond.md) - Remove a previously established bond
* [HCI Bridge](hci_bridge.md) - a HCI transport bridge to connect two HCI transports and filter/snoop the HCI packets
* [Golden Gate Bridge](gg_bridge.md) - a bridge between GATT and UDP to use with the Golden Gate "stack tool"
* [Show](show.md) - Parse a file with HCI packets and print the details of each packet in a human readable form

View File

@@ -0,0 +1,2 @@
PAIR TOOL
=========

View File

@@ -0,0 +1,2 @@
SHOW TOOL
=========

View File

@@ -0,0 +1,2 @@
UNBOND TOOL
===========

View File

@@ -0,0 +1,72 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from colors import color
from bumble.device import Device, Peer
from bumble.transport import open_transport
from bumble.profiles.battery_service import BatteryServiceProxy
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: battery_client.py <transport-spec> <bluetooth-address>')
print('example: battery_client.py usb:0 E1:CA:72:48:C4:E8')
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
print('<<< connected')
# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
await device.power_on()
# Connect to the peer
target_address = sys.argv[2]
print(f'=== Connecting to {target_address}...')
connection = await device.connect(target_address)
print(f'=== Connected to {connection}')
# Discover the Battery Service
peer = Peer(connection)
print('=== Discovering Battery Service')
battery_service = await peer.discover_service_and_create_proxy(BatteryServiceProxy)
# Check that the service was found
if not battery_service:
print('!!! Service not found')
return
# Subscribe to and read the battery level
if battery_service.battery_level:
await battery_service.battery_level.subscribe(
lambda value: print(f'{color("Battery Level Update:", "green")} {value}')
)
value = await battery_service.battery_level.read_value()
print(f'{color("Initial Battery Level:", "green")} {value}')
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -25,59 +25,41 @@ import struct
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.gatt import (
Service,
Characteristic,
CharacteristicValue,
GATT_DEVICE_BATTERY_SERVICE,
GATT_BATTERY_LEVEL_CHARACTERISTIC
)
# -----------------------------------------------------------------------------
def read_battery_level(connection):
return bytes([random.randint(0, 100)])
from bumble.profiles.battery_service import BatteryService
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: python battery_service.py <device-config> <transport-spec>')
print('example: python battery_service.py device1.json usb:0')
print('Usage: python battery_server.py <device-config> <transport-spec>')
print('example: python battery_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
# Create a device to manage the host
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
# Add a Battery Service to the GATT sever
device.add_services([
Service(
GATT_DEVICE_BATTERY_SERVICE,
[
Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
CharacteristicValue(read=read_battery_level)
)
]
)
])
battery_service = BatteryService(lambda _: random.randint(0, 100))
device.add_service(battery_service)
# Set the advertising data
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Battery', 'utf-8')),
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, struct.pack('<H', 0x180F)),
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, bytes(battery_service.uuid)),
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x0340))
])
)
# Go!
await device.power_on()
await device.start_advertising()
await hci_source.wait_for_termination()
await device.start_advertising(auto_restart=True)
# Notify every 3 seconds
while True:
await asyncio.sleep(3.0)
await device.notify_subscribers(battery_service.battery_level_characteristic)
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())

View File

@@ -0,0 +1,80 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from colors import color
from bumble.device import Device, Peer
from bumble.profiles.device_information_service import DeviceInformationServiceProxy
from bumble.transport import open_transport
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: device_information_client.py <transport-spec> <bluetooth-address>')
print('example: device_information_client.py usb:0 E1:CA:72:48:C4:E8')
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
print('<<< connected')
# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
await device.power_on()
# Connect to the peer
target_address = sys.argv[2]
print(f'=== Connecting to {target_address}...')
connection = await device.connect(target_address)
print(f'=== Connected to {connection}')
# Discover the Device Information service
peer = Peer(connection)
print('=== Discovering Device Information Service')
device_information_service = await peer.discover_service_and_create_proxy(DeviceInformationServiceProxy)
# Check that the service was found
if device_information_service is None:
print('!!! Service not found')
return
# Read and print the fields
if device_information_service.manufacturer_name is not None:
print(color('Manufacturer Name: ', 'green'), await device_information_service.manufacturer_name.read_value())
if device_information_service.model_number is not None:
print(color('Model Number: ', 'green'), await device_information_service.model_number.read_value())
if device_information_service.serial_number is not None:
print(color('Serial Number: ', 'green'), await device_information_service.serial_number.read_value())
if device_information_service.hardware_revision is not None:
print(color('Hardware Revision: ', 'green'), await device_information_service.hardware_revision.read_value())
if device_information_service.firmware_revision is not None:
print(color('Firmware Revision: ', 'green'), await device_information_service.firmware_revision.read_value())
if device_information_service.software_revision is not None:
print(color('Software Revision: ', 'green'), await device_information_service.software_revision.read_value())
if device_information_service.system_id is not None:
print(color('System ID: ', 'green'), await device_information_service.system_id.read_value())
if device_information_service.ieee_regulatory_certification_data_list is not None:
print(color('Regulatory Certification:', 'green'), (await device_information_service.ieee_regulatory_certification_data_list.read_value()).hex())
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -0,0 +1,66 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
import struct
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.profiles.device_information_service import DeviceInformationService
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: python device_info_server.py <device-config> <transport-spec>')
print('example: python device_info_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
# Add a Device Information Service to the GATT sever
device_information_service = DeviceInformationService(
manufacturer_name = 'ACME',
model_number = 'AB-102',
serial_number = '7654321',
hardware_revision = '1.1.3',
software_revision = '2.5.6',
system_id = (0x123456, 0x8877665544)
)
device.add_service(device_information_service)
# Set the advertising data
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Device', 'utf-8')),
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x0340))
])
)
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -1,96 +0,0 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from colors import color
from bumble.device import Device, Peer
from bumble.host import Host
from bumble.transport import open_transport
from bumble.utils import AsyncRunner
from bumble import gatt
# -----------------------------------------------------------------------------
class Listener(Device.Listener):
def __init__(self, device):
self.device = device
self.done = asyncio.get_running_loop().create_future()
@AsyncRunner.run_in_task()
async def on_connection(self, connection):
print(f'=== Connected to {connection}')
# Discover the Device Info service
peer = Peer(connection)
print('=== Discovering Device Info')
await peer.discover_services([gatt.GATT_DEVICE_INFORMATION_SERVICE])
# Check that the service was found
device_info_services = peer.get_services_by_uuid(gatt.GATT_DEVICE_INFORMATION_SERVICE)
if not device_info_services:
print('!!! Service not found')
return
# Get the characteristics we want from the (first) device info service
service = device_info_services[0]
await peer.discover_characteristics([
gatt.GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC
], service)
# Read the manufacturer name
manufacturer_name = peer.get_characteristics_by_uuid(gatt.GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, service)
if manufacturer_name:
value = await peer.read_value(manufacturer_name[0])
print(color('Manufacturer Name:', 'green'), value.decode('utf-8'))
else:
print('>>> No manufacturer name')
self.done.set_result(None)
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: get_peer_device_info.py <transport-spec> <bluetooth-address>')
print('example: get_peer_device_info.py usb:0 E1:CA:72:48:C4:E8')
return
print('<<< connecting to HCI...')
packet_source, packet_sink = await open_transport(sys.argv[1])
print('<<< connected')
# Create a host using the packet source and sink as controller
host = Host(controller_source=packet_source, controller_sink=packet_sink)
# Create a device to manage the host, with a custom listener
device = Device('Bumble', address = 'F0:F1:F2:F3:F4:F5', host = host)
device.listener = Listener(device)
await device.power_on()
# Connect to a peer
target_address = sys.argv[2]
print(f'=== Connecting to {target_address}...')
await device.connect(target_address)
await device.listener.done
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -0,0 +1,75 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from colors import color
from bumble.device import Device, Peer
from bumble.transport import open_transport
from bumble.profiles.heart_rate_service import HeartRateServiceProxy
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: heart_rate_client.py <transport-spec> <bluetooth-address>')
print('example: heart_rate_client.py usb:0 E1:CA:72:48:C4:E8')
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
print('<<< connected')
# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
await device.power_on()
# Connect to the peer
target_address = sys.argv[2]
print(f'=== Connecting to {target_address}...')
connection = await device.connect(target_address)
print(f'=== Connected to {connection}')
# Discover the Heart Rate Service
peer = Peer(connection)
print('=== Discovering Heart Rate Service')
heart_rate_service = await peer.discover_service_and_create_proxy(HeartRateServiceProxy)
# Check that the service was found
if not heart_rate_service:
print('!!! Service not found')
return
# Read the body sensor location
if heart_rate_service.body_sensor_location:
location = await heart_rate_service.body_sensor_location.read_value()
print(color('Sensor Location:', 'green'), location)
# Subscribe to the heart rate measurement
if heart_rate_service.heart_rate_measurement:
await heart_rate_service.heart_rate_measurement.subscribe(
lambda value: print(f'{color("Heart Rate Measurement:", "green")} {value}')
)
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -0,0 +1,95 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import sys
import time
import math
import random
import struct
import logging
import asyncio
import os
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.profiles.device_information_service import DeviceInformationService
from bumble.profiles.heart_rate_service import HeartRateService
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: python heart_rate_server.py <device-config> <transport-spec>')
print('example: python heart_rate_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
# Keep track of accumulated expended energy
energy_start_time = time.time()
def reset_energy_expended():
nonlocal energy_start_time
energy_start_time = time.time()
# Add a Device Information Service and Heart Rate Service to the GATT sever
device_information_service = DeviceInformationService(
manufacturer_name = 'ACME',
model_number = 'HR-102',
serial_number = '7654321',
hardware_revision = '1.1.3',
software_revision = '2.5.6',
system_id = (0x123456, 0x8877665544)
)
heart_rate_service = HeartRateService(
read_heart_rate_measurement = lambda _: HeartRateService.HeartRateMeasurement(
heart_rate = 100 + int(50 * math.sin(time.time() * math.pi / 60)),
sensor_contact_detected = random.choice((True, False, None)),
energy_expended = random.choice((int((time.time() - energy_start_time) * 100), None)),
rr_intervals = random.choice(((random.randint(900, 1100) / 1000, random.randint(900, 1100) / 1000), None))
),
body_sensor_location=HeartRateService.BodySensorLocation.WRIST,
reset_energy_expended=lambda _: reset_energy_expended()
)
device.add_services([device_information_service, heart_rate_service])
# Set the advertising data
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Heart', 'utf-8')),
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, bytes(heart_rate_service.uuid)),
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x0340))
])
)
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
# Notify every 3 seconds
while True:
await asyncio.sleep(3.0)
await device.notify_subscribers(heart_rate_service.heart_rate_measurement_characteristic)
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -45,8 +45,7 @@ async def main():
# Create a first controller using the packet source/sink as its host interface
controller1 = Controller('C1', host_source = hci_source, host_sink = hci_sink, link = link)
print("====", sys.argv)
controller1.address = sys.argv[1]
controller1.random_address = sys.argv[1]
# Create a second controller using the same link
controller2 = Controller('C2', link = link)

View File

@@ -41,10 +41,10 @@ class Listener(Device.Listener):
print('=== Discovering services')
peer = Peer(connection)
await peer.discover_services()
await peer.discover_characteristics()
for service in peer.services:
await service.discover_characteristics()
for characteristic in service.characteristics:
await peer.discover_descriptors(characteristic)
await characteristic.discover_descriptors()
print('=== Services discovered')
show_services(peer.services)

View File

@@ -25,7 +25,6 @@ from bumble.controller import Controller
from bumble.device import Device, Peer
from bumble.host import Host
from bumble.link import LocalLink
from bumble.utils import AsyncRunner
from bumble.gatt import (
Service,
Characteristic,
@@ -37,43 +36,6 @@ from bumble.gatt import (
)
# -----------------------------------------------------------------------------
class ClientListener(Device.Listener):
def __init__(self, device):
self.device = device
@AsyncRunner.run_in_task()
async def on_connection(self, connection):
print(f'=== Client: connected to {connection}')
# Discover all services
print('=== Discovering services')
peer = Peer(connection)
await peer.discover_services()
await peer.discover_characteristics()
for service in peer.services:
for characteristic in service.characteristics:
await peer.discover_descriptors(characteristic)
print('=== Services discovered')
show_services(peer.services)
# Discover all attributes
print('=== Discovering attributes')
attributes = await peer.discover_attributes()
for attribute in attributes:
print(attribute)
print('=== Attributes discovered')
# Read all attributes
for attribute in attributes:
try:
value = await peer.read_value(attribute)
print(color(f'0x{attribute.handle:04X} = {value.hex()}', 'green'))
except ProtocolError as error:
print(color(f'cannot read {attribute.handle:04X}:', 'red'), error)
# -----------------------------------------------------------------------------
class ServerListener(Device.Listener):
def on_connection(self, connection):
@@ -90,7 +52,6 @@ async def main():
client_host = Host()
client_host.controller = client_controller
client_device = Device("client", address = 'F0:F1:F2:F3:F4:F5', host = client_host)
client_device.listener = ClientListener(client_device)
await client_device.power_on()
# Setup a stack for the server
@@ -116,7 +77,36 @@ async def main():
server_device.add_service(device_info_service)
# Connect the client to the server
await client_device.connect(server_device.address)
connection = await client_device.connect(server_device.random_address)
print(f'=== Client: connected to {connection}')
# Discover all services
print('=== Discovering services')
peer = Peer(connection)
await peer.discover_services()
for service in peer.services:
await service.discover_characteristics()
for characteristic in service.characteristics:
await characteristic.discover_descriptors()
print('=== Services discovered')
show_services(peer.services)
# Discover all attributes
print('=== Discovering attributes')
attributes = await peer.discover_attributes()
for attribute in attributes:
print(attribute)
print('=== Attributes discovered')
# Read all attributes
for attribute in attributes:
try:
value = await attribute.read_value()
print(color(f'0x{attribute.handle:04X} = {value.hex()}', 'green'))
except ProtocolError as error:
print(color(f'cannot read {attribute.handle:04X}:', 'red'), error)
await asyncio.get_running_loop().create_future()
# -----------------------------------------------------------------------------

View File

@@ -18,6 +18,7 @@
import asyncio
import logging
import os
import struct
import pytest
from bumble.controller import Controller
@@ -25,6 +26,12 @@ from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
from bumble.gatt import (
GATT_BATTERY_LEVEL_CHARACTERISTIC,
CharacteristicAdapter,
DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter,
MappedCharacteristicAdapter,
UTF8CharacteristicAdapter,
Service,
Characteristic,
CharacteristicValue
@@ -91,6 +98,96 @@ def test_ATT_Read_By_Group_Type_Request():
basic_check(pdu)
# -----------------------------------------------------------------------------
def test_CharacteristicAdapter():
# Check that the CharacteristicAdapter base class is transparent
v = bytes([1, 2, 3])
c = Characteristic(GATT_BATTERY_LEVEL_CHARACTERISTIC, Characteristic.READ, Characteristic.READABLE, v)
a = CharacteristicAdapter(c)
value = a.read_value(None)
assert(value == v)
v = bytes([3, 4, 5])
a.write_value(None, v)
assert(c.value == v)
# Simple delegated adapter
a = DelegatedCharacteristicAdapter(c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x)))
value = a.read_value(None)
assert(value == bytes(reversed(v)))
v = bytes([3, 4, 5])
a.write_value(None, v)
assert(a.value == bytes(reversed(v)))
# Packed adapter with single element format
v = 1234
pv = struct.pack('>H', v)
c.value = v
a = PackedCharacteristicAdapter(c, '>H')
value = a.read_value(None)
assert(value == pv)
c.value = None
a.write_value(None, pv)
assert(a.value == v)
# Packed adapter with multi-element format
v1 = 1234
v2 = 5678
pv = struct.pack('>HH', v1, v2)
c.value = (v1, v2)
a = PackedCharacteristicAdapter(c, '>HH')
value = a.read_value(None)
assert(value == pv)
c.value = None
a.write_value(None, pv)
assert(a.value == (v1, v2))
# Mapped adapter
v1 = 1234
v2 = 5678
pv = struct.pack('>HH', v1, v2)
mapped = {'v1': v1, 'v2': v2}
c.value = mapped
a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2'))
value = a.read_value(None)
assert(value == pv)
c.value = None
a.write_value(None, pv)
assert(a.value == mapped)
# UTF-8 adapter
v = 'Hello π'
ev = v.encode('utf-8')
c.value = v
a = UTF8CharacteristicAdapter(c)
value = a.read_value(None)
assert(value == ev)
c.value = None
a.write_value(None, ev)
assert(a.value == v)
# -----------------------------------------------------------------------------
def test_CharacteristicValue():
b = bytes([1, 2, 3])
c = CharacteristicValue(read=lambda _: b)
x = c.read(None)
assert(x == b)
result = []
c = CharacteristicValue(write=lambda connection, value: result.append((connection, value)))
z = object()
c.write(z, b)
assert(result == [(z, b)])
# -----------------------------------------------------------------------------
class TwoDevices:
def __init__(self):
@@ -199,6 +296,56 @@ async def test_read_write():
assert(characteristic2._last_value[1] == b)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write2():
[client, server] = TwoDevices().devices
v = bytes([0x11, 0x22, 0x33, 0x44])
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
value=v
)
service1 = Service(
'3A657F47-D34F-46B3-B1EC-698E29B6B829',
[
characteristic1
]
)
server.add_services([service1])
await client.power_on()
await server.power_on()
connection = await client.connect(server.random_address)
peer = Peer(connection)
await peer.discover_services()
c = peer.get_services_by_uuid(service1.uuid)
assert(len(c) == 1)
s = c[0]
await s.discover_characteristics()
c = s.get_characteristics_by_uuid(characteristic1.uuid)
assert(len(c) == 1)
c1 = c[0]
v1 = await c1.read_value()
assert(v1 == v)
a1 = PackedCharacteristicAdapter(c1, '>I')
v1 = await a1.read_value()
assert(v1 == struct.unpack('>I', v)[0])
b = bytes([0x55, 0x66, 0x77, 0x88])
await a1.write_value(struct.unpack('>I', b)[0])
await async_barrier()
assert(characteristic1.value == b)
v1 = await a1.read_value()
assert(v1 == struct.unpack('>I', b)[0])
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_subscribe_notify():
@@ -330,6 +477,7 @@ async def test_subscribe_notify():
# -----------------------------------------------------------------------------
async def async_main():
await test_read_write()
await test_read_write2()
await test_subscribe_notify()
# -----------------------------------------------------------------------------
@@ -338,4 +486,6 @@ if __name__ == '__main__':
test_UUID()
test_ATT_Error_Response()
test_ATT_Read_By_Group_Type_Request()
test_CharacteristicValue()
test_CharacteristicAdapter()
asyncio.run(async_main())

View File

@@ -1,2 +1,3 @@
[pytest]
junit_logging = all
asyncio_mode = auto

View File

@@ -16,6 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import itertools
import logging
import os
import pytest
@@ -30,7 +31,8 @@ from bumble.smp import (
PairingConfig,
PairingDelegate,
SMP_PAIRING_NOT_SUPPORTED_ERROR,
SMP_CONFIRM_VALUE_FAILED_ERROR
SMP_CONFIRM_VALUE_FAILED_ERROR,
SMP_ID_KEY_DISTRIBUTION_FLAG,
)
from bumble.core import ProtocolError
@@ -196,11 +198,28 @@ async def _test_self_smp_with_configs(pairing_config1, pairing_config2):
# -----------------------------------------------------------------------------
IO_CAP = [
PairingDelegate.NO_OUTPUT_NO_INPUT,
PairingDelegate.KEYBOARD_INPUT_ONLY,
PairingDelegate.DISPLAY_OUTPUT_ONLY,
PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT
]
SC = [False, True]
MITM = [False, True]
# Key distribution is a 4-bit bitmask
# IdKey is necessary for current SMP structure
KEY_DIST = [i for i in range(16) if (i & SMP_ID_KEY_DISTRIBUTION_FLAG)]
@pytest.mark.asyncio
async def test_self_smp():
@pytest.mark.parametrize('io_cap, sc, mitm, key_dist',
itertools.product(IO_CAP, SC, MITM, KEY_DIST)
)
async def test_self_smp(io_cap, sc, mitm, key_dist):
class Delegate(PairingDelegate):
def __init__(self, name, io_capability):
super().__init__(io_capability)
def __init__(self, name, io_capability, local_initiator_key_distribution, local_responder_key_distribution):
super().__init__(io_capability, local_initiator_key_distribution,
local_responder_key_distribution)
self.name = name
self.reset()
@@ -208,11 +227,11 @@ async def test_self_smp():
self.peer_delegate = None
self.number = asyncio.get_running_loop().create_future()
async def compare_numbers(self, number):
async def compare_numbers(self, number, digits):
if self.peer_delegate is None:
logger.warn(f'[{self.name}] no peer delegate')
return False
await self.display_number(number)
await self.display_number(number, digits=6)
logger.debug(f'[{self.name}] waiting for peer number')
peer_number = await self.peer_delegate.number
logger.debug(f'[{self.name}] comparing numbers: {number} and {peer_number}')
@@ -231,7 +250,7 @@ async def test_self_smp():
logger.debug(f'[{self.name}] returning number: {peer_number}')
return peer_number
async def display_number(self, number):
async def display_number(self, number, digits):
logger.debug(f'[{self.name}] displaying number: {number}')
self.number.set_result(number)
@@ -240,17 +259,8 @@ async def test_self_smp():
pairing_config_sets = [('Initiator', [None]), ('Responder', [None])]
for pairing_config_set in pairing_config_sets:
for io_capability in [
PairingDelegate.NO_OUTPUT_NO_INPUT,
PairingDelegate.KEYBOARD_INPUT_ONLY,
PairingDelegate.DISPLAY_OUTPUT_ONLY,
PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT
]:
for sc in [False, True]:
for mitm in [False, True]:
delegate = Delegate(pairing_config_set[0], io_capability)
pairing_config_set[1].append(PairingConfig(sc, mitm, True, delegate))
delegate = Delegate(pairing_config_set[0], io_cap, key_dist, key_dist)
pairing_config_set[1].append(PairingConfig(sc, mitm, True, delegate))
for pairing_config1 in pairing_config_sets[0][1]:
for pairing_config2 in pairing_config_sets[1][1]:
@@ -262,7 +272,9 @@ async def test_self_smp():
if pairing_config1 and pairing_config2:
pairing_config1.delegate.peer_delegate = pairing_config2.delegate
pairing_config2.delegate.peer_delegate = pairing_config1.delegate
await _test_self_smp_with_configs(pairing_config1, pairing_config2)
# -----------------------------------------------------------------------------
@@ -293,7 +305,7 @@ async def test_self_smp_wrong_pin():
def __init__(self):
super().__init__(PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT)
async def compare_numbers(self, number):
async def compare_numbers(self, number, digits):
return False
wrong_pin_pairing_config = PairingConfig(delegate = WrongPinDelegate())

View File

@@ -176,6 +176,20 @@ def test_g2():
assert(value == 0x2f9ed5ba)
# -----------------------------------------------------------------------------
def test_h6():
KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b')
KEY_ID = bytes.fromhex('6c656272')
assert(h6(KEY, KEY_ID) == bytes.fromhex('2d9ae102 e76dc91c e8d3a9e2 80b16399'))
# -----------------------------------------------------------------------------
def test_h7():
KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b')
SALT = bytes.fromhex('00000000 00000000 00000000 746D7031')
assert(h7(SALT, KEY) == bytes.fromhex('fb173597 c6a3c0ec d2998c2a 75a57011'))
# -----------------------------------------------------------------------------
def test_ah():
irk = bytes(reversed(bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b')))
@@ -195,4 +209,6 @@ if __name__ == '__main__':
test_f5()
test_f6()
test_g2()
test_h6()
test_h7()
test_ah()