Compare commits

...

18 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
ea493480a9 remove duplicated lines 2024-06-11 13:23:35 -07:00
Gilles Boccon-Gibod
00edd1fbf8 post-rebase fixes 2024-06-10 10:30:59 -07:00
Gilles Boccon-Gibod
999d7b07e1 wip 2024-06-09 11:39:44 -07:00
Gilles Boccon-Gibod
f910a696ad Merge pull request #499 from google/gbg/rfcomm-bridge
rfcomm bridge app
2024-06-05 11:18:13 -07:00
Gilles Boccon-Gibod
e1d10bc482 add rfcomm disconnect test 2024-06-05 10:03:27 -07:00
Gilles Boccon-Gibod
181467f11b Merge pull request #500 from google/gbg/fix-advertising-auto-restart
fix legacy advertising auto restart
2024-06-04 06:39:54 -07:00
Gilles Boccon-Gibod
394137b6f7 fix legacy advertising auto restart 2024-06-03 19:08:46 -07:00
Gilles Boccon-Gibod
f5baf51132 improve DLC parameters 2024-06-03 18:11:13 -07:00
Gilles Boccon-Gibod
f2dc8bd84e wip (+2 squashed commits)
Squashed commits:
[451a295] wip
[ed7b5b6] wip (+1 squashed commit)
Squashed commits:
[9d938c8] wip

wip

wip
2024-05-30 14:59:22 -07:00
zxzxwu
090309302f Merge pull request #372 from zxzxwu/source
ASCS Source Implementation
2024-05-29 13:17:51 +08:00
Charlie Boutier
28e6229b24 Fix: Preserve transport metadata
Preserve transport metadata when wrapping with SnoopingTransport
2024-05-28 09:20:53 -07:00
Josh Wu
1b66f03dbe ASCS: Add Source ASE operations 2024-05-27 14:48:23 +08:00
Gilles Boccon-Gibod
e34f6b5fd3 Merge pull request #484 from google/gbg/quick-fix-002
fix incorrect var reference
2024-05-13 16:11:42 -07:00
Gilles Boccon-Gibod
8a0482c947 Merge pull request #485 from google/gbg/gh-action-py312
add python 3.12 to GH actions
2024-05-13 16:11:25 -07:00
zxzxwu
938a189f3f Merge pull request #478 from zxzxwu/config
Make DeviceConfiguration dataclass
2024-05-13 16:57:15 +08:00
Gilles Boccon-Gibod
2005b4a11b python 3.12 compatibility 2024-05-12 12:54:52 -07:00
Gilles Boccon-Gibod
951fdc8bdd add python 3.12 to GH actions 2024-05-12 12:07:05 -07:00
Josh Wu
a5ac5f26e2 Make DeviceConfiguration dataclass 2024-05-05 17:25:01 +08:00
27 changed files with 1927 additions and 353 deletions

View File

@@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11"]
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
fail-fast: false
steps:

View File

@@ -16,7 +16,7 @@ jobs:
strategy:
matrix:
os: ['ubuntu-latest', 'macos-latest', 'windows-latest']
python-version: ["3.8", "3.9", "3.10", "3.11"]
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
fail-fast: false
steps:
@@ -46,7 +46,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: [ "3.8", "3.9", "3.10", "3.11" ]
python-version: [ "3.8", "3.9", "3.10", "3.11", "3.12" ]
rust-version: [ "1.76.0", "stable" ]
fail-fast: false
steps:

View File

@@ -1,6 +1,7 @@
{
"cSpell.words": [
"Abortable",
"aiohttp",
"altsetting",
"ansiblue",
"ansicyan",
@@ -9,6 +10,7 @@
"ansired",
"ansiyellow",
"appendleft",
"ascs",
"ASHA",
"asyncio",
"ATRAC",
@@ -43,6 +45,7 @@
"keyup",
"levelname",
"libc",
"liblc",
"libusb",
"MITM",
"MSBC",
@@ -78,6 +81,7 @@
"unmuted",
"usbmodem",
"vhci",
"wasmtime",
"websockets",
"xcursor",
"ycursor"

View File

@@ -40,6 +40,8 @@ from bumble.hci import (
HCI_LE_1M_PHY,
HCI_LE_2M_PHY,
HCI_LE_CODED_PHY,
HCI_CENTRAL_ROLE,
HCI_PERIPHERAL_ROLE,
HCI_Constant,
HCI_Error,
HCI_StatusError,
@@ -57,6 +59,7 @@ from bumble.transport import open_transport_or_link
import bumble.rfcomm
import bumble.core
from bumble.utils import AsyncRunner
from bumble.pairing import PairingConfig
# -----------------------------------------------------------------------------
@@ -128,40 +131,34 @@ def le_phy_name(phy_id):
def print_connection(connection):
params = []
if connection.transport == BT_LE_TRANSPORT:
phy_state = (
params.append(
'PHY='
f'TX:{le_phy_name(connection.phy.tx_phy)}/'
f'RX:{le_phy_name(connection.phy.rx_phy)}'
)
data_length = (
params.append(
'DL=('
f'TX:{connection.data_length[0]}/{connection.data_length[1]},'
f'RX:{connection.data_length[2]}/{connection.data_length[3]}'
')'
)
connection_parameters = (
params.append(
'Parameters='
f'{connection.parameters.connection_interval * 1.25:.2f}/'
f'{connection.parameters.peripheral_latency}/'
f'{connection.parameters.supervision_timeout * 10} '
)
params.append(f'MTU={connection.att_mtu}')
else:
phy_state = ''
data_length = ''
connection_parameters = ''
params.append(f'Role={HCI_Constant.role_name(connection.role)}')
mtu = connection.att_mtu
logging.info(
f'{color("@@@ Connection:", "yellow")} '
f'{connection_parameters} '
f'{data_length} '
f'{phy_state} '
f'MTU={mtu}'
)
logging.info(color('@@@ Connection: ', 'yellow') + ' '.join(params))
def make_sdp_records(channel):
@@ -214,6 +211,17 @@ def log_stats(title, stats):
)
async def switch_roles(connection, role):
target_role = HCI_CENTRAL_ROLE if role == "central" else HCI_PERIPHERAL_ROLE
if connection.role != target_role:
logging.info(f'{color("### Switching roles to:", "cyan")} {role}')
try:
await connection.switch_role(target_role)
logging.info(color('### Role switch complete', 'cyan'))
except HCI_Error as error:
logging.info(f'{color("### Role switch failed:", "red")} {error}')
class PacketType(enum.IntEnum):
RESET = 0
SEQUENCE = 1
@@ -899,14 +907,26 @@ class L2capServer(StreamedPacketIO):
# RfcommClient
# -----------------------------------------------------------------------------
class RfcommClient(StreamedPacketIO):
def __init__(self, device, channel, uuid, l2cap_mtu, max_frame_size, window_size):
def __init__(
self,
device,
channel,
uuid,
l2cap_mtu,
max_frame_size,
initial_credits,
max_credits,
credits_threshold,
):
super().__init__()
self.device = device
self.channel = channel
self.uuid = uuid
self.l2cap_mtu = l2cap_mtu
self.max_frame_size = max_frame_size
self.window_size = window_size
self.initial_credits = initial_credits
self.max_credits = max_credits
self.credits_threshold = credits_threshold
self.rfcomm_session = None
self.ready = asyncio.Event()
@@ -940,12 +960,17 @@ class RfcommClient(StreamedPacketIO):
logging.info(color(f'### Opening session for channel {channel}...', 'yellow'))
try:
dlc_options = {}
if self.max_frame_size:
if self.max_frame_size is not None:
dlc_options['max_frame_size'] = self.max_frame_size
if self.window_size:
dlc_options['window_size'] = self.window_size
if self.initial_credits is not None:
dlc_options['initial_credits'] = self.initial_credits
rfcomm_session = await rfcomm_mux.open_dlc(channel, **dlc_options)
logging.info(color(f'### Session open: {rfcomm_session}', 'yellow'))
if self.max_credits is not None:
rfcomm_session.rx_max_credits = self.max_credits
if self.credits_threshold is not None:
rfcomm_session.rx_credits_threshold = self.credits_threshold
except bumble.core.ConnectionError as error:
logging.info(color(f'!!! Session open failed: {error}', 'red'))
await rfcomm_mux.disconnect()
@@ -969,8 +994,19 @@ class RfcommClient(StreamedPacketIO):
# RfcommServer
# -----------------------------------------------------------------------------
class RfcommServer(StreamedPacketIO):
def __init__(self, device, channel, l2cap_mtu):
def __init__(
self,
device,
channel,
l2cap_mtu,
max_frame_size,
initial_credits,
max_credits,
credits_threshold,
):
super().__init__()
self.max_credits = max_credits
self.credits_threshold = credits_threshold
self.dlc = None
self.ready = asyncio.Event()
@@ -981,7 +1017,12 @@ class RfcommServer(StreamedPacketIO):
rfcomm_server = bumble.rfcomm.Server(device, **server_options)
# Listen for incoming DLC connections
channel_number = rfcomm_server.listen(self.on_dlc, channel)
dlc_options = {}
if max_frame_size is not None:
dlc_options['max_frame_size'] = max_frame_size
if initial_credits is not None:
dlc_options['initial_credits'] = initial_credits
channel_number = rfcomm_server.listen(self.on_dlc, channel, **dlc_options)
# Setup the SDP to advertise this channel
device.sdp_service_records = make_sdp_records(channel_number)
@@ -1001,9 +1042,17 @@ class RfcommServer(StreamedPacketIO):
def on_dlc(self, dlc):
logging.info(color(f'*** DLC connected: {dlc}', 'blue'))
if self.credits_threshold is not None:
dlc.rx_threshold = self.credits_threshold
if self.max_credits is not None:
dlc.rx_max_credits = self.max_credits
dlc.sink = self.on_packet
self.io_sink = dlc.write
self.dlc = dlc
if self.max_credits is not None:
dlc.rx_max_credits = self.max_credits
if self.credits_threshold is not None:
dlc.rx_credits_threshold = self.credits_threshold
async def drain(self):
assert self.dlc
@@ -1026,6 +1075,7 @@ class Central(Connection.Listener):
authenticate,
encrypt,
extended_data_length,
role_switch,
):
super().__init__()
self.transport = transport
@@ -1036,6 +1086,7 @@ class Central(Connection.Listener):
self.authenticate = authenticate
self.encrypt = encrypt or authenticate
self.extended_data_length = extended_data_length
self.role_switch = role_switch
self.device = None
self.connection = None
@@ -1086,6 +1137,11 @@ class Central(Connection.Listener):
role = self.role_factory(mode)
self.device.classic_enabled = self.classic
# Set up a pairing config factory with minimal requirements.
self.device.pairing_config_factory = lambda _: PairingConfig(
sc=False, mitm=False, bonding=False
)
await self.device.power_on()
if self.classic:
@@ -1114,6 +1170,10 @@ class Central(Connection.Listener):
self.connection.listener = self
print_connection(self.connection)
# Switch roles if needed.
if self.role_switch:
await switch_roles(self.connection, self.role_switch)
# Wait a bit after the connection, some controllers aren't very good when
# we start sending data right away while some connection parameters are
# updated post connection
@@ -1175,20 +1235,30 @@ class Central(Connection.Listener):
def on_connection_data_length_change(self):
print_connection(self.connection)
def on_role_change(self):
print_connection(self.connection)
# -----------------------------------------------------------------------------
# Peripheral
# -----------------------------------------------------------------------------
class Peripheral(Device.Listener, Connection.Listener):
def __init__(
self, transport, classic, extended_data_length, role_factory, mode_factory
self,
transport,
role_factory,
mode_factory,
classic,
extended_data_length,
role_switch,
):
self.transport = transport
self.classic = classic
self.extended_data_length = extended_data_length
self.role_factory = role_factory
self.role = None
self.mode_factory = mode_factory
self.extended_data_length = extended_data_length
self.role_switch = role_switch
self.role = None
self.mode = None
self.device = None
self.connection = None
@@ -1211,6 +1281,11 @@ class Peripheral(Device.Listener, Connection.Listener):
self.role = self.role_factory(self.mode)
self.device.classic_enabled = self.classic
# Set up a pairing config factory with minimal requirements.
self.device.pairing_config_factory = lambda _: PairingConfig(
sc=False, mitm=False, bonding=False
)
await self.device.power_on()
if self.classic:
@@ -1237,6 +1312,7 @@ class Peripheral(Device.Listener, Connection.Listener):
await self.connected.wait()
logging.info(color('### Connected', 'cyan'))
print_connection(self.connection)
await self.mode.on_connection(self.connection)
await self.role.run()
@@ -1253,7 +1329,7 @@ class Peripheral(Device.Listener, Connection.Listener):
AsyncRunner.spawn(self.device.set_connectable(False))
# Request a new data length if needed
if self.extended_data_length:
if not self.classic and self.extended_data_length:
logging.info("+++ Requesting extended data length")
AsyncRunner.spawn(
connection.set_data_length(
@@ -1261,6 +1337,10 @@ class Peripheral(Device.Listener, Connection.Listener):
)
)
# Switch roles if needed.
if self.role_switch:
AsyncRunner.spawn(switch_roles(connection, self.role_switch))
def on_disconnection(self, reason):
logging.info(color(f'!!! Disconnection: reason={reason}', 'red'))
self.connection = None
@@ -1282,6 +1362,9 @@ class Peripheral(Device.Listener, Connection.Listener):
def on_connection_data_length_change(self):
print_connection(self.connection)
def on_role_change(self):
print_connection(self.connection)
# -----------------------------------------------------------------------------
def create_mode_factory(ctx, default_mode):
@@ -1321,7 +1404,9 @@ def create_mode_factory(ctx, default_mode):
uuid=ctx.obj['rfcomm_uuid'],
l2cap_mtu=ctx.obj['rfcomm_l2cap_mtu'],
max_frame_size=ctx.obj['rfcomm_max_frame_size'],
window_size=ctx.obj['rfcomm_window_size'],
initial_credits=ctx.obj['rfcomm_initial_credits'],
max_credits=ctx.obj['rfcomm_max_credits'],
credits_threshold=ctx.obj['rfcomm_credits_threshold'],
)
if mode == 'rfcomm-server':
@@ -1329,6 +1414,10 @@ def create_mode_factory(ctx, default_mode):
device,
channel=ctx.obj['rfcomm_channel'],
l2cap_mtu=ctx.obj['rfcomm_l2cap_mtu'],
max_frame_size=ctx.obj['rfcomm_max_frame_size'],
initial_credits=ctx.obj['rfcomm_initial_credits'],
max_credits=ctx.obj['rfcomm_max_credits'],
credits_threshold=ctx.obj['rfcomm_credits_threshold'],
)
raise ValueError('invalid mode')
@@ -1405,6 +1494,11 @@ def create_role_factory(ctx, default_role):
'--extended-data-length',
help='Request a data length upon connection, specified as tx_octets/tx_time',
)
@click.option(
'--role-switch',
type=click.Choice(['central', 'peripheral']),
help='Request role switch upon connection (central or peripheral)',
)
@click.option(
'--rfcomm-channel',
type=int,
@@ -1427,9 +1521,19 @@ def create_role_factory(ctx, default_role):
help='RFComm maximum frame size',
)
@click.option(
'--rfcomm-window-size',
'--rfcomm-initial-credits',
type=int,
help='RFComm window size',
help='RFComm initial credits',
)
@click.option(
'--rfcomm-max-credits',
type=int,
help='RFComm max credits',
)
@click.option(
'--rfcomm-credits-threshold',
type=int,
help='RFComm credits threshold',
)
@click.option(
'--l2cap-psm',
@@ -1459,7 +1563,7 @@ def create_role_factory(ctx, default_role):
'--packet-size',
'-s',
metavar='SIZE',
type=click.IntRange(8, 4096),
type=click.IntRange(8, 8192),
default=500,
help='Packet size (client or ping role)',
)
@@ -1519,6 +1623,7 @@ def bench(
mode,
att_mtu,
extended_data_length,
role_switch,
packet_size,
packet_count,
start_delay,
@@ -1530,7 +1635,9 @@ def bench(
rfcomm_uuid,
rfcomm_l2cap_mtu,
rfcomm_max_frame_size,
rfcomm_window_size,
rfcomm_initial_credits,
rfcomm_max_credits,
rfcomm_credits_threshold,
l2cap_psm,
l2cap_mtu,
l2cap_mps,
@@ -1545,7 +1652,9 @@ def bench(
ctx.obj['rfcomm_uuid'] = rfcomm_uuid
ctx.obj['rfcomm_l2cap_mtu'] = rfcomm_l2cap_mtu
ctx.obj['rfcomm_max_frame_size'] = rfcomm_max_frame_size
ctx.obj['rfcomm_window_size'] = rfcomm_window_size
ctx.obj['rfcomm_initial_credits'] = rfcomm_initial_credits
ctx.obj['rfcomm_max_credits'] = rfcomm_max_credits
ctx.obj['rfcomm_credits_threshold'] = rfcomm_credits_threshold
ctx.obj['l2cap_psm'] = l2cap_psm
ctx.obj['l2cap_mtu'] = l2cap_mtu
ctx.obj['l2cap_mps'] = l2cap_mps
@@ -1557,12 +1666,12 @@ def bench(
ctx.obj['repeat_delay'] = repeat_delay
ctx.obj['pace'] = pace
ctx.obj['linger'] = linger
ctx.obj['extended_data_length'] = (
[int(x) for x in extended_data_length.split('/')]
if extended_data_length
else None
)
ctx.obj['role_switch'] = role_switch
ctx.obj['classic'] = mode in ('rfcomm-client', 'rfcomm-server')
@@ -1606,6 +1715,7 @@ def central(
authenticate,
encrypt or authenticate,
ctx.obj['extended_data_length'],
ctx.obj['role_switch'],
).run()
asyncio.run(run_central())
@@ -1622,10 +1732,11 @@ def peripheral(ctx, transport):
async def run_peripheral():
await Peripheral(
transport,
ctx.obj['classic'],
ctx.obj['extended_data_length'],
role_factory,
mode_factory,
ctx.obj['classic'],
ctx.obj['extended_data_length'],
ctx.obj['role_switch'],
).run()
asyncio.run(run_peripheral())

View File

@@ -27,7 +27,7 @@ from bumble.colors import color
from bumble.core import name_or_number
from bumble.hci import (
map_null_terminated_utf8_string,
LeFeatureMask,
LeFeature,
HCI_SUCCESS,
HCI_VERSION_NAMES,
LMP_VERSION_NAMES,
@@ -140,7 +140,7 @@ async def get_le_info(host: Host) -> None:
print(color('LE Features:', 'yellow'))
for feature in host.supported_le_features:
print(LeFeatureMask(feature).name)
print(f' {LeFeature(feature).name}')
# -----------------------------------------------------------------------------
@@ -224,7 +224,7 @@ async def async_main(latency_probes, transport):
print()
print(color('Supported Commands:', 'yellow'))
for command in host.supported_commands:
print(' ', HCI_Command.command_name(command))
print(f' {HCI_Command.command_name(command)}')
# -----------------------------------------------------------------------------

577
apps/lea_unicast/app.py Normal file
View File

@@ -0,0 +1,577 @@
# Copyright 2021-2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import datetime
import enum
import functools
from importlib import resources
import json
import os
import logging
import pathlib
from typing import Optional, List, cast
import weakref
import struct
import ctypes
import wasmtime
import wasmtime.loader
import liblc3 # type: ignore
import logging
import click
import aiohttp.web
import bumble
from bumble.core import AdvertisingData
from bumble.colors import color
from bumble.device import Device, DeviceConfiguration, AdvertisingParameters
from bumble.transport import open_transport
from bumble.profiles import bap
from bumble.hci import Address, CodecID, CodingFormat, HCI_IsoDataPacket
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
DEFAULT_UI_PORT = 7654
def _sink_pac_record() -> bap.PacRecord:
return bap.PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=bap.CodecSpecificCapabilities(
supported_sampling_frequencies=(
bap.SupportedSamplingFrequency.FREQ_8000
| bap.SupportedSamplingFrequency.FREQ_16000
| bap.SupportedSamplingFrequency.FREQ_24000
| bap.SupportedSamplingFrequency.FREQ_32000
| bap.SupportedSamplingFrequency.FREQ_48000
),
supported_frame_durations=(
bap.SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_count=[1, 2],
min_octets_per_codec_frame=26,
max_octets_per_codec_frame=240,
supported_max_codec_frames_per_sdu=2,
),
)
def _source_pac_record() -> bap.PacRecord:
return bap.PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=bap.CodecSpecificCapabilities(
supported_sampling_frequencies=(
bap.SupportedSamplingFrequency.FREQ_8000
| bap.SupportedSamplingFrequency.FREQ_16000
| bap.SupportedSamplingFrequency.FREQ_24000
| bap.SupportedSamplingFrequency.FREQ_32000
| bap.SupportedSamplingFrequency.FREQ_48000
),
supported_frame_durations=(
bap.SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_count=[1],
min_octets_per_codec_frame=30,
max_octets_per_codec_frame=100,
supported_max_codec_frames_per_sdu=1,
),
)
# -----------------------------------------------------------------------------
# WASM - liblc3
# -----------------------------------------------------------------------------
store = wasmtime.loader.store
_memory = cast(wasmtime.Memory, liblc3.memory)
STACK_POINTER = _memory.data_len(store)
_memory.grow(store, 1)
# Mapping wasmtime memory to linear address
memory = (ctypes.c_ubyte * _memory.data_len(store)).from_address(
ctypes.addressof(_memory.data_ptr(store).contents) # type: ignore
)
class Liblc3PcmFormat(enum.IntEnum):
S16 = 0
S24 = 1
S24_3LE = 2
FLOAT = 3
MAX_DECODER_SIZE = liblc3.lc3_decoder_size(10000, 48000)
MAX_ENCODER_SIZE = liblc3.lc3_encoder_size(10000, 48000)
DECODER_STACK_POINTER = STACK_POINTER
ENCODER_STACK_POINTER = DECODER_STACK_POINTER + MAX_DECODER_SIZE * 2
DECODE_BUFFER_STACK_POINTER = ENCODER_STACK_POINTER + MAX_ENCODER_SIZE * 2
ENCODE_BUFFER_STACK_POINTER = DECODE_BUFFER_STACK_POINTER + 8192
DEFAULT_PCM_SAMPLE_RATE = 48000
DEFAULT_PCM_FORMAT = Liblc3PcmFormat.S16
DEFAULT_PCM_BYTES_PER_SAMPLE = 2
encoders: List[int] = []
decoders: List[int] = []
def setup_encoders(
sample_rate_hz: int, frame_duration_us: int, num_channels: int
) -> None:
logger.info(
f"setup_encoders {sample_rate_hz}Hz {frame_duration_us}us {num_channels}channels"
)
encoders[:num_channels] = [
liblc3.lc3_setup_encoder(
frame_duration_us,
sample_rate_hz,
DEFAULT_PCM_SAMPLE_RATE, # Input sample rate
ENCODER_STACK_POINTER + MAX_ENCODER_SIZE * i,
)
for i in range(num_channels)
]
def setup_decoders(
sample_rate_hz: int, frame_duration_us: int, num_channels: int
) -> None:
logger.info(
f"setup_decoders {sample_rate_hz}Hz {frame_duration_us}us {num_channels}channels"
)
decoders[:num_channels] = [
liblc3.lc3_setup_decoder(
frame_duration_us,
sample_rate_hz,
DEFAULT_PCM_SAMPLE_RATE, # Output sample rate
DECODER_STACK_POINTER + MAX_DECODER_SIZE * i,
)
for i in range(num_channels)
]
def decode(
frame_duration_us: int,
num_channels: int,
input_bytes: bytes,
) -> bytes:
if not input_bytes:
return b''
input_buffer_offset = DECODE_BUFFER_STACK_POINTER
input_buffer_size = len(input_bytes)
input_bytes_per_frame = input_buffer_size // num_channels
# Copy into wasm
memory[input_buffer_offset : input_buffer_offset + input_buffer_size] = input_bytes # type: ignore
output_buffer_offset = input_buffer_offset + input_buffer_size
output_buffer_size = (
liblc3.lc3_frame_samples(frame_duration_us, DEFAULT_PCM_SAMPLE_RATE)
* DEFAULT_PCM_BYTES_PER_SAMPLE
* num_channels
)
for i in range(num_channels):
res = liblc3.lc3_decode(
decoders[i],
input_buffer_offset + input_bytes_per_frame * i,
input_bytes_per_frame,
DEFAULT_PCM_FORMAT,
output_buffer_offset + i * DEFAULT_PCM_BYTES_PER_SAMPLE,
num_channels, # Stride
)
if res != 0:
logging.error(f"Parsing failed, res={res}")
# Extract decoded data from the output buffer
return bytes(
memory[output_buffer_offset : output_buffer_offset + output_buffer_size]
)
def encode(
sdu_length: int,
num_channels: int,
stride: int,
input_bytes: bytes,
) -> bytes:
if not input_bytes:
return b''
input_buffer_offset = ENCODE_BUFFER_STACK_POINTER
input_buffer_size = len(input_bytes)
# Copy into wasm
memory[input_buffer_offset : input_buffer_offset + input_buffer_size] = input_bytes # type: ignore
output_buffer_offset = input_buffer_offset + input_buffer_size
output_buffer_size = sdu_length
output_frame_size = output_buffer_size // num_channels
for i in range(num_channels):
res = liblc3.lc3_encode(
encoders[i],
DEFAULT_PCM_FORMAT,
input_buffer_offset + DEFAULT_PCM_BYTES_PER_SAMPLE * i,
stride,
output_frame_size,
output_buffer_offset + output_frame_size * i,
)
if res != 0:
logging.error(f"Parsing failed, res={res}")
# Extract decoded data from the output buffer
return bytes(
memory[output_buffer_offset : output_buffer_offset + output_buffer_size]
)
async def lc3_source_task(
filename: str,
sdu_length: int,
frame_duration_us: int,
device: Device,
cis_handle: int,
) -> None:
with open(filename, 'rb') as f:
header = f.read(44)
assert header[8:12] == b'WAVE'
pcm_num_channel, pcm_sample_rate, _byte_rate, _block_align, bits_per_sample = (
struct.unpack("<HIIHH", header[22:36])
)
assert pcm_sample_rate == DEFAULT_PCM_SAMPLE_RATE
assert bits_per_sample == DEFAULT_PCM_BYTES_PER_SAMPLE * 8
frame_bytes = (
liblc3.lc3_frame_samples(frame_duration_us, DEFAULT_PCM_SAMPLE_RATE)
* DEFAULT_PCM_BYTES_PER_SAMPLE
)
packet_sequence_number = 0
while True:
next_round = datetime.datetime.now() + datetime.timedelta(
microseconds=frame_duration_us
)
pcm_data = f.read(frame_bytes)
sdu = encode(sdu_length, pcm_num_channel, pcm_num_channel, pcm_data)
iso_packet = HCI_IsoDataPacket(
connection_handle=cis_handle,
data_total_length=sdu_length + 4,
packet_sequence_number=packet_sequence_number,
pb_flag=0b10,
packet_status_flag=0,
iso_sdu_length=sdu_length,
iso_sdu_fragment=sdu,
)
device.host.send_hci_packet(iso_packet)
packet_sequence_number += 1
sleep_time = next_round - datetime.datetime.now()
await asyncio.sleep(sleep_time.total_seconds())
# -----------------------------------------------------------------------------
class UiServer:
speaker: weakref.ReferenceType[Speaker]
port: int
def __init__(self, speaker: Speaker, port: int) -> None:
self.speaker = weakref.ref(speaker)
self.port = port
self.channel_socket = None
async def start_http(self) -> None:
"""Start the UI HTTP server."""
app = aiohttp.web.Application()
app.add_routes(
[
aiohttp.web.get('/', self.get_static),
aiohttp.web.get('/index.html', self.get_static),
aiohttp.web.get('/channel', self.get_channel),
]
)
runner = aiohttp.web.AppRunner(app)
await runner.setup()
site = aiohttp.web.TCPSite(runner, 'localhost', self.port)
print('UI HTTP server at ' + color(f'http://127.0.0.1:{self.port}', 'green'))
await site.start()
async def get_static(self, request):
path = request.path
if path == '/':
path = '/index.html'
if path.endswith('.html'):
content_type = 'text/html'
elif path.endswith('.js'):
content_type = 'text/javascript'
elif path.endswith('.css'):
content_type = 'text/css'
elif path.endswith('.svg'):
content_type = 'image/svg+xml'
else:
content_type = 'text/plain'
text = (
resources.files("bumble.apps.lea_unicast")
.joinpath(pathlib.Path(path).relative_to('/'))
.read_text(encoding="utf-8")
)
return aiohttp.web.Response(text=text, content_type=content_type)
async def get_channel(self, request):
ws = aiohttp.web.WebSocketResponse()
await ws.prepare(request)
# Process messages until the socket is closed.
self.channel_socket = ws
async for message in ws:
if message.type == aiohttp.WSMsgType.TEXT:
logger.debug(f'<<< received message: {message.data}')
await self.on_message(message.data)
elif message.type == aiohttp.WSMsgType.ERROR:
logger.debug(
f'channel connection closed with exception {ws.exception()}'
)
self.channel_socket = None
logger.debug('--- channel connection closed')
return ws
async def on_message(self, message_str: str):
# Parse the message as JSON
message = json.loads(message_str)
# Dispatch the message
message_type = message['type']
message_params = message.get('params', {})
handler = getattr(self, f'on_{message_type}_message')
if handler:
await handler(**message_params)
async def on_hello_message(self):
await self.send_message(
'hello',
bumble_version=bumble.__version__,
codec=self.speaker().codec,
streamState=self.speaker().stream_state.name,
)
if connection := self.speaker().connection:
await self.send_message(
'connection',
peer_address=connection.peer_address.to_string(False),
peer_name=connection.peer_name,
)
async def send_message(self, message_type: str, **kwargs) -> None:
if self.channel_socket is None:
return
message = {'type': message_type, 'params': kwargs}
await self.channel_socket.send_json(message)
async def send_audio(self, data: bytes) -> None:
if self.channel_socket is None:
return
try:
await self.channel_socket.send_bytes(data)
except Exception as error:
logger.warning(f'exception while sending audio packet: {error}')
# -----------------------------------------------------------------------------
class Speaker:
def __init__(
self,
device_config_path: Optional[str],
ui_port: int,
transport: str,
lc3_input_file_path: str,
):
self.device_config_path = device_config_path
self.transport = transport
self.lc3_input_file_path = lc3_input_file_path
# Create an HTTP server for the UI
self.ui_server = UiServer(speaker=self, port=ui_port)
async def run(self) -> None:
await self.ui_server.start_http()
async with await open_transport(self.transport) as hci_transport:
# Create a device
if self.device_config_path:
device_config = DeviceConfiguration.from_file(self.device_config_path)
else:
device_config = DeviceConfiguration(
name="Bumble LE Headphone",
class_of_device=0x244418,
keystore="JsonKeyStore",
advertising_interval_min=25,
advertising_interval_max=25,
address=Address('F1:F2:F3:F4:F5:F6'),
)
device_config.le_enabled = True
device_config.cis_enabled = True
self.device = Device.from_config_with_hci(
device_config, hci_transport.source, hci_transport.sink
)
self.device.add_service(
bap.PublishedAudioCapabilitiesService(
supported_source_context=bap.ContextType(0xFFFF),
available_source_context=bap.ContextType(0xFFFF),
supported_sink_context=bap.ContextType(0xFFFF), # All context types
available_sink_context=bap.ContextType(0xFFFF), # All context types
sink_audio_locations=(
bap.AudioLocation.FRONT_LEFT | bap.AudioLocation.FRONT_RIGHT
),
sink_pac=[_sink_pac_record()],
source_audio_locations=bap.AudioLocation.FRONT_LEFT,
source_pac=[_source_pac_record()],
)
)
ascs = bap.AudioStreamControlService(
self.device, sink_ase_id=[1], source_ase_id=[2]
)
self.device.add_service(ascs)
advertising_data = bytes(
AdvertisingData(
[
(
AdvertisingData.COMPLETE_LOCAL_NAME,
bytes(device_config.name, 'utf-8'),
),
(
AdvertisingData.FLAGS,
bytes([AdvertisingData.LE_GENERAL_DISCOVERABLE_MODE_FLAG]),
),
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(bap.PublishedAudioCapabilitiesService.UUID),
),
]
)
) + bytes(bap.UnicastServerAdvertisingData())
def on_pdu(pdu: HCI_IsoDataPacket, ase: bap.AseStateMachine):
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
pcm = decode(
codec_config.frame_duration.us,
codec_config.audio_channel_allocation.channel_count,
pdu.iso_sdu_fragment,
)
self.device.abort_on('disconnection', self.ui_server.send_audio(pcm))
def on_ase_state_change(ase: bap.AseStateMachine) -> None:
if ase.state == bap.AseStateMachine.State.STREAMING:
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
assert ase.cis_link
if ase.role == bap.AudioRole.SOURCE:
ase.cis_link.abort_on(
'disconnection',
lc3_source_task(
filename=self.lc3_input_file_path,
sdu_length=(
codec_config.codec_frames_per_sdu
* codec_config.octets_per_codec_frame
),
frame_duration_us=codec_config.frame_duration.us,
device=self.device,
cis_handle=ase.cis_link.handle,
),
)
else:
ase.cis_link.sink = functools.partial(on_pdu, ase=ase)
elif ase.state == bap.AseStateMachine.State.CODEC_CONFIGURED:
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
if ase.role == bap.AudioRole.SOURCE:
setup_encoders(
codec_config.sampling_frequency.hz,
codec_config.frame_duration.us,
codec_config.audio_channel_allocation.channel_count,
)
else:
setup_decoders(
codec_config.sampling_frequency.hz,
codec_config.frame_duration.us,
codec_config.audio_channel_allocation.channel_count,
)
for ase in ascs.ase_state_machines.values():
ase.on('state_change', functools.partial(on_ase_state_change, ase=ase))
await self.device.power_on()
await self.device.create_advertising_set(
advertising_data=advertising_data,
auto_restart=True,
advertising_parameters=AdvertisingParameters(
primary_advertising_interval_min=100,
primary_advertising_interval_max=100,
),
)
await hci_transport.source.terminated
@click.command()
@click.option(
'--ui-port',
'ui_port',
metavar='HTTP_PORT',
default=DEFAULT_UI_PORT,
show_default=True,
help='HTTP port for the UI server',
)
@click.option('--device-config', metavar='FILENAME', help='Device configuration file')
@click.argument('transport')
@click.argument('lc3_file')
def speaker(ui_port: int, device_config: str, transport: str, lc3_file: str) -> None:
"""Run the speaker."""
asyncio.run(Speaker(device_config, ui_port, transport, lc3_file).run())
# -----------------------------------------------------------------------------
def main():
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper())
speaker()
# -----------------------------------------------------------------------------
if __name__ == "__main__":
main() # pylint: disable=no-value-for-parameter

View File

@@ -0,0 +1,68 @@
<html data-bs-theme="dark">
<head>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.2/dist/css/bootstrap.min.css" rel="stylesheet"
integrity="sha384-T3c6CoIi6uLrA9TneNEoa7RxnatzjcDSCmG1MXxSR1GAsXEV/Dwwykc2MPK8M2HN" crossorigin="anonymous">
<script src="https://unpkg.com/pcm-player"></script>
</head>
<body>
<nav class="navbar navbar-dark bg-primary">
<div class="container">
<span class="navbar-brand mb-0 h1">Bumble Unicast Server</span>
</div>
</nav>
<br>
<div class="container">
<button type="button" class="btn btn-danger" id="connect-audio" onclick="connectAudio()">Connect Audio</button>
<button class="btn btn-primary" type="button" disabled>
<span class="spinner-border spinner-border-sm" id="ws-status-spinner" aria-hidden="true"></span>
<span role="status" id="ws-status">WebSocket Connecting...</span>
</button>
</div>
<script>
let player = null;
const wsStatus = document.getElementById("ws-status");
const wsStatusSpinner = document.getElementById("ws-status-spinner");
const socket = new WebSocket('ws://127.0.0.1:7654/channel');
socket.binaryType = "arraybuffer";
socket.onmessage = function (message) {
if (typeof message.data === 'string' || message.data instanceof String) {
console.log(`channel MESSAGE: ${message.data}`);
} else {
console.log(typeof (message.data))
// BINARY audio data.
if (player == null) return;
player.feed(message.data);
}
};
socket.onopen = (message) => {
wsStatusSpinner.remove();
wsStatus.textContent = "WebSocket Connected";
}
socket.onclose = (message) => {
wsStatus.textContent = "WebSocket Disconnected";
}
function connectAudio() {
player = new PCMPlayer({
inputCodec: 'Int16',
channels: 2,
sampleRate: 48000,
flushTime: 10,
});
const button = document.getElementById("connect-audio")
button.disabled = true;
button.textContent = "Audio Connected";
}
</script>
</div>
</body>
</html>

BIN
apps/lea_unicast/liblc3.wasm Executable file

Binary file not shown.

511
apps/rfcomm_bridge.py Normal file
View File

@@ -0,0 +1,511 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import time
from typing import Optional
import click
from bumble.colors import color
from bumble.device import Device, DeviceConfiguration, Connection
from bumble import core
from bumble import hci
from bumble import rfcomm
from bumble import transport
from bumble import utils
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
DEFAULT_RFCOMM_UUID = "E6D55659-C8B4-4B85-96BB-B1143AF6D3AE"
DEFAULT_MTU = 4096
DEFAULT_CLIENT_TCP_PORT = 9544
DEFAULT_SERVER_TCP_PORT = 9545
TRACE_MAX_SIZE = 48
# -----------------------------------------------------------------------------
class Tracer:
"""
Trace data buffers transmitted from one endpoint to another, with stats.
"""
def __init__(self, channel_name: str) -> None:
self.channel_name = channel_name
self.last_ts: float = 0.0
def trace_data(self, data: bytes) -> None:
now = time.time()
elapsed_s = now - self.last_ts if self.last_ts else 0
elapsed_ms = int(elapsed_s * 1000)
instant_throughput_kbps = ((len(data) / elapsed_s) / 1000) if elapsed_s else 0.0
hex_str = data[:TRACE_MAX_SIZE].hex() + (
"..." if len(data) > TRACE_MAX_SIZE else ""
)
print(
f"[{self.channel_name}] {len(data):4} bytes "
f"(+{elapsed_ms:4}ms, {instant_throughput_kbps: 7.2f}kB/s) "
f" {hex_str}"
)
self.last_ts = now
# -----------------------------------------------------------------------------
class ServerBridge:
"""
RFCOMM server bridge: waits for a peer to connect an RFCOMM channel.
The RFCOMM channel may be associated with a UUID published in an SDP service
description, or simply be on a system-assigned channel number.
When the connection is made, the bridge connects a TCP socket to a remote host and
bridges the data in both directions, with flow control.
When the RFCOMM channel is closed, the bridge disconnects the TCP socket
and waits for a new channel to be connected.
"""
READ_CHUNK_SIZE = 4096
def __init__(
self, channel: int, uuid: str, trace: bool, tcp_host: str, tcp_port: int
) -> None:
self.device: Optional[Device] = None
self.channel = channel
self.uuid = uuid
self.tcp_host = tcp_host
self.tcp_port = tcp_port
self.rfcomm_channel: Optional[rfcomm.DLC] = None
self.tcp_tracer: Optional[Tracer]
self.rfcomm_tracer: Optional[Tracer]
if trace:
self.tcp_tracer = Tracer(color("RFCOMM->TCP", "cyan"))
self.rfcomm_tracer = Tracer(color("TCP->RFCOMM", "magenta"))
else:
self.rfcomm_tracer = None
self.tcp_tracer = None
async def start(self, device: Device) -> None:
self.device = device
# Create and register a server
rfcomm_server = rfcomm.Server(self.device)
# Listen for incoming DLC connections
self.channel = rfcomm_server.listen(self.on_rfcomm_channel, self.channel)
# Setup the SDP to advertise this channel
service_record_handle = 0x00010001
self.device.sdp_service_records = {
service_record_handle: rfcomm.make_service_sdp_records(
service_record_handle, self.channel, core.UUID(self.uuid)
)
}
# We're ready for a connection
self.device.on("connection", self.on_connection)
await self.set_available(True)
print(
color(
(
f"### Listening for RFCOMM connection on {device.public_address}, "
f"channel {self.channel}"
),
"yellow",
)
)
async def set_available(self, available: bool):
# Become discoverable and connectable
assert self.device
await self.device.set_connectable(available)
await self.device.set_discoverable(available)
def on_connection(self, connection):
print(color(f"@@@ Bluetooth connection: {connection}", "blue"))
connection.on("disconnection", self.on_disconnection)
# Don't accept new connections until we're disconnected
utils.AsyncRunner.spawn(self.set_available(False))
def on_disconnection(self, reason: int):
print(
color("@@@ Bluetooth disconnection:", "red"),
hci.HCI_Constant.error_name(reason),
)
# We're ready for a new connection
utils.AsyncRunner.spawn(self.set_available(True))
# Called when an RFCOMM channel is established
@utils.AsyncRunner.run_in_task()
async def on_rfcomm_channel(self, rfcomm_channel):
print(color("*** RFCOMM channel:", "cyan"), rfcomm_channel)
# Connect to the TCP server
print(
color(
f"### Connecting to TCP {self.tcp_host}:{self.tcp_port}",
"yellow",
)
)
try:
reader, writer = await asyncio.open_connection(self.tcp_host, self.tcp_port)
except OSError:
print(color("!!! Connection failed", "red"))
await rfcomm_channel.disconnect()
return
# Pipe data from RFCOMM to TCP
def on_rfcomm_channel_closed():
print(color("*** RFCOMM channel closed", "cyan"))
writer.close()
def write_rfcomm_data(data):
if self.rfcomm_tracer:
self.rfcomm_tracer.trace_data(data)
writer.write(data)
rfcomm_channel.sink = write_rfcomm_data
rfcomm_channel.on("close", on_rfcomm_channel_closed)
# Pipe data from TCP to RFCOMM
while True:
try:
data = await reader.read(self.READ_CHUNK_SIZE)
if len(data) == 0:
print(color("### TCP end of stream", "yellow"))
if rfcomm_channel.state == rfcomm.DLC.State.CONNECTED:
await rfcomm_channel.disconnect()
return
if self.tcp_tracer:
self.tcp_tracer.trace_data(data)
rfcomm_channel.write(data)
await rfcomm_channel.drain()
except Exception as error:
print(f"!!! Exception: {error}")
break
writer.close()
await writer.wait_closed()
print(color("~~~ Bye bye", "magenta"))
# -----------------------------------------------------------------------------
class ClientBridge:
"""
RFCOMM client bridge: connects to a BR/EDR device, then waits for an inbound
TCP connection on a specified port number. When a TCP client connects, an
RFCOMM connection to the device is established, and the data is bridged in both
directions, with flow control.
When the TCP connection is closed by the client, the RFCOMM channel is
disconnected, but the connection to the device remains, ready for a new TCP client
to connect.
"""
READ_CHUNK_SIZE = 4096
def __init__(
self,
channel: int,
uuid: str,
trace: bool,
address: str,
tcp_host: str,
tcp_port: int,
encrypt: bool,
):
self.channel = channel
self.uuid = uuid
self.trace = trace
self.address = address
self.tcp_host = tcp_host
self.tcp_port = tcp_port
self.encrypt = encrypt
self.device: Optional[Device] = None
self.connection: Optional[Connection] = None
self.rfcomm_client: Optional[rfcomm.Client]
self.rfcomm_mux: Optional[rfcomm.Multiplexer]
self.tcp_connected: bool = False
self.tcp_tracer: Optional[Tracer]
self.rfcomm_tracer: Optional[Tracer]
if trace:
self.tcp_tracer = Tracer(color("RFCOMM->TCP", "cyan"))
self.rfcomm_tracer = Tracer(color("TCP->RFCOMM", "magenta"))
else:
self.rfcomm_tracer = None
self.tcp_tracer = None
async def connect(self) -> None:
if self.connection:
return
print(color(f"@@@ Connecting to Bluetooth {self.address}", "blue"))
assert self.device
self.connection = await self.device.connect(
self.address, transport=core.BT_BR_EDR_TRANSPORT
)
print(color(f"@@@ Bluetooth connection: {self.connection}", "blue"))
self.connection.on("disconnection", self.on_disconnection)
if self.encrypt:
print(color("@@@ Encrypting Bluetooth connection", "blue"))
await self.connection.encrypt()
print(color("@@@ Bluetooth connection encrypted", "blue"))
self.rfcomm_client = rfcomm.Client(self.connection)
try:
self.rfcomm_mux = await self.rfcomm_client.start()
except BaseException as e:
print(color("!!! Failed to setup RFCOMM connection", "red"), e)
raise
async def start(self, device: Device) -> None:
self.device = device
await device.set_connectable(False)
await device.set_discoverable(False)
# Called when a TCP connection is established
async def on_tcp_connection(reader, writer):
print(color("<<< TCP connection", "magenta"))
if self.tcp_connected:
print(
color("!!! TCP connection already active, rejecting new one", "red")
)
writer.close()
return
self.tcp_connected = True
try:
await self.pipe(reader, writer)
except BaseException as error:
print(color("!!! Exception while piping data:", "red"), error)
return
finally:
writer.close()
await writer.wait_closed()
self.tcp_connected = False
await asyncio.start_server(
on_tcp_connection,
host=self.tcp_host if self.tcp_host != "_" else None,
port=self.tcp_port,
)
print(
color(
f"### Listening for TCP connections on port {self.tcp_port}", "magenta"
)
)
async def pipe(
self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
# Resolve the channel number from the UUID if needed
if self.channel == 0:
await self.connect()
assert self.connection
channel = await rfcomm.find_rfcomm_channel_with_uuid(
self.connection, self.uuid
)
if channel:
print(color(f"### Found RFCOMM channel {channel}", "yellow"))
else:
print(color(f"!!! RFCOMM channel with UUID {self.uuid} not found"))
return
else:
channel = self.channel
# Connect a new RFCOMM channel
await self.connect()
assert self.rfcomm_mux
print(color(f"*** Opening RFCOMM channel {channel}", "green"))
try:
rfcomm_channel = await self.rfcomm_mux.open_dlc(channel)
print(color(f"*** RFCOMM channel open: {rfcomm_channel}", "green"))
except Exception as error:
print(color(f"!!! RFCOMM open failed: {error}", "red"))
return
# Pipe data from RFCOMM to TCP
def on_rfcomm_channel_closed():
print(color("*** RFCOMM channel closed", "green"))
def write_rfcomm_data(data):
if self.trace:
self.rfcomm_tracer.trace_data(data)
writer.write(data)
rfcomm_channel.on("close", on_rfcomm_channel_closed)
rfcomm_channel.sink = write_rfcomm_data
# Pipe data from TCP to RFCOMM
while True:
try:
data = await reader.read(self.READ_CHUNK_SIZE)
if len(data) == 0:
print(color("### TCP end of stream", "yellow"))
if rfcomm_channel.state == rfcomm.DLC.State.CONNECTED:
await rfcomm_channel.disconnect()
self.tcp_connected = False
return
if self.tcp_tracer:
self.tcp_tracer.trace_data(data)
rfcomm_channel.write(data)
await rfcomm_channel.drain()
except Exception as error:
print(f"!!! Exception: {error}")
break
print(color("~~~ Bye bye", "magenta"))
def on_disconnection(self, reason: int) -> None:
print(
color("@@@ Bluetooth disconnection:", "red"),
hci.HCI_Constant.error_name(reason),
)
self.connection = None
# -----------------------------------------------------------------------------
async def run(device_config, hci_transport, bridge):
print("<<< connecting to HCI...")
async with await transport.open_transport_or_link(hci_transport) as (
hci_source,
hci_sink,
):
print("<<< connected")
if device_config:
device = Device.from_config_file_with_hci(
device_config, hci_source, hci_sink
)
else:
device = Device.from_config_with_hci(
DeviceConfiguration(), hci_source, hci_sink
)
device.classic_enabled = True
# Let's go
await device.power_on()
try:
await bridge.start(device)
# Wait until the transport terminates
await hci_source.wait_for_termination()
except core.ConnectionError as error:
print(color(f"!!! Bluetooth connection failed: {error}", "red"))
except Exception as error:
print(f"Exception while running bridge: {error}")
# -----------------------------------------------------------------------------
@click.group()
@click.pass_context
@click.option(
"--device-config",
metavar="CONFIG_FILE",
help="Device configuration file",
)
@click.option(
"--hci-transport", metavar="TRANSPORT_NAME", help="HCI transport", required=True
)
@click.option("--trace", is_flag=True, help="Trace bridged data to stdout")
@click.option(
"--channel",
metavar="CHANNEL_NUMER",
help="RFCOMM channel number",
type=int,
default=0,
)
@click.option(
"--uuid",
metavar="UUID",
help="UUID for the RFCOMM channel",
default=DEFAULT_RFCOMM_UUID,
)
def cli(
context,
device_config,
hci_transport,
trace,
channel,
uuid,
):
context.ensure_object(dict)
context.obj["device_config"] = device_config
context.obj["hci_transport"] = hci_transport
context.obj["trace"] = trace
context.obj["channel"] = channel
context.obj["uuid"] = uuid
# -----------------------------------------------------------------------------
@cli.command()
@click.pass_context
@click.option("--tcp-host", help="TCP host", default="localhost")
@click.option("--tcp-port", help="TCP port", default=DEFAULT_SERVER_TCP_PORT)
def server(context, tcp_host, tcp_port):
bridge = ServerBridge(
context.obj["channel"],
context.obj["uuid"],
context.obj["trace"],
tcp_host,
tcp_port,
)
asyncio.run(run(context.obj["device_config"], context.obj["hci_transport"], bridge))
# -----------------------------------------------------------------------------
@cli.command()
@click.pass_context
@click.argument("bluetooth-address")
@click.option("--tcp-host", help="TCP host", default="_")
@click.option("--tcp-port", help="TCP port", default=DEFAULT_CLIENT_TCP_PORT)
@click.option("--encrypt", is_flag=True, help="Encrypt the connection")
def client(context, bluetooth_address, tcp_host, tcp_port, encrypt):
bridge = ClientBridge(
context.obj["channel"],
context.obj["uuid"],
context.obj["trace"],
bluetooth_address,
tcp_host,
tcp_port,
encrypt,
)
asyncio.run(run(context.obj["device_config"], context.obj["hci_transport"], bridge))
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get("BUMBLE_LOGLEVEL", "WARNING").upper())
if __name__ == "__main__":
cli(obj={}) # pylint: disable=no-value-for-parameter

View File

@@ -17,12 +17,19 @@
# -----------------------------------------------------------------------------
from __future__ import annotations
from enum import IntEnum
import copy
import functools
import json
import asyncio
import logging
import secrets
from contextlib import asynccontextmanager, AsyncExitStack, closing
import sys
from contextlib import (
asynccontextmanager,
AsyncExitStack,
closing,
AbstractAsyncContextManager,
)
from dataclasses import dataclass, field
from collections.abc import Iterable
from typing import (
@@ -40,6 +47,7 @@ from typing import (
overload,
TYPE_CHECKING,
)
from typing_extensions import Self
from pyee import EventEmitter
@@ -959,8 +967,9 @@ class ScoLink(CompositeEventEmitter):
acl_connection: Connection
handle: int
link_type: int
sink: Optional[Callable[[HCI_SynchronousDataPacket], Any]] = None
def __post_init__(self):
def __post_init__(self) -> None:
super().__init__()
async def disconnect(
@@ -982,8 +991,9 @@ class CisLink(CompositeEventEmitter):
cis_id: int # CIS ID assigned by Central device
cig_id: int # CIG ID assigned by Central device
state: State = State.PENDING
sink: Optional[Callable[[HCI_IsoDataPacket], Any]] = None
def __post_init__(self):
def __post_init__(self) -> None:
super().__init__()
async def disconnect(
@@ -1252,75 +1262,47 @@ class Connection(CompositeEventEmitter):
# -----------------------------------------------------------------------------
@dataclass
class DeviceConfiguration:
def __init__(self) -> None:
# Setup defaults
self.name = DEVICE_DEFAULT_NAME
self.address = Address(DEVICE_DEFAULT_ADDRESS)
self.class_of_device = DEVICE_DEFAULT_CLASS_OF_DEVICE
self.scan_response_data = DEVICE_DEFAULT_SCAN_RESPONSE_DATA
self.advertising_interval_min = DEVICE_DEFAULT_ADVERTISING_INTERVAL
self.advertising_interval_max = DEVICE_DEFAULT_ADVERTISING_INTERVAL
self.le_enabled = True
# LE host enable 2nd parameter
self.le_simultaneous_enabled = False
self.classic_enabled = False
self.classic_sc_enabled = True
self.classic_ssp_enabled = True
self.classic_smp_enabled = True
self.classic_accept_any = True
self.connectable = True
self.discoverable = True
self.advertising_data = bytes(
AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))]
)
# Setup defaults
name: str = DEVICE_DEFAULT_NAME
address: Address = Address(DEVICE_DEFAULT_ADDRESS)
class_of_device: int = DEVICE_DEFAULT_CLASS_OF_DEVICE
scan_response_data: bytes = DEVICE_DEFAULT_SCAN_RESPONSE_DATA
advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL
le_enabled: bool = True
# LE host enable 2nd parameter
le_simultaneous_enabled: bool = False
classic_enabled: bool = False
classic_sc_enabled: bool = True
classic_ssp_enabled: bool = True
classic_smp_enabled: bool = True
classic_accept_any: bool = True
connectable: bool = True
discoverable: bool = True
advertising_data: bytes = bytes(
AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(DEVICE_DEFAULT_NAME, 'utf-8'))]
)
self.irk = bytes(16) # This really must be changed for any level of security
self.keystore = None
)
irk: bytes = bytes(16) # This really must be changed for any level of security
keystore: Optional[str] = None
address_resolution_offload: bool = False
cis_enabled: bool = False
def __post_init__(self) -> None:
self.gatt_services: List[Dict[str, Any]] = []
self.address_resolution_offload = False
self.cis_enabled = False
def load_from_dict(self, config: Dict[str, Any]) -> None:
config = copy.deepcopy(config)
# Load simple properties
self.name = config.get('name', self.name)
if address := config.get('address', None):
if address := config.pop('address', None):
self.address = Address(address)
self.class_of_device = config.get('class_of_device', self.class_of_device)
self.advertising_interval_min = config.get(
'advertising_interval', self.advertising_interval_min
)
self.advertising_interval_max = self.advertising_interval_min
self.keystore = config.get('keystore')
self.le_enabled = config.get('le_enabled', self.le_enabled)
self.le_simultaneous_enabled = config.get(
'le_simultaneous_enabled', self.le_simultaneous_enabled
)
self.classic_enabled = config.get('classic_enabled', self.classic_enabled)
self.classic_sc_enabled = config.get(
'classic_sc_enabled', self.classic_sc_enabled
)
self.classic_ssp_enabled = config.get(
'classic_ssp_enabled', self.classic_ssp_enabled
)
self.classic_smp_enabled = config.get(
'classic_smp_enabled', self.classic_smp_enabled
)
self.classic_accept_any = config.get(
'classic_accept_any', self.classic_accept_any
)
self.connectable = config.get('connectable', self.connectable)
self.discoverable = config.get('discoverable', self.discoverable)
self.gatt_services = config.get('gatt_services', self.gatt_services)
self.address_resolution_offload = config.get(
'address_resolution_offload', self.address_resolution_offload
)
self.cis_enabled = config.get('cis_enabled', self.cis_enabled)
# Load or synthesize an IRK
irk = config.get('irk')
if irk:
if irk := config.pop('irk', None):
self.irk = bytes.fromhex(irk)
elif self.address != Address(DEVICE_DEFAULT_ADDRESS):
# Construct an IRK from the address bytes
@@ -1332,21 +1314,53 @@ class DeviceConfiguration:
# Fallback - when both IRK and address are not set, randomly generate an IRK.
self.irk = secrets.token_bytes(16)
if (name := config.pop('name', None)) is not None:
self.name = name
# Load advertising data
advertising_data = config.get('advertising_data')
if advertising_data:
if advertising_data := config.pop('advertising_data', None):
self.advertising_data = bytes.fromhex(advertising_data)
elif config.get('name') is not None:
elif name is not None:
self.advertising_data = bytes(
AdvertisingData(
[(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))]
)
)
def load_from_file(self, filename):
# Load advertising interval (for backward compatibility)
if advertising_interval := config.pop('advertising_interval', None):
self.advertising_interval_min = advertising_interval
self.advertising_interval_max = advertising_interval
if (
'advertising_interval_max' in config
or 'advertising_interval_min' in config
):
logger.warning(
'Trying to set both advertising_interval and '
'advertising_interval_min/max, advertising_interval will be'
'ignored.'
)
# Load data in primitive types.
for key, value in config.items():
setattr(self, key, value)
def load_from_file(self, filename: str) -> None:
with open(filename, 'r', encoding='utf-8') as file:
self.load_from_dict(json.load(file))
@classmethod
def from_file(cls: Type[Self], filename: str) -> Self:
config = cls()
config.load_from_file(filename)
return config
@classmethod
def from_dict(cls: Type[Self], config: Dict[str, Any]) -> Self:
device_config = cls()
device_config.load_from_dict(config)
return device_config
# -----------------------------------------------------------------------------
# Decorators used with the following Device class
@@ -1470,8 +1484,7 @@ class Device(CompositeEventEmitter):
@classmethod
def from_config_file(cls, filename: str) -> Device:
config = DeviceConfiguration()
config.load_from_file(filename)
config = DeviceConfiguration.from_file(filename)
return cls(config=config)
@classmethod
@@ -1488,8 +1501,7 @@ class Device(CompositeEventEmitter):
def from_config_file_with_hci(
cls, filename: str, hci_source: TransportSource, hci_sink: TransportSink
) -> Device:
config = DeviceConfiguration()
config.load_from_file(filename)
config = DeviceConfiguration.from_file(filename)
return cls.from_config_with_hci(config, hci_source, hci_sink)
def __init__(
@@ -1529,6 +1541,12 @@ class Device(CompositeEventEmitter):
Address.ANY: []
} # Futures, by BD address OR [Futures] for Address.ANY
# In Python <= 3.9 + Rust Runtime, asyncio.Lock cannot be properly initiated.
if sys.version_info >= (3, 10):
self._cis_lock = asyncio.Lock()
else:
self._cis_lock = AsyncExitStack()
# Own address type cache
self.connect_own_address_type = None
@@ -3402,49 +3420,71 @@ class Device(CompositeEventEmitter):
for cis_handle, _ in cis_acl_pairs
}
@watcher.on(self, 'cis_establishment')
def on_cis_establishment(cis_link: CisLink) -> None:
if pending_future := pending_cis_establishments.get(cis_link.handle):
pending_future.set_result(cis_link)
result = await self.send_command(
def on_cis_establishment_failure(cis_handle: int, status: int) -> None:
if pending_future := pending_cis_establishments.get(cis_handle):
pending_future.set_exception(HCI_Error(status))
watcher.on(self, 'cis_establishment', on_cis_establishment)
watcher.on(self, 'cis_establishment_failure', on_cis_establishment_failure)
await self.send_command(
HCI_LE_Create_CIS_Command(
cis_connection_handle=[p[0] for p in cis_acl_pairs],
acl_connection_handle=[p[1] for p in cis_acl_pairs],
),
check_result=True,
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Create_CIS_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
return await asyncio.gather(*pending_cis_establishments.values())
# [LE only]
@experimental('Only for testing.')
async def accept_cis_request(self, handle: int) -> CisLink:
result = await self.send_command(
HCI_LE_Accept_CIS_Request_Command(connection_handle=handle),
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Accept_CIS_Request_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
"""[LE Only] Accepts an incoming CIS request.
pending_cis_establishment = asyncio.get_running_loop().create_future()
When the specified CIS handle is already created, this method returns the
existed CIS link object immediately.
with closing(EventWatcher()) as watcher:
Args:
handle: CIS handle to accept.
@watcher.on(self, 'cis_establishment')
def on_cis_establishment(cis_link: CisLink) -> None:
if cis_link.handle == handle:
pending_cis_establishment.set_result(cis_link)
Returns:
CIS link object on the given handle.
"""
if not (cis_link := self.cis_links.get(handle)):
raise InvalidStateError(f'No pending CIS request of handle {handle}')
return await pending_cis_establishment
# There might be multiple ASE sharing a CIS channel.
# If one of them has accepted the request, the others should just leverage it.
async with self._cis_lock:
if cis_link.state == CisLink.State.ESTABLISHED:
return cis_link
with closing(EventWatcher()) as watcher:
pending_establishment = asyncio.get_running_loop().create_future()
def on_establishment() -> None:
pending_establishment.set_result(None)
def on_establishment_failure(status: int) -> None:
pending_establishment.set_exception(HCI_Error(status))
watcher.on(cis_link, 'establishment', on_establishment)
watcher.on(cis_link, 'establishment_failure', on_establishment_failure)
await self.send_command(
HCI_LE_Accept_CIS_Request_Command(connection_handle=handle),
check_result=True,
)
await pending_establishment
return cis_link
# Mypy believes this is reachable when context is an ExitStack.
raise InvalidStateError('Unreachable')
# [LE only]
@experimental('Only for testing.')
@@ -3453,15 +3493,10 @@ class Device(CompositeEventEmitter):
handle: int,
reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
) -> None:
result = await self.send_command(
await self.send_command(
HCI_LE_Reject_CIS_Request_Command(connection_handle=handle, reason=reason),
check_result=True,
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Reject_CIS_Request_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
async def get_remote_le_features(self, connection: Connection) -> LeFeatureMask:
"""[LE Only] Reads remote LE supported features.
@@ -3481,11 +3516,17 @@ class Device(CompositeEventEmitter):
if handle == connection.handle:
read_feature_future.set_result(LeFeatureMask(features))
def on_failure(handle: int, status: int):
if handle == connection.handle:
read_feature_future.set_exception(HCI_Error(status))
watcher.on(self.host, 'le_remote_features', on_le_remote_features)
watcher.on(self.host, 'le_remote_features_failure', on_failure)
await self.send_command(
HCI_LE_Read_Remote_Features_Command(
connection_handle=connection.handle
),
check_result=True,
)
return await read_feature_future
@@ -3660,7 +3701,6 @@ class Device(CompositeEventEmitter):
# We were connected via a legacy advertisement.
if self.legacy_advertiser:
own_address_type = self.legacy_advertiser.own_address_type
self.legacy_advertiser = None
else:
# This should not happen, but just in case, pick a default.
logger.warning("connection without an advertiser")
@@ -3691,15 +3731,14 @@ class Device(CompositeEventEmitter):
)
self.connections[connection_handle] = connection
if (
role == HCI_PERIPHERAL_ROLE
and self.legacy_advertiser
and self.legacy_advertiser.auto_restart
):
connection.once(
'disconnection',
lambda _: self.abort_on('flush', self.legacy_advertiser.start()),
)
if role == HCI_PERIPHERAL_ROLE and self.legacy_advertiser:
if self.legacy_advertiser.auto_restart:
connection.once(
'disconnection',
lambda _: self.abort_on('flush', self.legacy_advertiser.start()),
)
else:
self.legacy_advertiser = None
if role == HCI_CENTRAL_ROLE or not self.supports_le_extended_advertising:
# We can emit now, we have all the info we need
@@ -4107,8 +4146,8 @@ class Device(CompositeEventEmitter):
@host_event_handler
@experimental('Only for testing')
def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> None:
if sco_link := self.sco_links.get(sco_handle):
sco_link.emit('pdu', packet)
if (sco_link := self.sco_links.get(sco_handle)) and sco_link.sink:
sco_link.sink(packet)
# [LE only]
@host_event_handler
@@ -4164,15 +4203,15 @@ class Device(CompositeEventEmitter):
def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None:
logger.debug(f'*** CIS Establishment Failure: cis=[0x{cis_handle:04X}] ***')
if cis_link := self.cis_links.pop(cis_handle):
cis_link.emit('establishment_failure')
cis_link.emit('establishment_failure', status)
self.emit('cis_establishment_failure', cis_handle, status)
# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_iso_packet(self, handle: int, packet: HCI_IsoDataPacket) -> None:
if cis_link := self.cis_links.get(handle):
cis_link.emit('pdu', packet)
if (cis_link := self.cis_links.get(handle)) and cis_link.sink:
cis_link.sink(packet)
@host_event_handler
@with_connection_from_handle

View File

@@ -23,11 +23,11 @@ import functools
import logging
import secrets
import struct
from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union
from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union, ClassVar
from bumble import crypto
from .colors import color
from .core import (
from bumble.colors import color
from bumble.core import (
BT_BR_EDR_TRANSPORT,
AdvertisingData,
DeviceClass,
@@ -36,6 +36,7 @@ from .core import (
name_or_number,
padded_bytes,
)
from bumble.utils import OpenIntEnum
# -----------------------------------------------------------------------------
@@ -1104,7 +1105,7 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
# LE Supported Features
# See Bluetooth spec @ Vol 6, Part B, 4.6 FEATURE SUPPORT
class LeFeature(enum.IntEnum):
class LeFeature(OpenIntEnum):
LE_ENCRYPTION = 0
CONNECTION_PARAMETERS_REQUEST_PROCEDURE = 1
EXTENDED_REJECT_INDICATION = 2
@@ -2003,7 +2004,7 @@ class HCI_Packet:
Abstract Base class for HCI packets
'''
hci_packet_type: int
hci_packet_type: ClassVar[int]
@staticmethod
def from_bytes(packet: bytes) -> HCI_Packet:
@@ -6192,12 +6193,23 @@ class HCI_SynchronousDataPacket(HCI_Packet):
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class HCI_IsoDataPacket(HCI_Packet):
'''
See Bluetooth spec @ 5.4.5 HCI ISO Data Packets
'''
hci_packet_type = HCI_ISO_DATA_PACKET
hci_packet_type: ClassVar[int] = HCI_ISO_DATA_PACKET
connection_handle: int
data_total_length: int
iso_sdu_fragment: bytes
pb_flag: int
ts_flag: int = 0
time_stamp: Optional[int] = None
packet_sequence_number: Optional[int] = None
iso_sdu_length: Optional[int] = None
packet_status_flag: Optional[int] = None
@staticmethod
def from_bytes(packet: bytes) -> HCI_IsoDataPacket:
@@ -6241,28 +6253,6 @@ class HCI_IsoDataPacket(HCI_Packet):
iso_sdu_fragment=iso_sdu_fragment,
)
def __init__(
self,
connection_handle: int,
pb_flag: int,
ts_flag: int,
data_total_length: int,
time_stamp: Optional[int],
packet_sequence_number: Optional[int],
iso_sdu_length: Optional[int],
packet_status_flag: Optional[int],
iso_sdu_fragment: bytes,
) -> None:
self.connection_handle = connection_handle
self.pb_flag = pb_flag
self.ts_flag = ts_flag
self.data_total_length = data_total_length
self.time_stamp = time_stamp
self.packet_sequence_number = packet_sequence_number
self.iso_sdu_length = iso_sdu_length
self.packet_status_flag = packet_status_flag
self.iso_sdu_fragment = iso_sdu_fragment
def __bytes__(self) -> bytes:
return self.to_bytes()

View File

@@ -721,14 +721,16 @@ class Host(AbortableEventEmitter):
for connection_handle, num_completed_packets in zip(
event.connection_handles, event.num_completed_packets
):
if not (connection := self.connections.get(connection_handle)):
if connection := self.connections.get(connection_handle):
connection.acl_packet_queue.on_packets_completed(num_completed_packets)
elif not (
self.cis_links.get(connection_handle)
or self.sco_links.get(connection_handle)
):
logger.warning(
'received packet completion event for unknown handle '
f'0x{connection_handle:04X}'
)
continue
connection.acl_packet_queue.on_packets_completed(num_completed_packets)
# Classic only
def on_hci_connection_request_event(self, event):

View File

@@ -25,7 +25,8 @@ import asyncio
import logging
import os
import json
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Type
from typing_extensions import Self
from .colors import color
from .hci import Address
@@ -253,8 +254,10 @@ class JsonKeyStore(KeyStore):
logger.debug(f'JSON keystore: {self.filename}')
@staticmethod
def from_device(device: Device, filename=None) -> Optional[JsonKeyStore]:
@classmethod
def from_device(
cls: Type[Self], device: Device, filename: Optional[str] = None
) -> Self:
if not filename:
# Extract the filename from the config if there is one
if device.config.keystore is not None:
@@ -270,7 +273,7 @@ class JsonKeyStore(KeyStore):
else:
namespace = JsonKeyStore.DEFAULT_NAMESPACE
return JsonKeyStore(namespace, filename)
return cls(namespace, filename)
async def load(self):
# Try to open the file, without failing. If the file does not exist, it

View File

@@ -70,6 +70,7 @@ L2CAP_LE_SIGNALING_CID = 0x05
L2CAP_MIN_LE_MTU = 23
L2CAP_MIN_BR_EDR_MTU = 48
L2CAP_MAX_BR_EDR_MTU = 65535
L2CAP_DEFAULT_MTU = 2048 # Default value for the MTU we are willing to accept
@@ -832,7 +833,9 @@ class ClassicChannel(EventEmitter):
# Wait for the connection to succeed or fail
try:
return await self.connection_result
return await self.connection.abort_on(
'disconnection', self.connection_result
)
finally:
self.connection_result = None
@@ -2225,7 +2228,7 @@ class ChannelManager:
# Connect
try:
await channel.connect()
except Exception as e:
except BaseException as e:
del connection_channels[source_cid]
raise e

View File

@@ -78,6 +78,10 @@ class AudioLocation(enum.IntFlag):
LEFT_SURROUND = 0x04000000
RIGHT_SURROUND = 0x08000000
@property
def channel_count(self) -> int:
return bin(self.value).count('1')
class AudioInputType(enum.IntEnum):
'''Bluetooth Assigned Numbers, Section 6.12.2 - Audio Input Type'''
@@ -218,6 +222,13 @@ class FrameDuration(enum.IntEnum):
DURATION_7500_US = 0x00
DURATION_10000_US = 0x01
@property
def us(self) -> int:
return {
FrameDuration.DURATION_7500_US: 7500,
FrameDuration.DURATION_10000_US: 10000,
}[self]
class SupportedFrameDuration(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.4.2 - Frame Duration'''
@@ -534,7 +545,7 @@ class CodecSpecificCapabilities:
supported_sampling_frequencies: SupportedSamplingFrequency
supported_frame_durations: SupportedFrameDuration
supported_audio_channel_counts: Sequence[int]
supported_audio_channel_count: Sequence[int]
min_octets_per_codec_frame: int
max_octets_per_codec_frame: int
supported_max_codec_frames_per_sdu: int
@@ -543,7 +554,7 @@ class CodecSpecificCapabilities:
def from_bytes(cls, data: bytes) -> CodecSpecificCapabilities:
offset = 0
# Allowed default values.
supported_audio_channel_counts = [1]
supported_audio_channel_count = [1]
supported_max_codec_frames_per_sdu = 1
while offset < len(data):
length, type = struct.unpack_from('BB', data, offset)
@@ -556,7 +567,7 @@ class CodecSpecificCapabilities:
elif type == CodecSpecificCapabilities.Type.FRAME_DURATION:
supported_frame_durations = SupportedFrameDuration(value)
elif type == CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT:
supported_audio_channel_counts = bits_to_channel_counts(value)
supported_audio_channel_count = bits_to_channel_counts(value)
elif type == CodecSpecificCapabilities.Type.OCTETS_PER_FRAME:
min_octets_per_sample = value & 0xFFFF
max_octets_per_sample = value >> 16
@@ -567,7 +578,7 @@ class CodecSpecificCapabilities:
return CodecSpecificCapabilities(
supported_sampling_frequencies=supported_sampling_frequencies,
supported_frame_durations=supported_frame_durations,
supported_audio_channel_counts=supported_audio_channel_counts,
supported_audio_channel_count=supported_audio_channel_count,
min_octets_per_codec_frame=min_octets_per_sample,
max_octets_per_codec_frame=max_octets_per_sample,
supported_max_codec_frames_per_sdu=supported_max_codec_frames_per_sdu,
@@ -584,7 +595,7 @@ class CodecSpecificCapabilities:
self.supported_frame_durations,
2,
CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT,
channel_counts_to_bits(self.supported_audio_channel_counts),
channel_counts_to_bits(self.supported_audio_channel_count),
5,
CodecSpecificCapabilities.Type.OCTETS_PER_FRAME,
self.min_octets_per_codec_frame,
@@ -870,15 +881,22 @@ class AseStateMachine(gatt.Characteristic):
cig_id: int,
cis_id: int,
) -> None:
if cis_id == self.cis_id and self.state == self.State.ENABLING:
if (
cig_id == self.cig_id
and cis_id == self.cis_id
and self.state == self.State.ENABLING
):
acl_connection.abort_on(
'flush', self.service.device.accept_cis_request(cis_handle)
)
def on_cis_establishment(self, cis_link: device.CisLink) -> None:
if cis_link.cis_id == self.cis_id and self.state == self.State.ENABLING:
self.state = self.State.STREAMING
self.cis_link = cis_link
if (
cis_link.cig_id == self.cig_id
and cis_link.cis_id == self.cis_id
and self.state == self.State.ENABLING
):
cis_link.on('disconnection', self.on_cis_disconnection)
async def post_cis_established():
await self.service.device.send_command(
@@ -891,9 +909,15 @@ class AseStateMachine(gatt.Characteristic):
codec_configuration=b'',
)
)
if self.role == AudioRole.SINK:
self.state = self.State.STREAMING
await self.service.device.notify_subscribers(self, self.value)
cis_link.acl_connection.abort_on('flush', post_cis_established())
self.cis_link = cis_link
def on_cis_disconnection(self, _reason) -> None:
self.cis_link = None
def on_config_codec(
self,
@@ -991,11 +1015,17 @@ class AseStateMachine(gatt.Characteristic):
AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
AseReasonCode.NONE,
)
self.state = self.State.DISABLING
if self.role == AudioRole.SINK:
self.state = self.State.QOS_CONFIGURED
else:
self.state = self.State.DISABLING
return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
def on_receiver_stop_ready(self) -> Tuple[AseResponseCode, AseReasonCode]:
if self.state != AseStateMachine.State.DISABLING:
if (
self.role != AudioRole.SOURCE
or self.state != AseStateMachine.State.DISABLING
):
return (
AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
AseReasonCode.NONE,
@@ -1046,6 +1076,7 @@ class AseStateMachine(gatt.Characteristic):
def state(self, new_state: State) -> None:
logger.debug(f'{self} state change -> {colors.color(new_state.name, "cyan")}')
self._state = new_state
self.emit('state_change')
@property
def value(self):
@@ -1118,6 +1149,7 @@ class AudioStreamControlService(gatt.TemplateService):
ase_state_machines: Dict[int, AseStateMachine]
ase_control_point: gatt.Characteristic
_active_client: Optional[device.Connection] = None
def __init__(
self,
@@ -1155,7 +1187,16 @@ class AudioStreamControlService(gatt.TemplateService):
else:
return (ase_id, AseResponseCode.INVALID_ASE_ID, AseReasonCode.NONE)
def _on_client_disconnected(self, _reason: int) -> None:
for ase in self.ase_state_machines.values():
ase.state = AseStateMachine.State.IDLE
self._active_client = None
def on_write_ase_control_point(self, connection, data):
if not self._active_client and connection:
self._active_client = connection
connection.once('disconnection', self._on_client_disconnected)
operation = ASE_Operation.from_bytes(data)
responses = []
logger.debug(f'*** ASCS Write {operation} ***')

View File

@@ -106,9 +106,11 @@ CRC_TABLE = bytes([
0XBA, 0X2B, 0X59, 0XC8, 0XBD, 0X2C, 0X5E, 0XCF
])
RFCOMM_DEFAULT_L2CAP_MTU = 2048
RFCOMM_DEFAULT_WINDOW_SIZE = 7
RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000
RFCOMM_DEFAULT_L2CAP_MTU = 2048
RFCOMM_DEFAULT_INITIAL_CREDITS = 7
RFCOMM_DEFAULT_MAX_CREDITS = 32
RFCOMM_DEFAULT_CREDIT_THRESHOLD = RFCOMM_DEFAULT_MAX_CREDITS // 2
RFCOMM_DEFAULT_MAX_FRAME_SIZE = 2000
RFCOMM_DYNAMIC_CHANNEL_NUMBER_START = 1
RFCOMM_DYNAMIC_CHANNEL_NUMBER_END = 30
@@ -365,12 +367,12 @@ class RFCOMM_MCC_PN:
ack_timer: int
max_frame_size: int
max_retransmissions: int
window_size: int
initial_credits: int
def __post_init__(self) -> None:
if self.window_size < 1 or self.window_size > 7:
if self.initial_credits < 1 or self.initial_credits > 7:
logger.warning(
f'Error Recovery Window size {self.window_size} is out of range [1, 7].'
f'Initial credits {self.initial_credits} is out of range [1, 7].'
)
@staticmethod
@@ -382,7 +384,7 @@ class RFCOMM_MCC_PN:
ack_timer=data[3],
max_frame_size=data[4] | data[5] << 8,
max_retransmissions=data[6],
window_size=data[7] & 0x07,
initial_credits=data[7] & 0x07,
)
def __bytes__(self) -> bytes:
@@ -396,7 +398,7 @@ class RFCOMM_MCC_PN:
(self.max_frame_size >> 8) & 0xFF,
self.max_retransmissions & 0xFF,
# Only 3 bits are meaningful.
self.window_size & 0x07,
self.initial_credits & 0x07,
]
)
@@ -446,40 +448,43 @@ class DLC(EventEmitter):
DISCONNECTED = 0x04
RESET = 0x05
connection_result: Optional[asyncio.Future]
_sink: Optional[Callable[[bytes], None]]
_enqueued_rx_packets: collections.deque[bytes]
def __init__(
self,
multiplexer: Multiplexer,
dlci: int,
max_frame_size: int,
window_size: int,
tx_max_frame_size: int,
tx_initial_credits: int,
rx_max_frame_size: int,
rx_initial_credits: int,
) -> None:
super().__init__()
self.multiplexer = multiplexer
self.dlci = dlci
self.max_frame_size = max_frame_size
self.window_size = window_size
self.rx_credits = window_size
self.rx_threshold = window_size // 2
self.tx_credits = window_size
self.rx_max_frame_size = rx_max_frame_size
self.rx_initial_credits = rx_initial_credits
self.rx_max_credits = RFCOMM_DEFAULT_MAX_CREDITS
self.rx_credits = rx_initial_credits
self.rx_credits_threshold = RFCOMM_DEFAULT_CREDIT_THRESHOLD
self.tx_max_frame_size = tx_max_frame_size
self.tx_credits = tx_initial_credits
self.tx_buffer = b''
self.state = DLC.State.INIT
self.role = multiplexer.role
self.c_r = 1 if self.role == Multiplexer.Role.INITIATOR else 0
self.connection_result = None
self.connection_result: Optional[asyncio.Future] = None
self.disconnection_result: Optional[asyncio.Future] = None
self.drained = asyncio.Event()
self.drained.set()
# Queued packets when sink is not set.
self._enqueued_rx_packets = collections.deque(maxlen=DEFAULT_RX_QUEUE_SIZE)
self._sink = None
self._enqueued_rx_packets: collections.deque[bytes] = collections.deque(
maxlen=DEFAULT_RX_QUEUE_SIZE
)
self._sink: Optional[Callable[[bytes], None]] = None
# Compute the MTU
max_overhead = 4 + 1 # header with 2-byte length + fcs
self.mtu = min(
max_frame_size, self.multiplexer.l2cap_channel.peer_mtu - max_overhead
tx_max_frame_size, self.multiplexer.l2cap_channel.peer_mtu - max_overhead
)
@property
@@ -525,20 +530,35 @@ class DLC(EventEmitter):
self.emit('open')
def on_ua_frame(self, _frame: RFCOMM_Frame) -> None:
if self.state != DLC.State.CONNECTING:
if self.state == DLC.State.CONNECTING:
# Exchange the modem status with the peer
msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1)
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=1, data=bytes(msc))
logger.debug(f'>>> MCC MSC Command: {msc}')
self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc))
self.change_state(DLC.State.CONNECTED)
if self.connection_result:
self.connection_result.set_result(None)
self.connection_result = None
self.multiplexer.on_dlc_open_complete(self)
elif self.state == DLC.State.DISCONNECTING:
self.change_state(DLC.State.DISCONNECTED)
if self.disconnection_result:
self.disconnection_result.set_result(None)
self.disconnection_result = None
self.multiplexer.on_dlc_disconnection(self)
self.emit('close')
else:
logger.warning(
color('!!! received SABM when not in CONNECTING state', 'red')
color(
(
'!!! received UA frame when not in '
'CONNECTING or DISCONNECTING state'
),
'red',
)
)
return
# Exchange the modem status with the peer
msc = RFCOMM_MCC_MSC(dlci=self.dlci, fc=0, rtc=1, rtr=1, ic=0, dv=1)
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.MSC, c_r=1, data=bytes(msc))
logger.debug(f'>>> MCC MSC Command: {msc}')
self.send_frame(RFCOMM_Frame.uih(c_r=self.c_r, dlci=0, information=mcc))
self.change_state(DLC.State.CONNECTED)
self.multiplexer.on_dlc_open_complete(self)
def on_dm_frame(self, frame: RFCOMM_Frame) -> None:
# TODO: handle all states
@@ -609,6 +629,19 @@ class DLC(EventEmitter):
self.connection_result = asyncio.get_running_loop().create_future()
self.send_frame(RFCOMM_Frame.sabm(c_r=self.c_r, dlci=self.dlci))
async def disconnect(self) -> None:
if self.state != DLC.State.CONNECTED:
raise InvalidStateError('invalid state')
self.disconnection_result = asyncio.get_running_loop().create_future()
self.change_state(DLC.State.DISCONNECTING)
self.send_frame(
RFCOMM_Frame.disc(
c_r=1 if self.role == Multiplexer.Role.INITIATOR else 0, dlci=self.dlci
)
)
await self.disconnection_result
def accept(self) -> None:
if self.state != DLC.State.INIT:
raise InvalidStateError('invalid state')
@@ -618,9 +651,9 @@ class DLC(EventEmitter):
cl=0xE0,
priority=7,
ack_timer=0,
max_frame_size=self.max_frame_size,
max_frame_size=self.rx_max_frame_size,
max_retransmissions=0,
window_size=self.window_size,
initial_credits=self.rx_initial_credits,
)
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.PN, c_r=0, data=bytes(pn))
logger.debug(f'>>> PN Response: {pn}')
@@ -628,8 +661,8 @@ class DLC(EventEmitter):
self.change_state(DLC.State.CONNECTING)
def rx_credits_needed(self) -> int:
if self.rx_credits <= self.rx_threshold:
return self.window_size - self.rx_credits
if self.rx_credits <= self.rx_credits_threshold:
return self.rx_max_credits - self.rx_credits
return 0
@@ -689,8 +722,28 @@ class DLC(EventEmitter):
async def drain(self) -> None:
await self.drained.wait()
def abort(self) -> None:
logger.debug(f'aborting DLC: {self}')
if self.connection_result:
self.connection_result.cancel()
self.connection_result = None
if self.disconnection_result:
self.disconnection_result.cancel()
self.disconnection_result = None
self.change_state(DLC.State.RESET)
self.emit('close')
def __str__(self) -> str:
return f'DLC(dlci={self.dlci},state={self.state.name})'
return (
f'DLC(dlci={self.dlci}, '
f'state={self.state.name}, '
f'rx_max_frame_size={self.rx_max_frame_size}, '
f'rx_credits={self.rx_credits}, '
f'rx_max_credits={self.rx_max_credits}, '
f'tx_max_frame_size={self.tx_max_frame_size}, '
f'tx_credits={self.tx_credits}'
')'
)
# -----------------------------------------------------------------------------
@@ -711,7 +764,7 @@ class Multiplexer(EventEmitter):
connection_result: Optional[asyncio.Future]
disconnection_result: Optional[asyncio.Future]
open_result: Optional[asyncio.Future]
acceptor: Optional[Callable[[int], bool]]
acceptor: Optional[Callable[[int], Optional[Tuple[int, int]]]]
dlcs: Dict[int, DLC]
def __init__(self, l2cap_channel: l2cap.ClassicChannel, role: Role) -> None:
@@ -723,11 +776,15 @@ class Multiplexer(EventEmitter):
self.connection_result = None
self.disconnection_result = None
self.open_result = None
self.open_pn: Optional[RFCOMM_MCC_PN] = None
self.open_rx_max_credits = 0
self.acceptor = None
# Become a sink for the L2CAP channel
l2cap_channel.sink = self.on_pdu
l2cap_channel.on('close', self.on_l2cap_channel_close)
def change_state(self, new_state: State) -> None:
logger.debug(f'{self} state change -> {color(new_state.name, "cyan")}')
self.state = new_state
@@ -791,6 +848,7 @@ class Multiplexer(EventEmitter):
'rfcomm',
)
)
self.open_result = None
else:
logger.warning(f'unexpected state for DM: {self}')
@@ -828,9 +886,16 @@ class Multiplexer(EventEmitter):
else:
if self.acceptor:
channel_number = pn.dlci >> 1
if self.acceptor(channel_number):
if dlc_params := self.acceptor(channel_number):
# Create a new DLC
dlc = DLC(self, pn.dlci, pn.max_frame_size, pn.window_size)
dlc = DLC(
self,
dlci=pn.dlci,
tx_max_frame_size=pn.max_frame_size,
tx_initial_credits=pn.initial_credits,
rx_max_frame_size=dlc_params[0],
rx_initial_credits=dlc_params[1],
)
self.dlcs[pn.dlci] = dlc
# Re-emit the handshake completion event
@@ -848,8 +913,17 @@ class Multiplexer(EventEmitter):
# Response
logger.debug(f'>>> PN Response: {pn}')
if self.state == Multiplexer.State.OPENING:
dlc = DLC(self, pn.dlci, pn.max_frame_size, pn.window_size)
assert self.open_pn
dlc = DLC(
self,
dlci=pn.dlci,
tx_max_frame_size=pn.max_frame_size,
tx_initial_credits=pn.initial_credits,
rx_max_frame_size=self.open_pn.max_frame_size,
rx_initial_credits=self.open_pn.initial_credits,
)
self.dlcs[pn.dlci] = dlc
self.open_pn = None
dlc.connect()
else:
logger.warning('ignoring PN response')
@@ -887,7 +961,7 @@ class Multiplexer(EventEmitter):
self,
channel: int,
max_frame_size: int = RFCOMM_DEFAULT_MAX_FRAME_SIZE,
window_size: int = RFCOMM_DEFAULT_WINDOW_SIZE,
initial_credits: int = RFCOMM_DEFAULT_INITIAL_CREDITS,
) -> DLC:
if self.state != Multiplexer.State.CONNECTED:
if self.state == Multiplexer.State.OPENING:
@@ -895,17 +969,19 @@ class Multiplexer(EventEmitter):
raise InvalidStateError('not connected')
pn = RFCOMM_MCC_PN(
self.open_pn = RFCOMM_MCC_PN(
dlci=channel << 1,
cl=0xF0,
priority=7,
ack_timer=0,
max_frame_size=max_frame_size,
max_retransmissions=0,
window_size=window_size,
initial_credits=initial_credits,
)
mcc = RFCOMM_Frame.make_mcc(mcc_type=MccType.PN, c_r=1, data=bytes(pn))
logger.debug(f'>>> Sending MCC: {pn}')
mcc = RFCOMM_Frame.make_mcc(
mcc_type=MccType.PN, c_r=1, data=bytes(self.open_pn)
)
logger.debug(f'>>> Sending MCC: {self.open_pn}')
self.open_result = asyncio.get_running_loop().create_future()
self.change_state(Multiplexer.State.OPENING)
self.send_frame(
@@ -915,15 +991,31 @@ class Multiplexer(EventEmitter):
information=mcc,
)
)
result = await self.open_result
self.open_result = None
return result
return await self.open_result
def on_dlc_open_complete(self, dlc: DLC) -> None:
logger.debug(f'DLC [{dlc.dlci}] open complete')
self.change_state(Multiplexer.State.CONNECTED)
if self.open_result:
self.open_result.set_result(dlc)
self.open_result = None
def on_dlc_disconnection(self, dlc: DLC) -> None:
logger.debug(f'DLC [{dlc.dlci}] disconnection')
self.dlcs.pop(dlc.dlci, None)
def on_l2cap_channel_close(self) -> None:
logger.debug('L2CAP channel closed, cleaning up')
if self.open_result:
self.open_result.cancel()
self.open_result = None
if self.disconnection_result:
self.disconnection_result.cancel()
self.disconnection_result = None
for dlc in self.dlcs.values():
dlc.abort()
def __str__(self) -> str:
return f'Multiplexer(state={self.state.name})'
@@ -982,15 +1074,13 @@ class Client:
# -----------------------------------------------------------------------------
class Server(EventEmitter):
acceptors: Dict[int, Callable[[DLC], None]]
def __init__(
self, device: Device, l2cap_mtu: int = RFCOMM_DEFAULT_L2CAP_MTU
) -> None:
super().__init__()
self.device = device
self.multiplexer = None
self.acceptors = {}
self.acceptors: Dict[int, Callable[[DLC], None]] = {}
self.dlc_configs: Dict[int, Tuple[int, int]] = {}
# Register ourselves with the L2CAP channel manager
self.l2cap_server = device.create_l2cap_server(
@@ -998,7 +1088,13 @@ class Server(EventEmitter):
handler=self.on_connection,
)
def listen(self, acceptor: Callable[[DLC], None], channel: int = 0) -> int:
def listen(
self,
acceptor: Callable[[DLC], None],
channel: int = 0,
max_frame_size: int = RFCOMM_DEFAULT_MAX_FRAME_SIZE,
initial_credits: int = RFCOMM_DEFAULT_INITIAL_CREDITS,
) -> int:
if channel:
if channel in self.acceptors:
# Busy
@@ -1018,6 +1114,8 @@ class Server(EventEmitter):
return 0
self.acceptors[channel] = acceptor
self.dlc_configs[channel] = (max_frame_size, initial_credits)
return channel
def on_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None:
@@ -1035,15 +1133,14 @@ class Server(EventEmitter):
# Notify
self.emit('start', multiplexer)
def accept_dlc(self, channel_number: int) -> bool:
return channel_number in self.acceptors
def accept_dlc(self, channel_number: int) -> Optional[Tuple[int, int]]:
return self.dlc_configs.get(channel_number)
def on_dlc(self, dlc: DLC) -> None:
logger.debug(f'@@@ new DLC connected: {dlc}')
# Let the acceptor know
acceptor = self.acceptors.get(dlc.dlci >> 1)
if acceptor:
if acceptor := self.acceptors.get(dlc.dlci >> 1):
acceptor(dlc)
def __enter__(self) -> Self:

View File

@@ -997,7 +997,7 @@ class Server:
try:
handler(sdp_pdu)
except Exception as error:
logger.warning(f'{color("!!! Exception in handler:", "red")} {error}')
logger.exception(f'{color("!!! Exception in handler:", "red")} {error}')
self.send_response(
SDP_ErrorResponse(
transaction_id=sdp_pdu.transaction_id,

View File

@@ -425,6 +425,10 @@ class SnoopingTransport(Transport):
class Source:
sink: TransportSink
@property
def metadata(self) -> dict[str, Any]:
return getattr(self.source, 'metadata', {})
def __init__(self, source: TransportSource, snooper: Snooper):
self.source = source
self.snooper = snooper

View File

@@ -26,7 +26,7 @@ import websockets
from typing import Optional
import bumble.core
from bumble.device import Device
from bumble.device import Device, ScoLink
from bumble.transport import open_transport_or_link
from bumble.core import (
BT_BR_EDR_TRANSPORT,
@@ -217,11 +217,11 @@ async def main() -> None:
1: hfp.make_ag_sdp_records(1, channel, configuration)
}
def on_sco_connection(sco_link):
def on_sco_connection(sco_link: ScoLink):
assert ag_protocol
on_sco_state_change(ag_protocol.active_codec)
sco_link.on('disconnection', lambda _: on_sco_state_change(0))
sco_link.on('pdu', on_sco_packet)
sco_link.sink = on_sco_packet
device.on('sco_connection', on_sco_connection)
if len(sys.argv) >= 4:

View File

@@ -16,20 +16,28 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import datetime
import functools
import logging
import sys
import os
import io
import struct
import secrets
from typing import Dict
from bumble.core import AdvertisingData
from bumble.device import Device, CisLink
from bumble.device import Device
from bumble.hci import (
CodecID,
CodingFormat,
HCI_IsoDataPacket,
)
from bumble.profiles.bap import (
AseStateMachine,
UnicastServerAdvertisingData,
CodecSpecificConfiguration,
CodecSpecificCapabilities,
ContextType,
AudioLocation,
@@ -45,6 +53,32 @@ from bumble.profiles.csip import CoordinatedSetIdentificationService, SirkType
from bumble.transport import open_transport_or_link
def _sink_pac_record() -> PacRecord:
return PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_8000
| SupportedSamplingFrequency.FREQ_16000
| SupportedSamplingFrequency.FREQ_24000
| SupportedSamplingFrequency.FREQ_32000
| SupportedSamplingFrequency.FREQ_48000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_7500_US_SUPPORTED
| SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_count=[1, 2],
min_octets_per_codec_frame=26,
max_octets_per_codec_frame=240,
supported_max_codec_frames_per_sdu=2,
),
)
file_outputs: Dict[AseStateMachine, io.BufferedWriter] = {}
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
@@ -71,49 +105,17 @@ async def main() -> None:
PublishedAudioCapabilitiesService(
supported_source_context=ContextType.PROHIBITED,
available_source_context=ContextType.PROHIBITED,
supported_sink_context=ContextType.MEDIA,
available_sink_context=ContextType.MEDIA,
supported_sink_context=ContextType(0xFF), # All context types
available_sink_context=ContextType(0xFF), # All context types
sink_audio_locations=(
AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT
),
sink_pac=[
# Codec Capability Setting 16_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_16000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
),
),
# Codec Capability Setting 24_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_48000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=120,
max_octets_per_codec_frame=120,
supported_max_codec_frames_per_sdu=1,
),
),
],
sink_pac=[_sink_pac_record()],
)
)
device.add_service(AudioStreamControlService(device, sink_ase_id=[1, 2]))
ascs = AudioStreamControlService(device, sink_ase_id=[1], source_ase_id=[2])
device.add_service(ascs)
advertising_data = (
bytes(
@@ -143,44 +145,57 @@ async def main() -> None:
+ csis.get_advertising_data()
+ bytes(UnicastServerAdvertisingData())
)
subprocess = await asyncio.create_subprocess_shell(
f'dlc3 | ffplay pipe:0',
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdin = subprocess.stdin
assert stdin
# Write a fake LC3 header to dlc3.
stdin.write(
bytes([0x1C, 0xCC]) # Header.
+ struct.pack(
'<HHHHHHI',
18, # Header length.
48000 // 100, # Sampling Rate(/100Hz).
0, # Bitrate(unused).
1, # Channels.
10000 // 10, # Frame duration(/10us).
0, # RFU.
0x0FFFFFFF, # Frame counts.
)
)
def on_pdu(pdu: HCI_IsoDataPacket):
def on_pdu(ase: AseStateMachine, pdu: HCI_IsoDataPacket):
# LC3 format: |frame_length(2)| + |frame(length)|.
sdu = b''
if pdu.iso_sdu_length:
stdin.write(struct.pack('<H', pdu.iso_sdu_length))
stdin.write(pdu.iso_sdu_fragment)
sdu = struct.pack('<H', pdu.iso_sdu_length)
sdu += pdu.iso_sdu_fragment
file_outputs[ase].write(sdu)
def on_cis(cis_link: CisLink):
cis_link.on('pdu', on_pdu)
def on_ase_state_change(
state: AseStateMachine.State,
ase: AseStateMachine,
) -> None:
if state != AseStateMachine.State.STREAMING:
if file_output := file_outputs.pop(ase):
file_output.close()
else:
file_output = open(f'{datetime.datetime.now().isoformat()}.lc3', 'wb')
codec_configuration = ase.codec_specific_configuration
assert isinstance(codec_configuration, CodecSpecificConfiguration)
# Write a LC3 header.
file_output.write(
bytes([0x1C, 0xCC]) # Header.
+ struct.pack(
'<HHHHHHI',
18, # Header length.
codec_configuration.sampling_frequency.hz
// 100, # Sampling Rate(/100Hz).
0, # Bitrate(unused).
bin(codec_configuration.audio_channel_allocation).count(
'1'
), # Channels.
codec_configuration.frame_duration.us
// 10, # Frame duration(/10us).
0, # RFU.
0x0FFFFFFF, # Frame counts.
)
)
file_outputs[ase] = file_output
assert ase.cis_link
ase.cis_link.sink = functools.partial(on_pdu, ase)
device.once('cis_establishment', on_cis)
for ase in ascs.ase_state_machines.values():
ase.on(
'state_change',
functools.partial(on_ase_state_change, ase=ase),
)
await device.create_advertising_set(
advertising_data=advertising_data,
auto_restart=True,
)
await hci_transport.source.terminated

View File

@@ -102,7 +102,7 @@ async def main() -> None:
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
supported_audio_channel_count=[1],
min_octets_per_codec_frame=120,
max_octets_per_codec_frame=120,
supported_max_codec_frames_per_sdu=1,

View File

@@ -150,7 +150,8 @@ class AppViewModel : ViewModel() {
} else if (senderPacketSizeSlider < 0.5F) {
512
} else if (senderPacketSizeSlider < 0.7F) {
1024
// 970 is a value that works well on Android.
970
} else if (senderPacketSizeSlider < 0.9F) {
2048
} else {

View File

@@ -56,13 +56,19 @@ class SocketClient(private val viewModel: AppViewModel, private val socket: Blue
thread {
socketDataSource.receive()
socket.close()
sender.abort()
}
Log.info("Startup delay: $DEFAULT_STARTUP_DELAY")
Thread.sleep(DEFAULT_STARTUP_DELAY.toLong());
Log.info("Starting to send")
sender.run()
try {
sender.run()
} catch (error: IOException) {
Log.info("run ended abruptly")
}
cleanup()
}
}

View File

@@ -62,6 +62,7 @@ console_scripts =
bumble-gatt-dump = bumble.apps.gatt_dump:main
bumble-hci-bridge = bumble.apps.hci_bridge:main
bumble-l2cap-bridge = bumble.apps.l2cap_bridge:main
bumble-rfcomm-bridge = bumble.apps.rfcomm_bridge:main
bumble-pair = bumble.apps.pair:main
bumble-scan = bumble.apps.scan:main
bumble-show = bumble.apps.show:main
@@ -81,7 +82,7 @@ console_scripts =
build =
build >= 0.7
test =
pytest >= 8.0
pytest >= 8.2
pytest-asyncio >= 0.23.5
pytest-html >= 3.2.0
coverage >= 6.4
@@ -89,13 +90,14 @@ development =
black == 24.3
grpcio-tools >= 1.62.1
invoke >= 1.7.3
mypy == 1.8.0
mypy == 1.10.0
nox >= 2022
pylint == 2.15.8
pylint == 3.1.0
pyyaml >= 6.0
types-appdirs >= 1.4.3
types-invoke >= 1.7.3
types-protobuf >= 4.21.0
wasmtime == 20.0.0
avatar =
pandora-avatar == 0.0.9
rootcanal == 1.10.0 ; python_version>='3.10'

View File

@@ -72,7 +72,7 @@ def test_codec_specific_capabilities() -> None:
cap = CodecSpecificCapabilities(
supported_sampling_frequencies=SAMPLE_FREQUENCY,
supported_frame_durations=FRAME_SURATION,
supported_audio_channel_counts=AUDIO_CHANNEL_COUNTS,
supported_audio_channel_count=AUDIO_CHANNEL_COUNTS,
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
@@ -88,7 +88,7 @@ def test_pac_record() -> None:
cap = CodecSpecificCapabilities(
supported_sampling_frequencies=SAMPLE_FREQUENCY,
supported_frame_durations=FRAME_SURATION,
supported_audio_channel_counts=AUDIO_CHANNEL_COUNTS,
supported_audio_channel_count=AUDIO_CHANNEL_COUNTS,
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
@@ -216,7 +216,7 @@ async def test_pacs():
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
supported_audio_channel_count=[1],
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
@@ -232,7 +232,7 @@ async def test_pacs():
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
supported_audio_channel_count=[1],
min_octets_per_codec_frame=60,
max_octets_per_codec_frame=60,
supported_max_codec_frames_per_sdu=1,

View File

@@ -16,6 +16,7 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
import functools
import logging
import os
from types import LambdaType
@@ -35,12 +36,14 @@ from bumble.hci import (
HCI_COMMAND_STATUS_PENDING,
HCI_CREATE_CONNECTION_COMMAND,
HCI_SUCCESS,
HCI_CONNECTION_FAILED_TO_BE_ESTABLISHED_ERROR,
Address,
OwnAddressType,
HCI_Command_Complete_Event,
HCI_Command_Status_Event,
HCI_Connection_Complete_Event,
HCI_Connection_Request_Event,
HCI_Error,
HCI_Packet,
)
from bumble.gatt import (
@@ -52,6 +55,10 @@ from bumble.gatt import (
from .test_utils import TwoDevices, async_barrier
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
_TIMEOUT = 0.1
# -----------------------------------------------------------------------------
# Logging
@@ -214,6 +221,12 @@ async def test_device_connect_parallel():
d1.host.set_packet_sink(Sink(d1_flow()))
d2.host.set_packet_sink(Sink(d2_flow()))
d1_accept_task = asyncio.create_task(d1.accept(peer_address=d0.public_address))
d2_accept_task = asyncio.create_task(d2.accept())
# Ensure that the accept tasks have started.
await async_barrier()
[c01, c02, a10, a20] = await asyncio.gather(
*[
asyncio.create_task(
@@ -222,8 +235,8 @@ async def test_device_connect_parallel():
asyncio.create_task(
d0.connect(d2.public_address, transport=BT_BR_EDR_TRANSPORT)
),
asyncio.create_task(d1.accept(peer_address=d0.public_address)),
asyncio.create_task(d2.accept()),
d1_accept_task,
d2_accept_task,
]
)
@@ -385,6 +398,29 @@ async def test_get_remote_le_features():
assert (await devices.connections[0].get_remote_le_features()) is not None
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_remote_le_features_failed():
devices = TwoDevices()
await devices.setup_connection()
def on_hci_le_read_remote_features_complete_event(event):
devices[0].host.emit(
'le_remote_features_failure',
event.connection_handle,
HCI_CONNECTION_FAILED_TO_BE_ESTABLISHED_ERROR,
)
devices[0].host.on_hci_le_read_remote_features_complete_event = (
on_hci_le_read_remote_features_complete_event
)
with pytest.raises(HCI_Error):
await asyncio.wait_for(
devices.connections[0].get_remote_le_features(), _TIMEOUT
)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cis():
@@ -433,6 +469,65 @@ async def test_cis():
await cis_links[1].disconnect()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cis_setup_failure():
devices = TwoDevices()
await devices.setup_connection()
cis_requests = asyncio.Queue()
def on_cis_request(
acl_connection: Connection,
cis_handle: int,
cig_id: int,
cis_id: int,
):
del acl_connection, cig_id, cis_id
cis_requests.put_nowait(cis_handle)
devices[1].on('cis_request', on_cis_request)
cis_handles = await devices[0].setup_cig(
cig_id=1,
cis_id=[2],
sdu_interval=(0, 0),
framing=0,
max_sdu=(0, 0),
retransmission_number=0,
max_transport_latency=(0, 0),
)
assert len(cis_handles) == 1
cis_create_task = asyncio.create_task(
devices[0].create_cis(
[
(cis_handles[0], devices.connections[0].handle),
]
)
)
def on_hci_le_cis_established_event(host, event):
host.emit(
'cis_establishment_failure',
event.connection_handle,
HCI_CONNECTION_FAILED_TO_BE_ESTABLISHED_ERROR,
)
for device in devices:
device.host.on_hci_le_cis_established_event = functools.partial(
on_hci_le_cis_established_event, device.host
)
cis_request = await asyncio.wait_for(cis_requests.get(), _TIMEOUT)
with pytest.raises(HCI_Error):
await asyncio.wait_for(devices[1].accept_cis_request(cis_request), _TIMEOUT)
with pytest.raises(HCI_Error):
await asyncio.wait_for(cis_create_task, _TIMEOUT)
# -----------------------------------------------------------------------------
def test_gatt_services_with_gas():
device = Device(host=Host(None, None))

View File

@@ -62,7 +62,7 @@ def test_frames():
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_basic_connection() -> None:
async def test_connection_and_disconnection() -> None:
devices = test_utils.TwoDevices()
await devices.setup_connection()
@@ -83,6 +83,11 @@ async def test_basic_connection() -> None:
dlcs[1].write(b'Lorem ipsum dolor sit amet')
assert await queues[0].get() == b'Lorem ipsum dolor sit amet'
closed = asyncio.Event()
dlcs[1].on('close', closed.set)
await dlcs[1].disconnect()
await closed.wait()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio