forked from auracaster/bumble_mirror
Compare commits
22 Commits
gbg/update
...
gbg/update
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
50eae2ef54 | ||
|
|
c8883a7d0f | ||
|
|
51321caf5b | ||
|
|
51a94288e2 | ||
|
|
8758856e8c | ||
|
|
deba181857 | ||
|
|
c65188dcbf | ||
|
|
21d607898d | ||
|
|
2698d4534e | ||
|
|
bbcd64286a | ||
|
|
9140afbf8c | ||
|
|
90a682c71b | ||
|
|
e8737a8243 | ||
|
|
72fceca72e | ||
|
|
732294abbc | ||
|
|
dc1204531e | ||
|
|
962114379c | ||
|
|
e6913a3055 | ||
|
|
e21d122aef | ||
|
|
76bca03fe3 | ||
|
|
f1e5c9e59e | ||
|
|
1ceeccbbc0 |
2
.github/workflows/code-check.yml
vendored
2
.github/workflows/code-check.yml
vendored
@@ -33,7 +33,7 @@ jobs:
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
python -m pip install ".[build,test,development]"
|
||||
python -m pip install ".[build,test,development,pandora]"
|
||||
- name: Check
|
||||
run: |
|
||||
invoke project.pre-commit
|
||||
|
||||
2
.github/workflows/python-avatar.yml
vendored
2
.github/workflows/python-avatar.yml
vendored
@@ -32,7 +32,7 @@ jobs:
|
||||
- name: Install
|
||||
run: |
|
||||
python -m pip install --upgrade pip
|
||||
python -m pip install .[avatar]
|
||||
python -m pip install .[avatar,pandora]
|
||||
- name: Rootcanal
|
||||
run: nohup python -m rootcanal > rootcanal.log &
|
||||
- name: Test
|
||||
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
@@ -6,6 +6,8 @@ dist/
|
||||
docs/mkdocs/site
|
||||
test-results.xml
|
||||
__pycache__
|
||||
# Vim
|
||||
.*.sw*
|
||||
# generated by setuptools_scm
|
||||
bumble/_version.py
|
||||
.vscode/launch.json
|
||||
|
||||
@@ -1234,6 +1234,7 @@ class Peripheral(Device.Listener, Connection.Listener):
|
||||
'cyan',
|
||||
)
|
||||
)
|
||||
|
||||
await self.connected.wait()
|
||||
logging.info(color('### Connected', 'cyan'))
|
||||
|
||||
@@ -1593,8 +1594,8 @@ def central(
|
||||
mode_factory = create_mode_factory(ctx, 'gatt-client')
|
||||
classic = ctx.obj['classic']
|
||||
|
||||
asyncio.run(
|
||||
Central(
|
||||
async def run_central():
|
||||
await Central(
|
||||
transport,
|
||||
peripheral_address,
|
||||
classic,
|
||||
@@ -1606,7 +1607,8 @@ def central(
|
||||
encrypt or authenticate,
|
||||
ctx.obj['extended_data_length'],
|
||||
).run()
|
||||
)
|
||||
|
||||
asyncio.run(run_central())
|
||||
|
||||
|
||||
@bench.command()
|
||||
@@ -1617,15 +1619,16 @@ def peripheral(ctx, transport):
|
||||
role_factory = create_role_factory(ctx, 'receiver')
|
||||
mode_factory = create_mode_factory(ctx, 'gatt-server')
|
||||
|
||||
asyncio.run(
|
||||
Peripheral(
|
||||
async def run_peripheral():
|
||||
await Peripheral(
|
||||
transport,
|
||||
ctx.obj['classic'],
|
||||
ctx.obj['extended_data_length'],
|
||||
role_factory,
|
||||
mode_factory,
|
||||
).run()
|
||||
)
|
||||
|
||||
asyncio.run(run_peripheral())
|
||||
|
||||
|
||||
def main():
|
||||
|
||||
@@ -652,7 +652,9 @@ class SbcPacketSource:
|
||||
|
||||
# Prepare for next packets
|
||||
sequence_number += 1
|
||||
sequence_number &= 0xFFFF
|
||||
timestamp += sum((frame.sample_count for frame in frames))
|
||||
timestamp &= 0xFFFFFFFF
|
||||
frames = [frame]
|
||||
frames_size = len(frame.payload)
|
||||
else:
|
||||
|
||||
@@ -325,8 +325,8 @@ class MediaPacket:
|
||||
self.padding = padding
|
||||
self.extension = extension
|
||||
self.marker = marker
|
||||
self.sequence_number = sequence_number
|
||||
self.timestamp = timestamp
|
||||
self.sequence_number = sequence_number & 0xFFFF
|
||||
self.timestamp = timestamp & 0xFFFFFFFF
|
||||
self.ssrc = ssrc
|
||||
self.csrc_list = csrc_list
|
||||
self.payload_type = payload_type
|
||||
@@ -341,7 +341,12 @@ class MediaPacket:
|
||||
| len(self.csrc_list),
|
||||
self.marker << 7 | self.payload_type,
|
||||
]
|
||||
) + struct.pack('>HII', self.sequence_number, self.timestamp, self.ssrc)
|
||||
) + struct.pack(
|
||||
'>HII',
|
||||
self.sequence_number,
|
||||
self.timestamp,
|
||||
self.ssrc,
|
||||
)
|
||||
for csrc in self.csrc_list:
|
||||
header += struct.pack('>I', csrc)
|
||||
return header + self.payload
|
||||
|
||||
@@ -90,6 +90,22 @@ if TYPE_CHECKING:
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Utils
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
def show_services(services: Iterable[ServiceProxy]) -> None:
|
||||
for service in services:
|
||||
print(color(str(service), 'cyan'))
|
||||
|
||||
for characteristic in service.characteristics:
|
||||
print(color(' ' + str(characteristic), 'magenta'))
|
||||
|
||||
for descriptor in characteristic.descriptors:
|
||||
print(color(' ' + str(descriptor), 'green'))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Proxies
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
791
bumble/hfp.py
791
bumble/hfp.py
@@ -15,6 +15,9 @@
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
from __future__ import annotations
|
||||
|
||||
import collections
|
||||
import collections.abc
|
||||
import logging
|
||||
import asyncio
|
||||
@@ -22,16 +25,32 @@ import dataclasses
|
||||
import enum
|
||||
import traceback
|
||||
import pyee
|
||||
from typing import Dict, List, Union, Set, Any, Optional, Type, TYPE_CHECKING
|
||||
import re
|
||||
from typing import (
|
||||
Dict,
|
||||
List,
|
||||
Union,
|
||||
Set,
|
||||
Any,
|
||||
Optional,
|
||||
Type,
|
||||
Tuple,
|
||||
ClassVar,
|
||||
Iterable,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
from typing_extensions import Self
|
||||
|
||||
from bumble import at
|
||||
from bumble import device
|
||||
from bumble import rfcomm
|
||||
from bumble import sdp
|
||||
from bumble.colors import color
|
||||
from bumble.core import (
|
||||
ProtocolError,
|
||||
BT_GENERIC_AUDIO_SERVICE,
|
||||
BT_HANDSFREE_SERVICE,
|
||||
BT_HEADSET_AUDIO_GATEWAY_SERVICE,
|
||||
BT_L2CAP_PROTOCOL_ID,
|
||||
BT_RFCOMM_PROTOCOL_ID,
|
||||
)
|
||||
@@ -40,15 +59,6 @@ from bumble.hci import (
|
||||
CodingFormat,
|
||||
CodecID,
|
||||
)
|
||||
from bumble.sdp import (
|
||||
DataElement,
|
||||
ServiceAttribute,
|
||||
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
|
||||
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
|
||||
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -329,6 +339,21 @@ class CallInfo:
|
||||
type: Optional[int] = None
|
||||
|
||||
|
||||
class CmeError(enum.IntEnum):
|
||||
"""
|
||||
CME ERROR codes (partial listed).
|
||||
|
||||
TS 127 007 - V6.8.0, 9.2.1 General errors
|
||||
"""
|
||||
|
||||
PHONE_FAILURE = 0
|
||||
OPERATION_NOT_ALLOWED = 3
|
||||
OPERATION_NOT_SUPPORTED = 4
|
||||
MEMORY_FULL = 20
|
||||
INVALID_INDEX = 21
|
||||
NOT_FOUND = 22
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Hands-Free Control Interoperability Requirements
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -402,12 +427,21 @@ STATUS_CODES = [
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class Configuration:
|
||||
class HfConfiguration:
|
||||
supported_hf_features: List[HfFeature]
|
||||
supported_hf_indicators: List[HfIndicator]
|
||||
supported_audio_codecs: List[AudioCodec]
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class AgConfiguration:
|
||||
supported_ag_features: Iterable[AgFeature]
|
||||
supported_ag_indicators: collections.abc.Sequence[AgIndicatorState]
|
||||
supported_hf_indicators: Iterable[HfIndicator]
|
||||
supported_ag_call_hold_operations: Iterable[CallHoldOperation]
|
||||
supported_audio_codecs: Iterable[AudioCodec]
|
||||
|
||||
|
||||
class AtResponseType(enum.Enum):
|
||||
"""
|
||||
Indicates if a response is expected from an AT command, and if multiple responses are accepted.
|
||||
@@ -435,18 +469,148 @@ class AtResponse:
|
||||
)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class AtCommand:
|
||||
class SubCode(str, enum.Enum):
|
||||
NONE = ''
|
||||
SET = '='
|
||||
TEST = '=?'
|
||||
READ = '?'
|
||||
|
||||
code: str
|
||||
sub_code: SubCode
|
||||
parameters: list
|
||||
|
||||
_PARSE_PATTERN: ClassVar[re.Pattern] = re.compile(
|
||||
r'AT\+(?P<code>[A-Z]+)(?P<sub_code>=\?|=|\?)?(?P<parameters>.*)'
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def parse_from(cls: Type[Self], buffer: bytearray) -> Self:
|
||||
if not (match := cls._PARSE_PATTERN.fullmatch(buffer.decode())):
|
||||
if buffer.startswith(b'ATA'):
|
||||
return cls(code='A', sub_code=AtCommand.SubCode.NONE, parameters=[])
|
||||
if buffer.startswith(b'ATD'):
|
||||
return cls(
|
||||
code='D', sub_code=AtCommand.SubCode.NONE, parameters=[buffer[3:]]
|
||||
)
|
||||
raise HfpProtocolError('Invalid command')
|
||||
|
||||
parameters = []
|
||||
if parameters_text := match.group('parameters'):
|
||||
parameters = at.parse_parameters(parameters_text.encode())
|
||||
|
||||
return cls(
|
||||
code=match.group('code'),
|
||||
sub_code=AtCommand.SubCode(match.group('sub_code') or ''),
|
||||
parameters=parameters,
|
||||
)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class AgIndicatorState:
|
||||
description: str
|
||||
index: int
|
||||
"""State wrapper of AG indicator.
|
||||
|
||||
Attributes:
|
||||
indicator: Indicator of this indicator state.
|
||||
supported_values: Supported values of this indicator.
|
||||
current_status: Current status of this indicator.
|
||||
index: (HF only) Index of this indicator.
|
||||
enabled: (AG only) Whether this indicator is enabled to report.
|
||||
on_test_text: Text message reported in AT+CIND=? of this indicator.
|
||||
"""
|
||||
|
||||
indicator: AgIndicator
|
||||
supported_values: Set[int]
|
||||
current_status: int
|
||||
index: Optional[int] = None
|
||||
enabled: bool = True
|
||||
|
||||
@property
|
||||
def on_test_text(self) -> str:
|
||||
min_value = min(self.supported_values)
|
||||
max_value = max(self.supported_values)
|
||||
if len(self.supported_values) == (max_value - min_value + 1):
|
||||
supported_values_text = f'({min_value}-{max_value})'
|
||||
else:
|
||||
supported_values_text = (
|
||||
f'({",".join(str(v) for v in self.supported_values)})'
|
||||
)
|
||||
return f'(\"{self.indicator.value}\",{supported_values_text})'
|
||||
|
||||
@classmethod
|
||||
def call(cls: Type[Self]) -> Self:
|
||||
"""Default call indicator state."""
|
||||
return cls(
|
||||
indicator=AgIndicator.CALL, supported_values={0, 1}, current_status=0
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def callsetup(cls: Type[Self]) -> Self:
|
||||
"""Default callsetup indicator state."""
|
||||
return cls(
|
||||
indicator=AgIndicator.CALL_SETUP,
|
||||
supported_values={0, 1, 2, 3},
|
||||
current_status=0,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def callheld(cls: Type[Self]) -> Self:
|
||||
"""Default call indicator state."""
|
||||
return cls(
|
||||
indicator=AgIndicator.CALL_HELD,
|
||||
supported_values={0, 1, 2},
|
||||
current_status=0,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def service(cls: Type[Self]) -> Self:
|
||||
"""Default service indicator state."""
|
||||
return cls(
|
||||
indicator=AgIndicator.SERVICE, supported_values={0, 1}, current_status=0
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def signal(cls: Type[Self]) -> Self:
|
||||
"""Default signal indicator state."""
|
||||
return cls(
|
||||
indicator=AgIndicator.SIGNAL,
|
||||
supported_values={0, 1, 2, 3, 4, 5},
|
||||
current_status=0,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def roam(cls: Type[Self]) -> Self:
|
||||
"""Default roam indicator state."""
|
||||
return cls(
|
||||
indicator=AgIndicator.CALL, supported_values={0, 1}, current_status=0
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def battchg(cls: Type[Self]) -> Self:
|
||||
"""Default battery charge indicator state."""
|
||||
return cls(
|
||||
indicator=AgIndicator.BATTERY_CHARGE,
|
||||
supported_values={0, 1, 2, 3, 4, 5},
|
||||
current_status=0,
|
||||
)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class HfIndicatorState:
|
||||
"""State wrapper of HF indicator.
|
||||
|
||||
Attributes:
|
||||
indicator: Indicator of this indicator state.
|
||||
supported: Whether this indicator is supported.
|
||||
enabled: Whether this indicator is enabled.
|
||||
current_status: Current (last-reported) status value of this indicaotr.
|
||||
"""
|
||||
|
||||
indicator: HfIndicator
|
||||
supported: bool = False
|
||||
enabled: bool = False
|
||||
current_status: int = 0
|
||||
|
||||
|
||||
class HfProtocol(pyee.EventEmitter):
|
||||
@@ -464,6 +628,9 @@ class HfProtocol(pyee.EventEmitter):
|
||||
ag_indicator: AgIndicator
|
||||
"""
|
||||
|
||||
class HfLoopTermination(HfpProtocolError): ...
|
||||
"""Termination signal for run() loop."""
|
||||
|
||||
supported_hf_features: int
|
||||
supported_audio_codecs: List[AudioCodec]
|
||||
|
||||
@@ -477,14 +644,14 @@ class HfProtocol(pyee.EventEmitter):
|
||||
command_lock: asyncio.Lock
|
||||
if TYPE_CHECKING:
|
||||
response_queue: asyncio.Queue[AtResponse]
|
||||
unsolicited_queue: asyncio.Queue[AtResponse]
|
||||
unsolicited_queue: asyncio.Queue[Optional[AtResponse]]
|
||||
else:
|
||||
response_queue: asyncio.Queue
|
||||
unsolicited_queue: asyncio.Queue
|
||||
read_buffer: bytearray
|
||||
active_codec: AudioCodec
|
||||
|
||||
def __init__(self, dlc: rfcomm.DLC, configuration: Configuration) -> None:
|
||||
def __init__(self, dlc: rfcomm.DLC, configuration: HfConfiguration) -> None:
|
||||
super().__init__()
|
||||
|
||||
# Configure internal state.
|
||||
@@ -494,13 +661,14 @@ class HfProtocol(pyee.EventEmitter):
|
||||
self.unsolicited_queue = asyncio.Queue()
|
||||
self.read_buffer = bytearray()
|
||||
self.active_codec = AudioCodec.CVSD
|
||||
self._slc_initialized = False
|
||||
|
||||
# Build local features.
|
||||
self.supported_hf_features = sum(configuration.supported_hf_features)
|
||||
self.supported_audio_codecs = configuration.supported_audio_codecs
|
||||
|
||||
self.hf_indicators = {
|
||||
indicator: HfIndicatorState()
|
||||
indicator: HfIndicatorState(indicator=indicator)
|
||||
for indicator in configuration.supported_hf_indicators
|
||||
}
|
||||
|
||||
@@ -511,6 +679,10 @@ class HfProtocol(pyee.EventEmitter):
|
||||
|
||||
# Bind the AT reader to the RFCOMM channel.
|
||||
self.dlc.sink = self._read_at
|
||||
# Stop the run() loop when L2CAP is closed.
|
||||
self.dlc.multiplexer.l2cap_channel.on(
|
||||
'close', lambda: self.unsolicited_queue.put_nowait(None)
|
||||
)
|
||||
|
||||
def supports_hf_feature(self, feature: HfFeature) -> bool:
|
||||
return (self.supported_hf_features & feature) != 0
|
||||
@@ -621,7 +793,7 @@ class HfProtocol(pyee.EventEmitter):
|
||||
# If both the HF and AG do support the Codec Negotiation feature
|
||||
# then the HF shall send the AT+BAC=<HF available codecs> command to
|
||||
# the AG to notify the AG of the available codecs in the HF.
|
||||
codecs = [str(c) for c in self.supported_audio_codecs]
|
||||
codecs = [str(c.value) for c in self.supported_audio_codecs]
|
||||
await self.execute_command(f"AT+BAC={','.join(codecs)}")
|
||||
|
||||
# 4.2.1.3 AG Indicators
|
||||
@@ -639,7 +811,7 @@ class HfProtocol(pyee.EventEmitter):
|
||||
|
||||
self.ag_indicators = []
|
||||
for index, indicator in enumerate(response.parameters):
|
||||
description = indicator[0].decode()
|
||||
description = AgIndicator(indicator[0].decode())
|
||||
supported_values = []
|
||||
for value in indicator[1]:
|
||||
value = value.split(b'-')
|
||||
@@ -697,7 +869,7 @@ class HfProtocol(pyee.EventEmitter):
|
||||
# shall send the AT+BIND=<HF supported HF indicators> command to the AG
|
||||
# to notify the AG of the supported indicators’ assigned numbers in the
|
||||
# HF. The AG shall respond with OK
|
||||
indicators = [str(i) for i in self.hf_indicators.keys()]
|
||||
indicators = [str(i.value) for i in self.hf_indicators]
|
||||
await self.execute_command(f"AT+BIND={','.join(indicators)}")
|
||||
|
||||
# After having provided the AG with the HF indicators it supports,
|
||||
@@ -733,6 +905,7 @@ class HfProtocol(pyee.EventEmitter):
|
||||
self.hf_indicators[indicator].enabled = True
|
||||
|
||||
logger.info("SLC setup completed")
|
||||
self._slc_initialized = True
|
||||
|
||||
async def setup_audio_connection(self):
|
||||
"""4.11.2 Audio Connection Setup by HF."""
|
||||
@@ -820,15 +993,17 @@ class HfProtocol(pyee.EventEmitter):
|
||||
return calls
|
||||
|
||||
async def update_ag_indicator(self, index: int, value: int):
|
||||
self.ag_indicators[index].current_status = value
|
||||
self.emit('ag_indicator', self.ag_indicators[index])
|
||||
logger.info(
|
||||
f"AG indicator updated: {self.ag_indicators[index].description}, {value}"
|
||||
)
|
||||
# CIEV is in 1-index, while ag_indicators is in 0-index.
|
||||
ag_indicator = self.ag_indicators[index - 1]
|
||||
ag_indicator.current_status = value
|
||||
self.emit('ag_indicator', ag_indicator)
|
||||
logger.info(f"AG indicator updated: {ag_indicator.indicator}, {value}")
|
||||
|
||||
async def handle_unsolicited(self):
|
||||
"""Handle unsolicited result codes sent by the audio gateway."""
|
||||
result = await self.unsolicited_queue.get()
|
||||
if not result:
|
||||
raise HfProtocol.HfLoopTermination()
|
||||
if result.code == "+BCS":
|
||||
await self.setup_codec_connection(int(result.parameters[0]))
|
||||
elif result.code == "+CIEV":
|
||||
@@ -846,14 +1021,350 @@ class HfProtocol(pyee.EventEmitter):
|
||||
"""
|
||||
|
||||
try:
|
||||
await self.initiate_slc()
|
||||
if not self._slc_initialized:
|
||||
await self.initiate_slc()
|
||||
while True:
|
||||
await self.handle_unsolicited()
|
||||
except HfProtocol.HfLoopTermination:
|
||||
logger.info('Loop terminated')
|
||||
except Exception:
|
||||
logger.error("HFP-HF protocol failed with the following error:")
|
||||
logger.error(traceback.format_exc())
|
||||
|
||||
|
||||
class AgProtocol(pyee.EventEmitter):
|
||||
"""
|
||||
Implementation for the Audio-Gateway side of the Hands-Free profile.
|
||||
|
||||
Reference specification Hands-Free Profile v1.8.
|
||||
|
||||
Emitted events:
|
||||
slc_complete: Emit when SLC procedure is completed.
|
||||
codec_negotiation: When codec is renegotiated, notify the new codec.
|
||||
Args:
|
||||
active_codec: AudioCodec
|
||||
hf_indicator: When HF update their indicators, notify the new state.
|
||||
Args:
|
||||
hf_indicator: HfIndicator
|
||||
codec_connection_request: Emit when HF sends AT+BCC to request codec connection.
|
||||
answer: Emit when HF sends ATA to answer phone call.
|
||||
hang_up: Emit when HF sends AT+CHUP to hang up phone call.
|
||||
dial: Emit when HF sends ATD to dial phone call.
|
||||
"""
|
||||
|
||||
supported_hf_features: int
|
||||
supported_hf_indicators: Set[HfIndicator]
|
||||
supported_audio_codecs: List[AudioCodec]
|
||||
|
||||
supported_ag_features: int
|
||||
supported_ag_call_hold_operations: List[CallHoldOperation]
|
||||
|
||||
ag_indicators: List[AgIndicatorState]
|
||||
hf_indicators: collections.OrderedDict[HfIndicator, HfIndicatorState]
|
||||
|
||||
dlc: rfcomm.DLC
|
||||
|
||||
read_buffer: bytearray
|
||||
active_codec: AudioCodec
|
||||
|
||||
indicator_report_enabled: bool
|
||||
inband_ringtone_enabled: bool
|
||||
cme_error_enabled: bool
|
||||
_remained_slc_setup_features: Set[HfFeature]
|
||||
|
||||
def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None:
|
||||
super().__init__()
|
||||
|
||||
# Configure internal state.
|
||||
self.dlc = dlc
|
||||
self.read_buffer = bytearray()
|
||||
self.active_codec = AudioCodec.CVSD
|
||||
|
||||
# Build local features.
|
||||
self.supported_ag_features = sum(configuration.supported_ag_features)
|
||||
self.supported_ag_call_hold_operations = list(
|
||||
configuration.supported_ag_call_hold_operations
|
||||
)
|
||||
self.ag_indicators = list(configuration.supported_ag_indicators)
|
||||
self.supported_hf_indicators = set(configuration.supported_hf_indicators)
|
||||
self.inband_ringtone_enabled = True
|
||||
self._remained_slc_setup_features = set()
|
||||
|
||||
# Clear remote features.
|
||||
self.supported_hf_features = 0
|
||||
self.supported_audio_codecs = []
|
||||
self.indicator_report_enabled = False
|
||||
self.cme_error_enabled = False
|
||||
|
||||
self.hf_indicators = collections.OrderedDict()
|
||||
|
||||
# Bind the AT reader to the RFCOMM channel.
|
||||
self.dlc.sink = self._read_at
|
||||
|
||||
def supports_hf_feature(self, feature: HfFeature) -> bool:
|
||||
return (self.supported_hf_features & feature) != 0
|
||||
|
||||
def supports_ag_feature(self, feature: AgFeature) -> bool:
|
||||
return (self.supported_ag_features & feature) != 0
|
||||
|
||||
def _read_at(self, data: bytes):
|
||||
"""
|
||||
Reads AT messages from the RFCOMM channel.
|
||||
"""
|
||||
# Append to the read buffer.
|
||||
self.read_buffer.extend(data)
|
||||
|
||||
# Locate the trailer.
|
||||
trailer = self.read_buffer.find(b'\r')
|
||||
if trailer == -1:
|
||||
return
|
||||
|
||||
# Isolate the AT response code and parameters.
|
||||
raw_command = self.read_buffer[:trailer]
|
||||
command = AtCommand.parse_from(raw_command)
|
||||
logger.debug(f"<<< {raw_command.decode()}")
|
||||
|
||||
# Consume the response bytes.
|
||||
self.read_buffer = self.read_buffer[trailer + 1 :]
|
||||
|
||||
if command.sub_code == AtCommand.SubCode.TEST:
|
||||
handler_name = f'_on_{command.code.lower()}_test'
|
||||
elif command.sub_code == AtCommand.SubCode.READ:
|
||||
handler_name = f'_on_{command.code.lower()}_read'
|
||||
else:
|
||||
handler_name = f'_on_{command.code.lower()}'
|
||||
|
||||
if handler := getattr(self, handler_name, None):
|
||||
handler(*command.parameters)
|
||||
else:
|
||||
logger.warning('Handler %s not found', handler_name)
|
||||
self.send_response('ERROR')
|
||||
|
||||
def send_response(self, response: str) -> None:
|
||||
"""Sends an AT response."""
|
||||
self.dlc.write(f'\r\n{response}\r\n')
|
||||
|
||||
def send_cme_error(self, error_code: CmeError) -> None:
|
||||
"""Sends an CME ERROR response.
|
||||
|
||||
If CME Error is not enabled by HF, sends ERROR instead.
|
||||
"""
|
||||
if self.cme_error_enabled:
|
||||
self.send_response(f'+CME ERROR: {error_code.value}')
|
||||
else:
|
||||
self.send_error()
|
||||
|
||||
def send_ok(self) -> None:
|
||||
"""Sends an OK response."""
|
||||
self.send_response('OK')
|
||||
|
||||
def send_error(self) -> None:
|
||||
"""Sends an ERROR response."""
|
||||
self.send_response('ERROR')
|
||||
|
||||
def set_inband_ringtone_enabled(self, enabled: bool) -> None:
|
||||
"""Enables or disables in-band ringtone."""
|
||||
|
||||
self.inband_ringtone_enabled = enabled
|
||||
self.send_response(f'+BSIR: {1 if enabled else 0}')
|
||||
|
||||
def update_ag_indicator(self, indicator: AgIndicator, value: int) -> None:
|
||||
"""Updates AG indicator.
|
||||
|
||||
Args:
|
||||
indicator: Name of the indicator.
|
||||
value: new value of the indicator.
|
||||
"""
|
||||
|
||||
search_result = next(
|
||||
(
|
||||
(index, state)
|
||||
for index, state in enumerate(self.ag_indicators)
|
||||
if state.indicator == indicator
|
||||
),
|
||||
None,
|
||||
)
|
||||
if not search_result:
|
||||
raise KeyError(f'{indicator} is not supported.')
|
||||
|
||||
index, indicator_state = search_result
|
||||
if not self.indicator_report_enabled:
|
||||
logger.warning('AG indicator report is disabled')
|
||||
if not indicator_state.enabled:
|
||||
logger.warning(f'AG indicator {indicator} is disabled')
|
||||
|
||||
indicator_state.current_status = value
|
||||
self.send_response(f'+CIEV: {index+1},{value}')
|
||||
|
||||
async def negotiate_codec(self, codec: AudioCodec) -> None:
|
||||
"""Starts codec negotiation."""
|
||||
|
||||
if not self.supports_ag_feature(AgFeature.CODEC_NEGOTIATION):
|
||||
logger.warning('Local does not support Codec Negotiation')
|
||||
if not self.supports_hf_feature(HfFeature.CODEC_NEGOTIATION):
|
||||
logger.warning('Peer does not support Codec Negotiation')
|
||||
if codec not in self.supported_audio_codecs:
|
||||
logger.warning(f'{codec} is not supported by peer')
|
||||
|
||||
at_bcs_future = asyncio.get_running_loop().create_future()
|
||||
self.once('codec_negotiation', at_bcs_future.set_result)
|
||||
self.send_response(f'+BCS: {codec.value}')
|
||||
if (new_codec := await at_bcs_future) != codec:
|
||||
raise HfpProtocolError(f'Expect codec: {codec}, but get {new_codec}')
|
||||
|
||||
def _check_remained_slc_commands(self) -> None:
|
||||
if not self._remained_slc_setup_features:
|
||||
self.emit('slc_complete')
|
||||
|
||||
def _on_brsf(self, hf_features: bytes) -> None:
|
||||
self.supported_hf_features = int(hf_features)
|
||||
self.send_response(f'+BRSF: {self.supported_ag_features}')
|
||||
self.send_ok()
|
||||
|
||||
if self.supports_hf_feature(
|
||||
HfFeature.HF_INDICATORS
|
||||
) and self.supports_ag_feature(AgFeature.HF_INDICATORS):
|
||||
self._remained_slc_setup_features.add(HfFeature.HF_INDICATORS)
|
||||
|
||||
if self.supports_hf_feature(
|
||||
HfFeature.THREE_WAY_CALLING
|
||||
) and self.supports_ag_feature(AgFeature.THREE_WAY_CALLING):
|
||||
self._remained_slc_setup_features.add(HfFeature.THREE_WAY_CALLING)
|
||||
|
||||
def _on_bac(self, *args) -> None:
|
||||
self.supported_audio_codecs = [AudioCodec(int(value)) for value in args]
|
||||
self.send_ok()
|
||||
|
||||
def _on_bcs(self, codec: bytes) -> None:
|
||||
self.active_codec = AudioCodec(int(codec))
|
||||
self.send_ok()
|
||||
self.emit('codec_negotiation', self.active_codec)
|
||||
|
||||
def _on_cind_test(self) -> None:
|
||||
if not self.ag_indicators:
|
||||
self.send_cme_error(CmeError.NOT_FOUND)
|
||||
return
|
||||
|
||||
indicator_list_str = ",".join(
|
||||
indicator.on_test_text for indicator in self.ag_indicators
|
||||
)
|
||||
self.send_response(f'+CIND: {indicator_list_str}')
|
||||
self.send_ok()
|
||||
|
||||
def _on_cind_read(self) -> None:
|
||||
if not self.ag_indicators:
|
||||
self.send_cme_error(CmeError.NOT_FOUND)
|
||||
return
|
||||
|
||||
indicator_list_str = ",".join(
|
||||
str(indicator.current_status) for indicator in self.ag_indicators
|
||||
)
|
||||
self.send_response(f'+CIND: {indicator_list_str}')
|
||||
self.send_ok()
|
||||
|
||||
self._check_remained_slc_commands()
|
||||
|
||||
def _on_cmer(
|
||||
self,
|
||||
mode: bytes,
|
||||
keypad: Optional[bytes] = None,
|
||||
display: Optional[bytes] = None,
|
||||
indicator: bytes = b'',
|
||||
) -> None:
|
||||
if int(mode) != 3 or keypad or display or int(indicator) not in (0, 1):
|
||||
logger.error(
|
||||
f'Unexpected values: mode={mode!r}, keypad={keypad!r}, '
|
||||
f'display={display!r}, indicator={indicator!r}'
|
||||
)
|
||||
self.send_cme_error(CmeError.INVALID_INDEX)
|
||||
|
||||
self.indicator_report_enabled = bool(int(indicator))
|
||||
self.send_ok()
|
||||
|
||||
def _on_cmee(self, enabled: bytes) -> None:
|
||||
self.cme_error_enabled = bool(int(enabled))
|
||||
self.send_ok()
|
||||
|
||||
def _on_bind(self, *args) -> None:
|
||||
if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
|
||||
self.send_error()
|
||||
return
|
||||
|
||||
peer_supported_indicators = set(
|
||||
HfIndicator(int(indicator)) for indicator in args
|
||||
)
|
||||
self.hf_indicators = collections.OrderedDict(
|
||||
{
|
||||
indicator: HfIndicatorState(indicator=indicator)
|
||||
for indicator in self.supported_hf_indicators.intersection(
|
||||
peer_supported_indicators
|
||||
)
|
||||
}
|
||||
)
|
||||
self.send_ok()
|
||||
|
||||
def _on_bind_test(self) -> None:
|
||||
if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
|
||||
self.send_error()
|
||||
return
|
||||
|
||||
hf_indicator_list_str = ",".join(
|
||||
str(indicator.value) for indicator in self.supported_hf_indicators
|
||||
)
|
||||
self.send_response(f'+BIND: ({hf_indicator_list_str})')
|
||||
self.send_ok()
|
||||
|
||||
def _on_bind_read(self) -> None:
|
||||
if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
|
||||
self.send_error()
|
||||
return
|
||||
|
||||
for indicator in self.hf_indicators:
|
||||
self.send_response(f'+BIND: {indicator.value},1')
|
||||
|
||||
self.send_ok()
|
||||
|
||||
self._remained_slc_setup_features.remove(HfFeature.HF_INDICATORS)
|
||||
self._check_remained_slc_commands()
|
||||
|
||||
def _on_biev(self, index_bytes: bytes, value_bytes: bytes) -> None:
|
||||
if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
|
||||
self.send_error()
|
||||
return
|
||||
|
||||
index = HfIndicator(int(index_bytes))
|
||||
if index not in self.hf_indicators:
|
||||
self.send_error()
|
||||
return
|
||||
|
||||
self.hf_indicators[index].current_status = int(value_bytes)
|
||||
self.emit('hf_indicator', self.hf_indicators[index])
|
||||
self.send_ok()
|
||||
|
||||
def _on_bia(self, *args) -> None:
|
||||
for enabled, state in zip(args, self.ag_indicators):
|
||||
state.enabled = bool(int(enabled))
|
||||
self.send_ok()
|
||||
|
||||
def _on_bcc(self) -> None:
|
||||
self.emit('codec_connection_request')
|
||||
self.send_ok()
|
||||
|
||||
def _on_a(self) -> None:
|
||||
"""ATA handler."""
|
||||
self.emit('answer')
|
||||
self.send_ok()
|
||||
|
||||
def _on_d(self, number: bytes) -> None:
|
||||
"""ATD handler."""
|
||||
self.emit('dial', number.decode())
|
||||
self.send_ok()
|
||||
|
||||
def _on_chup(self) -> None:
|
||||
self.emit('hang_up')
|
||||
self.send_ok()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Normative SDP definitions
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -907,9 +1418,12 @@ class AgSdpFeature(enum.IntFlag):
|
||||
VOICE_RECOGNITION_TEST = 0x80
|
||||
|
||||
|
||||
def sdp_records(
|
||||
service_record_handle: int, rfcomm_channel: int, configuration: Configuration
|
||||
) -> List[ServiceAttribute]:
|
||||
def make_hf_sdp_records(
|
||||
service_record_handle: int,
|
||||
rfcomm_channel: int,
|
||||
configuration: HfConfiguration,
|
||||
version: ProfileVersion = ProfileVersion.V1_8,
|
||||
) -> List[sdp.ServiceAttribute]:
|
||||
"""
|
||||
Generates the SDP record for HFP Hands-Free support.
|
||||
|
||||
@@ -941,53 +1455,226 @@ def sdp_records(
|
||||
hf_supported_features |= HfSdpFeature.WIDE_BAND
|
||||
|
||||
return [
|
||||
ServiceAttribute(
|
||||
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
|
||||
DataElement.unsigned_integer_32(service_record_handle),
|
||||
sdp.ServiceAttribute(
|
||||
sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
|
||||
sdp.DataElement.unsigned_integer_32(service_record_handle),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
|
||||
DataElement.sequence(
|
||||
sdp.ServiceAttribute(
|
||||
sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
|
||||
sdp.DataElement.sequence(
|
||||
[
|
||||
DataElement.uuid(BT_HANDSFREE_SERVICE),
|
||||
DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
|
||||
sdp.DataElement.uuid(BT_HANDSFREE_SERVICE),
|
||||
sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
|
||||
]
|
||||
),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
DataElement.sequence(
|
||||
sdp.ServiceAttribute(
|
||||
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
sdp.DataElement.sequence(
|
||||
[
|
||||
DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]),
|
||||
DataElement.sequence(
|
||||
sdp.DataElement.sequence(
|
||||
[sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]
|
||||
),
|
||||
sdp.DataElement.sequence(
|
||||
[
|
||||
DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
|
||||
DataElement.unsigned_integer_8(rfcomm_channel),
|
||||
sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
|
||||
sdp.DataElement.unsigned_integer_8(rfcomm_channel),
|
||||
]
|
||||
),
|
||||
]
|
||||
),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
DataElement.sequence(
|
||||
sdp.ServiceAttribute(
|
||||
sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
sdp.DataElement.sequence(
|
||||
[
|
||||
DataElement.sequence(
|
||||
sdp.DataElement.sequence(
|
||||
[
|
||||
DataElement.uuid(BT_HANDSFREE_SERVICE),
|
||||
DataElement.unsigned_integer_16(ProfileVersion.V1_8),
|
||||
sdp.DataElement.uuid(BT_HANDSFREE_SERVICE),
|
||||
sdp.DataElement.unsigned_integer_16(version),
|
||||
]
|
||||
)
|
||||
]
|
||||
),
|
||||
),
|
||||
ServiceAttribute(
|
||||
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
|
||||
DataElement.unsigned_integer_16(hf_supported_features),
|
||||
sdp.ServiceAttribute(
|
||||
sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
|
||||
sdp.DataElement.unsigned_integer_16(hf_supported_features),
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def make_ag_sdp_records(
|
||||
service_record_handle: int,
|
||||
rfcomm_channel: int,
|
||||
configuration: AgConfiguration,
|
||||
version: ProfileVersion = ProfileVersion.V1_8,
|
||||
) -> List[sdp.ServiceAttribute]:
|
||||
"""
|
||||
Generates the SDP record for HFP Audio-Gateway support.
|
||||
|
||||
The record exposes the features supported in the input configuration,
|
||||
and the allocated RFCOMM channel.
|
||||
"""
|
||||
|
||||
ag_supported_features = 0
|
||||
|
||||
if AgFeature.EC_NR in configuration.supported_ag_features:
|
||||
ag_supported_features |= AgSdpFeature.EC_NR
|
||||
if AgFeature.THREE_WAY_CALLING in configuration.supported_ag_features:
|
||||
ag_supported_features |= AgSdpFeature.THREE_WAY_CALLING
|
||||
if (
|
||||
AgFeature.ENHANCED_VOICE_RECOGNITION_STATUS
|
||||
in configuration.supported_ag_features
|
||||
):
|
||||
ag_supported_features |= AgSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS
|
||||
if AgFeature.VOICE_RECOGNITION_TEST in configuration.supported_ag_features:
|
||||
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEST
|
||||
if AgFeature.IN_BAND_RING_TONE_CAPABILITY in configuration.supported_ag_features:
|
||||
ag_supported_features |= AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
|
||||
if AgFeature.VOICE_RECOGNITION_FUNCTION in configuration.supported_ag_features:
|
||||
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_FUNCTION
|
||||
if AudioCodec.MSBC in configuration.supported_audio_codecs:
|
||||
ag_supported_features |= AgSdpFeature.WIDE_BAND
|
||||
|
||||
return [
|
||||
sdp.ServiceAttribute(
|
||||
sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
|
||||
sdp.DataElement.unsigned_integer_32(service_record_handle),
|
||||
),
|
||||
sdp.ServiceAttribute(
|
||||
sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
|
||||
sdp.DataElement.sequence(
|
||||
[
|
||||
sdp.DataElement.uuid(BT_HEADSET_AUDIO_GATEWAY_SERVICE),
|
||||
sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
|
||||
]
|
||||
),
|
||||
),
|
||||
sdp.ServiceAttribute(
|
||||
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
sdp.DataElement.sequence(
|
||||
[
|
||||
sdp.DataElement.sequence(
|
||||
[sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]
|
||||
),
|
||||
sdp.DataElement.sequence(
|
||||
[
|
||||
sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
|
||||
sdp.DataElement.unsigned_integer_8(rfcomm_channel),
|
||||
]
|
||||
),
|
||||
]
|
||||
),
|
||||
),
|
||||
sdp.ServiceAttribute(
|
||||
sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
sdp.DataElement.sequence(
|
||||
[
|
||||
sdp.DataElement.sequence(
|
||||
[
|
||||
sdp.DataElement.uuid(BT_HEADSET_AUDIO_GATEWAY_SERVICE),
|
||||
sdp.DataElement.unsigned_integer_16(version),
|
||||
]
|
||||
)
|
||||
]
|
||||
),
|
||||
),
|
||||
sdp.ServiceAttribute(
|
||||
sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
|
||||
sdp.DataElement.unsigned_integer_16(ag_supported_features),
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
async def find_hf_sdp_record(
|
||||
connection: device.Connection,
|
||||
) -> Optional[Tuple[int, ProfileVersion, HfSdpFeature]]:
|
||||
"""Searches a Hands-Free SDP record from remote device.
|
||||
|
||||
Args:
|
||||
connection: ACL connection to make SDP search.
|
||||
|
||||
Returns:
|
||||
Dictionary mapping from channel number to service class UUID list.
|
||||
"""
|
||||
async with sdp.Client(connection) as sdp_client:
|
||||
search_result = await sdp_client.search_attributes(
|
||||
uuids=[BT_HANDSFREE_SERVICE],
|
||||
attribute_ids=[
|
||||
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
|
||||
],
|
||||
)
|
||||
for attribute_lists in search_result:
|
||||
channel: Optional[int] = None
|
||||
version: Optional[ProfileVersion] = None
|
||||
features: Optional[HfSdpFeature] = None
|
||||
for attribute in attribute_lists:
|
||||
# The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]].
|
||||
if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
|
||||
protocol_descriptor_list = attribute.value.value
|
||||
channel = protocol_descriptor_list[1].value[1].value
|
||||
elif (
|
||||
attribute.id
|
||||
== sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
|
||||
):
|
||||
profile_descriptor_list = attribute.value.value
|
||||
version = ProfileVersion(profile_descriptor_list[0].value[1].value)
|
||||
elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID:
|
||||
features = HfSdpFeature(attribute.value.value)
|
||||
if not channel or not version or features is None:
|
||||
logger.warning(f"Bad result {attribute_lists}.")
|
||||
return None
|
||||
return (channel, version, features)
|
||||
return None
|
||||
|
||||
|
||||
async def find_ag_sdp_record(
|
||||
connection: device.Connection,
|
||||
) -> Optional[Tuple[int, ProfileVersion, AgSdpFeature]]:
|
||||
"""Searches an Audio-Gateway SDP record from remote device.
|
||||
|
||||
Args:
|
||||
connection: ACL connection to make SDP search.
|
||||
|
||||
Returns:
|
||||
Dictionary mapping from channel number to service class UUID list.
|
||||
"""
|
||||
async with sdp.Client(connection) as sdp_client:
|
||||
search_result = await sdp_client.search_attributes(
|
||||
uuids=[BT_HEADSET_AUDIO_GATEWAY_SERVICE],
|
||||
attribute_ids=[
|
||||
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
|
||||
],
|
||||
)
|
||||
for attribute_lists in search_result:
|
||||
channel: Optional[int] = None
|
||||
version: Optional[ProfileVersion] = None
|
||||
features: Optional[AgSdpFeature] = None
|
||||
for attribute in attribute_lists:
|
||||
# The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]].
|
||||
if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
|
||||
protocol_descriptor_list = attribute.value.value
|
||||
channel = protocol_descriptor_list[1].value[1].value
|
||||
elif (
|
||||
attribute.id
|
||||
== sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
|
||||
):
|
||||
profile_descriptor_list = attribute.value.value
|
||||
version = ProfileVersion(profile_descriptor_list[0].value[1].value)
|
||||
elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID:
|
||||
features = AgSdpFeature(attribute.value.value)
|
||||
if not channel or not version or features is None:
|
||||
logger.warning(f"Bad result {attribute_lists}.")
|
||||
return None
|
||||
return (channel, version, features)
|
||||
return None
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# ESCO Codec Default Parameters
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -184,7 +184,7 @@ class Host(AbortableEventEmitter):
|
||||
self.long_term_key_provider = None
|
||||
self.link_key_provider = None
|
||||
self.pairing_io_capability_provider = None # Classic only
|
||||
self.snooper = None
|
||||
self.snooper: Optional[Snooper] = None
|
||||
|
||||
# Connect to the source and sink if specified
|
||||
if controller_source:
|
||||
|
||||
@@ -19,8 +19,8 @@
|
||||
import struct
|
||||
from typing import Optional, Tuple
|
||||
|
||||
from ..gatt_client import ProfileServiceProxy
|
||||
from ..gatt import (
|
||||
from bumble.gatt_client import ServiceProxy, ProfileServiceProxy, CharacteristicProxy
|
||||
from bumble.gatt import (
|
||||
GATT_DEVICE_INFORMATION_SERVICE,
|
||||
GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC,
|
||||
GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC,
|
||||
@@ -104,7 +104,16 @@ class DeviceInformationService(TemplateService):
|
||||
class DeviceInformationServiceProxy(ProfileServiceProxy):
|
||||
SERVICE_CLASS = DeviceInformationService
|
||||
|
||||
def __init__(self, service_proxy):
|
||||
manufacturer_name: Optional[UTF8CharacteristicAdapter]
|
||||
model_number: Optional[UTF8CharacteristicAdapter]
|
||||
serial_number: Optional[UTF8CharacteristicAdapter]
|
||||
hardware_revision: Optional[UTF8CharacteristicAdapter]
|
||||
firmware_revision: Optional[UTF8CharacteristicAdapter]
|
||||
software_revision: Optional[UTF8CharacteristicAdapter]
|
||||
system_id: Optional[DelegatedCharacteristicAdapter]
|
||||
ieee_regulatory_certification_data_list: Optional[CharacteristicProxy]
|
||||
|
||||
def __init__(self, service_proxy: ServiceProxy):
|
||||
self.service_proxy = service_proxy
|
||||
|
||||
for field, uuid in (
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
from __future__ import annotations
|
||||
import asyncio
|
||||
import logging
|
||||
import socket
|
||||
|
||||
from .common import Transport, StreamPacketSource
|
||||
|
||||
@@ -28,6 +29,13 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
|
||||
# A pass-through function to ease mock testing.
|
||||
async def _create_server(*args, **kw_args):
|
||||
await asyncio.get_running_loop().create_server(*args, **kw_args)
|
||||
|
||||
|
||||
async def open_tcp_server_transport(spec: str) -> Transport:
|
||||
'''
|
||||
Open a TCP server transport.
|
||||
@@ -38,7 +46,22 @@ async def open_tcp_server_transport(spec: str) -> Transport:
|
||||
|
||||
Example: _:9001
|
||||
'''
|
||||
local_host, local_port = spec.split(':')
|
||||
return await _open_tcp_server_transport_impl(
|
||||
host=local_host if local_host != '_' else None, port=int(local_port)
|
||||
)
|
||||
|
||||
|
||||
async def open_tcp_server_transport_with_socket(sock: socket.socket) -> Transport:
|
||||
'''
|
||||
Open a TCP server transport with an existing socket.
|
||||
|
||||
One reason to use this variant is to let python pick an unused port.
|
||||
'''
|
||||
return await _open_tcp_server_transport_impl(sock=sock)
|
||||
|
||||
|
||||
async def _open_tcp_server_transport_impl(**kwargs) -> Transport:
|
||||
class TcpServerTransport(Transport):
|
||||
async def close(self):
|
||||
await super().close()
|
||||
@@ -77,13 +100,10 @@ async def open_tcp_server_transport(spec: str) -> Transport:
|
||||
else:
|
||||
logger.debug('no client, dropping packet')
|
||||
|
||||
local_host, local_port = spec.split(':')
|
||||
packet_source = StreamPacketSource()
|
||||
packet_sink = TcpServerPacketSink()
|
||||
await asyncio.get_running_loop().create_server(
|
||||
lambda: TcpServerProtocol(packet_source, packet_sink),
|
||||
host=local_host if local_host != '_' else None,
|
||||
port=int(local_port),
|
||||
await _create_server(
|
||||
lambda: TcpServerProtocol(packet_source, packet_sink), **kwargs
|
||||
)
|
||||
|
||||
return TcpServerTransport(packet_source, packet_sink)
|
||||
|
||||
@@ -61,7 +61,7 @@ async def func4(x, y):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
print("MAIN: start, loop=", asyncio.get_running_loop())
|
||||
print("MAIN: invoke func1")
|
||||
func1(1, 2)
|
||||
|
||||
@@ -21,23 +21,29 @@ import os
|
||||
import logging
|
||||
from bumble.colors import color
|
||||
from bumble.device import Device
|
||||
from bumble.hci import Address
|
||||
from bumble.transport import open_transport
|
||||
from bumble.profiles.battery_service import BatteryServiceProxy
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: battery_client.py <transport-spec> <bluetooth-address>')
|
||||
print('example: battery_client.py usb:0 E1:CA:72:48:C4:E8')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
|
||||
async with await open_transport(sys.argv[1]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create and start a device
|
||||
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
|
||||
device = Device.with_hci(
|
||||
'Bumble',
|
||||
Address('F0:F1:F2:F3:F4:F5'),
|
||||
hci_transport.source,
|
||||
hci_transport.sink,
|
||||
)
|
||||
await device.power_on()
|
||||
|
||||
# Connect to the peer
|
||||
|
||||
@@ -29,14 +29,16 @@ from bumble.profiles.battery_service import BatteryService
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: python battery_server.py <device-config> <transport-spec>')
|
||||
print('example: python battery_server.py device1.json usb:0')
|
||||
return
|
||||
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
|
||||
# Add a Battery Service to the GATT sever
|
||||
battery_service = BatteryService(lambda _: random.randint(0, 100))
|
||||
|
||||
@@ -21,12 +21,13 @@ import os
|
||||
import logging
|
||||
from bumble.colors import color
|
||||
from bumble.device import Device, Peer
|
||||
from bumble.hci import Address
|
||||
from bumble.profiles.device_information_service import DeviceInformationServiceProxy
|
||||
from bumble.transport import open_transport
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 3:
|
||||
print(
|
||||
'Usage: device_information_client.py <transport-spec> <bluetooth-address>'
|
||||
@@ -35,11 +36,16 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
|
||||
async with await open_transport(sys.argv[1]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create and start a device
|
||||
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
|
||||
device = Device.with_hci(
|
||||
'Bumble',
|
||||
Address('F0:F1:F2:F3:F4:F5'),
|
||||
hci_transport.source,
|
||||
hci_transport.sink,
|
||||
)
|
||||
await device.power_on()
|
||||
|
||||
# Connect to the peer
|
||||
|
||||
@@ -28,14 +28,16 @@ from bumble.profiles.device_information_service import DeviceInformationService
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: python device_info_server.py <device-config> <transport-spec>')
|
||||
print('example: python device_info_server.py device1.json usb:0')
|
||||
return
|
||||
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
|
||||
# Add a Device Information Service to the GATT sever
|
||||
device_information_service = DeviceInformationService(
|
||||
@@ -64,7 +66,7 @@ async def main():
|
||||
# Go!
|
||||
await device.power_on()
|
||||
await device.start_advertising(auto_restart=True)
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -21,23 +21,29 @@ import os
|
||||
import logging
|
||||
from bumble.colors import color
|
||||
from bumble.device import Device
|
||||
from bumble.hci import Address
|
||||
from bumble.transport import open_transport
|
||||
from bumble.profiles.heart_rate_service import HeartRateServiceProxy
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: heart_rate_client.py <transport-spec> <bluetooth-address>')
|
||||
print('example: heart_rate_client.py usb:0 E1:CA:72:48:C4:E8')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
|
||||
async with await open_transport(sys.argv[1]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create and start a device
|
||||
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
|
||||
device = Device.with_hci(
|
||||
'Bumble',
|
||||
Address('F0:F1:F2:F3:F4:F5'),
|
||||
hci_transport.source,
|
||||
hci_transport.sink,
|
||||
)
|
||||
await device.power_on()
|
||||
|
||||
# Connect to the peer
|
||||
|
||||
@@ -33,14 +33,16 @@ from bumble.utils import AsyncRunner
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: python heart_rate_server.py <device-config> <transport-spec>')
|
||||
print('example: python heart_rate_server.py device1.json usb:0')
|
||||
return
|
||||
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
|
||||
# Keep track of accumulated expended energy
|
||||
energy_start_time = time.time()
|
||||
|
||||
@@ -416,7 +416,7 @@ async def keyboard_device(device, command):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 4:
|
||||
print(
|
||||
'Usage: python keyboard.py <device-config> <transport-spec> <command>'
|
||||
@@ -434,9 +434,11 @@ async def main():
|
||||
)
|
||||
return
|
||||
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
# Create a device to manage the host
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
|
||||
command = sys.argv[3]
|
||||
if command == 'connect':
|
||||
|
||||
@@ -139,18 +139,20 @@ async def find_a2dp_service(connection):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 4:
|
||||
print('Usage: run_a2dp_info.py <device-config> <transport-spec> <bt-addr>')
|
||||
print('example: run_a2dp_info.py classic1.json usb:0 14:7D:DA:4E:53:A8')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Start the controller
|
||||
@@ -187,7 +189,7 @@ async def main():
|
||||
client = await AVDTP_Protocol.connect(connection, avdtp_version)
|
||||
|
||||
# Discover all endpoints on the remote device
|
||||
endpoints = await client.discover_remote_endpoints()
|
||||
endpoints = list(await client.discover_remote_endpoints())
|
||||
print(f'@@@ Found {len(endpoints)} endpoints')
|
||||
for endpoint in endpoints:
|
||||
print('@@@', endpoint)
|
||||
|
||||
@@ -19,6 +19,7 @@ import asyncio
|
||||
import sys
|
||||
import os
|
||||
import logging
|
||||
from typing import Any, Dict
|
||||
|
||||
from bumble.device import Device
|
||||
from bumble.transport import open_transport_or_link
|
||||
@@ -41,7 +42,7 @@ from bumble.a2dp import (
|
||||
SbcMediaCodecInformation,
|
||||
)
|
||||
|
||||
Context = {'output': None}
|
||||
Context: Dict[Any, Any] = {'output': None}
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -104,7 +105,7 @@ def on_rtp_packet(packet):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 4:
|
||||
print(
|
||||
'Usage: run_a2dp_sink.py <device-config> <transport-spec> <sbc-file> '
|
||||
@@ -114,14 +115,16 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
with open(sys.argv[3], 'wb') as sbc_file:
|
||||
Context['output'] = sbc_file
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Setup the SDP to expose the sink service
|
||||
@@ -162,7 +165,7 @@ async def main():
|
||||
await device.set_discoverable(True)
|
||||
await device.set_connectable(True)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -114,7 +114,7 @@ async def stream_packets(read_function, protocol):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 4:
|
||||
print(
|
||||
'Usage: run_a2dp_source.py <device-config> <transport-spec> <sbc-file> '
|
||||
@@ -126,11 +126,13 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Setup the SDP to expose the SRC service
|
||||
@@ -186,7 +188,7 @@ async def main():
|
||||
await device.set_discoverable(True)
|
||||
await device.set_connectable(True)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -28,7 +28,7 @@ from bumble.transport import open_transport_or_link
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print(
|
||||
'Usage: run_advertiser.py <config-file> <transport-spec> [type] [address]'
|
||||
@@ -50,10 +50,12 @@ async def main():
|
||||
target = None
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
|
||||
if advertising_type.is_scannable:
|
||||
device.scan_response_data = bytes(
|
||||
@@ -66,7 +68,7 @@ async def main():
|
||||
|
||||
await device.power_on()
|
||||
await device.start_advertising(advertising_type=advertising_type, target=target)
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -49,7 +49,7 @@ ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID(
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 4:
|
||||
print(
|
||||
'Usage: python run_asha_sink.py <device-config> <transport-spec> '
|
||||
@@ -60,8 +60,10 @@ async def main():
|
||||
|
||||
audio_out = open(sys.argv[3], 'wb')
|
||||
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
|
||||
# Handler for audio control commands
|
||||
def on_audio_control_point_write(_connection, value):
|
||||
@@ -197,7 +199,7 @@ async def main():
|
||||
await device.power_on()
|
||||
await device.start_advertising(auto_restart=True)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -331,7 +331,7 @@ class Delegate(avrcp.Delegate):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print(
|
||||
'Usage: run_avrcp_controller.py <device-config> <transport-spec> '
|
||||
@@ -341,11 +341,13 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Setup the SDP to expose the sink service
|
||||
|
||||
@@ -32,7 +32,7 @@ from bumble.sdp import (
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print(
|
||||
'Usage: run_classic_connect.py <device-config> <transport-spec> '
|
||||
@@ -42,11 +42,13 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
device.le_enabled = False
|
||||
await device.power_on()
|
||||
|
||||
@@ -91,18 +91,20 @@ SDP_SERVICE_RECORDS = {
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print('Usage: run_classic_discoverable.py <device-config> <transport-spec>')
|
||||
print('example: run_classic_discoverable.py classic1.json usb:04b4:f901')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
device.sdp_service_records = SDP_SERVICE_RECORDS
|
||||
await device.power_on()
|
||||
@@ -111,7 +113,7 @@ async def main():
|
||||
await device.set_discoverable(True)
|
||||
await device.set_connectable(True)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -20,8 +20,8 @@ import sys
|
||||
import os
|
||||
import logging
|
||||
from bumble.colors import color
|
||||
|
||||
from bumble.device import Device
|
||||
from bumble.hci import Address
|
||||
from bumble.transport import open_transport_or_link
|
||||
from bumble.core import DeviceClass
|
||||
|
||||
@@ -53,22 +53,27 @@ class DiscoveryListener(Device.Listener):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 2:
|
||||
print('Usage: run_classic_discovery.py <transport-spec>')
|
||||
print('example: run_classic_discovery.py usb:04b4:f901')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
|
||||
device = Device.with_hci(
|
||||
'Bumble',
|
||||
Address('F0:F1:F2:F3:F4:F5'),
|
||||
hci_transport.source,
|
||||
hci_transport.sink,
|
||||
)
|
||||
device.listener = DiscoveryListener()
|
||||
await device.power_on()
|
||||
await device.start_discovery()
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -25,7 +25,7 @@ from bumble.transport import open_transport_or_link
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print(
|
||||
'Usage: run_connect_and_encrypt.py <device-config> <transport-spec> '
|
||||
@@ -37,11 +37,13 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
await device.power_on()
|
||||
|
||||
# Connect to the peer
|
||||
@@ -56,7 +58,7 @@ async def main():
|
||||
print(f'!!! Encryption failed: {error}')
|
||||
return
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -36,7 +36,7 @@ from bumble.transport import open_transport_or_link
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 4:
|
||||
print(
|
||||
'Usage: run_controller.py <controller-address> <device-config> '
|
||||
@@ -49,7 +49,7 @@ async def main():
|
||||
return
|
||||
|
||||
print('>>> connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[3]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[3]) as hci_transport:
|
||||
print('>>> connected')
|
||||
|
||||
# Create a local link
|
||||
@@ -57,7 +57,10 @@ async def main():
|
||||
|
||||
# Create a first controller using the packet source/sink as its host interface
|
||||
controller1 = Controller(
|
||||
'C1', host_source=hci_source, host_sink=hci_sink, link=link
|
||||
'C1',
|
||||
host_source=hci_transport.source,
|
||||
host_sink=hci_transport.sink,
|
||||
link=link,
|
||||
)
|
||||
controller1.random_address = sys.argv[1]
|
||||
|
||||
@@ -98,7 +101,7 @@ async def main():
|
||||
await device.start_advertising()
|
||||
await device.start_scanning()
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -20,9 +20,9 @@ import asyncio
|
||||
import sys
|
||||
import os
|
||||
from bumble.colors import color
|
||||
|
||||
from bumble.device import Device
|
||||
from bumble.controller import Controller
|
||||
from bumble.hci import Address
|
||||
from bumble.link import LocalLink
|
||||
from bumble.transport import open_transport_or_link
|
||||
|
||||
@@ -45,14 +45,14 @@ class ScannerListener(Device.Listener):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 2:
|
||||
print('Usage: run_controller.py <transport-spec>')
|
||||
print('example: run_controller_with_scanner.py serial:/dev/pts/14,1000000')
|
||||
return
|
||||
|
||||
print('>>> connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
|
||||
print('>>> connected')
|
||||
|
||||
# Create a local link
|
||||
@@ -60,22 +60,25 @@ async def main():
|
||||
|
||||
# Create a first controller using the packet source/sink as its host interface
|
||||
controller1 = Controller(
|
||||
'C1', host_source=hci_source, host_sink=hci_sink, link=link
|
||||
'C1',
|
||||
host_source=hci_transport.source,
|
||||
host_sink=hci_transport.sink,
|
||||
link=link,
|
||||
public_address='E0:E1:E2:E3:E4:E5',
|
||||
)
|
||||
controller1.address = 'E0:E1:E2:E3:E4:E5'
|
||||
|
||||
# Create a second controller using the same link
|
||||
controller2 = Controller('C2', link=link)
|
||||
|
||||
# Create a device with a scanner listener
|
||||
device = Device.with_hci(
|
||||
'Bumble', 'F0:F1:F2:F3:F4:F5', controller2, controller2
|
||||
'Bumble', Address('F0:F1:F2:F3:F4:F5'), controller2, controller2
|
||||
)
|
||||
device.listener = ScannerListener()
|
||||
await device.power_on()
|
||||
await device.start_scanning()
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -20,31 +20,36 @@ import sys
|
||||
import os
|
||||
import logging
|
||||
from bumble.colors import color
|
||||
|
||||
from bumble.hci import Address
|
||||
from bumble.device import Device
|
||||
from bumble.transport import open_transport_or_link
|
||||
from bumble.snoop import BtSnooper
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) != 3:
|
||||
print('Usage: run_device_with_snooper.py <transport-spec> <snoop-file>')
|
||||
print('example: run_device_with_snooper.py usb:0 btsnoop.log')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
|
||||
device = Device.with_hci(
|
||||
'Bumble',
|
||||
Address('F0:F1:F2:F3:F4:F5'),
|
||||
hci_transport.source,
|
||||
hci_transport.sink,
|
||||
)
|
||||
|
||||
with open(sys.argv[2], "wb") as snoop_file:
|
||||
device.host.snooper = BtSnooper(snoop_file)
|
||||
await device.power_on()
|
||||
await device.start_scanning()
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -69,7 +69,7 @@ class Listener(Device.Listener):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print(
|
||||
'Usage: run_gatt_client.py <device-config> <transport-spec> '
|
||||
@@ -79,11 +79,13 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device to manage the host, with a custom listener
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.listener = Listener(device)
|
||||
await device.power_on()
|
||||
|
||||
|
||||
@@ -19,21 +19,21 @@ import asyncio
|
||||
import os
|
||||
import logging
|
||||
from bumble.colors import color
|
||||
|
||||
from bumble.core import ProtocolError
|
||||
from bumble.controller import Controller
|
||||
from bumble.device import Device, Peer
|
||||
from bumble.hci import Address
|
||||
from bumble.host import Host
|
||||
from bumble.link import LocalLink
|
||||
from bumble.gatt import (
|
||||
Service,
|
||||
Characteristic,
|
||||
Descriptor,
|
||||
show_services,
|
||||
GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
|
||||
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
|
||||
GATT_DEVICE_INFORMATION_SERVICE,
|
||||
)
|
||||
from bumble.gatt_client import show_services
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -43,7 +43,7 @@ class ServerListener(Device.Listener):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
# Create a local link
|
||||
link = LocalLink()
|
||||
|
||||
@@ -51,14 +51,18 @@ async def main():
|
||||
client_controller = Controller("client controller", link=link)
|
||||
client_host = Host()
|
||||
client_host.controller = client_controller
|
||||
client_device = Device("client", address='F0:F1:F2:F3:F4:F5', host=client_host)
|
||||
client_device = Device(
|
||||
"client", address=Address('F0:F1:F2:F3:F4:F5'), host=client_host
|
||||
)
|
||||
await client_device.power_on()
|
||||
|
||||
# Setup a stack for the server
|
||||
server_controller = Controller("server controller", link=link)
|
||||
server_host = Host()
|
||||
server_host.controller = server_controller
|
||||
server_device = Device("server", address='F6:F7:F8:F9:FA:FB', host=server_host)
|
||||
server_device = Device(
|
||||
"server", address=Address('F6:F7:F8:F9:FA:FB'), host=server_host
|
||||
)
|
||||
server_device.listener = ServerListener()
|
||||
await server_device.power_on()
|
||||
|
||||
|
||||
@@ -71,7 +71,7 @@ def my_custom_write_with_error(connection, value):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print(
|
||||
'Usage: run_gatt_server.py <device-config> <transport-spec> '
|
||||
@@ -81,11 +81,13 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device to manage the host
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.listener = Listener(device)
|
||||
|
||||
# Add a few entries to the device's GATT server
|
||||
@@ -146,7 +148,7 @@ async def main():
|
||||
else:
|
||||
await device.start_advertising(auto_restart=True)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -20,123 +20,48 @@ import sys
|
||||
import os
|
||||
import logging
|
||||
|
||||
from bumble.colors import color
|
||||
|
||||
import bumble.core
|
||||
from bumble.device import Device
|
||||
from bumble.transport import open_transport_or_link
|
||||
from bumble.core import (
|
||||
BT_HANDSFREE_SERVICE,
|
||||
BT_RFCOMM_PROTOCOL_ID,
|
||||
BT_BR_EDR_TRANSPORT,
|
||||
)
|
||||
from bumble import rfcomm, hfp
|
||||
from bumble.hci import HCI_SynchronousDataPacket
|
||||
from bumble.sdp import (
|
||||
Client as SDP_Client,
|
||||
DataElement,
|
||||
ServiceAttribute,
|
||||
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
|
||||
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
)
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# pylint: disable-next=too-many-nested-blocks
|
||||
async def list_rfcomm_channels(device, connection):
|
||||
# Connect to the SDP Server
|
||||
sdp_client = SDP_Client(connection)
|
||||
await sdp_client.connect()
|
||||
|
||||
# Search for services that support the Handsfree Profile
|
||||
search_result = await sdp_client.search_attributes(
|
||||
[BT_HANDSFREE_SERVICE],
|
||||
[
|
||||
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
|
||||
def _default_configuration() -> hfp.AgConfiguration:
|
||||
return hfp.AgConfiguration(
|
||||
supported_ag_features=[
|
||||
hfp.AgFeature.HF_INDICATORS,
|
||||
hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY,
|
||||
hfp.AgFeature.REJECT_CALL,
|
||||
hfp.AgFeature.CODEC_NEGOTIATION,
|
||||
hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
|
||||
],
|
||||
supported_ag_indicators=[
|
||||
hfp.AgIndicatorState.call(),
|
||||
hfp.AgIndicatorState.service(),
|
||||
hfp.AgIndicatorState.callsetup(),
|
||||
hfp.AgIndicatorState.callsetup(),
|
||||
hfp.AgIndicatorState.signal(),
|
||||
hfp.AgIndicatorState.roam(),
|
||||
hfp.AgIndicatorState.battchg(),
|
||||
],
|
||||
supported_hf_indicators=[
|
||||
hfp.HfIndicator.ENHANCED_SAFETY,
|
||||
hfp.HfIndicator.BATTERY_LEVEL,
|
||||
],
|
||||
supported_ag_call_hold_operations=[],
|
||||
supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
|
||||
)
|
||||
print(color('==================================', 'blue'))
|
||||
print(color('Handsfree Services:', 'yellow'))
|
||||
rfcomm_channels = []
|
||||
# pylint: disable-next=too-many-nested-blocks
|
||||
for attribute_list in search_result:
|
||||
# Look for the RFCOMM Channel number
|
||||
protocol_descriptor_list = ServiceAttribute.find_attribute_in_list(
|
||||
attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
|
||||
)
|
||||
if protocol_descriptor_list:
|
||||
for protocol_descriptor in protocol_descriptor_list.value:
|
||||
if len(protocol_descriptor.value) >= 2:
|
||||
if protocol_descriptor.value[0].value == BT_RFCOMM_PROTOCOL_ID:
|
||||
print(color('SERVICE:', 'green'))
|
||||
print(
|
||||
color(' RFCOMM Channel:', 'cyan'),
|
||||
protocol_descriptor.value[1].value,
|
||||
)
|
||||
rfcomm_channels.append(protocol_descriptor.value[1].value)
|
||||
|
||||
# List profiles
|
||||
bluetooth_profile_descriptor_list = (
|
||||
ServiceAttribute.find_attribute_in_list(
|
||||
attribute_list,
|
||||
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
|
||||
)
|
||||
)
|
||||
if bluetooth_profile_descriptor_list:
|
||||
if bluetooth_profile_descriptor_list.value:
|
||||
if (
|
||||
bluetooth_profile_descriptor_list.value[0].type
|
||||
== DataElement.SEQUENCE
|
||||
):
|
||||
bluetooth_profile_descriptors = (
|
||||
bluetooth_profile_descriptor_list.value
|
||||
)
|
||||
else:
|
||||
# Sometimes, instead of a list of lists, we just
|
||||
# find a list. Fix that
|
||||
bluetooth_profile_descriptors = [
|
||||
bluetooth_profile_descriptor_list
|
||||
]
|
||||
|
||||
print(color(' Profiles:', 'green'))
|
||||
for (
|
||||
bluetooth_profile_descriptor
|
||||
) in bluetooth_profile_descriptors:
|
||||
version_major = (
|
||||
bluetooth_profile_descriptor.value[1].value >> 8
|
||||
)
|
||||
version_minor = (
|
||||
bluetooth_profile_descriptor.value[1].value
|
||||
& 0xFF
|
||||
)
|
||||
print(
|
||||
' '
|
||||
f'{bluetooth_profile_descriptor.value[0].value}'
|
||||
f' - version {version_major}.{version_minor}'
|
||||
)
|
||||
|
||||
# List service classes
|
||||
service_class_id_list = ServiceAttribute.find_attribute_in_list(
|
||||
attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID
|
||||
)
|
||||
if service_class_id_list:
|
||||
if service_class_id_list.value:
|
||||
print(color(' Service Classes:', 'green'))
|
||||
for service_class_id in service_class_id_list.value:
|
||||
print(' ', service_class_id.value)
|
||||
|
||||
await sdp_client.disconnect()
|
||||
return rfcomm_channels
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 4:
|
||||
print(
|
||||
'Usage: run_hfp_gateway.py <device-config> <transport-spec> '
|
||||
@@ -149,11 +74,13 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
await device.power_on()
|
||||
|
||||
@@ -164,13 +91,14 @@ async def main():
|
||||
print(f'=== Connected to {connection.peer_address}!')
|
||||
|
||||
# Get a list of all the Handsfree services (should only be 1)
|
||||
channels = await list_rfcomm_channels(device, connection)
|
||||
if len(channels) == 0:
|
||||
if not (hfp_record := await hfp.find_hf_sdp_record(connection)):
|
||||
print('!!! no service found')
|
||||
return
|
||||
|
||||
# Pick the first one
|
||||
channel = channels[0]
|
||||
channel, version, hf_sdp_features = hfp_record
|
||||
print(f'HF version: {version}')
|
||||
print(f'HF features: {hf_sdp_features}')
|
||||
|
||||
# Request authentication
|
||||
print('*** Authenticating...')
|
||||
@@ -205,51 +133,9 @@ async def main():
|
||||
|
||||
device.host.on('sco_packet', on_sco)
|
||||
|
||||
# Protocol loop (just for testing at this point)
|
||||
protocol = hfp.HfpProtocol(session)
|
||||
while True:
|
||||
line = await protocol.next_line()
|
||||
ag_protocol = hfp.AgProtocol(session, _default_configuration())
|
||||
|
||||
if line.startswith('AT+BRSF='):
|
||||
protocol.send_response_line('+BRSF: 30')
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+CIND=?'):
|
||||
protocol.send_response_line(
|
||||
'+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
|
||||
'("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
|
||||
'("callheld",(0-2))'
|
||||
)
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+CIND?'):
|
||||
protocol.send_response_line('+CIND: 0,0,1,4,1,5,0')
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+CMER='):
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+CHLD=?'):
|
||||
protocol.send_response_line('+CHLD: 0')
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+BTRH?'):
|
||||
protocol.send_response_line('+BTRH: 0')
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+CLIP='):
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+VGS='):
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+BIA='):
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+BVRA='):
|
||||
protocol.send_response_line(
|
||||
'+BVRA: 1,1,12AA,1,1,"Message 1 from Janina"'
|
||||
)
|
||||
elif line.startswith('AT+XEVENT='):
|
||||
protocol.send_response_line('OK')
|
||||
elif line.startswith('AT+XAPL='):
|
||||
protocol.send_response_line('OK')
|
||||
else:
|
||||
print(color('UNSUPPORTED AT COMMAND', 'red'))
|
||||
protocol.send_response_line('ERROR')
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.terminated
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -37,7 +37,7 @@ hf_protocol: Optional[HfProtocol] = None
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def on_dlc(dlc: rfcomm.DLC, configuration: hfp.Configuration):
|
||||
def on_dlc(dlc: rfcomm.DLC, configuration: hfp.HfConfiguration):
|
||||
print('*** DLC connected', dlc)
|
||||
global hf_protocol
|
||||
hf_protocol = HfProtocol(dlc, configuration)
|
||||
@@ -84,19 +84,19 @@ def on_dlc(dlc: rfcomm.DLC, configuration: hfp.Configuration):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print('Usage: run_classic_hfp.py <device-config> <transport-spec>')
|
||||
print('example: run_classic_hfp.py classic2.json usb:04b4:f901')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Hands-Free profile configuration.
|
||||
# TODO: load configuration from file.
|
||||
configuration = hfp.Configuration(
|
||||
configuration = hfp.HfConfiguration(
|
||||
supported_hf_features=[
|
||||
hfp.HfFeature.THREE_WAY_CALLING,
|
||||
hfp.HfFeature.REMOTE_VOLUME_CONTROL,
|
||||
@@ -116,7 +116,9 @@ async def main():
|
||||
)
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Create and register a server
|
||||
@@ -128,7 +130,9 @@ async def main():
|
||||
|
||||
# Advertise the HFP RFComm channel in the SDP
|
||||
device.sdp_service_records = {
|
||||
0x00010001: hfp.sdp_records(0x00010001, channel_number, configuration)
|
||||
0x00010001: hfp.make_hf_sdp_records(
|
||||
0x00010001, channel_number, configuration
|
||||
)
|
||||
}
|
||||
|
||||
# Let's go!
|
||||
@@ -164,7 +168,7 @@ async def main():
|
||||
|
||||
await websockets.serve(serve, 'localhost', 8989)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -489,7 +489,7 @@ async def keyboard_device(hid_device):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print(
|
||||
'Usage: python run_hid_device.py <device-config> <transport-spec> <command>'
|
||||
@@ -601,11 +601,13 @@ async def main():
|
||||
asyncio.create_task(handle_virtual_cable_unplug())
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Create and register HID device
|
||||
@@ -742,7 +744,7 @@ async def main():
|
||||
print("Executing in Web mode")
|
||||
await keyboard_device(hid_device)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -275,7 +275,7 @@ async def get_stream_reader(pipe) -> asyncio.StreamReader:
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 4:
|
||||
print(
|
||||
'Usage: run_hid_host.py <device-config> <transport-spec> '
|
||||
@@ -324,11 +324,13 @@ async def main():
|
||||
asyncio.create_task(handle_virtual_cable_unplug())
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< CONNECTED')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Create HID host and start it
|
||||
@@ -557,7 +559,7 @@ async def main():
|
||||
# Interrupt Channel
|
||||
await hid_host.connect_interrupt_channel()
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -57,18 +57,20 @@ def on_my_characteristic_subscription(peer, enabled):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print('Usage: run_notifier.py <device-config> <transport-spec>')
|
||||
print('example: run_notifier.py device1.json usb:0')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device to manage the host
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.listener = Listener(device)
|
||||
|
||||
# Add a few entries to the device's GATT server
|
||||
|
||||
@@ -165,7 +165,7 @@ async def tcp_server(tcp_port, rfcomm_session):
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 5:
|
||||
print(
|
||||
'Usage: run_rfcomm_client.py <device-config> <transport-spec> '
|
||||
@@ -178,11 +178,13 @@ async def main():
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
await device.power_on()
|
||||
|
||||
@@ -192,8 +194,8 @@ async def main():
|
||||
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
|
||||
print(f'=== Connected to {connection.peer_address}!')
|
||||
|
||||
channel = sys.argv[4]
|
||||
if channel == 'discover':
|
||||
channel_str = sys.argv[4]
|
||||
if channel_str == 'discover':
|
||||
await list_rfcomm_channels(connection)
|
||||
return
|
||||
|
||||
@@ -213,7 +215,7 @@ async def main():
|
||||
rfcomm_mux = await rfcomm_client.start()
|
||||
print('@@@ Started')
|
||||
|
||||
channel = int(channel)
|
||||
channel = int(channel_str)
|
||||
print(f'### Opening session for channel {channel}...')
|
||||
try:
|
||||
session = await rfcomm_mux.open_dlc(channel)
|
||||
@@ -229,7 +231,7 @@ async def main():
|
||||
tcp_port = int(sys.argv[5])
|
||||
asyncio.create_task(tcp_server(tcp_port, session))
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -107,7 +107,7 @@ class TcpServer:
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 4:
|
||||
print(
|
||||
'Usage: run_rfcomm_server.py <device-config> <transport-spec> '
|
||||
@@ -124,11 +124,13 @@ async def main():
|
||||
uuid = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device
|
||||
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
|
||||
device = Device.from_config_file_with_hci(
|
||||
sys.argv[1], hci_transport.source, hci_transport.sink
|
||||
)
|
||||
device.classic_enabled = True
|
||||
|
||||
# Create a TCP server
|
||||
@@ -153,7 +155,7 @@ async def main():
|
||||
await device.set_discoverable(True)
|
||||
await device.set_connectable(True)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -20,27 +20,31 @@ import sys
|
||||
import os
|
||||
import logging
|
||||
from bumble.colors import color
|
||||
|
||||
from bumble.hci import Address
|
||||
from bumble.device import Device
|
||||
from bumble.transport import open_transport_or_link
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main():
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 2:
|
||||
print('Usage: run_scanner.py <transport-spec> [filter]')
|
||||
print('example: run_scanner.py usb:0')
|
||||
return
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
|
||||
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
|
||||
print('<<< connected')
|
||||
filter_duplicates = len(sys.argv) == 3 and sys.argv[2] == 'filter'
|
||||
|
||||
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
|
||||
device = Device.with_hci(
|
||||
'Bumble',
|
||||
Address('F0:F1:F2:F3:F4:F5'),
|
||||
hci_transport.source,
|
||||
hci_transport.sink,
|
||||
)
|
||||
|
||||
@device.on('advertisement')
|
||||
def _(advertisement):
|
||||
def on_adv(advertisement):
|
||||
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[
|
||||
advertisement.address.address_type
|
||||
]
|
||||
@@ -67,10 +71,11 @@ async def main():
|
||||
f'{advertisement.data.to_string(separator)}'
|
||||
)
|
||||
|
||||
device.on('advertisement', on_adv)
|
||||
await device.power_on()
|
||||
await device.start_scanning(filter_duplicates=filter_duplicates)
|
||||
|
||||
await hci_source.wait_for_termination()
|
||||
await hci_transport.source.wait_for_termination()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
19
setup.cfg
19
setup.cfg
@@ -33,21 +33,20 @@ include_package_data = True
|
||||
install_requires =
|
||||
aiohttp ~= 3.8; platform_system!='Emscripten'
|
||||
appdirs >= 1.4; platform_system!='Emscripten'
|
||||
bt-test-interfaces >= 0.0.2; platform_system!='Emscripten'
|
||||
click == 8.1.3; platform_system!='Emscripten'
|
||||
click >= 8.1.3; platform_system!='Emscripten'
|
||||
cryptography == 39; platform_system!='Emscripten'
|
||||
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the
|
||||
# versions available on PyPI. Relax the version requirement since it's better than being
|
||||
# completely unable to import the package in case of version mismatch.
|
||||
cryptography >= 39.0; platform_system=='Emscripten'
|
||||
grpcio == 1.57.0; platform_system!='Emscripten'
|
||||
grpcio >= 1.62.1; platform_system!='Emscripten'
|
||||
humanize >= 4.6.0; platform_system!='Emscripten'
|
||||
libusb1 >= 2.0.1; platform_system!='Emscripten'
|
||||
libusb-package == 1.0.26.1; platform_system!='Emscripten'
|
||||
platformdirs == 3.10.0; platform_system!='Emscripten'
|
||||
platformdirs >= 3.10.0; platform_system!='Emscripten'
|
||||
prompt_toolkit >= 3.0.16; platform_system!='Emscripten'
|
||||
prettytable >= 3.6.0; platform_system!='Emscripten'
|
||||
protobuf >= 3.12.4; platform_system!='Emscripten'
|
||||
protobuf >= 4.24.2; platform_system!='Emscripten'
|
||||
pyee >= 8.2.2
|
||||
pyserial-asyncio >= 0.5; platform_system!='Emscripten'
|
||||
pyserial >= 3.5; platform_system!='Emscripten'
|
||||
@@ -83,12 +82,12 @@ build =
|
||||
build >= 0.7
|
||||
test =
|
||||
pytest >= 8.0
|
||||
pytest-asyncio == 0.21.1
|
||||
pytest-asyncio >= 0.23.5
|
||||
pytest-html >= 3.2.0
|
||||
coverage >= 6.4
|
||||
development =
|
||||
black == 24.3
|
||||
grpcio-tools >= 1.57.0
|
||||
grpcio-tools >= 1.62.1
|
||||
invoke >= 1.7.3
|
||||
mypy == 1.8.0
|
||||
nox >= 2022
|
||||
@@ -98,8 +97,10 @@ development =
|
||||
types-invoke >= 1.7.3
|
||||
types-protobuf >= 4.21.0
|
||||
avatar =
|
||||
pandora-avatar == 0.0.8
|
||||
rootcanal == 1.9.0 ; python_version>='3.10'
|
||||
pandora-avatar == 0.0.9
|
||||
rootcanal == 1.10.0 ; python_version>='3.10'
|
||||
pandora =
|
||||
bt-test-interfaces >= 0.0.6
|
||||
documentation =
|
||||
mkdocs >= 1.4.0
|
||||
mkdocs-material >= 8.5.6
|
||||
|
||||
@@ -19,8 +19,9 @@ import asyncio
|
||||
import logging
|
||||
import os
|
||||
import pytest
|
||||
import pytest_asyncio
|
||||
|
||||
from typing import Tuple
|
||||
from typing import Tuple, Optional
|
||||
|
||||
from .test_utils import TwoDevices
|
||||
from bumble import core
|
||||
@@ -35,10 +36,73 @@ from bumble import hci
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def _default_hf_configuration() -> hfp.HfConfiguration:
|
||||
return hfp.HfConfiguration(
|
||||
supported_hf_features=[
|
||||
hfp.HfFeature.CODEC_NEGOTIATION,
|
||||
hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED,
|
||||
hfp.HfFeature.HF_INDICATORS,
|
||||
],
|
||||
supported_hf_indicators=[
|
||||
hfp.HfIndicator.ENHANCED_SAFETY,
|
||||
hfp.HfIndicator.BATTERY_LEVEL,
|
||||
],
|
||||
supported_audio_codecs=[
|
||||
hfp.AudioCodec.CVSD,
|
||||
hfp.AudioCodec.MSBC,
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def _default_hf_sdp_features() -> hfp.HfSdpFeature:
|
||||
return hfp.HfSdpFeature.WIDE_BAND
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def _default_ag_configuration() -> hfp.AgConfiguration:
|
||||
return hfp.AgConfiguration(
|
||||
supported_ag_features=[
|
||||
hfp.AgFeature.HF_INDICATORS,
|
||||
hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY,
|
||||
hfp.AgFeature.REJECT_CALL,
|
||||
hfp.AgFeature.CODEC_NEGOTIATION,
|
||||
hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
|
||||
],
|
||||
supported_ag_indicators=[
|
||||
hfp.AgIndicatorState.call(),
|
||||
hfp.AgIndicatorState.service(),
|
||||
hfp.AgIndicatorState.callsetup(),
|
||||
hfp.AgIndicatorState.callsetup(),
|
||||
hfp.AgIndicatorState.signal(),
|
||||
hfp.AgIndicatorState.roam(),
|
||||
hfp.AgIndicatorState.battchg(),
|
||||
],
|
||||
supported_hf_indicators=[
|
||||
hfp.HfIndicator.ENHANCED_SAFETY,
|
||||
hfp.HfIndicator.BATTERY_LEVEL,
|
||||
],
|
||||
supported_ag_call_hold_operations=[],
|
||||
supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
def _default_ag_sdp_features() -> hfp.AgSdpFeature:
|
||||
return hfp.AgSdpFeature.WIDE_BAND | hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def make_hfp_connections(
|
||||
hf_config: hfp.Configuration,
|
||||
) -> Tuple[hfp.HfProtocol, hfp.HfpProtocol]:
|
||||
hf_config: Optional[hfp.HfConfiguration] = None,
|
||||
ag_config: Optional[hfp.AgConfiguration] = None,
|
||||
):
|
||||
if not hf_config:
|
||||
hf_config = _default_hf_configuration()
|
||||
if not ag_config:
|
||||
ag_config = _default_ag_configuration()
|
||||
|
||||
# Setup devices
|
||||
devices = TwoDevices()
|
||||
await devices.setup_connection()
|
||||
@@ -55,38 +119,200 @@ async def make_hfp_connections(
|
||||
|
||||
# Setup HFP connection
|
||||
hf = hfp.HfProtocol(client_dlc, hf_config)
|
||||
ag = hfp.HfpProtocol(server_dlc)
|
||||
return hf, ag
|
||||
ag = hfp.AgProtocol(server_dlc, ag_config)
|
||||
|
||||
await hf.initiate_slc()
|
||||
return (hf, ag)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest_asyncio.fixture
|
||||
async def hfp_connections():
|
||||
hf, ag = await make_hfp_connections()
|
||||
hf_loop_task = asyncio.create_task(hf.run())
|
||||
|
||||
try:
|
||||
yield (hf, ag)
|
||||
finally:
|
||||
# Close the coroutine.
|
||||
hf.unsolicited_queue.put_nowait(None)
|
||||
await hf_loop_task
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_slc():
|
||||
hf_config = hfp.Configuration(
|
||||
supported_hf_features=[], supported_hf_indicators=[], supported_audio_codecs=[]
|
||||
)
|
||||
hf, ag = await make_hfp_connections(hf_config)
|
||||
|
||||
async def ag_loop():
|
||||
while line := await ag.next_line():
|
||||
if line.startswith('AT+BRSF'):
|
||||
ag.send_response_line('+BRSF: 0')
|
||||
elif line.startswith('AT+CIND=?'):
|
||||
ag.send_response_line(
|
||||
'+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
|
||||
'("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
|
||||
'("callheld",(0-2))'
|
||||
async def test_slc_with_minimal_features():
|
||||
hf, ag = await make_hfp_connections(
|
||||
hfp.HfConfiguration(
|
||||
supported_audio_codecs=[],
|
||||
supported_hf_features=[],
|
||||
supported_hf_indicators=[],
|
||||
),
|
||||
hfp.AgConfiguration(
|
||||
supported_ag_call_hold_operations=[],
|
||||
supported_ag_features=[],
|
||||
supported_ag_indicators=[
|
||||
hfp.AgIndicatorState(
|
||||
indicator=hfp.AgIndicator.CALL,
|
||||
supported_values={0, 1},
|
||||
current_status=0,
|
||||
)
|
||||
elif line.startswith('AT+CIND?'):
|
||||
ag.send_response_line('+CIND: 0,0,1,4,1,5,0')
|
||||
ag.send_response_line('OK')
|
||||
],
|
||||
supported_hf_indicators=[],
|
||||
supported_audio_codecs=[],
|
||||
),
|
||||
)
|
||||
|
||||
ag_task = asyncio.create_task(ag_loop())
|
||||
assert hf.supported_ag_features == ag.supported_ag_features
|
||||
assert hf.supported_hf_features == ag.supported_hf_features
|
||||
for a, b in zip(hf.ag_indicators, ag.ag_indicators):
|
||||
assert a.indicator == b.indicator
|
||||
assert a.current_status == b.current_status
|
||||
|
||||
await hf.initiate_slc()
|
||||
ag_task.cancel()
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_slc(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||
hf, ag = hfp_connections
|
||||
|
||||
assert hf.supported_ag_features == ag.supported_ag_features
|
||||
assert hf.supported_hf_features == ag.supported_hf_features
|
||||
for a, b in zip(hf.ag_indicators, ag.ag_indicators):
|
||||
assert a.indicator == b.indicator
|
||||
assert a.current_status == b.current_status
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_ag_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||
hf, ag = hfp_connections
|
||||
|
||||
future = asyncio.get_running_loop().create_future()
|
||||
hf.on('ag_indicator', future.set_result)
|
||||
|
||||
ag.update_ag_indicator(hfp.AgIndicator.CALL, 1)
|
||||
|
||||
indicator: hfp.AgIndicatorState = await future
|
||||
assert indicator.current_status == 1
|
||||
assert indicator.indicator == hfp.AgIndicator.CALL
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_hf_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||
hf, ag = hfp_connections
|
||||
|
||||
future = asyncio.get_running_loop().create_future()
|
||||
ag.on('hf_indicator', future.set_result)
|
||||
|
||||
await hf.execute_command('AT+BIEV=2,100')
|
||||
|
||||
indicator: hfp.HfIndicatorState = await future
|
||||
assert indicator.current_status == 100
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_codec_negotiation(
|
||||
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
|
||||
):
|
||||
hf, ag = hfp_connections
|
||||
|
||||
futures = [
|
||||
asyncio.get_running_loop().create_future(),
|
||||
asyncio.get_running_loop().create_future(),
|
||||
]
|
||||
hf.on('codec_negotiation', futures[0].set_result)
|
||||
ag.on('codec_negotiation', futures[1].set_result)
|
||||
await ag.negotiate_codec(hfp.AudioCodec.MSBC)
|
||||
|
||||
assert await futures[0] == await futures[1]
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_dial(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||
hf, ag = hfp_connections
|
||||
NUMBER = 'ATD123456789'
|
||||
|
||||
future = asyncio.get_running_loop().create_future()
|
||||
ag.on('dial', future.set_result)
|
||||
await hf.execute_command(f'ATD{NUMBER}')
|
||||
|
||||
number: str = await future
|
||||
assert number == NUMBER
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_answer(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||
hf, ag = hfp_connections
|
||||
|
||||
future = asyncio.get_running_loop().create_future()
|
||||
ag.on('answer', lambda: future.set_result(None))
|
||||
await hf.answer_incoming_call()
|
||||
|
||||
await future
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_reject_incoming_call(
|
||||
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
|
||||
):
|
||||
hf, ag = hfp_connections
|
||||
|
||||
future = asyncio.get_running_loop().create_future()
|
||||
ag.on('hang_up', lambda: future.set_result(None))
|
||||
await hf.reject_incoming_call()
|
||||
|
||||
await future
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_terminate_call(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
|
||||
hf, ag = hfp_connections
|
||||
|
||||
future = asyncio.get_running_loop().create_future()
|
||||
ag.on('hang_up', lambda: future.set_result(None))
|
||||
await hf.terminate_call()
|
||||
|
||||
await future
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_hf_sdp_record():
|
||||
devices = TwoDevices()
|
||||
await devices.setup_connection()
|
||||
|
||||
devices[0].sdp_service_records[1] = hfp.make_hf_sdp_records(
|
||||
1, 2, _default_hf_configuration(), hfp.ProfileVersion.V1_8
|
||||
)
|
||||
|
||||
assert await hfp.find_hf_sdp_record(devices.connections[1]) == (
|
||||
2,
|
||||
hfp.ProfileVersion.V1_8,
|
||||
_default_hf_sdp_features(),
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@pytest.mark.asyncio
|
||||
async def test_ag_sdp_record():
|
||||
devices = TwoDevices()
|
||||
await devices.setup_connection()
|
||||
|
||||
devices[0].sdp_service_records[1] = hfp.make_ag_sdp_records(
|
||||
1, 2, _default_ag_configuration(), hfp.ProfileVersion.V1_8
|
||||
)
|
||||
|
||||
assert await hfp.find_ag_sdp_record(devices.connections[1]) == (
|
||||
2,
|
||||
hfp.ProfileVersion.V1_8,
|
||||
_default_ag_sdp_features(),
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
64
tests/transport_tcp_server_test.py
Normal file
64
tests/transport_tcp_server_test.py
Normal file
@@ -0,0 +1,64 @@
|
||||
# Copyright 2024 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import pytest
|
||||
import socket
|
||||
import unittest
|
||||
from unittest.mock import ANY, patch
|
||||
|
||||
from bumble.transport.tcp_server import (
|
||||
open_tcp_server_transport,
|
||||
open_tcp_server_transport_with_socket,
|
||||
)
|
||||
|
||||
|
||||
class OpenTcpServerTransportTests(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.patcher = patch('bumble.transport.tcp_server._create_server')
|
||||
self.mock_create_server = self.patcher.start()
|
||||
|
||||
def tearDown(self):
|
||||
self.patcher.stop()
|
||||
|
||||
def test_open_with_spec(self):
|
||||
asyncio.run(open_tcp_server_transport('localhost:32100'))
|
||||
self.mock_create_server.assert_awaited_once_with(
|
||||
ANY, host='localhost', port=32100
|
||||
)
|
||||
|
||||
def test_open_with_port_only_spec(self):
|
||||
asyncio.run(open_tcp_server_transport('_:32100'))
|
||||
self.mock_create_server.assert_awaited_once_with(ANY, host=None, port=32100)
|
||||
|
||||
def test_open_with_socket(self):
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||
asyncio.run(open_tcp_server_transport_with_socket(sock=sock))
|
||||
self.mock_create_server.assert_awaited_once_with(ANY, sock=sock)
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not os.environ.get('PYTEST_NOSKIP', 0),
|
||||
reason='''\
|
||||
Not hermetic. Should only run manually with
|
||||
$ PYTEST_NOSKIP=1 pytest tests
|
||||
''',
|
||||
)
|
||||
def test_open_with_real_socket():
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
||||
sock.bind(('localhost', 0))
|
||||
port = sock.getsockname()[1]
|
||||
assert port != 0
|
||||
asyncio.run(open_tcp_server_transport_with_socket(sock=sock))
|
||||
Reference in New Issue
Block a user