add bap_le_unicast client
This commit is contained in:
121
utils/le_audio_encoder.py
Normal file
121
utils/le_audio_encoder.py
Normal file
@@ -0,0 +1,121 @@
|
||||
|
||||
import wasmtime
|
||||
import ctypes
|
||||
from typing import List, cast
|
||||
import wasmtime.loader
|
||||
import liblc3 # type: ignore
|
||||
import enum
|
||||
|
||||
store = wasmtime.loader.store
|
||||
|
||||
_memory = cast(wasmtime.Memory, liblc3.memory)
|
||||
|
||||
STACK_POINTER = _memory.data_len(store)
|
||||
|
||||
_memory.grow(store, 1)
|
||||
|
||||
# Mapping wasmtime memory to linear address
|
||||
|
||||
memory = (ctypes.c_ubyte * _memory.data_len(store)).from_address(
|
||||
|
||||
ctypes.addressof(_memory.data_ptr(store).contents) # type: ignore
|
||||
|
||||
)
|
||||
|
||||
|
||||
class Liblc3PcmFormat(enum.IntEnum):
|
||||
|
||||
S16 = 0
|
||||
|
||||
S24 = 1
|
||||
|
||||
S24_3LE = 2
|
||||
|
||||
FLOAT = 3
|
||||
|
||||
|
||||
DEFAULT_PCM_SAMPLE_RATE = 48000
|
||||
MAX_DECODER_SIZE = liblc3.lc3_decoder_size(10000, DEFAULT_PCM_SAMPLE_RATE)
|
||||
MAX_ENCODER_SIZE = liblc3.lc3_encoder_size(10000, DEFAULT_PCM_SAMPLE_RATE)
|
||||
|
||||
DECODER_STACK_POINTER = STACK_POINTER
|
||||
ENCODER_STACK_POINTER = DECODER_STACK_POINTER + MAX_DECODER_SIZE * 2
|
||||
DECODE_BUFFER_STACK_POINTER = ENCODER_STACK_POINTER + MAX_ENCODER_SIZE * 2
|
||||
ENCODE_BUFFER_STACK_POINTER = DECODE_BUFFER_STACK_POINTER + 8192
|
||||
|
||||
|
||||
DEFAULT_PCM_FORMAT = Liblc3PcmFormat
|
||||
|
||||
DEFAULT_PCM_BYTES_PER_SAMPLE = 2
|
||||
|
||||
|
||||
class LeAudioEncoder:
|
||||
|
||||
def __init__(self):
|
||||
self.encoders: List[int] = []
|
||||
pass
|
||||
|
||||
def setup_encoders(self, sample_rate: int, frame_duration_us: int, num_channels: int) -> None:
|
||||
"""Setup LE audio encoders
|
||||
|
||||
Args:
|
||||
sample_rate (int): Sample rate in Hz
|
||||
frame_duration_us (int): Frame duration in microseconds
|
||||
num_channels (int): Number of channels
|
||||
"""
|
||||
self.encoders[:num_channels] = [
|
||||
liblc3.lc3_setup_encoder(
|
||||
frame_duration_us,
|
||||
sample_rate,
|
||||
0, # Input sample rate
|
||||
ENCODER_STACK_POINTER + MAX_ENCODER_SIZE * i,
|
||||
)
|
||||
for i in range(num_channels)
|
||||
]
|
||||
|
||||
def encode(
|
||||
self,
|
||||
sdu_length: int,
|
||||
num_channels: int,
|
||||
input_stride: int,
|
||||
input_data: bytes,
|
||||
) -> bytes:
|
||||
"""Encode a LE audio frame
|
||||
|
||||
Args:
|
||||
sdu_length (int): Length of the SDU
|
||||
num_channels (int): Number of channels
|
||||
input_stride (int): Stride of the input data
|
||||
input_data (bytes): Input data to encode
|
||||
|
||||
Returns:
|
||||
bytes: Encoded data
|
||||
"""
|
||||
if not input_data:
|
||||
return b""
|
||||
|
||||
input_buffer_offset = ENCODE_BUFFER_STACK_POINTER
|
||||
input_buffer_size = len(input_data)
|
||||
|
||||
# Copy into wasm memory
|
||||
memory[input_buffer_offset : input_buffer_offset + input_buffer_size] = input_data
|
||||
|
||||
output_buffer_offset = input_buffer_offset + input_buffer_size
|
||||
output_buffer_size = sdu_length
|
||||
output_frame_size = output_buffer_size // num_channels
|
||||
|
||||
for i in range(num_channels):
|
||||
result = liblc3.lc3_encode(
|
||||
self.encoders[i],
|
||||
0,
|
||||
input_buffer_offset + DEFAULT_PCM_BYTES_PER_SAMPLE * i,
|
||||
input_stride,
|
||||
output_frame_size,
|
||||
output_buffer_offset + output_frame_size * i,
|
||||
)
|
||||
|
||||
if result != 0:
|
||||
raise RuntimeError(f"lc3_encode failed, result={result}")
|
||||
|
||||
# Extract encoded data from the output buffer
|
||||
return bytes(memory[output_buffer_offset : output_buffer_offset + output_buffer_size])
|
||||
Reference in New Issue
Block a user