refactor broadcast_source
This commit is contained in:
@@ -23,8 +23,6 @@ import dataclasses
|
||||
import functools
|
||||
import logging
|
||||
import os
|
||||
import wave
|
||||
import itertools
|
||||
from typing import cast, Any, AsyncGenerator, Coroutine, Dict, Optional, Tuple
|
||||
from scipy import signal
|
||||
import scipy.io.wavfile as wav
|
||||
@@ -64,328 +62,16 @@ AURACAST_DEFAULT_DEVICE_NAME = 'Bumble Auracast'
|
||||
AURACAST_DEFAULT_DEVICE_ADDRESS = hci.Address('F0:F1:F2:F3:F4:F5')
|
||||
AURACAST_DEFAULT_SYNC_TIMEOUT = 5.0
|
||||
AURACAST_DEFAULT_ATT_MTU = 256
|
||||
iso_index:int = 0
|
||||
|
||||
AURACAST_DEFAULT_SAMPLING_FREQUENCY = 48000
|
||||
AURACAST_DEFAULT_FRAME_DURATION = 10000
|
||||
iso_index: int = 0
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Scan For Broadcasts
|
||||
# -----------------------------------------------------------------------------
|
||||
class BroadcastScanner(pyee.EventEmitter):
|
||||
@dataclasses.dataclass
|
||||
class Broadcast(pyee.EventEmitter):
|
||||
name: str | None
|
||||
sync: bumble.device.PeriodicAdvertisingSync
|
||||
broadcast_id: int
|
||||
rssi: int = 0
|
||||
public_broadcast_announcement: Optional[pbp.PublicBroadcastAnnouncement] = None
|
||||
broadcast_audio_announcement: Optional[bap.BroadcastAudioAnnouncement] = None
|
||||
basic_audio_announcement: Optional[bap.BasicAudioAnnouncement] = None
|
||||
appearance: Optional[core.Appearance] = None
|
||||
biginfo: Optional[bumble.device.BIGInfoAdvertisement] = None
|
||||
manufacturer_data: Optional[Tuple[str, bytes]] = None
|
||||
|
||||
def __post_init__(self) -> None:
|
||||
super().__init__()
|
||||
self.sync.on('establishment', self.on_sync_establishment)
|
||||
self.sync.on('loss', self.on_sync_loss)
|
||||
self.sync.on('periodic_advertisement', self.on_periodic_advertisement)
|
||||
self.sync.on('biginfo_advertisement', self.on_biginfo_advertisement)
|
||||
|
||||
def update(self, advertisement: bumble.device.Advertisement) -> None:
|
||||
self.rssi = advertisement.rssi
|
||||
for service_data in advertisement.data.get_all(
|
||||
core.AdvertisingData.SERVICE_DATA
|
||||
):
|
||||
assert isinstance(service_data, tuple)
|
||||
service_uuid, data = service_data
|
||||
assert isinstance(data, bytes)
|
||||
|
||||
if service_uuid == gatt.GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE:
|
||||
self.public_broadcast_announcement = (
|
||||
pbp.PublicBroadcastAnnouncement.from_bytes(data)
|
||||
)
|
||||
continue
|
||||
|
||||
if service_uuid == gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE:
|
||||
self.broadcast_audio_announcement = (
|
||||
bap.BroadcastAudioAnnouncement.from_bytes(data)
|
||||
)
|
||||
continue
|
||||
|
||||
self.appearance = advertisement.data.get( # type: ignore[assignment]
|
||||
core.AdvertisingData.APPEARANCE
|
||||
)
|
||||
|
||||
if manufacturer_data := advertisement.data.get(
|
||||
core.AdvertisingData.MANUFACTURER_SPECIFIC_DATA
|
||||
):
|
||||
assert isinstance(manufacturer_data, tuple)
|
||||
company_id = cast(int, manufacturer_data[0])
|
||||
data = cast(bytes, manufacturer_data[1])
|
||||
self.manufacturer_data = (
|
||||
company_ids.COMPANY_IDENTIFIERS.get(
|
||||
company_id, f'0x{company_id:04X}'
|
||||
),
|
||||
data,
|
||||
)
|
||||
|
||||
self.emit('update')
|
||||
|
||||
def print(self) -> None:
|
||||
print(
|
||||
color('Broadcast:', 'yellow'),
|
||||
self.sync.advertiser_address,
|
||||
color(self.sync.state.name, 'green'),
|
||||
)
|
||||
if self.name is not None:
|
||||
print(f' {color("Name", "cyan")}: {self.name}')
|
||||
if self.appearance:
|
||||
print(f' {color("Appearance", "cyan")}: {str(self.appearance)}')
|
||||
print(f' {color("RSSI", "cyan")}: {self.rssi}')
|
||||
print(f' {color("SID", "cyan")}: {self.sync.sid}')
|
||||
|
||||
if self.manufacturer_data:
|
||||
print(
|
||||
f' {color("Manufacturer Data", "cyan")}: '
|
||||
f'{self.manufacturer_data[0]} -> {self.manufacturer_data[1].hex()}'
|
||||
)
|
||||
|
||||
if self.broadcast_audio_announcement:
|
||||
print(
|
||||
f' {color("Broadcast ID", "cyan")}: '
|
||||
f'{self.broadcast_audio_announcement.broadcast_id}'
|
||||
)
|
||||
|
||||
if self.public_broadcast_announcement:
|
||||
print(
|
||||
f' {color("Features", "cyan")}: '
|
||||
f'{self.public_broadcast_announcement.features}'
|
||||
)
|
||||
print(
|
||||
f' {color("Metadata", "cyan")}: '
|
||||
f'{self.public_broadcast_announcement.metadata}'
|
||||
)
|
||||
|
||||
if self.basic_audio_announcement:
|
||||
print(color(' Audio:', 'cyan'))
|
||||
print(
|
||||
color(' Presentation Delay:', 'magenta'),
|
||||
self.basic_audio_announcement.presentation_delay,
|
||||
)
|
||||
for subgroup in self.basic_audio_announcement.subgroups:
|
||||
print(color(' Subgroup:', 'magenta'))
|
||||
print(color(' Codec ID:', 'yellow'))
|
||||
print(
|
||||
color(' Coding Format: ', 'green'),
|
||||
subgroup.codec_id.codec_id.name,
|
||||
)
|
||||
print(
|
||||
color(' Company ID: ', 'green'),
|
||||
subgroup.codec_id.company_id,
|
||||
)
|
||||
print(
|
||||
color(' Vendor Specific Codec ID:', 'green'),
|
||||
subgroup.codec_id.vendor_specific_codec_id,
|
||||
)
|
||||
print(
|
||||
color(' Codec Config:', 'yellow'),
|
||||
subgroup.codec_specific_configuration,
|
||||
)
|
||||
print(color(' Metadata: ', 'yellow'), subgroup.metadata)
|
||||
|
||||
for bis in subgroup.bis:
|
||||
print(color(f' BIS [{bis.index}]:', 'yellow'))
|
||||
print(
|
||||
color(' Codec Config:', 'green'),
|
||||
bis.codec_specific_configuration,
|
||||
)
|
||||
|
||||
if self.biginfo:
|
||||
print(color(' BIG:', 'cyan'))
|
||||
print(
|
||||
color(' Number of BIS:', 'magenta'),
|
||||
self.biginfo.num_bis,
|
||||
)
|
||||
print(
|
||||
color(' PHY: ', 'magenta'),
|
||||
self.biginfo.phy.name,
|
||||
)
|
||||
print(
|
||||
color(' Framed: ', 'magenta'),
|
||||
self.biginfo.framed,
|
||||
)
|
||||
print(
|
||||
color(' Encrypted: ', 'magenta'),
|
||||
self.biginfo.encrypted,
|
||||
)
|
||||
|
||||
def on_sync_establishment(self) -> None:
|
||||
self.emit('sync_establishment')
|
||||
|
||||
def on_sync_loss(self) -> None:
|
||||
self.basic_audio_announcement = None
|
||||
self.biginfo = None
|
||||
self.emit('sync_loss')
|
||||
|
||||
def on_periodic_advertisement(
|
||||
self, advertisement: bumble.device.PeriodicAdvertisement
|
||||
) -> None:
|
||||
if advertisement.data is None:
|
||||
return
|
||||
|
||||
for service_data in advertisement.data.get_all(
|
||||
core.AdvertisingData.SERVICE_DATA
|
||||
):
|
||||
assert isinstance(service_data, tuple)
|
||||
service_uuid, data = service_data
|
||||
assert isinstance(data, bytes)
|
||||
|
||||
if service_uuid == gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE:
|
||||
self.basic_audio_announcement = (
|
||||
bap.BasicAudioAnnouncement.from_bytes(data)
|
||||
)
|
||||
break
|
||||
|
||||
self.emit('change')
|
||||
|
||||
def on_biginfo_advertisement(
|
||||
self, advertisement: bumble.device.BIGInfoAdvertisement
|
||||
) -> None:
|
||||
self.biginfo = advertisement
|
||||
self.emit('change')
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
device: bumble.device.Device,
|
||||
filter_duplicates: bool,
|
||||
sync_timeout: float,
|
||||
):
|
||||
super().__init__()
|
||||
self.device = device
|
||||
self.filter_duplicates = filter_duplicates
|
||||
self.sync_timeout = sync_timeout
|
||||
self.broadcasts = dict[hci.Address, BroadcastScanner.Broadcast]()
|
||||
device.on('advertisement', self.on_advertisement)
|
||||
|
||||
async def start(self) -> None:
|
||||
await self.device.start_scanning(
|
||||
active=False,
|
||||
filter_duplicates=False,
|
||||
)
|
||||
|
||||
async def stop(self) -> None:
|
||||
await self.device.stop_scanning()
|
||||
|
||||
def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None:
|
||||
if not (
|
||||
ads := advertisement.data.get_all(
|
||||
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID
|
||||
)
|
||||
) or not (
|
||||
broadcast_audio_announcement := next(
|
||||
(
|
||||
ad
|
||||
for ad in ads
|
||||
if isinstance(ad, tuple)
|
||||
and ad[0] == gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
|
||||
),
|
||||
None,
|
||||
)
|
||||
):
|
||||
return
|
||||
|
||||
broadcast_name = advertisement.data.get(core.AdvertisingData.BROADCAST_NAME)
|
||||
assert isinstance(broadcast_name, str) or broadcast_name is None
|
||||
assert isinstance(broadcast_audio_announcement[1], bytes)
|
||||
|
||||
if broadcast := self.broadcasts.get(advertisement.address):
|
||||
broadcast.update(advertisement)
|
||||
return
|
||||
|
||||
bumble.utils.AsyncRunner.spawn(
|
||||
self.on_new_broadcast(
|
||||
broadcast_name,
|
||||
advertisement,
|
||||
bap.BroadcastAudioAnnouncement.from_bytes(
|
||||
broadcast_audio_announcement[1]
|
||||
).broadcast_id,
|
||||
)
|
||||
)
|
||||
|
||||
async def on_new_broadcast(
|
||||
self,
|
||||
name: str | None,
|
||||
advertisement: bumble.device.Advertisement,
|
||||
broadcast_id: int,
|
||||
) -> None:
|
||||
periodic_advertising_sync = await self.device.create_periodic_advertising_sync(
|
||||
advertiser_address=advertisement.address,
|
||||
sid=advertisement.sid,
|
||||
sync_timeout=self.sync_timeout,
|
||||
filter_duplicates=self.filter_duplicates,
|
||||
)
|
||||
broadcast = self.Broadcast(name, periodic_advertising_sync, broadcast_id)
|
||||
broadcast.update(advertisement)
|
||||
self.broadcasts[advertisement.address] = broadcast
|
||||
periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast))
|
||||
self.emit('new_broadcast', broadcast)
|
||||
|
||||
def on_broadcast_loss(self, broadcast: Broadcast) -> None:
|
||||
del self.broadcasts[broadcast.sync.advertiser_address]
|
||||
bumble.utils.AsyncRunner.spawn(broadcast.sync.terminate())
|
||||
self.emit('broadcast_loss', broadcast)
|
||||
|
||||
|
||||
class PrintingBroadcastScanner(pyee.EventEmitter):
|
||||
def __init__(
|
||||
self, device: bumble.device.Device, filter_duplicates: bool, sync_timeout: float
|
||||
) -> None:
|
||||
super().__init__()
|
||||
self.scanner = BroadcastScanner(device, filter_duplicates, sync_timeout)
|
||||
self.scanner.on('new_broadcast', self.on_new_broadcast)
|
||||
self.scanner.on('broadcast_loss', self.on_broadcast_loss)
|
||||
self.scanner.on('update', self.refresh)
|
||||
self.status_message = ''
|
||||
|
||||
async def start(self) -> None:
|
||||
self.status_message = color('Scanning...', 'green')
|
||||
await self.scanner.start()
|
||||
|
||||
def on_new_broadcast(self, broadcast: BroadcastScanner.Broadcast) -> None:
|
||||
self.status_message = color(
|
||||
f'+Found {len(self.scanner.broadcasts)} broadcasts', 'green'
|
||||
)
|
||||
broadcast.on('change', self.refresh)
|
||||
broadcast.on('update', self.refresh)
|
||||
self.refresh()
|
||||
|
||||
def on_broadcast_loss(self, broadcast: BroadcastScanner.Broadcast) -> None:
|
||||
self.status_message = color(
|
||||
f'-Found {len(self.scanner.broadcasts)} broadcasts', 'green'
|
||||
)
|
||||
self.refresh()
|
||||
|
||||
def refresh(self) -> None:
|
||||
# Clear the screen from the top
|
||||
print('\033[H')
|
||||
print('\033[0J')
|
||||
print('\033[H')
|
||||
|
||||
# Print the status message
|
||||
print(self.status_message)
|
||||
print("==========================================")
|
||||
|
||||
# Print all broadcasts
|
||||
for broadcast in self.scanner.broadcasts.values():
|
||||
broadcast.print()
|
||||
print('------------------------------------------')
|
||||
|
||||
# Clear the screen to the bottom
|
||||
print('\033[0J')
|
||||
|
||||
|
||||
@contextlib.asynccontextmanager
|
||||
async def create_device(transport: str) -> AsyncGenerator[bumble.device.Device, Any]:
|
||||
transport = "serial:" +transport
|
||||
async with await bumble.transport.open_transport(transport) as (
|
||||
hci_source,
|
||||
hci_sink,
|
||||
@@ -395,7 +81,8 @@ async def create_device(transport: str) -> AsyncGenerator[bumble.device.Device,
|
||||
address=AURACAST_DEFAULT_DEVICE_ADDRESS,
|
||||
keystore='JsonKeyStore',
|
||||
)
|
||||
|
||||
|
||||
|
||||
device = bumble.device.Device.from_config_with_hci(
|
||||
device_config,
|
||||
hci_source,
|
||||
@@ -406,329 +93,12 @@ async def create_device(transport: str) -> AsyncGenerator[bumble.device.Device,
|
||||
yield device
|
||||
|
||||
|
||||
async def find_broadcast_by_name(
|
||||
device: bumble.device.Device, name: Optional[str]
|
||||
) -> BroadcastScanner.Broadcast:
|
||||
result = asyncio.get_running_loop().create_future()
|
||||
|
||||
def on_broadcast_change(broadcast: BroadcastScanner.Broadcast) -> None:
|
||||
if broadcast.basic_audio_announcement and not result.done():
|
||||
print(color('Broadcast basic audio announcement received', 'green'))
|
||||
result.set_result(broadcast)
|
||||
|
||||
def on_new_broadcast(broadcast: BroadcastScanner.Broadcast) -> None:
|
||||
if name is None or broadcast.name == name:
|
||||
print(color('Broadcast found:', 'green'), broadcast.name)
|
||||
broadcast.on('change', lambda: on_broadcast_change(broadcast))
|
||||
return
|
||||
|
||||
print(color(f'Skipping broadcast {broadcast.name}'))
|
||||
|
||||
scanner = BroadcastScanner(device, False, AURACAST_DEFAULT_SYNC_TIMEOUT)
|
||||
scanner.on('new_broadcast', on_new_broadcast)
|
||||
await scanner.start()
|
||||
|
||||
broadcast = await result
|
||||
await scanner.stop()
|
||||
|
||||
return broadcast
|
||||
|
||||
|
||||
async def run_scan(
|
||||
filter_duplicates: bool, sync_timeout: float, transport: str
|
||||
) -> None:
|
||||
async with create_device(transport) as device:
|
||||
if not device.supports_le_periodic_advertising:
|
||||
print(color('Periodic advertising not supported', 'red'))
|
||||
return
|
||||
|
||||
scanner = PrintingBroadcastScanner(device, filter_duplicates, sync_timeout)
|
||||
await scanner.start()
|
||||
await asyncio.get_running_loop().create_future()
|
||||
|
||||
|
||||
async def run_assist(
|
||||
broadcast_name: Optional[str],
|
||||
source_id: Optional[int],
|
||||
command: str,
|
||||
transport: str,
|
||||
address: str,
|
||||
) -> None:
|
||||
async with create_device(transport) as device:
|
||||
if not device.supports_le_periodic_advertising:
|
||||
print(color('Periodic advertising not supported', 'red'))
|
||||
return
|
||||
|
||||
# Connect to the server
|
||||
print(f'=== Connecting to {address}...')
|
||||
connection = await device.connect(address)
|
||||
peer = bumble.device.Peer(connection)
|
||||
print(f'=== Connected to {peer}')
|
||||
|
||||
print("+++ Encrypting connection...")
|
||||
await peer.connection.encrypt()
|
||||
print("+++ Connection encrypted")
|
||||
|
||||
# Request a larger MTU
|
||||
mtu = AURACAST_DEFAULT_ATT_MTU
|
||||
print(color(f'$$$ Requesting MTU={mtu}', 'yellow'))
|
||||
await peer.request_mtu(mtu)
|
||||
|
||||
# Get the BASS service
|
||||
bass_client = await peer.discover_service_and_create_proxy(
|
||||
bass.BroadcastAudioScanServiceProxy
|
||||
)
|
||||
|
||||
# Check that the service was found
|
||||
if not bass_client:
|
||||
print(color('!!! Broadcast Audio Scan Service not found', 'red'))
|
||||
return
|
||||
|
||||
# Subscribe to and read the broadcast receive state characteristics
|
||||
for i, broadcast_receive_state in enumerate(
|
||||
bass_client.broadcast_receive_states
|
||||
):
|
||||
try:
|
||||
await broadcast_receive_state.subscribe(
|
||||
lambda value, i=i: print(
|
||||
f"{color(f'Broadcast Receive State Update [{i}]:', 'green')} {value}"
|
||||
)
|
||||
)
|
||||
except core.ProtocolError as error:
|
||||
print(
|
||||
color(
|
||||
f'!!! Failed to subscribe to Broadcast Receive State characteristic:',
|
||||
'red',
|
||||
),
|
||||
error,
|
||||
)
|
||||
value = await broadcast_receive_state.read_value()
|
||||
print(
|
||||
f'{color(f"Initial Broadcast Receive State [{i}]:", "green")} {value}'
|
||||
)
|
||||
|
||||
if command == 'monitor-state':
|
||||
await peer.sustain()
|
||||
return
|
||||
|
||||
if command == 'add-source':
|
||||
# Find the requested broadcast
|
||||
await bass_client.remote_scan_started()
|
||||
if broadcast_name:
|
||||
print(color('Scanning for broadcast:', 'cyan'), broadcast_name)
|
||||
else:
|
||||
print(color('Scanning for any broadcast', 'cyan'))
|
||||
broadcast = await find_broadcast_by_name(device, broadcast_name)
|
||||
|
||||
if broadcast.broadcast_audio_announcement is None:
|
||||
print(color('No broadcast audio announcement found', 'red'))
|
||||
return
|
||||
|
||||
if (
|
||||
broadcast.basic_audio_announcement is None
|
||||
or not broadcast.basic_audio_announcement.subgroups
|
||||
):
|
||||
print(color('No subgroups found', 'red'))
|
||||
return
|
||||
|
||||
# Add the source
|
||||
print(color('Adding source:', 'blue'), broadcast.sync.advertiser_address)
|
||||
await bass_client.add_source(
|
||||
broadcast.sync.advertiser_address,
|
||||
broadcast.sync.sid,
|
||||
broadcast.broadcast_audio_announcement.broadcast_id,
|
||||
bass.PeriodicAdvertisingSyncParams.SYNCHRONIZE_TO_PA_PAST_AVAILABLE,
|
||||
0xFFFF,
|
||||
[
|
||||
bass.SubgroupInfo(
|
||||
bass.SubgroupInfo.ANY_BIS,
|
||||
bytes(broadcast.basic_audio_announcement.subgroups[0].metadata),
|
||||
)
|
||||
],
|
||||
)
|
||||
|
||||
# Initiate a PA Sync Transfer
|
||||
await broadcast.sync.transfer(peer.connection)
|
||||
|
||||
# Notify the sink that we're done scanning.
|
||||
await bass_client.remote_scan_stopped()
|
||||
|
||||
await peer.sustain()
|
||||
return
|
||||
|
||||
if command == 'modify-source':
|
||||
if source_id is None:
|
||||
print(color('!!! modify-source requires --source-id'))
|
||||
return
|
||||
|
||||
# Find the requested broadcast
|
||||
await bass_client.remote_scan_started()
|
||||
if broadcast_name:
|
||||
print(color('Scanning for broadcast:', 'cyan'), broadcast_name)
|
||||
else:
|
||||
print(color('Scanning for any broadcast', 'cyan'))
|
||||
broadcast = await find_broadcast_by_name(device, broadcast_name)
|
||||
|
||||
if broadcast.broadcast_audio_announcement is None:
|
||||
print(color('No broadcast audio announcement found', 'red'))
|
||||
return
|
||||
|
||||
if (
|
||||
broadcast.basic_audio_announcement is None
|
||||
or not broadcast.basic_audio_announcement.subgroups
|
||||
):
|
||||
print(color('No subgroups found', 'red'))
|
||||
return
|
||||
|
||||
# Modify the source
|
||||
print(
|
||||
color('Modifying source:', 'blue'),
|
||||
source_id,
|
||||
)
|
||||
await bass_client.modify_source(
|
||||
source_id,
|
||||
bass.PeriodicAdvertisingSyncParams.SYNCHRONIZE_TO_PA_PAST_NOT_AVAILABLE,
|
||||
0xFFFF,
|
||||
[
|
||||
bass.SubgroupInfo(
|
||||
bass.SubgroupInfo.ANY_BIS,
|
||||
bytes(broadcast.basic_audio_announcement.subgroups[0].metadata),
|
||||
)
|
||||
],
|
||||
)
|
||||
await peer.sustain()
|
||||
return
|
||||
|
||||
if command == 'remove-source':
|
||||
if source_id is None:
|
||||
print(color('!!! remove-source requires --source-id'))
|
||||
return
|
||||
|
||||
# Remove the source
|
||||
print(color('Removing source:', 'blue'), source_id)
|
||||
await bass_client.remove_source(source_id)
|
||||
await peer.sustain()
|
||||
return
|
||||
|
||||
print(color(f'!!! invalid command {command}'))
|
||||
|
||||
|
||||
async def run_pair(transport: str, address: str) -> None:
|
||||
async with create_device(transport) as device:
|
||||
|
||||
# Connect to the server
|
||||
print(f'=== Connecting to {address}...')
|
||||
async with device.connect_as_gatt(address) as peer:
|
||||
print(f'=== Connected to {peer}')
|
||||
|
||||
print("+++ Initiating pairing...")
|
||||
await peer.connection.pair()
|
||||
print("+++ Paired")
|
||||
|
||||
|
||||
async def run_receive(
|
||||
transport: str,
|
||||
broadcast_id: int,
|
||||
broadcast_code: str | None,
|
||||
sync_timeout: float,
|
||||
subgroup_index: int,
|
||||
) -> None:
|
||||
async with create_device(transport) as device:
|
||||
if not device.supports_le_periodic_advertising:
|
||||
print(color('Periodic advertising not supported', 'red'))
|
||||
return
|
||||
|
||||
scanner = BroadcastScanner(device, False, sync_timeout)
|
||||
scan_result: asyncio.Future[BroadcastScanner.Broadcast] = (
|
||||
asyncio.get_running_loop().create_future()
|
||||
)
|
||||
|
||||
def on_new_broadcast(broadcast: BroadcastScanner.Broadcast) -> None:
|
||||
if scan_result.done():
|
||||
return
|
||||
if broadcast.broadcast_id == broadcast_id:
|
||||
scan_result.set_result(broadcast)
|
||||
|
||||
scanner.on('new_broadcast', on_new_broadcast)
|
||||
await scanner.start()
|
||||
print('Start scanning...')
|
||||
broadcast = await scan_result
|
||||
print('Advertisement found:')
|
||||
broadcast.print()
|
||||
basic_audio_announcement_scanned = asyncio.Event()
|
||||
|
||||
def on_change() -> None:
|
||||
if (
|
||||
broadcast.basic_audio_announcement
|
||||
and not basic_audio_announcement_scanned.is_set()
|
||||
):
|
||||
basic_audio_announcement_scanned.set()
|
||||
|
||||
broadcast.on('change', on_change)
|
||||
if not broadcast.basic_audio_announcement:
|
||||
print('Wait for Basic Audio Announcement...')
|
||||
await basic_audio_announcement_scanned.wait()
|
||||
print('Basic Audio Announcement found')
|
||||
broadcast.print()
|
||||
print('Stop scanning')
|
||||
await scanner.stop()
|
||||
print('Start sync to BIG')
|
||||
|
||||
assert broadcast.basic_audio_announcement
|
||||
subgroup = broadcast.basic_audio_announcement.subgroups[subgroup_index]
|
||||
configuration = subgroup.codec_specific_configuration
|
||||
assert configuration
|
||||
assert (sampling_frequency := configuration.sampling_frequency)
|
||||
assert (frame_duration := configuration.frame_duration)
|
||||
|
||||
big_sync = await device.create_big_sync(
|
||||
broadcast.sync,
|
||||
bumble.device.BigSyncParameters(
|
||||
big_sync_timeout=0x4000,
|
||||
bis=[bis.index for bis in subgroup.bis],
|
||||
broadcast_code=(
|
||||
bytes.fromhex(broadcast_code) if broadcast_code else None
|
||||
),
|
||||
),
|
||||
)
|
||||
num_bis = len(big_sync.bis_links)
|
||||
decoder = lc3.Decoder(
|
||||
frame_duration_us=frame_duration.us,
|
||||
sample_rate_hz=sampling_frequency.hz,
|
||||
num_channels=num_bis,
|
||||
)
|
||||
sdus = [b''] * num_bis
|
||||
subprocess = await asyncio.create_subprocess_shell(
|
||||
f'stdbuf -i0 ffplay -ar {sampling_frequency.hz} -ac {num_bis} -f f32le pipe:0',
|
||||
stdin=asyncio.subprocess.PIPE,
|
||||
stdout=asyncio.subprocess.PIPE,
|
||||
stderr=asyncio.subprocess.PIPE,
|
||||
)
|
||||
for i, bis_link in enumerate(big_sync.bis_links):
|
||||
print(f'Setup ISO for BIS {bis_link.handle}')
|
||||
|
||||
def sink(index: int, packet: hci.HCI_IsoDataPacket):
|
||||
nonlocal sdus
|
||||
sdus[index] = packet.iso_sdu_fragment
|
||||
if all(sdus) and subprocess.stdin:
|
||||
subprocess.stdin.write(decoder.decode(b''.join(sdus)).tobytes())
|
||||
sdus = [b''] * num_bis
|
||||
|
||||
bis_link.sink = functools.partial(sink, i)
|
||||
await bis_link.setup_data_path(
|
||||
direction=bis_link.Direction.CONTROLLER_TO_HOST
|
||||
)
|
||||
|
||||
terminated = asyncio.Event()
|
||||
big_sync.on(big_sync.Event.TERMINATION, lambda _: terminated.set())
|
||||
await terminated.wait()
|
||||
|
||||
|
||||
async def run_broadcast(
|
||||
transport: str, broadcast_id: int, broadcast_code: str | None, wav_file_path: str
|
||||
transport: str, broadcast_id: int, broadcast_code: str | None, wav_file_path: str | None
|
||||
) -> None:
|
||||
|
||||
TEST_SINE = 0
|
||||
|
||||
TEST_SINE = 1 if wav_file_path is None else 0
|
||||
print(f'Test Sine: {TEST_SINE}')
|
||||
encoder = LeAudioEncoder()
|
||||
async with create_device(transport) as device:
|
||||
if not device.supports_le_periodic_advertising:
|
||||
@@ -739,27 +109,29 @@ async def run_broadcast(
|
||||
f = open("log.btsnoop", "wb")
|
||||
Snooper = BtSnooper(f)
|
||||
device.host.snooper = Snooper
|
||||
encoder.setup_encoders(48000,10000,1)
|
||||
frames = list[bytes]()
|
||||
sine = generate_sine_data(1000,48000,0.01)
|
||||
print(len(sine))
|
||||
|
||||
sample_size = 480
|
||||
print(wav_file_path)
|
||||
upsampled_left_channel = read_wav_file(wav_file_path,48000)
|
||||
num_runs = len(upsampled_left_channel) // sample_size
|
||||
TEST_SINE = 0
|
||||
for i in range(num_runs):
|
||||
|
||||
if TEST_SINE == 0:
|
||||
pcm = upsampled_left_channel[i *
|
||||
sample_size:i*sample_size+sample_size]
|
||||
iso = encoder.encode(100,1,1,bytes(pcm))
|
||||
else:
|
||||
iso = encoder.encode(100,1,1,sine)
|
||||
|
||||
frames.append(iso)
|
||||
|
||||
# setup Lc3 encoder
|
||||
encoder.setup_encoders(48000, 10000, 1)
|
||||
|
||||
frames = list[bytes]()
|
||||
|
||||
if TEST_SINE == 1:
|
||||
data_to_encode = generate_sine_data(1000, 48000, 0.01)
|
||||
num_runs = 2000
|
||||
else:
|
||||
sample_size = 480
|
||||
data_to_encode = read_wav_file(wav_file_path, 48000)
|
||||
num_runs = len(data_to_encode) // sample_size
|
||||
|
||||
for i in range(num_runs):
|
||||
if TEST_SINE == 0:
|
||||
pcm = data_to_encode[i * sample_size:i*sample_size+sample_size]
|
||||
iso = encoder.encode(100, 1, 1, bytes(pcm))
|
||||
else:
|
||||
iso = encoder.encode(100, 1, 1, data_to_encode)
|
||||
|
||||
frames.append(iso)
|
||||
|
||||
del encoder
|
||||
print('Encoding complete.')
|
||||
|
||||
@@ -794,7 +166,8 @@ async def run_broadcast(
|
||||
)
|
||||
],
|
||||
)
|
||||
broadcast_audio_announcement = bap.BroadcastAudioAnnouncement(broadcast_id)
|
||||
broadcast_audio_announcement = bap.BroadcastAudioAnnouncement(
|
||||
broadcast_id)
|
||||
print('Start Advertising')
|
||||
advertising_set = await device.create_advertising_set(
|
||||
advertising_parameters=bumble.device.AdvertisingParameters(
|
||||
@@ -837,7 +210,6 @@ async def run_broadcast(
|
||||
),
|
||||
)
|
||||
print('Setup ISO Data Path')
|
||||
|
||||
for bis_link in big.bis_links:
|
||||
await bis_link.setup_data_path(
|
||||
direction=bis_link.Direction.HOST_TO_CONTROLLER
|
||||
@@ -845,20 +217,18 @@ async def run_broadcast(
|
||||
|
||||
dummy_frame = bytearray(100)
|
||||
big.bis_links[0].write(dummy_frame)
|
||||
#big.bis_links[1].write(dummy_frame)
|
||||
# big.bis_links[1].write(dummy_frame)
|
||||
|
||||
def on_iso_pdu_sent(event):
|
||||
|
||||
global iso_index
|
||||
print('ISO PDU sent')
|
||||
big.bis_links[0].write(frames[iso_index])
|
||||
iso_index +=1
|
||||
if iso_index == len(frames):
|
||||
iso_index += 1
|
||||
if iso_index == len(frames):
|
||||
iso_index = 0
|
||||
|
||||
|
||||
device.host.on('packet_complete',on_iso_pdu_sent)
|
||||
print('Start sending frames')
|
||||
device.host.on('packet_complete', on_iso_pdu_sent)
|
||||
print('Start sending frames ...')
|
||||
while True:
|
||||
await asyncio.sleep(1)
|
||||
|
||||
@@ -884,8 +254,16 @@ def run_async(async_command: Coroutine) -> None:
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@click.command()
|
||||
@click.option('transport','-p', type=str)
|
||||
@click.option('wav_file_path','-w' , type=str)
|
||||
@click.argument(
|
||||
'port',
|
||||
metavar='com port',
|
||||
type=str,
|
||||
)
|
||||
@click.option(
|
||||
'--wav',
|
||||
metavar='WAV_FILE_PATH',
|
||||
type=str,
|
||||
help='Wav file path',)
|
||||
@click.option(
|
||||
'--broadcast_id',
|
||||
metavar='BROADCAST_ID',
|
||||
@@ -899,23 +277,23 @@ def run_async(async_command: Coroutine) -> None:
|
||||
type=str,
|
||||
help='Broadcast encryption code in hex format',
|
||||
)
|
||||
|
||||
def broadcast(transport, broadcast_id, broadcast_code, wav_file_path):
|
||||
def broadcast(port, broadcast_id, broadcast_code, wav):
|
||||
"""Start a broadcast as a source."""
|
||||
#ctx.ensure_object(dict)
|
||||
# ctx.ensure_object(dict)
|
||||
run_async(
|
||||
run_broadcast(
|
||||
transport=transport,
|
||||
transport=port,
|
||||
broadcast_id=broadcast_id,
|
||||
broadcast_code=broadcast_code,
|
||||
wav_file_path=wav_file_path,
|
||||
wav_file_path=wav,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'ERROR').upper())
|
||||
|
||||
logging.basicConfig(level=os.environ.get(
|
||||
'BUMBLE_LOGLEVEL', 'ERROR').upper())
|
||||
broadcast()
|
||||
|
||||
|
||||
@@ -929,4 +307,3 @@ if __name__ == "__main__":
|
||||
# SCAN 2 OFF 2
|
||||
# open F0F1F2F3F4F5 BROAD 2 0 1e40
|
||||
# MUSIC 91 PLAY
|
||||
|
||||
|
||||
@@ -1,121 +0,0 @@
|
||||
|
||||
import wasmtime
|
||||
import ctypes
|
||||
from typing import List, cast
|
||||
import wasmtime.loader
|
||||
import utils.liblc3 as liblc3 # type: ignore
|
||||
import enum
|
||||
|
||||
store = wasmtime.loader.store
|
||||
|
||||
_memory = cast(wasmtime.Memory, liblc3.memory)
|
||||
|
||||
STACK_POINTER = _memory.data_len(store)
|
||||
|
||||
_memory.grow(store, 1)
|
||||
|
||||
# Mapping wasmtime memory to linear address
|
||||
|
||||
memory = (ctypes.c_ubyte * _memory.data_len(store)).from_address(
|
||||
|
||||
ctypes.addressof(_memory.data_ptr(store).contents) # type: ignore
|
||||
|
||||
)
|
||||
|
||||
|
||||
class Liblc3PcmFormat(enum.IntEnum):
|
||||
|
||||
S16 = 0
|
||||
|
||||
S24 = 1
|
||||
|
||||
S24_3LE = 2
|
||||
|
||||
FLOAT = 3
|
||||
|
||||
|
||||
DEFAULT_PCM_SAMPLE_RATE = 48000
|
||||
MAX_DECODER_SIZE = liblc3.lc3_decoder_size(10000, DEFAULT_PCM_SAMPLE_RATE)
|
||||
MAX_ENCODER_SIZE = liblc3.lc3_encoder_size(10000, DEFAULT_PCM_SAMPLE_RATE)
|
||||
|
||||
DECODER_STACK_POINTER = STACK_POINTER
|
||||
ENCODER_STACK_POINTER = DECODER_STACK_POINTER + MAX_DECODER_SIZE * 2
|
||||
DECODE_BUFFER_STACK_POINTER = ENCODER_STACK_POINTER + MAX_ENCODER_SIZE * 2
|
||||
ENCODE_BUFFER_STACK_POINTER = DECODE_BUFFER_STACK_POINTER + 8192
|
||||
|
||||
|
||||
DEFAULT_PCM_FORMAT = Liblc3PcmFormat
|
||||
|
||||
DEFAULT_PCM_BYTES_PER_SAMPLE = 2
|
||||
|
||||
|
||||
class LeAudioEncoder:
|
||||
|
||||
def __init__(self):
|
||||
self.encoders: List[int] = []
|
||||
pass
|
||||
|
||||
def setup_encoders(self, sample_rate: int, frame_duration_us: int, num_channels: int) -> None:
|
||||
"""Setup LE audio encoders
|
||||
|
||||
Args:
|
||||
sample_rate (int): Sample rate in Hz
|
||||
frame_duration_us (int): Frame duration in microseconds
|
||||
num_channels (int): Number of channels
|
||||
"""
|
||||
self.encoders[:num_channels] = [
|
||||
liblc3.lc3_setup_encoder(
|
||||
frame_duration_us,
|
||||
sample_rate,
|
||||
0, # Input sample rate
|
||||
ENCODER_STACK_POINTER + MAX_ENCODER_SIZE * i,
|
||||
)
|
||||
for i in range(num_channels)
|
||||
]
|
||||
|
||||
def encode(
|
||||
self,
|
||||
sdu_length: int,
|
||||
num_channels: int,
|
||||
input_stride: int,
|
||||
input_data: bytes,
|
||||
) -> bytes:
|
||||
"""Encode a LE audio frame
|
||||
|
||||
Args:
|
||||
sdu_length (int): Length of the SDU
|
||||
num_channels (int): Number of channels
|
||||
input_stride (int): Stride of the input data
|
||||
input_data (bytes): Input data to encode
|
||||
|
||||
Returns:
|
||||
bytes: Encoded data
|
||||
"""
|
||||
if not input_data:
|
||||
return b""
|
||||
|
||||
input_buffer_offset = ENCODE_BUFFER_STACK_POINTER
|
||||
input_buffer_size = len(input_data)
|
||||
|
||||
# Copy into wasm memory
|
||||
memory[input_buffer_offset : input_buffer_offset + input_buffer_size] = input_data
|
||||
|
||||
output_buffer_offset = input_buffer_offset + input_buffer_size
|
||||
output_buffer_size = sdu_length
|
||||
output_frame_size = output_buffer_size // num_channels
|
||||
|
||||
for i in range(num_channels):
|
||||
result = liblc3.lc3_encode(
|
||||
self.encoders[i],
|
||||
0,
|
||||
input_buffer_offset + DEFAULT_PCM_BYTES_PER_SAMPLE * i,
|
||||
input_stride,
|
||||
output_frame_size,
|
||||
output_buffer_offset + output_frame_size * i,
|
||||
)
|
||||
|
||||
if result != 0:
|
||||
raise RuntimeError(f"lc3_encode failed, result={result}")
|
||||
|
||||
# Extract encoded data from the output buffer
|
||||
return bytes(memory[output_buffer_offset : output_buffer_offset + output_buffer_size])
|
||||
Binary file not shown.
Reference in New Issue
Block a user