Compare commits

...

17 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
9d3d5495ce only use __bytes__ when not argument is needed. 2024-11-23 15:56:14 -08:00
Gilles Boccon-Gibod
b57096abe2 Merge pull request #595 from wpiet/aics-opcode-fix
Amend Opcode value in `Audio Input Control Service`
2024-11-23 08:56:23 -08:00
Wojciech Pietraszewski
100bea6b41 Fix typos
Amends the typo in the `INACTIVE` field in `Audio Input Status` characteristic.
Amends the typo in the log message of `_set_gain_settings` method.
2024-11-21 18:29:44 +01:00
Wojciech Pietraszewski
63819bf9dd Amend Opcode value in Audio Input Control Service
Corrects the Audio Input Control Point
Opcode value for `Set Gain Setting` field.
2024-11-21 16:40:49 +01:00
zxzxwu
e3fdab4175 Merge pull request #593 from zxzxwu/periodic
Support Periodic Advertising
2024-11-19 17:22:37 +08:00
Josh Wu
bbcd14dbf0 Support Periodic Advertising 2024-11-19 16:27:13 +08:00
zxzxwu
01dc0d574b Merge pull request #590 from SergeantSerk/parse-scan-response-data
Correctly parse scan response from device config
2024-11-17 15:39:11 +08:00
zxzxwu
5e959d638e Merge pull request #591 from zxzxwu/auracast_scan
Improve Broadcast Scanning
2024-11-16 04:10:27 +08:00
Josh Wu
c88b32a406 Improve Broadcast Scanning 2024-11-16 02:02:28 +08:00
zxzxwu
5a72eefb89 Merge pull request #587 from zxzxwu/device
Replace HCI member imports in device.py
2024-11-13 15:25:32 +08:00
Josh Wu
430046944b Replace HCI member import in device.py 2024-11-12 16:53:21 +08:00
zxzxwu
21d23320eb Merge pull request #584 from zxzxwu/commands6.0
Add Core Spec 6.0 new commands support mapping
2024-11-12 04:17:24 +00:00
Serkan
d0990ee04d Correctly parse scan response from device config
Parses scan response data correctly just like advertising data
2024-11-07 21:49:33 +03:00
Josh Wu
2d88e853e8 Add Core Spec 6.0 new commands support mapping 2024-11-07 14:36:54 +08:00
Gilles Boccon-Gibod
a060a70fba Merge pull request #583 from google/gbg/more-gatt-tests
regression test for GATT unsubscription
2024-11-04 13:03:57 -08:00
Gilles Boccon-Gibod
a06394ad4a Merge pull request #582 from google/gbg/580
fix #580
2024-11-04 13:03:15 -08:00
Gilles Boccon-Gibod
b2864dac2d fix #580 2024-11-02 10:29:40 -07:00
26 changed files with 913 additions and 651 deletions

View File

@@ -60,7 +60,7 @@ AURACAST_DEFAULT_ATT_MTU = 256
class BroadcastScanner(pyee.EventEmitter):
@dataclasses.dataclass
class Broadcast(pyee.EventEmitter):
name: str
name: str | None
sync: bumble.device.PeriodicAdvertisingSync
rssi: int = 0
public_broadcast_announcement: Optional[
@@ -135,7 +135,8 @@ class BroadcastScanner(pyee.EventEmitter):
self.sync.advertiser_address,
color(self.sync.state.name, 'green'),
)
print(f' {color("Name", "cyan")}: {self.name}')
if self.name is not None:
print(f' {color("Name", "cyan")}: {self.name}')
if self.appearance:
print(f' {color("Appearance", "cyan")}: {str(self.appearance)}')
print(f' {color("RSSI", "cyan")}: {self.rssi}')
@@ -174,7 +175,7 @@ class BroadcastScanner(pyee.EventEmitter):
print(color(' Codec ID:', 'yellow'))
print(
color(' Coding Format: ', 'green'),
subgroup.codec_id.coding_format.name,
subgroup.codec_id.codec_id.name,
)
print(
color(' Company ID: ', 'green'),
@@ -274,13 +275,24 @@ class BroadcastScanner(pyee.EventEmitter):
await self.device.stop_scanning()
def on_advertisement(self, advertisement: bumble.device.Advertisement) -> None:
if (
broadcast_name := advertisement.data.get(
bumble.core.AdvertisingData.BROADCAST_NAME
if not (
ads := advertisement.data.get_all(
bumble.core.AdvertisingData.SERVICE_DATA_16_BIT_UUID
)
) is None:
) or not (
any(
ad
for ad in ads
if isinstance(ad, tuple)
and ad[0] == bumble.gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE
)
):
return
assert isinstance(broadcast_name, str)
broadcast_name = advertisement.data.get(
bumble.core.AdvertisingData.BROADCAST_NAME
)
assert isinstance(broadcast_name, str) or broadcast_name is None
if broadcast := self.broadcasts.get(advertisement.address):
broadcast.update(advertisement)
@@ -291,7 +303,7 @@ class BroadcastScanner(pyee.EventEmitter):
)
async def on_new_broadcast(
self, name: str, advertisement: bumble.device.Advertisement
self, name: str | None, advertisement: bumble.device.Advertisement
) -> None:
periodic_advertising_sync = await self.device.create_periodic_advertising_sync(
advertiser_address=advertisement.address,
@@ -299,10 +311,7 @@ class BroadcastScanner(pyee.EventEmitter):
sync_timeout=self.sync_timeout,
filter_duplicates=self.filter_duplicates,
)
broadcast = self.Broadcast(
name,
periodic_advertising_sync,
)
broadcast = self.Broadcast(name, periodic_advertising_sync)
broadcast.update(advertisement)
self.broadcasts[advertisement.address] = broadcast
periodic_advertising_sync.on('loss', lambda: self.on_broadcast_loss(broadcast))

View File

@@ -83,7 +83,7 @@ async def async_main():
return_parameters=bytes([hci.HCI_SUCCESS]),
)
# Return a packet with 'respond to sender' set to True
return (response.to_bytes(), True)
return (bytes(response), True)
return None

View File

@@ -486,7 +486,12 @@ class Speaker:
def on_pdu(pdu: HCI_IsoDataPacket, ase: ascs.AseStateMachine):
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or codec_config.frame_duration is None
or codec_config.audio_channel_allocation is None
):
return
pcm = decode(
codec_config.frame_duration.us,
codec_config.audio_channel_allocation.channel_count,
@@ -495,11 +500,17 @@ class Speaker:
self.device.abort_on('disconnection', self.ui_server.send_audio(pcm))
def on_ase_state_change(ase: ascs.AseStateMachine) -> None:
codec_config = ase.codec_specific_configuration
if ase.state == ascs.AseStateMachine.State.STREAMING:
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
assert ase.cis_link
if ase.role == ascs.AudioRole.SOURCE:
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or ase.cis_link is None
or codec_config.octets_per_codec_frame is None
or codec_config.frame_duration is None
or codec_config.codec_frames_per_sdu is None
):
return
ase.cis_link.abort_on(
'disconnection',
lc3_source_task(
@@ -514,10 +525,17 @@ class Speaker:
),
)
else:
if not ase.cis_link:
return
ase.cis_link.sink = functools.partial(on_pdu, ase=ase)
elif ase.state == ascs.AseStateMachine.State.CODEC_CONFIGURED:
codec_config = ase.codec_specific_configuration
assert isinstance(codec_config, bap.CodecSpecificConfiguration)
if (
not isinstance(codec_config, bap.CodecSpecificConfiguration)
or codec_config.sampling_frequency is None
or codec_config.frame_duration is None
or codec_config.audio_channel_allocation is None
):
return
if ase.role == ascs.AudioRole.SOURCE:
setup_encoders(
codec_config.sampling_frequency.hz,

View File

@@ -291,9 +291,6 @@ class ATT_PDU:
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
@property
def is_command(self):
return ((self.op_code >> 6) & 1) == 1
@@ -303,7 +300,7 @@ class ATT_PDU:
return ((self.op_code >> 7) & 1) == 1
def __bytes__(self):
return self.to_bytes()
return self.pdu
def __str__(self):
result = color(self.name, 'yellow')

View File

@@ -314,7 +314,7 @@ class Controller:
f'{color("CONTROLLER -> HOST", "green")}: {packet}'
)
if self.host:
self.host.on_packet(packet.to_bytes())
self.host.on_packet(bytes(packet))
# This method allows the controller to emulate the same API as a transport source
async def wait_for_termination(self):
@@ -1192,7 +1192,7 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command
'''
bd_addr = (
self._public_address.to_bytes()
bytes(self._public_address)
if self._public_address is not None
else bytes(6)
)
@@ -1543,6 +1543,41 @@ class Controller:
}
return bytes([HCI_SUCCESS])
def on_hci_le_set_advertising_set_random_address_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.52 LE Set Advertising Set Random Address
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_advertising_parameters_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.53 LE Set Extended Advertising Parameters
Command
'''
return bytes([HCI_SUCCESS, 0])
def on_hci_le_set_extended_advertising_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.54 LE Set Extended Advertising Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_scan_response_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.55 LE Set Extended Scan Response Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_extended_advertising_enable_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.56 LE Set Extended Advertising Enable
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_maximum_advertising_data_length_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.57 LE Read Maximum Advertising Data
@@ -1557,6 +1592,27 @@ class Controller:
'''
return struct.pack('<BB', HCI_SUCCESS, 0xF0)
def on_hci_le_set_periodic_advertising_parameters_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.61 LE Set Periodic Advertising Parameters
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_periodic_advertising_data_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.62 LE Set Periodic Advertising Data
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_set_periodic_advertising_enable_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.63 LE Set Periodic Advertising Enable
Command
'''
return bytes([HCI_SUCCESS])
def on_hci_le_read_transmit_power_command(self, _command):
'''
See Bluetooth spec Vol 4, Part E - 7.8.74 LE Read Transmit Power Command

File diff suppressed because it is too large Load Diff

View File

@@ -410,7 +410,7 @@ class IncludedServiceDeclaration(Attribute):
def __init__(self, service: Service) -> None:
declaration_bytes = struct.pack(
'<HH2s', service.handle, service.end_group_handle, service.uuid.to_bytes()
'<HH2s', service.handle, service.end_group_handle, bytes(service.uuid)
)
super().__init__(
GATT_INCLUDE_ATTRIBUTE_TYPE, Attribute.READABLE, declaration_bytes

View File

@@ -292,7 +292,7 @@ class Client:
logger.debug(
f'GATT Command from client: [0x{self.connection.handle:04X}] {command}'
)
self.send_gatt_pdu(command.to_bytes())
self.send_gatt_pdu(bytes(command))
async def send_request(self, request: ATT_PDU):
logger.debug(
@@ -310,7 +310,7 @@ class Client:
self.pending_request = request
try:
self.send_gatt_pdu(request.to_bytes())
self.send_gatt_pdu(bytes(request))
response = await asyncio.wait_for(
self.pending_response, GATT_REQUEST_TIMEOUT
)
@@ -328,7 +328,7 @@ class Client:
f'GATT Confirmation from client: [0x{self.connection.handle:04X}] '
f'{confirmation}'
)
self.send_gatt_pdu(confirmation.to_bytes())
self.send_gatt_pdu(bytes(confirmation))
async def request_mtu(self, mtu: int) -> int:
# Check the range

View File

@@ -353,7 +353,7 @@ class Server(EventEmitter):
logger.debug(
f'GATT Response from server: [0x{connection.handle:04X}] {response}'
)
self.send_gatt_pdu(connection.handle, response.to_bytes())
self.send_gatt_pdu(connection.handle, bytes(response))
async def notify_subscriber(
self,
@@ -450,7 +450,7 @@ class Server(EventEmitter):
)
try:
self.send_gatt_pdu(connection.handle, indication.to_bytes())
self.send_gatt_pdu(connection.handle, bytes(indication))
await asyncio.wait_for(pending_confirmation, GATT_REQUEST_TIMEOUT)
except asyncio.TimeoutError as error:
logger.warning(color('!!! GATT Indicate timeout', 'red'))

View File

@@ -915,6 +915,8 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_READ_CURRENT_IAC_LAP_COMMAND : 1 << (11*8+3),
HCI_WRITE_CURRENT_IAC_LAP_COMMAND : 1 << (11*8+4),
HCI_SET_AFH_HOST_CHANNEL_CLASSIFICATION_COMMAND : 1 << (12*8+1),
HCI_LE_CS_READ_REMOTE_FAE_TABLE_COMMAND : 1 << (12*8+2),
HCI_LE_CS_WRITE_CACHED_REMOTE_FAE_TABLE_COMMAND : 1 << (12*8+3),
HCI_READ_INQUIRY_SCAN_TYPE_COMMAND : 1 << (12*8+4),
HCI_WRITE_INQUIRY_SCAN_TYPE_COMMAND : 1 << (12*8+5),
HCI_READ_INQUIRY_MODE_COMMAND : 1 << (12*8+6),
@@ -940,6 +942,8 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_SETUP_SYNCHRONOUS_CONNECTION_COMMAND : 1 << (16*8+3),
HCI_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (16*8+4),
HCI_REJECT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (16*8+5),
HCI_LE_CS_CREATE_CONFIG_COMMAND : 1 << (16*8+6),
HCI_LE_CS_REMOVE_CONFIG_COMMAND : 1 << (16*8+7),
HCI_READ_EXTENDED_INQUIRY_RESPONSE_COMMAND : 1 << (17*8+0),
HCI_WRITE_EXTENDED_INQUIRY_RESPONSE_COMMAND : 1 << (17*8+1),
HCI_REFRESH_ENCRYPTION_KEY_COMMAND : 1 << (17*8+2),
@@ -963,13 +967,20 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_SEND_KEYPRESS_NOTIFICATION_COMMAND : 1 << (20*8+2),
HCI_IO_CAPABILITY_REQUEST_NEGATIVE_REPLY_COMMAND : 1 << (20*8+3),
HCI_READ_ENCRYPTION_KEY_SIZE_COMMAND : 1 << (20*8+4),
HCI_LE_CS_READ_LOCAL_SUPPORTED_CAPABILITIES_COMMAND : 1 << (20*8+5),
HCI_LE_CS_READ_REMOTE_SUPPORTED_CAPABILITIES_COMMAND : 1 << (20*8+6),
HCI_LE_CS_WRITE_CACHED_REMOTE_SUPPORTED_CAPABILITIES : 1 << (20*8+7),
HCI_SET_EVENT_MASK_PAGE_2_COMMAND : 1 << (22*8+2),
HCI_READ_FLOW_CONTROL_MODE_COMMAND : 1 << (23*8+0),
HCI_WRITE_FLOW_CONTROL_MODE_COMMAND : 1 << (23*8+1),
HCI_READ_DATA_BLOCK_SIZE_COMMAND : 1 << (23*8+2),
HCI_LE_CS_TEST_COMMAND : 1 << (23*8+3),
HCI_LE_CS_TEST_END_COMMAND : 1 << (23*8+4),
HCI_READ_ENHANCED_TRANSMIT_POWER_LEVEL_COMMAND : 1 << (24*8+0),
HCI_LE_CS_SECURITY_ENABLE_COMMAND : 1 << (24*8+1),
HCI_READ_LE_HOST_SUPPORT_COMMAND : 1 << (24*8+5),
HCI_WRITE_LE_HOST_SUPPORT_COMMAND : 1 << (24*8+6),
HCI_LE_CS_SET_DEFAULT_SETTINGS_COMMAND : 1 << (24*8+7),
HCI_LE_SET_EVENT_MASK_COMMAND : 1 << (25*8+0),
HCI_LE_READ_BUFFER_SIZE_COMMAND : 1 << (25*8+1),
HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND : 1 << (25*8+2),
@@ -1000,6 +1011,10 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_LE_RECEIVER_TEST_COMMAND : 1 << (28*8+4),
HCI_LE_TRANSMITTER_TEST_COMMAND : 1 << (28*8+5),
HCI_LE_TEST_END_COMMAND : 1 << (28*8+6),
HCI_LE_ENABLE_MONITORING_ADVERTISERS_COMMAND : 1 << (28*8+7),
HCI_LE_CS_SET_CHANNEL_CLASSIFICATION_COMMAND : 1 << (29*8+0),
HCI_LE_CS_SET_PROCEDURE_PARAMETERS_COMMAND : 1 << (29*8+1),
HCI_LE_CS_PROCEDURE_ENABLE_COMMAND : 1 << (29*8+2),
HCI_ENHANCED_SETUP_SYNCHRONOUS_CONNECTION_COMMAND : 1 << (29*8+3),
HCI_ENHANCED_ACCEPT_SYNCHRONOUS_CONNECTION_REQUEST_COMMAND : 1 << (29*8+4),
HCI_READ_LOCAL_SUPPORTED_CODECS_COMMAND : 1 << (29*8+5),
@@ -1136,11 +1151,21 @@ HCI_SUPPORTED_COMMANDS_MASKS = {
HCI_LE_SET_DEFAULT_SUBRATE_COMMAND : 1 << (46*8+0),
HCI_LE_SUBRATE_REQUEST_COMMAND : 1 << (46*8+1),
HCI_LE_SET_EXTENDED_ADVERTISING_PARAMETERS_V2_COMMAND : 1 << (46*8+2),
HCI_LE_SET_DECISION_DATA_COMMAND : 1 << (46*8+3),
HCI_LE_SET_DECISION_INSTRUCTIONS_COMMAND : 1 << (46*8+4),
HCI_LE_SET_PERIODIC_ADVERTISING_SUBEVENT_DATA_COMMAND : 1 << (46*8+5),
HCI_LE_SET_PERIODIC_ADVERTISING_RESPONSE_DATA_COMMAND : 1 << (46*8+6),
HCI_LE_SET_PERIODIC_SYNC_SUBEVENT_COMMAND : 1 << (46*8+7),
HCI_LE_EXTENDED_CREATE_CONNECTION_V2_COMMAND : 1 << (47*8+0),
HCI_LE_SET_PERIODIC_ADVERTISING_PARAMETERS_V2_COMMAND : 1 << (47*8+1),
HCI_LE_READ_ALL_LOCAL_SUPPORTED_FEATURES_COMMAND : 1 << (47*8+2),
HCI_LE_READ_ALL_REMOTE_FEATURES_COMMAND : 1 << (47*8+3),
HCI_LE_SET_HOST_FEATURE_V2_COMMAND : 1 << (47*8+4),
HCI_LE_ADD_DEVICE_TO_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+5),
HCI_LE_REMOVE_DEVICE_FROM_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+6),
HCI_LE_CLEAR_MONITORED_ADVERTISERS_LIST_COMMAND : 1 << (47*8+7),
HCI_LE_READ_MONITORED_ADVERTISERS_LIST_SIZE_COMMAND : 1 << (48*8+0),
HCI_LE_FRAME_SPACE_UPDATE_COMMAND : 1 << (48*8+1),
}
# LE Supported Features
@@ -1457,7 +1482,7 @@ class CodingFormat:
vendor_specific_codec_id: int = 0
@classmethod
def parse_from_bytes(cls, data: bytes, offset: int):
def parse_from_bytes(cls, data: bytes, offset: int) -> tuple[int, CodingFormat]:
(codec_id, company_id, vendor_specific_codec_id) = struct.unpack_from(
'<BHH', data, offset
)
@@ -1467,14 +1492,15 @@ class CodingFormat:
vendor_specific_codec_id=vendor_specific_codec_id,
)
def to_bytes(self) -> bytes:
@classmethod
def from_bytes(cls, data: bytes) -> CodingFormat:
return cls.parse_from_bytes(data, 0)[1]
def __bytes__(self) -> bytes:
return struct.pack(
'<BHH', self.codec_id, self.company_id, self.vendor_specific_codec_id
)
def __bytes__(self) -> bytes:
return self.to_bytes()
# -----------------------------------------------------------------------------
class HCI_Constant:
@@ -1691,7 +1717,7 @@ class HCI_Object:
field_length = len(field_bytes)
field_bytes = bytes([field_length]) + field_bytes
elif isinstance(field_value, (bytes, bytearray)) or hasattr(
field_value, 'to_bytes'
field_value, '__bytes__'
):
field_bytes = bytes(field_value)
if isinstance(field_type, int) and 4 < field_type <= 256:
@@ -1736,7 +1762,7 @@ class HCI_Object:
def from_bytes(cls, data, offset, fields):
return cls(fields, **cls.dict_from_bytes(data, offset, fields))
def to_bytes(self):
def __bytes__(self):
return HCI_Object.dict_to_bytes(self.__dict__, self.fields)
@staticmethod
@@ -1831,9 +1857,6 @@ class HCI_Object:
for field_name, field_value in field_strings
)
def __bytes__(self):
return self.to_bytes()
def __init__(self, fields, **kwargs):
self.fields = fields
self.init_from_fields(self, fields, kwargs)
@@ -2008,9 +2031,6 @@ class Address:
def is_static(self):
return self.is_random and (self.address_bytes[5] >> 6 == 3)
def to_bytes(self):
return self.address_bytes
def to_string(self, with_type_qualifier=True):
'''
String representation of the address, MSB first, with an optional type
@@ -2022,7 +2042,7 @@ class Address:
return result + '/P'
def __bytes__(self):
return self.to_bytes()
return self.address_bytes
def __hash__(self):
return hash(self.address_bytes)
@@ -2228,16 +2248,13 @@ class HCI_Command(HCI_Packet):
self.op_code = op_code
self.parameters = parameters
def to_bytes(self):
def __bytes__(self):
parameters = b'' if self.parameters is None else self.parameters
return (
struct.pack('<BHB', HCI_COMMAND_PACKET, self.op_code, len(parameters))
+ parameters
)
def __bytes__(self):
return self.to_bytes()
def __str__(self):
result = color(self.name, 'green')
if fields := getattr(self, 'fields', None):
@@ -4302,6 +4319,61 @@ class HCI_LE_Clear_Advertising_Sets_Command(HCI_Command):
'''
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('advertising_handle', 1),
('periodic_advertising_interval_min', 2),
('periodic_advertising_interval_max', 2),
('periodic_advertising_properties', 2),
]
)
class HCI_LE_Set_Periodic_Advertising_Parameters_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.61 LE Set Periodic Advertising Parameters command
'''
class Properties(enum.IntFlag):
INCLUDE_TX_POWER = 1 << 6
advertising_handle: int
periodic_advertising_interval_min: int
periodic_advertising_interval_max: int
periodic_advertising_properties: int
# -----------------------------------------------------------------------------
@HCI_Command.command(
[
('advertising_handle', 1),
(
'operation',
{
'size': 1,
'mapper': lambda x: HCI_LE_Set_Extended_Advertising_Data_Command.Operation(
x
).name,
},
),
(
'advertising_data',
{
'parser': HCI_Object.parse_length_prefixed_bytes,
'serializer': HCI_Object.serialize_length_prefixed_bytes,
},
),
]
)
class HCI_LE_Set_Periodic_Advertising_Data_Command(HCI_Command):
'''
See Bluetooth spec @ 7.8.62 LE Set Periodic Advertising Data command
'''
advertising_handle: int
operation: int
advertising_data: bytes
# -----------------------------------------------------------------------------
@HCI_Command.command([('enable', 1), ('advertising_handle', 1)])
class HCI_LE_Set_Periodic_Advertising_Enable_Command(HCI_Command):
@@ -5106,13 +5178,10 @@ class HCI_Event(HCI_Packet):
self.event_code = event_code
self.parameters = parameters
def to_bytes(self):
def __bytes__(self):
parameters = b'' if self.parameters is None else self.parameters
return bytes([HCI_EVENT_PACKET, self.event_code, len(parameters)]) + parameters
def __bytes__(self):
return self.to_bytes()
def __str__(self):
result = color(self.name, 'magenta')
if fields := getattr(self, 'fields', None):
@@ -6663,7 +6732,7 @@ class HCI_AclDataPacket(HCI_Packet):
connection_handle, pb_flag, bc_flag, data_total_length, data
)
def to_bytes(self):
def __bytes__(self):
h = (self.pb_flag << 12) | (self.bc_flag << 14) | self.connection_handle
return (
struct.pack('<BHH', HCI_ACL_DATA_PACKET, h, self.data_total_length)
@@ -6677,9 +6746,6 @@ class HCI_AclDataPacket(HCI_Packet):
self.data_total_length = data_total_length
self.data = data
def __bytes__(self):
return self.to_bytes()
def __str__(self):
return (
f'{color("ACL", "blue")}: '
@@ -6713,7 +6779,7 @@ class HCI_SynchronousDataPacket(HCI_Packet):
connection_handle, packet_status, data_total_length, data
)
def to_bytes(self) -> bytes:
def __bytes__(self) -> bytes:
h = (self.packet_status << 12) | self.connection_handle
return (
struct.pack('<BHB', HCI_SYNCHRONOUS_DATA_PACKET, h, self.data_total_length)
@@ -6732,9 +6798,6 @@ class HCI_SynchronousDataPacket(HCI_Packet):
self.data_total_length = data_total_length
self.data = data
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self) -> str:
return (
f'{color("SCO", "blue")}: '
@@ -6807,9 +6870,6 @@ class HCI_IsoDataPacket(HCI_Packet):
)
def __bytes__(self) -> bytes:
return self.to_bytes()
def to_bytes(self) -> bytes:
fmt = '<BHH'
args = [
HCI_ISO_DATA_PACKET,

View File

@@ -199,7 +199,7 @@ class Host(AbortableEventEmitter):
check_address_type: bool = False,
) -> Optional[Connection]:
for connection in self.connections.values():
if connection.peer_address.to_bytes() == bd_addr.to_bytes():
if bytes(connection.peer_address) == bytes(bd_addr):
if (
check_address_type
and connection.peer_address.address_type != bd_addr.address_type

View File

@@ -225,7 +225,7 @@ class L2CAP_PDU:
return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload)
def to_bytes(self) -> bytes:
def __bytes__(self) -> bytes:
header = struct.pack('<HH', len(self.payload), self.cid)
return header + self.payload
@@ -233,9 +233,6 @@ class L2CAP_PDU:
self.cid = cid
self.payload = payload
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self) -> str:
return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}'
@@ -333,11 +330,8 @@ class L2CAP_Control_Frame:
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self) -> bytes:
return self.pdu
def __bytes__(self) -> bytes:
return self.to_bytes()
return self.pdu
def __str__(self) -> str:
result = f'{color(self.name, "yellow")} [ID={self.identifier}]'

View File

@@ -39,7 +39,6 @@ from bumble.device import (
AdvertisingEventProperties,
AdvertisingType,
Device,
Phy,
)
from bumble.gatt import Service
from bumble.hci import (
@@ -47,6 +46,7 @@ from bumble.hci import (
HCI_PAGE_TIMEOUT_ERROR,
HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
Address,
Phy,
)
from google.protobuf import any_pb2 # pytype: disable=pyi-error
from google.protobuf import empty_pb2 # pytype: disable=pyi-error

View File

@@ -95,7 +95,7 @@ class AudioInputStatus(OpenIntEnum):
Cf. 3.4 Audio Input Status
'''
INATIVE = 0x00
INACTIVE = 0x00
ACTIVE = 0x01
@@ -104,7 +104,7 @@ class AudioInputControlPointOpCode(OpenIntEnum):
Cf. 3.5.1 Audio Input Control Point procedure requirements
'''
SET_GAIN_SETTING = 0x00
SET_GAIN_SETTING = 0x01
UNMUTE = 0x02
MUTE = 0x03
SET_MANUAL_GAIN_MODE = 0x04
@@ -239,7 +239,7 @@ class AudioInputControlPoint:
or gain_settings_operand
> self.gain_settings_properties.gain_settings_maximum
):
logger.error("gain_seetings value out of range")
logger.error("gain_settings value out of range")
raise ATT_Error(ErrorCode.VALUE_OUT_OF_RANGE)
if self.audio_input_state.gain_settings != gain_settings_operand:

View File

@@ -265,7 +265,7 @@ class UnicastServerAdvertisingData:
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
struct.pack(
'<2sBIB',
gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE.to_bytes(),
bytes(gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE),
self.announcement_type,
self.available_audio_contexts,
len(self.metadata),
@@ -398,18 +398,21 @@ class CodecSpecificConfiguration:
OCTETS_PER_FRAME = 0x04
CODEC_FRAMES_PER_SDU = 0x05
sampling_frequency: SamplingFrequency
frame_duration: FrameDuration
audio_channel_allocation: AudioLocation
octets_per_codec_frame: int
codec_frames_per_sdu: int
sampling_frequency: SamplingFrequency | None = None
frame_duration: FrameDuration | None = None
audio_channel_allocation: AudioLocation | None = None
octets_per_codec_frame: int | None = None
codec_frames_per_sdu: int | None = None
@classmethod
def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration:
offset = 0
# Allowed default values.
audio_channel_allocation = AudioLocation.NOT_ALLOWED
codec_frames_per_sdu = 1
sampling_frequency: SamplingFrequency | None = None
frame_duration: FrameDuration | None = None
audio_channel_allocation: AudioLocation | None = None
octets_per_codec_frame: int | None = None
codec_frames_per_sdu: int | None = None
while offset < len(data):
length, type = struct.unpack_from('BB', data, offset)
offset += 2
@@ -427,8 +430,6 @@ class CodecSpecificConfiguration:
elif type == CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU:
codec_frames_per_sdu = value
# It is expected here that if some fields are missing, an error should be raised.
# pylint: disable=possibly-used-before-assignment,used-before-assignment
return CodecSpecificConfiguration(
sampling_frequency=sampling_frequency,
frame_duration=frame_duration,
@@ -438,23 +439,43 @@ class CodecSpecificConfiguration:
)
def __bytes__(self) -> bytes:
return struct.pack(
'<BBBBBBBBIBBHBBB',
2,
CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY,
self.sampling_frequency,
2,
CodecSpecificConfiguration.Type.FRAME_DURATION,
self.frame_duration,
5,
CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION,
self.audio_channel_allocation,
3,
CodecSpecificConfiguration.Type.OCTETS_PER_FRAME,
self.octets_per_codec_frame,
2,
CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU,
self.codec_frames_per_sdu,
return b''.join(
[
struct.pack(fmt, length, tag, value)
for fmt, length, tag, value in [
(
'<BBB',
2,
CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY,
self.sampling_frequency,
),
(
'<BBB',
2,
CodecSpecificConfiguration.Type.FRAME_DURATION,
self.frame_duration,
),
(
'<BBI',
5,
CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION,
self.audio_channel_allocation,
),
(
'<BBH',
3,
CodecSpecificConfiguration.Type.OCTETS_PER_FRAME,
self.octets_per_codec_frame,
),
(
'<BBB',
2,
CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU,
self.codec_frames_per_sdu,
),
]
if value is not None
]
)
@@ -466,6 +487,24 @@ class BroadcastAudioAnnouncement:
def from_bytes(cls, data: bytes) -> Self:
return cls(int.from_bytes(data[:3], 'little'))
def __bytes__(self) -> bytes:
return self.broadcast_id.to_bytes(3, 'little')
def get_advertising_data(self) -> bytes:
return bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
bytes(gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE)
+ bytes(self)
),
)
]
)
)
@dataclasses.dataclass
class BasicAudioAnnouncement:
@@ -474,26 +513,37 @@ class BasicAudioAnnouncement:
index: int
codec_specific_configuration: CodecSpecificConfiguration
@dataclasses.dataclass
class CodecInfo:
coding_format: hci.CodecID
company_id: int
vendor_specific_codec_id: int
@classmethod
def from_bytes(cls, data: bytes) -> Self:
coding_format = hci.CodecID(data[0])
company_id = int.from_bytes(data[1:3], 'little')
vendor_specific_codec_id = int.from_bytes(data[3:5], 'little')
return cls(coding_format, company_id, vendor_specific_codec_id)
def __bytes__(self) -> bytes:
codec_specific_configuration_bytes = bytes(
self.codec_specific_configuration
)
return (
bytes([self.index, len(codec_specific_configuration_bytes)])
+ codec_specific_configuration_bytes
)
@dataclasses.dataclass
class Subgroup:
codec_id: BasicAudioAnnouncement.CodecInfo
codec_id: hci.CodingFormat
codec_specific_configuration: CodecSpecificConfiguration
metadata: le_audio.Metadata
bis: List[BasicAudioAnnouncement.BIS]
def __bytes__(self) -> bytes:
metadata_bytes = bytes(self.metadata)
codec_specific_configuration_bytes = bytes(
self.codec_specific_configuration
)
return (
bytes([len(self.bis)])
+ bytes(self.codec_id)
+ bytes([len(codec_specific_configuration_bytes)])
+ codec_specific_configuration_bytes
+ bytes([len(metadata_bytes)])
+ metadata_bytes
+ b''.join(map(bytes, self.bis))
)
presentation_delay: int
subgroups: List[BasicAudioAnnouncement.Subgroup]
@@ -505,7 +555,7 @@ class BasicAudioAnnouncement:
for _ in range(data[3]):
num_bis = data[offset]
offset += 1
codec_id = cls.CodecInfo.from_bytes(data[offset : offset + 5])
codec_id = hci.CodingFormat.from_bytes(data[offset : offset + 5])
offset += 5
codec_specific_configuration_length = data[offset]
offset += 1
@@ -549,3 +599,25 @@ class BasicAudioAnnouncement:
)
return cls(presentation_delay, subgroups)
def __bytes__(self) -> bytes:
return (
self.presentation_delay.to_bytes(3, 'little')
+ bytes([len(self.subgroups)])
+ b''.join(map(bytes, self.subgroups))
)
def get_advertising_data(self) -> bytes:
return bytes(
core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
bytes(gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE)
+ bytes(self)
),
)
]
)
)

View File

@@ -344,9 +344,6 @@ class DataElement:
] # Keep a copy so we can re-serialize to an exact replica
return result
def to_bytes(self):
return bytes(self)
def __bytes__(self):
# Return early if we have a cache
if self.bytes:
@@ -623,11 +620,8 @@ class SDP_PDU:
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
def __bytes__(self):
return self.to_bytes()
return self.pdu
def __str__(self):
result = f'{color(self.name, "blue")} [TID={self.transaction_id}]'

View File

@@ -298,11 +298,8 @@ class SMP_Command:
def init_from_bytes(self, pdu: bytes, offset: int) -> None:
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
def __bytes__(self):
return self.to_bytes()
return self.pdu
def __str__(self):
result = color(self.name, 'yellow')
@@ -1839,7 +1836,7 @@ class Session:
if self.is_initiator:
if self.pairing_method == PairingMethod.OOB:
self.send_pairing_random_command()
else:
elif self.pairing_method == PairingMethod.PASSKEY:
self.send_pairing_confirm_command()
else:
if self.pairing_method == PairingMethod.PASSKEY:
@@ -1949,7 +1946,7 @@ class Manager(EventEmitter):
f'{connection.peer_address}: {command}'
)
cid = SMP_BR_CID if connection.transport == BT_BR_EDR_TRANSPORT else SMP_CID
connection.send_l2cap_pdu(cid, command.to_bytes())
connection.send_l2cap_pdu(cid, bytes(command))
def on_smp_security_request_command(
self, connection: Connection, request: SMP_Security_Request_Command

View File

@@ -370,11 +370,13 @@ class PumpedPacketSource(ParserSource):
self.parser.feed_data(packet)
except asyncio.CancelledError:
logger.debug('source pump task done')
self.terminated.set_result(None)
if not self.terminated.done():
self.terminated.set_result(None)
break
except Exception as error:
logger.warning(f'exception while waiting for packet: {error}')
self.terminated.set_exception(error)
if not self.terminated.done():
self.terminated.set_exception(error)
break
self.pump_task = asyncio.create_task(pump_packets())

View File

@@ -161,7 +161,13 @@ async def main() -> None:
else:
file_output = open(f'{datetime.datetime.now().isoformat()}.lc3', 'wb')
codec_configuration = ase.codec_specific_configuration
assert isinstance(codec_configuration, CodecSpecificConfiguration)
if (
not isinstance(codec_configuration, CodecSpecificConfiguration)
or codec_configuration.sampling_frequency is None
or codec_configuration.audio_channel_allocation is None
or codec_configuration.frame_duration is None
):
return
# Write a LC3 header.
file_output.write(
bytes([0x1C, 0xCC]) # Header.

View File

@@ -80,7 +80,7 @@ impl Address {
/// Creates a new [Address] object.
pub fn new(address: &str, address_type: AddressType) -> PyResult<Self> {
Python::with_gil(|py| {
PyModule::import(py, intern!(py, "bumble.device"))?
PyModule::import(py, intern!(py, "bumble.hci"))?
.getattr(intern!(py, "Address"))?
.call1((address, address_type))
.map(|any| Self(any.into()))

View File

@@ -51,7 +51,7 @@ install_requires =
pyserial-asyncio >= 0.5; platform_system!='Emscripten'
pyserial >= 3.5; platform_system!='Emscripten'
pyusb >= 1.2; platform_system!='Emscripten'
websockets >= 12.0; platform_system!='Emscripten'
websockets == 13.1; platform_system!='Emscripten'
[options.entry_points]
console_scripts =

View File

@@ -39,6 +39,8 @@ from bumble.profiles.ascs import (
)
from bumble.profiles.bap import (
AudioLocation,
BasicAudioAnnouncement,
BroadcastAudioAnnouncement,
SupportedFrameDuration,
SupportedSamplingFrequency,
SamplingFrequency,
@@ -200,6 +202,56 @@ def test_codec_specific_configuration() -> None:
assert CodecSpecificConfiguration.from_bytes(bytes(config)) == config
# -----------------------------------------------------------------------------
def test_broadcast_audio_announcement() -> None:
broadcast_audio_announcement = BroadcastAudioAnnouncement(123456)
assert (
BroadcastAudioAnnouncement.from_bytes(bytes(broadcast_audio_announcement))
== broadcast_audio_announcement
)
# -----------------------------------------------------------------------------
def test_basic_audio_announcement() -> None:
basic_audio_announcement = BasicAudioAnnouncement(
presentation_delay=40000,
subgroups=[
BasicAudioAnnouncement.Subgroup(
codec_id=CodingFormat(codec_id=CodecID.LC3),
codec_specific_configuration=CodecSpecificConfiguration(
sampling_frequency=SamplingFrequency.FREQ_48000,
frame_duration=FrameDuration.DURATION_10000_US,
octets_per_codec_frame=100,
),
metadata=Metadata(
[
Metadata.Entry(tag=Metadata.Tag.LANGUAGE, data=b'eng'),
Metadata.Entry(tag=Metadata.Tag.PROGRAM_INFO, data=b'Disco'),
]
),
bis=[
BasicAudioAnnouncement.BIS(
index=0,
codec_specific_configuration=CodecSpecificConfiguration(
audio_channel_allocation=AudioLocation.FRONT_LEFT
),
),
BasicAudioAnnouncement.BIS(
index=1,
codec_specific_configuration=CodecSpecificConfiguration(
audio_channel_allocation=AudioLocation.FRONT_RIGHT
),
),
],
)
],
)
assert (
BasicAudioAnnouncement.from_bytes(bytes(basic_audio_announcement))
== basic_audio_announcement
)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_pacs():

View File

@@ -19,9 +19,7 @@ import asyncio
import functools
import logging
import os
from types import LambdaType
import pytest
from unittest import mock
from bumble.core import (
BT_BR_EDR_TRANSPORT,
@@ -29,7 +27,13 @@ from bumble.core import (
BT_PERIPHERAL_ROLE,
ConnectionParameters,
)
from bumble.device import AdvertisingParameters, Connection, Device
from bumble.device import (
AdvertisingEventProperties,
AdvertisingParameters,
Connection,
Device,
PeriodicAdvertisingParameters,
)
from bumble.host import AclPacketQueue, Host
from bumble.hci import (
HCI_ACCEPT_CONNECTION_REQUEST_COMMAND,
@@ -265,7 +269,8 @@ async def test_flush():
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_legacy_advertising():
device = Device(host=mock.AsyncMock(Host))
device = TwoDevices()[0]
await device.power_on()
# Start advertising
await device.start_advertising()
@@ -283,7 +288,10 @@ async def test_legacy_advertising():
)
@pytest.mark.asyncio
async def test_legacy_advertising_disconnection(auto_restart):
device = Device(host=mock.AsyncMock(spec=Host))
devices = TwoDevices()
device = devices[0]
devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff')
await device.power_on()
peer_address = Address('F0:F1:F2:F3:F4:F5')
await device.start_advertising(auto_restart=auto_restart)
device.on_connection(
@@ -305,6 +313,11 @@ async def test_legacy_advertising_disconnection(auto_restart):
await async_barrier()
if auto_restart:
assert device.legacy_advertising_set
started = asyncio.Event()
if not device.is_advertising:
device.legacy_advertising_set.once('start', started.set)
await asyncio.wait_for(started.wait(), _TIMEOUT)
assert device.is_advertising
else:
assert not device.is_advertising
@@ -313,7 +326,8 @@ async def test_legacy_advertising_disconnection(auto_restart):
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_extended_advertising():
device = Device(host=mock.AsyncMock(Host))
device = TwoDevices()[0]
await device.power_on()
# Start advertising
advertising_set = await device.create_advertising_set()
@@ -332,7 +346,8 @@ async def test_extended_advertising():
)
@pytest.mark.asyncio
async def test_extended_advertising_connection(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host))
device = TwoDevices()[0]
await device.power_on()
peer_address = Address('F0:F1:F2:F3:F4:F5')
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
@@ -368,8 +383,10 @@ async def test_extended_advertising_connection(own_address_type):
)
@pytest.mark.asyncio
async def test_extended_advertising_connection_out_of_order(own_address_type):
device = Device(host=mock.AsyncMock(spec=Host))
peer_address = Address('F0:F1:F2:F3:F4:F5')
devices = TwoDevices()
device = devices[0]
devices.controllers[0].le_features = bytes.fromhex('ffffffffffffffff')
await device.power_on()
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(own_address_type=own_address_type)
)
@@ -382,7 +399,7 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
device.on_connection(
0x0001,
BT_LE_TRANSPORT,
peer_address,
Address('F0:F1:F2:F3:F4:F5'),
None,
None,
BT_PERIPHERAL_ROLE,
@@ -397,6 +414,34 @@ async def test_extended_advertising_connection_out_of_order(own_address_type):
await async_barrier()
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_periodic_advertising():
device = TwoDevices()[0]
await device.power_on()
# Start advertising
advertising_set = await device.create_advertising_set(
advertising_parameters=AdvertisingParameters(
advertising_event_properties=AdvertisingEventProperties(
is_connectable=False
)
),
advertising_data=b'123',
periodic_advertising_parameters=PeriodicAdvertisingParameters(),
periodic_advertising_data=b'abc',
)
assert device.extended_advertising_sets
assert advertising_set.enabled
assert not advertising_set.periodic_enabled
await advertising_set.start_periodic()
assert advertising_set.periodic_enabled
await advertising_set.stop_periodic()
assert not advertising_set.periodic_enabled
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_remote_le_features():

View File

@@ -57,7 +57,7 @@ from .test_utils import async_barrier
# -----------------------------------------------------------------------------
def basic_check(x):
pdu = x.to_bytes()
pdu = bytes(x)
parsed = ATT_PDU.from_bytes(pdu)
x_str = str(x)
parsed_str = str(parsed)
@@ -74,7 +74,7 @@ def test_UUID():
assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
v = UUID(str(u))
assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
w = UUID.from_bytes(v.to_bytes())
w = UUID.from_bytes(bytes(v))
assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
u1 = UUID.from_16_bits(0x1234)

View File

@@ -75,13 +75,13 @@ from bumble.hci import (
def basic_check(x):
packet = x.to_bytes()
packet = bytes(x)
print(packet.hex())
parsed = HCI_Packet.from_bytes(packet)
x_str = str(x)
parsed_str = str(parsed)
print(x_str)
parsed_bytes = parsed.to_bytes()
parsed_bytes = bytes(parsed)
assert x_str == parsed_str
assert packet == parsed_bytes
@@ -188,7 +188,7 @@ def test_HCI_Command_Complete_Event():
return_parameters=bytes([7]),
)
basic_check(event)
event = HCI_Packet.from_bytes(event.to_bytes())
event = HCI_Packet.from_bytes(bytes(event))
assert event.return_parameters == 7
# With a simple status as an integer status
@@ -562,7 +562,7 @@ def test_iso_data_packet():
'6281bc77ed6a3206d984bcdabee6be831c699cb50e2'
)
assert packet.to_bytes() == data
assert bytes(packet) == data
# -----------------------------------------------------------------------------

View File

@@ -240,7 +240,7 @@ async def test_self_gatt():
result = await peer.discover_included_services(result[0])
assert len(result) == 2
# Service UUID is only present when the UUID is 16-bit Bluetooth UUID
assert result[1].uuid.to_bytes() == s3.uuid.to_bytes()
assert bytes(result[1].uuid) == bytes(s3.uuid)
# -----------------------------------------------------------------------------