forked from auracaster/bumble_mirror
Compare commits
20 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 49b2c13e69 | |||
| 962737a97b | |||
| 85496aaff5 | |||
| a95e601a5c | |||
| df218b5370 | |||
| 0f737244b5 | |||
| a258ba383a | |||
| c53e1d2480 | |||
| 620c135ac4 | |||
| fca73a49a3 | |||
| cf70db84a1 | |||
| 7731c41f80 | |||
| 278341cbc0 | |||
| fb49a87494 | |||
| eba82b9d9a | |||
| 677fc77d3c | |||
| e026de295f | |||
| 52c15705e9 | |||
| 45ca0ef071 | |||
| e0af954baa |
@@ -200,3 +200,22 @@
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
|
||||
|
||||
---
|
||||
|
||||
|
||||
Files: bumble/colors.py
|
||||
Copyright (c) 2012 Giorgos Verigakis <verigak@gmail.com>
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
|
||||
+4
-2
@@ -558,11 +558,13 @@ class GattServer:
|
||||
# Setup the GATT service
|
||||
self.speed_tx = Characteristic(
|
||||
SPEED_TX_UUID,
|
||||
Characteristic.WRITE,
|
||||
Characteristic.Properties.WRITE,
|
||||
Characteristic.WRITEABLE,
|
||||
CharacteristicValue(write=self.on_tx_write),
|
||||
)
|
||||
self.speed_rx = Characteristic(SPEED_RX_UUID, Characteristic.NOTIFY, 0)
|
||||
self.speed_rx = Characteristic(
|
||||
SPEED_RX_UUID, Characteristic.Properties.NOTIFY, 0
|
||||
)
|
||||
|
||||
speed_service = Service(
|
||||
SPEED_SERVICE_UUID,
|
||||
|
||||
+213
-17
@@ -24,10 +24,12 @@ import logging
|
||||
import os
|
||||
import random
|
||||
import re
|
||||
from typing import Optional
|
||||
import humanize
|
||||
from typing import Optional, Union
|
||||
from collections import OrderedDict
|
||||
|
||||
import click
|
||||
from prettytable import PrettyTable
|
||||
|
||||
from prompt_toolkit import Application
|
||||
from prompt_toolkit.history import FileHistory
|
||||
@@ -125,7 +127,8 @@ class ConsoleApp:
|
||||
|
||||
def __init__(self):
|
||||
self.known_addresses = set()
|
||||
self.known_attributes = []
|
||||
self.known_remote_attributes = []
|
||||
self.known_local_attributes = []
|
||||
self.device = None
|
||||
self.connected_peer = None
|
||||
self.top_tab = 'device'
|
||||
@@ -162,6 +165,8 @@ class ConsoleApp:
|
||||
'device': None,
|
||||
'local-services': None,
|
||||
'remote-services': None,
|
||||
'local-values': None,
|
||||
'remote-values': None,
|
||||
},
|
||||
'filter': {
|
||||
'address': None,
|
||||
@@ -172,10 +177,11 @@ class ConsoleApp:
|
||||
'disconnect': None,
|
||||
'discover': {'services': None, 'attributes': None},
|
||||
'request-mtu': None,
|
||||
'read': LiveCompleter(self.known_attributes),
|
||||
'write': LiveCompleter(self.known_attributes),
|
||||
'subscribe': LiveCompleter(self.known_attributes),
|
||||
'unsubscribe': LiveCompleter(self.known_attributes),
|
||||
'read': LiveCompleter(self.known_remote_attributes),
|
||||
'write': LiveCompleter(self.known_remote_attributes),
|
||||
'local-write': LiveCompleter(self.known_local_attributes),
|
||||
'subscribe': LiveCompleter(self.known_remote_attributes),
|
||||
'unsubscribe': LiveCompleter(self.known_remote_attributes),
|
||||
'set-phy': {'1m': None, '2m': None, 'coded': None},
|
||||
'set-default-phy': None,
|
||||
'quit': None,
|
||||
@@ -207,6 +213,8 @@ class ConsoleApp:
|
||||
self.log_text = FormattedTextControl(
|
||||
get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1))
|
||||
)
|
||||
self.local_values_text = FormattedTextControl()
|
||||
self.remote_values_text = FormattedTextControl()
|
||||
self.log_height = Dimension(min=7, weight=4)
|
||||
self.log_max_lines = 100
|
||||
self.log_lines = []
|
||||
@@ -221,10 +229,18 @@ class ConsoleApp:
|
||||
Frame(Window(self.local_services_text), title='Local Services'),
|
||||
filter=Condition(lambda: self.top_tab == 'local-services'),
|
||||
),
|
||||
ConditionalContainer(
|
||||
Frame(Window(self.local_values_text), title='Local Values'),
|
||||
filter=Condition(lambda: self.top_tab == 'local-values'),
|
||||
),
|
||||
ConditionalContainer(
|
||||
Frame(Window(self.remote_services_text), title='Remote Services'),
|
||||
filter=Condition(lambda: self.top_tab == 'remote-services'),
|
||||
),
|
||||
ConditionalContainer(
|
||||
Frame(Window(self.remote_values_text), title='Remote Values'),
|
||||
filter=Condition(lambda: self.top_tab == 'remote-values'),
|
||||
),
|
||||
ConditionalContainer(
|
||||
Frame(Window(self.log_text, height=self.log_height), title='Log'),
|
||||
filter=Condition(lambda: self.top_tab == 'log'),
|
||||
@@ -366,17 +382,19 @@ class ConsoleApp:
|
||||
|
||||
def show_remote_services(self, services):
|
||||
lines = []
|
||||
del self.known_attributes[:]
|
||||
del self.known_remote_attributes[:]
|
||||
for service in services:
|
||||
lines.append(("ansicyan", f"{service}\n"))
|
||||
|
||||
for characteristic in service.characteristics:
|
||||
lines.append(('ansimagenta', f' {characteristic} + \n'))
|
||||
self.known_attributes.append(
|
||||
self.known_remote_attributes.append(
|
||||
f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}'
|
||||
)
|
||||
self.known_attributes.append(f'*.{characteristic.uuid.to_hex_str()}')
|
||||
self.known_attributes.append(f'#{characteristic.handle:X}')
|
||||
self.known_remote_attributes.append(
|
||||
f'*.{characteristic.uuid.to_hex_str()}'
|
||||
)
|
||||
self.known_remote_attributes.append(f'#{characteristic.handle:X}')
|
||||
for descriptor in characteristic.descriptors:
|
||||
lines.append(("ansigreen", f" {descriptor}\n"))
|
||||
|
||||
@@ -385,12 +403,31 @@ class ConsoleApp:
|
||||
|
||||
def show_local_services(self, attributes):
|
||||
lines = []
|
||||
del self.known_local_attributes[:]
|
||||
for attribute in attributes:
|
||||
if isinstance(attribute, Service):
|
||||
# Save the most recent service for use later
|
||||
service = attribute
|
||||
lines.append(("ansicyan", f"{attribute}\n"))
|
||||
elif isinstance(attribute, (Characteristic, CharacteristicDeclaration)):
|
||||
elif isinstance(attribute, Characteristic):
|
||||
# CharacteristicDeclaration includes all info from Characteristic
|
||||
# no need to print it twice
|
||||
continue
|
||||
elif isinstance(attribute, CharacteristicDeclaration):
|
||||
# Save the most recent characteristic declaration for use later
|
||||
characteristic_declaration = attribute
|
||||
self.known_local_attributes.append(
|
||||
f'{service.uuid.to_hex_str()}.{attribute.characteristic.uuid.to_hex_str()}'
|
||||
)
|
||||
self.known_local_attributes.append(
|
||||
f'#{attribute.characteristic.handle:X}'
|
||||
)
|
||||
lines.append(("ansimagenta", f" {attribute}\n"))
|
||||
elif isinstance(attribute, Descriptor):
|
||||
self.known_local_attributes.append(
|
||||
f'{service.uuid.to_hex_str()}.{characteristic_declaration.characteristic.uuid.to_hex_str()}.{attribute.type.to_hex_str()}'
|
||||
)
|
||||
self.known_local_attributes.append(f'#{attribute.handle:X}')
|
||||
lines.append(("ansigreen", f" {attribute}\n"))
|
||||
else:
|
||||
lines.append(("ansiyellow", f"{attribute}\n"))
|
||||
@@ -494,7 +531,7 @@ class ConsoleApp:
|
||||
|
||||
self.show_attributes(attributes)
|
||||
|
||||
def find_characteristic(self, param) -> Optional[CharacteristicProxy]:
|
||||
def find_remote_characteristic(self, param) -> Optional[CharacteristicProxy]:
|
||||
if not self.connected_peer:
|
||||
return None
|
||||
parts = param.split('.')
|
||||
@@ -516,6 +553,38 @@ class ConsoleApp:
|
||||
|
||||
return None
|
||||
|
||||
def find_local_attribute(
|
||||
self, param
|
||||
) -> Optional[Union[Characteristic, Descriptor]]:
|
||||
parts = param.split('.')
|
||||
if len(parts) == 3:
|
||||
service_uuid = UUID(parts[0])
|
||||
characteristic_uuid = UUID(parts[1])
|
||||
descriptor_uuid = UUID(parts[2])
|
||||
return self.device.gatt_server.get_descriptor_attribute(
|
||||
service_uuid, characteristic_uuid, descriptor_uuid
|
||||
)
|
||||
if len(parts) == 2:
|
||||
service_uuid = UUID(parts[0])
|
||||
characteristic_uuid = UUID(parts[1])
|
||||
characteristic_attributes = (
|
||||
self.device.gatt_server.get_characteristic_attributes(
|
||||
service_uuid, characteristic_uuid
|
||||
)
|
||||
)
|
||||
if characteristic_attributes:
|
||||
return characteristic_attributes[1]
|
||||
return None
|
||||
elif len(parts) == 1:
|
||||
if parts[0].startswith('#'):
|
||||
attribute_handle = int(f'{parts[0][1:]}', 16)
|
||||
attribute = self.device.gatt_server.get_attribute(attribute_handle)
|
||||
if isinstance(attribute, (Characteristic, Descriptor)):
|
||||
return attribute
|
||||
return None
|
||||
|
||||
return None
|
||||
|
||||
async def rssi_monitor_loop(self):
|
||||
while True:
|
||||
if self.monitor_rssi and self.connected_peer:
|
||||
@@ -674,10 +743,109 @@ class ConsoleApp:
|
||||
'device',
|
||||
'local-services',
|
||||
'remote-services',
|
||||
'local-values',
|
||||
'remote-values',
|
||||
}:
|
||||
self.top_tab = params[0]
|
||||
self.ui.invalidate()
|
||||
|
||||
while self.top_tab == 'local-values':
|
||||
await self.do_show_local_values()
|
||||
await asyncio.sleep(1)
|
||||
|
||||
while self.top_tab == 'remote-values':
|
||||
await self.do_show_remote_values()
|
||||
await asyncio.sleep(1)
|
||||
|
||||
async def do_show_local_values(self):
|
||||
prettytable = PrettyTable()
|
||||
field_names = ["Service", "Characteristic", "Descriptor"]
|
||||
|
||||
# if there's no connections, add a column just for value
|
||||
if not self.device.connections:
|
||||
field_names.append("Value")
|
||||
|
||||
# if there are connections, add a column for each connection's value
|
||||
for connection in self.device.connections.values():
|
||||
field_names.append(f"Connection {connection.handle}")
|
||||
|
||||
for attribute in self.device.gatt_server.attributes:
|
||||
if isinstance(attribute, Characteristic):
|
||||
service = self.device.gatt_server.get_attribute_group(
|
||||
attribute.handle, Service
|
||||
)
|
||||
if not service:
|
||||
continue
|
||||
values = [
|
||||
attribute.read_value(connection)
|
||||
for connection in self.device.connections.values()
|
||||
]
|
||||
if not values:
|
||||
values = [attribute.read_value(None)]
|
||||
prettytable.add_row([f"{service.uuid}", attribute.uuid, ""] + values)
|
||||
|
||||
elif isinstance(attribute, Descriptor):
|
||||
service = self.device.gatt_server.get_attribute_group(
|
||||
attribute.handle, Service
|
||||
)
|
||||
if not service:
|
||||
continue
|
||||
characteristic = self.device.gatt_server.get_attribute_group(
|
||||
attribute.handle, Characteristic
|
||||
)
|
||||
if not characteristic:
|
||||
continue
|
||||
values = [
|
||||
attribute.read_value(connection)
|
||||
for connection in self.device.connections.values()
|
||||
]
|
||||
if not values:
|
||||
values = [attribute.read_value(None)]
|
||||
|
||||
# TODO: future optimization: convert CCCD value to human readable string
|
||||
|
||||
prettytable.add_row(
|
||||
[service.uuid, characteristic.uuid, attribute.type] + values
|
||||
)
|
||||
|
||||
prettytable.field_names = field_names
|
||||
self.local_values_text.text = prettytable.get_string()
|
||||
self.ui.invalidate()
|
||||
|
||||
async def do_show_remote_values(self):
|
||||
prettytable = PrettyTable(
|
||||
field_names=[
|
||||
"Connection",
|
||||
"Service",
|
||||
"Characteristic",
|
||||
"Descriptor",
|
||||
"Time",
|
||||
"Value",
|
||||
]
|
||||
)
|
||||
for connection in self.device.connections.values():
|
||||
for handle, (time, value) in connection.gatt_client.cached_values.items():
|
||||
row = [connection.handle]
|
||||
attribute = connection.gatt_client.get_attributes(handle)
|
||||
if not attribute:
|
||||
continue
|
||||
if len(attribute) == 3:
|
||||
row.extend(
|
||||
[attribute[0].uuid, attribute[1].uuid, attribute[2].type]
|
||||
)
|
||||
elif len(attribute) == 2:
|
||||
row.extend([attribute[0].uuid, attribute[1].uuid, ""])
|
||||
elif len(attribute) == 1:
|
||||
row.extend([attribute[0].uuid, "", ""])
|
||||
else:
|
||||
continue
|
||||
|
||||
row.extend([humanize.naturaltime(time), value])
|
||||
prettytable.add_row(row)
|
||||
|
||||
self.remote_values_text.text = prettytable.get_string()
|
||||
self.ui.invalidate()
|
||||
|
||||
async def do_get_phy(self, _):
|
||||
if not self.connected_peer:
|
||||
self.show_error('not connected')
|
||||
@@ -720,7 +888,7 @@ class ConsoleApp:
|
||||
self.show_error('not connected')
|
||||
return
|
||||
|
||||
characteristic = self.find_characteristic(params[0])
|
||||
characteristic = self.find_remote_characteristic(params[0])
|
||||
if characteristic is None:
|
||||
self.show_error('no such characteristic')
|
||||
return
|
||||
@@ -745,15 +913,43 @@ class ConsoleApp:
|
||||
except ValueError:
|
||||
value = str.encode(params[1]) # must be a string
|
||||
|
||||
characteristic = self.find_characteristic(params[0])
|
||||
characteristic = self.find_remote_characteristic(params[0])
|
||||
if characteristic is None:
|
||||
self.show_error('no such characteristic')
|
||||
return
|
||||
|
||||
# use write with response if supported
|
||||
with_response = characteristic.properties & Characteristic.WRITE
|
||||
with_response = characteristic.properties & Characteristic.Properties.WRITE
|
||||
await characteristic.write_value(value, with_response=with_response)
|
||||
|
||||
async def do_local_write(self, params):
|
||||
if len(params) != 2:
|
||||
self.show_error(
|
||||
'invalid syntax', 'expected local-write <attribute> <value>'
|
||||
)
|
||||
return
|
||||
|
||||
if params[1].upper().startswith("0X"):
|
||||
value = bytes.fromhex(params[1][2:]) # parse as hex string
|
||||
else:
|
||||
try:
|
||||
value = int(params[1]).to_bytes(2, "little") # try as 2 byte integer
|
||||
except ValueError:
|
||||
value = str.encode(params[1]) # must be a string
|
||||
|
||||
attribute = self.find_local_attribute(params[0])
|
||||
if not attribute:
|
||||
self.show_error('invalid syntax', 'unable to find attribute')
|
||||
return
|
||||
|
||||
# send data to any subscribers
|
||||
if isinstance(attribute, Characteristic):
|
||||
attribute.write_value(None, value)
|
||||
if attribute.has_properties(Characteristic.NOTIFY):
|
||||
await self.device.gatt_server.notify_subscribers(attribute)
|
||||
if attribute.has_properties(Characteristic.INDICATE):
|
||||
await self.device.gatt_server.indicate_subscribers(attribute)
|
||||
|
||||
async def do_subscribe(self, params):
|
||||
if not self.connected_peer:
|
||||
self.show_error('not connected')
|
||||
@@ -763,7 +959,7 @@ class ConsoleApp:
|
||||
self.show_error('invalid syntax', 'expected subscribe <attribute>')
|
||||
return
|
||||
|
||||
characteristic = self.find_characteristic(params[0])
|
||||
characteristic = self.find_remote_characteristic(params[0])
|
||||
if characteristic is None:
|
||||
self.show_error('no such characteristic')
|
||||
return
|
||||
@@ -783,7 +979,7 @@ class ConsoleApp:
|
||||
self.show_error('invalid syntax', 'expected subscribe <attribute>')
|
||||
return
|
||||
|
||||
characteristic = self.find_characteristic(params[0])
|
||||
characteristic = self.find_remote_characteristic(params[0])
|
||||
if characteristic is None:
|
||||
self.show_error('no such characteristic')
|
||||
return
|
||||
|
||||
+3
-4
@@ -230,13 +230,13 @@ class GattlinkNodeBridge(GattlinkL2capEndpoint, Device.Listener):
|
||||
)
|
||||
self.tx_characteristic = Characteristic(
|
||||
GG_GATTLINK_TX_CHARACTERISTIC_UUID,
|
||||
Characteristic.NOTIFY,
|
||||
Characteristic.Properties.NOTIFY,
|
||||
Characteristic.READABLE,
|
||||
)
|
||||
self.tx_characteristic.on('subscription', self.on_tx_subscription)
|
||||
self.psm_characteristic = Characteristic(
|
||||
GG_GATTLINK_L2CAP_CHANNEL_PSM_CHARACTERISTIC_UUID,
|
||||
Characteristic.READ | Characteristic.NOTIFY,
|
||||
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
|
||||
Characteristic.READABLE,
|
||||
bytes([psm, 0]),
|
||||
)
|
||||
@@ -339,8 +339,7 @@ async def run(
|
||||
|
||||
# Create a UDP to TX bridge (receive from TX, send to UDP)
|
||||
bridge.tx_socket, _ = await loop.create_datagram_endpoint(
|
||||
# pylint: disable-next=unnecessary-lambda
|
||||
lambda: asyncio.DatagramProtocol(),
|
||||
asyncio.DatagramProtocol,
|
||||
remote_addr=(send_host, send_port),
|
||||
)
|
||||
|
||||
|
||||
+2
-1
@@ -302,7 +302,8 @@ async def pair(
|
||||
[
|
||||
Characteristic(
|
||||
'552957FB-CF1F-4A31-9535-E78847E1A714',
|
||||
Characteristic.READ | Characteristic.WRITE,
|
||||
Characteristic.Properties.READ
|
||||
| Characteristic.Properties.WRITE,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
CharacteristicValue(
|
||||
read=read_with_error, write=write_with_error
|
||||
|
||||
+4
-4
@@ -190,7 +190,7 @@ class ATT_Error(ProtocolError):
|
||||
super().__init__(
|
||||
error_code,
|
||||
error_namespace='att',
|
||||
error_name=ATT_PDU.error_name(self.error_code),
|
||||
error_name=ATT_PDU.error_name(error_code),
|
||||
)
|
||||
self.att_handle = att_handle
|
||||
self.message = message
|
||||
@@ -750,10 +750,10 @@ class Attribute(EventEmitter):
|
||||
permissions_str.split(","),
|
||||
0,
|
||||
)
|
||||
except TypeError:
|
||||
except TypeError as exc:
|
||||
raise TypeError(
|
||||
f"Attribute::permissions error:\nExpected a string containing any of the keys, seperated by commas: {','.join(Attribute.PERMISSION_NAMES.values())}\nGot: {permissions_str}"
|
||||
)
|
||||
f"Attribute::permissions error:\nExpected a string containing any of the keys, separated by commas: {','.join(Attribute.PERMISSION_NAMES.values())}\nGot: {permissions_str}"
|
||||
) from exc
|
||||
|
||||
def __init__(self, attribute_type, permissions, value=b''):
|
||||
EventEmitter.__init__(self)
|
||||
|
||||
+18
-9
@@ -529,6 +529,7 @@ class Connection(CompositeEventEmitter):
|
||||
authenticated: bool
|
||||
sc: bool
|
||||
link_key_type: int
|
||||
gatt_client: gatt_client.Client
|
||||
|
||||
@composite_listener
|
||||
class Listener:
|
||||
@@ -878,7 +879,7 @@ device_host_event_handlers: list[str] = []
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class Device(CompositeEventEmitter):
|
||||
# incomplete list of fields.
|
||||
# Incomplete list of fields.
|
||||
random_address: Address
|
||||
public_address: Address
|
||||
classic_enabled: bool
|
||||
@@ -893,6 +894,7 @@ class Device(CompositeEventEmitter):
|
||||
Address, List[asyncio.Future[Union[Connection, Tuple[Address, int, int]]]]
|
||||
]
|
||||
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
|
||||
config: DeviceConfiguration
|
||||
|
||||
@composite_listener
|
||||
class Listener:
|
||||
@@ -980,9 +982,10 @@ class Device(CompositeEventEmitter):
|
||||
self.connect_own_address_type = None
|
||||
|
||||
# Use the initial config or a default
|
||||
config = config or DeviceConfiguration()
|
||||
self.config = config
|
||||
|
||||
self.public_address = Address('00:00:00:00:00:00')
|
||||
if config is None:
|
||||
config = DeviceConfiguration()
|
||||
self.name = config.name
|
||||
self.random_address = config.address
|
||||
self.class_of_device = config.class_of_device
|
||||
@@ -990,7 +993,7 @@ class Device(CompositeEventEmitter):
|
||||
self.advertising_data = config.advertising_data
|
||||
self.advertising_interval_min = config.advertising_interval_min
|
||||
self.advertising_interval_max = config.advertising_interval_max
|
||||
self.keystore = KeyStore.create_for_device(config)
|
||||
self.keystore = None
|
||||
self.irk = config.irk
|
||||
self.le_enabled = config.le_enabled
|
||||
self.classic_enabled = config.classic_enabled
|
||||
@@ -1018,7 +1021,9 @@ class Device(CompositeEventEmitter):
|
||||
descriptors.append(new_descriptor)
|
||||
new_characteristic = Characteristic(
|
||||
uuid=characteristic["uuid"],
|
||||
properties=characteristic["properties"],
|
||||
properties=Characteristic.Properties.from_string(
|
||||
characteristic["properties"]
|
||||
),
|
||||
permissions=characteristic["permissions"],
|
||||
descriptors=descriptors,
|
||||
)
|
||||
@@ -1165,6 +1170,7 @@ class Device(CompositeEventEmitter):
|
||||
# Reset the controller
|
||||
await self.host.reset()
|
||||
|
||||
# Try to get the public address from the controller
|
||||
response = await self.send_command(HCI_Read_BD_ADDR_Command()) # type: ignore[call-arg]
|
||||
if response.return_parameters.status == HCI_SUCCESS:
|
||||
logger.debug(
|
||||
@@ -1172,6 +1178,11 @@ class Device(CompositeEventEmitter):
|
||||
)
|
||||
self.public_address = response.return_parameters.bd_addr
|
||||
|
||||
# Instantiate the Key Store (we do this here rather than at __init__ time
|
||||
# because some Key Store implementations use the public address as a namespace)
|
||||
if self.keystore is None:
|
||||
self.keystore = KeyStore.create_for_device(self)
|
||||
|
||||
if self.host.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND):
|
||||
await self.send_command(
|
||||
HCI_Write_LE_Host_Support_Command(
|
||||
@@ -1600,7 +1611,7 @@ class Device(CompositeEventEmitter):
|
||||
pending connection.
|
||||
|
||||
connection_parameters_preferences: (BLE only, ignored for BR/EDR)
|
||||
* None: use all PHYs with default parameters
|
||||
* None: use the 1M PHY with default parameters
|
||||
* map: each entry has a PHY as key and a ConnectionParametersPreferences
|
||||
object as value
|
||||
|
||||
@@ -1669,9 +1680,7 @@ class Device(CompositeEventEmitter):
|
||||
if connection_parameters_preferences is None:
|
||||
if connection_parameters_preferences is None:
|
||||
connection_parameters_preferences = {
|
||||
HCI_LE_1M_PHY: ConnectionParametersPreferences.default,
|
||||
HCI_LE_2M_PHY: ConnectionParametersPreferences.default,
|
||||
HCI_LE_CODED_PHY: ConnectionParametersPreferences.default,
|
||||
HCI_LE_1M_PHY: ConnectionParametersPreferences.default
|
||||
}
|
||||
|
||||
self.connect_own_address_type = own_address_type
|
||||
|
||||
+2
-2
@@ -41,14 +41,14 @@ class GenericAccessService(Service):
|
||||
def __init__(self, device_name, appearance=(0, 0)):
|
||||
device_name_characteristic = Characteristic(
|
||||
GATT_DEVICE_NAME_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
device_name.encode('utf-8')[:248],
|
||||
)
|
||||
|
||||
appearance_characteristic = Characteristic(
|
||||
GATT_APPEARANCE_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
struct.pack('<H', (appearance[0] << 6) | appearance[1]),
|
||||
)
|
||||
|
||||
+57
-47
@@ -28,7 +28,7 @@ import enum
|
||||
import functools
|
||||
import logging
|
||||
import struct
|
||||
from typing import Optional, Sequence
|
||||
from typing import Optional, Sequence, List
|
||||
|
||||
from .colors import color
|
||||
from .core import UUID, get_dict_key_by_value
|
||||
@@ -259,63 +259,68 @@ class Characteristic(Attribute):
|
||||
See Vol 3, Part G - 3.3 CHARACTERISTIC DEFINITION
|
||||
'''
|
||||
|
||||
# Property flags
|
||||
BROADCAST = 0x01
|
||||
READ = 0x02
|
||||
WRITE_WITHOUT_RESPONSE = 0x04
|
||||
WRITE = 0x08
|
||||
NOTIFY = 0x10
|
||||
INDICATE = 0x20
|
||||
AUTHENTICATED_SIGNED_WRITES = 0x40
|
||||
EXTENDED_PROPERTIES = 0x80
|
||||
uuid: UUID
|
||||
properties: Characteristic.Properties
|
||||
|
||||
PROPERTY_NAMES = {
|
||||
BROADCAST: 'BROADCAST',
|
||||
READ: 'READ',
|
||||
WRITE_WITHOUT_RESPONSE: 'WRITE_WITHOUT_RESPONSE',
|
||||
WRITE: 'WRITE',
|
||||
NOTIFY: 'NOTIFY',
|
||||
INDICATE: 'INDICATE',
|
||||
AUTHENTICATED_SIGNED_WRITES: 'AUTHENTICATED_SIGNED_WRITES',
|
||||
EXTENDED_PROPERTIES: 'EXTENDED_PROPERTIES',
|
||||
}
|
||||
class Properties(enum.IntFlag):
|
||||
"""Property flags"""
|
||||
|
||||
@staticmethod
|
||||
def property_name(property_int):
|
||||
return Characteristic.PROPERTY_NAMES.get(property_int, '')
|
||||
BROADCAST = 0x01
|
||||
READ = 0x02
|
||||
WRITE_WITHOUT_RESPONSE = 0x04
|
||||
WRITE = 0x08
|
||||
NOTIFY = 0x10
|
||||
INDICATE = 0x20
|
||||
AUTHENTICATED_SIGNED_WRITES = 0x40
|
||||
EXTENDED_PROPERTIES = 0x80
|
||||
|
||||
@staticmethod
|
||||
def properties_as_string(properties):
|
||||
return ','.join(
|
||||
[
|
||||
Characteristic.property_name(p)
|
||||
for p in Characteristic.PROPERTY_NAMES
|
||||
if properties & p
|
||||
]
|
||||
)
|
||||
@staticmethod
|
||||
def from_string(properties_str: str) -> Characteristic.Properties:
|
||||
property_names: List[str] = []
|
||||
for property in Characteristic.Properties:
|
||||
if property.name is None:
|
||||
raise TypeError()
|
||||
property_names.append(property.name)
|
||||
|
||||
@staticmethod
|
||||
def string_to_properties(properties_str: str):
|
||||
return functools.reduce(
|
||||
lambda x, y: x | get_dict_key_by_value(Characteristic.PROPERTY_NAMES, y),
|
||||
properties_str.split(","),
|
||||
0,
|
||||
)
|
||||
def string_to_property(property_string) -> Characteristic.Properties:
|
||||
for property in zip(Characteristic.Properties, property_names):
|
||||
if property_string == property[1]:
|
||||
return property[0]
|
||||
raise TypeError(f"Unable to convert {property_string} to Property")
|
||||
|
||||
try:
|
||||
return functools.reduce(
|
||||
lambda x, y: x | string_to_property(y),
|
||||
properties_str.split(","),
|
||||
Characteristic.Properties(0),
|
||||
)
|
||||
except TypeError:
|
||||
raise TypeError(
|
||||
f"Characteristic.Properties::from_string() error:\nExpected a string containing any of the keys, separated by commas: {','.join(property_names)}\nGot: {properties_str}"
|
||||
)
|
||||
|
||||
# For backwards compatibility these are defined here
|
||||
# For new code, please use Characteristic.Properties.X
|
||||
BROADCAST = Properties.BROADCAST
|
||||
READ = Properties.READ
|
||||
WRITE_WITHOUT_RESPONSE = Properties.WRITE_WITHOUT_RESPONSE
|
||||
WRITE = Properties.WRITE
|
||||
NOTIFY = Properties.NOTIFY
|
||||
INDICATE = Properties.INDICATE
|
||||
AUTHENTICATED_SIGNED_WRITES = Properties.AUTHENTICATED_SIGNED_WRITES
|
||||
EXTENDED_PROPERTIES = Properties.EXTENDED_PROPERTIES
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
uuid,
|
||||
properties,
|
||||
properties: Characteristic.Properties,
|
||||
permissions,
|
||||
value=b'',
|
||||
descriptors: Sequence[Descriptor] = (),
|
||||
):
|
||||
super().__init__(uuid, permissions, value)
|
||||
self.uuid = self.type
|
||||
if isinstance(properties, str):
|
||||
self.properties = Characteristic.string_to_properties(properties)
|
||||
else:
|
||||
self.properties = properties
|
||||
self.properties = properties
|
||||
self.descriptors = descriptors
|
||||
|
||||
def get_descriptor(self, descriptor_type):
|
||||
@@ -325,12 +330,15 @@ class Characteristic(Attribute):
|
||||
|
||||
return None
|
||||
|
||||
def has_properties(self, properties: Characteristic.Properties) -> bool:
|
||||
return self.properties & properties == properties
|
||||
|
||||
def __str__(self):
|
||||
return (
|
||||
f'Characteristic(handle=0x{self.handle:04X}, '
|
||||
f'end=0x{self.end_group_handle:04X}, '
|
||||
f'uuid={self.uuid}, '
|
||||
f'properties={Characteristic.properties_as_string(self.properties)})'
|
||||
f'{self.properties!s})'
|
||||
)
|
||||
|
||||
|
||||
@@ -340,6 +348,8 @@ class CharacteristicDeclaration(Attribute):
|
||||
See Vol 3, Part G - 3.3.1 CHARACTERISTIC DECLARATION
|
||||
'''
|
||||
|
||||
characteristic: Characteristic
|
||||
|
||||
def __init__(self, characteristic, value_handle):
|
||||
declaration_bytes = (
|
||||
struct.pack('<BH', characteristic.properties, value_handle)
|
||||
@@ -355,8 +365,8 @@ class CharacteristicDeclaration(Attribute):
|
||||
return (
|
||||
f'CharacteristicDeclaration(handle=0x{self.handle:04X}, '
|
||||
f'value_handle=0x{self.value_handle:04X}, '
|
||||
f'uuid={self.characteristic.uuid}, properties='
|
||||
f'{Characteristic.properties_as_string(self.characteristic.properties)})'
|
||||
f'uuid={self.characteristic.uuid}, '
|
||||
f'{self.characteristic.properties!s})'
|
||||
)
|
||||
|
||||
|
||||
|
||||
+63
-10
@@ -27,7 +27,8 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import logging
|
||||
import struct
|
||||
from typing import List, Optional
|
||||
from datetime import datetime
|
||||
from typing import List, Optional, Dict, Tuple, Callable, Union, Any
|
||||
|
||||
from pyee import EventEmitter
|
||||
|
||||
@@ -62,7 +63,6 @@ from .gatt import (
|
||||
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE,
|
||||
GATT_REQUEST_TIMEOUT,
|
||||
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
|
||||
Service,
|
||||
Characteristic,
|
||||
ClientCharacteristicConfigurationBits,
|
||||
)
|
||||
@@ -139,12 +139,21 @@ class ServiceProxy(AttributeProxy):
|
||||
|
||||
|
||||
class CharacteristicProxy(AttributeProxy):
|
||||
properties: Characteristic.Properties
|
||||
descriptors: List[DescriptorProxy]
|
||||
subscribers: Dict[Any, Callable]
|
||||
|
||||
def __init__(self, client, handle, end_group_handle, uuid, properties):
|
||||
def __init__(
|
||||
self,
|
||||
client,
|
||||
handle,
|
||||
end_group_handle,
|
||||
uuid,
|
||||
properties: int,
|
||||
):
|
||||
super().__init__(client, handle, end_group_handle, uuid)
|
||||
self.uuid = uuid
|
||||
self.properties = properties
|
||||
self.properties = Characteristic.Properties(properties)
|
||||
self.descriptors = []
|
||||
self.descriptors_discovered = False
|
||||
self.subscribers = {} # Map from subscriber to proxy subscriber
|
||||
@@ -159,7 +168,9 @@ class CharacteristicProxy(AttributeProxy):
|
||||
async def discover_descriptors(self):
|
||||
return await self.client.discover_descriptors(self)
|
||||
|
||||
async def subscribe(self, subscriber=None, prefer_notify=True):
|
||||
async def subscribe(
|
||||
self, subscriber: Optional[Callable] = None, prefer_notify=True
|
||||
):
|
||||
if subscriber is not None:
|
||||
if subscriber in self.subscribers:
|
||||
# We already have a proxy subscriber
|
||||
@@ -186,7 +197,7 @@ class CharacteristicProxy(AttributeProxy):
|
||||
return (
|
||||
f'Characteristic(handle=0x{self.handle:04X}, '
|
||||
f'uuid={self.uuid}, '
|
||||
f'properties={Characteristic.properties_as_string(self.properties)})'
|
||||
f'{self.properties!s})'
|
||||
)
|
||||
|
||||
|
||||
@@ -213,6 +224,7 @@ class ProfileServiceProxy:
|
||||
# -----------------------------------------------------------------------------
|
||||
class Client:
|
||||
services: List[ServiceProxy]
|
||||
cached_values: Dict[int, Tuple[datetime, bytes]]
|
||||
|
||||
def __init__(self, connection):
|
||||
self.connection = connection
|
||||
@@ -225,6 +237,7 @@ class Client:
|
||||
) # Notification subscribers, by attribute handle
|
||||
self.indication_subscribers = {} # Indication subscribers, by attribute handle
|
||||
self.services = []
|
||||
self.cached_values = {}
|
||||
|
||||
def send_gatt_pdu(self, pdu):
|
||||
self.connection.send_l2cap_pdu(ATT_CID, pdu)
|
||||
@@ -309,6 +322,35 @@ class Client:
|
||||
if c.uuid == uuid
|
||||
]
|
||||
|
||||
def get_attribute_grouping(
|
||||
self, attribute_handle: int
|
||||
) -> Optional[
|
||||
Union[
|
||||
ServiceProxy,
|
||||
Tuple[ServiceProxy, CharacteristicProxy],
|
||||
Tuple[ServiceProxy, CharacteristicProxy, DescriptorProxy],
|
||||
]
|
||||
]:
|
||||
"""
|
||||
Get the attribute(s) associated with an attribute handle
|
||||
"""
|
||||
for service in self.services:
|
||||
if service.handle == attribute_handle:
|
||||
return service
|
||||
if service.handle <= attribute_handle <= service.end_group_handle:
|
||||
for characteristic in service.characteristics:
|
||||
if characteristic.handle == attribute_handle:
|
||||
return (service, characteristic)
|
||||
if (
|
||||
characteristic.handle
|
||||
<= attribute_handle
|
||||
<= characteristic.end_group_handle
|
||||
):
|
||||
for descriptor in characteristic.descriptors:
|
||||
if descriptor.handle == attribute_handle:
|
||||
return (service, characteristic, descriptor)
|
||||
return None
|
||||
|
||||
def on_service_discovered(self, service):
|
||||
'''Add a service to the service list if it wasn't already there'''
|
||||
already_known = False
|
||||
@@ -678,8 +720,8 @@ class Client:
|
||||
return
|
||||
|
||||
if (
|
||||
characteristic.properties & Characteristic.NOTIFY
|
||||
and characteristic.properties & Characteristic.INDICATE
|
||||
characteristic.properties & Characteristic.Properties.NOTIFY
|
||||
and characteristic.properties & Characteristic.Properties.INDICATE
|
||||
):
|
||||
if prefer_notify:
|
||||
bits = ClientCharacteristicConfigurationBits.NOTIFICATION
|
||||
@@ -687,10 +729,10 @@ class Client:
|
||||
else:
|
||||
bits = ClientCharacteristicConfigurationBits.INDICATION
|
||||
subscribers = self.indication_subscribers
|
||||
elif characteristic.properties & Characteristic.NOTIFY:
|
||||
elif characteristic.properties & Characteristic.Properties.NOTIFY:
|
||||
bits = ClientCharacteristicConfigurationBits.NOTIFICATION
|
||||
subscribers = self.notification_subscribers
|
||||
elif characteristic.properties & Characteristic.INDICATE:
|
||||
elif characteristic.properties & Characteristic.Properties.INDICATE:
|
||||
bits = ClientCharacteristicConfigurationBits.INDICATION
|
||||
subscribers = self.indication_subscribers
|
||||
else:
|
||||
@@ -800,6 +842,7 @@ class Client:
|
||||
|
||||
offset += len(part)
|
||||
|
||||
self.cache_value(attribute_handle, attribute_value)
|
||||
# Return the value as bytes
|
||||
return attribute_value
|
||||
|
||||
@@ -934,6 +977,8 @@ class Client:
|
||||
)
|
||||
if not subscribers:
|
||||
logger.warning('!!! received notification with no subscriber')
|
||||
|
||||
self.cache_value(notification.attribute_handle, notification.attribute_value)
|
||||
for subscriber in subscribers:
|
||||
if callable(subscriber):
|
||||
subscriber(notification.attribute_value)
|
||||
@@ -945,6 +990,8 @@ class Client:
|
||||
subscribers = self.indication_subscribers.get(indication.attribute_handle, [])
|
||||
if not subscribers:
|
||||
logger.warning('!!! received indication with no subscriber')
|
||||
|
||||
self.cache_value(indication.attribute_handle, indication.attribute_value)
|
||||
for subscriber in subscribers:
|
||||
if callable(subscriber):
|
||||
subscriber(indication.attribute_value)
|
||||
@@ -953,3 +1000,9 @@ class Client:
|
||||
|
||||
# Confirm that we received the indication
|
||||
self.send_confirmation(ATT_Handle_Value_Confirmation())
|
||||
|
||||
def cache_value(self, attribute_handle: int, value: bytes):
|
||||
self.cached_values[attribute_handle] = (
|
||||
datetime.now(),
|
||||
value,
|
||||
)
|
||||
|
||||
+20
-2
@@ -27,7 +27,7 @@ import asyncio
|
||||
import logging
|
||||
from collections import defaultdict
|
||||
import struct
|
||||
from typing import List, Tuple, Optional
|
||||
from typing import List, Tuple, Optional, TypeVar, Type
|
||||
from pyee import EventEmitter
|
||||
|
||||
from .colors import color
|
||||
@@ -135,6 +135,21 @@ class Server(EventEmitter):
|
||||
return attribute
|
||||
return None
|
||||
|
||||
AttributeGroupType = TypeVar('AttributeGroupType', Service, Characteristic)
|
||||
|
||||
def get_attribute_group(
|
||||
self, handle: int, group_type: Type[AttributeGroupType]
|
||||
) -> Optional[AttributeGroupType]:
|
||||
return next(
|
||||
(
|
||||
attribute
|
||||
for attribute in self.attributes
|
||||
if isinstance(attribute, group_type)
|
||||
and attribute.handle <= handle <= attribute.end_group_handle
|
||||
),
|
||||
None,
|
||||
)
|
||||
|
||||
def get_service_attribute(self, service_uuid: UUID) -> Optional[Service]:
|
||||
return next(
|
||||
(
|
||||
@@ -228,7 +243,10 @@ class Server(EventEmitter):
|
||||
# unless there is one already
|
||||
if (
|
||||
characteristic.properties
|
||||
& (Characteristic.NOTIFY | Characteristic.INDICATE)
|
||||
& (
|
||||
Characteristic.Properties.NOTIFY
|
||||
| Characteristic.Properties.INDICATE
|
||||
)
|
||||
and characteristic.get_descriptor(
|
||||
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR
|
||||
)
|
||||
|
||||
+25
-9
@@ -20,15 +20,19 @@
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
from __future__ import annotations
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
import json
|
||||
from typing import Optional
|
||||
from typing import TYPE_CHECKING, Optional
|
||||
|
||||
from .colors import color
|
||||
from .hci import Address
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .device import Device
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Logging
|
||||
@@ -173,13 +177,13 @@ class KeyStore:
|
||||
separator = '\n'
|
||||
|
||||
@staticmethod
|
||||
def create_for_device(device_config):
|
||||
if device_config.keystore is None:
|
||||
def create_for_device(device: Device) -> Optional[KeyStore]:
|
||||
if device.config.keystore is None:
|
||||
return None
|
||||
|
||||
keystore_type = device_config.keystore.split(':', 1)[0]
|
||||
keystore_type = device.config.keystore.split(':', 1)[0]
|
||||
if keystore_type == 'JsonKeyStore':
|
||||
return JsonKeyStore.from_device_config(device_config)
|
||||
return JsonKeyStore.from_device(device)
|
||||
|
||||
return None
|
||||
|
||||
@@ -204,7 +208,9 @@ class JsonKeyStore(KeyStore):
|
||||
self.directory_name = os.path.join(
|
||||
appdirs.user_data_dir(self.APP_NAME, self.APP_AUTHOR), self.KEYS_DIR
|
||||
)
|
||||
json_filename = f'{self.namespace}.json'.lower().replace(':', '-')
|
||||
json_filename = (
|
||||
f'{self.namespace}.json'.lower().replace(':', '-').replace('/p', '-p')
|
||||
)
|
||||
self.filename = os.path.join(self.directory_name, json_filename)
|
||||
else:
|
||||
self.filename = filename
|
||||
@@ -213,9 +219,19 @@ class JsonKeyStore(KeyStore):
|
||||
logger.debug(f'JSON keystore: {self.filename}')
|
||||
|
||||
@staticmethod
|
||||
def from_device_config(device_config):
|
||||
params = device_config.keystore.split(':', 1)[1:]
|
||||
namespace = str(device_config.address)
|
||||
def from_device(device: Device) -> Optional[JsonKeyStore]:
|
||||
if not device.config.keystore:
|
||||
return None
|
||||
|
||||
params = device.config.keystore.split(':', 1)[1:]
|
||||
|
||||
# Use a namespace based on the device address
|
||||
if device.public_address not in (Address.ANY, Address.ANY_RANDOM):
|
||||
namespace = str(device.public_address)
|
||||
elif device.random_address != Address.ANY_RANDOM:
|
||||
namespace = str(device.random_address)
|
||||
else:
|
||||
namespace = JsonKeyStore.DEFAULT_NAMESPACE
|
||||
if params:
|
||||
filename = params[0]
|
||||
else:
|
||||
|
||||
@@ -103,7 +103,7 @@ class AshaService(TemplateService):
|
||||
|
||||
self.read_only_properties_characteristic = Characteristic(
|
||||
GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
bytes(
|
||||
[
|
||||
@@ -120,19 +120,20 @@ class AshaService(TemplateService):
|
||||
|
||||
self.audio_control_point_characteristic = Characteristic(
|
||||
GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
|
||||
Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
|
||||
Characteristic.Properties.WRITE
|
||||
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
|
||||
Characteristic.WRITEABLE,
|
||||
CharacteristicValue(write=on_audio_control_point_write),
|
||||
)
|
||||
self.audio_status_characteristic = Characteristic(
|
||||
GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
|
||||
Characteristic.READ | Characteristic.NOTIFY,
|
||||
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
|
||||
Characteristic.READABLE,
|
||||
bytes([0]),
|
||||
)
|
||||
self.volume_characteristic = Characteristic(
|
||||
GATT_ASHA_VOLUME_CHARACTERISTIC,
|
||||
Characteristic.WRITE_WITHOUT_RESPONSE,
|
||||
Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
|
||||
Characteristic.WRITEABLE,
|
||||
CharacteristicValue(write=on_volume_write),
|
||||
)
|
||||
@@ -151,7 +152,7 @@ class AshaService(TemplateService):
|
||||
self.psm = self.device.register_l2cap_channel_server(self.psm, on_coc, 8)
|
||||
self.le_psm_out_characteristic = Characteristic(
|
||||
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
struct.pack('<H', self.psm),
|
||||
)
|
||||
|
||||
@@ -36,7 +36,7 @@ class BatteryService(TemplateService):
|
||||
self.battery_level_characteristic = PackedCharacteristicAdapter(
|
||||
Characteristic(
|
||||
GATT_BATTERY_LEVEL_CHARACTERISTIC,
|
||||
Characteristic.READ | Characteristic.NOTIFY,
|
||||
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
|
||||
Characteristic.READABLE,
|
||||
CharacteristicValue(read=read_battery_level),
|
||||
),
|
||||
|
||||
@@ -63,7 +63,9 @@ class DeviceInformationService(TemplateService):
|
||||
# TODO: pnp_id
|
||||
):
|
||||
characteristics = [
|
||||
Characteristic(uuid, Characteristic.READ, Characteristic.READABLE, field)
|
||||
Characteristic(
|
||||
uuid, Characteristic.Properties.READ, Characteristic.READABLE, field
|
||||
)
|
||||
for (field, uuid) in (
|
||||
(manufacturer_name, GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC),
|
||||
(model_number, GATT_MODEL_NUMBER_STRING_CHARACTERISTIC),
|
||||
@@ -79,7 +81,7 @@ class DeviceInformationService(TemplateService):
|
||||
characteristics.append(
|
||||
Characteristic(
|
||||
GATT_SYSTEM_ID_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
self.pack_system_id(*system_id),
|
||||
)
|
||||
@@ -89,7 +91,7 @@ class DeviceInformationService(TemplateService):
|
||||
characteristics.append(
|
||||
Characteristic(
|
||||
GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
ieee_regulatory_certification_data_list,
|
||||
)
|
||||
|
||||
@@ -152,7 +152,7 @@ class HeartRateService(TemplateService):
|
||||
self.heart_rate_measurement_characteristic = DelegatedCharacteristicAdapter(
|
||||
Characteristic(
|
||||
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
|
||||
Characteristic.NOTIFY,
|
||||
Characteristic.Properties.NOTIFY,
|
||||
0,
|
||||
CharacteristicValue(read=read_heart_rate_measurement),
|
||||
),
|
||||
@@ -164,7 +164,7 @@ class HeartRateService(TemplateService):
|
||||
if body_sensor_location is not None:
|
||||
self.body_sensor_location_characteristic = Characteristic(
|
||||
GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
bytes([int(body_sensor_location)]),
|
||||
)
|
||||
@@ -182,7 +182,7 @@ class HeartRateService(TemplateService):
|
||||
self.heart_rate_control_point_characteristic = PackedCharacteristicAdapter(
|
||||
Characteristic(
|
||||
GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
|
||||
Characteristic.WRITE,
|
||||
Characteristic.Properties.WRITE,
|
||||
Characteristic.WRITEABLE,
|
||||
CharacteristicValue(write=write_heart_rate_control_point_value),
|
||||
),
|
||||
|
||||
+8
-4
@@ -645,7 +645,7 @@ class Session:
|
||||
},
|
||||
}
|
||||
|
||||
def __init__(self, manager, connection, pairing_config):
|
||||
def __init__(self, manager, connection, pairing_config, is_initiator):
|
||||
self.manager = manager
|
||||
self.connection = connection
|
||||
self.preq = None
|
||||
@@ -684,7 +684,7 @@ class Session:
|
||||
self.ctkd_task = None
|
||||
|
||||
# Decide if we're the initiator or the responder
|
||||
self.is_initiator = connection.role == BT_CENTRAL_ROLE
|
||||
self.is_initiator = is_initiator
|
||||
self.is_responder = not self.is_initiator
|
||||
|
||||
# Listen for connection events
|
||||
@@ -1680,6 +1680,8 @@ class Manager(EventEmitter):
|
||||
def on_smp_pdu(self, connection, pdu):
|
||||
# Look for a session with this connection, and create one if none exists
|
||||
if not (session := self.sessions.get(connection.handle)):
|
||||
if connection.role == BT_CENTRAL_ROLE:
|
||||
logger.warning('Remote starts pairing as Peripheral!')
|
||||
pairing_config = self.pairing_config_factory(connection)
|
||||
if pairing_config is None:
|
||||
# Pairing disabled
|
||||
@@ -1688,7 +1690,7 @@ class Manager(EventEmitter):
|
||||
SMP_Pairing_Failed_Command(reason=SMP_PAIRING_NOT_SUPPORTED_ERROR),
|
||||
)
|
||||
return
|
||||
session = Session(self, connection, pairing_config)
|
||||
session = Session(self, connection, pairing_config, is_initiator=False)
|
||||
self.sessions[connection.handle] = session
|
||||
|
||||
# Parse the L2CAP payload into an SMP Command object
|
||||
@@ -1709,10 +1711,12 @@ class Manager(EventEmitter):
|
||||
|
||||
async def pair(self, connection):
|
||||
# TODO: check if there's already a session for this connection
|
||||
if connection.role != BT_CENTRAL_ROLE:
|
||||
logger.warning('Start pairing as Peripheral!')
|
||||
pairing_config = self.pairing_config_factory(connection)
|
||||
if pairing_config is None:
|
||||
raise ValueError('pairing config must not be None when initiating')
|
||||
session = Session(self, connection, pairing_config)
|
||||
session = Session(self, connection, pairing_config, is_initiator=True)
|
||||
self.sessions[connection.handle] = session
|
||||
return await session.pair()
|
||||
|
||||
|
||||
+11
-9
@@ -209,7 +209,7 @@ async def keyboard_host(device, peer_address):
|
||||
return
|
||||
for i, characteristic in enumerate(report_characteristics):
|
||||
print(color('REPORT:', 'yellow'), characteristic)
|
||||
if characteristic.properties & Characteristic.NOTIFY:
|
||||
if characteristic.properties & Characteristic.Properties.NOTIFY:
|
||||
await peer.discover_descriptors(characteristic)
|
||||
report_reference_descriptor = characteristic.get_descriptor(
|
||||
GATT_REPORT_REFERENCE_DESCRIPTOR
|
||||
@@ -241,7 +241,9 @@ async def keyboard_device(device, command):
|
||||
# Create an 'input report' characteristic to send keyboard reports to the host
|
||||
input_report_characteristic = Characteristic(
|
||||
GATT_REPORT_CHARACTERISTIC,
|
||||
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
|
||||
Characteristic.Properties.READ
|
||||
| Characteristic.Properties.WRITE
|
||||
| Characteristic.Properties.NOTIFY,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
bytes([0, 0, 0, 0, 0, 0, 0, 0]),
|
||||
[
|
||||
@@ -256,8 +258,8 @@ async def keyboard_device(device, command):
|
||||
# Create an 'output report' characteristic to receive keyboard reports from the host
|
||||
output_report_characteristic = Characteristic(
|
||||
GATT_REPORT_CHARACTERISTIC,
|
||||
Characteristic.READ
|
||||
| Characteristic.WRITE
|
||||
Characteristic.Properties.READ
|
||||
| Characteristic.Properties.WRITE
|
||||
| Characteristic.WRITE_WITHOUT_RESPONSE,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
bytes([0]),
|
||||
@@ -278,7 +280,7 @@ async def keyboard_device(device, command):
|
||||
[
|
||||
Characteristic(
|
||||
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
'Bumble',
|
||||
)
|
||||
@@ -289,13 +291,13 @@ async def keyboard_device(device, command):
|
||||
[
|
||||
Characteristic(
|
||||
GATT_PROTOCOL_MODE_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
bytes([HID_REPORT_PROTOCOL]),
|
||||
),
|
||||
Characteristic(
|
||||
GATT_HID_INFORMATION_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
# bcdHID=1.1, bCountryCode=0x00,
|
||||
# Flags=RemoteWake|NormallyConnectable
|
||||
@@ -309,7 +311,7 @@ async def keyboard_device(device, command):
|
||||
),
|
||||
Characteristic(
|
||||
GATT_REPORT_MAP_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
HID_KEYBOARD_REPORT_MAP,
|
||||
),
|
||||
@@ -322,7 +324,7 @@ async def keyboard_device(device, command):
|
||||
[
|
||||
Characteristic(
|
||||
GATT_BATTERY_LEVEL_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
bytes([100]),
|
||||
)
|
||||
|
||||
@@ -101,7 +101,7 @@ async def main():
|
||||
# Add the ASHA service to the GATT server
|
||||
read_only_properties_characteristic = Characteristic(
|
||||
ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
bytes(
|
||||
[
|
||||
@@ -127,13 +127,13 @@ async def main():
|
||||
)
|
||||
audio_control_point_characteristic = Characteristic(
|
||||
ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
|
||||
Characteristic.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
|
||||
Characteristic.Properties.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
|
||||
Characteristic.WRITEABLE,
|
||||
CharacteristicValue(write=on_audio_control_point_write),
|
||||
)
|
||||
audio_status_characteristic = Characteristic(
|
||||
ASHA_AUDIO_STATUS_CHARACTERISTIC,
|
||||
Characteristic.READ | Characteristic.NOTIFY,
|
||||
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
|
||||
Characteristic.READABLE,
|
||||
bytes([0]),
|
||||
)
|
||||
@@ -145,7 +145,7 @@ async def main():
|
||||
)
|
||||
le_psm_out_characteristic = Characteristic(
|
||||
ASHA_LE_PSM_OUT_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
struct.pack('<H', psm),
|
||||
)
|
||||
|
||||
@@ -80,7 +80,7 @@ async def main():
|
||||
)
|
||||
manufacturer_name_characteristic = Characteristic(
|
||||
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
"Fitbit",
|
||||
[descriptor],
|
||||
|
||||
@@ -70,7 +70,7 @@ async def main():
|
||||
)
|
||||
manufacturer_name_characteristic = Characteristic(
|
||||
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
"Fitbit",
|
||||
[descriptor],
|
||||
|
||||
@@ -96,7 +96,7 @@ async def main():
|
||||
)
|
||||
manufacturer_name_characteristic = Characteristic(
|
||||
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
'Fitbit',
|
||||
[descriptor],
|
||||
@@ -109,13 +109,13 @@ async def main():
|
||||
[
|
||||
Characteristic(
|
||||
'D901B45B-4916-412E-ACCA-376ECB603B2C',
|
||||
Characteristic.READ | Characteristic.WRITE,
|
||||
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
CharacteristicValue(read=my_custom_read, write=my_custom_write),
|
||||
),
|
||||
Characteristic(
|
||||
'552957FB-CF1F-4A31-9535-E78847E1A714',
|
||||
Characteristic.READ | Characteristic.WRITE,
|
||||
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
CharacteristicValue(
|
||||
read=my_custom_read_with_error, write=my_custom_write_with_error
|
||||
@@ -123,7 +123,7 @@ async def main():
|
||||
),
|
||||
Characteristic(
|
||||
'486F64C6-4B5F-4B3B-8AFF-EDE134A8446A',
|
||||
Characteristic.READ | Characteristic.NOTIFY,
|
||||
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
|
||||
Characteristic.READABLE,
|
||||
'hello',
|
||||
),
|
||||
|
||||
@@ -74,19 +74,21 @@ async def main():
|
||||
# Add a few entries to the device's GATT server
|
||||
characteristic1 = Characteristic(
|
||||
'486F64C6-4B5F-4B3B-8AFF-EDE134A8446A',
|
||||
Characteristic.READ | Characteristic.NOTIFY,
|
||||
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
|
||||
Characteristic.READABLE,
|
||||
bytes([0x40]),
|
||||
)
|
||||
characteristic2 = Characteristic(
|
||||
'8EBDEBAE-0017-418E-8D3B-3A3809492165',
|
||||
Characteristic.READ | Characteristic.INDICATE,
|
||||
Characteristic.Properties.READ | Characteristic.Properties.INDICATE,
|
||||
Characteristic.READABLE,
|
||||
bytes([0x41]),
|
||||
)
|
||||
characteristic3 = Characteristic(
|
||||
'8EBDEBAE-0017-418E-8D3B-3A3809492165',
|
||||
Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
|
||||
Characteristic.Properties.READ
|
||||
| Characteristic.Properties.NOTIFY
|
||||
| Characteristic.Properties.INDICATE,
|
||||
Characteristic.READABLE,
|
||||
bytes([0x42]),
|
||||
)
|
||||
|
||||
@@ -43,6 +43,8 @@ install_requires =
|
||||
pyserial >= 3.5; platform_system!='Emscripten'
|
||||
pyusb >= 1.2; platform_system!='Emscripten'
|
||||
websockets >= 8.1; platform_system!='Emscripten'
|
||||
prettytable >= 3.6.0
|
||||
humanize >= 4.6.0
|
||||
|
||||
[options.entry_points]
|
||||
console_scripts =
|
||||
|
||||
+165
-32
@@ -23,6 +23,7 @@ import pytest
|
||||
|
||||
from bumble.controller import Controller
|
||||
from bumble.gatt_client import CharacteristicProxy
|
||||
from bumble.gatt_server import Server
|
||||
from bumble.link import LocalLink
|
||||
from bumble.device import Device, Peer
|
||||
from bumble.host import Host
|
||||
@@ -114,7 +115,7 @@ async def test_characteristic_encoding():
|
||||
|
||||
c = Foo(
|
||||
GATT_BATTERY_LEVEL_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
123,
|
||||
)
|
||||
@@ -143,7 +144,9 @@ async def test_characteristic_encoding():
|
||||
|
||||
characteristic = Characteristic(
|
||||
'FDB159DB-036C-49E3-B3DB-6325AC750806',
|
||||
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
|
||||
Characteristic.Properties.READ
|
||||
| Characteristic.Properties.WRITE
|
||||
| Characteristic.Properties.NOTIFY,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
bytes([123]),
|
||||
)
|
||||
@@ -239,7 +242,9 @@ async def test_attribute_getters():
|
||||
characteristic_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806')
|
||||
characteristic = Characteristic(
|
||||
characteristic_uuid,
|
||||
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
|
||||
Characteristic.Properties.READ
|
||||
| Characteristic.Properties.WRITE
|
||||
| Characteristic.Properties.NOTIFY,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
bytes([123]),
|
||||
)
|
||||
@@ -284,7 +289,7 @@ def test_CharacteristicAdapter():
|
||||
v = bytes([1, 2, 3])
|
||||
c = Characteristic(
|
||||
GATT_BATTERY_LEVEL_CHARACTERISTIC,
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
v,
|
||||
)
|
||||
@@ -420,7 +425,7 @@ async def test_read_write():
|
||||
|
||||
characteristic1 = Characteristic(
|
||||
'FDB159DB-036C-49E3-B3DB-6325AC750806',
|
||||
Characteristic.READ | Characteristic.WRITE,
|
||||
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
)
|
||||
|
||||
@@ -437,7 +442,7 @@ async def test_read_write():
|
||||
|
||||
characteristic2 = Characteristic(
|
||||
'66DE9057-C848-4ACA-B993-D675644EBB85',
|
||||
Characteristic.READ | Characteristic.WRITE,
|
||||
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
CharacteristicValue(
|
||||
read=on_characteristic2_read, write=on_characteristic2_write
|
||||
@@ -500,7 +505,7 @@ async def test_read_write2():
|
||||
v = bytes([0x11, 0x22, 0x33, 0x44])
|
||||
characteristic1 = Characteristic(
|
||||
'FDB159DB-036C-49E3-B3DB-6325AC750806',
|
||||
Characteristic.READ | Characteristic.WRITE,
|
||||
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
value=v,
|
||||
)
|
||||
@@ -544,7 +549,7 @@ async def test_subscribe_notify():
|
||||
|
||||
characteristic1 = Characteristic(
|
||||
'FDB159DB-036C-49E3-B3DB-6325AC750806',
|
||||
Characteristic.READ | Characteristic.NOTIFY,
|
||||
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
|
||||
Characteristic.READABLE,
|
||||
bytes([1, 2, 3]),
|
||||
)
|
||||
@@ -560,7 +565,7 @@ async def test_subscribe_notify():
|
||||
|
||||
characteristic2 = Characteristic(
|
||||
'66DE9057-C848-4ACA-B993-D675644EBB85',
|
||||
Characteristic.READ | Characteristic.INDICATE,
|
||||
Characteristic.Properties.READ | Characteristic.Properties.INDICATE,
|
||||
Characteristic.READABLE,
|
||||
bytes([4, 5, 6]),
|
||||
)
|
||||
@@ -576,7 +581,9 @@ async def test_subscribe_notify():
|
||||
|
||||
characteristic3 = Characteristic(
|
||||
'AB5E639C-40C1-4238-B9CB-AF41F8B806E4',
|
||||
Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
|
||||
Characteristic.Properties.READ
|
||||
| Characteristic.Properties.NOTIFY
|
||||
| Characteristic.Properties.INDICATE,
|
||||
Characteristic.READABLE,
|
||||
bytes([7, 8, 9]),
|
||||
)
|
||||
@@ -796,32 +803,46 @@ async def test_mtu_exchange():
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_char_property_to_string():
|
||||
# single
|
||||
assert Characteristic.property_name(0x01) == "BROADCAST"
|
||||
assert Characteristic.property_name(Characteristic.BROADCAST) == "BROADCAST"
|
||||
assert str(Characteristic.Properties(0x01)) == "Properties.BROADCAST"
|
||||
assert str(Characteristic.Properties.BROADCAST) == "Properties.BROADCAST"
|
||||
|
||||
# double
|
||||
assert Characteristic.properties_as_string(0x03) == "BROADCAST,READ"
|
||||
assert str(Characteristic.Properties(0x03)) == "Properties.READ|BROADCAST"
|
||||
assert (
|
||||
Characteristic.properties_as_string(
|
||||
Characteristic.BROADCAST | Characteristic.READ
|
||||
)
|
||||
== "BROADCAST,READ"
|
||||
str(Characteristic.Properties.BROADCAST | Characteristic.Properties.READ)
|
||||
== "Properties.READ|BROADCAST"
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_char_property_string_to_type():
|
||||
def test_characteristic_property_from_string():
|
||||
# single
|
||||
assert Characteristic.string_to_properties("BROADCAST") == Characteristic.BROADCAST
|
||||
assert (
|
||||
Characteristic.Properties.from_string("BROADCAST")
|
||||
== Characteristic.Properties.BROADCAST
|
||||
)
|
||||
|
||||
# double
|
||||
assert (
|
||||
Characteristic.string_to_properties("BROADCAST,READ")
|
||||
== Characteristic.BROADCAST | Characteristic.READ
|
||||
Characteristic.Properties.from_string("BROADCAST,READ")
|
||||
== Characteristic.Properties.BROADCAST | Characteristic.Properties.READ
|
||||
)
|
||||
assert (
|
||||
Characteristic.string_to_properties("READ,BROADCAST")
|
||||
== Characteristic.BROADCAST | Characteristic.READ
|
||||
Characteristic.Properties.from_string("READ,BROADCAST")
|
||||
== Characteristic.Properties.BROADCAST | Characteristic.Properties.READ
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_characteristic_property_from_string_assert():
|
||||
with pytest.raises(TypeError) as e_info:
|
||||
Characteristic.Properties.from_string("BROADCAST,HELLO")
|
||||
|
||||
assert (
|
||||
str(e_info.value)
|
||||
== """Characteristic.Properties::from_string() error:
|
||||
Expected a string containing any of the keys, separated by commas: BROADCAST,READ,WRITE_WITHOUT_RESPONSE,WRITE,NOTIFY,INDICATE,AUTHENTICATED_SIGNED_WRITES,EXTENDED_PROPERTIES
|
||||
Got: BROADCAST,HELLO"""
|
||||
)
|
||||
|
||||
|
||||
@@ -832,7 +853,9 @@ async def test_server_string():
|
||||
|
||||
characteristic = Characteristic(
|
||||
'FDB159DB-036C-49E3-B3DB-6325AC750806',
|
||||
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
|
||||
Characteristic.Properties.READ
|
||||
| Characteristic.Properties.WRITE
|
||||
| Characteristic.Properties.NOTIFY,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
bytes([123]),
|
||||
)
|
||||
@@ -843,13 +866,13 @@ async def test_server_string():
|
||||
assert (
|
||||
str(server.gatt_server)
|
||||
== """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access))
|
||||
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ)
|
||||
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ)
|
||||
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ)
|
||||
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ)
|
||||
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), Properties.READ)
|
||||
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), Properties.READ)
|
||||
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), Properties.READ)
|
||||
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), Properties.READ)
|
||||
Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
|
||||
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY)
|
||||
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY)
|
||||
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, Properties.NOTIFY|WRITE|READ)
|
||||
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, Properties.NOTIFY|WRITE|READ)
|
||||
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""
|
||||
)
|
||||
|
||||
@@ -871,21 +894,131 @@ def test_attribute_string_to_permissions():
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_charracteristic_permissions():
|
||||
def test_characteristic_permissions():
|
||||
characteristic = Characteristic(
|
||||
'FDB159DB-036C-49E3-B3DB-6325AC750806',
|
||||
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
|
||||
Characteristic.Properties.READ
|
||||
| Characteristic.Properties.WRITE
|
||||
| Characteristic.Properties.NOTIFY,
|
||||
'READABLE,WRITEABLE',
|
||||
)
|
||||
assert characteristic.permissions == 3
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_characteristic_has_properties():
|
||||
characteristic = Characteristic(
|
||||
'FDB159DB-036C-49E3-B3DB-6325AC750806',
|
||||
Characteristic.Properties.READ
|
||||
| Characteristic.Properties.WRITE
|
||||
| Characteristic.Properties.NOTIFY,
|
||||
'READABLE,WRITEABLE',
|
||||
)
|
||||
assert characteristic.has_properties(Characteristic.Properties.READ)
|
||||
assert characteristic.has_properties(
|
||||
Characteristic.Properties.READ | Characteristic.Properties.WRITE
|
||||
)
|
||||
assert not characteristic.has_properties(
|
||||
Characteristic.Properties.READ
|
||||
| Characteristic.Properties.WRITE
|
||||
| Characteristic.Properties.INDICATE
|
||||
)
|
||||
assert not characteristic.has_properties(Characteristic.Properties.INDICATE)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_descriptor_permissions():
|
||||
descriptor = Descriptor('2902', 'READABLE,WRITEABLE')
|
||||
assert descriptor.permissions == 3
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def test_get_attribute_group():
|
||||
device = Device()
|
||||
|
||||
# add some services / characteristics to the gatt server
|
||||
characteristic1 = Characteristic(
|
||||
'1111',
|
||||
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
bytes([123]),
|
||||
)
|
||||
characteristic2 = Characteristic(
|
||||
'2222',
|
||||
Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
bytes([123]),
|
||||
)
|
||||
services = [Service('1212', [characteristic1]), Service('3233', [characteristic2])]
|
||||
device.gatt_server.add_services(services)
|
||||
|
||||
# get the handles from gatt server
|
||||
characteristic_attributes1 = device.gatt_server.get_characteristic_attributes(
|
||||
UUID('1212'), UUID('1111')
|
||||
)
|
||||
assert characteristic_attributes1 is not None
|
||||
characteristic_attributes2 = device.gatt_server.get_characteristic_attributes(
|
||||
UUID('3233'), UUID('2222')
|
||||
)
|
||||
assert characteristic_attributes2 is not None
|
||||
descriptor1 = device.gatt_server.get_descriptor_attribute(
|
||||
UUID('1212'), UUID('1111'), UUID('2902')
|
||||
)
|
||||
assert descriptor1 is not None
|
||||
descriptor2 = device.gatt_server.get_descriptor_attribute(
|
||||
UUID('3233'), UUID('2222'), UUID('2902')
|
||||
)
|
||||
assert descriptor2 is not None
|
||||
|
||||
# confirm the handles map back to the service
|
||||
assert (
|
||||
UUID('1212')
|
||||
== device.gatt_server.get_attribute_group(
|
||||
characteristic_attributes1[0].handle, Service
|
||||
).uuid
|
||||
)
|
||||
assert (
|
||||
UUID('1212')
|
||||
== device.gatt_server.get_attribute_group(
|
||||
characteristic_attributes1[1].handle, Service
|
||||
).uuid
|
||||
)
|
||||
assert (
|
||||
UUID('1212')
|
||||
== device.gatt_server.get_attribute_group(descriptor1.handle, Service).uuid
|
||||
)
|
||||
assert (
|
||||
UUID('3233')
|
||||
== device.gatt_server.get_attribute_group(
|
||||
characteristic_attributes2[0].handle, Service
|
||||
).uuid
|
||||
)
|
||||
assert (
|
||||
UUID('3233')
|
||||
== device.gatt_server.get_attribute_group(
|
||||
characteristic_attributes2[1].handle, Service
|
||||
).uuid
|
||||
)
|
||||
assert (
|
||||
UUID('3233')
|
||||
== device.gatt_server.get_attribute_group(descriptor2.handle, Service).uuid
|
||||
)
|
||||
|
||||
# confirm the handles map back to the characteristic
|
||||
assert (
|
||||
UUID('1111')
|
||||
== device.gatt_server.get_attribute_group(
|
||||
descriptor1.handle, Characteristic
|
||||
).uuid
|
||||
)
|
||||
assert (
|
||||
UUID('2222')
|
||||
== device.gatt_server.get_attribute_group(
|
||||
descriptor2.handle, Characteristic
|
||||
).uuid
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
if __name__ == '__main__':
|
||||
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
|
||||
|
||||
+8
-5
@@ -163,25 +163,28 @@ async def test_self_gatt():
|
||||
# Add some GATT characteristics to device 1
|
||||
c1 = Characteristic(
|
||||
'3A143AD7-D4A7-436B-97D6-5B62C315E833',
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
bytes([1, 2, 3]),
|
||||
)
|
||||
c2 = Characteristic(
|
||||
'9557CCE2-DB37-46EB-94C4-50AE5B9CB0F8',
|
||||
Characteristic.READ | Characteristic.WRITE,
|
||||
Characteristic.Properties.READ | Characteristic.Properties.WRITE,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
bytes([4, 5, 6]),
|
||||
)
|
||||
c3 = Characteristic(
|
||||
'84FC1A2E-C52D-4A2D-B8C3-8855BAB86638',
|
||||
Characteristic.READ | Characteristic.WRITE_WITHOUT_RESPONSE,
|
||||
Characteristic.Properties.READ
|
||||
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
|
||||
Characteristic.READABLE | Characteristic.WRITEABLE,
|
||||
bytes([7, 8, 9]),
|
||||
)
|
||||
c4 = Characteristic(
|
||||
'84FC1A2E-C52D-4A2D-B8C3-8855BAB86638',
|
||||
Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
|
||||
Characteristic.Properties.READ
|
||||
| Characteristic.Properties.NOTIFY
|
||||
| Characteristic.Properties.INDICATE,
|
||||
Characteristic.READABLE,
|
||||
bytes([1, 1, 1]),
|
||||
)
|
||||
@@ -234,7 +237,7 @@ async def test_self_gatt_long_read():
|
||||
characteristics = [
|
||||
Characteristic(
|
||||
f'3A143AD7-D4A7-436B-97D6-5B62C315{i:04X}',
|
||||
Characteristic.READ,
|
||||
Characteristic.Properties.READ,
|
||||
Characteristic.READABLE,
|
||||
bytes([x & 255 for x in range(i)]),
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user