Compare commits

...

23 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
9d3d5495ce only use __bytes__ when not argument is needed. 2024-11-23 15:56:14 -08:00
Gilles Boccon-Gibod
b57096abe2 Merge pull request #595 from wpiet/aics-opcode-fix
Amend Opcode value in `Audio Input Control Service`
2024-11-23 08:56:23 -08:00
Wojciech Pietraszewski
100bea6b41 Fix typos
Amends the typo in the `INACTIVE` field in `Audio Input Status` characteristic.
Amends the typo in the log message of `_set_gain_settings` method.
2024-11-21 18:29:44 +01:00
Wojciech Pietraszewski
63819bf9dd Amend Opcode value in Audio Input Control Service
Corrects the Audio Input Control Point
Opcode value for `Set Gain Setting` field.
2024-11-21 16:40:49 +01:00
zxzxwu
e3fdab4175 Merge pull request #593 from zxzxwu/periodic
Support Periodic Advertising
2024-11-19 17:22:37 +08:00
Josh Wu
bbcd14dbf0 Support Periodic Advertising 2024-11-19 16:27:13 +08:00
zxzxwu
01dc0d574b Merge pull request #590 from SergeantSerk/parse-scan-response-data
Correctly parse scan response from device config
2024-11-17 15:39:11 +08:00
zxzxwu
5e959d638e Merge pull request #591 from zxzxwu/auracast_scan
Improve Broadcast Scanning
2024-11-16 04:10:27 +08:00
Josh Wu
c88b32a406 Improve Broadcast Scanning 2024-11-16 02:02:28 +08:00
zxzxwu
5a72eefb89 Merge pull request #587 from zxzxwu/device
Replace HCI member imports in device.py
2024-11-13 15:25:32 +08:00
Josh Wu
430046944b Replace HCI member import in device.py 2024-11-12 16:53:21 +08:00
zxzxwu
21d23320eb Merge pull request #584 from zxzxwu/commands6.0
Add Core Spec 6.0 new commands support mapping
2024-11-12 04:17:24 +00:00
Serkan
d0990ee04d Correctly parse scan response from device config
Parses scan response data correctly just like advertising data
2024-11-07 21:49:33 +03:00
Josh Wu
2d88e853e8 Add Core Spec 6.0 new commands support mapping 2024-11-07 14:36:54 +08:00
Gilles Boccon-Gibod
a060a70fba Merge pull request #583 from google/gbg/more-gatt-tests
regression test for GATT unsubscription
2024-11-04 13:03:57 -08:00
Gilles Boccon-Gibod
a06394ad4a Merge pull request #582 from google/gbg/580
fix #580
2024-11-04 13:03:15 -08:00
Gilles Boccon-Gibod
a1414c2b5b add unsubscribe test 2024-11-03 19:08:27 -08:00
Gilles Boccon-Gibod
b2864dac2d fix #580 2024-11-02 10:29:40 -07:00
Gilles Boccon-Gibod
b78f895143 Merge pull request #579 from jmdietrich-gcx/unsubscribe_characteristic_in_gatt_client
Remove characteristic in GATT Client unsubscribe() if it's the last subscriber
2024-10-31 04:07:02 -07:00
zxzxwu
c4e9726828 Merge pull request #581 from zxzxwu/context
[BAP] Add missing Unspecified context type
2024-10-31 11:04:25 +00:00
Gilles Boccon-Gibod
d4b8e8348a Merge pull request #574 from google/gbg/update-python-versions
remove test for deprecated Python 3.8 and add 3.13
2024-10-31 03:44:01 -07:00
Josh Wu
19debaa52e [BAP] Add missing Unspecified context type 2024-10-31 18:11:40 +08:00
Jan-Marcel Dietrich
73fe564321 Remove characteristic in GATT Client unsubscribe() if it's the last subscriber
GATT Client's subscribe() adds the characteristic itself as subscriber.
Therefore the characteristic has to be removed in unsubscribe(), if it's
the last subscriber. Otherwise the clean up does not work correctly and
the CCCD never is set back to 0 in the remote device.
2024-10-30 07:34:22 +01:00
26 changed files with 933 additions and 653 deletions

View File

@@ -60,7 +60,7 @@ AURACAST_DEFAULT_ATT_MTU = 256
class BroadcastScanner(pyee.EventEmitter):
@dataclasses.dataclass
class Broadcast(pyee.EventEmitter):
name: str
name: str | None
sync: bumble.device.PeriodicAdvertisingSync
rssi: int = 0
public_broadcast_announcement: Optional[
@@ -135,7 +135,8 @@ class BroadcastScanner(pyee.EventEmitter):
self.sync.advertiser_address,
color(self.sync.state.name, 'green'),
)
print(f' {color("Name", "cyan")}: {self.name}')
if self.name is not None:
print(f' {color("Name", "cyan")}: {self.name}')
if self.appearance:
print(f' {color("Appearance", "cyan")}: {str(self.appearance)}')
print(f' {color("RSSI", "cyan")}: {self.rssi}')
@@ -174,7 +175,7 @@ class BroadcastScanner(pyee.EventEmitter):
print(color(' Codec ID:', 'yellow'))
print(
color(' Coding Format: ', 'green'),
subgroup.codec_id.coding_format.name,
subgroup.codec_id.codec_id.name,
)
print(
color(' Company ID: ', 'green'),
@@ -274,13 +275,24 @@ class BroadcastScanner(pyee.EventEmitter):
await self.device.stop_scanning()
def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None:
if (
broadcast_name := advertisement.data.get(
bumble.core.AdvertisingData.BROADCAST_NAME
if not (
ads := advertisement.data.get_all(
bumble.core.AdvertisingData.SERVICE_DATA_16_BIT_UUID
)
) is None:
) or not (
any(
ad
for ad in ads
if isinstance(ad, tuple)
and ad[0] == bumble.gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
)
):
return
assert isinstance(broadcast_name, str)
broadcast_name = advertisement.data.get(
bumble.core.AdvertisingData.BROADCAST_NAME
)
assert isinstance(broadcast_name, str) or broadcast_name is None
if broadcast := self.broadcasts.get(advertisement.address):
broadcast.update(advertisement)
@@ -291,7 +303,7 @@ class BroadcastScanner(pyee.EventEmitter):
)
async def on_new_broadcast(
self, name: str, advertisement: bumble.device.Advertisement
self, name: str | None, advertisement: bumble.device.Advertisement
) -> None:
periodic_advertising_sync = await self.device.create_periodic_advertising_sync(
advertiser_address=advertisement.address,
@@ -299,10 +311,7 @@ class BroadcastScanner(pyee.EventEmitter):
sync_timeout=self.sync_timeout,
filter_duplicates=self.filter_duplicates,
)
broadcast = self.Broadcast(
name,
periodic_advertising_sync,
)
broadcast = self.Broadcast(name, periodic_advertising_sync)
broadcast.update(advertisement)
self.broadcasts[advertisement.address] = broadcast
periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast))

View File

@@ -83,7 +83,7 @@ async def async_main():
return_parameters=bytes([hci.HCI_SUCCESS]),
)
# Return a packet with 'respond to sender' set to True
return (response.to_bytes(), True)
return (bytes(response), True)
return None

View File

@@ -486,7 +486,12 @@ class Speaker:
def on_pdu(pdu: HCI_IsoDataPacket, ase: ascs.AseStateMachine):
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or codec_config.frame_duration is None
or codec_config.audio_channel_allocation is None
):
return
pcm = decode(
codec_config.frame_duration.us,
codec_config.audio_channel_allocation.channel_count,
@@ -495,11 +500,17 @@ class Speaker:
self.device.abort_on('disconnection', self.ui_server.send_audio(pcm))
def on_ase_state_change(ase: ascs.AseStateMachine) -> None:
codec_config = ase.codec_specific_configuration
if ase.state == ascs.AseStateMachine.State.STREAMING:
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
assert ase.cis_link
if ase.role == ascs.AudioRole.SOURCE:
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or ase.cis_link is None
or codec_config.octets_per_codec_frame is None
or codec_config.frame_duration is None
or codec_config.codec_frames_per_sdu is None
):
return
ase.cis_link.abort_on(
'disconnection',
lc3_source_task(
@@ -514,10 +525,17 @@ class Speaker:
),
)
else:
if not ase.cis_link:
return
ase.cis_link.sink = functools.partial(on_pdu, ase=ase)
elif ase.state == ascs.AseStateMachine.State.CODEC_CONFIGURED:
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or codec_config.sampling_frequency is None
or codec_config.frame_duration is None
or codec_config.audio_channel_allocation is None
):
return
if ase.role == ascs.AudioRole.SOURCE:
setup_encoders(
codec_config.sampling_frequency.hz,

View File

@@ -291,9 +291,6 @@ class ATT_PDU:
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
@property
def is_command(self):
return ((self.op_code >> 6) & 1) == 1
@@ -303,7 +300,7 @@ class ATT_PDU:
return ((self.op_code >> 7) & 1) == 1
def __bytes__(self):
return self.to_bytes()
return self.pdu
def __str__(self):
result = color(self.name, 'yellow')

View File

@@ -314,7 +314,7 @@ class Controller:
f'{color("CONTROLLER -> HOST", "green")}: {packet}'
)
if self.host:
self.host.on_packet(packet.to_bytes())
self.host.on_packet(bytes(packet))
# This method allows the controller to emulate the same API as a transport source
async def wait_for_termination(self):
@@ -1192,7 +1192,7 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command
'''
bd_addr = (
self._public_address.to_bytes()
bytes(self._public_address)
if self._public_address is not None
else bytes(6)
)
@@ -1543,6 +1543,41 @@ class Controller:
}
return bytes([HCI_SUCCESS])
def on_hci_le_set_advertising_set_random_address_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.52 LE Set Advertising Set Random Address
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_advertising_parameters_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.53 LE Set Extended Advertising Parameters
Command
'''
return bytes([HCI_SUCCESS, 0])
def on_hci_le_set_extended_advertising_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.54 LE Set Extended Advertising Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_scan_response_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.55 LE Set Extended Scan Response Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_advertising_enable_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.56 LE Set Extended Advertising Enable
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_maximum_advertising_data_length_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data
@@ -1557,6 +1592,27 @@ class Controller:
'''
return struct.pack('<BB', HCI_SUCCESS, 0xF0)
def on_hci_le_set_periodic_advertising_parameters_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.61 LE Set Periodic Advertising Parameters
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_periodic_advertising_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.62 LE Set Periodic Advertising Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_periodic_advertising_enable_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.63 LE Set Periodic Advertising Enable
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_transmit_power_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command

File diff suppressed because it is too large Load Diff

View File

@@ -410,7 +410,7 @@ class IncludedServiceDeclaration(Attribute):
def __init__(self, service: Service) -> None:
declaration_bytes = struct.pack(
'<HH2s', service.handle, service.end_group_handle, service.uuid.to_bytes()
'<HH2s', service.handle, service.end_group_handle, bytes(service.uuid)
)
super().__init__(
GATT_INCLUDE_ATTRIBUTE_TYPE, Attribute.READABLE, declaration_bytes

View File

@@ -292,7 +292,7 @@ class Client:
logger.debug(
f'GATT Command from client: [0x{self.connection.handle:04X}] {command}'
)
self.send_gatt_pdu(command.to_bytes())
self.send_gatt_pdu(bytes(command))
async def send_request(self, request: ATT_PDU):
logger.debug(
@@ -310,7 +310,7 @@ class Client:
self.pending_request = request
try:
self.send_gatt_pdu(request.to_bytes())
self.send_gatt_pdu(bytes(request))
response = await asyncio.wait_for(
self.pending_response, GATT_REQUEST_TIMEOUT
)
@@ -328,7 +328,7 @@ class Client:
f'GATT Confirmation from client: [0x{self.connection.handle:04X}] '
f'{confirmation}'
)
self.send_gatt_pdu(confirmation.to_bytes())
self.send_gatt_pdu(bytes(confirmation))
async def request_mtu(self, mtu: int) -> int:
# Check the range
@@ -898,6 +898,12 @@ class Client:
) and subscriber in subscribers:
subscribers.remove(subscriber)
# The characteristic itself is added as subscriber. If it is the
# last remaining subscriber, we remove it, such that the clean up
# works correctly. Otherwise the CCCD never is set back to 0.
if len(subscribers) == 1 and characteristic in subscribers:
subscribers.remove(characteristic)
# Cleanup if we removed the last one
if not subscribers:
del subscriber_set[characteristic.handle]

View File

@@ -353,7 +353,7 @@ class Server(EventEmitter):
logger.debug(
f'GATT Response from server: [0x{connection.handle:04X}] {response}'
)
self.send_gatt_pdu(connection.handle, response.to_bytes())
self.send_gatt_pdu(connection.handle, bytes(response))
async def notify_subscriber(
self,
@@ -450,7 +450,7 @@ class Server(EventEmitter):
)
try:
self.send_gatt_pdu(connection.handle, indication.to_bytes())
self.send_gatt_pdu(connection.handle, bytes(indication))
await asyncio.wait_for(pending_confirmation, GATT_REQUEST_TIMEOUT)
except asyncio.TimeoutError as error:
logger.warning(color('!!! GATT Indicate timeout', 'red'))

View File

@@ -915,6 +915,8 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_READ_CURRENT_IAC_LAP_COMMAND : 1 << (11*8+3),
HCI_WRITE_CURRENT_IAC_LAP_COMMAND : 1 << (11*8+4),
HCI_SET_AFH_HOST_CHANNEL_CLASSIFICATION_COMMAND : 1 << (12*8+1),
HCI_LE_CS_READ_REMOTE_FAE_TABLE_COMMAND : 1 << (12*8+2),
HCI_LE_CS_WRITE_CACHED_REMOTE_FAE_TABLE_COMMAND : 1 << (12*8+3),
HCI_READ_INQUIRY_SCAN_TYPE_COMMAND : 1 << (12*8+4),
HCI_WRITE_INQUIRY_SCAN_TYPE_COMMAND : 1 << (12*8+5),
HCI_READ_INQUIRY_MODE_COMMAND : 1 << (12*8+6),
@@ -940,6 +942,8 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_SETUP_SYNCHRONOUS_CONNECTION_COMMAND : 1 << (16*8+3),
HCI_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (16*8+4),
HCI_REJECT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (16*8+5),
HCI_LE_CS_CREATE_CONFIG_COMMAND : 1 << (16*8+6),
HCI_LE_CS_REMOVE_CONFIG_COMMAND : 1 << (16*8+7),
HCI_READ_EXTENDED_INQUIRY_RESPONSE_COMMAND : 1 << (17*8+0),
HCI_WRITE_EXTENDED_INQUIRY_RESPONSE_COMMAND : 1 << (17*8+1),
HCI_REFRESH_ENCRYPTION_KEY_COMMAND : 1 << (17*8+2),
@@ -963,13 +967,20 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_SEND_KEYPRESS_NOTIFICATION_COMMAND : 1 << (20*8+2),
HCI_IO_CAPABILITY_REQUEST_NEGATIVE_REPLY_COMMAND : 1 << (20*8+3),
HCI_READ_ENCRYPTION_KEY_SIZE_COMMAND : 1 << (20*8+4),
HCI_LE_CS_READ_LOCAL_SUPPORTED_CAPABILITIES_COMMAND : 1 << (20*8+5),
HCI_LE_CS_READ_REMOTE_SUPPORTED_CAPABILITIES_COMMAND : 1 << (20*8+6),
HCI_LE_CS_WRITE_CACHED_REMOTE_SUPPORTED_CAPABILITIES : 1 << (20*8+7),
HCI_SET_EVENT_MASK_PAGE_2_COMMAND : 1 << (22*8+2),
HCI_READ_FLOW_CONTROL_MODE_COMMAND : 1 << (23*8+0),
HCI_WRITE_FLOW_CONTROL_MODE_COMMAND : 1 << (23*8+1),
HCI_READ_DATA_BLOCK_SIZE_COMMAND : 1 << (23*8+2),
HCI_LE_CS_TEST_COMMAND : 1 << (23*8+3),
HCI_LE_CS_TEST_END_COMMAND : 1 << (23*8+4),
HCI_READ_ENHANCED_TRANSMIT_POWER_LEVEL_COMMAND : 1 << (24*8+0),
HCI_LE_CS_SECURITY_ENABLE_COMMAND : 1 << (24*8+1),
HCI_READ_LE_HOST_SUPPORT_COMMAND : 1 << (24*8+5),
HCI_WRITE_LE_HOST_SUPPORT_COMMAND : 1 << (24*8+6),
HCI_LE_CS_SET_DEFAULT_SETTINGS_COMMAND : 1 << (24*8+7),
HCI_LE_SET_EVENT_MASK_COMMAND : 1 << (25*8+0),
HCI_LE_READ_BUFFER_SIZE_COMMAND : 1 << (25*8+1),
HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND : 1 << (25*8+2),
@@ -1000,6 +1011,10 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_LE_RECEIVER_TEST_COMMAND : 1 << (28*8+4),
HCI_LE_TRANSMITTER_TEST_COMMAND : 1 << (28*8+5),
HCI_LE_TEST_END_COMMAND : 1 << (28*8+6),
HCI_LE_ENABLE_MONITORING_ADVERTISERS_COMMAND : 1 << (28*8+7),
HCI_LE_CS_SET_CHANNEL_CLASSIFICATION_COMMAND : 1 << (29*8+0),
HCI_LE_CS_SET_PROCEDURE_PARAMETERS_COMMAND : 1 << (29*8+1),
HCI_LE_CS_PROCEDURE_ENABLE_COMMAND : 1 << (29*8+2),
HCI_ENHANCED_SETUP_SYNCHRONOUS_CONNECTION_COMMAND : 1 << (29*8+3),
HCI_ENHANCED_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (29*8+4),
HCI_READ_LOCAL_SUPPORTED_CODECS_COMMAND : 1 << (29*8+5),
@@ -1136,11 +1151,21 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_LE_SET_DEFAULT_SUBRATE_COMMAND : 1 << (46*8+0),
HCI_LE_SUBRATE_REQUEST_COMMAND : 1 << (46*8+1),
HCI_LE_SET_EXTENDED_ADVERTISING_PARAMETERS_V2_COMMAND : 1 << (46*8+2),
HCI_LE_SET_DECISION_DATA_COMMAND : 1 << (46*8+3),
HCI_LE_SET_DECISION_INSTRUCTIONS_COMMAND : 1 << (46*8+4),
HCI_LE_SET_PERIODIC_ADVERTISING_SUBEVENT_DATA_COMMAND : 1 << (46*8+5),
HCI_LE_SET_PERIODIC_ADVERTISING_RESPONSE_DATA_COMMAND : 1 << (46*8+6),
HCI_LE_SET_PERIODIC_SYNC_SUBEVENT_COMMAND : 1 << (46*8+7),
HCI_LE_EXTENDED_CREATE_CONNECTION_V2_COMMAND : 1 << (47*8+0),
HCI_LE_SET_PERIODIC_ADVERTISING_PARAMETERS_V2_COMMAND : 1 << (47*8+1),
HCI_LE_READ_ALL_LOCAL_SUPPORTED_FEATURES_COMMAND : 1 << (47*8+2),
HCI_LE_READ_ALL_REMOTE_FEATURES_COMMAND : 1 << (47*8+3),
HCI_LE_SET_HOST_FEATURE_V2_COMMAND : 1 << (47*8+4),
HCI_LE_ADD_DEVICE_TO_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+5),
HCI_LE_REMOVE_DEVICE_FROM_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+6),
HCI_LE_CLEAR_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+7),
HCI_LE_READ_MONITORED_ADVERTISERS_LIST_SIZE_COMMAND : 1 << (48*8+0),
HCI_LE_FRAME_SPACE_UPDATE_COMMAND : 1 << (48*8+1),
}
# LE Supported Features
@@ -1457,7 +1482,7 @@ class CodingFormat:
vendor_specific_codec_id: int = 0
@classmethod
def parse_from_bytes(cls, data: bytes, offset: int):
def parse_from_bytes(cls, data: bytes, offset: int) -> tuple[int, CodingFormat]:
(codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from(
'<BHH', data, offset
)
@@ -1467,14 +1492,15 @@ class CodingFormat:
vendor_specific_codec_id=vendor_specific_codec_id,
)
def to_bytes(self) -> bytes:
@classmethod
def from_bytes(cls, data: bytes) -> CodingFormat:
return cls.parse_from_bytes(data, 0)[1]
def __bytes__(self) -> bytes:
return struct.pack(
'<BHH', self.codec_id, self.company_id, self.vendor_specific_codec_id
)
def __bytes__(self) -> bytes:
return self.to_bytes()
# -----------------------------------------------------------------------------
class HCI_Constant:
@@ -1691,7 +1717,7 @@ class HCI_Object:
field_length = len(field_bytes)
field_bytes = bytes([field_length]) + field_bytes
elif isinstance(field_value, (bytes, bytearray)) or hasattr(
field_value, 'to_bytes'
field_value, '__bytes__'
):
field_bytes = bytes(field_value)
if isinstance(field_type, int) and 4 < field_type <= 256:
@@ -1736,7 +1762,7 @@ class HCI_Object:
def from_bytes(cls, data, offset, fields):
return cls(fields, **cls.dict_from_bytes(data, offset, fields))
def to_bytes(self):
def __bytes__(self):
return HCI_Object.dict_to_bytes(self.__dict__, self.fields)
@staticmethod
@@ -1831,9 +1857,6 @@ class HCI_Object:
for field_name, field_value in field_strings
)
def __bytes__(self):
return self.to_bytes()
def __init__(self, fields, **kwargs):
self.fields = fields
self.init_from_fields(self, fields, kwargs)
@@ -2008,9 +2031,6 @@ class Address:
def is_static(self):
return self.is_random and (self.address_bytes[5] >> 6 == 3)
def to_bytes(self):
return self.address_bytes
def to_string(self, with_type_qualifier=True):
'''
String representation of the address, MSB first, with an optional type
@@ -2022,7 +2042,7 @@ class Address:
return result + '/P'
def __bytes__(self):
return self.to_bytes()
return self.address_bytes
def __hash__(self):
return hash(self.address_bytes)
@@ -2228,16 +2248,13 @@ class HCI_Command(HCI_Packet):
self.op_code = op_code
self.parameters = parameters
def to_bytes(self):
def __bytes__(self):
parameters = b'' if self.parameters is None else self.parameters
return (
struct.pack('<BHB', HCI_COMMAND_PACKET, self.op_code, len(parameters))
+ parameters
)
def __bytes__(self):
return self.to_bytes()
def __str__(self):
result = color(self.name, 'green')
if fields := getattr(self, 'fields', None):
@@ -4302,6 +4319,61 @@ class HCI_LE_Clear_Advertising_Sets_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('advertising_handle', 1),
('periodic_advertising_interval_min', 2),
('periodic_advertising_interval_max', 2),
('periodic_advertising_properties', 2),
]
)
class HCI_LE_Set_Periodic_Advertising_Parameters_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.61 LE Set Periodic Advertising Parameters command
'''
class Properties(enum.IntFlag):
INCLUDE_TX_POWER = 1 << 6
advertising_handle: int
periodic_advertising_interval_min: int
periodic_advertising_interval_max: int
periodic_advertising_properties: int
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('advertising_handle', 1),
(
'operation',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
x
).name,
},
),
(
'advertising_data',
{
'parser': HCI_Object.parse_length_prefixed_bytes,
'serializer': HCI_Object.serialize_length_prefixed_bytes,
},
),
]
)
class HCI_LE_Set_Periodic_Advertising_Data_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.62 LE Set Periodic Advertising Data command
'''
advertising_handle: int
operation: int
advertising_data: bytes
# -----------------------------------------------------------------------------
@HCI_Command.command([('enable', 1), ('advertising_handle', 1)])
class HCI_LE_Set_Periodic_Advertising_Enable_Command(HCI_Command):
@@ -5106,13 +5178,10 @@ class HCI_Event(HCI_Packet):
self.event_code = event_code
self.parameters = parameters
def to_bytes(self):
def __bytes__(self):
parameters = b'' if self.parameters is None else self.parameters
return bytes([HCI_EVENT_PACKET, self.event_code, len(parameters)]) + parameters
def __bytes__(self):
return self.to_bytes()
def __str__(self):
result = color(self.name, 'magenta')
if fields := getattr(self, 'fields', None):
@@ -6663,7 +6732,7 @@ class HCI_AclDataPacket(HCI_Packet):
connection_handle, pb_flag, bc_flag, data_total_length, data
)
def to_bytes(self):
def __bytes__(self):
h = (self.pb_flag << 12) | (self.bc_flag << 14) | self.connection_handle
return (
struct.pack('<BHH', HCI_ACL_DATA_PACKET, h, self.data_total_length)
@@ -6677,9 +6746,6 @@ class HCI_AclDataPacket(HCI_Packet):
self.data_total_length = data_total_length
self.data = data
def __bytes__(self):
return self.to_bytes()
def __str__(self):
return (
f'{color("ACL", "blue")}: '
@@ -6713,7 +6779,7 @@ class HCI_SynchronousDataPacket(HCI_Packet):
connection_handle, packet_status, data_total_length, data
)
def to_bytes(self) -> bytes:
def __bytes__(self) -> bytes:
h = (self.packet_status << 12) | self.connection_handle
return (
struct.pack('<BHB', HCI_SYNCHRONOUS_DATA_PACKET, h, self.data_total_length)
@@ -6732,9 +6798,6 @@ class HCI_SynchronousDataPacket(HCI_Packet):
self.data_total_length = data_total_length
self.data = data
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self) -> str:
return (
f'{color("SCO", "blue")}: '
@@ -6807,9 +6870,6 @@ class HCI_IsoDataPacket(HCI_Packet):
)
def __bytes__(self) -> bytes:
return self.to_bytes()
def to_bytes(self) -> bytes:
fmt = '<BHH'
args = [
HCI_ISO_DATA_PACKET,

View File

@@ -199,7 +199,7 @@ class Host(AbortableEventEmitter):
check_address_type: bool = False,
) -> Optional[Connection]:
for connection in self.connections.values():
if connection.peer_address.to_bytes() == bd_addr.to_bytes():
if bytes(connection.peer_address) == bytes(bd_addr):
if (
check_address_type
and connection.peer_address.address_type != bd_addr.address_type

View File

@@ -225,7 +225,7 @@ class L2CAP_PDU:
return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload)
def to_bytes(self) -> bytes:
def __bytes__(self) -> bytes:
header = struct.pack('<HH', len(self.payload), self.cid)
return header + self.payload
@@ -233,9 +233,6 @@ class L2CAP_PDU:
self.cid = cid
self.payload = payload
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self) -> str:
return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}'
@@ -333,11 +330,8 @@ class L2CAP_Control_Frame:
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self) -> bytes:
return self.pdu
def __bytes__(self) -> bytes:
return self.to_bytes()
return self.pdu
def __str__(self) -> str:
result = f'{color(self.name, "yellow")} [ID={self.identifier}]'

View File

@@ -39,7 +39,6 @@ from bumble.device import (
AdvertisingEventProperties,
AdvertisingType,
Device,
Phy,
)
from bumble.gatt import Service
from bumble.hci import (
@@ -47,6 +46,7 @@ from bumble.hci import (
HCI_PAGE_TIMEOUT_ERROR,
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
Address,
Phy,
)
from google.protobuf import any_pb2 # pytype: disable=pyi-error
from google.protobuf import empty_pb2 # pytype: disable=pyi-error

View File

@@ -95,7 +95,7 @@ class AudioInputStatus(OpenIntEnum):
Cf. 3.4 Audio Input Status
'''
INATIVE = 0x00
INACTIVE = 0x00
ACTIVE = 0x01
@@ -104,7 +104,7 @@ class AudioInputControlPointOpCode(OpenIntEnum):
Cf. 3.5.1 Audio Input Control Point procedure requirements
'''
SET_GAIN_SETTING = 0x00
SET_GAIN_SETTING = 0x01
UNMUTE = 0x02
MUTE = 0x03
SET_MANUAL_GAIN_MODE = 0x04
@@ -239,7 +239,7 @@ class AudioInputControlPoint:
or gain_settings_operand
> self.gain_settings_properties.gain_settings_maximum
):
logger.error("gain_seetings value out of range")
logger.error("gain_settings value out of range")
raise ATT_Error(ErrorCode.VALUE_OUT_OF_RANGE)
if self.audio_input_state.gain_settings != gain_settings_operand:

View File

@@ -102,6 +102,7 @@ class ContextType(enum.IntFlag):
# fmt: off
PROHIBITED = 0x0000
UNSPECIFIED = 0x0001
CONVERSATIONAL = 0x0002
MEDIA = 0x0004
GAME = 0x0008
@@ -264,7 +265,7 @@ class UnicastServerAdvertisingData:
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
struct.pack(
'<2sBIB',
gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE.to_bytes(),
bytes(gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE),
self.announcement_type,
self.available_audio_contexts,
len(self.metadata),
@@ -397,18 +398,21 @@ class CodecSpecificConfiguration:
OCTETS_PER_FRAME = 0x04
CODEC_FRAMES_PER_SDU = 0x05
sampling_frequency: SamplingFrequency
frame_duration: FrameDuration
audio_channel_allocation: AudioLocation
octets_per_codec_frame: int
codec_frames_per_sdu: int
sampling_frequency: SamplingFrequency | None = None
frame_duration: FrameDuration | None = None
audio_channel_allocation: AudioLocation | None = None
octets_per_codec_frame: int | None = None
codec_frames_per_sdu: int | None = None
@classmethod
def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration:
offset = 0
# Allowed default values.
audio_channel_allocation = AudioLocation.NOT_ALLOWED
codec_frames_per_sdu = 1
sampling_frequency: SamplingFrequency | None = None
frame_duration: FrameDuration | None = None
audio_channel_allocation: AudioLocation | None = None
octets_per_codec_frame: int | None = None
codec_frames_per_sdu: int | None = None
while offset < len(data):
length, type = struct.unpack_from('BB', data, offset)
offset += 2
@@ -426,8 +430,6 @@ class CodecSpecificConfiguration:
elif type == CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU:
codec_frames_per_sdu = value
# It is expected here that if some fields are missing, an error should be raised.
# pylint: disable=possibly-used-before-assignment,used-before-assignment
return CodecSpecificConfiguration(
sampling_frequency=sampling_frequency,
frame_duration=frame_duration,
@@ -437,23 +439,43 @@ class CodecSpecificConfiguration:
)
def __bytes__(self) -> bytes:
return struct.pack(
'<BBBBBBBBIBBHBBB',
2,
CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY,
self.sampling_frequency,
2,
CodecSpecificConfiguration.Type.FRAME_DURATION,
self.frame_duration,
5,
CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION,
self.audio_channel_allocation,
3,
CodecSpecificConfiguration.Type.OCTETS_PER_FRAME,
self.octets_per_codec_frame,
2,
CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU,
self.codec_frames_per_sdu,
return b''.join(
[
struct.pack(fmt, length, tag, value)
for fmt, length, tag, value in [
(
'<BBB',
2,
CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY,
self.sampling_frequency,
),
(
'<BBB',
2,
CodecSpecificConfiguration.Type.FRAME_DURATION,
self.frame_duration,
),
(
'<BBI',
5,
CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION,
self.audio_channel_allocation,
),
(
'<BBH',
3,
CodecSpecificConfiguration.Type.OCTETS_PER_FRAME,
self.octets_per_codec_frame,
),
(
'<BBB',
2,
CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU,
self.codec_frames_per_sdu,
),
]
if value is not None
]
)
@@ -465,6 +487,24 @@ class BroadcastAudioAnnouncement:
def from_bytes(cls, data: bytes) -> Self:
return cls(int.from_bytes(data[:3], 'little'))
def __bytes__(self) -> bytes:
return self.broadcast_id.to_bytes(3, 'little')
def get_advertising_data(self) -> bytes:
return bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
bytes(gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE)
+ bytes(self)
),
)
]
)
)
@dataclasses.dataclass
class BasicAudioAnnouncement:
@@ -473,26 +513,37 @@ class BasicAudioAnnouncement:
index: int
codec_specific_configuration: CodecSpecificConfiguration
@dataclasses.dataclass
class CodecInfo:
coding_format: hci.CodecID
company_id: int
vendor_specific_codec_id: int
@classmethod
def from_bytes(cls, data: bytes) -> Self:
coding_format = hci.CodecID(data[0])
company_id = int.from_bytes(data[1:3], 'little')
vendor_specific_codec_id = int.from_bytes(data[3:5], 'little')
return cls(coding_format, company_id, vendor_specific_codec_id)
def __bytes__(self) -> bytes:
codec_specific_configuration_bytes = bytes(
self.codec_specific_configuration
)
return (
bytes([self.index, len(codec_specific_configuration_bytes)])
+ codec_specific_configuration_bytes
)
@dataclasses.dataclass
class Subgroup:
codec_id: BasicAudioAnnouncement.CodecInfo
codec_id: hci.CodingFormat
codec_specific_configuration: CodecSpecificConfiguration
metadata: le_audio.Metadata
bis: List[BasicAudioAnnouncement.BIS]
def __bytes__(self) -> bytes:
metadata_bytes = bytes(self.metadata)
codec_specific_configuration_bytes = bytes(
self.codec_specific_configuration
)
return (
bytes([len(self.bis)])
+ bytes(self.codec_id)
+ bytes([len(codec_specific_configuration_bytes)])
+ codec_specific_configuration_bytes
+ bytes([len(metadata_bytes)])
+ metadata_bytes
+ b''.join(map(bytes, self.bis))
)
presentation_delay: int
subgroups: List[BasicAudioAnnouncement.Subgroup]
@@ -504,7 +555,7 @@ class BasicAudioAnnouncement:
for _ in range(data[3]):
num_bis = data[offset]
offset += 1
codec_id = cls.CodecInfo.from_bytes(data[offset : offset + 5])
codec_id = hci.CodingFormat.from_bytes(data[offset : offset + 5])
offset += 5
codec_specific_configuration_length = data[offset]
offset += 1
@@ -548,3 +599,25 @@ class BasicAudioAnnouncement:
)
return cls(presentation_delay, subgroups)
def __bytes__(self) -> bytes:
return (
self.presentation_delay.to_bytes(3, 'little')
+ bytes([len(self.subgroups)])
+ b''.join(map(bytes, self.subgroups))
)
def get_advertising_data(self) -> bytes:
return bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
bytes(gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE)
+ bytes(self)
),
)
]
)
)

View File

@@ -344,9 +344,6 @@ class DataElement:
] # Keep a copy so we can re-serialize to an exact replica
return result
def to_bytes(self):
return bytes(self)
def __bytes__(self):
# Return early if we have a cache
if self.bytes:
@@ -623,11 +620,8 @@ class SDP_PDU:
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
def __bytes__(self):
return self.to_bytes()
return self.pdu
def __str__(self):
result = f'{color(self.name, "blue")} [TID={self.transaction_id}]'

View File

@@ -298,11 +298,8 @@ class SMP_Command:
def init_from_bytes(self, pdu: bytes, offset: int) -> None:
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
def __bytes__(self):
return self.to_bytes()
return self.pdu
def __str__(self):
result = color(self.name, 'yellow')
@@ -1839,7 +1836,7 @@ class Session:
if self.is_initiator:
if self.pairing_method == PairingMethod.OOB:
self.send_pairing_random_command()
else:
elif self.pairing_method == PairingMethod.PASSKEY:
self.send_pairing_confirm_command()
else:
if self.pairing_method == PairingMethod.PASSKEY:
@@ -1949,7 +1946,7 @@ class Manager(EventEmitter):
f'{connection.peer_address}: {command}'
)
cid = SMP_BR_CID if connection.transport == BT_BR_EDR_TRANSPORT else SMP_CID
connection.send_l2cap_pdu(cid, command.to_bytes())
connection.send_l2cap_pdu(cid, bytes(command))
def on_smp_security_request_command(
self, connection: Connection, request: SMP_Security_Request_Command

View File

@@ -370,11 +370,13 @@ class PumpedPacketSource(ParserSource):
self.parser.feed_data(packet)
except asyncio.CancelledError:
logger.debug('source pump task done')
self.terminated.set_result(None)
if not self.terminated.done():
self.terminated.set_result(None)
break
except Exception as error:
logger.warning(f'exception while waiting for packet: {error}')
self.terminated.set_exception(error)
if not self.terminated.done():
self.terminated.set_exception(error)
break
self.pump_task = asyncio.create_task(pump_packets())

View File

@@ -161,7 +161,13 @@ async def main() -> None:
else:
file_output = open(f'{datetime.datetime.now().isoformat()}.lc3', 'wb')
codec_configuration = ase.codec_specific_configuration
assert isinstance(codec_configuration, CodecSpecificConfiguration)
if (
not isinstance(codec_configuration, CodecSpecificConfiguration)
or codec_configuration.sampling_frequency is None
or codec_configuration.audio_channel_allocation is None
or codec_configuration.frame_duration is None
):
return
# Write a LC3 header.
file_output.write(
bytes([0x1C, 0xCC]) # Header.

View File

@@ -80,7 +80,7 @@ impl Address {
/// Creates a new [Address] object.
pub fn new(address: &str, address_type: AddressType) -> PyResult<Self> {
Python::with_gil(|py| {
PyModule::import(py, intern!(py, "bumble.device"))?
PyModule::import(py, intern!(py, "bumble.hci"))?
.getattr(intern!(py, "Address"))?
.call1((address, address_type))
.map(|any| Self(any.into()))

View File

@@ -51,7 +51,7 @@ install_requires =
pyserial-asyncio >= 0.5; platform_system!='Emscripten'
pyserial >= 3.5; platform_system!='Emscripten'
pyusb >= 1.2; platform_system!='Emscripten'
websockets >= 12.0; platform_system!='Emscripten'
websockets == 13.1; platform_system!='Emscripten'
[options.entry_points]
console_scripts =

View File

@@ -39,6 +39,8 @@ from bumble.profiles.ascs import (
)
from bumble.profiles.bap import (
AudioLocation,
BasicAudioAnnouncement,
BroadcastAudioAnnouncement,
SupportedFrameDuration,
SupportedSamplingFrequency,
SamplingFrequency,
@@ -200,6 +202,56 @@ def test_codec_specific_configuration() -> None:
assert CodecSpecificConfiguration.from_bytes(bytes(config)) == config
# -----------------------------------------------------------------------------
def test_broadcast_audio_announcement() -> None:
broadcast_audio_announcement = BroadcastAudioAnnouncement(123456)
assert (
BroadcastAudioAnnouncement.from_bytes(bytes(broadcast_audio_announcement))
== broadcast_audio_announcement
)
# -----------------------------------------------------------------------------
def test_basic_audio_announcement() -> None:
basic_audio_announcement = BasicAudioAnnouncement(
presentation_delay=40000,
subgroups=[
BasicAudioAnnouncement.Subgroup(
codec_id=CodingFormat(codec_id=CodecID.LC3),
codec_specific_configuration=CodecSpecificConfiguration(
sampling_frequency=SamplingFrequency.FREQ_48000,
frame_duration=FrameDuration.DURATION_10000_US,
octets_per_codec_frame=100,
),
metadata=Metadata(
[
Metadata.Entry(tag=Metadata.Tag.LANGUAGE, data=b'eng'),
Metadata.Entry(tag=Metadata.Tag.PROGRAM_INFO, data=b'Disco'),
]
),
bis=[
BasicAudioAnnouncement.BIS(
index=0,
codec_specific_configuration=CodecSpecificConfiguration(
audio_channel_allocation=AudioLocation.FRONT_LEFT
),
),
BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=CodecSpecificConfiguration(
audio_channel_allocation=AudioLocation.FRONT_RIGHT
),
),
],
)
],
)
assert (
BasicAudioAnnouncement.from_bytes(bytes(basic_audio_announcement))
== basic_audio_announcement
)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_pacs():

View File

@@ -19,9 +19,7 @@ import asyncio
import functools
import logging
import os
from types import LambdaType
import pytest
from unittest import mock
from bumble.core import (
BT_BR_EDR_TRANSPORT,
@@ -29,7 +27,13 @@ from bumble.core import (
BT_PERIPHERAL_ROLE,
ConnectionParameters,
)
from bumble.device import AdvertisingParameters, Connection, Device
from bumble.device import (
AdvertisingEventProperties,
AdvertisingParameters,
Connection,
Device,
PeriodicAdvertisingParameters,
)
from bumble.host import AclPacketQueue, Host
from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
@@ -265,7 +269,8 @@ async def test_flush():
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_legacy_advertising():
device = Device(host=mock.AsyncMock(Host))
device = TwoDevices()[0]
await device.power_on()
# Start advertising
await device.start_advertising()
@@ -283,7 +288,10 @@ async def test_legacy_advertising():
)
@pytest.mark.asyncio
async def test_legacy_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
devices = TwoDevices()
device = devices[0]
devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff')
await device.power_on()
peer_address = Address('F0:F1:F2:F3:F4:F5')
await device.start_advertising(auto_restart=auto_restart)
device.on_connection(
@@ -305,6 +313,11 @@ async def test_legacy_advertising_disconnection(auto_restart):
await async_barrier()
if auto_restart:
assert device.legacy_advertising_set
started = asyncio.Event()
if not device.is_advertising:
device.legacy_advertising_set.once('start', started.set)
await asyncio.wait_for(started.wait(), _TIMEOUT)
assert device.is_advertising
else:
assert not device.is_advertising
@@ -313,7 +326,8 @@ async def test_legacy_advertising_disconnection(auto_restart):
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_extended_advertising():
device = Device(host=mock.AsyncMock(Host))
device = TwoDevices()[0]
await device.power_on()
# Start advertising
advertising_set = await device.create_advertising_set()
@@ -332,7 +346,8 @@ async def test_extended_advertising():
)
@pytest.mark.asyncio
async def test_extended_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host))
device = TwoDevices()[0]
await device.power_on()
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
@@ -368,8 +383,10 @@ async def test_extended_advertising_connection(own_address_type):
)
@pytest.mark.asyncio
async def test_extended_advertising_connection_out_of_order(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
devices = TwoDevices()
device = devices[0]
devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff')
await device.power_on()
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
)
@@ -382,7 +399,7 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
Address('F0:F1:F2:F3:F4:F5'),
None,
None,
BT_PERIPHERAL_ROLE,
@@ -397,6 +414,34 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
await async_barrier()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_periodic_advertising():
device = TwoDevices()[0]
await device.power_on()
# Start advertising
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(
advertising_event_properties=AdvertisingEventProperties(
is_connectable=False
)
),
advertising_data=b'123',
periodic_advertising_parameters=PeriodicAdvertisingParameters(),
periodic_advertising_data=b'abc',
)
assert device.extended_advertising_sets
assert advertising_set.enabled
assert not advertising_set.periodic_enabled
await advertising_set.start_periodic()
assert advertising_set.periodic_enabled
await advertising_set.stop_periodic()
assert not advertising_set.periodic_enabled
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_remote_le_features():

View File

@@ -57,7 +57,7 @@ from .test_utils import async_barrier
# -----------------------------------------------------------------------------
def basic_check(x):
pdu = x.to_bytes()
pdu = bytes(x)
parsed = ATT_PDU.from_bytes(pdu)
x_str = str(x)
parsed_str = str(parsed)
@@ -74,7 +74,7 @@ def test_UUID():
assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
v = UUID(str(u))
assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
w = UUID.from_bytes(v.to_bytes())
w = UUID.from_bytes(bytes(v))
assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
u1 = UUID.from_16_bits(0x1234)
@@ -851,7 +851,12 @@ async def test_unsubscribe():
await async_barrier()
mock1.assert_called_once_with(ANY, True, False)
await c2.subscribe()
assert len(server.gatt_server.subscribers) == 1
def callback(_):
pass
await c2.subscribe(callback)
await async_barrier()
mock2.assert_called_once_with(ANY, True, False)
@@ -861,10 +866,16 @@ async def test_unsubscribe():
mock1.assert_called_once_with(ANY, False, False)
mock2.reset_mock()
await c2.unsubscribe()
await c2.unsubscribe(callback)
await async_barrier()
mock2.assert_called_once_with(ANY, False, False)
# All CCCDs should be zeros now
assert list(server.gatt_server.subscribers.values())[0] == {
c1.handle: bytes([0, 0]),
c2.handle: bytes([0, 0]),
}
mock1.reset_mock()
await c1.unsubscribe()
await async_barrier()

View File

@@ -75,13 +75,13 @@ from bumble.hci import (
def basic_check(x):
packet = x.to_bytes()
packet = bytes(x)
print(packet.hex())
parsed = HCI_Packet.from_bytes(packet)
x_str = str(x)
parsed_str = str(parsed)
print(x_str)
parsed_bytes = parsed.to_bytes()
parsed_bytes = bytes(parsed)
assert x_str == parsed_str
assert packet == parsed_bytes
@@ -188,7 +188,7 @@ def test_HCI_Command_Complete_Event():
return_parameters=bytes([7]),
)
basic_check(event)
event = HCI_Packet.from_bytes(event.to_bytes())
event = HCI_Packet.from_bytes(bytes(event))
assert event.return_parameters == 7
# With a simple status as an integer status
@@ -562,7 +562,7 @@ def test_iso_data_packet():
'6281bc77ed6a3206d984bcdabee6be831c699cb50e2'
)
assert packet.to_bytes() == data
assert bytes(packet) == data
# -----------------------------------------------------------------------------

View File

@@ -240,7 +240,7 @@ async def test_self_gatt():
result = await peer.discover_included_services(result[0])
assert len(result) == 2
# Service UUID is only present when the UUID is 16-bit Bluetooth UUID
assert result[1].uuid.to_bytes() == s3.uuid.to_bytes()
assert bytes(result[1].uuid) == bytes(s3.uuid)
# -----------------------------------------------------------------------------