forked from auracaster/bumble_mirror
Implement HCI_Mode_Change_Event
This commit is contained in:
@@ -27,7 +27,7 @@ from bumble.colors import color
|
|||||||
from bumble.core import (
|
from bumble.core import (
|
||||||
PhysicalTransport,
|
PhysicalTransport,
|
||||||
)
|
)
|
||||||
|
from bumble import hci
|
||||||
from bumble.hci import (
|
from bumble.hci import (
|
||||||
HCI_ACL_DATA_PACKET,
|
HCI_ACL_DATA_PACKET,
|
||||||
HCI_COMMAND_DISALLOWED_ERROR,
|
HCI_COMMAND_DISALLOWED_ERROR,
|
||||||
@@ -618,8 +618,8 @@ class Controller:
|
|||||||
cis_sync_delay=0,
|
cis_sync_delay=0,
|
||||||
transport_latency_c_to_p=0,
|
transport_latency_c_to_p=0,
|
||||||
transport_latency_p_to_c=0,
|
transport_latency_p_to_c=0,
|
||||||
phy_c_to_p=1,
|
phy_c_to_p=0,
|
||||||
phy_p_to_c=1,
|
phy_p_to_c=0,
|
||||||
nse=0,
|
nse=0,
|
||||||
bn_c_to_p=0,
|
bn_c_to_p=0,
|
||||||
bn_p_to_c=0,
|
bn_p_to_c=0,
|
||||||
@@ -977,7 +977,68 @@ class Controller:
|
|||||||
self, connection.peer_address, HCI_Connection_Complete_Event.LinkType.ESCO
|
self, connection.peer_address, HCI_Connection_Complete_Event.LinkType.ESCO
|
||||||
)
|
)
|
||||||
|
|
||||||
def on_hci_switch_role_command(self, command):
|
def on_hci_sniff_mode_command(self, command: hci.HCI_Sniff_Mode_Command):
|
||||||
|
'''
|
||||||
|
See Bluetooth spec Vol 4, Part E - 7.2.2 Sniff Mode command
|
||||||
|
'''
|
||||||
|
if self.link is None:
|
||||||
|
self.send_hci_packet(
|
||||||
|
hci.HCI_Command_Status_Event(
|
||||||
|
status=hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
|
||||||
|
num_hci_command_packets=1,
|
||||||
|
command_opcode=command.op_code,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
self.send_hci_packet(
|
||||||
|
hci.HCI_Command_Status_Event(
|
||||||
|
status=HCI_SUCCESS,
|
||||||
|
num_hci_command_packets=1,
|
||||||
|
command_opcode=command.op_code,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.send_hci_packet(
|
||||||
|
hci.HCI_Mode_Change_Event(
|
||||||
|
status=HCI_SUCCESS,
|
||||||
|
connection_handle=command.connection_handle,
|
||||||
|
current_mode=hci.HCI_Mode_Change_Event.Mode.SNIFF,
|
||||||
|
interval=2,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def on_hci_exit_sniff_mode_command(self, command: hci.HCI_Exit_Sniff_Mode_Command):
|
||||||
|
'''
|
||||||
|
See Bluetooth spec Vol 4, Part E - 7.2.3 Exit Sniff Mode command
|
||||||
|
'''
|
||||||
|
|
||||||
|
if self.link is None:
|
||||||
|
self.send_hci_packet(
|
||||||
|
hci.HCI_Command_Status_Event(
|
||||||
|
status=hci.HCI_UNKNOWN_CONNECTION_IDENTIFIER_ERROR,
|
||||||
|
num_hci_command_packets=1,
|
||||||
|
command_opcode=command.op_code,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return
|
||||||
|
|
||||||
|
self.send_hci_packet(
|
||||||
|
hci.HCI_Command_Status_Event(
|
||||||
|
status=HCI_SUCCESS,
|
||||||
|
num_hci_command_packets=1,
|
||||||
|
command_opcode=command.op_code,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.send_hci_packet(
|
||||||
|
hci.HCI_Mode_Change_Event(
|
||||||
|
status=HCI_SUCCESS,
|
||||||
|
connection_handle=command.connection_handle,
|
||||||
|
current_mode=hci.HCI_Mode_Change_Event.Mode.ACTIVE,
|
||||||
|
interval=2,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def on_hci_switch_role_command(self, command: hci.HCI_Switch_Role_Command):
|
||||||
'''
|
'''
|
||||||
See Bluetooth spec Vol 4, Part E - 7.2.8 Switch Role command
|
See Bluetooth spec Vol 4, Part E - 7.2.8 Switch Role command
|
||||||
'''
|
'''
|
||||||
|
|||||||
@@ -5877,6 +5877,19 @@ class Device(utils.CompositeEventEmitter):
|
|||||||
|
|
||||||
utils.AsyncRunner.spawn(reply())
|
utils.AsyncRunner.spawn(reply())
|
||||||
|
|
||||||
|
# [Classic only]
|
||||||
|
@host_event_handler
|
||||||
|
@with_connection_from_handle
|
||||||
|
def on_mode_change(
|
||||||
|
self, connection: Connection, status: int, current_mode: int, interval: int
|
||||||
|
):
|
||||||
|
if status == hci.HCI_SUCCESS:
|
||||||
|
connection.classic_mode = current_mode
|
||||||
|
connection.classic_interval = interval
|
||||||
|
connection.emit(connection.EVENT_MODE_CHANGE)
|
||||||
|
else:
|
||||||
|
connection.emit(connection.EVENT_MODE_CHANGE_FAILURE)
|
||||||
|
|
||||||
# [Classic only]
|
# [Classic only]
|
||||||
@host_event_handler
|
@host_event_handler
|
||||||
@with_connection_from_address
|
@with_connection_from_address
|
||||||
|
|||||||
@@ -575,6 +575,37 @@ async def test_cis_setup_failure():
|
|||||||
await asyncio.wait_for(cis_create_task, _TIMEOUT)
|
await asyncio.wait_for(cis_create_task, _TIMEOUT)
|
||||||
|
|
||||||
|
|
||||||
|
# -----------------------------------------------------------------------------
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_enter_and_exit_sniff_mode():
|
||||||
|
devices = TwoDevices()
|
||||||
|
await devices.setup_connection()
|
||||||
|
|
||||||
|
m = mock.Mock()
|
||||||
|
devices.connections[0].on(Connection.EVENT_MODE_CHANGE, m)
|
||||||
|
|
||||||
|
await devices[0].send_command(
|
||||||
|
hci.HCI_Sniff_Mode_Command(
|
||||||
|
connection_handle=devices.connections[0].handle,
|
||||||
|
sniff_max_interval=2,
|
||||||
|
sniff_min_interval=2,
|
||||||
|
sniff_attempt=2,
|
||||||
|
sniff_timeout=2,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
m.assert_called_once()
|
||||||
|
assert devices.connections[0].classic_mode == hci.HCI_Mode_Change_Event.Mode.SNIFF
|
||||||
|
assert devices.connections[0].classic_interval == 2
|
||||||
|
|
||||||
|
await devices[0].send_command(
|
||||||
|
hci.HCI_Exit_Sniff_Mode_Command(connection_handle=devices.connections[0].handle)
|
||||||
|
)
|
||||||
|
|
||||||
|
assert devices.connections[0].classic_mode == hci.HCI_Mode_Change_Event.Mode.ACTIVE
|
||||||
|
assert devices.connections[0].classic_interval == 2
|
||||||
|
|
||||||
|
|
||||||
# -----------------------------------------------------------------------------
|
# -----------------------------------------------------------------------------
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_power_on_default_static_address_should_not_be_any():
|
async def test_power_on_default_static_address_should_not_be_any():
|
||||||
|
|||||||
Reference in New Issue
Block a user