forked from auracaster/bumble_mirror
Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 776bdae519 | |||
| b2d9541f8f | |||
| 637224d5bc | |||
| 92ab171013 | |||
| 592475e2ed | |||
| 12bcdb7770 | |||
| 7a58f36020 |
+10
-2
@@ -223,7 +223,12 @@ UUID_2_FIELD_SPEC = lambda x, y: UUID.parse_uuid_2(x, y) # noqa: E731
|
||||
# Exceptions
|
||||
# -----------------------------------------------------------------------------
|
||||
class ATT_Error(ProtocolError):
|
||||
def __init__(self, error_code, att_handle=0x0000, message=''):
|
||||
error_code: int
|
||||
att_handle: int
|
||||
|
||||
def __init__(
|
||||
self, error_code: int, att_handle: int = 0x0000, message: str = ''
|
||||
) -> None:
|
||||
super().__init__(
|
||||
error_code,
|
||||
error_namespace='att',
|
||||
@@ -233,7 +238,10 @@ class ATT_Error(ProtocolError):
|
||||
self.message = message
|
||||
|
||||
def __str__(self):
|
||||
return f'ATT_Error(error={self.error_name}, handle={self.att_handle:04X}): {self.message}'
|
||||
return (
|
||||
f'ATT_Error(error={self.error_name}, '
|
||||
f'handle={self.att_handle:04X}): {self.message}'
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
|
||||
@@ -25,8 +25,6 @@ import random
|
||||
import struct
|
||||
from bumble.colors import color
|
||||
from bumble.core import (
|
||||
BT_CENTRAL_ROLE,
|
||||
BT_PERIPHERAL_ROLE,
|
||||
BT_LE_TRANSPORT,
|
||||
BT_BR_EDR_TRANSPORT,
|
||||
)
|
||||
@@ -47,6 +45,7 @@ from bumble.hci import (
|
||||
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
|
||||
HCI_VERSION_BLUETOOTH_CORE_5_0,
|
||||
Address,
|
||||
Role,
|
||||
HCI_AclDataPacket,
|
||||
HCI_AclDataPacketAssembler,
|
||||
HCI_Command_Complete_Event,
|
||||
@@ -98,7 +97,7 @@ class CisLink:
|
||||
class Connection:
|
||||
controller: Controller
|
||||
handle: int
|
||||
role: int
|
||||
role: Role
|
||||
peer_address: Address
|
||||
link: Any
|
||||
transport: int
|
||||
@@ -390,7 +389,7 @@ class Controller:
|
||||
connection = Connection(
|
||||
controller=self,
|
||||
handle=connection_handle,
|
||||
role=BT_PERIPHERAL_ROLE,
|
||||
role=Role.PERIPHERAL,
|
||||
peer_address=peer_address,
|
||||
link=self.link,
|
||||
transport=BT_LE_TRANSPORT,
|
||||
@@ -450,7 +449,7 @@ class Controller:
|
||||
connection = Connection(
|
||||
controller=self,
|
||||
handle=connection_handle,
|
||||
role=BT_CENTRAL_ROLE,
|
||||
role=Role.CENTRAL,
|
||||
peer_address=peer_address,
|
||||
link=self.link,
|
||||
transport=BT_LE_TRANSPORT,
|
||||
@@ -469,7 +468,7 @@ class Controller:
|
||||
HCI_LE_Connection_Complete_Event(
|
||||
status=status,
|
||||
connection_handle=connection.handle if connection else 0,
|
||||
role=BT_CENTRAL_ROLE,
|
||||
role=Role.CENTRAL,
|
||||
peer_address_type=le_create_connection_command.peer_address_type,
|
||||
peer_address=le_create_connection_command.peer_address,
|
||||
connection_interval=le_create_connection_command.connection_interval_min,
|
||||
@@ -693,7 +692,7 @@ class Controller:
|
||||
controller=self,
|
||||
handle=connection_handle,
|
||||
# Role doesn't matter in Classic because they are managed by HCI_Role_Change and HCI_Role_Discovery
|
||||
role=BT_CENTRAL_ROLE,
|
||||
role=Role.CENTRAL,
|
||||
peer_address=peer_address,
|
||||
link=self.link,
|
||||
transport=BT_BR_EDR_TRANSPORT,
|
||||
@@ -761,7 +760,7 @@ class Controller:
|
||||
controller=self,
|
||||
handle=connection_handle,
|
||||
# Role doesn't matter in SCO.
|
||||
role=BT_CENTRAL_ROLE,
|
||||
role=Role.CENTRAL,
|
||||
peer_address=peer_address,
|
||||
link=self.link,
|
||||
transport=BT_BR_EDR_TRANSPORT,
|
||||
|
||||
+5
-4
@@ -31,11 +31,12 @@ from bumble.utils import OpenIntEnum
|
||||
# -----------------------------------------------------------------------------
|
||||
# fmt: off
|
||||
|
||||
BT_CENTRAL_ROLE = 0
|
||||
BT_PERIPHERAL_ROLE = 1
|
||||
class PhysicalTransport(enum.IntEnum):
|
||||
BR_EDR = 0
|
||||
LE = 1
|
||||
|
||||
BT_BR_EDR_TRANSPORT = 0
|
||||
BT_LE_TRANSPORT = 1
|
||||
BT_BR_EDR_TRANSPORT = PhysicalTransport.BR_EDR
|
||||
BT_LE_TRANSPORT = PhysicalTransport.LE
|
||||
|
||||
|
||||
# fmt: on
|
||||
|
||||
+25
-26
@@ -58,9 +58,7 @@ from .host import DataPacketQueue, Host
|
||||
from .profiles.gap import GenericAccessService
|
||||
from .core import (
|
||||
BT_BR_EDR_TRANSPORT,
|
||||
BT_CENTRAL_ROLE,
|
||||
BT_LE_TRANSPORT,
|
||||
BT_PERIPHERAL_ROLE,
|
||||
AdvertisingData,
|
||||
BaseBumbleError,
|
||||
ConnectionParameterUpdateError,
|
||||
@@ -1555,13 +1553,13 @@ class IsoPacketStream:
|
||||
class Connection(CompositeEventEmitter):
|
||||
device: Device
|
||||
handle: int
|
||||
transport: int
|
||||
transport: core.PhysicalTransport
|
||||
self_address: hci.Address
|
||||
self_resolvable_address: Optional[hci.Address]
|
||||
peer_address: hci.Address
|
||||
peer_resolvable_address: Optional[hci.Address]
|
||||
peer_le_features: Optional[hci.LeFeatureMask]
|
||||
role: int
|
||||
role: hci.Role
|
||||
encryption: int
|
||||
authenticated: bool
|
||||
sc: bool
|
||||
@@ -1674,9 +1672,9 @@ class Connection(CompositeEventEmitter):
|
||||
def role_name(self):
|
||||
if self.role is None:
|
||||
return 'NOT-SET'
|
||||
if self.role == BT_CENTRAL_ROLE:
|
||||
if self.role == hci.Role.CENTRAL:
|
||||
return 'CENTRAL'
|
||||
if self.role == BT_PERIPHERAL_ROLE:
|
||||
if self.role == hci.Role.PERIPHERAL:
|
||||
return 'PERIPHERAL'
|
||||
return f'UNKNOWN[{self.role}]'
|
||||
|
||||
@@ -1734,7 +1732,7 @@ class Connection(CompositeEventEmitter):
|
||||
async def encrypt(self, enable: bool = True) -> None:
|
||||
return await self.device.encrypt(self, enable)
|
||||
|
||||
async def switch_role(self, role: int) -> None:
|
||||
async def switch_role(self, role: hci.Role) -> None:
|
||||
return await self.device.switch_role(self, role)
|
||||
|
||||
async def sustain(self, timeout: Optional[float] = None) -> None:
|
||||
@@ -2713,7 +2711,7 @@ class Device(CompositeEventEmitter):
|
||||
if phy == hci.HCI_LE_1M_PHY:
|
||||
return True
|
||||
|
||||
feature_map = {
|
||||
feature_map: dict[int, hci.LeFeatureMask] = {
|
||||
hci.HCI_LE_2M_PHY: hci.LeFeatureMask.LE_2M_PHY,
|
||||
hci.HCI_LE_CODED_PHY: hci.LeFeatureMask.LE_CODED_PHY,
|
||||
}
|
||||
@@ -2734,7 +2732,7 @@ class Device(CompositeEventEmitter):
|
||||
self,
|
||||
advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
|
||||
target: Optional[hci.Address] = None,
|
||||
own_address_type: int = hci.OwnAddressType.RANDOM,
|
||||
own_address_type: hci.OwnAddressType = hci.OwnAddressType.RANDOM,
|
||||
auto_restart: bool = False,
|
||||
advertising_data: Optional[bytes] = None,
|
||||
scan_response_data: Optional[bytes] = None,
|
||||
@@ -3015,7 +3013,7 @@ class Device(CompositeEventEmitter):
|
||||
active: bool = True,
|
||||
scan_interval: float = DEVICE_DEFAULT_SCAN_INTERVAL, # Scan interval in ms
|
||||
scan_window: float = DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms
|
||||
own_address_type: int = hci.OwnAddressType.RANDOM,
|
||||
own_address_type: hci.OwnAddressType = hci.OwnAddressType.RANDOM,
|
||||
filter_duplicates: bool = False,
|
||||
scanning_phys: Sequence[int] = (hci.HCI_LE_1M_PHY, hci.HCI_LE_CODED_PHY),
|
||||
) -> None:
|
||||
@@ -3091,7 +3089,7 @@ class Device(CompositeEventEmitter):
|
||||
# pylint: disable=line-too-long
|
||||
hci.HCI_LE_Set_Scan_Parameters_Command(
|
||||
le_scan_type=scan_type,
|
||||
le_scan_interval=int(scan_window / 0.625),
|
||||
le_scan_interval=int(scan_interval / 0.625),
|
||||
le_scan_window=int(scan_window / 0.625),
|
||||
own_address_type=own_address_type,
|
||||
scanning_filter_policy=hci.HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY,
|
||||
@@ -3381,11 +3379,11 @@ class Device(CompositeEventEmitter):
|
||||
async def connect(
|
||||
self,
|
||||
peer_address: Union[hci.Address, str],
|
||||
transport: int = BT_LE_TRANSPORT,
|
||||
transport: core.PhysicalTransport = BT_LE_TRANSPORT,
|
||||
connection_parameters_preferences: Optional[
|
||||
Dict[int, ConnectionParametersPreferences]
|
||||
dict[hci.Phy, ConnectionParametersPreferences]
|
||||
] = None,
|
||||
own_address_type: int = hci.OwnAddressType.RANDOM,
|
||||
own_address_type: hci.OwnAddressType = hci.OwnAddressType.RANDOM,
|
||||
timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
|
||||
always_resolve: bool = False,
|
||||
) -> Connection:
|
||||
@@ -3433,6 +3431,7 @@ class Device(CompositeEventEmitter):
|
||||
# Check parameters
|
||||
if transport not in (BT_LE_TRANSPORT, BT_BR_EDR_TRANSPORT):
|
||||
raise InvalidArgumentError('invalid transport')
|
||||
transport = core.PhysicalTransport(transport)
|
||||
|
||||
# Adjust the transport automatically if we need to
|
||||
if transport == BT_LE_TRANSPORT and not self.le_enabled:
|
||||
@@ -3628,7 +3627,7 @@ class Device(CompositeEventEmitter):
|
||||
else:
|
||||
# Save pending connection
|
||||
self.pending_connections[peer_address] = Connection.incomplete(
|
||||
self, peer_address, BT_CENTRAL_ROLE
|
||||
self, peer_address, hci.Role.CENTRAL
|
||||
)
|
||||
|
||||
# TODO: allow passing other settings
|
||||
@@ -3683,7 +3682,7 @@ class Device(CompositeEventEmitter):
|
||||
async def accept(
|
||||
self,
|
||||
peer_address: Union[hci.Address, str] = hci.Address.ANY,
|
||||
role: int = BT_PERIPHERAL_ROLE,
|
||||
role: hci.Role = hci.Role.PERIPHERAL,
|
||||
timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
|
||||
) -> Connection:
|
||||
'''
|
||||
@@ -3769,12 +3768,12 @@ class Device(CompositeEventEmitter):
|
||||
self.on('connection', on_connection)
|
||||
self.on('connection_failure', on_connection_failure)
|
||||
|
||||
# Save pending connection, with the Peripheral role.
|
||||
# Save pending connection, with the Peripheral hci.role.
|
||||
# Even if we requested a role switch in the hci.HCI_Accept_Connection_Request
|
||||
# command, this connection is still considered Peripheral until an eventual
|
||||
# role change event.
|
||||
self.pending_connections[peer_address] = Connection.incomplete(
|
||||
self, peer_address, BT_PERIPHERAL_ROLE
|
||||
self, peer_address, hci.Role.PERIPHERAL
|
||||
)
|
||||
|
||||
try:
|
||||
@@ -3903,7 +3902,7 @@ class Device(CompositeEventEmitter):
|
||||
'''
|
||||
|
||||
if use_l2cap:
|
||||
if connection.role != BT_PERIPHERAL_ROLE:
|
||||
if connection.role != hci.Role.PERIPHERAL:
|
||||
raise InvalidStateError(
|
||||
'only peripheral can update connection parameters with l2cap'
|
||||
)
|
||||
@@ -4148,10 +4147,10 @@ class Device(CompositeEventEmitter):
|
||||
if keys.ltk:
|
||||
return keys.ltk.value
|
||||
|
||||
if connection.role == BT_CENTRAL_ROLE and keys.ltk_central:
|
||||
if connection.role == hci.Role.CENTRAL and keys.ltk_central:
|
||||
return keys.ltk_central.value
|
||||
|
||||
if connection.role == BT_PERIPHERAL_ROLE and keys.ltk_peripheral:
|
||||
if connection.role == hci.Role.PERIPHERAL and keys.ltk_peripheral:
|
||||
return keys.ltk_peripheral.value
|
||||
return None
|
||||
|
||||
@@ -4303,7 +4302,7 @@ class Device(CompositeEventEmitter):
|
||||
self.emit('key_store_update')
|
||||
|
||||
# [Classic only]
|
||||
async def switch_role(self, connection: Connection, role: int):
|
||||
async def switch_role(self, connection: Connection, role: hci.Role):
|
||||
pending_role_change = asyncio.get_running_loop().create_future()
|
||||
|
||||
def on_role_change(new_role):
|
||||
@@ -5178,11 +5177,11 @@ class Device(CompositeEventEmitter):
|
||||
def on_connection(
|
||||
self,
|
||||
connection_handle: int,
|
||||
transport: int,
|
||||
transport: core.PhysicalTransport,
|
||||
peer_address: hci.Address,
|
||||
self_resolvable_address: Optional[hci.Address],
|
||||
peer_resolvable_address: Optional[hci.Address],
|
||||
role: int,
|
||||
role: hci.Role,
|
||||
connection_parameters: ConnectionParameters,
|
||||
) -> None:
|
||||
# Convert all-zeros addresses into None.
|
||||
@@ -5225,7 +5224,7 @@ class Device(CompositeEventEmitter):
|
||||
peer_address = resolved_address
|
||||
|
||||
self_address = None
|
||||
own_address_type: Optional[int] = None
|
||||
own_address_type: Optional[hci.OwnAddressType] = None
|
||||
if role == hci.HCI_CENTRAL_ROLE:
|
||||
own_address_type = self.connect_own_address_type
|
||||
assert own_address_type is not None
|
||||
@@ -5353,7 +5352,7 @@ class Device(CompositeEventEmitter):
|
||||
elif self.classic_accept_any:
|
||||
# Save pending connection
|
||||
self.pending_connections[bd_addr] = Connection.incomplete(
|
||||
self, bd_addr, BT_PERIPHERAL_ROLE
|
||||
self, bd_addr, hci.Role.PERIPHERAL
|
||||
)
|
||||
|
||||
self.host.send_command_sync(
|
||||
|
||||
@@ -286,6 +286,22 @@ GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC = UUID('38663f1a-e711-4cac-b641-32
|
||||
GATT_ASHA_VOLUME_CHARACTERISTIC = UUID('00e4ca9e-ab14-41e4-8823-f9e70c7e91df', 'Volume')
|
||||
GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID('2d410339-82b6-42aa-b34e-e2e01df8cc1a', 'LE_PSM_OUT')
|
||||
|
||||
# Apple Notification Center Service
|
||||
GATT_ANCS_SERVICE = UUID('7905F431-B5CE-4E99-A40F-4B1E122D00D0', 'Apple Notification Center')
|
||||
GATT_ANCS_NOTIFICATION_SOURCE_CHARACTERISTIC = UUID('9FBF120D-6301-42D9-8C58-25E699A21DBD', 'Notification Source')
|
||||
GATT_ANCS_CONTROL_POINT_CHARACTERISTIC = UUID('69D1D8F3-45E1-49A8-9821-9BBDFDAAD9D9', 'Control Point')
|
||||
GATT_ANCS_DATA_SOURCE_CHARACTERISTIC = UUID('22EAC6E9-24D6-4BB5-BE44-B36ACE7C7BFB', 'Data Source')
|
||||
|
||||
# Apple Media Service
|
||||
GATT_AMS_SERVICE = UUID('89D3502B-0F36-433A-8EF4-C502AD55F8DC', 'Apple Media')
|
||||
GATT_AMS_REMOTE_COMMAND_CHARACTERISTIC = UUID('9B3C81D8-57B1-4A8A-B8DF-0E56F7CA51C2', 'Remote Command')
|
||||
GATT_AMS_ENTITY_UPDATE_CHARACTERISTIC = UUID('2F7CABCE-808D-411F-9A0C-BB92BA96C102', 'Entity Update')
|
||||
GATT_AMS_ENTITY_ATTRIBUTE_CHARACTERISTIC = UUID('C6B2F38C-23AB-46D8-A6AB-A3A870BBD5D7', 'Entity Attribute')
|
||||
|
||||
# Misc Apple Services
|
||||
GATT_APPLE_CONTINUITY_SERVICE = UUID('D0611E78-BBB4-4591-A5F8-487910AE4366', 'Apple Continuity')
|
||||
GATT_APPLE_NEARBY_SERVICE = UUID('9FA480E0-4967-4542-9390-D343DC5D04AE', 'Apple Nearby')
|
||||
|
||||
# Misc
|
||||
GATT_DEVICE_NAME_CHARACTERISTIC = UUID.from_16_bits(0x2A00, 'Device Name')
|
||||
GATT_APPEARANCE_CHARACTERISTIC = UUID.from_16_bits(0x2A01, 'Appearance')
|
||||
|
||||
+76
-71
@@ -24,6 +24,7 @@ import logging
|
||||
import secrets
|
||||
import struct
|
||||
from typing import Any, Callable, Dict, Iterable, List, Optional, Type, Union, ClassVar
|
||||
from typing_extensions import Self
|
||||
|
||||
from bumble import crypto
|
||||
from bumble.colors import color
|
||||
@@ -34,6 +35,7 @@ from bumble.core import (
|
||||
InvalidArgumentError,
|
||||
InvalidPacketError,
|
||||
ProtocolError,
|
||||
PhysicalTransport,
|
||||
bit_flags_to_strings,
|
||||
name_or_number,
|
||||
padded_bytes,
|
||||
@@ -94,7 +96,7 @@ def map_class_of_device(class_of_device):
|
||||
)
|
||||
|
||||
|
||||
def phy_list_to_bits(phys: Optional[Iterable[int]]) -> int:
|
||||
def phy_list_to_bits(phys: Optional[Iterable[Phy]]) -> int:
|
||||
if phys is None:
|
||||
return 0
|
||||
|
||||
@@ -700,30 +702,22 @@ HCI_ERROR_NAMES[HCI_SUCCESS] = 'HCI_SUCCESS'
|
||||
HCI_COMMAND_STATUS_PENDING = 0
|
||||
|
||||
|
||||
class Phy(enum.IntEnum):
|
||||
LE_1M = 1
|
||||
LE_2M = 2
|
||||
LE_CODED = 3
|
||||
|
||||
|
||||
# ACL
|
||||
HCI_ACL_PB_FIRST_NON_FLUSHABLE = 0
|
||||
HCI_ACL_PB_CONTINUATION = 1
|
||||
HCI_ACL_PB_FIRST_FLUSHABLE = 2
|
||||
HCI_ACK_PB_COMPLETE_L2CAP = 3
|
||||
|
||||
# Roles
|
||||
HCI_CENTRAL_ROLE = 0
|
||||
HCI_PERIPHERAL_ROLE = 1
|
||||
|
||||
HCI_ROLE_NAMES = {
|
||||
HCI_CENTRAL_ROLE: 'CENTRAL',
|
||||
HCI_PERIPHERAL_ROLE: 'PERIPHERAL'
|
||||
}
|
||||
|
||||
# LE PHY Types
|
||||
HCI_LE_1M_PHY = 1
|
||||
HCI_LE_2M_PHY = 2
|
||||
HCI_LE_CODED_PHY = 3
|
||||
|
||||
HCI_LE_PHY_NAMES = {
|
||||
HCI_LE_1M_PHY: 'LE 1M',
|
||||
HCI_LE_2M_PHY: 'LE 2M',
|
||||
HCI_LE_CODED_PHY: 'LE Coded'
|
||||
HCI_LE_PHY_NAMES: dict[int,str] = {
|
||||
Phy.LE_1M: 'LE 1M',
|
||||
Phy.LE_2M: 'LE 2M',
|
||||
Phy.LE_CODED: 'LE Coded'
|
||||
}
|
||||
|
||||
HCI_LE_1M_PHY_BIT = 0
|
||||
@@ -732,19 +726,13 @@ HCI_LE_CODED_PHY_BIT = 2
|
||||
|
||||
HCI_LE_PHY_BIT_NAMES = ['LE_1M_PHY', 'LE_2M_PHY', 'LE_CODED_PHY']
|
||||
|
||||
HCI_LE_PHY_TYPE_TO_BIT = {
|
||||
HCI_LE_1M_PHY: HCI_LE_1M_PHY_BIT,
|
||||
HCI_LE_2M_PHY: HCI_LE_2M_PHY_BIT,
|
||||
HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_BIT
|
||||
HCI_LE_PHY_TYPE_TO_BIT: dict[Phy, int] = {
|
||||
Phy.LE_1M: HCI_LE_1M_PHY_BIT,
|
||||
Phy.LE_2M: HCI_LE_2M_PHY_BIT,
|
||||
Phy.LE_CODED: HCI_LE_CODED_PHY_BIT,
|
||||
}
|
||||
|
||||
|
||||
class Phy(enum.IntEnum):
|
||||
LE_1M = HCI_LE_1M_PHY
|
||||
LE_2M = HCI_LE_2M_PHY
|
||||
LE_CODED = HCI_LE_CODED_PHY
|
||||
|
||||
|
||||
class PhyBit(enum.IntFlag):
|
||||
LE_1M = 1 << HCI_LE_1M_PHY_BIT
|
||||
LE_2M = 1 << HCI_LE_2M_PHY_BIT
|
||||
@@ -811,6 +799,19 @@ class CsSubeventAbortReason(OpenIntEnum):
|
||||
SCHEDULING_CONFLICT_OR_LIMITED_RESOURCES = 0x03
|
||||
UNSPECIFIED = 0x0F
|
||||
|
||||
class Role(enum.IntEnum):
|
||||
CENTRAL = 0
|
||||
PERIPHERAL = 1
|
||||
|
||||
# For Backward Compatibility.
|
||||
HCI_CENTRAL_ROLE = Role.CENTRAL
|
||||
HCI_PERIPHERAL_ROLE = Role.PERIPHERAL
|
||||
|
||||
|
||||
HCI_LE_1M_PHY = Phy.LE_1M
|
||||
HCI_LE_2M_PHY = Phy.LE_2M
|
||||
HCI_LE_CODED_PHY = Phy.LE_CODED
|
||||
|
||||
|
||||
# Connection Parameters
|
||||
HCI_CONNECTION_INTERVAL_MS_PER_UNIT = 1.25
|
||||
@@ -889,10 +890,15 @@ HCI_LINK_TYPE_NAMES = {
|
||||
}
|
||||
|
||||
# Address types
|
||||
HCI_PUBLIC_DEVICE_ADDRESS_TYPE = 0x00
|
||||
HCI_RANDOM_DEVICE_ADDRESS_TYPE = 0x01
|
||||
HCI_PUBLIC_IDENTITY_ADDRESS_TYPE = 0x02
|
||||
HCI_RANDOM_IDENTITY_ADDRESS_TYPE = 0x03
|
||||
class AddressType(OpenIntEnum):
|
||||
PUBLIC_DEVICE = 0x00
|
||||
RANDOM_DEVICE = 0x01
|
||||
PUBLIC_IDENTITY = 0x02
|
||||
RANDOM_IDENTITY = 0x03
|
||||
# (Directed Only) Address is RPA, but controller cannot resolve.
|
||||
UNABLE_TO_RESOLVE = 0xFE
|
||||
# (Extended Only) No address.
|
||||
ANONYMOUS = 0xFF
|
||||
|
||||
# Supported Commands Masks
|
||||
# See Bluetooth spec @ 6.27 SUPPORTED COMMANDS
|
||||
@@ -1582,8 +1588,8 @@ class HCI_Constant:
|
||||
return HCI_ERROR_NAMES.get(status, f'0x{status:02X}')
|
||||
|
||||
@staticmethod
|
||||
def role_name(role):
|
||||
return HCI_ROLE_NAMES.get(role, str(role))
|
||||
def role_name(role: int) -> str:
|
||||
return Role(role).name
|
||||
|
||||
@staticmethod
|
||||
def le_phy_name(phy):
|
||||
@@ -1949,17 +1955,10 @@ class Address:
|
||||
address[0] is the LSB of the address, address[5] is the MSB.
|
||||
'''
|
||||
|
||||
PUBLIC_DEVICE_ADDRESS = 0x00
|
||||
RANDOM_DEVICE_ADDRESS = 0x01
|
||||
PUBLIC_IDENTITY_ADDRESS = 0x02
|
||||
RANDOM_IDENTITY_ADDRESS = 0x03
|
||||
|
||||
ADDRESS_TYPE_NAMES = {
|
||||
PUBLIC_DEVICE_ADDRESS: 'PUBLIC_DEVICE_ADDRESS',
|
||||
RANDOM_DEVICE_ADDRESS: 'RANDOM_DEVICE_ADDRESS',
|
||||
PUBLIC_IDENTITY_ADDRESS: 'PUBLIC_IDENTITY_ADDRESS',
|
||||
RANDOM_IDENTITY_ADDRESS: 'RANDOM_IDENTITY_ADDRESS',
|
||||
}
|
||||
PUBLIC_DEVICE_ADDRESS = AddressType.PUBLIC_DEVICE
|
||||
RANDOM_DEVICE_ADDRESS = AddressType.RANDOM_DEVICE
|
||||
PUBLIC_IDENTITY_ADDRESS = AddressType.PUBLIC_IDENTITY
|
||||
RANDOM_IDENTITY_ADDRESS = AddressType.RANDOM_IDENTITY
|
||||
|
||||
# Type declarations
|
||||
NIL: Address
|
||||
@@ -1969,40 +1968,44 @@ class Address:
|
||||
# pylint: disable-next=unnecessary-lambda
|
||||
ADDRESS_TYPE_SPEC = {'size': 1, 'mapper': lambda x: Address.address_type_name(x)}
|
||||
|
||||
@staticmethod
|
||||
def address_type_name(address_type):
|
||||
return name_or_number(Address.ADDRESS_TYPE_NAMES, address_type)
|
||||
@classmethod
|
||||
def address_type_name(cls: type[Self], address_type: int) -> str:
|
||||
return AddressType(address_type).name
|
||||
|
||||
@staticmethod
|
||||
def from_string_for_transport(string, transport):
|
||||
@classmethod
|
||||
def from_string_for_transport(
|
||||
cls: type[Self], string: str, transport: PhysicalTransport
|
||||
) -> Self:
|
||||
if transport == BT_BR_EDR_TRANSPORT:
|
||||
address_type = Address.PUBLIC_DEVICE_ADDRESS
|
||||
else:
|
||||
address_type = Address.RANDOM_DEVICE_ADDRESS
|
||||
return Address(string, address_type)
|
||||
return cls(string, address_type)
|
||||
|
||||
@staticmethod
|
||||
def parse_address(data, offset):
|
||||
@classmethod
|
||||
def parse_address(cls: type[Self], data: bytes, offset: int) -> tuple[int, Self]:
|
||||
# Fix the type to a default value. This is used for parsing type-less Classic
|
||||
# addresses
|
||||
return Address.parse_address_with_type(
|
||||
data, offset, Address.PUBLIC_DEVICE_ADDRESS
|
||||
)
|
||||
return cls.parse_address_with_type(data, offset, Address.PUBLIC_DEVICE_ADDRESS)
|
||||
|
||||
@staticmethod
|
||||
def parse_random_address(data, offset):
|
||||
return Address.parse_address_with_type(
|
||||
data, offset, Address.RANDOM_DEVICE_ADDRESS
|
||||
)
|
||||
@classmethod
|
||||
def parse_random_address(
|
||||
cls: type[Self], data: bytes, offset: int
|
||||
) -> tuple[int, Self]:
|
||||
return cls.parse_address_with_type(data, offset, Address.RANDOM_DEVICE_ADDRESS)
|
||||
|
||||
@staticmethod
|
||||
def parse_address_with_type(data, offset, address_type):
|
||||
return offset + 6, Address(data[offset : offset + 6], address_type)
|
||||
@classmethod
|
||||
def parse_address_with_type(
|
||||
cls: type[Self], data: bytes, offset: int, address_type: AddressType
|
||||
) -> tuple[int, Self]:
|
||||
return offset + 6, cls(data[offset : offset + 6], address_type)
|
||||
|
||||
@staticmethod
|
||||
def parse_address_preceded_by_type(data, offset):
|
||||
address_type = data[offset - 1]
|
||||
return Address.parse_address_with_type(data, offset, address_type)
|
||||
@classmethod
|
||||
def parse_address_preceded_by_type(
|
||||
cls: type[Self], data: bytes, offset: int
|
||||
) -> tuple[int, Self]:
|
||||
address_type = AddressType(data[offset - 1])
|
||||
return cls.parse_address_with_type(data, offset, address_type)
|
||||
|
||||
@classmethod
|
||||
def generate_static_address(cls) -> Address:
|
||||
@@ -2042,8 +2045,10 @@ class Address:
|
||||
)
|
||||
|
||||
def __init__(
|
||||
self, address: Union[bytes, str], address_type: int = RANDOM_DEVICE_ADDRESS
|
||||
):
|
||||
self,
|
||||
address: Union[bytes, str],
|
||||
address_type: AddressType = RANDOM_DEVICE_ADDRESS,
|
||||
) -> None:
|
||||
'''
|
||||
Initialize an instance. `address` may be a byte array in little-endian
|
||||
format, or a hex string in big-endian format (with optional ':'
|
||||
|
||||
+8
-3
@@ -44,6 +44,7 @@ from bumble import hci
|
||||
from bumble.core import (
|
||||
BT_BR_EDR_TRANSPORT,
|
||||
BT_LE_TRANSPORT,
|
||||
PhysicalTransport,
|
||||
ConnectionPHY,
|
||||
ConnectionParameters,
|
||||
)
|
||||
@@ -186,7 +187,11 @@ class DataPacketQueue(pyee.EventEmitter):
|
||||
# -----------------------------------------------------------------------------
|
||||
class Connection:
|
||||
def __init__(
|
||||
self, host: Host, handle: int, peer_address: hci.Address, transport: int
|
||||
self,
|
||||
host: Host,
|
||||
handle: int,
|
||||
peer_address: hci.Address,
|
||||
transport: PhysicalTransport,
|
||||
):
|
||||
self.host = host
|
||||
self.handle = handle
|
||||
@@ -979,7 +984,7 @@ class Host(AbortableEventEmitter):
|
||||
event.peer_address,
|
||||
getattr(event, 'local_resolvable_private_address', None),
|
||||
getattr(event, 'peer_resolvable_private_address', None),
|
||||
event.role,
|
||||
hci.Role(event.role),
|
||||
connection_parameters,
|
||||
)
|
||||
else:
|
||||
@@ -1337,7 +1342,7 @@ class Host(AbortableEventEmitter):
|
||||
f'role change for {event.bd_addr}: '
|
||||
f'{hci.HCI_Constant.role_name(event.new_role)}'
|
||||
)
|
||||
self.emit('role_change', event.bd_addr, event.new_role)
|
||||
self.emit('role_change', event.bd_addr, hci.Role(event.new_role))
|
||||
else:
|
||||
logger.debug(
|
||||
f'role change for {event.bd_addr} failed: '
|
||||
|
||||
+2
-2
@@ -42,7 +42,6 @@ from typing import (
|
||||
from .utils import deprecated
|
||||
from .colors import color
|
||||
from .core import (
|
||||
BT_CENTRAL_ROLE,
|
||||
InvalidStateError,
|
||||
InvalidArgumentError,
|
||||
InvalidPacketError,
|
||||
@@ -52,6 +51,7 @@ from .core import (
|
||||
from .hci import (
|
||||
HCI_LE_Connection_Update_Command,
|
||||
HCI_Object,
|
||||
Role,
|
||||
key_with_value,
|
||||
name_or_number,
|
||||
)
|
||||
@@ -1908,7 +1908,7 @@ class ChannelManager:
|
||||
def on_l2cap_connection_parameter_update_request(
|
||||
self, connection: Connection, cid: int, request
|
||||
):
|
||||
if connection.role == BT_CENTRAL_ROLE:
|
||||
if connection.role == Role.CENTRAL:
|
||||
self.send_control_frame(
|
||||
connection,
|
||||
cid,
|
||||
|
||||
+2
-2
@@ -20,7 +20,6 @@ import asyncio
|
||||
from functools import partial
|
||||
|
||||
from bumble.core import (
|
||||
BT_PERIPHERAL_ROLE,
|
||||
BT_BR_EDR_TRANSPORT,
|
||||
BT_LE_TRANSPORT,
|
||||
InvalidStateError,
|
||||
@@ -28,6 +27,7 @@ from bumble.core import (
|
||||
from bumble.colors import color
|
||||
from bumble.hci import (
|
||||
Address,
|
||||
Role,
|
||||
HCI_SUCCESS,
|
||||
HCI_CONNECTION_ACCEPT_TIMEOUT_ERROR,
|
||||
HCI_CONNECTION_TIMEOUT_ERROR,
|
||||
@@ -292,7 +292,7 @@ class LocalLink:
|
||||
return
|
||||
|
||||
async def task():
|
||||
if responder_role != BT_PERIPHERAL_ROLE:
|
||||
if responder_role != Role.PERIPHERAL:
|
||||
initiator_controller.on_classic_role_change(
|
||||
responder_controller.public_address, int(not (responder_role))
|
||||
)
|
||||
|
||||
+12
-11
@@ -25,7 +25,6 @@ from .config import Config
|
||||
from bumble.core import (
|
||||
BT_BR_EDR_TRANSPORT,
|
||||
BT_LE_TRANSPORT,
|
||||
BT_PERIPHERAL_ROLE,
|
||||
UUID,
|
||||
AdvertisingData,
|
||||
Appearance,
|
||||
@@ -47,6 +46,8 @@ from bumble.hci import (
|
||||
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
|
||||
Address,
|
||||
Phy,
|
||||
Role,
|
||||
OwnAddressType,
|
||||
)
|
||||
from google.protobuf import any_pb2 # pytype: disable=pyi-error
|
||||
from google.protobuf import empty_pb2 # pytype: disable=pyi-error
|
||||
@@ -114,11 +115,11 @@ SECONDARY_PHY_TO_BUMBLE_PHY_MAP: Dict[SecondaryPhy, Phy] = {
|
||||
SECONDARY_CODED: Phy.LE_CODED,
|
||||
}
|
||||
|
||||
OWN_ADDRESS_MAP: Dict[host_pb2.OwnAddressType, bumble.hci.OwnAddressType] = {
|
||||
host_pb2.PUBLIC: bumble.hci.OwnAddressType.PUBLIC,
|
||||
host_pb2.RANDOM: bumble.hci.OwnAddressType.RANDOM,
|
||||
host_pb2.RESOLVABLE_OR_PUBLIC: bumble.hci.OwnAddressType.RESOLVABLE_OR_PUBLIC,
|
||||
host_pb2.RESOLVABLE_OR_RANDOM: bumble.hci.OwnAddressType.RESOLVABLE_OR_RANDOM,
|
||||
OWN_ADDRESS_MAP: Dict[host_pb2.OwnAddressType, OwnAddressType] = {
|
||||
host_pb2.PUBLIC: OwnAddressType.PUBLIC,
|
||||
host_pb2.RANDOM: OwnAddressType.RANDOM,
|
||||
host_pb2.RESOLVABLE_OR_PUBLIC: OwnAddressType.RESOLVABLE_OR_PUBLIC,
|
||||
host_pb2.RESOLVABLE_OR_RANDOM: OwnAddressType.RESOLVABLE_OR_RANDOM,
|
||||
}
|
||||
|
||||
|
||||
@@ -250,7 +251,7 @@ class HostService(HostServicer):
|
||||
connection = await self.device.connect(
|
||||
address,
|
||||
transport=BT_LE_TRANSPORT,
|
||||
own_address_type=request.own_address_type,
|
||||
own_address_type=OwnAddressType(request.own_address_type),
|
||||
)
|
||||
except ConnectionError as e:
|
||||
if e.error_code == HCI_PAGE_TIMEOUT_ERROR:
|
||||
@@ -378,7 +379,7 @@ class HostService(HostServicer):
|
||||
def on_connection(connection: bumble.device.Connection) -> None:
|
||||
if (
|
||||
connection.transport == BT_LE_TRANSPORT
|
||||
and connection.role == BT_PERIPHERAL_ROLE
|
||||
and connection.role == Role.PERIPHERAL
|
||||
):
|
||||
connections.put_nowait(connection)
|
||||
|
||||
@@ -496,7 +497,7 @@ class HostService(HostServicer):
|
||||
def on_connection(connection: bumble.device.Connection) -> None:
|
||||
if (
|
||||
connection.transport == BT_LE_TRANSPORT
|
||||
and connection.role == BT_PERIPHERAL_ROLE
|
||||
and connection.role == Role.PERIPHERAL
|
||||
):
|
||||
connections.put_nowait(connection)
|
||||
|
||||
@@ -509,7 +510,7 @@ class HostService(HostServicer):
|
||||
await self.device.start_advertising(
|
||||
target=target,
|
||||
advertising_type=advertising_type,
|
||||
own_address_type=request.own_address_type,
|
||||
own_address_type=OwnAddressType(request.own_address_type),
|
||||
)
|
||||
|
||||
if not request.connectable:
|
||||
@@ -558,7 +559,7 @@ class HostService(HostServicer):
|
||||
await self.device.start_scanning(
|
||||
legacy=request.legacy,
|
||||
active=not request.passive,
|
||||
own_address_type=request.own_address_type,
|
||||
own_address_type=OwnAddressType(request.own_address_type),
|
||||
scan_interval=(
|
||||
int(request.interval)
|
||||
if request.interval
|
||||
|
||||
@@ -24,11 +24,10 @@ from bumble import hci
|
||||
from bumble.core import (
|
||||
BT_BR_EDR_TRANSPORT,
|
||||
BT_LE_TRANSPORT,
|
||||
BT_PERIPHERAL_ROLE,
|
||||
ProtocolError,
|
||||
)
|
||||
from bumble.device import Connection as BumbleConnection, Device
|
||||
from bumble.hci import HCI_Error
|
||||
from bumble.hci import HCI_Error, Role
|
||||
from bumble.utils import EventWatcher
|
||||
from bumble.pairing import PairingConfig, PairingDelegate as BasePairingDelegate
|
||||
from google.protobuf import any_pb2 # pytype: disable=pyi-error
|
||||
@@ -318,7 +317,7 @@ class SecurityService(SecurityServicer):
|
||||
|
||||
if (
|
||||
connection.transport == BT_LE_TRANSPORT
|
||||
and connection.role == BT_PERIPHERAL_ROLE
|
||||
and connection.role == Role.PERIPHERAL
|
||||
):
|
||||
connection.request_pairing()
|
||||
else:
|
||||
|
||||
@@ -20,11 +20,11 @@ import inspect
|
||||
import logging
|
||||
|
||||
from bumble.device import Device
|
||||
from bumble.hci import Address
|
||||
from bumble.hci import Address, AddressType
|
||||
from google.protobuf.message import Message # pytype: disable=pyi-error
|
||||
from typing import Any, Dict, Generator, MutableMapping, Optional, Tuple
|
||||
|
||||
ADDRESS_TYPES: Dict[str, int] = {
|
||||
ADDRESS_TYPES: Dict[str, AddressType] = {
|
||||
"public": Address.PUBLIC_DEVICE_ADDRESS,
|
||||
"random": Address.RANDOM_DEVICE_ADDRESS,
|
||||
"public_identity": Address.PUBLIC_IDENTITY_ADDRESS,
|
||||
|
||||
@@ -0,0 +1,514 @@
|
||||
# Copyright 2025 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
Apple Notification Center Service (ANCS).
|
||||
"""
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
from __future__ import annotations
|
||||
import asyncio
|
||||
import dataclasses
|
||||
import datetime
|
||||
import enum
|
||||
import logging
|
||||
import struct
|
||||
from typing import Optional, Sequence, Union
|
||||
|
||||
from pyee import EventEmitter
|
||||
|
||||
from bumble.att import ATT_Error
|
||||
from bumble.device import Peer
|
||||
from bumble.gatt import (
|
||||
Characteristic,
|
||||
GATT_ANCS_SERVICE,
|
||||
GATT_ANCS_NOTIFICATION_SOURCE_CHARACTERISTIC,
|
||||
GATT_ANCS_CONTROL_POINT_CHARACTERISTIC,
|
||||
GATT_ANCS_DATA_SOURCE_CHARACTERISTIC,
|
||||
TemplateService,
|
||||
)
|
||||
from bumble.gatt_client import CharacteristicProxy, ProfileServiceProxy, ServiceProxy
|
||||
from bumble.gatt_adapters import SerializableCharacteristicProxyAdapter
|
||||
from bumble.utils import OpenIntEnum
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Constants
|
||||
# -----------------------------------------------------------------------------
|
||||
_DEFAULT_ATTRIBUTE_MAX_LENGTH = 65535
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Logging
|
||||
# -----------------------------------------------------------------------------
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Protocol
|
||||
# -----------------------------------------------------------------------------
|
||||
class ActionId(OpenIntEnum):
|
||||
POSITIVE = 0
|
||||
NEGATIVE = 1
|
||||
|
||||
|
||||
class AppAttributeId(OpenIntEnum):
|
||||
DISPLAY_NAME = 0
|
||||
|
||||
|
||||
class CategoryId(OpenIntEnum):
|
||||
OTHER = 0
|
||||
INCOMING_CALL = 1
|
||||
MISSED_CALL = 2
|
||||
VOICEMAIL = 3
|
||||
SOCIAL = 4
|
||||
SCHEDULE = 5
|
||||
EMAIL = 6
|
||||
NEWS = 7
|
||||
HEALTH_AND_FITNESS = 8
|
||||
BUSINESS_AND_FINANCE = 9
|
||||
LOCATION = 10
|
||||
ENTERTAINMENT = 11
|
||||
|
||||
|
||||
class CommandId(OpenIntEnum):
|
||||
GET_NOTIFICATION_ATTRIBUTES = 0
|
||||
GET_APP_ATTRIBUTES = 1
|
||||
PERFORM_NOTIFICATION_ACTION = 2
|
||||
|
||||
|
||||
class EventId(OpenIntEnum):
|
||||
NOTIFICATION_ADDED = 0
|
||||
NOTIFICATION_MODIFIED = 1
|
||||
NOTIFICATION_REMOVED = 2
|
||||
|
||||
|
||||
class EventFlags(enum.IntFlag):
|
||||
SILENT = 1 << 0
|
||||
IMPORTANT = 1 << 1
|
||||
PRE_EXISTING = 1 << 2
|
||||
POSITIVE_ACTION = 1 << 3
|
||||
NEGATIVE_ACTION = 1 << 4
|
||||
|
||||
|
||||
class NotificationAttributeId(OpenIntEnum):
|
||||
APP_IDENTIFIER = 0
|
||||
TITLE = 1
|
||||
SUBTITLE = 2
|
||||
MESSAGE = 3
|
||||
MESSAGE_SIZE = 4
|
||||
DATE = 5
|
||||
POSITIVE_ACTION_LABEL = 6
|
||||
NEGATIVE_ACTION_LABEL = 7
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class NotificationAttribute:
|
||||
attribute_id: NotificationAttributeId
|
||||
value: Union[str, int, datetime.datetime]
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class AppAttribute:
|
||||
attribute_id: AppAttributeId
|
||||
value: str
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
class Notification:
|
||||
event_id: EventId
|
||||
event_flags: EventFlags
|
||||
category_id: CategoryId
|
||||
category_count: int
|
||||
notification_uid: int
|
||||
|
||||
@classmethod
|
||||
def from_bytes(cls, data: bytes) -> Notification:
|
||||
return cls(
|
||||
event_id=EventId(data[0]),
|
||||
event_flags=EventFlags(data[1]),
|
||||
category_id=CategoryId(data[2]),
|
||||
category_count=data[3],
|
||||
notification_uid=int.from_bytes(data[4:8], 'little'),
|
||||
)
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return struct.pack(
|
||||
"<BBBBI",
|
||||
self.event_id,
|
||||
self.event_flags,
|
||||
self.category_id,
|
||||
self.category_count,
|
||||
self.notification_uid,
|
||||
)
|
||||
|
||||
|
||||
class ErrorCode(OpenIntEnum):
|
||||
UNKNOWN_COMMAND = 0xA0
|
||||
INVALID_COMMAND = 0xA1
|
||||
INVALID_PARAMETER = 0xA2
|
||||
ACTION_FAILED = 0xA3
|
||||
|
||||
|
||||
class ProtocolError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class CommandError(Exception):
|
||||
def __init__(self, error_code: ErrorCode) -> None:
|
||||
self.error_code = error_code
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f"CommandError(error_code={self.error_code.name})"
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# GATT Server-side
|
||||
# -----------------------------------------------------------------------------
|
||||
class Ancs(TemplateService):
|
||||
UUID = GATT_ANCS_SERVICE
|
||||
|
||||
notification_source_characteristic: Characteristic
|
||||
data_source_characteristic: Characteristic
|
||||
control_point_characteristic: Characteristic
|
||||
|
||||
def __init__(self) -> None:
|
||||
# TODO not the final implementation
|
||||
self.notification_source_characteristic = Characteristic(
|
||||
GATT_ANCS_NOTIFICATION_SOURCE_CHARACTERISTIC,
|
||||
Characteristic.Properties.NOTIFY,
|
||||
Characteristic.Permissions.READABLE,
|
||||
)
|
||||
|
||||
# TODO not the final implementation
|
||||
self.data_source_characteristic = Characteristic(
|
||||
GATT_ANCS_DATA_SOURCE_CHARACTERISTIC,
|
||||
Characteristic.Properties.NOTIFY,
|
||||
Characteristic.Permissions.READABLE,
|
||||
)
|
||||
|
||||
# TODO not the final implementation
|
||||
self.control_point_characteristic = Characteristic(
|
||||
GATT_ANCS_CONTROL_POINT_CHARACTERISTIC,
|
||||
Characteristic.Properties.WRITE,
|
||||
Characteristic.Permissions.WRITEABLE,
|
||||
)
|
||||
|
||||
super().__init__(
|
||||
[
|
||||
self.notification_source_characteristic,
|
||||
self.data_source_characteristic,
|
||||
self.control_point_characteristic,
|
||||
]
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# GATT Client-side
|
||||
# -----------------------------------------------------------------------------
|
||||
class AncsProxy(ProfileServiceProxy):
|
||||
SERVICE_CLASS = Ancs
|
||||
|
||||
notification_source: CharacteristicProxy[Notification]
|
||||
data_source: CharacteristicProxy
|
||||
control_point: CharacteristicProxy[bytes]
|
||||
|
||||
def __init__(self, service_proxy: ServiceProxy):
|
||||
self.notification_source = SerializableCharacteristicProxyAdapter(
|
||||
service_proxy.get_required_characteristic_by_uuid(
|
||||
GATT_ANCS_NOTIFICATION_SOURCE_CHARACTERISTIC
|
||||
),
|
||||
Notification,
|
||||
)
|
||||
|
||||
self.data_source = service_proxy.get_required_characteristic_by_uuid(
|
||||
GATT_ANCS_DATA_SOURCE_CHARACTERISTIC
|
||||
)
|
||||
|
||||
self.control_point = service_proxy.get_required_characteristic_by_uuid(
|
||||
GATT_ANCS_CONTROL_POINT_CHARACTERISTIC
|
||||
)
|
||||
|
||||
|
||||
class AncsClient(EventEmitter):
|
||||
_expected_response_command_id: Optional[CommandId]
|
||||
_expected_response_notification_uid: Optional[int]
|
||||
_expected_response_app_identifier: Optional[str]
|
||||
_expected_app_identifier: Optional[str]
|
||||
_expected_response_tuples: int
|
||||
_response_accumulator: bytes
|
||||
|
||||
def __init__(self, ancs_proxy: AncsProxy) -> None:
|
||||
super().__init__()
|
||||
self._ancs_proxy = ancs_proxy
|
||||
self._command_semaphore = asyncio.Semaphore()
|
||||
self._response: Optional[asyncio.Future] = None
|
||||
self._reset_response()
|
||||
self._started = False
|
||||
|
||||
@classmethod
|
||||
async def for_peer(cls, peer: Peer) -> Optional[AncsClient]:
|
||||
ancs_proxy = await peer.discover_service_and_create_proxy(AncsProxy)
|
||||
if ancs_proxy is None:
|
||||
return None
|
||||
return cls(ancs_proxy)
|
||||
|
||||
async def start(self) -> None:
|
||||
await self._ancs_proxy.notification_source.subscribe(self._on_notification)
|
||||
await self._ancs_proxy.data_source.subscribe(self._on_data)
|
||||
self._started = True
|
||||
|
||||
async def stop(self) -> None:
|
||||
await self._ancs_proxy.notification_source.unsubscribe(self._on_notification)
|
||||
await self._ancs_proxy.data_source.unsubscribe(self._on_data)
|
||||
self._started = False
|
||||
|
||||
def _reset_response(self) -> None:
|
||||
self._expected_response_command_id = None
|
||||
self._expected_response_notification_uid = None
|
||||
self._expected_app_identifier = None
|
||||
self._expected_response_tuples = 0
|
||||
self._response_accumulator = b""
|
||||
|
||||
def _on_notification(self, notification: Notification) -> None:
|
||||
logger.debug(f"ANCS NOTIFICATION: {notification}")
|
||||
self.emit("notification", notification)
|
||||
|
||||
def _on_data(self, data: bytes) -> None:
|
||||
logger.debug(f"ANCS DATA: {data.hex()}")
|
||||
|
||||
if not self._response:
|
||||
logger.warning("received unexpected data, discarding")
|
||||
return
|
||||
|
||||
self._response_accumulator += data
|
||||
|
||||
# Try to parse the accumulated data until we have all we need.
|
||||
if not self._response_accumulator:
|
||||
logger.warning("empty data from data source")
|
||||
return
|
||||
|
||||
command_id = self._response_accumulator[0]
|
||||
if command_id != self._expected_response_command_id:
|
||||
logger.warning(
|
||||
"unexpected response command id: "
|
||||
f"expected {self._expected_response_command_id} "
|
||||
f"but got {command_id}"
|
||||
)
|
||||
self._reset_response()
|
||||
if not self._response.done():
|
||||
self._response.set_exception(ProtocolError())
|
||||
|
||||
if len(self._response_accumulator) < 5:
|
||||
# Not enough data yet.
|
||||
return
|
||||
|
||||
attributes: list[Union[NotificationAttribute, AppAttribute]] = []
|
||||
|
||||
if command_id == CommandId.GET_NOTIFICATION_ATTRIBUTES:
|
||||
(notification_uid,) = struct.unpack_from(
|
||||
"<I", self._response_accumulator, 1
|
||||
)
|
||||
if notification_uid != self._expected_response_notification_uid:
|
||||
logger.warning(
|
||||
"unexpected response notification uid: "
|
||||
f"expected {self._expected_response_notification_uid} "
|
||||
f"but got {notification_uid}"
|
||||
)
|
||||
self._reset_response()
|
||||
if not self._response.done():
|
||||
self._response.set_exception(ProtocolError())
|
||||
|
||||
attribute_data = self._response_accumulator[5:]
|
||||
while len(attribute_data) >= 3:
|
||||
attribute_id, attribute_data_length = struct.unpack_from(
|
||||
"<BH", attribute_data, 0
|
||||
)
|
||||
if len(attribute_data) < 3 + attribute_data_length:
|
||||
return
|
||||
str_value = attribute_data[3 : 3 + attribute_data_length].decode(
|
||||
"utf-8"
|
||||
)
|
||||
value: Union[str, int, datetime.datetime]
|
||||
if attribute_id == NotificationAttributeId.MESSAGE_SIZE:
|
||||
value = int(str_value)
|
||||
elif attribute_id == NotificationAttributeId.DATE:
|
||||
year = int(str_value[:4])
|
||||
month = int(str_value[4:6])
|
||||
day = int(str_value[6:8])
|
||||
hour = int(str_value[9:11])
|
||||
minute = int(str_value[11:13])
|
||||
second = int(str_value[13:15])
|
||||
value = datetime.datetime(year, month, day, hour, minute, second)
|
||||
else:
|
||||
value = str_value
|
||||
attributes.append(
|
||||
NotificationAttribute(NotificationAttributeId(attribute_id), value)
|
||||
)
|
||||
attribute_data = attribute_data[3 + attribute_data_length :]
|
||||
elif command_id == CommandId.GET_APP_ATTRIBUTES:
|
||||
if 0 not in self._response_accumulator[1:]:
|
||||
# No null-terminated string yet.
|
||||
return
|
||||
|
||||
app_identifier_length = self._response_accumulator.find(0, 1) - 1
|
||||
app_identifier = self._response_accumulator[
|
||||
1 : 1 + app_identifier_length
|
||||
].decode("utf-8")
|
||||
if app_identifier != self._expected_response_app_identifier:
|
||||
logger.warning(
|
||||
"unexpected response app identifier: "
|
||||
f"expected {self._expected_response_app_identifier} "
|
||||
f"but got {app_identifier}"
|
||||
)
|
||||
self._reset_response()
|
||||
if not self._response.done():
|
||||
self._response.set_exception(ProtocolError())
|
||||
|
||||
attribute_data = self._response_accumulator[1 + app_identifier_length + 1 :]
|
||||
while len(attribute_data) >= 3:
|
||||
attribute_id, attribute_data_length = struct.unpack_from(
|
||||
"<BH", attribute_data, 0
|
||||
)
|
||||
if len(attribute_data) < 3 + attribute_data_length:
|
||||
return
|
||||
attributes.append(
|
||||
AppAttribute(
|
||||
AppAttributeId(attribute_id),
|
||||
attribute_data[3 : 3 + attribute_data_length].decode("utf-8"),
|
||||
)
|
||||
)
|
||||
attribute_data = attribute_data[3 + attribute_data_length :]
|
||||
else:
|
||||
logger.warning(f"unexpected response command id {command_id}")
|
||||
return
|
||||
|
||||
if len(attributes) < self._expected_response_tuples:
|
||||
# We have not received all the tuples yet.
|
||||
return
|
||||
|
||||
if not self._response.done():
|
||||
self._response.set_result(attributes)
|
||||
|
||||
async def _send_command(self, command: bytes) -> None:
|
||||
try:
|
||||
await self._ancs_proxy.control_point.write_value(
|
||||
command, with_response=True
|
||||
)
|
||||
except ATT_Error as error:
|
||||
raise CommandError(error_code=ErrorCode(error.error_code)) from error
|
||||
|
||||
async def get_notification_attributes(
|
||||
self,
|
||||
notification_uid: int,
|
||||
attributes: Sequence[
|
||||
Union[NotificationAttributeId, tuple[NotificationAttributeId, int]]
|
||||
],
|
||||
) -> list[NotificationAttribute]:
|
||||
if not self._started:
|
||||
raise RuntimeError("client not started")
|
||||
|
||||
command = struct.pack(
|
||||
"<BI", CommandId.GET_NOTIFICATION_ATTRIBUTES, notification_uid
|
||||
)
|
||||
for attribute in attributes:
|
||||
attribute_max_length = 0
|
||||
if isinstance(attribute, tuple):
|
||||
attribute_id, attribute_max_length = attribute
|
||||
if attribute_id not in (
|
||||
NotificationAttributeId.TITLE,
|
||||
NotificationAttributeId.SUBTITLE,
|
||||
NotificationAttributeId.MESSAGE,
|
||||
):
|
||||
raise ValueError(
|
||||
"this attribute does not allow specifying a max length"
|
||||
)
|
||||
else:
|
||||
attribute_id = attribute
|
||||
if attribute_id in (
|
||||
NotificationAttributeId.TITLE,
|
||||
NotificationAttributeId.SUBTITLE,
|
||||
NotificationAttributeId.MESSAGE,
|
||||
):
|
||||
attribute_max_length = _DEFAULT_ATTRIBUTE_MAX_LENGTH
|
||||
|
||||
if attribute_max_length:
|
||||
command += struct.pack("<BH", attribute_id, attribute_max_length)
|
||||
else:
|
||||
command += struct.pack("B", attribute_id)
|
||||
|
||||
try:
|
||||
async with self._command_semaphore:
|
||||
self._expected_response_notification_uid = notification_uid
|
||||
self._expected_response_tuples = len(attributes)
|
||||
self._expected_response_command_id = (
|
||||
CommandId.GET_NOTIFICATION_ATTRIBUTES
|
||||
)
|
||||
self._response = asyncio.Future()
|
||||
|
||||
# Send the command.
|
||||
await self._send_command(command)
|
||||
|
||||
# Wait for the response.
|
||||
return await self._response
|
||||
finally:
|
||||
self._reset_response()
|
||||
|
||||
async def get_app_attributes(
|
||||
self, app_identifier: str, attributes: Sequence[AppAttributeId]
|
||||
) -> list[AppAttribute]:
|
||||
if not self._started:
|
||||
raise RuntimeError("client not started")
|
||||
|
||||
command = (
|
||||
bytes([CommandId.GET_APP_ATTRIBUTES])
|
||||
+ app_identifier.encode("utf-8")
|
||||
+ b"\0"
|
||||
)
|
||||
for attribute_id in attributes:
|
||||
command += struct.pack("B", attribute_id)
|
||||
|
||||
try:
|
||||
async with self._command_semaphore:
|
||||
self._expected_response_app_identifier = app_identifier
|
||||
self._expected_response_tuples = len(attributes)
|
||||
self._expected_response_command_id = CommandId.GET_APP_ATTRIBUTES
|
||||
self._response = asyncio.Future()
|
||||
|
||||
# Send the command.
|
||||
await self._send_command(command)
|
||||
|
||||
# Wait for the response.
|
||||
return await self._response
|
||||
finally:
|
||||
self._reset_response()
|
||||
|
||||
async def perform_action(self, notification_uid: int, action: ActionId) -> None:
|
||||
if not self._started:
|
||||
raise RuntimeError("client not started")
|
||||
|
||||
command = struct.pack(
|
||||
"<BIB", CommandId.PERFORM_NOTIFICATION_ACTION, notification_uid, action
|
||||
)
|
||||
|
||||
async with self._command_semaphore:
|
||||
await self._send_command(command)
|
||||
|
||||
async def perform_positive_action(self, notification_uid: int) -> None:
|
||||
return await self.perform_action(notification_uid, ActionId.POSITIVE)
|
||||
|
||||
async def perform_negative_action(self, notification_uid: int) -> None:
|
||||
return await self.perform_action(notification_uid, ActionId.NEGATIVE)
|
||||
+3
-3
@@ -46,13 +46,13 @@ from pyee import EventEmitter
|
||||
from .colors import color
|
||||
from .hci import (
|
||||
Address,
|
||||
Role,
|
||||
HCI_LE_Enable_Encryption_Command,
|
||||
HCI_Object,
|
||||
key_with_value,
|
||||
)
|
||||
from .core import (
|
||||
BT_BR_EDR_TRANSPORT,
|
||||
BT_CENTRAL_ROLE,
|
||||
BT_LE_TRANSPORT,
|
||||
AdvertisingData,
|
||||
InvalidArgumentError,
|
||||
@@ -1975,7 +1975,7 @@ class Manager(EventEmitter):
|
||||
|
||||
# Look for a session with this connection, and create one if none exists
|
||||
if not (session := self.sessions.get(connection.handle)):
|
||||
if connection.role == BT_CENTRAL_ROLE:
|
||||
if connection.role == Role.CENTRAL:
|
||||
logger.warning('Remote starts pairing as Peripheral!')
|
||||
pairing_config = self.pairing_config_factory(connection)
|
||||
session = self.session_proxy(
|
||||
@@ -1995,7 +1995,7 @@ class Manager(EventEmitter):
|
||||
|
||||
async def pair(self, connection: Connection) -> None:
|
||||
# TODO: check if there's already a session for this connection
|
||||
if connection.role != BT_CENTRAL_ROLE:
|
||||
if connection.role != Role.CENTRAL:
|
||||
logger.warning('Start pairing as Peripheral!')
|
||||
pairing_config = self.pairing_config_factory(connection)
|
||||
session = self.session_proxy(
|
||||
|
||||
@@ -115,9 +115,7 @@ async def open_usb_transport(spec: str) -> Transport:
|
||||
self.acl_out = acl_out
|
||||
self.acl_out_transfer = device.getTransfer()
|
||||
self.acl_out_transfer_ready = asyncio.Semaphore(1)
|
||||
self.packets: asyncio.Queue[bytes] = (
|
||||
asyncio.Queue()
|
||||
) # Queue of packets waiting to be sent
|
||||
self.packets = asyncio.Queue[bytes]() # Queue of packets waiting to be sent
|
||||
self.loop = asyncio.get_running_loop()
|
||||
self.queue_task = None
|
||||
self.cancel_done = self.loop.create_future()
|
||||
|
||||
@@ -0,0 +1,202 @@
|
||||
AURACAST TOOL
|
||||
=============
|
||||
|
||||
The "auracast" tool implements commands that implement broadcasting, receiving
|
||||
and controlling LE Audio broadcasts.
|
||||
|
||||
=== "Running as an installed package"
|
||||
```
|
||||
$ bumble-auracast
|
||||
```
|
||||
|
||||
=== "Running from source"
|
||||
```
|
||||
$ python3 apps/auracast.py <args>
|
||||
```
|
||||
|
||||
# Python Dependencies
|
||||
Try installing the optional `[auracast]` dependencies:
|
||||
|
||||
=== "From source"
|
||||
```bash
|
||||
$ python3 -m pip install ".[auracast]"
|
||||
```
|
||||
|
||||
=== "From PyPI"
|
||||
```bash
|
||||
$ python3 -m pip install "bumble[auracast]"
|
||||
```
|
||||
|
||||
## LC3
|
||||
The `auracast` app depends on the `lc3` python module, which is available
|
||||
either as PyPI module (currently only available for Linux x86_64).
|
||||
When installing Bumble with the optional `auracast` dependency, the `lc3`
|
||||
module will be installed from the `lc3py` PyPI package if available.
|
||||
If not, you will need to install it separately. This can be done with:
|
||||
```bash
|
||||
$ python3 -m pip install "git+https://github.com/google/liblc3.git"
|
||||
```
|
||||
|
||||
## SoundDevice
|
||||
The `sounddevice` module is required for audio output to the host's sound
|
||||
output device(s) and/or input from the host's input device(s).
|
||||
If not installed, the `auracast` app is still functional, but will be limited
|
||||
to non-device inputs and output (files, external processes, ...)
|
||||
|
||||
On macOS and Windows, the `sounddevice` module gets installed with the
|
||||
native PortAudio libraries included.
|
||||
|
||||
For Linux, however, PortAudio must be installed separately.
|
||||
This is typically done with a command like:
|
||||
```bash
|
||||
$ sudo apt install libportaudio2
|
||||
```
|
||||
|
||||
Visit the [sounddevice documentation](https://python-sounddevice.readthedocs.io/)
|
||||
for details.
|
||||
|
||||
|
||||
# General Usage
|
||||
```
|
||||
Usage: bumble-auracast [OPTIONS] COMMAND [ARGS]...
|
||||
|
||||
Options:
|
||||
--help Show this message and exit.
|
||||
|
||||
Commands:
|
||||
assist Scan for broadcasts on behalf of an audio server
|
||||
pair Pair with an audio server
|
||||
receive Receive a broadcast source
|
||||
scan Scan for public broadcasts
|
||||
transmit Transmit a broadcast source
|
||||
```
|
||||
|
||||
Use `bumble-auracast <command> --help` to get more detailed usage information
|
||||
for a specific `<command>`.
|
||||
|
||||
## `assist`
|
||||
Act as a broadcast assistant.
|
||||
|
||||
Use `bumble-auracast assist --help` for details on the commands and options.
|
||||
|
||||
The assistant commands are:
|
||||
|
||||
### `monitor-state`
|
||||
Subscribe to the state characteristic and monitor changes.
|
||||
|
||||
### `add-source`
|
||||
Add a broadcast source. This will instruct the device to start
|
||||
receiving a broadcast.
|
||||
|
||||
### `modify-source`
|
||||
Modify a broadcast source.
|
||||
|
||||
### `remove-source`
|
||||
Remote a broadcast source.
|
||||
|
||||
## `pair`
|
||||
Pair with a device.
|
||||
|
||||
## `receive`
|
||||
Receive a broadcast source.
|
||||
|
||||
The `--output` option specifies where to send the decoded audio samples.
|
||||
The following outputs are supported:
|
||||
|
||||
### Sound Device
|
||||
The `--output` argument is either `device`, to send the audio to the hosts's default sound device, or `device:<DEVICE_ID>` where `<DEVICE_ID>`
|
||||
is the integer ID of one of the available sound devices.
|
||||
When invoked with `--output "device:?"`, a list of available devices and
|
||||
their IDs is printed out.
|
||||
|
||||
### Standard Output
|
||||
With `--output stdout`, the decoded audio samples are written to the
|
||||
standard output (currently always as float32 PCM samples)
|
||||
|
||||
### FFPlay
|
||||
With `--output ffplay`, the decoded audio samples are piped to `ffplay`
|
||||
in a child process. This option is only available if `ffplay` is a command that is available on the host.
|
||||
|
||||
### File
|
||||
With `--output <filename>` or `--output file:<filename>`, the decoded audio
|
||||
samples are written to a file (currently always as float32 PCM)
|
||||
|
||||
## `transmit`
|
||||
Broadcast an audio source as a transmitter.
|
||||
|
||||
The `--input` and `--input-format` options specify what audio input
|
||||
source to transmit.
|
||||
The following inputs are supported:
|
||||
|
||||
### Sound Device
|
||||
The `--input` argument is either `device`, to use the host's default sound
|
||||
device (typically a builtin microphone), or `device:<DEVICE_ID>` where
|
||||
`<DEVICE_ID>` is the integer ID of one of the available sound devices.
|
||||
When invoked with `--input "device:?"`, a list of available devices and their
|
||||
IDs is printed out.
|
||||
|
||||
### Standard Input
|
||||
With `--input stdout`, the audio samples are read from the standard input.
|
||||
(currently always as int16 PCM).
|
||||
|
||||
### File
|
||||
With `--input <filename>` or `--input file:<filename>`, the audio samples
|
||||
are read from a .wav or raw PCM file.
|
||||
|
||||
Use the `--input-format <FORMAT>` option to specify the format of the audio
|
||||
samples in raw PCM files. `<FORMAT>` is expressed as:
|
||||
`<sample-type>,<sample-rate>,<channels>`
|
||||
(the only supported <sample-type> currently is 'int16le' for 16 bit signed integers with little-endian byte order)
|
||||
|
||||
## `scan`
|
||||
Scan for public broadcasts.
|
||||
|
||||
A live display of the available broadcasts is displayed continuously.
|
||||
|
||||
# Compatibility With Some Products
|
||||
The `auracast` app has been tested for compatibility with a few products.
|
||||
The list is still very limited. Please let us know if there are products
|
||||
that are not working well, or if there are specific instructions that should
|
||||
be shared to allow better compatibiity with certain products.
|
||||
|
||||
## Transmitters
|
||||
|
||||
The `receive` command has been tested to successfully receive broadcasts from
|
||||
the following transmitters:
|
||||
|
||||
* JBL GO 4
|
||||
* Flairmesh FlooGoo FMA120
|
||||
* Eppfun AK3040Pro Max
|
||||
* HIGHGAZE BA-25T
|
||||
* Nexum Audio VOCE and USB dongle
|
||||
|
||||
## Receivers
|
||||
|
||||
### Pixel Buds Pro 2
|
||||
|
||||
The Pixel Buds Pro 2 can be used as a broadcast receiver, controlled by the
|
||||
`auracast assist` command, instructing the buds to receive a broadcast.
|
||||
|
||||
Use the `assist --command add-source` command to tell the buds to receive a
|
||||
broadcast.
|
||||
|
||||
Use the `assist --command monitor-state` command to monitor the current sync/receive
|
||||
state of the buds.
|
||||
|
||||
### JBL
|
||||
The JBL GO 4 and other JBL products that support the Auracast feature can be used
|
||||
as transmitters or receivers.
|
||||
|
||||
When running in receiver mode (pressing the Auracast button while not already playing),
|
||||
the JBL speaker will scan for broadcast advertisements with a specific manufacturer data.
|
||||
Use the `--manufacturer-data` option of the `transmit` command in order to include data
|
||||
that will let the speaker recognize the broadcast as a compatible source.
|
||||
|
||||
The manufacturer ID for JBL is 87.
|
||||
Using an option like `--manufacturer-data 87:00000000000000000000000000000000dffd` should work (tested on the
|
||||
JBL GO 4. The `dffd` value at the end of the payload may be different on other models?).
|
||||
|
||||
|
||||
### Others
|
||||
|
||||
* Nexum Audio VOCE and USB dongle
|
||||
@@ -0,0 +1,215 @@
|
||||
# Copyright 2025 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# https://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
# Imports
|
||||
# -----------------------------------------------------------------------------
|
||||
import asyncio
|
||||
import sys
|
||||
import os
|
||||
import logging
|
||||
from bumble.colors import color
|
||||
|
||||
from bumble.device import Device, Peer
|
||||
from bumble.transport import open_transport
|
||||
from bumble.profiles.ancs import (
|
||||
AncsClient,
|
||||
AppAttribute,
|
||||
AppAttributeId,
|
||||
EventFlags,
|
||||
EventId,
|
||||
Notification,
|
||||
NotificationAttributeId,
|
||||
)
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
_cached_app_names: dict[str, str] = {}
|
||||
_notification_queue = asyncio.Queue[Notification]()
|
||||
|
||||
|
||||
async def process_notifications(ancs_client: AncsClient):
|
||||
while True:
|
||||
notification = await _notification_queue.get()
|
||||
|
||||
prefix = " "
|
||||
if notification.event_id == EventId.NOTIFICATION_ADDED:
|
||||
print_color = "green"
|
||||
if notification.event_flags & EventFlags.PRE_EXISTING:
|
||||
prefix = " Existing "
|
||||
else:
|
||||
prefix = " New "
|
||||
elif notification.event_id == EventId.NOTIFICATION_REMOVED:
|
||||
print_color = "red"
|
||||
elif notification.event_id == EventId.NOTIFICATION_MODIFIED:
|
||||
print_color = "yellow"
|
||||
else:
|
||||
print_color = "white"
|
||||
|
||||
print(
|
||||
color(
|
||||
(
|
||||
f"[{notification.event_id.name}]{prefix}Notification "
|
||||
f"({notification.notification_uid}):"
|
||||
),
|
||||
print_color,
|
||||
)
|
||||
)
|
||||
print(color(" Event ID: ", "yellow"), notification.event_id.name)
|
||||
print(color(" Event Flags: ", "yellow"), notification.event_flags.name)
|
||||
print(color(" Category ID: ", "yellow"), notification.category_id.name)
|
||||
print(color(" Category Count:", "yellow"), notification.category_count)
|
||||
|
||||
if notification.event_id not in (
|
||||
EventId.NOTIFICATION_ADDED,
|
||||
EventId.NOTIFICATION_MODIFIED,
|
||||
):
|
||||
continue
|
||||
|
||||
requested_attributes = [
|
||||
NotificationAttributeId.APP_IDENTIFIER,
|
||||
NotificationAttributeId.TITLE,
|
||||
NotificationAttributeId.SUBTITLE,
|
||||
NotificationAttributeId.MESSAGE,
|
||||
NotificationAttributeId.DATE,
|
||||
]
|
||||
if notification.event_flags & EventFlags.NEGATIVE_ACTION:
|
||||
requested_attributes.append(NotificationAttributeId.NEGATIVE_ACTION_LABEL)
|
||||
if notification.event_flags & EventFlags.POSITIVE_ACTION:
|
||||
requested_attributes.append(NotificationAttributeId.POSITIVE_ACTION_LABEL)
|
||||
|
||||
attributes = await ancs_client.get_notification_attributes(
|
||||
notification.notification_uid, requested_attributes
|
||||
)
|
||||
max_attribute_name_width = max(
|
||||
(len(attribute.attribute_id.name) for attribute in attributes)
|
||||
)
|
||||
app_identifier = str(
|
||||
next(
|
||||
(
|
||||
attribute.value
|
||||
for attribute in attributes
|
||||
if attribute.attribute_id == NotificationAttributeId.APP_IDENTIFIER
|
||||
)
|
||||
)
|
||||
)
|
||||
if app_identifier not in _cached_app_names:
|
||||
app_attributes = await ancs_client.get_app_attributes(
|
||||
app_identifier, [AppAttributeId.DISPLAY_NAME]
|
||||
)
|
||||
_cached_app_names[app_identifier] = app_attributes[0].value
|
||||
app_name = _cached_app_names[app_identifier]
|
||||
|
||||
for attribute in attributes:
|
||||
padding = ' ' * (
|
||||
max_attribute_name_width - len(attribute.attribute_id.name)
|
||||
)
|
||||
suffix = (
|
||||
f" ({app_name})"
|
||||
if attribute.attribute_id == NotificationAttributeId.APP_IDENTIFIER
|
||||
else ""
|
||||
)
|
||||
print(
|
||||
color(f" {attribute.attribute_id.name}:{padding}", "blue"),
|
||||
f"{attribute.value}{suffix}",
|
||||
)
|
||||
|
||||
print()
|
||||
|
||||
|
||||
def on_ancs_notification(notification: Notification) -> None:
|
||||
_notification_queue.put_nowait(notification)
|
||||
|
||||
|
||||
async def handle_command_client(
|
||||
ancs_client: AncsClient, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
|
||||
) -> None:
|
||||
while True:
|
||||
command = (await reader.readline()).decode("utf-8").strip()
|
||||
|
||||
try:
|
||||
command_name, command_args = command.split(" ", 1)
|
||||
if command_name == "+":
|
||||
notification_uid = int(command_args)
|
||||
await ancs_client.perform_positive_action(notification_uid)
|
||||
elif command_name == "-":
|
||||
notification_uid = int(command_args)
|
||||
await ancs_client.perform_negative_action(notification_uid)
|
||||
else:
|
||||
writer.write(f"unknown command {command_name}".encode("utf-8"))
|
||||
except Exception as error:
|
||||
writer.write(f"ERROR: {error}\n".encode("utf-8"))
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
async def main() -> None:
|
||||
if len(sys.argv) < 3:
|
||||
print(
|
||||
'Usage: run_ancs_client.py <device-config> <transport-spec> '
|
||||
'<bluetooth-address> <mtu>'
|
||||
)
|
||||
print('example: run_ancs_client.py device1.json usb:0 E1:CA:72:48:C4:E8 512')
|
||||
return
|
||||
device_config, transport_spec, bluetooth_address, mtu = sys.argv[1:]
|
||||
|
||||
print('<<< connecting to HCI...')
|
||||
async with await open_transport(transport_spec) as hci_transport:
|
||||
print('<<< connected')
|
||||
|
||||
# Create a device to manage the host, with a custom listener
|
||||
device = Device.from_config_file_with_hci(
|
||||
device_config, hci_transport.source, hci_transport.sink
|
||||
)
|
||||
await device.power_on()
|
||||
|
||||
# Connect to the peer
|
||||
print(f'=== Connecting to {bluetooth_address}...')
|
||||
connection = await device.connect(bluetooth_address)
|
||||
print(f'=== Connected: {connection}')
|
||||
|
||||
await connection.encrypt()
|
||||
|
||||
peer = Peer(connection)
|
||||
mtu_int = int(mtu)
|
||||
if mtu_int:
|
||||
new_mtu = await peer.request_mtu(mtu_int)
|
||||
print(f'ATT MTU = {new_mtu}')
|
||||
ancs_client = await AncsClient.for_peer(peer)
|
||||
if ancs_client is None:
|
||||
print("!!! no ANCS service found")
|
||||
return
|
||||
await ancs_client.start()
|
||||
|
||||
print('Subscribing to updates')
|
||||
ancs_client.on("notification", on_ancs_notification)
|
||||
|
||||
# Process all notifications in a task.
|
||||
notification_processing_task = asyncio.create_task(
|
||||
process_notifications(ancs_client)
|
||||
)
|
||||
|
||||
# Accept a TCP connection to handle commands.
|
||||
tcp_server = await asyncio.start_server(
|
||||
lambda reader, writer: handle_command_client(ancs_client, reader, writer),
|
||||
'127.0.0.1',
|
||||
9000,
|
||||
)
|
||||
print("Accepting command client on port 9000")
|
||||
async with tcp_server:
|
||||
await tcp_server.serve_forever()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
|
||||
asyncio.run(main())
|
||||
@@ -24,7 +24,6 @@ import pytest
|
||||
from bumble.core import (
|
||||
BT_BR_EDR_TRANSPORT,
|
||||
BT_LE_TRANSPORT,
|
||||
BT_PERIPHERAL_ROLE,
|
||||
ConnectionParameters,
|
||||
)
|
||||
from bumble.device import (
|
||||
@@ -43,6 +42,7 @@ from bumble.hci import (
|
||||
HCI_CONNECTION_FAILED_TO_BE_ESTABLISHED_ERROR,
|
||||
Address,
|
||||
OwnAddressType,
|
||||
Role,
|
||||
HCI_Command_Complete_Event,
|
||||
HCI_Command_Status_Event,
|
||||
HCI_Connection_Complete_Event,
|
||||
@@ -295,7 +295,7 @@ async def test_legacy_advertising_disconnection(auto_restart):
|
||||
peer_address,
|
||||
None,
|
||||
None,
|
||||
BT_PERIPHERAL_ROLE,
|
||||
Role.PERIPHERAL,
|
||||
ConnectionParameters(0, 0, 0),
|
||||
)
|
||||
|
||||
@@ -353,7 +353,7 @@ async def test_extended_advertising_connection(own_address_type):
|
||||
peer_address,
|
||||
None,
|
||||
None,
|
||||
BT_PERIPHERAL_ROLE,
|
||||
Role.PERIPHERAL,
|
||||
ConnectionParameters(0, 0, 0),
|
||||
)
|
||||
device.on_advertising_set_termination(
|
||||
@@ -397,7 +397,7 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
|
||||
Address('F0:F1:F2:F3:F4:F5'),
|
||||
None,
|
||||
None,
|
||||
BT_PERIPHERAL_ROLE,
|
||||
Role.PERIPHERAL,
|
||||
ConnectionParameters(0, 0, 0),
|
||||
)
|
||||
|
||||
|
||||
+3
-2
@@ -24,7 +24,7 @@ import pytest
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
from bumble.controller import Controller
|
||||
from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE
|
||||
from bumble.core import BT_BR_EDR_TRANSPORT, BT_LE_TRANSPORT
|
||||
from bumble.link import LocalLink
|
||||
from bumble.device import Device, Peer
|
||||
from bumble.host import Host
|
||||
@@ -39,6 +39,7 @@ from bumble.smp import (
|
||||
)
|
||||
from bumble.core import ProtocolError
|
||||
from bumble.keys import PairingKeys
|
||||
from bumble.hci import Role
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
@@ -111,7 +112,7 @@ async def test_self_connection():
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.parametrize(
|
||||
'responder_role,',
|
||||
(BT_CENTRAL_ROLE, BT_PERIPHERAL_ROLE),
|
||||
(Role.CENTRAL, Role.PERIPHERAL),
|
||||
)
|
||||
async def test_self_classic_connection(responder_role):
|
||||
# Create two devices, each with a controller, attached to the same link
|
||||
|
||||
Reference in New Issue
Block a user