refactor unicast_client for click args

This commit is contained in:
markus
2025-01-12 15:31:17 +01:00
parent be04e8255a
commit 5a29c2673b

View File

@@ -6,6 +6,8 @@ Created on 26. Dec. 2024
@author: Markus Jellitsch
"""
from typing import Coroutine
import click
from scipy import signal
import numpy as np
import argparse
@@ -38,6 +40,7 @@ from bumble.snoop import BtSnooper
import functools
from bumble.profiles.ascs import AudioStreamControlServiceProxy
from bumble.hci import HCI_IsoDataPacket, HCI_LE_1M_PHY, HCI_LE_2M_PHY
from bumble import core
import logging
from leaudio import LeAudioEncoder
@@ -306,81 +309,114 @@ class Listener(Device.Listener):
break
async def client() -> None:
global complete_local_name
parser = argparse.ArgumentParser(
description="A simple example of argparse")
parser.add_argument("-c", "--config", type=str,
default="", help="device config file")
parser.add_argument(
"-p", "--port", help="com port (e.g. serial:/dev/ttyUSB0)")
parser.add_argument("-s", "--sample_rate", type=int,
default=0, help="choose a sample rate")
parser.add_argument("-f", "--frame_duration", type=int,
default=0, help="choose a frame duration")
parser.add_argument("-w", "--wave", type=str,
help="choose a frame duration")
parser.add_argument("-t", "--target_name", type=str,
help="target complete local name of the peer")
parser.add_argument("--verbose", "-v", action="count", default=0)
args = parser.parse_args()
def run_async(async_command: Coroutine) -> None:
try:
asyncio.run(async_command)
except core.ProtocolError as error:
print(f"Protocol error: {error}")
sys.exit(1)
if args.verbose > 0:
@click.command()
@click.argument(
'port',
metavar='com port',
type=str,
)
@click.option(
'--wav',
metavar='WAV_FILE_PATH',
type=str,
help='Wav file path',)
@click.option(
'--target-name',
metavar='TARGET_NAME',
type=str,
default="BUMBLE",
help='Target name'
)
@click.option(
'--sample-rate',
metavar='SAMPLE_RATE',
type=int,
help='Sample rate',
)
@click.option(
'--verbose',
'-v',
is_flag=True,
default=False,
help='Enable verbose logging',
)
def unicast(port,wav,target_name,sample_rate,verbose):
"""Start unicast client """
# ctx.ensure_object(dict)
run_async(
run_unicast(
port=port,
wav=wav,
target_name=target_name,
sample_rate=sample_rate,
verbose=verbose
)
)
async def run_unicast(port,wav,target_name,sample_rate,verbose) -> None:
global complete_local_name
if verbose > 0:
logging.basicConfig(level=logging.DEBUG)
else:
logging.basicConfig(level=logging.ERROR)
print("sample rate", args.sample_rate)
if not args.sample_rate or args.sample_rate == 0:
print("sample rate", sample_rate)
if not sample_rate or sample_rate == 0:
app_specific_codec.sampling_frequency = SamplingFrequency.FREQ_16000
app_specific_codec.frame_duration = FrameDuration.DURATION_10000_US
app_specific_codec.octets_per_codec_frame = 40
elif args.sample_rate == 1:
elif sample_rate == 1:
app_specific_codec.sampling_frequency = SamplingFrequency.FREQ_24000
app_specific_codec.frame_duration = FrameDuration.DURATION_10000_US
app_specific_codec.octets_per_codec_frame = 60
elif args.sample_rate == 2:
elif sample_rate == 2:
app_specific_codec.sampling_frequency = SamplingFrequency.FREQ_48000
app_specific_codec.frame_duration = FrameDuration.DURATION_10000_US
app_specific_codec.octets_per_codec_frame = 120
else:
raise ValueError("unknown sample rate")
dev_config = args.config
if not dev_config or dev_config == "":
# Define the data as a Python dictionary
config_data = {
"name": "Unicast Client",
"address": "C0:98:E5:49:00:00"
}
# Define the data as a Python dictionary
config_data = {
"name": "Unicast Client",
"address": "C0:98:E5:49:00:00"
}
# Write the data to a JSON file
dev_config = "device.json"
with open(dev_config, "w") as outfile:
# Indent for better readability
json.dump(config_data, outfile, indent=4)
# Write the data to a JSON file
dev_config = "device.json"
with open(dev_config, "w") as outfile:
# Indent for better readability
json.dump(config_data, outfile, indent=4)
print(f"sample rate: {app_specific_codec.sampling_frequency.hz} Hz")
print(f"frame duration: {app_specific_codec.frame_duration.us} us")
print(f"octets per codec frame: {app_specific_codec.octets_per_codec_frame} bytes")
if args.wave:
sound_file = args.wave
if wav:
sound_file = wav
TEST_SINE = 0
else:
TEST_SINE = 1
print("Test Sine is used")
if args.target_name:
complete_local_name = args.target_name
if target_name:
complete_local_name = target_name
print(f"Target: {complete_local_name}")
# clean controller reset
clean_reset(args.port)
clean_reset(port)
# open the transport
hci_transport = await serial.open_serial_transport(args.port)
hci_transport = await serial.open_serial_transport(port)
# create a device to manage the host, with a custom listener
device = Device.from_config_file_with_hci(
@@ -444,4 +480,4 @@ async def client() -> None:
print("all done")
def main():
asyncio.run(client())
asyncio.run(unicast())