Compare commits

...

44 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
49b2c13e69 only use 1M parameters by default 2023-04-09 17:57:11 -07:00
Lucas Abel
962737a97b Merge pull request #173 from zxzxwu/smp
SMP: Determine initiator by direction instead of role
2023-04-07 09:50:06 -07:00
Josh Wu
85496aaff5 SMP: Determine initiator by direction instead of role
Though in the spec this is not allowed, but in some ambiguous cases such
as SMP over BR/EDR or we want to test how remote devices handle invalid
pairing requests, we still need to allow this behavior, with some logs
to let users know it's invalid.
2023-04-08 00:02:48 +08:00
Lucas Abel
a95e601a5c Merge pull request #172 from qiaoccolato/main
LICENCE: recorde colors.py licence
2023-04-05 13:31:13 -07:00
Qiao Yang
df218b5370 LICENCE: recorde colors.py licence 2023-04-05 19:02:44 +00:00
Alan Rosenthal
0f737244b5 Merge pull request #169 from AlanRosenthal/alan/remote-values
Add `show remote-values`
2023-04-05 09:00:48 -04:00
Alan Rosenthal
a258ba383a Add show remote-values
`gatt_client` caches values read/notifications/indications and displays the most recent value to the user
2023-04-04 18:15:42 +00:00
Gilles Boccon-Gibod
c53e1d2480 Merge pull request #171 from google/gbg/keystore-name
use device public or static address for keystore namespace
2023-04-03 17:58:18 -07:00
Gilles Boccon-Gibod
620c135ac4 only instantiate keystore if not already set 2023-04-03 17:52:51 -07:00
Gilles Boccon-Gibod
fca73a49a3 use device public or static address for keystore namespace 2023-04-03 12:39:22 -07:00
Alan Rosenthal
cf70db84a1 Merge pull request #170 from AlanRosenthal/alan/fix-char-proxy-print
Fix CharacteristicProxy __str__
2023-03-31 17:25:50 -04:00
Alan Rosenthal
7731c41f80 Fix CharacteristicProxy __str__
property was really an int, and needed to be transformed into a `Characteristic.Properties`
2023-03-31 17:06:33 -04:00
Alan Rosenthal
278341cbc0 Merge pull request #167 from AlanRosenthal/alan/properties
Create Characteristic.Property
2023-03-31 16:14:38 -04:00
Alan Rosenthal
fb49a87494 Create Characteristic.Property
Move all Characteristic properties into its own `enum.IntFlag` class
2023-03-31 16:09:24 -04:00
Alan Rosenthal
eba82b9d9a Merge pull request #164 from AlanRosenthal/alan/local-write
Add `local-write` to bumble-console
2023-03-31 16:07:09 -04:00
Alan Rosenthal
677fc77d3c Merge pull request #163 from AlanRosenthal/alan/local-values
Add `show local-values`
2023-03-31 16:03:52 -04:00
Alan Rosenthal
e026de295f Add show local-values
This PR adds a way to display the local gatt characteristics/descriptors values

If no connections, it shows the value of every characteristic/descriptor.
When there's a connection, it shows the value for each specific connection - CCCDs are connection specific

This screen auto-updates every second
2023-03-31 00:20:07 +00:00
Alan Rosenthal
52c15705e9 Add local-write to bumble-console
Add a command to update the local gatt server, and notify/indicate subscribes (if any)
2023-03-30 12:33:32 -04:00
Lucas Abel
45ca0ef071 Merge pull request #166 from google/uael/att-fix
att: fixed use of unknown attribute
2023-03-30 07:12:28 -07:00
uael
e0af954baa att: fixed use of unknown attribute 2023-03-30 14:05:43 +00:00
Lucas Abel
044597de66 Merge pull request #161 from google/uael/smp-get-number-type-hint
smp: fix `PairingDelegate.get_number` return type
2023-03-28 11:48:09 -07:00
Lucas Abel
fb68fa6a33 Merge pull request #162 from zxzxwu/roleswitch
Add role switch test and assertion in self test
2023-03-28 06:41:03 -07:00
Josh Wu
b6fe7460ac Add role switch test and assertion in self test 2023-03-28 12:52:00 +08:00
Lucas Abel
5c59b6ca6d Merge pull request #158 from benquike/main
Fix HCI_PIN_Code_Reply_Command
2023-03-27 17:37:54 -07:00
Hui Peng
dcd66743f6 Use delegate.get_string to get pin code 2023-03-27 17:08:26 -07:00
Hui Peng
423a5a95d8 add get_string API in PairingDelegate 2023-03-27 17:02:12 -07:00
Lucas Abel
6f1f185642 Merge pull request #155 from akuker/main
Fix typo in console header
2023-03-27 16:16:27 -07:00
Lucas Abel
8e881fdb18 smp: fix PairingDelegate.get_number return type
This function can return `None` to indicate a negative reply,
update the type hint accordingly.
2023-03-27 22:51:23 +00:00
Lucas Abel
4907022398 Merge pull request #157 from yuyangh/main
Add connection into event emit
2023-03-27 14:38:41 -07:00
Lucas Abel
e93f71c035 Add missing Connection import 2023-03-27 14:27:48 -07:00
Alan Rosenthal
94ff80563b Merge pull request #160 from AlanRosenthal/alan/types
Add some missing types to apps/console.py, bumble/gatt_client.py
2023-03-27 15:00:03 -04:00
Lucas Abel
552deab8a7 Add Connection type
Co-authored-by: Alan Rosenthal <1288897+AlanRosenthal@users.noreply.github.com>
2023-03-27 11:53:34 -07:00
Lucas Abel
a72beb1b06 Merge pull request #144 from zxzxwu/classic_link
Add Classic Bluetooth link support
2023-03-27 11:41:42 -07:00
Lucas Abel
7e62d4a81a Merge pull request #150 from zxzxwu/roleswitch
Support BR/EDR role switch & change events
2023-03-27 11:41:29 -07:00
Alan Rosenthal
a50181e6b8 Add some missing types to apps/console.py, bumble/gatt_client.py
Added via code inspection (not via a tool like pytype)
2023-03-25 16:12:38 +00:00
Josh Wu
9e1358536b Add switch_role 2023-03-25 15:17:50 +08:00
Josh Wu
21d8a0d577 Add Classic Local Link support
Currently supported features:
* Connect
* Accept
* Switch Role
* Disconnect
* ACL data transmittion
2023-03-25 15:11:59 +08:00
Hui Peng
a8e61673d0 Fix HCI_PIN_Code_Reply_Command in Device.on_pin_code_request 2023-03-25 03:48:56 +00:00
Hui Peng
bd25cf27df Fix a misconfig of HCI_PIN_Code_Reply_Command
The pin_code field is of fixed length of 16 bytes
2023-03-25 03:47:07 +00:00
Alan Rosenthal
fdf2da7023 Merge pull request #159 from AlanRosenthal/alan/permissions
Fix typo when parsing device-config's gatt server
2023-03-24 19:33:05 -04:00
Alan Rosenthal
dfb6734324 Fix typo when parsing device-config's gatt server
* 'permission' instead of 'permissions'
* Also added a more user friendly error message when Attribute.string_to_permissions fails
```
TypeError: Attribute::permissions error:
Expected a string containing any of the keys, seperated by commas: READABLE,WRITEABLE,READ_REQUIRES_ENCRYPTION,WRITE_REQUIRES_ENCRYPTION,READ_REQUIRES_AUTHENTICATION,WRITE_REQUIRES_AUTHENTICATION,READ_REQUIRES_AUTHORIZATION,WRITE_REQUIRES_AUTHORIZATION
Got: 1
```
```
Exception: Error parsing Device Config's GATT Services. The key 'permission' must be renamed to 'permissions'
```
2023-03-24 16:11:18 -04:00
Yuyang Huang
51ae6a5969 Add connection into event emit 2023-03-24 11:16:10 -07:00
Josh Wu
4fc13585cc Handle BR/EDR connection roles 2023-03-24 15:13:48 +08:00
Tony Kuker
c5e5397ed8 Fix typo in console header 2023-03-23 21:44:55 +00:00
32 changed files with 1271 additions and 265 deletions

19
LICENSE
View File

@@ -200,3 +200,22 @@
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---
Files: bumble/colors.py
Copyright (c) 2012 Giorgos Verigakis <verigak@gmail.com>
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

View File

@@ -558,11 +558,13 @@ class GattServer:
# Setup the GATT service
self.speed_tx = Characteristic(
SPEED_TX_UUID,
Characteristic.WRITE,
Characteristic.Properties.WRITE,
Characteristic.WRITEABLE,
CharacteristicValue(write=self.on_tx_write),
)
self.speed_rx = Characteristic(SPEED_RX_UUID, Characteristic.NOTIFY, 0)
self.speed_rx = Characteristic(
SPEED_RX_UUID, Characteristic.Properties.NOTIFY, 0
)
speed_service = Service(
SPEED_SERVICE_UUID,

View File

@@ -24,9 +24,12 @@ import logging
import os
import random
import re
import humanize
from typing import Optional, Union
from collections import OrderedDict
import click
from prettytable import PrettyTable
from prompt_toolkit import Application
from prompt_toolkit.history import FileHistory
@@ -58,6 +61,7 @@ from bumble.device import ConnectionParametersPreferences, Device, Connection, P
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor
from bumble.gatt_client import CharacteristicProxy
from bumble.hci import (
HCI_Constant,
HCI_LE_1M_PHY,
@@ -119,9 +123,12 @@ def parse_phys(phys):
# Console App
# -----------------------------------------------------------------------------
class ConsoleApp:
connected_peer: Optional[Peer]
def __init__(self):
self.known_addresses = set()
self.known_attributes = []
self.known_remote_attributes = []
self.known_local_attributes = []
self.device = None
self.connected_peer = None
self.top_tab = 'device'
@@ -158,6 +165,8 @@ class ConsoleApp:
'device': None,
'local-services': None,
'remote-services': None,
'local-values': None,
'remote-values': None,
},
'filter': {
'address': None,
@@ -168,10 +177,11 @@ class ConsoleApp:
'disconnect': None,
'discover': {'services': None, 'attributes': None},
'request-mtu': None,
'read': LiveCompleter(self.known_attributes),
'write': LiveCompleter(self.known_attributes),
'subscribe': LiveCompleter(self.known_attributes),
'unsubscribe': LiveCompleter(self.known_attributes),
'read': LiveCompleter(self.known_remote_attributes),
'write': LiveCompleter(self.known_remote_attributes),
'local-write': LiveCompleter(self.known_local_attributes),
'subscribe': LiveCompleter(self.known_remote_attributes),
'unsubscribe': LiveCompleter(self.known_remote_attributes),
'set-phy': {'1m': None, '2m': None, 'coded': None},
'set-default-phy': None,
'quit': None,
@@ -203,6 +213,8 @@ class ConsoleApp:
self.log_text = FormattedTextControl(
get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1))
)
self.local_values_text = FormattedTextControl()
self.remote_values_text = FormattedTextControl()
self.log_height = Dimension(min=7, weight=4)
self.log_max_lines = 100
self.log_lines = []
@@ -218,9 +230,17 @@ class ConsoleApp:
filter=Condition(lambda: self.top_tab == 'local-services'),
),
ConditionalContainer(
Frame(Window(self.remote_services_text), title='Remove Services'),
Frame(Window(self.local_values_text), title='Local Values'),
filter=Condition(lambda: self.top_tab == 'local-values'),
),
ConditionalContainer(
Frame(Window(self.remote_services_text), title='Remote Services'),
filter=Condition(lambda: self.top_tab == 'remote-services'),
),
ConditionalContainer(
Frame(Window(self.remote_values_text), title='Remote Values'),
filter=Condition(lambda: self.top_tab == 'remote-values'),
),
ConditionalContainer(
Frame(Window(self.log_text, height=self.log_height), title='Log'),
filter=Condition(lambda: self.top_tab == 'log'),
@@ -362,17 +382,19 @@ class ConsoleApp:
def show_remote_services(self, services):
lines = []
del self.known_attributes[:]
del self.known_remote_attributes[:]
for service in services:
lines.append(("ansicyan", f"{service}\n"))
for characteristic in service.characteristics:
lines.append(('ansimagenta', f' {characteristic} + \n'))
self.known_attributes.append(
self.known_remote_attributes.append(
f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}'
)
self.known_attributes.append(f'*.{characteristic.uuid.to_hex_str()}')
self.known_attributes.append(f'#{characteristic.handle:X}')
self.known_remote_attributes.append(
f'*.{characteristic.uuid.to_hex_str()}'
)
self.known_remote_attributes.append(f'#{characteristic.handle:X}')
for descriptor in characteristic.descriptors:
lines.append(("ansigreen", f" {descriptor}\n"))
@@ -381,12 +403,31 @@ class ConsoleApp:
def show_local_services(self, attributes):
lines = []
del self.known_local_attributes[:]
for attribute in attributes:
if isinstance(attribute, Service):
# Save the most recent service for use later
service = attribute
lines.append(("ansicyan", f"{attribute}\n"))
elif isinstance(attribute, (Characteristic, CharacteristicDeclaration)):
elif isinstance(attribute, Characteristic):
# CharacteristicDeclaration includes all info from Characteristic
# no need to print it twice
continue
elif isinstance(attribute, CharacteristicDeclaration):
# Save the most recent characteristic declaration for use later
characteristic_declaration = attribute
self.known_local_attributes.append(
f'{service.uuid.to_hex_str()}.{attribute.characteristic.uuid.to_hex_str()}'
)
self.known_local_attributes.append(
f'#{attribute.characteristic.handle:X}'
)
lines.append(("ansimagenta", f" {attribute}\n"))
elif isinstance(attribute, Descriptor):
self.known_local_attributes.append(
f'{service.uuid.to_hex_str()}.{characteristic_declaration.characteristic.uuid.to_hex_str()}.{attribute.type.to_hex_str()}'
)
self.known_local_attributes.append(f'#{attribute.handle:X}')
lines.append(("ansigreen", f" {attribute}\n"))
else:
lines.append(("ansiyellow", f"{attribute}\n"))
@@ -490,7 +531,9 @@ class ConsoleApp:
self.show_attributes(attributes)
def find_characteristic(self, param):
def find_remote_characteristic(self, param) -> Optional[CharacteristicProxy]:
if not self.connected_peer:
return None
parts = param.split('.')
if len(parts) == 2:
service_uuid = UUID(parts[0]) if parts[0] != '*' else None
@@ -510,6 +553,38 @@ class ConsoleApp:
return None
def find_local_attribute(
self, param
) -> Optional[Union[Characteristic, Descriptor]]:
parts = param.split('.')
if len(parts) == 3:
service_uuid = UUID(parts[0])
characteristic_uuid = UUID(parts[1])
descriptor_uuid = UUID(parts[2])
return self.device.gatt_server.get_descriptor_attribute(
service_uuid, characteristic_uuid, descriptor_uuid
)
if len(parts) == 2:
service_uuid = UUID(parts[0])
characteristic_uuid = UUID(parts[1])
characteristic_attributes = (
self.device.gatt_server.get_characteristic_attributes(
service_uuid, characteristic_uuid
)
)
if characteristic_attributes:
return characteristic_attributes[1]
return None
elif len(parts) == 1:
if parts[0].startswith('#'):
attribute_handle = int(f'{parts[0][1:]}', 16)
attribute = self.device.gatt_server.get_attribute(attribute_handle)
if isinstance(attribute, (Characteristic, Descriptor)):
return attribute
return None
return None
async def rssi_monitor_loop(self):
while True:
if self.monitor_rssi and self.connected_peer:
@@ -668,10 +743,109 @@ class ConsoleApp:
'device',
'local-services',
'remote-services',
'local-values',
'remote-values',
}:
self.top_tab = params[0]
self.ui.invalidate()
while self.top_tab == 'local-values':
await self.do_show_local_values()
await asyncio.sleep(1)
while self.top_tab == 'remote-values':
await self.do_show_remote_values()
await asyncio.sleep(1)
async def do_show_local_values(self):
prettytable = PrettyTable()
field_names = ["Service", "Characteristic", "Descriptor"]
# if there's no connections, add a column just for value
if not self.device.connections:
field_names.append("Value")
# if there are connections, add a column for each connection's value
for connection in self.device.connections.values():
field_names.append(f"Connection {connection.handle}")
for attribute in self.device.gatt_server.attributes:
if isinstance(attribute, Characteristic):
service = self.device.gatt_server.get_attribute_group(
attribute.handle, Service
)
if not service:
continue
values = [
attribute.read_value(connection)
for connection in self.device.connections.values()
]
if not values:
values = [attribute.read_value(None)]
prettytable.add_row([f"{service.uuid}", attribute.uuid, ""] + values)
elif isinstance(attribute, Descriptor):
service = self.device.gatt_server.get_attribute_group(
attribute.handle, Service
)
if not service:
continue
characteristic = self.device.gatt_server.get_attribute_group(
attribute.handle, Characteristic
)
if not characteristic:
continue
values = [
attribute.read_value(connection)
for connection in self.device.connections.values()
]
if not values:
values = [attribute.read_value(None)]
# TODO: future optimization: convert CCCD value to human readable string
prettytable.add_row(
[service.uuid, characteristic.uuid, attribute.type] + values
)
prettytable.field_names = field_names
self.local_values_text.text = prettytable.get_string()
self.ui.invalidate()
async def do_show_remote_values(self):
prettytable = PrettyTable(
field_names=[
"Connection",
"Service",
"Characteristic",
"Descriptor",
"Time",
"Value",
]
)
for connection in self.device.connections.values():
for handle, (time, value) in connection.gatt_client.cached_values.items():
row = [connection.handle]
attribute = connection.gatt_client.get_attributes(handle)
if not attribute:
continue
if len(attribute) == 3:
row.extend(
[attribute[0].uuid, attribute[1].uuid, attribute[2].type]
)
elif len(attribute) == 2:
row.extend([attribute[0].uuid, attribute[1].uuid, ""])
elif len(attribute) == 1:
row.extend([attribute[0].uuid, "", ""])
else:
continue
row.extend([humanize.naturaltime(time), value])
prettytable.add_row(row)
self.remote_values_text.text = prettytable.get_string()
self.ui.invalidate()
async def do_get_phy(self, _):
if not self.connected_peer:
self.show_error('not connected')
@@ -714,7 +888,7 @@ class ConsoleApp:
self.show_error('not connected')
return
characteristic = self.find_characteristic(params[0])
characteristic = self.find_remote_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
return
@@ -739,15 +913,43 @@ class ConsoleApp:
except ValueError:
value = str.encode(params[1]) # must be a string
characteristic = self.find_characteristic(params[0])
characteristic = self.find_remote_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
return
# use write with response if supported
with_response = characteristic.properties & Characteristic.WRITE
with_response = characteristic.properties & Characteristic.Properties.WRITE
await characteristic.write_value(value, with_response=with_response)
async def do_local_write(self, params):
if len(params) != 2:
self.show_error(
'invalid syntax', 'expected local-write <attribute> <value>'
)
return
if params[1].upper().startswith("0X"):
value = bytes.fromhex(params[1][2:]) # parse as hex string
else:
try:
value = int(params[1]).to_bytes(2, "little") # try as 2 byte integer
except ValueError:
value = str.encode(params[1]) # must be a string
attribute = self.find_local_attribute(params[0])
if not attribute:
self.show_error('invalid syntax', 'unable to find attribute')
return
# send data to any subscribers
if isinstance(attribute, Characteristic):
attribute.write_value(None, value)
if attribute.has_properties(Characteristic.NOTIFY):
await self.device.gatt_server.notify_subscribers(attribute)
if attribute.has_properties(Characteristic.INDICATE):
await self.device.gatt_server.indicate_subscribers(attribute)
async def do_subscribe(self, params):
if not self.connected_peer:
self.show_error('not connected')
@@ -757,7 +959,7 @@ class ConsoleApp:
self.show_error('invalid syntax', 'expected subscribe <attribute>')
return
characteristic = self.find_characteristic(params[0])
characteristic = self.find_remote_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
return
@@ -777,7 +979,7 @@ class ConsoleApp:
self.show_error('invalid syntax', 'expected subscribe <attribute>')
return
characteristic = self.find_characteristic(params[0])
characteristic = self.find_remote_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
return

View File

@@ -230,13 +230,13 @@ class GattlinkNodeBridge(GattlinkL2capEndpoint, Device.Listener):
)
self.tx_characteristic = Characteristic(
GG_GATTLINK_TX_CHARACTERISTIC_UUID,
Characteristic.NOTIFY,
Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
)
self.tx_characteristic.on('subscription', self.on_tx_subscription)
self.psm_characteristic = Characteristic(
GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([psm, 0]),
)
@@ -339,8 +339,7 @@ async def run(
# Create a UDP to TX bridge (receive from TX, send to UDP)
bridge.tx_socket, _ = await loop.create_datagram_endpoint(
# pylint: disable-next=unnecessary-lambda
lambda: asyncio.DatagramProtocol(),
asyncio.DatagramProtocol,
remote_addr=(send_host, send_port),
)

View File

@@ -302,7 +302,8 @@ async def pair(
[
Characteristic(
'552957FB-CF1F-4A31-9535-E78847E1A714',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(
read=read_with_error, write=write_with_error

View File

@@ -28,8 +28,8 @@ import struct
from pyee import EventEmitter
from typing import Dict, Type, TYPE_CHECKING
from bumble.core import UUID, name_or_number, get_dict_key_by_value
from bumble.hci import HCI_Object, key_with_value
from bumble.core import UUID, name_or_number, get_dict_key_by_value, ProtocolError
from bumble.hci import HCI_Object, key_with_value, HCI_Constant
from bumble.colors import color
if TYPE_CHECKING:
@@ -185,13 +185,18 @@ UUID_2_FIELD_SPEC = lambda x, y: UUID.parse_uuid_2(x, y) # noqa: E731
# -----------------------------------------------------------------------------
# Exceptions
# -----------------------------------------------------------------------------
class ATT_Error(Exception):
def __init__(self, error_code, att_handle=0x0000):
self.error_code = error_code
class ATT_Error(ProtocolError):
def __init__(self, error_code, att_handle=0x0000, message=''):
super().__init__(
error_code,
error_namespace='att',
error_name=ATT_PDU.error_name(error_code),
)
self.att_handle = att_handle
self.message = message
def __str__(self):
return f'ATT_Error({ATT_PDU.error_name(self.error_code)})'
return f'ATT_Error(error={self.error_name}, handle={self.att_handle:04X}): {self.message}'
# -----------------------------------------------------------------------------
@@ -739,11 +744,16 @@ class Attribute(EventEmitter):
@staticmethod
def string_to_permissions(permissions_str: str):
return functools.reduce(
lambda x, y: x | get_dict_key_by_value(Attribute.PERMISSION_NAMES, y),
permissions_str.split(","),
0,
)
try:
return functools.reduce(
lambda x, y: x | get_dict_key_by_value(Attribute.PERMISSION_NAMES, y),
permissions_str.split(","),
0,
)
except TypeError as exc:
raise TypeError(
f"Attribute::permissions error:\nExpected a string containing any of the keys, separated by commas: {','.join(Attribute.PERMISSION_NAMES.values())}\nGot: {permissions_str}"
) from exc
def __init__(self, attribute_type, permissions, value=b''):
EventEmitter.__init__(self)

View File

@@ -21,7 +21,12 @@ import itertools
import random
import struct
from bumble.colors import color
from bumble.core import BT_CENTRAL_ROLE, BT_PERIPHERAL_ROLE
from bumble.core import (
BT_CENTRAL_ROLE,
BT_PERIPHERAL_ROLE,
BT_LE_TRANSPORT,
BT_BR_EDR_TRANSPORT,
)
from bumble.hci import (
HCI_ACL_DATA_PACKET,
@@ -29,17 +34,21 @@ from bumble.hci import (
HCI_COMMAND_PACKET,
HCI_COMMAND_STATUS_PENDING,
HCI_CONNECTION_TIMEOUT_ERROR,
HCI_CONTROLLER_BUSY_ERROR,
HCI_EVENT_PACKET,
HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR,
HCI_LE_1M_PHY,
HCI_SUCCESS,
HCI_UNKNOWN_HCI_COMMAND_ERROR,
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
HCI_VERSION_BLUETOOTH_CORE_5_0,
Address,
HCI_AclDataPacket,
HCI_AclDataPacketAssembler,
HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_Connection_Complete_Event,
HCI_Connection_Request_Event,
HCI_Disconnection_Complete_Event,
HCI_Encryption_Change_Event,
HCI_LE_Advertising_Report_Event,
@@ -47,7 +56,9 @@ from bumble.hci import (
HCI_LE_Read_Remote_Features_Complete_Event,
HCI_Number_Of_Completed_Packets_Event,
HCI_Packet,
HCI_Role_Change_Event,
)
from typing import Optional, Union, Dict
# -----------------------------------------------------------------------------
@@ -65,13 +76,14 @@ class DataObject:
# -----------------------------------------------------------------------------
class Connection:
def __init__(self, controller, handle, role, peer_address, link):
def __init__(self, controller, handle, role, peer_address, link, transport):
self.controller = controller
self.handle = handle
self.role = role
self.peer_address = peer_address
self.link = link
self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.transport = transport
def on_hci_acl_data_packet(self, packet):
self.assembler.feed_packet(packet)
@@ -82,23 +94,33 @@ class Connection:
def on_acl_pdu(self, data):
if self.link:
self.link.send_acl_data(
self.controller.random_address, self.peer_address, data
self.controller, self.peer_address, self.transport, data
)
# -----------------------------------------------------------------------------
class Controller:
def __init__(self, name, host_source=None, host_sink=None, link=None):
def __init__(
self,
name,
host_source=None,
host_sink=None,
link=None,
public_address: Optional[Union[bytes, str, Address]] = None,
):
self.name = name
self.hci_sink = None
self.link = link
self.central_connections = (
{}
) # Connections where this controller is the central
self.peripheral_connections = (
{}
) # Connections where this controller is the peripheral
self.central_connections: Dict[
Address, Connection
] = {} # Connections where this controller is the central
self.peripheral_connections: Dict[
Address, Connection
] = {} # Connections where this controller is the peripheral
self.classic_connections: Dict[
Address, Connection
] = {} # Connections in BR/EDR
self.hci_version = HCI_VERSION_BLUETOOTH_CORE_5_0
self.hci_revision = 0
@@ -148,7 +170,14 @@ class Controller:
self.advertising_timer_handle = None
self._random_address = Address('00:00:00:00:00:00')
self._public_address = None
if isinstance(public_address, Address):
self._public_address = public_address
elif public_address is not None:
self._public_address = Address(
public_address, Address.PUBLIC_DEVICE_ADDRESS
)
else:
self._public_address = Address('00:00:00:00:00:00')
# Set the source and sink interfaces
if host_source:
@@ -271,7 +300,9 @@ class Controller:
handle = 0
max_handle = 0
for connection in itertools.chain(
self.central_connections.values(), self.peripheral_connections.values()
self.central_connections.values(),
self.peripheral_connections.values(),
self.classic_connections.values(),
):
max_handle = max(max_handle, connection.handle)
if connection.handle == handle:
@@ -279,14 +310,19 @@ class Controller:
handle = max_handle + 1
return handle
def find_connection_by_address(self, address):
def find_le_connection_by_address(self, address):
return self.central_connections.get(address) or self.peripheral_connections.get(
address
)
def find_classic_connection_by_address(self, address):
return self.classic_connections.get(address)
def find_connection_by_handle(self, handle):
for connection in itertools.chain(
self.central_connections.values(), self.peripheral_connections.values()
self.central_connections.values(),
self.peripheral_connections.values(),
self.classic_connections.values(),
):
if connection.handle == handle:
return connection
@@ -298,6 +334,12 @@ class Controller:
return connection
return None
def find_classic_connection_by_handle(self, handle):
for connection in self.classic_connections.values():
if connection.handle == handle:
return connection
return None
def on_link_central_connected(self, central_address):
'''
Called when an incoming connection occurs from a central on the link
@@ -310,7 +352,12 @@ class Controller:
if connection is None:
connection_handle = self.allocate_connection_handle()
connection = Connection(
self, connection_handle, BT_PERIPHERAL_ROLE, peer_address, self.link
self,
connection_handle,
BT_PERIPHERAL_ROLE,
peer_address,
self.link,
BT_LE_TRANSPORT,
)
self.peripheral_connections[peer_address] = connection
logger.debug(f'New PERIPHERAL connection handle: 0x{connection_handle:04X}')
@@ -364,7 +411,12 @@ class Controller:
if connection is None:
connection_handle = self.allocate_connection_handle()
connection = Connection(
self, connection_handle, BT_CENTRAL_ROLE, peer_address, self.link
self,
connection_handle,
BT_CENTRAL_ROLE,
peer_address,
self.link,
BT_LE_TRANSPORT,
)
self.central_connections[peer_address] = connection
logger.debug(
@@ -432,16 +484,19 @@ class Controller:
def on_link_encrypted(self, peer_address, _rand, _ediv, _ltk):
# For now, just setup the encryption without asking the host
if connection := self.find_connection_by_address(peer_address):
if connection := self.find_le_connection_by_address(peer_address):
self.send_hci_packet(
HCI_Encryption_Change_Event(
status=0, connection_handle=connection.handle, encryption_enabled=1
)
)
def on_link_acl_data(self, sender_address, data):
def on_link_acl_data(self, sender_address, transport, data):
# Look for the connection to which this data belongs
connection = self.find_connection_by_address(sender_address)
if transport == BT_LE_TRANSPORT:
connection = self.find_le_connection_by_address(sender_address)
else:
connection = self.find_classic_connection_by_address(sender_address)
if connection is None:
logger.warning(f'!!! no connection for {sender_address}')
return
@@ -478,6 +533,87 @@ class Controller:
)
self.send_hci_packet(HCI_LE_Advertising_Report_Event([report]))
############################################################
# Classic link connections
############################################################
def on_classic_connection_request(self, peer_address, link_type):
self.send_hci_packet(
HCI_Connection_Request_Event(
bd_addr=peer_address,
class_of_device=0,
link_type=link_type,
)
)
def on_classic_connection_complete(self, peer_address, status):
if status == HCI_SUCCESS:
# Allocate (or reuse) a connection handle
peer_address = peer_address
connection = self.classic_connections.get(peer_address)
if connection is None:
connection_handle = self.allocate_connection_handle()
connection = Connection(
controller=self,
handle=connection_handle,
# Role doesn't matter in Classic because they are managed by HCI_Role_Change and HCI_Role_Discovery
role=BT_CENTRAL_ROLE,
peer_address=peer_address,
link=self.link,
transport=BT_BR_EDR_TRANSPORT,
)
self.classic_connections[peer_address] = connection
logger.debug(
f'New CLASSIC connection handle: 0x{connection_handle:04X}'
)
else:
connection_handle = connection.handle
self.send_hci_packet(
HCI_Connection_Complete_Event(
status=status,
connection_handle=connection_handle,
bd_addr=peer_address,
encryption_enabled=False,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
)
)
else:
connection = None
self.send_hci_packet(
HCI_Connection_Complete_Event(
status=status,
connection_handle=0,
bd_addr=peer_address,
encryption_enabled=False,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
)
)
def on_classic_disconnected(self, peer_address, reason):
# Send a disconnection complete event
if connection := self.classic_connections.get(peer_address):
self.send_hci_packet(
HCI_Disconnection_Complete_Event(
status=HCI_SUCCESS,
connection_handle=connection.handle,
reason=reason,
)
)
# Remove the connection
del self.classic_connections[peer_address]
else:
logger.warning(f'!!! No classic connection found for {peer_address}')
def on_classic_role_change(self, peer_address, new_role):
self.send_hci_packet(
HCI_Role_Change_Event(
status=HCI_SUCCESS,
bd_addr=peer_address,
new_role=new_role,
)
)
############################################################
# Advertising support
############################################################
@@ -521,7 +657,31 @@ class Controller:
See Bluetooth spec Vol 2, Part E - 7.1.5 Create Connection command
'''
# TODO: classic mode not supported yet
if self.link is None:
return
logger.debug(f'Connection request to {command.bd_addr}')
# Check that we don't already have a pending connection
if self.link.get_pending_connection():
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_CONTROLLER_BUSY_ERROR,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
return
self.link.classic_connect(self, command.bd_addr)
# Say that the connection is pending
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_COMMAND_STATUS_PENDING,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
def on_hci_disconnect_command(self, command):
'''
@@ -537,19 +697,57 @@ class Controller:
)
# Notify the link of the disconnection
if not (
connection := self.find_central_connection_by_handle(
command.connection_handle
)
):
logger.warning('connection not found')
return
handle = command.connection_handle
if connection := self.find_central_connection_by_handle(handle):
if self.link:
self.link.disconnect(
self.random_address, connection.peer_address, command
)
else:
# Remove the connection
del self.central_connections[connection.peer_address]
elif connection := self.find_classic_connection_by_handle(handle):
if self.link:
self.link.classic_disconnect(
self,
connection.peer_address,
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
)
else:
# Remove the connection
del self.classic_connections[connection.peer_address]
if self.link:
self.link.disconnect(self.random_address, connection.peer_address, command)
else:
# Remove the connection
del self.central_connections[connection.peer_address]
def on_hci_accept_connection_request_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.1.8 Accept Connection Request command
'''
if self.link is None:
return
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_SUCCESS,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
self.link.classic_accept_connection(self, command.bd_addr, command.role)
def on_hci_switch_role_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.2.8 Switch Role command
'''
if self.link is None:
return
self.send_hci_packet(
HCI_Command_Status_Event(
status=HCI_SUCCESS,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
self.link.classic_switch_role(self, command.bd_addr, command.role)
def on_hci_set_event_mask_command(self, command):
'''
@@ -627,6 +825,12 @@ class Controller:
ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR
return bytes([ret])
def on_hci_write_extended_inquiry_response_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.59 Write Simple Pairing Mode Command
'''
return bytes([HCI_SUCCESS])
def on_hci_write_simple_pairing_mode_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.3.59 Write Simple Pairing Mode Command

View File

@@ -101,6 +101,7 @@ from .hci import (
HCI_Read_RSSI_Command,
HCI_Reject_Connection_Request_Command,
HCI_Remote_Name_Request_Command,
HCI_Switch_Role_Command,
HCI_Set_Connection_Encryption_Command,
HCI_StatusError,
HCI_User_Confirmation_Request_Negative_Reply_Command,
@@ -528,6 +529,7 @@ class Connection(CompositeEventEmitter):
authenticated: bool
sc: bool
link_key_type: int
gatt_client: gatt_client.Client
@composite_listener
class Listener:
@@ -621,7 +623,9 @@ class Connection(CompositeEventEmitter):
assert self.transport == BT_BR_EDR_TRANSPORT
self.handle = handle
self.peer_resolvable_address = peer_resolvable_address
self.role = role
# Quirk: role might be known before complete
if self.role is None:
self.role = role
self.parameters = parameters
@property
@@ -669,6 +673,9 @@ class Connection(CompositeEventEmitter):
async def encrypt(self, enable: bool = True) -> None:
return await self.device.encrypt(self, enable)
async def switch_role(self, role: int) -> None:
return await self.device.switch_role(self, role)
async def sustain(self, timeout=None):
"""Idles the current task waiting for a disconnect or timeout"""
@@ -872,7 +879,7 @@ device_host_event_handlers: list[str] = []
# -----------------------------------------------------------------------------
class Device(CompositeEventEmitter):
# incomplete list of fields.
# Incomplete list of fields.
random_address: Address
public_address: Address
classic_enabled: bool
@@ -887,6 +894,7 @@ class Device(CompositeEventEmitter):
Address, List[asyncio.Future[Union[Connection, Tuple[Address, int, int]]]]
]
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
config: DeviceConfiguration
@composite_listener
class Listener:
@@ -974,9 +982,10 @@ class Device(CompositeEventEmitter):
self.connect_own_address_type = None
# Use the initial config or a default
config = config or DeviceConfiguration()
self.config = config
self.public_address = Address('00:00:00:00:00:00')
if config is None:
config = DeviceConfiguration()
self.name = config.name
self.random_address = config.address
self.class_of_device = config.class_of_device
@@ -984,7 +993,7 @@ class Device(CompositeEventEmitter):
self.advertising_data = config.advertising_data
self.advertising_interval_min = config.advertising_interval_min
self.advertising_interval_max = config.advertising_interval_max
self.keystore = KeyStore.create_for_device(config)
self.keystore = None
self.irk = config.irk
self.le_enabled = config.le_enabled
self.classic_enabled = config.classic_enabled
@@ -1000,14 +1009,21 @@ class Device(CompositeEventEmitter):
for characteristic in service.get("characteristics", []):
descriptors = []
for descriptor in characteristic.get("descriptors", []):
# Leave this check until 5/25/2023
if descriptor.get("permission", False):
raise Exception(
"Error parsing Device Config's GATT Services. The key 'permission' must be renamed to 'permissions'"
)
new_descriptor = Descriptor(
attribute_type=descriptor["descriptor_type"],
permissions=descriptor["permission"],
permissions=descriptor["permissions"],
)
descriptors.append(new_descriptor)
new_characteristic = Characteristic(
uuid=characteristic["uuid"],
properties=characteristic["properties"],
properties=Characteristic.Properties.from_string(
characteristic["properties"]
),
permissions=characteristic["permissions"],
descriptors=descriptors,
)
@@ -1154,6 +1170,7 @@ class Device(CompositeEventEmitter):
# Reset the controller
await self.host.reset()
# Try to get the public address from the controller
response = await self.send_command(HCI_Read_BD_ADDR_Command()) # type: ignore[call-arg]
if response.return_parameters.status == HCI_SUCCESS:
logger.debug(
@@ -1161,6 +1178,11 @@ class Device(CompositeEventEmitter):
)
self.public_address = response.return_parameters.bd_addr
# Instantiate the Key Store (we do this here rather than at __init__ time
# because some Key Store implementations use the public address as a namespace)
if self.keystore is None:
self.keystore = KeyStore.create_for_device(self)
if self.host.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND):
await self.send_command(
HCI_Write_LE_Host_Support_Command(
@@ -1589,7 +1611,7 @@ class Device(CompositeEventEmitter):
pending connection.
connection_parameters_preferences: (BLE only, ignored for BR/EDR)
* None: use all PHYs with default parameters
* None: use the 1M PHY with default parameters
* map: each entry has a PHY as key and a ConnectionParametersPreferences
object as value
@@ -1658,9 +1680,7 @@ class Device(CompositeEventEmitter):
if connection_parameters_preferences is None:
if connection_parameters_preferences is None:
connection_parameters_preferences = {
HCI_LE_1M_PHY: ConnectionParametersPreferences.default,
HCI_LE_2M_PHY: ConnectionParametersPreferences.default,
HCI_LE_CODED_PHY: ConnectionParametersPreferences.default,
HCI_LE_1M_PHY: ConnectionParametersPreferences.default
}
self.connect_own_address_type = own_address_type
@@ -2313,6 +2333,34 @@ class Device(CompositeEventEmitter):
'connection_encryption_failure', on_encryption_failure
)
# [Classic only]
async def switch_role(self, connection: Connection, role: int):
pending_role_change = asyncio.get_running_loop().create_future()
def on_role_change(new_role):
pending_role_change.set_result(new_role)
def on_role_change_failure(error_code):
pending_role_change.set_exception(HCI_Error(error_code))
connection.on('role_change', on_role_change)
connection.on('role_change_failure', on_role_change_failure)
try:
result = await self.send_command(
HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role) # type: ignore[call-arg]
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_Switch_Role_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
await connection.abort_on('disconnection', pending_role_change)
finally:
connection.remove_listener('role_change', on_role_change)
connection.remove_listener('role_change_failure', on_role_change_failure)
# [Classic only]
async def request_remote_name(self, remote: Union[Address, Connection]) -> str:
# Set up event handlers
@@ -2804,19 +2852,22 @@ class Device(CompositeEventEmitter):
async def get_pin_code():
pin_code = await connection.abort_on(
'disconnection', pairing_config.delegate.get_number()
'disconnection', pairing_config.delegate.get_string(16)
)
if pin_code is not None:
pin_code = bytes(str(pin_code).zfill(6))
pin_code = bytes(pin_code, encoding='utf-8')
pin_code_len = len(pin_code)
assert 0 < pin_code_len <= 16, "pin_code should be 1-16 bytes"
await self.host.send_command(
HCI_PIN_Code_Request_Reply_Command(
bd_addr=connection.peer_address,
pin_code_length=len(pin_code),
pin_code_length=pin_code_len,
pin_code=pin_code,
)
)
else:
logger.debug("delegate.get_string() returned None")
await self.host.send_command(
HCI_PIN_Code_Request_Negative_Reply_Command(
bd_addr=connection.peer_address
@@ -2974,6 +3025,21 @@ class Device(CompositeEventEmitter):
)
connection.emit('connection_data_length_change')
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_role_change(self, connection, new_role):
connection.role = new_role
connection.emit('role_change', new_role)
# [Classic only]
@host_event_handler
@try_with_connection_from_address
def on_role_change_failure(self, connection, address, error):
if connection:
connection.emit('role_change_failure', error)
self.emit('role_change_failure', address, error)
@with_connection_from_handle
def on_pairing_start(self, connection):
connection.emit('pairing_start')

View File

@@ -41,14 +41,14 @@ class GenericAccessService(Service):
def __init__(self, device_name, appearance=(0, 0)):
device_name_characteristic = Characteristic(
GATT_DEVICE_NAME_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
device_name.encode('utf-8')[:248],
)
appearance_characteristic = Characteristic(
GATT_APPEARANCE_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
struct.pack('<H', (appearance[0] << 6) | appearance[1]),
)

View File

@@ -28,7 +28,7 @@ import enum
import functools
import logging
import struct
from typing import Optional, Sequence
from typing import Optional, Sequence, List
from .colors import color
from .core import UUID, get_dict_key_by_value
@@ -259,63 +259,68 @@ class Characteristic(Attribute):
See Vol 3, Part G - 3.3 CHARACTERISTIC DEFINITION
'''
# Property flags
BROADCAST = 0x01
READ = 0x02
WRITE_WITHOUT_RESPONSE = 0x04
WRITE = 0x08
NOTIFY = 0x10
INDICATE = 0x20
AUTHENTICATED_SIGNED_WRITES = 0x40
EXTENDED_PROPERTIES = 0x80
uuid: UUID
properties: Characteristic.Properties
PROPERTY_NAMES = {
BROADCAST: 'BROADCAST',
READ: 'READ',
WRITE_WITHOUT_RESPONSE: 'WRITE_WITHOUT_RESPONSE',
WRITE: 'WRITE',
NOTIFY: 'NOTIFY',
INDICATE: 'INDICATE',
AUTHENTICATED_SIGNED_WRITES: 'AUTHENTICATED_SIGNED_WRITES',
EXTENDED_PROPERTIES: 'EXTENDED_PROPERTIES',
}
class Properties(enum.IntFlag):
"""Property flags"""
@staticmethod
def property_name(property_int):
return Characteristic.PROPERTY_NAMES.get(property_int, '')
BROADCAST = 0x01
READ = 0x02
WRITE_WITHOUT_RESPONSE = 0x04
WRITE = 0x08
NOTIFY = 0x10
INDICATE = 0x20
AUTHENTICATED_SIGNED_WRITES = 0x40
EXTENDED_PROPERTIES = 0x80
@staticmethod
def properties_as_string(properties):
return ','.join(
[
Characteristic.property_name(p)
for p in Characteristic.PROPERTY_NAMES
if properties & p
]
)
@staticmethod
def from_string(properties_str: str) -> Characteristic.Properties:
property_names: List[str] = []
for property in Characteristic.Properties:
if property.name is None:
raise TypeError()
property_names.append(property.name)
@staticmethod
def string_to_properties(properties_str: str):
return functools.reduce(
lambda x, y: x | get_dict_key_by_value(Characteristic.PROPERTY_NAMES, y),
properties_str.split(","),
0,
)
def string_to_property(property_string) -> Characteristic.Properties:
for property in zip(Characteristic.Properties, property_names):
if property_string == property[1]:
return property[0]
raise TypeError(f"Unable to convert {property_string} to Property")
try:
return functools.reduce(
lambda x, y: x | string_to_property(y),
properties_str.split(","),
Characteristic.Properties(0),
)
except TypeError:
raise TypeError(
f"Characteristic.Properties::from_string() error:\nExpected a string containing any of the keys, separated by commas: {','.join(property_names)}\nGot: {properties_str}"
)
# For backwards compatibility these are defined here
# For new code, please use Characteristic.Properties.X
BROADCAST = Properties.BROADCAST
READ = Properties.READ
WRITE_WITHOUT_RESPONSE = Properties.WRITE_WITHOUT_RESPONSE
WRITE = Properties.WRITE
NOTIFY = Properties.NOTIFY
INDICATE = Properties.INDICATE
AUTHENTICATED_SIGNED_WRITES = Properties.AUTHENTICATED_SIGNED_WRITES
EXTENDED_PROPERTIES = Properties.EXTENDED_PROPERTIES
def __init__(
self,
uuid,
properties,
properties: Characteristic.Properties,
permissions,
value=b'',
descriptors: Sequence[Descriptor] = (),
):
super().__init__(uuid, permissions, value)
self.uuid = self.type
if isinstance(properties, str):
self.properties = Characteristic.string_to_properties(properties)
else:
self.properties = properties
self.properties = properties
self.descriptors = descriptors
def get_descriptor(self, descriptor_type):
@@ -325,12 +330,15 @@ class Characteristic(Attribute):
return None
def has_properties(self, properties: Characteristic.Properties) -> bool:
return self.properties & properties == properties
def __str__(self):
return (
f'Characteristic(handle=0x{self.handle:04X}, '
f'end=0x{self.end_group_handle:04X}, '
f'uuid={self.uuid}, '
f'properties={Characteristic.properties_as_string(self.properties)})'
f'{self.properties!s})'
)
@@ -340,6 +348,8 @@ class CharacteristicDeclaration(Attribute):
See Vol 3, Part G - 3.3.1 CHARACTERISTIC DECLARATION
'''
characteristic: Characteristic
def __init__(self, characteristic, value_handle):
declaration_bytes = (
struct.pack('<BH', characteristic.properties, value_handle)
@@ -355,8 +365,8 @@ class CharacteristicDeclaration(Attribute):
return (
f'CharacteristicDeclaration(handle=0x{self.handle:04X}, '
f'value_handle=0x{self.value_handle:04X}, '
f'uuid={self.characteristic.uuid}, properties='
f'{Characteristic.properties_as_string(self.characteristic.properties)})'
f'uuid={self.characteristic.uuid}, '
f'{self.characteristic.properties!s})'
)

View File

@@ -23,9 +23,12 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
import struct
from datetime import datetime
from typing import List, Optional, Dict, Tuple, Callable, Union, Any
from pyee import EventEmitter
@@ -50,6 +53,7 @@ from .att import (
ATT_Read_Request,
ATT_Write_Command,
ATT_Write_Request,
ATT_Error,
)
from . import core
from .core import UUID, InvalidStateError, ProtocolError
@@ -73,6 +77,8 @@ logger = logging.getLogger(__name__)
# Proxies
# -----------------------------------------------------------------------------
class AttributeProxy(EventEmitter):
client: Client
def __init__(self, client, handle, end_group_handle, attribute_type):
EventEmitter.__init__(self)
self.client = client
@@ -101,6 +107,9 @@ class AttributeProxy(EventEmitter):
class ServiceProxy(AttributeProxy):
uuid: UUID
characteristics: List[CharacteristicProxy]
@staticmethod
def from_client(service_class, client, service_uuid):
# The service and its characteristics are considered to have already been
@@ -130,10 +139,21 @@ class ServiceProxy(AttributeProxy):
class CharacteristicProxy(AttributeProxy):
def __init__(self, client, handle, end_group_handle, uuid, properties):
properties: Characteristic.Properties
descriptors: List[DescriptorProxy]
subscribers: Dict[Any, Callable]
def __init__(
self,
client,
handle,
end_group_handle,
uuid,
properties: int,
):
super().__init__(client, handle, end_group_handle, uuid)
self.uuid = uuid
self.properties = properties
self.properties = Characteristic.Properties(properties)
self.descriptors = []
self.descriptors_discovered = False
self.subscribers = {} # Map from subscriber to proxy subscriber
@@ -148,7 +168,9 @@ class CharacteristicProxy(AttributeProxy):
async def discover_descriptors(self):
return await self.client.discover_descriptors(self)
async def subscribe(self, subscriber=None, prefer_notify=True):
async def subscribe(
self, subscriber: Optional[Callable] = None, prefer_notify=True
):
if subscriber is not None:
if subscriber in self.subscribers:
# We already have a proxy subscriber
@@ -175,7 +197,7 @@ class CharacteristicProxy(AttributeProxy):
return (
f'Characteristic(handle=0x{self.handle:04X}, '
f'uuid={self.uuid}, '
f'properties={Characteristic.properties_as_string(self.properties)})'
f'{self.properties!s})'
)
@@ -201,6 +223,9 @@ class ProfileServiceProxy:
# GATT Client
# -----------------------------------------------------------------------------
class Client:
services: List[ServiceProxy]
cached_values: Dict[int, Tuple[datetime, bytes]]
def __init__(self, connection):
self.connection = connection
self.mtu_exchange_done = False
@@ -212,6 +237,7 @@ class Client:
) # Notification subscribers, by attribute handle
self.indication_subscribers = {} # Indication subscribers, by attribute handle
self.services = []
self.cached_values = {}
def send_gatt_pdu(self, pdu):
self.connection.send_l2cap_pdu(ATT_CID, pdu)
@@ -296,6 +322,35 @@ class Client:
if c.uuid == uuid
]
def get_attribute_grouping(
self, attribute_handle: int
) -> Optional[
Union[
ServiceProxy,
Tuple[ServiceProxy, CharacteristicProxy],
Tuple[ServiceProxy, CharacteristicProxy, DescriptorProxy],
]
]:
"""
Get the attribute(s) associated with an attribute handle
"""
for service in self.services:
if service.handle == attribute_handle:
return service
if service.handle <= attribute_handle <= service.end_group_handle:
for characteristic in service.characteristics:
if characteristic.handle == attribute_handle:
return (service, characteristic)
if (
characteristic.handle
<= attribute_handle
<= characteristic.end_group_handle
):
for descriptor in characteristic.descriptors:
if descriptor.handle == attribute_handle:
return (service, characteristic, descriptor)
return None
def on_service_discovered(self, service):
'''Add a service to the service list if it wasn't already there'''
already_known = False
@@ -306,7 +361,7 @@ class Client:
if not already_known:
self.services.append(service)
async def discover_services(self, uuids=None):
async def discover_services(self, uuids=None) -> List[ServiceProxy]:
'''
See Vol 3, Part G - 4.4.1 Discover All Primary Services
'''
@@ -332,8 +387,10 @@ class Client:
'!!! unexpected error while discovering services: '
f'{HCI_Constant.error_name(response.error_code)}'
)
# TODO raise appropriate exception
return
raise ATT_Error(
error_code=response.error_code,
message='Unexpected error while discovering services',
)
break
for (
@@ -349,7 +406,7 @@ class Client:
logger.warning(
f'bogus handle values: {attribute_handle} {end_group_handle}'
)
return
return []
# Create a service proxy for this service
service = ServiceProxy(
@@ -452,7 +509,9 @@ class Client:
# TODO
return []
async def discover_characteristics(self, uuids, service):
async def discover_characteristics(
self, uuids, service: Optional[ServiceProxy]
) -> List[CharacteristicProxy]:
'''
See Vol 3, Part G - 4.6.1 Discover All Characteristics of a Service and 4.6.2
Discover Characteristics by UUID
@@ -465,12 +524,12 @@ class Client:
services = [service] if service else self.services
# Perform characteristic discovery for each service
discovered_characteristics = []
discovered_characteristics: List[CharacteristicProxy] = []
for service in services:
starting_handle = service.handle
ending_handle = service.end_group_handle
characteristics = []
characteristics: List[CharacteristicProxy] = []
while starting_handle <= ending_handle:
response = await self.send_request(
ATT_Read_By_Type_Request(
@@ -491,8 +550,10 @@ class Client:
'!!! unexpected error while discovering characteristics: '
f'{HCI_Constant.error_name(response.error_code)}'
)
# TODO raise appropriate exception
return
raise ATT_Error(
error_code=response.error_code,
message='Unexpected error while discovering characteristics',
)
break
# Stop if for some reason the list was empty
@@ -535,8 +596,11 @@ class Client:
return discovered_characteristics
async def discover_descriptors(
self, characteristic=None, start_handle=None, end_handle=None
):
self,
characteristic: Optional[CharacteristicProxy] = None,
start_handle=None,
end_handle=None,
) -> List[DescriptorProxy]:
'''
See Vol 3, Part G - 4.7.1 Discover All Characteristic Descriptors
'''
@@ -549,7 +613,7 @@ class Client:
else:
return []
descriptors = []
descriptors: List[DescriptorProxy] = []
while starting_handle <= ending_handle:
response = await self.send_request(
ATT_Find_Information_Request(
@@ -656,8 +720,8 @@ class Client:
return
if (
characteristic.properties & Characteristic.NOTIFY
and characteristic.properties & Characteristic.INDICATE
characteristic.properties & Characteristic.Properties.NOTIFY
and characteristic.properties & Characteristic.Properties.INDICATE
):
if prefer_notify:
bits = ClientCharacteristicConfigurationBits.NOTIFICATION
@@ -665,10 +729,10 @@ class Client:
else:
bits = ClientCharacteristicConfigurationBits.INDICATION
subscribers = self.indication_subscribers
elif characteristic.properties & Characteristic.NOTIFY:
elif characteristic.properties & Characteristic.Properties.NOTIFY:
bits = ClientCharacteristicConfigurationBits.NOTIFICATION
subscribers = self.notification_subscribers
elif characteristic.properties & Characteristic.INDICATE:
elif characteristic.properties & Characteristic.Properties.INDICATE:
bits = ClientCharacteristicConfigurationBits.INDICATION
subscribers = self.indication_subscribers
else:
@@ -778,6 +842,7 @@ class Client:
offset += len(part)
self.cache_value(attribute_handle, attribute_value)
# Return the value as bytes
return attribute_value
@@ -912,6 +977,8 @@ class Client:
)
if not subscribers:
logger.warning('!!! received notification with no subscriber')
self.cache_value(notification.attribute_handle, notification.attribute_value)
for subscriber in subscribers:
if callable(subscriber):
subscriber(notification.attribute_value)
@@ -923,6 +990,8 @@ class Client:
subscribers = self.indication_subscribers.get(indication.attribute_handle, [])
if not subscribers:
logger.warning('!!! received indication with no subscriber')
self.cache_value(indication.attribute_handle, indication.attribute_value)
for subscriber in subscribers:
if callable(subscriber):
subscriber(indication.attribute_value)
@@ -931,3 +1000,9 @@ class Client:
# Confirm that we received the indication
self.send_confirmation(ATT_Handle_Value_Confirmation())
def cache_value(self, attribute_handle: int, value: bytes):
self.cached_values[attribute_handle] = (
datetime.now(),
value,
)

View File

@@ -27,7 +27,7 @@ import asyncio
import logging
from collections import defaultdict
import struct
from typing import List, Tuple, Optional
from typing import List, Tuple, Optional, TypeVar, Type
from pyee import EventEmitter
from .colors import color
@@ -135,6 +135,21 @@ class Server(EventEmitter):
return attribute
return None
AttributeGroupType = TypeVar('AttributeGroupType', Service, Characteristic)
def get_attribute_group(
self, handle: int, group_type: Type[AttributeGroupType]
) -> Optional[AttributeGroupType]:
return next(
(
attribute
for attribute in self.attributes
if isinstance(attribute, group_type)
and attribute.handle <= handle <= attribute.end_group_handle
),
None,
)
def get_service_attribute(self, service_uuid: UUID) -> Optional[Service]:
return next(
(
@@ -228,7 +243,10 @@ class Server(EventEmitter):
# unless there is one already
if (
characteristic.properties
& (Characteristic.NOTIFY | Characteristic.INDICATE)
& (
Characteristic.Properties.NOTIFY
| Characteristic.Properties.INDICATE
)
and characteristic.get_descriptor(
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR
)

View File

@@ -2102,7 +2102,7 @@ class HCI_Link_Key_Request_Negative_Reply_Command(HCI_Command):
fields=[
('bd_addr', Address.parse_address),
('pin_code_length', 1),
('pin_code', '*'),
('pin_code', 16),
],
return_parameters_fields=[
('status', STATUS_SPEC),

View File

@@ -24,7 +24,10 @@ from bumble.colors import color
from bumble.l2cap import L2CAP_PDU
from bumble.snoop import Snooper
from typing import Optional
from .hci import (
Address,
HCI_ACL_DATA_PACKET,
HCI_COMMAND_COMPLETE_EVENT,
HCI_COMMAND_PACKET,
@@ -141,6 +144,24 @@ class Host(AbortableEventEmitter):
if controller_sink:
self.set_packet_sink(controller_sink)
def find_connection_by_bd_addr(
self,
bd_addr: Address,
transport: Optional[int] = None,
check_address_type: bool = False,
) -> Optional[Connection]:
for connection in self.connections.values():
if connection.peer_address.to_bytes() == bd_addr.to_bytes():
if (
check_address_type
and connection.peer_address.address_type != bd_addr.address_type
):
continue
if transport is None or connection.transport == transport:
return connection
return None
async def flush(self) -> None:
# Make sure no command is pending
await self.command_semaphore.acquire()
@@ -718,12 +739,17 @@ class Host(AbortableEventEmitter):
f'role change for {event.bd_addr}: '
f'{HCI_Constant.role_name(event.new_role)}'
)
# TODO: lookup the connection and update the role
if connection := self.find_connection_by_bd_addr(
event.bd_addr, BT_BR_EDR_TRANSPORT
):
connection.role = event.new_role
self.emit('role_change', event.bd_addr, event.new_role)
else:
logger.debug(
f'role change for {event.bd_addr} failed: '
f'{HCI_Constant.error_name(event.status)}'
)
self.emit('role_change_failure', event.bd_addr, event.status)
def on_hci_le_data_length_change_event(self, event):
self.emit(

View File

@@ -20,15 +20,19 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
import os
import json
from typing import Optional
from typing import TYPE_CHECKING, Optional
from .colors import color
from .hci import Address
if TYPE_CHECKING:
from .device import Device
# -----------------------------------------------------------------------------
# Logging
@@ -173,13 +177,13 @@ class KeyStore:
separator = '\n'
@staticmethod
def create_for_device(device_config):
if device_config.keystore is None:
def create_for_device(device: Device) -> Optional[KeyStore]:
if device.config.keystore is None:
return None
keystore_type = device_config.keystore.split(':', 1)[0]
keystore_type = device.config.keystore.split(':', 1)[0]
if keystore_type == 'JsonKeyStore':
return JsonKeyStore.from_device_config(device_config)
return JsonKeyStore.from_device(device)
return None
@@ -204,7 +208,9 @@ class JsonKeyStore(KeyStore):
self.directory_name = os.path.join(
appdirs.user_data_dir(self.APP_NAME, self.APP_AUTHOR), self.KEYS_DIR
)
json_filename = f'{self.namespace}.json'.lower().replace(':', '-')
json_filename = (
f'{self.namespace}.json'.lower().replace(':', '-').replace('/p', '-p')
)
self.filename = os.path.join(self.directory_name, json_filename)
else:
self.filename = filename
@@ -213,9 +219,19 @@ class JsonKeyStore(KeyStore):
logger.debug(f'JSON keystore: {self.filename}')
@staticmethod
def from_device_config(device_config):
params = device_config.keystore.split(':', 1)[1:]
namespace = str(device_config.address)
def from_device(device: Device) -> Optional[JsonKeyStore]:
if not device.config.keystore:
return None
params = device.config.keystore.split(':', 1)[1:]
# Use a namespace based on the device address
if device.public_address not in (Address.ANY, Address.ANY_RANDOM):
namespace = str(device.public_address)
elif device.random_address != Address.ANY_RANDOM:
namespace = str(device.random_address)
else:
namespace = JsonKeyStore.DEFAULT_NAMESPACE
if params:
filename = params[0]
else:

View File

@@ -19,12 +19,15 @@ import logging
import asyncio
from functools import partial
from bumble.core import BT_PERIPHERAL_ROLE, BT_BR_EDR_TRANSPORT, BT_LE_TRANSPORT
from bumble.colors import color
from bumble.hci import (
Address,
HCI_SUCCESS,
HCI_CONNECTION_ACCEPT_TIMEOUT_ERROR,
HCI_CONNECTION_TIMEOUT_ERROR,
HCI_PAGE_TIMEOUT_ERROR,
HCI_Connection_Complete_Event,
)
# -----------------------------------------------------------------------------
@@ -57,6 +60,11 @@ class LocalLink:
def __init__(self):
self.controllers = set()
self.pending_connection = None
self.pending_classic_connection = None
############################################################
# Common utils
############################################################
def add_controller(self, controller):
logger.debug(f'new controller: {controller}')
@@ -71,22 +79,39 @@ class LocalLink:
return controller
return None
def on_address_changed(self, controller):
pass
def find_classic_controller(self, address):
for controller in self.controllers:
if controller.public_address == address:
return controller
return None
def get_pending_connection(self):
return self.pending_connection
############################################################
# LE handlers
############################################################
def on_address_changed(self, controller):
pass
def send_advertising_data(self, sender_address, data):
# Send the advertising data to all controllers, except the sender
for controller in self.controllers:
if controller.random_address != sender_address:
controller.on_link_advertising_data(sender_address, data)
def send_acl_data(self, sender_address, destination_address, data):
def send_acl_data(self, sender_controller, destination_address, transport, data):
# Send the data to the first controller with a matching address
if controller := self.find_controller(destination_address):
controller.on_link_acl_data(sender_address, data)
if transport == BT_LE_TRANSPORT:
destination_controller = self.find_controller(destination_address)
source_address = sender_controller.random_address
elif transport == BT_BR_EDR_TRANSPORT:
destination_controller = self.find_classic_controller(destination_address)
source_address = sender_controller.public_address
if destination_controller is not None:
destination_controller.on_link_acl_data(source_address, transport, data)
def on_connection_complete(self):
# Check that we expect this call
@@ -163,6 +188,89 @@ class LocalLink:
if peripheral_controller := self.find_controller(peripheral_address):
peripheral_controller.on_link_encrypted(central_address, rand, ediv, ltk)
############################################################
# Classic handlers
############################################################
def classic_connect(self, initiator_controller, responder_address):
logger.debug(
f'[Classic] {initiator_controller.public_address} connects to {responder_address}'
)
responder_controller = self.find_classic_controller(responder_address)
if responder_controller is None:
initiator_controller.on_classic_connection_complete(
responder_address, HCI_PAGE_TIMEOUT_ERROR
)
return
self.pending_classic_connection = (initiator_controller, responder_controller)
responder_controller.on_classic_connection_request(
initiator_controller.public_address,
HCI_Connection_Complete_Event.ACL_LINK_TYPE,
)
def classic_accept_connection(
self, responder_controller, initiator_address, responder_role
):
logger.debug(
f'[Classic] {responder_controller.public_address} accepts to connect {initiator_address}'
)
initiator_controller = self.find_classic_controller(initiator_address)
if initiator_controller is None:
responder_controller.on_classic_connection_complete(
responder_controller.public_address, HCI_PAGE_TIMEOUT_ERROR
)
return
async def task():
if responder_role != BT_PERIPHERAL_ROLE:
initiator_controller.on_classic_role_change(
responder_controller.public_address, int(not (responder_role))
)
initiator_controller.on_classic_connection_complete(
responder_controller.public_address, HCI_SUCCESS
)
asyncio.create_task(task())
responder_controller.on_classic_role_change(
initiator_controller.public_address, responder_role
)
responder_controller.on_classic_connection_complete(
initiator_controller.public_address, HCI_SUCCESS
)
self.pending_classic_connection = None
def classic_disconnect(self, initiator_controller, responder_address, reason):
logger.debug(
f'[Classic] {initiator_controller.public_address} disconnects {responder_address}'
)
responder_controller = self.find_classic_controller(responder_address)
async def task():
initiator_controller.on_classic_disconnected(responder_address, reason)
asyncio.create_task(task())
responder_controller.on_classic_disconnected(
initiator_controller.public_address, reason
)
def classic_switch_role(
self, initiator_controller, responder_address, initiator_new_role
):
responder_controller = self.find_classic_controller(responder_address)
if responder_controller is None:
return
async def task():
initiator_controller.on_classic_role_change(
responder_address, initiator_new_role
)
asyncio.create_task(task())
responder_controller.on_classic_role_change(
initiator_controller.public_address, int(not (initiator_new_role))
)
# -----------------------------------------------------------------------------
class RemoteLink:
@@ -200,6 +308,9 @@ class RemoteLink:
def get_pending_connection(self):
return self.pending_connection
def get_pending_classic_connection(self):
return self.pending_classic_connection
async def wait_until_connected(self):
await self.websocket
@@ -366,7 +477,8 @@ class RemoteLink:
async def send_acl_data_to_relay(self, peer_address, data):
await self.send_targeted_message(peer_address, f'acl:{data.hex()}')
def send_acl_data(self, _, peer_address, data):
def send_acl_data(self, _, peer_address, _transport, data):
# TODO: handle different transport
self.execute(partial(self.send_acl_data_to_relay, peer_address, data))
async def send_connection_request_to_relay(self, peer_address):

View File

@@ -20,7 +20,7 @@ import struct
import logging
from typing import List
from ..core import AdvertisingData
from ..device import Device
from ..device import Device, Connection
from ..gatt import (
GATT_ASHA_SERVICE,
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
@@ -60,12 +60,12 @@ class AshaService(TemplateService):
self.psm = psm # a non-zero psm is mainly for testing purpose
# Handler for volume control
def on_volume_write(_connection, value):
def on_volume_write(connection, value):
logger.info(f'--- VOLUME Write:{value[0]}')
self.emit('volume', value[0])
self.emit('volume', connection, value[0])
# Handler for audio control commands
def on_audio_control_point_write(_connection, value):
def on_audio_control_point_write(connection: Connection, value):
logger.info(f'--- AUDIO CONTROL POINT Write:{value.hex()}')
opcode = value[0]
if opcode == AshaService.OPCODE_START:
@@ -79,6 +79,7 @@ class AshaService(TemplateService):
)
self.emit(
'start',
connection,
{
'codec': value[1],
'audiotype': value[2],
@@ -88,7 +89,7 @@ class AshaService(TemplateService):
)
elif opcode == AshaService.OPCODE_STOP:
logger.info('### STOP')
self.emit('stop')
self.emit('stop', connection)
elif opcode == AshaService.OPCODE_STATUS:
logger.info(f'### STATUS: connected={value[1]}')
@@ -102,7 +103,7 @@ class AshaService(TemplateService):
self.read_only_properties_characteristic = Characteristic(
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes(
[
@@ -119,19 +120,20 @@ class AshaService(TemplateService):
self.audio_control_point_characteristic = Characteristic(
GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.Properties.WRITE
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_audio_control_point_write),
)
self.audio_status_characteristic = Characteristic(
GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([0]),
)
self.volume_characteristic = Characteristic(
GATT_ASHA_VOLUME_CHARACTERISTIC,
Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_volume_write),
)
@@ -141,7 +143,7 @@ class AshaService(TemplateService):
def on_data(data):
logging.debug(f'<<< data received:{data}')
self.emit('data', data)
self.emit('data', channel.connection, data)
self.audio_out_data += data
channel.sink = on_data
@@ -150,7 +152,7 @@ class AshaService(TemplateService):
self.psm = self.device.register_l2cap_channel_server(self.psm, on_coc, 8)
self.le_psm_out_characteristic = Characteristic(
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
struct.pack('<H', self.psm),
)

View File

@@ -36,7 +36,7 @@ class BatteryService(TemplateService):
self.battery_level_characteristic = PackedCharacteristicAdapter(
Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
CharacteristicValue(read=read_battery_level),
),

View File

@@ -63,7 +63,9 @@ class DeviceInformationService(TemplateService):
# TODO: pnp_id
):
characteristics = [
Characteristic(uuid, Characteristic.READ, Characteristic.READABLE, field)
Characteristic(
uuid, Characteristic.Properties.READ, Characteristic.READABLE, field
)
for (field, uuid) in (
(manufacturer_name, GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC),
(model_number, GATT_MODEL_NUMBER_STRING_CHARACTERISTIC),
@@ -79,7 +81,7 @@ class DeviceInformationService(TemplateService):
characteristics.append(
Characteristic(
GATT_SYSTEM_ID_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
self.pack_system_id(*system_id),
)
@@ -89,7 +91,7 @@ class DeviceInformationService(TemplateService):
characteristics.append(
Characteristic(
GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
ieee_regulatory_certification_data_list,
)

View File

@@ -152,7 +152,7 @@ class HeartRateService(TemplateService):
self.heart_rate_measurement_characteristic = DelegatedCharacteristicAdapter(
Characteristic(
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
Characteristic.NOTIFY,
Characteristic.Properties.NOTIFY,
0,
CharacteristicValue(read=read_heart_rate_measurement),
),
@@ -164,7 +164,7 @@ class HeartRateService(TemplateService):
if body_sensor_location is not None:
self.body_sensor_location_characteristic = Characteristic(
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes([int(body_sensor_location)]),
)
@@ -182,7 +182,7 @@ class HeartRateService(TemplateService):
self.heart_rate_control_point_characteristic = PackedCharacteristicAdapter(
Characteristic(
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE,
Characteristic.Properties.WRITE,
Characteristic.WRITEABLE,
CharacteristicValue(write=write_heart_rate_control_point_value),
),

View File

@@ -522,9 +522,19 @@ class PairingDelegate:
async def compare_numbers(self, number: int, digits: int) -> bool:
return True
async def get_number(self) -> int:
async def get_number(self) -> Optional[int]:
'''
Returns an optional number as an answer to a passkey request.
Returning `None` will result in a negative reply.
'''
return 0
async def get_string(self, max_length) -> Optional[str]:
'''
Returns a string whose utf-8 encoding is up to max_length bytes.
'''
return None
# pylint: disable-next=unused-argument
async def display_number(self, number: int, digits: int) -> None:
pass
@@ -635,7 +645,7 @@ class Session:
},
}
def __init__(self, manager, connection, pairing_config):
def __init__(self, manager, connection, pairing_config, is_initiator):
self.manager = manager
self.connection = connection
self.preq = None
@@ -674,7 +684,7 @@ class Session:
self.ctkd_task = None
# Decide if we're the initiator or the responder
self.is_initiator = connection.role == BT_CENTRAL_ROLE
self.is_initiator = is_initiator
self.is_responder = not self.is_initiator
# Listen for connection events
@@ -1670,6 +1680,8 @@ class Manager(EventEmitter):
def on_smp_pdu(self, connection, pdu):
# Look for a session with this connection, and create one if none exists
if not (session := self.sessions.get(connection.handle)):
if connection.role == BT_CENTRAL_ROLE:
logger.warning('Remote starts pairing as Peripheral!')
pairing_config = self.pairing_config_factory(connection)
if pairing_config is None:
# Pairing disabled
@@ -1678,7 +1690,7 @@ class Manager(EventEmitter):
SMP_Pairing_Failed_Command(reason=SMP_PAIRING_NOT_SUPPORTED_ERROR),
)
return
session = Session(self, connection, pairing_config)
session = Session(self, connection, pairing_config, is_initiator=False)
self.sessions[connection.handle] = session
# Parse the L2CAP payload into an SMP Command object
@@ -1699,10 +1711,12 @@ class Manager(EventEmitter):
async def pair(self, connection):
# TODO: check if there's already a session for this connection
if connection.role != BT_CENTRAL_ROLE:
logger.warning('Start pairing as Peripheral!')
pairing_config = self.pairing_config_factory(connection)
if pairing_config is None:
raise ValueError('pairing config must not be None when initiating')
session = Session(self, connection, pairing_config)
session = Session(self, connection, pairing_config, is_initiator=True)
self.sessions[connection.handle] = session
return await session.pair()

View File

@@ -209,7 +209,7 @@ async def keyboard_host(device, peer_address):
return
for i, characteristic in enumerate(report_characteristics):
print(color('REPORT:', 'yellow'), characteristic)
if characteristic.properties & Characteristic.NOTIFY:
if characteristic.properties & Characteristic.Properties.NOTIFY:
await peer.discover_descriptors(characteristic)
report_reference_descriptor = characteristic.get_descriptor(
GATT_REPORT_REFERENCE_DESCRIPTOR
@@ -241,7 +241,9 @@ async def keyboard_device(device, command):
# Create an 'input report' characteristic to send keyboard reports to the host
input_report_characteristic = Characteristic(
GATT_REPORT_CHARACTERISTIC,
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([0, 0, 0, 0, 0, 0, 0, 0]),
[
@@ -256,8 +258,8 @@ async def keyboard_device(device, command):
# Create an 'output report' characteristic to receive keyboard reports from the host
output_report_characteristic = Characteristic(
GATT_REPORT_CHARACTERISTIC,
Characteristic.READ
| Characteristic.WRITE
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([0]),
@@ -278,7 +280,7 @@ async def keyboard_device(device, command):
[
Characteristic(
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
'Bumble',
)
@@ -289,13 +291,13 @@ async def keyboard_device(device, command):
[
Characteristic(
GATT_PROTOCOL_MODE_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes([HID_REPORT_PROTOCOL]),
),
Characteristic(
GATT_HID_INFORMATION_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
# bcdHID=1.1, bCountryCode=0x00,
# Flags=RemoteWake|NormallyConnectable
@@ -309,7 +311,7 @@ async def keyboard_device(device, command):
),
Characteristic(
GATT_REPORT_MAP_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
HID_KEYBOARD_REPORT_MAP,
),
@@ -322,7 +324,7 @@ async def keyboard_device(device, command):
[
Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes([100]),
)

View File

@@ -101,7 +101,7 @@ async def main():
# Add the ASHA service to the GATT server
read_only_properties_characteristic = Characteristic(
ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes(
[
@@ -127,13 +127,13 @@ async def main():
)
audio_control_point_characteristic = Characteristic(
ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.Properties.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_audio_control_point_write),
)
audio_status_characteristic = Characteristic(
ASHA_AUDIO_STATUS_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([0]),
)
@@ -145,7 +145,7 @@ async def main():
)
le_psm_out_characteristic = Characteristic(
ASHA_LE_PSM_OUT_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
struct.pack('<H', psm),
)

View File

@@ -80,7 +80,7 @@ async def main():
)
manufacturer_name_characteristic = Characteristic(
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
"Fitbit",
[descriptor],

View File

@@ -70,7 +70,7 @@ async def main():
)
manufacturer_name_characteristic = Characteristic(
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
"Fitbit",
[descriptor],

View File

@@ -96,7 +96,7 @@ async def main():
)
manufacturer_name_characteristic = Characteristic(
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
'Fitbit',
[descriptor],
@@ -109,13 +109,13 @@ async def main():
[
Characteristic(
'D901B45B-4916-412E-ACCA-376ECB603B2C',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(read=my_custom_read, write=my_custom_write),
),
Characteristic(
'552957FB-CF1F-4A31-9535-E78847E1A714',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(
read=my_custom_read_with_error, write=my_custom_write_with_error
@@ -123,7 +123,7 @@ async def main():
),
Characteristic(
'486F64C6-4B5F-4B3B-8AFF-EDE134A8446A',
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
'hello',
),

View File

@@ -74,19 +74,21 @@ async def main():
# Add a few entries to the device's GATT server
characteristic1 = Characteristic(
'486F64C6-4B5F-4B3B-8AFF-EDE134A8446A',
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([0x40]),
)
characteristic2 = Characteristic(
'8EBDEBAE-0017-418E-8D3B-3A3809492165',
Characteristic.READ | Characteristic.INDICATE,
Characteristic.Properties.READ | Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([0x41]),
)
characteristic3 = Characteristic(
'8EBDEBAE-0017-418E-8D3B-3A3809492165',
Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([0x42]),
)

View File

@@ -43,6 +43,8 @@ install_requires =
pyserial >= 3.5; platform_system!='Emscripten'
pyusb >= 1.2; platform_system!='Emscripten'
websockets >= 8.1; platform_system!='Emscripten'
prettytable >= 3.6.0
humanize >= 4.6.0
[options.entry_points]
console_scripts =

View File

@@ -21,6 +21,7 @@ import os
import pytest
from bumble.controller import Controller
from bumble.core import BT_BR_EDR_TRANSPORT
from bumble.link import LocalLink
from bumble.device import Device
from bumble.host import Host
@@ -58,18 +59,19 @@ class TwoDevices:
def __init__(self):
self.connections = [None, None]
addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0']
self.link = LocalLink()
self.controllers = [
Controller('C1', link=self.link),
Controller('C2', link=self.link),
Controller('C1', link=self.link, public_address=addresses[0]),
Controller('C2', link=self.link, public_address=addresses[1]),
]
self.devices = [
Device(
address='F0:F1:F2:F3:F4:F5',
address=addresses[0],
host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])),
),
Device(
address='F5:F4:F3:F2:F1:F0',
address=addresses[1],
host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])),
),
]
@@ -79,6 +81,9 @@ class TwoDevices:
def on_connection(self, which, connection):
self.connections[which] = connection
def on_paired(self, which, keys):
self.paired[which] = keys
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
@@ -94,12 +99,21 @@ async def test_self_connection():
'connection', lambda connection: two_devices.on_connection(1, connection)
)
# Enable Classic connections
two_devices.devices[0].classic_enabled = True
two_devices.devices[1].classic_enabled = True
# Start
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
# Connect the two devices
await two_devices.devices[0].connect(two_devices.devices[1].random_address)
await asyncio.gather(
two_devices.devices[0].connect(
two_devices.devices[1].public_address, transport=BT_BR_EDR_TRANSPORT
),
two_devices.devices[1].accept(two_devices.devices[0].public_address),
)
# Check the post conditions
assert two_devices.connections[0] is not None
@@ -152,6 +166,9 @@ def sink_codec_capabilities():
@pytest.mark.asyncio
async def test_source_sink_1():
two_devices = TwoDevices()
# Enable Classic connections
two_devices.devices[0].classic_enabled = True
two_devices.devices[1].classic_enabled = True
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
@@ -171,9 +188,16 @@ async def test_source_sink_1():
listener = Listener(Listener.create_registrar(two_devices.devices[1]))
listener.on('connection', on_avdtp_connection)
connection = await two_devices.devices[0].connect(
two_devices.devices[1].random_address
)
async def make_connection():
connections = await asyncio.gather(
two_devices.devices[0].connect(
two_devices.devices[1].public_address, BT_BR_EDR_TRANSPORT
),
two_devices.devices[1].accept(two_devices.devices[0].public_address),
)
return connections[0]
connection = await make_connection()
client = await Protocol.connect(connection)
endpoints = await client.discover_remote_endpoints()
assert len(endpoints) == 1

View File

@@ -23,6 +23,7 @@ import pytest
from bumble.controller import Controller
from bumble.gatt_client import CharacteristicProxy
from bumble.gatt_server import Server
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
@@ -114,7 +115,7 @@ async def test_characteristic_encoding():
c = Foo(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
123,
)
@@ -143,7 +144,9 @@ async def test_characteristic_encoding():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
@@ -239,7 +242,9 @@ async def test_attribute_getters():
characteristic_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806')
characteristic = Characteristic(
characteristic_uuid,
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
@@ -284,7 +289,7 @@ def test_CharacteristicAdapter():
v = bytes([1, 2, 3])
c = Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
v,
)
@@ -420,7 +425,7 @@ async def test_read_write():
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
)
@@ -437,7 +442,7 @@ async def test_read_write():
characteristic2 = Characteristic(
'66DE9057-C848-4ACA-B993-D675644EBB85',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(
read=on_characteristic2_read, write=on_characteristic2_write
@@ -500,7 +505,7 @@ async def test_read_write2():
v = bytes([0x11, 0x22, 0x33, 0x44])
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
value=v,
)
@@ -544,7 +549,7 @@ async def test_subscribe_notify():
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([1, 2, 3]),
)
@@ -560,7 +565,7 @@ async def test_subscribe_notify():
characteristic2 = Characteristic(
'66DE9057-C848-4ACA-B993-D675644EBB85',
Characteristic.READ | Characteristic.INDICATE,
Characteristic.Properties.READ | Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([4, 5, 6]),
)
@@ -576,7 +581,9 @@ async def test_subscribe_notify():
characteristic3 = Characteristic(
'AB5E639C-40C1-4238-B9CB-AF41F8B806E4',
Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([7, 8, 9]),
)
@@ -796,32 +803,46 @@ async def test_mtu_exchange():
# -----------------------------------------------------------------------------
def test_char_property_to_string():
# single
assert Characteristic.property_name(0x01) == "BROADCAST"
assert Characteristic.property_name(Characteristic.BROADCAST) == "BROADCAST"
assert str(Characteristic.Properties(0x01)) == "Properties.BROADCAST"
assert str(Characteristic.Properties.BROADCAST) == "Properties.BROADCAST"
# double
assert Characteristic.properties_as_string(0x03) == "BROADCAST,READ"
assert str(Characteristic.Properties(0x03)) == "Properties.READ|BROADCAST"
assert (
Characteristic.properties_as_string(
Characteristic.BROADCAST | Characteristic.READ
)
== "BROADCAST,READ"
str(Characteristic.Properties.BROADCAST | Characteristic.Properties.READ)
== "Properties.READ|BROADCAST"
)
# -----------------------------------------------------------------------------
def test_char_property_string_to_type():
def test_characteristic_property_from_string():
# single
assert Characteristic.string_to_properties("BROADCAST") == Characteristic.BROADCAST
assert (
Characteristic.Properties.from_string("BROADCAST")
== Characteristic.Properties.BROADCAST
)
# double
assert (
Characteristic.string_to_properties("BROADCAST,READ")
== Characteristic.BROADCAST | Characteristic.READ
Characteristic.Properties.from_string("BROADCAST,READ")
== Characteristic.Properties.BROADCAST | Characteristic.Properties.READ
)
assert (
Characteristic.string_to_properties("READ,BROADCAST")
== Characteristic.BROADCAST | Characteristic.READ
Characteristic.Properties.from_string("READ,BROADCAST")
== Characteristic.Properties.BROADCAST | Characteristic.Properties.READ
)
# -----------------------------------------------------------------------------
def test_characteristic_property_from_string_assert():
with pytest.raises(TypeError) as e_info:
Characteristic.Properties.from_string("BROADCAST,HELLO")
assert (
str(e_info.value)
== """Characteristic.Properties::from_string() error:
Expected a string containing any of the keys, separated by commas: BROADCAST,READ,WRITE_WITHOUT_RESPONSE,WRITE,NOTIFY,INDICATE,AUTHENTICATED_SIGNED_WRITES,EXTENDED_PROPERTIES
Got: BROADCAST,HELLO"""
)
@@ -832,7 +853,9 @@ async def test_server_string():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
@@ -843,13 +866,13 @@ async def test_server_string():
assert (
str(server.gatt_server)
== """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access))
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ)
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ)
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ)
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ)
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), Properties.READ)
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), Properties.READ)
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), Properties.READ)
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), Properties.READ)
Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY)
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY)
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, Properties.NOTIFY|WRITE|READ)
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, Properties.NOTIFY|WRITE|READ)
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""
)
@@ -871,21 +894,131 @@ def test_attribute_string_to_permissions():
# -----------------------------------------------------------------------------
def test_charracteristic_permissions():
def test_characteristic_permissions():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
'READABLE,WRITEABLE',
)
assert characteristic.permissions == 3
# -----------------------------------------------------------------------------
def test_characteristic_has_properties():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
'READABLE,WRITEABLE',
)
assert characteristic.has_properties(Characteristic.Properties.READ)
assert characteristic.has_properties(
Characteristic.Properties.READ | Characteristic.Properties.WRITE
)
assert not characteristic.has_properties(
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.INDICATE
)
assert not characteristic.has_properties(Characteristic.Properties.INDICATE)
# -----------------------------------------------------------------------------
def test_descriptor_permissions():
descriptor = Descriptor('2902', 'READABLE,WRITEABLE')
assert descriptor.permissions == 3
# -----------------------------------------------------------------------------
def test_get_attribute_group():
device = Device()
# add some services / characteristics to the gatt server
characteristic1 = Characteristic(
'1111',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
characteristic2 = Characteristic(
'2222',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
services = [Service('1212', [characteristic1]), Service('3233', [characteristic2])]
device.gatt_server.add_services(services)
# get the handles from gatt server
characteristic_attributes1 = device.gatt_server.get_characteristic_attributes(
UUID('1212'), UUID('1111')
)
assert characteristic_attributes1 is not None
characteristic_attributes2 = device.gatt_server.get_characteristic_attributes(
UUID('3233'), UUID('2222')
)
assert characteristic_attributes2 is not None
descriptor1 = device.gatt_server.get_descriptor_attribute(
UUID('1212'), UUID('1111'), UUID('2902')
)
assert descriptor1 is not None
descriptor2 = device.gatt_server.get_descriptor_attribute(
UUID('3233'), UUID('2222'), UUID('2902')
)
assert descriptor2 is not None
# confirm the handles map back to the service
assert (
UUID('1212')
== device.gatt_server.get_attribute_group(
characteristic_attributes1[0].handle, Service
).uuid
)
assert (
UUID('1212')
== device.gatt_server.get_attribute_group(
characteristic_attributes1[1].handle, Service
).uuid
)
assert (
UUID('1212')
== device.gatt_server.get_attribute_group(descriptor1.handle, Service).uuid
)
assert (
UUID('3233')
== device.gatt_server.get_attribute_group(
characteristic_attributes2[0].handle, Service
).uuid
)
assert (
UUID('3233')
== device.gatt_server.get_attribute_group(
characteristic_attributes2[1].handle, Service
).uuid
)
assert (
UUID('3233')
== device.gatt_server.get_attribute_group(descriptor2.handle, Service).uuid
)
# confirm the handles map back to the characteristic
assert (
UUID('1111')
== device.gatt_server.get_attribute_group(
descriptor1.handle, Characteristic
).uuid
)
assert (
UUID('2222')
== device.gatt_server.get_attribute_group(
descriptor2.handle, Characteristic
).uuid
)
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())

View File

@@ -215,12 +215,18 @@ def test_HCI_Command():
# -----------------------------------------------------------------------------
def test_HCI_PIN_Code_Request_Reply_Command():
pin_code = b'1234'
pin_code_length = len(pin_code)
# here to make the test pass, we need to
# pad pin_code, as HCI_Object.format_fields
# does not do it for us
padded_pin_code = pin_code + bytes(16 - pin_code_length)
command = HCI_PIN_Code_Request_Reply_Command(
bd_addr=Address(
'00:11:22:33:44:55', address_type=Address.PUBLIC_DEVICE_ADDRESS
),
pin_code_length=4,
pin_code=b'1234',
pin_code_length=pin_code_length,
pin_code=padded_pin_code,
)
basic_check(command)

View File

@@ -22,6 +22,7 @@ import os
import pytest
from bumble.controller import Controller
from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
@@ -47,18 +48,19 @@ class TwoDevices:
def __init__(self):
self.connections = [None, None]
addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0']
self.link = LocalLink()
self.controllers = [
Controller('C1', link=self.link),
Controller('C2', link=self.link),
Controller('C1', link=self.link, public_address=addresses[0]),
Controller('C2', link=self.link, public_address=addresses[1]),
]
self.devices = [
Device(
address='F0:F1:F2:F3:F4:F5',
address=addresses[0],
host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])),
),
Device(
address='F5:F4:F3:F2:F1:F0',
address=addresses[1],
host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])),
),
]
@@ -98,6 +100,60 @@ async def test_self_connection():
assert two_devices.connections[1] is not None
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize(
'responder_role,',
(BT_CENTRAL_ROLE, BT_PERIPHERAL_ROLE),
)
async def test_self_classic_connection(responder_role):
# Create two devices, each with a controller, attached to the same link
two_devices = TwoDevices()
# Attach listeners
two_devices.devices[0].on(
'connection', lambda connection: two_devices.on_connection(0, connection)
)
two_devices.devices[1].on(
'connection', lambda connection: two_devices.on_connection(1, connection)
)
# Enable Classic connections
two_devices.devices[0].classic_enabled = True
two_devices.devices[1].classic_enabled = True
# Start
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
# Connect the two devices
await asyncio.gather(
two_devices.devices[0].connect(
two_devices.devices[1].public_address, transport=BT_BR_EDR_TRANSPORT
),
two_devices.devices[1].accept(
two_devices.devices[0].public_address, responder_role
),
)
# Check the post conditions
assert two_devices.connections[0] is not None
assert two_devices.connections[1] is not None
# Check the role
assert two_devices.connections[0].role != responder_role
assert two_devices.connections[1].role == responder_role
# Role switch
await two_devices.connections[0].switch_role(responder_role)
# Check the role
assert two_devices.connections[0].role == responder_role
assert two_devices.connections[1].role != responder_role
await two_devices.connections[0].disconnect()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_gatt():
@@ -107,25 +163,28 @@ async def test_self_gatt():
# Add some GATT characteristics to device 1
c1 = Characteristic(
'3A143AD7-D4A7-436B-97D6-5B62C315E833',
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes([1, 2, 3]),
)
c2 = Characteristic(
'9557CCE2-DB37-46EB-94C4-50AE5B9CB0F8',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([4, 5, 6]),
)
c3 = Characteristic(
'84FC1A2E-C52D-4A2D-B8C3-8855BAB86638',
Characteristic.READ | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([7, 8, 9]),
)
c4 = Characteristic(
'84FC1A2E-C52D-4A2D-B8C3-8855BAB86638',
Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([1, 1, 1]),
)
@@ -178,7 +237,7 @@ async def test_self_gatt_long_read():
characteristics = [
Characteristic(
f'3A143AD7-D4A7-436B-97D6-5B62C315{i:04X}',
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes([x & 255 for x in range(i)]),
)