Rework Python binding

This commit is contained in:
Josh Wu
2024-12-11 17:50:20 +08:00
committed by Antoine SOULIER
parent bb85f7dde4
commit 9c59375ae0
5 changed files with 363 additions and 191 deletions

View File

@@ -12,6 +12,9 @@ authors = [
description = "LC3 Codec library wrapper" description = "LC3 Codec library wrapper"
requires-python = ">=3.10" requires-python = ">=3.10"
[project.optional-dependencies]
dev = ["pytest"]
[project.urls] [project.urls]
Homepage = "https://github.com/google/liblc3" Homepage = "https://github.com/google/liblc3"

View File

@@ -13,96 +13,124 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# #
from __future__ import annotations
import array import array
import ctypes import ctypes
import enum import enum
import glob import glob
import os import os
import typing
from ctypes import c_bool, c_byte, c_int, c_uint, c_size_t, c_void_p from ctypes import c_bool, c_byte, c_int, c_uint, c_size_t, c_void_p
from ctypes.util import find_library from ctypes.util import find_library
from collections.abc import Iterable
class BaseError(Exception):
"""Base error raised by liblc3."""
class InitializationError(RuntimeError, BaseError):
"""Error raised when liblc3 cannot be initialized."""
class InvalidArgumentError(ValueError, BaseError):
"""Error raised when a bad argument is given."""
class _PcmFormat(enum.IntEnum):
S16 = 0
S24 = 1
S24_3LE = 2
FLOAT = 3
class _Base: class _Base:
def __init__(self, frame_duration, samplerate, nchannels, **kwargs): def __init__(
self,
frame_duration_us: int,
sample_rate_hz: int,
num_channels: int,
hrmode: bool = False,
pcm_sample_rate_hz: int | None = None,
libpath: str | None = None,
) -> None:
self.hrmode = False self.hrmode = hrmode
self.dt_us = int(frame_duration * 1000) self.frame_duration_us = frame_duration_us
self.sr_hz = int(samplerate) self.sample_rate_hz = sample_rate_hz
self.sr_pcm_hz = self.sr_hz self.pcm_sample_rate_hz = pcm_sample_rate_hz or self.sample_rate_hz
self.nchannels = nchannels self.num_channels = num_channels
libpath = None if self.frame_duration_us not in [2500, 5000, 7500, 10000]:
raise InvalidArgumentError(
"Invalid frame duration: %.1f ms" % self.frame_duration_us
)
for k in kwargs.keys(): allowed_samplerate = (
if k == 'hrmode': [8000, 16000, 24000, 32000, 48000] if not self.hrmode else [48000, 96000]
self.hrmode = bool(kwargs[k]) )
elif k == 'pcm_samplerate':
self.sr_pcm_hz = int(kwargs[k])
elif k == 'libpath':
libpath = kwargs[k]
else:
raise ValueError("Invalid keyword argument: " + k)
if self.dt_us not in [2500, 5000, 7500, 10000]: if self.sample_rate_hz not in allowed_samplerate:
raise ValueError( raise InvalidArgumentError("Invalid sample rate: %d Hz" % sample_rate_hz)
"Invalid frame duration: %.1f ms" % frame_duration)
allowed_samplerate = [8000, 16000, 24000, 32000, 48000] \
if not self.hrmode else [48000, 96000]
if self.sr_hz not in allowed_samplerate:
raise ValueError("Invalid sample rate: %d Hz" % samplerate)
if libpath is None: if libpath is None:
mesonpy_lib = glob.glob(os.path.join(os.path.dirname(__file__), '.lc3.mesonpy.libs', '*lc3*')) mesonpy_lib = glob.glob(
os.path.join(os.path.dirname(__file__), ".lc3.mesonpy.libs", "*lc3*")
)
if mesonpy_lib: if mesonpy_lib:
libpath = mesonpy_lib[0] libpath = mesonpy_lib[0]
else: else:
libpath = find_library("lc3") libpath = find_library("lc3")
if not libpath: if not libpath:
raise Exception("LC3 library not found") raise InitializationError("LC3 library not found")
lib = ctypes.cdll.LoadLibrary(libpath) lib = ctypes.cdll.LoadLibrary(libpath)
try: if not all(
lib.lc3_hr_frame_samples \ hasattr(lib, func)
and lib.lc3_hr_frame_block_bytes \ for func in (
and lib.lc3_hr_resolve_bitrate \ "lc3_hr_frame_samples",
and lib.lc3_hr_delay_samples "lc3_hr_frame_block_bytes",
"lc3_hr_resolve_bitrate",
except AttributeError: "lc3_hr_delay_samples",
)
):
if self.hrmode: if self.hrmode:
raise Exception('High-Resolution interface not available') raise InitializationError("High-Resolution interface not available")
lib.lc3_hr_frame_samples = \ lc3_hr_frame_samples = lambda hrmode, dt_us, sr_hz: lib.lc3_frame_samples(
lambda hrmode, dt_us, sr_hz: \ dt_us, sr_hz
lib.lc3_frame_samples(dt_us, sr_hz) )
lc3_hr_frame_block_bytes = (
lib.lc3_hr_frame_block_bytes = \ lambda hrmode, dt_us, sr_hz, num_channels, bitrate: num_channels
lambda hrmode, dt_us, sr_hz, nchannels, bitrate: \ * lib.lc3_frame_bytes(dt_us, bitrate // 2)
nchannels * lib.lc3_frame_bytes(dt_us, bitrate // 2) )
lc3_hr_resolve_bitrate = (
lib.lc3_hr_resolve_bitrate = \ lambda hrmode, dt_us, sr_hz, nbytes: lib.lc3_resolve_bitrate(
lambda hrmode, dt_us, sr_hz, nbytes: \ dt_us, nbytes
lib.lc3_resolve_bitrate(dt_us, nbytes) )
)
lib.lc3_hr_delay_samples = \ lc3_hr_delay_samples = lambda hrmode, dt_us, sr_hz: lib.lc3_delay_samples(
lambda hrmode, dt_us, sr_hz: \ dt_us, sr_hz
lib.lc3_delay_samples(dt_us, sr_hz) )
setattr(lib, "lc3_hr_frame_samples", lc3_hr_frame_samples)
setattr(lib, "lc3_hr_frame_block_bytes", lc3_hr_frame_block_bytes)
setattr(lib, "lc3_hr_resolve_bitrate", lc3_hr_resolve_bitrate)
setattr(lib, "lc3_hr_delay_samples", lc3_hr_delay_samples)
lib.lc3_hr_frame_samples.argtypes = [c_bool, c_int, c_int] lib.lc3_hr_frame_samples.argtypes = [c_bool, c_int, c_int]
lib.lc3_hr_frame_block_bytes.argtypes = \ lib.lc3_hr_frame_block_bytes.argtypes = [c_bool, c_int, c_int, c_int, c_int]
[c_bool, c_int, c_int, c_int, c_int]
lib.lc3_hr_resolve_bitrate.argtypes = [c_bool, c_int, c_int, c_int] lib.lc3_hr_resolve_bitrate.argtypes = [c_bool, c_int, c_int, c_int]
lib.lc3_hr_delay_samples.argtypes = [c_bool, c_int, c_int] lib.lc3_hr_delay_samples.argtypes = [c_bool, c_int, c_int]
self.lib = lib self.lib = lib
libc = ctypes.cdll.LoadLibrary(find_library("c")) if not (libc_path := find_library("c")):
raise InitializationError("Unable to find libc")
libc = ctypes.cdll.LoadLibrary(libc_path)
self.malloc = libc.malloc self.malloc = libc.malloc
self.malloc.argtypes = [c_size_t] self.malloc.argtypes = [c_size_t]
@@ -111,138 +139,192 @@ class _Base:
self.free = libc.free self.free = libc.free
self.free.argtypes = [c_void_p] self.free.argtypes = [c_void_p]
def get_frame_samples(self): def get_frame_samples(self) -> int:
""" """
Returns the number of PCM samples in an LC3 frame Returns the number of PCM samples in an LC3 frame.
""" """
ret = self.lib.lc3_hr_frame_samples( ret = self.lib.lc3_hr_frame_samples(
self.hrmode, self.dt_us, self.sr_pcm_hz) self.hrmode, self.frame_duration_us, self.pcm_sample_rate_hz
)
if ret < 0: if ret < 0:
raise ValueError("Bad parameters") raise InvalidArgumentError("Bad parameters")
return ret return ret
def get_frame_bytes(self, bitrate): def get_frame_bytes(self, bitrate: int) -> int:
""" """
Returns the size of LC3 frame blocks, from bitrate in bit per seconds. Returns the size of LC3 frame blocks, from bitrate in bit per seconds.
A target `bitrate` equals 0 or `INT32_MAX` returns respectively A target `bitrate` equals 0 or `INT32_MAX` returns respectively
the minimum and maximum allowed size. the minimum and maximum allowed size.
""" """
ret = self.lib.lc3_hr_frame_block_bytes( ret = self.lib.lc3_hr_frame_block_bytes(
self.hrmode, self.dt_us, self.sr_hz, self.nchannels, bitrate) self.hrmode,
self.frame_duration_us,
self.sample_rate_hz,
self.num_channels,
bitrate,
)
if ret < 0: if ret < 0:
raise ValueError("Bad parameters") raise InvalidArgumentError("Bad parameters")
return ret return ret
def resolve_bitrate(self, nbytes): def resolve_bitrate(self, num_bytes: int) -> int:
""" """
Returns the bitrate in bits per seconds, from the size of LC3 frames. Returns the bitrate in bits per seconds, from the size of LC3 frames.
""" """
ret = self.lib.lc3_hr_resolve_bitrate( ret = self.lib.lc3_hr_resolve_bitrate(
self.hrmode, self.dt_us, self.sr_hz, nbytes) self.hrmode, self.frame_duration_us, self.sample_rate_hz, num_bytes
)
if ret < 0: if ret < 0:
raise ValueError("Bad parameters") raise InvalidArgumentError("Bad parameters")
return ret return ret
def get_delay_samples(self): def get_delay_samples(self) -> int:
"""
Returns the algorithmic delay, as a number of samples.
""" """
Returns the algorithmic delay, as a number of samples.
"""
ret = self.lib.lc3_hr_delay_samples( ret = self.lib.lc3_hr_delay_samples(
self.hrmode, self.dt_us, self.sr_pcm_hz) self.hrmode, self.frame_duration_us, self.pcm_sample_rate_hz
)
if ret < 0: if ret < 0:
raise ValueError("Bad parameters") raise InvalidArgumentError("Bad parameters")
return ret return ret
@staticmethod @classmethod
def _resolve_pcm_format(bitdepth): def _resolve_pcm_format(cls, bit_depth: int | None) -> tuple[
PCM_FORMAT_S16 = 0 _PcmFormat,
PCM_FORMAT_S24 = 1 type[ctypes.c_int16] | type[ctypes.Array[ctypes.c_byte]] | type[ctypes.c_float],
PCM_FORMAT_S24_3LE = 2 ]:
PCM_FORMAT_FLOAT = 3 match bit_depth:
case 16:
match bitdepth: return (_PcmFormat.S16, ctypes.c_int16)
case 16: return (PCM_FORMAT_S16, ctypes.c_int16) case 24:
case 24: return (PCM_FORMAT_S24_3LE, 3 * ctypes.c_byte) return (_PcmFormat.S24_3LE, 3 * ctypes.c_byte)
case None: return (PCM_FORMAT_FLOAT, ctypes.c_float) case None:
case _: raise ValueError("Could not interpret PCM bitdepth") return (_PcmFormat.FLOAT, ctypes.c_float)
case _:
raise InvalidArgumentError("Could not interpret PCM bit_depth")
class Encoder(_Base): class Encoder(_Base):
""" """
LC3 Encoder wrapper LC3 Encoder wrapper.
The `frame_duration` expressed in milliseconds is any of 2.5, 5.0, 7.5 The `frame_duration_us`, in microsecond, is any of 2500, 5000, 7500, or 10000.
or 10.0. The `samplerate`, in Hertz, is any of 8000, 16000, 24000, 32000 The `sample_rate_hz`, in Hertz, is any of 8000, 16000, 24000, 32000
or 48000, unless High-Resolution mode is enabled. In High-Resolution mode, or 48000, unless High-Resolution mode is enabled. In High-Resolution mode,
the `samplerate` is 48000 or 96000. the `sample_rate_hz` is 48000 or 96000.
By default, one channel is processed. When `nchannels` is greater than one, By default, one channel is processed. When `num_channels` is greater than one,
the PCM input stream is read interleaved and consecutives LC3 frames are the PCM input stream is read interleaved and consecutives LC3 frames are
output, for each channel. output, for each channel.
Keyword arguments: Optional arguments:
hrmode : Enable High-Resolution mode, default is `False`. hrmode : Enable High-Resolution mode, default is `False`.
sr_pcm_hz : Input PCM samplerate, enable downsampling of input. input_sample_rate_hz : Input PCM samplerate, enable downsampling of input.
libpath : LC3 library path and name libpath : LC3 library path and name
""" """
class c_encoder_t(c_void_p): class c_encoder_t(c_void_p):
pass pass
def __init__(self, frame_duration, samplerate, nchannels=1, **kwargs): def __init__(
self,
frame_duration_us: int,
sample_rate_hz: int,
num_channels: int = 1,
hrmode: bool = False,
input_sample_rate_hz: int | None = None,
libpath: str | None = None,
) -> None:
super().__init__(frame_duration, samplerate, nchannels, **kwargs) super().__init__(
frame_duration_us,
sample_rate_hz,
num_channels,
hrmode,
input_sample_rate_hz,
libpath,
)
lib = self.lib lib = self.lib
try: if not all(
lib.lc3_hr_encoder_size \ hasattr(lib, func)
and lib.lc3_hr_setup_encoder for func in ("lc3_hr_encoder_size", "lc3_hr_setup_encoder")
):
if self.hrmode:
raise InitializationError("High-Resolution interface not available")
except AttributeError: lc3_hr_encoder_size = lambda hrmode, dt_us, sr_hz: lib.lc3_encoder_size(
dt_us, sr_hz
)
assert not self.hrmode lc3_hr_setup_encoder = (
lambda hrmode, dt_us, sr_hz, sr_pcm_hz, mem: lib.lc3_setup_encoder(
lib.lc3_hr_encoder_size = \ dt_us, sr_hz, sr_pcm_hz, mem
lambda hrmode, dt_us, sr_hz: \ )
lib.lc3_encoder_size(dt_us, sr_hz) )
setattr(lib, "lc3_hr_encoder_size", lc3_hr_encoder_size)
lib.lc3_hr_setup_encoder = \ setattr(lib, "lc3_hr_setup_encoder", lc3_hr_setup_encoder)
lambda hrmode, dt_us, sr_hz, sr_pcm_hz, mem: \
lib.lc3_setup_encoder(dt_us, sr_hz, sr_pcm_hz, mem)
lib.lc3_hr_encoder_size.argtypes = [c_bool, c_int, c_int] lib.lc3_hr_encoder_size.argtypes = [c_bool, c_int, c_int]
lib.lc3_hr_encoder_size.restype = c_uint lib.lc3_hr_encoder_size.restype = c_uint
lib.lc3_hr_setup_encoder.argtypes = \ lib.lc3_hr_setup_encoder.argtypes = [c_bool, c_int, c_int, c_int, c_void_p]
[c_bool, c_int, c_int, c_int, c_void_p]
lib.lc3_hr_setup_encoder.restype = self.c_encoder_t lib.lc3_hr_setup_encoder.restype = self.c_encoder_t
lib.lc3_encode.argtypes = \ lib.lc3_encode.argtypes = [
[self.c_encoder_t, c_int, c_void_p, c_int, c_int, c_void_p] self.c_encoder_t,
c_int,
c_void_p,
c_int,
c_int,
c_void_p,
]
def new_encoder(): return lib.lc3_hr_setup_encoder( def new_encoder():
self.hrmode, self.dt_us, self.sr_hz, self.sr_pcm_hz, return lib.lc3_hr_setup_encoder(
self.malloc(lib.lc3_hr_encoder_size( self.hrmode,
self.hrmode, self.dt_us, self.sr_pcm_hz))) self.frame_duration_us,
self.sample_rate_hz,
self.pcm_sample_rate_hz,
self.malloc(
lib.lc3_hr_encoder_size(
self.hrmode, self.frame_duration_us, self.pcm_sample_rate_hz
)
),
)
self.__encoders = [new_encoder() for i in range(nchannels)] self.__encoders = [new_encoder() for _ in range(num_channels)]
def __del__(self): def __del__(self) -> None:
try: try:
(self.free(encoder) for encoder in self.__encoders) (self.free(encoder) for encoder in self.__encoders)
finally: finally:
return return
def encode(self, pcm, nbytes, bitdepth=None): @typing.overload
""" def encode(
Encode LC3 frame(s), for each channel. self,
pcm: bytes | bytearray | memoryview | Iterable[float],
num_bytes: int,
bit_depth: None = None,
) -> bytes: ...
The `pcm` input is given in two ways. When no `bitdepth` is defined, @typing.overload
def encode(
self, pcm: bytes | bytearray | memoryview, num_bytes: int, bit_depth: int
) -> bytes: ...
def encode(self, pcm, num_bytes: int, bit_depth: int | None = None) -> bytes:
"""
Encodes LC3 frame(s), for each channel.
The `pcm` input is given in two ways. When no `bit_depth` is defined,
it's a vector of floating point values from -1 to 1, coding the sample it's a vector of floating point values from -1 to 1, coding the sample
levels. When `bitdepth` is defined, `pcm` is interpreted as a byte-like levels. When `bit_depth` is defined, `pcm` is interpreted as a byte-like
object, each sample coded on `bitdepth` bits (16 or 24). object, each sample coded on `bit_depth` bits (16 or 24).
The machine endianness, or little endian, is used for 16 or 24 bits The machine endianness, or little endian, is used for 16 or 24 bits
width, respectively. width, respectively.
In both cases, the `pcm` vector data is padded with zeros when In both cases, the `pcm` vector data is padded with zeros when
@@ -250,151 +332,191 @@ class Encoder(_Base):
Channels concatenation of encoded LC3 frames, of `nbytes`, is returned. Channels concatenation of encoded LC3 frames, of `nbytes`, is returned.
""" """
nchannels = self.nchannels nchannels = self.num_channels
frame_samples = self.get_frame_samples() frame_samples = self.get_frame_samples()
(pcm_fmt, pcm_t) = self._resolve_pcm_format(bitdepth) (pcm_fmt, pcm_t) = self._resolve_pcm_format(bit_depth)
pcm_len = nchannels * frame_samples pcm_len = nchannels * frame_samples
if bitdepth is None: if bit_depth is None:
pcm_buffer = array.array('f', pcm) pcm_buffer = array.array("f", pcm)
# Invert test to catch NaN # Invert test to catch NaN
if not abs(sum(pcm)) / frame_samples < 2: if not abs(sum(pcm)) / frame_samples < 2:
raise ValueError("Out of range PCM input") raise InvalidArgumentError("Out of range PCM input")
padding = max(pcm_len - frame_samples, 0) padding = max(pcm_len - frame_samples, 0)
pcm_buffer.extend(array.array('f', [0] * padding)) pcm_buffer.extend(array.array("f", [0] * padding))
else: else:
padding = max(pcm_len * ctypes.sizeof(pcm_t) - len(pcm), 0) padding = max(pcm_len * ctypes.sizeof(pcm_t) - len(pcm), 0)
pcm_buffer = bytearray(pcm) + bytearray(padding) pcm_buffer = bytearray(pcm) + bytearray(padding) # type: ignore
data_buffer = (c_byte * nbytes)() data_buffer = (c_byte * num_bytes)()
data_offset = 0 data_offset = 0
for (ich, encoder) in enumerate(self.__encoders): for ich, encoder in enumerate(self.__encoders):
pcm_offset = ich * ctypes.sizeof(pcm_t) pcm_offset = ich * ctypes.sizeof(pcm_t)
pcm = (pcm_t * (pcm_len - ich)).from_buffer(pcm_buffer, pcm_offset) pcm = (pcm_t * (pcm_len - ich)).from_buffer(pcm_buffer, pcm_offset)
data_size = nbytes // nchannels + int(ich < nbytes % nchannels) data_size = num_bytes // nchannels + int(ich < num_bytes % nchannels)
data = (c_byte * data_size).from_buffer(data_buffer, data_offset) data = (c_byte * data_size).from_buffer(data_buffer, data_offset)
data_offset += data_size data_offset += data_size
ret = self.lib.lc3_encode( ret = self.lib.lc3_encode(encoder, pcm_fmt, pcm, nchannels, len(data), data)
encoder, pcm_fmt, pcm, nchannels, len(data), data)
if ret < 0: if ret < 0:
raise ValueError("Bad parameters") raise InvalidArgumentError("Bad parameters")
return bytes(data_buffer) return bytes(data_buffer)
class Decoder(_Base): class Decoder(_Base):
""" """
LC3 Decoder wrapper LC3 Decoder wrapper.
The `frame_duration` expressed in milliseconds is any of 2.5, 5.0, 7.5 The `frame_duration_us`, in microsecond, is any of 2500, 5000, 7500, or 10000.
or 10.0. The `samplerate`, in Hertz, is any of 8000, 16000, 24000, 32000 The `sample_rate_hz`, in Hertz, is any of 8000, 16000, 24000, 32000
or 48000, unless High-Resolution mode is enabled. In High-Resolution or 48000, unless High-Resolution mode is enabled. In High-Resolution mode,
mode, the `samplerate` is 48000 or 96000. the `sample_rate_hz` is 48000 or 96000.
By default, one channel is processed. When `nchannels` is greater than one, By default, one channel is processed. When `num_chanels` is greater than one,
the PCM input stream is read interleaved and consecutives LC3 frames are the PCM input stream is read interleaved and consecutives LC3 frames are
output, for each channel. output, for each channel.
Keyword arguments: Optional arguments:
hrmode : Enable High-Resolution mode, default is `False`. hrmode : Enable High-Resolution mode, default is `False`.
sr_pcm_hz : Output PCM samplerate, enable upsampling of output. output_sample_rate_hz : Output PCM sample_rate_hz, enable upsampling of output.
libpath : LC3 library path and name libpath : LC3 library path and name
""" """
class c_decoder_t(c_void_p): class c_decoder_t(c_void_p):
pass pass
def __init__(self, frame_duration, samplerate, nchannels=1, **kwargs): def __init__(
self,
frame_duration_us: int,
sample_rate_hz: int,
num_channels: int = 1,
hrmode: bool = False,
output_sample_rate_hz: int | None = None,
libpath: str | None = None,
) -> None:
super().__init__(frame_duration, samplerate, nchannels, **kwargs) super().__init__(
frame_duration_us,
sample_rate_hz,
num_channels,
hrmode,
output_sample_rate_hz,
libpath,
)
lib = self.lib lib = self.lib
try: if not all(
lib.lc3_hr_decoder_size \ hasattr(lib, func)
and lib.lc3_hr_setup_decoder for func in ("lc3_hr_decoder_size", "lc3_hr_setup_decoder")
):
if self.hrmode:
raise InitializationError("High-Resolution interface not available")
except AttributeError: lc3_hr_decoder_size = lambda hrmode, dt_us, sr_hz: lib.lc3_decoder_size(
dt_us, sr_hz
)
assert not self.hrmode lc3_hr_setup_decoder = (
lambda hrmode, dt_us, sr_hz, sr_pcm_hz, mem: lib.lc3_setup_decoder(
lib.lc3_hr_decoder_size = \ dt_us, sr_hz, sr_pcm_hz, mem
lambda hrmode, dt_us, sr_hz: \ )
lib.lc3_decoder_size(dt_us, sr_hz) )
setattr(lib, "lc3_hr_decoder_size", lc3_hr_decoder_size)
lib.lc3_hr_setup_decoder = \ setattr(lib, "lc3_hr_setup_decoder", lc3_hr_setup_decoder)
lambda hrmode, dt_us, sr_hz, sr_pcm_hz, mem: \
lib.lc3_setup_decoder(dt_us, sr_hz, sr_pcm_hz, mem)
lib.lc3_hr_decoder_size.argtypes = [c_bool, c_int, c_int] lib.lc3_hr_decoder_size.argtypes = [c_bool, c_int, c_int]
lib.lc3_hr_decoder_size.restype = c_uint lib.lc3_hr_decoder_size.restype = c_uint
lib.lc3_hr_setup_decoder.argtypes = \ lib.lc3_hr_setup_decoder.argtypes = [c_bool, c_int, c_int, c_int, c_void_p]
[c_bool, c_int, c_int, c_int, c_void_p]
lib.lc3_hr_setup_decoder.restype = self.c_decoder_t lib.lc3_hr_setup_decoder.restype = self.c_decoder_t
lib.lc3_decode.argtypes = \ lib.lc3_decode.argtypes = [
[self.c_decoder_t, c_void_p, c_int, c_int, c_void_p, c_int] self.c_decoder_t,
c_void_p,
c_int,
c_int,
c_void_p,
c_int,
]
def new_decoder(): return lib.lc3_hr_setup_decoder( def new_decoder():
self.hrmode, self.dt_us, self.sr_hz, self.sr_pcm_hz, return lib.lc3_hr_setup_decoder(
self.malloc(lib.lc3_hr_decoder_size( self.hrmode,
self.hrmode, self.dt_us, self.sr_pcm_hz))) self.frame_duration_us,
self.sample_rate_hz,
self.pcm_sample_rate_hz,
self.malloc(
lib.lc3_hr_decoder_size(
self.hrmode, self.frame_duration_us, self.pcm_sample_rate_hz
)
),
)
self.__decoders = [new_decoder() for i in range(nchannels)] self.__decoders = [new_decoder() for i in range(num_channels)]
def __del__(self): def __del__(self) -> None:
try: try:
(self.free(decoder) for decoder in self.__decoders) (self.free(decoder) for decoder in self.__decoders)
finally: finally:
return return
def decode(self, data, bitdepth=None): @typing.overload
def decode(
self, data: bytes | bytearray | memoryview, bit_depth: None = None
) -> array.array[float]: ...
@typing.overload
def decode(self, data: bytes | bytearray | memoryview, bit_depth: int) -> bytes: ...
def decode(
self, data: bytes | bytearray | memoryview, bit_depth: int | None = None
) -> bytes | array.array[float]:
""" """
Decode an LC3 frame Decodes an LC3 frame.
The input `data` is the channels concatenation of LC3 frames in a The input `data` is the channels concatenation of LC3 frames in a
byte-like object. Interleaved PCM samples are returned according to byte-like object. Interleaved PCM samples are returned according to
the `bitdepth` indication. the `bit_depth` indication.
When no `bitdepth` is defined, it's a vector of floating point values When no `bit_depth` is defined, it's a vector of floating point values
from -1 to 1, coding the sample levels. When `bitdepth` is defined, from -1 to 1, coding the sample levels. When `bit_depth` is defined,
it returns a byte array, each sample coded on `bitdepth` bits. it returns a byte array, each sample coded on `bit_depth` bits.
The machine endianness, or little endian, is used for 16 or 24 bits The machine endianness, or little endian, is used for 16 or 24 bits
width, respectively. width, respectively.
""" """
nchannels = self.nchannels num_channels = self.num_channels
frame_samples = self.get_frame_samples()
(pcm_fmt, pcm_t) = self._resolve_pcm_format(bitdepth) (pcm_fmt, pcm_t) = self._resolve_pcm_format(bit_depth)
pcm_len = nchannels * self.get_frame_samples() pcm_len = num_channels * self.get_frame_samples()
pcm_buffer = (pcm_t * pcm_len)() pcm_buffer = (pcm_t * pcm_len)()
data_buffer = bytearray(data) data_buffer = bytearray(data)
data_offset = 0 data_offset = 0
for (ich, decoder) in enumerate(self.__decoders): for ich, decoder in enumerate(self.__decoders):
pcm_offset = ich * ctypes.sizeof(pcm_t) pcm_offset = ich * ctypes.sizeof(pcm_t)
pcm = (pcm_t * (pcm_len - ich)).from_buffer(pcm_buffer, pcm_offset) pcm = (pcm_t * (pcm_len - ich)).from_buffer(pcm_buffer, pcm_offset)
data_size = len(data_buffer) // nchannels + \ data_size = len(data_buffer) // num_channels + int(
int(ich < len(data_buffer) % nchannels) ich < len(data_buffer) % num_channels
data = (c_byte * data_size).from_buffer(data_buffer, data_offset) )
buf = (c_byte * data_size).from_buffer(data_buffer, data_offset)
data_offset += data_size data_offset += data_size
ret = self.lib.lc3_decode( ret = self.lib.lc3_decode(
decoder, data, len(data), pcm_fmt, pcm, self.nchannels) decoder, buf, len(buf), pcm_fmt, pcm, self.num_channels
)
if ret < 0: if ret < 0:
raise ValueError("Bad parameters") raise InvalidArgumentError("Bad parameters")
return array.array('f', pcm_buffer) \ return array.array("f", pcm_buffer) if bit_depth is None else bytes(pcm_buffer)
if bitdepth is None else bytes(pcm_buffer)

View File

@@ -0,0 +1,47 @@
import array
import lc3
import pytest
@pytest.mark.parametrize("bit_depth,decoded_length", [(16, 960), (24, 1440)])
def test_decode_with_bit_depth(bit_depth, decoded_length) -> None:
decoder = lc3.Decoder(frame_duration_us=10000, sample_rate_hz=48000)
decoded_frame = decoder.decode(bytes(120), bit_depth=bit_depth)
assert isinstance(decoded_frame, bytes)
assert len(decoded_frame) == decoded_length
def test_decode_without_bit_depth() -> None:
decoder = lc3.Decoder(frame_duration_us=10000, sample_rate_hz=48000)
decoded_frame = decoder.decode(bytes(120))
assert isinstance(decoded_frame, array.array)
assert len(decoded_frame) == 480
assert all(isinstance(e, float) for e in decoded_frame)
def test_decode_with_bad_bit_depth() -> None:
decoder = lc3.Decoder(frame_duration_us=10000, sample_rate_hz=48000)
with pytest.raises(lc3.InvalidArgumentError):
decoder.decode(bytes(120), bit_depth=128)
@pytest.mark.parametrize("bit_depth", [16, 24])
def test_encode_with_bit_depth(bit_depth) -> None:
encoder = lc3.Encoder(frame_duration_us=10000, sample_rate_hz=48000)
encoded_frame = encoder.encode(bytes(1920), num_bytes=120, bit_depth=bit_depth)
assert isinstance(encoded_frame, bytes)
assert len(encoded_frame) == 120
@pytest.mark.parametrize("pcm", [bytes(1920), [0.0] * 1920])
def test_encode_without_bit_depth(pcm) -> None:
encoder = lc3.Encoder(frame_duration_us=10000, sample_rate_hz=48000)
encoded_frame = encoder.encode(pcm, num_bytes=120, bit_depth=None)
assert isinstance(encoded_frame, bytes)
assert len(encoded_frame) == 120
def test_encode_with_bad_bit_depth() -> None:
encoder = lc3.Encoder(frame_duration_us=10000, sample_rate_hz=48000)
with pytest.raises(lc3.InvalidArgumentError):
encoder.encode(bytes(1920), num_bytes=120, bit_depth=128)

View File

@@ -53,7 +53,7 @@ if header[0] != 0xcc1c:
samplerate = header[2] * 100 samplerate = header[2] * 100
nchannels = header[4] nchannels = header[4]
frame_duration = header[5] / 100 frame_duration = header[5] * 10
stream_length = header[7] stream_length = header[7]
# --- Setup output --- # --- Setup output ---
@@ -80,7 +80,7 @@ encoded_length = stream_length + dec.get_delay_samples()
for i in range(0, encoded_length, frame_length): for i in range(0, encoded_length, frame_length):
lc3_frame_size = struct.unpack('=H', f_lc3.read(2))[0] lc3_frame_size = struct.unpack('=H', f_lc3.read(2))[0]
pcm = dec.decode(f_lc3.read(lc3_frame_size), bitdepth=bitdepth) pcm = dec.decode(f_lc3.read(lc3_frame_size), bit_depth=bitdepth)
pcm = pcm[max(encoded_length - stream_length - i, 0) * pcm_size: pcm = pcm[max(encoded_length - stream_length - i, 0) * pcm_size:
min(encoded_length - i, frame_length) * pcm_size] min(encoded_length - i, frame_length) * pcm_size]

View File

@@ -57,7 +57,7 @@ stream_length = wavfile.getnframes()
# --- Setup encoder --- # --- Setup encoder ---
enc = lc3.Encoder( enc = lc3.Encoder(
args.frame_duration, samplerate, nchannels, libpath=args.libpath) int(args.frame_duration)*1000, samplerate, nchannels, libpath=args.libpath)
frame_size = enc.get_frame_bytes(args.bitrate) frame_size = enc.get_frame_bytes(args.bitrate)
frame_length = enc.get_frame_samples() frame_length = enc.get_frame_samples()
bitrate = enc.resolve_bitrate(frame_size) bitrate = enc.resolve_bitrate(frame_size)
@@ -77,7 +77,7 @@ for i in range(0, stream_length, frame_length):
f_lc3.write(struct.pack('=H', frame_size)) f_lc3.write(struct.pack('=H', frame_size))
pcm = wavfile.readframes(frame_length) pcm = wavfile.readframes(frame_length)
f_lc3.write(enc.encode(pcm, frame_size, bitdepth=bitdepth)) f_lc3.write(enc.encode(pcm, frame_size, bit_depth=bitdepth))
# --- Cleanup --- # --- Cleanup ---