support selecting usb device by serial number

This commit is contained in:
Gilles Boccon-Gibod
2022-05-29 16:30:05 -07:00
parent 7e8b201999
commit 86ded3fece
5 changed files with 86 additions and 33 deletions

View File

@@ -243,7 +243,7 @@ class L2CAP_Control_Frame:
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass([
('reason', {'size': 2, 'mapper': lambda x: L2CAP_Command_Reject.map_reason(x)}),
('reason', {'size': 2, 'mapper': lambda x: L2CAP_Command_Reject.reason_name(x)}),
('data', '*')
])
class L2CAP_Command_Reject(L2CAP_Control_Frame):
@@ -262,7 +262,7 @@ class L2CAP_Command_Reject(L2CAP_Control_Frame):
}
@staticmethod
def map_reason(reason):
def reason_name(reason):
return name_or_number(L2CAP_Command_Reject.REASON_NAMES, reason)
@@ -330,7 +330,7 @@ class L2CAP_Configure_Request(L2CAP_Control_Frame):
@L2CAP_Control_Frame.subclass([
('source_cid', 2),
('flags', 2),
('result', {'size': 2, 'mapper': lambda x: L2CAP_Configure_Response.map_result(x)}),
('result', {'size': 2, 'mapper': lambda x: L2CAP_Configure_Response.result_name(x)}),
('options', '*')
])
class L2CAP_Configure_Response(L2CAP_Control_Frame):
@@ -355,7 +355,7 @@ class L2CAP_Configure_Response(L2CAP_Control_Frame):
}
@staticmethod
def map_result(result):
def result_name(result):
return name_or_number(L2CAP_Configure_Response.RESULT_NAMES, result)
@@ -403,31 +403,49 @@ class L2CAP_Echo_Response(L2CAP_Control_Frame):
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass([
('info_type', 2)
('info_type', {'size': 2, 'mapper': lambda x: L2CAP_Information_Request.info_type_name(x)})
])
class L2CAP_Information_Request(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.10 INFORMATION REQUEST
'''
SUCCESS = 0x00
NOT_SUPPORTED = 0x01
CONNECTIONLESS_MTU = 0x0001
EXTENDED_FEATURES_SUPPORTED = 0x0002
FIXED_CHANNELS_SUPPORTED = 0x0003
INFO_TYPE_NAMES = {
CONNECTIONLESS_MTU: 'CONNECTIONLESS_MTU',
EXTENDED_FEATURES_SUPPORTED: 'EXTENDED_FEATURES_SUPPORTED',
FIXED_CHANNELS_SUPPORTED: 'FIXED_CHANNELS_SUPPORTED'
}
@staticmethod
def info_type_name(info_type):
return name_or_number(L2CAP_Information_Request.INFO_TYPE_NAMES, info_type)
# -----------------------------------------------------------------------------
@L2CAP_Control_Frame.subclass([
('info_type', 2),
('result', 2),
('info_type', {'size': 2, 'mapper': L2CAP_Information_Request.info_type_name}),
('result', {'size': 2, 'mapper': lambda x: L2CAP_Information_Response.result_name(x)}),
('data', '*')
])
class L2CAP_Information_Response(L2CAP_Control_Frame):
'''
See Bluetooth spec @ Vol 3, Part A - 4.11 INFORMATION RESPONSE
'''
SUCCESS = 0x00
NOT_SUPPORTED = 0x01
RESULT_NAMES = {
SUCCESS: 'SUCCESS',
NOT_SUPPORTED: 'NOT_SUPPORTED'
}
@staticmethod
def result_name(result):
return name_or_number(L2CAP_Information_Response.RESULT_NAMES, result)
# -----------------------------------------------------------------------------
@@ -473,7 +491,7 @@ class L2CAP_LE_Credit_Based_Connection_Request(L2CAP_Control_Frame):
('mtu', 2),
('mps', 2),
('initial_credits', 2),
('result', {'size': 2, 'mapper': lambda x: L2CAP_LE_Credit_Based_Connection_Response.map_result(x)})
('result', {'size': 2, 'mapper': lambda x: L2CAP_LE_Credit_Based_Connection_Response.result_name(x)})
])
class L2CAP_LE_Credit_Based_Connection_Response(L2CAP_Control_Frame):
'''
@@ -505,7 +523,7 @@ class L2CAP_LE_Credit_Based_Connection_Response(L2CAP_Control_Frame):
}
@staticmethod
def map_result(result):
def result_name(result):
return name_or_number(L2CAP_LE_Credit_Based_Connection_Response.CONNECTION_RESULT_NAMES, result)
@@ -980,13 +998,13 @@ class ChannelManager:
def on_l2cap_information_request(self, connection, cid, request):
if request.info_type == L2CAP_Information_Request.CONNECTIONLESS_MTU:
result = L2CAP_Information_Request.SUCCESS
result = L2CAP_Information_Response.SUCCESS
data = struct.pack('<H', 1024) # TODO: don't use a fixed value
elif request.info_type == L2CAP_Information_Request.EXTENDED_FEATURES_SUPPORTED:
result = L2CAP_Information_Request.SUCCESS
result = L2CAP_Information_Response.SUCCESS
data = bytes.fromhex('00000000') # TODO: don't use a fixed value
elif request.info_type == L2CAP_Information_Request.FIXED_CHANNELS_SUPPORTED:
result = L2CAP_Information_Request.SUCCESS
result = L2CAP_Information_Response.SUCCESS
data = bytes.fromhex('FFFFFFFFFFFFFFFF') # TODO: don't use a fixed value
else:
result = L2CAP_Information_Request.NO_SUPPORTED

View File

@@ -37,14 +37,17 @@ async def open_usb_transport(spec):
'''
Open a USB transport.
The parameter string has this syntax:
either <index> or <vendor>:<product>
either <index> or <vendor>:<product>[/<serial-number>]
With <index> as the 0-based index to select amongst all the devices that appear
to be supporting Bluetooth HCI (0 being the first one), or
Where <vendor> and <product> are the vendor ID and product ID in hexadecimal.
Where <vendor> and <product> are the vendor ID and product ID in hexadecimal. The
/<serial-number> suffix max be specified when more than one device with the same
vendor and product identifiers are present.
Examples:
0 --> the first BT USB dongle
04b4:f901 --> the BT USB dongle with vendor=04b4 and product=f901
04b4:f901/00E04C239987 --> the BT USB dongle with vendor=04b4 and product=f901 and serial number 00E04C239987
'''
USB_RECIPIENT_DEVICE = 0x00
@@ -268,22 +271,32 @@ async def open_usb_transport(spec):
found = None
if ':' in spec:
vendor_id, product_id = spec.split(':')
found = context.getByVendorIDAndProductID(int(vendor_id, 16), int(product_id, 16), skip_on_error=True)
if '/' in product_id:
product_id, serial_number = product_id.split('/')
for device in context.getDeviceIterator(skip_on_error=True):
if (
device.getVendorID() == int(vendor_id, 16) and
device.getProductID() == int(product_id, 16) and
device.getSerialNumber() == serial_number
):
found = device
break
device.close()
else:
found = context.getByVendorIDAndProductID(int(vendor_id, 16), int(product_id, 16), skip_on_error=True)
else:
device_index = int(spec)
device_iterator = context.getDeviceIterator(skip_on_error=True)
try:
for device in device_iterator:
if device.getDeviceClass() == USB_DEVICE_CLASS_WIRELESS_CONTROLLER and \
device.getDeviceSubClass() == USB_DEVICE_SUBCLASS_RF_CONTROLLER and \
device.getDeviceProtocol() == USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER:
if device_index == 0:
found = device
break
device_index -= 1
device.close()
finally:
device_iterator.close()
for device in context.getDeviceIterator(skip_on_error=True):
if (
device.getDeviceClass() == USB_DEVICE_CLASS_WIRELESS_CONTROLLER and
device.getDeviceSubClass() == USB_DEVICE_SUBCLASS_RF_CONTROLLER and
device.getDeviceProtocol() == USB_DEVICE_PROTOCOL_BLUETOOTH_PRIMARY_CONTROLLER
):
if device_index == 0:
found = device
break
device_index -= 1
device.close()
if found is None:
context.close()