Compare commits

...

15 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 2c2f512180 add comment to explain the initial role choice 2023-04-07 12:19:28 -07:00
Gilles Boccon-Gibod 859aea5a99 fix role state for classic connections 2023-04-07 10:24:26 -07:00
Gilles Boccon-Gibod c53e1d2480 Merge pull request #171 from google/gbg/keystore-name
use device public or static address for keystore namespace
2023-04-03 17:58:18 -07:00
Gilles Boccon-Gibod 620c135ac4 only instantiate keystore if not already set 2023-04-03 17:52:51 -07:00
Gilles Boccon-Gibod fca73a49a3 use device public or static address for keystore namespace 2023-04-03 12:39:22 -07:00
Alan Rosenthal cf70db84a1 Merge pull request #170 from AlanRosenthal/alan/fix-char-proxy-print
Fix CharacteristicProxy __str__
2023-03-31 17:25:50 -04:00
Alan Rosenthal 7731c41f80 Fix CharacteristicProxy __str__
property was really an int, and needed to be transformed into a `Characteristic.Properties`
2023-03-31 17:06:33 -04:00
Alan Rosenthal 278341cbc0 Merge pull request #167 from AlanRosenthal/alan/properties
Create Characteristic.Property
2023-03-31 16:14:38 -04:00
Alan Rosenthal fb49a87494 Create Characteristic.Property
Move all Characteristic properties into its own `enum.IntFlag` class
2023-03-31 16:09:24 -04:00
Alan Rosenthal eba82b9d9a Merge pull request #164 from AlanRosenthal/alan/local-write
Add `local-write` to bumble-console
2023-03-31 16:07:09 -04:00
Alan Rosenthal 677fc77d3c Merge pull request #163 from AlanRosenthal/alan/local-values
Add `show local-values`
2023-03-31 16:03:52 -04:00
Alan Rosenthal e026de295f Add show local-values
This PR adds a way to display the local gatt characteristics/descriptors values

If no connections, it shows the value of every characteristic/descriptor.
When there's a connection, it shows the value for each specific connection - CCCDs are connection specific

This screen auto-updates every second
2023-03-31 00:20:07 +00:00
Alan Rosenthal 52c15705e9 Add local-write to bumble-console
Add a command to update the local gatt server, and notify/indicate subscribes (if any)
2023-03-30 12:33:32 -04:00
Lucas Abel 45ca0ef071 Merge pull request #166 from google/uael/att-fix
att: fixed use of unknown attribute
2023-03-30 07:12:28 -07:00
uael e0af954baa att: fixed use of unknown attribute 2023-03-30 14:05:43 +00:00
28 changed files with 592 additions and 214 deletions
+4 -2
View File
@@ -558,11 +558,13 @@ class GattServer:
# Setup the GATT service
self.speed_tx = Characteristic(
SPEED_TX_UUID,
Characteristic.WRITE,
Characteristic.Properties.WRITE,
Characteristic.WRITEABLE,
CharacteristicValue(write=self.on_tx_write),
)
self.speed_rx = Characteristic(SPEED_RX_UUID, Characteristic.NOTIFY, 0)
self.speed_rx = Characteristic(
SPEED_RX_UUID, Characteristic.Properties.NOTIFY, 0
)
speed_service = Service(
SPEED_SERVICE_UUID,
+167 -17
View File
@@ -24,10 +24,11 @@ import logging
import os
import random
import re
from typing import Optional
from typing import Optional, Union
from collections import OrderedDict
import click
from prettytable import PrettyTable
from prompt_toolkit import Application
from prompt_toolkit.history import FileHistory
@@ -125,7 +126,8 @@ class ConsoleApp:
def __init__(self):
self.known_addresses = set()
self.known_attributes = []
self.known_remote_attributes = []
self.known_local_attributes = []
self.device = None
self.connected_peer = None
self.top_tab = 'device'
@@ -162,6 +164,7 @@ class ConsoleApp:
'device': None,
'local-services': None,
'remote-services': None,
'local-values': None,
},
'filter': {
'address': None,
@@ -172,10 +175,11 @@ class ConsoleApp:
'disconnect': None,
'discover': {'services': None, 'attributes': None},
'request-mtu': None,
'read': LiveCompleter(self.known_attributes),
'write': LiveCompleter(self.known_attributes),
'subscribe': LiveCompleter(self.known_attributes),
'unsubscribe': LiveCompleter(self.known_attributes),
'read': LiveCompleter(self.known_remote_attributes),
'write': LiveCompleter(self.known_remote_attributes),
'local-write': LiveCompleter(self.known_local_attributes),
'subscribe': LiveCompleter(self.known_remote_attributes),
'unsubscribe': LiveCompleter(self.known_remote_attributes),
'set-phy': {'1m': None, '2m': None, 'coded': None},
'set-default-phy': None,
'quit': None,
@@ -207,6 +211,7 @@ class ConsoleApp:
self.log_text = FormattedTextControl(
get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1))
)
self.local_values_text = FormattedTextControl()
self.log_height = Dimension(min=7, weight=4)
self.log_max_lines = 100
self.log_lines = []
@@ -221,6 +226,10 @@ class ConsoleApp:
Frame(Window(self.local_services_text), title='Local Services'),
filter=Condition(lambda: self.top_tab == 'local-services'),
),
ConditionalContainer(
Frame(Window(self.local_values_text), title='Local Values'),
filter=Condition(lambda: self.top_tab == 'local-values'),
),
ConditionalContainer(
Frame(Window(self.remote_services_text), title='Remote Services'),
filter=Condition(lambda: self.top_tab == 'remote-services'),
@@ -366,17 +375,19 @@ class ConsoleApp:
def show_remote_services(self, services):
lines = []
del self.known_attributes[:]
del self.known_remote_attributes[:]
for service in services:
lines.append(("ansicyan", f"{service}\n"))
for characteristic in service.characteristics:
lines.append(('ansimagenta', f' {characteristic} + \n'))
self.known_attributes.append(
self.known_remote_attributes.append(
f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}'
)
self.known_attributes.append(f'*.{characteristic.uuid.to_hex_str()}')
self.known_attributes.append(f'#{characteristic.handle:X}')
self.known_remote_attributes.append(
f'*.{characteristic.uuid.to_hex_str()}'
)
self.known_remote_attributes.append(f'#{characteristic.handle:X}')
for descriptor in characteristic.descriptors:
lines.append(("ansigreen", f" {descriptor}\n"))
@@ -385,12 +396,31 @@ class ConsoleApp:
def show_local_services(self, attributes):
lines = []
del self.known_local_attributes[:]
for attribute in attributes:
if isinstance(attribute, Service):
# Save the most recent service for use later
service = attribute
lines.append(("ansicyan", f"{attribute}\n"))
elif isinstance(attribute, (Characteristic, CharacteristicDeclaration)):
elif isinstance(attribute, Characteristic):
# CharacteristicDeclaration includes all info from Characteristic
# no need to print it twice
continue
elif isinstance(attribute, CharacteristicDeclaration):
# Save the most recent characteristic declaration for use later
characteristic_declaration = attribute
self.known_local_attributes.append(
f'{service.uuid.to_hex_str()}.{attribute.characteristic.uuid.to_hex_str()}'
)
self.known_local_attributes.append(
f'#{attribute.characteristic.handle:X}'
)
lines.append(("ansimagenta", f" {attribute}\n"))
elif isinstance(attribute, Descriptor):
self.known_local_attributes.append(
f'{service.uuid.to_hex_str()}.{characteristic_declaration.characteristic.uuid.to_hex_str()}.{attribute.type.to_hex_str()}'
)
self.known_local_attributes.append(f'#{attribute.handle:X}')
lines.append(("ansigreen", f" {attribute}\n"))
else:
lines.append(("ansiyellow", f"{attribute}\n"))
@@ -494,7 +524,7 @@ class ConsoleApp:
self.show_attributes(attributes)
def find_characteristic(self, param) -> Optional[CharacteristicProxy]:
def find_remote_characteristic(self, param) -> Optional[CharacteristicProxy]:
if not self.connected_peer:
return None
parts = param.split('.')
@@ -516,6 +546,38 @@ class ConsoleApp:
return None
def find_local_attribute(
self, param
) -> Optional[Union[Characteristic, Descriptor]]:
parts = param.split('.')
if len(parts) == 3:
service_uuid = UUID(parts[0])
characteristic_uuid = UUID(parts[1])
descriptor_uuid = UUID(parts[2])
return self.device.gatt_server.get_descriptor_attribute(
service_uuid, characteristic_uuid, descriptor_uuid
)
if len(parts) == 2:
service_uuid = UUID(parts[0])
characteristic_uuid = UUID(parts[1])
characteristic_attributes = (
self.device.gatt_server.get_characteristic_attributes(
service_uuid, characteristic_uuid
)
)
if characteristic_attributes:
return characteristic_attributes[1]
return None
elif len(parts) == 1:
if parts[0].startswith('#'):
attribute_handle = int(f'{parts[0][1:]}', 16)
attribute = self.device.gatt_server.get_attribute(attribute_handle)
if isinstance(attribute, (Characteristic, Descriptor)):
return attribute
return None
return None
async def rssi_monitor_loop(self):
while True:
if self.monitor_rssi and self.connected_peer:
@@ -674,10 +736,70 @@ class ConsoleApp:
'device',
'local-services',
'remote-services',
'local-values',
}:
self.top_tab = params[0]
self.ui.invalidate()
while self.top_tab == 'local-values':
await self.do_show_local_values()
await asyncio.sleep(1)
async def do_show_local_values(self):
prettytable = PrettyTable()
field_names = ["Service", "Characteristic", "Descriptor"]
# if there's no connections, add a column just for value
if not self.device.connections:
field_names.append("Value")
# if there are connections, add a column for each connection's value
for connection in self.device.connections.values():
field_names.append(f"Connection {connection.handle}")
for attribute in self.device.gatt_server.attributes:
if isinstance(attribute, Characteristic):
service = self.device.gatt_server.get_attribute_group(
attribute.handle, Service
)
if not service:
continue
values = [
attribute.read_value(connection)
for connection in self.device.connections.values()
]
if not values:
values = [attribute.read_value(None)]
prettytable.add_row([f"{service.uuid}", attribute.uuid, ""] + values)
elif isinstance(attribute, Descriptor):
service = self.device.gatt_server.get_attribute_group(
attribute.handle, Service
)
if not service:
continue
characteristic = self.device.gatt_server.get_attribute_group(
attribute.handle, Characteristic
)
if not characteristic:
continue
values = [
attribute.read_value(connection)
for connection in self.device.connections.values()
]
if not values:
values = [attribute.read_value(None)]
# TODO: future optimization: convert CCCD value to human readable string
prettytable.add_row(
[service.uuid, characteristic.uuid, attribute.type] + values
)
prettytable.field_names = field_names
self.local_values_text.text = prettytable.get_string()
self.ui.invalidate()
async def do_get_phy(self, _):
if not self.connected_peer:
self.show_error('not connected')
@@ -720,7 +842,7 @@ class ConsoleApp:
self.show_error('not connected')
return
characteristic = self.find_characteristic(params[0])
characteristic = self.find_remote_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
return
@@ -745,15 +867,43 @@ class ConsoleApp:
except ValueError:
value = str.encode(params[1]) # must be a string
characteristic = self.find_characteristic(params[0])
characteristic = self.find_remote_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
return
# use write with response if supported
with_response = characteristic.properties & Characteristic.WRITE
with_response = characteristic.properties & Characteristic.Properties.WRITE
await characteristic.write_value(value, with_response=with_response)
async def do_local_write(self, params):
if len(params) != 2:
self.show_error(
'invalid syntax', 'expected local-write <attribute> <value>'
)
return
if params[1].upper().startswith("0X"):
value = bytes.fromhex(params[1][2:]) # parse as hex string
else:
try:
value = int(params[1]).to_bytes(2, "little") # try as 2 byte integer
except ValueError:
value = str.encode(params[1]) # must be a string
attribute = self.find_local_attribute(params[0])
if not attribute:
self.show_error('invalid syntax', 'unable to find attribute')
return
# send data to any subscribers
if isinstance(attribute, Characteristic):
attribute.write_value(None, value)
if attribute.has_properties([Characteristic.NOTIFY]):
await self.device.gatt_server.notify_subscribers(attribute)
if attribute.has_properties([Characteristic.INDICATE]):
await self.device.gatt_server.indicate_subscribers(attribute)
async def do_subscribe(self, params):
if not self.connected_peer:
self.show_error('not connected')
@@ -763,7 +913,7 @@ class ConsoleApp:
self.show_error('invalid syntax', 'expected subscribe <attribute>')
return
characteristic = self.find_characteristic(params[0])
characteristic = self.find_remote_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
return
@@ -783,7 +933,7 @@ class ConsoleApp:
self.show_error('invalid syntax', 'expected subscribe <attribute>')
return
characteristic = self.find_characteristic(params[0])
characteristic = self.find_remote_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
return
+3 -4
View File
@@ -230,13 +230,13 @@ class GattlinkNodeBridge(GattlinkL2capEndpoint, Device.Listener):
)
self.tx_characteristic = Characteristic(
GG_GATTLINK_TX_CHARACTERISTIC_UUID,
Characteristic.NOTIFY,
Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
)
self.tx_characteristic.on('subscription', self.on_tx_subscription)
self.psm_characteristic = Characteristic(
GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([psm, 0]),
)
@@ -339,8 +339,7 @@ async def run(
# Create a UDP to TX bridge (receive from TX, send to UDP)
bridge.tx_socket, _ = await loop.create_datagram_endpoint(
# pylint: disable-next=unnecessary-lambda
lambda: asyncio.DatagramProtocol(),
asyncio.DatagramProtocol,
remote_addr=(send_host, send_port),
)
+13 -1
View File
@@ -264,6 +264,7 @@ async def pair(
sc,
mitm,
bond,
ctkd,
io,
prompt,
request,
@@ -302,7 +303,8 @@ async def pair(
[
Characteristic(
'552957FB-CF1F-4A31-9535-E78847E1A714',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(
read=read_with_error, write=write_with_error
@@ -316,6 +318,7 @@ async def pair(
if mode == 'classic':
device.classic_enabled = True
device.le_enabled = False
device.classic_smp_enabled = ctkd
# Get things going
await device.power_on()
@@ -378,6 +381,13 @@ class LogHandler(logging.Handler):
@click.option(
'--bond', type=bool, default=True, help='Enable bonding', show_default=True
)
@click.option(
'--ctkd',
type=bool,
default=True,
help='Enable CTKD',
show_default=True,
)
@click.option(
'--io',
type=click.Choice(
@@ -404,6 +414,7 @@ def main(
sc,
mitm,
bond,
ctkd,
io,
prompt,
request,
@@ -426,6 +437,7 @@ def main(
sc,
mitm,
bond,
ctkd,
io,
prompt,
request,
+4 -4
View File
@@ -190,7 +190,7 @@ class ATT_Error(ProtocolError):
super().__init__(
error_code,
error_namespace='att',
error_name=ATT_PDU.error_name(self.error_code),
error_name=ATT_PDU.error_name(error_code),
)
self.att_handle = att_handle
self.message = message
@@ -750,10 +750,10 @@ class Attribute(EventEmitter):
permissions_str.split(","),
0,
)
except TypeError:
except TypeError as exc:
raise TypeError(
f"Attribute::permissions error:\nExpected a string containing any of the keys, seperated by commas: {','.join(Attribute.PERMISSION_NAMES.values())}\nGot: {permissions_str}"
)
f"Attribute::permissions error:\nExpected a string containing any of the keys, separated by commas: {','.join(Attribute.PERMISSION_NAMES.values())}\nGot: {permissions_str}"
) from exc
def __init__(self, attribute_type, permissions, value=b''):
EventEmitter.__init__(self)
+55 -28
View File
@@ -595,7 +595,7 @@ class Connection(CompositeEventEmitter):
# [Classic only]
@classmethod
def incomplete(cls, device, peer_address):
def incomplete(cls, device, peer_address, role):
"""
Instantiate an incomplete connection (ie. one waiting for a HCI Connection
Complete event).
@@ -608,28 +608,30 @@ class Connection(CompositeEventEmitter):
device.public_address,
peer_address,
None,
None,
role,
None,
None,
)
# [Classic only]
def complete(self, handle, peer_resolvable_address, role, parameters):
def complete(self, handle, parameters):
"""
Finish an incomplete connection upon completion.
"""
assert self.handle is None
assert self.transport == BT_BR_EDR_TRANSPORT
self.handle = handle
self.peer_resolvable_address = peer_resolvable_address
# Quirk: role might be known before complete
if self.role is None:
self.role = role
self.parameters = parameters
@property
def role_name(self):
return 'CENTRAL' if self.role == BT_CENTRAL_ROLE else 'PERIPHERAL'
if self.role is None:
return 'NOT-SET'
if self.role == BT_CENTRAL_ROLE:
return 'CENTRAL'
if self.role == BT_PERIPHERAL_ROLE:
return 'PERIPHERAL'
return f'UNKNOWN[{self.role}]'
@property
def is_encrypted(self):
@@ -637,7 +639,7 @@ class Connection(CompositeEventEmitter):
@property
def is_incomplete(self) -> bool:
return self.handle == None
return self.handle is None
def send_l2cap_pdu(self, cid, pdu):
self.device.send_l2cap_pdu(self.handle, cid, pdu)
@@ -750,10 +752,11 @@ class DeviceConfiguration:
self.advertising_interval_max = DEVICE_DEFAULT_ADVERTISING_INTERVAL
self.le_enabled = True
# LE host enable 2nd parameter
self.le_simultaneous_enabled = True
self.le_simultaneous_enabled = False
self.classic_enabled = False
self.classic_sc_enabled = True
self.classic_ssp_enabled = True
self.classic_smp_enabled = True
self.classic_accept_any = True
self.connectable = True
self.discoverable = True
@@ -788,6 +791,9 @@ class DeviceConfiguration:
self.classic_ssp_enabled = config.get(
'classic_ssp_enabled', self.classic_ssp_enabled
)
self.classic_smp_enabled = config.get(
'classic_smp_enabled', self.classic_smp_enabled
)
self.classic_accept_any = config.get(
'classic_accept_any', self.classic_accept_any
)
@@ -878,7 +884,7 @@ device_host_event_handlers: list[str] = []
# -----------------------------------------------------------------------------
class Device(CompositeEventEmitter):
# incomplete list of fields.
# Incomplete list of fields.
random_address: Address
public_address: Address
classic_enabled: bool
@@ -893,6 +899,7 @@ class Device(CompositeEventEmitter):
Address, List[asyncio.Future[Union[Connection, Tuple[Address, int, int]]]]
]
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
config: DeviceConfiguration
@composite_listener
class Listener:
@@ -980,9 +987,10 @@ class Device(CompositeEventEmitter):
self.connect_own_address_type = None
# Use the initial config or a default
config = config or DeviceConfiguration()
self.config = config
self.public_address = Address('00:00:00:00:00:00')
if config is None:
config = DeviceConfiguration()
self.name = config.name
self.random_address = config.address
self.class_of_device = config.class_of_device
@@ -990,13 +998,14 @@ class Device(CompositeEventEmitter):
self.advertising_data = config.advertising_data
self.advertising_interval_min = config.advertising_interval_min
self.advertising_interval_max = config.advertising_interval_max
self.keystore = KeyStore.create_for_device(config)
self.keystore = None
self.irk = config.irk
self.le_enabled = config.le_enabled
self.classic_enabled = config.classic_enabled
self.le_simultaneous_enabled = config.le_simultaneous_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled
self.classic_sc_enabled = config.classic_sc_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled
self.classic_smp_enabled = config.classic_smp_enabled
self.discoverable = config.discoverable
self.connectable = config.connectable
self.classic_accept_any = config.classic_accept_any
@@ -1018,7 +1027,9 @@ class Device(CompositeEventEmitter):
descriptors.append(new_descriptor)
new_characteristic = Characteristic(
uuid=characteristic["uuid"],
properties=characteristic["properties"],
properties=Characteristic.Properties.from_string(
characteristic["properties"]
),
permissions=characteristic["permissions"],
descriptors=descriptors,
)
@@ -1039,9 +1050,6 @@ class Device(CompositeEventEmitter):
# Setup SMP
self.smp_manager = smp.Manager(self)
self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu)
self.l2cap_channel_manager.register_fixed_channel(
smp.SMP_BR_CID, self.on_smp_pdu
)
# Register the SDP server with the L2CAP Channel Manager
self.sdp_server.register(self.l2cap_channel_manager)
@@ -1165,6 +1173,7 @@ class Device(CompositeEventEmitter):
# Reset the controller
await self.host.reset()
# Try to get the public address from the controller
response = await self.send_command(HCI_Read_BD_ADDR_Command()) # type: ignore[call-arg]
if response.return_parameters.status == HCI_SUCCESS:
logger.debug(
@@ -1172,6 +1181,17 @@ class Device(CompositeEventEmitter):
)
self.public_address = response.return_parameters.bd_addr
# Instantiate the Key Store (we do this here rather than at __init__ time
# because some Key Store implementations use the public address as a namespace)
if self.keystore is None:
self.keystore = KeyStore.create_for_device(self)
# Finish setting up SMP based on post-init configurable options
if self.classic_smp_enabled:
self.l2cap_channel_manager.register_fixed_channel(
smp.SMP_BR_CID, self.on_smp_pdu
)
if self.host.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND):
await self.send_command(
HCI_Write_LE_Host_Support_Command(
@@ -1793,7 +1813,7 @@ class Device(CompositeEventEmitter):
else:
# Save pending connection
self.pending_connections[peer_address] = Connection.incomplete(
self, peer_address
self, peer_address, BT_CENTRAL_ROLE
)
# TODO: allow passing other settings
@@ -1930,9 +1950,12 @@ class Device(CompositeEventEmitter):
self.on('connection', on_connection)
self.on('connection_failure', on_connection_failure)
# Save pending connection
# Save pending connection, with the Peripheral role.
# Even if we requested a role switch in the HCI_Accept_Connection_Request
# command, this connection is still considered Peripheral until an eventual
# role change event.
self.pending_connections[peer_address] = Connection.incomplete(
self, peer_address
self, peer_address, BT_PERIPHERAL_ROLE
)
try:
@@ -2205,6 +2228,9 @@ class Device(CompositeEventEmitter):
keys = await self.keystore.get(str(address))
if keys is not None:
logger.debug('found keys in the key store')
if keys.link_key is None:
logger.debug('no link key')
return None
return keys.link_key.value
# [Classic only]
@@ -2454,25 +2480,24 @@ class Device(CompositeEventEmitter):
connection_handle,
transport,
peer_address,
peer_resolvable_address,
role,
connection_parameters,
):
logger.debug(
f'*** Connection: [0x{connection_handle:04X}] '
f'{peer_address} as {HCI_Constant.role_name(role)}'
f'{peer_address} {"" if role is None else HCI_Constant.role_name(role)}'
)
if connection_handle in self.connections:
logger.warning(
'new connection reuses the same handle as a previous connection'
)
peer_resolvable_address = None
if transport == BT_BR_EDR_TRANSPORT:
# Create a new connection
connection = self.pending_connections.pop(peer_address)
connection.complete(
connection_handle, peer_resolvable_address, role, connection_parameters
)
connection.complete(connection_handle, connection_parameters)
self.connections[connection_handle] = connection
# Emit an event to notify listeners of the new connection
@@ -2584,7 +2609,9 @@ class Device(CompositeEventEmitter):
# device configuration is set to accept any incoming connection
elif self.classic_accept_any:
# Save pending connection
self.pending_connections[bd_addr] = Connection.incomplete(self, bd_addr)
self.pending_connections[bd_addr] = Connection.incomplete(
self, bd_addr, BT_PERIPHERAL_ROLE
)
self.host.send_command_sync(
HCI_Accept_Connection_Request_Command(
+2 -2
View File
@@ -41,14 +41,14 @@ class GenericAccessService(Service):
def __init__(self, device_name, appearance=(0, 0)):
device_name_characteristic = Characteristic(
GATT_DEVICE_NAME_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
device_name.encode('utf-8')[:248],
)
appearance_characteristic = Characteristic(
GATT_APPEARANCE_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
struct.pack('<H', (appearance[0] << 6) | appearance[1]),
)
+57 -47
View File
@@ -28,7 +28,7 @@ import enum
import functools
import logging
import struct
from typing import Optional, Sequence
from typing import Optional, Sequence, List
from .colors import color
from .core import UUID, get_dict_key_by_value
@@ -259,63 +259,68 @@ class Characteristic(Attribute):
See Vol 3, Part G - 3.3 CHARACTERISTIC DEFINITION
'''
# Property flags
BROADCAST = 0x01
READ = 0x02
WRITE_WITHOUT_RESPONSE = 0x04
WRITE = 0x08
NOTIFY = 0x10
INDICATE = 0x20
AUTHENTICATED_SIGNED_WRITES = 0x40
EXTENDED_PROPERTIES = 0x80
uuid: UUID
properties: Characteristic.Properties
PROPERTY_NAMES = {
BROADCAST: 'BROADCAST',
READ: 'READ',
WRITE_WITHOUT_RESPONSE: 'WRITE_WITHOUT_RESPONSE',
WRITE: 'WRITE',
NOTIFY: 'NOTIFY',
INDICATE: 'INDICATE',
AUTHENTICATED_SIGNED_WRITES: 'AUTHENTICATED_SIGNED_WRITES',
EXTENDED_PROPERTIES: 'EXTENDED_PROPERTIES',
}
class Properties(enum.IntFlag):
"""Property flags"""
@staticmethod
def property_name(property_int):
return Characteristic.PROPERTY_NAMES.get(property_int, '')
BROADCAST = 0x01
READ = 0x02
WRITE_WITHOUT_RESPONSE = 0x04
WRITE = 0x08
NOTIFY = 0x10
INDICATE = 0x20
AUTHENTICATED_SIGNED_WRITES = 0x40
EXTENDED_PROPERTIES = 0x80
@staticmethod
def properties_as_string(properties):
return ','.join(
[
Characteristic.property_name(p)
for p in Characteristic.PROPERTY_NAMES
if properties & p
]
)
@staticmethod
def from_string(properties_str: str) -> Characteristic.Properties:
property_names: List[str] = []
for property in Characteristic.Properties:
if property.name is None:
raise TypeError()
property_names.append(property.name)
@staticmethod
def string_to_properties(properties_str: str):
return functools.reduce(
lambda x, y: x | get_dict_key_by_value(Characteristic.PROPERTY_NAMES, y),
properties_str.split(","),
0,
)
def string_to_property(property_string) -> Characteristic.Properties:
for property in zip(Characteristic.Properties, property_names):
if property_string == property[1]:
return property[0]
raise TypeError(f"Unable to convert {property_string} to Property")
try:
return functools.reduce(
lambda x, y: x | string_to_property(y),
properties_str.split(","),
Characteristic.Properties(0),
)
except TypeError:
raise TypeError(
f"Characteristic.Properties::from_string() error:\nExpected a string containing any of the keys, separated by commas: {','.join(property_names)}\nGot: {properties_str}"
)
# For backwards compatibility these are defined here
# For new code, please use Characteristic.Properties.X
BROADCAST = Properties.BROADCAST
READ = Properties.READ
WRITE_WITHOUT_RESPONSE = Properties.WRITE_WITHOUT_RESPONSE
WRITE = Properties.WRITE
NOTIFY = Properties.NOTIFY
INDICATE = Properties.INDICATE
AUTHENTICATED_SIGNED_WRITES = Properties.AUTHENTICATED_SIGNED_WRITES
EXTENDED_PROPERTIES = Properties.EXTENDED_PROPERTIES
def __init__(
self,
uuid,
properties,
properties: Characteristic.Properties,
permissions,
value=b'',
descriptors: Sequence[Descriptor] = (),
):
super().__init__(uuid, permissions, value)
self.uuid = self.type
if isinstance(properties, str):
self.properties = Characteristic.string_to_properties(properties)
else:
self.properties = properties
self.properties = properties
self.descriptors = descriptors
def get_descriptor(self, descriptor_type):
@@ -325,12 +330,15 @@ class Characteristic(Attribute):
return None
def has_properties(self, properties: Characteristic.Properties) -> bool:
return self.properties & properties == properties
def __str__(self):
return (
f'Characteristic(handle=0x{self.handle:04X}, '
f'end=0x{self.end_group_handle:04X}, '
f'uuid={self.uuid}, '
f'properties={Characteristic.properties_as_string(self.properties)})'
f'{self.properties!s})'
)
@@ -340,6 +348,8 @@ class CharacteristicDeclaration(Attribute):
See Vol 3, Part G - 3.3.1 CHARACTERISTIC DECLARATION
'''
characteristic: Characteristic
def __init__(self, characteristic, value_handle):
declaration_bytes = (
struct.pack('<BH', characteristic.properties, value_handle)
@@ -355,8 +365,8 @@ class CharacteristicDeclaration(Attribute):
return (
f'CharacteristicDeclaration(handle=0x{self.handle:04X}, '
f'value_handle=0x{self.value_handle:04X}, '
f'uuid={self.characteristic.uuid}, properties='
f'{Characteristic.properties_as_string(self.characteristic.properties)})'
f'uuid={self.characteristic.uuid}, '
f'{self.characteristic.properties!s})'
)
+17 -9
View File
@@ -27,7 +27,7 @@ from __future__ import annotations
import asyncio
import logging
import struct
from typing import List, Optional
from typing import List, Optional, Dict, Any, Callable
from pyee import EventEmitter
@@ -62,7 +62,6 @@ from .gatt import (
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
GATT_REQUEST_TIMEOUT,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
Service,
Characteristic,
ClientCharacteristicConfigurationBits,
)
@@ -139,12 +138,21 @@ class ServiceProxy(AttributeProxy):
class CharacteristicProxy(AttributeProxy):
properties: Characteristic.Properties
descriptors: List[DescriptorProxy]
subscribers: Dict[Any, Callable]
def __init__(self, client, handle, end_group_handle, uuid, properties):
def __init__(
self,
client,
handle,
end_group_handle,
uuid,
properties: int,
):
super().__init__(client, handle, end_group_handle, uuid)
self.uuid = uuid
self.properties = properties
self.properties = Characteristic.Properties(properties)
self.descriptors = []
self.descriptors_discovered = False
self.subscribers = {} # Map from subscriber to proxy subscriber
@@ -186,7 +194,7 @@ class CharacteristicProxy(AttributeProxy):
return (
f'Characteristic(handle=0x{self.handle:04X}, '
f'uuid={self.uuid}, '
f'properties={Characteristic.properties_as_string(self.properties)})'
f'{self.properties!s})'
)
@@ -678,8 +686,8 @@ class Client:
return
if (
characteristic.properties & Characteristic.NOTIFY
and characteristic.properties & Characteristic.INDICATE
characteristic.properties & Characteristic.Properties.NOTIFY
and characteristic.properties & Characteristic.Properties.INDICATE
):
if prefer_notify:
bits = ClientCharacteristicConfigurationBits.NOTIFICATION
@@ -687,10 +695,10 @@ class Client:
else:
bits = ClientCharacteristicConfigurationBits.INDICATION
subscribers = self.indication_subscribers
elif characteristic.properties & Characteristic.NOTIFY:
elif characteristic.properties & Characteristic.Properties.NOTIFY:
bits = ClientCharacteristicConfigurationBits.NOTIFICATION
subscribers = self.notification_subscribers
elif characteristic.properties & Characteristic.INDICATE:
elif characteristic.properties & Characteristic.Properties.INDICATE:
bits = ClientCharacteristicConfigurationBits.INDICATION
subscribers = self.indication_subscribers
else:
+20 -2
View File
@@ -27,7 +27,7 @@ import asyncio
import logging
from collections import defaultdict
import struct
from typing import List, Tuple, Optional
from typing import List, Tuple, Optional, TypeVar, Type
from pyee import EventEmitter
from .colors import color
@@ -135,6 +135,21 @@ class Server(EventEmitter):
return attribute
return None
AttributeGroupType = TypeVar('AttributeGroupType', Service, Characteristic)
def get_attribute_group(
self, handle: int, group_type: Type[AttributeGroupType]
) -> Optional[AttributeGroupType]:
return next(
(
attribute
for attribute in self.attributes
if isinstance(attribute, group_type)
and attribute.handle <= handle <= attribute.end_group_handle
),
None,
)
def get_service_attribute(self, service_uuid: UUID) -> Optional[Service]:
return next(
(
@@ -228,7 +243,10 @@ class Server(EventEmitter):
# unless there is one already
if (
characteristic.properties
& (Characteristic.NOTIFY | Characteristic.INDICATE)
& (
Characteristic.Properties.NOTIFY
| Characteristic.Properties.INDICATE
)
and characteristic.get_descriptor(
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR
)
+3 -13
View File
@@ -94,10 +94,9 @@ HOST_HC_TOTAL_NUM_ACL_DATA_PACKETS = 1
# -----------------------------------------------------------------------------
class Connection:
def __init__(self, host, handle, role, peer_address, transport):
def __init__(self, host, handle, peer_address, transport):
self.host = host
self.handle = handle
self.role = role
self.peer_address = peer_address
self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.transport = transport
@@ -534,7 +533,7 @@ class Host(AbortableEventEmitter):
if event.status == HCI_SUCCESS:
# Create/update the connection
logger.debug(
f'### CONNECTION: [0x{event.connection_handle:04X}] '
f'### LE CONNECTION: [0x{event.connection_handle:04X}] '
f'{event.peer_address} as {HCI_Constant.role_name(event.role)}'
)
@@ -543,7 +542,6 @@ class Host(AbortableEventEmitter):
connection = Connection(
self,
event.connection_handle,
event.role,
event.peer_address,
BT_LE_TRANSPORT,
)
@@ -560,7 +558,6 @@ class Host(AbortableEventEmitter):
event.connection_handle,
BT_LE_TRANSPORT,
event.peer_address,
None,
event.role,
connection_parameters,
)
@@ -589,7 +586,6 @@ class Host(AbortableEventEmitter):
connection = Connection(
self,
event.connection_handle,
BT_CENTRAL_ROLE,
event.bd_addr,
BT_BR_EDR_TRANSPORT,
)
@@ -602,7 +598,6 @@ class Host(AbortableEventEmitter):
BT_BR_EDR_TRANSPORT,
event.bd_addr,
None,
BT_CENTRAL_ROLE,
None,
)
else:
@@ -622,8 +617,7 @@ class Host(AbortableEventEmitter):
if event.status == HCI_SUCCESS:
logger.debug(
f'### DISCONNECTION: [0x{event.connection_handle:04X}] '
f'{connection.peer_address} as '
f'{HCI_Constant.role_name(connection.role)}, '
f'{connection.peer_address} '
f'reason={event.reason}'
)
del self.connections[event.connection_handle]
@@ -739,10 +733,6 @@ class Host(AbortableEventEmitter):
f'role change for {event.bd_addr}: '
f'{HCI_Constant.role_name(event.new_role)}'
)
if connection := self.find_connection_by_bd_addr(
event.bd_addr, BT_BR_EDR_TRANSPORT
):
connection.role = event.new_role
self.emit('role_change', event.bd_addr, event.new_role)
else:
logger.debug(
+26 -10
View File
@@ -20,15 +20,19 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import logging
import os
import json
from typing import Optional
from typing import TYPE_CHECKING, Optional
from .colors import color
from .hci import Address
if TYPE_CHECKING:
from .device import Device
# -----------------------------------------------------------------------------
# Logging
@@ -173,13 +177,13 @@ class KeyStore:
separator = '\n'
@staticmethod
def create_for_device(device_config):
if device_config.keystore is None:
def create_for_device(device: Device) -> Optional[KeyStore]:
if device.config.keystore is None:
return None
keystore_type = device_config.keystore.split(':', 1)[0]
keystore_type = device.config.keystore.split(':', 1)[0]
if keystore_type == 'JsonKeyStore':
return JsonKeyStore.from_device_config(device_config)
return JsonKeyStore.from_device(device)
return None
@@ -204,7 +208,9 @@ class JsonKeyStore(KeyStore):
self.directory_name = os.path.join(
appdirs.user_data_dir(self.APP_NAME, self.APP_AUTHOR), self.KEYS_DIR
)
json_filename = f'{self.namespace}.json'.lower().replace(':', '-')
json_filename = (
f'{self.namespace}.json'.lower().replace(':', '-').replace('/p', '-p')
)
self.filename = os.path.join(self.directory_name, json_filename)
else:
self.filename = filename
@@ -213,9 +219,19 @@ class JsonKeyStore(KeyStore):
logger.debug(f'JSON keystore: {self.filename}')
@staticmethod
def from_device_config(device_config):
params = device_config.keystore.split(':', 1)[1:]
namespace = str(device_config.address)
def from_device(device: Device) -> Optional[JsonKeyStore]:
if not device.config.keystore:
return None
params = device.config.keystore.split(':', 1)[1:]
# Use a namespace based on the device address
if device.public_address not in (Address.ANY, Address.ANY_RANDOM):
namespace = str(device.public_address)
elif device.random_address != Address.ANY_RANDOM:
namespace = str(device.random_address)
else:
namespace = JsonKeyStore.DEFAULT_NAMESPACE
if params:
filename = params[0]
else:
@@ -257,7 +273,7 @@ class JsonKeyStore(KeyStore):
db = await self.load()
namespace = db.setdefault(self.namespace, {})
namespace[name] = keys.to_dict()
namespace.setdefault(name, {}).update(keys.to_dict())
await self.save(db)
+6 -5
View File
@@ -103,7 +103,7 @@ class AshaService(TemplateService):
self.read_only_properties_characteristic = Characteristic(
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes(
[
@@ -120,19 +120,20 @@ class AshaService(TemplateService):
self.audio_control_point_characteristic = Characteristic(
GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.Properties.WRITE
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_audio_control_point_write),
)
self.audio_status_characteristic = Characteristic(
GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([0]),
)
self.volume_characteristic = Characteristic(
GATT_ASHA_VOLUME_CHARACTERISTIC,
Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_volume_write),
)
@@ -151,7 +152,7 @@ class AshaService(TemplateService):
self.psm = self.device.register_l2cap_channel_server(self.psm, on_coc, 8)
self.le_psm_out_characteristic = Characteristic(
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
struct.pack('<H', self.psm),
)
+1 -1
View File
@@ -36,7 +36,7 @@ class BatteryService(TemplateService):
self.battery_level_characteristic = PackedCharacteristicAdapter(
Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
CharacteristicValue(read=read_battery_level),
),
@@ -63,7 +63,9 @@ class DeviceInformationService(TemplateService):
# TODO: pnp_id
):
characteristics = [
Characteristic(uuid, Characteristic.READ, Characteristic.READABLE, field)
Characteristic(
uuid, Characteristic.Properties.READ, Characteristic.READABLE, field
)
for (field, uuid) in (
(manufacturer_name, GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC),
(model_number, GATT_MODEL_NUMBER_STRING_CHARACTERISTIC),
@@ -79,7 +81,7 @@ class DeviceInformationService(TemplateService):
characteristics.append(
Characteristic(
GATT_SYSTEM_ID_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
self.pack_system_id(*system_id),
)
@@ -89,7 +91,7 @@ class DeviceInformationService(TemplateService):
characteristics.append(
Characteristic(
GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
ieee_regulatory_certification_data_list,
)
+3 -3
View File
@@ -152,7 +152,7 @@ class HeartRateService(TemplateService):
self.heart_rate_measurement_characteristic = DelegatedCharacteristicAdapter(
Characteristic(
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
Characteristic.NOTIFY,
Characteristic.Properties.NOTIFY,
0,
CharacteristicValue(read=read_heart_rate_measurement),
),
@@ -164,7 +164,7 @@ class HeartRateService(TemplateService):
if body_sensor_location is not None:
self.body_sensor_location_characteristic = Characteristic(
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes([int(body_sensor_location)]),
)
@@ -182,7 +182,7 @@ class HeartRateService(TemplateService):
self.heart_rate_control_point_characteristic = PackedCharacteristicAdapter(
Characteristic(
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE,
Characteristic.Properties.WRITE,
Characteristic.WRITEABLE,
CharacteristicValue(write=write_heart_rate_control_point_value),
),
+1 -1
View File
@@ -439,7 +439,7 @@ class DLC(EventEmitter):
logger.debug(
f'<<< Credits [{self.dlci}]: '
f'received {credits}, total={self.tx_credits}'
f'received {received_credits}, total={self.tx_credits}'
)
data = data[1:]
+1 -1
View File
@@ -553,7 +553,7 @@ class PairingConfig:
def __init__(
self,
sc: bool = True,
mitm: bool = True,
mitm: bool = False,
bonding: bool = True,
delegate: Optional[PairingDelegate] = None,
) -> None:
+3 -1
View File
@@ -1,4 +1,6 @@
{
"name": "Bumble Hands-Free",
"class_of_device": 2360324
"class_of_device": 2360324,
"keystore": "JsonKeyStore",
"le_enabled": false
}
+11 -9
View File
@@ -209,7 +209,7 @@ async def keyboard_host(device, peer_address):
return
for i, characteristic in enumerate(report_characteristics):
print(color('REPORT:', 'yellow'), characteristic)
if characteristic.properties & Characteristic.NOTIFY:
if characteristic.properties & Characteristic.Properties.NOTIFY:
await peer.discover_descriptors(characteristic)
report_reference_descriptor = characteristic.get_descriptor(
GATT_REPORT_REFERENCE_DESCRIPTOR
@@ -241,7 +241,9 @@ async def keyboard_device(device, command):
# Create an 'input report' characteristic to send keyboard reports to the host
input_report_characteristic = Characteristic(
GATT_REPORT_CHARACTERISTIC,
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([0, 0, 0, 0, 0, 0, 0, 0]),
[
@@ -256,8 +258,8 @@ async def keyboard_device(device, command):
# Create an 'output report' characteristic to receive keyboard reports from the host
output_report_characteristic = Characteristic(
GATT_REPORT_CHARACTERISTIC,
Characteristic.READ
| Characteristic.WRITE
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([0]),
@@ -278,7 +280,7 @@ async def keyboard_device(device, command):
[
Characteristic(
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
'Bumble',
)
@@ -289,13 +291,13 @@ async def keyboard_device(device, command):
[
Characteristic(
GATT_PROTOCOL_MODE_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes([HID_REPORT_PROTOCOL]),
),
Characteristic(
GATT_HID_INFORMATION_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
# bcdHID=1.1, bCountryCode=0x00,
# Flags=RemoteWake|NormallyConnectable
@@ -309,7 +311,7 @@ async def keyboard_device(device, command):
),
Characteristic(
GATT_REPORT_MAP_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
HID_KEYBOARD_REPORT_MAP,
),
@@ -322,7 +324,7 @@ async def keyboard_device(device, command):
[
Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes([100]),
)
+4 -4
View File
@@ -101,7 +101,7 @@ async def main():
# Add the ASHA service to the GATT server
read_only_properties_characteristic = Characteristic(
ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes(
[
@@ -127,13 +127,13 @@ async def main():
)
audio_control_point_characteristic = Characteristic(
ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.Properties.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.WRITEABLE,
CharacteristicValue(write=on_audio_control_point_write),
)
audio_status_characteristic = Characteristic(
ASHA_AUDIO_STATUS_CHARACTERISTIC,
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([0]),
)
@@ -145,7 +145,7 @@ async def main():
)
le_psm_out_characteristic = Characteristic(
ASHA_LE_PSM_OUT_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
struct.pack('<H', psm),
)
+1 -1
View File
@@ -80,7 +80,7 @@ async def main():
)
manufacturer_name_characteristic = Characteristic(
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
"Fitbit",
[descriptor],
+1 -1
View File
@@ -70,7 +70,7 @@ async def main():
)
manufacturer_name_characteristic = Characteristic(
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
"Fitbit",
[descriptor],
+4 -4
View File
@@ -96,7 +96,7 @@ async def main():
)
manufacturer_name_characteristic = Characteristic(
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
'Fitbit',
[descriptor],
@@ -109,13 +109,13 @@ async def main():
[
Characteristic(
'D901B45B-4916-412E-ACCA-376ECB603B2C',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(read=my_custom_read, write=my_custom_write),
),
Characteristic(
'552957FB-CF1F-4A31-9535-E78847E1A714',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(
read=my_custom_read_with_error, write=my_custom_write_with_error
@@ -123,7 +123,7 @@ async def main():
),
Characteristic(
'486F64C6-4B5F-4B3B-8AFF-EDE134A8446A',
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
'hello',
),
+5 -3
View File
@@ -74,19 +74,21 @@ async def main():
# Add a few entries to the device's GATT server
characteristic1 = Characteristic(
'486F64C6-4B5F-4B3B-8AFF-EDE134A8446A',
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([0x40]),
)
characteristic2 = Characteristic(
'8EBDEBAE-0017-418E-8D3B-3A3809492165',
Characteristic.READ | Characteristic.INDICATE,
Characteristic.Properties.READ | Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([0x41]),
)
characteristic3 = Characteristic(
'8EBDEBAE-0017-418E-8D3B-3A3809492165',
Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([0x42]),
)
+1
View File
@@ -43,6 +43,7 @@ install_requires =
pyserial >= 3.5; platform_system!='Emscripten'
pyusb >= 1.2; platform_system!='Emscripten'
websockets >= 8.1; platform_system!='Emscripten'
prettytable >= 3.6.0
[options.entry_points]
console_scripts =
+165 -32
View File
@@ -23,6 +23,7 @@ import pytest
from bumble.controller import Controller
from bumble.gatt_client import CharacteristicProxy
from bumble.gatt_server import Server
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
@@ -114,7 +115,7 @@ async def test_characteristic_encoding():
c = Foo(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
123,
)
@@ -143,7 +144,9 @@ async def test_characteristic_encoding():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
@@ -239,7 +242,9 @@ async def test_attribute_getters():
characteristic_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806')
characteristic = Characteristic(
characteristic_uuid,
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
@@ -284,7 +289,7 @@ def test_CharacteristicAdapter():
v = bytes([1, 2, 3])
c = Characteristic(
GATT_BATTERY_LEVEL_CHARACTERISTIC,
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
v,
)
@@ -420,7 +425,7 @@ async def test_read_write():
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
)
@@ -437,7 +442,7 @@ async def test_read_write():
characteristic2 = Characteristic(
'66DE9057-C848-4ACA-B993-D675644EBB85',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
CharacteristicValue(
read=on_characteristic2_read, write=on_characteristic2_write
@@ -500,7 +505,7 @@ async def test_read_write2():
v = bytes([0x11, 0x22, 0x33, 0x44])
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
value=v,
)
@@ -544,7 +549,7 @@ async def test_subscribe_notify():
characteristic1 = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.NOTIFY,
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE,
bytes([1, 2, 3]),
)
@@ -560,7 +565,7 @@ async def test_subscribe_notify():
characteristic2 = Characteristic(
'66DE9057-C848-4ACA-B993-D675644EBB85',
Characteristic.READ | Characteristic.INDICATE,
Characteristic.Properties.READ | Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([4, 5, 6]),
)
@@ -576,7 +581,9 @@ async def test_subscribe_notify():
characteristic3 = Characteristic(
'AB5E639C-40C1-4238-B9CB-AF41F8B806E4',
Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([7, 8, 9]),
)
@@ -796,32 +803,46 @@ async def test_mtu_exchange():
# -----------------------------------------------------------------------------
def test_char_property_to_string():
# single
assert Characteristic.property_name(0x01) == "BROADCAST"
assert Characteristic.property_name(Characteristic.BROADCAST) == "BROADCAST"
assert str(Characteristic.Properties(0x01)) == "Properties.BROADCAST"
assert str(Characteristic.Properties.BROADCAST) == "Properties.BROADCAST"
# double
assert Characteristic.properties_as_string(0x03) == "BROADCAST,READ"
assert str(Characteristic.Properties(0x03)) == "Properties.READ|BROADCAST"
assert (
Characteristic.properties_as_string(
Characteristic.BROADCAST | Characteristic.READ
)
== "BROADCAST,READ"
str(Characteristic.Properties.BROADCAST | Characteristic.Properties.READ)
== "Properties.READ|BROADCAST"
)
# -----------------------------------------------------------------------------
def test_char_property_string_to_type():
def test_characteristic_property_from_string():
# single
assert Characteristic.string_to_properties("BROADCAST") == Characteristic.BROADCAST
assert (
Characteristic.Properties.from_string("BROADCAST")
== Characteristic.Properties.BROADCAST
)
# double
assert (
Characteristic.string_to_properties("BROADCAST,READ")
== Characteristic.BROADCAST | Characteristic.READ
Characteristic.Properties.from_string("BROADCAST,READ")
== Characteristic.Properties.BROADCAST | Characteristic.Properties.READ
)
assert (
Characteristic.string_to_properties("READ,BROADCAST")
== Characteristic.BROADCAST | Characteristic.READ
Characteristic.Properties.from_string("READ,BROADCAST")
== Characteristic.Properties.BROADCAST | Characteristic.Properties.READ
)
# -----------------------------------------------------------------------------
def test_characteristic_property_from_string_assert():
with pytest.raises(TypeError) as e_info:
Characteristic.Properties.from_string("BROADCAST,HELLO")
assert (
str(e_info.value)
== """Characteristic.Properties::from_string() error:
Expected a string containing any of the keys, separated by commas: BROADCAST,READ,WRITE_WITHOUT_RESPONSE,WRITE,NOTIFY,INDICATE,AUTHENTICATED_SIGNED_WRITES,EXTENDED_PROPERTIES
Got: BROADCAST,HELLO"""
)
@@ -832,7 +853,9 @@ async def test_server_string():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
@@ -843,13 +866,13 @@ async def test_server_string():
assert (
str(server.gatt_server)
== """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access))
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ)
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ)
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ)
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ)
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), Properties.READ)
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), Properties.READ)
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), Properties.READ)
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), Properties.READ)
Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY)
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY)
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, Properties.NOTIFY|WRITE|READ)
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, Properties.NOTIFY|WRITE|READ)
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""
)
@@ -871,21 +894,131 @@ def test_attribute_string_to_permissions():
# -----------------------------------------------------------------------------
def test_charracteristic_permissions():
def test_characteristic_permissions():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
'READABLE,WRITEABLE',
)
assert characteristic.permissions == 3
# -----------------------------------------------------------------------------
def test_characteristic_has_properties():
characteristic = Characteristic(
'FDB159DB-036C-49E3-B3DB-6325AC750806',
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.NOTIFY,
'READABLE,WRITEABLE',
)
assert characteristic.has_properties(Characteristic.Properties.READ)
assert characteristic.has_properties(
Characteristic.Properties.READ | Characteristic.Properties.WRITE
)
assert not characteristic.has_properties(
Characteristic.Properties.READ
| Characteristic.Properties.WRITE
| Characteristic.Properties.INDICATE
)
assert not characteristic.has_properties(Characteristic.Properties.INDICATE)
# -----------------------------------------------------------------------------
def test_descriptor_permissions():
descriptor = Descriptor('2902', 'READABLE,WRITEABLE')
assert descriptor.permissions == 3
# -----------------------------------------------------------------------------
def test_get_attribute_group():
device = Device()
# add some services / characteristics to the gatt server
characteristic1 = Characteristic(
'1111',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
characteristic2 = Characteristic(
'2222',
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([123]),
)
services = [Service('1212', [characteristic1]), Service('3233', [characteristic2])]
device.gatt_server.add_services(services)
# get the handles from gatt server
characteristic_attributes1 = device.gatt_server.get_characteristic_attributes(
UUID('1212'), UUID('1111')
)
assert characteristic_attributes1 is not None
characteristic_attributes2 = device.gatt_server.get_characteristic_attributes(
UUID('3233'), UUID('2222')
)
assert characteristic_attributes2 is not None
descriptor1 = device.gatt_server.get_descriptor_attribute(
UUID('1212'), UUID('1111'), UUID('2902')
)
assert descriptor1 is not None
descriptor2 = device.gatt_server.get_descriptor_attribute(
UUID('3233'), UUID('2222'), UUID('2902')
)
assert descriptor2 is not None
# confirm the handles map back to the service
assert (
UUID('1212')
== device.gatt_server.get_attribute_group(
characteristic_attributes1[0].handle, Service
).uuid
)
assert (
UUID('1212')
== device.gatt_server.get_attribute_group(
characteristic_attributes1[1].handle, Service
).uuid
)
assert (
UUID('1212')
== device.gatt_server.get_attribute_group(descriptor1.handle, Service).uuid
)
assert (
UUID('3233')
== device.gatt_server.get_attribute_group(
characteristic_attributes2[0].handle, Service
).uuid
)
assert (
UUID('3233')
== device.gatt_server.get_attribute_group(
characteristic_attributes2[1].handle, Service
).uuid
)
assert (
UUID('3233')
== device.gatt_server.get_attribute_group(descriptor2.handle, Service).uuid
)
# confirm the handles map back to the characteristic
assert (
UUID('1111')
== device.gatt_server.get_attribute_group(
descriptor1.handle, Characteristic
).uuid
)
assert (
UUID('2222')
== device.gatt_server.get_attribute_group(
descriptor2.handle, Characteristic
).uuid
)
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
+9 -6
View File
@@ -163,25 +163,28 @@ async def test_self_gatt():
# Add some GATT characteristics to device 1
c1 = Characteristic(
'3A143AD7-D4A7-436B-97D6-5B62C315E833',
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes([1, 2, 3]),
)
c2 = Characteristic(
'9557CCE2-DB37-46EB-94C4-50AE5B9CB0F8',
Characteristic.READ | Characteristic.WRITE,
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([4, 5, 6]),
)
c3 = Characteristic(
'84FC1A2E-C52D-4A2D-B8C3-8855BAB86638',
Characteristic.READ | Characteristic.WRITE_WITHOUT_RESPONSE,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
Characteristic.READABLE | Characteristic.WRITEABLE,
bytes([7, 8, 9]),
)
c4 = Characteristic(
'84FC1A2E-C52D-4A2D-B8C3-8855BAB86638',
Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.INDICATE,
Characteristic.READABLE,
bytes([1, 1, 1]),
)
@@ -234,7 +237,7 @@ async def test_self_gatt_long_read():
characteristics = [
Characteristic(
f'3A143AD7-D4A7-436B-97D6-5B62C315{i:04X}',
Characteristic.READ,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes([x & 255 for x in range(i)]),
)
@@ -444,7 +447,7 @@ async def test_self_smp_wrong_pin():
async def compare_numbers(self, number, digits):
return False
wrong_pin_pairing_config = PairingConfig(delegate=WrongPinDelegate())
wrong_pin_pairing_config = PairingConfig(mitm=True, delegate=WrongPinDelegate())
paired = False
try:
await _test_self_smp_with_configs(