Compare commits

..

1 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod b9f91f695a fix oob support in pair.py 2024-11-27 12:58:03 -08:00
21 changed files with 151 additions and 100 deletions
+1 -1
View File
@@ -83,7 +83,7 @@ async def async_main():
return_parameters=bytes([hci.HCI_SUCCESS]),
)
# Return a packet with 'respond to sender' set to True
return (bytes(response), True)
return (response.to_bytes(), True)
return None
+13 -8
View File
@@ -373,7 +373,9 @@ async def pair(
shared_data = (
None
if oob == '-'
else OobData.from_ad(AdvertisingData.from_bytes(bytes.fromhex(oob)))
else OobData.from_ad(
AdvertisingData.from_bytes(bytes.fromhex(oob))
).shared_data
)
legacy_context = OobLegacyContext()
oob_contexts = PairingConfig.OobConfig(
@@ -381,16 +383,19 @@ async def pair(
peer_data=shared_data,
legacy_context=legacy_context,
)
oob_data = OobData(
address=device.random_address,
shared_data=shared_data,
legacy_context=legacy_context,
)
print(color('@@@-----------------------------------', 'yellow'))
print(color('@@@ OOB Data:', 'yellow'))
print(color(f'@@@ {our_oob_context.share()}', 'yellow'))
if shared_data is None:
oob_data = OobData(
address=device.random_address, shared_data=our_oob_context.share()
)
print(
color(
f'@@@ SHARE: {bytes(oob_data.to_ad()).hex()}',
'yellow',
)
)
print(color(f'@@@ TK={legacy_context.tk.hex()}', 'yellow'))
print(color(f'@@@ HEX: ({bytes(oob_data.to_ad()).hex()})', 'yellow'))
print(color('@@@-----------------------------------', 'yellow'))
else:
oob_contexts = None
+5 -1
View File
@@ -57,6 +57,7 @@ if TYPE_CHECKING:
# pylint: disable=line-too-long
ATT_CID = 0x04
ATT_PSM = 0x001F
ATT_ERROR_RESPONSE = 0x01
ATT_EXCHANGE_MTU_REQUEST = 0x02
@@ -291,6 +292,9 @@ class ATT_PDU:
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def to_bytes(self):
return self.pdu
@property
def is_command(self):
return ((self.op_code >> 6) & 1) == 1
@@ -300,7 +304,7 @@ class ATT_PDU:
return ((self.op_code >> 7) & 1) == 1
def __bytes__(self):
return self.pdu
return self.to_bytes()
def __str__(self):
result = color(self.name, 'yellow')
+2 -2
View File
@@ -314,7 +314,7 @@ class Controller:
f'{color("CONTROLLER -> HOST", "green")}: {packet}'
)
if self.host:
self.host.on_packet(bytes(packet))
self.host.on_packet(packet.to_bytes())
# This method allows the controller to emulate the same API as a transport source
async def wait_for_termination(self):
@@ -1192,7 +1192,7 @@ class Controller:
See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command
'''
bd_addr = (
bytes(self._public_address)
self._public_address.to_bytes()
if self._public_address is not None
else bytes(6)
)
+3
View File
@@ -1624,6 +1624,9 @@ class AdvertisingData:
[bytes([len(x[1]) + 1, x[0]]) + x[1] for x in self.ad_structures]
)
def to_bytes(self) -> bytes:
return bytes(self)
def to_string(self, separator=', '):
return separator.join(
[AdvertisingData.ad_data_to_string(x[0], x[1]) for x in self.ad_structures]
+1 -1
View File
@@ -1986,7 +1986,7 @@ class Device(CompositeEventEmitter):
check_address_type: bool = False,
) -> Optional[Connection]:
for connection in self.connections.values():
if bytes(connection.peer_address) == bytes(bd_addr):
if connection.peer_address.to_bytes() == bd_addr.to_bytes():
if (
check_address_type
and connection.peer_address.address_type != bd_addr.address_type
+1 -1
View File
@@ -410,7 +410,7 @@ class IncludedServiceDeclaration(Attribute):
def __init__(self, service: Service) -> None:
declaration_bytes = struct.pack(
'<HH2s', service.handle, service.end_group_handle, bytes(service.uuid)
'<HH2s', service.handle, service.end_group_handle, service.uuid.to_bytes()
)
super().__init__(
GATT_INCLUDE_ATTRIBUTE_TYPE, Attribute.READABLE, declaration_bytes
+3 -3
View File
@@ -292,7 +292,7 @@ class Client:
logger.debug(
f'GATT Command from client: [0x{self.connection.handle:04X}] {command}'
)
self.send_gatt_pdu(bytes(command))
self.send_gatt_pdu(command.to_bytes())
async def send_request(self, request: ATT_PDU):
logger.debug(
@@ -310,7 +310,7 @@ class Client:
self.pending_request = request
try:
self.send_gatt_pdu(bytes(request))
self.send_gatt_pdu(request.to_bytes())
response = await asyncio.wait_for(
self.pending_response, GATT_REQUEST_TIMEOUT
)
@@ -328,7 +328,7 @@ class Client:
f'GATT Confirmation from client: [0x{self.connection.handle:04X}] '
f'{confirmation}'
)
self.send_gatt_pdu(bytes(confirmation))
self.send_gatt_pdu(confirmation.to_bytes())
async def request_mtu(self, mtu: int) -> int:
# Check the range
+2 -2
View File
@@ -353,7 +353,7 @@ class Server(EventEmitter):
logger.debug(
f'GATT Response from server: [0x{connection.handle:04X}] {response}'
)
self.send_gatt_pdu(connection.handle, bytes(response))
self.send_gatt_pdu(connection.handle, response.to_bytes())
async def notify_subscriber(
self,
@@ -450,7 +450,7 @@ class Server(EventEmitter):
)
try:
self.send_gatt_pdu(connection.handle, bytes(indication))
self.send_gatt_pdu(connection.handle, indication.to_bytes())
await asyncio.wait_for(pending_confirmation, GATT_REQUEST_TIMEOUT)
except asyncio.TimeoutError as error:
logger.warning(color('!!! GATT Indicate timeout', 'red'))
+32 -8
View File
@@ -1496,11 +1496,14 @@ class CodingFormat:
def from_bytes(cls, data: bytes) -> CodingFormat:
return cls.parse_from_bytes(data, 0)[1]
def __bytes__(self) -> bytes:
def to_bytes(self) -> bytes:
return struct.pack(
'<BHH', self.codec_id, self.company_id, self.vendor_specific_codec_id
)
def __bytes__(self) -> bytes:
return self.to_bytes()
# -----------------------------------------------------------------------------
class HCI_Constant:
@@ -1717,7 +1720,7 @@ class HCI_Object:
field_length = len(field_bytes)
field_bytes = bytes([field_length]) + field_bytes
elif isinstance(field_value, (bytes, bytearray)) or hasattr(
field_value, '__bytes__'
field_value, 'to_bytes'
):
field_bytes = bytes(field_value)
if isinstance(field_type, int) and 4 < field_type <= 256:
@@ -1762,7 +1765,7 @@ class HCI_Object:
def from_bytes(cls, data, offset, fields):
return cls(fields, **cls.dict_from_bytes(data, offset, fields))
def __bytes__(self):
def to_bytes(self):
return HCI_Object.dict_to_bytes(self.__dict__, self.fields)
@staticmethod
@@ -1857,6 +1860,9 @@ class HCI_Object:
for field_name, field_value in field_strings
)
def __bytes__(self):
return self.to_bytes()
def __init__(self, fields, **kwargs):
self.fields = fields
self.init_from_fields(self, fields, kwargs)
@@ -2031,6 +2037,9 @@ class Address:
def is_static(self):
return self.is_random and (self.address_bytes[5] >> 6 == 3)
def to_bytes(self):
return self.address_bytes
def to_string(self, with_type_qualifier=True):
'''
String representation of the address, MSB first, with an optional type
@@ -2042,7 +2051,7 @@ class Address:
return result + '/P'
def __bytes__(self):
return self.address_bytes
return self.to_bytes()
def __hash__(self):
return hash(self.address_bytes)
@@ -2248,13 +2257,16 @@ class HCI_Command(HCI_Packet):
self.op_code = op_code
self.parameters = parameters
def __bytes__(self):
def to_bytes(self):
parameters = b'' if self.parameters is None else self.parameters
return (
struct.pack('<BHB', HCI_COMMAND_PACKET, self.op_code, len(parameters))
+ parameters
)
def __bytes__(self):
return self.to_bytes()
def __str__(self):
result = color(self.name, 'green')
if fields := getattr(self, 'fields', None):
@@ -5178,10 +5190,13 @@ class HCI_Event(HCI_Packet):
self.event_code = event_code
self.parameters = parameters
def __bytes__(self):
def to_bytes(self):
parameters = b'' if self.parameters is None else self.parameters
return bytes([HCI_EVENT_PACKET, self.event_code, len(parameters)]) + parameters
def __bytes__(self):
return self.to_bytes()
def __str__(self):
result = color(self.name, 'magenta')
if fields := getattr(self, 'fields', None):
@@ -6732,7 +6747,7 @@ class HCI_AclDataPacket(HCI_Packet):
connection_handle, pb_flag, bc_flag, data_total_length, data
)
def __bytes__(self):
def to_bytes(self):
h = (self.pb_flag << 12) | (self.bc_flag << 14) | self.connection_handle
return (
struct.pack('<BHH', HCI_ACL_DATA_PACKET, h, self.data_total_length)
@@ -6746,6 +6761,9 @@ class HCI_AclDataPacket(HCI_Packet):
self.data_total_length = data_total_length
self.data = data
def __bytes__(self):
return self.to_bytes()
def __str__(self):
return (
f'{color("ACL", "blue")}: '
@@ -6779,7 +6797,7 @@ class HCI_SynchronousDataPacket(HCI_Packet):
connection_handle, packet_status, data_total_length, data
)
def __bytes__(self) -> bytes:
def to_bytes(self) -> bytes:
h = (self.packet_status << 12) | self.connection_handle
return (
struct.pack('<BHB', HCI_SYNCHRONOUS_DATA_PACKET, h, self.data_total_length)
@@ -6798,6 +6816,9 @@ class HCI_SynchronousDataPacket(HCI_Packet):
self.data_total_length = data_total_length
self.data = data
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self) -> str:
return (
f'{color("SCO", "blue")}: '
@@ -6870,6 +6891,9 @@ class HCI_IsoDataPacket(HCI_Packet):
)
def __bytes__(self) -> bytes:
return self.to_bytes()
def to_bytes(self) -> bytes:
fmt = '<BHH'
args = [
HCI_ISO_DATA_PACKET,
+17 -20
View File
@@ -141,7 +141,7 @@ class HfFeature(enum.IntFlag):
"""
HF supported features (AT+BRSF=) (normative).
Hands-Free Profile v1.9, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
"""
EC_NR = 0x001 # Echo Cancel & Noise reduction
@@ -155,14 +155,14 @@ class HfFeature(enum.IntFlag):
HF_INDICATORS = 0x100
ESCO_S4_SETTINGS_SUPPORTED = 0x200
ENHANCED_VOICE_RECOGNITION_STATUS = 0x400
VOICE_RECOGNITION_TEXT = 0x800
VOICE_RECOGNITION_TEST = 0x800
class AgFeature(enum.IntFlag):
"""
AG supported features (+BRSF:) (normative).
Hands-Free Profile v1.9, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 and 3GPP 27.007.
"""
THREE_WAY_CALLING = 0x001
@@ -178,7 +178,7 @@ class AgFeature(enum.IntFlag):
HF_INDICATORS = 0x400
ESCO_S4_SETTINGS_SUPPORTED = 0x800
ENHANCED_VOICE_RECOGNITION_STATUS = 0x1000
VOICE_RECOGNITION_TEXT = 0x2000
VOICE_RECOGNITION_TEST = 0x2000
class AudioCodec(enum.IntEnum):
@@ -1390,7 +1390,6 @@ class AgProtocol(pyee.EventEmitter):
def _on_bac(self, *args) -> None:
self.supported_audio_codecs = [AudioCodec(int(value)) for value in args]
self.emit('supported_audio_codecs', self.supported_audio_codecs)
self.send_ok()
def _on_bcs(self, codec: bytes) -> None:
@@ -1619,7 +1618,7 @@ class ProfileVersion(enum.IntEnum):
"""
Profile version (normative).
Hands-Free Profile v1.8, 6.3 SDP Interoperability Requirements.
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements.
"""
V1_5 = 0x0105
@@ -1633,7 +1632,7 @@ class HfSdpFeature(enum.IntFlag):
"""
HF supported features (normative).
Hands-Free Profile v1.9, 6.3 SDP Interoperability Requirements.
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements.
"""
EC_NR = 0x01 # Echo Cancel & Noise reduction
@@ -1641,17 +1640,16 @@ class HfSdpFeature(enum.IntFlag):
CLI_PRESENTATION_CAPABILITY = 0x04
VOICE_RECOGNITION_ACTIVATION = 0x08
REMOTE_VOLUME_CONTROL = 0x10
WIDE_BAND_SPEECH = 0x20
WIDE_BAND = 0x20 # Wide band speech
ENHANCED_VOICE_RECOGNITION_STATUS = 0x40
VOICE_RECOGNITION_TEXT = 0x80
SUPER_WIDE_BAND = 0x100
VOICE_RECOGNITION_TEST = 0x80
class AgSdpFeature(enum.IntFlag):
"""
AG supported features (normative).
Hands-Free Profile v1.9, 6.3 SDP Interoperability Requirements.
Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements.
"""
THREE_WAY_CALLING = 0x01
@@ -1659,10 +1657,9 @@ class AgSdpFeature(enum.IntFlag):
VOICE_RECOGNITION_FUNCTION = 0x04
IN_BAND_RING_TONE_CAPABILITY = 0x08
VOICE_TAG = 0x10 # Attach a number to voice tag
WIDE_BAND_SPEECH = 0x20
WIDE_BAND = 0x20 # Wide band speech
ENHANCED_VOICE_RECOGNITION_STATUS = 0x40
VOICE_RECOGNITION_TEXT = 0x80
SUPER_WIDE_BAND_SPEED_SPEECH = 0x100
VOICE_RECOGNITION_TEST = 0x80
def make_hf_sdp_records(
@@ -1695,11 +1692,11 @@ def make_hf_sdp_records(
in configuration.supported_hf_features
):
hf_supported_features |= HfSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS
if HfFeature.VOICE_RECOGNITION_TEXT in configuration.supported_hf_features:
hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_TEXT
if HfFeature.VOICE_RECOGNITION_TEST in configuration.supported_hf_features:
hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_TEST
if AudioCodec.MSBC in configuration.supported_audio_codecs:
hf_supported_features |= HfSdpFeature.WIDE_BAND_SPEECH
hf_supported_features |= HfSdpFeature.WIDE_BAND
return [
sdp.ServiceAttribute(
@@ -1775,14 +1772,14 @@ def make_ag_sdp_records(
in configuration.supported_ag_features
):
ag_supported_features |= AgSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS
if AgFeature.VOICE_RECOGNITION_TEXT in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEXT
if AgFeature.VOICE_RECOGNITION_TEST in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_TEST
if AgFeature.IN_BAND_RING_TONE_CAPABILITY in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
if AgFeature.VOICE_RECOGNITION_FUNCTION in configuration.supported_ag_features:
ag_supported_features |= AgSdpFeature.VOICE_RECOGNITION_FUNCTION
if AudioCodec.MSBC in configuration.supported_audio_codecs:
ag_supported_features |= AgSdpFeature.WIDE_BAND_SPEECH
ag_supported_features |= AgSdpFeature.WIDE_BAND
return [
sdp.ServiceAttribute(
+1 -1
View File
@@ -199,7 +199,7 @@ class Host(AbortableEventEmitter):
check_address_type: bool = False,
) -> Optional[Connection]:
for connection in self.connections.values():
if bytes(connection.peer_address) == bytes(bd_addr):
if connection.peer_address.to_bytes() == bd_addr.to_bytes():
if (
check_address_type
and connection.peer_address.address_type != bd_addr.address_type
+8 -2
View File
@@ -225,7 +225,7 @@ class L2CAP_PDU:
return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload)
def __bytes__(self) -> bytes:
def to_bytes(self) -> bytes:
header = struct.pack('<HH', len(self.payload), self.cid)
return header + self.payload
@@ -233,6 +233,9 @@ class L2CAP_PDU:
self.cid = cid
self.payload = payload
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self) -> str:
return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}'
@@ -330,9 +333,12 @@ class L2CAP_Control_Frame:
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def __bytes__(self) -> bytes:
def to_bytes(self) -> bytes:
return self.pdu
def __bytes__(self) -> bytes:
return self.to_bytes()
def __str__(self) -> str:
result = f'{color(self.name, "yellow")} [ID={self.identifier}]'
if fields := getattr(self, 'fields', None):
+38 -30
View File
@@ -265,7 +265,7 @@ class UnicastServerAdvertisingData:
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
struct.pack(
'<2sBIB',
bytes(gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE),
gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE.to_bytes(),
self.announcement_type,
self.available_audio_contexts,
len(self.metadata),
@@ -487,23 +487,24 @@ class BroadcastAudioAnnouncement:
def from_bytes(cls, data: bytes) -> Self:
return cls(int.from_bytes(data[:3], 'little'))
def __bytes__(self) -> bytes:
def to_bytes(self) -> bytes:
return self.broadcast_id.to_bytes(3, 'little')
def __bytes__(self) -> bytes:
return self.to_bytes()
def get_advertising_data(self) -> bytes:
return bytes(
core.AdvertisingData(
[
return core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
bytes(gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE)
+ bytes(self)
),
)
]
)
)
gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE.to_bytes()
+ self.to_bytes()
),
)
]
).to_bytes()
@dataclasses.dataclass
@@ -513,7 +514,7 @@ class BasicAudioAnnouncement:
index: int
codec_specific_configuration: CodecSpecificConfiguration
def __bytes__(self) -> bytes:
def to_bytes(self) -> bytes:
codec_specific_configuration_bytes = bytes(
self.codec_specific_configuration
)
@@ -522,6 +523,9 @@ class BasicAudioAnnouncement:
+ codec_specific_configuration_bytes
)
def __bytes__(self) -> bytes:
return self.to_bytes()
@dataclasses.dataclass
class Subgroup:
codec_id: hci.CodingFormat
@@ -529,14 +533,14 @@ class BasicAudioAnnouncement:
metadata: le_audio.Metadata
bis: List[BasicAudioAnnouncement.BIS]
def __bytes__(self) -> bytes:
def to_bytes(self) -> bytes:
metadata_bytes = bytes(self.metadata)
codec_specific_configuration_bytes = bytes(
self.codec_specific_configuration
)
return (
bytes([len(self.bis)])
+ bytes(self.codec_id)
+ self.codec_id.to_bytes()
+ bytes([len(codec_specific_configuration_bytes)])
+ codec_specific_configuration_bytes
+ bytes([len(metadata_bytes)])
@@ -544,6 +548,9 @@ class BasicAudioAnnouncement:
+ b''.join(map(bytes, self.bis))
)
def __bytes__(self) -> bytes:
return self.to_bytes()
presentation_delay: int
subgroups: List[BasicAudioAnnouncement.Subgroup]
@@ -600,24 +607,25 @@ class BasicAudioAnnouncement:
return cls(presentation_delay, subgroups)
def __bytes__(self) -> bytes:
def to_bytes(self) -> bytes:
return (
self.presentation_delay.to_bytes(3, 'little')
+ bytes([len(self.subgroups)])
+ b''.join(map(bytes, self.subgroups))
)
def __bytes__(self) -> bytes:
return self.to_bytes()
def get_advertising_data(self) -> bytes:
return bytes(
core.AdvertisingData(
[
return core.AdvertisingData(
[
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
(
bytes(gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE)
+ bytes(self)
),
)
]
)
)
gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE.to_bytes()
+ self.to_bytes()
),
)
]
).to_bytes()
+7 -1
View File
@@ -344,6 +344,9 @@ class DataElement:
] # Keep a copy so we can re-serialize to an exact replica
return result
def to_bytes(self):
return bytes(self)
def __bytes__(self):
# Return early if we have a cache
if self.bytes:
@@ -620,9 +623,12 @@ class SDP_PDU:
def init_from_bytes(self, pdu, offset):
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def __bytes__(self):
def to_bytes(self):
return self.pdu
def __bytes__(self):
return self.to_bytes()
def __str__(self):
result = f'{color(self.name, "blue")} [TID={self.transaction_id}]'
if fields := getattr(self, 'fields', None):
+5 -2
View File
@@ -298,9 +298,12 @@ class SMP_Command:
def init_from_bytes(self, pdu: bytes, offset: int) -> None:
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
def __bytes__(self):
def to_bytes(self):
return self.pdu
def __bytes__(self):
return self.to_bytes()
def __str__(self):
result = color(self.name, 'yellow')
if fields := getattr(self, 'fields', None):
@@ -1946,7 +1949,7 @@ class Manager(EventEmitter):
f'{connection.peer_address}: {command}'
)
cid = SMP_BR_CID if connection.transport == BT_BR_EDR_TRANSPORT else SMP_CID
connection.send_l2cap_pdu(cid, bytes(command))
connection.send_l2cap_pdu(cid, command.to_bytes())
def on_smp_security_request_command(
self, connection: Connection, request: SMP_Security_Request_Command
+3 -8
View File
@@ -21,10 +21,10 @@ import sys
import os
import io
import logging
from typing import Iterable, Optional
import websockets
from typing import Optional
import bumble.core
from bumble.device import Device, ScoLink
from bumble.transport import open_transport_or_link
@@ -82,10 +82,6 @@ def on_microphone_volume(level: int):
send_message(type='microphone_volume', level=level)
def on_supported_audio_codecs(codecs: Iterable[hfp.AudioCodec]):
send_message(type='supported_audio_codecs', codecs=[codec.name for codec in codecs])
def on_sco_state_change(codec: int):
if codec == hfp.AudioCodec.CVSD:
sample_rate = 8000
@@ -211,7 +207,6 @@ async def main() -> None:
ag_protocol = hfp.AgProtocol(dlc, configuration)
ag_protocol.on('speaker_volume', on_speaker_volume)
ag_protocol.on('microphone_volume', on_microphone_volume)
ag_protocol.on('supported_audio_codecs', on_supported_audio_codecs)
on_hfp_state_change(True)
dlc.multiplexer.l2cap_channel.on(
'close', lambda: on_hfp_state_change(False)
@@ -246,7 +241,7 @@ async def main() -> None:
# Pick the first one
channel, version, hf_sdp_features = hfp_record
print(f'HF version: {version}')
print(f'HF features: {hf_sdp_features.name}')
print(f'HF features: {hf_sdp_features}')
# Request authentication
print('*** Authenticating...')
+2 -2
View File
@@ -57,7 +57,7 @@ from .test_utils import async_barrier
# -----------------------------------------------------------------------------
def basic_check(x):
pdu = bytes(x)
pdu = x.to_bytes()
parsed = ATT_PDU.from_bytes(pdu)
x_str = str(x)
parsed_str = str(parsed)
@@ -74,7 +74,7 @@ def test_UUID():
assert str(u) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
v = UUID(str(u))
assert str(v) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
w = UUID.from_bytes(bytes(v))
w = UUID.from_bytes(v.to_bytes())
assert str(w) == '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'
u1 = UUID.from_16_bits(0x1234)
+4 -4
View File
@@ -75,13 +75,13 @@ from bumble.hci import (
def basic_check(x):
packet = bytes(x)
packet = x.to_bytes()
print(packet.hex())
parsed = HCI_Packet.from_bytes(packet)
x_str = str(x)
parsed_str = str(parsed)
print(x_str)
parsed_bytes = bytes(parsed)
parsed_bytes = parsed.to_bytes()
assert x_str == parsed_str
assert packet == parsed_bytes
@@ -188,7 +188,7 @@ def test_HCI_Command_Complete_Event():
return_parameters=bytes([7]),
)
basic_check(event)
event = HCI_Packet.from_bytes(bytes(event))
event = HCI_Packet.from_bytes(event.to_bytes())
assert event.return_parameters == 7
# With a simple status as an integer status
@@ -562,7 +562,7 @@ def test_iso_data_packet():
'6281bc77ed6a3206d984bcdabee6be831c699cb50e2'
)
assert bytes(packet) == data
assert packet.to_bytes() == data
# -----------------------------------------------------------------------------
+2 -2
View File
@@ -61,7 +61,7 @@ def _default_hf_configuration() -> hfp.HfConfiguration:
# -----------------------------------------------------------------------------
def _default_hf_sdp_features() -> hfp.HfSdpFeature:
return (
hfp.HfSdpFeature.WIDE_BAND_SPEECH
hfp.HfSdpFeature.WIDE_BAND
| hfp.HfSdpFeature.THREE_WAY_CALLING
| hfp.HfSdpFeature.CLI_PRESENTATION_CAPABILITY
)
@@ -108,7 +108,7 @@ def _default_ag_configuration() -> hfp.AgConfiguration:
# -----------------------------------------------------------------------------
def _default_ag_sdp_features() -> hfp.AgSdpFeature:
return (
hfp.AgSdpFeature.WIDE_BAND_SPEECH
hfp.AgSdpFeature.WIDE_BAND
| hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY
| hfp.AgSdpFeature.THREE_WAY_CALLING
)
+1 -1
View File
@@ -240,7 +240,7 @@ async def test_self_gatt():
result = await peer.discover_included_services(result[0])
assert len(result) == 2
# Service UUID is only present when the UUID is 16-bit Bluetooth UUID
assert bytes(result[1].uuid) == bytes(s3.uuid)
assert result[1].uuid.to_bytes() == s3.uuid.to_bytes()
# -----------------------------------------------------------------------------