Compare commits

...

33 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
3040df3179 add support for the heart rate service 2022-07-23 09:38:44 -07:00
Gilles Boccon-Gibod
c66b357de6 Merge pull request #13 from google/gbg/standard-profiles
support for type adapters and framework for standard GATT profiles
2022-07-22 10:21:39 -07:00
Gilles Boccon-Gibod
e156ed3758 add in-context uuids and service proxy factories 2022-07-20 19:56:40 -07:00
Gilles Boccon-Gibod
0ffed3deff Merge pull request #11 from zxzxwu/main
Implement CTKD over LE and key distribution delegation
2022-07-20 15:35:26 -07:00
Josh Wu
2f949a1182 Delegate SMP key distribution
* Delegate SMP key distribution
* Align LE pairing key expectation
* Parametrize SMP self test, and add key distribution coverage
2022-07-21 01:19:36 +08:00
Josh Wu
4e2fae5145 Implement CTKD over LE 2022-07-21 01:19:25 +08:00
Gilles Boccon-Gibod
2b58364c51 Merge pull request #14 from zxzxwu/conn-lookup
Refactor find_connection_by_bd_addr
2022-07-20 08:26:04 -07:00
Josh Wu
e3bf7c4b53 Refactor find_connection_by_bd_addr
* Compare only address bytes because Address.__eq__ also compares types.
* Add a transport field to find connection to a device on specific
  transport. (It's possible to connect a device on both BR/EDR and LE)
2022-07-20 21:32:20 +08:00
Gilles Boccon-Gibod
bd28892734 add support for type adapters and framework for adding standard GATT profiles 2022-07-19 19:42:21 -07:00
Gilles Boccon-Gibod
b64fa65921 Merge pull request #10 from zxzxwu/main
Make pairing and link mode configurable
2022-07-18 12:48:07 -07:00
Josh Wu
7d87c3cc3a Make pairing and link mode configurable 2022-07-18 14:28:21 +08:00
Gilles Boccon-Gibod
94fc81c183 Merge pull request #7 from DeltaEvo/david/config-load
Make DeviceConfiguration loadable from a dict
2022-06-25 05:08:44 -07:00
Gilles Boccon-Gibod
b65b395fc4 Merge pull request #8 from Arrowbox/threadsafe_usb_close
Use threadsafe call when setting event_loop_done
2022-06-25 05:07:42 -07:00
David Duarte
0f157d55f7 Make DeviceConfiguration loadable from a dict 2022-06-24 09:57:16 +00:00
Jayson Messenger
925d79491f Use threadsafe call when setting event_loop_done
Previously, the close method would hang waiting on the future to be
done.
2022-06-23 15:19:05 -04:00
Gilles Boccon-Gibod
3d14df909c Merge pull request #5 from google/gbg/disconnection-event-routing
fix the routing of disconnection events
2022-06-15 10:50:14 -07:00
Gilles Boccon-Gibod
153788afe3 fix the routing of disconnection events 2022-06-14 14:38:40 -07:00
Gilles Boccon-Gibod
99ca31c063 Merge pull request #4 from google/gbg/classic-pairing-io
classic pairing io
2022-06-14 11:32:37 -07:00
Gilles Boccon-Gibod
9629e677f2 improve readability as per PR suggestion 2022-06-14 10:57:54 -07:00
Gilles Boccon-Gibod
250c1e3395 address PR comments 2022-06-13 16:44:57 -07:00
Gilles Boccon-Gibod
70dca1d7c9 cosmetic fix 2022-06-11 15:50:53 -07:00
Gilles Boccon-Gibod
a5015c1305 add pytest async options 2022-06-11 12:28:00 -07:00
Gilles Boccon-Gibod
6e22df4838 add doc structure 2022-06-11 12:19:54 -07:00
Gilles Boccon-Gibod
b4e2f21d2a add classic pairing io delegation 2022-06-11 01:33:51 -07:00
Gilles Boccon-Gibod
1af61e8af3 Update getting_started.md 2022-06-04 23:03:31 -07:00
Gilles Boccon-Gibod
e11119c565 Update README.md 2022-06-04 22:57:14 -07:00
Gilles Boccon-Gibod
b1a31564ef Merge pull request #3 from google/gbg/usb-serial-number
gbg/usb serial number
2022-06-04 20:47:18 -07:00
Gilles Boccon-Gibod
01492d510c close device inside for loop 2022-06-03 19:53:26 -07:00
Gilles Boccon-Gibod
302c495178 fix mkdocstrings python dependency 2022-06-02 15:56:01 -07:00
Gilles Boccon-Gibod
fc7923f83b add missing paths config for mkdocstrings 2022-06-02 15:47:27 -07:00
Gilles Boccon-Gibod
a9bd77e6ee add build workflow 2022-06-02 15:24:45 -07:00
Gilles Boccon-Gibod
ce0cf5fd27 refactor doc 2022-06-02 15:07:50 -07:00
Gilles Boccon-Gibod
86ded3fece support selecting usb device by serial number 2022-05-29 16:30:05 -07:00
45 changed files with 2058 additions and 517 deletions

View File

@@ -24,11 +24,11 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install build
pip install ".[test]"
- name: Build package
run: |
python -m build
python -m pip install ".[test,development,documentation]"
- name: Test with pytest
run: |
pytest
- name: Build
run: |
inv build
inv mkdocs

1
.gitignore vendored
View File

@@ -8,3 +8,4 @@ docs/mkdocs/site
tests/__pycache__
test-results.xml
bumble/transport/__pycache__
bumble/profiles/__pycache__

View File

@@ -11,6 +11,8 @@ Bluetooth Stack for Apps, Emulation, Test and Experimentation
<img src="docs/mkdocs/src/images/logo_framed.png" alt="drawing" width="200" height="200"/>
Bumble is a full-featured Bluetooth stack written entirely in Python. It supports most of the common Bluetooth Low Energy (BLE) and Bluetooth Classic (BR/EDR) protocols and profiles, including GAP, L2CAP, ATT, GATT, SMP, SDP, RFCOMM, HFP, HID and A2DP. The stack can be used with physical radios via HCI over USB, UART, or the Linux VHCI, as well as virtual radios, including the virtual Bluetooth support of the Android emulator.
## Documentation
Browse the pre-built [Online Documentation](https://google.github.io/bumble/),

View File

@@ -32,10 +32,10 @@ async def dump_gatt_db(peer, done):
# Discover all services
print(color('### Discovering Services and Characteristics', 'magenta'))
await peer.discover_services()
await peer.discover_characteristics()
for service in peer.services:
await service.discover_characteristics()
for characteristic in service.characteristics:
await peer.discover_descriptors(characteristic)
await characteristic.discover_descriptors()
print(color('=== Services ===', 'yellow'))
show_services(peer.services)
@@ -47,7 +47,7 @@ async def dump_gatt_db(peer, done):
for attribute in attributes:
print(attribute)
try:
value = await peer.read_value(attribute)
value = await attribute.read_value()
print(color(f'{value.hex()}', 'green'))
except ProtocolError as error:
print(color(error, 'red'))

View File

@@ -73,7 +73,7 @@ class GattlinkHubBridge(Device.Listener):
gattlink_service = services[0]
# Discover all the characteristics for the service
characteristics = await self.peer.discover_characteristics(service = gattlink_service)
characteristics = await gattlink_service.discover_characteristics()
print(color('=== Characteristics discovered', 'yellow'))
for characteristic in characteristics:
if characteristic.uuid == GG_GATTLINK_RX_CHARACTERISTIC_UUID:

View File

@@ -44,7 +44,7 @@ from bumble.att import (
# -----------------------------------------------------------------------------
class Delegate(PairingDelegate):
def __init__(self, connection, capability_string, prompt):
def __init__(self, mode, connection, capability_string, prompt):
super().__init__({
'keyboard': PairingDelegate.KEYBOARD_INPUT_ONLY,
'display': PairingDelegate.DISPLAY_OUTPUT_ONLY,
@@ -53,6 +53,7 @@ class Delegate(PairingDelegate):
'none': PairingDelegate.NO_OUTPUT_NO_INPUT
}[capability_string.lower()])
self.mode = mode
self.peer = Peer(connection)
self.peer_name = None
self.prompt = prompt
@@ -64,7 +65,7 @@ class Delegate(PairingDelegate):
# Try to get the peer's name
if self.peer:
peer_name = await get_peer_name(self.peer)
peer_name = await get_peer_name(self.peer, self.mode)
self.peer_name = f'{peer_name or ""} [{self.peer.connection.peer_address}]'
else:
self.peer_name = '[?]'
@@ -91,7 +92,7 @@ class Delegate(PairingDelegate):
# Accept silently
return True
async def compare_numbers(self, number):
async def compare_numbers(self, number, digits):
await self.update_peer_name()
# Wait a bit to allow some of the log lines to print before we prompt
@@ -102,7 +103,7 @@ class Delegate(PairingDelegate):
print(color(f'### Pairing with {self.peer_name}', 'yellow'))
print(color('###-----------------------------------', 'yellow'))
while True:
response = await aioconsole.ainput(color(f'>>> Does the other device display {number:06}? ', 'yellow'))
response = await aioconsole.ainput(color(f'>>> Does the other device display {number:0{digits}}? ', 'yellow'))
response = response.lower().strip()
if response == 'yes':
return True
@@ -125,7 +126,7 @@ class Delegate(PairingDelegate):
except ValueError:
pass
async def display_number(self, number):
async def display_number(self, number, digits):
await self.update_peer_name()
# Wait a bit to allow some of the log lines to print before we prompt
@@ -134,19 +135,23 @@ class Delegate(PairingDelegate):
# Display a PIN code
print(color('###-----------------------------------', 'yellow'))
print(color(f'### Pairing with {self.peer_name}', 'yellow'))
print(color(f'### PIN: {number:06}', 'yellow'))
print(color(f'### PIN: {number:0{digits}}', 'yellow'))
print(color('###-----------------------------------', 'yellow'))
# -----------------------------------------------------------------------------
async def get_peer_name(peer):
services = await peer.discover_service(GATT_GENERIC_ACCESS_SERVICE)
if not services:
return None
async def get_peer_name(peer, mode):
if mode == 'classic':
return await peer.request_name()
else:
# Try to get the peer name from GATT
services = await peer.discover_service(GATT_GENERIC_ACCESS_SERVICE)
if not services:
return None
values = await peer.read_characteristics_by_uuid(GATT_DEVICE_NAME_CHARACTERISTIC, services[0])
if values:
return values[0].decode('utf-8')
values = await peer.read_characteristics_by_uuid(GATT_DEVICE_NAME_CHARACTERISTIC, services[0])
if values:
return values[0].decode('utf-8')
# -----------------------------------------------------------------------------
@@ -224,9 +229,22 @@ def on_pairing_failure(reason):
# -----------------------------------------------------------------------------
async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, transport, address_or_name):
async def pair(
mode,
sc,
mitm,
bond,
io,
prompt,
request,
print_keys,
keystore_file,
device_config,
hci_transport,
address_or_name
):
print('<<< connecting to HCI...')
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
print('<<< connected')
# Create a device to manage the host
@@ -245,19 +263,25 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
# Expose a GATT characteristic that can be used to trigger pairing by
# responding with an authentication error when read
device.add_service(
Service(
'50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
[
Characteristic(
'552957FB-CF1F-4A31-9535-E78847E1A714',
Characteristic.READ | Characteristic.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(read=read_with_error, write=write_with_error)
)
]
if mode == 'le':
device.add_service(
Service(
'50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
[
Characteristic(
'552957FB-CF1F-4A31-9535-E78847E1A714',
Characteristic.READ | Characteristic.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(read=read_with_error, write=write_with_error)
)
]
)
)
)
# Select LE or Classic
if mode == 'classic':
device.classic_enabled = True
device.le_enabled = False
# Get things going
await device.power_on()
@@ -267,7 +291,7 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
sc,
mitm,
bond,
Delegate(connection, io, prompt)
Delegate(mode, connection, io, prompt)
)
# Connect to a peer or wait for a connection
@@ -278,10 +302,14 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
if not request:
try:
await connection.pair()
if mode == 'le':
await connection.pair()
else:
await connection.authenticate()
return
except ProtocolError as error:
print(color(f'Pairing failed: {error}', 'red'))
return
except ProtocolError:
pass
else:
# Advertise so that peers can find us and connect
await device.start_advertising(auto_restart=True)
@@ -291,6 +319,7 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
# -----------------------------------------------------------------------------
@click.command()
@click.option('--mode', type=click.Choice(['le', 'classic']), default='le', show_default=True)
@click.option('--sc', type=bool, default=True, help='Use the Secure Connections protocol', show_default=True)
@click.option('--mitm', type=bool, default=True, help='Request MITM protection', show_default=True)
@click.option('--bond', type=bool, default=True, help='Enable bonding', show_default=True)
@@ -300,11 +329,11 @@ async def pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, d
@click.option('--print-keys', is_flag=True, help='Print the bond keys before pairing')
@click.option('--keystore-file', help='File in which to store the pairing keys')
@click.argument('device-config')
@click.argument('transport')
@click.argument('hci_transport')
@click.argument('address-or-name', required=False)
def main(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, transport, address_or_name):
def main(mode, sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, hci_transport, address_or_name):
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(pair(sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, transport, address_or_name))
asyncio.run(pair(mode, sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, hci_transport, address_or_name))
# -----------------------------------------------------------------------------

View File

@@ -20,7 +20,7 @@ import click
from colors import color
from bumble import hci
from bumble.transport import PacketReader
from bumble.transport.common import PacketReader
from bumble.helpers import PacketTracer

View File

@@ -682,11 +682,14 @@ class Attribute(EventEmitter):
def __init__(self, attribute_type, permissions, value = b''):
EventEmitter.__init__(self)
self.handle = 0
self.permissions = permissions
self.handle = 0
self.end_group_handle = 0
self.permissions = permissions
# Convert the type to a UUID
if type(attribute_type) is bytes:
# Convert the type to a UUID object if it isn't already
if type(attribute_type) is str:
self.type = UUID(attribute_type)
elif type(attribute_type) is bytes:
self.type = UUID.from_bytes(attribute_type)
else:
self.type = attribute_type
@@ -698,16 +701,13 @@ class Attribute(EventEmitter):
self.value = value
def read_value(self, connection):
if type(self.value) is bytes:
return self.value
if read := getattr(self.value, 'read', None):
try:
return read(connection)
except ATT_Error as error:
raise ATT_Error(error_code=error.error_code, att_handle=self.handle)
else:
if read := getattr(self.value, 'read', None):
try:
return read(connection)
except ATT_Error as error:
raise ATT_Error(error_code=error.error_code, att_handle=self.handle)
else:
return bytes(self.value)
return self.value
def write_value(self, connection, value):
if write := getattr(self.value, 'write', None):

View File

@@ -227,3 +227,17 @@ def g2(u, v, x, y):
aes_cmac(bytes(reversed(u)) + bytes(reversed(v)) + bytes(reversed(y)), bytes(reversed(x)))[-4:],
byteorder='big'
)
# -----------------------------------------------------------------------------
def h6(w, key_id):
'''
See Bluetooth spec, Vol 3, Part H - 2.2.10 Link key conversion function h6
'''
return aes_cmac(key_id, w)
# -----------------------------------------------------------------------------
def h7(salt, w):
'''
See Bluetooth spec, Vol 3, Part H - 2.2.11 Link key conversion function h7
'''
return aes_cmac(w, salt)

View File

@@ -137,6 +137,21 @@ class Peer:
def get_characteristics_by_uuid(self, uuid, service = None):
return self.gatt_client.get_characteristics_by_uuid(uuid, service)
def create_service_proxy(self, proxy_class):
return proxy_class.from_client(self.gatt_client)
async def discover_service_and_create_proxy(self, proxy_class):
# Discover the first matching service and its characteristics
services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID)
if services:
service = services[0]
await service.discover_characteristics()
return self.create_service_proxy(proxy_class)
# [Classic only]
async def request_name(self):
return await self.connection.request_remote_name()
def __str__(self):
return f'{self.connection.peer_address} as {self.connection.role_name}'
@@ -176,6 +191,7 @@ class Connection(CompositeEventEmitter):
self.transport = transport
self.peer_address = peer_address
self.peer_resolvable_address = peer_resolvable_address
self.peer_name = None # Classic only
self.role = role
self.parameters = parameters
self.encryption = 0
@@ -231,6 +247,10 @@ class Connection(CompositeEventEmitter):
supervision_timeout
)
# [Classic only]
async def request_remote_name(self):
return await self.device.request_remote_name(self)
def __str__(self):
return f'Connection(handle=0x{self.handle:04X}, role={self.role_name}, address={self.peer_address})'
@@ -245,39 +265,48 @@ class DeviceConfiguration:
self.scan_response_data = DEVICE_DEFAULT_SCAN_RESPONSE_DATA
self.advertising_interval_min = DEVICE_DEFAULT_ADVERTISING_INTERVAL
self.advertising_interval_max = DEVICE_DEFAULT_ADVERTISING_INTERVAL
self.le_enabled = True
# LE host enable 2nd parameter
self.le_simultaneous_enabled = True
self.classic_sc_enabled = True
self.classic_ssp_enabled = True
self.advertising_data = bytes(
AdvertisingData([(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))])
)
self.irk = bytes(16) # This really must be changed for any level of security
self.keystore = None
def load_from_dict(self, config):
# Load simple properties
self.name = config.get('name', self.name)
self.address = Address(config.get('address', self.address))
self.class_of_device = config.get('class_of_device', self.class_of_device)
self.advertising_interval_min = config.get('advertising_interval', self.advertising_interval_min)
self.advertising_interval_max = self.advertising_interval_min
self.keystore = config.get('keystore')
self.le_enabled = config.get('le_enabled', self.le_enabled)
self.le_simultaneous_enabled = config.get('le_simultaneous_enabled', self.le_simultaneous_enabled)
self.classic_sc_enabled = config.get('classic_sc_enabled', self.classic_sc_enabled)
self.classic_ssp_enabled = config.get('classic_ssp_enabled', self.classic_ssp_enabled)
# Load or synthesize an IRK
irk = config.get('irk')
if irk:
self.irk = bytes.fromhex(irk)
else:
# Construct an IRK from the address bytes
# NOTE: this is not secure, but will always give the same IRK for the same address
address_bytes = bytes(self.address)
self.irk = (address_bytes * 3)[:16]
# Load advertising data
advertising_data = config.get('advertising_data')
if advertising_data:
self.advertising_data = bytes.fromhex(advertising_data)
def load_from_file(self, filename):
with open(filename, 'r') as file:
config = json.load(file)
# Load simple properties
self.name = config.get('name', self.name)
self.address = Address(config.get('address', self.address))
self.class_of_device = config.get('class_of_device', self.class_of_device)
self.advertising_interval_min = config.get('advertising_interval', self.advertising_interval_min)
self.advertising_interval_max = self.advertising_interval_min
self.keystore = config.get('keystore')
# Load or synthesize an IRK
irk = config.get('irk')
if irk:
self.irk = bytes.fromhex(irk)
else:
# Construct an IRK from the address bytes
# NOTE: this is not secure, but will always give the same IRK for the same address
address_bytes = bytes(self.address)
self.irk = (address_bytes * 3)[:16]
# Load advertising data
advertising_data = config.get('advertising_data')
if advertising_data:
self.advertising_data = bytes.fromhex(advertising_data)
self.load_from_dict(json.load(file))
# -----------------------------------------------------------------------------
# Decorators used with the following Device class
@@ -290,9 +319,19 @@ def with_connection_from_handle(function):
@functools.wraps(function)
def wrapper(self, connection_handle, *args, **kwargs):
if (connection := self.lookup_connection(connection_handle)) is None:
logger.warn(f'no connection found for handle 0x{connection_handle:04X}')
return
function(self, connection, *args, **kwargs)
raise ValueError('no connection for handle')
return function(self, connection, *args, **kwargs)
return wrapper
# Decorator that converts the first argument from a bluetooth address to a connection
def with_connection_from_address(function):
@functools.wraps(function)
def wrapper(self, address, *args, **kwargs):
for connection in self.connections.values():
if connection.peer_address == address:
return function(self, connection, *args, **kwargs)
raise ValueError('no connection for address')
return wrapper
@@ -368,7 +407,6 @@ class Device(CompositeEventEmitter):
self.connecting = False
self.disconnecting = False
self.connections = {} # Connections, by connection handle
self.le_enabled = True
self.classic_enabled = False
self.discoverable = False
self.connectable = False
@@ -388,6 +426,10 @@ class Device(CompositeEventEmitter):
self.advertising_interval_max = config.advertising_interval_max
self.keystore = keys.KeyStore.create_for_device(config)
self.irk = config.irk
self.le_enabled = config.le_enabled
self.le_simultaneous_enabled = config.le_simultaneous_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled
self.classic_sc_enabled = config.classic_sc_enabled
# If a name is passed, override the name from the config
if name:
@@ -453,6 +495,12 @@ class Device(CompositeEventEmitter):
if connection := self.connections.get(connection_handle):
return connection
def find_connection_by_bd_addr(self, bd_addr, transport=None):
for connection in self.connections.values():
if connection.peer_address.get_bytes() == bd_addr.get_bytes():
if transport is None or connection.transport == transport:
return connection
def register_l2cap_server(self, psm, server):
self.l2cap_channel_manager.register_server(psm, server)
@@ -480,6 +528,11 @@ class Device(CompositeEventEmitter):
logger.debug(color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow'))
self.public_address = response.return_parameters.bd_addr
await self.send_command(HCI_Write_LE_Host_Support_Command(
le_supported_host = int(self.le_enabled),
simultaneous_le_host = int(self.le_simultaneous_enabled),
))
if self.le_enabled:
# Set the controller address
await self.send_command(HCI_LE_Set_Random_Address_Command(
@@ -517,7 +570,12 @@ class Device(CompositeEventEmitter):
HCI_Write_Class_Of_Device_Command(class_of_device = self.class_of_device)
)
await self.send_command(
HCI_Write_Simple_Pairing_Mode_Command(simple_pairing_mode = 0x01)
HCI_Write_Simple_Pairing_Mode_Command(
simple_pairing_mode=int(self.classic_ssp_enabled))
)
await self.send_command(
HCI_Write_Secure_Connections_Host_Support_Command(
secure_connections_host_support=int(self.classic_sc_enabled))
)
# Let the SMP manager know about the address
@@ -792,8 +850,8 @@ class Device(CompositeEventEmitter):
async def disconnect(self, connection, reason):
# Create a future so that we can wait for the disconnection's result
pending_disconnection = asyncio.get_running_loop().create_future()
self.on('disconnection', pending_disconnection.set_result)
self.on('disconnection_failure', pending_disconnection.set_exception)
connection.on('disconnection', pending_disconnection.set_result)
connection.on('disconnection_failure', pending_disconnection.set_exception)
# Request a disconnection
result = await self.send_command(HCI_Disconnect_Command(connection_handle = connection.handle, reason = reason))
@@ -806,8 +864,8 @@ class Device(CompositeEventEmitter):
self.disconnecting = True
return await pending_disconnection
finally:
self.remove_listener('disconnection', pending_disconnection.set_result)
self.remove_listener('disconnection_failure', pending_disconnection.set_exception)
connection.remove_listener('disconnection', pending_disconnection.set_result)
connection.remove_listener('disconnection_failure', pending_disconnection.set_exception)
self.disconnecting = False
async def update_connection_parameters(
@@ -936,13 +994,13 @@ class Device(CompositeEventEmitter):
# Set up event handlers
pending_authentication = asyncio.get_running_loop().create_future()
def on_authentication_complete():
def on_authentication():
pending_authentication.set_result(None)
def on_authentication_failure(error):
pending_authentication.set_exception(error)
def on_authentication_failure(error_code):
pending_authentication.set_exception(HCI_Error(error_code))
connection.on('connection_authentication_complete', on_authentication_complete)
connection.on('connection_authentication', on_authentication)
connection.on('connection_authentication_failure', on_authentication_failure)
# Request the authentication
@@ -957,7 +1015,7 @@ class Device(CompositeEventEmitter):
# Wait for the authentication to complete
await pending_authentication
finally:
connection.remove_listener('connection_authentication_complete', on_authentication_complete)
connection.remove_listener('connection_authentication', on_authentication)
connection.remove_listener('connection_authentication_failure', on_authentication_failure)
async def encrypt(self, connection):
@@ -1028,6 +1086,40 @@ class Device(CompositeEventEmitter):
connection.remove_listener('connection_encryption_change', on_encryption_change)
connection.remove_listener('connection_encryption_failure', on_encryption_failure)
# [Classic only]
async def request_remote_name(self, connection):
# Set up event handlers
pending_name = asyncio.get_running_loop().create_future()
def on_remote_name():
pending_name.set_result(connection.peer_name)
def on_remote_name_failure(error_code):
pending_name.set_exception(HCI_Error(error_code))
connection.on('remote_name', on_remote_name)
connection.on('remote_name_failure', on_remote_name_failure)
try:
result = await self.send_command(
HCI_Remote_Name_Request_Command(
bd_addr = connection.peer_address,
page_scan_repetition_mode = HCI_Remote_Name_Request_Command.R0, # TODO investigate other options
reserved = 0,
clock_offset = 0 # TODO investigate non-0 values
)
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warn(f'HCI_Set_Connection_Encryption_Command failed: {HCI_Constant.error_name(result.status)}')
raise HCI_Error(result.status)
# Wait for the result
return await pending_name
finally:
connection.remove_listener('remote_name', on_remote_name)
connection.remove_listener('remote_name_failure', on_remote_name_failure)
# [Classic only]
@host_event_handler
def on_link_key(self, bd_addr, link_key, key_type):
@@ -1123,14 +1215,15 @@ class Device(CompositeEventEmitter):
asyncio.create_task(self.start_advertising(auto_restart=self.auto_restart_advertising))
@host_event_handler
def on_disconnection_failure(self, error_code):
@with_connection_from_handle
def on_disconnection_failure(self, connection, error_code):
logger.debug(f'*** Disconnection failed: {error_code}')
error = ConnectionError(
error_code,
'hci',
HCI_Constant.error_name(error_code)
)
self.emit('disconnection_failure', error)
connection.emit('disconnection_failure', error)
@host_event_handler
@AsyncRunner.run_in_task()
@@ -1141,10 +1234,10 @@ class Device(CompositeEventEmitter):
@host_event_handler
@with_connection_from_handle
def on_connection_authentication_complete(self, connection):
logger.debug(f'*** Connection Authentication Complete: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}')
def on_connection_authentication(self, connection):
logger.debug(f'*** Connection Authentication: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}')
connection.authenticated = True
connection.emit('connection_authentication_complete')
connection.emit('connection_authentication')
@host_event_handler
@with_connection_from_handle
@@ -1152,6 +1245,132 @@ class Device(CompositeEventEmitter):
logger.debug(f'*** Connection Authentication Failure: [0x{connection.handle:04X}] {connection.peer_address} as {connection.role_name}, error={error}')
connection.emit('connection_authentication_failure', error)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_io_capability_request(self, connection):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
# Map the SMP IO capability to a Classic IO capability
io_capability = {
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
smp.SMP_DISPLAY_YES_NO_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY
}.get(pairing_config.delegate.io_capability)
if io_capability is None:
logger.warning(f'cannot map IO capability ({pairing_config.delegate.io_capability}')
io_capability = HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
# Compute the authentication requirements
authentication_requirements = (
# No Bonding
(
HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS
),
# General Bonding
(
HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS
)
)[1 if pairing_config.bonding else 0][1 if pairing_config.mitm else 0]
# Respond
self.host.send_command_sync(
HCI_IO_Capability_Request_Reply_Command(
bd_addr = connection.peer_address,
io_capability = io_capability,
oob_data_present = 0x00, # Not present
authentication_requirements = authentication_requirements
)
)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_user_confirmation_request(self, connection, code):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
can_confirm = pairing_config.delegate.io_capability not in {
smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
smp.SMP_DISPLAY_ONLY_IO_CAPABILITY
}
# Respond
if can_confirm and pairing_config.delegate:
async def compare_numbers():
numbers_match = await pairing_config.delegate.compare_numbers(code, digits=6)
if numbers_match:
self.host.send_command_sync(
HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address)
)
else:
self.host.send_command_sync(
HCI_User_Confirmation_Request_Negative_Reply_Command(bd_addr=connection.peer_address)
)
asyncio.create_task(compare_numbers())
else:
self.host.send_command_sync(
HCI_User_Confirmation_Request_Reply_Command(bd_addr=connection.peer_address)
)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_user_passkey_request(self, connection):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
can_input = pairing_config.delegate.io_capability in {
smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY,
smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
}
# Respond
if can_input and pairing_config.delegate:
async def get_number():
number = await pairing_config.delegate.get_number()
if number is not None:
self.host.send_command_sync(
HCI_User_Passkey_Request_Reply_Command(
bd_addr = connection.peer_address,
numeric_value = number)
)
else:
self.host.send_command_sync(
HCI_User_Passkey_Request_Negative_Reply_Command(bd_addr=connection.peer_address)
)
asyncio.create_task(get_number())
else:
self.host.send_command_sync(
HCI_User_Passkey_Request_Negative_Reply_Command(bd_addr=connection.peer_address)
)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_remote_name(self, connection, remote_name):
# Try to decode the name
try:
connection.peer_name = remote_name.decode('utf-8')
connection.emit('remote_name')
except UnicodeDecodeError as error:
logger.warning('peer name is not valid UTF-8')
connection.emit('remote_name_failure', error)
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_remote_name_failure(self, connection, error):
connection.emit('remote_name_failure', error)
@host_event_handler
@with_connection_from_handle
def on_connection_encryption_change(self, connection, encryption):

View File

@@ -22,6 +22,8 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import types
import logging
from colors import color
@@ -53,13 +55,13 @@ GATT_NEXT_DST_CHANGE_SERVICE = UUID.from_16_bits(0x1807, 'Next DS
GATT_GLUCOSE_SERVICE = UUID.from_16_bits(0x1808, 'Glucose')
GATT_HEALTH_THERMOMETER_SERVICE = UUID.from_16_bits(0x1809, 'Health Thermometer')
GATT_DEVICE_INFORMATION_SERVICE = UUID.from_16_bits(0x180A, 'Device Information')
GATT_DEVICE_HEART_RATE_SERVICE = UUID.from_16_bits(0x180D, 'Heart Rate')
GATT_PHONE_ALTERT_STATUS_SERVICE = UUID.from_16_bits(0x180E, 'Phone Alert Status')
GATT_DEVICE_BATTERY_SERVICE = UUID.from_16_bits(0x180F, 'Battery')
GATT_HEART_RATE_SERVICE = UUID.from_16_bits(0x180D, 'Heart Rate')
GATT_PHONE_ALERT_STATUS_SERVICE = UUID.from_16_bits(0x180E, 'Phone Alert Status')
GATT_BATTERY_SERVICE = UUID.from_16_bits(0x180F, 'Battery')
GATT_BLOOD_PRESSURE_SERVICE = UUID.from_16_bits(0x1810, 'Blood Pressure')
GATT_ALTERT_NOTIFICATION_SERVICE = UUID.from_16_bits(0x1811, 'Alert Notification')
GATT_DEVICE_HUMAN_INTERFACE_DEVICE_SERVICE = UUID.from_16_bits(0x1812, 'Human Interface Device')
GATT_DEVICE_SCAN_PARAMETERS_SERVICE = UUID.from_16_bits(0x1813, 'Scan Parameters')
GATT_ALERT_NOTIFICATION_SERVICE = UUID.from_16_bits(0x1811, 'Alert Notification')
GATT_HUMAN_INTERFACE_DEVICE_SERVICE = UUID.from_16_bits(0x1812, 'Human Interface Device')
GATT_SCAN_PARAMETERS_SERVICE = UUID.from_16_bits(0x1813, 'Scan Parameters')
GATT_RUNNING_SPEED_AND_CADENCE_SERVICE = UUID.from_16_bits(0x1814, 'Running Speed and Cadence')
GATT_AUTOMATION_IO_SERVICE = UUID.from_16_bits(0x1815, 'Automation IO')
GATT_CYCLING_SPEED_AND_CADENCE_SERVICE = UUID.from_16_bits(0x1816, 'Cycling Speed and Cadence')
@@ -119,7 +121,7 @@ GATT_ENVIRONMENTAL_SENSING_CONFIGURATION_DESCRIPTOR = UUID.from_16_bits(0x290B,
GATT_ENVIRONMENTAL_SENSING_MEASUREMENT_DESCRIPTOR = UUID.from_16_bits(0x290C, 'Environmental Sensing Measurement')
GATT_ENVIRONMENTAL_SENSING_TRIGGER_DESCRIPTOR = UUID.from_16_bits(0x290D, 'Environmental Sensing Trigger Setting')
GATT_TIME_TRIGGER_DESCRIPTOR = UUID.from_16_bits(0x290E, 'Time Trigger Setting')
GATT_COMPLETE_BE_EDR_TRANSPORT_BLOCK_DATA_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Complete BR-EDR Transport Block Data')
GATT_COMPLETE_BR_EDR_TRANSPORT_BLOCK_DATA_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Complete BR-EDR Transport Block Data')
# Device Information Service
GATT_SYSTEM_ID_CHARACTERISTIC = UUID.from_16_bits(0x2A23, 'System ID')
@@ -132,27 +134,34 @@ GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC = UUID.from_16_bits(0x2A2
GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC = UUID.from_16_bits(0x2A2A, 'IEEE 11073-20601 Regulatory Certification Data List')
GATT_PNP_ID_CHARACTERISTIC = UUID.from_16_bits(0x2A50, 'PnP ID')
# Human Interface Device
# Human Interface Device Service
GATT_HID_INFORMATION_CHARACTERISTIC = UUID.from_16_bits(0x2A4A, 'HID Information')
GATT_REPORT_MAP_CHARACTERISTIC = UUID.from_16_bits(0x2A4B, 'Report Map')
GATT_HID_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2A4C, 'HID Control Point')
GATT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A4D, 'Report')
GATT_PROTOCOL_MODE_CHARACTERISTIC = UUID.from_16_bits(0x2A4E, 'Protocol Mode')
# Heart Rate Service
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC = UUID.from_16_bits(0x2A37, 'Heart Rate Measurement')
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2A38, 'Body Sensor Location')
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2A39, 'Heart Rate Control Point')
# Battery Service
GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level')
# Misc
GATT_DEVICE_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2A00, 'Device Name')
GATT_APPEARANCE_CHARACTERISTIC = UUID.from_16_bits(0x2A01, 'Appearance')
GATT_PERIPHERAL_PRIVACY_FLAG_CHARACTERISTIC = UUID.from_16_bits(0x2A02, 'Peripheral Privacy Flag')
GATT_RECONNECTION_ADDRESS_CHARACTERISTIC = UUID.from_16_bits(0x2A03, 'Reconnection Address')
GATT_PERIPHERAL_PREFERRREED_CONNECTION_PARAMETERS_CHARACTERISTIC = UUID.from_16_bits(0x2A04, 'Peripheral Preferred Connection Parameters')
GATT_SERVICE_CHANGED_CHARACTERISTIC = UUID.from_16_bits(0x2A05, 'Service Changed')
GATT_ALERT_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A06, 'Alert Level')
GATT_TX_POWER_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A07, 'Tx Power Level')
GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level')
GATT_BOOT_KEYBOARD_INPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A22, 'Boot Keyboard Input Report')
GATT_CURRENT_TIME_CHARACTERISTIC = UUID.from_16_bits(0x2A2B, 'Current Time')
GATT_BOOT_KEYBOARD_OUTPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A32, 'Boot Keyboard Output Report')
GATT_CENTRAL_ADDRESS_RESOLUTION__CHARACTERISTIC = UUID.from_16_bits(0x2AA6, 'Central Address Resolution')
GATT_DEVICE_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2A00, 'Device Name')
GATT_APPEARANCE_CHARACTERISTIC = UUID.from_16_bits(0x2A01, 'Appearance')
GATT_PERIPHERAL_PRIVACY_FLAG_CHARACTERISTIC = UUID.from_16_bits(0x2A02, 'Peripheral Privacy Flag')
GATT_RECONNECTION_ADDRESS_CHARACTERISTIC = UUID.from_16_bits(0x2A03, 'Reconnection Address')
GATT_PERIPHERAL_PREFERRED_CONNECTION_PARAMETERS_CHARACTERISTIC = UUID.from_16_bits(0x2A04, 'Peripheral Preferred Connection Parameters')
GATT_SERVICE_CHANGED_CHARACTERISTIC = UUID.from_16_bits(0x2A05, 'Service Changed')
GATT_ALERT_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A06, 'Alert Level')
GATT_TX_POWER_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A07, 'Tx Power Level')
GATT_BOOT_KEYBOARD_INPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A22, 'Boot Keyboard Input Report')
GATT_CURRENT_TIME_CHARACTERISTIC = UUID.from_16_bits(0x2A2B, 'Current Time')
GATT_BOOT_KEYBOARD_OUTPUT_REPORT_CHARACTERISTIC = UUID.from_16_bits(0x2A32, 'Boot Keyboard Output Report')
GATT_CENTRAL_ADDRESS_RESOLUTION__CHARACTERISTIC = UUID.from_16_bits(0x2AA6, 'Central Address Resolution')
# -----------------------------------------------------------------------------
@@ -189,13 +198,24 @@ class Service(Attribute):
self.uuid = uuid
self.included_services = []
self.characteristics = characteristics[:]
self.end_group_handle = 0
self.primary = primary
def __str__(self):
return f'Service(handle=0x{self.handle:04X}, end=0x{self.end_group_handle:04X}, uuid={self.uuid}){"" if self.primary else "*"}'
# -----------------------------------------------------------------------------
class TemplateService(Service):
'''
Convenience abstract class that can be used by profile-specific subclasses that want
to expose their UUID as a class property
'''
UUID = None
def __init__(self, characteristics, primary=True):
super().__init__(self.UUID, characteristics, primary)
# -----------------------------------------------------------------------------
class Characteristic(Attribute):
'''
@@ -227,56 +247,34 @@ class Characteristic(Attribute):
def property_name(property):
return Characteristic.PROPERTY_NAMES.get(property, '')
@staticmethod
def properties_as_string(properties):
return ','.join([
Characteristic.property_name(p) for p in Characteristic.PROPERTY_NAMES.keys()
if properties & p
])
def __init__(self, uuid, properties, permissions, value = b'', descriptors = []):
# Convert the uuid to a UUID object if it isn't already
if type(uuid) is str:
uuid = UUID(uuid)
super().__init__(uuid, permissions, value)
self.uuid = uuid
self.properties = properties
self._descriptors = descriptors
self._descriptors_discovered = False
self.end_group_handle = 0
self.attach_descriptors()
def attach_descriptors(self):
""" Let all the descriptors know they are attached to this characteristic """
for descriptor in self._descriptors:
descriptor.characteristic = self
def add_descriptor(self, descriptor):
descriptor.characteristic = self
self.descriptors.append(descriptor)
self.uuid = self.type
self.properties = properties
self.descriptors = descriptors
def get_descriptor(self, descriptor_type):
for descriptor in self.descriptors:
if descriptor.uuid == descriptor_type:
return descriptor
@property
def descriptors(self):
return self._descriptors
@descriptors.setter
def descriptors(self, value):
self._descriptors = value
self._descriptors_discovered = True
self.attach_descriptors()
@property
def descriptors_discovered(self):
return self._descriptors_discovered
def get_properties_as_string(self):
return ','.join([self.property_name(p) for p in self.PROPERTY_NAMES.keys() if self.properties & p])
def __str__(self):
return f'Characteristic(handle=0x{self.handle:04X}, end=0x{self.end_group_handle:04X}, uuid={self.uuid}, properties={self.get_properties_as_string()})'
return f'Characteristic(handle=0x{self.handle:04X}, end=0x{self.end_group_handle:04X}, uuid={self.uuid}, properties={Characteristic.properties_as_string(self.properties)})'
# -----------------------------------------------------------------------------
class CharacteristicValue:
'''
Characteristic value where reading and/or writing is delegated to functions
passed as arguments to the constructor.
'''
def __init__(self, read=None, write=None):
self._read = read
self._write = write
@@ -289,20 +287,148 @@ class CharacteristicValue:
self._write(connection, value)
# -----------------------------------------------------------------------------
class CharacteristicAdapter:
'''
An adapter that can adapt any object with `read_value` and `write_value`
methods (like Characteristic and CharacteristicProxy objects) by wrapping
those methods with ones that return/accept encoded/decoded values.
Objects with async methods are considered proxies, so the adaptation is one
where the return value of `read_value` is decoded and the value passed to
`write_value` is encoded. Other objects are considered local characteristics
so the adaptation is one where the return value of `read_value` is encoded
and the value passed to `write_value` is decoded.
If the characteristic has a `subscribe` method, it is wrapped with one where
the values are decoded before being passed to the subscriber.
'''
def __init__(self, characteristic):
self.wrapped_characteristic = characteristic
if (
asyncio.iscoroutinefunction(characteristic.read_value) and
asyncio.iscoroutinefunction(characteristic.write_value)
):
self.read_value = self.read_decoded_value
self.write_value = self.write_decoded_value
else:
self.read_value = self.read_encoded_value
self.write_value = self.write_encoded_value
if hasattr(self.wrapped_characteristic, 'subscribe'):
self.subscribe = self.wrapped_subscribe
def __getattr__(self, name):
return getattr(self.wrapped_characteristic, name)
def read_encoded_value(self, connection):
return self.encode_value(self.wrapped_characteristic.read_value(connection))
def write_encoded_value(self, connection, value):
return self.wrapped_characteristic.write_value(connection, self.decode_value(value))
async def read_decoded_value(self):
return self.decode_value(await self.wrapped_characteristic.read_value())
async def write_decoded_value(self, value):
return await self.wrapped_characteristic.write_value(self.encode_value(value))
def encode_value(self, value):
return value
def decode_value(self, value):
return value
def wrapped_subscribe(self, subscriber=None):
return self.wrapped_characteristic.subscribe(
None if subscriber is None else lambda value: subscriber(self.decode_value(value))
)
# -----------------------------------------------------------------------------
class DelegatedCharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that converts bytes values using an encode and a decode function.
'''
def __init__(self, characteristic, encode=None, decode=None):
super().__init__(characteristic)
self.encode = encode
self.decode = decode
def encode_value(self, value):
return self.encode(value) if self.encode else value
def decode_value(self, value):
return self.decode(value) if self.decode else value
# -----------------------------------------------------------------------------
class PackedCharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
For formats with a single value, the adapted `read_value` and `write_value`
methods return/accept single values. For formats with multiple values,
they return/accept a tuple with the same number of elements as is required for
the format.
'''
def __init__(self, characteristic, format):
super().__init__(characteristic)
self.struct = struct.Struct(format)
def pack(self, *values):
return self.struct.pack(*values)
def unpack(self, buffer):
return self.struct.unpack(buffer)
def encode_value(self, value):
return self.pack(*value if type(value) is tuple else (value,))
def decode_value(self, value):
unpacked = self.unpack(value)
return unpacked[0] if len(unpacked) == 1 else unpacked
# -----------------------------------------------------------------------------
class MappedCharacteristicAdapter(PackedCharacteristicAdapter):
'''
Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format.
The adapted `read_value` and `write_value` methods return/accept aa dictionary which
is packed/unpacked according to format, with the arguments extracted from the dictionary
by key, in the same order as they occur in the `keys` parameter.
'''
def __init__(self, characteristic, format, keys):
super().__init__(characteristic, format)
self.keys = keys
def pack(self, values):
return super().pack(*(values[key] for key in self.keys))
def unpack(self, buffer):
return dict(zip(self.keys, super().unpack(buffer)))
# -----------------------------------------------------------------------------
class UTF8CharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that converts strings to/from bytes using UTF-8 encoding
'''
def encode_value(self, value):
return value.encode('utf-8')
def decode_value(self, value):
return value.decode('utf-8')
# -----------------------------------------------------------------------------
class Descriptor(Attribute):
'''
See Vol 3, Part G - 3.3.3 Characteristic Descriptor Declarations
'''
def __init__(self, uuid, permissions, value = b''):
# Convert the uuid to a UUID object if it isn't already
if type(uuid) is str:
uuid = UUID(uuid)
super().__init__(uuid, permissions, value)
self.uuid = uuid
self.characteristic = None
def __init__(self, descriptor_type, permissions, value = b''):
super().__init__(descriptor_type, permissions, value)
def __str__(self):
return f'Descriptor(handle=0x{self.handle:04X}, uuid={self.uuid}, value={self.read_value(None).hex()})'
return f'Descriptor(handle=0x{self.handle:04X}, type={self.type}, value={self.read_value(None).hex()})'

View File

@@ -35,10 +35,9 @@ from .gatt import (
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
GATT_REQUEST_TIMEOUT,
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
Service,
Characteristic,
Descriptor
Characteristic
)
# -----------------------------------------------------------------------------
@@ -47,6 +46,91 @@ from .gatt import (
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Proxies
# -----------------------------------------------------------------------------
class AttributeProxy(EventEmitter):
def __init__(self, client, handle, end_group_handle, attribute_type):
EventEmitter.__init__(self)
self.client = client
self.handle = handle
self.end_group_handle = end_group_handle
self.type = attribute_type
async def read_value(self, no_long_read=False):
return await self.client.read_value(self.handle, no_long_read)
async def write_value(self, value, with_response=False):
return await self.client.write_value(self.handle, value, with_response)
def __str__(self):
return f'Attribute(handle=0x{self.handle:04X}, type={self.uuid})'
class ServiceProxy(AttributeProxy):
@staticmethod
def from_client(cls, client, service_uuid):
# The service and its characteristics are considered to have already been discovered
services = client.get_services_by_uuid(service_uuid)
service = services[0] if services else None
return cls(service) if service else None
def __init__(self, client, handle, end_group_handle, uuid, primary=True):
attribute_type = GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE if primary else GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE
super().__init__(client, handle, end_group_handle, attribute_type)
self.uuid = uuid
self.characteristics = []
async def discover_characteristics(self, uuids=[]):
return await self.client.discover_characteristics(uuids, self)
def get_characteristics_by_uuid(self, uuid):
return self.client.get_characteristics_by_uuid(uuid, self)
def __str__(self):
return f'Service(handle=0x{self.handle:04X}, uuid={self.uuid})'
class CharacteristicProxy(AttributeProxy):
def __init__(self, client, handle, end_group_handle, uuid, properties):
super().__init__(client, handle, end_group_handle, uuid)
self.uuid = uuid
self.properties = properties
self.descriptors = []
self.descriptors_discovered = False
def get_descriptor(self, descriptor_type):
for descriptor in self.descriptors:
if descriptor.type == descriptor_type:
return descriptor
async def discover_descriptors(self):
return await self.client.discover_descriptors(self)
async def subscribe(self, subscriber=None):
return await self.client.subscribe(self, subscriber)
def __str__(self):
return f'Characteristic(handle=0x{self.handle:04X}, uuid={self.uuid}, properties={Characteristic.properties_as_string(self.properties)})'
class DescriptorProxy(AttributeProxy):
def __init__(self, client, handle, descriptor_type):
super().__init__(client, handle, 0, descriptor_type)
def __str__(self):
return f'Descriptor(handle=0x{self.handle:04X}, type={self.type})'
class ProfileServiceProxy:
'''
Base class for profile-specific service proxies
'''
@classmethod
def from_client(cls, client):
return ServiceProxy.from_client(cls, client, cls.SERVICE_CLASS.UUID)
# -----------------------------------------------------------------------------
# GATT Client
# -----------------------------------------------------------------------------
@@ -173,10 +257,14 @@ class Client:
logger.warning(f'bogus handle values: {attribute_handle} {end_group_handle}')
return
# Create a primary service object
service = Service(UUID.from_bytes(attribute_value), [], True)
service.handle = attribute_handle
service.end_group_handle = end_group_handle
# Create a service proxy for this service
service = ServiceProxy(
self,
attribute_handle,
end_group_handle,
UUID.from_bytes(attribute_value),
True
)
# Filter out returned services based on the given uuids list
if (not uuids) or (service.uuid in uuids):
@@ -233,10 +321,8 @@ class Client:
logger.warning(f'bogus handle values: {attribute_handle} {end_group_handle}')
return
# Create a primary service object
service = Service(uuid, [], True)
service.handle = attribute_handle
service.end_group_handle = end_group_handle
# Create a service proxy for this service
service = ServiceProxy(self, attribute_handle, end_group_handle, uuid, True)
# Add the service to the peer's service list
services.append(service)
@@ -314,8 +400,7 @@ class Client:
properties, handle = struct.unpack_from('<BH', attribute_value)
characteristic_uuid = UUID.from_bytes(attribute_value[3:])
characteristic = Characteristic(characteristic_uuid, properties, 0)
characteristic.handle = handle
characteristic = CharacteristicProxy(self, handle, 0, characteristic_uuid, properties)
# Set the previous characteristic's end handle
if characteristics:
@@ -382,8 +467,7 @@ class Client:
logger.warning(f'bogus handle value: {attribute_handle}')
return []
descriptor = Descriptor(UUID.from_bytes(attribute_uuid), 0)
descriptor.handle = attribute_handle
descriptor = DescriptorProxy(self, attribute_handle, UUID.from_bytes(attribute_uuid))
descriptors.append(descriptor)
# TODO: read descriptor value
@@ -427,8 +511,7 @@ class Client:
logger.warning(f'bogus handle value: {attribute_handle}')
return []
attribute = Attribute(attribute_uuid, 0)
attribute.handle = attribute_handle
attribute = AttributeProxy(self, attribute_handle, 0, UUID.from_bytes(attribute_uuid))
attributes.append(attribute)
# Move on to the next attributes

View File

@@ -79,6 +79,7 @@ HCI_VERSION_BLUETOOTH_CORE_4_2 = 8
HCI_VERSION_BLUETOOTH_CORE_5_0 = 9
HCI_VERSION_BLUETOOTH_CORE_5_1 = 10
HCI_VERSION_BLUETOOTH_CORE_5_2 = 11
HCI_VERSION_BLUETOOTH_CORE_5_3 = 12
# HCI Packet types
HCI_COMMAND_PACKET = 0x01
@@ -258,6 +259,9 @@ HCI_READ_REMOTE_VERSION_INFORMATION_COMMAND = hci_command_
HCI_READ_CLOCK_OFFSET_COMMAND = hci_command_op_code(0x01, 0x001F)
HCI_IO_CAPABILITY_REQUEST_REPLY_COMMAND = hci_command_op_code(0x01, 0x002B)
HCI_USER_CONFIRMATION_REQUEST_REPLY_COMMAND = hci_command_op_code(0x01, 0x002C)
HCI_USER_CONFIRMATION_REQUEST_NEGATIVE_REPLY_COMMAND = hci_command_op_code(0x01, 0x002D)
HCI_USER_PASSKEY_REQUEST_REPLY_COMMAND = hci_command_op_code(0x01, 0x002E)
HCI_USER_PASSKEY_REQUEST_NEGATIVE_REPLY_COMMAND = hci_command_op_code(0x01, 0x002F)
HCI_ENHANCED_SETUP_SYNCHRONOUS_CONNECTION_COMMAND = hci_command_op_code(0x01, 0x003D)
HCI_SNIFF_MODE_COMMAND = hci_command_op_code(0x02, 0x0003)
HCI_EXIT_SNIFF_MODE_COMMAND = hci_command_op_code(0x02, 0x0004)
@@ -407,6 +411,9 @@ HCI_COMMAND_NAMES = {
HCI_READ_CLOCK_OFFSET_COMMAND: 'HCI_READ_CLOCK_OFFSET_COMMAND',
HCI_IO_CAPABILITY_REQUEST_REPLY_COMMAND: 'HCI_IO_CAPABILITY_REQUEST_REPLY_COMMAND',
HCI_USER_CONFIRMATION_REQUEST_REPLY_COMMAND: 'HCI_USER_CONFIRMATION_REQUEST_REPLY_COMMAND',
HCI_USER_CONFIRMATION_REQUEST_NEGATIVE_REPLY_COMMAND: 'HCI_USER_CONFIRMATION_REQUEST_NEGATIVE_REPLY_COMMAND',
HCI_USER_PASSKEY_REQUEST_REPLY_COMMAND: 'HCI_USER_PASSKEY_REQUEST_REPLY_COMMAND',
HCI_USER_PASSKEY_REQUEST_NEGATIVE_REPLY_COMMAND: 'HCI_USER_PASSKEY_REQUEST_NEGATIVE_REPLY_COMMAND',
HCI_ENHANCED_SETUP_SYNCHRONOUS_CONNECTION_COMMAND: 'HCI_ENHANCED_SETUP_SYNCHRONOUS_CONNECTION_COMMAND',
HCI_SNIFF_MODE_COMMAND: 'HCI_SNIFF_MODE_COMMAND',
HCI_EXIT_SNIFF_MODE_COMMAND: 'HCI_EXIT_SNIFF_MODE_COMMAND',
@@ -683,20 +690,20 @@ HCI_IO_CAPABILITY_NAMES = {
}
# Authentication Requirements
HCI_MITM_NOT_REQUIRED_NO_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS = 0x00
HCI_MITM_REQUIRED_NO_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS = 0x01
HCI_MITM_NOT_REQUIRED_DEDICATED_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS = 0x02
HCI_MITM_REQUIRED_DEDICATED_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS = 0x03
HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS = 0x04
HCI_MITM_REQUIRED_GENERAL_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS = 0x05
HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS = 0x00
HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS = 0x01
HCI_MITM_NOT_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS = 0x02
HCI_MITM_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS = 0x03
HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS = 0x04
HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS = 0x05
HCI_AUTHENTICATION_REQUIREMENTS_NAMES = {
HCI_MITM_NOT_REQUIRED_NO_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_NOT_REQUIRED_NO_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_REQUIRED_NO_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_REQUIRED_NO_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_NOT_REQUIRED_DEDICATED_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_NOT_REQUIRED_DEDICATED_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_REQUIRED_DEDICATED_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_REQUIRED_DEDICATED_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_NUMERIC_COMPARISON_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_REQUIRED_GENERAL_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_REQUIRED_GENERAL_BONDING_USE_IO_CAPABILITIES_AUTHENTICATION_REQUIREMENTS'
HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_NOT_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_NOT_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_REQUIRED_DEDICATED_BONDING_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS',
HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS: 'HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS'
}
# Link Key Types
@@ -723,7 +730,7 @@ HCI_LINK_TYPE_NAMES = {
}
# Address types
HCI_PUBLIC_DEVICE_ADDRESS_TYPE = 0x0
HCI_PUBLIC_DEVICE_ADDRESS_TYPE = 0x00
HCI_RANDOM_DEVICE_ADDRESS_TYPE = 0x01
HCI_PUBLIC_IDENTITY_ADDRESS_TYPE = 0x02
HCI_RANDOM_IDENTITY_ADDRESS_TYPE = 0x03
@@ -1371,6 +1378,9 @@ class HCI_Remote_Name_Request_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.19 Remote Name Request Command
'''
R0 = 0x00
R1 = 0x01
R2 = 0x02
# -----------------------------------------------------------------------------
@@ -1449,6 +1459,55 @@ class HCI_User_Confirmation_Request_Reply_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('bd_addr', Address.parse_address)
],
return_parameters_fields=[
('status', STATUS_SPEC),
('bd_addr', Address.parse_address)
]
)
class HCI_User_Confirmation_Request_Negative_Reply_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.31 User Confirmation Request Negative Reply Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('bd_addr', Address.parse_address),
('numeric_value', 4)
],
return_parameters_fields=[
('status', STATUS_SPEC),
('bd_addr', Address.parse_address)
]
)
class HCI_User_Passkey_Request_Reply_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.32 User Passkey Request Reply Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
('bd_addr', Address.parse_address)
],
return_parameters_fields=[
('status', STATUS_SPEC),
('bd_addr', Address.parse_address)
]
)
class HCI_User_Passkey_Request_Negative_Reply_Command(HCI_Command):
'''
See Bluetooth spec @ 7.1.33 User Passkey Request Negative Reply Command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command([
('connection_handle', 2),
@@ -3367,6 +3426,16 @@ class HCI_User_Confirmation_Request_Event(HCI_Event):
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([
('bd_addr', Address.parse_address)
])
class HCI_User_Passkey_Request_Event(HCI_Event):
'''
See Bluetooth spec @ 7.7.43 User Passkey Request Event
'''
# -----------------------------------------------------------------------------
@HCI_Event.event([
('status', STATUS_SPEC),

View File

@@ -85,6 +85,7 @@ class Host(EventEmitter):
self.command_semaphore = asyncio.Semaphore(1)
self.long_term_key_provider = None
self.link_key_provider = None
self.pairing_io_capability_provider = None # Classic only
# Connect to the source and sink if specified
if controller_source:
@@ -363,7 +364,7 @@ class Host(EventEmitter):
logger.debug(f'### BR/EDR CONNECTION FAILED: {event.status}')
# Notify the client
self.emit('connection_failure', event.status)
self.emit('connection_failure', event.connection_handle, event.status)
def on_hci_disconnection_complete_event(self, event):
# Find the connection
@@ -495,7 +496,7 @@ class Host(EventEmitter):
def on_hci_authentication_complete_event(self, event):
# Notify the client
if event.status == HCI_SUCCESS:
self.emit('connection_authentication_complete', event.connection_handle)
self.emit('connection_authentication', event.connection_handle)
else:
self.emit('connection_authentication_failure', event.connection_handle, event.status)
@@ -560,26 +561,16 @@ class Host(EventEmitter):
asyncio.create_task(send_link_key())
def on_hci_io_capability_request_event(self, event):
# For now, just return NoInputNoOutput and no MITM
# TODO: delegate the decision
self.send_command_sync(
HCI_IO_Capability_Request_Reply_Command(
bd_addr = event.bd_addr,
io_capability = HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
oob_data_present = 0x00,
authentication_requirements = 0x00 # 0x02 # FIXME: testing only
)
)
self.emit('authentication_io_capability_request', event.bd_addr)
def on_hci_io_capability_response_event(self, event):
pass
def on_hci_user_confirmation_request_event(self, event):
# For now, just confirm everything
# TODO: delegate the decision
self.send_command_sync(
HCI_User_Confirmation_Request_Reply_Command(bd_addr = event.bd_addr)
)
self.emit('authentication_user_confirmation_request', event.bd_addr, event.numeric_value)
def on_hci_user_passkey_request_event(self, event):
self.emit('authentication_user_passkey_request', event.bd_addr)
def on_hci_inquiry_complete_event(self, event):
self.emit('inquiry_complete')
@@ -602,3 +593,9 @@ class Host(EventEmitter):
event.extended_inquiry_response,
event.rssi
)
def on_hci_remote_name_request_complete_event(self, event):
if event.status != HCI_SUCCESS:
self.emit('remote_name_failure', event.bd_addr, event.status)
else:
self.emit('remote_name', event.bd_addr, event.remote_name)

View File

@@ -243,7 +243,7 @@ class L2CAP_Control_Frame:
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass([
('reason', {'size': 2, 'mapper': lambda x: L2CAP_Command_Reject.map_reason(x)}),
('reason', {'size': 2, 'mapper': lambda x: L2CAP_Command_Reject.reason_name(x)}),
('data', '*')
])
class L2CAP_Command_Reject(L2CAP_Control_Frame):
@@ -262,7 +262,7 @@ class L2CAP_Command_Reject(L2CAP_Control_Frame):
}
@staticmethod
def map_reason(reason):
def reason_name(reason):
return name_or_number(L2CAP_Command_Reject.REASON_NAMES, reason)
@@ -330,7 +330,7 @@ class L2CAP_Configure_Request(L2CAP_Control_Frame):
@L2CAP_Control_Frame.subclass([
('source_cid', 2),
('flags', 2),
('result', {'size': 2, 'mapper': lambda x: L2CAP_Configure_Response.map_result(x)}),
('result', {'size': 2, 'mapper': lambda x: L2CAP_Configure_Response.result_name(x)}),
('options', '*')
])
class L2CAP_Configure_Response(L2CAP_Control_Frame):
@@ -355,7 +355,7 @@ class L2CAP_Configure_Response(L2CAP_Control_Frame):
}
@staticmethod
def map_result(result):
def result_name(result):
return name_or_number(L2CAP_Configure_Response.RESULT_NAMES, result)
@@ -403,31 +403,49 @@ class L2CAP_Echo_Response(L2CAP_Control_Frame):
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass([
('info_type', 2)
('info_type', {'size': 2, 'mapper': lambda x: L2CAP_Information_Request.info_type_name(x)})
])
class L2CAP_Information_Request(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.10 INFORMATION REQUEST
'''
SUCCESS = 0x00
NOT_SUPPORTED = 0x01
CONNECTIONLESS_MTU = 0x0001
EXTENDED_FEATURES_SUPPORTED = 0x0002
FIXED_CHANNELS_SUPPORTED = 0x0003
INFO_TYPE_NAMES = {
CONNECTIONLESS_MTU: 'CONNECTIONLESS_MTU',
EXTENDED_FEATURES_SUPPORTED: 'EXTENDED_FEATURES_SUPPORTED',
FIXED_CHANNELS_SUPPORTED: 'FIXED_CHANNELS_SUPPORTED'
}
@staticmethod
def info_type_name(info_type):
return name_or_number(L2CAP_Information_Request.INFO_TYPE_NAMES, info_type)
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass([
('info_type', 2),
('result', 2),
('info_type', {'size': 2, 'mapper': L2CAP_Information_Request.info_type_name}),
('result', {'size': 2, 'mapper': lambda x: L2CAP_Information_Response.result_name(x)}),
('data', '*')
])
class L2CAP_Information_Response(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.11 INFORMATION RESPONSE
'''
SUCCESS = 0x00
NOT_SUPPORTED = 0x01
RESULT_NAMES = {
SUCCESS: 'SUCCESS',
NOT_SUPPORTED: 'NOT_SUPPORTED'
}
@staticmethod
def result_name(result):
return name_or_number(L2CAP_Information_Response.RESULT_NAMES, result)
# -----------------------------------------------------------------------------
@@ -473,7 +491,7 @@ class L2CAP_LE_Credit_Based_Connection_Request(L2CAP_Control_Frame):
('mtu', 2),
('mps', 2),
('initial_credits', 2),
('result', {'size': 2, 'mapper': lambda x: L2CAP_LE_Credit_Based_Connection_Response.map_result(x)})
('result', {'size': 2, 'mapper': lambda x: L2CAP_LE_Credit_Based_Connection_Response.result_name(x)})
])
class L2CAP_LE_Credit_Based_Connection_Response(L2CAP_Control_Frame):
'''
@@ -505,7 +523,7 @@ class L2CAP_LE_Credit_Based_Connection_Response(L2CAP_Control_Frame):
}
@staticmethod
def map_result(result):
def result_name(result):
return name_or_number(L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_RESULT_NAMES, result)
@@ -980,13 +998,13 @@ class ChannelManager:
def on_l2cap_information_request(self, connection, cid, request):
if request.info_type == L2CAP_Information_Request.CONNECTIONLESS_MTU:
result = L2CAP_Information_Request.SUCCESS
result = L2CAP_Information_Response.SUCCESS
data = struct.pack('<H', 1024) # TODO: don't use a fixed value
elif request.info_type == L2CAP_Information_Request.EXTENDED_FEATURES_SUPPORTED:
result = L2CAP_Information_Request.SUCCESS
result = L2CAP_Information_Response.SUCCESS
data = bytes.fromhex('00000000') # TODO: don't use a fixed value
elif request.info_type == L2CAP_Information_Request.FIXED_CHANNELS_SUPPORTED:
result = L2CAP_Information_Request.SUCCESS
result = L2CAP_Information_Response.SUCCESS
data = bytes.fromhex('FFFFFFFFFFFFFFFF') # TODO: don't use a fixed value
else:
result = L2CAP_Information_Request.NO_SUPPORTED

View File

@@ -0,0 +1,13 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -0,0 +1,61 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from ..gatt_client import ProfileServiceProxy
from ..gatt import (
GATT_BATTERY_SERVICE,
GATT_BATTERY_LEVEL_CHARACTERISTIC,
TemplateService,
Characteristic,
CharacteristicValue,
PackedCharacteristicAdapter
)
# -----------------------------------------------------------------------------
class BatteryService(TemplateService):
UUID = GATT_BATTERY_SERVICE
BATTERY_LEVEL_FORMAT = 'B'
def __init__(self, read_battery_level):
self.battery_level_characteristic = PackedCharacteristicAdapter(
Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.READABLE,
CharacteristicValue(read=read_battery_level)
),
format=BatteryService.BATTERY_LEVEL_FORMAT
)
super().__init__([self.battery_level_characteristic])
# -----------------------------------------------------------------------------
class BatteryServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = BatteryService
def __init__(self, service_proxy):
self.service_proxy = service_proxy
if characteristics := service_proxy.get_characteristics_by_uuid(GATT_BATTERY_LEVEL_CHARACTERISTIC):
self.battery_level = PackedCharacteristicAdapter(
characteristics[0],
format=BatteryService.BATTERY_LEVEL_FORMAT
)
else:
self.battery_level = None

View File

@@ -0,0 +1,135 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct
from typing import Tuple
from ..gatt_client import ProfileServiceProxy
from ..gatt import (
GATT_DEVICE_INFORMATION_SERVICE,
GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC,
GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC,
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
GATT_MODEL_NUMBER_STRING_CHARACTERISTIC,
GATT_SERIAL_NUMBER_STRING_CHARACTERISTIC,
GATT_SOFTWARE_REVISION_STRING_CHARACTERISTIC,
GATT_SYSTEM_ID_CHARACTERISTIC,
GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC,
TemplateService,
Characteristic,
DelegatedCharacteristicAdapter,
UTF8CharacteristicAdapter
)
# -----------------------------------------------------------------------------
class DeviceInformationService(TemplateService):
UUID = GATT_DEVICE_INFORMATION_SERVICE
@staticmethod
def pack_system_id(oui, manufacturer_id):
return struct.pack('<Q', oui << 40 | manufacturer_id)
@staticmethod
def unpack_system_id(buffer):
system_id = struct.unpack('<Q', buffer)[0]
return (system_id >> 40, system_id & 0xFFFFFFFFFF)
def __init__(
self,
manufacturer_name: str = None,
model_number: str = None,
serial_number: str = None,
hardware_revision: str = None,
firmware_revision: str = None,
software_revision: str = None,
system_id: Tuple[int, int] = None, # (OUI, Manufacturer ID)
ieee_regulatory_certification_data_list: bytes = None
# TODO: pnp_id
):
characteristics = [
Characteristic(
uuid,
Characteristic.READ,
Characteristic.READABLE,
field
)
for (field, uuid) in (
(manufacturer_name, GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC),
(model_number, GATT_MODEL_NUMBER_STRING_CHARACTERISTIC),
(serial_number, GATT_SERIAL_NUMBER_STRING_CHARACTERISTIC),
(hardware_revision, GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC),
(firmware_revision, GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC),
(software_revision, GATT_SOFTWARE_REVISION_STRING_CHARACTERISTIC)
)
if field is not None
]
if system_id is not None:
characteristics.append(Characteristic(
GATT_SYSTEM_ID_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
self.pack_system_id(*system_id)
))
if ieee_regulatory_certification_data_list is not None:
characteristics.append(Characteristic(
GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
ieee_regulatory_certification_data_list
))
super().__init__(characteristics)
# -----------------------------------------------------------------------------
class DeviceInformationServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = DeviceInformationService
def __init__(self, service_proxy):
self.service_proxy = service_proxy
for (field, uuid) in (
('manufacturer_name', GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC),
('model_number', GATT_MODEL_NUMBER_STRING_CHARACTERISTIC),
('serial_number', GATT_SERIAL_NUMBER_STRING_CHARACTERISTIC),
('hardware_revision', GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC),
('firmware_revision', GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC),
('software_revision', GATT_SOFTWARE_REVISION_STRING_CHARACTERISTIC)
):
if characteristics := service_proxy.get_characteristics_by_uuid(uuid):
characteristic = UTF8CharacteristicAdapter(characteristics[0])
else:
characteristic = None
self.__setattr__(field, characteristic)
if characteristics := service_proxy.get_characteristics_by_uuid(GATT_SYSTEM_ID_CHARACTERISTIC):
self.system_id = DelegatedCharacteristicAdapter(
characteristics[0],
encode=lambda v: DeviceInformationService.pack_system_id(*v),
decode=DeviceInformationService.unpack_system_id
)
else:
self.system_id = None
if characteristics := service_proxy.get_characteristics_by_uuid(GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC):
self.ieee_regulatory_certification_data_list = characteristics[0]
else:
self.ieee_regulatory_certification_data_list = None

View File

@@ -0,0 +1,221 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from enum import IntEnum
import struct
from ..gatt_client import ProfileServiceProxy
from ..att import ATT_Error
from ..gatt import (
GATT_HEART_RATE_SERVICE,
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
TemplateService,
Characteristic,
CharacteristicValue,
DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter
)
# -----------------------------------------------------------------------------
class HeartRateService(TemplateService):
UUID = GATT_HEART_RATE_SERVICE
HEART_RATE_CONTROL_POINT_FORMAT = 'B'
CONTROL_POINT_NOT_SUPPORTED = 0x80
RESET_ENERGY_EXPENDED = 0x01
class BodySensorLocation(IntEnum):
OTHER = 0,
CHEST = 1,
WRIST = 2,
FINGER = 3,
HAND = 4,
EAR_LOBE = 5,
FOOT = 6
class HeartRateMeasurement:
def __init__(
self,
heart_rate,
sensor_contact_detected=None,
energy_expended=None,
rr_intervals=None
):
if heart_rate < 0 or heart_rate > 0xFFFF:
raise ValueError('heart_rate out of range')
if energy_expended is not None and (energy_expended < 0 or energy_expended > 0xFFFF):
raise ValueError('energy_expended out of range')
if rr_intervals:
for rr_interval in rr_intervals:
if rr_interval < 0 or rr_interval * 1024 > 0xFFFF:
raise ValueError('rr_intervals out of range')
self.heart_rate = heart_rate
self.sensor_contact_detected = sensor_contact_detected
self.energy_expended = energy_expended
self.rr_intervals = rr_intervals
@classmethod
def from_bytes(cls, data):
flags = data[0]
offset = 1
if flags & 1:
hr = struct.unpack_from('<H', data, offset)[0]
offset += 2
else:
hr = struct.unpack_from('B', data, offset)[0]
offset += 1
if flags & (1 << 2):
sensor_contact_detected = (flags & (1 << 1) != 0)
else:
sensor_contact_detected = None
if flags & (1 << 3):
energy_expended = struct.unpack_from('<H', data, offset)[0]
offset += 2
else:
energy_expended = None
if flags & (1 << 4):
rr_intervals = tuple(
struct.unpack_from('<H', data, offset + i * 2)[0] / 1024
for i in range((len(data) - offset) // 2)
)
else:
rr_intervals = ()
return cls(hr, sensor_contact_detected, energy_expended, rr_intervals)
def __bytes__(self):
if self.heart_rate < 256:
flags = 0
data = struct.pack('B', self.heart_rate)
else:
flags = 1
data = struct.pack('<H', self.heart_rate)
if self.sensor_contact_detected is not None:
flags |= ((1 if self.sensor_contact_detected else 0) << 1) | (1 << 2)
if self.energy_expended is not None:
flags |= (1 << 3)
data += struct.pack('<H', self.energy_expended)
if self.rr_intervals:
flags |= (1 << 4)
data += b''.join([
struct.pack('<H', int(rr_interval * 1024))
for rr_interval in self.rr_intervals
])
return bytes([flags]) + data
def __str__(self):
return f'HeartRateMeasurement(heart_rate={self.heart_rate},'\
f' sensor_contact_detected={self.sensor_contact_detected},'\
f' energy_expended={self.energy_expended},'\
f' rr_intervals={self.rr_intervals})'
def __init__(
self,
read_heart_rate_measurement,
body_sensor_location=None,
reset_energy_expended=None
):
self.heart_rate_measurement_characteristic = DelegatedCharacteristicAdapter(
Characteristic(
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
Characteristic.NOTIFY,
0,
CharacteristicValue(read=read_heart_rate_measurement)
),
encode=lambda value: bytes(value)
)
characteristics = [self.heart_rate_measurement_characteristic]
if body_sensor_location is not None:
self.body_sensor_location_characteristic = Characteristic(
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
bytes([int(body_sensor_location)])
)
characteristics.append(self.body_sensor_location_characteristic)
if reset_energy_expended:
def write_heart_rate_control_point_value(connection, value):
if value == self.RESET_ENERGY_EXPENDED:
if reset_energy_expended is not None:
reset_energy_expended(connection)
else:
raise ATT_Error(self.CONTROL_POINT_NOT_SUPPORTED)
self.heart_rate_control_point_characteristic = PackedCharacteristicAdapter(
Characteristic(
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE,
Characteristic.WRITEABLE,
CharacteristicValue(write=write_heart_rate_control_point_value)
),
format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT
)
characteristics.append(self.heart_rate_control_point_characteristic)
super().__init__(characteristics)
# -----------------------------------------------------------------------------
class HeartRateServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = HeartRateService
def __init__(self, service_proxy):
self.service_proxy = service_proxy
if characteristics := service_proxy.get_characteristics_by_uuid(GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC):
self.heart_rate_measurement = DelegatedCharacteristicAdapter(
characteristics[0],
decode=HeartRateService.HeartRateMeasurement.from_bytes
)
else:
self.heart_rate_measurement = None
if characteristics := service_proxy.get_characteristics_by_uuid(GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC):
self.body_sensor_location = DelegatedCharacteristicAdapter(
characteristics[0],
decode=lambda value: HeartRateService.BodySensorLocation(value[0])
)
else:
self.body_sensor_location = None
if characteristics := service_proxy.get_characteristics_by_uuid(GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC):
self.heart_rate_control_point = PackedCharacteristicAdapter(
characteristics[0],
format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT
)
else:
self.heart_rate_control_point = None
async def reset_energy_expended(self):
if self.heart_rate_control_point is not None:
return await self.heart_rate_control_point.write_value(HeartRateService.RESET_ENERGY_EXPENDED)

View File

@@ -150,6 +150,8 @@ SMP_SC_AUTHREQ = 0b00001000
SMP_KEYPRESS_AUTHREQ = 0b00010000
SMP_CT2_AUTHREQ = 0b00100000
# Crypto salt
SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031')
# -----------------------------------------------------------------------------
# Utils
@@ -457,22 +459,38 @@ class PairingDelegate:
DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY
DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
DEFAULT_KEY_DISTRIBUTION = (SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG)
def __init__(self, io_capability = NO_OUTPUT_NO_INPUT):
def __init__(
self,
io_capability=NO_OUTPUT_NO_INPUT,
local_initiator_key_distribution=DEFAULT_KEY_DISTRIBUTION,
local_responder_key_distribution=DEFAULT_KEY_DISTRIBUTION
):
self.io_capability = io_capability
self.local_initiator_key_distribution = local_initiator_key_distribution
self.local_responder_key_distribution = local_responder_key_distribution
async def accept(self):
return True
async def compare_numbers(self, number):
async def compare_numbers(self, number, digits=6):
return True
async def get_number(self):
return 0
async def display_number(self, number):
async def display_number(self, number, digits=6):
pass
async def key_distribution_response(self, peer_initiator_key_distribution, peer_responder_key_distribution):
return (
(peer_initiator_key_distribution &
self.local_initiator_key_distribution),
(peer_responder_key_distribution &
self.local_responder_key_distribution)
)
# -----------------------------------------------------------------------------
class PairingConfig:
@@ -559,6 +577,7 @@ class Session:
self.ltk = None
self.ltk_ediv = 0
self.ltk_rand = bytes(8)
self.link_key = None
self.initiator_key_distribution = 0
self.responder_key_distribution = 0
self.peer_random_value = None
@@ -596,11 +615,8 @@ class Session:
self.pairing_result = None
# Key Distribution (default values before negotiation)
self.initiator_key_distribution = (
SMP_ENC_KEY_DISTRIBUTION_FLAG |
SMP_ID_KEY_DISTRIBUTION_FLAG # |SMP_SIGN_KEY_DISTRIBUTION_FLAG
)
self.responder_key_distribution = self.initiator_key_distribution
self.initiator_key_distribution = pairing_config.delegate.local_initiator_key_distribution
self.responder_key_distribution = pairing_config.delegate.local_responder_key_distribution
# Authentication Requirements Flags - Vol 3, Part H, Figure 3.3
self.bonding = pairing_config.bonding
@@ -699,7 +715,7 @@ class Session:
async def prompt():
logger.debug(f'verification code: {code}')
try:
response = await self.pairing_config.delegate.compare_numbers(code)
response = await self.pairing_config.delegate.compare_numbers(code, digits=6)
if response:
next_steps()
return
@@ -733,7 +749,7 @@ class Session:
self.tk = self.passkey.to_bytes(16, byteorder='little')
logger.debug(f'TK from passkey = {self.tk.hex()}')
asyncio.create_task(self.pairing_config.delegate.display_number(self.passkey))
asyncio.create_task(self.pairing_config.delegate.display_number(self.passkey, digits=6))
def input_passkey(self, next_steps=None):
# Prompt the user for the passkey displayed on the peer
@@ -870,47 +886,56 @@ class Session:
self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk))
self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand))
# Distribute IRK
# Distribute IRK & BD ADDR
if self.initiator_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG:
self.send_command(
SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk)
)
# Distribute BD ADDR
self.send_command(SMP_Identity_Address_Information_Command(
addr_type = self.manager.address.address_type,
bd_addr = self.manager.address
))
self.send_command(SMP_Identity_Address_Information_Command(
addr_type = self.manager.address.address_type,
bd_addr = self.manager.address
))
# Distribute CSRK
csrk = bytes(16) # FIXME: testing
if self.initiator_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
self.send_command(SMP_Signing_Information_Command(signature_key=csrk))
# CTKD, calculate BR/EDR link key
if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
ilk = crypto.h7(
salt=SMP_CTKD_H7_LEBR_SALT,
w=self.ltk) if self.ct2 else crypto.h6(self.ltk, b'tmp1')
self.link_key = crypto.h6(ilk, b'lebr')
else:
# Distribute the LTK
# Distribute the LTK, EDIV and RAND
if not self.sc:
if self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk))
self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand))
# Distribute EDIV and RAND
self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand))
# Distribute IRK
# Distribute IRK & BD ADDR
if self.responder_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG:
self.send_command(
SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk)
)
# Distribute BD ADDR
self.send_command(SMP_Identity_Address_Information_Command(
addr_type = self.manager.address.address_type,
bd_addr = self.manager.address
))
self.send_command(SMP_Identity_Address_Information_Command(
addr_type = self.manager.address.address_type,
bd_addr = self.manager.address
))
# Distribute CSRK
csrk = bytes(16) # FIXME: testing
if self.responder_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
self.send_command(SMP_Signing_Information_Command(signature_key=csrk))
# CTKD, calculate BR/EDR link key
if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
ilk = crypto.h7(
salt=SMP_CTKD_H7_LEBR_SALT,
w=self.ltk) if self.ct2 else crypto.h6(self.ltk, b'tmp1')
self.link_key = crypto.h6(ilk, b'lebr')
def compute_peer_expected_distributions(self, key_distribution_flags):
# Set our expectations for what to wait for in the key distribution phase
@@ -945,7 +970,7 @@ class Session:
# Nothing left to expect, we're done
self.on_pairing()
else:
logger.warn(color('!!! unexpected key distribution command', 'red'))
logger.warn(color(f'!!! unexpected key distribution command: {command_class.__name__}', 'red'))
self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR)
async def pair(self):
@@ -1029,6 +1054,11 @@ class Session:
value = self.peer_signature_key,
authenticated = authenticated
)
if self.link_key is not None:
keys.link_key = PairingKeys.Key(
value = self.link_key,
authenticated = authenticated
)
self.manager.on_pairing(self, peer_address, keys)
@@ -1076,6 +1106,7 @@ class Session:
# Bonding and SC require both sides to request/support it
self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0)
self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0)
# Check for OOB
if command.oob_data_flag != 0:
@@ -1091,8 +1122,8 @@ class Session:
logger.debug(f'pairing method: {self.PAIRING_METHOD_NAMES[self.pairing_method]}')
# Key distribution
self.initiator_key_distribution &= command.initiator_key_distribution
self.responder_key_distribution &= command.responder_key_distribution
self.initiator_key_distribution, self.responder_key_distribution = await self.pairing_config.delegate.key_distribution_response(
command.initiator_key_distribution, command.responder_key_distribution)
self.compute_peer_expected_distributions(self.initiator_key_distribution)
# The pairing is now starting

View File

@@ -37,14 +37,17 @@ async def open_usb_transport(spec):
'''
Open a USB transport.
The parameter string has this syntax:
either <index> or <vendor>:<product>
either <index> or <vendor>:<product>[/<serial-number>]
With <index> as the 0-based index to select amongst all the devices that appear
to be supporting Bluetooth HCI (0 being the first one), or
Where <vendor> and <product> are the vendor ID and product ID in hexadecimal.
Where <vendor> and <product> are the vendor ID and product ID in hexadecimal. The
/<serial-number> suffix max be specified when more than one device with the same
vendor and product identifiers are present.
Examples:
0 --> the first BT USB dongle
04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901
04b4:f901/00E04C239987 --> the BT USB dongle with vendor=04b4 and product=f901 and serial number 00E04C239987
'''
USB_RECIPIENT_DEVICE = 0x00
@@ -218,7 +221,7 @@ async def open_usb_transport(spec):
pass
logger.debug('USB event loop done')
self.event_loop_done.set_result(None)
self.loop.call_soon_threadsafe(self.event_loop_done.set_result, None)
async def close(self):
self.closed = True
@@ -268,22 +271,32 @@ async def open_usb_transport(spec):
found = None
if ':' in spec:
vendor_id, product_id = spec.split(':')
found = context.getByVendorIDAndProductID(int(vendor_id, 16), int(product_id, 16), skip_on_error=True)
if '/' in product_id:
product_id, serial_number = product_id.split('/')
for device in context.getDeviceIterator(skip_on_error=True):
if (
device.getVendorID() == int(vendor_id, 16) and
device.getProductID() == int(product_id, 16) and
device.getSerialNumber() == serial_number
):
found = device
break
device.close()
else:
found = context.getByVendorIDAndProductID(int(vendor_id, 16), int(product_id, 16), skip_on_error=True)
else:
device_index = int(spec)
device_iterator = context.getDeviceIterator(skip_on_error=True)
try:
for device in device_iterator:
if device.getDeviceClass() == USB_DEVICE_CLASS_WIRELESS_CONTROLLER and \
device.getDeviceSubClass() == USB_DEVICE_SUBCLASS_RF_CONTROLLER and \
device.getDeviceProtocol() == USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER:
if device_index == 0:
found = device
break
device_index -= 1
device.close()
finally:
device_iterator.close()
for device in context.getDeviceIterator(skip_on_error=True):
if (
device.getDeviceClass() == USB_DEVICE_CLASS_WIRELESS_CONTROLLER and
device.getDeviceSubClass() == USB_DEVICE_SUBCLASS_RF_CONTROLLER and
device.getDeviceProtocol() == USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
):
if device_index == 0:
found = device
break
device_index -= 1
device.close()
if found is None:
context.close()

View File

@@ -5,6 +5,8 @@ use_directory_urls: false
nav:
- Introduction: index.md
- Getting Started: getting_started.md
- Development:
- Python Environments: development/python_environments.md
- Use Cases:
- Overview: use_cases/index.md
- Use Case 1: use_cases/use_case_1.md
@@ -63,7 +65,10 @@ theme:
custom_dir: 'theme'
plugins:
- mkdocstrings
- mkdocstrings:
handlers:
python:
paths: [../..]
docs_dir: 'src'

View File

@@ -0,0 +1,2 @@
GATT DUMP TOOL
==============

View File

@@ -5,8 +5,10 @@ Included in the project are a few apps and tools, built on top of the core libra
These include:
* [Console](console.md) - an interactive text-based console
* [Pair](pair.md) - Pair/bond two devices (LE and Classic)
* [Unbond](unbond.md) - Remove a previously established bond
* [HCI Bridge](hci_bridge.md) - a HCI transport bridge to connect two HCI transports and filter/snoop the HCI packets
* [Golden Gate Bridge](gg_bridge.md) - a bridge between GATT and UDP to use with the Golden Gate "stack tool"
* [`Show`](show.md) - Parse a file with HCI packets and print the details of each packet in a human readable form
* [`Link Relay`](link_relay.md) - WebSocket relay for virtual RemoteLink instances to communicate with each other.
* [Show](show.md) - Parse a file with HCI packets and print the details of each packet in a human readable form
* [Link Relay](link_relay.md) - WebSocket relay for virtual RemoteLink instances to communicate with each other.

View File

@@ -0,0 +1,2 @@
PAIR TOOL
=========

View File

@@ -0,0 +1,2 @@
SHOW TOOL
=========

View File

@@ -0,0 +1,2 @@
UNBOND TOOL
===========

View File

@@ -0,0 +1,45 @@
PYTHON ENVIRONMENTS
===================
When you don't want to install Bumble in your main/default python environment,
using a virtual environment, where the package and its dependencies can be
installed, isolated from the rest, may be useful.
There are many flavors of python environments and dependency managers.
This page describes a few of the most common ones.
## venv
`venv` is a standard module that is included with python.
Visit the [`venv` documentation](https://docs.python.org/3/library/venv.html) page for details.
## Pyenv
`pyenv` lets you easily switch between multiple versions of Python. It's simple, unobtrusive, and follows the UNIX tradition of single-purpose tools that do one thing well.
Visit the [`pyenv` site](https://github.com/pyenv/pyenv) for instructions on how to install
and use `pyenv`
## Conda
Conda is a convenient package manager and virtual environment.
The file `environment.yml` is a Conda environment file that you can use to create
a new Conda environment. Once created, you can simply activate this environment when
working with Bumble.
Visit the [Conda site](https://docs.conda.io/en/latest/) for instructions on how to install
and use Conda.
A few useful commands:
### Create a new `bumble` Conda environment
```
$ conda env create -f environment.yml
```
This will create a new environment, named `bumble`, which you can then activate with:
```
$ conda activate bumble
```
### Update an existing `bumble` environment
```
$ conda env update -f environment.yml
```

View File

@@ -20,7 +20,7 @@ You may be simply using Bumble as a module for your own application or as a depe
module, or you may be working on modifying or contributing to the Bumble module or example code
itself.
# Working With Bumble As A Module
# Using Bumble As A Python Module
## Installing
@@ -29,52 +29,40 @@ manager, or from source.
!!! tip "Python Virtual Environments"
When you install Bumble, you have the option to install it as part of your default
python environment, or in a virtual environment, such as a `venv`, `pyenv` or `conda` environment
### venv
`venv` is a standard module that is included with python.
Visit the [`venv` documentation](https://docs.python.org/3/library/venv.html) page for details.
### Pyenv
`pyenv` lets you easily switch between multiple versions of Python. It's simple, unobtrusive, and follows the UNIX tradition of single-purpose tools that do one thing well.
Visit the [`pyenv` site](https://github.com/pyenv/pyenv) for instructions on how to install
and use `pyenv`
### Conda
Conda is a convenient package manager and virtual environment.
The file `environment.yml` is a Conda environment file that you can use to create
a new Conda environment. Once created, you can simply activate this environment when
working with Bumble.
Visit the [Conda side](https://docs.conda.io/en/latest/) for instructions on how to install
and use Conda.
A few useful commands:
#### Create a new `bumble` Conda environment
```
$ conda env create -f environment.yml
```
This will create a new environment, named `bumble`, which you can then activate with:
```
$ conda activate bumble
```
#### Update an existing `bumble` environment
```
$ conda env update -f environment.yml
```
python environment, or in a virtual environment, such as a `venv`, `pyenv` or `conda` environment.
See the [Python Environments page](development/python_environments.md) page for details.
### Install From Source
The instructions for working with virtual Python environments above also apply in this case.
Install with `pip`
Install with `pip`. Run in a command shell in the directory where you downloaded the source
distribution
```
$ python -m pip install -e .
```
### Install from GitHub
You can install directly from GitHub without first downloading the repo.
Install the latest commit from the main branch with `pip`:
```
$ python -m pip install git+https://github.com/google/bumble.git
```
You can specify a specific tag.
Install tag `v0.0.1` with `pip`:
```
$ python -m pip install git+https://github.com/google/bumble.git@v0.0.1
```
You can also specify a specific commit.
Install commit `27c0551` with `pip`:
```
$ python -m pip install git+https://github.com/google/bumble.git@27c0551
```
# Working On The Bumble Code
When you work on the Bumble code itself, and run some of the tests or example apps, or import the
module in your own code, you typically either install the package from source in "development mode" as described above, or you may choose to skip the install phase.
@@ -106,3 +94,13 @@ Setting `PYTHONPATH` locally with each command would look something like:
```
$ PYTHONPATH=. python examples/run_advertiser.py examples/device1.json serial:/dev/tty.usbmodem0006839912171
```
# Where To Go Next
Once you've installed or downloaded Bumble, you can either start using some of the
[Bundled apps and tools](apps_and_tools/index.md), or look at the [examples](examples/index.md)
to get a feel for how to use the APIs, and start writing your own applications.
Depending on the use case you're interested in exploring, you may need to use a physical Bluetooth
controller, like a USB dongle or a board with a Bluetooth radio. Visit the [Hardware page](hardware/index.md)
for more information on using a physical radio, and/or the [Transports page](transports/index.md) for more
details on interfacing with either hardware modules or virtual controllers over various transports.

View File

@@ -46,7 +46,7 @@ through it with Android applications as well as system-managed profiles.
To connect a Bumble host stack to a Root Canal virtual controller instance, use
the bumble `android-emulator` transport in `host` mode (the default).
!!! example "Running the example GATT server connected to the emulator"
!!! example "Run the example GATT server connected to the emulator"
``` shell
$ python run_gatt_server.py device1.json android-emulator
```
@@ -57,7 +57,7 @@ This is an advanced use case, which may not be officially supported, but should
versions of the emulator.
You will likely need to start the emulator from the command line, in order to specify the `-forward-vhci` option (unless the emulator offers a way to control that feature from a user/ui menu).
!!! example "Launching the emulator with VHCI forwarding"
!!! example "Launch the emulator with VHCI forwarding"
In this example, we launch an emulator AVD named "Tiramisu"
```shell
$ emulator -forward-vhci -avd Tiramisu
@@ -70,10 +70,20 @@ You will likely need to start the emulator from the command line, in order to sp
To connect a virtual controller to the Android Bluetooth stack, use the bumble `android-emulator` transport in `controller` mode. For example, using the default gRPC port, the transport name would be: `android-emulator:mode=controller`.
!!! example "connecting the Android emulator to the first USB Bluetooth dongle, using the `hci_bridge` application"
!!! example "Connect the Android emulator to the first USB Bluetooth dongle, using the `hci_bridge` application"
```shell
$ bumble-hci-bridge android-emulator:mode=controller usb:0
```
## Other Tools
The `show` application that's included with Bumble can be used to parse and pretty-print the HCI packets
from an Android HCI "snoop log" (see [this page](https://source.android.com/devices/bluetooth/verifying_debugging)
for details on how to obtain HCI snoop logs from an Android device).
Use the `--format snoop` option to specify that the file is in that specific format.
!!! example "Analyze an Android HCI snoop log file"
```shell
$ bumble-show --format snoop btsnoop_hci.log
```

View File

@@ -0,0 +1,72 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from colors import color
from bumble.device import Device, Peer
from bumble.transport import open_transport
from bumble.profiles.battery_service import BatteryServiceProxy
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: battery_client.py <transport-spec> <bluetooth-address>')
print('example: battery_client.py usb:0 E1:CA:72:48:C4:E8')
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
print('<<< connected')
# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
await device.power_on()
# Connect to the peer
target_address = sys.argv[2]
print(f'=== Connecting to {target_address}...')
connection = await device.connect(target_address)
print(f'=== Connected to {connection}')
# Discover the Battery Service
peer = Peer(connection)
print('=== Discovering Battery Service')
battery_service = await peer.discover_service_and_create_proxy(BatteryServiceProxy)
# Check that the service was found
if not battery_service:
print('!!! Service not found')
return
# Subscribe to and read the battery level
if battery_service.battery_level:
await battery_service.battery_level.subscribe(
lambda value: print(f'{color("Battery Level Update:", "green")} {value}')
)
value = await battery_service.battery_level.read_value()
print(f'{color("Initial Battery Level:", "green")} {value}')
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -25,59 +25,41 @@ import struct
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.gatt import (
Service,
Characteristic,
CharacteristicValue,
GATT_DEVICE_BATTERY_SERVICE,
GATT_BATTERY_LEVEL_CHARACTERISTIC
)
# -----------------------------------------------------------------------------
def read_battery_level(connection):
return bytes([random.randint(0, 100)])
from bumble.profiles.battery_service import BatteryService
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: python battery_service.py <device-config> <transport-spec>')
print('example: python battery_service.py device1.json usb:0')
print('Usage: python battery_server.py <device-config> <transport-spec>')
print('example: python battery_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
# Create a device to manage the host
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
# Add a Battery Service to the GATT sever
device.add_services([
Service(
GATT_DEVICE_BATTERY_SERVICE,
[
Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ,
Characteristic.READABLE,
CharacteristicValue(read=read_battery_level)
)
]
)
])
battery_service = BatteryService(lambda _: random.randint(0, 100))
device.add_service(battery_service)
# Set the advertising data
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Battery', 'utf-8')),
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, struct.pack('<H', 0x180F)),
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, bytes(battery_service.uuid)),
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x0340))
])
)
# Go!
await device.power_on()
await device.start_advertising()
await hci_source.wait_for_termination()
await device.start_advertising(auto_restart=True)
# Notify every 3 seconds
while True:
await asyncio.sleep(3.0)
await device.notify_subscribers(battery_service.battery_level_characteristic)
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())

View File

@@ -0,0 +1,80 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from colors import color
from bumble.device import Device, Peer
from bumble.profiles.device_information_service import DeviceInformationServiceProxy
from bumble.transport import open_transport
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: device_information_client.py <transport-spec> <bluetooth-address>')
print('example: device_information_client.py usb:0 E1:CA:72:48:C4:E8')
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
print('<<< connected')
# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
await device.power_on()
# Connect to the peer
target_address = sys.argv[2]
print(f'=== Connecting to {target_address}...')
connection = await device.connect(target_address)
print(f'=== Connected to {connection}')
# Discover the Device Information service
peer = Peer(connection)
print('=== Discovering Device Information Service')
device_information_service = await peer.discover_service_and_create_proxy(DeviceInformationServiceProxy)
# Check that the service was found
if device_information_service is None:
print('!!! Service not found')
return
# Read and print the fields
if device_information_service.manufacturer_name is not None:
print(color('Manufacturer Name: ', 'green'), await device_information_service.manufacturer_name.read_value())
if device_information_service.model_number is not None:
print(color('Model Number: ', 'green'), await device_information_service.model_number.read_value())
if device_information_service.serial_number is not None:
print(color('Serial Number: ', 'green'), await device_information_service.serial_number.read_value())
if device_information_service.hardware_revision is not None:
print(color('Hardware Revision: ', 'green'), await device_information_service.hardware_revision.read_value())
if device_information_service.firmware_revision is not None:
print(color('Firmware Revision: ', 'green'), await device_information_service.firmware_revision.read_value())
if device_information_service.software_revision is not None:
print(color('Software Revision: ', 'green'), await device_information_service.software_revision.read_value())
if device_information_service.system_id is not None:
print(color('System ID: ', 'green'), await device_information_service.system_id.read_value())
if device_information_service.ieee_regulatory_certification_data_list is not None:
print(color('Regulatory Certification:', 'green'), (await device_information_service.ieee_regulatory_certification_data_list.read_value()).hex())
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -0,0 +1,66 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
import struct
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.profiles.device_information_service import DeviceInformationService
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: python device_info_server.py <device-config> <transport-spec>')
print('example: python device_info_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
# Add a Device Information Service to the GATT sever
device_information_service = DeviceInformationService(
manufacturer_name = 'ACME',
model_number = 'AB-102',
serial_number = '7654321',
hardware_revision = '1.1.3',
software_revision = '2.5.6',
system_id = (0x123456, 0x8877665544)
)
device.add_service(device_information_service)
# Set the advertising data
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Device', 'utf-8')),
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x0340))
])
)
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -1,96 +0,0 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from colors import color
from bumble.device import Device, Peer
from bumble.host import Host
from bumble.transport import open_transport
from bumble.utils import AsyncRunner
from bumble import gatt
# -----------------------------------------------------------------------------
class Listener(Device.Listener):
def __init__(self, device):
self.device = device
self.done = asyncio.get_running_loop().create_future()
@AsyncRunner.run_in_task()
async def on_connection(self, connection):
print(f'=== Connected to {connection}')
# Discover the Device Info service
peer = Peer(connection)
print('=== Discovering Device Info')
await peer.discover_services([gatt.GATT_DEVICE_INFORMATION_SERVICE])
# Check that the service was found
device_info_services = peer.get_services_by_uuid(gatt.GATT_DEVICE_INFORMATION_SERVICE)
if not device_info_services:
print('!!! Service not found')
return
# Get the characteristics we want from the (first) device info service
service = device_info_services[0]
await peer.discover_characteristics([
gatt.GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC
], service)
# Read the manufacturer name
manufacturer_name = peer.get_characteristics_by_uuid(gatt.GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, service)
if manufacturer_name:
value = await peer.read_value(manufacturer_name[0])
print(color('Manufacturer Name:', 'green'), value.decode('utf-8'))
else:
print('>>> No manufacturer name')
self.done.set_result(None)
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: get_peer_device_info.py <transport-spec> <bluetooth-address>')
print('example: get_peer_device_info.py usb:0 E1:CA:72:48:C4:E8')
return
print('<<< connecting to HCI...')
packet_source, packet_sink = await open_transport(sys.argv[1])
print('<<< connected')
# Create a host using the packet source and sink as controller
host = Host(controller_source=packet_source, controller_sink=packet_sink)
# Create a device to manage the host, with a custom listener
device = Device('Bumble', address = 'F0:F1:F2:F3:F4:F5', host = host)
device.listener = Listener(device)
await device.power_on()
# Connect to a peer
target_address = sys.argv[2]
print(f'=== Connecting to {target_address}...')
await device.connect(target_address)
await device.listener.done
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -0,0 +1,75 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from colors import color
from bumble.device import Device, Peer
from bumble.transport import open_transport
from bumble.profiles.heart_rate_service import HeartRateServiceProxy
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: heart_rate_client.py <transport-spec> <bluetooth-address>')
print('example: heart_rate_client.py usb:0 E1:CA:72:48:C4:E8')
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
print('<<< connected')
# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
await device.power_on()
# Connect to the peer
target_address = sys.argv[2]
print(f'=== Connecting to {target_address}...')
connection = await device.connect(target_address)
print(f'=== Connected to {connection}')
# Discover the Heart Rate Service
peer = Peer(connection)
print('=== Discovering Heart Rate Service')
heart_rate_service = await peer.discover_service_and_create_proxy(HeartRateServiceProxy)
# Check that the service was found
if not heart_rate_service:
print('!!! Service not found')
return
# Read the body sensor location
if heart_rate_service.body_sensor_location:
location = await heart_rate_service.body_sensor_location.read_value()
print(color('Sensor Location:', 'green'), location)
# Subscribe to the heart rate measurement
if heart_rate_service.heart_rate_measurement:
await heart_rate_service.heart_rate_measurement.subscribe(
lambda value: print(f'{color("Heart Rate Measurement:", "green")} {value}')
)
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -0,0 +1,95 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import sys
import time
import math
import random
import struct
import logging
import asyncio
import os
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.profiles.device_information_service import DeviceInformationService
from bumble.profiles.heart_rate_service import HeartRateService
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) != 3:
print('Usage: python heart_rate_server.py <device-config> <transport-spec>')
print('example: python heart_rate_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
# Keep track of accumulated expended energy
energy_start_time = time.time()
def reset_energy_expended():
nonlocal energy_start_time
energy_start_time = time.time()
# Add a Device Information Service and Heart Rate Service to the GATT sever
device_information_service = DeviceInformationService(
manufacturer_name = 'ACME',
model_number = 'HR-102',
serial_number = '7654321',
hardware_revision = '1.1.3',
software_revision = '2.5.6',
system_id = (0x123456, 0x8877665544)
)
heart_rate_service = HeartRateService(
read_heart_rate_measurement = lambda _: HeartRateService.HeartRateMeasurement(
heart_rate = 100 + int(50 * math.sin(time.time() * math.pi / 60)),
sensor_contact_detected = random.choice((True, False, None)),
energy_expended = random.choice((int((time.time() - energy_start_time) * 100), None)),
rr_intervals = random.choice(((random.randint(900, 1100) / 1000, random.randint(900, 1100) / 1000), None))
),
body_sensor_location=HeartRateService.BodySensorLocation.WRIST,
reset_energy_expended=lambda _: reset_energy_expended()
)
device.add_services([device_information_service, heart_rate_service])
# Set the advertising data
device.advertising_data = bytes(
AdvertisingData([
(AdvertisingData.COMPLETE_LOCAL_NAME, bytes('Bumble Heart', 'utf-8')),
(AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, bytes(heart_rate_service.uuid)),
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x0340))
])
)
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
# Notify every 3 seconds
while True:
await asyncio.sleep(3.0)
await device.notify_subscribers(heart_rate_service.heart_rate_measurement_characteristic)
# -----------------------------------------------------------------------------
logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -45,8 +45,7 @@ async def main():
# Create a first controller using the packet source/sink as its host interface
controller1 = Controller('C1', host_source = hci_source, host_sink = hci_sink, link = link)
print("====", sys.argv)
controller1.address = sys.argv[1]
controller1.random_address = sys.argv[1]
# Create a second controller using the same link
controller2 = Controller('C2', link = link)

View File

@@ -41,10 +41,10 @@ class Listener(Device.Listener):
print('=== Discovering services')
peer = Peer(connection)
await peer.discover_services()
await peer.discover_characteristics()
for service in peer.services:
await service.discover_characteristics()
for characteristic in service.characteristics:
await peer.discover_descriptors(characteristic)
await characteristic.discover_descriptors()
print('=== Services discovered')
show_services(peer.services)

View File

@@ -25,7 +25,6 @@ from bumble.controller import Controller
from bumble.device import Device, Peer
from bumble.host import Host
from bumble.link import LocalLink
from bumble.utils import AsyncRunner
from bumble.gatt import (
Service,
Characteristic,
@@ -37,43 +36,6 @@ from bumble.gatt import (
)
# -----------------------------------------------------------------------------
class ClientListener(Device.Listener):
def __init__(self, device):
self.device = device
@AsyncRunner.run_in_task()
async def on_connection(self, connection):
print(f'=== Client: connected to {connection}')
# Discover all services
print('=== Discovering services')
peer = Peer(connection)
await peer.discover_services()
await peer.discover_characteristics()
for service in peer.services:
for characteristic in service.characteristics:
await peer.discover_descriptors(characteristic)
print('=== Services discovered')
show_services(peer.services)
# Discover all attributes
print('=== Discovering attributes')
attributes = await peer.discover_attributes()
for attribute in attributes:
print(attribute)
print('=== Attributes discovered')
# Read all attributes
for attribute in attributes:
try:
value = await peer.read_value(attribute)
print(color(f'0x{attribute.handle:04X} = {value.hex()}', 'green'))
except ProtocolError as error:
print(color(f'cannot read {attribute.handle:04X}:', 'red'), error)
# -----------------------------------------------------------------------------
class ServerListener(Device.Listener):
def on_connection(self, connection):
@@ -90,7 +52,6 @@ async def main():
client_host = Host()
client_host.controller = client_controller
client_device = Device("client", address = 'F0:F1:F2:F3:F4:F5', host = client_host)
client_device.listener = ClientListener(client_device)
await client_device.power_on()
# Setup a stack for the server
@@ -116,7 +77,36 @@ async def main():
server_device.add_service(device_info_service)
# Connect the client to the server
await client_device.connect(server_device.address)
connection = await client_device.connect(server_device.random_address)
print(f'=== Client: connected to {connection}')
# Discover all services
print('=== Discovering services')
peer = Peer(connection)
await peer.discover_services()
for service in peer.services:
await service.discover_characteristics()
for characteristic in service.characteristics:
await characteristic.discover_descriptors()
print('=== Services discovered')
show_services(peer.services)
# Discover all attributes
print('=== Discovering attributes')
attributes = await peer.discover_attributes()
for attribute in attributes:
print(attribute)
print('=== Attributes discovered')
# Read all attributes
for attribute in attributes:
try:
value = await attribute.read_value()
print(color(f'0x{attribute.handle:04X} = {value.hex()}', 'green'))
except ProtocolError as error:
print(color(f'cannot read {attribute.handle:04X}:', 'red'), error)
await asyncio.get_running_loop().create_future()
# -----------------------------------------------------------------------------

View File

@@ -26,7 +26,6 @@ packages = bumble, bumble.transport, bumble.apps, bumble.apps.link_relay
package_dir =
bumble = bumble
bumble.apps = apps
bumble.apps.link_relay = apps/link_relay
install_requires =
aioconsole >= 0.4.1
ansicolors >= 1.1
@@ -66,4 +65,4 @@ development =
documentation =
mkdocs >= 1.2.3
mkdocs-material >= 8.1.9
mkdocstrings >= 0.17.0
mkdocstrings[python] >= 0.19.0

View File

@@ -18,6 +18,7 @@
import asyncio
import logging
import os
import struct
import pytest
from bumble.controller import Controller
@@ -25,6 +26,12 @@ from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
from bumble.gatt import (
GATT_BATTERY_LEVEL_CHARACTERISTIC,
CharacteristicAdapter,
DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter,
MappedCharacteristicAdapter,
UTF8CharacteristicAdapter,
Service,
Characteristic,
CharacteristicValue
@@ -91,6 +98,96 @@ def test_ATT_Read_By_Group_Type_Request():
basic_check(pdu)
# -----------------------------------------------------------------------------
def test_CharacteristicAdapter():
# Check that the CharacteristicAdapter base class is transparent
v = bytes([1, 2, 3])
c = Characteristic(GATT_BATTERY_LEVEL_CHARACTERISTIC, Characteristic.READ, Characteristic.READABLE, v)
a = CharacteristicAdapter(c)
value = a.read_value(None)
assert(value == v)
v = bytes([3, 4, 5])
a.write_value(None, v)
assert(c.value == v)
# Simple delegated adapter
a = DelegatedCharacteristicAdapter(c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x)))
value = a.read_value(None)
assert(value == bytes(reversed(v)))
v = bytes([3, 4, 5])
a.write_value(None, v)
assert(a.value == bytes(reversed(v)))
# Packed adapter with single element format
v = 1234
pv = struct.pack('>H', v)
c.value = v
a = PackedCharacteristicAdapter(c, '>H')
value = a.read_value(None)
assert(value == pv)
c.value = None
a.write_value(None, pv)
assert(a.value == v)
# Packed adapter with multi-element format
v1 = 1234
v2 = 5678
pv = struct.pack('>HH', v1, v2)
c.value = (v1, v2)
a = PackedCharacteristicAdapter(c, '>HH')
value = a.read_value(None)
assert(value == pv)
c.value = None
a.write_value(None, pv)
assert(a.value == (v1, v2))
# Mapped adapter
v1 = 1234
v2 = 5678
pv = struct.pack('>HH', v1, v2)
mapped = {'v1': v1, 'v2': v2}
c.value = mapped
a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2'))
value = a.read_value(None)
assert(value == pv)
c.value = None
a.write_value(None, pv)
assert(a.value == mapped)
# UTF-8 adapter
v = 'Hello π'
ev = v.encode('utf-8')
c.value = v
a = UTF8CharacteristicAdapter(c)
value = a.read_value(None)
assert(value == ev)
c.value = None
a.write_value(None, ev)
assert(a.value == v)
# -----------------------------------------------------------------------------
def test_CharacteristicValue():
b = bytes([1, 2, 3])
c = CharacteristicValue(read=lambda _: b)
x = c.read(None)
assert(x == b)
result = []
c = CharacteristicValue(write=lambda connection, value: result.append((connection, value)))
z = object()
c.write(z, b)
assert(result == [(z, b)])
# -----------------------------------------------------------------------------
class TwoDevices:
def __init__(self):
@@ -199,6 +296,56 @@ async def test_read_write():
assert(characteristic2._last_value[1] == b)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write2():
[client, server] = TwoDevices().devices
v = bytes([0x11, 0x22, 0x33, 0x44])
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
value=v
)
service1 = Service(
'3A657F47-D34F-46B3-B1EC-698E29B6B829',
[
characteristic1
]
)
server.add_services([service1])
await client.power_on()
await server.power_on()
connection = await client.connect(server.random_address)
peer = Peer(connection)
await peer.discover_services()
c = peer.get_services_by_uuid(service1.uuid)
assert(len(c) == 1)
s = c[0]
await s.discover_characteristics()
c = s.get_characteristics_by_uuid(characteristic1.uuid)
assert(len(c) == 1)
c1 = c[0]
v1 = await c1.read_value()
assert(v1 == v)
a1 = PackedCharacteristicAdapter(c1, '>I')
v1 = await a1.read_value()
assert(v1 == struct.unpack('>I', v)[0])
b = bytes([0x55, 0x66, 0x77, 0x88])
await a1.write_value(struct.unpack('>I', b)[0])
await async_barrier()
assert(characteristic1.value == b)
v1 = await a1.read_value()
assert(v1 == struct.unpack('>I', b)[0])
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_subscribe_notify():
@@ -330,6 +477,7 @@ async def test_subscribe_notify():
# -----------------------------------------------------------------------------
async def async_main():
await test_read_write()
await test_read_write2()
await test_subscribe_notify()
# -----------------------------------------------------------------------------
@@ -338,4 +486,6 @@ if __name__ == '__main__':
test_UUID()
test_ATT_Error_Response()
test_ATT_Read_By_Group_Type_Request()
test_CharacteristicValue()
test_CharacteristicAdapter()
asyncio.run(async_main())

View File

@@ -1,2 +1,3 @@
[pytest]
junit_logging = all
asyncio_mode = auto

View File

@@ -16,6 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import itertools
import logging
import os
import pytest
@@ -30,7 +31,8 @@ from bumble.smp import (
PairingConfig,
PairingDelegate,
SMP_PAIRING_NOT_SUPPORTED_ERROR,
SMP_CONFIRM_VALUE_FAILED_ERROR
SMP_CONFIRM_VALUE_FAILED_ERROR,
SMP_ID_KEY_DISTRIBUTION_FLAG,
)
from bumble.core import ProtocolError
@@ -196,11 +198,28 @@ async def _test_self_smp_with_configs(pairing_config1, pairing_config2):
# -----------------------------------------------------------------------------
IO_CAP = [
PairingDelegate.NO_OUTPUT_NO_INPUT,
PairingDelegate.KEYBOARD_INPUT_ONLY,
PairingDelegate.DISPLAY_OUTPUT_ONLY,
PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT
]
SC = [False, True]
MITM = [False, True]
# Key distribution is a 4-bit bitmask
# IdKey is necessary for current SMP structure
KEY_DIST = [i for i in range(16) if (i & SMP_ID_KEY_DISTRIBUTION_FLAG)]
@pytest.mark.asyncio
async def test_self_smp():
@pytest.mark.parametrize('io_cap, sc, mitm, key_dist',
itertools.product(IO_CAP, SC, MITM, KEY_DIST)
)
async def test_self_smp(io_cap, sc, mitm, key_dist):
class Delegate(PairingDelegate):
def __init__(self, name, io_capability):
super().__init__(io_capability)
def __init__(self, name, io_capability, local_initiator_key_distribution, local_responder_key_distribution):
super().__init__(io_capability, local_initiator_key_distribution,
local_responder_key_distribution)
self.name = name
self.reset()
@@ -208,11 +227,11 @@ async def test_self_smp():
self.peer_delegate = None
self.number = asyncio.get_running_loop().create_future()
async def compare_numbers(self, number):
async def compare_numbers(self, number, digits):
if self.peer_delegate is None:
logger.warn(f'[{self.name}] no peer delegate')
return False
await self.display_number(number)
await self.display_number(number, digits=6)
logger.debug(f'[{self.name}] waiting for peer number')
peer_number = await self.peer_delegate.number
logger.debug(f'[{self.name}] comparing numbers: {number} and {peer_number}')
@@ -231,7 +250,7 @@ async def test_self_smp():
logger.debug(f'[{self.name}] returning number: {peer_number}')
return peer_number
async def display_number(self, number):
async def display_number(self, number, digits):
logger.debug(f'[{self.name}] displaying number: {number}')
self.number.set_result(number)
@@ -240,17 +259,8 @@ async def test_self_smp():
pairing_config_sets = [('Initiator', [None]), ('Responder', [None])]
for pairing_config_set in pairing_config_sets:
for io_capability in [
PairingDelegate.NO_OUTPUT_NO_INPUT,
PairingDelegate.KEYBOARD_INPUT_ONLY,
PairingDelegate.DISPLAY_OUTPUT_ONLY,
PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT
]:
for sc in [False, True]:
for mitm in [False, True]:
delegate = Delegate(pairing_config_set[0], io_capability)
pairing_config_set[1].append(PairingConfig(sc, mitm, True, delegate))
delegate = Delegate(pairing_config_set[0], io_cap, key_dist, key_dist)
pairing_config_set[1].append(PairingConfig(sc, mitm, True, delegate))
for pairing_config1 in pairing_config_sets[0][1]:
for pairing_config2 in pairing_config_sets[1][1]:
@@ -262,7 +272,9 @@ async def test_self_smp():
if pairing_config1 and pairing_config2:
pairing_config1.delegate.peer_delegate = pairing_config2.delegate
pairing_config2.delegate.peer_delegate = pairing_config1.delegate
await _test_self_smp_with_configs(pairing_config1, pairing_config2)
# -----------------------------------------------------------------------------
@@ -293,7 +305,7 @@ async def test_self_smp_wrong_pin():
def __init__(self):
super().__init__(PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT)
async def compare_numbers(self, number):
async def compare_numbers(self, number, digits):
return False
wrong_pin_pairing_config = PairingConfig(delegate = WrongPinDelegate())

View File

@@ -176,6 +176,20 @@ def test_g2():
assert(value == 0x2f9ed5ba)
# -----------------------------------------------------------------------------
def test_h6():
KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b')
KEY_ID = bytes.fromhex('6c656272')
assert(h6(KEY, KEY_ID) == bytes.fromhex('2d9ae102 e76dc91c e8d3a9e2 80b16399'))
# -----------------------------------------------------------------------------
def test_h7():
KEY = bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b')
SALT = bytes.fromhex('00000000 00000000 00000000 746D7031')
assert(h7(SALT, KEY) == bytes.fromhex('fb173597 c6a3c0ec d2998c2a 75a57011'))
# -----------------------------------------------------------------------------
def test_ah():
irk = bytes(reversed(bytes.fromhex('ec0234a3 57c8ad05 341010a6 0a397d9b')))
@@ -195,4 +209,6 @@ if __name__ == '__main__':
test_f5()
test_f6()
test_g2()
test_h6()
test_h7()
test_ah()