Compare commits

..

10 Commits

Author SHA1 Message Date
pstruebi 7e4948d9ef add small asrc example 2025-10-06 11:04:06 +02:00
zxzxwu 32d448edf3 Merge pull request #790 from markusjellitsch/task/fix-cis-reconnect
Fix - Allow re-creation of CIS link when not successfull
2025-09-26 19:55:49 +08:00
markus 3d615b13ce fix accessing pending_cis dict 2025-09-26 12:38:38 +02:00
Markus Jellitsch 1ad92dc759 Update bumble/device.py
Co-authored-by: zxzxwu <92432172+zxzxwu@users.noreply.github.com>
2025-09-26 12:25:50 +02:00
markus aacfd4328c satisfy the linter, return None 2025-09-26 12:02:54 +02:00
markus 6aa1f5211c use local cis_link.handle to the pop the dict 2025-09-26 11:13:52 +02:00
markus df8e454ee5 pop cis link only when cis created successfully 2025-09-26 10:58:37 +02:00
Gilles Boccon-Gibod aec50ac616 Merge pull request #789 from google/gbg/nrf-uart-flow-control 2025-09-26 09:34:33 +02:00
zxzxwu 6e6b4cd4b2 Merge pull request #773 from wescande/main
HAP: wait for MTU to process reconnection event
2025-09-26 01:36:45 +08:00
William Escande 8a5f6a61d5 HAP: wait for MTU to process reconnection event
When HAP reconnect, it sends indication of all events that happen during
the disconnection.
But it should wait for the profile to be ready and for the MTU to have
been negotiated or else the remote may not be ready yet.

As a side effect of this, the current GattServer doesn't re-populate the
handle of subscriber during a reconnection, we have to bypass this check
to send the notification
2025-09-16 16:18:16 -07:00
6 changed files with 384 additions and 7 deletions
+3 -1
View File
@@ -41,6 +41,7 @@ import bumble.transport
import bumble.utils
from bumble import company_ids, core, data_types, gatt, hci
from bumble.audio import io as audio_io
from bumble.audio import io_asrc as audio_io_asrc
from bumble.colors import color
from bumble.profiles import bap, bass, le_audio, pbp
@@ -891,7 +892,8 @@ async def run_transmit(
print('Start Periodic Advertising')
await advertising_set.start_periodic()
audio_input = await audio_io.create_audio_input(input, input_format)
#audio_input = await audio_io.create_audio_input(input, input_format)
audio_input = audio_io_asrc.SoundDeviceAudioInputAsrc(input[7:], input_format)
pcm_format = await audio_input.open()
# This try should be replaced with contextlib.aclosing() when python 3.9 is no
# longer needed.
+339
View File
@@ -0,0 +1,339 @@
# Copyright 2025
#
# Drop-in replacement for `SoundDeviceAudioInput` that adds a tiny ASRC stage.
#
# Constraints per request:
# - Only import io_bumble.py at module level.
# - Reuse the ASRC functionality from asrc.py conceptually (PI control + FIFO +
# linear/sinc resampling behavior). We implement a minimal, dependency-free
# variant (linear interpolation with a small PI loop) so this module does not
# import anything else at top-level.
#
# Notes:
# - Input stream is captured via sounddevice (imported lazily inside methods).
# - Input is mono float32 for simplicity; output matches the original class
# signature: INT16, stereo, at the same nominal sample rate as requested.
from .io import PcmFormat, ThreadedAudioInput, logger # only top-level import
class SoundDeviceAudioInputAsrc(ThreadedAudioInput):
"""Sound device audio input with a simple ASRC stage.
Interface-compatible with `io_bumble.SoundDeviceAudioInput`:
- __init__(device_name: str, pcm_format: PcmFormat)
- _open() -> PcmFormat
- _read(frame_size: int) -> bytes
- _close() -> None
Behavior:
- Captures mono float32 frames from the device.
- Buffers into an internal ring buffer.
- Produces stereo INT16 frames using a linear-interp resampler whose
ratio is adjusted by a tiny PI loop to hold FIFO depth near a target.
"""
def __init__(self, device_name: str, pcm_format: str) -> None:
super().__init__()
# Device & format
self._device = int(device_name) if device_name else None
pcm_format: PcmFormat | None
if pcm_format == 'auto':
pcm_format = None
else:
pcm_format = PcmFormat.from_str(pcm_format)
self._pcm_format_in = pcm_format
# We always output stereo INT16 at the same nominal sample rate.
self._pcm_format_out = PcmFormat(
PcmFormat.Endianness.LITTLE,
PcmFormat.SampleType.INT16,
pcm_format.sample_rate,
2,
)
# sounddevice stream (created in _open)
self._stream = None # type: ignore[assignment]
# --- ASRC state (inspired by asrc.py) ---
# Nominal input/output rate ratio
self._r = 1.0
self._integral = 0.0
self._phi = 0.0 # fractional read position within current chunk
# PI gains (tiny to avoid warble)
self._Kp = 2e-6
self._Ki = 5e-8
self._R0 = 1.0
# Target FIFO level and deadband (≈10 ms target, 0.5 ms deadband)
fs = float(self._pcm_format_in.sample_rate)
self._target_samples = max(1, int(0.010 * fs))
self._deadband = max(1, int(0.0005 * fs))
# Ring buffer for mono float32 samples
# Capacity ~2 seconds for headroom
self._rb_cap = max(self._target_samples * 32, int(2 * fs))
self._rb = None # created in _init_rb()
self._ridx = 0
self._size = 0
self._lock = None # created in _init_rb()
self._init_rb()
# Light logging timer
self._last_log = 0.0
# Streaming resampler and internal output buffer (lazy init)
self._rs = None # samplerate.Resampler
self._out_buf = None # numpy.ndarray float32
# ---------------- Internal helpers -----------------
def _init_rb(self) -> None:
# Lazy import standard libs to keep only io_bumble imported at top level
import threading
from array import array
self._rb = array('f', [0.0] * self._rb_cap) # float32 ring buffer
self._lock = threading.Lock()
self._ridx = 0
self._size = 0
def _fifo_len(self) -> int:
with self._lock:
return self._size
def _fifo_write(self, x_f32) -> None:
# x_f32: 1-D float32-like iterable
k = len(x_f32)
if k <= 0:
return
rb = self._rb
if rb is None:
return
with self._lock:
# Trim if larger than capacity: keep last N
if k >= self._rb_cap:
x_f32 = x_f32[-self._rb_cap:]
k = self._rb_cap
# Make room on overflow (drop oldest)
excess = max(0, self._size + k - self._rb_cap)
if excess:
self._ridx = (self._ridx + excess) % self._rb_cap
self._size -= excess
# Write at tail position
wpos = (self._ridx + self._size) % self._rb_cap
first = min(k, self._rb_cap - wpos)
# Write first chunk
from array import array as _array # lazy import
rb[wpos:wpos + first] = _array('f', x_f32[:first])
# Wrap if needed
second = k - first
if second:
rb[0:second] = _array('f', x_f32[first:])
self._size += k
def _fifo_peek_array(self, n: int):
# Returns a Python list[float] copy of up to n samples
rb = self._rb
if rb is None:
return []
m = max(0, min(n, self._fifo_len()))
if m <= 0:
return []
pos = self._ridx
first = min(m, self._rb_cap - pos)
# Copy out
out = [0.0] * m
# First chunk
out[:first] = rb[pos:pos + first]
# Second chunk if wrap
second = m - first
if second > 0:
out[first:] = rb[0:second]
return out
def _fifo_discard(self, n: int) -> None:
with self._lock:
d = max(0, min(n, self._size))
self._ridx = (self._ridx + d) % self._rb_cap
self._size -= d
def _update_ratio(self) -> None:
# PI loop to hold buffer near target
e = self._target_samples - self._fifo_len()
if -self._deadband <= e <= self._deadband:
e = 0.0
cand_integral = self._integral + e
r_unclamped = self._R0 * (1.0 + self._Kp * e + self._Ki * cand_integral)
# Limit to ±1000 ppm vs nominal
ppm_unclamped = 1e6 * (r_unclamped / self._R0 - 1.0)
saturated_high = ppm_unclamped > 1000.0
saturated_low = ppm_unclamped < -1000.0
if saturated_high:
self._r = self._R0 * (1 + 1000e-6)
if e <= 0:
self._integral = cand_integral
self._integral *= 0.99
elif saturated_low:
self._r = self._R0 * (1 - 1000e-6)
if e >= 0:
self._integral = cand_integral
self._integral *= 0.99
else:
self._integral = cand_integral
self._r = r_unclamped
# Occasional log
try:
import time as _time
now = _time.time()
if now - self._last_log > 1.0:
buf_ms = 1000.0 * self._fifo_len() / float(self._pcm_format_in.sample_rate)
print(
f"\nASRC buf={buf_ms:5.1f} ms r={self._r:.9f} corr={1e6 * (self._r / self._R0 - 1.0):+7.1f} ppm"
)
self._last_log = now
except Exception:
# Logging must never break audio
pass
def _process(self, n_out: int) -> list[float]:
# Accumulate at least n_out samples using samplerate.Resampler
if n_out <= 0:
return []
# Lazy imports
import numpy as np # type: ignore
# Lazy init output buffer
if self._out_buf is None:
self._out_buf = np.zeros(0, dtype=np.float32)
# Choose chunk so we don't take too much from FIFO each time
max_chunk = max(256, int(np.ceil(n_out / max(1e-9, self._r))))
safety_iters = 0
while self._out_buf.size < n_out and safety_iters < 16:
safety_iters += 1
available = self._fifo_len()
if available <= 0:
break
take = min(available, max_chunk)
x = self._fifo_peek_array(take)
self._fifo_discard(take)
if not x:
break
x_arr = np.asarray(x, dtype=np.float32)
if self._rs is not None:
try:
y = self._rs.process(x_arr, ratio=float(self._r), end_of_input=False)
except Exception:
logger.exception("ASRC resampler error")
y = None
else:
y = None
if y is not None and getattr(y, 'size', 0):
y = y.astype(np.float32, copy=False)
if self._out_buf.size == 0:
self._out_buf = y
else:
self._out_buf = np.concatenate((self._out_buf, y))
if self._out_buf.size >= n_out:
out = self._out_buf[:n_out]
self._out_buf = self._out_buf[n_out:]
return out.tolist()
else:
# Not enough data produced; pad with zeros
out = np.zeros(n_out, dtype=np.float32)
if self._out_buf.size:
out[: self._out_buf.size] = self._out_buf
self._out_buf = np.zeros(0, dtype=np.float32)
return out.tolist()
def _mono_to_stereo_int16_bytes(self, mono_f32: list[float]) -> bytes:
# Convert [-1,1] float list to stereo int16 little-endian bytes
import struct
ba = bytearray()
for v in mono_f32:
# clip
if v > 1.0:
v = 1.0
elif v < -1.0:
v = -1.0
i16 = int(v * 32767.0)
ba += struct.pack('<hh', i16, i16)
return bytes(ba)
# ---------------- ThreadedAudioInput hooks -----------------
def _open(self) -> PcmFormat:
# Set up sounddevice RawInputStream (int16) and start callback producer
import sounddevice # pylint: disable=import-error
import math
import samplerate as sr # type: ignore
# We capture mono regardless of requested channels, then output stereo.
channels = 1
samplerate = int(self._pcm_format_in.sample_rate)
def _callback(indata, frames, time_info, status): # noqa: ARG001 (signature is fixed)
# indata: raw int16 bytes-like buffer of shape (frames, channels)
try:
if status:
logger.warning("Input status: %s", status)
if frames <= 0:
return
# Interpret raw bytes as little-endian int16 mono
mv = memoryview(indata).cast('h') # len == frames * channels
# Convert to float in [-1, 1]
# Avoid division errors; protect NaN/Inf
mono = []
for i in range(frames):
v = mv[i]
f = float(v) / 32768.0
if not (f == f) or math.isinf(f):
f = 0.0
mono.append(f)
self._fifo_write(mono)
except Exception: # never let callback raise
logger.exception("Audio input callback error")
# Create streaming resampler (mono)
try:
self._rs = sr.Resampler(converter_type="sinc_fastest", channels=1)
except Exception:
logger.exception("Failed to create samplerate.Resampler; audio may be silent")
self._rs = None
self._stream = sounddevice.RawInputStream(
samplerate=samplerate,
device=self._device,
channels=channels,
dtype='int16',
callback=_callback,
)
self._stream.start()
return self._pcm_format_out
def _read(self, frame_size: int) -> bytes:
# Produce 'frame_size' output frames (stereo INT16)
if frame_size <= 0:
return b''
# Update resampling ratio based on FIFO level
try:
self._update_ratio()
except Exception:
# keep going even if update failed
pass
# Process mono float32
mono = self._process(frame_size)
# Convert to stereo int16 LE bytes
return self._mono_to_stereo_int16_bytes(mono)
def _close(self) -> None:
try:
if self._stream is not None:
self._stream.stop()
self._stream.close()
except Exception:
logger.exception('Error closing input stream')
finally:
self._stream = None
+2 -1
View File
@@ -4727,7 +4727,7 @@ class Device(utils.CompositeEventEmitter):
self, cis_acl_pairs: Sequence[tuple[int, Connection]]
) -> list[CisLink]:
for cis_handle, acl_connection in cis_acl_pairs:
cis_id, cig_id = self._pending_cis.pop(cis_handle)
cis_id, cig_id = self._pending_cis[cis_handle]
self.cis_links[cis_handle] = CisLink(
device=self,
acl_connection=acl_connection,
@@ -4743,6 +4743,7 @@ class Device(utils.CompositeEventEmitter):
}
def on_cis_establishment(cis_link: CisLink) -> None:
self._pending_cis.pop(cis_link.handle)
if pending_future := pending_cis_establishments.get(cis_link.handle):
pending_future.set_result(cis_link)
+33 -4
View File
@@ -273,12 +273,19 @@ class HearingAccessService(gatt.TemplateService):
def on_disconnection(_reason) -> None:
self.currently_connected_clients.discard(connection)
@connection.on(connection.EVENT_CONNECTION_ATT_MTU_UPDATE)
def on_mtu_update(*_: Any) -> None:
self.on_incoming_connection(connection)
@connection.on(connection.EVENT_CONNECTION_ENCRYPTION_CHANGE)
def on_encryption_change(*_: Any) -> None:
self.on_incoming_connection(connection)
@connection.on(connection.EVENT_PAIRING)
def on_pairing(*_: Any) -> None:
self.on_incoming_paired_connection(connection)
self.on_incoming_connection(connection)
if connection.peer_resolvable_address:
self.on_incoming_paired_connection(connection)
self.on_incoming_connection(connection)
self.hearing_aid_features_characteristic = gatt.Characteristic(
uuid=gatt.GATT_HEARING_AID_FEATURES_CHARACTERISTIC,
@@ -315,9 +322,30 @@ class HearingAccessService(gatt.TemplateService):
]
)
def on_incoming_paired_connection(self, connection: Connection):
def on_incoming_connection(self, connection: Connection):
'''Setup initial operations to handle a remote bonded HAP device'''
# TODO Should we filter on HAP device only ?
if not connection.is_encrypted:
logging.debug(f'HAS: {connection.peer_address} is not encrypted')
return
if not connection.peer_resolvable_address:
logging.debug(f'HAS: {connection.peer_address} is not paired')
return
if connection.att_mtu < 49:
logging.debug(
f'HAS: {connection.peer_address} invalid MTU={connection.att_mtu}'
)
return
if connection.peer_address in self.currently_connected_clients:
logging.debug(
f'HAS: Already connected to {connection.peer_address} nothing to do'
)
return
self.currently_connected_clients.add(connection)
if (
connection.peer_address
@@ -457,6 +485,7 @@ class HearingAccessService(gatt.TemplateService):
connection,
self.hearing_aid_preset_control_point,
value=op_list[0].to_bytes(len(op_list) == 1),
force=True, # TODO GATT notification subscription should be persistent
)
# Remove item once sent, and keep the non sent item in the list
op_list.pop(0)
+4
View File
@@ -82,3 +82,7 @@ services and characteristics.
# `run_scanner.py`
Run a host application connected to a 'real' BLE controller over a UART HCI to a dev board running Zephyr in HCI mode (could be any other UART BLE controller, or BlueZ over a virtual UART), that starts scanning and prints out the scan results.
# run auracast with usb stick
bumble-auracast transmit 'serial:/dev/serial/by-id/usb-ZEPHYR_Zephyr_HCI_UART_sample_CC69A2912F84AE5E-if00,1000000,rtscts' --input device
+3 -1
View File
@@ -82,7 +82,6 @@ async def hap_client():
)
await devices.setup_connection()
# TODO negotiate MTU > 49 to not truncate preset names
# Mock encryption.
devices.connections[0].encryption = 1 # type: ignore
@@ -93,6 +92,9 @@ async def hap_client():
)
peer = device.Peer(devices.connections[1]) # type: ignore
await peer.request_mtu(49)
peer2 = device.Peer(devices.connections[0]) # type: ignore
await peer2.request_mtu(49)
hap_client = await peer.discover_service_and_create_proxy(
hap.HearingAccessServiceProxy
)