mirror of
https://github.com/google/bumble.git
synced 2026-05-09 04:08:02 +00:00
@@ -31,6 +31,8 @@ from bumble.core import (
|
||||
BT_BR_EDR_TRANSPORT,
|
||||
AdvertisingData,
|
||||
DeviceClass,
|
||||
InvalidArgumentError,
|
||||
InvalidPacketError,
|
||||
ProtocolError,
|
||||
bit_flags_to_strings,
|
||||
name_or_number,
|
||||
@@ -92,14 +94,14 @@ def map_class_of_device(class_of_device):
|
||||
)
|
||||
|
||||
|
||||
def phy_list_to_bits(phys):
|
||||
def phy_list_to_bits(phys: Optional[Iterable[int]]) -> int:
|
||||
if phys is None:
|
||||
return 0
|
||||
|
||||
phy_bits = 0
|
||||
for phy in phys:
|
||||
if phy not in HCI_LE_PHY_TYPE_TO_BIT:
|
||||
raise ValueError('invalid PHY')
|
||||
raise InvalidArgumentError('invalid PHY')
|
||||
phy_bits |= 1 << HCI_LE_PHY_TYPE_TO_BIT[phy]
|
||||
return phy_bits
|
||||
|
||||
@@ -1553,7 +1555,7 @@ class HCI_Object:
|
||||
new_offset, field_value = field_type(data, offset)
|
||||
return (field_value, new_offset - offset)
|
||||
|
||||
raise ValueError(f'unknown field type {field_type}')
|
||||
raise InvalidArgumentError(f'unknown field type {field_type}')
|
||||
|
||||
@staticmethod
|
||||
def dict_from_bytes(data, offset, fields):
|
||||
@@ -1622,7 +1624,7 @@ class HCI_Object:
|
||||
if 0 <= field_value <= 255:
|
||||
field_bytes = bytes([field_value])
|
||||
else:
|
||||
raise ValueError('value too large for *-typed field')
|
||||
raise InvalidArgumentError('value too large for *-typed field')
|
||||
else:
|
||||
field_bytes = bytes(field_value)
|
||||
elif field_type == 'v':
|
||||
@@ -1641,7 +1643,9 @@ class HCI_Object:
|
||||
elif len(field_bytes) > field_type:
|
||||
field_bytes = field_bytes[:field_type]
|
||||
else:
|
||||
raise ValueError(f"don't know how to serialize type {type(field_value)}")
|
||||
raise InvalidArgumentError(
|
||||
f"don't know how to serialize type {type(field_value)}"
|
||||
)
|
||||
|
||||
return field_bytes
|
||||
|
||||
@@ -1905,7 +1909,7 @@ class Address:
|
||||
self.address_bytes = bytes(reversed(bytes.fromhex(address)))
|
||||
|
||||
if len(self.address_bytes) != 6:
|
||||
raise ValueError('invalid address length')
|
||||
raise InvalidArgumentError('invalid address length')
|
||||
|
||||
self.address_type = address_type
|
||||
|
||||
@@ -2108,7 +2112,7 @@ class HCI_Command(HCI_Packet):
|
||||
op_code, length = struct.unpack_from('<HB', packet, 1)
|
||||
parameters = packet[4:]
|
||||
if len(parameters) != length:
|
||||
raise ValueError('invalid packet length')
|
||||
raise InvalidPacketError('invalid packet length')
|
||||
|
||||
# Look for a registered class
|
||||
cls = HCI_Command.command_classes.get(op_code)
|
||||
@@ -4807,7 +4811,7 @@ class HCI_Event(HCI_Packet):
|
||||
length = packet[2]
|
||||
parameters = packet[3:]
|
||||
if len(parameters) != length:
|
||||
raise ValueError('invalid packet length')
|
||||
raise InvalidPacketError('invalid packet length')
|
||||
|
||||
cls: Any
|
||||
if event_code == HCI_LE_META_EVENT:
|
||||
@@ -6342,7 +6346,7 @@ class HCI_AclDataPacket(HCI_Packet):
|
||||
bc_flag = (h >> 14) & 3
|
||||
data = packet[5:]
|
||||
if len(data) != data_total_length:
|
||||
raise ValueError('invalid packet length')
|
||||
raise InvalidPacketError('invalid packet length')
|
||||
return HCI_AclDataPacket(
|
||||
connection_handle, pb_flag, bc_flag, data_total_length, data
|
||||
)
|
||||
@@ -6390,7 +6394,7 @@ class HCI_SynchronousDataPacket(HCI_Packet):
|
||||
packet_status = (h >> 12) & 0b11
|
||||
data = packet[4:]
|
||||
if len(data) != data_total_length:
|
||||
raise ValueError(
|
||||
raise InvalidPacketError(
|
||||
f'invalid packet length {len(data)} != {data_total_length}'
|
||||
)
|
||||
return HCI_SynchronousDataPacket(
|
||||
|
||||
Reference in New Issue
Block a user