This commit is contained in:
Gilles Boccon-Gibod
2024-07-15 11:58:21 -07:00
parent c0b17d9aff
commit de7b99ce34
10 changed files with 571 additions and 8 deletions

View File

@@ -148,6 +148,10 @@ class InvalidOperationError(BaseBumbleError, RuntimeError):
"""Invalid Operation Error"""
class NotSupportedError(BaseBumbleError, RuntimeError):
"""Not Supported"""
class OutOfResourcesError(BaseBumbleError, RuntimeError):
"""Out of Resources Error"""

View File

@@ -174,7 +174,7 @@ from .hci import (
phy_list_to_bits,
)
from .host import Host
from .gap import GenericAccessService
from .profiles.gap import GenericAccessService
from .core import (
BT_BR_EDR_TRANSPORT,
BT_CENTRAL_ROLE,
@@ -189,6 +189,7 @@ from .core import (
InvalidArgumentError,
InvalidOperationError,
InvalidStateError,
NotSupportedError,
OutOfResourcesError,
UnreachableError,
)
@@ -1209,8 +1210,13 @@ class Peer:
return self.gatt_client.get_characteristics_by_uuid(uuid, service)
def create_service_proxy(self, proxy_class: Type[_PROXY_CLASS]) -> _PROXY_CLASS:
return cast(_PROXY_CLASS, proxy_class.from_client(self.gatt_client))
def create_service_proxy(
self, proxy_class: Type[_PROXY_CLASS]
) -> Optional[_PROXY_CLASS]:
if proxy := proxy_class.from_client(self.gatt_client):
return cast(_PROXY_CLASS, proxy)
return None
async def discover_service_and_create_proxy(
self, proxy_class: Type[_PROXY_CLASS]
@@ -2384,6 +2390,10 @@ class Device(CompositeEventEmitter):
def supports_le_extended_advertising(self):
return self.supports_le_features(LeFeatureMask.LE_EXTENDED_ADVERTISING)
@property
def supports_le_periodic_advertising(self):
return self.supports_le_features(LeFeatureMask.LE_PERIODIC_ADVERTISING)
async def start_advertising(
self,
advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE,
@@ -2786,6 +2796,10 @@ class Device(CompositeEventEmitter):
sync_timeout: float = DEVICE_DEFAULT_PERIODIC_ADVERTISING_SYNC_TIMEOUT,
filter_duplicates: bool = False,
) -> PeriodicAdvertisingSync:
# Check that the controller supports the feature.
if not self.supports_le_periodic_advertising:
raise NotSupportedError()
# Check that there isn't already an equivalent entry
if any(
sync.advertiser_address == advertiser_address and sync.sid == sid
@@ -3016,7 +3030,7 @@ class Device(CompositeEventEmitter):
peer_address = Address.from_string_for_transport(
peer_address, transport
)
except InvalidArgumentError:
except (InvalidArgumentError, ValueError):
# If the address is not parsable, assume it is a name instead
logger.debug('looking for peer by name')
peer_address = await self.find_peer_by_name(

View File

@@ -253,7 +253,7 @@ class ProfileServiceProxy:
SERVICE_CLASS: Type[TemplateService]
@classmethod
def from_client(cls, client: Client) -> ProfileServiceProxy:
def from_client(cls, client: Client) -> Optional[ProfileServiceProxy]:
return ServiceProxy.from_client(cls, client, cls.SERVICE_CLASS.UUID)
@@ -405,7 +405,7 @@ class Client:
if not already_known:
self.services.append(service)
async def discover_services(self, uuids: Iterable[UUID] = []) -> List[ServiceProxy]:
async def discover_services(self, uuids: Iterable[UUID] = ()) -> List[ServiceProxy]:
'''
See Vol 3, Part G - 4.4.1 Discover All Primary Services
'''

94
bumble/profiles/bass.py Normal file
View File

@@ -0,0 +1,94 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for
"""LE Audio - Broadcast Audio Scan Service"""
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
from typing import Optional
from bumble import device
from bumble import gatt
from bumble import gatt_client
from bumble import utils
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
class ApplicationError(utils.OpenIntEnum):
OPCODE_NOT_SUPPORTED = 0x80
INVALID_SOURCE_ID = 0x81
# -----------------------------------------------------------------------------
class BroadcastAudioScanService(gatt.TemplateService):
UUID = gatt.GATT_BROADCAST_AUDIO_SCAN_SERVICE
def __init__(self):
self.broadcast_audio_scan_control_point_characteristic = gatt.Characteristic(
gatt.GATT_BROADCAST_AUDIO_SCAN_CONTROL_POINT_CHARACTERISTIC,
gatt.Characteristic.Properties.WRITE
| gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(
write=self.on_broadcast_audio_scan_control_point_write
),
)
self.broadcast_receive_state_characteristic = gatt.Characteristic(
gatt.GATT_BROADCAST_RECEIVE_STATE_CHARACTERISTIC,
gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.NOTIFY,
gatt.Characteristic.Permissions.READABLE
| gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
b'12', # TEST
)
super().__init__([self.battery_level_characteristic])
def on_broadcast_audio_scan_control_point_write(
self, connection: device.Connection, value: bytes
) -> None:
pass
# -----------------------------------------------------------------------------
class BroadcastAudioScanServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = BroadcastAudioScanService
broadcast_audio_scan_control_point: Optional[gatt_client.CharacteristicProxy]
broadcast_receive_state: Optional[gatt_client.CharacteristicProxy]
def __init__(self, service_proxy: gatt_client.ServiceProxy):
self.service_proxy = service_proxy
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_BROADCAST_AUDIO_SCAN_CONTROL_POINT_CHARACTERISTIC
):
self.broadcast_audio_scan_control_point = characteristics[0]
else:
self.broadcast_audio_scan_control_point = None
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_BROADCAST_RECEIVE_STATE_CHARACTERISTIC
):
self.broadcast_receive_state = characteristics[0]
else:
self.broadcast_receive_state = None

110
bumble/profiles/gap.py Normal file
View File

@@ -0,0 +1,110 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Generic Access Profile"""
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import struct
from typing import Optional, Tuple, Union
from bumble.core import Appearance
from bumble.gatt import (
TemplateService,
Characteristic,
CharacteristicAdapter,
DelegatedCharacteristicAdapter,
UTF8CharacteristicAdapter,
GATT_GENERIC_ACCESS_SERVICE,
GATT_DEVICE_NAME_CHARACTERISTIC,
GATT_APPEARANCE_CHARACTERISTIC,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
class GenericAccessService(TemplateService):
UUID = GATT_GENERIC_ACCESS_SERVICE
def __init__(
self, device_name: str, appearance: Union[Appearance, Tuple[int, int], int] = 0
):
if isinstance(appearance, int):
appearance_int = appearance
elif isinstance(appearance, tuple):
appearance_int = (appearance[0] << 6) | appearance[1]
elif isinstance(appearance, Appearance):
appearance_int = int(appearance)
else:
raise TypeError()
self.device_name_characteristic = Characteristic(
GATT_DEVICE_NAME_CHARACTERISTIC,
Characteristic.Properties.READ,
Characteristic.READABLE,
device_name.encode('utf-8')[:248],
)
self.appearance_characteristic = Characteristic(
GATT_APPEARANCE_CHARACTERISTIC,
Characteristic.Properties.READ,
Characteristic.READABLE,
struct.pack('<H', appearance_int),
)
super().__init__(
[self.device_name_characteristic, self.appearance_characteristic]
)
# -----------------------------------------------------------------------------
class GenericAccessServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = GenericAccessService
device_name: Optional[CharacteristicAdapter]
appearance: Optional[DelegatedCharacteristicAdapter]
def __init__(self, service_proxy: ServiceProxy):
self.service_proxy = service_proxy
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_DEVICE_NAME_CHARACTERISTIC
):
self.device_name = UTF8CharacteristicAdapter(characteristics[0])
else:
self.device_name = None
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_APPEARANCE_CHARACTERISTIC
):
self.appearance = DelegatedCharacteristicAdapter(
characteristics[0],
decode=lambda value: Appearance.from_int(
struct.unpack_from('<H', value, 0)[0],
),
)
else:
self.appearance = None

87
bumble/profiles/tmap.py Normal file
View File

@@ -0,0 +1,87 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""LE Audio - Telephony and Media Audio Profile"""
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import enum
import logging
import struct
from typing import Optional
from bumble.gatt import (
TemplateService,
Characteristic,
DelegatedCharacteristicAdapter,
GATT_TELEPHONY_AND_MEDIA_AUDIO_SERVICE,
GATT_TMAP_ROLE_CHARACTERISTIC,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class Role(enum.IntFlag):
CALL_GATEWAY = 1 << 0
CALL_TERMINAL = 1 << 1
UNICAST_MEDIA_SENDER = 1 << 2
UNICAST_MEDIA_RECEIVER = 1 << 3
BROADCAST_MEDIA_SENDER = 1 << 4
BROADCAST_MEDIA_RECEIVER = 1 << 5
# -----------------------------------------------------------------------------
class TelephonyAndMediaAudioService(TemplateService):
UUID = GATT_TELEPHONY_AND_MEDIA_AUDIO_SERVICE
def __init__(self, role: Role):
self.role_characteristic = Characteristic(
GATT_TMAP_ROLE_CHARACTERISTIC,
Characteristic.Properties.READ,
Characteristic.READABLE,
struct.pack('<H', int(role)),
)
super().__init__([self.role_characteristic])
# -----------------------------------------------------------------------------
class TelephonyAndMediaAudioServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = TelephonyAndMediaAudioService
role: Optional[DelegatedCharacteristicAdapter]
def __init__(self, service_proxy: ServiceProxy):
self.service_proxy = service_proxy
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_TMAP_ROLE_CHARACTERISTIC
):
self.role = DelegatedCharacteristicAdapter(
characteristics[0],
decode=lambda value: Role(
struct.unpack_from('<H', value, 0)[0],
),
)
else:
self.role = None