Typing A2DP

This commit is contained in:
Josh Wu
2023-11-17 17:29:35 +08:00
parent 0667e83919
commit e1fdb12647

View File

@@ -15,9 +15,13 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations
import dataclasses
import struct import struct
import logging import logging
from collections import namedtuple from collections.abc import AsyncGenerator
from typing import List, Callable, Awaitable
from .company_ids import COMPANY_IDENTIFIERS from .company_ids import COMPANY_IDENTIFIERS
from .sdp import ( from .sdp import (
@@ -239,24 +243,20 @@ def make_audio_sink_service_sdp_records(service_record_handle, version=(1, 3)):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class SbcMediaCodecInformation( @dataclasses.dataclass
namedtuple( class SbcMediaCodecInformation:
'SbcMediaCodecInformation',
[
'sampling_frequency',
'channel_mode',
'block_length',
'subbands',
'allocation_method',
'minimum_bitpool_value',
'maximum_bitpool_value',
],
)
):
''' '''
A2DP spec - 4.3.2 Codec Specific Information Elements A2DP spec - 4.3.2 Codec Specific Information Elements
''' '''
sampling_frequency: int
channel_mode: int
block_length: int
subbands: int
allocation_method: int
minimum_bitpool_value: int
maximum_bitpool_value: int
SAMPLING_FREQUENCY_BITS = {16000: 1 << 3, 32000: 1 << 2, 44100: 1 << 1, 48000: 1} SAMPLING_FREQUENCY_BITS = {16000: 1 << 3, 32000: 1 << 2, 44100: 1 << 1, 48000: 1}
CHANNEL_MODE_BITS = { CHANNEL_MODE_BITS = {
SBC_MONO_CHANNEL_MODE: 1 << 3, SBC_MONO_CHANNEL_MODE: 1 << 3,
@@ -272,7 +272,7 @@ class SbcMediaCodecInformation(
} }
@staticmethod @staticmethod
def from_bytes(data: bytes) -> 'SbcMediaCodecInformation': def from_bytes(data: bytes) -> SbcMediaCodecInformation:
sampling_frequency = (data[0] >> 4) & 0x0F sampling_frequency = (data[0] >> 4) & 0x0F
channel_mode = (data[0] >> 0) & 0x0F channel_mode = (data[0] >> 0) & 0x0F
block_length = (data[1] >> 4) & 0x0F block_length = (data[1] >> 4) & 0x0F
@@ -293,14 +293,14 @@ class SbcMediaCodecInformation(
@classmethod @classmethod
def from_discrete_values( def from_discrete_values(
cls, cls,
sampling_frequency, sampling_frequency: int,
channel_mode, channel_mode: int,
block_length, block_length: int,
subbands, subbands: int,
allocation_method, allocation_method: int,
minimum_bitpool_value, minimum_bitpool_value: int,
maximum_bitpool_value, maximum_bitpool_value: int,
): ) -> SbcMediaCodecInformation:
return SbcMediaCodecInformation( return SbcMediaCodecInformation(
sampling_frequency=cls.SAMPLING_FREQUENCY_BITS[sampling_frequency], sampling_frequency=cls.SAMPLING_FREQUENCY_BITS[sampling_frequency],
channel_mode=cls.CHANNEL_MODE_BITS[channel_mode], channel_mode=cls.CHANNEL_MODE_BITS[channel_mode],
@@ -314,14 +314,14 @@ class SbcMediaCodecInformation(
@classmethod @classmethod
def from_lists( def from_lists(
cls, cls,
sampling_frequencies, sampling_frequencies: List[int],
channel_modes, channel_modes: List[int],
block_lengths, block_lengths: List[int],
subbands, subbands: List[int],
allocation_methods, allocation_methods: List[int],
minimum_bitpool_value, minimum_bitpool_value: int,
maximum_bitpool_value, maximum_bitpool_value: int,
): ) -> SbcMediaCodecInformation:
return SbcMediaCodecInformation( return SbcMediaCodecInformation(
sampling_frequency=sum( sampling_frequency=sum(
cls.SAMPLING_FREQUENCY_BITS[x] for x in sampling_frequencies cls.SAMPLING_FREQUENCY_BITS[x] for x in sampling_frequencies
@@ -348,7 +348,7 @@ class SbcMediaCodecInformation(
] ]
) )
def __str__(self): def __str__(self) -> str:
channel_modes = ['MONO', 'DUAL_CHANNEL', 'STEREO', 'JOINT_STEREO'] channel_modes = ['MONO', 'DUAL_CHANNEL', 'STEREO', 'JOINT_STEREO']
allocation_methods = ['SNR', 'Loudness'] allocation_methods = ['SNR', 'Loudness']
return '\n'.join( return '\n'.join(
@@ -367,16 +367,19 @@ class SbcMediaCodecInformation(
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class AacMediaCodecInformation( @dataclasses.dataclass
namedtuple( class AacMediaCodecInformation:
'AacMediaCodecInformation',
['object_type', 'sampling_frequency', 'channels', 'rfa', 'vbr', 'bitrate'],
)
):
''' '''
A2DP spec - 4.5.2 Codec Specific Information Elements A2DP spec - 4.5.2 Codec Specific Information Elements
''' '''
object_type: int
sampling_frequency: int
channels: int
rfa: int
vbr: int
bitrate: int
OBJECT_TYPE_BITS = { OBJECT_TYPE_BITS = {
MPEG_2_AAC_LC_OBJECT_TYPE: 1 << 7, MPEG_2_AAC_LC_OBJECT_TYPE: 1 << 7,
MPEG_4_AAC_LC_OBJECT_TYPE: 1 << 6, MPEG_4_AAC_LC_OBJECT_TYPE: 1 << 6,
@@ -400,7 +403,7 @@ class AacMediaCodecInformation(
CHANNELS_BITS = {1: 1 << 1, 2: 1} CHANNELS_BITS = {1: 1 << 1, 2: 1}
@staticmethod @staticmethod
def from_bytes(data: bytes) -> 'AacMediaCodecInformation': def from_bytes(data: bytes) -> AacMediaCodecInformation:
object_type = data[0] object_type = data[0]
sampling_frequency = (data[1] << 4) | ((data[2] >> 4) & 0x0F) sampling_frequency = (data[1] << 4) | ((data[2] >> 4) & 0x0F)
channels = (data[2] >> 2) & 0x03 channels = (data[2] >> 2) & 0x03
@@ -413,8 +416,13 @@ class AacMediaCodecInformation(
@classmethod @classmethod
def from_discrete_values( def from_discrete_values(
cls, object_type, sampling_frequency, channels, vbr, bitrate cls,
): object_type: int,
sampling_frequency: int,
channels: int,
vbr: int,
bitrate: int,
) -> AacMediaCodecInformation:
return AacMediaCodecInformation( return AacMediaCodecInformation(
object_type=cls.OBJECT_TYPE_BITS[object_type], object_type=cls.OBJECT_TYPE_BITS[object_type],
sampling_frequency=cls.SAMPLING_FREQUENCY_BITS[sampling_frequency], sampling_frequency=cls.SAMPLING_FREQUENCY_BITS[sampling_frequency],
@@ -425,7 +433,14 @@ class AacMediaCodecInformation(
) )
@classmethod @classmethod
def from_lists(cls, object_types, sampling_frequencies, channels, vbr, bitrate): def from_lists(
cls,
object_types: List[int],
sampling_frequencies: List[int],
channels: List[int],
vbr: int,
bitrate: int,
) -> AacMediaCodecInformation:
return AacMediaCodecInformation( return AacMediaCodecInformation(
object_type=sum(cls.OBJECT_TYPE_BITS[x] for x in object_types), object_type=sum(cls.OBJECT_TYPE_BITS[x] for x in object_types),
sampling_frequency=sum( sampling_frequency=sum(
@@ -449,7 +464,7 @@ class AacMediaCodecInformation(
] ]
) )
def __str__(self): def __str__(self) -> str:
object_types = [ object_types = [
'MPEG_2_AAC_LC', 'MPEG_2_AAC_LC',
'MPEG_4_AAC_LC', 'MPEG_4_AAC_LC',
@@ -474,26 +489,26 @@ class AacMediaCodecInformation(
) )
@dataclasses.dataclass
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class VendorSpecificMediaCodecInformation: class VendorSpecificMediaCodecInformation:
''' '''
A2DP spec - 4.7.2 Codec Specific Information Elements A2DP spec - 4.7.2 Codec Specific Information Elements
''' '''
vendor_id: int
codec_id: int
value: bytes
@staticmethod @staticmethod
def from_bytes(data): def from_bytes(data: bytes) -> VendorSpecificMediaCodecInformation:
(vendor_id, codec_id) = struct.unpack_from('<IH', data, 0) (vendor_id, codec_id) = struct.unpack_from('<IH', data, 0)
return VendorSpecificMediaCodecInformation(vendor_id, codec_id, data[6:]) return VendorSpecificMediaCodecInformation(vendor_id, codec_id, data[6:])
def __init__(self, vendor_id, codec_id, value): def __bytes__(self) -> bytes:
self.vendor_id = vendor_id
self.codec_id = codec_id
self.value = value
def __bytes__(self):
return struct.pack('<IH', self.vendor_id, self.codec_id, self.value) return struct.pack('<IH', self.vendor_id, self.codec_id, self.value)
def __str__(self): def __str__(self) -> str:
# pylint: disable=line-too-long # pylint: disable=line-too-long
return '\n'.join( return '\n'.join(
[ [
@@ -506,29 +521,27 @@ class VendorSpecificMediaCodecInformation:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@dataclasses.dataclass
class SbcFrame: class SbcFrame:
def __init__( sampling_frequency: int
self, sampling_frequency, block_count, channel_mode, subband_count, payload block_count: int
): channel_mode: int
self.sampling_frequency = sampling_frequency subband_count: int
self.block_count = block_count payload: bytes
self.channel_mode = channel_mode
self.subband_count = subband_count
self.payload = payload
@property @property
def sample_count(self): def sample_count(self) -> int:
return self.subband_count * self.block_count return self.subband_count * self.block_count
@property @property
def bitrate(self): def bitrate(self) -> int:
return 8 * ((len(self.payload) * self.sampling_frequency) // self.sample_count) return 8 * ((len(self.payload) * self.sampling_frequency) // self.sample_count)
@property @property
def duration(self): def duration(self) -> float:
return self.sample_count / self.sampling_frequency return self.sample_count / self.sampling_frequency
def __str__(self): def __str__(self) -> str:
return ( return (
f'SBC(sf={self.sampling_frequency},' f'SBC(sf={self.sampling_frequency},'
f'cm={self.channel_mode},' f'cm={self.channel_mode},'
@@ -540,12 +553,12 @@ class SbcFrame:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class SbcParser: class SbcParser:
def __init__(self, read): def __init__(self, read: Callable[[int], Awaitable[bytes]]) -> None:
self.read = read self.read = read
@property @property
def frames(self): def frames(self) -> AsyncGenerator[SbcFrame, None]:
async def generate_frames(): async def generate_frames() -> AsyncGenerator[SbcFrame, None]:
while True: while True:
# Read 4 bytes of header # Read 4 bytes of header
header = await self.read(4) header = await self.read(4)
@@ -589,7 +602,9 @@ class SbcParser:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class SbcPacketSource: class SbcPacketSource:
def __init__(self, read, mtu, codec_capabilities): def __init__(
self, read: Callable[[int], Awaitable[bytes]], mtu: int, codec_capabilities
) -> None:
self.read = read self.read = read
self.mtu = mtu self.mtu = mtu
self.codec_capabilities = codec_capabilities self.codec_capabilities = codec_capabilities