Merge pull request #585 from wpiet/vocs

Add `Volume Offset Control Service`
This commit is contained in:
Gilles Boccon-Gibod
2024-11-29 10:41:30 -08:00
committed by GitHub
2 changed files with 514 additions and 0 deletions

330
bumble/profiles/vocs.py Normal file
View File

@@ -0,0 +1,330 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct
from dataclasses import dataclass
from typing import Optional
from bumble.device import Connection
from bumble.att import ATT_Error
from bumble.gatt import (
Characteristic,
DelegatedCharacteristicAdapter,
TemplateService,
CharacteristicValue,
UTF8CharacteristicAdapter,
InvalidServiceError,
GATT_VOLUME_OFFSET_CONTROL_SERVICE,
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC,
GATT_AUDIO_LOCATION_CHARACTERISTIC,
GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC,
GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
from bumble.utils import OpenIntEnum
from bumble.profiles.bap import AudioLocation
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
MIN_VOLUME_OFFSET = -255
MAX_VOLUME_OFFSET = 255
CHANGE_COUNTER_MAX_VALUE = 0xFF
class SetVolumeOffsetOpCode(OpenIntEnum):
SET_VOLUME_OFFSET = 0x01
class ErrorCode(OpenIntEnum):
"""
See Volume Offset Control Service 1.6. Application error codes.
"""
INVALID_CHANGE_COUNTER = 0x80
OPCODE_NOT_SUPPORTED = 0x81
VALUE_OUT_OF_RANGE = 0x82
# -----------------------------------------------------------------------------
@dataclass
class VolumeOffsetState:
volume_offset: int = 0
change_counter: int = 0
attribute_value: Optional[CharacteristicValue] = None
def __bytes__(self) -> bytes:
return struct.pack('<hB', self.volume_offset, self.change_counter)
@classmethod
def from_bytes(cls, data: bytes):
volume_offset, change_counter = struct.unpack('<hB', data)
return cls(volume_offset, change_counter)
def increment_change_counter(self) -> None:
self.change_counter = (self.change_counter + 1) % (CHANGE_COUNTER_MAX_VALUE + 1)
async def notify_subscribers_via_connection(self, connection: Connection) -> None:
assert self.attribute_value is not None
await connection.device.notify_subscribers(
attribute=self.attribute_value, value=bytes(self)
)
def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self)
@dataclass
class VocsAudioLocation:
audio_location: AudioLocation = AudioLocation.NOT_ALLOWED
attribute_value: Optional[CharacteristicValue] = None
def __bytes__(self) -> bytes:
return struct.pack('<I', self.audio_location)
@classmethod
def from_bytes(cls, data: bytes):
audio_location = AudioLocation(struct.unpack('<I', data)[0])
return cls(audio_location)
def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self)
async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
assert connection
assert self.attribute_value
self.audio_location = AudioLocation(int.from_bytes(value, 'little'))
await connection.device.notify_subscribers(
attribute=self.attribute_value, value=value
)
@dataclass
class VolumeOffsetControlPoint:
volume_offset_state: VolumeOffsetState
async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
assert connection
opcode = value[0]
if opcode != SetVolumeOffsetOpCode.SET_VOLUME_OFFSET:
raise ATT_Error(ErrorCode.OPCODE_NOT_SUPPORTED)
change_counter, volume_offset = struct.unpack('<Bh', value[1:])
await self._set_volume_offset(connection, change_counter, volume_offset)
async def _set_volume_offset(
self,
connection: Connection,
change_counter_operand: int,
volume_offset_operand: int,
) -> None:
change_counter = self.volume_offset_state.change_counter
if change_counter != change_counter_operand:
raise ATT_Error(ErrorCode.INVALID_CHANGE_COUNTER)
if not MIN_VOLUME_OFFSET <= volume_offset_operand <= MAX_VOLUME_OFFSET:
raise ATT_Error(ErrorCode.VALUE_OUT_OF_RANGE)
self.volume_offset_state.volume_offset = volume_offset_operand
self.volume_offset_state.increment_change_counter()
await self.volume_offset_state.notify_subscribers_via_connection(connection)
@dataclass
class AudioOutputDescription:
audio_output_description: str = ''
attribute_value: Optional[CharacteristicValue] = None
@classmethod
def from_bytes(cls, data: bytes):
return cls(audio_output_description=data.decode('utf-8'))
def __bytes__(self) -> bytes:
return self.audio_output_description.encode('utf-8')
def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self)
async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
assert connection
assert self.attribute_value
self.audio_output_description = value.decode('utf-8')
await connection.device.notify_subscribers(
attribute=self.attribute_value, value=value
)
# -----------------------------------------------------------------------------
class VolumeOffsetControlService(TemplateService):
UUID = GATT_VOLUME_OFFSET_CONTROL_SERVICE
def __init__(
self,
volume_offset_state: Optional[VolumeOffsetState] = None,
audio_location: Optional[VocsAudioLocation] = None,
audio_output_description: Optional[AudioOutputDescription] = None,
) -> None:
self.volume_offset_state = (
VolumeOffsetState() if volume_offset_state is None else volume_offset_state
)
self.audio_location = (
VocsAudioLocation() if audio_location is None else audio_location
)
self.audio_output_description = (
AudioOutputDescription()
if audio_output_description is None
else audio_output_description
)
self.volume_offset_control_point: VolumeOffsetControlPoint = (
VolumeOffsetControlPoint(self.volume_offset_state)
)
self.volume_offset_state_characteristic = DelegatedCharacteristicAdapter(
Characteristic(
uuid=GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY
),
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=CharacteristicValue(read=self.volume_offset_state.on_read),
),
encode=lambda value: bytes(value),
)
self.audio_location_characteristic = DelegatedCharacteristicAdapter(
Characteristic(
uuid=GATT_AUDIO_LOCATION_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE
),
permissions=(
Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION
),
value=CharacteristicValue(
read=self.audio_location.on_read,
write=self.audio_location.on_write,
),
),
encode=lambda value: bytes(value),
decode=VocsAudioLocation.from_bytes,
)
self.audio_location.attribute_value = self.audio_location_characteristic.value
self.volume_offset_control_point_characteristic = Characteristic(
uuid=GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC,
properties=Characteristic.Properties.WRITE,
permissions=Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
value=CharacteristicValue(write=self.volume_offset_control_point.on_write),
)
self.audio_output_description_characteristic = DelegatedCharacteristicAdapter(
Characteristic(
uuid=GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE
),
permissions=(
Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION
),
value=CharacteristicValue(
read=self.audio_output_description.on_read,
write=self.audio_output_description.on_write,
),
)
)
self.audio_output_description.attribute_value = (
self.audio_output_description_characteristic.value
)
super().__init__(
characteristics=[
self.volume_offset_state_characteristic, # type: ignore
self.audio_location_characteristic, # type: ignore
self.volume_offset_control_point_characteristic, # type: ignore
self.audio_output_description_characteristic, # type: ignore
],
primary=False,
)
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class VolumeOffsetControlServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = VolumeOffsetControlService
def __init__(self, service_proxy: ServiceProxy) -> None:
self.service_proxy = service_proxy
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC
)
):
raise InvalidServiceError("Volume Offset State characteristic not found")
self.volume_offset_state = DelegatedCharacteristicAdapter(
characteristics[0], decode=VolumeOffsetState.from_bytes
)
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_AUDIO_LOCATION_CHARACTERISTIC
)
):
raise InvalidServiceError("Audio Location characteristic not found")
self.audio_location = DelegatedCharacteristicAdapter(
characteristics[0],
encode=lambda value: bytes(value),
decode=VocsAudioLocation.from_bytes,
)
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC
)
):
raise InvalidServiceError(
"Volume Offset Control Point characteristic not found"
)
self.volume_offset_control_point = characteristics[0]
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC
)
):
raise InvalidServiceError(
"Audio Output Description characteristic not found"
)
self.audio_output_description = UTF8CharacteristicAdapter(characteristics[0])

184
tests/vocs_test.py Normal file
View File

@@ -0,0 +1,184 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import pytest
import pytest_asyncio
import struct
from bumble import device
from bumble.att import ATT_Error
from bumble.profiles.vocs import (
VolumeOffsetControlService,
ErrorCode,
MIN_VOLUME_OFFSET,
MAX_VOLUME_OFFSET,
SetVolumeOffsetOpCode,
VolumeOffsetControlServiceProxy,
VolumeOffsetState,
VocsAudioLocation,
)
from bumble.profiles.vcp import VolumeControlService, VolumeControlServiceProxy
from bumble.profiles.bap import AudioLocation
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Tests
# -----------------------------------------------------------------------------
vocs_service = VolumeOffsetControlService()
vcp_service = VolumeControlService(included_services=[vocs_service])
@pytest_asyncio.fixture
async def vocs_client():
devices = TwoDevices()
devices[0].add_service(vcp_service)
await devices.setup_connection()
assert devices.connections[0]
assert devices.connections[1]
devices.connections[0].encryption = 1
devices.connections[1].encryption = 1
peer = device.Peer(devices.connections[1])
vcp_client = await peer.discover_service_and_create_proxy(VolumeControlServiceProxy)
assert vcp_client
included_services = await peer.discover_included_services(vcp_client.service_proxy)
assert included_services
vocs_service_discovered = included_services[0]
await peer.discover_characteristics(service=vocs_service_discovered)
vocs_client = VolumeOffsetControlServiceProxy(vocs_service_discovered)
yield vocs_client
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_init_service(vocs_client: VolumeOffsetControlServiceProxy):
assert await vocs_client.volume_offset_state.read_value() == VolumeOffsetState(
volume_offset=0,
change_counter=0,
)
assert await vocs_client.audio_location.read_value() == VocsAudioLocation(
audio_location=AudioLocation.NOT_ALLOWED
)
description = await vocs_client.audio_output_description.read_value()
assert description == ''
@pytest.mark.asyncio
async def test_wrong_opcode_raise_error(vocs_client: VolumeOffsetControlServiceProxy):
with pytest.raises(ATT_Error) as e:
await vocs_client.volume_offset_control_point.write_value(
bytes(
[
0xFF,
]
),
with_response=True,
)
assert e.value.error_code == ErrorCode.OPCODE_NOT_SUPPORTED
@pytest.mark.asyncio
async def test_wrong_change_counter_raise_error(
vocs_client: VolumeOffsetControlServiceProxy,
):
initial_offset = vocs_service.volume_offset_state.volume_offset
initial_counter = vocs_service.volume_offset_state.change_counter
wrong_counter = initial_counter + 1
with pytest.raises(ATT_Error) as e:
await vocs_client.volume_offset_control_point.write_value(
struct.pack(
'<BBh', SetVolumeOffsetOpCode.SET_VOLUME_OFFSET, wrong_counter, 0
),
with_response=True,
)
assert e.value.error_code == ErrorCode.INVALID_CHANGE_COUNTER
counter = await vocs_client.volume_offset_state.read_value()
assert counter == VolumeOffsetState(initial_offset, initial_counter)
@pytest.mark.asyncio
async def test_wrong_volume_offset_raise_error(
vocs_client: VolumeOffsetControlServiceProxy,
):
invalid_offset_low = MIN_VOLUME_OFFSET - 1
invalid_offset_high = MAX_VOLUME_OFFSET + 1
with pytest.raises(ATT_Error) as e_low:
await vocs_client.volume_offset_control_point.write_value(
struct.pack(
'<BBh', SetVolumeOffsetOpCode.SET_VOLUME_OFFSET, 0, invalid_offset_low
),
with_response=True,
)
assert e_low.value.error_code == ErrorCode.VALUE_OUT_OF_RANGE
with pytest.raises(ATT_Error) as e_high:
await vocs_client.volume_offset_control_point.write_value(
struct.pack(
'<BBh', SetVolumeOffsetOpCode.SET_VOLUME_OFFSET, 0, invalid_offset_high
),
with_response=True,
)
assert e_high.value.error_code == ErrorCode.VALUE_OUT_OF_RANGE
@pytest.mark.asyncio
async def test_set_volume_offset(vocs_client: VolumeOffsetControlServiceProxy):
await vocs_client.volume_offset_control_point.write_value(
struct.pack('<BBh', SetVolumeOffsetOpCode.SET_VOLUME_OFFSET, 0, -255),
)
assert await vocs_client.volume_offset_state.read_value() == VolumeOffsetState(
-255, 1
)
@pytest.mark.asyncio
async def test_set_audio_channel_location(vocs_client: VolumeOffsetControlServiceProxy):
new_audio_location = VocsAudioLocation(audio_location=AudioLocation.FRONT_LEFT)
await vocs_client.audio_location.write_value(
struct.pack('<I', new_audio_location.audio_location)
)
location = await vocs_client.audio_location.read_value()
assert location == new_audio_location
@pytest.mark.asyncio
async def test_set_audio_output_description(
vocs_client: VolumeOffsetControlServiceProxy,
):
new_description = 'Left Speaker'
await vocs_client.audio_output_description.write_value(new_description)
description = await vocs_client.audio_output_description.read_value()
assert description == new_description