diff --git a/apps/auracast.py b/apps/auracast.py index 463fc9b5..89f77a97 100644 --- a/apps/auracast.py +++ b/apps/auracast.py @@ -363,6 +363,11 @@ async def run_discover_broadcasts( hci_sink, ) await device.power_on() + + if not device.supports_le_periodic_advertising: + print(color('Periodic advertising not supported', 'red')) + return + discoverer = BroadcastDiscoverer(device, filter_duplicates, sync_timeout) await discoverer.run() await hci_source.terminated diff --git a/apps/device_info.py b/apps/device_info.py new file mode 100644 index 00000000..3b885c9d --- /dev/null +++ b/apps/device_info.py @@ -0,0 +1,221 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import asyncio +import os +import logging +from typing import Callable, Iterable, Optional + +import click + +from bumble.core import ProtocolError +from bumble.colors import color +from bumble.device import Device, Peer +from bumble.gatt import Service +from bumble.profiles.device_information_service import DeviceInformationServiceProxy +from bumble.profiles.battery_service import BatteryServiceProxy +from bumble.profiles.gap import GenericAccessServiceProxy +from bumble.profiles.tmap import TelephonyAndMediaAudioServiceProxy +from bumble.transport import open_transport_or_link + + +# ----------------------------------------------------------------------------- +async def try_show(function: Callable, *args, **kwargs) -> None: + try: + await function(*args, **kwargs) + except ProtocolError as error: + print(color('ERROR:', 'red'), error) + + +# ----------------------------------------------------------------------------- +def show_services(services: Iterable[Service]) -> None: + for service in services: + print(color(str(service), 'cyan')) + + for characteristic in service.characteristics: + print(color(' ' + str(characteristic), 'magenta')) + + +# ----------------------------------------------------------------------------- +async def show_gap_information( + gap_service: GenericAccessServiceProxy, +): + print(color('### Generic Access Profile', 'yellow')) + + if gap_service.device_name: + print( + color(' Device Name:', 'green'), + await gap_service.device_name.read_value(), + ) + + if gap_service.appearance: + print( + color(' Appearance: ', 'green'), + await gap_service.appearance.read_value(), + ) + + print() + + +# ----------------------------------------------------------------------------- +async def show_device_information( + device_information_service: DeviceInformationServiceProxy, +): + print(color('### Device Information', 'yellow')) + + if device_information_service.manufacturer_name: + print( + color(' Manufacturer Name:', 'green'), + await device_information_service.manufacturer_name.read_value(), + ) + + if device_information_service.model_number: + print( + color(' Model Number: ', 'green'), + await device_information_service.model_number.read_value(), + ) + + if device_information_service.serial_number: + print( + color(' Serial Number: ', 'green'), + await device_information_service.serial_number.read_value(), + ) + + print() + + +# ----------------------------------------------------------------------------- +async def show_battery_level( + battery_service: BatteryServiceProxy, +): + print(color('### Battery Information', 'yellow')) + + if battery_service.battery_level: + print( + color(' Battery Level: ', 'green'), + await battery_service.battery_level.read_value(), + ) + + print() + + +# ----------------------------------------------------------------------------- +async def show_tmas( + tmas: TelephonyAndMediaAudioServiceProxy, +): + print(color('### Telephony And Media Audio Service', 'yellow')) + + if tmas.role: + print( + color(' Role:', 'green'), + await tmas.role.read_value(), + ) + + print() + + +# ----------------------------------------------------------------------------- +async def show_device_info(peer, done: Optional[asyncio.Future]) -> None: + # Discover all services + print(color('### Discovering Services and Characteristics', 'magenta')) + await peer.discover_services() + for service in peer.services: + await service.discover_characteristics() + + print(color('=== Services ===', 'yellow')) + show_services(peer.services) + print() + + if gap_service := peer.create_service_proxy(GenericAccessServiceProxy): + await try_show(show_gap_information, gap_service) + + if device_information_service := peer.create_service_proxy( + DeviceInformationServiceProxy + ): + await try_show(show_device_information, device_information_service) + + if battery_service := peer.create_service_proxy(BatteryServiceProxy): + await try_show(show_battery_level, battery_service) + + if tmas := peer.create_service_proxy(TelephonyAndMediaAudioServiceProxy): + await try_show(show_tmas, tmas) + + if done is not None: + done.set_result(None) + + +# ----------------------------------------------------------------------------- +async def async_main(device_config, encrypt, transport, address_or_name): + async with await open_transport_or_link(transport) as (hci_source, hci_sink): + + # Create a device + if device_config: + device = Device.from_config_file_with_hci( + device_config, hci_source, hci_sink + ) + else: + device = Device.with_hci( + 'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink + ) + await device.power_on() + + if address_or_name: + # Connect to the target peer + print(color('>>> Connecting...', 'green')) + connection = await device.connect(address_or_name) + print(color('>>> Connected', 'green')) + + # Encrypt the connection if required + if encrypt: + print(color('+++ Encrypting connection...', 'blue')) + await connection.encrypt() + print(color('+++ Encryption established', 'blue')) + + await show_device_info(Peer(connection), None) + else: + # Wait for a connection + done = asyncio.get_running_loop().create_future() + device.on( + 'connection', + lambda connection: asyncio.create_task( + show_device_info(Peer(connection), done) + ), + ) + await device.start_advertising(auto_restart=True) + + print(color('### Waiting for connection...', 'blue')) + await done + + +# ----------------------------------------------------------------------------- +@click.command() +@click.option('--device-config', help='Device configuration', type=click.Path()) +@click.option('--encrypt', help='Encrypt the connection', is_flag=True, default=False) +@click.argument('transport') +@click.argument('address-or-name', required=False) +def main(device_config, encrypt, transport, address_or_name): + """ + Dump the GATT database on a remote device. If ADDRESS_OR_NAME is not specified, + wait for an incoming connection. + """ + logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) + asyncio.run(async_main(device_config, encrypt, transport, address_or_name)) + + +# ----------------------------------------------------------------------------- +if __name__ == '__main__': + main() diff --git a/apps/gatt_dump.py b/apps/gatt_dump.py index a3205c00..3b3e874e 100644 --- a/apps/gatt_dump.py +++ b/apps/gatt_dump.py @@ -75,11 +75,15 @@ async def async_main(device_config, encrypt, transport, address_or_name): if address_or_name: # Connect to the target peer + print(color('>>> Connecting...', 'green')) connection = await device.connect(address_or_name) + print(color('>>> Connected', 'green')) # Encrypt the connection if required if encrypt: + print(color('+++ Encrypting connection...', 'blue')) await connection.encrypt() + print(color('+++ Encryption established', 'blue')) await dump_gatt_db(Peer(connection), None) else: diff --git a/bumble/core.py b/bumble/core.py index a566650e..f6d42dd5 100644 --- a/bumble/core.py +++ b/bumble/core.py @@ -148,6 +148,10 @@ class InvalidOperationError(BaseBumbleError, RuntimeError): """Invalid Operation Error""" +class NotSupportedError(BaseBumbleError, RuntimeError): + """Not Supported""" + + class OutOfResourcesError(BaseBumbleError, RuntimeError): """Out of Resources Error""" diff --git a/bumble/device.py b/bumble/device.py index 9a9c4dbc..1b5b4840 100644 --- a/bumble/device.py +++ b/bumble/device.py @@ -174,7 +174,7 @@ from .hci import ( phy_list_to_bits, ) from .host import Host -from .gap import GenericAccessService +from .profiles.gap import GenericAccessService from .core import ( BT_BR_EDR_TRANSPORT, BT_CENTRAL_ROLE, @@ -189,6 +189,7 @@ from .core import ( InvalidArgumentError, InvalidOperationError, InvalidStateError, + NotSupportedError, OutOfResourcesError, UnreachableError, ) @@ -1209,8 +1210,13 @@ class Peer: return self.gatt_client.get_characteristics_by_uuid(uuid, service) - def create_service_proxy(self, proxy_class: Type[_PROXY_CLASS]) -> _PROXY_CLASS: - return cast(_PROXY_CLASS, proxy_class.from_client(self.gatt_client)) + def create_service_proxy( + self, proxy_class: Type[_PROXY_CLASS] + ) -> Optional[_PROXY_CLASS]: + if proxy := proxy_class.from_client(self.gatt_client): + return cast(_PROXY_CLASS, proxy) + + return None async def discover_service_and_create_proxy( self, proxy_class: Type[_PROXY_CLASS] @@ -2384,6 +2390,10 @@ class Device(CompositeEventEmitter): def supports_le_extended_advertising(self): return self.supports_le_features(LeFeatureMask.LE_EXTENDED_ADVERTISING) + @property + def supports_le_periodic_advertising(self): + return self.supports_le_features(LeFeatureMask.LE_PERIODIC_ADVERTISING) + async def start_advertising( self, advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, @@ -2786,6 +2796,10 @@ class Device(CompositeEventEmitter): sync_timeout: float = DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_TIMEOUT, filter_duplicates: bool = False, ) -> PeriodicAdvertisingSync: + # Check that the controller supports the feature. + if not self.supports_le_periodic_advertising: + raise NotSupportedError() + # Check that there isn't already an equivalent entry if any( sync.advertiser_address == advertiser_address and sync.sid == sid @@ -3016,7 +3030,7 @@ class Device(CompositeEventEmitter): peer_address = Address.from_string_for_transport( peer_address, transport ) - except InvalidArgumentError: + except (InvalidArgumentError, ValueError): # If the address is not parsable, assume it is a name instead logger.debug('looking for peer by name') peer_address = await self.find_peer_by_name( diff --git a/bumble/gatt_client.py b/bumble/gatt_client.py index 6d4dcf6a..68b829a3 100644 --- a/bumble/gatt_client.py +++ b/bumble/gatt_client.py @@ -253,7 +253,7 @@ class ProfileServiceProxy: SERVICE_CLASS: Type[TemplateService] @classmethod - def from_client(cls, client: Client) -> ProfileServiceProxy: + def from_client(cls, client: Client) -> Optional[ProfileServiceProxy]: return ServiceProxy.from_client(cls, client, cls.SERVICE_CLASS.UUID) @@ -405,7 +405,7 @@ class Client: if not already_known: self.services.append(service) - async def discover_services(self, uuids: Iterable[UUID] = []) -> List[ServiceProxy]: + async def discover_services(self, uuids: Iterable[UUID] = ()) -> List[ServiceProxy]: ''' See Vol 3, Part G - 4.4.1 Discover All Primary Services ''' diff --git a/bumble/profiles/bass.py b/bumble/profiles/bass.py new file mode 100644 index 00000000..a12f44de --- /dev/null +++ b/bumble/profiles/bass.py @@ -0,0 +1,94 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for + +"""LE Audio - Broadcast Audio Scan Service""" + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import logging +from typing import Optional + +from bumble import device +from bumble import gatt +from bumble import gatt_client +from bumble import utils + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +# Constants +# ----------------------------------------------------------------------------- +class ApplicationError(utils.OpenIntEnum): + OPCODE_NOT_SUPPORTED = 0x80 + INVALID_SOURCE_ID = 0x81 + + +# ----------------------------------------------------------------------------- +class BroadcastAudioScanService(gatt.TemplateService): + UUID = gatt.GATT_BROADCAST_AUDIO_SCAN_SERVICE + + def __init__(self): + self.broadcast_audio_scan_control_point_characteristic = gatt.Characteristic( + gatt.GATT_BROADCAST_AUDIO_SCAN_CONTROL_POINT_CHARACTERISTIC, + gatt.Characteristic.Properties.WRITE + | gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE, + gatt.Characteristic.WRITEABLE, + gatt.CharacteristicValue( + write=self.on_broadcast_audio_scan_control_point_write + ), + ) + + self.broadcast_receive_state_characteristic = gatt.Characteristic( + gatt.GATT_BROADCAST_RECEIVE_STATE_CHARACTERISTIC, + gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.NOTIFY, + gatt.Characteristic.Permissions.READABLE + | gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, + b'12', # TEST + ) + + super().__init__([self.battery_level_characteristic]) + + def on_broadcast_audio_scan_control_point_write( + self, connection: device.Connection, value: bytes + ) -> None: + pass + + +# ----------------------------------------------------------------------------- +class BroadcastAudioScanServiceProxy(gatt_client.ProfileServiceProxy): + SERVICE_CLASS = BroadcastAudioScanService + + broadcast_audio_scan_control_point: Optional[gatt_client.CharacteristicProxy] + broadcast_receive_state: Optional[gatt_client.CharacteristicProxy] + + def __init__(self, service_proxy: gatt_client.ServiceProxy): + self.service_proxy = service_proxy + + if characteristics := service_proxy.get_characteristics_by_uuid( + gatt.GATT_BROADCAST_AUDIO_SCAN_CONTROL_POINT_CHARACTERISTIC + ): + self.broadcast_audio_scan_control_point = characteristics[0] + else: + self.broadcast_audio_scan_control_point = None + + if characteristics := service_proxy.get_characteristics_by_uuid( + gatt.GATT_BROADCAST_RECEIVE_STATE_CHARACTERISTIC + ): + self.broadcast_receive_state = characteristics[0] + else: + self.broadcast_receive_state = None diff --git a/bumble/profiles/gap.py b/bumble/profiles/gap.py new file mode 100644 index 00000000..0dd6e512 --- /dev/null +++ b/bumble/profiles/gap.py @@ -0,0 +1,110 @@ +# Copyright 2021-2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Generic Access Profile""" + +# ----------------------------------------------------------------------------- +# Imports +# ----------------------------------------------------------------------------- +import logging +import struct +from typing import Optional, Tuple, Union + +from bumble.core import Appearance +from bumble.gatt import ( + TemplateService, + Characteristic, + CharacteristicAdapter, + DelegatedCharacteristicAdapter, + UTF8CharacteristicAdapter, + GATT_GENERIC_ACCESS_SERVICE, + GATT_DEVICE_NAME_CHARACTERISTIC, + GATT_APPEARANCE_CHARACTERISTIC, +) +from bumble.gatt_client import ProfileServiceProxy, ServiceProxy + +# ----------------------------------------------------------------------------- +# Logging +# ----------------------------------------------------------------------------- +logger = logging.getLogger(__name__) + + +# ----------------------------------------------------------------------------- +# Classes +# ----------------------------------------------------------------------------- + + +# ----------------------------------------------------------------------------- +class GenericAccessService(TemplateService): + UUID = GATT_GENERIC_ACCESS_SERVICE + + def __init__( + self, device_name: str, appearance: Union[Appearance, Tuple[int, int], int] = 0 + ): + if isinstance(appearance, int): + appearance_int = appearance + elif isinstance(appearance, tuple): + appearance_int = (appearance[0] << 6) | appearance[1] + elif isinstance(appearance, Appearance): + appearance_int = int(appearance) + else: + raise TypeError() + + self.device_name_characteristic = Characteristic( + GATT_DEVICE_NAME_CHARACTERISTIC, + Characteristic.Properties.READ, + Characteristic.READABLE, + device_name.encode('utf-8')[:248], + ) + + self.appearance_characteristic = Characteristic( + GATT_APPEARANCE_CHARACTERISTIC, + Characteristic.Properties.READ, + Characteristic.READABLE, + struct.pack('