diff --git a/bumble/profiles/bap.py b/bumble/profiles/bap.py new file mode 100644 index 00000000..76015d52 --- /dev/null +++ b/bumble/profiles/bap.py @@ -0,0 +1,496 @@ +# Copyright 2021-2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from __future__ import annotations + +from collections.abc import Sequence +import dataclasses +import enum +import struct +import functools +from typing import Optional, List, Union + +from bumble import hci +from bumble import gatt +from bumble import gatt_client + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- + + +class AudioLocation(enum.IntFlag): + '''Bluetooth Assigned Numbers, Section 6.12.1 - Audio Location''' + + # fmt: off + NOT_ALLOWED = 0x00000000 + FRONT_LEFT = 0x00000001 + FRONT_RIGHT = 0x00000002 + FRONT_CENTER = 0x00000004 + LOW_FREQUENCY_EFFECTS_1 = 0x00000008 + BACK_LEFT = 0x00000010 + BACK_RIGHT = 0x00000020 + FRONT_LEFT_OF_CENTER = 0x00000040 + FRONT_RIGHT_OF_CENTER = 0x00000080 + BACK_CENTER = 0x00000100 + LOW_FREQUENCY_EFFECTS_2 = 0x00000200 + SIDE_LEFT = 0x00000400 + SIDE_RIGHT = 0x00000800 + TOP_FRONT_LEFT = 0x00001000 + TOP_FRONT_RIGHT = 0x00002000 + TOP_FRONT_CENTER = 0x00004000 + TOP_CENTER = 0x00008000 + TOP_BACK_LEFT = 0x00010000 + TOP_BACK_RIGHT = 0x00020000 + TOP_SIDE_LEFT = 0x00040000 + TOP_SIDE_RIGHT = 0x00080000 + TOP_BACK_CENTER = 0x00100000 + BOTTOM_FRONT_CENTER = 0x00200000 + BOTTOM_FRONT_LEFT = 0x00400000 + BOTTOM_FRONT_RIGHT = 0x00800000 + FRONT_LEFT_WIDE = 0x01000000 + FRONT_RIGHT_WIDE = 0x02000000 + LEFT_SURROUND = 0x04000000 + RIGHT_SURROUND = 0x08000000 + + +class AudioInputType(enum.IntEnum): + '''Bluetooth Assigned Numbers, Section 6.12.2 - Audio Input Type''' + + # fmt: off + UNSPECIFIED = 0x00 + BLUETOOTH = 0x01 + MICROPHONE = 0x02 + ANALOG = 0x03 + DIGITAL = 0x04 + RADIO = 0x05 + STREAMING = 0x06 + AMBIENT = 0x07 + + +class ContextType(enum.IntFlag): + '''Bluetooth Assigned Numbers, Section 6.12.3 - Context Type''' + + # fmt: off + PROHIBITED = 0x0000 + CONVERSATIONAL = 0x0002 + MEDIA = 0x0004 + GAME = 0x0008 + INSTRUCTIONAL = 0x0010 + VOICE_ASSISTANTS = 0x0020 + LIVE = 0x0040 + SOUND_EFFECTS = 0x0080 + NOTIFICATIONS = 0x0100 + RINGTONE = 0x0200 + ALERTS = 0x0400 + EMERGENCY_ALARM = 0x0800 + + +class SamplingFrequency(enum.IntEnum): + '''Bluetooth Assigned Numbers, Section 6.12.5.1 - Sampling Frequency''' + + # fmt: off + FREQ_8000 = 0x01 + FREQ_11025 = 0x02 + FREQ_16000 = 0x03 + FREQ_22050 = 0x04 + FREQ_24000 = 0x05 + FREQ_32000 = 0x06 + FREQ_44100 = 0x07 + FREQ_48000 = 0x08 + FREQ_88200 = 0x09 + FREQ_96000 = 0x0A + FREQ_176400 = 0x0B + FREQ_192000 = 0x0C + FREQ_384000 = 0x0D + # fmt: on + + @classmethod + def from_hz(cls, frequency: int) -> SamplingFrequency: + return { + 8000: SamplingFrequency.FREQ_8000, + 11025: SamplingFrequency.FREQ_11025, + 16000: SamplingFrequency.FREQ_16000, + 22050: SamplingFrequency.FREQ_22050, + 24000: SamplingFrequency.FREQ_24000, + 32000: SamplingFrequency.FREQ_32000, + 44100: SamplingFrequency.FREQ_44100, + 48000: SamplingFrequency.FREQ_48000, + 88200: SamplingFrequency.FREQ_88200, + 96000: SamplingFrequency.FREQ_96000, + 176400: SamplingFrequency.FREQ_176400, + 192000: SamplingFrequency.FREQ_192000, + 384000: SamplingFrequency.FREQ_384000, + }[frequency] + + @property + def hz(self) -> int: + return { + SamplingFrequency.FREQ_8000: 8000, + SamplingFrequency.FREQ_11025: 11025, + SamplingFrequency.FREQ_16000: 16000, + SamplingFrequency.FREQ_22050: 22050, + SamplingFrequency.FREQ_24000: 24000, + SamplingFrequency.FREQ_32000: 32000, + SamplingFrequency.FREQ_44100: 44100, + SamplingFrequency.FREQ_48000: 48000, + SamplingFrequency.FREQ_88200: 88200, + SamplingFrequency.FREQ_96000: 96000, + SamplingFrequency.FREQ_176400: 176400, + SamplingFrequency.FREQ_192000: 192000, + SamplingFrequency.FREQ_384000: 384000, + }[self] + + +class SupportedSamplingFrequency(enum.IntFlag): + '''Bluetooth Assigned Numbers, Section 6.12.4.1 - Sample Frequency''' + + # fmt: off + FREQ_8000 = 1 << (SamplingFrequency.FREQ_8000 - 1) + FREQ_11025 = 1 << (SamplingFrequency.FREQ_11025 - 1) + FREQ_16000 = 1 << (SamplingFrequency.FREQ_16000 - 1) + FREQ_22050 = 1 << (SamplingFrequency.FREQ_22050 - 1) + FREQ_24000 = 1 << (SamplingFrequency.FREQ_24000 - 1) + FREQ_32000 = 1 << (SamplingFrequency.FREQ_32000 - 1) + FREQ_44100 = 1 << (SamplingFrequency.FREQ_44100 - 1) + FREQ_48000 = 1 << (SamplingFrequency.FREQ_48000 - 1) + FREQ_88200 = 1 << (SamplingFrequency.FREQ_88200 - 1) + FREQ_96000 = 1 << (SamplingFrequency.FREQ_96000 - 1) + FREQ_176400 = 1 << (SamplingFrequency.FREQ_176400 - 1) + FREQ_192000 = 1 << (SamplingFrequency.FREQ_192000 - 1) + FREQ_384000 = 1 << (SamplingFrequency.FREQ_384000 - 1) + # fmt: on + + @classmethod + def from_hz(cls, frequencies: Sequence[int]) -> SupportedSamplingFrequency: + MAPPING = { + 8000: SupportedSamplingFrequency.FREQ_8000, + 11025: SupportedSamplingFrequency.FREQ_11025, + 16000: SupportedSamplingFrequency.FREQ_16000, + 22050: SupportedSamplingFrequency.FREQ_22050, + 24000: SupportedSamplingFrequency.FREQ_24000, + 32000: SupportedSamplingFrequency.FREQ_32000, + 44100: SupportedSamplingFrequency.FREQ_44100, + 48000: SupportedSamplingFrequency.FREQ_48000, + 88200: SupportedSamplingFrequency.FREQ_88200, + 96000: SupportedSamplingFrequency.FREQ_96000, + 176400: SupportedSamplingFrequency.FREQ_176400, + 192000: SupportedSamplingFrequency.FREQ_192000, + 384000: SupportedSamplingFrequency.FREQ_384000, + } + + return functools.reduce( + lambda x, y: x | MAPPING[y], + frequencies, + cls(0), + ) + + +class FrameDuration(enum.IntEnum): + '''Bluetooth Assigned Numbers, Section 6.12.5.2 - Frame Duration''' + + # fmt: off + DURATION_7500_US = 0x00 + DURATION_10000_US = 0x01 + + +class SupportedFrameDuration(enum.IntFlag): + '''Bluetooth Assigned Numbers, Section 6.12.4.2 - Frame Duration''' + + # fmt: off + DURATION_7500_US_SUPPORTED = 0b0001 + DURATION_10000_US_SUPPORTED = 0b0010 + DURATION_7500_US_PREFERRED = 0b0001 + DURATION_10000_US_PREFERRED = 0b0010 + + +# ----------------------------------------------------------------------------- +# Utils +# ----------------------------------------------------------------------------- + + +def bits_to_channel_counts(data: int) -> List[int]: + pos = 0 + counts = [] + while data != 0: + # Bit 0 = count 1 + # Bit 1 = count 2, and so on + pos += 1 + if data & 1: + counts.append(pos) + data >>= 1 + return counts + + +def channel_counts_to_bits(counts: Sequence[int]) -> int: + return sum(set([1 << (count - 1) for count in counts])) + + +# ----------------------------------------------------------------------------- +# Structures +# ----------------------------------------------------------------------------- + + +@dataclasses.dataclass +class CodecSpecificCapabilities: + '''See: + * Bluetooth Assigned Numbers, 6.12.4 - Codec Specific Capabilities LTV Structures + * Basic Audio Profile, 4.3.1 - Codec_Specific_Capabilities LTV requirements + ''' + + class Type(enum.IntEnum): + # fmt: off + SAMPLING_FREQUENCY = 0x01 + FRAME_DURATION = 0x02 + AUDIO_CHANNEL_COUNT = 0x03 + OCTETS_PER_FRAME = 0x04 + CODEC_FRAMES_PER_SDU = 0x05 + + supported_sampling_frequencies: SupportedSamplingFrequency + supported_frame_durations: SupportedFrameDuration + supported_audio_channel_counts: Sequence[int] + min_octets_per_codec_frame: int + max_octets_per_codec_frame: int + supported_max_codec_frames_per_sdu: int + + @classmethod + def from_bytes(cls, data: bytes) -> CodecSpecificCapabilities: + offset = 0 + # Allowed default values. + supported_audio_channel_counts = [1] + supported_max_codec_frames_per_sdu = 1 + while offset < len(data): + length, type = struct.unpack_from('BB', data, offset) + offset += 2 + value = int.from_bytes(data[offset : offset + length - 1], 'little') + offset += length - 1 + + if type == CodecSpecificCapabilities.Type.SAMPLING_FREQUENCY: + supported_sampling_frequencies = SupportedSamplingFrequency(value) + elif type == CodecSpecificCapabilities.Type.FRAME_DURATION: + supported_frame_durations = SupportedFrameDuration(value) + elif type == CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT: + supported_audio_channel_counts = bits_to_channel_counts(value) + elif type == CodecSpecificCapabilities.Type.OCTETS_PER_FRAME: + min_octets_per_sample = value & 0xFFFF + max_octets_per_sample = value >> 16 + elif type == CodecSpecificCapabilities.Type.CODEC_FRAMES_PER_SDU: + supported_max_codec_frames_per_sdu = value + + # It is expected here that if some fields are missing, an error should be raised. + return CodecSpecificCapabilities( + supported_sampling_frequencies=supported_sampling_frequencies, + supported_frame_durations=supported_frame_durations, + supported_audio_channel_counts=supported_audio_channel_counts, + min_octets_per_codec_frame=min_octets_per_sample, + max_octets_per_codec_frame=max_octets_per_sample, + supported_max_codec_frames_per_sdu=supported_max_codec_frames_per_sdu, + ) + + def __bytes__(self) -> bytes: + return struct.pack( + ' PacRecord: + offset, coding_format = hci.CodingFormat.parse_from_bytes(data, 0) + codec_specific_capabilities_size = data[offset] + + offset += 1 + codec_specific_capabilities_bytes = data[ + offset : offset + codec_specific_capabilities_size + ] + offset += codec_specific_capabilities_size + metadata_size = data[offset] + metadata = data[offset : offset + metadata_size] + + codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes] + if coding_format.codec_id == hci.CodecID.VENDOR_SPECIFIC: + codec_specific_capabilities = codec_specific_capabilities_bytes + else: + codec_specific_capabilities = CodecSpecificCapabilities.from_bytes( + codec_specific_capabilities_bytes + ) + + return PacRecord( + coding_format=coding_format, + codec_specific_capabilities=codec_specific_capabilities, + metadata=metadata, + ) + + def __bytes__(self) -> bytes: + capabilities_bytes = bytes(self.codec_specific_capabilities) + return ( + bytes(self.coding_format) + + bytes([len(capabilities_bytes)]) + + capabilities_bytes + + bytes([len(self.metadata)]) + + self.metadata + ) + + +# ----------------------------------------------------------------------------- +# Server +# ----------------------------------------------------------------------------- +class PublishedAudioCapabilitiesService(gatt.TemplateService): + UUID = gatt.GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE + + sink_pac: Optional[gatt.Characteristic] + sink_audio_locations: Optional[gatt.Characteristic] + source_pac: Optional[gatt.Characteristic] + source_audio_locations: Optional[gatt.Characteristic] + available_audio_contexts: gatt.Characteristic + supported_audio_contexts: gatt.Characteristic + + def __init__( + self, + supported_source_context: ContextType, + supported_sink_context: ContextType, + available_source_context: ContextType, + available_sink_context: ContextType, + sink_pac: Sequence[PacRecord] = [], + sink_audio_locations: Optional[AudioLocation] = None, + source_pac: Sequence[PacRecord] = [], + source_audio_locations: Optional[AudioLocation] = None, + ) -> None: + characteristics = [] + + self.supported_audio_contexts = gatt.Characteristic( + uuid=gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC, + properties=gatt.Characteristic.Properties.READ, + permissions=gatt.Characteristic.Permissions.READABLE, + value=struct.pack(' None: + if len(sys.argv) < 3: + print('Usage: run_cig_setup.py ' '') + return + + print('<<< connecting to HCI...') + async with await open_transport_or_link(sys.argv[2]) as hci_transport: + print('<<< connected') + + device = Device.from_config_file_with_hci( + sys.argv[1], hci_transport.source, hci_transport.sink + ) + device.cis_enabled = True + + await device.power_on() + + device.add_service( + PublishedAudioCapabilitiesService( + supported_source_context=ContextType.PROHIBITED, + available_source_context=ContextType.PROHIBITED, + supported_sink_context=ContextType.MEDIA, + available_sink_context=ContextType.MEDIA, + sink_audio_locations=( + AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT + ), + sink_pac=[ + # Codec Capability Setting 16_2 + PacRecord( + coding_format=CodingFormat(CodecID.LC3), + codec_specific_capabilities=CodecSpecificCapabilities( + supported_sampling_frequencies=( + SupportedSamplingFrequency.FREQ_16000 + ), + supported_frame_durations=( + SupportedFrameDuration.DURATION_10000_US_SUPPORTED + ), + supported_audio_channel_counts=[1], + min_octets_per_codec_frame=40, + max_octets_per_codec_frame=40, + supported_max_codec_frames_per_sdu=1, + ), + ), + # Codec Capability Setting 24_2 + PacRecord( + coding_format=CodingFormat(CodecID.LC3), + codec_specific_capabilities=CodecSpecificCapabilities( + supported_sampling_frequencies=( + SupportedSamplingFrequency.FREQ_24000 + ), + supported_frame_durations=( + SupportedFrameDuration.DURATION_10000_US_SUPPORTED + ), + supported_audio_channel_counts=[1], + min_octets_per_codec_frame=60, + max_octets_per_codec_frame=60, + supported_max_codec_frames_per_sdu=1, + ), + ), + ], + ) + ) + + advertising_data = bytes( + AdvertisingData( + [ + ( + AdvertisingData.COMPLETE_LOCAL_NAME, + bytes('Bumble LE Audio', 'utf-8'), + ), + ( + AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS, + bytes(PublishedAudioCapabilitiesService.UUID), + ), + ] + ) + ) + + await device.start_extended_advertising( + advertising_properties=( + HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING + ), + own_address_type=OwnAddressType.RANDOM, + advertising_data=advertising_data, + ) + + await hci_transport.source.terminated + + +# ----------------------------------------------------------------------------- +logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()) +asyncio.run(main()) diff --git a/tests/bap_test.py b/tests/bap_test.py new file mode 100644 index 00000000..01fc568e --- /dev/null +++ b/tests/bap_test.py @@ -0,0 +1,151 @@ +# Copyright 2021-2023 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import os +import pytest +import logging + +from bumble import device +from bumble.hci import CodecID, CodingFormat +from bumble.profiles.bap import ( + AudioLocation, + SupportedFrameDuration, + SupportedSamplingFrequency, + CodecSpecificCapabilities, + ContextType, + PacRecord, + PublishedAudioCapabilitiesService, + PublishedAudioCapabilitiesServiceProxy, +) +from .test_utils import TwoDevices + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +def test_codec_specific_capabilities() -> None: + SAMPLE_FREQUENCY = SupportedSamplingFrequency.FREQ_16000 + FRAME_SURATION = SupportedFrameDuration.DURATION_10000_US_SUPPORTED + AUDIO_CHANNEL_COUNTS = [1] + cap = CodecSpecificCapabilities( + supported_sampling_frequencies=SAMPLE_FREQUENCY, + supported_frame_durations=FRAME_SURATION, + supported_audio_channel_counts=AUDIO_CHANNEL_COUNTS, + min_octets_per_codec_frame=40, + max_octets_per_codec_frame=40, + supported_max_codec_frames_per_sdu=1, + ) + assert CodecSpecificCapabilities.from_bytes(bytes(cap)) == cap + + +# ----------------------------------------------------------------------------- +def test_pac_record() -> None: + SAMPLE_FREQUENCY = SupportedSamplingFrequency.FREQ_16000 + FRAME_SURATION = SupportedFrameDuration.DURATION_10000_US_SUPPORTED + AUDIO_CHANNEL_COUNTS = [1] + cap = CodecSpecificCapabilities( + supported_sampling_frequencies=SAMPLE_FREQUENCY, + supported_frame_durations=FRAME_SURATION, + supported_audio_channel_counts=AUDIO_CHANNEL_COUNTS, + min_octets_per_codec_frame=40, + max_octets_per_codec_frame=40, + supported_max_codec_frames_per_sdu=1, + ) + + pac_record = PacRecord( + coding_format=CodingFormat(CodecID.LC3), + codec_specific_capabilities=cap, + metadata=b'', + ) + assert PacRecord.from_bytes(bytes(pac_record)) == pac_record + + +# ----------------------------------------------------------------------------- +def test_vendor_specific_pac_record() -> None: + # Vendor-Specific codec, Google, ID=0xFFFF. No capabilities and metadata. + RAW_DATA = bytes.fromhex('ffe000ffff0000') + assert bytes(PacRecord.from_bytes(RAW_DATA)) == RAW_DATA + + +# ----------------------------------------------------------------------------- +@pytest.mark.asyncio +async def test_pacs(): + devices = TwoDevices() + devices[0].add_service( + PublishedAudioCapabilitiesService( + supported_sink_context=ContextType.MEDIA, + available_sink_context=ContextType.MEDIA, + supported_source_context=0, + available_source_context=0, + sink_pac=[ + # Codec Capability Setting 16_2 + PacRecord( + coding_format=CodingFormat(CodecID.LC3), + codec_specific_capabilities=CodecSpecificCapabilities( + supported_sampling_frequencies=( + SupportedSamplingFrequency.FREQ_16000 + ), + supported_frame_durations=( + SupportedFrameDuration.DURATION_10000_US_SUPPORTED + ), + supported_audio_channel_counts=[1], + min_octets_per_codec_frame=40, + max_octets_per_codec_frame=40, + supported_max_codec_frames_per_sdu=1, + ), + ), + # Codec Capability Setting 24_2 + PacRecord( + coding_format=CodingFormat(CodecID.LC3), + codec_specific_capabilities=CodecSpecificCapabilities( + supported_sampling_frequencies=( + SupportedSamplingFrequency.FREQ_24000 + ), + supported_frame_durations=( + SupportedFrameDuration.DURATION_10000_US_SUPPORTED + ), + supported_audio_channel_counts=[1], + min_octets_per_codec_frame=60, + max_octets_per_codec_frame=60, + supported_max_codec_frames_per_sdu=1, + ), + ), + ], + sink_audio_locations=AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT, + ) + ) + + await devices.setup_connection() + peer = device.Peer(devices.connections[1]) + pacs_client = await peer.discover_service_and_create_proxy( + PublishedAudioCapabilitiesServiceProxy + ) + + +# ----------------------------------------------------------------------------- +async def run(): + await test_pacs() + + +# ----------------------------------------------------------------------------- +if __name__ == '__main__': + logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) + asyncio.run(run())