Compare commits

...

13 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod c034297bc0 update to black formatter 25.1 2025-08-02 21:11:34 -07:00
Gilles Boccon-Gibod a1eff958e6 do not wait for display 2025-08-02 21:10:45 -07:00
Gilles Boccon-Gibod efdc770fde Merge pull request #737 from leifdreizler/fix-spdx-license
Update license field to use proper SPDX identifier
2025-08-02 11:22:58 -07:00
Leif 357d7f9c22 Update pyproject.toml 2025-08-02 08:18:36 -04:00
Leif Dreizler 3bc08b4e0d Update license field to use proper SPDX identifier
This changes the license field to be a valid [SPDX identifier](https://spdx.org/licenses) aligning with [PEP 639](https://peps.python.org/pep-0639/#project-source-metadata). This populates the `license_expression` field in the PyPI API and is used by downstream tools including deps.dev

These changes were generated by Claude after reviewing the license and manifest files in your repository, but opened and reviewed by me. Please let me know if the analysis is incorrect and thanks for being an OSS maintainer.
2025-08-01 20:19:25 -04:00
Gilles Boccon-Gibod 1dc0950177 Merge pull request #730 from google/gbg/apple-media-service
basic AMS implementation
2025-07-29 22:34:25 -07:00
zxzxwu df0fd74533 Merge pull request #733 from zxzxwu/l2cap
Fix L2CAP_Control_Frame errors
2025-07-30 13:12:44 +08:00
Josh Wu 822f97fa84 Fix L2CAP errors 2025-07-30 12:00:20 +08:00
Gilles Boccon-Gibod 4a6b0ef840 Merge pull request #732 from google/gbg/722
fix #722
2025-07-29 10:50:02 -07:00
Gilles Boccon-Gibod a6ead0147e fix #722 2025-07-28 13:36:55 -07:00
Gilles Boccon-Gibod 0665e9ca5c Merge pull request #731 from google/gbg/common-logger
use common logger
2025-07-28 10:22:30 -07:00
Gilles Boccon-Gibod bf8a2cdcb5 add discrete command methods 2025-07-26 20:24:55 -07:00
Gilles Boccon-Gibod 4bf7448a01 basic AMS implementation 2025-07-22 14:57:52 -07:00
14 changed files with 816 additions and 224 deletions
+6 -4
View File
@@ -94,15 +94,17 @@
"ycursor"
],
"[python]": {
"editor.rulers": [88]
"editor.rulers": [88],
"editor.defaultFormatter": "ms-python.black-formatter"
},
"python.formatting.provider": "black",
"pylint.importStrategy": "useBundled",
"python.testing.pytestArgs": [
"."
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python-envs.defaultEnvManager": "ms-python.python:system",
"python-envs.pythonProjects": []
"python-envs.pythonProjects": [],
"python.terminal.launchArgs": [
]
}
+1 -1
View File
@@ -1117,7 +1117,7 @@ class Protocol(utils.EventEmitter):
@staticmethod
def _check_vendor_dependent_frame(
frame: Union[avc.VendorDependentCommandFrame, avc.VendorDependentResponseFrame]
frame: Union[avc.VendorDependentCommandFrame, avc.VendorDependentResponseFrame],
) -> bool:
if frame.company_id != AVRCP_BLUETOOTH_SIG_COMPANY_ID:
logger.debug("unsupported company id, ignoring")
+32 -22
View File
@@ -370,6 +370,12 @@ class Controller:
return connection
return None
def find_peripheral_connection_by_handle(self, handle):
for connection in self.peripheral_connections.values():
if connection.handle == handle:
return connection
return None
def find_classic_connection_by_handle(self, handle):
for connection in self.classic_connections.values():
if connection.handle == handle:
@@ -414,7 +420,7 @@ class Controller:
)
)
def on_link_central_disconnected(self, peer_address, reason):
def on_link_disconnected(self, peer_address, reason):
'''
Called when an active disconnection occurs from a peer
'''
@@ -431,6 +437,17 @@ class Controller:
# Remove the connection
del self.peripheral_connections[peer_address]
elif connection := self.central_connections.get(peer_address):
self.send_hci_packet(
HCI_Disconnection_Complete_Event(
status=HCI_SUCCESS,
connection_handle=connection.handle,
reason=reason,
)
)
# Remove the connection
del self.central_connections[peer_address]
else:
logger.warning(f'!!! No peripheral connection found for {peer_address}')
@@ -479,7 +496,7 @@ class Controller:
)
)
def on_link_peripheral_disconnection_complete(self, disconnection_command, status):
def on_link_disconnection_complete(self, disconnection_command, status):
'''
Called when a disconnection has been completed
'''
@@ -499,26 +516,11 @@ class Controller:
):
logger.debug(f'CENTRAL Connection removed: {connection}')
del self.central_connections[connection.peer_address]
def on_link_peripheral_disconnected(self, peer_address):
'''
Called when a connection to a peripheral is broken
'''
# Send a disconnection complete event
if connection := self.central_connections.get(peer_address):
self.send_hci_packet(
HCI_Disconnection_Complete_Event(
status=HCI_SUCCESS,
connection_handle=connection.handle,
reason=HCI_CONNECTION_TIMEOUT_ERROR,
)
)
# Remove the connection
del self.central_connections[peer_address]
else:
logger.warning(f'!!! No central connection found for {peer_address}')
elif connection := self.find_peripheral_connection_by_handle(
disconnection_command.connection_handle
):
logger.debug(f'PERIPHERAL Connection removed: {connection}')
del self.peripheral_connections[connection.peer_address]
def on_link_encrypted(self, peer_address, _rand, _ediv, _ltk):
# For now, just setup the encryption without asking the host
@@ -877,6 +879,14 @@ class Controller:
else:
# Remove the connection
del self.central_connections[connection.peer_address]
elif connection := self.find_peripheral_connection_by_handle(handle):
if self.link:
self.link.disconnect(
self.random_address, connection.peer_address, command
)
else:
# Remove the connection
del self.peripheral_connections[connection.peer_address]
elif connection := self.find_classic_connection_by_handle(handle):
if self.link:
self.link.classic_disconnect(
+17 -30
View File
@@ -213,7 +213,7 @@ class L2CAP_Control_Frame:
fields: ClassVar[hci.Fields] = ()
code: int = dataclasses.field(default=0, init=False)
name: str = dataclasses.field(default='', init=False)
_data: Optional[bytes] = dataclasses.field(default=None, init=False)
_payload: Optional[bytes] = dataclasses.field(default=None, init=False)
identifier: int
@@ -223,7 +223,8 @@ class L2CAP_Control_Frame:
subclass = L2CAP_Control_Frame.classes.get(code)
if subclass is None:
instance = L2CAP_Control_Frame(pdu)
instance = L2CAP_Control_Frame(identifier=identifier)
instance.payload = pdu[4:]
instance.code = CommandCode(code)
instance.name = instance.code.name
return instance
@@ -232,11 +233,11 @@ class L2CAP_Control_Frame:
identifier=identifier,
)
frame.identifier = identifier
frame.data = pdu[4:]
if length != len(pdu):
frame.payload = pdu[4:]
if length != len(frame.payload):
logger.warning(
color(
f'!!! length mismatch: expected {len(pdu) - 4} but got {length}',
f'!!! length mismatch: expected {length} but got {len(frame.payload)}',
'red',
)
)
@@ -273,34 +274,20 @@ class L2CAP_Control_Frame:
return subclass
def __init__(self, pdu: Optional[bytes] = None, **kwargs) -> None:
self.identifier = kwargs.get('identifier', 0)
if self.fields:
if kwargs:
hci.HCI_Object.init_from_fields(self, self.fields, kwargs)
if pdu is None:
data = hci.HCI_Object.dict_to_bytes(kwargs, self.fields)
pdu = (
bytes([self.code, self.identifier])
+ struct.pack('<H', len(data))
+ data
)
self.data = pdu[4:] if pdu else b''
@property
def data(self) -> bytes:
if self._data is None:
self._data = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields)
return self._data
def payload(self) -> bytes:
if self._payload is None:
self._payload = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields)
return self._payload
@data.setter
def data(self, parameters: bytes) -> None:
self._data = parameters
@payload.setter
def payload(self, payload: bytes) -> None:
self._payload = payload
def __bytes__(self) -> bytes:
return (
struct.pack('<BBH', self.code, self.identifier, len(self.data) + 4)
+ self.data
struct.pack('<BBH', self.code, self.identifier, len(self.payload))
+ self.payload
)
def __str__(self) -> str:
@@ -308,8 +295,8 @@ class L2CAP_Control_Frame:
if fields := getattr(self, 'fields', None):
result += ':\n' + hci.HCI_Object.format_fields(self.__dict__, fields, ' ')
else:
if len(self.data) > 1:
result += f': {self.data.hex()}'
if len(self.payload) > 1:
result += f': {self.payload.hex()}'
return result
+10 -10
View File
@@ -159,29 +159,29 @@ class LocalLink:
asyncio.get_running_loop().call_soon(self.on_connection_complete)
def on_disconnection_complete(
self, central_address, peripheral_address, disconnect_command
self, initiating_address, target_address, disconnect_command
):
# Find the controller that initiated the disconnection
if not (central_controller := self.find_controller(central_address)):
if not (initiating_controller := self.find_controller(initiating_address)):
logger.warning('!!! Initiating controller not found')
return
# Disconnect from the first controller with a matching address
if peripheral_controller := self.find_controller(peripheral_address):
peripheral_controller.on_link_central_disconnected(
central_address, disconnect_command.reason
if target_controller := self.find_controller(target_address):
target_controller.on_link_disconnected(
initiating_address, disconnect_command.reason
)
central_controller.on_link_peripheral_disconnection_complete(
initiating_controller.on_link_disconnection_complete(
disconnect_command, HCI_SUCCESS
)
def disconnect(self, central_address, peripheral_address, disconnect_command):
def disconnect(self, initiating_address, target_address, disconnect_command):
logger.debug(
f'$$$ DISCONNECTION {central_address} -> '
f'{peripheral_address}: reason = {disconnect_command.reason}'
f'$$$ DISCONNECTION {initiating_address} -> '
f'{target_address}: reason = {disconnect_command.reason}'
)
args = [central_address, peripheral_address, disconnect_command]
args = [initiating_address, target_address, disconnect_command]
asyncio.get_running_loop().call_soon(self.on_disconnection_complete, *args)
# pylint: disable=too-many-arguments
+1 -1
View File
@@ -49,7 +49,7 @@ _SERVICERS_HOOKS: list[Callable[[PandoraDevice, Config, grpc.aio.Server], None]]
def register_servicer_hook(
hook: Callable[[PandoraDevice, Config, grpc.aio.Server], None]
hook: Callable[[PandoraDevice, Config, grpc.aio.Server], None],
) -> None:
_SERVICERS_HOOKS.append(hook)
+404
View File
@@ -0,0 +1,404 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Apple Media Service (AMS).
"""
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import dataclasses
import enum
import logging
from typing import Optional, Iterable, Union
from bumble.device import Peer
from bumble.gatt import (
Characteristic,
GATT_AMS_SERVICE,
GATT_AMS_REMOTE_COMMAND_CHARACTERISTIC,
GATT_AMS_ENTITY_UPDATE_CHARACTERISTIC,
GATT_AMS_ENTITY_ATTRIBUTE_CHARACTERISTIC,
TemplateService,
)
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy, ServiceProxy
from bumble import utils
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Protocol
# -----------------------------------------------------------------------------
class RemoteCommandId(utils.OpenIntEnum):
PLAY = 0
PAUSE = 1
TOGGLE_PLAY_PAUSE = 2
NEXT_TRACK = 3
PREVIOUS_TRACK = 4
VOLUME_UP = 5
VOLUME_DOWN = 6
ADVANCE_REPEAT_MODE = 7
ADVANCE_SHUFFLE_MODE = 8
SKIP_FORWARD = 9
SKIP_BACKWARD = 10
LIKE_TRACK = 11
DISLIKE_TRACK = 12
BOOKMARK_TRACK = 13
class EntityId(utils.OpenIntEnum):
PLAYER = 0
QUEUE = 1
TRACK = 2
class ActionId(utils.OpenIntEnum):
POSITIVE = 0
NEGATIVE = 1
class EntityUpdateFlags(enum.IntFlag):
TRUNCATED = 1
class PlayerAttributeId(utils.OpenIntEnum):
NAME = 0
PLAYBACK_INFO = 1
VOLUME = 2
class QueueAttributeId(utils.OpenIntEnum):
INDEX = 0
COUNT = 1
SHUFFLE_MODE = 2
REPEAT_MODE = 3
class ShuffleMode(utils.OpenIntEnum):
OFF = 0
ONE = 1
ALL = 2
class RepeatMode(utils.OpenIntEnum):
OFF = 0
ONE = 1
ALL = 2
class TrackAttributeId(utils.OpenIntEnum):
ARTIST = 0
ALBUM = 1
TITLE = 2
DURATION = 3
class PlaybackState(utils.OpenIntEnum):
PAUSED = 0
PLAYING = 1
REWINDING = 2
FAST_FORWARDING = 3
@dataclasses.dataclass
class PlaybackInfo:
playback_state: PlaybackState = PlaybackState.PAUSED
playback_rate: float = 1.0
elapsed_time: float = 0.0
# -----------------------------------------------------------------------------
# GATT Server-side
# -----------------------------------------------------------------------------
class Ams(TemplateService):
UUID = GATT_AMS_SERVICE
remote_command_characteristic: Characteristic
entity_update_characteristic: Characteristic
entity_attribute_characteristic: Characteristic
def __init__(self) -> None:
# TODO not the final implementation
self.remote_command_characteristic = Characteristic(
GATT_AMS_REMOTE_COMMAND_CHARACTERISTIC,
Characteristic.Properties.NOTIFY
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
Characteristic.Permissions.WRITEABLE,
)
# TODO not the final implementation
self.entity_update_characteristic = Characteristic(
GATT_AMS_ENTITY_UPDATE_CHARACTERISTIC,
Characteristic.Properties.NOTIFY | Characteristic.Properties.WRITE,
Characteristic.Permissions.WRITEABLE,
)
# TODO not the final implementation
self.entity_attribute_characteristic = Characteristic(
GATT_AMS_ENTITY_ATTRIBUTE_CHARACTERISTIC,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
Characteristic.Permissions.WRITEABLE | Characteristic.Permissions.READABLE,
)
super().__init__(
[
self.remote_command_characteristic,
self.entity_update_characteristic,
self.entity_attribute_characteristic,
]
)
# -----------------------------------------------------------------------------
# GATT Client-side
# -----------------------------------------------------------------------------
class AmsProxy(ProfileServiceProxy):
SERVICE_CLASS = Ams
# NOTE: these don't use adapters, because the format for write and notifications
# are different.
remote_command: CharacteristicProxy[bytes]
entity_update: CharacteristicProxy[bytes]
entity_attribute: CharacteristicProxy[bytes]
def __init__(self, service_proxy: ServiceProxy):
self.remote_command = service_proxy.get_required_characteristic_by_uuid(
GATT_AMS_REMOTE_COMMAND_CHARACTERISTIC
)
self.entity_update = service_proxy.get_required_characteristic_by_uuid(
GATT_AMS_ENTITY_UPDATE_CHARACTERISTIC
)
self.entity_attribute = service_proxy.get_required_characteristic_by_uuid(
GATT_AMS_ENTITY_ATTRIBUTE_CHARACTERISTIC
)
class AmsClient(utils.EventEmitter):
EVENT_SUPPORTED_COMMANDS = "supported_commands"
EVENT_PLAYER_NAME = "player_name"
EVENT_PLAYER_PLAYBACK_INFO = "player_playback_info"
EVENT_PLAYER_VOLUME = "player_volume"
EVENT_QUEUE_COUNT = "queue_count"
EVENT_QUEUE_INDEX = "queue_index"
EVENT_QUEUE_SHUFFLE_MODE = "queue_shuffle_mode"
EVENT_QUEUE_REPEAT_MODE = "queue_repeat_mode"
EVENT_TRACK_ARTIST = "track_artist"
EVENT_TRACK_ALBUM = "track_album"
EVENT_TRACK_TITLE = "track_title"
EVENT_TRACK_DURATION = "track_duration"
supported_commands: set[RemoteCommandId]
player_name: str = ""
player_playback_info: PlaybackInfo = PlaybackInfo(PlaybackState.PAUSED, 0.0, 0.0)
player_volume: float = 1.0
queue_count: int = 0
queue_index: int = 0
queue_shuffle_mode: ShuffleMode = ShuffleMode.OFF
queue_repeat_mode: RepeatMode = RepeatMode.OFF
track_artist: str = ""
track_album: str = ""
track_title: str = ""
track_duration: float = 0.0
def __init__(self, ams_proxy: AmsProxy) -> None:
super().__init__()
self._ams_proxy = ams_proxy
self._started = False
self._read_attribute_semaphore = asyncio.Semaphore()
self.supported_commands = set()
@classmethod
async def for_peer(cls, peer: Peer) -> Optional[AmsClient]:
ams_proxy = await peer.discover_service_and_create_proxy(AmsProxy)
if ams_proxy is None:
return None
return cls(ams_proxy)
async def start(self) -> None:
logger.debug("subscribing to remote command characteristic")
await self._ams_proxy.remote_command.subscribe(
self._on_remote_command_notification
)
logger.debug("subscribing to entity update characteristic")
await self._ams_proxy.entity_update.subscribe(
lambda data: utils.AsyncRunner.spawn(
self._on_entity_update_notification(data)
)
)
self._started = True
async def stop(self) -> None:
await self._ams_proxy.remote_command.unsubscribe(
self._on_remote_command_notification
)
await self._ams_proxy.entity_update.unsubscribe(
self._on_entity_update_notification
)
self._started = False
async def observe(
self,
entity: EntityId,
attributes: Iterable[
Union[PlayerAttributeId, QueueAttributeId, TrackAttributeId]
],
) -> None:
await self._ams_proxy.entity_update.write_value(
bytes([entity] + list(attributes)), with_response=True
)
async def command(self, command: RemoteCommandId) -> None:
await self._ams_proxy.remote_command.write_value(
bytes([command]), with_response=True
)
async def play(self) -> None:
await self.command(RemoteCommandId.PLAY)
async def pause(self) -> None:
await self.command(RemoteCommandId.PAUSE)
async def toggle_play_pause(self) -> None:
await self.command(RemoteCommandId.TOGGLE_PLAY_PAUSE)
async def next_track(self) -> None:
await self.command(RemoteCommandId.NEXT_TRACK)
async def previous_track(self) -> None:
await self.command(RemoteCommandId.PREVIOUS_TRACK)
async def volume_up(self) -> None:
await self.command(RemoteCommandId.VOLUME_UP)
async def volume_down(self) -> None:
await self.command(RemoteCommandId.VOLUME_DOWN)
async def advance_repeat_mode(self) -> None:
await self.command(RemoteCommandId.ADVANCE_REPEAT_MODE)
async def advance_shuffle_mode(self) -> None:
await self.command(RemoteCommandId.ADVANCE_SHUFFLE_MODE)
async def skip_forward(self) -> None:
await self.command(RemoteCommandId.SKIP_FORWARD)
async def skip_backward(self) -> None:
await self.command(RemoteCommandId.SKIP_BACKWARD)
async def like_track(self) -> None:
await self.command(RemoteCommandId.LIKE_TRACK)
async def dislike_track(self) -> None:
await self.command(RemoteCommandId.DISLIKE_TRACK)
async def bookmark_track(self) -> None:
await self.command(RemoteCommandId.BOOKMARK_TRACK)
def _on_remote_command_notification(self, data: bytes) -> None:
supported_commands = [RemoteCommandId(command) for command in data]
logger.debug(
f"supported commands: {[command.name for command in supported_commands]}"
)
for command in supported_commands:
self.supported_commands.add(command)
self.emit(self.EVENT_SUPPORTED_COMMANDS)
async def _on_entity_update_notification(self, data: bytes) -> None:
entity = EntityId(data[0])
flags = EntityUpdateFlags(data[2])
value = data[3:]
if flags & EntityUpdateFlags.TRUNCATED:
logger.debug("truncated attribute, fetching full value")
# Write the entity and attribute we're interested in
# (protected by a semaphore, so that we only read one attribute at a time)
async with self._read_attribute_semaphore:
await self._ams_proxy.entity_attribute.write_value(
data[:2], with_response=True
)
value = await self._ams_proxy.entity_attribute.read_value()
if entity == EntityId.PLAYER:
player_attribute = PlayerAttributeId(data[1])
if player_attribute == PlayerAttributeId.NAME:
self.player_name = value.decode()
self.emit(self.EVENT_PLAYER_NAME)
elif player_attribute == PlayerAttributeId.PLAYBACK_INFO:
playback_state_str, playback_rate_str, elapsed_time_str = (
value.decode().split(",")
)
self.player_playback_info = PlaybackInfo(
PlaybackState(int(playback_state_str)),
float(playback_rate_str),
float(elapsed_time_str),
)
self.emit(self.EVENT_PLAYER_PLAYBACK_INFO)
elif player_attribute == PlayerAttributeId.VOLUME:
self.player_volume = float(value.decode())
self.emit(self.EVENT_PLAYER_VOLUME)
else:
logger.warning(f"received unknown player attribute {player_attribute}")
elif entity == EntityId.QUEUE:
queue_attribute = QueueAttributeId(data[1])
if queue_attribute == QueueAttributeId.COUNT:
self.queue_count = int(value)
self.emit(self.EVENT_QUEUE_COUNT)
elif queue_attribute == QueueAttributeId.INDEX:
self.queue_index = int(value)
self.emit(self.EVENT_QUEUE_INDEX)
elif queue_attribute == QueueAttributeId.REPEAT_MODE:
self.queue_repeat_mode = RepeatMode(int(value))
self.emit(self.EVENT_QUEUE_REPEAT_MODE)
elif queue_attribute == QueueAttributeId.SHUFFLE_MODE:
self.queue_shuffle_mode = ShuffleMode(int(value))
self.emit(self.EVENT_QUEUE_SHUFFLE_MODE)
else:
logger.warning(f"received unknown queue attribute {queue_attribute}")
elif entity == EntityId.TRACK:
track_attribute = TrackAttributeId(data[1])
if track_attribute == TrackAttributeId.ARTIST:
self.track_artist = value.decode()
self.emit(self.EVENT_TRACK_ARTIST)
elif track_attribute == TrackAttributeId.ALBUM:
self.track_album = value.decode()
self.emit(self.EVENT_TRACK_ALBUM)
elif track_attribute == TrackAttributeId.TITLE:
self.track_title = value.decode()
self.emit(self.EVENT_TRACK_TITLE)
elif track_attribute == TrackAttributeId.DURATION:
self.track_duration = float(value.decode())
self.emit(self.EVENT_TRACK_DURATION)
else:
logger.warning(f"received unknown track attribute {track_attribute}")
else:
logger.warning(f"received unknown attribute ID {data[1]}")
+3 -1
View File
@@ -946,7 +946,9 @@ class Session:
self.tk = self.passkey.to_bytes(16, byteorder='little')
logger.debug(f'TK from passkey = {self.tk.hex()}')
await self.pairing_config.delegate.display_number(self.passkey, digits=6)
self.connection.cancel_on_disconnection(
self.pairing_config.delegate.display_number(self.passkey, digits=6)
)
def input_passkey(self, next_steps: Optional[Callable[[], None]] = None) -> None:
# Prompt the user for the passkey displayed on the peer
+220
View File
@@ -0,0 +1,220 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.transport import open_transport
from bumble.profiles.ams import (
AmsClient,
EntityId,
PlayerAttributeId,
QueueAttributeId,
TrackAttributeId,
RemoteCommandId,
)
# -----------------------------------------------------------------------------
async def handle_command_client(
ams_client: AmsClient, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
while True:
command = (await reader.readline()).decode("utf-8")
if not command.endswith("\n"):
print("command client terminated")
return
command = command.strip()
try:
if command.upper() in [member.name for member in RemoteCommandId]:
await ams_client.command(RemoteCommandId[command.upper()])
continue
except Exception as error:
writer.write(f"ERROR: {error}\n".encode("utf-8"))
writer.write(f"unknown command {command}\n".encode("utf-8"))
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_ams_client.py <device-config> <transport-spec> '
'<bluetooth-address> <mtu>'
)
print('example: run_ams_client.py device1.json usb:0 E1:CA:72:48:C4:E8 512')
return
device_config, transport_spec, bluetooth_address, mtu = sys.argv[1:]
print('<<< connecting to HCI...')
async with await open_transport(transport_spec) as hci_transport:
print('<<< connected')
# Create a device to manage the host, with a custom listener
device = Device.from_config_file_with_hci(
device_config, hci_transport.source, hci_transport.sink
)
await device.power_on()
# Connect to the peer
print(f'=== Connecting to {bluetooth_address}...')
connection = await device.connect(bluetooth_address)
print(f'=== Connected: {connection}')
await connection.encrypt()
peer = Peer(connection)
mtu_int = int(mtu)
if mtu_int:
new_mtu = await peer.request_mtu(mtu_int)
print(f'ATT MTU = {new_mtu}')
ams_client = await AmsClient.for_peer(peer)
if ams_client is None:
print("!!! no AMS service found")
return
# Register event handlers
def on_supported_commands():
print(
color("Supported commands:", "magenta"),
", ".join([command.name for command in ams_client.supported_commands]),
)
ams_client.on(AmsClient.EVENT_SUPPORTED_COMMANDS, on_supported_commands)
def on_player_name():
print(color("Player Name:", "green"), ams_client.player_name)
ams_client.on(AmsClient.EVENT_PLAYER_NAME, on_player_name)
def on_player_playback_info():
print(
color("Playback State:", "green"),
ams_client.player_playback_info.playback_state.name,
)
print(
color("Playback Rate: ", "green"),
ams_client.player_playback_info.playback_rate,
)
print(
color("Elapsed Time: ", "green"),
ams_client.player_playback_info.elapsed_time,
)
ams_client.on(AmsClient.EVENT_PLAYER_PLAYBACK_INFO, on_player_playback_info)
def on_player_volume():
print(color("Volume:", "green"), ams_client.player_volume)
ams_client.on(AmsClient.EVENT_PLAYER_VOLUME, on_player_volume)
def on_queue_count():
print(color("Queue Count:", "yellow"), ams_client.queue_count)
ams_client.on(AmsClient.EVENT_QUEUE_COUNT, on_queue_count)
def on_queue_index():
print(color("Queue Index:", "yellow"), ams_client.queue_index)
ams_client.on(AmsClient.EVENT_QUEUE_INDEX, on_queue_index)
def on_queue_shuffle_mode():
print(
color("Queue Shuffle Mode:", "yellow"),
ams_client.queue_shuffle_mode.name,
)
ams_client.on(AmsClient.EVENT_QUEUE_SHUFFLE_MODE, on_queue_shuffle_mode)
def on_queue_repeat_mode():
print(
color("Queue Repeat Mode:", "yellow"), ams_client.queue_repeat_mode.name
)
ams_client.on(AmsClient.EVENT_QUEUE_REPEAT_MODE, on_queue_repeat_mode)
def on_track_artist():
print(color("Track Artist:", "cyan"), ams_client.track_artist)
ams_client.on(AmsClient.EVENT_TRACK_ARTIST, on_track_artist)
def on_track_album():
print(color("Track Album:", "cyan"), ams_client.track_album)
ams_client.on(AmsClient.EVENT_TRACK_ALBUM, on_track_album)
def on_track_title():
print(color("Track Title:", "cyan"), ams_client.track_title)
ams_client.on(AmsClient.EVENT_TRACK_TITLE, on_track_title)
def on_track_duration():
print(color("Track Duration:", "cyan"), ams_client.track_duration)
ams_client.on(AmsClient.EVENT_TRACK_DURATION, on_track_duration)
# Start the client
await ams_client.start()
# Observe the player, queue and track
await ams_client.observe(
EntityId.PLAYER,
[
PlayerAttributeId.NAME,
PlayerAttributeId.PLAYBACK_INFO,
PlayerAttributeId.VOLUME,
],
)
await ams_client.observe(
EntityId.QUEUE,
[
QueueAttributeId.COUNT,
QueueAttributeId.INDEX,
QueueAttributeId.REPEAT_MODE,
QueueAttributeId.SHUFFLE_MODE,
],
)
await ams_client.observe(
EntityId.TRACK,
[
TrackAttributeId.ALBUM,
TrackAttributeId.ARTIST,
TrackAttributeId.DURATION,
TrackAttributeId.TITLE,
],
)
# Accept a TCP connection to handle commands.
tcp_server = await asyncio.start_server(
lambda reader, writer: handle_command_client(ams_client, reader, writer),
'127.0.0.1',
9000,
)
print("Accepting command client on port 9000")
async with tcp_server:
await tcp_server.serve_forever()
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(main())
+3 -1
View File
@@ -7,6 +7,8 @@ name = "bumble"
dynamic = ["version"]
description = "Bluetooth Stack for Apps, Emulation, Test and Experimentation"
readme = "README.md"
license = "Apache-2.0"
license-files = ["LICENSE"]
authors = [{ name = "Google", email = "bumble-dev@google.com" }]
requires-python = ">=3.9"
dependencies = [
@@ -42,7 +44,7 @@ test = [
"coverage >= 6.4",
]
development = [
"black == 24.3",
"black ~= 25.1",
"bt-test-interfaces >= 0.0.6",
"grpcio-tools >= 1.62.1",
"invoke >= 1.7.3",
+9 -9
View File
@@ -237,7 +237,7 @@ async def test_hf_indicator(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtoco
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_codec_negotiation(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
@@ -281,7 +281,7 @@ async def test_answer(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]):
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_reject_incoming_call(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
@@ -307,7 +307,7 @@ async def test_terminate_call(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProto
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_query_calls_without_calls(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
@@ -317,7 +317,7 @@ async def test_query_calls_without_calls(
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_query_calls_with_calls(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
ag.calls.append(
@@ -418,7 +418,7 @@ async def test_speaker_volume(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProto
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_microphone_volume(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
microphone_volume_future = asyncio.get_running_loop().create_future()
@@ -448,7 +448,7 @@ async def test_cli_notification(hfp_connections: tuple[hfp.HfProtocol, hfp.AgPro
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_voice_recognition_from_hf(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
voice_recognition_future = asyncio.get_running_loop().create_future()
@@ -462,7 +462,7 @@ async def test_voice_recognition_from_hf(
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_voice_recognition_from_ag(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
voice_recognition_future = asyncio.get_running_loop().create_future()
@@ -572,7 +572,7 @@ async def test_sco_setup():
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_batched_response(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
@@ -584,7 +584,7 @@ async def test_hf_batched_response(
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_batched_commands(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
+66 -24
View File
@@ -22,12 +22,8 @@ import random
import pytest
from bumble.core import ProtocolError
from bumble.l2cap import (
L2CAP_Connection_Request,
ClassicChannelSpec,
LeCreditBasedChannelSpec,
)
from .test_utils import TwoDevices
from bumble import l2cap
from .test_utils import TwoDevices, async_barrier
# -----------------------------------------------------------------------------
@@ -41,42 +37,53 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def test_helpers():
psm = L2CAP_Connection_Request.serialize_psm(0x01)
psm = l2cap.L2CAP_Connection_Request.serialize_psm(0x01)
assert psm == bytes([0x01, 0x00])
psm = L2CAP_Connection_Request.serialize_psm(0x1023)
psm = l2cap.L2CAP_Connection_Request.serialize_psm(0x1023)
assert psm == bytes([0x23, 0x10])
psm = L2CAP_Connection_Request.serialize_psm(0x242311)
psm = l2cap.L2CAP_Connection_Request.serialize_psm(0x242311)
assert psm == bytes([0x11, 0x23, 0x24])
(offset, psm) = L2CAP_Connection_Request.parse_psm(
(offset, psm) = l2cap.L2CAP_Connection_Request.parse_psm(
bytes([0x00, 0x01, 0x00, 0x44]), 1
)
assert offset == 3
assert psm == 0x01
(offset, psm) = L2CAP_Connection_Request.parse_psm(
(offset, psm) = l2cap.L2CAP_Connection_Request.parse_psm(
bytes([0x00, 0x23, 0x10, 0x44]), 1
)
assert offset == 3
assert psm == 0x1023
(offset, psm) = L2CAP_Connection_Request.parse_psm(
(offset, psm) = l2cap.L2CAP_Connection_Request.parse_psm(
bytes([0x00, 0x11, 0x23, 0x24, 0x44]), 1
)
assert offset == 4
assert psm == 0x242311
rq = L2CAP_Connection_Request(psm=0x01, source_cid=0x44, identifier=0x88)
rq = l2cap.L2CAP_Connection_Request(psm=0x01, source_cid=0x44, identifier=0x88)
brq = bytes(rq)
srq = L2CAP_Connection_Request.from_bytes(brq)
assert isinstance(srq, L2CAP_Connection_Request)
srq = l2cap.L2CAP_Connection_Request.from_bytes(brq)
assert isinstance(srq, l2cap.L2CAP_Connection_Request)
assert srq.psm == rq.psm
assert srq.source_cid == rq.source_cid
assert srq.identifier == rq.identifier
# -----------------------------------------------------------------------------
def test_unimplemented_control_frame():
frame = l2cap.L2CAP_Control_Frame(identifier=1)
frame.code = 0xFF
frame.payload = b'123456'
parsed = l2cap.L2CAP_Control_Frame.from_bytes(bytes(frame))
assert parsed.code == 0xFF
assert parsed.payload == b'123456'
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_basic_connection():
@@ -87,7 +94,7 @@ async def test_basic_connection():
# Check that if there's no one listening, we can't connect
with pytest.raises(ProtocolError):
l2cap_channel = await devices.connections[0].create_l2cap_channel(
spec=LeCreditBasedChannelSpec(psm)
spec=l2cap.LeCreditBasedChannelSpec(psm)
)
# Now add a listener
@@ -104,10 +111,10 @@ async def test_basic_connection():
channel.sink = on_data
devices.devices[1].create_l2cap_server(
spec=LeCreditBasedChannelSpec(psm=1234), handler=on_coc
spec=l2cap.LeCreditBasedChannelSpec(psm=1234), handler=on_coc
)
l2cap_channel = await devices.connections[0].create_l2cap_channel(
spec=LeCreditBasedChannelSpec(psm)
spec=l2cap.LeCreditBasedChannelSpec(psm)
)
messages = (bytes([1, 2, 3]), bytes([4, 5, 6]), bytes(10000))
@@ -137,6 +144,41 @@ async def test_basic_connection():
assert sent_bytes == received_bytes
# -----------------------------------------------------------------------------
@pytest.mark.parametrize("info_type,", list(l2cap.L2CAP_Information_Request.InfoType))
async def test_l2cap_information_request(monkeypatch, info_type):
# TODO: Replace handlers with API when implemented
devices = await TwoDevices.create_with_connection()
# Register handlers
info_rsp = list[l2cap.L2CAP_Information_Response]()
def on_l2cap_information_response(connection, cid, frame):
info_rsp.append(frame)
assert (connection := devices.connections[0])
channel_manager = devices[0].l2cap_channel_manager
monkeypatch.setattr(
channel_manager,
'on_l2cap_information_response',
on_l2cap_information_response,
raising=False,
)
channel_manager.send_control_frame(
connection,
l2cap.L2CAP_LE_SIGNALING_CID,
l2cap.L2CAP_Information_Request(
identifier=channel_manager.next_identifier(connection),
info_type=info_type,
),
)
await async_barrier()
response = info_rsp[0]
assert response.result == l2cap.L2CAP_Information_Response.Result.SUCCESS
# -----------------------------------------------------------------------------
async def transfer_payload(max_credits, mtu, mps):
devices = TwoDevices()
@@ -151,11 +193,11 @@ async def transfer_payload(max_credits, mtu, mps):
channel.sink = on_data
server = devices.devices[1].create_l2cap_server(
spec=LeCreditBasedChannelSpec(max_credits=max_credits, mtu=mtu, mps=mps),
spec=l2cap.LeCreditBasedChannelSpec(max_credits=max_credits, mtu=mtu, mps=mps),
handler=on_coc,
)
l2cap_channel = await devices.connections[0].create_l2cap_channel(
spec=LeCreditBasedChannelSpec(server.psm)
spec=l2cap.LeCreditBasedChannelSpec(server.psm)
)
messages = [bytes([1, 2, 3, 4, 5, 6, 7]) * x for x in (3, 10, 100, 789)]
@@ -205,10 +247,10 @@ async def test_bidirectional_transfer():
client_received.append(data)
server = devices.devices[1].create_l2cap_server(
spec=LeCreditBasedChannelSpec(), handler=on_server_coc
spec=l2cap.LeCreditBasedChannelSpec(), handler=on_server_coc
)
client_channel = await devices.connections[0].create_l2cap_channel(
spec=LeCreditBasedChannelSpec(server.psm)
spec=l2cap.LeCreditBasedChannelSpec(server.psm)
)
client_channel.sink = on_client_data
@@ -242,10 +284,10 @@ async def test_mtu():
channel.on('open', lambda: on_channel_open(channel))
server = devices.devices[1].create_l2cap_server(
spec=ClassicChannelSpec(mtu=345), handler=on_channel
spec=l2cap.ClassicChannelSpec(mtu=345), handler=on_channel
)
client_channel = await devices.connections[0].create_l2cap_channel(
spec=ClassicChannelSpec(server.psm, mtu=456)
spec=l2cap.ClassicChannelSpec(server.psm, mtu=456)
)
assert client_channel.peer_mtu == 345
+27 -111
View File
@@ -23,13 +23,9 @@ import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from bumble.controller import Controller
from bumble.core import PhysicalTransport
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
from bumble.device import Peer
from bumble.gatt import Service, Characteristic
from bumble.transport.common import AsyncPipeSink
from bumble.pairing import PairingConfig, PairingDelegate
from bumble.smp import (
SMP_PAIRING_NOT_SUPPORTED_ERROR,
@@ -38,9 +34,10 @@ from bumble.smp import (
OobLegacyContext,
)
from bumble.core import ProtocolError
from bumble.keys import PairingKeys
from bumble.hci import Role
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Logging
@@ -49,63 +46,26 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
class TwoDevices:
def __init__(self):
self.connections = [None, None]
addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0']
self.link = LocalLink()
self.controllers = [
Controller('C1', link=self.link, public_address=addresses[0]),
Controller('C2', link=self.link, public_address=addresses[1]),
]
self.devices = [
Device(
address=addresses[0],
host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])),
),
Device(
address=addresses[1],
host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])),
),
]
self.paired = [
asyncio.get_event_loop().create_future(),
asyncio.get_event_loop().create_future(),
]
def on_connection(self, which, connection):
self.connections[which] = connection
def on_paired(self, which: int, keys: PairingKeys):
self.paired[which].set_result(keys)
@pytest.mark.asyncio
async def test_self_connection():
two_devices = TwoDevices()
await two_devices.setup_connection()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_connection():
# Create two devices, each with a controller, attached to the same link
async def test_self_disconnection():
two_devices = TwoDevices()
await two_devices.setup_connection()
await two_devices.connections[0].disconnect()
assert two_devices.connections[0] is None
assert two_devices.connections[1] is None
# Attach listeners
two_devices.devices[0].on(
'connection', lambda connection: two_devices.on_connection(0, connection)
)
two_devices.devices[1].on(
'connection', lambda connection: two_devices.on_connection(1, connection)
)
# Start
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
# Connect the two devices
await two_devices.devices[0].connect(two_devices.devices[1].random_address)
# Check the post conditions
assert two_devices.connections[0] is not None
assert two_devices.connections[1] is not None
two_devices = TwoDevices()
await two_devices.setup_connection()
await two_devices.connections[1].disconnect()
assert two_devices.connections[0] is None
assert two_devices.connections[1] is None
# -----------------------------------------------------------------------------
@@ -115,24 +75,14 @@ async def test_self_connection():
(Role.CENTRAL, Role.PERIPHERAL),
)
async def test_self_classic_connection(responder_role):
# Create two devices, each with a controller, attached to the same link
two_devices = TwoDevices()
# Attach listeners
two_devices.devices[0].on(
'connection', lambda connection: two_devices.on_connection(0, connection)
)
two_devices.devices[1].on(
'connection', lambda connection: two_devices.on_connection(1, connection)
)
# Enable Classic connections
two_devices.devices[0].classic_enabled = True
two_devices.devices[1].classic_enabled = True
# Start
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
await two_devices.setup_connection()
# Connect the two devices
await asyncio.gather(
@@ -203,15 +153,9 @@ async def test_self_gatt():
s4 = Service('3A12C182-14E2-4FE0-8C5B-65D7C569F9DB', [], included_services=[s2, s3])
two_devices.devices[1].add_services([s1, s2, s4])
# Start
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
# Connect the two devices
connection = await two_devices.devices[0].connect(
two_devices.devices[1].random_address
)
peer = Peer(connection)
await two_devices.setup_connection()
peer = Peer(two_devices.connections[0])
bogus_uuid = 'A0AA6007-0B48-4BBE-80AC-0DE9AAF541EA'
result = await peer.discover_services([bogus_uuid])
@@ -264,15 +208,9 @@ async def test_self_gatt_long_read():
service = Service('8140E247-04F0-42C1-BC34-534C344DAFCA', characteristics)
two_devices.devices[1].add_service(service)
# Start
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
# Connect the two devices
connection = await two_devices.devices[0].connect(
two_devices.devices[1].random_address
)
peer = Peer(connection)
await two_devices.setup_connection()
peer = Peer(two_devices.connections[0])
result = await peer.discover_service(service.uuid)
assert len(result) == 1
@@ -289,25 +227,12 @@ async def _test_self_smp_with_configs(pairing_config1, pairing_config2):
# Create two devices, each with a controller, attached to the same link
two_devices = TwoDevices()
# Start
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
# Attach listeners
two_devices.devices[0].on(
'connection', lambda connection: two_devices.on_connection(0, connection)
)
two_devices.devices[1].on(
'connection', lambda connection: two_devices.on_connection(1, connection)
)
# Connect the two devices
connection = await two_devices.devices[0].connect(
two_devices.devices[1].random_address
)
await two_devices.setup_connection()
connection = two_devices.connections[0]
assert not connection.is_encrypted
# Attach connection listeners
# Attach pairing listeners
two_devices.connections[0].on(
'pairing', lambda keys: two_devices.on_paired(0, keys)
)
@@ -488,23 +413,13 @@ async def test_self_smp_over_classic():
# Create two devices, each with a controller, attached to the same link
two_devices = TwoDevices()
# Attach listeners
two_devices.devices[0].on(
'connection', lambda connection: two_devices.on_connection(0, connection)
)
two_devices.devices[1].on(
'connection', lambda connection: two_devices.on_connection(1, connection)
)
# Enable Classic connections
two_devices.devices[0].classic_enabled = True
two_devices.devices[1].classic_enabled = True
# Start
# Connect the two devices
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
# Connect the two devices
await asyncio.gather(
two_devices.devices[0].connect(
two_devices.devices[1].public_address, transport=PhysicalTransport.BR_EDR
@@ -650,6 +565,7 @@ async def test_self_smp_oob_legacy():
# -----------------------------------------------------------------------------
async def run_test_self():
await test_self_connection()
await test_self_disconnection()
await test_self_gatt()
await test_self_gatt_long_read()
await test_self_smp()
+17 -10
View File
@@ -25,6 +25,7 @@ from bumble.device import Device, Connection
from bumble.host import Host
from bumble.transport.common import AsyncPipeSink
from bumble.hci import Address
from bumble.keys import PairingKeys
# -----------------------------------------------------------------------------
@@ -51,16 +52,6 @@ class TwoDevices:
),
]
self.paired = [None, None]
def on_connection(self, which, connection):
self.connections[which] = connection
def on_paired(self, which, keys):
self.paired[which] = keys
async def setup_connection(self) -> None:
# Attach listeners
self.devices[0].on(
'connection', lambda connection: self.on_connection(0, connection)
)
@@ -68,6 +59,22 @@ class TwoDevices:
'connection', lambda connection: self.on_connection(1, connection)
)
self.paired = [
asyncio.get_event_loop().create_future(),
asyncio.get_event_loop().create_future(),
]
def on_connection(self, which, connection):
self.connections[which] = connection
connection.on('disconnection', lambda code: self.on_disconnection(which))
def on_disconnection(self, which):
self.connections[which] = None
def on_paired(self, which: int, keys: PairingKeys) -> None:
self.paired[which].set_result(keys)
async def setup_connection(self) -> None:
# Start
await self.devices[0].power_on()
await self.devices[1].power_on()