forked from auracaster/bumble_mirror
Initial support for ANCS client functionality
This commit is contained in:
@@ -223,7 +223,12 @@ UUID_2_FIELD_SPEC = lambda x, y: UUID.parse_uuid_2(x, y) # noqa: E731
|
|||||||
# Exceptions
|
# Exceptions
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
class ATT_Error(ProtocolError):
|
class ATT_Error(ProtocolError):
|
||||||
def __init__(self, error_code, att_handle=0x0000, message=''):
|
error_code: int
|
||||||
|
att_handle: int
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self, error_code: int, att_handle: int = 0x0000, message: str = ''
|
||||||
|
) -> None:
|
||||||
super().__init__(
|
super().__init__(
|
||||||
error_code,
|
error_code,
|
||||||
error_namespace='att',
|
error_namespace='att',
|
||||||
@@ -233,7 +238,10 @@ class ATT_Error(ProtocolError):
|
|||||||
self.message = message
|
self.message = message
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return f'ATT_Error(error={self.error_name}, handle={self.att_handle:04X}): {self.message}'
|
return (
|
||||||
|
f'ATT_Error(error={self.error_name}, '
|
||||||
|
f'handle={self.att_handle:04X}): {self.message}'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
|||||||
@@ -286,6 +286,22 @@ GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC = UUID('38663f1a-e711-4cac-b641-32
|
|||||||
GATT_ASHA_VOLUME_CHARACTERISTIC = UUID('00e4ca9e-ab14-41e4-8823-f9e70c7e91df', 'Volume')
|
GATT_ASHA_VOLUME_CHARACTERISTIC = UUID('00e4ca9e-ab14-41e4-8823-f9e70c7e91df', 'Volume')
|
||||||
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID('2d410339-82b6-42aa-b34e-e2e01df8cc1a', 'LE_PSM_OUT')
|
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID('2d410339-82b6-42aa-b34e-e2e01df8cc1a', 'LE_PSM_OUT')
|
||||||
|
|
||||||
|
# Apple Notification Center Service
|
||||||
|
GATT_ANCS_SERVICE = UUID('7905F431-B5CE-4E99-A40F-4B1E122D00D0', 'Apple Notification Center')
|
||||||
|
GATT_ANCS_NOTIFICATION_SOURCE_CHARACTERISTIC = UUID('9FBF120D-6301-42D9-8C58-25E699A21DBD', 'Notification Source')
|
||||||
|
GATT_ANCS_CONTROL_POINT_CHARACTERISTIC = UUID('69D1D8F3-45E1-49A8-9821-9BBDFDAAD9D9', 'Control Point')
|
||||||
|
GATT_ANCS_DATA_SOURCE_CHARACTERISTIC = UUID('22EAC6E9-24D6-4BB5-BE44-B36ACE7C7BFB', 'Data Source')
|
||||||
|
|
||||||
|
# Apple Media Service
|
||||||
|
GATT_AMS_SERVICE = UUID('89D3502B-0F36-433A-8EF4-C502AD55F8DC', 'Apple Media')
|
||||||
|
GATT_AMS_REMOTE_COMMAND_CHARACTERISTIC = UUID('9B3C81D8-57B1-4A8A-B8DF-0E56F7CA51C2', 'Remote Command')
|
||||||
|
GATT_AMS_ENTITY_UPDATE_CHARACTERISTIC = UUID('2F7CABCE-808D-411F-9A0C-BB92BA96C102', 'Entity Update')
|
||||||
|
GATT_AMS_ENTITY_ATTRIBUTE_CHARACTERISTIC = UUID('C6B2F38C-23AB-46D8-A6AB-A3A870BBD5D7', 'Entity Attribute')
|
||||||
|
|
||||||
|
# Misc Apple Services
|
||||||
|
GATT_APPLE_CONTINUITY_SERVICE = UUID('D0611E78-BBB4-4591-A5F8-487910AE4366', 'Apple Continuity')
|
||||||
|
GATT_APPLE_NEARBY_SERVICE = UUID('9FA480E0-4967-4542-9390-D343DC5D04AE', 'Apple Nearby')
|
||||||
|
|
||||||
# Misc
|
# Misc
|
||||||
GATT_DEVICE_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2A00, 'Device Name')
|
GATT_DEVICE_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2A00, 'Device Name')
|
||||||
GATT_APPEARANCE_CHARACTERISTIC = UUID.from_16_bits(0x2A01, 'Appearance')
|
GATT_APPEARANCE_CHARACTERISTIC = UUID.from_16_bits(0x2A01, 'Appearance')
|
||||||
|
|||||||
514
bumble/profiles/ancs.py
Normal file
514
bumble/profiles/ancs.py
Normal file
@@ -0,0 +1,514 @@
|
|||||||
|
# Copyright 2025 Google LLC
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
"""
|
||||||
|
Apple Notification Center Service (ANCS).
|
||||||
|
"""
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
# Imports
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
from __future__ import annotations
|
||||||
|
import asyncio
|
||||||
|
import dataclasses
|
||||||
|
import datetime
|
||||||
|
import enum
|
||||||
|
import logging
|
||||||
|
import struct
|
||||||
|
from typing import Optional, Sequence, Union
|
||||||
|
|
||||||
|
from pyee import EventEmitter
|
||||||
|
|
||||||
|
from bumble.att import ATT_Error
|
||||||
|
from bumble.device import Peer
|
||||||
|
from bumble.gatt import (
|
||||||
|
Characteristic,
|
||||||
|
GATT_ANCS_SERVICE,
|
||||||
|
GATT_ANCS_NOTIFICATION_SOURCE_CHARACTERISTIC,
|
||||||
|
GATT_ANCS_CONTROL_POINT_CHARACTERISTIC,
|
||||||
|
GATT_ANCS_DATA_SOURCE_CHARACTERISTIC,
|
||||||
|
TemplateService,
|
||||||
|
)
|
||||||
|
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy, ServiceProxy
|
||||||
|
from bumble.gatt_adapters import SerializableCharacteristicProxyAdapter
|
||||||
|
from bumble.utils import OpenIntEnum
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
# Constants
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
_DEFAULT_ATTRIBUTE_MAX_LENGTH = 65535
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
# Logging
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
# Protocol
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
class ActionId(OpenIntEnum):
|
||||||
|
POSITIVE = 0
|
||||||
|
NEGATIVE = 1
|
||||||
|
|
||||||
|
|
||||||
|
class AppAttributeId(OpenIntEnum):
|
||||||
|
DISPLAY_NAME = 0
|
||||||
|
|
||||||
|
|
||||||
|
class CategoryId(OpenIntEnum):
|
||||||
|
OTHER = 0
|
||||||
|
INCOMING_CALL = 1
|
||||||
|
MISSED_CALL = 2
|
||||||
|
VOICEMAIL = 3
|
||||||
|
SOCIAL = 4
|
||||||
|
SCHEDULE = 5
|
||||||
|
EMAIL = 6
|
||||||
|
NEWS = 7
|
||||||
|
HEALTH_AND_FITNESS = 8
|
||||||
|
BUSINESS_AND_FINANCE = 9
|
||||||
|
LOCATION = 10
|
||||||
|
ENTERTAINMENT = 11
|
||||||
|
|
||||||
|
|
||||||
|
class CommandId(OpenIntEnum):
|
||||||
|
GET_NOTIFICATION_ATTRIBUTES = 0
|
||||||
|
GET_APP_ATTRIBUTES = 1
|
||||||
|
PERFORM_NOTIFICATION_ACTION = 2
|
||||||
|
|
||||||
|
|
||||||
|
class EventId(OpenIntEnum):
|
||||||
|
NOTIFICATION_ADDED = 0
|
||||||
|
NOTIFICATION_MODIFIED = 1
|
||||||
|
NOTIFICATION_REMOVED = 2
|
||||||
|
|
||||||
|
|
||||||
|
class EventFlags(enum.IntFlag):
|
||||||
|
SILENT = 1 << 0
|
||||||
|
IMPORTANT = 1 << 1
|
||||||
|
PRE_EXISTING = 1 << 2
|
||||||
|
POSITIVE_ACTION = 1 << 3
|
||||||
|
NEGATIVE_ACTION = 1 << 4
|
||||||
|
|
||||||
|
|
||||||
|
class NotificationAttributeId(OpenIntEnum):
|
||||||
|
APP_IDENTIFIER = 0
|
||||||
|
TITLE = 1
|
||||||
|
SUBTITLE = 2
|
||||||
|
MESSAGE = 3
|
||||||
|
MESSAGE_SIZE = 4
|
||||||
|
DATE = 5
|
||||||
|
POSITIVE_ACTION_LABEL = 6
|
||||||
|
NEGATIVE_ACTION_LABEL = 7
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class NotificationAttribute:
|
||||||
|
attribute_id: NotificationAttributeId
|
||||||
|
value: Union[str, int, datetime.datetime]
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class AppAttribute:
|
||||||
|
attribute_id: AppAttributeId
|
||||||
|
value: str
|
||||||
|
|
||||||
|
|
||||||
|
@dataclasses.dataclass
|
||||||
|
class Notification:
|
||||||
|
event_id: EventId
|
||||||
|
event_flags: EventFlags
|
||||||
|
category_id: CategoryId
|
||||||
|
category_count: int
|
||||||
|
notification_uid: int
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_bytes(cls, data: bytes) -> Notification:
|
||||||
|
return cls(
|
||||||
|
event_id=EventId(data[0]),
|
||||||
|
event_flags=EventFlags(data[1]),
|
||||||
|
category_id=CategoryId(data[2]),
|
||||||
|
category_count=data[3],
|
||||||
|
notification_uid=int.from_bytes(data[4:8], 'little'),
|
||||||
|
)
|
||||||
|
|
||||||
|
def __bytes__(self) -> bytes:
|
||||||
|
return struct.pack(
|
||||||
|
"<BBBBI",
|
||||||
|
self.event_id,
|
||||||
|
self.event_flags,
|
||||||
|
self.category_id,
|
||||||
|
self.category_count,
|
||||||
|
self.notification_uid,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ErrorCode(OpenIntEnum):
|
||||||
|
UNKNOWN_COMMAND = 0xA0
|
||||||
|
INVALID_COMMAND = 0xA1
|
||||||
|
INVALID_PARAMETER = 0xA2
|
||||||
|
ACTION_FAILED = 0xA3
|
||||||
|
|
||||||
|
|
||||||
|
class ProtocolError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class CommandError(Exception):
|
||||||
|
def __init__(self, error_code: ErrorCode) -> None:
|
||||||
|
self.error_code = error_code
|
||||||
|
|
||||||
|
def __str__(self) -> str:
|
||||||
|
return f"CommandError(error_code={self.error_code.name})"
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
# GATT Server-side
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
class Ancs(TemplateService):
|
||||||
|
UUID = GATT_ANCS_SERVICE
|
||||||
|
|
||||||
|
notification_source_characteristic: Characteristic
|
||||||
|
data_source_characteristic: Characteristic
|
||||||
|
control_point_characteristic: Characteristic
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
# TODO not the final implementation
|
||||||
|
self.notification_source_characteristic = Characteristic(
|
||||||
|
GATT_ANCS_NOTIFICATION_SOURCE_CHARACTERISTIC,
|
||||||
|
Characteristic.Properties.NOTIFY,
|
||||||
|
Characteristic.Permissions.READABLE,
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO not the final implementation
|
||||||
|
self.data_source_characteristic = Characteristic(
|
||||||
|
GATT_ANCS_DATA_SOURCE_CHARACTERISTIC,
|
||||||
|
Characteristic.Properties.NOTIFY,
|
||||||
|
Characteristic.Permissions.READABLE,
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO not the final implementation
|
||||||
|
self.control_point_characteristic = Characteristic(
|
||||||
|
GATT_ANCS_CONTROL_POINT_CHARACTERISTIC,
|
||||||
|
Characteristic.Properties.WRITE,
|
||||||
|
Characteristic.Permissions.WRITEABLE,
|
||||||
|
)
|
||||||
|
|
||||||
|
super().__init__(
|
||||||
|
[
|
||||||
|
self.notification_source_characteristic,
|
||||||
|
self.data_source_characteristic,
|
||||||
|
self.control_point_characteristic,
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
# GATT Client-side
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
class AncsProxy(ProfileServiceProxy):
|
||||||
|
SERVICE_CLASS = Ancs
|
||||||
|
|
||||||
|
notification_source: CharacteristicProxy[Notification]
|
||||||
|
data_source: CharacteristicProxy
|
||||||
|
control_point: CharacteristicProxy[bytes]
|
||||||
|
|
||||||
|
def __init__(self, service_proxy: ServiceProxy):
|
||||||
|
self.notification_source = SerializableCharacteristicProxyAdapter(
|
||||||
|
service_proxy.get_required_characteristic_by_uuid(
|
||||||
|
GATT_ANCS_NOTIFICATION_SOURCE_CHARACTERISTIC
|
||||||
|
),
|
||||||
|
Notification,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.data_source = service_proxy.get_required_characteristic_by_uuid(
|
||||||
|
GATT_ANCS_DATA_SOURCE_CHARACTERISTIC
|
||||||
|
)
|
||||||
|
|
||||||
|
self.control_point = service_proxy.get_required_characteristic_by_uuid(
|
||||||
|
GATT_ANCS_CONTROL_POINT_CHARACTERISTIC
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class AncsClient(EventEmitter):
|
||||||
|
_expected_response_command_id: Optional[CommandId]
|
||||||
|
_expected_response_notification_uid: Optional[int]
|
||||||
|
_expected_response_app_identifier: Optional[str]
|
||||||
|
_expected_app_identifier: Optional[str]
|
||||||
|
_expected_response_tuples: int
|
||||||
|
_response_accumulator: bytes
|
||||||
|
|
||||||
|
def __init__(self, ancs_proxy: AncsProxy) -> None:
|
||||||
|
super().__init__()
|
||||||
|
self._ancs_proxy = ancs_proxy
|
||||||
|
self._command_semaphore = asyncio.Semaphore()
|
||||||
|
self._response: Optional[asyncio.Future] = None
|
||||||
|
self._reset_response()
|
||||||
|
self._started = False
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
async def for_peer(cls, peer: Peer) -> Optional[AncsClient]:
|
||||||
|
ancs_proxy = await peer.discover_service_and_create_proxy(AncsProxy)
|
||||||
|
if ancs_proxy is None:
|
||||||
|
return None
|
||||||
|
return cls(ancs_proxy)
|
||||||
|
|
||||||
|
async def start(self) -> None:
|
||||||
|
await self._ancs_proxy.notification_source.subscribe(self._on_notification)
|
||||||
|
await self._ancs_proxy.data_source.subscribe(self._on_data)
|
||||||
|
self._started = True
|
||||||
|
|
||||||
|
async def stop(self) -> None:
|
||||||
|
await self._ancs_proxy.notification_source.unsubscribe(self._on_notification)
|
||||||
|
await self._ancs_proxy.data_source.unsubscribe(self._on_data)
|
||||||
|
self._started = False
|
||||||
|
|
||||||
|
def _reset_response(self) -> None:
|
||||||
|
self._expected_response_command_id = None
|
||||||
|
self._expected_response_notification_uid = None
|
||||||
|
self._expected_app_identifier = None
|
||||||
|
self._expected_response_tuples = 0
|
||||||
|
self._response_accumulator = b""
|
||||||
|
|
||||||
|
def _on_notification(self, notification: Notification) -> None:
|
||||||
|
logger.debug(f"ANCS NOTIFICATION: {notification}")
|
||||||
|
self.emit("notification", notification)
|
||||||
|
|
||||||
|
def _on_data(self, data: bytes) -> None:
|
||||||
|
logger.debug(f"ANCS DATA: {data.hex()}")
|
||||||
|
|
||||||
|
if not self._response:
|
||||||
|
logger.warning("received unexpected data, discarding")
|
||||||
|
return
|
||||||
|
|
||||||
|
self._response_accumulator += data
|
||||||
|
|
||||||
|
# Try to parse the accumulated data until we have all we need.
|
||||||
|
if not self._response_accumulator:
|
||||||
|
logger.warning("empty data from data source")
|
||||||
|
return
|
||||||
|
|
||||||
|
command_id = self._response_accumulator[0]
|
||||||
|
if command_id != self._expected_response_command_id:
|
||||||
|
logger.warning(
|
||||||
|
"unexpected response command id: "
|
||||||
|
f"expected {self._expected_response_command_id} "
|
||||||
|
f"but got {command_id}"
|
||||||
|
)
|
||||||
|
self._reset_response()
|
||||||
|
if not self._response.done():
|
||||||
|
self._response.set_exception(ProtocolError())
|
||||||
|
|
||||||
|
if len(self._response_accumulator) < 5:
|
||||||
|
# Not enough data yet.
|
||||||
|
return
|
||||||
|
|
||||||
|
attributes: list[Union[NotificationAttribute, AppAttribute]] = []
|
||||||
|
|
||||||
|
if command_id == CommandId.GET_NOTIFICATION_ATTRIBUTES:
|
||||||
|
(notification_uid,) = struct.unpack_from(
|
||||||
|
"<I", self._response_accumulator, 1
|
||||||
|
)
|
||||||
|
if notification_uid != self._expected_response_notification_uid:
|
||||||
|
logger.warning(
|
||||||
|
"unexpected response notification uid: "
|
||||||
|
f"expected {self._expected_response_notification_uid} "
|
||||||
|
f"but got {notification_uid}"
|
||||||
|
)
|
||||||
|
self._reset_response()
|
||||||
|
if not self._response.done():
|
||||||
|
self._response.set_exception(ProtocolError())
|
||||||
|
|
||||||
|
attribute_data = self._response_accumulator[5:]
|
||||||
|
while len(attribute_data) >= 3:
|
||||||
|
attribute_id, attribute_data_length = struct.unpack_from(
|
||||||
|
"<BH", attribute_data, 0
|
||||||
|
)
|
||||||
|
if len(attribute_data) < 3 + attribute_data_length:
|
||||||
|
return
|
||||||
|
str_value = attribute_data[3 : 3 + attribute_data_length].decode(
|
||||||
|
"utf-8"
|
||||||
|
)
|
||||||
|
value: Union[str, int, datetime.datetime]
|
||||||
|
if attribute_id == NotificationAttributeId.MESSAGE_SIZE:
|
||||||
|
value = int(str_value)
|
||||||
|
elif attribute_id == NotificationAttributeId.DATE:
|
||||||
|
year = int(str_value[:4])
|
||||||
|
month = int(str_value[4:6])
|
||||||
|
day = int(str_value[6:8])
|
||||||
|
hour = int(str_value[9:11])
|
||||||
|
minute = int(str_value[11:13])
|
||||||
|
second = int(str_value[13:15])
|
||||||
|
value = datetime.datetime(year, month, day, hour, minute, second)
|
||||||
|
else:
|
||||||
|
value = str_value
|
||||||
|
attributes.append(
|
||||||
|
NotificationAttribute(NotificationAttributeId(attribute_id), value)
|
||||||
|
)
|
||||||
|
attribute_data = attribute_data[3 + attribute_data_length :]
|
||||||
|
elif command_id == CommandId.GET_APP_ATTRIBUTES:
|
||||||
|
if 0 not in self._response_accumulator[1:]:
|
||||||
|
# No null-terminated string yet.
|
||||||
|
return
|
||||||
|
|
||||||
|
app_identifier_length = self._response_accumulator.find(0, 1) - 1
|
||||||
|
app_identifier = self._response_accumulator[
|
||||||
|
1 : 1 + app_identifier_length
|
||||||
|
].decode("utf-8")
|
||||||
|
if app_identifier != self._expected_response_app_identifier:
|
||||||
|
logger.warning(
|
||||||
|
"unexpected response app identifier: "
|
||||||
|
f"expected {self._expected_response_app_identifier} "
|
||||||
|
f"but got {app_identifier}"
|
||||||
|
)
|
||||||
|
self._reset_response()
|
||||||
|
if not self._response.done():
|
||||||
|
self._response.set_exception(ProtocolError())
|
||||||
|
|
||||||
|
attribute_data = self._response_accumulator[1 + app_identifier_length + 1 :]
|
||||||
|
while len(attribute_data) >= 3:
|
||||||
|
attribute_id, attribute_data_length = struct.unpack_from(
|
||||||
|
"<BH", attribute_data, 0
|
||||||
|
)
|
||||||
|
if len(attribute_data) < 3 + attribute_data_length:
|
||||||
|
return
|
||||||
|
attributes.append(
|
||||||
|
AppAttribute(
|
||||||
|
AppAttributeId(attribute_id),
|
||||||
|
attribute_data[3 : 3 + attribute_data_length].decode("utf-8"),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
attribute_data = attribute_data[3 + attribute_data_length :]
|
||||||
|
else:
|
||||||
|
logger.warning(f"unexpected response command id {command_id}")
|
||||||
|
return
|
||||||
|
|
||||||
|
if len(attributes) < self._expected_response_tuples:
|
||||||
|
# We have not received all the tuples yet.
|
||||||
|
return
|
||||||
|
|
||||||
|
if not self._response.done():
|
||||||
|
self._response.set_result(attributes)
|
||||||
|
|
||||||
|
async def _send_command(self, command: bytes) -> None:
|
||||||
|
try:
|
||||||
|
await self._ancs_proxy.control_point.write_value(
|
||||||
|
command, with_response=True
|
||||||
|
)
|
||||||
|
except ATT_Error as error:
|
||||||
|
raise CommandError(error_code=ErrorCode(error.error_code)) from error
|
||||||
|
|
||||||
|
async def get_notification_attributes(
|
||||||
|
self,
|
||||||
|
notification_uid: int,
|
||||||
|
attributes: Sequence[
|
||||||
|
Union[NotificationAttributeId, tuple[NotificationAttributeId, int]]
|
||||||
|
],
|
||||||
|
) -> list[NotificationAttribute]:
|
||||||
|
if not self._started:
|
||||||
|
raise RuntimeError("client not started")
|
||||||
|
|
||||||
|
command = struct.pack(
|
||||||
|
"<BI", CommandId.GET_NOTIFICATION_ATTRIBUTES, notification_uid
|
||||||
|
)
|
||||||
|
for attribute in attributes:
|
||||||
|
attribute_max_length = 0
|
||||||
|
if isinstance(attribute, tuple):
|
||||||
|
attribute_id, attribute_max_length = attribute
|
||||||
|
if attribute_id not in (
|
||||||
|
NotificationAttributeId.TITLE,
|
||||||
|
NotificationAttributeId.SUBTITLE,
|
||||||
|
NotificationAttributeId.MESSAGE,
|
||||||
|
):
|
||||||
|
raise ValueError(
|
||||||
|
"this attribute does not allow specifying a max length"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
attribute_id = attribute
|
||||||
|
if attribute_id in (
|
||||||
|
NotificationAttributeId.TITLE,
|
||||||
|
NotificationAttributeId.SUBTITLE,
|
||||||
|
NotificationAttributeId.MESSAGE,
|
||||||
|
):
|
||||||
|
attribute_max_length = _DEFAULT_ATTRIBUTE_MAX_LENGTH
|
||||||
|
|
||||||
|
if attribute_max_length:
|
||||||
|
command += struct.pack("<BH", attribute_id, attribute_max_length)
|
||||||
|
else:
|
||||||
|
command += struct.pack("B", attribute_id)
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with self._command_semaphore:
|
||||||
|
self._expected_response_notification_uid = notification_uid
|
||||||
|
self._expected_response_tuples = len(attributes)
|
||||||
|
self._expected_response_command_id = (
|
||||||
|
CommandId.GET_NOTIFICATION_ATTRIBUTES
|
||||||
|
)
|
||||||
|
self._response = asyncio.Future()
|
||||||
|
|
||||||
|
# Send the command.
|
||||||
|
await self._send_command(command)
|
||||||
|
|
||||||
|
# Wait for the response.
|
||||||
|
return await self._response
|
||||||
|
finally:
|
||||||
|
self._reset_response()
|
||||||
|
|
||||||
|
async def get_app_attributes(
|
||||||
|
self, app_identifier: str, attributes: Sequence[AppAttributeId]
|
||||||
|
) -> list[AppAttribute]:
|
||||||
|
if not self._started:
|
||||||
|
raise RuntimeError("client not started")
|
||||||
|
|
||||||
|
command = (
|
||||||
|
bytes([CommandId.GET_APP_ATTRIBUTES])
|
||||||
|
+ app_identifier.encode("utf-8")
|
||||||
|
+ b"\0"
|
||||||
|
)
|
||||||
|
for attribute_id in attributes:
|
||||||
|
command += struct.pack("B", attribute_id)
|
||||||
|
|
||||||
|
try:
|
||||||
|
async with self._command_semaphore:
|
||||||
|
self._expected_response_app_identifier = app_identifier
|
||||||
|
self._expected_response_tuples = len(attributes)
|
||||||
|
self._expected_response_command_id = CommandId.GET_APP_ATTRIBUTES
|
||||||
|
self._response = asyncio.Future()
|
||||||
|
|
||||||
|
# Send the command.
|
||||||
|
await self._send_command(command)
|
||||||
|
|
||||||
|
# Wait for the response.
|
||||||
|
return await self._response
|
||||||
|
finally:
|
||||||
|
self._reset_response()
|
||||||
|
|
||||||
|
async def perform_action(self, notification_uid: int, action: ActionId) -> None:
|
||||||
|
if not self._started:
|
||||||
|
raise RuntimeError("client not started")
|
||||||
|
|
||||||
|
command = struct.pack(
|
||||||
|
"<BIB", CommandId.PERFORM_NOTIFICATION_ACTION, notification_uid, action
|
||||||
|
)
|
||||||
|
|
||||||
|
async with self._command_semaphore:
|
||||||
|
await self._send_command(command)
|
||||||
|
|
||||||
|
async def perform_positive_action(self, notification_uid: int) -> None:
|
||||||
|
return await self.perform_action(notification_uid, ActionId.POSITIVE)
|
||||||
|
|
||||||
|
async def perform_negative_action(self, notification_uid: int) -> None:
|
||||||
|
return await self.perform_action(notification_uid, ActionId.NEGATIVE)
|
||||||
215
examples/run_ancs_client.py
Normal file
215
examples/run_ancs_client.py
Normal file
@@ -0,0 +1,215 @@
|
|||||||
|
# Copyright 2025 Google LLC
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
# Imports
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
import asyncio
|
||||||
|
import sys
|
||||||
|
import os
|
||||||
|
import logging
|
||||||
|
from bumble.colors import color
|
||||||
|
|
||||||
|
from bumble.device import Device, Peer
|
||||||
|
from bumble.transport import open_transport
|
||||||
|
from bumble.profiles.ancs import (
|
||||||
|
AncsClient,
|
||||||
|
AppAttribute,
|
||||||
|
AppAttributeId,
|
||||||
|
EventFlags,
|
||||||
|
EventId,
|
||||||
|
Notification,
|
||||||
|
NotificationAttributeId,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
_cached_app_names: dict[str, str] = {}
|
||||||
|
_notification_queue = asyncio.Queue[Notification]()
|
||||||
|
|
||||||
|
|
||||||
|
async def process_notifications(ancs_client: AncsClient):
|
||||||
|
while True:
|
||||||
|
notification = await _notification_queue.get()
|
||||||
|
|
||||||
|
prefix = " "
|
||||||
|
if notification.event_id == EventId.NOTIFICATION_ADDED:
|
||||||
|
print_color = "green"
|
||||||
|
if notification.event_flags & EventFlags.PRE_EXISTING:
|
||||||
|
prefix = " Existing "
|
||||||
|
else:
|
||||||
|
prefix = " New "
|
||||||
|
elif notification.event_id == EventId.NOTIFICATION_REMOVED:
|
||||||
|
print_color = "red"
|
||||||
|
elif notification.event_id == EventId.NOTIFICATION_MODIFIED:
|
||||||
|
print_color = "yellow"
|
||||||
|
else:
|
||||||
|
print_color = "white"
|
||||||
|
|
||||||
|
print(
|
||||||
|
color(
|
||||||
|
(
|
||||||
|
f"[{notification.event_id.name}]{prefix}Notification "
|
||||||
|
f"({notification.notification_uid}):"
|
||||||
|
),
|
||||||
|
print_color,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
print(color(" Event ID: ", "yellow"), notification.event_id.name)
|
||||||
|
print(color(" Event Flags: ", "yellow"), notification.event_flags.name)
|
||||||
|
print(color(" Category ID: ", "yellow"), notification.category_id.name)
|
||||||
|
print(color(" Category Count:", "yellow"), notification.category_count)
|
||||||
|
|
||||||
|
if notification.event_id not in (
|
||||||
|
EventId.NOTIFICATION_ADDED,
|
||||||
|
EventId.NOTIFICATION_MODIFIED,
|
||||||
|
):
|
||||||
|
continue
|
||||||
|
|
||||||
|
requested_attributes = [
|
||||||
|
NotificationAttributeId.APP_IDENTIFIER,
|
||||||
|
NotificationAttributeId.TITLE,
|
||||||
|
NotificationAttributeId.SUBTITLE,
|
||||||
|
NotificationAttributeId.MESSAGE,
|
||||||
|
NotificationAttributeId.DATE,
|
||||||
|
]
|
||||||
|
if notification.event_flags & EventFlags.NEGATIVE_ACTION:
|
||||||
|
requested_attributes.append(NotificationAttributeId.NEGATIVE_ACTION_LABEL)
|
||||||
|
if notification.event_flags & EventFlags.POSITIVE_ACTION:
|
||||||
|
requested_attributes.append(NotificationAttributeId.POSITIVE_ACTION_LABEL)
|
||||||
|
|
||||||
|
attributes = await ancs_client.get_notification_attributes(
|
||||||
|
notification.notification_uid, requested_attributes
|
||||||
|
)
|
||||||
|
max_attribute_name_width = max(
|
||||||
|
(len(attribute.attribute_id.name) for attribute in attributes)
|
||||||
|
)
|
||||||
|
app_identifier = str(
|
||||||
|
next(
|
||||||
|
(
|
||||||
|
attribute.value
|
||||||
|
for attribute in attributes
|
||||||
|
if attribute.attribute_id == NotificationAttributeId.APP_IDENTIFIER
|
||||||
|
)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
if app_identifier not in _cached_app_names:
|
||||||
|
app_attributes = await ancs_client.get_app_attributes(
|
||||||
|
app_identifier, [AppAttributeId.DISPLAY_NAME]
|
||||||
|
)
|
||||||
|
_cached_app_names[app_identifier] = app_attributes[0].value
|
||||||
|
app_name = _cached_app_names[app_identifier]
|
||||||
|
|
||||||
|
for attribute in attributes:
|
||||||
|
padding = ' ' * (
|
||||||
|
max_attribute_name_width - len(attribute.attribute_id.name)
|
||||||
|
)
|
||||||
|
suffix = (
|
||||||
|
f" ({app_name})"
|
||||||
|
if attribute.attribute_id == NotificationAttributeId.APP_IDENTIFIER
|
||||||
|
else ""
|
||||||
|
)
|
||||||
|
print(
|
||||||
|
color(f" {attribute.attribute_id.name}:{padding}", "blue"),
|
||||||
|
f"{attribute.value}{suffix}",
|
||||||
|
)
|
||||||
|
|
||||||
|
print()
|
||||||
|
|
||||||
|
|
||||||
|
def on_ancs_notification(notification: Notification) -> None:
|
||||||
|
_notification_queue.put_nowait(notification)
|
||||||
|
|
||||||
|
|
||||||
|
async def handle_command_client(
|
||||||
|
ancs_client: AncsClient, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
|
||||||
|
) -> None:
|
||||||
|
while True:
|
||||||
|
command = (await reader.readline()).decode("utf-8").strip()
|
||||||
|
|
||||||
|
try:
|
||||||
|
command_name, command_args = command.split(" ", 1)
|
||||||
|
if command_name == "+":
|
||||||
|
notification_uid = int(command_args)
|
||||||
|
await ancs_client.perform_positive_action(notification_uid)
|
||||||
|
elif command_name == "-":
|
||||||
|
notification_uid = int(command_args)
|
||||||
|
await ancs_client.perform_negative_action(notification_uid)
|
||||||
|
else:
|
||||||
|
writer.write(f"unknown command {command_name}".encode("utf-8"))
|
||||||
|
except Exception as error:
|
||||||
|
writer.write(f"ERROR: {error}\n".encode("utf-8"))
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
async def main() -> None:
|
||||||
|
if len(sys.argv) < 3:
|
||||||
|
print(
|
||||||
|
'Usage: run_ancs_client.py <device-config> <transport-spec> '
|
||||||
|
'<bluetooth-address> <mtu>'
|
||||||
|
)
|
||||||
|
print('example: run_ancs_client.py device1.json usb:0 E1:CA:72:48:C4:E8 512')
|
||||||
|
return
|
||||||
|
device_config, transport_spec, bluetooth_address, mtu = sys.argv[1:]
|
||||||
|
|
||||||
|
print('<<< connecting to HCI...')
|
||||||
|
async with await open_transport(transport_spec) as hci_transport:
|
||||||
|
print('<<< connected')
|
||||||
|
|
||||||
|
# Create a device to manage the host, with a custom listener
|
||||||
|
device = Device.from_config_file_with_hci(
|
||||||
|
device_config, hci_transport.source, hci_transport.sink
|
||||||
|
)
|
||||||
|
await device.power_on()
|
||||||
|
|
||||||
|
# Connect to the peer
|
||||||
|
print(f'=== Connecting to {bluetooth_address}...')
|
||||||
|
connection = await device.connect(bluetooth_address)
|
||||||
|
print(f'=== Connected: {connection}')
|
||||||
|
|
||||||
|
await connection.encrypt()
|
||||||
|
|
||||||
|
peer = Peer(connection)
|
||||||
|
mtu_int = int(mtu)
|
||||||
|
if mtu_int:
|
||||||
|
new_mtu = await peer.request_mtu(mtu_int)
|
||||||
|
print(f'ATT MTU = {new_mtu}')
|
||||||
|
ancs_client = await AncsClient.for_peer(peer)
|
||||||
|
if ancs_client is None:
|
||||||
|
print("!!! no ANCS service found")
|
||||||
|
return
|
||||||
|
await ancs_client.start()
|
||||||
|
|
||||||
|
print('Subscribing to updates')
|
||||||
|
ancs_client.on("notification", on_ancs_notification)
|
||||||
|
|
||||||
|
# Process all notifications in a task.
|
||||||
|
notification_processing_task = asyncio.create_task(
|
||||||
|
process_notifications(ancs_client)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Accept a TCP connection to handle commands.
|
||||||
|
tcp_server = await asyncio.start_server(
|
||||||
|
lambda reader, writer: handle_command_client(ancs_client, reader, writer),
|
||||||
|
'127.0.0.1',
|
||||||
|
9000,
|
||||||
|
)
|
||||||
|
print("Accepting command client on port 9000")
|
||||||
|
async with tcp_server:
|
||||||
|
await tcp_server.serve_forever()
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
|
||||||
|
asyncio.run(main())
|
||||||
Reference in New Issue
Block a user