Compare commits

...

7 Commits

Author SHA1 Message Date
Gilles Boccon-Gibod
8b9ce03e86 Merge pull request #108 from google/gbg/fix-bluez-vhci
support more commands in controller.py
2023-01-08 14:40:26 -08:00
Gilles Boccon-Gibod
7e854efbbb support more commands in controller.py 2023-01-06 21:51:47 -08:00
Lucas Abel
17db5dd4ff Merge pull request #103 from google/uael/device-fixes
Misc device fixes
2022-12-20 12:15:49 -08:00
Abel Lucas
ea0a7e2347 device: commit LE connection **before** reading it's PHY 2022-12-20 19:25:43 +00:00
Gilles Boccon-Gibod
ea6a8d4339 Merge pull request #104 from google/gbg/fix-windll-load
fix libusb loading on Windows
2022-12-20 08:05:57 -08:00
Abel Lucas
ce049865a4 device: always prefer R2 for remote name request 2022-12-20 01:48:08 +00:00
Gilles Boccon-Gibod
6e0129b71d fix libusb loading on Windows 2022-12-18 22:00:26 -08:00
3 changed files with 53 additions and 47 deletions

View File

@@ -458,8 +458,8 @@ class Controller:
return
# Send a scan report
report = HCI_Object(
HCI_LE_Advertising_Report_Event.REPORT_FIELDS,
report = HCI_LE_Advertising_Report_Event.Report(
HCI_LE_Advertising_Report_Event.Report.FIELDS,
event_type=HCI_LE_Advertising_Report_Event.ADV_IND,
address_type=sender_address.address_type,
address=sender_address,
@@ -469,8 +469,8 @@ class Controller:
self.send_hci_packet(HCI_LE_Advertising_Report_Event([report]))
# Simulate a scan response
report = HCI_Object(
HCI_LE_Advertising_Report_Event.REPORT_FIELDS,
report = HCI_LE_Advertising_Report_Event.Report(
HCI_LE_Advertising_Report_Event.Report.FIELDS,
event_type=HCI_LE_Advertising_Report_Event.SCAN_RSP,
address_type=sender_address.address_type,
address=sender_address,
@@ -738,10 +738,10 @@ class Controller:
self.advertising_parameters = command
return bytes([HCI_SUCCESS])
def on_hci_le_read_advertising_channel_tx_power_command(self, _command):
def on_hci_le_read_advertising_physical_channel_tx_power_command(self, _command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Channel Tx Power
Command
See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Physical Channel
Tx Power Command
'''
return bytes([HCI_SUCCESS, self.advertising_channel_tx_power])
@@ -1008,7 +1008,7 @@ class Controller:
def on_hci_le_read_phy_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.47 LE Read PHY command
See Bluetooth spec Vol 2, Part E - 7.8.47 LE Read PHY Command
'''
return struct.pack(
'<BHBB',
@@ -1028,3 +1028,9 @@ class Controller:
'rx_phys': command.rx_phys,
}
return bytes([HCI_SUCCESS])
def on_hci_le_read_transmit_power_command(self, command):
'''
See Bluetooth spec Vol 2, Part E - 7.8.74 LE Read Transmit Power Command
'''
return struct.pack('<BBB', HCI_SUCCESS, 0, 0)

View File

@@ -2237,8 +2237,7 @@ class Device(CompositeEventEmitter):
result = await self.send_command(
HCI_Remote_Name_Request_Command(
bd_addr=peer_address,
# TODO investigate other options
page_scan_repetition_mode=HCI_Remote_Name_Request_Command.R0,
page_scan_repetition_mode=HCI_Remote_Name_Request_Command.R2,
reserved=0,
clock_offset=0, # TODO investigate non-0 values
)
@@ -2369,50 +2368,50 @@ class Device(CompositeEventEmitter):
self.advertising_own_address_type = None
self.advertising = False
# Create and notify of the new connection asynchronously
async def new_connection():
# Figure out which PHY we're connected with
if self.host.supports_command(HCI_LE_READ_PHY_COMMAND):
result = await asyncio.shield(
self.send_command(
HCI_LE_Read_PHY_Command(
connection_handle=connection_handle
),
check_result=True,
)
if own_address_type in (
OwnAddressType.PUBLIC,
OwnAddressType.RESOLVABLE_OR_PUBLIC,
):
self_address = self.public_address
else:
self_address = self.random_address
# Create a new connection
connection = Connection(
self,
connection_handle,
transport,
self_address,
peer_address,
peer_resolvable_address,
role,
connection_parameters,
ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY),
)
self.connections[connection_handle] = connection
# If supported, read which PHY we're connected with before
# notifying listeners of the new connection.
if self.host.supports_command(HCI_LE_READ_PHY_COMMAND):
async def read_phy():
result = await self.send_command(
HCI_LE_Read_PHY_Command(connection_handle=connection_handle),
check_result=True,
)
phy = ConnectionPHY(
connection.phy = ConnectionPHY(
result.return_parameters.tx_phy, result.return_parameters.rx_phy
)
else:
phy = ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY)
# Emit an event to notify listeners of the new connection
self.emit('connection', connection)
self_address = self.random_address
if own_address_type in (
OwnAddressType.PUBLIC,
OwnAddressType.RESOLVABLE_OR_PUBLIC,
):
self_address = self.public_address
# Create a new connection
connection = Connection(
self,
connection_handle,
transport,
self_address,
peer_address,
peer_resolvable_address,
role,
connection_parameters,
phy,
)
self.connections[connection_handle] = connection
# Do so asynchronously to not block the current event handler
connection.abort_on('disconnection', read_phy())
else:
# Emit an event to notify listeners of the new connection
self.emit('connection', connection)
self.abort_on('flush', new_connection())
@host_event_handler
def on_connection_failure(self, transport, peer_address, error_code):
logger.debug(f'*** Connection failed: {HCI_Constant.error_name(error_code)}')

View File

@@ -46,8 +46,9 @@ def load_libusb():
when usb1.USBContext is created.
'''
if libusb_path := libusb_package.get_library_path():
logger.debug(f'loading libusb library at {libusb_path}')
dll_loader = ctypes.WinDLL if platform.system() == 'Windows' else ctypes.CDLL
libusb_dll = dll_loader(libusb_path, use_errno=True, use_last_error=True)
libusb_dll = dll_loader(str(libusb_path), use_errno=True, use_last_error=True)
usb1.loadLibrary(libusb_dll)