diff --git a/bumble/att.py b/bumble/att.py index df0fbdf1..2a3c6dd2 100644 --- a/bumble/att.py +++ b/bumble/att.py @@ -223,7 +223,12 @@ UUID_2_FIELD_SPEC = lambda x, y: UUID.parse_uuid_2(x, y) # noqa: E731 # Exceptions # ----------------------------------------------------------------------------- class ATT_Error(ProtocolError): - def __init__(self, error_code, att_handle=0x0000, message=''): + error_code: int + att_handle: int + + def __init__( + self, error_code: int, att_handle: int = 0x0000, message: str = '' + ) -> None: super().__init__( error_code, error_namespace='att', @@ -233,7 +238,10 @@ class ATT_Error(ProtocolError): self.message = message def __str__(self): - return f'ATT_Error(error={self.error_name}, handle={self.att_handle:04X}): {self.message}' + return ( + f'ATT_Error(error={self.error_name}, ' + f'handle={self.att_handle:04X}): {self.message}' + ) # ----------------------------------------------------------------------------- diff --git a/bumble/gatt.py b/bumble/gatt.py index 106d7c8d..3dacbf7a 100644 --- a/bumble/gatt.py +++ b/bumble/gatt.py @@ -286,6 +286,22 @@ GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC = UUID('38663f1a-e711-4cac-b641-32 GATT_ASHA_VOLUME_CHARACTERISTIC = UUID('00e4ca9e-ab14-41e4-8823-f9e70c7e91df', 'Volume') GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID('2d410339-82b6-42aa-b34e-e2e01df8cc1a', 'LE_PSM_OUT') +# Apple Notification Center Service +GATT_ANCS_SERVICE = UUID('7905F431-B5CE-4E99-A40F-4B1E122D00D0', 'Apple Notification Center') +GATT_ANCS_NOTIFICATION_SOURCE_CHARACTERISTIC = UUID('9FBF120D-6301-42D9-8C58-25E699A21DBD', 'Notification Source') +GATT_ANCS_CONTROL_POINT_CHARACTERISTIC = UUID('69D1D8F3-45E1-49A8-9821-9BBDFDAAD9D9', 'Control Point') +GATT_ANCS_DATA_SOURCE_CHARACTERISTIC = UUID('22EAC6E9-24D6-4BB5-BE44-B36ACE7C7BFB', 'Data Source') + +# Apple Media Service +GATT_AMS_SERVICE = UUID('89D3502B-0F36-433A-8EF4-C502AD55F8DC', 'Apple Media') +GATT_AMS_REMOTE_COMMAND_CHARACTERISTIC = UUID('9B3C81D8-57B1-4A8A-B8DF-0E56F7CA51C2', 'Remote Command') +GATT_AMS_ENTITY_UPDATE_CHARACTERISTIC = UUID('2F7CABCE-808D-411F-9A0C-BB92BA96C102', 'Entity Update') +GATT_AMS_ENTITY_ATTRIBUTE_CHARACTERISTIC = UUID('C6B2F38C-23AB-46D8-A6AB-A3A870BBD5D7', 'Entity Attribute') + +# Misc Apple Services +GATT_APPLE_CONTINUITY_SERVICE = UUID('D0611E78-BBB4-4591-A5F8-487910AE4366', 'Apple Continuity') +GATT_APPLE_NEARBY_SERVICE = UUID('9FA480E0-4967-4542-9390-D343DC5D04AE', 'Apple Nearby') + # Misc GATT_DEVICE_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2A00, 'Device Name') GATT_APPEARANCE_CHARACTERISTIC = UUID.from_16_bits(0x2A01, 'Appearance') diff --git a/bumble/profiles/ancs.py b/bumble/profiles/ancs.py new file mode 100644 index 00000000..d9eac9f9 --- /dev/null +++ b/bumble/profiles/ancs.py @@ -0,0 +1,514 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Apple Notification Center Service (ANCS). +""" + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +from __future__ import annotations +import asyncio +import dataclasses +import datetime +import enum +import logging +import struct +from typing import Optional, Sequence, Union + +from pyee import EventEmitter + +from bumble.att import ATT_Error +from bumble.device import Peer +from bumble.gatt import ( + Characteristic, + GATT_ANCS_SERVICE, + GATT_ANCS_NOTIFICATION_SOURCE_CHARACTERISTIC, + GATT_ANCS_CONTROL_POINT_CHARACTERISTIC, + GATT_ANCS_DATA_SOURCE_CHARACTERISTIC, + TemplateService, +) +from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy, ServiceProxy +from bumble.gatt_adapters import SerializableCharacteristicProxyAdapter +from bumble.utils import OpenIntEnum + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- +_DEFAULT_ATTRIBUTE_MAX_LENGTH = 65535 + + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +# Protocol +# ----------------------------------------------------------------------------- +class ActionId(OpenIntEnum): + POSITIVE = 0 + NEGATIVE = 1 + + +class AppAttributeId(OpenIntEnum): + DISPLAY_NAME = 0 + + +class CategoryId(OpenIntEnum): + OTHER = 0 + INCOMING_CALL = 1 + MISSED_CALL = 2 + VOICEMAIL = 3 + SOCIAL = 4 + SCHEDULE = 5 + EMAIL = 6 + NEWS = 7 + HEALTH_AND_FITNESS = 8 + BUSINESS_AND_FINANCE = 9 + LOCATION = 10 + ENTERTAINMENT = 11 + + +class CommandId(OpenIntEnum): + GET_NOTIFICATION_ATTRIBUTES = 0 + GET_APP_ATTRIBUTES = 1 + PERFORM_NOTIFICATION_ACTION = 2 + + +class EventId(OpenIntEnum): + NOTIFICATION_ADDED = 0 + NOTIFICATION_MODIFIED = 1 + NOTIFICATION_REMOVED = 2 + + +class EventFlags(enum.IntFlag): + SILENT = 1 << 0 + IMPORTANT = 1 << 1 + PRE_EXISTING = 1 << 2 + POSITIVE_ACTION = 1 << 3 + NEGATIVE_ACTION = 1 << 4 + + +class NotificationAttributeId(OpenIntEnum): + APP_IDENTIFIER = 0 + TITLE = 1 + SUBTITLE = 2 + MESSAGE = 3 + MESSAGE_SIZE = 4 + DATE = 5 + POSITIVE_ACTION_LABEL = 6 + NEGATIVE_ACTION_LABEL = 7 + + +@dataclasses.dataclass +class NotificationAttribute: + attribute_id: NotificationAttributeId + value: Union[str, int, datetime.datetime] + + +@dataclasses.dataclass +class AppAttribute: + attribute_id: AppAttributeId + value: str + + +@dataclasses.dataclass +class Notification: + event_id: EventId + event_flags: EventFlags + category_id: CategoryId + category_count: int + notification_uid: int + + @classmethod + def from_bytes(cls, data: bytes) -> Notification: + return cls( + event_id=EventId(data[0]), + event_flags=EventFlags(data[1]), + category_id=CategoryId(data[2]), + category_count=data[3], + notification_uid=int.from_bytes(data[4:8], 'little'), + ) + + def __bytes__(self) -> bytes: + return struct.pack( + " None: + self.error_code = error_code + + def __str__(self) -> str: + return f"CommandError(error_code={self.error_code.name})" + + +# ----------------------------------------------------------------------------- +# GATT Server-side +# ----------------------------------------------------------------------------- +class Ancs(TemplateService): + UUID = GATT_ANCS_SERVICE + + notification_source_characteristic: Characteristic + data_source_characteristic: Characteristic + control_point_characteristic: Characteristic + + def __init__(self) -> None: + # TODO not the final implementation + self.notification_source_characteristic = Characteristic( + GATT_ANCS_NOTIFICATION_SOURCE_CHARACTERISTIC, + Characteristic.Properties.NOTIFY, + Characteristic.Permissions.READABLE, + ) + + # TODO not the final implementation + self.data_source_characteristic = Characteristic( + GATT_ANCS_DATA_SOURCE_CHARACTERISTIC, + Characteristic.Properties.NOTIFY, + Characteristic.Permissions.READABLE, + ) + + # TODO not the final implementation + self.control_point_characteristic = Characteristic( + GATT_ANCS_CONTROL_POINT_CHARACTERISTIC, + Characteristic.Properties.WRITE, + Characteristic.Permissions.WRITEABLE, + ) + + super().__init__( + [ + self.notification_source_characteristic, + self.data_source_characteristic, + self.control_point_characteristic, + ] + ) + + +# ----------------------------------------------------------------------------- +# GATT Client-side +# ----------------------------------------------------------------------------- +class AncsProxy(ProfileServiceProxy): + SERVICE_CLASS = Ancs + + notification_source: CharacteristicProxy[Notification] + data_source: CharacteristicProxy + control_point: CharacteristicProxy[bytes] + + def __init__(self, service_proxy: ServiceProxy): + self.notification_source = SerializableCharacteristicProxyAdapter( + service_proxy.get_required_characteristic_by_uuid( + GATT_ANCS_NOTIFICATION_SOURCE_CHARACTERISTIC + ), + Notification, + ) + + self.data_source = service_proxy.get_required_characteristic_by_uuid( + GATT_ANCS_DATA_SOURCE_CHARACTERISTIC + ) + + self.control_point = service_proxy.get_required_characteristic_by_uuid( + GATT_ANCS_CONTROL_POINT_CHARACTERISTIC + ) + + +class AncsClient(EventEmitter): + _expected_response_command_id: Optional[CommandId] + _expected_response_notification_uid: Optional[int] + _expected_response_app_identifier: Optional[str] + _expected_app_identifier: Optional[str] + _expected_response_tuples: int + _response_accumulator: bytes + + def __init__(self, ancs_proxy: AncsProxy) -> None: + super().__init__() + self._ancs_proxy = ancs_proxy + self._command_semaphore = asyncio.Semaphore() + self._response: Optional[asyncio.Future] = None + self._reset_response() + self._started = False + + @classmethod + async def for_peer(cls, peer: Peer) -> Optional[AncsClient]: + ancs_proxy = await peer.discover_service_and_create_proxy(AncsProxy) + if ancs_proxy is None: + return None + return cls(ancs_proxy) + + async def start(self) -> None: + await self._ancs_proxy.notification_source.subscribe(self._on_notification) + await self._ancs_proxy.data_source.subscribe(self._on_data) + self._started = True + + async def stop(self) -> None: + await self._ancs_proxy.notification_source.unsubscribe(self._on_notification) + await self._ancs_proxy.data_source.unsubscribe(self._on_data) + self._started = False + + def _reset_response(self) -> None: + self._expected_response_command_id = None + self._expected_response_notification_uid = None + self._expected_app_identifier = None + self._expected_response_tuples = 0 + self._response_accumulator = b"" + + def _on_notification(self, notification: Notification) -> None: + logger.debug(f"ANCS NOTIFICATION: {notification}") + self.emit("notification", notification) + + def _on_data(self, data: bytes) -> None: + logger.debug(f"ANCS DATA: {data.hex()}") + + if not self._response: + logger.warning("received unexpected data, discarding") + return + + self._response_accumulator += data + + # Try to parse the accumulated data until we have all we need. + if not self._response_accumulator: + logger.warning("empty data from data source") + return + + command_id = self._response_accumulator[0] + if command_id != self._expected_response_command_id: + logger.warning( + "unexpected response command id: " + f"expected {self._expected_response_command_id} " + f"but got {command_id}" + ) + self._reset_response() + if not self._response.done(): + self._response.set_exception(ProtocolError()) + + if len(self._response_accumulator) < 5: + # Not enough data yet. + return + + attributes: list[Union[NotificationAttribute, AppAttribute]] = [] + + if command_id == CommandId.GET_NOTIFICATION_ATTRIBUTES: + (notification_uid,) = struct.unpack_from( + "= 3: + attribute_id, attribute_data_length = struct.unpack_from( + "= 3: + attribute_id, attribute_data_length = struct.unpack_from( + " None: + try: + await self._ancs_proxy.control_point.write_value( + command, with_response=True + ) + except ATT_Error as error: + raise CommandError(error_code=ErrorCode(error.error_code)) from error + + async def get_notification_attributes( + self, + notification_uid: int, + attributes: Sequence[ + Union[NotificationAttributeId, tuple[NotificationAttributeId, int]] + ], + ) -> list[NotificationAttribute]: + if not self._started: + raise RuntimeError("client not started") + + command = struct.pack( + " list[AppAttribute]: + if not self._started: + raise RuntimeError("client not started") + + command = ( + bytes([CommandId.GET_APP_ATTRIBUTES]) + + app_identifier.encode("utf-8") + + b"\0" + ) + for attribute_id in attributes: + command += struct.pack("B", attribute_id) + + try: + async with self._command_semaphore: + self._expected_response_app_identifier = app_identifier + self._expected_response_tuples = len(attributes) + self._expected_response_command_id = CommandId.GET_APP_ATTRIBUTES + self._response = asyncio.Future() + + # Send the command. + await self._send_command(command) + + # Wait for the response. + return await self._response + finally: + self._reset_response() + + async def perform_action(self, notification_uid: int, action: ActionId) -> None: + if not self._started: + raise RuntimeError("client not started") + + command = struct.pack( + " None: + return await self.perform_action(notification_uid, ActionId.POSITIVE) + + async def perform_negative_action(self, notification_uid: int) -> None: + return await self.perform_action(notification_uid, ActionId.NEGATIVE) diff --git a/examples/run_ancs_client.py b/examples/run_ancs_client.py new file mode 100644 index 00000000..e6ec1082 --- /dev/null +++ b/examples/run_ancs_client.py @@ -0,0 +1,215 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import sys +import os +import logging +from bumble.colors import color + +from bumble.device import Device, Peer +from bumble.transport import open_transport +from bumble.profiles.ancs import ( + AncsClient, + AppAttribute, + AppAttributeId, + EventFlags, + EventId, + Notification, + NotificationAttributeId, +) + + +# ----------------------------------------------------------------------------- +_cached_app_names: dict[str, str] = {} +_notification_queue = asyncio.Queue[Notification]() + + +async def process_notifications(ancs_client: AncsClient): + while True: + notification = await _notification_queue.get() + + prefix = " " + if notification.event_id == EventId.NOTIFICATION_ADDED: + print_color = "green" + if notification.event_flags & EventFlags.PRE_EXISTING: + prefix = " Existing " + else: + prefix = " New " + elif notification.event_id == EventId.NOTIFICATION_REMOVED: + print_color = "red" + elif notification.event_id == EventId.NOTIFICATION_MODIFIED: + print_color = "yellow" + else: + print_color = "white" + + print( + color( + ( + f"[{notification.event_id.name}]{prefix}Notification " + f"({notification.notification_uid}):" + ), + print_color, + ) + ) + print(color(" Event ID: ", "yellow"), notification.event_id.name) + print(color(" Event Flags: ", "yellow"), notification.event_flags.name) + print(color(" Category ID: ", "yellow"), notification.category_id.name) + print(color(" Category Count:", "yellow"), notification.category_count) + + if notification.event_id not in ( + EventId.NOTIFICATION_ADDED, + EventId.NOTIFICATION_MODIFIED, + ): + continue + + requested_attributes = [ + NotificationAttributeId.APP_IDENTIFIER, + NotificationAttributeId.TITLE, + NotificationAttributeId.SUBTITLE, + NotificationAttributeId.MESSAGE, + NotificationAttributeId.DATE, + ] + if notification.event_flags & EventFlags.NEGATIVE_ACTION: + requested_attributes.append(NotificationAttributeId.NEGATIVE_ACTION_LABEL) + if notification.event_flags & EventFlags.POSITIVE_ACTION: + requested_attributes.append(NotificationAttributeId.POSITIVE_ACTION_LABEL) + + attributes = await ancs_client.get_notification_attributes( + notification.notification_uid, requested_attributes + ) + max_attribute_name_width = max( + (len(attribute.attribute_id.name) for attribute in attributes) + ) + app_identifier = str( + next( + ( + attribute.value + for attribute in attributes + if attribute.attribute_id == NotificationAttributeId.APP_IDENTIFIER + ) + ) + ) + if app_identifier not in _cached_app_names: + app_attributes = await ancs_client.get_app_attributes( + app_identifier, [AppAttributeId.DISPLAY_NAME] + ) + _cached_app_names[app_identifier] = app_attributes[0].value + app_name = _cached_app_names[app_identifier] + + for attribute in attributes: + padding = ' ' * ( + max_attribute_name_width - len(attribute.attribute_id.name) + ) + suffix = ( + f" ({app_name})" + if attribute.attribute_id == NotificationAttributeId.APP_IDENTIFIER + else "" + ) + print( + color(f" {attribute.attribute_id.name}:{padding}", "blue"), + f"{attribute.value}{suffix}", + ) + + print() + + +def on_ancs_notification(notification: Notification) -> None: + _notification_queue.put_nowait(notification) + + +async def handle_command_client( + ancs_client: AncsClient, reader: asyncio.StreamReader, writer: asyncio.StreamWriter +) -> None: + while True: + command = (await reader.readline()).decode("utf-8").strip() + + try: + command_name, command_args = command.split(" ", 1) + if command_name == "+": + notification_uid = int(command_args) + await ancs_client.perform_positive_action(notification_uid) + elif command_name == "-": + notification_uid = int(command_args) + await ancs_client.perform_negative_action(notification_uid) + else: + writer.write(f"unknown command {command_name}".encode("utf-8")) + except Exception as error: + writer.write(f"ERROR: {error}\n".encode("utf-8")) + + +# ----------------------------------------------------------------------------- +async def main() -> None: + if len(sys.argv) < 3: + print( + 'Usage: run_ancs_client.py ' + ' ' + ) + print('example: run_ancs_client.py device1.json usb:0 E1:CA:72:48:C4:E8 512') + return + device_config, transport_spec, bluetooth_address, mtu = sys.argv[1:] + + print('<<< connecting to HCI...') + async with await open_transport(transport_spec) as hci_transport: + print('<<< connected') + + # Create a device to manage the host, with a custom listener + device = Device.from_config_file_with_hci( + device_config, hci_transport.source, hci_transport.sink + ) + await device.power_on() + + # Connect to the peer + print(f'=== Connecting to {bluetooth_address}...') + connection = await device.connect(bluetooth_address) + print(f'=== Connected: {connection}') + + await connection.encrypt() + + peer = Peer(connection) + mtu_int = int(mtu) + if mtu_int: + new_mtu = await peer.request_mtu(mtu_int) + print(f'ATT MTU = {new_mtu}') + ancs_client = await AncsClient.for_peer(peer) + if ancs_client is None: + print("!!! no ANCS service found") + return + await ancs_client.start() + + print('Subscribing to updates') + ancs_client.on("notification", on_ancs_notification) + + # Process all notifications in a task. + notification_processing_task = asyncio.create_task( + process_notifications(ancs_client) + ) + + # Accept a TCP connection to handle commands. + tcp_server = await asyncio.start_server( + lambda reader, writer: handle_command_client(ancs_client, reader, writer), + '127.0.0.1', + 9000, + ) + print("Accepting command client on port 9000") + async with tcp_server: + await tcp_server.serve_forever() + + +# ----------------------------------------------------------------------------- +logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) +asyncio.run(main())