Improve Broadcast Scanning

This commit is contained in:
Josh Wu
2024-11-15 17:57:13 +08:00
parent 5a72eefb89
commit c88b32a406
7 changed files with 233 additions and 61 deletions

View File

@@ -60,7 +60,7 @@ AURACAST_DEFAULT_ATT_MTU = 256
class BroadcastScanner(pyee.EventEmitter):
@dataclasses.dataclass
class Broadcast(pyee.EventEmitter):
name: str
name: str | None
sync: bumble.device.PeriodicAdvertisingSync
rssi: int = 0
public_broadcast_announcement: Optional[
@@ -135,7 +135,8 @@ class BroadcastScanner(pyee.EventEmitter):
self.sync.advertiser_address,
color(self.sync.state.name, 'green'),
)
print(f' {color("Name", "cyan")}: {self.name}')
if self.name is not None:
print(f' {color("Name", "cyan")}: {self.name}')
if self.appearance:
print(f' {color("Appearance", "cyan")}: {str(self.appearance)}')
print(f' {color("RSSI", "cyan")}: {self.rssi}')
@@ -174,7 +175,7 @@ class BroadcastScanner(pyee.EventEmitter):
print(color(' Codec ID:', 'yellow'))
print(
color(' Coding Format: ', 'green'),
subgroup.codec_id.coding_format.name,
subgroup.codec_id.codec_id.name,
)
print(
color(' Company ID: ', 'green'),
@@ -274,13 +275,24 @@ class BroadcastScanner(pyee.EventEmitter):
await self.device.stop_scanning()
def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None:
if (
broadcast_name := advertisement.data.get(
bumble.core.AdvertisingData.BROADCAST_NAME
if not (
ads := advertisement.data.get_all(
bumble.core.AdvertisingData.SERVICE_DATA_16_BIT_UUID
)
) is None:
) or not (
any(
ad
for ad in ads
if isinstance(ad, tuple)
and ad[0] == bumble.gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
)
):
return
assert isinstance(broadcast_name, str)
broadcast_name = advertisement.data.get(
bumble.core.AdvertisingData.BROADCAST_NAME
)
assert isinstance(broadcast_name, str) or broadcast_name is None
if broadcast := self.broadcasts.get(advertisement.address):
broadcast.update(advertisement)
@@ -291,7 +303,7 @@ class BroadcastScanner(pyee.EventEmitter):
)
async def on_new_broadcast(
self, name: str, advertisement: bumble.device.Advertisement
self, name: str | None, advertisement: bumble.device.Advertisement
) -> None:
periodic_advertising_sync = await self.device.create_periodic_advertising_sync(
advertiser_address=advertisement.address,
@@ -299,10 +311,7 @@ class BroadcastScanner(pyee.EventEmitter):
sync_timeout=self.sync_timeout,
filter_duplicates=self.filter_duplicates,
)
broadcast = self.Broadcast(
name,
periodic_advertising_sync,
)
broadcast = self.Broadcast(name, periodic_advertising_sync)
broadcast.update(advertisement)
self.broadcasts[advertisement.address] = broadcast
periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast))

View File

@@ -486,7 +486,12 @@ class Speaker:
def on_pdu(pdu: HCI_IsoDataPacket, ase: ascs.AseStateMachine):
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or codec_config.frame_duration is None
or codec_config.audio_channel_allocation is None
):
return
pcm = decode(
codec_config.frame_duration.us,
codec_config.audio_channel_allocation.channel_count,
@@ -495,11 +500,17 @@ class Speaker:
self.device.abort_on('disconnection', self.ui_server.send_audio(pcm))
def on_ase_state_change(ase: ascs.AseStateMachine) -> None:
codec_config = ase.codec_specific_configuration
if ase.state == ascs.AseStateMachine.State.STREAMING:
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
assert ase.cis_link
if ase.role == ascs.AudioRole.SOURCE:
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or ase.cis_link is None
or codec_config.octets_per_codec_frame is None
or codec_config.frame_duration is None
or codec_config.codec_frames_per_sdu is None
):
return
ase.cis_link.abort_on(
'disconnection',
lc3_source_task(
@@ -514,10 +525,17 @@ class Speaker:
),
)
else:
if not ase.cis_link:
return
ase.cis_link.sink = functools.partial(on_pdu, ase=ase)
elif ase.state == ascs.AseStateMachine.State.CODEC_CONFIGURED:
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or codec_config.sampling_frequency is None
or codec_config.frame_duration is None
or codec_config.audio_channel_allocation is None
):
return
if ase.role == ascs.AudioRole.SOURCE:
setup_encoders(
codec_config.sampling_frequency.hz,

View File

@@ -1624,6 +1624,9 @@ class AdvertisingData:
[bytes([len(x[1]) + 1, x[0]]) + x[1] for x in self.ad_structures]
)
def to_bytes(self) -> bytes:
return bytes(self)
def to_string(self, separator=', '):
return separator.join(
[AdvertisingData.ad_data_to_string(x[0], x[1]) for x in self.ad_structures]

View File

@@ -1482,7 +1482,7 @@ class CodingFormat:
vendor_specific_codec_id: int = 0
@classmethod
def parse_from_bytes(cls, data: bytes, offset: int):
def parse_from_bytes(cls, data: bytes, offset: int) -> tuple[int, CodingFormat]:
(codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from(
'<BHH', data, offset
)
@@ -1492,6 +1492,10 @@ class CodingFormat:
vendor_specific_codec_id=vendor_specific_codec_id,
)
@classmethod
def from_bytes(cls, data: bytes) -> CodingFormat:
return cls.parse_from_bytes(data, 0)[1]
def to_bytes(self) -> bytes:
return struct.pack(
'<BHH', self.codec_id, self.company_id, self.vendor_specific_codec_id

View File

@@ -398,18 +398,21 @@ class CodecSpecificConfiguration:
OCTETS_PER_FRAME = 0x04
CODEC_FRAMES_PER_SDU = 0x05
sampling_frequency: SamplingFrequency
frame_duration: FrameDuration
audio_channel_allocation: AudioLocation
octets_per_codec_frame: int
codec_frames_per_sdu: int
sampling_frequency: SamplingFrequency | None = None
frame_duration: FrameDuration | None = None
audio_channel_allocation: AudioLocation | None = None
octets_per_codec_frame: int | None = None
codec_frames_per_sdu: int | None = None
@classmethod
def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration:
offset = 0
# Allowed default values.
audio_channel_allocation = AudioLocation.NOT_ALLOWED
codec_frames_per_sdu = 1
sampling_frequency: SamplingFrequency | None = None
frame_duration: FrameDuration | None = None
audio_channel_allocation: AudioLocation | None = None
octets_per_codec_frame: int | None = None
codec_frames_per_sdu: int | None = None
while offset < len(data):
length, type = struct.unpack_from('BB', data, offset)
offset += 2
@@ -427,8 +430,6 @@ class CodecSpecificConfiguration:
elif type == CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU:
codec_frames_per_sdu = value
# It is expected here that if some fields are missing, an error should be raised.
# pylint: disable=possibly-used-before-assignment,used-before-assignment
return CodecSpecificConfiguration(
sampling_frequency=sampling_frequency,
frame_duration=frame_duration,
@@ -438,23 +439,43 @@ class CodecSpecificConfiguration:
)
def __bytes__(self) -> bytes:
return struct.pack(
'<BBBBBBBBIBBHBBB',
2,
CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY,
self.sampling_frequency,
2,
CodecSpecificConfiguration.Type.FRAME_DURATION,
self.frame_duration,
5,
CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION,
self.audio_channel_allocation,
3,
CodecSpecificConfiguration.Type.OCTETS_PER_FRAME,
self.octets_per_codec_frame,
2,
CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU,
self.codec_frames_per_sdu,
return b''.join(
[
struct.pack(fmt, length, tag, value)
for fmt, length, tag, value in [
(
'<BBB',
2,
CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY,
self.sampling_frequency,
),
(
'<BBB',
2,
CodecSpecificConfiguration.Type.FRAME_DURATION,
self.frame_duration,
),
(
'<BBI',
5,
CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION,
self.audio_channel_allocation,
),
(
'<BBH',
3,
CodecSpecificConfiguration.Type.OCTETS_PER_FRAME,
self.octets_per_codec_frame,
),
(
'<BBB',
2,
CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU,
self.codec_frames_per_sdu,
),
]
if value is not None
]
)
@@ -466,6 +487,25 @@ class BroadcastAudioAnnouncement:
def from_bytes(cls, data: bytes) -> Self:
return cls(int.from_bytes(data[:3], 'little'))
def to_bytes(self) -> bytes:
return self.broadcast_id.to_bytes(3, 'little')
def __bytes__(self) -> bytes:
return self.to_bytes()
def get_advertising_data(self) -> bytes:
return core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE.to_bytes()
+ self.to_bytes()
),
)
]
).to_bytes()
@dataclasses.dataclass
class BasicAudioAnnouncement:
@@ -474,26 +514,43 @@ class BasicAudioAnnouncement:
index: int
codec_specific_configuration: CodecSpecificConfiguration
@dataclasses.dataclass
class CodecInfo:
coding_format: hci.CodecID
company_id: int
vendor_specific_codec_id: int
def to_bytes(self) -> bytes:
codec_specific_configuration_bytes = bytes(
self.codec_specific_configuration
)
return (
bytes([self.index, len(codec_specific_configuration_bytes)])
+ codec_specific_configuration_bytes
)
@classmethod
def from_bytes(cls, data: bytes) -> Self:
coding_format = hci.CodecID(data[0])
company_id = int.from_bytes(data[1:3], 'little')
vendor_specific_codec_id = int.from_bytes(data[3:5], 'little')
return cls(coding_format, company_id, vendor_specific_codec_id)
def __bytes__(self) -> bytes:
return self.to_bytes()
@dataclasses.dataclass
class Subgroup:
codec_id: BasicAudioAnnouncement.CodecInfo
codec_id: hci.CodingFormat
codec_specific_configuration: CodecSpecificConfiguration
metadata: le_audio.Metadata
bis: List[BasicAudioAnnouncement.BIS]
def to_bytes(self) -> bytes:
metadata_bytes = bytes(self.metadata)
codec_specific_configuration_bytes = bytes(
self.codec_specific_configuration
)
return (
bytes([len(self.bis)])
+ self.codec_id.to_bytes()
+ bytes([len(codec_specific_configuration_bytes)])
+ codec_specific_configuration_bytes
+ bytes([len(metadata_bytes)])
+ metadata_bytes
+ b''.join(map(bytes, self.bis))
)
def __bytes__(self) -> bytes:
return self.to_bytes()
presentation_delay: int
subgroups: List[BasicAudioAnnouncement.Subgroup]
@@ -505,7 +562,7 @@ class BasicAudioAnnouncement:
for _ in range(data[3]):
num_bis = data[offset]
offset += 1
codec_id = cls.CodecInfo.from_bytes(data[offset : offset + 5])
codec_id = hci.CodingFormat.from_bytes(data[offset : offset + 5])
offset += 5
codec_specific_configuration_length = data[offset]
offset += 1
@@ -549,3 +606,26 @@ class BasicAudioAnnouncement:
)
return cls(presentation_delay, subgroups)
def to_bytes(self) -> bytes:
return (
self.presentation_delay.to_bytes(3, 'little')
+ bytes([len(self.subgroups)])
+ b''.join(map(bytes, self.subgroups))
)
def __bytes__(self) -> bytes:
return self.to_bytes()
def get_advertising_data(self) -> bytes:
return core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE.to_bytes()
+ self.to_bytes()
),
)
]
).to_bytes()

View File

@@ -161,7 +161,13 @@ async def main() -> None:
else:
file_output = open(f'{datetime.datetime.now().isoformat()}.lc3', 'wb')
codec_configuration = ase.codec_specific_configuration
assert isinstance(codec_configuration, CodecSpecificConfiguration)
if (
not isinstance(codec_configuration, CodecSpecificConfiguration)
or codec_configuration.sampling_frequency is None
or codec_configuration.audio_channel_allocation is None
or codec_configuration.frame_duration is None
):
return
# Write a LC3 header.
file_output.write(
bytes([0x1C, 0xCC]) # Header.

View File

@@ -39,6 +39,8 @@ from bumble.profiles.ascs import (
)
from bumble.profiles.bap import (
AudioLocation,
BasicAudioAnnouncement,
BroadcastAudioAnnouncement,
SupportedFrameDuration,
SupportedSamplingFrequency,
SamplingFrequency,
@@ -200,6 +202,56 @@ def test_codec_specific_configuration() -> None:
assert CodecSpecificConfiguration.from_bytes(bytes(config)) == config
# -----------------------------------------------------------------------------
def test_broadcast_audio_announcement() -> None:
broadcast_audio_announcement = BroadcastAudioAnnouncement(123456)
assert (
BroadcastAudioAnnouncement.from_bytes(bytes(broadcast_audio_announcement))
== broadcast_audio_announcement
)
# -----------------------------------------------------------------------------
def test_basic_audio_announcement() -> None:
basic_audio_announcement = BasicAudioAnnouncement(
presentation_delay=40000,
subgroups=[
BasicAudioAnnouncement.Subgroup(
codec_id=CodingFormat(codec_id=CodecID.LC3),
codec_specific_configuration=CodecSpecificConfiguration(
sampling_frequency=SamplingFrequency.FREQ_48000,
frame_duration=FrameDuration.DURATION_10000_US,
octets_per_codec_frame=100,
),
metadata=Metadata(
[
Metadata.Entry(tag=Metadata.Tag.LANGUAGE, data=b'eng'),
Metadata.Entry(tag=Metadata.Tag.PROGRAM_INFO, data=b'Disco'),
]
),
bis=[
BasicAudioAnnouncement.BIS(
index=0,
codec_specific_configuration=CodecSpecificConfiguration(
audio_channel_allocation=AudioLocation.FRONT_LEFT
),
),
BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=CodecSpecificConfiguration(
audio_channel_allocation=AudioLocation.FRONT_RIGHT
),
),
],
)
],
)
assert (
BasicAudioAnnouncement.from_bytes(bytes(basic_audio_announcement))
== basic_audio_announcement
)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_pacs():