Merge branch 'main' into update

This commit is contained in:
khsiao-google
2025-11-01 17:33:51 +08:00
13 changed files with 143 additions and 116 deletions

View File

@@ -121,6 +121,72 @@ class Connection:
# -----------------------------------------------------------------------------
class Controller:
hci_sink: Optional[TransportSink] = None
central_connections: dict[
Address, Connection
] # Connections where this controller is the central
peripheral_connections: dict[
Address, Connection
] # Connections where this controller is the peripheral
classic_connections: dict[Address, Connection] # Connections in BR/EDR
central_cis_links: dict[int, CisLink] # CIS links by handle
peripheral_cis_links: dict[int, CisLink] # CIS links by handle
hci_version: int = HCI_VERSION_BLUETOOTH_CORE_5_0
hci_revision: int = 0
lmp_version: int = HCI_VERSION_BLUETOOTH_CORE_5_0
lmp_subversion: int = 0
lmp_features: bytes = bytes.fromhex(
'0000000060000000'
) # BR/EDR Not Supported, LE Supported (Controller)
manufacturer_name: int = 0xFFFF
acl_data_packet_length: int = 27
total_num_acl_data_packets: int = 64
le_acl_data_packet_length: int = 27
total_num_le_acl_data_packets: int = 64
iso_data_packet_length: int = 960
total_num_iso_data_packets: int = 64
event_mask: int = 0
event_mask_page_2: int = 0
supported_commands: bytes = bytes.fromhex(
'2000800000c000000000e4000000a822000000000000040000f7ffff7f000000'
'30f0f9ff01008004002000000000000000000000000000000000000000000000'
)
le_event_mask: int = 0
advertising_parameters: Optional[hci.HCI_LE_Set_Advertising_Parameters_Command] = (
None
)
le_features: bytes = bytes.fromhex('ff49010000000000')
le_states: bytes = bytes.fromhex('ffff3fffff030000')
advertising_channel_tx_power: int = 0
filter_accept_list_size: int = 8
filter_duplicates: bool = False
resolving_list_size: int = 8
supported_max_tx_octets: int = 27
supported_max_tx_time: int = 10000
supported_max_rx_octets: int = 27
supported_max_rx_time: int = 10000
suggested_max_tx_octets: int = 27
suggested_max_tx_time: int = 0x0148
default_phy: dict[str, int]
le_scan_type: int = 0
le_scan_interval: int = 0x10
le_scan_window: int = 0x10
le_scan_enable: int = 0
le_scan_own_address_type: int = Address.RANDOM_DEVICE_ADDRESS
le_scanning_filter_policy: int = 0
le_scan_response_data: Optional[bytes] = None
le_address_resolution: bool = False
le_rpa_timeout: int = 0
sync_flow_control: bool = False
local_name: str = 'Bumble'
advertising_interval: int = 2000
advertising_data: Optional[bytes] = None
advertising_timer_handle: Optional[asyncio.Handle] = None
_random_address: 'Address' = Address('00:00:00:00:00:00')
def __init__(
self,
name: str,
@@ -130,77 +196,18 @@ class Controller:
public_address: Optional[Union[bytes, str, Address]] = None,
) -> None:
self.name = name
self.hci_sink: Optional[TransportSink] = None
self.link = link
self.central_connections: dict[Address, Connection] = (
{}
) # Connections where this controller is the central
self.peripheral_connections: dict[Address, Connection] = (
{}
) # Connections where this controller is the peripheral
self.classic_connections: dict[Address, Connection] = (
{}
) # Connections in BR/EDR
self.central_cis_links: dict[int, CisLink] = {} # CIS links by handle
self.peripheral_cis_links: dict[int, CisLink] = {} # CIS links by handle
self.hci_version = HCI_VERSION_BLUETOOTH_CORE_5_0
self.hci_revision = 0
self.lmp_version = HCI_VERSION_BLUETOOTH_CORE_5_0
self.lmp_subversion = 0
self.lmp_features = bytes.fromhex(
'0000000060000000'
) # BR/EDR Not Supported, LE Supported (Controller)
self.manufacturer_name = 0xFFFF
self.acl_data_packet_length = 27
self.total_num_acl_data_packets = 64
self.le_acl_data_packet_length = 27
self.total_num_le_acl_data_packets = 64
self.iso_data_packet_length = 960
self.total_num_iso_data_packets = 64
self.event_mask = b'\x00'
self.event_mask_page_2 = b'\x00'
self.supported_commands = bytes.fromhex(
'2000800000c000000000e4000000a822000000000000040000f7ffff7f000000'
'30f0f9ff01008004002000000000000000000000000000000000000000000000'
)
self.le_event_mask = b'\x00'
self.advertising_parameters = None
self.le_features = bytes.fromhex('ff49010000000000')
self.le_states = bytes.fromhex('ffff3fffff030000')
self.advertising_channel_tx_power = 0
self.filter_accept_list_size = 8
self.filter_duplicates = False
self.resolving_list_size = 8
self.supported_max_tx_octets = 27
self.supported_max_tx_time = 10000 # microseconds
self.supported_max_rx_octets = 27
self.supported_max_rx_time = 10000 # microseconds
self.suggested_max_tx_octets = 27
self.suggested_max_tx_time = 0x0148 # microseconds
self.central_connections = {}
self.peripheral_connections = {}
self.classic_connections = {}
self.central_cis_links = {}
self.peripheral_cis_links = {}
self.default_phy = {
'all_phys': 0,
'tx_phys': 0,
'rx_phys': 0,
}
self.le_scan_type = 0
self.le_scan_interval = 0x10
self.le_scan_window = 0x10
self.le_scan_enable = 0
self.le_scan_own_address_type = Address.RANDOM_DEVICE_ADDRESS
self.le_scanning_filter_policy = 0
self.le_scan_response_data = None
self.le_address_resolution = False
self.le_rpa_timeout = 0
self.sync_flow_control = False
self.local_name = 'Bumble'
self.advertising_interval = 2000 # Fixed for now
self.advertising_data: Optional[bytes] = None
self.advertising_timer_handle: Optional[asyncio.Handle] = None
self._random_address = Address('00:00:00:00:00:00')
if isinstance(public_address, Address):
self._public_address = public_address
elif public_address is not None:
@@ -489,8 +496,8 @@ class Controller:
logger.debug(
f'New CENTRAL connection handle: 0x{connection_handle:04X}'
)
else:
connection = None
else:
connection = None
# Say that the connection has completed
self.send_hci_packet(
@@ -1117,7 +1124,9 @@ class Controller:
'''
See Bluetooth spec Vol 4, Part E - 7.3.1 Set Event Mask Command
'''
self.event_mask = command.event_mask
self.event_mask = int.from_bytes(
command.event_mask, byteorder='little', signed=False
)
return bytes([HCI_SUCCESS])
def on_hci_reset_command(self, _command: hci.HCI_Reset_Command) -> Optional[bytes]:
@@ -1245,7 +1254,9 @@ class Controller:
'''
See Bluetooth spec Vol 4, Part E - 7.3.69 Set Event Mask Page 2 Command
'''
self.event_mask_page_2 = command.event_mask_page_2
self.event_mask_page_2 = int.from_bytes(
command.event_mask_page_2, byteorder='little', signed=False
)
return bytes([HCI_SUCCESS])
def on_hci_read_le_host_support_command(
@@ -1414,7 +1425,9 @@ class Controller:
'''
See Bluetooth spec Vol 4, Part E - 7.8.1 LE Set Event Mask Command
'''
self.le_event_mask = command.le_event_mask
self.le_event_mask = int.from_bytes(
command.le_event_mask, byteorder='little', signed=False
)
return bytes([HCI_SUCCESS])
def on_hci_le_read_buffer_size_command(
@@ -1940,7 +1953,8 @@ class Controller:
'''
# Remove old CIG implicitly.
for handle, cis_link in list(self.central_cis_links.items()):
cis_links = list(self.central_cis_links.items())
for handle, cis_link in cis_links:
if cis_link.cig_id == command.cig_id:
self.central_cis_links.pop(handle)
@@ -2004,7 +2018,8 @@ class Controller:
status = HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR
for cis_handle, cis_link in list(self.central_cis_links.items()):
cis_links = list(self.central_cis_links.items())
for cis_handle, cis_link in cis_links:
if cis_link.cig_id == command.cig_id:
self.central_cis_links.pop(cis_handle)
status = HCI_SUCCESS