Merge pull request #344 from zxzxwu/cis

CIS and SCO responder support
This commit is contained in:
zxzxwu
2023-11-30 21:00:55 +08:00
committed by GitHub
7 changed files with 588 additions and 32 deletions

View File

@@ -21,7 +21,7 @@ import functools
import json import json
import asyncio import asyncio
import logging import logging
from contextlib import asynccontextmanager, AsyncExitStack from contextlib import asynccontextmanager, AsyncExitStack, closing
from dataclasses import dataclass from dataclasses import dataclass
from collections.abc import Iterable from collections.abc import Iterable
from typing import ( from typing import (
@@ -49,6 +49,7 @@ from .hci import (
HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE,
HCI_CENTRAL_ROLE, HCI_CENTRAL_ROLE,
HCI_COMMAND_STATUS_PENDING, HCI_COMMAND_STATUS_PENDING,
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE,
HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR,
HCI_DISPLAY_YES_NO_IO_CAPABILITY, HCI_DISPLAY_YES_NO_IO_CAPABILITY,
HCI_DISPLAY_ONLY_IO_CAPABILITY, HCI_DISPLAY_ONLY_IO_CAPABILITY,
@@ -85,29 +86,35 @@ from .hci import (
HCI_Constant, HCI_Constant,
HCI_Create_Connection_Cancel_Command, HCI_Create_Connection_Cancel_Command,
HCI_Create_Connection_Command, HCI_Create_Connection_Command,
HCI_Create_Connection_Command,
HCI_Disconnect_Command, HCI_Disconnect_Command,
HCI_Encryption_Change_Event, HCI_Encryption_Change_Event,
HCI_Error, HCI_Error,
HCI_IO_Capability_Request_Reply_Command, HCI_IO_Capability_Request_Reply_Command,
HCI_Inquiry_Cancel_Command, HCI_Inquiry_Cancel_Command,
HCI_Inquiry_Command, HCI_Inquiry_Command,
HCI_IsoDataPacket,
HCI_LE_Accept_CIS_Request_Command,
HCI_LE_Add_Device_To_Resolving_List_Command, HCI_LE_Add_Device_To_Resolving_List_Command,
HCI_LE_Advertising_Report_Event, HCI_LE_Advertising_Report_Event,
HCI_LE_Clear_Resolving_List_Command, HCI_LE_Clear_Resolving_List_Command,
HCI_LE_Connection_Update_Command, HCI_LE_Connection_Update_Command,
HCI_LE_Create_Connection_Cancel_Command, HCI_LE_Create_Connection_Cancel_Command,
HCI_LE_Create_Connection_Command, HCI_LE_Create_Connection_Command,
HCI_LE_Create_CIS_Command,
HCI_LE_Enable_Encryption_Command, HCI_LE_Enable_Encryption_Command,
HCI_LE_Extended_Advertising_Report_Event, HCI_LE_Extended_Advertising_Report_Event,
HCI_LE_Extended_Create_Connection_Command, HCI_LE_Extended_Create_Connection_Command,
HCI_LE_Rand_Command, HCI_LE_Rand_Command,
HCI_LE_Read_PHY_Command, HCI_LE_Read_PHY_Command,
HCI_LE_Reject_CIS_Request_Command,
HCI_LE_Remove_Advertising_Set_Command, HCI_LE_Remove_Advertising_Set_Command,
HCI_LE_Set_Address_Resolution_Enable_Command, HCI_LE_Set_Address_Resolution_Enable_Command,
HCI_LE_Set_Advertising_Data_Command, HCI_LE_Set_Advertising_Data_Command,
HCI_LE_Set_Advertising_Enable_Command, HCI_LE_Set_Advertising_Enable_Command,
HCI_LE_Set_Advertising_Parameters_Command, HCI_LE_Set_Advertising_Parameters_Command,
HCI_LE_Set_Advertising_Set_Random_Address_Command, HCI_LE_Set_Advertising_Set_Random_Address_Command,
HCI_LE_Set_CIG_Parameters_Command,
HCI_LE_Set_Data_Length_Command, HCI_LE_Set_Data_Length_Command,
HCI_LE_Set_Default_PHY_Command, HCI_LE_Set_Default_PHY_Command,
HCI_LE_Set_Extended_Scan_Enable_Command, HCI_LE_Set_Extended_Scan_Enable_Command,
@@ -116,6 +123,7 @@ from .hci import (
HCI_LE_Set_Extended_Advertising_Data_Command, HCI_LE_Set_Extended_Advertising_Data_Command,
HCI_LE_Set_Extended_Advertising_Enable_Command, HCI_LE_Set_Extended_Advertising_Enable_Command,
HCI_LE_Set_Extended_Advertising_Parameters_Command, HCI_LE_Set_Extended_Advertising_Parameters_Command,
HCI_LE_Set_Host_Feature_Command,
HCI_LE_Set_PHY_Command, HCI_LE_Set_PHY_Command,
HCI_LE_Set_Random_Address_Command, HCI_LE_Set_Random_Address_Command,
HCI_LE_Set_Scan_Enable_Command, HCI_LE_Set_Scan_Enable_Command,
@@ -130,6 +138,7 @@ from .hci import (
HCI_Switch_Role_Command, HCI_Switch_Role_Command,
HCI_Set_Connection_Encryption_Command, HCI_Set_Connection_Encryption_Command,
HCI_StatusError, HCI_StatusError,
HCI_SynchronousDataPacket,
HCI_User_Confirmation_Request_Negative_Reply_Command, HCI_User_Confirmation_Request_Negative_Reply_Command,
HCI_User_Confirmation_Request_Reply_Command, HCI_User_Confirmation_Request_Reply_Command,
HCI_User_Passkey_Request_Negative_Reply_Command, HCI_User_Passkey_Request_Negative_Reply_Command,
@@ -161,6 +170,7 @@ from .core import (
from .utils import ( from .utils import (
AsyncRunner, AsyncRunner,
CompositeEventEmitter, CompositeEventEmitter,
EventWatcher,
setup_event_forwarding, setup_event_forwarding,
composite_listener, composite_listener,
deprecated, deprecated,
@@ -592,6 +602,46 @@ class ConnectionParametersPreferences:
ConnectionParametersPreferences.default = ConnectionParametersPreferences() ConnectionParametersPreferences.default = ConnectionParametersPreferences()
# -----------------------------------------------------------------------------
@dataclass
class ScoLink(CompositeEventEmitter):
device: Device
acl_connection: Connection
handle: int
link_type: int
def __post_init__(self):
super().__init__()
async def disconnect(
self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
) -> None:
await self.device.disconnect(self, reason)
# -----------------------------------------------------------------------------
@dataclass
class CisLink(CompositeEventEmitter):
class State(IntEnum):
PENDING = 0
ESTABLISHED = 1
device: Device
acl_connection: Connection # Based ACL connection
handle: int # CIS handle assigned by Controller (in LE_Set_CIG_Parameters Complete or LE_CIS_Request events)
cis_id: int # CIS ID assigned by Central device
cig_id: int # CIG ID assigned by Central device
state: State = State.PENDING
def __post_init__(self):
super().__init__()
async def disconnect(
self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
) -> None:
await self.device.disconnect(self, reason)
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
class Connection(CompositeEventEmitter): class Connection(CompositeEventEmitter):
device: Device device: Device
@@ -870,6 +920,7 @@ class DeviceConfiguration:
self.keystore = None self.keystore = None
self.gatt_services: List[Dict[str, Any]] = [] self.gatt_services: List[Dict[str, Any]] = []
self.address_resolution_offload = False self.address_resolution_offload = False
self.cis_enabled = False
def load_from_dict(self, config: Dict[str, Any]) -> None: def load_from_dict(self, config: Dict[str, Any]) -> None:
# Load simple properties # Load simple properties
@@ -905,6 +956,7 @@ class DeviceConfiguration:
self.address_resolution_offload = config.get( self.address_resolution_offload = config.get(
'address_resolution_offload', self.address_resolution_offload 'address_resolution_offload', self.address_resolution_offload
) )
self.cis_enabled = config.get('cis_enabled', self.cis_enabled)
# Load or synthesize an IRK # Load or synthesize an IRK
irk = config.get('irk') irk = config.get('irk')
@@ -1012,6 +1064,9 @@ class Device(CompositeEventEmitter):
advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator]
config: DeviceConfiguration config: DeviceConfiguration
extended_advertising_handles: Set[int] extended_advertising_handles: Set[int]
sco_links: Dict[int, ScoLink]
cis_links: Dict[int, CisLink]
_pending_cis: Dict[int, Tuple[int, int]]
@composite_listener @composite_listener
class Listener: class Listener:
@@ -1104,6 +1159,9 @@ class Device(CompositeEventEmitter):
self.disconnecting = False self.disconnecting = False
self.connections = {} # Connections, by connection handle self.connections = {} # Connections, by connection handle
self.pending_connections = {} # Connections, by BD address (BR/EDR only) self.pending_connections = {} # Connections, by BD address (BR/EDR only)
self.sco_links = {} # ScoLinks, by connection handle (BR/EDR only)
self.cis_links = {} # CisLinks, by connection handle (LE only)
self._pending_cis = {} # (CIS_ID, CIG_ID), by CIS_handle
self.classic_enabled = False self.classic_enabled = False
self.inquiry_response = None self.inquiry_response = None
self.address_resolver = None self.address_resolver = None
@@ -1133,6 +1191,7 @@ class Device(CompositeEventEmitter):
self.le_enabled = config.le_enabled self.le_enabled = config.le_enabled
self.classic_enabled = config.classic_enabled self.classic_enabled = config.classic_enabled
self.le_simultaneous_enabled = config.le_simultaneous_enabled self.le_simultaneous_enabled = config.le_simultaneous_enabled
self.cis_enabled = config.cis_enabled
self.classic_sc_enabled = config.classic_sc_enabled self.classic_sc_enabled = config.classic_sc_enabled
self.classic_ssp_enabled = config.classic_ssp_enabled self.classic_ssp_enabled = config.classic_ssp_enabled
self.classic_smp_enabled = config.classic_smp_enabled self.classic_smp_enabled = config.classic_smp_enabled
@@ -1443,6 +1502,16 @@ class Device(CompositeEventEmitter):
) # type: ignore[call-arg] ) # type: ignore[call-arg]
) )
if self.cis_enabled:
await self.send_command(
HCI_LE_Set_Host_Feature_Command( # type: ignore[call-arg]
bit_number=(
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE
),
bit_value=1,
)
)
if self.classic_enabled: if self.classic_enabled:
await self.send_command( await self.send_command(
HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) # type: ignore[call-arg] HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) # type: ignore[call-arg]
@@ -2366,7 +2435,9 @@ class Device(CompositeEventEmitter):
check_result=True, check_result=True,
) )
async def disconnect(self, connection, reason): async def disconnect(
self, connection: Union[Connection, ScoLink, CisLink], reason: int
) -> None:
# Create a future so that we can wait for the disconnection's result # Create a future so that we can wait for the disconnection's result
pending_disconnection = asyncio.get_running_loop().create_future() pending_disconnection = asyncio.get_running_loop().create_future()
connection.on('disconnection', pending_disconnection.set_result) connection.on('disconnection', pending_disconnection.set_result)
@@ -2374,7 +2445,7 @@ class Device(CompositeEventEmitter):
# Request a disconnection # Request a disconnection
result = await self.send_command( result = await self.send_command(
HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason) HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason) # type: ignore[call-arg]
) )
try: try:
@@ -2837,6 +2908,154 @@ class Device(CompositeEventEmitter):
self.remove_listener('remote_name', handler) self.remove_listener('remote_name', handler)
self.remove_listener('remote_name_failure', failure_handler) self.remove_listener('remote_name_failure', failure_handler)
# [LE only]
@experimental('Only for testing.')
async def setup_cig(
self,
cig_id: int,
cis_id: List[int],
sdu_interval: Tuple[int, int],
framing: int,
max_sdu: Tuple[int, int],
retransmission_number: int,
max_transport_latency: Tuple[int, int],
) -> List[int]:
"""Sends HCI_LE_Set_CIG_Parameters_Command.
Args:
cig_id: CIG_ID.
cis_id: CID ID list.
sdu_interval: SDU intervals of (Central->Peripheral, Peripheral->Cental).
framing: Un-framing(0) or Framing(1).
max_sdu: Max SDU counts of (Central->Peripheral, Peripheral->Cental).
retransmission_number: retransmission_number.
max_transport_latency: Max transport latencies of
(Central->Peripheral, Peripheral->Cental).
Returns:
List of created CIS handles corresponding to the same order of [cid_id].
"""
num_cis = len(cis_id)
response = await self.send_command(
HCI_LE_Set_CIG_Parameters_Command( # type: ignore[call-arg]
cig_id=cig_id,
sdu_interval_c_to_p=sdu_interval[0],
sdu_interval_p_to_c=sdu_interval[1],
worst_case_sca=0x00, # 251-500 ppm
packing=0x00, # Sequential
framing=framing,
max_transport_latency_c_to_p=max_transport_latency[0],
max_transport_latency_p_to_c=max_transport_latency[1],
cis_id=cis_id,
max_sdu_c_to_p=[max_sdu[0]] * num_cis,
max_sdu_p_to_c=[max_sdu[1]] * num_cis,
phy_c_to_p=[HCI_LE_2M_PHY] * num_cis,
phy_p_to_c=[HCI_LE_2M_PHY] * num_cis,
rtn_c_to_p=[retransmission_number] * num_cis,
rtn_p_to_c=[retransmission_number] * num_cis,
),
check_result=True,
)
# Ideally, we should manage CIG lifecycle, but they are not useful for Unicast
# Server, so here it only provides a basic functionality for testing.
cis_handles = response.return_parameters.connection_handle[:]
for id, cis_handle in zip(cis_id, cis_handles):
self._pending_cis[cis_handle] = (id, cig_id)
return cis_handles
# [LE only]
@experimental('Only for testing.')
async def create_cis(self, cis_acl_pairs: List[Tuple[int, int]]) -> List[CisLink]:
for cis_handle, acl_handle in cis_acl_pairs:
acl_connection = self.lookup_connection(acl_handle)
assert acl_connection
cis_id, cig_id = self._pending_cis.pop(cis_handle)
self.cis_links[cis_handle] = CisLink(
device=self,
acl_connection=acl_connection,
handle=cis_handle,
cis_id=cis_id,
cig_id=cig_id,
)
result = await self.send_command(
HCI_LE_Create_CIS_Command( # type: ignore[call-arg]
cis_connection_handle=[p[0] for p in cis_acl_pairs],
acl_connection_handle=[p[1] for p in cis_acl_pairs],
),
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Create_CIS_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
pending_cis_establishments: Dict[int, asyncio.Future[CisLink]] = {}
for cis_handle, _ in cis_acl_pairs:
pending_cis_establishments[
cis_handle
] = asyncio.get_running_loop().create_future()
with closing(EventWatcher()) as watcher:
@watcher.on(self, 'cis_establishment')
def on_cis_establishment(cis_link: CisLink) -> None:
if pending_future := pending_cis_establishments.get(
cis_link.handle, None
):
pending_future.set_result(cis_link)
return await asyncio.gather(*pending_cis_establishments.values())
# [LE only]
@experimental('Only for testing.')
async def accept_cis_request(self, handle: int) -> CisLink:
result = await self.send_command(
HCI_LE_Accept_CIS_Request_Command( # type: ignore[call-arg]
connection_handle=handle
),
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Accept_CIS_Request_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
pending_cis_establishment = asyncio.get_running_loop().create_future()
with closing(EventWatcher()) as watcher:
@watcher.on(self, 'cis_establishment')
def on_cis_establishment(cis_link: CisLink) -> None:
if cis_link.handle == handle:
pending_cis_establishment.set_result(cis_link)
return await pending_cis_establishment
# [LE only]
@experimental('Only for testing.')
async def reject_cis_request(
self,
handle: int,
reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
) -> None:
result = await self.send_command(
HCI_LE_Reject_CIS_Request_Command( # type: ignore[call-arg]
connection_handle=handle, reason=reason
),
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
'HCI_LE_Reject_CIS_Request_Command failed: '
f'{HCI_Constant.error_name(result.status)}'
)
raise HCI_StatusError(result)
@host_event_handler @host_event_handler
def on_flush(self): def on_flush(self):
self.emit('flush') self.emit('flush')
@@ -3041,30 +3260,35 @@ class Device(CompositeEventEmitter):
) )
@host_event_handler @host_event_handler
@with_connection_from_handle def on_disconnection(self, connection_handle: int, reason: int) -> None:
def on_disconnection(self, connection, reason): if connection := self.connections.pop(connection_handle, None):
logger.debug( logger.debug(
f'*** Disconnection: [0x{connection.handle:04X}] ' f'*** Disconnection: [0x{connection.handle:04X}] '
f'{connection.peer_address} as {connection.role_name}, reason={reason}' f'{connection.peer_address} as {connection.role_name}, reason={reason}'
) )
connection.emit('disconnection', reason) connection.emit('disconnection', reason)
# Remove the connection from the map # Cleanup subsystems that maintain per-connection state
del self.connections[connection.handle] self.gatt_server.on_disconnection(connection)
# Cleanup subsystems that maintain per-connection state # Restart advertising if auto-restart is enabled
self.gatt_server.on_disconnection(connection) if self.auto_restart_advertising:
logger.debug('restarting advertising')
# Restart advertising if auto-restart is enabled self.abort_on(
if self.auto_restart_advertising: 'flush',
logger.debug('restarting advertising') self.start_advertising(
self.abort_on( advertising_type=self.advertising_type, # type: ignore[arg-type]
'flush', own_address_type=self.advertising_own_address_type, # type: ignore[arg-type]
self.start_advertising( auto_restart=True,
advertising_type=self.advertising_type, ),
own_address_type=self.advertising_own_address_type, )
auto_restart=True, elif sco_link := self.sco_links.pop(connection_handle, None):
), sco_link.emit('disconnection', reason)
elif cis_link := self.cis_links.pop(connection_handle, None):
cis_link.emit('disconnection', reason)
else:
logger.error(
f'*** Unknown disconnection handle=0x{connection_handle}, reason={reason} ***'
) )
@host_event_handler @host_event_handler
@@ -3343,6 +3567,107 @@ class Device(CompositeEventEmitter):
connection.emit('remote_name_failure', error) connection.emit('remote_name_failure', error)
self.emit('remote_name_failure', address, error) self.emit('remote_name_failure', address, error)
# [Classic only]
@host_event_handler
@with_connection_from_address
@experimental('Only for testing.')
def on_sco_connection(
self, acl_connection: Connection, sco_handle: int, link_type: int
) -> None:
logger.debug(
f'*** SCO connected: {acl_connection.peer_address}, '
f'sco_handle=[0x{sco_handle:04X}], '
f'link_type=[0x{link_type:02X}] ***'
)
sco_link = self.sco_links[sco_handle] = ScoLink(
device=self,
acl_connection=acl_connection,
handle=sco_handle,
link_type=link_type,
)
self.emit('sco_connection', sco_link)
# [Classic only]
@host_event_handler
@with_connection_from_address
@experimental('Only for testing.')
def on_sco_connection_failure(
self, acl_connection: Connection, status: int
) -> None:
logger.debug(f'*** SCO connection failure: {acl_connection.peer_address}***')
self.emit('sco_connection_failure')
# [Classic only]
@host_event_handler
@experimental('Only for testing')
def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> None:
if sco_link := self.sco_links.get(sco_handle, None):
sco_link.emit('pdu', packet)
# [LE only]
@host_event_handler
@with_connection_from_handle
@experimental('Only for testing')
def on_cis_request(
self,
acl_connection: Connection,
cis_handle: int,
cig_id: int,
cis_id: int,
) -> None:
logger.debug(
f'*** CIS Request '
f'acl_handle=[0x{acl_connection.handle:04X}]{acl_connection.peer_address}, '
f'cis_handle=[0x{cis_handle:04X}], '
f'cig_id=[0x{cig_id:02X}], '
f'cis_id=[0x{cis_id:02X}] ***'
)
# LE_CIS_Established event doesn't provide info, so we must store them here.
self.cis_links[cis_handle] = CisLink(
device=self,
acl_connection=acl_connection,
handle=cis_handle,
cig_id=cig_id,
cis_id=cis_id,
)
self.emit('cis_request', acl_connection, cis_handle, cig_id, cis_id)
# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_cis_establishment(self, cis_handle: int) -> None:
cis_link = self.cis_links[cis_handle]
cis_link.state = CisLink.State.ESTABLISHED
assert cis_link.acl_connection
logger.debug(
f'*** CIS Establishment '
f'{cis_link.acl_connection.peer_address}, '
f'cis_handle=[0x{cis_handle:04X}], '
f'cig_id=[0x{cis_link.cig_id:02X}], '
f'cis_id=[0x{cis_link.cis_id:02X}] ***'
)
cis_link.emit('establishment')
self.emit('cis_establishment', cis_link)
# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None:
logger.debug(f'*** CIS Establishment Failure: cis=[0x{cis_handle:04X}] ***')
if cis_link := self.cis_links.pop(cis_handle, None):
cis_link.emit('establishment_failure')
self.emit('cis_establishment_failure', cis_handle, status)
# [LE only]
@host_event_handler
@experimental('Only for testing')
def on_iso_packet(self, handle: int, packet: HCI_IsoDataPacket) -> None:
if cis_link := self.cis_links.get(handle, None):
cis_link.emit('pdu', packet)
@host_event_handler @host_event_handler
@with_connection_from_handle @with_connection_from_handle
def on_connection_encryption_change(self, connection, encryption): def on_connection_encryption_change(self, connection, encryption):

View File

@@ -4451,7 +4451,10 @@ class HCI_LE_Accept_CIS_Request_Command(HCI_Command):
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@HCI_Command.command( @HCI_Command.command(
fields=[('connection_handle', 2)], fields=[
('connection_handle', 2),
('reason', {'size': 1, 'mapper': HCI_Constant.error_name}),
],
) )
class HCI_LE_Reject_CIS_Request_Command(HCI_Command): class HCI_LE_Reject_CIS_Request_Command(HCI_Command):
''' '''
@@ -4459,6 +4462,7 @@ class HCI_LE_Reject_CIS_Request_Command(HCI_Command):
''' '''
connection_handle: int connection_handle: int
reason: int
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------

View File

@@ -850,10 +850,10 @@ class EscoParameters:
# Common # Common
input_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat = ( input_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat = (
HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.TRANSPARENT HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.PCM
) )
output_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat = ( output_coding_format: HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat = (
HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.TRANSPARENT HCI_Enhanced_Setup_Synchronous_Connection_Command.CodingFormat.PCM
) )
input_coded_data_size: int = 16 input_coded_data_size: int = 16
output_coded_data_size: int = 16 output_coded_data_size: int = 16
@@ -960,6 +960,8 @@ _ESCO_PARAMETERS_MSBC_T1 = EscoParameters(
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
), ),
input_bandwidth=32000,
output_bandwidth=32000,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY, retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY,
) )
@@ -974,10 +976,12 @@ _ESCO_PARAMETERS_MSBC_T2 = EscoParameters(
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_2_EV5
| HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5 | HCI_Enhanced_Setup_Synchronous_Connection_Command.PacketType.NO_3_EV5
), ),
input_bandwidth=32000,
output_bandwidth=32000,
retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY, retransmission_effort=HCI_Enhanced_Setup_Synchronous_Connection_Command.RetransmissionEffort.OPTIMIZE_FOR_QUALITY,
) )
ESCO_PERAMETERS = { ESCO_PARAMETERS = {
DefaultCodecParameters.SCO_CVSD_D0: _ESCO_PARAMETERS_CVSD_D0, DefaultCodecParameters.SCO_CVSD_D0: _ESCO_PARAMETERS_CVSD_D0,
DefaultCodecParameters.SCO_CVSD_D1: _ESCO_PARAMETERS_CVSD_D1, DefaultCodecParameters.SCO_CVSD_D1: _ESCO_PARAMETERS_CVSD_D1,
DefaultCodecParameters.ESCO_CVSD_S1: _ESCO_PARAMETERS_CVSD_S1, DefaultCodecParameters.ESCO_CVSD_S1: _ESCO_PARAMETERS_CVSD_S1,

View File

@@ -32,8 +32,8 @@ from .hci import (
Address, Address,
HCI_ACL_DATA_PACKET, HCI_ACL_DATA_PACKET,
HCI_COMMAND_PACKET, HCI_COMMAND_PACKET,
HCI_COMMAND_COMPLETE_EVENT,
HCI_EVENT_PACKET, HCI_EVENT_PACKET,
HCI_ISO_DATA_PACKET,
HCI_LE_READ_BUFFER_SIZE_COMMAND, HCI_LE_READ_BUFFER_SIZE_COMMAND,
HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND, HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND,
HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND, HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND,
@@ -52,6 +52,7 @@ from .hci import (
HCI_Constant, HCI_Constant,
HCI_Error, HCI_Error,
HCI_Event, HCI_Event,
HCI_IsoDataPacket,
HCI_LE_Long_Term_Key_Request_Negative_Reply_Command, HCI_LE_Long_Term_Key_Request_Negative_Reply_Command,
HCI_LE_Long_Term_Key_Request_Reply_Command, HCI_LE_Long_Term_Key_Request_Reply_Command,
HCI_LE_Read_Buffer_Size_Command, HCI_LE_Read_Buffer_Size_Command,
@@ -75,7 +76,6 @@ from .core import (
BT_LE_TRANSPORT, BT_LE_TRANSPORT,
ConnectionPHY, ConnectionPHY,
ConnectionParameters, ConnectionParameters,
InvalidStateError,
) )
from .utils import AbortableEventEmitter from .utils import AbortableEventEmitter
from .transport.common import TransportLostError from .transport.common import TransportLostError
@@ -243,7 +243,7 @@ class Host(AbortableEventEmitter):
# understand # understand
le_event_mask = bytes.fromhex('1F00000000000000') le_event_mask = bytes.fromhex('1F00000000000000')
else: else:
le_event_mask = bytes.fromhex('FFFFF00000000000') le_event_mask = bytes.fromhex('FFFFFFFF00000000')
await self.send_command( await self.send_command(
HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask) HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask)
@@ -495,6 +495,8 @@ class Host(AbortableEventEmitter):
self.on_hci_acl_data_packet(cast(HCI_AclDataPacket, packet)) self.on_hci_acl_data_packet(cast(HCI_AclDataPacket, packet))
elif packet.hci_packet_type == HCI_SYNCHRONOUS_DATA_PACKET: elif packet.hci_packet_type == HCI_SYNCHRONOUS_DATA_PACKET:
self.on_hci_sco_data_packet(cast(HCI_SynchronousDataPacket, packet)) self.on_hci_sco_data_packet(cast(HCI_SynchronousDataPacket, packet))
elif packet.hci_packet_type == HCI_ISO_DATA_PACKET:
self.on_hci_iso_data_packet(cast(HCI_IsoDataPacket, packet))
else: else:
logger.warning(f'!!! unknown packet type {packet.hci_packet_type}') logger.warning(f'!!! unknown packet type {packet.hci_packet_type}')
@@ -515,6 +517,10 @@ class Host(AbortableEventEmitter):
# Experimental # Experimental
self.emit('sco_packet', packet.connection_handle, packet) self.emit('sco_packet', packet.connection_handle, packet)
def on_hci_iso_data_packet(self, packet: HCI_IsoDataPacket) -> None:
# Experimental
self.emit('iso_packet', packet.connection_handle, packet)
def on_l2cap_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None: def on_l2cap_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None:
self.emit('l2cap_pdu', connection.handle, cid, pdu) self.emit('l2cap_pdu', connection.handle, cid, pdu)
@@ -715,6 +721,24 @@ class Host(AbortableEventEmitter):
def on_hci_le_extended_advertising_report_event(self, event): def on_hci_le_extended_advertising_report_event(self, event):
self.on_hci_le_advertising_report_event(event) self.on_hci_le_advertising_report_event(event)
def on_hci_le_cis_request_event(self, event):
self.emit(
'cis_request',
event.acl_connection_handle,
event.cis_connection_handle,
event.cig_id,
event.cis_id,
)
def on_hci_le_cis_established_event(self, event):
# The remaining parameters are unused for now.
if event.status == HCI_SUCCESS:
self.emit('cis_establishment', event.connection_handle)
else:
self.emit(
'cis_establishment_failure', event.connection_handle, event.status
)
def on_hci_le_remote_connection_parameter_request_event(self, event): def on_hci_le_remote_connection_parameter_request_event(self, event):
if event.connection_handle not in self.connections: if event.connection_handle not in self.connections:
logger.warning('!!! REMOTE CONNECTION PARAMETER REQUEST: unknown handle') logger.warning('!!! REMOTE CONNECTION PARAMETER REQUEST: unknown handle')

5
examples/leaudio.json Normal file
View File

@@ -0,0 +1,5 @@
{
"name": "Bumble-LEA",
"keystore": "JsonKeyStore",
"advertising_interval": 100
}

107
examples/run_cig_setup.py Normal file
View File

@@ -0,0 +1,107 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.device import (
Device,
Connection,
)
from bumble.hci import (
OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_cig_setup.py <config-file>'
'<transport-spec-for-device-1> <transport-spec-for-device-2>'
)
print(
'example: run_cig_setup.py device1.json'
'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402'
)
return
print('<<< connecting to HCI...')
hci_transports = await asyncio.gather(
open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
)
print('<<< connected')
devices = [
Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
for hci_transport in hci_transports
]
devices[0].cis_enabled = True
devices[1].cis_enabled = True
await asyncio.gather(*[device.power_on() for device in devices])
await devices[0].start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.PUBLIC,
)
connection = await devices[1].connect(
devices[0].public_address, own_address_type=OwnAddressType.PUBLIC
)
cid_ids = [2, 3]
cis_handles = await devices[1].setup_cig(
cig_id=1,
cis_id=cid_ids,
sdu_interval=(10000, 0),
framing=0,
max_sdu=(120, 0),
retransmission_number=13,
max_transport_latency=(100, 0),
)
def on_cis_request(
connection: Connection, cis_handle: int, _cig_id: int, _cis_id: int
):
connection.abort_on('disconnection', devices[0].accept_cis_request(cis_handle))
devices[0].on('cis_request', on_cis_request)
cis_links = await devices[1].create_cis(
[(cis, connection.handle) for cis in cis_handles]
)
for cis_link in cis_links:
await cis_link.disconnect()
await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports]
)
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())

View File

@@ -0,0 +1,87 @@
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import dataclasses
import logging
import sys
import os
from bumble.core import BT_BR_EDR_TRANSPORT
from bumble.device import Device, ScoLink
from bumble.hci import HCI_Enhanced_Setup_Synchronous_Connection_Command
from bumble.hfp import DefaultCodecParameters, ESCO_PARAMETERS
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print(
'Usage: run_esco_connection.py <config-file>'
'<transport-spec-for-device-1> <transport-spec-for-device-2>'
)
print(
'example: run_esco_connection.py classic1.json'
'tcp-client:127.0.0.1:6402 tcp-client:127.0.0.1:6402'
)
return
print('<<< connecting to HCI...')
hci_transports = await asyncio.gather(
open_transport_or_link(sys.argv[2]), open_transport_or_link(sys.argv[3])
)
print('<<< connected')
devices = [
Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
for hci_transport in hci_transports
]
devices[0].classic_enabled = True
devices[1].classic_enabled = True
await asyncio.gather(*[device.power_on() for device in devices])
connections = await asyncio.gather(
devices[0].accept(devices[1].public_address),
devices[1].connect(devices[0].public_address, transport=BT_BR_EDR_TRANSPORT),
)
def on_sco(sco_link: ScoLink):
connections[0].abort_on('disconnection', sco_link.disconnect())
devices[0].once('sco_connection', on_sco)
await devices[0].send_command(
HCI_Enhanced_Setup_Synchronous_Connection_Command(
connection_handle=connections[0].handle,
**dataclasses.asdict(ESCO_PARAMETERS[DefaultCodecParameters.ESCO_CVSD_S3])
# type: ignore[call-args]
)
)
await asyncio.gather(
*[hci_transport.source.terminated for hci_transport in hci_transports]
)
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())