Compare commits

...

22 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
50eae2ef54 add pandora to code-check action 2024-04-17 13:19:07 -07:00
Gilles Boccon-Gibod
c8883a7d0f update protobuf dep and make pandora install optional 2024-04-17 13:14:21 -07:00
zxzxwu
51321caf5b Merge pull request #470 from zxzxwu/examples
Type hint all examples
2024-04-16 02:56:08 +08:00
zxzxwu
51a94288e2 Type hint all examples 2024-04-15 12:48:21 +00:00
zxzxwu
8758856e8c Merge pull request #465 from zxzxwu/hfp-ag
HFP AG implementation
2024-04-12 22:15:25 +08:00
Josh Wu
deba181857 HFP AG implementation 2024-04-10 09:51:37 +00:00
zxzxwu
c65188dcbf Merge pull request #466 from zxzxwu/format
Fix format presubmit error
2024-04-09 02:59:36 +08:00
Josh Wu
21d607898d Fix format presubmit error 2024-04-09 01:44:04 +08:00
Gilles Boccon-Gibod
2698d4534e Merge pull request #435 from jeru/main
open_tcp_server_transport: allow explicit sock as input.
2024-04-04 19:17:07 -07:00
zxzxwu
bbcd64286a Merge pull request #463 from zxzxwu/hfp
Correct HFP AG indicator index
2024-04-04 12:53:19 +08:00
Gilles Boccon-Gibod
9140afbf8c Merge pull request #456 from google/gbg/update-dependencies
update some dependencies
2024-04-03 17:50:18 -06:00
Gilles Boccon-Gibod
90a682c71b bump to avatar 0.0.9 2024-04-03 16:26:07 -07:00
Gilles Boccon-Gibod
e8737a8243 update to more recent versions 2024-04-03 10:00:11 -07:00
Gilles Boccon-Gibod
72fceca72e update some dependencies 2024-04-03 10:00:09 -07:00
Gilles Boccon-Gibod
732294abbc Merge pull request #462 from google/gbg/461
fix #461
2024-04-03 10:56:05 -06:00
Josh Wu
dc1204531e Correct HFP AG indicator index 2024-04-03 17:58:04 +08:00
Gilles Boccon-Gibod
962114379c fix #461 2024-04-02 23:14:32 -07:00
Gilles Boccon-Gibod
e6913a3055 Merge pull request #457 from google/gbg/bench-ascyncio-main
delay creation of runner object
2024-04-02 21:39:37 -06:00
Gilles Boccon-Gibod
e21d122aef Merge pull request #458 from google/gbg/update-formatter
update black formatter to version 24
2024-04-02 21:39:24 -06:00
Gilles Boccon-Gibod
76bca03fe3 format with the project's version of black 2024-04-01 14:39:34 -07:00
Gilles Boccon-Gibod
f1e5c9e59e delay creation of runner object 2024-04-01 14:25:38 -07:00
Cheng Sheng
1ceeccbbc0 open_tcp_server_transport: allow explicit sock as input.
When a user doesn't need an exact port, but cares more about getting
SOME unused port, they can do:
* Create a socket outside with port=None or port=0.
* Use socket.getsockname()[1] to get the allocated port and pass to the
TCP client somehow.
* Use the created socket to create a TCP server transport.

Use-case: unit-testing embedded software that implements a BLE host. The
controller will be a Bumble controller, connected to the host via a TCP
channel.
* The host will have a TCP-client HCI transport for testing.
* The pytest setup code will allocate the TCP server and pass the port
number to the host.

Also add some unittests with python mock.
2024-03-13 19:34:05 +01:00
46 changed files with 1391 additions and 382 deletions

View File

@@ -33,7 +33,7 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install ".[build,test,development]"
python -m pip install ".[build,test,development,pandora]"
- name: Check
run: |
invoke project.pre-commit

View File

@@ -32,7 +32,7 @@ jobs:
- name: Install
run: |
python -m pip install --upgrade pip
python -m pip install .[avatar]
python -m pip install .[avatar,pandora]
- name: Rootcanal
run: nohup python -m rootcanal > rootcanal.log &
- name: Test

2
.gitignore vendored
View File

@@ -6,6 +6,8 @@ dist/
docs/mkdocs/site
test-results.xml
__pycache__
# Vim
.*.sw*
# generated by setuptools_scm
bumble/_version.py
.vscode/launch.json

View File

@@ -1234,6 +1234,7 @@ class Peripheral(Device.Listener, Connection.Listener):
'cyan',
)
)
await self.connected.wait()
logging.info(color('### Connected', 'cyan'))
@@ -1593,8 +1594,8 @@ def central(
mode_factory = create_mode_factory(ctx, 'gatt-client')
classic = ctx.obj['classic']
asyncio.run(
Central(
async def run_central():
await Central(
transport,
peripheral_address,
classic,
@@ -1606,7 +1607,8 @@ def central(
encrypt or authenticate,
ctx.obj['extended_data_length'],
).run()
)
asyncio.run(run_central())
@bench.command()
@@ -1617,15 +1619,16 @@ def peripheral(ctx, transport):
role_factory = create_role_factory(ctx, 'receiver')
mode_factory = create_mode_factory(ctx, 'gatt-server')
asyncio.run(
Peripheral(
async def run_peripheral():
await Peripheral(
transport,
ctx.obj['classic'],
ctx.obj['extended_data_length'],
role_factory,
mode_factory,
).run()
)
asyncio.run(run_peripheral())
def main():

View File

@@ -652,7 +652,9 @@ class SbcPacketSource:
# Prepare for next packets
sequence_number += 1
sequence_number &= 0xFFFF
timestamp += sum((frame.sample_count for frame in frames))
timestamp &= 0xFFFFFFFF
frames = [frame]
frames_size = len(frame.payload)
else:

View File

@@ -325,8 +325,8 @@ class MediaPacket:
self.padding = padding
self.extension = extension
self.marker = marker
self.sequence_number = sequence_number
self.timestamp = timestamp
self.sequence_number = sequence_number & 0xFFFF
self.timestamp = timestamp & 0xFFFFFFFF
self.ssrc = ssrc
self.csrc_list = csrc_list
self.payload_type = payload_type
@@ -341,7 +341,12 @@ class MediaPacket:
| len(self.csrc_list),
self.marker << 7 | self.payload_type,
]
) + struct.pack('>HII', self.sequence_number, self.timestamp, self.ssrc)
) + struct.pack(
'>HII',
self.sequence_number,
self.timestamp,
self.ssrc,
)
for csrc in self.csrc_list:
header += struct.pack('>I', csrc)
return header + self.payload

View File

@@ -90,6 +90,22 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def show_services(services: Iterable[ServiceProxy]) -> None:
for service in services:
print(color(str(service), 'cyan'))
for characteristic in service.characteristics:
print(color(' ' + str(characteristic), 'magenta'))
for descriptor in characteristic.descriptors:
print(color(' ' + str(descriptor), 'green'))
# -----------------------------------------------------------------------------
# Proxies
# -----------------------------------------------------------------------------

View File

@@ -15,6 +15,9 @@
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import collections
import collections.abc
import logging
import asyncio
@@ -22,16 +25,32 @@ import dataclasses
import enum
import traceback
import pyee
from typing import Dict, List, Union, Set, Any, Optional, Type, TYPE_CHECKING
import re
from typing import (
Dict,
List,
Union,
Set,
Any,
Optional,
Type,
Tuple,
ClassVar,
Iterable,
TYPE_CHECKING,
)
from typing_extensions import Self
from bumble import at
from bumble import device
from bumble import rfcomm
from bumble import sdp
from bumble.colors import color
from bumble.core import (
ProtocolError,
BT_GENERIC_AUDIO_SERVICE,
BT_HANDSFREE_SERVICE,
BT_HEADSET_AUDIO_GATEWAY_SERVICE,
BT_L2CAP_PROTOCOL_ID,
BT_RFCOMM_PROTOCOL_ID,
)
@@ -40,15 +59,6 @@ from bumble.hci import (
CodingFormat,
CodecID,
)
from bumble.sdp import (
DataElement,
ServiceAttribute,
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
)
# -----------------------------------------------------------------------------
@@ -329,6 +339,21 @@ class CallInfo:
type: Optional[int] = None
class CmeError(enum.IntEnum):
"""
CME ERROR codes (partial listed).
TS 127 007 - V6.8.0, 9.2.1 General errors
"""
PHONE_FAILURE = 0
OPERATION_NOT_ALLOWED = 3
OPERATION_NOT_SUPPORTED = 4
MEMORY_FULL = 20
INVALID_INDEX = 21
NOT_FOUND = 22
# -----------------------------------------------------------------------------
# Hands-Free Control Interoperability Requirements
# -----------------------------------------------------------------------------
@@ -402,12 +427,21 @@ STATUS_CODES = [
@dataclasses.dataclass
class Configuration:
class HfConfiguration:
supported_hf_features: List[HfFeature]
supported_hf_indicators: List[HfIndicator]
supported_audio_codecs: List[AudioCodec]
@dataclasses.dataclass
class AgConfiguration:
supported_ag_features: Iterable[AgFeature]
supported_ag_indicators: collections.abc.Sequence[AgIndicatorState]
supported_hf_indicators: Iterable[HfIndicator]
supported_ag_call_hold_operations: Iterable[CallHoldOperation]
supported_audio_codecs: Iterable[AudioCodec]
class AtResponseType(enum.Enum):
"""
Indicates if a response is expected from an AT command, and if multiple responses are accepted.
@@ -435,18 +469,148 @@ class AtResponse:
)
@dataclasses.dataclass
class AtCommand:
class SubCode(str, enum.Enum):
NONE = ''
SET = '='
TEST = '=?'
READ = '?'
code: str
sub_code: SubCode
parameters: list
_PARSE_PATTERN: ClassVar[re.Pattern] = re.compile(
r'AT\+(?P<code>[A-Z]+)(?P<sub_code>=\?|=|\?)?(?P<parameters>.*)'
)
@classmethod
def parse_from(cls: Type[Self], buffer: bytearray) -> Self:
if not (match := cls._PARSE_PATTERN.fullmatch(buffer.decode())):
if buffer.startswith(b'ATA'):
return cls(code='A', sub_code=AtCommand.SubCode.NONE, parameters=[])
if buffer.startswith(b'ATD'):
return cls(
code='D', sub_code=AtCommand.SubCode.NONE, parameters=[buffer[3:]]
)
raise HfpProtocolError('Invalid command')
parameters = []
if parameters_text := match.group('parameters'):
parameters = at.parse_parameters(parameters_text.encode())
return cls(
code=match.group('code'),
sub_code=AtCommand.SubCode(match.group('sub_code') or ''),
parameters=parameters,
)
@dataclasses.dataclass
class AgIndicatorState:
description: str
index: int
"""State wrapper of AG indicator.
Attributes:
indicator: Indicator of this indicator state.
supported_values: Supported values of this indicator.
current_status: Current status of this indicator.
index: (HF only) Index of this indicator.
enabled: (AG only) Whether this indicator is enabled to report.
on_test_text: Text message reported in AT+CIND=? of this indicator.
"""
indicator: AgIndicator
supported_values: Set[int]
current_status: int
index: Optional[int] = None
enabled: bool = True
@property
def on_test_text(self) -> str:
min_value = min(self.supported_values)
max_value = max(self.supported_values)
if len(self.supported_values) == (max_value - min_value + 1):
supported_values_text = f'({min_value}-{max_value})'
else:
supported_values_text = (
f'({",".join(str(v) for v in self.supported_values)})'
)
return f'(\"{self.indicator.value}\",{supported_values_text})'
@classmethod
def call(cls: Type[Self]) -> Self:
"""Default call indicator state."""
return cls(
indicator=AgIndicator.CALL, supported_values={0, 1}, current_status=0
)
@classmethod
def callsetup(cls: Type[Self]) -> Self:
"""Default callsetup indicator state."""
return cls(
indicator=AgIndicator.CALL_SETUP,
supported_values={0, 1, 2, 3},
current_status=0,
)
@classmethod
def callheld(cls: Type[Self]) -> Self:
"""Default call indicator state."""
return cls(
indicator=AgIndicator.CALL_HELD,
supported_values={0, 1, 2},
current_status=0,
)
@classmethod
def service(cls: Type[Self]) -> Self:
"""Default service indicator state."""
return cls(
indicator=AgIndicator.SERVICE, supported_values={0, 1}, current_status=0
)
@classmethod
def signal(cls: Type[Self]) -> Self:
"""Default signal indicator state."""
return cls(
indicator=AgIndicator.SIGNAL,
supported_values={0, 1, 2, 3, 4, 5},
current_status=0,
)
@classmethod
def roam(cls: Type[Self]) -> Self:
"""Default roam indicator state."""
return cls(
indicator=AgIndicator.CALL, supported_values={0, 1}, current_status=0
)
@classmethod
def battchg(cls: Type[Self]) -> Self:
"""Default battery charge indicator state."""
return cls(
indicator=AgIndicator.BATTERY_CHARGE,
supported_values={0, 1, 2, 3, 4, 5},
current_status=0,
)
@dataclasses.dataclass
class HfIndicatorState:
"""State wrapper of HF indicator.
Attributes:
indicator: Indicator of this indicator state.
supported: Whether this indicator is supported.
enabled: Whether this indicator is enabled.
current_status: Current (last-reported) status value of this indicaotr.
"""
indicator: HfIndicator
supported: bool = False
enabled: bool = False
current_status: int = 0
class HfProtocol(pyee.EventEmitter):
@@ -464,6 +628,9 @@ class HfProtocol(pyee.EventEmitter):
ag_indicator: AgIndicator
"""
class HfLoopTermination(HfpProtocolError): ...
"""Termination signal for run() loop."""
supported_hf_features: int
supported_audio_codecs: List[AudioCodec]
@@ -477,14 +644,14 @@ class HfProtocol(pyee.EventEmitter):
command_lock: asyncio.Lock
if TYPE_CHECKING:
response_queue: asyncio.Queue[AtResponse]
unsolicited_queue: asyncio.Queue[AtResponse]
unsolicited_queue: asyncio.Queue[Optional[AtResponse]]
else:
response_queue: asyncio.Queue
unsolicited_queue: asyncio.Queue
read_buffer: bytearray
active_codec: AudioCodec
def __init__(self, dlc: rfcomm.DLC, configuration: Configuration) -> None:
def __init__(self, dlc: rfcomm.DLC, configuration: HfConfiguration) -> None:
super().__init__()
# Configure internal state.
@@ -494,13 +661,14 @@ class HfProtocol(pyee.EventEmitter):
self.unsolicited_queue = asyncio.Queue()
self.read_buffer = bytearray()
self.active_codec = AudioCodec.CVSD
self._slc_initialized = False
# Build local features.
self.supported_hf_features = sum(configuration.supported_hf_features)
self.supported_audio_codecs = configuration.supported_audio_codecs
self.hf_indicators = {
indicator: HfIndicatorState()
indicator: HfIndicatorState(indicator=indicator)
for indicator in configuration.supported_hf_indicators
}
@@ -511,6 +679,10 @@ class HfProtocol(pyee.EventEmitter):
# Bind the AT reader to the RFCOMM channel.
self.dlc.sink = self._read_at
# Stop the run() loop when L2CAP is closed.
self.dlc.multiplexer.l2cap_channel.on(
'close', lambda: self.unsolicited_queue.put_nowait(None)
)
def supports_hf_feature(self, feature: HfFeature) -> bool:
return (self.supported_hf_features & feature) != 0
@@ -621,7 +793,7 @@ class HfProtocol(pyee.EventEmitter):
# If both the HF and AG do support the Codec Negotiation feature
# then the HF shall send the AT+BAC=<HF available codecs> command to
# the AG to notify the AG of the available codecs in the HF.
codecs = [str(c) for c in self.supported_audio_codecs]
codecs = [str(c.value) for c in self.supported_audio_codecs]
await self.execute_command(f"AT+BAC={','.join(codecs)}")
# 4.2.1.3 AG Indicators
@@ -639,7 +811,7 @@ class HfProtocol(pyee.EventEmitter):
self.ag_indicators = []
for index, indicator in enumerate(response.parameters):
description = indicator[0].decode()
description = AgIndicator(indicator[0].decode())
supported_values = []
for value in indicator[1]:
value = value.split(b'-')
@@ -697,7 +869,7 @@ class HfProtocol(pyee.EventEmitter):
# shall send the AT+BIND=<HF supported HF indicators> command to the AG
# to notify the AG of the supported indicators assigned numbers in the
# HF. The AG shall respond with OK
indicators = [str(i) for i in self.hf_indicators.keys()]
indicators = [str(i.value) for i in self.hf_indicators]
await self.execute_command(f"AT+BIND={','.join(indicators)}")
# After having provided the AG with the HF indicators it supports,
@@ -733,6 +905,7 @@ class HfProtocol(pyee.EventEmitter):
self.hf_indicators[indicator].enabled = True
logger.info("SLC setup completed")
self._slc_initialized = True
async def setup_audio_connection(self):
"""4.11.2 Audio Connection Setup by HF."""
@@ -820,15 +993,17 @@ class HfProtocol(pyee.EventEmitter):
return calls
async def update_ag_indicator(self, index: int, value: int):
self.ag_indicators[index].current_status = value
self.emit('ag_indicator', self.ag_indicators[index])
logger.info(
f"AG indicator updated: {self.ag_indicators[index].description}, {value}"
)
# CIEV is in 1-index, while ag_indicators is in 0-index.
ag_indicator = self.ag_indicators[index - 1]
ag_indicator.current_status = value
self.emit('ag_indicator', ag_indicator)
logger.info(f"AG indicator updated: {ag_indicator.indicator}, {value}")
async def handle_unsolicited(self):
"""Handle unsolicited result codes sent by the audio gateway."""
result = await self.unsolicited_queue.get()
if not result:
raise HfProtocol.HfLoopTermination()
if result.code == "+BCS":
await self.setup_codec_connection(int(result.parameters[0]))
elif result.code == "+CIEV":
@@ -846,14 +1021,350 @@ class HfProtocol(pyee.EventEmitter):
"""
try:
await self.initiate_slc()
if not self._slc_initialized:
await self.initiate_slc()
while True:
await self.handle_unsolicited()
except HfProtocol.HfLoopTermination:
logger.info('Loop terminated')
except Exception:
logger.error("HFP-HF protocol failed with the following error:")
logger.error(traceback.format_exc())
class AgProtocol(pyee.EventEmitter):
"""
Implementation for the Audio-Gateway side of the Hands-Free profile.
Reference specification Hands-Free Profile v1.8.
Emitted events:
slc_complete: Emit when SLC procedure is completed.
codec_negotiation: When codec is renegotiated, notify the new codec.
Args:
active_codec: AudioCodec
hf_indicator: When HF update their indicators, notify the new state.
Args:
hf_indicator: HfIndicator
codec_connection_request: Emit when HF sends AT+BCC to request codec connection.
answer: Emit when HF sends ATA to answer phone call.
hang_up: Emit when HF sends AT+CHUP to hang up phone call.
dial: Emit when HF sends ATD to dial phone call.
"""
supported_hf_features: int
supported_hf_indicators: Set[HfIndicator]
supported_audio_codecs: List[AudioCodec]
supported_ag_features: int
supported_ag_call_hold_operations: List[CallHoldOperation]
ag_indicators: List[AgIndicatorState]
hf_indicators: collections.OrderedDict[HfIndicator, HfIndicatorState]
dlc: rfcomm.DLC
read_buffer: bytearray
active_codec: AudioCodec
indicator_report_enabled: bool
inband_ringtone_enabled: bool
cme_error_enabled: bool
_remained_slc_setup_features: Set[HfFeature]
def __init__(self, dlc: rfcomm.DLC, configuration: AgConfiguration) -> None:
super().__init__()
# Configure internal state.
self.dlc = dlc
self.read_buffer = bytearray()
self.active_codec = AudioCodec.CVSD
# Build local features.
self.supported_ag_features = sum(configuration.supported_ag_features)
self.supported_ag_call_hold_operations = list(
configuration.supported_ag_call_hold_operations
)
self.ag_indicators = list(configuration.supported_ag_indicators)
self.supported_hf_indicators = set(configuration.supported_hf_indicators)
self.inband_ringtone_enabled = True
self._remained_slc_setup_features = set()
# Clear remote features.
self.supported_hf_features = 0
self.supported_audio_codecs = []
self.indicator_report_enabled = False
self.cme_error_enabled = False
self.hf_indicators = collections.OrderedDict()
# Bind the AT reader to the RFCOMM channel.
self.dlc.sink = self._read_at
def supports_hf_feature(self, feature: HfFeature) -> bool:
return (self.supported_hf_features & feature) != 0
def supports_ag_feature(self, feature: AgFeature) -> bool:
return (self.supported_ag_features & feature) != 0
def _read_at(self, data: bytes):
"""
Reads AT messages from the RFCOMM channel.
"""
# Append to the read buffer.
self.read_buffer.extend(data)
# Locate the trailer.
trailer = self.read_buffer.find(b'\r')
if trailer == -1:
return
# Isolate the AT response code and parameters.
raw_command = self.read_buffer[:trailer]
command = AtCommand.parse_from(raw_command)
logger.debug(f"<<< {raw_command.decode()}")
# Consume the response bytes.
self.read_buffer = self.read_buffer[trailer + 1 :]
if command.sub_code == AtCommand.SubCode.TEST:
handler_name = f'_on_{command.code.lower()}_test'
elif command.sub_code == AtCommand.SubCode.READ:
handler_name = f'_on_{command.code.lower()}_read'
else:
handler_name = f'_on_{command.code.lower()}'
if handler := getattr(self, handler_name, None):
handler(*command.parameters)
else:
logger.warning('Handler %s not found', handler_name)
self.send_response('ERROR')
def send_response(self, response: str) -> None:
"""Sends an AT response."""
self.dlc.write(f'\r\n{response}\r\n')
def send_cme_error(self, error_code: CmeError) -> None:
"""Sends an CME ERROR response.
If CME Error is not enabled by HF, sends ERROR instead.
"""
if self.cme_error_enabled:
self.send_response(f'+CME ERROR: {error_code.value}')
else:
self.send_error()
def send_ok(self) -> None:
"""Sends an OK response."""
self.send_response('OK')
def send_error(self) -> None:
"""Sends an ERROR response."""
self.send_response('ERROR')
def set_inband_ringtone_enabled(self, enabled: bool) -> None:
"""Enables or disables in-band ringtone."""
self.inband_ringtone_enabled = enabled
self.send_response(f'+BSIR: {1 if enabled else 0}')
def update_ag_indicator(self, indicator: AgIndicator, value: int) -> None:
"""Updates AG indicator.
Args:
indicator: Name of the indicator.
value: new value of the indicator.
"""
search_result = next(
(
(index, state)
for index, state in enumerate(self.ag_indicators)
if state.indicator == indicator
),
None,
)
if not search_result:
raise KeyError(f'{indicator} is not supported.')
index, indicator_state = search_result
if not self.indicator_report_enabled:
logger.warning('AG indicator report is disabled')
if not indicator_state.enabled:
logger.warning(f'AG indicator {indicator} is disabled')
indicator_state.current_status = value
self.send_response(f'+CIEV: {index+1},{value}')
async def negotiate_codec(self, codec: AudioCodec) -> None:
"""Starts codec negotiation."""
if not self.supports_ag_feature(AgFeature.CODEC_NEGOTIATION):
logger.warning('Local does not support Codec Negotiation')
if not self.supports_hf_feature(HfFeature.CODEC_NEGOTIATION):
logger.warning('Peer does not support Codec Negotiation')
if codec not in self.supported_audio_codecs:
logger.warning(f'{codec} is not supported by peer')
at_bcs_future = asyncio.get_running_loop().create_future()
self.once('codec_negotiation', at_bcs_future.set_result)
self.send_response(f'+BCS: {codec.value}')
if (new_codec := await at_bcs_future) != codec:
raise HfpProtocolError(f'Expect codec: {codec}, but get {new_codec}')
def _check_remained_slc_commands(self) -> None:
if not self._remained_slc_setup_features:
self.emit('slc_complete')
def _on_brsf(self, hf_features: bytes) -> None:
self.supported_hf_features = int(hf_features)
self.send_response(f'+BRSF: {self.supported_ag_features}')
self.send_ok()
if self.supports_hf_feature(
HfFeature.HF_INDICATORS
) and self.supports_ag_feature(AgFeature.HF_INDICATORS):
self._remained_slc_setup_features.add(HfFeature.HF_INDICATORS)
if self.supports_hf_feature(
HfFeature.THREE_WAY_CALLING
) and self.supports_ag_feature(AgFeature.THREE_WAY_CALLING):
self._remained_slc_setup_features.add(HfFeature.THREE_WAY_CALLING)
def _on_bac(self, *args) -> None:
self.supported_audio_codecs = [AudioCodec(int(value)) for value in args]
self.send_ok()
def _on_bcs(self, codec: bytes) -> None:
self.active_codec = AudioCodec(int(codec))
self.send_ok()
self.emit('codec_negotiation', self.active_codec)
def _on_cind_test(self) -> None:
if not self.ag_indicators:
self.send_cme_error(CmeError.NOT_FOUND)
return
indicator_list_str = ",".join(
indicator.on_test_text for indicator in self.ag_indicators
)
self.send_response(f'+CIND: {indicator_list_str}')
self.send_ok()
def _on_cind_read(self) -> None:
if not self.ag_indicators:
self.send_cme_error(CmeError.NOT_FOUND)
return
indicator_list_str = ",".join(
str(indicator.current_status) for indicator in self.ag_indicators
)
self.send_response(f'+CIND: {indicator_list_str}')
self.send_ok()
self._check_remained_slc_commands()
def _on_cmer(
self,
mode: bytes,
keypad: Optional[bytes] = None,
display: Optional[bytes] = None,
indicator: bytes = b'',
) -> None:
if int(mode) != 3 or keypad or display or int(indicator) not in (0, 1):
logger.error(
f'Unexpected values: mode={mode!r}, keypad={keypad!r}, '
f'display={display!r}, indicator={indicator!r}'
)
self.send_cme_error(CmeError.INVALID_INDEX)
self.indicator_report_enabled = bool(int(indicator))
self.send_ok()
def _on_cmee(self, enabled: bytes) -> None:
self.cme_error_enabled = bool(int(enabled))
self.send_ok()
def _on_bind(self, *args) -> None:
if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
self.send_error()
return
peer_supported_indicators = set(
HfIndicator(int(indicator)) for indicator in args
)
self.hf_indicators = collections.OrderedDict(
{
indicator: HfIndicatorState(indicator=indicator)
for indicator in self.supported_hf_indicators.intersection(
peer_supported_indicators
)
}
)
self.send_ok()
def _on_bind_test(self) -> None:
if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
self.send_error()
return
hf_indicator_list_str = ",".join(
str(indicator.value) for indicator in self.supported_hf_indicators
)
self.send_response(f'+BIND: ({hf_indicator_list_str})')
self.send_ok()
def _on_bind_read(self) -> None:
if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
self.send_error()
return
for indicator in self.hf_indicators:
self.send_response(f'+BIND: {indicator.value},1')
self.send_ok()
self._remained_slc_setup_features.remove(HfFeature.HF_INDICATORS)
self._check_remained_slc_commands()
def _on_biev(self, index_bytes: bytes, value_bytes: bytes) -> None:
if not self.supports_ag_feature(AgFeature.HF_INDICATORS):
self.send_error()
return
index = HfIndicator(int(index_bytes))
if index not in self.hf_indicators:
self.send_error()
return
self.hf_indicators[index].current_status = int(value_bytes)
self.emit('hf_indicator', self.hf_indicators[index])
self.send_ok()
def _on_bia(self, *args) -> None:
for enabled, state in zip(args, self.ag_indicators):
state.enabled = bool(int(enabled))
self.send_ok()
def _on_bcc(self) -> None:
self.emit('codec_connection_request')
self.send_ok()
def _on_a(self) -> None:
"""ATA handler."""
self.emit('answer')
self.send_ok()
def _on_d(self, number: bytes) -> None:
"""ATD handler."""
self.emit('dial', number.decode())
self.send_ok()
def _on_chup(self) -> None:
self.emit('hang_up')
self.send_ok()
# -----------------------------------------------------------------------------
# Normative SDP definitions
# -----------------------------------------------------------------------------
@@ -907,9 +1418,12 @@ class AgSdpFeature(enum.IntFlag):
VOICE_RECOGNITION_TEST = 0x80
def sdp_records(
service_record_handle: int, rfcomm_channel: int, configuration: Configuration
) -> List[ServiceAttribute]:
def make_hf_sdp_records(
service_record_handle: int,
rfcomm_channel: int,
configuration: HfConfiguration,
version: ProfileVersion = ProfileVersion.V1_8,
) -> List[sdp.ServiceAttribute]:
"""
Generates the SDP record for HFP Hands-Free support.
@@ -941,53 +1455,226 @@ def sdp_records(
hf_supported_features |= HfSdpFeature.WIDE_BAND
return [
ServiceAttribute(
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
DataElement.unsigned_integer_32(service_record_handle),
sdp.ServiceAttribute(
sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
sdp.DataElement.unsigned_integer_32(service_record_handle),
),
ServiceAttribute(
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
DataElement.sequence(
sdp.ServiceAttribute(
sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
sdp.DataElement.sequence(
[
DataElement.uuid(BT_HANDSFREE_SERVICE),
DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
sdp.DataElement.uuid(BT_HANDSFREE_SERVICE),
sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
]
),
),
ServiceAttribute(
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
sdp.ServiceAttribute(
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.DataElement.sequence(
[
DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]),
DataElement.sequence(
sdp.DataElement.sequence(
[sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]
),
sdp.DataElement.sequence(
[
DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
DataElement.unsigned_integer_8(rfcomm_channel),
sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
sdp.DataElement.unsigned_integer_8(rfcomm_channel),
]
),
]
),
),
ServiceAttribute(
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
DataElement.sequence(
sdp.ServiceAttribute(
sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.DataElement.sequence(
[
DataElement.sequence(
sdp.DataElement.sequence(
[
DataElement.uuid(BT_HANDSFREE_SERVICE),
DataElement.unsigned_integer_16(ProfileVersion.V1_8),
sdp.DataElement.uuid(BT_HANDSFREE_SERVICE),
sdp.DataElement.unsigned_integer_16(version),
]
)
]
),
),
ServiceAttribute(
SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
DataElement.unsigned_integer_16(hf_supported_features),
sdp.ServiceAttribute(
sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
sdp.DataElement.unsigned_integer_16(hf_supported_features),
),
]
def make_ag_sdp_records(
service_record_handle: int,
rfcomm_channel: int,
configuration: AgConfiguration,
version: ProfileVersion = ProfileVersion.V1_8,
) -> List[sdp.ServiceAttribute]:
"""
Generates the SDP record for HFP Audio-Gateway support.
The record exposes the features supported in the input configuration,
and the allocated RFCOMM channel.
"""
ag_supported_features = 0
if AgFeature.EC_NR in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.EC_NR
if AgFeature.THREE_WAY_CALLING in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.THREE_WAY_CALLING
if (
AgFeature.ENHANCED_VOICE_RECOGNITION_STATUS
in configuration.supported_ag_features
):
ag_supported_features |= AgSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS
if AgFeature.VOICE_RECOGNITION_TEST in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEST
if AgFeature.IN_BAND_RING_TONE_CAPABILITY in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
if AgFeature.VOICE_RECOGNITION_FUNCTION in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_FUNCTION
if AudioCodec.MSBC in configuration.supported_audio_codecs:
ag_supported_features |= AgSdpFeature.WIDE_BAND
return [
sdp.ServiceAttribute(
sdp.SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
sdp.DataElement.unsigned_integer_32(service_record_handle),
),
sdp.ServiceAttribute(
sdp.SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
sdp.DataElement.sequence(
[
sdp.DataElement.uuid(BT_HEADSET_AUDIO_GATEWAY_SERVICE),
sdp.DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
]
),
),
sdp.ServiceAttribute(
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.DataElement.sequence(
[
sdp.DataElement.sequence(
[sdp.DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]
),
sdp.DataElement.sequence(
[
sdp.DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
sdp.DataElement.unsigned_integer_8(rfcomm_channel),
]
),
]
),
),
sdp.ServiceAttribute(
sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.DataElement.sequence(
[
sdp.DataElement.sequence(
[
sdp.DataElement.uuid(BT_HEADSET_AUDIO_GATEWAY_SERVICE),
sdp.DataElement.unsigned_integer_16(version),
]
)
]
),
),
sdp.ServiceAttribute(
sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
sdp.DataElement.unsigned_integer_16(ag_supported_features),
),
]
async def find_hf_sdp_record(
connection: device.Connection,
) -> Optional[Tuple[int, ProfileVersion, HfSdpFeature]]:
"""Searches a Hands-Free SDP record from remote device.
Args:
connection: ACL connection to make SDP search.
Returns:
Dictionary mapping from channel number to service class UUID list.
"""
async with sdp.Client(connection) as sdp_client:
search_result = await sdp_client.search_attributes(
uuids=[BT_HANDSFREE_SERVICE],
attribute_ids=[
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
],
)
for attribute_lists in search_result:
channel: Optional[int] = None
version: Optional[ProfileVersion] = None
features: Optional[HfSdpFeature] = None
for attribute in attribute_lists:
# The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]].
if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
protocol_descriptor_list = attribute.value.value
channel = protocol_descriptor_list[1].value[1].value
elif (
attribute.id
== sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
):
profile_descriptor_list = attribute.value.value
version = ProfileVersion(profile_descriptor_list[0].value[1].value)
elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID:
features = HfSdpFeature(attribute.value.value)
if not channel or not version or features is None:
logger.warning(f"Bad result {attribute_lists}.")
return None
return (channel, version, features)
return None
async def find_ag_sdp_record(
connection: device.Connection,
) -> Optional[Tuple[int, ProfileVersion, AgSdpFeature]]:
"""Searches an Audio-Gateway SDP record from remote device.
Args:
connection: ACL connection to make SDP search.
Returns:
Dictionary mapping from channel number to service class UUID list.
"""
async with sdp.Client(connection) as sdp_client:
search_result = await sdp_client.search_attributes(
uuids=[BT_HEADSET_AUDIO_GATEWAY_SERVICE],
attribute_ids=[
sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
],
)
for attribute_lists in search_result:
channel: Optional[int] = None
version: Optional[ProfileVersion] = None
features: Optional[AgSdpFeature] = None
for attribute in attribute_lists:
# The layout is [[L2CAP_PROTOCOL], [RFCOMM_PROTOCOL, RFCOMM_CHANNEL]].
if attribute.id == sdp.SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID:
protocol_descriptor_list = attribute.value.value
channel = protocol_descriptor_list[1].value[1].value
elif (
attribute.id
== sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
):
profile_descriptor_list = attribute.value.value
version = ProfileVersion(profile_descriptor_list[0].value[1].value)
elif attribute.id == sdp.SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID:
features = AgSdpFeature(attribute.value.value)
if not channel or not version or features is None:
logger.warning(f"Bad result {attribute_lists}.")
return None
return (channel, version, features)
return None
# -----------------------------------------------------------------------------
# ESCO Codec Default Parameters
# -----------------------------------------------------------------------------

View File

@@ -184,7 +184,7 @@ class Host(AbortableEventEmitter):
self.long_term_key_provider = None
self.link_key_provider = None
self.pairing_io_capability_provider = None # Classic only
self.snooper = None
self.snooper: Optional[Snooper] = None
# Connect to the source and sink if specified
if controller_source:

View File

@@ -19,8 +19,8 @@
import struct
from typing import Optional, Tuple
from ..gatt_client import ProfileServiceProxy
from ..gatt import (
from bumble.gatt_client import ServiceProxy, ProfileServiceProxy, CharacteristicProxy
from bumble.gatt import (
GATT_DEVICE_INFORMATION_SERVICE,
GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC,
GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC,
@@ -104,7 +104,16 @@ class DeviceInformationService(TemplateService):
class DeviceInformationServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = DeviceInformationService
def __init__(self, service_proxy):
manufacturer_name: Optional[UTF8CharacteristicAdapter]
model_number: Optional[UTF8CharacteristicAdapter]
serial_number: Optional[UTF8CharacteristicAdapter]
hardware_revision: Optional[UTF8CharacteristicAdapter]
firmware_revision: Optional[UTF8CharacteristicAdapter]
software_revision: Optional[UTF8CharacteristicAdapter]
system_id: Optional[DelegatedCharacteristicAdapter]
ieee_regulatory_certification_data_list: Optional[CharacteristicProxy]
def __init__(self, service_proxy: ServiceProxy):
self.service_proxy = service_proxy
for field, uuid in (

View File

@@ -18,6 +18,7 @@
from __future__ import annotations
import asyncio
import logging
import socket
from .common import Transport, StreamPacketSource
@@ -28,6 +29,13 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# A pass-through function to ease mock testing.
async def _create_server(*args, **kw_args):
await asyncio.get_running_loop().create_server(*args, **kw_args)
async def open_tcp_server_transport(spec: str) -> Transport:
'''
Open a TCP server transport.
@@ -38,7 +46,22 @@ async def open_tcp_server_transport(spec: str) -> Transport:
Example: _:9001
'''
local_host, local_port = spec.split(':')
return await _open_tcp_server_transport_impl(
host=local_host if local_host != '_' else None, port=int(local_port)
)
async def open_tcp_server_transport_with_socket(sock: socket.socket) -> Transport:
'''
Open a TCP server transport with an existing socket.
One reason to use this variant is to let python pick an unused port.
'''
return await _open_tcp_server_transport_impl(sock=sock)
async def _open_tcp_server_transport_impl(**kwargs) -> Transport:
class TcpServerTransport(Transport):
async def close(self):
await super().close()
@@ -77,13 +100,10 @@ async def open_tcp_server_transport(spec: str) -> Transport:
else:
logger.debug('no client, dropping packet')
local_host, local_port = spec.split(':')
packet_source = StreamPacketSource()
packet_sink = TcpServerPacketSink()
await asyncio.get_running_loop().create_server(
lambda: TcpServerProtocol(packet_source, packet_sink),
host=local_host if local_host != '_' else None,
port=int(local_port),
await _create_server(
lambda: TcpServerProtocol(packet_source, packet_sink), **kwargs
)
return TcpServerTransport(packet_source, packet_sink)

View File

@@ -61,7 +61,7 @@ async def func4(x, y):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
print("MAIN: start, loop=", asyncio.get_running_loop())
print("MAIN: invoke func1")
func1(1, 2)

View File

@@ -21,23 +21,29 @@ import os
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport
from bumble.profiles.battery_service import BatteryServiceProxy
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: battery_client.py <transport-spec> <bluetooth-address>')
print('example: battery_client.py usb:0 E1:CA:72:48:C4:E8')
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')
# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
await device.power_on()
# Connect to the peer

View File

@@ -29,14 +29,16 @@ from bumble.profiles.battery_service import BatteryService
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: python battery_server.py <device-config> <transport-spec>')
print('example: python battery_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Add a Battery Service to the GATT sever
battery_service = BatteryService(lambda _: random.randint(0, 100))

View File

@@ -21,12 +21,13 @@ import os
import logging
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.hci import Address
from bumble.profiles.device_information_service import DeviceInformationServiceProxy
from bumble.transport import open_transport
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print(
'Usage: device_information_client.py <transport-spec> <bluetooth-address>'
@@ -35,11 +36,16 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')
# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
await device.power_on()
# Connect to the peer

View File

@@ -28,14 +28,16 @@ from bumble.profiles.device_information_service import DeviceInformationService
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: python device_info_server.py <device-config> <transport-spec>')
print('example: python device_info_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Add a Device Information Service to the GATT sever
device_information_service = DeviceInformationService(
@@ -64,7 +66,7 @@ async def main():
# Go!
await device.power_on()
await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -21,23 +21,29 @@ import os
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport
from bumble.profiles.heart_rate_service import HeartRateServiceProxy
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: heart_rate_client.py <transport-spec> <bluetooth-address>')
print('example: heart_rate_client.py usb:0 E1:CA:72:48:C4:E8')
return
print('<<< connecting to HCI...')
async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport(sys.argv[1]) as hci_transport:
print('<<< connected')
# Create and start a device
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
await device.power_on()
# Connect to the peer

View File

@@ -33,14 +33,16 @@ from bumble.utils import AsyncRunner
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: python heart_rate_server.py <device-config> <transport-spec>')
print('example: python heart_rate_server.py device1.json usb:0')
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Keep track of accumulated expended energy
energy_start_time = time.time()

View File

@@ -416,7 +416,7 @@ async def keyboard_device(device, command):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: python keyboard.py <device-config> <transport-spec> <command>'
@@ -434,9 +434,11 @@ async def main():
)
return
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
# Create a device to manage the host
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
command = sys.argv[3]
if command == 'connect':

View File

@@ -139,18 +139,20 @@ async def find_a2dp_service(connection):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print('Usage: run_a2dp_info.py <device-config> <transport-spec> <bt-addr>')
print('example: run_a2dp_info.py classic1.json usb:0 14:7D:DA:4E:53:A8')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Start the controller
@@ -187,7 +189,7 @@ async def main():
client = await AVDTP_Protocol.connect(connection, avdtp_version)
# Discover all endpoints on the remote device
endpoints = await client.discover_remote_endpoints()
endpoints = list(await client.discover_remote_endpoints())
print(f'@@@ Found {len(endpoints)} endpoints')
for endpoint in endpoints:
print('@@@', endpoint)

View File

@@ -19,6 +19,7 @@ import asyncio
import sys
import os
import logging
from typing import Any, Dict
from bumble.device import Device
from bumble.transport import open_transport_or_link
@@ -41,7 +42,7 @@ from bumble.a2dp import (
SbcMediaCodecInformation,
)
Context = {'output': None}
Context: Dict[Any, Any] = {'output': None}
# -----------------------------------------------------------------------------
@@ -104,7 +105,7 @@ def on_rtp_packet(packet):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: run_a2dp_sink.py <device-config> <transport-spec> <sbc-file> '
@@ -114,14 +115,16 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
with open(sys.argv[3], 'wb') as sbc_file:
Context['output'] = sbc_file
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Setup the SDP to expose the sink service
@@ -162,7 +165,7 @@ async def main():
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -114,7 +114,7 @@ async def stream_packets(read_function, protocol):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: run_a2dp_source.py <device-config> <transport-spec> <sbc-file> '
@@ -126,11 +126,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Setup the SDP to expose the SRC service
@@ -186,7 +188,7 @@ async def main():
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -28,7 +28,7 @@ from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_advertiser.py <config-file> <transport-spec> [type] [address]'
@@ -50,10 +50,12 @@ async def main():
target = None
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
if advertising_type.is_scannable:
device.scan_response_data = bytes(
@@ -66,7 +68,7 @@ async def main():
await device.power_on()
await device.start_advertising(advertising_type=advertising_type, target=target)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -49,7 +49,7 @@ ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID(
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 4:
print(
'Usage: python run_asha_sink.py <device-config> <transport-spec> '
@@ -60,8 +60,10 @@ async def main():
audio_out = open(sys.argv[3], 'wb')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
# Handler for audio control commands
def on_audio_control_point_write(_connection, value):
@@ -197,7 +199,7 @@ async def main():
await device.power_on()
await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -331,7 +331,7 @@ class Delegate(avrcp.Delegate):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_avrcp_controller.py <device-config> <transport-spec> '
@@ -341,11 +341,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Setup the SDP to expose the sink service

View File

@@ -32,7 +32,7 @@ from bumble.sdp import (
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_classic_connect.py <device-config> <transport-spec> '
@@ -42,11 +42,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
device.le_enabled = False
await device.power_on()

View File

@@ -91,18 +91,20 @@ SDP_SERVICE_RECORDS = {
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_classic_discoverable.py <device-config> <transport-spec>')
print('example: run_classic_discoverable.py classic1.json usb:04b4:f901')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
device.sdp_service_records = SDP_SERVICE_RECORDS
await device.power_on()
@@ -111,7 +113,7 @@ async def main():
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -20,8 +20,8 @@ import sys
import os
import logging
from bumble.colors import color
from bumble.device import Device
from bumble.hci import Address
from bumble.transport import open_transport_or_link
from bumble.core import DeviceClass
@@ -53,22 +53,27 @@ class DiscoveryListener(Device.Listener):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 2:
print('Usage: run_classic_discovery.py <transport-spec>')
print('example: run_classic_discovery.py usb:04b4:f901')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
print('<<< connected')
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
device.listener = DiscoveryListener()
await device.power_on()
await device.start_discovery()
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -25,7 +25,7 @@ from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_connect_and_encrypt.py <device-config> <transport-spec> '
@@ -37,11 +37,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
await device.power_on()
# Connect to the peer
@@ -56,7 +58,7 @@ async def main():
print(f'!!! Encryption failed: {error}')
return
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -36,7 +36,7 @@ from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 4:
print(
'Usage: run_controller.py <controller-address> <device-config> '
@@ -49,7 +49,7 @@ async def main():
return
print('>>> connecting to HCI...')
async with await open_transport_or_link(sys.argv[3]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[3]) as hci_transport:
print('>>> connected')
# Create a local link
@@ -57,7 +57,10 @@ async def main():
# Create a first controller using the packet source/sink as its host interface
controller1 = Controller(
'C1', host_source=hci_source, host_sink=hci_sink, link=link
'C1',
host_source=hci_transport.source,
host_sink=hci_transport.sink,
link=link,
)
controller1.random_address = sys.argv[1]
@@ -98,7 +101,7 @@ async def main():
await device.start_advertising()
await device.start_scanning()
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -20,9 +20,9 @@ import asyncio
import sys
import os
from bumble.colors import color
from bumble.device import Device
from bumble.controller import Controller
from bumble.hci import Address
from bumble.link import LocalLink
from bumble.transport import open_transport_or_link
@@ -45,14 +45,14 @@ class ScannerListener(Device.Listener):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 2:
print('Usage: run_controller.py <transport-spec>')
print('example: run_controller_with_scanner.py serial:/dev/pts/14,1000000')
return
print('>>> connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
print('>>> connected')
# Create a local link
@@ -60,22 +60,25 @@ async def main():
# Create a first controller using the packet source/sink as its host interface
controller1 = Controller(
'C1', host_source=hci_source, host_sink=hci_sink, link=link
'C1',
host_source=hci_transport.source,
host_sink=hci_transport.sink,
link=link,
public_address='E0:E1:E2:E3:E4:E5',
)
controller1.address = 'E0:E1:E2:E3:E4:E5'
# Create a second controller using the same link
controller2 = Controller('C2', link=link)
# Create a device with a scanner listener
device = Device.with_hci(
'Bumble', 'F0:F1:F2:F3:F4:F5', controller2, controller2
'Bumble', Address('F0:F1:F2:F3:F4:F5'), controller2, controller2
)
device.listener = ScannerListener()
await device.power_on()
await device.start_scanning()
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -20,31 +20,36 @@ import sys
import os
import logging
from bumble.colors import color
from bumble.hci import Address
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.snoop import BtSnooper
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) != 3:
print('Usage: run_device_with_snooper.py <transport-spec> <snoop-file>')
print('example: run_device_with_snooper.py usb:0 btsnoop.log')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
print('<<< connected')
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
with open(sys.argv[2], "wb") as snoop_file:
device.host.snooper = BtSnooper(snoop_file)
await device.power_on()
await device.start_scanning()
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -69,7 +69,7 @@ class Listener(Device.Listener):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_gatt_client.py <device-config> <transport-spec> '
@@ -79,11 +79,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device to manage the host, with a custom listener
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.listener = Listener(device)
await device.power_on()

View File

@@ -19,21 +19,21 @@ import asyncio
import os
import logging
from bumble.colors import color
from bumble.core import ProtocolError
from bumble.controller import Controller
from bumble.device import Device, Peer
from bumble.hci import Address
from bumble.host import Host
from bumble.link import LocalLink
from bumble.gatt import (
Service,
Characteristic,
Descriptor,
show_services,
GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR,
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
GATT_DEVICE_INFORMATION_SERVICE,
)
from bumble.gatt_client import show_services
# -----------------------------------------------------------------------------
@@ -43,7 +43,7 @@ class ServerListener(Device.Listener):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
# Create a local link
link = LocalLink()
@@ -51,14 +51,18 @@ async def main():
client_controller = Controller("client controller", link=link)
client_host = Host()
client_host.controller = client_controller
client_device = Device("client", address='F0:F1:F2:F3:F4:F5', host=client_host)
client_device = Device(
"client", address=Address('F0:F1:F2:F3:F4:F5'), host=client_host
)
await client_device.power_on()
# Setup a stack for the server
server_controller = Controller("server controller", link=link)
server_host = Host()
server_host.controller = server_controller
server_device = Device("server", address='F6:F7:F8:F9:FA:FB', host=server_host)
server_device = Device(
"server", address=Address('F6:F7:F8:F9:FA:FB'), host=server_host
)
server_device.listener = ServerListener()
await server_device.power_on()

View File

@@ -71,7 +71,7 @@ def my_custom_write_with_error(connection, value):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_gatt_server.py <device-config> <transport-spec> '
@@ -81,11 +81,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device to manage the host
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.listener = Listener(device)
# Add a few entries to the device's GATT server
@@ -146,7 +148,7 @@ async def main():
else:
await device.start_advertising(auto_restart=True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -20,123 +20,48 @@ import sys
import os
import logging
from bumble.colors import color
import bumble.core
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.core import (
BT_HANDSFREE_SERVICE,
BT_RFCOMM_PROTOCOL_ID,
BT_BR_EDR_TRANSPORT,
)
from bumble import rfcomm, hfp
from bumble.hci import HCI_SynchronousDataPacket
from bumble.sdp import (
Client as SDP_Client,
DataElement,
ServiceAttribute,
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
)
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# pylint: disable-next=too-many-nested-blocks
async def list_rfcomm_channels(device, connection):
# Connect to the SDP Server
sdp_client = SDP_Client(connection)
await sdp_client.connect()
# Search for services that support the Handsfree Profile
search_result = await sdp_client.search_attributes(
[BT_HANDSFREE_SERVICE],
[
SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
def _default_configuration() -> hfp.AgConfiguration:
return hfp.AgConfiguration(
supported_ag_features=[
hfp.AgFeature.HF_INDICATORS,
hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY,
hfp.AgFeature.REJECT_CALL,
hfp.AgFeature.CODEC_NEGOTIATION,
hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
],
supported_ag_indicators=[
hfp.AgIndicatorState.call(),
hfp.AgIndicatorState.service(),
hfp.AgIndicatorState.callsetup(),
hfp.AgIndicatorState.callsetup(),
hfp.AgIndicatorState.signal(),
hfp.AgIndicatorState.roam(),
hfp.AgIndicatorState.battchg(),
],
supported_hf_indicators=[
hfp.HfIndicator.ENHANCED_SAFETY,
hfp.HfIndicator.BATTERY_LEVEL,
],
supported_ag_call_hold_operations=[],
supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
)
print(color('==================================', 'blue'))
print(color('Handsfree Services:', 'yellow'))
rfcomm_channels = []
# pylint: disable-next=too-many-nested-blocks
for attribute_list in search_result:
# Look for the RFCOMM Channel number
protocol_descriptor_list = ServiceAttribute.find_attribute_in_list(
attribute_list, SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID
)
if protocol_descriptor_list:
for protocol_descriptor in protocol_descriptor_list.value:
if len(protocol_descriptor.value) >= 2:
if protocol_descriptor.value[0].value == BT_RFCOMM_PROTOCOL_ID:
print(color('SERVICE:', 'green'))
print(
color(' RFCOMM Channel:', 'cyan'),
protocol_descriptor.value[1].value,
)
rfcomm_channels.append(protocol_descriptor.value[1].value)
# List profiles
bluetooth_profile_descriptor_list = (
ServiceAttribute.find_attribute_in_list(
attribute_list,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
)
)
if bluetooth_profile_descriptor_list:
if bluetooth_profile_descriptor_list.value:
if (
bluetooth_profile_descriptor_list.value[0].type
== DataElement.SEQUENCE
):
bluetooth_profile_descriptors = (
bluetooth_profile_descriptor_list.value
)
else:
# Sometimes, instead of a list of lists, we just
# find a list. Fix that
bluetooth_profile_descriptors = [
bluetooth_profile_descriptor_list
]
print(color(' Profiles:', 'green'))
for (
bluetooth_profile_descriptor
) in bluetooth_profile_descriptors:
version_major = (
bluetooth_profile_descriptor.value[1].value >> 8
)
version_minor = (
bluetooth_profile_descriptor.value[1].value
& 0xFF
)
print(
' '
f'{bluetooth_profile_descriptor.value[0].value}'
f' - version {version_major}.{version_minor}'
)
# List service classes
service_class_id_list = ServiceAttribute.find_attribute_in_list(
attribute_list, SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID
)
if service_class_id_list:
if service_class_id_list.value:
print(color(' Service Classes:', 'green'))
for service_class_id in service_class_id_list.value:
print(' ', service_class_id.value)
await sdp_client.disconnect()
return rfcomm_channels
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: run_hfp_gateway.py <device-config> <transport-spec> '
@@ -149,11 +74,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
await device.power_on()
@@ -164,13 +91,14 @@ async def main():
print(f'=== Connected to {connection.peer_address}!')
# Get a list of all the Handsfree services (should only be 1)
channels = await list_rfcomm_channels(device, connection)
if len(channels) == 0:
if not (hfp_record := await hfp.find_hf_sdp_record(connection)):
print('!!! no service found')
return
# Pick the first one
channel = channels[0]
channel, version, hf_sdp_features = hfp_record
print(f'HF version: {version}')
print(f'HF features: {hf_sdp_features}')
# Request authentication
print('*** Authenticating...')
@@ -205,51 +133,9 @@ async def main():
device.host.on('sco_packet', on_sco)
# Protocol loop (just for testing at this point)
protocol = hfp.HfpProtocol(session)
while True:
line = await protocol.next_line()
ag_protocol = hfp.AgProtocol(session, _default_configuration())
if line.startswith('AT+BRSF='):
protocol.send_response_line('+BRSF: 30')
protocol.send_response_line('OK')
elif line.startswith('AT+CIND=?'):
protocol.send_response_line(
'+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
'("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
'("callheld",(0-2))'
)
protocol.send_response_line('OK')
elif line.startswith('AT+CIND?'):
protocol.send_response_line('+CIND: 0,0,1,4,1,5,0')
protocol.send_response_line('OK')
elif line.startswith('AT+CMER='):
protocol.send_response_line('OK')
elif line.startswith('AT+CHLD=?'):
protocol.send_response_line('+CHLD: 0')
protocol.send_response_line('OK')
elif line.startswith('AT+BTRH?'):
protocol.send_response_line('+BTRH: 0')
protocol.send_response_line('OK')
elif line.startswith('AT+CLIP='):
protocol.send_response_line('OK')
elif line.startswith('AT+VGS='):
protocol.send_response_line('OK')
elif line.startswith('AT+BIA='):
protocol.send_response_line('OK')
elif line.startswith('AT+BVRA='):
protocol.send_response_line(
'+BVRA: 1,1,12AA,1,1,"Message 1 from Janina"'
)
elif line.startswith('AT+XEVENT='):
protocol.send_response_line('OK')
elif line.startswith('AT+XAPL='):
protocol.send_response_line('OK')
else:
print(color('UNSUPPORTED AT COMMAND', 'red'))
protocol.send_response_line('ERROR')
await hci_source.wait_for_termination()
await hci_transport.source.terminated
# -----------------------------------------------------------------------------

View File

@@ -37,7 +37,7 @@ hf_protocol: Optional[HfProtocol] = None
# -----------------------------------------------------------------------------
def on_dlc(dlc: rfcomm.DLC, configuration: hfp.Configuration):
def on_dlc(dlc: rfcomm.DLC, configuration: hfp.HfConfiguration):
print('*** DLC connected', dlc)
global hf_protocol
hf_protocol = HfProtocol(dlc, configuration)
@@ -84,19 +84,19 @@ def on_dlc(dlc: rfcomm.DLC, configuration: hfp.Configuration):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_classic_hfp.py <device-config> <transport-spec>')
print('example: run_classic_hfp.py classic2.json usb:04b4:f901')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Hands-Free profile configuration.
# TODO: load configuration from file.
configuration = hfp.Configuration(
configuration = hfp.HfConfiguration(
supported_hf_features=[
hfp.HfFeature.THREE_WAY_CALLING,
hfp.HfFeature.REMOTE_VOLUME_CONTROL,
@@ -116,7 +116,9 @@ async def main():
)
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Create and register a server
@@ -128,7 +130,9 @@ async def main():
# Advertise the HFP RFComm channel in the SDP
device.sdp_service_records = {
0x00010001: hfp.sdp_records(0x00010001, channel_number, configuration)
0x00010001: hfp.make_hf_sdp_records(
0x00010001, channel_number, configuration
)
}
# Let's go!
@@ -164,7 +168,7 @@ async def main():
await websockets.serve(serve, 'localhost', 8989)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -489,7 +489,7 @@ async def keyboard_device(hid_device):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: python run_hid_device.py <device-config> <transport-spec> <command>'
@@ -601,11 +601,13 @@ async def main():
asyncio.create_task(handle_virtual_cable_unplug())
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Create and register HID device
@@ -742,7 +744,7 @@ async def main():
print("Executing in Web mode")
await keyboard_device(hid_device)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -275,7 +275,7 @@ async def get_stream_reader(pipe) -> asyncio.StreamReader:
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: run_hid_host.py <device-config> <transport-spec> '
@@ -324,11 +324,13 @@ async def main():
asyncio.create_task(handle_virtual_cable_unplug())
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< CONNECTED')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Create HID host and start it
@@ -557,7 +559,7 @@ async def main():
# Interrupt Channel
await hid_host.connect_interrupt_channel()
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -57,18 +57,20 @@ def on_my_characteristic_subscription(peer, enabled):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_notifier.py <device-config> <transport-spec>')
print('example: run_notifier.py device1.json usb:0')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device to manage the host
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.listener = Listener(device)
# Add a few entries to the device's GATT server

View File

@@ -165,7 +165,7 @@ async def tcp_server(tcp_port, rfcomm_session):
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 5:
print(
'Usage: run_rfcomm_client.py <device-config> <transport-spec> '
@@ -178,11 +178,13 @@ async def main():
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
await device.power_on()
@@ -192,8 +194,8 @@ async def main():
connection = await device.connect(target_address, transport=BT_BR_EDR_TRANSPORT)
print(f'=== Connected to {connection.peer_address}!')
channel = sys.argv[4]
if channel == 'discover':
channel_str = sys.argv[4]
if channel_str == 'discover':
await list_rfcomm_channels(connection)
return
@@ -213,7 +215,7 @@ async def main():
rfcomm_mux = await rfcomm_client.start()
print('@@@ Started')
channel = int(channel)
channel = int(channel_str)
print(f'### Opening session for channel {channel}...')
try:
session = await rfcomm_mux.open_dlc(channel)
@@ -229,7 +231,7 @@ async def main():
tcp_port = int(sys.argv[5])
asyncio.create_task(tcp_server(tcp_port, session))
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -107,7 +107,7 @@ class TcpServer:
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 4:
print(
'Usage: run_rfcomm_server.py <device-config> <transport-spec> '
@@ -124,11 +124,13 @@ async def main():
uuid = 'E6D55659-C8B4-4B85-96BB-B1143AF6D3AE'
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.classic_enabled = True
# Create a TCP server
@@ -153,7 +155,7 @@ async def main():
await device.set_discoverable(True)
await device.set_connectable(True)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -20,27 +20,31 @@ import sys
import os
import logging
from bumble.colors import color
from bumble.hci import Address
from bumble.device import Device
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main():
async def main() -> None:
if len(sys.argv) < 2:
print('Usage: run_scanner.py <transport-spec> [filter]')
print('example: run_scanner.py usb:0')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[1]) as (hci_source, hci_sink):
async with await open_transport_or_link(sys.argv[1]) as hci_transport:
print('<<< connected')
filter_duplicates = len(sys.argv) == 3 and sys.argv[2] == 'filter'
device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
device = Device.with_hci(
'Bumble',
Address('F0:F1:F2:F3:F4:F5'),
hci_transport.source,
hci_transport.sink,
)
@device.on('advertisement')
def _(advertisement):
def on_adv(advertisement):
address_type_string = ('PUBLIC', 'RANDOM', 'PUBLIC_ID', 'RANDOM_ID')[
advertisement.address.address_type
]
@@ -67,10 +71,11 @@ async def main():
f'{advertisement.data.to_string(separator)}'
)
device.on('advertisement', on_adv)
await device.power_on()
await device.start_scanning(filter_duplicates=filter_duplicates)
await hci_source.wait_for_termination()
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------

View File

@@ -33,21 +33,20 @@ include_package_data = True
install_requires =
aiohttp ~= 3.8; platform_system!='Emscripten'
appdirs >= 1.4; platform_system!='Emscripten'
bt-test-interfaces >= 0.0.2; platform_system!='Emscripten'
click == 8.1.3; platform_system!='Emscripten'
click >= 8.1.3; platform_system!='Emscripten'
cryptography == 39; platform_system!='Emscripten'
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the
# versions available on PyPI. Relax the version requirement since it's better than being
# completely unable to import the package in case of version mismatch.
cryptography >= 39.0; platform_system=='Emscripten'
grpcio == 1.57.0; platform_system!='Emscripten'
grpcio >= 1.62.1; platform_system!='Emscripten'
humanize >= 4.6.0; platform_system!='Emscripten'
libusb1 >= 2.0.1; platform_system!='Emscripten'
libusb-package == 1.0.26.1; platform_system!='Emscripten'
platformdirs == 3.10.0; platform_system!='Emscripten'
platformdirs >= 3.10.0; platform_system!='Emscripten'
prompt_toolkit >= 3.0.16; platform_system!='Emscripten'
prettytable >= 3.6.0; platform_system!='Emscripten'
protobuf >= 3.12.4; platform_system!='Emscripten'
protobuf >= 4.24.2; platform_system!='Emscripten'
pyee >= 8.2.2
pyserial-asyncio >= 0.5; platform_system!='Emscripten'
pyserial >= 3.5; platform_system!='Emscripten'
@@ -83,12 +82,12 @@ build =
build >= 0.7
test =
pytest >= 8.0
pytest-asyncio == 0.21.1
pytest-asyncio >= 0.23.5
pytest-html >= 3.2.0
coverage >= 6.4
development =
black == 24.3
grpcio-tools >= 1.57.0
grpcio-tools >= 1.62.1
invoke >= 1.7.3
mypy == 1.8.0
nox >= 2022
@@ -98,8 +97,10 @@ development =
types-invoke >= 1.7.3
types-protobuf >= 4.21.0
avatar =
pandora-avatar == 0.0.8
rootcanal == 1.9.0 ; python_version>='3.10'
pandora-avatar == 0.0.9
rootcanal == 1.10.0 ; python_version>='3.10'
pandora =
bt-test-interfaces >= 0.0.6
documentation =
mkdocs >= 1.4.0
mkdocs-material >= 8.5.6

View File

@@ -19,8 +19,9 @@ import asyncio
import logging
import os
import pytest
import pytest_asyncio
from typing import Tuple
from typing import Tuple, Optional
from .test_utils import TwoDevices
from bumble import core
@@ -35,10 +36,73 @@ from bumble import hci
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def _default_hf_configuration() -> hfp.HfConfiguration:
return hfp.HfConfiguration(
supported_hf_features=[
hfp.HfFeature.CODEC_NEGOTIATION,
hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED,
hfp.HfFeature.HF_INDICATORS,
],
supported_hf_indicators=[
hfp.HfIndicator.ENHANCED_SAFETY,
hfp.HfIndicator.BATTERY_LEVEL,
],
supported_audio_codecs=[
hfp.AudioCodec.CVSD,
hfp.AudioCodec.MSBC,
],
)
# -----------------------------------------------------------------------------
def _default_hf_sdp_features() -> hfp.HfSdpFeature:
return hfp.HfSdpFeature.WIDE_BAND
# -----------------------------------------------------------------------------
def _default_ag_configuration() -> hfp.AgConfiguration:
return hfp.AgConfiguration(
supported_ag_features=[
hfp.AgFeature.HF_INDICATORS,
hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY,
hfp.AgFeature.REJECT_CALL,
hfp.AgFeature.CODEC_NEGOTIATION,
hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED,
],
supported_ag_indicators=[
hfp.AgIndicatorState.call(),
hfp.AgIndicatorState.service(),
hfp.AgIndicatorState.callsetup(),
hfp.AgIndicatorState.callsetup(),
hfp.AgIndicatorState.signal(),
hfp.AgIndicatorState.roam(),
hfp.AgIndicatorState.battchg(),
],
supported_hf_indicators=[
hfp.HfIndicator.ENHANCED_SAFETY,
hfp.HfIndicator.BATTERY_LEVEL,
],
supported_ag_call_hold_operations=[],
supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC],
)
# -----------------------------------------------------------------------------
def _default_ag_sdp_features() -> hfp.AgSdpFeature:
return hfp.AgSdpFeature.WIDE_BAND | hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
# -----------------------------------------------------------------------------
async def make_hfp_connections(
hf_config: hfp.Configuration,
) -> Tuple[hfp.HfProtocol, hfp.HfpProtocol]:
hf_config: Optional[hfp.HfConfiguration] = None,
ag_config: Optional[hfp.AgConfiguration] = None,
):
if not hf_config:
hf_config = _default_hf_configuration()
if not ag_config:
ag_config = _default_ag_configuration()
# Setup devices
devices = TwoDevices()
await devices.setup_connection()
@@ -55,38 +119,200 @@ async def make_hfp_connections(
# Setup HFP connection
hf = hfp.HfProtocol(client_dlc, hf_config)
ag = hfp.HfpProtocol(server_dlc)
return hf, ag
ag = hfp.AgProtocol(server_dlc, ag_config)
await hf.initiate_slc()
return (hf, ag)
# -----------------------------------------------------------------------------
@pytest_asyncio.fixture
async def hfp_connections():
hf, ag = await make_hfp_connections()
hf_loop_task = asyncio.create_task(hf.run())
try:
yield (hf, ag)
finally:
# Close the coroutine.
hf.unsolicited_queue.put_nowait(None)
await hf_loop_task
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_slc():
hf_config = hfp.Configuration(
supported_hf_features=[], supported_hf_indicators=[], supported_audio_codecs=[]
)
hf, ag = await make_hfp_connections(hf_config)
async def ag_loop():
while line := await ag.next_line():
if line.startswith('AT+BRSF'):
ag.send_response_line('+BRSF: 0')
elif line.startswith('AT+CIND=?'):
ag.send_response_line(
'+CIND: ("call",(0,1)),("callsetup",(0-3)),("service",(0-1)),'
'("signal",(0-5)),("roam",(0,1)),("battchg",(0-5)),'
'("callheld",(0-2))'
async def test_slc_with_minimal_features():
hf, ag = await make_hfp_connections(
hfp.HfConfiguration(
supported_audio_codecs=[],
supported_hf_features=[],
supported_hf_indicators=[],
),
hfp.AgConfiguration(
supported_ag_call_hold_operations=[],
supported_ag_features=[],
supported_ag_indicators=[
hfp.AgIndicatorState(
indicator=hfp.AgIndicator.CALL,
supported_values={0, 1},
current_status=0,
)
elif line.startswith('AT+CIND?'):
ag.send_response_line('+CIND: 0,0,1,4,1,5,0')
ag.send_response_line('OK')
],
supported_hf_indicators=[],
supported_audio_codecs=[],
),
)
ag_task = asyncio.create_task(ag_loop())
assert hf.supported_ag_features == ag.supported_ag_features
assert hf.supported_hf_features == ag.supported_hf_features
for a, b in zip(hf.ag_indicators, ag.ag_indicators):
assert a.indicator == b.indicator
assert a.current_status == b.current_status
await hf.initiate_slc()
ag_task.cancel()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_slc(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
assert hf.supported_ag_features == ag.supported_ag_features
assert hf.supported_hf_features == ag.supported_hf_features
for a, b in zip(hf.ag_indicators, ag.ag_indicators):
assert a.indicator == b.indicator
assert a.current_status == b.current_status
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
hf.on('ag_indicator', future.set_result)
ag.update_ag_indicator(hfp.AgIndicator.CALL, 1)
indicator: hfp.AgIndicatorState = await future
assert indicator.current_status == 1
assert indicator.indicator == hfp.AgIndicator.CALL
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
ag.on('hf_indicator', future.set_result)
await hf.execute_command('AT+BIEV=2,100')
indicator: hfp.HfIndicatorState = await future
assert indicator.current_status == 100
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_codec_negotiation(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
futures = [
asyncio.get_running_loop().create_future(),
asyncio.get_running_loop().create_future(),
]
hf.on('codec_negotiation', futures[0].set_result)
ag.on('codec_negotiation', futures[1].set_result)
await ag.negotiate_codec(hfp.AudioCodec.MSBC)
assert await futures[0] == await futures[1]
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_dial(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
NUMBER = 'ATD123456789'
future = asyncio.get_running_loop().create_future()
ag.on('dial', future.set_result)
await hf.execute_command(f'ATD{NUMBER}')
number: str = await future
assert number == NUMBER
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_answer(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
ag.on('answer', lambda: future.set_result(None))
await hf.answer_incoming_call()
await future
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_reject_incoming_call(
hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]
):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
ag.on('hang_up', lambda: future.set_result(None))
await hf.reject_incoming_call()
await future
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_terminate_call(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]):
hf, ag = hfp_connections
future = asyncio.get_running_loop().create_future()
ag.on('hang_up', lambda: future.set_result(None))
await hf.terminate_call()
await future
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_sdp_record():
devices = TwoDevices()
await devices.setup_connection()
devices[0].sdp_service_records[1] = hfp.make_hf_sdp_records(
1, 2, _default_hf_configuration(), hfp.ProfileVersion.V1_8
)
assert await hfp.find_hf_sdp_record(devices.connections[1]) == (
2,
hfp.ProfileVersion.V1_8,
_default_hf_sdp_features(),
)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_sdp_record():
devices = TwoDevices()
await devices.setup_connection()
devices[0].sdp_service_records[1] = hfp.make_ag_sdp_records(
1, 2, _default_ag_configuration(), hfp.ProfileVersion.V1_8
)
assert await hfp.find_ag_sdp_record(devices.connections[1]) == (
2,
hfp.ProfileVersion.V1_8,
_default_ag_sdp_features(),
)
# -----------------------------------------------------------------------------

View File

@@ -0,0 +1,64 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import os
import pytest
import socket
import unittest
from unittest.mock import ANY, patch
from bumble.transport.tcp_server import (
open_tcp_server_transport,
open_tcp_server_transport_with_socket,
)
class OpenTcpServerTransportTests(unittest.TestCase):
def setUp(self):
self.patcher = patch('bumble.transport.tcp_server._create_server')
self.mock_create_server = self.patcher.start()
def tearDown(self):
self.patcher.stop()
def test_open_with_spec(self):
asyncio.run(open_tcp_server_transport('localhost:32100'))
self.mock_create_server.assert_awaited_once_with(
ANY, host='localhost', port=32100
)
def test_open_with_port_only_spec(self):
asyncio.run(open_tcp_server_transport('_:32100'))
self.mock_create_server.assert_awaited_once_with(ANY, host=None, port=32100)
def test_open_with_socket(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
asyncio.run(open_tcp_server_transport_with_socket(sock=sock))
self.mock_create_server.assert_awaited_once_with(ANY, sock=sock)
@pytest.mark.skipif(
not os.environ.get('PYTEST_NOSKIP', 0),
reason='''\
Not hermetic. Should only run manually with
$ PYTEST_NOSKIP=1 pytest tests
''',
)
def test_open_with_real_socket():
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(('localhost', 0))
port = sock.getsockname()[1]
assert port != 0
asyncio.run(open_tcp_server_transport_with_socket(sock=sock))