add broadcast code encoding

This commit is contained in:
Gilles Boccon-Gibod
2025-03-17 19:56:02 -04:00
parent b2d9541f8f
commit 5e55c0e358
3 changed files with 39 additions and 26 deletions

View File

@@ -99,6 +99,25 @@ def codec_config_string(
return '\n'.join(indent + line for line in lines)
def broadcast_code_bytes(broadcast_code: str) -> bytes:
"""
Convert a broadcast code string to a 16-byte value.
If `broadcast_code` is `0x` followed by 32 hex characters, it is interpreted as a
raw 16-byte raw broadcast code in big-endian byte order.
Otherwise, `broadcast_code` is converted to a 16-byte value as specified in
BLUETOOTH CORE SPECIFICATION Version 6.0 | Vol 3, Part C , section 3.2.6.3
"""
if broadcast_code.startswith("0x") and len(broadcast_code) == 34:
return bytes.fromhex(broadcast_code[2:])[::-1]
broadcast_code_utf8 = broadcast_code.encode("utf-8")
if len(broadcast_code_utf8) > 16:
raise ValueError("broadcast code must be <= 16 bytes in utf-8 encoding")
padding = bytes(16 - len(broadcast_code_utf8))
return broadcast_code_utf8 + padding
# -----------------------------------------------------------------------------
# Scan For Broadcasts
# -----------------------------------------------------------------------------
@@ -234,22 +253,14 @@ class BroadcastScanner(pyee.EventEmitter):
if self.biginfo:
print(color(' BIG:', 'cyan'))
print(
color(' Number of BIS:', 'magenta'),
self.biginfo.num_bis,
)
print(
color(' PHY: ', 'magenta'),
self.biginfo.phy.name,
)
print(
color(' Framed: ', 'magenta'),
self.biginfo.framed,
)
print(
color(' Encrypted: ', 'magenta'),
self.biginfo.encrypted,
)
print(color(' Number of BIS:', 'magenta'), self.biginfo.num_bis)
print(color(' ISO Interval: ', 'magenta'), self.biginfo.iso_interval)
print(color(' Max PDU: ', 'magenta'), self.biginfo.max_pdu)
print(color(' SDU Interval: ', 'magenta'), self.biginfo.sdu_interval)
print(color(' Max SDU: ', 'magenta'), self.biginfo.max_sdu)
print(color(' PHY: ', 'magenta'), self.biginfo.phy.name)
print(color(' Framed: ', 'magenta'), self.biginfo.framed)
print(color(' Encrypted: ', 'magenta'), self.biginfo.encrypted)
def on_sync_establishment(self) -> None:
self.emit('sync_establishment')
@@ -702,14 +713,13 @@ async def run_receive(
def on_change() -> None:
if (
broadcast.basic_audio_announcement
and not basic_audio_announcement_scanned.is_set()
):
broadcast.basic_audio_announcement and broadcast.biginfo
) and not basic_audio_announcement_scanned.is_set():
basic_audio_announcement_scanned.set()
broadcast.on('change', on_change)
if not broadcast.basic_audio_announcement:
print('Wait for Basic Audio Announcement...')
if not broadcast.basic_audio_announcement or not broadcast.biginfo:
print('Wait for Basic Audio Announcement and BIG Info...')
await basic_audio_announcement_scanned.wait()
print('Basic Audio Announcement found')
broadcast.print()
@@ -730,7 +740,7 @@ async def run_receive(
big_sync_timeout=0x4000,
bis=[bis.index for bis in subgroup.bis],
broadcast_code=(
bytes.fromhex(broadcast_code) if broadcast_code else None
broadcast_code_bytes(broadcast_code) if broadcast_code else None
),
),
)
@@ -944,7 +954,7 @@ async def run_transmit(
max_transport_latency=65,
rtn=4,
broadcast_code=(
bytes.fromhex(broadcast_code) if broadcast_code else None
broadcast_code_bytes(broadcast_code) if broadcast_code else None
),
),
)
@@ -1088,7 +1098,7 @@ def pair(ctx, transport, address):
'--broadcast-code',
metavar='BROADCAST_CODE',
type=str,
help='Broadcast encryption code in hex format',
help='Broadcast encryption code (string or raw hex format prefixed with 0x)',
)
@click.option(
'--sync-timeout',

View File

@@ -40,7 +40,7 @@ class PublicBroadcastAnnouncement:
def from_bytes(cls, data: bytes) -> Self:
features = cls.Features(data[0])
metadata_length = data[1]
metadata_ltv = data[1 : 1 + metadata_length]
metadata_ltv = data[2 : 2 + metadata_length]
return cls(
features=features, metadata=le_audio.Metadata.from_bytes(metadata_ltv)
)

View File

@@ -302,7 +302,10 @@ class ParserSource(BaseSource):
# -----------------------------------------------------------------------------
class StreamPacketSource(asyncio.Protocol, ParserSource):
def data_received(self, data: bytes) -> None:
self.parser.feed_data(data)
try:
self.parser.feed_data(data)
except core.InvalidPacketError:
logger.warning("invalid packet, ignoring data")
# -----------------------------------------------------------------------------