Compare commits

...

37 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod f2925ca647 support async read/write for characteristic values 2023-12-27 11:52:22 -08:00
Gilles Boccon-Gibod 5d83deffa4 Merge pull request #345 from rdhavan/bumble_hid_device
Bumble hid device implementation - Application and hid profile
2023-12-26 11:10:34 -08:00
Gilles Boccon-Gibod 2878cca478 Merge pull request #378 from benquike/pair_linger
Improve the linger option of bumble-pair
2023-12-26 10:55:28 -08:00
Gilles Boccon-Gibod 53934716db Merge pull request #377 from benquike/irk
Add functions/tool for gen/verifying BLE IRK/RPA
2023-12-26 10:54:18 -08:00
Hui Peng d885d45824 Add functions/tool for gen/verifying BLE IRK/RPA 2023-12-26 09:34:19 -08:00
zxzxwu 8ccfc90fe6 Merge pull request #379 from zxzxwu/addr
Add random address generation methods
2023-12-25 17:28:49 +08:00
Josh Wu 92aa7e9e2a Add random address generation methods 2023-12-24 18:07:40 +08:00
zxzxwu 6139ca8045 Merge pull request #374 from zxzxwu/csip
Complete CSIP and CAP
2023-12-23 02:49:35 +08:00
Josh Wu 87c76a4a0e Complete CSIP and CAP
Also add random address generation functions.
2023-12-23 02:14:32 +08:00
Hui Peng f7b66db873 Improve the linger option in pair tool
No matter pairing fails or not, make linger effective
2023-12-21 17:25:42 -08:00
skarnataki 0b314bd7f7 Updated absctract class and method for on_ctrl_pdu in hid.py 2023-12-18 13:36:25 +00:00
skarnataki 9da2e32ad7 Review comment Fix 3 - rename json file and usage of Optional in parameters 2023-12-15 09:42:57 +00:00
Snehal Karnataki 93c0875740 Merge branch 'google:main' into bumble_hid_device 2023-12-13 09:51:27 +00:00
skarnataki 5e3ecb74e4 Review comment fix -2 2023-12-05 13:41:30 +00:00
Snehal Karnataki c59be293c8 Merge branch 'google:main' into bumble_hid_device 2023-12-05 13:07:36 +00:00
Snehal Karnataki 6d22ed80ec Merge branch 'google:main' into bumble_hid_device 2023-12-04 07:29:04 +00:00
Snehal Karnataki ffb3eca68b Merge branch 'google:main' into bumble_hid_device 2023-11-30 04:50:05 +00:00
skarnataki 403a13e4c6 Review comment fix HID device 2023-11-28 13:42:25 +00:00
Snehal Karnataki ad0f035df5 Merge branch 'google:main' into bumble_hid_device 2023-11-28 13:06:32 +00:00
skarnataki 07f71fc895 Project format and lint error fix. Redefination if Device class needs to be discussed 2023-11-27 13:04:54 +00:00
Fahad Afroze f47b9178ad Added GET_REPORT and SET_REPORT changes
Added changes to handle invalid cases
2023-11-27 11:55:35 +00:00
SneKarnataki 4f399249bd Merge branch 'google:main' into bumble_hid_device 2023-11-27 09:00:44 +00:00
skarnataki 9324237828 send_data comment fix and lint error fix 2023-11-24 11:13:20 +00:00
Fahad Afroze d1033c018a Modified DeviceData class 2023-11-24 05:42:31 +00:00
Fahad Afroze 0f29052ade Added mousemove changes
Also modified keyboard data on keyup
2023-11-23 17:46:55 +00:00
skarnataki 0578e84586 Menu and name change review comments fix 2023-11-23 15:43:22 +00:00
Fahad Afroze 6ab41c466f Add review comment changes 3 2023-11-23 12:27:56 +00:00
Fahad Afroze 98a1093ebf Add review comment changes 2
Also corrected sending mouseData
2023-11-23 09:53:16 +00:00
dhavan caf04373f3 keyboard data moved to DeviceData class 2023-11-23 08:01:07 +00:00
SneKarnataki d4e8526766 Merge branch 'google:main' into bumble_hid_device 2023-11-23 07:59:43 +00:00
dhavan 515b83a8c7 deleted: bumble/classic3.json
modified:   examples/keyboard.html
2023-11-23 06:10:52 +00:00
dhavan dc18595c8a MTU size check added 2023-11-23 05:17:44 +00:00
SneKarnataki 488bcfe9c6 Merge branch 'google:main' into bumble_hid_device 2023-11-23 04:03:53 +00:00
dhavan d6cefdff8e Renamed the status message class 2023-11-22 17:14:24 +00:00
dhavan dc410b14c4 SET_REPORT and GET_REPORT implemented 2023-11-22 16:05:33 +00:00
dhavan 4c49ef9403 SET_REPORT implemented 2023-11-22 12:31:34 +00:00
dhavan ba85dcbda5 Get the changes from hid_device to bumble_hid_device
Modified the get_report_cb
2023-11-22 11:06:27 +00:00
26 changed files with 1862 additions and 279 deletions
+63
View File
@@ -0,0 +1,63 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import click
from bumble.colors import color
from bumble.hci import Address
from bumble.helpers import generate_irk, verify_rpa_with_irk
@click.group()
def cli():
'''
This is a tool for generating IRK, RPA,
and verifying IRK/RPA pairs
'''
@click.command()
def gen_irk() -> None:
print(generate_irk().hex())
@click.command()
@click.argument("irk", type=str)
def gen_rpa(irk: str) -> None:
irk_bytes = bytes.fromhex(irk)
rpa = Address.generate_private_address(irk_bytes)
print(rpa.to_string(with_type_qualifier=False))
@click.command()
@click.argument("irk", type=str)
@click.argument("rpa", type=str)
def verify_rpa(irk: str, rpa: str) -> None:
address = Address(rpa)
irk_bytes = bytes.fromhex(irk)
if verify_rpa_with_irk(address, irk_bytes):
print(color("Verified", "green"))
else:
print(color("Not Verified", "red"))
def main():
cli.add_command(gen_irk)
cli.add_command(gen_rpa)
cli.add_command(verify_rpa)
cli()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()
+4 -4
View File
@@ -777,7 +777,7 @@ class ConsoleApp:
if not service:
continue
values = [
attribute.read_value(connection)
await attribute.read_value(connection)
for connection in self.device.connections.values()
]
if not values:
@@ -796,11 +796,11 @@ class ConsoleApp:
if not characteristic:
continue
values = [
attribute.read_value(connection)
await attribute.read_value(connection)
for connection in self.device.connections.values()
]
if not values:
values = [attribute.read_value(None)]
values = [await attribute.read_value(None)]
# TODO: future optimization: convert CCCD value to human readable string
@@ -944,7 +944,7 @@ class ConsoleApp:
# send data to any subscribers
if isinstance(attribute, Characteristic):
attribute.write_value(None, value)
await attribute.write_value(None, value)
if attribute.has_properties(Characteristic.NOTIFY):
await self.device.gatt_server.notify_subscribers(attribute)
if attribute.has_properties(Characteristic.INDICATE):
+6 -8
View File
@@ -52,11 +52,13 @@ from bumble.att import (
class Waiter:
instance = None
def __init__(self):
def __init__(self, linger=False):
self.done = asyncio.get_running_loop().create_future()
self.linger = linger
def terminate(self):
self.done.set_result(None)
if not self.linger:
self.done.set_result(None)
async def wait_until_terminated(self):
return await self.done
@@ -302,7 +304,7 @@ async def pair(
hci_transport,
address_or_name,
):
Waiter.instance = Waiter()
Waiter.instance = Waiter(linger=linger)
print('<<< connecting to HCI...')
async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
@@ -396,7 +398,6 @@ async def pair(
address_or_name,
transport=BT_LE_TRANSPORT if mode == 'le' else BT_BR_EDR_TRANSPORT,
)
pairing_failure = False
if not request:
try:
@@ -405,11 +406,8 @@ async def pair(
else:
await connection.authenticate()
except ProtocolError as error:
pairing_failure = True
print(color(f'Pairing failed: {error}', 'red'))
if not linger or pairing_failure:
return
else:
if mode == 'le':
# Advertise so that peers can find us and connect
@@ -459,7 +457,7 @@ class LogHandler(logging.Handler):
help='Enable CTKD',
show_default=True,
)
@click.option('--linger', default=True, is_flag=True, help='Linger after pairing')
@click.option('--linger', default=False, is_flag=True, help='Linger after pairing')
@click.option(
'--io',
type=click.Choice(
+53 -11
View File
@@ -25,9 +25,21 @@
from __future__ import annotations
import enum
import functools
import inspect
import struct
from typing import (
Any,
Awaitable,
Callable,
Dict,
List,
Optional,
Type,
Union,
TYPE_CHECKING,
)
from pyee import EventEmitter
from typing import Dict, Type, List, Protocol, Union, Optional, Any, TYPE_CHECKING
from bumble.core import UUID, name_or_number, ProtocolError
from bumble.hci import HCI_Object, key_with_value
@@ -722,12 +734,38 @@ class ATT_Handle_Value_Confirmation(ATT_PDU):
# -----------------------------------------------------------------------------
class ConnectionValue(Protocol):
def read(self, connection) -> bytes:
...
class AttributeValue:
'''
Attribute value where reading and/or writing is delegated to functions
passed as arguments to the constructor.
'''
def write(self, connection, value: bytes) -> None:
...
def __init__(
self,
read: Union[
Callable[[Optional[Connection]], bytes],
Callable[[Optional[Connection]], Awaitable[bytes]],
None,
] = None,
write: Union[
Callable[[Optional[Connection], bytes], None],
Callable[[Optional[Connection], bytes], Awaitable[None]],
None,
] = None,
):
self._read = read
self._write = write
def read(self, connection: Optional[Connection]) -> Union[bytes, Awaitable[bytes]]:
return self._read(connection) if self._read else b''
def write(
self, connection: Optional[Connection], value: bytes
) -> Union[Awaitable[None], None]:
if self._write:
return self._write(connection, value)
return None
# -----------------------------------------------------------------------------
@@ -770,13 +808,13 @@ class Attribute(EventEmitter):
READ_REQUIRES_AUTHORIZATION = Permissions.READ_REQUIRES_AUTHORIZATION
WRITE_REQUIRES_AUTHORIZATION = Permissions.WRITE_REQUIRES_AUTHORIZATION
value: Union[str, bytes, ConnectionValue]
value: Union[bytes, AttributeValue]
def __init__(
self,
attribute_type: Union[str, bytes, UUID],
permissions: Union[str, Attribute.Permissions],
value: Union[str, bytes, ConnectionValue] = b'',
value: Union[str, bytes, AttributeValue] = b'',
) -> None:
EventEmitter.__init__(self)
self.handle = 0
@@ -806,7 +844,7 @@ class Attribute(EventEmitter):
def decode_value(self, value_bytes: bytes) -> Any:
return value_bytes
def read_value(self, connection: Optional[Connection]) -> bytes:
async def read_value(self, connection: Optional[Connection]) -> bytes:
if (
(self.permissions & self.READ_REQUIRES_ENCRYPTION)
and connection is not None
@@ -832,6 +870,8 @@ class Attribute(EventEmitter):
if hasattr(self.value, 'read'):
try:
value = self.value.read(connection)
if inspect.isawaitable(value):
value = await value
except ATT_Error as error:
raise ATT_Error(
error_code=error.error_code, att_handle=self.handle
@@ -841,7 +881,7 @@ class Attribute(EventEmitter):
return self.encode_value(value)
def write_value(self, connection: Connection, value_bytes: bytes) -> None:
async def write_value(self, connection: Connection, value_bytes: bytes) -> None:
if (
self.permissions & self.WRITE_REQUIRES_ENCRYPTION
) and not connection.encryption:
@@ -864,7 +904,9 @@ class Attribute(EventEmitter):
if hasattr(self.value, 'write'):
try:
self.value.write(connection, value) # pylint: disable=not-callable
result = self.value.write(connection, value)
if inspect.isawaitable(result):
await result
except ATT_Error as error:
raise ATT_Error(
error_code=error.error_code, att_handle=self.handle
+10
View File
@@ -100,6 +100,16 @@ class EccKey:
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
def generate_prand() -> bytes:
'''Generates random 3 bytes, with the 2 most significant bits of 0b01.
See Bluetooth spec, Vol 6, Part E - Table 1.2.
'''
prand_bytes = secrets.token_bytes(6)
return prand_bytes[:2] + bytes([(prand_bytes[2] & 0b01111111) | 0b01000000])
# -----------------------------------------------------------------------------
def xor(x: bytes, y: bytes) -> bytes:
assert len(x) == len(y)
+66 -51
View File
@@ -23,16 +23,28 @@
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import enum
import functools
import logging
import struct
from typing import Optional, Sequence, Iterable, List, Union
from typing import (
Callable,
Dict,
Iterable,
List,
Optional,
Sequence,
Union,
TYPE_CHECKING,
)
from .colors import color
from .core import UUID, get_dict_key_by_value
from .att import Attribute
from bumble.colors import color
from bumble.core import UUID
from bumble.att import Attribute, AttributeValue
if TYPE_CHECKING:
from bumble.gatt_client import AttributeProxy
from bumble.device import Connection
# -----------------------------------------------------------------------------
@@ -368,9 +380,12 @@ class TemplateService(Service):
UUID: UUID
def __init__(
self, characteristics: List[Characteristic], primary: bool = True
self,
characteristics: List[Characteristic],
primary: bool = True,
included_services: List[Service] = [],
) -> None:
super().__init__(self.UUID, characteristics, primary)
super().__init__(self.UUID, characteristics, primary, included_services)
# -----------------------------------------------------------------------------
@@ -519,56 +534,43 @@ class CharacteristicDeclaration(Attribute):
# -----------------------------------------------------------------------------
class CharacteristicValue:
'''
Characteristic value where reading and/or writing is delegated to functions
passed as arguments to the constructor.
'''
def __init__(self, read=None, write=None):
self._read = read
self._write = write
def read(self, connection):
return self._read(connection) if self._read else b''
def write(self, connection, value):
if self._write:
self._write(connection, value)
class CharacteristicValue(AttributeValue):
"""Same as AttributeValue, for backward compatibility"""
# -----------------------------------------------------------------------------
class CharacteristicAdapter:
'''
An adapter that can adapt any object with `read_value` and `write_value`
methods (like Characteristic and CharacteristicProxy objects) by wrapping
those methods with ones that return/accept encoded/decoded values.
Objects with async methods are considered proxies, so the adaptation is one
where the return value of `read_value` is decoded and the value passed to
`write_value` is encoded. Other objects are considered local characteristics
so the adaptation is one where the return value of `read_value` is encoded
and the value passed to `write_value` is decoded.
If the characteristic has a `subscribe` method, it is wrapped with one where
the values are decoded before being passed to the subscriber.
An adapter that can adapt Characteristic and AttributeProxy objects
by wrapping their `read_value()` and `write_value()` methods with ones that
return/accept encoded/decoded values.
For proxies (i.e used by a GATT client), the adaptation is one where the return
value of `read_value()` is decoded and the value passed to `write_value()` is
encoded. The `subscribe()` method, is wrapped with one where the values are decoded
before being passed to the subscriber.
For local values (i.e hosted by a GATT server) the adaptation is one where the
return value of `read_value()` is encoded and the value passed to `write_value()`
is decoded.
'''
def __init__(self, characteristic):
self.wrapped_characteristic = characteristic
self.subscribers = {} # Map from subscriber to proxy subscriber
read_value: Callable
write_value: Callable
if asyncio.iscoroutinefunction(
characteristic.read_value
) and asyncio.iscoroutinefunction(characteristic.write_value):
self.read_value = self.read_decoded_value
self.write_value = self.write_decoded_value
else:
def __init__(self, characteristic: Union[Characteristic, AttributeProxy]):
self.wrapped_characteristic = characteristic
self.subscribers: Dict[
Callable, Callable
] = {} # Map from subscriber to proxy subscriber
if isinstance(characteristic, Characteristic):
self.read_value = self.read_encoded_value
self.write_value = self.write_encoded_value
if hasattr(self.wrapped_characteristic, 'subscribe'):
else:
self.read_value = self.read_decoded_value
self.write_value = self.write_decoded_value
self.subscribe = self.wrapped_subscribe
if hasattr(self.wrapped_characteristic, 'unsubscribe'):
self.unsubscribe = self.wrapped_unsubscribe
def __getattr__(self, name):
@@ -587,11 +589,13 @@ class CharacteristicAdapter:
else:
setattr(self.wrapped_characteristic, name, value)
def read_encoded_value(self, connection):
return self.encode_value(self.wrapped_characteristic.read_value(connection))
async def read_encoded_value(self, connection):
return self.encode_value(
await self.wrapped_characteristic.read_value(connection)
)
def write_encoded_value(self, connection, value):
return self.wrapped_characteristic.write_value(
async def write_encoded_value(self, connection, value):
return await self.wrapped_characteristic.write_value(
connection, self.decode_value(value)
)
@@ -726,13 +730,24 @@ class Descriptor(Attribute):
'''
def __str__(self) -> str:
if isinstance(self.value, bytes):
value_str = self.value.hex()
elif isinstance(self.value, CharacteristicValue):
value = self.value.read(None)
if isinstance(value, bytes):
value_str = value.hex()
else:
value_str = '<async>'
else:
value_str = '<...>'
return (
f'Descriptor(handle=0x{self.handle:04X}, '
f'type={self.type}, '
f'value={self.read_value(None).hex()})'
f'value={value_str})'
)
# -----------------------------------------------------------------------------
class ClientCharacteristicConfigurationBits(enum.IntFlag):
'''
See Vol 3, Part G - 3.3.3.3 - Table 3.11 Client Characteristic Configuration bit
+29 -21
View File
@@ -31,9 +31,9 @@ import struct
from typing import List, Tuple, Optional, TypeVar, Type, Dict, Iterable, TYPE_CHECKING
from pyee import EventEmitter
from .colors import color
from .core import UUID
from .att import (
from bumble.colors import color
from bumble.core import UUID
from bumble.att import (
ATT_ATTRIBUTE_NOT_FOUND_ERROR,
ATT_ATTRIBUTE_NOT_LONG_ERROR,
ATT_CID,
@@ -60,7 +60,7 @@ from .att import (
ATT_Write_Response,
Attribute,
)
from .gatt import (
from bumble.gatt import (
GATT_CHARACTERISTIC_ATTRIBUTE_TYPE,
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
GATT_MAX_ATTRIBUTE_VALUE_SIZE,
@@ -74,6 +74,7 @@ from .gatt import (
Descriptor,
Service,
)
from bumble.utils import AsyncRunner
if TYPE_CHECKING:
from bumble.device import Device, Connection
@@ -379,7 +380,7 @@ class Server(EventEmitter):
# Get or encode the value
value = (
attribute.read_value(connection)
await attribute.read_value(connection)
if value is None
else attribute.encode_value(value)
)
@@ -422,7 +423,7 @@ class Server(EventEmitter):
# Get or encode the value
value = (
attribute.read_value(connection)
await attribute.read_value(connection)
if value is None
else attribute.encode_value(value)
)
@@ -650,7 +651,8 @@ class Server(EventEmitter):
self.send_response(connection, response)
def on_att_find_by_type_value_request(self, connection, request):
@AsyncRunner.run_in_task()
async def on_att_find_by_type_value_request(self, connection, request):
'''
See Bluetooth spec Vol 3, Part F - 3.4.3.3 Find By Type Value Request
'''
@@ -658,13 +660,13 @@ class Server(EventEmitter):
# Build list of returned attributes
pdu_space_available = connection.att_mtu - 2
attributes = []
for attribute in (
async for attribute in (
attribute
for attribute in self.attributes
if attribute.handle >= request.starting_handle
and attribute.handle <= request.ending_handle
and attribute.type == request.attribute_type
and attribute.read_value(connection) == request.attribute_value
and (await attribute.read_value(connection)) == request.attribute_value
and pdu_space_available >= 4
):
# TODO: check permissions
@@ -702,7 +704,8 @@ class Server(EventEmitter):
self.send_response(connection, response)
def on_att_read_by_type_request(self, connection, request):
@AsyncRunner.run_in_task()
async def on_att_read_by_type_request(self, connection, request):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.1 Read By Type Request
'''
@@ -725,7 +728,7 @@ class Server(EventEmitter):
and pdu_space_available
):
try:
attribute_value = attribute.read_value(connection)
attribute_value = await attribute.read_value(connection)
except ATT_Error as error:
# If the first attribute is unreadable, return an error
# Otherwise return attributes up to this point
@@ -767,14 +770,15 @@ class Server(EventEmitter):
self.send_response(connection, response)
def on_att_read_request(self, connection, request):
@AsyncRunner.run_in_task()
async def on_att_read_request(self, connection, request):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.3 Read Request
'''
if attribute := self.get_attribute(request.attribute_handle):
try:
value = attribute.read_value(connection)
value = await attribute.read_value(connection)
except ATT_Error as error:
response = ATT_Error_Response(
request_opcode_in_error=request.op_code,
@@ -792,14 +796,15 @@ class Server(EventEmitter):
)
self.send_response(connection, response)
def on_att_read_blob_request(self, connection, request):
@AsyncRunner.run_in_task()
async def on_att_read_blob_request(self, connection, request):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.5 Read Blob Request
'''
if attribute := self.get_attribute(request.attribute_handle):
try:
value = attribute.read_value(connection)
value = await attribute.read_value(connection)
except ATT_Error as error:
response = ATT_Error_Response(
request_opcode_in_error=request.op_code,
@@ -836,7 +841,8 @@ class Server(EventEmitter):
)
self.send_response(connection, response)
def on_att_read_by_group_type_request(self, connection, request):
@AsyncRunner.run_in_task()
async def on_att_read_by_group_type_request(self, connection, request):
'''
See Bluetooth spec Vol 3, Part F - 3.4.4.9 Read by Group Type Request
'''
@@ -864,7 +870,7 @@ class Server(EventEmitter):
):
# No need to catch permission errors here, since these attributes
# must all be world-readable
attribute_value = attribute.read_value(connection)
attribute_value = await attribute.read_value(connection)
# Check the attribute value size
max_attribute_size = min(connection.att_mtu - 6, 251)
if len(attribute_value) > max_attribute_size:
@@ -903,7 +909,8 @@ class Server(EventEmitter):
self.send_response(connection, response)
def on_att_write_request(self, connection, request):
@AsyncRunner.run_in_task()
async def on_att_write_request(self, connection, request):
'''
See Bluetooth spec Vol 3, Part F - 3.4.5.1 Write Request
'''
@@ -936,12 +943,13 @@ class Server(EventEmitter):
return
# Accept the value
attribute.write_value(connection, request.attribute_value)
await attribute.write_value(connection, request.attribute_value)
# Done
self.send_response(connection, ATT_Write_Response())
def on_att_write_command(self, connection, request):
@AsyncRunner.run_in_task()
async def on_att_write_command(self, connection, request):
'''
See Bluetooth spec Vol 3, Part F - 3.4.5.3 Write Command
'''
@@ -959,7 +967,7 @@ class Server(EventEmitter):
# Accept the value
try:
attribute.write_value(connection, request.attribute_value)
await attribute.write_value(connection, request.attribute_value)
except Exception as error:
logger.exception(f'!!! ignoring exception: {error}')
+39
View File
@@ -21,9 +21,11 @@ import dataclasses
import enum
import functools
import logging
import secrets
import struct
from typing import Any, Dict, Callable, Optional, Type, Union, List
from bumble import crypto
from .colors import color
from .core import (
BT_BR_EDR_TRANSPORT,
@@ -1881,6 +1883,43 @@ class Address:
address_type = data[offset - 1]
return Address.parse_address_with_type(data, offset, address_type)
@classmethod
def generate_static_address(cls) -> Address:
'''Generates Random Static Address, with the 2 most significant bits of 0b11.
See Bluetooth spec, Vol 6, Part B - Table 1.2.
'''
address_bytes = secrets.token_bytes(6)
address_bytes = address_bytes[:5] + bytes([address_bytes[5] | 0b11000000])
return Address(
address=address_bytes, address_type=Address.RANDOM_DEVICE_ADDRESS
)
@classmethod
def generate_private_address(cls, irk: bytes = b'') -> Address:
'''Generates Random Private MAC Address.
If IRK is present, a Resolvable Private Address, with the 2 most significant
bits of 0b01 will be generated. Otherwise, a Non-resolvable Private Address,
with the 2 most significant bits of 0b00 will be generated.
See Bluetooth spec, Vol 6, Part B - Table 1.2.
Args:
irk: Local Identity Resolving Key(IRK), in little-endian. If not set, a
non-resolvable address will be generated.
'''
if irk:
prand = crypto.generate_prand()
address_bytes = crypto.ah(irk, prand) + prand
else:
address_bytes = secrets.token_bytes(6)
address_bytes = address_bytes[:5] + bytes([address_bytes[5] & 0b00111111])
return Address(
address=address_bytes, address_type=Address.RANDOM_DEVICE_ADDRESS
)
def __init__(
self, address: Union[bytes, str], address_type: int = RANDOM_DEVICE_ADDRESS
):
+14
View File
@@ -37,6 +37,7 @@ from bumble.l2cap import (
L2CAP_Connection_Response,
)
from bumble.hci import (
Address,
HCI_EVENT_PACKET,
HCI_ACL_DATA_PACKET,
HCI_DISCONNECTION_COMPLETE_EVENT,
@@ -48,6 +49,7 @@ from bumble.hci import (
)
from bumble.rfcomm import RFCOMM_Frame, RFCOMM_PSM
from bumble.sdp import SDP_PDU, SDP_PSM
from bumble import crypto
# -----------------------------------------------------------------------------
# Logging
@@ -232,3 +234,15 @@ class PacketTracer:
)
self.host_to_controller_analyzer.peer = self.controller_to_host_analyzer
self.controller_to_host_analyzer.peer = self.host_to_controller_analyzer
def generate_irk() -> bytes:
return crypto.r()
def verify_rpa_with_irk(rpa: Address, irk: bytes) -> bool:
rpa_bytes = bytes(rpa)
prand_given = rpa_bytes[3:]
hash_given = rpa_bytes[:3]
hash_local = crypto.ah(irk, prand_given)
return hash_local[:3] == hash_given
+314 -93
View File
@@ -19,16 +19,17 @@ from __future__ import annotations
from dataclasses import dataclass
import logging
import enum
import struct
from abc import ABC, abstractmethod
from pyee import EventEmitter
from typing import Optional, TYPE_CHECKING
from typing import Optional, Callable, TYPE_CHECKING
from typing_extensions import override
from bumble import l2cap
from bumble import l2cap, device
from bumble.colors import color
from bumble.core import InvalidStateError, ProtocolError
if TYPE_CHECKING:
from bumble.device import Device, Connection
from .hci import Address
# -----------------------------------------------------------------------------
@@ -60,6 +61,7 @@ class Message:
NOT_READY = 0x01
ERR_INVALID_REPORT_ID = 0x02
ERR_UNSUPPORTED_REQUEST = 0x03
ERR_INVALID_PARAMETER = 0x04
ERR_UNKNOWN = 0x0E
ERR_FATAL = 0x0F
@@ -101,13 +103,14 @@ class GetReportMessage(Message):
def __bytes__(self) -> bytes:
packet_bytes = bytearray()
packet_bytes.append(self.report_id)
packet_bytes.extend(
[(self.buffer_size & 0xFF), ((self.buffer_size >> 8) & 0xFF)]
)
if self.report_type == Message.ReportType.OTHER_REPORT:
if self.buffer_size == 0:
return self.header(self.report_type) + packet_bytes
else:
return self.header(0x08 | self.report_type) + packet_bytes
return (
self.header(0x08 | self.report_type)
+ packet_bytes
+ struct.pack("<H", self.buffer_size)
)
@dataclass
@@ -120,6 +123,16 @@ class SetReportMessage(Message):
return self.header(self.report_type) + self.data
@dataclass
class SendControlData(Message):
report_type: int
data: bytes
message_type = Message.MessageType.DATA
def __bytes__(self) -> bytes:
return self.header(self.report_type) + self.data
@dataclass
class GetProtocolMessage(Message):
message_type = Message.MessageType.GET_PROTOCOL
@@ -161,31 +174,47 @@ class VirtualCableUnplug(Message):
return self.header(Message.ControlCommand.VIRTUAL_CABLE_UNPLUG)
# Device sends input report, host sends output report.
@dataclass
class SendData(Message):
data: bytes
report_type: int
message_type = Message.MessageType.DATA
def __bytes__(self) -> bytes:
return self.header(Message.ReportType.OUTPUT_REPORT) + self.data
return self.header(self.report_type) + self.data
@dataclass
class SendHandshakeMessage(Message):
result_code: int
message_type = Message.MessageType.HANDSHAKE
def __bytes__(self) -> bytes:
return self.header(self.result_code)
# -----------------------------------------------------------------------------
class Host(EventEmitter):
l2cap_ctrl_channel: Optional[l2cap.ClassicChannel]
l2cap_intr_channel: Optional[l2cap.ClassicChannel]
class HID(ABC, EventEmitter):
l2cap_ctrl_channel: Optional[l2cap.ClassicChannel] = None
l2cap_intr_channel: Optional[l2cap.ClassicChannel] = None
connection: Optional[device.Connection] = None
def __init__(self, device: Device, connection: Connection) -> None:
class Role(enum.IntEnum):
HOST = 0x00
DEVICE = 0x01
def __init__(self, device: device.Device, role: Role) -> None:
super().__init__()
self.remote_device_bd_address: Optional[Address] = None
self.device = device
self.connection = connection
self.l2cap_ctrl_channel = None
self.l2cap_intr_channel = None
self.role = role
# Register ourselves with the L2CAP channel manager
device.register_l2cap_server(HID_CONTROL_PSM, self.on_connection)
device.register_l2cap_server(HID_INTERRUPT_PSM, self.on_connection)
device.register_l2cap_server(HID_CONTROL_PSM, self.on_l2cap_connection)
device.register_l2cap_server(HID_INTERRUPT_PSM, self.on_l2cap_connection)
device.on('connection', self.on_device_connection)
async def connect_control_channel(self) -> None:
# Create a new L2CAP connection - control channel
@@ -229,9 +258,18 @@ class Host(EventEmitter):
self.l2cap_ctrl_channel = None
await channel.disconnect()
def on_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None:
def on_device_connection(self, connection: device.Connection) -> None:
self.connection = connection
self.remote_device_bd_address = connection.peer_address
connection.on('disconnection', self.on_device_disconnection)
def on_device_disconnection(self, reason: int) -> None:
self.connection = None
def on_l2cap_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None:
logger.debug(f'+++ New L2CAP connection: {l2cap_channel}')
l2cap_channel.on('open', lambda: self.on_l2cap_channel_open(l2cap_channel))
l2cap_channel.on('close', lambda: self.on_l2cap_channel_close(l2cap_channel))
def on_l2cap_channel_open(self, l2cap_channel: l2cap.ClassicChannel) -> None:
if l2cap_channel.psm == HID_CONTROL_PSM:
@@ -242,63 +280,20 @@ class Host(EventEmitter):
self.l2cap_intr_channel.sink = self.on_intr_pdu
logger.debug(f'$$$ L2CAP channel open: {l2cap_channel}')
def on_ctrl_pdu(self, pdu: bytes) -> None:
logger.debug(f'<<< HID CONTROL PDU: {pdu.hex()}')
# Here we will receive all kinds of packets, parse and then call respective callbacks
message_type = pdu[0] >> 4
param = pdu[0] & 0x0F
if message_type == Message.MessageType.HANDSHAKE:
logger.debug(f'<<< HID HANDSHAKE: {Message.Handshake(param).name}')
self.emit('handshake', Message.Handshake(param))
elif message_type == Message.MessageType.DATA:
logger.debug('<<< HID CONTROL DATA')
self.emit('data', pdu)
elif message_type == Message.MessageType.CONTROL:
if param == Message.ControlCommand.SUSPEND:
logger.debug('<<< HID SUSPEND')
self.emit('suspend', pdu)
elif param == Message.ControlCommand.EXIT_SUSPEND:
logger.debug('<<< HID EXIT SUSPEND')
self.emit('exit_suspend', pdu)
elif param == Message.ControlCommand.VIRTUAL_CABLE_UNPLUG:
logger.debug('<<< HID VIRTUAL CABLE UNPLUG')
self.emit('virtual_cable_unplug')
else:
logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED')
def on_l2cap_channel_close(self, l2cap_channel: l2cap.ClassicChannel) -> None:
if l2cap_channel.psm == HID_CONTROL_PSM:
self.l2cap_ctrl_channel = None
else:
logger.debug('<<< HID CONTROL DATA')
self.emit('data', pdu)
self.l2cap_intr_channel = None
logger.debug(f'$$$ L2CAP channel close: {l2cap_channel}')
@abstractmethod
def on_ctrl_pdu(self, pdu: bytes) -> None:
pass
def on_intr_pdu(self, pdu: bytes) -> None:
logger.debug(f'<<< HID INTERRUPT PDU: {pdu.hex()}')
self.emit("data", pdu)
def get_report(self, report_type: int, report_id: int, buffer_size: int) -> None:
msg = GetReportMessage(
report_type=report_type, report_id=report_id, buffer_size=buffer_size
)
hid_message = bytes(msg)
logger.debug(f'>>> HID CONTROL GET REPORT, PDU: {hid_message.hex()}')
self.send_pdu_on_ctrl(hid_message)
def set_report(self, report_type: int, data: bytes):
msg = SetReportMessage(report_type=report_type, data=data)
hid_message = bytes(msg)
logger.debug(f'>>> HID CONTROL SET REPORT, PDU:{hid_message.hex()}')
self.send_pdu_on_ctrl(hid_message)
def get_protocol(self):
msg = GetProtocolMessage()
hid_message = bytes(msg)
logger.debug(f'>>> HID CONTROL GET PROTOCOL, PDU: {hid_message.hex()}')
self.send_pdu_on_ctrl(hid_message)
def set_protocol(self, protocol_mode: int):
msg = SetProtocolMessage(protocol_mode=protocol_mode)
hid_message = bytes(msg)
logger.debug(f'>>> HID CONTROL SET PROTOCOL, PDU: {hid_message.hex()}')
self.send_pdu_on_ctrl(hid_message)
self.emit("interrupt_data", pdu)
def send_pdu_on_ctrl(self, msg: bytes) -> None:
assert self.l2cap_ctrl_channel
@@ -308,26 +303,252 @@ class Host(EventEmitter):
assert self.l2cap_intr_channel
self.l2cap_intr_channel.send_pdu(msg)
def send_data(self, data):
msg = SendData(data)
def send_data(self, data: bytes) -> None:
if self.role == HID.Role.HOST:
report_type = Message.ReportType.OUTPUT_REPORT
else:
report_type = Message.ReportType.INPUT_REPORT
msg = SendData(data, report_type)
hid_message = bytes(msg)
logger.debug(f'>>> HID INTERRUPT SEND DATA, PDU: {hid_message.hex()}')
self.send_pdu_on_intr(hid_message)
if self.l2cap_intr_channel is not None:
logger.debug(f'>>> HID INTERRUPT SEND DATA, PDU: {hid_message.hex()}')
self.send_pdu_on_intr(hid_message)
def suspend(self):
msg = Suspend()
hid_message = bytes(msg)
logger.debug(f'>>> HID CONTROL SUSPEND, PDU:{hid_message.hex()}')
self.send_pdu_on_ctrl(msg)
def exit_suspend(self):
msg = ExitSuspend()
hid_message = bytes(msg)
logger.debug(f'>>> HID CONTROL EXIT SUSPEND, PDU:{hid_message.hex()}')
self.send_pdu_on_ctrl(msg)
def virtual_cable_unplug(self):
def virtual_cable_unplug(self) -> None:
msg = VirtualCableUnplug()
hid_message = bytes(msg)
logger.debug(f'>>> HID CONTROL VIRTUAL CABLE UNPLUG, PDU: {hid_message.hex()}')
self.send_pdu_on_ctrl(msg)
self.send_pdu_on_ctrl(hid_message)
# -----------------------------------------------------------------------------
class Device(HID):
class GetSetReturn(enum.IntEnum):
FAILURE = 0x00
REPORT_ID_NOT_FOUND = 0x01
ERR_UNSUPPORTED_REQUEST = 0x02
ERR_UNKNOWN = 0x03
ERR_INVALID_PARAMETER = 0x04
SUCCESS = 0xFF
class GetSetStatus:
def __init__(self) -> None:
self.data = bytearray()
self.status = 0
def __init__(self, device: device.Device) -> None:
super().__init__(device, HID.Role.DEVICE)
get_report_cb: Optional[Callable[[int, int, int], None]] = None
set_report_cb: Optional[Callable[[int, int, int, bytes], None]] = None
get_protocol_cb: Optional[Callable[[], None]] = None
set_protocol_cb: Optional[Callable[[int], None]] = None
@override
def on_ctrl_pdu(self, pdu: bytes) -> None:
logger.debug(f'<<< HID CONTROL PDU: {pdu.hex()}')
param = pdu[0] & 0x0F
message_type = pdu[0] >> 4
if message_type == Message.MessageType.GET_REPORT:
logger.debug('<<< HID GET REPORT')
self.handle_get_report(pdu)
elif message_type == Message.MessageType.SET_REPORT:
logger.debug('<<< HID SET REPORT')
self.handle_set_report(pdu)
elif message_type == Message.MessageType.GET_PROTOCOL:
logger.debug('<<< HID GET PROTOCOL')
self.handle_get_protocol(pdu)
elif message_type == Message.MessageType.SET_PROTOCOL:
logger.debug('<<< HID SET PROTOCOL')
self.handle_set_protocol(pdu)
elif message_type == Message.MessageType.DATA:
logger.debug('<<< HID CONTROL DATA')
self.emit('control_data', pdu)
elif message_type == Message.MessageType.CONTROL:
if param == Message.ControlCommand.SUSPEND:
logger.debug('<<< HID SUSPEND')
self.emit('suspend')
elif param == Message.ControlCommand.EXIT_SUSPEND:
logger.debug('<<< HID EXIT SUSPEND')
self.emit('exit_suspend')
elif param == Message.ControlCommand.VIRTUAL_CABLE_UNPLUG:
logger.debug('<<< HID VIRTUAL CABLE UNPLUG')
self.emit('virtual_cable_unplug')
else:
logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED')
else:
logger.debug('<<< HID MESSAGE TYPE UNSUPPORTED')
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
def send_handshake_message(self, result_code: int) -> None:
msg = SendHandshakeMessage(result_code)
hid_message = bytes(msg)
logger.debug(f'>>> HID HANDSHAKE MESSAGE, PDU: {hid_message.hex()}')
self.send_pdu_on_ctrl(hid_message)
def send_control_data(self, report_type: int, data: bytes):
msg = SendControlData(report_type=report_type, data=data)
hid_message = bytes(msg)
logger.debug(f'>>> HID CONTROL DATA: {hid_message.hex()}')
self.send_pdu_on_ctrl(hid_message)
def handle_get_report(self, pdu: bytes):
if self.get_report_cb is None:
logger.debug("GetReport callback not registered !!")
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
return
report_type = pdu[0] & 0x03
buffer_flag = (pdu[0] & 0x08) >> 3
report_id = pdu[1]
logger.debug("buffer_flag: " + str(buffer_flag))
if buffer_flag == 1:
buffer_size = (pdu[3] << 8) | pdu[2]
else:
buffer_size = 0
ret = self.get_report_cb(report_id, report_type, buffer_size)
assert ret is not None
if ret.status == self.GetSetReturn.FAILURE:
self.send_handshake_message(Message.Handshake.ERR_UNKNOWN)
elif ret.status == self.GetSetReturn.SUCCESS:
data = bytearray()
data.append(report_id)
data.extend(ret.data)
if len(data) < self.l2cap_ctrl_channel.mtu: # type: ignore[union-attr]
self.send_control_data(report_type=report_type, data=data)
else:
self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER)
elif ret.status == self.GetSetReturn.REPORT_ID_NOT_FOUND:
self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID)
elif ret.status == self.GetSetReturn.ERR_INVALID_PARAMETER:
self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER)
elif ret.status == self.GetSetReturn.ERR_UNSUPPORTED_REQUEST:
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
def register_get_report_cb(self, cb: Callable[[int, int, int], None]) -> None:
self.get_report_cb = cb
logger.debug("GetReport callback registered successfully")
def handle_set_report(self, pdu: bytes):
if self.set_report_cb is None:
logger.debug("SetReport callback not registered !!")
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
return
report_type = pdu[0] & 0x03
report_id = pdu[1]
report_data = pdu[2:]
report_size = len(report_data) + 1
ret = self.set_report_cb(report_id, report_type, report_size, report_data)
assert ret is not None
if ret.status == self.GetSetReturn.SUCCESS:
self.send_handshake_message(Message.Handshake.SUCCESSFUL)
elif ret.status == self.GetSetReturn.ERR_INVALID_PARAMETER:
self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER)
elif ret.status == self.GetSetReturn.REPORT_ID_NOT_FOUND:
self.send_handshake_message(Message.Handshake.ERR_INVALID_REPORT_ID)
else:
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
def register_set_report_cb(
self, cb: Callable[[int, int, int, bytes], None]
) -> None:
self.set_report_cb = cb
logger.debug("SetReport callback registered successfully")
def handle_get_protocol(self, pdu: bytes):
if self.get_protocol_cb is None:
logger.debug("GetProtocol callback not registered !!")
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
return
ret = self.get_protocol_cb()
assert ret is not None
if ret.status == self.GetSetReturn.SUCCESS:
self.send_control_data(Message.ReportType.OTHER_REPORT, ret.data)
else:
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
def register_get_protocol_cb(self, cb: Callable[[], None]) -> None:
self.get_protocol_cb = cb
logger.debug("GetProtocol callback registered successfully")
def handle_set_protocol(self, pdu: bytes):
if self.set_protocol_cb is None:
logger.debug("SetProtocol callback not registered !!")
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
return
ret = self.set_protocol_cb(pdu[0] & 0x01)
assert ret is not None
if ret.status == self.GetSetReturn.SUCCESS:
self.send_handshake_message(Message.Handshake.SUCCESSFUL)
else:
self.send_handshake_message(Message.Handshake.ERR_UNSUPPORTED_REQUEST)
def register_set_protocol_cb(self, cb: Callable[[int], None]) -> None:
self.set_protocol_cb = cb
logger.debug("SetProtocol callback registered successfully")
# -----------------------------------------------------------------------------
class Host(HID):
def __init__(self, device: device.Device) -> None:
super().__init__(device, HID.Role.HOST)
def get_report(self, report_type: int, report_id: int, buffer_size: int) -> None:
msg = GetReportMessage(
report_type=report_type, report_id=report_id, buffer_size=buffer_size
)
hid_message = bytes(msg)
logger.debug(f'>>> HID CONTROL GET REPORT, PDU: {hid_message.hex()}')
self.send_pdu_on_ctrl(hid_message)
def set_report(self, report_type: int, data: bytes) -> None:
msg = SetReportMessage(report_type=report_type, data=data)
hid_message = bytes(msg)
logger.debug(f'>>> HID CONTROL SET REPORT, PDU:{hid_message.hex()}')
self.send_pdu_on_ctrl(hid_message)
def get_protocol(self) -> None:
msg = GetProtocolMessage()
hid_message = bytes(msg)
logger.debug(f'>>> HID CONTROL GET PROTOCOL, PDU: {hid_message.hex()}')
self.send_pdu_on_ctrl(hid_message)
def set_protocol(self, protocol_mode: int) -> None:
msg = SetProtocolMessage(protocol_mode=protocol_mode)
hid_message = bytes(msg)
logger.debug(f'>>> HID CONTROL SET PROTOCOL, PDU: {hid_message.hex()}')
self.send_pdu_on_ctrl(hid_message)
def suspend(self) -> None:
msg = Suspend()
hid_message = bytes(msg)
logger.debug(f'>>> HID CONTROL SUSPEND, PDU:{hid_message.hex()}')
self.send_pdu_on_ctrl(hid_message)
def exit_suspend(self) -> None:
msg = ExitSuspend()
hid_message = bytes(msg)
logger.debug(f'>>> HID CONTROL EXIT SUSPEND, PDU:{hid_message.hex()}')
self.send_pdu_on_ctrl(hid_message)
@override
def on_ctrl_pdu(self, pdu: bytes) -> None:
logger.debug(f'<<< HID CONTROL PDU: {pdu.hex()}')
param = pdu[0] & 0x0F
message_type = pdu[0] >> 4
if message_type == Message.MessageType.HANDSHAKE:
logger.debug(f'<<< HID HANDSHAKE: {Message.Handshake(param).name}')
self.emit('handshake', Message.Handshake(param))
elif message_type == Message.MessageType.DATA:
logger.debug('<<< HID CONTROL DATA')
self.emit('control_data', pdu)
elif message_type == Message.MessageType.CONTROL:
if param == Message.ControlCommand.VIRTUAL_CABLE_UNPLUG:
logger.debug('<<< HID VIRTUAL CABLE UNPLUG')
self.emit('virtual_cable_unplug')
else:
logger.debug('<<< HID CONTROL OPERATION UNSUPPORTED')
else:
logger.debug('<<< HID MESSAGE TYPE UNSUPPORTED')
+2 -2
View File
@@ -18,7 +18,7 @@
# -----------------------------------------------------------------------------
import struct
import logging
from typing import List
from typing import List, Optional
from bumble import l2cap
from ..core import AdvertisingData
@@ -67,7 +67,7 @@ class AshaService(TemplateService):
self.emit('volume', connection, value[0])
# Handler for audio control commands
def on_audio_control_point_write(connection: Connection, value):
def on_audio_control_point_write(connection: Optional[Connection], value):
logger.info(f'--- AUDIO CONTROL POINT Write:{value.hex()}')
opcode = value[0]
if opcode == AshaService.OPCODE_START:
+3 -3
View File
@@ -114,7 +114,7 @@ class SamplingFrequency(enum.IntEnum):
'''Bluetooth Assigned Numbers, Section 6.12.5.1 - Sampling Frequency'''
# fmt: off
FREQ_8000 = 0x01
FREQ_8000 = 0x01
FREQ_11025 = 0x02
FREQ_16000 = 0x03
FREQ_22050 = 0x04
@@ -430,7 +430,7 @@ class AseResponseCode(enum.IntEnum):
REJECTED_METADATA = 0x0B
INVALID_METADATA = 0x0C
INSUFFICIENT_RESOURCES = 0x0D
UNSPECIFIED_ERROR = 0x0E
UNSPECIFIED_ERROR = 0x0E
class AseReasonCode(enum.IntEnum):
@@ -1066,7 +1066,7 @@ class AseStateMachine(gatt.Characteristic):
# Readonly. Do nothing in the setter.
pass
def on_read(self, _: device.Connection) -> bytes:
def on_read(self, _: Optional[device.Connection]) -> bytes:
return self.value
def __str__(self) -> str:
+52
View File
@@ -0,0 +1,52 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
from bumble import gatt
from bumble import gatt_client
from bumble.profiles import csip
# -----------------------------------------------------------------------------
# Server
# -----------------------------------------------------------------------------
class CommonAudioServiceService(gatt.TemplateService):
UUID = gatt.GATT_COMMON_AUDIO_SERVICE
def __init__(
self,
coordinated_set_identification_service: csip.CoordinatedSetIdentificationService,
) -> None:
self.coordinated_set_identification_service = (
coordinated_set_identification_service
)
super().__init__(
characteristics=[],
included_services=[coordinated_set_identification_service],
)
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class CommonAudioServiceServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = CommonAudioServiceService
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy
+62 -4
View File
@@ -21,6 +21,9 @@ import enum
import struct
from typing import Optional
from bumble import core
from bumble import crypto
from bumble import device
from bumble import gatt
from bumble import gatt_client
@@ -43,9 +46,43 @@ class MemberLock(enum.IntEnum):
# -----------------------------------------------------------------------------
# Utils
# Crypto Toolbox
# -----------------------------------------------------------------------------
# TODO: Implement RSI Generator
def s1(m: bytes) -> bytes:
'''
Coordinated Set Identification Service - 4.3 s1 SALT generation function.
'''
return crypto.aes_cmac(m[::-1], bytes(16))[::-1]
def k1(n: bytes, salt: bytes, p: bytes) -> bytes:
'''
Coordinated Set Identification Service - 4.4 k1 derivation function.
'''
t = crypto.aes_cmac(n[::-1], salt[::-1])
return crypto.aes_cmac(p[::-1], t)[::-1]
def sef(k: bytes, r: bytes) -> bytes:
'''
Coordinated Set Identification Service - 4.5 SIRK encryption function sef.
'''
return crypto.xor(k1(k, s1(b'SIRKenc'[::-1]), b'csis'[::-1]), r)
def sih(k: bytes, r: bytes) -> bytes:
'''
Coordinated Set Identification Service - 4.7 Resolvable Set Identifier hash function sih.
'''
return crypto.e(k, r + bytes(13))[:3]
def generate_rsi(sirk: bytes) -> bytes:
'''
Coordinated Set Identification Service - 4.8 Resolvable Set Identifier generation operation.
'''
prand = crypto.generate_prand()
return sih(sirk, prand) + prand
# -----------------------------------------------------------------------------
@@ -54,6 +91,7 @@ class MemberLock(enum.IntEnum):
class CoordinatedSetIdentificationService(gatt.TemplateService):
UUID = gatt.GATT_COORDINATED_SET_IDENTIFICATION_SERVICE
set_identity_resolving_key: bytes
set_identity_resolving_key_characteristic: gatt.Characteristic
coordinated_set_size_characteristic: Optional[gatt.Characteristic] = None
set_member_lock_characteristic: Optional[gatt.Characteristic] = None
@@ -62,19 +100,21 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
def __init__(
self,
set_identity_resolving_key: bytes,
set_identity_resolving_key_type: SirkType,
coordinated_set_size: Optional[int] = None,
set_member_lock: Optional[MemberLock] = None,
set_member_rank: Optional[int] = None,
) -> None:
characteristics = []
self.set_identity_resolving_key = set_identity_resolving_key
self.set_identity_resolving_key_type = set_identity_resolving_key_type
self.set_identity_resolving_key_characteristic = gatt.Characteristic(
uuid=gatt.GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
# TODO: Implement encrypted SIRK reader.
value=struct.pack('B', SirkType.PLAINTEXT) + set_identity_resolving_key,
value=gatt.CharacteristicValue(read=self.on_sirk_read),
)
characteristics.append(self.set_identity_resolving_key_characteristic)
@@ -112,6 +152,24 @@ class CoordinatedSetIdentificationService(gatt.TemplateService):
super().__init__(characteristics)
def on_sirk_read(self, _connection: Optional[device.Connection]) -> bytes:
if self.set_identity_resolving_key_type == SirkType.PLAINTEXT:
return bytes([SirkType.PLAINTEXT]) + self.set_identity_resolving_key
else:
raise NotImplementedError('TODO: Pending async Characteristic read.')
def get_advertising_data(self) -> bytes:
return bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.RESOLVABLE_SET_IDENTIFIER,
generate_rsi(self.set_identity_resolving_key),
),
]
)
)
# -----------------------------------------------------------------------------
# Client
+3 -6
View File
@@ -280,17 +280,14 @@ class AsyncRunner:
def wrapper(*args, **kwargs):
coroutine = func(*args, **kwargs)
if queue is None:
# Create a task to run the coroutine
# Spawn the coroutine as a task
async def run():
try:
await coroutine
except Exception:
logger.warning(
f'{color("!!! Exception in wrapper:", "red")} '
f'{traceback.format_exc()}'
)
logger.exception(color("!!! Exception in wrapper:", "red"))
asyncio.create_task(run())
AsyncRunner.spawn(run())
else:
# Queue the coroutine to be awaited by the work queue
queue.enqueue(coroutine)
+5
View File
@@ -0,0 +1,5 @@
{
"name": "Bumble HID Keyboard",
"class_of_device": 9664,
"keystore": "JsonKeyStore"
}
+3 -3
View File
@@ -40,9 +40,9 @@
}
}
function onMouseMove(event) {
//console.log(event.clientX, event.clientY)
mouseInfo.innerText = `MOUSE: x=${event.clientX}, y=${event.clientY}`
send({ type:'mousemove', x: event.clientX, y: event.clientY })
//console.log(event.movementX, event.movementY)
mouseInfo.innerText = `MOUSE: x=${event.movementX}, y=${event.movementY}`
send({ type:'mousemove', x: event.movementX, y: event.movementY })
}
function onKeyDown(event) {
+116
View File
@@ -0,0 +1,116 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
import secrets
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.hci import (
Address,
OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.profiles.cap import CommonAudioServiceService
from bumble.profiles.csip import CoordinatedSetIdentificationService, SirkType
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_cig_setup.py <config-file>'
'<transport-spec-for-device-1> <transport-spec-for-device-2>'
)
print(
'example: run_cig_setup.py device1.json'
'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402'
)
return
print('<<< connecting to HCI...')
hci_transports = await asyncio.gather(
open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
)
print('<<< connected')
devices = [
Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
for hci_transport in hci_transports
]
sirk = secrets.token_bytes(16)
for i, device in enumerate(devices):
device.random_address = Address(secrets.token_bytes(6))
await device.power_on()
csis = CoordinatedSetIdentificationService(
set_identity_resolving_key=sirk,
set_identity_resolving_key_type=SirkType.PLAINTEXT,
coordinated_set_size=2,
)
device.add_service(CommonAudioServiceService(csis))
advertising_data = (
bytes(
AdvertisingData(
[
(
AdvertisingData.COMPLETE_LOCAL_NAME,
bytes(f'Bumble LE Audio-{i}', 'utf-8'),
),
(
AdvertisingData.FLAGS,
bytes(
[
AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG
| AdvertisingData.BR_EDR_HOST_FLAG
| AdvertisingData.BR_EDR_CONTROLLER_FLAG
]
),
),
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(CoordinatedSetIdentificationService.UUID),
),
]
)
)
+ csis.get_advertising_data()
)
await device.start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.RANDOM,
advertising_data=advertising_data,
)
await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports]
)
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
+748
View File
@@ -0,0 +1,748 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
import json
import websockets
from bumble.colors import color
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_L2CAP_PROTOCOL_ID,
BT_HUMAN_INTERFACE_DEVICE_SERVICE,
BT_HIDP_PROTOCOL_ID,
UUID,
)
from bumble.hci import Address
from bumble.hid import (
Device as HID_Device,
HID_CONTROL_PSM,
HID_INTERRUPT_PSM,
Message,
)
from bumble.sdp import (
Client as SDP_Client,
DataElement,
ServiceAttribute,
SDP_PUBLIC_BROWSE_ROOT,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_ALL_ATTRIBUTES_RANGE,
SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID,
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
)
from bumble.utils import AsyncRunner
# -----------------------------------------------------------------------------
# SDP attributes for Bluetooth HID devices
SDP_HID_SERVICE_NAME_ATTRIBUTE_ID = 0x0100
SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID = 0x0101
SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID = 0x0102
SDP_HID_DEVICE_RELEASE_NUMBER_ATTRIBUTE_ID = 0x0200 # [DEPRECATED]
SDP_HID_PARSER_VERSION_ATTRIBUTE_ID = 0x0201
SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID = 0x0202
SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID = 0x0203
SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID = 0x0204
SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID = 0x0205
SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID = 0x0206
SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID = 0x0207
SDP_HID_SDP_DISABLE_ATTRIBUTE_ID = 0x0208 # [DEPRECATED]
SDP_HID_BATTERY_POWER_ATTRIBUTE_ID = 0x0209
SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID = 0x020A
SDP_HID_PROFILE_VERSION_ATTRIBUTE_ID = 0x020B # DEPRECATED]
SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID = 0x020C
SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID = 0x020D
SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID = 0x020E
SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID = 0x020F
SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID = 0x0210
# Refer to HID profile specification v1.1.1, "5.3 Service Discovery Protocol (SDP)" for details
# HID SDP attribute values
LANGUAGE = 0x656E # 0x656E uint16 “en” (English)
ENCODING = 0x6A # 0x006A uint16 UTF-8 encoding
PRIMARY_LANGUAGE_BASE_ID = 0x100 # 0x0100 uint16 PrimaryLanguageBaseID
VERSION_NUMBER = 0x0101 # 0x0101 uint16 version number (v1.1)
SERVICE_NAME = b'Bumble HID'
SERVICE_DESCRIPTION = b'Bumble'
PROVIDER_NAME = b'Bumble'
HID_PARSER_VERSION = 0x0111 # uint16 0x0111 (v1.1.1)
HID_DEVICE_SUBCLASS = 0xC0 # Combo keyboard/pointing device
HID_COUNTRY_CODE = 0x21 # 0x21 Uint8, USA
HID_VIRTUAL_CABLE = True # Virtual cable enabled
HID_RECONNECT_INITIATE = True # Reconnect initiate enabled
REPORT_DESCRIPTOR_TYPE = 0x22 # 0x22 Type = Report Descriptor
HID_LANGID_BASE_LANGUAGE = 0x0409 # 0x0409 Language = English (United States)
HID_LANGID_BASE_BLUETOOTH_STRING_OFFSET = 0x100 # 0x0100 Default
HID_BATTERY_POWER = True # Battery power enabled
HID_REMOTE_WAKE = True # Remote wake enabled
HID_SUPERVISION_TIMEOUT = 0xC80 # uint16 0xC80 (2s)
HID_NORMALLY_CONNECTABLE = True # Normally connectable enabled
HID_BOOT_DEVICE = True # Boot device support enabled
HID_SSR_HOST_MAX_LATENCY = 0x640 # uint16 0x640 (1s)
HID_SSR_HOST_MIN_TIMEOUT = 0xC80 # uint16 0xC80 (2s)
HID_REPORT_MAP = bytes( # Text String, 50 Octet Report Descriptor
# pylint: disable=line-too-long
[
0x05,
0x01, # Usage Page (Generic Desktop Ctrls)
0x09,
0x06, # Usage (Keyboard)
0xA1,
0x01, # Collection (Application)
0x85,
0x01, # . Report ID (1)
0x05,
0x07, # . Usage Page (Kbrd/Keypad)
0x19,
0xE0, # . Usage Minimum (0xE0)
0x29,
0xE7, # . Usage Maximum (0xE7)
0x15,
0x00, # . Logical Minimum (0)
0x25,
0x01, # . Logical Maximum (1)
0x75,
0x01, # . Report Size (1)
0x95,
0x08, # . Report Count (8)
0x81,
0x02, # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
0x95,
0x01, # . Report Count (1)
0x75,
0x08, # . Report Size (8)
0x81,
0x03, # . Input (Const,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
0x95,
0x05, # . Report Count (5)
0x75,
0x01, # . Report Size (1)
0x05,
0x08, # . Usage Page (LEDs)
0x19,
0x01, # . Usage Minimum (Num Lock)
0x29,
0x05, # . Usage Maximum (Kana)
0x91,
0x02, # . Output (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
0x95,
0x01, # . Report Count (1)
0x75,
0x03, # . Report Size (3)
0x91,
0x03, # . Output (Const,Var,Abs,No Wrap,Linear,Preferred State,No Null Position,Non-volatile)
0x95,
0x06, # . Report Count (6)
0x75,
0x08, # . Report Size (8)
0x15,
0x00, # . Logical Minimum (0)
0x25,
0x65, # . Logical Maximum (101)
0x05,
0x07, # . Usage Page (Kbrd/Keypad)
0x19,
0x00, # . Usage Minimum (0x00)
0x29,
0x65, # . Usage Maximum (0x65)
0x81,
0x00, # . Input (Data,Array,Abs,No Wrap,Linear,Preferred State,No Null Position)
0xC0, # End Collection
0x05,
0x01, # Usage Page (Generic Desktop Ctrls)
0x09,
0x02, # Usage (Mouse)
0xA1,
0x01, # Collection (Application)
0x85,
0x02, # . Report ID (2)
0x09,
0x01, # . Usage (Pointer)
0xA1,
0x00, # . Collection (Physical)
0x05,
0x09, # . Usage Page (Button)
0x19,
0x01, # . Usage Minimum (0x01)
0x29,
0x03, # . Usage Maximum (0x03)
0x15,
0x00, # . Logical Minimum (0)
0x25,
0x01, # . Logical Maximum (1)
0x95,
0x03, # . Report Count (3)
0x75,
0x01, # . Report Size (1)
0x81,
0x02, # . Input (Data,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
0x95,
0x01, # . Report Count (1)
0x75,
0x05, # . Report Size (5)
0x81,
0x03, # . Input (Const,Var,Abs,No Wrap,Linear,Preferred State,No Null Position)
0x05,
0x01, # . Usage Page (Generic Desktop Ctrls)
0x09,
0x30, # . Usage (X)
0x09,
0x31, # . Usage (Y)
0x15,
0x81, # . Logical Minimum (-127)
0x25,
0x7F, # . Logical Maximum (127)
0x75,
0x08, # . Report Size (8)
0x95,
0x02, # . Report Count (2)
0x81,
0x06, # . Input (Data,Var,Rel,No Wrap,Linear,Preferred State,No Null Position)
0xC0, # . End Collection
0xC0, # End Collection
]
)
# Default protocol mode set to report protocol
protocol_mode = Message.ProtocolMode.REPORT_PROTOCOL
# -----------------------------------------------------------------------------
def sdp_records():
service_record_handle = 0x00010002
return {
service_record_handle: [
ServiceAttribute(
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(service_record_handle),
),
ServiceAttribute(
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]),
),
ServiceAttribute(
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[DataElement.uuid(BT_HUMAN_INTERFACE_DEVICE_SERVICE)]
),
),
ServiceAttribute(
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(BT_L2CAP_PROTOCOL_ID),
DataElement.unsigned_integer_16(HID_CONTROL_PSM),
]
),
DataElement.sequence(
[
DataElement.uuid(BT_HIDP_PROTOCOL_ID),
]
),
]
),
),
ServiceAttribute(
SDP_LANGUAGE_BASE_ATTRIBUTE_ID_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.unsigned_integer_16(LANGUAGE),
DataElement.unsigned_integer_16(ENCODING),
DataElement.unsigned_integer_16(PRIMARY_LANGUAGE_BASE_ID),
]
),
),
ServiceAttribute(
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(BT_HUMAN_INTERFACE_DEVICE_SERVICE),
DataElement.unsigned_integer_16(VERSION_NUMBER),
]
),
]
),
),
ServiceAttribute(
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.uuid(BT_L2CAP_PROTOCOL_ID),
DataElement.unsigned_integer_16(
HID_INTERRUPT_PSM
),
]
),
DataElement.sequence(
[
DataElement.uuid(BT_HIDP_PROTOCOL_ID),
]
),
]
),
]
),
),
ServiceAttribute(
SDP_HID_SERVICE_NAME_ATTRIBUTE_ID,
DataElement(DataElement.TEXT_STRING, SERVICE_NAME),
),
ServiceAttribute(
SDP_HID_SERVICE_DESCRIPTION_ATTRIBUTE_ID,
DataElement(DataElement.TEXT_STRING, SERVICE_DESCRIPTION),
),
ServiceAttribute(
SDP_HID_PROVIDER_NAME_ATTRIBUTE_ID,
DataElement(DataElement.TEXT_STRING, PROVIDER_NAME),
),
ServiceAttribute(
SDP_HID_PARSER_VERSION_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(HID_PARSER_VERSION),
),
ServiceAttribute(
SDP_HID_DEVICE_SUBCLASS_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(HID_DEVICE_SUBCLASS),
),
ServiceAttribute(
SDP_HID_COUNTRY_CODE_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(HID_COUNTRY_CODE),
),
ServiceAttribute(
SDP_HID_VIRTUAL_CABLE_ATTRIBUTE_ID,
DataElement.boolean(HID_VIRTUAL_CABLE),
),
ServiceAttribute(
SDP_HID_RECONNECT_INITIATE_ATTRIBUTE_ID,
DataElement.boolean(HID_RECONNECT_INITIATE),
),
ServiceAttribute(
SDP_HID_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.unsigned_integer_16(REPORT_DESCRIPTOR_TYPE),
DataElement(DataElement.TEXT_STRING, HID_REPORT_MAP),
]
),
]
),
),
ServiceAttribute(
SDP_HID_LANGID_BASE_LIST_ATTRIBUTE_ID,
DataElement.sequence(
[
DataElement.sequence(
[
DataElement.unsigned_integer_16(
HID_LANGID_BASE_LANGUAGE
),
DataElement.unsigned_integer_16(
HID_LANGID_BASE_BLUETOOTH_STRING_OFFSET
),
]
),
]
),
),
ServiceAttribute(
SDP_HID_BATTERY_POWER_ATTRIBUTE_ID,
DataElement.boolean(HID_BATTERY_POWER),
),
ServiceAttribute(
SDP_HID_REMOTE_WAKE_ATTRIBUTE_ID,
DataElement.boolean(HID_REMOTE_WAKE),
),
ServiceAttribute(
SDP_HID_SUPERVISION_TIMEOUT_ATTRIBUTE_ID,
DataElement.unsigned_integer_16(HID_SUPERVISION_TIMEOUT),
),
ServiceAttribute(
SDP_HID_NORMALLY_CONNECTABLE_ATTRIBUTE_ID,
DataElement.boolean(HID_NORMALLY_CONNECTABLE),
),
ServiceAttribute(
SDP_HID_BOOT_DEVICE_ATTRIBUTE_ID,
DataElement.boolean(HID_BOOT_DEVICE),
),
ServiceAttribute(
SDP_HID_SSR_HOST_MAX_LATENCY_ATTRIBUTE_ID,
DataElement.unsigned_integer_16(HID_SSR_HOST_MAX_LATENCY),
),
ServiceAttribute(
SDP_HID_SSR_HOST_MIN_TIMEOUT_ATTRIBUTE_ID,
DataElement.unsigned_integer_16(HID_SSR_HOST_MIN_TIMEOUT),
),
]
}
# -----------------------------------------------------------------------------
async def get_stream_reader(pipe) -> asyncio.StreamReader:
loop = asyncio.get_event_loop()
reader = asyncio.StreamReader(loop=loop)
protocol = asyncio.StreamReaderProtocol(reader)
await loop.connect_read_pipe(lambda: protocol, pipe)
return reader
class DeviceData:
def __init__(self) -> None:
self.keyboardData = bytearray(
[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
)
self.mouseData = bytearray([0x02, 0x00, 0x00, 0x00])
# Device's live data - Mouse and Keyboard will be stored in this
deviceData = DeviceData()
# -----------------------------------------------------------------------------
async def keyboard_device(hid_device):
# Start a Websocket server to receive events from a web page
async def serve(websocket, _path):
global deviceData
while True:
try:
message = await websocket.recv()
print('Received: ', str(message))
parsed = json.loads(message)
message_type = parsed['type']
if message_type == 'keydown':
# Only deal with keys a to z for now
key = parsed['key']
if len(key) == 1:
code = ord(key)
if ord('a') <= code <= ord('z'):
hid_code = 0x04 + code - ord('a')
deviceData.keyboardData = bytearray(
[
0x01,
0x00,
0x00,
hid_code,
0x00,
0x00,
0x00,
0x00,
0x00,
]
)
hid_device.send_data(deviceData.keyboardData)
elif message_type == 'keyup':
deviceData.keyboardData = bytearray(
[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
)
hid_device.send_data(deviceData.keyboardData)
elif message_type == "mousemove":
# logical min and max values
log_min = -127
log_max = 127
x = parsed['x']
y = parsed['y']
# limiting x and y values within logical max and min range
x = max(log_min, min(log_max, x))
y = max(log_min, min(log_max, y))
x_cord = x.to_bytes(signed=True)
y_cord = y.to_bytes(signed=True)
deviceData.mouseData = bytearray([0x02, 0x00]) + x_cord + y_cord
hid_device.send_data(deviceData.mouseData)
except websockets.exceptions.ConnectionClosedOK:
pass
# pylint: disable-next=no-member
await websockets.serve(serve, 'localhost', 8989)
await asyncio.get_event_loop().create_future()
# -----------------------------------------------------------------------------
async def main():
if len(sys.argv) < 3:
print(
'Usage: python run_hid_device.py <device-config> <transport-spec> <command>'
' where <command> is one of:\n'
' test-mode (run with menu enabled for testing)\n'
' web (run a keyboard with keypress input from a web page, '
'see keyboard.html'
)
print('example: python run_hid_device.py hid_keyboard.json usb:0 web')
print('example: python run_hid_device.py hid_keyboard.json usb:0 test-mode')
return
async def handle_virtual_cable_unplug():
hid_host_bd_addr = str(hid_device.remote_device_bd_address)
await hid_device.disconnect_interrupt_channel()
await hid_device.disconnect_control_channel()
await device.keystore.delete(hid_host_bd_addr) # type: ignore
connection = hid_device.connection
if connection is not None:
await connection.disconnect()
def on_hid_data_cb(pdu: bytes):
print(f'Received Data, PDU: {pdu.hex()}')
def on_get_report_cb(report_id: int, report_type: int, buffer_size: int):
retValue = hid_device.GetSetStatus()
print(
"GET_REPORT report_id: "
+ str(report_id)
+ "report_type: "
+ str(report_type)
+ "buffer_size:"
+ str(buffer_size)
)
if report_type == Message.ReportType.INPUT_REPORT:
if report_id == 1:
retValue.data = deviceData.keyboardData[1:]
retValue.status = hid_device.GetSetReturn.SUCCESS
elif report_id == 2:
retValue.data = deviceData.mouseData[1:]
retValue.status = hid_device.GetSetReturn.SUCCESS
else:
retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND
if buffer_size:
data_len = buffer_size - 1
retValue.data = retValue.data[:data_len]
elif report_type == Message.ReportType.OUTPUT_REPORT:
# This sample app has nothing to do with the report received, to enable PTS
# testing, we will return single byte random data.
retValue.data = bytearray([0x11])
retValue.status = hid_device.GetSetReturn.SUCCESS
elif report_type == Message.ReportType.FEATURE_REPORT:
retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
elif report_type == Message.ReportType.OTHER_REPORT:
if report_id == 3:
retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND
else:
retValue.status = hid_device.GetSetReturn.FAILURE
return retValue
def on_set_report_cb(
report_id: int, report_type: int, report_size: int, data: bytes
):
retValue = hid_device.GetSetStatus()
print(
"SET_REPORT report_id: "
+ str(report_id)
+ "report_type: "
+ str(report_type)
+ "report_size "
+ str(report_size)
+ "data:"
+ str(data)
)
if report_type == Message.ReportType.FEATURE_REPORT:
retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
elif report_type == Message.ReportType.INPUT_REPORT:
if report_id == 1 and report_size != len(deviceData.keyboardData):
retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
elif report_id == 2 and report_size != len(deviceData.mouseData):
retValue.status = hid_device.GetSetReturn.ERR_INVALID_PARAMETER
elif report_id == 3:
retValue.status = hid_device.GetSetReturn.REPORT_ID_NOT_FOUND
else:
retValue.status = hid_device.GetSetReturn.SUCCESS
else:
retValue.status = hid_device.GetSetReturn.SUCCESS
return retValue
def on_get_protocol_cb():
retValue = hid_device.GetSetStatus()
retValue.data = protocol_mode.to_bytes()
retValue.status = hid_device.GetSetReturn.SUCCESS
return retValue
def on_set_protocol_cb(protocol: int):
retValue = hid_device.GetSetStatus()
# We do not support SET_PROTOCOL.
print("SET_PROTOCOL report_id: " + str(protocol))
retValue.status = hid_device.GetSetReturn.ERR_UNSUPPORTED_REQUEST
return retValue
def on_virtual_cable_unplug_cb():
print(f'Received Virtual Cable Unplug')
asyncio.create_task(handle_virtual_cable_unplug())
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device.classic_enabled = True
# Create and register HID device
hid_device = HID_Device(device)
# Register for call backs
hid_device.on('interrupt_data', on_hid_data_cb)
hid_device.register_get_report_cb(on_get_report_cb)
hid_device.register_set_report_cb(on_set_report_cb)
hid_device.register_get_protocol_cb(on_get_protocol_cb)
hid_device.register_set_protocol_cb(on_set_protocol_cb)
# Register for virtual cable unplug call back
hid_device.on('virtual_cable_unplug', on_virtual_cable_unplug_cb)
# Setup the SDP to advertise HID Device service
device.sdp_service_records = sdp_records()
# Start the controller
await device.power_on()
# Start being discoverable and connectable
await device.set_discoverable(True)
await device.set_connectable(True)
async def menu():
reader = await get_stream_reader(sys.stdin)
while True:
print(
"\n************************ HID Device Menu *****************************\n"
)
print(" 1. Connect Control Channel")
print(" 2. Connect Interrupt Channel")
print(" 3. Disconnect Control Channel")
print(" 4. Disconnect Interrupt Channel")
print(" 5. Send Report on Interrupt Channel")
print(" 6. Virtual Cable Unplug")
print(" 7. Disconnect device")
print(" 8. Delete Bonding")
print(" 9. Re-connect to device")
print("10. Exit ")
print("\nEnter your choice : \n")
choice = await reader.readline()
choice = choice.decode('utf-8').strip()
if choice == '1':
await hid_device.connect_control_channel()
elif choice == '2':
await hid_device.connect_interrupt_channel()
elif choice == '3':
await hid_device.disconnect_control_channel()
elif choice == '4':
await hid_device.disconnect_interrupt_channel()
elif choice == '5':
print(" 1. Report ID 0x01")
print(" 2. Report ID 0x02")
print(" 3. Invalid Report ID")
choice1 = await reader.readline()
choice1 = choice1.decode('utf-8').strip()
if choice1 == '1':
data = bytearray(
[0x01, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00]
)
hid_device.send_data(data)
data = bytearray(
[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]
)
hid_device.send_data(data)
elif choice1 == '2':
data = bytearray([0x02, 0x00, 0x00, 0xF6])
hid_device.send_data(data)
data = bytearray([0x02, 0x00, 0x00, 0x00])
hid_device.send_data(data)
elif choice1 == '3':
data = bytearray([0x00, 0x00, 0x00, 0x00])
hid_device.send_data(data)
data = bytearray([0x00, 0x00, 0x00, 0x00])
hid_device.send_data(data)
else:
print('Incorrect option selected')
elif choice == '6':
hid_device.virtual_cable_unplug()
try:
hid_host_bd_addr = str(hid_device.remote_device_bd_address)
await device.keystore.delete(hid_host_bd_addr)
except KeyError:
print('Device not found or Device already unpaired.')
elif choice == '7':
connection = hid_device.connection
if connection is not None:
await connection.disconnect()
else:
print("Already disconnected from device")
elif choice == '8':
try:
hid_host_bd_addr = str(hid_device.remote_device_bd_address)
await device.keystore.delete(hid_host_bd_addr)
except KeyError:
print('Device NOT found or Device already unpaired.')
elif choice == '9':
hid_host_bd_addr = str(hid_device.remote_device_bd_address)
connection = await device.connect(
hid_host_bd_addr, transport=BT_BR_EDR_TRANSPORT
)
await connection.authenticate()
await connection.encrypt()
elif choice == '10':
sys.exit("Exit successful")
else:
print("Invalid option selected.")
if (len(sys.argv) > 3) and (sys.argv[3] == 'test-mode'):
# Test mode for PTS/Unit testing
await menu()
else:
# default option is using keyboard.html (web)
print("Executing in Web mode")
await keyboard_device(hid_device)
await hci_source.wait_for_termination()
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
+51 -20
View File
@@ -285,7 +285,10 @@ async def main():
print('example: run_hid_host.py classic1.json usb:0 E1:CA:72:48:C4:E8/P')
return
def on_hid_data_cb(pdu):
def on_hid_control_data_cb(pdu: bytes):
print(f'Received Control Data, PDU: {pdu.hex()}')
def on_hid_interrupt_data_cb(pdu: bytes):
report_type = pdu[0] & 0x0F
if len(pdu) == 1:
print(color(f'Warning: No report received', 'yellow'))
@@ -305,7 +308,7 @@ async def main():
if (report_length <= 1) or (report_id == 0):
return
# Parse report over interrupt channel
if report_type == Message.ReportType.INPUT_REPORT:
ReportParser.parse_input_report(pdu[1:]) # type: ignore
@@ -313,7 +316,9 @@ async def main():
await hid_host.disconnect_interrupt_channel()
await hid_host.disconnect_control_channel()
await device.keystore.delete(target_address) # type: ignore
await connection.disconnect()
connection = hid_host.connection
if connection is not None:
await connection.disconnect()
def on_hid_virtual_cable_unplug_cb():
asyncio.create_task(handle_virtual_cable_unplug())
@@ -325,6 +330,18 @@ async def main():
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device.classic_enabled = True
# Create HID host and start it
print('@@@ Starting HID Host...')
hid_host = Host(device)
# Register for HID data call back
hid_host.on('interrupt_data', on_hid_interrupt_data_cb)
hid_host.on('control_data', on_hid_control_data_cb)
# Register for virtual cable unplug call back
hid_host.on('virtual_cable_unplug', on_hid_virtual_cable_unplug_cb)
await device.power_on()
# Connect to a peer
@@ -345,16 +362,6 @@ async def main():
await get_hid_device_sdp_record(connection)
# Create HID host and start it
print('@@@ Starting HID Host...')
hid_host = Host(device, connection)
# Register for HID data call back
hid_host.on('data', on_hid_data_cb)
# Register for virtual cable unplug call back
hid_host.on('virtual_cable_unplug', on_hid_virtual_cable_unplug_cb)
async def menu():
reader = await get_stream_reader(sys.stdin)
while True:
@@ -369,13 +376,14 @@ async def main():
print(" 6. Set Report")
print(" 7. Set Protocol Mode")
print(" 8. Get Protocol Mode")
print(" 9. Send Report")
print(" 9. Send Report on Interrupt Channel")
print("10. Suspend")
print("11. Exit Suspend")
print("12. Virtual Cable Unplug")
print("13. Disconnect device")
print("14. Delete Bonding")
print("15. Re-connect to device")
print("16. Exit")
print("\nEnter your choice : \n")
choice = await reader.readline()
@@ -394,21 +402,40 @@ async def main():
await hid_host.disconnect_interrupt_channel()
elif choice == '5':
print(" 1. Report ID 0x02")
print(" 2. Report ID 0x03")
print(" 3. Report ID 0x05")
print(" 1. Input Report with ID 0x01")
print(" 2. Input Report with ID 0x02")
print(" 3. Input Report with ID 0x0F - Invalid ReportId")
print(" 4. Output Report with ID 0x02")
print(" 5. Feature Report with ID 0x05 - Unsupported Request")
print(" 6. Input Report with ID 0x02, BufferSize 3")
print(" 7. Output Report with ID 0x03, BufferSize 2")
print(" 8. Feature Report with ID 0x05, BufferSize 3")
choice1 = await reader.readline()
choice1 = choice1.decode('utf-8').strip()
if choice1 == '1':
hid_host.get_report(1, 2, 3)
hid_host.get_report(1, 1, 0)
elif choice1 == '2':
hid_host.get_report(2, 3, 2)
hid_host.get_report(1, 2, 0)
elif choice1 == '3':
hid_host.get_report(3, 5, 3)
hid_host.get_report(1, 5, 0)
elif choice1 == '4':
hid_host.get_report(2, 2, 0)
elif choice1 == '5':
hid_host.get_report(3, 15, 0)
elif choice1 == '6':
hid_host.get_report(1, 2, 3)
elif choice1 == '7':
hid_host.get_report(2, 3, 2)
elif choice1 == '8':
hid_host.get_report(3, 5, 3)
else:
print('Incorrect option selected')
@@ -484,6 +511,7 @@ async def main():
hid_host.virtual_cable_unplug()
try:
await device.keystore.delete(target_address)
print("Unpair successful")
except KeyError:
print('Device not found or Device already unpaired.')
@@ -513,6 +541,9 @@ async def main():
await connection.authenticate()
await connection.encrypt()
elif choice == '16':
sys.exit("Exit successful")
else:
print("Invalid option selected.")
+32 -21
View File
@@ -20,6 +20,7 @@ import logging
import sys
import os
import struct
import secrets
from bumble.core import AdvertisingData
from bumble.device import Device, CisLink
from bumble.hci import (
@@ -39,6 +40,8 @@ from bumble.profiles.bap import (
PublishedAudioCapabilitiesService,
AudioStreamControlService,
)
from bumble.profiles.cap import CommonAudioServiceService
from bumble.profiles.csip import CoordinatedSetIdentificationService, SirkType
from bumble.transport import open_transport_or_link
@@ -60,6 +63,11 @@ async def main() -> None:
await device.power_on()
csis = CoordinatedSetIdentificationService(
set_identity_resolving_key=secrets.token_bytes(16),
set_identity_resolving_key_type=SirkType.PLAINTEXT,
)
device.add_service(CommonAudioServiceService(csis))
device.add_service(
PublishedAudioCapabilitiesService(
supported_source_context=ContextType.PROHIBITED,
@@ -108,29 +116,32 @@ async def main() -> None:
device.add_service(AudioStreamControlService(device, sink_ase_id=[1, 2]))
advertising_data = bytes(
AdvertisingData(
[
(
AdvertisingData.COMPLETE_LOCAL_NAME,
bytes('Bumble LE Audio', 'utf-8'),
),
(
AdvertisingData.FLAGS,
bytes(
[
AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG
| AdvertisingData.BR_EDR_HOST_FLAG
| AdvertisingData.BR_EDR_CONTROLLER_FLAG
]
advertising_data = (
bytes(
AdvertisingData(
[
(
AdvertisingData.COMPLETE_LOCAL_NAME,
bytes('Bumble LE Audio', 'utf-8'),
),
),
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(PublishedAudioCapabilitiesService.UUID),
),
]
(
AdvertisingData.FLAGS,
bytes(
[
AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG
| AdvertisingData.BR_EDR_HOST_FLAG
| AdvertisingData.BR_EDR_CONTROLLER_FLAG
]
),
),
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(PublishedAudioCapabilitiesService.UUID),
),
]
)
)
+ csis.get_advertising_data()
)
subprocess = await asyncio.create_subprocess_shell(
f'dlc3 | ffplay pipe:0',
+1
View File
@@ -56,6 +56,7 @@ install_requires =
[options.entry_points]
console_scripts =
bumble-ble-rpa-tool = bumble.apps.ble_rpa_tool:main
bumble-console = bumble.apps.console:main
bumble-controller-info = bumble.apps.controller_info:main
bumble-gatt-dump = bumble.apps.gatt_dump:main
+2 -1
View File
@@ -48,7 +48,8 @@ from bumble.profiles.bap import (
PublishedAudioCapabilitiesService,
PublishedAudioCapabilitiesServiceProxy,
)
from .test_utils import TwoDevices
from tests.test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Logging
+71
View File
@@ -0,0 +1,71 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import pytest
import logging
from bumble import device
from bumble import gatt
from bumble.profiles import cap
from bumble.profiles import csip
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cas():
SIRK = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa')
devices = TwoDevices()
devices[0].add_service(
cap.CommonAudioServiceService(
csip.CoordinatedSetIdentificationService(
set_identity_resolving_key=SIRK,
set_identity_resolving_key_type=csip.SirkType.PLAINTEXT,
)
)
)
await devices.setup_connection()
peer = device.Peer(devices.connections[1])
cas_client = await peer.discover_service_and_create_proxy(
cap.CommonAudioServiceServiceProxy
)
included_services = await peer.discover_included_services(cas_client.service_proxy)
assert any(
service.uuid == gatt.GATT_COORDINATED_SET_IDENTIFICATION_SERVICE
for service in included_services
)
# -----------------------------------------------------------------------------
async def run():
await test_cas()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run())
+37
View File
@@ -31,6 +31,41 @@ from .test_utils import TwoDevices
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def test_s1():
assert (
csip.s1(b'SIRKenc'[::-1])
== bytes.fromhex('6901983f 18149e82 3c7d133a 7d774572')[::-1]
)
# -----------------------------------------------------------------------------
def test_k1():
K = bytes.fromhex('676e1b9b d448696f 061ec622 3ce5ced9')[::-1]
SALT = csip.s1(b'SIRKenc'[::-1])
P = b'csis'[::-1]
assert (
csip.k1(K, SALT, P)
== bytes.fromhex('5277453c c094d982 b0e8ee53 2f2d1f8b')[::-1]
)
# -----------------------------------------------------------------------------
def test_sih():
SIRK = bytes.fromhex('457d7d09 21a1fd22 cecd8c86 dd72cccd')[::-1]
PRAND = bytes.fromhex('69f563')[::-1]
assert csip.sih(SIRK, PRAND) == bytes.fromhex('1948da')[::-1]
# -----------------------------------------------------------------------------
def test_sef():
SIRK = bytes.fromhex('457d7d09 21a1fd22 cecd8c86 dd72cccd')[::-1]
K = bytes.fromhex('676e1b9b d448696f 061ec622 3ce5ced9')[::-1]
assert (
csip.sef(K, SIRK) == bytes.fromhex('170a3835 e13524a0 7e2562d5 f25fd346')[::-1]
)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_csis():
@@ -40,6 +75,7 @@ async def test_csis():
devices[0].add_service(
csip.CoordinatedSetIdentificationService(
set_identity_resolving_key=SIRK,
set_identity_resolving_key_type=csip.SirkType.PLAINTEXT,
coordinated_set_size=2,
set_member_lock=csip.MemberLock.UNLOCKED,
set_member_rank=0,
@@ -65,6 +101,7 @@ async def test_csis():
# -----------------------------------------------------------------------------
async def run():
test_sih()
await test_csis()
+76 -31
View File
@@ -20,11 +20,10 @@ import logging
import os
import struct
import pytest
from unittest.mock import Mock, ANY
from unittest.mock import AsyncMock, Mock, ANY
from bumble.controller import Controller
from bumble.gatt_client import CharacteristicProxy
from bumble.gatt_server import Server
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
@@ -120,9 +119,9 @@ async def test_characteristic_encoding():
Characteristic.READABLE,
123,
)
x = c.read_value(None)
x = await c.read_value(None)
assert x == bytes([123])
c.write_value(None, bytes([122]))
await c.write_value(None, bytes([122]))
assert c.value == 122
class FooProxy(CharacteristicProxy):
@@ -152,7 +151,22 @@ async def test_characteristic_encoding():
bytes([123]),
)
service = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic])
async def async_read(connection):
return 0x05060708
async_characteristic = PackedCharacteristicAdapter(
Characteristic(
'2AB7E91B-43E8-4F73-AC3B-80C1683B47F9',
Characteristic.Properties.READ,
Characteristic.READABLE,
CharacteristicValue(read=async_read),
),
'>I',
)
service = Service(
'3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic, async_characteristic]
)
server.add_service(service)
await client.power_on()
@@ -184,6 +198,13 @@ async def test_characteristic_encoding():
await async_barrier()
assert characteristic.value == bytes([50])
c2 = peer.get_characteristics_by_uuid(async_characteristic.uuid)
assert len(c2) == 1
c2 = c2[0]
cd2 = PackedCharacteristicAdapter(c2, ">I")
cd2v = await cd2.read_value()
assert cd2v == 0x05060708
last_change = None
def on_change(value):
@@ -285,7 +306,8 @@ async def test_attribute_getters():
# -----------------------------------------------------------------------------
def test_CharacteristicAdapter():
@pytest.mark.asyncio
async def test_CharacteristicAdapter():
# Check that the CharacteristicAdapter base class is transparent
v = bytes([1, 2, 3])
c = Characteristic(
@@ -296,11 +318,11 @@ def test_CharacteristicAdapter():
)
a = CharacteristicAdapter(c)
value = a.read_value(None)
value = await a.read_value(None)
assert value == v
v = bytes([3, 4, 5])
a.write_value(None, v)
await a.write_value(None, v)
assert c.value == v
# Simple delegated adapter
@@ -308,11 +330,11 @@ def test_CharacteristicAdapter():
c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x))
)
value = a.read_value(None)
value = await a.read_value(None)
assert value == bytes(reversed(v))
v = bytes([3, 4, 5])
a.write_value(None, v)
await a.write_value(None, v)
assert a.value == bytes(reversed(v))
# Packed adapter with single element format
@@ -321,10 +343,10 @@ def test_CharacteristicAdapter():
c.value = v
a = PackedCharacteristicAdapter(c, '>H')
value = a.read_value(None)
value = await a.read_value(None)
assert value == pv
c.value = None
a.write_value(None, pv)
await a.write_value(None, pv)
assert a.value == v
# Packed adapter with multi-element format
@@ -334,10 +356,10 @@ def test_CharacteristicAdapter():
c.value = (v1, v2)
a = PackedCharacteristicAdapter(c, '>HH')
value = a.read_value(None)
value = await a.read_value(None)
assert value == pv
c.value = None
a.write_value(None, pv)
await a.write_value(None, pv)
assert a.value == (v1, v2)
# Mapped adapter
@@ -348,10 +370,10 @@ def test_CharacteristicAdapter():
c.value = mapped
a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2'))
value = a.read_value(None)
value = await a.read_value(None)
assert value == pv
c.value = None
a.write_value(None, pv)
await a.write_value(None, pv)
assert a.value == mapped
# UTF-8 adapter
@@ -360,27 +382,49 @@ def test_CharacteristicAdapter():
c.value = v
a = UTF8CharacteristicAdapter(c)
value = a.read_value(None)
value = await a.read_value(None)
assert value == ev
c.value = None
a.write_value(None, ev)
await a.write_value(None, ev)
assert a.value == v
# -----------------------------------------------------------------------------
def test_CharacteristicValue():
@pytest.mark.asyncio
async def test_CharacteristicValue():
b = bytes([1, 2, 3])
c = CharacteristicValue(read=lambda _: b)
x = c.read(None)
async def read_value(connection):
return b
c = CharacteristicValue(read=read_value)
x = await c.read(None)
assert x == b
result = []
c = CharacteristicValue(
write=lambda connection, value: result.append((connection, value))
)
m = Mock()
c = CharacteristicValue(write=m)
z = object()
c.write(z, b)
assert result == [(z, b)]
m.assert_called_once_with(z, b)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_CharacteristicValue_async():
b = bytes([1, 2, 3])
async def read_value(connection):
return b
c = CharacteristicValue(read=read_value)
x = await c.read(None)
assert x == b
m = AsyncMock()
c = CharacteristicValue(write=m)
z = object()
await c.write(z, b)
m.assert_called_once_with(z, b)
# -----------------------------------------------------------------------------
@@ -961,12 +1005,18 @@ Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration
# -----------------------------------------------------------------------------
async def async_main():
test_UUID()
test_ATT_Error_Response()
test_ATT_Read_By_Group_Type_Request()
await test_read_write()
await test_read_write2()
await test_subscribe_notify()
await test_unsubscribe()
await test_characteristic_encoding()
await test_mtu_exchange()
await test_CharacteristicValue()
await test_CharacteristicValue_async()
await test_CharacteristicAdapter()
# -----------------------------------------------------------------------------
@@ -1105,9 +1155,4 @@ def test_get_attribute_group():
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
test_UUID()
test_ATT_Error_Response()
test_ATT_Read_By_Group_Type_Request()
test_CharacteristicValue()
test_CharacteristicAdapter()
asyncio.run(async_main())