use bis link API

This commit is contained in:
Gilles Boccon-Gibod
2025-02-22 13:32:58 -08:00
parent b932bafe6d
commit cc21ed27c7

View File

@@ -783,16 +783,8 @@ async def run_receive(
for i, bis_link in enumerate(big_sync.bis_links):
print(f'Setup ISO for BIS {bis_link.handle}')
bis_link.sink = functools.partial(sink, lc3_queues[i])
await device.send_command(
hci.HCI_LE_Setup_ISO_Data_Path_Command(
connection_handle=bis_link.handle,
data_path_direction=hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.CONTROLLER_TO_HOST,
data_path_id=0,
codec_id=hci.CodingFormat(codec_id=hci.CodecID.TRANSPARENT),
controller_delay=0,
codec_configuration=b'',
),
check_result=True,
await bis_link.setup_data_path(
direction=bis_link.Direction.CONTROLLER_TO_HOST
)
terminated = asyncio.Event()
@@ -953,13 +945,14 @@ async def run_transmit(
),
)
for bis_link in big.bis_links:
print(f'Setup ISO for BIS {bis_link.handle}')
await bis_link.setup_data_path(
direction=bis_link.Direction.HOST_TO_CONTROLLER
)
iso_queues = [
bumble.device.IsoPacketStream(big.bis_links[0], 64),
bumble.device.IsoPacketStream(big.bis_links[1], 64),
bumble.device.IsoPacketStream(bis_link, 64)
for bis_link in big.bis_links
]
def on_flow():