Merge pull request #573 from ypomortsev/yegor

HFP: Fix reading multiple AT commands from a single data packet
This commit is contained in:
zxzxwu
2024-10-23 13:23:58 +08:00
committed by GitHub
2 changed files with 76 additions and 41 deletions

View File

@@ -795,6 +795,7 @@ class HfProtocol(pyee.EventEmitter):
# Append to the read buffer. # Append to the read buffer.
self.read_buffer.extend(data) self.read_buffer.extend(data)
while self.read_buffer:
# Locate header and trailer. # Locate header and trailer.
header = self.read_buffer.find(b'\r\n') header = self.read_buffer.find(b'\r\n')
trailer = self.read_buffer.find(b'\r\n', header + 2) trailer = self.read_buffer.find(b'\r\n', header + 2)
@@ -817,7 +818,9 @@ class HfProtocol(pyee.EventEmitter):
elif response.code in UNSOLICITED_CODES: elif response.code in UNSOLICITED_CODES:
self.unsolicited_queue.put_nowait(response) self.unsolicited_queue.put_nowait(response)
else: else:
logger.warning(f"dropping unexpected response with code '{response.code}'") logger.warning(
f"dropping unexpected response with code '{response.code}'"
)
async def execute_command( async def execute_command(
self, self,
@@ -1244,6 +1247,7 @@ class AgProtocol(pyee.EventEmitter):
# Append to the read buffer. # Append to the read buffer.
self.read_buffer.extend(data) self.read_buffer.extend(data)
while self.read_buffer:
# Locate the trailer. # Locate the trailer.
trailer = self.read_buffer.find(b'\r') trailer = self.read_buffer.find(b'\r')
if trailer == -1: if trailer == -1:

View File

@@ -569,6 +569,37 @@ async def test_sco_setup():
await asyncio.gather(*sco_disconnection_futures) await asyncio.gather(*sco_disconnection_futures)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_batched_response(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
ag.dlc.write(b'\r\n+BIND: (1,2)\r\n\r\nOK\r\n')
await hf.execute_command("AT+BIND=?", response_type=hfp.AtResponseType.SINGLE)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_batched_commands(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
answer_future = asyncio.get_running_loop().create_future()
ag.on('answer', lambda: answer_future.set_result(None))
hang_up_future = asyncio.get_running_loop().create_future()
ag.on('hang_up', lambda: hang_up_future.set_result(None))
hf.dlc.write(b'ATA\rAT+CHUP\r')
await answer_future
await hang_up_future
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def run(): async def run():
await test_slc() await test_slc()