Compare commits

..

6 Commits

Author SHA1 Message Date
zxzxwu d03fc14cfd Merge pull request #573 from ypomortsev/yegor
HFP: Fix reading multiple AT commands from a single data packet
2024-10-23 13:23:58 +08:00
Yegor Pomortsev c6bf27fd2c Fix test_hf_batched_response 2024-10-22 12:41:17 -07:00
Yegor Pomortsev 654030e789 Add tests for batched HFP commands/responses; reformat 2024-10-21 16:32:20 -07:00
Gilles Boccon-Gibod 1de7d2cd6f Merge pull request #571 from google/gbg/a2dp-player
a2dp player
2024-10-19 07:40:43 -07:00
Yegor Pomortsev e1714c16cc HFP: Fix reading multiple AT commands from a single data packet
The `data` received in `_read_at` may have multiple commands.

This fixes `execute_command` timing out when waiting for an `OK`
response when it is in the same data buffer, e.g. during SLC
initialization: b'\r\n+BRSF: 3904\r\n\r\nOK\r\n'
2024-10-18 13:21:24 -07:00
William Escande 23f46b36b3 HAP: wait for pairing event (#551) 2024-10-10 11:34:44 -07:00
4 changed files with 108 additions and 59 deletions
+45 -41
View File
@@ -795,29 +795,32 @@ class HfProtocol(pyee.EventEmitter):
# Append to the read buffer.
self.read_buffer.extend(data)
# Locate header and trailer.
header = self.read_buffer.find(b'\r\n')
trailer = self.read_buffer.find(b'\r\n', header + 2)
if header == -1 or trailer == -1:
return
while self.read_buffer:
# Locate header and trailer.
header = self.read_buffer.find(b'\r\n')
trailer = self.read_buffer.find(b'\r\n', header + 2)
if header == -1 or trailer == -1:
return
# Isolate the AT response code and parameters.
raw_response = self.read_buffer[header + 2 : trailer]
response = AtResponse.parse_from(raw_response)
logger.debug(f"<<< {raw_response.decode()}")
# Isolate the AT response code and parameters.
raw_response = self.read_buffer[header + 2 : trailer]
response = AtResponse.parse_from(raw_response)
logger.debug(f"<<< {raw_response.decode()}")
# Consume the response bytes.
self.read_buffer = self.read_buffer[trailer + 2 :]
# Consume the response bytes.
self.read_buffer = self.read_buffer[trailer + 2 :]
# Forward the received code to the correct queue.
if self.command_lock.locked() and (
response.code in STATUS_CODES or response.code in RESPONSE_CODES
):
self.response_queue.put_nowait(response)
elif response.code in UNSOLICITED_CODES:
self.unsolicited_queue.put_nowait(response)
else:
logger.warning(f"dropping unexpected response with code '{response.code}'")
# Forward the received code to the correct queue.
if self.command_lock.locked() and (
response.code in STATUS_CODES or response.code in RESPONSE_CODES
):
self.response_queue.put_nowait(response)
elif response.code in UNSOLICITED_CODES:
self.unsolicited_queue.put_nowait(response)
else:
logger.warning(
f"dropping unexpected response with code '{response.code}'"
)
async def execute_command(
self,
@@ -1244,31 +1247,32 @@ class AgProtocol(pyee.EventEmitter):
# Append to the read buffer.
self.read_buffer.extend(data)
# Locate the trailer.
trailer = self.read_buffer.find(b'\r')
if trailer == -1:
return
while self.read_buffer:
# Locate the trailer.
trailer = self.read_buffer.find(b'\r')
if trailer == -1:
return
# Isolate the AT response code and parameters.
raw_command = self.read_buffer[:trailer]
command = AtCommand.parse_from(raw_command)
logger.debug(f"<<< {raw_command.decode()}")
# Isolate the AT response code and parameters.
raw_command = self.read_buffer[:trailer]
command = AtCommand.parse_from(raw_command)
logger.debug(f"<<< {raw_command.decode()}")
# Consume the response bytes.
self.read_buffer = self.read_buffer[trailer + 1 :]
# Consume the response bytes.
self.read_buffer = self.read_buffer[trailer + 1 :]
if command.sub_code == AtCommand.SubCode.TEST:
handler_name = f'_on_{command.code.lower()}_test'
elif command.sub_code == AtCommand.SubCode.READ:
handler_name = f'_on_{command.code.lower()}_read'
else:
handler_name = f'_on_{command.code.lower()}'
if command.sub_code == AtCommand.SubCode.TEST:
handler_name = f'_on_{command.code.lower()}_test'
elif command.sub_code == AtCommand.SubCode.READ:
handler_name = f'_on_{command.code.lower()}_read'
else:
handler_name = f'_on_{command.code.lower()}'
if handler := getattr(self, handler_name, None):
handler(*command.parameters)
else:
logger.warning('Handler %s not found', handler_name)
self.send_response('ERROR')
if handler := getattr(self, handler_name, None):
handler(*command.parameters)
else:
logger.warning('Handler %s not found', handler_name)
self.send_response('ERROR')
def send_response(self, response: str) -> None:
"""Sends an AT response."""
+27 -18
View File
@@ -25,7 +25,7 @@ from bumble.utils import AsyncRunner, OpenIntEnum
from bumble.hci import Address
from dataclasses import dataclass, field
import logging
from typing import Dict, List, Optional, Set, Union
from typing import Any, Dict, List, Optional, Set, Union
# -----------------------------------------------------------------------------
@@ -271,24 +271,12 @@ class HearingAccessService(gatt.TemplateService):
def on_disconnection(_reason) -> None:
self.currently_connected_clients.remove(connection)
# TODO Should we filter on device bonded && device is HAP ?
self.currently_connected_clients.add(connection)
if (
connection.peer_address
not in self.preset_changed_operations_history_per_device
):
self.preset_changed_operations_history_per_device[
connection.peer_address
] = []
return
@connection.on('pairing') # type: ignore
def on_pairing(*_: Any) -> None:
self.on_incoming_paired_connection(connection)
async def on_connection_async() -> None:
# Send all the PresetChangedOperation that occur when not connected
await self._preset_changed_operation(connection)
# Update the active preset index if needed
await self.notify_active_preset_for_connection(connection)
connection.abort_on('disconnection', on_connection_async())
if connection.peer_resolvable_address:
self.on_incoming_paired_connection(connection)
self.hearing_aid_features_characteristic = gatt.Characteristic(
uuid=gatt.GATT_HEARING_AID_FEATURES_CHARACTERISTIC,
@@ -325,6 +313,27 @@ class HearingAccessService(gatt.TemplateService):
]
)
def on_incoming_paired_connection(self, connection: Connection):
'''Setup initial operations to handle a remote bonded HAP device'''
# TODO Should we filter on HAP device only ?
self.currently_connected_clients.add(connection)
if (
connection.peer_address
not in self.preset_changed_operations_history_per_device
):
self.preset_changed_operations_history_per_device[
connection.peer_address
] = []
return
async def on_connection_async() -> None:
# Send all the PresetChangedOperation that occur when not connected
await self._preset_changed_operation(connection)
# Update the active preset index if needed
await self.notify_active_preset_for_connection(connection)
connection.abort_on('disconnection', on_connection_async())
def _on_read_active_preset_index(
self, __connection__: Optional[Connection]
) -> bytes:
+5
View File
@@ -25,6 +25,7 @@ import sys
from bumble import att, device
from bumble.profiles import hap
from .test_utils import TwoDevices
from bumble.keys import PairingKeys
# -----------------------------------------------------------------------------
# Logging
@@ -86,6 +87,10 @@ async def hap_client():
devices.connections[0].encryption = 1 # type: ignore
devices.connections[1].encryption = 1 # type: ignore
devices[0].on_pairing(
devices.connections[0], devices.connections[0].peer_address, PairingKeys(), True
)
peer = device.Peer(devices.connections[1]) # type: ignore
hap_client = await peer.discover_service_and_create_proxy(
hap.HearingAccessServiceProxy
+31
View File
@@ -569,6 +569,37 @@ async def test_sco_setup():
await asyncio.gather(*sco_disconnection_futures)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_batched_response(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
ag.dlc.write(b'\r\n+BIND: (1,2)\r\n\r\nOK\r\n')
await hf.execute_command("AT+BIND=?", response_type=hfp.AtResponseType.SINGLE)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_batched_commands(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
answer_future = asyncio.get_running_loop().create_future()
ag.on('answer', lambda: answer_future.set_result(None))
hang_up_future = asyncio.get_running_loop().create_future()
ag.on('hang_up', lambda: hang_up_future.set_result(None))
hf.dlc.write(b'ATA\rAT+CHUP\r')
await answer_future
await hang_up_future
# -----------------------------------------------------------------------------
async def run():
await test_slc()