# Files
# leaudio/apps/bap_unicast_client.py
#
# 448 lines
# 14 KiB
# Python

"""
BAP Unicast Client Class
Created on 26. Dec. 2024
@author: Markus Jellitsch
"""
from scipy import signal
import numpy as np
import argparse
import json
import serial as ser
from bumble.profiles.bap import (
AudioLocation,
SamplingFrequency,
FrameDuration,
CodecSpecificConfiguration,
)
from bumble.profiles.ascs import (
AudioStreamControlServiceProxy,
ASE_Config_Codec,
ASE_Config_QOS,
ASE_Disable,
ASE_Enable,
)
from bumble.hci import CodecID, CodingFormat
from bumble.profiles.pacs import (
PacRecord,
PublishedAudioCapabilitiesServiceProxy,
)
from bumble.transport import serial
from bumble.utils import AsyncRunner
from bumble.device import Device, Peer, ConnectionParametersPreferences, Connection
from bumble.core import AdvertisingData
from bumble.snoop import BtSnooper
import functools
from bumble.profiles.ascs import AudioStreamControlServiceProxy
from bumble.hci import HCI_IsoDataPacket, HCI_LE_1M_PHY, HCI_LE_2M_PHY
import logging
from leaudio import LeAudioEncoder
import asyncio
import time
import os
from bumble import hci
import sys
from leaudio import generate_sine_data, read_wav_file
# Shared codec configuration for the single sink ASE handled by this script.
# client() overwrites sampling_frequency, frame_duration and
# octets_per_codec_frame from the command line before streaming starts;
# Listener.on_connection reads the object when configuring the ASE, the CIG
# and the ISO packets.
app_specific_codec = CodecSpecificConfiguration(
sampling_frequency=SamplingFrequency.FREQ_24000,
frame_duration=FrameDuration.DURATION_10000_US,
audio_channel_allocation=AudioLocation.FRONT_RIGHT,
octets_per_codec_frame=60,
codec_frames_per_sdu=1,
)
# Module-level default; it is not read by the code shown in this file section.
TEST_SINE = 1
# Complete local name the scanner looks for in advertisements; client()
# replaces it when --target_name is given.
complete_local_name = "BUMBLE"
# Encoded frames, one entry per ISO SDU: appended to by client() and consumed
# by Listener.on_connection.
iso_packets = []
# Module-level placeholder; it is not read by the code shown in this file section.
upsampled_left_channel = None
def clean_reset(com_port):
    """Send two HCI Reset commands to the controller over a raw serial port.

    The port is opened at 1 Mbaud with a one second timeout, the reset
    command is written twice with a 0.5 s pause in between, and the port is
    closed again. This is best effort: any failure is reported on stdout and
    otherwise ignored.

    Args:
        com_port: Serial device name handed to ``serial.Serial``.
    """
    try:
        with ser.Serial(com_port, baudrate=1000000, timeout=1) as port:
            # Serialized HCI_Reset command, written twice with a short gap.
            port.write(bytes(hci.HCI_Reset_Command()))
            time.sleep(0.5)
            port.write(bytes(hci.HCI_Reset_Command()))
            print(f"HCI_RESET command sent to {com_port}")
    except Exception as err:
        print(f"Error opening or accessing {com_port}: {err}")
class Listener(Device.Listener):
    """Device listener that connects to the target peer and streams audio.

    ``on_advertisement`` looks for an advertisement whose complete local name
    equals the module-level ``complete_local_name`` and connects to it.
    ``on_connection`` then pairs, configures one sink ASE through ASCS, sets
    up a CIG/CIS and sends the frames stored in the module-level
    ``iso_packets`` list, one frame per 'packet_complete' event from the host.
    """

    # Index into ``iso_packets`` of the frame most recently handed to the host.
    packet_sequence_number = 0

    def __init__(self, device):
        """Store the local ``device`` used for scanning, connecting and ISO."""
        self.device = device

    @staticmethod
    def _parse_ltv(data):
        """Split raw advertising data into ``(length, type, value)`` tuples.

        Every structure is one length byte followed by ``length`` bytes, the
        first of which is the type. A truncated trailing structure yields a
        shortened value rather than an error.
        """
        entries = []
        offset = 0
        while offset < len(data) - 1:
            length = data[offset]
            if length == 0:
                # A zero length marks early termination of the data; whatever
                # follows is padding.
                break
            entries.append(
                (length, data[offset + 1], data[offset + 2:offset + length + 1])
            )
            offset += length + 1
        return entries

    @AsyncRunner.run_in_task()
    async def on_advertisement(self, advertisement):
        """Connect to the advertiser if it announces the target local name."""
        target_name = bytes(complete_local_name, "utf-8")
        for _, ad_type, value in self._parse_ltv(advertisement.data_bytes):
            # Compare by value: identity (`is`) on integers only works by
            # accident of small-int caching and breaks for enum constants.
            if ad_type != AdvertisingData.COMPLETE_LOCAL_NAME:
                continue
            if value != target_name:
                continue

            print("found device " + complete_local_name)
            await self.device.stop_scanning()

            params = ConnectionParametersPreferences()
            params.connection_interval_min = 47
            params.connection_interval_max = 47
            prefs = {HCI_LE_1M_PHY: params, HCI_LE_2M_PHY: params}
            await self.device.connect(
                peer_address=advertisement.address,
                connection_parameters_preferences=prefs,
            )
            # One connection attempt per advertisement is enough.
            return

    @AsyncRunner.run_in_task()
    async def on_connection(self, connection: Connection):
        """Configure the peer's first sink ASE and stream ``iso_packets``.

        The ASE is driven through Config Codec, Config QoS and Enable; after
        each control point write the method waits for one notification from
        the sink ASE characteristic. Once the CIS and the ISO data path are
        set up, the frames are sent one at a time, the next frame being
        queued whenever the host emits 'packet_complete'. When all frames
        are sent the device is powered off and ``device.future`` is resolved.
        """
        codec = app_specific_codec
        notifications = {1: asyncio.Queue()}

        def on_notification(data: bytes, ase_id: int):
            notifications[ase_id].put_nowait(data)

        print(f'=== Connected to {connection}')
        peer = Peer(connection)

        # set PHY to 2M
        await connection.set_phy(rx_phys=[HCI_LE_2M_PHY], tx_phys=[HCI_LE_2M_PHY])
        # pair with the device
        await connection.pair()
        # request mtu change (the negotiated value is not needed here)
        await peer.request_mtu(1691)
        # get remote features
        remote_features = await self.device.get_remote_le_features(connection)
        print(f"peer supports the following features:{remote_features}")

        # discover services
        pacs_client = await peer.discover_service_and_create_proxy(
            PublishedAudioCapabilitiesServiceProxy
        )
        ascs_client = await peer.discover_service_and_create_proxy(
            AudioStreamControlServiceProxy
        )

        # read sink PACs; the first byte of the response is skipped before
        # parsing the record
        response = await pacs_client.sink_pac.read_value()
        pac_record = PacRecord.from_bytes(response[1:])
        print(pac_record)

        # enable ASCS notifications
        await ascs_client.ase_control_point.subscribe()
        await ascs_client.sink_ase[0].subscribe(
            functools.partial(on_notification, ase_id=1)
        )

        # read sink ASE state
        sink_state = await ascs_client.sink_ase[0].read_value()
        print(sink_state)
        print(codec)

        # configure ASE (config codec)
        await ascs_client.ase_control_point.write_value(
            ASE_Config_Codec(
                ase_id=[1],
                target_latency=[0x3],
                target_phy=[2],
                codec_id=[CodingFormat(CodecID.LC3)],
                codec_specific_configuration=[codec],
            )
        )
        # wait for notification
        await notifications[1].get()
        print("ASE: codec configured")

        # setup the CIG
        cis_handles = await self.device.setup_cig(
            cig_id=1,
            cis_id=[1],
            sdu_interval=(codec.frame_duration.us, codec.frame_duration.us),
            framing=0,
            max_sdu=(codec.octets_per_codec_frame, 0),
            retransmission_number=15,
            max_transport_latency=(95, 95),
        )

        # configure ASE (config QOS)
        await ascs_client.ase_control_point.write_value(
            ASE_Config_QOS(
                ase_id=[1],
                cig_id=[1],
                cis_id=[1],
                sdu_interval=[codec.frame_duration.us],
                framing=[0],
                phy=[2],
                max_sdu=[codec.octets_per_codec_frame],
                retransmission_number=[15],
                max_transport_latency=[95],
                presentation_delay=[40000],
            )
        )
        # wait for notifications
        await notifications[1].get()
        print("ASE: QOS configured")

        # configure ASE (Enable)
        await ascs_client.ase_control_point.write_value(
            ASE_Enable(
                ase_id=[1],
                # needed? NOTE(review): looks like a single LTV metadata
                # entry (length 3, type 0x02) - confirm against the spec.
                metadata=[bytes([0x03, 0x02, 0x01, 0x00])],
            )
        )
        # wait for notifications
        await notifications[1].get()
        print('ASE: enabling')

        # create CIS
        await self.device.create_cis([(cis_handles[0], connection.handle)])
        print('ASE: cis established')

        await self.device.send_command(
            hci.HCI_LE_Setup_ISO_Data_Path_Command(
                connection_handle=cis_handles[0],
                data_path_direction=hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.HOST_TO_CONTROLLER,
                data_path_id=0x00,  # Fixed HCI
                codec_id=hci.CodingFormat(hci.CodecID.TRANSPARENT),
                controller_delay=0,
                codec_configuration=b'',
            )
        )
        # wait for notifications
        await notifications[1].get()
        print('ASE: audio stream enabled')

        # prepare the ISO packets: start with the first encoded frame so that
        # sequence number 0 carries iso_packets[0]; fall back to an all-zero
        # SDU when nothing was encoded.
        if iso_packets:
            first_fragment = iso_packets[0]
        else:
            first_fragment = bytes(codec.octets_per_codec_frame)

        self.packet_sequence_number = 0
        self.iso_packet = HCI_IsoDataPacket(
            connection_handle=cis_handles[0],
            data_total_length=codec.octets_per_codec_frame + 4,
            packet_sequence_number=self.packet_sequence_number,
            pb_flag=0b10,
            packet_status_flag=0,
            iso_sdu_length=codec.octets_per_codec_frame,
            iso_sdu_fragment=first_fragment,
        )
        self.send_complete = False

        def on_iso_pdu_sent(event):
            if self.packet_sequence_number < len(iso_packets) - 1:
                # send the next ISO packet, reusing the same packet object
                self.packet_sequence_number += 1
                self.iso_packet.packet_sequence_number = self.packet_sequence_number
                self.iso_packet.iso_sdu_fragment = iso_packets[self.packet_sequence_number]
                self.device.host.send_hci_packet(self.iso_packet)
            else:
                self.send_complete = True

        self.device.host.on('packet_complete', on_iso_pdu_sent)
        self.device.host.send_hci_packet(self.iso_packet)

        # Poll once per second until the completion handler reports the end.
        while not self.send_complete:
            await asyncio.sleep(1)

        print("send complete. bye bye")
        await self.device.power_off()
        self.device.future.set_result(None)
def _build_arg_parser():
    """Create the command line parser for the unicast client."""
    parser = argparse.ArgumentParser(
        description="A simple example of argparse")
    parser.add_argument("-c", "--config", type=str,
                        default="", help="device config file")
    # The port is mandatory: both the controller reset and the HCI transport
    # need it, so fail with a usage message instead of crashing later.
    parser.add_argument(
        "-p", "--port", required=True,
        help="com port (e.g. serial:/dev/ttyUSB0)")
    parser.add_argument("-s", "--sample_rate", type=int,
                        default=0, help="choose a sample rate")
    # NOTE: parsed but not used by the code below; the frame duration is
    # always set to 10 ms.
    parser.add_argument("-f", "--frame_duration", type=int,
                        default=0, help="choose a frame duration")
    parser.add_argument("-w", "--wave", type=str,
                        help="wave file to stream (a test sine is used when omitted)")
    parser.add_argument("-t", "--target_name", type=str,
                        help="target complete local name of the peer")
    parser.add_argument("--verbose", "-v", action="count", default=0)
    return parser


def _apply_sample_rate_preset(preset):
    """Write the codec settings selected by ``--sample_rate`` into the shared config.

    Preset 0 selects 16 kHz / 40 octets, 1 selects 24 kHz / 60 octets and
    2 selects 48 kHz / 120 octets, always with a 10 ms frame duration.

    Raises:
        ValueError: if ``preset`` is not 0, 1 or 2.
    """
    presets = {
        0: (SamplingFrequency.FREQ_16000, 40),
        1: (SamplingFrequency.FREQ_24000, 60),
        2: (SamplingFrequency.FREQ_48000, 120),
    }
    if preset not in presets:
        raise ValueError("unknown sample rate")
    sampling_frequency, octets_per_frame = presets[preset]
    app_specific_codec.sampling_frequency = sampling_frequency
    app_specific_codec.frame_duration = FrameDuration.DURATION_10000_US
    app_specific_codec.octets_per_codec_frame = octets_per_frame


def _encode_frames(sound_file):
    """Encode the audio source and append the frames to ``iso_packets``.

    With a ``sound_file`` the samples returned by ``read_wav_file`` are cut
    into whole frames (a trailing partial frame is dropped); without one,
    2000 frames of ``generate_sine_data`` output are encoded.

    Raises:
        FileNotFoundError: if ``sound_file`` is given but is not a file.
    """
    sample_rate_hz = app_specific_codec.sampling_frequency.hz
    frame_duration_us = app_specific_codec.frame_duration.us
    octets_per_frame = app_specific_codec.octets_per_codec_frame

    # setup the LC3 encoder
    encoder = LeAudioEncoder()
    encoder.setup_encoders(sample_rate_hz, frame_duration_us, 1)

    # number of samples per frame duration
    sample_size = int(sample_rate_hz * frame_duration_us / 1000 / 1000)

    if sound_file:
        if not os.path.isfile(sound_file):
            raise FileNotFoundError(f"The file {sound_file} does not exist.")
        samples = read_wav_file(sound_file, sample_rate_hz)
        num_frames = len(samples) // sample_size
        pcm_frames = (
            samples[index * sample_size:(index + 1) * sample_size]
            for index in range(num_frames)
        )
    else:
        pcm_frames = (
            generate_sine_data(1000, sample_rate_hz, frame_duration_us / 1000000)
            for _ in range(2000)
        )

    for pcm_data in pcm_frames:
        iso_packets.append(
            encoder.encode(octets_per_frame, 1, 1, bytes(pcm_data)))


async def client() -> None:
    """Run the unicast client: parse arguments, encode audio, scan and stream.

    The function configures the shared codec settings from the command line,
    writes a default ``device.json`` when no config file is given, resets the
    controller, opens the serial HCI transport, records HCI traffic to
    ``log.btsnoop``, encodes the audio into ``iso_packets`` and then scans
    until the ``Listener`` resolves ``device.future``.

    Raises:
        ValueError: for an unknown ``--sample_rate`` preset.
        FileNotFoundError: if the ``--wave`` file does not exist.
    """
    global complete_local_name

    args = _build_arg_parser().parse_args()
    logging.basicConfig(
        level=logging.DEBUG if args.verbose > 0 else logging.ERROR)

    print("sample rate", args.sample_rate)
    _apply_sample_rate_preset(args.sample_rate)

    dev_config = args.config
    if not dev_config:
        # No config supplied: write a default one next to the working directory.
        dev_config = "device.json"
        config_data = {
            "name": "Unicast Client",
            "address": "C0:98:E5:49:00:00"
        }
        with open(dev_config, "w") as outfile:
            # Indent for better readability
            json.dump(config_data, outfile, indent=4)

    print(f"sample rate: {app_specific_codec.sampling_frequency.hz} Hz")
    print(f"frame duration: {app_specific_codec.frame_duration.us} us")
    print(f"octets per codec frame: {app_specific_codec.octets_per_codec_frame} bytes")

    sound_file = args.wave
    if not sound_file:
        print("Test Sine is used")

    if args.target_name:
        complete_local_name = args.target_name
    print(f"Target: {complete_local_name}")

    # clean controller reset
    clean_reset(args.port)

    # open the transport; the context manager closes it on every exit path
    async with await serial.open_serial_transport(args.port) as hci_transport:
        # create a device to manage the host, with a custom listener
        device = Device.from_config_file_with_hci(
            dev_config, hci_transport.source, hci_transport.sink
        )
        device.listener = Listener(device)
        device.cis_enabled = True
        # Create the completion future before scanning starts so the listener
        # can never observe a device without it.
        device.future = asyncio.get_running_loop().create_future()

        # create snoop file; closed automatically when streaming is over
        with open("log.btsnoop", "wb") as snoop_file:
            device.host.snooper = BtSnooper(snoop_file)
            try:
                _encode_frames(sound_file)
                print("encoding finished. num_iso_packets:", len(iso_packets))

                print("power on ...")
                await device.power_on()
                print('start scanning ...')
                await device.start_scanning(
                    scanning_phys=[HCI_LE_1M_PHY], legacy=False)
                await device.future
            finally:
                # Detach the snooper before its file is closed so late HCI
                # traffic cannot be written to a closed file.
                device.host.snooper = None

    print("all done")
def main():
    """Synchronous entry point: run the async ``client`` to completion."""
    coroutine = client()
    asyncio.run(coroutine)