Compare commits

..

7 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod 843466c822 a few more constants from the spec 2023-12-03 17:16:25 -08:00
zxzxwu 3adcc8be09 Merge pull request #360 from zxzxwu/hci
Remove # type: ignore[call-arg] in HCI_Command builders
2023-12-03 19:18:04 +08:00
zxzxwu c853d56302 Merge pull request #361 from zxzxwu/hci-bug
Fix typo
2023-12-03 04:22:59 +08:00
Josh Wu dc97be5b35 Fix typo 2023-12-02 23:42:21 +08:00
zxzxwu 73dbdfff9f Merge pull request #356 from zxzxwu/bap
Add Published Audio Capabilities Service
2023-12-02 23:34:57 +08:00
Josh Wu dff14e1258 Add Published Audio Capabilities Service 2023-12-02 23:16:37 +08:00
Josh Wu 10a3833893 Remove # type: ignore[call-arg] in HCI_Command builders 2023-12-02 19:18:54 +08:00
13 changed files with 931 additions and 216 deletions
+54 -58
View File
@@ -1432,7 +1432,7 @@ class Device(CompositeEventEmitter):
await self.host.reset()
# Try to get the public address from the controller
response = await self.send_command(HCI_Read_BD_ADDR_Command()) # type: ignore[call-arg]
response = await self.send_command(HCI_Read_BD_ADDR_Command())
if response.return_parameters.status == HCI_SUCCESS:
logger.debug(
color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow')
@@ -1455,7 +1455,7 @@ class Device(CompositeEventEmitter):
HCI_Write_LE_Host_Support_Command(
le_supported_host=int(self.le_enabled),
simultaneous_le_host=int(self.le_simultaneous_enabled),
) # type: ignore[call-arg]
)
)
if self.le_enabled:
@@ -1465,7 +1465,7 @@ class Device(CompositeEventEmitter):
if self.host.supports_command(HCI_LE_RAND_COMMAND):
# Get 8 random bytes
response = await self.send_command(
HCI_LE_Rand_Command(), check_result=True # type: ignore[call-arg]
HCI_LE_Rand_Command(), check_result=True
)
# Ensure the address bytes can be a static random address
@@ -1486,7 +1486,7 @@ class Device(CompositeEventEmitter):
await self.send_command(
HCI_LE_Set_Random_Address_Command(
random_address=self.random_address
), # type: ignore[call-arg]
),
check_result=True,
)
@@ -1499,12 +1499,12 @@ class Device(CompositeEventEmitter):
await self.send_command(
HCI_LE_Set_Address_Resolution_Enable_Command(
address_resolution_enable=1
) # type: ignore[call-arg]
)
)
if self.cis_enabled:
await self.send_command(
HCI_LE_Set_Host_Feature_Command( # type: ignore[call-arg]
HCI_LE_Set_Host_Feature_Command(
bit_number=(
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE
),
@@ -1514,20 +1514,20 @@ class Device(CompositeEventEmitter):
if self.classic_enabled:
await self.send_command(
HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) # type: ignore[call-arg]
HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8'))
)
await self.send_command(
HCI_Write_Class_Of_Device_Command(class_of_device=self.class_of_device) # type: ignore[call-arg]
HCI_Write_Class_Of_Device_Command(class_of_device=self.class_of_device)
)
await self.send_command(
HCI_Write_Simple_Pairing_Mode_Command(
simple_pairing_mode=int(self.classic_ssp_enabled)
) # type: ignore[call-arg]
)
)
await self.send_command(
HCI_Write_Secure_Connections_Host_Support_Command(
secure_connections_host_support=int(self.classic_sc_enabled)
) # type: ignore[call-arg]
)
)
await self.set_connectable(self.connectable)
await self.set_discoverable(self.discoverable)
@@ -1551,7 +1551,7 @@ class Device(CompositeEventEmitter):
self.address_resolver = smp.AddressResolver(resolving_keys)
if self.address_resolution_offload:
await self.send_command(HCI_LE_Clear_Resolving_List_Command()) # type: ignore[call-arg]
await self.send_command(HCI_LE_Clear_Resolving_List_Command())
for irk, address in resolving_keys:
await self.send_command(
@@ -1560,7 +1560,7 @@ class Device(CompositeEventEmitter):
peer_identity_address=address,
peer_irk=irk,
local_irk=self.irk,
) # type: ignore[call-arg]
)
)
def supports_le_feature(self, feature):
@@ -1595,7 +1595,7 @@ class Device(CompositeEventEmitter):
await self.send_command(
HCI_LE_Set_Advertising_Data_Command(
advertising_data=self.advertising_data
), # type: ignore[call-arg]
),
check_result=True,
)
@@ -1604,7 +1604,7 @@ class Device(CompositeEventEmitter):
await self.send_command(
HCI_LE_Set_Scan_Response_Data_Command(
scan_response_data=self.scan_response_data
), # type: ignore[call-arg]
),
check_result=True,
)
@@ -1630,13 +1630,13 @@ class Device(CompositeEventEmitter):
peer_address=peer_address,
advertising_channel_map=7,
advertising_filter_policy=0,
), # type: ignore[call-arg]
),
check_result=True,
)
# Enable advertising
await self.send_command(
HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1), # type: ignore[call-arg]
HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1),
check_result=True,
)
@@ -1649,7 +1649,7 @@ class Device(CompositeEventEmitter):
# Disable advertising
if self.advertising:
await self.send_command(
HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), # type: ignore[call-arg]
HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0),
check_result=True,
)
@@ -1716,7 +1716,7 @@ class Device(CompositeEventEmitter):
secondary_advertising_phy=1, # LE 1M
advertising_sid=0,
scan_request_notification_enable=0,
), # type: ignore[call-arg]
),
check_result=True,
)
@@ -1728,7 +1728,7 @@ class Device(CompositeEventEmitter):
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment
advertising_data=advertising_data,
), # type: ignore[call-arg]
),
check_result=True,
)
@@ -1740,7 +1740,7 @@ class Device(CompositeEventEmitter):
operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA,
fragment_preference=0x01, # Should not fragment
scan_response_data=scan_response,
), # type: ignore[call-arg]
),
check_result=True,
)
@@ -1752,7 +1752,7 @@ class Device(CompositeEventEmitter):
HCI_LE_Set_Advertising_Set_Random_Address_Command(
advertising_handle=adv_handle,
random_address=self.random_address,
), # type: ignore[call-arg]
),
check_result=True,
)
@@ -1763,13 +1763,13 @@ class Device(CompositeEventEmitter):
advertising_handles=[adv_handle],
durations=[0], # Forever
max_extended_advertising_events=[0], # Infinite
), # type: ignore[call-arg]
),
check_result=True,
)
except HCI_Error as error:
# When any step fails, cleanup the advertising handle.
await self.send_command(
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), # type: ignore[call-arg]
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle),
check_result=False,
)
raise error
@@ -1791,12 +1791,12 @@ class Device(CompositeEventEmitter):
advertising_handles=[adv_handle],
durations=[0],
max_extended_advertising_events=[0],
), # type: ignore[call-arg]
),
check_result=True,
)
# Remove advertising set
await self.send_command(
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle), # type: ignore[call-arg]
HCI_LE_Remove_Advertising_Set_Command(advertising_handle=adv_handle),
check_result=True,
)
self.extended_advertising_handles.remove(adv_handle)
@@ -1864,7 +1864,7 @@ class Device(CompositeEventEmitter):
scan_types=[scan_type] * scanning_phy_count,
scan_intervals=[int(scan_window / 0.625)] * scanning_phy_count,
scan_windows=[int(scan_window / 0.625)] * scanning_phy_count,
), # type: ignore[call-arg]
),
check_result=True,
)
@@ -1875,7 +1875,7 @@ class Device(CompositeEventEmitter):
filter_duplicates=1 if filter_duplicates else 0,
duration=0, # TODO allow other values
period=0, # TODO allow other values
), # type: ignore[call-arg]
),
check_result=True,
)
else:
@@ -1893,7 +1893,7 @@ class Device(CompositeEventEmitter):
le_scan_window=int(scan_window / 0.625),
own_address_type=own_address_type,
scanning_filter_policy=HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY,
), # type: ignore[call-arg]
),
check_result=True,
)
@@ -1901,7 +1901,7 @@ class Device(CompositeEventEmitter):
await self.send_command(
HCI_LE_Set_Scan_Enable_Command(
le_scan_enable=1, filter_duplicates=1 if filter_duplicates else 0
), # type: ignore[call-arg]
),
check_result=True,
)
@@ -1914,12 +1914,12 @@ class Device(CompositeEventEmitter):
await self.send_command(
HCI_LE_Set_Extended_Scan_Enable_Command(
enable=0, filter_duplicates=0, duration=0, period=0
), # type: ignore[call-arg]
),
check_result=True,
)
else:
await self.send_command(
HCI_LE_Set_Scan_Enable_Command(le_scan_enable=0, filter_duplicates=0), # type: ignore[call-arg]
HCI_LE_Set_Scan_Enable_Command(le_scan_enable=0, filter_duplicates=0),
check_result=True,
)
@@ -1939,7 +1939,7 @@ class Device(CompositeEventEmitter):
async def start_discovery(self, auto_restart: bool = True) -> None:
await self.send_command(
HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE), # type: ignore[call-arg]
HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE),
check_result=True,
)
@@ -1948,7 +1948,7 @@ class Device(CompositeEventEmitter):
lap=HCI_GENERAL_INQUIRY_LAP,
inquiry_length=DEVICE_DEFAULT_INQUIRY_LENGTH,
num_responses=0, # Unlimited number of responses.
) # type: ignore[call-arg]
)
)
if response.status != HCI_Command_Status_Event.PENDING:
self.discovering = False
@@ -1959,7 +1959,7 @@ class Device(CompositeEventEmitter):
async def stop_discovery(self) -> None:
if self.discovering:
await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True) # type: ignore[call-arg]
await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True)
self.auto_restart_inquiry = True
self.discovering = False
@@ -2007,7 +2007,7 @@ class Device(CompositeEventEmitter):
await self.send_command(
HCI_Write_Extended_Inquiry_Response_Command(
fec_required=0, extended_inquiry_response=self.inquiry_response
), # type: ignore[call-arg]
),
check_result=True,
)
await self.set_scan_enable(
@@ -2196,7 +2196,7 @@ class Device(CompositeEventEmitter):
supervision_timeouts=supervision_timeouts,
min_ce_lengths=min_ce_lengths,
max_ce_lengths=max_ce_lengths,
) # type: ignore[call-arg]
)
)
else:
if HCI_LE_1M_PHY not in connection_parameters_preferences:
@@ -2225,7 +2225,7 @@ class Device(CompositeEventEmitter):
supervision_timeout=int(prefs.supervision_timeout / 10),
min_ce_length=int(prefs.min_ce_length / 0.625),
max_ce_length=int(prefs.max_ce_length / 0.625),
) # type: ignore[call-arg]
)
)
else:
# Save pending connection
@@ -2242,7 +2242,7 @@ class Device(CompositeEventEmitter):
clock_offset=0x0000,
allow_role_switch=0x01,
reserved=0,
) # type: ignore[call-arg]
)
)
if result.status != HCI_Command_Status_Event.PENDING:
@@ -2261,10 +2261,10 @@ class Device(CompositeEventEmitter):
)
except asyncio.TimeoutError:
if transport == BT_LE_TRANSPORT:
await self.send_command(HCI_LE_Create_Connection_Cancel_Command()) # type: ignore[call-arg]
await self.send_command(HCI_LE_Create_Connection_Cancel_Command())
else:
await self.send_command(
HCI_Create_Connection_Cancel_Command(bd_addr=peer_address) # type: ignore[call-arg]
HCI_Create_Connection_Cancel_Command(bd_addr=peer_address)
)
try:
@@ -2378,7 +2378,7 @@ class Device(CompositeEventEmitter):
try:
# Accept connection request
await self.send_command(
HCI_Accept_Connection_Request_Command(bd_addr=peer_address, role=role) # type: ignore[call-arg]
HCI_Accept_Connection_Request_Command(bd_addr=peer_address, role=role)
)
# Wait for connection complete
@@ -2445,7 +2445,7 @@ class Device(CompositeEventEmitter):
# Request a disconnection
result = await self.send_command(
HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason) # type: ignore[call-arg]
HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason)
)
try:
@@ -2476,7 +2476,7 @@ class Device(CompositeEventEmitter):
connection_handle=connection.handle,
tx_octets=tx_octets,
tx_time=tx_time,
), # type: ignore[call-arg]
),
check_result=True,
)
@@ -2522,7 +2522,7 @@ class Device(CompositeEventEmitter):
supervision_timeout=supervision_timeout,
min_ce_length=min_ce_length,
max_ce_length=max_ce_length,
) # type: ignore[call-arg]
)
)
if result.status != HCI_Command_Status_Event.PENDING:
raise HCI_StatusError(result)
@@ -2850,7 +2850,7 @@ class Device(CompositeEventEmitter):
try:
result = await self.send_command(
HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role) # type: ignore[call-arg]
HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role)
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
@@ -2892,7 +2892,7 @@ class Device(CompositeEventEmitter):
page_scan_repetition_mode=HCI_Remote_Name_Request_Command.R2,
reserved=0,
clock_offset=0, # TODO investigate non-0 values
) # type: ignore[call-arg]
)
)
if result.status != HCI_COMMAND_STATUS_PENDING:
@@ -2938,7 +2938,7 @@ class Device(CompositeEventEmitter):
num_cis = len(cis_id)
response = await self.send_command(
HCI_LE_Set_CIG_Parameters_Command( # type: ignore[call-arg]
HCI_LE_Set_CIG_Parameters_Command(
cig_id=cig_id,
sdu_interval_c_to_p=sdu_interval[0],
sdu_interval_p_to_c=sdu_interval[1],
@@ -2982,7 +2982,7 @@ class Device(CompositeEventEmitter):
)
result = await self.send_command(
HCI_LE_Create_CIS_Command( # type: ignore[call-arg]
HCI_LE_Create_CIS_Command(
cis_connection_handle=[p[0] for p in cis_acl_pairs],
acl_connection_handle=[p[1] for p in cis_acl_pairs],
),
@@ -3015,9 +3015,7 @@ class Device(CompositeEventEmitter):
@experimental('Only for testing.')
async def accept_cis_request(self, handle: int) -> CisLink:
result = await self.send_command(
HCI_LE_Accept_CIS_Request_Command( # type: ignore[call-arg]
connection_handle=handle
),
HCI_LE_Accept_CIS_Request_Command(connection_handle=handle),
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
@@ -3045,9 +3043,7 @@ class Device(CompositeEventEmitter):
reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR,
) -> None:
result = await self.send_command(
HCI_LE_Reject_CIS_Request_Command( # type: ignore[call-arg]
connection_handle=handle, reason=reason
),
HCI_LE_Reject_CIS_Request_Command(connection_handle=handle, reason=reason),
)
if result.status != HCI_COMMAND_STATUS_PENDING:
logger.warning(
@@ -3439,7 +3435,7 @@ class Device(CompositeEventEmitter):
try:
if await connection.abort_on('disconnection', method()):
await self.host.send_command(
HCI_User_Confirmation_Request_Reply_Command( # type: ignore[call-arg]
HCI_User_Confirmation_Request_Reply_Command(
bd_addr=connection.peer_address
)
)
@@ -3448,7 +3444,7 @@ class Device(CompositeEventEmitter):
logger.warning(f'exception while confirming: {error}')
await self.host.send_command(
HCI_User_Confirmation_Request_Negative_Reply_Command( # type: ignore[call-arg]
HCI_User_Confirmation_Request_Negative_Reply_Command(
bd_addr=connection.peer_address
)
)
@@ -3469,7 +3465,7 @@ class Device(CompositeEventEmitter):
)
if number is not None:
await self.host.send_command(
HCI_User_Passkey_Request_Reply_Command( # type: ignore[call-arg]
HCI_User_Passkey_Request_Reply_Command(
bd_addr=connection.peer_address, numeric_value=number
)
)
@@ -3478,7 +3474,7 @@ class Device(CompositeEventEmitter):
logger.warning(f'exception while asking for pass-key: {error}')
await self.host.send_command(
HCI_User_Passkey_Request_Negative_Reply_Command( # type: ignore[call-arg]
HCI_User_Passkey_Request_Negative_Reply_Command(
bd_addr=connection.peer_address
)
)
+70 -43
View File
@@ -561,6 +561,12 @@ HCI_LE_TRANSMITTER_TEST_V4_COMMAND = hci_c
HCI_LE_SET_DATA_RELATED_ADDRESS_CHANGES_COMMAND = hci_command_op_code(0x08, 0x007C)
HCI_LE_SET_DEFAULT_SUBRATE_COMMAND = hci_command_op_code(0x08, 0x007D)
HCI_LE_SUBRATE_REQUEST_COMMAND = hci_command_op_code(0x08, 0x007E)
HCI_LE_SET_EXTENDED_ADVERTISING_PARAMETERS_V2_COMMAND = hci_command_op_code(0x08, 0x007F)
HCI_LE_SET_PERIODIC_ADVERTISING_SUBEVENT_DATA_COMMAND = hci_command_op_code(0x08, 0x0082)
HCI_LE_SET_PERIODIC_ADVERTISING_RESPONSE_DATA_COMMAND = hci_command_op_code(0x08, 0x0083)
HCI_LE_SET_PERIODIC_SYNC_SUBEVENT_COMMAND = hci_command_op_code(0x08, 0x0084)
HCI_LE_EXTENDED_CREATE_CONNECTION_V2_COMMAND = hci_command_op_code(0x08, 0x0085)
HCI_LE_SET_PERIODIC_ADVERTISING_PARAMETERS_V2_COMMAND = hci_command_op_code(0x08, 0x0086)
# HCI Error Codes
@@ -1317,56 +1323,72 @@ HCI_SUPPORTED_COMMANDS_FLAGS = (
(
HCI_LE_SET_DEFAULT_SUBRATE_COMMAND,
HCI_LE_SUBRATE_REQUEST_COMMAND,
HCI_LE_SET_EXTENDED_ADVERTISING_PARAMETERS_V2_COMMAND,
None,
None,
HCI_LE_SET_PERIODIC_ADVERTISING_SUBEVENT_DATA_COMMAND,
HCI_LE_SET_PERIODIC_ADVERTISING_RESPONSE_DATA_COMMAND,
HCI_LE_SET_PERIODIC_SYNC_SUBEVENT_COMMAND
),
# Octet 47
(
HCI_LE_EXTENDED_CREATE_CONNECTION_V2_COMMAND,
HCI_LE_SET_PERIODIC_ADVERTISING_PARAMETERS_V2_COMMAND,
None,
None,
None,
None,
None,
None,
None
)
)
# LE Supported Features
HCI_LE_ENCRYPTION_LE_SUPPORTED_FEATURE = 0
HCI_CONNECTION_PARAMETERS_REQUEST_PROCEDURE_LE_SUPPORTED_FEATURE = 1
HCI_EXTENDED_REJECT_INDICATION_LE_SUPPORTED_FEATURE = 2
HCI_PERIPHERAL_INITIATED_FEATURE_EXCHANGE_LE_SUPPORTED_FEATURE = 3
HCI_LE_PING_LE_SUPPORTED_FEATURE = 4
HCI_LE_DATA_PACKET_LENGTH_EXTENSION_LE_SUPPORTED_FEATURE = 5
HCI_LL_PRIVACY_LE_SUPPORTED_FEATURE = 6
HCI_EXTENDED_SCANNER_FILTER_POLICIES_LE_SUPPORTED_FEATURE = 7
HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE = 8
HCI_STABLE_MODULATION_INDEX_TRANSMITTER_LE_SUPPORTED_FEATURE = 9
HCI_STABLE_MODULATION_INDEX_RECEIVER_LE_SUPPORTED_FEATURE = 10
HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE = 11
HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE = 12
HCI_LE_PERIODIC_ADVERTISING_LE_SUPPORTED_FEATURE = 13
HCI_CHANNEL_SELECTION_ALGORITHM_2_LE_SUPPORTED_FEATURE = 14
HCI_LE_POWER_CLASS_1_LE_SUPPORTED_FEATURE = 15
HCI_MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE_LE_SUPPORTED_FEATURE = 16
HCI_CONNECTION_CTE_REQUEST_LE_SUPPORTED_FEATURE = 17
HCI_CONNECTION_CTE_RESPONSE_LE_SUPPORTED_FEATURE = 18
HCI_CONNECTIONLESS_CTE_TRANSMITTER_LE_SUPPORTED_FEATURE = 19
HCI_CONNECTIONLESS_CTR_RECEIVER_LE_SUPPORTED_FEATURE = 20
HCI_ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION_LE_SUPPORTED_FEATURE = 21
HCI_ANTENNA_SWITCHING_DURING_CTE_RECEPTION_LE_SUPPORTED_FEATURE = 22
HCI_RECEIVING_CONSTANT_TONE_EXTENSIONS_LE_SUPPORTED_FEATURE = 23
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER_LE_SUPPORTED_FEATURE = 24
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT_LE_SUPPORTED_FEATURE = 25
HCI_SLEEP_CLOCK_ACCURACY_UPDATES_LE_SUPPORTED_FEATURE = 26
HCI_REMOTE_PUBLIC_KEY_VALIDATION_LE_SUPPORTED_FEATURE = 27
HCI_CONNECTED_ISOCHRONOUS_STREAM_CENTRAL_LE_SUPPORTED_FEATURE = 28
HCI_CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL_LE_SUPPORTED_FEATURE = 29
HCI_ISOCHRONOUS_BROADCASTER_LE_SUPPORTED_FEATURE = 30
HCI_SYNCHRONIZED_RECEIVER_LE_SUPPORTED_FEATURE = 31
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE = 32
HCI_LE_POWER_CONTROL_REQUEST_LE_SUPPORTED_FEATURE = 33
HCI_LE_POWER_CONTROL_REQUEST_DUP_LE_SUPPORTED_FEATURE = 34
HCI_LE_PATH_LOSS_MONITORING_LE_SUPPORTED_FEATURE = 35
HCI_PERIODIC_ADVERTISING_ADI_SUPPORT_LE_SUPPORTED_FEATURE = 36
HCI_CONNECTION_SUBRATING_LE_SUPPORTED_FEATURE = 37
HCI_CONNECTION_SUBRATING_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 38
HCI_CHANNEL_CLASSIFICATION_LE_SUPPORTED_FEATURE = 39
# See Bluetooth spec @ Vol 6, Part B, 4.6 FEATURE SUPPORT
HCI_LE_ENCRYPTION_LE_SUPPORTED_FEATURE = 0
HCI_CONNECTION_PARAMETERS_REQUEST_PROCEDURE_LE_SUPPORTED_FEATURE = 1
HCI_EXTENDED_REJECT_INDICATION_LE_SUPPORTED_FEATURE = 2
HCI_PERIPHERAL_INITIATED_FEATURE_EXCHANGE_LE_SUPPORTED_FEATURE = 3
HCI_LE_PING_LE_SUPPORTED_FEATURE = 4
HCI_LE_DATA_PACKET_LENGTH_EXTENSION_LE_SUPPORTED_FEATURE = 5
HCI_LL_PRIVACY_LE_SUPPORTED_FEATURE = 6
HCI_EXTENDED_SCANNER_FILTER_POLICIES_LE_SUPPORTED_FEATURE = 7
HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE = 8
HCI_STABLE_MODULATION_INDEX_TRANSMITTER_LE_SUPPORTED_FEATURE = 9
HCI_STABLE_MODULATION_INDEX_RECEIVER_LE_SUPPORTED_FEATURE = 10
HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE = 11
HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE = 12
HCI_LE_PERIODIC_ADVERTISING_LE_SUPPORTED_FEATURE = 13
HCI_CHANNEL_SELECTION_ALGORITHM_2_LE_SUPPORTED_FEATURE = 14
HCI_LE_POWER_CLASS_1_LE_SUPPORTED_FEATURE = 15
HCI_MINIMUM_NUMBER_OF_USED_CHANNELS_PROCEDURE_LE_SUPPORTED_FEATURE = 16
HCI_CONNECTION_CTE_REQUEST_LE_SUPPORTED_FEATURE = 17
HCI_CONNECTION_CTE_RESPONSE_LE_SUPPORTED_FEATURE = 18
HCI_CONNECTIONLESS_CTE_TRANSMITTER_LE_SUPPORTED_FEATURE = 19
HCI_CONNECTIONLESS_CTR_RECEIVER_LE_SUPPORTED_FEATURE = 20
HCI_ANTENNA_SWITCHING_DURING_CTE_TRANSMISSION_LE_SUPPORTED_FEATURE = 21
HCI_ANTENNA_SWITCHING_DURING_CTE_RECEPTION_LE_SUPPORTED_FEATURE = 22
HCI_RECEIVING_CONSTANT_TONE_EXTENSIONS_LE_SUPPORTED_FEATURE = 23
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_SENDER_LE_SUPPORTED_FEATURE = 24
HCI_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECIPIENT_LE_SUPPORTED_FEATURE = 25
HCI_SLEEP_CLOCK_ACCURACY_UPDATES_LE_SUPPORTED_FEATURE = 26
HCI_REMOTE_PUBLIC_KEY_VALIDATION_LE_SUPPORTED_FEATURE = 27
HCI_CONNECTED_ISOCHRONOUS_STREAM_CENTRAL_LE_SUPPORTED_FEATURE = 28
HCI_CONNECTED_ISOCHRONOUS_STREAM_PERIPHERAL_LE_SUPPORTED_FEATURE = 29
HCI_ISOCHRONOUS_BROADCASTER_LE_SUPPORTED_FEATURE = 30
HCI_SYNCHRONIZED_RECEIVER_LE_SUPPORTED_FEATURE = 31
HCI_CONNECTED_ISOCHRONOUS_STREAM_LE_SUPPORTED_FEATURE = 32
HCI_LE_POWER_CONTROL_REQUEST_LE_SUPPORTED_FEATURE = 33
HCI_LE_POWER_CONTROL_REQUEST_DUP_LE_SUPPORTED_FEATURE = 34
HCI_LE_PATH_LOSS_MONITORING_LE_SUPPORTED_FEATURE = 35
HCI_PERIODIC_ADVERTISING_ADI_SUPPORT_LE_SUPPORTED_FEATURE = 36
HCI_CONNECTION_SUBRATING_LE_SUPPORTED_FEATURE = 37
HCI_CONNECTION_SUBRATING_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 38
HCI_CHANNEL_CLASSIFICATION_LE_SUPPORTED_FEATURE = 39
HCI_ADVERTISING_CODING_SELECTION_LE_SUPPORTED_FEATURE = 40
HCI_ADVERTISING_CODING_SELECTION_HOST_SUPPORT_LE_SUPPORTED_FEATURE = 41
HCI_PERIODIC_ADVERTISING_WITH_RESPONSES_ADVERTISER_LE_SUPPORTED_FEATURE = 43
HCI_PERIODIC_ADVERTISING_WITH_RESPONSES_SCANNER_LE_SUPPORTED_FEATURE = 44
HCI_LE_SUPPORTED_FEATURES_NAMES = {
flag: feature_name for (feature_name, flag) in globals().items()
@@ -1629,7 +1651,7 @@ class HCI_Object:
field_bytes = bytes(field_value)
elif field_type == 'v':
# Variable-length bytes field, with 1-byte length at the beginning
field_bytes = bytes(field_bytes)
field_bytes = bytes(field_value)
field_length = len(field_bytes)
field_bytes = bytes([field_length]) + field_bytes
elif isinstance(field_value, (bytes, bytearray)) or hasattr(
@@ -2018,6 +2040,7 @@ class HCI_Command(HCI_Packet):
hci_packet_type = HCI_COMMAND_PACKET
command_names: Dict[int, str] = {}
command_classes: Dict[int, Type[HCI_Command]] = {}
op_code: int
@staticmethod
def command(fields=(), return_parameters_fields=()):
@@ -2103,7 +2126,11 @@ class HCI_Command(HCI_Packet):
return_parameters.fields = cls.return_parameters_fields
return return_parameters
def __init__(self, op_code, parameters=None, **kwargs):
def __init__(self, op_code=-1, parameters=None, **kwargs):
# Since the legacy implementation relies on an __init__ injector, typing always
# complains that positional argument op_code is not passed, so here sets a
# default value to allow building derived HCI_Command without op_code.
assert op_code != -1
super().__init__(HCI_Command.command_name(op_code))
if (fields := getattr(self, 'fields', None)) and kwargs:
HCI_Object.init_from_fields(self, fields, kwargs)
+1 -1
View File
@@ -1926,7 +1926,7 @@ class ChannelManager:
supervision_timeout=request.timeout,
min_ce_length=0,
max_ce_length=0,
) # type: ignore[call-arg]
)
)
else:
self.send_control_frame(
+496
View File
@@ -0,0 +1,496 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
from collections.abc import Sequence
import dataclasses
import enum
import struct
import functools
from typing import Optional, List, Union
from bumble import hci
from bumble import gatt
from bumble import gatt_client
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
class AudioLocation(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.1 - Audio Location'''
# fmt: off
NOT_ALLOWED = 0x00000000
FRONT_LEFT = 0x00000001
FRONT_RIGHT = 0x00000002
FRONT_CENTER = 0x00000004
LOW_FREQUENCY_EFFECTS_1 = 0x00000008
BACK_LEFT = 0x00000010
BACK_RIGHT = 0x00000020
FRONT_LEFT_OF_CENTER = 0x00000040
FRONT_RIGHT_OF_CENTER = 0x00000080
BACK_CENTER = 0x00000100
LOW_FREQUENCY_EFFECTS_2 = 0x00000200
SIDE_LEFT = 0x00000400
SIDE_RIGHT = 0x00000800
TOP_FRONT_LEFT = 0x00001000
TOP_FRONT_RIGHT = 0x00002000
TOP_FRONT_CENTER = 0x00004000
TOP_CENTER = 0x00008000
TOP_BACK_LEFT = 0x00010000
TOP_BACK_RIGHT = 0x00020000
TOP_SIDE_LEFT = 0x00040000
TOP_SIDE_RIGHT = 0x00080000
TOP_BACK_CENTER = 0x00100000
BOTTOM_FRONT_CENTER = 0x00200000
BOTTOM_FRONT_LEFT = 0x00400000
BOTTOM_FRONT_RIGHT = 0x00800000
FRONT_LEFT_WIDE = 0x01000000
FRONT_RIGHT_WIDE = 0x02000000
LEFT_SURROUND = 0x04000000
RIGHT_SURROUND = 0x08000000
class AudioInputType(enum.IntEnum):
'''Bluetooth Assigned Numbers, Section 6.12.2 - Audio Input Type'''
# fmt: off
UNSPECIFIED = 0x00
BLUETOOTH = 0x01
MICROPHONE = 0x02
ANALOG = 0x03
DIGITAL = 0x04
RADIO = 0x05
STREAMING = 0x06
AMBIENT = 0x07
class ContextType(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.3 - Context Type'''
# fmt: off
PROHIBITED = 0x0000
CONVERSATIONAL = 0x0002
MEDIA = 0x0004
GAME = 0x0008
INSTRUCTIONAL = 0x0010
VOICE_ASSISTANTS = 0x0020
LIVE = 0x0040
SOUND_EFFECTS = 0x0080
NOTIFICATIONS = 0x0100
RINGTONE = 0x0200
ALERTS = 0x0400
EMERGENCY_ALARM = 0x0800
class SamplingFrequency(enum.IntEnum):
'''Bluetooth Assigned Numbers, Section 6.12.5.1 - Sampling Frequency'''
# fmt: off
FREQ_8000 = 0x01
FREQ_11025 = 0x02
FREQ_16000 = 0x03
FREQ_22050 = 0x04
FREQ_24000 = 0x05
FREQ_32000 = 0x06
FREQ_44100 = 0x07
FREQ_48000 = 0x08
FREQ_88200 = 0x09
FREQ_96000 = 0x0A
FREQ_176400 = 0x0B
FREQ_192000 = 0x0C
FREQ_384000 = 0x0D
# fmt: on
@classmethod
def from_hz(cls, frequency: int) -> SamplingFrequency:
return {
8000: SamplingFrequency.FREQ_8000,
11025: SamplingFrequency.FREQ_11025,
16000: SamplingFrequency.FREQ_16000,
22050: SamplingFrequency.FREQ_22050,
24000: SamplingFrequency.FREQ_24000,
32000: SamplingFrequency.FREQ_32000,
44100: SamplingFrequency.FREQ_44100,
48000: SamplingFrequency.FREQ_48000,
88200: SamplingFrequency.FREQ_88200,
96000: SamplingFrequency.FREQ_96000,
176400: SamplingFrequency.FREQ_176400,
192000: SamplingFrequency.FREQ_192000,
384000: SamplingFrequency.FREQ_384000,
}[frequency]
@property
def hz(self) -> int:
return {
SamplingFrequency.FREQ_8000: 8000,
SamplingFrequency.FREQ_11025: 11025,
SamplingFrequency.FREQ_16000: 16000,
SamplingFrequency.FREQ_22050: 22050,
SamplingFrequency.FREQ_24000: 24000,
SamplingFrequency.FREQ_32000: 32000,
SamplingFrequency.FREQ_44100: 44100,
SamplingFrequency.FREQ_48000: 48000,
SamplingFrequency.FREQ_88200: 88200,
SamplingFrequency.FREQ_96000: 96000,
SamplingFrequency.FREQ_176400: 176400,
SamplingFrequency.FREQ_192000: 192000,
SamplingFrequency.FREQ_384000: 384000,
}[self]
class SupportedSamplingFrequency(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.4.1 - Sample Frequency'''
# fmt: off
FREQ_8000 = 1 << (SamplingFrequency.FREQ_8000 - 1)
FREQ_11025 = 1 << (SamplingFrequency.FREQ_11025 - 1)
FREQ_16000 = 1 << (SamplingFrequency.FREQ_16000 - 1)
FREQ_22050 = 1 << (SamplingFrequency.FREQ_22050 - 1)
FREQ_24000 = 1 << (SamplingFrequency.FREQ_24000 - 1)
FREQ_32000 = 1 << (SamplingFrequency.FREQ_32000 - 1)
FREQ_44100 = 1 << (SamplingFrequency.FREQ_44100 - 1)
FREQ_48000 = 1 << (SamplingFrequency.FREQ_48000 - 1)
FREQ_88200 = 1 << (SamplingFrequency.FREQ_88200 - 1)
FREQ_96000 = 1 << (SamplingFrequency.FREQ_96000 - 1)
FREQ_176400 = 1 << (SamplingFrequency.FREQ_176400 - 1)
FREQ_192000 = 1 << (SamplingFrequency.FREQ_192000 - 1)
FREQ_384000 = 1 << (SamplingFrequency.FREQ_384000 - 1)
# fmt: on
@classmethod
def from_hz(cls, frequencies: Sequence[int]) -> SupportedSamplingFrequency:
MAPPING = {
8000: SupportedSamplingFrequency.FREQ_8000,
11025: SupportedSamplingFrequency.FREQ_11025,
16000: SupportedSamplingFrequency.FREQ_16000,
22050: SupportedSamplingFrequency.FREQ_22050,
24000: SupportedSamplingFrequency.FREQ_24000,
32000: SupportedSamplingFrequency.FREQ_32000,
44100: SupportedSamplingFrequency.FREQ_44100,
48000: SupportedSamplingFrequency.FREQ_48000,
88200: SupportedSamplingFrequency.FREQ_88200,
96000: SupportedSamplingFrequency.FREQ_96000,
176400: SupportedSamplingFrequency.FREQ_176400,
192000: SupportedSamplingFrequency.FREQ_192000,
384000: SupportedSamplingFrequency.FREQ_384000,
}
return functools.reduce(
lambda x, y: x | MAPPING[y],
frequencies,
cls(0),
)
class FrameDuration(enum.IntEnum):
'''Bluetooth Assigned Numbers, Section 6.12.5.2 - Frame Duration'''
# fmt: off
DURATION_7500_US = 0x00
DURATION_10000_US = 0x01
class SupportedFrameDuration(enum.IntFlag):
'''Bluetooth Assigned Numbers, Section 6.12.4.2 - Frame Duration'''
# fmt: off
DURATION_7500_US_SUPPORTED = 0b0001
DURATION_10000_US_SUPPORTED = 0b0010
DURATION_7500_US_PREFERRED = 0b0001
DURATION_10000_US_PREFERRED = 0b0010
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def bits_to_channel_counts(data: int) -> List[int]:
pos = 0
counts = []
while data != 0:
# Bit 0 = count 1
# Bit 1 = count 2, and so on
pos += 1
if data & 1:
counts.append(pos)
data >>= 1
return counts
def channel_counts_to_bits(counts: Sequence[int]) -> int:
return sum(set([1 << (count - 1) for count in counts]))
# -----------------------------------------------------------------------------
# Structures
# -----------------------------------------------------------------------------
@dataclasses.dataclass
class CodecSpecificCapabilities:
'''See:
* Bluetooth Assigned Numbers, 6.12.4 - Codec Specific Capabilities LTV Structures
* Basic Audio Profile, 4.3.1 - Codec_Specific_Capabilities LTV requirements
'''
class Type(enum.IntEnum):
# fmt: off
SAMPLING_FREQUENCY = 0x01
FRAME_DURATION = 0x02
AUDIO_CHANNEL_COUNT = 0x03
OCTETS_PER_FRAME = 0x04
CODEC_FRAMES_PER_SDU = 0x05
supported_sampling_frequencies: SupportedSamplingFrequency
supported_frame_durations: SupportedFrameDuration
supported_audio_channel_counts: Sequence[int]
min_octets_per_codec_frame: int
max_octets_per_codec_frame: int
supported_max_codec_frames_per_sdu: int
@classmethod
def from_bytes(cls, data: bytes) -> CodecSpecificCapabilities:
offset = 0
# Allowed default values.
supported_audio_channel_counts = [1]
supported_max_codec_frames_per_sdu = 1
while offset < len(data):
length, type = struct.unpack_from('BB', data, offset)
offset += 2
value = int.from_bytes(data[offset : offset + length - 1], 'little')
offset += length - 1
if type == CodecSpecificCapabilities.Type.SAMPLING_FREQUENCY:
supported_sampling_frequencies = SupportedSamplingFrequency(value)
elif type == CodecSpecificCapabilities.Type.FRAME_DURATION:
supported_frame_durations = SupportedFrameDuration(value)
elif type == CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT:
supported_audio_channel_counts = bits_to_channel_counts(value)
elif type == CodecSpecificCapabilities.Type.OCTETS_PER_FRAME:
min_octets_per_sample = value & 0xFFFF
max_octets_per_sample = value >> 16
elif type == CodecSpecificCapabilities.Type.CODEC_FRAMES_PER_SDU:
supported_max_codec_frames_per_sdu = value
# It is expected here that if some fields are missing, an error should be raised.
return CodecSpecificCapabilities(
supported_sampling_frequencies=supported_sampling_frequencies,
supported_frame_durations=supported_frame_durations,
supported_audio_channel_counts=supported_audio_channel_counts,
min_octets_per_codec_frame=min_octets_per_sample,
max_octets_per_codec_frame=max_octets_per_sample,
supported_max_codec_frames_per_sdu=supported_max_codec_frames_per_sdu,
)
def __bytes__(self) -> bytes:
return struct.pack(
'<BBHBBBBBBBBHHBBB',
3,
CodecSpecificCapabilities.Type.SAMPLING_FREQUENCY,
self.supported_sampling_frequencies,
2,
CodecSpecificCapabilities.Type.FRAME_DURATION,
self.supported_frame_durations,
2,
CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT,
channel_counts_to_bits(self.supported_audio_channel_counts),
5,
CodecSpecificCapabilities.Type.OCTETS_PER_FRAME,
self.min_octets_per_codec_frame,
self.max_octets_per_codec_frame,
2,
CodecSpecificCapabilities.Type.CODEC_FRAMES_PER_SDU,
self.supported_max_codec_frames_per_sdu,
)
@dataclasses.dataclass
class PacRecord:
coding_format: hci.CodingFormat
codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes]
# TODO: Parse Metadata
metadata: bytes = b''
@classmethod
def from_bytes(cls, data: bytes) -> PacRecord:
offset, coding_format = hci.CodingFormat.parse_from_bytes(data, 0)
codec_specific_capabilities_size = data[offset]
offset += 1
codec_specific_capabilities_bytes = data[
offset : offset + codec_specific_capabilities_size
]
offset += codec_specific_capabilities_size
metadata_size = data[offset]
metadata = data[offset : offset + metadata_size]
codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes]
if coding_format.codec_id == hci.CodecID.VENDOR_SPECIFIC:
codec_specific_capabilities = codec_specific_capabilities_bytes
else:
codec_specific_capabilities = CodecSpecificCapabilities.from_bytes(
codec_specific_capabilities_bytes
)
return PacRecord(
coding_format=coding_format,
codec_specific_capabilities=codec_specific_capabilities,
metadata=metadata,
)
def __bytes__(self) -> bytes:
capabilities_bytes = bytes(self.codec_specific_capabilities)
return (
bytes(self.coding_format)
+ bytes([len(capabilities_bytes)])
+ capabilities_bytes
+ bytes([len(self.metadata)])
+ self.metadata
)
# -----------------------------------------------------------------------------
# Server
# -----------------------------------------------------------------------------
class PublishedAudioCapabilitiesService(gatt.TemplateService):
UUID = gatt.GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE
sink_pac: Optional[gatt.Characteristic]
sink_audio_locations: Optional[gatt.Characteristic]
source_pac: Optional[gatt.Characteristic]
source_audio_locations: Optional[gatt.Characteristic]
available_audio_contexts: gatt.Characteristic
supported_audio_contexts: gatt.Characteristic
def __init__(
self,
supported_source_context: ContextType,
supported_sink_context: ContextType,
available_source_context: ContextType,
available_sink_context: ContextType,
sink_pac: Sequence[PacRecord] = [],
sink_audio_locations: Optional[AudioLocation] = None,
source_pac: Sequence[PacRecord] = [],
source_audio_locations: Optional[AudioLocation] = None,
) -> None:
characteristics = []
self.supported_audio_contexts = gatt.Characteristic(
uuid=gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('<HH', supported_sink_context, supported_source_context),
)
characteristics.append(self.supported_audio_contexts)
self.available_audio_contexts = gatt.Characteristic(
uuid=gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ
| gatt.Characteristic.Properties.NOTIFY,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('<HH', available_sink_context, available_source_context),
)
characteristics.append(self.available_audio_contexts)
if sink_pac:
self.sink_pac = gatt.Characteristic(
uuid=gatt.GATT_SINK_PAC_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=bytes([len(sink_pac)]) + b''.join(map(bytes, sink_pac)),
)
characteristics.append(self.sink_pac)
if sink_audio_locations is not None:
self.sink_audio_locations = gatt.Characteristic(
uuid=gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('<I', sink_audio_locations),
)
characteristics.append(self.sink_audio_locations)
if source_pac:
self.source_pac = gatt.Characteristic(
uuid=gatt.GATT_SOURCE_PAC_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=bytes([len(source_pac)]) + b''.join(map(bytes, source_pac)),
)
characteristics.append(self.source_pac)
if source_audio_locations is not None:
self.source_audio_locations = gatt.Characteristic(
uuid=gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC,
properties=gatt.Characteristic.Properties.READ,
permissions=gatt.Characteristic.Permissions.READABLE,
value=struct.pack('<I', source_audio_locations),
)
characteristics.append(self.source_audio_locations)
super().__init__(characteristics)
# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy):
SERVICE_CLASS = PublishedAudioCapabilitiesService
sink_pac: Optional[gatt_client.CharacteristicProxy] = None
sink_audio_locations: Optional[gatt_client.CharacteristicProxy] = None
source_pac: Optional[gatt_client.CharacteristicProxy] = None
source_audio_locations: Optional[gatt_client.CharacteristicProxy] = None
available_audio_contexts: gatt_client.CharacteristicProxy
supported_audio_contexts: gatt_client.CharacteristicProxy
def __init__(self, service_proxy: gatt_client.ServiceProxy):
self.service_proxy = service_proxy
self.available_audio_contexts = service_proxy.get_characteristics_by_uuid(
gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC
)[0]
self.supported_audio_contexts = service_proxy.get_characteristics_by_uuid(
gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC
)[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_PAC_CHARACTERISTIC
):
self.sink_pac = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_PAC_CHARACTERISTIC
):
self.source_pac = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC
):
self.sink_audio_locations = characteristics[0]
if characteristics := service_proxy.get_characteristics_by_uuid(
gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC
):
self.source_audio_locations = characteristics[0]
+1 -1
View File
@@ -1090,7 +1090,7 @@ class Session:
# We can now encrypt the connection with the short term key, so that we can
# distribute the long term and/or other keys over an encrypted connection
self.manager.device.host.send_command_sync(
HCI_LE_Enable_Encryption_Command( # type: ignore[call-arg]
HCI_LE_Enable_Encryption_Command(
connection_handle=self.connection.handle,
random_number=bytes(8),
encrypted_diversifier=0,
+13 -53
View File
@@ -1,19 +1,19 @@
ANDROID REMOTE HCI APP
======================
This application allows using an android phone's built-in Bluetooth controller with
This application allows using an android phone's built-in Bluetooth controller with
a Bumble host stack running outside the phone (typically a development laptop or desktop).
The app runs an HCI proxy between a TCP socket on the "outside" and the Bluetooth HCI HAL
on the "inside". (See [this page](https://source.android.com/docs/core/connect/bluetooth) for a high level
on the "inside". (See [this page](https://source.android.com/docs/core/connect/bluetooth) for a high level
description of the Android Bluetooth HCI HAL).
The HCI packets received on the TCP socket are forwarded to the phone's controller, and the
The HCI packets received on the TCP socket are forwarded to the phone's controller, and the
packets coming from the controller are forwarded to the TCP socket.
Building
--------
You can build the app by running `./gradlew build` (use `gradlew.bat` on Windows) from the `extras/android/RemoteHCI` top level directory.
You can build the app by running `./gradlew build` (use `gradlew.bat` on Windows) from the `RemoteHCI` top level directory.
You can also build with Android Studio: open the `RemoteHCI` project. You can build and/or debug from there.
If the build succeeds, you can find the app APKs (debug and release) at:
@@ -25,23 +25,9 @@ If the build succeeds, you can find the app APKs (debug and release) at:
Running
-------
!!! note
In the following examples, it is assumed that shell commands are executed while in the
app's root directory, `extras/android/RemoteHCI`. If you are in a different directory,
adjust the relative paths accordingly.
### Preconditions
When the proxy starts (tapping the "Start" button in the app's main activity, or running the proxy
from an `adb shell` command line), it will try to bind to the Bluetooth HAL.
This requires that there is no other HAL client, and requires certain privileges.
For running as a regular app, this requires disabling SELinux temporarily.
For running as a command-line executable, this just requires a root shell.
#### Root Shell
!!! tip "Restart `adb` as root"
```bash
$ adb root
```
When the proxy starts (tapping the "Start" button in the app's main activity), it will try to
bind to the Bluetooth HAL. This requires disabling SELinux temporarily, and being the only HAL client.
#### Disabling SELinux
Binding to the Bluetooth HCI HAL requires certain SELinux permissions that can't simply be changed
@@ -70,8 +56,8 @@ development phone).
This state will also reset to the normal SELinux enforcement when you reboot.
#### Stopping the bluetooth process
Since the Bluetooth HAL service can only accept one client, and that in normal conditions
that client is the Android's bluetooth stack, it is required to first shut down the
Since the Bluetooth HAL service can only accept one client, and that in normal conditions
that client is the Android's bluetooth stack, it is required to first shut down the
Android bluetooth stack process.
!!! tip "Checking if the Bluetooth process is running"
@@ -93,33 +79,7 @@ Airplane Mode, then rebooting. The bluetooth process should, in theory, not rest
$ adb shell cmd bluetooth_manager disable
```
### Running as a command line app
You push the built APK to a temporary location on the phone's filesystem, then launch the command
line executable with an `adb shell` command.
!!! tip "Pushing the executable"
```bash
$ adb push app/build/outputs/apk/release/app-release-unsigned.apk /data/local/tmp/remotehci.apk
```
Do this every time you rebuild. Alternatively, you can push the `debug` APK instead:
```bash
$ adb push app/build/outputs/apk/debug/app-debug.apk /data/local/tmp/remotehci.apk
```
!!! tip "Start the proxy from the command line"
```bash
adb shell "CLASSPATH=/data/local/tmp/remotehci.apk app_process /system/bin com.github.google.bumble.remotehci.CommandLineInterface"
```
This will run the proxy, listening on the default TCP port.
If you want a different port, pass it as a command line parameter
!!! tip "Start the proxy from the command line with a specific TCP port"
```bash
adb shell "CLASSPATH=/data/local/tmp/remotehci.apk app_process /system/bin com.github.google.bumble.remotehci.CommandLineInterface 12345"
```
### Running as a normal app
### Starting the app
You can start the app from the Android launcher, from Android Studio, or with `adb`
#### Launching from the launcher
@@ -143,11 +103,11 @@ automatically start the proxy, and/or set the port number.
#### Selecting a TCP port
The RemoteHCI app's main activity has a "TCP Port" setting where you can change the port on
which the proxy is accepting connections. If the default value isn't suitable, you can
which the proxy is accepting connections. If the default value isn't suitable, you can
change it there (you can also use the special value 0 to let the OS assign a port number for you).
### Connecting to the proxy
To connect the Bumble stack to the proxy, you need to be able to reach the phone's network
To connect the Bumble stack to the proxy, you need to be able to reach the phone's network
stack. This can be done over the phone's WiFi connection, or, alternatively, using an `adb`
TCP forward (which should be faster than over WiFi).
@@ -156,7 +116,7 @@ TCP forward (which should be faster than over WiFi).
```bash
$ adb forward tcp:<outside-port> tcp:<inside-port>
```
Where ``<outside-port>`` is the port number for a listening socket on your laptop or
Where ``<outside-port>`` is the port number for a listening socket on your laptop or
desktop machine, and <inside-port> is the TCP port selected in the app's user interface.
Those two ports may be the same, of course.
For example, with the default TCP port 9993:
@@ -165,7 +125,7 @@ TCP forward (which should be faster than over WiFi).
```
Once you've ensured that you can reach the proxy's TCP port on the phone, either directly or
via an `adb` forward, you can then use it as a Bumble transport, using the transport name:
via an `adb` forward, you can then use it as a Bumble transport, using the transport name:
``tcp-client:<host>:<port>`` syntax.
!!! example "Connecting a Bumble client"
-1
View File
@@ -73,7 +73,6 @@ async def main() -> None:
HCI_Enhanced_Setup_Synchronous_Connection_Command(
connection_handle=connections[0].handle,
**ESCO_PARAMETERS[DefaultCodecParameters.ESCO_CVSD_S3].asdict(),
# type: ignore[call-args]
)
)
+134
View File
@@ -0,0 +1,134 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import sys
import os
from bumble.core import AdvertisingData
from bumble.device import Device
from bumble.hci import (
CodecID,
CodingFormat,
OwnAddressType,
HCI_LE_Set_Extended_Advertising_Parameters_Command,
)
from bumble.profiles.bap import (
CodecSpecificCapabilities,
ContextType,
AudioLocation,
SupportedSamplingFrequency,
SupportedFrameDuration,
PacRecord,
PublishedAudioCapabilitiesService,
)
from bumble.transport import open_transport_or_link
# -----------------------------------------------------------------------------
async def main() -> None:
if len(sys.argv) < 3:
print('Usage: run_cig_setup.py <config-file>' '<transport-spec-for-device>')
return
print('<<< connecting to HCI...')
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
print('<<< connected')
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
device.cis_enabled = True
await device.power_on()
device.add_service(
PublishedAudioCapabilitiesService(
supported_source_context=ContextType.PROHIBITED,
available_source_context=ContextType.PROHIBITED,
supported_sink_context=ContextType.MEDIA,
available_sink_context=ContextType.MEDIA,
sink_audio_locations=(
AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT
),
sink_pac=[
# Codec Capability Setting 16_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_16000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
),
),
# Codec Capability Setting 24_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_24000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=60,
max_octets_per_codec_frame=60,
supported_max_codec_frames_per_sdu=1,
),
),
],
)
)
advertising_data = bytes(
AdvertisingData(
[
(
AdvertisingData.COMPLETE_LOCAL_NAME,
bytes('Bumble LE Audio', 'utf-8'),
),
(
AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
bytes(PublishedAudioCapabilitiesService.UUID),
),
]
)
)
await device.start_extended_advertising(
advertising_properties=(
HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties.CONNECTABLE_ADVERTISING
),
own_address_type=OwnAddressType.RANDOM,
advertising_data=advertising_data,
)
await hci_transport.source.terminated
# -----------------------------------------------------------------------------
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
@@ -1,5 +1,5 @@
[versions]
agp = "8.2.0"
agp = "8.3.0-alpha11"
kotlin = "1.9.0"
core-ktx = "1.12.0"
junit = "4.13.2"
@@ -1,57 +0,0 @@
package com.github.google.bumble.remotehci
import java.io.IOException
class CommandLineInterface {
companion object {
fun printUsage() {
System.out.println("usage: <launch-command> [-h|--help] [<tcp-port>]")
}
@JvmStatic fun main(args: Array<String>) {
System.out.println("Starting proxy")
var tcpPort = DEFAULT_TCP_PORT
if (args.isNotEmpty()) {
if (args[0] == "-h" || args[0] == "--help") {
printUsage()
return
}
try {
tcpPort = args[0].toInt()
} catch (error: NumberFormatException) {
System.out.println("ERROR: invalid TCP port argument")
printUsage()
return
}
}
try {
val hciProxy = HciProxy(tcpPort, object : HciProxy.Listener {
override fun onHostConnectionState(connected: Boolean) {
}
override fun onHciPacketCountChange(
commandPacketsReceived: Int,
aclPacketsReceived: Int,
scoPacketsReceived: Int,
eventPacketsSent: Int,
aclPacketsSent: Int,
scoPacketsSent: Int
) {
}
override fun onMessage(message: String?) {
System.out.println(message)
}
})
hciProxy.run()
} catch (error: IOException) {
System.err.println("Exception while running HCI Server: $error")
} catch (error: HciProxy.HalException) {
System.err.println("HAL exception: ${error.message}")
}
}
}
}
@@ -1,5 +1,5 @@
[versions]
agp = "8.2.0"
agp = "8.3.0-alpha11"
kotlin = "1.8.10"
core-ktx = "1.9.0"
junit = "4.13.2"
+151
View File
@@ -0,0 +1,151 @@
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import os
import pytest
import logging
from bumble import device
from bumble.hci import CodecID, CodingFormat
from bumble.profiles.bap import (
AudioLocation,
SupportedFrameDuration,
SupportedSamplingFrequency,
CodecSpecificCapabilities,
ContextType,
PacRecord,
PublishedAudioCapabilitiesService,
PublishedAudioCapabilitiesServiceProxy,
)
from .test_utils import TwoDevices
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
def test_codec_specific_capabilities() -> None:
SAMPLE_FREQUENCY = SupportedSamplingFrequency.FREQ_16000
FRAME_SURATION = SupportedFrameDuration.DURATION_10000_US_SUPPORTED
AUDIO_CHANNEL_COUNTS = [1]
cap = CodecSpecificCapabilities(
supported_sampling_frequencies=SAMPLE_FREQUENCY,
supported_frame_durations=FRAME_SURATION,
supported_audio_channel_counts=AUDIO_CHANNEL_COUNTS,
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
)
assert CodecSpecificCapabilities.from_bytes(bytes(cap)) == cap
# -----------------------------------------------------------------------------
def test_pac_record() -> None:
SAMPLE_FREQUENCY = SupportedSamplingFrequency.FREQ_16000
FRAME_SURATION = SupportedFrameDuration.DURATION_10000_US_SUPPORTED
AUDIO_CHANNEL_COUNTS = [1]
cap = CodecSpecificCapabilities(
supported_sampling_frequencies=SAMPLE_FREQUENCY,
supported_frame_durations=FRAME_SURATION,
supported_audio_channel_counts=AUDIO_CHANNEL_COUNTS,
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
)
pac_record = PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=cap,
metadata=b'',
)
assert PacRecord.from_bytes(bytes(pac_record)) == pac_record
# -----------------------------------------------------------------------------
def test_vendor_specific_pac_record() -> None:
# Vendor-Specific codec, Google, ID=0xFFFF. No capabilities and metadata.
RAW_DATA = bytes.fromhex('ffe000ffff0000')
assert bytes(PacRecord.from_bytes(RAW_DATA)) == RAW_DATA
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_pacs():
devices = TwoDevices()
devices[0].add_service(
PublishedAudioCapabilitiesService(
supported_sink_context=ContextType.MEDIA,
available_sink_context=ContextType.MEDIA,
supported_source_context=0,
available_source_context=0,
sink_pac=[
# Codec Capability Setting 16_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_16000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=40,
max_octets_per_codec_frame=40,
supported_max_codec_frames_per_sdu=1,
),
),
# Codec Capability Setting 24_2
PacRecord(
coding_format=CodingFormat(CodecID.LC3),
codec_specific_capabilities=CodecSpecificCapabilities(
supported_sampling_frequencies=(
SupportedSamplingFrequency.FREQ_24000
),
supported_frame_durations=(
SupportedFrameDuration.DURATION_10000_US_SUPPORTED
),
supported_audio_channel_counts=[1],
min_octets_per_codec_frame=60,
max_octets_per_codec_frame=60,
supported_max_codec_frames_per_sdu=1,
),
),
],
sink_audio_locations=AudioLocation.FRONT_LEFT | AudioLocation.FRONT_RIGHT,
)
)
await devices.setup_connection()
peer = device.Peer(devices.connections[1])
pacs_client = await peer.discover_service_and_create_proxy(
PublishedAudioCapabilitiesServiceProxy
)
# -----------------------------------------------------------------------------
async def run():
await test_pacs()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run())
+9
View File
@@ -53,6 +53,7 @@ from bumble.hci import (
HCI_LE_Set_Random_Address_Command,
HCI_LE_Set_Scan_Enable_Command,
HCI_LE_Set_Scan_Parameters_Command,
HCI_LE_Setup_ISO_Data_Path_Command,
HCI_Number_Of_Completed_Packets_Event,
HCI_Packet,
HCI_PIN_Code_Request_Reply_Command,
@@ -455,6 +456,14 @@ def test_HCI_LE_Setup_ISO_Data_Path_Command():
assert command.controller_delay == 0
assert command.codec_configuration == b''
command = HCI_LE_Setup_ISO_Data_Path_Command(
connection_handle=0x0060,
data_path_direction=0x00,
data_path_id=0x01,
codec_id=CodingFormat(CodecID.TRANSPARENT),
controller_delay=0x00,
codec_configuration=b'',
)
basic_check(command)