Compare commits

...

11 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod c034297bc0 update to black formatter 25.1 2025-08-02 21:11:34 -07:00
Gilles Boccon-Gibod a1eff958e6 do not wait for display 2025-08-02 21:10:45 -07:00
Gilles Boccon-Gibod efdc770fde Merge pull request #737 from leifdreizler/fix-spdx-license
Update license field to use proper SPDX identifier
2025-08-02 11:22:58 -07:00
Leif 357d7f9c22 Update pyproject.toml 2025-08-02 08:18:36 -04:00
Leif Dreizler 3bc08b4e0d Update license field to use proper SPDX identifier
This changes the license field to be a valid [SPDX identifier](https://spdx.org/licenses) aligning with [PEP 639](https://peps.python.org/pep-0639/#project-source-metadata). This populates the `license_expression` field in the PyPI API and is used by downstream tools including deps.dev

These changes were generated by Claude after reviewing the license and manifest files in your repository, but opened and reviewed by me. Please let me know if the analysis is incorrect and thanks for being an OSS maintainer.
2025-08-01 20:19:25 -04:00
Gilles Boccon-Gibod 1dc0950177 Merge pull request #730 from google/gbg/apple-media-service
basic AMS implementation
2025-07-29 22:34:25 -07:00
zxzxwu df0fd74533 Merge pull request #733 from zxzxwu/l2cap
Fix L2CAP_Control_Frame errors
2025-07-30 13:12:44 +08:00
Josh Wu 822f97fa84 Fix L2CAP errors 2025-07-30 12:00:20 +08:00
Gilles Boccon-Gibod 4a6b0ef840 Merge pull request #732 from google/gbg/722
fix #722
2025-07-29 10:50:02 -07:00
Gilles Boccon-Gibod bf8a2cdcb5 add discrete command methods 2025-07-26 20:24:55 -07:00
Gilles Boccon-Gibod 4bf7448a01 basic AMS implementation 2025-07-22 14:57:52 -07:00
10 changed files with 730 additions and 71 deletions
+6 -4
View File
@@ -94,15 +94,17 @@
"ycursor"
],
"[python]": {
"editor.rulers": [88]
"editor.rulers": [88],
"editor.defaultFormatter": "ms-python.black-formatter"
},
"python.formatting.provider": "black",
"pylint.importStrategy": "useBundled",
"python.testing.pytestArgs": [
"."
],
"python.testing.unittestEnabled": false,
"python.testing.pytestEnabled": true,
"python-envs.defaultEnvManager": "ms-python.python:system",
"python-envs.pythonProjects": []
"python-envs.pythonProjects": [],
"python.terminal.launchArgs": [
]
}
+1 -1
View File
@@ -1117,7 +1117,7 @@ class Protocol(utils.EventEmitter):
@staticmethod
def _check_vendor_dependent_frame(
frame: Union[avc.VendorDependentCommandFrame, avc.VendorDependentResponseFrame]
frame: Union[avc.VendorDependentCommandFrame, avc.VendorDependentResponseFrame],
) -> bool:
if frame.company_id != AVRCP_BLUETOOTH_SIG_COMPANY_ID:
logger.debug("unsupported company id, ignoring")
+17 -30
View File
@@ -213,7 +213,7 @@ class L2CAP_Control_Frame:
fields: ClassVar[hci.Fields] = ()
code: int = dataclasses.field(default=0, init=False)
name: str = dataclasses.field(default='', init=False)
_data: Optional[bytes] = dataclasses.field(default=None, init=False)
_payload: Optional[bytes] = dataclasses.field(default=None, init=False)
identifier: int
@@ -223,7 +223,8 @@ class L2CAP_Control_Frame:
subclass = L2CAP_Control_Frame.classes.get(code)
if subclass is None:
instance = L2CAP_Control_Frame(pdu)
instance = L2CAP_Control_Frame(identifier=identifier)
instance.payload = pdu[4:]
instance.code = CommandCode(code)
instance.name = instance.code.name
return instance
@@ -232,11 +233,11 @@ class L2CAP_Control_Frame:
identifier=identifier,
)
frame.identifier = identifier
frame.data = pdu[4:]
if length != len(pdu):
frame.payload = pdu[4:]
if length != len(frame.payload):
logger.warning(
color(
f'!!! length mismatch: expected {len(pdu) - 4} but got {length}',
f'!!! length mismatch: expected {length} but got {len(frame.payload)}',
'red',
)
)
@@ -273,34 +274,20 @@ class L2CAP_Control_Frame:
return subclass
def __init__(self, pdu: Optional[bytes] = None, **kwargs) -> None:
self.identifier = kwargs.get('identifier', 0)
if self.fields:
if kwargs:
hci.HCI_Object.init_from_fields(self, self.fields, kwargs)
if pdu is None:
data = hci.HCI_Object.dict_to_bytes(kwargs, self.fields)
pdu = (
bytes([self.code, self.identifier])
+ struct.pack('<H', len(data))
+ data
)
self.data = pdu[4:] if pdu else b''
@property
def data(self) -> bytes:
if self._data is None:
self._data = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields)
return self._data
def payload(self) -> bytes:
if self._payload is None:
self._payload = hci.HCI_Object.dict_to_bytes(self.__dict__, self.fields)
return self._payload
@data.setter
def data(self, parameters: bytes) -> None:
self._data = parameters
@payload.setter
def payload(self, payload: bytes) -> None:
self._payload = payload
def __bytes__(self) -> bytes:
return (
struct.pack('<BBH', self.code, self.identifier, len(self.data) + 4)
+ self.data
struct.pack('<BBH', self.code, self.identifier, len(self.payload))
+ self.payload
)
def __str__(self) -> str:
@@ -308,8 +295,8 @@ class L2CAP_Control_Frame:
if fields := getattr(self, 'fields', None):
result += ':\n' + hci.HCI_Object.format_fields(self.__dict__, fields, ' ')
else:
if len(self.data) > 1:
result += f': {self.data.hex()}'
if len(self.payload) > 1:
result += f': {self.payload.hex()}'
return result
+1 -1
View File
@@ -49,7 +49,7 @@ _SERVICERS_HOOKS: list[Callable[[PandoraDevice, Config, grpc.aio.Server], None]]
def register_servicer_hook(
hook: Callable[[PandoraDevice, Config, grpc.aio.Server], None]
hook: Callable[[PandoraDevice, Config, grpc.aio.Server], None],
) -> None:
_SERVICERS_HOOKS.append(hook)
+404
View File
@@ -0,0 +1,404 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Apple Media Service (AMS).
"""
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import dataclasses
import enum
import logging
from typing import Optional, Iterable, Union
from bumble.device import Peer
from bumble.gatt import (
Characteristic,
GATT_AMS_SERVICE,
GATT_AMS_REMOTE_COMMAND_CHARACTERISTIC,
GATT_AMS_ENTITY_UPDATE_CHARACTERISTIC,
GATT_AMS_ENTITY_ATTRIBUTE_CHARACTERISTIC,
TemplateService,
)
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy, ServiceProxy
from bumble import utils
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Protocol
# -----------------------------------------------------------------------------
class RemoteCommandId(utils.OpenIntEnum):
PLAY = 0
PAUSE = 1
TOGGLE_PLAY_PAUSE = 2
NEXT_TRACK = 3
PREVIOUS_TRACK = 4
VOLUME_UP = 5
VOLUME_DOWN = 6
ADVANCE_REPEAT_MODE = 7
ADVANCE_SHUFFLE_MODE = 8
SKIP_FORWARD = 9
SKIP_BACKWARD = 10
LIKE_TRACK = 11
DISLIKE_TRACK = 12
BOOKMARK_TRACK = 13
class EntityId(utils.OpenIntEnum):
PLAYER = 0
QUEUE = 1
TRACK = 2
class ActionId(utils.OpenIntEnum):
POSITIVE = 0
NEGATIVE = 1
class EntityUpdateFlags(enum.IntFlag):
TRUNCATED = 1
class PlayerAttributeId(utils.OpenIntEnum):
NAME = 0
PLAYBACK_INFO = 1
VOLUME = 2
class QueueAttributeId(utils.OpenIntEnum):
INDEX = 0
COUNT = 1
SHUFFLE_MODE = 2
REPEAT_MODE = 3
class ShuffleMode(utils.OpenIntEnum):
OFF = 0
ONE = 1
ALL = 2
class RepeatMode(utils.OpenIntEnum):
OFF = 0
ONE = 1
ALL = 2
class TrackAttributeId(utils.OpenIntEnum):
ARTIST = 0
ALBUM = 1
TITLE = 2
DURATION = 3
class PlaybackState(utils.OpenIntEnum):
PAUSED = 0
PLAYING = 1
REWINDING = 2
FAST_FORWARDING = 3
@dataclasses.dataclass
class PlaybackInfo:
playback_state: PlaybackState = PlaybackState.PAUSED
playback_rate: float = 1.0
elapsed_time: float = 0.0
# -----------------------------------------------------------------------------
# GATT Server-side
# -----------------------------------------------------------------------------
class Ams(TemplateService):
UUID = GATT_AMS_SERVICE
remote_command_characteristic: Characteristic
entity_update_characteristic: Characteristic
entity_attribute_characteristic: Characteristic
def __init__(self) -> None:
# TODO not the final implementation
self.remote_command_characteristic = Characteristic(
GATT_AMS_REMOTE_COMMAND_CHARACTERISTIC,
Characteristic.Properties.NOTIFY
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
Characteristic.Permissions.WRITEABLE,
)
# TODO not the final implementation
self.entity_update_characteristic = Characteristic(
GATT_AMS_ENTITY_UPDATE_CHARACTERISTIC,
Characteristic.Properties.NOTIFY | Characteristic.Properties.WRITE,
Characteristic.Permissions.WRITEABLE,
)
# TODO not the final implementation
self.entity_attribute_characteristic = Characteristic(
GATT_AMS_ENTITY_ATTRIBUTE_CHARACTERISTIC,
Characteristic.Properties.READ
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
Characteristic.Permissions.WRITEABLE | Characteristic.Permissions.READABLE,
)
super().__init__(
[
self.remote_command_characteristic,
self.entity_update_characteristic,
self.entity_attribute_characteristic,
]
)
# -----------------------------------------------------------------------------
# GATT Client-side
# -----------------------------------------------------------------------------
class AmsProxy(ProfileServiceProxy):
SERVICE_CLASS = Ams
# NOTE: these don't use adapters, because the format for write and notifications
# are different.
remote_command: CharacteristicProxy[bytes]
entity_update: CharacteristicProxy[bytes]
entity_attribute: CharacteristicProxy[bytes]
def __init__(self, service_proxy: ServiceProxy):
self.remote_command = service_proxy.get_required_characteristic_by_uuid(
GATT_AMS_REMOTE_COMMAND_CHARACTERISTIC
)
self.entity_update = service_proxy.get_required_characteristic_by_uuid(
GATT_AMS_ENTITY_UPDATE_CHARACTERISTIC
)
self.entity_attribute = service_proxy.get_required_characteristic_by_uuid(
GATT_AMS_ENTITY_ATTRIBUTE_CHARACTERISTIC
)
class AmsClient(utils.EventEmitter):
EVENT_SUPPORTED_COMMANDS = "supported_commands"
EVENT_PLAYER_NAME = "player_name"
EVENT_PLAYER_PLAYBACK_INFO = "player_playback_info"
EVENT_PLAYER_VOLUME = "player_volume"
EVENT_QUEUE_COUNT = "queue_count"
EVENT_QUEUE_INDEX = "queue_index"
EVENT_QUEUE_SHUFFLE_MODE = "queue_shuffle_mode"
EVENT_QUEUE_REPEAT_MODE = "queue_repeat_mode"
EVENT_TRACK_ARTIST = "track_artist"
EVENT_TRACK_ALBUM = "track_album"
EVENT_TRACK_TITLE = "track_title"
EVENT_TRACK_DURATION = "track_duration"
supported_commands: set[RemoteCommandId]
player_name: str = ""
player_playback_info: PlaybackInfo = PlaybackInfo(PlaybackState.PAUSED, 0.0, 0.0)
player_volume: float = 1.0
queue_count: int = 0
queue_index: int = 0
queue_shuffle_mode: ShuffleMode = ShuffleMode.OFF
queue_repeat_mode: RepeatMode = RepeatMode.OFF
track_artist: str = ""
track_album: str = ""
track_title: str = ""
track_duration: float = 0.0
def __init__(self, ams_proxy: AmsProxy) -> None:
super().__init__()
self._ams_proxy = ams_proxy
self._started = False
self._read_attribute_semaphore = asyncio.Semaphore()
self.supported_commands = set()
@classmethod
async def for_peer(cls, peer: Peer) -> Optional[AmsClient]:
ams_proxy = await peer.discover_service_and_create_proxy(AmsProxy)
if ams_proxy is None:
return None
return cls(ams_proxy)
async def start(self) -> None:
logger.debug("subscribing to remote command characteristic")
await self._ams_proxy.remote_command.subscribe(
self._on_remote_command_notification
)
logger.debug("subscribing to entity update characteristic")
await self._ams_proxy.entity_update.subscribe(
lambda data: utils.AsyncRunner.spawn(
self._on_entity_update_notification(data)
)
)
self._started = True
async def stop(self) -> None:
await self._ams_proxy.remote_command.unsubscribe(
self._on_remote_command_notification
)
await self._ams_proxy.entity_update.unsubscribe(
self._on_entity_update_notification
)
self._started = False
async def observe(
self,
entity: EntityId,
attributes: Iterable[
Union[PlayerAttributeId, QueueAttributeId, TrackAttributeId]
],
) -> None:
await self._ams_proxy.entity_update.write_value(
bytes([entity] + list(attributes)), with_response=True
)
async def command(self, command: RemoteCommandId) -> None:
await self._ams_proxy.remote_command.write_value(
bytes([command]), with_response=True
)
async def play(self) -> None:
await self.command(RemoteCommandId.PLAY)
async def pause(self) -> None:
await self.command(RemoteCommandId.PAUSE)
async def toggle_play_pause(self) -> None:
await self.command(RemoteCommandId.TOGGLE_PLAY_PAUSE)
async def next_track(self) -> None:
await self.command(RemoteCommandId.NEXT_TRACK)
async def previous_track(self) -> None:
await self.command(RemoteCommandId.PREVIOUS_TRACK)
async def volume_up(self) -> None:
await self.command(RemoteCommandId.VOLUME_UP)
async def volume_down(self) -> None:
await self.command(RemoteCommandId.VOLUME_DOWN)
async def advance_repeat_mode(self) -> None:
await self.command(RemoteCommandId.ADVANCE_REPEAT_MODE)
async def advance_shuffle_mode(self) -> None:
await self.command(RemoteCommandId.ADVANCE_SHUFFLE_MODE)
async def skip_forward(self) -> None:
await self.command(RemoteCommandId.SKIP_FORWARD)
async def skip_backward(self) -> None:
await self.command(RemoteCommandId.SKIP_BACKWARD)
async def like_track(self) -> None:
await self.command(RemoteCommandId.LIKE_TRACK)
async def dislike_track(self) -> None:
await self.command(RemoteCommandId.DISLIKE_TRACK)
async def bookmark_track(self) -> None:
await self.command(RemoteCommandId.BOOKMARK_TRACK)
def _on_remote_command_notification(self, data: bytes) -> None:
supported_commands = [RemoteCommandId(command) for command in data]
logger.debug(
f"supported commands: {[command.name for command in supported_commands]}"
)
for command in supported_commands:
self.supported_commands.add(command)
self.emit(self.EVENT_SUPPORTED_COMMANDS)
async def _on_entity_update_notification(self, data: bytes) -> None:
entity = EntityId(data[0])
flags = EntityUpdateFlags(data[2])
value = data[3:]
if flags & EntityUpdateFlags.TRUNCATED:
logger.debug("truncated attribute, fetching full value")
# Write the entity and attribute we're interested in
# (protected by a semaphore, so that we only read one attribute at a time)
async with self._read_attribute_semaphore:
await self._ams_proxy.entity_attribute.write_value(
data[:2], with_response=True
)
value = await self._ams_proxy.entity_attribute.read_value()
if entity == EntityId.PLAYER:
player_attribute = PlayerAttributeId(data[1])
if player_attribute == PlayerAttributeId.NAME:
self.player_name = value.decode()
self.emit(self.EVENT_PLAYER_NAME)
elif player_attribute == PlayerAttributeId.PLAYBACK_INFO:
playback_state_str, playback_rate_str, elapsed_time_str = (
value.decode().split(",")
)
self.player_playback_info = PlaybackInfo(
PlaybackState(int(playback_state_str)),
float(playback_rate_str),
float(elapsed_time_str),
)
self.emit(self.EVENT_PLAYER_PLAYBACK_INFO)
elif player_attribute == PlayerAttributeId.VOLUME:
self.player_volume = float(value.decode())
self.emit(self.EVENT_PLAYER_VOLUME)
else:
logger.warning(f"received unknown player attribute {player_attribute}")
elif entity == EntityId.QUEUE:
queue_attribute = QueueAttributeId(data[1])
if queue_attribute == QueueAttributeId.COUNT:
self.queue_count = int(value)
self.emit(self.EVENT_QUEUE_COUNT)
elif queue_attribute == QueueAttributeId.INDEX:
self.queue_index = int(value)
self.emit(self.EVENT_QUEUE_INDEX)
elif queue_attribute == QueueAttributeId.REPEAT_MODE:
self.queue_repeat_mode = RepeatMode(int(value))
self.emit(self.EVENT_QUEUE_REPEAT_MODE)
elif queue_attribute == QueueAttributeId.SHUFFLE_MODE:
self.queue_shuffle_mode = ShuffleMode(int(value))
self.emit(self.EVENT_QUEUE_SHUFFLE_MODE)
else:
logger.warning(f"received unknown queue attribute {queue_attribute}")
elif entity == EntityId.TRACK:
track_attribute = TrackAttributeId(data[1])
if track_attribute == TrackAttributeId.ARTIST:
self.track_artist = value.decode()
self.emit(self.EVENT_TRACK_ARTIST)
elif track_attribute == TrackAttributeId.ALBUM:
self.track_album = value.decode()
self.emit(self.EVENT_TRACK_ALBUM)
elif track_attribute == TrackAttributeId.TITLE:
self.track_title = value.decode()
self.emit(self.EVENT_TRACK_TITLE)
elif track_attribute == TrackAttributeId.DURATION:
self.track_duration = float(value.decode())
self.emit(self.EVENT_TRACK_DURATION)
else:
logger.warning(f"received unknown track attribute {track_attribute}")
else:
logger.warning(f"received unknown attribute ID {data[1]}")
+3 -1
View File
@@ -946,7 +946,9 @@ class Session:
self.tk = self.passkey.to_bytes(16, byteorder='little')
logger.debug(f'TK from passkey = {self.tk.hex()}')
await self.pairing_config.delegate.display_number(self.passkey, digits=6)
self.connection.cancel_on_disconnection(
self.pairing_config.delegate.display_number(self.passkey, digits=6)
)
def input_passkey(self, next_steps: Optional[Callable[[], None]] = None) -> None:
# Prompt the user for the passkey displayed on the peer
+220
View File
@@ -0,0 +1,220 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import sys
import os
import logging
from bumble.colors import color
from bumble.device import Device, Peer
from bumble.transport import open_transport
from bumble.profiles.ams import (
AmsClient,
EntityId,
PlayerAttributeId,
QueueAttributeId,
TrackAttributeId,
RemoteCommandId,
)
# -----------------------------------------------------------------------------
async def handle_command_client(
ams_client: AmsClient, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
) -> None:
while True:
command = (await reader.readline()).decode("utf-8")
if not command.endswith("\n"):
print("command client terminated")
return
command = command.strip()
try:
if command.upper() in [member.name for member in RemoteCommandId]:
await ams_client.command(RemoteCommandId[command.upper()])
continue
except Exception as error:
writer.write(f"ERROR: {error}\n".encode("utf-8"))
writer.write(f"unknown command {command}\n".encode("utf-8"))
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_ams_client.py <device-config> <transport-spec> '
'<bluetooth-address> <mtu>'
)
print('example: run_ams_client.py device1.json usb:0 E1:CA:72:48:C4:E8 512')
return
device_config, transport_spec, bluetooth_address, mtu = sys.argv[1:]
print('<<< connecting to HCI...')
async with await open_transport(transport_spec) as hci_transport:
print('<<< connected')
# Create a device to manage the host, with a custom listener
device = Device.from_config_file_with_hci(
device_config, hci_transport.source, hci_transport.sink
)
await device.power_on()
# Connect to the peer
print(f'=== Connecting to {bluetooth_address}...')
connection = await device.connect(bluetooth_address)
print(f'=== Connected: {connection}')
await connection.encrypt()
peer = Peer(connection)
mtu_int = int(mtu)
if mtu_int:
new_mtu = await peer.request_mtu(mtu_int)
print(f'ATT MTU = {new_mtu}')
ams_client = await AmsClient.for_peer(peer)
if ams_client is None:
print("!!! no AMS service found")
return
# Register event handlers
def on_supported_commands():
print(
color("Supported commands:", "magenta"),
", ".join([command.name for command in ams_client.supported_commands]),
)
ams_client.on(AmsClient.EVENT_SUPPORTED_COMMANDS, on_supported_commands)
def on_player_name():
print(color("Player Name:", "green"), ams_client.player_name)
ams_client.on(AmsClient.EVENT_PLAYER_NAME, on_player_name)
def on_player_playback_info():
print(
color("Playback State:", "green"),
ams_client.player_playback_info.playback_state.name,
)
print(
color("Playback Rate: ", "green"),
ams_client.player_playback_info.playback_rate,
)
print(
color("Elapsed Time: ", "green"),
ams_client.player_playback_info.elapsed_time,
)
ams_client.on(AmsClient.EVENT_PLAYER_PLAYBACK_INFO, on_player_playback_info)
def on_player_volume():
print(color("Volume:", "green"), ams_client.player_volume)
ams_client.on(AmsClient.EVENT_PLAYER_VOLUME, on_player_volume)
def on_queue_count():
print(color("Queue Count:", "yellow"), ams_client.queue_count)
ams_client.on(AmsClient.EVENT_QUEUE_COUNT, on_queue_count)
def on_queue_index():
print(color("Queue Index:", "yellow"), ams_client.queue_index)
ams_client.on(AmsClient.EVENT_QUEUE_INDEX, on_queue_index)
def on_queue_shuffle_mode():
print(
color("Queue Shuffle Mode:", "yellow"),
ams_client.queue_shuffle_mode.name,
)
ams_client.on(AmsClient.EVENT_QUEUE_SHUFFLE_MODE, on_queue_shuffle_mode)
def on_queue_repeat_mode():
print(
color("Queue Repeat Mode:", "yellow"), ams_client.queue_repeat_mode.name
)
ams_client.on(AmsClient.EVENT_QUEUE_REPEAT_MODE, on_queue_repeat_mode)
def on_track_artist():
print(color("Track Artist:", "cyan"), ams_client.track_artist)
ams_client.on(AmsClient.EVENT_TRACK_ARTIST, on_track_artist)
def on_track_album():
print(color("Track Album:", "cyan"), ams_client.track_album)
ams_client.on(AmsClient.EVENT_TRACK_ALBUM, on_track_album)
def on_track_title():
print(color("Track Title:", "cyan"), ams_client.track_title)
ams_client.on(AmsClient.EVENT_TRACK_TITLE, on_track_title)
def on_track_duration():
print(color("Track Duration:", "cyan"), ams_client.track_duration)
ams_client.on(AmsClient.EVENT_TRACK_DURATION, on_track_duration)
# Start the client
await ams_client.start()
# Observe the player, queue and track
await ams_client.observe(
EntityId.PLAYER,
[
PlayerAttributeId.NAME,
PlayerAttributeId.PLAYBACK_INFO,
PlayerAttributeId.VOLUME,
],
)
await ams_client.observe(
EntityId.QUEUE,
[
QueueAttributeId.COUNT,
QueueAttributeId.INDEX,
QueueAttributeId.REPEAT_MODE,
QueueAttributeId.SHUFFLE_MODE,
],
)
await ams_client.observe(
EntityId.TRACK,
[
TrackAttributeId.ALBUM,
TrackAttributeId.ARTIST,
TrackAttributeId.DURATION,
TrackAttributeId.TITLE,
],
)
# Accept a TCP connection to handle commands.
tcp_server = await asyncio.start_server(
lambda reader, writer: handle_command_client(ams_client, reader, writer),
'127.0.0.1',
9000,
)
print("Accepting command client on port 9000")
async with tcp_server:
await tcp_server.serve_forever()
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(main())
+3 -1
View File
@@ -7,6 +7,8 @@ name = "bumble"
dynamic = ["version"]
description = "Bluetooth Stack for Apps, Emulation, Test and Experimentation"
readme = "README.md"
license = "Apache-2.0"
license-files = ["LICENSE"]
authors = [{ name = "Google", email = "bumble-dev@google.com" }]
requires-python = ">=3.9"
dependencies = [
@@ -42,7 +44,7 @@ test = [
"coverage >= 6.4",
]
development = [
"black == 24.3",
"black ~= 25.1",
"bt-test-interfaces >= 0.0.6",
"grpcio-tools >= 1.62.1",
"invoke >= 1.7.3",
+9 -9
View File
@@ -237,7 +237,7 @@ async def test_hf_indicator(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtoco
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_codec_negotiation(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
@@ -281,7 +281,7 @@ async def test_answer(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]):
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_reject_incoming_call(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
@@ -307,7 +307,7 @@ async def test_terminate_call(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProto
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_query_calls_without_calls(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
@@ -317,7 +317,7 @@ async def test_query_calls_without_calls(
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_query_calls_with_calls(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
ag.calls.append(
@@ -418,7 +418,7 @@ async def test_speaker_volume(hfp_connections: tuple[hfp.HfProtocol, hfp.AgProto
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_microphone_volume(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
microphone_volume_future = asyncio.get_running_loop().create_future()
@@ -448,7 +448,7 @@ async def test_cli_notification(hfp_connections: tuple[hfp.HfProtocol, hfp.AgPro
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_voice_recognition_from_hf(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
voice_recognition_future = asyncio.get_running_loop().create_future()
@@ -462,7 +462,7 @@ async def test_voice_recognition_from_hf(
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_voice_recognition_from_ag(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
voice_recognition_future = asyncio.get_running_loop().create_future()
@@ -572,7 +572,7 @@ async def test_sco_setup():
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_hf_batched_response(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
@@ -584,7 +584,7 @@ async def test_hf_batched_response(
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ag_batched_commands(
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol]
hfp_connections: tuple[hfp.HfProtocol, hfp.AgProtocol],
):
hf, ag = hfp_connections
+66 -24
View File
@@ -22,12 +22,8 @@ import random
import pytest
from bumble.core import ProtocolError
from bumble.l2cap import (
L2CAP_Connection_Request,
ClassicChannelSpec,
LeCreditBasedChannelSpec,
)
from .test_utils import TwoDevices
from bumble import l2cap
from .test_utils import TwoDevices, async_barrier
# -----------------------------------------------------------------------------
@@ -41,42 +37,53 @@ logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def test_helpers():
psm = L2CAP_Connection_Request.serialize_psm(0x01)
psm = l2cap.L2CAP_Connection_Request.serialize_psm(0x01)
assert psm == bytes([0x01, 0x00])
psm = L2CAP_Connection_Request.serialize_psm(0x1023)
psm = l2cap.L2CAP_Connection_Request.serialize_psm(0x1023)
assert psm == bytes([0x23, 0x10])
psm = L2CAP_Connection_Request.serialize_psm(0x242311)
psm = l2cap.L2CAP_Connection_Request.serialize_psm(0x242311)
assert psm == bytes([0x11, 0x23, 0x24])
(offset, psm) = L2CAP_Connection_Request.parse_psm(
(offset, psm) = l2cap.L2CAP_Connection_Request.parse_psm(
bytes([0x00, 0x01, 0x00, 0x44]), 1
)
assert offset == 3
assert psm == 0x01
(offset, psm) = L2CAP_Connection_Request.parse_psm(
(offset, psm) = l2cap.L2CAP_Connection_Request.parse_psm(
bytes([0x00, 0x23, 0x10, 0x44]), 1
)
assert offset == 3
assert psm == 0x1023
(offset, psm) = L2CAP_Connection_Request.parse_psm(
(offset, psm) = l2cap.L2CAP_Connection_Request.parse_psm(
bytes([0x00, 0x11, 0x23, 0x24, 0x44]), 1
)
assert offset == 4
assert psm == 0x242311
rq = L2CAP_Connection_Request(psm=0x01, source_cid=0x44, identifier=0x88)
rq = l2cap.L2CAP_Connection_Request(psm=0x01, source_cid=0x44, identifier=0x88)
brq = bytes(rq)
srq = L2CAP_Connection_Request.from_bytes(brq)
assert isinstance(srq, L2CAP_Connection_Request)
srq = l2cap.L2CAP_Connection_Request.from_bytes(brq)
assert isinstance(srq, l2cap.L2CAP_Connection_Request)
assert srq.psm == rq.psm
assert srq.source_cid == rq.source_cid
assert srq.identifier == rq.identifier
# -----------------------------------------------------------------------------
def test_unimplemented_control_frame():
frame = l2cap.L2CAP_Control_Frame(identifier=1)
frame.code = 0xFF
frame.payload = b'123456'
parsed = l2cap.L2CAP_Control_Frame.from_bytes(bytes(frame))
assert parsed.code == 0xFF
assert parsed.payload == b'123456'
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_basic_connection():
@@ -87,7 +94,7 @@ async def test_basic_connection():
# Check that if there's no one listening, we can't connect
with pytest.raises(ProtocolError):
l2cap_channel = await devices.connections[0].create_l2cap_channel(
spec=LeCreditBasedChannelSpec(psm)
spec=l2cap.LeCreditBasedChannelSpec(psm)
)
# Now add a listener
@@ -104,10 +111,10 @@ async def test_basic_connection():
channel.sink = on_data
devices.devices[1].create_l2cap_server(
spec=LeCreditBasedChannelSpec(psm=1234), handler=on_coc
spec=l2cap.LeCreditBasedChannelSpec(psm=1234), handler=on_coc
)
l2cap_channel = await devices.connections[0].create_l2cap_channel(
spec=LeCreditBasedChannelSpec(psm)
spec=l2cap.LeCreditBasedChannelSpec(psm)
)
messages = (bytes([1, 2, 3]), bytes([4, 5, 6]), bytes(10000))
@@ -137,6 +144,41 @@ async def test_basic_connection():
assert sent_bytes == received_bytes
# -----------------------------------------------------------------------------
@pytest.mark.parametrize("info_type,", list(l2cap.L2CAP_Information_Request.InfoType))
async def test_l2cap_information_request(monkeypatch, info_type):
# TODO: Replace handlers with API when implemented
devices = await TwoDevices.create_with_connection()
# Register handlers
info_rsp = list[l2cap.L2CAP_Information_Response]()
def on_l2cap_information_response(connection, cid, frame):
info_rsp.append(frame)
assert (connection := devices.connections[0])
channel_manager = devices[0].l2cap_channel_manager
monkeypatch.setattr(
channel_manager,
'on_l2cap_information_response',
on_l2cap_information_response,
raising=False,
)
channel_manager.send_control_frame(
connection,
l2cap.L2CAP_LE_SIGNALING_CID,
l2cap.L2CAP_Information_Request(
identifier=channel_manager.next_identifier(connection),
info_type=info_type,
),
)
await async_barrier()
response = info_rsp[0]
assert response.result == l2cap.L2CAP_Information_Response.Result.SUCCESS
# -----------------------------------------------------------------------------
async def transfer_payload(max_credits, mtu, mps):
devices = TwoDevices()
@@ -151,11 +193,11 @@ async def transfer_payload(max_credits, mtu, mps):
channel.sink = on_data
server = devices.devices[1].create_l2cap_server(
spec=LeCreditBasedChannelSpec(max_credits=max_credits, mtu=mtu, mps=mps),
spec=l2cap.LeCreditBasedChannelSpec(max_credits=max_credits, mtu=mtu, mps=mps),
handler=on_coc,
)
l2cap_channel = await devices.connections[0].create_l2cap_channel(
spec=LeCreditBasedChannelSpec(server.psm)
spec=l2cap.LeCreditBasedChannelSpec(server.psm)
)
messages = [bytes([1, 2, 3, 4, 5, 6, 7]) * x for x in (3, 10, 100, 789)]
@@ -205,10 +247,10 @@ async def test_bidirectional_transfer():
client_received.append(data)
server = devices.devices[1].create_l2cap_server(
spec=LeCreditBasedChannelSpec(), handler=on_server_coc
spec=l2cap.LeCreditBasedChannelSpec(), handler=on_server_coc
)
client_channel = await devices.connections[0].create_l2cap_channel(
spec=LeCreditBasedChannelSpec(server.psm)
spec=l2cap.LeCreditBasedChannelSpec(server.psm)
)
client_channel.sink = on_client_data
@@ -242,10 +284,10 @@ async def test_mtu():
channel.on('open', lambda: on_channel_open(channel))
server = devices.devices[1].create_l2cap_server(
spec=ClassicChannelSpec(mtu=345), handler=on_channel
spec=l2cap.ClassicChannelSpec(mtu=345), handler=on_channel
)
client_channel = await devices.connections[0].create_l2cap_channel(
spec=ClassicChannelSpec(server.psm, mtu=456)
spec=l2cap.ClassicChannelSpec(server.psm, mtu=456)
)
assert client_channel.peer_mtu == 345