Rework HFP example

This commit is contained in:
Josh Wu
2024-03-05 20:53:28 +08:00
parent 02180088b3
commit 6205199d7f
3 changed files with 165 additions and 98 deletions

View File

@@ -22,7 +22,8 @@ import dataclasses
import enum
import traceback
import pyee
from typing import Dict, List, Union, Set, Any, Optional, TYPE_CHECKING
from typing import Dict, List, Union, Set, Any, Optional, Type, TYPE_CHECKING
from typing_extensions import Self
from bumble import at
from bumble import rfcomm
@@ -417,17 +418,21 @@ class AtResponseType(enum.Enum):
MULTIPLE = 2
@dataclasses.dataclass
class AtResponse:
code: str
parameters: list
def __init__(self, response: bytearray):
code_and_parameters = response.split(b':')
@classmethod
def parse_from(cls: Type[Self], buffer: bytearray) -> Self:
code_and_parameters = buffer.split(b':')
parameters = (
code_and_parameters[1] if len(code_and_parameters) > 1 else bytearray()
)
self.code = code_and_parameters[0].decode()
self.parameters = at.parse_parameters(parameters)
return cls(
code=code_and_parameters[0].decode(),
parameters=at.parse_parameters(parameters),
)
@dataclasses.dataclass
@@ -530,7 +535,7 @@ class HfProtocol(pyee.EventEmitter):
# Isolate the AT response code and parameters.
raw_response = self.read_buffer[header + 2 : trailer]
response = AtResponse(raw_response)
response = AtResponse.parse_from(raw_response)
logger.debug(f"<<< {raw_response.decode()}")
# Consume the response bytes.
@@ -1006,7 +1011,9 @@ class EscoParameters:
transmit_coding_format: CodingFormat
receive_coding_format: CodingFormat
packet_type: HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType
retransmission_effort: HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort
retransmission_effort: (
HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort
)
max_latency: int
# Common
@@ -1014,12 +1021,12 @@ class EscoParameters:
output_coding_format: CodingFormat = CodingFormat(CodecID.LINEAR_PCM)
input_coded_data_size: int = 16
output_coded_data_size: int = 16
input_pcm_data_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat = (
HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT
)
output_pcm_data_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat = (
HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT
)
input_pcm_data_format: (
HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat
) = HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT
output_pcm_data_format: (
HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat
) = HCI_Enhanced_Setup_Synchronous_Connection_Command.PcmDataFormat.TWOS_COMPLEMENT
input_pcm_sample_payload_msb_position: int = 0
output_pcm_sample_payload_msb_position: int = 0
input_data_path: HCI_Enhanced_Setup_Synchronous_Connection_Command.DataPath = (