Compare commits

...

89 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
97ad7e5741 Merge pull request #472 from google/gbg/update-pandora-deps
update protobuf dep and make pandora install optional
2024-04-18 11:21:29 -07:00
Charlie Boutier
71df062e07 pyusb: power_cycle if '!' is present at the start of the transport 2024-04-17 14:12:55 -07:00
Charlie Boutier
049f9021e9 pyusb: powercycle the dongle 2024-04-17 14:12:55 -07:00
Gilles Boccon-Gibod
50eae2ef54 add pandora to code-check action 2024-04-17 13:19:07 -07:00
Gilles Boccon-Gibod
c8883a7d0f update protobuf dep and make pandora install optional 2024-04-17 13:14:21 -07:00
zxzxwu
51321caf5b Merge pull request #470 from zxzxwu/examples
Type hint all examples
2024-04-16 02:56:08 +08:00
zxzxwu
51a94288e2 Type hint all examples 2024-04-15 12:48:21 +00:00
zxzxwu
8758856e8c Merge pull request #465 from zxzxwu/hfp-ag
HFP AG implementation
2024-04-12 22:15:25 +08:00
Josh Wu
deba181857 HFP AG implementation 2024-04-10 09:51:37 +00:00
zxzxwu
c65188dcbf Merge pull request #466 from zxzxwu/format
Fix format presubmit error
2024-04-09 02:59:36 +08:00
Josh Wu
21d607898d Fix format presubmit error 2024-04-09 01:44:04 +08:00
Gilles Boccon-Gibod
2698d4534e Merge pull request #435 from jeru/main
open_tcp_server_transport: allow explicit sock as input.
2024-04-04 19:17:07 -07:00
zxzxwu
bbcd64286a Merge pull request #463 from zxzxwu/hfp
Correct HFP AG indicator index
2024-04-04 12:53:19 +08:00
Gilles Boccon-Gibod
9140afbf8c Merge pull request #456 from google/gbg/update-dependencies
update some dependencies
2024-04-03 17:50:18 -06:00
Gilles Boccon-Gibod
90a682c71b bump to avatar 0.0.9 2024-04-03 16:26:07 -07:00
Gilles Boccon-Gibod
e8737a8243 update to more recent versions 2024-04-03 10:00:11 -07:00
Gilles Boccon-Gibod
72fceca72e update some dependencies 2024-04-03 10:00:09 -07:00
Gilles Boccon-Gibod
732294abbc Merge pull request #462 from google/gbg/461
fix #461
2024-04-03 10:56:05 -06:00
Josh Wu
dc1204531e Correct HFP AG indicator index 2024-04-03 17:58:04 +08:00
Gilles Boccon-Gibod
962114379c fix #461 2024-04-02 23:14:32 -07:00
Gilles Boccon-Gibod
e6913a3055 Merge pull request #457 from google/gbg/bench-ascyncio-main
delay creation of runner object
2024-04-02 21:39:37 -06:00
Gilles Boccon-Gibod
e21d122aef Merge pull request #458 from google/gbg/update-formatter
update black formatter to version 24
2024-04-02 21:39:24 -06:00
Gilles Boccon-Gibod
58d4ab913a update black formatter to version 24 2024-04-01 14:44:46 -07:00
Gilles Boccon-Gibod
76bca03fe3 format with the project's version of black 2024-04-01 14:39:34 -07:00
Gilles Boccon-Gibod
f1e5c9e59e delay creation of runner object 2024-04-01 14:25:38 -07:00
zxzxwu
ec82242462 Merge pull request #440 from zxzxwu/hfp
Rework HFP example
2024-03-27 16:54:41 +08:00
zxzxwu
a4efdd3f3e Merge pull request #442 from zxzxwu/unicast_ad
Implement Unicast Server Advertising Data
2024-03-27 16:54:06 +08:00
Gilles Boccon-Gibod
69c6643bb8 Merge pull request #452 from marshallpierce/mp/rust-0.2.0
Bumble crate 0.2.0
2024-03-21 17:15:43 -07:00
Marshall Pierce
b8214bf948 Bumble crate 0.2.0 2024-03-21 12:36:32 -06:00
Charlie Boutier
a9c62c44b3 pandora host: change AdvertisingType
change advertising type from high duty to low duty

Test: python le_host_test.py -c config.yml --test_bed android.bumbles --tests "test_scan('connectable','non_scannable','directed',0)" -v
2024-03-20 11:17:50 -07:00
Charlie Boutier
7d0b4ef4e0 pandora_server: Parse FLAGS into advertising data
Bug: 328089785
2024-03-18 09:20:55 -07:00
Charlie Boutier
313340f1c6 intel driver: check the vendorId and productId 2024-03-15 10:53:33 -07:00
Charlie Boutier
e8ed69fb09 pyusb: Collect vendorId and productId as metadata 2024-03-15 10:53:33 -07:00
David Duarte
16d5cf6770 usb: Add usb path moniker
Add a new moniker for usb and pyusb driver allowing
to select the usb device using its bus id and port
path like `usb:3-3.4.1`.
2024-03-15 09:17:39 -07:00
Gilles Boccon-Gibod
a2caf1deb2 Merge pull request #448 from BenjaminLawson/bump-avatar
Bump pandora-avatar to 0.0.8
2024-03-14 20:49:28 -07:00
Ben Lawson
01bfdd2c98 Bump pandora-avater to 0.0.8 2024-03-14 14:13:27 -07:00
Gilles Boccon-Gibod
4a60df108a Merge pull request #447 from BenjaminLawson/bump-rootcanal
Bump rootcanal to 1.9.0
2024-03-14 14:00:36 -07:00
Ben Lawson
ad48109748 Bump rootcanal to 1.9.0 2024-03-14 13:15:02 -07:00
Cheng Sheng
1ceeccbbc0 open_tcp_server_transport: allow explicit sock as input.
When a user doesn't need an exact port, but cares more about getting
SOME unused port, they can do:
* Create a socket outside with port=None or port=0.
* Use socket.getsockname()[1] to get the allocated port and pass to the
TCP client somehow.
* Use the created socket to create a TCP server transport.

Use-case: unit-testing embedded software that implements a BLE host. The
controller will be a Bumble controller, connected to the host via a TCP
channel.
* The host will have a TCP-client HCI transport for testing.
* The pytest setup code will allocate the TCP server and pass the port
number to the host.

Also add some unittests with python mock.
2024-03-13 19:34:05 +01:00
Gilles Boccon-Gibod
44c51c13ac Merge pull request #445 from google/gbg/driver-probe-fix
fix intel driver probe
2024-03-12 12:51:08 -07:00
Gilles Boccon-Gibod
7507be1eab update metadata when setting the host controller directly 2024-03-12 11:50:47 -07:00
Gilles Boccon-Gibod
cbe9446dcf fix intel driver probe 2024-03-12 09:54:20 -07:00
Charlie Boutier
174930399a intel: send vsc INTEL_DDC_CONFIG_WRITE
This VSC enable host-initiated role-switching after connection.

Implement this VSC in a driver fashion.

Test: avatar security_test with the Bluetooth Dongle Intel BE200
2024-03-11 09:15:18 -07:00
Josh Wu
35db4a4c93 Implement Unicast Server Advertising Data 2024-03-08 16:48:37 +08:00
Gilles Boccon-Gibod
1f3aee5566 Merge pull request #438 from BenjaminLawson/pandora-extended-advertising
Implement Pandora extended advertising
2024-03-07 20:36:56 -08:00
Ben Lawson
256044a789 Implement Pandora extended advertising
Support setting the PHY of Pandora scans.
2024-03-07 16:18:49 -08:00
Josh Wu
6205199d7f Rework HFP example 2024-03-05 20:53:28 +08:00
Gilles Boccon-Gibod
e554bd1033 Merge pull request #434 from google/gbg/show-timestamps
show timestamps from snoop logs
2024-02-29 11:44:23 -08:00
Gilles Boccon-Gibod
38981cefa1 pad index field 2024-02-28 11:46:35 -08:00
Gilles Boccon-Gibod
f2d601f411 show timestamps from snoop logs 2024-02-27 16:40:37 -08:00
zxzxwu
6e7c64c1de Merge pull request #431 from zxzxwu/rust
Bump Rust to 1.76.0
2024-02-23 15:14:30 +08:00
Josh Wu
565d51f4db Bump Rust to 1.76.0
```
error: failed to compile `cargo-all-features v1.10.0`, intermediate artifacts can be found at `/tmp/cargo-installshCmAG`

Caused by:
  package `clap v4.5.1` cannot be built because it requires rustc 1.74 or newer, while the currently active rustc version is 1.70.0
  Try re-running cargo install with `--locked`

```
2024-02-22 15:22:20 +08:00
Gilles Boccon-Gibod
de8f3d9c1e Merge pull request #426 from akuker/patch-1
Add clarification to short circuit list feature
2024-02-12 21:22:14 -08:00
Tony Kuker
cde6d48690 Add clarification to short circuit list feature 2024-02-12 12:22:36 -06:00
zxzxwu
02180088b3 Merge pull request #425 from zxzxwu/command
Refactor command supporting list
2024-02-07 21:45:52 +08:00
zxzxwu
90f49267d1 Merge pull request #424 from zxzxwu/adv
Fix double-disable legacy advertising set
2024-02-06 16:13:51 +08:00
Josh Wu
0e6d69cd7b Refactor command supporting list 2024-02-06 12:06:00 +08:00
Josh Wu
9eccc583d5 Fix double-disable legacy advertising set
When legacy advertising set is disabled passively(by set termination),
the legacy advertising set won't be released, and the next
stop_advertising() call will try to disable it again and cause an error.
2024-02-06 12:00:30 +08:00
Gilles Boccon-Gibod
f4aeaa6eb3 Merge pull request #422 from google/gbg/bench-rfcomm-params
add rfcomm options and fix l2cap mtu negotiation
2024-02-05 09:14:16 -08:00
Gilles Boccon-Gibod
d7489a644a update websockets version (for better typecheck) 2024-02-05 09:07:39 -08:00
Gilles Boccon-Gibod
a877283360 add rfcomm options and fix l2cap mtu negotiation 2024-02-05 08:56:59 -08:00
zxzxwu
6d91e7e79b Merge pull request #423 from zxzxwu/vcp
Fix Lint error in VCP example
2024-02-06 00:40:05 +08:00
Josh Wu
567146b143 Fix Lint error in VCP example 2024-02-04 21:23:22 +08:00
zxzxwu
1a3272d7ca Merge pull request #412 from zxzxwu/vcp
Add Volume Control Service
2024-02-04 00:42:51 +08:00
zxzxwu
1ee1ff0b62 Merge pull request #420 from zxzxwu/rfc
Add RFCOMM and SDP context manager and search helper
2024-02-04 00:42:24 +08:00
zxzxwu
729fd97748 Merge pull request #419 from zxzxwu/feat
Add local LMP feature reader
2024-02-03 13:51:19 +08:00
Josh Wu
e308051885 Add LMP feature reader 2024-02-03 13:29:25 +08:00
Josh Wu
10e53553d7 Add RFCOMM and SDP helpers 2024-02-03 13:13:35 +08:00
Gilles Boccon-Gibod
ef0b30d059 Merge pull request #382 from google/gbg/extended-advertising-v2
extended advertising v2
2024-02-02 20:43:28 -08:00
Gilles Boccon-Gibod
e7e9f9509a update rootcanal version 2024-02-02 20:33:19 -08:00
zxzxwu
c6cfd101df Merge pull request #415 from zxzxwu/hfp
HFP: State memory and event emission
2024-02-02 11:36:53 +08:00
Josh Wu
d2dcf063ee HFP: State memory and event emit 2024-02-01 12:08:43 +08:00
Michael Mogenson
d15bc7d664 Merge pull request #417 from mogenson/controller-loopback-cid-range
controller_loopback: LE support and max packet count
2024-01-31 21:13:21 -05:00
zxzxwu
e4364d18a7 Merge pull request #418 from zxzxwu/rfc
RFCOMM: Slightly refactor and correct constants
2024-02-01 01:30:53 +08:00
Josh Wu
6a34c9f224 RFCOMM: Slightly refactor and correct constants 2024-02-01 01:18:56 +08:00
Michael Mogenson
2a764fd6bb controller_loopback: LE support and max packet count
Bound the packet count CLI option. We're using the L2CAP header CID for
a paket ID, so the max packet count value has to fit into this 16-bit
field.

Add support for controllers that are LE only by checking the
le_acl_packet_queue.max_size.

Tested with 65535 max packet count. Took 138 seconds at 481 kB/s with a
USB BT dongle.
2024-01-31 10:26:51 -05:00
Josh Wu
3e8ce38eba Add Volume Control Service 2024-01-31 10:04:30 +08:00
Gilles Boccon-Gibod
8d2f37aa7a inclusive language 2024-01-28 19:09:39 -08:00
Gilles Boccon-Gibod
b7b70ebcbb address PR comments 2024-01-28 19:09:37 -08:00
Gilles Boccon-Gibod
8ba91f4986 fix assert 2024-01-28 19:02:32 -08:00
Gilles Boccon-Gibod
79a5e953bc comply with limits for certain advertising event types 2024-01-28 19:02:32 -08:00
Gilles Boccon-Gibod
20de5ea250 format 2024-01-28 19:02:32 -08:00
Gilles Boccon-Gibod
bad9ce272c add doc 2024-01-28 19:02:32 -08:00
Gilles Boccon-Gibod
d3273ffa8c format (+3 squashed commits)
Squashed commits:
[60e610f] wip
[eeab73d] wip
[3cdd5b8] basic first pass
2024-01-28 19:02:30 -08:00
zxzxwu
071fc2723a Merge pull request #376 from zxzxwu/host
Manage lifecycle of CIS and SCO links in host
2024-01-28 22:09:08 +08:00
zxzxwu
ef4ea86f58 Merge pull request #381 from zxzxwu/offload
Support non-directed address generation offload
2024-01-28 22:08:32 +08:00
Gilles Boccon-Gibod
dfdaa149d0 Merge pull request #337 from google/gbg/avrcp
Add AVRCP support
2024-01-28 01:27:52 -08:00
Josh Wu
c40824e51c Support non-directed address generation offload 2024-01-26 16:02:40 +08:00
Josh Wu
da60386385 Manage lifecycle of CIS and SCO links in host 2024-01-18 11:56:38 +08:00
109 changed files with 5464 additions and 2245 deletions

View File

@@ -33,7 +33,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install ".[build,test,development]"
python -m pip install ".[build,test,development,pandora]"
- name: Check
run: |
invoke project.pre-commit

View File

@@ -32,7 +32,7 @@ jobs:
- name: Install
run: |
python -m pip install --upgrade pip
python -m pip install .[avatar]
python -m pip install .[avatar,pandora]
- name: Rootcanal
run: nohup python -m rootcanal > rootcanal.log &
- name: Test

View File

@@ -47,7 +47,7 @@ jobs:
strategy:
matrix:
python-version: [ "3.8", "3.9", "3.10", "3.11" ]
rust-version: [ "1.70.0", "stable" ]
rust-version: [ "1.76.0", "stable" ]
fail-fast: false
steps:
- name: Check out from Git

5
.gitignore vendored
View File

@@ -6,9 +6,14 @@ dist/
docs/mkdocs/site
test-results.xml
__pycache__
# Vim
.*.sw*
# generated by setuptools_scm
bumble/_version.py
.vscode/launch.json
.vscode/settings.json
/.idea
venv/
.venv/
# snoop logs
out/

View File

@@ -74,6 +74,8 @@
"substates",
"tobytes",
"tsep",
"UNMUTE",
"unmuted",
"usbmodem",
"vhci",
"websockets",

View File

@@ -50,10 +50,8 @@ from bumble.sdp import (
SDP_PUBLIC_BROWSE_ROOT,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement,
ServiceAttribute,
Client as SdpClient,
)
from bumble.transport import open_transport_or_link
import bumble.rfcomm
@@ -89,6 +87,7 @@ DEFAULT_LINGER_TIME = 1.0
DEFAULT_POST_CONNECTION_WAIT_TIME = 1.0
DEFAULT_RFCOMM_CHANNEL = 8
DEFAULT_RFCOMM_MTU = 2048
# -----------------------------------------------------------------------------
@@ -198,48 +197,6 @@ def make_sdp_records(channel):
}
async def find_rfcomm_channel_with_uuid(connection: Connection, uuid: str) -> int:
# Connect to the SDP Server
sdp_client = SdpClient(connection)
await sdp_client.connect()
# Search for services with an L2CAP service attribute
search_result = await sdp_client.search_attributes(
[BT_L2CAP_PROTOCOL_ID],
[
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
],
)
for attribute_list in search_result:
service_uuid = None
service_class_id_list = ServiceAttribute.find_attribute_in_list(
attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID
)
if service_class_id_list:
if service_class_id_list.value:
for service_class_id in service_class_id_list.value:
service_uuid = service_class_id.value
if str(service_uuid) != uuid:
# This service doesn't have a UUID or isn't the right one.
continue
# Look for the RFCOMM Channel number
protocol_descriptor_list = ServiceAttribute.find_attribute_in_list(
attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
)
if protocol_descriptor_list:
for protocol_descriptor in protocol_descriptor_list.value:
if len(protocol_descriptor.value) >= 2:
if protocol_descriptor.value[0].value == BT_RFCOMM_PROTOCOL_ID:
await sdp_client.disconnect()
return protocol_descriptor.value[1].value
await sdp_client.disconnect()
return 0
def log_stats(title, stats):
stats_min = min(stats)
stats_max = max(stats)
@@ -552,9 +509,11 @@ class Ping:
packet = struct.pack(
'>bbI',
PacketType.SEQUENCE,
PACKET_FLAG_LAST
if self.current_packet_index == self.tx_packet_count - 1
else 0,
(
PACKET_FLAG_LAST
if self.current_packet_index == self.tx_packet_count - 1
else 0
),
self.current_packet_index,
) + bytes(self.tx_packet_size - 6)
logging.info(color(f'Sending packet {self.current_packet_index}', 'yellow'))
@@ -940,11 +899,14 @@ class L2capServer(StreamedPacketIO):
# RfcommClient
# -----------------------------------------------------------------------------
class RfcommClient(StreamedPacketIO):
def __init__(self, device, channel, uuid):
def __init__(self, device, channel, uuid, l2cap_mtu, max_frame_size, window_size):
super().__init__()
self.device = device
self.channel = channel
self.uuid = uuid
self.l2cap_mtu = l2cap_mtu
self.max_frame_size = max_frame_size
self.window_size = window_size
self.rfcomm_session = None
self.ready = asyncio.Event()
@@ -957,7 +919,9 @@ class RfcommClient(StreamedPacketIO):
logging.info(
color(f'@@@ Discovering channel number from UUID {self.uuid}', 'cyan')
)
channel = await find_rfcomm_channel_with_uuid(connection, self.uuid)
channel = await bumble.rfcomm.find_rfcomm_channel_with_uuid(
connection, self.uuid
)
logging.info(color(f'@@@ Channel number = {channel}', 'cyan'))
if channel == 0:
logging.info(color('!!! No RFComm service with this UUID found', 'red'))
@@ -966,13 +930,21 @@ class RfcommClient(StreamedPacketIO):
# Create a client and start it
logging.info(color('*** Starting RFCOMM client...', 'blue'))
rfcomm_client = bumble.rfcomm.Client(connection)
rfcomm_options = {}
if self.l2cap_mtu:
rfcomm_options['l2cap_mtu'] = self.l2cap_mtu
rfcomm_client = bumble.rfcomm.Client(connection, **rfcomm_options)
rfcomm_mux = await rfcomm_client.start()
logging.info(color('*** Started', 'blue'))
logging.info(color(f'### Opening session for channel {channel}...', 'yellow'))
try:
rfcomm_session = await rfcomm_mux.open_dlc(channel)
dlc_options = {}
if self.max_frame_size:
dlc_options['max_frame_size'] = self.max_frame_size
if self.window_size:
dlc_options['window_size'] = self.window_size
rfcomm_session = await rfcomm_mux.open_dlc(channel, **dlc_options)
logging.info(color(f'### Session open: {rfcomm_session}', 'yellow'))
except bumble.core.ConnectionError as error:
logging.info(color(f'!!! Session open failed: {error}', 'red'))
@@ -997,13 +969,16 @@ class RfcommClient(StreamedPacketIO):
# RfcommServer
# -----------------------------------------------------------------------------
class RfcommServer(StreamedPacketIO):
def __init__(self, device, channel):
def __init__(self, device, channel, l2cap_mtu):
super().__init__()
self.dlc = None
self.ready = asyncio.Event()
# Create and register a server
rfcomm_server = bumble.rfcomm.Server(device)
server_options = {}
if l2cap_mtu:
server_options['l2cap_mtu'] = l2cap_mtu
rfcomm_server = bumble.rfcomm.Server(device, **server_options)
# Listen for incoming DLC connections
channel_number = rfcomm_server.listen(self.on_dlc, channel)
@@ -1089,9 +1064,9 @@ class Central(Connection.Listener):
if self.phy not in (None, HCI_LE_1M_PHY):
# Add an connections parameters entry for this PHY.
self.connection_parameter_preferences[
self.phy
] = connection_parameter_preferences
self.connection_parameter_preferences[self.phy] = (
connection_parameter_preferences
)
else:
self.connection_parameter_preferences = None
@@ -1259,6 +1234,7 @@ class Peripheral(Device.Listener, Connection.Listener):
'cyan',
)
)
await self.connected.wait()
logging.info(color('### Connected', 'cyan'))
@@ -1340,11 +1316,20 @@ def create_mode_factory(ctx, default_mode):
if mode == 'rfcomm-client':
return RfcommClient(
device, channel=ctx.obj['rfcomm_channel'], uuid=ctx.obj['rfcomm_uuid']
device,
channel=ctx.obj['rfcomm_channel'],
uuid=ctx.obj['rfcomm_uuid'],
l2cap_mtu=ctx.obj['rfcomm_l2cap_mtu'],
max_frame_size=ctx.obj['rfcomm_max_frame_size'],
window_size=ctx.obj['rfcomm_window_size'],
)
if mode == 'rfcomm-server':
return RfcommServer(device, channel=ctx.obj['rfcomm_channel'])
return RfcommServer(
device,
channel=ctx.obj['rfcomm_channel'],
l2cap_mtu=ctx.obj['rfcomm_l2cap_mtu'],
)
raise ValueError('invalid mode')
@@ -1431,6 +1416,21 @@ def create_role_factory(ctx, default_role):
default=DEFAULT_RFCOMM_UUID,
help='RFComm service UUID to use (ignored if --rfcomm-channel is not 0)',
)
@click.option(
'--rfcomm-l2cap-mtu',
type=int,
help='RFComm L2CAP MTU',
)
@click.option(
'--rfcomm-max-frame-size',
type=int,
help='RFComm maximum frame size',
)
@click.option(
'--rfcomm-window-size',
type=int,
help='RFComm window size',
)
@click.option(
'--l2cap-psm',
type=int,
@@ -1528,6 +1528,9 @@ def bench(
linger,
rfcomm_channel,
rfcomm_uuid,
rfcomm_l2cap_mtu,
rfcomm_max_frame_size,
rfcomm_window_size,
l2cap_psm,
l2cap_mtu,
l2cap_mps,
@@ -1540,6 +1543,9 @@ def bench(
ctx.obj['att_mtu'] = att_mtu
ctx.obj['rfcomm_channel'] = rfcomm_channel
ctx.obj['rfcomm_uuid'] = rfcomm_uuid
ctx.obj['rfcomm_l2cap_mtu'] = rfcomm_l2cap_mtu
ctx.obj['rfcomm_max_frame_size'] = rfcomm_max_frame_size
ctx.obj['rfcomm_window_size'] = rfcomm_window_size
ctx.obj['l2cap_psm'] = l2cap_psm
ctx.obj['l2cap_mtu'] = l2cap_mtu
ctx.obj['l2cap_mps'] = l2cap_mps
@@ -1588,8 +1594,8 @@ def central(
mode_factory = create_mode_factory(ctx, 'gatt-client')
classic = ctx.obj['classic']
asyncio.run(
Central(
async def run_central():
await Central(
transport,
peripheral_address,
classic,
@@ -1601,7 +1607,8 @@ def central(
encrypt or authenticate,
ctx.obj['extended_data_length'],
).run()
)
asyncio.run(run_central())
@bench.command()
@@ -1612,15 +1619,16 @@ def peripheral(ctx, transport):
role_factory = create_role_factory(ctx, 'receiver')
mode_factory = create_mode_factory(ctx, 'gatt-server')
asyncio.run(
Peripheral(
async def run_peripheral():
await Peripheral(
transport,
ctx.obj['classic'],
ctx.obj['extended_data_length'],
role_factory,
mode_factory,
).run()
)
asyncio.run(run_peripheral())
def main():

View File

@@ -99,7 +99,12 @@ class Loopback:
# make sure data can fit in one l2cap pdu
l2cap_header_size = 4
max_packet_size = host.acl_packet_queue.max_packet_size - l2cap_header_size
max_packet_size = (
host.acl_packet_queue
if host.acl_packet_queue
else host.le_acl_packet_queue
).max_packet_size - l2cap_header_size
if self.packet_size > max_packet_size:
print(
color(
@@ -183,7 +188,7 @@ class Loopback:
'--packet-count',
'-c',
metavar='COUNT',
type=int,
type=click.IntRange(1, 65535),
default=10,
help='Packet count',
)

View File

@@ -15,7 +15,11 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import datetime
import logging
import os
import struct
import click
from bumble.colors import color
@@ -24,6 +28,14 @@ from bumble.transport.common import PacketReader
from bumble.helpers import PacketTracer
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class SnoopPacketReader:
'''
@@ -36,12 +48,18 @@ class SnoopPacketReader:
DATALINK_BSCP = 1003
DATALINK_H5 = 1004
IDENTIFICATION_PATTERN = b'btsnoop\0'
TIMESTAMP_ANCHOR = datetime.datetime(2000, 1, 1)
TIMESTAMP_DELTA = 0x00E03AB44A676000
ONE_MICROSECOND = datetime.timedelta(microseconds=1)
def __init__(self, source):
self.source = source
self.at_end = False
# Read the header
identification_pattern = source.read(8)
if identification_pattern.hex().lower() != '6274736e6f6f7000':
if identification_pattern != self.IDENTIFICATION_PATTERN:
raise ValueError(
'not a valid snoop file, unexpected identification pattern'
)
@@ -55,19 +73,32 @@ class SnoopPacketReader:
# Read the record header
header = self.source.read(24)
if len(header) < 24:
return (0, None)
self.at_end = True
return (None, 0, None)
# Parse the header
(
original_length,
included_length,
packet_flags,
_cumulative_drops,
_timestamp_seconds,
_timestamp_microsecond,
) = struct.unpack('>IIIIII', header)
timestamp,
) = struct.unpack('>IIIIQ', header)
# Abort on truncated packets
# Skip truncated packets
if original_length != included_length:
return (0, None)
print(
color(
f"!!! truncated packet ({included_length}/{original_length})", "red"
)
)
self.source.read(included_length)
return (None, 0, None)
# Convert the timestamp to a datetime object.
ts_dt = self.TIMESTAMP_ANCHOR + datetime.timedelta(
microseconds=timestamp - self.TIMESTAMP_DELTA
)
if self.data_link_type == self.DATALINK_H1:
# The packet is un-encapsulated, look at the flags to figure out its type
@@ -89,7 +120,17 @@ class SnoopPacketReader:
bytes([packet_type]) + self.source.read(included_length),
)
return (packet_flags & 1, self.source.read(included_length))
return (ts_dt, packet_flags & 1, self.source.read(included_length))
# -----------------------------------------------------------------------------
class Printer:
def __init__(self):
self.index = 0
def print(self, message: str) -> None:
self.index += 1
print(f"[{self.index:8}]{message}")
# -----------------------------------------------------------------------------
@@ -122,24 +163,28 @@ def main(format, vendors, filename):
packet_reader = PacketReader(input)
def read_next_packet():
return (0, packet_reader.next_packet())
return (None, 0, packet_reader.next_packet())
else:
packet_reader = SnoopPacketReader(input)
read_next_packet = packet_reader.next_packet
tracer = PacketTracer(emit_message=print)
printer = Printer()
tracer = PacketTracer(emit_message=printer.print)
while True:
while not packet_reader.at_end:
try:
(direction, packet) = read_next_packet()
if packet is None:
break
tracer.trace(hci.HCI_Packet.from_bytes(packet), direction)
(timestamp, direction, packet) = read_next_packet()
if packet:
tracer.trace(hci.HCI_Packet.from_bytes(packet), direction, timestamp)
else:
printer.print(color("[TRUNCATED]", "red"))
except Exception as error:
logger.exception()
print(color(f'!!! {error}', 'red'))
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
main() # pylint: disable=no-value-for-parameter

View File

@@ -76,6 +76,7 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
DEFAULT_UI_PORT = 7654
# -----------------------------------------------------------------------------
class AudioExtractor:
@staticmethod

View File

@@ -24,6 +24,7 @@ from bumble.device import Device
from bumble.keys import JsonKeyStore
from bumble.transport import open_transport
# -----------------------------------------------------------------------------
async def unbond_with_keystore(keystore, address):
if address is None:

View File

@@ -652,7 +652,9 @@ class SbcPacketSource:
# Prepare for next packets
sequence_number += 1
sequence_number &= 0xFFFF
timestamp += sum((frame.sample_count for frame in frames))
timestamp &= 0xFFFFFFFF
frames = [frame]
frames_size = len(frame.payload)
else:

View File

@@ -655,7 +655,7 @@ class ATT_Write_Command(ATT_PDU):
@ATT_PDU.subclass(
[
('attribute_handle', HANDLE_FIELD_SPEC),
('attribute_value', '*')
('attribute_value', '*'),
# ('authentication_signature', 'TODO')
]
)

View File

@@ -325,8 +325,8 @@ class MediaPacket:
self.padding = padding
self.extension = extension
self.marker = marker
self.sequence_number = sequence_number
self.timestamp = timestamp
self.sequence_number = sequence_number & 0xFFFF
self.timestamp = timestamp & 0xFFFFFFFF
self.ssrc = ssrc
self.csrc_list = csrc_list
self.payload_type = payload_type
@@ -341,7 +341,12 @@ class MediaPacket:
| len(self.csrc_list),
self.marker << 7 | self.payload_type,
]
) + struct.pack('>HII', self.sequence_number, self.timestamp, self.ssrc)
) + struct.pack(
'>HII',
self.sequence_number,
self.timestamp,
self.ssrc,
)
for csrc in self.csrc_list:
header += struct.pack('>I', csrc)
return header + self.payload
@@ -1470,10 +1475,10 @@ class Protocol(EventEmitter):
f'[{transaction_label}] {message}'
)
max_fragment_size = (
self.l2cap_channel.mtu - 3
self.l2cap_channel.peer_mtu - 3
) # Enough space for a 3-byte start packet header
payload = message.payload
if len(payload) + 2 <= self.l2cap_channel.mtu:
if len(payload) + 2 <= self.l2cap_channel.peer_mtu:
# Fits in a single packet
packet_type = self.PacketType.SINGLE_PACKET
else:
@@ -1545,9 +1550,10 @@ class Protocol(EventEmitter):
assert False # Should never reach this
async def get_capabilities(
self, seid: int
) -> Union[Get_Capabilities_Response, Get_All_Capabilities_Response,]:
async def get_capabilities(self, seid: int) -> Union[
Get_Capabilities_Response,
Get_All_Capabilities_Response,
]:
if self.version > (1, 2):
return await self.send_command(Get_All_Capabilities_Command(seid))

View File

@@ -1745,9 +1745,11 @@ class Protocol(pyee.EventEmitter):
avc.CommandFrame.CommandType.CONTROL,
avc.Frame.SubunitType.PANEL,
0,
avc.PassThroughFrame.StateFlag.PRESSED
if pressed
else avc.PassThroughFrame.StateFlag.RELEASED,
(
avc.PassThroughFrame.StateFlag.PRESSED
if pressed
else avc.PassThroughFrame.StateFlag.RELEASED
),
key,
b'',
)

View File

@@ -134,15 +134,15 @@ class Controller:
self.hci_sink = None
self.link = link
self.central_connections: Dict[
Address, Connection
] = {} # Connections where this controller is the central
self.peripheral_connections: Dict[
Address, Connection
] = {} # Connections where this controller is the peripheral
self.classic_connections: Dict[
Address, Connection
] = {} # Connections in BR/EDR
self.central_connections: Dict[Address, Connection] = (
{}
) # Connections where this controller is the central
self.peripheral_connections: Dict[Address, Connection] = (
{}
) # Connections where this controller is the peripheral
self.classic_connections: Dict[Address, Connection] = (
{}
) # Connections in BR/EDR
self.central_cis_links: Dict[int, CisLink] = {} # CIS links by handle
self.peripheral_cis_links: Dict[int, CisLink] = {} # CIS links by handle
@@ -1151,7 +1151,28 @@ class Controller:
'''
See Bluetooth spec Vol 4, Part E - 7.4.3 Read Local Supported Features Command
'''
return bytes([HCI_SUCCESS]) + self.lmp_features
return bytes([HCI_SUCCESS]) + self.lmp_features[:8]
def on_hci_read_local_extended_features_command(self, command):
'''
See Bluetooth spec Vol 4, Part E - 7.4.4 Read Local Extended Features Command
'''
if command.page_number * 8 > len(self.lmp_features):
return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])
return (
bytes(
[
# Status
HCI_SUCCESS,
# Page number
command.page_number,
# Max page number
len(self.lmp_features) // 8 - 1,
]
)
# Features of the current page
+ self.lmp_features[command.page_number * 8 : (command.page_number + 1) * 8]
)
def on_hci_read_buffer_size_command(self, _command):
'''
@@ -1522,6 +1543,20 @@ class Controller:
}
return bytes([HCI_SUCCESS])
def on_hci_le_read_maximum_advertising_data_length_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data
Length Command
'''
return struct.pack('<BH', HCI_SUCCESS, 0x0672)
def on_hci_le_read_number_of_supported_advertising_sets_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.58 LE Read Number of Supported
Advertising Set Command
'''
return struct.pack('<BB', HCI_SUCCESS, 0xF0)
def on_hci_le_read_transmit_power_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command

File diff suppressed because it is too large Load Diff

View File

@@ -25,7 +25,7 @@ import pathlib
import platform
from typing import Dict, Iterable, Optional, Type, TYPE_CHECKING
from . import rtk
from . import rtk, intel
from .common import Driver
if TYPE_CHECKING:
@@ -45,7 +45,7 @@ async def get_driver_for_host(host: Host) -> Optional[Driver]:
found.
If a "driver" HCI metadata entry is present, only that driver class will be probed.
"""
driver_classes: Dict[str, Type[Driver]] = {"rtk": rtk.Driver}
driver_classes: Dict[str, Type[Driver]] = {"rtk": rtk.Driver, "intel": intel.Driver}
probe_list: Iterable[str]
if driver_name := host.hci_metadata.get("driver"):
# Only probe a single driver

102
bumble/drivers/intel.py Normal file
View File

@@ -0,0 +1,102 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
from bumble.drivers import common
from bumble.hci import (
hci_vendor_command_op_code, # type: ignore
HCI_Command,
HCI_Reset_Command,
)
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constant
# -----------------------------------------------------------------------------
INTEL_USB_PRODUCTS = {
# Intel AX210
(0x8087, 0x0032),
# Intel BE200
(0x8087, 0x0036),
}
# -----------------------------------------------------------------------------
# HCI Commands
# -----------------------------------------------------------------------------
HCI_INTEL_DDC_CONFIG_WRITE_COMMAND = hci_vendor_command_op_code(0xFC8B) # type: ignore
HCI_INTEL_DDC_CONFIG_WRITE_PAYLOAD = [0x03, 0xE4, 0x02, 0x00]
HCI_Command.register_commands(globals())
@HCI_Command.command( # type: ignore
fields=[("params", "*")],
return_parameters_fields=[
("params", "*"),
],
)
class Hci_Intel_DDC_Config_Write_Command(HCI_Command):
pass
class Driver(common.Driver):
def __init__(self, host):
self.host = host
@staticmethod
def check(host):
driver = host.hci_metadata.get("driver")
if driver == "intel":
return True
vendor_id = host.hci_metadata.get("vendor_id")
product_id = host.hci_metadata.get("product_id")
if vendor_id is None or product_id is None:
logger.debug("USB metadata not sufficient")
return False
if (vendor_id, product_id) not in INTEL_USB_PRODUCTS:
logger.debug(
f"USB device ({vendor_id:04X}, {product_id:04X}) " "not in known list"
)
return False
return True
@classmethod
async def for_host(cls, host, force=False): # type: ignore
# Only instantiate this driver if explicitly selected
if not force and not cls.check(host):
return None
return cls(host)
async def init_controller(self):
self.host.ready = True
await self.host.send_command(HCI_Reset_Command(), check_result=True)
await self.host.send_command(
Hci_Intel_DDC_Config_Write_Command(
params=HCI_INTEL_DDC_CONFIG_WRITE_PAYLOAD
)
)

View File

@@ -36,6 +36,7 @@ logger = logging.getLogger(__name__)
# Classes
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
class GenericAccessService(Service):
def __init__(self, device_name, appearance=(0, 0)):

View File

@@ -342,9 +342,11 @@ class Service(Attribute):
uuid = UUID(uuid)
super().__init__(
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE
if primary
else GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
(
GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE
if primary
else GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE
),
Attribute.READABLE,
uuid.to_pdu_bytes(),
)
@@ -560,9 +562,9 @@ class CharacteristicAdapter:
def __init__(self, characteristic: Union[Characteristic, AttributeProxy]):
self.wrapped_characteristic = characteristic
self.subscribers: Dict[
Callable, Callable
] = {} # Map from subscriber to proxy subscriber
self.subscribers: Dict[Callable, Callable] = (
{}
) # Map from subscriber to proxy subscriber
if isinstance(characteristic, Characteristic):
self.read_value = self.read_encoded_value

View File

@@ -90,6 +90,22 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def show_services(services: Iterable[ServiceProxy]) -> None:
for service in services:
print(color(str(service), 'cyan'))
for characteristic in service.characteristics:
print(color(' ' + str(characteristic), 'magenta'))
for descriptor in characteristic.descriptors:
print(color(' ' + str(descriptor), 'green'))
# -----------------------------------------------------------------------------
# Proxies
# -----------------------------------------------------------------------------
@@ -352,9 +368,7 @@ class Client:
if c.uuid == uuid
]
def get_attribute_grouping(
self, attribute_handle: int
) -> Optional[
def get_attribute_grouping(self, attribute_handle: int) -> Optional[
Union[
ServiceProxy,
Tuple[ServiceProxy, CharacteristicProxy],
@@ -1068,7 +1082,7 @@ class Client:
logger.warning('!!! unexpected response, there is no pending request')
return
# Sanity check: the response should match the pending request unless it is
# The response should match the pending request unless it is
# an error response
if att_pdu.op_code != ATT_ERROR_RESPONSE:
expected_response_name = self.pending_request.name.replace(

View File

@@ -328,7 +328,7 @@ class Server(EventEmitter):
f'handle=0x{characteristic.handle:04X}: {value.hex()}'
)
# Sanity check
# Check parameters
if len(value) != 2:
logger.warning('CCCD value not 2 bytes long')
return
@@ -445,9 +445,9 @@ class Server(EventEmitter):
assert self.pending_confirmations[connection.handle] is None
# Create a future value to hold the eventual response
pending_confirmation = self.pending_confirmations[
connection.handle
] = asyncio.get_running_loop().create_future()
pending_confirmation = self.pending_confirmations[connection.handle] = (
asyncio.get_running_loop().create_future()
)
try:
self.send_gatt_pdu(connection.handle, indication.to_bytes())

File diff suppressed because it is too large Load Diff

View File

@@ -18,6 +18,7 @@
from __future__ import annotations
from collections.abc import Callable, MutableMapping
import datetime
from typing import cast, Any, Optional
import logging
@@ -66,12 +67,13 @@ PSM_NAMES = {
rfcomm.RFCOMM_PSM: 'RFCOMM',
sdp.SDP_PSM: 'SDP',
avdtp.AVDTP_PSM: 'AVDTP',
avctp.AVCTP_PSM: 'AVCTP'
avctp.AVCTP_PSM: 'AVCTP',
# TODO: add more PSM values
}
AVCTP_PID_NAMES = {avrcp.AVRCP_PID: 'AVRCP'}
# -----------------------------------------------------------------------------
class PacketTracer:
class AclStream:
@@ -207,6 +209,7 @@ class PacketTracer:
self.label = label
self.emit_message = emit_message
self.acl_streams = {} # ACL streams, by connection handle
self.packet_timestamp: Optional[datetime.datetime] = None
def start_acl_stream(self, connection_handle: int) -> PacketTracer.AclStream:
logger.info(
@@ -234,7 +237,10 @@ class PacketTracer:
# Let the other forwarder know so it can cleanup its stream as well
self.peer.end_acl_stream(connection_handle)
def on_packet(self, packet: HCI_Packet) -> None:
def on_packet(
self, timestamp: Optional[datetime.datetime], packet: HCI_Packet
) -> None:
self.packet_timestamp = timestamp
self.emit(packet)
if packet.hci_packet_type == HCI_ACL_DATA_PACKET:
@@ -254,13 +260,22 @@ class PacketTracer:
)
def emit(self, message: Any) -> None:
self.emit_message(f'[{self.label}] {message}')
if self.packet_timestamp:
prefix = f"[{self.packet_timestamp.strftime('%Y-%m-%d %H:%M:%S.%f')}]"
else:
prefix = ""
self.emit_message(f'{prefix}[{self.label}] {message}')
def trace(self, packet: HCI_Packet, direction: int = 0) -> None:
def trace(
self,
packet: HCI_Packet,
direction: int = 0,
timestamp: Optional[datetime.datetime] = None,
) -> None:
if direction == 0:
self.host_to_controller_analyzer.on_packet(packet)
self.host_to_controller_analyzer.on_packet(timestamp, packet)
else:
self.controller_to_host_analyzer.on_packet(packet)
self.controller_to_host_analyzer.on_packet(timestamp, packet)
def __init__(
self,

File diff suppressed because it is too large Load Diff

View File

@@ -48,6 +48,7 @@ HID_INTERRUPT_PSM = 0x0013
class Message:
message_type: MessageType
# Report types
class ReportType(enum.IntEnum):
OTHER_REPORT = 0x00
@@ -416,7 +417,7 @@ class Device(HID):
data = bytearray()
data.append(report_id)
data.extend(ret.data)
if len(data) < self.l2cap_ctrl_channel.mtu: # type: ignore[union-attr]
if len(data) < self.l2cap_ctrl_channel.peer_mtu: # type: ignore[union-attr]
self.send_control_data(report_type=report_type, data=data)
else:
self.send_handshake_message(Message.Handshake.ERR_INVALID_PARAMETER)

View File

@@ -18,68 +18,35 @@
from __future__ import annotations
import asyncio
import collections
import dataclasses
import logging
import struct
from typing import Any, Awaitable, Callable, Deque, Dict, Optional, cast, TYPE_CHECKING
from typing import (
Any,
Awaitable,
Callable,
Deque,
Dict,
Optional,
Set,
cast,
TYPE_CHECKING,
)
from bumble.colors import color
from bumble.l2cap import L2CAP_PDU
from bumble.snoop import Snooper
from bumble import drivers
from .hci import (
Address,
HCI_ACL_DATA_PACKET,
HCI_COMMAND_PACKET,
HCI_EVENT_PACKET,
HCI_ISO_DATA_PACKET,
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
HCI_READ_BUFFER_SIZE_COMMAND,
HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND,
HCI_RESET_COMMAND,
HCI_SUCCESS,
HCI_SUPPORTED_COMMANDS_FLAGS,
HCI_SYNCHRONOUS_DATA_PACKET,
HCI_VERSION_BLUETOOTH_CORE_4_0,
HCI_AclDataPacket,
HCI_AclDataPacketAssembler,
HCI_Command,
HCI_Command_Complete_Event,
HCI_Constant,
HCI_Error,
HCI_Event,
HCI_IsoDataPacket,
HCI_LE_Long_Term_Key_Request_Negative_Reply_Command,
HCI_LE_Long_Term_Key_Request_Reply_Command,
HCI_LE_Read_Buffer_Size_Command,
HCI_LE_Read_Local_Supported_Features_Command,
HCI_LE_Read_Suggested_Default_Data_Length_Command,
HCI_LE_Remote_Connection_Parameter_Request_Reply_Command,
HCI_LE_Set_Event_Mask_Command,
HCI_LE_Write_Suggested_Default_Data_Length_Command,
HCI_Link_Key_Request_Negative_Reply_Command,
HCI_Link_Key_Request_Reply_Command,
HCI_Packet,
HCI_Read_Buffer_Size_Command,
HCI_Read_Local_Supported_Commands_Command,
HCI_Read_Local_Version_Information_Command,
HCI_Reset_Command,
HCI_Set_Event_Mask_Command,
HCI_SynchronousDataPacket,
LeFeatureMask,
)
from .core import (
from bumble import hci
from bumble.core import (
BT_BR_EDR_TRANSPORT,
BT_LE_TRANSPORT,
ConnectionPHY,
ConnectionParameters,
)
from .utils import AbortableEventEmitter
from .transport.common import TransportLostError
from bumble.utils import AbortableEventEmitter
from bumble.transport.common import TransportLostError
if TYPE_CHECKING:
from .transport.common import TransportSink, TransportSource
@@ -99,15 +66,15 @@ class AclPacketQueue:
self,
max_packet_size: int,
max_in_flight: int,
send: Callable[[HCI_Packet], None],
send: Callable[[hci.HCI_Packet], None],
) -> None:
self.max_packet_size = max_packet_size
self.max_in_flight = max_in_flight
self.in_flight = 0
self.send = send
self.packets: Deque[HCI_AclDataPacket] = collections.deque()
self.packets: Deque[hci.HCI_AclDataPacket] = collections.deque()
def enqueue(self, packet: HCI_AclDataPacket) -> None:
def enqueue(self, packet: hci.HCI_AclDataPacket) -> None:
self.packets.appendleft(packet)
self.check_queue()
@@ -139,11 +106,13 @@ class AclPacketQueue:
# -----------------------------------------------------------------------------
class Connection:
def __init__(self, host: Host, handle: int, peer_address: Address, transport: int):
def __init__(
self, host: Host, handle: int, peer_address: hci.Address, transport: int
):
self.host = host
self.handle = handle
self.peer_address = peer_address
self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.assembler = hci.HCI_AclDataPacketAssembler(self.on_acl_pdu)
self.transport = transport
acl_packet_queue: Optional[AclPacketQueue] = (
host.le_acl_packet_queue
@@ -153,7 +122,7 @@ class Connection:
assert acl_packet_queue
self.acl_packet_queue = acl_packet_queue
def on_hci_acl_data_packet(self, packet: HCI_AclDataPacket) -> None:
def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None:
self.assembler.feed_packet(packet)
def on_acl_pdu(self, pdu: bytes) -> None:
@@ -161,9 +130,25 @@ class Connection:
self.host.on_l2cap_pdu(self, l2cap_pdu.cid, l2cap_pdu.payload)
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class ScoLink:
peer_address: hci.Address
handle: int
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class CisLink:
peer_address: hci.Address
handle: int
# -----------------------------------------------------------------------------
class Host(AbortableEventEmitter):
connections: Dict[int, Connection]
cis_links: Dict[int, CisLink]
sco_links: Dict[int, ScoLink]
acl_packet_queue: Optional[AclPacketQueue] = None
le_acl_packet_queue: Optional[AclPacketQueue] = None
hci_sink: Optional[TransportSink] = None
@@ -171,7 +156,7 @@ class Host(AbortableEventEmitter):
long_term_key_provider: Optional[
Callable[[int, bytes, int], Awaitable[Optional[bytes]]]
]
link_key_provider: Optional[Callable[[Address], Awaitable[Optional[bytes]]]]
link_key_provider: Optional[Callable[[hci.Address], Awaitable[Optional[bytes]]]]
def __init__(
self,
@@ -183,18 +168,23 @@ class Host(AbortableEventEmitter):
self.hci_metadata = {}
self.ready = False # True when we can accept incoming packets
self.connections = {} # Connections, by connection handle
self.cis_links = {} # CIS links, by connection handle
self.sco_links = {} # SCO links, by connection handle
self.pending_command = None
self.pending_response = None
self.number_of_supported_advertising_sets = 0
self.maximum_advertising_data_length = 31
self.local_version = None
self.local_supported_commands = bytes(64)
self.local_supported_commands = 0
self.local_le_features = 0
self.local_lmp_features = hci.LmpFeatureMask(0) # Classic LMP features
self.suggested_max_tx_octets = 251 # Max allowed
self.suggested_max_tx_time = 2120 # Max allowed
self.command_semaphore = asyncio.Semaphore(1)
self.long_term_key_provider = None
self.link_key_provider = None
self.pairing_io_capability_provider = None # Classic only
self.snooper = None
self.snooper: Optional[Snooper] = None
# Connect to the source and sink if specified
if controller_source:
@@ -204,7 +194,7 @@ class Host(AbortableEventEmitter):
def find_connection_by_bd_addr(
self,
bd_addr: Address,
bd_addr: hci.Address,
transport: Optional[int] = None,
check_address_type: bool = False,
) -> Optional[Connection]:
@@ -246,49 +236,167 @@ class Host(AbortableEventEmitter):
# Send a reset command unless a driver has already done so.
if reset_needed:
await self.send_command(HCI_Reset_Command(), check_result=True)
await self.send_command(hci.HCI_Reset_Command(), check_result=True)
self.ready = True
response = await self.send_command(
HCI_Read_Local_Supported_Commands_Command(), check_result=True
hci.HCI_Read_Local_Supported_Commands_Command(), check_result=True
)
self.local_supported_commands = int.from_bytes(
response.return_parameters.supported_commands, 'little'
)
self.local_supported_commands = response.return_parameters.supported_commands
if self.supports_command(HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
if self.supports_command(hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response = await self.send_command(
HCI_LE_Read_Local_Supported_Features_Command(), check_result=True
hci.HCI_LE_Read_Local_Supported_Features_Command(), check_result=True
)
self.local_le_features = struct.unpack(
'<Q', response.return_parameters.le_features
)[0]
if self.supports_command(HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
if self.supports_command(hci.HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND):
response = await self.send_command(
HCI_Read_Local_Version_Information_Command(), check_result=True
hci.HCI_Read_Local_Version_Information_Command(), check_result=True
)
self.local_version = response.return_parameters
if self.supports_command(hci.HCI_READ_LOCAL_EXTENDED_FEATURES_COMMAND):
max_page_number = 0
page_number = 0
lmp_features = 0
while page_number <= max_page_number:
response = await self.send_command(
hci.HCI_Read_Local_Extended_Features_Command(
page_number=page_number
),
check_result=True,
)
lmp_features |= int.from_bytes(
response.return_parameters.extended_lmp_features, 'little'
) << (64 * page_number)
max_page_number = response.return_parameters.maximum_page_number
page_number += 1
self.local_lmp_features = hci.LmpFeatureMask(lmp_features)
elif self.supports_command(hci.HCI_READ_LOCAL_SUPPORTED_FEATURES_COMMAND):
response = await self.send_command(
hci.HCI_Read_Local_Supported_Features_Command(), check_result=True
)
self.local_lmp_features = hci.LmpFeatureMask(
int.from_bytes(response.return_parameters.lmp_features, 'little')
)
await self.send_command(
HCI_Set_Event_Mask_Command(event_mask=bytes.fromhex('FFFFFFFFFFFFFF3F'))
hci.HCI_Set_Event_Mask_Command(
event_mask=hci.HCI_Set_Event_Mask_Command.mask(
[
hci.HCI_INQUIRY_COMPLETE_EVENT,
hci.HCI_INQUIRY_RESULT_EVENT,
hci.HCI_CONNECTION_COMPLETE_EVENT,
hci.HCI_CONNECTION_REQUEST_EVENT,
hci.HCI_DISCONNECTION_COMPLETE_EVENT,
hci.HCI_AUTHENTICATION_COMPLETE_EVENT,
hci.HCI_REMOTE_NAME_REQUEST_COMPLETE_EVENT,
hci.HCI_ENCRYPTION_CHANGE_EVENT,
hci.HCI_CHANGE_CONNECTION_LINK_KEY_COMPLETE_EVENT,
hci.HCI_LINK_KEY_TYPE_CHANGED_EVENT,
hci.HCI_READ_REMOTE_SUPPORTED_FEATURES_COMPLETE_EVENT,
hci.HCI_READ_REMOTE_VERSION_INFORMATION_COMPLETE_EVENT,
hci.HCI_QOS_SETUP_COMPLETE_EVENT,
hci.HCI_HARDWARE_ERROR_EVENT,
hci.HCI_FLUSH_OCCURRED_EVENT,
hci.HCI_ROLE_CHANGE_EVENT,
hci.HCI_MODE_CHANGE_EVENT,
hci.HCI_RETURN_LINK_KEYS_EVENT,
hci.HCI_PIN_CODE_REQUEST_EVENT,
hci.HCI_LINK_KEY_REQUEST_EVENT,
hci.HCI_LINK_KEY_NOTIFICATION_EVENT,
hci.HCI_LOOPBACK_COMMAND_EVENT,
hci.HCI_DATA_BUFFER_OVERFLOW_EVENT,
hci.HCI_MAX_SLOTS_CHANGE_EVENT,
hci.HCI_READ_CLOCK_OFFSET_COMPLETE_EVENT,
hci.HCI_CONNECTION_PACKET_TYPE_CHANGED_EVENT,
hci.HCI_QOS_VIOLATION_EVENT,
hci.HCI_PAGE_SCAN_REPETITION_MODE_CHANGE_EVENT,
hci.HCI_FLOW_SPECIFICATION_COMPLETE_EVENT,
hci.HCI_INQUIRY_RESULT_WITH_RSSI_EVENT,
hci.HCI_READ_REMOTE_EXTENDED_FEATURES_COMPLETE_EVENT,
hci.HCI_SYNCHRONOUS_CONNECTION_COMPLETE_EVENT,
hci.HCI_SYNCHRONOUS_CONNECTION_CHANGED_EVENT,
hci.HCI_SNIFF_SUBRATING_EVENT,
hci.HCI_EXTENDED_INQUIRY_RESULT_EVENT,
hci.HCI_ENCRYPTION_KEY_REFRESH_COMPLETE_EVENT,
hci.HCI_IO_CAPABILITY_REQUEST_EVENT,
hci.HCI_IO_CAPABILITY_RESPONSE_EVENT,
hci.HCI_USER_CONFIRMATION_REQUEST_EVENT,
hci.HCI_USER_PASSKEY_REQUEST_EVENT,
hci.HCI_REMOTE_OOB_DATA_REQUEST_EVENT,
hci.HCI_SIMPLE_PAIRING_COMPLETE_EVENT,
hci.HCI_LINK_SUPERVISION_TIMEOUT_CHANGED_EVENT,
hci.HCI_ENHANCED_FLUSH_COMPLETE_EVENT,
hci.HCI_USER_PASSKEY_NOTIFICATION_EVENT,
hci.HCI_KEYPRESS_NOTIFICATION_EVENT,
hci.HCI_REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION_EVENT,
hci.HCI_LE_META_EVENT,
]
)
)
)
if (
self.local_version is not None
and self.local_version.hci_version <= HCI_VERSION_BLUETOOTH_CORE_4_0
and self.local_version.hci_version <= hci.HCI_VERSION_BLUETOOTH_CORE_4_0
):
# Some older controllers don't like event masks with bits they don't
# understand
le_event_mask = bytes.fromhex('1F00000000000000')
else:
le_event_mask = bytes.fromhex('FFFFFFFF00000000')
le_event_mask = hci.HCI_LE_Set_Event_Mask_Command.mask(
[
hci.HCI_LE_CONNECTION_COMPLETE_EVENT,
hci.HCI_LE_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT,
hci.HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT,
hci.HCI_LE_LONG_TERM_KEY_REQUEST_EVENT,
hci.HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT,
hci.HCI_LE_DATA_LENGTH_CHANGE_EVENT,
hci.HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT,
hci.HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT,
hci.HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT,
hci.HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_PHY_UPDATE_COMPLETE_EVENT,
hci.HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT,
hci.HCI_LE_SCAN_TIMEOUT_EVENT,
hci.HCI_LE_ADVERTISING_SET_TERMINATED_EVENT,
hci.HCI_LE_SCAN_REQUEST_RECEIVED_EVENT,
hci.HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT,
hci.HCI_LE_CONNECTION_IQ_REPORT_EVENT,
hci.HCI_LE_CTE_REQUEST_FAILED_EVENT,
hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT,
hci.HCI_LE_CIS_ESTABLISHED_EVENT,
hci.HCI_LE_CIS_REQUEST_EVENT,
hci.HCI_LE_CREATE_BIG_COMPLETE_EVENT,
hci.HCI_LE_TERMINATE_BIG_COMPLETE_EVENT,
hci.HCI_LE_BIG_SYNC_ESTABLISHED_EVENT,
hci.HCI_LE_BIG_SYNC_LOST_EVENT,
hci.HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT,
hci.HCI_LE_PATH_LOSS_THRESHOLD_EVENT,
hci.HCI_LE_TRANSMIT_POWER_REPORTING_EVENT,
hci.HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT,
hci.HCI_LE_SUBRATE_CHANGE_EVENT,
]
)
await self.send_command(
HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask)
hci.HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask)
)
if self.supports_command(HCI_READ_BUFFER_SIZE_COMMAND):
if self.supports_command(hci.HCI_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(
HCI_Read_Buffer_Size_Command(), check_result=True
hci.HCI_Read_Buffer_Size_Command(), check_result=True
)
hc_acl_data_packet_length = (
response.return_parameters.hc_acl_data_packet_length
@@ -311,9 +419,9 @@ class Host(AbortableEventEmitter):
hc_le_acl_data_packet_length = 0
hc_total_num_le_acl_data_packets = 0
if self.supports_command(HCI_LE_READ_BUFFER_SIZE_COMMAND):
if self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_COMMAND):
response = await self.send_command(
HCI_LE_Read_Buffer_Size_Command(), check_result=True
hci.HCI_LE_Read_Buffer_Size_Command(), check_result=True
)
hc_le_acl_data_packet_length = (
response.return_parameters.hc_le_acl_data_packet_length
@@ -340,10 +448,12 @@ class Host(AbortableEventEmitter):
)
if self.supports_command(
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND
) and self.supports_command(HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND):
hci.HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND
) and self.supports_command(
hci.HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND
):
response = await self.send_command(
HCI_LE_Read_Suggested_Default_Data_Length_Command()
hci.HCI_LE_Read_Suggested_Default_Data_Length_Command()
)
suggested_max_tx_octets = response.return_parameters.suggested_max_tx_octets
suggested_max_tx_time = response.return_parameters.suggested_max_tx_time
@@ -352,12 +462,34 @@ class Host(AbortableEventEmitter):
or suggested_max_tx_time != self.suggested_max_tx_time
):
await self.send_command(
HCI_LE_Write_Suggested_Default_Data_Length_Command(
hci.HCI_LE_Write_Suggested_Default_Data_Length_Command(
suggested_max_tx_octets=self.suggested_max_tx_octets,
suggested_max_tx_time=self.suggested_max_tx_time,
)
)
if self.supports_command(
hci.HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND
):
response = await self.send_command(
hci.HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command(),
check_result=True,
)
self.number_of_supported_advertising_sets = (
response.return_parameters.num_supported_advertising_sets
)
if self.supports_command(
hci.HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND
):
response = await self.send_command(
hci.HCI_LE_Read_Maximum_Advertising_Data_Length_Command(),
check_result=True,
)
self.maximum_advertising_data_length = (
response.return_parameters.max_advertising_data_length
)
@property
def controller(self) -> Optional[TransportSink]:
return self.hci_sink
@@ -366,7 +498,7 @@ class Host(AbortableEventEmitter):
def controller(self, controller) -> None:
self.set_packet_sink(controller)
if controller:
controller.set_packet_sink(self)
self.set_packet_source(controller)
def set_packet_sink(self, sink: Optional[TransportSink]) -> None:
self.hci_sink = sink
@@ -375,7 +507,7 @@ class Host(AbortableEventEmitter):
source.set_packet_sink(self)
self.hci_metadata = getattr(source, 'metadata', self.hci_metadata)
def send_hci_packet(self, packet: HCI_Packet) -> None:
def send_hci_packet(self, packet: hci.HCI_Packet) -> None:
logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {packet}')
if self.snooper:
self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER)
@@ -406,11 +538,12 @@ class Host(AbortableEventEmitter):
else:
status = response.return_parameters.status
if status != HCI_SUCCESS:
if status != hci.HCI_SUCCESS:
logger.warning(
f'{command.name} failed ({HCI_Constant.error_name(status)})'
f'{command.name} failed '
f'({hci.HCI_Constant.error_name(status)})'
)
raise HCI_Error(status)
raise hci.HCI_Error(status)
return response
except Exception as error:
@@ -423,8 +556,8 @@ class Host(AbortableEventEmitter):
self.pending_response = None
# Use this method to send a command from a task
def send_command_sync(self, command: HCI_Command) -> None:
async def send_command(command: HCI_Command) -> None:
def send_command_sync(self, command: hci.HCI_Command) -> None:
async def send_command(command: hci.HCI_Command) -> None:
await self.send_command(command)
asyncio.create_task(send_command(command))
@@ -449,7 +582,7 @@ class Host(AbortableEventEmitter):
pb_flag = 0
while bytes_remaining:
data_total_length = min(bytes_remaining, packet_queue.max_packet_size)
acl_packet = HCI_AclDataPacket(
acl_packet = hci.HCI_AclDataPacket(
connection_handle=connection_handle,
pb_flag=pb_flag,
bc_flag=0,
@@ -462,35 +595,26 @@ class Host(AbortableEventEmitter):
offset += data_total_length
bytes_remaining -= data_total_length
def supports_command(self, command):
# Find the support flag position for this command
for octet, flags in enumerate(HCI_SUPPORTED_COMMANDS_FLAGS):
for flag_position, value in enumerate(flags):
if value == command:
# Check if the flag is set
if octet < len(self.local_supported_commands) and flag_position < 8:
return (
self.local_supported_commands[octet] & (1 << flag_position)
) != 0
return False
def supports_command(self, op_code: int) -> bool:
return (
self.local_supported_commands
& hci.HCI_SUPPORTED_COMMANDS_MASKS.get(op_code, 0)
) != 0
@property
def supported_commands(self):
commands = []
for octet, flags in enumerate(self.local_supported_commands):
if octet < len(HCI_SUPPORTED_COMMANDS_FLAGS):
for flag in range(8):
if flags & (1 << flag) != 0:
command = HCI_SUPPORTED_COMMANDS_FLAGS[octet][flag]
if command is not None:
commands.append(command)
def supported_commands(self) -> Set[int]:
return set(
op_code
for op_code, mask in hci.HCI_SUPPORTED_COMMANDS_MASKS.items()
if self.local_supported_commands & mask
)
return commands
def supports_le_features(self, feature: LeFeatureMask) -> bool:
def supports_le_features(self, feature: hci.LeFeatureMask) -> bool:
return (self.local_le_features & feature) == feature
def supports_lmp_features(self, feature: hci.LmpFeatureMask) -> bool:
return self.local_lmp_features & (feature) == feature
@property
def supported_le_features(self):
return [
@@ -499,10 +623,10 @@ class Host(AbortableEventEmitter):
# Packet Sink protocol (packets coming from the controller via HCI)
def on_packet(self, packet: bytes) -> None:
hci_packet = HCI_Packet.from_bytes(packet)
hci_packet = hci.HCI_Packet.from_bytes(packet)
if self.ready or (
isinstance(hci_packet, HCI_Command_Complete_Event)
and hci_packet.command_opcode == HCI_RESET_COMMAND
isinstance(hci_packet, hci.HCI_Command_Complete_Event)
and hci_packet.command_opcode == hci.HCI_RESET_COMMAND
):
self.on_hci_packet(hci_packet)
else:
@@ -515,44 +639,44 @@ class Host(AbortableEventEmitter):
self.emit('flush')
def on_hci_packet(self, packet: HCI_Packet) -> None:
def on_hci_packet(self, packet: hci.HCI_Packet) -> None:
logger.debug(f'{color("### CONTROLLER -> HOST", "green")}: {packet}')
if self.snooper:
self.snooper.snoop(bytes(packet), Snooper.Direction.CONTROLLER_TO_HOST)
# If the packet is a command, invoke the handler for this packet
if packet.hci_packet_type == HCI_COMMAND_PACKET:
self.on_hci_command_packet(cast(HCI_Command, packet))
elif packet.hci_packet_type == HCI_EVENT_PACKET:
self.on_hci_event_packet(cast(HCI_Event, packet))
elif packet.hci_packet_type == HCI_ACL_DATA_PACKET:
self.on_hci_acl_data_packet(cast(HCI_AclDataPacket, packet))
elif packet.hci_packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
self.on_hci_sco_data_packet(cast(HCI_SynchronousDataPacket, packet))
elif packet.hci_packet_type == HCI_ISO_DATA_PACKET:
self.on_hci_iso_data_packet(cast(HCI_IsoDataPacket, packet))
if packet.hci_packet_type == hci.HCI_COMMAND_PACKET:
self.on_hci_command_packet(cast(hci.HCI_Command, packet))
elif packet.hci_packet_type == hci.HCI_EVENT_PACKET:
self.on_hci_event_packet(cast(hci.HCI_Event, packet))
elif packet.hci_packet_type == hci.HCI_ACL_DATA_PACKET:
self.on_hci_acl_data_packet(cast(hci.HCI_AclDataPacket, packet))
elif packet.hci_packet_type == hci.HCI_SYNCHRONOUS_DATA_PACKET:
self.on_hci_sco_data_packet(cast(hci.HCI_SynchronousDataPacket, packet))
elif packet.hci_packet_type == hci.HCI_ISO_DATA_PACKET:
self.on_hci_iso_data_packet(cast(hci.HCI_IsoDataPacket, packet))
else:
logger.warning(f'!!! unknown packet type {packet.hci_packet_type}')
def on_hci_command_packet(self, command: HCI_Command) -> None:
def on_hci_command_packet(self, command: hci.HCI_Command) -> None:
logger.warning(f'!!! unexpected command packet: {command}')
def on_hci_event_packet(self, event: HCI_Event) -> None:
def on_hci_event_packet(self, event: hci.HCI_Event) -> None:
handler_name = f'on_{event.name.lower()}'
handler = getattr(self, handler_name, self.on_hci_event)
handler(event)
def on_hci_acl_data_packet(self, packet: HCI_AclDataPacket) -> None:
def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None:
# Look for the connection to which this data belongs
if connection := self.connections.get(packet.connection_handle):
connection.on_hci_acl_data_packet(packet)
def on_hci_sco_data_packet(self, packet: HCI_SynchronousDataPacket) -> None:
def on_hci_sco_data_packet(self, packet: hci.HCI_SynchronousDataPacket) -> None:
# Experimental
self.emit('sco_packet', packet.connection_handle, packet)
def on_hci_iso_data_packet(self, packet: HCI_IsoDataPacket) -> None:
def on_hci_iso_data_packet(self, packet: hci.HCI_IsoDataPacket) -> None:
# Experimental
self.emit('iso_packet', packet.connection_handle, packet)
@@ -616,11 +740,11 @@ class Host(AbortableEventEmitter):
def on_hci_le_connection_complete_event(self, event):
# Check if this is a cancellation
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
# Create/update the connection
logger.debug(
f'### LE CONNECTION: [0x{event.connection_handle:04X}] '
f'{event.peer_address} as {HCI_Constant.role_name(event.role)}'
f'{event.peer_address} as {hci.HCI_Constant.role_name(event.role)}'
)
connection = self.connections.get(event.connection_handle)
@@ -660,7 +784,7 @@ class Host(AbortableEventEmitter):
self.on_hci_le_connection_complete_event(event)
def on_hci_connection_complete_event(self, event):
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
# Create/update the connection
logger.debug(
f'### BR/EDR CONNECTION: [0x{event.connection_handle:04X}] '
@@ -696,25 +820,38 @@ class Host(AbortableEventEmitter):
def on_hci_disconnection_complete_event(self, event):
# Find the connection
if (connection := self.connections.get(event.connection_handle)) is None:
handle = event.connection_handle
if (
connection := (
self.connections.get(handle)
or self.cis_links.get(handle)
or self.sco_links.get(handle)
)
) is None:
logger.warning('!!! DISCONNECTION COMPLETE: unknown handle')
return
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
logger.debug(
f'### DISCONNECTION: [0x{event.connection_handle:04X}] '
f'### DISCONNECTION: [0x{handle:04X}] '
f'{connection.peer_address} '
f'reason={event.reason}'
)
del self.connections[event.connection_handle]
# Notify the listeners
self.emit('disconnection', event.connection_handle, event.reason)
self.emit('disconnection', handle, event.reason)
# Remove the handle reference
_ = (
self.connections.pop(handle, 0)
or self.cis_links.pop(handle, 0)
or self.sco_links.pop(handle, 0)
)
else:
logger.debug(f'### DISCONNECTION FAILED: {event.status}')
# Notify the listeners
self.emit('disconnection_failure', event.connection_handle, event.status)
self.emit('disconnection_failure', handle, event.status)
def on_hci_le_connection_update_complete_event(self, event):
if (connection := self.connections.get(event.connection_handle)) is None:
@@ -722,7 +859,7 @@ class Host(AbortableEventEmitter):
return
# Notify the client
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
connection_parameters = ConnectionParameters(
event.connection_interval,
event.peripheral_latency,
@@ -742,7 +879,7 @@ class Host(AbortableEventEmitter):
return
# Notify the client
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
connection_phy = ConnectionPHY(event.tx_phy, event.rx_phy)
self.emit('connection_phy_update', connection.handle, connection_phy)
else:
@@ -761,6 +898,7 @@ class Host(AbortableEventEmitter):
event.status,
event.advertising_handle,
event.connection_handle,
event.num_completed_extended_advertising_events,
)
def on_hci_le_cis_request_event(self, event):
@@ -774,7 +912,11 @@ class Host(AbortableEventEmitter):
def on_hci_le_cis_established_event(self, event):
# The remaining parameters are unused for now.
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
self.cis_links[event.connection_handle] = CisLink(
handle=event.connection_handle,
peer_address=hci.Address.ANY,
)
self.emit('cis_establishment', event.connection_handle)
else:
self.emit(
@@ -789,7 +931,7 @@ class Host(AbortableEventEmitter):
# For now, just accept everything
# TODO: delegate the decision
self.send_command_sync(
HCI_LE_Remote_Connection_Parameter_Request_Reply_Command(
hci.HCI_LE_Remote_Connection_Parameter_Request_Reply_Command(
connection_handle=event.connection_handle,
interval_min=event.interval_min,
interval_max=event.interval_max,
@@ -820,12 +962,12 @@ class Host(AbortableEventEmitter):
),
)
if long_term_key:
response = HCI_LE_Long_Term_Key_Request_Reply_Command(
response = hci.HCI_LE_Long_Term_Key_Request_Reply_Command(
connection_handle=event.connection_handle,
long_term_key=long_term_key,
)
else:
response = HCI_LE_Long_Term_Key_Request_Negative_Reply_Command(
response = hci.HCI_LE_Long_Term_Key_Request_Negative_Reply_Command(
connection_handle=event.connection_handle
)
@@ -834,13 +976,18 @@ class Host(AbortableEventEmitter):
asyncio.create_task(send_long_term_key())
def on_hci_synchronous_connection_complete_event(self, event):
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
# Create/update the connection
logger.debug(
f'### SCO CONNECTION: [0x{event.connection_handle:04X}] '
f'{event.bd_addr}'
)
self.sco_links[event.connection_handle] = ScoLink(
peer_address=event.bd_addr,
handle=event.connection_handle,
)
# Notify the client
self.emit(
'sco_connection',
@@ -858,16 +1005,16 @@ class Host(AbortableEventEmitter):
pass
def on_hci_role_change_event(self, event):
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
logger.debug(
f'role change for {event.bd_addr}: '
f'{HCI_Constant.role_name(event.new_role)}'
f'{hci.HCI_Constant.role_name(event.new_role)}'
)
self.emit('role_change', event.bd_addr, event.new_role)
else:
logger.debug(
f'role change for {event.bd_addr} failed: '
f'{HCI_Constant.error_name(event.status)}'
f'{hci.HCI_Constant.error_name(event.status)}'
)
self.emit('role_change_failure', event.bd_addr, event.status)
@@ -883,7 +1030,7 @@ class Host(AbortableEventEmitter):
def on_hci_authentication_complete_event(self, event):
# Notify the client
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
self.emit('connection_authentication', event.connection_handle)
else:
self.emit(
@@ -894,7 +1041,7 @@ class Host(AbortableEventEmitter):
def on_hci_encryption_change_event(self, event):
# Notify the client
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
self.emit(
'connection_encryption_change',
event.connection_handle,
@@ -907,7 +1054,7 @@ class Host(AbortableEventEmitter):
def on_hci_encryption_key_refresh_complete_event(self, event):
# Notify the client
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
self.emit('connection_encryption_key_refresh', event.connection_handle)
else:
self.emit(
@@ -928,16 +1075,16 @@ class Host(AbortableEventEmitter):
def on_hci_link_key_notification_event(self, event):
logger.debug(
f'link key for {event.bd_addr}: {event.link_key.hex()}, '
f'type={HCI_Constant.link_key_type_name(event.key_type)}'
f'type={hci.HCI_Constant.link_key_type_name(event.key_type)}'
)
self.emit('link_key', event.bd_addr, event.link_key, event.key_type)
def on_hci_simple_pairing_complete_event(self, event):
logger.debug(
f'simple pairing complete for {event.bd_addr}: '
f'status={HCI_Constant.status_name(event.status)}'
f'status={hci.HCI_Constant.status_name(event.status)}'
)
if event.status == HCI_SUCCESS:
if event.status == hci.HCI_SUCCESS:
self.emit('classic_pairing', event.bd_addr)
else:
self.emit('classic_pairing_failure', event.bd_addr, event.status)
@@ -957,11 +1104,11 @@ class Host(AbortableEventEmitter):
self.link_key_provider(event.bd_addr),
)
if link_key:
response = HCI_Link_Key_Request_Reply_Command(
response = hci.HCI_Link_Key_Request_Reply_Command(
bd_addr=event.bd_addr, link_key=link_key
)
else:
response = HCI_Link_Key_Request_Negative_Reply_Command(
response = hci.HCI_Link_Key_Request_Negative_Reply_Command(
bd_addr=event.bd_addr
)
@@ -1018,7 +1165,7 @@ class Host(AbortableEventEmitter):
)
def on_hci_remote_name_request_complete_event(self, event):
if event.status != HCI_SUCCESS:
if event.status != hci.HCI_SUCCESS:
self.emit('remote_name_failure', event.bd_addr, event.status)
else:
utf8_name = event.remote_name
@@ -1036,7 +1183,7 @@ class Host(AbortableEventEmitter):
)
def on_hci_le_read_remote_features_complete_event(self, event):
if event.status != HCI_SUCCESS:
if event.status != hci.HCI_SUCCESS:
self.emit(
'le_remote_features_failure', event.connection_handle, event.status
)

View File

@@ -128,10 +128,10 @@ class PairingKeys:
def print(self, prefix=''):
keys_dict = self.to_dict()
for (container_property, value) in keys_dict.items():
for container_property, value in keys_dict.items():
if isinstance(value, dict):
print(f'{prefix}{color(container_property, "cyan")}:')
for (key_property, key_value) in value.items():
for key_property, key_value in value.items():
print(f'{prefix} {color(key_property, "green")}: {key_value}')
else:
print(f'{prefix}{color(container_property, "cyan")}: {value}')
@@ -158,7 +158,7 @@ class KeyStore:
async def get_resolving_keys(self):
all_keys = await self.get_all()
resolving_keys = []
for (name, keys) in all_keys:
for name, keys in all_keys:
if keys.irk is not None:
if keys.address_type is None:
address_type = Address.RANDOM_DEVICE_ADDRESS
@@ -171,7 +171,7 @@ class KeyStore:
async def print(self, prefix=''):
entries = await self.get_all()
separator = ''
for (name, keys) in entries:
for name, keys in entries:
print(separator + prefix + color(name, 'yellow'))
keys.print(prefix=prefix + ' ')
separator = '\n'

View File

@@ -173,7 +173,7 @@ L2CAP_MTU_CONFIGURATION_PARAMETER_TYPE = 0x01
@dataclasses.dataclass
class ClassicChannelSpec:
psm: Optional[int] = None
mtu: int = L2CAP_MIN_BR_EDR_MTU
mtu: int = L2CAP_DEFAULT_MTU
@dataclasses.dataclass
@@ -208,7 +208,7 @@ class L2CAP_PDU:
@staticmethod
def from_bytes(data: bytes) -> L2CAP_PDU:
# Sanity check
# Check parameters
if len(data) < 4:
raise ValueError('not enough data for L2CAP header')
@@ -749,6 +749,8 @@ class ClassicChannel(EventEmitter):
sink: Optional[Callable[[bytes], Any]]
state: State
connection: Connection
mtu: int
peer_mtu: int
def __init__(
self,
@@ -765,6 +767,7 @@ class ClassicChannel(EventEmitter):
self.signaling_cid = signaling_cid
self.state = self.State.CLOSED
self.mtu = mtu
self.peer_mtu = L2CAP_MIN_BR_EDR_MTU
self.psm = psm
self.source_cid = source_cid
self.destination_cid = 0
@@ -861,7 +864,7 @@ class ClassicChannel(EventEmitter):
[
(
L2CAP_MAXIMUM_TRANSMISSION_UNIT_CONFIGURATION_OPTION_TYPE,
struct.pack('<H', L2CAP_DEFAULT_MTU),
struct.pack('<H', self.mtu),
)
]
)
@@ -926,8 +929,8 @@ class ClassicChannel(EventEmitter):
options = L2CAP_Control_Frame.decode_configuration_options(request.options)
for option in options:
if option[0] == L2CAP_MTU_CONFIGURATION_PARAMETER_TYPE:
self.mtu = struct.unpack('<H', option[1])[0]
logger.debug(f'MTU = {self.mtu}')
self.peer_mtu = struct.unpack('<H', option[1])[0]
logger.debug(f'peer MTU = {self.peer_mtu}')
self.send_control_frame(
L2CAP_Configure_Response(
@@ -1026,7 +1029,7 @@ class ClassicChannel(EventEmitter):
return (
f'Channel({self.source_cid}->{self.destination_cid}, '
f'PSM={self.psm}, '
f'MTU={self.mtu}, '
f'MTU={self.mtu}/{self.peer_mtu}, '
f'state={self.state.name})'
)

View File

@@ -34,8 +34,11 @@ from bumble.device import (
DEVICE_DEFAULT_SCAN_INTERVAL,
DEVICE_DEFAULT_SCAN_WINDOW,
Advertisement,
AdvertisingParameters,
AdvertisingEventProperties,
AdvertisingType,
Device,
Phy,
)
from bumble.gatt import Service
from bumble.hci import (
@@ -47,9 +50,12 @@ from bumble.hci import (
from google.protobuf import any_pb2 # pytype: disable=pyi-error
from google.protobuf import empty_pb2 # pytype: disable=pyi-error
from pandora.host_grpc_aio import HostServicer
from pandora import host_pb2
from pandora.host_pb2 import (
NOT_CONNECTABLE,
NOT_DISCOVERABLE,
DISCOVERABLE_LIMITED,
DISCOVERABLE_GENERAL,
PRIMARY_1M,
PRIMARY_CODED,
SECONDARY_1M,
@@ -65,6 +71,7 @@ from pandora.host_pb2 import (
ConnectResponse,
DataTypes,
DisconnectRequest,
DiscoverabilityMode,
InquiryResponse,
PrimaryPhy,
ReadLocalAddressResponse,
@@ -94,6 +101,25 @@ SECONDARY_PHY_MAP: Dict[int, SecondaryPhy] = {
3: SECONDARY_CODED,
}
PRIMARY_PHY_TO_BUMBLE_PHY_MAP: Dict[PrimaryPhy, Phy] = {
PRIMARY_1M: Phy.LE_1M,
PRIMARY_CODED: Phy.LE_CODED,
}
SECONDARY_PHY_TO_BUMBLE_PHY_MAP: Dict[SecondaryPhy, Phy] = {
SECONDARY_NONE: Phy.LE_1M,
SECONDARY_1M: Phy.LE_1M,
SECONDARY_2M: Phy.LE_2M,
SECONDARY_CODED: Phy.LE_CODED,
}
OWN_ADDRESS_MAP: Dict[host_pb2.OwnAddressType, bumble.hci.OwnAddressType] = {
host_pb2.PUBLIC: bumble.hci.OwnAddressType.PUBLIC,
host_pb2.RANDOM: bumble.hci.OwnAddressType.RANDOM,
host_pb2.RESOLVABLE_OR_PUBLIC: bumble.hci.OwnAddressType.RESOLVABLE_OR_PUBLIC,
host_pb2.RESOLVABLE_OR_RANDOM: bumble.hci.OwnAddressType.RESOLVABLE_OR_RANDOM,
}
class HostService(HostServicer):
waited_connections: Set[int]
@@ -261,9 +287,9 @@ class HostService(HostServicer):
self.log.debug(f"WaitDisconnection: {connection_handle}")
if connection := self.device.lookup_connection(connection_handle):
disconnection_future: asyncio.Future[
None
] = asyncio.get_running_loop().create_future()
disconnection_future: asyncio.Future[None] = (
asyncio.get_running_loop().create_future()
)
def on_disconnection(_: None) -> None:
disconnection_future.set_result(None)
@@ -281,10 +307,113 @@ class HostService(HostServicer):
async def Advertise(
self, request: AdvertiseRequest, context: grpc.ServicerContext
) -> AsyncGenerator[AdvertiseResponse, None]:
if not request.legacy:
raise NotImplementedError(
"TODO: add support for extended advertising in Bumble"
try:
if request.legacy:
async for rsp in self.legacy_advertise(request, context):
yield rsp
else:
async for rsp in self.extended_advertise(request, context):
yield rsp
finally:
pass
async def extended_advertise(
self, request: AdvertiseRequest, context: grpc.ServicerContext
) -> AsyncGenerator[AdvertiseResponse, None]:
advertising_data = bytes(self.unpack_data_types(request.data))
scan_response_data = bytes(self.unpack_data_types(request.scan_response_data))
scannable = len(scan_response_data) != 0
advertising_event_properties = AdvertisingEventProperties(
is_connectable=request.connectable,
is_scannable=scannable,
is_directed=request.target is not None,
is_high_duty_cycle_directed_connectable=False,
is_legacy=False,
is_anonymous=False,
include_tx_power=False,
)
peer_address = Address.ANY
if request.target:
# Need to reverse bytes order since Bumble Address is using MSB.
target_bytes = bytes(reversed(request.target))
if request.target_variant() == "public":
peer_address = Address(target_bytes, Address.PUBLIC_DEVICE_ADDRESS)
else:
peer_address = Address(target_bytes, Address.RANDOM_DEVICE_ADDRESS)
advertising_parameters = AdvertisingParameters(
advertising_event_properties=advertising_event_properties,
own_address_type=OWN_ADDRESS_MAP[request.own_address_type],
peer_address=peer_address,
primary_advertising_phy=PRIMARY_PHY_TO_BUMBLE_PHY_MAP[request.primary_phy],
secondary_advertising_phy=SECONDARY_PHY_TO_BUMBLE_PHY_MAP[
request.secondary_phy
],
)
if advertising_interval := request.interval:
advertising_parameters.primary_advertising_interval_min = int(
advertising_interval
)
advertising_parameters.primary_advertising_interval_max = int(
advertising_interval
)
if interval_range := request.interval_range:
advertising_parameters.primary_advertising_interval_max += int(
interval_range
)
advertising_set = await self.device.create_advertising_set(
advertising_parameters=advertising_parameters,
advertising_data=advertising_data,
scan_response_data=scan_response_data,
)
pending_connection: asyncio.Future[bumble.device.Connection] = (
asyncio.get_running_loop().create_future()
)
if request.connectable:
def on_connection(connection: bumble.device.Connection) -> None:
if (
connection.transport == BT_LE_TRANSPORT
and connection.role == BT_PERIPHERAL_ROLE
):
pending_connection.set_result(connection)
self.device.on('connection', on_connection)
try:
# Advertise until RPC is canceled
while True:
if not advertising_set.enabled:
self.log.debug('Advertise (extended)')
await advertising_set.start()
if not request.connectable:
await asyncio.sleep(1)
continue
connection = await pending_connection
pending_connection = asyncio.get_running_loop().create_future()
cookie = any_pb2.Any(value=connection.handle.to_bytes(4, 'big'))
yield AdvertiseResponse(connection=Connection(cookie=cookie))
await asyncio.sleep(1)
finally:
try:
self.log.debug('Stop Advertise (extended)')
await advertising_set.stop()
await advertising_set.remove()
except Exception:
pass
async def legacy_advertise(
self, request: AdvertiseRequest, context: grpc.ServicerContext
) -> AsyncGenerator[AdvertiseResponse, None]:
if advertising_interval := request.interval:
self.device.config.advertising_interval_min = int(advertising_interval)
self.device.config.advertising_interval_max = int(advertising_interval)
@@ -357,14 +486,10 @@ class HostService(HostServicer):
target_bytes = bytes(reversed(request.target))
if request.target_variant() == "public":
target = Address(target_bytes, Address.PUBLIC_DEVICE_ADDRESS)
advertising_type = (
AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY
) # FIXME: HIGH_DUTY ?
advertising_type = AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY
else:
target = Address(target_bytes, Address.RANDOM_DEVICE_ADDRESS)
advertising_type = (
AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY
) # FIXME: HIGH_DUTY ?
advertising_type = AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY
if request.connectable:
@@ -391,9 +516,9 @@ class HostService(HostServicer):
await asyncio.sleep(1)
continue
pending_connection: asyncio.Future[
bumble.device.Connection
] = asyncio.get_running_loop().create_future()
pending_connection: asyncio.Future[bumble.device.Connection] = (
asyncio.get_running_loop().create_future()
)
self.log.debug('Wait for LE connection...')
connection = await pending_connection
@@ -422,23 +547,31 @@ class HostService(HostServicer):
self, request: ScanRequest, context: grpc.ServicerContext
) -> AsyncGenerator[ScanningResponse, None]:
# TODO: modify `start_scanning` to accept floats instead of int for ms values
if request.phys:
raise NotImplementedError("TODO: add support for `request.phys`")
self.log.debug('Scan')
scanning_phys = []
if PRIMARY_1M in request.phys:
scanning_phys.append(int(Phy.LE_1M))
if PRIMARY_CODED in request.phys:
scanning_phys.append(int(Phy.LE_CODED))
if not scanning_phys:
scanning_phys = [int(Phy.LE_1M), int(Phy.LE_CODED)]
scan_queue: asyncio.Queue[Advertisement] = asyncio.Queue()
handler = self.device.on('advertisement', scan_queue.put_nowait)
await self.device.start_scanning(
legacy=request.legacy,
active=not request.passive,
own_address_type=request.own_address_type,
scan_interval=int(request.interval)
if request.interval
else DEVICE_DEFAULT_SCAN_INTERVAL,
scan_window=int(request.window)
if request.window
else DEVICE_DEFAULT_SCAN_WINDOW,
scan_interval=(
int(request.interval)
if request.interval
else DEVICE_DEFAULT_SCAN_INTERVAL
),
scan_window=(
int(request.window) if request.window else DEVICE_DEFAULT_SCAN_WINDOW
),
scanning_phys=scanning_phys,
)
try:
@@ -651,9 +784,11 @@ class HostService(HostServicer):
*struct.pack('<H', dt.peripheral_connection_interval_min),
*struct.pack(
'<H',
dt.peripheral_connection_interval_max
if dt.peripheral_connection_interval_max
else dt.peripheral_connection_interval_min,
(
dt.peripheral_connection_interval_max
if dt.peripheral_connection_interval_max
else dt.peripheral_connection_interval_min
),
),
]
),
@@ -735,6 +870,16 @@ class HostService(HostServicer):
)
)
flag_map = {
NOT_DISCOVERABLE: 0x00,
DISCOVERABLE_LIMITED: AdvertisingData.LE_LIMITED_DISCOVERABLE_MODE_FLAG,
DISCOVERABLE_GENERAL: AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG,
}
if dt.le_discoverability_mode:
flags = flag_map[dt.le_discoverability_mode]
ad_structures.append((AdvertisingData.FLAGS, flags.to_bytes(1, 'big')))
return AdvertisingData(ad_structures)
def pack_data_types(self, ad: AdvertisingData) -> DataTypes:

View File

@@ -383,9 +383,9 @@ class SecurityService(SecurityServicer):
connection.transport
] == request.level_variant()
wait_for_security: asyncio.Future[
str
] = asyncio.get_running_loop().create_future()
wait_for_security: asyncio.Future[str] = (
asyncio.get_running_loop().create_future()
)
authenticate_task: Optional[asyncio.Future[None]] = None
pair_task: Optional[asyncio.Future[None]] = None

View File

@@ -24,8 +24,9 @@ import enum
import struct
import functools
import logging
from typing import Optional, List, Union, Type, Dict, Any, Tuple, cast
from typing import Optional, List, Union, Type, Dict, Any, Tuple
from bumble import core
from bumble import colors
from bumble import device
from bumble import hci
@@ -228,6 +229,14 @@ class SupportedFrameDuration(enum.IntFlag):
DURATION_10000_US_PREFERRED = 0b0010
class AnnouncementType(enum.IntEnum):
'''Basic Audio Profile, 3.5.3. Additional Audio Stream Control Service requirements'''
# fmt: off
GENERAL = 0x00
TARGETED = 0x01
# -----------------------------------------------------------------------------
# ASE Operations
# -----------------------------------------------------------------------------
@@ -453,6 +462,34 @@ class AudioRole(enum.IntEnum):
SOURCE = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.HOST_TO_CONTROLLER
@dataclasses.dataclass
class UnicastServerAdvertisingData:
"""Advertising Data for ASCS."""
announcement_type: AnnouncementType = AnnouncementType.TARGETED
available_audio_contexts: ContextType = ContextType.MEDIA
metadata: bytes = b''
def __bytes__(self) -> bytes:
return bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
struct.pack(
'<2sBIB',
gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE.to_bytes(),
self.announcement_type,
self.available_audio_contexts,
len(self.metadata),
)
+ self.metadata,
)
]
)
)
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------

View File

@@ -19,8 +19,8 @@
import struct
from typing import Optional, Tuple
from ..gatt_client import ProfileServiceProxy
from ..gatt import (
from bumble.gatt_client import ServiceProxy, ProfileServiceProxy, CharacteristicProxy
from bumble.gatt import (
GATT_DEVICE_INFORMATION_SERVICE,
GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC,
GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC,
@@ -59,7 +59,7 @@ class DeviceInformationService(TemplateService):
firmware_revision: Optional[str] = None,
software_revision: Optional[str] = None,
system_id: Optional[Tuple[int, int]] = None, # (OUI, Manufacturer ID)
ieee_regulatory_certification_data_list: Optional[bytes] = None
ieee_regulatory_certification_data_list: Optional[bytes] = None,
# TODO: pnp_id
):
characteristics = [
@@ -104,10 +104,19 @@ class DeviceInformationService(TemplateService):
class DeviceInformationServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = DeviceInformationService
def __init__(self, service_proxy):
manufacturer_name: Optional[UTF8CharacteristicAdapter]
model_number: Optional[UTF8CharacteristicAdapter]
serial_number: Optional[UTF8CharacteristicAdapter]
hardware_revision: Optional[UTF8CharacteristicAdapter]
firmware_revision: Optional[UTF8CharacteristicAdapter]
software_revision: Optional[UTF8CharacteristicAdapter]
system_id: Optional[DelegatedCharacteristicAdapter]
ieee_regulatory_certification_data_list: Optional[CharacteristicProxy]
def __init__(self, service_proxy: ServiceProxy):
self.service_proxy = service_proxy
for (field, uuid) in (
for field, uuid in (
('manufacturer_name', GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC),
('model_number', GATT_MODEL_NUMBER_STRING_CHARACTERISTIC),
('serial_number', GATT_SERIAL_NUMBER_STRING_CHARACTERISTIC),

228
bumble/profiles/vcp.py Normal file
View File

@@ -0,0 +1,228 @@
# Copyright 2021-2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import enum
from bumble import att
from bumble import device
from bumble import gatt
from bumble import gatt_client
from typing import Optional
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
MIN_VOLUME = 0
MAX_VOLUME = 255
class ErrorCode(enum.IntEnum):
'''
See Volume Control Service 1.6. Application error codes.
'''
INVALID_CHANGE_COUNTER = 0x80
OPCODE_NOT_SUPPORTED = 0x81
class VolumeFlags(enum.IntFlag):
'''
See Volume Control Service 3.3. Volume Flags.
'''
VOLUME_SETTING_PERSISTED = 0x01
# RFU
class VolumeControlPointOpcode(enum.IntEnum):
'''
See Volume Control Service Table 3.3: Volume Control Point procedure requirements.
'''
# fmt: off
RELATIVE_VOLUME_DOWN = 0x00
RELATIVE_VOLUME_UP = 0x01
UNMUTE_RELATIVE_VOLUME_DOWN = 0x02
UNMUTE_RELATIVE_VOLUME_UP = 0x03
SET_ABSOLUTE_VOLUME = 0x04
UNMUTE = 0x05
MUTE = 0x06
# -----------------------------------------------------------------------------
# Server
# -----------------------------------------------------------------------------
class VolumeControlService(gatt.TemplateService):
UUID = gatt.GATT_VOLUME_CONTROL_SERVICE
volume_state: gatt.Characteristic
volume_control_point: gatt.Characteristic
volume_flags: gatt.Characteristic
volume_setting: int
muted: int
change_counter: int
def __init__(
self,
step_size: int = 16,
volume_setting: int = 0,
muted: int = 0,
change_counter: int = 0,
volume_flags: int = 0,
) -> None:
self.step_size = step_size
self.volume_setting = volume_setting
self.muted = muted
self.change_counter = change_counter
self.volume_state = gatt.Characteristic(
uuid=gatt.GATT_VOLUME_STATE_CHARACTERISTIC,
properties=(
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY
),
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=gatt.CharacteristicValue(read=self._on_read_volume_state),
)
self.volume_control_point = gatt.Characteristic(
uuid=gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.WRITE,
permissions=gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
value=gatt.CharacteristicValue(write=self._on_write_volume_control_point),
)
self.volume_flags = gatt.Characteristic(
uuid=gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=bytes([volume_flags]),
)
super().__init__(
[
self.volume_state,
self.volume_control_point,
self.volume_flags,
]
)
@property
def volume_state_bytes(self) -> bytes:
return bytes([self.volume_setting, self.muted, self.change_counter])
@volume_state_bytes.setter
def volume_state_bytes(self, new_value: bytes) -> None:
self.volume_setting, self.muted, self.change_counter = new_value
def _on_read_volume_state(self, _connection: Optional[device.Connection]) -> bytes:
return self.volume_state_bytes
def _on_write_volume_control_point(
self, connection: Optional[device.Connection], value: bytes
) -> None:
assert connection
opcode = VolumeControlPointOpcode(value[0])
change_counter = value[1]
if change_counter != self.change_counter:
raise att.ATT_Error(ErrorCode.INVALID_CHANGE_COUNTER)
handler = getattr(self, '_on_' + opcode.name.lower())
if handler(*value[2:]):
self.change_counter = (self.change_counter + 1) % 256
connection.abort_on(
'disconnection',
connection.device.notify_subscribers(
attribute=self.volume_state,
value=self.volume_state_bytes,
),
)
self.emit(
'volume_state', self.volume_setting, self.muted, self.change_counter
)
def _on_relative_volume_down(self) -> bool:
old_volume = self.volume_setting
self.volume_setting = max(self.volume_setting - self.step_size, MIN_VOLUME)
return self.volume_setting != old_volume
def _on_relative_volume_up(self) -> bool:
old_volume = self.volume_setting
self.volume_setting = min(self.volume_setting + self.step_size, MAX_VOLUME)
return self.volume_setting != old_volume
def _on_unmute_relative_volume_down(self) -> bool:
old_volume, old_muted_state = self.volume_setting, self.muted
self.volume_setting = max(self.volume_setting - self.step_size, MIN_VOLUME)
self.muted = 0
return (self.volume_setting, self.muted) != (old_volume, old_muted_state)
def _on_unmute_relative_volume_up(self) -> bool:
old_volume, old_muted_state = self.volume_setting, self.muted
self.volume_setting = min(self.volume_setting + self.step_size, MAX_VOLUME)
self.muted = 0
return (self.volume_setting, self.muted) != (old_volume, old_muted_state)
def _on_set_absolute_volume(self, volume_setting: int) -> bool:
old_volume_setting = self.volume_setting
self.volume_setting = volume_setting
return old_volume_setting != self.volume_setting
def _on_unmute(self) -> bool:
old_muted_state = self.muted
self.muted = 0
return self.muted != old_muted_state
def _on_mute(self) -> bool:
old_muted_state = self.muted
self.muted = 1
return self.muted != old_muted_state
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class VolumeControlServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = VolumeControlService
volume_control_point: gatt_client.CharacteristicProxy
def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
self.service_proxy = service_proxy
self.volume_state = gatt.PackedCharacteristicAdapter(
service_proxy.get_characteristics_by_uuid(
gatt.GATT_VOLUME_STATE_CHARACTERISTIC
)[0],
'BBB',
)
self.volume_control_point = service_proxy.get_characteristics_by_uuid(
gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC
)[0]
self.volume_flags = gatt.PackedCharacteristicAdapter(
service_proxy.get_characteristics_by_uuid(
gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC
)[0],
'B',
)

View File

@@ -19,12 +19,16 @@ from __future__ import annotations
import logging
import asyncio
import dataclasses
import enum
from typing import Callable, Dict, List, Optional, Tuple, Union, TYPE_CHECKING
from typing_extensions import Self
from pyee import EventEmitter
from . import core, l2cap
from bumble import core
from bumble import l2cap
from bumble import sdp
from .colors import color
from .core import (
UUID,
@@ -34,15 +38,6 @@ from .core import (
InvalidStateError,
ProtocolError,
)
from .sdp import (
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_PUBLIC_BROWSE_ROOT,
DataElement,
ServiceAttribute,
)
if TYPE_CHECKING:
from bumble.device import Device, Connection
@@ -60,27 +55,18 @@ logger = logging.getLogger(__name__)
RFCOMM_PSM = 0x0003
class FrameType(enum.IntEnum):
SABM = 0x2F # Control field [1,1,1,1,_,1,0,0] LSB-first
UA = 0x63 # Control field [0,1,1,0,_,0,1,1] LSB-first
DM = 0x0F # Control field [1,1,1,1,_,0,0,0] LSB-first
DISC = 0x43 # Control field [0,1,0,_,0,0,1,1] LSB-first
UIH = 0xEF # Control field [1,1,1,_,1,1,1,1] LSB-first
UI = 0x03 # Control field [0,0,0,_,0,0,1,1] LSB-first
# Frame types
RFCOMM_SABM_FRAME = 0x2F # Control field [1,1,1,1,_,1,0,0] LSB-first
RFCOMM_UA_FRAME = 0x63 # Control field [0,1,1,0,_,0,1,1] LSB-first
RFCOMM_DM_FRAME = 0x0F # Control field [1,1,1,1,_,0,0,0] LSB-first
RFCOMM_DISC_FRAME = 0x43 # Control field [0,1,0,_,0,0,1,1] LSB-first
RFCOMM_UIH_FRAME = 0xEF # Control field [1,1,1,_,1,1,1,1] LSB-first
RFCOMM_UI_FRAME = 0x03 # Control field [0,0,0,_,0,0,1,1] LSB-first
class MccType(enum.IntEnum):
PN = 0x20
MSC = 0x38
RFCOMM_FRAME_TYPE_NAMES = {
RFCOMM_SABM_FRAME: 'SABM',
RFCOMM_UA_FRAME: 'UA',
RFCOMM_DM_FRAME: 'DM',
RFCOMM_DISC_FRAME: 'DISC',
RFCOMM_UIH_FRAME: 'UIH',
RFCOMM_UI_FRAME: 'UI'
}
# MCC Types
RFCOMM_MCC_PN_TYPE = 0x20
RFCOMM_MCC_MSC_TYPE = 0x38
# FCS CRC
CRC_TABLE = bytes([
@@ -118,7 +104,8 @@ CRC_TABLE = bytes([
0XBA, 0X2B, 0X59, 0XC8, 0XBD, 0X2C, 0X5E, 0XCF
])
RFCOMM_DEFAULT_WINDOW_SIZE = 16
RFCOMM_DEFAULT_L2CAP_MTU = 2048
RFCOMM_DEFAULT_WINDOW_SIZE = 7
RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000
RFCOMM_DYNAMIC_CHANNEL_NUMBER_START = 1
@@ -130,29 +117,33 @@ RFCOMM_DYNAMIC_CHANNEL_NUMBER_END = 30
# -----------------------------------------------------------------------------
def make_service_sdp_records(
service_record_handle: int, channel: int, uuid: Optional[UUID] = None
) -> List[ServiceAttribute]:
) -> List[sdp.ServiceAttribute]:
"""
Create SDP records for an RFComm service given a channel number and an
optional UUID. A Service Class Attribute is included only if the UUID is not None.
"""
records = [
ServiceAttribute(
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(service_record_handle),
sdp.ServiceAttribute(
sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
sdp.DataElement.unsigned_integer_32(service_record_handle),
),
ServiceAttribute(
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]),
sdp.ServiceAttribute(
sdp.SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
sdp.DataElement.sequence(
[sdp.DataElement.uuid(sdp.SDP_PUBLIC_BROWSE_ROOT)]
),
),
ServiceAttribute(
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
sdp.ServiceAttribute(
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.DataElement.sequence(
[
DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]),
DataElement.sequence(
sdp.DataElement.sequence(
[sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]
),
sdp.DataElement.sequence(
[
DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
DataElement.unsigned_integer_8(channel),
sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
sdp.DataElement.unsigned_integer_8(channel),
]
),
]
@@ -162,15 +153,81 @@ def make_service_sdp_records(
if uuid:
records.append(
ServiceAttribute(
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
DataElement.sequence([DataElement.uuid(uuid)]),
sdp.ServiceAttribute(
sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
sdp.DataElement.sequence([sdp.DataElement.uuid(uuid)]),
)
)
return records
# -----------------------------------------------------------------------------
async def find_rfcomm_channels(connection: Connection) -> Dict[int, List[UUID]]:
"""Searches all RFCOMM channels and their associated UUID from SDP service records.
Args:
connection: ACL connection to make SDP search.
Returns:
Dictionary mapping from channel number to service class UUID list.
"""
results = {}
async with sdp.Client(connection) as sdp_client:
search_result = await sdp_client.search_attributes(
uuids=[core.BT_RFCOMM_PROTOCOL_ID],
attribute_ids=[
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
],
)
for attribute_lists in search_result:
service_classes: List[UUID] = []
channel: Optional[int] = None
for attribute in attribute_lists:
# The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]].
if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
protocol_descriptor_list = attribute.value.value
channel = protocol_descriptor_list[1].value[1].value
elif attribute.id == sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID:
service_class_id_list = attribute.value.value
service_classes = [
service_class.value for service_class in service_class_id_list
]
if not service_classes or not channel:
logger.warning(f"Bad result {attribute_lists}.")
else:
results[channel] = service_classes
return results
# -----------------------------------------------------------------------------
async def find_rfcomm_channel_with_uuid(
connection: Connection, uuid: str | UUID
) -> Optional[int]:
"""Searches an RFCOMM channel associated with given UUID from service records.
Args:
connection: ACL connection to make SDP search.
uuid: UUID of service record to search for.
Returns:
RFCOMM channel number if found, otherwise None.
"""
if isinstance(uuid, str):
uuid = UUID(uuid)
return next(
(
channel
for channel, class_id_list in (
await find_rfcomm_channels(connection)
).items()
if uuid in class_id_list
),
None,
)
# -----------------------------------------------------------------------------
def compute_fcs(buffer: bytes) -> int:
result = 0xFF
@@ -183,7 +240,7 @@ def compute_fcs(buffer: bytes) -> int:
class RFCOMM_Frame:
def __init__(
self,
frame_type: int,
frame_type: FrameType,
c_r: int,
dlci: int,
p_f: int,
@@ -206,14 +263,11 @@ class RFCOMM_Frame:
self.length = bytes([(length << 1) | 1])
self.address = (dlci << 2) | (c_r << 1) | 1
self.control = frame_type | (p_f << 4)
if frame_type == RFCOMM_UIH_FRAME:
if frame_type == FrameType.UIH:
self.fcs = compute_fcs(bytes([self.address, self.control]))
else:
self.fcs = compute_fcs(bytes([self.address, self.control]) + self.length)
def type_name(self) -> str:
return RFCOMM_FRAME_TYPE_NAMES[self.type]
@staticmethod
def parse_mcc(data) -> Tuple[int, bool, bytes]:
mcc_type = data[0] >> 2
@@ -237,24 +291,24 @@ class RFCOMM_Frame:
@staticmethod
def sabm(c_r: int, dlci: int):
return RFCOMM_Frame(RFCOMM_SABM_FRAME, c_r, dlci, 1)
return RFCOMM_Frame(FrameType.SABM, c_r, dlci, 1)
@staticmethod
def ua(c_r: int, dlci: int):
return RFCOMM_Frame(RFCOMM_UA_FRAME, c_r, dlci, 1)
return RFCOMM_Frame(FrameType.UA, c_r, dlci, 1)
@staticmethod
def dm(c_r: int, dlci: int):
return RFCOMM_Frame(RFCOMM_DM_FRAME, c_r, dlci, 1)
return RFCOMM_Frame(FrameType.DM, c_r, dlci, 1)
@staticmethod
def disc(c_r: int, dlci: int):
return RFCOMM_Frame(RFCOMM_DISC_FRAME, c_r, dlci, 1)
return RFCOMM_Frame(FrameType.DISC, c_r, dlci, 1)
@staticmethod
def uih(c_r: int, dlci: int, information: bytes, p_f: int = 0):
return RFCOMM_Frame(
RFCOMM_UIH_FRAME, c_r, dlci, p_f, information, with_credits=(p_f == 1)
FrameType.UIH, c_r, dlci, p_f, information, with_credits=(p_f == 1)
)
@staticmethod
@@ -262,7 +316,7 @@ class RFCOMM_Frame:
# Extract fields
dlci = (data[0] >> 2) & 0x3F
c_r = (data[0] >> 1) & 0x01
frame_type = data[1] & 0xEF
frame_type = FrameType(data[1] & 0xEF)
p_f = (data[1] >> 4) & 0x01
length = data[2]
if length & 0x01:
@@ -291,7 +345,7 @@ class RFCOMM_Frame:
def __str__(self) -> str:
return (
f'{color(self.type_name(), "yellow")}'
f'{color(self.type.name, "yellow")}'
f'(c/r={self.c_r},'
f'dlci={self.dlci},'
f'p/f={self.p_f},'
@@ -301,6 +355,7 @@ class RFCOMM_Frame:
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class RFCOMM_MCC_PN:
dlci: int
cl: int
@@ -310,23 +365,11 @@ class RFCOMM_MCC_PN:
max_retransmissions: int
window_size: int
def __init__(
self,
dlci: int,
cl: int,
priority: int,
ack_timer: int,
max_frame_size: int,
max_retransmissions: int,
window_size: int,
) -> None:
self.dlci = dlci
self.cl = cl
self.priority = priority
self.ack_timer = ack_timer
self.max_frame_size = max_frame_size
self.max_retransmissions = max_retransmissions
self.window_size = window_size
def __post_init__(self) -> None:
if self.window_size < 1 or self.window_size > 7:
logger.warning(
f'Error Recovery Window size {self.window_size} is out of range [1, 7].'
)
@staticmethod
def from_bytes(data: bytes) -> RFCOMM_MCC_PN:
@@ -337,7 +380,7 @@ class RFCOMM_MCC_PN:
ack_timer=data[3],
max_frame_size=data[4] | data[5] << 8,
max_retransmissions=data[6],
window_size=data[7],
window_size=data[7] & 0x07,
)
def __bytes__(self) -> bytes:
@@ -350,23 +393,14 @@ class RFCOMM_MCC_PN:
self.max_frame_size & 0xFF,
(self.max_frame_size >> 8) & 0xFF,
self.max_retransmissions & 0xFF,
self.window_size & 0xFF,
# Only 3 bits are meaningful.
self.window_size & 0x07,
]
)
def __str__(self) -> str:
return (
f'PN(dlci={self.dlci},'
f'cl={self.cl},'
f'priority={self.priority},'
f'ack_timer={self.ack_timer},'
f'max_frame_size={self.max_frame_size},'
f'max_retransmissions={self.max_retransmissions},'
f'window_size={self.window_size})'
)
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class RFCOMM_MCC_MSC:
dlci: int
fc: int
@@ -375,16 +409,6 @@ class RFCOMM_MCC_MSC:
ic: int
dv: int
def __init__(
self, dlci: int, fc: int, rtc: int, rtr: int, ic: int, dv: int
) -> None:
self.dlci = dlci
self.fc = fc
self.rtc = rtc
self.rtr = rtr
self.ic = ic
self.dv = dv
@staticmethod
def from_bytes(data: bytes) -> RFCOMM_MCC_MSC:
return RFCOMM_MCC_MSC(
@@ -409,16 +433,6 @@ class RFCOMM_MCC_MSC:
]
)
def __str__(self) -> str:
return (
f'MSC(dlci={self.dlci},'
f'fc={self.fc},'
f'rtc={self.rtc},'
f'rtr={self.rtr},'
f'ic={self.ic},'
f'dv={self.dv})'
)
# -----------------------------------------------------------------------------
class DLC(EventEmitter):
@@ -460,7 +474,7 @@ class DLC(EventEmitter):
# Compute the MTU
max_overhead = 4 + 1 # header with 2-byte length + fcs
self.mtu = min(
max_frame_size, self.multiplexer.l2cap_channel.mtu - max_overhead
max_frame_size, self.multiplexer.l2cap_channel.peer_mtu - max_overhead
)
def change_state(self, new_state: State) -> None:
@@ -471,7 +485,7 @@ class DLC(EventEmitter):
self.multiplexer.send_frame(frame)
def on_frame(self, frame: RFCOMM_Frame) -> None:
handler = getattr(self, f'on_{frame.type_name()}_frame'.lower())
handler = getattr(self, f'on_{frame.type.name}_frame'.lower())
handler(frame)
def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None:
@@ -485,9 +499,7 @@ class DLC(EventEmitter):
# Exchange the modem status with the peer
msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1)
mcc = RFCOMM_Frame.make_mcc(
mcc_type=RFCOMM_MCC_MSC_TYPE, c_r=1, data=bytes(msc)
)
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=1, data=bytes(msc))
logger.debug(f'>>> MCC MSC Command: {msc}')
self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc))
@@ -503,9 +515,7 @@ class DLC(EventEmitter):
# Exchange the modem status with the peer
msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1)
mcc = RFCOMM_Frame.make_mcc(
mcc_type=RFCOMM_MCC_MSC_TYPE, c_r=1, data=bytes(msc)
)
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=1, data=bytes(msc))
logger.debug(f'>>> MCC MSC Command: {msc}')
self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc))
@@ -559,9 +569,7 @@ class DLC(EventEmitter):
# Command
logger.debug(f'<<< MCC MSC Command: {msc}')
msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1)
mcc = RFCOMM_Frame.make_mcc(
mcc_type=RFCOMM_MCC_MSC_TYPE, c_r=0, data=bytes(msc)
)
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=0, data=bytes(msc))
logger.debug(f'>>> MCC MSC Response: {msc}')
self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc))
else:
@@ -589,7 +597,7 @@ class DLC(EventEmitter):
max_retransmissions=0,
window_size=self.window_size,
)
mcc = RFCOMM_Frame.make_mcc(mcc_type=RFCOMM_MCC_PN_TYPE, c_r=0, data=bytes(pn))
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.PN, c_r=0, data=bytes(pn))
logger.debug(f'>>> PN Response: {pn}')
self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc))
self.change_state(DLC.State.CONNECTING)
@@ -711,7 +719,7 @@ class Multiplexer(EventEmitter):
if frame.dlci == 0:
self.on_frame(frame)
else:
if frame.type == RFCOMM_DM_FRAME:
if frame.type == FrameType.DM:
# DM responses are for a DLCI, but since we only create the dlc when we
# receive a PN response (because we need the parameters), we handle DM
# frames at the Multiplexer level
@@ -724,7 +732,7 @@ class Multiplexer(EventEmitter):
dlc.on_frame(frame)
def on_frame(self, frame: RFCOMM_Frame) -> None:
handler = getattr(self, f'on_{frame.type_name()}_frame'.lower())
handler = getattr(self, f'on_{frame.type.name}_frame'.lower())
handler(frame)
def on_sabm_frame(self, _frame: RFCOMM_Frame) -> None:
@@ -772,10 +780,10 @@ class Multiplexer(EventEmitter):
def on_uih_frame(self, frame: RFCOMM_Frame) -> None:
(mcc_type, c_r, value) = RFCOMM_Frame.parse_mcc(frame.information)
if mcc_type == RFCOMM_MCC_PN_TYPE:
if mcc_type == MccType.PN:
pn = RFCOMM_MCC_PN.from_bytes(value)
self.on_mcc_pn(c_r, pn)
elif mcc_type == RFCOMM_MCC_MSC_TYPE:
elif mcc_type == MccType.MSC:
mcs = RFCOMM_MCC_MSC.from_bytes(value)
self.on_mcc_msc(c_r, mcs)
@@ -871,7 +879,7 @@ class Multiplexer(EventEmitter):
max_retransmissions=0,
window_size=window_size,
)
mcc = RFCOMM_Frame.make_mcc(mcc_type=RFCOMM_MCC_PN_TYPE, c_r=1, data=bytes(pn))
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.PN, c_r=1, data=bytes(pn))
logger.debug(f'>>> Sending MCC: {pn}')
self.open_result = asyncio.get_running_loop().create_future()
self.change_state(Multiplexer.State.OPENING)
@@ -901,8 +909,11 @@ class Client:
multiplexer: Optional[Multiplexer]
l2cap_channel: Optional[l2cap.ClassicChannel]
def __init__(self, connection: Connection) -> None:
def __init__(
self, connection: Connection, l2cap_mtu: int = RFCOMM_DEFAULT_L2CAP_MTU
) -> None:
self.connection = connection
self.l2cap_mtu = l2cap_mtu
self.l2cap_channel = None
self.multiplexer = None
@@ -910,7 +921,7 @@ class Client:
# Create a new L2CAP connection
try:
self.l2cap_channel = await self.connection.create_l2cap_channel(
spec=l2cap.ClassicChannelSpec(RFCOMM_PSM)
spec=l2cap.ClassicChannelSpec(psm=RFCOMM_PSM, mtu=self.l2cap_mtu)
)
except ProtocolError as error:
logger.warning(f'L2CAP connection failed: {error}')
@@ -933,22 +944,33 @@ class Client:
self.multiplexer = None
# Close the L2CAP channel
# TODO
if self.l2cap_channel:
await self.l2cap_channel.disconnect()
self.l2cap_channel = None
async def __aenter__(self) -> Multiplexer:
return await self.start()
async def __aexit__(self, *args) -> None:
await self.shutdown()
# -----------------------------------------------------------------------------
class Server(EventEmitter):
acceptors: Dict[int, Callable[[DLC], None]]
def __init__(self, device: Device) -> None:
def __init__(
self, device: Device, l2cap_mtu: int = RFCOMM_DEFAULT_L2CAP_MTU
) -> None:
super().__init__()
self.device = device
self.multiplexer = None
self.acceptors = {}
# Register ourselves with the L2CAP channel manager
device.create_l2cap_server(
spec=l2cap.ClassicChannelSpec(psm=RFCOMM_PSM), handler=self.on_connection
self.l2cap_server = device.create_l2cap_server(
spec=l2cap.ClassicChannelSpec(psm=RFCOMM_PSM, mtu=l2cap_mtu),
handler=self.on_connection,
)
def listen(self, acceptor: Callable[[DLC], None], channel: int = 0) -> int:
@@ -998,3 +1020,9 @@ class Server(EventEmitter):
acceptor = self.acceptors.get(dlc.dlci >> 1)
if acceptor:
acceptor(dlc)
def __enter__(self) -> Self:
return self
def __exit__(self, *args) -> None:
self.l2cap_server.close()

View File

@@ -19,6 +19,7 @@ from __future__ import annotations
import logging
import struct
from typing import Dict, List, Type, Optional, Tuple, Union, NewType, TYPE_CHECKING
from typing_extensions import Self
from . import core, l2cap
from .colors import color
@@ -824,11 +825,13 @@ class Client:
)
attribute_id_list = DataElement.sequence(
[
DataElement.unsigned_integer(
attribute_id[0], value_size=attribute_id[1]
(
DataElement.unsigned_integer(
attribute_id[0], value_size=attribute_id[1]
)
if isinstance(attribute_id, tuple)
else DataElement.unsigned_integer_16(attribute_id)
)
if isinstance(attribute_id, tuple)
else DataElement.unsigned_integer_16(attribute_id)
for attribute_id in attribute_ids
]
)
@@ -880,11 +883,13 @@ class Client:
attribute_id_list = DataElement.sequence(
[
DataElement.unsigned_integer(
attribute_id[0], value_size=attribute_id[1]
(
DataElement.unsigned_integer(
attribute_id[0], value_size=attribute_id[1]
)
if isinstance(attribute_id, tuple)
else DataElement.unsigned_integer_16(attribute_id)
)
if isinstance(attribute_id, tuple)
else DataElement.unsigned_integer_16(attribute_id)
for attribute_id in attribute_ids
]
)
@@ -920,6 +925,13 @@ class Client:
return ServiceAttribute.list_from_data_elements(attribute_list_sequence.value)
async def __aenter__(self) -> Self:
await self.connect()
return self
async def __aexit__(self, *args) -> None:
await self.disconnect()
# -----------------------------------------------------------------------------
class Server:

View File

@@ -737,9 +737,9 @@ class Session:
# Create a future that can be used to wait for the session to complete
if self.is_initiator:
self.pairing_result: Optional[
asyncio.Future[None]
] = asyncio.get_running_loop().create_future()
self.pairing_result: Optional[asyncio.Future[None]] = (
asyncio.get_running_loop().create_future()
)
else:
self.pairing_result = None
@@ -1993,10 +1993,8 @@ class Manager(EventEmitter):
) -> None:
# Store the keys in the key store
if self.device.keystore and identity_address is not None:
self.device.abort_on(
'flush', self.device.update_keys(str(identity_address), keys)
)
# Make sure on_pairing emits after key update.
await self.device.update_keys(str(identity_address), keys)
# Notify the device
self.device.on_pairing(session.connection, identity_address, keys, session.sc)

View File

@@ -59,15 +59,13 @@ class TransportLostError(Exception):
# Typing Protocols
# -----------------------------------------------------------------------------
class TransportSink(Protocol):
def on_packet(self, packet: bytes) -> None:
...
def on_packet(self, packet: bytes) -> None: ...
class TransportSource(Protocol):
terminated: asyncio.Future[None]
def set_packet_sink(self, sink: TransportSink) -> None:
...
def set_packet_sink(self, sink: TransportSink) -> None: ...
# -----------------------------------------------------------------------------
@@ -168,11 +166,13 @@ class PacketReader:
def __init__(self, source: io.BufferedReader) -> None:
self.source = source
self.at_end = False
def next_packet(self) -> Optional[bytes]:
# Get the packet type
packet_type = self.source.read(1)
if len(packet_type) != 1:
self.at_end = True
return None
# Get the packet info based on its type

View File

@@ -23,11 +23,24 @@ import time
import usb.core
import usb.util
from typing import Optional
from usb.core import Device as UsbDevice
from usb.core import USBError
from usb.util import CTRL_TYPE_CLASS, CTRL_RECIPIENT_OTHER
from usb.legacy import REQ_SET_FEATURE, REQ_CLEAR_FEATURE, CLASS_HUB
from .common import Transport, ParserSource
from .. import hci
from ..colors import color
# -----------------------------------------------------------------------------
# Constant
# -----------------------------------------------------------------------------
USB_PORT_FEATURE_POWER = 8
POWER_CYCLE_DELAY = 1
RESET_DELAY = 3
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
@@ -113,9 +126,10 @@ async def open_pyusb_transport(spec: str) -> Transport:
self.loop.call_soon_threadsafe(self.stop_event.set)
class UsbPacketSource(asyncio.Protocol, ParserSource):
def __init__(self, device, sco_enabled):
def __init__(self, device, metadata, sco_enabled):
super().__init__()
self.device = device
self.metadata = metadata
self.loop = asyncio.get_running_loop()
self.queue = asyncio.Queue()
self.dequeue_task = None
@@ -213,9 +227,22 @@ async def open_pyusb_transport(spec: str) -> Transport:
usb_find = libusb_package.find
# Find the device according to the spec moniker
power_cycle = False
if spec.startswith('!'):
power_cycle = True
spec = spec[1:]
if ':' in spec:
vendor_id, product_id = spec.split(':')
device = usb_find(idVendor=int(vendor_id, 16), idProduct=int(product_id, 16))
elif '-' in spec:
def device_path(device):
if device.port_numbers:
return f'{device.bus}-{".".join(map(str, device.port_numbers))}'
else:
return str(device.bus)
device = usb_find(custom_match=lambda device: device_path(device) == spec)
else:
device_index = int(spec)
devices = list(
@@ -235,6 +262,17 @@ async def open_pyusb_transport(spec: str) -> Transport:
raise ValueError('device not found')
logger.debug(f'USB Device: {device}')
# Power Cycle the device
if power_cycle:
try:
device = await _power_cycle(device) # type: ignore
except Exception as e:
logging.debug(e)
logging.info(f"Unable to power cycle {hex(device.idVendor)} {hex(device.idProduct)}") # type: ignore
# Collect the metadata
device_metadata = {'vendor_id': device.idVendor, 'product_id': device.idProduct}
# Detach the kernel driver if needed
if device.is_kernel_driver_active(0):
logger.debug("detaching kernel driver")
@@ -289,9 +327,79 @@ async def open_pyusb_transport(spec: str) -> Transport:
# except usb.USBError:
# logger.warning('failed to set alternate setting')
packet_source = UsbPacketSource(device, sco_enabled)
packet_source = UsbPacketSource(device, device_metadata, sco_enabled)
packet_sink = UsbPacketSink(device)
packet_source.start()
packet_sink.start()
return UsbTransport(device, packet_source, packet_sink)
async def _power_cycle(device: UsbDevice) -> UsbDevice:
"""
For devices connected to compatible USB hubs: Performs a power cycle on a given USB device.
This involves temporarily disabling its port on the hub and then re-enabling it.
"""
device_path = f'{device.bus}-{".".join(map(str, device.port_numbers))}' # type: ignore
hub = _find_hub_by_device_path(device_path)
if hub:
try:
device_port = device.port_numbers[-1] # type: ignore
_set_port_status(hub, device_port, False)
await asyncio.sleep(POWER_CYCLE_DELAY)
_set_port_status(hub, device_port, True)
await asyncio.sleep(RESET_DELAY)
# Device needs to be find again otherwise it will appear as disconnected
return usb.core.find(idVendor=device.idVendor, idProduct=device.idProduct) # type: ignore
except USBError as e:
logger.error(f"Adjustment needed: Please revise the udev rule for device {hex(device.idVendor)}:{hex(device.idProduct)} for proper recognition.") # type: ignore
logger.error(e)
return device
def _set_port_status(device: UsbDevice, port: int, on: bool):
"""Sets the power status of a specific port on a USB hub."""
device.ctrl_transfer(
bmRequestType=CTRL_TYPE_CLASS | CTRL_RECIPIENT_OTHER,
bRequest=REQ_SET_FEATURE if on else REQ_CLEAR_FEATURE,
wIndex=port,
wValue=USB_PORT_FEATURE_POWER,
)
def _find_device_by_path(sys_path: str) -> Optional[UsbDevice]:
"""Finds a USB device based on its system path."""
bus_num, *port_parts = sys_path.split('-')
ports = [int(port) for port in port_parts[0].split('.')]
devices = usb.core.find(find_all=True, bus=int(bus_num))
if devices:
for device in devices:
if device.bus == int(bus_num) and list(device.port_numbers) == ports: # type: ignore
return device
return None
def _find_hub_by_device_path(sys_path: str) -> Optional[UsbDevice]:
"""Finds the USB hub associated with a specific device path."""
hub_sys_path = sys_path.rsplit('.', 1)[0]
hub_device = _find_device_by_path(hub_sys_path)
if hub_device is None:
return None
else:
return hub_device if _is_hub(hub_device) else None
def _is_hub(device: UsbDevice) -> bool:
"""Checks if a USB device is a hub"""
if device.bDeviceClass == CLASS_HUB: # type: ignore
return True
for config in device:
for interface in config:
if interface.bInterfaceClass == CLASS_HUB: # type: ignore
return True
return False

View File

@@ -18,6 +18,7 @@
from __future__ import annotations
import asyncio
import logging
import socket
from .common import Transport, StreamPacketSource
@@ -28,6 +29,13 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# A pass-through function to ease mock testing.
async def _create_server(*args, **kw_args):
await asyncio.get_running_loop().create_server(*args, **kw_args)
async def open_tcp_server_transport(spec: str) -> Transport:
'''
Open a TCP server transport.
@@ -38,7 +46,22 @@ async def open_tcp_server_transport(spec: str) -> Transport:
Example: _:9001
'''
local_host, local_port = spec.split(':')
return await _open_tcp_server_transport_impl(
host=local_host if local_host != '_' else None, port=int(local_port)
)
async def open_tcp_server_transport_with_socket(sock: socket.socket) -> Transport:
'''
Open a TCP server transport with an existing socket.
One reason to use this variant is to let python pick an unused port.
'''
return await _open_tcp_server_transport_impl(sock=sock)
async def _open_tcp_server_transport_impl(**kwargs) -> Transport:
class TcpServerTransport(Transport):
async def close(self):
await super().close()
@@ -77,13 +100,10 @@ async def open_tcp_server_transport(spec: str) -> Transport:
else:
logger.debug('no client, dropping packet')
local_host, local_port = spec.split(':')
packet_source = StreamPacketSource()
packet_sink = TcpServerPacketSink()
await asyncio.get_running_loop().create_server(
lambda: TcpServerProtocol(packet_source, packet_sink),
host=local_host if local_host != '_' else None,
port=int(local_port),
await _create_server(
lambda: TcpServerProtocol(packet_source, packet_sink), **kwargs
)
return TcpServerTransport(packet_source, packet_sink)

View File

@@ -396,6 +396,16 @@ async def open_usb_transport(spec: str) -> Transport:
break
device_index -= 1
device.close()
elif '-' in spec:
def device_path(device):
return f'{device.getBusNumber()}-{".".join(map(str, device.getPortNumberList()))}'
for device in context.getDeviceIterator(skip_on_error=True):
if device_path(device) == spec:
found = device
break
device.close()
else:
# Look for a compatible device by index
def device_is_bluetooth_hci(device):
@@ -439,7 +449,7 @@ async def open_usb_transport(spec: str) -> Transport:
# Look for the first interface with the right class and endpoints
def find_endpoints(device):
# pylint: disable-next=too-many-nested-blocks
for (configuration_index, configuration) in enumerate(device):
for configuration_index, configuration in enumerate(device):
interface = None
for interface in configuration:
setting = None

View File

@@ -117,12 +117,12 @@ class EventWatcher:
self.handlers = []
@overload
def on(self, emitter: EventEmitter, event: str) -> Callable[[_Handler], _Handler]:
...
def on(
self, emitter: EventEmitter, event: str
) -> Callable[[_Handler], _Handler]: ...
@overload
def on(self, emitter: EventEmitter, event: str, handler: _Handler) -> _Handler:
...
def on(self, emitter: EventEmitter, event: str, handler: _Handler) -> _Handler: ...
def on(
self, emitter: EventEmitter, event: str, handler: Optional[_Handler] = None
@@ -144,12 +144,14 @@ class EventWatcher:
return wrapper if handler is None else wrapper(handler)
@overload
def once(self, emitter: EventEmitter, event: str) -> Callable[[_Handler], _Handler]:
...
def once(
self, emitter: EventEmitter, event: str
) -> Callable[[_Handler], _Handler]: ...
@overload
def once(self, emitter: EventEmitter, event: str, handler: _Handler) -> _Handler:
...
def once(
self, emitter: EventEmitter, event: str, handler: _Handler
) -> _Handler: ...
def once(
self, emitter: EventEmitter, event: str, handler: Optional[_Handler] = None
@@ -226,13 +228,13 @@ class CompositeEventEmitter(AbortableEventEmitter):
if self._listener:
# Call the deregistration methods for each base class that has them
for cls in self._listener.__class__.mro():
if hasattr(cls, '_bumble_register_composite'):
cls._bumble_deregister_composite(listener, self)
if '_bumble_register_composite' in cls.__dict__:
cls._bumble_deregister_composite(self._listener, self)
self._listener = listener
if listener:
# Call the registration methods for each base class that has them
for cls in listener.__class__.mro():
if hasattr(cls, '_bumble_deregister_composite'):
if '_bumble_deregister_composite' in cls.__dict__:
cls._bumble_register_composite(listener, self)

View File

@@ -12,12 +12,25 @@ a host that send custom HCI commands that the controller may not understand.
```
python hci_bridge.py <host-transport-spec> <controller-transport-spec> [command-short-circuit-list]
```
The command-short-circuit-list field is specified by a series of comma separated Opcode Group
Field (OGF) : OpCode Command Field (OCF) pairs. The OGF/OCF values are specified in the Blutooth
core specification.
For the commands that are listed in the short-circuit-list, the HCI bridge will always generate
a Command Complete Event for the specified op code. The return parameter will be HCI_SUCCESS.
This feature can only be used for commands that return Command Complete. Other events will not be
generated by the HCI bridge tool.
!!! example "UDP to Serial"
```
python hci_bridge.py udp:0.0.0.0:9000,127.0.0.1:9001 serial:/dev/tty.usbmodem0006839912171,1000000 0x3f:0x0070,0x3f:0x0074,0x3f:0x0077,0x3f:0x0078
```
In this example, the short circuit list is specified to respond to the Vendor-specific Opcode Group
Field (0x3f) commands 0x70, 0x74, 0x77, 0x78 with Command Complete. The short circuit list can be
used where the Host uses some HCI commands that are not supported/implemented by the Controller.
!!! example "PTY to Link Relay"
```
python hci_bridge.py serial:emulated_uart_pty,1000000 link-relay:ws://127.0.0.1:10723/test
@@ -28,3 +41,4 @@ a host that send custom HCI commands that the controller may not understand.
(through which the communication with other virtual controllers will be mediated).
NOTE: this assumes you're running a Link Relay on port `10723`.

View File

@@ -10,7 +10,7 @@ used with particular HCI controller.
When the transport for an HCI controller is instantiated from a transport name,
a driver may also be forced by specifying ``driver=<driver-name>`` in the optional
metadata portion of the transport name. For example,
``usb:[driver=-rtk]0`` indicates that the ``rtk`` driver should be used with the
``usb:[driver=rtk]0`` indicates that the ``rtk`` driver should be used with the
first USB device, even if a normal probe would not have selected it based on the
USB vendor ID and product ID.

View File

@@ -10,6 +10,7 @@ The moniker for a USB transport is either:
* `usb:<vendor>:<product>`
* `usb:<vendor>:<product>/<serial-number>`
* `usb:<vendor>:<product>#<index>`
* `usb:<bus>-<port_numbers>`
with `<index>` as a 0-based index (0 being the first one) to select amongst all the matching devices when there are more than one.
In the `usb:<index>` form, matching devices are the ones supporting Bluetooth HCI, as declared by their Class, Subclass and Protocol.
@@ -17,6 +18,8 @@ In the `usb:<vendor>:<product>#<index>` form, matching devices are the ones with
`<vendor>` and `<product>` are a vendor ID and product ID in hexadecimal.
with `<port_numbers>` as a list of all port numbers from root separated with dots `.`
In addition, if the moniker ends with the symbol "!", the device will be used in "forced" mode:
the first USB interface of the device will be used, regardless of the interface class/subclass.
This may be useful for some devices that use a custom class/subclass but may nonetheless work as-is.
@@ -37,6 +40,9 @@ This may be useful for some devices that use a custom class/subclass but may non
`usb:0B05:17CB!`
The BT USB dongle vendor=0B05 and product=17CB, in "forced" mode.
`usb:3-3.4.1`
The BT USB dongle on bus 3 on port path 3, 4, 1.
## Alternative
The library includes two different implementations of the USB transport, implemented using different python bindings for `libusb`.

View File

@@ -25,6 +25,7 @@ from bumble.utils import AsyncRunner
my_work_queue1 = AsyncRunner.WorkQueue()
my_work_queue2 = AsyncRunner.WorkQueue(create_task=False)
# -----------------------------------------------------------------------------
@AsyncRunner.run_in_task()
async def func1(x, y):
@@ -60,7 +61,7 @@ async def func4(x, y):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
print("MAIN: start, loop=", asyncio.get_running_loop())
print("MAIN: invoke func1")
func1(1, 2)

View File

@@ -21,23 +21,29 @@ import os
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport
from bumble.profiles.battery_service import BatteryServiceProxy
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: battery_client.py <transport-spec> <bluetooth-address>')
print('example: battery_client.py usb:0 E1:CA:72:48:C4:E8')
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')
# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
await device.power_on()
# Connect to the peer

View File

@@ -29,14 +29,16 @@ from bumble.profiles.battery_service import BatteryService
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: python battery_server.py <device-config> <transport-spec>')
print('example: python battery_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Add a Battery Service to the GATT sever
battery_service = BatteryService(lambda _: random.randint(0, 100))

View File

@@ -21,12 +21,13 @@ import os
import logging
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.hci import Address
from bumble.profiles.device_information_service import DeviceInformationServiceProxy
from bumble.transport import open_transport
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print(
'Usage: device_information_client.py <transport-spec> <bluetooth-address>'
@@ -35,11 +36,16 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')
# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
await device.power_on()
# Connect to the peer

View File

@@ -28,14 +28,16 @@ from bumble.profiles.device_information_service import DeviceInformationService
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: python device_info_server.py <device-config> <transport-spec>')
print('example: python device_info_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Add a Device Information Service to the GATT sever
device_information_service = DeviceInformationService(
@@ -64,7 +66,7 @@ async def main():
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -21,23 +21,29 @@ import os
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport
from bumble.profiles.heart_rate_service import HeartRateServiceProxy
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: heart_rate_client.py <transport-spec> <bluetooth-address>')
print('example: heart_rate_client.py usb:0 E1:CA:72:48:C4:E8')
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')
# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
await device.power_on()
# Connect to the peer

View File

@@ -33,14 +33,16 @@ from bumble.utils import AsyncRunner
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: python heart_rate_server.py <device-config> <transport-spec>')
print('example: python heart_rate_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Keep track of accumulated expended energy
energy_start_time = time.time()

View File

@@ -1,79 +1,132 @@
<html>
<head>
<style>
* {
font-family: sans-serif;
}
<html data-bs-theme="dark">
label {
display: block;
}
<head>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet"
integrity="sha384-T3c6CoIi6uLrA9TneNEoa7RxnatzjcDSCmG1MXxSR1GAsXEV/Dwwykc2MPK8M2HN" crossorigin="anonymous">
</head>
<body>
<nav class="navbar navbar-dark bg-primary">
<div class="container">
<span class="navbar-brand mb-0 h1">Bumble Handsfree</span>
</div>
</nav>
<br>
<div class="container">
<label class="form-label">Server Port</label>
<div class="input-group mb-3">
<input type="text" class="form-control" aria-label="Port Number" value="8989" id="port">
<button class="btn btn-primary" type="button" onclick="connect()">Connect</button>
</div>
<label class="form-label">Dial Phone Number</label>
<div class="input-group mb-3">
<input type="text" class="form-control" placeholder="Phone Number" aria-label="Phone Number"
id="dial_number">
<button class="btn btn-primary" type="button"
onclick="send_at_command(`ATD${dialNumberInput.value}`)">Dial</button>
</div>
<label class="form-label">Send AT Command</label>
<div class="input-group mb-3">
<input type="text" class="form-control" placeholder="AT Command" aria-label="AT command" id="at_command">
<button class="btn btn-primary" type="button"
onclick="send_at_command(document.getElementById('at_command').value)">Send</button>
</div>
<div class="row">
<div class="col-auto">
<label class="form-label">Battery Level</label>
<div class="input-group mb-3">
<input type="text" class="form-control" placeholder="0 - 100" aria-label="Battery Level"
id="battery_level">
<button class="btn btn-primary" type="button"
onclick="send_at_command(`AT+BIEV=2,${document.getElementById('battery_level').value}`)">Set</button>
</div>
</div>
<div class="col-auto">
<label class="form-label">Speaker Volume</label>
<div class="input-group mb-3 col-auto">
<input type="text" class="form-control" placeholder="0 - 15" aria-label="Speaker Volume"
id="speaker_volume">
<button class="btn btn-primary" type="button"
onclick="send_at_command(`AT+VGS=${document.getElementById('speaker_volume').value}`)">Set</button>
</div>
</div>
<div class="col-auto">
<label class="form-label">Mic Volume</label>
<div class="input-group mb-3 col-auto">
<input type="text" class="form-control" placeholder="0 - 15" aria-label="Mic Volume"
id="mic_volume">
<button class="btn btn-primary" type="button"
onclick="send_at_command(`AT+VGM=${document.getElementById('mic_volume').value}`)">Set</button>
</div>
</div>
</div>
<button class="btn btn-primary" onclick="send_at_command('ATA')">Answer</button>
<button class="btn btn-primary" onclick="send_at_command('AT+CHUP')">Hang Up</button>
<button class="btn btn-primary" onclick="send_at_command('AT+BLDN')">Redial</button>
<button class="btn btn-primary" onclick="send({ type: 'query_call'})">Get Call Status</button>
<br><br>
<button class="btn btn-primary" onclick="send_at_command('AT+BVRA=1')">Start Voice Assistant</button>
<button class="btn btn-primary" onclick="send_at_command('AT+BVRA=0')">Stop Voice Assistant</button>
input, label {
margin: .4rem 0;
}
</style>
</head>
<body>
Server Port <input id="port" type="text" value="8989"></input> <button onclick="connect()">Connect</button><br>
AT Command <input type="text" id="at_command" required size="10"> <button onclick="send_at_command()">Send</button><br>
Dial Phone Number <input type="text" id="dial_number" required size="10"> <button onclick="dial()">Dial</button><br>
<button onclick="answer()">Answer</button>
<button onclick="hangup()">Hang Up</button>
<button onclick="start_voice_assistant()">Start Voice Assistant</button>
<button onclick="stop_voice_assistant()">Stop Voice Assistant</button>
<hr>
<div id="socketState"></div>
<script>
<div id="socketStateContainer" class="bg-body-tertiary p-3 rounded-2">
<h3>Log</h3>
<code id="log" style="white-space: pre-line;"></code>
</div>
</div>
<script>
let portInput = document.getElementById("port")
let atCommandInput = document.getElementById("at_command")
let dialNumberInput = document.getElementById("dial_number")
let socketState = document.getElementById("socketState")
let log = document.getElementById("log")
let socket
function connect() {
socket = new WebSocket(`ws://localhost:${portInput.value}`);
socket.onopen = _ => {
socketState.innerText = 'OPEN'
log.textContent += 'OPEN\n'
}
socket.onclose = _ => {
socketState.innerText = 'CLOSED'
log.textContent += 'CLOSED\n'
}
socket.onerror = (error) => {
socketState.innerText = 'ERROR'
log.textContent += 'ERROR\n'
console.log(`ERROR: ${error}`)
}
socket.onmessage = (event) => {
log.textContent += `<-- ${event.data}\n`
let volume_state = JSON.parse(event.data)
volumeSetting.value = volume_state.volume_setting
changeCounter.value = volume_state.change_counter
muted.checked = volume_state.muted ? true : false
}
}
function send(message) {
if (socket && socket.readyState == WebSocket.OPEN) {
socket.send(JSON.stringify(message))
let jsonMessage = JSON.stringify(message)
log.textContent += `--> ${jsonMessage}\n`
socket.send(jsonMessage)
} else {
log.textContent += 'NOT CONNECTED\n'
}
}
function send_at_command() {
send({ type:'at_command', command: atCommandInput.value })
function send_at_command(command) {
send({ type: 'at_command', 'command': command })
}
</script>
</div>
</body>
function answer() {
send({ type:'at_command', command: 'ATA' })
}
function hangup() {
send({ type:'at_command', command: 'AT+CHUP' })
}
function dial() {
send({ type:'at_command', command: `ATD${dialNumberInput.value}` })
}
function start_voice_assistant() {
send(({ type:'at_command', command: 'AT+BVRA=1' }))
}
function stop_voice_assistant() {
send(({ type:'at_command', command: 'AT+BVRA=0' }))
}
</script>
</body>
</html>
</html>

View File

@@ -416,7 +416,7 @@ async def keyboard_device(device, command):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: python keyboard.py <device-config> <transport-spec> <command>'
@@ -434,9 +434,11 @@ async def main():
)
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
# Create a device to manage the host
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
command = sys.argv[3]
if command == 'connect':

View File

@@ -2,5 +2,6 @@
"name": "Bumble-LEA",
"keystore": "JsonKeyStore",
"address": "F0:F1:F2:F3:F4:FA",
"class_of_device": 2376708,
"advertising_interval": 100
}

View File

@@ -0,0 +1,9 @@
{
"name": "Bumble-LEA",
"keystore": "JsonKeyStore",
"address": "F0:F1:F2:F3:F4:FA",
"classic_enabled": true,
"cis_enabled": true,
"class_of_device": 2376708,
"advertising_interval": 100
}

View File

@@ -139,18 +139,20 @@ async def find_a2dp_service(connection):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print('Usage: run_a2dp_info.py <device-config> <transport-spec> <bt-addr>')
print('example: run_a2dp_info.py classic1.json usb:0 14:7D:DA:4E:53:A8')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Start the controller
@@ -187,7 +189,7 @@ async def main():
client = await AVDTP_Protocol.connect(connection, avdtp_version)
# Discover all endpoints on the remote device
endpoints = await client.discover_remote_endpoints()
endpoints = list(await client.discover_remote_endpoints())
print(f'@@@ Found {len(endpoints)} endpoints')
for endpoint in endpoints:
print('@@@', endpoint)

View File

@@ -19,6 +19,7 @@ import asyncio
import sys
import os
import logging
from typing import Any, Dict
from bumble.device import Device
from bumble.transport import open_transport_or_link
@@ -41,7 +42,7 @@ from bumble.a2dp import (
SbcMediaCodecInformation,
)
Context = {'output': None}
Context: Dict[Any, Any] = {'output': None}
# -----------------------------------------------------------------------------
@@ -104,7 +105,7 @@ def on_rtp_packet(packet):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: run_a2dp_sink.py <device-config> <transport-spec> <sbc-file> '
@@ -114,14 +115,16 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
with open(sys.argv[3], 'wb') as sbc_file:
Context['output'] = sbc_file
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Setup the SDP to expose the sink service
@@ -162,7 +165,7 @@ async def main():
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -74,7 +74,7 @@ def codec_capabilities():
# -----------------------------------------------------------------------------
def on_avdtp_connection(read_function, protocol):
packet_source = SbcPacketSource(
read_function, protocol.l2cap_channel.mtu, codec_capabilities()
read_function, protocol.l2cap_channel.peer_mtu, codec_capabilities()
)
packet_pump = MediaPacketPump(packet_source.packets)
protocol.add_source(packet_source.codec_capabilities, packet_pump)
@@ -98,7 +98,7 @@ async def stream_packets(read_function, protocol):
# Stream the packets
packet_source = SbcPacketSource(
read_function, protocol.l2cap_channel.mtu, codec_capabilities()
read_function, protocol.l2cap_channel.peer_mtu, codec_capabilities()
)
packet_pump = MediaPacketPump(packet_source.packets)
source = protocol.add_source(packet_source.codec_capabilities, packet_pump)
@@ -114,7 +114,7 @@ async def stream_packets(read_function, protocol):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: run_a2dp_source.py <device-config> <transport-spec> <sbc-file> '
@@ -126,11 +126,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Setup the SDP to expose the SRC service
@@ -186,7 +188,7 @@ async def main():
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -19,14 +19,16 @@ import asyncio
import logging
import sys
import os
import struct
from bumble.core import AdvertisingData
from bumble.device import AdvertisingType, Device
from bumble.hci import Address
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_advertiser.py <config-file> <transport-spec> [type] [address]'
@@ -48,13 +50,25 @@ async def main():
target = None
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
if advertising_type.is_scannable:
device.scan_response_data = bytes(
AdvertisingData(
[
(AdvertisingData.APPEARANCE, struct.pack('<H', 0x0340)),
]
)
)
await device.power_on()
await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -49,7 +49,7 @@ ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID(
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 4:
print(
'Usage: python run_asha_sink.py <device-config> <transport-spec> '
@@ -60,8 +60,10 @@ async def main():
audio_out = open(sys.argv[3], 'wb')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Handler for audio control commands
def on_audio_control_point_write(_connection, value):
@@ -197,7 +199,7 @@ async def main():
await device.power_on()
await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -331,7 +331,7 @@ class Delegate(avrcp.Delegate):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_avrcp_controller.py <device-config> <transport-spec> '
@@ -341,11 +341,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Setup the SDP to expose the sink service

View File

@@ -22,10 +22,11 @@ import os
from bumble.device import (
Device,
Connection,
AdvertisingParameters,
AdvertisingEventProperties,
)
from bumble.hci import (
OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.transport import open_transport_or_link
@@ -61,12 +62,7 @@ async def main() -> None:
devices[1].cis_enabled = True
await asyncio.gather(*[device.power_on() for device in devices])
await devices[0].start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.PUBLIC,
)
advertising_set = await devices[0].create_advertising_set()
connection = await devices[1].connect(
devices[0].public_address, own_address_type=OwnAddressType.PUBLIC

View File

@@ -32,7 +32,7 @@ from bumble.sdp import (
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_classic_connect.py <device-config> <transport-spec> '
@@ -42,11 +42,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
device.le_enabled = False
await device.power_on()

View File

@@ -91,18 +91,20 @@ SDP_SERVICE_RECORDS = {
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_classic_discoverable.py <device-config> <transport-spec>')
print('example: run_classic_discoverable.py classic1.json usb:04b4:f901')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
device.sdp_service_records = SDP_SERVICE_RECORDS
await device.power_on()
@@ -111,7 +113,7 @@ async def main():
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -20,8 +20,8 @@ import sys
import os
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport_or_link
from bumble.core import DeviceClass
@@ -53,22 +53,27 @@ class DiscoveryListener(Device.Listener):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 2:
print('Usage: run_classic_discovery.py <transport-spec>')
print('example: run_classic_discovery.py usb:04b4:f901')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
print('<<< connected')
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
device.listener = DiscoveryListener()
await device.power_on()
await device.start_discovery()
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -25,7 +25,7 @@ from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_connect_and_encrypt.py <device-config> <transport-spec> '
@@ -37,11 +37,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
# Connect to the peer
@@ -56,7 +58,7 @@ async def main():
print(f'!!! Encryption failed: {error}')
return
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -36,7 +36,7 @@ from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 4:
print(
'Usage: run_controller.py <controller-address> <device-config> '
@@ -49,7 +49,7 @@ async def main():
return
print('>>> connecting to HCI...')
async with await open_transport_or_link(sys.argv[3]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[3]) as hci_transport:
print('>>> connected')
# Create a local link
@@ -57,7 +57,10 @@ async def main():
# Create a first controller using the packet source/sink as its host interface
controller1 = Controller(
'C1', host_source=hci_source, host_sink=hci_sink, link=link
'C1',
host_source=hci_transport.source,
host_sink=hci_transport.sink,
link=link,
)
controller1.random_address = sys.argv[1]
@@ -98,7 +101,7 @@ async def main():
await device.start_advertising()
await device.start_scanning()
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -20,9 +20,9 @@ import asyncio
import sys
import os
from bumble.colors import color
from bumble.device import Device
from bumble.controller import Controller
from bumble.hci import Address
from bumble.link import LocalLink
from bumble.transport import open_transport_or_link
@@ -45,14 +45,14 @@ class ScannerListener(Device.Listener):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 2:
print('Usage: run_controller.py <transport-spec>')
print('example: run_controller_with_scanner.py serial:/dev/pts/14,1000000')
return
print('>>> connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
print('>>> connected')
# Create a local link
@@ -60,22 +60,25 @@ async def main():
# Create a first controller using the packet source/sink as its host interface
controller1 = Controller(
'C1', host_source=hci_source, host_sink=hci_sink, link=link
'C1',
host_source=hci_transport.source,
host_sink=hci_transport.sink,
link=link,
public_address='E0:E1:E2:E3:E4:E5',
)
controller1.address = 'E0:E1:E2:E3:E4:E5'
# Create a second controller using the same link
controller2 = Controller('C2', link=link)
# Create a device with a scanner listener
device = Device.with_hci(
'Bumble', 'F0:F1:F2:F3:F4:F5', controller2, controller2
'Bumble', Address('F0:F1:F2:F3:F4:F5'), controller2, controller2
)
device.listener = ScannerListener()
await device.power_on()
await device.start_scanning()
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -98,13 +98,7 @@ async def main() -> None:
)
+ csis.get_advertising_data()
)
await device.start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.RANDOM,
advertising_data=advertising_data,
)
await device.create_advertising_set(advertising_data=advertising_data)
await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports]

View File

@@ -20,30 +20,36 @@ import sys
import os
import logging
from bumble.colors import color
from bumble.hci import Address
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.snoop import BtSnooper
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: run_device_with_snooper.py <transport-spec> <snoop-file>')
print('example: run_device_with_snooper.py usb:0 btsnoop.log')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
print('<<< connected')
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
with open(sys.argv[2], "wb") as snoop_file:
device.host.snooper = BtSnooper(snoop_file)
await device.power_on()
await device.start_scanning()
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -19,8 +19,13 @@ import asyncio
import logging
import sys
import os
from bumble.device import AdvertisingType, Device
from bumble.hci import Address, HCI_LE_Set_Extended_Advertising_Parameters_Command
from bumble.device import (
AdvertisingParameters,
AdvertisingEventProperties,
AdvertisingType,
Device,
)
from bumble.hci import Address
from bumble.transport import open_transport_or_link
@@ -35,20 +40,16 @@ async def main() -> None:
return
if len(sys.argv) >= 4:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(
int(sys.argv[3])
)
advertising_properties = AdvertisingEventProperties.from_advertising_type(
AdvertisingType(int(sys.argv[3]))
)
else:
advertising_properties = (
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
)
advertising_properties = AdvertisingEventProperties()
if len(sys.argv) >= 5:
target = Address(sys.argv[4])
peer_address = Address(sys.argv[4])
else:
target = Address.ANY
peer_address = Address.ANY
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
@@ -58,8 +59,11 @@ async def main() -> None:
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
await device.start_extended_advertising(
advertising_properties=advertising_properties, target=target
await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(
advertising_event_properties=advertising_properties,
peer_address=peer_address,
)
)
await hci_transport.source.terminated

View File

@@ -0,0 +1,99 @@
# Copyright 2021-2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import AdvertisingParameters, AdvertisingEventProperties, Device
from bumble.hci import Address
from bumble.core import AdvertisingData
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_extended_advertiser_2.py <config-file> <transport-spec>')
print('example: run_extended_advertiser_2.py device1.json usb:0')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
if not device.supports_le_extended_advertising:
print("Device does not support extended advertising")
return
print("Max advertising sets:", device.host.number_of_supported_advertising_sets)
print(
"Max advertising data length:", device.host.maximum_advertising_data_length
)
if device.host.number_of_supported_advertising_sets >= 1:
advertising_data1 = AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 1".encode("utf-8"))]
)
set1 = await device.create_advertising_set(
advertising_data=bytes(advertising_data1),
)
print("Selected TX power 1:", set1.selected_tx_power)
advertising_data2 = AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 2".encode("utf-8"))]
)
if device.host.number_of_supported_advertising_sets >= 2:
set2 = await device.create_advertising_set(
random_address=Address("F0:F0:F0:F0:F0:F1"),
advertising_parameters=AdvertisingParameters(),
advertising_data=bytes(advertising_data2),
auto_start=False,
auto_restart=True,
)
print("Selected TX power 2:", set2.selected_tx_power)
await set2.start()
if device.host.number_of_supported_advertising_sets >= 3:
scan_response_data3 = AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, "Bumble 3".encode("utf-8"))]
)
set3 = await device.create_advertising_set(
random_address=Address("F0:F0:F0:F0:F0:F2"),
advertising_parameters=AdvertisingParameters(
advertising_event_properties=AdvertisingEventProperties(
is_connectable=False, is_scannable=True
)
),
scan_response_data=bytes(scan_response_data3),
)
print("Selected TX power 3:", set2.selected_tx_power)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -69,7 +69,7 @@ class Listener(Device.Listener):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_gatt_client.py <device-config> <transport-spec> '
@@ -79,11 +79,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device to manage the host, with a custom listener
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.listener = Listener(device)
await device.power_on()

View File

@@ -19,21 +19,21 @@ import asyncio
import os
import logging
from bumble.colors import color
from bumble.core import ProtocolError
from bumble.controller import Controller
from bumble.device import Device, Peer
from bumble.hci import Address
from bumble.host import Host
from bumble.link import LocalLink
from bumble.gatt import (
Service,
Characteristic,
Descriptor,
show_services,
GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
GATT_DEVICE_INFORMATION_SERVICE,
)
from bumble.gatt_client import show_services
# -----------------------------------------------------------------------------
@@ -43,7 +43,7 @@ class ServerListener(Device.Listener):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
# Create a local link
link = LocalLink()
@@ -51,14 +51,18 @@ async def main():
client_controller = Controller("client controller", link=link)
client_host = Host()
client_host.controller = client_controller
client_device = Device("client", address='F0:F1:F2:F3:F4:F5', host=client_host)
client_device = Device(
"client", address=Address('F0:F1:F2:F3:F4:F5'), host=client_host
)
await client_device.power_on()
# Setup a stack for the server
server_controller = Controller("server controller", link=link)
server_host = Host()
server_host.controller = server_controller
server_device = Device("server", address='F6:F7:F8:F9:FA:FB', host=server_host)
server_device = Device(
"server", address=Address('F6:F7:F8:F9:FA:FB'), host=server_host
)
server_device.listener = ServerListener()
await server_device.power_on()

View File

@@ -71,7 +71,7 @@ def my_custom_write_with_error(connection, value):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_gatt_server.py <device-config> <transport-spec> '
@@ -81,11 +81,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device to manage the host
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.listener = Listener(device)
# Add a few entries to the device's GATT server
@@ -146,7 +148,7 @@ async def main():
else:
await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -20,123 +20,48 @@ import sys
import os
import logging
from bumble.colors import color
import bumble.core
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.core import (
BT_HANDSFREE_SERVICE,
BT_RFCOMM_PROTOCOL_ID,
BT_BR_EDR_TRANSPORT,
)
from bumble import rfcomm, hfp
from bumble.hci import HCI_SynchronousDataPacket
from bumble.sdp import (
Client as SDP_Client,
DataElement,
ServiceAttribute,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
)
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# pylint: disable-next=too-many-nested-blocks
async def list_rfcomm_channels(device, connection):
# Connect to the SDP Server
sdp_client = SDP_Client(connection)
await sdp_client.connect()
# Search for services that support the Handsfree Profile
search_result = await sdp_client.search_attributes(
[BT_HANDSFREE_SERVICE],
[
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
def _default_configuration() -> hfp.AgConfiguration:
return hfp.AgConfiguration(
supported_ag_features=[
hfp.AgFeature.HF_INDICATORS,
hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY,
hfp.AgFeature.REJECT_CALL,
hfp.AgFeature.CODEC_NEGOTIATION,
hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
],
supported_ag_indicators=[
hfp.AgIndicatorState.call(),
hfp.AgIndicatorState.service(),
hfp.AgIndicatorState.callsetup(),
hfp.AgIndicatorState.callsetup(),
hfp.AgIndicatorState.signal(),
hfp.AgIndicatorState.roam(),
hfp.AgIndicatorState.battchg(),
],
supported_hf_indicators=[
hfp.HfIndicator.ENHANCED_SAFETY,
hfp.HfIndicator.BATTERY_LEVEL,
],
supported_ag_call_hold_operations=[],
supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
)
print(color('==================================', 'blue'))
print(color('Handsfree Services:', 'yellow'))
rfcomm_channels = []
# pylint: disable-next=too-many-nested-blocks
for attribute_list in search_result:
# Look for the RFCOMM Channel number
protocol_descriptor_list = ServiceAttribute.find_attribute_in_list(
attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
)
if protocol_descriptor_list:
for protocol_descriptor in protocol_descriptor_list.value:
if len(protocol_descriptor.value) >= 2:
if protocol_descriptor.value[0].value == BT_RFCOMM_PROTOCOL_ID:
print(color('SERVICE:', 'green'))
print(
color(' RFCOMM Channel:', 'cyan'),
protocol_descriptor.value[1].value,
)
rfcomm_channels.append(protocol_descriptor.value[1].value)
# List profiles
bluetooth_profile_descriptor_list = (
ServiceAttribute.find_attribute_in_list(
attribute_list,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
)
)
if bluetooth_profile_descriptor_list:
if bluetooth_profile_descriptor_list.value:
if (
bluetooth_profile_descriptor_list.value[0].type
== DataElement.SEQUENCE
):
bluetooth_profile_descriptors = (
bluetooth_profile_descriptor_list.value
)
else:
# Sometimes, instead of a list of lists, we just
# find a list. Fix that
bluetooth_profile_descriptors = [
bluetooth_profile_descriptor_list
]
print(color(' Profiles:', 'green'))
for (
bluetooth_profile_descriptor
) in bluetooth_profile_descriptors:
version_major = (
bluetooth_profile_descriptor.value[1].value >> 8
)
version_minor = (
bluetooth_profile_descriptor.value[1].value
& 0xFF
)
print(
' '
f'{bluetooth_profile_descriptor.value[0].value}'
f' - version {version_major}.{version_minor}'
)
# List service classes
service_class_id_list = ServiceAttribute.find_attribute_in_list(
attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID
)
if service_class_id_list:
if service_class_id_list.value:
print(color(' Service Classes:', 'green'))
for service_class_id in service_class_id_list.value:
print(' ', service_class_id.value)
await sdp_client.disconnect()
return rfcomm_channels
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: run_hfp_gateway.py <device-config> <transport-spec> '
@@ -149,11 +74,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
await device.power_on()
@@ -164,13 +91,14 @@ async def main():
print(f'=== Connected to {connection.peer_address}!')
# Get a list of all the Handsfree services (should only be 1)
channels = await list_rfcomm_channels(device, connection)
if len(channels) == 0:
if not (hfp_record := await hfp.find_hf_sdp_record(connection)):
print('!!! no service found')
return
# Pick the first one
channel = channels[0]
channel, version, hf_sdp_features = hfp_record
print(f'HF version: {version}')
print(f'HF features: {hf_sdp_features}')
# Request authentication
print('*** Authenticating...')
@@ -205,51 +133,9 @@ async def main():
device.host.on('sco_packet', on_sco)
# Protocol loop (just for testing at this point)
protocol = hfp.HfpProtocol(session)
while True:
line = await protocol.next_line()
ag_protocol = hfp.AgProtocol(session, _default_configuration())
if line.startswith('AT+BRSF='):
protocol.send_response_line('+BRSF: 30')
protocol.send_response_line('OK')
elif line.startswith('AT+CIND=?'):
protocol.send_response_line(
'+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
'("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
'("callheld",(0-2))'
)
protocol.send_response_line('OK')
elif line.startswith('AT+CIND?'):
protocol.send_response_line('+CIND: 0,0,1,4,1,5,0')
protocol.send_response_line('OK')
elif line.startswith('AT+CMER='):
protocol.send_response_line('OK')
elif line.startswith('AT+CHLD=?'):
protocol.send_response_line('+CHLD: 0')
protocol.send_response_line('OK')
elif line.startswith('AT+BTRH?'):
protocol.send_response_line('+BTRH: 0')
protocol.send_response_line('OK')
elif line.startswith('AT+CLIP='):
protocol.send_response_line('OK')
elif line.startswith('AT+VGS='):
protocol.send_response_line('OK')
elif line.startswith('AT+BIA='):
protocol.send_response_line('OK')
elif line.startswith('AT+BVRA='):
protocol.send_response_line(
'+BVRA: 1,1,12AA,1,1,"Message 1 from Janina"'
)
elif line.startswith('AT+XEVENT='):
protocol.send_response_line('OK')
elif line.startswith('AT+XAPL='):
protocol.send_response_line('OK')
else:
print(color('UNSUPPORTED AT COMMAND', 'red'))
protocol.send_response_line('ERROR')
await hci_source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -16,68 +16,87 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import contextlib
import sys
import os
import logging
import json
import websockets
import functools
from typing import Optional
from bumble.device import Device
from bumble import rfcomm
from bumble import hci
from bumble.device import Device, Connection
from bumble.transport import open_transport_or_link
from bumble.rfcomm import Server as RfcommServer
from bumble import hfp
from bumble.hfp import HfProtocol
# -----------------------------------------------------------------------------
class UiServer:
protocol: Optional[HfProtocol] = None
async def start(self):
"""Start a Websocket server to receive events from a web page."""
async def serve(websocket, _path):
while True:
try:
message = await websocket.recv()
print('Received: ', str(message))
parsed = json.loads(message)
message_type = parsed['type']
if message_type == 'at_command':
if self.protocol is not None:
await self.protocol.execute_command(parsed['command'])
except websockets.exceptions.ConnectionClosedOK:
pass
# pylint: disable=no-member
await websockets.serve(serve, 'localhost', 8989)
ws: Optional[websockets.WebSocketServerProtocol] = None
hf_protocol: Optional[HfProtocol] = None
# -----------------------------------------------------------------------------
def on_dlc(dlc, configuration: hfp.Configuration):
def on_dlc(dlc: rfcomm.DLC, configuration: hfp.HfConfiguration):
print('*** DLC connected', dlc)
protocol = HfProtocol(dlc, configuration)
UiServer.protocol = protocol
asyncio.create_task(protocol.run())
global hf_protocol
hf_protocol = HfProtocol(dlc, configuration)
asyncio.create_task(hf_protocol.run())
def on_sco_request(connection: Connection, link_type: int, protocol: HfProtocol):
if connection == protocol.dlc.multiplexer.l2cap_channel.connection:
if link_type == hci.HCI_Connection_Complete_Event.SCO_LINK_TYPE:
esco_parameters = hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.SCO_CVSD_D1
]
elif protocol.active_codec == hfp.AudioCodec.MSBC:
esco_parameters = hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.ESCO_MSBC_T2
]
elif protocol.active_codec == hfp.AudioCodec.CVSD:
esco_parameters = hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.ESCO_CVSD_S4
]
connection.abort_on(
'disconnection',
connection.device.send_command(
hci.HCI_Enhanced_Accept_Synchronous_Connection_Request_Command(
bd_addr=connection.peer_address, **esco_parameters.asdict()
)
),
)
handler = functools.partial(on_sco_request, protocol=hf_protocol)
dlc.multiplexer.l2cap_channel.connection.device.on('sco_request', handler)
dlc.multiplexer.l2cap_channel.once(
'close',
lambda: dlc.multiplexer.l2cap_channel.connection.device.remove_listener(
'sco_request', handler
),
)
def on_ag_indicator(indicator):
global ws
if ws:
asyncio.create_task(ws.send(str(indicator)))
hf_protocol.on('ag_indicator', on_ag_indicator)
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_classic_hfp.py <device-config> <transport-spec>')
print('example: run_classic_hfp.py classic2.json usb:04b4:f901')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Hands-Free profile configuration.
# TODO: load configuration from file.
configuration = hfp.Configuration(
configuration = hfp.HfConfiguration(
supported_hf_features=[
hfp.HfFeature.THREE_WAY_CALLING,
hfp.HfFeature.REMOTE_VOLUME_CONTROL,
@@ -97,11 +116,13 @@ async def main():
)
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Create and register a server
rfcomm_server = RfcommServer(device)
rfcomm_server = rfcomm.Server(device)
# Listen for incoming DLC connections
channel_number = rfcomm_server.listen(lambda dlc: on_dlc(dlc, configuration))
@@ -109,7 +130,9 @@ async def main():
# Advertise the HFP RFComm channel in the SDP
device.sdp_service_records = {
0x00010001: hfp.sdp_records(0x00010001, channel_number, configuration)
0x00010001: hfp.make_hf_sdp_records(
0x00010001, channel_number, configuration
)
}
# Let's go!
@@ -120,10 +143,32 @@ async def main():
await device.set_connectable(True)
# Start the UI websocket server to offer a few buttons and input boxes
ui_server = UiServer()
await ui_server.start()
async def serve(websocket: websockets.WebSocketServerProtocol, _path):
global ws
ws = websocket
async for message in websocket:
with contextlib.suppress(websockets.exceptions.ConnectionClosedOK):
print('Received: ', str(message))
await hci_source.wait_for_termination()
parsed = json.loads(message)
message_type = parsed['type']
if message_type == 'at_command':
if hf_protocol is not None:
response = str(
await hf_protocol.execute_command(
parsed['command'],
response_type=hfp.AtResponseType.MULTIPLE,
)
)
await websocket.send(response)
elif message_type == 'query_call':
if hf_protocol:
response = str(await hf_protocol.query_current_calls())
await websocket.send(response)
await websockets.serve(serve, 'localhost', 8989)
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -229,6 +229,7 @@ HID_REPORT_MAP = bytes( # Text String, 50 Octet Report Descriptor
# Default protocol mode set to report protocol
protocol_mode = Message.ProtocolMode.REPORT_PROTOCOL
# -----------------------------------------------------------------------------
def sdp_records():
service_record_handle = 0x00010002
@@ -427,6 +428,7 @@ class DeviceData:
# Device's live data - Mouse and Keyboard will be stored in this
deviceData = DeviceData()
# -----------------------------------------------------------------------------
async def keyboard_device(hid_device):
@@ -487,7 +489,7 @@ async def keyboard_device(hid_device):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: python run_hid_device.py <device-config> <transport-spec> <command>'
@@ -599,11 +601,13 @@ async def main():
asyncio.create_task(handle_virtual_cable_unplug())
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Create and register HID device
@@ -740,7 +744,7 @@ async def main():
print("Executing in Web mode")
await keyboard_device(hid_device)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -275,7 +275,7 @@ async def get_stream_reader(pipe) -> asyncio.StreamReader:
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: run_hid_host.py <device-config> <transport-spec> '
@@ -324,11 +324,13 @@ async def main():
asyncio.create_task(handle_virtual_cable_unplug())
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< CONNECTED')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Create HID host and start it
@@ -557,7 +559,7 @@ async def main():
# Interrupt Channel
await hid_host.connect_interrupt_channel()
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -57,18 +57,20 @@ def on_my_characteristic_subscription(peer, enabled):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_notifier.py <device-config> <transport-spec>')
print('example: run_notifier.py device1.json usb:0')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device to manage the host
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.listener = Listener(device)
# Add a few entries to the device's GATT server

View File

@@ -165,7 +165,7 @@ async def tcp_server(tcp_port, rfcomm_session):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 5:
print(
'Usage: run_rfcomm_client.py <device-config> <transport-spec> '
@@ -178,11 +178,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
await device.power_on()
@@ -192,8 +194,8 @@ async def main():
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
print(f'=== Connected to {connection.peer_address}!')
channel = sys.argv[4]
if channel == 'discover':
channel_str = sys.argv[4]
if channel_str == 'discover':
await list_rfcomm_channels(connection)
return
@@ -213,7 +215,7 @@ async def main():
rfcomm_mux = await rfcomm_client.start()
print('@@@ Started')
channel = int(channel)
channel = int(channel_str)
print(f'### Opening session for channel {channel}...')
try:
session = await rfcomm_mux.open_dlc(channel)
@@ -229,7 +231,7 @@ async def main():
tcp_port = int(sys.argv[5])
asyncio.create_task(tcp_server(tcp_port, session))
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -107,7 +107,7 @@ class TcpServer:
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: run_rfcomm_server.py <device-config> <transport-spec> '
@@ -124,11 +124,13 @@ async def main():
uuid = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Create a TCP server
@@ -153,7 +155,7 @@ async def main():
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -20,27 +20,31 @@ import sys
import os
import logging
from bumble.colors import color
from bumble.hci import Address
from bumble.device import Device
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 2:
print('Usage: run_scanner.py <transport-spec> [filter]')
print('example: run_scanner.py usb:0')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
print('<<< connected')
filter_duplicates = len(sys.argv) == 3 and sys.argv[2] == 'filter'
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
@device.on('advertisement')
def _(advertisement):
def on_adv(advertisement):
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[
advertisement.address.address_type
]
@@ -67,10 +71,11 @@ async def main():
f'{advertisement.data.to_string(separator)}'
)
device.on('advertisement', on_adv)
await device.power_on()
await device.start_scanning(filter_duplicates=filter_duplicates)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -26,11 +26,10 @@ from bumble.device import Device, CisLink
from bumble.hci import (
CodecID,
CodingFormat,
OwnAddressType,
HCI_IsoDataPacket,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.profiles.bap import (
UnicastServerAdvertisingData,
CodecSpecificCapabilities,
ContextType,
AudioLocation,
@@ -99,14 +98,14 @@ async def main() -> None:
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_24000
SupportedSamplingFrequency.FREQ_48000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=60,
max_octets_per_codec_frame=60,
min_octets_per_codec_frame=120,
max_octets_per_codec_frame=120,
supported_max_codec_frames_per_sdu=1,
),
),
@@ -142,6 +141,7 @@ async def main() -> None:
)
)
+ csis.get_advertising_data()
+ bytes(UnicastServerAdvertisingData())
)
subprocess = await asyncio.create_subprocess_shell(
f'dlc3 | ffplay pipe:0',
@@ -159,7 +159,7 @@ async def main() -> None:
+ struct.pack(
'<HHHHHHI',
18, # Header length.
24000 // 100, # Sampling Rate(/100Hz).
48000 // 100, # Sampling Rate(/100Hz).
0, # Bitrate(unused).
1, # Channels.
10000 // 10, # Frame duration(/10us).
@@ -179,11 +179,7 @@ async def main() -> None:
device.once('cis_establishment', on_cis)
await device.start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.RANDOM,
await device.create_advertising_set(
advertising_data=advertising_data,
)

View File

@@ -0,0 +1,193 @@
# Copyright 2021-2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
import secrets
import websockets
import json
from bumble.core import AdvertisingData
from bumble.device import Device, AdvertisingParameters, AdvertisingEventProperties
from bumble.hci import (
CodecID,
CodingFormat,
OwnAddressType,
)
from bumble.profiles.bap import (
UnicastServerAdvertisingData,
CodecSpecificCapabilities,
ContextType,
AudioLocation,
SupportedSamplingFrequency,
SupportedFrameDuration,
PacRecord,
PublishedAudioCapabilitiesService,
AudioStreamControlService,
)
from bumble.profiles.cap import CommonAudioServiceService
from bumble.profiles.csip import CoordinatedSetIdentificationService, SirkType
from bumble.profiles.vcp import VolumeControlService
from bumble.transport import open_transport_or_link
from typing import Optional
def dumps_volume_state(volume_setting: int, muted: int, change_counter: int) -> str:
return json.dumps(
{
'volume_setting': volume_setting,
'muted': muted,
'change_counter': change_counter,
}
)
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_vcp_renderer.py <config-file>' '<transport-spec-for-device>')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
# Add "placeholder" services to enable Android LEA features.
csis = CoordinatedSetIdentificationService(
set_identity_resolving_key=secrets.token_bytes(16),
set_identity_resolving_key_type=SirkType.PLAINTEXT,
)
device.add_service(CommonAudioServiceService(csis))
device.add_service(
PublishedAudioCapabilitiesService(
supported_source_context=ContextType.PROHIBITED,
available_source_context=ContextType.PROHIBITED,
supported_sink_context=ContextType.MEDIA,
available_sink_context=ContextType.MEDIA,
sink_audio_locations=(
AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT
),
sink_pac=[
# Codec Capability Setting 48_4
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_48000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=120,
max_octets_per_codec_frame=120,
supported_max_codec_frames_per_sdu=1,
),
),
],
)
)
device.add_service(AudioStreamControlService(device, sink_ase_id=[1, 2]))
vcs = VolumeControlService()
device.add_service(vcs)
ws: Optional[websockets.WebSocketServerProtocol] = None
def on_volume_state(volume_setting: int, muted: int, change_counter: int):
if ws:
asyncio.create_task(
ws.send(dumps_volume_state(volume_setting, muted, change_counter))
)
vcs.on('volume_state', on_volume_state)
advertising_data = (
bytes(
AdvertisingData(
[
(
AdvertisingData.COMPLETE_LOCAL_NAME,
bytes('Bumble LE Audio', 'utf-8'),
),
(
AdvertisingData.FLAGS,
bytes(
[
AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG
| AdvertisingData.BR_EDR_HOST_FLAG
| AdvertisingData.BR_EDR_CONTROLLER_FLAG
]
),
),
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(PublishedAudioCapabilitiesService.UUID),
),
]
)
)
+ csis.get_advertising_data()
+ bytes(UnicastServerAdvertisingData())
)
await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(
advertising_event_properties=AdvertisingEventProperties(),
own_address_type=OwnAddressType.PUBLIC,
),
advertising_data=advertising_data,
)
async def serve(websocket: websockets.WebSocketServerProtocol, _path):
nonlocal ws
await websocket.send(
dumps_volume_state(vcs.volume_setting, vcs.muted, vcs.change_counter)
)
ws = websocket
async for message in websocket:
volume_state = json.loads(message)
vcs.volume_state_bytes = bytes(
[
volume_state['volume_setting'],
volume_state['muted'],
volume_state['change_counter'],
]
)
await device.notify_subscribers(
vcs.volume_state, vcs.volume_state_bytes
)
ws = None
await websockets.serve(serve, 'localhost', 8989)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

103
examples/vcp_renderer.html Normal file
View File

@@ -0,0 +1,103 @@
<html data-bs-theme="dark">
<head>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet"
integrity="sha384-T3c6CoIi6uLrA9TneNEoa7RxnatzjcDSCmG1MXxSR1GAsXEV/Dwwykc2MPK8M2HN" crossorigin="anonymous">
</head>
<body>
<div class="container">
<label for="server-port" class="form-label">Server Port</label>
<div class="input-group mb-3">
<input type="text" class="form-control" aria-label="Port Number" value="8989" id="port">
<button class="btn btn-primary" type="button" onclick="connect()">Connect</button>
</div>
<div class="row">
<div class="col">
<label for="volume_setting" class="form-label">Volume Setting</label>
<input type="range" class="form-range" min="0" max="255" id="volume_setting">
</div>
<div class="col">
<label for="change_counter" class="form-label">Change Counter</label>
<input type="range" class="form-range" min="0" max="255" id="change_counter">
</div>
<div class="col">
<div class="form-check form-switch">
<input class="form-check-input" type="checkbox" role="switch" id="muted">
<label class="form-check-label" for="muted">Muted</label>
</div>
</div>
</div>
<button class="btn btn-primary" type="button" onclick="update_state()">Notify New Volume State</button>
<hr>
<div id="socketStateContainer" class="bg-body-tertiary p-3 rounded-2">
<h3>Log</h3>
<code id="socketState">
</code>
</div>
</div>
<script>
let portInput = document.getElementById("port")
let volumeSetting = document.getElementById("volume_setting")
let muted = document.getElementById("muted")
let changeCounter = document.getElementById("change_counter")
let socket = null
function connect() {
if (socket != null) {
return
}
socket = new WebSocket(`ws://localhost:${portInput.value}`);
socket.onopen = _ => {
socketState.innerText += 'OPEN\n'
}
socket.onclose = _ => {
socketState.innerText += 'CLOSED\n'
socket = null
}
socket.onerror = (error) => {
socketState.innerText += 'ERROR\n'
console.log(`ERROR: ${error}`)
}
socket.onmessage = (event) => {
socketState.innerText += `<- ${event.data}\n`
let volume_state = JSON.parse(event.data)
volumeSetting.value = volume_state.volume_setting
changeCounter.value = volume_state.change_counter
muted.checked = volume_state.muted ? true : false
}
}
function send(message) {
if (socket && socket.readyState == WebSocket.OPEN) {
let jsonMessage = JSON.stringify(message)
socketState.innerText += `-> ${jsonMessage}\n`
socket.send(jsonMessage)
} else {
socketState.innerText += 'NOT CONNECTED\n'
}
}
function update_state() {
send({
volume_setting: parseInt(volumeSetting.value),
change_counter: parseInt(changeCounter.value),
muted: muted.checked ? 1 : 0
})
}
</script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/js/bootstrap.bundle.min.js"
integrity="sha384-C6RzsynM9kWDrMNeT87bh95OGNyZPhcTNXj1NW7RuBCsyN/o0jlpcV8Qyq46cDfL"
crossorigin="anonymous"></script>
</body>
</html>

View File

@@ -1,7 +1,10 @@
# Next
# 0.2.0
- Code-gen company ID table
- Unstable support for extended advertisements
- CLI tools for downloading Realtek firmware
- PDL-generated types for HCI commands
# 0.1.0
- Initial release
- Initial release

2
rust/Cargo.lock generated
View File

@@ -182,7 +182,7 @@ dependencies = [
[[package]]
name = "bumble"
version = "0.1.0"
version = "0.2.0"
dependencies = [
"anyhow",
"bytes",

View File

@@ -1,7 +1,7 @@
[package]
name = "bumble"
description = "Rust API for the Bumble Bluetooth stack"
version = "0.1.0"
version = "0.2.0"
edition = "2021"
license = "Apache-2.0"
homepage = "https://google.github.io/bumble/index.html"
@@ -10,7 +10,7 @@ documentation = "https://docs.rs/crate/bumble"
authors = ["Marshall Pierce <marshallpierce@google.com>"]
keywords = ["bluetooth", "ble"]
categories = ["api-bindings", "network-programming"]
rust-version = "1.70.0"
rust-version = "1.76.0"
# https://github.com/frewsxcv/cargo-all-features#options
[package.metadata.cargo-all-features]

View File

@@ -37,6 +37,11 @@ PYTHONPATH=..:[virtualenv site-packages] \
cargo run --features bumble-tools --bin bumble -- --help
```
Notable subcommands:
- `firmware realtek download`: download Realtek firmware for various chipsets so that it can be automatically loaded when needed
- `usb probe`: show USB devices, highlighting the ones usable for Bluetooth
# Development
Run the tests:
@@ -63,4 +68,4 @@ To regenerate the assigned number tables based on the Python codebase:
```
PYTHONPATH=.. cargo run --bin gen-assigned-numbers --features dev-tools
```
```

View File

@@ -35,7 +35,7 @@ impl Controller {
/// module specifies the defaults. Must be called from a thread with a Python event loop, which
/// should be true on `tokio::main` and `async_std::main`.
///
/// For more info, see https://awestlake87.github.io/pyo3-asyncio/master/doc/pyo3_asyncio/#event-loop-references-and-contextvars.
/// For more info, see <https://awestlake87.github.io/pyo3-asyncio/master/doc/pyo3_asyncio/#event-loop-references-and-contextvars>.
pub async fn new(
name: &str,
host_source: Option<TransportSource>,

View File

@@ -149,7 +149,7 @@ impl ToPyObject for Address {
/// An error meaning that the u64 value did not represent a valid BT address.
#[derive(Debug)]
pub struct InvalidAddress(u64);
pub struct InvalidAddress(#[allow(unused)] u64);
impl TryInto<packets::Address> for Address {
type Error = ConversionError<InvalidAddress>;

View File

@@ -71,7 +71,7 @@ impl LeConnectionOrientedChannel {
/// Must be called from a thread with a Python event loop, which should be true on
/// `tokio::main` and `async_std::main`.
///
/// For more info, see https://awestlake87.github.io/pyo3-asyncio/master/doc/pyo3_asyncio/#event-loop-references-and-contextvars.
/// For more info, see <https://awestlake87.github.io/pyo3-asyncio/master/doc/pyo3_asyncio/#event-loop-references-and-contextvars>.
pub async fn disconnect(&mut self) -> PyResult<()> {
Python::with_gil(|py| {
self.0

View File

@@ -33,26 +33,25 @@ include_package_data = True
install_requires =
aiohttp ~= 3.8; platform_system!='Emscripten'
appdirs >= 1.4; platform_system!='Emscripten'
bt-test-interfaces >= 0.0.2; platform_system!='Emscripten'
click == 8.1.3; platform_system!='Emscripten'
click >= 8.1.3; platform_system!='Emscripten'
cryptography == 39; platform_system!='Emscripten'
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the
# versions available on PyPI. Relax the version requirement since it's better than being
# completely unable to import the package in case of version mismatch.
cryptography >= 39.0; platform_system=='Emscripten'
grpcio == 1.57.0; platform_system!='Emscripten'
grpcio >= 1.62.1; platform_system!='Emscripten'
humanize >= 4.6.0; platform_system!='Emscripten'
libusb1 >= 2.0.1; platform_system!='Emscripten'
libusb-package == 1.0.26.1; platform_system!='Emscripten'
platformdirs == 3.10.0; platform_system!='Emscripten'
platformdirs >= 3.10.0; platform_system!='Emscripten'
prompt_toolkit >= 3.0.16; platform_system!='Emscripten'
prettytable >= 3.6.0; platform_system!='Emscripten'
protobuf >= 3.12.4; platform_system!='Emscripten'
protobuf >= 4.24.2; platform_system!='Emscripten'
pyee >= 8.2.2
pyserial-asyncio >= 0.5; platform_system!='Emscripten'
pyserial >= 3.5; platform_system!='Emscripten'
pyusb >= 1.2; platform_system!='Emscripten'
websockets >= 8.1; platform_system!='Emscripten'
websockets >= 12.0; platform_system!='Emscripten'
[options.entry_points]
console_scripts =
@@ -83,12 +82,12 @@ build =
build >= 0.7
test =
pytest >= 8.0
pytest-asyncio == 0.21.1
pytest-asyncio >= 0.23.5
pytest-html >= 3.2.0
coverage >= 6.4
development =
black == 22.10
grpcio-tools >= 1.57.0
black == 24.3
grpcio-tools >= 1.62.1
invoke >= 1.7.3
mypy == 1.8.0
nox >= 2022
@@ -98,8 +97,10 @@ development =
types-invoke >= 1.7.3
types-protobuf >= 4.21.0
avatar =
pandora-avatar == 0.0.5
rootcanal == 1.4.0 ; python_version>='3.10'
pandora-avatar == 0.0.9
rootcanal == 1.10.0 ; python_version>='3.10'
pandora =
bt-test-interfaces >= 0.0.6
documentation =
mkdocs >= 1.4.0
mkdocs-material >= 8.5.6

View File

@@ -17,6 +17,7 @@
# -----------------------------------------------------------------------------
from bumble.core import AdvertisingData, UUID, get_dict_key_by_value
# -----------------------------------------------------------------------------
def test_ad_data():
data = bytes([2, AdvertisingData.TX_POWER_LEVEL, 123])

View File

@@ -28,7 +28,7 @@ from bumble.core import (
BT_PERIPHERAL_ROLE,
ConnectionParameters,
)
from bumble.device import Connection, Device
from bumble.device import AdvertisingParameters, Connection, Device
from bumble.host import AclPacketQueue, Host
from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
@@ -50,7 +50,8 @@ from bumble.gatt import (
GATT_APPEARANCE_CHARACTERISTIC,
)
from .test_utils import TwoDevices
from .test_utils import TwoDevices, async_barrier
# -----------------------------------------------------------------------------
# Logging
@@ -254,12 +255,12 @@ async def test_legacy_advertising():
device = Device(host=mock.AsyncMock(Host))
# Start advertising
advertiser = await device.start_legacy_advertising()
assert device.legacy_advertiser
await device.start_advertising()
assert device.is_advertising
# Stop advertising
await advertiser.stop()
assert not device.legacy_advertiser
await device.stop_advertising()
assert not device.is_advertising
# -----------------------------------------------------------------------------
@@ -273,7 +274,7 @@ async def test_legacy_advertising_connection(own_address_type):
peer_address = Address('F0:F1:F2:F3:F4:F5')
# Start advertising
advertiser = await device.start_legacy_advertising()
await device.start_advertising()
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
@@ -301,7 +302,7 @@ async def test_legacy_advertising_connection(own_address_type):
async def test_legacy_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_legacy_advertising(auto_restart=auto_restart)
await device.start_advertising(auto_restart=auto_restart)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
@@ -310,20 +311,18 @@ async def test_legacy_advertising_disconnection(auto_restart):
ConnectionParameters(0, 0, 0),
)
device.start_legacy_advertising = mock.AsyncMock()
device.on_advertising_set_termination(
HCI_SUCCESS, device.legacy_advertising_set.advertising_handle, 0x0001, 0
)
device.on_disconnection(0x0001, 0)
await async_barrier()
await async_barrier()
if auto_restart:
device.start_legacy_advertising.assert_called_with(
advertising_type=advertiser.advertising_type,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
assert device.is_advertising
else:
device.start_legacy_advertising.assert_not_called()
assert not device.is_advertising
# -----------------------------------------------------------------------------
@@ -332,12 +331,13 @@ async def test_extended_advertising():
device = Device(host=mock.AsyncMock(Host))
# Start advertising
advertiser = await device.start_extended_advertising()
assert device.extended_advertisers
advertising_set = await device.create_advertising_set()
assert device.extended_advertising_sets
assert advertising_set.enabled
# Stop advertising
await advertiser.stop()
assert not device.extended_advertisers
await advertising_set.stop()
assert not advertising_set.enabled
# -----------------------------------------------------------------------------
@@ -349,8 +349,8 @@ async def test_extended_advertising():
async def test_extended_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising(
own_address_type=own_address_type
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
)
device.on_connection(
0x0001,
@@ -361,8 +361,9 @@ async def test_extended_advertising_connection(own_address_type):
)
device.on_advertising_set_termination(
HCI_SUCCESS,
advertiser.handle,
advertising_set.advertising_handle,
0x0001,
0,
)
if own_address_type == OwnAddressType.PUBLIC:
@@ -375,45 +376,6 @@ async def test_extended_advertising_connection(own_address_type):
await asyncio.sleep(0.0001)
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'auto_restart,',
(True, False),
)
@pytest.mark.asyncio
async def test_extended_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertiser = await device.start_extended_advertising(auto_restart=auto_restart)
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
BT_PERIPHERAL_ROLE,
ConnectionParameters(0, 0, 0),
)
device.on_advertising_set_termination(
HCI_SUCCESS,
advertiser.handle,
0x0001,
)
device.start_extended_advertising = mock.AsyncMock()
device.on_disconnection(0x0001, 0)
if auto_restart:
device.start_extended_advertising.assert_called_with(
advertising_properties=advertiser.advertising_properties,
own_address_type=advertiser.own_address_type,
auto_restart=advertiser.auto_restart,
advertising_data=advertiser.advertising_data,
scan_response_data=advertiser.scan_response_data,
)
else:
device.start_extended_advertising.assert_not_called()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_remote_le_features():
@@ -467,9 +429,8 @@ async def test_cis():
await asyncio.gather(*peripheral_cis_futures.values())
assert len(cis_links) == 2
# TODO: Fix Host CIS support.
# await cis_links[0].disconnect()
# await cis_links[1].disconnect()
await cis_links[0].disconnect()
await cis_links[1].disconnect()
# -----------------------------------------------------------------------------

View File

@@ -50,6 +50,7 @@ from bumble.att import (
ATT_Error_Response,
ATT_Read_By_Group_Type_Request,
)
from .test_utils import async_barrier
# -----------------------------------------------------------------------------
@@ -456,13 +457,6 @@ class LinkedDevices:
self.paired = [None, None, None]
# -----------------------------------------------------------------------------
async def async_barrier():
ready = asyncio.get_running_loop().create_future()
asyncio.get_running_loop().call_soon(ready.set_result, None)
await ready
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write():

View File

@@ -23,6 +23,8 @@ from bumble.hci import (
HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_RESET_COMMAND,
HCI_SUCCESS,
HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
Address,
CodingFormat,
CodecID,
@@ -274,8 +276,14 @@ def test_HCI_Set_Event_Mask_Command():
# -----------------------------------------------------------------------------
def test_HCI_LE_Set_Event_Mask_Command():
command = HCI_LE_Set_Event_Mask_Command(
le_event_mask=bytes.fromhex('0011223344556677')
le_event_mask=HCI_LE_Set_Event_Mask_Command.mask(
[
HCI_LE_CONNECTION_COMPLETE_EVENT,
HCI_LE_ENHANCED_CONNECTION_COMPLETE_V2_EVENT,
]
)
)
assert command.le_event_mask == bytes.fromhex('0100000000010000')
basic_check(command)

View File

@@ -19,26 +19,90 @@ import asyncio
import logging
import os
import pytest
import pytest_asyncio
from typing import Tuple
from typing import Tuple, Optional
from .test_utils import TwoDevices
from bumble import core
from bumble import device
from bumble import hfp
from bumble import rfcomm
from bumble import hci
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def _default_hf_configuration() -> hfp.HfConfiguration:
return hfp.HfConfiguration(
supported_hf_features=[
hfp.HfFeature.CODEC_NEGOTIATION,
hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED,
hfp.HfFeature.HF_INDICATORS,
],
supported_hf_indicators=[
hfp.HfIndicator.ENHANCED_SAFETY,
hfp.HfIndicator.BATTERY_LEVEL,
],
supported_audio_codecs=[
hfp.AudioCodec.CVSD,
hfp.AudioCodec.MSBC,
],
)
# -----------------------------------------------------------------------------
def _default_hf_sdp_features() -> hfp.HfSdpFeature:
return hfp.HfSdpFeature.WIDE_BAND
# -----------------------------------------------------------------------------
def _default_ag_configuration() -> hfp.AgConfiguration:
return hfp.AgConfiguration(
supported_ag_features=[
hfp.AgFeature.HF_INDICATORS,
hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY,
hfp.AgFeature.REJECT_CALL,
hfp.AgFeature.CODEC_NEGOTIATION,
hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
],
supported_ag_indicators=[
hfp.AgIndicatorState.call(),
hfp.AgIndicatorState.service(),
hfp.AgIndicatorState.callsetup(),
hfp.AgIndicatorState.callsetup(),
hfp.AgIndicatorState.signal(),
hfp.AgIndicatorState.roam(),
hfp.AgIndicatorState.battchg(),
],
supported_hf_indicators=[
hfp.HfIndicator.ENHANCED_SAFETY,
hfp.HfIndicator.BATTERY_LEVEL,
],
supported_ag_call_hold_operations=[],
supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
)
# -----------------------------------------------------------------------------
def _default_ag_sdp_features() -> hfp.AgSdpFeature:
return hfp.AgSdpFeature.WIDE_BAND | hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
# -----------------------------------------------------------------------------
async def make_hfp_connections(
hf_config: hfp.Configuration,
) -> Tuple[hfp.HfProtocol, hfp.HfpProtocol]:
hf_config: Optional[hfp.HfConfiguration] = None,
ag_config: Optional[hfp.AgConfiguration] = None,
):
if not hf_config:
hf_config = _default_hf_configuration()
if not ag_config:
ag_config = _default_ag_configuration()
# Setup devices
devices = TwoDevices()
await devices.setup_connection()
@@ -55,38 +119,200 @@ async def make_hfp_connections(
# Setup HFP connection
hf = hfp.HfProtocol(client_dlc, hf_config)
ag = hfp.HfpProtocol(server_dlc)
return hf, ag
ag = hfp.AgProtocol(server_dlc, ag_config)
await hf.initiate_slc()
return (hf, ag)
# -----------------------------------------------------------------------------
@pytest_asyncio.fixture
async def hfp_connections():
hf, ag = await make_hfp_connections()
hf_loop_task = asyncio.create_task(hf.run())
try:
yield (hf, ag)
finally:
# Close the coroutine.
hf.unsolicited_queue.put_nowait(None)
await hf_loop_task
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_slc():
hf_config = hfp.Configuration(
supported_hf_features=[], supported_hf_indicators=[], supported_audio_codecs=[]
)
hf, ag = await make_hfp_connections(hf_config)
async def ag_loop():
while line := await ag.next_line():
if line.startswith('AT+BRSF'):
ag.send_response_line('+BRSF: 0')
elif line.startswith('AT+CIND=?'):
ag.send_response_line(
'+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
'("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
'("callheld",(0-2))'
async def test_slc_with_minimal_features():
hf, ag = await make_hfp_connections(
hfp.HfConfiguration(
supported_audio_codecs=[],
supported_hf_features=[],
supported_hf_indicators=[],
),
hfp.AgConfiguration(
supported_ag_call_hold_operations=[],
supported_ag_features=[],
supported_ag_indicators=[
hfp.AgIndicatorState(
indicator=hfp.AgIndicator.CALL,
supported_values={0, 1},
current_status=0,
)
elif line.startswith('AT+CIND?'):
ag.send_response_line('+CIND: 0,0,1,4,1,5,0')
ag.send_response_line('OK')
],
supported_hf_indicators=[],
supported_audio_codecs=[],
),
)
ag_task = asyncio.create_task(ag_loop())
assert hf.supported_ag_features == ag.supported_ag_features
assert hf.supported_hf_features == ag.supported_hf_features
for a, b in zip(hf.ag_indicators, ag.ag_indicators):
assert a.indicator == b.indicator
assert a.current_status == b.current_status
await hf.initiate_slc()
ag_task.cancel()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_slc(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
assert hf.supported_ag_features == ag.supported_ag_features
assert hf.supported_hf_features == ag.supported_hf_features
for a, b in zip(hf.ag_indicators, ag.ag_indicators):
assert a.indicator == b.indicator
assert a.current_status == b.current_status
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
hf.on('ag_indicator', future.set_result)
ag.update_ag_indicator(hfp.AgIndicator.CALL, 1)
indicator: hfp.AgIndicatorState = await future
assert indicator.current_status == 1
assert indicator.indicator == hfp.AgIndicator.CALL
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
ag.on('hf_indicator', future.set_result)
await hf.execute_command('AT+BIEV=2,100')
indicator: hfp.HfIndicatorState = await future
assert indicator.current_status == 100
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_codec_negotiation(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
futures = [
asyncio.get_running_loop().create_future(),
asyncio.get_running_loop().create_future(),
]
hf.on('codec_negotiation', futures[0].set_result)
ag.on('codec_negotiation', futures[1].set_result)
await ag.negotiate_codec(hfp.AudioCodec.MSBC)
assert await futures[0] == await futures[1]
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_dial(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
NUMBER = 'ATD123456789'
future = asyncio.get_running_loop().create_future()
ag.on('dial', future.set_result)
await hf.execute_command(f'ATD{NUMBER}')
number: str = await future
assert number == NUMBER
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_answer(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
ag.on('answer', lambda: future.set_result(None))
await hf.answer_incoming_call()
await future
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_reject_incoming_call(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
ag.on('hang_up', lambda: future.set_result(None))
await hf.reject_incoming_call()
await future
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_terminate_call(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
ag.on('hang_up', lambda: future.set_result(None))
await hf.terminate_call()
await future
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_sdp_record():
devices = TwoDevices()
await devices.setup_connection()
devices[0].sdp_service_records[1] = hfp.make_hf_sdp_records(
1, 2, _default_hf_configuration(), hfp.ProfileVersion.V1_8
)
assert await hfp.find_hf_sdp_record(devices.connections[1]) == (
2,
hfp.ProfileVersion.V1_8,
_default_hf_sdp_features(),
)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_sdp_record():
devices = TwoDevices()
await devices.setup_connection()
devices[0].sdp_service_records[1] = hfp.make_ag_sdp_records(
1, 2, _default_ag_configuration(), hfp.ProfileVersion.V1_8
)
assert await hfp.find_ag_sdp_record(devices.connections[1]) == (
2,
hfp.ProfileVersion.V1_8,
_default_ag_sdp_features(),
)
# -----------------------------------------------------------------------------
@@ -109,7 +335,7 @@ async def test_sco_setup():
devices[1].accept(devices[0].public_address),
)
def on_sco_request(_connection: device.Connection, _link_type: int):
def on_sco_request(_connection, _link_type: int):
connections[1].abort_on(
'disconnection',
devices[1].send_command(
@@ -124,17 +350,13 @@ async def test_sco_setup():
devices[1].on('sco_request', on_sco_request)
sco_connections = [
sco_connection_futures = [
asyncio.get_running_loop().create_future(),
asyncio.get_running_loop().create_future(),
]
devices[0].on(
'sco_connection', lambda sco_link: sco_connections[0].set_result(sco_link)
)
devices[1].on(
'sco_connection', lambda sco_link: sco_connections[1].set_result(sco_link)
)
for device, future in zip(devices, sco_connection_futures):
device.on('sco_connection', future.set_result)
await devices[0].send_command(
hci.HCI_Enhanced_Setup_Synchronous_Connection_Command(
@@ -142,8 +364,17 @@ async def test_sco_setup():
**hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.ESCO_CVSD_S1].asdict(),
)
)
sco_connections = await asyncio.gather(*sco_connection_futures)
await asyncio.gather(*sco_connections)
sco_disconnection_futures = [
asyncio.get_running_loop().create_future(),
asyncio.get_running_loop().create_future(),
]
for future, sco_connection in zip(sco_disconnection_futures, sco_connections):
sco_connection.on('disconnection', future.set_result)
await sco_connections[0].disconnect()
await asyncio.gather(*sco_disconnection_futures)
# -----------------------------------------------------------------------------

Some files were not shown because too many files have changed in this diff Show More