HFP: State memory and event emit

This commit is contained in:
Josh Wu
2024-01-30 14:36:03 +08:00
parent 071fc2723a
commit d2dcf063ee
2 changed files with 258 additions and 68 deletions

View File

@@ -21,12 +21,11 @@ import asyncio
import dataclasses import dataclasses
import enum import enum
import traceback import traceback
import warnings import pyee
from typing import Dict, List, Union, Set, Any, TYPE_CHECKING from typing import Dict, List, Union, Set, Any, Optional, TYPE_CHECKING
from . import at
from . import rfcomm
from bumble import at
from bumble import rfcomm
from bumble.colors import color from bumble.colors import color
from bumble.core import ( from bumble.core import (
ProtocolError, ProtocolError,
@@ -79,7 +78,6 @@ class HfpProtocol:
lines_available: asyncio.Event lines_available: asyncio.Event
def __init__(self, dlc: rfcomm.DLC) -> None: def __init__(self, dlc: rfcomm.DLC) -> None:
warnings.warn("See HfProtocol", DeprecationWarning)
self.dlc = dlc self.dlc = dlc
self.buffer = '' self.buffer = ''
self.lines = collections.deque() self.lines = collections.deque()
@@ -128,10 +126,13 @@ class HfpProtocol:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# HF supported features (AT+BRSF=) (normative).
# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07
# and 3GPP 27.007
class HfFeature(enum.IntFlag): class HfFeature(enum.IntFlag):
"""
HF supported features (AT+BRSF=) (normative).
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
"""
EC_NR = 0x001 # Echo Cancel & Noise reduction EC_NR = 0x001 # Echo Cancel & Noise reduction
THREE_WAY_CALLING = 0x002 THREE_WAY_CALLING = 0x002
CLI_PRESENTATION_CAPABILITY = 0x004 CLI_PRESENTATION_CAPABILITY = 0x004
@@ -146,10 +147,13 @@ class HfFeature(enum.IntFlag):
VOICE_RECOGNITION_TEST = 0x800 VOICE_RECOGNITION_TEST = 0x800
# AG supported features (+BRSF:) (normative).
# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07
# and 3GPP 27.007
class AgFeature(enum.IntFlag): class AgFeature(enum.IntFlag):
"""
AG supported features (+BRSF:) (normative).
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
"""
THREE_WAY_CALLING = 0x001 THREE_WAY_CALLING = 0x001
EC_NR = 0x002 # Echo Cancel & Noise reduction EC_NR = 0x002 # Echo Cancel & Noise reduction
VOICE_RECOGNITION_FUNCTION = 0x004 VOICE_RECOGNITION_FUNCTION = 0x004
@@ -166,52 +170,90 @@ class AgFeature(enum.IntFlag):
VOICE_RECOGNITION_TEST = 0x2000 VOICE_RECOGNITION_TEST = 0x2000
# Audio Codec IDs (normative).
# Hands-Free Profile v1.8, 10 Appendix B
class AudioCodec(enum.IntEnum): class AudioCodec(enum.IntEnum):
"""
Audio Codec IDs (normative).
Hands-Free Profile v1.9, 11 Appendix B
"""
CVSD = 0x01 # Support for CVSD audio codec CVSD = 0x01 # Support for CVSD audio codec
MSBC = 0x02 # Support for mSBC audio codec MSBC = 0x02 # Support for mSBC audio codec
LC3_SWB = 0x03 # Support for LC3-SWB audio codec
# HF Indicators (normative).
# Bluetooth Assigned Numbers, 6.10.1 HF Indicators
class HfIndicator(enum.IntEnum): class HfIndicator(enum.IntEnum):
"""
HF Indicators (normative).
Bluetooth Assigned Numbers, 6.10.1 HF Indicators.
"""
ENHANCED_SAFETY = 0x01 # Enhanced safety feature ENHANCED_SAFETY = 0x01 # Enhanced safety feature
BATTERY_LEVEL = 0x02 # Battery level feature BATTERY_LEVEL = 0x02 # Battery level feature
# Call Hold supported operations (normative).
# AT Commands Reference Guide, 3.5.2.3.12 +CHLD - Call Holding Services
class CallHoldOperation(enum.IntEnum): class CallHoldOperation(enum.IntEnum):
"""
Call Hold supported operations (normative).
AT Commands Reference Guide, 3.5.2.3.12 +CHLD - Call Holding Services.
"""
RELEASE_ALL_HELD_CALLS = 0 # Release all held calls RELEASE_ALL_HELD_CALLS = 0 # Release all held calls
RELEASE_ALL_ACTIVE_CALLS = 1 # Release all active calls, accept other RELEASE_ALL_ACTIVE_CALLS = 1 # Release all active calls, accept other
HOLD_ALL_ACTIVE_CALLS = 2 # Place all active calls on hold, accept other HOLD_ALL_ACTIVE_CALLS = 2 # Place all active calls on hold, accept other
ADD_HELD_CALL = 3 # Adds a held call to conversation ADD_HELD_CALL = 3 # Adds a held call to conversation
# Response Hold status (normative).
# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07
# and 3GPP 27.007
class ResponseHoldStatus(enum.IntEnum): class ResponseHoldStatus(enum.IntEnum):
"""
Response Hold status (normative).
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
"""
INC_CALL_HELD = 0 # Put incoming call on hold INC_CALL_HELD = 0 # Put incoming call on hold
HELD_CALL_ACC = 1 # Accept a held incoming call HELD_CALL_ACC = 1 # Accept a held incoming call
HELD_CALL_REJ = 2 # Reject a held incoming call HELD_CALL_REJ = 2 # Reject a held incoming call
# Values for the Call Setup AG indicator (normative). class AgIndicator(enum.Enum):
# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 """
# and 3GPP 27.007 Values for the AG indicator (normative).
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
"""
SERVICE = 'service'
CALL = 'call'
CALL_SETUP = 'callsetup'
CALL_HELD = 'callheld'
SIGNAL = 'signal'
ROAM = 'roam'
BATTERY_CHARGE = 'battchg'
class CallSetupAgIndicator(enum.IntEnum): class CallSetupAgIndicator(enum.IntEnum):
"""
Values for the Call Setup AG indicator (normative).
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
"""
NOT_IN_CALL_SETUP = 0 NOT_IN_CALL_SETUP = 0
INCOMING_CALL_PROCESS = 1 INCOMING_CALL_PROCESS = 1
OUTGOING_CALL_SETUP = 2 OUTGOING_CALL_SETUP = 2
REMOTE_ALERTED = 3 # Remote party alerted in an outgoing call REMOTE_ALERTED = 3 # Remote party alerted in an outgoing call
# Values for the Call Held AG indicator (normative).
# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07
# and 3GPP 27.007
class CallHeldAgIndicator(enum.IntEnum): class CallHeldAgIndicator(enum.IntEnum):
"""
Values for the Call Held AG indicator (normative).
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
"""
NO_CALLS_HELD = 0 NO_CALLS_HELD = 0
# Call is placed on hold or active/held calls swapped # Call is placed on hold or active/held calls swapped
# (The AG has both an active AND a held call) # (The AG has both an active AND a held call)
@@ -219,16 +261,24 @@ class CallHeldAgIndicator(enum.IntEnum):
CALL_ON_HOLD_NO_ACTIVE_CALL = 2 # Call on hold, no active call CALL_ON_HOLD_NO_ACTIVE_CALL = 2 # Call on hold, no active call
# Call Info direction (normative).
# AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls
class CallInfoDirection(enum.IntEnum): class CallInfoDirection(enum.IntEnum):
"""
Call Info direction (normative).
AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls.
"""
MOBILE_ORIGINATED_CALL = 0 MOBILE_ORIGINATED_CALL = 0
MOBILE_TERMINATED_CALL = 1 MOBILE_TERMINATED_CALL = 1
# Call Info status (normative).
# AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls
class CallInfoStatus(enum.IntEnum): class CallInfoStatus(enum.IntEnum):
"""
Call Info status (normative).
AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls.
"""
ACTIVE = 0 ACTIVE = 0
HELD = 1 HELD = 1
DIALING = 2 DIALING = 2
@@ -237,15 +287,47 @@ class CallInfoStatus(enum.IntEnum):
WAITING = 5 WAITING = 5
# Call Info mode (normative).
# AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls
class CallInfoMode(enum.IntEnum): class CallInfoMode(enum.IntEnum):
"""
Call Info mode (normative).
AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls.
"""
VOICE = 0 VOICE = 0
DATA = 1 DATA = 1
FAX = 2 FAX = 2
UNKNOWN = 9 UNKNOWN = 9
class CallInfoMultiParty(enum.IntEnum):
"""
Call Info Multi-Party state (normative).
AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls.
"""
NOT_IN_CONFERENCE = 0
IN_CONFERENCE = 1
@dataclasses.dataclass
class CallInfo:
"""
Enhanced call status.
AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls.
"""
index: int
direction: CallInfoDirection
status: CallInfoStatus
mode: CallInfoMode
multi_party: CallInfoMultiParty
number: Optional[int] = None
type: Optional[int] = None
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Hands-Free Control Interoperability Requirements # Hands-Free Control Interoperability Requirements
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -326,8 +408,9 @@ class Configuration:
class AtResponseType(enum.Enum): class AtResponseType(enum.Enum):
"""Indicate if a response is expected from an AT command, and if multiple """
responses are accepted.""" Indicates if a response is expected from an AT command, and if multiple responses are accepted.
"""
NONE = 0 NONE = 0
SINGLE = 1 SINGLE = 1
@@ -361,9 +444,20 @@ class HfIndicatorState:
enabled: bool = False enabled: bool = False
class HfProtocol: class HfProtocol(pyee.EventEmitter):
"""Implementation for the Hands-Free side of the Hands-Free profile. """
Reference specification Hands-Free Profile v1.8""" Implementation for the Hands-Free side of the Hands-Free profile.
Reference specification Hands-Free Profile v1.8.
Emitted events:
codec_negotiation: When codec is renegotiated, notify the new codec.
Args:
active_codec: AudioCodec
ag_indicator: When AG update their indicators, notify the new state.
Args:
ag_indicator: AgIndicator
"""
supported_hf_features: int supported_hf_features: int
supported_audio_codecs: List[AudioCodec] supported_audio_codecs: List[AudioCodec]
@@ -383,14 +477,18 @@ class HfProtocol:
response_queue: asyncio.Queue response_queue: asyncio.Queue
unsolicited_queue: asyncio.Queue unsolicited_queue: asyncio.Queue
read_buffer: bytearray read_buffer: bytearray
active_codec: AudioCodec
def __init__(self, dlc: rfcomm.DLC, configuration: Configuration) -> None:
super().__init__()
def __init__(self, dlc: rfcomm.DLC, configuration: Configuration):
# Configure internal state. # Configure internal state.
self.dlc = dlc self.dlc = dlc
self.command_lock = asyncio.Lock() self.command_lock = asyncio.Lock()
self.response_queue = asyncio.Queue() self.response_queue = asyncio.Queue()
self.unsolicited_queue = asyncio.Queue() self.unsolicited_queue = asyncio.Queue()
self.read_buffer = bytearray() self.read_buffer = bytearray()
self.active_codec = AudioCodec.CVSD
# Build local features. # Build local features.
self.supported_hf_features = sum(configuration.supported_hf_features) self.supported_hf_features = sum(configuration.supported_hf_features)
@@ -415,10 +513,12 @@ class HfProtocol:
def supports_ag_feature(self, feature: AgFeature) -> bool: def supports_ag_feature(self, feature: AgFeature) -> bool:
return (self.supported_ag_features & feature) != 0 return (self.supported_ag_features & feature) != 0
# Read AT messages from the RFCOMM channel.
# Enqueue AT commands, responses, unsolicited responses to their
# respective queues, and set the corresponding event.
def _read_at(self, data: bytes): def _read_at(self, data: bytes):
"""
Reads AT messages from the RFCOMM channel.
Enqueues AT commands, responses, unsolicited responses to their respective queues, and set the corresponding event.
"""
# Append to the read buffer. # Append to the read buffer.
self.read_buffer.extend(data) self.read_buffer.extend(data)
@@ -446,17 +546,25 @@ class HfProtocol:
else: else:
logger.warning(f"dropping unexpected response with code '{response.code}'") logger.warning(f"dropping unexpected response with code '{response.code}'")
# Send an AT command and wait for the peer response.
# Wait for the AT responses sent by the peer, to the status code.
# Raises asyncio.TimeoutError if the status is not received
# after a timeout (default 1 second).
# Raises ProtocolError if the status is not OK.
async def execute_command( async def execute_command(
self, self,
cmd: str, cmd: str,
timeout: float = 1.0, timeout: float = 1.0,
response_type: AtResponseType = AtResponseType.NONE, response_type: AtResponseType = AtResponseType.NONE,
) -> Union[None, AtResponse, List[AtResponse]]: ) -> Union[None, AtResponse, List[AtResponse]]:
"""
Sends an AT command and wait for the peer response.
Wait for the AT responses sent by the peer, to the status code.
Args:
cmd: the AT command in string to execute.
timeout: timeout in float seconds.
response_type: type of response.
Raises:
asyncio.TimeoutError: the status is not received after a timeout (default 1 second).
ProtocolError: the status is not OK.
"""
async with self.command_lock: async with self.command_lock:
logger.debug(f">>> {cmd}") logger.debug(f">>> {cmd}")
self.dlc.write(cmd + '\r') self.dlc.write(cmd + '\r')
@@ -479,8 +587,9 @@ class HfProtocol:
raise HfpProtocolError(result.code) raise HfpProtocolError(result.code)
responses.append(result) responses.append(result)
# 4.2.1 Service Level Connection Initialization.
async def initiate_slc(self): async def initiate_slc(self):
"""4.2.1 Service Level Connection Initialization."""
# 4.2.1.1 Supported features exchange # 4.2.1.1 Supported features exchange
# First, in the initialization procedure, the HF shall send the # First, in the initialization procedure, the HF shall send the
# AT+BRSF=<HF supported features> command to the AG to both notify # AT+BRSF=<HF supported features> command to the AG to both notify
@@ -620,16 +729,17 @@ class HfProtocol:
logger.info("SLC setup completed") logger.info("SLC setup completed")
# 4.11.2 Audio Connection Setup by HF
async def setup_audio_connection(self): async def setup_audio_connection(self):
"""4.11.2 Audio Connection Setup by HF."""
# When the HF triggers the establishment of the Codec Connection it # When the HF triggers the establishment of the Codec Connection it
# shall send the AT command AT+BCC to the AG. The AG shall respond with # shall send the AT command AT+BCC to the AG. The AG shall respond with
# OK if it will start the Codec Connection procedure, and with ERROR # OK if it will start the Codec Connection procedure, and with ERROR
# if it cannot start the Codec Connection procedure. # if it cannot start the Codec Connection procedure.
await self.execute_command("AT+BCC") await self.execute_command("AT+BCC")
# 4.11.3 Codec Connection Setup
async def setup_codec_connection(self, codec_id: int): async def setup_codec_connection(self, codec_id: int):
"""4.11.3 Codec Connection Setup."""
# The AG shall send a +BCS=<Codec ID> unsolicited response to the HF. # The AG shall send a +BCS=<Codec ID> unsolicited response to the HF.
# The HF shall then respond to the incoming unsolicited response with # The HF shall then respond to the incoming unsolicited response with
# the AT command AT+BCS=<Codec ID>. The ID shall be the same as in the # the AT command AT+BCS=<Codec ID>. The ID shall be the same as in the
@@ -647,27 +757,29 @@ class HfProtocol:
# Synchronous Connection with the settings that are determined by the # Synchronous Connection with the settings that are determined by the
# ID. The HF shall be ready to accept the synchronous connection # ID. The HF shall be ready to accept the synchronous connection
# establishment as soon as it has sent the AT commands AT+BCS=<Codec ID>. # establishment as soon as it has sent the AT commands AT+BCS=<Codec ID>.
self.active_codec = AudioCodec(codec_id)
self.emit('codec_negotiation', self.active_codec)
logger.info("codec connection setup completed") logger.info("codec connection setup completed")
# 4.13.1 Answer Incoming Call from the HF In-Band Ringing
async def answer_incoming_call(self): async def answer_incoming_call(self):
"""4.13.1 Answer Incoming Call from the HF - In-Band Ringing."""
# The user accepts the incoming voice call by using the proper means # The user accepts the incoming voice call by using the proper means
# provided by the HF. The HF shall then send the ATA command # provided by the HF. The HF shall then send the ATA command
# (see Section 4.34) to the AG. The AG shall then begin the procedure for # (see Section 4.34) to the AG. The AG shall then begin the procedure for
# accepting the incoming call. # accepting the incoming call.
await self.execute_command("ATA") await self.execute_command("ATA")
# 4.14.1 Reject an Incoming Call from the HF
async def reject_incoming_call(self): async def reject_incoming_call(self):
"""4.14.1 Reject an Incoming Call from the HF."""
# The user rejects the incoming call by using the User Interface on the # The user rejects the incoming call by using the User Interface on the
# Hands-Free unit. The HF shall then send the AT+CHUP command # Hands-Free unit. The HF shall then send the AT+CHUP command
# (see Section 4.34) to the AG. This may happen at any time during the # (see Section 4.34) to the AG. This may happen at any time during the
# procedures described in Sections 4.13.1 and 4.13.2. # procedures described in Sections 4.13.1 and 4.13.2.
await self.execute_command("AT+CHUP") await self.execute_command("AT+CHUP")
# 4.15.1 Terminate a Call Process from the HF
async def terminate_call(self): async def terminate_call(self):
"""4.15.1 Terminate a Call Process from the HF."""
# The user may abort the ongoing call process using whatever means # The user may abort the ongoing call process using whatever means
# provided by the Hands-Free unit. The HF shall send AT+CHUP command # provided by the Hands-Free unit. The HF shall send AT+CHUP command
# (see Section 4.34) to the AG, and the AG shall then start the # (see Section 4.34) to the AG, and the AG shall then start the
@@ -676,8 +788,35 @@ class HfProtocol:
# code, with the value indicating (call=0). # code, with the value indicating (call=0).
await self.execute_command("AT+CHUP") await self.execute_command("AT+CHUP")
async def query_current_calls(self) -> List[CallInfo]:
"""4.32.1 Query List of Current Calls in AG.
Return:
List of current calls in AG.
"""
responses = await self.execute_command(
"AT+CLCC", response_type=AtResponseType.MULTIPLE
)
assert isinstance(responses, list)
calls = []
for response in responses:
call_info = CallInfo(
index=int(response.parameters[0]),
direction=CallInfoDirection(int(response.parameters[1])),
status=CallInfoStatus(int(response.parameters[2])),
mode=CallInfoMode(int(response.parameters[3])),
multi_party=CallInfoMultiParty(int(response.parameters[4])),
)
if len(response.parameters) >= 7:
call_info.number = int(response.parameters[5])
call_info.type = int(response.parameters[6])
calls.append(call_info)
return calls
async def update_ag_indicator(self, index: int, value: int): async def update_ag_indicator(self, index: int, value: int):
self.ag_indicators[index].current_status = value self.ag_indicators[index].current_status = value
self.emit('ag_indicator', self.ag_indicators[index])
logger.info( logger.info(
f"AG indicator updated: {self.ag_indicators[index].description}, {value}" f"AG indicator updated: {self.ag_indicators[index].description}, {value}"
) )
@@ -695,9 +834,11 @@ class HfProtocol:
logging.info(f"unhandled unsolicited response {result.code}") logging.info(f"unhandled unsolicited response {result.code}")
async def run(self): async def run(self):
"""Main rountine for the Hands-Free side of the HFP protocol. """
Initiates the service level connection then loops handling Main routine for the Hands-Free side of the HFP protocol.
unsolicited AG responses."""
Initiates the service level connection then loops handling unsolicited AG responses.
"""
try: try:
await self.initiate_slc() await self.initiate_slc()
@@ -713,9 +854,13 @@ class HfProtocol:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Profile version (normative).
# Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements
class ProfileVersion(enum.IntEnum): class ProfileVersion(enum.IntEnum):
"""
Profile version (normative).
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements.
"""
V1_5 = 0x0105 V1_5 = 0x0105
V1_6 = 0x0106 V1_6 = 0x0106
V1_7 = 0x0107 V1_7 = 0x0107
@@ -723,9 +868,13 @@ class ProfileVersion(enum.IntEnum):
V1_9 = 0x0109 V1_9 = 0x0109
# HF supported features (normative).
# Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements
class HfSdpFeature(enum.IntFlag): class HfSdpFeature(enum.IntFlag):
"""
HF supported features (normative).
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements.
"""
EC_NR = 0x01 # Echo Cancel & Noise reduction EC_NR = 0x01 # Echo Cancel & Noise reduction
THREE_WAY_CALLING = 0x02 THREE_WAY_CALLING = 0x02
CLI_PRESENTATION_CAPABILITY = 0x04 CLI_PRESENTATION_CAPABILITY = 0x04
@@ -736,9 +885,13 @@ class HfSdpFeature(enum.IntFlag):
VOICE_RECOGNITION_TEST = 0x80 VOICE_RECOGNITION_TEST = 0x80
# AG supported features (normative).
# Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements
class AgSdpFeature(enum.IntFlag): class AgSdpFeature(enum.IntFlag):
"""
AG supported features (normative).
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements.
"""
THREE_WAY_CALLING = 0x01 THREE_WAY_CALLING = 0x01
EC_NR = 0x02 # Echo Cancel & Noise reduction EC_NR = 0x02 # Echo Cancel & Noise reduction
VOICE_RECOGNITION_FUNCTION = 0x04 VOICE_RECOGNITION_FUNCTION = 0x04
@@ -752,9 +905,12 @@ class AgSdpFeature(enum.IntFlag):
def sdp_records( def sdp_records(
service_record_handle: int, rfcomm_channel: int, configuration: Configuration service_record_handle: int, rfcomm_channel: int, configuration: Configuration
) -> List[ServiceAttribute]: ) -> List[ServiceAttribute]:
"""Generate the SDP record for HFP Hands-Free support. """
Generates the SDP record for HFP Hands-Free support.
The record exposes the features supported in the input configuration, The record exposes the features supported in the input configuration,
and the allocated RFCOMM channel.""" and the allocated RFCOMM channel.
"""
hf_supported_features = 0 hf_supported_features = 0

View File

@@ -21,11 +21,13 @@ import os
import logging import logging
import json import json
import websockets import websockets
import functools
from typing import Optional from typing import Optional
from bumble.device import Device from bumble import rfcomm
from bumble import hci
from bumble.device import Device, Connection
from bumble.transport import open_transport_or_link from bumble.transport import open_transport_or_link
from bumble.rfcomm import Server as RfcommServer
from bumble import hfp from bumble import hfp
from bumble.hfp import HfProtocol from bumble.hfp import HfProtocol
@@ -57,12 +59,44 @@ class UiServer:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def on_dlc(dlc, configuration: hfp.Configuration): def on_dlc(dlc: rfcomm.DLC, configuration: hfp.Configuration):
print('*** DLC connected', dlc) print('*** DLC connected', dlc)
protocol = HfProtocol(dlc, configuration) protocol = HfProtocol(dlc, configuration)
UiServer.protocol = protocol UiServer.protocol = protocol
asyncio.create_task(protocol.run()) asyncio.create_task(protocol.run())
def on_sco_request(connection: Connection, link_type: int, protocol: HfProtocol):
if connection == protocol.dlc.multiplexer.l2cap_channel.connection:
if link_type == hci.HCI_Connection_Complete_Event.SCO_LINK_TYPE:
esco_parameters = hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.SCO_CVSD_D1
]
elif protocol.active_codec == hfp.AudioCodec.MSBC:
esco_parameters = hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.ESCO_MSBC_T2
]
elif protocol.active_codec == hfp.AudioCodec.CVSD:
esco_parameters = hfp.ESCO_PARAMETERS[
hfp.DefaultCodecParameters.ESCO_CVSD_S4
]
connection.abort_on(
'disconnection',
connection.device.send_command(
hci.HCI_Enhanced_Accept_Synchronous_Connection_Request_Command(
bd_addr=connection.peer_address, **esco_parameters.asdict()
)
),
)
handler = functools.partial(on_sco_request, protocol=protocol)
dlc.multiplexer.l2cap_channel.connection.device.on('sco_request', handler)
dlc.multiplexer.l2cap_channel.once(
'close',
lambda: dlc.multiplexer.l2cap_channel.connection.device.remove_listener(
'sco_request', handler
),
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
async def main(): async def main():
@@ -101,7 +135,7 @@ async def main():
device.classic_enabled = True device.classic_enabled = True
# Create and register a server # Create and register a server
rfcomm_server = RfcommServer(device) rfcomm_server = rfcomm.Server(device)
# Listen for incoming DLC connections # Listen for incoming DLC connections
channel_number = rfcomm_server.listen(lambda dlc: on_dlc(dlc, configuration)) channel_number = rfcomm_server.listen(lambda dlc: on_dlc(dlc, configuration))