Compare commits

..

1 Commits

Author SHA1 Message Date
pstruebi 7e4948d9ef add small asrc example 2025-10-06 11:04:06 +02:00
33 changed files with 1280 additions and 1359 deletions
+1 -1
View File
@@ -50,7 +50,7 @@ Bumble is easiest to use with a dedicated USB dongle.
This is because internal Bluetooth interfaces tend to be locked down by the operating system.
You can use the [usb_probe](/docs/mkdocs/src/apps_and_tools/usb_probe.md) tool (all platforms) or `lsusb` (Linux or macOS) to list the available USB devices on your system.
See the [USB Transport](/docs/mkdocs/src/transports/usb.md) page for details on how to refer to USB devices. Also, if you are on a mac, see [these instructions](docs/mkdocs/src/platforms/macos.md).
See the [USB Transport](/docs/mkdocs/src/transports/usb.md) page for details on how to refer to USB devices. Also, if your are on a mac, see [these instructions](docs/mkdocs/src/platforms/macos.md).
## License
+3 -1
View File
@@ -41,6 +41,7 @@ import bumble.transport
import bumble.utils
from bumble import company_ids, core, data_types, gatt, hci
from bumble.audio import io as audio_io
from bumble.audio import io_asrc as audio_io_asrc
from bumble.colors import color
from bumble.profiles import bap, bass, le_audio, pbp
@@ -891,7 +892,8 @@ async def run_transmit(
print('Start Periodic Advertising')
await advertising_set.start_periodic()
audio_input = await audio_io.create_audio_input(input, input_format)
#audio_input = await audio_io.create_audio_input(input, input_format)
audio_input = audio_io_asrc.SoundDeviceAudioInputAsrc(input[7:], input_format)
pcm_format = await audio_input.open()
# This try should be replaced with contextlib.aclosing() when python 3.9 is no
# longer needed.
+17 -56
View File
@@ -21,12 +21,11 @@ import dataclasses
import enum
import logging
import struct
from collections.abc import AsyncGenerator, Awaitable, Callable
from typing import Union
from collections.abc import AsyncGenerator
from typing import Awaitable, Callable
from typing_extensions import ClassVar, Self
from bumble import utils
from bumble.codecs import AacAudioRtpPacket
from bumble.company_ids import COMPANY_IDENTIFIERS
from bumble.core import (
@@ -60,18 +59,19 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# fmt: off
class CodecType(utils.OpenIntEnum):
SBC = 0x00
MPEG_1_2_AUDIO = 0x01
MPEG_2_4_AAC = 0x02
ATRAC_FAMILY = 0x03
NON_A2DP = 0xFF
A2DP_SBC_CODEC_TYPE = 0x00
A2DP_MPEG_1_2_AUDIO_CODEC_TYPE = 0x01
A2DP_MPEG_2_4_AAC_CODEC_TYPE = 0x02
A2DP_ATRAC_FAMILY_CODEC_TYPE = 0x03
A2DP_NON_A2DP_CODEC_TYPE = 0xFF
A2DP_SBC_CODEC_TYPE = CodecType.SBC
A2DP_MPEG_1_2_AUDIO_CODEC_TYPE = CodecType.MPEG_1_2_AUDIO
A2DP_MPEG_2_4_AAC_CODEC_TYPE = CodecType.MPEG_2_4_AAC
A2DP_ATRAC_FAMILY_CODEC_TYPE = CodecType.ATRAC_FAMILY
A2DP_NON_A2DP_CODEC_TYPE = CodecType.NON_A2DP
A2DP_CODEC_TYPE_NAMES = {
A2DP_SBC_CODEC_TYPE: 'A2DP_SBC_CODEC_TYPE',
A2DP_MPEG_1_2_AUDIO_CODEC_TYPE: 'A2DP_MPEG_1_2_AUDIO_CODEC_TYPE',
A2DP_MPEG_2_4_AAC_CODEC_TYPE: 'A2DP_MPEG_2_4_AAC_CODEC_TYPE',
A2DP_ATRAC_FAMILY_CODEC_TYPE: 'A2DP_ATRAC_FAMILY_CODEC_TYPE',
A2DP_NON_A2DP_CODEC_TYPE: 'A2DP_NON_A2DP_CODEC_TYPE'
}
SBC_SYNC_WORD = 0x9C
@@ -259,48 +259,9 @@ def make_audio_sink_service_sdp_records(service_record_handle, version=(1, 3)):
]
# -----------------------------------------------------------------------------
class MediaCodecInformation:
'''Base Media Codec Information.'''
@classmethod
def create(
cls, media_codec_type: int, data: bytes
) -> Union[MediaCodecInformation, bytes]:
if media_codec_type == CodecType.SBC:
return SbcMediaCodecInformation.from_bytes(data)
elif media_codec_type == CodecType.MPEG_2_4_AAC:
return AacMediaCodecInformation.from_bytes(data)
elif media_codec_type == CodecType.NON_A2DP:
vendor_media_codec_information = (
VendorSpecificMediaCodecInformation.from_bytes(data)
)
if (
vendor_class_map := A2DP_VENDOR_MEDIA_CODEC_INFORMATION_CLASSES.get(
vendor_media_codec_information.vendor_id
)
) and (
media_codec_information_class := vendor_class_map.get(
vendor_media_codec_information.codec_id
)
):
return media_codec_information_class.from_bytes(
vendor_media_codec_information.value
)
return vendor_media_codec_information
@classmethod
def from_bytes(cls, data: bytes) -> Self:
del data # Unused.
raise NotImplementedError
def __bytes__(self) -> bytes:
raise NotImplementedError
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class SbcMediaCodecInformation(MediaCodecInformation):
class SbcMediaCodecInformation:
'''
A2DP spec - 4.3.2 Codec Specific Information Elements
'''
@@ -384,7 +345,7 @@ class SbcMediaCodecInformation(MediaCodecInformation):
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class AacMediaCodecInformation(MediaCodecInformation):
class AacMediaCodecInformation:
'''
A2DP spec - 4.5.2 Codec Specific Information Elements
'''
@@ -466,7 +427,7 @@ class AacMediaCodecInformation(MediaCodecInformation):
@dataclasses.dataclass
# -----------------------------------------------------------------------------
class VendorSpecificMediaCodecInformation(MediaCodecInformation):
class VendorSpecificMediaCodecInformation:
'''
A2DP spec - 4.7.2 Codec Specific Information Elements
'''
+339
View File
@@ -0,0 +1,339 @@
# Copyright 2025
#
# Drop-in replacement for `SoundDeviceAudioInput` that adds a tiny ASRC stage.
#
# Constraints per request:
# - Only import io_bumble.py at module level.
# - Reuse the ASRC functionality from asrc.py conceptually (PI control + FIFO +
# linear/sinc resampling behavior). We implement a minimal, dependency-free
# variant (linear interpolation with a small PI loop) so this module does not
# import anything else at top-level.
#
# Notes:
# - Input stream is captured via sounddevice (imported lazily inside methods).
# - Input is mono float32 for simplicity; output matches the original class
# signature: INT16, stereo, at the same nominal sample rate as requested.
from .io import PcmFormat, ThreadedAudioInput, logger # only top-level import
class SoundDeviceAudioInputAsrc(ThreadedAudioInput):
"""Sound device audio input with a simple ASRC stage.
Interface-compatible with `io_bumble.SoundDeviceAudioInput`:
- __init__(device_name: str, pcm_format: PcmFormat)
- _open() -> PcmFormat
- _read(frame_size: int) -> bytes
- _close() -> None
Behavior:
- Captures mono float32 frames from the device.
- Buffers into an internal ring buffer.
- Produces stereo INT16 frames using a linear-interp resampler whose
ratio is adjusted by a tiny PI loop to hold FIFO depth near a target.
"""
def __init__(self, device_name: str, pcm_format: str) -> None:
super().__init__()
# Device & format
self._device = int(device_name) if device_name else None
pcm_format: PcmFormat | None
if pcm_format == 'auto':
pcm_format = None
else:
pcm_format = PcmFormat.from_str(pcm_format)
self._pcm_format_in = pcm_format
# We always output stereo INT16 at the same nominal sample rate.
self._pcm_format_out = PcmFormat(
PcmFormat.Endianness.LITTLE,
PcmFormat.SampleType.INT16,
pcm_format.sample_rate,
2,
)
# sounddevice stream (created in _open)
self._stream = None # type: ignore[assignment]
# --- ASRC state (inspired by asrc.py) ---
# Nominal input/output rate ratio
self._r = 1.0
self._integral = 0.0
self._phi = 0.0 # fractional read position within current chunk
# PI gains (tiny to avoid warble)
self._Kp = 2e-6
self._Ki = 5e-8
self._R0 = 1.0
# Target FIFO level and deadband (≈10 ms target, 0.5 ms deadband)
fs = float(self._pcm_format_in.sample_rate)
self._target_samples = max(1, int(0.010 * fs))
self._deadband = max(1, int(0.0005 * fs))
# Ring buffer for mono float32 samples
# Capacity ~2 seconds for headroom
self._rb_cap = max(self._target_samples * 32, int(2 * fs))
self._rb = None # created in _init_rb()
self._ridx = 0
self._size = 0
self._lock = None # created in _init_rb()
self._init_rb()
# Light logging timer
self._last_log = 0.0
# Streaming resampler and internal output buffer (lazy init)
self._rs = None # samplerate.Resampler
self._out_buf = None # numpy.ndarray float32
# ---------------- Internal helpers -----------------
def _init_rb(self) -> None:
# Lazy import standard libs to keep only io_bumble imported at top level
import threading
from array import array
self._rb = array('f', [0.0] * self._rb_cap) # float32 ring buffer
self._lock = threading.Lock()
self._ridx = 0
self._size = 0
def _fifo_len(self) -> int:
with self._lock:
return self._size
def _fifo_write(self, x_f32) -> None:
# x_f32: 1-D float32-like iterable
k = len(x_f32)
if k <= 0:
return
rb = self._rb
if rb is None:
return
with self._lock:
# Trim if larger than capacity: keep last N
if k >= self._rb_cap:
x_f32 = x_f32[-self._rb_cap:]
k = self._rb_cap
# Make room on overflow (drop oldest)
excess = max(0, self._size + k - self._rb_cap)
if excess:
self._ridx = (self._ridx + excess) % self._rb_cap
self._size -= excess
# Write at tail position
wpos = (self._ridx + self._size) % self._rb_cap
first = min(k, self._rb_cap - wpos)
# Write first chunk
from array import array as _array # lazy import
rb[wpos:wpos + first] = _array('f', x_f32[:first])
# Wrap if needed
second = k - first
if second:
rb[0:second] = _array('f', x_f32[first:])
self._size += k
def _fifo_peek_array(self, n: int):
# Returns a Python list[float] copy of up to n samples
rb = self._rb
if rb is None:
return []
m = max(0, min(n, self._fifo_len()))
if m <= 0:
return []
pos = self._ridx
first = min(m, self._rb_cap - pos)
# Copy out
out = [0.0] * m
# First chunk
out[:first] = rb[pos:pos + first]
# Second chunk if wrap
second = m - first
if second > 0:
out[first:] = rb[0:second]
return out
def _fifo_discard(self, n: int) -> None:
with self._lock:
d = max(0, min(n, self._size))
self._ridx = (self._ridx + d) % self._rb_cap
self._size -= d
def _update_ratio(self) -> None:
# PI loop to hold buffer near target
e = self._target_samples - self._fifo_len()
if -self._deadband <= e <= self._deadband:
e = 0.0
cand_integral = self._integral + e
r_unclamped = self._R0 * (1.0 + self._Kp * e + self._Ki * cand_integral)
# Limit to ±1000 ppm vs nominal
ppm_unclamped = 1e6 * (r_unclamped / self._R0 - 1.0)
saturated_high = ppm_unclamped > 1000.0
saturated_low = ppm_unclamped < -1000.0
if saturated_high:
self._r = self._R0 * (1 + 1000e-6)
if e <= 0:
self._integral = cand_integral
self._integral *= 0.99
elif saturated_low:
self._r = self._R0 * (1 - 1000e-6)
if e >= 0:
self._integral = cand_integral
self._integral *= 0.99
else:
self._integral = cand_integral
self._r = r_unclamped
# Occasional log
try:
import time as _time
now = _time.time()
if now - self._last_log > 1.0:
buf_ms = 1000.0 * self._fifo_len() / float(self._pcm_format_in.sample_rate)
print(
f"\nASRC buf={buf_ms:5.1f} ms r={self._r:.9f} corr={1e6 * (self._r / self._R0 - 1.0):+7.1f} ppm"
)
self._last_log = now
except Exception:
# Logging must never break audio
pass
def _process(self, n_out: int) -> list[float]:
# Accumulate at least n_out samples using samplerate.Resampler
if n_out <= 0:
return []
# Lazy imports
import numpy as np # type: ignore
# Lazy init output buffer
if self._out_buf is None:
self._out_buf = np.zeros(0, dtype=np.float32)
# Choose chunk so we don't take too much from FIFO each time
max_chunk = max(256, int(np.ceil(n_out / max(1e-9, self._r))))
safety_iters = 0
while self._out_buf.size < n_out and safety_iters < 16:
safety_iters += 1
available = self._fifo_len()
if available <= 0:
break
take = min(available, max_chunk)
x = self._fifo_peek_array(take)
self._fifo_discard(take)
if not x:
break
x_arr = np.asarray(x, dtype=np.float32)
if self._rs is not None:
try:
y = self._rs.process(x_arr, ratio=float(self._r), end_of_input=False)
except Exception:
logger.exception("ASRC resampler error")
y = None
else:
y = None
if y is not None and getattr(y, 'size', 0):
y = y.astype(np.float32, copy=False)
if self._out_buf.size == 0:
self._out_buf = y
else:
self._out_buf = np.concatenate((self._out_buf, y))
if self._out_buf.size >= n_out:
out = self._out_buf[:n_out]
self._out_buf = self._out_buf[n_out:]
return out.tolist()
else:
# Not enough data produced; pad with zeros
out = np.zeros(n_out, dtype=np.float32)
if self._out_buf.size:
out[: self._out_buf.size] = self._out_buf
self._out_buf = np.zeros(0, dtype=np.float32)
return out.tolist()
def _mono_to_stereo_int16_bytes(self, mono_f32: list[float]) -> bytes:
# Convert [-1,1] float list to stereo int16 little-endian bytes
import struct
ba = bytearray()
for v in mono_f32:
# clip
if v > 1.0:
v = 1.0
elif v < -1.0:
v = -1.0
i16 = int(v * 32767.0)
ba += struct.pack('<hh', i16, i16)
return bytes(ba)
# ---------------- ThreadedAudioInput hooks -----------------
def _open(self) -> PcmFormat:
# Set up sounddevice RawInputStream (int16) and start callback producer
import sounddevice # pylint: disable=import-error
import math
import samplerate as sr # type: ignore
# We capture mono regardless of requested channels, then output stereo.
channels = 1
samplerate = int(self._pcm_format_in.sample_rate)
def _callback(indata, frames, time_info, status): # noqa: ARG001 (signature is fixed)
# indata: raw int16 bytes-like buffer of shape (frames, channels)
try:
if status:
logger.warning("Input status: %s", status)
if frames <= 0:
return
# Interpret raw bytes as little-endian int16 mono
mv = memoryview(indata).cast('h') # len == frames * channels
# Convert to float in [-1, 1]
# Avoid division errors; protect NaN/Inf
mono = []
for i in range(frames):
v = mv[i]
f = float(v) / 32768.0
if not (f == f) or math.isinf(f):
f = 0.0
mono.append(f)
self._fifo_write(mono)
except Exception: # never let callback raise
logger.exception("Audio input callback error")
# Create streaming resampler (mono)
try:
self._rs = sr.Resampler(converter_type="sinc_fastest", channels=1)
except Exception:
logger.exception("Failed to create samplerate.Resampler; audio may be silent")
self._rs = None
self._stream = sounddevice.RawInputStream(
samplerate=samplerate,
device=self._device,
channels=channels,
dtype='int16',
callback=_callback,
)
self._stream.start()
return self._pcm_format_out
def _read(self, frame_size: int) -> bytes:
# Produce 'frame_size' output frames (stereo INT16)
if frame_size <= 0:
return b''
# Update resampling ratio based on FIFO level
try:
self._update_ratio()
except Exception:
# keep going even if update failed
pass
# Process mono float32
mono = self._process(frame_size)
# Convert to stereo int16 LE bytes
return self._mono_to_stereo_int16_bytes(mono)
def _close(self) -> None:
try:
if self._stream is not None:
self._stream.stop()
self._stream.close()
except Exception:
logger.exception('Error closing input stream')
finally:
self._stream = None
+12 -8
View File
@@ -19,11 +19,10 @@ from __future__ import annotations
import logging
import struct
from collections.abc import Callable
from enum import IntEnum
from typing import Optional
from typing import Callable, Optional, cast
from bumble import core, l2cap
from bumble import avc, core, l2cap
from bumble.colors import color
# -----------------------------------------------------------------------------
@@ -145,9 +144,9 @@ class MessageAssembler:
# -----------------------------------------------------------------------------
class Protocol:
CommandHandler = Callable[[int, bytes], None]
CommandHandler = Callable[[int, avc.CommandFrame], None]
command_handlers: dict[int, CommandHandler] # Command handlers, by PID
ResponseHandler = Callable[[int, Optional[bytes]], None]
ResponseHandler = Callable[[int, Optional[avc.ResponseFrame]], None]
response_handlers: dict[int, ResponseHandler] # Response handlers, by PID
next_transaction_label: int
message_assembler: MessageAssembler
@@ -205,15 +204,20 @@ class Protocol:
self.send_ipid(transaction_label, pid)
return
self.command_handlers[pid](transaction_label, payload)
command_frame = cast(avc.CommandFrame, avc.Frame.from_bytes(payload))
self.command_handlers[pid](transaction_label, command_frame)
else:
if pid not in self.response_handlers:
logger.warning(f"no response handler for PID {pid}")
return
# By convention, for an ipid, send a None payload to the response handler.
response_payload = None if ipid else payload
self.response_handlers[pid](transaction_label, response_payload)
if ipid:
response_frame = None
else:
response_frame = cast(avc.ResponseFrame, avc.Frame.from_bytes(payload))
self.response_handlers[pid](transaction_label, response_frame)
def send_message(
self,
+523 -574
View File
File diff suppressed because it is too large Load Diff
+20 -20
View File
@@ -22,9 +22,21 @@ import enum
import functools
import logging
import struct
from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Sequence
from dataclasses import dataclass, field
from typing import ClassVar, Optional, SupportsBytes, TypeVar, Union
from typing import (
AsyncIterator,
Awaitable,
Callable,
ClassVar,
Iterable,
List,
Optional,
Sequence,
SupportsBytes,
TypeVar,
Union,
cast,
)
from bumble import avc, avctp, core, hci, l2cap, utils
from bumble.colors import color
@@ -1750,11 +1762,7 @@ class Protocol(utils.EventEmitter):
),
)
response = self._check_response(response_context, GetCapabilitiesResponse)
return list(
capability
for capability in response.capabilities
if isinstance(capability, EventId)
)
return cast(List[EventId], response.capabilities)
async def get_play_status(self) -> SongAndPlayStatus:
"""Get the play status of the connected peer."""
@@ -2004,12 +2012,9 @@ class Protocol(utils.EventEmitter):
self.emit(self.EVENT_STOP)
def _on_avctp_command(self, transaction_label: int, payload: bytes) -> None:
command = avc.CommandFrame.from_bytes(payload)
if not isinstance(command, avc.CommandFrame):
raise core.InvalidPacketError(
f"{command} is not a valid AV/C Command Frame"
)
def _on_avctp_command(
self, transaction_label: int, command: avc.CommandFrame
) -> None:
logger.debug(
f"<<< AVCTP Command, transaction_label={transaction_label}: " f"{command}"
)
@@ -2068,13 +2073,8 @@ class Protocol(utils.EventEmitter):
self.send_not_implemented_response(transaction_label, command)
def _on_avctp_response(
self, transaction_label: int, payload: Optional[bytes]
self, transaction_label: int, response: Optional[avc.ResponseFrame]
) -> None:
response = avc.ResponseFrame.from_bytes(payload) if payload else None
if not isinstance(response, avc.ResponseFrame):
raise core.InvalidPacketError(
f"{response} is not a valid AV/C Response Frame"
)
logger.debug(
f"<<< AVCTP Response, transaction_label={transaction_label}: {response}"
)
@@ -2391,7 +2391,7 @@ class Protocol(utils.EventEmitter):
effective_volume = await self.delegate.get_absolute_volume()
self.send_avrcp_response(
transaction_label,
avc.ResponseFrame.ResponseCode.ACCEPTED,
avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE,
SetAbsoluteVolumeResponse(effective_volume),
)
+229 -429
View File
File diff suppressed because it is too large Load Diff
+2
View File
@@ -2263,6 +2263,8 @@ class Device(utils.CompositeEventEmitter):
EVENT_CONNECTION_FAILURE = "connection_failure"
EVENT_SCO_REQUEST = "sco_request"
EVENT_INQUIRY_COMPLETE = "inquiry_complete"
EVENT_REMOTE_NAME = "remote_name"
EVENT_REMOTE_NAME_FAILURE = "remote_name_failure"
EVENT_SCO_CONNECTION = "sco_connection"
EVENT_SCO_CONNECTION_FAILURE = "sco_connection_failure"
EVENT_CIS_REQUEST = "cis_request"
-4
View File
@@ -49,10 +49,6 @@ async def get_driver_for_host(host: Host) -> Optional[Driver]:
driver_classes: dict[str, type[Driver]] = {"rtk": rtk.Driver, "intel": intel.Driver}
probe_list: Iterable[str]
if driver_name := host.hci_metadata.get("driver"):
# The "driver" metadata may include runtime options after a '/' (for example
# "intel/ddc=..."). Keep only the base driver name (the portion before the
# first slash) so it matches a key in driver_classes (e.g. "intel").
driver_name = driver_name.split("/")[0]
# Only probe a single driver
probe_list = [driver_name]
else:
+7 -33
View File
@@ -459,10 +459,6 @@ class Driver(common.Driver):
== ModeOfOperation.OPERATIONAL
):
logger.debug("firmware already loaded")
# If the firmeare is already loaded, still attempt to load any
# device configuration (DDC). DDC can be applied independently of a
# firmware reload and may contain runtime overrides or patches.
await self.load_ddc_if_any()
return
# We only support some platforms and variants.
@@ -602,39 +598,17 @@ class Driver(common.Driver):
await self.reset_complete.wait()
logger.debug("reset complete")
await self.load_ddc_if_any(firmware_base_name)
async def load_ddc_if_any(self, firmware_base_name: Optional[str] = None) -> None:
"""
Check for and load any Device Data Configuration (DDC) blobs.
Args:
firmware_base_name: Base name of the selected firmware (e.g. "ibt-XXXX-YYYY").
If None, don't attempt to look up a .ddc file that
corresponds to the firmware image.
Priority:
1. If a ddc_override was provided via driver metadata, use it (highest priority).
2. Otherwise, if firmware_base_name is provided, attempt to find a .ddc file
that corresponds to the selected firmware image.
3. Finally, if a ddc_addon was provided, append/load it after the primary DDC.
"""
# If an explicit DDC override was supplied, use it and skip file lookup.
# Load the device config if there is one.
if self.ddc_override:
logger.debug("loading overridden DDC")
await self.load_device_config(self.ddc_override)
else:
# Only attempt .ddc file lookup if a firmware_base_name was provided.
if firmware_base_name is None:
logger.debug(
"no firmware_base_name provided; skipping .ddc file lookup"
)
else:
ddc_name = f"{firmware_base_name}.ddc"
ddc_path = _find_binary_path(ddc_name)
if ddc_path:
logger.debug(f"loading DDC from {ddc_path}")
ddc_data = ddc_path.read_bytes()
await self.load_device_config(ddc_data)
ddc_name = f"{firmware_base_name}.ddc"
ddc_path = _find_binary_path(ddc_name)
if ddc_path:
logger.debug(f"loading DDC from {ddc_path}")
ddc_data = ddc_path.read_bytes()
await self.load_device_config(ddc_data)
if self.ddc_addon:
logger.debug("loading DDC addon")
await self.load_device_config(self.ddc_addon)
+1 -3
View File
@@ -115,14 +115,12 @@ RTK_USB_PRODUCTS = {
# Realtek 8761BUV
(0x0B05, 0x190E),
(0x0BDA, 0x8771),
(0x0BDA, 0x877B),
(0x0BDA, 0xA728),
(0x0BDA, 0xA729),
(0x2230, 0x0016),
(0x2357, 0x0604),
(0x2550, 0x8761),
(0x2B89, 0x8761),
(0x7392, 0xC611),
(0x0BDA, 0x877B),
# Realtek 8821AE
(0x0B05, 0x17DC),
(0x13D3, 0x3414),
-38
View File
@@ -3441,17 +3441,6 @@ class HCI_Write_Synchronous_Flow_Control_Enable_Command(HCI_Command):
synchronous_flow_control_enable: int = field(metadata=metadata(1))
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
class HCI_Set_Controller_To_Host_Flow_Control_Command(HCI_Command):
'''
See Bluetooth spec @ 7.3.38 Set Controller To Host Flow Control command
'''
flow_control_enable: int = field(metadata=metadata(1))
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
@@ -4349,15 +4338,6 @@ class HCI_LE_Write_Suggested_Default_Data_Length_Command(HCI_Command):
suggested_max_tx_time: int = field(metadata=metadata(2))
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
class HCI_LE_Read_Local_P_256_Public_Key_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.36 LE LE Read Local P-256 Public Key command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
@@ -4385,15 +4365,6 @@ class HCI_LE_Clear_Resolving_List_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
class HCI_LE_Read_Resolving_List_Size_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.41 LE Read Resolving List Size command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
@@ -5057,15 +5028,6 @@ class HCI_LE_Periodic_Advertising_Terminate_Sync_Command(HCI_Command):
sync_handle: int = field(metadata=metadata(2))
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
class HCI_LE_Read_Transmit_Power_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.74 LE Read Transmit Power command
'''
# -----------------------------------------------------------------------------
@HCI_Command.command
@dataclasses.dataclass
+4 -4
View File
@@ -489,9 +489,9 @@ STATUS_CODES = {
@dataclasses.dataclass
class HfConfiguration:
supported_hf_features: collections.abc.Sequence[HfFeature]
supported_hf_indicators: collections.abc.Sequence[HfIndicator]
supported_audio_codecs: collections.abc.Sequence[AudioCodec]
supported_hf_features: list[HfFeature]
supported_hf_indicators: list[HfIndicator]
supported_audio_codecs: list[AudioCodec]
@dataclasses.dataclass
@@ -753,7 +753,7 @@ class HfProtocol(utils.EventEmitter):
# Build local features.
self.supported_hf_features = sum(configuration.supported_hf_features)
self.supported_audio_codecs = list(configuration.supported_audio_codecs)
self.supported_audio_codecs = configuration.supported_audio_codecs
self.hf_indicators = {
indicator: HfIndicatorState(indicator=indicator)
+1 -1
View File
@@ -246,7 +246,7 @@ class HID(ABC, utils.EventEmitter):
# Create a new L2CAP connection - interrupt channel
try:
channel = await self.connection.create_l2cap_channel(
l2cap.ClassicChannelSpec(HID_INTERRUPT_PSM)
l2cap.ClassicChannelSpec(HID_CONTROL_PSM)
)
channel.sink = self.on_intr_pdu
self.l2cap_intr_channel = channel
+3 -7
View File
@@ -674,14 +674,10 @@ class DLC(utils.EventEmitter):
while (self.tx_buffer and self.tx_credits > 0) or rx_credits_needed > 0:
# Get the next chunk, up to MTU size
if rx_credits_needed > 0:
chunk = bytes([rx_credits_needed])
chunk = bytes([rx_credits_needed]) + self.tx_buffer[: self.mtu - 1]
self.tx_buffer = self.tx_buffer[len(chunk) - 1 :]
self.rx_credits += rx_credits_needed
if self.tx_buffer and self.tx_credits > 0:
chunk += self.tx_buffer[: self.mtu - 1]
self.tx_buffer = self.tx_buffer[len(chunk) - 1 :]
tx_credit_spent = True
else:
tx_credit_spent = False
tx_credit_spent = len(chunk) > 1
else:
chunk = self.tx_buffer[: self.mtu]
self.tx_buffer = self.tx_buffer[len(chunk) :]
+1 -6
View File
@@ -84,12 +84,7 @@ async def open_transport(name: str) -> Transport:
scheme, *tail = name.split(':', 1)
spec = tail[0] if tail else None
metadata = None
# If a spec is provided, check for a metadata section in square brackets.
# The regex captures a comma-separated list of key=value pairs (allowing an
# optional trailing comma). The key is matched by \w+ and the value by [^,\]]+,
# meaning the value may contain any character except a comma or a closing
# bracket (']').
if spec and (m := re.search(r'\[(\w+=[^,\]]+(?:,\w+=[^,\]]+)*,?)\]', spec)):
if spec and (m := re.search(r'\[(\w+=\w+(?:,\w+=\w+)*,?)\]', spec)):
metadata_str = m.group(1)
if m.start() == 0:
# <metadata><spec>
+3 -6
View File
@@ -22,7 +22,6 @@ import contextlib
import io
import logging
import struct
from collections.abc import Awaitable, Callable
from typing import Any, ContextManager, Optional, Protocol
from bumble import core, hci
@@ -390,17 +389,15 @@ class PumpedPacketSource(ParserSource):
# -----------------------------------------------------------------------------
class PumpedPacketSink:
pump_task: Optional[asyncio.Task[None]]
def __init__(self, send: Callable[[bytes], Awaitable[Any]]):
def __init__(self, send):
self.send_function = send
self.packet_queue = asyncio.Queue[bytes]()
self.packet_queue = asyncio.Queue()
self.pump_task = None
def on_packet(self, packet: bytes) -> None:
self.packet_queue.put_nowait(packet)
def start(self) -> None:
def start(self):
async def pump_packets():
while True:
try:
+2 -2
View File
@@ -17,7 +17,7 @@
# -----------------------------------------------------------------------------
import logging
import websockets.asyncio.client
import websockets.client
from bumble.transport.common import (
PumpedPacketSink,
@@ -42,7 +42,7 @@ async def open_ws_client_transport(spec: str) -> Transport:
Example: ws://localhost:7681/v1/websocket/bt
'''
websocket = await websockets.asyncio.client.connect(spec)
websocket = await websockets.client.connect(spec)
class WsTransport(PumpedTransport):
async def close(self):
+8 -16
View File
@@ -16,9 +16,8 @@
# Imports
# -----------------------------------------------------------------------------
import logging
from typing import Optional
import websockets.asyncio.server
import websockets
from bumble.transport.common import ParserSource, PumpedPacketSink, Transport
@@ -41,12 +40,7 @@ async def open_ws_server_transport(spec: str) -> Transport:
'''
class WsServerTransport(Transport):
sink: PumpedPacketSink
source: ParserSource
connection: Optional[websockets.asyncio.server.ServerConnection]
server: Optional[websockets.asyncio.server.Server]
def __init__(self) -> None:
def __init__(self):
source = ParserSource()
sink = PumpedPacketSink(self.send_packet)
self.connection = None
@@ -54,19 +48,17 @@ async def open_ws_server_transport(spec: str) -> Transport:
super().__init__(source, sink)
async def serve(self, local_host: str, local_port: str) -> None:
async def serve(self, local_host, local_port):
self.sink.start()
# pylint: disable-next=no-member
self.server = await websockets.asyncio.server.serve(
handler=self.on_connection,
self.server = await websockets.serve(
ws_handler=self.on_connection,
host=local_host if local_host != '_' else None,
port=int(local_port),
)
logger.debug(f'websocket server ready on port {local_port}')
async def on_connection(
self, connection: websockets.asyncio.server.ServerConnection
) -> None:
async def on_connection(self, connection):
logger.debug(
f'new connection on {connection.local_address} '
f'from {connection.remote_address}'
@@ -85,11 +77,11 @@ async def open_ws_server_transport(spec: str) -> Transport:
# We're now disconnected
self.connection = None
async def send_packet(self, packet: bytes) -> None:
async def send_packet(self, packet):
if self.connection is None:
logger.debug('no connection, dropping packet')
return
await self.connection.send(packet)
return await self.connection.send(packet)
local_host, local_port = spec.rsplit(':', maxsplit=1)
transport = WsServerTransport()
+4
View File
@@ -82,3 +82,7 @@ services and characteristics.
# `run_scanner.py`
Run a host application connected to a 'real' BLE controller over a UART HCI to a dev board running Zephyr in HCI mode (could be any other UART BLE controller, or BlueZ over a virtual UART), that starts scanning and prints out the scan results.
# run auracast with usb stick
bumble-auracast transmit 'serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_CC69A2912F84AE5E-if00,1000000,rtscts' --input device
+3 -3
View File
@@ -20,7 +20,7 @@ import json
import struct
import sys
import websockets.asyncio.server
import websockets
import bumble.logging
from bumble import data_types
@@ -367,7 +367,7 @@ async def keyboard_device(device, command):
if command == 'web':
# Start a Websocket server to receive events from a web page
async def serve(websocket: websockets.asyncio.server.ServerConnection):
async def serve(websocket, _path):
while True:
try:
message = await websocket.recv()
@@ -398,7 +398,7 @@ async def keyboard_device(device, command):
pass
# pylint: disable-next=no-member
await websockets.asyncio.server.serve(serve, 'localhost', 8989)
await websockets.serve(serve, 'localhost', 8989)
await asyncio.get_event_loop().create_future()
else:
message = bytes('hello', 'ascii')
+5 -4
View File
@@ -20,7 +20,7 @@ import logging
import sys
from typing import Optional
import websockets.asyncio.server
import websockets
import bumble.logging
from bumble import data_types, decoder, gatt
@@ -29,11 +29,12 @@ from bumble.device import AdvertisingParameters, Device
from bumble.profiles import asha
from bumble.transport import open_transport
ws_connection: Optional[websockets.asyncio.server.ServerConnection] = None
ws_connection: Optional[websockets.WebSocketServerProtocol] = None
g722_decoder = decoder.G722Decoder()
async def ws_server(ws_client: websockets.asyncio.server.ServerConnection):
async def ws_server(ws_client: websockets.WebSocketServerProtocol, path: str):
del path
global ws_connection
ws_connection = ws_client
@@ -99,7 +100,7 @@ async def main() -> None:
),
)
await websockets.asyncio.server.serve(ws_server, port=8888)
await websockets.serve(ws_server, port=8888)
await hci_transport.source.terminated
+3 -6
View File
@@ -21,9 +21,8 @@ import asyncio
import json
import logging
import sys
from typing import Optional
import websockets.asyncio.server
import websockets
import bumble.logging
from bumble import a2dp, avc, avdtp, avrcp, utils
@@ -218,8 +217,6 @@ def on_avrcp_start(avrcp_protocol: avrcp.Protocol, websocket_server: WebSocketSe
# -----------------------------------------------------------------------------
class WebSocketServer:
socket: Optional[websockets.asyncio.server.ServerConnection]
def __init__(
self, avrcp_protocol: avrcp.Protocol, avrcp_delegate: Delegate
) -> None:
@@ -230,9 +227,9 @@ class WebSocketServer:
async def start(self) -> None:
# pylint: disable-next=no-member
await websockets.asyncio.server.serve(self.serve, 'localhost', 8989) # type: ignore
await websockets.serve(self.serve, 'localhost', 8989) # type: ignore
async def serve(self, socket: websockets.asyncio.server.ServerConnection) -> None:
async def serve(self, socket, _path) -> None:
print('### WebSocket connected')
self.socket = socket
while True:
+1 -2
View File
@@ -19,7 +19,6 @@ import asyncio
import sys
import bumble.logging
from bumble import hci
from bumble.controller import Controller
from bumble.device import Device
from bumble.gatt import (
@@ -62,7 +61,7 @@ async def main() -> None:
host_sink=hci_transport.sink,
link=link,
)
controller1.random_address = hci.Address(sys.argv[1])
controller1.random_address = sys.argv[1]
# Create a second controller using the same link
controller2 = Controller('C2', link=link)
+5 -4
View File
@@ -22,7 +22,7 @@ import logging
import sys
from typing import Iterable, Optional
import websockets.asyncio.server
import websockets
import bumble.core
import bumble.logging
@@ -33,7 +33,7 @@ from bumble.transport import open_transport
logger = logging.getLogger(__name__)
ws: Optional[websockets.asyncio.server.ServerConnection] = None
ws: Optional[websockets.WebSocketServerProtocol] = None
ag_protocol: Optional[hfp.AgProtocol] = None
source_file: Optional[io.BufferedReader] = None
@@ -114,7 +114,8 @@ def on_hfp_state_change(connected: bool):
send_message(type='hfp_state_change', connected=connected)
async def ws_server(ws_client: websockets.asyncio.server.ServerConnection):
async def ws_server(ws_client: websockets.WebSocketServerProtocol, path: str):
del path
global ws
ws = ws_client
@@ -272,7 +273,7 @@ async def main() -> None:
on_dlc(session)
await websockets.asyncio.server.serve(ws_server, port=8888)
await websockets.serve(ws_server, port=8888)
if len(sys.argv) >= 5:
global source_file
+4 -4
View File
@@ -22,7 +22,7 @@ import json
import sys
from typing import Optional
import websockets.asyncio.server
import websockets
import bumble.logging
from bumble import hci, hfp, rfcomm
@@ -30,7 +30,7 @@ from bumble.device import Connection, Device
from bumble.hfp import HfProtocol
from bumble.transport import open_transport
ws: Optional[websockets.asyncio.server.ServerConnection] = None
ws: Optional[websockets.WebSocketServerProtocol] = None
hf_protocol: Optional[HfProtocol] = None
@@ -143,7 +143,7 @@ async def main() -> None:
await device.set_connectable(True)
# Start the UI websocket server to offer a few buttons and input boxes
async def serve(websocket: websockets.asyncio.server.ServerConnection):
async def serve(websocket: websockets.WebSocketServerProtocol, _path):
global ws
ws = websocket
async for message in websocket:
@@ -166,7 +166,7 @@ async def main() -> None:
response = str(await hf_protocol.query_current_calls())
await websocket.send(response)
await websockets.asyncio.server.serve(serve, 'localhost', 8989)
await websockets.serve(serve, 'localhost', 8989)
await hci_transport.source.wait_for_termination()
+3 -3
View File
@@ -20,7 +20,7 @@ import json
import struct
import sys
import websockets.asyncio.server
import websockets
import bumble.logging
from bumble.core import (
@@ -425,7 +425,7 @@ deviceData = DeviceData()
async def keyboard_device(hid_device: HID_Device):
# Start a Websocket server to receive events from a web page
async def serve(websocket: websockets.asyncio.server.ServerConnection):
async def serve(websocket, _path):
global deviceData
while True:
try:
@@ -476,7 +476,7 @@ async def keyboard_device(hid_device: HID_Device):
pass
# pylint: disable-next=no-member
await websockets.asyncio.server.serve(serve, 'localhost', 8989)
await websockets.serve(serve, 'localhost', 8989)
await asyncio.get_event_loop().create_future()
+4 -4
View File
@@ -20,7 +20,7 @@ import json
import sys
from typing import Optional
import websockets.asyncio.server
import websockets
import bumble.logging
from bumble import data_types
@@ -101,7 +101,7 @@ async def main() -> None:
)
device.add_service(AudioStreamControlService(device, sink_ase_id=[1]))
ws: Optional[websockets.asyncio.server.ServerConnection] = None
ws: Optional[websockets.WebSocketServerProtocol] = None
mcp: Optional[MediaControlServiceProxy] = None
advertising_data = bytes(
@@ -162,7 +162,7 @@ async def main() -> None:
device.on('connection', on_connection)
async def serve(websocket: websockets.asyncio.server.ServerConnection):
async def serve(websocket: websockets.WebSocketServerProtocol, _path):
nonlocal ws
ws = websocket
async for message in websocket:
@@ -173,7 +173,7 @@ async def main() -> None:
)
ws = None
await websockets.asyncio.server.serve(serve, 'localhost', 8989)
await websockets.serve(serve, 'localhost', 8989)
await hci_transport.source.terminated
+4 -4
View File
@@ -21,7 +21,7 @@ import secrets
import sys
from typing import Optional
import websockets.asyncio.server
import websockets
import bumble.logging
from bumble import data_types
@@ -110,7 +110,7 @@ async def main() -> None:
vcs = VolumeControlService()
device.add_service(vcs)
ws: Optional[websockets.asyncio.server.ServerConnection] = None
ws: Optional[websockets.WebSocketServerProtocol] = None
def on_volume_state_change():
if ws:
@@ -152,7 +152,7 @@ async def main() -> None:
advertising_data=advertising_data,
)
async def serve(websocket: websockets.asyncio.server.ServerConnection):
async def serve(websocket: websockets.WebSocketServerProtocol, _path):
nonlocal ws
await websocket.send(
dumps_volume_state(vcs.volume_setting, vcs.muted, vcs.change_counter)
@@ -166,7 +166,7 @@ async def main() -> None:
await device.notify_subscribers(vcs.volume_state)
ws = None
await websockets.asyncio.server.serve(serve, 'localhost', 8989)
await websockets.serve(serve, 'localhost', 8989)
await hci_transport.source.terminated
+1 -1
View File
@@ -32,7 +32,7 @@ dependencies = [
"pyserial-asyncio >= 0.5; platform_system!='Emscripten'",
"pyserial >= 3.5; platform_system!='Emscripten'",
"pyusb >= 1.2; platform_system!='Emscripten'",
"websockets >= 15.0.1; platform_system!='Emscripten'",
"websockets == 13.1; platform_system!='Emscripten'",
]
[project.optional-dependencies]
+31 -20
View File
@@ -23,7 +23,18 @@ from typing import Awaitable
import pytest
from bumble import a2dp, avdtp
from bumble import a2dp
from bumble.avdtp import (
A2DP_SBC_CODEC_TYPE,
AVDTP_AUDIO_MEDIA_TYPE,
AVDTP_IDLE_STATE,
AVDTP_STREAMING_STATE,
AVDTP_TSEP_SNK,
Listener,
MediaCodecCapabilities,
MediaPacketPump,
Protocol,
)
from bumble.controller import Controller
from bumble.core import PhysicalTransport
from bumble.device import Device
@@ -124,9 +135,9 @@ async def test_self_connection():
# -----------------------------------------------------------------------------
def source_codec_capabilities():
return avdtp.MediaCodecCapabilities(
media_type=avdtp.MediaType.AUDIO,
media_codec_type=a2dp.CodecType.SBC,
return MediaCodecCapabilities(
media_type=AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=A2DP_SBC_CODEC_TYPE,
media_codec_information=a2dp.SbcMediaCodecInformation(
sampling_frequency=a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_44100,
channel_mode=a2dp.SbcMediaCodecInformation.ChannelMode.JOINT_STEREO,
@@ -141,9 +152,9 @@ def source_codec_capabilities():
# -----------------------------------------------------------------------------
def sink_codec_capabilities():
return avdtp.MediaCodecCapabilities(
media_type=avdtp.MediaType.AUDIO,
media_codec_type=a2dp.CodecType.SBC,
return MediaCodecCapabilities(
media_type=AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=A2DP_SBC_CODEC_TYPE,
media_codec_information=a2dp.SbcMediaCodecInformation(
sampling_frequency=a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_48000
| a2dp.SbcMediaCodecInformation.SamplingFrequency.SF_44100
@@ -190,7 +201,7 @@ async def test_source_sink_1():
sink.on('rtp_packet', on_rtp_packet)
# Create a listener to wait for AVDTP connections
listener = avdtp.Listener.for_device(two_devices.devices[1])
listener = Listener.for_device(two_devices.devices[1])
listener.on('connection', on_avdtp_connection)
async def make_connection():
@@ -203,13 +214,13 @@ async def test_source_sink_1():
return connections[0]
connection = await make_connection()
client = await avdtp.Protocol.connect(connection)
client = await Protocol.connect(connection)
endpoints = await client.discover_remote_endpoints()
assert len(endpoints) == 1
remote_sink = list(endpoints)[0]
assert remote_sink.in_use == 0
assert remote_sink.media_type == avdtp.MediaType.AUDIO
assert remote_sink.tsep == avdtp.StreamEndPointType.SNK
assert remote_sink.media_type == AVDTP_AUDIO_MEDIA_TYPE
assert remote_sink.tsep == AVDTP_TSEP_SNK
async def generate_packets(packet_count):
sequence_number = 0
@@ -228,24 +239,24 @@ async def test_source_sink_1():
rtp_packets_fully_received = asyncio.get_running_loop().create_future()
rtp_packets_expected = 3
rtp_packets = []
pump = avdtp.MediaPacketPump(generate_packets(3))
pump = MediaPacketPump(generate_packets(3))
source = client.add_source(source_codec_capabilities(), pump)
stream = await client.create_stream(source, remote_sink)
await stream.start()
assert stream.state == avdtp.State.STREAMING
assert stream.state == AVDTP_STREAMING_STATE
assert stream.local_endpoint.in_use == 1
assert stream.rtp_channel is not None
assert sink.in_use == 1
assert sink.stream is not None
assert sink.stream.state == avdtp.State.STREAMING
assert sink.stream.state == AVDTP_STREAMING_STATE
await rtp_packets_fully_received
await stream.close()
assert stream.rtp_channel is None
assert source.in_use == 0
assert source.stream.state == avdtp.State.IDLE
assert source.stream.state == AVDTP_IDLE_STATE
assert sink.in_use == 0
assert sink.stream.state == avdtp.State.IDLE
assert sink.stream.state == AVDTP_IDLE_STATE
# Send packets manually
rtp_packets_fully_received = asyncio.get_running_loop().create_future()
@@ -257,12 +268,12 @@ async def test_source_sink_1():
source = client.add_source(source_codec_capabilities(), None)
stream = await client.create_stream(source, remote_sink)
await stream.start()
assert stream.state == avdtp.State.STREAMING
assert stream.state == AVDTP_STREAMING_STATE
assert stream.local_endpoint.in_use == 1
assert stream.rtp_channel is not None
assert sink.in_use == 1
assert sink.stream is not None
assert sink.stream.state == avdtp.State.STREAMING
assert sink.stream.state == AVDTP_STREAMING_STATE
stream.send_media_packet(source_packets[0])
stream.send_media_packet(source_packets[1])
@@ -272,9 +283,9 @@ async def test_source_sink_1():
assert stream.rtp_channel is None
assert len(rtp_packets) == 3
assert source.in_use == 0
assert source.stream.state == avdtp.State.IDLE
assert source.stream.state == AVDTP_IDLE_STATE
assert sink.in_use == 0
assert sink.stream.state == avdtp.State.IDLE
assert sink.stream.state == AVDTP_IDLE_STATE
# -----------------------------------------------------------------------------
+36 -95
View File
@@ -15,108 +15,43 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import pytest
from bumble import avdtp
from bumble.a2dp import A2DP_SBC_CODEC_TYPE
from bumble.avdtp import (
AVDTP_AUDIO_MEDIA_TYPE,
AVDTP_DELAY_REPORTING_SERVICE_CATEGORY,
AVDTP_GET_CAPABILITIES,
AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY,
AVDTP_SET_CONFIGURATION,
Get_Capabilities_Response,
MediaCodecCapabilities,
Message,
ServiceCapabilities,
Set_Configuration_Command,
)
from bumble.rtp import MediaPacket
# -----------------------------------------------------------------------------
@pytest.mark.parametrize(
'message',
(
avdtp.Discover_Command(),
avdtp.Discover_Response(
endpoints=[
avdtp.EndPointInfo(
seid=1,
in_use=1,
media_type=avdtp.MediaType.AUDIO,
tsep=avdtp.StreamEndPointType.SNK,
)
]
def test_messages():
capabilities = [
ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY),
MediaCodecCapabilities(
media_type=AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=A2DP_SBC_CODEC_TYPE,
media_codec_information=bytes.fromhex('211502fa'),
),
avdtp.Get_Capabilities_Command(acp_seid=1),
avdtp.Get_Capabilities_Response(
capabilities=[
avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY),
avdtp.MediaCodecCapabilities(
media_type=avdtp.AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=A2DP_SBC_CODEC_TYPE,
media_codec_information=bytes.fromhex('211502fa'),
),
avdtp.ServiceCapabilities(avdtp.AVDTP_DELAY_REPORTING_SERVICE_CATEGORY),
]
),
avdtp.Get_Capabilities_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
avdtp.Get_All_Capabilities_Command(acp_seid=1),
avdtp.Get_All_Capabilities_Response(
capabilities=[
avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY)
]
),
avdtp.Get_All_Capabilities_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
avdtp.Set_Configuration_Command(
acp_seid=1,
int_seid=2,
capabilities=[
avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY)
],
),
avdtp.Set_Configuration_Response(),
avdtp.Set_Configuration_Reject(
service_category=avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY,
error_code=avdtp.AVDTP_UNSUPPORTED_CONFIGURATION_ERROR,
),
avdtp.Get_Configuration_Command(acp_seid=1),
avdtp.Get_Configuration_Response(
capabilities=[
avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY)
]
),
avdtp.Get_Configuration_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
avdtp.Reconfigure_Command(
acp_seid=1,
capabilities=[
avdtp.ServiceCapabilities(avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY)
],
),
avdtp.Reconfigure_Response(),
avdtp.Reconfigure_Reject(
service_category=avdtp.AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY,
error_code=avdtp.AVDTP_UNSUPPORTED_CONFIGURATION_ERROR,
),
avdtp.Open_Command(acp_seid=1),
avdtp.Open_Response(),
avdtp.Open_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
avdtp.Start_Command(acp_seids=[1, 2]),
avdtp.Start_Response(),
avdtp.Start_Reject(acp_seid=1, error_code=avdtp.AVDTP_BAD_STATE_ERROR),
avdtp.Close_Command(acp_seid=1),
avdtp.Close_Response(),
avdtp.Close_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
avdtp.Suspend_Command(acp_seids=[1, 2]),
avdtp.Suspend_Response(),
avdtp.Suspend_Reject(acp_seid=1, error_code=avdtp.AVDTP_BAD_STATE_ERROR),
avdtp.Abort_Command(acp_seid=1),
avdtp.Abort_Response(),
avdtp.Security_Control_Command(acp_seid=1, data=b'foo'),
avdtp.Security_Control_Response(),
avdtp.Security_Control_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
avdtp.General_Reject(),
avdtp.DelayReport_Command(acp_seid=1, delay=100),
avdtp.DelayReport_Response(),
avdtp.DelayReport_Reject(error_code=avdtp.AVDTP_BAD_ACP_SEID_ERROR),
),
)
def test_messages(message: avdtp.Message):
parsed = avdtp.Message.create(
signal_identifier=message.signal_identifier,
message_type=message.message_type,
payload=message.payload,
ServiceCapabilities(AVDTP_DELAY_REPORTING_SERVICE_CATEGORY),
]
message = Get_Capabilities_Response(capabilities)
parsed = Message.create(
AVDTP_GET_CAPABILITIES, Message.MessageType.RESPONSE_ACCEPT, message.payload
)
assert message.payload == parsed.payload
message = Set_Configuration_Command(3, 4, capabilities)
parsed = Message.create(
AVDTP_SET_CONFIGURATION, Message.MessageType.COMMAND, message.payload
)
assert message == parsed
assert message.payload == parsed.payload
@@ -127,3 +62,9 @@ def test_rtp():
)
media_packet = MediaPacket.from_bytes(packet)
print(media_packet)
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_messages()
test_rtp()