Compare commits

...

26 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
082d55af10 Merge pull request #599 from google/gbg/hfp-19
add super wide band constants
2024-11-25 07:47:40 -08:00
Gilles Boccon-Gibod
4c3fd5688d Merge pull request #600 from google/gbg/unify-to-bytes
only use `__bytes__` when not argument is needed.
2024-11-25 07:44:17 -08:00
Gilles Boccon-Gibod
9d3d5495ce only use __bytes__ when not argument is needed. 2024-11-23 15:56:14 -08:00
Gilles Boccon-Gibod
b3869f267c add super wide band constants 2024-11-23 09:27:03 -08:00
Gilles Boccon-Gibod
b57096abe2 Merge pull request #595 from wpiet/aics-opcode-fix
Amend Opcode value in `Audio Input Control Service`
2024-11-23 08:56:23 -08:00
Wojciech Pietraszewski
100bea6b41 Fix typos
Amends the typo in the `INACTIVE` field in `Audio Input Status` characteristic.
Amends the typo in the log message of `_set_gain_settings` method.
2024-11-21 18:29:44 +01:00
Wojciech Pietraszewski
63819bf9dd Amend Opcode value in Audio Input Control Service
Corrects the Audio Input Control Point
Opcode value for `Set Gain Setting` field.
2024-11-21 16:40:49 +01:00
zxzxwu
e3fdab4175 Merge pull request #593 from zxzxwu/periodic
Support Periodic Advertising
2024-11-19 17:22:37 +08:00
Josh Wu
bbcd14dbf0 Support Periodic Advertising 2024-11-19 16:27:13 +08:00
zxzxwu
01dc0d574b Merge pull request #590 from SergeantSerk/parse-scan-response-data
Correctly parse scan response from device config
2024-11-17 15:39:11 +08:00
zxzxwu
5e959d638e Merge pull request #591 from zxzxwu/auracast_scan
Improve Broadcast Scanning
2024-11-16 04:10:27 +08:00
Josh Wu
c88b32a406 Improve Broadcast Scanning 2024-11-16 02:02:28 +08:00
zxzxwu
5a72eefb89 Merge pull request #587 from zxzxwu/device
Replace HCI member imports in device.py
2024-11-13 15:25:32 +08:00
Josh Wu
430046944b Replace HCI member import in device.py 2024-11-12 16:53:21 +08:00
zxzxwu
21d23320eb Merge pull request #584 from zxzxwu/commands6.0
Add Core Spec 6.0 new commands support mapping
2024-11-12 04:17:24 +00:00
Serkan
d0990ee04d Correctly parse scan response from device config
Parses scan response data correctly just like advertising data
2024-11-07 21:49:33 +03:00
Josh Wu
2d88e853e8 Add Core Spec 6.0 new commands support mapping 2024-11-07 14:36:54 +08:00
Gilles Boccon-Gibod
a060a70fba Merge pull request #583 from google/gbg/more-gatt-tests
regression test for GATT unsubscription
2024-11-04 13:03:57 -08:00
Gilles Boccon-Gibod
a06394ad4a Merge pull request #582 from google/gbg/580
fix #580
2024-11-04 13:03:15 -08:00
Gilles Boccon-Gibod
a1414c2b5b add unsubscribe test 2024-11-03 19:08:27 -08:00
Gilles Boccon-Gibod
b2864dac2d fix #580 2024-11-02 10:29:40 -07:00
Gilles Boccon-Gibod
b78f895143 Merge pull request #579 from jmdietrich-gcx/unsubscribe_characteristic_in_gatt_client
Remove characteristic in GATT Client unsubscribe() if it's the last subscriber
2024-10-31 04:07:02 -07:00
zxzxwu
c4e9726828 Merge pull request #581 from zxzxwu/context
[BAP] Add missing Unspecified context type
2024-10-31 11:04:25 +00:00
Gilles Boccon-Gibod
d4b8e8348a Merge pull request #574 from google/gbg/update-python-versions
remove test for deprecated Python 3.8 and add 3.13
2024-10-31 03:44:01 -07:00
Josh Wu
19debaa52e [BAP] Add missing Unspecified context type 2024-10-31 18:11:40 +08:00
Jan-Marcel Dietrich
73fe564321 Remove characteristic in GATT Client unsubscribe() if it's the last subscriber
GATT Client's subscribe() adds the characteristic itself as subscriber.
Therefore the characteristic has to be removed in unsubscribe(), if it's
the last subscriber. Otherwise the clean up does not work correctly and
the CCCD never is set back to 0 in the remote device.
2024-10-30 07:34:22 +01:00
29 changed files with 963 additions and 675 deletions

View File

@@ -60,7 +60,7 @@ AURACAST_DEFAULT_ATT_MTU = 256
class BroadcastScanner(pyee.EventEmitter): class BroadcastScanner(pyee.EventEmitter):
@dataclasses.dataclass @dataclasses.dataclass
class Broadcast(pyee.EventEmitter): class Broadcast(pyee.EventEmitter):
name: str name: str | None
sync: bumble.device.PeriodicAdvertisingSync sync: bumble.device.PeriodicAdvertisingSync
rssi: int = 0 rssi: int = 0
public_broadcast_announcement: Optional[ public_broadcast_announcement: Optional[
@@ -135,7 +135,8 @@ class BroadcastScanner(pyee.EventEmitter):
self.sync.advertiser_address, self.sync.advertiser_address,
color(self.sync.state.name, 'green'), color(self.sync.state.name, 'green'),
) )
print(f' {color("Name", "cyan")}: {self.name}') if self.name is not None:
print(f' {color("Name", "cyan")}: {self.name}')
if self.appearance: if self.appearance:
print(f' {color("Appearance", "cyan")}: {str(self.appearance)}') print(f' {color("Appearance", "cyan")}: {str(self.appearance)}')
print(f' {color("RSSI", "cyan")}: {self.rssi}') print(f' {color("RSSI", "cyan")}: {self.rssi}')
@@ -174,7 +175,7 @@ class BroadcastScanner(pyee.EventEmitter):
print(color(' Codec ID:', 'yellow')) print(color(' Codec ID:', 'yellow'))
print( print(
color(' Coding Format: ', 'green'), color(' Coding Format: ', 'green'),
subgroup.codec_id.coding_format.name, subgroup.codec_id.codec_id.name,
) )
print( print(
color(' Company ID: ', 'green'), color(' Company ID: ', 'green'),
@@ -274,13 +275,24 @@ class BroadcastScanner(pyee.EventEmitter):
await self.device.stop_scanning() await self.device.stop_scanning()
def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None: def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None:
if ( if not (
broadcast_name := advertisement.data.get( ads := advertisement.data.get_all(
bumble.core.AdvertisingData.BROADCAST_NAME bumble.core.AdvertisingData.SERVICE_DATA_16_BIT_UUID
) )
) is None: ) or not (
any(
ad
for ad in ads
if isinstance(ad, tuple)
and ad[0] == bumble.gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
)
):
return return
assert isinstance(broadcast_name, str)
broadcast_name = advertisement.data.get(
bumble.core.AdvertisingData.BROADCAST_NAME
)
assert isinstance(broadcast_name, str) or broadcast_name is None
if broadcast := self.broadcasts.get(advertisement.address): if broadcast := self.broadcasts.get(advertisement.address):
broadcast.update(advertisement) broadcast.update(advertisement)
@@ -291,7 +303,7 @@ class BroadcastScanner(pyee.EventEmitter):
) )
async def on_new_broadcast( async def on_new_broadcast(
self, name: str, advertisement: bumble.device.Advertisement self, name: str | None, advertisement: bumble.device.Advertisement
) -> None: ) -> None:
periodic_advertising_sync = await self.device.create_periodic_advertising_sync( periodic_advertising_sync = await self.device.create_periodic_advertising_sync(
advertiser_address=advertisement.address, advertiser_address=advertisement.address,
@@ -299,10 +311,7 @@ class BroadcastScanner(pyee.EventEmitter):
sync_timeout=self.sync_timeout, sync_timeout=self.sync_timeout,
filter_duplicates=self.filter_duplicates, filter_duplicates=self.filter_duplicates,
) )
broadcast = self.Broadcast( broadcast = self.Broadcast(name, periodic_advertising_sync)
name,
periodic_advertising_sync,
)
broadcast.update(advertisement) broadcast.update(advertisement)
self.broadcasts[advertisement.address] = broadcast self.broadcasts[advertisement.address] = broadcast
periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast)) periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast))

View File

@@ -83,7 +83,7 @@ async def async_main():
return_parameters=bytes([hci.HCI_SUCCESS]), return_parameters=bytes([hci.HCI_SUCCESS]),
) )
# Return a packet with 'respond to sender' set to True # Return a packet with 'respond to sender' set to True
return (response.to_bytes(), True) return (bytes(response), True)
return None return None

View File

@@ -486,7 +486,12 @@ class Speaker:
def on_pdu(pdu: HCI_IsoDataPacket, ase: ascs.AseStateMachine): def on_pdu(pdu: HCI_IsoDataPacket, ase: ascs.AseStateMachine):
codec_config = ase.codec_specific_configuration codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration) if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or codec_config.frame_duration is None
or codec_config.audio_channel_allocation is None
):
return
pcm = decode( pcm = decode(
codec_config.frame_duration.us, codec_config.frame_duration.us,
codec_config.audio_channel_allocation.channel_count, codec_config.audio_channel_allocation.channel_count,
@@ -495,11 +500,17 @@ class Speaker:
self.device.abort_on('disconnection', self.ui_server.send_audio(pcm)) self.device.abort_on('disconnection', self.ui_server.send_audio(pcm))
def on_ase_state_change(ase: ascs.AseStateMachine) -> None: def on_ase_state_change(ase: ascs.AseStateMachine) -> None:
codec_config = ase.codec_specific_configuration
if ase.state == ascs.AseStateMachine.State.STREAMING: if ase.state == ascs.AseStateMachine.State.STREAMING:
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
assert ase.cis_link
if ase.role == ascs.AudioRole.SOURCE: if ase.role == ascs.AudioRole.SOURCE:
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or ase.cis_link is None
or codec_config.octets_per_codec_frame is None
or codec_config.frame_duration is None
or codec_config.codec_frames_per_sdu is None
):
return
ase.cis_link.abort_on( ase.cis_link.abort_on(
'disconnection', 'disconnection',
lc3_source_task( lc3_source_task(
@@ -514,10 +525,17 @@ class Speaker:
), ),
) )
else: else:
if not ase.cis_link:
return
ase.cis_link.sink = functools.partial(on_pdu, ase=ase) ase.cis_link.sink = functools.partial(on_pdu, ase=ase)
elif ase.state == ascs.AseStateMachine.State.CODEC_CONFIGURED: elif ase.state == ascs.AseStateMachine.State.CODEC_CONFIGURED:
codec_config = ase.codec_specific_configuration if (
assert isinstance(codec_config, bap.CodecSpecificConfiguration) not isinstance(codec_config, bap.CodecSpecificConfiguration)
or codec_config.sampling_frequency is None
or codec_config.frame_duration is None
or codec_config.audio_channel_allocation is None
):
return
if ase.role == ascs.AudioRole.SOURCE: if ase.role == ascs.AudioRole.SOURCE:
setup_encoders( setup_encoders(
codec_config.sampling_frequency.hz, codec_config.sampling_frequency.hz,

View File

@@ -291,9 +291,6 @@ class ATT_PDU:
def init_from_bytes(self, pdu, offset): def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
@property @property
def is_command(self): def is_command(self):
return ((self.op_code >> 6) & 1) == 1 return ((self.op_code >> 6) & 1) == 1
@@ -303,7 +300,7 @@ class ATT_PDU:
return ((self.op_code >> 7) & 1) == 1 return ((self.op_code >> 7) & 1) == 1
def __bytes__(self): def __bytes__(self):
return self.to_bytes() return self.pdu
def __str__(self): def __str__(self):
result = color(self.name, 'yellow') result = color(self.name, 'yellow')

View File

@@ -314,7 +314,7 @@ class Controller:
f'{color("CONTROLLER -> HOST", "green")}: {packet}' f'{color("CONTROLLER -> HOST", "green")}: {packet}'
) )
if self.host: if self.host:
self.host.on_packet(packet.to_bytes()) self.host.on_packet(bytes(packet))
# This method allows the controller to emulate the same API as a transport source # This method allows the controller to emulate the same API as a transport source
async def wait_for_termination(self): async def wait_for_termination(self):
@@ -1192,7 +1192,7 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command
''' '''
bd_addr = ( bd_addr = (
self._public_address.to_bytes() bytes(self._public_address)
if self._public_address is not None if self._public_address is not None
else bytes(6) else bytes(6)
) )
@@ -1543,6 +1543,41 @@ class Controller:
} }
return bytes([HCI_SUCCESS]) return bytes([HCI_SUCCESS])
def on_hci_le_set_advertising_set_random_address_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.52 LE Set Advertising Set Random Address
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_advertising_parameters_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.53 LE Set Extended Advertising Parameters
Command
'''
return bytes([HCI_SUCCESS, 0])
def on_hci_le_set_extended_advertising_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.54 LE Set Extended Advertising Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_scan_response_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.55 LE Set Extended Scan Response Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_advertising_enable_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.56 LE Set Extended Advertising Enable
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_maximum_advertising_data_length_command(self, _command): def on_hci_le_read_maximum_advertising_data_length_command(self, _command):
''' '''
See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data
@@ -1557,6 +1592,27 @@ class Controller:
''' '''
return struct.pack('<BB', HCI_SUCCESS, 0xF0) return struct.pack('<BB', HCI_SUCCESS, 0xF0)
def on_hci_le_set_periodic_advertising_parameters_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.61 LE Set Periodic Advertising Parameters
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_periodic_advertising_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.62 LE Set Periodic Advertising Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_periodic_advertising_enable_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.63 LE Set Periodic Advertising Enable
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_transmit_power_command(self, _command): def on_hci_le_read_transmit_power_command(self, _command):
''' '''
See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command

File diff suppressed because it is too large Load Diff

View File

@@ -410,7 +410,7 @@ class IncludedServiceDeclaration(Attribute):
def __init__(self, service: Service) -> None: def __init__(self, service: Service) -> None:
declaration_bytes = struct.pack( declaration_bytes = struct.pack(
'<HH2s', service.handle, service.end_group_handle, service.uuid.to_bytes() '<HH2s', service.handle, service.end_group_handle, bytes(service.uuid)
) )
super().__init__( super().__init__(
GATT_INCLUDE_ATTRIBUTE_TYPE, Attribute.READABLE, declaration_bytes GATT_INCLUDE_ATTRIBUTE_TYPE, Attribute.READABLE, declaration_bytes

View File

@@ -292,7 +292,7 @@ class Client:
logger.debug( logger.debug(
f'GATT Command from client: [0x{self.connection.handle:04X}] {command}' f'GATT Command from client: [0x{self.connection.handle:04X}] {command}'
) )
self.send_gatt_pdu(command.to_bytes()) self.send_gatt_pdu(bytes(command))
async def send_request(self, request: ATT_PDU): async def send_request(self, request: ATT_PDU):
logger.debug( logger.debug(
@@ -310,7 +310,7 @@ class Client:
self.pending_request = request self.pending_request = request
try: try:
self.send_gatt_pdu(request.to_bytes()) self.send_gatt_pdu(bytes(request))
response = await asyncio.wait_for( response = await asyncio.wait_for(
self.pending_response, GATT_REQUEST_TIMEOUT self.pending_response, GATT_REQUEST_TIMEOUT
) )
@@ -328,7 +328,7 @@ class Client:
f'GATT Confirmation from client: [0x{self.connection.handle:04X}] ' f'GATT Confirmation from client: [0x{self.connection.handle:04X}] '
f'{confirmation}' f'{confirmation}'
) )
self.send_gatt_pdu(confirmation.to_bytes()) self.send_gatt_pdu(bytes(confirmation))
async def request_mtu(self, mtu: int) -> int: async def request_mtu(self, mtu: int) -> int:
# Check the range # Check the range
@@ -898,6 +898,12 @@ class Client:
) and subscriber in subscribers: ) and subscriber in subscribers:
subscribers.remove(subscriber) subscribers.remove(subscriber)
# The characteristic itself is added as subscriber. If it is the
# last remaining subscriber, we remove it, such that the clean up
# works correctly. Otherwise the CCCD never is set back to 0.
if len(subscribers) == 1 and characteristic in subscribers:
subscribers.remove(characteristic)
# Cleanup if we removed the last one # Cleanup if we removed the last one
if not subscribers: if not subscribers:
del subscriber_set[characteristic.handle] del subscriber_set[characteristic.handle]

View File

@@ -353,7 +353,7 @@ class Server(EventEmitter):
logger.debug( logger.debug(
f'GATT Response from server: [0x{connection.handle:04X}] {response}' f'GATT Response from server: [0x{connection.handle:04X}] {response}'
) )
self.send_gatt_pdu(connection.handle, response.to_bytes()) self.send_gatt_pdu(connection.handle, bytes(response))
async def notify_subscriber( async def notify_subscriber(
self, self,
@@ -450,7 +450,7 @@ class Server(EventEmitter):
) )
try: try:
self.send_gatt_pdu(connection.handle, indication.to_bytes()) self.send_gatt_pdu(connection.handle, bytes(indication))
await asyncio.wait_for(pending_confirmation, GATT_REQUEST_TIMEOUT) await asyncio.wait_for(pending_confirmation, GATT_REQUEST_TIMEOUT)
except asyncio.TimeoutError as error: except asyncio.TimeoutError as error:
logger.warning(color('!!! GATT Indicate timeout', 'red')) logger.warning(color('!!! GATT Indicate timeout', 'red'))

View File

@@ -915,6 +915,8 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_READ_CURRENT_IAC_LAP_COMMAND : 1 << (11*8+3), HCI_READ_CURRENT_IAC_LAP_COMMAND : 1 << (11*8+3),
HCI_WRITE_CURRENT_IAC_LAP_COMMAND : 1 << (11*8+4), HCI_WRITE_CURRENT_IAC_LAP_COMMAND : 1 << (11*8+4),
HCI_SET_AFH_HOST_CHANNEL_CLASSIFICATION_COMMAND : 1 << (12*8+1), HCI_SET_AFH_HOST_CHANNEL_CLASSIFICATION_COMMAND : 1 << (12*8+1),
HCI_LE_CS_READ_REMOTE_FAE_TABLE_COMMAND : 1 << (12*8+2),
HCI_LE_CS_WRITE_CACHED_REMOTE_FAE_TABLE_COMMAND : 1 << (12*8+3),
HCI_READ_INQUIRY_SCAN_TYPE_COMMAND : 1 << (12*8+4), HCI_READ_INQUIRY_SCAN_TYPE_COMMAND : 1 << (12*8+4),
HCI_WRITE_INQUIRY_SCAN_TYPE_COMMAND : 1 << (12*8+5), HCI_WRITE_INQUIRY_SCAN_TYPE_COMMAND : 1 << (12*8+5),
HCI_READ_INQUIRY_MODE_COMMAND : 1 << (12*8+6), HCI_READ_INQUIRY_MODE_COMMAND : 1 << (12*8+6),
@@ -940,6 +942,8 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_SETUP_SYNCHRONOUS_CONNECTION_COMMAND : 1 << (16*8+3), HCI_SETUP_SYNCHRONOUS_CONNECTION_COMMAND : 1 << (16*8+3),
HCI_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (16*8+4), HCI_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (16*8+4),
HCI_REJECT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (16*8+5), HCI_REJECT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (16*8+5),
HCI_LE_CS_CREATE_CONFIG_COMMAND : 1 << (16*8+6),
HCI_LE_CS_REMOVE_CONFIG_COMMAND : 1 << (16*8+7),
HCI_READ_EXTENDED_INQUIRY_RESPONSE_COMMAND : 1 << (17*8+0), HCI_READ_EXTENDED_INQUIRY_RESPONSE_COMMAND : 1 << (17*8+0),
HCI_WRITE_EXTENDED_INQUIRY_RESPONSE_COMMAND : 1 << (17*8+1), HCI_WRITE_EXTENDED_INQUIRY_RESPONSE_COMMAND : 1 << (17*8+1),
HCI_REFRESH_ENCRYPTION_KEY_COMMAND : 1 << (17*8+2), HCI_REFRESH_ENCRYPTION_KEY_COMMAND : 1 << (17*8+2),
@@ -963,13 +967,20 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_SEND_KEYPRESS_NOTIFICATION_COMMAND : 1 << (20*8+2), HCI_SEND_KEYPRESS_NOTIFICATION_COMMAND : 1 << (20*8+2),
HCI_IO_CAPABILITY_REQUEST_NEGATIVE_REPLY_COMMAND : 1 << (20*8+3), HCI_IO_CAPABILITY_REQUEST_NEGATIVE_REPLY_COMMAND : 1 << (20*8+3),
HCI_READ_ENCRYPTION_KEY_SIZE_COMMAND : 1 << (20*8+4), HCI_READ_ENCRYPTION_KEY_SIZE_COMMAND : 1 << (20*8+4),
HCI_LE_CS_READ_LOCAL_SUPPORTED_CAPABILITIES_COMMAND : 1 << (20*8+5),
HCI_LE_CS_READ_REMOTE_SUPPORTED_CAPABILITIES_COMMAND : 1 << (20*8+6),
HCI_LE_CS_WRITE_CACHED_REMOTE_SUPPORTED_CAPABILITIES : 1 << (20*8+7),
HCI_SET_EVENT_MASK_PAGE_2_COMMAND : 1 << (22*8+2), HCI_SET_EVENT_MASK_PAGE_2_COMMAND : 1 << (22*8+2),
HCI_READ_FLOW_CONTROL_MODE_COMMAND : 1 << (23*8+0), HCI_READ_FLOW_CONTROL_MODE_COMMAND : 1 << (23*8+0),
HCI_WRITE_FLOW_CONTROL_MODE_COMMAND : 1 << (23*8+1), HCI_WRITE_FLOW_CONTROL_MODE_COMMAND : 1 << (23*8+1),
HCI_READ_DATA_BLOCK_SIZE_COMMAND : 1 << (23*8+2), HCI_READ_DATA_BLOCK_SIZE_COMMAND : 1 << (23*8+2),
HCI_LE_CS_TEST_COMMAND : 1 << (23*8+3),
HCI_LE_CS_TEST_END_COMMAND : 1 << (23*8+4),
HCI_READ_ENHANCED_TRANSMIT_POWER_LEVEL_COMMAND : 1 << (24*8+0), HCI_READ_ENHANCED_TRANSMIT_POWER_LEVEL_COMMAND : 1 << (24*8+0),
HCI_LE_CS_SECURITY_ENABLE_COMMAND : 1 << (24*8+1),
HCI_READ_LE_HOST_SUPPORT_COMMAND : 1 << (24*8+5), HCI_READ_LE_HOST_SUPPORT_COMMAND : 1 << (24*8+5),
HCI_WRITE_LE_HOST_SUPPORT_COMMAND : 1 << (24*8+6), HCI_WRITE_LE_HOST_SUPPORT_COMMAND : 1 << (24*8+6),
HCI_LE_CS_SET_DEFAULT_SETTINGS_COMMAND : 1 << (24*8+7),
HCI_LE_SET_EVENT_MASK_COMMAND : 1 << (25*8+0), HCI_LE_SET_EVENT_MASK_COMMAND : 1 << (25*8+0),
HCI_LE_READ_BUFFER_SIZE_COMMAND : 1 << (25*8+1), HCI_LE_READ_BUFFER_SIZE_COMMAND : 1 << (25*8+1),
HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND : 1 << (25*8+2), HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND : 1 << (25*8+2),
@@ -1000,6 +1011,10 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_LE_RECEIVER_TEST_COMMAND : 1 << (28*8+4), HCI_LE_RECEIVER_TEST_COMMAND : 1 << (28*8+4),
HCI_LE_TRANSMITTER_TEST_COMMAND : 1 << (28*8+5), HCI_LE_TRANSMITTER_TEST_COMMAND : 1 << (28*8+5),
HCI_LE_TEST_END_COMMAND : 1 << (28*8+6), HCI_LE_TEST_END_COMMAND : 1 << (28*8+6),
HCI_LE_ENABLE_MONITORING_ADVERTISERS_COMMAND : 1 << (28*8+7),
HCI_LE_CS_SET_CHANNEL_CLASSIFICATION_COMMAND : 1 << (29*8+0),
HCI_LE_CS_SET_PROCEDURE_PARAMETERS_COMMAND : 1 << (29*8+1),
HCI_LE_CS_PROCEDURE_ENABLE_COMMAND : 1 << (29*8+2),
HCI_ENHANCED_SETUP_SYNCHRONOUS_CONNECTION_COMMAND : 1 << (29*8+3), HCI_ENHANCED_SETUP_SYNCHRONOUS_CONNECTION_COMMAND : 1 << (29*8+3),
HCI_ENHANCED_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (29*8+4), HCI_ENHANCED_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (29*8+4),
HCI_READ_LOCAL_SUPPORTED_CODECS_COMMAND : 1 << (29*8+5), HCI_READ_LOCAL_SUPPORTED_CODECS_COMMAND : 1 << (29*8+5),
@@ -1136,11 +1151,21 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_LE_SET_DEFAULT_SUBRATE_COMMAND : 1 << (46*8+0), HCI_LE_SET_DEFAULT_SUBRATE_COMMAND : 1 << (46*8+0),
HCI_LE_SUBRATE_REQUEST_COMMAND : 1 << (46*8+1), HCI_LE_SUBRATE_REQUEST_COMMAND : 1 << (46*8+1),
HCI_LE_SET_EXTENDED_ADVERTISING_PARAMETERS_V2_COMMAND : 1 << (46*8+2), HCI_LE_SET_EXTENDED_ADVERTISING_PARAMETERS_V2_COMMAND : 1 << (46*8+2),
HCI_LE_SET_DECISION_DATA_COMMAND : 1 << (46*8+3),
HCI_LE_SET_DECISION_INSTRUCTIONS_COMMAND : 1 << (46*8+4),
HCI_LE_SET_PERIODIC_ADVERTISING_SUBEVENT_DATA_COMMAND : 1 << (46*8+5), HCI_LE_SET_PERIODIC_ADVERTISING_SUBEVENT_DATA_COMMAND : 1 << (46*8+5),
HCI_LE_SET_PERIODIC_ADVERTISING_RESPONSE_DATA_COMMAND : 1 << (46*8+6), HCI_LE_SET_PERIODIC_ADVERTISING_RESPONSE_DATA_COMMAND : 1 << (46*8+6),
HCI_LE_SET_PERIODIC_SYNC_SUBEVENT_COMMAND : 1 << (46*8+7), HCI_LE_SET_PERIODIC_SYNC_SUBEVENT_COMMAND : 1 << (46*8+7),
HCI_LE_EXTENDED_CREATE_CONNECTION_V2_COMMAND : 1 << (47*8+0), HCI_LE_EXTENDED_CREATE_CONNECTION_V2_COMMAND : 1 << (47*8+0),
HCI_LE_SET_PERIODIC_ADVERTISING_PARAMETERS_V2_COMMAND : 1 << (47*8+1), HCI_LE_SET_PERIODIC_ADVERTISING_PARAMETERS_V2_COMMAND : 1 << (47*8+1),
HCI_LE_READ_ALL_LOCAL_SUPPORTED_FEATURES_COMMAND : 1 << (47*8+2),
HCI_LE_READ_ALL_REMOTE_FEATURES_COMMAND : 1 << (47*8+3),
HCI_LE_SET_HOST_FEATURE_V2_COMMAND : 1 << (47*8+4),
HCI_LE_ADD_DEVICE_TO_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+5),
HCI_LE_REMOVE_DEVICE_FROM_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+6),
HCI_LE_CLEAR_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+7),
HCI_LE_READ_MONITORED_ADVERTISERS_LIST_SIZE_COMMAND : 1 << (48*8+0),
HCI_LE_FRAME_SPACE_UPDATE_COMMAND : 1 << (48*8+1),
} }
# LE Supported Features # LE Supported Features
@@ -1457,7 +1482,7 @@ class CodingFormat:
vendor_specific_codec_id: int = 0 vendor_specific_codec_id: int = 0
@classmethod @classmethod
def parse_from_bytes(cls, data: bytes, offset: int): def parse_from_bytes(cls, data: bytes, offset: int) -> tuple[int, CodingFormat]:
(codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from( (codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from(
'<BHH', data, offset '<BHH', data, offset
) )
@@ -1467,14 +1492,15 @@ class CodingFormat:
vendor_specific_codec_id=vendor_specific_codec_id, vendor_specific_codec_id=vendor_specific_codec_id,
) )
def to_bytes(self) -> bytes: @classmethod
def from_bytes(cls, data: bytes) -> CodingFormat:
return cls.parse_from_bytes(data, 0)[1]
def __bytes__(self) -> bytes:
return struct.pack( return struct.pack(
'<BHH', self.codec_id, self.company_id, self.vendor_specific_codec_id '<BHH', self.codec_id, self.company_id, self.vendor_specific_codec_id
) )
def __bytes__(self) -> bytes:
return self.to_bytes()
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class HCI_Constant: class HCI_Constant:
@@ -1691,7 +1717,7 @@ class HCI_Object:
field_length = len(field_bytes) field_length = len(field_bytes)
field_bytes = bytes([field_length]) + field_bytes field_bytes = bytes([field_length]) + field_bytes
elif isinstance(field_value, (bytes, bytearray)) or hasattr( elif isinstance(field_value, (bytes, bytearray)) or hasattr(
field_value, 'to_bytes' field_value, '__bytes__'
): ):
field_bytes = bytes(field_value) field_bytes = bytes(field_value)
if isinstance(field_type, int) and 4 < field_type <= 256: if isinstance(field_type, int) and 4 < field_type <= 256:
@@ -1736,7 +1762,7 @@ class HCI_Object:
def from_bytes(cls, data, offset, fields): def from_bytes(cls, data, offset, fields):
return cls(fields, **cls.dict_from_bytes(data, offset, fields)) return cls(fields, **cls.dict_from_bytes(data, offset, fields))
def to_bytes(self): def __bytes__(self):
return HCI_Object.dict_to_bytes(self.__dict__, self.fields) return HCI_Object.dict_to_bytes(self.__dict__, self.fields)
@staticmethod @staticmethod
@@ -1831,9 +1857,6 @@ class HCI_Object:
for field_name, field_value in field_strings for field_name, field_value in field_strings
) )
def __bytes__(self):
return self.to_bytes()
def __init__(self, fields, **kwargs): def __init__(self, fields, **kwargs):
self.fields = fields self.fields = fields
self.init_from_fields(self, fields, kwargs) self.init_from_fields(self, fields, kwargs)
@@ -2008,9 +2031,6 @@ class Address:
def is_static(self): def is_static(self):
return self.is_random and (self.address_bytes[5] >> 6 == 3) return self.is_random and (self.address_bytes[5] >> 6 == 3)
def to_bytes(self):
return self.address_bytes
def to_string(self, with_type_qualifier=True): def to_string(self, with_type_qualifier=True):
''' '''
String representation of the address, MSB first, with an optional type String representation of the address, MSB first, with an optional type
@@ -2022,7 +2042,7 @@ class Address:
return result + '/P' return result + '/P'
def __bytes__(self): def __bytes__(self):
return self.to_bytes() return self.address_bytes
def __hash__(self): def __hash__(self):
return hash(self.address_bytes) return hash(self.address_bytes)
@@ -2228,16 +2248,13 @@ class HCI_Command(HCI_Packet):
self.op_code = op_code self.op_code = op_code
self.parameters = parameters self.parameters = parameters
def to_bytes(self): def __bytes__(self):
parameters = b'' if self.parameters is None else self.parameters parameters = b'' if self.parameters is None else self.parameters
return ( return (
struct.pack('<BHB', HCI_COMMAND_PACKET, self.op_code, len(parameters)) struct.pack('<BHB', HCI_COMMAND_PACKET, self.op_code, len(parameters))
+ parameters + parameters
) )
def __bytes__(self):
return self.to_bytes()
def __str__(self): def __str__(self):
result = color(self.name, 'green') result = color(self.name, 'green')
if fields := getattr(self, 'fields', None): if fields := getattr(self, 'fields', None):
@@ -4302,6 +4319,61 @@ class HCI_LE_Clear_Advertising_Sets_Command(HCI_Command):
''' '''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('advertising_handle', 1),
('periodic_advertising_interval_min', 2),
('periodic_advertising_interval_max', 2),
('periodic_advertising_properties', 2),
]
)
class HCI_LE_Set_Periodic_Advertising_Parameters_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.61 LE Set Periodic Advertising Parameters command
'''
class Properties(enum.IntFlag):
INCLUDE_TX_POWER = 1 << 6
advertising_handle: int
periodic_advertising_interval_min: int
periodic_advertising_interval_max: int
periodic_advertising_properties: int
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('advertising_handle', 1),
(
'operation',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
x
).name,
},
),
(
'advertising_data',
{
'parser': HCI_Object.parse_length_prefixed_bytes,
'serializer': HCI_Object.serialize_length_prefixed_bytes,
},
),
]
)
class HCI_LE_Set_Periodic_Advertising_Data_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.62 LE Set Periodic Advertising Data command
'''
advertising_handle: int
operation: int
advertising_data: bytes
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command([('enable', 1), ('advertising_handle', 1)]) @HCI_Command.command([('enable', 1), ('advertising_handle', 1)])
class HCI_LE_Set_Periodic_Advertising_Enable_Command(HCI_Command): class HCI_LE_Set_Periodic_Advertising_Enable_Command(HCI_Command):
@@ -5106,13 +5178,10 @@ class HCI_Event(HCI_Packet):
self.event_code = event_code self.event_code = event_code
self.parameters = parameters self.parameters = parameters
def to_bytes(self): def __bytes__(self):
parameters = b'' if self.parameters is None else self.parameters parameters = b'' if self.parameters is None else self.parameters
return bytes([HCI_EVENT_PACKET, self.event_code, len(parameters)]) + parameters return bytes([HCI_EVENT_PACKET, self.event_code, len(parameters)]) + parameters
def __bytes__(self):
return self.to_bytes()
def __str__(self): def __str__(self):
result = color(self.name, 'magenta') result = color(self.name, 'magenta')
if fields := getattr(self, 'fields', None): if fields := getattr(self, 'fields', None):
@@ -6663,7 +6732,7 @@ class HCI_AclDataPacket(HCI_Packet):
connection_handle, pb_flag, bc_flag, data_total_length, data connection_handle, pb_flag, bc_flag, data_total_length, data
) )
def to_bytes(self): def __bytes__(self):
h = (self.pb_flag << 12) | (self.bc_flag << 14) | self.connection_handle h = (self.pb_flag << 12) | (self.bc_flag << 14) | self.connection_handle
return ( return (
struct.pack('<BHH', HCI_ACL_DATA_PACKET, h, self.data_total_length) struct.pack('<BHH', HCI_ACL_DATA_PACKET, h, self.data_total_length)
@@ -6677,9 +6746,6 @@ class HCI_AclDataPacket(HCI_Packet):
self.data_total_length = data_total_length self.data_total_length = data_total_length
self.data = data self.data = data
def __bytes__(self):
return self.to_bytes()
def __str__(self): def __str__(self):
return ( return (
f'{color("ACL", "blue")}: ' f'{color("ACL", "blue")}: '
@@ -6713,7 +6779,7 @@ class HCI_SynchronousDataPacket(HCI_Packet):
connection_handle, packet_status, data_total_length, data connection_handle, packet_status, data_total_length, data
) )
def to_bytes(self) -> bytes: def __bytes__(self) -> bytes:
h = (self.packet_status << 12) | self.connection_handle h = (self.packet_status << 12) | self.connection_handle
return ( return (
struct.pack('<BHB', HCI_SYNCHRONOUS_DATA_PACKET, h, self.data_total_length) struct.pack('<BHB', HCI_SYNCHRONOUS_DATA_PACKET, h, self.data_total_length)
@@ -6732,9 +6798,6 @@ class HCI_SynchronousDataPacket(HCI_Packet):
self.data_total_length = data_total_length self.data_total_length = data_total_length
self.data = data self.data = data
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self) -> str: def __str__(self) -> str:
return ( return (
f'{color("SCO", "blue")}: ' f'{color("SCO", "blue")}: '
@@ -6807,9 +6870,6 @@ class HCI_IsoDataPacket(HCI_Packet):
) )
def __bytes__(self) -> bytes: def __bytes__(self) -> bytes:
return self.to_bytes()
def to_bytes(self) -> bytes:
fmt = '<BHH' fmt = '<BHH'
args = [ args = [
HCI_ISO_DATA_PACKET, HCI_ISO_DATA_PACKET,

View File

@@ -141,7 +141,7 @@ class HfFeature(enum.IntFlag):
""" """
HF supported features (AT+BRSF=) (normative). HF supported features (AT+BRSF=) (normative).
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007. Hands-Free Profile v1.9, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
""" """
EC_NR = 0x001 # Echo Cancel & Noise reduction EC_NR = 0x001 # Echo Cancel & Noise reduction
@@ -155,14 +155,14 @@ class HfFeature(enum.IntFlag):
HF_INDICATORS = 0x100 HF_INDICATORS = 0x100
ESCO_S4_SETTINGS_SUPPORTED = 0x200 ESCO_S4_SETTINGS_SUPPORTED = 0x200
ENHANCED_VOICE_RECOGNITION_STATUS = 0x400 ENHANCED_VOICE_RECOGNITION_STATUS = 0x400
VOICE_RECOGNITION_TEST = 0x800 VOICE_RECOGNITION_TEXT = 0x800
class AgFeature(enum.IntFlag): class AgFeature(enum.IntFlag):
""" """
AG supported features (+BRSF:) (normative). AG supported features (+BRSF:) (normative).
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007. Hands-Free Profile v1.9, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
""" """
THREE_WAY_CALLING = 0x001 THREE_WAY_CALLING = 0x001
@@ -178,7 +178,7 @@ class AgFeature(enum.IntFlag):
HF_INDICATORS = 0x400 HF_INDICATORS = 0x400
ESCO_S4_SETTINGS_SUPPORTED = 0x800 ESCO_S4_SETTINGS_SUPPORTED = 0x800
ENHANCED_VOICE_RECOGNITION_STATUS = 0x1000 ENHANCED_VOICE_RECOGNITION_STATUS = 0x1000
VOICE_RECOGNITION_TEST = 0x2000 VOICE_RECOGNITION_TEXT = 0x2000
class AudioCodec(enum.IntEnum): class AudioCodec(enum.IntEnum):
@@ -1390,6 +1390,7 @@ class AgProtocol(pyee.EventEmitter):
def _on_bac(self, *args) -> None: def _on_bac(self, *args) -> None:
self.supported_audio_codecs = [AudioCodec(int(value)) for value in args] self.supported_audio_codecs = [AudioCodec(int(value)) for value in args]
self.emit('supported_audio_codecs', self.supported_audio_codecs)
self.send_ok() self.send_ok()
def _on_bcs(self, codec: bytes) -> None: def _on_bcs(self, codec: bytes) -> None:
@@ -1618,7 +1619,7 @@ class ProfileVersion(enum.IntEnum):
""" """
Profile version (normative). Profile version (normative).
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements. Hands-Free Profile v1.8, 6.3 SDP Interoperability Requirements.
""" """
V1_5 = 0x0105 V1_5 = 0x0105
@@ -1632,7 +1633,7 @@ class HfSdpFeature(enum.IntFlag):
""" """
HF supported features (normative). HF supported features (normative).
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements. Hands-Free Profile v1.9, 6.3 SDP Interoperability Requirements.
""" """
EC_NR = 0x01 # Echo Cancel & Noise reduction EC_NR = 0x01 # Echo Cancel & Noise reduction
@@ -1640,16 +1641,17 @@ class HfSdpFeature(enum.IntFlag):
CLI_PRESENTATION_CAPABILITY = 0x04 CLI_PRESENTATION_CAPABILITY = 0x04
VOICE_RECOGNITION_ACTIVATION = 0x08 VOICE_RECOGNITION_ACTIVATION = 0x08
REMOTE_VOLUME_CONTROL = 0x10 REMOTE_VOLUME_CONTROL = 0x10
WIDE_BAND = 0x20 # Wide band speech WIDE_BAND_SPEECH = 0x20
ENHANCED_VOICE_RECOGNITION_STATUS = 0x40 ENHANCED_VOICE_RECOGNITION_STATUS = 0x40
VOICE_RECOGNITION_TEST = 0x80 VOICE_RECOGNITION_TEXT = 0x80
SUPER_WIDE_BAND = 0x100
class AgSdpFeature(enum.IntFlag): class AgSdpFeature(enum.IntFlag):
""" """
AG supported features (normative). AG supported features (normative).
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements. Hands-Free Profile v1.9, 6.3 SDP Interoperability Requirements.
""" """
THREE_WAY_CALLING = 0x01 THREE_WAY_CALLING = 0x01
@@ -1657,9 +1659,10 @@ class AgSdpFeature(enum.IntFlag):
VOICE_RECOGNITION_FUNCTION = 0x04 VOICE_RECOGNITION_FUNCTION = 0x04
IN_BAND_RING_TONE_CAPABILITY = 0x08 IN_BAND_RING_TONE_CAPABILITY = 0x08
VOICE_TAG = 0x10 # Attach a number to voice tag VOICE_TAG = 0x10 # Attach a number to voice tag
WIDE_BAND = 0x20 # Wide band speech WIDE_BAND_SPEECH = 0x20
ENHANCED_VOICE_RECOGNITION_STATUS = 0x40 ENHANCED_VOICE_RECOGNITION_STATUS = 0x40
VOICE_RECOGNITION_TEST = 0x80 VOICE_RECOGNITION_TEXT = 0x80
SUPER_WIDE_BAND_SPEED_SPEECH = 0x100
def make_hf_sdp_records( def make_hf_sdp_records(
@@ -1692,11 +1695,11 @@ def make_hf_sdp_records(
in configuration.supported_hf_features in configuration.supported_hf_features
): ):
hf_supported_features |= HfSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS hf_supported_features |= HfSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS
if HfFeature.VOICE_RECOGNITION_TEST in configuration.supported_hf_features: if HfFeature.VOICE_RECOGNITION_TEXT in configuration.supported_hf_features:
hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_TEST hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_TEXT
if AudioCodec.MSBC in configuration.supported_audio_codecs: if AudioCodec.MSBC in configuration.supported_audio_codecs:
hf_supported_features |= HfSdpFeature.WIDE_BAND hf_supported_features |= HfSdpFeature.WIDE_BAND_SPEECH
return [ return [
sdp.ServiceAttribute( sdp.ServiceAttribute(
@@ -1772,14 +1775,14 @@ def make_ag_sdp_records(
in configuration.supported_ag_features in configuration.supported_ag_features
): ):
ag_supported_features |= AgSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS ag_supported_features |= AgSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS
if AgFeature.VOICE_RECOGNITION_TEST in configuration.supported_ag_features: if AgFeature.VOICE_RECOGNITION_TEXT in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEST ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEXT
if AgFeature.IN_BAND_RING_TONE_CAPABILITY in configuration.supported_ag_features: if AgFeature.IN_BAND_RING_TONE_CAPABILITY in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY ag_supported_features |= AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
if AgFeature.VOICE_RECOGNITION_FUNCTION in configuration.supported_ag_features: if AgFeature.VOICE_RECOGNITION_FUNCTION in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_FUNCTION ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_FUNCTION
if AudioCodec.MSBC in configuration.supported_audio_codecs: if AudioCodec.MSBC in configuration.supported_audio_codecs:
ag_supported_features |= AgSdpFeature.WIDE_BAND ag_supported_features |= AgSdpFeature.WIDE_BAND_SPEECH
return [ return [
sdp.ServiceAttribute( sdp.ServiceAttribute(

View File

@@ -199,7 +199,7 @@ class Host(AbortableEventEmitter):
check_address_type: bool = False, check_address_type: bool = False,
) -> Optional[Connection]: ) -> Optional[Connection]:
for connection in self.connections.values(): for connection in self.connections.values():
if connection.peer_address.to_bytes() == bd_addr.to_bytes(): if bytes(connection.peer_address) == bytes(bd_addr):
if ( if (
check_address_type check_address_type
and connection.peer_address.address_type != bd_addr.address_type and connection.peer_address.address_type != bd_addr.address_type

View File

@@ -225,7 +225,7 @@ class L2CAP_PDU:
return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload) return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload)
def to_bytes(self) -> bytes: def __bytes__(self) -> bytes:
header = struct.pack('<HH', len(self.payload), self.cid) header = struct.pack('<HH', len(self.payload), self.cid)
return header + self.payload return header + self.payload
@@ -233,9 +233,6 @@ class L2CAP_PDU:
self.cid = cid self.cid = cid
self.payload = payload self.payload = payload
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self) -> str: def __str__(self) -> str:
return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}' return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}'
@@ -333,11 +330,8 @@ class L2CAP_Control_Frame:
def init_from_bytes(self, pdu, offset): def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self) -> bytes:
return self.pdu
def __bytes__(self) -> bytes: def __bytes__(self) -> bytes:
return self.to_bytes() return self.pdu
def __str__(self) -> str: def __str__(self) -> str:
result = f'{color(self.name, "yellow")} [ID={self.identifier}]' result = f'{color(self.name, "yellow")} [ID={self.identifier}]'

View File

@@ -39,7 +39,6 @@ from bumble.device import (
AdvertisingEventProperties, AdvertisingEventProperties,
AdvertisingType, AdvertisingType,
Device, Device,
Phy,
) )
from bumble.gatt import Service from bumble.gatt import Service
from bumble.hci import ( from bumble.hci import (
@@ -47,6 +46,7 @@ from bumble.hci import (
HCI_PAGE_TIMEOUT_ERROR, HCI_PAGE_TIMEOUT_ERROR,
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
Address, Address,
Phy,
) )
from google.protobuf import any_pb2 # pytype: disable=pyi-error from google.protobuf import any_pb2 # pytype: disable=pyi-error
from google.protobuf import empty_pb2 # pytype: disable=pyi-error from google.protobuf import empty_pb2 # pytype: disable=pyi-error

View File

@@ -95,7 +95,7 @@ class AudioInputStatus(OpenIntEnum):
Cf. 3.4 Audio Input Status Cf. 3.4 Audio Input Status
''' '''
INATIVE = 0x00 INACTIVE = 0x00
ACTIVE = 0x01 ACTIVE = 0x01
@@ -104,7 +104,7 @@ class AudioInputControlPointOpCode(OpenIntEnum):
Cf. 3.5.1 Audio Input Control Point procedure requirements Cf. 3.5.1 Audio Input Control Point procedure requirements
''' '''
SET_GAIN_SETTING = 0x00 SET_GAIN_SETTING = 0x01
UNMUTE = 0x02 UNMUTE = 0x02
MUTE = 0x03 MUTE = 0x03
SET_MANUAL_GAIN_MODE = 0x04 SET_MANUAL_GAIN_MODE = 0x04
@@ -239,7 +239,7 @@ class AudioInputControlPoint:
or gain_settings_operand or gain_settings_operand
> self.gain_settings_properties.gain_settings_maximum > self.gain_settings_properties.gain_settings_maximum
): ):
logger.error("gain_seetings value out of range") logger.error("gain_settings value out of range")
raise ATT_Error(ErrorCode.VALUE_OUT_OF_RANGE) raise ATT_Error(ErrorCode.VALUE_OUT_OF_RANGE)
if self.audio_input_state.gain_settings != gain_settings_operand: if self.audio_input_state.gain_settings != gain_settings_operand:

View File

@@ -102,6 +102,7 @@ class ContextType(enum.IntFlag):
# fmt: off # fmt: off
PROHIBITED = 0x0000 PROHIBITED = 0x0000
UNSPECIFIED = 0x0001
CONVERSATIONAL = 0x0002 CONVERSATIONAL = 0x0002
MEDIA = 0x0004 MEDIA = 0x0004
GAME = 0x0008 GAME = 0x0008
@@ -264,7 +265,7 @@ class UnicastServerAdvertisingData:
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID, core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
struct.pack( struct.pack(
'<2sBIB', '<2sBIB',
gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE.to_bytes(), bytes(gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE),
self.announcement_type, self.announcement_type,
self.available_audio_contexts, self.available_audio_contexts,
len(self.metadata), len(self.metadata),
@@ -397,18 +398,21 @@ class CodecSpecificConfiguration:
OCTETS_PER_FRAME = 0x04 OCTETS_PER_FRAME = 0x04
CODEC_FRAMES_PER_SDU = 0x05 CODEC_FRAMES_PER_SDU = 0x05
sampling_frequency: SamplingFrequency sampling_frequency: SamplingFrequency | None = None
frame_duration: FrameDuration frame_duration: FrameDuration | None = None
audio_channel_allocation: AudioLocation audio_channel_allocation: AudioLocation | None = None
octets_per_codec_frame: int octets_per_codec_frame: int | None = None
codec_frames_per_sdu: int codec_frames_per_sdu: int | None = None
@classmethod @classmethod
def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration: def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration:
offset = 0 offset = 0
# Allowed default values. sampling_frequency: SamplingFrequency | None = None
audio_channel_allocation = AudioLocation.NOT_ALLOWED frame_duration: FrameDuration | None = None
codec_frames_per_sdu = 1 audio_channel_allocation: AudioLocation | None = None
octets_per_codec_frame: int | None = None
codec_frames_per_sdu: int | None = None
while offset < len(data): while offset < len(data):
length, type = struct.unpack_from('BB', data, offset) length, type = struct.unpack_from('BB', data, offset)
offset += 2 offset += 2
@@ -426,8 +430,6 @@ class CodecSpecificConfiguration:
elif type == CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU: elif type == CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU:
codec_frames_per_sdu = value codec_frames_per_sdu = value
# It is expected here that if some fields are missing, an error should be raised.
# pylint: disable=possibly-used-before-assignment,used-before-assignment
return CodecSpecificConfiguration( return CodecSpecificConfiguration(
sampling_frequency=sampling_frequency, sampling_frequency=sampling_frequency,
frame_duration=frame_duration, frame_duration=frame_duration,
@@ -437,23 +439,43 @@ class CodecSpecificConfiguration:
) )
def __bytes__(self) -> bytes: def __bytes__(self) -> bytes:
return struct.pack( return b''.join(
'<BBBBBBBBIBBHBBB', [
2, struct.pack(fmt, length, tag, value)
CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY, for fmt, length, tag, value in [
self.sampling_frequency, (
2, '<BBB',
CodecSpecificConfiguration.Type.FRAME_DURATION, 2,
self.frame_duration, CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY,
5, self.sampling_frequency,
CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION, ),
self.audio_channel_allocation, (
3, '<BBB',
CodecSpecificConfiguration.Type.OCTETS_PER_FRAME, 2,
self.octets_per_codec_frame, CodecSpecificConfiguration.Type.FRAME_DURATION,
2, self.frame_duration,
CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU, ),
self.codec_frames_per_sdu, (
'<BBI',
5,
CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION,
self.audio_channel_allocation,
),
(
'<BBH',
3,
CodecSpecificConfiguration.Type.OCTETS_PER_FRAME,
self.octets_per_codec_frame,
),
(
'<BBB',
2,
CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU,
self.codec_frames_per_sdu,
),
]
if value is not None
]
) )
@@ -465,6 +487,24 @@ class BroadcastAudioAnnouncement:
def from_bytes(cls, data: bytes) -> Self: def from_bytes(cls, data: bytes) -> Self:
return cls(int.from_bytes(data[:3], 'little')) return cls(int.from_bytes(data[:3], 'little'))
def __bytes__(self) -> bytes:
return self.broadcast_id.to_bytes(3, 'little')
def get_advertising_data(self) -> bytes:
return bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
bytes(gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE)
+ bytes(self)
),
)
]
)
)
@dataclasses.dataclass @dataclasses.dataclass
class BasicAudioAnnouncement: class BasicAudioAnnouncement:
@@ -473,26 +513,37 @@ class BasicAudioAnnouncement:
index: int index: int
codec_specific_configuration: CodecSpecificConfiguration codec_specific_configuration: CodecSpecificConfiguration
@dataclasses.dataclass def __bytes__(self) -> bytes:
class CodecInfo: codec_specific_configuration_bytes = bytes(
coding_format: hci.CodecID self.codec_specific_configuration
company_id: int )
vendor_specific_codec_id: int return (
bytes([self.index, len(codec_specific_configuration_bytes)])
@classmethod + codec_specific_configuration_bytes
def from_bytes(cls, data: bytes) -> Self: )
coding_format = hci.CodecID(data[0])
company_id = int.from_bytes(data[1:3], 'little')
vendor_specific_codec_id = int.from_bytes(data[3:5], 'little')
return cls(coding_format, company_id, vendor_specific_codec_id)
@dataclasses.dataclass @dataclasses.dataclass
class Subgroup: class Subgroup:
codec_id: BasicAudioAnnouncement.CodecInfo codec_id: hci.CodingFormat
codec_specific_configuration: CodecSpecificConfiguration codec_specific_configuration: CodecSpecificConfiguration
metadata: le_audio.Metadata metadata: le_audio.Metadata
bis: List[BasicAudioAnnouncement.BIS] bis: List[BasicAudioAnnouncement.BIS]
def __bytes__(self) -> bytes:
metadata_bytes = bytes(self.metadata)
codec_specific_configuration_bytes = bytes(
self.codec_specific_configuration
)
return (
bytes([len(self.bis)])
+ bytes(self.codec_id)
+ bytes([len(codec_specific_configuration_bytes)])
+ codec_specific_configuration_bytes
+ bytes([len(metadata_bytes)])
+ metadata_bytes
+ b''.join(map(bytes, self.bis))
)
presentation_delay: int presentation_delay: int
subgroups: List[BasicAudioAnnouncement.Subgroup] subgroups: List[BasicAudioAnnouncement.Subgroup]
@@ -504,7 +555,7 @@ class BasicAudioAnnouncement:
for _ in range(data[3]): for _ in range(data[3]):
num_bis = data[offset] num_bis = data[offset]
offset += 1 offset += 1
codec_id = cls.CodecInfo.from_bytes(data[offset : offset + 5]) codec_id = hci.CodingFormat.from_bytes(data[offset : offset + 5])
offset += 5 offset += 5
codec_specific_configuration_length = data[offset] codec_specific_configuration_length = data[offset]
offset += 1 offset += 1
@@ -548,3 +599,25 @@ class BasicAudioAnnouncement:
) )
return cls(presentation_delay, subgroups) return cls(presentation_delay, subgroups)
def __bytes__(self) -> bytes:
return (
self.presentation_delay.to_bytes(3, 'little')
+ bytes([len(self.subgroups)])
+ b''.join(map(bytes, self.subgroups))
)
def get_advertising_data(self) -> bytes:
return bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
bytes(gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE)
+ bytes(self)
),
)
]
)
)

View File

@@ -344,9 +344,6 @@ class DataElement:
] # Keep a copy so we can re-serialize to an exact replica ] # Keep a copy so we can re-serialize to an exact replica
return result return result
def to_bytes(self):
return bytes(self)
def __bytes__(self): def __bytes__(self):
# Return early if we have a cache # Return early if we have a cache
if self.bytes: if self.bytes:
@@ -623,11 +620,8 @@ class SDP_PDU:
def init_from_bytes(self, pdu, offset): def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
def __bytes__(self): def __bytes__(self):
return self.to_bytes() return self.pdu
def __str__(self): def __str__(self):
result = f'{color(self.name, "blue")} [TID={self.transaction_id}]' result = f'{color(self.name, "blue")} [TID={self.transaction_id}]'

View File

@@ -298,11 +298,8 @@ class SMP_Command:
def init_from_bytes(self, pdu: bytes, offset: int) -> None: def init_from_bytes(self, pdu: bytes, offset: int) -> None:
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
def __bytes__(self): def __bytes__(self):
return self.to_bytes() return self.pdu
def __str__(self): def __str__(self):
result = color(self.name, 'yellow') result = color(self.name, 'yellow')
@@ -1839,7 +1836,7 @@ class Session:
if self.is_initiator: if self.is_initiator:
if self.pairing_method == PairingMethod.OOB: if self.pairing_method == PairingMethod.OOB:
self.send_pairing_random_command() self.send_pairing_random_command()
else: elif self.pairing_method == PairingMethod.PASSKEY:
self.send_pairing_confirm_command() self.send_pairing_confirm_command()
else: else:
if self.pairing_method == PairingMethod.PASSKEY: if self.pairing_method == PairingMethod.PASSKEY:
@@ -1949,7 +1946,7 @@ class Manager(EventEmitter):
f'{connection.peer_address}: {command}' f'{connection.peer_address}: {command}'
) )
cid = SMP_BR_CID if connection.transport == BT_BR_EDR_TRANSPORT else SMP_CID cid = SMP_BR_CID if connection.transport == BT_BR_EDR_TRANSPORT else SMP_CID
connection.send_l2cap_pdu(cid, command.to_bytes()) connection.send_l2cap_pdu(cid, bytes(command))
def on_smp_security_request_command( def on_smp_security_request_command(
self, connection: Connection, request: SMP_Security_Request_Command self, connection: Connection, request: SMP_Security_Request_Command

View File

@@ -370,11 +370,13 @@ class PumpedPacketSource(ParserSource):
self.parser.feed_data(packet) self.parser.feed_data(packet)
except asyncio.CancelledError: except asyncio.CancelledError:
logger.debug('source pump task done') logger.debug('source pump task done')
self.terminated.set_result(None) if not self.terminated.done():
self.terminated.set_result(None)
break break
except Exception as error: except Exception as error:
logger.warning(f'exception while waiting for packet: {error}') logger.warning(f'exception while waiting for packet: {error}')
self.terminated.set_exception(error) if not self.terminated.done():
self.terminated.set_exception(error)
break break
self.pump_task = asyncio.create_task(pump_packets()) self.pump_task = asyncio.create_task(pump_packets())

View File

@@ -21,9 +21,9 @@ import sys
import os import os
import io import io
import logging import logging
import websockets from typing import Iterable, Optional
from typing import Optional import websockets
import bumble.core import bumble.core
from bumble.device import Device, ScoLink from bumble.device import Device, ScoLink
@@ -82,6 +82,10 @@ def on_microphone_volume(level: int):
send_message(type='microphone_volume', level=level) send_message(type='microphone_volume', level=level)
def on_supported_audio_codecs(codecs: Iterable[hfp.AudioCodec]):
send_message(type='supported_audio_codecs', codecs=[codec.name for codec in codecs])
def on_sco_state_change(codec: int): def on_sco_state_change(codec: int):
if codec == hfp.AudioCodec.CVSD: if codec == hfp.AudioCodec.CVSD:
sample_rate = 8000 sample_rate = 8000
@@ -207,6 +211,7 @@ async def main() -> None:
ag_protocol = hfp.AgProtocol(dlc, configuration) ag_protocol = hfp.AgProtocol(dlc, configuration)
ag_protocol.on('speaker_volume', on_speaker_volume) ag_protocol.on('speaker_volume', on_speaker_volume)
ag_protocol.on('microphone_volume', on_microphone_volume) ag_protocol.on('microphone_volume', on_microphone_volume)
ag_protocol.on('supported_audio_codecs', on_supported_audio_codecs)
on_hfp_state_change(True) on_hfp_state_change(True)
dlc.multiplexer.l2cap_channel.on( dlc.multiplexer.l2cap_channel.on(
'close', lambda: on_hfp_state_change(False) 'close', lambda: on_hfp_state_change(False)
@@ -241,7 +246,7 @@ async def main() -> None:
# Pick the first one # Pick the first one
channel, version, hf_sdp_features = hfp_record channel, version, hf_sdp_features = hfp_record
print(f'HF version: {version}') print(f'HF version: {version}')
print(f'HF features: {hf_sdp_features}') print(f'HF features: {hf_sdp_features.name}')
# Request authentication # Request authentication
print('*** Authenticating...') print('*** Authenticating...')

View File

@@ -161,7 +161,13 @@ async def main() -> None:
else: else:
file_output = open(f'{datetime.datetime.now().isoformat()}.lc3', 'wb') file_output = open(f'{datetime.datetime.now().isoformat()}.lc3', 'wb')
codec_configuration = ase.codec_specific_configuration codec_configuration = ase.codec_specific_configuration
assert isinstance(codec_configuration, CodecSpecificConfiguration) if (
not isinstance(codec_configuration, CodecSpecificConfiguration)
or codec_configuration.sampling_frequency is None
or codec_configuration.audio_channel_allocation is None
or codec_configuration.frame_duration is None
):
return
# Write a LC3 header. # Write a LC3 header.
file_output.write( file_output.write(
bytes([0x1C, 0xCC]) # Header. bytes([0x1C, 0xCC]) # Header.

View File

@@ -80,7 +80,7 @@ impl Address {
/// Creates a new [Address] object. /// Creates a new [Address] object.
pub fn new(address: &str, address_type: AddressType) -> PyResult<Self> { pub fn new(address: &str, address_type: AddressType) -> PyResult<Self> {
Python::with_gil(|py| { Python::with_gil(|py| {
PyModule::import(py, intern!(py, "bumble.device"))? PyModule::import(py, intern!(py, "bumble.hci"))?
.getattr(intern!(py, "Address"))? .getattr(intern!(py, "Address"))?
.call1((address, address_type)) .call1((address, address_type))
.map(|any| Self(any.into())) .map(|any| Self(any.into()))

View File

@@ -51,7 +51,7 @@ install_requires =
pyserial-asyncio >= 0.5; platform_system!='Emscripten' pyserial-asyncio >= 0.5; platform_system!='Emscripten'
pyserial >= 3.5; platform_system!='Emscripten' pyserial >= 3.5; platform_system!='Emscripten'
pyusb >= 1.2; platform_system!='Emscripten' pyusb >= 1.2; platform_system!='Emscripten'
websockets >= 12.0; platform_system!='Emscripten' websockets == 13.1; platform_system!='Emscripten'
[options.entry_points] [options.entry_points]
console_scripts = console_scripts =

View File

@@ -39,6 +39,8 @@ from bumble.profiles.ascs import (
) )
from bumble.profiles.bap import ( from bumble.profiles.bap import (
AudioLocation, AudioLocation,
BasicAudioAnnouncement,
BroadcastAudioAnnouncement,
SupportedFrameDuration, SupportedFrameDuration,
SupportedSamplingFrequency, SupportedSamplingFrequency,
SamplingFrequency, SamplingFrequency,
@@ -200,6 +202,56 @@ def test_codec_specific_configuration() -> None:
assert CodecSpecificConfiguration.from_bytes(bytes(config)) == config assert CodecSpecificConfiguration.from_bytes(bytes(config)) == config
# -----------------------------------------------------------------------------
def test_broadcast_audio_announcement() -> None:
broadcast_audio_announcement = BroadcastAudioAnnouncement(123456)
assert (
BroadcastAudioAnnouncement.from_bytes(bytes(broadcast_audio_announcement))
== broadcast_audio_announcement
)
# -----------------------------------------------------------------------------
def test_basic_audio_announcement() -> None:
basic_audio_announcement = BasicAudioAnnouncement(
presentation_delay=40000,
subgroups=[
BasicAudioAnnouncement.Subgroup(
codec_id=CodingFormat(codec_id=CodecID.LC3),
codec_specific_configuration=CodecSpecificConfiguration(
sampling_frequency=SamplingFrequency.FREQ_48000,
frame_duration=FrameDuration.DURATION_10000_US,
octets_per_codec_frame=100,
),
metadata=Metadata(
[
Metadata.Entry(tag=Metadata.Tag.LANGUAGE, data=b'eng'),
Metadata.Entry(tag=Metadata.Tag.PROGRAM_INFO, data=b'Disco'),
]
),
bis=[
BasicAudioAnnouncement.BIS(
index=0,
codec_specific_configuration=CodecSpecificConfiguration(
audio_channel_allocation=AudioLocation.FRONT_LEFT
),
),
BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=CodecSpecificConfiguration(
audio_channel_allocation=AudioLocation.FRONT_RIGHT
),
),
],
)
],
)
assert (
BasicAudioAnnouncement.from_bytes(bytes(basic_audio_announcement))
== basic_audio_announcement
)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_pacs(): async def test_pacs():

View File

@@ -19,9 +19,7 @@ import asyncio
import functools import functools
import logging import logging
import os import os
from types import LambdaType
import pytest import pytest
from unittest import mock
from bumble.core import ( from bumble.core import (
BT_BR_EDR_TRANSPORT, BT_BR_EDR_TRANSPORT,
@@ -29,7 +27,13 @@ from bumble.core import (
BT_PERIPHERAL_ROLE, BT_PERIPHERAL_ROLE,
ConnectionParameters, ConnectionParameters,
) )
from bumble.device import AdvertisingParameters, Connection, Device from bumble.device import (
AdvertisingEventProperties,
AdvertisingParameters,
Connection,
Device,
PeriodicAdvertisingParameters,
)
from bumble.host import AclPacketQueue, Host from bumble.host import AclPacketQueue, Host
from bumble.hci import ( from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND, HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
@@ -265,7 +269,8 @@ async def test_flush():
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_legacy_advertising(): async def test_legacy_advertising():
device = Device(host=mock.AsyncMock(Host)) device = TwoDevices()[0]
await device.power_on()
# Start advertising # Start advertising
await device.start_advertising() await device.start_advertising()
@@ -283,7 +288,10 @@ async def test_legacy_advertising():
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_legacy_advertising_disconnection(auto_restart): async def test_legacy_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host)) devices = TwoDevices()
device = devices[0]
devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff')
await device.power_on()
peer_address = Address('F0:F1:F2:F3:F4:F5') peer_address = Address('F0:F1:F2:F3:F4:F5')
await device.start_advertising(auto_restart=auto_restart) await device.start_advertising(auto_restart=auto_restart)
device.on_connection( device.on_connection(
@@ -305,6 +313,11 @@ async def test_legacy_advertising_disconnection(auto_restart):
await async_barrier() await async_barrier()
if auto_restart: if auto_restart:
assert device.legacy_advertising_set
started = asyncio.Event()
if not device.is_advertising:
device.legacy_advertising_set.once('start', started.set)
await asyncio.wait_for(started.wait(), _TIMEOUT)
assert device.is_advertising assert device.is_advertising
else: else:
assert not device.is_advertising assert not device.is_advertising
@@ -313,7 +326,8 @@ async def test_legacy_advertising_disconnection(auto_restart):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_extended_advertising(): async def test_extended_advertising():
device = Device(host=mock.AsyncMock(Host)) device = TwoDevices()[0]
await device.power_on()
# Start advertising # Start advertising
advertising_set = await device.create_advertising_set() advertising_set = await device.create_advertising_set()
@@ -332,7 +346,8 @@ async def test_extended_advertising():
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_extended_advertising_connection(own_address_type): async def test_extended_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host)) device = TwoDevices()[0]
await device.power_on()
peer_address = Address('F0:F1:F2:F3:F4:F5') peer_address = Address('F0:F1:F2:F3:F4:F5')
advertising_set = await device.create_advertising_set( advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type) advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
@@ -368,8 +383,10 @@ async def test_extended_advertising_connection(own_address_type):
) )
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_extended_advertising_connection_out_of_order(own_address_type): async def test_extended_advertising_connection_out_of_order(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host)) devices = TwoDevices()
peer_address = Address('F0:F1:F2:F3:F4:F5') device = devices[0]
devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff')
await device.power_on()
advertising_set = await device.create_advertising_set( advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type) advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
) )
@@ -382,7 +399,7 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
device.on_connection( device.on_connection(
0x0001, 0x0001,
BT_LE_TRANSPORT, BT_LE_TRANSPORT,
peer_address, Address('F0:F1:F2:F3:F4:F5'),
None, None,
None, None,
BT_PERIPHERAL_ROLE, BT_PERIPHERAL_ROLE,
@@ -397,6 +414,34 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
await async_barrier() await async_barrier()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_periodic_advertising():
device = TwoDevices()[0]
await device.power_on()
# Start advertising
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(
advertising_event_properties=AdvertisingEventProperties(
is_connectable=False
)
),
advertising_data=b'123',
periodic_advertising_parameters=PeriodicAdvertisingParameters(),
periodic_advertising_data=b'abc',
)
assert device.extended_advertising_sets
assert advertising_set.enabled
assert not advertising_set.periodic_enabled
await advertising_set.start_periodic()
assert advertising_set.periodic_enabled
await advertising_set.stop_periodic()
assert not advertising_set.periodic_enabled
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_get_remote_le_features(): async def test_get_remote_le_features():

View File

@@ -57,7 +57,7 @@ from .test_utils import async_barrier
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def basic_check(x): def basic_check(x):
pdu = x.to_bytes() pdu = bytes(x)
parsed = ATT_PDU.from_bytes(pdu) parsed = ATT_PDU.from_bytes(pdu)
x_str = str(x) x_str = str(x)
parsed_str = str(parsed) parsed_str = str(parsed)
@@ -74,7 +74,7 @@ def test_UUID():
assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
v = UUID(str(u)) v = UUID(str(u))
assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
w = UUID.from_bytes(v.to_bytes()) w = UUID.from_bytes(bytes(v))
assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6' assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
u1 = UUID.from_16_bits(0x1234) u1 = UUID.from_16_bits(0x1234)
@@ -851,7 +851,12 @@ async def test_unsubscribe():
await async_barrier() await async_barrier()
mock1.assert_called_once_with(ANY, True, False) mock1.assert_called_once_with(ANY, True, False)
await c2.subscribe() assert len(server.gatt_server.subscribers) == 1
def callback(_):
pass
await c2.subscribe(callback)
await async_barrier() await async_barrier()
mock2.assert_called_once_with(ANY, True, False) mock2.assert_called_once_with(ANY, True, False)
@@ -861,10 +866,16 @@ async def test_unsubscribe():
mock1.assert_called_once_with(ANY, False, False) mock1.assert_called_once_with(ANY, False, False)
mock2.reset_mock() mock2.reset_mock()
await c2.unsubscribe() await c2.unsubscribe(callback)
await async_barrier() await async_barrier()
mock2.assert_called_once_with(ANY, False, False) mock2.assert_called_once_with(ANY, False, False)
# All CCCDs should be zeros now
assert list(server.gatt_server.subscribers.values())[0] == {
c1.handle: bytes([0, 0]),
c2.handle: bytes([0, 0]),
}
mock1.reset_mock() mock1.reset_mock()
await c1.unsubscribe() await c1.unsubscribe()
await async_barrier() await async_barrier()

View File

@@ -75,13 +75,13 @@ from bumble.hci import (
def basic_check(x): def basic_check(x):
packet = x.to_bytes() packet = bytes(x)
print(packet.hex()) print(packet.hex())
parsed = HCI_Packet.from_bytes(packet) parsed = HCI_Packet.from_bytes(packet)
x_str = str(x) x_str = str(x)
parsed_str = str(parsed) parsed_str = str(parsed)
print(x_str) print(x_str)
parsed_bytes = parsed.to_bytes() parsed_bytes = bytes(parsed)
assert x_str == parsed_str assert x_str == parsed_str
assert packet == parsed_bytes assert packet == parsed_bytes
@@ -188,7 +188,7 @@ def test_HCI_Command_Complete_Event():
return_parameters=bytes([7]), return_parameters=bytes([7]),
) )
basic_check(event) basic_check(event)
event = HCI_Packet.from_bytes(event.to_bytes()) event = HCI_Packet.from_bytes(bytes(event))
assert event.return_parameters == 7 assert event.return_parameters == 7
# With a simple status as an integer status # With a simple status as an integer status
@@ -562,7 +562,7 @@ def test_iso_data_packet():
'6281bc77ed6a3206d984bcdabee6be831c699cb50e2' '6281bc77ed6a3206d984bcdabee6be831c699cb50e2'
) )
assert packet.to_bytes() == data assert bytes(packet) == data
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -61,7 +61,7 @@ def _default_hf_configuration() -> hfp.HfConfiguration:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def _default_hf_sdp_features() -> hfp.HfSdpFeature: def _default_hf_sdp_features() -> hfp.HfSdpFeature:
return ( return (
hfp.HfSdpFeature.WIDE_BAND hfp.HfSdpFeature.WIDE_BAND_SPEECH
| hfp.HfSdpFeature.THREE_WAY_CALLING | hfp.HfSdpFeature.THREE_WAY_CALLING
| hfp.HfSdpFeature.CLI_PRESENTATION_CAPABILITY | hfp.HfSdpFeature.CLI_PRESENTATION_CAPABILITY
) )
@@ -108,7 +108,7 @@ def _default_ag_configuration() -> hfp.AgConfiguration:
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def _default_ag_sdp_features() -> hfp.AgSdpFeature: def _default_ag_sdp_features() -> hfp.AgSdpFeature:
return ( return (
hfp.AgSdpFeature.WIDE_BAND hfp.AgSdpFeature.WIDE_BAND_SPEECH
| hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY | hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
| hfp.AgSdpFeature.THREE_WAY_CALLING | hfp.AgSdpFeature.THREE_WAY_CALLING
) )

View File

@@ -240,7 +240,7 @@ async def test_self_gatt():
result = await peer.discover_included_services(result[0]) result = await peer.discover_included_services(result[0])
assert len(result) == 2 assert len(result) == 2
# Service UUID is only present when the UUID is 16-bit Bluetooth UUID # Service UUID is only present when the UUID is 16-bit Bluetooth UUID
assert result[1].uuid.to_bytes() == s3.uuid.to_bytes() assert bytes(result[1].uuid) == bytes(s3.uuid)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------