mirror of
https://github.com/google/bumble.git
synced 2026-04-16 00:25:31 +00:00
Add HCI Packets annotations and send_sco_sdu
This commit is contained in:
@@ -28,7 +28,6 @@ from collections.abc import Callable, Iterable, Sequence
|
|||||||
from dataclasses import field
|
from dataclasses import field
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
ClassVar,
|
|
||||||
Literal,
|
Literal,
|
||||||
TypeVar,
|
TypeVar,
|
||||||
cast,
|
cast,
|
||||||
@@ -2300,10 +2299,10 @@ class HCI_Packet:
|
|||||||
Abstract Base class for HCI packets
|
Abstract Base class for HCI packets
|
||||||
'''
|
'''
|
||||||
|
|
||||||
hci_packet_type: ClassVar[int]
|
hci_packet_type: int
|
||||||
|
|
||||||
@staticmethod
|
@classmethod
|
||||||
def from_bytes(packet: bytes) -> HCI_Packet:
|
def from_bytes(cls, packet: bytes) -> HCI_Packet:
|
||||||
packet_type = packet[0]
|
packet_type = packet[0]
|
||||||
|
|
||||||
if packet_type == HCI_COMMAND_PACKET:
|
if packet_type == HCI_COMMAND_PACKET:
|
||||||
@@ -2323,7 +2322,7 @@ class HCI_Packet:
|
|||||||
|
|
||||||
return HCI_CustomPacket(packet)
|
return HCI_CustomPacket(packet)
|
||||||
|
|
||||||
def __init__(self, name):
|
def __init__(self, name: str) -> None:
|
||||||
self.name = name
|
self.name = name
|
||||||
|
|
||||||
def __bytes__(self) -> bytes:
|
def __bytes__(self) -> bytes:
|
||||||
@@ -2335,7 +2334,7 @@ class HCI_Packet:
|
|||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
class HCI_CustomPacket(HCI_Packet):
|
class HCI_CustomPacket(HCI_Packet):
|
||||||
def __init__(self, payload):
|
def __init__(self, payload: bytes) -> None:
|
||||||
super().__init__('HCI_CUSTOM_PACKET')
|
super().__init__('HCI_CUSTOM_PACKET')
|
||||||
self.hci_packet_type = payload[0]
|
self.hci_packet_type = payload[0]
|
||||||
self.payload = payload
|
self.payload = payload
|
||||||
@@ -7452,6 +7451,7 @@ class HCI_Vendor_Event(HCI_Event):
|
|||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
@dataclasses.dataclass
|
||||||
class HCI_AclDataPacket(HCI_Packet):
|
class HCI_AclDataPacket(HCI_Packet):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ 5.4.2 HCI ACL Data Packets
|
See Bluetooth spec @ 5.4.2 HCI ACL Data Packets
|
||||||
@@ -7459,8 +7459,14 @@ class HCI_AclDataPacket(HCI_Packet):
|
|||||||
|
|
||||||
hci_packet_type = HCI_ACL_DATA_PACKET
|
hci_packet_type = HCI_ACL_DATA_PACKET
|
||||||
|
|
||||||
@staticmethod
|
connection_handle: int
|
||||||
def from_bytes(packet: bytes) -> HCI_AclDataPacket:
|
pb_flag: int
|
||||||
|
bc_flag: int
|
||||||
|
data_total_length: int
|
||||||
|
data: bytes
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_bytes(cls, packet: bytes) -> HCI_AclDataPacket:
|
||||||
# Read the header
|
# Read the header
|
||||||
h, data_total_length = struct.unpack_from('<HH', packet, 1)
|
h, data_total_length = struct.unpack_from('<HH', packet, 1)
|
||||||
connection_handle = h & 0xFFF
|
connection_handle = h & 0xFFF
|
||||||
@@ -7469,25 +7475,22 @@ class HCI_AclDataPacket(HCI_Packet):
|
|||||||
data = packet[5:]
|
data = packet[5:]
|
||||||
if len(data) != data_total_length:
|
if len(data) != data_total_length:
|
||||||
raise InvalidPacketError('invalid packet length')
|
raise InvalidPacketError('invalid packet length')
|
||||||
return HCI_AclDataPacket(
|
return cls(
|
||||||
connection_handle, pb_flag, bc_flag, data_total_length, data
|
connection_handle=connection_handle,
|
||||||
|
pb_flag=pb_flag,
|
||||||
|
bc_flag=bc_flag,
|
||||||
|
data_total_length=data_total_length,
|
||||||
|
data=data,
|
||||||
)
|
)
|
||||||
|
|
||||||
def __bytes__(self):
|
def __bytes__(self) -> bytes:
|
||||||
h = (self.pb_flag << 12) | (self.bc_flag << 14) | self.connection_handle
|
h = (self.pb_flag << 12) | (self.bc_flag << 14) | self.connection_handle
|
||||||
return (
|
return (
|
||||||
struct.pack('<BHH', HCI_ACL_DATA_PACKET, h, self.data_total_length)
|
struct.pack('<BHH', HCI_ACL_DATA_PACKET, h, self.data_total_length)
|
||||||
+ self.data
|
+ self.data
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(self, connection_handle, pb_flag, bc_flag, data_total_length, data):
|
def __str__(self) -> str:
|
||||||
self.connection_handle = connection_handle
|
|
||||||
self.pb_flag = pb_flag
|
|
||||||
self.bc_flag = bc_flag
|
|
||||||
self.data_total_length = data_total_length
|
|
||||||
self.data = data
|
|
||||||
|
|
||||||
def __str__(self):
|
|
||||||
return (
|
return (
|
||||||
f'{color("ACL", "blue")}: '
|
f'{color("ACL", "blue")}: '
|
||||||
f'handle=0x{self.connection_handle:04x}, '
|
f'handle=0x{self.connection_handle:04x}, '
|
||||||
@@ -7498,6 +7501,7 @@ class HCI_AclDataPacket(HCI_Packet):
|
|||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
|
@dataclasses.dataclass
|
||||||
class HCI_SynchronousDataPacket(HCI_Packet):
|
class HCI_SynchronousDataPacket(HCI_Packet):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec @ 5.4.3 HCI SCO Data Packets
|
See Bluetooth spec @ 5.4.3 HCI SCO Data Packets
|
||||||
@@ -7505,8 +7509,13 @@ class HCI_SynchronousDataPacket(HCI_Packet):
|
|||||||
|
|
||||||
hci_packet_type = HCI_SYNCHRONOUS_DATA_PACKET
|
hci_packet_type = HCI_SYNCHRONOUS_DATA_PACKET
|
||||||
|
|
||||||
@staticmethod
|
connection_handle: int
|
||||||
def from_bytes(packet: bytes) -> HCI_SynchronousDataPacket:
|
packet_status: int
|
||||||
|
data_total_length: int
|
||||||
|
data: bytes
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_bytes(cls, packet: bytes) -> HCI_SynchronousDataPacket:
|
||||||
# Read the header
|
# Read the header
|
||||||
h, data_total_length = struct.unpack_from('<HB', packet, 1)
|
h, data_total_length = struct.unpack_from('<HB', packet, 1)
|
||||||
connection_handle = h & 0xFFF
|
connection_handle = h & 0xFFF
|
||||||
@@ -7516,8 +7525,11 @@ class HCI_SynchronousDataPacket(HCI_Packet):
|
|||||||
raise InvalidPacketError(
|
raise InvalidPacketError(
|
||||||
f'invalid packet length {len(data)} != {data_total_length}'
|
f'invalid packet length {len(data)} != {data_total_length}'
|
||||||
)
|
)
|
||||||
return HCI_SynchronousDataPacket(
|
return cls(
|
||||||
connection_handle, packet_status, data_total_length, data
|
connection_handle=connection_handle,
|
||||||
|
packet_status=packet_status,
|
||||||
|
data_total_length=data_total_length,
|
||||||
|
data=data,
|
||||||
)
|
)
|
||||||
|
|
||||||
def __bytes__(self) -> bytes:
|
def __bytes__(self) -> bytes:
|
||||||
@@ -7527,18 +7539,6 @@ class HCI_SynchronousDataPacket(HCI_Packet):
|
|||||||
+ self.data
|
+ self.data
|
||||||
)
|
)
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
connection_handle: int,
|
|
||||||
packet_status: int,
|
|
||||||
data_total_length: int,
|
|
||||||
data: bytes,
|
|
||||||
) -> None:
|
|
||||||
self.connection_handle = connection_handle
|
|
||||||
self.packet_status = packet_status
|
|
||||||
self.data_total_length = data_total_length
|
|
||||||
self.data = data
|
|
||||||
|
|
||||||
def __str__(self) -> str:
|
def __str__(self) -> str:
|
||||||
return (
|
return (
|
||||||
f'{color("SCO", "blue")}: '
|
f'{color("SCO", "blue")}: '
|
||||||
@@ -7556,7 +7556,7 @@ class HCI_IsoDataPacket(HCI_Packet):
|
|||||||
See Bluetooth spec @ 5.4.5 HCI ISO Data Packets
|
See Bluetooth spec @ 5.4.5 HCI ISO Data Packets
|
||||||
'''
|
'''
|
||||||
|
|
||||||
hci_packet_type: ClassVar[int] = HCI_ISO_DATA_PACKET
|
hci_packet_type = HCI_ISO_DATA_PACKET
|
||||||
|
|
||||||
connection_handle: int
|
connection_handle: int
|
||||||
data_total_length: int
|
data_total_length: int
|
||||||
|
|||||||
@@ -732,6 +732,16 @@ class Host(utils.EventEmitter):
|
|||||||
)
|
)
|
||||||
packet_queue.enqueue(acl_packet, connection_handle)
|
packet_queue.enqueue(acl_packet, connection_handle)
|
||||||
|
|
||||||
|
def send_sco_sdu(self, connection_handle: int, sdu: bytes) -> None:
|
||||||
|
self.send_hci_packet(
|
||||||
|
hci.HCI_SynchronousDataPacket(
|
||||||
|
connection_handle=connection_handle,
|
||||||
|
packet_status=0,
|
||||||
|
data_total_length=len(sdu),
|
||||||
|
data=sdu,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None:
|
def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None:
|
||||||
self.send_acl_sdu(connection_handle, bytes(L2CAP_PDU(cid, pdu)))
|
self.send_acl_sdu(connection_handle, bytes(L2CAP_PDU(cid, pdu)))
|
||||||
|
|
||||||
|
|||||||
@@ -100,13 +100,9 @@ def on_sco_packet(packet: hci.HCI_SynchronousDataPacket):
|
|||||||
if source_file and (pcm_data := source_file.read(packet.data_total_length)):
|
if source_file and (pcm_data := source_file.read(packet.data_total_length)):
|
||||||
assert ag_protocol
|
assert ag_protocol
|
||||||
host = ag_protocol.dlc.multiplexer.l2cap_channel.connection.device.host
|
host = ag_protocol.dlc.multiplexer.l2cap_channel.connection.device.host
|
||||||
host.send_hci_packet(
|
host.send_sco_sdu(
|
||||||
hci.HCI_SynchronousDataPacket(
|
connection_handle=packet.connection_handle,
|
||||||
connection_handle=packet.connection_handle,
|
sdu=pcm_data,
|
||||||
packet_status=0,
|
|
||||||
data_total_length=len(pcm_data),
|
|
||||||
data=pcm_data,
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user