Implement optional synchronization for has (#704)

HAS can be run in synchronized mode, requiring a server to be able to
communicate with another CL.
This CL is doing a quick implementation of such functionality for preset
selection.
This commit is contained in:
William Escande
2025-06-02 14:57:59 -07:00
committed by GitHub
parent 4c4f8c8225
commit 9ad276a757

View File

@@ -236,6 +236,8 @@ class HearingAccessService(gatt.TemplateService):
preset_records: Dict[int, PresetRecord] # key is the preset index
read_presets_request_in_progress: bool
other_server_in_binaural_set: Optional[HearingAccessService] = None
preset_changed_operations_history_per_device: Dict[
Address, List[PresetChangedOperation]
]
@@ -513,7 +515,7 @@ class HearingAccessService(gatt.TemplateService):
for connection in self.currently_connected_clients:
await self.notify_active_preset_for_connection(connection)
async def set_active_preset(self, connection: Connection, value: bytes) -> None:
async def set_active_preset(self, value: bytes) -> None:
index = value[1]
preset = self.preset_records.get(index, None)
if (
@@ -530,10 +532,10 @@ class HearingAccessService(gatt.TemplateService):
self.active_preset_index = index
await self.notify_active_preset()
async def _on_set_active_preset(self, connection: Connection, value: bytes):
await self.set_active_preset(connection, value)
async def _on_set_active_preset(self, _: Connection, value: bytes):
await self.set_active_preset(value)
async def set_next_or_previous_preset(self, connection: Connection, is_previous):
async def set_next_or_previous_preset(self, is_previous):
'''Set the next or the previous preset as active'''
if self.active_preset_index == 0x00:
@@ -563,48 +565,47 @@ class HearingAccessService(gatt.TemplateService):
self.active_preset_index = first_preset.index
await self.notify_active_preset()
async def _on_set_next_preset(
self, connection: Connection, __value__: bytes
) -> None:
await self.set_next_or_previous_preset(connection, False)
async def _on_set_next_preset(self, _: Connection, __value__: bytes) -> None:
await self.set_next_or_previous_preset(False)
async def _on_set_previous_preset(
self, connection: Connection, __value__: bytes
) -> None:
await self.set_next_or_previous_preset(connection, True)
async def _on_set_previous_preset(self, _: Connection, __value__: bytes) -> None:
await self.set_next_or_previous_preset(True)
async def _on_set_active_preset_synchronized_locally(
self, connection: Connection, value: bytes
self, _: Connection, value: bytes
):
if (
self.server_features.preset_synchronization_support
== PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_SUPPORTED
== PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_NOT_SUPPORTED
):
raise att.ATT_Error(ErrorCode.PRESET_SYNCHRONIZATION_NOT_SUPPORTED)
await self.set_active_preset(connection, value)
# TODO (low priority) inform other server of the change
await self.set_active_preset(value)
if self.other_server_in_binaural_set:
await self.other_server_in_binaural_set.set_active_preset(value)
async def _on_set_next_preset_synchronized_locally(
self, connection: Connection, __value__: bytes
self, _: Connection, __value__: bytes
):
if (
self.server_features.preset_synchronization_support
== PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_SUPPORTED
== PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_NOT_SUPPORTED
):
raise att.ATT_Error(ErrorCode.PRESET_SYNCHRONIZATION_NOT_SUPPORTED)
await self.set_next_or_previous_preset(connection, False)
# TODO (low priority) inform other server of the change
await self.set_next_or_previous_preset(False)
if self.other_server_in_binaural_set:
await self.other_server_in_binaural_set.set_next_or_previous_preset(False)
async def _on_set_previous_preset_synchronized_locally(
self, connection: Connection, __value__: bytes
self, _: Connection, __value__: bytes
):
if (
self.server_features.preset_synchronization_support
== PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_SUPPORTED
== PresetSynchronizationSupport.PRESET_SYNCHRONIZATION_IS_NOT_SUPPORTED
):
raise att.ATT_Error(ErrorCode.PRESET_SYNCHRONIZATION_NOT_SUPPORTED)
await self.set_next_or_previous_preset(connection, True)
# TODO (low priority) inform other server of the change
await self.set_next_or_previous_preset(True)
if self.other_server_in_binaural_set:
await self.other_server_in_binaural_set.set_next_or_previous_preset(True)
# -----------------------------------------------------------------------------