diff --git a/apps/bap_unicast_client.py b/apps/bap_unicast_client.py index 2c74617..307bb10 100644 --- a/apps/bap_unicast_client.py +++ b/apps/bap_unicast_client.py @@ -6,6 +6,8 @@ Created on 26. Dec. 2024 @author: Markus Jellitsch """ +from typing import Coroutine +import click from scipy import signal import numpy as np import argparse @@ -38,6 +40,7 @@ from bumble.snoop import BtSnooper import functools from bumble.profiles.ascs import AudioStreamControlServiceProxy from bumble.hci import HCI_IsoDataPacket, HCI_LE_1M_PHY, HCI_LE_2M_PHY +from bumble import core import logging from leaudio import LeAudioEncoder @@ -306,81 +309,114 @@ class Listener(Device.Listener): break -async def client() -> None: - global complete_local_name - parser = argparse.ArgumentParser( - description="A simple example of argparse") - parser.add_argument("-c", "--config", type=str, - default="", help="device config file") - parser.add_argument( - "-p", "--port", help="com port (e.g. serial:/dev/ttyUSB0)") - parser.add_argument("-s", "--sample_rate", type=int, - default=0, help="choose a sample rate") - parser.add_argument("-f", "--frame_duration", type=int, - default=0, help="choose a frame duration") - parser.add_argument("-w", "--wave", type=str, - help="choose a frame duration") - parser.add_argument("-t", "--target_name", type=str, - help="target complete local name of the peer") - parser.add_argument("--verbose", "-v", action="count", default=0) - args = parser.parse_args() +def run_async(async_command: Coroutine) -> None: + try: + asyncio.run(async_command) + except core.ProtocolError as error: + print(f"Protocol error: {error}") + sys.exit(1) - if args.verbose > 0: +@click.command() +@click.argument( + 'port', + metavar='com port', + type=str, +) +@click.option( + '--wav', + metavar='WAV_FILE_PATH', + type=str, + help='Wav file path',) +@click.option( + '--target-name', + metavar='TARGET_NAME', + type=str, + default="BUMBLE", + help='Target name' +) +@click.option( + '--sample-rate', + metavar='SAMPLE_RATE', + type=int, + help='Sample rate', +) +@click.option( + '--verbose', + '-v', + is_flag=True, + default=False, + help='Enable verbose logging', +) +def unicast(port,wav,target_name,sample_rate,verbose): + """Start unicast client """ + # ctx.ensure_object(dict) + run_async( + run_unicast( + port=port, + wav=wav, + target_name=target_name, + sample_rate=sample_rate, + verbose=verbose + ) + ) + +async def run_unicast(port,wav,target_name,sample_rate,verbose) -> None: + global complete_local_name + + if verbose > 0: logging.basicConfig(level=logging.DEBUG) else: logging.basicConfig(level=logging.ERROR) - print("sample rate", args.sample_rate) - if not args.sample_rate or args.sample_rate == 0: + print("sample rate", sample_rate) + if not sample_rate or sample_rate == 0: app_specific_codec.sampling_frequency = SamplingFrequency.FREQ_16000 app_specific_codec.frame_duration = FrameDuration.DURATION_10000_US app_specific_codec.octets_per_codec_frame = 40 - elif args.sample_rate == 1: + elif sample_rate == 1: app_specific_codec.sampling_frequency = SamplingFrequency.FREQ_24000 app_specific_codec.frame_duration = FrameDuration.DURATION_10000_US app_specific_codec.octets_per_codec_frame = 60 - elif args.sample_rate == 2: + elif sample_rate == 2: app_specific_codec.sampling_frequency = SamplingFrequency.FREQ_48000 app_specific_codec.frame_duration = FrameDuration.DURATION_10000_US app_specific_codec.octets_per_codec_frame = 120 else: raise ValueError("unknown sample rate") - dev_config = args.config - if not dev_config or dev_config == "": - # Define the data as a Python dictionary - config_data = { - "name": "Unicast Client", - "address": "C0:98:E5:49:00:00" - } + # Define the data as a Python dictionary + config_data = { + "name": "Unicast Client", + "address": "C0:98:E5:49:00:00" + } - # Write the data to a JSON file - dev_config = "device.json" - with open(dev_config, "w") as outfile: - # Indent for better readability - json.dump(config_data, outfile, indent=4) + # Write the data to a JSON file + dev_config = "device.json" + with open(dev_config, "w") as outfile: + # Indent for better readability + json.dump(config_data, outfile, indent=4) print(f"sample rate: {app_specific_codec.sampling_frequency.hz} Hz") print(f"frame duration: {app_specific_codec.frame_duration.us} us") print(f"octets per codec frame: {app_specific_codec.octets_per_codec_frame} bytes") - if args.wave: - sound_file = args.wave + if wav: + sound_file = wav TEST_SINE = 0 else: TEST_SINE = 1 print("Test Sine is used") - if args.target_name: - complete_local_name = args.target_name + if target_name: + complete_local_name = target_name print(f"Target: {complete_local_name}") # clean controller reset - clean_reset(args.port) + clean_reset(port) # open the transport - hci_transport = await serial.open_serial_transport(args.port) + hci_transport = await serial.open_serial_transport(port) # create a device to manage the host, with a custom listener device = Device.from_config_file_with_hci( @@ -444,4 +480,4 @@ async def client() -> None: print("all done") def main(): - asyncio.run(client()) + asyncio.run(unicast())