Compare commits

...

27 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod b8b78ca1ee add missing file 2025-07-27 15:02:42 -07:00
Gilles Boccon-Gibod d611d25802 resolve merge conflicts 2025-07-26 21:20:52 -07:00
zxzxwu cce2e4d4e3 Merge pull request #729 from zxzxwu/link
Remove link-relay and RemoteLink
2025-07-23 14:35:59 +08:00
Josh Wu 1b44e73f90 Remove link-relay and RemoteLink 2025-07-21 12:37:55 +08:00
zxzxwu 1a81c5d05c Merge pull request #718 from zxzxwu/l2cap
Migrate L2CAP packets to dataclasses
2025-07-20 18:38:19 +08:00
zxzxwu d8a43f0151 Merge pull request #728 from zxzxwu/hci
Allow register HCI packets with custom names
2025-07-20 18:36:39 +08:00
Josh Wu 858788f05e Migrate L2CAP packets to dataclasses 2025-07-20 18:30:02 +08:00
Josh Wu 41f8797a4c Add more test cases for custom packets 2025-07-20 18:25:11 +08:00
Josh Wu fc3fd7f25b Allow register HCI packets with custom names 2025-07-19 21:19:53 +08:00
zxzxwu 48bbf9f1e0 Merge pull request #721 from khsiao-google/main
Implement HCI_Mode_Change_Event
2025-07-18 16:49:23 +08:00
khsiao-google 3d6c595c6e Merge branch 'google:main' into main 2025-07-16 05:22:43 +00:00
zxzxwu d9d971b8b3 Merge pull request #720 from adjscent/main
Update pyproject.toml to python 3.9
2025-07-14 19:34:19 +08:00
zxzxwu a5effb433b Merge pull request #727 from vvydria/fix-HCI_LE_Set_Privacy_Mode_Command
fix: add missing metadata call for `peer_identity_address_type` in `HCI_LE_Set_Privacy_Mode_Command`
2025-07-14 19:14:18 +08:00
Vitalii Vydria 8802c95d31 fix: metadata call for peer_identity_address_type
Fixed a crash caused by a missing `metadata` initialization for the `peer_identity_address_type` field in the `HCI_LE_Set_Privacy_Mode_Command` dataclass.
The absence of this call led to incorrect field setup, resulting in runtime exceptions during `HCI_LE_Set_Privacy_Mode_Command` handling.
2025-07-14 13:39:08 +03:00
khsiao-google a184cae560 Implement HCI_Mode_Change_Event 2025-07-14 08:43:27 +00:00
Gilles Boccon-Gibod fa6fe2aaca Merge pull request #723 from google/gbg/bt-bench-iso
add iso support to bench app
2025-07-07 18:03:13 +02:00
Gilles Boccon-Gibod 43a8cc37f8 add iso support to bench app 2025-07-07 13:03:19 +02:00
adjscent e45143e33d Update pyproject.toml 2025-07-06 01:19:04 +08:00
zxzxwu 1c1b947455 Merge pull request #716 from zxzxwu/hci
Migrate all HCI_Command to dataclasses
2025-07-02 22:16:37 +08:00
zxzxwu d7ddffd275 Merge pull request #719 from zxzxwu/rust
Fix Rust linter errors
2025-07-02 22:07:15 +08:00
Josh Wu 3cb97d2373 Fix Rust linter errors 2025-07-02 12:41:39 +08:00
Josh Wu bad037b010 Migrate all HCI_Command to dataclasses 2025-06-26 02:10:07 +08:00
zxzxwu 88777710a4 Merge pull request #715 from zxzxwu/hci
Migrate all HCI_Event to dataclasses
2025-06-26 00:15:00 +08:00
Josh Wu 0ab5b6c49a Migrate all HCI_Event to dataclasses 2025-06-25 17:07:22 +08:00
zxzxwu 22ff0d5e32 Merge pull request #711 from zxzxwu/hci_extended_event_migration
Migrate all HCI_Extended_Event to dataclasses
2025-06-24 17:47:50 +08:00
Josh Wu 2f5de37d76 Migrate all HCI_Extended_Event to dataclasses 2025-06-24 17:15:22 +08:00
Gilles Boccon-Gibod 799d730f88 Merge pull request #714 from google/gbg/oob-sc-fix
fix legacy pairing with oob
2025-06-24 05:40:08 +02:00
109 changed files with 3884 additions and 4139 deletions
+3 -1
View File
@@ -102,5 +102,7 @@
"."
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true
"python.testing.pytestEnabled": true,
"python-envs.defaultEnvManager": "ms-python.python:system",
"python-envs.pythonProjects": []
}
-3
View File
@@ -12,9 +12,6 @@ Apps
## `show.py`
Parse a file with HCI packets and print the details of each packet in a human readable form
## `link_relay.py`
Simple WebSocket relay for virtual RemoteLink instances to communicate with each other through.
## `hci_bridge.py`
This app acts as a simple bridge between two HCI transports, with a host on one side and
a controller on the other. All the HCI packets bridged between the two are printed on the console
+9 -6
View File
@@ -23,7 +23,6 @@ import contextlib
import dataclasses
import functools
import logging
import os
import struct
from typing import (
Any,
@@ -54,6 +53,8 @@ from bumble.profiles import bass
import bumble.device
import bumble.transport
import bumble.utils
import bumble.logging
# -----------------------------------------------------------------------------
# Logging
@@ -128,7 +129,7 @@ class BroadcastScanner(bumble.utils.EventEmitter):
broadcast_audio_announcement: Optional[bap.BroadcastAudioAnnouncement] = None
basic_audio_announcement: Optional[bap.BasicAudioAnnouncement] = None
appearance: Optional[core.Appearance] = None
biginfo: Optional[bumble.device.BIGInfoAdvertisement] = None
biginfo: Optional[bumble.device.BigInfoAdvertisement] = None
manufacturer_data: Optional[tuple[str, bytes]] = None
def __post_init__(self) -> None:
@@ -255,8 +256,10 @@ class BroadcastScanner(bumble.utils.EventEmitter):
print(color(' SDU Interval: ', 'magenta'), self.biginfo.sdu_interval)
print(color(' Max SDU: ', 'magenta'), self.biginfo.max_sdu)
print(color(' PHY: ', 'magenta'), self.biginfo.phy.name)
print(color(' Framed: ', 'magenta'), self.biginfo.framed)
print(color(' Encrypted: ', 'magenta'), self.biginfo.encrypted)
print(color(' Framing: ', 'magenta'), self.biginfo.framing.name)
print(
color(' Encryption: ', 'magenta'), self.biginfo.encryption.name
)
def on_sync_establishment(self) -> None:
self.emit('sync_establishment')
@@ -286,7 +289,7 @@ class BroadcastScanner(bumble.utils.EventEmitter):
self.emit('change')
def on_biginfo_advertisement(
self, advertisement: bumble.device.BIGInfoAdvertisement
self, advertisement: bumble.device.BigInfoAdvertisement
) -> None:
self.biginfo = advertisement
self.emit('change')
@@ -1233,7 +1236,7 @@ def transmit(
def main():
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
bumble.logging.setup_basic_logging()
auracast()
+365 -22
View File
@@ -19,7 +19,6 @@ import asyncio
import dataclasses
import enum
import logging
import os
import statistics
import struct
import time
@@ -36,7 +35,15 @@ from bumble.core import (
CommandTimeoutError,
)
from bumble.colors import color
from bumble.device import Connection, ConnectionParametersPreferences, Device, Peer
from bumble.core import ConnectionPHY
from bumble.device import (
CigParameters,
CisLink,
Connection,
ConnectionParametersPreferences,
Device,
Peer,
)
from bumble.gatt import Characteristic, CharacteristicValue, Service
from bumble.hci import (
HCI_LE_1M_PHY,
@@ -46,6 +53,7 @@ from bumble.hci import (
HCI_Constant,
HCI_Error,
HCI_StatusError,
HCI_IsoDataPacket,
)
from bumble.sdp import (
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
@@ -56,11 +64,12 @@ from bumble.sdp import (
DataElement,
ServiceAttribute,
)
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
import bumble.rfcomm
import bumble.core
from bumble.utils import AsyncRunner
from bumble.pairing import PairingConfig
import bumble.logging
# -----------------------------------------------------------------------------
@@ -83,11 +92,21 @@ SPEED_TX_UUID = 'E789C754-41A1-45F4-A948-A0A1A90DBA53'
SPEED_RX_UUID = '016A2CC7-E14B-4819-935F-1F56EAE4098D'
DEFAULT_RFCOMM_UUID = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'
DEFAULT_L2CAP_PSM = 128
DEFAULT_L2CAP_MAX_CREDITS = 128
DEFAULT_L2CAP_MTU = 1024
DEFAULT_L2CAP_MPS = 1024
DEFAULT_ISO_MAX_SDU_C_TO_P = 251
DEFAULT_ISO_MAX_SDU_P_TO_C = 251
DEFAULT_ISO_SDU_INTERVAL_C_TO_P = 10000
DEFAULT_ISO_SDU_INTERVAL_P_TO_C = 10000
DEFAULT_ISO_MAX_TRANSPORT_LATENCY_C_TO_P = 35
DEFAULT_ISO_MAX_TRANSPORT_LATENCY_P_TO_C = 35
DEFAULT_ISO_RTN_C_TO_P = 3
DEFAULT_ISO_RTN_P_TO_C = 3
DEFAULT_LINGER_TIME = 1.0
DEFAULT_POST_CONNECTION_WAIT_TIME = 1.0
@@ -104,14 +123,14 @@ def le_phy_name(phy_id):
)
def print_connection_phy(phy):
def print_connection_phy(phy: ConnectionPHY) -> None:
logging.info(
color('@@@ PHY: ', 'yellow') + f'TX:{le_phy_name(phy.tx_phy)}/'
f'RX:{le_phy_name(phy.rx_phy)}'
)
def print_connection(connection):
def print_connection(connection: Connection) -> None:
params = []
if connection.transport == PhysicalTransport.LE:
params.append(
@@ -136,6 +155,34 @@ def print_connection(connection):
logging.info(color('@@@ Connection: ', 'yellow') + ' '.join(params))
def print_cis_link(cis_link: CisLink) -> None:
logging.info(color("@@@ CIS established", "green"))
logging.info(color('@@@ ISO interval: ', 'green') + f"{cis_link.iso_interval}ms")
logging.info(color('@@@ NSE: ', 'green') + f"{cis_link.nse}")
logging.info(color('@@@ Central->Peripheral:', 'green'))
if cis_link.phy_c_to_p is not None:
logging.info(
color('@@@ PHY: ', 'green') + f"{cis_link.phy_c_to_p.name}"
)
logging.info(
color('@@@ Latency: ', 'green') + f"{cis_link.transport_latency_c_to_p}µs"
)
logging.info(color('@@@ BN: ', 'green') + f"{cis_link.bn_c_to_p}")
logging.info(color('@@@ FT: ', 'green') + f"{cis_link.ft_c_to_p}")
logging.info(color('@@@ Max PDU: ', 'green') + f"{cis_link.max_pdu_c_to_p}")
logging.info(color('@@@ Peripheral->Central:', 'green'))
if cis_link.phy_p_to_c is not None:
logging.info(
color('@@@ PHY: ', 'green') + f"{cis_link.phy_p_to_c.name}"
)
logging.info(
color('@@@ Latency: ', 'green') + f"{cis_link.transport_latency_p_to_c}µs"
)
logging.info(color('@@@ BN: ', 'green') + f"{cis_link.bn_p_to_c}")
logging.info(color('@@@ FT: ', 'green') + f"{cis_link.ft_p_to_c}")
logging.info(color('@@@ Max PDU: ', 'green') + f"{cis_link.max_pdu_p_to_c}")
def make_sdp_records(channel):
return {
0x00010001: [
@@ -461,7 +508,8 @@ class Sender:
self.bytes_sent += len(packet)
await self.packet_io.send_packet(packet)
await self.done.wait()
if self.packet_io.can_receive():
await self.done.wait()
run_counter = f'[{run + 1} of {self.repeat + 1}]' if self.repeat else ''
logging.info(color(f'=== {run_counter} Done!', 'magenta'))
@@ -491,6 +539,9 @@ class Sender:
)
self.done.set()
def is_sender(self):
return True
# -----------------------------------------------------------------------------
# Receiver
@@ -538,7 +589,8 @@ class Receiver:
logging.info(
color(
f'!!! Unexpected packet, expected {self.expected_packet_index} '
f'but received {packet.sequence}'
f'but received {packet.sequence}',
'red',
)
)
@@ -581,6 +633,9 @@ class Receiver:
await self.done.wait()
logging.info(color('=== Done!', 'magenta'))
def is_sender(self):
return False
# -----------------------------------------------------------------------------
# Ping
@@ -716,7 +771,8 @@ class Ping:
color(
f'!!! Unexpected packet, '
f'expected {self.next_expected_packet_index} '
f'but received {packet.sequence}'
f'but received {packet.sequence}',
'red',
)
)
@@ -724,6 +780,9 @@ class Ping:
self.done.set()
return
def is_sender(self):
return True
# -----------------------------------------------------------------------------
# Pong
@@ -768,7 +827,8 @@ class Pong:
logging.info(
color(
f'!!! Unexpected packet, expected {self.expected_packet_index} '
f'but received {packet.sequence}'
f'but received {packet.sequence}',
'red',
)
)
@@ -790,6 +850,9 @@ class Pong:
await self.done.wait()
logging.info(color('=== Done!', 'magenta'))
def is_sender(self):
return False
# -----------------------------------------------------------------------------
# GattClient
@@ -953,6 +1016,9 @@ class StreamedPacketIO:
# pylint: disable-next=not-callable
self.io_sink(struct.pack('>H', len(packet)) + packet)
def can_receive(self):
return True
# -----------------------------------------------------------------------------
# L2capClient
@@ -1224,6 +1290,96 @@ class RfcommServer(StreamedPacketIO):
await self.dlc.drain()
# -----------------------------------------------------------------------------
# IsoClient
# -----------------------------------------------------------------------------
class IsoClient(StreamedPacketIO):
def __init__(
self,
device: Device,
) -> None:
super().__init__()
self.device = device
self.ready = asyncio.Event()
self.cis_link: Optional[CisLink] = None
async def on_connection(
self, connection: Connection, cis_link: CisLink, sender: bool
) -> None:
connection.on(connection.EVENT_DISCONNECTION, self.on_disconnection)
self.cis_link = cis_link
self.io_sink = cis_link.write
await cis_link.setup_data_path(
cis_link.Direction.HOST_TO_CONTROLLER
if sender
else cis_link.Direction.CONTROLLER_TO_HOST
)
cis_link.sink = self.on_iso_packet
self.ready.set()
def on_iso_packet(self, iso_packet: HCI_IsoDataPacket) -> None:
self.on_packet(iso_packet.iso_sdu_fragment)
def on_disconnection(self, _):
pass
async def drain(self):
if self.cis_link is None:
return
await self.cis_link.drain()
def can_receive(self):
return False
# -----------------------------------------------------------------------------
# IsoServer
# -----------------------------------------------------------------------------
class IsoServer(StreamedPacketIO):
def __init__(
self,
device: Device,
):
super().__init__()
self.device = device
self.cis_link: Optional[CisLink] = None
self.ready = asyncio.Event()
logging.info(
color(
'### Listening for ISO connection',
'yellow',
)
)
async def on_connection(
self, connection: Connection, cis_link: CisLink, sender: bool
) -> None:
connection.on(connection.EVENT_DISCONNECTION, self.on_disconnection)
self.io_sink = cis_link.write
await cis_link.setup_data_path(
cis_link.Direction.HOST_TO_CONTROLLER
if sender
else cis_link.Direction.CONTROLLER_TO_HOST
)
cis_link.sink = self.on_iso_packet
self.ready.set()
def on_iso_packet(self, iso_packet: HCI_IsoDataPacket) -> None:
self.on_packet(iso_packet.iso_sdu_fragment)
def on_disconnection(self, _):
pass
async def drain(self):
if self.cis_link is None:
return
await self.cis_link.drain()
def can_receive(self):
return False
# -----------------------------------------------------------------------------
# Central
# -----------------------------------------------------------------------------
@@ -1232,13 +1388,22 @@ class Central(Connection.Listener):
self,
transport,
peripheral_address,
classic,
scenario_factory,
mode_factory,
connection_interval,
phy,
authenticate,
encrypt,
iso,
iso_sdu_interval_c_to_p,
iso_sdu_interval_p_to_c,
iso_max_sdu_c_to_p,
iso_max_sdu_p_to_c,
iso_max_transport_latency_c_to_p,
iso_max_transport_latency_p_to_c,
iso_rtn_c_to_p,
iso_rtn_p_to_c,
classic,
extended_data_length,
role_switch,
le_scan,
@@ -1250,6 +1415,15 @@ class Central(Connection.Listener):
self.transport = transport
self.peripheral_address = peripheral_address
self.classic = classic
self.iso = iso
self.iso_sdu_interval_c_to_p = iso_sdu_interval_c_to_p
self.iso_sdu_interval_p_to_c = iso_sdu_interval_p_to_c
self.iso_max_sdu_c_to_p = iso_max_sdu_c_to_p
self.iso_max_sdu_p_to_c = iso_max_sdu_p_to_c
self.iso_max_transport_latency_c_to_p = iso_max_transport_latency_c_to_p
self.iso_max_transport_latency_p_to_c = iso_max_transport_latency_p_to_c
self.iso_rtn_c_to_p = iso_rtn_c_to_p
self.iso_rtn_p_to_c = iso_rtn_p_to_c
self.scenario_factory = scenario_factory
self.mode_factory = mode_factory
self.authenticate = authenticate
@@ -1296,7 +1470,7 @@ class Central(Connection.Listener):
async def run(self):
logging.info(color('>>> Connecting to HCI...', 'green'))
async with await open_transport_or_link(self.transport) as (
async with await open_transport(self.transport) as (
hci_source,
hci_sink,
):
@@ -1308,6 +1482,13 @@ class Central(Connection.Listener):
)
mode = self.mode_factory(self.device)
scenario = self.scenario_factory(mode)
self.device.classic_enabled = self.classic
self.device.cis_enabled = self.iso
# Set up a pairing config factory with minimal requirements.
self.device.pairing_config_factory = lambda _: PairingConfig(
sc=False, mitm=False, bonding=False
)
await pre_power_on(self.device, self.classic)
await self.device.power_on()
@@ -1392,7 +1573,72 @@ class Central(Connection.Listener):
)
)
await mode.on_connection(self.connection)
# Setup ISO streams.
if self.iso:
if scenario.is_sender():
sdu_interval_c_to_p = (
self.iso_sdu_interval_c_to_p or DEFAULT_ISO_SDU_INTERVAL_C_TO_P
)
sdu_interval_p_to_c = self.iso_sdu_interval_p_to_c or 0
max_transport_latency_c_to_p = (
self.iso_max_transport_latency_c_to_p
or DEFAULT_ISO_MAX_TRANSPORT_LATENCY_C_TO_P
)
max_transport_latency_p_to_c = (
self.iso_max_transport_latency_p_to_c or 0
)
max_sdu_c_to_p = (
self.iso_max_sdu_c_to_p or DEFAULT_ISO_MAX_SDU_C_TO_P
)
max_sdu_p_to_c = self.iso_max_sdu_p_to_c or 0
rtn_c_to_p = self.iso_rtn_c_to_p or DEFAULT_ISO_RTN_C_TO_P
rtn_p_to_c = self.iso_rtn_p_to_c or 0
else:
sdu_interval_p_to_c = (
self.iso_sdu_interval_p_to_c or DEFAULT_ISO_SDU_INTERVAL_P_TO_C
)
sdu_interval_c_to_p = self.iso_sdu_interval_c_to_p or 0
max_transport_latency_p_to_c = (
self.iso_max_transport_latency_p_to_c
or DEFAULT_ISO_MAX_TRANSPORT_LATENCY_P_TO_C
)
max_transport_latency_c_to_p = (
self.iso_max_transport_latency_c_to_p or 0
)
max_sdu_p_to_c = (
self.iso_max_sdu_p_to_c or DEFAULT_ISO_MAX_SDU_P_TO_C
)
max_sdu_c_to_p = self.iso_max_sdu_c_to_p or 0
rtn_p_to_c = self.iso_rtn_p_to_c or DEFAULT_ISO_RTN_P_TO_C
rtn_c_to_p = self.iso_rtn_c_to_p or 0
cis_handles = await self.device.setup_cig(
CigParameters(
cig_id=1,
sdu_interval_c_to_p=sdu_interval_c_to_p,
sdu_interval_p_to_c=sdu_interval_p_to_c,
max_transport_latency_c_to_p=max_transport_latency_c_to_p,
max_transport_latency_p_to_c=max_transport_latency_p_to_c,
cis_parameters=[
CigParameters.CisParameters(
cis_id=2,
max_sdu_c_to_p=max_sdu_c_to_p,
max_sdu_p_to_c=max_sdu_p_to_c,
rtn_c_to_p=rtn_c_to_p,
rtn_p_to_c=rtn_p_to_c,
)
],
)
)
cis_link = (
await self.device.create_cis([(cis_handles[0], self.connection)])
)[0]
print_cis_link(cis_link)
await mode.on_connection(
self.connection, cis_link, scenario.is_sender()
)
else:
await mode.on_connection(self.connection)
await scenario.run()
await asyncio.sleep(DEFAULT_LINGER_TIME)
@@ -1428,6 +1674,7 @@ class Peripheral(Device.Listener, Connection.Listener):
scenario_factory,
mode_factory,
classic,
iso,
extended_data_length,
role_switch,
le_scan,
@@ -1437,6 +1684,7 @@ class Peripheral(Device.Listener, Connection.Listener):
):
self.transport = transport
self.classic = classic
self.iso = iso
self.scenario_factory = scenario_factory
self.mode_factory = mode_factory
self.extended_data_length = extended_data_length
@@ -1457,7 +1705,7 @@ class Peripheral(Device.Listener, Connection.Listener):
async def run(self):
logging.info(color('>>> Connecting to HCI...', 'green'))
async with await open_transport_or_link(self.transport) as (
async with await open_transport(self.transport) as (
hci_source,
hci_sink,
):
@@ -1470,6 +1718,13 @@ class Peripheral(Device.Listener, Connection.Listener):
self.device.listener = self
self.mode = self.mode_factory(self.device)
self.scenario = self.scenario_factory(self.mode)
self.device.classic_enabled = self.classic
self.device.cis_enabled = self.iso
# Set up a pairing config factory with minimal requirements.
self.device.pairing_config_factory = lambda _: PairingConfig(
sc=False, mitm=False, bonding=False
)
await pre_power_on(self.device, self.classic)
await self.device.power_on()
@@ -1501,7 +1756,21 @@ class Peripheral(Device.Listener, Connection.Listener):
logging.info(color('### Connected', 'cyan'))
print_connection(self.connection)
await self.mode.on_connection(self.connection)
if self.iso:
async def on_cis_request(cis_link: CisLink) -> None:
logging.info(color("@@@ Accepting CIS", "green"))
await self.device.accept_cis_request(cis_link)
print_cis_link(cis_link)
await self.mode.on_connection(
self.connection, cis_link, self.scenario.is_sender()
)
self.connection.on(self.connection.EVENT_CIS_REQUEST, on_cis_request)
else:
await self.mode.on_connection(self.connection)
await self.scenario.run()
await asyncio.sleep(DEFAULT_LINGER_TIME)
@@ -1613,6 +1882,12 @@ def create_mode_factory(ctx, default_mode):
credits_threshold=ctx.obj['rfcomm_credits_threshold'],
)
if mode == 'iso-server':
return IsoServer(device)
if mode == 'iso-client':
return IsoClient(device)
raise ValueError('invalid mode')
return create_mode
@@ -1640,6 +1915,9 @@ def create_scenario_factory(ctx, default_scenario):
return Receiver(packet_io, ctx.obj['linger'])
if scenario == 'ping':
if isinstance(packet_io, (IsoClient, IsoServer)):
raise ValueError('ping not supported with ISO')
return Ping(
packet_io,
start_delay=ctx.obj['start_delay'],
@@ -1651,6 +1929,9 @@ def create_scenario_factory(ctx, default_scenario):
)
if scenario == 'pong':
if isinstance(packet_io, (IsoClient, IsoServer)):
raise ValueError('pong not supported with ISO')
return Pong(packet_io, ctx.obj['linger'])
raise ValueError('invalid scenario')
@@ -1674,6 +1955,8 @@ def create_scenario_factory(ctx, default_scenario):
'l2cap-server',
'rfcomm-client',
'rfcomm-server',
'iso-client',
'iso-server',
]
),
)
@@ -1896,6 +2179,7 @@ def bench(
ctx.obj['classic_page_scan'] = classic_page_scan
ctx.obj['classic_inquiry_scan'] = classic_inquiry_scan
ctx.obj['classic'] = mode in ('rfcomm-client', 'rfcomm-server')
ctx.obj['iso'] = mode in ('iso-client', 'iso-server')
@bench.command()
@@ -1917,26 +2201,88 @@ def bench(
@click.option('--phy', type=click.Choice(['1m', '2m', 'coded']), help='PHY to use')
@click.option('--authenticate', is_flag=True, help='Authenticate (RFComm only)')
@click.option('--encrypt', is_flag=True, help='Encrypt the connection (RFComm only)')
@click.option(
'--iso-sdu-interval-c-to-p',
type=int,
help='ISO SDU central -> peripheral (microseconds)',
)
@click.option(
'--iso-sdu-interval-p-to-c',
type=int,
help='ISO SDU interval peripheral -> central (microseconds)',
)
@click.option(
'--iso-max-sdu-c-to-p',
type=int,
help='ISO max SDU central -> peripheral',
)
@click.option(
'--iso-max-sdu-p-to-c',
type=int,
help='ISO max SDU peripheral -> central',
)
@click.option(
'--iso-max-transport-latency-c-to-p',
type=int,
help='ISO max transport latency central -> peripheral (milliseconds)',
)
@click.option(
'--iso-max-transport-latency-p-to-c',
type=int,
help='ISO max transport latency peripheral -> central (milliseconds)',
)
@click.option(
'--iso-rtn-c-to-p',
type=int,
help='ISO RTN central -> peripheral (integer count)',
)
@click.option(
'--iso-rtn-p-to-c',
type=int,
help='ISO RTN peripheral -> central (integer count)',
)
@click.pass_context
def central(
ctx, transport, peripheral_address, connection_interval, phy, authenticate, encrypt
ctx,
transport,
peripheral_address,
connection_interval,
phy,
authenticate,
encrypt,
iso_sdu_interval_c_to_p,
iso_sdu_interval_p_to_c,
iso_max_sdu_c_to_p,
iso_max_sdu_p_to_c,
iso_max_transport_latency_c_to_p,
iso_max_transport_latency_p_to_c,
iso_rtn_c_to_p,
iso_rtn_p_to_c,
):
"""Run as a central (initiates the connection)"""
scenario_factory = create_scenario_factory(ctx, 'send')
mode_factory = create_mode_factory(ctx, 'gatt-client')
classic = ctx.obj['classic']
async def run_central():
await Central(
transport,
peripheral_address,
classic,
scenario_factory,
mode_factory,
connection_interval,
phy,
authenticate,
encrypt or authenticate,
ctx.obj['iso'],
iso_sdu_interval_c_to_p,
iso_sdu_interval_p_to_c,
iso_max_sdu_c_to_p,
iso_max_sdu_p_to_c,
iso_max_transport_latency_c_to_p,
iso_max_transport_latency_p_to_c,
iso_rtn_c_to_p,
iso_rtn_p_to_c,
ctx.obj['classic'],
ctx.obj['extended_data_length'],
ctx.obj['role_switch'],
ctx.obj['le_scan'],
@@ -1962,6 +2308,7 @@ def peripheral(ctx, transport):
scenario_factory,
mode_factory,
ctx.obj['classic'],
ctx.obj['iso'],
ctx.obj['extended_data_length'],
ctx.obj['role_switch'],
ctx.obj['le_scan'],
@@ -1974,11 +2321,7 @@ def peripheral(ctx, transport):
def main():
logging.basicConfig(
level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper(),
format="[%(asctime)s.%(msecs)03d] %(levelname)s:%(name)s:%(message)s",
datefmt="%H:%M:%S",
)
bumble.logging.setup_basic_logging('INFO')
bench()
+2 -2
View File
@@ -64,7 +64,7 @@ from bumble.device import (
Peer,
)
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor
from bumble.gatt_client import CharacteristicProxy
from bumble.hci import (
@@ -291,7 +291,7 @@ class ConsoleApp:
async def run_async(self, device_config, transport):
rssi_monitoring_task = asyncio.create_task(self.rssi_monitor_loop())
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
async with await open_transport(transport) as (hci_source, hci_sink):
if device_config:
self.device = Device.from_config_file_with_hci(
device_config, hci_source, hci_sink
+44 -12
View File
@@ -16,8 +16,6 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import logging
import time
import click
@@ -58,7 +56,8 @@ from bumble.hci import (
HCI_Read_Local_Version_Information_Command,
)
from bumble.host import Host
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
import bumble.logging
# -----------------------------------------------------------------------------
@@ -242,28 +241,43 @@ async def get_codecs_info(host: Host) -> None:
# -----------------------------------------------------------------------------
async def async_main(latency_probes, transport):
async def async_main(
latency_probes, latency_probe_interval, latency_probe_command, transport
):
print('<<< connecting to HCI...')
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
async with await open_transport(transport) as (hci_source, hci_sink):
print('<<< connected')
host = Host(hci_source, hci_sink)
await host.reset()
# Measure the latency if requested
# (we add an extra probe at the start, that we ignore, just to ensure that
# the transport is primed)
latencies = []
if latency_probes:
for _ in range(latency_probes):
if latency_probe_command:
probe_hci_command = HCI_Command.from_bytes(
bytes.fromhex(latency_probe_command)
)
else:
probe_hci_command = HCI_Read_Local_Version_Information_Command()
for iteration in range(1 + latency_probes):
if latency_probe_interval:
await asyncio.sleep(latency_probe_interval / 1000)
start = time.time()
await host.send_command(HCI_Read_Local_Version_Information_Command())
latencies.append(1000 * (time.time() - start))
await host.send_command(probe_hci_command)
if iteration:
latencies.append(1000 * (time.time() - start))
print(
color('HCI Command Latency:', 'yellow'),
(
f'min={min(latencies):.2f}, '
f'max={max(latencies):.2f}, '
f'average={sum(latencies)/len(latencies):.2f}'
f'average={sum(latencies)/len(latencies):.2f},'
),
[f'{latency:.4}' for latency in latencies],
'\n',
)
@@ -311,10 +325,28 @@ async def async_main(latency_probes, transport):
type=int,
help='Send N commands to measure HCI transport latency statistics',
)
@click.option(
'--latency-probe-interval',
metavar='INTERVAL',
type=int,
help='Interval between latency probes (milliseconds)',
)
@click.option(
'--latency-probe-command',
metavar='COMMAND_HEX',
help=(
'Probe command (HCI Command packet bytes, in hex. Use 0177FC00 for'
' a loopback test with the HCI remote proxy app)'
),
)
@click.argument('transport')
def main(latency_probes, transport):
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
asyncio.run(async_main(latency_probes, transport))
def main(latency_probes, latency_probe_interval, latency_probe_command, transport):
bumble.logging.setup_basic_logging()
asyncio.run(
async_main(
latency_probes, latency_probe_interval, latency_probe_command, transport
)
)
# -----------------------------------------------------------------------------
+7 -7
View File
@@ -16,10 +16,11 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import time
from typing import Optional
import click
from bumble.colors import color
from bumble.hci import (
HCI_READ_LOOPBACK_MODE_COMMAND,
@@ -29,8 +30,8 @@ from bumble.hci import (
LoopbackMode,
)
from bumble.host import Host
from bumble.transport import open_transport_or_link
import click
from bumble.transport import open_transport
import bumble.logging
class Loopback:
@@ -88,7 +89,7 @@ class Loopback:
async def run(self):
"""Run a loopback throughput test"""
print(color('>>> Connecting to HCI...', 'green'))
async with await open_transport_or_link(self.transport) as (
async with await open_transport(self.transport) as (
hci_source,
hci_sink,
):
@@ -194,8 +195,7 @@ class Loopback:
)
@click.argument('transport')
def main(packet_size, packet_count, transport):
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
bumble.logging.setup_basic_logging()
loopback = Loopback(packet_size, packet_count, transport)
asyncio.run(loopback.run())
+4 -5
View File
@@ -15,14 +15,13 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import asyncio
import sys
import os
from bumble.controller import Controller
from bumble.link import LocalLink
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
import bumble.logging
# -----------------------------------------------------------------------------
@@ -42,7 +41,7 @@ async def async_main():
transports = []
controllers = []
for index, transport_name in enumerate(sys.argv[1:]):
transport = await open_transport_or_link(transport_name)
transport = await open_transport(transport_name)
transports.append(transport)
controller = Controller(
f'C{index}',
@@ -62,7 +61,7 @@ async def async_main():
# -----------------------------------------------------------------------------
def main():
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
bumble.logging.setup_basic_logging()
asyncio.run(async_main())
+4 -5
View File
@@ -16,8 +16,6 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import logging
from typing import Callable, Iterable, Optional
import click
@@ -32,7 +30,8 @@ from bumble.profiles.gap import GenericAccessServiceProxy
from bumble.profiles.pacs import PublishedAudioCapabilitiesServiceProxy
from bumble.profiles.tmap import TelephonyAndMediaAudioServiceProxy
from bumble.profiles.vcs import VolumeControlServiceProxy
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
import bumble.logging
# -----------------------------------------------------------------------------
@@ -215,7 +214,7 @@ async def show_device_info(peer, done: Optional[asyncio.Future]) -> None:
# -----------------------------------------------------------------------------
async def async_main(device_config, encrypt, transport, address_or_name):
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
async with await open_transport(transport) as (hci_source, hci_sink):
# Create a device
if device_config:
@@ -267,7 +266,7 @@ def main(device_config, encrypt, transport, address_or_name):
Dump the GATT database on a remote device. If ADDRESS_OR_NAME is not specified,
wait for an incoming connection.
"""
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
bumble.logging.setup_basic_logging()
asyncio.run(async_main(device_config, encrypt, transport, address_or_name))
+5 -5
View File
@@ -16,15 +16,15 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import logging
import click
import bumble.core
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.gatt import show_services
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
import bumble.logging
# -----------------------------------------------------------------------------
@@ -60,7 +60,7 @@ async def dump_gatt_db(peer, done):
# -----------------------------------------------------------------------------
async def async_main(device_config, encrypt, transport, address_or_name):
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
async with await open_transport(transport) as (hci_source, hci_sink):
# Create a device
if device_config:
@@ -112,7 +112,7 @@ def main(device_config, encrypt, transport, address_or_name):
Dump the GATT database on a remote device. If ADDRESS_OR_NAME is not specified,
wait for an incoming connection.
"""
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
bumble.logging.setup_basic_logging()
asyncio.run(async_main(device_config, encrypt, transport, address_or_name))
+5 -5
View File
@@ -16,9 +16,8 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import struct
import logging
import click
from bumble import l2cap
@@ -27,8 +26,9 @@ from bumble.device import Device, Peer
from bumble.core import AdvertisingData
from bumble.gatt import Service, Characteristic, CharacteristicValue
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.hci import HCI_Constant
import bumble.logging
# -----------------------------------------------------------------------------
@@ -325,7 +325,7 @@ async def run(
receive_port,
):
print('<<< connecting to HCI...')
async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
async with await open_transport(hci_transport) as (hci_source, hci_sink):
print('<<< connected')
# Instantiate a bridge object
@@ -383,6 +383,7 @@ def main(
receive_host,
receive_port,
):
bumble.logging.setup_basic_logging('WARNING')
asyncio.run(
run(
hci_transport,
@@ -397,6 +398,5 @@ def main(
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
if __name__ == '__main__':
main()
+5 -4
View File
@@ -17,11 +17,12 @@
# -----------------------------------------------------------------------------
import logging
import asyncio
import os
import sys
from bumble import hci, transport
from bumble.bridge import HCI_Bridge
import bumble.logging
# -----------------------------------------------------------------------------
# Logging
@@ -46,14 +47,14 @@ async def async_main():
return
print('>>> connecting to HCI...')
async with await transport.open_transport_or_link(sys.argv[1]) as (
async with await transport.open_transport(sys.argv[1]) as (
hci_host_source,
hci_host_sink,
):
print('>>> connected')
print('>>> connecting to HCI...')
async with await transport.open_transport_or_link(sys.argv[2]) as (
async with await transport.open_transport(sys.argv[2]) as (
hci_controller_source,
hci_controller_sink,
):
@@ -100,7 +101,7 @@ async def async_main():
# -----------------------------------------------------------------------------
def main():
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
bumble.logging.setup_basic_logging()
asyncio.run(async_main())
+5 -5
View File
@@ -16,16 +16,16 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import click
from bumble import l2cap
from bumble.colors import color
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.device import Device
from bumble.utils import FlowControlAsyncPipe
from bumble.hci import HCI_Constant
import bumble.logging
# -----------------------------------------------------------------------------
@@ -258,7 +258,7 @@ class ClientBridge:
# -----------------------------------------------------------------------------
async def run(device_config, hci_transport, bridge):
print('<<< connecting to HCI...')
async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
async with await open_transport(hci_transport) as (hci_source, hci_sink):
print('<<< connected')
device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
@@ -356,6 +356,6 @@ def client(context, bluetooth_address, tcp_host, tcp_port):
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
if __name__ == '__main__':
bumble.logging.setup_basic_logging('WARNING')
cli(obj={}) # pylint: disable=no-value-for-parameter
+8 -3
View File
@@ -22,7 +22,6 @@ import datetime
import functools
from importlib import resources
import json
import os
import logging
import pathlib
import weakref
@@ -44,6 +43,7 @@ from bumble.device import Device, DeviceConfiguration, AdvertisingParameters, Ci
from bumble.transport import open_transport
from bumble.profiles import ascs, bap, pacs
from bumble.hci import Address, CodecID, CodingFormat, HCI_IsoDataPacket
import bumble.logging
# -----------------------------------------------------------------------------
@@ -337,7 +337,12 @@ class Speaker:
),
(
AdvertisingData.FLAGS,
bytes([AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG]),
bytes(
[
AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG
| AdvertisingData.BR_EDR_NOT_SUPPORTED_FLAG
]
),
),
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
@@ -449,7 +454,7 @@ def speaker(ui_port: int, device_config: str, transport: str, lc3_file: str) ->
# -----------------------------------------------------------------------------
def main():
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
bumble.logging.setup_basic_logging()
speaker()
View File
-289
View File
@@ -1,289 +0,0 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ----------------------------------------------------------------------------
# Imports
# ----------------------------------------------------------------------------
import sys
import logging
import json
import asyncio
import argparse
import uuid
import os
from urllib.parse import urlparse
import websockets
from bumble.colors import color
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------
# Constants
# ----------------------------------------------------------------------------
DEFAULT_RELAY_PORT = 10723
# ----------------------------------------------------------------------------
# Utils
# ----------------------------------------------------------------------------
def error_to_json(error):
return json.dumps({'error': error})
def error_to_result(error):
return f'result:{error_to_json(error)}'
async def broadcast_message(message, connections):
# Send to all the connections
tasks = [connection.send_message(message) for connection in connections]
if tasks:
await asyncio.gather(*tasks)
# ----------------------------------------------------------------------------
# Connection class
# ----------------------------------------------------------------------------
class Connection:
"""
A Connection represents a client connected to the relay over a websocket
"""
def __init__(self, room, websocket):
self.room = room
self.websocket = websocket
self.address = str(uuid.uuid4())
async def send_message(self, message):
try:
logger.debug(color(f'->{self.address}: {message}', 'yellow'))
return await self.websocket.send(message)
except websockets.exceptions.WebSocketException as error:
logger.info(f'! client "{self}" disconnected: {error}')
await self.cleanup()
async def send_error(self, error):
return await self.send_message(f'result:{error_to_json(error)}')
async def receive_message(self):
try:
message = await self.websocket.recv()
logger.debug(color(f'<-{self.address}: {message}', 'blue'))
return message
except websockets.exceptions.WebSocketException as error:
logger.info(color(f'! client "{self}" disconnected: {error}', 'red'))
await self.cleanup()
async def cleanup(self):
if self.room:
await self.room.remove_connection(self)
def set_address(self, address):
logger.info(f'Connection address changed: {self.address} -> {address}')
self.address = address
def __str__(self):
return (
f'Connection(address="{self.address}", '
f'client={self.websocket.remote_address[0]}:'
f'{self.websocket.remote_address[1]})'
)
# ----------------------------------------------------------------------------
# Room class
# ----------------------------------------------------------------------------
class Room:
"""
A Room is a collection of bridged connections
"""
def __init__(self, relay, name):
self.relay = relay
self.name = name
self.observers = []
self.connections = []
async def add_connection(self, connection):
logger.info(f'New participant in {self.name}: {connection}')
self.connections.append(connection)
await self.broadcast_message(connection, f'joined:{connection.address}')
async def remove_connection(self, connection):
if connection in self.connections:
self.connections.remove(connection)
await self.broadcast_message(connection, f'left:{connection.address}')
def find_connections_by_address(self, address):
return [c for c in self.connections if c.address == address]
async def bridge_connection(self, connection):
while True:
# Wait for a message
message = await connection.receive_message()
# Skip empty messages
if message is None:
return
# Parse the message to decide how to handle it
if message.startswith('@'):
# This is a targeted message
await self.on_targeted_message(connection, message)
elif message.startswith('/'):
# This is an RPC request
await self.on_rpc_request(connection, message)
else:
await connection.send_message(
f'result:{error_to_json("error: invalid message")}'
)
async def broadcast_message(self, sender, message):
'''
Send to all connections in the room except back to the sender
'''
await broadcast_message(message, [c for c in self.connections if c != sender])
async def on_rpc_request(self, connection, message):
command, *params = message.split(' ', 1)
if handler := getattr(
self, f'on_{command[1:].lower().replace("-","_")}_command', None
):
try:
result = await handler(connection, params)
except Exception as error:
result = error_to_result(error)
else:
result = error_to_result('unknown command')
await connection.send_message(result or 'result:{}')
async def on_targeted_message(self, connection, message):
target, *payload = message.split(' ', 1)
if not payload:
return error_to_json('missing arguments')
payload = payload[0]
target = target[1:]
# Determine what targets to send to
if target == '*':
# Send to all connections in the room except the connection from which the
# message was received
connections = [c for c in self.connections if c != connection]
else:
connections = self.find_connections_by_address(target)
if not connections:
# Unicast with no recipient, let the sender know
await connection.send_message(f'unreachable:{target}')
# Send to targets
await broadcast_message(f'message:{connection.address}/{payload}', connections)
async def on_set_address_command(self, connection, params):
if not params:
return error_to_result('missing address')
current_address = connection.address
new_address = params[0]
connection.set_address(new_address)
await self.broadcast_message(
connection, f'address-changed:from={current_address},to={new_address}'
)
# ----------------------------------------------------------------------------
class Relay:
"""
A relay accepts connections with the following url: ws://<hostname>/<room>.
Participants in a room can communicate with each other
"""
def __init__(self, port):
self.port = port
self.rooms = {}
self.observers = []
def start(self):
logger.info(f'Starting Relay on port {self.port}')
# pylint: disable-next=no-member
return websockets.serve(self.serve, '0.0.0.0', self.port, ping_interval=None)
async def serve_as_controller(self, connection):
pass
async def serve(self, websocket, path):
logger.debug(f'New connection with path {path}')
# Parse the path
parsed = urlparse(path)
# Check if this is a controller client
if parsed.path == '/':
return await self.serve_as_controller(Connection('', websocket))
# Find or create a room for this connection
room_name = parsed.path[1:].split('/')[0]
if room_name not in self.rooms:
self.rooms[room_name] = Room(self, room_name)
room = self.rooms[room_name]
# Add the connection to the room
connection = Connection(room, websocket)
await room.add_connection(connection)
# Bridge until the connection is closed
await room.bridge_connection(connection)
# ----------------------------------------------------------------------------
def main():
# Check the Python version
if sys.version_info < (3, 6, 1):
print('ERROR: Python 3.6.1 or higher is required')
sys.exit(1)
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
# Parse arguments
arg_parser = argparse.ArgumentParser(description='Bumble Link Relay')
arg_parser.add_argument('--log-level', default='INFO', help='logger level')
arg_parser.add_argument('--log-config', help='logger config file (YAML)')
arg_parser.add_argument(
'--port', type=int, default=DEFAULT_RELAY_PORT, help='Port to listen on'
)
args = arg_parser.parse_args()
# Setup logger
if args.log_config:
from logging import config # pylint: disable=import-outside-toplevel
config.fileConfig(args.log_config)
else:
logging.basicConfig(level=getattr(logging, args.log_level.upper()))
# Start a relay
relay = Relay(args.port)
asyncio.get_event_loop().run_until_complete(relay.start())
asyncio.get_event_loop().run_forever()
# ----------------------------------------------------------------------------
if __name__ == '__main__':
main()
-21
View File
@@ -1,21 +0,0 @@
[loggers]
keys=root
[handlers]
keys=stream_handler
[formatters]
keys=formatter
[logger_root]
level=DEBUG
handlers=stream_handler
[handler_stream_handler]
class=StreamHandler
level=DEBUG
formatter=formatter
args=(sys.stderr,)
[formatter_formatter]
format=%(asctime)s %(name)-12s %(levelname)-8s %(message)s
+2 -2
View File
@@ -26,7 +26,7 @@ from prompt_toolkit.shortcuts import PromptSession
from bumble.a2dp import make_audio_sink_service_sdp_records
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.pairing import OobData, PairingDelegate, PairingConfig
from bumble.smp import OobContext, OobLegacyContext
from bumble.smp import error_name as smp_error_name
@@ -349,7 +349,7 @@ async def pair(
Waiter.instance = Waiter(linger=linger)
print('<<< connecting to HCI...')
async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
async with await open_transport(hci_transport) as (hci_source, hci_sink):
print('<<< connected')
# Create a device to manage the host
+2 -3
View File
@@ -17,8 +17,6 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import asyncio.subprocess
import os
import logging
from typing import Optional, Union
@@ -63,6 +61,7 @@ from bumble.hci import Address, HCI_CONNECTION_ALREADY_EXISTS_ERROR, HCI_Constan
from bumble.pairing import PairingConfig
from bumble.transport import open_transport
from bumble.utils import AsyncRunner
import bumble.logging
# -----------------------------------------------------------------------------
@@ -599,7 +598,7 @@ def play(context, address, audio_format, audio_file):
# -----------------------------------------------------------------------------
def main():
logging.basicConfig(level=os.environ.get("BUMBLE_LOGLEVEL", "WARNING").upper())
bumble.logging.setup_basic_logging("WARNING")
player_cli()
+3 -4
View File
@@ -16,8 +16,6 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import time
from typing import Optional
@@ -30,6 +28,7 @@ from bumble import hci
from bumble import rfcomm
from bumble import transport
from bumble import utils
import bumble.logging
# -----------------------------------------------------------------------------
@@ -406,7 +405,7 @@ class ClientBridge:
# -----------------------------------------------------------------------------
async def run(device_config, hci_transport, bridge):
print("<<< connecting to HCI...")
async with await transport.open_transport_or_link(hci_transport) as (
async with await transport.open_transport(hci_transport) as (
hci_source,
hci_sink,
):
@@ -515,6 +514,6 @@ def client(context, bluetooth_address, tcp_host, tcp_port, authenticate, encrypt
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get("BUMBLE_LOGLEVEL", "WARNING").upper())
if __name__ == "__main__":
bumble.logging.setup_basic_logging("WARNING")
cli(obj={}) # pylint: disable=no-value-for-parameter
+4 -5
View File
@@ -16,17 +16,16 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import logging
import click
from bumble.colors import color
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.keys import JsonKeyStore
from bumble.smp import AddressResolver
from bumble.device import Advertisement
from bumble.hci import Address, HCI_Constant, HCI_LE_1M_PHY, HCI_LE_CODED_PHY
import bumble.logging
# -----------------------------------------------------------------------------
@@ -127,7 +126,7 @@ async def scan(
transport,
):
print('<<< connecting to HCI...')
async with await open_transport_or_link(transport) as (hci_source, hci_sink):
async with await open_transport(transport) as (hci_source, hci_sink):
print('<<< connected')
if device_config:
@@ -237,7 +236,7 @@ def main(
device_config,
transport,
):
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
bumble.logging.setup_basic_logging('WARNING')
asyncio.run(
scan(
min_rssi,
+2 -2
View File
@@ -18,7 +18,6 @@
import datetime
import importlib
import logging
import os
import struct
import click
@@ -27,6 +26,7 @@ from bumble.colors import color
from bumble import hci
from bumble.transport.common import PacketReader
from bumble.helpers import PacketTracer
import bumble.logging
# -----------------------------------------------------------------------------
@@ -188,5 +188,5 @@ def main(format, vendor, filename):
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
bumble.logging.setup_basic_logging('WARNING')
main() # pylint: disable=no-value-for-parameter
+2 -6
View File
@@ -21,7 +21,6 @@ import asyncio.subprocess
from importlib import resources
import enum
import json
import os
import logging
import pathlib
import subprocess
@@ -58,6 +57,7 @@ from bumble.a2dp import (
from bumble.utils import AsyncRunner
from bumble.codecs import AacAudioRtpPacket
from bumble.rtp import MediaPacket
import bumble.logging
# -----------------------------------------------------------------------------
@@ -833,11 +833,7 @@ def speaker(
# -----------------------------------------------------------------------------
def main():
logging.basicConfig(
level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper(),
format="[%(asctime)s.%(msecs)03d] %(levelname)s:%(name)s:%(message)s",
datefmt="%H:%M:%S",
)
bumble.logging.setup_basic_logging('WARNING')
speaker()
+2 -3
View File
@@ -16,13 +16,12 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import logging
import click
from bumble.device import Device
from bumble.keys import JsonKeyStore
from bumble.transport import open_transport
import bumble.logging
# -----------------------------------------------------------------------------
@@ -68,7 +67,7 @@ def main(keystore_file, hci_transport, device_config, address):
instantiated.
If no address is passed, the existing pairing keys for all addresses are printed.
"""
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
bumble.logging.setup_basic_logging()
if not keystore_file and not hci_transport:
print('either --keystore-file or --hci-transport must be specified.')
+2 -3
View File
@@ -26,13 +26,12 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import os
import logging
import click
import usb1
from bumble.colors import color
from bumble.transport.usb import load_libusb
import bumble.logging
# -----------------------------------------------------------------------------
@@ -169,7 +168,7 @@ def is_bluetooth_hci(device):
@click.command()
@click.option('--verbose', is_flag=True, default=False, help='Print more details')
def main(verbose):
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
bumble.logging.setup_basic_logging('WARNING')
load_libusb()
with usb1.USBContext() as context:
+72 -11
View File
@@ -27,7 +27,7 @@ from bumble.colors import color
from bumble.core import (
PhysicalTransport,
)
from bumble import hci
from bumble.hci import (
HCI_ACL_DATA_PACKET,
HCI_COMMAND_DISALLOWED_ERROR,
@@ -394,7 +394,7 @@ class Controller:
peer_address=peer_address,
link=self.link,
transport=PhysicalTransport.LE,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
link_type=HCI_Connection_Complete_Event.LinkType.ACL,
)
self.peripheral_connections[peer_address] = connection
logger.debug(f'New PERIPHERAL connection handle: 0x{connection_handle:04X}')
@@ -454,7 +454,7 @@ class Controller:
peer_address=peer_address,
link=self.link,
transport=PhysicalTransport.LE,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
link_type=HCI_Connection_Complete_Event.LinkType.ACL,
)
self.central_connections[peer_address] = connection
logger.debug(
@@ -618,8 +618,8 @@ class Controller:
cis_sync_delay=0,
transport_latency_c_to_p=0,
transport_latency_p_to_c=0,
phy_c_to_p=0,
phy_p_to_c=0,
phy_c_to_p=1,
phy_p_to_c=1,
nse=0,
bn_c_to_p=0,
bn_p_to_c=0,
@@ -695,7 +695,7 @@ class Controller:
peer_address=peer_address,
link=self.link,
transport=PhysicalTransport.BR_EDR,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
link_type=HCI_Connection_Complete_Event.LinkType.ACL,
)
self.classic_connections[peer_address] = connection
logger.debug(
@@ -709,7 +709,7 @@ class Controller:
connection_handle=connection_handle,
bd_addr=peer_address,
encryption_enabled=False,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
link_type=HCI_Connection_Complete_Event.LinkType.ACL,
)
)
else:
@@ -720,7 +720,7 @@ class Controller:
connection_handle=0,
bd_addr=peer_address,
encryption_enabled=False,
link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE,
link_type=HCI_Connection_Complete_Event.LinkType.ACL,
)
)
@@ -945,7 +945,7 @@ class Controller:
)
)
self.link.classic_sco_connect(
self, connection.peer_address, HCI_Connection_Complete_Event.ESCO_LINK_TYPE
self, connection.peer_address, HCI_Connection_Complete_Event.LinkType.ESCO
)
def on_hci_enhanced_accept_synchronous_connection_request_command(self, command):
@@ -974,10 +974,71 @@ class Controller:
)
)
self.link.classic_accept_sco_connection(
self, connection.peer_address, HCI_Connection_Complete_Event.ESCO_LINK_TYPE
self, connection.peer_address, HCI_Connection_Complete_Event.LinkType.ESCO
)
def on_hci_switch_role_command(self, command):
def on_hci_sniff_mode_command(self, command: hci.HCI_Sniff_Mode_Command):
'''
See Bluetooth spec Vol 4, Part E - 7.2.2 Sniff Mode command
'''
if self.link is None:
self.send_hci_packet(
hci.HCI_Command_Status_Event(
status=hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
return
self.send_hci_packet(
hci.HCI_Command_Status_Event(
status=HCI_SUCCESS,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
self.send_hci_packet(
hci.HCI_Mode_Change_Event(
status=HCI_SUCCESS,
connection_handle=command.connection_handle,
current_mode=hci.HCI_Mode_Change_Event.Mode.SNIFF,
interval=2,
)
)
def on_hci_exit_sniff_mode_command(self, command: hci.HCI_Exit_Sniff_Mode_Command):
'''
See Bluetooth spec Vol 4, Part E - 7.2.3 Exit Sniff Mode command
'''
if self.link is None:
self.send_hci_packet(
hci.HCI_Command_Status_Event(
status=hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
return
self.send_hci_packet(
hci.HCI_Command_Status_Event(
status=HCI_SUCCESS,
num_hci_command_packets=1,
command_opcode=command.op_code,
)
)
self.send_hci_packet(
hci.HCI_Mode_Change_Event(
status=HCI_SUCCESS,
connection_handle=command.connection_handle,
current_mode=hci.HCI_Mode_Change_Event.Mode.ACTIVE,
interval=2,
)
)
def on_hci_switch_role_command(self, command: hci.HCI_Switch_Role_Command):
'''
See Bluetooth spec Vol 4, Part E - 7.2.8 Switch Role command
'''
+274 -113
View File
@@ -139,6 +139,9 @@ DEVICE_DEFAULT_ADVERTISING_TX_POWER = (
DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_SKIP = 0
DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_TIMEOUT = 5.0
DEVICE_DEFAULT_LE_RPA_TIMEOUT = 15 * 60 # 15 minutes (in seconds)
DEVICE_DEFAULT_ISO_CIS_MAX_SDU = 251
DEVICE_DEFAULT_ISO_CIS_RTN = 10
DEVICE_DEFAULT_ISO_CIS_MAX_TRANSPORT_LATENCY = 100
# fmt: on
# pylint: enable=line-too-long
@@ -489,7 +492,18 @@ class PeriodicAdvertisement:
# -----------------------------------------------------------------------------
@dataclass
class BIGInfoAdvertisement:
class BigInfoAdvertisement:
class Framing(utils.OpenIntEnum):
# fmt: off
UNFRAMED = 0X00
FRAMED_SEGMENTABLE_MODE = 0X01
FRAMED_UNSEGMENTED_MODE = 0X02
class Encryption(utils.OpenIntEnum):
# fmt: off
UNENCRYPTED = 0x00
ENCRYPTED = 0x01
address: hci.Address
sid: int
num_bis: int
@@ -502,8 +516,8 @@ class BIGInfoAdvertisement:
sdu_interval: int
max_sdu: int
phy: hci.Phy
framed: bool
encrypted: bool
framing: Framing
encryption: Encryption
@classmethod
def from_report(cls, address: hci.Address, sid: int, report) -> Self:
@@ -520,8 +534,8 @@ class BIGInfoAdvertisement:
report.sdu_interval,
report.max_sdu,
hci.Phy(report.phy),
report.framing != 0,
report.encryption != 0,
cls.Framing(report.framing),
cls.Encryption(report.encryption),
)
@@ -1013,7 +1027,7 @@ class PeriodicAdvertisingSync(utils.EventEmitter):
def on_biginfo_advertising_report(self, report) -> None:
self.emit(
self.EVENT_BIGINFO_ADVERTISEMENT,
BIGInfoAdvertisement.from_report(self.advertiser_address, self.sid, report),
BigInfoAdvertisement.from_report(self.advertiser_address, self.sid, report),
)
def __str__(self) -> str:
@@ -1031,14 +1045,24 @@ class PeriodicAdvertisingSync(utils.EventEmitter):
# -----------------------------------------------------------------------------
@dataclass
class BigParameters:
class Packing(utils.OpenIntEnum):
# fmt: off
SEQUENTIAL = 0x00
INTERLEAVED = 0x01
class Framing(utils.OpenIntEnum):
# fmt: off
UNFRAMED = 0x00
FRAMED = 0x01
num_bis: int
sdu_interval: int
sdu_interval: int # SDU interval, in microseconds
max_sdu: int
max_transport_latency: int
max_transport_latency: int # Max transport latency, in milliseconds
rtn: int
phy: hci.PhyBit = hci.PhyBit.LE_2M
packing: int = 0
framing: int = 0
packing: Packing = Packing.SEQUENTIAL
framing: Framing = Framing.UNFRAMED
broadcast_code: bytes | None = None
@@ -1061,15 +1085,15 @@ class Big(utils.EventEmitter):
state: State = State.PENDING
# Attributes provided by BIG Create Complete event
big_sync_delay: int = 0
transport_latency_big: int = 0
phy: int = 0
big_sync_delay: int = 0 # Sync delay, in microseconds
transport_latency_big: int = 0 # Transport latency, in microseconds
phy: hci.Phy = hci.Phy.LE_1M
nse: int = 0
bn: int = 0
pto: int = 0
irc: int = 0
max_pdu: int = 0
iso_interval: float = 0.0
iso_interval: float = 0.0 # ISO interval, in milliseconds
bis_links: Sequence[BisLink] = ()
def __post_init__(self) -> None:
@@ -1499,10 +1523,74 @@ class _IsoLink:
"""Write an ISO SDU."""
self.device.host.send_iso_sdu(connection_handle=self.handle, sdu=sdu)
async def get_tx_time_stamp(self) -> tuple[int, int, int]:
response = await self.device.host.send_command(
hci.HCI_LE_Read_ISO_TX_Sync_Command(connection_handle=self.handle),
check_result=True,
)
return (
response.return_parameters.packet_sequence_number,
response.return_parameters.tx_time_stamp,
response.return_parameters.time_offset,
)
@property
def data_packet_queue(self) -> DataPacketQueue | None:
return self.device.host.get_data_packet_queue(self.handle)
async def drain(self) -> None:
if data_packet_queue := self.data_packet_queue:
await data_packet_queue.drain(self.handle)
# -----------------------------------------------------------------------------
@dataclass
class CigParameters:
class WorstCaseSca(utils.OpenIntEnum):
# fmt: off
SCA_251_TO_500_PPM = 0x00
SCA_151_TO_250_PPM = 0x01
SCA_101_TO_150_PPM = 0x02
SCA_76_TO_100_PPM = 0x03
SCA_51_TO_75_PPM = 0x04
SCA_31_TO_50_PPM = 0x05
SCA_21_TO_30_PPM = 0x06
SCA_0_TO_20_PPM = 0x07
class Packing(utils.OpenIntEnum):
# fmt: off
SEQUENTIAL = 0x00
INTERLEAVED = 0x01
class Framing(utils.OpenIntEnum):
# fmt: off
UNFRAMED = 0x00
FRAMED = 0x01
@dataclass
class CisParameters:
cis_id: int
max_sdu_c_to_p: int = DEVICE_DEFAULT_ISO_CIS_MAX_SDU
max_sdu_p_to_c: int = DEVICE_DEFAULT_ISO_CIS_MAX_SDU
phy_c_to_p: hci.PhyBit = hci.PhyBit.LE_2M
phy_p_to_c: hci.PhyBit = hci.PhyBit.LE_2M
rtn_c_to_p: int = DEVICE_DEFAULT_ISO_CIS_RTN # Number of C->P retransmissions
rtn_p_to_c: int = DEVICE_DEFAULT_ISO_CIS_RTN # Number of P->C retransmissions
cig_id: int
cis_parameters: list[CisParameters]
sdu_interval_c_to_p: int # C->P SDU interval, in microseconds
sdu_interval_p_to_c: int # P->C SDU interval, in microseconds
worst_case_sca: WorstCaseSca = WorstCaseSca.SCA_251_TO_500_PPM
packing: Packing = Packing.SEQUENTIAL
framing: Framing = Framing.UNFRAMED
max_transport_latency_c_to_p: int = (
DEVICE_DEFAULT_ISO_CIS_MAX_TRANSPORT_LATENCY # Max C->P transport latency, in milliseconds
)
max_transport_latency_p_to_c: int = (
DEVICE_DEFAULT_ISO_CIS_MAX_TRANSPORT_LATENCY # Max C->P transport latency, in milliseconds
)
# -----------------------------------------------------------------------------
@dataclass
@@ -1516,6 +1604,20 @@ class CisLink(utils.EventEmitter, _IsoLink):
handle: int # CIS handle assigned by Controller (in LE_Set_CIG_Parameters Complete or LE_CIS_Request events)
cis_id: int # CIS ID assigned by Central device
cig_id: int # CIG ID assigned by Central device
cig_sync_delay: int = 0 # CIG sync delay, in microseconds
cis_sync_delay: int = 0 # CIS sync delay, in microseconds
transport_latency_c_to_p: int = 0 # C->P transport latency, in microseconds
transport_latency_p_to_c: int = 0 # P->C transport latency, in microseconds
phy_c_to_p: Optional[hci.Phy] = None
phy_p_to_c: Optional[hci.Phy] = None
nse: int = 0
bn_c_to_p: int = 0
bn_p_to_c: int = 0
ft_c_to_p: int = 0
ft_p_to_c: int = 0
max_pdu_c_to_p: int = 0
max_pdu_p_to_c: int = 0
iso_interval: float = 0.0 # ISO interval, in milliseconds
state: State = State.PENDING
sink: Callable[[hci.HCI_IsoDataPacket], Any] | None = None
@@ -1598,6 +1700,7 @@ class Connection(utils.CompositeEventEmitter):
peer_resolvable_address: Optional[hci.Address]
peer_le_features: Optional[hci.LeFeatureMask]
role: hci.Role
parameters: Parameters
encryption: int
encryption_key_size: int
authenticated: bool
@@ -1607,6 +1710,8 @@ class Connection(utils.CompositeEventEmitter):
pairing_peer_authentication_requirements: Optional[int]
cs_configs: dict[int, ChannelSoundingConfig] # Config ID to Configuration
cs_procedures: dict[int, ChannelSoundingProcedure] # Config ID to Procedures
classic_mode: int = hci.HCI_Mode_Change_Event.Mode.ACTIVE
classic_interval: int = 0
EVENT_CONNECTION_ATT_MTU_UPDATE = "connection_att_mtu_update"
EVENT_DISCONNECTION = "disconnection"
@@ -1633,6 +1738,8 @@ class Connection(utils.CompositeEventEmitter):
EVENT_CHANNEL_SOUNDING_CONFIG_REMOVED = "channel_sounding_config_removed"
EVENT_CHANNEL_SOUNDING_PROCEDURE_FAILURE = "channel_sounding_procedure_failure"
EVENT_CHANNEL_SOUNDING_PROCEDURE = "channel_sounding_procedure"
EVENT_MODE_CHANGE = "mode_change"
EVENT_MODE_CHANGE_FAILURE = "mode_change_failure"
EVENT_ROLE_CHANGE = "role_change"
EVENT_ROLE_CHANGE_FAILURE = "role_change_failure"
EVENT_CLASSIC_PAIRING = "classic_pairing"
@@ -1642,6 +1749,9 @@ class Connection(utils.CompositeEventEmitter):
EVENT_PAIRING_FAILURE = "pairing_failure"
EVENT_SECURITY_REQUEST = "security_request"
EVENT_LINK_KEY = "link_key"
EVENT_CIS_REQUEST = "cis_request"
EVENT_CIS_ESTABLISHMENT = "cis_establishment"
EVENT_CIS_ESTABLISHMENT_FAILURE = "cis_establishment_failure"
@utils.composite_listener
class Listener:
@@ -4569,48 +4679,39 @@ class Device(utils.CompositeEventEmitter):
@utils.experimental('Only for testing.')
async def setup_cig(
self,
cig_id: int,
cis_id: Sequence[int],
sdu_interval: tuple[int, int],
framing: int,
max_sdu: tuple[int, int],
retransmission_number: int,
max_transport_latency: tuple[int, int],
parameters: CigParameters,
) -> list[int]:
"""Sends hci.HCI_LE_Set_CIG_Parameters_Command.
Args:
cig_id: CIG_ID.
cis_id: CID ID list.
sdu_interval: SDU intervals of (Central->Peripheral, Peripheral->Cental).
framing: Un-framing(0) or Framing(1).
max_sdu: Max SDU counts of (Central->Peripheral, Peripheral->Cental).
retransmission_number: retransmission_number.
max_transport_latency: Max transport latencies of
(Central->Peripheral, Peripheral->Cental).
parameters: CIG parameters.
Returns:
List of created CIS handles corresponding to the same order of [cid_id].
"""
num_cis = len(cis_id)
num_cis = len(parameters.cis_parameters)
response = await self.send_command(
hci.HCI_LE_Set_CIG_Parameters_Command(
cig_id=cig_id,
sdu_interval_c_to_p=sdu_interval[0],
sdu_interval_p_to_c=sdu_interval[1],
worst_case_sca=0x00, # 251-500 ppm
packing=0x00, # Sequential
framing=framing,
max_transport_latency_c_to_p=max_transport_latency[0],
max_transport_latency_p_to_c=max_transport_latency[1],
cis_id=cis_id,
max_sdu_c_to_p=[max_sdu[0]] * num_cis,
max_sdu_p_to_c=[max_sdu[1]] * num_cis,
phy_c_to_p=[hci.HCI_LE_2M_PHY] * num_cis,
phy_p_to_c=[hci.HCI_LE_2M_PHY] * num_cis,
rtn_c_to_p=[retransmission_number] * num_cis,
rtn_p_to_c=[retransmission_number] * num_cis,
cig_id=parameters.cig_id,
sdu_interval_c_to_p=parameters.sdu_interval_c_to_p,
sdu_interval_p_to_c=parameters.sdu_interval_p_to_c,
worst_case_sca=parameters.worst_case_sca,
packing=int(parameters.packing),
framing=int(parameters.framing),
max_transport_latency_c_to_p=parameters.max_transport_latency_c_to_p,
max_transport_latency_p_to_c=parameters.max_transport_latency_p_to_c,
cis_id=[cis.cis_id for cis in parameters.cis_parameters],
max_sdu_c_to_p=[
cis.max_sdu_c_to_p for cis in parameters.cis_parameters
],
max_sdu_p_to_c=[
cis.max_sdu_p_to_c for cis in parameters.cis_parameters
],
phy_c_to_p=[cis.phy_c_to_p for cis in parameters.cis_parameters],
phy_p_to_c=[cis.phy_p_to_c for cis in parameters.cis_parameters],
rtn_c_to_p=[cis.rtn_c_to_p for cis in parameters.cis_parameters],
rtn_p_to_c=[cis.rtn_p_to_c for cis in parameters.cis_parameters],
),
check_result=True,
)
@@ -4618,19 +4719,17 @@ class Device(utils.CompositeEventEmitter):
# Ideally, we should manage CIG lifecycle, but they are not useful for Unicast
# Server, so here it only provides a basic functionality for testing.
cis_handles = response.return_parameters.connection_handle[:]
for id, cis_handle in zip(cis_id, cis_handles):
self._pending_cis[cis_handle] = (id, cig_id)
for cis, cis_handle in zip(parameters.cis_parameters, cis_handles):
self._pending_cis[cis_handle] = (cis.cis_id, parameters.cig_id)
return cis_handles
# [LE only]
@utils.experimental('Only for testing.')
async def create_cis(
self, cis_acl_pairs: Sequence[tuple[int, int]]
self, cis_acl_pairs: Sequence[tuple[int, Connection]]
) -> list[CisLink]:
for cis_handle, acl_handle in cis_acl_pairs:
acl_connection = self.lookup_connection(acl_handle)
assert acl_connection
for cis_handle, acl_connection in cis_acl_pairs:
cis_id, cig_id = self._pending_cis.pop(cis_handle)
self.cis_links[cis_handle] = CisLink(
device=self,
@@ -4650,8 +4749,8 @@ class Device(utils.CompositeEventEmitter):
if pending_future := pending_cis_establishments.get(cis_link.handle):
pending_future.set_result(cis_link)
def on_cis_establishment_failure(cis_handle: int, status: int) -> None:
if pending_future := pending_cis_establishments.get(cis_handle):
def on_cis_establishment_failure(cis_link: CisLink, status: int) -> None:
if pending_future := pending_cis_establishments.get(cis_link.handle):
pending_future.set_exception(hci.HCI_Error(status))
watcher.on(self, self.EVENT_CIS_ESTABLISHMENT, on_cis_establishment)
@@ -4661,7 +4760,7 @@ class Device(utils.CompositeEventEmitter):
await self.send_command(
hci.HCI_LE_Create_CIS_Command(
cis_connection_handle=[p[0] for p in cis_acl_pairs],
acl_connection_handle=[p[1] for p in cis_acl_pairs],
acl_connection_handle=[p[1].handle for p in cis_acl_pairs],
),
check_result=True,
)
@@ -4670,26 +4769,21 @@ class Device(utils.CompositeEventEmitter):
# [LE only]
@utils.experimental('Only for testing.')
async def accept_cis_request(self, handle: int) -> CisLink:
async def accept_cis_request(self, cis_link: CisLink) -> None:
"""[LE Only] Accepts an incoming CIS request.
When the specified CIS handle is already created, this method returns the
existed CIS link object immediately.
This method returns when the CIS is established, or raises an exception if
the CIS establishment fails.
Args:
handle: CIS handle to accept.
Returns:
CIS link object on the given handle.
"""
if not (cis_link := self.cis_links.get(handle)):
raise InvalidStateError(f'No pending CIS request of handle {handle}')
# There might be multiple ASE sharing a CIS channel.
# If one of them has accepted the request, the others should just leverage it.
async with self._cis_lock:
if cis_link.state == CisLink.State.ESTABLISHED:
return cis_link
return
with closing(utils.EventWatcher()) as watcher:
pending_establishment = asyncio.get_running_loop().create_future()
@@ -4708,26 +4802,24 @@ class Device(utils.CompositeEventEmitter):
)
await self.send_command(
hci.HCI_LE_Accept_CIS_Request_Command(connection_handle=handle),
hci.HCI_LE_Accept_CIS_Request_Command(
connection_handle=cis_link.handle
),
check_result=True,
)
await pending_establishment
return cis_link
# Mypy believes this is reachable when context is an ExitStack.
raise UnreachableError()
# [LE only]
@utils.experimental('Only for testing.')
async def reject_cis_request(
self,
handle: int,
cis_link: CisLink,
reason: int = hci.HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
) -> None:
await self.send_command(
hci.HCI_LE_Reject_CIS_Request_Command(
connection_handle=handle, reason=reason
connection_handle=cis_link.handle, reason=reason
),
check_result=True,
)
@@ -5084,8 +5176,8 @@ class Device(utils.CompositeEventEmitter):
# Store the keys in the key store
if self.keystore:
authenticated = key_type in (
hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
hci.LinkKeyType.AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192,
hci.LinkKeyType.AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256,
)
pairing_keys = PairingKeys(
link_key=PairingKeys.Key(value=link_key, authenticated=authenticated),
@@ -5265,7 +5357,7 @@ class Device(utils.CompositeEventEmitter):
big.bis_links = [BisLink(handle=handle, big=big) for handle in bis_handles]
big.big_sync_delay = big_sync_delay
big.transport_latency_big = transport_latency_big
big.phy = phy
big.phy = hci.Phy(phy)
big.nse = nse
big.bn = bn
big.pto = pto
@@ -5532,8 +5624,8 @@ class Device(utils.CompositeEventEmitter):
# Handle SCO request.
if link_type in (
hci.HCI_Connection_Complete_Event.SCO_LINK_TYPE,
hci.HCI_Connection_Complete_Event.ESCO_LINK_TYPE,
hci.HCI_Connection_Complete_Event.LinkType.SCO,
hci.HCI_Connection_Complete_Event.LinkType.ESCO,
):
if connection := self.find_connection_by_bd_addr(
bd_addr, transport=PhysicalTransport.BR_EDR
@@ -5641,7 +5733,7 @@ class Device(utils.CompositeEventEmitter):
# [Classic only]
@host_event_handler
@with_connection_from_address
def on_authentication_io_capability_request(self, connection):
def on_authentication_io_capability_request(self, connection: Connection):
# Ask what the pairing config should be for this connection
pairing_config = self.pairing_config_factory(connection)
@@ -5649,13 +5741,13 @@ class Device(utils.CompositeEventEmitter):
authentication_requirements = (
# No Bonding
(
hci.HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
hci.HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS,
hci.AuthenticationRequirements.MITM_NOT_REQUIRED_NO_BONDING,
hci.AuthenticationRequirements.MITM_REQUIRED_NO_BONDING,
),
# General Bonding
(
hci.HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
hci.HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS,
hci.AuthenticationRequirements.MITM_NOT_REQUIRED_GENERAL_BONDING,
hci.AuthenticationRequirements.MITM_REQUIRED_GENERAL_BONDING,
),
)[1 if pairing_config.bonding else 0][1 if pairing_config.mitm else 0]
@@ -5710,30 +5802,30 @@ class Device(utils.CompositeEventEmitter):
raise UnreachableError()
# See Bluetooth spec @ Vol 3, Part C 5.2.2.6
methods = {
hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: {
hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm,
hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm,
hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: na,
hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
methods: dict[int, dict[int, Callable[[], Awaitable[bool]]]] = {
hci.IoCapability.DISPLAY_ONLY: {
hci.IoCapability.DISPLAY_ONLY: display_auto_confirm,
hci.IoCapability.DISPLAY_YES_NO: display_confirm,
hci.IoCapability.KEYBOARD_ONLY: na,
hci.IoCapability.NO_INPUT_NO_OUTPUT: auto_confirm,
},
hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: {
hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm,
hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm,
hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: na,
hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
hci.IoCapability.DISPLAY_YES_NO: {
hci.IoCapability.DISPLAY_ONLY: display_auto_confirm,
hci.IoCapability.DISPLAY_YES_NO: display_confirm,
hci.IoCapability.KEYBOARD_ONLY: na,
hci.IoCapability.NO_INPUT_NO_OUTPUT: auto_confirm,
},
hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: {
hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: na,
hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: na,
hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: na,
hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
hci.IoCapability.KEYBOARD_ONLY: {
hci.IoCapability.DISPLAY_ONLY: na,
hci.IoCapability.DISPLAY_YES_NO: na,
hci.IoCapability.KEYBOARD_ONLY: na,
hci.IoCapability.NO_INPUT_NO_OUTPUT: auto_confirm,
},
hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: {
hci.HCI_DISPLAY_ONLY_IO_CAPABILITY: confirm,
hci.HCI_DISPLAY_YES_NO_IO_CAPABILITY: confirm,
hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY: auto_confirm,
hci.HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm,
hci.IoCapability.NO_INPUT_NO_OUTPUT: {
hci.IoCapability.DISPLAY_ONLY: confirm,
hci.IoCapability.DISPLAY_YES_NO: confirm,
hci.IoCapability.KEYBOARD_ONLY: auto_confirm,
hci.IoCapability.NO_INPUT_NO_OUTPUT: auto_confirm,
},
}
@@ -5789,6 +5881,19 @@ class Device(utils.CompositeEventEmitter):
utils.AsyncRunner.spawn(reply())
# [Classic only]
@host_event_handler
@with_connection_from_handle
def on_mode_change(
self, connection: Connection, status: int, current_mode: int, interval: int
):
if status == hci.HCI_SUCCESS:
connection.classic_mode = current_mode
connection.classic_interval = interval
connection.emit(connection.EVENT_MODE_CHANGE)
else:
connection.emit(connection.EVENT_MODE_CHANGE_FAILURE, status)
# [Classic only]
@host_event_handler
@with_connection_from_address
@@ -5799,7 +5904,7 @@ class Device(utils.CompositeEventEmitter):
io_capability = pairing_config.delegate.classic_io_capability
# Respond
if io_capability == hci.HCI_KEYBOARD_ONLY_IO_CAPABILITY:
if io_capability == hci.IoCapability.KEYBOARD_ONLY:
# Ask the user to enter a string
async def get_pin_code():
pin_code = await connection.cancel_on_disconnection(
@@ -5929,24 +6034,63 @@ class Device(utils.CompositeEventEmitter):
f'cis_id=[0x{cis_id:02X}] ***'
)
# LE_CIS_Established event doesn't provide info, so we must store them here.
self.cis_links[cis_handle] = CisLink(
cis_link = CisLink(
device=self,
acl_connection=acl_connection,
handle=cis_handle,
cig_id=cig_id,
cis_id=cis_id,
)
self.emit(self.EVENT_CIS_REQUEST, acl_connection, cis_handle, cig_id, cis_id)
self.cis_links[cis_handle] = cis_link
acl_connection.emit(acl_connection.EVENT_CIS_REQUEST, cis_link)
self.emit(self.EVENT_CIS_REQUEST, cis_link)
# [LE only]
@host_event_handler
@utils.experimental('Only for testing')
def on_cis_establishment(self, cis_handle: int) -> None:
def on_cis_establishment(
self,
cis_handle: int,
cig_sync_delay: int,
cis_sync_delay: int,
transport_latency_c_to_p: int,
transport_latency_p_to_c: int,
phy_c_to_p: int,
phy_p_to_c: int,
nse: int,
bn_c_to_p: int,
bn_p_to_c: int,
ft_c_to_p: int,
ft_p_to_c: int,
max_pdu_c_to_p: int,
max_pdu_p_to_c: int,
iso_interval: int,
) -> None:
if cis_handle not in self.cis_links:
logger.warning("CIS link not found")
return
cis_link = self.cis_links[cis_handle]
cis_link.state = CisLink.State.ESTABLISHED
assert cis_link.acl_connection
# Update the CIS
cis_link.cig_sync_delay = cig_sync_delay
cis_link.cis_sync_delay = cis_sync_delay
cis_link.transport_latency_c_to_p = transport_latency_c_to_p
cis_link.transport_latency_p_to_c = transport_latency_p_to_c
cis_link.phy_c_to_p = hci.Phy(phy_c_to_p)
cis_link.phy_p_to_c = hci.Phy(phy_p_to_c)
cis_link.nse = nse
cis_link.bn_c_to_p = bn_c_to_p
cis_link.bn_p_to_c = bn_p_to_c
cis_link.ft_c_to_p = ft_c_to_p
cis_link.ft_p_to_c = ft_p_to_c
cis_link.max_pdu_c_to_p = max_pdu_c_to_p
cis_link.max_pdu_p_to_c = max_pdu_p_to_c
cis_link.iso_interval = iso_interval * 1.25
logger.debug(
f'*** CIS Establishment '
f'{cis_link.acl_connection.peer_address}, '
@@ -5956,16 +6100,27 @@ class Device(utils.CompositeEventEmitter):
)
cis_link.emit(cis_link.EVENT_ESTABLISHMENT)
cis_link.acl_connection.emit(
cis_link.acl_connection.EVENT_CIS_ESTABLISHMENT, cis_link
)
self.emit(self.EVENT_CIS_ESTABLISHMENT, cis_link)
# [LE only]
@host_event_handler
@utils.experimental('Only for testing')
def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None:
if (cis_link := self.cis_links.pop(cis_handle, None)) is None:
logger.warning("CIS link not found")
return
logger.debug(f'*** CIS Establishment Failure: cis=[0x{cis_handle:04X}] ***')
if cis_link := self.cis_links.pop(cis_handle):
cis_link.emit(cis_link.EVENT_ESTABLISHMENT_FAILURE, status)
self.emit(self.EVENT_CIS_ESTABLISHMENT_FAILURE, cis_handle, status)
cis_link.emit(cis_link.EVENT_ESTABLISHMENT_FAILURE, status)
cis_link.acl_connection.emit(
cis_link.acl_connection.EVENT_CIS_ESTABLISHMENT_FAILURE,
cis_link,
status,
)
self.emit(self.EVENT_CIS_ESTABLISHMENT_FAILURE, cis_link, status)
# [LE only]
@host_event_handler
@@ -5979,7 +6134,7 @@ class Device(utils.CompositeEventEmitter):
@host_event_handler
@with_connection_from_handle
def on_connection_encryption_change(
self, connection, encryption, encryption_key_size
self, connection: Connection, encryption: int, encryption_key_size: int
):
logger.debug(
f'*** Connection Encryption Change: [0x{connection.handle:04X}] '
@@ -5992,14 +6147,14 @@ class Device(utils.CompositeEventEmitter):
if (
not connection.authenticated
and connection.transport == PhysicalTransport.BR_EDR
and encryption == hci.HCI_Encryption_Change_Event.AES_CCM
and encryption == hci.HCI_Encryption_Change_Event.Enabled.AES_CCM
):
connection.authenticated = True
connection.sc = True
if (
not connection.authenticated
and connection.transport == PhysicalTransport.LE
and encryption == hci.HCI_Encryption_Change_Event.E0_OR_AES_CCM
and encryption == hci.HCI_Encryption_Change_Event.Enabled.E0_OR_AES_CCM
):
connection.authenticated = True
connection.sc = True
@@ -6026,13 +6181,19 @@ class Device(utils.CompositeEventEmitter):
@host_event_handler
@with_connection_from_handle
def on_connection_parameters_update(self, connection, connection_parameters):
def on_connection_parameters_update(
self, connection: Connection, connection_parameters: core.ConnectionParameters
):
logger.debug(
f'*** Connection Parameters Update: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, '
f'{connection_parameters}'
)
connection.parameters = connection_parameters
connection.parameters = Connection.Parameters(
connection_parameters.connection_interval * 1.25,
connection_parameters.peripheral_latency,
connection_parameters.supervision_timeout * 10.0,
)
connection.emit(connection.EVENT_CONNECTION_PARAMETERS_UPDATE)
@host_event_handler
+18 -20
View File
@@ -90,12 +90,10 @@ HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND = hci.hci_vendor_command_op_code(0x000E)
hci.HCI_Command.register_commands(globals())
@hci.HCI_Command.command(
fields=[
("param0", 1),
],
)
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_Intel_Read_Version_Command(hci.HCI_Command):
param0: int = dataclasses.field(metadata=hci.metadata(1))
return_parameters_fields = [
("status", hci.STATUS_SPEC),
@@ -103,35 +101,35 @@ class HCI_Intel_Read_Version_Command(hci.HCI_Command):
]
@hci.HCI_Command.command(
fields=[("data_type", 1), ("data", "*")],
)
@hci.HCI_Command.command
@dataclasses.dataclass
class Hci_Intel_Secure_Send_Command(hci.HCI_Command):
data_type: int = dataclasses.field(metadata=hci.metadata(1))
data: bytes = dataclasses.field(metadata=hci.metadata("*"))
return_parameters_fields = [
("status", 1),
]
@hci.HCI_Command.command(
fields=[
("reset_type", 1),
("patch_enable", 1),
("ddc_reload", 1),
("boot_option", 1),
("boot_address", 4),
],
)
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_Intel_Reset_Command(hci.HCI_Command):
reset_type: int = dataclasses.field(metadata=hci.metadata(1))
patch_enable: int = dataclasses.field(metadata=hci.metadata(1))
ddc_reload: int = dataclasses.field(metadata=hci.metadata(1))
boot_option: int = dataclasses.field(metadata=hci.metadata(1))
boot_address: int = dataclasses.field(metadata=hci.metadata(4))
return_parameters_fields = [
("data", "*"),
]
@hci.HCI_Command.command(
fields=[("data", "*")],
)
@hci.HCI_Command.command
@dataclasses.dataclass
class Hci_Intel_Write_Device_Config_Command(hci.HCI_Command):
data: bytes = dataclasses.field(metadata=hci.metadata("*"))
return_parameters_fields = [
("status", hci.STATUS_SPEC),
+27 -32
View File
@@ -20,7 +20,7 @@ Based on various online bits of information, including the Linux kernel.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from dataclasses import dataclass
from dataclasses import dataclass, field
import asyncio
import enum
import logging
@@ -33,14 +33,7 @@ import weakref
from bumble import core
from bumble.hci import (
hci_vendor_command_op_code,
STATUS_SPEC,
HCI_SUCCESS,
HCI_Command,
HCI_Reset_Command,
HCI_Read_Local_Version_Information_Command,
)
from bumble import hci
from bumble.drivers import common
# -----------------------------------------------------------------------------
@@ -182,26 +175,29 @@ RTK_USB_PRODUCTS = {
# -----------------------------------------------------------------------------
# HCI Commands
# -----------------------------------------------------------------------------
HCI_RTK_READ_ROM_VERSION_COMMAND = hci_vendor_command_op_code(0x6D)
HCI_RTK_DOWNLOAD_COMMAND = hci_vendor_command_op_code(0x20)
HCI_RTK_DROP_FIRMWARE_COMMAND = hci_vendor_command_op_code(0x66)
HCI_Command.register_commands(globals())
HCI_RTK_READ_ROM_VERSION_COMMAND = hci.hci_vendor_command_op_code(0x6D)
HCI_RTK_DOWNLOAD_COMMAND = hci.hci_vendor_command_op_code(0x20)
HCI_RTK_DROP_FIRMWARE_COMMAND = hci.hci_vendor_command_op_code(0x66)
hci.HCI_Command.register_commands(globals())
@HCI_Command.command()
class HCI_RTK_Read_ROM_Version_Command(HCI_Command):
return_parameters_fields = [("status", STATUS_SPEC), ("version", 1)]
@hci.HCI_Command.command
@dataclass
class HCI_RTK_Read_ROM_Version_Command(hci.HCI_Command):
return_parameters_fields = [("status", hci.STATUS_SPEC), ("version", 1)]
@HCI_Command.command(
fields=[("index", 1), ("payload", RTK_FRAGMENT_LENGTH)],
)
class HCI_RTK_Download_Command(HCI_Command):
return_parameters_fields = [("status", STATUS_SPEC), ("index", 1)]
@hci.HCI_Command.command
@dataclass
class HCI_RTK_Download_Command(hci.HCI_Command):
index: int = field(metadata=hci.metadata(1))
payload: bytes = field(metadata=hci.metadata(RTK_FRAGMENT_LENGTH))
return_parameters_fields = [("status", hci.STATUS_SPEC), ("index", 1)]
@HCI_Command.command()
class HCI_RTK_Drop_Firmware_Command(HCI_Command):
@hci.HCI_Command.command
@dataclass
class HCI_RTK_Drop_Firmware_Command(hci.HCI_Command):
pass
@@ -497,17 +493,17 @@ class Driver(common.Driver):
async def driver_info_for_host(cls, host):
try:
await host.send_command(
HCI_Reset_Command(),
hci.HCI_Reset_Command(),
check_result=True,
response_timeout=cls.POST_RESET_DELAY,
)
host.ready = True # Needed to let the host know the controller is ready.
except asyncio.exceptions.TimeoutError:
logger.warning("timeout waiting for hci reset, retrying")
await host.send_command(HCI_Reset_Command(), check_result=True)
await host.send_command(hci.HCI_Reset_Command(), check_result=True)
host.ready = True
command = HCI_Read_Local_Version_Information_Command()
command = hci.HCI_Read_Local_Version_Information_Command()
response = await host.send_command(command, check_result=True)
if response.command_opcode != command.op_code:
logger.error("failed to probe local version information")
@@ -594,7 +590,7 @@ class Driver(common.Driver):
response = await self.host.send_command(
HCI_RTK_Read_ROM_Version_Command(), check_result=True
)
if response.return_parameters.status != HCI_SUCCESS:
if response.return_parameters.status != hci.HCI_SUCCESS:
logger.warning("can't get ROM version")
return
rom_version = response.return_parameters.version
@@ -632,9 +628,8 @@ class Driver(common.Driver):
fragment = payload[fragment_offset : fragment_offset + RTK_FRAGMENT_LENGTH]
logger.debug(f"downloading fragment {fragment_index}")
await self.host.send_command(
HCI_RTK_Download_Command(
index=download_index, payload=fragment, check_result=True
)
HCI_RTK_Download_Command(index=download_index, payload=fragment),
check_result=True,
)
logger.debug("download complete!")
@@ -643,7 +638,7 @@ class Driver(common.Driver):
response = await self.host.send_command(
HCI_RTK_Read_ROM_Version_Command(), check_result=True
)
if response.return_parameters.status != HCI_SUCCESS:
if response.return_parameters.status != hci.HCI_SUCCESS:
logger.warning("can't get ROM version")
else:
rom_version = response.return_parameters.version
@@ -666,7 +661,7 @@ class Driver(common.Driver):
async def init_controller(self):
await self.download_firmware()
await self.host.send_command(HCI_Reset_Command(), check_result=True)
await self.host.send_command(hci.HCI_Reset_Command(), check_result=True)
logger.info(f"loaded FW image {self.driver_info.fw_name}")
+2017 -2245
View File
File diff suppressed because it is too large Load Diff
+4 -5
View File
@@ -34,9 +34,8 @@ from bumble.att import ATT_CID, ATT_PDU
from bumble.smp import SMP_CID, SMP_Command
from bumble.core import name_or_number
from bumble.l2cap import (
CommandCode,
L2CAP_PDU,
L2CAP_CONNECTION_REQUEST,
L2CAP_CONNECTION_RESPONSE,
L2CAP_SIGNALING_CID,
L2CAP_LE_SIGNALING_CID,
L2CAP_Control_Frame,
@@ -106,14 +105,14 @@ class PacketTracer:
self.analyzer.emit(control_frame)
# Check if this signals a new channel
if control_frame.code == L2CAP_CONNECTION_REQUEST:
if control_frame.code == CommandCode.L2CAP_CONNECTION_REQUEST:
connection_request = cast(L2CAP_Connection_Request, control_frame)
self.psms[connection_request.source_cid] = connection_request.psm
elif control_frame.code == L2CAP_CONNECTION_RESPONSE:
elif control_frame.code == CommandCode.L2CAP_CONNECTION_RESPONSE:
connection_response = cast(L2CAP_Connection_Response, control_frame)
if (
connection_response.result
== L2CAP_Connection_Response.CONNECTION_SUCCESSFUL
== L2CAP_Connection_Response.Result.CONNECTION_SUCCESSFUL
):
if self.peer and (
psm := self.peer.psms.get(connection_response.source_cid)
+69 -19
View File
@@ -38,7 +38,6 @@ from bumble.snoop import Snooper
from bumble import drivers
from bumble import hci
from bumble.core import (
PhysicalTransport,
PhysicalTransport,
ConnectionPHY,
ConnectionParameters,
@@ -72,6 +71,11 @@ class DataPacketQueue(utils.EventEmitter):
max_packet_size: int
class PerConnectionState:
def __init__(self) -> None:
self.in_flight = 0
self.drained = asyncio.Event()
def __init__(
self,
max_packet_size: int,
@@ -82,9 +86,12 @@ class DataPacketQueue(utils.EventEmitter):
self.max_packet_size = max_packet_size
self.max_in_flight = max_in_flight
self._in_flight = 0 # Total number of packets in flight across all connections
self._in_flight_per_connection: dict[int, int] = collections.defaultdict(
int
) # Number of packets in flight per connection
self._connection_state: dict[int, DataPacketQueue.PerConnectionState] = (
collections.defaultdict(DataPacketQueue.PerConnectionState)
)
self._drained_per_connection: dict[int, asyncio.Event] = (
collections.defaultdict(asyncio.Event)
)
self._send = send
self._packets: collections.deque[tuple[hci.HCI_Packet, int]] = (
collections.deque()
@@ -136,36 +143,40 @@ class DataPacketQueue(utils.EventEmitter):
self._completed += flushed_count
self._packets = collections.deque(packets_to_keep)
if connection_handle in self._in_flight_per_connection:
in_flight = self._in_flight_per_connection[connection_handle]
if connection_state := self._connection_state.pop(connection_handle, None):
in_flight = connection_state.in_flight
self._completed += in_flight
self._in_flight -= in_flight
del self._in_flight_per_connection[connection_handle]
connection_state.drained.set()
def _check_queue(self) -> None:
while self._packets and self._in_flight < self.max_in_flight:
packet, connection_handle = self._packets.pop()
self._send(packet)
self._in_flight += 1
self._in_flight_per_connection[connection_handle] += 1
connection_state = self._connection_state[connection_handle]
connection_state.in_flight += 1
connection_state.drained.clear()
def on_packets_completed(self, packet_count: int, connection_handle: int) -> None:
"""Mark one or more packets associated with a connection as completed."""
if connection_handle not in self._in_flight_per_connection:
if connection_handle not in self._connection_state:
logger.warning(
f'received completion for unknown connection {connection_handle}'
)
return
in_flight_for_connection = self._in_flight_per_connection[connection_handle]
if packet_count <= in_flight_for_connection:
self._in_flight_per_connection[connection_handle] -= packet_count
connection_state = self._connection_state[connection_handle]
if packet_count <= connection_state.in_flight:
connection_state.in_flight -= packet_count
else:
logger.warning(
f'{packet_count} completed for {connection_handle} '
f'but only {in_flight_for_connection} in flight'
f'but only {connection_state.in_flight} in flight'
)
self._in_flight_per_connection[connection_handle] = 0
connection_state.in_flight = 0
if connection_state.in_flight == 0:
connection_state.drained.set()
if packet_count <= self._in_flight:
self._in_flight -= packet_count
@@ -180,6 +191,13 @@ class DataPacketQueue(utils.EventEmitter):
self._check_queue()
self.emit('flow')
async def drain(self, connection_handle: int) -> None:
"""Wait until there are no pending packets for a connection."""
if not (connection_state := self._connection_state.get(connection_handle)):
raise ValueError('no such connection')
await connection_state.drained.wait()
# -----------------------------------------------------------------------------
class Connection:
@@ -835,8 +853,8 @@ class Host(utils.EventEmitter):
def on_packet(self, packet: bytes) -> None:
try:
hci_packet = hci.HCI_Packet.from_bytes(packet)
except Exception as error:
logger.warning(f'!!! error parsing packet from bytes: {error}')
except Exception:
logger.exception('!!! error parsing packet from bytes')
return
if self.ready or (
@@ -1269,7 +1287,24 @@ class Host(utils.EventEmitter):
self.cis_links[event.connection_handle] = IsoLink(
handle=event.connection_handle, packet_queue=self.iso_packet_queue
)
self.emit('cis_establishment', event.connection_handle)
self.emit(
'cis_establishment',
event.connection_handle,
event.cig_sync_delay,
event.cis_sync_delay,
event.transport_latency_c_to_p,
event.transport_latency_p_to_c,
event.phy_c_to_p,
event.phy_p_to_c,
event.nse,
event.bn_c_to_p,
event.bn_p_to_c,
event.ft_c_to_p,
event.ft_p_to_c,
event.max_pdu_c_to_p,
event.max_pdu_p_to_c,
event.iso_interval,
)
else:
self.emit(
'cis_establishment_failure', event.connection_handle, event.status
@@ -1357,6 +1392,15 @@ class Host(utils.EventEmitter):
def on_hci_synchronous_connection_changed_event(self, event):
pass
def on_hci_mode_change_event(self, event: hci.HCI_Mode_Change_Event):
self.emit(
'mode_change',
event.connection_handle,
event.status,
event.current_mode,
event.interval,
)
def on_hci_role_change_event(self, event):
if event.status == hci.HCI_SUCCESS:
logger.debug(
@@ -1372,6 +1416,10 @@ class Host(utils.EventEmitter):
self.emit('role_change_failure', event.bd_addr, event.status)
def on_hci_le_data_length_change_event(self, event):
if (connection := self.connections.get(event.connection_handle)) is None:
logger.warning('!!! DATA LENGTH CHANGE: unknown handle')
return
self.emit(
'connection_data_length_change',
event.connection_handle,
@@ -1392,7 +1440,7 @@ class Host(utils.EventEmitter):
event.status,
)
def on_hci_encryption_change_event(self, event):
def on_hci_encryption_change_event(self, event: hci.HCI_Encryption_Change_Event):
# Notify the client
if event.status == hci.HCI_SUCCESS:
self.emit(
@@ -1406,7 +1454,9 @@ class Host(utils.EventEmitter):
'connection_encryption_failure', event.connection_handle, event.status
)
def on_hci_encryption_change_v2_event(self, event):
def on_hci_encryption_change_v2_event(
self, event: hci.HCI_Encryption_Change_V2_Event
):
# Notify the client
if event.status == hci.HCI_SUCCESS:
self.emit(
+297 -374
View File
File diff suppressed because it is too large Load Diff
+4 -268
View File
@@ -17,19 +17,13 @@
# -----------------------------------------------------------------------------
import logging
import asyncio
from functools import partial
from bumble.core import (
PhysicalTransport,
InvalidStateError,
)
from bumble.colors import color
from bumble import core
from bumble.hci import (
Address,
Role,
HCI_SUCCESS,
HCI_CONNECTION_ACCEPT_TIMEOUT_ERROR,
HCI_CONNECTION_TIMEOUT_ERROR,
HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
HCI_PAGE_TIMEOUT_ERROR,
HCI_Connection_Complete_Event,
@@ -115,10 +109,10 @@ class LocalLink:
def send_acl_data(self, sender_controller, destination_address, transport, data):
# Send the data to the first controller with a matching address
if transport == PhysicalTransport.LE:
if transport == core.PhysicalTransport.LE:
destination_controller = self.find_controller(destination_address)
source_address = sender_controller.random_address
elif transport == PhysicalTransport.BR_EDR:
elif transport == core.PhysicalTransport.BR_EDR:
destination_controller = self.find_classic_controller(destination_address)
source_address = sender_controller.public_address
else:
@@ -274,7 +268,7 @@ class LocalLink:
responder_controller.on_classic_connection_request(
initiator_controller.public_address,
HCI_Connection_Complete_Event.ACL_LINK_TYPE,
HCI_Connection_Complete_Event.LinkType.ACL,
)
def classic_accept_connection(
@@ -384,261 +378,3 @@ class LocalLink:
responder_controller.on_classic_sco_connection_complete(
initiator_controller.public_address, HCI_SUCCESS, link_type
)
# -----------------------------------------------------------------------------
class RemoteLink:
'''
A Link implementation that communicates with other virtual controllers via a
WebSocket relay
'''
def __init__(self, uri):
self.controller = None
self.uri = uri
self.execution_queue = asyncio.Queue()
self.websocket = asyncio.get_running_loop().create_future()
self.rpc_result = None
self.pending_connection = None
self.central_connections = set() # List of addresses that we have connected to
self.peripheral_connections = (
set()
) # List of addresses that have connected to us
# Connect and run asynchronously
asyncio.create_task(self.run_connection())
asyncio.create_task(self.run_executor_loop())
def add_controller(self, controller):
if self.controller:
raise InvalidStateError('controller already set')
self.controller = controller
def remove_controller(self, controller):
if self.controller != controller:
raise InvalidStateError('controller mismatch')
self.controller = None
def get_pending_connection(self):
return self.pending_connection
def get_pending_classic_connection(self):
return self.pending_classic_connection
async def wait_until_connected(self):
await self.websocket
def execute(self, async_function):
self.execution_queue.put_nowait(async_function())
async def run_executor_loop(self):
logger.debug('executor loop starting')
while True:
item = await self.execution_queue.get()
try:
await item
except Exception as error:
logger.warning(
f'{color("!!! Exception in async handler:", "red")} {error}'
)
async def run_connection(self):
import websockets # lazy import
# Connect to the relay
logger.debug(f'connecting to {self.uri}')
# pylint: disable-next=no-member
websocket = await websockets.connect(self.uri)
self.websocket.set_result(websocket)
logger.debug(f'connected to {self.uri}')
while True:
message = await websocket.recv()
logger.debug(f'received message: {message}')
keyword, *payload = message.split(':', 1)
handler_name = f'on_{keyword}_received'
handler = getattr(self, handler_name, None)
if handler:
await handler(payload[0] if payload else None)
def close(self):
if self.websocket.done():
logger.debug('closing websocket')
websocket = self.websocket.result()
asyncio.create_task(websocket.close())
async def on_result_received(self, result):
if self.rpc_result:
self.rpc_result.set_result(result)
async def on_left_received(self, address):
if address in self.central_connections:
self.controller.on_link_peripheral_disconnected(Address(address))
self.central_connections.remove(address)
if address in self.peripheral_connections:
self.controller.on_link_central_disconnected(
address, HCI_CONNECTION_TIMEOUT_ERROR
)
self.peripheral_connections.remove(address)
async def on_unreachable_received(self, target):
await self.on_left_received(target)
async def on_message_received(self, message):
sender, *payload = message.split('/', 1)
if payload:
keyword, *payload = payload[0].split(':', 1)
handler_name = f'on_{keyword}_message_received'
handler = getattr(self, handler_name, None)
if handler:
await handler(sender, payload[0] if payload else None)
async def on_advertisement_message_received(self, sender, advertisement):
try:
self.controller.on_link_advertising_data(
Address(sender), bytes.fromhex(advertisement)
)
except Exception:
logger.exception('exception')
async def on_acl_message_received(self, sender, acl_data):
try:
self.controller.on_link_acl_data(Address(sender), bytes.fromhex(acl_data))
except Exception:
logger.exception('exception')
async def on_connect_message_received(self, sender, _):
# Remember the connection
self.peripheral_connections.add(sender)
# Notify the controller
logger.debug(f'connection from central {sender}')
self.controller.on_link_central_connected(Address(sender))
# Accept the connection by responding to it
await self.send_targeted_message(sender, 'connected')
async def on_connected_message_received(self, sender, _):
if not self.pending_connection:
logger.warning('received a connection ack, but no connection is pending')
return
# Remember the connection
self.central_connections.add(sender)
# Notify the controller
logger.debug(f'connected to peripheral {self.pending_connection.peer_address}')
self.controller.on_link_peripheral_connection_complete(
self.pending_connection, HCI_SUCCESS
)
async def on_disconnect_message_received(self, sender, message):
# Notify the controller
params = parse_parameters(message)
reason = int(params.get('reason', str(HCI_CONNECTION_TIMEOUT_ERROR)))
self.controller.on_link_central_disconnected(Address(sender), reason)
# Forget the connection
if sender in self.peripheral_connections:
self.peripheral_connections.remove(sender)
async def on_encrypted_message_received(self, sender, _):
# TODO parse params to get real args
self.controller.on_link_encrypted(Address(sender), bytes(8), 0, bytes(16))
async def send_rpc_command(self, command):
# Ensure we have a connection
websocket = await self.websocket
# Create a future value to hold the eventual result
assert self.rpc_result is None
self.rpc_result = asyncio.get_running_loop().create_future()
# Send the command
await websocket.send(command)
# Wait for the result
rpc_result = await self.rpc_result
self.rpc_result = None
logger.debug(f'rpc_result: {rpc_result}')
# TODO: parse the result
async def send_targeted_message(self, target, message):
# Ensure we have a connection
websocket = await self.websocket
# Send the message
await websocket.send(f'@{target} {message}')
async def notify_address_changed(self):
await self.send_rpc_command(f'/set-address {self.controller.random_address}')
def on_address_changed(self, controller):
logger.info(f'address changed for {controller}: {controller.random_address}')
# Notify the relay of the change
self.execute(self.notify_address_changed)
async def send_advertising_data_to_relay(self, data):
await self.send_targeted_message('*', f'advertisement:{data.hex()}')
def send_advertising_data(self, _, data):
self.execute(partial(self.send_advertising_data_to_relay, data))
async def send_acl_data_to_relay(self, peer_address, data):
await self.send_targeted_message(peer_address, f'acl:{data.hex()}')
def send_acl_data(self, _, peer_address, _transport, data):
# TODO: handle different transport
self.execute(partial(self.send_acl_data_to_relay, peer_address, data))
async def send_connection_request_to_relay(self, peer_address):
await self.send_targeted_message(peer_address, 'connect')
def connect(self, _, le_create_connection_command):
if self.pending_connection:
logger.warning('connection already pending')
return
self.pending_connection = le_create_connection_command
self.execute(
partial(
self.send_connection_request_to_relay,
str(le_create_connection_command.peer_address),
)
)
def on_disconnection_complete(self, disconnect_command):
self.controller.on_link_peripheral_disconnection_complete(
disconnect_command, HCI_SUCCESS
)
def disconnect(self, central_address, peripheral_address, disconnect_command):
logger.debug(
f'disconnect {central_address} -> '
f'{peripheral_address}: reason = {disconnect_command.reason}'
)
self.execute(
partial(
self.send_targeted_message,
peripheral_address,
f'disconnect:reason={disconnect_command.reason}',
)
)
asyncio.get_running_loop().call_soon(
self.on_disconnection_complete, disconnect_command
)
def on_connection_encrypted(self, _, peripheral_address, rand, ediv, ltk):
asyncio.get_running_loop().call_soon(
self.controller.on_link_encrypted, peripheral_address, rand, ediv, ltk
)
self.execute(
partial(
self.send_targeted_message,
peripheral_address,
f'encrypted:ltk={ltk.hex()}',
)
)
+65
View File
@@ -0,0 +1,65 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import functools
import logging
import os
from bumble import colors
# -----------------------------------------------------------------------------
class ColorFormatter(logging.Formatter):
_colorizers = {
logging.DEBUG: functools.partial(colors.color, fg="white"),
logging.INFO: functools.partial(colors.color, fg="green"),
logging.WARNING: functools.partial(colors.color, fg="yellow"),
logging.ERROR: functools.partial(colors.color, fg="red"),
logging.CRITICAL: functools.partial(colors.color, fg="black", bg="red"),
}
_formatters = {
level: logging.Formatter(
fmt=colorizer("{asctime}.{msecs:03.0f} {levelname:.1} {name}: ")
+ "{message}",
datefmt="%H:%M:%S",
style="{",
)
for level, colorizer in _colorizers.items()
}
def format(self, record: logging.LogRecord) -> str:
return self._formatters[record.levelno].format(record)
def setup_basic_logging(default_level: str = "INFO") -> None:
"""
Set up basic logging with logging.basicConfig, configured with a simple formatter
that prints out the date and log level in color.
If the BUMBLE_LOGLEVEL environment variable is set to the name of a log level, it
is used. Otherwise the default_level argument is used.
Args:
default_level: default logging level
"""
handler = logging.StreamHandler()
handler.setFormatter(ColorFormatter())
logging.basicConfig(
level=os.environ.get("BUMBLE_LOGLEVEL", default_level).upper(),
handlers=[handler],
)
+11 -17
View File
@@ -21,13 +21,7 @@ from dataclasses import dataclass
import secrets
from typing import Optional
from bumble.hci import (
Address,
HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
HCI_DISPLAY_ONLY_IO_CAPABILITY,
HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_KEYBOARD_ONLY_IO_CAPABILITY,
)
from bumble import hci
from bumble.smp import (
SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
SMP_KEYBOARD_ONLY_IO_CAPABILITY,
@@ -50,7 +44,7 @@ from bumble.core import AdvertisingData, LeRole
class OobData:
"""OOB data that can be sent from one device to another."""
address: Optional[Address] = None
address: Optional[hci.Address] = None
role: Optional[LeRole] = None
shared_data: Optional[OobSharedData] = None
legacy_context: Optional[OobLegacyContext] = None
@@ -62,7 +56,7 @@ class OobData:
shared_data_r: Optional[bytes] = None
for ad_type, ad_data in ad.ad_structures:
if ad_type == AdvertisingData.LE_BLUETOOTH_DEVICE_ADDRESS:
instance.address = Address(ad_data)
instance.address = hci.Address(ad_data)
elif ad_type == AdvertisingData.LE_ROLE:
instance.role = LeRole(ad_data[0])
elif ad_type == AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE:
@@ -130,11 +124,11 @@ class PairingDelegate:
# Default mapping from abstract to Classic I/O capabilities.
# Subclasses may override this if they prefer a different mapping.
CLASSIC_IO_CAPABILITIES_MAP = {
NO_OUTPUT_NO_INPUT: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY,
KEYBOARD_INPUT_ONLY: HCI_KEYBOARD_ONLY_IO_CAPABILITY,
DISPLAY_OUTPUT_ONLY: HCI_DISPLAY_ONLY_IO_CAPABILITY,
DISPLAY_OUTPUT_AND_YES_NO_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT: HCI_DISPLAY_YES_NO_IO_CAPABILITY,
NO_OUTPUT_NO_INPUT: hci.IoCapability.NO_INPUT_NO_OUTPUT,
KEYBOARD_INPUT_ONLY: hci.IoCapability.KEYBOARD_ONLY,
DISPLAY_OUTPUT_ONLY: hci.IoCapability.DISPLAY_ONLY,
DISPLAY_OUTPUT_AND_YES_NO_INPUT: hci.IoCapability.DISPLAY_YES_NO,
DISPLAY_OUTPUT_AND_KEYBOARD_INPUT: hci.IoCapability.DISPLAY_YES_NO,
}
io_capability: IoCapability
@@ -160,7 +154,7 @@ class PairingDelegate:
# pylint: disable=line-too-long
return self.CLASSIC_IO_CAPABILITIES_MAP.get(
self.io_capability, HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
self.io_capability, hci.IoCapability.NO_INPUT_NO_OUTPUT
)
@property
@@ -237,8 +231,8 @@ class PairingConfig:
"""Configuration for the Pairing protocol."""
class AddressType(enum.IntEnum):
PUBLIC = Address.PUBLIC_DEVICE_ADDRESS
RANDOM = Address.RANDOM_DEVICE_ADDRESS
PUBLIC = hci.Address.PUBLIC_DEVICE_ADDRESS
RANDOM = hci.Address.RANDOM_DEVICE_ADDRESS
@dataclass
class OobConfig:
+4 -4
View File
@@ -244,16 +244,16 @@ class SecurityService(SecurityServicer):
and connection.authenticated
and link_key_type
in (
hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE,
hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
hci.LinkKeyType.AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192,
hci.LinkKeyType.AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256,
)
)
if level == LEVEL4:
return (
connection.encryption == hci.HCI_Encryption_Change_Event.AES_CCM
connection.encryption == hci.HCI_Encryption_Change_Event.Enabled.AES_CCM
and connection.authenticated
and link_key_type
== hci.HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE
== hci.LinkKeyType.AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256
)
raise InvalidArgumentError(f"Unexpected level {level}")
+5 -11
View File
@@ -343,22 +343,16 @@ class AseStateMachine(gatt.Characteristic):
self.service.device.EVENT_CIS_ESTABLISHMENT, self.on_cis_establishment
)
def on_cis_request(
self,
acl_connection: device.Connection,
cis_handle: int,
cig_id: int,
cis_id: int,
) -> None:
def on_cis_request(self, cis_link: device.CisLink) -> None:
if (
cig_id == self.cig_id
and cis_id == self.cis_id
cis_link.cig_id == self.cig_id
and cis_link.cis_id == self.cis_id
and self.state == self.State.ENABLING
):
utils.cancel_on_event(
acl_connection,
cis_link.acl_connection,
'flush',
self.service.device.accept_cis_request(cis_handle),
self.service.device.accept_cis_request(cis_link),
)
def on_cis_establishment(self, cis_link: device.CisLink) -> None:
+2 -16
View File
@@ -20,9 +20,9 @@ import logging
import os
from typing import Optional
from bumble import utils
from bumble.transport.common import (
Transport,
AsyncPipeSink,
SnoopingTransport,
TransportSpecError,
)
@@ -195,6 +195,7 @@ async def _open_transport(scheme: str, spec: Optional[str]) -> Transport:
# -----------------------------------------------------------------------------
@utils.deprecated("RemoteLink has been removed. Use open_transport instead.")
async def open_transport_or_link(name: str) -> Transport:
"""
Open a transport or a link relay.
@@ -205,21 +206,6 @@ async def open_transport_or_link(name: str) -> Transport:
When the name starts with "link-relay:", open a link relay (see RemoteLink
for details on what the arguments are).
For other namespaces, see `open_transport`.
"""
if name.startswith('link-relay:'):
logger.warning('Link Relay has been deprecated.')
from bumble.controller import Controller
from bumble.link import RemoteLink # lazy import
link = RemoteLink(name[11:])
await link.wait_until_connected()
controller = Controller('remote', link=link) # type:ignore[arg-type]
class LinkTransport(Transport):
async def close(self):
link.close()
return _wrap_transport(LinkTransport(controller, AsyncPipeSink(controller)))
return await open_transport(name)
+106 -186
View File
@@ -15,21 +15,12 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import dataclasses
from dataclasses import field
import struct
from typing import Optional
from bumble.hci import (
name_or_number,
hci_vendor_command_op_code,
Address,
HCI_Constant,
HCI_Object,
HCI_Command,
HCI_Event,
HCI_Extended_Event,
HCI_VENDOR_EVENT,
STATUS_SPEC,
)
from bumble import hci
# -----------------------------------------------------------------------------
@@ -41,27 +32,28 @@ from bumble.hci import (
#
# pylint: disable-next=line-too-long
# See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#chip-capabilities-and-configuration
HCI_LE_GET_VENDOR_CAPABILITIES_COMMAND = hci_vendor_command_op_code(0x153)
HCI_LE_APCF_COMMAND = hci_vendor_command_op_code(0x157)
HCI_GET_CONTROLLER_ACTIVITY_ENERGY_INFO_COMMAND = hci_vendor_command_op_code(0x159)
HCI_A2DP_HARDWARE_OFFLOAD_COMMAND = hci_vendor_command_op_code(0x15D)
HCI_BLUETOOTH_QUALITY_REPORT_COMMAND = hci_vendor_command_op_code(0x15E)
HCI_DYNAMIC_AUDIO_BUFFER_COMMAND = hci_vendor_command_op_code(0x15F)
HCI_LE_GET_VENDOR_CAPABILITIES_COMMAND = hci.hci_vendor_command_op_code(0x153)
HCI_LE_APCF_COMMAND = hci.hci_vendor_command_op_code(0x157)
HCI_GET_CONTROLLER_ACTIVITY_ENERGY_INFO_COMMAND = hci.hci_vendor_command_op_code(0x159)
HCI_A2DP_HARDWARE_OFFLOAD_COMMAND = hci.hci_vendor_command_op_code(0x15D)
HCI_BLUETOOTH_QUALITY_REPORT_COMMAND = hci.hci_vendor_command_op_code(0x15E)
HCI_DYNAMIC_AUDIO_BUFFER_COMMAND = hci.hci_vendor_command_op_code(0x15F)
HCI_BLUETOOTH_QUALITY_REPORT_EVENT = 0x58
HCI_Command.register_commands(globals())
hci.HCI_Command.register_commands(globals())
# -----------------------------------------------------------------------------
@HCI_Command.command()
class HCI_LE_Get_Vendor_Capabilities_Command(HCI_Command):
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_LE_Get_Vendor_Capabilities_Command(hci.HCI_Command):
# pylint: disable=line-too-long
'''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#vendor-specific-capabilities
'''
return_parameters_fields = [
('status', STATUS_SPEC),
('status', hci.STATUS_SPEC),
('max_advt_instances', 1),
('offloaded_resolution_of_private_address', 1),
('total_scan_results_storage', 2),
@@ -85,13 +77,13 @@ class HCI_LE_Get_Vendor_Capabilities_Command(HCI_Command):
# there are no more bytes to parse, and leave un-signal parameters set to
# None (older versions)
nones = {field: None for field, _ in cls.return_parameters_fields}
return_parameters = HCI_Object(cls.return_parameters_fields, **nones)
return_parameters = hci.HCI_Object(cls.return_parameters_fields, **nones)
try:
offset = 0
for field in cls.return_parameters_fields:
field_name, field_type = field
field_value, field_size = HCI_Object.parse_field(
field_value, field_size = hci.HCI_Object.parse_field(
parameters, offset, field_type
)
setattr(return_parameters, field_name, field_value)
@@ -103,19 +95,9 @@ class HCI_LE_Get_Vendor_Capabilities_Command(HCI_Command):
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_LE_APCF_Command.opcode_name(x),
},
),
('payload', '*'),
],
)
class HCI_LE_APCF_Command(HCI_Command):
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_LE_APCF_Command(hci.HCI_Command):
# pylint: disable=line-too-long
'''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#le_apcf_command
@@ -123,60 +105,41 @@ class HCI_LE_APCF_Command(HCI_Command):
NOTE: the subcommand-specific payloads are left as opaque byte arrays in this
implementation. A future enhancement may define subcommand-specific data structures.
'''
# APCF Subcommands
class Opcode(hci.SpecableEnum):
ENABLE = 0x00
SET_FILTERING_PARAMETERS = 0x01
BROADCASTER_ADDRESS = 0x02
SERVICE_UUID = 0x03
SERVICE_SOLICITATION_UUID = 0x04
LOCAL_NAME = 0x05
MANUFACTURER_DATA = 0x06
SERVICE_DATA = 0x07
TRANSPORT_DISCOVERY_SERVICE = 0x08
AD_TYPE_FILTER = 0x09
READ_EXTENDED_FEATURES = 0xFF
opcode: int = dataclasses.field(metadata=Opcode.type_metadata(1))
payload: bytes = dataclasses.field(metadata=hci.metadata("*"))
return_parameters_fields = [
('status', STATUS_SPEC),
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_LE_APCF_Command.opcode_name(x),
},
),
('status', hci.STATUS_SPEC),
('opcode', Opcode.type_spec(1)),
('payload', '*'),
]
# APCF Subcommands
# TODO: use the OpenIntEnum class (when upcoming PR is merged)
APCF_ENABLE = 0x00
APCF_SET_FILTERING_PARAMETERS = 0x01
APCF_BROADCASTER_ADDRESS = 0x02
APCF_SERVICE_UUID = 0x03
APCF_SERVICE_SOLICITATION_UUID = 0x04
APCF_LOCAL_NAME = 0x05
APCF_MANUFACTURER_DATA = 0x06
APCF_SERVICE_DATA = 0x07
APCF_TRANSPORT_DISCOVERY_SERVICE = 0x08
APCF_AD_TYPE_FILTER = 0x09
APCF_READ_EXTENDED_FEATURES = 0xFF
OPCODE_NAMES = {
APCF_ENABLE: 'APCF_ENABLE',
APCF_SET_FILTERING_PARAMETERS: 'APCF_SET_FILTERING_PARAMETERS',
APCF_BROADCASTER_ADDRESS: 'APCF_BROADCASTER_ADDRESS',
APCF_SERVICE_UUID: 'APCF_SERVICE_UUID',
APCF_SERVICE_SOLICITATION_UUID: 'APCF_SERVICE_SOLICITATION_UUID',
APCF_LOCAL_NAME: 'APCF_LOCAL_NAME',
APCF_MANUFACTURER_DATA: 'APCF_MANUFACTURER_DATA',
APCF_SERVICE_DATA: 'APCF_SERVICE_DATA',
APCF_TRANSPORT_DISCOVERY_SERVICE: 'APCF_TRANSPORT_DISCOVERY_SERVICE',
APCF_AD_TYPE_FILTER: 'APCF_AD_TYPE_FILTER',
APCF_READ_EXTENDED_FEATURES: 'APCF_READ_EXTENDED_FEATURES',
}
@classmethod
def opcode_name(cls, opcode):
return name_or_number(cls.OPCODE_NAMES, opcode)
# -----------------------------------------------------------------------------
@HCI_Command.command()
class HCI_Get_Controller_Activity_Energy_Info_Command(HCI_Command):
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_Get_Controller_Activity_Energy_Info_Command(hci.HCI_Command):
# pylint: disable=line-too-long
'''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#le_get_controller_activity_energy_info
'''
return_parameters_fields = [
('status', STATUS_SPEC),
('status', hci.STATUS_SPEC),
('total_tx_time_ms', 4),
('total_rx_time_ms', 4),
('total_idle_time_ms', 4),
@@ -185,19 +148,9 @@ class HCI_Get_Controller_Activity_Energy_Info_Command(HCI_Command):
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_A2DP_Hardware_Offload_Command.opcode_name(x),
},
),
('payload', '*'),
],
)
class HCI_A2DP_Hardware_Offload_Command(HCI_Command):
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_A2DP_Hardware_Offload_Command(hci.HCI_Command):
# pylint: disable=line-too-long
'''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#a2dp-hardware-offload-support
@@ -205,47 +158,26 @@ class HCI_A2DP_Hardware_Offload_Command(HCI_Command):
NOTE: the subcommand-specific payloads are left as opaque byte arrays in this
implementation. A future enhancement may define subcommand-specific data structures.
'''
# A2DP Hardware Offload Subcommands
class Opcode(hci.SpecableEnum):
START_A2DP_OFFLOAD = 0x01
STOP_A2DP_OFFLOAD = 0x02
opcode: int = dataclasses.field(metadata=Opcode.type_metadata(1))
payload: bytes = dataclasses.field(metadata=hci.metadata("*"))
return_parameters_fields = [
('status', STATUS_SPEC),
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_A2DP_Hardware_Offload_Command.opcode_name(x),
},
),
('status', hci.STATUS_SPEC),
('opcode', Opcode.type_spec(1)),
('payload', '*'),
]
# A2DP Hardware Offload Subcommands
# TODO: use the OpenIntEnum class (when upcoming PR is merged)
START_A2DP_OFFLOAD = 0x01
STOP_A2DP_OFFLOAD = 0x02
OPCODE_NAMES = {
START_A2DP_OFFLOAD: 'START_A2DP_OFFLOAD',
STOP_A2DP_OFFLOAD: 'STOP_A2DP_OFFLOAD',
}
@classmethod
def opcode_name(cls, opcode):
return name_or_number(cls.OPCODE_NAMES, opcode)
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_Dynamic_Audio_Buffer_Command.opcode_name(x),
},
),
('payload', '*'),
],
)
class HCI_Dynamic_Audio_Buffer_Command(HCI_Command):
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_Dynamic_Audio_Buffer_Command(hci.HCI_Command):
# pylint: disable=line-too-long
'''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#dynamic-audio-buffer-command
@@ -253,40 +185,30 @@ class HCI_Dynamic_Audio_Buffer_Command(HCI_Command):
NOTE: the subcommand-specific payloads are left as opaque byte arrays in this
implementation. A future enhancement may define subcommand-specific data structures.
'''
# Dynamic Audio Buffer Subcommands
class Opcode(hci.SpecableEnum):
GET_AUDIO_BUFFER_TIME_CAPABILITY = 0x01
opcode: int = dataclasses.field(metadata=Opcode.type_metadata(1))
payload: bytes = dataclasses.field(metadata=hci.metadata("*"))
return_parameters_fields = [
('status', STATUS_SPEC),
(
'opcode',
{
'size': 1,
'mapper': lambda x: HCI_Dynamic_Audio_Buffer_Command.opcode_name(x),
},
),
('status', hci.STATUS_SPEC),
('opcode', Opcode.type_spec(1)),
('payload', '*'),
]
# Dynamic Audio Buffer Subcommands
# TODO: use the OpenIntEnum class (when upcoming PR is merged)
GET_AUDIO_BUFFER_TIME_CAPABILITY = 0x01
OPCODE_NAMES = {
GET_AUDIO_BUFFER_TIME_CAPABILITY: 'GET_AUDIO_BUFFER_TIME_CAPABILITY',
}
@classmethod
def opcode_name(cls, opcode):
return name_or_number(cls.OPCODE_NAMES, opcode)
# -----------------------------------------------------------------------------
class HCI_Android_Vendor_Event(HCI_Extended_Event):
event_code: int = HCI_VENDOR_EVENT
subevent_classes: dict[int, type[HCI_Extended_Event]] = {}
class HCI_Android_Vendor_Event(hci.HCI_Extended_Event):
event_code: int = hci.HCI_VENDOR_EVENT
subevent_classes: dict[int, type[hci.HCI_Extended_Event]] = {}
@classmethod
def subclass_from_parameters(
cls, parameters: bytes
) -> Optional[HCI_Extended_Event]:
) -> Optional[hci.HCI_Extended_Event]:
subevent_code = parameters[0]
if subevent_code == HCI_BLUETOOTH_QUALITY_REPORT_EVENT:
quality_report_id = parameters[1]
@@ -297,45 +219,43 @@ class HCI_Android_Vendor_Event(HCI_Extended_Event):
HCI_Android_Vendor_Event.register_subevents(globals())
HCI_Event.add_vendor_factory(HCI_Android_Vendor_Event.subclass_from_parameters)
hci.HCI_Event.add_vendor_factory(HCI_Android_Vendor_Event.subclass_from_parameters)
# -----------------------------------------------------------------------------
@HCI_Extended_Event.event(
fields=[
('quality_report_id', 1),
('packet_types', 1),
('connection_handle', 2),
('connection_role', {'size': 1, 'mapper': HCI_Constant.role_name}),
('tx_power_level', -1),
('rssi', -1),
('snr', 1),
('unused_afh_channel_count', 1),
('afh_select_unideal_channel_count', 1),
('lsto', 2),
('connection_piconet_clock', 4),
('retransmission_count', 4),
('no_rx_count', 4),
('nak_count', 4),
('last_tx_ack_timestamp', 4),
('flow_off_count', 4),
('last_flow_on_timestamp', 4),
('buffer_overflow_bytes', 4),
('buffer_underflow_bytes', 4),
('bdaddr', Address.parse_address),
('cal_failed_item_count', 1),
('tx_total_packets', 4),
('tx_unacked_packets', 4),
('tx_flushed_packets', 4),
('tx_last_subevent_packets', 4),
('crc_error_packets', 4),
('rx_duplicate_packets', 4),
('rx_unreceived_packets', 4),
('vendor_specific_parameters', '*'),
]
)
@hci.HCI_Extended_Event.event
@dataclasses.dataclass
class HCI_Bluetooth_Quality_Report_Event(HCI_Android_Vendor_Event):
# pylint: disable=line-too-long
'''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#bluetooth-quality-report-sub-event
'''
quality_report_id: int = field(metadata=hci.metadata(1))
packet_types: int = field(metadata=hci.metadata(1))
connection_handle: int = field(metadata=hci.metadata(2))
connection_role: int = field(metadata=hci.Role.type_metadata(1))
tx_power_level: int = field(metadata=hci.metadata(-1))
rssi: int = field(metadata=hci.metadata(-1))
snr: int = field(metadata=hci.metadata(1))
unused_afh_channel_count: int = field(metadata=hci.metadata(1))
afh_select_unideal_channel_count: int = field(metadata=hci.metadata(1))
lsto: int = field(metadata=hci.metadata(2))
connection_piconet_clock: int = field(metadata=hci.metadata(4))
retransmission_count: int = field(metadata=hci.metadata(4))
no_rx_count: int = field(metadata=hci.metadata(4))
nak_count: int = field(metadata=hci.metadata(4))
last_tx_ack_timestamp: int = field(metadata=hci.metadata(4))
flow_off_count: int = field(metadata=hci.metadata(4))
last_flow_on_timestamp: int = field(metadata=hci.metadata(4))
buffer_overflow_bytes: int = field(metadata=hci.metadata(4))
buffer_underflow_bytes: int = field(metadata=hci.metadata(4))
bdaddr: hci.Address = field(metadata=hci.metadata(hci.Address.parse_address))
cal_failed_item_count: int = field(metadata=hci.metadata(1))
tx_total_packets: int = field(metadata=hci.metadata(4))
tx_unacked_packets: int = field(metadata=hci.metadata(4))
tx_flushed_packets: int = field(metadata=hci.metadata(4))
tx_last_subevent_packets: int = field(metadata=hci.metadata(4))
crc_error_packets: int = field(metadata=hci.metadata(4))
rx_duplicate_packets: int = field(metadata=hci.metadata(4))
rx_unreceived_packets: int = field(metadata=hci.metadata(4))
vendor_specific_parameters: bytes = field(metadata=hci.metadata('*'))
+21 -18
View File
@@ -15,11 +15,9 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from bumble.hci import (
hci_vendor_command_op_code,
HCI_Command,
STATUS_SPEC,
)
import dataclasses
from bumble import hci
# -----------------------------------------------------------------------------
@@ -31,10 +29,10 @@ from bumble.hci import (
#
# pylint: disable-next=line-too-long
# See https://github.com/zephyrproject-rtos/zephyr/blob/main/include/zephyr/bluetooth/hci_vs.h
HCI_WRITE_TX_POWER_LEVEL_COMMAND = hci_vendor_command_op_code(0x000E)
HCI_READ_TX_POWER_LEVEL_COMMAND = hci_vendor_command_op_code(0x000F)
HCI_WRITE_TX_POWER_LEVEL_COMMAND = hci.hci_vendor_command_op_code(0x000E)
HCI_READ_TX_POWER_LEVEL_COMMAND = hci.hci_vendor_command_op_code(0x000F)
HCI_Command.register_commands(globals())
hci.HCI_Command.register_commands(globals())
# -----------------------------------------------------------------------------
@@ -49,10 +47,9 @@ class TX_Power_Level_Command:
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[('handle_type', 1), ('connection_handle', 2), ('tx_power_level', -1)],
)
class HCI_Write_Tx_Power_Level_Command(HCI_Command, TX_Power_Level_Command):
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_Write_Tx_Power_Level_Command(hci.HCI_Command, TX_Power_Level_Command):
'''
Write TX power level. See BT_HCI_OP_VS_WRITE_TX_POWER_LEVEL in
https://github.com/zephyrproject-rtos/zephyr/blob/main/include/zephyr/bluetooth/hci_vs.h
@@ -61,8 +58,12 @@ class HCI_Write_Tx_Power_Level_Command(HCI_Command, TX_Power_Level_Command):
TX_POWER_HANDLE_TYPE_SCAN should be zero.
'''
handle_type: int = dataclasses.field(metadata=hci.metadata(1))
connection_handle: int = dataclasses.field(metadata=hci.metadata(2))
tx_power_level: int = dataclasses.field(metadata=hci.metadata(-1))
return_parameters_fields = [
('status', STATUS_SPEC),
('status', hci.STATUS_SPEC),
('handle_type', 1),
('connection_handle', 2),
('selected_tx_power_level', -1),
@@ -70,10 +71,9 @@ class HCI_Write_Tx_Power_Level_Command(HCI_Command, TX_Power_Level_Command):
# -----------------------------------------------------------------------------
@HCI_Command.command(
fields=[('handle_type', 1), ('connection_handle', 2)],
)
class HCI_Read_Tx_Power_Level_Command(HCI_Command, TX_Power_Level_Command):
@hci.HCI_Command.command
@dataclasses.dataclass
class HCI_Read_Tx_Power_Level_Command(hci.HCI_Command, TX_Power_Level_Command):
'''
Read TX power level. See BT_HCI_OP_VS_READ_TX_POWER_LEVEL in
https://github.com/zephyrproject-rtos/zephyr/blob/main/include/zephyr/bluetooth/hci_vs.h
@@ -82,8 +82,11 @@ class HCI_Read_Tx_Power_Level_Command(HCI_Command, TX_Power_Level_Command):
TX_POWER_HANDLE_TYPE_SCAN should be zero.
'''
handle_type: int = dataclasses.field(metadata=hci.metadata(1))
connection_handle: int = dataclasses.field(metadata=hci.metadata(2))
return_parameters_fields = [
('status', STATUS_SPEC),
('status', hci.STATUS_SPEC),
('handle_type', 1),
('connection_handle', 2),
('tx_power_level', -1),
-1
View File
@@ -57,7 +57,6 @@ nav:
- Pair: apps_and_tools/pair.md
- Unbond: apps_and_tools/unbond.md
- USB Probe: apps_and_tools/usb_probe.md
- Link Relay: apps_and_tools/link_relay.md
- Hardware:
- hardware/index.md
- Platforms:
-1
View File
@@ -13,4 +13,3 @@ These include:
* [Golden Gate Bridge](gg_bridge.md) - Bridge between GATT and UDP to use with the Golden Gate "stack tool".
* [Show](show.md) - Parse a file with HCI packets and print the details of each packet in a human readable form.
* [Speaker](speaker.md) - Virtual Bluetooth speaker, with a command line and browser-based UI.
* [Link Relay](link_relay.md) - WebSocket relay for virtual RemoteLink instances to communicate with each other.
@@ -1,33 +0,0 @@
LINK RELAY TOOL
===============
The Link Relay is a WebSocket relay, which acts like an online chat system, where each "chat room" can be joined by multiple virtual controllers, which can then communicate with each other, as if connected with radio communication.
```
usage: python link_relay.py [-h] [--log-level LOG_LEVEL] [--log-config LOG_CONFIG] [--port PORT]
optional arguments:
-h, --help show this help message and exit
--log-level LOG_LEVEL
logger level
--log-config LOG_CONFIG
logger config file (YAML)
--port PORT Port to listen on
```
(the default port is `10723`)
When running, the link relay waits for connections on its listening port.
The WebSocket path used by a connecting client indicates which virtual "chat room" to join.
!!! tip "Connecting to the relay as a controller"
Most of the examples and tools that take a transport moniker as an argument also accept a link relay moniker, which is equivalent to a transport to a virtual controller that is connected to a relay.
The moniker syntax is: `link-relay:ws://<hostname>/<room>` where `<hostname>` is the hostname to connect to and `<room>` is the virtual "chat room" in a relay.
Example: `link-relay:ws://localhost:10723/test` will join the `test` "chat room"
!!! tip "Connecting to the relay as an observer"
It is possible to connect to a "chat room" in a relay as an observer, rather than a virtual controller. In this case, a text-based console can be used to observe what is going on in the "chat room". Tools like [`wscat`](https://github.com/websockets/wscat#readme) or [`websocat`](https://github.com/vi/websocat) can be used for that.
Example: `wscat --connect ws://localhost:10723/test`
-7
View File
@@ -56,13 +56,6 @@ Included in the project are two types of Link interface implementations:
The LocalLink implementation is a simple object used by an application that instantiates
more than one Controller objects and connects them in-memory and in-process.
#### Remote Link
The RemoteLink implementation communicates with other virtual controllers over a WebSocket.
Multiple instances of RemoteLink objects communicate with each other through a simple
WebSocket relay that can host any number of virtual 'rooms', where each 'room' is
a set of controllers that can communicate between themselves.
The `link_relay` app is where this relay is implemented.
## Host
The Host component connects to a controller over an HCI interface. It is responsible to sending commands and ACL data to the controller and receiving back events and ACL data.
+2 -3
View File
@@ -15,11 +15,10 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import asyncio
import os
from bumble.utils import AsyncRunner
import bumble.logging
# -----------------------------------------------------------------------------
my_work_queue1 = AsyncRunner.WorkQueue()
@@ -83,5 +82,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+2 -3
View File
@@ -17,13 +17,12 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport
from bumble.profiles.battery_service import BatteryServiceProxy
import bumble.logging
# -----------------------------------------------------------------------------
@@ -72,5 +71,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -5
View File
@@ -17,15 +17,14 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
import random
import struct
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.profiles.battery_service import BatteryService
import bumble.logging
# -----------------------------------------------------------------------------
@@ -35,7 +34,7 @@ async def main() -> None:
print('example: python battery_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
@@ -74,5 +73,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+3 -3
View File
@@ -17,13 +17,13 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.hci import Address
from bumble.profiles.device_information_service import DeviceInformationServiceProxy
from bumble.transport import open_transport
import bumble.logging
# -----------------------------------------------------------------------------
@@ -116,5 +116,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -5
View File
@@ -17,14 +17,13 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
import struct
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.profiles.device_information_service import DeviceInformationService
import bumble.logging
# -----------------------------------------------------------------------------
@@ -34,7 +33,7 @@ async def main() -> None:
print('example: python device_info_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
@@ -70,5 +69,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+3 -3
View File
@@ -17,13 +17,13 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport
from bumble.profiles.heart_rate_service import HeartRateServiceProxy
import bumble.logging
# -----------------------------------------------------------------------------
@@ -76,5 +76,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -5
View File
@@ -20,16 +20,15 @@ import time
import math
import random
import struct
import logging
import asyncio
import os
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.profiles.device_information_service import DeviceInformationService
from bumble.profiles.heart_rate_service import HeartRateService
from bumble.utils import AsyncRunner
import bumble.logging
# -----------------------------------------------------------------------------
@@ -39,7 +38,7 @@ async def main() -> None:
print('example: python heart_rate_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
@@ -128,5 +127,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+8 -7
View File
@@ -17,17 +17,16 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
import struct
import json
import websockets
from bumble.colors import color
import websockets
from bumble.colors import color
from bumble.core import AdvertisingData
from bumble.device import Device, Connection, Peer
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.gatt import (
Descriptor,
Service,
@@ -45,6 +44,8 @@ from bumble.gatt import (
GATT_HID_CONTROL_POINT_CHARACTERISTIC,
GATT_REPORT_REFERENCE_DESCRIPTOR,
)
import bumble.logging
# -----------------------------------------------------------------------------
@@ -434,7 +435,7 @@ async def main() -> None:
)
return
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
# Create a device to manage the host
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
@@ -450,5 +451,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -5
View File
@@ -17,12 +17,10 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.core import (
PhysicalTransport,
BT_AVDTP_PROTOCOL_ID,
@@ -39,6 +37,7 @@ from bumble.sdp import (
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
)
import bumble.logging
# -----------------------------------------------------------------------------
@@ -146,7 +145,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
@@ -198,5 +197,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+5 -5
View File
@@ -17,12 +17,10 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from typing import Any
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.core import PhysicalTransport
from bumble.avdtp import (
AVDTP_AUDIO_MEDIA_TYPE,
@@ -35,6 +33,8 @@ from bumble.a2dp import (
A2DP_SBC_CODEC_TYPE,
SbcMediaCodecInformation,
)
import bumble.logging
Context: dict[Any, Any] = {'output': None}
@@ -112,7 +112,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
with open(sys.argv[3], 'wb') as sbc_file:
@@ -166,5 +166,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -5
View File
@@ -17,12 +17,10 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.core import PhysicalTransport
from bumble.avdtp import (
find_avdtp_service_with_connection,
@@ -38,6 +36,7 @@ from bumble.a2dp import (
SbcMediaCodecInformation,
SbcPacketSource,
)
import bumble.logging
# -----------------------------------------------------------------------------
@@ -120,7 +119,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
@@ -186,5 +185,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -5
View File
@@ -16,15 +16,14 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
import struct
from bumble.core import AdvertisingData
from bumble.device import AdvertisingType, Device
from bumble.hci import Address
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
import bumble.logging
# -----------------------------------------------------------------------------
@@ -50,7 +49,7 @@ async def main() -> None:
target = None
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
@@ -72,5 +71,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+3 -4
View File
@@ -17,10 +17,8 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.colors import color
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.transport import open_transport
from bumble.profiles.ancs import (
@@ -31,6 +29,7 @@ from bumble.profiles.ancs import (
Notification,
NotificationAttributeId,
)
import bumble.logging
# -----------------------------------------------------------------------------
@@ -210,5 +209,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+7 -10
View File
@@ -17,18 +17,19 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
import websockets
from typing import Optional
import websockets
from bumble import decoder
from bumble import gatt
from bumble.core import AdvertisingData
from bumble.device import Device, AdvertisingParameters
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.profiles import asha
import bumble.logging
ws_connection: Optional[websockets.WebSocketServerProtocol] = None
g722_decoder = decoder.G722Decoder()
@@ -50,7 +51,7 @@ async def main() -> None:
print('example: python run_asha_sink.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
@@ -111,9 +112,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(
level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper(),
format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
)
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+5 -4
View File
@@ -19,18 +19,19 @@ from __future__ import annotations
import asyncio
import json
import sys
import os
import logging
import websockets
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.core import PhysicalTransport
from bumble import avc
from bumble import avrcp
from bumble import avdtp
from bumble import a2dp
from bumble import utils
import bumble.logging
logger = logging.getLogger(__name__)
@@ -344,7 +345,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
@@ -409,5 +410,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+5 -5
View File
@@ -18,15 +18,15 @@
from __future__ import annotations
import asyncio
import logging
import sys
import os
import functools
from bumble import core
from bumble import hci
from bumble.device import Connection, Device, ChannelSoundingCapabilities
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
import bumble.logging
# From https://cs.android.com/android/platform/superproject/main/+/main:packages/modules/Bluetooth/system/gd/hci/distance_measurement_manager.cc.
CS_TONE_ANTENNA_CONFIG_MAPPING_TABLE = [
@@ -78,7 +78,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
@@ -150,5 +150,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+33 -23
View File
@@ -16,16 +16,14 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble import utils
from bumble.device import Device, Connection
from bumble.device import Device, CigParameters, CisLink, Connection
from bumble.hci import (
OwnAddressType,
)
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
import bumble.logging
# -----------------------------------------------------------------------------
@@ -40,7 +38,7 @@ async def main() -> None:
print('<<< connecting to HCI...')
hci_transports = await asyncio.gather(
open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
open_transport(sys.argv[2]), open_transport(sys.argv[3])
)
print('<<< connected')
@@ -61,27 +59,39 @@ async def main() -> None:
devices[0].random_address, own_address_type=OwnAddressType.RANDOM
)
cid_ids = [2, 3]
cis_handles = await devices[1].setup_cig(
cig_id=1,
cis_id=cid_ids,
sdu_interval=(10000, 255),
framing=0,
max_sdu=(120, 0),
retransmission_number=13,
max_transport_latency=(100, 5),
CigParameters(
cig_id=1,
cis_parameters=[
CigParameters.CisParameters(
cis_id=2,
max_sdu_c_to_p=120,
max_sdu_p_to_c=0,
rtn_c_to_p=13,
rtn_p_to_c=13,
),
CigParameters.CisParameters(
cis_id=3,
max_sdu_c_to_p=120,
max_sdu_p_to_c=0,
rtn_c_to_p=13,
rtn_p_to_c=13,
),
],
sdu_interval_c_to_p=10000,
sdu_interval_p_to_c=255,
framing=CigParameters.Framing.UNFRAMED,
max_transport_latency_c_to_p=100,
max_transport_latency_p_to_c=5,
),
)
def on_cis_request(
connection: Connection, cis_handle: int, _cig_id: int, _cis_id: int
):
connection.cancel_on_disconnection(devices[0].accept_cis_request(cis_handle))
def on_cis_request(connection: Connection, cis_link: CisLink):
connection.cancel_on_disconnection(devices[0].accept_cis_request(cis_link))
devices[0].on('cis_request', on_cis_request)
cis_links = await devices[1].create_cis(
[(cis, connection.handle) for cis in cis_handles]
)
cis_links = await devices[1].create_cis([(cis, connection) for cis in cis_handles])
for cis_link in cis_links:
await cis_link.disconnect()
@@ -92,5 +102,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+5 -6
View File
@@ -17,18 +17,17 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.colors import color
from bumble.colors import color
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.core import PhysicalTransport, BT_L2CAP_PROTOCOL_ID, CommandTimeoutError
from bumble.sdp import (
Client as SDP_Client,
SDP_PUBLIC_BROWSE_ROOT,
SDP_ALL_ATTRIBUTES_RANGE,
)
import bumble.logging
# -----------------------------------------------------------------------------
@@ -42,7 +41,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
@@ -117,5 +116,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+5 -5
View File
@@ -17,11 +17,9 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.sdp import (
DataElement,
ServiceAttribute,
@@ -38,6 +36,8 @@ from bumble.core import (
BT_AVDTP_PROTOCOL_ID,
BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE,
)
import bumble.logging
# -----------------------------------------------------------------------------
SDP_SERVICE_RECORDS = {
@@ -98,7 +98,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
@@ -117,5 +117,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+5 -5
View File
@@ -17,13 +17,13 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.core import DeviceClass
import bumble.logging
# -----------------------------------------------------------------------------
@@ -60,7 +60,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')
device = Device.with_hci(
@@ -77,5 +77,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -5
View File
@@ -17,11 +17,10 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
import bumble.logging
# -----------------------------------------------------------------------------
@@ -37,7 +36,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
@@ -62,5 +61,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -5
View File
@@ -15,10 +15,8 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import asyncio
import sys
import os
from bumble.gatt import (
GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
@@ -32,7 +30,8 @@ from bumble.device import Device
from bumble.host import Host
from bumble.controller import Controller
from bumble.link import LocalLink
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
import bumble.logging
# -----------------------------------------------------------------------------
@@ -49,7 +48,7 @@ async def main() -> None:
return
print('>>> connecting to HCI...')
async with await open_transport_or_link(sys.argv[3]) as hci_transport:
async with await open_transport(sys.argv[3]) as hci_transport:
print('>>> connected')
# Create a local link
@@ -105,5 +104,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+5 -5
View File
@@ -15,16 +15,16 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import asyncio
import sys
import os
from bumble.colors import color
from bumble.device import Device
from bumble.controller import Controller
from bumble.hci import Address
from bumble.link import LocalLink
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
import bumble.logging
# -----------------------------------------------------------------------------
@@ -52,7 +52,7 @@ async def main() -> None:
return
print('>>> connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
async with await open_transport(sys.argv[1]) as hci_transport:
print('>>> connected')
# Create a local link
@@ -82,5 +82,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -6
View File
@@ -16,9 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
import secrets
from bumble.core import AdvertisingData
@@ -28,8 +26,8 @@ from bumble.hci import (
)
from bumble.profiles.cap import CommonAudioServiceService
from bumble.profiles.csip import CoordinatedSetIdentificationService, SirkType
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
import bumble.logging
# -----------------------------------------------------------------------------
@@ -44,7 +42,7 @@ async def main() -> None:
print('<<< connecting to HCI...')
hci_transports = await asyncio.gather(
open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
open_transport(sys.argv[2]), open_transport(sys.argv[3])
)
print('<<< connected')
@@ -101,5 +99,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+5 -5
View File
@@ -17,12 +17,12 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.hci import Address
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.snoop import BtSnooper
import bumble.logging
# -----------------------------------------------------------------------------
@@ -33,7 +33,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')
device = Device.with_hci(
@@ -52,5 +52,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+5 -6
View File
@@ -16,15 +16,14 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.core import PhysicalTransport
from bumble.device import Device, ScoLink
from bumble.hci import HCI_Enhanced_Setup_Synchronous_Connection_Command
from bumble.hfp import DefaultCodecParameters, ESCO_PARAMETERS
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
import bumble.logging
# -----------------------------------------------------------------------------
@@ -42,7 +41,7 @@ async def main() -> None:
print('<<< connecting to HCI...')
hci_transports = await asyncio.gather(
open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
open_transport(sys.argv[2]), open_transport(sys.argv[3])
)
print('<<< connected')
@@ -83,5 +82,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+5 -6
View File
@@ -16,9 +16,8 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import (
AdvertisingParameters,
AdvertisingEventProperties,
@@ -26,8 +25,8 @@ from bumble.device import (
Device,
)
from bumble.hci import Address
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
import bumble.logging
# -----------------------------------------------------------------------------
@@ -52,7 +51,7 @@ async def main() -> None:
peer_address = Address.ANY
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
@@ -69,5 +68,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+5 -5
View File
@@ -16,13 +16,13 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import AdvertisingParameters, AdvertisingEventProperties, Device
from bumble.hci import Address
from bumble.core import AdvertisingData
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
import bumble.logging
# -----------------------------------------------------------------------------
@@ -33,7 +33,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
@@ -96,5 +96,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -5
View File
@@ -17,15 +17,14 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.colors import color
from bumble.core import ProtocolError
from bumble.device import Device, Peer
from bumble.gatt import show_services
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.utils import AsyncRunner
import bumble.logging
# -----------------------------------------------------------------------------
@@ -79,7 +78,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device to manage the host, with a custom listener
@@ -101,5 +100,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+3 -3
View File
@@ -16,8 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import logging
from bumble.colors import color
from bumble.core import ProtocolError
from bumble.controller import Controller
@@ -34,6 +33,7 @@ from bumble.gatt import (
GATT_DEVICE_INFORMATION_SERVICE,
)
from bumble.gatt_client import show_services
import bumble.logging
# -----------------------------------------------------------------------------
@@ -119,5 +119,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -5
View File
@@ -17,11 +17,9 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.device import Device, Connection
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.att import ATT_Error, ATT_INSUFFICIENT_ENCRYPTION_ERROR
from bumble.gatt import (
Service,
@@ -32,6 +30,7 @@ from bumble.gatt import (
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
GATT_DEVICE_INFORMATION_SERVICE,
)
import bumble.logging
# -----------------------------------------------------------------------------
@@ -81,7 +80,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device to manage the host
@@ -152,5 +151,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
@@ -17,16 +17,15 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.gatt import (
Service,
Characteristic,
)
from bumble.pairing import PairingConfig, PairingDelegate
import bumble.logging
# -----------------------------------------------------------------------------
@@ -58,7 +57,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device to manage the host
@@ -107,5 +106,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+2 -3
View File
@@ -20,8 +20,6 @@ import asyncio
import dataclasses
import functools
import enum
import logging
import os
import random
import struct
import sys
@@ -34,6 +32,7 @@ from bumble import gatt_adapters
from bumble import gatt_client
from bumble import hci
from bumble import core
import bumble.logging
# -----------------------------------------------------------------------------
@@ -432,5 +431,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -5
View File
@@ -16,9 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.core import AdvertisingData
from bumble.device import Device
@@ -32,8 +30,9 @@ from bumble.profiles.hap import (
WritablePresetsSupport,
PresetRecord,
)
from bumble.transport import open_transport
import bumble.logging
from bumble.transport import open_transport_or_link
server_features = HearingAidFeatures(
HearingAidType.MONAURAL_HEARING_AID,
@@ -56,7 +55,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
@@ -102,5 +101,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -4
View File
@@ -18,7 +18,6 @@
import asyncio
import json
import sys
import os
import io
import logging
from typing import Iterable, Optional
@@ -27,9 +26,10 @@ import websockets
import bumble.core
from bumble.device import Device, ScoLink
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.core import PhysicalTransport
from bumble import hci, rfcomm, hfp
import bumble.logging
logger = logging.getLogger(__name__)
@@ -191,7 +191,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
@@ -286,5 +286,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+8 -8
View File
@@ -18,20 +18,20 @@
import asyncio
import contextlib
import sys
import os
import logging
import json
import websockets
import functools
from typing import Optional
from bumble import utils
import websockets
from bumble import rfcomm
from bumble import hci
from bumble.device import Device, Connection
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble import hfp
from bumble.hfp import HfProtocol
import bumble.logging
ws: Optional[websockets.WebSocketServerProtocol] = None
hf_protocol: Optional[HfProtocol] = None
@@ -46,7 +46,7 @@ def on_dlc(dlc: rfcomm.DLC, configuration: hfp.HfConfiguration):
def on_sco_request(connection: Connection, link_type: int, protocol: HfProtocol):
if connection == protocol.dlc.multiplexer.l2cap_channel.connection:
if link_type == hci.HCI_Connection_Complete_Event.SCO_LINK_TYPE:
if link_type == hci.HCI_Connection_Complete_Event.LinkType.SCO:
esco_parameters = hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.SCO_CVSD_D1
]
@@ -94,7 +94,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
# Hands-Free profile configuration.
@@ -175,5 +175,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+7 -6
View File
@@ -17,14 +17,13 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
import json
import websockets
import struct
import websockets
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.core import (
PhysicalTransport,
BT_L2CAP_PROTOCOL_ID,
@@ -49,6 +48,8 @@ from bumble.sdp import (
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
)
import bumble.logging
# -----------------------------------------------------------------------------
# SDP attributes for Bluetooth HID devices
@@ -597,7 +598,7 @@ async def main() -> None:
asyncio.create_task(handle_virtual_cable_unplug())
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
@@ -744,5 +745,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -6
View File
@@ -17,13 +17,11 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.core import (
BT_HUMAN_INTERFACE_DEVICE_SERVICE,
PhysicalTransport,
@@ -41,6 +39,7 @@ from bumble.sdp import (
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
)
import bumble.logging
from hid_report_parser import ReportParser
# -----------------------------------------------------------------------------
@@ -324,7 +323,7 @@ async def main() -> None:
asyncio.create_task(handle_virtual_cable_unplug())
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< CONNECTED')
# Create a device
@@ -565,6 +564,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+7 -9
View File
@@ -16,13 +16,12 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
import websockets
import json
from typing import Optional
import websockets
from bumble import utils
from bumble.core import AdvertisingData
from bumble.device import (
Device,
@@ -52,9 +51,8 @@ from bumble.profiles.mcp import (
MediaControlPointOpcode,
)
from bumble.profiles.pacs import PacRecord, PublishedAudioCapabilitiesService
from bumble.transport import open_transport_or_link
from typing import Optional
from bumble.transport import open_transport
import bumble.logging
# -----------------------------------------------------------------------------
@@ -64,7 +62,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
@@ -191,5 +189,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -5
View File
@@ -17,13 +17,12 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import random
import logging
from bumble.device import Device, Connection
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.gatt import Service, Characteristic
import bumble.logging
# -----------------------------------------------------------------------------
@@ -64,7 +63,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device to manage the host
@@ -128,5 +127,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -5
View File
@@ -17,14 +17,12 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.colors import color
import bumble.core
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.core import (
BT_L2CAP_PROTOCOL_ID,
BT_RFCOMM_PROTOCOL_ID,
@@ -39,6 +37,7 @@ from bumble.sdp import (
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
)
import bumble.logging
# -----------------------------------------------------------------------------
@@ -178,7 +177,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
@@ -237,5 +236,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -5
View File
@@ -17,15 +17,14 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.core import UUID
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble.rfcomm import Server
from bumble.utils import AsyncRunner
from bumble.rfcomm import make_service_sdp_records
import bumble.logging
# -----------------------------------------------------------------------------
@@ -124,7 +123,7 @@ async def main() -> None:
uuid = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
@@ -159,5 +158,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+4 -5
View File
@@ -17,12 +17,11 @@
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.colors import color
from bumble.hci import Address
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
from bumble import logging
# -----------------------------------------------------------------------------
@@ -33,7 +32,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')
filter_duplicates = len(sys.argv) == 3 and sys.argv[2] == 'filter'
@@ -79,5 +78,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+8 -12
View File
@@ -18,9 +18,7 @@
import asyncio
import datetime
import functools
import logging
import sys
import os
import io
import struct
import secrets
@@ -45,7 +43,8 @@ from bumble.profiles.bap import (
from bumble.profiles.cap import CommonAudioServiceService
from bumble.profiles.csip import CoordinatedSetIdentificationService, SirkType
from bumble.profiles.pacs import PacRecord, PublishedAudioCapabilitiesService
from bumble.transport import open_transport_or_link
from bumble.transport import open_transport
import bumble.logging
def _sink_pac_record() -> PacRecord:
@@ -77,11 +76,11 @@ file_outputs: dict[AseStateMachine, io.BufferedWriter] = {}
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_cig_setup.py <config-file>' '<transport-spec-for-device>')
print('Usage: run_cig_setup.py <config-file> <transport-spec-for-device>')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
@@ -149,12 +148,9 @@ async def main() -> None:
sdu += pdu.iso_sdu_fragment
file_outputs[ase].write(sdu)
def on_ase_state_change(
state: AseStateMachine.State,
ase: AseStateMachine,
) -> None:
if state != AseStateMachine.State.STREAMING:
if file_output := file_outputs.pop(ase):
def on_ase_state_change(ase: AseStateMachine) -> None:
if ase.state != AseStateMachine.State.STREAMING:
if file_output := file_outputs.pop(ase, None):
file_output.close()
else:
file_output = open(f'{datetime.datetime.now().isoformat()}.lc3', 'wb')
@@ -203,5 +199,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
+7 -9
View File
@@ -16,12 +16,12 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
import secrets
import websockets
import json
from typing import Optional
import websockets
from bumble.core import AdvertisingData
from bumble.device import Device, AdvertisingParameters, AdvertisingEventProperties
@@ -43,10 +43,8 @@ from bumble.profiles.pacs import PacRecord, PublishedAudioCapabilitiesService
from bumble.profiles.cap import CommonAudioServiceService
from bumble.profiles.csip import CoordinatedSetIdentificationService, SirkType
from bumble.profiles.vcs import VolumeControlService
from bumble.transport import open_transport_or_link
from typing import Optional
from bumble.transport import open_transport
import bumble.logging
def dumps_volume_state(volume_setting: int, muted: int, change_counter: int) -> str:
@@ -66,7 +64,7 @@ async def main() -> None:
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
async with await open_transport(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
@@ -186,5 +184,5 @@ async def main() -> None:
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
bumble.logging.setup_basic_logging('DEBUG')
asyncio.run(main())
@@ -1,5 +1,7 @@
package com.github.google.bumble.remotehci;
import static com.github.google.bumble.remotehci.HciPacket.Type.COMMAND;
import android.os.RemoteException;
import android.util.Log;
@@ -16,6 +18,10 @@ public class HciProxy {
private int mAclPacketsSent;
private int mScoPacketsSent;
private static final byte[] LOOPBACK_COMMAND_COMPLETE_EVENT = {
0x0E, 0x04, 0x01, 0x77, (byte)0xFC, 0x00
};
HciProxy(int port, Listener listener) throws HalException {
this.mListener = listener;
@@ -84,6 +90,14 @@ public class HciProxy {
@Override
public void onPacket(HciPacket.Type type, byte[] packet) {
// Short-circuit a local response when a special latency-testing packet
// is received.
if (type == COMMAND && packet[0] == (byte)0x77 && packet[1] == (byte)0xFC) {
Log.d(TAG, "LOOPBACK");
mServer.sendPacket(HciPacket.Type.EVENT, LOOPBACK_COMMAND_COMPLETE_EVENT);
return;
}
Log.d(TAG, String.format("HOST->CONTROLLER: type=%s, size=%d", type, packet.length));
hciHal.sendPacket(type, packet);
@@ -12,7 +12,7 @@ import java.net.Socket;
public class HciServer {
private static final String TAG = "HciServer";
private static final int BUFFER_SIZE = 1024;
private static final int BUFFER_SIZE = 8192;
private final int mPort;
private final Listener mListener;
private OutputStream mOutputStream;
+1 -3
View File
@@ -8,7 +8,7 @@ dynamic = ["version"]
description = "Bluetooth Stack for Apps, Emulation, Test and Experimentation"
readme = "README.md"
authors = [{ name = "Google", email = "bumble-dev@google.com" }]
requires-python = ">=3.8"
requires-python = ">=3.9"
dependencies = [
"aiohttp ~= 3.8; platform_system!='Emscripten'",
"appdirs >= 1.4; platform_system!='Emscripten'",
@@ -85,7 +85,6 @@ bumble-scan = "bumble.apps.scan:main"
bumble-show = "bumble.apps.show:main"
bumble-unbond = "bumble.apps.unbond:main"
bumble-usb-probe = "bumble.apps.usb_probe:main"
bumble-link-relay = "bumble.apps.link_relay.link_relay:main"
bumble-bench = "bumble.apps.bench:main"
bumble-player = "bumble.apps.player.player:main"
bumble-speaker = "bumble.apps.speaker.speaker:main"
@@ -107,7 +106,6 @@ packages = [
"bumble.drivers",
"bumble.profiles",
"bumble.apps",
"bumble.apps.link_relay",
"bumble.pandora",
"bumble.tools",
]
+2 -2
View File
@@ -127,7 +127,7 @@ async fn main() -> PyResult<()> {
} else {
matching
.iter()
.map(|t| format!("{}", t))
.map(|t| format!("{t}"))
.join(" / ")
.blue()
.to_string()
@@ -148,7 +148,7 @@ async fn main() -> PyResult<()> {
.next()
.unwrap_or_else(|| format!("0x{}", hex::encode_upper(&data)));
println!(" [{}]: {}", code_str, data_str)
println!(" [{code_str}]: {data_str}")
});
Ok(())
+9 -9
View File
@@ -163,7 +163,7 @@ impl CommonDataType {
/// Apply type-specific human-oriented formatting to data, if any is applicable
pub fn format_data(&self, data: &[u8]) -> Option<String> {
match self {
Self::Flags => Some(Flags::matching(data).map(|f| format!("{:?}", f)).join(",")),
Self::Flags => Some(Flags::matching(data).map(|f| format!("{f:?}")).join(",")),
Self::CompleteListOf16BitServiceClassUuids
| Self::IncompleteListOf16BitServiceClassUuids
| Self::ListOf16BitServiceSolicitationUuids => {
@@ -174,8 +174,8 @@ impl CommonDataType {
.map(|uuid| {
SERVICE_IDS
.get(&uuid)
.map(|name| format!("{:?} ({name})", uuid))
.unwrap_or_else(|| format!("{:?}", uuid))
.map(|name| format!("{uuid:?} ({name})"))
.unwrap_or_else(|| format!("{uuid:?}"))
})
.join(", ")
})
@@ -185,14 +185,14 @@ impl CommonDataType {
| Self::IncompleteListOf32BitServiceClassUuids
| Self::ListOf32BitServiceSolicitationUuids => {
combinator::complete(multi::many0(Uuid32::parse))(data)
.map(|(_res, uuids)| uuids.into_iter().map(|u| format!("{:?}", u)).join(", "))
.map(|(_res, uuids)| uuids.into_iter().map(|u| format!("{u:?}")).join(", "))
.ok()
}
Self::CompleteListOf128BitServiceClassUuids
| Self::IncompleteListOf128BitServiceClassUuids
| Self::ListOf128BitServiceSolicitationUuids => {
combinator::complete(multi::many0(Uuid128::parse_le))(data)
.map(|(_res, uuids)| uuids.into_iter().map(|u| format!("{:?}", u)).join(", "))
.map(|(_res, uuids)| uuids.into_iter().map(|u| format!("{u:?}")).join(", "))
.ok()
}
Self::ServiceData16BitUuid => Uuid16::parse_le(data)
@@ -201,8 +201,8 @@ impl CommonDataType {
"service={:?}, data={}",
SERVICE_IDS
.get(&uuid)
.map(|name| format!("{:?} ({name})", uuid))
.unwrap_or_else(|| format!("{:?}", uuid)),
.map(|name| format!("{uuid:?} ({name})"))
.unwrap_or_else(|| format!("{uuid:?}")),
hex::encode_upper(rem)
)
})
@@ -214,7 +214,7 @@ impl CommonDataType {
.map(|(rem, uuid)| format!("service={:?}, data={}", uuid, hex::encode_upper(rem)))
.ok(),
Self::ShortenedLocalName | Self::CompleteLocalName => {
std::str::from_utf8(data).ok().map(|s| format!("\"{}\"", s))
std::str::from_utf8(data).ok().map(|s| format!("\"{s}\""))
}
Self::TxPowerLevel => {
let (_, tx) =
@@ -230,7 +230,7 @@ impl CommonDataType {
COMPANY_IDS
.get(&id)
.map(|s| s.to_string())
.unwrap_or_else(|| format!("{:?}", id)),
.unwrap_or_else(|| format!("{id:?}")),
hex::encode_upper(rem)
))
}
+3 -3
View File
@@ -56,7 +56,7 @@ pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> {
ble_connection.on_disconnection(|_py, reason| {
let disconnection_info = match HciConstant::error_name(reason) {
Ok(info_string) => info_string,
Err(py_err) => format!("failed to get disconnection error name ({})", py_err),
Err(py_err) => format!("failed to get disconnection error name ({py_err})"),
};
println!(
"{} {}",
@@ -114,10 +114,10 @@ async fn proxy_data_between_tcp_and_l2cap(
mtu: Option<u16>,
mps: Option<u16>,
) -> PyResult<()> {
println!("{}", format!("<<< TCP connection from {}", addr).magenta());
println!("{}", format!("<<< TCP connection from {addr}").magenta());
println!(
"{}",
format!(">>> Opening L2CAP channel on PSM = {}", psm).yellow()
format!(">>> Opening L2CAP channel on PSM = {psm}").yellow()
);
let mut l2cap_channel = match ble_connection
+1 -1
View File
@@ -158,7 +158,7 @@ async fn proxy_tcp_rx_to_l2cap_tx(
}
}
Err(e) => {
println!("{}", format!("!!! TCP connection lost: {}", e).red());
println!("{}", format!("!!! TCP connection lost: {e}").red());
if let Some(mut channel) = l2cap_channel.lock().await.take() {
let _ = channel.disconnect().await.map_err(|e| {
eprintln!("Failed to call disconnect on l2cap channel: {e}");
+1 -1
View File
@@ -83,7 +83,7 @@ pub async fn start(args: &Args, device: &mut Device) -> PyResult<()> {
connection.on_disconnection(|_py, reason| {
let disconnection_info = match HciConstant::error_name(reason) {
Ok(info_string) => info_string,
Err(py_err) => format!("failed to get disconnection error name ({})", py_err),
Err(py_err) => format!("failed to get disconnection error name ({py_err})"),
};
println!(
"{} {}",
+2 -2
View File
@@ -100,7 +100,7 @@ pub(crate) fn probe(verbose: bool) -> anyhow::Result<()> {
.map(|serials| serials.contains(s))
.unwrap_or(false)
{
transport_names.push(format!("{}/{}", basic_transport_name, s))
transport_names.push(format!("{basic_transport_name}/{s}"))
}
}
@@ -310,7 +310,7 @@ impl ClassInfo {
self.sub_class,
self.protocol,
self.protocol_name()
.map(|s| format!(" [{}]", s))
.map(|s| format!(" [{s}]"))
.unwrap_or_default()
)
}
+1 -1
View File
@@ -25,7 +25,7 @@ from bumble.core import PhysicalTransport
from bumble.link import LocalLink
from bumble.device import Device
from bumble.host import Host
from bumble.transport import AsyncPipeSink
from bumble.transport.common import AsyncPipeSink
from bumble.avdtp import (
AVDTP_IDLE_STATE,
AVDTP_STREAMING_STATE,

Some files were not shown because too many files have changed in this diff Show More