Compare commits

...

48 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
c80f89d20f update cryptography dependency 2024-12-18 22:01:42 -05:00
Gilles Boccon-Gibod
62e4670a39 Merge pull request #606 from wpiet/gmap-wip
Add `Gaming Audio Profile`
2024-12-18 11:56:57 -05:00
zxzxwu
99695bb264 Merge pull request #610 from zxzxwu/cfg
Remove setup.py and setup.cfg
2024-12-19 00:53:12 +08:00
Josh Wu
eb54898106 Remove setup.py and setup.cfg 2024-12-19 00:45:13 +08:00
Gilles Boccon-Gibod
4f5ee204d2 Update code-check.yml
Hot fix because 3.13.1 somehow breaks the current version of pylint. Will revert to 3.13 without pining to 3.13.0 when pylint is fixed
2024-12-18 11:36:08 -05:00
Wojciech Pietraszewski
2552e21db1 Add characteristics initial values
Sets default values for characteristics if not specified explicitly
2024-12-04 17:00:29 +01:00
Wojciech Pietraszewski
6168f87e2f Add characteristics conditionally
Only adds a characteristic if the corresponding role has been set
2024-12-04 12:57:34 +01:00
Gilles Boccon-Gibod
ca7d2ca4df Merge pull request #607 from google/gbg/pandora-deps
move pandora deps to development
2024-12-03 09:42:44 -08:00
Gilles Boccon-Gibod
60723323e9 move pandora deps to development 2024-12-03 09:08:30 -08:00
Gilles Boccon-Gibod
3ce7b9255b Merge pull request #598 from google/gbg/gatt-class-adapter
Add a class-based GATT adapter
2024-12-03 08:46:30 -08:00
Gilles Boccon-Gibod
97fcfc2fa0 Merge pull request #604 from jmdietrich-gcx/add_encryption_key_size_to_pairing_config
Add maximum encryption key size to PairingDelegate
2024-12-03 08:30:53 -08:00
Wojciech Pietraszewski
19674e3758 Add Gaming Audio Profile
Adds initial support for `Gaming Audio Service`.
2024-12-02 11:15:10 +01:00
Jan-Marcel Dietrich
1130e1db8f Fix code formatting 2024-12-02 09:01:18 +01:00
Gilles Boccon-Gibod
37c7f3a58a Merge pull request #603 from google/gbg/fix-pair-oob
fix oob support in pair.py
2024-12-01 08:43:04 -08:00
Gilles Boccon-Gibod
0a12b2bf2e Merge pull request #585 from wpiet/vocs
Add `Volume Offset Control Service`
2024-11-29 10:41:30 -08:00
Gilles Boccon-Gibod
d014acbe63 Merge pull request #597 from google/gbg/intel-hci
intel hci
2024-11-29 10:41:10 -08:00
Jan-Marcel Dietrich
07f9997a49 Add maximum encryption key size to PairingDelegate
So far the maxmium encryption key size has been hardcoded to 16 bytes in
'send_pairing_request_command()' and 'send_pairing_response_comman()'. By
making this configurable via the PairingDelegate, one can test how devices
respond to smaller encryption key sizes. Default remains 16 bytes.
2024-11-28 14:15:51 +01:00
Gilles Boccon-Gibod
b9f91f695a fix oob support in pair.py 2024-11-27 12:58:03 -08:00
Gilles Boccon-Gibod
082d55af10 Merge pull request #599 from google/gbg/hfp-19
add super wide band constants
2024-11-25 07:47:40 -08:00
Gilles Boccon-Gibod
4c3fd5688d Merge pull request #600 from google/gbg/unify-to-bytes
only use `__bytes__` when not argument is needed.
2024-11-25 07:44:17 -08:00
Gilles Boccon-Gibod
9d3d5495ce only use __bytes__ when not argument is needed. 2024-11-23 15:56:14 -08:00
Gilles Boccon-Gibod
b3869f267c add super wide band constants 2024-11-23 09:27:03 -08:00
Gilles Boccon-Gibod
8715333706 Add a GATT adapter that uses from_bytes and __bytes__ as conversion methods. 2024-11-23 09:13:04 -08:00
Gilles Boccon-Gibod
b57096abe2 Merge pull request #595 from wpiet/aics-opcode-fix
Amend Opcode value in `Audio Input Control Service`
2024-11-23 08:56:23 -08:00
Gilles Boccon-Gibod
48685c8587 improve vendor event support 2024-11-23 08:55:50 -08:00
Wojciech Pietraszewski
100bea6b41 Fix typos
Amends the typo in the `INACTIVE` field in `Audio Input Status` characteristic.
Amends the typo in the log message of `_set_gain_settings` method.
2024-11-21 18:29:44 +01:00
Wojciech Pietraszewski
63819bf9dd Amend Opcode value in Audio Input Control Service
Corrects the Audio Input Control Point
Opcode value for `Set Gain Setting` field.
2024-11-21 16:40:49 +01:00
Wojciech Pietraszewski
6e55390930 Add Volume Offset Control Service
Adds initial support for VOCS.
2024-11-21 11:56:14 +01:00
zxzxwu
e3fdab4175 Merge pull request #593 from zxzxwu/periodic
Support Periodic Advertising
2024-11-19 17:22:37 +08:00
Josh Wu
bbcd14dbf0 Support Periodic Advertising 2024-11-19 16:27:13 +08:00
zxzxwu
01dc0d574b Merge pull request #590 from SergeantSerk/parse-scan-response-data
Correctly parse scan response from device config
2024-11-17 15:39:11 +08:00
zxzxwu
5e959d638e Merge pull request #591 from zxzxwu/auracast_scan
Improve Broadcast Scanning
2024-11-16 04:10:27 +08:00
Gilles Boccon-Gibod
8d908288c8 Merge pull request #583 from google/gbg/more-gatt-tests
regression test for GATT unsubscription
2024-11-15 10:19:20 -08:00
Josh Wu
c88b32a406 Improve Broadcast Scanning 2024-11-16 02:02:28 +08:00
zxzxwu
5a72eefb89 Merge pull request #587 from zxzxwu/device
Replace HCI member imports in device.py
2024-11-13 15:25:32 +08:00
Josh Wu
430046944b Replace HCI member import in device.py 2024-11-12 16:53:21 +08:00
zxzxwu
21d23320eb Merge pull request #584 from zxzxwu/commands6.0
Add Core Spec 6.0 new commands support mapping
2024-11-12 04:17:24 +00:00
Serkan
d0990ee04d Correctly parse scan response from device config
Parses scan response data correctly just like advertising data
2024-11-07 21:49:33 +03:00
Josh Wu
2d88e853e8 Add Core Spec 6.0 new commands support mapping 2024-11-07 14:36:54 +08:00
Gilles Boccon-Gibod
a060a70fba Merge pull request #583 from google/gbg/more-gatt-tests
regression test for GATT unsubscription
2024-11-04 13:03:57 -08:00
Gilles Boccon-Gibod
a06394ad4a Merge pull request #582 from google/gbg/580
fix #580
2024-11-04 13:03:15 -08:00
Gilles Boccon-Gibod
a1414c2b5b add unsubscribe test 2024-11-03 19:08:27 -08:00
Gilles Boccon-Gibod
b2864dac2d fix #580 2024-11-02 10:29:40 -07:00
Gilles Boccon-Gibod
b78f895143 Merge pull request #579 from jmdietrich-gcx/unsubscribe_characteristic_in_gatt_client
Remove characteristic in GATT Client unsubscribe() if it's the last subscriber
2024-10-31 04:07:02 -07:00
zxzxwu
c4e9726828 Merge pull request #581 from zxzxwu/context
[BAP] Add missing Unspecified context type
2024-10-31 11:04:25 +00:00
Gilles Boccon-Gibod
d4b8e8348a Merge pull request #574 from google/gbg/update-python-versions
remove test for deprecated Python 3.8 and add 3.13
2024-10-31 03:44:01 -07:00
Josh Wu
19debaa52e [BAP] Add missing Unspecified context type 2024-10-31 18:11:40 +08:00
Jan-Marcel Dietrich
73fe564321 Remove characteristic in GATT Client unsubscribe() if it's the last subscriber
GATT Client's subscribe() adds the characteristic itself as subscriber.
Therefore the characteristic has to be removed in unsubscribe(), if it's
the last subscriber. Otherwise the clean up does not work correctly and
the CCCD never is set back to 0 in the remote device.
2024-10-30 07:34:22 +01:00
57 changed files with 3484 additions and 1023 deletions

View File

@@ -16,7 +16,7 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
strategy: strategy:
matrix: matrix:
python-version: ["3.9", "3.10", "3.11", "3.12", "3.13"] python-version: ["3.9", "3.10", "3.11", "3.12", "3.13.0"]
fail-fast: false fail-fast: false
steps: steps:
@@ -33,7 +33,7 @@ jobs:
- name: Install dependencies - name: Install dependencies
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip
python -m pip install ".[build,test,development,pandora]" python -m pip install ".[build,test,development]"
- name: Check - name: Check
run: | run: |
invoke project.pre-commit invoke project.pre-commit

View File

@@ -32,7 +32,7 @@ jobs:
- name: Install - name: Install
run: | run: |
python -m pip install --upgrade pip python -m pip install --upgrade pip
python -m pip install .[avatar,pandora] python -m pip install .[avatar]
- name: Rootcanal - name: Rootcanal
run: nohup python -m rootcanal > rootcanal.log & run: nohup python -m rootcanal > rootcanal.log &
- name: Test - name: Test

View File

@@ -60,7 +60,7 @@ AURACAST_DEFAULT_ATT_MTU = 256
class BroadcastScanner(pyee.EventEmitter): class BroadcastScanner(pyee.EventEmitter):
@dataclasses.dataclass @dataclasses.dataclass
class Broadcast(pyee.EventEmitter): class Broadcast(pyee.EventEmitter):
name: str name: str | None
sync: bumble.device.PeriodicAdvertisingSync sync: bumble.device.PeriodicAdvertisingSync
rssi: int = 0 rssi: int = 0
public_broadcast_announcement: Optional[ public_broadcast_announcement: Optional[
@@ -135,7 +135,8 @@ class BroadcastScanner(pyee.EventEmitter):
self.sync.advertiser_address, self.sync.advertiser_address,
color(self.sync.state.name, 'green'), color(self.sync.state.name, 'green'),
) )
print(f' {color("Name", "cyan")}: {self.name}') if self.name is not None:
print(f' {color("Name", "cyan")}: {self.name}')
if self.appearance: if self.appearance:
print(f' {color("Appearance", "cyan")}: {str(self.appearance)}') print(f' {color("Appearance", "cyan")}: {str(self.appearance)}')
print(f' {color("RSSI", "cyan")}: {self.rssi}') print(f' {color("RSSI", "cyan")}: {self.rssi}')
@@ -174,7 +175,7 @@ class BroadcastScanner(pyee.EventEmitter):
print(color(' Codec ID:', 'yellow')) print(color(' Codec ID:', 'yellow'))
print( print(
color(' Coding Format: ', 'green'), color(' Coding Format: ', 'green'),
subgroup.codec_id.coding_format.name, subgroup.codec_id.codec_id.name,
) )
print( print(
color(' Company ID: ', 'green'), color(' Company ID: ', 'green'),
@@ -274,13 +275,24 @@ class BroadcastScanner(pyee.EventEmitter):
await self.device.stop_scanning() await self.device.stop_scanning()
def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None: def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None:
if ( if not (
broadcast_name := advertisement.data.get( ads := advertisement.data.get_all(
bumble.core.AdvertisingData.BROADCAST_NAME bumble.core.AdvertisingData.SERVICE_DATA_16_BIT_UUID
) )
) is None: ) or not (
any(
ad
for ad in ads
if isinstance(ad, tuple)
and ad[0] == bumble.gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
)
):
return return
assert isinstance(broadcast_name, str)
broadcast_name = advertisement.data.get(
bumble.core.AdvertisingData.BROADCAST_NAME
)
assert isinstance(broadcast_name, str) or broadcast_name is None
if broadcast := self.broadcasts.get(advertisement.address): if broadcast := self.broadcasts.get(advertisement.address):
broadcast.update(advertisement) broadcast.update(advertisement)
@@ -291,7 +303,7 @@ class BroadcastScanner(pyee.EventEmitter):
) )
async def on_new_broadcast( async def on_new_broadcast(
self, name: str, advertisement: bumble.device.Advertisement self, name: str | None, advertisement: bumble.device.Advertisement
) -> None: ) -> None:
periodic_advertising_sync = await self.device.create_periodic_advertising_sync( periodic_advertising_sync = await self.device.create_periodic_advertising_sync(
advertiser_address=advertisement.address, advertiser_address=advertisement.address,
@@ -299,10 +311,7 @@ class BroadcastScanner(pyee.EventEmitter):
sync_timeout=self.sync_timeout, sync_timeout=self.sync_timeout,
filter_duplicates=self.filter_duplicates, filter_duplicates=self.filter_duplicates,
) )
broadcast = self.Broadcast( broadcast = self.Broadcast(name, periodic_advertising_sync)
name,
periodic_advertising_sync,
)
broadcast.update(advertisement) broadcast.update(advertisement)
self.broadcasts[advertisement.address] = broadcast self.broadcasts[advertisement.address] = broadcast
periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast)) periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast))

View File

@@ -199,7 +199,7 @@ def log_stats(title, stats, precision=2):
stats_min = min(stats) stats_min = min(stats)
stats_max = max(stats) stats_max = max(stats)
stats_avg = statistics.mean(stats) stats_avg = statistics.mean(stats)
stats_stdev = statistics.stdev(stats) stats_stdev = statistics.stdev(stats) if len(stats) >= 2 else 0
logging.info( logging.info(
color( color(
( (
@@ -468,6 +468,7 @@ class Ping:
for run in range(self.repeat + 1): for run in range(self.repeat + 1):
self.done.clear() self.done.clear()
self.ping_times = []
if run > 0 and self.repeat and self.repeat_delay: if run > 0 and self.repeat and self.repeat_delay:
logging.info(color(f'*** Repeat delay: {self.repeat_delay}', 'green')) logging.info(color(f'*** Repeat delay: {self.repeat_delay}', 'green'))

View File

@@ -83,7 +83,7 @@ async def async_main():
return_parameters=bytes([hci.HCI_SUCCESS]), return_parameters=bytes([hci.HCI_SUCCESS]),
) )
# Return a packet with 'respond to sender' set to True # Return a packet with 'respond to sender' set to True
return (response.to_bytes(), True) return (bytes(response), True)
return None return None

View File

@@ -486,7 +486,12 @@ class Speaker:
def on_pdu(pdu: HCI_IsoDataPacket, ase: ascs.AseStateMachine): def on_pdu(pdu: HCI_IsoDataPacket, ase: ascs.AseStateMachine):
codec_config = ase.codec_specific_configuration codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration) if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or codec_config.frame_duration is None
or codec_config.audio_channel_allocation is None
):
return
pcm = decode( pcm = decode(
codec_config.frame_duration.us, codec_config.frame_duration.us,
codec_config.audio_channel_allocation.channel_count, codec_config.audio_channel_allocation.channel_count,
@@ -495,11 +500,17 @@ class Speaker:
self.device.abort_on('disconnection', self.ui_server.send_audio(pcm)) self.device.abort_on('disconnection', self.ui_server.send_audio(pcm))
def on_ase_state_change(ase: ascs.AseStateMachine) -> None: def on_ase_state_change(ase: ascs.AseStateMachine) -> None:
codec_config = ase.codec_specific_configuration
if ase.state == ascs.AseStateMachine.State.STREAMING: if ase.state == ascs.AseStateMachine.State.STREAMING:
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
assert ase.cis_link
if ase.role == ascs.AudioRole.SOURCE: if ase.role == ascs.AudioRole.SOURCE:
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or ase.cis_link is None
or codec_config.octets_per_codec_frame is None
or codec_config.frame_duration is None
or codec_config.codec_frames_per_sdu is None
):
return
ase.cis_link.abort_on( ase.cis_link.abort_on(
'disconnection', 'disconnection',
lc3_source_task( lc3_source_task(
@@ -514,10 +525,17 @@ class Speaker:
), ),
) )
else: else:
if not ase.cis_link:
return
ase.cis_link.sink = functools.partial(on_pdu, ase=ase) ase.cis_link.sink = functools.partial(on_pdu, ase=ase)
elif ase.state == ascs.AseStateMachine.State.CODEC_CONFIGURED: elif ase.state == ascs.AseStateMachine.State.CODEC_CONFIGURED:
codec_config = ase.codec_specific_configuration if (
assert isinstance(codec_config, bap.CodecSpecificConfiguration) not isinstance(codec_config, bap.CodecSpecificConfiguration)
or codec_config.sampling_frequency is None
or codec_config.frame_duration is None
or codec_config.audio_channel_allocation is None
):
return
if ase.role == ascs.AudioRole.SOURCE: if ase.role == ascs.AudioRole.SOURCE:
setup_encoders( setup_encoders(
codec_config.sampling_frequency.hz, codec_config.sampling_frequency.hz,

View File

@@ -373,7 +373,9 @@ async def pair(
shared_data = ( shared_data = (
None None
if oob == '-' if oob == '-'
else OobData.from_ad(AdvertisingData.from_bytes(bytes.fromhex(oob))) else OobData.from_ad(
AdvertisingData.from_bytes(bytes.fromhex(oob))
).shared_data
) )
legacy_context = OobLegacyContext() legacy_context = OobLegacyContext()
oob_contexts = PairingConfig.OobConfig( oob_contexts = PairingConfig.OobConfig(
@@ -381,16 +383,19 @@ async def pair(
peer_data=shared_data, peer_data=shared_data,
legacy_context=legacy_context, legacy_context=legacy_context,
) )
oob_data = OobData(
address=device.random_address,
shared_data=shared_data,
legacy_context=legacy_context,
)
print(color('@@@-----------------------------------', 'yellow')) print(color('@@@-----------------------------------', 'yellow'))
print(color('@@@ OOB Data:', 'yellow')) print(color('@@@ OOB Data:', 'yellow'))
print(color(f'@@@ {our_oob_context.share()}', 'yellow')) if shared_data is None:
oob_data = OobData(
address=device.random_address, shared_data=our_oob_context.share()
)
print(
color(
f'@@@ SHARE: {bytes(oob_data.to_ad()).hex()}',
'yellow',
)
)
print(color(f'@@@ TK={legacy_context.tk.hex()}', 'yellow')) print(color(f'@@@ TK={legacy_context.tk.hex()}', 'yellow'))
print(color(f'@@@ HEX: ({bytes(oob_data.to_ad()).hex()})', 'yellow'))
print(color('@@@-----------------------------------', 'yellow')) print(color('@@@-----------------------------------', 'yellow'))
else: else:
oob_contexts = None oob_contexts = None

View File

@@ -144,18 +144,18 @@ class Printer:
help='Format of the input file', help='Format of the input file',
) )
@click.option( @click.option(
'--vendors', '--vendor',
type=click.Choice(['android', 'zephyr']), type=click.Choice(['android', 'zephyr']),
multiple=True, multiple=True,
help='Support vendor-specific commands (list one or more)', help='Support vendor-specific commands (list one or more)',
) )
@click.argument('filename') @click.argument('filename')
# pylint: disable=redefined-builtin # pylint: disable=redefined-builtin
def main(format, vendors, filename): def main(format, vendor, filename):
for vendor in vendors: for vendor_name in vendor:
if vendor == 'android': if vendor_name == 'android':
import bumble.vendor.android.hci import bumble.vendor.android.hci
elif vendor == 'zephyr': elif vendor_name == 'zephyr':
import bumble.vendor.zephyr.hci import bumble.vendor.zephyr.hci
input = open(filename, 'rb') input = open(filename, 'rb')
@@ -180,7 +180,7 @@ def main(format, vendors, filename):
else: else:
printer.print(color("[TRUNCATED]", "red")) printer.print(color("[TRUNCATED]", "red"))
except Exception as error: except Exception as error:
logger.exception() logger.exception('')
print(color(f'!!! {error}', 'red')) print(color(f'!!! {error}', 'red'))

View File

@@ -57,6 +57,7 @@ if TYPE_CHECKING:
# pylint: disable=line-too-long # pylint: disable=line-too-long
ATT_CID = 0x04 ATT_CID = 0x04
ATT_PSM = 0x001F
ATT_ERROR_RESPONSE = 0x01 ATT_ERROR_RESPONSE = 0x01
ATT_EXCHANGE_MTU_REQUEST = 0x02 ATT_EXCHANGE_MTU_REQUEST = 0x02
@@ -291,9 +292,6 @@ class ATT_PDU:
def init_from_bytes(self, pdu, offset): def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
@property @property
def is_command(self): def is_command(self):
return ((self.op_code >> 6) & 1) == 1 return ((self.op_code >> 6) & 1) == 1
@@ -303,7 +301,7 @@ class ATT_PDU:
return ((self.op_code >> 7) & 1) == 1 return ((self.op_code >> 7) & 1) == 1
def __bytes__(self): def __bytes__(self):
return self.to_bytes() return self.pdu
def __str__(self): def __str__(self):
result = color(self.name, 'yellow') result = color(self.name, 'yellow')
@@ -759,13 +757,13 @@ class AttributeValue:
def __init__( def __init__(
self, self,
read: Union[ read: Union[
Callable[[Optional[Connection]], bytes], Callable[[Optional[Connection]], Any],
Callable[[Optional[Connection]], Awaitable[bytes]], Callable[[Optional[Connection]], Awaitable[Any]],
None, None,
] = None, ] = None,
write: Union[ write: Union[
Callable[[Optional[Connection], bytes], None], Callable[[Optional[Connection], Any], None],
Callable[[Optional[Connection], bytes], Awaitable[None]], Callable[[Optional[Connection], Any], Awaitable[None]],
None, None,
] = None, ] = None,
): ):
@@ -824,13 +822,13 @@ class Attribute(EventEmitter):
READ_REQUIRES_AUTHORIZATION = Permissions.READ_REQUIRES_AUTHORIZATION READ_REQUIRES_AUTHORIZATION = Permissions.READ_REQUIRES_AUTHORIZATION
WRITE_REQUIRES_AUTHORIZATION = Permissions.WRITE_REQUIRES_AUTHORIZATION WRITE_REQUIRES_AUTHORIZATION = Permissions.WRITE_REQUIRES_AUTHORIZATION
value: Union[bytes, AttributeValue] value: Any
def __init__( def __init__(
self, self,
attribute_type: Union[str, bytes, UUID], attribute_type: Union[str, bytes, UUID],
permissions: Union[str, Attribute.Permissions], permissions: Union[str, Attribute.Permissions],
value: Union[str, bytes, AttributeValue] = b'', value: Any = b'',
) -> None: ) -> None:
EventEmitter.__init__(self) EventEmitter.__init__(self)
self.handle = 0 self.handle = 0
@@ -848,11 +846,7 @@ class Attribute(EventEmitter):
else: else:
self.type = attribute_type self.type = attribute_type
# Convert the value to a byte array self.value = value
if isinstance(value, str):
self.value = bytes(value, 'utf-8')
else:
self.value = value
def encode_value(self, value: Any) -> bytes: def encode_value(self, value: Any) -> bytes:
return value return value
@@ -895,6 +889,8 @@ class Attribute(EventEmitter):
else: else:
value = self.value value = self.value
self.emit('read', connection, value)
return self.encode_value(value) return self.encode_value(value)
async def write_value(self, connection: Connection, value_bytes: bytes) -> None: async def write_value(self, connection: Connection, value_bytes: bytes) -> None:

View File

@@ -314,7 +314,7 @@ class Controller:
f'{color("CONTROLLER -> HOST", "green")}: {packet}' f'{color("CONTROLLER -> HOST", "green")}: {packet}'
) )
if self.host: if self.host:
self.host.on_packet(packet.to_bytes()) self.host.on_packet(bytes(packet))
# This method allows the controller to emulate the same API as a transport source # This method allows the controller to emulate the same API as a transport source
async def wait_for_termination(self): async def wait_for_termination(self):
@@ -1192,7 +1192,7 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command
''' '''
bd_addr = ( bd_addr = (
self._public_address.to_bytes() bytes(self._public_address)
if self._public_address is not None if self._public_address is not None
else bytes(6) else bytes(6)
) )
@@ -1543,6 +1543,41 @@ class Controller:
} }
return bytes([HCI_SUCCESS]) return bytes([HCI_SUCCESS])
def on_hci_le_set_advertising_set_random_address_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.52 LE Set Advertising Set Random Address
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_advertising_parameters_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.53 LE Set Extended Advertising Parameters
Command
'''
return bytes([HCI_SUCCESS, 0])
def on_hci_le_set_extended_advertising_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.54 LE Set Extended Advertising Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_scan_response_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.55 LE Set Extended Scan Response Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_advertising_enable_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.56 LE Set Extended Advertising Enable
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_maximum_advertising_data_length_command(self, _command): def on_hci_le_read_maximum_advertising_data_length_command(self, _command):
''' '''
See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data
@@ -1557,6 +1592,27 @@ class Controller:
''' '''
return struct.pack('<BB', HCI_SUCCESS, 0xF0) return struct.pack('<BB', HCI_SUCCESS, 0xF0)
def on_hci_le_set_periodic_advertising_parameters_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.61 LE Set Periodic Advertising Parameters
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_periodic_advertising_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.62 LE Set Periodic Advertising Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_periodic_advertising_enable_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.63 LE Set Periodic Advertising Enable
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_transmit_power_command(self, _command): def on_hci_le_read_transmit_power_command(self, _command):
''' '''
See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command

File diff suppressed because it is too large Load Diff

View File

@@ -20,6 +20,8 @@ Common types for drivers.
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import abc import abc
from bumble import core
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Classes # Classes

View File

@@ -11,18 +11,33 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
"""
Support for Intel USB controllers.
Loosely based on the Fuchsia OS implementation.
"""
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import collections
import dataclasses
import logging import logging
import os
import pathlib
import platform
import struct
from typing import Any, Deque, Optional, TYPE_CHECKING
from bumble import core
from bumble.drivers import common from bumble.drivers import common
from bumble.hci import ( from bumble import hci
hci_vendor_command_op_code, # type: ignore from bumble import utils
HCI_Command,
HCI_Reset_Command, if TYPE_CHECKING:
) from bumble.host import Host
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Logging # Logging
@@ -34,39 +49,328 @@ logger = logging.getLogger(__name__)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
INTEL_USB_PRODUCTS = { INTEL_USB_PRODUCTS = {
# Intel AX210 (0x8087, 0x0032), # AX210
(0x8087, 0x0032), (0x8087, 0x0036), # BE200
# Intel BE200
(0x8087, 0x0036),
} }
INTEL_FW_IMAGE_NAMES = [
"ibt-0040-0041",
"ibt-0040-1020",
"ibt-0040-1050",
"ibt-0040-2120",
"ibt-0040-4150",
"ibt-0041-0041",
"ibt-0180-0041",
"ibt-0180-1050",
"ibt-0180-4150",
"ibt-0291-0291",
"ibt-1040-0041",
"ibt-1040-1020",
"ibt-1040-1050",
"ibt-1040-2120",
"ibt-1040-4150",
]
INTEL_FIRMWARE_DIR_ENV = "BUMBLE_INTEL_FIRMWARE_DIR"
INTEL_LINUX_FIRMWARE_DIR = "/lib/firmware/intel"
_MAX_FRAGMENT_SIZE = 252
_POST_RESET_DELAY = 0.2
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# HCI Commands # HCI Commands
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
HCI_INTEL_DDC_CONFIG_WRITE_COMMAND = hci_vendor_command_op_code(0xFC8B) # type: ignore HCI_INTEL_WRITE_DEVICE_CONFIG_COMMAND = hci.hci_vendor_command_op_code(0x008B)
HCI_INTEL_DDC_CONFIG_WRITE_PAYLOAD = [0x03, 0xE4, 0x02, 0x00] HCI_INTEL_READ_VERSION_COMMAND = hci.hci_vendor_command_op_code(0x0005)
HCI_INTEL_RESET_COMMAND = hci.hci_vendor_command_op_code(0x0001)
HCI_INTEL_SECURE_SEND_COMMAND = hci.hci_vendor_command_op_code(0x0009)
HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND = hci.hci_vendor_command_op_code(0x000E)
HCI_Command.register_commands(globals()) hci.HCI_Command.register_commands(globals())
@HCI_Command.command( # type: ignore @hci.HCI_Command.command(
fields=[("params", "*")], fields=[
("param0", 1),
],
return_parameters_fields=[ return_parameters_fields=[
("params", "*"), ("status", hci.STATUS_SPEC),
("tlv", "*"),
], ],
) )
class Hci_Intel_DDC_Config_Write_Command(HCI_Command): class HCI_Intel_Read_Version_Command(hci.HCI_Command):
pass pass
@hci.HCI_Command.command(
fields=[("data_type", 1), ("data", "*")],
return_parameters_fields=[
("status", 1),
],
)
class Hci_Intel_Secure_Send_Command(hci.HCI_Command):
pass
@hci.HCI_Command.command(
fields=[
("reset_type", 1),
("patch_enable", 1),
("ddc_reload", 1),
("boot_option", 1),
("boot_address", 4),
],
return_parameters_fields=[
("data", "*"),
],
)
class HCI_Intel_Reset_Command(hci.HCI_Command):
pass
@hci.HCI_Command.command(
fields=[("data", "*")],
return_parameters_fields=[
("status", hci.STATUS_SPEC),
("params", "*"),
],
)
class Hci_Intel_Write_Device_Config_Command(hci.HCI_Command):
pass
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def intel_firmware_dir() -> pathlib.Path:
"""
Returns:
A path to a subdir of the project data dir for Intel firmware.
The directory is created if it doesn't exist.
"""
from bumble.drivers import project_data_dir
p = project_data_dir() / "firmware" / "intel"
p.mkdir(parents=True, exist_ok=True)
return p
def _find_binary_path(file_name: str) -> pathlib.Path | None:
# First check if an environment variable is set
if INTEL_FIRMWARE_DIR_ENV in os.environ:
if (
path := pathlib.Path(os.environ[INTEL_FIRMWARE_DIR_ENV]) / file_name
).is_file():
logger.debug(f"{file_name} found in env dir")
return path
# When the environment variable is set, don't look elsewhere
return None
# Then, look where the firmware download tool writes by default
if (path := intel_firmware_dir() / file_name).is_file():
logger.debug(f"{file_name} found in project data dir")
return path
# Then, look in the package's driver directory
if (path := pathlib.Path(__file__).parent / "intel_fw" / file_name).is_file():
logger.debug(f"{file_name} found in package dir")
return path
# On Linux, check the system's FW directory
if (
platform.system() == "Linux"
and (path := pathlib.Path(INTEL_LINUX_FIRMWARE_DIR) / file_name).is_file()
):
logger.debug(f"{file_name} found in Linux system FW dir")
return path
# Finally look in the current directory
if (path := pathlib.Path.cwd() / file_name).is_file():
logger.debug(f"{file_name} found in CWD")
return path
return None
def _parse_tlv(data: bytes) -> list[tuple[ValueType, Any]]:
result: list[tuple[ValueType, Any]] = []
while len(data) >= 2:
value_type = ValueType(data[0])
value_length = data[1]
value = data[2 : 2 + value_length]
typed_value: Any
if value_type == ValueType.END:
break
if value_type in (ValueType.CNVI, ValueType.CNVR):
(v,) = struct.unpack("<I", value)
typed_value = (
(((v >> 0) & 0xF) << 12)
| (((v >> 4) & 0xF) << 0)
| (((v >> 8) & 0xF) << 4)
| (((v >> 24) & 0xF) << 8)
)
elif value_type == ValueType.HARDWARE_INFO:
(v,) = struct.unpack("<I", value)
typed_value = HardwareInfo(
HardwarePlatform((v >> 8) & 0xFF), HardwareVariant((v >> 16) & 0x3F)
)
elif value_type in (
ValueType.USB_VENDOR_ID,
ValueType.USB_PRODUCT_ID,
ValueType.DEVICE_REVISION,
):
(typed_value,) = struct.unpack("<H", value)
elif value_type == ValueType.CURRENT_MODE_OF_OPERATION:
typed_value = ModeOfOperation(value[0])
elif value_type in (
ValueType.BUILD_TYPE,
ValueType.BUILD_NUMBER,
ValueType.SECURE_BOOT,
ValueType.OTP_LOCK,
ValueType.API_LOCK,
ValueType.DEBUG_LOCK,
ValueType.SECURE_BOOT_ENGINE_TYPE,
):
typed_value = value[0]
elif value_type == ValueType.TIMESTAMP:
typed_value = Timestamp(value[0], value[1])
elif value_type == ValueType.FIRMWARE_BUILD:
typed_value = FirmwareBuild(value[0], Timestamp(value[1], value[2]))
elif value_type == ValueType.BLUETOOTH_ADDRESS:
typed_value = hci.Address(
value, address_type=hci.Address.PUBLIC_DEVICE_ADDRESS
)
else:
typed_value = value
result.append((value_type, typed_value))
data = data[2 + value_length :]
return result
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class DriverError(core.BaseBumbleError):
def __init__(self, message: str) -> None:
super().__init__(message)
self.message = message
def __str__(self) -> str:
return f"IntelDriverError({self.message})"
class ValueType(utils.OpenIntEnum):
END = 0x00
CNVI = 0x10
CNVR = 0x11
HARDWARE_INFO = 0x12
DEVICE_REVISION = 0x16
CURRENT_MODE_OF_OPERATION = 0x1C
USB_VENDOR_ID = 0x17
USB_PRODUCT_ID = 0x18
TIMESTAMP = 0x1D
BUILD_TYPE = 0x1E
BUILD_NUMBER = 0x1F
SECURE_BOOT = 0x28
OTP_LOCK = 0x2A
API_LOCK = 0x2B
DEBUG_LOCK = 0x2C
FIRMWARE_BUILD = 0x2D
SECURE_BOOT_ENGINE_TYPE = 0x2F
BLUETOOTH_ADDRESS = 0x30
class HardwarePlatform(utils.OpenIntEnum):
INTEL_37 = 0x37
class HardwareVariant(utils.OpenIntEnum):
# This is a just a partial list.
# Add other constants here as new hardware is encountered and tested.
TYPHOON_PEAK = 0x17
GALE_PEAK = 0x1C
@dataclasses.dataclass
class HardwareInfo:
platform: HardwarePlatform
variant: HardwareVariant
@dataclasses.dataclass
class Timestamp:
week: int
year: int
@dataclasses.dataclass
class FirmwareBuild:
build_number: int
timestamp: Timestamp
class ModeOfOperation(utils.OpenIntEnum):
BOOTLOADER = 0x01
INTERMEDIATE = 0x02
OPERATIONAL = 0x03
class SecureBootEngineType(utils.OpenIntEnum):
RSA = 0x00
ECDSA = 0x01
@dataclasses.dataclass
class BootParams:
css_header_offset: int
css_header_size: int
pki_offset: int
pki_size: int
sig_offset: int
sig_size: int
write_offset: int
_BOOT_PARAMS = {
SecureBootEngineType.RSA: BootParams(0, 128, 128, 256, 388, 256, 964),
SecureBootEngineType.ECDSA: BootParams(644, 128, 772, 96, 868, 96, 964),
}
class Driver(common.Driver): class Driver(common.Driver):
def __init__(self, host): def __init__(self, host: Host) -> None:
self.host = host self.host = host
self.max_in_flight_firmware_load_commands = 1
self.pending_firmware_load_commands: Deque[hci.HCI_Command] = (
collections.deque()
)
self.can_send_firmware_load_command = asyncio.Event()
self.can_send_firmware_load_command.set()
self.firmware_load_complete = asyncio.Event()
self.reset_complete = asyncio.Event()
# Parse configuration options from the driver name.
self.ddc_addon: Optional[bytes] = None
self.ddc_override: Optional[bytes] = None
driver = host.hci_metadata.get("driver")
if driver is not None and driver.startswith("intel/"):
for key, value in [
key_eq_value.split(":") for key_eq_value in driver[6:].split("+")
]:
if key == "ddc_addon":
self.ddc_addon = bytes.fromhex(value)
elif key == "ddc_override":
self.ddc_override = bytes.fromhex(value)
@staticmethod @staticmethod
def check(host): def check(host: Host) -> bool:
driver = host.hci_metadata.get("driver") driver = host.hci_metadata.get("driver")
if driver == "intel": if driver == "intel" or driver is not None and driver.startswith("intel/"):
return True return True
vendor_id = host.hci_metadata.get("vendor_id") vendor_id = host.hci_metadata.get("vendor_id")
@@ -85,18 +389,283 @@ class Driver(common.Driver):
return True return True
@classmethod @classmethod
async def for_host(cls, host, force=False): # type: ignore async def for_host(cls, host: Host, force: bool = False):
# Only instantiate this driver if explicitly selected # Only instantiate this driver if explicitly selected
if not force and not cls.check(host): if not force and not cls.check(host):
return None return None
return cls(host) return cls(host)
async def init_controller(self): def on_packet(self, packet: bytes) -> None:
"""Handler for event packets that are received from an ACL channel"""
event = hci.HCI_Event.from_bytes(packet)
if not isinstance(event, hci.HCI_Command_Complete_Event):
self.host.on_hci_event_packet(event)
return
if not event.return_parameters == hci.HCI_SUCCESS:
raise DriverError("HCI_Command_Complete_Event error")
if self.max_in_flight_firmware_load_commands != event.num_hci_command_packets:
logger.debug(
"max_in_flight_firmware_load_commands update: "
f"{event.num_hci_command_packets}"
)
self.max_in_flight_firmware_load_commands = event.num_hci_command_packets
logger.debug(f"event: {event}")
self.pending_firmware_load_commands.popleft()
in_flight = len(self.pending_firmware_load_commands)
logger.debug(f"event received, {in_flight} still in flight")
if in_flight < self.max_in_flight_firmware_load_commands:
self.can_send_firmware_load_command.set()
async def send_firmware_load_command(self, command: hci.HCI_Command) -> None:
# Wait until we can send.
await self.can_send_firmware_load_command.wait()
# Send the command and adjust counters.
self.host.send_hci_packet(command)
self.pending_firmware_load_commands.append(command)
in_flight = len(self.pending_firmware_load_commands)
if in_flight >= self.max_in_flight_firmware_load_commands:
logger.debug(f"max commands in flight reached [{in_flight}]")
self.can_send_firmware_load_command.clear()
async def send_firmware_data(self, data_type: int, data: bytes) -> None:
while data:
fragment_size = min(len(data), _MAX_FRAGMENT_SIZE)
fragment = data[:fragment_size]
data = data[fragment_size:]
await self.send_firmware_load_command(
Hci_Intel_Secure_Send_Command(data_type=data_type, data=fragment)
)
async def load_firmware(self) -> None:
self.host.ready = True self.host.ready = True
await self.host.send_command(HCI_Reset_Command(), check_result=True) device_info = await self.read_device_info()
await self.host.send_command( logger.debug(
Hci_Intel_DDC_Config_Write_Command( "device info: \n%s",
params=HCI_INTEL_DDC_CONFIG_WRITE_PAYLOAD "\n".join(
[
f" {value_type.name}: {value}"
for value_type, value in device_info.items()
]
),
)
# Check if the firmware is already loaded.
if (
device_info.get(ValueType.CURRENT_MODE_OF_OPERATION)
== ModeOfOperation.OPERATIONAL
):
logger.debug("firmware already loaded")
return
# We only support some platforms and variants.
hardware_info = device_info.get(ValueType.HARDWARE_INFO)
if hardware_info is None:
raise DriverError("hardware info missing")
if hardware_info.platform != HardwarePlatform.INTEL_37:
raise DriverError("hardware platform not supported")
if hardware_info.variant not in (
HardwareVariant.TYPHOON_PEAK,
HardwareVariant.GALE_PEAK,
):
raise DriverError("hardware variant not supported")
# Compute the firmware name.
if ValueType.CNVI not in device_info or ValueType.CNVR not in device_info:
raise DriverError("insufficient device info, missing CNVI or CNVR")
firmware_base_name = (
"ibt-"
f"{device_info[ValueType.CNVI]:04X}-"
f"{device_info[ValueType.CNVR]:04X}"
)
logger.debug(f"FW base name: {firmware_base_name}")
firmware_name = f"{firmware_base_name}.sfi"
firmware_path = _find_binary_path(firmware_name)
if not firmware_path:
logger.warning(f"Firmware file {firmware_name} not found")
logger.warning("See https://google.github.io/bumble/drivers/intel.html")
return None
logger.debug(f"loading firmware from {firmware_path}")
firmware_image = firmware_path.read_bytes()
engine_type = device_info.get(ValueType.SECURE_BOOT_ENGINE_TYPE)
if engine_type is None:
raise DriverError("secure boot engine type missing")
if engine_type not in _BOOT_PARAMS:
raise DriverError("secure boot engine type not supported")
boot_params = _BOOT_PARAMS[engine_type]
if len(firmware_image) < boot_params.write_offset:
raise DriverError("firmware image too small")
# Register to receive vendor events.
def on_vendor_event(event: hci.HCI_Vendor_Event):
logger.debug(f"vendor event: {event}")
event_type = event.parameters[0]
if event_type == 0x02:
# Boot event
logger.debug("boot complete")
self.reset_complete.set()
elif event_type == 0x06:
# Firmware load event
logger.debug("download complete")
self.firmware_load_complete.set()
else:
logger.debug(f"ignoring vendor event type {event_type}")
self.host.on("vendor_event", on_vendor_event)
# We need to temporarily intercept packets from the controller,
# because they are formatted as HCI event packets but are received
# on the ACL channel, so the host parser would get confused.
saved_on_packet = self.host.on_packet
self.host.on_packet = self.on_packet # type: ignore
self.firmware_load_complete.clear()
# Send the CSS header
data = firmware_image[
boot_params.css_header_offset : boot_params.css_header_offset
+ boot_params.css_header_size
]
await self.send_firmware_data(0x00, data)
# Send the PKI header
data = firmware_image[
boot_params.pki_offset : boot_params.pki_offset + boot_params.pki_size
]
await self.send_firmware_data(0x03, data)
# Send the Signature header
data = firmware_image[
boot_params.sig_offset : boot_params.sig_offset + boot_params.sig_size
]
await self.send_firmware_data(0x02, data)
# Send the rest of the image.
# The payload consists of command objects, which are sent when they add up
# to a multiple of 4 bytes.
boot_address = 0
offset = boot_params.write_offset
fragment_size = 0
while offset + 3 < len(firmware_image):
(command_opcode,) = struct.unpack_from(
"<H", firmware_image, offset + fragment_size
)
command_size = firmware_image[offset + fragment_size + 2]
if command_opcode == HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND:
(boot_address,) = struct.unpack_from(
"<I", firmware_image, offset + fragment_size + 3
)
logger.debug(
"found HCI_INTEL_WRITE_BOOT_PARAMS_COMMAND, "
f"boot_address={boot_address}"
)
fragment_size += 3 + command_size
if fragment_size % 4 == 0:
await self.send_firmware_data(
0x01, firmware_image[offset : offset + fragment_size]
)
logger.debug(f"sent {fragment_size} bytes")
offset += fragment_size
fragment_size = 0
# Wait for the firmware loading to be complete.
logger.debug("waiting for firmware to be loaded")
await self.firmware_load_complete.wait()
logger.debug("firmware loaded")
# Restore the original packet handler.
self.host.on_packet = saved_on_packet # type: ignore
# Reset
self.reset_complete.clear()
self.host.send_hci_packet(
HCI_Intel_Reset_Command(
reset_type=0x00,
patch_enable=0x01,
ddc_reload=0x00,
boot_option=0x01,
boot_address=boot_address,
) )
) )
logger.debug("waiting for reset completion")
await self.reset_complete.wait()
logger.debug("reset complete")
# Load the device config if there is one.
if self.ddc_override:
logger.debug("loading overridden DDC")
await self.load_device_config(self.ddc_override)
else:
ddc_name = f"{firmware_base_name}.ddc"
ddc_path = _find_binary_path(ddc_name)
if ddc_path:
logger.debug(f"loading DDC from {ddc_path}")
ddc_data = ddc_path.read_bytes()
await self.load_device_config(ddc_data)
if self.ddc_addon:
logger.debug("loading DDC addon")
await self.load_device_config(self.ddc_addon)
async def load_device_config(self, ddc_data: bytes) -> None:
while ddc_data:
ddc_len = 1 + ddc_data[0]
ddc_payload = ddc_data[:ddc_len]
await self.host.send_command(
Hci_Intel_Write_Device_Config_Command(data=ddc_payload)
)
ddc_data = ddc_data[ddc_len:]
async def reboot_bootloader(self) -> None:
self.host.send_hci_packet(
HCI_Intel_Reset_Command(
reset_type=0x01,
patch_enable=0x01,
ddc_reload=0x01,
boot_option=0x00,
boot_address=0,
)
)
await asyncio.sleep(_POST_RESET_DELAY)
async def read_device_info(self) -> dict[ValueType, Any]:
self.host.ready = True
response = await self.host.send_command(hci.HCI_Reset_Command())
if not (
isinstance(response, hci.HCI_Command_Complete_Event)
and response.return_parameters
in (hci.HCI_UNKNOWN_HCI_COMMAND_ERROR, hci.HCI_SUCCESS)
):
# When the controller is in operational mode, the response is a
# successful response.
# When the controller is in bootloader mode,
# HCI_UNKNOWN_HCI_COMMAND_ERROR is the expected response. Anything
# else is a failure.
logger.warning(f"unexpected response: {response}")
raise DriverError("unexpected HCI response")
# Read the firmware version.
response = await self.host.send_command(
HCI_Intel_Read_Version_Command(param0=0xFF)
)
if not isinstance(response, hci.HCI_Command_Complete_Event):
raise DriverError("unexpected HCI response")
if response.return_parameters.status != 0: # type: ignore
raise DriverError("HCI_Intel_Read_Version_Command error")
tlvs = _parse_tlv(response.return_parameters.tlv) # type: ignore
# Convert the list to a dict. That's Ok here because we only expect each type
# to appear just once.
return dict(tlvs)
async def init_controller(self):
await self.load_firmware()

View File

@@ -28,12 +28,15 @@ import functools
import logging import logging
import struct import struct
from typing import ( from typing import (
Any,
Callable, Callable,
Dict, Dict,
Iterable, Iterable,
List, List,
Optional, Optional,
Sequence, Sequence,
SupportsBytes,
Type,
Union, Union,
TYPE_CHECKING, TYPE_CHECKING,
) )
@@ -41,6 +44,7 @@ from typing import (
from bumble.colors import color from bumble.colors import color
from bumble.core import BaseBumbleError, UUID from bumble.core import BaseBumbleError, UUID
from bumble.att import Attribute, AttributeValue from bumble.att import Attribute, AttributeValue
from bumble.utils import ByteSerializable
if TYPE_CHECKING: if TYPE_CHECKING:
from bumble.gatt_client import AttributeProxy from bumble.gatt_client import AttributeProxy
@@ -275,6 +279,13 @@ GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC = UUID.from_16_bits(0x2BCC, 'Sou
GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC = UUID.from_16_bits(0x2BCD, 'Available Audio Contexts') GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC = UUID.from_16_bits(0x2BCD, 'Available Audio Contexts')
GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC = UUID.from_16_bits(0x2BCE, 'Supported Audio Contexts') GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC = UUID.from_16_bits(0x2BCE, 'Supported Audio Contexts')
# Gaming Audio Service (GMAS)
GATT_GMAP_ROLE_CHARACTERISTIC = UUID.from_16_bits(0x2C00, 'GMAP Role')
GATT_UGG_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2C01, 'UGG Features')
GATT_UGT_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2C02, 'UGT Features')
GATT_BGS_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2C03, 'BGS Features')
GATT_BGR_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2C04, 'BGR Features')
# Hearing Access Service # Hearing Access Service
GATT_HEARING_AID_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2BDA, 'Hearing Aid Features') GATT_HEARING_AID_FEATURES_CHARACTERISTIC = UUID.from_16_bits(0x2BDA, 'Hearing Aid Features')
GATT_HEARING_AID_PRESET_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BDB, 'Hearing Aid Preset Control Point') GATT_HEARING_AID_PRESET_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BDB, 'Hearing Aid Preset Control Point')
@@ -343,7 +354,7 @@ class Service(Attribute):
def __init__( def __init__(
self, self,
uuid: Union[str, UUID], uuid: Union[str, UUID],
characteristics: List[Characteristic], characteristics: Iterable[Characteristic],
primary=True, primary=True,
included_services: Iterable[Service] = (), included_services: Iterable[Service] = (),
) -> None: ) -> None:
@@ -362,7 +373,7 @@ class Service(Attribute):
) )
self.uuid = uuid self.uuid = uuid
self.included_services = list(included_services) self.included_services = list(included_services)
self.characteristics = characteristics[:] self.characteristics = list(characteristics)
self.primary = primary self.primary = primary
def get_advertising_data(self) -> Optional[bytes]: def get_advertising_data(self) -> Optional[bytes]:
@@ -393,7 +404,7 @@ class TemplateService(Service):
def __init__( def __init__(
self, self,
characteristics: List[Characteristic], characteristics: Iterable[Characteristic],
primary: bool = True, primary: bool = True,
included_services: Iterable[Service] = (), included_services: Iterable[Service] = (),
) -> None: ) -> None:
@@ -410,7 +421,7 @@ class IncludedServiceDeclaration(Attribute):
def __init__(self, service: Service) -> None: def __init__(self, service: Service) -> None:
declaration_bytes = struct.pack( declaration_bytes = struct.pack(
'<HH2s', service.handle, service.end_group_handle, service.uuid.to_bytes() '<HH2s', service.handle, service.end_group_handle, bytes(service.uuid)
) )
super().__init__( super().__init__(
GATT_INCLUDE_ATTRIBUTE_TYPE, Attribute.READABLE, declaration_bytes GATT_INCLUDE_ATTRIBUTE_TYPE, Attribute.READABLE, declaration_bytes
@@ -490,7 +501,7 @@ class Characteristic(Attribute):
uuid: Union[str, bytes, UUID], uuid: Union[str, bytes, UUID],
properties: Characteristic.Properties, properties: Characteristic.Properties,
permissions: Union[str, Attribute.Permissions], permissions: Union[str, Attribute.Permissions],
value: Union[str, bytes, CharacteristicValue] = b'', value: Any = b'',
descriptors: Sequence[Descriptor] = (), descriptors: Sequence[Descriptor] = (),
): ):
super().__init__(uuid, permissions, value) super().__init__(uuid, permissions, value)
@@ -525,7 +536,11 @@ class CharacteristicDeclaration(Attribute):
characteristic: Characteristic characteristic: Characteristic
def __init__(self, characteristic: Characteristic, value_handle: int) -> None: def __init__(
self,
characteristic: Characteristic,
value_handle: int,
) -> None:
declaration_bytes = ( declaration_bytes = (
struct.pack('<BH', characteristic.properties, value_handle) struct.pack('<BH', characteristic.properties, value_handle)
+ characteristic.uuid.to_pdu_bytes() + characteristic.uuid.to_pdu_bytes()
@@ -705,7 +720,7 @@ class MappedCharacteristicAdapter(PackedCharacteristicAdapter):
''' '''
Adapter that packs/unpacks characteristic values according to a standard Adapter that packs/unpacks characteristic values according to a standard
Python `struct` format. Python `struct` format.
The adapted `read_value` and `write_value` methods return/accept aa dictionary which The adapted `read_value` and `write_value` methods return/accept a dictionary which
is packed/unpacked according to format, with the arguments extracted from the is packed/unpacked according to format, with the arguments extracted from the
dictionary by key, in the same order as they occur in the `keys` parameter. dictionary by key, in the same order as they occur in the `keys` parameter.
''' '''
@@ -735,6 +750,24 @@ class UTF8CharacteristicAdapter(CharacteristicAdapter):
return value.decode('utf-8') return value.decode('utf-8')
# -----------------------------------------------------------------------------
class SerializableCharacteristicAdapter(CharacteristicAdapter):
'''
Adapter that converts any class to/from bytes using the class'
`to_bytes` and `__bytes__` methods, respectively.
'''
def __init__(self, characteristic, cls: Type[ByteSerializable]):
super().__init__(characteristic)
self.cls = cls
def encode_value(self, value: SupportsBytes) -> bytes:
return bytes(value)
def decode_value(self, value: bytes) -> Any:
return self.cls.from_bytes(value)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Descriptor(Attribute): class Descriptor(Attribute):
''' '''

View File

@@ -292,7 +292,7 @@ class Client:
logger.debug( logger.debug(
f'GATT Command from client: [0x{self.connection.handle:04X}] {command}' f'GATT Command from client: [0x{self.connection.handle:04X}] {command}'
) )
self.send_gatt_pdu(command.to_bytes()) self.send_gatt_pdu(bytes(command))
async def send_request(self, request: ATT_PDU): async def send_request(self, request: ATT_PDU):
logger.debug( logger.debug(
@@ -310,7 +310,7 @@ class Client:
self.pending_request = request self.pending_request = request
try: try:
self.send_gatt_pdu(request.to_bytes()) self.send_gatt_pdu(bytes(request))
response = await asyncio.wait_for( response = await asyncio.wait_for(
self.pending_response, GATT_REQUEST_TIMEOUT self.pending_response, GATT_REQUEST_TIMEOUT
) )
@@ -328,7 +328,7 @@ class Client:
f'GATT Confirmation from client: [0x{self.connection.handle:04X}] ' f'GATT Confirmation from client: [0x{self.connection.handle:04X}] '
f'{confirmation}' f'{confirmation}'
) )
self.send_gatt_pdu(confirmation.to_bytes()) self.send_gatt_pdu(bytes(confirmation))
async def request_mtu(self, mtu: int) -> int: async def request_mtu(self, mtu: int) -> int:
# Check the range # Check the range
@@ -898,6 +898,12 @@ class Client:
) and subscriber in subscribers: ) and subscriber in subscribers:
subscribers.remove(subscriber) subscribers.remove(subscriber)
# The characteristic itself is added as subscriber. If it is the
# last remaining subscriber, we remove it, such that the clean up
# works correctly. Otherwise the CCCD never is set back to 0.
if len(subscribers) == 1 and characteristic in subscribers:
subscribers.remove(characteristic)
# Cleanup if we removed the last one # Cleanup if we removed the last one
if not subscribers: if not subscribers:
del subscriber_set[characteristic.handle] del subscriber_set[characteristic.handle]

View File

@@ -28,7 +28,17 @@ import asyncio
import logging import logging
from collections import defaultdict from collections import defaultdict
import struct import struct
from typing import List, Tuple, Optional, TypeVar, Type, Dict, Iterable, TYPE_CHECKING from typing import (
Dict,
Iterable,
List,
Optional,
Tuple,
TypeVar,
Type,
Union,
TYPE_CHECKING,
)
from pyee import EventEmitter from pyee import EventEmitter
from bumble.colors import color from bumble.colors import color
@@ -68,6 +78,7 @@ from bumble.gatt import (
GATT_REQUEST_TIMEOUT, GATT_REQUEST_TIMEOUT,
GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE, GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE,
Characteristic, Characteristic,
CharacteristicAdapter,
CharacteristicDeclaration, CharacteristicDeclaration,
CharacteristicValue, CharacteristicValue,
IncludedServiceDeclaration, IncludedServiceDeclaration,
@@ -353,7 +364,7 @@ class Server(EventEmitter):
logger.debug( logger.debug(
f'GATT Response from server: [0x{connection.handle:04X}] {response}' f'GATT Response from server: [0x{connection.handle:04X}] {response}'
) )
self.send_gatt_pdu(connection.handle, response.to_bytes()) self.send_gatt_pdu(connection.handle, bytes(response))
async def notify_subscriber( async def notify_subscriber(
self, self,
@@ -450,7 +461,7 @@ class Server(EventEmitter):
) )
try: try:
self.send_gatt_pdu(connection.handle, indication.to_bytes()) self.send_gatt_pdu(connection.handle, bytes(indication))
await asyncio.wait_for(pending_confirmation, GATT_REQUEST_TIMEOUT) await asyncio.wait_for(pending_confirmation, GATT_REQUEST_TIMEOUT)
except asyncio.TimeoutError as error: except asyncio.TimeoutError as error:
logger.warning(color('!!! GATT Indicate timeout', 'red')) logger.warning(color('!!! GATT Indicate timeout', 'red'))

View File

@@ -915,6 +915,8 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_READ_CURRENT_IAC_LAP_COMMAND : 1 << (11*8+3), HCI_READ_CURRENT_IAC_LAP_COMMAND : 1 << (11*8+3),
HCI_WRITE_CURRENT_IAC_LAP_COMMAND : 1 << (11*8+4), HCI_WRITE_CURRENT_IAC_LAP_COMMAND : 1 << (11*8+4),
HCI_SET_AFH_HOST_CHANNEL_CLASSIFICATION_COMMAND : 1 << (12*8+1), HCI_SET_AFH_HOST_CHANNEL_CLASSIFICATION_COMMAND : 1 << (12*8+1),
HCI_LE_CS_READ_REMOTE_FAE_TABLE_COMMAND : 1 << (12*8+2),
HCI_LE_CS_WRITE_CACHED_REMOTE_FAE_TABLE_COMMAND : 1 << (12*8+3),
HCI_READ_INQUIRY_SCAN_TYPE_COMMAND : 1 << (12*8+4), HCI_READ_INQUIRY_SCAN_TYPE_COMMAND : 1 << (12*8+4),
HCI_WRITE_INQUIRY_SCAN_TYPE_COMMAND : 1 << (12*8+5), HCI_WRITE_INQUIRY_SCAN_TYPE_COMMAND : 1 << (12*8+5),
HCI_READ_INQUIRY_MODE_COMMAND : 1 << (12*8+6), HCI_READ_INQUIRY_MODE_COMMAND : 1 << (12*8+6),
@@ -940,6 +942,8 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_SETUP_SYNCHRONOUS_CONNECTION_COMMAND : 1 << (16*8+3), HCI_SETUP_SYNCHRONOUS_CONNECTION_COMMAND : 1 << (16*8+3),
HCI_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (16*8+4), HCI_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (16*8+4),
HCI_REJECT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (16*8+5), HCI_REJECT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (16*8+5),
HCI_LE_CS_CREATE_CONFIG_COMMAND : 1 << (16*8+6),
HCI_LE_CS_REMOVE_CONFIG_COMMAND : 1 << (16*8+7),
HCI_READ_EXTENDED_INQUIRY_RESPONSE_COMMAND : 1 << (17*8+0), HCI_READ_EXTENDED_INQUIRY_RESPONSE_COMMAND : 1 << (17*8+0),
HCI_WRITE_EXTENDED_INQUIRY_RESPONSE_COMMAND : 1 << (17*8+1), HCI_WRITE_EXTENDED_INQUIRY_RESPONSE_COMMAND : 1 << (17*8+1),
HCI_REFRESH_ENCRYPTION_KEY_COMMAND : 1 << (17*8+2), HCI_REFRESH_ENCRYPTION_KEY_COMMAND : 1 << (17*8+2),
@@ -963,13 +967,20 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_SEND_KEYPRESS_NOTIFICATION_COMMAND : 1 << (20*8+2), HCI_SEND_KEYPRESS_NOTIFICATION_COMMAND : 1 << (20*8+2),
HCI_IO_CAPABILITY_REQUEST_NEGATIVE_REPLY_COMMAND : 1 << (20*8+3), HCI_IO_CAPABILITY_REQUEST_NEGATIVE_REPLY_COMMAND : 1 << (20*8+3),
HCI_READ_ENCRYPTION_KEY_SIZE_COMMAND : 1 << (20*8+4), HCI_READ_ENCRYPTION_KEY_SIZE_COMMAND : 1 << (20*8+4),
HCI_LE_CS_READ_LOCAL_SUPPORTED_CAPABILITIES_COMMAND : 1 << (20*8+5),
HCI_LE_CS_READ_REMOTE_SUPPORTED_CAPABILITIES_COMMAND : 1 << (20*8+6),
HCI_LE_CS_WRITE_CACHED_REMOTE_SUPPORTED_CAPABILITIES : 1 << (20*8+7),
HCI_SET_EVENT_MASK_PAGE_2_COMMAND : 1 << (22*8+2), HCI_SET_EVENT_MASK_PAGE_2_COMMAND : 1 << (22*8+2),
HCI_READ_FLOW_CONTROL_MODE_COMMAND : 1 << (23*8+0), HCI_READ_FLOW_CONTROL_MODE_COMMAND : 1 << (23*8+0),
HCI_WRITE_FLOW_CONTROL_MODE_COMMAND : 1 << (23*8+1), HCI_WRITE_FLOW_CONTROL_MODE_COMMAND : 1 << (23*8+1),
HCI_READ_DATA_BLOCK_SIZE_COMMAND : 1 << (23*8+2), HCI_READ_DATA_BLOCK_SIZE_COMMAND : 1 << (23*8+2),
HCI_LE_CS_TEST_COMMAND : 1 << (23*8+3),
HCI_LE_CS_TEST_END_COMMAND : 1 << (23*8+4),
HCI_READ_ENHANCED_TRANSMIT_POWER_LEVEL_COMMAND : 1 << (24*8+0), HCI_READ_ENHANCED_TRANSMIT_POWER_LEVEL_COMMAND : 1 << (24*8+0),
HCI_LE_CS_SECURITY_ENABLE_COMMAND : 1 << (24*8+1),
HCI_READ_LE_HOST_SUPPORT_COMMAND : 1 << (24*8+5), HCI_READ_LE_HOST_SUPPORT_COMMAND : 1 << (24*8+5),
HCI_WRITE_LE_HOST_SUPPORT_COMMAND : 1 << (24*8+6), HCI_WRITE_LE_HOST_SUPPORT_COMMAND : 1 << (24*8+6),
HCI_LE_CS_SET_DEFAULT_SETTINGS_COMMAND : 1 << (24*8+7),
HCI_LE_SET_EVENT_MASK_COMMAND : 1 << (25*8+0), HCI_LE_SET_EVENT_MASK_COMMAND : 1 << (25*8+0),
HCI_LE_READ_BUFFER_SIZE_COMMAND : 1 << (25*8+1), HCI_LE_READ_BUFFER_SIZE_COMMAND : 1 << (25*8+1),
HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND : 1 << (25*8+2), HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND : 1 << (25*8+2),
@@ -1000,6 +1011,10 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_LE_RECEIVER_TEST_COMMAND : 1 << (28*8+4), HCI_LE_RECEIVER_TEST_COMMAND : 1 << (28*8+4),
HCI_LE_TRANSMITTER_TEST_COMMAND : 1 << (28*8+5), HCI_LE_TRANSMITTER_TEST_COMMAND : 1 << (28*8+5),
HCI_LE_TEST_END_COMMAND : 1 << (28*8+6), HCI_LE_TEST_END_COMMAND : 1 << (28*8+6),
HCI_LE_ENABLE_MONITORING_ADVERTISERS_COMMAND : 1 << (28*8+7),
HCI_LE_CS_SET_CHANNEL_CLASSIFICATION_COMMAND : 1 << (29*8+0),
HCI_LE_CS_SET_PROCEDURE_PARAMETERS_COMMAND : 1 << (29*8+1),
HCI_LE_CS_PROCEDURE_ENABLE_COMMAND : 1 << (29*8+2),
HCI_ENHANCED_SETUP_SYNCHRONOUS_CONNECTION_COMMAND : 1 << (29*8+3), HCI_ENHANCED_SETUP_SYNCHRONOUS_CONNECTION_COMMAND : 1 << (29*8+3),
HCI_ENHANCED_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (29*8+4), HCI_ENHANCED_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (29*8+4),
HCI_READ_LOCAL_SUPPORTED_CODECS_COMMAND : 1 << (29*8+5), HCI_READ_LOCAL_SUPPORTED_CODECS_COMMAND : 1 << (29*8+5),
@@ -1136,11 +1151,21 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_LE_SET_DEFAULT_SUBRATE_COMMAND : 1 << (46*8+0), HCI_LE_SET_DEFAULT_SUBRATE_COMMAND : 1 << (46*8+0),
HCI_LE_SUBRATE_REQUEST_COMMAND : 1 << (46*8+1), HCI_LE_SUBRATE_REQUEST_COMMAND : 1 << (46*8+1),
HCI_LE_SET_EXTENDED_ADVERTISING_PARAMETERS_V2_COMMAND : 1 << (46*8+2), HCI_LE_SET_EXTENDED_ADVERTISING_PARAMETERS_V2_COMMAND : 1 << (46*8+2),
HCI_LE_SET_DECISION_DATA_COMMAND : 1 << (46*8+3),
HCI_LE_SET_DECISION_INSTRUCTIONS_COMMAND : 1 << (46*8+4),
HCI_LE_SET_PERIODIC_ADVERTISING_SUBEVENT_DATA_COMMAND : 1 << (46*8+5), HCI_LE_SET_PERIODIC_ADVERTISING_SUBEVENT_DATA_COMMAND : 1 << (46*8+5),
HCI_LE_SET_PERIODIC_ADVERTISING_RESPONSE_DATA_COMMAND : 1 << (46*8+6), HCI_LE_SET_PERIODIC_ADVERTISING_RESPONSE_DATA_COMMAND : 1 << (46*8+6),
HCI_LE_SET_PERIODIC_SYNC_SUBEVENT_COMMAND : 1 << (46*8+7), HCI_LE_SET_PERIODIC_SYNC_SUBEVENT_COMMAND : 1 << (46*8+7),
HCI_LE_EXTENDED_CREATE_CONNECTION_V2_COMMAND : 1 << (47*8+0), HCI_LE_EXTENDED_CREATE_CONNECTION_V2_COMMAND : 1 << (47*8+0),
HCI_LE_SET_PERIODIC_ADVERTISING_PARAMETERS_V2_COMMAND : 1 << (47*8+1), HCI_LE_SET_PERIODIC_ADVERTISING_PARAMETERS_V2_COMMAND : 1 << (47*8+1),
HCI_LE_READ_ALL_LOCAL_SUPPORTED_FEATURES_COMMAND : 1 << (47*8+2),
HCI_LE_READ_ALL_REMOTE_FEATURES_COMMAND : 1 << (47*8+3),
HCI_LE_SET_HOST_FEATURE_V2_COMMAND : 1 << (47*8+4),
HCI_LE_ADD_DEVICE_TO_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+5),
HCI_LE_REMOVE_DEVICE_FROM_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+6),
HCI_LE_CLEAR_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+7),
HCI_LE_READ_MONITORED_ADVERTISERS_LIST_SIZE_COMMAND : 1 << (48*8+0),
HCI_LE_FRAME_SPACE_UPDATE_COMMAND : 1 << (48*8+1),
} }
# LE Supported Features # LE Supported Features
@@ -1457,7 +1482,7 @@ class CodingFormat:
vendor_specific_codec_id: int = 0 vendor_specific_codec_id: int = 0
@classmethod @classmethod
def parse_from_bytes(cls, data: bytes, offset: int): def parse_from_bytes(cls, data: bytes, offset: int) -> tuple[int, CodingFormat]:
(codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from( (codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from(
'<BHH', data, offset '<BHH', data, offset
) )
@@ -1467,14 +1492,15 @@ class CodingFormat:
vendor_specific_codec_id=vendor_specific_codec_id, vendor_specific_codec_id=vendor_specific_codec_id,
) )
def to_bytes(self) -> bytes: @classmethod
def from_bytes(cls, data: bytes) -> CodingFormat:
return cls.parse_from_bytes(data, 0)[1]
def __bytes__(self) -> bytes:
return struct.pack( return struct.pack(
'<BHH', self.codec_id, self.company_id, self.vendor_specific_codec_id '<BHH', self.codec_id, self.company_id, self.vendor_specific_codec_id
) )
def __bytes__(self) -> bytes:
return self.to_bytes()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class HCI_Constant: class HCI_Constant:
@@ -1691,7 +1717,7 @@ class HCI_Object:
field_length = len(field_bytes) field_length = len(field_bytes)
field_bytes = bytes([field_length]) + field_bytes field_bytes = bytes([field_length]) + field_bytes
elif isinstance(field_value, (bytes, bytearray)) or hasattr( elif isinstance(field_value, (bytes, bytearray)) or hasattr(
field_value, 'to_bytes' field_value, '__bytes__'
): ):
field_bytes = bytes(field_value) field_bytes = bytes(field_value)
if isinstance(field_type, int) and 4 < field_type <= 256: if isinstance(field_type, int) and 4 < field_type <= 256:
@@ -1736,7 +1762,7 @@ class HCI_Object:
def from_bytes(cls, data, offset, fields): def from_bytes(cls, data, offset, fields):
return cls(fields, **cls.dict_from_bytes(data, offset, fields)) return cls(fields, **cls.dict_from_bytes(data, offset, fields))
def to_bytes(self): def __bytes__(self):
return HCI_Object.dict_to_bytes(self.__dict__, self.fields) return HCI_Object.dict_to_bytes(self.__dict__, self.fields)
@staticmethod @staticmethod
@@ -1831,9 +1857,6 @@ class HCI_Object:
for field_name, field_value in field_strings for field_name, field_value in field_strings
) )
def __bytes__(self):
return self.to_bytes()
def __init__(self, fields, **kwargs): def __init__(self, fields, **kwargs):
self.fields = fields self.fields = fields
self.init_from_fields(self, fields, kwargs) self.init_from_fields(self, fields, kwargs)
@@ -2008,9 +2031,6 @@ class Address:
def is_static(self): def is_static(self):
return self.is_random and (self.address_bytes[5] >> 6 == 3) return self.is_random and (self.address_bytes[5] >> 6 == 3)
def to_bytes(self):
return self.address_bytes
def to_string(self, with_type_qualifier=True): def to_string(self, with_type_qualifier=True):
''' '''
String representation of the address, MSB first, with an optional type String representation of the address, MSB first, with an optional type
@@ -2022,7 +2042,7 @@ class Address:
return result + '/P' return result + '/P'
def __bytes__(self): def __bytes__(self):
return self.to_bytes() return self.address_bytes
def __hash__(self): def __hash__(self):
return hash(self.address_bytes) return hash(self.address_bytes)
@@ -2228,16 +2248,13 @@ class HCI_Command(HCI_Packet):
self.op_code = op_code self.op_code = op_code
self.parameters = parameters self.parameters = parameters
def to_bytes(self): def __bytes__(self):
parameters = b'' if self.parameters is None else self.parameters parameters = b'' if self.parameters is None else self.parameters
return ( return (
struct.pack('<BHB', HCI_COMMAND_PACKET, self.op_code, len(parameters)) struct.pack('<BHB', HCI_COMMAND_PACKET, self.op_code, len(parameters))
+ parameters + parameters
) )
def __bytes__(self):
return self.to_bytes()
def __str__(self): def __str__(self):
result = color(self.name, 'green') result = color(self.name, 'green')
if fields := getattr(self, 'fields', None): if fields := getattr(self, 'fields', None):
@@ -4302,6 +4319,61 @@ class HCI_LE_Clear_Advertising_Sets_Command(HCI_Command):
''' '''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('advertising_handle', 1),
('periodic_advertising_interval_min', 2),
('periodic_advertising_interval_max', 2),
('periodic_advertising_properties', 2),
]
)
class HCI_LE_Set_Periodic_Advertising_Parameters_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.61 LE Set Periodic Advertising Parameters command
'''
class Properties(enum.IntFlag):
INCLUDE_TX_POWER = 1 << 6
advertising_handle: int
periodic_advertising_interval_min: int
periodic_advertising_interval_max: int
periodic_advertising_properties: int
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('advertising_handle', 1),
(
'operation',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
x
).name,
},
),
(
'advertising_data',
{
'parser': HCI_Object.parse_length_prefixed_bytes,
'serializer': HCI_Object.serialize_length_prefixed_bytes,
},
),
]
)
class HCI_LE_Set_Periodic_Advertising_Data_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.62 LE Set Periodic Advertising Data command
'''
advertising_handle: int
operation: int
advertising_data: bytes
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command([('enable', 1), ('advertising_handle', 1)]) @HCI_Command.command([('enable', 1), ('advertising_handle', 1)])
class HCI_LE_Set_Periodic_Advertising_Enable_Command(HCI_Command): class HCI_LE_Set_Periodic_Advertising_Enable_Command(HCI_Command):
@@ -4996,6 +5068,7 @@ class HCI_Event(HCI_Packet):
hci_packet_type = HCI_EVENT_PACKET hci_packet_type = HCI_EVENT_PACKET
event_names: Dict[int, str] = {} event_names: Dict[int, str] = {}
event_classes: Dict[int, Type[HCI_Event]] = {} event_classes: Dict[int, Type[HCI_Event]] = {}
vendor_factory: Optional[Callable[[bytes], Optional[HCI_Event]]] = None
@staticmethod @staticmethod
def event(fields=()): def event(fields=()):
@@ -5053,37 +5126,41 @@ class HCI_Event(HCI_Packet):
return event_class return event_class
@staticmethod @classmethod
def from_bytes(packet: bytes) -> HCI_Event: def from_bytes(cls, packet: bytes) -> HCI_Event:
event_code = packet[1] event_code = packet[1]
length = packet[2] length = packet[2]
parameters = packet[3:] parameters = packet[3:]
if len(parameters) != length: if len(parameters) != length:
raise InvalidPacketError('invalid packet length') raise InvalidPacketError('invalid packet length')
cls: Any subclass: Any
if event_code == HCI_LE_META_EVENT: if event_code == HCI_LE_META_EVENT:
# We do this dispatch here and not in the subclass in order to avoid call # We do this dispatch here and not in the subclass in order to avoid call
# loops # loops
subevent_code = parameters[0] subevent_code = parameters[0]
cls = HCI_LE_Meta_Event.subevent_classes.get(subevent_code) subclass = HCI_LE_Meta_Event.subevent_classes.get(subevent_code)
if cls is None: if subclass is None:
# No class registered, just use a generic class instance # No class registered, just use a generic class instance
return HCI_LE_Meta_Event(subevent_code, parameters) return HCI_LE_Meta_Event(subevent_code, parameters)
elif event_code == HCI_VENDOR_EVENT: elif event_code == HCI_VENDOR_EVENT:
subevent_code = parameters[0] # Invoke all the registered factories to see if any of them can handle
cls = HCI_Vendor_Event.subevent_classes.get(subevent_code) # the event
if cls is None: if cls.vendor_factory:
# No class registered, just use a generic class instance if event := cls.vendor_factory(parameters):
return HCI_Vendor_Event(subevent_code, parameters) return event
# No factory, or the factory could not create an instance,
# return a generic vendor event
return HCI_Event(event_code, parameters)
else: else:
cls = HCI_Event.event_classes.get(event_code) subclass = HCI_Event.event_classes.get(event_code)
if cls is None: if subclass is None:
# No class registered, just use a generic class instance # No class registered, just use a generic class instance
return HCI_Event(event_code, parameters) return HCI_Event(event_code, parameters)
# Invoke the factory to create a new instance # Invoke the factory to create a new instance
return cls.from_parameters(parameters) # type: ignore return subclass.from_parameters(parameters) # type: ignore
@classmethod @classmethod
def from_parameters(cls, parameters): def from_parameters(cls, parameters):
@@ -5106,13 +5183,10 @@ class HCI_Event(HCI_Packet):
self.event_code = event_code self.event_code = event_code
self.parameters = parameters self.parameters = parameters
def to_bytes(self): def __bytes__(self):
parameters = b'' if self.parameters is None else self.parameters parameters = b'' if self.parameters is None else self.parameters
return bytes([HCI_EVENT_PACKET, self.event_code, len(parameters)]) + parameters return bytes([HCI_EVENT_PACKET, self.event_code, len(parameters)]) + parameters
def __bytes__(self):
return self.to_bytes()
def __str__(self): def __str__(self):
result = color(self.name, 'magenta') result = color(self.name, 'magenta')
if fields := getattr(self, 'fields', None): if fields := getattr(self, 'fields', None):
@@ -5129,11 +5203,11 @@ HCI_Event.register_events(globals())
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class HCI_Extended_Event(HCI_Event): class HCI_Extended_Event(HCI_Event):
''' '''
HCI_Event subclass for events that has a subevent code. HCI_Event subclass for events that have a subevent code.
''' '''
subevent_names: Dict[int, str] = {} subevent_names: Dict[int, str] = {}
subevent_classes: Dict[int, Type[HCI_Extended_Event]] subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {}
@classmethod @classmethod
def event(cls, fields=()): def event(cls, fields=()):
@@ -5184,7 +5258,22 @@ class HCI_Extended_Event(HCI_Event):
cls.subevent_names.update(cls.subevent_map(symbols)) cls.subevent_names.update(cls.subevent_map(symbols))
@classmethod @classmethod
def from_parameters(cls, parameters): def subclass_from_parameters(
cls, parameters: bytes
) -> Optional[HCI_Extended_Event]:
"""
Factory method that parses the subevent code, finds a registered subclass,
and creates an instance if found.
"""
subevent_code = parameters[0]
if subclass := cls.subevent_classes.get(subevent_code):
return subclass.from_parameters(parameters)
return None
@classmethod
def from_parameters(cls, parameters: bytes) -> HCI_Extended_Event:
"""Factory method for subclasses (the subevent code has already been parsed)"""
self = cls.__new__(cls) self = cls.__new__(cls)
HCI_Extended_Event.__init__(self, self.subevent_code, parameters) HCI_Extended_Event.__init__(self, self.subevent_code, parameters)
if fields := getattr(self, 'fields', None): if fields := getattr(self, 'fields', None):
@@ -5225,12 +5314,6 @@ class HCI_LE_Meta_Event(HCI_Extended_Event):
HCI_LE_Meta_Event.register_subevents(globals()) HCI_LE_Meta_Event.register_subevents(globals())
# -----------------------------------------------------------------------------
class HCI_Vendor_Event(HCI_Extended_Event):
event_code: int = HCI_VENDOR_EVENT
subevent_classes = {}
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_LE_Meta_Event.event( @HCI_LE_Meta_Event.event(
[ [
@@ -6104,8 +6187,9 @@ class HCI_Command_Complete_Event(HCI_Event):
See Bluetooth spec @ 7.7.14 Command Complete Event See Bluetooth spec @ 7.7.14 Command Complete Event
''' '''
return_parameters = b'' num_hci_command_packets: int
command_opcode: int command_opcode: int
return_parameters = b''
def map_return_parameters(self, return_parameters): def map_return_parameters(self, return_parameters):
'''Map simple 'status' return parameters to their named constant form''' '''Map simple 'status' return parameters to their named constant form'''
@@ -6641,6 +6725,14 @@ class HCI_Remote_Host_Supported_Features_Notification_Event(HCI_Event):
''' '''
# -----------------------------------------------------------------------------
@HCI_Event.event([('data', "*")])
class HCI_Vendor_Event(HCI_Event):
'''
See Bluetooth spec @ 5.4.4 HCI Event packet
'''
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class HCI_AclDataPacket(HCI_Packet): class HCI_AclDataPacket(HCI_Packet):
''' '''
@@ -6663,7 +6755,7 @@ class HCI_AclDataPacket(HCI_Packet):
connection_handle, pb_flag, bc_flag, data_total_length, data connection_handle, pb_flag, bc_flag, data_total_length, data
) )
def to_bytes(self): def __bytes__(self):
h = (self.pb_flag << 12) | (self.bc_flag << 14) | self.connection_handle h = (self.pb_flag << 12) | (self.bc_flag << 14) | self.connection_handle
return ( return (
struct.pack('<BHH', HCI_ACL_DATA_PACKET, h, self.data_total_length) struct.pack('<BHH', HCI_ACL_DATA_PACKET, h, self.data_total_length)
@@ -6677,9 +6769,6 @@ class HCI_AclDataPacket(HCI_Packet):
self.data_total_length = data_total_length self.data_total_length = data_total_length
self.data = data self.data = data
def __bytes__(self):
return self.to_bytes()
def __str__(self): def __str__(self):
return ( return (
f'{color("ACL", "blue")}: ' f'{color("ACL", "blue")}: '
@@ -6713,7 +6802,7 @@ class HCI_SynchronousDataPacket(HCI_Packet):
connection_handle, packet_status, data_total_length, data connection_handle, packet_status, data_total_length, data
) )
def to_bytes(self) -> bytes: def __bytes__(self) -> bytes:
h = (self.packet_status << 12) | self.connection_handle h = (self.packet_status << 12) | self.connection_handle
return ( return (
struct.pack('<BHB', HCI_SYNCHRONOUS_DATA_PACKET, h, self.data_total_length) struct.pack('<BHB', HCI_SYNCHRONOUS_DATA_PACKET, h, self.data_total_length)
@@ -6732,9 +6821,6 @@ class HCI_SynchronousDataPacket(HCI_Packet):
self.data_total_length = data_total_length self.data_total_length = data_total_length
self.data = data self.data = data
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self) -> str: def __str__(self) -> str:
return ( return (
f'{color("SCO", "blue")}: ' f'{color("SCO", "blue")}: '
@@ -6807,9 +6893,6 @@ class HCI_IsoDataPacket(HCI_Packet):
) )
def __bytes__(self) -> bytes: def __bytes__(self) -> bytes:
return self.to_bytes()
def to_bytes(self) -> bytes:
fmt = '<BHH' fmt = '<BHH'
args = [ args = [
HCI_ISO_DATA_PACKET, HCI_ISO_DATA_PACKET,

View File

@@ -141,7 +141,7 @@ class HfFeature(enum.IntFlag):
""" """
HF supported features (AT+BRSF=) (normative). HF supported features (AT+BRSF=) (normative).
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007. Hands-Free Profile v1.9, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
""" """
EC_NR = 0x001 # Echo Cancel & Noise reduction EC_NR = 0x001 # Echo Cancel & Noise reduction
@@ -155,14 +155,14 @@ class HfFeature(enum.IntFlag):
HF_INDICATORS = 0x100 HF_INDICATORS = 0x100
ESCO_S4_SETTINGS_SUPPORTED = 0x200 ESCO_S4_SETTINGS_SUPPORTED = 0x200
ENHANCED_VOICE_RECOGNITION_STATUS = 0x400 ENHANCED_VOICE_RECOGNITION_STATUS = 0x400
VOICE_RECOGNITION_TEST = 0x800 VOICE_RECOGNITION_TEXT = 0x800
class AgFeature(enum.IntFlag): class AgFeature(enum.IntFlag):
""" """
AG supported features (+BRSF:) (normative). AG supported features (+BRSF:) (normative).
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007. Hands-Free Profile v1.9, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
""" """
THREE_WAY_CALLING = 0x001 THREE_WAY_CALLING = 0x001
@@ -178,7 +178,7 @@ class AgFeature(enum.IntFlag):
HF_INDICATORS = 0x400 HF_INDICATORS = 0x400
ESCO_S4_SETTINGS_SUPPORTED = 0x800 ESCO_S4_SETTINGS_SUPPORTED = 0x800
ENHANCED_VOICE_RECOGNITION_STATUS = 0x1000 ENHANCED_VOICE_RECOGNITION_STATUS = 0x1000
VOICE_RECOGNITION_TEST = 0x2000 VOICE_RECOGNITION_TEXT = 0x2000
class AudioCodec(enum.IntEnum): class AudioCodec(enum.IntEnum):
@@ -1390,6 +1390,7 @@ class AgProtocol(pyee.EventEmitter):
def _on_bac(self, *args) -> None: def _on_bac(self, *args) -> None:
self.supported_audio_codecs = [AudioCodec(int(value)) for value in args] self.supported_audio_codecs = [AudioCodec(int(value)) for value in args]
self.emit('supported_audio_codecs', self.supported_audio_codecs)
self.send_ok() self.send_ok()
def _on_bcs(self, codec: bytes) -> None: def _on_bcs(self, codec: bytes) -> None:
@@ -1618,7 +1619,7 @@ class ProfileVersion(enum.IntEnum):
""" """
Profile version (normative). Profile version (normative).
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements. Hands-Free Profile v1.8, 6.3 SDP Interoperability Requirements.
""" """
V1_5 = 0x0105 V1_5 = 0x0105
@@ -1632,7 +1633,7 @@ class HfSdpFeature(enum.IntFlag):
""" """
HF supported features (normative). HF supported features (normative).
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements. Hands-Free Profile v1.9, 6.3 SDP Interoperability Requirements.
""" """
EC_NR = 0x01 # Echo Cancel & Noise reduction EC_NR = 0x01 # Echo Cancel & Noise reduction
@@ -1640,16 +1641,17 @@ class HfSdpFeature(enum.IntFlag):
CLI_PRESENTATION_CAPABILITY = 0x04 CLI_PRESENTATION_CAPABILITY = 0x04
VOICE_RECOGNITION_ACTIVATION = 0x08 VOICE_RECOGNITION_ACTIVATION = 0x08
REMOTE_VOLUME_CONTROL = 0x10 REMOTE_VOLUME_CONTROL = 0x10
WIDE_BAND = 0x20 # Wide band speech WIDE_BAND_SPEECH = 0x20
ENHANCED_VOICE_RECOGNITION_STATUS = 0x40 ENHANCED_VOICE_RECOGNITION_STATUS = 0x40
VOICE_RECOGNITION_TEST = 0x80 VOICE_RECOGNITION_TEXT = 0x80
SUPER_WIDE_BAND = 0x100
class AgSdpFeature(enum.IntFlag): class AgSdpFeature(enum.IntFlag):
""" """
AG supported features (normative). AG supported features (normative).
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements. Hands-Free Profile v1.9, 6.3 SDP Interoperability Requirements.
""" """
THREE_WAY_CALLING = 0x01 THREE_WAY_CALLING = 0x01
@@ -1657,9 +1659,10 @@ class AgSdpFeature(enum.IntFlag):
VOICE_RECOGNITION_FUNCTION = 0x04 VOICE_RECOGNITION_FUNCTION = 0x04
IN_BAND_RING_TONE_CAPABILITY = 0x08 IN_BAND_RING_TONE_CAPABILITY = 0x08
VOICE_TAG = 0x10 # Attach a number to voice tag VOICE_TAG = 0x10 # Attach a number to voice tag
WIDE_BAND = 0x20 # Wide band speech WIDE_BAND_SPEECH = 0x20
ENHANCED_VOICE_RECOGNITION_STATUS = 0x40 ENHANCED_VOICE_RECOGNITION_STATUS = 0x40
VOICE_RECOGNITION_TEST = 0x80 VOICE_RECOGNITION_TEXT = 0x80
SUPER_WIDE_BAND_SPEED_SPEECH = 0x100
def make_hf_sdp_records( def make_hf_sdp_records(
@@ -1692,11 +1695,11 @@ def make_hf_sdp_records(
in configuration.supported_hf_features in configuration.supported_hf_features
): ):
hf_supported_features |= HfSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS hf_supported_features |= HfSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS
if HfFeature.VOICE_RECOGNITION_TEST in configuration.supported_hf_features: if HfFeature.VOICE_RECOGNITION_TEXT in configuration.supported_hf_features:
hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_TEST hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_TEXT
if AudioCodec.MSBC in configuration.supported_audio_codecs: if AudioCodec.MSBC in configuration.supported_audio_codecs:
hf_supported_features |= HfSdpFeature.WIDE_BAND hf_supported_features |= HfSdpFeature.WIDE_BAND_SPEECH
return [ return [
sdp.ServiceAttribute( sdp.ServiceAttribute(
@@ -1772,14 +1775,14 @@ def make_ag_sdp_records(
in configuration.supported_ag_features in configuration.supported_ag_features
): ):
ag_supported_features |= AgSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS ag_supported_features |= AgSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS
if AgFeature.VOICE_RECOGNITION_TEST in configuration.supported_ag_features: if AgFeature.VOICE_RECOGNITION_TEXT in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEST ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEXT
if AgFeature.IN_BAND_RING_TONE_CAPABILITY in configuration.supported_ag_features: if AgFeature.IN_BAND_RING_TONE_CAPABILITY in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY ag_supported_features |= AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
if AgFeature.VOICE_RECOGNITION_FUNCTION in configuration.supported_ag_features: if AgFeature.VOICE_RECOGNITION_FUNCTION in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_FUNCTION ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_FUNCTION
if AudioCodec.MSBC in configuration.supported_audio_codecs: if AudioCodec.MSBC in configuration.supported_audio_codecs:
ag_supported_features |= AgSdpFeature.WIDE_BAND ag_supported_features |= AgSdpFeature.WIDE_BAND_SPEECH
return [ return [
sdp.ServiceAttribute( sdp.ServiceAttribute(

View File

@@ -199,7 +199,7 @@ class Host(AbortableEventEmitter):
check_address_type: bool = False, check_address_type: bool = False,
) -> Optional[Connection]: ) -> Optional[Connection]:
for connection in self.connections.values(): for connection in self.connections.values():
if connection.peer_address.to_bytes() == bd_addr.to_bytes(): if bytes(connection.peer_address) == bytes(bd_addr):
if ( if (
check_address_type check_address_type
and connection.peer_address.address_type != bd_addr.address_type and connection.peer_address.address_type != bd_addr.address_type
@@ -552,7 +552,7 @@ class Host(AbortableEventEmitter):
return response return response
except Exception as error: except Exception as error:
logger.warning( logger.exception(
f'{color("!!! Exception while sending command:", "red")} {error}' f'{color("!!! Exception while sending command:", "red")} {error}'
) )
raise error raise error
@@ -1248,3 +1248,6 @@ class Host(AbortableEventEmitter):
event.connection_handle, event.connection_handle,
int.from_bytes(event.le_features, 'little'), int.from_bytes(event.le_features, 'little'),
) )
def on_hci_vendor_event(self, event):
self.emit('vendor_event', event)

View File

@@ -225,7 +225,7 @@ class L2CAP_PDU:
return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload) return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload)
def to_bytes(self) -> bytes: def __bytes__(self) -> bytes:
header = struct.pack('<HH', len(self.payload), self.cid) header = struct.pack('<HH', len(self.payload), self.cid)
return header + self.payload return header + self.payload
@@ -233,9 +233,6 @@ class L2CAP_PDU:
self.cid = cid self.cid = cid
self.payload = payload self.payload = payload
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self) -> str: def __str__(self) -> str:
return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}' return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}'
@@ -333,11 +330,8 @@ class L2CAP_Control_Frame:
def init_from_bytes(self, pdu, offset): def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self) -> bytes:
return self.pdu
def __bytes__(self) -> bytes: def __bytes__(self) -> bytes:
return self.to_bytes() return self.pdu
def __str__(self) -> str: def __str__(self) -> str:
result = f'{color(self.name, "yellow")} [ID={self.identifier}]' result = f'{color(self.name, "yellow")} [ID={self.identifier}]'

View File

@@ -139,16 +139,19 @@ class PairingDelegate:
io_capability: IoCapability io_capability: IoCapability
local_initiator_key_distribution: KeyDistribution local_initiator_key_distribution: KeyDistribution
local_responder_key_distribution: KeyDistribution local_responder_key_distribution: KeyDistribution
maximum_encryption_key_size: int
def __init__( def __init__(
self, self,
io_capability: IoCapability = NO_OUTPUT_NO_INPUT, io_capability: IoCapability = NO_OUTPUT_NO_INPUT,
local_initiator_key_distribution: KeyDistribution = DEFAULT_KEY_DISTRIBUTION, local_initiator_key_distribution: KeyDistribution = DEFAULT_KEY_DISTRIBUTION,
local_responder_key_distribution: KeyDistribution = DEFAULT_KEY_DISTRIBUTION, local_responder_key_distribution: KeyDistribution = DEFAULT_KEY_DISTRIBUTION,
maximum_encryption_key_size: int = 16,
) -> None: ) -> None:
self.io_capability = io_capability self.io_capability = io_capability
self.local_initiator_key_distribution = local_initiator_key_distribution self.local_initiator_key_distribution = local_initiator_key_distribution
self.local_responder_key_distribution = local_responder_key_distribution self.local_responder_key_distribution = local_responder_key_distribution
self.maximum_encryption_key_size = maximum_encryption_key_size
@property @property
def classic_io_capability(self) -> int: def classic_io_capability(self) -> int:

View File

@@ -39,7 +39,6 @@ from bumble.device import (
AdvertisingEventProperties, AdvertisingEventProperties,
AdvertisingType, AdvertisingType,
Device, Device,
Phy,
) )
from bumble.gatt import Service from bumble.gatt import Service
from bumble.hci import ( from bumble.hci import (
@@ -47,6 +46,7 @@ from bumble.hci import (
HCI_PAGE_TIMEOUT_ERROR, HCI_PAGE_TIMEOUT_ERROR,
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
Address, Address,
Phy,
) )
from google.protobuf import any_pb2 # pytype: disable=pyi-error from google.protobuf import any_pb2 # pytype: disable=pyi-error
from google.protobuf import empty_pb2 # pytype: disable=pyi-error from google.protobuf import empty_pb2 # pytype: disable=pyi-error

View File

@@ -17,6 +17,7 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations
import logging import logging
import struct import struct
@@ -28,10 +29,11 @@ from bumble.device import Connection
from bumble.att import ATT_Error from bumble.att import ATT_Error
from bumble.gatt import ( from bumble.gatt import (
Characteristic, Characteristic,
DelegatedCharacteristicAdapter, SerializableCharacteristicAdapter,
PackedCharacteristicAdapter,
TemplateService, TemplateService,
CharacteristicValue, CharacteristicValue,
PackedCharacteristicAdapter, UTF8CharacteristicAdapter,
GATT_AUDIO_INPUT_CONTROL_SERVICE, GATT_AUDIO_INPUT_CONTROL_SERVICE,
GATT_AUDIO_INPUT_STATE_CHARACTERISTIC, GATT_AUDIO_INPUT_STATE_CHARACTERISTIC,
GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC, GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC,
@@ -95,7 +97,7 @@ class AudioInputStatus(OpenIntEnum):
Cf. 3.4 Audio Input Status Cf. 3.4 Audio Input Status
''' '''
INATIVE = 0x00 INACTIVE = 0x00
ACTIVE = 0x01 ACTIVE = 0x01
@@ -104,7 +106,7 @@ class AudioInputControlPointOpCode(OpenIntEnum):
Cf. 3.5.1 Audio Input Control Point procedure requirements Cf. 3.5.1 Audio Input Control Point procedure requirements
''' '''
SET_GAIN_SETTING = 0x00 SET_GAIN_SETTING = 0x01
UNMUTE = 0x02 UNMUTE = 0x02
MUTE = 0x03 MUTE = 0x03
SET_MANUAL_GAIN_MODE = 0x04 SET_MANUAL_GAIN_MODE = 0x04
@@ -154,9 +156,6 @@ class AudioInputState:
attribute=self.attribute_value, value=bytes(self) attribute=self.attribute_value, value=bytes(self)
) )
def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self)
@dataclass @dataclass
class GainSettingsProperties: class GainSettingsProperties:
@@ -173,7 +172,7 @@ class GainSettingsProperties:
(gain_settings_unit, gain_settings_minimum, gain_settings_maximum) = ( (gain_settings_unit, gain_settings_minimum, gain_settings_maximum) = (
struct.unpack('BBB', data) struct.unpack('BBB', data)
) )
GainSettingsProperties( return GainSettingsProperties(
gain_settings_unit, gain_settings_minimum, gain_settings_maximum gain_settings_unit, gain_settings_minimum, gain_settings_maximum
) )
@@ -186,9 +185,6 @@ class GainSettingsProperties:
] ]
) )
def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self)
@dataclass @dataclass
class AudioInputControlPoint: class AudioInputControlPoint:
@@ -239,7 +235,7 @@ class AudioInputControlPoint:
or gain_settings_operand or gain_settings_operand
> self.gain_settings_properties.gain_settings_maximum > self.gain_settings_properties.gain_settings_maximum
): ):
logger.error("gain_seetings value out of range") logger.error("gain_settings value out of range")
raise ATT_Error(ErrorCode.VALUE_OUT_OF_RANGE) raise ATT_Error(ErrorCode.VALUE_OUT_OF_RANGE)
if self.audio_input_state.gain_settings != gain_settings_operand: if self.audio_input_state.gain_settings != gain_settings_operand:
@@ -321,21 +317,14 @@ class AudioInputDescription:
audio_input_description: str = "Bluetooth" audio_input_description: str = "Bluetooth"
attribute_value: Optional[CharacteristicValue] = None attribute_value: Optional[CharacteristicValue] = None
@classmethod def on_read(self, _connection: Optional[Connection]) -> str:
def from_bytes(cls, data: bytes): return self.audio_input_description
return cls(audio_input_description=data.decode('utf-8'))
def __bytes__(self) -> bytes: async def on_write(self, connection: Optional[Connection], value: str) -> None:
return self.audio_input_description.encode('utf-8')
def on_read(self, _connection: Optional[Connection]) -> bytes:
return self.audio_input_description.encode('utf-8')
async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
assert connection assert connection
assert self.attribute_value assert self.attribute_value
self.audio_input_description = value.decode('utf-8') self.audio_input_description = value
await connection.device.notify_subscribers( await connection.device.notify_subscribers(
attribute=self.attribute_value, value=value attribute=self.attribute_value, value=value
) )
@@ -375,26 +364,29 @@ class AICSService(TemplateService):
self.audio_input_state, self.gain_settings_properties self.audio_input_state, self.gain_settings_properties
) )
self.audio_input_state_characteristic = DelegatedCharacteristicAdapter( self.audio_input_state_characteristic = SerializableCharacteristicAdapter(
Characteristic( Characteristic(
uuid=GATT_AUDIO_INPUT_STATE_CHARACTERISTIC, uuid=GATT_AUDIO_INPUT_STATE_CHARACTERISTIC,
properties=Characteristic.Properties.READ properties=Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY, | Characteristic.Properties.NOTIFY,
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=CharacteristicValue(read=self.audio_input_state.on_read), value=self.audio_input_state,
), ),
encode=lambda value: bytes(value), AudioInputState,
) )
self.audio_input_state.attribute_value = ( self.audio_input_state.attribute_value = (
self.audio_input_state_characteristic.value self.audio_input_state_characteristic.value
) )
self.gain_settings_properties_characteristic = DelegatedCharacteristicAdapter( self.gain_settings_properties_characteristic = (
Characteristic( SerializableCharacteristicAdapter(
uuid=GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC, Characteristic(
properties=Characteristic.Properties.READ, uuid=GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC,
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, properties=Characteristic.Properties.READ,
value=CharacteristicValue(read=self.gain_settings_properties.on_read), permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=self.gain_settings_properties,
),
GainSettingsProperties,
) )
) )
@@ -402,7 +394,7 @@ class AICSService(TemplateService):
uuid=GATT_AUDIO_INPUT_TYPE_CHARACTERISTIC, uuid=GATT_AUDIO_INPUT_TYPE_CHARACTERISTIC,
properties=Characteristic.Properties.READ, properties=Characteristic.Properties.READ,
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION, permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=audio_input_type, value=bytes(audio_input_type, 'utf-8'),
) )
self.audio_input_status_characteristic = Characteristic( self.audio_input_status_characteristic = Characteristic(
@@ -412,18 +404,14 @@ class AICSService(TemplateService):
value=bytes([self.audio_input_status]), value=bytes([self.audio_input_status]),
) )
self.audio_input_control_point_characteristic = DelegatedCharacteristicAdapter( self.audio_input_control_point_characteristic = Characteristic(
Characteristic( uuid=GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC,
uuid=GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC, properties=Characteristic.Properties.WRITE,
properties=Characteristic.Properties.WRITE, permissions=Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
permissions=Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION, value=CharacteristicValue(write=self.audio_input_control_point.on_write),
value=CharacteristicValue(
write=self.audio_input_control_point.on_write
),
)
) )
self.audio_input_description_characteristic = DelegatedCharacteristicAdapter( self.audio_input_description_characteristic = UTF8CharacteristicAdapter(
Characteristic( Characteristic(
uuid=GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC, uuid=GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC,
properties=Characteristic.Properties.READ properties=Characteristic.Properties.READ
@@ -469,8 +457,8 @@ class AICSServiceProxy(ProfileServiceProxy):
) )
): ):
raise gatt.InvalidServiceError("Audio Input State Characteristic not found") raise gatt.InvalidServiceError("Audio Input State Characteristic not found")
self.audio_input_state = DelegatedCharacteristicAdapter( self.audio_input_state = SerializableCharacteristicAdapter(
characteristic=characteristics[0], decode=AudioInputState.from_bytes characteristics[0], AudioInputState
) )
if not ( if not (
@@ -481,9 +469,8 @@ class AICSServiceProxy(ProfileServiceProxy):
raise gatt.InvalidServiceError( raise gatt.InvalidServiceError(
"Gain Settings Attribute Characteristic not found" "Gain Settings Attribute Characteristic not found"
) )
self.gain_settings_properties = PackedCharacteristicAdapter( self.gain_settings_properties = SerializableCharacteristicAdapter(
characteristics[0], characteristics[0], GainSettingsProperties
'BBB',
) )
if not ( if not (
@@ -494,10 +481,7 @@ class AICSServiceProxy(ProfileServiceProxy):
raise gatt.InvalidServiceError( raise gatt.InvalidServiceError(
"Audio Input Status Characteristic not found" "Audio Input Status Characteristic not found"
) )
self.audio_input_status = PackedCharacteristicAdapter( self.audio_input_status = PackedCharacteristicAdapter(characteristics[0], 'B')
characteristics[0],
'B',
)
if not ( if not (
characteristics := service_proxy.get_characteristics_by_uuid( characteristics := service_proxy.get_characteristics_by_uuid(
@@ -517,4 +501,4 @@ class AICSServiceProxy(ProfileServiceProxy):
raise gatt.InvalidServiceError( raise gatt.InvalidServiceError(
"Audio Input Description Characteristic not found" "Audio Input Description Characteristic not found"
) )
self.audio_input_description = characteristics[0] self.audio_input_description = UTF8CharacteristicAdapter(characteristics[0])

View File

@@ -102,6 +102,7 @@ class ContextType(enum.IntFlag):
# fmt: off # fmt: off
PROHIBITED = 0x0000 PROHIBITED = 0x0000
UNSPECIFIED = 0x0001
CONVERSATIONAL = 0x0002 CONVERSATIONAL = 0x0002
MEDIA = 0x0004 MEDIA = 0x0004
GAME = 0x0008 GAME = 0x0008
@@ -264,7 +265,7 @@ class UnicastServerAdvertisingData:
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID, core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
struct.pack( struct.pack(
'<2sBIB', '<2sBIB',
gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE.to_bytes(), bytes(gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE),
self.announcement_type, self.announcement_type,
self.available_audio_contexts, self.available_audio_contexts,
len(self.metadata), len(self.metadata),
@@ -397,18 +398,21 @@ class CodecSpecificConfiguration:
OCTETS_PER_FRAME = 0x04 OCTETS_PER_FRAME = 0x04
CODEC_FRAMES_PER_SDU = 0x05 CODEC_FRAMES_PER_SDU = 0x05
sampling_frequency: SamplingFrequency sampling_frequency: SamplingFrequency | None = None
frame_duration: FrameDuration frame_duration: FrameDuration | None = None
audio_channel_allocation: AudioLocation audio_channel_allocation: AudioLocation | None = None
octets_per_codec_frame: int octets_per_codec_frame: int | None = None
codec_frames_per_sdu: int codec_frames_per_sdu: int | None = None
@classmethod @classmethod
def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration: def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration:
offset = 0 offset = 0
# Allowed default values. sampling_frequency: SamplingFrequency | None = None
audio_channel_allocation = AudioLocation.NOT_ALLOWED frame_duration: FrameDuration | None = None
codec_frames_per_sdu = 1 audio_channel_allocation: AudioLocation | None = None
octets_per_codec_frame: int | None = None
codec_frames_per_sdu: int | None = None
while offset < len(data): while offset < len(data):
length, type = struct.unpack_from('BB', data, offset) length, type = struct.unpack_from('BB', data, offset)
offset += 2 offset += 2
@@ -426,8 +430,6 @@ class CodecSpecificConfiguration:
elif type == CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU: elif type == CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU:
codec_frames_per_sdu = value codec_frames_per_sdu = value
# It is expected here that if some fields are missing, an error should be raised.
# pylint: disable=possibly-used-before-assignment,used-before-assignment
return CodecSpecificConfiguration( return CodecSpecificConfiguration(
sampling_frequency=sampling_frequency, sampling_frequency=sampling_frequency,
frame_duration=frame_duration, frame_duration=frame_duration,
@@ -437,23 +439,43 @@ class CodecSpecificConfiguration:
) )
def __bytes__(self) -> bytes: def __bytes__(self) -> bytes:
return struct.pack( return b''.join(
'<BBBBBBBBIBBHBBB', [
2, struct.pack(fmt, length, tag, value)
CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY, for fmt, length, tag, value in [
self.sampling_frequency, (
2, '<BBB',
CodecSpecificConfiguration.Type.FRAME_DURATION, 2,
self.frame_duration, CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY,
5, self.sampling_frequency,
CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION, ),
self.audio_channel_allocation, (
3, '<BBB',
CodecSpecificConfiguration.Type.OCTETS_PER_FRAME, 2,
self.octets_per_codec_frame, CodecSpecificConfiguration.Type.FRAME_DURATION,
2, self.frame_duration,
CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU, ),
self.codec_frames_per_sdu, (
'<BBI',
5,
CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION,
self.audio_channel_allocation,
),
(
'<BBH',
3,
CodecSpecificConfiguration.Type.OCTETS_PER_FRAME,
self.octets_per_codec_frame,
),
(
'<BBB',
2,
CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU,
self.codec_frames_per_sdu,
),
]
if value is not None
]
) )
@@ -465,6 +487,24 @@ class BroadcastAudioAnnouncement:
def from_bytes(cls, data: bytes) -> Self: def from_bytes(cls, data: bytes) -> Self:
return cls(int.from_bytes(data[:3], 'little')) return cls(int.from_bytes(data[:3], 'little'))
def __bytes__(self) -> bytes:
return self.broadcast_id.to_bytes(3, 'little')
def get_advertising_data(self) -> bytes:
return bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
bytes(gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE)
+ bytes(self)
),
)
]
)
)
@dataclasses.dataclass @dataclasses.dataclass
class BasicAudioAnnouncement: class BasicAudioAnnouncement:
@@ -473,26 +513,37 @@ class BasicAudioAnnouncement:
index: int index: int
codec_specific_configuration: CodecSpecificConfiguration codec_specific_configuration: CodecSpecificConfiguration
@dataclasses.dataclass def __bytes__(self) -> bytes:
class CodecInfo: codec_specific_configuration_bytes = bytes(
coding_format: hci.CodecID self.codec_specific_configuration
company_id: int )
vendor_specific_codec_id: int return (
bytes([self.index, len(codec_specific_configuration_bytes)])
@classmethod + codec_specific_configuration_bytes
def from_bytes(cls, data: bytes) -> Self: )
coding_format = hci.CodecID(data[0])
company_id = int.from_bytes(data[1:3], 'little')
vendor_specific_codec_id = int.from_bytes(data[3:5], 'little')
return cls(coding_format, company_id, vendor_specific_codec_id)
@dataclasses.dataclass @dataclasses.dataclass
class Subgroup: class Subgroup:
codec_id: BasicAudioAnnouncement.CodecInfo codec_id: hci.CodingFormat
codec_specific_configuration: CodecSpecificConfiguration codec_specific_configuration: CodecSpecificConfiguration
metadata: le_audio.Metadata metadata: le_audio.Metadata
bis: List[BasicAudioAnnouncement.BIS] bis: List[BasicAudioAnnouncement.BIS]
def __bytes__(self) -> bytes:
metadata_bytes = bytes(self.metadata)
codec_specific_configuration_bytes = bytes(
self.codec_specific_configuration
)
return (
bytes([len(self.bis)])
+ bytes(self.codec_id)
+ bytes([len(codec_specific_configuration_bytes)])
+ codec_specific_configuration_bytes
+ bytes([len(metadata_bytes)])
+ metadata_bytes
+ b''.join(map(bytes, self.bis))
)
presentation_delay: int presentation_delay: int
subgroups: List[BasicAudioAnnouncement.Subgroup] subgroups: List[BasicAudioAnnouncement.Subgroup]
@@ -504,7 +555,7 @@ class BasicAudioAnnouncement:
for _ in range(data[3]): for _ in range(data[3]):
num_bis = data[offset] num_bis = data[offset]
offset += 1 offset += 1
codec_id = cls.CodecInfo.from_bytes(data[offset : offset + 5]) codec_id = hci.CodingFormat.from_bytes(data[offset : offset + 5])
offset += 5 offset += 5
codec_specific_configuration_length = data[offset] codec_specific_configuration_length = data[offset]
offset += 1 offset += 1
@@ -548,3 +599,25 @@ class BasicAudioAnnouncement:
) )
return cls(presentation_delay, subgroups) return cls(presentation_delay, subgroups)
def __bytes__(self) -> bytes:
return (
self.presentation_delay.to_bytes(3, 'little')
+ bytes([len(self.subgroups)])
+ b''.join(map(bytes, self.subgroups))
)
def get_advertising_data(self) -> bytes:
return bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
bytes(gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE)
+ bytes(self)
),
)
]
)
)

View File

@@ -276,10 +276,7 @@ class BroadcastReceiveState:
subgroups: List[SubgroupInfo] subgroups: List[SubgroupInfo]
@classmethod @classmethod
def from_bytes(cls, data: bytes) -> Optional[BroadcastReceiveState]: def from_bytes(cls, data: bytes) -> BroadcastReceiveState:
if not data:
return None
source_id = data[0] source_id = data[0]
_, source_address = hci.Address.parse_address_preceded_by_type(data, 2) _, source_address = hci.Address.parse_address_preceded_by_type(data, 2)
source_adv_sid = data[8] source_adv_sid = data[8]
@@ -357,7 +354,7 @@ class BroadcastAudioScanServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = BroadcastAudioScanService SERVICE_CLASS = BroadcastAudioScanService
broadcast_audio_scan_control_point: gatt_client.CharacteristicProxy broadcast_audio_scan_control_point: gatt_client.CharacteristicProxy
broadcast_receive_states: List[gatt.DelegatedCharacteristicAdapter] broadcast_receive_states: List[gatt.SerializableCharacteristicAdapter]
def __init__(self, service_proxy: gatt_client.ServiceProxy): def __init__(self, service_proxy: gatt_client.ServiceProxy):
self.service_proxy = service_proxy self.service_proxy = service_proxy
@@ -381,8 +378,8 @@ class BroadcastAudioScanServiceProxy(gatt_client.ProfileServiceProxy):
"Broadcast Receive State characteristic not found" "Broadcast Receive State characteristic not found"
) )
self.broadcast_receive_states = [ self.broadcast_receive_states = [
gatt.DelegatedCharacteristicAdapter( gatt.SerializableCharacteristicAdapter(
characteristic, decode=BroadcastReceiveState.from_bytes characteristic, BroadcastReceiveState
) )
for characteristic in characteristics for characteristic in characteristics
] ]

View File

@@ -64,7 +64,10 @@ class DeviceInformationService(TemplateService):
): ):
characteristics = [ characteristics = [
Characteristic( Characteristic(
uuid, Characteristic.Properties.READ, Characteristic.READABLE, field uuid,
Characteristic.Properties.READ,
Characteristic.READABLE,
bytes(field, 'utf-8'),
) )
for (field, uuid) in ( for (field, uuid) in (
(manufacturer_name, GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC), (manufacturer_name, GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC),

198
bumble/profiles/gmap.py Normal file
View File

@@ -0,0 +1,198 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""LE Audio - Gaming Audio Profile"""
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct
from typing import Optional
from bumble.gatt import (
TemplateService,
DelegatedCharacteristicAdapter,
Characteristic,
GATT_GAMING_AUDIO_SERVICE,
GATT_GMAP_ROLE_CHARACTERISTIC,
GATT_UGG_FEATURES_CHARACTERISTIC,
GATT_UGT_FEATURES_CHARACTERISTIC,
GATT_BGS_FEATURES_CHARACTERISTIC,
GATT_BGR_FEATURES_CHARACTERISTIC,
InvalidServiceError,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
from enum import IntFlag
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
class GmapRole(IntFlag):
UNICAST_GAME_GATEWAY = 1 << 0
UNICAST_GAME_TERMINAL = 1 << 1
BROADCAST_GAME_SENDER = 1 << 2
BROADCAST_GAME_RECEIVER = 1 << 3
class UggFeatures(IntFlag):
UGG_MULTIPLEX = 1 << 0
UGG_96_KBPS_SOURCE = 1 << 1
UGG_MULTISINK = 1 << 2
class UgtFeatures(IntFlag):
UGT_SOURCE = 1 << 0
UGT_80_KBPS_SOURCE = 1 << 1
UGT_SINK = 1 << 2
UGT_64_KBPS_SINK = 1 << 3
UGT_MULTIPLEX = 1 << 4
UGT_MULTISINK = 1 << 5
UGT_MULTISOURCE = 1 << 6
class BgsFeatures(IntFlag):
BGS_96_KBPS = 1 << 0
class BgrFeatures(IntFlag):
BGR_MULTISINK = 1 << 0
BGR_MULTIPLEX = 1 << 1
# -----------------------------------------------------------------------------
# Server
# -----------------------------------------------------------------------------
class GamingAudioService(TemplateService):
UUID = GATT_GAMING_AUDIO_SERVICE
gmap_role: Characteristic
ugg_features: Optional[Characteristic] = None
ugt_features: Optional[Characteristic] = None
bgs_features: Optional[Characteristic] = None
bgr_features: Optional[Characteristic] = None
def __init__(
self,
gmap_role: GmapRole,
ugg_features: Optional[UggFeatures] = None,
ugt_features: Optional[UgtFeatures] = None,
bgs_features: Optional[BgsFeatures] = None,
bgr_features: Optional[BgrFeatures] = None,
) -> None:
characteristics = []
ugg_features = UggFeatures(0) if ugg_features is None else ugg_features
ugt_features = UgtFeatures(0) if ugt_features is None else ugt_features
bgs_features = BgsFeatures(0) if bgs_features is None else bgs_features
bgr_features = BgrFeatures(0) if bgr_features is None else bgr_features
self.gmap_role = Characteristic(
uuid=GATT_GMAP_ROLE_CHARACTERISTIC,
properties=Characteristic.Properties.READ,
permissions=Characteristic.Permissions.READABLE,
value=struct.pack('B', gmap_role),
)
characteristics.append(self.gmap_role)
if gmap_role & GmapRole.UNICAST_GAME_GATEWAY:
self.ugg_features = Characteristic(
uuid=GATT_UGG_FEATURES_CHARACTERISTIC,
properties=Characteristic.Properties.READ,
permissions=Characteristic.Permissions.READABLE,
value=struct.pack('B', ugg_features),
)
characteristics.append(self.ugg_features)
if gmap_role & GmapRole.UNICAST_GAME_TERMINAL:
self.ugt_features = Characteristic(
uuid=GATT_UGT_FEATURES_CHARACTERISTIC,
properties=Characteristic.Properties.READ,
permissions=Characteristic.Permissions.READABLE,
value=struct.pack('B', ugt_features),
)
characteristics.append(self.ugt_features)
if gmap_role & GmapRole.BROADCAST_GAME_SENDER:
self.bgs_features = Characteristic(
uuid=GATT_BGS_FEATURES_CHARACTERISTIC,
properties=Characteristic.Properties.READ,
permissions=Characteristic.Permissions.READABLE,
value=struct.pack('B', bgs_features),
)
characteristics.append(self.bgs_features)
if gmap_role & GmapRole.BROADCAST_GAME_RECEIVER:
self.bgr_features = Characteristic(
uuid=GATT_BGR_FEATURES_CHARACTERISTIC,
properties=Characteristic.Properties.READ,
permissions=Characteristic.Permissions.READABLE,
value=struct.pack('B', bgr_features),
)
characteristics.append(self.bgr_features)
super().__init__(characteristics)
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class GamingAudioServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = GamingAudioService
def __init__(self, service_proxy: ServiceProxy) -> None:
self.service_proxy = service_proxy
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_GMAP_ROLE_CHARACTERISTIC
)
):
raise InvalidServiceError("GMAP Role Characteristic not found")
self.gmap_role = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
decode=lambda value: GmapRole(value[0]),
)
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_UGG_FEATURES_CHARACTERISTIC
):
self.ugg_features = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
decode=lambda value: UggFeatures(value[0]),
)
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_UGT_FEATURES_CHARACTERISTIC
):
self.ugt_features = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
decode=lambda value: UgtFeatures(value[0]),
)
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_BGS_FEATURES_CHARACTERISTIC
):
self.bgs_features = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
decode=lambda value: BgsFeatures(value[0]),
)
if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_BGR_FEATURES_CHARACTERISTIC
):
self.bgr_features = DelegatedCharacteristicAdapter(
characteristic=characteristics[0],
decode=lambda value: BgrFeatures(value[0]),
)

View File

@@ -30,6 +30,7 @@ from ..gatt import (
TemplateService, TemplateService,
Characteristic, Characteristic,
CharacteristicValue, CharacteristicValue,
SerializableCharacteristicAdapter,
DelegatedCharacteristicAdapter, DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter, PackedCharacteristicAdapter,
) )
@@ -150,15 +151,14 @@ class HeartRateService(TemplateService):
body_sensor_location=None, body_sensor_location=None,
reset_energy_expended=None, reset_energy_expended=None,
): ):
self.heart_rate_measurement_characteristic = DelegatedCharacteristicAdapter( self.heart_rate_measurement_characteristic = SerializableCharacteristicAdapter(
Characteristic( Characteristic(
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC, GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
Characteristic.Properties.NOTIFY, Characteristic.Properties.NOTIFY,
0, 0,
CharacteristicValue(read=read_heart_rate_measurement), CharacteristicValue(read=read_heart_rate_measurement),
), ),
# pylint: disable=unnecessary-lambda HeartRateService.HeartRateMeasurement,
encode=lambda value: bytes(value),
) )
characteristics = [self.heart_rate_measurement_characteristic] characteristics = [self.heart_rate_measurement_characteristic]
@@ -204,9 +204,8 @@ class HeartRateServiceProxy(ProfileServiceProxy):
if characteristics := service_proxy.get_characteristics_by_uuid( if characteristics := service_proxy.get_characteristics_by_uuid(
GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC
): ):
self.heart_rate_measurement = DelegatedCharacteristicAdapter( self.heart_rate_measurement = SerializableCharacteristicAdapter(
characteristics[0], characteristics[0], HeartRateService.HeartRateMeasurement
decode=HeartRateService.HeartRateMeasurement.from_bytes,
) )
else: else:
self.heart_rate_measurement = None self.heart_rate_measurement = None

330
bumble/profiles/vocs.py Normal file
View File

@@ -0,0 +1,330 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct
from dataclasses import dataclass
from typing import Optional
from bumble.device import Connection
from bumble.att import ATT_Error
from bumble.gatt import (
Characteristic,
DelegatedCharacteristicAdapter,
TemplateService,
CharacteristicValue,
UTF8CharacteristicAdapter,
InvalidServiceError,
GATT_VOLUME_OFFSET_CONTROL_SERVICE,
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC,
GATT_AUDIO_LOCATION_CHARACTERISTIC,
GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC,
GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC,
)
from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
from bumble.utils import OpenIntEnum
from bumble.profiles.bap import AudioLocation
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
MIN_VOLUME_OFFSET = -255
MAX_VOLUME_OFFSET = 255
CHANGE_COUNTER_MAX_VALUE = 0xFF
class SetVolumeOffsetOpCode(OpenIntEnum):
SET_VOLUME_OFFSET = 0x01
class ErrorCode(OpenIntEnum):
"""
See Volume Offset Control Service 1.6. Application error codes.
"""
INVALID_CHANGE_COUNTER = 0x80
OPCODE_NOT_SUPPORTED = 0x81
VALUE_OUT_OF_RANGE = 0x82
# -----------------------------------------------------------------------------
@dataclass
class VolumeOffsetState:
volume_offset: int = 0
change_counter: int = 0
attribute_value: Optional[CharacteristicValue] = None
def __bytes__(self) -> bytes:
return struct.pack('<hB', self.volume_offset, self.change_counter)
@classmethod
def from_bytes(cls, data: bytes):
volume_offset, change_counter = struct.unpack('<hB', data)
return cls(volume_offset, change_counter)
def increment_change_counter(self) -> None:
self.change_counter = (self.change_counter + 1) % (CHANGE_COUNTER_MAX_VALUE + 1)
async def notify_subscribers_via_connection(self, connection: Connection) -> None:
assert self.attribute_value is not None
await connection.device.notify_subscribers(
attribute=self.attribute_value, value=bytes(self)
)
def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self)
@dataclass
class VocsAudioLocation:
audio_location: AudioLocation = AudioLocation.NOT_ALLOWED
attribute_value: Optional[CharacteristicValue] = None
def __bytes__(self) -> bytes:
return struct.pack('<I', self.audio_location)
@classmethod
def from_bytes(cls, data: bytes):
audio_location = AudioLocation(struct.unpack('<I', data)[0])
return cls(audio_location)
def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self)
async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
assert connection
assert self.attribute_value
self.audio_location = AudioLocation(int.from_bytes(value, 'little'))
await connection.device.notify_subscribers(
attribute=self.attribute_value, value=value
)
@dataclass
class VolumeOffsetControlPoint:
volume_offset_state: VolumeOffsetState
async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
assert connection
opcode = value[0]
if opcode != SetVolumeOffsetOpCode.SET_VOLUME_OFFSET:
raise ATT_Error(ErrorCode.OPCODE_NOT_SUPPORTED)
change_counter, volume_offset = struct.unpack('<Bh', value[1:])
await self._set_volume_offset(connection, change_counter, volume_offset)
async def _set_volume_offset(
self,
connection: Connection,
change_counter_operand: int,
volume_offset_operand: int,
) -> None:
change_counter = self.volume_offset_state.change_counter
if change_counter != change_counter_operand:
raise ATT_Error(ErrorCode.INVALID_CHANGE_COUNTER)
if not MIN_VOLUME_OFFSET <= volume_offset_operand <= MAX_VOLUME_OFFSET:
raise ATT_Error(ErrorCode.VALUE_OUT_OF_RANGE)
self.volume_offset_state.volume_offset = volume_offset_operand
self.volume_offset_state.increment_change_counter()
await self.volume_offset_state.notify_subscribers_via_connection(connection)
@dataclass
class AudioOutputDescription:
audio_output_description: str = ''
attribute_value: Optional[CharacteristicValue] = None
@classmethod
def from_bytes(cls, data: bytes):
return cls(audio_output_description=data.decode('utf-8'))
def __bytes__(self) -> bytes:
return self.audio_output_description.encode('utf-8')
def on_read(self, _connection: Optional[Connection]) -> bytes:
return bytes(self)
async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
assert connection
assert self.attribute_value
self.audio_output_description = value.decode('utf-8')
await connection.device.notify_subscribers(
attribute=self.attribute_value, value=value
)
# -----------------------------------------------------------------------------
class VolumeOffsetControlService(TemplateService):
UUID = GATT_VOLUME_OFFSET_CONTROL_SERVICE
def __init__(
self,
volume_offset_state: Optional[VolumeOffsetState] = None,
audio_location: Optional[VocsAudioLocation] = None,
audio_output_description: Optional[AudioOutputDescription] = None,
) -> None:
self.volume_offset_state = (
VolumeOffsetState() if volume_offset_state is None else volume_offset_state
)
self.audio_location = (
VocsAudioLocation() if audio_location is None else audio_location
)
self.audio_output_description = (
AudioOutputDescription()
if audio_output_description is None
else audio_output_description
)
self.volume_offset_control_point: VolumeOffsetControlPoint = (
VolumeOffsetControlPoint(self.volume_offset_state)
)
self.volume_offset_state_characteristic = DelegatedCharacteristicAdapter(
Characteristic(
uuid=GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY
),
permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
value=CharacteristicValue(read=self.volume_offset_state.on_read),
),
encode=lambda value: bytes(value),
)
self.audio_location_characteristic = DelegatedCharacteristicAdapter(
Characteristic(
uuid=GATT_AUDIO_LOCATION_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE
),
permissions=(
Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION
),
value=CharacteristicValue(
read=self.audio_location.on_read,
write=self.audio_location.on_write,
),
),
encode=lambda value: bytes(value),
decode=VocsAudioLocation.from_bytes,
)
self.audio_location.attribute_value = self.audio_location_characteristic.value
self.volume_offset_control_point_characteristic = Characteristic(
uuid=GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC,
properties=Characteristic.Properties.WRITE,
permissions=Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
value=CharacteristicValue(write=self.volume_offset_control_point.on_write),
)
self.audio_output_description_characteristic = DelegatedCharacteristicAdapter(
Characteristic(
uuid=GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC,
properties=(
Characteristic.Properties.READ
| Characteristic.Properties.NOTIFY
| Characteristic.Properties.WRITE_WITHOUT_RESPONSE
),
permissions=(
Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
| Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION
),
value=CharacteristicValue(
read=self.audio_output_description.on_read,
write=self.audio_output_description.on_write,
),
)
)
self.audio_output_description.attribute_value = (
self.audio_output_description_characteristic.value
)
super().__init__(
characteristics=[
self.volume_offset_state_characteristic, # type: ignore
self.audio_location_characteristic, # type: ignore
self.volume_offset_control_point_characteristic, # type: ignore
self.audio_output_description_characteristic, # type: ignore
],
primary=False,
)
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class VolumeOffsetControlServiceProxy(ProfileServiceProxy):
SERVICE_CLASS = VolumeOffsetControlService
def __init__(self, service_proxy: ServiceProxy) -> None:
self.service_proxy = service_proxy
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC
)
):
raise InvalidServiceError("Volume Offset State characteristic not found")
self.volume_offset_state = DelegatedCharacteristicAdapter(
characteristics[0], decode=VolumeOffsetState.from_bytes
)
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_AUDIO_LOCATION_CHARACTERISTIC
)
):
raise InvalidServiceError("Audio Location characteristic not found")
self.audio_location = DelegatedCharacteristicAdapter(
characteristics[0],
encode=lambda value: bytes(value),
decode=VocsAudioLocation.from_bytes,
)
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC
)
):
raise InvalidServiceError(
"Volume Offset Control Point characteristic not found"
)
self.volume_offset_control_point = characteristics[0]
if not (
characteristics := service_proxy.get_characteristics_by_uuid(
GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC
)
):
raise InvalidServiceError(
"Audio Output Description characteristic not found"
)
self.audio_output_description = UTF8CharacteristicAdapter(characteristics[0])

View File

@@ -344,9 +344,6 @@ class DataElement:
] # Keep a copy so we can re-serialize to an exact replica ] # Keep a copy so we can re-serialize to an exact replica
return result return result
def to_bytes(self):
return bytes(self)
def __bytes__(self): def __bytes__(self):
# Return early if we have a cache # Return early if we have a cache
if self.bytes: if self.bytes:
@@ -623,11 +620,8 @@ class SDP_PDU:
def init_from_bytes(self, pdu, offset): def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
def __bytes__(self): def __bytes__(self):
return self.to_bytes() return self.pdu
def __str__(self): def __str__(self):
result = f'{color(self.name, "blue")} [TID={self.transaction_id}]' result = f'{color(self.name, "blue")} [TID={self.transaction_id}]'

View File

@@ -298,11 +298,8 @@ class SMP_Command:
def init_from_bytes(self, pdu: bytes, offset: int) -> None: def init_from_bytes(self, pdu: bytes, offset: int) -> None:
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
def __bytes__(self): def __bytes__(self):
return self.to_bytes() return self.pdu
def __str__(self): def __str__(self):
result = color(self.name, 'yellow') result = color(self.name, 'yellow')
@@ -698,6 +695,7 @@ class Session:
self.ltk_ediv = 0 self.ltk_ediv = 0
self.ltk_rand = bytes(8) self.ltk_rand = bytes(8)
self.link_key: Optional[bytes] = None self.link_key: Optional[bytes] = None
self.maximum_encryption_key_size: int = 0
self.initiator_key_distribution: int = 0 self.initiator_key_distribution: int = 0
self.responder_key_distribution: int = 0 self.responder_key_distribution: int = 0
self.peer_random_value: Optional[bytes] = None self.peer_random_value: Optional[bytes] = None
@@ -744,6 +742,10 @@ class Session:
else: else:
self.pairing_result = None self.pairing_result = None
self.maximum_encryption_key_size = (
pairing_config.delegate.maximum_encryption_key_size
)
# Key Distribution (default values before negotiation) # Key Distribution (default values before negotiation)
self.initiator_key_distribution = ( self.initiator_key_distribution = (
pairing_config.delegate.local_initiator_key_distribution pairing_config.delegate.local_initiator_key_distribution
@@ -996,7 +998,7 @@ class Session:
io_capability=self.io_capability, io_capability=self.io_capability,
oob_data_flag=self.oob_data_flag, oob_data_flag=self.oob_data_flag,
auth_req=self.auth_req, auth_req=self.auth_req,
maximum_encryption_key_size=16, maximum_encryption_key_size=self.maximum_encryption_key_size,
initiator_key_distribution=self.initiator_key_distribution, initiator_key_distribution=self.initiator_key_distribution,
responder_key_distribution=self.responder_key_distribution, responder_key_distribution=self.responder_key_distribution,
) )
@@ -1008,7 +1010,7 @@ class Session:
io_capability=self.io_capability, io_capability=self.io_capability,
oob_data_flag=self.oob_data_flag, oob_data_flag=self.oob_data_flag,
auth_req=self.auth_req, auth_req=self.auth_req,
maximum_encryption_key_size=16, maximum_encryption_key_size=self.maximum_encryption_key_size,
initiator_key_distribution=self.initiator_key_distribution, initiator_key_distribution=self.initiator_key_distribution,
responder_key_distribution=self.responder_key_distribution, responder_key_distribution=self.responder_key_distribution,
) )
@@ -1839,7 +1841,7 @@ class Session:
if self.is_initiator: if self.is_initiator:
if self.pairing_method == PairingMethod.OOB: if self.pairing_method == PairingMethod.OOB:
self.send_pairing_random_command() self.send_pairing_random_command()
else: elif self.pairing_method == PairingMethod.PASSKEY:
self.send_pairing_confirm_command() self.send_pairing_confirm_command()
else: else:
if self.pairing_method == PairingMethod.PASSKEY: if self.pairing_method == PairingMethod.PASSKEY:
@@ -1949,7 +1951,7 @@ class Manager(EventEmitter):
f'{connection.peer_address}: {command}' f'{connection.peer_address}: {command}'
) )
cid = SMP_BR_CID if connection.transport == BT_BR_EDR_TRANSPORT else SMP_CID cid = SMP_BR_CID if connection.transport == BT_BR_EDR_TRANSPORT else SMP_CID
connection.send_l2cap_pdu(cid, command.to_bytes()) connection.send_l2cap_pdu(cid, bytes(command))
def on_smp_security_request_command( def on_smp_security_request_command(
self, connection: Connection, request: SMP_Security_Request_Command self, connection: Connection, request: SMP_Security_Request_Command

View File

@@ -370,11 +370,13 @@ class PumpedPacketSource(ParserSource):
self.parser.feed_data(packet) self.parser.feed_data(packet)
except asyncio.CancelledError: except asyncio.CancelledError:
logger.debug('source pump task done') logger.debug('source pump task done')
self.terminated.set_result(None) if not self.terminated.done():
self.terminated.set_result(None)
break break
except Exception as error: except Exception as error:
logger.warning(f'exception while waiting for packet: {error}') logger.warning(f'exception while waiting for packet: {error}')
self.terminated.set_exception(error) if not self.terminated.done():
self.terminated.set_exception(error)
break break
self.pump_task = asyncio.create_task(pump_packets()) self.pump_task = asyncio.create_task(pump_packets())

View File

@@ -149,7 +149,10 @@ async def open_usb_transport(spec: str) -> Transport:
if status != usb1.TRANSFER_COMPLETED: if status != usb1.TRANSFER_COMPLETED:
logger.warning( logger.warning(
color(f'!!! OUT transfer not completed: status={status}', 'red') color(
f'!!! OUT transfer not completed: status={status}',
'red',
)
) )
async def process_queue(self): async def process_queue(self):
@@ -275,7 +278,10 @@ async def open_usb_transport(spec: str) -> Transport:
) )
else: else:
logger.warning( logger.warning(
color(f'!!! IN transfer not completed: status={status}', 'red') color(
f'!!! IN[{packet_type}] transfer not completed: status={status}',
'red',
)
) )
self.loop.call_soon_threadsafe(self.on_transport_lost) self.loop.call_soon_threadsafe(self.on_transport_lost)

View File

@@ -24,17 +24,19 @@ import logging
import sys import sys
import warnings import warnings
from typing import ( from typing import (
Awaitable,
Set,
TypeVar,
List,
Tuple,
Callable,
Any, Any,
Awaitable,
Callable,
List,
Optional, Optional,
Protocol,
Set,
Tuple,
TypeVar,
Union, Union,
overload, overload,
) )
from typing_extensions import Self
from pyee import EventEmitter from pyee import EventEmitter
@@ -487,3 +489,16 @@ class OpenIntEnum(enum.IntEnum):
obj._value_ = value obj._value_ = value
obj._name_ = f"{cls.__name__}[{value}]" obj._name_ = f"{cls.__name__}[{value}]"
return obj return obj
# -----------------------------------------------------------------------------
class ByteSerializable(Protocol):
"""
Type protocol for classes that can be instantiated from bytes and serialized
to bytes.
"""
@classmethod
def from_bytes(cls, data: bytes) -> Self: ...
def __bytes__(self) -> bytes: ...

View File

@@ -16,6 +16,7 @@
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
import struct import struct
from typing import Dict, Optional, Type
from bumble.hci import ( from bumble.hci import (
name_or_number, name_or_number,
@@ -24,7 +25,9 @@ from bumble.hci import (
HCI_Constant, HCI_Constant,
HCI_Object, HCI_Object,
HCI_Command, HCI_Command,
HCI_Vendor_Event, HCI_Event,
HCI_Extended_Event,
HCI_VENDOR_EVENT,
STATUS_SPEC, STATUS_SPEC,
) )
@@ -48,7 +51,6 @@ HCI_DYNAMIC_AUDIO_BUFFER_COMMAND = hci_vendor_command_op_code(0x15F)
HCI_BLUETOOTH_QUALITY_REPORT_EVENT = 0x58 HCI_BLUETOOTH_QUALITY_REPORT_EVENT = 0x58
HCI_Command.register_commands(globals()) HCI_Command.register_commands(globals())
HCI_Vendor_Event.register_subevents(globals())
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -279,7 +281,29 @@ class HCI_Dynamic_Audio_Buffer_Command(HCI_Command):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Vendor_Event.event( class HCI_Android_Vendor_Event(HCI_Extended_Event):
event_code: int = HCI_VENDOR_EVENT
subevent_classes: Dict[int, Type[HCI_Extended_Event]] = {}
@classmethod
def subclass_from_parameters(
cls, parameters: bytes
) -> Optional[HCI_Extended_Event]:
subevent_code = parameters[0]
if subevent_code == HCI_BLUETOOTH_QUALITY_REPORT_EVENT:
quality_report_id = parameters[1]
if quality_report_id in (0x01, 0x02, 0x03, 0x04, 0x07, 0x08, 0x09):
return HCI_Bluetooth_Quality_Report_Event.from_parameters(parameters)
return None
HCI_Android_Vendor_Event.register_subevents(globals())
HCI_Event.vendor_factory = HCI_Android_Vendor_Event.subclass_from_parameters
# -----------------------------------------------------------------------------
@HCI_Extended_Event.event(
fields=[ fields=[
('quality_report_id', 1), ('quality_report_id', 1),
('packet_types', 1), ('packet_types', 1),
@@ -308,10 +332,11 @@ class HCI_Dynamic_Audio_Buffer_Command(HCI_Command):
('tx_last_subevent_packets', 4), ('tx_last_subevent_packets', 4),
('crc_error_packets', 4), ('crc_error_packets', 4),
('rx_duplicate_packets', 4), ('rx_duplicate_packets', 4),
('rx_unreceived_packets', 4),
('vendor_specific_parameters', '*'), ('vendor_specific_parameters', '*'),
] ]
) )
class HCI_Bluetooth_Quality_Report_Event(HCI_Vendor_Event): class HCI_Bluetooth_Quality_Report_Event(HCI_Android_Vendor_Event):
# pylint: disable=line-too-long # pylint: disable=line-too-long
''' '''
See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#bluetooth-quality-report-sub-event See https://source.android.com/docs/core/connect/bluetooth/hci_requirements#bluetooth-quality-report-sub-event

View File

@@ -17,3 +17,4 @@ USB vendor ID and product ID.
Drivers included in the module are: Drivers included in the module are:
* [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles. * [Realtek](realtek.md): Loading of Firmware and Config for Realtek USB dongles.
* [Intel](intel.md): Loading of Firmware and Config for Intel USB controllers.

View File

@@ -0,0 +1,73 @@
INTEL DRIVER
==============
This driver supports loading firmware images and optional config data to
Intel USB controllers.
A number of USB dongles are supported, but likely not all.
The initial implementation has been tested on BE200 and AX210 controllers.
When using a USB controller, the USB product ID and vendor ID are used
to find whether a matching set of firmware image and config data
is needed for that specific model. If a match exists, the driver will try
load the firmware image and, if needed, config data.
Alternatively, the metadata property ``driver=intel`` may be specified in a transport
name to force that driver to be used (ex: ``usb:[driver=intel]0`` instead of just
``usb:0`` for the first USB device).
The driver will look for the firmware and config files by name, in order, in:
* The directory specified by the environment variable `BUMBLE_INTEL_FIRMWARE_DIR`
if set.
* The directory `<package-dir>/drivers/intel_fw` where `<package-dir>` is the directory
where the `bumble` package is installed.
* The current directory.
It is also possible to override or extend the config data with parameters passed via the
transport name. The driver name `intel` may be suffixed with `/<param:value>[+<param:value>]...`
The supported params are:
* `ddc_addon`: configuration data to add to the data loaded from the config data file
* `ddc_override`: configuration data to use instead of the data loaded from the config data file.
With both `dcc_addon` and `dcc_override`, the param value is a hex-encoded byte array containing
the config data (same format as the config file).
Example transport name:
`usb:[driver=intel/dcc_addon:03E40200]0`
Obtaining Firmware Images and Config Data
-----------------------------------------
Firmware images and config data may be obtained from a variety of online
sources.
To facilitate finding a downloading the, the utility program `bumble-intel-fw-download`
may be used.
```
Usage: bumble-intel-fw-download [OPTIONS]
Download Intel firmware images and configs.
Options:
--output-dir TEXT Output directory where the files will be saved.
Defaults to the OS-specific app data dir, which the
driver will check when trying to find firmware
--source [linux-kernel] [default: linux-kernel]
--single TEXT Only download a single image set, by its base name
--force Overwrite files if they already exist
--help Show this message and exit.
```
Utility
-------
The `bumble-intel-util` utility may be used to interact with an Intel USB controller.
```
Usage: bumble-intel-util [OPTIONS] COMMAND [ARGS]...
Options:
--help Show this message and exit.
Commands:
bootloader Reboot in bootloader mode.
info Get the firmware info.
load Load a firmware image.
```

View File

@@ -282,7 +282,7 @@ async def keyboard_device(device, command):
GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC, GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC,
Characteristic.Properties.READ, Characteristic.Properties.READ,
Characteristic.READABLE, Characteristic.READABLE,
'Bumble', bytes('Bumble', 'utf-8'),
) )
], ],
), ),

View File

@@ -127,7 +127,7 @@ async def main() -> None:
'486F64C6-4B5F-4B3B-8AFF-EDE134A8446A', '486F64C6-4B5F-4B3B-8AFF-EDE134A8446A',
Characteristic.Properties.READ | Characteristic.Properties.NOTIFY, Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
Characteristic.READABLE, Characteristic.READABLE,
'hello', bytes('hello', 'utf-8'),
), ),
], ],
) )

View File

@@ -0,0 +1,319 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio
import dataclasses
import logging
import os
import random
import struct
import sys
from typing import Any, List, Union
from bumble.device import Connection, Device, Peer
from bumble import transport
from bumble import gatt
from bumble import hci
from bumble import core
# -----------------------------------------------------------------------------
SERVICE_UUID = core.UUID("50DB505C-8AC4-4738-8448-3B1D9CC09CC5")
CHARACTERISTIC_UUID_BASE = "D901B45B-4916-412E-ACCA-0000000000"
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class CustomSerializableClass:
x: int
y: int
@classmethod
def from_bytes(cls, data: bytes) -> CustomSerializableClass:
return cls(*struct.unpack(">II", data))
def __bytes__(self) -> bytes:
return struct.pack(">II", self.x, self.y)
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class CustomClass:
a: int
b: int
@classmethod
def decode(cls, data: bytes) -> CustomClass:
return cls(*struct.unpack(">II", data))
def encode(self) -> bytes:
return struct.pack(">II", self.a, self.b)
# -----------------------------------------------------------------------------
async def client(device: Device, address: hci.Address) -> None:
print(f'=== Connecting to {address}...')
connection = await device.connect(address)
print('=== Connected')
# Discover all characteristics.
peer = Peer(connection)
print("*** Discovering services and characteristics...")
await peer.discover_all()
print("*** Discovery complete")
service = peer.get_services_by_uuid(SERVICE_UUID)[0]
characteristics = []
for index in range(1, 9):
characteristics.append(
service.get_characteristics_by_uuid(
CHARACTERISTIC_UUID_BASE + f"{index:02X}"
)[0]
)
# Read all characteristics as raw bytes.
for characteristic in characteristics:
value = await characteristic.read_value()
print(f"### {characteristic} = {value} ({value.hex()})")
# Static characteristic with a bytes value.
c1 = characteristics[0]
c1_value = await c1.read_value()
print(f"@@@ C1 {c1} value = {c1_value} (type={type(c1_value)})")
await c1.write_value("happy π day".encode("utf-8"))
# Static characteristic with a string value.
c2 = gatt.UTF8CharacteristicAdapter(characteristics[1])
c2_value = await c2.read_value()
print(f"@@@ C2 {c2} value = {c2_value} (type={type(c2_value)})")
await c2.write_value("happy π day")
# Static characteristic with a tuple value.
c3 = gatt.PackedCharacteristicAdapter(characteristics[2], ">III")
c3_value = await c3.read_value()
print(f"@@@ C3 {c3} value = {c3_value} (type={type(c3_value)})")
await c3.write_value((2001, 2002, 2003))
# Static characteristic with a named tuple value.
c4 = gatt.MappedCharacteristicAdapter(
characteristics[3], ">III", ["f1", "f2", "f3"]
)
c4_value = await c4.read_value()
print(f"@@@ C4 {c4} value = {c4_value} (type={type(c4_value)})")
await c4.write_value({"f1": 4001, "f2": 4002, "f3": 4003})
# Static characteristic with a serializable value.
c5 = gatt.SerializableCharacteristicAdapter(
characteristics[4], CustomSerializableClass
)
c5_value = await c5.read_value()
print(f"@@@ C5 {c5} value = {c5_value} (type={type(c5_value)})")
await c5.write_value(CustomSerializableClass(56, 57))
# Static characteristic with a delegated value.
c6 = gatt.DelegatedCharacteristicAdapter(
characteristics[5], encode=CustomClass.encode, decode=CustomClass.decode
)
c6_value = await c6.read_value()
print(f"@@@ C6 {c6} value = {c6_value} (type={type(c6_value)})")
await c6.write_value(CustomClass(6, 7))
# Dynamic characteristic with a bytes value.
c7 = characteristics[6]
c7_value = await c7.read_value()
print(f"@@@ C7 {c7} value = {c7_value} (type={type(c7_value)})")
await c7.write_value(bytes.fromhex("01020304"))
# Dynamic characteristic with a string value.
c8 = gatt.UTF8CharacteristicAdapter(characteristics[7])
c8_value = await c8.read_value()
print(f"@@@ C8 {c8} value = {c8_value} (type={type(c8_value)})")
await c8.write_value("howdy")
# -----------------------------------------------------------------------------
def dynamic_read(selector: str) -> Union[bytes, str]:
if selector == "bytes":
print("$$$ Returning random bytes")
return random.randbytes(7)
elif selector == "string":
print("$$$ Returning random string")
return random.randbytes(7).hex()
raise ValueError("invalid selector")
# -----------------------------------------------------------------------------
def dynamic_write(selector: str, value: Any) -> None:
print(f"$$$ Received[{selector}]: {value} (type={type(value)})")
# -----------------------------------------------------------------------------
def on_characteristic_read(characteristic: gatt.Characteristic, value: Any) -> None:
"""Event listener invoked when a characteristic is read."""
print(f"<<< READ: {characteristic} -> {value} ({type(value)})")
# -----------------------------------------------------------------------------
def on_characteristic_write(characteristic: gatt.Characteristic, value: Any) -> None:
"""Event listener invoked when a characteristic is written."""
print(f"<<< WRITE: {characteristic} <- {value} ({type(value)})")
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 2:
print("Usage: run_gatt_with_adapters.py <transport-spec> [<bluetooth-address>]")
print("example: run_gatt_with_adapters.py usb:0 E1:CA:72:48:C4:E8")
return
async with await transport.open_transport(sys.argv[1]) as hci_transport:
# Create a device to manage the host
device = Device.with_hci(
"Bumble",
hci.Address("F0:F1:F2:F3:F4:F5"),
hci_transport.source,
hci_transport.sink,
)
# Static characteristic with a bytes value.
c1 = gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "01",
gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
b'hello',
)
# Static characteristic with a string value.
c2 = gatt.UTF8CharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "02",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
'hello',
)
)
# Static characteristic with a tuple value.
c3 = gatt.PackedCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "03",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
(1007, 1008, 1009),
),
">III",
)
# Static characteristic with a named tuple value.
c4 = gatt.MappedCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "04",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
{"f1": 3007, "f2": 3008, "f3": 3009},
),
">III",
["f1", "f2", "f3"],
)
# Static characteristic with a serializable value.
c5 = gatt.SerializableCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "05",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
CustomSerializableClass(11, 12),
),
CustomSerializableClass,
)
# Static characteristic with a delegated value.
c6 = gatt.DelegatedCharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "06",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
CustomClass(1, 2),
),
encode=CustomClass.encode,
decode=CustomClass.decode,
)
# Dynamic characteristic with a bytes value.
c7 = gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "07",
gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(
read=lambda connection: dynamic_read("bytes"),
write=lambda connection, value: dynamic_write("bytes", value),
),
)
# Dynamic characteristic with a string value.
c8 = gatt.UTF8CharacteristicAdapter(
gatt.Characteristic(
CHARACTERISTIC_UUID_BASE + "08",
gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.WRITE,
gatt.Characteristic.READABLE | gatt.Characteristic.WRITEABLE,
gatt.CharacteristicValue(
read=lambda connection: dynamic_read("string"),
write=lambda connection, value: dynamic_write("string", value),
),
)
)
characteristics: List[
Union[gatt.Characteristic, gatt.CharacteristicAdapter]
] = [c1, c2, c3, c4, c5, c6, c7, c8]
# Listen for read and write events.
for characteristic in characteristics:
characteristic.on(
"read",
lambda _, value, c=characteristic: on_characteristic_read(c, value),
)
characteristic.on(
"write",
lambda _, value, c=characteristic: on_characteristic_write(c, value),
)
device.add_service(gatt.Service(SERVICE_UUID, characteristics)) # type: ignore
# Get things going
await device.power_on()
# Connect to a peer
if len(sys.argv) > 2:
await client(device, hci.Address(sys.argv[2]))
else:
await device.start_advertising(auto_restart=True)
await hci_transport.source.wait_for_termination()
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(main())

View File

@@ -21,9 +21,9 @@ import sys
import os import os
import io import io
import logging import logging
import websockets from typing import Iterable, Optional
from typing import Optional import websockets
import bumble.core import bumble.core
from bumble.device import Device, ScoLink from bumble.device import Device, ScoLink
@@ -82,6 +82,10 @@ def on_microphone_volume(level: int):
send_message(type='microphone_volume', level=level) send_message(type='microphone_volume', level=level)
def on_supported_audio_codecs(codecs: Iterable[hfp.AudioCodec]):
send_message(type='supported_audio_codecs', codecs=[codec.name for codec in codecs])
def on_sco_state_change(codec: int): def on_sco_state_change(codec: int):
if codec == hfp.AudioCodec.CVSD: if codec == hfp.AudioCodec.CVSD:
sample_rate = 8000 sample_rate = 8000
@@ -207,6 +211,7 @@ async def main() -> None:
ag_protocol = hfp.AgProtocol(dlc, configuration) ag_protocol = hfp.AgProtocol(dlc, configuration)
ag_protocol.on('speaker_volume', on_speaker_volume) ag_protocol.on('speaker_volume', on_speaker_volume)
ag_protocol.on('microphone_volume', on_microphone_volume) ag_protocol.on('microphone_volume', on_microphone_volume)
ag_protocol.on('supported_audio_codecs', on_supported_audio_codecs)
on_hfp_state_change(True) on_hfp_state_change(True)
dlc.multiplexer.l2cap_channel.on( dlc.multiplexer.l2cap_channel.on(
'close', lambda: on_hfp_state_change(False) 'close', lambda: on_hfp_state_change(False)
@@ -241,7 +246,7 @@ async def main() -> None:
# Pick the first one # Pick the first one
channel, version, hf_sdp_features = hfp_record channel, version, hf_sdp_features = hfp_record
print(f'HF version: {version}') print(f'HF version: {version}')
print(f'HF features: {hf_sdp_features}') print(f'HF features: {hf_sdp_features.name}')
# Request authentication # Request authentication
print('*** Authenticating...') print('*** Authenticating...')

View File

@@ -161,7 +161,13 @@ async def main() -> None:
else: else:
file_output = open(f'{datetime.datetime.now().isoformat()}.lc3', 'wb') file_output = open(f'{datetime.datetime.now().isoformat()}.lc3', 'wb')
codec_configuration = ase.codec_specific_configuration codec_configuration = ase.codec_specific_configuration
assert isinstance(codec_configuration, CodecSpecificConfiguration) if (
not isinstance(codec_configuration, CodecSpecificConfiguration)
or codec_configuration.sampling_frequency is None
or codec_configuration.audio_channel_allocation is None
or codec_configuration.frame_duration is None
):
return
# Write a LC3 header. # Write a LC3 header.
file_output.write( file_output.write(
bytes([0x1C, 0xCC]) # Header. bytes([0x1C, 0xCC]) # Header.

View File

@@ -1,21 +1,131 @@
[build-system] [build-system]
requires = ["setuptools>=52", "wheel", "setuptools_scm>=6.2"] requires = ["setuptools>=61", "wheel", "setuptools_scm>=8"]
build-backend = "setuptools.build_meta" build-backend = "setuptools.build_meta"
[project]
name = "bumble"
dynamic = ["version"]
description = "Bluetooth Stack for Apps, Emulation, Test and Experimentation"
readme = "README.md"
authors = [{ name = "Google", email = "bumble-dev@google.com" }]
requires-python = ">=3.8"
dependencies = [
"aiohttp ~= 3.8; platform_system!='Emscripten'",
"appdirs >= 1.4; platform_system!='Emscripten'",
"click >= 8.1.3; platform_system!='Emscripten'",
"cryptography >= 39; platform_system!='Emscripten'",
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the
# versions available on PyPI. Relax the version requirement since it's better than being
# completely unable to import the package in case of version mismatch.
"cryptography >= 39.0; platform_system=='Emscripten'",
"grpcio >= 1.62.1; platform_system!='Emscripten'",
"humanize >= 4.6.0; platform_system!='Emscripten'",
"libusb1 >= 2.0.1; platform_system!='Emscripten'",
"libusb-package == 1.0.26.1; platform_system!='Emscripten'",
"platformdirs >= 3.10.0; platform_system!='Emscripten'",
"prompt_toolkit >= 3.0.16; platform_system!='Emscripten'",
"prettytable >= 3.6.0; platform_system!='Emscripten'",
"protobuf >= 3.12.4; platform_system!='Emscripten'",
"pyee >= 8.2.2",
"pyserial-asyncio >= 0.5; platform_system!='Emscripten'",
"pyserial >= 3.5; platform_system!='Emscripten'",
"pyusb >= 1.2; platform_system!='Emscripten'",
"websockets == 13.1; platform_system!='Emscripten'",
]
[project.optional-dependencies]
build = ["build >= 0.7"]
test = [
"pytest >= 8.2",
"pytest-asyncio >= 0.23.5",
"pytest-html >= 3.2.0",
"coverage >= 6.4",
]
development = [
"black == 24.3",
"bt-test-interfaces >= 0.0.6",
"grpcio-tools >= 1.62.1",
"invoke >= 1.7.3",
"mobly >= 1.12.2",
"mypy == 1.12.0",
"nox >= 2022",
"pylint == 3.3.1",
"pyyaml >= 6.0",
"types-appdirs >= 1.4.3",
"types-invoke >= 1.7.3",
"types-protobuf >= 4.21.0",
"wasmtime == 20.0.0",
]
avatar = [
"pandora-avatar == 0.0.10",
"rootcanal == 1.10.0 ; python_version>='3.10'",
]
pandora = ["bt-test-interfaces >= 0.0.6"]
documentation = [
"mkdocs >= 1.4.0",
"mkdocs-material >= 8.5.6",
"mkdocstrings[python] >= 0.19.0",
]
[project.scripts]
bumble-ble-rpa-tool = "bumble.apps.ble_rpa_tool:main"
bumble-console = "bumble.apps.console:main"
bumble-controller-info = "bumble.apps.controller_info:main"
bumble-controller-loopback = "bumble.apps.controller_loopback:main"
bumble-gatt-dump = "bumble.apps.gatt_dump:main"
bumble-hci-bridge = "bumble.apps.hci_bridge:main"
bumble-l2cap-bridge = "bumble.apps.l2cap_bridge:main"
bumble-rfcomm-bridge = "bumble.apps.rfcomm_bridge:main"
bumble-pair = "bumble.apps.pair:main"
bumble-scan = "bumble.apps.scan:main"
bumble-show = "bumble.apps.show:main"
bumble-unbond = "bumble.apps.unbond:main"
bumble-usb-probe = "bumble.apps.usb_probe:main"
bumble-link-relay = "bumble.apps.link_relay.link_relay:main"
bumble-bench = "bumble.apps.bench:main"
bumble-player = "bumble.apps.player.player:main"
bumble-speaker = "bumble.apps.speaker.speaker:main"
bumble-pandora-server = "bumble.apps.pandora_server:main"
bumble-rtk-util = "bumble.tools.rtk_util:main"
bumble-rtk-fw-download = "bumble.tools.rtk_fw_download:main"
bumble-intel-util = "bumble.tools.intel_util:main"
bumble-intel-fw-download = "bumble.tools.intel_fw_download:main"
[project.urls]
Homepage = "https://github.com/google/bumble"
[tool.setuptools]
packages = [
"bumble",
"bumble.transport",
"bumble.transport.grpc_protobuf",
"bumble.drivers",
"bumble.profiles",
"bumble.apps",
"bumble.apps.link_relay",
"bumble.pandora",
"bumble.tools",
]
[tool.setuptools.package-dir]
"bumble" = "bumble"
"bumble.apps" = "apps"
"bumble.tools" = "tools"
[tool.setuptools_scm] [tool.setuptools_scm]
write_to = "bumble/_version.py" write_to = "bumble/_version.py"
[tool.setuptools.package-data]
"*" = ["*.pyi", "py.typed"]
[tool.pytest.ini_options] [tool.pytest.ini_options]
pythonpath = "." pythonpath = "."
testpaths = [ testpaths = ["tests"]
"tests"
]
[tool.pylint.master] [tool.pylint.master]
init-hook = 'import sys; sys.path.append(".")' init-hook = 'import sys; sys.path.append(".")'
ignore-paths = [ ignore-paths = ['.*_pb2(_grpc)?.py']
'.*_pb2(_grpc)?.py'
]
[tool.pylint.messages_control] [tool.pylint.messages_control]
max-line-length = "88" max-line-length = "88"
@@ -25,8 +135,8 @@ disable = [
"fixme", "fixme",
"logging-fstring-interpolation", "logging-fstring-interpolation",
"logging-not-lazy", "logging-not-lazy",
"no-member", # Temporary until pylint works better with class/method decorators "no-member", # Temporary until pylint works better with class/method decorators
"no-value-for-parameter", # Temporary until pylint works better with class/method decorators "no-value-for-parameter", # Temporary until pylint works better with class/method decorators
"missing-class-docstring", "missing-class-docstring",
"missing-function-docstring", "missing-function-docstring",
"missing-module-docstring", "missing-module-docstring",
@@ -41,11 +151,11 @@ disable = [
] ]
[tool.pylint.main] [tool.pylint.main]
ignore="pandora" # FIXME: pylint does not support stubs yet: ignore = "pandora" # FIXME: pylint does not support stubs yet:
[tool.pylint.typecheck] [tool.pylint.typecheck]
signature-mutators="AsyncRunner.run_in_task" signature-mutators = "AsyncRunner.run_in_task"
disable="not-callable" disable = "not-callable"
[tool.black] [tool.black]
skip-string-normalization = true skip-string-normalization = true
@@ -85,4 +195,3 @@ ignore_missing_imports = true
[[tool.mypy.overrides]] [[tool.mypy.overrides]]
module = "usb1.*" module = "usb1.*"
ignore_missing_imports = true ignore_missing_imports = true

View File

@@ -80,7 +80,7 @@ impl Address {
/// Creates a new [Address] object. /// Creates a new [Address] object.
pub fn new(address: &str, address_type: AddressType) -> PyResult<Self> { pub fn new(address: &str, address_type: AddressType) -> PyResult<Self> {
Python::with_gil(|py| { Python::with_gil(|py| {
PyModule::import(py, intern!(py, "bumble.device"))? PyModule::import(py, intern!(py, "bumble.hci"))?
.getattr(intern!(py, "Address"))? .getattr(intern!(py, "Address"))?
.call1((address, address_type)) .call1((address, address_type))
.map(|any| Self(any.into())) .map(|any| Self(any.into()))

111
setup.cfg
View File

@@ -1,111 +0,0 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[metadata]
name = bumble
use_scm_version = True
description = Bluetooth Stack for Apps, Emulation, Test and Experimentation
long_description = file: README.md
long_description_content_type = text/markdown
author = Google
author_email = tbd@tbd.com
url = https://github.com/google/bumble
[options]
python_requires = >=3.8
packages = bumble, bumble.transport, bumble.transport.grpc_protobuf, bumble.drivers, bumble.profiles, bumble.apps, bumble.apps.link_relay, bumble.pandora, bumble.tools
package_dir =
bumble = bumble
bumble.apps = apps
bumble.tools = tools
include_package_data = True
install_requires =
aiohttp ~= 3.8; platform_system!='Emscripten'
appdirs >= 1.4; platform_system!='Emscripten'
click >= 8.1.3; platform_system!='Emscripten'
cryptography == 39; platform_system!='Emscripten'
# Pyodide bundles a version of cryptography that is built for wasm, which may not match the
# versions available on PyPI. Relax the version requirement since it's better than being
# completely unable to import the package in case of version mismatch.
cryptography >= 39.0; platform_system=='Emscripten'
grpcio >= 1.62.1; platform_system!='Emscripten'
humanize >= 4.6.0; platform_system!='Emscripten'
libusb1 >= 2.0.1; platform_system!='Emscripten'
libusb-package == 1.0.26.1; platform_system!='Emscripten'
platformdirs >= 3.10.0; platform_system!='Emscripten'
prompt_toolkit >= 3.0.16; platform_system!='Emscripten'
prettytable >= 3.6.0; platform_system!='Emscripten'
protobuf >= 3.12.4; platform_system!='Emscripten'
pyee >= 8.2.2
pyserial-asyncio >= 0.5; platform_system!='Emscripten'
pyserial >= 3.5; platform_system!='Emscripten'
pyusb >= 1.2; platform_system!='Emscripten'
websockets >= 12.0; platform_system!='Emscripten'
[options.entry_points]
console_scripts =
bumble-ble-rpa-tool = bumble.apps.ble_rpa_tool:main
bumble-console = bumble.apps.console:main
bumble-controller-info = bumble.apps.controller_info:main
bumble-controller-loopback = bumble.apps.controller_loopback:main
bumble-gatt-dump = bumble.apps.gatt_dump:main
bumble-hci-bridge = bumble.apps.hci_bridge:main
bumble-l2cap-bridge = bumble.apps.l2cap_bridge:main
bumble-rfcomm-bridge = bumble.apps.rfcomm_bridge:main
bumble-pair = bumble.apps.pair:main
bumble-scan = bumble.apps.scan:main
bumble-show = bumble.apps.show:main
bumble-unbond = bumble.apps.unbond:main
bumble-usb-probe = bumble.apps.usb_probe:main
bumble-link-relay = bumble.apps.link_relay.link_relay:main
bumble-bench = bumble.apps.bench:main
bumble-player = bumble.apps.player.player:main
bumble-speaker = bumble.apps.speaker.speaker:main
bumble-pandora-server = bumble.apps.pandora_server:main
bumble-rtk-util = bumble.tools.rtk_util:main
bumble-rtk-fw-download = bumble.tools.rtk_fw_download:main
[options.package_data]
* = py.typed, *.pyi
[options.extras_require]
build =
build >= 0.7
test =
pytest >= 8.2
pytest-asyncio >= 0.23.5
pytest-html >= 3.2.0
coverage >= 6.4
development =
black == 24.3
grpcio-tools >= 1.62.1
invoke >= 1.7.3
mobly >= 1.12.2
mypy == 1.12.0
nox >= 2022
pylint == 3.3.1
pyyaml >= 6.0
types-appdirs >= 1.4.3
types-invoke >= 1.7.3
types-protobuf >= 4.21.0
wasmtime == 20.0.0
avatar =
pandora-avatar == 0.0.10
rootcanal == 1.10.0 ; python_version>='3.10'
pandora =
bt-test-interfaces >= 0.0.6
documentation =
mkdocs >= 1.4.0
mkdocs-material >= 8.5.6
mkdocstrings[python] >= 0.19.0

View File

@@ -1,17 +0,0 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from setuptools import setup
setup()

View File

@@ -28,6 +28,7 @@ from bumble.profiles.aics import (
AudioInputState, AudioInputState,
AICSServiceProxy, AICSServiceProxy,
GainMode, GainMode,
GainSettingsProperties,
AudioInputStatus, AudioInputStatus,
AudioInputControlPointOpCode, AudioInputControlPointOpCode,
ErrorCode, ErrorCode,
@@ -82,7 +83,12 @@ async def test_init_service(aics_client: AICSServiceProxy):
gain_mode=GainMode.MANUAL, gain_mode=GainMode.MANUAL,
change_counter=0, change_counter=0,
) )
assert await aics_client.gain_settings_properties.read_value() == (1, 0, 255) assert (
await aics_client.gain_settings_properties.read_value()
== GainSettingsProperties(
gain_settings_unit=1, gain_settings_minimum=0, gain_settings_maximum=255
)
)
assert await aics_client.audio_input_status.read_value() == ( assert await aics_client.audio_input_status.read_value() == (
AudioInputStatus.ACTIVE AudioInputStatus.ACTIVE
) )
@@ -481,12 +487,12 @@ async def test_set_automatic_gain_mode_when_automatic_only(
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_audio_input_description_initial_value(aics_client: AICSServiceProxy): async def test_audio_input_description_initial_value(aics_client: AICSServiceProxy):
description = await aics_client.audio_input_description.read_value() description = await aics_client.audio_input_description.read_value()
assert description.decode('utf-8') == "Bluetooth" assert description == "Bluetooth"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_audio_input_description_write_and_read(aics_client: AICSServiceProxy): async def test_audio_input_description_write_and_read(aics_client: AICSServiceProxy):
new_description = "Line Input".encode('utf-8') new_description = "Line Input"
await aics_client.audio_input_description.write_value(new_description) await aics_client.audio_input_description.write_value(new_description)

View File

@@ -39,6 +39,8 @@ from bumble.profiles.ascs import (
) )
from bumble.profiles.bap import ( from bumble.profiles.bap import (
AudioLocation, AudioLocation,
BasicAudioAnnouncement,
BroadcastAudioAnnouncement,
SupportedFrameDuration, SupportedFrameDuration,
SupportedSamplingFrequency, SupportedSamplingFrequency,
SamplingFrequency, SamplingFrequency,
@@ -200,6 +202,56 @@ def test_codec_specific_configuration() -> None:
assert CodecSpecificConfiguration.from_bytes(bytes(config)) == config assert CodecSpecificConfiguration.from_bytes(bytes(config)) == config
# -----------------------------------------------------------------------------
def test_broadcast_audio_announcement() -> None:
broadcast_audio_announcement = BroadcastAudioAnnouncement(123456)
assert (
BroadcastAudioAnnouncement.from_bytes(bytes(broadcast_audio_announcement))
== broadcast_audio_announcement
)
# -----------------------------------------------------------------------------
def test_basic_audio_announcement() -> None:
basic_audio_announcement = BasicAudioAnnouncement(
presentation_delay=40000,
subgroups=[
BasicAudioAnnouncement.Subgroup(
codec_id=CodingFormat(codec_id=CodecID.LC3),
codec_specific_configuration=CodecSpecificConfiguration(
sampling_frequency=SamplingFrequency.FREQ_48000,
frame_duration=FrameDuration.DURATION_10000_US,
octets_per_codec_frame=100,
),
metadata=Metadata(
[
Metadata.Entry(tag=Metadata.Tag.LANGUAGE, data=b'eng'),
Metadata.Entry(tag=Metadata.Tag.PROGRAM_INFO, data=b'Disco'),
]
),
bis=[
BasicAudioAnnouncement.BIS(
index=0,
codec_specific_configuration=CodecSpecificConfiguration(
audio_channel_allocation=AudioLocation.FRONT_LEFT
),
),
BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=CodecSpecificConfiguration(
audio_channel_allocation=AudioLocation.FRONT_RIGHT
),
),
],
)
],
)
assert (
BasicAudioAnnouncement.from_bytes(bytes(basic_audio_announcement))
== basic_audio_announcement
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_pacs(): async def test_pacs():

View File

@@ -19,9 +19,7 @@ import asyncio
import functools import functools
import logging import logging
import os import os
from types import LambdaType
import pytest import pytest
from unittest import mock
from bumble.core import ( from bumble.core import (
BT_BR_EDR_TRANSPORT, BT_BR_EDR_TRANSPORT,
@@ -29,7 +27,13 @@ from bumble.core import (
BT_PERIPHERAL_ROLE, BT_PERIPHERAL_ROLE,
ConnectionParameters, ConnectionParameters,
) )
from bumble.device import AdvertisingParameters, Connection, Device from bumble.device import (
AdvertisingEventProperties,
AdvertisingParameters,
Connection,
Device,
PeriodicAdvertisingParameters,
)
from bumble.host import AclPacketQueue, Host from bumble.host import AclPacketQueue, Host
from bumble.hci import ( from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND, HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
@@ -265,7 +269,8 @@ async def test_flush():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_legacy_advertising(): async def test_legacy_advertising():
device = Device(host=mock.AsyncMock(Host)) device = TwoDevices()[0]
await device.power_on()
# Start advertising # Start advertising
await device.start_advertising() await device.start_advertising()
@@ -283,7 +288,10 @@ async def test_legacy_advertising():
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_legacy_advertising_disconnection(auto_restart): async def test_legacy_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host)) devices = TwoDevices()
device = devices[0]
devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff')
await device.power_on()
peer_address = Address('F0:F1:F2:F3:F4:F5') peer_address = Address('F0:F1:F2:F3:F4:F5')
await device.start_advertising(auto_restart=auto_restart) await device.start_advertising(auto_restart=auto_restart)
device.on_connection( device.on_connection(
@@ -305,6 +313,11 @@ async def test_legacy_advertising_disconnection(auto_restart):
await async_barrier() await async_barrier()
if auto_restart: if auto_restart:
assert device.legacy_advertising_set
started = asyncio.Event()
if not device.is_advertising:
device.legacy_advertising_set.once('start', started.set)
await asyncio.wait_for(started.wait(), _TIMEOUT)
assert device.is_advertising assert device.is_advertising
else: else:
assert not device.is_advertising assert not device.is_advertising
@@ -313,7 +326,8 @@ async def test_legacy_advertising_disconnection(auto_restart):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_extended_advertising(): async def test_extended_advertising():
device = Device(host=mock.AsyncMock(Host)) device = TwoDevices()[0]
await device.power_on()
# Start advertising # Start advertising
advertising_set = await device.create_advertising_set() advertising_set = await device.create_advertising_set()
@@ -332,7 +346,8 @@ async def test_extended_advertising():
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_extended_advertising_connection(own_address_type): async def test_extended_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host)) device = TwoDevices()[0]
await device.power_on()
peer_address = Address('F0:F1:F2:F3:F4:F5') peer_address = Address('F0:F1:F2:F3:F4:F5')
advertising_set = await device.create_advertising_set( advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type) advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
@@ -368,8 +383,10 @@ async def test_extended_advertising_connection(own_address_type):
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_extended_advertising_connection_out_of_order(own_address_type): async def test_extended_advertising_connection_out_of_order(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host)) devices = TwoDevices()
peer_address = Address('F0:F1:F2:F3:F4:F5') device = devices[0]
devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff')
await device.power_on()
advertising_set = await device.create_advertising_set( advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type) advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
) )
@@ -382,7 +399,7 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
device.on_connection( device.on_connection(
0x0001, 0x0001,
BT_LE_TRANSPORT, BT_LE_TRANSPORT,
peer_address, Address('F0:F1:F2:F3:F4:F5'),
None, None,
None, None,
BT_PERIPHERAL_ROLE, BT_PERIPHERAL_ROLE,
@@ -397,6 +414,34 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
await async_barrier() await async_barrier()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_periodic_advertising():
device = TwoDevices()[0]
await device.power_on()
# Start advertising
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(
advertising_event_properties=AdvertisingEventProperties(
is_connectable=False
)
),
advertising_data=b'123',
periodic_advertising_parameters=PeriodicAdvertisingParameters(),
periodic_advertising_data=b'abc',
)
assert device.extended_advertising_sets
assert advertising_set.enabled
assert not advertising_set.periodic_enabled
await advertising_set.start_periodic()
assert advertising_set.periodic_enabled
await advertising_set.stop_periodic()
assert not advertising_set.periodic_enabled
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_remote_le_features(): async def test_get_remote_le_features():

View File

@@ -15,11 +15,13 @@
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
# Imports # Imports
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
from __future__ import annotations
import asyncio import asyncio
import logging import logging
import os import os
import struct import struct
import pytest import pytest
from typing_extensions import Self
from unittest.mock import AsyncMock, Mock, ANY from unittest.mock import AsyncMock, Mock, ANY
from bumble.controller import Controller from bumble.controller import Controller
@@ -31,6 +33,7 @@ from bumble.gatt import (
GATT_BATTERY_LEVEL_CHARACTERISTIC, GATT_BATTERY_LEVEL_CHARACTERISTIC,
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR, GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
CharacteristicAdapter, CharacteristicAdapter,
SerializableCharacteristicAdapter,
DelegatedCharacteristicAdapter, DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter, PackedCharacteristicAdapter,
MappedCharacteristicAdapter, MappedCharacteristicAdapter,
@@ -57,7 +60,7 @@ from .test_utils import async_barrier
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def basic_check(x): def basic_check(x):
pdu = x.to_bytes() pdu = bytes(x)
parsed = ATT_PDU.from_bytes(pdu) parsed = ATT_PDU.from_bytes(pdu)
x_str = str(x) x_str = str(x)
parsed_str = str(parsed) parsed_str = str(parsed)
@@ -74,7 +77,7 @@ def test_UUID():
assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
v = UUID(str(u)) v = UUID(str(u))
assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
w = UUID.from_bytes(v.to_bytes()) w = UUID.from_bytes(bytes(v))
assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
u1 = UUID.from_16_bits(0x1234) u1 = UUID.from_16_bits(0x1234)
@@ -310,7 +313,7 @@ async def test_attribute_getters():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_CharacteristicAdapter(): async def test_CharacteristicAdapter() -> None:
# Check that the CharacteristicAdapter base class is transparent # Check that the CharacteristicAdapter base class is transparent
v = bytes([1, 2, 3]) v = bytes([1, 2, 3])
c = Characteristic( c = Characteristic(
@@ -329,67 +332,94 @@ async def test_CharacteristicAdapter():
assert c.value == v assert c.value == v
# Simple delegated adapter # Simple delegated adapter
a = DelegatedCharacteristicAdapter( delegated = DelegatedCharacteristicAdapter(
c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x)) c, lambda x: bytes(reversed(x)), lambda x: bytes(reversed(x))
) )
value = await a.read_value(None) delegated_value = await delegated.read_value(None)
assert value == bytes(reversed(v)) assert delegated_value == bytes(reversed(v))
v = bytes([3, 4, 5]) delegated_value2 = bytes([3, 4, 5])
await a.write_value(None, v) await delegated.write_value(None, delegated_value2)
assert a.value == bytes(reversed(v)) assert delegated.value == bytes(reversed(delegated_value2))
# Packed adapter with single element format # Packed adapter with single element format
v = 1234 packed_value_ref = 1234
pv = struct.pack('>H', v) packed_value_bytes = struct.pack('>H', packed_value_ref)
c.value = v c.value = packed_value_ref
a = PackedCharacteristicAdapter(c, '>H') packed = PackedCharacteristicAdapter(c, '>H')
value = await a.read_value(None) packed_value_read = await packed.read_value(None)
assert value == pv assert packed_value_read == packed_value_bytes
c.value = None c.value = b''
await a.write_value(None, pv) await packed.write_value(None, packed_value_bytes)
assert a.value == v assert packed.value == packed_value_ref
# Packed adapter with multi-element format # Packed adapter with multi-element format
v1 = 1234 v1 = 1234
v2 = 5678 v2 = 5678
pv = struct.pack('>HH', v1, v2) packed_multi_value_bytes = struct.pack('>HH', v1, v2)
c.value = (v1, v2) c.value = (v1, v2)
a = PackedCharacteristicAdapter(c, '>HH') packed_multi = PackedCharacteristicAdapter(c, '>HH')
value = await a.read_value(None) packed_multi_read_value = await packed_multi.read_value(None)
assert value == pv assert packed_multi_read_value == packed_multi_value_bytes
c.value = None packed_multi.value = b''
await a.write_value(None, pv) await packed_multi.write_value(None, packed_multi_value_bytes)
assert a.value == (v1, v2) assert packed_multi.value == (v1, v2)
# Mapped adapter # Mapped adapter
v1 = 1234 v1 = 1234
v2 = 5678 v2 = 5678
pv = struct.pack('>HH', v1, v2) packed_mapped_value_bytes = struct.pack('>HH', v1, v2)
mapped = {'v1': v1, 'v2': v2} mapped = {'v1': v1, 'v2': v2}
c.value = mapped c.value = mapped
a = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2')) packed_mapped = MappedCharacteristicAdapter(c, '>HH', ('v1', 'v2'))
value = await a.read_value(None) packed_mapped_read_value = await packed_mapped.read_value(None)
assert value == pv assert packed_mapped_read_value == packed_mapped_value_bytes
c.value = None c.value = b''
await a.write_value(None, pv) await packed_mapped.write_value(None, packed_mapped_value_bytes)
assert a.value == mapped assert packed_mapped.value == mapped
# UTF-8 adapter # UTF-8 adapter
v = 'Hello π' string_value = 'Hello π'
ev = v.encode('utf-8') string_value_bytes = string_value.encode('utf-8')
c.value = v c.value = string_value
a = UTF8CharacteristicAdapter(c) string_c = UTF8CharacteristicAdapter(c)
value = await a.read_value(None) string_read_value = await string_c.read_value(None)
assert value == ev assert string_read_value == string_value_bytes
c.value = None c.value = b''
await a.write_value(None, ev) await string_c.write_value(None, string_value_bytes)
assert a.value == v assert string_c.value == string_value
# Class adapter
class BlaBla:
def __init__(self, a: int, b: int) -> None:
self.a = a
self.b = b
@classmethod
def from_bytes(cls, data: bytes) -> Self:
a, b = struct.unpack(">II", data)
return cls(a, b)
def __bytes__(self) -> bytes:
return struct.pack(">II", self.a, self.b)
class_value = BlaBla(3, 4)
class_value_bytes = struct.pack(">II", 3, 4)
c.value = class_value
class_c = SerializableCharacteristicAdapter(c, BlaBla)
class_read_value = await class_c.read_value(None)
assert class_read_value == class_value_bytes
c.value = b''
await class_c.write_value(None, class_value_bytes)
assert isinstance(c.value, BlaBla)
assert c.value.a == 3
assert c.value.b == 4
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@@ -851,7 +881,12 @@ async def test_unsubscribe():
await async_barrier() await async_barrier()
mock1.assert_called_once_with(ANY, True, False) mock1.assert_called_once_with(ANY, True, False)
await c2.subscribe() assert len(server.gatt_server.subscribers) == 1
def callback(_):
pass
await c2.subscribe(callback)
await async_barrier() await async_barrier()
mock2.assert_called_once_with(ANY, True, False) mock2.assert_called_once_with(ANY, True, False)
@@ -861,10 +896,16 @@ async def test_unsubscribe():
mock1.assert_called_once_with(ANY, False, False) mock1.assert_called_once_with(ANY, False, False)
mock2.reset_mock() mock2.reset_mock()
await c2.unsubscribe() await c2.unsubscribe(callback)
await async_barrier() await async_barrier()
mock2.assert_called_once_with(ANY, False, False) mock2.assert_called_once_with(ANY, False, False)
# All CCCDs should be zeros now
assert list(server.gatt_server.subscribers.values())[0] == {
c1.handle: bytes([0, 0]),
c2.handle: bytes([0, 0]),
}
mock1.reset_mock() mock1.reset_mock()
await c1.unsubscribe() await c1.unsubscribe()
await async_barrier() await async_barrier()

84
tests/gmap_test.py Normal file
View File

@@ -0,0 +1,84 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import pytest
import pytest_asyncio
from bumble import device
from bumble.profiles.gmap import (
GamingAudioService,
GamingAudioServiceProxy,
GmapRole,
UggFeatures,
UgtFeatures,
BgrFeatures,
BgsFeatures,
)
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Tests
# -----------------------------------------------------------------------------
gmas_service = GamingAudioService(
gmap_role=GmapRole.UNICAST_GAME_GATEWAY
| GmapRole.UNICAST_GAME_TERMINAL
| GmapRole.BROADCAST_GAME_RECEIVER
| GmapRole.BROADCAST_GAME_SENDER,
ugg_features=UggFeatures.UGG_MULTISINK,
ugt_features=UgtFeatures.UGT_SOURCE,
bgr_features=BgrFeatures.BGR_MULTISINK,
bgs_features=BgsFeatures.BGS_96_KBPS,
)
@pytest_asyncio.fixture
async def gmap_client():
devices = TwoDevices()
devices[0].add_service(gmas_service)
await devices.setup_connection()
assert devices.connections[0]
assert devices.connections[1]
devices.connections[0].encryption = 1
devices.connections[1].encryption = 1
peer = device.Peer(devices.connections[1])
gmap_client = await peer.discover_service_and_create_proxy(GamingAudioServiceProxy)
assert gmap_client
yield gmap_client
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_init_service(gmap_client: GamingAudioServiceProxy):
assert (
await gmap_client.gmap_role.read_value()
== GmapRole.UNICAST_GAME_GATEWAY
| GmapRole.UNICAST_GAME_TERMINAL
| GmapRole.BROADCAST_GAME_RECEIVER
| GmapRole.BROADCAST_GAME_SENDER
)
assert await gmap_client.ugg_features.read_value() == UggFeatures.UGG_MULTISINK
assert await gmap_client.ugt_features.read_value() == UgtFeatures.UGT_SOURCE
assert await gmap_client.bgr_features.read_value() == BgrFeatures.BGR_MULTISINK
assert await gmap_client.bgs_features.read_value() == BgsFeatures.BGS_96_KBPS

View File

@@ -75,13 +75,13 @@ from bumble.hci import (
def basic_check(x): def basic_check(x):
packet = x.to_bytes() packet = bytes(x)
print(packet.hex()) print(packet.hex())
parsed = HCI_Packet.from_bytes(packet) parsed = HCI_Packet.from_bytes(packet)
x_str = str(x) x_str = str(x)
parsed_str = str(parsed) parsed_str = str(parsed)
print(x_str) print(x_str)
parsed_bytes = parsed.to_bytes() parsed_bytes = bytes(parsed)
assert x_str == parsed_str assert x_str == parsed_str
assert packet == parsed_bytes assert packet == parsed_bytes
@@ -188,7 +188,7 @@ def test_HCI_Command_Complete_Event():
return_parameters=bytes([7]), return_parameters=bytes([7]),
) )
basic_check(event) basic_check(event)
event = HCI_Packet.from_bytes(event.to_bytes()) event = HCI_Packet.from_bytes(bytes(event))
assert event.return_parameters == 7 assert event.return_parameters == 7
# With a simple status as an integer status # With a simple status as an integer status
@@ -562,7 +562,7 @@ def test_iso_data_packet():
'6281bc77ed6a3206d984bcdabee6be831c699cb50e2' '6281bc77ed6a3206d984bcdabee6be831c699cb50e2'
) )
assert packet.to_bytes() == data assert bytes(packet) == data
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -61,7 +61,7 @@ def _default_hf_configuration() -> hfp.HfConfiguration:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def _default_hf_sdp_features() -> hfp.HfSdpFeature: def _default_hf_sdp_features() -> hfp.HfSdpFeature:
return ( return (
hfp.HfSdpFeature.WIDE_BAND hfp.HfSdpFeature.WIDE_BAND_SPEECH
| hfp.HfSdpFeature.THREE_WAY_CALLING | hfp.HfSdpFeature.THREE_WAY_CALLING
| hfp.HfSdpFeature.CLI_PRESENTATION_CAPABILITY | hfp.HfSdpFeature.CLI_PRESENTATION_CAPABILITY
) )
@@ -108,7 +108,7 @@ def _default_ag_configuration() -> hfp.AgConfiguration:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def _default_ag_sdp_features() -> hfp.AgSdpFeature: def _default_ag_sdp_features() -> hfp.AgSdpFeature:
return ( return (
hfp.AgSdpFeature.WIDE_BAND hfp.AgSdpFeature.WIDE_BAND_SPEECH
| hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY | hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
| hfp.AgSdpFeature.THREE_WAY_CALLING | hfp.AgSdpFeature.THREE_WAY_CALLING
) )

View File

@@ -240,7 +240,7 @@ async def test_self_gatt():
result = await peer.discover_included_services(result[0]) result = await peer.discover_included_services(result[0])
assert len(result) == 2 assert len(result) == 2
# Service UUID is only present when the UUID is 16-bit Bluetooth UUID # Service UUID is only present when the UUID is 16-bit Bluetooth UUID
assert result[1].uuid.to_bytes() == s3.uuid.to_bytes() assert bytes(result[1].uuid) == bytes(s3.uuid)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

184
tests/vocs_test.py Normal file
View File

@@ -0,0 +1,184 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import pytest
import pytest_asyncio
import struct
from bumble import device
from bumble.att import ATT_Error
from bumble.profiles.vocs import (
VolumeOffsetControlService,
ErrorCode,
MIN_VOLUME_OFFSET,
MAX_VOLUME_OFFSET,
SetVolumeOffsetOpCode,
VolumeOffsetControlServiceProxy,
VolumeOffsetState,
VocsAudioLocation,
)
from bumble.profiles.vcp import VolumeControlService, VolumeControlServiceProxy
from bumble.profiles.bap import AudioLocation
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Tests
# -----------------------------------------------------------------------------
vocs_service = VolumeOffsetControlService()
vcp_service = VolumeControlService(included_services=[vocs_service])
@pytest_asyncio.fixture
async def vocs_client():
devices = TwoDevices()
devices[0].add_service(vcp_service)
await devices.setup_connection()
assert devices.connections[0]
assert devices.connections[1]
devices.connections[0].encryption = 1
devices.connections[1].encryption = 1
peer = device.Peer(devices.connections[1])
vcp_client = await peer.discover_service_and_create_proxy(VolumeControlServiceProxy)
assert vcp_client
included_services = await peer.discover_included_services(vcp_client.service_proxy)
assert included_services
vocs_service_discovered = included_services[0]
await peer.discover_characteristics(service=vocs_service_discovered)
vocs_client = VolumeOffsetControlServiceProxy(vocs_service_discovered)
yield vocs_client
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_init_service(vocs_client: VolumeOffsetControlServiceProxy):
assert await vocs_client.volume_offset_state.read_value() == VolumeOffsetState(
volume_offset=0,
change_counter=0,
)
assert await vocs_client.audio_location.read_value() == VocsAudioLocation(
audio_location=AudioLocation.NOT_ALLOWED
)
description = await vocs_client.audio_output_description.read_value()
assert description == ''
@pytest.mark.asyncio
async def test_wrong_opcode_raise_error(vocs_client: VolumeOffsetControlServiceProxy):
with pytest.raises(ATT_Error) as e:
await vocs_client.volume_offset_control_point.write_value(
bytes(
[
0xFF,
]
),
with_response=True,
)
assert e.value.error_code == ErrorCode.OPCODE_NOT_SUPPORTED
@pytest.mark.asyncio
async def test_wrong_change_counter_raise_error(
vocs_client: VolumeOffsetControlServiceProxy,
):
initial_offset = vocs_service.volume_offset_state.volume_offset
initial_counter = vocs_service.volume_offset_state.change_counter
wrong_counter = initial_counter + 1
with pytest.raises(ATT_Error) as e:
await vocs_client.volume_offset_control_point.write_value(
struct.pack(
'<BBh', SetVolumeOffsetOpCode.SET_VOLUME_OFFSET, wrong_counter, 0
),
with_response=True,
)
assert e.value.error_code == ErrorCode.INVALID_CHANGE_COUNTER
counter = await vocs_client.volume_offset_state.read_value()
assert counter == VolumeOffsetState(initial_offset, initial_counter)
@pytest.mark.asyncio
async def test_wrong_volume_offset_raise_error(
vocs_client: VolumeOffsetControlServiceProxy,
):
invalid_offset_low = MIN_VOLUME_OFFSET - 1
invalid_offset_high = MAX_VOLUME_OFFSET + 1
with pytest.raises(ATT_Error) as e_low:
await vocs_client.volume_offset_control_point.write_value(
struct.pack(
'<BBh', SetVolumeOffsetOpCode.SET_VOLUME_OFFSET, 0, invalid_offset_low
),
with_response=True,
)
assert e_low.value.error_code == ErrorCode.VALUE_OUT_OF_RANGE
with pytest.raises(ATT_Error) as e_high:
await vocs_client.volume_offset_control_point.write_value(
struct.pack(
'<BBh', SetVolumeOffsetOpCode.SET_VOLUME_OFFSET, 0, invalid_offset_high
),
with_response=True,
)
assert e_high.value.error_code == ErrorCode.VALUE_OUT_OF_RANGE
@pytest.mark.asyncio
async def test_set_volume_offset(vocs_client: VolumeOffsetControlServiceProxy):
await vocs_client.volume_offset_control_point.write_value(
struct.pack('<BBh', SetVolumeOffsetOpCode.SET_VOLUME_OFFSET, 0, -255),
)
assert await vocs_client.volume_offset_state.read_value() == VolumeOffsetState(
-255, 1
)
@pytest.mark.asyncio
async def test_set_audio_channel_location(vocs_client: VolumeOffsetControlServiceProxy):
new_audio_location = VocsAudioLocation(audio_location=AudioLocation.FRONT_LEFT)
await vocs_client.audio_location.write_value(
struct.pack('<I', new_audio_location.audio_location)
)
location = await vocs_client.audio_location.read_value()
assert location == new_audio_location
@pytest.mark.asyncio
async def test_set_audio_output_description(
vocs_client: VolumeOffsetControlServiceProxy,
):
new_description = 'Left Speaker'
await vocs_client.audio_output_description.write_value(new_description)
description = await vocs_client.audio_output_description.read_value()
assert description == new_description

130
tools/intel_fw_download.py Normal file
View File

@@ -0,0 +1,130 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import pathlib
import urllib.request
import urllib.error
import click
from bumble.colors import color
from bumble.drivers import intel
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
LINUX_KERNEL_GIT_SOURCE = "https://git.kernel.org/pub/scm/linux/kernel/git/firmware/linux-firmware.git/plain/intel"
# -----------------------------------------------------------------------------
# Functions
# -----------------------------------------------------------------------------
def download_file(base_url, name):
url = f"{base_url}/{name}"
with urllib.request.urlopen(url) as file:
data = file.read()
print(f"Downloaded {name}: {len(data)} bytes")
return data
# -----------------------------------------------------------------------------
@click.command
@click.option(
"--output-dir",
default="",
help="Output directory where the files will be saved. Defaults to the OS-specific"
"app data dir, which the driver will check when trying to find firmware",
show_default=True,
)
@click.option(
"--source",
type=click.Choice(["linux-kernel"]),
default="linux-kernel",
show_default=True,
)
@click.option("--single", help="Only download a single image set, by its base name")
@click.option("--force", is_flag=True, help="Overwrite files if they already exist")
def main(output_dir, source, single, force):
"""Download Intel firmware images and configs."""
# Check that the output dir exists
if output_dir == '':
output_dir = intel.intel_firmware_dir()
else:
output_dir = pathlib.Path(output_dir)
if not output_dir.is_dir():
print("Output dir does not exist or is not a directory")
return
base_url = {
"linux-kernel": LINUX_KERNEL_GIT_SOURCE,
}[source]
print("Downloading")
print(color("FROM:", "green"), base_url)
print(color("TO:", "green"), output_dir)
if single:
images = [(f"{single}.sfi", f"{single}.ddc")]
else:
images = [
(f"{base_name}.sfi", f"{base_name}.ddc")
for base_name in intel.INTEL_FW_IMAGE_NAMES
]
for fw_name, config_name in images:
print(color("---", "yellow"))
fw_image_out = output_dir / fw_name
if not force and fw_image_out.exists():
print(color(f"{fw_image_out} already exists, skipping", "red"))
continue
if config_name:
config_image_out = output_dir / config_name
if not force and config_image_out.exists():
print(color("f{config_image_out} already exists, skipping", "red"))
continue
try:
fw_image = download_file(base_url, fw_name)
except urllib.error.HTTPError as error:
print(f"Failed to download {fw_name}: {error}")
continue
config_image = None
if config_name:
try:
config_image = download_file(base_url, config_name)
except urllib.error.HTTPError as error:
print(f"Failed to download {config_name}: {error}")
continue
fw_image_out.write_bytes(fw_image)
if config_image:
config_image_out.write_bytes(config_image)
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()

154
tools/intel_util.py Normal file
View File

@@ -0,0 +1,154 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import logging
import asyncio
import os
from typing import Any, Optional
import click
from bumble.colors import color
from bumble import transport
from bumble.drivers import intel
from bumble.host import Host
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def print_device_info(device_info: dict[intel.ValueType, Any]) -> None:
if (mode := device_info.get(intel.ValueType.CURRENT_MODE_OF_OPERATION)) is not None:
print(
color("MODE:", "yellow"),
mode.name,
)
print(color("DETAILS:", "yellow"))
for key, value in device_info.items():
print(f" {color(key.name, 'green')}: {value}")
# -----------------------------------------------------------------------------
async def get_driver(host: Host, force: bool) -> Optional[intel.Driver]:
# Create a driver
driver = await intel.Driver.for_host(host, force)
if driver is None:
print("Device does not appear to be an Intel device")
return None
return driver
# -----------------------------------------------------------------------------
async def do_info(usb_transport, force):
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
host = Host(hci_source, hci_sink)
driver = await get_driver(host, force)
if driver is None:
return
# Get and print the device info
print_device_info(await driver.read_device_info())
# -----------------------------------------------------------------------------
async def do_load(usb_transport: str, force: bool) -> None:
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
host = Host(hci_source, hci_sink)
driver = await get_driver(host, force)
if driver is None:
return
# Reboot in bootloader mode
await driver.load_firmware()
# Get and print the device info
print_device_info(await driver.read_device_info())
# -----------------------------------------------------------------------------
async def do_bootloader(usb_transport: str, force: bool) -> None:
async with await transport.open_transport(usb_transport) as (
hci_source,
hci_sink,
):
host = Host(hci_source, hci_sink)
driver = await get_driver(host, force)
if driver is None:
return
# Reboot in bootloader mode
await driver.reboot_bootloader()
# -----------------------------------------------------------------------------
@click.group()
def main():
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Try to get the device info even if the USB info doesn't match",
)
def info(usb_transport, force):
"""Get the firmware info."""
asyncio.run(do_info(usb_transport, force))
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Load even if the USB info doesn't match",
)
def load(usb_transport, force):
"""Load a firmware image."""
asyncio.run(do_load(usb_transport, force))
@main.command
@click.argument("usb_transport")
@click.option(
"--force",
is_flag=True,
default=False,
help="Attempt to reboot event if the USB info doesn't match",
)
def bootloader(usb_transport, force):
"""Reboot in bootloader mode."""
asyncio.run(do_bootloader(usb_transport, force))
# -----------------------------------------------------------------------------
if __name__ == '__main__':
main()