forked from auracaster/bumble_mirror
only use __bytes__ when not argument is needed.
This commit is contained in:
@@ -291,9 +291,6 @@ class ATT_PDU:
|
||||
def init_from_bytes(self, pdu, offset):
|
||||
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
|
||||
|
||||
def to_bytes(self):
|
||||
return self.pdu
|
||||
|
||||
@property
|
||||
def is_command(self):
|
||||
return ((self.op_code >> 6) & 1) == 1
|
||||
@@ -303,7 +300,7 @@ class ATT_PDU:
|
||||
return ((self.op_code >> 7) & 1) == 1
|
||||
|
||||
def __bytes__(self):
|
||||
return self.to_bytes()
|
||||
return self.pdu
|
||||
|
||||
def __str__(self):
|
||||
result = color(self.name, 'yellow')
|
||||
|
||||
@@ -314,7 +314,7 @@ class Controller:
|
||||
f'{color("CONTROLLER -> HOST", "green")}: {packet}'
|
||||
)
|
||||
if self.host:
|
||||
self.host.on_packet(packet.to_bytes())
|
||||
self.host.on_packet(bytes(packet))
|
||||
|
||||
# This method allows the controller to emulate the same API as a transport source
|
||||
async def wait_for_termination(self):
|
||||
@@ -1192,7 +1192,7 @@ class Controller:
|
||||
See Bluetooth spec Vol 4, Part E - 7.4.6 Read BD_ADDR Command
|
||||
'''
|
||||
bd_addr = (
|
||||
self._public_address.to_bytes()
|
||||
bytes(self._public_address)
|
||||
if self._public_address is not None
|
||||
else bytes(6)
|
||||
)
|
||||
|
||||
@@ -1624,9 +1624,6 @@ class AdvertisingData:
|
||||
[bytes([len(x[1]) + 1, x[0]]) + x[1] for x in self.ad_structures]
|
||||
)
|
||||
|
||||
def to_bytes(self) -> bytes:
|
||||
return bytes(self)
|
||||
|
||||
def to_string(self, separator=', '):
|
||||
return separator.join(
|
||||
[AdvertisingData.ad_data_to_string(x[0], x[1]) for x in self.ad_structures]
|
||||
|
||||
@@ -1986,7 +1986,7 @@ class Device(CompositeEventEmitter):
|
||||
check_address_type: bool = False,
|
||||
) -> Optional[Connection]:
|
||||
for connection in self.connections.values():
|
||||
if connection.peer_address.to_bytes() == bd_addr.to_bytes():
|
||||
if bytes(connection.peer_address) == bytes(bd_addr):
|
||||
if (
|
||||
check_address_type
|
||||
and connection.peer_address.address_type != bd_addr.address_type
|
||||
|
||||
@@ -410,7 +410,7 @@ class IncludedServiceDeclaration(Attribute):
|
||||
|
||||
def __init__(self, service: Service) -> None:
|
||||
declaration_bytes = struct.pack(
|
||||
'<HH2s', service.handle, service.end_group_handle, service.uuid.to_bytes()
|
||||
'<HH2s', service.handle, service.end_group_handle, bytes(service.uuid)
|
||||
)
|
||||
super().__init__(
|
||||
GATT_INCLUDE_ATTRIBUTE_TYPE, Attribute.READABLE, declaration_bytes
|
||||
|
||||
@@ -292,7 +292,7 @@ class Client:
|
||||
logger.debug(
|
||||
f'GATT Command from client: [0x{self.connection.handle:04X}] {command}'
|
||||
)
|
||||
self.send_gatt_pdu(command.to_bytes())
|
||||
self.send_gatt_pdu(bytes(command))
|
||||
|
||||
async def send_request(self, request: ATT_PDU):
|
||||
logger.debug(
|
||||
@@ -310,7 +310,7 @@ class Client:
|
||||
self.pending_request = request
|
||||
|
||||
try:
|
||||
self.send_gatt_pdu(request.to_bytes())
|
||||
self.send_gatt_pdu(bytes(request))
|
||||
response = await asyncio.wait_for(
|
||||
self.pending_response, GATT_REQUEST_TIMEOUT
|
||||
)
|
||||
@@ -328,7 +328,7 @@ class Client:
|
||||
f'GATT Confirmation from client: [0x{self.connection.handle:04X}] '
|
||||
f'{confirmation}'
|
||||
)
|
||||
self.send_gatt_pdu(confirmation.to_bytes())
|
||||
self.send_gatt_pdu(bytes(confirmation))
|
||||
|
||||
async def request_mtu(self, mtu: int) -> int:
|
||||
# Check the range
|
||||
|
||||
@@ -353,7 +353,7 @@ class Server(EventEmitter):
|
||||
logger.debug(
|
||||
f'GATT Response from server: [0x{connection.handle:04X}] {response}'
|
||||
)
|
||||
self.send_gatt_pdu(connection.handle, response.to_bytes())
|
||||
self.send_gatt_pdu(connection.handle, bytes(response))
|
||||
|
||||
async def notify_subscriber(
|
||||
self,
|
||||
@@ -450,7 +450,7 @@ class Server(EventEmitter):
|
||||
)
|
||||
|
||||
try:
|
||||
self.send_gatt_pdu(connection.handle, indication.to_bytes())
|
||||
self.send_gatt_pdu(connection.handle, bytes(indication))
|
||||
await asyncio.wait_for(pending_confirmation, GATT_REQUEST_TIMEOUT)
|
||||
except asyncio.TimeoutError as error:
|
||||
logger.warning(color('!!! GATT Indicate timeout', 'red'))
|
||||
|
||||
@@ -1496,14 +1496,11 @@ class CodingFormat:
|
||||
def from_bytes(cls, data: bytes) -> CodingFormat:
|
||||
return cls.parse_from_bytes(data, 0)[1]
|
||||
|
||||
def to_bytes(self) -> bytes:
|
||||
def __bytes__(self) -> bytes:
|
||||
return struct.pack(
|
||||
'<BHH', self.codec_id, self.company_id, self.vendor_specific_codec_id
|
||||
)
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.to_bytes()
|
||||
|
||||
|
||||
# -----------------------------------------------------------------------------
|
||||
class HCI_Constant:
|
||||
@@ -1720,7 +1717,7 @@ class HCI_Object:
|
||||
field_length = len(field_bytes)
|
||||
field_bytes = bytes([field_length]) + field_bytes
|
||||
elif isinstance(field_value, (bytes, bytearray)) or hasattr(
|
||||
field_value, 'to_bytes'
|
||||
field_value, '__bytes__'
|
||||
):
|
||||
field_bytes = bytes(field_value)
|
||||
if isinstance(field_type, int) and 4 < field_type <= 256:
|
||||
@@ -1765,7 +1762,7 @@ class HCI_Object:
|
||||
def from_bytes(cls, data, offset, fields):
|
||||
return cls(fields, **cls.dict_from_bytes(data, offset, fields))
|
||||
|
||||
def to_bytes(self):
|
||||
def __bytes__(self):
|
||||
return HCI_Object.dict_to_bytes(self.__dict__, self.fields)
|
||||
|
||||
@staticmethod
|
||||
@@ -1860,9 +1857,6 @@ class HCI_Object:
|
||||
for field_name, field_value in field_strings
|
||||
)
|
||||
|
||||
def __bytes__(self):
|
||||
return self.to_bytes()
|
||||
|
||||
def __init__(self, fields, **kwargs):
|
||||
self.fields = fields
|
||||
self.init_from_fields(self, fields, kwargs)
|
||||
@@ -2037,9 +2031,6 @@ class Address:
|
||||
def is_static(self):
|
||||
return self.is_random and (self.address_bytes[5] >> 6 == 3)
|
||||
|
||||
def to_bytes(self):
|
||||
return self.address_bytes
|
||||
|
||||
def to_string(self, with_type_qualifier=True):
|
||||
'''
|
||||
String representation of the address, MSB first, with an optional type
|
||||
@@ -2051,7 +2042,7 @@ class Address:
|
||||
return result + '/P'
|
||||
|
||||
def __bytes__(self):
|
||||
return self.to_bytes()
|
||||
return self.address_bytes
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.address_bytes)
|
||||
@@ -2257,16 +2248,13 @@ class HCI_Command(HCI_Packet):
|
||||
self.op_code = op_code
|
||||
self.parameters = parameters
|
||||
|
||||
def to_bytes(self):
|
||||
def __bytes__(self):
|
||||
parameters = b'' if self.parameters is None else self.parameters
|
||||
return (
|
||||
struct.pack('<BHB', HCI_COMMAND_PACKET, self.op_code, len(parameters))
|
||||
+ parameters
|
||||
)
|
||||
|
||||
def __bytes__(self):
|
||||
return self.to_bytes()
|
||||
|
||||
def __str__(self):
|
||||
result = color(self.name, 'green')
|
||||
if fields := getattr(self, 'fields', None):
|
||||
@@ -5190,13 +5178,10 @@ class HCI_Event(HCI_Packet):
|
||||
self.event_code = event_code
|
||||
self.parameters = parameters
|
||||
|
||||
def to_bytes(self):
|
||||
def __bytes__(self):
|
||||
parameters = b'' if self.parameters is None else self.parameters
|
||||
return bytes([HCI_EVENT_PACKET, self.event_code, len(parameters)]) + parameters
|
||||
|
||||
def __bytes__(self):
|
||||
return self.to_bytes()
|
||||
|
||||
def __str__(self):
|
||||
result = color(self.name, 'magenta')
|
||||
if fields := getattr(self, 'fields', None):
|
||||
@@ -6747,7 +6732,7 @@ class HCI_AclDataPacket(HCI_Packet):
|
||||
connection_handle, pb_flag, bc_flag, data_total_length, data
|
||||
)
|
||||
|
||||
def to_bytes(self):
|
||||
def __bytes__(self):
|
||||
h = (self.pb_flag << 12) | (self.bc_flag << 14) | self.connection_handle
|
||||
return (
|
||||
struct.pack('<BHH', HCI_ACL_DATA_PACKET, h, self.data_total_length)
|
||||
@@ -6761,9 +6746,6 @@ class HCI_AclDataPacket(HCI_Packet):
|
||||
self.data_total_length = data_total_length
|
||||
self.data = data
|
||||
|
||||
def __bytes__(self):
|
||||
return self.to_bytes()
|
||||
|
||||
def __str__(self):
|
||||
return (
|
||||
f'{color("ACL", "blue")}: '
|
||||
@@ -6797,7 +6779,7 @@ class HCI_SynchronousDataPacket(HCI_Packet):
|
||||
connection_handle, packet_status, data_total_length, data
|
||||
)
|
||||
|
||||
def to_bytes(self) -> bytes:
|
||||
def __bytes__(self) -> bytes:
|
||||
h = (self.packet_status << 12) | self.connection_handle
|
||||
return (
|
||||
struct.pack('<BHB', HCI_SYNCHRONOUS_DATA_PACKET, h, self.data_total_length)
|
||||
@@ -6816,9 +6798,6 @@ class HCI_SynchronousDataPacket(HCI_Packet):
|
||||
self.data_total_length = data_total_length
|
||||
self.data = data
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.to_bytes()
|
||||
|
||||
def __str__(self) -> str:
|
||||
return (
|
||||
f'{color("SCO", "blue")}: '
|
||||
@@ -6891,9 +6870,6 @@ class HCI_IsoDataPacket(HCI_Packet):
|
||||
)
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.to_bytes()
|
||||
|
||||
def to_bytes(self) -> bytes:
|
||||
fmt = '<BHH'
|
||||
args = [
|
||||
HCI_ISO_DATA_PACKET,
|
||||
|
||||
@@ -199,7 +199,7 @@ class Host(AbortableEventEmitter):
|
||||
check_address_type: bool = False,
|
||||
) -> Optional[Connection]:
|
||||
for connection in self.connections.values():
|
||||
if connection.peer_address.to_bytes() == bd_addr.to_bytes():
|
||||
if bytes(connection.peer_address) == bytes(bd_addr):
|
||||
if (
|
||||
check_address_type
|
||||
and connection.peer_address.address_type != bd_addr.address_type
|
||||
|
||||
@@ -225,7 +225,7 @@ class L2CAP_PDU:
|
||||
|
||||
return L2CAP_PDU(l2cap_pdu_cid, l2cap_pdu_payload)
|
||||
|
||||
def to_bytes(self) -> bytes:
|
||||
def __bytes__(self) -> bytes:
|
||||
header = struct.pack('<HH', len(self.payload), self.cid)
|
||||
return header + self.payload
|
||||
|
||||
@@ -233,9 +233,6 @@ class L2CAP_PDU:
|
||||
self.cid = cid
|
||||
self.payload = payload
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.to_bytes()
|
||||
|
||||
def __str__(self) -> str:
|
||||
return f'{color("L2CAP", "green")} [CID={self.cid}]: {self.payload.hex()}'
|
||||
|
||||
@@ -333,11 +330,8 @@ class L2CAP_Control_Frame:
|
||||
def init_from_bytes(self, pdu, offset):
|
||||
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
|
||||
|
||||
def to_bytes(self) -> bytes:
|
||||
return self.pdu
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.to_bytes()
|
||||
return self.pdu
|
||||
|
||||
def __str__(self) -> str:
|
||||
result = f'{color(self.name, "yellow")} [ID={self.identifier}]'
|
||||
|
||||
@@ -265,7 +265,7 @@ class UnicastServerAdvertisingData:
|
||||
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
|
||||
struct.pack(
|
||||
'<2sBIB',
|
||||
gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE.to_bytes(),
|
||||
bytes(gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE),
|
||||
self.announcement_type,
|
||||
self.available_audio_contexts,
|
||||
len(self.metadata),
|
||||
@@ -487,24 +487,23 @@ class BroadcastAudioAnnouncement:
|
||||
def from_bytes(cls, data: bytes) -> Self:
|
||||
return cls(int.from_bytes(data[:3], 'little'))
|
||||
|
||||
def to_bytes(self) -> bytes:
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.broadcast_id.to_bytes(3, 'little')
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.to_bytes()
|
||||
|
||||
def get_advertising_data(self) -> bytes:
|
||||
return core.AdvertisingData(
|
||||
[
|
||||
(
|
||||
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
|
||||
return bytes(
|
||||
core.AdvertisingData(
|
||||
[
|
||||
(
|
||||
gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE.to_bytes()
|
||||
+ self.to_bytes()
|
||||
),
|
||||
)
|
||||
]
|
||||
).to_bytes()
|
||||
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
|
||||
(
|
||||
bytes(gatt.GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE)
|
||||
+ bytes(self)
|
||||
),
|
||||
)
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@dataclasses.dataclass
|
||||
@@ -514,7 +513,7 @@ class BasicAudioAnnouncement:
|
||||
index: int
|
||||
codec_specific_configuration: CodecSpecificConfiguration
|
||||
|
||||
def to_bytes(self) -> bytes:
|
||||
def __bytes__(self) -> bytes:
|
||||
codec_specific_configuration_bytes = bytes(
|
||||
self.codec_specific_configuration
|
||||
)
|
||||
@@ -523,9 +522,6 @@ class BasicAudioAnnouncement:
|
||||
+ codec_specific_configuration_bytes
|
||||
)
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.to_bytes()
|
||||
|
||||
@dataclasses.dataclass
|
||||
class Subgroup:
|
||||
codec_id: hci.CodingFormat
|
||||
@@ -533,14 +529,14 @@ class BasicAudioAnnouncement:
|
||||
metadata: le_audio.Metadata
|
||||
bis: List[BasicAudioAnnouncement.BIS]
|
||||
|
||||
def to_bytes(self) -> bytes:
|
||||
def __bytes__(self) -> bytes:
|
||||
metadata_bytes = bytes(self.metadata)
|
||||
codec_specific_configuration_bytes = bytes(
|
||||
self.codec_specific_configuration
|
||||
)
|
||||
return (
|
||||
bytes([len(self.bis)])
|
||||
+ self.codec_id.to_bytes()
|
||||
+ bytes(self.codec_id)
|
||||
+ bytes([len(codec_specific_configuration_bytes)])
|
||||
+ codec_specific_configuration_bytes
|
||||
+ bytes([len(metadata_bytes)])
|
||||
@@ -548,9 +544,6 @@ class BasicAudioAnnouncement:
|
||||
+ b''.join(map(bytes, self.bis))
|
||||
)
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.to_bytes()
|
||||
|
||||
presentation_delay: int
|
||||
subgroups: List[BasicAudioAnnouncement.Subgroup]
|
||||
|
||||
@@ -607,25 +600,24 @@ class BasicAudioAnnouncement:
|
||||
|
||||
return cls(presentation_delay, subgroups)
|
||||
|
||||
def to_bytes(self) -> bytes:
|
||||
def __bytes__(self) -> bytes:
|
||||
return (
|
||||
self.presentation_delay.to_bytes(3, 'little')
|
||||
+ bytes([len(self.subgroups)])
|
||||
+ b''.join(map(bytes, self.subgroups))
|
||||
)
|
||||
|
||||
def __bytes__(self) -> bytes:
|
||||
return self.to_bytes()
|
||||
|
||||
def get_advertising_data(self) -> bytes:
|
||||
return core.AdvertisingData(
|
||||
[
|
||||
(
|
||||
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
|
||||
return bytes(
|
||||
core.AdvertisingData(
|
||||
[
|
||||
(
|
||||
gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE.to_bytes()
|
||||
+ self.to_bytes()
|
||||
),
|
||||
)
|
||||
]
|
||||
).to_bytes()
|
||||
core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
|
||||
(
|
||||
bytes(gatt.GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE)
|
||||
+ bytes(self)
|
||||
),
|
||||
)
|
||||
]
|
||||
)
|
||||
)
|
||||
|
||||
@@ -344,9 +344,6 @@ class DataElement:
|
||||
] # Keep a copy so we can re-serialize to an exact replica
|
||||
return result
|
||||
|
||||
def to_bytes(self):
|
||||
return bytes(self)
|
||||
|
||||
def __bytes__(self):
|
||||
# Return early if we have a cache
|
||||
if self.bytes:
|
||||
@@ -623,11 +620,8 @@ class SDP_PDU:
|
||||
def init_from_bytes(self, pdu, offset):
|
||||
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
|
||||
|
||||
def to_bytes(self):
|
||||
return self.pdu
|
||||
|
||||
def __bytes__(self):
|
||||
return self.to_bytes()
|
||||
return self.pdu
|
||||
|
||||
def __str__(self):
|
||||
result = f'{color(self.name, "blue")} [TID={self.transaction_id}]'
|
||||
|
||||
@@ -298,11 +298,8 @@ class SMP_Command:
|
||||
def init_from_bytes(self, pdu: bytes, offset: int) -> None:
|
||||
return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
|
||||
|
||||
def to_bytes(self):
|
||||
return self.pdu
|
||||
|
||||
def __bytes__(self):
|
||||
return self.to_bytes()
|
||||
return self.pdu
|
||||
|
||||
def __str__(self):
|
||||
result = color(self.name, 'yellow')
|
||||
@@ -1949,7 +1946,7 @@ class Manager(EventEmitter):
|
||||
f'{connection.peer_address}: {command}'
|
||||
)
|
||||
cid = SMP_BR_CID if connection.transport == BT_BR_EDR_TRANSPORT else SMP_CID
|
||||
connection.send_l2cap_pdu(cid, command.to_bytes())
|
||||
connection.send_l2cap_pdu(cid, bytes(command))
|
||||
|
||||
def on_smp_security_request_command(
|
||||
self, connection: Connection, request: SMP_Security_Request_Command
|
||||
|
||||
Reference in New Issue
Block a user