better docs and GATT fixes

This commit is contained in:
Gilles Boccon-Gibod
2025-02-06 15:48:39 -05:00
parent 26e87f09fe
commit 33435c2980
4 changed files with 54 additions and 70 deletions

View File

@@ -1195,7 +1195,7 @@ def transmit(
input, input,
input_format, input_format,
): ):
"""Transmit a broadcast source.""" """Transmit a broadcast source"""
if manufacturer_data: if manufacturer_data:
vendor_id_str, data_hex = manufacturer_data.split(':') vendor_id_str, data_hex = manufacturer_data.split(':')
vendor_id = int(vendor_id_str) vendor_id = int(vendor_id_str)

View File

@@ -42,7 +42,7 @@ from typing import (
) )
from bumble.colors import color from bumble.colors import color
from bumble.core import BaseBumbleError, UUID from bumble.core import BaseBumbleError, InvalidOperationError, UUID
from bumble.att import Attribute, AttributeValue from bumble.att import Attribute, AttributeValue
from bumble.utils import ByteSerializable from bumble.utils import ByteSerializable
@@ -679,10 +679,14 @@ class DelegatedCharacteristicAdapter(CharacteristicAdapter):
self.decode = decode self.decode = decode
def encode_value(self, value): def encode_value(self, value):
return self.encode(value) if self.encode else value if self.encode is None:
raise InvalidOperationError('delegated adapter does not have an encoder')
return self.encode(value)
def decode_value(self, value): def decode_value(self, value):
return self.decode(value) if self.decode else value if self.decode is None:
raise InvalidOperationError('delegate adapter does not have a decoder')
return self.decode(value)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -82,9 +82,7 @@ class VolumeOffsetState:
async def notify_subscribers_via_connection(self, connection: Connection) -> None: async def notify_subscribers_via_connection(self, connection: Connection) -> None:
assert self.attribute_value is not None assert self.attribute_value is not None
await connection.device.notify_subscribers( await connection.device.notify_subscribers(attribute=self.attribute_value)
attribute=self.attribute_value, value=bytes(self)
)
def on_read(self, _connection: Optional[Connection]) -> bytes: def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self) return bytes(self)
@@ -111,9 +109,7 @@ class VocsAudioLocation:
assert self.attribute_value assert self.attribute_value
self.audio_location = AudioLocation(int.from_bytes(value, 'little')) self.audio_location = AudioLocation(int.from_bytes(value, 'little'))
await connection.device.notify_subscribers( await connection.device.notify_subscribers(attribute=self.attribute_value)
attribute=self.attribute_value, value=value
)
@dataclass @dataclass
@@ -169,9 +165,7 @@ class AudioOutputDescription:
assert self.attribute_value assert self.attribute_value
self.audio_output_description = value.decode('utf-8') self.audio_output_description = value.decode('utf-8')
await connection.device.notify_subscribers( await connection.device.notify_subscribers(attribute=self.attribute_value)
attribute=self.attribute_value, value=value
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -203,37 +197,30 @@ class VolumeOffsetControlService(TemplateService):
VolumeOffsetControlPoint(self.volume_offset_state) VolumeOffsetControlPoint(self.volume_offset_state)
) )
self.volume_offset_state_characteristic = DelegatedCharacteristicAdapter( self.volume_offset_state_characteristic = Characteristic(
Characteristic( uuid=GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC,
uuid=GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC, properties=(
properties=( Characteristic.Properties.READ | Characteristic.Properties.NOTIFY
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY
),
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=CharacteristicValue(read=self.volume_offset_state.on_read),
), ),
encode=lambda value: bytes(value), permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=CharacteristicValue(read=self.volume_offset_state.on_read),
) )
self.audio_location_characteristic = DelegatedCharacteristicAdapter( self.audio_location_characteristic = Characteristic(
Characteristic( uuid=GATT_AUDIO_LOCATION_CHARACTERISTIC,
uuid=GATT_AUDIO_LOCATION_CHARACTERISTIC, properties=(
properties=( Characteristic.Properties.READ
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY
| Characteristic.Properties.NOTIFY | Characteristic.Properties.WRITE_WITHOUT_RESPONSE
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE ),
), permissions=(
permissions=( Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
Characteristic.Permissions.READ_REQUIRES_ENCRYPTION | Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION
| Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION ),
), value=CharacteristicValue(
value=CharacteristicValue( read=self.audio_location.on_read,
read=self.audio_location.on_read, write=self.audio_location.on_write,
write=self.audio_location.on_write,
),
), ),
encode=lambda value: bytes(value),
decode=VocsAudioLocation.from_bytes,
) )
self.audio_location.attribute_value = self.audio_location_characteristic.value self.audio_location.attribute_value = self.audio_location_characteristic.value
@@ -244,25 +231,22 @@ class VolumeOffsetControlService(TemplateService):
value=CharacteristicValue(write=self.volume_offset_control_point.on_write), value=CharacteristicValue(write=self.volume_offset_control_point.on_write),
) )
self.audio_output_description_characteristic = DelegatedCharacteristicAdapter( self.audio_output_description_characteristic = Characteristic(
Characteristic( uuid=GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC,
uuid=GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC, properties=(
properties=( Characteristic.Properties.READ
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY
| Characteristic.Properties.NOTIFY | Characteristic.Properties.WRITE_WITHOUT_RESPONSE
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE ),
), permissions=(
permissions=( Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
Characteristic.Permissions.READ_REQUIRES_ENCRYPTION | Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION
| Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION ),
), value=CharacteristicValue(
value=CharacteristicValue( read=self.audio_output_description.on_read,
read=self.audio_output_description.on_read, write=self.audio_output_description.on_write,
write=self.audio_output_description.on_write, ),
),
)
) )
self.audio_output_description.attribute_value = ( self.audio_output_description.attribute_value = (
self.audio_output_description_characteristic.value self.audio_output_description_characteristic.value
) )
@@ -287,18 +271,19 @@ class VolumeOffsetControlServiceProxy(ProfileServiceProxy):
def __init__(self, service_proxy: ServiceProxy) -> None: def __init__(self, service_proxy: ServiceProxy) -> None:
self.service_proxy = service_proxy self.service_proxy = service_proxy
self.volume_offset_state = DelegatedCharacteristicAdapter( self.volume_offset_state = SerializableCharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid( service_proxy.get_required_characteristic_by_uuid(
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC
), ),
decode=VolumeOffsetState.from_bytes, VolumeOffsetState,
) )
self.audio_location = SerializableCharacteristicAdapter( self.audio_location = DelegatedCharacteristicAdapter(
service_proxy.get_required_characteristic_by_uuid( service_proxy.get_required_characteristic_by_uuid(
GATT_AUDIO_LOCATION_CHARACTERISTIC GATT_AUDIO_LOCATION_CHARACTERISTIC
), ),
VocsAudioLocation, encode=lambda value: bytes([int(value)]),
decode=lambda data: AudioLocation(data[0]),
) )
self.volume_offset_control_point = ( self.volume_offset_control_point = (

View File

@@ -32,7 +32,6 @@ from bumble.profiles.vocs import (
SetVolumeOffsetOpCode, SetVolumeOffsetOpCode,
VolumeOffsetControlServiceProxy, VolumeOffsetControlServiceProxy,
VolumeOffsetState, VolumeOffsetState,
VocsAudioLocation,
) )
from bumble.profiles.vcs import VolumeControlService, VolumeControlServiceProxy from bumble.profiles.vcs import VolumeControlService, VolumeControlServiceProxy
from bumble.profiles.bap import AudioLocation from bumble.profiles.bap import AudioLocation
@@ -81,9 +80,7 @@ async def test_init_service(vocs_client: VolumeOffsetControlServiceProxy):
volume_offset=0, volume_offset=0,
change_counter=0, change_counter=0,
) )
assert await vocs_client.audio_location.read_value() == VocsAudioLocation( assert await vocs_client.audio_location.read_value() == AudioLocation.NOT_ALLOWED
audio_location=AudioLocation.NOT_ALLOWED
)
description = await vocs_client.audio_output_description.read_value() description = await vocs_client.audio_output_description.read_value()
assert description == '' assert description == ''
@@ -162,11 +159,9 @@ async def test_set_volume_offset(vocs_client: VolumeOffsetControlServiceProxy):
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_set_audio_channel_location(vocs_client: VolumeOffsetControlServiceProxy): async def test_set_audio_channel_location(vocs_client: VolumeOffsetControlServiceProxy):
new_audio_location = VocsAudioLocation(audio_location=AudioLocation.FRONT_LEFT) new_audio_location = AudioLocation.FRONT_LEFT
await vocs_client.audio_location.write_value( await vocs_client.audio_location.write_value(new_audio_location)
struct.pack('<I', new_audio_location.audio_location)
)
location = await vocs_client.audio_location.read_value() location = await vocs_client.audio_location.read_value()
assert location == new_audio_location assert location == new_audio_location